503 lines
19 KiB
Python
503 lines
19 KiB
Python
import requests
|
|
import logging
|
|
from time import sleep
|
|
from typing import List, Dict, Optional, Any, Union
|
|
from datetime import datetime, timedelta
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.packages.urllib3.util.retry import Retry
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
class IndicatorType(Enum):
    """Names of the economic indicators that can be requested.

    The value of each member is the exact string that
    ``EconomicsCollector.get_economic_indicator`` sends as the ``name``
    query parameter.
    """

    # GDP series
    GDP = "gdp"
    REAL_GDP = "realGDP"
    NOMINAL_GDP = "nominalPotentialGDP"
    GDP_PER_CAPITA = "realGDPPerCapita"

    # Prices
    INFLATION = "inflationRate"
    CPI = "cpi"

    # Labour market
    UNEMPLOYMENT = "unemploymentRate"
    NONFARM_PAYROLL = "totalNonfarmPayroll"
    INITIAL_CLAIMS = "initialClaims"

    # Production, spending and sentiment
    INDUSTRIAL_PRODUCTION = "industrialProductionTotalIndex"
    RETAIL_SALES = "retailSales"
    CONSUMER_SENTIMENT = "consumerSentiment"
    DURABLE_GOODS = "durableGoods"
    HOUSING_STARTS = "newPrivatelyOwnedHousingUnitsStartedTotalUnits"
    VEHICLE_SALES = "totalVehicleSales"

    # Rates and recession probability
    INTEREST_RATE = "federalFunds"
    RECESSION_PROB = "smoothedUSRecessionProbabilities"
    CD_RATE = "3MonthOr90DayRatesAndYieldsCertificatesOfDeposit"
    CREDIT_RATE = "commercialBankInterestRateOnCreditCardPlansAllAccounts"
    MORTGAGE_30Y = "30YearFixedRateMortgageAverage"
    MORTGAGE_15Y = "15YearFixedRateMortgageAverage"
|
|
|
|
@dataclass
class APIConfig:
    """Connection, throttling and retry settings used by ``EconomicsCollector``."""

    # Root URLs of the v3 and v4 endpoint families.
    base_url: str = "https://financialmodelingprep.com/api/v3"
    base_url_v4: str = "https://financialmodelingprep.com/api/v4"

    # Seconds slept before every request; the collector raises this value
    # when the API answers with HTTP 429.
    rate_limit_pause: float = 0.2

    # Timeout handed to ``session.get`` for each request.
    timeout: int = 30

    # Retry budget and backoff factor handed to urllib3's ``Retry``.
    max_retries: int = 3
    backoff_factor: float = 0.5
