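"""RivaCube/update_technicals.py

Refresh technical indicators (SMA, EMA, RSI, Bollinger Bands, MACD) for the
tickers stored in Postgres, using one year of daily history from yfinance.

Expects database credentials in the environment (loaded via python-dotenv):
DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT.
"""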
import os
import sys
import time
import logging
import psycopg2
from dotenv import load_dotenv
from datetime import datetime, timedelta
import yfinance as yf
import pandas as pd
import numpy as np
from typing import Optional, List, Dict, Any
from psycopg2.extras import execute_values
from utils.Stocks.technicals.utils import setup_database_indices, verify_technical_tables

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('technical_update.log')
    ]
)


def get_stocks_to_update(
    connection: psycopg2.extensions.connection,
    max_stocks: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Get stocks that need updating."""
    cursor = None
    try:
        cursor = connection.cursor()
        threshold_date = datetime.now() - timedelta(days=1)
        # Check database status
        cursor.execute("""
            SELECT COUNT(*) as total,
                   COUNT(CASE WHEN yf_ticker IS NOT NULL THEN 1 END) as with_yf,
                   COUNT(CASE WHEN last_checked_at IS NOT NULL THEN 1 END) as with_last_checked
            FROM tickers
        """)
        stats = cursor.fetchone()
        logging.info(f"Database status - Total: {stats[0]}, With YF: {stats[1]}, Last checked: {stats[2]}")
        # Base query without LIMIT; never-checked tickers come first
        query = """
            SELECT
                t.id,
                t.yf_ticker,
                t.name,
                t.sector_id,
                t.zone_id,
                t.last_checked_at
            FROM tickers t
            WHERE t.yf_ticker IS NOT NULL
              AND (t.last_checked_at IS NULL OR t.last_checked_at < %s)
            ORDER BY t.last_checked_at NULLS FIRST
        """
        # Add LIMIT only if max_stocks is specified
        if max_stocks is not None:
            query += " LIMIT %s"
            cursor.execute(query, (threshold_date, max_stocks))
        else:
            cursor.execute(query, (threshold_date,))
        stocks = [{
            'id': row[0],
            'ticker': row[1],
            'name': row[2],
            'sector_id': row[3],
            'zone_id': row[4],
            'last_checked_at': row[5]
        } for row in cursor.fetchall()]
        logging.info(f"Found {len(stocks)} stocks to update")
        return stocks
    except Exception as e:
        logging.error(f"Error getting stocks to update: {str(e)}")
        return []
    finally:
        if cursor and not cursor.closed:
            cursor.close()
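

# Example usage (illustrative only; connection setup mirrors main() below):
#
#   pending = get_stocks_to_update(connection, max_stocks=50)
#   for stock in pending:
#       print(stock['ticker'], stock['last_checked_at'])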


def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate technical indicators for a stock."""
    try:
        periods = [14, 20, 50, 200]
        for period in periods:
            # SMA
            df[f'SMA_{period}'] = df['Close'].rolling(window=period).mean()
            # EMA
            df[f'EMA_{period}'] = df['Close'].ewm(span=period, adjust=False).mean()
            # RSI (simple rolling averages of gains/losses)
            delta = df['Close'].diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
            rs = gain / loss
            df[f'RSI_{period}'] = 100 - (100 / (1 + rs))
            # Bollinger Bands
            sma = df['Close'].rolling(window=period).mean()
            std = df['Close'].rolling(window=period).std()
            df[f'BB_UPPER_{period}'] = sma + (std * 2)
            df[f'BB_LOWER_{period}'] = sma - (std * 2)
        # MACD (fixed 12/26/9 spans, computed once outside the period loop)
        exp1 = df['Close'].ewm(span=12, adjust=False).mean()
        exp2 = df['Close'].ewm(span=26, adjust=False).mean()
        df['MACD'] = exp1 - exp2
        df['SIGNAL'] = df['MACD'].ewm(span=9, adjust=False).mean()
        return df
    except Exception as e:
        logging.error(f"Error calculating indicators: {str(e)}")
        return pd.DataFrame()
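

# Illustrative sanity check (a sketch, not part of the production flow and not
# invoked anywhere): on a synthetic random-walk close series of 250 rows, the
# longest window (200) should leave exactly 250 - 200 + 1 = 51 non-NaN SMA
# values, and RSI must stay within [0, 100].
def _demo_indicators() -> None:
    rng = np.random.default_rng(0)
    prices = 100 + np.cumsum(rng.normal(0, 1, 250))
    demo = pd.DataFrame({'Close': prices})
    demo = calculate_technical_indicators(demo)
    assert demo['SMA_200'].notna().sum() == 51
    assert demo['RSI_14'].dropna().between(0, 100).all()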


def update_stock_technicals(
    connection: psycopg2.extensions.connection,
    stock: Dict[str, Any]
) -> bool:
    """Update technical indicators for a stock."""
    cursor = None
    try:
        # Get one year of daily history from yfinance
        ticker = yf.Ticker(stock['ticker'])
        df = ticker.history(period='1y')
        if df.empty:
            logging.warning(f"No data retrieved for {stock['ticker']}")
            return False
        # Calculate indicators
        df = calculate_technical_indicators(df)
        df.reset_index(inplace=True)
        cursor = connection.cursor()
        # Upsert the per-period indicators
        for period in [14, 20, 50, 200]:
            indicators = {
                'SMA': f'SMA_{period}',
                'EMA': f'EMA_{period}',
                'RSI': f'RSI_{period}',
                'BB_UPPER': f'BB_UPPER_{period}',
                'BB_LOWER': f'BB_LOWER_{period}'
            }
            for ind_type, column in indicators.items():
                if column in df.columns:
                    records = [
                        (stock['id'], row['Date'], ind_type, period, float(row[column]))
                        for _, row in df.iterrows()
                        if pd.notna(row[column])
                    ]
                    if records:
                        execute_values(
                            cursor,
                            """
                            INSERT INTO technical_indicators
                                (ticker_id, datetime, indicator_type, period, value)
                            VALUES %s
                            ON CONFLICT (ticker_id, datetime, indicator_type, period)
                            DO UPDATE SET value = EXCLUDED.value
                            """,
                            records
                        )
        # Upsert MACD/SIGNAL; their spans are fixed (12/26/9), so they are
        # stored under period 14 only to fit the composite key
        for ind_type in ['MACD', 'SIGNAL']:
            if ind_type in df.columns:
                records = [
                    (stock['id'], row['Date'], ind_type, 14, float(row[ind_type]))
                    for _, row in df.iterrows()
                    if pd.notna(row[ind_type])
                ]
                if records:
                    execute_values(
                        cursor,
                        """
                        INSERT INTO technical_indicators
                            (ticker_id, datetime, indicator_type, period, value)
                        VALUES %s
                        ON CONFLICT (ticker_id, datetime, indicator_type, period)
                        DO UPDATE SET value = EXCLUDED.value
                        """,
                        records
                    )
        # Mark the ticker as freshly checked
        cursor.execute("""
            UPDATE tickers
            SET last_checked_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """, (stock['id'],))
        connection.commit()
        return True
    except Exception as e:
        logging.error(f"Error updating {stock['ticker']}: {str(e)}")
        connection.rollback()
        return False
    finally:
        if cursor and not cursor.closed:
            cursor.close()
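

# Note: the upserts above rely on a unique constraint over
# (ticker_id, datetime, indicator_type, period). The actual DDL is owned by
# verify_technical_tables()/setup_database_indices(); the sketch below is an
# assumption for reference only:
#
#   CREATE TABLE IF NOT EXISTS technical_indicators (
#       ticker_id      INTEGER          NOT NULL REFERENCES tickers(id),
#       datetime       TIMESTAMPTZ      NOT NULL,
#       indicator_type TEXT             NOT NULL,
#       period         INTEGER          NOT NULL,
#       value          DOUBLE PRECISION,
#       UNIQUE (ticker_id, datetime, indicator_type, period)
#   );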


def process_batch(connection, stocks, total_processed):
    """Process a batch of stocks and return the number of successful updates."""
    successful_updates = 0
    batch_total = len(stocks)
    for i, stock in enumerate(stocks, 1):
        try:
            logging.info(f"Processing stock {i}/{batch_total} (Total processed: {total_processed + i}): {stock['ticker']}")
            if update_stock_technicals(connection, stock):
                successful_updates += 1
                # update_stock_technicals commits on success; this is a safety net
                connection.commit()
                logging.info(f"Successfully updated {stock['ticker']}")
            else:
                connection.rollback()
                logging.warning(f"Failed to update {stock['ticker']}")
        except Exception as e:
            logging.error(f"Error processing stock {stock['ticker']}: {str(e)}")
            connection.rollback()
            continue
    return successful_updates


def main():
    # Load environment variables
    load_dotenv()
    connection = None
    try:
        logging.info("Starting technical indicators update process...")
        # Connect to the database with a timeout
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
            connect_timeout=10
        )
        connection.set_session(autocommit=False)
        logging.info("Database connection established successfully")
        # Verify tables and indices
        verify_technical_tables(connection)
        setup_database_indices(connection)
        # Initialize counters
        total_stocks_processed = 0
        total_successful_updates = 0
        start_time = datetime.now()
        # Get all stocks that need updating
        all_stocks = get_stocks_to_update(connection)
        total_stocks = len(all_stocks)
        if not all_stocks:
            logging.info("No stocks found that need updating. Process complete.")
            return
        logging.info(f"Found a total of {total_stocks} stocks that need updating")
        # Process stocks in batches
        batch_size = 100
        for i in range(0, total_stocks, batch_size):
            batch = all_stocks[i:i + batch_size]
            batch_number = (i // batch_size) + 1
            total_batches = (total_stocks + batch_size - 1) // batch_size
            logging.info(f"Processing batch {batch_number}/{total_batches} ({len(batch)} stocks)")
            # Process the batch
            successful_updates = process_batch(connection, batch, total_stocks_processed)
            total_stocks_processed += len(batch)
            total_successful_updates += successful_updates
            # Log progress
            elapsed_time = datetime.now() - start_time
            stocks_per_hour = (total_stocks_processed / elapsed_time.total_seconds()) * 3600 if elapsed_time.total_seconds() > 0 else 0
            logging.info(f"Batch {batch_number} completed. Success rate: {successful_updates}/{len(batch)}")
            logging.info(f"Overall progress: {total_stocks_processed}/{total_stocks} stocks processed")
            logging.info(f"Current processing speed: {stocks_per_hour:.2f} stocks/hour")
            # Small delay between batches to avoid rate limiting
            if i + batch_size < total_stocks:
                time.sleep(1)
        # Log final statistics
        elapsed_time = datetime.now() - start_time
        total_hours = elapsed_time.total_seconds() / 3600
        final_speed = (total_stocks_processed / elapsed_time.total_seconds()) * 3600 if elapsed_time.total_seconds() > 0 else 0
        logging.info("Process completed successfully")
        logging.info("Final statistics:")
        logging.info(f"Total stocks processed: {total_stocks_processed}")
        logging.info(f"Successfully updated: {total_successful_updates}")
        logging.info(f"Failed updates: {total_stocks_processed - total_successful_updates}")
        logging.info(f"Total time: {total_hours:.2f} hours")
        logging.info(f"Average processing speed: {final_speed:.2f} stocks/hour")
    except psycopg2.Error as e:
        logging.error(f"Database error: {str(e)}")
        if connection:
            connection.rollback()
        raise
    except Exception as e:
        logging.error(f"Error in main process: {str(e)}")
        if connection:
            connection.rollback()
        raise
    finally:
        if connection:
            try:
                connection.close()
                logging.info("Database connection closed")
            except Exception as e:
                logging.error(f"Error closing database connection: {str(e)}")


if __name__ == "__main__":
    try:
        main()
    except Exception:
        logging.error("Fatal error in main program", exc_info=True)
        sys.exit(1)