import logging
from datetime import date, datetime
import psycopg2
from typing import Dict, List, Any, Optional
from decimal import Decimal


def reset_and_setup_database(conn: psycopg2.extensions.connection) -> bool:
    """Reset and set up the database with the proper schema."""
    cursor = None
    try:
        cursor = conn.cursor()

        # Drop existing tables in dependency order (prices references ref)
        cursor.execute("""
            DROP TABLE IF EXISTS market_indexes_prices CASCADE;
            DROP TABLE IF EXISTS market_indexes_ref CASCADE;
        """)
        conn.commit()
        logging.info("Existing tables dropped successfully")

        # Create tables with proper schema
        cursor.execute("""
            CREATE TABLE market_indexes_ref (
                id SERIAL PRIMARY KEY,
                symbol TEXT UNIQUE NOT NULL,
                description TEXT,
                category TEXT,
                last_update TIMESTAMP WITH TIME ZONE,
                is_active BOOLEAN DEFAULT true,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE market_indexes_prices (
                index_id INTEGER REFERENCES market_indexes_ref(id),
                date DATE NOT NULL,
                open NUMERIC(19,4) NOT NULL,
                high NUMERIC(19,4) NOT NULL,
                low NUMERIC(19,4) NOT NULL,
                close NUMERIC(19,4) NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (index_id, date)
            );

            CREATE INDEX idx_market_indexes_prices_date ON market_indexes_prices(date);
            CREATE INDEX idx_market_indexes_prices_index_date ON market_indexes_prices(index_id, date DESC);
        """)

        # Create update trigger for updated_at
        cursor.execute("""
            CREATE OR REPLACE FUNCTION update_updated_at_column()
            RETURNS TRIGGER AS $$
            BEGIN
                NEW.updated_at = CURRENT_TIMESTAMP;
                RETURN NEW;
            END;
            $$ LANGUAGE plpgsql;

            CREATE TRIGGER update_market_indexes_ref_updated_at
                BEFORE UPDATE ON market_indexes_ref
                FOR EACH ROW
                EXECUTE FUNCTION update_updated_at_column();
        """)

        conn.commit()
        logging.info("Database reset and setup completed successfully")
        return True

    except Exception as e:
        logging.error(f"Error in reset_and_setup_database: {e}")
        if conn:
            conn.rollback()
        return False
    finally:
        if cursor:
            cursor.close()


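# Illustrative sketch (not part of the original module): the composite
# PRIMARY KEY (index_id, date) on market_indexes_prices supports idempotent
# loading via INSERT ... ON CONFLICT. The (index_id, date, open, high, low,
# close) row tuple format assumed here is not specified by the original code.
def upsert_prices(conn: psycopg2.extensions.connection, rows: List[tuple]) -> bool:
    """Insert or update (index_id, date, open, high, low, close) rows."""
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.executemany("""
            INSERT INTO market_indexes_prices (index_id, date, open, high, low, close)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (index_id, date) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close
        """, rows)
        conn.commit()
        return True
    except Exception as e:
        logging.error(f"Error upserting prices: {e}")
        conn.rollback()
        return False
    finally:
        if cursor:
            cursor.close()

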
def get_last_price_date(conn: psycopg2.extensions.connection, index_id: int) -> Optional[date]:
    """Get the last available price date for an index."""
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT MAX(date)
            FROM market_indexes_prices
            WHERE index_id = %s
        """, (index_id,))
        result = cursor.fetchone()
        return result[0] if result and result[0] else None
    except Exception as e:
        logging.error(f"Error getting last price date for index {index_id}: {e}")
        return None
    finally:
        if cursor:
            cursor.close()


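# Example (illustrative, not from the original module): get_last_price_date()
# can drive incremental loads, fetching only dates newer than what is stored:
#
#     last = get_last_price_date(conn, index_id)
#     start = last + timedelta(days=1) if last else date(1990, 1, 1)
#
# The 1990-01-01 fallback start date and the timedelta import are assumptions.

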
def get_index_statistics(conn: psycopg2.extensions.connection) -> Dict[str, Any]:
    """Get detailed statistics about index data.

    Returns a dict with 'categories', 'total_prices', 'indexes_with_prices',
    'earliest_date', 'latest_date', and per-index 'coverage' entries.
    """
    cursor = None
    try:
        cursor = conn.cursor()
        stats = {}

        # Get index counts by category
        cursor.execute("""
            SELECT
                category,
                COUNT(*) AS total,
                COUNT(CASE WHEN is_active THEN 1 END) AS active
            FROM market_indexes_ref
            GROUP BY category
            ORDER BY category
        """)
        stats['categories'] = [
            {'category': row[0] or 'Uncategorized',
             'total': row[1],
             'active': row[2]}
            for row in cursor.fetchall()
        ]

        # Get price statistics
        cursor.execute("""
            SELECT
                COUNT(*) AS total_prices,
                COUNT(DISTINCT index_id) AS indexes_with_prices,
                MIN(date) AS earliest_date,
                MAX(date) AS latest_date
            FROM market_indexes_prices
        """)
        row = cursor.fetchone()
        if row:
            stats.update({
                'total_prices': row[0],
                'indexes_with_prices': row[1],
                'earliest_date': row[2],
                'latest_date': row[3]
            })

        # Get data coverage by index
        cursor.execute("""
            SELECT
                mir.symbol,
                mir.category,
                MIN(mip.date) AS first_date,
                MAX(mip.date) AS last_date,
                COUNT(mip.date) AS records,  -- count price rows only, so indexes without prices report 0
                mir.is_active
            FROM market_indexes_ref mir
            LEFT JOIN market_indexes_prices mip ON mir.id = mip.index_id
            GROUP BY mir.id, mir.symbol, mir.category, mir.is_active
            ORDER BY mir.category, mir.symbol
        """)
        stats['coverage'] = [
            {
                'symbol': row[0],
                'category': row[1],
                'first_date': row[2],
                'last_date': row[3],
                'records': row[4],
                'is_active': row[5]
            }
            for row in cursor.fetchall()
        ]

        return stats

    except Exception as e:
        logging.error(f"Error getting index statistics: {e}")
        return {}
    finally:
        if cursor:
            cursor.close()


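# Illustrative sketch (not part of the original module): one way the 'gaps'
# list initialized in verify_data_integrity() below could be populated, using
# LAG() to measure the day difference between consecutive price dates per
# index. The default 5-day threshold, meant to tolerate weekends and holidays,
# is an assumption.
def find_price_gaps(conn: psycopg2.extensions.connection, max_gap_days: int = 5) -> List[Dict[str, Any]]:
    """Return per-index date gaps larger than max_gap_days."""
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT symbol, prev_date, date, gap_days
            FROM (
                SELECT mir.symbol,
                       mip.date,
                       LAG(mip.date) OVER (PARTITION BY mip.index_id ORDER BY mip.date) AS prev_date,
                       mip.date - LAG(mip.date) OVER (PARTITION BY mip.index_id ORDER BY mip.date) AS gap_days
                FROM market_indexes_prices mip
                JOIN market_indexes_ref mir ON mir.id = mip.index_id
            ) spaced
            WHERE gap_days > %s
            ORDER BY symbol, date
        """, (max_gap_days,))
        return [
            {'symbol': row[0], 'from_date': row[1], 'to_date': row[2], 'gap_days': row[3]}
            for row in cursor.fetchall()
        ]
    except Exception as e:
        logging.error(f"Error finding price gaps: {e}")
        return []
    finally:
        if cursor:
            cursor.close()

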
def verify_data_integrity(conn: psycopg2.extensions.connection) -> Dict[str, Any]:
    """Verify data integrity and return any issues found."""
    cursor = None
    try:
        cursor = conn.cursor()
        issues = {
            'duplicate_dates': [],
            'gaps': [],  # reserved; no gap detection is performed here
            'anomalies': []
        }

        # Check for duplicate dates
        cursor.execute("""
            SELECT mir.symbol, mip.date, COUNT(*)
            FROM market_indexes_prices mip
            JOIN market_indexes_ref mir ON mir.id = mip.index_id
            GROUP BY mir.symbol, mip.date
            HAVING COUNT(*) > 1
        """)
        issues['duplicate_dates'] = [
            {'symbol': row[0], 'date': row[1], 'count': row[2]}
            for row in cursor.fetchall()
        ]

        # Check for price anomalies (zero or negative prices)
        cursor.execute("""
            SELECT mir.symbol, mip.date, mip.close
            FROM market_indexes_prices mip
            JOIN market_indexes_ref mir ON mir.id = mip.index_id
            WHERE mip.open <= 0 OR mip.high <= 0 OR mip.low <= 0 OR mip.close <= 0
            ORDER BY mir.symbol, mip.date
        """)
        issues['anomalies'] = [
            {'symbol': row[0], 'date': row[1], 'close': row[2]}
            for row in cursor.fetchall()
        ]

        return issues

    except Exception as e:
        logging.error(f"Error verifying data integrity: {e}")
        return {}
    finally:
        if cursor:
            cursor.close()


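# Minimal usage sketch (not part of the original module). It assumes a PG_DSN
# environment variable holding a psycopg2 connection string, e.g.
# "dbname=markets user=postgres"; adjust to your environment.
if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.INFO)
    connection = psycopg2.connect(os.environ.get("PG_DSN", "dbname=markets"))
    try:
        if reset_and_setup_database(connection):
            stats = get_index_statistics(connection)
            logging.info("Indexes with prices: %s", stats.get('indexes_with_prices'))
            issues = verify_data_integrity(connection)
            logging.info("Anomalous price rows: %s", len(issues.get('anomalies', [])))
    finally:
        connection.close()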