456 lines
18 KiB
Python
456 lines
18 KiB
Python
import logging
|
|
from datetime import datetime, timedelta
|
|
import psycopg2
|
|
from psycopg2.extras import execute_batch
|
|
|
|
def get_last_business_day(current_date):
    """Return the weekday that immediately precedes ``current_date``.

    Weekends are skipped: Saturday, Sunday and Monday all map back to the
    preceding Friday, and every other day maps to the day before it.
    Market holidays are not considered.

    Args:
        current_date: A ``date`` (or ``datetime``) to step back from.

    Returns:
        ``current_date`` minus one, two or three days.
    """
    # weekday(): Monday == 0 ... Sunday == 6. Only Monday (back to Friday)
    # and Sunday need more than a single day; Saturday is Friday + 1.
    days_back_by_weekday = {0: 3, 6: 2}
    step = days_back_by_weekday.get(current_date.weekday(), 1)
    return current_date - timedelta(days=step)
|
|
|
|
def setup_database(connection):
    """Create the US Markets tables and indexes if they do not already exist.

    Every DDL statement uses ``IF NOT EXISTS``, so the whole script is run on
    each call. This also creates any table or index that is missing while
    others already exist, e.g. after a partial setup or when a new table is
    added to this script later. The ``market_indices`` lookup only decides
    which message is logged.

    Args:
        connection: Open DB-API (psycopg2) connection. It is committed on
            success and rolled back on failure.

    Returns:
        True when the schema is in place, False if an error occurred (the
        error is logged).
    """
    cursor = None
    try:
        cursor = connection.cursor()

        # Informational only: table creation no longer depends on this result.
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                AND table_name = 'market_indices'
            );
        """)
        tables_existed = cursor.fetchone()[0]

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS market_indices (
                id SERIAL PRIMARY KEY,
                symbol VARCHAR(20) NOT NULL,
                name VARCHAR(255),
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(symbol)
            );

            CREATE TABLE IF NOT EXISTS index_quotes (
                id SERIAL PRIMARY KEY,
                index_id INTEGER REFERENCES market_indices(id),
                price DECIMAL NULL,
                change DECIMAL NULL,
                changes_percentage DECIMAL NULL,
                day_high DECIMAL NULL,
                day_low DECIMAL NULL,
                previous_close DECIMAL NULL,
                volume BIGINT NULL,
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(index_id, trade_date)
            );

            CREATE TABLE IF NOT EXISTS sector_performance (
                id SERIAL PRIMARY KEY,
                sector VARCHAR(100),
                change_percentage DECIMAL NULL,
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(sector, trade_date)
            );

            CREATE TABLE IF NOT EXISTS sector_historical (
                id SERIAL PRIMARY KEY,
                trade_date DATE NOT NULL,
                basic_materials DECIMAL NULL,
                communication_services DECIMAL NULL,
                consumer_cyclical DECIMAL NULL,
                consumer_defensive DECIMAL NULL,
                energy DECIMAL NULL,
                financial_services DECIMAL NULL,
                healthcare DECIMAL NULL,
                industrials DECIMAL NULL,
                real_estate DECIMAL NULL,
                technology DECIMAL NULL,
                utilities DECIMAL NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(trade_date)
            );

            CREATE TABLE IF NOT EXISTS sector_pe_ratios (
                id SERIAL PRIMARY KEY,
                sector VARCHAR(100),
                pe_ratio DECIMAL NULL,
                exchange VARCHAR(20),
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(sector, exchange, trade_date)
            );

            CREATE TABLE IF NOT EXISTS industry_pe_ratios (
                id SERIAL PRIMARY KEY,
                industry VARCHAR(100),
                pe_ratio DECIMAL NULL,
                exchange VARCHAR(20),
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(industry, exchange, trade_date)
            );

            CREATE TABLE IF NOT EXISTS market_movers (
                id SERIAL PRIMARY KEY,
                symbol VARCHAR(20),
                name VARCHAR(255),
                change DECIMAL NULL,
                price DECIMAL NULL,
                changes_percentage DECIMAL NULL,
                mover_type VARCHAR(20),
                trade_date DATE NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(symbol, trade_date, mover_type)
            );

            -- Create indexes for better query performance
            CREATE INDEX IF NOT EXISTS idx_index_quotes_date ON index_quotes(trade_date);
            CREATE INDEX IF NOT EXISTS idx_sector_performance_date ON sector_performance(trade_date);
            CREATE INDEX IF NOT EXISTS idx_sector_historical_date ON sector_historical(trade_date);
            CREATE INDEX IF NOT EXISTS idx_sector_pe_date ON sector_pe_ratios(trade_date);
            CREATE INDEX IF NOT EXISTS idx_industry_pe_date ON industry_pe_ratios(trade_date);
            CREATE INDEX IF NOT EXISTS idx_market_movers_date ON market_movers(trade_date, mover_type);
        """)

        if tables_existed:
            logging.info("Database tables already exist")
        else:
            logging.info("Database tables created successfully")

        connection.commit()
        return True

    except Exception as e:
        logging.error(f"Database setup error: {e}")
        if connection:
            connection.rollback()
        return False

    finally:
        if cursor:
            cursor.close()
