RivaCube/utils/Stocks/historical/utils.py
2025-02-04 19:31:18 +01:00

216 lines
7.0 KiB
Python

# ~/RivaCube/RivDroite/utils/stocks/utils.py
import logging
import psycopg2
from psycopg2.extensions import register_type, UNICODE, UNICODEARRAY
from typing import Optional, Dict, Any
def setup_database(conn: psycopg2.extensions.connection) -> bool:
cursor = None
try:
conn.set_client_encoding('UTF8')
cursor = conn.cursor()
register_type(UNICODE, cursor)
register_type(UNICODEARRAY, cursor)
tables_sql = [
"""
CREATE TABLE IF NOT EXISTS sectors (
id SERIAL PRIMARY KEY,
name VARCHAR UNIQUE NOT NULL
);
""",
"""
CREATE TABLE IF NOT EXISTS zones (
id SERIAL PRIMARY KEY,
name VARCHAR UNIQUE NOT NULL
);
""",
"""
CREATE TABLE IF NOT EXISTS tickers (
id SERIAL PRIMARY KEY,
isin VARCHAR UNIQUE NOT NULL,
bbg_ticker VARCHAR,
yf_ticker VARCHAR,
name VARCHAR NOT NULL,
description TEXT,
hashtags VARCHAR,
sector_id INTEGER REFERENCES sectors(id),
gics VARCHAR,
zone_id INTEGER REFERENCES zones(id),
ccy VARCHAR
);
""",
"""
CREATE TABLE IF NOT EXISTS stocks (
id SERIAL PRIMARY KEY,
ticker_id INTEGER REFERENCES tickers(id),
date DATE NOT NULL,
open DECIMAL(19,4),
high DECIMAL(19,4),
low DECIMAL(19,4),
close DECIMAL(19,4),
adj_close DECIMAL(19,4),
volume BIGINT,
UNIQUE(ticker_id, date)
);
""",
"""
CREATE TABLE IF NOT EXISTS invalid_tickers (
id SERIAL PRIMARY KEY,
ticker VARCHAR NOT NULL,
reason TEXT,
last_check TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
attempts INTEGER DEFAULT 1
);
""",
]
for sql in tables_sql:
cursor.execute(sql)
indexes_sql = """
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_stocks_date') THEN
CREATE INDEX idx_stocks_date ON stocks(date);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_stocks_ticker_date') THEN
CREATE INDEX idx_stocks_ticker_date ON stocks(ticker_id, date);
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tickers_isin') THEN
CREATE INDEX idx_tickers_isin ON tickers(isin);
END IF;
END$$;
"""
cursor.execute(indexes_sql)
conn.commit()
logging.info("Database structure setup completed successfully")
return True
except Exception as e:
logging.error(f"Error setting up database: {e}")
if conn:
conn.rollback()
return False
finally:
if cursor:
cursor.close()
def add_sector(cursor: psycopg2.extensions.cursor, sector_name: str) -> int:
if not sector_name:
raise ValueError("Sector name cannot be empty")
cursor.execute(
"""
INSERT INTO sectors (name)
VALUES (%s)
ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name
RETURNING id
""",
(sector_name,)
)
return cursor.fetchone()[0]
def add_zone(cursor: psycopg2.extensions.cursor, zone_name: str) -> int:
if not zone_name:
raise ValueError("Zone name cannot be empty")
cursor.execute(
"""
INSERT INTO zones (name)
VALUES (%s)
ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name
RETURNING id
""",
(zone_name,)
)
return cursor.fetchone()[0]
def add_ticker(conn: psycopg2.extensions.connection,
isin: str,
bbg_ticker: Optional[str] = None,
yf_ticker: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
hashtags: Optional[str] = None,
sector_id: Optional[int] = None,
gics: Optional[str] = None,
zone_id: Optional[int] = None,
ccy: Optional[str] = None) -> int:
if not isin:
raise ValueError("ISIN cannot be empty")
cursor = None
try:
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO tickers
(isin, bbg_ticker, yf_ticker, name, description, hashtags,
sector_id, gics, zone_id, ccy)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (isin) DO UPDATE SET
bbg_ticker = COALESCE(EXCLUDED.bbg_ticker, tickers.bbg_ticker),
yf_ticker = COALESCE(EXCLUDED.yf_ticker, tickers.yf_ticker),
name = COALESCE(EXCLUDED.name, tickers.name),
description = COALESCE(EXCLUDED.description, tickers.description),
hashtags = COALESCE(EXCLUDED.hashtags, tickers.hashtags),
sector_id = COALESCE(EXCLUDED.sector_id, tickers.sector_id),
gics = COALESCE(EXCLUDED.gics, tickers.gics),
zone_id = COALESCE(EXCLUDED.zone_id, tickers.zone_id),
ccy = COALESCE(EXCLUDED.ccy, tickers.ccy)
RETURNING id
""",
(isin, bbg_ticker, yf_ticker, name, description, hashtags,
sector_id, gics, zone_id, ccy)
)
ticker_id = cursor.fetchone()[0]
conn.commit()
return ticker_id
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
if cursor:
cursor.close()
def get_database_statistics(conn: psycopg2.extensions.connection) -> Dict[str, Any]:
cursor = None
try:
cursor = conn.cursor()
stats = {}
# Basic table counts
tables = ['sectors', 'zones', 'tickers', 'stocks']
for table in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
stats[f'{table}_count'] = cursor.fetchone()[0]
# Get latest stock date
cursor.execute("""
SELECT MAX(date) FROM stocks
""")
stats['latest_stock_date'] = cursor.fetchone()[0]
# Get number of tickers with recent updates
cursor.execute("""
SELECT COUNT(DISTINCT ticker_id)
FROM stocks
WHERE date >= CURRENT_DATE - INTERVAL '7 days'
""")
stats['recently_updated_tickers'] = cursor.fetchone()[0]
return stats
except Exception as e:
logging.error(f"Error getting database statistics: {e}")
return {}
finally:
if cursor:
cursor.close()