RivaCube/utils/Forex/collector.py
2025-02-04 19:31:18 +01:00

128 lines
5.1 KiB
Python

import requests
import logging
from time import sleep
from typing import List, Dict, Optional
from datetime import datetime
from pathlib import Path
class ForexCollector:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://financialmodelingprep.com/api/v3"
self.rate_limit_pause = 0.5
self.max_retries = 3
# Dictionary of alternative symbols for each pair
self.symbol_alternatives = {
'EURUSD': ['EUR/USD', 'EUR=X'],
'GBPUSD': ['GBP/USD', 'GBP=X'],
'USDJPY': ['USD/JPY', 'JPY=X'],
'USDCHF': ['USD/CHF', 'CHF=X'],
'AUDUSD': ['AUD/USD', 'AUD=X'],
'USDCAD': ['USD/CAD', 'CAD=X'],
# Add more alternatives as needed
}
self.pairs = self._load_pairs()
logging.info(f"Initialized ForexCollector with {len(self.pairs)} pairs")
def _load_pairs(self) -> Dict[str, str]:
try:
forex_file = Path(__file__).parent / 'forex.txt'
if not forex_file.exists():
logging.error(f"Forex file not found: {forex_file}")
return {}
pairs = {}
with open(forex_file, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split('#', 1)
symbol = parts[0].strip()
description = parts[1].strip() if len(parts) > 1 else ''
if symbol:
pairs[symbol] = description
logging.info(f"Loaded {len(pairs)} forex pairs from file")
return pairs
except Exception as e:
logging.error(f"Error loading forex pairs: {e}")
return {}
def _make_request(self, url: str, params: Dict, max_retries: int = 3) -> Optional[Dict]:
for attempt in range(max_retries):
try:
sleep(self.rate_limit_pause * (attempt + 1))
logging.debug(f"Making request to: {url} with params: {params}")
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
if not response.content:
logging.warning(f"Empty response from {url}")
return None
return response.json()
except requests.exceptions.RequestException as e:
logging.error(f"API request error (attempt {attempt + 1}/{max_retries}) for {url}: {str(e)}")
if attempt == max_retries - 1:
return None
sleep(2 ** attempt)
except Exception as e:
logging.error(f"Unexpected error for {url}: {str(e)}")
return None
return None
def _get_alternative_symbols(self, symbol: str) -> List[str]:
alternatives = [symbol]
if symbol in self.symbol_alternatives:
alternatives.extend(self.symbol_alternatives[symbol])
seen = set()
return [x for x in alternatives if not (x in seen or seen.add(x))]
def get_historical_prices(self, symbol: str, from_date: str = None, to_date: str = None) -> List[Dict]:
try:
params = {
'apikey': self.api_key,
'from': from_date if from_date else '1900-01-01',
'to': to_date if to_date else datetime.now().strftime('%Y-%m-%d')
}
alternatives = self._get_alternative_symbols(symbol)
for alt_symbol in alternatives:
url = f"{self.base_url}/historical-price-full/{alt_symbol}"
response = self._make_request(url, params)
if response and isinstance(response, dict) and 'historical' in response:
historical_data = response['historical']
clean_data = [{
'date': record['date'],
'open': float(record.get('open', 0)),
'high': float(record.get('high', 0)),
'low': float(record.get('low', 0)),
'close': float(record.get('close', 0))
} for record in historical_data]
clean_data.sort(key=lambda x: x['date'])
logging.info(f"Retrieved {len(clean_data)} historical prices for {symbol}")
return clean_data
sleep(self.rate_limit_pause)
logging.warning(f"No historical data found for {symbol}")
return []
except Exception as e:
logging.error(f"Error getting historical prices for {symbol}: {e}")
return []
def get_pairs(self) -> List[Dict]:
return [
{'symbol': symbol, 'description': desc}
for symbol, desc in self.pairs.items()
if symbol # Ensure symbol is not empty
]