|
|
|
|
def update_market_indices(connection, indices_data):
    """Upsert market index metadata and quotes for the last business day.

    The trade date is the business day before today, as returned by
    ``get_last_business_day``. If ``index_quotes`` already has any rows for
    that date, nothing is written. Otherwise each index is upserted into
    ``market_indices`` and its quote into ``index_quotes``.

    Each record is processed inside its own savepoint unless the connection
    is in autocommit mode. In PostgreSQL a failed statement aborts the
    enclosing transaction. Without the savepoint, one bad record would make
    every later INSERT fail and the final commit would discard the batch.
    Rolling back to the savepoint skips just that record.

    Args:
        connection: Open psycopg2 connection. Committed on success and rolled
            back if an unexpected error aborts the update.
        indices_data: Iterable of dicts. ``'symbol'`` is required. The keys
            ``'name'``, ``'price'``, ``'change'``, ``'changesPercentage'``,
            ``'dayHigh'``, ``'dayLow'``, ``'previousClose'`` and ``'volume'``
            are optional; missing or None numeric values are stored as NULL.

    Returns:
        Number of index records written, or 0 if data for the date already
        existed or the update failed.
    """
    def to_number(record, key, cast=float):
        # Convert an optional numeric field; absent/None becomes SQL NULL.
        value = record.get(key)
        return None if value is None else cast(value)

    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())

        cursor.execute("""
            SELECT COUNT(*) FROM index_quotes iq
            JOIN market_indices mi ON iq.index_id = mi.id
            WHERE iq.trade_date = %s
        """, (current_date,))
        if cursor.fetchone()[0] > 0:
            logging.info(f"Market indices data already exists for {current_date}")
            return 0

        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands on its own.
        use_savepoint = not getattr(connection, "autocommit", False)
        records_affected = 0

        for index in indices_data:
            if use_savepoint:
                cursor.execute("SAVEPOINT index_row")
            try:
                cursor.execute("""
                    INSERT INTO market_indices (symbol, name)
                    VALUES (%s, %s)
                    ON CONFLICT (symbol)
                    DO UPDATE SET name = EXCLUDED.name
                    RETURNING id
                """, (index['symbol'], index.get('name', '')))
                index_id = cursor.fetchone()[0]

                cursor.execute("""
                    INSERT INTO index_quotes
                        (index_id, price, change, changes_percentage, day_high, day_low,
                         previous_close, volume, trade_date)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (index_id, trade_date)
                    DO UPDATE SET
                        price = EXCLUDED.price,
                        change = EXCLUDED.change,
                        changes_percentage = EXCLUDED.changes_percentage,
                        day_high = EXCLUDED.day_high,
                        day_low = EXCLUDED.day_low,
                        previous_close = EXCLUDED.previous_close,
                        volume = EXCLUDED.volume
                """, (
                    index_id,
                    to_number(index, 'price'),
                    to_number(index, 'change'),
                    to_number(index, 'changesPercentage'),
                    to_number(index, 'dayHigh'),
                    to_number(index, 'dayLow'),
                    to_number(index, 'previousClose'),
                    to_number(index, 'volume', int),
                    current_date,
                ))
            except Exception as e:
                # Undo both upserts for this record (if any ran) and move on.
                if use_savepoint:
                    cursor.execute("ROLLBACK TO SAVEPOINT index_row")
                logging.warning(f"Error processing index {index.get('symbol', 'unknown')}: {e}")
                continue

            if use_savepoint:
                cursor.execute("RELEASE SAVEPOINT index_row")
            records_affected += 1

        connection.commit()
        return records_affected

    except Exception as e:
        logging.error(f"Error updating market indices: {e}")
        if connection:
            connection.rollback()
        return 0

    finally:
        if cursor:
            cursor.close()
