RivaCube/update_stocksnews.py

import os
import logging
import time
import concurrent.futures
from datetime import datetime
from typing import List, Tuple, Optional
from dotenv import load_dotenv
from utils.Stocks.news.collector import NewsCollector
from utils.Stocks.news.utils import DatabaseManager
from functools import partial
# Configuration constants
API_TIMEOUT = 30 # Seconds
RETRY_DELAY = 1 # Seconds
MAX_RETRIES = 3
BATCH_SIZE = 25 # Tickers per batch
MAX_WORKERS = 2 # Parallel workers
TICKER_DELAY = 0.5 # Seconds between tickers
BATCH_DELAY = 1 # Seconds between batches
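
# Rough pacing implied by these defaults (ignoring API latency and retries): in the
# normal path each ticker incurs two TICKER_DELAY sleeps, so one batch spends at
# least BATCH_SIZE * 2 * TICKER_DELAY = 25 * 2 * 0.5 = 25 seconds in deliberate
# delays, with up to MAX_WORKERS batches running concurrently.
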
def setup_logging():
"""Set up logging configuration with rotating files."""
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f'news_update_{datetime.now():%Y%m%d_%H%M%S}.log')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
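
# Note: setup_logging() writes one timestamped file per run rather than rotating a
# single file. If rotation is preferred, a minimal sketch using the standard
# library (not wired in here; the size and backup count are assumptions):
#
#     from logging.handlers import RotatingFileHandler
#     rotating = RotatingFileHandler(log_file, maxBytes=5_000_000, backupCount=3)
#     rotating.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
#     logging.getLogger().addHandler(rotating)
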
def ensure_database_schema(db_manager: DatabaseManager):
"""Ensure the database schema is up to date."""
conn = db_manager.get_connection()
cursor = conn.cursor()
try:
# Check if stocknews table exists
cursor.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'stocknews'
);
""")
if not cursor.fetchone()[0]:
logging.info("Creating stocknews table")
cursor.execute("""
CREATE TABLE stocknews (
id SERIAL PRIMARY KEY,
ticker_id INTEGER NOT NULL,
news_type VARCHAR(50) NOT NULL,
title TEXT NOT NULL,
content TEXT,
url TEXT,
published_at TIMESTAMP WITH TIME ZONE NOT NULL,
sentiment_score FLOAT,
source VARCHAR(100),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT fk_ticker
FOREIGN KEY(ticker_id)
REFERENCES tickers(id)
ON DELETE CASCADE
);
CREATE INDEX idx_stocknews_ticker_date ON stocknews(ticker_id, published_at);
CREATE INDEX idx_stocknews_title ON stocknews(title);
""")
conn.commit()
logging.info("Successfully created stocknews table")
# Check if last_checked_at column exists in tickers table
cursor.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'tickers'
AND column_name = 'last_checked_at';
""")
if not cursor.fetchone():
logging.info("Adding last_checked_at column to tickers table")
cursor.execute("""
ALTER TABLE tickers
ADD COLUMN last_checked_at TIMESTAMP WITH TIME ZONE;
""")
conn.commit()
logging.info("Successfully added last_checked_at column")
except Exception as e:
conn.rollback()
logging.error(f"Error ensuring schema: {e}")
raise
finally:
cursor.close()
conn.close()
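
# The composite (ticker_id, published_at) index created above is what keeps the
# per-ticker "latest news" lookups cheap. An illustrative query it serves (column
# names come from the DDL above):
#
#     SELECT title, published_at
#     FROM stocknews
#     WHERE ticker_id = %s
#     ORDER BY published_at DESC
#     LIMIT 10;
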
def get_news_with_retry(
collector: NewsCollector,
method: str,
ticker_symbol: str,
latest_date: Optional[datetime],
existing_titles: set,
) -> List[dict]:
"""
Fetch news with retry logic.
"""
for attempt in range(MAX_RETRIES):
try:
if method == "general":
return collector.get_general_news(ticker_symbol, latest_date, existing_titles)
elif method == "press":
return collector.get_press_releases(ticker_symbol, existing_titles)
return []
except Exception as e:
if attempt == MAX_RETRIES - 1:
logging.error(f"Final attempt failed for {method} news - {ticker_symbol}: {e}")
return []
logging.warning(f"Attempt {attempt + 1} failed for {method} news - {ticker_symbol}: {e}")
            time.sleep(RETRY_DELAY * (attempt + 1))  # Linearly increasing backoff
return []
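
# Illustrative call (the ticker symbol and empty title set are placeholders):
#
#     items = get_news_with_retry(collector, "general", "AAPL", None, set())
#
# With RETRY_DELAY = 1 and MAX_RETRIES = 3, a failing call waits 1 s after the
# first attempt and 2 s after the second before the final attempt, then falls
# back to an empty list.
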
def process_ticker_batch(
collector: NewsCollector,
db_manager: DatabaseManager,
batch: List[Tuple[int, str]],
batch_num: int,
total_batches: int
) -> Tuple[int, int]:
"""
Process a batch of tickers and return collection statistics.
Returns: (total_collected, total_saved)
"""
batch_collected = 0
batch_saved = 0
logging.info(f"Starting batch {batch_num}/{total_batches} with {len(batch)} tickers")
for ticker_id, yf_ticker in batch:
try:
latest_date = db_manager.get_latest_news_date(ticker_id)
existing_titles = db_manager.get_existing_titles(ticker_id, latest_date)
# Collect news with retries
general_news = get_news_with_retry(collector, "general", yf_ticker, latest_date, existing_titles)
time.sleep(TICKER_DELAY)
press_releases = get_news_with_retry(collector, "press", yf_ticker, latest_date, existing_titles)
news_items = []
news_items.extend((ticker_id, 'general', news) for news in general_news)
news_items.extend((ticker_id, 'press', news) for news in press_releases)
collected = len(news_items)
batch_collected += collected
if collected > 0:
try:
saved = db_manager.save_news_batch(news_items)
batch_saved += saved
logging.info(f"Ticker {yf_ticker} (ID: {ticker_id}): Collected {collected}, Saved {saved} "
f"(Duplicates: {collected - saved})")
except Exception as e:
logging.error(f"Error saving news for {yf_ticker} (ID: {ticker_id}): {e}")
else:
logging.debug(f"No new news for {yf_ticker} (ID: {ticker_id})")
except Exception as e:
logging.error(f"Error processing {yf_ticker} (ID: {ticker_id}): {e}")
continue
time.sleep(TICKER_DELAY)
logging.info(f"Batch {batch_num}/{total_batches} completed: "
f"Collected {batch_collected}, Saved {batch_saved} "
f"(Duplicates: {batch_collected - batch_saved})")
return batch_collected, batch_saved
def chunk_tickers(tickers: List[Tuple[int, str]], chunk_size: int):
"""Split tickers into chunks for parallel processing."""
for i in range(0, len(tickers), chunk_size):
yield tickers[i:i + chunk_size]
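
# Example (ticker symbols are placeholders): chunk_tickers([(1, "AAPL"), (2, "MSFT"), (3, "GOOG")], 2)
# yields [(1, "AAPL"), (2, "MSFT")] and then [(3, "GOOG")]; the final chunk may be
# shorter than chunk_size.
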
def main():
load_dotenv()
setup_logging()
start_time = time.time()
try:
logging.info("Starting news update process")
collector = NewsCollector(os.getenv('FMP_API_KEY'))
db_manager = DatabaseManager()
# Ensure database schema is up to date
logging.info("Checking database schema...")
ensure_database_schema(db_manager)
logging.info("Database schema check completed")
# Get tickers that need updating
        tickers = db_manager.get_tickers_for_update(15)  # 15-minute update interval
if not tickers:
logging.info("No tickers need updating at this time")
return
logging.info(f"Found {len(tickers)} tickers to update")
batches = list(chunk_tickers(tickers, BATCH_SIZE))
total_batches = len(batches)
total_collected = 0
total_saved = 0
failed_tickers = []
# Process batches in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
process_batch_partial = partial(
process_ticker_batch,
collector,
db_manager
)
# Submit all batches
future_to_batch = {
executor.submit(
process_batch_partial,
batch,
i + 1,
total_batches
): (i, batch) for i, batch in enumerate(batches)
}
# Process completed batches
for future in concurrent.futures.as_completed(future_to_batch):
batch_idx, batch = future_to_batch[future]
try:
batch_collected, batch_saved = future.result()
total_collected += batch_collected
total_saved += batch_saved
except Exception as e:
logging.error(f"Batch {batch_idx + 1} failed: {e}")
failed_tickers.extend(ticker[1] for ticker in batch)
                time.sleep(BATCH_DELAY)  # Brief pause between handling completed batches
# Calculate and log final statistics
execution_time = time.time() - start_time
minutes_elapsed = execution_time / 60
logging.info("\nNews update completed:")
logging.info(f"Total articles collected: {total_collected}")
logging.info(f"Total articles saved: {total_saved}")
logging.info(f"Duplicates skipped: {total_collected - total_saved}")
logging.info(f"Failed tickers: {len(failed_tickers)}")
if failed_tickers:
logging.info(f"Failed ticker symbols: {', '.join(failed_tickers)}")
logging.info(f"Total execution time: {execution_time:.2f} seconds ({minutes_elapsed:.2f} minutes)")
if total_collected > 0:
logging.info(f"Processing rate: {total_collected/minutes_elapsed:.2f} articles/minute")
logging.info(f"Ticker processing rate: {len(tickers)/minutes_elapsed:.2f} tickers/minute")
except Exception as e:
logging.error(f"Critical error in main process: {e}", exc_info=True)
raise
finally:
logging.info("News update process finished")
if __name__ == "__main__":
main()
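
# The update loop is intended for scheduled execution; an illustrative cron entry
# matching the 15-minute update window used above (path and interpreter are
# assumptions): */15 * * * * /usr/bin/python3 /path/to/RivaCube/update_stocksnews.py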