# RivaCube/update_USmarkets.py
# 2025-02-04 19:31:18 +01:00
#
# 252 lines
# 9.0 KiB
# Python

import os
import logging
from datetime import datetime, timedelta
import psycopg2
from dotenv import load_dotenv
import sys
import argparse
from utils.usmarkets.utils import (
setup_database, update_market_indices, update_sector_performance,
update_sector_historical, update_pe_ratios, update_market_movers,
get_last_business_day
)
from utils.usmarkets.collector import USMarketsClient
def setup_logging():
    """Configure root logging to a timestamped file and to stdout.

    The log file is named ``market_update_<YYYYmmdd_HHMMSS>.log`` and is
    created under ``logs/USMarkets`` relative to the current working
    directory. The directory is created if it does not exist yet.
    """
    log_dir = 'logs/USMarkets'
    # exist_ok=True replaces the exists()/makedirs() pair, which could raise
    # FileExistsError when two runs start at the same moment.
    os.makedirs(log_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = os.path.join(log_dir, f'market_update_{timestamp}.log')

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(sys.stdout),
        ],
    )
def parse_arguments():
    """Build the command-line parser and return the parsed options.

    Options:
        --components  One or more of: indices, sectors, historical,
                      pe_ratios, movers, all. Defaults to ``['all']``.
        --force       Flag; when given, ``force`` is True.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    cli = argparse.ArgumentParser(description='Update US Markets data')

    # Each entry is (flag, keyword arguments for add_argument).
    option_specs = (
        ('--components', {
            'nargs': '+',
            'choices': ['indices', 'sectors', 'historical', 'pe_ratios', 'movers', 'all'],
            'default': ['all'],
            'help': 'Specify which components to update',
        }),
        ('--force', {
            'action': 'store_true',
            'default': False,
            'help': 'Force update even if data exists for current date',
        }),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    options = cli.parse_args()
    return options
def should_update_component(connection, component, date, force=False):
    """Decide whether *component* still needs to be updated for *date*.

    Args:
        connection: Open database connection (psycopg2 in this script).
        component: Component name. Only 'indices', 'sectors', 'pe_ratios'
            and 'movers' have an existence check; any other name (for
            example 'historical') is always reported as needing an update.
        date: Value compared against the ``trade_date`` column.
        force: When true, skip the check entirely.

    Returns:
        True when ``force`` is set, when the component has no existence
        check, when no rows exist for ``date``, or when the check itself
        failed. False only when rows already exist for ``date``.
    """
    if force:
        return True

    existence_queries = {
        'indices': """
            SELECT COUNT(*) FROM index_quotes iq
            JOIN market_indices mi ON iq.index_id = mi.id
            WHERE iq.trade_date = %s
        """,
        'sectors': """
            SELECT COUNT(*) FROM sector_performance
            WHERE trade_date = %s
        """,
        'pe_ratios': """
            SELECT COUNT(*) FROM sector_pe_ratios
            WHERE trade_date = %s
        """,
        'movers': """
            SELECT COUNT(*) FROM market_movers
            WHERE trade_date = %s
        """,
    }

    query = existence_queries.get(component)
    if query is None:
        # Nothing to check for this component, so do not touch the database.
        return True

    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(query, (date,))
        return cursor.fetchone()[0] == 0
    except Exception as e:
        logging.error(f"Error checking component status: {e}")
        # With autocommit off, a failed statement leaves the transaction
        # aborted and every later statement is rejected. Roll back so the
        # update allowed by returning True can actually run.
        if connection is not None:
            try:
                connection.rollback()
            except Exception as rollback_error:
                logging.error(
                    f"Rollback after failed status check also failed: {rollback_error}"
                )
        return True
    finally:
        if cursor is not None:
            cursor.close()
def update_market_data(connection, client, component, force=False):
    """Fetch one market-data component from *client* and store it.

    The last business day (relative to today) is used both for the
    "already updated?" check and as the date for date-bound API calls.

    Args:
        connection: Open database connection handed to the ``update_*`` helpers.
        client: API client providing the ``get_*`` fetch methods.
        component: One of 'indices', 'sectors', 'historical', 'pe_ratios',
            'movers'. Any other value performs no work.
        force: Passed to ``should_update_component`` to bypass the skip check.

    Returns:
        The value returned by the matching ``update_*`` helper (summed over
        all sources for 'pe_ratios' and 'movers'). Returns 0 when the
        component is skipped, nothing was fetched, or an error occurred;
        on error the connection is rolled back.
    """
    try:
        today = datetime.now().date()
        target_day = get_last_business_day(today)

        if not should_update_component(connection, component, target_day, force):
            logging.info(f"Skipping {component} - data already exists for {target_day}")
            return 0

        total = 0

        if component == 'indices':
            logging.info("Fetching market indices...")
            payload = client.get_market_indices()
            if not payload:
                logging.warning("No market indices data received")
            else:
                total = update_market_indices(connection, payload)
                logging.info(f"Updated {total} market indices")

        elif component == 'sectors':
            logging.info("Fetching sector performance...")
            payload = client.get_sector_performance()
            if not payload:
                logging.warning("No sector performance data received")
            else:
                total = update_sector_performance(connection, payload)
                logging.info(f"Updated {total} sector performance records")

        elif component == 'historical':
            logging.info("Fetching historical sector performance...")
            # Look back one calendar week from the last business day.
            window_end = target_day
            window_start = window_end - timedelta(days=7)
            logging.info(f"Fetching historical data from {window_start} to {window_end}")
            payload = client.get_sector_historical(
                window_start.strftime('%Y-%m-%d'),
                window_end.strftime('%Y-%m-%d'),
            )
            if not payload:
                logging.warning("No historical sector data received")
            else:
                total = update_sector_historical(connection, payload)
                logging.info(f"Updated {total} historical sector records")

        elif component == 'pe_ratios':
            logging.info("Fetching PE ratios...")
            date_str = target_day.strftime('%Y-%m-%d')
            logging.info(f"Using date: {date_str} for PE ratios")
            # Sector ratios first, then industry ratios; the fetch method is
            # resolved inside the loop so each lookup happens just before use.
            for scope, sector_flag in (('sector', True), ('industry', False)):
                payload = getattr(client, f"get_{scope}_pe_ratios")(date_str)
                if not payload:
                    logging.warning(f"No {scope} PE ratio data available for {date_str}")
                    continue
                written = update_pe_ratios(connection, payload, is_sector=sector_flag)
                total += written
                logging.info(f"Updated {written} {scope} PE ratios")

        elif component == 'movers':
            logging.info("Fetching market movers...")
            # (mover type, deferred fetch call, message when nothing came back)
            mover_sources = (
                ('gainers', lambda: client.get_market_movers('gainers'),
                 "No gainers data received"),
                ('losers', lambda: client.get_market_movers('losers'),
                 "No losers data received"),
                ('actives', lambda: client.get_market_actives(),
                 "No active movers data received"),
            )
            for mover_type, fetch, empty_message in mover_sources:
                payload = fetch()
                if not payload:
                    logging.warning(empty_message)
                    continue
                written = update_market_movers(connection, payload, mover_type)
                total += written
                logging.info(f"Updated {written} {mover_type}")

        return total
    except Exception as e:
        logging.error(f"Error updating {component}: {e}")
        if connection:
            connection.rollback()
        return 0
def main():
    """Run the US Markets update as a command-line program.

    Steps: parse arguments, load ``.env``, configure logging, connect to
    PostgreSQL using the ``DB_*`` environment variables, ensure the schema
    exists, then update each requested component in turn.

    Exits with status 1 when a critical error (connection, schema setup,
    client construction) occurs. The database connection is always closed.
    """
    # Parse first so that --help and invalid arguments exit immediately,
    # before a log file is created or a database connection is attempted.
    args = parse_arguments()

    load_dotenv()
    setup_logging()
    logging.info("Starting US Markets update process...")
    connection = None
    try:
        # Database connection
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
        )
        connection.autocommit = False  # Explicit transaction control

        # Ensure database is setup
        if not setup_database(connection):
            raise RuntimeError("Database setup failed")

        # Initialize API client
        client = USMarketsClient(
            api_key=os.getenv('FMP_API_KEY'),
            rate_limit_per_minute=int(os.getenv('API_RATE_LIMIT', '300')),
        )

        if 'all' in args.components:
            components_to_update = ['indices', 'sectors', 'historical', 'pe_ratios', 'movers']
        else:
            components_to_update = args.components

        total_records_updated = 0
        for component in components_to_update:
            # One failing component must not prevent the others from running.
            try:
                records = update_market_data(connection, client, component, args.force)
                total_records_updated += records if records else 0
                logging.info(f"Completed updating {component}")
            except Exception as e:
                logging.error(f"Failed to update {component}: {e}")
                continue

        logging.info(f"Update process completed. Total records updated: {total_records_updated}")
    except Exception as e:
        logging.error(f"Critical error in main execution: {str(e)}")
        if connection:
            connection.rollback()
        sys.exit(1)
    finally:
        if connection:
            connection.close()
            logging.info("Database connection closed")


if __name__ == "__main__":
    main()