import os import logging from datetime import datetime, timedelta import psycopg2 from psycopg2.extras import execute_values from dotenv import load_dotenv from utils.Forex.collector import ForexCollector from utils.Forex.utils import ( get_forex_statistics, get_last_price_date, update_forex_prices ) def setup_logging(): """Setup logging configuration.""" log_dir = 'logs' if not os.path.exists(log_dir): os.makedirs(log_dir) log_file = os.path.join(log_dir, f'forex_update_{datetime.now():%Y%m%d_%H%M%S}.log') logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) def verify_database_tables(connection: psycopg2.extensions.connection) -> None: """Verify database tables exist and have correct structure.""" cursor = None try: cursor = connection.cursor() # Check if tables exist with correct structure cursor.execute(""" DO $$ BEGIN -- Create forex_pairs if it doesn't exist IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'forex_pairs') THEN CREATE TABLE forex_pairs ( id SERIAL PRIMARY KEY, symbol TEXT UNIQUE NOT NULL, description TEXT, last_update DATE, is_active BOOLEAN DEFAULT true ); RAISE NOTICE 'Created forex_pairs table'; END IF; -- Create forex_prices if it doesn't exist IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'forex_prices') THEN CREATE TABLE forex_prices ( pair_id INTEGER REFERENCES forex_pairs(id), date DATE NOT NULL, open NUMERIC(19,4) NOT NULL, high NUMERIC(19,4) NOT NULL, low NUMERIC(19,4) NOT NULL, close NUMERIC(19,4) NOT NULL, PRIMARY KEY (pair_id, date) ); CREATE INDEX IF NOT EXISTS idx_forex_prices_date ON forex_prices(date); RAISE NOTICE 'Created forex_prices table and index'; END IF; END; $$; """) connection.commit() logging.info("Database tables verified successfully") except Exception as e: connection.rollback() logging.error(f"Database verification failed: {e}") raise finally: if cursor: cursor.close() def initialize_forex_pairs(connection: psycopg2.extensions.connection, collector: ForexCollector) -> None: """Initialize forex pairs in the database.""" cursor = None try: cursor = connection.cursor() # Check existing pairs cursor.execute("SELECT symbol FROM forex_pairs WHERE is_active = true") existing_pairs = {row[0] for row in cursor.fetchall()} pairs = collector.get_pairs() new_pairs = [pair for pair in pairs if pair['symbol'] not in existing_pairs] if not new_pairs: logging.info("All pairs are already initialized") return logging.info(f"Attempting to initialize {len(new_pairs)} new pairs") for pair in new_pairs: try: cursor.execute(""" INSERT INTO forex_pairs (symbol, description, is_active) VALUES (%s, %s, true) ON CONFLICT (symbol) DO UPDATE SET description = EXCLUDED.description, is_active = true RETURNING id, symbol """, (pair['symbol'], pair['description'])) result = cursor.fetchone() if result: logging.debug(f"Initialized/Updated pair {result[1]} with ID {result[0]}") except Exception as e: logging.error(f"Error initializing pair {pair['symbol']}: {e}") continue connection.commit() # Verify initialization cursor.execute("SELECT COUNT(*) FROM forex_pairs WHERE is_active = true") count = cursor.fetchone()[0] logging.info(f"Total active pairs in database: {count}") except Exception as e: connection.rollback() logging.error(f"Error in forex pairs initialization: {e}") raise finally: if cursor: cursor.close() def main(): load_dotenv() setup_logging() try: # Connect to database connection = psycopg2.connect( dbname=os.getenv('DB_NAME'), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), host=os.getenv('DB_HOST'), port=os.getenv('DB_PORT') ) logging.info("Database connection established") # Only verify tables structure (don't reset) verify_database_tables(connection) # Initialize collector collector = ForexCollector(os.getenv('FMP_API_KEY')) # Initialize forex pairs initialize_forex_pairs(connection, collector) # Get initial statistics initial_stats = get_forex_statistics(connection) logging.info(f"Initial statistics: {initial_stats}") # Get all pairs for update cursor = connection.cursor() cursor.execute(""" SELECT id, symbol FROM forex_pairs WHERE is_active = true ORDER BY symbol """) pairs = cursor.fetchall() cursor.close() logging.info(f"Found {len(pairs)} active pairs to update") # Update each pair total_records = 0 for pair_id, symbol in pairs: try: # Get last price date for this pair last_update = get_last_price_date(connection, pair_id) rows_inserted, error = update_forex_prices( connection, pair_id, symbol, collector, last_update ) if error: logging.error(f"Error updating {symbol}: {error}") else: total_records += rows_inserted logging.info(f"Successfully updated {rows_inserted} records for {symbol}") except Exception as e: logging.error(f"Error processing {symbol}: {e}") continue # Get final statistics final_stats = get_forex_statistics(connection) logging.info(f"Final statistics: {final_stats}") logging.info(f"Total records updated: {total_records}") connection.close() logging.info("Database update completed successfully") except Exception as e: logging.error(f"Error in main execution: {str(e)}") raise if __name__ == "__main__": main()