101 lines
4.0 KiB
Python
101 lines
4.0 KiB
Python
import logging
from datetime import date, datetime
from pathlib import Path
from time import sleep
from typing import Dict, List, Optional

import requests


class CommodityCollector:
    """Fetch commodity price history from the financialmodelingprep.com v3 API.

    The set of known commodities is read once, at construction time, from a
    plain-text listing with one ``SYMBOL # description`` entry per line.
    """

    def __init__(self, api_key: str, commodities_file: Optional[Path] = None):
        """Store API settings and load the commodity listing.

        Args:
            api_key: Key sent as the ``apikey`` query parameter on requests.
            commodities_file: Optional path to the commodity listing. When
                omitted, ``commodities.txt`` next to this module is used.
        """
        self.api_key = api_key
        self.base_url = "https://financialmodelingprep.com/api/v3"
        self.rate_limit_pause = 0.5  # base pause (seconds) before each request
        self.max_retries = 3

        self.commodities = self._load_commodities(commodities_file)
        logging.info(f"Initialized CommodityCollector with {len(self.commodities)} unique commodities")

    def _load_commodities(self, commodities_file: Optional[Path] = None) -> Dict[str, str]:
        """Load commodities and their descriptions from the listing file.

        Blank lines and lines starting with ``#`` are ignored. On every other
        line, the text before the first ``#`` is the symbol and the text after
        it (if any) is the description. A symbol that appears more than once
        keeps its last description.

        Args:
            commodities_file: Optional path to the listing; defaults to
                ``commodities.txt`` in this module's directory.

        Returns:
            Mapping of symbol to description. Empty when the file is missing
            or cannot be read; this method is best-effort and never raises.
        """
        try:
            if commodities_file is None:
                commodities_file = Path(__file__).parent / 'commodities.txt'
            else:
                commodities_file = Path(commodities_file)

            if not commodities_file.exists():
                logging.error(f"Commodities file not found: {commodities_file}")
                return {}

            commodities: Dict[str, str] = {}
            # Explicit encoding so parsing does not depend on the host locale.
            with open(commodities_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith('#'):
                        continue
                    symbol, _, description = line.partition('#')
                    symbol = symbol.strip()
                    if symbol:
                        commodities[symbol] = description.strip()

            logging.info(f"Loaded {len(commodities)} unique commodities")
            return commodities

        except Exception as e:
            # Deliberate best-effort boundary: a bad listing must not prevent
            # the collector from being constructed.
            logging.error(f"Error loading commodities: {e}")
            return {}

    def _redact(self, text: str) -> str:
        """Return *text* with every occurrence of the API key masked."""
        key = str(self.api_key) if self.api_key else ""
        return text.replace(key, "***") if key else text

    def _make_request(self, url: str, params: Dict) -> Optional[Dict]:
        """Make API request with retry logic.

        Before each attempt the call pauses for
        ``rate_limit_pause * (attempt + 1)`` seconds. After a failed attempt
        that raised a ``requests`` exception it additionally waits
        ``2 ** attempt`` seconds before retrying.

        Args:
            url: Full endpoint URL.
            params: Query parameters passed to ``requests.get``.

        Returns:
            The decoded JSON body, or ``None`` when the body is empty, all
            attempts fail, or an unexpected error occurs.
        """
        for attempt in range(self.max_retries):
            try:
                sleep(self.rate_limit_pause * (attempt + 1))
                # Never write the credential to the log.
                logged_params = {
                    name: ('***' if name == 'apikey' else value)
                    for name, value in (params or {}).items()
                }
                logging.debug(f"Making request to: {url} with params: {logged_params}")

                response = requests.get(url, params=params, timeout=10)
                response.raise_for_status()

                if not response.content:
                    logging.warning(f"Empty response from {url}")
                    return None

                return response.json()

            except requests.exceptions.RequestException as e:
                # The exception text may embed the full request URL, query
                # string included, so mask the key before logging it.
                logging.error(
                    f"API request error (attempt {attempt + 1}/{self.max_retries}): {self._redact(str(e))}"
                )
                if attempt == self.max_retries - 1:
                    return None
                sleep(2 ** attempt)
            except Exception as e:
                logging.error(f"Unexpected error: {self._redact(str(e))}")
                return None

        # Reached only when max_retries is 0 and no attempt was made.
        return None

    @staticmethod
    def _clean_record(record: Dict) -> Dict:
        """Convert one raw price record to ``date`` plus float OHLC fields.

        A missing or null price field becomes ``0.0``.

        Raises:
            KeyError: If the record has no ``date``.
            TypeError, ValueError: If a price field is not numeric.
        """
        cleaned = {'date': record['date']}
        for field in ('open', 'high', 'low', 'close'):
            value = record.get(field)
            cleaned[field] = float(value) if value is not None else 0.0
        return cleaned

    def get_historical_prices(self, symbol: str, from_date: date) -> List[Dict]:
        """Get historical price data for a commodity.

        Args:
            symbol: Commodity symbol appended to the
                ``historical-price-full`` endpoint path.
            from_date: Earliest date to request; sent as ``YYYY-MM-DD``.

        Returns:
            Records with ``date``, ``open``, ``high``, ``low`` and ``close``
            keys, sorted ascending by ``date``. Malformed records are skipped
            with a warning. Returns an empty list when no data is available
            or the request fails.
        """
        try:
            params = {
                'apikey': self.api_key,
                'from': from_date.strftime('%Y-%m-%d'),
            }

            url = f"{self.base_url}/historical-price-full/{symbol}"
            response = self._make_request(url, params)

            if response and isinstance(response, dict) and 'historical' in response:
                clean_data = []
                for record in response['historical']:
                    try:
                        clean_data.append(self._clean_record(record))
                    except (KeyError, TypeError, ValueError) as e:
                        # One bad record should not discard the whole series.
                        logging.warning(f"Skipping malformed record for {symbol}: {e!r}")

                clean_data.sort(key=lambda x: x['date'])
                logging.info(f"Retrieved {len(clean_data)} historical prices for {symbol}")
                return clean_data

            logging.warning(f"No historical data found for {symbol}")
            return []

        except Exception as e:
            logging.error(f"Error getting historical prices for {symbol}: {e}")
            return []