RivaCube/update_financials.py
2025-02-04 19:31:18 +01:00

167 lines
7.0 KiB
Python

import os
import logging
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
from typing import List, Tuple
from utils.Stocks.financials.financials_collector import FinancialsCollector
from utils.Stocks.financials.financials_utils import DatabaseManager, save_financial_statement
def create_tables(conn: psycopg2.extensions.connection) -> None:
    """Ensure the financial_statements table and its lookup index exist.

    The DDL is idempotent (CREATE ... IF NOT EXISTS), so calling this on an
    already-initialized database is a no-op. On success the transaction is
    committed; on any failure it is rolled back and the exception re-raised
    so the caller can abort startup.

    Args:
        conn: An open psycopg2 connection with DDL privileges.

    Raises:
        Exception: Whatever psycopg2 raised while executing the DDL.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS financial_statements (
            id SERIAL PRIMARY KEY,
            ticker_id INTEGER NOT NULL,
            date DATE NOT NULL,
            period VARCHAR(10) NOT NULL,
            statement_type VARCHAR(20) NOT NULL,
            is_reported BOOLEAN NOT NULL,
            data JSONB NOT NULL,
            created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
            UNIQUE (ticker_id, date, period, statement_type, is_reported)
        );
        CREATE INDEX IF NOT EXISTS idx_financial_statements_ticker_date
        ON financial_statements(ticker_id, date);
    """
    try:
        with conn.cursor() as cursor:
            cursor.execute(ddl)
        conn.commit()
        logging.info("Database tables created successfully")
    except Exception as e:
        # Leave the connection usable for the caller even though we re-raise.
        conn.rollback()
        logging.error(f"Error creating tables: {e}")
        raise
def setup_logging() -> None:
    """Configure root logging to both a timestamped file and the console.

    Creates ./logs if it does not exist and writes to
    logs/financials_update_YYYYMMDD_HHMMSS.log at INFO level, mirroring
    every record to stderr via a StreamHandler.
    """
    directory = 'logs'
    os.makedirs(directory, exist_ok=True)
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_path = os.path.join(directory, f'financials_update_{stamp}.log')
    handlers = [logging.FileHandler(log_path), logging.StreamHandler()]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )
