RivaCube/utils/economics/collector.py
2025-02-04 19:31:18 +01:00

503 lines
19 KiB
Python

import requests
import logging
from time import sleep
from typing import List, Dict, Optional, Any, Union
from datetime import datetime, timedelta
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from dataclasses import dataclass
from enum import Enum
import os
from dotenv import load_dotenv
class IndicatorType(Enum):
    """Names of the economic indicators the collector can request.

    Each member's value is the string sent as the ``name`` query parameter
    by ``EconomicsCollector.get_economic_indicator``.
    """

    # Output and growth
    GDP = "gdp"
    REAL_GDP = "realGDP"
    # NOTE(review): the value names *potential* nominal GDP, while the member
    # name suggests plain nominal GDP -- confirm which series is intended.
    NOMINAL_GDP = "nominalPotentialGDP"
    GDP_PER_CAPITA = "realGDPPerCapita"

    # Prices
    INFLATION = "inflationRate"
    CPI = "cpi"

    # Labour market
    UNEMPLOYMENT = "unemploymentRate"
    NONFARM_PAYROLL = "totalNonfarmPayroll"
    INITIAL_CLAIMS = "initialClaims"

    # Activity and consumption
    INDUSTRIAL_PRODUCTION = "industrialProductionTotalIndex"
    RETAIL_SALES = "retailSales"
    CONSUMER_SENTIMENT = "consumerSentiment"
    DURABLE_GOODS = "durableGoods"
    HOUSING_STARTS = "newPrivatelyOwnedHousingUnitsStartedTotalUnits"
    VEHICLE_SALES = "totalVehicleSales"

    # Rates and risk
    INTEREST_RATE = "federalFunds"
    RECESSION_PROB = "smoothedUSRecessionProbabilities"
    CD_RATE = "3MonthOr90DayRatesAndYieldsCertificatesOfDeposit"
    CREDIT_RATE = "commercialBankInterestRateOnCreditCardPlansAllAccounts"
    MORTGAGE_30Y = "30YearFixedRateMortgageAverage"
    MORTGAGE_15Y = "15YearFixedRateMortgageAverage"
@dataclass
class APIConfig:
    """Connection and throttling settings used by ``EconomicsCollector``."""

    # Root of the v3 endpoints (used for the economic calendar).
    base_url: str = "https://financialmodelingprep.com/api/v3"
    # Root of the v4 endpoints (treasury, economic indicators, risk premium).
    base_url_v4: str = "https://financialmodelingprep.com/api/v4"
    # Delay passed to ``sleep`` before every request; the collector doubles
    # it whenever it sees an HTTP 429.
    rate_limit_pause: float = 0.2
    # Per-request timeout handed to ``session.get``.
    timeout: int = 30
    # Retry budget and back-off factor for the session's ``Retry`` strategy.
    max_retries: int = 3
    backoff_factor: float = 0.5
