import logging
from contextlib import contextmanager
from datetime import datetime, timedelta

import psycopg2
from psycopg2.extras import execute_batch

# Tables that setup_database() is responsible for creating.
_REQUIRED_TABLES = (
    'market_indices',
    'index_quotes',
    'sector_performance',
    'sector_historical',
    'sector_pe_ratios',
    'industry_pe_ratios',
    'market_movers',
)

# Idempotent schema definition: every statement uses IF NOT EXISTS, so it is
# safe to run against a partially created schema.
_SCHEMA_DDL = """
    CREATE TABLE IF NOT EXISTS market_indices (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20) NOT NULL,
        name VARCHAR(255),
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(symbol)
    );

    CREATE TABLE IF NOT EXISTS index_quotes (
        id SERIAL PRIMARY KEY,
        index_id INTEGER REFERENCES market_indices(id),
        price DECIMAL NULL,
        change DECIMAL NULL,
        changes_percentage DECIMAL NULL,
        day_high DECIMAL NULL,
        day_low DECIMAL NULL,
        previous_close DECIMAL NULL,
        volume BIGINT NULL,
        trade_date DATE NOT NULL,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(index_id, trade_date)
    );

    CREATE TABLE IF NOT EXISTS sector_performance (
        id SERIAL PRIMARY KEY,
        sector VARCHAR(100),
        change_percentage DECIMAL NULL,
        trade_date DATE NOT NULL,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(sector, trade_date)
    );

    CREATE TABLE IF NOT EXISTS sector_historical (
        id SERIAL PRIMARY KEY,
        trade_date DATE NOT NULL,
        basic_materials DECIMAL NULL,
        communication_services DECIMAL NULL,
        consumer_cyclical DECIMAL NULL,
        consumer_defensive DECIMAL NULL,
        energy DECIMAL NULL,
        financial_services DECIMAL NULL,
        healthcare DECIMAL NULL,
        industrials DECIMAL NULL,
        real_estate DECIMAL NULL,
        technology DECIMAL NULL,
        utilities DECIMAL NULL,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(trade_date)
    );

    CREATE TABLE IF NOT EXISTS sector_pe_ratios (
        id SERIAL PRIMARY KEY,
        sector VARCHAR(100),
        pe_ratio DECIMAL NULL,
        exchange VARCHAR(20),
        trade_date DATE NOT NULL,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(sector, exchange, trade_date)
    );

    CREATE TABLE IF NOT EXISTS industry_pe_ratios (
        id SERIAL PRIMARY KEY,
        industry VARCHAR(100),
        pe_ratio DECIMAL NULL,
        exchange VARCHAR(20),
        trade_date DATE NOT NULL,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(industry, exchange, trade_date)
    );

    CREATE TABLE IF NOT EXISTS market_movers (
        id SERIAL PRIMARY KEY,
        symbol VARCHAR(20),
        name VARCHAR(255),
        change DECIMAL NULL,
        price DECIMAL NULL,
        changes_percentage DECIMAL NULL,
        mover_type VARCHAR(20),
        trade_date DATE NOT NULL,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(symbol, trade_date, mover_type)
    );

    -- Create indexes for better query performance
    CREATE INDEX IF NOT EXISTS idx_index_quotes_date ON index_quotes(trade_date);
    CREATE INDEX IF NOT EXISTS idx_sector_performance_date ON sector_performance(trade_date);
    CREATE INDEX IF NOT EXISTS idx_sector_historical_date ON sector_historical(trade_date);
    CREATE INDEX IF NOT EXISTS idx_sector_pe_date ON sector_pe_ratios(trade_date);
    CREATE INDEX IF NOT EXISTS idx_industry_pe_date ON industry_pe_ratios(trade_date);
    CREATE INDEX IF NOT EXISTS idx_market_movers_date ON market_movers(trade_date, mover_type);
"""

# Payload keys for sector_historical, listed in the same order as the columns
# of the INSERT statement in update_sector_historical().
_SECTOR_HISTORICAL_KEYS = (
    'basicMaterialsChangesPercentage',
    'communicationServicesChangesPercentage',
    'consumerCyclicalChangesPercentage',
    'consumerDefensiveChangesPercentage',
    'energyChangesPercentage',
    'financialServicesChangesPercentage',
    'healthcareChangesPercentage',
    'industrialsChangesPercentage',
    'realEstateChangesPercentage',
    'technologyChangesPercentage',
    'utilitiesChangesPercentage',
)

_ROW_SAVEPOINT = 'market_row_sp'


def _to_float(value):
    """Return ``float(value)``, or None when the value is None."""
    return None if value is None else float(value)


