# RivaCube/update_metrics.py
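"""Refresh financial metrics for every ticker in the database.

The script connects to PostgreSQL using the DB_NAME, DB_USER, DB_PASSWORD,
DB_HOST and DB_PORT environment variables (loaded from .env), pulls data
through FinancialMetricsCollector with FMP_API_KEY, and stores it via the
helpers in utils.Stocks.financials.metrics_utils. TTM metrics are refreshed
daily; historical and owner-earnings data quarterly. Run it directly:

    python update_metrics.py
"""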
import os
import logging
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
from utils.Stocks.financials.metrics_collector import FinancialMetricsCollector
from utils.Stocks.financials.metrics_utils import (
    ensure_tables_exist,
    get_last_update_status,
    needs_update,
    save_financial_metric,
    save_owner_earnings
)


def setup_logging() -> None:
    """Configure logging with both file and console output."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'metrics_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )


def process_historical_metrics(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    ticker_symbol: str,
    collector: FinancialMetricsCollector,
    period: str
) -> None:
    """Process historical metrics for a ticker."""
    # Standard metrics
    metric_types = {
        'key_metrics': collector.get_key_metrics,
        'ratios': collector.get_ratios,
        'enterprise': collector.get_enterprise_values
    }
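    # Each collector call returns a list of period records; save one row per record.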
    for metric_type, collector_func in metric_types.items():
        metrics = collector_func(ticker_symbol, period)
        for metric in metrics:
            save_financial_metric(conn, ticker_id, metric_type, period, metric)

    # Growth metrics
    growth_collectors = {
        'cash_flow': collector.get_cash_flow_growth,
        'income': collector.get_income_growth,
        'balance': collector.get_balance_sheet_growth,
        'financial': collector.get_financial_growth
    }
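    # Wrap each growth record with its type and date so all growth series can be
    # stored under the single 'growth' metric type.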
    for growth_type, growth_collector in growth_collectors.items():
        growth_data = growth_collector(ticker_symbol, period)
        for growth in growth_data:
            growth_entry = {
                'type': growth_type,
                'date': growth['date'],
                'data': growth
            }
            save_financial_metric(conn, ticker_id, 'growth', period, growth_entry)


def process_owner_earnings(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    ticker_symbol: str,
    collector: FinancialMetricsCollector
) -> None:
    """Process owner earnings for a ticker."""
    owner_earnings = collector.get_owner_earnings(ticker_symbol)
    for earnings in owner_earnings:
        save_owner_earnings(conn, ticker_id, earnings)


def process_ticker_metrics(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    ticker_symbol: str,
    collector: FinancialMetricsCollector
) -> None:
    """Process all metrics for a single ticker."""
    # Get current update status
    update_status = get_last_update_status(conn, ticker_id)
    logging.debug(f"Current update status for {ticker_symbol}: {update_status}")
    # Process TTM metrics (daily update)
    if needs_update(update_status.get('key_metrics_ttm'), 'daily'):
        logging.info(f"Updating TTM key metrics for {ticker_symbol}")
        key_metrics_ttm = collector.get_key_metrics_ttm(ticker_symbol)
        for metric in key_metrics_ttm:
            # Stamp each TTM record with today's date before saving
            metric['date'] = datetime.now().strftime('%Y-%m-%d')
            save_financial_metric(conn, ticker_id, 'key_metrics', 'ttm', metric)
    else:
        logging.info(f"Skipping TTM key metrics for {ticker_symbol} - up to date")

    if needs_update(update_status.get('ratios_ttm'), 'daily'):
        logging.info(f"Updating TTM ratios for {ticker_symbol}")
        ratios_ttm = collector.get_ratios_ttm(ticker_symbol)
        for ratio in ratios_ttm:
            ratio['date'] = datetime.now().strftime('%Y-%m-%d')
            save_financial_metric(conn, ticker_id, 'ratios', 'ttm', ratio)
    else:
        logging.info(f"Skipping TTM ratios for {ticker_symbol} - up to date")
    # Process historical metrics (quarterly update)
    for period in ['annual', 'quarter']:
        if needs_update(update_status.get(f'key_metrics_{period}'), 'quarterly'):
            logging.info(f"Updating {period} metrics for {ticker_symbol}")
            process_historical_metrics(conn, ticker_id, ticker_symbol, collector, period)
        else:
            logging.info(f"Skipping {period} metrics for {ticker_symbol} - up to date")

    # Process owner earnings (quarterly update)
    if needs_update(update_status.get('owner_earnings'), 'quarterly'):
        logging.info(f"Updating owner earnings for {ticker_symbol}")
        process_owner_earnings(conn, ticker_id, ticker_symbol, collector)
    else:
        logging.info(f"Skipping owner earnings for {ticker_symbol} - up to date")


def update_all_metrics(conn: psycopg2.extensions.connection, collector: FinancialMetricsCollector) -> None:
    """Update all metrics for all tickers."""
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, yf_ticker FROM tickers WHERE yf_ticker IS NOT NULL")
        tickers = cursor.fetchall()
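        # Each ticker gets its own try/except so a single failure is logged
        # and skipped instead of aborting the whole run.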
        for ticker_id, ticker_symbol in tickers:
            logging.info(f"Processing metrics for {ticker_symbol}")
            try:
                process_ticker_metrics(conn, ticker_id, ticker_symbol, collector)
                logging.info(f"Completed processing metrics for {ticker_symbol}")
            except Exception as e:
                logging.error(f"Error processing metrics for {ticker_symbol}: {e}")
                continue
    except Exception as e:
        logging.error(f"Error in update_all_metrics: {e}")
    finally:
        if cursor:
            cursor.close()


def main() -> None:
    """Main function to run the metrics update process."""
    load_dotenv()
    setup_logging()
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
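        # Make sure the metrics tables exist, then run the collector across all tickers.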
        ensure_tables_exist(conn)
        collector = FinancialMetricsCollector(os.getenv('FMP_API_KEY'))
        update_all_metrics(conn, collector)
        logging.info("Metrics update completed successfully")
    except Exception as e:
        logging.error(f"Error in main: {e}")
    finally:
        if conn:
            conn.close()


if __name__ == "__main__":
    main()