import json
import logging
import os
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from typing import Any, Dict, List, Optional, Union

import psycopg2

# Date layouts accepted by parse_date(), tried in order.
_DATE_FORMATS = (
    '%Y-%m-%d',
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%dT%H:%M:%S.%fZ',
    '%Y-%m-%dT%H:%M:%SZ',
)


def setup_logging():
    """Configure root logging to a timestamped file and to the console.

    Creates the ``logs`` directory if needed and installs an INFO-level
    configuration with a file handler (``logs/valuations_update_<ts>.log``)
    and a stream handler. ``logging.basicConfig`` does nothing when the root
    logger already has handlers, so calling this twice is harmless.
    """
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(
        log_dir,
        f'valuations_update_{datetime.now():%Y%m%d_%H%M%S}.log',
    )

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(),
        ],
    )


def init_database(conn: psycopg2.extensions.connection) -> None:
    """Create the ``dcf_valuations`` and ``company_ratings`` tables if absent.

    Both tables (and their ticker/date indexes) are created in a single
    transaction that is committed on success.

    Args:
        conn: Open psycopg2 connection.

    Raises:
        Exception: Any error raised while executing the DDL; the transaction
            is rolled back and the error is logged before it is re-raised.
    """
    with conn.cursor() as cursor:
        try:
            # Create dcf_valuations table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS dcf_valuations (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER NOT NULL,
                    date DATE NOT NULL,
                    dcf_type VARCHAR(50) NOT NULL,
                    stock_price DECIMAL(15, 4),
                    dcf_price DECIMAL(15, 4),
                    fair_value DECIMAL(15, 4),
                    upside_downside DECIMAL(15, 4),
                    assumptions JSONB,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE (ticker_id, date, dcf_type)
                );
                CREATE INDEX IF NOT EXISTS idx_dcf_valuations_ticker_date
                    ON dcf_valuations (ticker_id, date);
            """)

            # Create company_ratings table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS company_ratings (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER NOT NULL,
                    date DATE NOT NULL,
                    rating VARCHAR(50),
                    rating_score DECIMAL(10, 4),
                    rating_recommendation VARCHAR(50),
                    rating_details JSONB,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE (ticker_id, date)
                );
                CREATE INDEX IF NOT EXISTS idx_company_ratings_ticker_date
                    ON company_ratings (ticker_id, date);
            """)

            conn.commit()
            logging.info("Database tables initialized successfully")
        except Exception as e:
            conn.rollback()
            logging.error("Error initializing database tables: %s", e)
            raise


def parse_date(date_str: Union[str, date, None]) -> date:
    """Parse a date value, trying several string layouts.

    Args:
        date_str: A string in one of the ``_DATE_FORMATS`` layouts
            (surrounding whitespace is ignored), an existing ``date`` or
            ``datetime`` object, or a falsy value (``None`` / ``""``).

    Returns:
        The parsed calendar date. A falsy input, or a string that matches
        none of the known layouts, yields today's date; the latter case is
        logged as a warning because the stored date will not reflect the
        input.

    Raises:
        TypeError: If ``date_str`` is truthy but is neither a string nor a
            ``date``/``datetime`` (raised by ``datetime.strptime``).
    """
    if not date_str:
        return datetime.now().date()

    # datetime is a subclass of date, so it must be checked first.
    if isinstance(date_str, datetime):
        return date_str.date()
    if isinstance(date_str, date):
        return date_str

    text = date_str.strip() if isinstance(date_str, str) else date_str
    for date_format in _DATE_FORMATS:
        try:
            return datetime.strptime(text, date_format).date()
        except ValueError:
            continue

    logging.warning(
        "Unrecognized date value %r; falling back to today's date", date_str
    )
    return datetime.now().date()


def convert_to_decimal(value: Any) -> Optional[Decimal]:
    """Convert a loosely-typed numeric value to ``Decimal``.

    Args:
        value: ``None``, a ``Decimal``, an ``int``/``float``, or a numeric
            string (thousands separators ``,`` are removed).

    Returns:
        The converted ``Decimal``; ``None`` when ``value`` is ``None``, of an
        unsupported type, or not parseable as a number. Booleans are not
        parseable (``str(True)`` is not numeric) and therefore yield ``None``.
    """
    if value is None:
        return None
    if isinstance(value, Decimal):
        # Already the target type; previously this fell through to None.
        return value

    try:
        if isinstance(value, (int, float)):
            # Go through str() so floats keep their short repr, not the
            # full binary expansion Decimal(float) would produce.
            return Decimal(str(value))
        if isinstance(value, str):
            return Decimal(value.replace(',', ''))
    except (InvalidOperation, TypeError):
        return None
    return None


