# RivaCube/utils/Indexes/utils.py
import logging
from datetime import date, timedelta
from typing import Any, Dict, Optional

import psycopg2


def reset_and_setup_database(conn: psycopg2.extensions.connection) -> bool:
    """Reset and set up the database with the proper schema."""
    cursor = None
    try:
        cursor = conn.cursor()
        # Drop existing tables in dependency order (prices references ref).
        cursor.execute("""
            DROP TABLE IF EXISTS market_indexes_prices CASCADE;
            DROP TABLE IF EXISTS market_indexes_ref CASCADE;
        """)
        conn.commit()
        logging.info("Existing tables dropped successfully")
        # Create tables with the proper schema.
        cursor.execute("""
            CREATE TABLE market_indexes_ref (
                id SERIAL PRIMARY KEY,
                symbol TEXT UNIQUE NOT NULL,
                description TEXT,
                category TEXT,
                last_update TIMESTAMP WITH TIME ZONE,
                is_active BOOLEAN DEFAULT true,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE market_indexes_prices (
                index_id INTEGER REFERENCES market_indexes_ref(id),
                date DATE NOT NULL,
                open NUMERIC(19,4) NOT NULL,
                high NUMERIC(19,4) NOT NULL,
                low NUMERIC(19,4) NOT NULL,
                close NUMERIC(19,4) NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (index_id, date)
            );
            CREATE INDEX idx_market_indexes_prices_date
                ON market_indexes_prices(date);
            CREATE INDEX idx_market_indexes_prices_index_date
                ON market_indexes_prices(index_id, date DESC);
        """)
        # Keep updated_at current on every UPDATE of the reference table.
        cursor.execute("""
            CREATE OR REPLACE FUNCTION update_updated_at_column()
            RETURNS TRIGGER AS $$
            BEGIN
                NEW.updated_at = CURRENT_TIMESTAMP;
                RETURN NEW;
            END;
            $$ LANGUAGE plpgsql;
            CREATE TRIGGER update_market_indexes_ref_updated_at
                BEFORE UPDATE ON market_indexes_ref
                FOR EACH ROW
                EXECUTE FUNCTION update_updated_at_column();
        """)
        conn.commit()
        logging.info("Database reset and setup completed successfully")
        return True
    except Exception as e:
        logging.error(f"Error in reset_and_setup_database: {e}")
        conn.rollback()
        return False
    finally:
        if cursor:
            cursor.close()
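

# A minimal usage sketch for the helper above (added for illustration; the DSN
# string is a placeholder, not something this module defines). The reset is
# destructive, so it is wrapped in a function instead of running at import time.
def _example_reset(dsn: str = "dbname=market user=postgres") -> None:
    conn = psycopg2.connect(dsn)
    try:
        if reset_and_setup_database(conn):
            logging.info("Schema recreated from scratch")
    finally:
        conn.close()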


def get_last_price_date(conn: psycopg2.extensions.connection,
                        index_id: int) -> Optional[date]:
    """Get the last available price date for an index, or None if it has no prices."""
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT MAX(date)
            FROM market_indexes_prices
            WHERE index_id = %s
        """, (index_id,))
        result = cursor.fetchone()
        # MAX(...) always yields one row; its value is NULL when no prices exist.
        return result[0] if result and result[0] else None
    except Exception as e:
        logging.error(f"Error getting last price date for index {index_id}: {e}")
        conn.rollback()  # clear the aborted transaction so the connection stays usable
        return None
    finally:
        if cursor:
            cursor.close()
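

# Sketch of the incremental-update pattern get_last_price_date() enables:
# resume fetching the day after the newest stored row, or from an arbitrary
# backfill start when the index has no prices yet. The 2000-01-01 default is
# an illustrative assumption, not a project convention.
def _example_fetch_window_start(conn: psycopg2.extensions.connection,
                                index_id: int) -> date:
    last = get_last_price_date(conn, index_id)
    if last is None:
        return date(2000, 1, 1)  # hypothetical backfill start
    return last + timedelta(days=1)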


def get_index_statistics(conn: psycopg2.extensions.connection) -> Dict[str, Any]:
    """Get detailed statistics about index data."""
    cursor = None
    try:
        cursor = conn.cursor()
        stats: Dict[str, Any] = {}
        # Index counts by category.
        cursor.execute("""
            SELECT
                category,
                COUNT(*) as total,
                COUNT(CASE WHEN is_active THEN 1 END) as active
            FROM market_indexes_ref
            GROUP BY category
            ORDER BY category
        """)
        stats['categories'] = [
            {'category': row[0] or 'Uncategorized',
             'total': row[1],
             'active': row[2]}
            for row in cursor.fetchall()
        ]
        # Overall price statistics.
        cursor.execute("""
            SELECT
                COUNT(*) as total_prices,
                COUNT(DISTINCT index_id) as indexes_with_prices,
                MIN(date) as earliest_date,
                MAX(date) as latest_date
            FROM market_indexes_prices
        """)
        row = cursor.fetchone()
        if row:
            stats.update({
                'total_prices': row[0],
                'indexes_with_prices': row[1],
                'earliest_date': row[2],
                'latest_date': row[3]
            })
        # Data coverage by index. COUNT(mip.date) (rather than COUNT(*)) keeps
        # the record count at 0 for indexes without prices, since the LEFT JOIN
        # still produces one all-NULL row for them.
        cursor.execute("""
            SELECT
                mir.symbol,
                mir.category,
                MIN(mip.date) as first_date,
                MAX(mip.date) as last_date,
                COUNT(mip.date) as records,
                mir.is_active
            FROM market_indexes_ref mir
            LEFT JOIN market_indexes_prices mip ON mir.id = mip.index_id
            GROUP BY mir.id, mir.symbol, mir.category, mir.is_active
            ORDER BY mir.category, mir.symbol
        """)
        stats['coverage'] = [
            {
                'symbol': row[0],
                'category': row[1],
                'first_date': row[2],
                'last_date': row[3],
                'records': row[4],
                'is_active': row[5]
            }
            for row in cursor.fetchall()
        ]
        return stats
    except Exception as e:
        logging.error(f"Error getting index statistics: {e}")
        conn.rollback()  # clear any aborted transaction state
        return {}
    finally:
        if cursor:
            cursor.close()
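

# A small reporting sketch built on get_index_statistics(). The dict layout is
# taken from the function above; the print format itself is illustrative.
def _example_print_statistics(conn: psycopg2.extensions.connection) -> None:
    stats = get_index_statistics(conn)
    for cat in stats.get('categories', []):
        print(f"{cat['category']}: {cat['active']}/{cat['total']} active")
    if 'total_prices' in stats:
        print(f"{stats['total_prices']} price rows across "
              f"{stats['indexes_with_prices']} indexes "
              f"({stats['earliest_date']} to {stats['latest_date']})")
    for cov in stats.get('coverage', []):
        if cov['records'] == 0:
            print(f"warning: {cov['symbol']} has no price data")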


def verify_data_integrity(conn: psycopg2.extensions.connection) -> Dict[str, Any]:
    """Verify data integrity and return any issues found."""
    cursor = None
    try:
        cursor = conn.cursor()
        issues: Dict[str, Any] = {
            'duplicate_dates': [],
            'gaps': [],  # placeholder: gap detection is not implemented yet
            'anomalies': []
        }
        # Check for duplicate dates. With PRIMARY KEY (index_id, date) this
        # should never fire; it is kept as a belt-and-suspenders check in case
        # the schema is ever loaded without the constraint.
        cursor.execute("""
            SELECT mir.symbol, mip.date, COUNT(*)
            FROM market_indexes_prices mip
            JOIN market_indexes_ref mir ON mir.id = mip.index_id
            GROUP BY mir.symbol, mip.date
            HAVING COUNT(*) > 1
        """)
        issues['duplicate_dates'] = [
            {'symbol': row[0], 'date': row[1], 'count': row[2]}
            for row in cursor.fetchall()
        ]
        # Check for price anomalies (zero or negative prices).
        cursor.execute("""
            SELECT mir.symbol, mip.date, mip.close
            FROM market_indexes_prices mip
            JOIN market_indexes_ref mir ON mir.id = mip.index_id
            WHERE mip.open <= 0 OR mip.high <= 0 OR mip.low <= 0 OR mip.close <= 0
            ORDER BY mir.symbol, mip.date
        """)
        issues['anomalies'] = [
            {'symbol': row[0], 'date': row[1], 'close': row[2]}
            for row in cursor.fetchall()
        ]
        return issues
    except Exception as e:
        logging.error(f"Error verifying data integrity: {e}")
        conn.rollback()  # clear any aborted transaction state
        return {}
    finally:
        if cursor:
            cursor.close()
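

# Hedged end-to-end sketch: run the integrity checks against a database named
# by the PG_DSN environment variable (an assumption for illustration; the repo
# may wire up connections differently).
if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.INFO)
    conn = psycopg2.connect(os.environ.get("PG_DSN", "dbname=market"))
    try:
        issues = verify_data_integrity(conn)
        for kind, rows in issues.items():
            if rows:
                logging.warning("%s: %d issue(s), e.g. %s", kind, len(rows), rows[0])
            else:
                logging.info("%s: none found", kind)
    finally:
        conn.close()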