191 lines
6.6 KiB
Python
191 lines
6.6 KiB
Python
# utils/news/utils.py
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
import psycopg2
|
|
from psycopg2 import pool
|
|
from dotenv import load_dotenv
|
|
from dateutil import parser as date_parser
|
|
|
|
# Copy settings from a .env file into the process environment so that later
# os.getenv() lookups (e.g. the DB_* values read by DatabaseManager) see them,
# then obtain this module's logger. Handler/level configuration is done
# separately by setup_logging().
load_dotenv()
logger = logging.getLogger(__name__)
|
|
|
|
class DatabaseManager:
    """Thin wrapper around a psycopg2 connection pool for the ``news`` table.

    Connection parameters are read from the DB_NAME, DB_USER, DB_PASSWORD,
    DB_HOST and DB_PORT environment variables when the manager is created.

    NOTE: psycopg2 documents ``SimpleConnectionPool`` as not shareable across
    threads; use one manager per thread or switch pool classes if needed.
    """

    def __init__(self):
        """Initialize the database manager and its connection pool.

        Raises:
            Exception: whatever ``setup_connection_pool`` re-raises when the
                pool cannot be created.
        """
        self.db_config = {
            'dbname': os.getenv('DB_NAME'),
            'user': os.getenv('DB_USER'),
            'password': os.getenv('DB_PASSWORD'),
            'host': os.getenv('DB_HOST'),
            'port': os.getenv('DB_PORT'),
        }
        self.pool = None
        self.setup_connection_pool()

    def setup_connection_pool(self):
        """Create a ``SimpleConnectionPool`` (1 to 10 connections) from ``db_config``.

        Raises:
            Exception: re-raised after logging if pool creation fails.
        """
        try:
            self.pool = psycopg2.pool.SimpleConnectionPool(1, 10, **self.db_config)
            logger.info("Database connection pool created successfully.")
        except Exception as e:
            logger.error("Error creating connection pool: %s", e)
            raise

    def get_connection(self):
        """Borrow a connection from the pool; hand it back with ``return_connection``."""
        return self.pool.getconn()

    def return_connection(self, conn):
        """Return a connection previously obtained from ``get_connection``."""
        self.pool.putconn(conn)

    def close_all(self):
        """Close every connection held by the pool.

        Idempotent: does nothing if no pool exists or the pool has already
        been closed (psycopg2 raises ``PoolError`` when ``closeall`` is called
        on a closed pool).
        """
        if self.pool is not None and not getattr(self.pool, 'closed', False):
            self.pool.closeall()

    def ensure_table_exists(self):
        """Ensure the ``public.news`` table and its indexes exist.

        If the table is missing it is created together with its indexes.
        If it already exists, the same indexes are declared with
        ``IF NOT EXISTS`` so they are added when absent.

        Raises:
            Exception: any error is logged, the transaction rolled back, and
                the exception re-raised.
        """
        conn = None
        cursor = None
        try:
            logger.info("Getting database connection...")
            conn = self.get_connection()
            logger.info("Creating cursor...")
            cursor = conn.cursor()

            # Check if table exists
            cursor.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'public'
                    AND table_name = 'news'
                );
            """)
            table_exists = cursor.fetchone()[0]
            logger.info("Table exists check result: %s", table_exists)

            if not table_exists:
                logger.info("Creating table...")
                cursor.execute("""
                    CREATE TABLE news (
                        id SERIAL PRIMARY KEY,
                        title VARCHAR NOT NULL,
                        url VARCHAR NOT NULL,
                        date TIMESTAMP NOT NULL,
                        source VARCHAR NOT NULL,
                        content TEXT,
                        sentiment_score DOUBLE PRECISION,
                        sentiment VARCHAR,
                        symbols TEXT[],
                        symbol VARCHAR,
                        processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        description TEXT,
                        category VARCHAR(50)
                    );
                """)
                logger.info("Table created, creating indexes...")
                cursor.execute("""
                    CREATE INDEX idx_news_date ON news(date);
                    CREATE INDEX idx_news_symbol ON news(symbol);
                    CREATE UNIQUE INDEX news_url_key ON news(url);
                    CREATE INDEX idx_news_category ON news(category);
                """)
                conn.commit()
                logger.info("News table created successfully.")
            else:
                logger.info("Table exists, verifying indexes...")
                # news_url_key is declared UNIQUE here too, matching the create
                # branch; a plain index would leave url without a uniqueness
                # guarantee. IF NOT EXISTS only compares the index name, so an
                # index already called news_url_key is left untouched. If the
                # index is missing and url already has duplicate values, the
                # UNIQUE creation fails and the error is re-raised below.
                cursor.execute("""
                    CREATE INDEX IF NOT EXISTS idx_news_date ON news(date);
                    CREATE INDEX IF NOT EXISTS idx_news_symbol ON news(symbol);
                    CREATE UNIQUE INDEX IF NOT EXISTS news_url_key ON news(url);
                    CREATE INDEX IF NOT EXISTS idx_news_category ON news(category);
                """)
                conn.commit()
                logger.info("News table schema verified.")

        except Exception as e:
            if conn:
                conn.rollback()
            logger.error("Error ensuring table exists: %s", e)
            raise
        finally:
            if cursor:
                cursor.close()
            if conn:
                self.return_connection(conn)
            logger.info("Database resources cleaned up")

    def is_duplicate(self, title, url):
        """Return True if a ``news`` row with this exact title AND url exists.

        Best-effort: any error is logged and ``False`` is returned instead of
        raising.

        NOTE(review): the ``news_url_key`` index is on url alone, so a row with
        the same url but a different title is reported as not a duplicate
        here yet would collide on that index if it is unique. Confirm which
        check callers intend.
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()

            cursor.execute("""
                SELECT EXISTS(
                    SELECT 1
                    FROM news
                    WHERE title = %s AND url = %s
                )
            """, (title, url))

            return cursor.fetchone()[0]
        except Exception as e:
            logger.error("Error checking for duplicate: %s", e)
            return False
        finally:
            if cursor:
                cursor.close()
            if conn:
                self.return_connection(conn)
