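"""Update forex price data stored in PostgreSQL.

Creates the forex tables when they are missing, registers the currency pairs
reported by the collector, then runs the price update for every active pair.
"""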
import logging
import os
from contextlib import closing
from datetime import datetime, timedelta

import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import execute_values

from utils.Forex.collector import ForexCollector
from utils.Forex.utils import (
    get_forex_statistics,
    get_last_price_date,
    update_forex_prices,
)
|
|
|
|
def setup_logging():
    """Configure root logging to a timestamped log file and the console.

    Ensures a ``logs`` directory exists relative to the current working
    directory, then calls ``logging.basicConfig`` with the root level set to
    INFO, a file handler writing to
    ``logs/forex_update_<YYYYmmdd_HHMMSS>.log`` and a stream handler.

    Note: ``logging.basicConfig`` is a no-op when the root logger already has
    handlers, so only the first effective call configures logging.
    """
    log_dir = 'logs'
    # exist_ok=True removes the check-then-create race of
    # os.path.exists() followed by os.makedirs().
    os.makedirs(log_dir, exist_ok=True)

    log_file = os.path.join(log_dir, f'forex_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(),
        ],
    )
|
|
|
|
def verify_database_tables(connection: psycopg2.extensions.connection) -> None:
    """Create the ``forex_pairs`` and ``forex_prices`` tables if they are missing.

    Existing tables are left untouched: their column layout is not inspected
    or altered. The work runs as a single anonymous PL/pgSQL block and is
    committed on success.

    Args:
        connection: Open psycopg2 connection on which to run the DDL.

    Raises:
        Exception: Any error raised while executing or committing is logged,
            the transaction is rolled back, and the error is re-raised.
    """
    try:
        # The cursor context manager closes the cursor even if execute() fails.
        with connection.cursor() as cursor:
            # to_regclass() resolves the name through the search_path, exactly
            # like the unqualified queries used elsewhere in this file. The
            # previous pg_tables lookup had no schema filter, so a same-named
            # table in an unrelated schema would wrongly suppress creation.
            # Requires PostgreSQL 9.4+.
            cursor.execute("""
                DO $$
                BEGIN
                    -- Create forex_pairs if it doesn't exist
                    IF to_regclass('forex_pairs') IS NULL THEN
                        CREATE TABLE forex_pairs (
                            id SERIAL PRIMARY KEY,
                            symbol TEXT UNIQUE NOT NULL,
                            description TEXT,
                            last_update DATE,
                            is_active BOOLEAN DEFAULT true
                        );
                        RAISE NOTICE 'Created forex_pairs table';
                    END IF;

                    -- Create forex_prices if it doesn't exist
                    IF to_regclass('forex_prices') IS NULL THEN
                        CREATE TABLE forex_prices (
                            pair_id INTEGER REFERENCES forex_pairs(id),
                            date DATE NOT NULL,
                            open NUMERIC(19,4) NOT NULL,
                            high NUMERIC(19,4) NOT NULL,
                            low NUMERIC(19,4) NOT NULL,
                            close NUMERIC(19,4) NOT NULL,
                            PRIMARY KEY (pair_id, date)
                        );

                        CREATE INDEX IF NOT EXISTS idx_forex_prices_date
                            ON forex_prices(date);
                        RAISE NOTICE 'Created forex_prices table and index';
                    END IF;
                END;
                $$;
            """)

        connection.commit()
        logging.info("Database tables verified successfully")

    except Exception as e:
        connection.rollback()
        logging.error(f"Database verification failed: {e}")
        raise
|
|
|
|
def initialize_forex_pairs(connection: psycopg2.extensions.connection, collector: ForexCollector) -> None:
    """Insert or re-activate the collector's forex pairs in ``forex_pairs``.

    Pairs returned by ``collector.get_pairs()`` whose symbol is not already
    active in the table are upserted one by one. A pair that fails is logged
    and skipped; the remaining pairs are still processed and committed.

    Args:
        connection: Open psycopg2 connection.
        collector: Source of pairs; each item returned by ``get_pairs()`` is
            indexed with the keys ``'symbol'`` and ``'description'``.

    Raises:
        Exception: Errors outside the per-pair handling are logged, the
            transaction is rolled back, and the error is re-raised.
    """
    upsert_sql = """
        INSERT INTO forex_pairs (symbol, description, is_active)
        VALUES (%s, %s, true)
        ON CONFLICT (symbol) DO UPDATE SET
            description = EXCLUDED.description,
            is_active = true
        RETURNING id, symbol
    """
    # A failed statement aborts the whole PostgreSQL transaction, so without a
    # savepoint one bad pair would make every later INSERT fail as well.
    # Savepoints are only valid inside a transaction block, hence the
    # autocommit check (in autocommit mode each statement stands alone).
    use_savepoint = not connection.autocommit

    try:
        with connection.cursor() as cursor:
            # Check existing pairs
            cursor.execute("SELECT symbol FROM forex_pairs WHERE is_active = true")
            existing_pairs = {row[0] for row in cursor.fetchall()}

            pairs = collector.get_pairs()
            new_pairs = [pair for pair in pairs if pair['symbol'] not in existing_pairs]

            if not new_pairs:
                logging.info("All pairs are already initialized")
                return

            logging.info(f"Attempting to initialize {len(new_pairs)} new pairs")
            for pair in new_pairs:
                if use_savepoint:
                    cursor.execute("SAVEPOINT forex_pair_upsert")
                try:
                    cursor.execute(upsert_sql, (pair['symbol'], pair['description']))
                    result = cursor.fetchone()
                    if result:
                        logging.debug(f"Initialized/Updated pair {result[1]} with ID {result[0]}")
                except Exception as e:
                    # Undo only this pair so the transaction stays usable.
                    if use_savepoint:
                        cursor.execute("ROLLBACK TO SAVEPOINT forex_pair_upsert")
                    logging.error(f"Error initializing pair {pair['symbol']}: {e}")
                    continue
                if use_savepoint:
                    cursor.execute("RELEASE SAVEPOINT forex_pair_upsert")

            connection.commit()

            # Verify initialization
            cursor.execute("SELECT COUNT(*) FROM forex_pairs WHERE is_active = true")
            count = cursor.fetchone()[0]
            logging.info(f"Total active pairs in database: {count}")

    except Exception as e:
        connection.rollback()
        logging.error(f"Error in forex pairs initialization: {e}")
        raise
|
|
|
|
def main():
    """Run one full forex update against the configured PostgreSQL database.

    Steps: load environment variables, configure logging, connect using the
    ``DB_NAME``/``DB_USER``/``DB_PASSWORD``/``DB_HOST``/``DB_PORT`` variables,
    ensure the tables exist, register the collector's pairs, then update
    prices for every active pair. Statistics are logged before and after.

    A failure on a single pair is logged and the loop moves on to the next
    pair. Any other failure is logged and re-raised. The database connection
    is closed in every case.

    Raises:
        Exception: Any error outside the per-pair handling, after logging.
    """
    load_dotenv()
    setup_logging()

    try:
        # closing() is used instead of `with connection:` because psycopg2's
        # connection context manager only ends the transaction; it does not
        # close the connection. This guarantees close() on error paths too.
        with closing(psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
        )) as connection:
            logging.info("Database connection established")

            # Only verify tables structure (don't reset)
            verify_database_tables(connection)

            # Initialize collector
            collector = ForexCollector(os.getenv('FMP_API_KEY'))

            # Initialize forex pairs
            initialize_forex_pairs(connection, collector)

            # Get initial statistics
            initial_stats = get_forex_statistics(connection)
            logging.info(f"Initial statistics: {initial_stats}")

            # Get all pairs for update
            with connection.cursor() as cursor:
                cursor.execute("""
                    SELECT id, symbol
                    FROM forex_pairs
                    WHERE is_active = true
                    ORDER BY symbol
                """)
                pairs = cursor.fetchall()
            logging.info(f"Found {len(pairs)} active pairs to update")

            # Update each pair
            total_records = 0
            for pair_id, symbol in pairs:
                try:
                    # Get last price date for this pair
                    last_update = get_last_price_date(connection, pair_id)
                    rows_inserted, error = update_forex_prices(
                        connection,
                        pair_id,
                        symbol,
                        collector,
                        last_update,
                    )

                    if error:
                        logging.error(f"Error updating {symbol}: {error}")
                    else:
                        total_records += rows_inserted
                        logging.info(f"Successfully updated {rows_inserted} records for {symbol}")

                except Exception as e:
                    logging.error(f"Error processing {symbol}: {e}")
                    # A failed statement leaves the transaction aborted; reset
                    # it so the next pair does not fail for the same reason.
                    connection.rollback()
                    continue

            # Get final statistics
            final_stats = get_forex_statistics(connection)
            logging.info(f"Final statistics: {final_stats}")
            logging.info(f"Total records updated: {total_records}")

        logging.info("Database update completed successfully")

    except Exception as e:
        logging.error(f"Error in main execution: {str(e)}")
        raise
|
|
|
|
# Script entry point: run the update only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|