import logging
from datetime import date
from typing import Any, Dict, Optional

import psycopg2


def reset_and_setup_database(conn: psycopg2.extensions.connection) -> bool:
    """Reset and set up the database with the proper schema."""
    cursor = None
    try:
        cursor = conn.cursor()

        # Drop existing tables in dependency order (prices references ref)
        cursor.execute("""
            DROP TABLE IF EXISTS market_indexes_prices CASCADE;
            DROP TABLE IF EXISTS market_indexes_ref CASCADE;
        """)
        conn.commit()
        logging.info("Existing tables dropped successfully")

        # Create tables with proper schema
        cursor.execute("""
            CREATE TABLE market_indexes_ref (
                id SERIAL PRIMARY KEY,
                symbol TEXT UNIQUE NOT NULL,
                description TEXT,
                category TEXT,
                last_update TIMESTAMP WITH TIME ZONE,
                is_active BOOLEAN DEFAULT true,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE market_indexes_prices (
                index_id INTEGER REFERENCES market_indexes_ref(id),
                date DATE NOT NULL,
                open NUMERIC(19,4) NOT NULL,
                high NUMERIC(19,4) NOT NULL,
                low NUMERIC(19,4) NOT NULL,
                close NUMERIC(19,4) NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (index_id, date)
            );

            CREATE INDEX idx_market_indexes_prices_date
                ON market_indexes_prices(date);
            CREATE INDEX idx_market_indexes_prices_index_date
                ON market_indexes_prices(index_id, date DESC);
        """)

        # Create trigger to keep updated_at current on row updates
        cursor.execute("""
            CREATE OR REPLACE FUNCTION update_updated_at_column()
            RETURNS TRIGGER AS $$
            BEGIN
                NEW.updated_at = CURRENT_TIMESTAMP;
                RETURN NEW;
            END;
            $$ LANGUAGE plpgsql;

            CREATE TRIGGER update_market_indexes_ref_updated_at
                BEFORE UPDATE ON market_indexes_ref
                FOR EACH ROW
                EXECUTE FUNCTION update_updated_at_column();
        """)
        conn.commit()
        logging.info("Database reset and setup completed successfully")
        return True
    except Exception as e:
        logging.error(f"Error in reset_and_setup_database: {e}")
        if conn:
            conn.rollback()
        return False
    finally:
        if cursor:
            cursor.close()


def get_last_price_date(conn: psycopg2.extensions.connection, index_id: int) -> Optional[date]:
    """Get the last available price date for an index."""
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT MAX(date)
            FROM market_indexes_prices
            WHERE index_id = %s
        """, (index_id,))
        result = cursor.fetchone()
        return result[0] if result and result[0] else None
    except Exception as e:
        logging.error(f"Error getting last price date for index {index_id}: {e}")
        return None
    finally:
        if cursor:
            cursor.close()


def get_index_statistics(conn: psycopg2.extensions.connection) -> Dict[str, Any]:
    """Get detailed statistics about index data."""
    cursor = None
    try:
        cursor = conn.cursor()
        stats = {}

        # Get index counts by category
        cursor.execute("""
            SELECT category,
                   COUNT(*) as total,
                   COUNT(CASE WHEN is_active THEN 1 END) as active
            FROM market_indexes_ref
            GROUP BY category
            ORDER BY category
        """)
        stats['categories'] = [
            {'category': row[0] or 'Uncategorized', 'total': row[1], 'active': row[2]}
            for row in cursor.fetchall()
        ]

        # Get price statistics
        cursor.execute("""
            SELECT COUNT(*) as total_prices,
                   COUNT(DISTINCT index_id) as indexes_with_prices,
                   MIN(date) as earliest_date,
                   MAX(date) as latest_date
            FROM market_indexes_prices
        """)
        row = cursor.fetchone()
        if row:
            stats.update({
                'total_prices': row[0],
                'indexes_with_prices': row[1],
                'earliest_date': row[2],
                'latest_date': row[3]
            })

        # Get data coverage by index. COUNT(mip.index_id) rather than
        # COUNT(*) so indexes with no prices report 0 records instead of 1
        # (the LEFT JOIN produces one all-NULL row for them).
        cursor.execute("""
            SELECT mir.symbol,
                   mir.category,
                   MIN(mip.date) as first_date,
                   MAX(mip.date) as last_date,
                   COUNT(mip.index_id) as records,
                   mir.is_active
            FROM market_indexes_ref mir
            LEFT JOIN market_indexes_prices mip ON mir.id = mip.index_id
            GROUP BY mir.id, mir.symbol, mir.category, mir.is_active
            ORDER BY mir.category, mir.symbol
        """)
        stats['coverage'] = [
            {
                'symbol': row[0],
                'category': row[1],
                'first_date': row[2],
                'last_date': row[3],
                'records': row[4],
                'is_active': row[5]
            }
            for row in cursor.fetchall()
        ]

        return stats
    except Exception as e:
        logging.error(f"Error getting index statistics: {e}")
        return {}
    finally:
        if cursor:
            cursor.close()


def verify_data_integrity(conn: psycopg2.extensions.connection) -> Dict[str, Any]:
    """Verify data integrity and return any issues found."""
    cursor = None
    try:
        cursor = conn.cursor()
        # 'gaps' is reserved for a future date-gap check; not populated here.
        issues = {
            'duplicate_dates': [],
            'gaps': [],
            'anomalies': []
        }

        # Check for duplicate dates per symbol. The composite primary key on
        # (index_id, date) should already prevent these; kept as a sanity check.
        cursor.execute("""
            SELECT mir.symbol, mip.date, COUNT(*)
            FROM market_indexes_prices mip
            JOIN market_indexes_ref mir ON mir.id = mip.index_id
            GROUP BY mir.symbol, mip.date
            HAVING COUNT(*) > 1
        """)
        issues['duplicate_dates'] = [
            {'symbol': row[0], 'date': row[1], 'count': row[2]}
            for row in cursor.fetchall()
        ]

        # Check for price anomalies (zero or negative prices)
        cursor.execute("""
            SELECT mir.symbol, mip.date, mip.close
            FROM market_indexes_prices mip
            JOIN market_indexes_ref mir ON mir.id = mip.index_id
            WHERE mip.open <= 0 OR mip.high <= 0 OR mip.low <= 0 OR mip.close <= 0
            ORDER BY mir.symbol, mip.date
        """)
        issues['anomalies'] = [
            {'symbol': row[0], 'date': row[1], 'close': row[2]}
            for row in cursor.fetchall()
        ]

        return issues
    except Exception as e:
        logging.error(f"Error verifying data integrity: {e}")
        return {}
    finally:
        if cursor:
            cursor.close()
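

# --- Usage sketch (not part of the original module) ---
# A minimal example of wiring the helpers above together: reset the schema,
# pull statistics, then run the integrity checks. The connection string is
# hypothetical; substitute your own host/dbname/user/password.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Assumed DSN for illustration only.
    conn = psycopg2.connect("dbname=market_data user=postgres")
    try:
        if reset_and_setup_database(conn):
            stats = get_index_statistics(conn)
            logging.info("Categories: %s", stats.get('categories'))
            logging.info("Total prices: %s", stats.get('total_prices'))

            issues = verify_data_integrity(conn)
            if issues.get('duplicate_dates') or issues.get('anomalies'):
                logging.warning("Integrity issues found: %s", issues)
    finally:
        conn.close()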