import requests import logging from time import sleep from typing import List, Dict, Optional, Any, Union from datetime import datetime, timedelta from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from dataclasses import dataclass from enum import Enum import os from dotenv import load_dotenv class IndicatorType(Enum): GDP = "gdp" REAL_GDP = "realGDP" NOMINAL_GDP = "nominalPotentialGDP" GDP_PER_CAPITA = "realGDPPerCapita" INFLATION = "inflationRate" CPI = "cpi" UNEMPLOYMENT = "unemploymentRate" NONFARM_PAYROLL = "totalNonfarmPayroll" INITIAL_CLAIMS = "initialClaims" INDUSTRIAL_PRODUCTION = "industrialProductionTotalIndex" RETAIL_SALES = "retailSales" CONSUMER_SENTIMENT = "consumerSentiment" DURABLE_GOODS = "durableGoods" HOUSING_STARTS = "newPrivatelyOwnedHousingUnitsStartedTotalUnits" VEHICLE_SALES = "totalVehicleSales" INTEREST_RATE = "federalFunds" RECESSION_PROB = "smoothedUSRecessionProbabilities" CD_RATE = "3MonthOr90DayRatesAndYieldsCertificatesOfDeposit" CREDIT_RATE = "commercialBankInterestRateOnCreditCardPlansAllAccounts" MORTGAGE_30Y = "30YearFixedRateMortgageAverage" MORTGAGE_15Y = "15YearFixedRateMortgageAverage" @dataclass class APIConfig: base_url: str = "https://financialmodelingprep.com/api/v3" base_url_v4: str = "https://financialmodelingprep.com/api/v4" rate_limit_pause: float = 0.2 timeout: int = 30 max_retries: int = 3 backoff_factor: float = 0.5 class EconomicsCollector: """Collector class for fetching economic data from Financial Modeling Prep API.""" def __init__(self, api_key: Optional[str] = None): """Initialize the collector with API configuration.""" load_dotenv() self.api_key = api_key or os.getenv('FMP_API_KEY') if not self.api_key: raise ValueError("API key is required. Set FMP_API_KEY environment variable or pass as parameter.") self.config = APIConfig() self.session = self._create_session() self.logger = self._setup_logger() def _setup_logger(self) -> logging.Logger: """Configure and return logger instance.""" logger = logging.getLogger(__name__) if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger def _create_session(self) -> requests.Session: """Create and configure a requests session with retry strategy.""" session = requests.Session() retry_strategy = Retry( total=self.config.max_retries, backoff_factor=self.config.backoff_factor, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET"] ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("https://", adapter) session.mount("http://", adapter) return session def _make_request(self, url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]: """Make an API request with retry logic and error handling.""" try: sleep(self.config.rate_limit_pause) params['apikey'] = self.api_key self.logger.debug(f"Making request to {url}") response = self.session.get( url, params=params, timeout=self.config.timeout ) if response.status_code == 403: self.logger.error(f"Access denied to {url}. Please check your API subscription level.") return [] if response.status_code == 429: self.logger.warning("Rate limit reached, increasing pause time") self.config.rate_limit_pause *= 2 sleep(self.config.rate_limit_pause) return self._make_request(url, params) response.raise_for_status() data = response.json() if not data: self.logger.warning(f"Empty response received from {url}") return [] if isinstance(data, dict): return [data] self.logger.debug(f"Received {len(data)} records from {url}") return data except requests.exceptions.Timeout: self.logger.error(f"Timeout error accessing {url}") return [] except requests.exceptions.RequestException as e: self.logger.error(f"API request error for {url}: {str(e)}") return [] except ValueError as e: self.logger.error(f"JSON parsing error for {url}: {str(e)}") return [] except Exception as e: self.logger.error(f"Unexpected error accessing {url}: {str(e)}") return [] def _format_indicator_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Format indicator data into a consistent structure.""" formatted_data = [] for item in data: try: formatted_item = { 'date': item.get('date'), 'value': float(item.get('value', 0)), 'unit': item.get('unit', ''), 'frequency': item.get('frequency', '') } formatted_data.append(formatted_item) except (ValueError, TypeError) as e: self.logger.error(f"Error formatting indicator data: {e}, Data: {item}") continue return formatted_data def get_treasury_rates(self, from_date: str, to_date: str) -> List[Dict[str, Any]]: """Get treasury rates for a date range.""" self.logger.info(f"Fetching treasury rates from {from_date} to {to_date}") params = { 'from': from_date, 'to': to_date } data = self._make_request(f"{self.config.base_url_v4}/treasury", params) formatted_data = [] for item in data: try: formatted_item = { 'date': item.get('date'), } # Extract rate values for key, value in item.items(): if key not in ['date', 'id'] and value is not None: try: formatted_item[key] = float(value) except (ValueError, TypeError): continue formatted_data.append(formatted_item) except Exception as e: self.logger.error(f"Error formatting treasury rate: {e}, Data: {item}") continue return formatted_data def get_economic_indicator(self, indicator: Union[str, IndicatorType]) -> List[Dict[str, Any]]: """Get specific economic indicator data.""" if isinstance(indicator, str): indicator_value = indicator else: indicator_value = indicator.value self.logger.info(f"Fetching {indicator_value} indicator data") params = { 'name': indicator_value } data = self._make_request(f"{self.config.base_url_v4}/economic", params) return self._format_indicator_data(data) def get_economic_calendar(self, from_date: str, to_date: str) -> List[Dict[str, Any]]: """Get economic calendar events.""" self.logger.info(f"Fetching economic calendar from {from_date} to {to_date}") params = { 'from': from_date, 'to': to_date } data = self._make_request(f"{self.config.base_url}/economic_calendar", params) formatted_data = [] for event in data: try: # Extract date without time date_str = event.get('date', '').split(' ')[0] formatted_event = { 'date': date_str, 'event': event.get('event', ''), 'impact': event.get('impact', ''), 'actual': event.get('actual'), 'previous': event.get('previous'), 'country': event.get('country', 'US') } formatted_data.append(formatted_event) except Exception as e: self.logger.error(f"Error formatting calendar event: {e}, Data: {event}") continue return formatted_data def get_market_risk_premium(self) -> List[Dict[str, Any]]: """Get market risk premium data.""" self.logger.info("Fetching market risk premium data") data = self._make_request(f"{self.config.base_url_v4}/market_risk_premium", {}) formatted_data = [] for item in data: try: formatted_item = { 'country': item.get('country', ''), 'country_risk_premium': float(item.get('countryRiskPremium', 0)), 'total_equity_risk_premium': float(item.get('totalEquityRiskPremium', 0)) } formatted_data.append(formatted_item) except (ValueError, TypeError) as e: self.logger.error(f"Error formatting risk premium data: {e}, Data: {item}") continue return formatted_data def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit with proper cleanup.""" self.session.close() import requests import logging from time import sleep from typing import List, Dict, Optional, Any, Union from datetime import datetime, timedelta from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from dataclasses import dataclass from enum import Enum import os from dotenv import load_dotenv class IndicatorType(Enum): GDP = "gdp" REAL_GDP = "realGDP" NOMINAL_GDP = "nominalPotentialGDP" GDP_PER_CAPITA = "realGDPPerCapita" INFLATION = "inflationRate" CPI = "cpi" UNEMPLOYMENT = "unemploymentRate" NONFARM_PAYROLL = "totalNonfarmPayroll" INITIAL_CLAIMS = "initialClaims" INDUSTRIAL_PRODUCTION = "industrialProductionTotalIndex" RETAIL_SALES = "retailSales" CONSUMER_SENTIMENT = "consumerSentiment" DURABLE_GOODS = "durableGoods" HOUSING_STARTS = "newPrivatelyOwnedHousingUnitsStartedTotalUnits" VEHICLE_SALES = "totalVehicleSales" INTEREST_RATE = "federalFunds" RECESSION_PROB = "smoothedUSRecessionProbabilities" CD_RATE = "3MonthOr90DayRatesAndYieldsCertificatesOfDeposit" CREDIT_RATE = "commercialBankInterestRateOnCreditCardPlansAllAccounts" MORTGAGE_30Y = "30YearFixedRateMortgageAverage" MORTGAGE_15Y = "15YearFixedRateMortgageAverage" @dataclass class APIConfig: base_url: str = "https://financialmodelingprep.com/api/v3" base_url_v4: str = "https://financialmodelingprep.com/api/v4" rate_limit_pause: float = 0.2 timeout: int = 30 max_retries: int = 3 backoff_factor: float = 0.5 class EconomicsCollector: """Collector class for fetching economic data from Financial Modeling Prep API.""" def __init__(self, api_key: Optional[str] = None): """Initialize the collector with API configuration.""" load_dotenv() self.api_key = api_key or os.getenv('FMP_API_KEY') if not self.api_key: raise ValueError("API key is required. Set FMP_API_KEY environment variable or pass as parameter.") self.config = APIConfig() self.session = self._create_session() self.logger = self._setup_logger() def _setup_logger(self) -> logging.Logger: """Configure and return logger instance.""" logger = logging.getLogger(__name__) if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger def _create_session(self) -> requests.Session: """Create and configure a requests session with retry strategy.""" session = requests.Session() retry_strategy = Retry( total=self.config.max_retries, backoff_factor=self.config.backoff_factor, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET"] ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("https://", adapter) session.mount("http://", adapter) return session def _make_request(self, url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]: """Make an API request with retry logic and error handling.""" try: sleep(self.config.rate_limit_pause) params['apikey'] = self.api_key self.logger.debug(f"Making request to {url}") response = self.session.get( url, params=params, timeout=self.config.timeout ) if response.status_code == 403: self.logger.error(f"Access denied to {url}. Please check your API subscription level.") return [] if response.status_code == 429: self.logger.warning("Rate limit reached, increasing pause time") self.config.rate_limit_pause *= 2 sleep(self.config.rate_limit_pause) return self._make_request(url, params) response.raise_for_status() data = response.json() if not data: self.logger.warning(f"Empty response received from {url}") return [] if isinstance(data, dict): return [data] self.logger.debug(f"Received {len(data)} records from {url}") return data except requests.exceptions.Timeout: self.logger.error(f"Timeout error accessing {url}") return [] except requests.exceptions.RequestException as e: self.logger.error(f"API request error for {url}: {str(e)}") return [] except ValueError as e: self.logger.error(f"JSON parsing error for {url}: {str(e)}") return [] except Exception as e: self.logger.error(f"Unexpected error accessing {url}: {str(e)}") return [] def _format_indicator_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Format indicator data into a consistent structure.""" formatted_data = [] for item in data: try: formatted_item = { 'date': item.get('date'), 'value': float(item.get('value', 0)), 'unit': item.get('unit', ''), 'frequency': item.get('frequency', '') } formatted_data.append(formatted_item) except (ValueError, TypeError) as e: self.logger.error(f"Error formatting indicator data: {e}, Data: {item}") continue return formatted_data def get_treasury_rates(self, from_date: str, to_date: str) -> List[Dict[str, Any]]: """Get treasury rates for a date range.""" self.logger.info(f"Fetching treasury rates from {from_date} to {to_date}") params = { 'from': from_date, 'to': to_date } data = self._make_request(f"{self.config.base_url_v4}/treasury", params) formatted_data = [] for item in data: try: formatted_item = { 'date': item.get('date'), } # Extract rate values for key, value in item.items(): if key not in ['date', 'id'] and value is not None: try: formatted_item[key] = float(value) except (ValueError, TypeError): continue formatted_data.append(formatted_item) except Exception as e: self.logger.error(f"Error formatting treasury rate: {e}, Data: {item}") continue return formatted_data def get_economic_indicator(self, indicator: Union[str, IndicatorType]) -> List[Dict[str, Any]]: """Get specific economic indicator data.""" if isinstance(indicator, str): indicator_value = indicator else: indicator_value = indicator.value self.logger.info(f"Fetching {indicator_value} indicator data") params = { 'name': indicator_value } data = self._make_request(f"{self.config.base_url_v4}/economic", params) return self._format_indicator_data(data) def get_economic_calendar(self, from_date: str, to_date: str) -> List[Dict[str, Any]]: """Get economic calendar events.""" self.logger.info(f"Fetching economic calendar from {from_date} to {to_date}") params = { 'from': from_date, 'to': to_date } data = self._make_request(f"{self.config.base_url}/economic_calendar", params) formatted_data = [] for event in data: try: # Extract date without time date_str = event.get('date', '').split(' ')[0] formatted_event = { 'date': date_str, 'event': event.get('event', ''), 'impact': event.get('impact', ''), 'actual': event.get('actual'), 'previous': event.get('previous'), 'country': event.get('country', 'US') } formatted_data.append(formatted_event) except Exception as e: self.logger.error(f"Error formatting calendar event: {e}, Data: {event}") continue return formatted_data def get_market_risk_premium(self) -> List[Dict[str, Any]]: """Get market risk premium data.""" self.logger.info("Fetching market risk premium data") data = self._make_request(f"{self.config.base_url_v4}/market_risk_premium", {}) formatted_data = [] for item in data: try: formatted_item = { 'country': item.get('country', ''), 'country_risk_premium': float(item.get('countryRiskPremium', 0)), 'total_equity_risk_premium': float(item.get('totalEquityRiskPremium', 0)) } formatted_data.append(formatted_item) except (ValueError, TypeError) as e: self.logger.error(f"Error formatting risk premium data: {e}, Data: {item}") continue return formatted_data def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit with proper cleanup.""" self.session.close()