import os
import logging
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
from typing import List, Tuple

from utils.Stocks.financials.financials_collector import FinancialsCollector
from utils.Stocks.financials.financials_utils import DatabaseManager, save_financial_statement


def create_tables(conn: psycopg2.extensions.connection) -> None:
    """Create the financial_statements table and its index if they do not exist."""
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS financial_statements (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER NOT NULL,
                    date DATE NOT NULL,
                    period VARCHAR(10) NOT NULL,
                    statement_type VARCHAR(20) NOT NULL,
                    is_reported BOOLEAN NOT NULL,
                    data JSONB NOT NULL,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE (ticker_id, date, period, statement_type, is_reported)
                );
                CREATE INDEX IF NOT EXISTS idx_financial_statements_ticker_date
                ON financial_statements(ticker_id, date);
            """)
        conn.commit()
        logging.info("Database tables created successfully")
    except Exception as e:
        conn.rollback()
        logging.error(f"Error creating tables: {e}")
        raise


def setup_logging():
    """Configure logging to a timestamped file under logs/ and to the console."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'financials_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )


class FinancialsUpdater:
    """Checks each ticker for newer financial statements and refreshes the database."""

    def __init__(self, db_manager: DatabaseManager, collector: FinancialsCollector):
        self.db_manager = db_manager
        self.collector = collector

    def update_ticker(self, ticker_data: Tuple[int, str]) -> None:
        """Fetch statements for a single (ticker_id, ticker_symbol) pair if the API has newer data."""
        ticker_id, ticker_symbol = ticker_data
        try:
            logging.info(f"Starting update check for {ticker_symbol}")

            # Check latest date in database
            latest_db_date = self.db_manager.get_latest_statement_date(ticker_id, 'income')
            logging.info(f"{ticker_symbol}: Latest database date: {latest_db_date}")

            # Check latest date from API
            latest_api_date = self.collector.get_latest_financial_statement_date(ticker_symbol)
            logging.info(f"{ticker_symbol}: Latest API date: {latest_api_date}")

            # Determine if update is needed
            if not latest_db_date:
                logging.info(f"{ticker_symbol}: No data in database, fetching all statements")
                self._update_financial_statements(ticker_id, ticker_symbol)
            elif latest_api_date and latest_api_date > latest_db_date:
                logging.info(f"{ticker_symbol}: Found newer data (DB: {latest_db_date}, API: {latest_api_date}), updating")
                self._update_financial_statements(ticker_id, ticker_symbol)
            else:
                logging.info(f"{ticker_symbol}: Data is up to date, skipping")

            logging.info(f"Completed update check for {ticker_symbol}")
        except Exception as e:
            logging.error(f"Error updating {ticker_symbol}: {e}")

    def _update_financial_statements(self, ticker_id: int, ticker_symbol: str) -> None:
        """Download quarterly income, balance sheet, and cash flow statements and save them."""
        try:
            logging.info(f"Fetching financial statements for {ticker_symbol}")

            # Get quarterly statements
            logging.info(f"{ticker_symbol}: Fetching income statements")
            income = self.collector.get_income_statement(ticker_symbol, 'quarter')
            logging.info(f"{ticker_symbol}: Found {len(income)} income statements")

            logging.info(f"{ticker_symbol}: Fetching balance sheets")
            balance = self.collector.get_balance_sheet(ticker_symbol, 'quarter')
            logging.info(f"{ticker_symbol}: Found {len(balance)} balance sheets")

            logging.info(f"{ticker_symbol}: Fetching cash flow statements")
            cash_flow = self.collector.get_cash_flow(ticker_symbol, 'quarter')
            logging.info(f"{ticker_symbol}: Found {len(cash_flow)} cash flow statements")

            # Save statements to database
            statements_saved = 0
            for stmt in income:
                save_financial_statement(self.db_manager.conn, ticker_id, 'income', 'quarter', False, stmt)
                statements_saved += 1
            for stmt in balance:
                save_financial_statement(self.db_manager.conn, ticker_id, 'balance', 'quarter', False, stmt)
                statements_saved += 1
            for stmt in cash_flow:
                save_financial_statement(self.db_manager.conn, ticker_id, 'cash_flow', 'quarter', False, stmt)
                statements_saved += 1

            logging.info(f"Successfully saved {statements_saved} statements for {ticker_symbol}")
        except Exception as e:
            logging.error(f"Error updating financial statements for {ticker_symbol}: {e}")

    def update_all_tickers(self, tickers: List[Tuple[int, str]]) -> None:
        """Run the update check for every ticker in the list."""
        total_tickers = len(tickers)
        for index, ticker in enumerate(tickers, 1):
            logging.info(f"Processing ticker {index}/{total_tickers}")
            self.update_ticker(ticker)


def main():
    """Entry point: connect to the database, ensure tables exist, and update all tickers."""
    load_dotenv()
    setup_logging()
    logging.info("Starting financial statements update process")

    try:
        # Establish database connection
        conn = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
        logging.info("Database connection established")

        # Create tables if they don't exist
        create_tables(conn)

        # Initialize managers and collectors
        db_manager = DatabaseManager(conn)
        collector = FinancialsCollector(os.getenv('FMP_API_KEY'))
        updater = FinancialsUpdater(db_manager, collector)

        # Get list of tickers to update
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, yf_ticker FROM tickers WHERE yf_ticker IS NOT NULL")
            tickers = cursor.fetchall()
        logging.info(f"Found {len(tickers)} tickers to process")

        # Process all tickers
        updater.update_all_tickers(tickers)

        logging.info("Financial updates completed successfully")
    except Exception as e:
        logging.error(f"Critical error in financial updates: {e}")
    finally:
        if 'conn' in locals():
            conn.close()
            logging.info("Database connection closed")


if __name__ == "__main__":
    main()