|
|
|
|
class EconomicsCollector:
    """Collector class for fetching economic data from Financial Modeling Prep API.

    Every public ``get_*`` method returns a list of plain dictionaries.
    Request failures are logged and reported as an empty list rather than
    raised.  The collector can be used as a context manager; leaving the
    ``with`` block closes the underlying HTTP session.
    """

    # Limits for the manual HTTP 429 handling in ``_make_request`` so that a
    # persistently throttled endpoint cannot recurse or slow down forever.
    _MAX_RATE_LIMIT_RETRIES = 3
    _MAX_RATE_LIMIT_PAUSE = 10.0

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the collector with API configuration.

        Args:
            api_key: API key. When omitted, ``load_dotenv()`` is called and
                the ``FMP_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no API key is available from either source.
        """
        load_dotenv()
        self.api_key = api_key or os.getenv('FMP_API_KEY')
        if not self.api_key:
            raise ValueError("API key is required. Set FMP_API_KEY environment variable or pass as parameter.")

        self.config = APIConfig()
        self.session = self._create_session()
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Configure and return logger instance.

        A stream handler is attached only when the logger has none yet, so
        creating several collectors does not duplicate log lines.
        """
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def _create_session(self) -> "requests.Session":
        """Create and configure a requests session with retry strategy.

        The retry strategy applies to GET requests that answer with one of
        the listed status codes and is mounted for both URL schemes.
        """
        session = requests.Session()
        retry_strategy = Retry(
            total=self.config.max_retries,
            backoff_factor=self.config.backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def _redact(self, text: str) -> str:
        """Return ``text`` with the API key masked.

        Exception messages raised by ``requests`` usually embed the full
        request URL, query string included, so they must not be logged
        verbatim.
        """
        if not self.api_key:
            return text
        return text.replace(self.api_key, '***')

    def _make_request(
        self,
        url: str,
        params: Dict[str, Any],
        _rate_limit_retries: int = 0,
    ) -> List[Dict[str, Any]]:
        """Make an API request with retry logic and error handling.

        Args:
            url: Endpoint URL without query string.
            params: Query parameters. The dict is not modified; the API key
                is added to a private copy.
            _rate_limit_retries: Internal counter of HTTP 429 retries already
                performed for this call. Callers should not pass it.

        Returns:
            The decoded JSON payload as a list (a JSON object is wrapped in
            a one-element list). An empty list is returned for empty
            payloads, HTTP 403, exhausted rate-limit retries and any error.
        """
        try:
            sleep(self.config.rate_limit_pause)

            # Copy so the caller's dict never ends up holding the API key.
            query = dict(params)
            query['apikey'] = self.api_key

            self.logger.debug(f"Making request to {url}")
            response = self.session.get(
                url,
                params=query,
                timeout=self.config.timeout
            )

            if response.status_code == 403:
                self.logger.error(f"Access denied to {url}. Please check your API subscription level.")
                return []

            if response.status_code == 429:
                # Bounded retry: give up instead of recursing indefinitely.
                if _rate_limit_retries >= self._MAX_RATE_LIMIT_RETRIES:
                    self.logger.error(
                        f"Rate limit still exceeded for {url} after "
                        f"{_rate_limit_retries} retries, giving up"
                    )
                    return []
                self.logger.warning("Rate limit reached, increasing pause time")
                current_pause = self.config.rate_limit_pause
                # Double the pause up to the cap, but never shrink a pause
                # that was deliberately configured above the cap.
                self.config.rate_limit_pause = max(
                    current_pause,
                    min(current_pause * 2, self._MAX_RATE_LIMIT_PAUSE),
                )
                sleep(self.config.rate_limit_pause)
                return self._make_request(url, params, _rate_limit_retries + 1)

            response.raise_for_status()
            data = response.json()

            if not data:
                self.logger.warning(f"Empty response received from {url}")
                return []

            if isinstance(data, dict):
                return [data]

            self.logger.debug(f"Received {len(data)} records from {url}")
            return data

        except requests.exceptions.Timeout:
            self.logger.error(f"Timeout error accessing {url}")
            return []
        except requests.exceptions.RequestException as e:
            self.logger.error(f"API request error for {url}: {self._redact(str(e))}")
            return []
        except ValueError as e:
            self.logger.error(f"JSON parsing error for {url}: {self._redact(str(e))}")
            return []
        except Exception as e:
            self.logger.error(f"Unexpected error accessing {url}: {self._redact(str(e))}")
            return []

    def _format_indicator_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format indicator data into a consistent structure.

        Each output record has the keys ``date``, ``value`` (float),
        ``unit`` and ``frequency``.  Records whose value cannot be converted
        to float, or that are not dict-like, are logged and skipped.
        """
        formatted_data = []
        for item in data:
            try:
                formatted_item = {
                    'date': item.get('date'),
                    'value': float(item.get('value', 0)),
                    'unit': item.get('unit', ''),
                    'frequency': item.get('frequency', '')
                }
            except (ValueError, TypeError, AttributeError) as e:
                # AttributeError covers non-dict items, which the other
                # formatters of this class also skip instead of raising.
                self.logger.error(f"Error formatting indicator data: {e}, Data: {item}")
                continue
            formatted_data.append(formatted_item)
        return formatted_data

    def get_treasury_rates(self, from_date: str, to_date: str) -> List[Dict[str, Any]]:
        """Get treasury rates for a date range.

        Args:
            from_date: Sent as the ``from`` query parameter.
            to_date: Sent as the ``to`` query parameter.

        Returns:
            One dict per record containing ``date`` plus every other field
            (except ``id``) whose value converts to float.
        """
        self.logger.info(f"Fetching treasury rates from {from_date} to {to_date}")
        params = {
            'from': from_date,
            'to': to_date
        }
        data = self._make_request(f"{self.config.base_url_v4}/treasury", params)

        formatted_data = []
        for item in data:
            try:
                formatted_item = {
                    'date': item.get('date'),
                }
                # Extract rate values
                for key, value in item.items():
                    if key not in ['date', 'id'] and value is not None:
                        try:
                            formatted_item[key] = float(value)
                        except (ValueError, TypeError):
                            # Non-numeric fields are silently left out.
                            continue
                formatted_data.append(formatted_item)
            except Exception as e:
                self.logger.error(f"Error formatting treasury rate: {e}, Data: {item}")
                continue

        return formatted_data

    def get_economic_indicator(self, indicator: Union[str, "IndicatorType"]) -> List[Dict[str, Any]]:
        """Get specific economic indicator data.

        Args:
            indicator: Either the raw indicator name or an ``IndicatorType``
                member, whose ``value`` is used as the name.

        Returns:
            Records shaped by ``_format_indicator_data``.
        """
        indicator_value = indicator if isinstance(indicator, str) else indicator.value

        self.logger.info(f"Fetching {indicator_value} indicator data")
        params = {
            'name': indicator_value
        }
        data = self._make_request(f"{self.config.base_url_v4}/economic", params)
        return self._format_indicator_data(data)

    @staticmethod
    def _format_calendar_event(event: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce one raw calendar event to the fields exposed by the collector."""
        # ``date`` may be absent or null; both yield an empty date string.
        # Only the part before the first space (the date without time) is kept.
        date_str = (event.get('date') or '').split(' ')[0]
        return {
            'date': date_str,
            'event': event.get('event', ''),
            'impact': event.get('impact', ''),
            'actual': event.get('actual'),
            'previous': event.get('previous'),
            'country': event.get('country', 'US')
        }

    def get_economic_calendar(self, from_date: str, to_date: str) -> List[Dict[str, Any]]:
        """Get economic calendar events.

        Args:
            from_date: Sent as the ``from`` query parameter.
            to_date: Sent as the ``to`` query parameter.

        Returns:
            One dict per event with the keys ``date``, ``event``, ``impact``,
            ``actual``, ``previous`` and ``country``.  Events that cannot be
            formatted are logged and skipped.
        """
        self.logger.info(f"Fetching economic calendar from {from_date} to {to_date}")
        params = {
            'from': from_date,
            'to': to_date
        }
        data = self._make_request(f"{self.config.base_url}/economic_calendar", params)

        formatted_data = []
        for event in data:
            try:
                formatted_data.append(self._format_calendar_event(event))
            except Exception as e:
                self.logger.error(f"Error formatting calendar event: {e}, Data: {event}")
                continue

        return formatted_data

    def get_market_risk_premium(self) -> List[Dict[str, Any]]:
        """Get market risk premium data.

        Returns:
            One dict per record with ``country``, ``country_risk_premium``
            and ``total_equity_risk_premium`` (both floats).  Records that
            cannot be converted are logged and skipped.
        """
        self.logger.info("Fetching market risk premium data")
        data = self._make_request(f"{self.config.base_url_v4}/market_risk_premium", {})

        formatted_data = []
        for item in data:
            try:
                formatted_item = {
                    'country': item.get('country', ''),
                    'country_risk_premium': float(item.get('countryRiskPremium', 0)),
                    'total_equity_risk_premium': float(item.get('totalEquityRiskPremium', 0))
                }
            except (ValueError, TypeError, AttributeError) as e:
                # AttributeError covers non-dict items.
                self.logger.error(f"Error formatting risk premium data: {e}, Data: {item}")
                continue
            formatted_data.append(formatted_item)

        return formatted_data

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with proper cleanup."""
        self.session.close()
|
|
import requests
|
|
import logging
|
|
from time import sleep
|
|
from typing import List, Dict, Optional, Any, Union
|
|
from datetime import datetime, timedelta
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.packages.urllib3.util.retry import Retry
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
class IndicatorType(Enum):
    """Names of the economic indicators that can be requested.

    The value of each member is the exact string that
    ``EconomicsCollector.get_economic_indicator`` sends as the ``name``
    query parameter.
    """

    # GDP series
    GDP = "gdp"
    REAL_GDP = "realGDP"
    NOMINAL_GDP = "nominalPotentialGDP"
    GDP_PER_CAPITA = "realGDPPerCapita"

    # Prices
    INFLATION = "inflationRate"
    CPI = "cpi"

    # Labour market
    UNEMPLOYMENT = "unemploymentRate"
    NONFARM_PAYROLL = "totalNonfarmPayroll"
    INITIAL_CLAIMS = "initialClaims"

    # Production, spending and sentiment
    INDUSTRIAL_PRODUCTION = "industrialProductionTotalIndex"
    RETAIL_SALES = "retailSales"
    CONSUMER_SENTIMENT = "consumerSentiment"
    DURABLE_GOODS = "durableGoods"
    HOUSING_STARTS = "newPrivatelyOwnedHousingUnitsStartedTotalUnits"
    VEHICLE_SALES = "totalVehicleSales"

    # Rates and recession probability
    INTEREST_RATE = "federalFunds"
    RECESSION_PROB = "smoothedUSRecessionProbabilities"
    CD_RATE = "3MonthOr90DayRatesAndYieldsCertificatesOfDeposit"
    CREDIT_RATE = "commercialBankInterestRateOnCreditCardPlansAllAccounts"
    MORTGAGE_30Y = "30YearFixedRateMortgageAverage"
    MORTGAGE_15Y = "15YearFixedRateMortgageAverage"
|
|
|
|
@dataclass
class APIConfig:
    """Connection, throttling and retry settings used by ``EconomicsCollector``."""

    # Root URLs of the v3 and v4 endpoint families.
    base_url: str = "https://financialmodelingprep.com/api/v3"
    base_url_v4: str = "https://financialmodelingprep.com/api/v4"

    # Seconds slept before every request; the collector raises this value
    # when the API answers with HTTP 429.
    rate_limit_pause: float = 0.2

    # Timeout handed to ``session.get`` for each request.
    timeout: int = 30

    # Retry budget and backoff factor handed to urllib3's ``Retry``.
    max_retries: int = 3
    backoff_factor: float = 0.5
|
|
|
|
class EconomicsCollector:
    """Collector class for fetching economic data from Financial Modeling Prep API.

    Every public ``get_*`` method returns a list of plain dictionaries.
    Request failures are logged and reported as an empty list rather than
    raised.  The collector can be used as a context manager; leaving the
    ``with`` block closes the underlying HTTP session.
    """

    # Limits for the manual HTTP 429 handling in ``_make_request`` so that a
    # persistently throttled endpoint cannot recurse or slow down forever.
    _MAX_RATE_LIMIT_RETRIES = 3
    _MAX_RATE_LIMIT_PAUSE = 10.0

    def __init__(self, api_key: Optional[str] = None):
        """Initialize the collector with API configuration.

        Args:
            api_key: API key. When omitted, ``load_dotenv()`` is called and
                the ``FMP_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no API key is available from either source.
        """
        load_dotenv()
        self.api_key = api_key or os.getenv('FMP_API_KEY')
        if not self.api_key:
            raise ValueError("API key is required. Set FMP_API_KEY environment variable or pass as parameter.")

        self.config = APIConfig()
        self.session = self._create_session()
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """Configure and return logger instance.

        A stream handler is attached only when the logger has none yet, so
        creating several collectors does not duplicate log lines.
        """
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def _create_session(self) -> "requests.Session":
        """Create and configure a requests session with retry strategy.

        The retry strategy applies to GET requests that answer with one of
        the listed status codes and is mounted for both URL schemes.
        """
        session = requests.Session()
        retry_strategy = Retry(
            total=self.config.max_retries,
            backoff_factor=self.config.backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def _redact(self, text: str) -> str:
        """Return ``text`` with the API key masked.

        Exception messages raised by ``requests`` usually embed the full
        request URL, query string included, so they must not be logged
        verbatim.
        """
        if not self.api_key:
            return text
        return text.replace(self.api_key, '***')

    def _make_request(
        self,
        url: str,
        params: Dict[str, Any],
        _rate_limit_retries: int = 0,
    ) -> List[Dict[str, Any]]:
        """Make an API request with retry logic and error handling.

        Args:
            url: Endpoint URL without query string.
            params: Query parameters. The dict is not modified; the API key
                is added to a private copy.
            _rate_limit_retries: Internal counter of HTTP 429 retries already
                performed for this call. Callers should not pass it.

        Returns:
            The decoded JSON payload as a list (a JSON object is wrapped in
            a one-element list). An empty list is returned for empty
            payloads, HTTP 403, exhausted rate-limit retries and any error.
        """
        try:
            sleep(self.config.rate_limit_pause)

            # Copy so the caller's dict never ends up holding the API key.
            query = dict(params)
            query['apikey'] = self.api_key

            self.logger.debug(f"Making request to {url}")
            response = self.session.get(
                url,
                params=query,
                timeout=self.config.timeout
            )

            if response.status_code == 403:
                self.logger.error(f"Access denied to {url}. Please check your API subscription level.")
                return []

            if response.status_code == 429:
                # Bounded retry: give up instead of recursing indefinitely.
                if _rate_limit_retries >= self._MAX_RATE_LIMIT_RETRIES:
                    self.logger.error(
                        f"Rate limit still exceeded for {url} after "
                        f"{_rate_limit_retries} retries, giving up"
                    )
                    return []
                self.logger.warning("Rate limit reached, increasing pause time")
                current_pause = self.config.rate_limit_pause
                # Double the pause up to the cap, but never shrink a pause
                # that was deliberately configured above the cap.
                self.config.rate_limit_pause = max(
                    current_pause,
                    min(current_pause * 2, self._MAX_RATE_LIMIT_PAUSE),
                )
                sleep(self.config.rate_limit_pause)
                return self._make_request(url, params, _rate_limit_retries + 1)

            response.raise_for_status()
            data = response.json()

            if not data:
                self.logger.warning(f"Empty response received from {url}")
                return []

            if isinstance(data, dict):
                return [data]

            self.logger.debug(f"Received {len(data)} records from {url}")
            return data

        except requests.exceptions.Timeout:
            self.logger.error(f"Timeout error accessing {url}")
            return []
        except requests.exceptions.RequestException as e:
            self.logger.error(f"API request error for {url}: {self._redact(str(e))}")
            return []
        except ValueError as e:
            self.logger.error(f"JSON parsing error for {url}: {self._redact(str(e))}")
            return []
        except Exception as e:
            self.logger.error(f"Unexpected error accessing {url}: {self._redact(str(e))}")
            return []

    def _format_indicator_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format indicator data into a consistent structure.

        Each output record has the keys ``date``, ``value`` (float),
        ``unit`` and ``frequency``.  Records whose value cannot be converted
        to float, or that are not dict-like, are logged and skipped.
        """
        formatted_data = []
        for item in data:
            try:
                formatted_item = {
                    'date': item.get('date'),
                    'value': float(item.get('value', 0)),
                    'unit': item.get('unit', ''),
                    'frequency': item.get('frequency', '')
                }
            except (ValueError, TypeError, AttributeError) as e:
                # AttributeError covers non-dict items, which the other
                # formatters of this class also skip instead of raising.
                self.logger.error(f"Error formatting indicator data: {e}, Data: {item}")
                continue
            formatted_data.append(formatted_item)
        return formatted_data

    def get_treasury_rates(self, from_date: str, to_date: str) -> List[Dict[str, Any]]:
        """Get treasury rates for a date range.

        Args:
            from_date: Sent as the ``from`` query parameter.
            to_date: Sent as the ``to`` query parameter.

        Returns:
            One dict per record containing ``date`` plus every other field
            (except ``id``) whose value converts to float.
        """
        self.logger.info(f"Fetching treasury rates from {from_date} to {to_date}")
        params = {
            'from': from_date,
            'to': to_date
        }
        data = self._make_request(f"{self.config.base_url_v4}/treasury", params)

        formatted_data = []
        for item in data:
            try:
                formatted_item = {
                    'date': item.get('date'),
                }
                # Extract rate values
                for key, value in item.items():
                    if key not in ['date', 'id'] and value is not None:
                        try:
                            formatted_item[key] = float(value)
                        except (ValueError, TypeError):
                            # Non-numeric fields are silently left out.
                            continue
                formatted_data.append(formatted_item)
            except Exception as e:
                self.logger.error(f"Error formatting treasury rate: {e}, Data: {item}")
                continue

        return formatted_data

    def get_economic_indicator(self, indicator: Union[str, "IndicatorType"]) -> List[Dict[str, Any]]:
        """Get specific economic indicator data.

        Args:
            indicator: Either the raw indicator name or an ``IndicatorType``
                member, whose ``value`` is used as the name.

        Returns:
            Records shaped by ``_format_indicator_data``.
        """
        indicator_value = indicator if isinstance(indicator, str) else indicator.value

        self.logger.info(f"Fetching {indicator_value} indicator data")
        params = {
            'name': indicator_value
        }
        data = self._make_request(f"{self.config.base_url_v4}/economic", params)
        return self._format_indicator_data(data)

    @staticmethod
    def _format_calendar_event(event: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce one raw calendar event to the fields exposed by the collector."""
        # ``date`` may be absent or null; both yield an empty date string.
        # Only the part before the first space (the date without time) is kept.
        date_str = (event.get('date') or '').split(' ')[0]
        return {
            'date': date_str,
            'event': event.get('event', ''),
            'impact': event.get('impact', ''),
            'actual': event.get('actual'),
            'previous': event.get('previous'),
            'country': event.get('country', 'US')
        }

    def get_economic_calendar(self, from_date: str, to_date: str) -> List[Dict[str, Any]]:
        """Get economic calendar events.

        Args:
            from_date: Sent as the ``from`` query parameter.
            to_date: Sent as the ``to`` query parameter.

        Returns:
            One dict per event with the keys ``date``, ``event``, ``impact``,
            ``actual``, ``previous`` and ``country``.  Events that cannot be
            formatted are logged and skipped.
        """
        self.logger.info(f"Fetching economic calendar from {from_date} to {to_date}")
        params = {
            'from': from_date,
            'to': to_date
        }
        data = self._make_request(f"{self.config.base_url}/economic_calendar", params)

        formatted_data = []
        for event in data:
            try:
                formatted_data.append(self._format_calendar_event(event))
            except Exception as e:
                self.logger.error(f"Error formatting calendar event: {e}, Data: {event}")
                continue

        return formatted_data

    def get_market_risk_premium(self) -> List[Dict[str, Any]]:
        """Get market risk premium data.

        Returns:
            One dict per record with ``country``, ``country_risk_premium``
            and ``total_equity_risk_premium`` (both floats).  Records that
            cannot be converted are logged and skipped.
        """
        self.logger.info("Fetching market risk premium data")
        data = self._make_request(f"{self.config.base_url_v4}/market_risk_premium", {})

        formatted_data = []
        for item in data:
            try:
                formatted_item = {
                    'country': item.get('country', ''),
                    'country_risk_premium': float(item.get('countryRiskPremium', 0)),
                    'total_equity_risk_premium': float(item.get('totalEquityRiskPremium', 0))
                }
            except (ValueError, TypeError, AttributeError) as e:
                # AttributeError covers non-dict items.
                self.logger.error(f"Error formatting risk premium data: {e}, Data: {item}")
                continue
            formatted_data.append(formatted_item)

        return formatted_data

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with proper cleanup."""
        self.session.close()
|