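"""Update commodity price data in PostgreSQL.

Creates the commodity reference and price tables if needed, registers the
symbols exposed by CommodityCollector, backfills historical prices, and then
brings every active commodity up to date through yesterday's close.
"""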
import os
import logging
from datetime import datetime, date, timedelta
from typing import Optional

import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

from utils.Commodities.collector import CommodityCollector
from utils.Commodities.utils import get_commodity_statistics

def setup_logging():
    """Set up logging to a timestamped file and to the console."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)

    log_file = os.path.join(log_dir, f'commodity_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )

def verify_database_tables(connection: psycopg2.extensions.connection) -> None:
    """Verify that the database tables exist and have the correct structure."""
    cursor = None
    try:
        cursor = connection.cursor()

        # Create any missing tables/columns; existing objects are left untouched
        cursor.execute("""
            DO $$
            BEGIN
                -- Create commodities_ref if it doesn't exist
                IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'commodities_ref') THEN
                    CREATE TABLE commodities_ref (
                        id SERIAL PRIMARY KEY,
                        symbol VARCHAR(20) UNIQUE NOT NULL,
                        name TEXT,
                        description TEXT,
                        last_update DATE,
                        is_active BOOLEAN DEFAULT true
                    );
                    RAISE NOTICE 'Created commodities_ref table';
                END IF;

                -- Create commodities_prices if it doesn't exist
                IF NOT EXISTS (SELECT FROM pg_tables WHERE tablename = 'commodities_prices') THEN
                    CREATE TABLE commodities_prices (
                        commodity_id INTEGER REFERENCES commodities_ref(id),
                        date DATE NOT NULL,
                        open NUMERIC(10,5) NOT NULL,
                        high NUMERIC(10,5) NOT NULL,
                        low NUMERIC(10,5) NOT NULL,
                        close NUMERIC(10,5) NOT NULL,
                        PRIMARY KEY (commodity_id, date)
                    );

                    CREATE INDEX IF NOT EXISTS idx_commodities_prices_date
                        ON commodities_prices(date);
                    RAISE NOTICE 'Created commodities_prices table and index';
                END IF;

                -- Add is_active column if it doesn't exist
                IF NOT EXISTS (
                    SELECT FROM information_schema.columns
                    WHERE table_name = 'commodities_ref' AND column_name = 'is_active'
                ) THEN
                    ALTER TABLE commodities_ref ADD COLUMN is_active BOOLEAN DEFAULT true;
                    RAISE NOTICE 'Added is_active column to commodities_ref';
                END IF;
            END;
            $$;
        """)

        connection.commit()
        logging.info("Database tables verified successfully")

    except Exception as e:
        connection.rollback()
        logging.error(f"Database verification failed: {e}")
        raise
    finally:
        if cursor:
            cursor.close()

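# Note: the DO $$ block above only creates objects that are missing, so
# verify_database_tables is safe to call on every run of this script.
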
def initialize_commodities(connection: psycopg2.extensions.connection,
                           collector: CommodityCollector) -> None:
    """Initialize commodities in the database."""
    cursor = None
    try:
        cursor = connection.cursor()

        # Get the symbol -> description mapping from the collector
        commodities = collector.commodities

        # Get symbols already present in the database
        cursor.execute("SELECT symbol FROM commodities_ref")
        existing_symbols = {row[0] for row in cursor.fetchall()}

        # Keep only symbols that are not registered yet
        new_commodities = {symbol: desc for symbol, desc in commodities.items()
                           if symbol not in existing_symbols}

        if not new_commodities:
            logging.info("All commodities are already initialized")
            return

        logging.info(f"Attempting to initialize {len(new_commodities)} new commodities")

        for symbol, description in new_commodities.items():
            try:
                cursor.execute("""
                    INSERT INTO commodities_ref
                        (symbol, description, is_active, last_update)
                    VALUES (%s, %s, true, NULL)
                    ON CONFLICT (symbol) DO NOTHING
                    RETURNING id, symbol
                """, (symbol, description))

                result = cursor.fetchone()
                if result:
                    logging.debug(f"Initialized commodity {result[1]} with ID {result[0]}")
            except Exception as e:
                logging.error(f"Error initializing commodity {symbol}: {e}")
                continue

        connection.commit()

        # Verify initialization
        cursor.execute("SELECT COUNT(*) FROM commodities_ref WHERE is_active = true")
        count = cursor.fetchone()[0]
        logging.info(f"Total active commodities in database: {count}")

    except Exception as e:
        connection.rollback()
        logging.error(f"Error in commodities initialization: {e}")
        raise
    finally:
        if cursor:
            cursor.close()

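# The code above assumes CommodityCollector.commodities is a dict mapping each
# symbol to its description, inferred from the .items() unpacking in
# initialize_commodities, e.g. (illustrative values, not from the collector):
#     {'GCUSD': 'Gold Futures', 'CLUSD': 'Crude Oil WTI Futures'}
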
def update_historical_prices(connection: psycopg2.extensions.connection,
                             commodity_id: int,
                             symbol: str,
                             collector: CommodityCollector,
                             last_update: Optional[date]) -> None:
    """Update historical prices for a specific commodity."""
    cursor = None
    try:
        cursor = connection.cursor()

        # If there is no last_update, start from a much earlier date
        if last_update is None:
            last_update = date(2000, 1, 1)

        # Only complete days are stored; today's partial data is excluded below
        today = datetime.now().date()

        # Don't update if we already have data up to yesterday
        if last_update >= (today - timedelta(days=1)):
            logging.info(f"Data for {symbol} is already up to date (last update: {last_update})")
            return

        # Get historical data
        historical_data = collector.get_historical_prices(symbol, last_update)

        if not historical_data:
            logging.warning(f"No historical data available for {symbol}")
            return

        # Filter out today's (still partial) data if it exists
        filtered_data = [
            record for record in historical_data
            if datetime.strptime(record['date'], '%Y-%m-%d').date() < today
        ]

        if not filtered_data:
            logging.info(f"No new complete day data available for {symbol}")
            return

        # Prepare data for batch insertion
        records = [
            (commodity_id, record['date'], record['open'], record['high'],
             record['low'], record['close'])
            for record in filtered_data
        ]

        # Process in batches
        batch_size = 1000
        total_updated = 0
        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            execute_values(
                cursor,
                """
                INSERT INTO commodities_prices
                    (commodity_id, date, open, high, low, close)
                VALUES %s
                ON CONFLICT (commodity_id, date) DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close
                """,
                batch
            )
            connection.commit()
            total_updated += len(batch)
            logging.info(f"Updated {len(batch)} records for {symbol}")

        # Record that data is now complete through yesterday
        cursor.execute(
            "UPDATE commodities_ref SET last_update = %s WHERE id = %s",
            (today - timedelta(days=1), commodity_id)
        )
        connection.commit()

        logging.info(f"Successfully updated {total_updated} complete day records for {symbol}")

    except Exception as e:
        connection.rollback()
        logging.error(f"Error updating historical prices for {symbol}: {e}")
        raise
    finally:
        if cursor:
            cursor.close()

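# update_historical_prices assumes collector.get_historical_prices(symbol, from_date)
# returns a list of dicts keyed by 'date' (as 'YYYY-MM-DD'), 'open', 'high',
# 'low' and 'close' -- inferred from the key accesses above, e.g.:
#     [{'date': '2024-01-02', 'open': 73.4, 'high': 74.1,
#       'low': 72.9, 'close': 73.8}, ...]
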
def backfill_historical_data(connection: psycopg2.extensions.connection,
                             collector: CommodityCollector,
                             start_date: date = date(2000, 1, 1)) -> None:
    """Backfill historical data for all commodities from a given start date."""
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT id, symbol
            FROM commodities_ref
            WHERE is_active = true
        """)

        commodities = cursor.fetchall()
        logging.info(f"Starting backfill for {len(commodities)} commodities from {start_date}")

        for commodity_id, symbol in commodities:
            try:
                # Refetching already-stored days is harmless: the upsert in
                # update_historical_prices overwrites existing (commodity_id, date) rows
                update_historical_prices(connection, commodity_id, symbol, collector, start_date)
            except Exception as e:
                logging.error(f"Error backfilling {symbol}: {e}")
                continue

    except Exception as e:
        logging.error(f"Error in backfill process: {e}")
        raise
    finally:
        if cursor:
            cursor.close()

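# main() reads its configuration from the environment (populated from a .env
# file by load_dotenv). A sample .env with placeholder values:
#     DB_NAME=commodities
#     DB_USER=postgres
#     DB_PASSWORD=secret
#     DB_HOST=localhost
#     DB_PORT=5432
#     FMP_API_KEY=your_api_key
# The variable names come from the os.getenv calls below; the values are
# placeholders, not defaults shipped with this project.
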
def main():
    load_dotenv()
    setup_logging()

    connection = None
    try:
        # Connect to database
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
        logging.info("Database connection established")

        # Verify database structure
        verify_database_tables(connection)

        # Initialize collector
        collector = CommodityCollector(os.getenv('FMP_API_KEY'))

        # Initialize commodities
        initialize_commodities(connection, collector)

        # Get initial statistics
        initial_stats = get_commodity_statistics(connection)
        logging.info(f"Initial statistics: {initial_stats}")

        # Perform a complete backfill for any missing historical data
        backfill_historical_data(connection, collector)

        # Get all commodities for the regular update
        cursor = connection.cursor()
        cursor.execute("""
            SELECT id, symbol, last_update
            FROM commodities_ref
            WHERE is_active = true
            ORDER BY symbol
        """)
        commodities = cursor.fetchall()
        cursor.close()
        logging.info(f"Found {len(commodities)} active commodities to update")

        # Update each commodity
        for commodity_id, symbol, last_update in commodities:
            try:
                update_historical_prices(
                    connection,
                    commodity_id,
                    symbol,
                    collector,
                    last_update
                )
            except Exception as e:
                logging.error(f"Error processing {symbol}: {e}")
                continue

        # Get final statistics
        final_stats = get_commodity_statistics(connection)
        logging.info(f"Final statistics: {final_stats}")

        logging.info("Database update completed successfully")

    except Exception as e:
        logging.error(f"Error in main execution: {e}")
        raise
    finally:
        # Close the connection whether or not the update succeeded
        if connection:
            connection.close()


if __name__ == "__main__":
    main()