# RivaCube/utils/usmarkets/utils.py
# 2025-02-04 19:31:18 +01:00
#
# 456 lines
# 18 KiB
# Python

import logging
from datetime import datetime, timedelta
import psycopg2
from psycopg2.extras import execute_batch
def get_last_business_day(current_date):
    """Return the closest weekday strictly before ``current_date``.

    Only weekends are skipped; no holiday calendar is consulted.

    Args:
        current_date: Object exposing ``weekday()`` and supporting
            subtraction of a ``timedelta`` (``date`` or ``datetime``).

    Returns:
        ``current_date`` moved back to the previous Monday-Friday day.
    """
    # weekday(): Monday == 0 ... Sunday == 6. Monday and Sunday both land on
    # the preceding Friday; every other day simply steps back one day.
    days_back = {0: 3, 6: 2}.get(current_date.weekday(), 1)
    return current_date - timedelta(days=days_back)
def setup_database(connection):
    """Create the US Markets tables and indexes that are missing.

    Every DDL statement uses ``IF NOT EXISTS``, so the whole script is run on
    each call. This guarantees that a database which already contains
    ``market_indices`` but lacks any of the other tables or indexes is
    completed instead of being silently left partial.

    Args:
        connection: Open DB-API connection to PostgreSQL.

    Returns:
        True when the DDL ran and the transaction was committed, False when
        any step raised (the error is logged and the transaction rolled back).
    """
    cursor = None
    try:
        cursor = connection.cursor()
        # Probe one table purely to choose the log message below; the DDL
        # itself no longer depends on the outcome of this check.
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = 'market_indices'
            );
        """)
        tables_exist = cursor.fetchone()[0]
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS market_indices (
                id SERIAL PRIMARY KEY,
                symbol VARCHAR(20) NOT NULL,
                name VARCHAR(255),
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(symbol)
            );
            CREATE TABLE IF NOT EXISTS index_quotes (
                id SERIAL PRIMARY KEY,
                index_id INTEGER REFERENCES market_indices(id),
                price DECIMAL NULL,
                change DECIMAL NULL,
                changes_percentage DECIMAL NULL,
                day_high DECIMAL NULL,
                day_low DECIMAL NULL,
                previous_close DECIMAL NULL,
                volume BIGINT NULL,
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(index_id, trade_date)
            );
            CREATE TABLE IF NOT EXISTS sector_performance (
                id SERIAL PRIMARY KEY,
                sector VARCHAR(100),
                change_percentage DECIMAL NULL,
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(sector, trade_date)
            );
            CREATE TABLE IF NOT EXISTS sector_historical (
                id SERIAL PRIMARY KEY,
                trade_date DATE NOT NULL,
                basic_materials DECIMAL NULL,
                communication_services DECIMAL NULL,
                consumer_cyclical DECIMAL NULL,
                consumer_defensive DECIMAL NULL,
                energy DECIMAL NULL,
                financial_services DECIMAL NULL,
                healthcare DECIMAL NULL,
                industrials DECIMAL NULL,
                real_estate DECIMAL NULL,
                technology DECIMAL NULL,
                utilities DECIMAL NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(trade_date)
            );
            CREATE TABLE IF NOT EXISTS sector_pe_ratios (
                id SERIAL PRIMARY KEY,
                sector VARCHAR(100),
                pe_ratio DECIMAL NULL,
                exchange VARCHAR(20),
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(sector, exchange, trade_date)
            );
            CREATE TABLE IF NOT EXISTS industry_pe_ratios (
                id SERIAL PRIMARY KEY,
                industry VARCHAR(100),
                pe_ratio DECIMAL NULL,
                exchange VARCHAR(20),
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(industry, exchange, trade_date)
            );
            CREATE TABLE IF NOT EXISTS market_movers (
                id SERIAL PRIMARY KEY,
                symbol VARCHAR(20),
                name VARCHAR(255),
                change DECIMAL NULL,
                price DECIMAL NULL,
                changes_percentage DECIMAL NULL,
                mover_type VARCHAR(20),
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(symbol, trade_date, mover_type)
            );
            -- Create indexes for better query performance
            CREATE INDEX IF NOT EXISTS idx_index_quotes_date ON index_quotes(trade_date);
            CREATE INDEX IF NOT EXISTS idx_sector_performance_date ON sector_performance(trade_date);
            CREATE INDEX IF NOT EXISTS idx_sector_historical_date ON sector_historical(trade_date);
            CREATE INDEX IF NOT EXISTS idx_sector_pe_date ON sector_pe_ratios(trade_date);
            CREATE INDEX IF NOT EXISTS idx_industry_pe_date ON industry_pe_ratios(trade_date);
            CREATE INDEX IF NOT EXISTS idx_market_movers_date ON market_movers(trade_date, mover_type);
        """)
        if tables_exist:
            logging.info("Database tables already exist")
        else:
            logging.info("Database tables created successfully")
        connection.commit()
        return True
    except Exception as e:
        logging.error(f"Database setup error: {e}")
        if connection:
            connection.rollback()
        return False
    finally:
        if cursor:
            cursor.close()
