RivaTerminal/rivaterminalbu

641 lines
26 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import logging
import readline
from dotenv import load_dotenv
import time
import pandas as pd
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
from rich.text import Text
from rich import box
from rich.table import Table
from rich.columns import Columns
from typing import Optional, List
project_root = os.path.dirname(os.path.abspath(__file__))
sys.path.append(project_root)
from modules.database import DatabaseConnection
from modules.signals.markets_view import MarketsView
from modules.signals.sectors_view import SectorsView
from modules.signals.commodities_view import CommoditiesView
from modules.signals.forex_view import ForexView
from modules.signals.stocks_view import StocksView
from modules.signals.whispers import Whispers
load_dotenv()
# Module-wide logging: WARNING and above are appended to 'rivaterminal.log'
# and also echoed on stdout, using the bare message as the record format.
logging.basicConfig(
    handlers=[
        logging.FileHandler('rivaterminal.log', mode='a'),
        logging.StreamHandler(sys.stdout),
    ],
    format='%(message)s',
    level=logging.WARNING,
)

# Logger named after this module, and the rich console used by the
# module-level code and by RivaTerminal for its output.
logger = logging.getLogger(__name__)
console = Console()
# ASCII-art banner; Dashboard stores it as `self.logo` and prints it in cyan
# at the top of the welcome screen.
# NOTE(review): the runs of spaces inside the art look collapsed (rows do not
# line up) -- confirm against the original asset before relying on the layout.
LOGO = """
*
*****
** ****
* * ** *** *
** ** **** *
*** ** * *** *
** ** ******* **** *
* ** ** * **** **
** ******* *** ******
* ** *** ** ** ***** *
* *** ******** ***** *
* ** *********** *
** *** **** # ********
*** *** ** ********
**** *** ********** *
** ** *** ** *****
*** **+** * ***# *** **
***** ***+* ******** *****
#* *** *** **++**
* *** ** ** **++** ** ******* *** ** ** ** *** ******** ***
**** #*********** *** *+******** *** *+* *+* **** *+* *********+** ****
***++**** ** ***** *+** **+* *** **+ *** ***** *+* **+* ***+* ****
#***++***** *+**** *+** *+* *** +* #** ** *** *+* **+* **+* ****
**** *** *+** **** *** *** #*** **** *** *** **+* **** ****
******** ** **+* *+******* *** *** *** **+* *** *+* **+* **** ****
************ ***** *+****+ *** *** *** *+*******++* *+* **+* **** ****
** *+** **** *** *+ +* ************** *+* **+* **+* ****
****+****** *+** **** *** **+** *** **** *** **+* ***+* ****
**** ** *+** **** *** *+* *+* *+* *++++++* **++++++++* ****
*** *
"""
class Dashboard:
    """Render the welcome screen: logo, market overview, news panels, help.

    All data is read through the ``db_connection`` object passed to the
    constructor; the only method called on it is ``execute_query(sql)``,
    whose result is treated as a pandas DataFrame.
    """

    # A title longer than _TITLE_LIMIT characters is cut to its first
    # _TITLE_KEEP characters followed by "...".
    _TITLE_LIMIT = 152
    _TITLE_KEEP = 150

    def __init__(self, db_connection):
        """Store the database wrapper and create a dedicated rich console."""
        self.db = db_connection
        self.console = Console()
        self.logo = LOGO

    def display_commands(self):
        """Print the table of available commands with a short description."""
        table = Table(title="Available Commands", box=box.HEAVY_EDGE)
        table.add_column("Command", style="cyan")
        table.add_column("Description", style="white")
        # NOTE(review): "news" is advertised here, but no "news" handler is
        # registered in RivaTerminal._init_components in this file -- confirm
        # whether the handler lives elsewhere or the entry is stale.
        commands = [
            ("main", "Return to main screen"),
            ("marketsview", "View market indices and performance"),
            ("sectorsview", "View sector performance and analysis"),
            ("commoditiesview", "View commodities markets"),
            ("forexview", "View forex markets"),
            ("stocksview", "View stock analysis and data"),
            ("whispers", "View market whispers and insights"),
            ("news", "View market news [-d days] [-s source] [-t ticker]"),
            ("portfolio", "Portfolio management (coming soon)"),
            ("pricing", "Options pricing tools (coming soon)"),
            ("reports", "Financial reports (coming soon)"),
            ("slm", "SLM Reports (coming soon)"),
            ("talk", "Revaldi LLM (coming soon)"),
            ("help", "Display this help message"),
            ("quit", "Exit the terminal"),
        ]
        for command, description in commands:
            table.add_row(command, description)
        self.console.print(table)

    def get_market_overview(self):
        """Query the latest price and day-over-day change of the key indices.

        Returns:
            Whatever ``self.db.execute_query`` returns for the query (columns
            ``row_num``, ``index_name``, ``last_price``, ``price_change_pct``),
            or an empty DataFrame when the query raises.
        """
        query = """
            WITH market_symbols AS (
                SELECT DISTINCT ON (m.symbol)
                    m.symbol,
                    mp.date,
                    mp.close as last_price,
                    ROUND(((mp.close - LAG(mp.close) OVER (PARTITION BY m.id ORDER BY mp.date)) /
                        NULLIF(LAG(mp.close) OVER (PARTITION BY m.id ORDER BY mp.date), 0) * 100)::numeric, 2) as price_change_pct
                FROM market_indexes_ref m
                JOIN market_indexes_prices mp ON mp.index_id = m.id
                WHERE m.is_active = true
                AND m.symbol IN (
                    '^SPX', -- S&P 500
                    '^NDX', -- NASDAQ 100
                    '^STOXX50E', -- EuroStoxx50
                    '^N300', -- Nikkei
                    '^HSI', -- Hang Seng
                    'MSCIWORLD', -- MSCI World
                    'EUR=X' -- USD/EUR
                )
                AND mp.date >= CURRENT_DATE - INTERVAL '7 days'
                ORDER BY m.symbol, mp.date DESC
            )
            SELECT
                ROW_NUMBER() OVER (ORDER BY
                    CASE
                        WHEN symbol = '^SPX' THEN 1
                        WHEN symbol = '^NDX' THEN 2
                        WHEN symbol = '^STOXX50E' THEN 3
                        WHEN symbol = '^N300' THEN 4
                        WHEN symbol = '^HSI' THEN 5
                        WHEN symbol = 'MSCIWORLD' THEN 6
                        WHEN symbol = 'EUR=X' THEN 7
                    END
                ) - 1 as row_num,
                CASE
                    WHEN symbol = '^SPX' THEN 'S&P500'
                    WHEN symbol = '^NDX' THEN 'Nasdaq'
                    WHEN symbol = '^STOXX50E' THEN 'EuroStoxx50'
                    WHEN symbol = '^N300' THEN 'Nikkei'
                    WHEN symbol = '^HSI' THEN 'HangSeng'
                    WHEN symbol = 'MSCIWORLD' THEN 'MSCI World'
                    WHEN symbol = 'EUR=X' THEN 'USD/EUR'
                END as index_name,
                last_price::numeric as last_price,
                price_change_pct::numeric as price_change_pct
            FROM market_symbols;
        """
        try:
            return self.db.execute_query(query)
        except Exception as e:
            logger.error(f"Market data query failed: {e}")
            return pd.DataFrame()

    @staticmethod
    def _format_change(change) -> tuple:
        """Return ``(text, row_style)`` for a percentage change.

        The query yields NULL for the change when there is no previous close
        in the window (LAG) or when the previous close is 0 (NULLIF); such
        values are shown as "n/a" in a neutral style instead of raising.
        """
        if change is None or pd.isna(change):
            return "n/a", "white"
        return f"{change:+.2f}%", ("red" if change < 0 else "green")

    @staticmethod
    def _format_price(index_name, price) -> str:
        """Format a last price: 4 decimals for USD/EUR, thousands + 2 otherwise."""
        if price is None or pd.isna(price):
            return "n/a"
        if index_name == 'USD/EUR':
            return f"{price:.4f}"
        return f"{price:,.2f}"

    def get_market_panel(self):
        """Generate market overview panel content.

        Returns a rich ``Table`` with one row per index (ordered by
        ``row_num``), or a ``Text`` placeholder when no data is available.
        """
        market_data = self.get_market_overview()
        if market_data is None or market_data.empty:
            return Text("No market data available")

        table = Table(
            show_header=True,
            header_style="white",
            box=box.MINIMAL_DOUBLE_HEAD,
            border_style="white",
            padding=(0, 2),
        )
        table.add_column("Index", style="white", width=25, justify="left")
        table.add_column("Last Price", style="white", justify="right", width=15)
        table.add_column("Change", style="white", justify="right", width=12)

        market_data = market_data.sort_values(by='row_num')
        for _, row in market_data.iterrows():
            change_text, row_style = self._format_change(row['price_change_pct'])
            table.add_row(
                str(row['index_name']),
                self._format_price(row['index_name'], row['last_price']),
                change_text,
                style=row_style,
            )
        return table

    def get_eco_news(self):
        """Get the latest economic news for dashboard display.

        Returns the query result (up to 8 rows from the last 2 days, newest
        first) or an empty DataFrame when the query raises.
        """
        query = """
            SELECT
                'ECO' as news_type,
                title,
                source,
                published_at
            FROM econews
            WHERE published_at >= CURRENT_DATE - INTERVAL '2 days'
            ORDER BY published_at DESC
            LIMIT 8;
        """
        try:
            return self.db.execute_query(query)
        except Exception as e:
            logger.error(f"Economic news query failed: {e}")
            return pd.DataFrame()

    def get_sdg_news(self):
        """Get the latest SDG news for dashboard display.

        Returns the query result (up to 8 rows from the last 2 days, newest
        first) or an empty DataFrame when the query raises.
        """
        query = """
            SELECT
                'SDG' as news_type,
                title,
                category as source,
                published_at
            FROM news
            WHERE published_at >= CURRENT_DATE - INTERVAL '2 days'
            ORDER BY published_at DESC
            LIMIT 8;
        """
        try:
            return self.db.execute_query(query)
        except Exception as e:
            logger.error(f"SDG news query failed: {e}")
            return pd.DataFrame()

    @classmethod
    def _truncate_title(cls, title) -> str:
        """Return the title as text, shortened with "..." when too long.

        A missing title (None/NaN) becomes an empty string instead of raising.
        """
        if title is None or pd.isna(title):
            return ""
        text = str(title)
        if len(text) > cls._TITLE_LIMIT:
            text = text[:cls._TITLE_KEEP] + "..."
        return text

    @staticmethod
    def _format_time(published_at) -> str:
        """Return the HH:MM part of a timestamp, or "--:--" when it is missing."""
        if published_at is None or pd.isna(published_at):
            return "--:--"
        return pd.to_datetime(published_at).strftime('%H:%M')

    def _build_news_panel(self, news_data, tag, tag_style, icon, empty_message):
        """Build the single-column news table shared by the ECO and SDG panels.

        Each row is assembled as a ``Text`` object rather than a markup
        string, so square brackets inside a headline are displayed literally
        instead of being parsed as rich markup tags.
        """
        if news_data is None or news_data.empty:
            return Text(empty_message)

        table = Table(
            show_header=False,
            box=box.MINIMAL_DOUBLE_HEAD,
            border_style="white",
            width=155,
            padding=(0, 1),
        )
        table.add_column(width=65, overflow="fold")
        for _, row in news_data.iterrows():
            line = Text()
            line.append(self._format_time(row['published_at']), style="dim")
            line.append(" ")
            line.append(tag, style=tag_style)
            line.append(f" {icon} {self._truncate_title(row['title'])}")
            table.add_row(line)
        return table

    def get_eco_news_panel(self):
        """Generate economic news panel content."""
        return self._build_news_panel(
            self.get_eco_news(), "ECO", "green", "📈", "No economic news available"
        )

    def get_sdg_news_panel(self):
        """Generate SDG news panel content."""
        return self._build_news_panel(
            self.get_sdg_news(), "SDG", "blue", "🌍", "No SDG news available"
        )

    @staticmethod
    def _day_suffix(day: int) -> str:
        """Return the English ordinal suffix for a day of the month."""
        if 11 <= day <= 13:
            return "th"
        return {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")

    def display_welcome_screen(self):
        """Print the logo, date line, market/news panels and the help table."""
        from datetime import datetime

        self.console.print(self.logo, style="cyan")
        self.console.print("\nWelcome to RivaCube Financial Terminal", style="yellow bold")

        current_date = datetime.now()
        day_suffix = self._day_suffix(current_date.day)
        formatted_date = current_date.strftime(f"%A, %B {current_date.day}{day_suffix} %Y")
        date_copyright = Text()
        date_copyright.append(formatted_date, style="cyan italic")
        date_copyright.append(" ", style="dim")
        date_copyright.append("Copyright SLM IMPACT FINANCE 2020", style="dim")
        self.console.print(date_copyright)

        market_panel = Panel(
            self.get_market_panel(),
            title="Markets Overview",
            border_style="white",
            box=box.ROUNDED,
            width=65,
            padding=(0, 1),
        )
        eco_panel = Panel(
            self.get_eco_news_panel(),
            title="Economic News",
            border_style="green",
            box=box.ROUNDED,
            width=160,
            padding=(0, 1),
        )
        sdg_panel = Panel(
            self.get_sdg_news_panel(),
            title="SDG News",
            border_style="blue",
            box=box.ROUNDED,
            width=160,
            padding=(0, 1),
        )

        # Panels are stacked vertically: markets, then SDG news, then
        # economic news, each separated by blank lines.
        self.console.print("\n")
        self.console.print(market_panel)
        self.console.print("\n")
        self.console.print(sdg_panel)
        self.console.print("\n")
        self.console.print(eco_panel)
        self.console.print("\n")
        self.display_commands()
class RivaTerminal:
    """Interactive command loop of the terminal.

    On construction it connects to the database, builds the views and the
    command dispatch table, and installs readline tab completion. ``run()``
    then reads commands until a quit is requested.
    """

    def __init__(self):
        """Connect to the database and, when that works, build all components."""
        self.db = None
        self.components_initialized = False
        self.commands = {}
        self.tickers = {}
        self.current_mode = None
        self.current_ticker = None
        # Set by _handle_quit so run() can stop even when the quit was issued
        # from the nested stock prompt rather than from the main prompt.
        self._exit_requested = False
        # Views are created in _init_components once a database is available.
        self.dashboard = None
        self.markets_view = None
        self.sectors_view = None
        self.commodities_view = None
        self.forex_view = None
        self.stocks_view = None
        self.whispers = None
        self.styles = {
            'success': 'green',
            'error': 'red bold',
            'warning': 'yellow',
            'info': 'cyan',
            'prompt': 'blue',
            'title': 'magenta bold',
            'accent': 'yellow bold',
        }
        self._init_database()
        if self.db:
            self._init_components()
            self._setup_autocomplete()

    def _print_plain(self, style_key: str, message: str) -> None:
        """Print ``message`` in a named style without parsing it as markup.

        Used wherever user input or exception text is echoed, so square
        brackets in that text cannot be mistaken for rich markup tags.
        """
        console.print(message, style=self.styles[style_key], markup=False)

    @staticmethod
    def _stdin_exhausted() -> bool:
        """Return True when stdin is absent or not a terminal.

        In that situation an EOF is permanent, so prompting again would loop
        forever.
        """
        stdin = sys.stdin
        return stdin is None or not stdin.isatty()

    def _init_database(self) -> None:
        """Open the database connection, retrying up to three times.

        Leaves ``self.db`` as ``None`` when every attempt fails.
        """
        max_retries = 3
        for retry in range(max_retries):
            try:
                self.db = DatabaseConnection()
                self.db.execute_query("SELECT 1")  # verify the connection works
                return
            except Exception as e:
                logger.error(f"Database connection attempt {retry + 1} failed: {e}")
                if retry < max_retries - 1:
                    console.print(f"[{self.styles['warning']}]Retrying database connection... ({retry + 1}/{max_retries})[/]")
                    time.sleep(2)
        console.print(f"[{self.styles['error']}]Database connection unavailable. Features will be limited.[/]")
        self.db = None

    def _init_components(self) -> None:
        """Create the views and the command dispatch table.

        Sets ``components_initialized`` to True on success; on any failure
        the error is logged and the flag stays False.
        """
        try:
            if not self.db:
                logger.error("Cannot initialize: No database connection")
                return
            self.dashboard = Dashboard(self.db)
            self.markets_view = MarketsView(self.db)
            self.sectors_view = SectorsView(self.db)
            self.commodities_view = CommoditiesView(self.db)
            self.forex_view = ForexView(self.db)
            self.stocks_view = StocksView(self.db)
            self.whispers = Whispers(self.db)
            # Every handler takes the (possibly None) argument list.
            self.commands = {
                'quit': self._handle_quit,
                'help': lambda _: self.dashboard.display_commands(),
                'main': lambda _: self.dashboard.display_welcome_screen(),
                'marketsview': lambda _: self.markets_view.display(),
                'sectorsview': lambda _: self.sectors_view.display(),
                'commoditiesview': lambda _: self.commodities_view.display(),
                'forexview': lambda _: self.forex_view.display(),
                'stocksview': self._handle_stocks_view,
                'whispers': lambda _: self.whispers.display(),
                'portfolio': lambda _: console.print("[yellow]Portfolio module coming soon...[/]"),
                'pricing': lambda _: console.print("[yellow]Pricing module coming soon...[/]"),
                'reports': lambda _: console.print("[yellow]Reports module coming soon...[/]"),
                'slm': lambda _: console.print("[yellow]SLM Reports coming soon...[/]"),
                'talk': lambda _: console.print("[yellow]Revaldi LLM coming soon...[/]"),
            }
            self.components_initialized = True
        except Exception as e:
            logger.error(f"Component initialization error: {e}")
            self.components_initialized = False
            console.print(f"[{self.styles['error']}]Component initialization failed.[/]")

    def _setup_autocomplete(self) -> None:
        """Load the ticker catalogue and install the readline completer.

        On failure the error is logged and the ticker catalogue is emptied.
        """
        try:
            if not self.db:
                return
            query = """
                SELECT DISTINCT
                    t.yf_ticker,
                    t.name,
                    s.name as sector,
                    z.name as zone
                FROM tickers t
                LEFT JOIN sectors s ON s.id = t.sector_id
                LEFT JOIN zones z ON z.id = t.zone_id
                WHERE t.yf_ticker IS NOT NULL
                ORDER BY t.yf_ticker;
            """
            tickers_df = self.db.execute_query(query)
            self.tickers = {
                row['yf_ticker']: {
                    'name': row['name'],
                    'sector': row['sector'] if pd.notna(row['sector']) else 'N/A',
                    'zone': row['zone'] if pd.notna(row['zone']) else 'N/A',
                }
                for _, row in tickers_df.iterrows()
            }
            readline.parse_and_bind('tab: complete')
            readline.set_completer(self._completer)
            readline.set_completer_delims(' \t\n;')
        except Exception as e:
            logger.error(f"Autocomplete setup error: {e}")
            self.tickers = {}

    def _completer(self, text: str, state: int) -> Optional[str]:
        """Readline completion callback.

        In stock mode it completes ticker symbols (as "TICKER - name [zone]
        (sector)" lines); otherwise it completes command names, but only for
        the first word of the line.

        Args:
            text: The word being completed.
            state: Index of the candidate requested by readline.

        Returns:
            The ``state``-th sorted candidate, or None when there is none.
        """
        try:
            if self.current_mode == 'stocks':
                prefix = text.upper()
                options = sorted(
                    # A missing name is shown as blank rather than breaking
                    # the fixed-width format.
                    f"{ticker:<6} - {str(info['name'] or ''):<30} [{info['zone']}] ({info['sector']})"
                    for ticker, info in self.tickers.items()
                    if ticker.startswith(prefix)
                )
            else:
                # Commands are only meaningful as the first word: skip when
                # something other than whitespace precedes the word being
                # completed.
                before_word = readline.get_line_buffer()[:readline.get_begidx()]
                if before_word.strip():
                    return None
                prefix = text.lower()
                options = sorted(cmd for cmd in self.commands if cmd.startswith(prefix))
            return options[state] if state < len(options) else None
        except Exception as e:
            logger.error(f"Autocomplete error: {e}")
            return None

    def _handle_stocks_view(self, args=None) -> bool:
        """Show a stock directly (``args[0]``) or enter the interactive stock prompt.

        Inside the prompt: a ticker displays that stock, a number 1-8 requests
        an insight for the current ticker, 'main' returns to the welcome
        screen and 'quit' (after confirmation) exits the terminal.

        Returns:
            The result of the delegated call, ``False`` when quit was
            confirmed, ``True`` otherwise.
        """
        try:
            if args and args[0]:
                ticker = args[0].split(' - ')[0].strip().upper()
                if ticker in self.tickers:
                    return self.stocks_view.display(ticker)
                self._print_plain('error', f"Unknown ticker: {ticker}")
                # Fall back to the interactive prompt so the user can retry.
                self._handle_stocks_view()
                return True

            self.current_mode = 'stocks'
            self.current_ticker = None
            console.print(f"\n[{self.styles['info']}]Enter stock ticker (Tab for suggestions)[/]")
            console.print("[blue]Commands: 'main' for main menu, 'quit' to exit[/]")
            while True:
                try:
                    prompt = f"Stock ({self.current_ticker})> " if self.current_ticker else "\nStock> "
                    ticker_input = Prompt.ask(prompt, console=console)
                    if not ticker_input:
                        continue
                    ticker_input = ticker_input.lower()
                    if ticker_input == 'main':
                        self.current_mode = None
                        self.current_ticker = None
                        return self.dashboard.display_welcome_screen()
                    if ticker_input == 'quit':
                        if Prompt.ask("Exit terminal?", choices=['yes', 'no'], default='no') == 'yes':
                            # _handle_quit flags the exit so run() stops too.
                            return self._handle_quit()
                        continue
                    if self.current_ticker:
                        try:
                            insight_num = int(ticker_input)
                            if 1 <= insight_num <= 8:
                                self.handle_insights_command(self.current_ticker, ticker_input)
                                continue
                        except ValueError:
                            pass  # not a number: treat the input as a ticker
                    # Completion inserts "TICKER - name ..."; keep the symbol.
                    ticker = ticker_input.split(' - ')[0].strip().upper()
                    if ticker in self.tickers:
                        self.current_ticker = ticker
                        self.stocks_view.display(ticker)
                    else:
                        self._print_plain('error', f"Unknown ticker: {ticker}")
                        console.print("[blue]Press Tab for suggestions[/]")
                except KeyboardInterrupt:
                    console.print("\n[blue]Use 'main' for main menu or 'quit' to exit[/]")
                    continue
                except EOFError:
                    if self._stdin_exhausted():
                        # Input ran out for good: leave instead of spinning.
                        return self._handle_quit()
                    console.print("\n[blue]Use 'main' for main menu or 'quit' to exit[/]")
                    continue
                except Exception as e:
                    self._print_plain('error', f"\nError: {e}")
                    continue
        except Exception as e:
            logger.error(f"Stocks view error: {e}")
            self.current_mode = None
            self.current_ticker = None
            return True

    def handle_insights_command(self, ticker: str, choice: str) -> bool:
        """Display insight number ``choice`` (1-8) for ``ticker``.

        Returns:
            True when the insight was requested, False when ``choice`` is out
            of range or an error occurred (the error is logged and printed).
        """
        try:
            from modules.signals.stocks_insights import StocksInsights, StocksInsightsView

            insight_num = int(choice)
            if not 1 <= insight_num <= 8:
                return False
            view = StocksInsightsView(StocksInsights(self.db))
            view.handle_insight_request(ticker, insight_num)
            return True
        except Exception as e:
            logger.error(f"Insights error: {e}")
            self._print_plain('error', f"Error displaying analysis: {e}")
            return False

    def run(self) -> None:
        """Show the welcome screen and process commands until quit.

        Raises:
            RuntimeError: When the components were not initialised (for
                example because the database is unavailable); without the
                command table the loop could neither work nor be exited.
        """
        if not self.components_initialized:
            raise RuntimeError(
                "Terminal components are not initialized (database unavailable?)"
            )
        console.clear()
        self.dashboard.display_welcome_screen()
        while not self._exit_requested:
            try:
                prompt = "Stock> " if self.current_mode == 'stocks' else "Rivaldi> "
                command_line = Prompt.ask(f"\n{prompt}", console=console)
                if not command_line:
                    continue
                if command_line.lower() == 'quit':
                    if Prompt.ask("Exit terminal?", choices=['yes', 'no'], default='no') == 'yes':
                        self.process_command('quit', None)
                    continue
                parts = command_line.split()
                if not parts:
                    continue
                self.process_command(parts[0].lower(), parts[1:] or None)
            except KeyboardInterrupt:
                console.print("\n[blue]Use 'quit' to exit properly[/]")
            except EOFError:
                if self._stdin_exhausted():
                    # Input ran out for good: leave instead of spinning.
                    self._handle_quit()
                else:
                    console.print("\n[blue]Use 'quit' to exit properly[/]")
            except Exception as e:
                logger.error(f"Command processing error: {e}")
                console.print(f"\n[{self.styles['error']}]Command processing error[/]")

    def process_command(self, command: str, args: Optional[List[str]] = None) -> bool:
        """Dispatch ``command`` to its handler.

        Returns:
            The handler's return value for a known command; True for an
            unknown command or when the handler raised (the error is printed).
        """
        try:
            if command in self.commands:
                return self.commands[command](args)
            self._print_plain('error', f"Unknown command: {command}")
            console.print("[blue]Type 'help' for available commands[/]")
            return True
        except Exception as e:
            self._print_plain('error', f"Command error: {e}")
            console.print("[blue]Type 'help' for available commands[/]")
            return True

    def _handle_quit(self, _=None) -> bool:
        """Close the database, flag the exit and say goodbye.

        A failure while closing the connection is logged but does not block
        the exit: the user must always be able to leave the terminal.

        Returns:
            Always False, the value the command loop treats as "stop".
        """
        try:
            if self.db:
                self.db.close()
        except Exception as e:
            logger.error(f"Cleanup error: {e}")
        finally:
            self.db = None
        self._exit_requested = True
        console.print(f"\n[{self.styles['success']}]Goodbye![/]")
        return False
def main():
    """Start the terminal, restarting it after a crash.

    The terminal is created and run up to three times. Each exception is
    logged; between attempts a restart notice is printed followed by a
    two-second pause, and after the last failed attempt a failure message is
    printed instead. A run that returns normally ends the loop.
    """
    console.clear()
    max_retries = 3
    for attempt in range(1, max_retries + 1):
        try:
            RivaTerminal().run()
        except Exception as e:
            logger.error(f"Terminal error: {e}")
            if attempt == max_retries:
                console.print(f"\n[red]Terminal failed to start after {max_retries} attempts.[/]")
            else:
                console.print("\n[yellow]Restarting terminal...[/]")
                time.sleep(2)
        else:
            # run() returned normally (the user quit): no restart needed.
            break


# Run the terminal only when executed as a script, not on import.
if __name__ == "__main__":
    main()