import os
import sys
import time
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv
import yfinance as yf
import pandas as pd

from utils.Stocks.technicals.utils import setup_database_indices, verify_technical_tables

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('technical_update.log')
    ]
)
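
# The SQL below assumes a schema roughly like the following (not defined in
# this file; column names are inferred from the queries, types illustrative):
#
#   CREATE TABLE tickers (
#       id              SERIAL PRIMARY KEY,
#       yf_ticker       TEXT,
#       name            TEXT,
#       sector_id       INTEGER,
#       zone_id         INTEGER,
#       last_checked_at TIMESTAMPTZ
#   );
#
#   CREATE TABLE technical_indicators (
#       ticker_id      INTEGER REFERENCES tickers(id),
#       datetime       TIMESTAMPTZ,
#       indicator_type TEXT,
#       period         INTEGER,
#       value          DOUBLE PRECISION,
#       UNIQUE (ticker_id, datetime, indicator_type, period)
#   );
#
# The UNIQUE constraint is what the ON CONFLICT upserts below rely on.
# Connection settings come from a .env file, e.g. (values illustrative):
#   DB_NAME=stocks
#   DB_USER=postgres
#   DB_PASSWORD=secret
#   DB_HOST=localhost
#   DB_PORT=5432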


def get_stocks_to_update(
    connection: psycopg2.extensions.connection,
    max_stocks: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Return tickers whose indicators are stale (last checked over a day ago) or never checked."""
    cursor = None
    try:
        cursor = connection.cursor()
        threshold_date = datetime.now() - timedelta(days=1)

        # Log overall database status
        cursor.execute("""
            SELECT COUNT(*) as total,
                   COUNT(CASE WHEN yf_ticker IS NOT NULL THEN 1 END) as with_yf,
                   COUNT(CASE WHEN last_checked_at IS NOT NULL THEN 1 END) as with_last_checked
            FROM tickers
        """)
        stats = cursor.fetchone()
        logging.info(f"Database status - Total: {stats[0]}, With YF: {stats[1]}, Last checked: {stats[2]}")

        # Base query without LIMIT; never-checked tickers come first
        query = """
            SELECT
                t.id,
                t.yf_ticker,
                t.name,
                t.sector_id,
                t.zone_id,
                t.last_checked_at
            FROM tickers t
            WHERE t.yf_ticker IS NOT NULL
            AND (t.last_checked_at IS NULL OR t.last_checked_at < %s)
            ORDER BY t.last_checked_at NULLS FIRST
        """

        # Add LIMIT only if max_stocks is specified
        if max_stocks is not None:
            query += " LIMIT %s"
            cursor.execute(query, (threshold_date, max_stocks))
        else:
            cursor.execute(query, (threshold_date,))

        stocks = [{
            'id': row[0],
            'ticker': row[1],
            'name': row[2],
            'sector_id': row[3],
            'zone_id': row[4],
            'last_checked_at': row[5]
        } for row in cursor.fetchall()]

        logging.info(f"Found {len(stocks)} stocks to update")
        return stocks

    except Exception as e:
        logging.error(f"Error getting stocks to update: {str(e)}")
        return []
    finally:
        if cursor and not cursor.closed:
            cursor.close()
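
# Example (ticker data illustrative):
#   stocks = get_stocks_to_update(connection, max_stocks=50)
#   # -> [{'id': 1, 'ticker': 'AAPL', 'name': 'Apple Inc.',
#   #      'sector_id': 3, 'zone_id': 1, 'last_checked_at': None}, ...]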


def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate SMA, EMA, RSI, and Bollinger Bands for several periods, plus MACD."""
    try:
        periods = [14, 20, 50, 200]

        for period in periods:
            # Simple moving average
            df[f'SMA_{period}'] = df['Close'].rolling(window=period).mean()

            # Exponential moving average
            df[f'EMA_{period}'] = df['Close'].ewm(span=period, adjust=False).mean()

            # RSI: 100 - 100 / (1 + RS), where RS = average gain / average loss
            # (simple rolling averages, not Wilder's smoothing)
            delta = df['Close'].diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
            rs = gain / loss
            df[f'RSI_{period}'] = 100 - (100 / (1 + rs))

            # Bollinger Bands: SMA +/- 2 standard deviations
            sma = df['Close'].rolling(window=period).mean()
            std = df['Close'].rolling(window=period).std()
            df[f'BB_UPPER_{period}'] = sma + (std * 2)
            df[f'BB_LOWER_{period}'] = sma - (std * 2)

        # MACD: EMA(12) - EMA(26), with a 9-period EMA signal line
        exp1 = df['Close'].ewm(span=12, adjust=False).mean()
        exp2 = df['Close'].ewm(span=26, adjust=False).mean()
        df['MACD'] = exp1 - exp2
        df['SIGNAL'] = df['MACD'].ewm(span=9, adjust=False).mean()

        return df

    except Exception as e:
        logging.error(f"Error calculating indicators: {str(e)}")
        return pd.DataFrame()
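
# Example (illustrative): daily bars come back as an OHLCV frame indexed by a
# tz-aware "Date" index, which calculate_technical_indicators extends in place:
#   frame = yf.Ticker("AAPL").history(period="1y")
#   frame = calculate_technical_indicators(frame)
#   frame[["Close", "SMA_20", "RSI_14", "MACD", "SIGNAL"]].tail()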


def update_stock_technicals(
    connection: psycopg2.extensions.connection,
    stock: Dict[str, Any]
) -> bool:
    """Fetch a year of history for one stock and upsert its technical indicators."""
    cursor = None
    try:
        # Get stock data
        ticker = yf.Ticker(stock['ticker'])
        df = ticker.history(period='1y')

        if df.empty:
            logging.warning(f"No data retrieved for {stock['ticker']}")
            return False

        # Calculate indicators; an empty frame signals a calculation failure
        df = calculate_technical_indicators(df)
        if df.empty:
            return False
        df.reset_index(inplace=True)

        cursor = connection.cursor()

        # One upsert statement shared by all indicator types; relies on the
        # UNIQUE (ticker_id, datetime, indicator_type, period) constraint
        upsert_sql = """
            INSERT INTO technical_indicators
            (ticker_id, datetime, indicator_type, period, value)
            VALUES %s
            ON CONFLICT (ticker_id, datetime, indicator_type, period)
            DO UPDATE SET value = EXCLUDED.value
        """

        # Insert per-period indicators
        for period in [14, 20, 50, 200]:
            indicators = {
                'SMA': f'SMA_{period}',
                'EMA': f'EMA_{period}',
                'RSI': f'RSI_{period}',
                'BB_UPPER': f'BB_UPPER_{period}',
                'BB_LOWER': f'BB_LOWER_{period}'
            }

            for ind_type, column in indicators.items():
                if column in df.columns:
                    records = [
                        (stock['id'], row['Date'], ind_type, period, float(row[column]))
                        for _, row in df.iterrows()
                        if pd.notna(row[column])
                    ]

                    if records:
                        execute_values(cursor, upsert_sql, records)

        # Insert MACD and its signal line (stored under period 14 only to
        # satisfy the composite key; MACD has no single lookback period)
        for ind_type in ['MACD', 'SIGNAL']:
            if ind_type in df.columns:
                records = [
                    (stock['id'], row['Date'], ind_type, 14, float(row[ind_type]))
                    for _, row in df.iterrows()
                    if pd.notna(row[ind_type])
                ]

                if records:
                    execute_values(cursor, upsert_sql, records)

        # Mark the ticker as freshly checked
        cursor.execute("""
            UPDATE tickers
            SET last_checked_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """, (stock['id'],))

        connection.commit()
        return True

    except Exception as e:
        logging.error(f"Error updating {stock['ticker']}: {str(e)}")
        if connection:
            connection.rollback()
        return False
    finally:
        if cursor and not cursor.closed:
            cursor.close()
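
# Note: psycopg2's execute_values batches rows into multi-row VALUES
# statements (default page_size=100). A year of daily bars is roughly 250
# rows per indicator, so each call issues only a few statements; a larger
# batch, e.g. execute_values(cursor, upsert_sql, records, page_size=1000),
# would cut round trips further.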


def process_batch(connection, stocks, total_processed):
    """Process a batch of stocks and return the number of successful updates."""
    successful_updates = 0
    batch_total = len(stocks)

    for i, stock in enumerate(stocks, 1):
        try:
            logging.info(f"Processing stock {i}/{batch_total} (Total processed: {total_processed + i}): {stock['ticker']}")
            if update_stock_technicals(connection, stock):
                successful_updates += 1
                connection.commit()
                logging.info(f"Successfully updated {stock['ticker']}")
            else:
                connection.rollback()
                logging.warning(f"Failed to update {stock['ticker']}")
        except Exception as e:
            logging.error(f"Error processing stock {stock['ticker']}: {str(e)}")
            connection.rollback()
            continue

    return successful_updates


def main():
    # Load environment variables
    load_dotenv()

    # Database connection
    connection = None
    try:
        logging.info("Starting technical indicators update process...")

        # Connect to database with timeout
        connection = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
            connect_timeout=10
        )
        connection.set_session(autocommit=False)
        logging.info("Database connection established successfully")

        # Verify tables and indices
        verify_technical_tables(connection)
        setup_database_indices(connection)

        # Initialize counters
        total_stocks_processed = 0
        total_successful_updates = 0
        start_time = datetime.now()

        # Get all stocks that need updating
        all_stocks = get_stocks_to_update(connection)
        total_stocks = len(all_stocks)

        if not all_stocks:
            logging.info("No stocks found that need updating. Process complete.")
            return

        logging.info(f"Found a total of {total_stocks} stocks that need updating")

        # Process stocks in batches
        batch_size = 100
        for i in range(0, total_stocks, batch_size):
            batch = all_stocks[i:i + batch_size]
            batch_number = (i // batch_size) + 1
            total_batches = (total_stocks + batch_size - 1) // batch_size

            logging.info(f"Processing batch {batch_number}/{total_batches} ({len(batch)} stocks)")

            # Process the batch
            successful_updates = process_batch(connection, batch, total_stocks_processed)
            total_stocks_processed += len(batch)
            total_successful_updates += successful_updates

            # Log progress
            elapsed_time = datetime.now() - start_time
            stocks_per_hour = (total_stocks_processed / elapsed_time.total_seconds()) * 3600 if elapsed_time.total_seconds() > 0 else 0

            logging.info(f"Batch {batch_number} completed. Success rate: {successful_updates}/{len(batch)}")
            logging.info(f"Overall progress: {total_stocks_processed}/{total_stocks} stocks processed")
            logging.info(f"Current processing speed: {stocks_per_hour:.2f} stocks/hour")

            # Small delay between batches to reduce the risk of rate limiting
            if i + batch_size < total_stocks:
                time.sleep(1)

        # Log final statistics
        elapsed_time = datetime.now() - start_time
        total_hours = elapsed_time.total_seconds() / 3600
        final_speed = (total_stocks_processed / elapsed_time.total_seconds()) * 3600 if elapsed_time.total_seconds() > 0 else 0

        logging.info("Process completed successfully")
        logging.info("Final Statistics:")
        logging.info(f"Total stocks processed: {total_stocks_processed}")
        logging.info(f"Successfully updated: {total_successful_updates}")
        logging.info(f"Failed updates: {total_stocks_processed - total_successful_updates}")
        logging.info(f"Total time: {total_hours:.2f} hours")
        logging.info(f"Average processing speed: {final_speed:.2f} stocks/hour")

    except psycopg2.Error as e:
        logging.error(f"Database error: {str(e)}")
        if connection:
            connection.rollback()
        raise
    except Exception as e:
        logging.error(f"Error in main process: {str(e)}")
        if connection:
            connection.rollback()
        raise
    finally:
        if connection:
            try:
                connection.close()
                logging.info("Database connection closed")
            except Exception as e:
                logging.error(f"Error closing database connection: {str(e)}")
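
# Example invocation (script name and schedule illustrative):
#   python update_technicals.py
# or daily via cron:
#   30 18 * * 1-5  cd /path/to/project && python update_technicals.py >> cron.log 2>&1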


if __name__ == "__main__":
    try:
        main()
    except Exception:
        logging.error("Fatal error in main program", exc_info=True)
        sys.exit(1)