"""Command-line entry point that updates US Markets data in the database."""

import argparse
import logging
import os
import sys
from datetime import datetime, timedelta

import psycopg2
from dotenv import load_dotenv

from utils.usmarkets.utils import (
    setup_database,
    update_market_indices,
    update_sector_performance,
    update_sector_historical,
    update_pe_ratios,
    update_market_movers,
    get_last_business_day
)
from utils.usmarkets.collector import USMarketsClient

# Every component this script knows how to update, in the order they are
# processed when 'all' is requested.
ALL_COMPONENTS = ('indices', 'sectors', 'historical', 'pe_ratios', 'movers')

# Row-count query per component, used to decide whether data for a given
# trade date is already stored. Components without an entry (currently
# 'historical') are always updated.
_EXISTENCE_QUERIES = {
    'indices': """
        SELECT COUNT(*)
        FROM index_quotes iq
        JOIN market_indices mi ON iq.index_id = mi.id
        WHERE iq.trade_date = %s
    """,
    'sectors': """
        SELECT COUNT(*)
        FROM sector_performance
        WHERE trade_date = %s
    """,
    'pe_ratios': """
        SELECT COUNT(*)
        FROM sector_pe_ratios
        WHERE trade_date = %s
    """,
    'movers': """
        SELECT COUNT(*)
        FROM market_movers
        WHERE trade_date = %s
    """,
}


def setup_logging():
    """Setup logging configuration.

    Creates ``logs/USMarkets`` if needed and configures the root logger at
    INFO level to write both to a timestamped file in that directory and to
    stdout.
    """
    log_dir = 'logs/USMarkets'
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(log_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = os.path.join(log_dir, f'market_update_{timestamp}.log')

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(sys.stdout)
        ]
    )


def parse_arguments():
    """Parse command line arguments.

    Returns:
        argparse.Namespace with ``components`` (list of component names,
        default ``['all']``) and ``force`` (bool, default False).
    """
    parser = argparse.ArgumentParser(description='Update US Markets data')
    parser.add_argument(
        '--components',
        nargs='+',
        choices=[*ALL_COMPONENTS, 'all'],
        default=['all'],
        help='Specify which components to update'
    )
    parser.add_argument(
        '--force',
        action='store_true',
        default=False,
        help='Force update even if data exists for current date'
    )
    return parser.parse_args()


def should_update_component(connection, component, date, force=False):
    """Check if component should be updated.

    Args:
        connection: Open database connection (a psycopg2 connection in this
            script).
        component: Component name, e.g. 'indices' or 'movers'.
        date: Trade date to look for in the component's table.
        force: When True, skip the check entirely.

    Returns:
        True when ``force`` is set, when the component has no existence
        query, when no rows exist for ``date``, or when the check itself
        fails; False only when rows for ``date`` are already present.
    """
    if force:
        return True

    query = _EXISTENCE_QUERIES.get(component)
    if query is None:
        return True

    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (date,))
            return cursor.fetchone()[0] == 0
    except Exception as e:
        logging.error(f"Error checking component status: {e}")
        # A failed statement leaves the transaction aborted; without a
        # rollback the update we are about to allow would fail on its first
        # statement.
        try:
            connection.rollback()
        except Exception as rollback_error:
            logging.error(f"Rollback after failed status check failed: {rollback_error}")
        return True


