128 lines
5.1 KiB
Python
128 lines
5.1 KiB
Python
import requests
|
|
import logging
|
|
from time import sleep
|
|
from typing import List, Dict, Optional
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
class ForexCollector:
    """Collect historical forex prices from the Financial Modeling Prep API.

    The tracked pairs are read from a ``forex.txt`` file that lives next to
    this module.  Each non-comment line holds a symbol, optionally followed
    by ``# description``.
    """

    # Placeholder written to the log wherever the API key would appear.
    _REDACTED = '***'

    def __init__(self, api_key: str):
        """Store the API settings and load the tracked pairs.

        Args:
            api_key: Value sent as the ``apikey`` query parameter.
        """
        self.api_key = api_key
        self.base_url = "https://financialmodelingprep.com/api/v3"
        self.rate_limit_pause = 0.5
        self.max_retries = 3

        # Dictionary of alternative symbols for each pair
        self.symbol_alternatives = {
            'EURUSD': ['EUR/USD', 'EUR=X'],
            'GBPUSD': ['GBP/USD', 'GBP=X'],
            'USDJPY': ['USD/JPY', 'JPY=X'],
            'USDCHF': ['USD/CHF', 'CHF=X'],
            'AUDUSD': ['AUD/USD', 'AUD=X'],
            'USDCAD': ['USD/CAD', 'CAD=X'],
            # Add more alternatives as needed
        }

        self.pairs = self._load_pairs()
        logging.info("Initialized ForexCollector with %d pairs", len(self.pairs))

    def _load_pairs(self, forex_file: Optional[Path] = None) -> Dict[str, str]:
        """Read the symbol -> description mapping from a pairs file.

        Blank lines and lines starting with ``#`` are ignored.  On every
        other line the text before the first ``#`` is the symbol and the
        text after it (if any) is the description.

        Args:
            forex_file: File to read.  Defaults to ``forex.txt`` in the
                directory of this module.

        Returns:
            The parsed mapping, or an empty dict when the file is missing
            or cannot be read (the problem is logged, not raised).
        """
        try:
            if forex_file is None:
                forex_file = Path(__file__).parent / 'forex.txt'
            else:
                forex_file = Path(forex_file)

            if not forex_file.exists():
                logging.error("Forex file not found: %s", forex_file)
                return {}

            pairs: Dict[str, str] = {}
            # Explicit encoding keeps parsing independent of the platform
            # default; "utf-8-sig" also drops a BOM that would otherwise be
            # glued to the first symbol.
            with open(forex_file, 'r', encoding='utf-8-sig') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if not line or line.startswith('#'):
                        continue
                    symbol, _, description = line.partition('#')
                    symbol = symbol.strip()
                    if symbol:
                        pairs[symbol] = description.strip()

            logging.info("Loaded %d forex pairs from file", len(pairs))
            return pairs
        except Exception as e:
            # Deliberate best effort: a broken pairs file must not prevent
            # the collector from being constructed.
            logging.error("Error loading forex pairs: %s", e)
            return {}

    def _redact(self, text: str) -> str:
        """Return *text* with every occurrence of the API key masked."""
        if not self.api_key:
            return text
        return text.replace(str(self.api_key), self._REDACTED)

    def _make_request(self, url: str, params: Dict, max_retries: Optional[int] = None) -> Optional[Dict]:
        """GET *url* and return the decoded JSON body, retrying on failure.

        Before each attempt the method sleeps ``rate_limit_pause`` scaled by
        the attempt number; after a failed attempt (except the last) it also
        sleeps ``2 ** attempt`` seconds.

        Args:
            url: Endpoint to call.
            params: Query parameters, including the API key.
            max_retries: Number of attempts; defaults to ``self.max_retries``.

        Returns:
            The parsed JSON, or ``None`` when the body is empty, all attempts
            failed, or an unexpected error occurred.
        """
        if max_retries is None:
            max_retries = self.max_retries

        # The key must never reach the log, so log a masked copy instead.
        safe_params = {
            key: (self._REDACTED if key == 'apikey' else value)
            for key, value in (params or {}).items()
        }

        for attempt in range(max_retries):
            try:
                sleep(self.rate_limit_pause * (attempt + 1))
                logging.debug("Making request to: %s with params: %s", url, safe_params)

                response = requests.get(url, params=params, timeout=10)
                response.raise_for_status()

                if not response.content:
                    logging.warning("Empty response from %s", url)
                    return None

                return response.json()
            except requests.exceptions.RequestException as e:
                # HTTP error messages embed the full URL (query string and
                # key included), hence the redaction.
                logging.error(
                    "API request error (attempt %d/%d) for %s: %s",
                    attempt + 1, max_retries, url, self._redact(str(e)),
                )
                if attempt == max_retries - 1:
                    return None
                sleep(2 ** attempt)
            except Exception as e:
                logging.error("Unexpected error for %s: %s", url, self._redact(str(e)))
                return None
        return None

    def _get_alternative_symbols(self, symbol: str) -> List[str]:
        """Return *symbol* followed by its known alternatives, without duplicates."""
        candidates = [symbol, *self.symbol_alternatives.get(symbol, [])]
        # dict.fromkeys de-duplicates while keeping first-seen order.
        return list(dict.fromkeys(candidates))

    @staticmethod
    def _to_float(value, default: float = 0.0) -> float:
        """Convert *value* to float, falling back to *default* if impossible."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _clean_records(self, records) -> List[Dict]:
        """Normalise raw API records to date/open/high/low/close dicts.

        Entries that are not dicts or have no date are skipped.  Missing,
        null or non-numeric prices become ``0.0`` (the same fallback used
        for absent keys).
        """
        cleaned = []
        for record in records:
            if not isinstance(record, dict) or not record.get('date'):
                continue
            cleaned.append({
                'date': record['date'],
                'open': self._to_float(record.get('open')),
                'high': self._to_float(record.get('high')),
                'low': self._to_float(record.get('low')),
                'close': self._to_float(record.get('close')),
            })
        return cleaned

    def get_historical_prices(self, symbol: str, from_date: Optional[str] = None, to_date: Optional[str] = None) -> List[Dict]:
        """Fetch daily OHLC records for *symbol*, oldest first.

        The symbol itself is tried first, then each configured alternative,
        until one of them yields at least one usable record.

        Args:
            symbol: Pair symbol, e.g. ``'EURUSD'``.
            from_date: Start date sent as ``from``; defaults to ``'1900-01-01'``.
            to_date: End date sent as ``to``; defaults to today (``%Y-%m-%d``).

        Returns:
            A list of dicts with keys ``date``, ``open``, ``high``, ``low``
            and ``close`` sorted by ``date``, or ``[]`` when nothing was
            found or an error occurred (errors are logged, not raised).
        """
        try:
            params = {
                'apikey': self.api_key,
                'from': from_date if from_date else '1900-01-01',
                'to': to_date if to_date else datetime.now().strftime('%Y-%m-%d'),
            }

            for alt_symbol in self._get_alternative_symbols(symbol):
                # NOTE(review): alternatives such as 'EUR/USD' add a path
                # segment here - confirm the API expects that form.
                url = f"{self.base_url}/historical-price-full/{alt_symbol}"
                response = self._make_request(url, params)

                historical = response.get('historical') if isinstance(response, dict) else None
                if isinstance(historical, list):
                    clean_data = self._clean_records(historical)
                    if clean_data:
                        clean_data.sort(key=lambda row: row['date'])
                        logging.info("Retrieved %d historical prices for %s", len(clean_data), symbol)
                        return clean_data

                sleep(self.rate_limit_pause)

            logging.warning("No historical data found for %s", symbol)
            return []
        except Exception as e:
            logging.error("Error getting historical prices for %s: %s", symbol, e)
            return []

    def get_pairs(self) -> List[Dict]:
        """Return the loaded pairs as ``{'symbol': ..., 'description': ...}`` dicts."""
        return [
            {'symbol': symbol, 'description': desc}
            for symbol, desc in self.pairs.items()
            if symbol  # Ensure symbol is not empty
        ]
|