# 554 lines | 21 KiB | Python
import decimal
import logging
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
from rich import box
from rich.box import DOUBLE, ROUNDED
from rich.console import Console
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table
from rich.text import Text
from tabulate import tabulate


# Module-level logger; StocksView reports query and rendering failures here.
logger = logging.getLogger(__name__)

# Single rich console shared by every display_* method in this module.
console = Console()


class StocksView:
|
|
def __init__(self, db_connection):
|
|
self.db = db_connection
|
|
self.BOX_SIZES = {
|
|
'company': {'width': 60, 'content': 56},
|
|
'large': {'width': 80, 'content': 76}
|
|
}
|
|
self.NEWS_LIMIT = 5
|
|
self.PRICE_HISTORY_DAYS = 5
|
|
self.styles = {
|
|
'positive': 'green',
|
|
'negative': 'red',
|
|
'neutral': 'blue',
|
|
'header': 'yellow bold',
|
|
'date': 'cyan',
|
|
'title': 'magenta bold'
|
|
}
|
|
|
|
def get_stock_info(self, ticker: str) -> pd.DataFrame:
|
|
try:
|
|
query = """
|
|
SELECT
|
|
t.isin,
|
|
t.yf_ticker,
|
|
t.name,
|
|
t.description,
|
|
z.name as zone,
|
|
s.name as sector
|
|
FROM tickers t
|
|
LEFT JOIN zones z ON z.id = t.zone_id
|
|
LEFT JOIN sectors s ON s.id = t.sector_id
|
|
WHERE UPPER(t.yf_ticker) = UPPER(%s);
|
|
"""
|
|
return self.db.execute_query(query, (ticker,))
|
|
except Exception as e:
|
|
logger.error(f"Error fetching stock info: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def get_ytd_prices(self, ticker: str) -> pd.DataFrame:
|
|
try:
|
|
query = """
|
|
SELECT
|
|
s.date,
|
|
s.open,
|
|
s.high,
|
|
s.low,
|
|
s.close,
|
|
s.adj_close,
|
|
s.volume
|
|
FROM stocks s
|
|
JOIN tickers t ON t.id = s.ticker_id
|
|
WHERE UPPER(t.yf_ticker) = UPPER(%s)
|
|
AND s.date >= DATE_TRUNC('year', CURRENT_DATE)
|
|
ORDER BY s.date ASC;
|
|
"""
|
|
df = self.db.execute_query(query, (ticker,))
|
|
if not df.empty:
|
|
df['date'] = pd.to_datetime(df['date'])
|
|
return df
|
|
except Exception as e:
|
|
logger.error(f"Error fetching YTD prices: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def get_stock_prices(self, ticker: str, days: int = 5) -> pd.DataFrame:
|
|
try:
|
|
query = """
|
|
SELECT
|
|
s.date,
|
|
s.open,
|
|
s.high,
|
|
s.low,
|
|
s.close,
|
|
s.adj_close,
|
|
s.volume
|
|
FROM stocks s
|
|
JOIN tickers t ON t.id = s.ticker_id
|
|
WHERE UPPER(t.yf_ticker) = UPPER(%s)
|
|
AND s.date >= CURRENT_DATE - INTERVAL '%s days'
|
|
ORDER BY s.date DESC;
|
|
"""
|
|
df = self.db.execute_query(query, (ticker, days))
|
|
if not df.empty:
|
|
df['date'] = pd.to_datetime(df['date'])
|
|
return df
|
|
except Exception as e:
|
|
logger.error(f"Error fetching stock prices: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def get_stock_news(self, ticker: str, limit: int = 5) -> pd.DataFrame:
|
|
try:
|
|
query = """
|
|
SELECT
|
|
sn.published_at,
|
|
sn.news_type,
|
|
sn.title,
|
|
sn.content
|
|
FROM stocknews sn
|
|
JOIN tickers t ON t.id = sn.ticker_id
|
|
WHERE UPPER(t.yf_ticker) = UPPER(%s)
|
|
ORDER BY sn.published_at DESC
|
|
LIMIT %s;
|
|
"""
|
|
df = self.db.execute_query(query, (ticker, limit))
|
|
if not df.empty:
|
|
df['published_at'] = pd.to_datetime(df['published_at'])
|
|
return df
|
|
except Exception as e:
|
|
logger.error(f"Error fetching news for {ticker}: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def get_ratings(self, ticker: str) -> pd.DataFrame:
|
|
try:
|
|
query = """
|
|
WITH latest_ratings AS (
|
|
SELECT DISTINCT ON (analyst)
|
|
date,
|
|
analyst,
|
|
prior_rating,
|
|
current_rating,
|
|
price_target
|
|
FROM ratings r
|
|
JOIN tickers t ON t.id = r.ticker_id
|
|
WHERE UPPER(t.yf_ticker) = UPPER(%s)
|
|
ORDER BY analyst, date DESC
|
|
)
|
|
SELECT
|
|
date,
|
|
analyst,
|
|
prior_rating,
|
|
current_rating,
|
|
price_target
|
|
FROM latest_ratings
|
|
ORDER BY date DESC;
|
|
"""
|
|
return self.db.execute_query(query, (ticker,))
|
|
except Exception as e:
|
|
logger.error(f"Error fetching ratings: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def get_financial_metrics(self, ticker: str) -> pd.DataFrame:
|
|
try:
|
|
query = """
|
|
WITH latest_metrics AS (
|
|
SELECT DISTINCT ON (metric_type)
|
|
ticker_id,
|
|
date,
|
|
metric_type,
|
|
data
|
|
FROM financial_metrics fm
|
|
JOIN tickers t ON t.id = fm.ticker_id
|
|
WHERE UPPER(t.yf_ticker) = UPPER(%s)
|
|
AND period = 'ttm'
|
|
ORDER BY metric_type, date DESC
|
|
)
|
|
SELECT
|
|
t.yf_ticker,
|
|
t.ccy as currency,
|
|
(SELECT data->>'marketCapTTM' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as market_cap,
|
|
(SELECT data->>'peRatioTTM' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as pe_ratio,
|
|
(SELECT data->>'netIncomePerShareTTM' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as eps,
|
|
(SELECT data->>'beta' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as beta,
|
|
(SELECT data->>'next_earnings_date' FROM latest_metrics WHERE metric_type = 'key_metrics')::date as next_earnings_date,
|
|
(SELECT data->>'dividendPerShareTTM' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as dividend_per_share
|
|
FROM tickers t
|
|
WHERE UPPER(t.yf_ticker) = UPPER(%s)
|
|
"""
|
|
return self.db.execute_query(query, (ticker, ticker))
|
|
except Exception as e:
|
|
logger.error(f"Error fetching financial metrics: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def display_company_info(self, info: pd.DataFrame):
|
|
if info.empty:
|
|
return
|
|
|
|
try:
|
|
company_panel = Panel(
|
|
Text.assemble(
|
|
("ISIN: ", self.styles['header']),
|
|
(f"{info['isin'].iloc[0]}\n", self.styles['neutral']),
|
|
("Ticker: ", self.styles['header']),
|
|
(f"{info['yf_ticker'].iloc[0]}\n", self.styles['neutral']),
|
|
("Name: ", self.styles['header']),
|
|
(f"{info['name'].iloc[0]}\n", self.styles['neutral']),
|
|
("Zone: ", self.styles['header']),
|
|
(f"{info['zone'].iloc[0]}\n", self.styles['neutral']),
|
|
("Sector: ", self.styles['header']),
|
|
(f"{info['sector'].iloc[0]}", self.styles['neutral'])
|
|
),
|
|
title="Company Information",
|
|
box=box.ROUNDED
|
|
)
|
|
console.print(company_panel)
|
|
|
|
if 'description' in info and not pd.isna(info['description'].iloc[0]):
|
|
desc_panel = Panel(
|
|
info['description'].iloc[0],
|
|
title="Description",
|
|
box=box.ROUNDED
|
|
)
|
|
console.print(desc_panel)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error displaying company info: {e}")
|
|
|
|
def display_financial_metrics(self, metrics: pd.DataFrame):
|
|
if metrics.empty:
|
|
return
|
|
|
|
try:
|
|
metrics_table = Table(
|
|
title="Key Financial Metrics",
|
|
box=box.ROUNDED,
|
|
show_header=False,
|
|
title_style=self.styles['header']
|
|
)
|
|
|
|
metrics_table.add_column("Metric", style=self.styles['neutral'])
|
|
metrics_table.add_column("Value", style=self.styles['neutral'])
|
|
|
|
currency = metrics['currency'].iloc[0] if pd.notna(metrics['currency'].iloc[0]) else 'USD'
|
|
currency_symbol = '$' if currency == 'USD' else currency
|
|
|
|
market_cap = metrics['market_cap'].iloc[0]
|
|
if pd.notna(market_cap):
|
|
market_cap = Decimal(str(market_cap))
|
|
if market_cap >= Decimal('1000000000'):
|
|
market_cap_str = f"{currency_symbol}{float(market_cap/Decimal('1000000000')):.2f}B"
|
|
else:
|
|
market_cap_str = f"{currency_symbol}{float(market_cap/Decimal('1000000')):.2f}M"
|
|
else:
|
|
market_cap_str = "N/A"
|
|
|
|
metrics_table.add_row("Market Cap", market_cap_str)
|
|
metrics_table.add_row(
|
|
"P/E Ratio",
|
|
f"{float(Decimal(str(metrics['pe_ratio'].iloc[0]))):.2f}" if pd.notna(metrics['pe_ratio'].iloc[0]) else "N/A"
|
|
)
|
|
metrics_table.add_row(
|
|
"EPS",
|
|
f"{currency_symbol}{float(Decimal(str(metrics['eps'].iloc[0]))):.2f}" if pd.notna(metrics['eps'].iloc[0]) else "N/A"
|
|
)
|
|
|
|
console.print(metrics_table)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error displaying financial metrics: {e}")
|
|
|
|
def display_price_graph(self, prices: pd.DataFrame, ticker: str):
|
|
if prices.empty:
|
|
return
|
|
|
|
try:
|
|
prices['close'] = prices['close'].apply(
|
|
lambda x: float(x) if isinstance(x, decimal.Decimal) else float(x)
|
|
)
|
|
close_prices = prices['close'].values
|
|
dates = prices['date'].dt.strftime('%Y-%m-%d').values
|
|
|
|
current_price = float(close_prices[-1])
|
|
initial_price = float(close_prices[0])
|
|
price_change = current_price - initial_price
|
|
price_change_pct = (price_change / initial_price) * 100 if initial_price != 0 else 0
|
|
|
|
# Create price evolution panel with rich formatting
|
|
price_panel = Panel(
|
|
Text.assemble(
|
|
("Current: ", self.styles['header']),
|
|
(f"{current_price:.2f}\n", self.styles['neutral']),
|
|
("Change: ", self.styles['header']),
|
|
(
|
|
f"{price_change:+.2f} ({price_change_pct:+.1f}%)",
|
|
self.styles['positive'] if price_change >= 0 else self.styles['negative']
|
|
)
|
|
),
|
|
title="Price Evolution",
|
|
box=box.ROUNDED
|
|
)
|
|
console.print(price_panel)
|
|
|
|
# Create ASCII price chart
|
|
height = 20
|
|
width = 60
|
|
|
|
max_price = float(np.nanmax(close_prices))
|
|
min_price = float(np.nanmin(close_prices))
|
|
price_range = max_price - min_price
|
|
|
|
if price_range == 0:
|
|
price_range = max_price * 0.1
|
|
|
|
padding = price_range * 0.1
|
|
max_price += padding
|
|
min_price = max(0, min_price - padding)
|
|
price_range = max_price - min_price
|
|
|
|
chart = ""
|
|
canvas = [[" " for _ in range(width)] for _ in range(height)]
|
|
|
|
# Plot price line
|
|
for idx in range(len(close_prices) - 1):
|
|
x1 = int(idx * (width - 1) / (len(close_prices) - 1))
|
|
x2 = int((idx + 1) * (width - 1) / (len(close_prices) - 1))
|
|
y1 = int((max_price - close_prices[idx]) * (height - 1) / price_range)
|
|
y2 = int((max_price - close_prices[idx + 1]) * (height - 1) / price_range)
|
|
|
|
if x1 != x2:
|
|
dx = x2 - x1
|
|
dy = y2 - y1
|
|
steps = max(abs(dx), abs(dy))
|
|
|
|
if steps > 0:
|
|
x_step = dx / steps
|
|
y_step = dy / steps
|
|
|
|
for step in range(steps + 1):
|
|
x = int(x1 + step * x_step)
|
|
y = int(y1 + step * y_step)
|
|
|
|
if 0 <= x < width and 0 <= y < height:
|
|
canvas[y][x] = "█"
|
|
|
|
# Add price labels and create chart string
|
|
price_labels = np.linspace(max_price, min_price, 5)
|
|
label_positions = np.linspace(0, height-1, 5).astype(int)
|
|
|
|
for i in range(height):
|
|
if i in label_positions:
|
|
label_idx = np.where(label_positions == i)[0][0]
|
|
chart += f"{price_labels[label_idx]:8.2f} {''.join(canvas[i])}\n"
|
|
else:
|
|
chart += " " * 10 + ''.join(canvas[i]) + "\n"
|
|
|
|
# Add date range
|
|
chart += "─" * (width + 10) + "\n"
|
|
chart += f"{dates[0]}{' ' * (width - len(dates[0]) - len(dates[-1]))}{dates[-1]}"
|
|
|
|
# Display chart in a panel
|
|
console.print(Panel(chart, box=box.ROUNDED))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating price graph: {e}")
|
|
console.print("[red]Error creating price graph[/red]")
|
|
|
|
def display_stock_prices(self, prices: pd.DataFrame):
|
|
if prices.empty:
|
|
return
|
|
|
|
try:
|
|
prices_table = Table(
|
|
title="Recent Stock Prices",
|
|
box=box.ROUNDED,
|
|
title_style=self.styles['header']
|
|
)
|
|
|
|
# Add columns
|
|
columns = {
|
|
'date': 'Date',
|
|
'open': 'Open',
|
|
'high': 'High',
|
|
'low': 'Low',
|
|
'close': 'Close',
|
|
'adj_close': 'Adj Close',
|
|
'volume': 'Volume'
|
|
}
|
|
|
|
for col_name in columns.values():
|
|
prices_table.add_column(col_name, style=self.styles['neutral'])
|
|
|
|
# Format and add rows
|
|
for _, row in prices.iterrows():
|
|
formatted_row = []
|
|
|
|
# Date formatting
|
|
formatted_row.append(pd.to_datetime(row['date']).strftime('%Y-%m-%d'))
|
|
|
|
# Price columns formatting
|
|
for col in ['open', 'high', 'low', 'close', 'adj_close']:
|
|
val = float(row[col]) if isinstance(row[col], decimal.Decimal) else row[col]
|
|
formatted_row.append(f"{val:.2f}")
|
|
|
|
# Volume formatting
|
|
volume = int(row['volume'])
|
|
formatted_row.append(f"{volume:,}")
|
|
|
|
prices_table.add_row(*formatted_row)
|
|
|
|
console.print(prices_table)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error displaying stock prices: {e}")
|
|
|
|
def display_news(self, news: pd.DataFrame):
|
|
if news.empty:
|
|
console.print(Panel("No news available", title="News", box=box.ROUNDED))
|
|
return
|
|
|
|
try:
|
|
news_table = Table(
|
|
title="Recent Company News",
|
|
box=box.ROUNDED,
|
|
show_header=True,
|
|
title_style=self.styles['header']
|
|
)
|
|
|
|
news_table.add_column("Date", style=self.styles['date'])
|
|
news_table.add_column("Type", style=self.styles['neutral'])
|
|
news_table.add_column("Title", style=self.styles['title'])
|
|
news_table.add_column("Preview", style=self.styles['neutral'])
|
|
|
|
for _, row in news.iterrows():
|
|
news_date = pd.to_datetime(row['published_at']).strftime('%Y-%m-%d %H:%M')
|
|
content_preview = row['content'][:100] + "..." if len(row['content']) > 100 else row['content']
|
|
|
|
news_table.add_row(
|
|
news_date,
|
|
row['news_type'],
|
|
row['title'],
|
|
content_preview
|
|
)
|
|
|
|
console.print(news_table)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error displaying news: {e}")
|
|
|
|
def display_ratings(self, ratings: pd.DataFrame):
|
|
if ratings.empty:
|
|
return
|
|
|
|
try:
|
|
ratings_table = Table(
|
|
title="Analyst Ratings",
|
|
box=box.ROUNDED,
|
|
show_header=True,
|
|
title_style=self.styles['header']
|
|
)
|
|
|
|
ratings_table.add_column("Date", style=self.styles['date'])
|
|
ratings_table.add_column("Analyst", style=self.styles['neutral'])
|
|
ratings_table.add_column("Prior Rating", style=self.styles['neutral'])
|
|
ratings_table.add_column("Current Rating", style=self.styles['neutral'])
|
|
ratings_table.add_column("Price Target", style=self.styles['neutral'])
|
|
|
|
for _, row in ratings.iterrows():
|
|
date = pd.to_datetime(row['date']).strftime('%Y-%m-%d')
|
|
price_target = f"{float(row['price_target']):.2f}" if pd.notna(row['price_target']) else "N/A"
|
|
|
|
ratings_table.add_row(
|
|
date,
|
|
row['analyst'],
|
|
str(row['prior_rating']),
|
|
str(row['current_rating']),
|
|
price_target
|
|
)
|
|
|
|
console.print(ratings_table)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error displaying ratings: {e}")
|
|
|
|
def display(self, ticker: Optional[str] = None) -> None:
|
|
if not ticker:
|
|
console.print("[red]Please provide a stock ticker[/red]")
|
|
return
|
|
|
|
ticker = ticker.upper()
|
|
console.rule(f"[yellow]Stock Analysis: {ticker}[/yellow]")
|
|
|
|
try:
|
|
# Display basic stock information
|
|
info = self.get_stock_info(ticker)
|
|
if not info.empty:
|
|
self.display_company_info(info)
|
|
|
|
prices = self.get_ytd_prices(ticker)
|
|
if not prices.empty:
|
|
self.display_price_graph(prices, ticker)
|
|
|
|
recent_prices = self.get_stock_prices(ticker, self.PRICE_HISTORY_DAYS)
|
|
if not recent_prices.empty:
|
|
self.display_stock_prices(recent_prices)
|
|
|
|
metrics = self.get_financial_metrics(ticker)
|
|
if not metrics.empty:
|
|
self.display_financial_metrics(metrics)
|
|
|
|
news = self.get_stock_news(ticker, self.NEWS_LIMIT)
|
|
if not news.empty:
|
|
self.display_news(news)
|
|
|
|
ratings = self.get_ratings(ticker)
|
|
if not ratings.empty:
|
|
self.display_ratings(ratings)
|
|
|
|
console.rule()
|
|
|
|
# Display insights menu
|
|
console.print("\nGet Deeper Insights:")
|
|
|
|
insights = [
|
|
"1. Historical: Price history and charts",
|
|
"2. Financials: Balance sheet, Income statement, Cash flow",
|
|
"3. Valuation: Company valuation metrics",
|
|
"4. Metrics: Key performance indicators",
|
|
"5. Technical: Technical analysis indicators",
|
|
"6. News: Company news and updates",
|
|
"7. Consensus: Analyst ratings and consensus",
|
|
"8. Transcripts: Earnings call transcripts"
|
|
]
|
|
|
|
for insight in insights:
|
|
console.print(insight)
|
|
console.print("\nEnter a number (1-8) to view detailed insights or press Enter to exit:")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error displaying stock information: {e}")
|
|
console.print("[red]Error displaying stock information[/red]")
|
|
|
|
def handle_insights_command(self, ticker: str, choice: str) -> bool:
|
|
"""Handle insights menu selection"""
|
|
try:
|
|
from modules.signals.stocks_insights import StocksInsights, StocksInsightsView
|
|
|
|
insight_num = int(choice)
|
|
if 1 <= insight_num <= 8:
|
|
insights = StocksInsights(self.db)
|
|
view = StocksInsightsView(insights)
|
|
view.handle_insight_request(ticker, insight_num)
|
|
return True
|
|
return False
|
|
except (ValueError, ImportError) as e:
|
|
logger.error(f"Error handling insights command: {e}")
|
|
return False
|