import logging import yfinance as yf import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Optional, List, Dict, Any import psycopg2 import psycopg2.extensions from psycopg2.extras import execute_values def get_stocks_to_update( connection: psycopg2.extensions.connection, max_stocks: Optional[int] = None ) -> List[Dict[str, Any]]: """Get stocks that need updating.""" cursor = None try: cursor = connection.cursor() threshold_date = datetime.now() - timedelta(days=1) # Check database status cursor.execute(""" SELECT COUNT(*) as total, COUNT(CASE WHEN yf_ticker IS NOT NULL THEN 1 END) as with_yf, COUNT(CASE WHEN last_checked_at IS NOT NULL THEN 1 END) as with_last_checked FROM tickers """) stats = cursor.fetchone() logging.info(f"Database status - Total: {stats[0]}, With YF: {stats[1]}, Last checked: {stats[2]}") query = """ SELECT t.id, t.yf_ticker, t.name, t.sector_id, t.zone_id, t.last_checked_at FROM tickers t WHERE t.yf_ticker IS NOT NULL AND (t.last_checked_at IS NULL OR t.last_checked_at < %s) ORDER BY t.last_checked_at NULLS FIRST LIMIT %s """ cursor.execute(query, (threshold_date, max_stocks or 100)) stocks = [{ 'id': row[0], 'ticker': row[1], 'name': row[2], 'sector_id': row[3], 'zone_id': row[4], 'last_checked_at': row[5] } for row in cursor.fetchall()] logging.info(f"Found {len(stocks)} stocks to update") return stocks except Exception as e: logging.error(f"Error getting stocks to update: {str(e)}") return [] finally: if cursor and not cursor.closed: cursor.close() def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame: """Calculate technical indicators for a stock.""" try: periods = [14, 20, 50, 200] for period in periods: # SMA df[f'SMA_{period}'] = df['Close'].rolling(window=period).mean() # EMA df[f'EMA_{period}'] = df['Close'].ewm(span=period, adjust=False).mean() # RSI delta = df['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss df[f'RSI_{period}'] = 100 - (100 / (1 + rs)) # Bollinger Bands sma = df['Close'].rolling(window=period).mean() std = df['Close'].rolling(window=period).std() df[f'BB_UPPER_{period}'] = sma + (std * 2) df[f'BB_LOWER_{period}'] = sma - (std * 2) # MACD exp1 = df['Close'].ewm(span=12, adjust=False).mean() exp2 = df['Close'].ewm(span=26, adjust=False).mean() df['MACD'] = exp1 - exp2 df['SIGNAL'] = df['MACD'].ewm(span=9, adjust=False).mean() return df except Exception as e: logging.error(f"Error calculating indicators: {str(e)}") return pd.DataFrame() def update_stock_technicals( connection: psycopg2.extensions.connection, stock: Dict[str, Any] ) -> bool: """Update technical indicators for a stock.""" try: # Get stock data ticker = yf.Ticker(stock['ticker']) df = ticker.history(period='1y') if df.empty: logging.warning(f"No data retrieved for {stock['ticker']}") return False # Calculate indicators df = calculate_technical_indicators(df) df.reset_index(inplace=True) cursor = connection.cursor() # Insert indicators for period in [14, 20, 50, 200]: indicators = { 'SMA': f'SMA_{period}', 'EMA': f'EMA_{period}', 'RSI': f'RSI_{period}', 'BB_UPPER': f'BB_UPPER_{period}', 'BB_LOWER': f'BB_LOWER_{period}' } for ind_type, column in indicators.items(): if column in df.columns: records = [ (stock['id'], row['Date'], ind_type, period, float(row[column])) for _, row in df.iterrows() if pd.notna(row[column]) ] if records: execute_values( cursor, """ INSERT INTO technical_indicators (ticker_id, datetime, indicator_type, period, value) VALUES %s ON CONFLICT (ticker_id, datetime, indicator_type, period) DO UPDATE SET value = EXCLUDED.value """, records ) # Update MACD for ind_type in ['MACD', 'SIGNAL']: if ind_type in df.columns: records = [ (stock['id'], row['Date'], ind_type, 14, float(row[ind_type])) for _, row in df.iterrows() if pd.notna(row[ind_type]) ] if records: execute_values( cursor, """ INSERT INTO technical_indicators (ticker_id, datetime, indicator_type, period, value) VALUES %s ON CONFLICT (ticker_id, datetime, indicator_type, period) DO UPDATE SET value = EXCLUDED.value """, records ) # Update last_checked_at cursor.execute(""" UPDATE tickers SET last_checked_at = CURRENT_TIMESTAMP WHERE id = %s """, (stock['id'],)) connection.commit() cursor.close() return True except Exception as e: logging.error(f"Error updating {stock['ticker']}: {str(e)}") if connection: connection.rollback() return False