def save_dcf_valuation(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    dcf_type: str,
    valuation: Dict
) -> None:
    """Upsert one DCF valuation row for a ticker.

    The row is keyed on ``(ticker_id, date, dcf_type)``; an existing row is
    updated in place. Failures are logged and rolled back rather than raised,
    so one bad record does not abort a batch.

    Args:
        conn: Open psycopg2 connection.
        ticker_id: Identifier of the ticker the valuation belongs to.
        dcf_type: Label of the DCF variant; the record is skipped if empty.
        valuation: Source record. Keys read: ``date``, ``stockPrice``,
            ``dcfValue`` (or ``dcfPrice``), ``fairValue``, ``upsideDownside``
            and ``assumptions``.
    """
    try:
        valuation_date = parse_date(valuation.get('date'))

        # Validate required fields
        if not all([valuation_date, dcf_type]):
            logging.error(
                "Missing required fields for DCF valuation: ticker_id=%s",
                ticker_id,
            )
            return

        # Prefer 'dcfValue'; fall back to 'dcfPrice' when it is missing
        # or explicitly null.
        raw_dcf_price = valuation.get('dcfValue')
        if raw_dcf_price is None:
            raw_dcf_price = valuation.get('dcfPrice')

        params = (
            ticker_id,
            valuation_date,
            dcf_type,
            convert_to_decimal(valuation.get('stockPrice')),
            convert_to_decimal(raw_dcf_price),
            convert_to_decimal(valuation.get('fairValue')),
            convert_to_decimal(valuation.get('upsideDownside')),
            json.dumps(valuation.get('assumptions', {})),
        )

        # The context manager closes the cursor even on error and, unlike a
        # `finally: cursor.close()`, cannot hit an unbound `cursor` when
        # conn.cursor() itself fails.
        with conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO dcf_valuations
                (ticker_id, date, dcf_type, stock_price, dcf_price,
                 fair_value, upside_downside, assumptions)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (ticker_id, date, dcf_type) DO UPDATE SET
                    stock_price = EXCLUDED.stock_price,
                    dcf_price = EXCLUDED.dcf_price,
                    fair_value = EXCLUDED.fair_value,
                    upside_downside = EXCLUDED.upside_downside,
                    assumptions = EXCLUDED.assumptions,
                    updated_at = CURRENT_TIMESTAMP
            """, params)

        conn.commit()
        logging.debug(
            "Saved %s DCF valuation for %s on %s",
            dcf_type, ticker_id, valuation_date,
        )
    except Exception as e:
        conn.rollback()
        logging.error("Error saving DCF valuation: %s", e)
        logging.debug("Failed valuation data: %s", valuation)


def save_company_rating(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    rating_data: Dict
) -> None:
    """Upsert one company rating row for a ticker.

    The row is keyed on ``(ticker_id, date)``; an existing row is updated in
    place. Failures are logged and rolled back rather than raised.

    Args:
        conn: Open psycopg2 connection.
        ticker_id: Identifier of the ticker the rating belongs to.
        rating_data: Source record. Keys read: ``date``, ``rating``
            (required), ``ratingScore`` and ``recommendation``. The whole
            record is also stored as JSON in ``rating_details``.
    """
    try:
        rating_date = parse_date(rating_data.get('date'))
        rating = rating_data.get('rating')

        # Validate required fields
        if not all([rating_date, rating]):
            logging.error(
                "Missing required fields for company rating: ticker_id=%s",
                ticker_id,
            )
            return

        params = (
            ticker_id,
            rating_date,
            rating,
            convert_to_decimal(rating_data.get('ratingScore')),
            rating_data.get('recommendation'),
            json.dumps(rating_data),
        )

        # See save_dcf_valuation: `with` avoids closing an unbound cursor.
        with conn.cursor() as cursor:
            cursor.execute("""
                INSERT INTO company_ratings
                (ticker_id, date, rating, rating_score,
                 rating_recommendation, rating_details)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (ticker_id, date) DO UPDATE SET
                    rating = EXCLUDED.rating,
                    rating_score = EXCLUDED.rating_score,
                    rating_recommendation = EXCLUDED.rating_recommendation,
                    rating_details = EXCLUDED.rating_details,
                    updated_at = CURRENT_TIMESTAMP
            """, params)

        conn.commit()
        logging.debug(
            "Saved company rating for %s on %s", ticker_id, rating_date
        )
    except Exception as e:
        conn.rollback()
        logging.error("Error saving company rating: %s", e)
        logging.debug("Failed rating data: %s", rating_data)