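"""
Refresh DCF valuations and company ratings for every ticker in the database.

For each ticker that has a yf_ticker symbol, check whether all three DCF
variants ('simple', 'advanced', 'levered') and a company rating have been
stored within the last 90 days; fetch and save anything stale or missing via
ValuationCollector (FMP API). Tickers are processed in batches of 50 with a
short pause between batches.
"""
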
import os
import logging
import time
from typing import List, Tuple

import psycopg2
from dotenv import load_dotenv

from utils.Stocks.valuations.collector import ValuationCollector
from utils.Stocks.valuations.utils import (
    save_dcf_valuation,
    save_company_rating,
    init_database,
    setup_logging,
)

def check_ticker_updates(conn: psycopg2.extensions.connection, ticker_id: int) -> bool:
    """
    Check whether a ticker has been updated recently.

    Returns True if the ticker needs an update, False if it is already up to date.
    """
    cursor = conn.cursor()
    try:
        # Check for recent DCF valuations and ratings (within the last 90 days).
        cursor.execute("""
            WITH recent_dcf AS (
                SELECT dcf_type, MAX(date) AS last_update
                FROM dcf_valuations
                WHERE ticker_id = %s
                  AND date >= CURRENT_DATE - INTERVAL '90 days'
                GROUP BY dcf_type
            ),
            recent_rating AS (
                SELECT MAX(date) AS last_rating
                FROM company_ratings
                WHERE ticker_id = %s
                  AND date >= CURRENT_DATE - INTERVAL '90 days'
            )
            SELECT
                (SELECT ARRAY_AGG(dcf_type) FROM recent_dcf) AS dcf_types,
                (SELECT MAX(last_update) FROM recent_dcf) AS latest_dcf_date,
                (SELECT last_rating FROM recent_rating) AS latest_rating_date;
        """, (ticker_id, ticker_id))

        # The query always yields exactly one row; guard against None anyway.
        result = cursor.fetchone()
        dcf_types = result[0] if result and result[0] else []
        latest_dcf_date = result[1] if result else None
        latest_rating_date = result[2] if result else None

        required_dcf_types = {'simple', 'advanced', 'levered'}
        missing_dcf_types = required_dcf_types - set(dcf_types)

        if missing_dcf_types:
            logging.info(f"Ticker {ticker_id} needs update: Missing DCF types: {missing_dcf_types}")
            return True

        if not latest_rating_date:
            logging.info(f"Ticker {ticker_id} needs update: No ratings in last 90 days")
            return True

        logging.info(f"Ticker {ticker_id} is up to date. Latest DCF: {latest_dcf_date}, Latest Rating: {latest_rating_date}")
        return False

    except Exception as e:
        # Fail open: if the freshness check itself errors, refresh the ticker.
        logging.error(f"Error checking ticker updates: {e}")
        return True
    finally:
        cursor.close()

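# NOTE: the collector methods used below (get_simple_dcf, get_advanced_dcf,
# get_levered_dcf, get_company_rating, get_historical_rating) are assumed to
# return list-like API responses whose first element is the payload, which is
# what the `response and response[0]` guards below rely on.
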
def process_ticker_batch(conn: psycopg2.extensions.connection,
                         collector: ValuationCollector,
                         tickers: List[Tuple[int, str]]) -> None:
    """Process a batch of tickers."""
    for ticker_id, ticker_symbol in tickers:
        try:
            # Skip tickers that already have fresh valuations.
            if not check_ticker_updates(conn, ticker_id):
                logging.info(f"Skipping {ticker_symbol} - Already has all valuations within 90 days")
                continue

            logging.info(f"Updating valuations for {ticker_symbol}")

            # Fetch and save each DCF variant; skip any that return no data.
            dcf_fetchers = [
                ('simple', collector.get_simple_dcf),
                ('advanced', collector.get_advanced_dcf),
                ('levered', collector.get_levered_dcf),
            ]
            for dcf_type, fetch in dcf_fetchers:
                dcf = fetch(ticker_symbol)
                if dcf and dcf[0]:
                    save_dcf_valuation(conn, ticker_id, dcf_type, dcf[0])

            # Get and save the current rating
            current_rating = collector.get_company_rating(ticker_symbol)
            if current_rating and current_rating[0]:
                save_company_rating(conn, ticker_id, current_rating[0])

            # Get and save historical ratings
            historical_ratings = collector.get_historical_rating(ticker_symbol)
            if historical_ratings:
                for rating in historical_ratings:
                    if rating:
                        save_company_rating(conn, ticker_id, rating)

            # Commit per ticker so one failure never discards earlier work.
            conn.commit()
            logging.info(f"Completed updating valuations for {ticker_symbol}")

        except Exception as e:
            # Roll back this ticker's inserts and move on to the next symbol.
            conn.rollback()
            logging.error(f"Error updating valuations for {ticker_symbol}: {e}")
            continue

def update_all_valuations(conn: psycopg2.extensions.connection, collector: ValuationCollector):
    """Fetch every ticker with a yf_ticker symbol and update its valuations in batches."""
    cursor = conn.cursor()
    BATCH_SIZE = 50  # Process 50 tickers at a time

    try:
        # Get all active tickers
        cursor.execute("""
            SELECT id, yf_ticker
            FROM tickers
            WHERE yf_ticker IS NOT NULL
            ORDER BY id
        """)
        tickers = cursor.fetchall()
        total_tickers = len(tickers)

        # Process tickers in batches
        for i in range(0, total_tickers, BATCH_SIZE):
            batch = tickers[i:i + BATCH_SIZE]
            logging.info(f"Processing batch {i//BATCH_SIZE + 1} of {(total_tickers + BATCH_SIZE - 1)//BATCH_SIZE}")
            process_ticker_batch(conn, collector, batch)

            # Small delay between batches to avoid overwhelming the API
            if i + BATCH_SIZE < total_tickers:
                time.sleep(1)
    finally:
        cursor.close()

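# Expected environment variables (loaded from .env by load_dotenv). The
# values shown are illustrative placeholders, not real credentials:
#
#   DB_NAME=stocks
#   DB_USER=postgres
#   DB_PASSWORD=<password>
#   DB_HOST=localhost
#   DB_PORT=5432
#   FMP_API_KEY=<your FMP API key>
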
def main():
    load_dotenv()
    setup_logging()

    connection = None
    try:
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )

        init_database(connection)
        collector = ValuationCollector(os.getenv('FMP_API_KEY'))

        start_time = time.time()
        update_all_valuations(connection, collector)

        elapsed_time = time.time() - start_time
        logging.info(f"Valuations update completed in {elapsed_time:.2f} seconds")

    except Exception as e:
        logging.error(f"Error updating valuations: {e}")
    finally:
        # Close the connection only if it was successfully opened.
        if connection is not None:
            connection.close()

if __name__ == "__main__":
    main()
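# Usage: populate .env, then run the script directly (filename illustrative):
#   python update_valuations.py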