RivaCube/utils/Stocks/consensus/utils.py
2025-02-04 19:31:18 +01:00

240 lines
8.4 KiB
Python

import logging
import math
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple

import psycopg2
from psycopg2.extras import execute_values
def parse_date(date_str: Optional[str]) -> Optional[date]:
    """Parse a date/timestamp string into a calendar ``date``.

    The string is first matched against these formats, in order:
    ``YYYY-MM-DD``, ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DDTHH:MM:SS.ffffffZ``
    and ``YYYY-MM-DDTHH:MM:SSZ``. If none match, ``datetime.fromisoformat`` is
    tried as a last resort so offset-bearing timestamps such as
    ``2024-01-15T10:30:00+00:00`` are accepted too. Any time component is
    discarded without timezone conversion (the date as written is returned).

    Args:
        date_str: Raw date string; leading/trailing whitespace is ignored.

    Returns:
        The parsed date, or ``None`` when the input is empty, not a string,
        or matches no supported format. A warning is logged in each case.
    """
    if not date_str:
        logging.warning("Empty date string received")
        return None
    if not isinstance(date_str, str):
        # strptime would raise TypeError on non-strings; report it as
        # unparseable instead of crashing the caller.
        logging.warning(f"Could not parse date: {date_str}")
        return None

    text = date_str.strip()
    date_formats = (
        '%Y-%m-%d',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M:%S.%fZ',
        '%Y-%m-%dT%H:%M:%SZ',
    )
    for date_format in date_formats:
        try:
            return datetime.strptime(text, date_format).date()
        except ValueError:
            continue

    # Fallback for other ISO 8601 variants (e.g. explicit UTC offsets).
    try:
        return datetime.fromisoformat(text).date()
    except ValueError:
        pass

    logging.warning(f"Could not parse date: {date_str}")
    return None
