RivaCube/utils/Stocks/consensus/collector.py
2025-02-04 19:31:18 +01:00

142 lines
4.8 KiB
Python

import requests
import logging
from time import sleep
from typing import List, Dict, Any, Optional
from datetime import datetime
from requests.exceptions import (
ConnectionError,
Timeout,
RequestException,
HTTPError
)
import socket
class ConsensusCollector:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://financialmodelingprep.com/api/v4"
self.rate_limit_pause = 0.5
self.stats = {
'requests': 0,
'successful': 0,
'failed': 0,
'empty': 0,
'network_errors': 0
}
def _make_request(self, url: str, params: dict, max_retries: int = 3) -> Any:
"""Make API request with enhanced retry logic and error handling."""
self.stats['requests'] += 1
for attempt in range(max_retries):
try:
# Add exponential backoff
sleep_time = self.rate_limit_pause * (2 ** attempt)
sleep(sleep_time)
# Configure timeout and verify SSL
response = requests.get(
url,
params=params,
timeout=(5, 30), # (connect timeout, read timeout)
verify=True
)
# Handle HTTP errors
response.raise_for_status()
if not response.content:
self.stats['empty'] += 1
logging.warning(f"Empty response from {url}")
return []
data = response.json()
self.stats['successful'] += 1
return data
except ConnectionError as e:
self.stats['network_errors'] += 1
logging.error(
f"Network connection error (attempt {attempt + 1}/{max_retries})"
f" for {url}: {str(e)}"
)
except Timeout as e:
self.stats['network_errors'] += 1
logging.error(
f"Timeout error (attempt {attempt + 1}/{max_retries})"
f" for {url}: {str(e)}"
)
except HTTPError as e:
self.stats['failed'] += 1
logging.error(
f"HTTP error (attempt {attempt + 1}/{max_retries})"
f" for {url}: {str(e)}"
)
except socket.error as e:
self.stats['network_errors'] += 1
logging.error(
f"Socket error (attempt {attempt + 1}/{max_retries})"
f" for {url}: {str(e)}"
)
except Exception as e:
self.stats['failed'] += 1
logging.error(
f"Unexpected error (attempt {attempt + 1}/{max_retries})"
f" for {url}: {str(e)}"
)
# If not the last attempt, wait before retrying
if attempt < max_retries - 1:
sleep(2 ** attempt)
else:
logging.error(f"All retry attempts failed for {url}")
return []
def get_upgrades_downgrades(self, ticker: str) -> List[Dict]:
"""Get upgrades/downgrades data with validation."""
try:
params = {'apikey': self.api_key, 'symbol': ticker}
data = self._make_request(f"{self.base_url}/upgrades-downgrades", params)
if not isinstance(data, list):
logging.warning(f"Unexpected data format for {ticker}: {type(data)}")
return []
return data
except Exception as e:
logging.error(f"Error getting upgrades/downgrades for {ticker}: {str(e)}")
return []
def get_upgrades_downgrades_consensus(self, ticker: str) -> Optional[Dict]:
"""Get upgrades/downgrades consensus data with validation."""
try:
params = {'apikey': self.api_key, 'symbol': ticker}
data = self._make_request(
f"{self.base_url}/upgrades-downgrades-consensus",
params
)
if isinstance(data, list) and data:
return data[0]
logging.warning(
f"Unexpected consensus data format for {ticker}:"
f" {type(data)}"
)
return None
except Exception as e:
logging.error(
f"Error getting consensus for {ticker}: {str(e)}"
)
return None
def get_stats(self) -> Dict:
"""Get collector statistics."""
return self.stats