import hashlib
import json
import logging
from datetime import datetime
from typing import Dict, Optional

import psycopg2

# Formats tried, in order, by parse_date().
_DATE_FORMATS = (
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%dT%H:%M:%S.%fZ',
    '%Y-%m-%dT%H:%M:%SZ',
    '%Y-%m-%d',
)

# Substrings (lower-case) that classify a speaker label as an executive.
_EXECUTIVE_TITLES = ('ceo', 'cfo', 'president', 'director', 'chief')

# Transcripts whose stripped content is shorter than this are rejected.
_MIN_CONTENT_LENGTH = 100


def parse_date(date_str: Optional[str]) -> datetime:
    """Parse a date string, trying several known formats.

    Args:
        date_str: Date text in one of the formats in ``_DATE_FORMATS``.
            A ``datetime`` instance is also accepted and returned unchanged.

    Returns:
        The parsed naive ``datetime``. Falls back to ``datetime.now()`` when
        *date_str* is empty/``None`` or matches none of the formats.
    """
    if not date_str:
        return datetime.now()

    # Already parsed upstream; strptime() would raise TypeError on it.
    if isinstance(date_str, datetime):
        return date_str

    for date_format in _DATE_FORMATS:
        try:
            return datetime.strptime(date_str, date_format)
        except ValueError:
            continue

    # Keep the historical fallback, but make it visible in the logs.
    logging.warning(f"Unrecognized date format {date_str!r}; using current time")
    return datetime.now()


def calculate_content_hash(content: str) -> str:
    """Return the hex MD5 digest of *content* (UTF-8), used for deduplication."""
    return hashlib.md5(content.encode('utf-8')).hexdigest()


def extract_participants(content: str) -> Dict:
    """Extract speakers and their roles from transcript content.

    The text before the first ``:`` on a line is treated as a speaker label
    and classified by case-insensitive substring match. Each distinct label
    is considered once; labels matching no category are dropped.

    Args:
        content: Raw transcript text, one utterance per ``\\n``-separated line.

    Returns:
        A dict with the keys ``'executives'``, ``'analysts'`` and
        ``'operator'``, each mapping to a list of speaker labels in order of
        first appearance. On an unexpected error the error is logged and
        whatever was collected so far is returned.
    """
    participants = {
        'executives': [],
        'analysts': [],
        'operator': [],
    }

    try:
        seen = set()  # Speaker labels already classified.
        for line in content.split('\n'):
            label, separator, _ = line.partition(':')
            if not separator:
                continue  # No ':' on this line, so no speaker label.

            speaker = label.strip()
            if speaker in seen:
                continue
            seen.add(speaker)

            speaker_lower = speaker.lower()
            if any(title in speaker_lower for title in _EXECUTIVE_TITLES):
                participants['executives'].append(speaker)
            elif 'analyst' in speaker_lower:
                participants['analysts'].append(speaker)
            elif 'operator' in speaker_lower:
                participants['operator'].append(speaker)
    except Exception as e:
        # Best-effort: participant extraction must not block saving.
        logging.error(f"Error extracting participants: {e}")

    return participants


def save_transcript(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    transcript: Dict
) -> None:
    """Insert or update one earnings transcript for a ticker.

    The transcript is skipped when its stripped content is shorter than
    ``_MIN_CONTENT_LENGTH`` characters or when a row with the same
    ``ticker_id`` and content hash already exists. Otherwise it is upserted
    on ``(ticker_id, year, quarter)`` and the transaction is committed.

    Any exception is logged and followed by a rollback attempt; nothing is
    re-raised to the caller.

    Args:
        conn: Open database connection.
        ticker_id: Identifier of the ticker the transcript belongs to.
        transcript: Mapping read via the keys ``'content'``, ``'date'``,
            ``'year'``, ``'quarter'`` and ``'url'``. ``year`` and ``quarter``
            default to values derived from the parsed date when missing or
            ``None``.
    """
    # Bound before the try so the finally clause never sees an unbound name
    # if conn.cursor() itself fails.
    cursor = None
    try:
        # Validate content; a key present with a None value counts as empty.
        content = (transcript.get('content') or '').strip()
        if len(content) < _MIN_CONTENT_LENGTH:
            logging.warning(f"Invalid transcript content for ticker {ticker_id}")
            return

        content_hash = calculate_content_hash(content)

        date = parse_date(transcript.get('date'))
        year = transcript.get('year')
        if year is None:
            year = date.year
        quarter = transcript.get('quarter')
        if quarter is None:
            quarter = (date.month - 1) // 3 + 1

        participants = extract_participants(content)

        cursor = conn.cursor()

        # Skip content already stored for this ticker.
        cursor.execute("""
            SELECT id FROM earnings_transcripts
            WHERE ticker_id = %s AND content_hash = %s
        """, (ticker_id, content_hash))

        if cursor.fetchone():
            # NOTE(review): this return neither commits nor rolls back, so
            # the transaction opened by the SELECT presumably stays open
            # until the caller ends it -- confirm that is intended.
            logging.debug(f"Duplicate transcript content for {ticker_id} {year}Q{quarter}")
            return

        cursor.execute("""
            INSERT INTO earnings_transcripts
                (ticker_id, date, year, quarter, content, content_hash,
                 participants, url, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            ON CONFLICT (ticker_id, year, quarter) DO UPDATE SET
                content = EXCLUDED.content,
                content_hash = EXCLUDED.content_hash,
                participants = EXCLUDED.participants,
                url = EXCLUDED.url,
                created_at = CURRENT_TIMESTAMP
            WHERE earnings_transcripts.content_hash != EXCLUDED.content_hash
        """, (
            ticker_id,
            date,
            year,
            quarter,
            content,
            content_hash,
            json.dumps(participants),
            transcript.get('url'),
        ))

        if cursor.rowcount > 0:
            logging.info(f"Saved/updated transcript for {ticker_id} {year}Q{quarter}")
        else:
            logging.debug(f"No changes for transcript {ticker_id} {year}Q{quarter}")

        conn.commit()

    except Exception as e:
        # Log first so the original failure is recorded even if the rollback
        # below fails as well (e.g. on a dropped connection).
        logging.error(f"Error saving transcript: {e}")
        logging.debug(f"Failed transcript data: {transcript}")
        try:
            conn.rollback()
        except Exception as rollback_error:
            logging.error(f"Rollback failed after transcript error: {rollback_error}")
    finally:
        if cursor is not None:
            cursor.close()