RivaCube/utils/Stocks/technicals/collector.py
2025-02-04 19:31:18 +01:00

193 lines
6.5 KiB
Python

import logging
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
import psycopg2
import psycopg2.extensions
from psycopg2.extras import execute_values
def _rollback_if_aborted(connection: psycopg2.extensions.connection) -> None:
    """Roll back *connection* only if its transaction is in the error state.

    After a failed statement PostgreSQL rejects every further command in the
    same transaction until it is rolled back. A transaction that is not in
    error is left alone so any uncommitted work of the caller survives.
    """
    try:
        if (connection.get_transaction_status()
                == psycopg2.extensions.TRANSACTION_STATUS_INERROR):
            connection.rollback()
    except psycopg2.Error as rollback_error:
        logging.error(f"Rollback after failed ticker query also failed: {rollback_error}")


def get_stocks_to_update(
    connection: psycopg2.extensions.connection,
    max_stocks: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Get stocks that need updating.

    A ticker is due when it has a ``yf_ticker`` and its ``last_checked_at``
    is NULL or older than one day before local ``datetime.now()``.
    Never-checked tickers come first, then the least recently checked.

    NOTE(review): the threshold is a naive local datetime; confirm it matches
    the timezone/type of ``tickers.last_checked_at``.

    Args:
        connection: Open psycopg2 connection.
        max_stocks: Maximum number of rows to return. ``None`` (or any falsy
            value, including 0) falls back to 100.

    Returns:
        A list of dicts with keys ``id``, ``ticker``, ``name``, ``sector_id``,
        ``zone_id`` and ``last_checked_at``. Any error is logged and an empty
        list is returned instead; an aborted transaction is rolled back so
        the connection stays usable.
    """
    try:
        with connection.cursor() as cursor:
            threshold_date = datetime.now() - timedelta(days=1)

            # Summary counts, logged for diagnostics only.
            cursor.execute("""
                SELECT COUNT(*) as total,
                       COUNT(CASE WHEN yf_ticker IS NOT NULL THEN 1 END) as with_yf,
                       COUNT(CASE WHEN last_checked_at IS NOT NULL THEN 1 END) as with_last_checked
                FROM tickers
            """)
            stats = cursor.fetchone()
            logging.info(f"Database status - Total: {stats[0]}, With YF: {stats[1]}, Last checked: {stats[2]}")

            query = """
                SELECT
                    t.id,
                    t.yf_ticker,
                    t.name,
                    t.sector_id,
                    t.zone_id,
                    t.last_checked_at
                FROM tickers t
                WHERE t.yf_ticker IS NOT NULL
                AND (t.last_checked_at IS NULL OR t.last_checked_at < %s)
                ORDER BY t.last_checked_at NULLS FIRST
                LIMIT %s
            """
            cursor.execute(query, (threshold_date, max_stocks or 100))
            stocks = [{
                'id': row[0],
                'ticker': row[1],
                'name': row[2],
                'sector_id': row[3],
                'zone_id': row[4],
                'last_checked_at': row[5]
            } for row in cursor.fetchall()]

        logging.info(f"Found {len(stocks)} stocks to update")
        return stocks
    except Exception as e:
        logging.error(f"Error getting stocks to update: {str(e)}")
        # A failed SELECT leaves the transaction aborted; clear it so later
        # work on this connection does not fail with "transaction is aborted".
        _rollback_if_aborted(connection)
        return []
def calculate_technical_indicators(
    df: pd.DataFrame,
    periods: Optional[List[int]] = None,
) -> pd.DataFrame:
    """Calculate technical indicators for a stock.

    Indicator columns are assigned onto ``df`` in place, and ``df`` is also
    returned. For each window length ``p`` in ``periods`` the columns
    ``SMA_p``, ``EMA_p``, ``RSI_p``, ``BB_UPPER_p`` and ``BB_LOWER_p`` are
    added, in that order. ``MACD`` (EMA12 - EMA26) and ``SIGNAL`` (EMA9 of
    MACD) are added once at the end.

    Args:
        df: Price frame containing a ``Close`` column.
        periods: Window lengths to compute; defaults to ``[14, 20, 50, 200]``.

    Returns:
        ``df`` with the indicator columns added. If the calculation raises
        (for example when ``Close`` is missing), the error is logged and an
        empty DataFrame is returned instead.
    """
    if periods is None:
        periods = [14, 20, 50, 200]
    try:
        close = df['Close']
        # Price changes and their gain/loss split do not depend on the
        # period, so compute them once instead of once per window.
        delta = close.diff()
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)

        for period in periods:
            sma = close.rolling(window=period).mean()
            df[f'SMA_{period}'] = sma
            df[f'EMA_{period}'] = close.ewm(span=period, adjust=False).mean()

            # RSI from simple rolling means of gains and losses (not Wilder's
            # smoothing). A zero average loss gives rs = inf, i.e. RSI = 100.
            rs = gains.rolling(window=period).mean() / losses.rolling(window=period).mean()
            df[f'RSI_{period}'] = 100 - (100 / (1 + rs))

            # Bollinger Bands: the same SMA +/- 2 rolling standard deviations.
            std = close.rolling(window=period).std()
            df[f'BB_UPPER_{period}'] = sma + (std * 2)
            df[f'BB_LOWER_{period}'] = sma - (std * 2)

        # MACD uses fixed 12/26/9 spans, independent of ``periods``.
        exp1 = close.ewm(span=12, adjust=False).mean()
        exp2 = close.ewm(span=26, adjust=False).mean()
        df['MACD'] = exp1 - exp2
        df['SIGNAL'] = df['MACD'].ewm(span=9, adjust=False).mean()
        return df
    except Exception as e:
        logging.error(f"Error calculating indicators: {str(e)}")
        return pd.DataFrame()
def _collect_indicator_records(
    ticker_id: Any,
    df: pd.DataFrame
) -> List[tuple]:
    """Flatten the indicator columns of *df* into technical_indicators rows.

    Each row is ``(ticker_id, timestamp, indicator_type, period, value)``,
    with ``timestamp`` taken from ``df``'s index. Missing columns and NaN
    values (e.g. rolling-window warm-up rows) are skipped.
    """
    # (indicator_type, period, source column) for every stored series.
    sources = [
        (ind_type, period, f'{ind_type}_{period}')
        for period in (14, 20, 50, 200)
        for ind_type in ('SMA', 'EMA', 'RSI', 'BB_UPPER', 'BB_LOWER')
    ]
    # MACD and its signal line have no window of their own; they are stored
    # under period 14.
    sources += [('MACD', 14, 'MACD'), ('SIGNAL', 14, 'SIGNAL')]

    records = []
    for ind_type, period, column in sources:
        if column not in df.columns:
            continue
        for timestamp, value in zip(df.index, df[column]):
            if pd.notna(value):
                records.append((ticker_id, timestamp, ind_type, period, float(value)))
    return records


def update_stock_technicals(
    connection: psycopg2.extensions.connection,
    stock: Dict[str, Any]
) -> bool:
    """Update technical indicators for a stock.

    Fetches one year of price history (``period='1y'``) via yfinance and
    computes indicators with ``calculate_technical_indicators``. The results
    are upserted into ``technical_indicators`` (conflict key: ticker_id,
    datetime, indicator_type, period). ``tickers.last_checked_at`` is set to
    CURRENT_TIMESTAMP, all in one committed transaction.

    Args:
        connection: Open psycopg2 connection. It is committed on success and
            rolled back on failure.
        stock: Dict with at least ``id`` and ``ticker`` keys.

    Returns:
        True if the indicators were written and committed. False if no price
        data was returned, the indicator calculation failed, or any other
        error occurred; errors are logged, not raised.
    """
    try:
        df = yf.Ticker(stock['ticker']).history(period='1y')
        if df.empty:
            logging.warning(f"No data retrieved for {stock['ticker']}")
            return False

        df = calculate_technical_indicators(df)
        if df.empty:
            # The calculator returns an empty frame on failure (and logs the
            # cause); do not stamp last_checked_at as if the update succeeded.
            logging.warning(f"No indicators calculated for {stock['ticker']}")
            return False

        records = _collect_indicator_records(stock['id'], df)

        # The context manager closes the cursor even if a statement fails.
        with connection.cursor() as cursor:
            if records:
                # One batched upsert for all series; execute_values splits it
                # into page_size-sized statements.
                execute_values(
                    cursor,
                    """
                    INSERT INTO technical_indicators
                    (ticker_id, datetime, indicator_type, period, value)
                    VALUES %s
                    ON CONFLICT (ticker_id, datetime, indicator_type, period)
                    DO UPDATE SET value = EXCLUDED.value
                    """,
                    records
                )
            cursor.execute("""
                UPDATE tickers
                SET last_checked_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (stock['id'],))

        connection.commit()
        return True
    except Exception as e:
        # .get(): a missing 'ticker' key must not raise again from the handler.
        logging.error(f"Error updating {stock.get('ticker')}: {str(e)}")
        if connection:
            try:
                connection.rollback()
            except Exception as rollback_error:
                logging.error(f"Rollback failed for {stock.get('ticker')}: {rollback_error}")
        return False