import os
import sys
import time
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv
import yfinance as yf
import pandas as pd
import numpy as np

from utils.Stocks.technicals.utils import setup_database_indices, verify_technical_tables

# Log to both stdout and a file so unattended runs leave an audit trail.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('technical_update.log')
    ]
)

# Look-back windows used for SMA/EMA/RSI/Bollinger on every stock.
INDICATOR_PERIODS = [14, 20, 50, 200]

# Shared upsert statement: one row per (ticker, timestamp, indicator, period).
_UPSERT_INDICATOR_SQL = """
    INSERT INTO technical_indicators
    (ticker_id, datetime, indicator_type, period, value)
    VALUES %s
    ON CONFLICT (ticker_id, datetime, indicator_type, period)
    DO UPDATE SET value = EXCLUDED.value
"""


def get_stocks_to_update(
    connection: psycopg2.extensions.connection,
    max_stocks: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Return tickers whose indicators are stale (not checked in the last day).

    Args:
        connection: Open psycopg2 connection.
        max_stocks: Optional cap on how many tickers to return; ``None``
            returns every stale ticker.

    Returns:
        List of dicts with keys ``id``, ``ticker``, ``name``, ``sector_id``,
        ``zone_id`` and ``last_checked_at``; empty list on query failure.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        threshold_date = datetime.now() - timedelta(days=1)

        # Log overall table health before selecting work, for diagnostics.
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                COUNT(CASE WHEN yf_ticker IS NOT NULL THEN 1 END) as with_yf,
                COUNT(CASE WHEN last_checked_at IS NOT NULL THEN 1 END) as with_last_checked
            FROM tickers
        """)
        stats = cursor.fetchone()
        logging.info(
            f"Database status - Total: {stats[0]}, With YF: {stats[1]}, Last checked: {stats[2]}"
        )

        # Never-checked tickers sort first so fresh listings get priority.
        query = """
            SELECT t.id, t.yf_ticker, t.name, t.sector_id, t.zone_id, t.last_checked_at
            FROM tickers t
            WHERE t.yf_ticker IS NOT NULL
            AND (t.last_checked_at IS NULL OR t.last_checked_at < %s)
            ORDER BY t.last_checked_at NULLS FIRST
        """

        # Append LIMIT only when a cap was requested.
        if max_stocks is not None:
            query += " LIMIT %s"
            cursor.execute(query, (threshold_date, max_stocks))
        else:
            cursor.execute(query, (threshold_date,))

        stocks = [{
            'id': row[0],
            'ticker': row[1],
            'name': row[2],
            'sector_id': row[3],
            'zone_id': row[4],
            'last_checked_at': row[5]
        } for row in cursor.fetchall()]

        logging.info(f"Found {len(stocks)} stocks to update")
        return stocks

    except Exception as e:
        logging.error(f"Error getting stocks to update: {str(e)}")
        return []
    finally:
        if cursor and not cursor.closed:
            cursor.close()


def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add SMA/EMA/RSI/Bollinger columns per period, plus MACD and SIGNAL.

    Args:
        df: OHLCV frame with at least a ``Close`` column (yfinance history).

    Returns:
        The same frame with indicator columns added, or an empty frame if
        any calculation fails. Early rows contain NaN until each rolling
        window has enough history.
    """
    try:
        for period in INDICATOR_PERIODS:
            # Simple and exponential moving averages.
            df[f'SMA_{period}'] = df['Close'].rolling(window=period).mean()
            df[f'EMA_{period}'] = df['Close'].ewm(span=period, adjust=False).mean()

            # RSI from average gain / average loss over the window.
            # NOTE: a window with zero losses yields rs=inf -> RSI 100,
            # which is the conventional saturation value.
            delta = df['Close'].diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
            rs = gain / loss
            df[f'RSI_{period}'] = 100 - (100 / (1 + rs))

            # Bollinger Bands at +/- 2 standard deviations around the SMA.
            sma = df['Close'].rolling(window=period).mean()
            std = df['Close'].rolling(window=period).std()
            df[f'BB_UPPER_{period}'] = sma + (std * 2)
            df[f'BB_LOWER_{period}'] = sma - (std * 2)

        # MACD uses the standard 12/26 EMAs with a 9-period signal line.
        exp1 = df['Close'].ewm(span=12, adjust=False).mean()
        exp2 = df['Close'].ewm(span=26, adjust=False).mean()
        df['MACD'] = exp1 - exp2
        df['SIGNAL'] = df['MACD'].ewm(span=9, adjust=False).mean()

        return df
    except Exception as e:
        logging.error(f"Error calculating indicators: {str(e)}")
        return pd.DataFrame()


def _indicator_records(
    stock_id: int,
    df: pd.DataFrame,
    ind_type: str,
    column: str,
    period: int
) -> List[tuple]:
    """Build upsert rows for one indicator column, skipping NaN warm-up rows.

    Vectorized mask + zip instead of ``iterrows`` — same rows, far faster.
    """
    valid = df[pd.notna(df[column])]
    return [
        (stock_id, date, ind_type, period, float(value))
        for date, value in zip(valid['Date'], valid[column])
    ]


def update_stock_technicals(
    connection: psycopg2.extensions.connection,
    stock: Dict[str, Any]
) -> bool:
    """Fetch 1y of history for one stock, compute indicators, upsert them.

    Also stamps ``tickers.last_checked_at`` and commits on success.

    Args:
        connection: Open psycopg2 connection.
        stock: Dict from :func:`get_stocks_to_update` (needs ``id``/``ticker``).

    Returns:
        True on success; False (after rollback) on any failure.
    """
    try:
        ticker = yf.Ticker(stock['ticker'])
        df = ticker.history(period='1y')

        if df.empty:
            logging.warning(f"No data retrieved for {stock['ticker']}")
            return False

        df = calculate_technical_indicators(df)
        # Promote the DatetimeIndex to a 'Date' column for record building.
        df.reset_index(inplace=True)

        cursor = connection.cursor()
        try:
            # Per-period indicators.
            for period in INDICATOR_PERIODS:
                indicators = {
                    'SMA': f'SMA_{period}',
                    'EMA': f'EMA_{period}',
                    'RSI': f'RSI_{period}',
                    'BB_UPPER': f'BB_UPPER_{period}',
                    'BB_LOWER': f'BB_LOWER_{period}'
                }
                for ind_type, column in indicators.items():
                    if column in df.columns:
                        records = _indicator_records(
                            stock['id'], df, ind_type, column, period
                        )
                        if records:
                            execute_values(cursor, _UPSERT_INDICATOR_SQL, records)

            # MACD/SIGNAL are period-independent; stored with period=14 to
            # satisfy the table's composite key (matches original behavior).
            for ind_type in ['MACD', 'SIGNAL']:
                if ind_type in df.columns:
                    records = _indicator_records(
                        stock['id'], df, ind_type, ind_type, 14
                    )
                    if records:
                        execute_values(cursor, _UPSERT_INDICATOR_SQL, records)

            # Mark the ticker fresh so it is skipped for the next 24h.
            cursor.execute("""
                UPDATE tickers
                SET last_checked_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (stock['id'],))

            connection.commit()
        finally:
            # Always release the cursor, even if an upsert raised.
            cursor.close()

        return True

    except Exception as e:
        logging.error(f"Error updating {stock['ticker']}: {str(e)}")
        if connection:
            connection.rollback()
        return False


def process_batch(connection, stocks, total_processed):
    """Process a batch of stocks and return the number of successful updates.

    Args:
        connection: Open psycopg2 connection.
        stocks: Batch of stock dicts to update.
        total_processed: Count of stocks handled before this batch (for logs).

    Returns:
        Number of stocks successfully updated in this batch.
    """
    successful_updates = 0
    batch_total = len(stocks)

    for i, stock in enumerate(stocks, 1):
        try:
            logging.info(
                f"Processing stock {i}/{batch_total} "
                f"(Total processed: {total_processed + i}): {stock['ticker']}"
            )

            if update_stock_technicals(connection, stock):
                successful_updates += 1
                connection.commit()
                logging.info(f"Successfully updated {stock['ticker']}")
            else:
                connection.rollback()
                logging.warning(f"Failed to update {stock['ticker']}")

        except Exception as e:
            logging.error(f"Error processing stock {stock['ticker']}: {str(e)}")
            connection.rollback()
            continue

    return successful_updates


def main():
    """Entry point: connect to the DB and refresh indicators for stale tickers.

    Processes stocks in batches of 100 with progress/throughput logging and a
    1-second pause between batches to avoid provider rate limiting.
    """
    load_dotenv()

    connection = None
    try:
        logging.info("Starting technical indicators update process...")

        # Fail fast if the database is unreachable.
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
            connect_timeout=10
        )
        connection.set_session(autocommit=False)
        logging.info("Database connection established successfully")

        # Ensure schema and indices exist before writing.
        verify_technical_tables(connection)
        setup_database_indices(connection)

        total_stocks_processed = 0
        total_successful_updates = 0
        start_time = datetime.now()

        all_stocks = get_stocks_to_update(connection)
        total_stocks = len(all_stocks)

        if not all_stocks:
            logging.info("No stocks found that need updating. Process complete.")
            return

        logging.info(f"Found total of {total_stocks} stocks that need updating")

        batch_size = 100
        for i in range(0, total_stocks, batch_size):
            batch = all_stocks[i:i + batch_size]
            batch_number = (i // batch_size) + 1
            total_batches = (total_stocks + batch_size - 1) // batch_size

            logging.info(
                f"Processing batch {batch_number}/{total_batches} ({len(batch)} stocks)"
            )

            successful_updates = process_batch(connection, batch, total_stocks_processed)
            total_stocks_processed += len(batch)
            total_successful_updates += successful_updates

            # Throughput so long runs can be projected from the log.
            elapsed_time = datetime.now() - start_time
            stocks_per_hour = (
                (total_stocks_processed / elapsed_time.total_seconds()) * 3600
                if elapsed_time.total_seconds() > 0 else 0
            )

            logging.info(
                f"Batch {batch_number} completed. Success rate: {successful_updates}/{len(batch)}"
            )
            logging.info(
                f"Overall progress: {total_stocks_processed}/{total_stocks} stocks processed"
            )
            logging.info(f"Current processing speed: {stocks_per_hour:.2f} stocks/hour")

            # Small delay between batches to prevent rate limiting.
            if i + batch_size < total_stocks:
                time.sleep(1)

        # Final statistics.
        elapsed_time = datetime.now() - start_time
        total_hours = elapsed_time.total_seconds() / 3600
        final_speed = (
            (total_stocks_processed / elapsed_time.total_seconds()) * 3600
            if elapsed_time.total_seconds() > 0 else 0
        )

        logging.info("Process completed successfully")
        logging.info(f"Final Statistics:")
        logging.info(f"Total stocks processed: {total_stocks_processed}")
        logging.info(f"Successfully updated: {total_successful_updates}")
        logging.info(f"Failed updates: {total_stocks_processed - total_successful_updates}")
        logging.info(f"Total time: {total_hours:.2f} hours")
        logging.info(f"Average processing speed: {final_speed:.2f} stocks/hour")

    except psycopg2.Error as e:
        logging.error(f"Database error: {str(e)}")
        if connection:
            connection.rollback()
        raise
    except Exception as e:
        logging.error(f"Error in main process: {str(e)}")
        if connection:
            connection.rollback()
        raise
    finally:
        if connection:
            try:
                connection.close()
                logging.info("Database connection closed")
            except Exception as e:
                logging.error(f"Error closing database connection: {str(e)}")


if __name__ == "__main__":
    try:
        # `time` is imported at module top so main() works for any caller,
        # not only when run as a script (original imported it here).
        main()
    except Exception as e:
        logging.error("Fatal error in main program", exc_info=True)
        sys.exit(1)