124 lines
4.2 KiB
Python
124 lines
4.2 KiB
Python
import os
|
|
import logging
|
|
from datetime import datetime
|
|
import psycopg2
|
|
from dotenv import load_dotenv
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from typing import List
|
|
from utils.Stocks.financials import FinancialsCollector
|
|
from utils.Stocks.financials.statements_utils import save_financial_statement, StatementStatus
|
|
|
|
# Upper bound on worker threads used when refreshing tickers in parallel.
# NOTE: this is evaluated once, when the module is imported. That is before
# main() calls load_dotenv(), so unless something loaded .env earlier, only a
# MAX_THREADS already present in the process environment is honoured here.
MAX_THREADS = int(os.environ.get('MAX_THREADS', 10))
|
|
|
|
def setup_logging():
    """Configure root logging to write to a timestamped file and to the console.

    The target directory is taken from the ``LOG_DIR`` environment variable
    (default ``logs``) and is created if missing. Each call names its file
    ``statements_update_<YYYYmmdd_HHMMSS>.log`` using the current local time.
    Records are emitted at INFO level and above.
    """
    target_dir = os.getenv('LOG_DIR', 'logs')
    os.makedirs(target_dir, exist_ok=True)

    started_at = datetime.now()
    file_name = f'statements_update_{started_at:%Y%m%d_%H%M%S}.log'
    handlers = [
        logging.FileHandler(os.path.join(target_dir, file_name)),
        logging.StreamHandler(),
    ]

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )
|
|
|
|
def update_ticker_statements(
    conn: psycopg2.extensions.connection,
    status_tracker: StatementStatus,
    collector: FinancialsCollector,
    ticker_id: int,
    ticker_symbol: str
):
    """Fetch and persist annual and quarterly financial statements for one ticker.

    Progress is recorded through ``status_tracker.update_status``:

    * ``IN_PROGRESS`` when work starts.
    * ``COMPLETED`` after every statement has been saved.
    * ``ERROR`` (with a message) if anything fails.

    Failures are logged and recorded rather than raised, so a single bad
    ticker does not abort a bulk run.

    Args:
        conn: Database connection handed through to ``save_financial_statement``.
        status_tracker: Tracker used to record this ticker's progress.
        collector: Source of income statements, balance sheets and cash flows.
        ticker_id: Primary key of the ticker row being updated.
        ticker_symbol: Symbol passed to the ``collector`` getters.
    """
    try:
        logging.info(f"Updating statements for {ticker_symbol}")
        status_tracker.update_status(ticker_id, "IN_PROGRESS")

        # Fetch and save each statement type for both reporting periods.
        for period in ('annual', 'quarter'):
            statements = {
                'income': collector.get_income_statement(ticker_symbol, period),
                'balance': collector.get_balance_sheet(ticker_symbol, period),
                'cash_flow': collector.get_cash_flow(ticker_symbol, period),
            }
            for stmt_type, data in statements.items():
                # A falsy result (e.g. None or an empty list) means nothing to save.
                for stmt in data or ():
                    save_financial_statement(conn, ticker_id, stmt_type, period, stmt)

        status_tracker.update_status(ticker_id, "COMPLETED")
        logging.info(f"Completed updating statements for {ticker_symbol}")

    except Exception as e:
        error_msg = f"Error updating statements for {ticker_symbol}: {e}"
        # logging.exception keeps the traceback that logging.error discarded.
        logging.exception(error_msg)
        try:
            status_tracker.update_status(ticker_id, "ERROR", error_msg)
        except Exception:
            # Recording the failure can itself fail (for instance if the
            # database connection is unusable after the original error).
            # Log it here instead of letting it escape this per-ticker
            # handler and surface as a bulk-update failure.
            logging.exception(f"Could not record ERROR status for {ticker_symbol}")
|
|
|
|
def update_all_statements(conn: psycopg2.extensions.connection, collector: FinancialsCollector):
    """Refresh financial statements for every ticker that has a ``yf_ticker``.

    Tickers are read from the ``tickers`` table in ascending ``id`` order. Each
    one is processed on a thread pool via ``update_ticker_statements``. Errors
    are logged, not raised.

    The worker count is read from the ``MAX_THREADS`` environment variable at
    call time, falling back to the module-level ``MAX_THREADS``. That constant
    is evaluated at import, before ``main()`` calls ``load_dotenv()``, so
    re-reading here lets a value supplied via ``.env`` take effect.

    NOTE(review): all workers share the single ``conn`` (and therefore its
    transaction state). A database error raised for one ticker may affect the
    others. Consider one connection per worker or a pool. TODO: confirm how
    ``save_financial_statement`` / ``StatementStatus`` commit.
    """
    status_tracker = StatementStatus(conn)

    try:
        # The cursor is only needed for the initial query. Close it before
        # the worker threads start using the shared connection.
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT id, yf_ticker
                FROM tickers
                WHERE yf_ticker IS NOT NULL
                ORDER BY id ASC
            """)
            tickers = cursor.fetchall()

        logging.info(f"Found {len(tickers)} tickers to update.")

        max_workers = int(os.getenv('MAX_THREADS', MAX_THREADS))

        failures = 0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            pending = [
                (
                    ticker_symbol,
                    executor.submit(
                        update_ticker_statements,
                        conn, status_tracker, collector, ticker_id, ticker_symbol,
                    ),
                )
                for ticker_id, ticker_symbol in tickers
            ]
            # Inspect every future individually, so one unexpected error
            # neither hides the others nor skips the summary below. Ordinary
            # per-ticker failures are already handled (and recorded as ERROR)
            # inside update_ticker_statements; only errors escaping it count.
            for ticker_symbol, future in pending:
                try:
                    future.result()
                except Exception:
                    failures += 1
                    logging.exception(f"Unhandled error while updating {ticker_symbol}")

        if failures:
            logging.error(f"{failures} of {len(tickers)} tickers failed with unhandled errors.")
        else:
            logging.info("All tickers updated successfully.")

    except Exception as e:
        logging.exception(f"Error during bulk update: {e}")
|
|
|
|
def main():
    """Load configuration, connect to PostgreSQL and refresh all financial statements.

    ``load_dotenv()`` runs first, so the following variables may come from a
    ``.env`` file:

    * ``DB_NAME``, ``DB_USER``, ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT``:
      passed to ``psycopg2.connect``.
    * ``FMP_API_KEY``: passed to ``FinancialsCollector``.

    Any error is logged (with traceback) rather than raised. The database
    connection is always closed if it was opened.
    """
    load_dotenv()
    setup_logging()

    # Explicit sentinel instead of probing locals() in the finally clause.
    connection = None
    try:
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT')
        )

        collector = FinancialsCollector(os.getenv('FMP_API_KEY'))

        update_all_statements(connection, collector)

        logging.info("Financial statements update completed successfully.")

    except Exception as e:
        logging.exception(f"Error in main function: {e}")

    finally:
        if connection is not None:
            # NOTE(review): close() does not commit; any uncommitted work is
            # discarded. This relies on the save/status helpers committing
            # their own writes. Confirm.
            connection.close()
            logging.info("Database connection closed.")
|
|
|
|
if __name__ == "__main__":
    # Run the full statements refresh only when executed as a script,
    # not when this module is imported.
    main()
|