240 lines
8.4 KiB
Python
240 lines
8.4 KiB
Python
import logging
import math
from collections.abc import Mapping
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Sequence, Tuple

import psycopg2
from psycopg2.extras import execute_values
|
|
|
|
# strptime layouts tried, in order, by parse_date when the caller supplies none:
# plain ISO date, date + time, and "...T...Z" timestamps with/without fractions.
_DATE_FORMATS = (
    '%Y-%m-%d',
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%dT%H:%M:%S.%fZ',
    '%Y-%m-%dT%H:%M:%SZ',
)


def parse_date(
    date_str: Optional[str],
    formats: Optional[Sequence[str]] = None,
) -> Optional[date]:
    """Parse *date_str* into a calendar date, trying several layouts in turn.

    Any time-of-day component in the input is discarded; only the date is kept.

    Args:
        date_str: Text to parse. ``None`` and ``""`` are treated as missing.
        formats: ``strptime`` layouts to try, in order. ``None`` (the default)
            uses ``_DATE_FORMATS``.

    Returns:
        The parsed ``date``, or ``None`` (after logging a warning) when the
        input is missing, is not a string, or matches none of the layouts.
    """
    if not date_str:
        logging.warning("Empty date string received")
        return None

    # strptime raises TypeError for non-str input; report it as unparseable so
    # the function keeps its "date or None" contract.
    if not isinstance(date_str, str):
        logging.warning("Could not parse date: %s", date_str)
        return None

    layouts = _DATE_FORMATS if formats is None else formats
    for date_format in layouts:
        try:
            return datetime.strptime(date_str, date_format).date()
        except ValueError:
            continue

    logging.warning("Could not parse date: %s", date_str)
    return None
