import logging

import psycopg2
import psycopg2.extensions


def verify_technical_tables(connection: psycopg2.extensions.connection) -> None:
    """Check that ``tickers`` exists and create ``technical_indicators`` if missing.

    The work is committed on success. On failure the error is logged, the
    transaction is rolled back (when a cursor was obtained) and the exception
    is re-raised.

    Args:
        connection: Open psycopg2 connection used for the checks and the DDL.

    Raises:
        RuntimeError: If no table named ``tickers`` is found.
        Exception: Any other error raised while executing the statements.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        logging.info("Starting table verification...")

        # technical_indicators references tickers(id), so tickers must already
        # exist. NOTE: the lookup is not schema-qualified, so a table named
        # 'tickers' in any visible schema satisfies it.
        cursor.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'tickers'
            );
        """)
        if not cursor.fetchone()[0]:
            raise RuntimeError("Tickers table must exist")
        logging.info("Tickers table exists")

        # Create technical_indicators table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS technical_indicators (
                id SERIAL PRIMARY KEY,
                ticker_id INTEGER REFERENCES tickers(id),
                datetime TIMESTAMP NOT NULL,
                indicator_type VARCHAR(20) NOT NULL,
                period INTEGER NOT NULL,
                value NUMERIC(20,5) NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                UNIQUE (ticker_id, datetime, indicator_type, period)
            );
        """)
        logging.info("Technical indicators table verified")

        connection.commit()
        logging.info("Technical tables verified successfully")
    except Exception as e:
        # Only roll back when a cursor was actually obtained; if cursor()
        # itself failed there is nothing to undo on this connection.
        if cursor is not None and not cursor.closed:
            connection.rollback()
        logging.error("Error verifying tables: %s", e)
        raise
    finally:
        if cursor is not None and not cursor.closed:
            cursor.close()


def setup_database_indices(connection: psycopg2.extensions.connection) -> None:
    """Create the indices used on ``technical_indicators`` and ``tickers``.

    Existing indices on ``technical_indicators`` are logged first, then each
    index is created with ``CREATE INDEX IF NOT EXISTS`` and committed on its
    own. The first failure is logged, rolled back and re-raised.

    Args:
        connection: Open psycopg2 connection used to run the DDL.

    Raises:
        Exception: Any error raised while listing or creating indices.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        logging.info("Starting index verification...")

        # First check existing indices (informational only; creation below
        # relies on IF NOT EXISTS rather than on this list).
        cursor.execute("""
            SELECT indexname
            FROM pg_indexes
            WHERE tablename = 'technical_indicators'
        """)
        existing_indices = [row[0] for row in cursor.fetchall()]
        logging.info("Found existing indices: %s", existing_indices)

        indices = {
            'idx_tech_ticker_datetime': (
                "CREATE INDEX IF NOT EXISTS idx_tech_ticker_datetime "
                "ON technical_indicators(ticker_id, datetime)"
            ),
            'idx_tech_type_period': (
                "CREATE INDEX IF NOT EXISTS idx_tech_type_period "
                "ON technical_indicators(indicator_type, period)"
            ),
            'idx_tickers_last_checked': (
                "CREATE INDEX IF NOT EXISTS idx_tickers_last_checked "
                "ON tickers(last_checked_at)"
            ),
        }

        for index_name, index_sql in indices.items():
            try:
                logging.info("Verifying index: %s", index_name)
                cursor.execute(index_sql)
                # Commit per index so indices created earlier are kept even
                # if a later one fails.
                connection.commit()
                logging.info("Index %s verified successfully", index_name)
            except Exception as e:
                connection.rollback()
                logging.error("Error creating index %s: %s", index_name, e)
                raise

        logging.info("All database indices verified successfully")
    except Exception as e:
        if cursor is not None and not cursor.closed:
            connection.rollback()
        logging.error("Error setting up indices: %s", e)
        raise
    finally:
        if cursor is not None and not cursor.closed:
            cursor.close()


def check_database_status(connection: psycopg2.extensions.connection) -> None:
    """Log the non-idle queries of other backends from ``pg_stat_activity``.

    This is a best-effort diagnostic: errors are logged and swallowed, never
    raised. On success nothing is committed or rolled back, so the read-only
    statement stays part of whatever transaction the connection is in.

    Args:
        connection: Open psycopg2 connection used to run the status query.
    """
    cursor = None
    try:
        cursor = connection.cursor()

        # Check active queries, excluding this session's own backend.
        cursor.execute("""
            SELECT pid, state, wait_event_type, query_start, query
            FROM pg_stat_activity
            WHERE state != 'idle'
            AND pid != pg_backend_pid()
        """)
        active_queries = cursor.fetchall()

        if active_queries:
            logging.info("Active queries found:")
            for pid, state, _wait_event_type, _query_start, query_text in active_queries:
                # The query column may be NULL; treat that as empty text
                # instead of failing on None[:100].
                logging.info(
                    "PID: %s, State: %s, Query: %s...",
                    pid,
                    state,
                    (query_text or "")[:100],
                )
        else:
            logging.info("No active queries found")
    except Exception as e:
        logging.error("Error checking database status: %s", e)
        # A failed statement leaves the transaction aborted; roll it back so
        # the caller can keep using the connection. Non-database errors do
        # not abort the transaction, so the caller's pending work is kept.
        if isinstance(e, psycopg2.Error):
            try:
                connection.rollback()
            except psycopg2.Error as rollback_error:
                logging.error(
                    "Error rolling back after status check failure: %s",
                    rollback_error,
                )
    finally:
        if cursor is not None and not cursor.closed:
            cursor.close()