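"""One-shot updater for the stock database.

Creates the PostgreSQL schema if it does not exist, upserts ticker metadata
from an Excel lexicon, then refreshes daily price history for every ticker
that has a yf_ticker symbol.
"""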
import logging
import os
import sys
from datetime import datetime

import pandas as pd
import psycopg2
from dotenv import load_dotenv
from psycopg2.extensions import UNICODE, UNICODEARRAY, register_type
from psycopg2.extras import execute_batch

# Add the project root directory to the Python path so the local
# utils package resolves when the script is run directly
project_root = os.path.dirname(os.path.abspath(__file__))
sys.path.append(project_root)
# NOTE: get_stock_statistics is not imported here because a local
# definition below would shadow it anyway
from utils.Stocks.historical.collector import (
    StockDataClient,
    update_stocks_batch,
)
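# Configuration is read from the environment (optionally via a .env file
# loaded by load_dotenv() in main()). The values below are illustrative
# placeholders, not defaults shipped with the script:
#
#   DB_NAME=stocks
#   DB_USER=postgres
#   DB_PASSWORD=changeme
#   DB_HOST=localhost
#   DB_PORT=5432
#   FMP_API_KEY=<API key for the stock data client>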


def setup_logging():
    """Log to a timestamped file under logs/ and to the console."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'stock_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.FileHandler(log_file), logging.StreamHandler()]
    )
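# Schema overview (see the DDL below): tickers reference sectors and zones;
# stocks holds one OHLCV row per (ticker_id, date); invalid_tickers records
# symbols whose lookups failed.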

def setup_database(conn: psycopg2.extensions.connection) -> bool:
    """Create the tables and indexes if they do not already exist."""
    cursor = None
    try:
        conn.set_client_encoding('UTF8')
        cursor = conn.cursor()
        register_type(UNICODE, cursor)
        register_type(UNICODEARRAY, cursor)

        # Base tables first; each CREATE runs in its own transaction so one
        # failure does not poison the rest
        tables_sql = [
            """
            CREATE TABLE IF NOT EXISTS sectors (
                id SERIAL PRIMARY KEY,
                name VARCHAR UNIQUE NOT NULL
            );
            """,
            """
            CREATE TABLE IF NOT EXISTS zones (
                id SERIAL PRIMARY KEY,
                name VARCHAR UNIQUE NOT NULL
            );
            """,
            """
            CREATE TABLE IF NOT EXISTS tickers (
                id SERIAL PRIMARY KEY,
                isin VARCHAR UNIQUE NOT NULL,
                bbg_ticker VARCHAR,
                yf_ticker VARCHAR,
                name VARCHAR NOT NULL,
                description TEXT,
                hashtags VARCHAR,
                sector_id INTEGER REFERENCES sectors(id),
                gics VARCHAR,
                zone_id INTEGER REFERENCES zones(id),
                ccy VARCHAR
            );
            """,
            """
            CREATE TABLE IF NOT EXISTS stocks (
                id SERIAL PRIMARY KEY,
                ticker_id INTEGER REFERENCES tickers(id),
                date DATE NOT NULL,
                open DECIMAL(19,4),
                high DECIMAL(19,4),
                low DECIMAL(19,4),
                close DECIMAL(19,4),
                adj_close DECIMAL(19,4),
                volume BIGINT,
                UNIQUE(ticker_id, date)
            );
            """,
            """
            CREATE TABLE IF NOT EXISTS invalid_tickers (
                id SERIAL PRIMARY KEY,
                ticker VARCHAR UNIQUE NOT NULL,
                reason TEXT,
                last_check TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                attempts INTEGER DEFAULT 1
            );
            """
        ]

        for sql in tables_sql:
            try:
                cursor.execute(sql)
                conn.commit()
            except Exception as e:
                conn.rollback()
                logging.error(f"Error creating table: {e}")
                return False

        # Indexes likewise get one transaction each
        indexes = [
            "CREATE INDEX IF NOT EXISTS idx_stocks_date ON stocks(date);",
            "CREATE INDEX IF NOT EXISTS idx_stocks_ticker_date ON stocks(ticker_id, date);",
            "CREATE INDEX IF NOT EXISTS idx_tickers_isin ON tickers(isin);"
        ]

        for index in indexes:
            try:
                cursor.execute(index)
                conn.commit()
            except Exception as e:
                conn.rollback()
                logging.error(f"Error creating index: {e}")
                return False

        logging.info("Database structure setup completed successfully")
        return True

    except Exception as e:
        logging.error(f"Error setting up database: {e}")
        if conn:
            conn.rollback()
        return False
    finally:
        if cursor:
            cursor.close()


def get_existing_tickers(connection) -> dict:
    """Get existing tickers with their latest stock date."""
    cursor = connection.cursor()
    try:
        cursor.execute("""
            WITH latest_updates AS (
                SELECT ticker_id, MAX(date) as last_update
                FROM stocks
                GROUP BY ticker_id
            )
            SELECT t.isin, t.id, t.yf_ticker, t.name,
                   COALESCE(lu.last_update, '1900-01-01'::date) as last_update,
                   s.name as sector, z.name as zone
            FROM tickers t
            LEFT JOIN latest_updates lu ON t.id = lu.ticker_id
            LEFT JOIN sectors s ON t.sector_id = s.id
            LEFT JOIN zones z ON t.zone_id = z.id
            WHERE t.yf_ticker IS NOT NULL
        """)
        return {
            row[0]: {
                'id': row[1],
                'yf_ticker': row[2],
                'name': row[3],
                'last_update': row[4],
                'sector': row[5],
                'zone': row[6]
            }
            for row in cursor.fetchall()
        }
    finally:
        cursor.close()
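# Shape of the mapping returned above, keyed by ISIN (values are
# illustrative, not real data):
#   {'US0000000000': {'id': 1, 'yf_ticker': 'ABC', 'name': 'Example Corp',
#                     'last_update': date(2024, 1, 5),
#                     'sector': 'Technology', 'zone': 'US'}}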


def bulk_upsert_sectors_zones(connection, df):
    """Efficiently upsert sectors and zones."""
    cursor = connection.cursor()
    sector_map = {}
    zone_map = {}
    try:
        # Bulk upsert sectors
        sectors = list(set(df['SECTOR'].dropna()))
        execute_batch(cursor, """
            INSERT INTO sectors (name)
            VALUES (%s)
            ON CONFLICT (name) DO NOTHING
        """, [(s,) for s in sectors])
        cursor.execute("SELECT id, name FROM sectors")
        sector_map = {row[1]: row[0] for row in cursor.fetchall()}

        # Bulk upsert zones
        zones = list(set(df['ID_ZONE'].dropna()))
        execute_batch(cursor, """
            INSERT INTO zones (name)
            VALUES (%s)
            ON CONFLICT (name) DO NOTHING
        """, [(z,) for z in zones])
        cursor.execute("SELECT id, name FROM zones")
        zone_map = {row[1]: row[0] for row in cursor.fetchall()}

        connection.commit()
        return sector_map, zone_map
    except Exception as e:
        connection.rollback()
        logging.error(f"Error in bulk upsert of sectors/zones: {e}")
        raise
    finally:
        cursor.close()
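# Columns the Excel lexicon is expected to provide, exactly as referenced
# in process_excel_updates below:
#   ISIN, BBG_TICKER, YF_TICKER, NAME, Des, Hshtag, SECTOR, GICS, ID_ZONE, CCY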

def process_excel_updates(connection, excel_path):
    """Upsert ticker metadata from the Excel lexicon.

    Returns the list of tickers (with their last stored stock date) that
    still need a price-history refresh.
    """
    df = pd.read_excel(excel_path)
    existing_tickers = get_existing_tickers(connection)
    tickers_to_update = []
    try:
        # Bulk upsert sectors and zones first so their ids can be referenced
        sector_map, zone_map = bulk_upsert_sectors_zones(connection, df)

        for _, row in df.iterrows():
            if pd.isna(row['ISIN']):
                continue

            isin = row['ISIN']
            existing = existing_tickers.get(isin)

            # Create the cursor outside the try block so the finally clause
            # cannot hit an unbound name if cursor creation itself fails
            cursor = connection.cursor()
            try:
                # Upsert ticker; COALESCE keeps existing values when the
                # spreadsheet cell is empty
                cursor.execute("""
                    INSERT INTO tickers (
                        isin, bbg_ticker, yf_ticker, name, description, hashtags,
                        sector_id, gics, zone_id, ccy
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (isin) DO UPDATE SET
                        bbg_ticker = COALESCE(EXCLUDED.bbg_ticker, tickers.bbg_ticker),
                        yf_ticker = COALESCE(EXCLUDED.yf_ticker, tickers.yf_ticker),
                        name = COALESCE(EXCLUDED.name, tickers.name),
                        description = COALESCE(EXCLUDED.description, tickers.description),
                        hashtags = COALESCE(EXCLUDED.hashtags, tickers.hashtags),
                        sector_id = COALESCE(EXCLUDED.sector_id, tickers.sector_id),
                        gics = COALESCE(EXCLUDED.gics, tickers.gics),
                        zone_id = COALESCE(EXCLUDED.zone_id, tickers.zone_id),
                        ccy = COALESCE(EXCLUDED.ccy, tickers.ccy)
                    RETURNING id
                """, (
                    isin,
                    row['BBG_TICKER'] if pd.notna(row['BBG_TICKER']) else None,
                    row['YF_TICKER'] if pd.notna(row['YF_TICKER']) else None,
                    row['NAME'],
                    row['Des'] if pd.notna(row['Des']) else None,
                    row['Hshtag'] if pd.notna(row['Hshtag']) else None,
                    sector_map.get(row['SECTOR']),
                    row['GICS'] if pd.notna(row['GICS']) else None,
                    zone_map.get(row['ID_ZONE']),
                    row['CCY'] if pd.notna(row['CCY']) else None
                ))

                ticker_id = cursor.fetchone()[0]
                connection.commit()

                if pd.notna(row['YF_TICKER']):
                    # 1900-01-01 mirrors the COALESCE fallback used in
                    # get_existing_tickers for tickers with no stored data
                    last_update = existing['last_update'] if existing else datetime(1900, 1, 1).date()
                    tickers_to_update.append({
                        'id': ticker_id,
                        'ticker': row['YF_TICKER'],
                        'name': row['NAME'],
                        'last_update': last_update
                    })
            except Exception as e:
                connection.rollback()
                logging.error(f"Error processing ISIN {isin}: {e}")
            finally:
                cursor.close()

        return tickers_to_update
    except Exception as e:
        logging.error(f"Error in Excel processing: {e}")
        raise


def get_stock_statistics(connection):
    """Get basic statistics about the stock data."""
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT
                (SELECT COUNT(*) FROM stocks) as stocks_count,
                (SELECT COUNT(*) FROM tickers) as tickers_count,
                (SELECT COUNT(DISTINCT ticker_id) FROM stocks) as active_tickers,
                (SELECT MAX(date) FROM stocks) as latest_date
        """)
        row = cursor.fetchone()
        if row:
            return {
                'stocks_count': row[0],
                'tickers_count': row[1],
                'active_tickers': row[2],
                'latest_date': row[3]
            }
        return {}
    except Exception as e:
        logging.error(f"Error getting stock statistics: {e}")
        return {}
    finally:
        if cursor:
            cursor.close()


def main():
    load_dotenv()
    setup_logging()
    connection = None

    try:
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )

        # First, set up the database
        if not setup_database(connection):
            raise RuntimeError("Database setup failed")

        # Now that the tables exist, statistics can be gathered
        initial_stats = get_stock_statistics(connection)
        logging.info(f"Initial statistics: {initial_stats}")

        excel_path = os.path.join('utils', 'Stocks', 'RivaLexique.xlsx')
        tickers = process_excel_updates(connection, excel_path)
        logging.info(f"Processing updates for {len(tickers)} tickers")

        # Initialize the stock data client
        stock_client = StockDataClient(
            api_key=os.getenv('FMP_API_KEY'),
            rate_limit_per_minute=300
        )

        # Fetch and store price history in batches
        total_records = update_stocks_batch(
            tickers=tickers,
            connection=connection,
            client=stock_client,
            batch_size=50
        )

        final_stats = get_stock_statistics(connection)
        logging.info(f"Final statistics: {final_stats}")
        logging.info(f"Total records processed: {total_records}")

    except Exception as e:
        logging.error(f"Error in main execution: {e}")
        if connection:
            connection.rollback()
        raise
    finally:
        if connection:
            connection.close()


if __name__ == "__main__":
    main()