def _to_int(value):
    """Return ``int(value)``, or None when the value is None."""
    return None if value is None else int(value)


def _parse_percentage(value):
    """Convert a value such as ``'1.23%'`` to a float; None stays None."""
    if value is None:
        return None
    return float(str(value).replace('%', ''))


@contextmanager
def _savepoint(connection, cursor, name=_ROW_SAVEPOINT):
    """Isolate the enclosed statements so a failure only undoes their work.

    PostgreSQL aborts the whole transaction when a statement fails; every
    later statement is then rejected and the final commit turns into a
    rollback. Wrapping each row in a savepoint keeps the rows that were
    already written. The original exception is re-raised after the partial
    work has been undone.
    """
    if getattr(connection, 'autocommit', False):
        # No transaction block is open in autocommit mode: SAVEPOINT would be
        # rejected, and a failed statement cannot poison later ones anyway.
        yield
        return

    cursor.execute(f"SAVEPOINT {name}")
    try:
        yield
    except Exception:
        cursor.execute(f"ROLLBACK TO SAVEPOINT {name}")
        cursor.execute(f"RELEASE SAVEPOINT {name}")
        raise
    cursor.execute(f"RELEASE SAVEPOINT {name}")


def _upsert_each(connection, cursor, rows, upsert_row, describe_row):
    """Apply ``upsert_row`` to every row and return how many succeeded.

    A row that raises is logged as a warning (prefixed with
    ``describe_row(row)``) and skipped; its partial database work is undone
    via a savepoint so the remaining rows can still be written.
    """
    succeeded = 0
    for row in rows:
        try:
            with _savepoint(connection, cursor):
                upsert_row(row)
            succeeded += 1
        except Exception as e:
            logging.warning("Error processing %s: %s", describe_row(row), e)
    return succeeded


def get_last_business_day(current_date):
    """Return the last business day before the given date.

    Monday, Sunday and Saturday all map to the preceding Friday; any other
    day maps to the day before. Market holidays are not taken into account.
    """
    weekday = current_date.weekday()
    if weekday == 0:  # Monday -> go back to Friday
        offset = 3
    elif weekday == 6:  # Sunday -> go back to Friday
        offset = 2
    else:  # Tuesday..Saturday -> previous calendar day
        offset = 1
    return current_date - timedelta(days=offset)


def setup_database(connection):
    """Create the US Markets tables and indexes if any of them is missing.

    All required tables are checked (not just ``market_indices``), so a
    partially created schema is completed instead of being reported as
    already present. The DDL is only executed when something is missing.

    Returns:
        True on success, False if an error occurred (the transaction is
        rolled back and the error is logged).
    """
    cursor = None
    try:
        cursor = connection.cursor()

        cursor.execute(
            """
            SELECT COUNT(*)
            FROM information_schema.tables
            WHERE table_schema = 'public'
              AND table_name IN %s
            """,
            (_REQUIRED_TABLES,),
        )
        existing_tables = cursor.fetchone()[0]

        if existing_tables < len(_REQUIRED_TABLES):
            # Every statement is IF NOT EXISTS, so tables that are already
            # present are left untouched.
            cursor.execute(_SCHEMA_DDL)
            logging.info("Database tables created successfully")
        else:
            logging.info("Database tables already exist")

        connection.commit()
        return True
    except Exception as e:
        logging.error("Database setup error: %s", e)
        if connection:
            connection.rollback()
        return False
    finally:
        if cursor:
            cursor.close()


