RivaCube/update_valuations.py
2025-02-04 19:31:18 +01:00

177 lines
6.3 KiB
Python

import os
import logging
import json
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
from typing import List, Dict, Tuple
import time
from utils.Stocks.valuations.collector import ValuationCollector
from utils.Stocks.valuations.utils import (
save_dcf_valuation,
save_company_rating,
init_database,
setup_logging
)
# Reports which DCF types and which rating date exist for one ticker within
# the last 90 days. Both %s placeholders receive the same ticker id.
_RECENT_VALUATIONS_QUERY = """
WITH recent_dcf AS (
SELECT dcf_type, MAX(date) as last_update
FROM dcf_valuations
WHERE ticker_id = %s
AND date >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY dcf_type
),
recent_rating AS (
SELECT MAX(date) as last_rating
FROM company_ratings
WHERE ticker_id = %s
AND date >= CURRENT_DATE - INTERVAL '90 days'
)
SELECT
(SELECT ARRAY_AGG(dcf_type) FROM recent_dcf) as dcf_types,
(SELECT MAX(last_update) FROM recent_dcf) as latest_dcf_date,
(SELECT last_rating FROM recent_rating) as latest_rating_date;
"""


def check_ticker_updates(conn: psycopg2.extensions.connection, ticker_id: int) -> bool:
    """Decide whether a ticker's valuations need to be refreshed.

    A ticker is considered up to date only when, within the last 90 days,
    it has a row in ``dcf_valuations`` for each of the 'simple', 'advanced'
    and 'levered' DCF types and at least one row in ``company_ratings``.

    Args:
        conn: Open database connection used to run the lookup query.
        ticker_id: Primary key of the ticker to inspect.

    Returns:
        True if the ticker needs an update (or if the check itself failed),
        False if all required data is already present.

    Note:
        If the lookup fails, the connection's current transaction is rolled
        back before returning. A failed statement leaves a psycopg2
        connection in an aborted transaction in which every later statement
        is rejected, so without the rollback the caller could not act on the
        "needs update" answer. Callers should therefore not hold uncommitted
        work on ``conn`` when calling this function.
    """
    required_dcf_types = {'simple', 'advanced', 'levered'}
    cursor = conn.cursor()
    try:
        cursor.execute(_RECENT_VALUATIONS_QUERY, (ticker_id, ticker_id))
        row = cursor.fetchone()
        # The outer SELECT is expected to yield exactly one row; treat a
        # missing row the same as "nothing recorded".
        dcf_types, latest_dcf_date, latest_rating_date = row if row else (None, None, None)

        # ARRAY_AGG over an empty set yields NULL, hence the `or []`.
        missing_dcf_types = required_dcf_types - set(dcf_types or [])
        if missing_dcf_types:
            logging.info(f"Ticker {ticker_id} needs update: Missing DCF types: {missing_dcf_types}")
            return True

        if not latest_rating_date:
            logging.info(f"Ticker {ticker_id} needs update: No ratings in last 90 days")
            return True

        logging.info(f"Ticker {ticker_id} is up to date. Latest DCF: {latest_dcf_date}, Latest Rating: {latest_rating_date}")
        return False
    except Exception as e:
        logging.error(f"Error checking ticker updates: {e}")
        # Clear the aborted transaction so the caller can keep using `conn`.
        try:
            conn.rollback()
        except Exception as rollback_error:
            logging.error(f"Rollback after failed ticker check also failed: {rollback_error}")
        # Fail open: when in doubt, let the caller attempt the update.
        return True
    finally:
        cursor.close()