class EconomicsCollector:
    """Collector class for fetching economic data from Financial Modeling Prep API.

    Every ``get_*`` method returns a list of plain dicts and never raises on
    transport or parsing problems: failures are logged and an empty list (or
    a list without the offending rows) is returned instead.

    The instance can be used as a context manager; leaving the ``with``
    block closes the underlying HTTP session.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the collector with API configuration.

        Args:
            api_key: API key to use. When omitted, ``FMP_API_KEY`` is read
                from the environment after ``load_dotenv()`` has run.

        Raises:
            ValueError: If no key is passed and ``FMP_API_KEY`` is unset.
        """
        load_dotenv()
        self.api_key = api_key or os.getenv('FMP_API_KEY')
        if not self.api_key:
            raise ValueError("API key is required. Set FMP_API_KEY environment variable or pass as parameter.")
        self.config = APIConfig()
        self.session = self._create_session()
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Configure and return logger instance.

        A stream handler is attached only when the module logger has none,
        so creating several collectors does not duplicate log lines.
        """
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        return logger

    def _create_session(self) -> "requests.Session":
        """Create and configure a requests session with retry strategy.

        GET requests answered with 429/500/502/503/504 are retried by the
        mounted adapter, using ``max_retries`` and ``backoff_factor`` from
        ``self.config``.
        """
        session = requests.Session()
        retry_strategy = Retry(
            total=self.config.max_retries,
            backoff_factor=self.config.backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def _redact_key(self, message: Any) -> str:
        """Return ``str(message)`` with the API key replaced by ``***``.

        Exception text produced for a failed request can embed the full
        request URL, query string included, so it must be scrubbed before
        it is written to the log.
        """
        text = str(message)
        if not self.api_key:
            return text
        return text.replace(str(self.api_key), '***')

    def _make_request(self, url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Make an API request with retry logic and error handling.

        Args:
            url: Endpoint to call.
            params: Query parameters. The dict is copied before the API key
                is added, so the caller's dict is left untouched.

        Returns:
            The decoded JSON payload as a list of records (a single JSON
            object is wrapped in a one-element list). An empty list is
            returned on HTTP 403, on a rate limit that persists after
            ``config.max_retries`` extra attempts, on an empty or
            unexpectedly shaped payload, and on any request/parsing error.
        """
        query = dict(params)
        query['apikey'] = self.api_key
        try:
            # Bounded loop instead of recursion: one initial attempt plus at
            # most ``max_retries`` further attempts while the API answers 429.
            for _ in range(self.config.max_retries + 1):
                sleep(self.config.rate_limit_pause)
                self.logger.debug(f"Making request to {url}")
                response = self.session.get(
                    url,
                    params=query,
                    timeout=self.config.timeout
                )
                if response.status_code == 403:
                    self.logger.error(f"Access denied to {url}. Please check your API subscription level.")
                    return []
                if response.status_code != 429:
                    break
                # The longer pause is kept on the shared config, so later
                # requests are throttled as well.
                self.logger.warning("Rate limit reached, increasing pause time")
                self.config.rate_limit_pause *= 2
            else:
                self.logger.error(
                    f"Rate limit still exceeded for {url} after {self.config.max_retries} retries"
                )
                return []

            response.raise_for_status()
            data = response.json()
            if not data:
                self.logger.warning(f"Empty response received from {url}")
                return []
            if isinstance(data, dict):
                return [data]
            if not isinstance(data, list):
                # A bare string/number would break every formatter downstream.
                self.logger.error(f"Unexpected response type from {url}: {type(data).__name__}")
                return []
            self.logger.debug(f"Received {len(data)} records from {url}")
            return data
        except requests.exceptions.Timeout:
            self.logger.error(f"Timeout error accessing {url}")
            return []
        except requests.exceptions.RequestException as e:
            self.logger.error(f"API request error for {url}: {self._redact_key(e)}")
            return []
        except ValueError as e:
            self.logger.error(f"JSON parsing error for {url}: {self._redact_key(e)}")
            return []
        except Exception as e:
            self.logger.error(f"Unexpected error accessing {url}: {self._redact_key(e)}")
            return []

    def _format_indicator_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format indicator data into a consistent structure.

        Each output row has the keys ``date``, ``value`` (float), ``unit``
        and ``frequency``. A missing ``value`` becomes ``0.0``; rows whose
        value cannot be converted, or that are not dicts, are logged and
        skipped.
        """
        formatted_data = []
        for item in data:
            try:
                formatted_data.append({
                    'date': item.get('date'),
                    'value': float(item.get('value', 0)),
                    'unit': item.get('unit', ''),
                    'frequency': item.get('frequency', '')
                })
            except (ValueError, TypeError, AttributeError) as e:
                self.logger.error(f"Error formatting indicator data: {e}, Data: {item}")
        return formatted_data

    def get_treasury_rates(self, from_date: str, to_date: str) -> List[Dict[str, Any]]:
        """Get treasury rates for a date range.

        Args:
            from_date: Sent as the ``from`` query parameter.
            to_date: Sent as the ``to`` query parameter.

        Returns:
            One dict per record holding ``date`` plus every other field
            (except ``id``) that converts to ``float``.
        """
        self.logger.info(f"Fetching treasury rates from {from_date} to {to_date}")
        params = {
            'from': from_date,
            'to': to_date
        }
        data = self._make_request(f"{self.config.base_url_v4}/treasury", params)
        formatted_data = []
        for item in data:
            try:
                formatted_item = {'date': item.get('date')}
                for key, value in item.items():
                    if key in ('date', 'id') or value is None:
                        continue
                    try:
                        formatted_item[key] = float(value)
                    except (ValueError, TypeError):
                        # Non-numeric field: drop the field, keep the row.
                        continue
                formatted_data.append(formatted_item)
            except Exception as e:
                self.logger.error(f"Error formatting treasury rate: {e}, Data: {item}")
        return formatted_data

    def get_economic_indicator(self, indicator: Union[str, "IndicatorType"]) -> List[Dict[str, Any]]:
        """Get specific economic indicator data.

        Args:
            indicator: Either the raw indicator name or an ``IndicatorType``
                member, whose ``value`` is used as the name.

        Returns:
            Rows in the shape produced by ``_format_indicator_data``.
        """
        indicator_value = indicator if isinstance(indicator, str) else indicator.value
        self.logger.info(f"Fetching {indicator_value} indicator data")
        params = {
            'name': indicator_value
        }
        data = self._make_request(f"{self.config.base_url_v4}/economic", params)
        return self._format_indicator_data(data)

    def get_economic_calendar(self, from_date: str, to_date: str) -> List[Dict[str, Any]]:
        """Get economic calendar events.

        Args:
            from_date: Sent as the ``from`` query parameter.
            to_date: Sent as the ``to`` query parameter.

        Returns:
            One dict per event with ``date`` (text before the first space
            of the raw value, or ``''`` when the date is missing or null),
            ``event``, ``impact``, ``actual``, ``previous`` and ``country``
            (``'US'`` when absent).
        """
        self.logger.info(f"Fetching economic calendar from {from_date} to {to_date}")
        params = {
            'from': from_date,
            'to': to_date
        }
        data = self._make_request(f"{self.config.base_url}/economic_calendar", params)
        formatted_data = []
        for event in data:
            try:
                # Extract date without time; ``or ''`` makes an explicit
                # null behave like a missing key instead of dropping the row.
                date_str = (event.get('date') or '').split(' ')[0]
                formatted_data.append({
                    'date': date_str,
                    'event': event.get('event', ''),
                    'impact': event.get('impact', ''),
                    'actual': event.get('actual'),
                    'previous': event.get('previous'),
                    'country': event.get('country', 'US')
                })
            except Exception as e:
                self.logger.error(f"Error formatting calendar event: {e}, Data: {event}")
        return formatted_data

    def get_market_risk_premium(self) -> List[Dict[str, Any]]:
        """Get market risk premium data.

        Returns:
            One dict per record with ``country``, ``country_risk_premium``
            and ``total_equity_risk_premium`` (floats; a missing figure
            becomes ``0.0``). Rows that cannot be converted are logged and
            skipped.
        """
        self.logger.info("Fetching market risk premium data")
        data = self._make_request(f"{self.config.base_url_v4}/market_risk_premium", {})
        formatted_data = []
        for item in data:
            try:
                formatted_data.append({
                    'country': item.get('country', ''),
                    'country_risk_premium': float(item.get('countryRiskPremium', 0)),
                    'total_equity_risk_premium': float(item.get('totalEquityRiskPremium', 0))
                })
            except (ValueError, TypeError, AttributeError) as e:
                self.logger.error(f"Error formatting risk premium data: {e}, Data: {item}")
        return formatted_data

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with proper cleanup."""
        self.session.close()