def update_market_indices(connection, indices_data):
    """Store index quotes for the last business day.

    Nothing is written when quotes for that date already exist. Each index
    is written atomically (its ``market_indices`` upsert and its quote); an
    index that fails is logged and skipped without affecting the others.

    Args:
        connection: Open database connection.
        indices_data: Iterable of dicts with ``symbol`` and optional
            ``name``, ``price``, ``change``, ``changesPercentage``,
            ``dayHigh``, ``dayLow``, ``previousClose``, ``volume``.

    Returns:
        Number of indices written; 0 if data already existed or on error.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())

        cursor.execute(
            """
            SELECT COUNT(*)
            FROM index_quotes iq
            JOIN market_indices mi ON iq.index_id = mi.id
            WHERE iq.trade_date = %s
            """,
            (current_date,),
        )
        if cursor.fetchone()[0] > 0:
            logging.info("Market indices data already exists for %s", current_date)
            # End the read-only transaction so the connection is not left
            # idle in transaction.
            connection.commit()
            return 0

        def upsert_index(index):
            cursor.execute(
                """
                INSERT INTO market_indices (symbol, name)
                VALUES (%s, %s)
                ON CONFLICT (symbol) DO UPDATE
                SET name = EXCLUDED.name
                RETURNING id
                """,
                (index['symbol'], index.get('name', '')),
            )
            index_id = cursor.fetchone()[0]

            cursor.execute(
                """
                INSERT INTO index_quotes
                    (index_id, price, change, changes_percentage,
                     day_high, day_low, previous_close, volume, trade_date)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (index_id, trade_date) DO UPDATE
                SET price = EXCLUDED.price,
                    change = EXCLUDED.change,
                    changes_percentage = EXCLUDED.changes_percentage,
                    day_high = EXCLUDED.day_high,
                    day_low = EXCLUDED.day_low,
                    previous_close = EXCLUDED.previous_close,
                    volume = EXCLUDED.volume
                """,
                (
                    index_id,
                    _to_float(index.get('price')),
                    _to_float(index.get('change')),
                    _to_float(index.get('changesPercentage')),
                    _to_float(index.get('dayHigh')),
                    _to_float(index.get('dayLow')),
                    _to_float(index.get('previousClose')),
                    _to_int(index.get('volume')),
                    current_date,
                ),
            )

        records_affected = _upsert_each(
            connection,
            cursor,
            indices_data,
            upsert_index,
            lambda index: f"index {index.get('symbol', 'unknown')}",
        )

        connection.commit()
        return records_affected
    except Exception as e:
        logging.error("Error updating market indices: %s", e)
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()


def update_sector_performance(connection, sector_data):
    """Store sector performance for the last business day.

    Nothing is written when rows for that date already exist. A sector that
    fails for any reason (bad number, missing key, database error) is logged
    and skipped, consistent with the other update functions.

    Args:
        connection: Open database connection.
        sector_data: Iterable of dicts with ``sector`` and
            ``changesPercentage`` (a number or a string such as ``'1.2%'``).

    Returns:
        Number of sectors written; 0 if data already existed or on error.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())

        cursor.execute(
            """
            SELECT COUNT(*)
            FROM sector_performance
            WHERE trade_date = %s
            """,
            (current_date,),
        )
        if cursor.fetchone()[0] > 0:
            logging.info("Sector performance data already exists for %s", current_date)
            # End the read-only transaction before returning.
            connection.commit()
            return 0

        def upsert_sector(sector):
            change_percentage = _parse_percentage(sector.get('changesPercentage'))
            cursor.execute(
                """
                INSERT INTO sector_performance
                    (sector, change_percentage, trade_date)
                VALUES (%s, %s, %s)
                ON CONFLICT (sector, trade_date) DO UPDATE
                SET change_percentage = EXCLUDED.change_percentage
                """,
                (sector['sector'], change_percentage, current_date),
            )

        records_affected = _upsert_each(
            connection,
            cursor,
            sector_data,
            upsert_sector,
            lambda sector: f"sector {sector.get('sector', 'unknown')}",
        )

        connection.commit()
        return records_affected
    except Exception as e:
        logging.error("Error updating sector performance: %s", e)
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()


def update_sector_historical(connection, historical_data):
    """Upsert historical per-sector percentage changes, one row per date.

    Unlike the other update functions there is no "already exists" check:
    existing dates are overwritten. A row that fails is logged and skipped.

    Args:
        connection: Open database connection.
        historical_data: Iterable of dicts with ``date`` (``YYYY-MM-DD``)
            and the ``<sector>ChangesPercentage`` keys.

    Returns:
        Number of dates written; 0 on error.
    """
    cursor = None
    try:
        cursor = connection.cursor()

        def upsert_day(data):
            trade_date = datetime.strptime(data['date'], '%Y-%m-%d').date()
            sector_values = tuple(
                _to_float(data.get(key)) for key in _SECTOR_HISTORICAL_KEYS
            )
            cursor.execute(
                """
                INSERT INTO sector_historical (
                    trade_date, basic_materials, communication_services,
                    consumer_cyclical, consumer_defensive, energy,
                    financial_services, healthcare, industrials,
                    real_estate, technology, utilities
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (trade_date) DO UPDATE
                SET basic_materials = EXCLUDED.basic_materials,
                    communication_services = EXCLUDED.communication_services,
                    consumer_cyclical = EXCLUDED.consumer_cyclical,
                    consumer_defensive = EXCLUDED.consumer_defensive,
                    energy = EXCLUDED.energy,
                    financial_services = EXCLUDED.financial_services,
                    healthcare = EXCLUDED.healthcare,
                    industrials = EXCLUDED.industrials,
                    real_estate = EXCLUDED.real_estate,
                    technology = EXCLUDED.technology,
                    utilities = EXCLUDED.utilities
                """,
                (trade_date,) + sector_values,
            )

        records_affected = _upsert_each(
            connection,
            cursor,
            historical_data,
            upsert_day,
            lambda data: f"historical data for date {data.get('date', 'unknown')}",
        )

        connection.commit()
        return records_affected
    except Exception as e:
        logging.error("Error updating sector historical data: %s", e)
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()


