import json
import logging
from datetime import datetime
from typing import Dict, Optional

import psycopg2


def validate_statement_data(data: Dict) -> bool:
    """Check that a statement payload carries every required key.

    Only key presence is checked; the values themselves are not inspected.

    Args:
        data: Raw financial statement payload.

    Returns:
        True when 'date', 'fillingDate' and 'period' are all present in
        ``data``, False otherwise.
    """
    required_fields = ('date', 'fillingDate', 'period')
    return all(field in data for field in required_fields)


def save_financial_statement(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    statement_type: str,
    period: str,
    data: Dict,
    is_reported: bool = False
) -> bool:
    """Insert or update one row in ``financial_statements``.

    The row is keyed on (ticker_id, date, period, statement_type,
    is_reported); on conflict the stored ``data`` and ``updated_at`` are
    overwritten. The payload is stored as a JSON string.

    Args:
        conn: Open database connection; committed on success and rolled
            back on failure.
        ticker_id: Identifier of the ticker the statement belongs to.
        statement_type: Value written to the ``statement_type`` column.
        period: Value written to the ``period`` column.
        data: Statement payload; must pass ``validate_statement_data`` and
            its 'date' must be formatted as ``YYYY-MM-DD``.
        is_reported: Value written to the ``is_reported`` column.

    Returns:
        True when the row was written and committed, False when validation
        failed or any error occurred (the error is logged, not raised).
    """
    if not validate_statement_data(data):
        logging.error(f"Invalid statement data for ticker_id {ticker_id}")
        return False

    # Pre-initialise so the ``finally`` clause is safe even when
    # ``conn.cursor()`` itself raises; otherwise an UnboundLocalError would
    # escape a function that promises to return a bool.
    cursor = None
    try:
        cursor = conn.cursor()
        # Validation already guarantees 'date' is present, so the
        # 'fillingDate' fallback is purely defensive.
        statement_date = datetime.strptime(
            data.get('date', data.get('fillingDate')), '%Y-%m-%d'
        ).date()

        cursor.execute("""
            INSERT INTO financial_statements
            (ticker_id, date, period, statement_type, is_reported, data)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (ticker_id, date, period, statement_type, is_reported)
            DO UPDATE SET
                data = EXCLUDED.data,
                updated_at = CURRENT_TIMESTAMP
            RETURNING id
        """, (
            ticker_id,
            statement_date,
            period,
            statement_type,
            is_reported,
            json.dumps(data)
        ))

        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving financial statement: {e}")
        return False
    finally:
        if cursor is not None:
            cursor.close()


class StatementStatus:
    """Track per-ticker statement collection status in the database.

    Rows live in the ``statement_update_status`` table, one per ticker_id.
    The table is created on construction if it does not exist yet.
    """

    def __init__(self, connection: psycopg2.extensions.connection):
        """Store the connection and make sure the status table exists.

        Args:
            connection: Open database connection used by every method.
        """
        self.conn = connection
        self._ensure_status_table()

    def _ensure_status_table(self):
        """Create ``statement_update_status`` if it is missing.

        Failures are rolled back and logged; nothing is raised.
        """
        # Guard against ``self.conn.cursor()`` failing before assignment.
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS statement_update_status (
                    ticker_id INTEGER PRIMARY KEY,
                    last_update TIMESTAMP WITH TIME ZONE,
                    status VARCHAR(50),
                    error_message TEXT,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                )
            """)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            logging.error(f"Error creating status table: {e}")
        finally:
            if cursor is not None:
                cursor.close()

    def update_status(self, ticker_id: int, status: str, error_message: Optional[str] = None):
        """Insert or update the status row for a ticker.

        ``last_update`` and ``updated_at`` are set to the database's current
        timestamp. Failures are rolled back and logged; nothing is raised.

        Args:
            ticker_id: Ticker whose status row is written.
            status: Value written to the ``status`` column.
            error_message: Optional text written to ``error_message``;
                None stores NULL.
        """
        # Guard against ``self.conn.cursor()`` failing before assignment.
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute("""
                INSERT INTO statement_update_status
                (ticker_id, last_update, status, error_message)
                VALUES (%s, CURRENT_TIMESTAMP, %s, %s)
                ON CONFLICT (ticker_id)
                DO UPDATE SET
                    last_update = CURRENT_TIMESTAMP,
                    status = EXCLUDED.status,
                    error_message = EXCLUDED.error_message,
                    updated_at = CURRENT_TIMESTAMP
            """, (ticker_id, status, error_message))
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            logging.error(f"Error updating status: {e}")
        finally:
            if cursor is not None:
                cursor.close()

    def get_last_update(self, ticker_id: int) -> Optional[datetime]:
        """Return the ``last_update`` value stored for a ticker.

        Args:
            ticker_id: Ticker to look up.

        Returns:
            The stored ``last_update`` value, or None when the ticker has
            no status row.

        Raises:
            Exception: Any error raised while querying is re-raised after
                the connection has been rolled back.
        """
        # Guard against ``self.conn.cursor()`` failing before assignment,
        # which would otherwise mask the real error with UnboundLocalError.
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute("""
                SELECT last_update
                FROM statement_update_status
                WHERE ticker_id = %s
            """, (ticker_id,))
            row = cursor.fetchone()
            return row[0] if row else None
        except Exception:
            # Match the other methods: do not leave the shared connection
            # inside a failed transaction. The error still propagates.
            self.conn.rollback()
            raise
        finally:
            if cursor is not None:
                cursor.close()