import requests import logging from time import sleep from typing import List, Dict, Any, Optional from datetime import datetime from requests.exceptions import ( ConnectionError, Timeout, RequestException, HTTPError ) import socket class ConsensusCollector: def __init__(self, api_key: str): self.api_key = api_key self.base_url = "https://financialmodelingprep.com/api/v4" self.rate_limit_pause = 0.5 self.stats = { 'requests': 0, 'successful': 0, 'failed': 0, 'empty': 0, 'network_errors': 0 } def _make_request(self, url: str, params: dict, max_retries: int = 3) -> Any: """Make API request with enhanced retry logic and error handling.""" self.stats['requests'] += 1 for attempt in range(max_retries): try: # Add exponential backoff sleep_time = self.rate_limit_pause * (2 ** attempt) sleep(sleep_time) # Configure timeout and verify SSL response = requests.get( url, params=params, timeout=(5, 30), # (connect timeout, read timeout) verify=True ) # Handle HTTP errors response.raise_for_status() if not response.content: self.stats['empty'] += 1 logging.warning(f"Empty response from {url}") return [] data = response.json() self.stats['successful'] += 1 return data except ConnectionError as e: self.stats['network_errors'] += 1 logging.error( f"Network connection error (attempt {attempt + 1}/{max_retries})" f" for {url}: {str(e)}" ) except Timeout as e: self.stats['network_errors'] += 1 logging.error( f"Timeout error (attempt {attempt + 1}/{max_retries})" f" for {url}: {str(e)}" ) except HTTPError as e: self.stats['failed'] += 1 logging.error( f"HTTP error (attempt {attempt + 1}/{max_retries})" f" for {url}: {str(e)}" ) except socket.error as e: self.stats['network_errors'] += 1 logging.error( f"Socket error (attempt {attempt + 1}/{max_retries})" f" for {url}: {str(e)}" ) except Exception as e: self.stats['failed'] += 1 logging.error( f"Unexpected error (attempt {attempt + 1}/{max_retries})" f" for {url}: {str(e)}" ) # If not the last attempt, wait before retrying if attempt < max_retries - 1: sleep(2 ** attempt) else: logging.error(f"All retry attempts failed for {url}") return [] def get_upgrades_downgrades(self, ticker: str) -> List[Dict]: """Get upgrades/downgrades data with validation.""" try: params = {'apikey': self.api_key, 'symbol': ticker} data = self._make_request(f"{self.base_url}/upgrades-downgrades", params) if not isinstance(data, list): logging.warning(f"Unexpected data format for {ticker}: {type(data)}") return [] return data except Exception as e: logging.error(f"Error getting upgrades/downgrades for {ticker}: {str(e)}") return [] def get_upgrades_downgrades_consensus(self, ticker: str) -> Optional[Dict]: """Get upgrades/downgrades consensus data with validation.""" try: params = {'apikey': self.api_key, 'symbol': ticker} data = self._make_request( f"{self.base_url}/upgrades-downgrades-consensus", params ) if isinstance(data, list) and data: return data[0] logging.warning( f"Unexpected consensus data format for {ticker}:" f" {type(data)}" ) return None except Exception as e: logging.error( f"Error getting consensus for {ticker}: {str(e)}" ) return None def get_stats(self) -> Dict: """Get collector statistics.""" return self.stats