import requests
import logging
from time import sleep
from typing import List, Dict, Optional, Any, Union
from datetime import datetime, timedelta
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from dataclasses import dataclass
from enum import Enum
import os
from dotenv import load_dotenv
class IndicatorType(Enum):
    """Names of the economic indicators the collector can request.

    Each member's value is the string sent as the ``name`` query parameter
    by ``EconomicsCollector.get_economic_indicator``.
    """

    # Output and growth
    GDP = "gdp"
    REAL_GDP = "realGDP"
    # NOTE(review): the value names *potential* nominal GDP, while the member
    # name suggests plain nominal GDP -- confirm which series is intended.
    NOMINAL_GDP = "nominalPotentialGDP"
    GDP_PER_CAPITA = "realGDPPerCapita"

    # Prices
    INFLATION = "inflationRate"
    CPI = "cpi"

    # Labour market
    UNEMPLOYMENT = "unemploymentRate"
    NONFARM_PAYROLL = "totalNonfarmPayroll"
    INITIAL_CLAIMS = "initialClaims"

    # Activity and consumption
    INDUSTRIAL_PRODUCTION = "industrialProductionTotalIndex"
    RETAIL_SALES = "retailSales"
    CONSUMER_SENTIMENT = "consumerSentiment"
    DURABLE_GOODS = "durableGoods"
    HOUSING_STARTS = "newPrivatelyOwnedHousingUnitsStartedTotalUnits"
    VEHICLE_SALES = "totalVehicleSales"

    # Rates and risk
    INTEREST_RATE = "federalFunds"
    RECESSION_PROB = "smoothedUSRecessionProbabilities"
    CD_RATE = "3MonthOr90DayRatesAndYieldsCertificatesOfDeposit"
    CREDIT_RATE = "commercialBankInterestRateOnCreditCardPlansAllAccounts"
    MORTGAGE_30Y = "30YearFixedRateMortgageAverage"
    MORTGAGE_15Y = "15YearFixedRateMortgageAverage"
