325 lines
12 KiB
Python
325 lines
12 KiB
Python
import logging
|
|
from datetime import datetime
|
|
import psycopg2
|
|
from typing import Dict, List, Optional, Union, Any
|
|
from psycopg2.extras import execute_values
|
|
from psycopg2.pool import SimpleConnectionPool
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
class DatabaseManager:
    """Manage database connections and operations."""

    def __init__(self):
        """Initialize the database manager.

        Loads connection settings from the environment (via ``.env``) and
        opens a pool of 1–10 PostgreSQL connections.
        """
        load_dotenv()

        # Credentials and endpoint all come from environment variables.
        self.pool = SimpleConnectionPool(
            minconn=1,
            maxconn=10,
            dbname=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
            connect_timeout=10,
        )

        self.logger = logging.getLogger(__name__)

    def get_connection(self) -> Optional[psycopg2.extensions.connection]:
        """Check a connection out of the pool; return None on failure."""
        try:
            return self.pool.getconn()
        except Exception as exc:
            self.logger.error(f"Error getting database connection: {exc}")
            return None

    def return_connection(self, conn: psycopg2.extensions.connection) -> None:
        """Hand a connection back to the pool (best effort, logs on failure)."""
        try:
            self.pool.putconn(conn)
        except Exception as exc:
            self.logger.error(f"Error returning connection to pool: {exc}")

    def close_all(self) -> None:
        """Close every connection in the pool (best effort, logs on failure)."""
        try:
            if self.pool:
                self.pool.closeall()
        except Exception as exc:
            self.logger.error(f"Error closing connection pool: {exc}")
|
|
|
def parse_date(date_str: str) -> datetime.date:
    """Parse a 'YYYY-MM-DD' string (optionally followed by a time) into a date.

    Raises:
        ValueError: if the string cannot be parsed as a date.
    """
    try:
        # Drop any trailing time component: "2024-01-02 00:00:00" -> "2024-01-02".
        day_part, _, _ = date_str.partition(' ')
        return datetime.strptime(day_part, '%Y-%m-%d').date()
    except Exception as exc:
        raise ValueError(f"Invalid date format: {date_str}") from exc
|
|
|
|
def parse_numeric(value: Any) -> float:
    """Coerce *value* to a float, returning 0.0 for missing or bad input.

    Strings may carry thousands separators (commas) and surrounding
    whitespace; None and unrecognized types map to 0.0.
    """
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        cleaned = value.replace(',', '').strip()
        try:
            return float(cleaned)
        except ValueError:
            return 0.0
    # None and every other type fall through to the default.
    return 0.0
|
|
|
|
def save_treasury_rates(conn: psycopg2.extensions.connection, rates: List[Dict]) -> None:
    """Save treasury rates with batch processing.

    Each entry in *rates* is a mapping with a 'date' key plus one key per
    maturity. Rows are upserted into ``treasury_rates``; an existing row is
    only rewritten when its stored rate actually differs.

    Errors are logged and the transaction rolled back; nothing is raised.
    """
    # Bug fix: initialize before the try so the finally block cannot hit an
    # UnboundLocalError if conn.cursor() itself raises.
    cursor = None
    try:
        cursor = conn.cursor()
        processed_dates = set()
        values = []

        for rate_data in rates:
            try:
                date_str = rate_data.get('date', '')
                # Skip empty dates and duplicate dates within this batch.
                if not date_str or date_str in processed_dates:
                    continue

                date = parse_date(date_str)

                # Every non-bookkeeping field is treated as a maturity column;
                # zero rates are treated as "no data" and skipped.
                for field, value in rate_data.items():
                    if field not in ['date', 'id'] and value is not None:
                        rate_value = parse_numeric(value)
                        if rate_value != 0.0:
                            values.append((date, field, rate_value))

                processed_dates.add(date_str)

            except Exception as e:
                logging.warning(f"Skipping invalid treasury rate data: {e}, Data: {rate_data}")
                continue

        if values:
            execute_values(cursor, """
                INSERT INTO treasury_rates (date, maturity, rate)
                VALUES %s
                ON CONFLICT (date, maturity)
                DO UPDATE SET
                    rate = EXCLUDED.rate,
                    last_updated = CURRENT_TIMESTAMP
                WHERE treasury_rates.rate IS DISTINCT FROM EXCLUDED.rate
            """, values)

            conn.commit()
            logging.info(f"Saved {len(values)} treasury rates")
        else:
            logging.info("No new treasury rates to save")

    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving treasury rates: {e}")
    finally:
        if cursor:
            cursor.close()
