# RivaCube/utils/Commodities/collector.py
# 2025-02-04 19:31:18 +01:00
#
# 101 lines
# 4.0 KiB
# Python

import logging
from datetime import date, datetime
from pathlib import Path
from time import sleep
from typing import Dict, List, Optional

import requests
class CommodityCollector:
    """Fetch historical commodity prices from the Financial Modeling Prep API.

    The known commodity symbols are read once, at construction time, from a
    ``commodities.txt`` file located in the same directory as this module.
    """

    def __init__(self, api_key: str):
        """Store the API settings and load the commodity list.

        Args:
            api_key: Key sent as the ``apikey`` query parameter on requests.
        """
        self.api_key = api_key
        self.base_url = "https://financialmodelingprep.com/api/v3"
        # Base pause in seconds before each request; multiplied by the
        # 1-based attempt number in _make_request.
        self.rate_limit_pause = 0.5
        self.max_retries = 3
        self.commodities = self._load_commodities()
        logging.info(f"Initialized CommodityCollector with {len(self.commodities)} unique commodities")

    @staticmethod
    def _parse_commodities(lines) -> Dict[str, str]:
        """Parse ``SYMBOL # description`` lines into a symbol -> description map.

        Blank lines and lines starting with ``#`` are ignored. A line without
        a ``#`` yields an empty description. A repeated symbol keeps the
        description of its last occurrence.

        Args:
            lines: Any iterable of text lines (an open text file works).
        """
        commodities: Dict[str, str] = {}
        for raw_line in lines:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            # Split on the first '#' only, so descriptions may contain '#'.
            symbol, _, description = line.partition('#')
            symbol = symbol.strip()
            if symbol:
                commodities[symbol] = description.strip()
        return commodities

    def _load_commodities(self) -> Dict[str, str]:
        """Load commodities and their descriptions from commodities.txt file.

        Returns:
            Mapping of symbol to description, or an empty dict when the file
            is missing or cannot be read (the problem is logged, not raised).
        """
        commodities_file = Path(__file__).parent / 'commodities.txt'
        try:
            # Explicit encoding so parsing does not depend on the OS locale.
            with open(commodities_file, 'r', encoding='utf-8') as f:
                commodities = self._parse_commodities(f)
        except FileNotFoundError:
            logging.error(f"Commodities file not found: {commodities_file}")
            return {}
        except (OSError, UnicodeError) as e:
            logging.error(f"Error loading commodities: {e}")
            return {}
        logging.info(f"Loaded {len(commodities)} unique commodities")
        return commodities

    def _redact(self, text: str) -> str:
        """Return ``text`` with any occurrence of the API key masked."""
        key = str(self.api_key) if self.api_key else ''
        return text.replace(key, '***') if key else text

    def _make_request(self, url: str, params: Dict) -> Optional[Dict]:
        """Make API request with retry logic.

        Each attempt is preceded by a pause of ``rate_limit_pause`` times the
        1-based attempt number; after a failed attempt (other than the last)
        an additional ``2 ** attempt`` seconds is slept before retrying.

        Args:
            url: Full endpoint URL.
            params: Query parameters passed to ``requests.get``.

        Returns:
            The decoded JSON body (whatever ``response.json()`` yields), or
            ``None`` on an empty body, after the final failed attempt, or on
            any unexpected error.
        """
        # Never write the API key to the logs.
        safe_params = {
            key: ('***' if key == 'apikey' else value)
            for key, value in (params or {}).items()
        }
        for attempt in range(self.max_retries):
            try:
                sleep(self.rate_limit_pause * (attempt + 1))
                logging.debug(f"Making request to: {url} with params: {safe_params}")
                response = requests.get(url, params=params, timeout=10)
                response.raise_for_status()
                if not response.content:
                    logging.warning(f"Empty response from {url}")
                    return None
                return response.json()
            except requests.exceptions.RequestException as e:
                # The exception text can embed the request URL including the
                # query string, so mask the key before logging it.
                logging.error(
                    f"API request error (attempt {attempt + 1}/{self.max_retries}): "
                    f"{self._redact(str(e))}"
                )
                if attempt == self.max_retries - 1:
                    return None
                sleep(2 ** attempt)
            except Exception as e:
                logging.error(f"Unexpected error: {self._redact(str(e))}")
                return None
        # Reached only when max_retries <= 0 and no attempt was made.
        return None

    @staticmethod
    def _clean_record(record) -> Optional[Dict]:
        """Normalise one raw price record, or return ``None`` if unusable.

        A record is unusable when it is not a dict, has no ``date`` value, or
        has a price field that cannot be converted to ``float`` (for example
        JSON ``null``). A price field that is absent altogether defaults to 0.
        """
        if not isinstance(record, dict) or record.get('date') is None:
            return None
        try:
            return {
                'date': record['date'],
                'open': float(record.get('open', 0)),
                'high': float(record.get('high', 0)),
                'low': float(record.get('low', 0)),
                'close': float(record.get('close', 0)),
            }
        except (TypeError, ValueError):
            return None

    def get_historical_prices(self, symbol: str, from_date: date) -> List[Dict]:
        """Get historical price data for a commodity.

        Args:
            symbol: Commodity symbol appended to the
                ``historical-price-full`` endpoint path.
            from_date: Earliest date to request; sent as ``YYYY-MM-DD`` in
                the ``from`` query parameter.

        Returns:
            A list of dicts with keys ``date``, ``open``, ``high``, ``low``
            and ``close`` (prices as floats), sorted ascending by ``date``.
            Malformed records are skipped individually. An empty list is
            returned when no data is available or on any error; this method
            does not raise.
        """
        try:
            params = {
                'apikey': self.api_key,
                'from': from_date.strftime('%Y-%m-%d')
            }
            url = f"{self.base_url}/historical-price-full/{symbol}"
            response = self._make_request(url, params)
            if response and isinstance(response, dict) and 'historical' in response:
                historical_data = response['historical']
                # Clean record by record so one bad row cannot discard the
                # whole history.
                cleaned = [self._clean_record(record) for record in historical_data]
                clean_data = [record for record in cleaned if record is not None]
                skipped = len(cleaned) - len(clean_data)
                if skipped:
                    logging.warning(f"Skipped {skipped} malformed historical records for {symbol}")
                clean_data.sort(key=lambda x: x['date'])
                logging.info(f"Retrieved {len(clean_data)} historical prices for {symbol}")
                return clean_data
            logging.warning(f"No historical data found for {symbol}")
            return []
        except Exception as e:
            logging.error(f"Error getting historical prices for {symbol}: {e}")
            return []