216 lines
7.0 KiB
Python
216 lines
7.0 KiB
Python
# ~/RivaCube/RivDroite/utils/stocks/utils.py
|
|
import logging
|
|
import psycopg2
|
|
from psycopg2.extensions import register_type, UNICODE, UNICODEARRAY
|
|
from typing import Optional, Dict, Any
|
|
|
|
def setup_database(conn: psycopg2.extensions.connection) -> bool:
    """Create the stock-tracking tables and indexes if they do not exist yet.

    All DDL statements run on one cursor and are committed once at the end,
    so (unless the connection is in autocommit mode) a failure part-way
    through leaves nothing half-created.

    Args:
        conn: Open psycopg2 connection. Its client encoding is switched to
            UTF8 as a side effect.

    Returns:
        True when every statement ran and the commit succeeded; False when
        anything failed. Errors are logged, never raised.
    """
    # Order matters: tickers references sectors/zones, stocks references tickers.
    table_statements = (
        """
        CREATE TABLE IF NOT EXISTS sectors (
            id SERIAL PRIMARY KEY,
            name VARCHAR UNIQUE NOT NULL
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS zones (
            id SERIAL PRIMARY KEY,
            name VARCHAR UNIQUE NOT NULL
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS tickers (
            id SERIAL PRIMARY KEY,
            isin VARCHAR UNIQUE NOT NULL,
            bbg_ticker VARCHAR,
            yf_ticker VARCHAR,
            name VARCHAR NOT NULL,
            description TEXT,
            hashtags VARCHAR,
            sector_id INTEGER REFERENCES sectors(id),
            gics VARCHAR,
            zone_id INTEGER REFERENCES zones(id),
            ccy VARCHAR
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS stocks (
            id SERIAL PRIMARY KEY,
            ticker_id INTEGER REFERENCES tickers(id),
            date DATE NOT NULL,
            open DECIMAL(19,4),
            high DECIMAL(19,4),
            low DECIMAL(19,4),
            close DECIMAL(19,4),
            adj_close DECIMAL(19,4),
            volume BIGINT,
            UNIQUE(ticker_id, date)
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS invalid_tickers (
            id SERIAL PRIMARY KEY,
            ticker VARCHAR NOT NULL,
            reason TEXT,
            last_check TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            attempts INTEGER DEFAULT 1
        );
        """,
    )

    # CREATE INDEX IF NOT EXISTS resolves the name in the schema the index
    # would be created in. The previous hand-written lookup in pg_indexes
    # matched on index name alone, so a same-named index in any other schema
    # silently suppressed creation here.
    # NOTE(review): idx_stocks_ticker_date and idx_tickers_isin cover the same
    # columns as the UNIQUE constraints declared above; kept so the resulting
    # schema is unchanged -- confirm whether they are still wanted.
    index_statements = (
        "CREATE INDEX IF NOT EXISTS idx_stocks_date ON stocks(date)",
        "CREATE INDEX IF NOT EXISTS idx_stocks_ticker_date ON stocks(ticker_id, date)",
        "CREATE INDEX IF NOT EXISTS idx_tickers_isin ON tickers(isin)",
    )

    cursor = None
    try:
        conn.set_client_encoding('UTF8')
        cursor = conn.cursor()

        # NOTE(review): passing the cursor limits these typecasters to this
        # cursor, which is closed before returning -- pass conn instead if a
        # connection-wide registration was intended; confirm.
        register_type(UNICODE, cursor)
        register_type(UNICODEARRAY, cursor)

        for statement in table_statements:
            cursor.execute(statement)
        for statement in index_statements:
            cursor.execute(statement)

        conn.commit()
        logging.info("Database structure setup completed successfully")
        return True

    except Exception as e:
        logging.error(f"Error setting up database: {e}")
        if conn:
            # A rollback on a broken/closed connection raises as well; that
            # must not escape, because callers rely on the boolean result.
            try:
                conn.rollback()
            except Exception as rollback_error:
                logging.error(f"Rollback failed after setup error: {rollback_error}")
        return False
    finally:
        if cursor:
            cursor.close()
|
|
|
|
def add_sector(cursor: psycopg2.extensions.cursor, sector_name: str) -> int:
    """Return the id of the sector called ``sector_name``, inserting it if new.

    The statement is an upsert on the unique ``name`` column, so calling this
    repeatedly with the same name yields the same id. Nothing is committed
    here; the transaction belongs to whoever owns ``cursor``.

    Args:
        cursor: Cursor on which the statement is executed.
        sector_name: Name of the sector; must be non-empty.

    Returns:
        The ``id`` value returned by the statement.

    Raises:
        ValueError: If ``sector_name`` is empty or None.
    """
    if not sector_name:
        raise ValueError("Sector name cannot be empty")

    # The no-op DO UPDATE makes RETURNING yield the existing row's id on a
    # conflict (DO NOTHING would return no row at all).
    upsert_sql = """
        INSERT INTO sectors (name)
        VALUES (%s)
        ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name
        RETURNING id
        """
    cursor.execute(upsert_sql, (sector_name,))
    returned_row = cursor.fetchone()
    return returned_row[0]
