RivaCube/update_statements.py
2025-02-04 19:31:18 +01:00

124 lines
4.2 KiB
Python

import os
import logging
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor
from typing import List
from utils.Stocks.financials import FinancialsCollector
from utils.Stocks.financials.statements_utils import save_financial_statement, StatementStatus
# Upper bound on the number of worker threads used for parallel ticker updates.
# Taken from the MAX_THREADS environment variable, defaulting to 10 when unset.
# NOTE: this is evaluated at import time, i.e. before main() calls
# load_dotenv(), so only variables already present in the process environment
# are visible here.
MAX_THREADS = int(os.environ.get('MAX_THREADS', 10))
def setup_logging():
    """Configure root logging to write to a timestamped file and the console.

    The log directory is read from the LOG_DIR environment variable
    (default ``'logs'``) and is created if it does not exist. Each call
    targets a new file named ``statements_update_<YYYYmmdd_HHMMSS>.log``.
    Records are emitted at INFO level and above.
    """
    target_dir = os.getenv('LOG_DIR', 'logs')
    os.makedirs(target_dir, exist_ok=True)

    # One log file per run, distinguished by the start timestamp.
    file_name = f'statements_update_{datetime.now():%Y%m%d_%H%M%S}.log'
    file_handler = logging.FileHandler(os.path.join(target_dir, file_name))
    console_handler = logging.StreamHandler()

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )
def update_ticker_statements(
    conn: psycopg2.extensions.connection,
    status_tracker: StatementStatus,
    collector: FinancialsCollector,
    ticker_id: int,
    ticker_symbol: str
) -> bool:
    """Fetch and persist the financial statements of a single ticker.

    For each period (``'annual'`` then ``'quarter'``) the income statement,
    balance sheet and cash-flow statement are requested from ``collector``;
    every record of every non-empty result is then handed to
    ``save_financial_statement``. Progress is reported through
    ``status_tracker`` as ``"IN_PROGRESS"`` followed by ``"COMPLETED"`` or
    ``"ERROR"``.

    Errors are logged (with traceback) and recorded on the status tracker
    rather than propagated, so one failing ticker does not abort a batch.

    Args:
        conn: Open database connection passed to ``save_financial_statement``.
        status_tracker: Receives the per-ticker status transitions.
        collector: Source of the statement data.
        ticker_id: Database id of the ticker being updated.
        ticker_symbol: Symbol passed to the collector and used in log lines.

    Returns:
        ``True`` when every statement was processed, ``False`` when an error
        was caught. Callers that ignore the return value are unaffected.
    """
    try:
        logging.info(f"Updating statements for {ticker_symbol}")
        status_tracker.update_status(ticker_id, "IN_PROGRESS")

        # Fetch and save financial statements for both annual and quarterly periods
        for period in ('annual', 'quarter'):
            # All three statements are fetched before anything is written for
            # this period, so a fetch failure leaves the period untouched.
            statements = {
                'income': collector.get_income_statement(ticker_symbol, period),
                'balance': collector.get_balance_sheet(ticker_symbol, period),
                'cash_flow': collector.get_cash_flow(ticker_symbol, period),
            }
            for stmt_type, data in statements.items():
                if not data:
                    # Nothing returned for this statement type; skip it.
                    continue
                for stmt in data:
                    save_financial_statement(conn, ticker_id, stmt_type, period, stmt)

        status_tracker.update_status(ticker_id, "COMPLETED")
        logging.info(f"Completed updating statements for {ticker_symbol}")
    except Exception as e:
        error_msg = f"Error updating statements for {ticker_symbol}: {e}"
        logging.exception(error_msg)
        try:
            # After a failed statement PostgreSQL rejects every further command
            # on the connection until the transaction is rolled back, which
            # would make the status write below fail as well. Roll back only
            # when the connection really is in that aborted state, so that
            # unrelated pending work is not discarded after non-database errors.
            if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_INERROR:
                conn.rollback()
            status_tracker.update_status(ticker_id, "ERROR", error_msg)
        except Exception:
            # Never let a failure to *record* the error escape the worker.
            logging.exception(f"Could not record ERROR status for {ticker_symbol}")
        return False
    return True