def process_ticker_batch(conn: psycopg2.extensions.connection,
                         collector: ValuationCollector,
                         tickers: List[Tuple[int, str]]) -> None:
    """Refresh valuations and ratings for each ticker in ``tickers``.

    For every ``(ticker_id, ticker_symbol)`` pair the function first asks
    ``check_ticker_updates`` whether work is needed. If so, it fetches the
    simple, advanced and levered DCF, the current rating and the historical
    ratings from ``collector`` and hands them to the ``save_*`` helpers,
    committing once per ticker. Any exception rolls back that ticker's
    transaction, is logged, and processing moves on to the next ticker.

    Args:
        conn: Open database connection; committed or rolled back per ticker.
        collector: Source of valuation and rating data.
        tickers: ``(ticker_id, ticker_symbol)`` pairs to process.
    """
    # (dcf_type label, collector method name) in fetch-and-store order.
    # The method is resolved by name at call time, one type at a time.
    dcf_sources = (
        ('simple', 'get_simple_dcf'),
        ('advanced', 'get_advanced_dcf'),
        ('levered', 'get_levered_dcf'),
    )

    for ticker_id, ticker_symbol in tickers:
        try:
            if not check_ticker_updates(conn, ticker_id):
                logging.info(f"Skipping {ticker_symbol} - Already has all valuations within 90 days")
                continue

            logging.info(f"Updating valuations for {ticker_symbol}")

            # Each response is treated as a sequence whose first element is
            # the payload; empty responses and empty payloads are skipped.
            for dcf_type, method_name in dcf_sources:
                response = getattr(collector, method_name)(ticker_symbol)
                if response and response[0]:
                    save_dcf_valuation(conn, ticker_id, dcf_type, response[0])

            latest_rating = collector.get_company_rating(ticker_symbol)
            if latest_rating and latest_rating[0]:
                save_company_rating(conn, ticker_id, latest_rating[0])

            rating_history = collector.get_historical_rating(ticker_symbol)
            for entry in rating_history or ():
                if entry:
                    save_company_rating(conn, ticker_id, entry)

            conn.commit()
            logging.info(f"Completed updating valuations for {ticker_symbol}")
        except Exception as exc:
            # Discard this ticker's partial writes and carry on with the rest.
            conn.rollback()
            logging.error(f"Error updating valuations for {ticker_symbol}: {exc}")
# Selects every ticker that has a yf_ticker value, in ascending id order.
_TICKERS_WITH_SYMBOL_QUERY = """
SELECT id, yf_ticker
FROM tickers
WHERE yf_ticker IS NOT NULL
ORDER BY id
"""


def update_all_valuations(conn: psycopg2.extensions.connection,
                          collector: ValuationCollector,
                          *,
                          batch_size: int = 50,
                          batch_delay: float = 1) -> None:
    """Update valuations for every ticker that has a ``yf_ticker`` value.

    Tickers are loaded in one query, then handed to ``process_ticker_batch``
    in slices of ``batch_size``, with a pause between consecutive batches to
    avoid overwhelming the API.

    Args:
        conn: Open database connection.
        collector: Source of valuation and rating data, passed through to
            ``process_ticker_batch``.
        batch_size: Number of tickers per batch (default 50).
        batch_delay: Seconds to sleep between batches (default 1); no sleep
            follows the final batch, and values <= 0 disable the pause.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # The full result set is fetched up front, so the cursor can be released
    # before the long-running batch processing begins.
    cursor = conn.cursor()
    try:
        cursor.execute(_TICKERS_WITH_SYMBOL_QUERY)
        tickers = cursor.fetchall()
    finally:
        cursor.close()

    total_tickers = len(tickers)
    total_batches = (total_tickers + batch_size - 1) // batch_size  # ceiling division

    batch_starts = range(0, total_tickers, batch_size)
    for batch_number, start in enumerate(batch_starts, start=1):
        batch = tickers[start:start + batch_size]
        logging.info(f"Processing batch {batch_number} of {total_batches}")
        process_ticker_batch(conn, collector, batch)

        # Small delay between batches, skipped after the last one.
        if batch_number < total_batches and batch_delay > 0:
            time.sleep(batch_delay)
def main():
    """Entry point: connect to the database and update all valuations.

    Loads environment variables, configures logging, opens a PostgreSQL
    connection from the ``DB_*`` variables, initialises the database, and
    runs ``update_all_valuations`` with a collector built from
    ``FMP_API_KEY``. Any failure is logged rather than raised, and the
    connection is closed whenever it was successfully opened.
    """
    load_dotenv()
    setup_logging()

    # Sentinel so the cleanup step can tell whether connect() succeeded.
    connection = None
    try:
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )
        init_database(connection)
        collector = ValuationCollector(os.getenv('FMP_API_KEY'))

        # A monotonic clock keeps the duration immune to wall-clock changes.
        start_time = time.monotonic()
        update_all_valuations(connection, collector)
        elapsed_time = time.monotonic() - start_time
        logging.info(f"Valuations update completed in {elapsed_time:.2f} seconds")
    except Exception as e:
        logging.error(f"Error updating valuations: {e}")
    finally:
        if connection is not None:
            connection.close()


if __name__ == "__main__":
    main()