import os
import logging
from datetime import date, datetime, timedelta, timezone
from typing import Optional

import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

from utils.Indexes.collector import MarketIndexCollector
from utils.Indexes.utils import (
    get_index_statistics,
    get_last_price_date,
    verify_data_integrity,
    reset_and_setup_database
)


def setup_logging():
    """Set up logging configuration."""
    log_dir = 'logs'
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    log_file = os.path.join(
        log_dir, f'index_update_{datetime.now():%Y%m%d_%H%M%S}.log'
    )
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )


def check_tables_exist(connection: psycopg2.extensions.connection) -> bool:
    """Check whether the required database tables exist."""
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = 'market_indexes_ref'
            ) AND EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = 'market_indexes_prices'
            );
        """)
        return cursor.fetchone()[0]
    except Exception as e:
        logging.error(f"Error checking table existence: {e}")
        return False
    finally:
        if cursor:
            cursor.close()


def initialize_indexes(connection: psycopg2.extensions.connection,
                       collector: MarketIndexCollector) -> None:
    """Initialize market indexes in the database."""
    cursor = None
    try:
        cursor = connection.cursor()

        # Get index information from collector
        indexes = collector.get_index_info()

        # Get existing symbols
        cursor.execute("SELECT symbol FROM market_indexes_ref")
        existing_symbols = {row[0] for row in cursor.fetchall()}

        # Filter out already existing symbols
        new_indexes = [idx for idx in indexes if idx['symbol'] not in existing_symbols]

        if not new_indexes:
            logging.info("All indexes are already initialized")
            return

        logging.info(f"Initializing {len(new_indexes)} new indexes")

        # Prepare data for batch insertion
        index_data = [
            (idx['symbol'], idx['description'], idx['category'], True)
            for idx in new_indexes
        ]

        execute_values(
            cursor,
            """
            INSERT INTO market_indexes_ref (symbol, description, category, is_active)
            VALUES %s
            ON CONFLICT (symbol) DO UPDATE SET
                description = EXCLUDED.description,
                category = EXCLUDED.category,
                is_active = EXCLUDED.is_active
            """,
            index_data
        )

        connection.commit()
        logging.info(f"Successfully initialized {len(new_indexes)} indexes")
    except Exception as e:
        connection.rollback()
        logging.error(f"Error in index initialization: {e}")
        raise
    finally:
        if cursor:
            cursor.close()


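# The collector payloads are assumed to have the following shapes, inferred
# from how this script consumes them (the authoritative contract lives in
# utils.Indexes.collector.MarketIndexCollector); the symbol and values shown
# are illustrative only:
#
#   collector.get_index_info()
#       -> [{'symbol': '^GSPC', 'description': '...', 'category': '...'}, ...]
#   collector.get_historical_prices(symbol, from_date)
#       -> [{'date': 'YYYY-MM-DD', 'open': ..., 'high': ...,
#            'low': ..., 'close': ...}, ...]

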
def update_historical_prices(connection: psycopg2.extensions.connection,
                             index_id: int,
                             symbol: str,
                             collector: MarketIndexCollector,
                             last_update: Optional[date]) -> None:
    """Update historical prices for a specific market index."""
    cursor = None
    try:
        cursor = connection.cursor()

        # Only accept dates strictly before today to avoid partial-day data
        today = datetime.now().date()

        if last_update:
            # Start 3 days before last update to ensure continuity
            from_date = (last_update - timedelta(days=3)).strftime('%Y-%m-%d')
        else:
            # For new indexes, start from a reasonable date
            from_date = '1980-01-01'

        # Get historical data
        historical_data = collector.get_historical_prices(symbol, from_date)
        if not historical_data:
            logging.warning(f"No historical data available for {symbol}")
            return

        # Filter out today's data and future dates
        filtered_data = [
            record for record in historical_data
            if datetime.strptime(record['date'], '%Y-%m-%d').date() < today
        ]

        if not filtered_data:
            logging.info(f"No new complete-day data available for {symbol}")
            return

        # Deduplicate data by date, keeping the last record for each date
        date_dict = {}
        duplicate_dates = set()
        for record in filtered_data:
            if record['date'] in date_dict:
                duplicate_dates.add(record['date'])
            date_dict[record['date']] = record

        if duplicate_dates:
            logging.warning(
                f"Found {len(duplicate_dates)} duplicate dates for {symbol}: "
                f"{sorted(duplicate_dates)}"
            )

        # Prepare data for batch insertion with deduplicated records
        records = [
            (index_id, record_date, record['open'], record['high'],
             record['low'], record['close'])
            for record_date, record in date_dict.items()
        ]

        # Process in batches
        batch_size = 1000
        total_updated = 0

        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            execute_values(
                cursor,
                """
                INSERT INTO market_indexes_prices (index_id, date, open, high, low, close)
                VALUES %s
                ON CONFLICT (index_id, date) DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close
                """,
                batch
            )
            connection.commit()
            total_updated += len(batch)
            logging.info(f"Updated {len(batch)} records for {symbol}")

        # Update last_update timestamp with timezone awareness
        current_time = datetime.now(tz=timezone.utc)
        cursor.execute(
            """
            UPDATE market_indexes_ref
            SET last_update = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
            """,
            (current_time, index_id)
        )
        connection.commit()

        logging.info(f"Successfully updated {total_updated} records for {symbol}")
    except Exception as e:
        connection.rollback()
        logging.error(f"Error updating historical prices for {symbol}: {e}")
        raise
    finally:
        if cursor:
            cursor.close()


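# For reference, a schema sketch consistent with the queries in this script
# (assumed; the authoritative DDL lives in reset_and_setup_database, and the
# column types shown here are guesses):
#
#   CREATE TABLE market_indexes_ref (
#       id          SERIAL PRIMARY KEY,
#       symbol      TEXT UNIQUE NOT NULL,  -- ON CONFLICT (symbol) target
#       description TEXT,
#       category    TEXT,
#       is_active   BOOLEAN,
#       last_update TIMESTAMPTZ,
#       updated_at  TIMESTAMPTZ
#   );
#
#   CREATE TABLE market_indexes_prices (
#       index_id INTEGER REFERENCES market_indexes_ref (id),
#       date     DATE,
#       open     NUMERIC,
#       high     NUMERIC,
#       low      NUMERIC,
#       close    NUMERIC,
#       PRIMARY KEY (index_id, date)       -- ON CONFLICT (index_id, date) target
#   );

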
def main():
    load_dotenv()
    setup_logging()

    connection = None
    try:
        # Connect to database
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
        connection.autocommit = False  # Explicit transaction control
        logging.info("Database connection established")

        # Initialize collector
        collector = MarketIndexCollector(os.getenv('FMP_API_KEY'))

        # Check if tables exist or a reset is needed
        tables_exist = check_tables_exist(connection)
        if not tables_exist or os.getenv('RESET_DATABASE', '').lower() == 'true':
            logging.info("Setting up database tables...")
            if reset_and_setup_database(connection):
                logging.info("Database setup completed successfully")
            else:
                logging.error("Database setup failed")
                return

        # Initialize indexes
        initialize_indexes(connection, collector)

        # Get initial statistics
        initial_stats = get_index_statistics(connection)
        logging.info("Initial statistics:")
        logging.info(f"Total indexes: {len(initial_stats['coverage'])}")
        logging.info(f"Total prices: {initial_stats.get('total_prices', 0)}")

        # Get active indexes for update
        cursor = connection.cursor()
        cursor.execute("""
            SELECT id, symbol
            FROM market_indexes_ref
            WHERE is_active = true
            ORDER BY symbol
        """)
        indexes = cursor.fetchall()
        cursor.close()

        logging.info(f"Found {len(indexes)} active indexes to update")

        # Update each index
        total_processed = 0
        total_failed = 0

        for index_id, symbol in indexes:
            try:
                last_update = get_last_price_date(connection, index_id)
                update_historical_prices(
                    connection, index_id, symbol, collector, last_update
                )
                total_processed += 1

                # Log progress every 5 indexes
                if total_processed % 5 == 0:
                    logging.info(
                        f"Progress: {total_processed}/{len(indexes)} indexes processed"
                    )
            except Exception as e:
                total_failed += 1
                logging.error(f"Error processing {symbol}: {e}")
                continue

        # Verify data integrity
        issues = verify_data_integrity(connection)
        if any(issues.values()):
            logging.warning("Data integrity issues found:")
            for issue_type, issue_list in issues.items():
                if issue_list:
                    logging.warning(f"{issue_type}: {len(issue_list)} issues found")

        # Get final statistics
        final_stats = get_index_statistics(connection)
        logging.info("\nFinal statistics:")
        logging.info(f"Total indexes: {len(final_stats['coverage'])}")
        logging.info(f"Total prices: {final_stats.get('total_prices', 0)}")
        logging.info(f"Successfully processed: {total_processed}")
        logging.info(f"Failed: {total_failed}")
    except Exception as e:
        logging.error(f"Error in main execution: {e}")
        raise
    finally:
        try:
            if connection and not connection.closed:
                connection.close()
                logging.info("Database connection closed")
        except Exception as e:
            logging.error(f"Error closing database connection: {e}")


if __name__ == "__main__":
    main()
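
# A minimal .env for this script might look like the following (values are
# placeholders; the variable names match the os.getenv() calls above):
#
#   DB_NAME=marketdata
#   DB_USER=postgres
#   DB_PASSWORD=change-me
#   DB_HOST=localhost
#   DB_PORT=5432
#   FMP_API_KEY=your-fmp-api-key
#   RESET_DATABASE=false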