# utils/news/utils.py import os import logging from datetime import datetime import psycopg2 from psycopg2 import pool from dotenv import load_dotenv from dateutil import parser as date_parser # Load environment variables and setup logging load_dotenv() logger = logging.getLogger(__name__) class DatabaseManager: def __init__(self): """Initialize database manager with connection pool""" self.db_config = { 'dbname': os.getenv('DB_NAME'), 'user': os.getenv('DB_USER'), 'password': os.getenv('DB_PASSWORD'), 'host': os.getenv('DB_HOST'), 'port': os.getenv('DB_PORT') } self.pool = None self.setup_connection_pool() def setup_connection_pool(self): """Setup the database connection pool""" try: self.pool = psycopg2.pool.SimpleConnectionPool(1, 10, **self.db_config) logger.info("Database connection pool created successfully.") except Exception as e: logger.error(f"Error creating connection pool: {str(e)}") raise def get_connection(self): """Get a connection from the pool""" return self.pool.getconn() def return_connection(self, conn): """Return a connection to the pool""" self.pool.putconn(conn) def close_all(self): """Close all connections in the pool""" if self.pool: self.pool.closeall() def ensure_table_exists(self): """Ensure the news table exists with correct schema""" conn = None cursor = None try: logger.info("Getting database connection...") conn = self.get_connection() logger.info("Creating cursor...") cursor = conn.cursor() # Check if table exists cursor.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'news' ); """) table_exists = cursor.fetchone()[0] logger.info(f"Table exists check result: {table_exists}") if not table_exists: logger.info("Creating table...") cursor.execute(""" CREATE TABLE news ( id SERIAL PRIMARY KEY, title VARCHAR NOT NULL, url VARCHAR NOT NULL, date TIMESTAMP NOT NULL, source VARCHAR NOT NULL, content TEXT, sentiment_score DOUBLE PRECISION, sentiment VARCHAR, symbols TEXT[], symbol VARCHAR, processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, description TEXT, category VARCHAR(50) ); """) logger.info("Table created, creating indexes...") cursor.execute(""" CREATE INDEX idx_news_date ON news(date); CREATE INDEX idx_news_symbol ON news(symbol); CREATE UNIQUE INDEX news_url_key ON news(url); CREATE INDEX idx_news_category ON news(category); """) conn.commit() logger.info("News table created successfully.") else: logger.info("Table exists, verifying indexes...") # Verify indexes cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_news_date ON news(date); CREATE INDEX IF NOT EXISTS idx_news_symbol ON news(symbol); CREATE INDEX IF NOT EXISTS news_url_key ON news(url); CREATE INDEX IF NOT EXISTS idx_news_category ON news(category); """) conn.commit() logger.info("News table schema verified.") except Exception as e: if conn: conn.rollback() logger.error(f"Error ensuring table exists: {str(e)}") raise finally: if cursor: cursor.close() if conn: self.return_connection(conn) logger.info("Database resources cleaned up") def is_duplicate(self, title, url): """Check if a news item already exists""" conn = None cursor = None try: conn = self.get_connection() cursor = conn.cursor() cursor.execute(""" SELECT EXISTS( SELECT 1 FROM news WHERE title = %s AND url = %s ) """, (title, url)) return cursor.fetchone()[0] except Exception as e: logger.error(f"Error checking for duplicate: {str(e)}") return False finally: if cursor: cursor.close() if conn: self.return_connection(conn) def parse_date(date_str): """Parse date string using multiple formats""" if not date_str: return datetime.now() # Remove any extra whitespace date_str = str(date_str).strip() # Try common formats first for better performance date_formats = [ "%Y-%m-%dT%H:%M:%S.%fZ", # ISO format with milliseconds "%Y-%m-%dT%H:%M:%SZ", # ISO format without milliseconds "%Y-%m-%d %H:%M:%S", # Standard format "%a, %d %b %Y %H:%M:%S %z", # RFC 822 format "%a, %d %b %Y %H:%M:%S %Z", # Alternative RFC 822 "%Y-%m-%dT%H:%M:%S%z", # ISO format with timezone offset "%Y-%m-%dT%H:%M:%S%Z", # ISO format with timezone name "%Y-%m-%dT%H:%M:%S+00:00", # ISO format with explicit timezone "%Y-%m-%dT%H:%M:%S-08:00", # Pacific timezone format "%a, %d %b %Y %H:%M:%S GMT", # Common RSS format ] # Try explicit formats first for date_format in date_formats: try: return datetime.strptime(date_str, date_format) except (ValueError, TypeError): continue # If explicit formats fail, try dateutil parser try: return date_parser.parse(date_str) except Exception as e: logger.warning(f"Could not parse date '{date_str}': {str(e)}") return datetime.now() def setup_logging(): """Configure and return a logger instance""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) return logger