138 lines
4.4 KiB
Python
138 lines
4.4 KiB
Python
import logging
|
|
import json
|
|
import hashlib
|
|
from datetime import datetime
|
|
import psycopg2
|
|
from typing import Dict, Optional
|
|
|
|
def parse_date(date_str: Optional[str]) -> datetime:
    """Parse a date string, trying several common formats in order.

    Args:
        date_str: Date string in one of the supported formats, or None.

    Returns:
        The parsed ``datetime``; falls back to ``datetime.now()`` when
        ``date_str`` is empty/None or matches none of the formats.
    """
    if not date_str:
        return datetime.now()

    # Ordered roughly by how often upstream sources emit each format.
    date_formats = [
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M:%S.%fZ',
        '%Y-%m-%dT%H:%M:%SZ',
        '%Y-%m-%d',
    ]

    for date_format in date_formats:
        try:
            return datetime.strptime(date_str, date_format)
        except ValueError:
            continue

    # BUG FIX: previously fell through silently, masking malformed dates
    # behind "now" — surface the bad input so data issues are visible.
    logging.warning(f"Unparseable date string {date_str!r}; defaulting to now()")
    return datetime.now()
|
|
|
|
def calculate_content_hash(content: str) -> str:
    """Return the hex MD5 digest of *content* (UTF-8 encoded).

    Used only to detect duplicate transcripts, not for security.
    """
    digest = hashlib.md5()
    digest.update(content.encode('utf-8'))
    return digest.hexdigest()
|
|
|
|
def save_transcript(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    transcript: Dict
) -> None:
    """Persist one earnings transcript, deduplicating by content hash.

    Skips transcripts with missing or implausibly short content, skips
    exact duplicates (same ticker + content hash), and otherwise upserts
    on (ticker_id, year, quarter) — updating an existing row only when
    its stored content hash differs.

    Args:
        conn: Open psycopg2 connection. Committed on success, rolled
            back on any error (errors are logged, not re-raised).
        ticker_id: Database id of the ticker this transcript belongs to.
        transcript: Mapping with key 'content' (required), plus optional
            'date', 'year', 'quarter', and 'url'.
    """
    # BUG FIX: initialize before the try so the finally-block close()
    # cannot raise NameError if conn.cursor() itself fails.
    cursor = None
    try:
        cursor = conn.cursor()

        # Validate content: reject blank or implausibly short text.
        content = transcript.get('content', '').strip()
        if not content or len(content) < 100:
            logging.warning(f"Invalid transcript content for ticker {ticker_id}")
            return

        # Hash used for deduplication below.
        content_hash = calculate_content_hash(content)

        # Parse the call date; derive year/quarter from it when the
        # transcript does not carry them explicitly.
        date = parse_date(transcript.get('date'))
        year = transcript.get('year', date.year)
        quarter = transcript.get('quarter', (date.month - 1) // 3 + 1)

        # Extract participants (executives / analysts / operator).
        participants = extract_participants(content)

        # Skip if an identical transcript is already stored for this ticker.
        cursor.execute("""
            SELECT id FROM earnings_transcripts
            WHERE ticker_id = %s AND content_hash = %s
        """, (ticker_id, content_hash))

        if cursor.fetchone():
            logging.debug(f"Duplicate transcript content for {ticker_id} {year}Q{quarter}")
            return

        # Upsert: only overwrite an existing (ticker, year, quarter) row
        # when its stored content actually changed (hash differs).
        cursor.execute("""
            INSERT INTO earnings_transcripts
            (ticker_id, date, year, quarter, content, content_hash,
             participants, url, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            ON CONFLICT (ticker_id, year, quarter)
            DO UPDATE SET
                content = EXCLUDED.content,
                content_hash = EXCLUDED.content_hash,
                participants = EXCLUDED.participants,
                url = EXCLUDED.url,
                created_at = CURRENT_TIMESTAMP
            WHERE earnings_transcripts.content_hash != EXCLUDED.content_hash
        """, (
            ticker_id,
            date,
            year,
            quarter,
            content,
            content_hash,
            json.dumps(participants),
            transcript.get('url')
        ))

        # rowcount == 0 means the conditional upsert was a no-op
        # (same content hash already stored for this year/quarter).
        if cursor.rowcount > 0:
            logging.info(f"Saved/updated transcript for {ticker_id} {year}Q{quarter}")
        else:
            logging.debug(f"No changes for transcript {ticker_id} {year}Q{quarter}")

        conn.commit()
    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving transcript: {e}")
        logging.debug(f"Failed transcript data: {transcript}")
    finally:
        if cursor is not None:
            cursor.close()
|
|
|
|
def extract_participants(content: str) -> Dict:
    """Extract participants and their roles from transcript content.

    Scans each "Speaker: text" line, classifying the speaker label into
    executives, analysts, or operator based on keywords in the label.
    Unrecognized speakers are ignored; each speaker is recorded once.
    """
    roles: Dict = {
        'executives': [],
        'analysts': [],
        'operator': [],
    }

    # Keywords that mark a speaker label as an executive.
    executive_titles = ('ceo', 'cfo', 'president', 'director', 'chief')

    try:
        processed = set()  # speaker labels already classified
        for raw_line in content.split('\n'):
            name, sep, _rest = raw_line.partition(':')
            if not sep:
                continue  # no "Speaker:" prefix on this line
            name = name.strip()
            if name in processed:
                continue
            processed.add(name)

            lowered = name.lower()
            if any(title in lowered for title in executive_titles):
                roles['executives'].append(name)
            elif 'analyst' in lowered:
                roles['analysts'].append(name)
            elif 'operator' in lowered:
                roles['operator'].append(name)
    except Exception as e:
        logging.error(f"Error extracting participants: {e}")

    return roles
|