import hashlib
import json
import logging
import os
from datetime import datetime
from time import sleep
from typing import Dict, List, Optional

import psycopg2
import requests
from dotenv import load_dotenv


class TranscriptCollector:
    """Collector for earnings call transcripts."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url_v4 = "https://financialmodelingprep.com/api/v4"
        self.base_url_v3 = "https://financialmodelingprep.com/api/v3"
        self.session = requests.Session()
        self.rate_limit_pause = 0.5  # Base pause between requests; scaled up on retries

    def _make_request(self, url: str, params: Dict, max_retries: int = 3) -> Optional[Dict]:
        """Make an API request with rate limiting, retries, and error handling."""
        response = None
        for attempt in range(max_retries):
            try:
                sleep(self.rate_limit_pause * (attempt + 1))  # Progressive delay
                response = self.session.get(url, params=params)
                response.raise_for_status()
                try:
                    data = response.json()
                    logging.debug(f"Raw API response: {json.dumps(data, indent=2)}")
                    return data
                except json.JSONDecodeError as e:
                    logging.error(f"Failed to parse JSON response: {str(e)}")
                    logging.debug(f"Raw response content: {response.text}")
                    return None
            except requests.exceptions.RequestException as e:
                # response stays None if the request never completed (e.g. connection error)
                if response is not None and response.status_code == 429:  # Too Many Requests
                    sleep(2 ** attempt)  # Exponential backoff
                    continue
                logging.error(f"API request failed: {str(e)}")
                if response is not None:
                    logging.error(f"Response content: {response.text}")
                if attempt == max_retries - 1:
                    return None
            except Exception as e:
                logging.error(f"Unexpected error in API request: {str(e)}")
                return None
        return None

    def get_earnings_call_transcripts(self, ticker: str, limit: int = 100) -> List[Dict]:
        """Get earnings call transcripts with validation."""
        try:
            params = {
                'apikey': self.api_key,
                'symbol': ticker,
                'limit': limit
            }

            # Try the v4 endpoint first
            data = self._make_request(
                f"{self.base_url_v4}/earning_call_transcript/{ticker}",
                params
            )

            # Fall back to the v3 endpoint if needed
            if not data:
                data = self._make_request(
                    f"{self.base_url_v3}/earning_call_transcript/{ticker}",
                    params
                )

            if not data:
                logging.info(f"No transcript data available for {ticker}")
                return []

            if not isinstance(data, list):
                data = [data] if isinstance(data, dict) else []

            logging.info(f"Found {len(data)} transcripts for {ticker}")

            # Keep only transcripts that pass validation
            return [t for t in data if self._validate_transcript(t)]

        except Exception as e:
            logging.error(f"Error getting transcripts for {ticker}: {str(e)}")
            return []

    def _validate_transcript(self, transcript: Dict) -> bool:
        """Validate transcript data."""
        if not isinstance(transcript, dict):
            return False

        required_fields = ['date', 'quarter', 'year', 'content']
        if not all(field in transcript for field in required_fields):
            return False

        content = (transcript.get('content') or '').strip()
        if not content or len(content) < 100:
            return False

        try:
            year = int(transcript.get('year', 0))
            quarter = int(transcript.get('quarter', 0))
            return 1900 <= year <= datetime.now().year and 1 <= quarter <= 4
        except (TypeError, ValueError):
            return False


def calculate_content_hash(content: str) -> str:
    """Calculate a hash of transcript content for deduplication."""
    return hashlib.md5(content.encode('utf-8')).hexdigest()


def setup_logging():
    """Configure logging settings."""
    log_filename = f'logs/transcript_update_{datetime.now():%Y%m%d_%H%M%S}.log'
    os.makedirs('logs', exist_ok=True)
    logging.getLogger().handlers = []  # Reset any handlers left from previous runs
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_filename),
            logging.StreamHandler()
        ]
    )
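
# Illustrative usage sketch (not executed as part of the pipeline). Assumes a
# valid FMP_API_KEY is set in the environment; 'AAPL' is a placeholder symbol.
#
#   collector = TranscriptCollector(os.getenv('FMP_API_KEY'))
#   for t in collector.get_earnings_call_transcripts('AAPL', limit=4):
#       print(t['date'], f"Q{t['quarter']} {t['year']}", len(t['content']))
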
def ensure_tables(conn: psycopg2.extensions.connection) -> None:
    """Ensure required database tables exist."""
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS earnings_transcripts (
                    id SERIAL PRIMARY KEY,
                    ticker_id INTEGER REFERENCES tickers(id),
                    date TIMESTAMP NOT NULL,
                    quarter INTEGER NOT NULL,
                    year INTEGER NOT NULL,
                    content TEXT,
                    content_hash TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE (ticker_id, content_hash)
                )
            """)

            cursor.execute("""
                DO $$
                BEGIN
                    IF NOT EXISTS (
                        SELECT 1 FROM pg_indexes
                        WHERE indexname = 'idx_earnings_transcripts_ticker_date'
                    ) THEN
                        CREATE INDEX idx_earnings_transcripts_ticker_date
                        ON earnings_transcripts(ticker_id, date);
                    END IF;
                END $$;
            """)

        conn.commit()
        logging.info("Ensured all required tables exist")
    except Exception as e:
        conn.rollback()
        logging.error(f"Error ensuring tables: {str(e)}")
        raise


def save_transcript(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    transcript: Dict
) -> bool:
    """Save a transcript with deduplication."""
    cursor = conn.cursor()  # Created outside try so the finally block can always close it
    try:
        # Validate content
        content = (transcript.get('content') or '').strip()
        if not content or len(content) < 100:
            logging.warning(f"Invalid transcript content for ticker {ticker_id}")
            return False

        # Generate content hash for deduplication
        content_hash = calculate_content_hash(content)

        # Parse date
        try:
            date = datetime.strptime(transcript['date'], '%Y-%m-%d %H:%M:%S')
        except ValueError:
            logging.error(f"Invalid date format: {transcript.get('date')}")
            return False

        cursor.execute("""
            INSERT INTO earnings_transcripts
                (ticker_id, date, quarter, year, content, content_hash, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            ON CONFLICT (ticker_id, content_hash)
            DO UPDATE SET
                date = EXCLUDED.date,
                quarter = EXCLUDED.quarter,
                year = EXCLUDED.year,
                created_at = CURRENT_TIMESTAMP
            RETURNING id
        """, (
            ticker_id,
            date,
            transcript['quarter'],
            transcript['year'],
            content,
            content_hash
        ))

        result = cursor.fetchone()
        conn.commit()

        if result:
            logging.info(f"Saved/updated transcript for {ticker_id} on {date}")
            return True
        # Defensive: DO UPDATE always returns a row, so this branch is not expected
        logging.debug(f"No changes needed for transcript {ticker_id} on {date}")
        return False

    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving transcript: {str(e)}")
        logging.debug(f"Failed transcript data: {transcript}")
        return False
    finally:
        cursor.close()


def process_ticker(
    conn: psycopg2.extensions.connection,
    collector: TranscriptCollector,
    ticker_id: int,
    ticker_symbol: str
) -> None:
    """Process all transcripts for a single ticker."""
    try:
        # Skip exchange suffixes the API does not support
        if any(ticker_symbol.endswith(suffix) for suffix in ['.KS', '.KQ', '.SS', '.SZ']):
            logging.info(f"Skipping {ticker_symbol} - exchange not supported")
            return

        # Get all available transcripts
        transcripts = collector.get_earnings_call_transcripts(ticker_symbol)
        sleep(0.5)  # Rate limiting between API calls

        if transcripts:
            saved_count = 0
            for transcript in transcripts:
                if save_transcript(conn, ticker_id, transcript):
                    saved_count += 1

            logging.info(
                f"Processed {len(transcripts)} transcripts for {ticker_symbol}, "
                f"{saved_count} saved/updated"
            )
        else:
            logging.info(f"No transcript data available for {ticker_symbol}")

    except Exception as e:
        logging.error(
            f"Error processing ticker {ticker_symbol} (ID: {ticker_id}): {str(e)}"
        )
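
# Expected transcript payload shape, inferred from the validation and parsing
# logic above (field names follow the FMP earning_call_transcript response;
# the values here are invented for illustration):
#
#   {
#       "quarter": 3,
#       "year": 2024,
#       "date": "2024-08-01 17:00:00",            # must match '%Y-%m-%d %H:%M:%S'
#       "content": "Operator: Good afternoon..."  # >= 100 chars after stripping
#   }
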
{str(e)}" ) def main(): """Main entry point for transcript update process.""" load_dotenv() setup_logging() try: conn = psycopg2.connect( dbname=os.getenv('DB_NAME'), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), host=os.getenv('DB_HOST'), port=os.getenv('DB_PORT') ) ensure_tables(conn) collector = TranscriptCollector(os.getenv('FMP_API_KEY')) # Get all active tickers with conn.cursor() as cursor: cursor.execute(""" SELECT id, yf_ticker FROM tickers WHERE yf_ticker IS NOT NULL AND yf_ticker NOT LIKE '%.KS' AND yf_ticker NOT LIKE '%.KQ' AND yf_ticker NOT LIKE '%.SS' AND yf_ticker NOT LIKE '%.SZ' ORDER BY yf_ticker """) tickers = cursor.fetchall() total_tickers = len(tickers) logging.info(f"Starting transcript update for {total_tickers} tickers") for index, (ticker_id, ticker_symbol) in enumerate(tickers, 1): logging.info(f"Processing {ticker_symbol} ({index}/{total_tickers})") process_ticker(conn, collector, ticker_id, ticker_symbol) logging.info("Transcript update completed successfully") except Exception as e: logging.error(f"Error updating transcripts: {str(e)}") raise finally: if 'conn' in locals(): conn.close() if __name__ == "__main__": main()