def update_pe_ratios(connection, pe_data, is_sector=True):
    """Store sector or industry PE ratios.

    The trade date is taken from the first element of ``pe_data`` and used
    for every row. Nothing is written when rows for that date already exist.
    A row that fails is logged and skipped.

    Args:
        connection: Open database connection.
        pe_data: Sequence of dicts with ``date`` (``YYYY-MM-DD``, read from
            the first element only), ``exchange``, ``pe`` and either
            ``sector`` or ``industry``.
        is_sector: True to write ``sector_pe_ratios``, False to write
            ``industry_pe_ratios``.

    Returns:
        Number of rows written; 0 if there was no input, data already
        existed, or on error.
    """
    cursor = None
    try:
        cursor = connection.cursor()

        # Both identifiers come from this two-way switch, never from caller
        # data, so interpolating them into the SQL text is safe.
        table_name = 'sector_pe_ratios' if is_sector else 'industry_pe_ratios'
        name_field = 'sector' if is_sector else 'industry'

        if not pe_data:
            logging.warning("No PE ratio data provided for %ss", name_field)
            return 0

        trade_date = datetime.strptime(pe_data[0]['date'], '%Y-%m-%d').date()

        cursor.execute(
            f"""
            SELECT COUNT(*)
            FROM {table_name}
            WHERE trade_date = %s
            """,
            (trade_date,),
        )
        if cursor.fetchone()[0] > 0:
            logging.info("%s data already exists for %s", table_name, trade_date)
            # End the read-only transaction before returning.
            connection.commit()
            return 0

        def upsert_ratio(data):
            cursor.execute(
                f"""
                INSERT INTO {table_name}
                    ({name_field}, pe_ratio, exchange, trade_date)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT ({name_field}, exchange, trade_date) DO UPDATE
                SET pe_ratio = EXCLUDED.pe_ratio
                """,
                (
                    data[name_field],
                    _to_float(data.get('pe')),
                    data['exchange'],
                    trade_date,
                ),
            )

        records_affected = _upsert_each(
            connection,
            cursor,
            pe_data,
            upsert_ratio,
            lambda data: f"PE ratio for {data.get(name_field, 'unknown')}",
        )

        connection.commit()
        return records_affected
    except Exception as e:
        logging.error("Error updating PE ratios: %s", e)
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()


def update_market_movers(connection, movers_data, mover_type):
    """Store market movers of one type for the last business day.

    Nothing is written when rows for that date and ``mover_type`` already
    exist. A mover that fails is logged and skipped.

    Args:
        connection: Open database connection.
        movers_data: Iterable of dicts with ``symbol`` and optional
            ``name``, ``change``, ``price``, ``changesPercentage``.
        mover_type: Label stored in the ``mover_type`` column.

    Returns:
        Number of movers written; 0 if data already existed or on error.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        current_date = get_last_business_day(datetime.now().date())

        cursor.execute(
            """
            SELECT COUNT(*)
            FROM market_movers
            WHERE trade_date = %s AND mover_type = %s
            """,
            (current_date, mover_type),
        )
        if cursor.fetchone()[0] > 0:
            logging.info(
                "Market movers data for %s already exists for %s",
                mover_type,
                current_date,
            )
            # End the read-only transaction before returning.
            connection.commit()
            return 0

        def upsert_mover(mover):
            cursor.execute(
                """
                INSERT INTO market_movers
                    (symbol, name, change, price, changes_percentage,
                     mover_type, trade_date)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (symbol, trade_date, mover_type) DO UPDATE
                SET name = EXCLUDED.name,
                    change = EXCLUDED.change,
                    price = EXCLUDED.price,
                    changes_percentage = EXCLUDED.changes_percentage
                """,
                (
                    mover['symbol'],
                    mover.get('name', ''),
                    _to_float(mover.get('change')),
                    _to_float(mover.get('price')),
                    _to_float(mover.get('changesPercentage')),
                    mover_type,
                    current_date,
                ),
            )

        records_affected = _upsert_each(
            connection,
            cursor,
            movers_data,
            upsert_mover,
            lambda mover: f"market mover {mover.get('symbol', 'unknown')}",
        )

        connection.commit()
        return records_affected
    except Exception as e:
        logging.error("Error updating market movers: %s", e)
        if connection:
            connection.rollback()
        return 0
    finally:
        if cursor:
            cursor.close()