RivaCube/utils/Indexes/collector.py
2025-02-04 19:31:18 +01:00

227 lines
8.6 KiB
Python

import logging
from datetime import datetime
from pathlib import Path
from time import sleep
from typing import Dict, List, Optional, Union

import requests


class MarketIndexCollector:
    """Collect quotes and price history for the indexes listed in indexes.txt.

    Index symbols are read from an ``indexes.txt`` file located next to this
    module. Every lookup is attempted with the configured symbol first and
    then with a list of alternative tickers until one of them yields data.

    Attributes:
        api_key: Key sent as the ``apikey`` query parameter on every request.
        base_url: Root of the v3 endpoints used by this class.
        base_url_v4: Root of the v4 endpoints (not used by the methods here).
        rate_limit_pause: Base pause, in seconds, between requests.
        stats: Counters updated by ``_make_request`` and ``get_index_quotes``.
            ``requests`` counts calls to ``_make_request``, ``failed`` counts
            individual failed attempts (so it can exceed ``requests``), and
            ``successful`` counts calls that returned decoded JSON.
        symbol_alternatives: Fallback tickers keyed by the '^'-less symbol.
        indexes: Mapping of symbol to description loaded from indexes.txt.
    """

    # Seconds to wait on the server before an attempt is abandoned; without a
    # timeout a stalled connection would block the collector indefinitely.
    request_timeout = 30

    def __init__(self, api_key: str):
        """Store the API key, set defaults and load the index list.

        Args:
            api_key: Key sent with every API request.
        """
        self.api_key = api_key
        self.base_url = "https://financialmodelingprep.com/api/v3"
        self.base_url_v4 = "https://financialmodelingprep.com/api/v4"
        self.rate_limit_pause = 0.5
        self.stats = {
            'requests': 0,
            'successful': 0,
            'failed': 0,
            'last_update': None
        }
        # Simplified symbol alternatives for core indexes
        self.symbol_alternatives = {
            'SPX': ['GSPC', 'INX', 'SP500'],
            'NDX': ['IXIC', 'QQQ', 'COMP'],
            'DJI': ['DJX', 'INDU'],
            'STOXX50E': ['SX5E', 'STOXX50E'],
            'MSCIWORLD': ['WRLD', 'MXWO'],
            'HSI': ['HK50', 'HSIX'],
            'N300': ['NKY', 'NI300'],
            'GSPE': ['SPEN', 'SPEG']
        }
        self.indexes = self._load_indexes()
        logging.info(f"Initialized MarketIndexCollector with {len(self.indexes)} unique indexes")

    def _load_indexes(self) -> Dict[str, str]:
        """Load index symbols and their descriptions from indexes.txt.

        Each non-blank line that does not start with '#' has the form
        ``SYMBOL # description``; the description part is optional.

        Returns:
            Mapping of symbol to description. Empty when the file is missing
            or cannot be read.
        """
        index_file = Path(__file__).parent / 'indexes.txt'
        if not index_file.exists():
            logging.error(f"Index file not found: {index_file}")
            return {}

        indexes = {}
        try:
            # Explicit encoding so parsing does not depend on the OS locale.
            with open(index_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith('#'):
                        continue
                    # Text after the first '#' is the description.
                    symbol, _, description = line.partition('#')
                    symbol = symbol.strip()
                    if symbol:
                        indexes[symbol] = description.strip()
        except (OSError, UnicodeError) as e:
            logging.error(f"Error loading indexes: {e}")
            return {}

        logging.info(f"Loaded {len(indexes)} unique indexes")
        return indexes

    def _redact(self, text: str) -> str:
        """Return ``text`` with the API key masked (best effort) for logging."""
        if not self.api_key:
            return text
        return text.replace(self.api_key, '***')

    def _make_request(self, url: str, params: Dict, max_retries: int = 3) -> Union[List[Dict], Dict]:
        """Make an API request with retry logic.

        Args:
            url: Endpoint to call.
            params: Query parameters, including the ``apikey`` entry.
            max_retries: Maximum number of attempts.

        Returns:
            The decoded JSON payload (a list or a dict, depending on the
            endpoint), or an empty list when the body is empty, every attempt
            failed, or ``max_retries`` is not positive.
        """
        self.stats['requests'] += 1
        # Never write the key itself to the log.
        safe_params = {k: ('***' if k == 'apikey' else v) for k, v in params.items()}

        for attempt in range(max_retries):
            try:
                # Pause grows with each attempt to stay under the rate limit.
                sleep(self.rate_limit_pause * (attempt + 1))
                logging.debug(f"Making request to: {url} with params: {safe_params}")
                response = requests.get(url, params=params, timeout=self.request_timeout)
                response.raise_for_status()
                if not response.content:
                    logging.warning(f"Empty response from {url}")
                    return []
                data = response.json()
                self.stats['successful'] += 1
                logging.debug(f"Received response for {url}")
                return data
            except requests.exceptions.RequestException as e:
                self.stats['failed'] += 1
                # requests embeds the full URL (query string included) in its
                # error text, so mask the key before logging it.
                logging.error(
                    f"API request error (attempt {attempt + 1}/{max_retries}) for {url}: {self._redact(str(e))}"
                )
                if attempt == max_retries - 1:
                    return []
                sleep(2 ** attempt)  # exponential back-off before retrying
            except Exception as e:
                # Anything else (e.g. an undecodable body) is not retried.
                self.stats['failed'] += 1
                logging.error(f"Unexpected error for {url}: {self._redact(str(e))}")
                return []

        # Reached only when max_retries <= 0; keep the return type consistent.
        return []

    def _get_alternative_symbols(self, symbol: str) -> List[str]:
        """Get the ordered list of symbols to try for ``symbol``.

        The list starts with the symbol as given, followed by the symbol
        without '^' and any known alternatives, with duplicates removed.
        """
        alternatives = [symbol]
        clean_symbol = symbol.replace('^', '')
        # Add the original symbol without '^'
        if clean_symbol != symbol:
            alternatives.append(clean_symbol)
        # Add known alternatives
        alternatives.extend(self.symbol_alternatives.get(clean_symbol, []))
        # dict keys keep insertion order, so this de-duplicates in order.
        return list(dict.fromkeys(alternatives))

    def get_historical_prices(self, symbol: str, from_date: Optional[str] = None) -> List[Dict]:
        """Get historical price data for an index.

        Args:
            symbol: Index symbol; alternatives are tried when it yields nothing.
            from_date: Value for the ``from`` query parameter. Defaults to
                '1900-01-01' when omitted or empty.

        Returns:
            The ``historical`` list from the first response that contains one,
            or an empty list when no alternative returns data.
        """
        try:
            params = {
                'apikey': self.api_key,
                'from': from_date or '1900-01-01'
            }
            alternatives = self._get_alternative_symbols(symbol)
            logging.debug(f"Trying alternatives for {symbol}: {alternatives}")
            for alt_symbol in alternatives:
                url = f"{self.base_url}/historical-price-full/{alt_symbol}"
                response = self._make_request(url, params)
                if isinstance(response, dict) and 'historical' in response:
                    historical_data = response['historical']
                    logging.info(
                        f"Retrieved {len(historical_data)} historical prices for {symbol} using {alt_symbol}"
                    )
                    return historical_data
                sleep(self.rate_limit_pause)
            logging.warning(f"No historical data found for {symbol} after trying alternatives")
            return []
        except Exception as e:
            logging.error(f"Error getting historical prices for {symbol}: {e}")
            return []

    def _fetch_quote(self, symbol: str) -> List[Dict]:
        """Fetch the quote for one index, trying each alternative symbol.

        Returns the response list from the first alternative that yields a
        non-empty quote record, with that record's ``symbol`` and
        ``description`` rewritten to the configured index; otherwise ``[]``.
        """
        params = {'apikey': self.api_key}
        for alt_symbol in self._get_alternative_symbols(symbol):
            url = f"{self.base_url}/quote/{alt_symbol}"
            result = self._make_request(url, params)
            # Only accept a list whose first entry is a non-empty dict; any
            # other payload counts as "no data" rather than raising.
            if isinstance(result, list) and result and isinstance(result[0], dict) and result[0]:
                result[0]['symbol'] = symbol  # Use original symbol
                result[0]['description'] = self.indexes[symbol]
                return result
            sleep(self.rate_limit_pause)
        return []

    def get_index_quotes(self) -> List[Dict]:
        """Get quotes for all loaded indexes.

        Returns:
            Quote records for every index that could be retrieved, in the
            order the indexes were loaded. Empty when no indexes are loaded
            or an unexpected error occurs. ``stats['last_update']`` is set on
            successful completion.
        """
        if not self.indexes:
            logging.error("No indexes loaded")
            return []
        try:
            all_results = []
            failed_symbols = set()
            for symbol in list(self.indexes):
                quotes = self._fetch_quote(symbol)
                if quotes:
                    all_results.extend(quotes)
                else:
                    failed_symbols.add(symbol)
            if failed_symbols:
                logging.warning(f"Failed to retrieve data for: {', '.join(sorted(failed_symbols))}")
            self.stats['last_update'] = datetime.now()
            return all_results
        except Exception as e:
            logging.error(f"Error getting index quotes: {e}")
            return []

    def get_index_info(self) -> List[Dict]:
        """Get symbol, description and category for every loaded index."""
        return [
            {
                'symbol': symbol,
                'description': desc,
                'category': self._categorize_index(symbol)
            }
            for symbol, desc in self.indexes.items()
        ]

    def _categorize_index(self, symbol: str) -> str:
        """Categorize an index based on its symbol.

        Matching is case-insensitive and ignores '^', so that '^GSPE' and
        'GSPE' land in the same category.

        Returns:
            One of 'US Major', 'S&P 500 Sectors', 'Europe', 'Asia', 'Global'
            or 'Other'.
        """
        symbol = symbol.upper().replace('^', '')
        if any(x in symbol for x in ('SPX', 'DJI', 'NDX', 'RUI', 'NYA')):
            return 'US Major'
        if 'SP500-' in symbol or symbol == 'GSPE':
            return 'S&P 500 Sectors'
        if any(x in symbol for x in ('STOXX', 'DAX', 'CAC', 'FTSE', 'IBEX', 'AEX')):
            return 'Europe'
        if any(x in symbol for x in ('HSI', 'N300', 'KOSPI')):
            return 'Asia'
        if 'MSCI' in symbol:
            return 'Global'
        return 'Other'

    def get_stats(self) -> Dict:
        """Get collector statistics.

        Returns:
            A copy of ``stats`` plus ``total_indexes``, with ``last_update``
            converted to an ISO-8601 string (or ``None`` if never updated).
        """
        last_update = self.stats['last_update']
        return {
            **self.stats,
            'total_indexes': len(self.indexes),
            'last_update': last_update.isoformat() if last_update else None
        }