def update_market_data(connection, client, component, force=False):
    """Update specific market data component.

    Fetches data for ``component`` from ``client`` and hands it to the
    matching ``update_*`` helper. The update is skipped when
    ``should_update_component`` reports that data already exists for the
    date returned by ``get_last_business_day``.

    Args:
        connection: Open database connection passed through to the helpers.
        client: API client providing the ``get_*`` fetch methods.
        component: One of the names in ``ALL_COMPONENTS``; any other value
            is a no-op that returns 0.
        force: When True, update even if data already exists.

    Returns:
        Sum of the values returned by the ``update_*`` helpers (treated as
        record counts), or 0 when the component is skipped, no data is
        received, or an exception occurs. On exception the error is logged
        and the connection is rolled back.
    """
    # NOTE(review): nothing in this module commits; presumably the update_*
    # helpers commit their own work -- confirm.
    try:
        current_date = datetime.now().date()
        last_business_day = get_last_business_day(current_date)
        records_affected = 0

        if not should_update_component(connection, component, last_business_day, force):
            logging.info(f"Skipping {component} - data already exists for {last_business_day}")
            return 0

        if component == 'indices':
            logging.info("Fetching market indices...")
            data = client.get_market_indices()
            if data:
                records_affected = update_market_indices(connection, data)
                logging.info(f"Updated {records_affected} market indices")
            else:
                logging.warning("No market indices data received")

        elif component == 'sectors':
            logging.info("Fetching sector performance...")
            data = client.get_sector_performance()
            if data:
                records_affected = update_sector_performance(connection, data)
                logging.info(f"Updated {records_affected} sector performance records")
            else:
                logging.warning("No sector performance data received")

        elif component == 'historical':
            logging.info("Fetching historical sector performance...")
            # Seven-day window ending on the last business day.
            end_date = last_business_day
            start_date = end_date - timedelta(days=7)
            logging.info(f"Fetching historical data from {start_date} to {end_date}")
            data = client.get_sector_historical(
                start_date.strftime('%Y-%m-%d'),
                end_date.strftime('%Y-%m-%d')
            )
            if data:
                records_affected = update_sector_historical(connection, data)
                logging.info(f"Updated {records_affected} historical sector records")
            else:
                logging.warning("No historical sector data received")

        elif component == 'pe_ratios':
            logging.info("Fetching PE ratios...")
            date_str = last_business_day.strftime('%Y-%m-%d')
            logging.info(f"Using date: {date_str} for PE ratios")

            sector_data = client.get_sector_pe_ratios(date_str)
            if sector_data:
                sector_records = update_pe_ratios(connection, sector_data, is_sector=True)
                records_affected += sector_records
                logging.info(f"Updated {sector_records} sector PE ratios")
            else:
                logging.warning(f"No sector PE ratio data available for {date_str}")

            industry_data = client.get_industry_pe_ratios(date_str)
            if industry_data:
                industry_records = update_pe_ratios(connection, industry_data, is_sector=False)
                records_affected += industry_records
                logging.info(f"Updated {industry_records} industry PE ratios")
            else:
                logging.warning(f"No industry PE ratio data available for {date_str}")

        elif component == 'movers':
            logging.info("Fetching market movers...")

            gainers_data = client.get_market_movers('gainers')
            if gainers_data:
                gainers_records = update_market_movers(connection, gainers_data, 'gainers')
                records_affected += gainers_records
                logging.info(f"Updated {gainers_records} gainers")
            else:
                logging.warning("No gainers data received")

            losers_data = client.get_market_movers('losers')
            if losers_data:
                losers_records = update_market_movers(connection, losers_data, 'losers')
                records_affected += losers_records
                logging.info(f"Updated {losers_records} losers")
            else:
                logging.warning("No losers data received")

            # Most-active symbols come from a separate client method but are
            # stored through the same helper under the 'actives' category.
            actives_data = client.get_market_actives()
            if actives_data:
                actives_records = update_market_movers(connection, actives_data, 'actives')
                records_affected += actives_records
                logging.info(f"Updated {actives_records} actives")
            else:
                logging.warning("No active movers data received")

        return records_affected

    except Exception as e:
        logging.error(f"Error updating {component}: {e}")
        if connection:
            connection.rollback()
        return 0


def main():
    """Run the update process for the components requested on the command line.

    Connection settings are read from the DB_NAME, DB_USER, DB_PASSWORD,
    DB_HOST and DB_PORT environment variables (after loading ``.env``); the
    API client uses FMP_API_KEY and API_RATE_LIMIT (default 300). Exits with
    status 1 on a critical error; a failure in one component is logged and
    the remaining components are still processed.
    """
    # Parse first so that --help or an invalid argument exits before any log
    # file is created or database connection is opened.
    args = parse_arguments()

    load_dotenv()
    setup_logging()
    logging.info("Starting US Markets update process...")

    connection = None
    try:
        # Database connection
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
        connection.autocommit = False  # Explicit transaction control

        # Ensure database is setup
        if not setup_database(connection):
            raise Exception("Database setup failed")

        # Initialize API client
        client = USMarketsClient(
            api_key=os.getenv('FMP_API_KEY'),
            rate_limit_per_minute=int(os.getenv('API_RATE_LIMIT', '300'))
        )

        if 'all' in args.components:
            components_to_update = list(ALL_COMPONENTS)
        else:
            components_to_update = args.components

        total_records_updated = 0
        for component in components_to_update:
            try:
                records = update_market_data(connection, client, component, args.force)
                total_records_updated += records if records else 0
                logging.info(f"Completed updating {component}")
            except Exception as e:
                logging.error(f"Failed to update {component}: {e}")
                continue

        logging.info(f"Update process completed. Total records updated: {total_records_updated}")

    except Exception as e:
        logging.error(f"Critical error in main execution: {str(e)}")
        if connection:
            connection.rollback()
        sys.exit(1)

    finally:
        if connection:
            connection.close()
            logging.info("Database connection closed")


if __name__ == "__main__":
    main()