134 lines
4.7 KiB
Python
134 lines
4.7 KiB
Python
import os
|
|
import logging
|
|
import psycopg2
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
|
|
# Populate the process environment from a .env file via python-dotenv, so the
# settings below can come from either the real environment or that file.
load_dotenv()

# PostgreSQL connection settings read from the environment. Each value is a
# string, or None when the corresponding variable is not set.
DB_USER = os.environ.get('DB_USER')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DB_NAME = os.environ.get('DB_NAME')
DB_HOST = os.environ.get('DB_HOST')
DB_PORT = os.environ.get('DB_PORT')
|
|
|
|
def get_db_connection():
    """Open a new PostgreSQL connection from the module-level DB_* settings.

    Returns:
        A new psycopg2 connection. The caller owns it and must close it.

    Raises:
        Exception: any error raised by ``psycopg2.connect`` is logged and
        then re-raised unchanged.
    """
    try:
        connection_params = {
            "dbname": DB_NAME,
            "user": DB_USER,
            "password": DB_PASSWORD,
            "host": DB_HOST,
            "port": DB_PORT,
        }
        return psycopg2.connect(**connection_params)
    except Exception as e:
        # Record the failure here; the caller still decides how to react.
        logging.error(f"Error connecting to the database: {str(e)}")
        raise
|
|
|
|
def test_db_connection():
    """Verify that a database connection can be opened, then close it again.

    Logs an info message on success. On failure, the error is logged and the
    original exception propagates to the caller.
    """
    try:
        probe = get_db_connection()
        probe.close()
    except Exception as e:
        logging.error(f"Error connecting to the database: {str(e)}")
        raise
    else:
        logging.info("Database connection successful.")
|
|
|
|
# DDL for a fresh install. IF NOT EXISTS makes the create path safe when
# another process creates the table between our existence check and this
# statement; a plain CREATE TABLE would fail with "relation already exists".
_ECONEWS_CREATE_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS econews (
        id SERIAL PRIMARY KEY,
        title TEXT NOT NULL,
        url TEXT NOT NULL,
        source TEXT NOT NULL,
        snippet TEXT,
        published_at TIMESTAMP NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        site TEXT NOT NULL,
        image_url TEXT,
        author TEXT,
        category TEXT
    );
"""

# Back-fills any missing columns on an existing table. All ALTERs share one
# inner PL/pgSQL block, so an error in any of them rolls back that block and
# the remaining ALTERs are skipped. The error is deliberately not propagated
# (best-effort migration). It is raised as a WARNING rather than discarded, so
# it is reported (client notices / server log, depending on logging settings).
# The '%' below is safe: psycopg2 only interpolates when parameters are passed.
_ECONEWS_MIGRATE_COLUMNS_SQL = """
    DO $$
    BEGIN
        BEGIN
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS title TEXT NOT NULL DEFAULT '';
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS url TEXT NOT NULL DEFAULT '';
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS source TEXT NOT NULL DEFAULT '';
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS snippet TEXT;
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS published_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS site TEXT NOT NULL DEFAULT '';
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS image_url TEXT;
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS author TEXT;
            ALTER TABLE econews ADD COLUMN IF NOT EXISTS category TEXT;
        EXCEPTION
            WHEN others THEN
                RAISE WARNING 'econews column migration skipped: %', SQLERRM;
        END;
    END $$;
"""

# Indexes used by both the create path and the verify path; idempotent.
_ECONEWS_INDEXES_SQL = """
    CREATE INDEX IF NOT EXISTS idx_econews_title_url ON econews(title, url);
    CREATE INDEX IF NOT EXISTS idx_econews_published_at ON econews(published_at DESC);
    CREATE INDEX IF NOT EXISTS idx_econews_source ON econews(source);
"""


def _econews_table_exists(cursor):
    """Return True if a table named ``econews`` exists in the ``public`` schema."""
    # NOTE(review): this check is schema-qualified ('public'), while the DDL
    # above uses the unqualified name resolved via search_path. Confirm that
    # search_path starts with public in every deployment.
    cursor.execute(
        """
        SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_name = 'econews'
        );
        """
    )
    return cursor.fetchone()[0]


def ensure_table_exists():
    """Ensure the econews table exists with the expected columns and indexes.

    If the table is missing, it is created together with its indexes.
    Otherwise, missing columns are added on a best-effort basis and the
    indexes are (re)ensured. All changes are committed in one transaction.

    Raises:
        Exception: any database error is logged, the transaction is rolled
        back, and the error is re-raised. The cursor and connection are
        always closed.
    """
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        if not _econews_table_exists(cursor):
            cursor.execute(_ECONEWS_CREATE_TABLE_SQL)
            cursor.execute(_ECONEWS_INDEXES_SQL)
            conn.commit()
            logging.info("Econews table created successfully.")
        else:
            cursor.execute(_ECONEWS_MIGRATE_COLUMNS_SQL)
            cursor.execute(_ECONEWS_INDEXES_SQL)
            conn.commit()
            logging.info("Econews table schema verified.")

    except Exception as e:
        # Log before rolling back so the original error is recorded even if
        # the rollback itself fails (e.g. the connection was lost).
        logging.error(f"Error ensuring table exists: {str(e)}")
        if conn is not None:
            conn.rollback()
        raise
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
|
|
|
|
# strptime formats accepted by parse_fmp_date, tried in order (first match
# wins). The first two were the only ones accepted originally; the rest are
# close variants that previously fell through to the datetime.now() fallback.
_FMP_DATE_FORMATS = (
    "%Y-%m-%d %H:%M:%S",      # "2024-01-15 10:30:00"
    "%Y-%m-%dT%H:%M:%S.%fZ",  # "2024-01-15T10:30:00.000Z"
    "%Y-%m-%dT%H:%M:%SZ",     # "2024-01-15T10:30:00Z"
    "%Y-%m-%dT%H:%M:%S",      # "2024-01-15T10:30:00"
    "%Y-%m-%d",               # "2024-01-15"
)


def parse_fmp_date(date_str, formats=_FMP_DATE_FORMATS):
    """Parse an FMP date string into a naive ``datetime``.

    Args:
        date_str: Date text from an FMP payload. A ``datetime`` is returned
            unchanged. ``None`` and other non-string values are treated as
            unparseable instead of raising ``TypeError``.
        formats: ``strptime`` formats to try, in order.

    Returns:
        The parsed ``datetime``. It is naive: a trailing ``Z`` is matched
        literally and no tzinfo is attached. If no format matches, a warning
        is logged and ``datetime.now()`` is returned, so callers always get
        a value.
    """
    if isinstance(date_str, datetime):
        return date_str
    if isinstance(date_str, str):
        for fmt in formats:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
    # NOTE(review): the fallback is local wall-clock time, while "...Z" inputs
    # are UTC, and it masks bad input. It is kept because callers rely on
    # always receiving a datetime.
    logging.warning(f"Could not parse date: {date_str}")
    return datetime.now()
|