|
|
|
|
def add_zone(cursor: psycopg2.extensions.cursor, zone_name: str) -> int:
    """Return the id of the zone called ``zone_name``, inserting it if new.

    The statement is an upsert on the unique ``name`` column, so calling this
    repeatedly with the same name yields the same id. Nothing is committed
    here; the transaction belongs to whoever owns ``cursor``.

    Args:
        cursor: Cursor on which the statement is executed.
        zone_name: Name of the zone; must be non-empty.

    Returns:
        The ``id`` value returned by the statement.

    Raises:
        ValueError: If ``zone_name`` is empty or None.
    """
    if not zone_name:
        raise ValueError("Zone name cannot be empty")

    # The no-op DO UPDATE makes RETURNING yield the existing row's id on a
    # conflict (DO NOTHING would return no row at all).
    upsert_sql = """
        INSERT INTO zones (name)
        VALUES (%s)
        ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name
        RETURNING id
        """
    cursor.execute(upsert_sql, (zone_name,))
    returned_row = cursor.fetchone()
    return returned_row[0]
|
|
|
|
def add_ticker(conn: psycopg2.extensions.connection,
               isin: str,
               bbg_ticker: Optional[str] = None,
               yf_ticker: Optional[str] = None,
               name: Optional[str] = None,
               description: Optional[str] = None,
               hashtags: Optional[str] = None,
               sector_id: Optional[int] = None,
               gics: Optional[str] = None,
               zone_id: Optional[int] = None,
               ccy: Optional[str] = None) -> int:
    """Insert a ticker keyed by ISIN, or update the existing one, and commit.

    On an ISIN conflict each column is overwritten only when the new value is
    not NULL (``COALESCE(EXCLUDED.col, tickers.col)``), so passing None for a
    field leaves the stored value untouched.

    Args:
        conn: Connection used for the statement; committed on success and
            rolled back on failure.
        isin: ISIN identifying the ticker; must be non-empty.
        bbg_ticker, yf_ticker, name, description, hashtags, sector_id, gics,
        zone_id, ccy: Column values for the ticker row; None means "no value
            supplied".

    Returns:
        The ``id`` of the inserted or updated row.

    Raises:
        ValueError: If ``isin`` is empty or None.
        Exception: Any error raised while executing or committing is
            re-raised after the rollback.
    """
    if not isin:
        raise ValueError("ISIN cannot be empty")

    # NOTE(review): ``name`` defaults to None but tickers.name is declared
    # NOT NULL in setup_database; presumably a None name is rejected by the
    # database -- confirm whether callers always supply it.
    upsert_sql = """
        INSERT INTO tickers
        (isin, bbg_ticker, yf_ticker, name, description, hashtags,
        sector_id, gics, zone_id, ccy)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (isin) DO UPDATE SET
        bbg_ticker = COALESCE(EXCLUDED.bbg_ticker, tickers.bbg_ticker),
        yf_ticker = COALESCE(EXCLUDED.yf_ticker, tickers.yf_ticker),
        name = COALESCE(EXCLUDED.name, tickers.name),
        description = COALESCE(EXCLUDED.description, tickers.description),
        hashtags = COALESCE(EXCLUDED.hashtags, tickers.hashtags),
        sector_id = COALESCE(EXCLUDED.sector_id, tickers.sector_id),
        gics = COALESCE(EXCLUDED.gics, tickers.gics),
        zone_id = COALESCE(EXCLUDED.zone_id, tickers.zone_id),
        ccy = COALESCE(EXCLUDED.ccy, tickers.ccy)
        RETURNING id
        """
    # Same order as the column list in the INSERT above.
    column_values = (
        isin, bbg_ticker, yf_ticker, name, description, hashtags,
        sector_id, gics, zone_id, ccy,
    )

    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(upsert_sql, column_values)
        row_id = cursor.fetchone()[0]
        conn.commit()
        return row_id
    except Exception:
        # Undo the failed statement before handing the error to the caller.
        if conn:
            conn.rollback()
        raise
    finally:
        if cursor:
            cursor.close()
|
|
|
|
def get_database_statistics(conn: psycopg2.extensions.connection) -> Dict[str, Any]:
    """Collect row counts and freshness figures for the stock tables.

    Args:
        conn: Connection to run the read-only queries on.

    Returns:
        A dict with ``sectors_count``, ``zones_count``, ``tickers_count``,
        ``stocks_count``, ``latest_stock_date`` (``MAX(date)`` of stocks) and
        ``recently_updated_tickers`` (distinct tickers with a stocks row dated
        within the last 7 days). An empty dict is returned when any query
        fails; the error is logged, never raised.

    Side effects:
        On failure the connection's current transaction is rolled back, which
        also discards any uncommitted work the caller had pending on it.
    """
    cursor = None
    try:
        cursor = conn.cursor()
        stats: Dict[str, Any] = {}

        # Table names come from this fixed tuple only, never from callers, so
        # interpolating them into the statement is safe.
        for table in ('sectors', 'zones', 'tickers', 'stocks'):
            cursor.execute(f"SELECT COUNT(*) FROM {table}")
            stats[f'{table}_count'] = cursor.fetchone()[0]

        cursor.execute("""
            SELECT MAX(date) FROM stocks
        """)
        stats['latest_stock_date'] = cursor.fetchone()[0]

        cursor.execute("""
            SELECT COUNT(DISTINCT ticker_id)
            FROM stocks
            WHERE date >= CURRENT_DATE - INTERVAL '7 days'
        """)
        stats['recently_updated_tickers'] = cursor.fetchone()[0]

        return stats
    except Exception as e:
        logging.error(f"Error getting database statistics: {e}")
        # After a failed statement PostgreSQL rejects everything else in the
        # same transaction until it is rolled back. Because the error is
        # swallowed here, the rollback has to happen here too, otherwise the
        # caller gets {} and a connection it can no longer use.
        if conn:
            try:
                conn.rollback()
            except Exception as rollback_error:
                logging.error(f"Rollback failed after statistics error: {rollback_error}")
        return {}
    finally:
        if cursor:
            cursor.close()
|