import os
import logging
import time
from typing import List, Tuple

import psycopg2
from dotenv import load_dotenv

from utils.Stocks.valuations.collector import ValuationCollector
from utils.Stocks.valuations.utils import (
    save_dcf_valuation,
    save_company_rating,
    init_database,
    setup_logging
)


def check_ticker_updates(conn: psycopg2.extensions.connection, ticker_id: int) -> bool:
    """
    Check if a ticker has been updated recently.
    Returns True if the ticker needs an update, False if it is already up to date.
    """
    cursor = conn.cursor()
    try:
        # Check for recent DCF valuations and ratings (within the last 90 days)
        cursor.execute("""
            WITH recent_dcf AS (
                SELECT dcf_type, MAX(date) AS last_update
                FROM dcf_valuations
                WHERE ticker_id = %s
                AND date >= CURRENT_DATE - INTERVAL '90 days'
                GROUP BY dcf_type
            ),
            recent_rating AS (
                SELECT MAX(date) AS last_rating
                FROM company_ratings
                WHERE ticker_id = %s
                AND date >= CURRENT_DATE - INTERVAL '90 days'
            )
            SELECT
                (SELECT ARRAY_AGG(dcf_type) FROM recent_dcf) AS dcf_types,
                (SELECT MAX(last_update) FROM recent_dcf) AS latest_dcf_date,
                (SELECT last_rating FROM recent_rating) AS latest_rating_date;
        """, (ticker_id, ticker_id))

        result = cursor.fetchone()
        dcf_types = result[0] if result and result[0] else []
        latest_dcf_date = result[1] if result else None
        latest_rating_date = result[2] if result else None

        # All three DCF variants must be present for the ticker to count as current
        required_dcf_types = {'simple', 'advanced', 'levered'}
        missing_dcf_types = required_dcf_types - set(dcf_types)

        if missing_dcf_types:
            logging.info(f"Ticker {ticker_id} needs update: Missing DCF types: {missing_dcf_types}")
            return True

        if not latest_rating_date:
            logging.info(f"Ticker {ticker_id} needs update: No ratings in last 90 days")
            return True

        logging.info(f"Ticker {ticker_id} is up to date. Latest DCF: {latest_dcf_date}, Latest Rating: {latest_rating_date}")
        return False
    except Exception as e:
        logging.error(f"Error checking ticker updates: {e}")
        # Fail open: if the freshness check itself fails, refresh the ticker anyway
        return True
    finally:
        cursor.close()


def process_ticker_batch(conn: psycopg2.extensions.connection,
                         collector: ValuationCollector,
                         tickers: List[Tuple[int, str]]) -> None:
    """Process a batch of tickers."""
    for ticker_id, ticker_symbol in tickers:
        try:
            # Check if the ticker needs an update
            needs_update = check_ticker_updates(conn, ticker_id)
            if not needs_update:
                logging.info(f"Skipping {ticker_symbol} - Already has all valuations within 90 days")
                continue

            logging.info(f"Updating valuations for {ticker_symbol}")

            # Get and save simple DCF
            simple_dcf = collector.get_simple_dcf(ticker_symbol)
            if simple_dcf and simple_dcf[0]:
                save_dcf_valuation(conn, ticker_id, 'simple', simple_dcf[0])

            # Get and save advanced DCF
            advanced_dcf = collector.get_advanced_dcf(ticker_symbol)
            if advanced_dcf and advanced_dcf[0]:
                save_dcf_valuation(conn, ticker_id, 'advanced', advanced_dcf[0])

            # Get and save levered DCF
            levered_dcf = collector.get_levered_dcf(ticker_symbol)
            if levered_dcf and levered_dcf[0]:
                save_dcf_valuation(conn, ticker_id, 'levered', levered_dcf[0])

            # Get and save the current rating
            current_rating = collector.get_company_rating(ticker_symbol)
            if current_rating and current_rating[0]:
                save_company_rating(conn, ticker_id, current_rating[0])

            # Get and save historical ratings
            historical_ratings = collector.get_historical_rating(ticker_symbol)
            if historical_ratings:
                for rating in historical_ratings:
                    if rating:
                        save_company_rating(conn, ticker_id, rating)

            conn.commit()
            logging.info(f"Completed updating valuations for {ticker_symbol}")
        except Exception as e:
            # Roll back the failed ticker and continue with the rest of the batch
            conn.rollback()
            logging.error(f"Error updating valuations for {ticker_symbol}: {e}")
            continue
def update_all_valuations(conn: psycopg2.extensions.connection, collector: ValuationCollector) -> None:
    """Fetch all active tickers and refresh their valuations in batches."""
    cursor = conn.cursor()
    BATCH_SIZE = 50  # Process 50 tickers at a time
    try:
        # Get all active tickers
        cursor.execute("""
            SELECT id, yf_ticker
            FROM tickers
            WHERE yf_ticker IS NOT NULL
            ORDER BY id
        """)
        tickers = cursor.fetchall()
        total_tickers = len(tickers)

        # Process tickers in batches
        for i in range(0, total_tickers, BATCH_SIZE):
            batch = tickers[i:i + BATCH_SIZE]
            logging.info(f"Processing batch {i//BATCH_SIZE + 1} of {(total_tickers + BATCH_SIZE - 1)//BATCH_SIZE}")
            process_ticker_batch(conn, collector, batch)

            # Small delay between batches to avoid overwhelming the API
            if i + BATCH_SIZE < total_tickers:
                time.sleep(1)
    finally:
        cursor.close()


def main():
    load_dotenv()
    setup_logging()

    connection = None
    try:
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
        init_database(connection)

        collector = ValuationCollector(os.getenv('FMP_API_KEY'))

        start_time = time.time()
        update_all_valuations(connection, collector)
        elapsed_time = time.time() - start_time
        logging.info(f"Valuations update completed in {elapsed_time:.2f} seconds")
    except Exception as e:
        logging.error(f"Error updating valuations: {e}")
    finally:
        if connection is not None:
            connection.close()


if __name__ == "__main__":
    main()
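# Usage sketch (assumptions: the variable names below mirror the os.getenv()
# calls in main(); the .env location and the script filename are hypothetical):
#
#   # .env in the working directory
#   DB_NAME=stocks
#   DB_USER=postgres
#   DB_PASSWORD=...
#   DB_HOST=localhost
#   DB_PORT=5432
#   FMP_API_KEY=...
#
#   $ python update_valuations.py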