import os
import logging
import time
import concurrent.futures
from datetime import datetime
from functools import partial
from typing import List, Tuple, Optional

from dotenv import load_dotenv

from utils.Stocks.news.collector import NewsCollector
from utils.Stocks.news.utils import DatabaseManager

# Configuration constants
API_TIMEOUT = 30     # Seconds
RETRY_DELAY = 1      # Seconds
MAX_RETRIES = 3
BATCH_SIZE = 25      # Tickers per batch
MAX_WORKERS = 2      # Parallel workers
TICKER_DELAY = 0.5   # Seconds between tickers
BATCH_DELAY = 1      # Seconds between batches


def setup_logging():
    """Set up logging to a timestamped file and the console."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'news_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )


def ensure_database_schema(db_manager: DatabaseManager):
    """Ensure the database schema is up to date."""
    conn = db_manager.get_connection()
    cursor = conn.cursor()
    try:
        # Check if stocknews table exists
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'stocknews'
            );
        """)
        if not cursor.fetchone()[0]:
            logging.info("Creating stocknews table")
            cursor.execute("""
                CREATE TABLE stocknews (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER NOT NULL,
                    news_type VARCHAR(50) NOT NULL,
                    title TEXT NOT NULL,
                    content TEXT,
                    url TEXT,
                    published_at TIMESTAMP WITH TIME ZONE NOT NULL,
                    sentiment_score FLOAT,
                    source VARCHAR(100),
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    CONSTRAINT fk_ticker FOREIGN KEY(ticker_id)
                        REFERENCES tickers(id) ON DELETE CASCADE
                );
                CREATE INDEX idx_stocknews_ticker_date ON stocknews(ticker_id, published_at);
                CREATE INDEX idx_stocknews_title ON stocknews(title);
            """)
            conn.commit()
            logging.info("Successfully created stocknews table")

        # Check if last_checked_at column exists in tickers table
        cursor.execute("""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = 'tickers' AND column_name = 'last_checked_at';
        """)
        if not cursor.fetchone():
            logging.info("Adding last_checked_at column to tickers table")
            cursor.execute("""
                ALTER TABLE tickers
                ADD COLUMN last_checked_at TIMESTAMP WITH TIME ZONE;
            """)
            conn.commit()
            logging.info("Successfully added last_checked_at column")
    except Exception as e:
        conn.rollback()
        logging.error(f"Error ensuring schema: {e}")
        raise
    finally:
        cursor.close()
        conn.close()


def get_news_with_retry(
    collector: NewsCollector,
    method: str,
    ticker_symbol: str,
    latest_date: Optional[datetime],
    existing_titles: set,
) -> List[dict]:
    """Fetch news with retry logic."""
    for attempt in range(MAX_RETRIES):
        try:
            if method == "general":
                return collector.get_general_news(ticker_symbol, latest_date, existing_titles)
            elif method == "press":
                return collector.get_press_releases(ticker_symbol, existing_titles)
            return []
        except Exception as e:
            if attempt == MAX_RETRIES - 1:
                logging.error(f"Final attempt failed for {method} news - {ticker_symbol}: {e}")
                return []
            logging.warning(f"Attempt {attempt + 1} failed for {method} news - {ticker_symbol}: {e}")
            time.sleep(RETRY_DELAY * (attempt + 1))  # Linear backoff: wait longer on each retry
    return []


