# modules/database.py
import logging
import time
from typing import Optional, Dict, List, Any

import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor

from config.database_config import DB_CONFIG

logger = logging.getLogger(__name__)


class DatabaseConnection:
    """Thin wrapper around a single psycopg2 connection and cursor.

    The connection is opened from ``DB_CONFIG`` when the object is created.
    Rows are returned as dictionaries (``RealDictCursor``), and every new
    connection gets a 30 second ``statement_timeout``.

    ``execute_query`` and ``execute_update`` never raise: they retry up to
    ``MAX_RETRIES`` times and then return an empty DataFrame / ``False``.
    Each failure is logged so that it is not lost.
    """

    # Number of attempts made by ensure_connection and by each execute_* call.
    MAX_RETRIES = 3
    # Pause between two consecutive attempts, in seconds.
    RETRY_DELAY_SECONDS = 1

    def __init__(self):
        """Initialize database connection.

        Raises:
            Exception: whatever ``_connect`` raises when the initial
                connection cannot be established.
        """
        self.conn = None
        self.cursor = None
        self._connect()

    def __enter__(self) -> "DatabaseConnection":
        """Allow use as ``with DatabaseConnection() as db:``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when leaving a ``with`` block."""
        self.close()

    def _connect(self) -> None:
        """Open the connection and cursor if they are not already usable.

        Does nothing when the connection is open and the cursor is usable.
        When the connection is open but the cursor is missing or closed,
        only a new cursor is created.

        Raises:
            Exception: the underlying error, re-raised after it has been
                logged and any partially opened connection released.
        """
        try:
            if self.conn is None or self.conn.closed:
                self.cursor = None
                self.conn = psycopg2.connect(**DB_CONFIG)
                self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
                # Set shorter statement timeout to prevent long-running queries
                self.cursor.execute("SET statement_timeout = '30s';")
                self.conn.commit()
            elif self.cursor is None or self.cursor.closed:
                self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
        except Exception as e:
            logger.error("Database connection error: %s", e)
            # close() releases a half-opened connection instead of just
            # dropping the reference, and leaves conn/cursor set to None.
            self.close()
            raise

    def ensure_connection(self) -> bool:
        """Ensure database connection is active with retry mechanism.

        An existing connection is probed with ``SELECT 1``; a missing or
        closed one is re-opened. Between failed attempts the connection is
        closed and the call sleeps for ``RETRY_DELAY_SECONDS``.

        Returns:
            True when the connection is usable, False when all
            ``MAX_RETRIES`` attempts failed.
        """
        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                if self.conn and not self.conn.closed and self.cursor:
                    # Test connection with a simple query
                    self.cursor.execute("SELECT 1")
                else:
                    self._connect()
                return True
            except Exception as e:
                logger.warning(
                    "Database connection check failed (attempt %d/%d): %s",
                    attempt,
                    self.MAX_RETRIES,
                    e,
                )
                if attempt < self.MAX_RETRIES:
                    self.close()  # Clean up any partial connections
                    time.sleep(self.RETRY_DELAY_SECONDS)  # Wait before retry
        return False

    def _rollback_quietly(self) -> None:
        """Roll back the current transaction, ignoring any error."""
        if not self.conn:
            return
        try:
            self.conn.rollback()
        except Exception as e:
            # The connection may already be unusable; the next attempt
            # goes through ensure_connection, which replaces it.
            logger.debug("Rollback failed: %s", e)

    def _run_with_retry(self, action, label: str):
        """Run ``action`` with the shared retry policy.

        Args:
            action: zero-argument callable performing the database work.
            label: short name of the operation, used in log messages.

        Returns:
            The value returned by ``action``, or None when every attempt
            failed.
        """
        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                if not self.ensure_connection():
                    raise ConnectionError("Could not establish database connection")
                return action()
            except Exception as e:
                logger.warning(
                    "%s failed (attempt %d/%d): %s",
                    label,
                    attempt,
                    self.MAX_RETRIES,
                    e,
                )
                self._rollback_quietly()
                if attempt < self.MAX_RETRIES:
                    time.sleep(self.RETRY_DELAY_SECONDS)  # Wait before retry
        logger.error("%s gave up after %d attempts", label, self.MAX_RETRIES)
        return None

    def execute_query(self, query: str, params: Optional[tuple] = None) -> pd.DataFrame:
        """Execute a SELECT query with retry mechanism.

        Args:
            query: SQL text, using psycopg2 placeholders for parameters.
            params: values bound to the placeholders, or None.

        Returns:
            A DataFrame built from the fetched rows. The DataFrame is empty
            when the query returns no rows or when every attempt failed.
        """

        def fetch() -> pd.DataFrame:
            self.cursor.execute(query, params)
            rows = self.cursor.fetchall()
            # Convert to DataFrame
            return pd.DataFrame(rows) if rows else pd.DataFrame()

        frame = self._run_with_retry(fetch, "Query")
        return frame if frame is not None else pd.DataFrame()

    def execute_update(self, query: str, params: Optional[tuple] = None) -> bool:
        """Execute UPDATE, INSERT, or DELETE query with retry mechanism.

        The statement is committed immediately after it executes.

        Args:
            query: SQL text, using psycopg2 placeholders for parameters.
            params: values bound to the placeholders, or None.

        Returns:
            True when the statement was executed and committed, False when
            every attempt failed.
        """

        def apply() -> bool:
            self.cursor.execute(query, params)
            self.conn.commit()
            return True

        return self._run_with_retry(apply, "Update") is True

    def close(self) -> None:
        """Close database connection silently.

        Never raises. ``self.conn`` and ``self.cursor`` are always None
        afterwards.
        """
        cursor, conn = self.cursor, self.conn
        # Drop the references first so the object is in a clean state even
        # if one of the close calls below fails.
        self.cursor = None
        self.conn = None

        if cursor is not None:
            try:
                cursor.close()
            except Exception as e:
                logger.debug("Ignoring error while closing cursor: %s", e)

        if conn is not None:
            try:
                if not conn.closed:
                    conn.close()
            except Exception as e:
                logger.debug("Ignoring error while closing connection: %s", e)

    def __del__(self):
        """Destructor to ensure connection is closed."""
        try:
            self.close()
        except Exception:
            # Nothing useful can be done with an error raised while the
            # object (or the interpreter) is being torn down.
            pass