RivaCube/utils/Stocks/transcripts/utils.py
2025-02-04 19:31:18 +01:00

138 lines
4.4 KiB
Python

import logging
import json
import hashlib
from datetime import datetime
import psycopg2
from typing import Dict, Optional
def parse_date(date_str: Optional[str]) -> datetime:
    """Parse a date string, trying several known formats in order.

    Supported formats: 'YYYY-MM-DD HH:MM:SS', ISO-8601 with and without
    fractional seconds (trailing 'Z'), and bare 'YYYY-MM-DD'.

    Falls back to the current local time when date_str is empty/None or
    matches none of the supported formats.
    """
    if not date_str:
        return datetime.now()
    for fmt in ('%Y-%m-%d %H:%M:%S',
                '%Y-%m-%dT%H:%M:%S.%fZ',
                '%Y-%m-%dT%H:%M:%SZ',
                '%Y-%m-%d'):
        try:
            parsed = datetime.strptime(date_str, fmt)
        except ValueError:
            continue
        return parsed
    # Nothing matched — same fallback as the empty-input case.
    return datetime.now()
def calculate_content_hash(content: str) -> str:
    """Return the hex MD5 digest of *content* (UTF-8 encoded).

    Used purely as a deduplication fingerprint, not for security.
    """
    digest = hashlib.md5(content.encode('utf-8'))
    return digest.hexdigest()
def save_transcript(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    transcript: Dict
) -> None:
    """Insert or update one earnings transcript row for a ticker.

    Skips transcripts with missing/too-short content, skips exact
    duplicates (matched by content hash), and otherwise upserts on
    (ticker_id, year, quarter) — updating only when the content hash
    actually changed. Commits on success, rolls back and logs on any
    error. Never raises.

    Args:
        conn: Open psycopg2 connection.
        ticker_id: Database id of the ticker the transcript belongs to.
        transcript: Dict with keys 'content', and optionally 'date',
            'year', 'quarter', 'url'. Missing year/quarter are derived
            from the parsed date.
    """
    # BUGFIX: initialize before the try so the finally-block close is
    # safe even if conn.cursor() itself raises (previously a NameError).
    cursor = None
    try:
        cursor = conn.cursor()
        # Validate content; `or ''` guards against an explicit None value,
        # which `.get('content', '')` alone would not catch.
        content = (transcript.get('content') or '').strip()
        if not content or len(content) < 100:
            logging.warning(f"Invalid transcript content for ticker {ticker_id}")
            return
        # Calculate content hash for deduplication.
        content_hash = calculate_content_hash(content)
        # Parse date; year/quarter default to values derived from it.
        date = parse_date(transcript.get('date'))
        year = transcript.get('year', date.year)
        quarter = transcript.get('quarter', (date.month - 1) // 3 + 1)
        # Extract participants (executives/analysts/operator).
        participants = extract_participants(content)
        # Skip if this exact content is already stored for the ticker.
        cursor.execute("""
            SELECT id FROM earnings_transcripts
            WHERE ticker_id = %s AND content_hash = %s
        """, (ticker_id, content_hash))
        if cursor.fetchone():
            logging.debug(f"Duplicate transcript content for {ticker_id} {year}Q{quarter}")
            return
        # Upsert: replace the stored row only when the content changed.
        cursor.execute("""
            INSERT INTO earnings_transcripts
            (ticker_id, date, year, quarter, content, content_hash,
             participants, url, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            ON CONFLICT (ticker_id, year, quarter)
            DO UPDATE SET
                content = EXCLUDED.content,
                content_hash = EXCLUDED.content_hash,
                participants = EXCLUDED.participants,
                url = EXCLUDED.url,
                created_at = CURRENT_TIMESTAMP
            WHERE earnings_transcripts.content_hash != EXCLUDED.content_hash
        """, (
            ticker_id,
            date,
            year,
            quarter,
            content,
            content_hash,
            json.dumps(participants),
            transcript.get('url')
        ))
        # rowcount == 0 means the conditional DO UPDATE was a no-op.
        if cursor.rowcount > 0:
            logging.info(f"Saved/updated transcript for {ticker_id} {year}Q{quarter}")
        else:
            logging.debug(f"No changes for transcript {ticker_id} {year}Q{quarter}")
        conn.commit()
    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving transcript: {e}")
        logging.debug(f"Failed transcript data: {transcript}")
    finally:
        if cursor is not None:
            cursor.close()
def extract_participants(content: str) -> Dict:
    """Extract participants and their roles from transcript content."""
    participants = {
        'executives': [],
        'analysts': [],
        'operator': []
    }
    try:
        processed = set()  # names already classified, to avoid duplicates
        for raw_line in content.split('\n'):
            # Speaker lines look like "Name - Title: spoken text".
            if ':' not in raw_line:
                continue
            name = raw_line.split(':')[0].strip()
            if name in processed:
                continue
            processed.add(name)
            lowered = name.lower()
            exec_titles = ('ceo', 'cfo', 'president', 'director', 'chief')
            if any(title in lowered for title in exec_titles):
                participants['executives'].append(name)
            elif 'analyst' in lowered:
                participants['analysts'].append(name)
            elif 'operator' in lowered:
                participants['operator'].append(name)
    except Exception as e:
        logging.error(f"Error extracting participants: {e}")
    return participants