# modules/database.py
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
import pandas as pd
|
|
import logging
|
|
from typing import Optional, Dict, List, Any
|
|
from config.database_config import DB_CONFIG
|
|
import time
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class DatabaseConnection:
    """Wrapper around a single psycopg2 connection with reconnect and retry.

    The connection is opened in ``__init__`` from ``DB_CONFIG`` and uses a
    ``RealDictCursor``. Each new connection gets a 30 second
    ``statement_timeout``. ``execute_query`` and ``execute_update`` never
    raise: after ``MAX_RETRIES`` failed attempts they return an empty
    DataFrame / ``False`` and the failure is written to the module logger.

    Attributes:
        conn: The open connection, or ``None`` when disconnected.
        cursor: The cursor bound to ``conn``, or ``None`` when disconnected.
    """

    # Attempts made by each retrying method before it gives up.
    MAX_RETRIES = 3
    # Seconds to wait between two attempts.
    RETRY_DELAY_SECONDS = 1

    def __init__(self):
        """Open the database connection.

        Raises:
            Exception: Whatever the connection attempt raised (see _connect).
        """
        self.conn = None
        self.cursor = None
        self._connect()

    def _connect(self):
        """Open the connection and cursor unless a live connection exists.

        This makes a single attempt; retrying is done by ensure_connection().

        Raises:
            Exception: Re-raises the original error after logging it and
                releasing any partially opened connection.
        """
        try:
            if not self.conn or self.conn.closed:
                self.conn = psycopg2.connect(**DB_CONFIG)
                self.cursor = self.conn.cursor(cursor_factory=RealDictCursor)
                # Set shorter statement timeout to prevent long-running queries
                self.cursor.execute("SET statement_timeout = '30s';")
                self.conn.commit()
        except Exception as e:
            logger.error("Database connection error: %s", e)
            # close() also resets conn/cursor to None; dropping the reference
            # without closing would leak a connection that was already opened
            # when a later setup step failed.
            self.close()
            raise

    def ensure_connection(self) -> bool:
        """Check that the connection is usable, reconnecting if necessary.

        A live connection is probed with ``SELECT 1``; a missing or closed
        one is reopened. Up to ``MAX_RETRIES`` attempts are made.

        Returns:
            True once a probe or reconnect succeeds, False if every attempt
            failed.
        """
        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                if self.conn and not self.conn.closed and self.cursor:
                    # Test connection with a simple query
                    self.cursor.execute("SELECT 1")
                else:
                    self._connect()
                return True
            except Exception as e:
                logger.warning(
                    "Connection check failed (attempt %d/%d): %s",
                    attempt, self.MAX_RETRIES, e,
                )
                # Discard the broken connection so the next attempt (or the
                # next caller) starts from a clean state.
                self.close()
                if attempt < self.MAX_RETRIES:
                    time.sleep(self.RETRY_DELAY_SECONDS)
        return False

    def execute_query(self, query: str, params: Optional[tuple] = None) -> pd.DataFrame:
        """Run a statement that returns rows and load them into a DataFrame.

        Args:
            query: SQL text, using psycopg2 placeholders for parameters.
            params: Values bound to the placeholders, or None.

        Returns:
            A DataFrame built from the fetched rows. It is empty when the
            query returned no rows or when all ``MAX_RETRIES`` attempts
            failed; failures are logged.
        """
        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                if not self.ensure_connection():
                    raise ConnectionError("Could not establish database connection")

                self.cursor.execute(query, params)
                rows = self.cursor.fetchall()
                # psycopg2 opens a transaction implicitly on the first
                # execute; end it so the session is not left idle in
                # transaction until the next call.
                self.conn.commit()
                return pd.DataFrame(rows) if rows else pd.DataFrame()
            except Exception as e:
                logger.warning(
                    "Query failed (attempt %d/%d): %s",
                    attempt, self.MAX_RETRIES, e,
                )
                self._rollback_quietly()
                if attempt < self.MAX_RETRIES:
                    time.sleep(self.RETRY_DELAY_SECONDS)

        logger.error("Query abandoned after %d attempts", self.MAX_RETRIES)
        return pd.DataFrame()

    def execute_update(self, query: str, params: Optional[tuple] = None) -> bool:
        """Run an UPDATE, INSERT or DELETE statement and commit it.

        Args:
            query: SQL text, using psycopg2 placeholders for parameters.
            params: Values bound to the placeholders, or None.

        Returns:
            True if the statement was executed and committed, False if all
            ``MAX_RETRIES`` attempts failed; failures are logged.
        """
        # NOTE(review): every kind of failure is retried, including an error
        # raised by commit() itself - confirm callers only pass statements
        # that are safe to run more than once.
        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                if not self.ensure_connection():
                    raise ConnectionError("Could not establish database connection")

                self.cursor.execute(query, params)
                self.conn.commit()
                return True
            except Exception as e:
                logger.warning(
                    "Update failed (attempt %d/%d): %s",
                    attempt, self.MAX_RETRIES, e,
                )
                self._rollback_quietly()
                if attempt < self.MAX_RETRIES:
                    time.sleep(self.RETRY_DELAY_SECONDS)

        logger.error("Update abandoned after %d attempts", self.MAX_RETRIES)
        return False

    def _rollback_quietly(self):
        """Roll back the current transaction, logging instead of raising."""
        if self.conn:
            try:
                self.conn.rollback()
            except Exception as e:
                # Best effort: the connection may already be unusable.
                logger.debug("Rollback failed: %s", e)

    def close(self):
        """Close the cursor and connection; never raises for close errors.

        ``conn`` and ``cursor`` are reset to None in every case.
        """
        # getattr: __del__ can call this on an instance whose __init__ never
        # assigned the attributes.
        cursor = getattr(self, "cursor", None)
        conn = getattr(self, "conn", None)
        self.cursor = None
        self.conn = None

        if cursor:
            try:
                cursor.close()
            except Exception as e:
                logger.debug("Error closing cursor: %s", e)
        if conn:
            try:
                if not conn.closed:
                    conn.close()
            except Exception as e:
                logger.debug("Error closing connection: %s", e)

    def __del__(self):
        """Destructor to ensure connection is closed"""
        try:
            self.close()
        except Exception:
            # Module globals may already be torn down at interpreter exit;
            # nothing useful can be done with an error at this point.
            pass
|