|
|
|
|
def save_economic_indicator(
    conn: psycopg2.extensions.connection,
    indicator_name: str,
    data: Union[List[Dict], Dict]
) -> None:
    """Save economic indicators with batch processing.

    Accepts either a single observation dict or a list of them. Rows are
    upserted into ``economic_indicators``; an existing row is only rewritten
    when one of its stored columns actually differs.

    Errors are logged and the transaction rolled back; nothing is raised.
    """
    # Bug fix: initialize before the try so the finally block cannot hit an
    # UnboundLocalError if conn.cursor() itself raises.
    cursor = None
    try:
        cursor = conn.cursor()
        processed_dates = set()
        values = []

        # Normalize: a single observation becomes a one-element batch.
        if isinstance(data, dict):
            data = [data]

        for item in data:
            try:
                date_str = item.get('date', '')
                # Skip empty dates and duplicate dates within this batch.
                if not date_str or date_str in processed_dates:
                    continue

                date = parse_date(date_str)
                value = parse_numeric(item.get('value', 0))

                # Zero values are treated as "no data" and skipped.
                if value != 0.0:
                    values.append((
                        date,
                        indicator_name,
                        value,
                        str(item.get('unit', '')),
                        str(item.get('frequency', '')),
                        str(item.get('country', 'US'))
                    ))
                processed_dates.add(date_str)

            except Exception as e:
                logging.warning(f"Skipping invalid indicator data: {e}, Data: {item}")
                continue

        if values:
            execute_values(cursor, """
                INSERT INTO economic_indicators
                    (date, indicator_name, value, unit, frequency, country)
                VALUES %s
                ON CONFLICT (date, indicator_name)
                DO UPDATE SET
                    value = EXCLUDED.value,
                    unit = EXCLUDED.unit,
                    frequency = EXCLUDED.frequency,
                    country = EXCLUDED.country,
                    last_updated = CURRENT_TIMESTAMP
                WHERE
                    economic_indicators.value IS DISTINCT FROM EXCLUDED.value
                    OR economic_indicators.unit IS DISTINCT FROM EXCLUDED.unit
                    OR economic_indicators.frequency IS DISTINCT FROM EXCLUDED.frequency
                    OR economic_indicators.country IS DISTINCT FROM EXCLUDED.country
            """, values)

            conn.commit()
            logging.info(f"Saved {len(values)} {indicator_name} indicators")
        else:
            logging.info(f"No new data to save for {indicator_name}")

    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving economic indicator: {e}")
    finally:
        if cursor:
            cursor.close()
