RivaCube/update_forex.py
2025-02-04 19:31:18 +01:00

212 lines
7.2 KiB
Python

import os
import logging
from datetime import datetime, timedelta
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv
from utils.Forex.collector import ForexCollector
from utils.Forex.utils import (
get_forex_statistics,
get_last_price_date,
update_forex_prices
)
def setup_logging():
"""Setup logging configuration."""
log_dir = 'logs'
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, f'forex_update_{datetime.now():%Y%m%d_%H%M%S}.log')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
def verify_database_tables(connection: psycopg2.extensions.connection) -> None:
"""Verify database tables exist and have correct structure."""
cursor = None
try:
cursor = connection.cursor()
# Check if tables exist with correct structure
cursor.execute("""
DO $$
BEGIN
-- Create forex_pairs if it doesn't exist
IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'forex_pairs') THEN
CREATE TABLE forex_pairs (
id SERIAL PRIMARY KEY,
symbol TEXT UNIQUE NOT NULL,
description TEXT,
last_update DATE,
is_active BOOLEAN DEFAULT true
);
RAISE NOTICE 'Created forex_pairs table';
END IF;
-- Create forex_prices if it doesn't exist
IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'forex_prices') THEN
CREATE TABLE forex_prices (
pair_id INTEGER REFERENCES forex_pairs(id),
date DATE NOT NULL,
open NUMERIC(19,4) NOT NULL,
high NUMERIC(19,4) NOT NULL,
low NUMERIC(19,4) NOT NULL,
close NUMERIC(19,4) NOT NULL,
PRIMARY KEY (pair_id, date)
);
CREATE INDEX IF NOT EXISTS idx_forex_prices_date
ON forex_prices(date);
RAISE NOTICE 'Created forex_prices table and index';
END IF;
END;
$$;
""")
connection.commit()
logging.info("Database tables verified successfully")
except Exception as e:
connection.rollback()
logging.error(f"Database verification failed: {e}")
raise
finally:
if cursor:
cursor.close()
def initialize_forex_pairs(connection: psycopg2.extensions.connection, collector: ForexCollector) -> None:
"""Initialize forex pairs in the database."""
cursor = None
try:
cursor = connection.cursor()
# Check existing pairs
cursor.execute("SELECT symbol FROM forex_pairs WHERE is_active = true")
existing_pairs = {row[0] for row in cursor.fetchall()}
pairs = collector.get_pairs()
new_pairs = [pair for pair in pairs if pair['symbol'] not in existing_pairs]
if not new_pairs:
logging.info("All pairs are already initialized")
return
logging.info(f"Attempting to initialize {len(new_pairs)} new pairs")
for pair in new_pairs:
try:
cursor.execute("""
INSERT INTO forex_pairs (symbol, description, is_active)
VALUES (%s, %s, true)
ON CONFLICT (symbol) DO UPDATE SET
description = EXCLUDED.description,
is_active = true
RETURNING id, symbol
""", (pair['symbol'], pair['description']))
result = cursor.fetchone()
if result:
logging.debug(f"Initialized/Updated pair {result[1]} with ID {result[0]}")
except Exception as e:
logging.error(f"Error initializing pair {pair['symbol']}: {e}")
continue
connection.commit()
# Verify initialization
cursor.execute("SELECT COUNT(*) FROM forex_pairs WHERE is_active = true")
count = cursor.fetchone()[0]
logging.info(f"Total active pairs in database: {count}")
except Exception as e:
connection.rollback()
logging.error(f"Error in forex pairs initialization: {e}")
raise
finally:
if cursor:
cursor.close()
def main():
load_dotenv()
setup_logging()
try:
# Connect to database
connection = psycopg2.connect(
dbname=os.getenv('DB_NAME'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
host=os.getenv('DB_HOST'),
port=os.getenv('DB_PORT')
)
logging.info("Database connection established")
# Only verify tables structure (don't reset)
verify_database_tables(connection)
# Initialize collector
collector = ForexCollector(os.getenv('FMP_API_KEY'))
# Initialize forex pairs
initialize_forex_pairs(connection, collector)
# Get initial statistics
initial_stats = get_forex_statistics(connection)
logging.info(f"Initial statistics: {initial_stats}")
# Get all pairs for update
cursor = connection.cursor()
cursor.execute("""
SELECT id, symbol
FROM forex_pairs
WHERE is_active = true
ORDER BY symbol
""")
pairs = cursor.fetchall()
cursor.close()
logging.info(f"Found {len(pairs)} active pairs to update")
# Update each pair
total_records = 0
for pair_id, symbol in pairs:
try:
# Get last price date for this pair
last_update = get_last_price_date(connection, pair_id)
rows_inserted, error = update_forex_prices(
connection,
pair_id,
symbol,
collector,
last_update
)
if error:
logging.error(f"Error updating {symbol}: {error}")
else:
total_records += rows_inserted
logging.info(f"Successfully updated {rows_inserted} records for {symbol}")
except Exception as e:
logging.error(f"Error processing {symbol}: {e}")
continue
# Get final statistics
final_stats = get_forex_statistics(connection)
logging.info(f"Final statistics: {final_stats}")
logging.info(f"Total records updated: {total_records}")
connection.close()
logging.info("Database update completed successfully")
except Exception as e:
logging.error(f"Error in main execution: {str(e)}")
raise
if __name__ == "__main__":
main()