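"""
Stock news update script.

Fetches general news and press releases for tickers that are due for an
update, ensures the required database schema exists, and saves new articles
via DatabaseManager. Tickers are processed in parallel batches with
throttling and retry logic around the NewsCollector API calls.
"""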
import concurrent.futures
import logging
import os
import time
from datetime import datetime
from functools import partial
from typing import List, Tuple, Optional

from dotenv import load_dotenv

from utils.Stocks.news.collector import NewsCollector
from utils.Stocks.news.utils import DatabaseManager

# Configuration constants
API_TIMEOUT = 30  # Seconds
RETRY_DELAY = 1  # Seconds
MAX_RETRIES = 3
BATCH_SIZE = 25  # Tickers per batch
MAX_WORKERS = 2  # Parallel workers
TICKER_DELAY = 0.5  # Seconds between tickers
BATCH_DELAY = 1  # Seconds between batches
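# Pacing note (approximate): each ticker incurs two API calls separated by
# TICKER_DELAY, plus TICKER_DELAY after the ticker, i.e. about one second of
# sleep per ticker per worker. Ignoring API latency, the two workers together
# can therefore cover on the order of 100+ tickers per minute.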
def setup_logging():
    """Set up logging to a timestamped file in logs/ and to the console."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)

    log_file = os.path.join(log_dir, f'news_update_{datetime.now():%Y%m%d_%H%M%S}.log')

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
def ensure_database_schema(db_manager: DatabaseManager):
    """Ensure the database schema is up to date."""
    conn = db_manager.get_connection()
    cursor = conn.cursor()

    try:
        # Check if stocknews table exists
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'stocknews'
            );
        """)

        if not cursor.fetchone()[0]:
            logging.info("Creating stocknews table")
            cursor.execute("""
                CREATE TABLE stocknews (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER NOT NULL,
                    news_type VARCHAR(50) NOT NULL,
                    title TEXT NOT NULL,
                    content TEXT,
                    url TEXT,
                    published_at TIMESTAMP WITH TIME ZONE NOT NULL,
                    sentiment_score FLOAT,
                    source VARCHAR(100),
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    CONSTRAINT fk_ticker
                        FOREIGN KEY(ticker_id)
                        REFERENCES tickers(id)
                        ON DELETE CASCADE
                );
                CREATE INDEX idx_stocknews_ticker_date ON stocknews(ticker_id, published_at);
                CREATE INDEX idx_stocknews_title ON stocknews(title);
            """)
            conn.commit()
            logging.info("Successfully created stocknews table")

        # Check if last_checked_at column exists in tickers table
        cursor.execute("""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = 'tickers'
            AND column_name = 'last_checked_at';
        """)

        if not cursor.fetchone():
            logging.info("Adding last_checked_at column to tickers table")
            cursor.execute("""
                ALTER TABLE tickers
                ADD COLUMN last_checked_at TIMESTAMP WITH TIME ZONE;
            """)
            conn.commit()
            logging.info("Successfully added last_checked_at column")

    except Exception as e:
        conn.rollback()
        logging.error(f"Error ensuring schema: {e}")
        raise
    finally:
        cursor.close()
        conn.close()
def get_news_with_retry(
    collector: NewsCollector,
    method: str,
    ticker_symbol: str,
    latest_date: Optional[datetime],
    existing_titles: set,
) -> List[dict]:
    """Fetch news with retry logic, returning an empty list after MAX_RETRIES failures."""
    for attempt in range(MAX_RETRIES):
        try:
            if method == "general":
                return collector.get_general_news(ticker_symbol, latest_date, existing_titles)
            elif method == "press":
                return collector.get_press_releases(ticker_symbol, existing_titles)
            return []
        except Exception as e:
            if attempt == MAX_RETRIES - 1:
                logging.error(f"Final attempt failed for {method} news - {ticker_symbol}: {e}")
                return []
            logging.warning(f"Attempt {attempt + 1} failed for {method} news - {ticker_symbol}: {e}")
            time.sleep(RETRY_DELAY * (attempt + 1))  # Linearly increasing backoff
    return []
def process_ticker_batch(
    collector: NewsCollector,
    db_manager: DatabaseManager,
    batch: List[Tuple[int, str]],
    batch_num: int,
    total_batches: int
) -> Tuple[int, int]:
    """
    Process a batch of tickers and return collection statistics.

    Returns: (total_collected, total_saved)
    """
    batch_collected = 0
    batch_saved = 0

    logging.info(f"Starting batch {batch_num}/{total_batches} with {len(batch)} tickers")

    for ticker_id, yf_ticker in batch:
        try:
            latest_date = db_manager.get_latest_news_date(ticker_id)
            existing_titles = db_manager.get_existing_titles(ticker_id, latest_date)

            # Collect news with retries
            general_news = get_news_with_retry(collector, "general", yf_ticker, latest_date, existing_titles)
            time.sleep(TICKER_DELAY)
            press_releases = get_news_with_retry(collector, "press", yf_ticker, latest_date, existing_titles)

            news_items = []
            news_items.extend((ticker_id, 'general', news) for news in general_news)
            news_items.extend((ticker_id, 'press', news) for news in press_releases)

            collected = len(news_items)
            batch_collected += collected

            if collected > 0:
                try:
                    saved = db_manager.save_news_batch(news_items)
                    batch_saved += saved
                    logging.info(f"Ticker {yf_ticker} (ID: {ticker_id}): Collected {collected}, Saved {saved} "
                                 f"(Duplicates: {collected - saved})")
                except Exception as e:
                    logging.error(f"Error saving news for {yf_ticker} (ID: {ticker_id}): {e}")
            else:
                logging.debug(f"No new news for {yf_ticker} (ID: {ticker_id})")

        except Exception as e:
            logging.error(f"Error processing {yf_ticker} (ID: {ticker_id}): {e}")
            continue

        time.sleep(TICKER_DELAY)

    logging.info(f"Batch {batch_num}/{total_batches} completed: "
                 f"Collected {batch_collected}, Saved {batch_saved} "
                 f"(Duplicates: {batch_collected - batch_saved})")
    return batch_collected, batch_saved
def chunk_tickers(tickers: List[Tuple[int, str]], chunk_size: int):
    """Split tickers into chunks for parallel processing."""
    for i in range(0, len(tickers), chunk_size):
        yield tickers[i:i + chunk_size]
def main():
    """Collect and store news for every ticker that is due for an update."""
    load_dotenv()
    setup_logging()
    start_time = time.time()

    try:
        logging.info("Starting news update process")
        collector = NewsCollector(os.getenv('FMP_API_KEY'))
        db_manager = DatabaseManager()

        # Ensure database schema is up to date
        logging.info("Checking database schema...")
        ensure_database_schema(db_manager)
        logging.info("Database schema check completed")

        # Get tickers that need updating
        tickers = db_manager.get_tickers_for_update(15)  # 15-minute update interval

        if not tickers:
            logging.info("No tickers need updating at this time")
            return

        logging.info(f"Found {len(tickers)} tickers to update")

        batches = list(chunk_tickers(tickers, BATCH_SIZE))
        total_batches = len(batches)

        total_collected = 0
        total_saved = 0
        failed_tickers = []

        # Process batches in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            process_batch_partial = partial(
                process_ticker_batch,
                collector,
                db_manager
            )

            # Submit all batches
            future_to_batch = {
                executor.submit(
                    process_batch_partial,
                    batch,
                    i + 1,
                    total_batches
                ): (i, batch) for i, batch in enumerate(batches)
            }

            # Process completed batches
            for future in concurrent.futures.as_completed(future_to_batch):
                batch_idx, batch = future_to_batch[future]
                try:
                    batch_collected, batch_saved = future.result()
                    total_collected += batch_collected
                    total_saved += batch_saved
                except Exception as e:
                    logging.error(f"Batch {batch_idx + 1} failed: {e}")
                    failed_tickers.extend(ticker[1] for ticker in batch)

                time.sleep(BATCH_DELAY)  # Pacing delay after each completed batch

        # Calculate and log final statistics
        execution_time = time.time() - start_time
        minutes_elapsed = execution_time / 60

        logging.info("\nNews update completed:")
        logging.info(f"Total articles collected: {total_collected}")
        logging.info(f"Total articles saved: {total_saved}")
        logging.info(f"Duplicates skipped: {total_collected - total_saved}")
        logging.info(f"Failed tickers: {len(failed_tickers)}")
        if failed_tickers:
            logging.info(f"Failed ticker symbols: {', '.join(failed_tickers)}")
        logging.info(f"Total execution time: {execution_time:.2f} seconds ({minutes_elapsed:.2f} minutes)")

        if total_collected > 0:
            logging.info(f"Processing rate: {total_collected/minutes_elapsed:.2f} articles/minute")
            logging.info(f"Ticker processing rate: {len(tickers)/minutes_elapsed:.2f} tickers/minute")

    except Exception as e:
        logging.error(f"Critical error in main process: {e}", exc_info=True)
        raise
    finally:
        logging.info("News update process finished")


if __name__ == "__main__":
    main()