|
|
|
|
def save_economic_calendar(conn: psycopg2.extensions.connection, events: List[Dict]) -> None:
    """Save economic calendar events with batch processing.

    Each event is a mapping with at least a 'date'; rows are upserted into
    ``economic_calendar`` keyed by (date, event, country), and an existing
    row is only rewritten when one of its stored columns actually differs.

    Errors are logged and the transaction rolled back; nothing is raised.
    """
    # Bug fix: initialize before the try so the finally block cannot hit an
    # UnboundLocalError if conn.cursor() itself raises.
    cursor = None
    try:
        cursor = conn.cursor()
        processed_events = set()
        values = []

        for event in events:
            try:
                # Keep only the day portion of a "YYYY-MM-DD HH:MM:SS" stamp.
                date_str = event.get('date', '').split(' ')[0]
                if not date_str:
                    continue

                event_name = str(event.get('event', ''))
                country = str(event.get('country', 'US'))

                # (date, event, country) uniquely identifies an event; skip
                # duplicates within this batch.
                event_key = (date_str, event_name, country)
                if event_key in processed_events:
                    continue

                date = parse_date(date_str)
                actual = str(event.get('actual', ''))
                previous = str(event.get('previous', ''))

                values.append((
                    date,
                    event_name,
                    str(event.get('impact', '')),
                    # str(None) yields the literal 'None'; store '' instead.
                    actual if actual != 'None' else '',
                    previous if previous != 'None' else '',
                    country
                ))
                processed_events.add(event_key)

            except Exception as e:
                logging.warning(f"Skipping invalid calendar event: {e}, Data: {event}")
                continue

        if values:
            execute_values(cursor, """
                INSERT INTO economic_calendar
                    (date, event, impact, actual, previous, country)
                VALUES %s
                ON CONFLICT (date, event, country)
                DO UPDATE SET
                    impact = EXCLUDED.impact,
                    actual = EXCLUDED.actual,
                    previous = EXCLUDED.previous,
                    last_updated = CURRENT_TIMESTAMP
                WHERE
                    economic_calendar.impact IS DISTINCT FROM EXCLUDED.impact
                    OR economic_calendar.actual IS DISTINCT FROM EXCLUDED.actual
                    OR economic_calendar.previous IS DISTINCT FROM EXCLUDED.previous
            """, values)

            conn.commit()
            logging.info(f"Saved {len(values)} calendar events")
        else:
            logging.info("No new calendar events to save")

    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving economic calendar: {e}")
    finally:
        if cursor:
            cursor.close()
|
|
|
|
def save_market_risk_premium(conn: psycopg2.extensions.connection, data: List[Dict]) -> None:
    """Save market risk premium data with batch processing.

    Each entry carries a 'country' plus optional 'countryRiskPremium' and
    'totalEquityRiskPremium'. Rows are upserted into ``market_risk_premium``
    for today's date; an existing row is only rewritten when one of its
    stored premiums actually differs.

    Errors are logged and the transaction rolled back; nothing is raised.
    """
    # Bug fix: initialize before the try so the finally block cannot hit an
    # UnboundLocalError if conn.cursor() itself raises.
    cursor = None
    try:
        cursor = conn.cursor()
        values = []
        current_date = datetime.now().date()
        processed_countries = set()

        for item in data:
            # Bug fix: pre-bind so the except handler's log message cannot
            # raise NameError when item.get() itself fails (e.g. non-dict item).
            country = ''
            try:
                country = str(item.get('country', ''))
                # Skip empty names and duplicate countries within this batch.
                if not country or country in processed_countries:
                    continue

                country_risk_premium = item.get('countryRiskPremium')
                total_equity_risk_premium = item.get('totalEquityRiskPremium')

                # Skip countries with no premium data at all.
                if country_risk_premium is None and total_equity_risk_premium is None:
                    continue

                try:
                    crp = float(country_risk_premium) if country_risk_premium is not None else 0.0
                    terp = float(total_equity_risk_premium) if total_equity_risk_premium is not None else 0.0
                except (ValueError, TypeError):
                    continue

                # At least one premium must be non-zero to be worth storing.
                if crp != 0.0 or terp != 0.0:
                    values.append((current_date, country, crp, terp))
                    processed_countries.add(country)

            except Exception as e:
                logging.warning(f"Skipping invalid risk premium data for {country}: {e}")
                continue

        if values:
            execute_values(cursor, """
                INSERT INTO market_risk_premium
                    (date, country, country_risk_premium, total_equity_risk_premium)
                VALUES %s
                ON CONFLICT (date, country)
                DO UPDATE SET
                    country_risk_premium = EXCLUDED.country_risk_premium,
                    total_equity_risk_premium = EXCLUDED.total_equity_risk_premium,
                    last_updated = CURRENT_TIMESTAMP
                WHERE
                    market_risk_premium.country_risk_premium IS DISTINCT FROM EXCLUDED.country_risk_premium
                    OR market_risk_premium.total_equity_risk_premium IS DISTINCT FROM EXCLUDED.total_equity_risk_premium
            """, values)

            conn.commit()
            logging.info(f"Saved {len(values)} market risk premium records")
        else:
            logging.info("No valid market risk premium data to save")

    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving market risk premium: {e}")
    finally:
        if cursor:
            cursor.close()