@dataclass
class APIConfig:
    """Connection and throttling settings used by ``EconomicsCollector``."""

    # Root of the v3 endpoints (used for the economic calendar).
    base_url: str = "https://financialmodelingprep.com/api/v3"
    # Root of the v4 endpoints (treasury, economic indicators, risk premium).
    base_url_v4: str = "https://financialmodelingprep.com/api/v4"
    # Delay passed to ``sleep`` before every request; the collector doubles
    # it whenever it sees an HTTP 429.
    rate_limit_pause: float = 0.2
    # Per-request timeout handed to ``session.get``.
    timeout: int = 30
    # Retry budget and back-off factor for the session's ``Retry`` strategy.
    max_retries: int = 3
    backoff_factor: float = 0.5
class EconomicsCollector:
    """Collector class for fetching economic data from Financial Modeling Prep API.

    Every ``get_*`` method returns a list of plain dicts and never raises on
    transport or parsing problems: failures are logged and an empty list (or
    a list without the offending rows) is returned instead.

    The instance can be used as a context manager; leaving the ``with``
    block closes the underlying HTTP session.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the collector with API configuration.

        Args:
            api_key: API key to use. When omitted, ``FMP_API_KEY`` is read
                from the environment after ``load_dotenv()`` has run.

        Raises:
            ValueError: If no key is passed and ``FMP_API_KEY`` is unset.
        """
        load_dotenv()
        self.api_key = api_key or os.getenv('FMP_API_KEY')
        if not self.api_key:
            raise ValueError("API key is required. Set FMP_API_KEY environment variable or pass as parameter.")
        self.config = APIConfig()
        self.session = self._create_session()
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Configure and return logger instance.

        A stream handler is attached only when the module logger has none,
        so creating several collectors does not duplicate log lines.
        """
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        return logger

    def _create_session(self) -> "requests.Session":
        """Create and configure a requests session with retry strategy.

        GET requests answered with 429/500/502/503/504 are retried by the
        mounted adapter, using ``max_retries`` and ``backoff_factor`` from
        ``self.config``.
        """
        session = requests.Session()
        retry_strategy = Retry(
            total=self.config.max_retries,
            backoff_factor=self.config.backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def _redact_key(self, message: Any) -> str:
        """Return ``str(message)`` with the API key replaced by ``***``.

        Exception text produced for a failed request can embed the full
        request URL, query string included, so it must be scrubbed before
        it is written to the log.
        """
        text = str(message)
        if not self.api_key:
            return text
        return text.replace(str(self.api_key), '***')

    def _make_request(self, url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Make an API request with retry logic and error handling.

        Args:
            url: Endpoint to call.
            params: Query parameters. The dict is copied before the API key
                is added, so the caller's dict is left untouched.

        Returns:
            The decoded JSON payload as a list of records (a single JSON
            object is wrapped in a one-element list). An empty list is
            returned on HTTP 403, on a rate limit that persists after
            ``config.max_retries`` extra attempts, on an empty or
            unexpectedly shaped payload, and on any request/parsing error.
        """
        query = dict(params)
        query['apikey'] = self.api_key
        try:
            # Bounded loop instead of recursion: one initial attempt plus at
            # most ``max_retries`` further attempts while the API answers 429.
            for _ in range(self.config.max_retries + 1):
                sleep(self.config.rate_limit_pause)
                self.logger.debug(f"Making request to {url}")
                response = self.session.get(
                    url,
                    params=query,
                    timeout=self.config.timeout
                )
                if response.status_code == 403:
                    self.logger.error(f"Access denied to {url}. Please check your API subscription level.")
                    return []
                if response.status_code != 429:
                    break
                # The longer pause is kept on the shared config, so later
                # requests are throttled as well.
                self.logger.warning("Rate limit reached, increasing pause time")
                self.config.rate_limit_pause *= 2
            else:
                self.logger.error(
                    f"Rate limit still exceeded for {url} after {self.config.max_retries} retries"
                )
                return []

            response.raise_for_status()
            data = response.json()
            if not data:
                self.logger.warning(f"Empty response received from {url}")
                return []
            if isinstance(data, dict):
                return [data]
            if not isinstance(data, list):
                # A bare string/number would break every formatter downstream.
                self.logger.error(f"Unexpected response type from {url}: {type(data).__name__}")
                return []
            self.logger.debug(f"Received {len(data)} records from {url}")
            return data
        except requests.exceptions.Timeout:
            self.logger.error(f"Timeout error accessing {url}")
            return []
        except requests.exceptions.RequestException as e:
            self.logger.error(f"API request error for {url}: {self._redact_key(e)}")
            return []
        except ValueError as e:
            self.logger.error(f"JSON parsing error for {url}: {self._redact_key(e)}")
            return []
        except Exception as e:
            self.logger.error(f"Unexpected error accessing {url}: {self._redact_key(e)}")
            return []

    def _format_indicator_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format indicator data into a consistent structure.

        Each output row has the keys ``date``, ``value`` (float), ``unit``
        and ``frequency``. A missing ``value`` becomes ``0.0``; rows whose
        value cannot be converted, or that are not dicts, are logged and
        skipped.
        """
        formatted_data = []
        for item in data:
            try:
                formatted_data.append({
                    'date': item.get('date'),
                    'value': float(item.get('value', 0)),
                    'unit': item.get('unit', ''),
                    'frequency': item.get('frequency', '')
                })
            except (ValueError, TypeError, AttributeError) as e:
                self.logger.error(f"Error formatting indicator data: {e}, Data: {item}")
        return formatted_data

    def get_treasury_rates(self, from_date: str, to_date: str) -> List[Dict[str, Any]]:
        """Get treasury rates for a date range.

        Args:
            from_date: Sent as the ``from`` query parameter.
            to_date: Sent as the ``to`` query parameter.

        Returns:
            One dict per record holding ``date`` plus every other field
            (except ``id``) that converts to ``float``.
        """
        self.logger.info(f"Fetching treasury rates from {from_date} to {to_date}")
        params = {
            'from': from_date,
            'to': to_date
        }
        data = self._make_request(f"{self.config.base_url_v4}/treasury", params)
        formatted_data = []
        for item in data:
            try:
                formatted_item = {'date': item.get('date')}
                for key, value in item.items():
                    if key in ('date', 'id') or value is None:
                        continue
                    try:
                        formatted_item[key] = float(value)
                    except (ValueError, TypeError):
                        # Non-numeric field: drop the field, keep the row.
                        continue
                formatted_data.append(formatted_item)
            except Exception as e:
                self.logger.error(f"Error formatting treasury rate: {e}, Data: {item}")
        return formatted_data

    def get_economic_indicator(self, indicator: Union[str, "IndicatorType"]) -> List[Dict[str, Any]]:
        """Get specific economic indicator data.

        Args:
            indicator: Either the raw indicator name or an ``IndicatorType``
                member, whose ``value`` is used as the name.

        Returns:
            Rows in the shape produced by ``_format_indicator_data``.
        """
        indicator_value = indicator if isinstance(indicator, str) else indicator.value
        self.logger.info(f"Fetching {indicator_value} indicator data")
        params = {
            'name': indicator_value
        }
        data = self._make_request(f"{self.config.base_url_v4}/economic", params)
        return self._format_indicator_data(data)

    def get_economic_calendar(self, from_date: str, to_date: str) -> List[Dict[str, Any]]:
        """Get economic calendar events.

        Args:
            from_date: Sent as the ``from`` query parameter.
            to_date: Sent as the ``to`` query parameter.

        Returns:
            One dict per event with ``date`` (text before the first space
            of the raw value, or ``''`` when the date is missing or null),
            ``event``, ``impact``, ``actual``, ``previous`` and ``country``
            (``'US'`` when absent).
        """
        self.logger.info(f"Fetching economic calendar from {from_date} to {to_date}")
        params = {
            'from': from_date,
            'to': to_date
        }
        data = self._make_request(f"{self.config.base_url}/economic_calendar", params)
        formatted_data = []
        for event in data:
            try:
                # Extract date without time; ``or ''`` makes an explicit
                # null behave like a missing key instead of dropping the row.
                date_str = (event.get('date') or '').split(' ')[0]
                formatted_data.append({
                    'date': date_str,
                    'event': event.get('event', ''),
                    'impact': event.get('impact', ''),
                    'actual': event.get('actual'),
                    'previous': event.get('previous'),
                    'country': event.get('country', 'US')
                })
            except Exception as e:
                self.logger.error(f"Error formatting calendar event: {e}, Data: {event}")
        return formatted_data

    def get_market_risk_premium(self) -> List[Dict[str, Any]]:
        """Get market risk premium data.

        Returns:
            One dict per record with ``country``, ``country_risk_premium``
            and ``total_equity_risk_premium`` (floats; a missing figure
            becomes ``0.0``). Rows that cannot be converted are logged and
            skipped.
        """
        self.logger.info("Fetching market risk premium data")
        data = self._make_request(f"{self.config.base_url_v4}/market_risk_premium", {})
        formatted_data = []
        for item in data:
            try:
                formatted_data.append({
                    'country': item.get('country', ''),
                    'country_risk_premium': float(item.get('countryRiskPremium', 0)),
                    'total_equity_risk_premium': float(item.get('totalEquityRiskPremium', 0))
                })
            except (ValueError, TypeError, AttributeError) as e:
                self.logger.error(f"Error formatting risk premium data: {e}, Data: {item}")
        return formatted_data

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with proper cleanup."""
        self.session.close()