|
|
|
|
# Explicit strptime formats tried in order before falling back to dateutil.
# Order matters: the first match wins. Since Python 3.7, "%z" accepts offsets
# with or without a colon ("+0000", "-08:00") and "Z", and "%Z" accepts
# "UTC"/"GMT". Separate literal "+00:00", "-08:00" and " GMT" patterns would
# therefore never be reached, so they are not listed.
_DATE_FORMATS = (
    "%Y-%m-%dT%H:%M:%S.%fZ",     # ISO 8601, fractional seconds, literal Z -> naive
    "%Y-%m-%dT%H:%M:%SZ",        # ISO 8601, literal Z -> naive
    "%Y-%m-%d %H:%M:%S",         # plain "date time" -> naive
    "%a, %d %b %Y %H:%M:%S %z",  # RFC 822 with numeric offset -> aware
    "%a, %d %b %Y %H:%M:%S %Z",  # RFC 822 with zone name (e.g. GMT) -> naive
    "%Y-%m-%dT%H:%M:%S%z",       # ISO 8601 with numeric offset -> aware
    "%Y-%m-%dT%H:%M:%S%Z",       # ISO 8601 with zone name -> naive
)


def parse_date(date_str) -> datetime:
    """Parse a date value into a ``datetime``, trying several formats.

    The formats in ``_DATE_FORMATS`` are tried first, in order. If none
    match, dateutil's generic parser is used.

    Args:
        date_str: usually a string. A ``datetime`` is returned unchanged;
            any other value is converted with ``str()`` and stripped.

    Returns:
        The parsed ``datetime``. Falsy, whitespace-only, and unparseable
        input fall back to ``datetime.now()`` (naive local time).
        Unparseable input also logs a warning.

    NOTE: the result may be naive or timezone-aware depending on which
    format matched. For example, a trailing literal "Z" yields a naive
    value, while a numeric offset yields an aware one. Callers that compare
    or store results together should normalize them first.
    """
    if isinstance(date_str, datetime):
        # Already a datetime: nothing to parse.
        return date_str
    if not date_str:
        return datetime.now()

    text = str(date_str).strip()
    if not text:
        # Whitespace-only: same fallback as empty input, without a doomed
        # parse attempt and misleading warning.
        return datetime.now()

    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except (ValueError, TypeError):
            continue

    # None of the explicit formats matched; let dateutil try.
    try:
        return date_parser.parse(text)
    except Exception as e:
        logger.warning("Could not parse date '%s': %s", text, e)
        return datetime.now()
|
|
|
|
def setup_logging():
    """Configure root logging at INFO level and return this module's logger.

    The root configuration uses the line layout
    ``<asctime> - <levelname> - <message>``.

    Returns:
        logging.Logger: the module-level ``logger``.
    """
    line_format = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=line_format, level=logging.INFO)
    return logger
|