116 lines
4.0 KiB
Python
116 lines
4.0 KiB
Python
import logging
|
|
import json
|
|
from datetime import datetime
|
|
import psycopg2
|
|
from typing import Dict, Optional
|
|
|
|
def validate_statement_data(data: Dict) -> bool:
    """Report whether *data* carries every key a statement record needs.

    Only the presence of the ``'date'``, ``'fillingDate'`` and ``'period'``
    keys is tested; their values are not inspected.

    Args:
        data: Financial statement payload to check.

    Returns:
        True when all three keys are present, False otherwise.
    """
    for key in ('date', 'fillingDate', 'period'):
        if key not in data:
            return False
    return True
|
|
|
|
def save_financial_statement(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    statement_type: str,
    period: str,
    data: Dict,
    is_reported: bool = False
) -> bool:
    """Insert or update one financial statement row.

    The row is upserted into ``financial_statements`` keyed on
    ``(ticker_id, date, period, statement_type, is_reported)``; on conflict
    the stored ``data`` and ``updated_at`` columns are refreshed. The
    transaction is committed on success and rolled back on any error.

    Args:
        conn: Open database connection used for the upsert.
        ticker_id: Identifier of the ticker the statement belongs to.
        statement_type: Value stored in the ``statement_type`` column.
        period: Value stored in the ``period`` column.
        data: Statement payload. It must pass ``validate_statement_data``
            and its date must be formatted as ``YYYY-MM-DD``; the whole
            payload is stored JSON-encoded in the ``data`` column.
        is_reported: Value stored in the ``is_reported`` column.

    Returns:
        True if the row was written and committed; False if the payload was
        invalid or saving failed (the failure is logged).
    """
    if not validate_statement_data(data):
        logging.error(f"Invalid statement data for ticker_id {ticker_id}")
        return False

    # Bound before the try so the finally clause never touches an unassigned
    # name when conn.cursor() itself raises; otherwise an UnboundLocalError
    # would replace the real error and override the False return value.
    cursor = None
    try:
        cursor = conn.cursor()
        # 'date' is preferred; 'fillingDate' is only consulted when the
        # 'date' key is absent from the payload.
        date = datetime.strptime(data.get('date', data.get('fillingDate')), '%Y-%m-%d').date()

        cursor.execute("""
            INSERT INTO financial_statements
            (ticker_id, date, period, statement_type, is_reported, data)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (ticker_id, date, period, statement_type, is_reported)
            DO UPDATE SET
                data = EXCLUDED.data,
                updated_at = CURRENT_TIMESTAMP
            RETURNING id
        """, (
            ticker_id,
            date,
            period,
            statement_type,
            is_reported,
            json.dumps(data)
        ))

        conn.commit()
        return True
    except Exception as e:
        # Undo the partial transaction so the connection stays usable.
        conn.rollback()
        logging.error(f"Error saving financial statement: {e}")
        return False
    finally:
        if cursor is not None:
            cursor.close()
|
|
|
|
class StatementStatus:
    """Track per-ticker statement collection status.

    State is kept in the ``statement_update_status`` table, one row per
    ``ticker_id``. The table is created on construction if it is missing.
    """

    def __init__(self, connection: psycopg2.extensions.connection):
        """Store *connection* and make sure the status table exists.

        Args:
            connection: Open database connection used by every method.
        """
        self.conn = connection
        self._ensure_status_table()

    def _ensure_status_table(self):
        """Ensure status tracking table exists.

        Errors raised while creating the table are rolled back and logged
        rather than re-raised.
        """
        # Bound before the try so the finally clause never touches an
        # unassigned name when self.conn.cursor() itself raises.
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS statement_update_status (
                    ticker_id INTEGER PRIMARY KEY,
                    last_update TIMESTAMP WITH TIME ZONE,
                    status VARCHAR(50),
                    error_message TEXT,
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                )
            """)
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            logging.error(f"Error creating status table: {e}")
        finally:
            if cursor is not None:
                cursor.close()

    def update_status(self, ticker_id: int, status: str, error_message: Optional[str] = None):
        """Update the status for a ticker's statement collection.

        Upserts the row for *ticker_id*, stamping ``last_update`` and
        ``updated_at`` with the database's current timestamp. Errors are
        rolled back and logged rather than re-raised.

        Args:
            ticker_id: Ticker whose status row is written.
            status: Value stored in the ``status`` column.
            error_message: Value stored in the ``error_message`` column;
                None stores NULL.
        """
        # Same guard as above: keep cursor defined for the finally clause.
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute("""
                INSERT INTO statement_update_status
                (ticker_id, last_update, status, error_message)
                VALUES (%s, CURRENT_TIMESTAMP, %s, %s)
                ON CONFLICT (ticker_id) DO UPDATE SET
                    last_update = CURRENT_TIMESTAMP,
                    status = EXCLUDED.status,
                    error_message = EXCLUDED.error_message,
                    updated_at = CURRENT_TIMESTAMP
            """, (ticker_id, status, error_message))
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            logging.error(f"Error updating status: {e}")
        finally:
            if cursor is not None:
                cursor.close()

    def get_last_update(self, ticker_id: int) -> Optional[datetime]:
        """Get the last update timestamp for a ticker.

        Unlike the write methods, this neither catches nor rolls back:
        any error from the connection or query propagates to the caller.

        Args:
            ticker_id: Ticker whose ``last_update`` value is looked up.

        Returns:
            The stored ``last_update`` value, or None when no row exists
            for *ticker_id*.
        """
        # Without this guard a failing self.conn.cursor() would surface as
        # an UnboundLocalError from the finally clause instead of the
        # driver's own exception.
        cursor = None
        try:
            cursor = self.conn.cursor()
            cursor.execute("""
                SELECT last_update
                FROM statement_update_status
                WHERE ticker_id = %s
            """, (ticker_id,))
            result = cursor.fetchone()
            return result[0] if result else None
        finally:
            if cursor is not None:
                cursor.close()
|