# Viewer metadata: Python, 167 lines, 7.0 KiB
import os
|
|
import logging
|
|
from datetime import datetime
|
|
import psycopg2
|
|
from dotenv import load_dotenv
|
|
from typing import List, Tuple
|
|
from utils.Stocks.financials.financials_collector import FinancialsCollector
|
|
from utils.Stocks.financials.financials_utils import DatabaseManager, save_financial_statement
|
|
|
|
def create_tables(conn: psycopg2.extensions.connection) -> None:
    """Create the ``financial_statements`` table and its lookup index if absent.

    The DDL is sent as one script on ``conn`` and then committed.

    Args:
        conn: Open database connection on which the DDL is executed.

    Raises:
        Exception: Any error raised while executing or committing the DDL.
            The transaction is rolled back and the error logged before it
            is re-raised.
    """
    schema_sql = """
                CREATE TABLE IF NOT EXISTS financial_statements (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER NOT NULL,
                    date DATE NOT NULL,
                    period VARCHAR(10) NOT NULL,
                    statement_type VARCHAR(20) NOT NULL,
                    is_reported BOOLEAN NOT NULL,
                    data JSONB NOT NULL,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE (ticker_id, date, period, statement_type, is_reported)
                );

                CREATE INDEX IF NOT EXISTS idx_financial_statements_ticker_date
                ON financial_statements(ticker_id, date);
            """
    try:
        with conn.cursor() as cur:
            cur.execute(schema_sql)
            conn.commit()
            logging.info("Database tables created successfully")
    except Exception as exc:
        # Undo the partial DDL so the connection is usable by the caller's
        # own error handling, then let the failure propagate.
        conn.rollback()
        logging.error(f"Error creating tables: {exc}")
        raise
|
|
|
|
def setup_logging():
    """Configure root logging with a timestamped log file plus a stream handler.

    Ensures a ``logs`` directory exists relative to the current working
    directory, then configures the root logger at INFO level with one
    ``FileHandler`` (named after the current local time) and one
    ``StreamHandler``.
    """
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)

    # One file per run: the start time is baked into the file name.
    started = datetime.now()
    log_file = os.path.join(log_dir, f'financials_update_{started:%Y%m%d_%H%M%S}.log')

    run_handlers = [
        logging.FileHandler(log_file),
        logging.StreamHandler(),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=run_handlers,
    )
