"""Refresh annual and quarterly financial statements for every ticker in the database.

Configuration comes from environment variables, optionally loaded from a ``.env``
file by ``main()``.
"""
import logging
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List

import psycopg2
from dotenv import load_dotenv

from utils.Stocks.financials import FinancialsCollector
from utils.Stocks.financials.statements_utils import save_financial_statement, StatementStatus

_log = logging.getLogger(__name__)

# Periods fetched for every ticker, in this order.
STATEMENT_PERIODS: List[str] = ['annual', 'quarter']


def _resolve_max_threads(default: int = 10) -> int:
    """Return the worker-thread count from ``MAX_THREADS``, or *default* if unset.

    Non-integer values fall back to *default* with a warning. Values below 1 are
    clamped to 1, because ThreadPoolExecutor rejects ``max_workers <= 0``.
    """
    raw = os.getenv('MAX_THREADS')
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        # Use the module logger rather than logging.warning(): the root-level
        # helpers would implicitly call basicConfig() at import time, which would
        # turn setup_logging()'s later basicConfig() into a no-op.
        _log.warning("Invalid MAX_THREADS=%r; using %d", raw, default)
        return default
    return max(1, value)


# Default maximum number of threads for parallel processing. Kept for backward
# compatibility. update_all_statements() re-reads MAX_THREADS at call time,
# because main() loads .env only after this module has been imported.
MAX_THREADS = _resolve_max_threads()


def setup_logging():
    """Configure root logging to write to a timestamped file and to stderr.

    The log directory is taken from ``LOG_DIR`` (default ``logs``) and is created
    if missing.
    """
    log_dir = os.getenv('LOG_DIR', 'logs')
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'statements_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler()],
    )


def update_ticker_statements(
    conn: psycopg2.extensions.connection,
    status_tracker: StatementStatus,
    collector: FinancialsCollector,
    ticker_id: int,
    ticker_symbol: str
) -> bool:
    """Fetch and save income, balance-sheet and cash-flow statements for one ticker.

    The ticker is marked IN_PROGRESS first. Each period in STATEMENT_PERIODS is then
    fetched and saved record by record. On success the ticker is marked COMPLETED;
    on any exception it is marked ERROR with the error message.

    Returns:
        True if everything was fetched and saved, False otherwise. Errors are logged
        and recorded instead of raised, so one ticker cannot stop the batch.
    """
    try:
        logging.info(f"Updating statements for {ticker_symbol}")
        status_tracker.update_status(ticker_id, "IN_PROGRESS")

        for period in STATEMENT_PERIODS:
            statements = {
                'income': collector.get_income_statement(ticker_symbol, period),
                'balance': collector.get_balance_sheet(ticker_symbol, period),
                'cash_flow': collector.get_cash_flow(ticker_symbol, period),
            }
            for stmt_type, data in statements.items():
                # Falsy results (e.g. None or an empty list) mean nothing to save.
                for stmt in data or ():
                    save_financial_statement(conn, ticker_id, stmt_type, period, stmt)

        status_tracker.update_status(ticker_id, "COMPLETED")
        logging.info(f"Completed updating statements for {ticker_symbol}")
        return True
    except Exception as e:
        error_msg = f"Error updating statements for {ticker_symbol}: {e}"
        logging.exception(error_msg)
        # Recording the ERROR status can itself fail (e.g. if the shared DB
        # transaction is aborted). Do not let that mask the original error.
        try:
            status_tracker.update_status(ticker_id, "ERROR", error_msg)
        except Exception:
            logging.exception(f"Could not record ERROR status for {ticker_symbol}")
        return False


def update_all_statements(conn: psycopg2.extensions.connection, collector: FinancialsCollector) -> bool:
    """Update statements for every ticker with a non-NULL ``yf_ticker``.

    Tickers are read from the ``tickers`` table in id order and processed on a
    thread pool. The pool size comes from ``MAX_THREADS`` at call time.

    NOTE(review): all worker threads share ``conn``. psycopg2 connections may be
    shared between threads, but they then share one transaction: statements are
    serialized, and an SQL error for one ticker can abort the transaction for the
    others. A connection per worker (e.g. psycopg2.pool) would isolate them. TODO
    confirm how save_financial_statement / StatementStatus commit.

    Returns:
        True if the ticker query succeeded and every ticker updated successfully,
        False otherwise. Errors are logged, not raised.
    """
    status_tracker = StatementStatus(conn)
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT id, yf_ticker
                FROM tickers
                WHERE yf_ticker IS NOT NULL
                ORDER BY id ASC
            """)
            tickers = cursor.fetchall()
        logging.info(f"Found {len(tickers)} tickers to update.")

        failures = 0
        with ThreadPoolExecutor(max_workers=_resolve_max_threads(MAX_THREADS)) as executor:
            future_to_symbol = {
                executor.submit(
                    update_ticker_statements, conn, status_tracker, collector, ticker_id, ticker_symbol
                ): ticker_symbol
                for ticker_id, ticker_symbol in tickers
            }
            # Check every future as it finishes, so one failure neither stops the
            # reporting of the others nor hides their outcome.
            for future in as_completed(future_to_symbol):
                try:
                    succeeded = future.result()
                except Exception:
                    logging.exception(f"Unexpected failure while updating {future_to_symbol[future]}")
                    succeeded = False
                if not succeeded:
                    failures += 1

        if failures:
            logging.warning(f"{failures} of {len(tickers)} tickers failed to update; see errors above.")
            return False
        logging.info("All tickers updated successfully.")
        return True
    except Exception as e:
        logging.exception(f"Error during bulk update: {e}")
        return False


def main() -> int:
    """Load configuration, connect to PostgreSQL and refresh all financial statements.

    Returns:
        Exit status: 0 on success, 1 if the run failed or any ticker failed.
    """
    load_dotenv()
    setup_logging()

    connection = None
    try:
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )

        collector = FinancialsCollector(os.getenv('FMP_API_KEY'))

        # NOTE(review): this script never calls connection.commit(). psycopg2
        # discards uncommitted work on close(), so this relies on the helpers
        # committing. TODO confirm.
        if update_all_statements(connection, collector):
            logging.info("Financial statements update completed successfully.")
            return 0
        logging.error("Financial statements update finished with errors.")
        return 1
    except Exception as e:
        logging.exception(f"Error in main function: {e}")
        return 1
    finally:
        if connection is not None:
            connection.close()
            logging.info("Database connection closed.")


if __name__ == "__main__":
    sys.exit(main())