RivaTerminal/modules/database.py

146 lines
4.7 KiB
Python

# modules/database.py
import psycopg2
from psycopg2.extras import RealDictCursor
import pandas as pd
import logging
from typing import Optional, Dict, List, Any
from config.database_config import DB_CONFIG
import time
logger = logging.getLogger(__name__)
class DatabaseConnection:
def __init__(self):
"""Initialize database connection"""
self.conn = None
self.cursor = None
self._connect()
def _connect(self):
"""Establish database connection with retry mechanism"""
try:
if not self.conn or self.conn.closed:
self.conn = psycopg2.connect(**DB_CONFIG)
self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
# Set shorter statement timeout to prevent long-running queries
self.cursor.execute("SET statement_timeout = '30s';")
self.conn.commit()
except Exception as e:
logger.error(f"Database connection error: {e}")
self.conn = None
self.cursor = None
raise
def ensure_connection(self) -> bool:
"""Ensure database connection is active with retry mechanism"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
if self.conn and not self.conn.closed and self.cursor:
# Test connection with a simple query
self.cursor.execute("SELECT 1")
return True
else:
self._connect()
return True
except Exception as e:
retry_count += 1
if retry_count < max_retries:
self.close() # Clean up any partial connections
time.sleep(1) # Wait before retry
return False
def execute_query(self, query: str, params: Optional[tuple] = None) -> pd.DataFrame:
"""Execute a SELECT query with retry mechanism"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
if not self.ensure_connection():
raise Exception("Could not establish database connection")
# Execute query with timeout protection
self.cursor.execute(query, params)
results = self.cursor.fetchall()
# Convert to DataFrame
if results:
df = pd.DataFrame(results)
else:
df = pd.DataFrame()
return df
except Exception as e:
retry_count += 1
if self.conn:
try:
self.conn.rollback()
except:
pass
if retry_count < max_retries:
time.sleep(1) # Wait before retry
else:
return pd.DataFrame()
def execute_update(self, query: str, params: Optional[tuple] = None) -> bool:
"""Execute UPDATE, INSERT, or DELETE query with retry mechanism"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
if not self.ensure_connection():
raise Exception("Could not establish database connection")
self.cursor.execute(query, params)
self.conn.commit()
return True
except Exception as e:
retry_count += 1
if self.conn:
try:
self.conn.rollback()
except:
pass
if retry_count < max_retries:
time.sleep(1) # Wait before retry
else:
return False
def close(self):
"""Close database connection silently"""
try:
if self.cursor:
try:
self.cursor.close()
except:
pass
self.cursor = None
if self.conn and not self.conn.closed:
try:
self.conn.close()
except:
pass
self.conn = None
except Exception:
pass
finally:
self.conn = None
self.cursor = None
def __del__(self):
"""Destructor to ensure connection is closed"""
self.close()