RivaTerminal/modules/signals/stocks_insights.py

1193 lines
53 KiB
Python

# modules/signals/stocks_insights.py
import pandas as pd
import numpy as np
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict
import decimal
from decimal import Decimal
import json
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.text import Text
from rich.box import ROUNDED
from rich import box
from rich.rule import Rule
logger = logging.getLogger(__name__)
console = Console()
class StocksInsights:
"""Handle database queries and data retrieval for stock insights"""
def __init__(self, db_connection):
self.db = db_connection
self.styles = {
'positive': 'green',
'negative': 'red',
'neutral': 'blue',
'header': 'yellow bold',
'date': 'cyan',
'title': 'magenta bold',
'subtitle': 'blue bold'
}
def get_historical_prices(self, ticker: str, period: str = '1Y') -> pd.DataFrame:
"""Get historical price data with customizable periods"""
try:
query = """
SELECT
s.date,
s.open,
s.high,
s.low,
s.close,
s.volume,
s.adj_close
FROM stocks s
JOIN tickers t ON t.id = s.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND s.date >= CASE %s
WHEN '1M' THEN CURRENT_DATE - INTERVAL '1 month'
WHEN '3M' THEN CURRENT_DATE - INTERVAL '3 months'
WHEN '6M' THEN CURRENT_DATE - INTERVAL '6 months'
WHEN '1Y' THEN CURRENT_DATE - INTERVAL '1 year'
WHEN '2Y' THEN CURRENT_DATE - INTERVAL '2 years'
WHEN '5Y' THEN CURRENT_DATE - INTERVAL '5 years'
ELSE CURRENT_DATE - INTERVAL '1 year'
END
ORDER BY s.date ASC;
"""
df = self.db.execute_query(query, (ticker, period))
if not df.empty:
df['date'] = pd.to_datetime(df['date'])
return df
except Exception as e:
logger.error(f"Error fetching historical prices: {e}")
return pd.DataFrame()
def get_financial_statements(self, ticker: str) -> pd.DataFrame:
"""Get financial statements data with both annual and quarterly data"""
try:
query = """
WITH annual_statements AS (
SELECT
fs.date,
fs.period,
fs.statement_type,
fs.data::text as data
FROM financial_statements fs
JOIN tickers t ON t.id = fs.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND fs.period = 'annual'
AND fs.date >= CURRENT_DATE - INTERVAL '3 years'
ORDER BY fs.date DESC, fs.statement_type
),
quarterly_statements AS (
SELECT
fs.date,
fs.period,
fs.statement_type,
fs.data::text as data
FROM financial_statements fs
JOIN tickers t ON t.id = fs.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND fs.period = 'quarterly'
AND fs.date >= CURRENT_DATE - INTERVAL '12 months'
ORDER BY fs.date DESC, fs.statement_type
LIMIT 4
)
SELECT
'annual' as report_type,
date,
period,
statement_type,
data
FROM annual_statements
UNION ALL
SELECT
'quarterly' as report_type,
date,
period,
statement_type,
data
FROM quarterly_statements
ORDER BY report_type, date DESC;
"""
df = self.db.execute_query(query, (ticker, ticker))
if not df.empty:
df['date'] = pd.to_datetime(df['date'])
return df
except Exception as e:
logger.error(f"Error fetching financial statements: {e}")
return pd.DataFrame()
def get_valuation_metrics(self, ticker: str) -> pd.DataFrame:
"""Get comprehensive valuation metrics"""
try:
query = """
WITH latest_metrics AS (
SELECT
fm.date,
fm.metric_type,
fm.data::text as data,
t.ccy as currency
FROM financial_metrics fm
JOIN tickers t ON t.id = fm.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND fm.metric_type IN ('key_metrics', 'ratios')
AND fm.date = (
SELECT MAX(date)
FROM financial_metrics fm2
WHERE fm2.ticker_id = fm.ticker_id
AND fm2.metric_type = fm.metric_type
)
)
SELECT * FROM latest_metrics;
"""
return self.db.execute_query(query, (ticker,))
except Exception as e:
logger.error(f"Error fetching valuation metrics: {e}")
return pd.DataFrame()
def get_company_metrics(self, ticker: str) -> pd.DataFrame:
"""Get comprehensive company metrics"""
try:
query = """
SELECT
fm.date,
fm.metric_type,
fm.data::text as data,
t.ccy as currency
FROM financial_metrics fm
JOIN tickers t ON t.id = fm.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND fm.metric_type IN ('key_metrics', 'financial_ratios', 'growth_metrics')
AND fm.date = (
SELECT MAX(date)
FROM financial_metrics fm2
WHERE fm2.ticker_id = fm.ticker_id
AND fm2.metric_type = fm.metric_type
)
ORDER BY fm.metric_type;
"""
return self.db.execute_query(query, (ticker,))
except Exception as e:
logger.error(f"Error fetching company metrics: {e}")
return pd.DataFrame()
def get_technical_indicators(self, ticker: str) -> pd.DataFrame:
"""Get technical analysis indicators"""
try:
query = """
WITH latest_date AS (
SELECT MAX(datetime) as max_date
FROM technical_indicators ti
JOIN tickers t ON t.id = ti.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
)
SELECT
ti.datetime,
UPPER(ti.indicator_type) as indicator_type,
ti.value,
ti.period
FROM technical_indicators ti
JOIN tickers t ON t.id = ti.ticker_id
CROSS JOIN latest_date ld
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND ti.datetime >= ld.max_date - INTERVAL '5 days'
ORDER BY ti.datetime DESC, ti.indicator_type;
"""
return self.db.execute_query(query, (ticker, ticker))
except Exception as e:
logger.error(f"Error fetching technical indicators: {e}")
return pd.DataFrame()
def get_news_analysis(self, ticker: str) -> pd.DataFrame:
"""Get recent news with sentiment analysis"""
try:
query = """
SELECT
sn.published_at,
sn.news_type,
sn.title,
sn.content,
sn.source,
COALESCE(sn.sentiment_score, 0) as sentiment_score
FROM stocknews sn
JOIN tickers t ON t.id = sn.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND sn.published_at >= CURRENT_DATE - INTERVAL '30 days'
ORDER BY sn.published_at DESC
LIMIT 10;
"""
df = self.db.execute_query(query, (ticker,))
if not df.empty:
df['published_at'] = pd.to_datetime(df['published_at'])
return df
except Exception as e:
logger.error(f"Error fetching news: {e}")
return pd.DataFrame()
def get_analyst_consensus(self, ticker: str) -> Dict[str, pd.DataFrame]:
"""Get detailed analyst consensus and recommendations"""
try:
ratings_query = """
WITH latest_ratings AS (
SELECT DISTINCT ON (analyst)
date,
analyst,
prior_rating,
current_rating,
price_target
FROM ratings r
JOIN tickers t ON t.id = r.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
ORDER BY analyst, date DESC
)
SELECT
date,
analyst,
prior_rating,
current_rating,
price_target
FROM latest_ratings
ORDER BY date DESC;
"""
consensus_query = """
WITH rating_counts AS (
SELECT
COUNT(*) FILTER (
WHERE LOWER(current_rating) SIMILAR TO '%(buy|outperform|overweight)%'
) as buy_count,
COUNT(*) FILTER (
WHERE LOWER(current_rating) SIMILAR TO '%(hold|neutral|perform)%'
) as hold_count,
COUNT(*) FILTER (
WHERE LOWER(current_rating) SIMILAR TO '%(sell|underperform|underweight)%'
) as sell_count,
COUNT(*) as total_count,
AVG(price_target) as avg_price_target,
MIN(price_target) as min_price_target,
MAX(price_target) as max_price_target
FROM ratings r
JOIN tickers t ON t.id = r.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND date >= CURRENT_DATE - INTERVAL '90 days'
)
SELECT * FROM rating_counts;
"""
return {
'ratings': self.db.execute_query(ratings_query, (ticker,)),
'consensus': self.db.execute_query(consensus_query, (ticker,))
}
except Exception as e:
logger.error(f"Error fetching analyst consensus: {e}")
return {'ratings': pd.DataFrame(), 'consensus': pd.DataFrame()}
def get_earnings_transcripts(self, ticker: str) -> pd.DataFrame:
"""Get earnings call transcripts"""
try:
query = """
SELECT
et.date,
et.quarter,
et.year,
et.content::text as content
FROM earnings_transcripts et
JOIN tickers t ON t.id = et.ticker_id
WHERE UPPER(t.yf_ticker) = UPPER(%s)
AND et.content IS NOT NULL
ORDER BY et.date DESC
LIMIT 4;
"""
df = self.db.execute_query(query, (ticker,))
if not df.empty:
df['date'] = pd.to_datetime(df['date'])
return df
except Exception as e:
logger.error(f"Error fetching earnings transcripts: {e}")
return pd.DataFrame()
class StocksInsightsView:
"""Handle display formatting for stock insights"""
def __init__(self, insights):
self.insights = insights
self.styles = insights.styles
def _format_number(self, number: float, prefix: str = '', decimals: int = 2) -> str:
"""Format numbers with appropriate scaling"""
try:
if pd.isna(number):
return "N/A"
number = float(number)
if abs(number) >= 1_000_000_000:
return f"{prefix}{number/1_000_000_000:.{decimals}f}B"
elif abs(number) >= 1_000_000:
return f"{prefix}{number/1_000_000:.{decimals}f}M"
elif abs(number) >= 1_000:
return f"{prefix}{number/1_000:.{decimals}f}K"
else:
return f"{prefix}{number:.{decimals}f}"
except (ValueError, TypeError):
return "N/A"
def _get_color_for_value(self, value: float, baseline: float = 0) -> str:
"""Determine color based on value comparison"""
try:
if pd.isna(value):
return self.styles['neutral']
value = float(value)
return self.styles['positive'] if value > baseline else self.styles['negative']
except (ValueError, TypeError):
return self.styles['neutral']
def display_historical_prices(self, ticker: str):
"""Display historical price analysis with periods"""
periods = ['1M', '3M', '6M', '1Y', '2Y', '5Y']
for period in periods:
data = self.insights.get_historical_prices(ticker, period)
if data.empty:
continue
try:
latest_price = float(data['close'].iloc[-1])
first_price = float(data['close'].iloc[0])
price_change = latest_price - first_price
price_change_pct = (price_change / first_price) * 100
panel = Panel(
Text.assemble(
("Period: ", self.styles['header']),
(f"{period}\n", self.styles['neutral']),
("Start Price: ", self.styles['header']),
(f"{first_price:.2f}\n", self.styles['neutral']),
("End Price: ", self.styles['header']),
(f"{latest_price:.2f}\n", self.styles['neutral']),
("Change: ", self.styles['header']),
(
f"{price_change:+.2f} ({price_change_pct:+.2f}%)",
self.styles['positive'] if price_change >= 0 else self.styles['negative']
)
),
title=f"Price History - {period}",
box=box.ROUNDED
)
console.print(panel)
console.print("\n")
except Exception as e:
logger.error(f"Error processing price data for period {period}: {e}")
continue
def _format_statement_item(self, item: str, statement_type: str = None) -> str:
"""Format statement items for better readability"""
# Income Statement items
income_items = {
'revenue': 'Revenue',
'grossprofit': 'Gross Profit',
'grossprofitratio': 'Gross Margin',
'operatingincome': 'Operating Income',
'operatingincomeratio': 'Operating Margin',
'ebitda': 'EBITDA',
'ebitdaratio': 'EBITDA Margin',
'netincome': 'Net Income',
'netincomeratio': 'Net Margin',
'eps': 'EPS (Basic)',
'epsdiluted': 'EPS (Diluted)',
'costofrevenue': 'Cost of Revenue',
'researchanddevelopmentexpenses': 'R&D Expenses',
'sellingandmarketingexpenses': 'Sales & Marketing',
'generalandadministrativeexpenses': 'G&A Expenses',
'sellinggeneralandadministrativeexpenses': 'SG&A Expenses',
'depreciationandamortization': 'Depreciation & Amortization',
'operatingexpenses': 'Operating Expenses',
'interestincome': 'Interest Income',
'interestexpense': 'Interest Expense',
'incomebeforetax': 'Income Before Tax',
'incometaxexpense': 'Income Tax Expense'
}
# Balance Sheet items
balance_items = {
'totalassets': 'Total Assets',
'totalliabilities': 'Total Liabilities',
'totalequity': 'Total Equity',
'cashandcashequivalents': 'Cash & Cash Equivalents',
'shortterminvestments': 'Short-term Investments',
'netreceivables': 'Accounts Receivable (Net)',
'inventory': 'Inventory',
'totalcurrentassets': 'Total Current Assets',
'propertyplantequipmentnet': 'Property, Plant & Equipment (Net)',
'goodwill': 'Goodwill',
'intangibleassets': 'Intangible Assets',
'totalnoncurrentassets': 'Total Non-current Assets',
'accountspayables': 'Accounts Payable',
'shorttermdebt': 'Short-term Debt',
'totalcurrentliabilities': 'Total Current Liabilities',
'longtermdebt': 'Long-term Debt',
'deferredrevenue': 'Deferred Revenue',
'totalnoncurrentliabilities': 'Total Non-current Liabilities',
'retainedearnings': 'Retained Earnings',
'commonstock': 'Common Stock',
'accumulatedothercomprehensiveincomeloss': 'Accumulated Other Comprehensive Income'
}
# Cash Flow Statement items with improved formatting
cashflow_items = {
'netincome': 'Net Income',
'depreciationandamortization': 'Depreciation & Amortization',
'deferredincometax': 'Deferred Income Tax',
'stockbasedcompensation': 'Stock-Based Compensation',
'othernoncashitems': 'Other Non-Cash Items',
'accountsreceivables': 'Changes in Accounts Receivable',
'accountspayables': 'Changes in Accounts Payable',
'inventory': 'Changes in Inventory',
'otherworkingcapital': 'Other Working Capital Changes',
'changeinworkingcapital': 'Net Working Capital Change',
'operatingcashflow': 'Operating Cash Flow',
'netcashprovidedbyoperatingactivities': 'Net Cash from Operations',
'capitalexpenditure': 'Capital Expenditure',
'investmentsinpropertyplantandequipment': 'Investments in PP&E',
'acquisitionsnet': 'Acquisitions (Net)',
'purchasesofinvestments': 'Purchase of Investments',
'salesmaturitiesofinvestments': 'Sale/Maturity of Investments',
'otherinvestingactivites': 'Other Investing Activities',
'netcashusedforinvestingactivites': 'Net Cash from Investing',
'debtrepayment': 'Debt Repayment',
'commonstockissued': 'Common Stock Issued',
'commonstockrepurchased': 'Stock Repurchased',
'dividendspaid': 'Dividends Paid',
'otherfinancingactivites': 'Other Financing Activities',
'netcashusedprovidedbyfinancingactivities': 'Net Cash from Financing',
'freecashflow': 'Free Cash Flow',
'netchangeincash': 'Net Change in Cash',
'cashatbeginningofperiod': 'Cash at Beginning of Period',
'cashatendofperiod': 'Cash at End of Period'
}
item_lower = item.lower()
# Choose appropriate mapping based on statement type
if statement_type == 'income':
mapping = income_items
elif statement_type == 'balance':
mapping = balance_items
elif statement_type == 'cash_flow':
mapping = cashflow_items
else:
mapping = {**income_items, **balance_items, **cashflow_items}
# Return mapped name if available
if item_lower in mapping:
return mapping[item_lower]
# Default formatting for items not in the mapping
words = item_lower.replace('_', ' ').split()
return ' '.join(word.capitalize() for word in words)
def _should_skip_item(self, item: str, values: List) -> bool:
"""Determine if an item should be skipped based on its values"""
# Technical fields to always skip
skip_fields = {'cik', 'date', 'link', 'period', 'symbol', 'finallink',
'fillingdate', 'accepteddate', 'calendaryear', 'reportedcurrency'}
item_lower = item.lower()
# Skip if it's a technical field
if item_lower in skip_fields:
return True
# Skip if all values are N/A, null, empty or zero
all_na = all(
pd.isna(val) or
str(val).strip().upper() == 'N/A' or
str(val).strip() == '' or
str(val).strip() == '0' or
str(val).strip() == '$0.00'
for val in values
)
return all_na
def display_financial_statements(self, ticker: str):
"""Display financial statements with both annual and quarterly views"""
data = self.insights.get_financial_statements(ticker)
if data.empty:
console.print("[red]No financial statements available[/red]")
return
# Split data into annual and quarterly
annual_data = data[data['report_type'] == 'annual']
quarterly_data = data[data['report_type'] == 'quarterly']
for stmt_type in ['income', 'balance', 'cash_flow']:
console.print(f"\n[{self.styles['header']}]{stmt_type.upper()} STATEMENT[/{self.styles['header']}]")
# Process Annual Data
annual_stmt = annual_data[annual_data['statement_type'] == stmt_type].sort_values('date', ascending=False)
if not annual_stmt.empty:
console.print(f"\n[{self.styles['subtitle']}]Annual View[/{self.styles['subtitle']}]")
annual_table = Table(
box=box.ROUNDED,
title_style=self.styles['header']
)
# Add columns for items and years
annual_table.add_column("Item", style=self.styles['neutral'])
years = sorted(annual_stmt['date'].dt.year.unique(), reverse=True)
for year in years:
annual_table.add_column(str(year), style=self.styles['neutral'])
try:
# Get first statement to establish base items
base_items = json.loads(annual_stmt.iloc[0]['data']) if isinstance(annual_stmt.iloc[0]['data'], str) else annual_stmt.iloc[0]['data']
for item in base_items.keys():
# Get all values for this item across years to check if should be skipped
item_values = []
for year in years:
year_data = annual_stmt[annual_stmt['date'].dt.year == year]
if not year_data.empty:
stmt_data = json.loads(year_data.iloc[0]['data']) if isinstance(year_data.iloc[0]['data'], str) else year_data.iloc[0]['data']
item_values.append(stmt_data.get(item))
# Skip if all values are N/A or item should be skipped
if self._should_skip_item(item, item_values):
continue
# Format item name
formatted_item = self._format_statement_item(item, stmt_type)
if not formatted_item:
continue
row_values = [formatted_item]
for year in years:
year_data = annual_stmt[annual_stmt['date'].dt.year == year]
if not year_data.empty:
stmt_data = json.loads(year_data.iloc[0]['data']) if isinstance(year_data.iloc[0]['data'], str) else year_data.iloc[0]['data']
value = stmt_data.get(item)
if pd.notna(value) and float(value) != 0:
# Format based on whether it's a ratio/percentage
if item.lower().endswith('ratio') or (isinstance(value, (float, int)) and abs(float(value)) < 100):
formatted_value = f"{float(value)*100:.1f}%" if float(value) < 1 else f"{float(value):.1f}%"
else:
formatted_value = self._format_number(value, '')
row_values.append(formatted_value)
else:
row_values.append('')
else:
row_values.append('')
annual_table.add_row(*row_values)
except Exception as e:
logger.error(f"Error processing annual statement data: {e}")
console.print(annual_table)
# Similar process for quarterly data
quarterly_stmt = quarterly_data[quarterly_data['statement_type'] == stmt_type].sort_values('date', ascending=False)
if not quarterly_stmt.empty:
console.print(f"\n[{self.styles['subtitle']}]Quarterly View[/{self.styles['subtitle']}]")
quarterly_table = Table(
box=box.ROUNDED,
title_style=self.styles['header']
)
quarterly_table.add_column("Item", style=self.styles['neutral'])
quarters = sorted(quarterly_stmt['date'].dt.strftime('%Y Q%q').unique(), reverse=True)
for quarter in quarters:
quarterly_table.add_column(quarter, style=self.styles['neutral'])
try:
base_items = json.loads(quarterly_stmt.iloc[0]['data']) if isinstance(quarterly_stmt.iloc[0]['data'], str) else quarterly_stmt.iloc[0]['data']
for item in base_items.keys():
# Get all values for this item across quarters to check if should be skipped
item_values = []
for quarter in quarters:
quarter_year = quarter.split()[0]
quarter_num = quarter.split()[1]
quarter_data = quarterly_stmt[
(quarterly_stmt['date'].dt.year.astype(str) == quarter_year) &
(quarterly_stmt['date'].dt.quarter.astype(str) == quarter_num[1])
]
if not quarter_data.empty:
stmt_data = json.loads(quarter_data.iloc[0]['data']) if isinstance(quarter_data.iloc[0]['data'], str) else quarter_data.iloc[0]['data']
item_values.append(stmt_data.get(item))
# Skip if all values are N/A or item should be skipped
if self._should_skip_item(item, item_values):
continue
# Format item name
formatted_item = self._format_statement_item(item, stmt_type)
if not formatted_item:
continue
row_values = [formatted_item]
for quarter in quarters:
quarter_year = quarter.split()[0]
quarter_num = quarter.split()[1]
quarter_data = quarterly_stmt[
(quarterly_stmt['date'].dt.year.astype(str) == quarter_year) &
(quarterly_stmt['date'].dt.quarter.astype(str) == quarter_num[1])
]
if not quarter_data.empty:
stmt_data = json.loads(quarter_data.iloc[0]['data']) if isinstance(quarter_data.iloc[0]['data'], str) else quarter_data.iloc[0]['data']
value = stmt_data.get(item)
if pd.notna(value) and float(value) != 0:
# Format based on whether it's a ratio/percentage
if item.lower().endswith('ratio') or (isinstance(value, (float, int)) and abs(float(value)) < 100):
formatted_value = f"{float(value)*100:.1f}%" if float(value) < 1 else f"{float(value):.1f}%"
else:
formatted_value = self._format_number(value, '')
row_values.append(formatted_value)
else:
row_values.append('')
else:
row_values.append('')
quarterly_table.add_row(*row_values)
except Exception as e:
logger.error(f"Error processing quarterly statement data: {e}")
console.print(quarterly_table)
console.print("\n")
def display_valuation_metrics(self, ticker: str):
"""Display valuation metrics and analysis"""
data = self.insights.get_valuation_metrics(ticker)
if data.empty:
console.print("[red]No valuation metrics available[/red]")
return
try:
# Create valuation metrics table
metrics_table = Table(
title="Valuation Metrics (TTM)",
box=box.ROUNDED,
title_style=self.styles['header']
)
metrics_table.add_column("Metric", style=self.styles['neutral'])
metrics_table.add_column("Value", style=self.styles['neutral'])
# Extract data from the DataFrame
key_metrics_data = None
ratios_data = None
for _, row in data.iterrows():
row_data = json.loads(row['data'])
if row['metric_type'] == 'key_metrics':
key_metrics_data = row_data
elif row['metric_type'] == 'ratios':
ratios_data = row_data
if not key_metrics_data or not ratios_data:
console.print("[red]Incomplete metrics data available[/red]")
return
# Define metrics to display with their sources
metrics_to_display = [
# Market Valuation
("Market Cap", "marketCapTTM", key_metrics_data, True, True),
("Enterprise Value", "enterpriseValueTTM", key_metrics_data, True, True),
# Multiples
("P/E Ratio", "peRatioTTM", ratios_data, False, False),
("EV/EBITDA", "enterpriseValueOverEBITDATTM", key_metrics_data, False, False),
("P/B Ratio", "priceBookValueRatioTTM", ratios_data, False, False),
("P/S Ratio", "priceToSalesRatioTTM", ratios_data, False, False),
("PEG Ratio", "pegRatioTTM", ratios_data, False, False),
# Returns
("ROE", "roeTTM", key_metrics_data, True, False),
("ROIC", "roicTTM", key_metrics_data, True, False),
("ROA", "returnOnAssetsTTM", ratios_data, True, False),
# Margins
("Gross Margin", "grossProfitMarginTTM", ratios_data, True, False),
("Operating Margin", "operatingProfitMarginTTM", ratios_data, True, False),
("Net Profit Margin", "netProfitMarginTTM", ratios_data, True, False),
# Financial Health
("Current Ratio", "currentRatioTTM", ratios_data, False, False),
("Debt/Equity", "debtEquityRatioTTM", ratios_data, False, False),
("Interest Coverage", "interestCoverageTTM", ratios_data, False, False),
# Dividend
("Dividend Yield", "dividendYieldPercentageTTM", key_metrics_data, True, False),
("Payout Ratio", "payoutRatioTTM", ratios_data, True, False)
]
for metric_name, key, data_source, is_percentage, is_currency in metrics_to_display:
try:
value = data_source.get(key)
if pd.notna(value) and value not in [0, "0", "0.0"]:
value = float(value)
note = ""
# Format based on type
if is_currency and abs(value) >= 1_000_000_000:
formatted_value = f"{value/1_000_000_000:.2f}B"
elif is_currency and abs(value) >= 1_000_000:
formatted_value = f"{value/1_000_000:.2f}M"
elif is_percentage:
formatted_value = f"{value*100:.2f}%" if abs(value) < 1 else f"{value:.2f}%"
else:
formatted_value = f"{value:.2f}"
# Add notes for certain metrics
if key == "peRatioTTM" and value < 0:
note = "Negative earnings"
elif key == "interestCoverageTTM" and value < 1:
note = "Below safe level"
elif key == "currentRatioTTM" and value < 1:
note = "Below safe level"
metrics_table.add_row(
metric_name,
formatted_value
)
except (ValueError, TypeError) as e:
continue
console.print(metrics_table)
console.print("\n")
except Exception as e:
logger.error(f"Error displaying valuation metrics: {e}")
console.print("[red]Error displaying valuation metrics[/red]")
def format_metric_name(self, metric: str) -> str:
"""Format metric names with proper financial terminology"""
metrics_mapping = {
# Returns and Profitability
'roettm': 'ROE',
'roicttm': 'ROIC',
'returnontangibleassetsttm': 'Return on Tangible Assets',
# Valuation Ratios
'peratiottm': 'P/E Ratio',
'pbratiottm': 'P/B Ratio',
'ptbratiottm': 'Price to Book',
'evtosalesttm': 'EV/Sales',
'evtofreecashflowttm': 'EV/FCF',
'evtooperatingcashflowttm': 'EV/Operating CF',
'enterprisevalueoverebitdattm': 'EV/EBITDA',
'pricetosalesratiottm': 'Price to Sales',
# Market Metrics
'marketcapttm': 'Market Cap',
'enterprisevaluettm': 'Enterprise Value',
'grahamnetnetttm': 'Graham Net-Net',
'grahamnumberttm': 'Graham Number',
# Per Share Metrics
'cashpersharettm': 'Cash per Share',
'capexpersharettm': 'CapEx per Share',
'revenuepersharettm': 'Revenue per Share',
'bookvaluepersharettm': 'Book Value per Share',
'netincomepersharettm': 'Net Income per Share',
'freecashflowpersharettm': 'FCF per Share',
'interestdebtpersharettm': 'Interest Debt per Share',
'operatingcashflowpersharettm': 'Operating CF per Share',
'tangiblebookvaluepersharettm': 'Tangible Book Value per Share',
'shareholdersequitypersharettm': "Shareholders' Equity per Share",
# Efficiency Ratios
'currentratiottm': 'Current Ratio',
'payablesturnoverttm': 'Payables Turnover',
'inventoryturnoverttm': 'Inventory Turnover',
'receivablesturnoverttm': 'Receivables Turnover',
# Debt Ratios
'debttoassetsttm': 'Debt to Assets',
'debttoequityttm': 'Debt to Equity',
'debttomarketcapttm': 'Debt to Market Cap',
'netdebttoebitdattm': 'Net Debt to EBITDA',
# Working Capital
'workingcapitalttm': 'Working Capital',
'averagepayablesttm': 'Average Payables',
'averageinventoryttm': 'Average Inventory',
'averagereceivablesttm': 'Average Receivables',
'netcurrentassetvaluettm': 'Net Current Asset Value',
# Time Metrics
'dayssalesoutstandingttm': 'Days Sales Outstanding',
'daysofinventoryonhandttm': 'Days Inventory on Hand',
'dayspayablesoutstandingttm': 'Days Payables Outstanding',
# Dividend Metrics
'dividendyieldttm': 'Dividend Yield',
'dividendpersharettm': 'Dividend per Share',
'payoutratiottm': 'Payout Ratio',
'dividendyieldpercentagettm': 'Dividend Yield %',
# Operating Metrics
'earningsyieldttm': 'Earnings Yield',
'incomequalityttm': 'Income Quality',
'capextorevenuettm': 'CapEx to Revenue',
'investedcapitalttm': 'Invested Capital',
'interestcoveragettm': 'Interest Coverage',
'tangibleassetvaluettm': 'Tangible Asset Value',
'capextodepreciationttm': 'CapEx to Depreciation',
'capextooperatingcashflowttm': 'CapEx to Operating CF',
'intangiblestototalassetsttm': 'Intangibles to Total Assets',
'stockbasedcompensationtorevenuettm': 'Stock Comp to Revenue',
'researchanddevelopementtorevenuettm': 'R&D to Revenue',
'salesgeneralandadministrativetorevenuettm': 'SG&A to Revenue'
}
# Remove 'ttm' suffix and look up in mapping
metric_base = metric.lower()
if metric_base in metrics_mapping:
return metrics_mapping[metric_base]
# Fall back to basic formatting if not in mapping
return metric.replace('ttm', '').replace('_', ' ').title()
def display_company_metrics(self, ticker: str):
"""Display comprehensive company metrics"""
data = self.insights.get_company_metrics(ticker)
if data.empty:
console.print("[red]No company metrics available[/red]")
return
try:
for metric_type in ['key_metrics', 'financial_ratios', 'growth_metrics']:
metrics_data = data[data['metric_type'] == metric_type]
if metrics_data.empty:
continue
table = Table(
title=f"Metrics (TTM)", # Include TTM in title only
box=box.ROUNDED,
title_style=self.styles['header']
)
table.add_column("Metric", style=self.styles['neutral'])
table.add_column("Value", style=self.styles['neutral'])
try:
row_data = json.loads(metrics_data['data'].iloc[0]) if isinstance(metrics_data['data'].iloc[0], str) else metrics_data['data'].iloc[0]
formatted_rows = []
for key, value in row_data.items():
if pd.notna(value):
try:
# Format the metric name
formatted_name = self.format_metric_name(key)
if not formatted_name:
continue
# Format the value based on type
if isinstance(value, (int, float)):
if abs(value) < 1 and key.lower().endswith(('ratio', 'yield', 'percentage')):
formatted_value = f"{float(value)*100:.2f}%"
elif abs(value) > 1_000_000_000:
formatted_value = f"{value/1_000_000_000:.2f}B"
elif abs(value) > 1_000_000:
formatted_value = f"{value/1_000_000:.2f}M"
elif 'ratio' in key.lower() or 'percentage' in key.lower():
formatted_value = f"{float(value):.2f}"
else:
formatted_value = f"{float(value):.2f}"
else:
formatted_value = str(value)
formatted_rows.append((formatted_name, formatted_value))
except (ValueError, TypeError):
continue
# Sort rows alphabetically by metric name
formatted_rows.sort(key=lambda x: x[0])
# Add rows to table
for name, value in formatted_rows:
table.add_row(name, value)
except Exception as e:
logger.error(f"Error processing metric data: {e}")
continue
console.print(table)
console.print("\n")
except Exception as e:
logger.error(f"Error displaying company metrics: {e}")
console.print("[red]Error displaying company metrics[/red]")
def display_technical_indicators(self, ticker: str):
"""Display technical analysis indicators"""
data = self.insights.get_technical_indicators(ticker)
if data.empty:
console.print("[red]No technical indicators available[/red]")
return
try:
# Group by indicator type
for indicator_type in ['SMA', 'EMA', 'RSI', 'MACD', 'BB_UPPER', 'BB_LOWER', 'ADX']:
indicator_data = data[data['indicator_type'] == indicator_type]
if indicator_data.empty:
continue
table = Table(
title=f"{indicator_type} Analysis",
box=box.ROUNDED,
title_style=self.styles['header']
)
table.add_column("Date", style=self.styles['date'])
table.add_column("Value", style=self.styles['neutral'])
table.add_column("Period", style=self.styles['neutral'])
table.add_column("Signal", style=self.styles['neutral'])
for _, row in indicator_data.iterrows():
value = float(row['value'])
signal = "Neutral"
signal_style = self.styles['neutral']
# Signal logic for different indicators
if indicator_type == 'RSI':
if value > 70:
signal = "Overbought"
signal_style = self.styles['negative']
elif value < 30:
signal = "Oversold"
signal_style = self.styles['positive']
elif indicator_type == 'MACD':
if value > 0:
signal = "Bullish"
signal_style = self.styles['positive']
else:
signal = "Bearish"
signal_style = self.styles['negative']
elif indicator_type == 'ADX':
if value > 25:
signal = "Strong Trend"
signal_style = self.styles['positive']
else:
signal = "Weak Trend"
signal_style = self.styles['neutral']
table.add_row(
row['datetime'].strftime('%Y-%m-%d'),
f"{value:.4f}",
str(row['period']),
Text(signal, style=signal_style)
)
console.print(table)
console.print("\n")
except Exception as e:
logger.error(f"Error displaying technical indicators: {e}")
console.print("[red]Error displaying technical indicators[/red]")
def display_news_analysis(self, ticker: str):
"""Display news with sentiment analysis"""
data = self.insights.get_news_analysis(ticker)
if data.empty:
console.print("[red]No news available[/red]")
return
try:
news_table = Table(
title="Recent News Analysis",
box=box.ROUNDED,
title_style=self.styles['header']
)
news_table.add_column("Date", style=self.styles['date'], width=16)
news_table.add_column("Title", style=self.styles['title'], width=120)
news_table.add_column("Type", style=self.styles['neutral'], width=10)
for _, row in data.iterrows():
title_truncated = row['title'][:117] + "..." if len(row['title']) > 117 else row['title']
news_table.add_row(
row['published_at'].strftime('%Y-%m-%d %H:%M'),
title_truncated,
row['news_type'].title()
)
console.print(news_table)
console.print("\n")
except Exception as e:
logger.error(f"Error displaying news analysis: {e}")
console.print("[red]Error displaying news[/red]")
def display_analyst_consensus(self, ticker: str):
"""Display analyst consensus and recommendations"""
data = self.insights.get_analyst_consensus(ticker)
if data['consensus'].empty and data['ratings'].empty:
console.print("[red]No analyst consensus available[/red]")
return
try:
# Display consensus summary if available
if not data['consensus'].empty:
consensus = data['consensus'].iloc[0]
summary_panel = Panel(
Text.assemble(
("Coverage: ", self.styles['header']),
(f"{int(consensus['total_count'])} analysts\n\n", self.styles['neutral']),
("Distribution:\n", self.styles['header']),
("Buy/Outperform: ", self.styles['header']),
(f"{int(consensus['buy_count'])} analysts\n", self.styles['positive']),
("Hold/Neutral: ", self.styles['header']),
(f"{int(consensus['hold_count'])} analysts\n", self.styles['neutral']),
("Sell/Underperform: ", self.styles['header']),
(f"{int(consensus['sell_count'])} analysts\n\n", self.styles['negative']),
("Price Target Range: ", self.styles['header']),
(f"{float(consensus['min_price_target']):.2f} - {float(consensus['max_price_target']):.2f}\n", self.styles['neutral']),
("Average Target: ", self.styles['header']),
(f"{float(consensus['avg_price_target']):.2f}", self.styles['neutral'])
),
title="Analyst Consensus Summary",
box=box.ROUNDED
)
console.print(summary_panel)
console.print("\n")
# Display detailed ratings
if not data['ratings'].empty:
ratings_table = Table(
title="Recent Analyst Ratings",
box=box.ROUNDED,
title_style=self.styles['header']
)
ratings_table.add_column("Date", style=self.styles['date'])
ratings_table.add_column("Analyst", style=self.styles['neutral'])
ratings_table.add_column("Rating Change", style=self.styles['neutral'])
ratings_table.add_column("Current Rating", style=self.styles['neutral'])
ratings_table.add_column("Price Target", style=self.styles['neutral'])
for _, row in data['ratings'].iterrows():
try:
# Determine rating change direction and color
if row['prior_rating'] == row['current_rating']:
change_color = self.styles['neutral']
rating_change = ""
else:
current_rating_lower = str(row['current_rating']).lower()
if 'buy' in current_rating_lower or 'outperform' in current_rating_lower:
change_color = self.styles['positive']
rating_change = ""
elif 'sell' in current_rating_lower or 'underperform' in current_rating_lower:
change_color = self.styles['negative']
rating_change = ""
else:
change_color = self.styles['neutral']
rating_change = ""
ratings_table.add_row(
pd.to_datetime(row['date']).strftime('%Y-%m-%d'),
str(row['analyst']),
Text(f"{row['prior_rating']} {rating_change} {row['current_rating']}", style=change_color),
str(row['current_rating']),
f"{float(row['price_target']):.2f}" if pd.notna(row['price_target']) else "N/A"
)
except (ValueError, TypeError):
continue
console.print(ratings_table)
console.print("\n")
except Exception as e:
logger.error(f"Error displaying analyst consensus: {e}")
console.print("[red]Error displaying analyst consensus[/red]")
def display_earnings_transcripts(self, ticker: str):
"""Display earnings call transcripts analysis"""
data = self.insights.get_earnings_transcripts(ticker)
if data.empty:
console.print("[red]No earnings transcripts available[/red]")
return
try:
for _, transcript in data.iterrows():
# Create header panel for each transcript
header_panel = Panel(
Text.assemble(
("Quarter: ", self.styles['header']),
(f"Q{transcript['quarter']} {transcript['year']}\n", self.styles['neutral']),
("Date: ", self.styles['header']),
(transcript['date'].strftime('%Y-%m-%d'), self.styles['neutral'])
),
title="Earnings Call Transcript",
box=box.ROUNDED
)
console.print(header_panel)
# Display content preview
if pd.notna(transcript['content']):
content = transcript['content']
preview_length = 5500 # Show first 500 characters
preview = content[:preview_length] + "..." if len(content) > preview_length else content
content_panel = Panel(
preview,
title="Transcript Preview",
box=box.ROUNDED
)
console.print(content_panel)
console.print("\n")
except Exception as e:
logger.error(f"Error displaying earnings transcripts: {e}")
console.print("[red]Error displaying earnings transcripts[/red]")
def handle_insight_request(self, ticker: str, insight_type: int):
"""Handle insight request and display appropriate visualization"""
console.rule(f"[yellow]Detailed Analysis: {ticker}[/yellow]")
try:
if insight_type == 1:
self.display_historical_prices(ticker)
elif insight_type == 2:
self.display_financial_statements(ticker)
elif insight_type == 3:
self.display_valuation_metrics(ticker)
elif insight_type == 4:
self.display_company_metrics(ticker)
elif insight_type == 5:
self.display_technical_indicators(ticker)
elif insight_type == 6:
self.display_news_analysis(ticker)
elif insight_type == 7:
self.display_analyst_consensus(ticker)
elif insight_type == 8:
self.display_earnings_transcripts(ticker)
else:
console.print("[red]Invalid insight type selected[/red]")
console.print("\nPress Enter to return to ticker prompt...")
input()
except Exception as e:
logger.error(f"Error displaying insight type {insight_type}: {e}")
console.print(f"[red]Error displaying analysis: {str(e)}[/red]")