RivaCube/utils/Stocks/financials/statements_utils.py
2025-02-04 19:31:18 +01:00

116 lines
4.0 KiB
Python

import logging
import json
from datetime import datetime
import psycopg2
from typing import Dict, Optional
def validate_statement_data(data: Dict) -> bool:
"""Validate that required fields exist in financial statement data."""
required_fields = ['date', 'fillingDate', 'period']
return all(field in data for field in required_fields)
def save_financial_statement(
conn: psycopg2.extensions.connection,
ticker_id: int,
statement_type: str,
period: str,
data: Dict,
is_reported: bool = False
) -> bool:
"""Save financial statement data to database."""
if not validate_statement_data(data):
logging.error(f"Invalid statement data for ticker_id {ticker_id}")
return False
try:
cursor = conn.cursor()
date = datetime.strptime(data.get('date', data.get('fillingDate')), '%Y-%m-%d').date()
cursor.execute("""
INSERT INTO financial_statements
(ticker_id, date, period, statement_type, is_reported, data)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (ticker_id, date, period, statement_type, is_reported)
DO UPDATE SET
data = EXCLUDED.data,
updated_at = CURRENT_TIMESTAMP
RETURNING id
""", (
ticker_id,
date,
period,
statement_type,
is_reported,
json.dumps(data)
))
conn.commit()
return True
except Exception as e:
conn.rollback()
logging.error(f"Error saving financial statement: {e}")
return False
finally:
cursor.close()
class StatementStatus:
def __init__(self, connection: psycopg2.extensions.connection):
self.conn = connection
self._ensure_status_table()
def _ensure_status_table(self):
"""Ensure status tracking table exists."""
try:
cursor = self.conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS statement_update_status (
ticker_id INTEGER PRIMARY KEY,
last_update TIMESTAMP WITH TIME ZONE,
status VARCHAR(50),
error_message TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
)
""")
self.conn.commit()
except Exception as e:
self.conn.rollback()
logging.error(f"Error creating status table: {e}")
finally:
cursor.close()
def update_status(self, ticker_id: int, status: str, error_message: Optional[str] = None):
"""Update the status for a ticker's statement collection."""
try:
cursor = self.conn.cursor()
cursor.execute("""
INSERT INTO statement_update_status
(ticker_id, last_update, status, error_message)
VALUES (%s, CURRENT_TIMESTAMP, %s, %s)
ON CONFLICT (ticker_id) DO UPDATE SET
last_update = CURRENT_TIMESTAMP,
status = EXCLUDED.status,
error_message = EXCLUDED.error_message,
updated_at = CURRENT_TIMESTAMP
""", (ticker_id, status, error_message))
self.conn.commit()
except Exception as e:
self.conn.rollback()
logging.error(f"Error updating status: {e}")
finally:
cursor.close()
def get_last_update(self, ticker_id: int) -> Optional[datetime]:
"""Get the last update timestamp for a ticker."""
try:
cursor = self.conn.cursor()
cursor.execute("""
SELECT last_update
FROM statement_update_status
WHERE ticker_id = %s
""", (ticker_id,))
result = cursor.fetchone()
return result[0] if result else None
finally:
cursor.close()