134 lines
4.9 KiB
Python
134 lines
4.9 KiB
Python
import logging
|
|
import pandas as pd
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich import box
|
|
from rich.text import Text
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
# Module-level rich Console instance for terminal output.
console = Console()
|
|
|
|
class NewsView:
    """Fetch recent news rows from the database and render them in the terminal.

    The injected ``db_connection`` must provide
    ``execute_query(query, params=...)``. The methods below treat its return
    value as a pandas DataFrame (``.empty``, ``.iterrows()``, column access).
    """

    def __init__(self, db_connection):
        """Store the database handle and create the console used for output."""
        self.db = db_connection
        self.console = Console()

    def get_news(self, days: int = 7, source: Optional[str] = None) -> pd.DataFrame:
        """Return up to 50 of the newest rows from ``news`` and ``econews``.

        Args:
            days: How many days back from the current date to include.
            source: Optional case-insensitive filter, matched against
                ``news.category`` and ``econews.source``.

        Returns:
            The result of ``self.db.execute_query`` with the columns
            ``title``, ``date``, ``url``, ``source``, ``snippet`` and
            ``news_type``. An empty DataFrame is returned if the query raises.
        """
        try:
            # The day count is bound as a regular value and multiplied by a
            # one-day interval; a placeholder inside a quoted literal
            # ('%s days') only works by accident with client-side
            # interpolation.
            query_template = """
                WITH combined_news AS (
                    SELECT
                        title,
                        published_at as date,
                        link as url,
                        category as source,
                        COALESCE(SUBSTRING(description, 1, 150) || '...', '') as snippet,
                        'General' as news_type
                    FROM news
                    WHERE published_at >= CURRENT_DATE - (%s * INTERVAL '1 day')
                    {source_filter1}
                    ORDER BY published_at DESC
                ), economic_news AS (
                    SELECT
                        title,
                        published_at as date,
                        url,
                        source,
                        COALESCE(SUBSTRING(snippet, 1, 150) || '...', '') as snippet,
                        'Economic' as news_type
                    FROM econews
                    WHERE published_at >= CURRENT_DATE - (%s * INTERVAL '1 day')
                    {source_filter2}
                    ORDER BY published_at DESC
                )
                SELECT * FROM combined_news
                UNION ALL
                SELECT * FROM economic_news
                ORDER BY date DESC
                LIMIT 50;
            """

            if source:
                source_filter1 = "AND LOWER(category) = LOWER(%s)"
                source_filter2 = "AND LOWER(source) = LOWER(%s)"
                # Positional parameters must follow the order in which the
                # placeholders appear in the SQL text: each CTE has its
                # interval first and its source filter second.
                params = (days, source, days, source)
            else:
                source_filter1 = ""
                source_filter2 = ""
                params = (days, days)

            query = query_template.format(
                source_filter1=source_filter1,
                source_filter2=source_filter2,
            )
            return self.db.execute_query(query, params=params)
        except Exception as e:
            # Best effort: callers treat an empty frame as "nothing to show".
            logger.error("News query failed: %s", e)
            return pd.DataFrame()

    def get_news_sources(self) -> List[str]:
        """Return the distinct source names found in ``news`` and ``econews``.

        ``news.category`` is reported as the source for general news. NULL
        values are dropped so the result only contains usable names. An empty
        list is returned if the query raises.
        """
        try:
            query = """
                SELECT DISTINCT category as source FROM news
                UNION
                SELECT DISTINCT source FROM econews
                ORDER BY source;
            """
            sources_df = self.db.execute_query(query)
            return sources_df['source'].dropna().tolist()
        except Exception as e:
            logger.error("Error fetching news sources: %s", e)
            return []

    def _build_item_table(self, row) -> "Table":
        """Build the single-column table that renders one news row."""
        table = Table(show_header=False, box=box.MINIMAL, padding=(0, 1))
        table.add_column(style="cyan", width=100)

        # Database text is wrapped in Text so that square brackets in it are
        # printed literally instead of being parsed as console markup.
        title = row['title']
        table.add_row(Text(title if isinstance(title, str) else ""))

        time_str = row['date'].strftime('%H:%M')
        table.add_row(
            Text(f"{time_str} | {row['source']} | {row['news_type']}", style="green")
        )

        snippet = row['snippet']
        # Snippets that start with an embedded image tag are skipped.
        if isinstance(snippet, str) and snippet and not snippet.startswith('<p><img'):
            table.add_row(Text(snippet, style="dim"))

        url = row['url']
        if isinstance(url, str) and url:
            table.add_row(Text(f"Link: {url}", style="blue"))

        return table

    def display(self, days: int = 7, source: Optional[str] = None) -> None:
        """Print the news returned by :meth:`get_news`, grouped by calendar day.

        Args:
            days: Forwarded to :meth:`get_news`.
            source: Forwarded to :meth:`get_news` and echoed in the heading.

        Errors raised while rendering are logged and reported on the console
        rather than propagated.
        """
        # Imported here so the block stays self-contained; rich is already a
        # dependency of this module.
        from rich.markup import escape

        out = self.console
        try:
            news_data = self.get_news(days, source)

            if news_data.empty:
                out.print("[yellow]No news available for the specified filters.[/]")
                return

            filter_info = f"\n[cyan]News for the last {days} days"
            if source:
                # The source comes from the caller; escape it so brackets do
                # not get interpreted as markup tags.
                filter_info += f" from {escape(str(source))}"
            filter_info += "[/]"
            out.print(filter_info, justify="center")

            separator = "─" * 100
            current_date = None
            for _, row in news_data.iterrows():
                news_date = row['date'].date()

                # Rows arrive newest first, so a date header is printed each
                # time the calendar day changes.
                if current_date != news_date:
                    current_date = news_date
                    out.print(f"\n[magenta]{current_date.strftime('%A, %B %d, %Y')}[/]")
                    out.print(separator)

                out.print(self._build_item_table(row))
                out.print(separator)

            out.print("\n[blue]Commands:[/]")
            out.print("[dim]news -d <days> : Show news from last N days[/]")
            out.print("[dim]news -s <source> : Filter news by source[/]")

        except Exception as e:
            logger.error("News display error: %s", e)
            # Escape the message: exception text may contain brackets, and a
            # markup error raised here would escape the handler.
            out.print(f"[red bold]Error displaying news: {escape(str(e))}[/]")
|