193 lines
6.5 KiB
Python
193 lines
6.5 KiB
Python
import logging
|
|
import yfinance as yf
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List, Dict, Any
|
|
import psycopg2
|
|
import psycopg2.extensions
|
|
from psycopg2.extras import execute_values
|
|
|
|
def get_stocks_to_update(
    connection: psycopg2.extensions.connection,
    max_stocks: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Return the tickers whose data is stale and should be refreshed.

    A ticker qualifies when its ``yf_ticker`` is not NULL and its
    ``last_checked_at`` is either NULL or older than one day before the
    current local time. Rows are ordered by ``last_checked_at`` with NULLs
    (never checked) first.

    Before running the selection, a summary of the ``tickers`` table (total
    rows, rows with a ``yf_ticker``, rows with a ``last_checked_at``) is
    written to the log.

    Args:
        connection: Open psycopg2 connection to the database that holds the
            ``tickers`` table.
        max_stocks: Maximum number of rows to return. ``None`` or any other
            falsy value (including ``0``) selects the default of 100.

    Returns:
        One dict per ticker with the keys ``id``, ``ticker``, ``name``,
        ``sector_id``, ``zone_id`` and ``last_checked_at``. An empty list is
        returned both when nothing is stale and when the lookup fails;
        failures are logged and never raised.
    """
    cursor = None
    try:
        cursor = connection.cursor()

        # NOTE(review): datetime.now() is naive local time; confirm that this
        # is comparable with how last_checked_at is stored in the database.
        threshold_date = datetime.now() - timedelta(days=1)

        # Log a quick overview of the table so empty results are explainable.
        cursor.execute("""
            SELECT COUNT(*) as total,
                   COUNT(CASE WHEN yf_ticker IS NOT NULL THEN 1 END) as with_yf,
                   COUNT(CASE WHEN last_checked_at IS NOT NULL THEN 1 END) as with_last_checked
            FROM tickers
        """)
        stats = cursor.fetchone()
        logging.info(f"Database status - Total: {stats[0]}, With YF: {stats[1]}, Last checked: {stats[2]}")

        query = """
            SELECT
                t.id,
                t.yf_ticker,
                t.name,
                t.sector_id,
                t.zone_id,
                t.last_checked_at
            FROM tickers t
            WHERE t.yf_ticker IS NOT NULL
              AND (t.last_checked_at IS NULL OR t.last_checked_at < %s)
            ORDER BY t.last_checked_at NULLS FIRST
            LIMIT %s
        """
        cursor.execute(query, (threshold_date, max_stocks or 100))

        stocks = [{
            'id': row[0],
            'ticker': row[1],
            'name': row[2],
            'sector_id': row[3],
            'zone_id': row[4],
            'last_checked_at': row[5]
        } for row in cursor.fetchall()]

        logging.info(f"Found {len(stocks)} stocks to update")
        return stocks

    except Exception as e:
        logging.error(f"Error getting stocks to update: {str(e)}")
        # A failed statement leaves the transaction aborted; because the
        # error is swallowed here, the caller would otherwise keep using a
        # connection on which every later statement fails. Roll back so the
        # connection stays usable.
        if connection is not None:
            try:
                connection.rollback()
            except Exception as rollback_error:
                logging.error(f"Rollback failed after stock lookup error: {rollback_error}")
        return []

    finally:
        if cursor is not None and not cursor.closed:
            cursor.close()
|
|
|
|
def calculate_technical_indicators(
    df: pd.DataFrame,
    periods: Optional[List[int]] = None
) -> pd.DataFrame:
    """Add technical-indicator columns derived from the ``Close`` column.

    For every period ``p`` the following columns are written to ``df``:

    * ``SMA_p``: rolling mean of ``Close`` over ``p`` rows.
    * ``EMA_p``: exponentially weighted mean (``span=p``, ``adjust=False``).
    * ``RSI_p``: ``100 - 100 / (1 + rs)`` where ``rs`` is the rolling mean
      of positive changes divided by the rolling mean of negative changes
      (as positive numbers), both over ``p`` rows.
    * ``BB_UPPER_p`` / ``BB_LOWER_p``: rolling mean plus / minus two rolling
      standard deviations.

    Independently of the periods, ``MACD`` (EMA span 12 minus EMA span 26)
    and ``SIGNAL`` (EMA span 9 of ``MACD``) are added.

    Args:
        df: Frame with a ``Close`` column. It is modified in place.
        periods: Window lengths to compute. Defaults to
            ``[14, 20, 50, 200]`` when ``None``.

    Returns:
        The same ``df`` object with the indicator columns added. If anything
        goes wrong (for example ``Close`` is missing), the error is logged
        and a new empty ``DataFrame`` is returned instead; columns already
        written to ``df`` before the failure are left in place.
    """
    if periods is None:
        periods = [14, 20, 50, 200]

    try:
        close = df['Close']

        # The price changes and their split into gains and losses do not
        # depend on the period, so compute them once. `where(..., 0)` also
        # turns the leading NaN produced by diff() into 0.
        delta = close.diff()
        gains = delta.where(delta > 0, 0)
        losses = -delta.where(delta < 0, 0)

        for period in periods:
            window = close.rolling(window=period)
            sma = window.mean()
            std = window.std()

            # SMA
            df[f'SMA_{period}'] = sma

            # EMA
            df[f'EMA_{period}'] = close.ewm(span=period, adjust=False).mean()

            # RSI
            avg_gain = gains.rolling(window=period).mean()
            avg_loss = losses.rolling(window=period).mean()
            rs = avg_gain / avg_loss
            df[f'RSI_{period}'] = 100 - (100 / (1 + rs))

            # Bollinger Bands (reuse the SMA instead of recomputing it)
            df[f'BB_UPPER_{period}'] = sma + (std * 2)
            df[f'BB_LOWER_{period}'] = sma - (std * 2)

        # MACD
        exp1 = close.ewm(span=12, adjust=False).mean()
        exp2 = close.ewm(span=26, adjust=False).mean()
        df['MACD'] = exp1 - exp2
        df['SIGNAL'] = df['MACD'].ewm(span=9, adjust=False).mean()

        return df

    except Exception as e:
        logging.error(f"Error calculating indicators: {str(e)}")
        return pd.DataFrame()
|
|
|
|
# Upsert used for every indicator series; the conflict target is the
# (ticker_id, datetime, indicator_type, period) tuple named in the statement.
_UPSERT_INDICATOR_SQL = """
    INSERT INTO technical_indicators
        (ticker_id, datetime, indicator_type, period, value)
    VALUES %s
    ON CONFLICT (ticker_id, datetime, indicator_type, period)
    DO UPDATE SET value = EXCLUDED.value
"""


def _indicator_records(df, ticker_id, column, indicator_type, period):
    """Build one (ticker_id, date, type, period, value) tuple per non-NaN value of ``column``."""
    valid = df.loc[df[column].notna(), ['Date', column]]
    return [
        (ticker_id, date, indicator_type, period, float(value))
        for date, value in zip(valid['Date'], valid[column])
    ]


def _upsert_indicator(cursor, df, ticker_id, column, indicator_type, period):
    """Upsert the values of ``column`` (if present and non-empty) into technical_indicators."""
    if column not in df.columns:
        return
    records = _indicator_records(df, ticker_id, column, indicator_type, period)
    if records:
        execute_values(cursor, _UPSERT_INDICATOR_SQL, records)


def update_stock_technicals(
    connection: psycopg2.extensions.connection,
    stock: Dict[str, Any]
) -> bool:
    """Download one year of history for a stock and store its indicators.

    The price history is fetched through yfinance, the indicators are
    computed with ``calculate_technical_indicators`` and every non-NaN value
    is upserted into ``technical_indicators``. Finally the ticker's
    ``last_checked_at`` is set to ``CURRENT_TIMESTAMP`` and the transaction
    is committed.

    Args:
        connection: Open psycopg2 connection; it is committed on success and
            rolled back on failure.
        stock: Mapping with at least the keys ``ticker`` (yfinance symbol)
            and ``id`` (primary key in ``tickers``).

    Returns:
        ``True`` when the indicators were written and committed. ``False``
        when no price data was returned, when the indicator calculation
        produced an empty frame, or when any error occurred (errors are
        logged, not raised).
    """
    try:
        # Get stock data
        ticker = yf.Ticker(stock['ticker'])
        df = ticker.history(period='1y')

        if df.empty:
            logging.warning(f"No data retrieved for {stock['ticker']}")
            return False

        # Calculate indicators
        df = calculate_technical_indicators(df)
        if df.empty:
            # calculate_technical_indicators reports failure by returning an
            # empty frame. Do not stamp last_checked_at in that case, so the
            # stock is picked up again on the next run.
            logging.warning(f"No indicators calculated for {stock['ticker']}")
            return False

        # Turn the date index into a regular column for the inserts below.
        df = df.reset_index()

        # The context manager closes the cursor on the error path as well.
        with connection.cursor() as cursor:
            # Insert the per-period indicators
            for period in [14, 20, 50, 200]:
                for ind_type in ('SMA', 'EMA', 'RSI', 'BB_UPPER', 'BB_LOWER'):
                    _upsert_indicator(
                        cursor, df, stock['id'],
                        f'{ind_type}_{period}', ind_type, period
                    )

            # Update MACD
            # NOTE(review): MACD/SIGNAL are computed with fixed 12/26/9
            # spans, so the 14 stored here looks like a placeholder for the
            # period key column; confirm before changing it.
            for ind_type in ['MACD', 'SIGNAL']:
                _upsert_indicator(cursor, df, stock['id'], ind_type, ind_type, 14)

            # Update last_checked_at
            cursor.execute("""
                UPDATE tickers
                SET last_checked_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """, (stock['id'],))

        connection.commit()
        return True

    except Exception as e:
        # .get() keeps the handler itself from raising when the 'ticker' key
        # is what was missing in the first place.
        logging.error(f"Error updating {stock.get('ticker', '<unknown>')}: {str(e)}")
        if connection:
            connection.rollback()
        return False
|