def process_ticker_batch(
    collector: NewsCollector,
    db_manager: DatabaseManager,
    batch: List[Tuple[int, str]],
    batch_num: int,
    total_batches: int
) -> Tuple[int, int]:
    """
    Process a batch of tickers and return collection statistics.

    Returns:
        (total_collected, total_saved)
    """
    batch_collected = 0
    batch_saved = 0
    logging.info(f"Starting batch {batch_num}/{total_batches} with {len(batch)} tickers")

    for ticker_id, yf_ticker in batch:
        try:
            latest_date = db_manager.get_latest_news_date(ticker_id)
            existing_titles = db_manager.get_existing_titles(ticker_id, latest_date)

            # Collect news with retries
            general_news = get_news_with_retry(collector, "general", yf_ticker, latest_date, existing_titles)
            time.sleep(TICKER_DELAY)
            press_releases = get_news_with_retry(collector, "press", yf_ticker, latest_date, existing_titles)

            news_items = []
            news_items.extend((ticker_id, 'general', news) for news in general_news)
            news_items.extend((ticker_id, 'press', news) for news in press_releases)

            collected = len(news_items)
            batch_collected += collected

            if collected > 0:
                try:
                    saved = db_manager.save_news_batch(news_items)
                    batch_saved += saved
                    logging.info(f"Ticker {yf_ticker} (ID: {ticker_id}): Collected {collected}, Saved {saved} "
                                 f"(Duplicates: {collected - saved})")
                except Exception as e:
                    logging.error(f"Error saving news for {yf_ticker} (ID: {ticker_id}): {e}")
            else:
                logging.debug(f"No new news for {yf_ticker} (ID: {ticker_id})")
        except Exception as e:
            logging.error(f"Error processing {yf_ticker} (ID: {ticker_id}): {e}")
            continue

        time.sleep(TICKER_DELAY)

    logging.info(f"Batch {batch_num}/{total_batches} completed: "
                 f"Collected {batch_collected}, Saved {batch_saved} "
                 f"(Duplicates: {batch_collected - batch_saved})")
    return batch_collected, batch_saved


def chunk_tickers(tickers: List[Tuple[int, str]], chunk_size: int):
    """Split tickers into chunks for parallel processing."""
    for i in range(0, len(tickers), chunk_size):
        yield tickers[i:i + chunk_size]


def main():
    load_dotenv()
    setup_logging()
    start_time = time.time()

    try:
        logging.info("Starting news update process")
        collector = NewsCollector(os.getenv('FMP_API_KEY'))
        db_manager = DatabaseManager()

        # Ensure database schema is up to date
        logging.info("Checking database schema...")
        ensure_database_schema(db_manager)
        logging.info("Database schema check completed")

        # Get tickers that need updating
        tickers = db_manager.get_tickers_for_update(15)  # 15-minute interval
        if not tickers:
            logging.info("No tickers need updating at this time")
            return

        logging.info(f"Found {len(tickers)} tickers to update")
        batches = list(chunk_tickers(tickers, BATCH_SIZE))
        total_batches = len(batches)
        total_collected = 0
        total_saved = 0
        failed_tickers = []

        # Process batches in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            process_batch_partial = partial(
                process_ticker_batch,
                collector,
                db_manager
            )

            # Submit all batches
            future_to_batch = {
                executor.submit(
                    process_batch_partial,
                    batch,
                    i + 1,
                    total_batches
                ): (i, batch)
                for i, batch in enumerate(batches)
            }

            # Process completed batches
            for future in concurrent.futures.as_completed(future_to_batch):
                batch_idx, batch = future_to_batch[future]
                try:
                    batch_collected, batch_saved = future.result()
                    total_collected += batch_collected
                    total_saved += batch_saved
                except Exception as e:
                    logging.error(f"Batch {batch_idx + 1} failed: {e}")
                    failed_tickers.extend(ticker[1] for ticker in batch)
                time.sleep(BATCH_DELAY)  # Delay between batches

        # Calculate and log final statistics
        execution_time = time.time() - start_time
        minutes_elapsed = execution_time / 60

        logging.info("\nNews update completed:")
        logging.info(f"Total articles collected: {total_collected}")
        logging.info(f"Total articles saved: {total_saved}")
        logging.info(f"Duplicates skipped: {total_collected - total_saved}")
        logging.info(f"Failed tickers: {len(failed_tickers)}")
        if failed_tickers:
            logging.info(f"Failed ticker symbols: {', '.join(failed_tickers)}")
        logging.info(f"Total execution time: {execution_time:.2f} seconds ({minutes_elapsed:.2f} minutes)")
        if total_collected > 0:
            logging.info(f"Processing rate: {total_collected/minutes_elapsed:.2f} articles/minute")
            logging.info(f"Ticker processing rate: {len(tickers)/minutes_elapsed:.2f} tickers/minute")
    except Exception as e:
        logging.error(f"Critical error in main process: {e}", exc_info=True)
        raise
    finally:
        logging.info("News update process finished")


if __name__ == "__main__":
    main()