# RivaCube/utils/Stocks/valuations/utils.py

import os
import logging
import json
from datetime import datetime, date
import psycopg2
from typing import Any, Dict, List, Optional
from decimal import Decimal, InvalidOperation


def setup_logging():
    """Set up logging configuration."""
    log_dir = 'logs'
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f'valuations_update_{datetime.now():%Y%m%d_%H%M%S}.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )


def init_database(conn: psycopg2.extensions.connection) -> None:
    """Initialize database tables if they don't exist."""
    cursor = conn.cursor()
    try:
        # Create dcf_valuations table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS dcf_valuations (
                id SERIAL PRIMARY KEY,
                ticker_id INTEGER NOT NULL,
                date DATE NOT NULL,
                dcf_type VARCHAR(50) NOT NULL,
                stock_price DECIMAL(15, 4),
                dcf_price DECIMAL(15, 4),
                fair_value DECIMAL(15, 4),
                upside_downside DECIMAL(15, 4),
                assumptions JSONB,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE (ticker_id, date, dcf_type)
            );
            CREATE INDEX IF NOT EXISTS idx_dcf_valuations_ticker_date
                ON dcf_valuations (ticker_id, date);
        """)
        # Create company_ratings table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS company_ratings (
                id SERIAL PRIMARY KEY,
                ticker_id INTEGER NOT NULL,
                date DATE NOT NULL,
                rating VARCHAR(50),
                rating_score DECIMAL(10, 4),
                rating_recommendation VARCHAR(50),
                rating_details JSONB,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE (ticker_id, date)
            );
            CREATE INDEX IF NOT EXISTS idx_company_ratings_ticker_date
                ON company_ratings (ticker_id, date);
        """)
        conn.commit()
        logging.info("Database tables initialized successfully")
    except Exception as e:
        conn.rollback()
        logging.error(f"Error initializing database tables: {e}")
        raise
    finally:
        cursor.close()


def parse_date(date_str: Optional[str]) -> date:
    """Parse a date string, trying several common formats.

    Falls back to today's date if the input is missing or unparseable.
    """
    if not date_str:
        return datetime.now().date()
    date_formats = [
        '%Y-%m-%d',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M:%S.%fZ',
        '%Y-%m-%dT%H:%M:%SZ'
    ]
    for date_format in date_formats:
        try:
            return datetime.strptime(date_str, date_format).date()
        except ValueError:
            continue
    return datetime.now().date()
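
# Illustrative behaviour of parse_date (a sketch of expected outputs, not
# executed here; the "today" fallback depends on when it is called):
#   parse_date('2024-03-15')            -> date(2024, 3, 15)
#   parse_date('2024-03-15T10:30:00Z')  -> date(2024, 3, 15)
#   parse_date(None)                    -> datetime.now().date()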


def convert_to_decimal(value: Any) -> Optional[Decimal]:
    """Convert value to Decimal with proper handling of various input types."""
    if value is None:
        return None
    try:
        if isinstance(value, (int, float)):
            return Decimal(str(value))
        elif isinstance(value, str):
            # Strip thousands separators before parsing, e.g. '1,234.56'
            return Decimal(value.replace(',', ''))
        return None
    except (InvalidOperation, TypeError):
        return None
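
# Illustrative behaviour of convert_to_decimal (a sketch of expected outputs):
#   convert_to_decimal(12.5)        -> Decimal('12.5')
#   convert_to_decimal('1,234.56')  -> Decimal('1234.56')
#   convert_to_decimal('n/a')       -> None
#   convert_to_decimal(None)        -> None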


def save_dcf_valuation(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    dcf_type: str,
    valuation: Dict
) -> None:
    """Save DCF valuation with proper data validation and error handling."""
    cursor = conn.cursor()
    try:
        date_value = parse_date(valuation.get('date'))
        # Convert numeric values to Decimal
        stock_price = convert_to_decimal(valuation.get('stockPrice'))
        dcf_price = convert_to_decimal(valuation.get('dcfValue', valuation.get('dcfPrice')))
        fair_value = convert_to_decimal(valuation.get('fairValue'))
        upside_downside = convert_to_decimal(valuation.get('upsideDownside'))
        # Validate required fields
        if not all([date_value, dcf_type]):
            logging.error(f"Missing required fields for DCF valuation: ticker_id={ticker_id}")
            return
        cursor.execute("""
            INSERT INTO dcf_valuations
                (ticker_id, date, dcf_type, stock_price, dcf_price, fair_value,
                 upside_downside, assumptions)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (ticker_id, date, dcf_type)
            DO UPDATE SET
                stock_price = EXCLUDED.stock_price,
                dcf_price = EXCLUDED.dcf_price,
                fair_value = EXCLUDED.fair_value,
                upside_downside = EXCLUDED.upside_downside,
                assumptions = EXCLUDED.assumptions,
                updated_at = CURRENT_TIMESTAMP
        """, (
            ticker_id,
            date_value,
            dcf_type,
            stock_price,
            dcf_price,
            fair_value,
            upside_downside,
            json.dumps(valuation.get('assumptions', {}))
        ))
        conn.commit()
        logging.debug(f"Saved {dcf_type} DCF valuation for {ticker_id} on {date_value}")
    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving DCF valuation: {e}")
        logging.debug(f"Failed valuation data: {valuation}")
    finally:
        cursor.close()


def save_company_rating(
    conn: psycopg2.extensions.connection,
    ticker_id: int,
    rating_data: Dict
) -> None:
    """Save company rating with proper data validation and error handling."""
    cursor = conn.cursor()
    try:
        date_value = parse_date(rating_data.get('date'))
        # Validate required fields
        if not all([date_value, rating_data.get('rating')]):
            logging.error(f"Missing required fields for company rating: ticker_id={ticker_id}")
            return
        cursor.execute("""
            INSERT INTO company_ratings
                (ticker_id, date, rating, rating_score, rating_recommendation, rating_details)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (ticker_id, date)
            DO UPDATE SET
                rating = EXCLUDED.rating,
                rating_score = EXCLUDED.rating_score,
                rating_recommendation = EXCLUDED.rating_recommendation,
                rating_details = EXCLUDED.rating_details,
                updated_at = CURRENT_TIMESTAMP
        """, (
            ticker_id,
            date_value,
            rating_data.get('rating'),
            convert_to_decimal(rating_data.get('ratingScore')),
            rating_data.get('recommendation'),
            json.dumps(rating_data)
        ))
        conn.commit()
        logging.debug(f"Saved company rating for {ticker_id} on {date_value}")
    except Exception as e:
        conn.rollback()
        logging.error(f"Error saving company rating: {e}")
        logging.debug(f"Failed rating data: {rating_data}")
    finally:
        cursor.close()
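

# Minimal usage sketch, assuming a local PostgreSQL instance and an upstream
# API payload with camelCase keys (matching the .get() calls above). The
# connection settings and sample valuation dict are hypothetical and only
# illustrate how these helpers fit together.
if __name__ == "__main__":
    setup_logging()
    conn = psycopg2.connect(
        host="localhost",          # hypothetical connection settings
        dbname="rivacube",
        user="postgres",
        password=os.environ.get("PGPASSWORD", ""),
    )
    try:
        init_database(conn)
        sample_valuation = {       # hypothetical payload shape
            "date": "2024-03-15",
            "stockPrice": 187.42,
            "dcfValue": 201.10,
            "assumptions": {"wacc": 0.085, "terminal_growth": 0.025},
        }
        save_dcf_valuation(conn, ticker_id=1, dcf_type="standard", valuation=sample_valuation)
    finally:
        conn.close()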