# ~/RivaCube/RivDroite/utils/stocks/utils.py import logging import psycopg2 from psycopg2.extensions import register_type, UNICODE, UNICODEARRAY from typing import Optional, Dict, Any def setup_database(conn: psycopg2.extensions.connection) -> bool: cursor = None try: conn.set_client_encoding('UTF8') cursor = conn.cursor() register_type(UNICODE, cursor) register_type(UNICODEARRAY, cursor) tables_sql = [ """ CREATE TABLE IF NOT EXISTS sectors ( id SERIAL PRIMARY KEY, name VARCHAR UNIQUE NOT NULL ); """, """ CREATE TABLE IF NOT EXISTS zones ( id SERIAL PRIMARY KEY, name VARCHAR UNIQUE NOT NULL ); """, """ CREATE TABLE IF NOT EXISTS tickers ( id SERIAL PRIMARY KEY, isin VARCHAR UNIQUE NOT NULL, bbg_ticker VARCHAR, yf_ticker VARCHAR, name VARCHAR NOT NULL, description TEXT, hashtags VARCHAR, sector_id INTEGER REFERENCES sectors(id), gics VARCHAR, zone_id INTEGER REFERENCES zones(id), ccy VARCHAR ); """, """ CREATE TABLE IF NOT EXISTS stocks ( id SERIAL PRIMARY KEY, ticker_id INTEGER REFERENCES tickers(id), date DATE NOT NULL, open DECIMAL(19,4), high DECIMAL(19,4), low DECIMAL(19,4), close DECIMAL(19,4), adj_close DECIMAL(19,4), volume BIGINT, UNIQUE(ticker_id, date) ); """, """ CREATE TABLE IF NOT EXISTS invalid_tickers ( id SERIAL PRIMARY KEY, ticker VARCHAR NOT NULL, reason TEXT, last_check TIMESTAMP DEFAULT CURRENT_TIMESTAMP, attempts INTEGER DEFAULT 1 ); """, ] for sql in tables_sql: cursor.execute(sql) indexes_sql = """ DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_stocks_date') THEN CREATE INDEX idx_stocks_date ON stocks(date); END IF; IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_stocks_ticker_date') THEN CREATE INDEX idx_stocks_ticker_date ON stocks(ticker_id, date); END IF; IF NOT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_tickers_isin') THEN CREATE INDEX idx_tickers_isin ON tickers(isin); END IF; END$$; """ cursor.execute(indexes_sql) conn.commit() logging.info("Database structure setup completed successfully") return True except Exception as e: logging.error(f"Error setting up database: {e}") if conn: conn.rollback() return False finally: if cursor: cursor.close() def add_sector(cursor: psycopg2.extensions.cursor, sector_name: str) -> int: if not sector_name: raise ValueError("Sector name cannot be empty") cursor.execute( """ INSERT INTO sectors (name) VALUES (%s) ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name RETURNING id """, (sector_name,) ) return cursor.fetchone()[0] def add_zone(cursor: psycopg2.extensions.cursor, zone_name: str) -> int: if not zone_name: raise ValueError("Zone name cannot be empty") cursor.execute( """ INSERT INTO zones (name) VALUES (%s) ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name RETURNING id """, (zone_name,) ) return cursor.fetchone()[0] def add_ticker(conn: psycopg2.extensions.connection, isin: str, bbg_ticker: Optional[str] = None, yf_ticker: Optional[str] = None, name: Optional[str] = None, description: Optional[str] = None, hashtags: Optional[str] = None, sector_id: Optional[int] = None, gics: Optional[str] = None, zone_id: Optional[int] = None, ccy: Optional[str] = None) -> int: if not isin: raise ValueError("ISIN cannot be empty") cursor = None try: cursor = conn.cursor() cursor.execute( """ INSERT INTO tickers (isin, bbg_ticker, yf_ticker, name, description, hashtags, sector_id, gics, zone_id, ccy) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (isin) DO UPDATE SET bbg_ticker = COALESCE(EXCLUDED.bbg_ticker, tickers.bbg_ticker), yf_ticker = COALESCE(EXCLUDED.yf_ticker, tickers.yf_ticker), name = COALESCE(EXCLUDED.name, tickers.name), description = COALESCE(EXCLUDED.description, tickers.description), hashtags = COALESCE(EXCLUDED.hashtags, tickers.hashtags), sector_id = COALESCE(EXCLUDED.sector_id, tickers.sector_id), gics = COALESCE(EXCLUDED.gics, tickers.gics), zone_id = COALESCE(EXCLUDED.zone_id, tickers.zone_id), ccy = COALESCE(EXCLUDED.ccy, tickers.ccy) RETURNING id """, (isin, bbg_ticker, yf_ticker, name, description, hashtags, sector_id, gics, zone_id, ccy) ) ticker_id = cursor.fetchone()[0] conn.commit() return ticker_id except Exception as e: if conn: conn.rollback() raise e finally: if cursor: cursor.close() def get_database_statistics(conn: psycopg2.extensions.connection) -> Dict[str, Any]: cursor = None try: cursor = conn.cursor() stats = {} # Basic table counts tables = ['sectors', 'zones', 'tickers', 'stocks'] for table in tables: cursor.execute(f"SELECT COUNT(*) FROM {table}") stats[f'{table}_count'] = cursor.fetchone()[0] # Get latest stock date cursor.execute(""" SELECT MAX(date) FROM stocks """) stats['latest_stock_date'] = cursor.fetchone()[0] # Get number of tickers with recent updates cursor.execute(""" SELECT COUNT(DISTINCT ticker_id) FROM stocks WHERE date >= CURRENT_DATE - INTERVAL '7 days' """) stats['recently_updated_tickers'] = cursor.fetchone()[0] return stats except Exception as e: logging.error(f"Error getting database statistics: {e}") return {} finally: if cursor: cursor.close()