142 lines
4.8 KiB
Python
142 lines
4.8 KiB
Python
import requests
|
|
import logging
|
|
from time import sleep
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
from requests.exceptions import (
|
|
ConnectionError,
|
|
Timeout,
|
|
RequestException,
|
|
HTTPError
|
|
)
|
|
import socket
|
|
|
|
class ConsensusCollector:
    """Fetch analyst upgrade/downgrade data from the financialmodelingprep.com v4 API.

    Every public getter is best-effort: failures are logged and reported to
    the caller as an empty result (``[]`` or ``None``) instead of an
    exception. Per-instance counters are kept in ``self.stats``.
    """

    def __init__(self, api_key: str):
        """Store the API key and initialise the request counters.

        Args:
            api_key: Key sent as the ``apikey`` query parameter on every call.
        """
        self.api_key = api_key
        self.base_url = "https://financialmodelingprep.com/api/v4"
        # Seconds to pause before each outgoing HTTP request.
        self.rate_limit_pause = 0.5
        # 'requests' counts _make_request calls; 'failed' and 'network_errors'
        # count individual failed attempts, so they can exceed 'requests'.
        self.stats = {
            'requests': 0,
            'successful': 0,
            'failed': 0,
            'empty': 0,
            'network_errors': 0
        }

    def _redact(self, text: str) -> str:
        """Return *text* with the API key masked.

        Exception messages raised by the HTTP layer embed the full request
        URL, query string included, so they must be scrubbed before logging.
        """
        import re  # local import keeps this block independent of file-level imports

        # Mask the query parameter whatever its (possibly URL-encoded) value is.
        masked = re.sub(r"(apikey=)[^&\s]+", r"\1***", text)
        # Also mask the raw key wherever else it may appear. The guard matters:
        # str.replace("", ...) would insert the mask between every character.
        if self.api_key:
            masked = masked.replace(self.api_key, "***")
        return masked

    def _log_attempt_error(
        self, label: str, attempt: int, max_retries: int, url: str, error: Exception
    ) -> None:
        """Log one failed attempt with the API key removed from the error text."""
        logging.error(
            "%s (attempt %d/%d) for %s: %s",
            label,
            attempt + 1,
            max_retries,
            url,
            self._redact(str(error)),
        )

    def _make_request(self, url: str, params: dict, max_retries: int = 3) -> Any:
        """GET *url* and return the decoded JSON body, retrying on failure.

        Args:
            url: Endpoint URL (without query string).
            params: Query parameters passed to ``requests.get``.
            max_retries: Maximum number of attempts. With ``0`` no request is
                sent and ``[]`` is returned.

        Returns:
            The decoded JSON payload on success. ``[]`` when the response body
            is empty, when a non-retryable HTTP error occurs, or when every
            attempt fails. This method never raises for request errors.
        """
        self.stats['requests'] += 1

        for attempt in range(max_retries):
            # Fixed pause before every request to stay under the rate limit;
            # the exponential backoff is applied once, after a failure.
            sleep(self.rate_limit_pause)
            retryable = True

            try:
                response = requests.get(
                    url,
                    params=params,
                    timeout=(5, 30),  # (connect timeout, read timeout)
                    verify=True,
                )
                response.raise_for_status()

                if not response.content:
                    self.stats['empty'] += 1
                    logging.warning("Empty response from %s", url)
                    return []

                data = response.json()

            # Timeout is checked before ConnectionError because requests'
            # ConnectTimeout derives from both; it is reported as a timeout.
            except Timeout as exc:
                self.stats['network_errors'] += 1
                self._log_attempt_error("Timeout error", attempt, max_retries, url, exc)

            except ConnectionError as exc:
                self.stats['network_errors'] += 1
                self._log_attempt_error(
                    "Network connection error", attempt, max_retries, url, exc
                )

            except HTTPError as exc:
                self.stats['failed'] += 1
                self._log_attempt_error("HTTP error", attempt, max_retries, url, exc)
                # Client errors (bad key, unknown symbol, ...) will not succeed
                # on retry; only throttling (429) and server errors (5xx) may.
                status = getattr(exc.response, "status_code", None)
                retryable = status is None or status == 429 or status >= 500

            except socket.error as exc:
                self.stats['network_errors'] += 1
                self._log_attempt_error("Socket error", attempt, max_retries, url, exc)

            except Exception as exc:
                # Includes a body that is not valid JSON.
                self.stats['failed'] += 1
                self._log_attempt_error("Unexpected error", attempt, max_retries, url, exc)

            else:
                self.stats['successful'] += 1
                return data

            if not retryable:
                logging.error("Giving up on %s: HTTP error is not retryable", url)
                return []

            # If not the last attempt, back off before retrying.
            if attempt < max_retries - 1:
                sleep(2 ** attempt)

        logging.error("All retry attempts failed for %s", url)
        return []

    def get_upgrades_downgrades(self, ticker: str) -> List[Dict]:
        """Return the upgrades/downgrades records for *ticker*.

        Returns:
            The list returned by the ``upgrades-downgrades`` endpoint, or
            ``[]`` when the request fails or the payload is not a list.
        """
        try:
            params = {'apikey': self.api_key, 'symbol': ticker}
            data = self._make_request(f"{self.base_url}/upgrades-downgrades", params)

            if not isinstance(data, list):
                logging.warning("Unexpected data format for %s: %s", ticker, type(data))
                return []

            return data

        except Exception as exc:
            logging.error(
                "Error getting upgrades/downgrades for %s: %s",
                ticker,
                self._redact(str(exc)),
            )
            return []

    def get_upgrades_downgrades_consensus(self, ticker: str) -> Optional[Dict]:
        """Return the upgrades/downgrades consensus record for *ticker*.

        Returns:
            The first element of the list returned by the
            ``upgrades-downgrades-consensus`` endpoint, or ``None`` when the
            request fails, the list is empty, or the payload is not a list.
        """
        try:
            params = {'apikey': self.api_key, 'symbol': ticker}
            data = self._make_request(
                f"{self.base_url}/upgrades-downgrades-consensus",
                params
            )

            if isinstance(data, list):
                if data:
                    return data[0]
                # An empty list is the normal "no data / request failed"
                # result from _make_request, not a format problem.
                logging.warning("No consensus data returned for %s", ticker)
                return None

            logging.warning(
                "Unexpected consensus data format for %s: %s", ticker, type(data)
            )
            return None

        except Exception as exc:
            logging.error(
                "Error getting consensus for %s: %s", ticker, self._redact(str(exc))
            )
            return None

    def get_stats(self) -> Dict:
        """Return the collector's counters (the live dict, not a copy)."""
        return self.stats
|