|
|
|
|
def update_sector_performance(connection, sector_data):
    """Upsert per-sector percentage changes for the last business day.

    The trade date is the business day before today, as returned by
    ``get_last_business_day``. If ``sector_performance`` already has rows for
    that date, nothing is written.

    Each record runs in its own savepoint unless the connection is in
    autocommit mode. This applies to a missing ``'sector'`` key, an
    unparsable percentage, or a database error: the record is rolled back,
    logged and skipped instead of discarding the whole batch, matching the
    other ``update_*`` loaders.

    Args:
        connection: Open psycopg2 connection. Committed on success and rolled
            back if an unexpected error aborts the update.
        sector_data: Iterable of dicts with a required ``'sector'`` key and an
            optional ``'changesPercentage'`` value. The value may be numeric
            or a string; any ``'%'`` characters are stripped before parsing.

    Returns:
        Number of sector records written, or 0 if data for the date already
        existed or the update failed.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())

        cursor.execute("""
            SELECT COUNT(*) FROM sector_performance
            WHERE trade_date = %s
        """, (current_date,))
        if cursor.fetchone()[0] > 0:
            logging.info(f"Sector performance data already exists for {current_date}")
            return 0

        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands on its own.
        use_savepoint = not getattr(connection, "autocommit", False)
        records_affected = 0

        for sector in sector_data:
            if use_savepoint:
                cursor.execute("SAVEPOINT sector_row")
            try:
                raw_change = sector.get('changesPercentage')
                change_percentage = (
                    None if raw_change is None
                    else float(str(raw_change).replace('%', ''))
                )

                cursor.execute("""
                    INSERT INTO sector_performance
                        (sector, change_percentage, trade_date)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (sector, trade_date)
                    DO UPDATE SET change_percentage = EXCLUDED.change_percentage
                """, (
                    sector['sector'],
                    change_percentage,
                    current_date,
                ))
            except Exception as e:
                if use_savepoint:
                    cursor.execute("ROLLBACK TO SAVEPOINT sector_row")
                logging.warning(f"Error processing sector {sector.get('sector', 'unknown')}: {e}")
                continue

            if use_savepoint:
                cursor.execute("RELEASE SAVEPOINT sector_row")
            records_affected += 1

        connection.commit()
        return records_affected

    except Exception as e:
        logging.error(f"Error updating sector performance: {e}")
        if connection:
            connection.rollback()
        return 0

    finally:
        if cursor:
            cursor.close()
|
|
|
|
def update_sector_historical(connection, historical_data):
    """Upsert one row of historical sector percentage changes per date.

    Unlike the other loaders, there is no "already exists" check: every record
    is upserted on ``trade_date``, so re-running overwrites earlier values.

    Each record runs in its own savepoint unless the connection is in
    autocommit mode. In PostgreSQL a failed statement aborts the transaction.
    Without the savepoint, one bad record would make every later INSERT fail
    and the final commit would discard the batch. Rolling back to the
    savepoint skips just that record.

    Args:
        connection: Open psycopg2 connection. Committed on success and rolled
            back if an unexpected error aborts the update.
        historical_data: Iterable of dicts. Each has a required ``'date'`` in
            ``YYYY-MM-DD`` format and optional ``'<sector>ChangesPercentage'``
            values; missing or None values are stored as NULL.

    Returns:
        Number of dates written, or 0 if the update failed.
    """
    # Source keys, in the same order as the sector columns of the INSERT below.
    sector_keys = (
        'basicMaterialsChangesPercentage',
        'communicationServicesChangesPercentage',
        'consumerCyclicalChangesPercentage',
        'consumerDefensiveChangesPercentage',
        'energyChangesPercentage',
        'financialServicesChangesPercentage',
        'healthcareChangesPercentage',
        'industrialsChangesPercentage',
        'realEstateChangesPercentage',
        'technologyChangesPercentage',
        'utilitiesChangesPercentage',
    )

    cursor = None
    try:
        cursor = connection.cursor()
        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands on its own.
        use_savepoint = not getattr(connection, "autocommit", False)
        records_affected = 0

        for data in historical_data:
            if use_savepoint:
                cursor.execute("SAVEPOINT sector_hist_row")
            try:
                trade_date = datetime.strptime(data['date'], '%Y-%m-%d').date()
                sector_values = tuple(
                    None if data.get(key) is None else float(data.get(key))
                    for key in sector_keys
                )

                cursor.execute("""
                    INSERT INTO sector_historical (
                        trade_date, basic_materials, communication_services,
                        consumer_cyclical, consumer_defensive, energy,
                        financial_services, healthcare, industrials,
                        real_estate, technology, utilities
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (trade_date)
                    DO UPDATE SET
                        basic_materials = EXCLUDED.basic_materials,
                        communication_services = EXCLUDED.communication_services,
                        consumer_cyclical = EXCLUDED.consumer_cyclical,
                        consumer_defensive = EXCLUDED.consumer_defensive,
                        energy = EXCLUDED.energy,
                        financial_services = EXCLUDED.financial_services,
                        healthcare = EXCLUDED.healthcare,
                        industrials = EXCLUDED.industrials,
                        real_estate = EXCLUDED.real_estate,
                        technology = EXCLUDED.technology,
                        utilities = EXCLUDED.utilities
                """, (trade_date, *sector_values))
            except Exception as e:
                if use_savepoint:
                    cursor.execute("ROLLBACK TO SAVEPOINT sector_hist_row")
                logging.warning(f"Error processing historical data for date {data.get('date', 'unknown')}: {e}")
                continue

            if use_savepoint:
                cursor.execute("RELEASE SAVEPOINT sector_hist_row")
            records_affected += 1

        connection.commit()
        return records_affected

    except Exception as e:
        logging.error(f"Error updating sector historical data: {e}")
        if connection:
            connection.rollback()
        return 0

    finally:
        if cursor:
            cursor.close()
|
|
|
|
def update_pe_ratios(connection, pe_data, is_sector=True):
    """Upsert sector or industry P/E ratios.

    All records are stored under the date of the FIRST record in
    ``pe_data``. If the target table already has rows for that date, nothing
    is written.

    Each record runs in its own savepoint unless the connection is in
    autocommit mode. In PostgreSQL a failed statement aborts the transaction.
    Without the savepoint, one bad record would make every later INSERT fail
    and the final commit would discard the batch. Rolling back to the
    savepoint skips just that record.

    Args:
        connection: Open psycopg2 connection. Committed on success and rolled
            back if an unexpected error aborts the update.
        pe_data: Sequence of dicts. Each has ``'sector'`` (or ``'industry'``
            when ``is_sector`` is False) and ``'exchange'``, plus an optional
            ``'pe'``. The first record must also carry ``'date'`` as
            ``YYYY-MM-DD``.
        is_sector: Write to ``sector_pe_ratios`` when True, otherwise to
            ``industry_pe_ratios``.

    Returns:
        Number of records written, or 0 if there was no input, data for the
        date already existed, or the update failed.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        # Both identifiers come from fixed literals, never from the input, so
        # interpolating them into the SQL text below is safe.
        table_name = 'sector_pe_ratios' if is_sector else 'industry_pe_ratios'
        name_field = 'sector' if is_sector else 'industry'

        if not pe_data:
            logging.warning(f"No PE ratio data provided for {name_field}s")
            return 0

        trade_date = datetime.strptime(pe_data[0]['date'], '%Y-%m-%d').date()

        cursor.execute(f"""
            SELECT COUNT(*) FROM {table_name}
            WHERE trade_date = %s
        """, (trade_date,))
        if cursor.fetchone()[0] > 0:
            logging.info(f"{table_name} data already exists for {trade_date}")
            return 0

        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands on its own.
        use_savepoint = not getattr(connection, "autocommit", False)
        records_affected = 0

        for data in pe_data:
            if use_savepoint:
                cursor.execute("SAVEPOINT pe_row")
            try:
                cursor.execute(f"""
                    INSERT INTO {table_name}
                        ({name_field}, pe_ratio, exchange, trade_date)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT ({name_field}, exchange, trade_date)
                    DO UPDATE SET pe_ratio = EXCLUDED.pe_ratio
                """, (
                    data[name_field],
                    float(data['pe']) if data.get('pe') is not None else None,
                    data['exchange'],
                    trade_date,
                ))
            except Exception as e:
                if use_savepoint:
                    cursor.execute("ROLLBACK TO SAVEPOINT pe_row")
                logging.warning(f"Error processing PE ratio for {data.get(name_field, 'unknown')}: {e}")
                continue

            if use_savepoint:
                cursor.execute("RELEASE SAVEPOINT pe_row")
            records_affected += 1

        connection.commit()
        return records_affected

    except Exception as e:
        logging.error(f"Error updating PE ratios: {e}")
        if connection:
            connection.rollback()
        return 0

    finally:
        if cursor:
            cursor.close()
|
|
|
|
def update_market_movers(connection, movers_data, mover_type):
    """Upsert market movers of one ``mover_type`` for the last business day.

    The trade date is the business day before today, as returned by
    ``get_last_business_day``. If ``market_movers`` already has rows for that
    date AND ``mover_type``, nothing is written.

    Each record runs in its own savepoint unless the connection is in
    autocommit mode. In PostgreSQL a failed statement aborts the transaction.
    Without the savepoint, one bad record would make every later INSERT fail
    and the final commit would discard the batch. Rolling back to the
    savepoint skips just that record.

    Args:
        connection: Open psycopg2 connection. Committed on success and rolled
            back if an unexpected error aborts the update.
        movers_data: Iterable of dicts. ``'symbol'`` is required. The keys
            ``'name'``, ``'change'``, ``'price'`` and ``'changesPercentage'``
            are optional; missing or None numeric values are stored as NULL.
        mover_type: Label stored in the ``mover_type`` column; the
            already-exists check is scoped to it.

    Returns:
        Number of movers written, or 0 if data already existed for the date
        and type, or the update failed.
    """
    def to_number(record, key):
        # Convert an optional numeric field; absent/None becomes SQL NULL.
        value = record.get(key)
        return None if value is None else float(value)

    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())

        cursor.execute("""
            SELECT COUNT(*) FROM market_movers
            WHERE trade_date = %s AND mover_type = %s
        """, (current_date, mover_type))
        if cursor.fetchone()[0] > 0:
            logging.info(f"Market movers data for {mover_type} already exists for {current_date}")
            return 0

        # SAVEPOINT is only valid inside a transaction block; in autocommit
        # mode each statement already stands on its own.
        use_savepoint = not getattr(connection, "autocommit", False)
        records_affected = 0

        for mover in movers_data:
            if use_savepoint:
                cursor.execute("SAVEPOINT mover_row")
            try:
                cursor.execute("""
                    INSERT INTO market_movers
                        (symbol, name, change, price, changes_percentage, mover_type, trade_date)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (symbol, trade_date, mover_type)
                    DO UPDATE SET
                        name = EXCLUDED.name,
                        change = EXCLUDED.change,
                        price = EXCLUDED.price,
                        changes_percentage = EXCLUDED.changes_percentage
                """, (
                    mover['symbol'],
                    mover.get('name', ''),
                    to_number(mover, 'change'),
                    to_number(mover, 'price'),
                    to_number(mover, 'changesPercentage'),
                    mover_type,
                    current_date,
                ))
            except Exception as e:
                if use_savepoint:
                    cursor.execute("ROLLBACK TO SAVEPOINT mover_row")
                logging.warning(f"Error processing market mover {mover.get('symbol', 'unknown')}: {e}")
                continue

            if use_savepoint:
                cursor.execute("RELEASE SAVEPOINT mover_row")
            records_affected += 1

        connection.commit()
        return records_affected

    except Exception as e:
        logging.error(f"Error updating market movers: {e}")
        if connection:
            connection.rollback()
        return 0

    finally:
        if cursor:
            cursor.close()
|