# RivaCube/update_indexes.py
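"""Refresh market index reference data and daily OHLC prices in PostgreSQL.

The script pulls index metadata and historical prices through
MarketIndexCollector (driven by an FMP_API_KEY, which suggests the
Financial Modeling Prep API), upserts them into market_indexes_ref and
market_indexes_prices, and finishes with statistics and integrity checks.
"""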

import os
import logging
from datetime import date, datetime, timedelta, timezone
from typing import Optional

import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

from utils.Indexes.collector import MarketIndexCollector
from utils.Indexes.utils import (
    get_index_statistics,
    get_last_price_date,
    verify_data_integrity,
    reset_and_setup_database,
)
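
# Environment variables read below (typically supplied via a .env file
# loaded by python-dotenv):
#   DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT  - PostgreSQL connection
#   FMP_API_KEY                                      - API key for the collector
#   RESET_DATABASE                                   - set to "true" to rebuild tables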


def setup_logging():
    """Set up logging to a timestamped file under logs/ and to the console."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'index_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )


def check_tables_exist(connection: psycopg2.extensions.connection) -> bool:
    """Check if the required database tables exist."""
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = 'market_indexes_ref'
            ) AND EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = 'market_indexes_prices'
            );
        """)
        return cursor.fetchone()[0]
    except Exception as e:
        logging.error(f"Error checking table existence: {e}")
        return False
    finally:
        if cursor:
            cursor.close()
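
# A minimal sketch of the schema this script expects, inferred from the
# queries in this file (the authoritative DDL lives in
# reset_and_setup_database):
#
#   market_indexes_ref(id, symbol UNIQUE, description, category,
#                      is_active, last_update, updated_at)
#   market_indexes_prices(index_id, date, open, high, low, close,
#                         UNIQUE (index_id, date))
#
# The UNIQUE constraints are assumptions required by the ON CONFLICT
# clauses used in the upserts below.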


def initialize_indexes(connection: psycopg2.extensions.connection,
                       collector: MarketIndexCollector) -> None:
    """Initialize market indexes in the database."""
    cursor = None
    try:
        cursor = connection.cursor()
        # Get index information from collector
        indexes = collector.get_index_info()
        # Get existing symbols
        cursor.execute("SELECT symbol FROM market_indexes_ref")
        existing_symbols = {row[0] for row in cursor.fetchall()}
        # Filter out already existing symbols
        new_indexes = [idx for idx in indexes if idx['symbol'] not in existing_symbols]
        if not new_indexes:
            logging.info("All indexes are already initialized")
            return
        logging.info(f"Initializing {len(new_indexes)} new indexes")
        # Prepare data for batch insertion
        index_data = [
            (idx['symbol'], idx['description'], idx['category'], True)
            for idx in new_indexes
        ]
        execute_values(
            cursor,
            """
            INSERT INTO market_indexes_ref
                (symbol, description, category, is_active)
            VALUES %s
            ON CONFLICT (symbol) DO UPDATE SET
                description = EXCLUDED.description,
                category = EXCLUDED.category,
                is_active = EXCLUDED.is_active
            """,
            index_data
        )
        connection.commit()
        logging.info(f"Successfully initialized {len(new_indexes)} indexes")
    except Exception as e:
        connection.rollback()
        logging.error(f"Error in index initialization: {e}")
        raise
    finally:
        if cursor:
            cursor.close()
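
# Assumed collector contract (not defined in this file): get_index_info()
# returns dicts with 'symbol', 'description' and 'category' keys, and
# get_historical_prices(symbol, from_date) returns dicts with 'date'
# (as 'YYYY-MM-DD'), 'open', 'high', 'low' and 'close' keys, matching
# how the records are unpacked above and below.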


def update_historical_prices(connection: psycopg2.extensions.connection,
                             index_id: int,
                             symbol: str,
                             collector: MarketIndexCollector,
                             last_update: Optional[date]) -> None:
    """Update historical prices for a specific market index."""
    cursor = None
    try:
        cursor = connection.cursor()
        # Only keep rows strictly before today to avoid partial-day data
        today = datetime.now().date()
        from_date = None
        if last_update:
            # Start 3 days before the last update to ensure continuity
            from_date = (last_update - timedelta(days=3)).strftime('%Y-%m-%d')
        else:
            # For new indexes, start from a reasonable date
            from_date = '1980-01-01'
        # Get historical data
        historical_data = collector.get_historical_prices(symbol, from_date)
        if not historical_data:
            logging.warning(f"No historical data available for {symbol}")
            return
        # Filter out today's data and future dates
        filtered_data = [
            record for record in historical_data
            if datetime.strptime(record['date'], '%Y-%m-%d').date() < today
        ]
        if not filtered_data:
            logging.info(f"No new complete-day data available for {symbol}")
            return
        # Deduplicate by date, keeping the last record seen for each date
        date_dict = {}
        duplicate_dates = set()
        for record in filtered_data:
            if record['date'] in date_dict:
                duplicate_dates.add(record['date'])
            date_dict[record['date']] = record
        if duplicate_dates:
            logging.warning(f"Found {len(duplicate_dates)} duplicate dates for {symbol}: {sorted(duplicate_dates)}")
        # Prepare deduplicated records for batch insertion
        records = [
            (index_id, record_date,
             record['open'], record['high'], record['low'], record['close'])
            for record_date, record in date_dict.items()
        ]
        # Process in batches
        batch_size = 1000
        total_updated = 0
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            execute_values(
                cursor,
                """
                INSERT INTO market_indexes_prices
                    (index_id, date, open, high, low, close)
                VALUES %s
                ON CONFLICT (index_id, date) DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close
                """,
                batch
            )
            connection.commit()
            total_updated += len(batch)
            logging.info(f"Updated {len(batch)} records for {symbol}")
        # Record the refresh time with a timezone-aware timestamp
        current_time = datetime.now(tz=timezone.utc)
        cursor.execute(
            """
            UPDATE market_indexes_ref
            SET last_update = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            """,
            (current_time, index_id)
        )
        connection.commit()
        logging.info(f"Successfully updated {total_updated} records for {symbol}")
    except Exception as e:
        connection.rollback()
        logging.error(f"Error updating historical prices for {symbol}: {e}")
        raise
    finally:
        if cursor:
            cursor.close()
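
# Note on transaction behavior: each batch above is committed as it is
# written, so an interrupted run keeps its partial progress. Re-running
# the script is safe because the ON CONFLICT upserts, together with the
# 3-day overlap in from_date, simply rewrite the overlapping rows.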


def main():
    load_dotenv()
    setup_logging()
    connection = None
    try:
        # Connect to database
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
        connection.autocommit = False  # Explicit transaction control
        logging.info("Database connection established")
        # Initialize collector
        collector = MarketIndexCollector(os.getenv('FMP_API_KEY'))
        # Check if tables exist or a reset is requested
        tables_exist = check_tables_exist(connection)
        if not tables_exist or os.getenv('RESET_DATABASE', '').lower() == 'true':
            logging.info("Setting up database tables...")
            if reset_and_setup_database(connection):
                logging.info("Database setup completed successfully")
            else:
                logging.error("Database setup failed")
                return
        # Initialize indexes
        initialize_indexes(connection, collector)
        # Get initial statistics
        initial_stats = get_index_statistics(connection)
        logging.info("Initial statistics:")
        logging.info(f"Total indexes: {len(initial_stats['coverage'])}")
        logging.info(f"Total prices: {initial_stats.get('total_prices', 0)}")
        # Get active indexes for update
        cursor = connection.cursor()
        cursor.execute("""
            SELECT id, symbol
            FROM market_indexes_ref
            WHERE is_active = true
            ORDER BY symbol
        """)
        indexes = cursor.fetchall()
        cursor.close()
        logging.info(f"Found {len(indexes)} active indexes to update")
        # Update each index
        total_processed = 0
        total_failed = 0
        for index_id, symbol in indexes:
            try:
                last_update = get_last_price_date(connection, index_id)
                update_historical_prices(
                    connection,
                    index_id,
                    symbol,
                    collector,
                    last_update
                )
                total_processed += 1
                # Log progress every 5 indexes
                if total_processed % 5 == 0:
                    logging.info(f"Progress: {total_processed}/{len(indexes)} indexes processed")
            except Exception as e:
                total_failed += 1
                logging.error(f"Error processing {symbol}: {e}")
                continue
        # Verify data integrity
        issues = verify_data_integrity(connection)
        if any(issues.values()):
            logging.warning("Data integrity issues found:")
            for issue_type, issue_list in issues.items():
                if issue_list:
                    logging.warning(f"{issue_type}: {len(issue_list)} issues found")
        # Get final statistics
        final_stats = get_index_statistics(connection)
        logging.info("\nFinal statistics:")
        logging.info(f"Total indexes: {len(final_stats['coverage'])}")
        logging.info(f"Total prices: {final_stats.get('total_prices', 0)}")
        logging.info(f"Successfully processed: {total_processed}")
        logging.info(f"Failed: {total_failed}")
    except Exception as e:
        logging.error(f"Error in main execution: {e}")
        raise
    finally:
        try:
            if connection and not connection.closed:
                connection.close()
                logging.info("Database connection closed")
        except Exception as e:
            logging.error(f"Error closing database connection: {e}")


if __name__ == "__main__":
    main()
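
# Typical usage (assuming a .env file with the variables listed near the
# top of this file):
#
#   python update_indexes.py
#
# Setting RESET_DATABASE=true in the environment forces
# reset_and_setup_database to run before the update.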