|
|
|
|
def validate_price(price_value: Any) -> Optional[float]:
    """Convert *price_value* to a non-negative, finite float.

    Args:
        price_value: Anything ``float()`` accepts (number, numeric string, ...),
            or ``None``.

    Returns:
        The price as a float, or ``None`` when the input is ``None``, cannot be
        converted, is NaN/infinite, or is negative. Every rejection except
        ``None`` input is logged as a warning.
    """
    if price_value is None:
        return None

    try:
        price = float(price_value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: float() of an int too large for a double.
        logging.warning("Invalid price value: %s", price_value)
        return None

    # float() happily accepts "nan" / "inf"; neither is a usable price, and NaN
    # would slip past the `< 0` check below because every comparison is False.
    if not math.isfinite(price):
        logging.warning("Invalid price value: %s", price_value)
        return None

    if price < 0:
        logging.warning("Negative price value: %s", price)
        return None

    return price
|
|
|
|
# Keys every rating record must carry, mapped to the type each value must have.
_REQUIRED_RATING_FIELDS = {
    'symbol': str,
    'publishedDate': str,
    'gradingCompany': str,
    'newGrade': str,
}


def is_valid_rating(rating: dict) -> bool:
    """Return True if *rating* is complete and well-formed enough to be saved.

    A rating is accepted when all of these hold:
      * it is a mapping;
      * ``symbol``, ``publishedDate``, ``gradingCompany`` and ``newGrade`` are
        present and are strings;
      * ``publishedDate`` is accepted by ``parse_date``;
      * ``priceWhenPosted``, if present and not ``None``, is accepted by
        ``validate_price``.

    Each rejection reason is logged at DEBUG level. Any unexpected exception
    is logged at ERROR level and the record is treated as invalid, so one
    malformed record never raises out of this filter.
    """
    try:
        if not isinstance(rating, Mapping):
            logging.debug("Rating data is not a mapping: %s", type(rating).__name__)
            return False

        for field, field_type in _REQUIRED_RATING_FIELDS.items():
            # .get() yields None for a missing key, which fails isinstance
            # exactly like a present-but-wrong-typed value.
            if not isinstance(rating.get(field), field_type):
                logging.debug("Missing or invalid field %s in rating data", field)
                return False

        if not parse_date(rating['publishedDate']):
            logging.debug("Invalid date format in rating data")
            return False

        price = rating.get('priceWhenPosted')
        if price is not None and validate_price(price) is None:
            logging.debug("Invalid priceWhenPosted in rating data")
            return False

        return True

    except Exception as e:
        # Last-resort guard: this function is a filter and must answer False
        # rather than raise into the caller's batch.
        logging.error("Rating validation error: %s", e)
        return False
|
|
|
|
# Upsert used by save_ratings_batch; execute_values expands the single %s into
# a multi-row VALUES list. The WHERE guard skips no-op rewrites but compares
# every column the SET clause writes, so a change to action or url alone is
# not silently dropped.
_UPSERT_RATINGS_SQL = """
    INSERT INTO ratings
        (ticker_id, date, analyst, prior_rating, current_rating,
         action, price_target, url)
    VALUES %s
    ON CONFLICT (ticker_id, date, analyst)
    DO UPDATE SET
        prior_rating = EXCLUDED.prior_rating,
        current_rating = EXCLUDED.current_rating,
        action = EXCLUDED.action,
        price_target = EXCLUDED.price_target,
        url = EXCLUDED.url
    WHERE (ratings.prior_rating, ratings.current_rating, ratings.action,
           ratings.price_target, ratings.url)
          IS DISTINCT FROM
          (EXCLUDED.prior_rating, EXCLUDED.current_rating, EXCLUDED.action,
           EXCLUDED.price_target, EXCLUDED.url)
"""


def _rating_timestamp(published: str) -> Optional[datetime]:
    """Return *published* as a full datetime, or None if no known layout matches.

    Only used to order same-day duplicates; the stored day comes from parse_date.
    """
    for layout in (
        '%Y-%m-%dT%H:%M:%S.%fZ',
        '%Y-%m-%dT%H:%M:%SZ',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d',
    ):
        try:
            return datetime.strptime(published, layout)
        except ValueError:
            continue
    return None


def _latest_per_conflict_key(
    ticker_id: int,
    ratings: List[Dict],
) -> Dict[Tuple[int, Any, str], Dict]:
    """Keep one rating per (ticker_id, day, gradingCompany), preferring the latest timestamp.

    Ratings whose date cannot be parsed are skipped. When timestamps tie or
    cannot be parsed, the first rating seen for a key is kept.
    """
    chosen: Dict[Tuple[int, Any, str], Tuple[Optional[datetime], Dict]] = {}
    for rating in ratings:
        published = rating['publishedDate']
        day = parse_date(published)
        if day is None:
            continue

        key = (ticker_id, day, rating['gradingCompany'])
        stamp = _rating_timestamp(published)
        incumbent = chosen.get(key)
        # The day is already part of the key, so "latest" must be decided on
        # the full timestamp, not on the date.
        if incumbent is None or (
            stamp is not None and incumbent[0] is not None and stamp > incumbent[0]
        ):
            chosen[key] = (stamp, rating)

    return {key: rating for key, (_, rating) in chosen.items()}


def _rating_row(key: Tuple[int, Any, str], rating: Dict) -> Tuple:
    """Build the VALUES tuple for one rating, in _UPSERT_RATINGS_SQL column order."""
    ticker, day, analyst = key
    return (
        ticker,
        day,
        analyst,
        rating.get('previousGrade'),
        rating['newGrade'],
        rating.get('action', 'unknown'),
        # NOTE(review): priceWhenPosted is stored in price_target; the field
        # name suggests the price at publication rather than an analyst
        # target — confirm this mapping is intended.
        validate_price(rating.get('priceWhenPosted')),
        rating.get('newsURL'),
    )


def save_ratings_batch(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    ratings: List[Dict]
) -> None:
    """Upsert *ratings* for *ticker_id* into the ``ratings`` table.

    Records rejected by ``is_valid_rating`` are dropped. Records that collide
    on the conflict key ``(ticker_id, date, analyst)`` (same calendar day and
    same ``gradingCompany``) are collapsed to one, preferring the latest full
    ``publishedDate`` timestamp. PostgreSQL would otherwise reject an
    ``INSERT ... ON CONFLICT DO UPDATE`` that touches the same row twice.

    Args:
        conn: Open psycopg2 connection; committed on success.
        ticker_id: Value written to the ``ticker_id`` column of every row.
        ratings: Rating dicts (keys such as ``publishedDate``,
            ``gradingCompany``, ``newGrade``, ``priceWhenPosted``).

    Raises:
        psycopg2.Error, Exception: Any failure rolls back ``conn``, is logged,
            and is re-raised.
    """
    if not ratings:
        return

    try:
        valid_ratings = [r for r in ratings if is_valid_rating(r)]
        if not valid_ratings:
            logging.warning("No valid ratings to save")
            return

        rows = [
            _rating_row(key, rating)
            for key, rating in _latest_per_conflict_key(ticker_id, valid_ratings).items()
        ]
        if not rows:
            logging.warning("No valid ratings to insert after processing")
            return

        # The with-block closes the cursor on exit and never references a
        # cursor that failed to open.
        with conn.cursor() as cursor:
            execute_values(cursor, _UPSERT_RATINGS_SQL, rows)

        conn.commit()
        logging.info("Saved %d ratings for ticker %s", len(rows), ticker_id)

    except psycopg2.Error as e:
        conn.rollback()
        logging.error("Database error in batch save ratings: %s", e)
        logging.error("PostgreSQL Error Code: %s", e.pgcode)
        diag = getattr(e, 'diag', None)
        if diag is not None and (diag.message_detail or diag.message_hint):
            # Log the server-supplied diagnostic fields; formatting the
            # Diagnostics object itself does not show them.
            logging.error(
                "PostgreSQL Diagnostic: detail=%s hint=%s",
                diag.message_detail,
                diag.message_hint,
            )
        raise

    except Exception as e:
        conn.rollback()
        logging.error("Unexpected error in batch save ratings: %s", e)
        logging.debug("Failed ratings data: %s", ratings)
        raise
|
|
|
|
# Count fields read from the consensus payload (summed in this order).
_CONSENSUS_COUNT_FIELDS = ('strongBuy', 'buy', 'hold', 'sell', 'strongSell')

# Upsert used by save_rating_consensus. The WHERE guard skips no-op rewrites but
# compares every counted column, so a shift between buy/hold/sell that leaves
# the total and the consensus label unchanged still refreshes the breakdown.
_UPSERT_CONSENSUS_SQL = """
    INSERT INTO rating_consensus
        (ticker_id, date, consensus_rating, total_ratings,
         buy_ratings, hold_ratings, sell_ratings,
         strong_buy_ratings, strong_sell_ratings, created_at)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
    ON CONFLICT (ticker_id, date)
    DO UPDATE SET
        consensus_rating = EXCLUDED.consensus_rating,
        total_ratings = EXCLUDED.total_ratings,
        buy_ratings = EXCLUDED.buy_ratings,
        hold_ratings = EXCLUDED.hold_ratings,
        sell_ratings = EXCLUDED.sell_ratings,
        strong_buy_ratings = EXCLUDED.strong_buy_ratings,
        strong_sell_ratings = EXCLUDED.strong_sell_ratings,
        created_at = CURRENT_TIMESTAMP
    WHERE (rating_consensus.consensus_rating, rating_consensus.total_ratings,
           rating_consensus.buy_ratings, rating_consensus.hold_ratings,
           rating_consensus.sell_ratings, rating_consensus.strong_buy_ratings,
           rating_consensus.strong_sell_ratings)
          IS DISTINCT FROM
          (EXCLUDED.consensus_rating, EXCLUDED.total_ratings,
           EXCLUDED.buy_ratings, EXCLUDED.hold_ratings,
           EXCLUDED.sell_ratings, EXCLUDED.strong_buy_ratings,
           EXCLUDED.strong_sell_ratings)
"""


def save_rating_consensus(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    consensus: Dict
) -> None:
    """Upsert today's analyst-rating consensus for *ticker_id*.

    Reads the counts ``strongBuy``, ``buy``, ``hold``, ``sell`` and
    ``strongSell``, treating missing or ``None`` as 0. It also reads the
    ``consensus`` label. The result is written to ``rating_consensus`` keyed
    on ``(ticker_id, today)``. Nothing is written when *consensus* is empty or
    all counts are zero.

    Args:
        conn: Open psycopg2 connection; committed on success.
        ticker_id: Value written to the ``ticker_id`` column.
        consensus: Consensus payload dict.

    Raises:
        psycopg2.Error, Exception: Any failure rolls back ``conn``, is logged,
            and is re-raised.
    """
    if not consensus:
        logging.warning("Empty consensus data for ticker %s", ticker_id)
        return

    try:
        # NOTE(review): local-time "today" (naive datetime.now()); confirm
        # whether the table expects a UTC date instead.
        as_of = datetime.now().date()

        # `or 0` treats an explicit None like a missing key; .get(k, 0) alone
        # would pass None through and make sum() raise TypeError.
        counts = {field: consensus.get(field) or 0 for field in _CONSENSUS_COUNT_FIELDS}
        total_ratings = sum(counts.values())

        if total_ratings == 0:
            logging.warning("No ratings found in consensus data for ticker %s", ticker_id)
            return

        # The with-block closes the cursor on exit and never references a
        # cursor that failed to open.
        with conn.cursor() as cursor:
            cursor.execute(_UPSERT_CONSENSUS_SQL, (
                ticker_id,
                as_of,
                consensus.get('consensus'),
                total_ratings,
                counts['buy'],
                counts['hold'],
                counts['sell'],
                counts['strongBuy'],
                counts['strongSell'],
            ))

        conn.commit()
        logging.info("Saved rating consensus for ticker %s on %s", ticker_id, as_of)

    except psycopg2.Error as e:
        conn.rollback()
        logging.error("Database error saving rating consensus: %s", e)
        logging.error("PostgreSQL Error Code: %s", e.pgcode)
        diag = getattr(e, 'diag', None)
        if diag is not None and (diag.message_detail or diag.message_hint):
            # Log the server-supplied diagnostic fields; formatting the
            # Diagnostics object itself does not show them.
            logging.error(
                "PostgreSQL Diagnostic: detail=%s hint=%s",
                diag.message_detail,
                diag.message_hint,
            )
        raise

    except Exception as e:
        conn.rollback()
        logging.error("Unexpected error saving rating consensus: %s", e)
        logging.debug("Failed consensus data: %s", consensus)
        raise
|