import logging
import math
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple

import psycopg2
from psycopg2.extras import execute_values

# Accepted 'publishedDate' layouts, tried in order. The trailing 'Z' is matched
# literally, so every parsed value is a naive datetime.
_DATE_FORMATS = (
    '%Y-%m-%d',
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%dT%H:%M:%S.%fZ',
    '%Y-%m-%dT%H:%M:%SZ',
)

# Keys of the consensus payload that hold per-bucket rating counts.
_CONSENSUS_COUNT_KEYS = ('strongBuy', 'buy', 'hold', 'sell', 'strongSell')

# Upsert for individual ratings. execute_values expands the single %s into a
# multi-row VALUES list. The WHERE clause skips no-op updates; it checks every
# column the SET clause writes so that no real change is silently dropped.
_UPSERT_RATINGS_SQL = """
    INSERT INTO ratings
        (ticker_id, date, analyst, prior_rating, current_rating,
         action, price_target, url)
    VALUES %s
    ON CONFLICT (ticker_id, date, analyst) DO UPDATE SET
        prior_rating = EXCLUDED.prior_rating,
        current_rating = EXCLUDED.current_rating,
        action = EXCLUDED.action,
        price_target = EXCLUDED.price_target,
        url = EXCLUDED.url
    WHERE ratings.current_rating IS DISTINCT FROM EXCLUDED.current_rating
       OR ratings.price_target IS DISTINCT FROM EXCLUDED.price_target
       OR ratings.prior_rating IS DISTINCT FROM EXCLUDED.prior_rating
       OR ratings.action IS DISTINCT FROM EXCLUDED.action
       OR ratings.url IS DISTINCT FROM EXCLUDED.url
"""

# Upsert for the daily consensus snapshot. Every stored count is compared, so a
# redistribution between buckets with an unchanged total is still written.
# IS DISTINCT FROM is used (not !=) so NULLs compare sanely.
_UPSERT_CONSENSUS_SQL = """
    INSERT INTO rating_consensus
        (ticker_id, date, consensus_rating, total_ratings,
         buy_ratings, hold_ratings, sell_ratings,
         strong_buy_ratings, strong_sell_ratings, created_at)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
    ON CONFLICT (ticker_id, date) DO UPDATE SET
        consensus_rating = EXCLUDED.consensus_rating,
        total_ratings = EXCLUDED.total_ratings,
        buy_ratings = EXCLUDED.buy_ratings,
        hold_ratings = EXCLUDED.hold_ratings,
        sell_ratings = EXCLUDED.sell_ratings,
        strong_buy_ratings = EXCLUDED.strong_buy_ratings,
        strong_sell_ratings = EXCLUDED.strong_sell_ratings,
        created_at = CURRENT_TIMESTAMP
    WHERE rating_consensus.total_ratings IS DISTINCT FROM EXCLUDED.total_ratings
       OR rating_consensus.consensus_rating IS DISTINCT FROM EXCLUDED.consensus_rating
       OR rating_consensus.buy_ratings IS DISTINCT FROM EXCLUDED.buy_ratings
       OR rating_consensus.hold_ratings IS DISTINCT FROM EXCLUDED.hold_ratings
       OR rating_consensus.sell_ratings IS DISTINCT FROM EXCLUDED.sell_ratings
       OR rating_consensus.strong_buy_ratings IS DISTINCT FROM EXCLUDED.strong_buy_ratings
       OR rating_consensus.strong_sell_ratings IS DISTINCT FROM EXCLUDED.strong_sell_ratings
"""


def _parse_datetime(date_str: str) -> Optional[datetime]:
    """Return *date_str* parsed with the first matching format in _DATE_FORMATS, or None."""
    for date_format in _DATE_FORMATS:
        try:
            return datetime.strptime(date_str, date_format)
        except ValueError:
            continue
    return None


