RivaCube/utils/economics/utils.py
2025-02-04 19:31:18 +01:00

325 lines
12 KiB
Python

import logging
from datetime import datetime
import psycopg2
from typing import Dict, List, Optional, Union, Any
from psycopg2.extras import execute_values
from psycopg2.pool import SimpleConnectionPool
import os
from dotenv import load_dotenv
class DatabaseManager:
"""Manage database connections and operations."""
def __init__(self):
"""Initialize the database manager."""
load_dotenv()
self.pool = SimpleConnectionPool(
minconn=1,
maxconn=10,
dbname=os.getenv('DB_NAME'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
host=os.getenv('DB_HOST'),
port=os.getenv('DB_PORT'),
connect_timeout=10
)
self.logger = logging.getLogger(__name__)
def get_connection(self) -> Optional[psycopg2.extensions.connection]:
"""Get a database connection from the pool."""
try:
return self.pool.getconn()
except Exception as e:
self.logger.error(f"Error getting database connection: {e}")
return None
def return_connection(self, conn: psycopg2.extensions.connection) -> None:
"""Return a connection to the pool."""
try:
self.pool.putconn(conn)
except Exception as e:
self.logger.error(f"Error returning connection to pool: {e}")
def close_all(self) -> None:
"""Close all connections in the pool."""
try:
if self.pool:
self.pool.closeall()
except Exception as e:
self.logger.error(f"Error closing connection pool: {e}")
def parse_date(date_str: str) -> datetime.date:
"""Parse date string to datetime.date object."""
try:
if ' ' in date_str:
date_str = date_str.split(' ')[0]
return datetime.strptime(date_str, '%Y-%m-%d').date()
except Exception as e:
raise ValueError(f"Invalid date format: {date_str}") from e
def parse_numeric(value: Any) -> float:
"""Parse numeric value, handling various formats."""
if value is None:
return 0.0
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
try:
return float(value.replace(',', '').strip())
except ValueError:
return 0.0
return 0.0
def save_treasury_rates(conn: psycopg2.extensions.connection, rates: List[Dict]) -> None:
"""Save treasury rates with batch processing."""
try:
cursor = conn.cursor()
processed_dates = set()
values = []
for rate_data in rates:
try:
date_str = rate_data.get('date', '')
if not date_str or date_str in processed_dates:
continue
date = parse_date(date_str)
for field, value in rate_data.items():
if field not in ['date', 'id'] and value is not None:
rate_value = parse_numeric(value)
if rate_value != 0.0:
values.append((date, field, rate_value))
processed_dates.add(date_str)
except Exception as e:
logging.warning(f"Skipping invalid treasury rate data: {e}, Data: {rate_data}")
continue
if values:
execute_values(cursor, """
INSERT INTO treasury_rates (date, maturity, rate)
VALUES %s
ON CONFLICT (date, maturity)
DO UPDATE SET
rate = EXCLUDED.rate,
last_updated = CURRENT_TIMESTAMP
WHERE treasury_rates.rate IS DISTINCT FROM EXCLUDED.rate
""", values)
conn.commit()
logging.info(f"Saved {len(values)} treasury rates")
else:
logging.info("No new treasury rates to save")
except Exception as e:
conn.rollback()
logging.error(f"Error saving treasury rates: {e}")
finally:
if cursor:
cursor.close()
def save_economic_indicator(
conn: psycopg2.extensions.connection,
indicator_name: str,
data: Union[List[Dict], Dict]
) -> None:
"""Save economic indicators with batch processing."""
try:
cursor = conn.cursor()
processed_dates = set()
values = []
# Convert single dict to list
if isinstance(data, dict):
data = [data]
for item in data:
try:
date_str = item.get('date', '')
if not date_str or date_str in processed_dates:
continue
date = parse_date(date_str)
value = parse_numeric(item.get('value', 0))
if value != 0.0:
values.append((
date,
indicator_name,
value,
str(item.get('unit', '')),
str(item.get('frequency', '')),
str(item.get('country', 'US'))
))
processed_dates.add(date_str)
except Exception as e:
logging.warning(f"Skipping invalid indicator data: {e}, Data: {item}")
continue
if values:
execute_values(cursor, """
INSERT INTO economic_indicators
(date, indicator_name, value, unit, frequency, country)
VALUES %s
ON CONFLICT (date, indicator_name)
DO UPDATE SET
value = EXCLUDED.value,
unit = EXCLUDED.unit,
frequency = EXCLUDED.frequency,
country = EXCLUDED.country,
last_updated = CURRENT_TIMESTAMP
WHERE
economic_indicators.value IS DISTINCT FROM EXCLUDED.value
OR economic_indicators.unit IS DISTINCT FROM EXCLUDED.unit
OR economic_indicators.frequency IS DISTINCT FROM EXCLUDED.frequency
OR economic_indicators.country IS DISTINCT FROM EXCLUDED.country
""", values)
conn.commit()
logging.info(f"Saved {len(values)} {indicator_name} indicators")
else:
logging.info(f"No new data to save for {indicator_name}")
except Exception as e:
conn.rollback()
logging.error(f"Error saving economic indicator: {e}")
finally:
if cursor:
cursor.close()
def save_economic_calendar(conn: psycopg2.extensions.connection, events: List[Dict]) -> None:
"""Save economic calendar events with batch processing."""
try:
cursor = conn.cursor()
processed_events = set()
values = []
for event in events:
try:
date_str = event.get('date', '').split(' ')[0]
if not date_str:
continue
event_name = str(event.get('event', ''))
country = str(event.get('country', 'US'))
# Create unique key for event
event_key = (date_str, event_name, country)
if event_key in processed_events:
continue
date = parse_date(date_str)
actual = str(event.get('actual', ''))
previous = str(event.get('previous', ''))
values.append((
date,
event_name,
str(event.get('impact', '')),
actual if actual != 'None' else '',
previous if previous != 'None' else '',
country
))
processed_events.add(event_key)
except Exception as e:
logging.warning(f"Skipping invalid calendar event: {e}, Data: {event}")
continue
if values:
execute_values(cursor, """
INSERT INTO economic_calendar
(date, event, impact, actual, previous, country)
VALUES %s
ON CONFLICT (date, event, country)
DO UPDATE SET
impact = EXCLUDED.impact,
actual = EXCLUDED.actual,
previous = EXCLUDED.previous,
last_updated = CURRENT_TIMESTAMP
WHERE
economic_calendar.impact IS DISTINCT FROM EXCLUDED.impact
OR economic_calendar.actual IS DISTINCT FROM EXCLUDED.actual
OR economic_calendar.previous IS DISTINCT FROM EXCLUDED.previous
""", values)
conn.commit()
logging.info(f"Saved {len(values)} calendar events")
else:
logging.info("No new calendar events to save")
except Exception as e:
conn.rollback()
logging.error(f"Error saving economic calendar: {e}")
finally:
if cursor:
cursor.close()
def save_market_risk_premium(conn: psycopg2.extensions.connection, data: List[Dict]) -> None:
"""Save market risk premium data with batch processing."""
try:
cursor = conn.cursor()
values = []
current_date = datetime.now().date()
processed_countries = set()
for item in data:
try:
country = str(item.get('country', ''))
if not country or country in processed_countries:
continue
# Convert string values to float, handling None values
country_risk_premium = item.get('countryRiskPremium')
total_equity_risk_premium = item.get('totalEquityRiskPremium')
# Skip countries with no valid premium data
if country_risk_premium is None and total_equity_risk_premium is None:
continue
try:
crp = float(country_risk_premium) if country_risk_premium is not None else 0.0
terp = float(total_equity_risk_premium) if total_equity_risk_premium is not None else 0.0
except (ValueError, TypeError):
continue
if crp != 0.0 or terp != 0.0:
values.append((current_date, country, crp, terp))
processed_countries.add(country)
except Exception as e:
logging.warning(f"Skipping invalid risk premium data for {country}: {e}")
continue
if values:
execute_values(cursor, """
INSERT INTO market_risk_premium
(date, country, country_risk_premium, total_equity_risk_premium)
VALUES %s
ON CONFLICT (date, country)
DO UPDATE SET
country_risk_premium = EXCLUDED.country_risk_premium,
total_equity_risk_premium = EXCLUDED.total_equity_risk_premium,
last_updated = CURRENT_TIMESTAMP
WHERE
market_risk_premium.country_risk_premium IS DISTINCT FROM EXCLUDED.country_risk_premium
OR market_risk_premium.total_equity_risk_premium IS DISTINCT FROM EXCLUDED.total_equity_risk_premium
""", values)
conn.commit()
logging.info(f"Saved {len(values)} market risk premium records")
else:
logging.info("No valid market risk premium data to save")
except Exception as e:
conn.rollback()
logging.error(f"Error saving market risk premium: {e}")
finally:
if cursor:
cursor.close()