def update_all_statements(conn: psycopg2.extensions.connection, collector: FinancialsCollector):
    """Update the financial statements of every ticker that has a symbol.

    Reads ``id`` and ``yf_ticker`` from the ``tickers`` table (rows with a
    NULL ``yf_ticker`` are excluded, ordered by ``id``) and submits one
    ``update_ticker_statements`` task per row to a thread pool. The same
    ``conn``, ``collector`` and a single ``StatementStatus`` instance are
    shared by all tasks.

    Any exception raised here is logged and swallowed; the function always
    returns ``None``.

    NOTE(review): sharing one connection and one status tracker between
    threads assumes both are safe for concurrent use - confirm.

    Args:
        conn: Open database connection used for the ticker query and passed
            on to every worker.
        collector: Statement data source passed on to every worker.
    """
    status_tracker = StatementStatus(conn)
    cursor = conn.cursor()
    try:
        # Fetch all tickers from the database
        cursor.execute("""
            SELECT id, yf_ticker
            FROM tickers
            WHERE yf_ticker IS NOT NULL
            ORDER BY id ASC
        """)
        tickers = cursor.fetchall()
        logging.info(f"Found {len(tickers)} tickers to update.")

        # The module-level MAX_THREADS is computed at import time, before
        # main() has called load_dotenv(). Re-reading the environment here lets
        # a MAX_THREADS value supplied through .env take effect, while still
        # falling back to the import-time value.
        max_workers = int(os.getenv('MAX_THREADS', MAX_THREADS))

        failed_count = 0
        # Use ThreadPoolExecutor for parallel processing
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            submitted = [
                (
                    ticker_symbol,
                    executor.submit(
                        update_ticker_statements,
                        conn, status_tracker, collector, ticker_id, ticker_symbol,
                    ),
                )
                for ticker_id, ticker_symbol in tickers
            ]
            # Inspect every future individually so that one raising task does
            # not hide the outcome of the tasks queued after it.
            for ticker_symbol, future in submitted:
                try:
                    outcome = future.result()
                except Exception:
                    logging.exception(f"Unhandled error while updating {ticker_symbol}")
                    failed_count += 1
                else:
                    # An explicit False from the worker signals a handled
                    # failure; any other return value counts as success.
                    if outcome is False:
                        failed_count += 1

        if failed_count:
            logging.warning(
                f"Statement update finished with {failed_count} of {len(tickers)} tickers failing."
            )
        else:
            logging.info("All tickers updated successfully.")
    except Exception as e:
        logging.exception(f"Error during bulk update: {e}")
    finally:
        cursor.close()
def main():
    """Load configuration, connect to the database and run the update.

    Steps: load variables from ``.env``, configure logging, open a PostgreSQL
    connection from the DB_NAME / DB_USER / DB_PASSWORD / DB_HOST / DB_PORT
    environment variables, build a ``FinancialsCollector`` from FMP_API_KEY
    and call ``update_all_statements``. Any exception is logged with its
    traceback instead of being propagated, and the connection is closed on
    the way out if it was opened.
    """
    load_dotenv()
    setup_logging()

    # Sentinel so the cleanup code can tell whether connect() succeeded
    # without inspecting locals().
    connection = None
    try:
        # Connect to the database
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
        )

        # Initialize the financial metrics collector
        collector = FinancialsCollector(os.getenv('FMP_API_KEY'))

        # Update all financial statements
        update_all_statements(connection, collector)

        # NOTE: update_all_statements logs and swallows its own errors, so
        # reaching this line means the run finished, not that every ticker
        # was updated without error.
        logging.info("Financial statements update completed successfully.")
    except Exception as e:
        logging.exception(f"Error in main function: {e}")
    finally:
        if connection is not None:
            connection.close()
            logging.info("Database connection closed.")
# Run the update only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()