"""Update commodity reference data and daily OHLC prices in PostgreSQL.

Verifies the schema, initializes the commodity list from the collector,
backfills history, then performs an incremental update per commodity.
"""

import os
import logging
from datetime import date, datetime, timedelta
from typing import Optional

import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

from utils.Commodities.collector import CommodityCollector
from utils.Commodities.utils import get_commodity_statistics


def setup_logging():
    """Setup logging configuration."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)

    log_file = os.path.join(log_dir, f'commodity_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )


def verify_database_tables(connection: psycopg2.extensions.connection) -> None:
    """Verify database tables exist and have correct structure."""
    cursor = None
    try:
        cursor = connection.cursor()

        # Check if tables exist with correct structure
        cursor.execute("""
            DO $$
            BEGIN
                -- Create commodities_ref if it doesn't exist
                IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'commodities_ref') THEN
                    CREATE TABLE commodities_ref (
                        id SERIAL PRIMARY KEY,
                        symbol VARCHAR(20) UNIQUE NOT NULL,
                        name TEXT,
                        description TEXT,
                        last_update DATE,
                        is_active BOOLEAN DEFAULT true
                    );
                    RAISE NOTICE 'Created commodities_ref table';
                END IF;

                -- Create commodities_prices if it doesn't exist
                IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'commodities_prices') THEN
                    CREATE TABLE commodities_prices (
                        commodity_id INTEGER REFERENCES commodities_ref(id),
                        date DATE NOT NULL,
                        open NUMERIC(10,5) NOT NULL,
                        high NUMERIC(10,5) NOT NULL,
                        low NUMERIC(10,5) NOT NULL,
                        close NUMERIC(10,5) NOT NULL,
                        PRIMARY KEY (commodity_id, date)
                    );
                    CREATE INDEX IF NOT EXISTS idx_commodities_prices_date
                        ON commodities_prices(date);
                    RAISE NOTICE 'Created commodities_prices table and index';
                END IF;

                -- Add is_active column if it doesn't exist
                IF NOT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_name = 'commodities_ref' AND column_name = 'is_active'
                ) THEN
                    ALTER TABLE commodities_ref ADD COLUMN is_active BOOLEAN DEFAULT true;
                    RAISE NOTICE 'Added is_active column to commodities_ref';
                END IF;
            END;
            $$;
        """)
        connection.commit()
        logging.info("Database tables verified successfully")

    except Exception as e:
        connection.rollback()
        logging.error(f"Database verification failed: {e}")
        raise
    finally:
        if cursor:
            cursor.close()


def initialize_commodities(connection: psycopg2.extensions.connection,
                           collector: CommodityCollector) -> None:
    """Initialize commodities in the database."""
    cursor = None
    try:
        cursor = connection.cursor()

        # Get commodities from collector's commodities property
        commodities = collector.commodities

        # Get existing symbols
        cursor.execute("SELECT symbol FROM commodities_ref")
        existing_symbols = {row[0] for row in cursor.fetchall()}

        # Filter out already existing symbols
        new_commodities = {symbol: desc for symbol, desc in commodities.items()
                           if symbol not in existing_symbols}

        if not new_commodities:
            logging.info("All commodities are already initialized")
            return

        logging.info(f"Attempting to initialize {len(new_commodities)} new commodities")

        for symbol, description in new_commodities.items():
            try:
                cursor.execute("""
                    INSERT INTO commodities_ref (symbol, description, is_active, last_update)
                    VALUES (%s, %s, true, NULL)
                    ON CONFLICT (symbol) DO NOTHING
                    RETURNING id, symbol
                """, (symbol, description))
                result = cursor.fetchone()
                if result:
                    logging.debug(f"Initialized commodity {result[1]} with ID {result[0]}")
            except Exception as e:
                # A failed statement aborts the whole PostgreSQL transaction, so
                # roll back before continuing; otherwise every subsequent insert
                # would fail with InFailedSqlTransaction. (Savepoints would
                # preserve earlier uncommitted inserts, at the cost of more code.)
                connection.rollback()
                logging.error(f"Error initializing commodity {symbol}: {e}")
                continue

        connection.commit()

        # Verify initialization
        cursor.execute("SELECT COUNT(*) FROM commodities_ref WHERE is_active = true")
        count = cursor.fetchone()[0]
        logging.info(f"Total active commodities in database: {count}")

    except Exception as e:
        connection.rollback()
        logging.error(f"Error in commodities initialization: {e}")
        raise
    finally:
        if cursor:
            cursor.close()
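
# Optional sketch: the collector calls below hit the FMP API directly, so a
# transient network error aborts that commodity's entire update pass. A thin
# retry wrapper like the one here could harden those calls. `_fetch_with_retry`
# is a hypothetical helper, not part of CommodityCollector; it assumes only
# that collector.get_historical_prices(symbol, since) raises on failure.
import time  # used only by this optional retry sketch


def _fetch_with_retry(collector: CommodityCollector, symbol: str,
                      since: Optional[date], attempts: int = 3,
                      backoff_seconds: float = 2.0) -> list:
    """Call collector.get_historical_prices with simple exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return collector.get_historical_prices(symbol, since)
        except Exception as e:
            if attempt == attempts:
                logging.error(f"Giving up on {symbol} after {attempts} attempts: {e}")
                raise
            wait = backoff_seconds * (2 ** (attempt - 1))
            logging.warning(
                f"Fetch failed for {symbol} (attempt {attempt}): {e}; "
                f"retrying in {wait:.0f}s"
            )
            time.sleep(wait)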
def update_historical_prices(connection: psycopg2.extensions.connection,
                             commodity_id: int,
                             symbol: str,
                             collector: CommodityCollector,
                             last_update: Optional[date]) -> None:
    """Update historical prices for a specific commodity."""
    cursor = None
    try:
        cursor = connection.cursor()

        # If no last_update, start from a much earlier date
        if last_update is None:
            last_update = date(2000, 1, 1)

        # Set to_date to yesterday to avoid partial day data
        today = datetime.now().date()

        # Don't update if we already have data up to yesterday
        if last_update >= (today - timedelta(days=1)):
            logging.info(f"Data for {symbol} is already up to date (last update: {last_update})")
            return

        # Get historical data
        historical_data = collector.get_historical_prices(symbol, last_update)
        if not historical_data:
            logging.warning(f"No historical data available for {symbol}")
            return

        # Filter out today's data if it exists
        filtered_data = [
            record for record in historical_data
            if datetime.strptime(record['date'], '%Y-%m-%d').date() < today
        ]

        if not filtered_data:
            logging.info(f"No new complete day data available for {symbol}")
            return

        # Prepare data for batch insertion
        records = [
            (commodity_id, record['date'], record['open'],
             record['high'], record['low'], record['close'])
            for record in filtered_data
        ]

        # Process in batches
        batch_size = 1000
        total_updated = 0
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            execute_values(
                cursor,
                """
                INSERT INTO commodities_prices (commodity_id, date, open, high, low, close)
                VALUES %s
                ON CONFLICT (commodity_id, date) DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close
                """,
                batch
            )
            connection.commit()
            total_updated += len(batch)
            logging.info(f"Updated {len(batch)} records for {symbol}")

        # Update last_update timestamp to yesterday's date
        cursor.execute(
            "UPDATE commodities_ref SET last_update = %s WHERE id = %s",
            (today - timedelta(days=1), commodity_id)
        )
        connection.commit()

        logging.info(f"Successfully updated {total_updated} complete day records for {symbol}")

    except Exception as e:
        connection.rollback()
        logging.error(f"Error updating historical prices for {symbol}: {e}")
        raise
    finally:
        if cursor:
            cursor.close()


def backfill_historical_data(connection: psycopg2.extensions.connection,
                             collector: CommodityCollector,
                             start_date: date = date(2000, 1, 1)) -> None:
    """Backfill historical data for all commodities from a given start date."""
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT id, symbol
            FROM commodities_ref
            WHERE is_active = true
        """)
        commodities = cursor.fetchall()

        logging.info(f"Starting backfill for {len(commodities)} commodities from {start_date}")

        for commodity_id, symbol in commodities:
            try:
                update_historical_prices(connection, commodity_id, symbol, collector, start_date)
            except Exception as e:
                logging.error(f"Error backfilling {symbol}: {e}")
                continue

    except Exception as e:
        logging.error(f"Error in backfill process: {e}")
        raise
    finally:
        if cursor:
            cursor.close()
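
# Optional sketch: a read-only coverage report over the price table, handy for
# spot-checking a backfill. It touches only the commodities_ref and
# commodities_prices tables created above; the function name and log format
# are illustrative, not part of the existing utils.
def report_price_coverage(connection: psycopg2.extensions.connection) -> None:
    """Log first/last stored date and row count per active commodity."""
    with connection.cursor() as cursor:
        cursor.execute("""
            SELECT r.symbol, MIN(p.date), MAX(p.date), COUNT(*)
            FROM commodities_ref r
            JOIN commodities_prices p ON p.commodity_id = r.id
            WHERE r.is_active = true
            GROUP BY r.symbol
            ORDER BY r.symbol
        """)
        for symbol, first, last, rows in cursor.fetchall():
            logging.info(f"{symbol}: {rows} rows from {first} to {last}")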
def main():
    load_dotenv()
    setup_logging()

    connection = None
    try:
        # Connect to database
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
        logging.info("Database connection established")

        # Verify database structure
        verify_database_tables(connection)

        # Initialize collector
        collector = CommodityCollector(os.getenv('FMP_API_KEY'))

        # Initialize commodities
        initialize_commodities(connection, collector)

        # Get initial statistics
        initial_stats = get_commodity_statistics(connection)
        logging.info(f"Initial statistics: {initial_stats}")

        # Perform a complete backfill for any missing historical data
        backfill_historical_data(connection, collector)

        # Get all commodities for regular update
        cursor = connection.cursor()
        cursor.execute("""
            SELECT id, symbol, last_update
            FROM commodities_ref
            WHERE is_active = true
            ORDER BY symbol
        """)
        commodities = cursor.fetchall()
        logging.info(f"Found {len(commodities)} active commodities to update")

        # Update each commodity
        for commodity_id, symbol, last_update in commodities:
            try:
                update_historical_prices(
                    connection,
                    commodity_id,
                    symbol,
                    collector,
                    last_update
                )
            except Exception as e:
                logging.error(f"Error processing {symbol}: {e}")
                continue

        # Get final statistics
        final_stats = get_commodity_statistics(connection)
        logging.info(f"Final statistics: {final_stats}")

        cursor.close()
        logging.info("Database update completed successfully")

    except Exception as e:
        logging.error(f"Error in main execution: {e}")
        raise
    finally:
        # Close the connection even when an earlier step raised
        if connection:
            connection.close()


if __name__ == "__main__":
    main()
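
# Usage note: the script expects a .env file (loaded via python-dotenv) that
# defines DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT and FMP_API_KEY,
# then runs a full verify -> initialize -> backfill -> incremental-update pass:
#
#   python update_commodities.py
#
# The filename above is illustrative; invoke the module by whatever name it
# carries in this repo.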