import os
import logging
from datetime import datetime
from time import sleep
from typing import Dict, List, Optional

import psycopg2
from psycopg2.extras import execute_values
import requests
from dotenv import load_dotenv

from utils.Stocks.consensus.utils import save_ratings_batch, save_rating_consensus


class ConsensusCollector:
    """Collector for consensus and ratings data."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://financialmodelingprep.com/api/v4"
        self.session = requests.Session()

    def _make_request(self, url: str, params: Dict, retried: bool = False) -> Optional[Dict]:
        """Make an API request with rate limiting and error handling."""
        response = None
        try:
            response = self.session.get(url, params=params)
            response.raise_for_status()
            sleep(0.2)  # Basic rate limiting between successful calls
            return response.json()
        except requests.exceptions.RequestException as e:
            # On 429 (Too Many Requests), back off and retry once.
            if response is not None and response.status_code == 429 and not retried:
                sleep(1)
                return self._make_request(url, params, retried=True)
            logging.error(f"API request failed: {str(e)}")
            if response is not None:
                logging.error(f"Response content: {response.text}")
            return None
        except Exception as e:
            logging.error(f"Unexpected error in API request: {str(e)}")
            return None

    def get_upgrades_downgrades(self, ticker: str) -> List[Dict]:
        """Get upgrades/downgrades data with duplicate handling."""
        try:
            params = {'apikey': self.api_key, 'symbol': ticker}
            data = self._make_request(f"{self.base_url}/upgrades-downgrades", params)

            if not data:
                logging.info(f"No ratings data available for {ticker}")
                return []

            if not isinstance(data, list):
                logging.warning(f"Unexpected ratings data format for {ticker}: {type(data)}")
                return []

            # Remove duplicates keyed on (published date, grading company)
            seen = set()
            unique_data = []
            for item in data:
                if not isinstance(item, dict):
                    continue
                key = (item.get('publishedDate'), item.get('gradingCompany'))
                if key not in seen and None not in key:
                    seen.add(key)
                    unique_data.append(item)

            return unique_data

        except Exception as e:
            logging.error(f"Error getting upgrades/downgrades for {ticker}: {str(e)}")
            return []

    def get_upgrades_downgrades_consensus(self, ticker: str) -> Optional[Dict]:
        """Get upgrades/downgrades consensus data with improved validation."""
        try:
            params = {'apikey': self.api_key, 'symbol': ticker}
            data = self._make_request(
                f"{self.base_url}/upgrades-downgrades-consensus", params
            )

            # Handle empty response cases (None, {}, and [] all fail this check)
            if not data:
                logging.info(f"No consensus data available for {ticker}")
                return None

            if isinstance(data, list):
                return data[0]  # Non-empty list: return the first item

            if isinstance(data, dict):
                return data

            logging.warning(
                f"Unexpected consensus data format for {ticker}:"
                f" {type(data)}, content: {data}"
            )
            return None

        except Exception as e:
            logging.error(f"Error getting consensus for {ticker}: {str(e)}")
            return None


def setup_logging():
    """Configure logging settings."""
    log_filename = f'logs/consensus_update_{datetime.now():%Y%m%d_%H%M%S}.log'
    os.makedirs('logs', exist_ok=True)

    # Clear any existing handlers
    logging.getLogger().handlers = []

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_filename),
            logging.StreamHandler()
        ]
    )


def ensure_tables(conn: psycopg2.extensions.connection) -> None:
    """Ensure all required database tables exist."""
    try:
        with conn.cursor() as cursor:
            # Create ratings table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS ratings (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER REFERENCES tickers(id),
                    date DATE NOT NULL,
                    analyst VARCHAR(255) NOT NULL,
                    prior_rating VARCHAR(255),
                    current_rating VARCHAR(255) NOT NULL,
                    action VARCHAR(50),
                    price_target NUMERIC,
                    url TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE (ticker_id, date, analyst)
                )
            """)

            # Create rating_consensus table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS rating_consensus (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER REFERENCES tickers(id),
                    date DATE NOT NULL,
                    consensus_rating VARCHAR(50),
                    total_ratings INTEGER NOT NULL,
                    buy_ratings INTEGER NOT NULL,
                    hold_ratings INTEGER NOT NULL,
                    sell_ratings INTEGER NOT NULL,
                    strong_buy_ratings INTEGER NOT NULL,
                    strong_sell_ratings INTEGER NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE (ticker_id, date)
                )
            """)

        conn.commit()
        logging.info("Ensured all required tables exist")

    except Exception as e:
        conn.rollback()
        logging.error(f"Error ensuring tables: {str(e)}")
        raise
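
# ---------------------------------------------------------------------------
# Illustrative sketch only: save_ratings_batch is imported from
# utils.Stocks.consensus.utils above, and its real implementation may differ.
# This shows one plausible shape, batching inserts with
# psycopg2.extras.execute_values and relying on the UNIQUE
# (ticker_id, date, analyst) constraint created in ensure_tables().
# Beyond 'publishedDate' and 'gradingCompany' (which the collector itself
# uses), the FMP field names here ('previousGrade', 'newGrade', 'action',
# 'priceWhenPosted', 'newsURL') are assumptions about the payload.
def _sketch_save_ratings_batch(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    ratings: List[Dict]
) -> None:
    rows = [
        (
            ticker_id,
            item.get('publishedDate'),   # Postgres coerces the ISO timestamp string to DATE
            item.get('gradingCompany'),
            item.get('previousGrade'),   # assumed FMP field name
            item.get('newGrade'),        # assumed FMP field name
            item.get('action'),          # assumed FMP field name
            item.get('priceWhenPosted'), # assumed FMP field name
            item.get('newsURL'),         # assumed FMP field name
        )
        for item in ratings
    ]
    with conn.cursor() as cursor:
        execute_values(
            cursor,
            """
            INSERT INTO ratings
                (ticker_id, date, analyst, prior_rating, current_rating,
                 action, price_target, url)
            VALUES %s
            ON CONFLICT (ticker_id, date, analyst) DO NOTHING
            """,
            rows
        )
    conn.commit()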


def process_ticker(
    conn: psycopg2.extensions.connection,
    collector: ConsensusCollector,
    ticker_id: int,
    ticker_symbol: str
) -> None:
    """Process a single ticker's consensus data."""
    try:
        # Skip exchanges not supported by FMP
        if any(suffix in ticker_symbol for suffix in ['.KS', '.KQ', '.SS', '.SZ']):
            logging.info(f"Skipping {ticker_symbol} - exchange not supported by FMP")
            return

        # Get and process ratings
        ratings = collector.get_upgrades_downgrades(ticker_symbol)
        sleep(0.5)  # Add delay between API calls

        if ratings:
            save_ratings_batch(conn, ticker_id, ratings)
            logging.info(f"Processed {len(ratings)} ratings for {ticker_symbol}")
        else:
            logging.info(f"No ratings data available for {ticker_symbol}")

        # Get and process consensus
        rating_consensus = collector.get_upgrades_downgrades_consensus(ticker_symbol)
        sleep(0.5)  # Add delay between API calls

        if rating_consensus:
            save_rating_consensus(conn, ticker_id, rating_consensus)
            logging.info(f"Processed rating consensus for {ticker_symbol}")
        else:
            logging.info(f"No consensus data available for {ticker_symbol}")

    except Exception as e:
        logging.error(
            f"Error processing ticker {ticker_symbol} (ID: {ticker_id}): {str(e)}",
            exc_info=True
        )


def update_all_consensus(
    conn: psycopg2.extensions.connection,
    collector: ConsensusCollector,
    progress_callback=None
) -> None:
    """Update consensus for all tickers with progress tracking."""
    cursor = conn.cursor()  # Created before try so the finally clause can always close it
    try:
        # Get tickers, excluding unsupported exchanges
        cursor.execute("""
            SELECT id, yf_ticker
            FROM tickers
            WHERE yf_ticker IS NOT NULL
              AND yf_ticker NOT LIKE '%.KS'
              AND yf_ticker NOT LIKE '%.KQ'
              AND yf_ticker NOT LIKE '%.SS'
              AND yf_ticker NOT LIKE '%.SZ'
            ORDER BY yf_ticker
        """)
        tickers = cursor.fetchall()
        total_tickers = len(tickers)
        logging.info(f"Starting consensus update for {total_tickers} tickers")

        for index, (ticker_id, ticker_symbol) in enumerate(tickers, 1):
            logging.info(f"Processing {ticker_symbol} ({index}/{total_tickers})")
            process_ticker(conn, collector, ticker_id, ticker_symbol)

            if progress_callback:
                progress_callback(index, total_tickers)

    except Exception as e:
        logging.error(f"Error in consensus update: {str(e)}")
        raise
    finally:
        cursor.close()
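
# ---------------------------------------------------------------------------
# Illustrative sketch only: save_rating_consensus is imported from
# utils.Stocks.consensus.utils above; its real implementation may differ.
# The FMP field names ('consensus', 'buy', 'hold', 'sell', 'strongBuy',
# 'strongSell') are assumptions about the upgrades-downgrades-consensus
# payload, not confirmed by this module. The upsert leans on the UNIQUE
# (ticker_id, date) constraint created in ensure_tables().
def _sketch_save_rating_consensus(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    consensus: Dict
) -> None:
    buy = consensus.get('buy', 0)
    hold = consensus.get('hold', 0)
    sell = consensus.get('sell', 0)
    strong_buy = consensus.get('strongBuy', 0)
    strong_sell = consensus.get('strongSell', 0)
    with conn.cursor() as cursor:
        cursor.execute(
            """
            INSERT INTO rating_consensus
                (ticker_id, date, consensus_rating, total_ratings,
                 buy_ratings, hold_ratings, sell_ratings,
                 strong_buy_ratings, strong_sell_ratings)
            VALUES (%s, CURRENT_DATE, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (ticker_id, date) DO UPDATE SET
                consensus_rating   = EXCLUDED.consensus_rating,
                total_ratings      = EXCLUDED.total_ratings,
                buy_ratings        = EXCLUDED.buy_ratings,
                hold_ratings       = EXCLUDED.hold_ratings,
                sell_ratings       = EXCLUDED.sell_ratings,
                strong_buy_ratings = EXCLUDED.strong_buy_ratings,
                strong_sell_ratings = EXCLUDED.strong_sell_ratings
            """,
            (
                ticker_id,
                consensus.get('consensus'),
                buy + hold + sell + strong_buy + strong_sell,
                buy, hold, sell, strong_buy, strong_sell,
            )
        )
    conn.commit()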


def main():
    """Main entry point for consensus update process."""
    load_dotenv()
    setup_logging()

    # Create progress file
    progress_file = f'progress_{datetime.now():%Y%m%d_%H%M%S}.txt'

    connection = None
    try:
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )

        ensure_tables(connection)
        collector = ConsensusCollector(os.getenv('FMP_API_KEY'))

        # Get total number of tickers (excluding unsupported exchanges)
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT COUNT(*)
                FROM tickers
                WHERE yf_ticker IS NOT NULL
                  AND yf_ticker NOT LIKE '%.KS'
                  AND yf_ticker NOT LIKE '%.KQ'
                  AND yf_ticker NOT LIKE '%.SS'
                  AND yf_ticker NOT LIKE '%.SZ'
            """)
            total_tickers = cursor.fetchone()[0]

        # Write initial progress
        with open(progress_file, 'w') as f:
            f.write(f"0/{total_tickers} tickers processed\n")

        def update_progress(current, total):
            with open(progress_file, 'w') as f:
                f.write(f"{current}/{total} tickers processed\n")
                f.write(f"Last update: {datetime.now():%Y-%m-%d %H:%M:%S}\n")

        update_all_consensus(connection, collector, progress_callback=update_progress)
        logging.info("Consensus update completed successfully")

    except Exception as e:
        logging.error(f"Error updating consensus: {str(e)}")
        raise
    finally:
        if connection is not None:
            connection.close()


if __name__ == "__main__":
    main()
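
# ---------------------------------------------------------------------------
# Example .env consumed via load_dotenv() in main(). The variable names are
# the ones this module reads with os.getenv(); the values are placeholders.
#
#   DB_NAME=stocks
#   DB_USER=postgres
#   DB_PASSWORD=change-me
#   DB_HOST=localhost
#   DB_PORT=5432
#   FMP_API_KEY=your-fmp-api-key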