class FinancialsUpdater:
    """Synchronizes quarterly financial statements between the FMP-style
    collector API and the local database.

    For each ticker it compares the newest statement date in the database
    against the newest date the API reports, and fetches/saves the full set
    of quarterly statements only when the API is ahead (or the database is
    empty). All per-ticker errors are logged and swallowed so a batch run
    can continue with the remaining tickers.
    """

    # (statement_type stored in the DB, collector method that fetches it).
    # Driving the fetch/save loop from this table removes the triplicated
    # fetch-log-save sequence the three statement kinds otherwise need.
    _STATEMENT_SOURCES = (
        ('income', 'get_income_statement'),
        ('balance', 'get_balance_sheet'),
        ('cash_flow', 'get_cash_flow'),
    )

    # Human-readable labels used only in log lines (kept byte-identical to
    # the original messages).
    _STATEMENT_LABELS = {
        'income': 'income statements',
        'balance': 'balance sheets',
        'cash_flow': 'cash flow statements',
    }

    def __init__(self, db_manager: 'DatabaseManager', collector: 'FinancialsCollector'):
        # String annotations keep the signature lazy: the class can be
        # defined even where DatabaseManager/FinancialsCollector are not
        # importable (e.g. in isolated tests with duck-typed stubs).
        self.db_manager = db_manager
        self.collector = collector

    def update_ticker(self, ticker_data: Tuple[int, str]) -> None:
        """Check one ticker and refresh its statements if the API has newer data.

        Args:
            ticker_data: (ticker_id, ticker_symbol) pair as stored in the
                tickers table.
        """
        ticker_id, ticker_symbol = ticker_data
        try:
            logging.info(f"Starting update check for {ticker_symbol}")
            # Income statements stand in for "any statement" when probing
            # the freshest date we already hold.
            latest_db_date = self.db_manager.get_latest_statement_date(ticker_id, 'income')
            logging.info(f"{ticker_symbol}: Latest database date: {latest_db_date}")
            latest_api_date = self.collector.get_latest_financial_statement_date(ticker_symbol)
            logging.info(f"{ticker_symbol}: Latest API date: {latest_api_date}")
            if not latest_db_date:
                logging.info(f"{ticker_symbol}: No data in database, fetching all statements")
                self._update_financial_statements(ticker_id, ticker_symbol)
            elif latest_api_date and latest_api_date > latest_db_date:
                logging.info(f"{ticker_symbol}: Found newer data (DB: {latest_db_date}, API: {latest_api_date}), updating")
                self._update_financial_statements(ticker_id, ticker_symbol)
            else:
                logging.info(f"{ticker_symbol}: Data is up to date, skipping")
            logging.info(f"Completed update check for {ticker_symbol}")
        except Exception:
            # logging.exception preserves the traceback the original
            # logging.error(f"... {e}") discarded.
            logging.exception(f"Error updating {ticker_symbol}")

    def _update_financial_statements(self, ticker_id: int, ticker_symbol: str) -> None:
        """Fetch all quarterly statement types for a ticker and persist them.

        Failures are logged with traceback but not re-raised (best-effort
        semantics, matching update_ticker).
        """
        try:
            logging.info(f"Fetching financial statements for {ticker_symbol}")
            statements_saved = 0
            for statement_type, fetch_name in self._STATEMENT_SOURCES:
                label = self._STATEMENT_LABELS[statement_type]
                logging.info(f"{ticker_symbol}: Fetching {label}")
                statements = getattr(self.collector, fetch_name)(ticker_symbol, 'quarter')
                logging.info(f"{ticker_symbol}: Found {len(statements)} {label}")
                # is_reported=False: these rows come from the aggregator API,
                # not from filed reports — TODO confirm against save_financial_statement.
                for stmt in statements:
                    save_financial_statement(
                        self.db_manager.conn, ticker_id, statement_type, 'quarter', False, stmt
                    )
                    statements_saved += 1
            logging.info(f"Successfully saved {statements_saved} statements for {ticker_symbol}")
        except Exception:
            logging.exception(f"Error updating financial statements for {ticker_symbol}")

    def update_all_tickers(self, tickers: List[Tuple[int, str]]) -> None:
        """Run update_ticker over every (id, symbol) pair, logging progress."""
        total_tickers = len(tickers)
        for index, ticker in enumerate(tickers, 1):
            logging.info(f"Processing ticker {index}/{total_tickers}")
            self.update_ticker(ticker)
def main():
    """Entry point: connect to Postgres, ensure the schema, and refresh
    financial statements for every ticker that has a Yahoo Finance symbol.

    Connection parameters and the FMP API key come from the environment
    (loaded via .env). Any failure is logged with its traceback; the
    connection is always closed on the way out.
    """
    load_dotenv()
    setup_logging()
    logging.info("Starting financial statements update process")
    conn = None  # lets the finally block know whether connect() ever succeeded
    try:
        conn = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
        )
        logging.info("Database connection established")
        # Create tables if they don't exist.
        create_tables(conn)
        db_manager = DatabaseManager(conn)
        collector = FinancialsCollector(os.getenv('FMP_API_KEY'))
        updater = FinancialsUpdater(db_manager, collector)
        # Only tickers with a Yahoo Finance symbol can be fetched.
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, yf_ticker FROM tickers WHERE yf_ticker IS NOT NULL")
            tickers = cursor.fetchall()
        logging.info(f"Found {len(tickers)} tickers to process")
        updater.update_all_tickers(tickers)
        logging.info("Financial updates completed successfully")
    except Exception:
        # Top-level boundary: keep the process from crashing, but record
        # the full traceback (the original logging.error lost it).
        logging.exception("Critical error in financial updates")
    finally:
        # conn = None sentinel replaces the fragile `'conn' in locals()` check.
        if conn is not None:
            conn.close()
            logging.info("Database connection closed")
# Run the updater only when executed as a script, not on import.
if __name__ == "__main__":
    main()