RivaCube/utils/news/utils.py
2025-02-04 19:31:18 +01:00

191 lines
6.6 KiB
Python

# utils/news/utils.py
import os
import logging
from datetime import datetime
import psycopg2
from psycopg2 import pool
from dotenv import load_dotenv
from dateutil import parser as date_parser
# Load environment variables and setup logging
load_dotenv()
logger = logging.getLogger(__name__)
class DatabaseManager:
def __init__(self):
"""Initialize database manager with connection pool"""
self.db_config = {
'dbname': os.getenv('DB_NAME'),
'user': os.getenv('DB_USER'),
'password': os.getenv('DB_PASSWORD'),
'host': os.getenv('DB_HOST'),
'port': os.getenv('DB_PORT')
}
self.pool = None
self.setup_connection_pool()
def setup_connection_pool(self):
"""Setup the database connection pool"""
try:
self.pool = psycopg2.pool.SimpleConnectionPool(1, 10, **self.db_config)
logger.info("Database connection pool created successfully.")
except Exception as e:
logger.error(f"Error creating connection pool: {str(e)}")
raise
def get_connection(self):
"""Get a connection from the pool"""
return self.pool.getconn()
def return_connection(self, conn):
"""Return a connection to the pool"""
self.pool.putconn(conn)
def close_all(self):
"""Close all connections in the pool"""
if self.pool:
self.pool.closeall()
def ensure_table_exists(self):
"""Ensure the news table exists with correct schema"""
conn = None
cursor = None
try:
logger.info("Getting database connection...")
conn = self.get_connection()
logger.info("Creating cursor...")
cursor = conn.cursor()
# Check if table exists
cursor.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'news'
);
""")
table_exists = cursor.fetchone()[0]
logger.info(f"Table exists check result: {table_exists}")
if not table_exists:
logger.info("Creating table...")
cursor.execute("""
CREATE TABLE news (
id SERIAL PRIMARY KEY,
title VARCHAR NOT NULL,
url VARCHAR NOT NULL,
date TIMESTAMP NOT NULL,
source VARCHAR NOT NULL,
content TEXT,
sentiment_score DOUBLE PRECISION,
sentiment VARCHAR,
symbols TEXT[],
symbol VARCHAR,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
description TEXT,
category VARCHAR(50)
);
""")
logger.info("Table created, creating indexes...")
cursor.execute("""
CREATE INDEX idx_news_date ON news(date);
CREATE INDEX idx_news_symbol ON news(symbol);
CREATE UNIQUE INDEX news_url_key ON news(url);
CREATE INDEX idx_news_category ON news(category);
""")
conn.commit()
logger.info("News table created successfully.")
else:
logger.info("Table exists, verifying indexes...")
# Verify indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_news_date ON news(date);
CREATE INDEX IF NOT EXISTS idx_news_symbol ON news(symbol);
CREATE INDEX IF NOT EXISTS news_url_key ON news(url);
CREATE INDEX IF NOT EXISTS idx_news_category ON news(category);
""")
conn.commit()
logger.info("News table schema verified.")
except Exception as e:
if conn:
conn.rollback()
logger.error(f"Error ensuring table exists: {str(e)}")
raise
finally:
if cursor:
cursor.close()
if conn:
self.return_connection(conn)
logger.info("Database resources cleaned up")
def is_duplicate(self, title, url):
"""Check if a news item already exists"""
conn = None
cursor = None
try:
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT EXISTS(
SELECT 1
FROM news
WHERE title = %s AND url = %s
)
""", (title, url))
return cursor.fetchone()[0]
except Exception as e:
logger.error(f"Error checking for duplicate: {str(e)}")
return False
finally:
if cursor:
cursor.close()
if conn:
self.return_connection(conn)
def parse_date(date_str):
"""Parse date string using multiple formats"""
if not date_str:
return datetime.now()
# Remove any extra whitespace
date_str = str(date_str).strip()
# Try common formats first for better performance
date_formats = [
"%Y-%m-%dT%H:%M:%S.%fZ", # ISO format with milliseconds
"%Y-%m-%dT%H:%M:%SZ", # ISO format without milliseconds
"%Y-%m-%d %H:%M:%S", # Standard format
"%a, %d %b %Y %H:%M:%S %z", # RFC 822 format
"%a, %d %b %Y %H:%M:%S %Z", # Alternative RFC 822
"%Y-%m-%dT%H:%M:%S%z", # ISO format with timezone offset
"%Y-%m-%dT%H:%M:%S%Z", # ISO format with timezone name
"%Y-%m-%dT%H:%M:%S+00:00", # ISO format with explicit timezone
"%Y-%m-%dT%H:%M:%S-08:00", # Pacific timezone format
"%a, %d %b %Y %H:%M:%S GMT", # Common RSS format
]
# Try explicit formats first
for date_format in date_formats:
try:
return datetime.strptime(date_str, date_format)
except (ValueError, TypeError):
continue
# If explicit formats fail, try dateutil parser
try:
return date_parser.parse(date_str)
except Exception as e:
logger.warning(f"Could not parse date '{date_str}': {str(e)}")
return datetime.now()
def setup_logging():
"""Configure and return a logger instance"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
return logger