def update_market_indices(connection, indices_data):
    """Upsert market indices and their quotes for the last business day.

    The trade date is ``get_last_business_day`` applied to today's date. When
    at least one quote already exists for that date nothing is written.
    Otherwise each entry is upserted into ``market_indices`` (keyed by symbol)
    and ``index_quotes`` (keyed by index and trade date).

    Each entry is wrapped in its own savepoint. In PostgreSQL a failed
    statement aborts the surrounding transaction, so without the savepoint
    every later entry would fail as well and the final commit would persist
    nothing. A failed entry is rolled back completely and skipped.

    Args:
        connection: Open DB-API connection to PostgreSQL.
        indices_data: Iterable of dicts. ``symbol`` is required; ``name``,
            ``price``, ``change``, ``changesPercentage``, ``dayHigh``,
            ``dayLow``, ``previousClose`` and ``volume`` are optional and are
            stored as NULL when missing or None.

    Returns:
        Number of entries written, or 0 when data already exists for the
        trade date or the update as a whole failed and was rolled back.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())
        cursor.execute("""
            SELECT COUNT(*) FROM index_quotes iq
            JOIN market_indices mi ON iq.index_id = mi.id
            WHERE iq.trade_date = %s
        """, (current_date,))
        if cursor.fetchone()[0] > 0:
            logging.info(f"Market indices data already exists for {current_date}")
            return 0

        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands alone, so none is needed.
        use_savepoints = not getattr(connection, 'autocommit', False)
        decimal_keys = (
            'price', 'change', 'changesPercentage',
            'dayHigh', 'dayLow', 'previousClose',
        )
        records_affected = 0
        for index in indices_data:
            if use_savepoints:
                cursor.execute("SAVEPOINT market_index_row")
            try:
                cursor.execute("""
                    INSERT INTO market_indices (symbol, name)
                    VALUES (%s, %s)
                    ON CONFLICT (symbol)
                    DO UPDATE SET name = EXCLUDED.name
                    RETURNING id
                """, (index['symbol'], index.get('name', '')))
                index_id = cursor.fetchone()[0]
                decimals = [
                    float(index[key]) if index.get(key) is not None else None
                    for key in decimal_keys
                ]
                volume = index.get('volume')
                cursor.execute("""
                    INSERT INTO index_quotes
                    (index_id, price, change, changes_percentage, day_high, day_low,
                    previous_close, volume, trade_date)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (index_id, trade_date)
                    DO UPDATE SET
                    price = EXCLUDED.price,
                    change = EXCLUDED.change,
                    changes_percentage = EXCLUDED.changes_percentage,
                    day_high = EXCLUDED.day_high,
                    day_low = EXCLUDED.day_low,
                    previous_close = EXCLUDED.previous_close,
                    volume = EXCLUDED.volume
                """, (
                    index_id,
                    *decimals,
                    int(volume) if volume is not None else None,
                    current_date,
                ))
                if use_savepoints:
                    cursor.execute("RELEASE SAVEPOINT market_index_row")
                records_affected += 1
            except Exception as e:
                # Undo only this entry and clear the aborted state so the
                # remaining entries can still be written.
                if use_savepoints:
                    cursor.execute("ROLLBACK TO SAVEPOINT market_index_row")
                logging.warning(f"Error processing index {index.get('symbol', 'unknown')}: {e}")
        connection.commit()
        return records_affected
    except Exception as e:
        logging.error(f"Error updating market indices: {e}")
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()
def update_sector_performance(connection, sector_data):
    """Upsert per-sector change percentages for the last business day.

    The trade date is ``get_last_business_day`` applied to today's date. When
    ``sector_performance`` already holds rows for that date nothing is
    written.

    Each entry is wrapped in its own savepoint because a failed statement
    aborts the surrounding PostgreSQL transaction; without it every later
    entry would fail and the final commit would persist nothing. Any error
    on a single entry (bad number, missing ``sector`` key, database error)
    skips just that entry, matching the other ``update_*`` functions.

    Args:
        connection: Open DB-API connection to PostgreSQL.
        sector_data: Iterable of dicts with a required ``sector`` key and an
            optional ``changesPercentage`` value; a trailing or embedded
            ``%`` sign is stripped before conversion to float.

    Returns:
        Number of entries written, or 0 when data already exists for the
        trade date or the update as a whole failed and was rolled back.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())
        cursor.execute("""
            SELECT COUNT(*) FROM sector_performance
            WHERE trade_date = %s
        """, (current_date,))
        if cursor.fetchone()[0] > 0:
            logging.info(f"Sector performance data already exists for {current_date}")
            return 0

        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands alone, so none is needed.
        use_savepoints = not getattr(connection, 'autocommit', False)
        records_affected = 0
        for sector in sector_data:
            if use_savepoints:
                cursor.execute("SAVEPOINT sector_performance_row")
            try:
                raw_change = sector.get('changesPercentage')
                change_percentage = (
                    float(str(raw_change).replace('%', ''))
                    if raw_change is not None else None
                )
                cursor.execute("""
                    INSERT INTO sector_performance
                    (sector, change_percentage, trade_date)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (sector, trade_date)
                    DO UPDATE SET change_percentage = EXCLUDED.change_percentage
                """, (
                    sector['sector'],
                    change_percentage,
                    current_date,
                ))
                if use_savepoints:
                    cursor.execute("RELEASE SAVEPOINT sector_performance_row")
                records_affected += 1
            except Exception as e:
                # Undo only this entry and clear the aborted state so the
                # remaining entries can still be written.
                if use_savepoints:
                    cursor.execute("ROLLBACK TO SAVEPOINT sector_performance_row")
                logging.warning(f"Error processing sector {sector.get('sector', 'unknown')}: {e}")
        connection.commit()
        return records_affected
    except Exception as e:
        logging.error(f"Error updating sector performance: {e}")
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()
def update_sector_historical(connection, historical_data):
    """Upsert historical per-sector change percentages, one row per date.

    Each entry is wrapped in its own savepoint because a failed statement
    aborts the surrounding PostgreSQL transaction; without it every later
    entry would fail and the final commit would persist nothing. A failing
    entry (unparseable date, bad number, database error) is skipped.

    Args:
        connection: Open DB-API connection to PostgreSQL.
        historical_data: Iterable of dicts with a required ``date`` string in
            ``YYYY-MM-DD`` format and optional ``<sector>ChangesPercentage``
            values, stored as NULL when missing or None.

    Returns:
        Number of entries written, or 0 when the update as a whole failed
        and was rolled back.
    """
    # Source keys in the same order as the column list of the INSERT below.
    source_keys = (
        'basicMaterialsChangesPercentage',
        'communicationServicesChangesPercentage',
        'consumerCyclicalChangesPercentage',
        'consumerDefensiveChangesPercentage',
        'energyChangesPercentage',
        'financialServicesChangesPercentage',
        'healthcareChangesPercentage',
        'industrialsChangesPercentage',
        'realEstateChangesPercentage',
        'technologyChangesPercentage',
        'utilitiesChangesPercentage',
    )
    upsert_sql = """
        INSERT INTO sector_historical (
            trade_date, basic_materials, communication_services,
            consumer_cyclical, consumer_defensive, energy,
            financial_services, healthcare, industrials,
            real_estate, technology, utilities
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (trade_date)
        DO UPDATE SET
            basic_materials = EXCLUDED.basic_materials,
            communication_services = EXCLUDED.communication_services,
            consumer_cyclical = EXCLUDED.consumer_cyclical,
            consumer_defensive = EXCLUDED.consumer_defensive,
            energy = EXCLUDED.energy,
            financial_services = EXCLUDED.financial_services,
            healthcare = EXCLUDED.healthcare,
            industrials = EXCLUDED.industrials,
            real_estate = EXCLUDED.real_estate,
            technology = EXCLUDED.technology,
            utilities = EXCLUDED.utilities
    """
    cursor = None
    try:
        cursor = connection.cursor()
        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands alone, so none is needed.
        use_savepoints = not getattr(connection, 'autocommit', False)
        records_affected = 0
        for data in historical_data:
            if use_savepoints:
                cursor.execute("SAVEPOINT sector_historical_row")
            try:
                trade_date = datetime.strptime(data['date'], '%Y-%m-%d').date()
                changes = [
                    float(data[key]) if data.get(key) is not None else None
                    for key in source_keys
                ]
                cursor.execute(upsert_sql, (trade_date, *changes))
                if use_savepoints:
                    cursor.execute("RELEASE SAVEPOINT sector_historical_row")
                records_affected += 1
            except Exception as e:
                # Undo only this entry and clear the aborted state so the
                # remaining entries can still be written.
                if use_savepoints:
                    cursor.execute("ROLLBACK TO SAVEPOINT sector_historical_row")
                logging.warning(f"Error processing historical data for date {data.get('date', 'unknown')}: {e}")
        connection.commit()
        return records_affected
    except Exception as e:
        logging.error(f"Error updating sector historical data: {e}")
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()
def update_pe_ratios(connection, pe_data, is_sector=True):
    """Upsert sector or industry PE ratios for a single trade date.

    The trade date is parsed from the ``date`` of the first entry and applied
    to every entry. When the target table already holds rows for that date
    nothing is written.

    Each entry is wrapped in its own savepoint because a failed statement
    aborts the surrounding PostgreSQL transaction; without it every later
    entry would fail and the final commit would persist nothing. A failing
    entry is skipped.

    Args:
        connection: Open DB-API connection to PostgreSQL.
        pe_data: Sequence of dicts with ``date`` (``YYYY-MM-DD``, read from
            the first entry only), ``exchange``, an optional ``pe`` value and
            a ``sector`` or ``industry`` name depending on ``is_sector``.
        is_sector: True to write ``sector_pe_ratios``, False to write
            ``industry_pe_ratios``.

    Returns:
        Number of entries written, or 0 when no data was supplied, data
        already exists for the trade date, or the update as a whole failed
        and was rolled back.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        # Both identifiers come from these fixed literals, never from input,
        # so interpolating them into the SQL text below is safe.
        table_name = 'sector_pe_ratios' if is_sector else 'industry_pe_ratios'
        name_field = 'sector' if is_sector else 'industry'
        if not pe_data:
            logging.warning(f"No PE ratio data provided for {name_field}s")
            return 0
        trade_date = datetime.strptime(pe_data[0]['date'], '%Y-%m-%d').date()
        cursor.execute(f"""
            SELECT COUNT(*) FROM {table_name}
            WHERE trade_date = %s
        """, (trade_date,))
        if cursor.fetchone()[0] > 0:
            logging.info(f"{table_name} data already exists for {trade_date}")
            return 0

        upsert_sql = f"""
            INSERT INTO {table_name}
            ({name_field}, pe_ratio, exchange, trade_date)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT ({name_field}, exchange, trade_date)
            DO UPDATE SET pe_ratio = EXCLUDED.pe_ratio
        """
        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands alone, so none is needed.
        use_savepoints = not getattr(connection, 'autocommit', False)
        records_affected = 0
        for data in pe_data:
            if use_savepoints:
                cursor.execute("SAVEPOINT pe_ratio_row")
            try:
                cursor.execute(upsert_sql, (
                    data[name_field],
                    float(data['pe']) if data.get('pe') is not None else None,
                    data['exchange'],
                    trade_date,
                ))
                if use_savepoints:
                    cursor.execute("RELEASE SAVEPOINT pe_ratio_row")
                records_affected += 1
            except Exception as e:
                # Undo only this entry and clear the aborted state so the
                # remaining entries can still be written.
                if use_savepoints:
                    cursor.execute("ROLLBACK TO SAVEPOINT pe_ratio_row")
                logging.warning(f"Error processing PE ratio for {data.get(name_field, 'unknown')}: {e}")
        connection.commit()
        return records_affected
    except Exception as e:
        logging.error(f"Error updating PE ratios: {e}")
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()
def update_market_movers(connection, movers_data, mover_type):
    """Upsert market movers of one type for the last business day.

    The trade date is ``get_last_business_day`` applied to today's date. When
    ``market_movers`` already holds rows for that date and ``mover_type``
    nothing is written.

    Each entry is wrapped in its own savepoint because a failed statement
    aborts the surrounding PostgreSQL transaction; without it every later
    entry would fail and the final commit would persist nothing. A failing
    entry is skipped.

    Args:
        connection: Open DB-API connection to PostgreSQL.
        movers_data: Iterable of dicts. ``symbol`` is required; ``name``,
            ``change``, ``price`` and ``changesPercentage`` are optional and
            the numeric ones are stored as NULL when missing or None.
        mover_type: Label stored in the ``mover_type`` column and used in the
            uniqueness key together with symbol and trade date.

    Returns:
        Number of entries written, or 0 when data already exists for the
        trade date and type or the update as a whole failed and was rolled
        back.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())
        cursor.execute("""
            SELECT COUNT(*) FROM market_movers
            WHERE trade_date = %s AND mover_type = %s
        """, (current_date, mover_type))
        if cursor.fetchone()[0] > 0:
            logging.info(f"Market movers data for {mover_type} already exists for {current_date}")
            return 0

        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands alone, so none is needed.
        use_savepoints = not getattr(connection, 'autocommit', False)
        decimal_keys = ('change', 'price', 'changesPercentage')
        records_affected = 0
        for mover in movers_data:
            if use_savepoints:
                cursor.execute("SAVEPOINT market_mover_row")
            try:
                decimals = [
                    float(mover[key]) if mover.get(key) is not None else None
                    for key in decimal_keys
                ]
                cursor.execute("""
                    INSERT INTO market_movers
                    (symbol, name, change, price, changes_percentage, mover_type, trade_date)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (symbol, trade_date, mover_type)
                    DO UPDATE SET
                    name = EXCLUDED.name,
                    change = EXCLUDED.change,
                    price = EXCLUDED.price,
                    changes_percentage = EXCLUDED.changes_percentage
                """, (
                    mover['symbol'],
                    mover.get('name', ''),
                    *decimals,
                    mover_type,
                    current_date,
                ))
                if use_savepoints:
                    cursor.execute("RELEASE SAVEPOINT market_mover_row")
                records_affected += 1
            except Exception as e:
                # Undo only this entry and clear the aborted state so the
                # remaining entries can still be written.
                if use_savepoints:
                    cursor.execute("ROLLBACK TO SAVEPOINT market_mover_row")
                logging.warning(f"Error processing market mover {mover.get('symbol', 'unknown')}: {e}")
        connection.commit()
        return records_affected
    except Exception as e:
        logging.error(f"Error updating market movers: {e}")
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()