|
|
|
|
class FinancialsUpdater:
    """Refresh stored quarterly financial statements from the collector API.

    For each ticker, the latest income-statement date known to the database
    manager is compared with the latest date reported by the collector. When
    the database has no date, or the API date is newer, the quarterly income,
    balance-sheet and cash-flow statements are fetched and saved.
    """

    def __init__(self, db_manager: DatabaseManager, collector: FinancialsCollector):
        """Store the collaborators used for every ticker update.

        Args:
            db_manager: Provides ``get_latest_statement_date`` and the ``conn``
                connection that statements are saved through.
            collector: Provides the latest-date lookup and the three
                statement fetchers.
        """
        self.db_manager = db_manager
        self.collector = collector

    def _rollback_quietly(self, ticker_symbol: str) -> None:
        """Roll back the shared connection after a failure, never raising.

        A failed statement leaves a psycopg2 connection in an aborted
        transaction; without a rollback every later ticker would fail on its
        first query. A rollback failure is only logged so that the
        best-effort, per-ticker error handling is preserved.
        """
        try:
            self.db_manager.conn.rollback()
        except Exception:
            logging.exception(f"Rollback failed after error on {ticker_symbol}")

    def update_ticker(self, ticker_data: Tuple[int, str]) -> None:
        """Check one ticker and refresh its statements if the database is behind.

        Errors are logged (with traceback) and swallowed so that one bad
        ticker does not stop a batch run.

        Args:
            ticker_data: ``(ticker_id, ticker_symbol)`` pair.
        """
        ticker_id, ticker_symbol = ticker_data
        try:
            logging.info(f"Starting update check for {ticker_symbol}")

            # Check latest date in database
            latest_db_date = self.db_manager.get_latest_statement_date(ticker_id, 'income')
            logging.info(f"{ticker_symbol}: Latest database date: {latest_db_date}")

            # Check latest date from API
            latest_api_date = self.collector.get_latest_financial_statement_date(ticker_symbol)
            logging.info(f"{ticker_symbol}: Latest API date: {latest_api_date}")

            # Determine if update is needed
            if not latest_db_date:
                logging.info(f"{ticker_symbol}: No data in database, fetching all statements")
                self._update_financial_statements(ticker_id, ticker_symbol)
            # NOTE(review): assumes both dates are of mutually comparable
            # types; a mismatch raises TypeError and is logged below.
            elif latest_api_date and latest_api_date > latest_db_date:
                logging.info(f"{ticker_symbol}: Found newer data (DB: {latest_db_date}, API: {latest_api_date}), updating")
                self._update_financial_statements(ticker_id, ticker_symbol)
            else:
                logging.info(f"{ticker_symbol}: Data is up to date, skipping")

            logging.info(f"Completed update check for {ticker_symbol}")

        except Exception as e:
            logging.exception(f"Error updating {ticker_symbol}: {e}")
            # Clear any aborted transaction so the next ticker starts clean.
            self._rollback_quietly(ticker_symbol)

    def _update_financial_statements(self, ticker_id: int, ticker_symbol: str) -> None:
        """Fetch all three quarterly statement kinds, then save them.

        Every kind is fetched before anything is saved, so a fetch failure
        leaves the database untouched. Errors are logged (with traceback),
        the connection is rolled back, and the error is swallowed.

        Args:
            ticker_id: Database id the statements are saved under.
            ticker_symbol: Symbol passed to the collector and used in logs.
        """
        try:
            logging.info(f"Fetching financial statements for {ticker_symbol}")

            # (statement_type saved to the DB, label used in logs, fetcher)
            sources = (
                ('income', 'income statements', self.collector.get_income_statement),
                ('balance', 'balance sheets', self.collector.get_balance_sheet),
                ('cash_flow', 'cash flow statements', self.collector.get_cash_flow),
            )

            # Get quarterly statements
            fetched = []
            for statement_type, label, fetch in sources:
                logging.info(f"{ticker_symbol}: Fetching {label}")
                statements = fetch(ticker_symbol, 'quarter')
                logging.info(f"{ticker_symbol}: Found {len(statements)} {label}")
                fetched.append((statement_type, statements))

            # Save statements to database
            statements_saved = 0
            for statement_type, statements in fetched:
                for stmt in statements:
                    save_financial_statement(
                        self.db_manager.conn, ticker_id, statement_type, 'quarter', False, stmt
                    )
                    statements_saved += 1

            logging.info(f"Successfully saved {statements_saved} statements for {ticker_symbol}")

        except Exception as e:
            logging.exception(f"Error updating financial statements for {ticker_symbol}: {e}")
            # A failed save aborts the open transaction; reset it so later
            # tickers can still query and write.
            self._rollback_quietly(ticker_symbol)

    def update_all_tickers(self, tickers: List[Tuple[int, str]]) -> None:
        """Run ``update_ticker`` for every entry, logging progress as i/total.

        Args:
            tickers: ``(ticker_id, ticker_symbol)`` pairs to process in order.
        """
        total_tickers = len(tickers)
        for index, ticker in enumerate(tickers, 1):
            logging.info(f"Processing ticker {index}/{total_tickers}")
            self.update_ticker(ticker)
|
|
|
|
def main():
    """Run the financial-statements update end to end.

    Loads environment variables, configures logging, connects to the database
    using the ``DB_*`` variables, ensures the schema exists, reads every
    ticker with a non-null ``yf_ticker`` and updates each one. Any failure is
    logged with its traceback; the connection is closed if it was opened.
    """
    load_dotenv()
    setup_logging()

    logging.info("Starting financial statements update process")

    # Sentinel so the cleanup below can tell whether connect() succeeded,
    # without inspecting locals().
    conn = None
    try:
        # Establish database connection
        conn = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
        )
        logging.info("Database connection established")

        # Create tables if they don't exist
        create_tables(conn)

        # Initialize managers and collectors
        db_manager = DatabaseManager(conn)
        collector = FinancialsCollector(os.getenv('FMP_API_KEY'))
        updater = FinancialsUpdater(db_manager, collector)

        # Get list of tickers to update
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, yf_ticker FROM tickers WHERE yf_ticker IS NOT NULL")
            tickers = cursor.fetchall()
        logging.info(f"Found {len(tickers)} tickers to process")

        # Process all tickers
        updater.update_all_tickers(tickers)
        logging.info("Financial updates completed successfully")

    except Exception as e:
        # Top-level boundary: record the full traceback rather than only the
        # message, so failures in a scheduled run can be diagnosed.
        logging.exception(f"Critical error in financial updates: {e}")
    finally:
        if conn is not None:
            conn.close()
            logging.info("Database connection closed")
|
|
|
|
# Entry point: run the update only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|