def validate_price(price_value: Any) -> Optional[float]:
    """Convert a raw price value to a finite, non-negative float.

    Args:
        price_value: Anything ``float()`` accepts (a number or numeric
            string), or ``None``.

    Returns:
        The price as a float, or ``None`` when the value is missing,
        cannot be converted, is NaN/infinite, or is negative. A warning is
        logged for every rejected non-``None`` value.
    """
    if price_value is None:
        return None
    try:
        price = float(price_value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: ints too large for a double, e.g. 10 ** 400.
        logging.warning(f"Invalid price value: {price_value}")
        return None
    # float() accepts "nan"/"inf"; NaN also slips past the < 0 check, so
    # reject non-finite values explicitly before they reach the database.
    if not math.isfinite(price):
        logging.warning(f"Non-finite price value: {price}")
        return None
    if price < 0:
        logging.warning(f"Negative price value: {price}")
        return None
    return price
def is_valid_rating(rating: dict) -> bool:
    """Decide whether a raw rating record is complete enough to persist.

    Checks, in order:
      1. ``symbol``, ``publishedDate``, ``gradingCompany`` and ``newGrade``
         are all present and hold ``str`` values;
      2. ``publishedDate`` is accepted by ``parse_date``;
      3. ``priceWhenPosted``, if present and not ``None``, is accepted by
         ``validate_price``.

    Returns False on the first failed check; missing/invalid fields and bad
    dates are logged at debug level. Any exception raised while checking is
    logged at error level and also yields False.
    """
    required = ('symbol', 'publishedDate', 'gradingCompany', 'newGrade')
    try:
        # First required field that is absent or not a string, if any.
        bad_field = next(
            (
                name
                for name in required
                if name not in rating or not isinstance(rating[name], str)
            ),
            None,
        )
        if bad_field is not None:
            logging.debug(f"Missing or invalid field {bad_field} in rating data")
            return False

        if not parse_date(rating['publishedDate']):
            logging.debug("Invalid date format in rating data")
            return False

        # An absent or null price is fine; a present one must validate.
        has_price = (
            'priceWhenPosted' in rating
            and rating['priceWhenPosted'] is not None
        )
        return not has_price or validate_price(rating['priceWhenPosted']) is not None
    except Exception as exc:
        logging.error(f"Rating validation error: {str(exc)}")
        return False
def _published_timestamp(published: str) -> datetime:
    """Parse a rating's full ``publishedDate`` so same-day duplicates can be ranked."""
    text = published.strip()
    for date_format in (
        '%Y-%m-%d',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M:%S.%fZ',
        '%Y-%m-%dT%H:%M:%SZ',
    ):
        try:
            return datetime.strptime(text, date_format)
        except ValueError:
            continue
    # Unknown layout: rank it below any parseable timestamp so a
    # well-formed duplicate from the same day is preferred.
    return datetime.min


def _build_rating_rows(ticker_id: int, ratings: List[Dict]) -> List[Tuple]:
    """Collapse validated ratings to one per (ticker, day, analyst) and build insert rows."""
    # PostgreSQL refuses to let one INSERT ... ON CONFLICT DO UPDATE statement
    # update the same target row twice, so duplicates are removed up front.
    latest: Dict[Tuple, Tuple[datetime, Dict]] = {}
    for rating in ratings:
        rating_date = parse_date(rating['publishedDate'])
        if not rating_date:
            continue
        key = (ticker_id, rating_date, rating['gradingCompany'])
        stamp = _published_timestamp(rating['publishedDate'])
        # The key already fixes the calendar day, so compare full timestamps
        # to keep the most recent rating; on a tie the first one seen wins.
        if key not in latest or stamp > latest[key][0]:
            latest[key] = (stamp, rating)

    return [
        (
            row_ticker_id,
            row_date,
            analyst,  # gradingCompany, stored in the ``analyst`` column
            rating.get('previousGrade'),
            rating['newGrade'],
            rating.get('action', 'unknown'),
            validate_price(rating.get('priceWhenPosted')),
            rating.get('newsURL'),
        )
        for (row_ticker_id, row_date, analyst), (_, rating) in latest.items()
    ]


def save_ratings_batch(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    ratings: List[Dict]
) -> None:
    """Validate, deduplicate and upsert a batch of analyst ratings for one ticker.

    Ratings failing ``is_valid_rating`` are dropped. The rest are collapsed to
    one per ``(ticker_id, date, gradingCompany)``, keeping the one with the
    latest full ``publishedDate`` timestamp, and written to ``ratings`` with
    ``execute_values``. On conflict, existing rows are only updated when the
    current rating, price target or prior rating differs.

    Args:
        conn: Open psycopg2 connection; committed on success, rolled back on
            any error.
        ticker_id: Id stored in ``ratings.ticker_id`` for every row.
        ratings: Raw rating dicts (``publishedDate``, ``gradingCompany``,
            ``newGrade``, optional ``previousGrade``, ``action``,
            ``priceWhenPosted``, ``newsURL``).

    Raises:
        psycopg2.Error: On database failure, after rollback and logging.
        Exception: Any other failure, after rollback and logging.
    """
    if not ratings:
        return
    try:
        valid_ratings = [r for r in ratings if is_valid_rating(r)]
        if not valid_ratings:
            logging.warning("No valid ratings to save")
            return

        args = _build_rating_rows(ticker_id, valid_ratings)
        if not args:
            logging.warning("No valid ratings to insert after processing")
            return

        # Opening the cursor inside ``with`` guarantees it is closed, and a
        # failure in conn.cursor() itself can no longer be masked by an
        # UnboundLocalError from a ``finally: cursor.close()``.
        with conn.cursor() as cursor:
            execute_values(cursor, """
                INSERT INTO ratings
                    (ticker_id, date, analyst, prior_rating, current_rating,
                     action, price_target, url)
                VALUES %s
                ON CONFLICT (ticker_id, date, analyst)
                DO UPDATE SET
                    prior_rating = EXCLUDED.prior_rating,
                    current_rating = EXCLUDED.current_rating,
                    action = EXCLUDED.action,
                    price_target = EXCLUDED.price_target,
                    url = EXCLUDED.url
                WHERE ratings.current_rating IS DISTINCT FROM EXCLUDED.current_rating
                   OR ratings.price_target IS DISTINCT FROM EXCLUDED.price_target
                   OR ratings.prior_rating IS DISTINCT FROM EXCLUDED.prior_rating
            """, args)
        conn.commit()
        logging.info(f"Saved {len(args)} ratings for ticker {ticker_id}")
    except psycopg2.Error as e:
        conn.rollback()
        logging.error(f"Database error in batch save ratings: {str(e)}")
        logging.error(f"PostgreSQL Error Code: {e.pgcode}")
        if hasattr(e, 'diag'):
            logging.error(f"PostgreSQL Diagnostic: {e.diag}")
        raise
    except Exception as e:
        conn.rollback()
        logging.error(f"Unexpected error in batch save ratings: {str(e)}")
        logging.debug(f"Failed ratings data: {ratings}")
        raise
def save_rating_consensus(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    consensus: Dict
) -> None:
    """Upsert today's analyst-rating consensus snapshot for one ticker.

    The row is keyed on ``(ticker_id, date)``, where ``date`` is today's
    local date from ``datetime.now().date()``, so repeated calls on the same
    day update a single row. An existing row is only rewritten when the
    consensus label or any of the rating counts changed.

    Args:
        conn: Open psycopg2 connection; committed on success, rolled back on
            any error.
        ticker_id: Id stored in ``rating_consensus.ticker_id``.
        consensus: Mapping with counts under ``strongBuy``, ``buy``, ``hold``,
            ``sell``, ``strongSell`` and a label under ``consensus``. Missing
            or ``None`` counts are treated as 0.

    Raises:
        psycopg2.Error: On database failure, after rollback and logging.
        Exception: Any other failure, after rollback and logging.
    """
    if not consensus:
        logging.warning(f"Empty consensus data for ticker {ticker_id}")
        return
    try:
        snapshot_date = datetime.now().date()

        # ``or 0`` also covers keys present with a null value, which would
        # otherwise make sum() raise TypeError.
        counts = {
            key: consensus.get(key) or 0
            for key in ('strongBuy', 'buy', 'hold', 'sell', 'strongSell')
        }
        total_ratings = sum(counts.values())
        if total_ratings == 0:
            logging.warning(f"No ratings found in consensus data for ticker {ticker_id}")
            return

        # Opening the cursor inside ``with`` guarantees it is closed, and a
        # failure in conn.cursor() itself can no longer be masked by an
        # UnboundLocalError from a ``finally: cursor.close()``.
        with conn.cursor() as cursor:
            # The WHERE clause compares every stored value with IS DISTINCT FROM
            # (NULL-safe), so a shift between buckets with an unchanged total
            # still refreshes the row.
            cursor.execute("""
                INSERT INTO rating_consensus
                    (ticker_id, date, consensus_rating, total_ratings,
                     buy_ratings, hold_ratings, sell_ratings,
                     strong_buy_ratings, strong_sell_ratings, created_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
                ON CONFLICT (ticker_id, date)
                DO UPDATE SET
                    consensus_rating = EXCLUDED.consensus_rating,
                    total_ratings = EXCLUDED.total_ratings,
                    buy_ratings = EXCLUDED.buy_ratings,
                    hold_ratings = EXCLUDED.hold_ratings,
                    sell_ratings = EXCLUDED.sell_ratings,
                    strong_buy_ratings = EXCLUDED.strong_buy_ratings,
                    strong_sell_ratings = EXCLUDED.strong_sell_ratings,
                    created_at = CURRENT_TIMESTAMP
                WHERE rating_consensus.total_ratings IS DISTINCT FROM EXCLUDED.total_ratings
                   OR rating_consensus.consensus_rating IS DISTINCT FROM EXCLUDED.consensus_rating
                   OR rating_consensus.buy_ratings IS DISTINCT FROM EXCLUDED.buy_ratings
                   OR rating_consensus.hold_ratings IS DISTINCT FROM EXCLUDED.hold_ratings
                   OR rating_consensus.sell_ratings IS DISTINCT FROM EXCLUDED.sell_ratings
                   OR rating_consensus.strong_buy_ratings IS DISTINCT FROM EXCLUDED.strong_buy_ratings
                   OR rating_consensus.strong_sell_ratings IS DISTINCT FROM EXCLUDED.strong_sell_ratings
            """, (
                ticker_id,
                snapshot_date,
                consensus.get('consensus'),
                total_ratings,
                counts['buy'],
                counts['hold'],
                counts['sell'],
                counts['strongBuy'],
                counts['strongSell'],
            ))
        conn.commit()
        logging.info(f"Saved rating consensus for ticker {ticker_id} on {snapshot_date}")
    except psycopg2.Error as e:
        conn.rollback()
        logging.error(f"Database error saving rating consensus: {str(e)}")
        logging.error(f"PostgreSQL Error Code: {e.pgcode}")
        if hasattr(e, 'diag'):
            logging.error(f"PostgreSQL Diagnostic: {e.diag}")
        raise
    except Exception as e:
        conn.rollback()
        logging.error(f"Unexpected error saving rating consensus: {str(e)}")
        logging.debug(f"Failed consensus data: {consensus}")
        raise