# Listing metadata left over from the file viewer: 227 lines, 8.6 KiB, Python.
import logging
from datetime import datetime
from pathlib import Path
from time import sleep
from typing import Dict, List, Optional, Tuple, Union

import requests


class MarketIndexCollector:
    """Collect quotes and historical prices for market indexes.

    Data is fetched over HTTP from the financialmodelingprep.com v3 API.
    The set of indexes to track is read from an ``indexes.txt`` file located
    next to this module. When a symbol cannot be resolved, a list of
    alternative spellings (see ``symbol_alternatives``) is tried in order.
    """

    # Seconds to wait on a single HTTP call before giving up. Without a
    # timeout ``requests`` can block forever on a stalled connection.
    # Defined at class level so callers may override it per instance.
    request_timeout = 30

    def __init__(self, api_key: str):
        """Create a collector.

        Args:
            api_key: Key sent as the ``apikey`` query parameter on every
                request.
        """
        self.api_key = api_key
        self.base_url = "https://financialmodelingprep.com/api/v3"
        self.base_url_v4 = "https://financialmodelingprep.com/api/v4"
        # Base pause (seconds) applied before requests and between retries
        # of alternative symbols.
        self.rate_limit_pause = 0.5
        # 'requests' counts calls to _make_request; 'failed' counts failed
        # *attempts*, so with retries it can exceed 'requests'.
        self.stats = {
            'requests': 0,
            'successful': 0,
            'failed': 0,
            'last_update': None
        }
        # Simplified symbol alternatives for core indexes
        # (keys are symbols with any '^' removed).
        self.symbol_alternatives = {
            'SPX': ['GSPC', 'INX', 'SP500'],
            'NDX': ['IXIC', 'QQQ', 'COMP'],
            'DJI': ['DJX', 'INDU'],
            'STOXX50E': ['SX5E', 'STOXX50E'],
            'MSCIWORLD': ['WRLD', 'MXWO'],
            'HSI': ['HK50', 'HSIX'],
            'N300': ['NKY', 'NI300'],
            'GSPE': ['SPEN', 'SPEG']
        }
        self.indexes = self._load_indexes()
        logging.info("Initialized MarketIndexCollector with %d unique indexes", len(self.indexes))

    @staticmethod
    def _parse_index_line(line: str) -> Optional[Tuple[str, str]]:
        """Parse one line of ``indexes.txt`` into ``(symbol, description)``.

        The expected format is ``SYMBOL # description``; the description is
        optional. Returns ``None`` for blank lines, comment lines (starting
        with ``#``) and lines whose symbol part is empty.
        """
        line = line.strip()
        if not line or line.startswith('#'):
            return None
        symbol, _, description = line.partition('#')
        symbol = symbol.strip()
        if not symbol:
            return None
        return symbol, description.strip()

    def _load_indexes(self) -> Dict[str, str]:
        """Load index symbols and their descriptions from indexes.txt file.

        Returns:
            Mapping of symbol to description. Empty when the file is missing
            or cannot be read; errors are logged, never raised.
        """
        try:
            index_file = Path(__file__).parent / 'indexes.txt'
            indexes = {}
            try:
                # Explicit encoding keeps parsing platform-independent;
                # errors='replace' stops one stray byte from discarding the
                # whole index list.
                with open(index_file, 'r', encoding='utf-8', errors='replace') as f:
                    for line in f:
                        parsed = self._parse_index_line(line)
                        if parsed is not None:
                            symbol, description = parsed
                            indexes[symbol] = description
            except FileNotFoundError:
                logging.error("Index file not found: %s", index_file)
                return {}

            logging.info("Loaded %d unique indexes", len(indexes))
            return indexes

        except Exception as e:
            # Best-effort loader: a broken index file must not prevent the
            # collector from being constructed.
            logging.error("Error loading indexes: %s", e)
            return {}

    def _redact(self, text: str) -> str:
        """Return *text* with the API key masked so it never reaches the logs."""
        key = getattr(self, 'api_key', None)
        if not key:
            # str.replace('') would insert the mask between every character.
            return text
        # NOTE(review): a key containing URL-special characters would appear
        # percent-encoded in URLs and not be matched here - confirm keys are
        # plain alphanumerics.
        return text.replace(str(key), '***')

    def _make_request(self, url: str, params: Dict, max_retries: int = 3) -> Union[List, Dict]:
        """Make API request with retry logic.

        Args:
            url: Endpoint to GET.
            params: Query parameters (normally including ``apikey``).
            max_retries: Maximum number of attempts.

        Returns:
            The decoded JSON payload (a list or a dict, depending on the
            endpoint), or ``[]`` when the response is empty or every attempt
            failed. Never raises.
        """
        self.stats['requests'] += 1

        for attempt in range(max_retries):
            try:
                # Pause grows linearly with the attempt number to ease off
                # the rate limit.
                sleep(self.rate_limit_pause * (attempt + 1))
                logging.debug(
                    "Making request to: %s with params: %s",
                    url, self._redact(str(params)),
                )

                response = requests.get(url, params=params, timeout=self.request_timeout)
                response.raise_for_status()

                if not response.content:
                    logging.warning("Empty response from %s", url)
                    return []

                data = response.json()
                self.stats['successful'] += 1
                logging.debug("Received response for %s", url)
                return data

            except requests.exceptions.RequestException as e:
                self.stats['failed'] += 1
                # HTTPError messages embed the full URL, query string
                # included, so the key must be masked before logging.
                logging.error(
                    "API request error (attempt %d/%d) for %s: %s",
                    attempt + 1, max_retries, url, self._redact(str(e)),
                )
                if attempt == max_retries - 1:
                    return []
                # Exponential back-off before the next attempt.
                sleep(2 ** attempt)
            except Exception as e:
                self.stats['failed'] += 1
                logging.error("Unexpected error for %s: %s", url, self._redact(str(e)))
                return []

        # Reached only when max_retries <= 0; keep the "always a container"
        # contract instead of implicitly returning None.
        return []

    def _get_alternative_symbols(self, symbol: str) -> List[str]:
        """Get list of alternative symbols to try.

        The result starts with *symbol* itself, then the symbol with every
        '^' removed, then any known alternatives, with duplicates dropped
        while preserving order.
        """
        clean_symbol = symbol.replace('^', '')
        candidates = [symbol, clean_symbol]
        candidates.extend(self.symbol_alternatives.get(clean_symbol, []))
        # dict.fromkeys de-duplicates while preserving insertion order.
        return list(dict.fromkeys(candidates))

    def get_historical_prices(self, symbol: str, from_date: Optional[str] = None) -> List[Dict]:
        """Get historical price data for an index.

        Args:
            symbol: Index symbol; alternatives are tried if it fails.
            from_date: Start date passed as the ``from`` query parameter;
                defaults to ``'1900-01-01'``.

        Returns:
            The ``historical`` list of the first alternative whose response
            contains one, or ``[]`` if none does. Never raises.
        """
        try:
            params = {
                'apikey': self.api_key,
                'from': from_date if from_date else '1900-01-01'
            }

            alternatives = self._get_alternative_symbols(symbol)
            logging.debug("Trying alternatives for %s: %s", symbol, alternatives)

            for alt_symbol in alternatives:
                url = f"{self.base_url}/historical-price-full/{alt_symbol}"
                response = self._make_request(url, params)

                if response and isinstance(response, dict) and 'historical' in response:
                    historical_data = response['historical']
                    logging.info(
                        "Retrieved %d historical prices for %s using %s",
                        len(historical_data), symbol, alt_symbol,
                    )
                    return historical_data

                sleep(self.rate_limit_pause)

            logging.warning("No historical data found for %s after trying alternatives", symbol)
            return []

        except Exception as e:
            logging.error("Error getting historical prices for %s: %s", symbol, e)
            return []

    def _fetch_quote(self, symbol: str) -> List[Dict]:
        """Fetch the quote for one index, trying each alternative symbol.

        Returns the API's result list with its first entry relabelled to the
        original *symbol* and description, or ``[]`` if no alternative
        produced a usable quote.
        """
        for alt_symbol in self._get_alternative_symbols(symbol):
            url = f"{self.base_url}/quote/{alt_symbol}"
            result = self._make_request(url, {'apikey': self.api_key})

            # Require a non-empty dict as first entry: assigning into
            # anything else would raise and abort the whole collection run.
            if (result and isinstance(result, list)
                    and isinstance(result[0], dict) and result[0]):
                result[0]['symbol'] = symbol  # Use original symbol
                result[0]['description'] = self.indexes.get(symbol, '')
                return result

            sleep(self.rate_limit_pause)
        return []

    def get_index_quotes(self) -> List[Dict]:
        """Get real-time quotes for all indexes.

        Returns:
            Quote dicts for every index that could be resolved, in the order
            of ``self.indexes``. Symbols that failed are logged in a single
            warning. Returns ``[]`` when no indexes are loaded or on an
            unexpected error.
        """
        if not self.indexes:
            logging.error("No indexes loaded")
            return []

        try:
            all_results = []
            failed_symbols = set()

            for symbol in list(self.indexes):
                quote = self._fetch_quote(symbol)
                if quote:
                    all_results.extend(quote)
                else:
                    failed_symbols.add(symbol)

            if failed_symbols:
                logging.warning(
                    "Failed to retrieve data for: %s",
                    ', '.join(sorted(failed_symbols)),
                )

            self.stats['last_update'] = datetime.now()
            return all_results

        except Exception as e:
            logging.error("Error getting index quotes: %s", e)
            return []

    def get_index_info(self) -> List[Dict]:
        """Get information about available indexes.

        Returns one dict per loaded index with its ``symbol``,
        ``description`` and ``category``.
        """
        return [
            {
                'symbol': symbol,
                'description': desc,
                'category': self._categorize_index(symbol)
            }
            for symbol, desc in self.indexes.items()
        ]

    def _categorize_index(self, symbol: str) -> str:
        """Categorize index based on symbol.

        Matching is case-insensitive and ignores '^', so ``'^GSPE'`` and
        ``'GSPE'`` fall in the same category. Rules are checked in order and
        the first match wins.
        """
        symbol = symbol.upper().replace('^', '')
        if any(x in symbol for x in ['SPX', 'DJI', 'NDX', 'RUI', 'NYA']):
            return 'US Major'
        elif 'SP500-' in symbol or symbol == 'GSPE':
            return 'S&P 500 Sectors'
        elif any(x in symbol for x in ['STOXX', 'DAX', 'CAC', 'FTSE', 'IBEX', 'AEX']):
            return 'Europe'
        elif any(x in symbol for x in ['HSI', 'N300', 'KOSPI']):
            return 'Asia'
        elif 'MSCI' in symbol:
            return 'Global'
        else:
            return 'Other'

    def get_stats(self) -> Dict:
        """Get collector statistics.

        Returns a copy of the counters plus ``total_indexes``;
        ``last_update`` is an ISO-8601 string, or ``None`` if quotes were
        never collected.
        """
        last_update = self.stats['last_update']
        return {
            **self.stats,
            'total_indexes': len(self.indexes),
            'last_update': last_update.isoformat() if last_update else None
        }