def parse_date(date_str: Optional[str]) -> Optional[date]:
    """Parse a date string with multiple format support and validation.

    Args:
        date_str: A timestamp in one of the _DATE_FORMATS layouts.

    Returns:
        The calendar date part, or None (with a warning logged) when the input
        is empty or matches none of the supported formats.
    """
    if not date_str:
        logging.warning("Empty date string received")
        return None

    parsed = _parse_datetime(date_str)
    if parsed is None:
        logging.warning(f"Could not parse date: {date_str}")
        return None
    return parsed.date()


def validate_price(price_value: Any) -> Optional[float]:
    """Validate and convert a price value to float.

    Args:
        price_value: Anything float() accepts (number or numeric string), or None.

    Returns:
        The price as a float, or None when it is missing, not numeric,
        non-finite (NaN/inf) or negative. A warning is logged for every
        rejected non-None value.
    """
    if price_value is None:
        return None

    try:
        price = float(price_value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: ints too large to represent as a float.
        logging.warning(f"Invalid price value: {price_value}")
        return None

    # float() happily accepts "nan"/"inf"; neither is a usable price, and
    # NaN would slip past the negativity check below.
    if not math.isfinite(price):
        logging.warning(f"Invalid price value: {price_value}")
        return None

    if price < 0:
        logging.warning(f"Negative price value: {price}")
        return None
    return price


def is_valid_rating(rating: dict) -> bool:
    """Return True if *rating* carries everything needed to be stored.

    Requirements:
    - 'symbol', 'publishedDate', 'gradingCompany' and 'newGrade' must be
      present and hold str values.
    - 'publishedDate' must be accepted by parse_date.
    - A non-None 'priceWhenPosted' must be accepted by validate_price.

    Never raises: unexpected errors are logged and reported as invalid.
    """
    required_fields = {
        'symbol': str,
        'publishedDate': str,
        'gradingCompany': str,
        'newGrade': str,
    }
    try:
        for field, field_type in required_fields.items():
            if field not in rating or not isinstance(rating[field], field_type):
                logging.debug(f"Missing or invalid field {field} in rating data")
                return False

        if not parse_date(rating['publishedDate']):
            logging.debug("Invalid date format in rating data")
            return False

        # Price is optional, but when present it must be usable.
        if 'priceWhenPosted' in rating and rating['priceWhenPosted'] is not None:
            if validate_price(rating['priceWhenPosted']) is None:
                return False

        return True
    except Exception as e:
        logging.error(f"Rating validation error: {str(e)}")
        return False


def _build_rating_rows(ticker_id: int, ratings: List[Dict]) -> List[Tuple]:
    """Validate, de-duplicate and convert raw ratings into ratings-table row tuples.

    Ratings are keyed by (publication day, grading company). Together with
    ticker_id, that key matches the upsert's ON CONFLICT target, and one
    INSERT ... ON CONFLICT DO UPDATE statement must not touch the same target
    row twice. Among duplicates, the rating with the latest full timestamp
    wins; on a tie (e.g. date-only strings) the first occurrence is kept.
    """
    valid_ratings = [r for r in ratings if is_valid_rating(r)]
    if not valid_ratings:
        logging.warning("No valid ratings to save")
        return []

    latest: Dict[Tuple[date, str], Tuple[datetime, Dict]] = {}
    for rating in valid_ratings:
        published = _parse_datetime(rating['publishedDate'])
        if published is None:
            continue
        key = (published.date(), rating['gradingCompany'])
        seen = latest.get(key)
        # Same calendar day by construction of the key, so only the time of
        # day can distinguish which duplicate is more recent.
        if seen is None or published > seen[0]:
            latest[key] = (published, rating)

    rows = [
        (
            ticker_id,
            rating_date,                                     # date
            analyst,                                         # analyst
            rating.get('previousGrade'),                     # prior_rating
            rating['newGrade'],                              # current_rating
            rating.get('action', 'unknown'),                 # action
            validate_price(rating.get('priceWhenPosted')),   # price_target
            rating.get('newsURL'),                           # url
        )
        for (rating_date, analyst), (_, rating) in latest.items()
    ]
    if not rows:
        logging.warning("No valid ratings to insert after processing")
    return rows


def _log_pg_error(message: str, error: psycopg2.Error) -> None:
    """Log a psycopg2 error with its SQLSTATE code and server diagnostic text."""
    logging.error(f"{message}: {str(error)}")
    logging.error(f"PostgreSQL Error Code: {error.pgcode}")
    diag = getattr(error, 'diag', None)
    # Log the message fields rather than the Diagnostics object itself,
    # whose repr carries no information.
    if diag is not None and diag.message_primary:
        detail = f" ({diag.message_detail})" if diag.message_detail else ""
        logging.error(f"PostgreSQL Diagnostic: {diag.message_primary}{detail}")


def save_ratings_batch(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    ratings: List[Dict]
) -> None:
    """Save multiple ratings efficiently with duplicate handling.

    Invalid ratings are dropped. Duplicates (same day and grading company) are
    collapsed to the most recent one. The rest are upserted in one
    execute_values call and committed.

    Args:
        conn: Open psycopg2 connection; committed on success, rolled back on error.
        ticker_id: Value written to ratings.ticker_id for every row.
        ratings: Raw rating dicts (keys as checked by is_valid_rating).

    Raises:
        psycopg2.Error: On database failure, after rollback and logging.
        Exception: Any other unexpected error, after rollback and logging.
    """
    if not ratings:
        return

    try:
        rows = _build_rating_rows(ticker_id, ratings)
        if not rows:
            return

        # The context manager closes the cursor even if execute_values raises.
        with conn.cursor() as cursor:
            execute_values(cursor, _UPSERT_RATINGS_SQL, rows)
        conn.commit()
        logging.info(f"Saved {len(rows)} ratings for ticker {ticker_id}")
    except psycopg2.Error as e:
        conn.rollback()
        _log_pg_error("Database error in batch save ratings", e)
        raise
    except Exception as e:
        conn.rollback()
        logging.error(f"Unexpected error in batch save ratings: {str(e)}")
        logging.debug(f"Failed ratings data: {ratings}")
        raise


def save_rating_consensus(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    consensus: Dict
) -> None:
    """Save today's rating consensus with validation and error handling.

    The snapshot is dated with datetime.now().date() (server local time) and
    upserted on (ticker_id, date). Nothing is written when the payload is
    empty or all counts are zero.

    Args:
        conn: Open psycopg2 connection; committed on success, rolled back on error.
        ticker_id: Value written to rating_consensus.ticker_id.
        consensus: Payload with optional count keys 'strongBuy', 'buy',
            'hold', 'sell', 'strongSell' and an optional 'consensus' label.

    Raises:
        psycopg2.Error: On database failure, after rollback and logging.
        Exception: Any other unexpected error, after rollback and logging.
    """
    if not consensus:
        logging.warning(f"Empty consensus data for ticker {ticker_id}")
        return

    try:
        # A missing key and an explicit null are both counted as zero.
        counts = {key: consensus.get(key) or 0 for key in _CONSENSUS_COUNT_KEYS}
        total_ratings = sum(counts.values())
        if total_ratings == 0:
            logging.warning(f"No ratings found in consensus data for ticker {ticker_id}")
            return

        consensus_date = datetime.now().date()
        with conn.cursor() as cursor:
            cursor.execute(_UPSERT_CONSENSUS_SQL, (
                ticker_id,
                consensus_date,
                consensus.get('consensus'),
                total_ratings,
                counts['buy'],
                counts['hold'],
                counts['sell'],
                counts['strongBuy'],
                counts['strongSell'],
            ))
        conn.commit()
        logging.info(f"Saved rating consensus for ticker {ticker_id} on {consensus_date}")
    except psycopg2.Error as e:
        conn.rollback()
        _log_pg_error("Database error saving rating consensus", e)
        raise
    except Exception as e:
        conn.rollback()
        logging.error(f"Unexpected error saving rating consensus: {str(e)}")
        logging.debug(f"Failed consensus data: {consensus}")
        raise