# RivaTerminal/modules/signals/stocks_view.py
#
# Repository listing metadata captured with this copy of the file:
# 554 lines, 21 KiB, Python

import pandas as pd
from tabulate import tabulate
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict
import numpy as np
import decimal
from decimal import Decimal
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.text import Text
from rich.box import DOUBLE, ROUNDED
from rich import box
from rich.rule import Rule
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
# Shared Rich console; the display code in this module prints through it.
console = Console()
class StocksView:
def __init__(self, db_connection):
self.db = db_connection
self.BOX_SIZES = {
'company': {'width': 60, 'content': 56},
'large': {'width': 80, 'content': 76}
}
self.NEWS_LIMIT = 5
self.PRICE_HISTORY_DAYS = 5
self.styles = {
'positive': 'green',
'negative': 'red',
'neutral': 'blue',
'header': 'yellow bold',
'date': 'cyan',
'title': 'magenta bold'
}
def get_stock_info(self, ticker: str) -> pd.DataFrame:
try:
query = """
SELECT
t.isin,
t.yf_ticker,
t.name,
t.description,
z.name as zone,
s.name as sector
FROM tickers t
LEFT JOIN zones z ON z.id = t.zone_id
LEFT JOIN sectors s ON s.id = t.sector_id
WHERE UPPER(t.yf_ticker) = UPPER(%s);
"""
return self.db.execute_query(query, (ticker,))
except Exception as e:
logger.error(f"Error fetching stock info: {e}")
return pd.DataFrame()
def get_ytd_prices(self, ticker: str) -> pd.DataFrame:
try:
query = """
SELECT
s.date,
s.open,
s.high,
s.low,
s.close,
s.adj_close,
s.volume
FROM stocks s
JOIN tickers t ON t.id = s.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND s.date >= DATE_TRUNC('year', CURRENT_DATE)
ORDER BY s.date ASC;
"""
df = self.db.execute_query(query, (ticker,))
if not df.empty:
df['date'] = pd.to_datetime(df['date'])
return df
except Exception as e:
logger.error(f"Error fetching YTD prices: {e}")
return pd.DataFrame()
def get_stock_prices(self, ticker: str, days: int = 5) -> pd.DataFrame:
try:
query = """
SELECT
s.date,
s.open,
s.high,
s.low,
s.close,
s.adj_close,
s.volume
FROM stocks s
JOIN tickers t ON t.id = s.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND s.date >= CURRENT_DATE - INTERVAL '%s days'
ORDER BY s.date DESC;
"""
df = self.db.execute_query(query, (ticker, days))
if not df.empty:
df['date'] = pd.to_datetime(df['date'])
return df
except Exception as e:
logger.error(f"Error fetching stock prices: {e}")
return pd.DataFrame()
def get_stock_news(self, ticker: str, limit: int = 5) -> pd.DataFrame:
try:
query = """
SELECT
sn.published_at,
sn.news_type,
sn.title,
sn.content
FROM stocknews sn
JOIN tickers t ON t.id = sn.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
ORDER BY sn.published_at DESC
LIMIT %s;
"""
df = self.db.execute_query(query, (ticker, limit))
if not df.empty:
df['published_at'] = pd.to_datetime(df['published_at'])
return df
except Exception as e:
logger.error(f"Error fetching news for {ticker}: {e}")
return pd.DataFrame()
def get_ratings(self, ticker: str) -> pd.DataFrame:
try:
query = """
WITH latest_ratings AS (
SELECT DISTINCT ON (analyst)
date,
analyst,
prior_rating,
current_rating,
price_target
FROM ratings r
JOIN tickers t ON t.id = r.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
ORDER BY analyst, date DESC
)
SELECT
date,
analyst,
prior_rating,
current_rating,
price_target
FROM latest_ratings
ORDER BY date DESC;
"""
return self.db.execute_query(query, (ticker,))
except Exception as e:
logger.error(f"Error fetching ratings: {e}")
return pd.DataFrame()
def get_financial_metrics(self, ticker: str) -> pd.DataFrame:
try:
query = """
WITH latest_metrics AS (
SELECT DISTINCT ON (metric_type)
ticker_id,
date,
metric_type,
data
FROM financial_metrics fm
JOIN tickers t ON t.id = fm.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND period = 'ttm'
ORDER BY metric_type, date DESC
)
SELECT
t.yf_ticker,
t.ccy as currency,
(SELECT data->>'marketCapTTM' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as market_cap,
(SELECT data->>'peRatioTTM' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as pe_ratio,
(SELECT data->>'netIncomePerShareTTM' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as eps,
(SELECT data->>'beta' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as beta,
(SELECT data->>'next_earnings_date' FROM latest_metrics WHERE metric_type = 'key_metrics')::date as next_earnings_date,
(SELECT data->>'dividendPerShareTTM' FROM latest_metrics WHERE metric_type = 'key_metrics')::numeric as dividend_per_share
FROM tickers t
WHERE UPPER(t.yf_ticker) = UPPER(%s)
"""
return self.db.execute_query(query, (ticker, ticker))
except Exception as e:
logger.error(f"Error fetching financial metrics: {e}")
return pd.DataFrame()
def display_company_info(self, info: pd.DataFrame):
if info.empty:
return
try:
company_panel = Panel(
Text.assemble(
("ISIN: ", self.styles['header']),
(f"{info['isin'].iloc[0]}\n", self.styles['neutral']),
("Ticker: ", self.styles['header']),
(f"{info['yf_ticker'].iloc[0]}\n", self.styles['neutral']),
("Name: ", self.styles['header']),
(f"{info['name'].iloc[0]}\n", self.styles['neutral']),
("Zone: ", self.styles['header']),
(f"{info['zone'].iloc[0]}\n", self.styles['neutral']),
("Sector: ", self.styles['header']),
(f"{info['sector'].iloc[0]}", self.styles['neutral'])
),
title="Company Information",
box=box.ROUNDED
)
console.print(company_panel)
if 'description' in info and not pd.isna(info['description'].iloc[0]):
desc_panel = Panel(
info['description'].iloc[0],
title="Description",
box=box.ROUNDED
)
console.print(desc_panel)
except Exception as e:
logger.error(f"Error displaying company info: {e}")
def display_financial_metrics(self, metrics: pd.DataFrame):
if metrics.empty:
return
try:
metrics_table = Table(
title="Key Financial Metrics",
box=box.ROUNDED,
show_header=False,
title_style=self.styles['header']
)
metrics_table.add_column("Metric", style=self.styles['neutral'])
metrics_table.add_column("Value", style=self.styles['neutral'])
currency = metrics['currency'].iloc[0] if pd.notna(metrics['currency'].iloc[0]) else 'USD'
currency_symbol = '$' if currency == 'USD' else currency
market_cap = metrics['market_cap'].iloc[0]
if pd.notna(market_cap):
market_cap = Decimal(str(market_cap))
if market_cap >= Decimal('1000000000'):
market_cap_str = f"{currency_symbol}{float(market_cap/Decimal('1000000000')):.2f}B"
else:
market_cap_str = f"{currency_symbol}{float(market_cap/Decimal('1000000')):.2f}M"
else:
market_cap_str = "N/A"
metrics_table.add_row("Market Cap", market_cap_str)
metrics_table.add_row(
"P/E Ratio",
f"{float(Decimal(str(metrics['pe_ratio'].iloc[0]))):.2f}" if pd.notna(metrics['pe_ratio'].iloc[0]) else "N/A"
)
metrics_table.add_row(
"EPS",
f"{currency_symbol}{float(Decimal(str(metrics['eps'].iloc[0]))):.2f}" if pd.notna(metrics['eps'].iloc[0]) else "N/A"
)
console.print(metrics_table)
except Exception as e:
logger.error(f"Error displaying financial metrics: {e}")
def display_price_graph(self, prices: pd.DataFrame, ticker: str):
if prices.empty:
return
try:
prices['close'] = prices['close'].apply(
lambda x: float(x) if isinstance(x, decimal.Decimal) else float(x)
)
close_prices = prices['close'].values
dates = prices['date'].dt.strftime('%Y-%m-%d').values
current_price = float(close_prices[-1])
initial_price = float(close_prices[0])
price_change = current_price - initial_price
price_change_pct = (price_change / initial_price) * 100 if initial_price != 0 else 0
# Create price evolution panel with rich formatting
price_panel = Panel(
Text.assemble(
("Current: ", self.styles['header']),
(f"{current_price:.2f}\n", self.styles['neutral']),
("Change: ", self.styles['header']),
(
f"{price_change:+.2f} ({price_change_pct:+.1f}%)",
self.styles['positive'] if price_change >= 0 else self.styles['negative']
)
),
title="Price Evolution",
box=box.ROUNDED
)
console.print(price_panel)
# Create ASCII price chart
height = 20
width = 60
max_price = float(np.nanmax(close_prices))
min_price = float(np.nanmin(close_prices))
price_range = max_price - min_price
if price_range == 0:
price_range = max_price * 0.1
padding = price_range * 0.1
max_price += padding
min_price = max(0, min_price - padding)
price_range = max_price - min_price
chart = ""
canvas = [[" " for _ in range(width)] for _ in range(height)]
# Plot price line
for idx in range(len(close_prices) - 1):
x1 = int(idx * (width - 1) / (len(close_prices) - 1))
x2 = int((idx + 1) * (width - 1) / (len(close_prices) - 1))
y1 = int((max_price - close_prices[idx]) * (height - 1) / price_range)
y2 = int((max_price - close_prices[idx + 1]) * (height - 1) / price_range)
if x1 != x2:
dx = x2 - x1
dy = y2 - y1
steps = max(abs(dx), abs(dy))
if steps > 0:
x_step = dx / steps
y_step = dy / steps
for step in range(steps + 1):
x = int(x1 + step * x_step)
y = int(y1 + step * y_step)
if 0 <= x < width and 0 <= y < height:
canvas[y][x] = ""
# Add price labels and create chart string
price_labels = np.linspace(max_price, min_price, 5)
label_positions = np.linspace(0, height-1, 5).astype(int)
for i in range(height):
if i in label_positions:
label_idx = np.where(label_positions == i)[0][0]
chart += f"{price_labels[label_idx]:8.2f} {''.join(canvas[i])}\n"
else:
chart += " " * 10 + ''.join(canvas[i]) + "\n"
# Add date range
chart += "" * (width + 10) + "\n"
chart += f"{dates[0]}{' ' * (width - len(dates[0]) - len(dates[-1]))}{dates[-1]}"
# Display chart in a panel
console.print(Panel(chart, box=box.ROUNDED))
except Exception as e:
logger.error(f"Error creating price graph: {e}")
console.print("[red]Error creating price graph[/red]")
def display_stock_prices(self, prices: pd.DataFrame):
if prices.empty:
return
try:
prices_table = Table(
title="Recent Stock Prices",
box=box.ROUNDED,
title_style=self.styles['header']
)
# Add columns
columns = {
'date': 'Date',
'open': 'Open',
'high': 'High',
'low': 'Low',
'close': 'Close',
'adj_close': 'Adj Close',
'volume': 'Volume'
}
for col_name in columns.values():
prices_table.add_column(col_name, style=self.styles['neutral'])
# Format and add rows
for _, row in prices.iterrows():
formatted_row = []
# Date formatting
formatted_row.append(pd.to_datetime(row['date']).strftime('%Y-%m-%d'))
# Price columns formatting
for col in ['open', 'high', 'low', 'close', 'adj_close']:
val = float(row[col]) if isinstance(row[col], decimal.Decimal) else row[col]
formatted_row.append(f"{val:.2f}")
# Volume formatting
volume = int(row['volume'])
formatted_row.append(f"{volume:,}")
prices_table.add_row(*formatted_row)
console.print(prices_table)
except Exception as e:
logger.error(f"Error displaying stock prices: {e}")
def display_news(self, news: pd.DataFrame):
if news.empty:
console.print(Panel("No news available", title="News", box=box.ROUNDED))
return
try:
news_table = Table(
title="Recent Company News",
box=box.ROUNDED,
show_header=True,
title_style=self.styles['header']
)
news_table.add_column("Date", style=self.styles['date'])
news_table.add_column("Type", style=self.styles['neutral'])
news_table.add_column("Title", style=self.styles['title'])
news_table.add_column("Preview", style=self.styles['neutral'])
for _, row in news.iterrows():
news_date = pd.to_datetime(row['published_at']).strftime('%Y-%m-%d %H:%M')
content_preview = row['content'][:100] + "..." if len(row['content']) > 100 else row['content']
news_table.add_row(
news_date,
row['news_type'],
row['title'],
content_preview
)
console.print(news_table)
except Exception as e:
logger.error(f"Error displaying news: {e}")
def display_ratings(self, ratings: pd.DataFrame):
if ratings.empty:
return
try:
ratings_table = Table(
title="Analyst Ratings",
box=box.ROUNDED,
show_header=True,
title_style=self.styles['header']
)
ratings_table.add_column("Date", style=self.styles['date'])
ratings_table.add_column("Analyst", style=self.styles['neutral'])
ratings_table.add_column("Prior Rating", style=self.styles['neutral'])
ratings_table.add_column("Current Rating", style=self.styles['neutral'])
ratings_table.add_column("Price Target", style=self.styles['neutral'])
for _, row in ratings.iterrows():
date = pd.to_datetime(row['date']).strftime('%Y-%m-%d')
price_target = f"{float(row['price_target']):.2f}" if pd.notna(row['price_target']) else "N/A"
ratings_table.add_row(
date,
row['analyst'],
str(row['prior_rating']),
str(row['current_rating']),
price_target
)
console.print(ratings_table)
except Exception as e:
logger.error(f"Error displaying ratings: {e}")
def display(self, ticker: Optional[str] = None) -> None:
if not ticker:
console.print("[red]Please provide a stock ticker[/red]")
return
ticker = ticker.upper()
console.rule(f"[yellow]Stock Analysis: {ticker}[/yellow]")
try:
# Display basic stock information
info = self.get_stock_info(ticker)
if not info.empty:
self.display_company_info(info)
prices = self.get_ytd_prices(ticker)
if not prices.empty:
self.display_price_graph(prices, ticker)
recent_prices = self.get_stock_prices(ticker, self.PRICE_HISTORY_DAYS)
if not recent_prices.empty:
self.display_stock_prices(recent_prices)
metrics = self.get_financial_metrics(ticker)
if not metrics.empty:
self.display_financial_metrics(metrics)
news = self.get_stock_news(ticker, self.NEWS_LIMIT)
if not news.empty:
self.display_news(news)
ratings = self.get_ratings(ticker)
if not ratings.empty:
self.display_ratings(ratings)
console.rule()
# Display insights menu
console.print("\nGet Deeper Insights:")
insights = [
"1. Historical: Price history and charts",
"2. Financials: Balance sheet, Income statement, Cash flow",
"3. Valuation: Company valuation metrics",
"4. Metrics: Key performance indicators",
"5. Technical: Technical analysis indicators",
"6. News: Company news and updates",
"7. Consensus: Analyst ratings and consensus",
"8. Transcripts: Earnings call transcripts"
]
for insight in insights:
console.print(insight)
console.print("\nEnter a number (1-8) to view detailed insights or press Enter to exit:")
except Exception as e:
logger.error(f"Error displaying stock information: {e}")
console.print("[red]Error displaying stock information[/red]")
def handle_insights_command(self, ticker: str, choice: str) -> bool:
"""Handle insights menu selection"""
try:
from modules.signals.stocks_insights import StocksInsights, StocksInsightsView
insight_num = int(choice)
if 1 <= insight_num <= 8:
insights = StocksInsights(self.db)
view = StocksInsightsView(insights)
view.handle_insight_request(ticker, insight_num)
return True
return False
except (ValueError, ImportError) as e:
logger.error(f"Error handling insights command: {e}")
return False