RivaTerminal/rivaterminal.py

682 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import logging
import readline
import requests
from dotenv import load_dotenv
import time
import pandas as pd
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
from rich.text import Text
from rich import box
from rich.table import Table
from rich.columns import Columns
from typing import Optional, List
project_root = os.path.dirname(os.path.abspath(__file__))
sys.path.append(project_root)
from modules.database import DatabaseConnection
from modules.signals.markets_view import MarketsView
from modules.signals.sectors_view import SectorsView
from modules.signals.commodities_view import CommoditiesView
from modules.signals.forex_view import ForexView
from modules.signals.stocks_view import StocksView
from modules.signals.whispers import Whispers
load_dotenv()
logging.basicConfig(
level=logging.WARNING,
format='%(message)s',
handlers=[
logging.FileHandler('rivaterminal.log', mode='a'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
console = Console()
LOGO = """
*
*****
** ****
* * ** *** *
** ** **** *
*** ** * *** *
** ** ******* **** *
* ** ** * **** **
** ******* *** ******
* ** *** ** ** ***** *
* *** ******** ***** *
* ** *********** *
** *** **** # ********
*** *** ** ********
**** *** ********** *
** ** *** ** *****
*** **+** * ***# *** **
***** ***+* ******** *****
#* *** *** **++**
* *** ** ** **++** ** ******* *** ** ** ** *** ******** ***
**** #*********** *** *+******** *** *+* *+* **** *+* *********+** ****
***++**** ** ***** *+** **+* *** **+ *** ***** *+* **+* ***+* ****
#***++***** *+**** *+** *+* *** +* #** ** *** *+* **+* **+* ****
**** *** *+** **** *** *** #*** **** *** *** **+* **** ****
******** ** **+* *+******* *** *** *** **+* *** *+* **+* **** ****
************ ***** *+****+ *** *** *** *+*******++* *+* **+* **** ****
** *+** **** *** *+ +* ************** *+* **+* **+* ****
****+****** *+** **** *** **+** *** **** *** **+* ***+* ****
**** ** *+** **** *** *+* *+* *+* *++++++* **++++++++* ****
*** *
"""
class Dashboard:
def __init__(self, db_connection):
self.db = db_connection
self.console = Console()
self.logo = LOGO
def display_commands(self):
table = Table(title="Available Commands", box=box.HEAVY_EDGE)
table.add_column("Command", style="cyan")
table.add_column("Description", style="white")
commands = [
("main", "Return to main screen"),
("marketsview", "View market indices and performance"),
("sectorsview", "View sector performance and analysis"),
("commoditiesview", "View commodities markets"),
("forexview", "View forex markets"),
("stocksview", "View stock analysis and data"),
("whispers", "View market whispers and insights"),
("news", "View market news [-d days] [-s source] [-t ticker]"),
("portfolio", "Portfolio management (coming soon)"),
("pricing", "Options pricing tools (coming soon)"),
("reports", "Financial reports (coming soon)"),
("slm", "SLM Reports (coming soon)"),
("talk", "Revaldi LLM (coming soon)"),
("help", "Display this help message"),
("quit", "Exit the terminal")
]
for command, description in commands:
table.add_row(command, description)
self.console.print(table)
def get_market_overview(self):
"""Get market data using FMP API"""
FMP_API_KEY = os.getenv('FMP_API_KEY')
if not FMP_API_KEY:
logger.error("FMP API key not found in environment variables")
return pd.DataFrame()
market_data = []
try:
# Market indices data
indices = [
{'symbol': '^SPX', 'name': 'S&P500'},
{'symbol': '^IXIC', 'name': 'Nasdaq'},
{'symbol': '^STOXX50E', 'name': 'EuroStoxx50'},
{'symbol': '^N225', 'name': 'Nikkei'},
{'symbol': '^HSI', 'name': 'HangSeng'},
{'symbol': 'MSCIWORLD', 'name': 'MSCI World'}
]
for index in indices:
try:
url = f'https://financialmodelingprep.com/api/v3/quote/{index["symbol"]}?apikey={FMP_API_KEY}'
response = requests.get(url)
if response.status_code == 200:
data = response.json()
if data:
data = data[0]
market_data.append({
'symbol': index['symbol'],
'index_name': index['name'],
'last_price': float(data['price']),
'price_change_pct': float(data['changesPercentage'])
})
except Exception as e:
logger.error(f"Error fetching data for {index['symbol']}: {e}")
continue
# Commodities data
commodities = [
{'symbol': 'GCUSD', 'name': 'Gold'},
{'symbol': 'SIUSD', 'name': 'Silver'}
]
for commodity in commodities:
try:
url = f'https://financialmodelingprep.com/api/v3/quote/{commodity["symbol"]}?apikey={FMP_API_KEY}'
response = requests.get(url)
if response.status_code == 200:
data = response.json()
if data:
data = data[0]
market_data.append({
'symbol': commodity['symbol'],
'index_name': commodity['name'],
'last_price': float(data['price']),
'price_change_pct': float(data['changesPercentage'])
})
except Exception as e:
logger.error(f"Error fetching data for {commodity['symbol']}: {e}")
continue
# Forex data
try:
url = f'https://financialmodelingprep.com/api/v3/fx/EURUSD?apikey={FMP_API_KEY}'
response = requests.get(url)
if response.status_code == 200:
data = response.json()
if data:
data = data[0]
market_data.append({
'symbol': 'EURUSD',
'index_name': 'EUR/USD',
'last_price': float(data['bid']),
'price_change_pct': float(data['changes']) * 100
})
except Exception as e:
logger.error(f"Error fetching data for EURUSD: {e}")
except Exception as e:
logger.error(f"General error in market data fetch: {e}")
return pd.DataFrame(market_data)
def get_market_panel(self):
"""Generate market overview panel content"""
market_data = self.get_market_overview()
if market_data is not None and not market_data.empty:
table = Table(
show_header=True,
header_style="white",
box=box.MINIMAL_DOUBLE_HEAD,
border_style="white",
padding=(0, 2)
)
table.add_column("Index", style="white", width=25, justify="left")
table.add_column("Last Price", style="white", justify="right", width=15)
table.add_column("Change", style="white", justify="right", width=12)
for _, row in market_data.iterrows():
change_color = "red" if row['price_change_pct'] < 0 else "green"
if row['index_name'] == 'EUR/USD':
formatted_price = f"{row['last_price']:.4f}"
elif row['index_name'] in ['Gold', 'Silver']:
formatted_price = f"{row['last_price']:.2f}"
else:
formatted_price = f"{row['last_price']:,.2f}"
table.add_row(
str(row['index_name']),
formatted_price,
f"{row['price_change_pct']:+.2f}%",
style=change_color
)
return table
return Text("No market data available")
def get_eco_news(self):
"""Get the latest economic news for dashboard display"""
query = """
SELECT
'ECO' as news_type,
title,
source,
published_at
FROM econews
WHERE published_at >= CURRENT_DATE - INTERVAL '2 days'
ORDER BY published_at DESC
LIMIT 8;
"""
try:
return self.db.execute_query(query)
except Exception as e:
logger.error(f"Economic news query failed: {e}")
return pd.DataFrame()
def get_sdg_news(self):
"""Get the latest SDG news for dashboard display"""
query = """
SELECT
'SDG' as news_type,
title,
category,
date
FROM news
WHERE date >= CURRENT_DATE - INTERVAL '2 days'
AND category IS NOT NULL
ORDER BY date DESC
LIMIT 8;
"""
try:
return self.db.execute_query(query)
except Exception as e:
logger.error(f"SDG news query failed: {e}")
return pd.DataFrame()
def get_eco_news_panel(self):
"""Generate economic news panel content"""
news_data = self.get_eco_news()
if news_data is not None and not news_data.empty:
table = Table(
show_header=False,
box=box.MINIMAL_DOUBLE_HEAD,
border_style="white",
width=155,
padding=(0, 1)
)
table.add_column(width=65, overflow="fold")
for _, row in news_data.iterrows():
published_time = pd.to_datetime(row['published_at']).strftime('%H:%M')
title = row['title']
if len(title) > 152:
title = title[:150] + "..."
formatted_row = f"[dim]{published_time}[/dim] [green]ECO[/green] 📈 {title}"
table.add_row(formatted_row)
return table
return Text("No economic news available")
def get_sdg_news_panel(self):
"""Generate SDG news panel content"""
try:
news_data = self.get_sdg_news()
if not news_data.empty:
table = Table(
show_header=False,
box=box.MINIMAL_DOUBLE_HEAD,
border_style="white",
width=155,
padding=(0, 1)
)
table.add_column(width=65, overflow="fold")
for _, row in news_data.iterrows():
try:
published_time = pd.to_datetime(row['date']).strftime('%H:%M')
title = row['title'][:150] + "..." if len(row['title']) > 152 else row['title']
category = row['category'].split(' - ')[0]
formatted_row = f"[dim]{published_time}[/dim] [blue]SDG[/blue] 🌍 {title}"
table.add_row(formatted_row)
except Exception as e:
print(f"Row processing error: {e}") # Debug print
continue
return table
return Text("No SDG news available")
except Exception as e:
print(f"Panel creation error: {e}") # Debug print
return Text(f"Error: {str(e)}")
def display_welcome_screen(self):
self.console.print(self.logo, style="cyan")
self.console.print("\nWelcome to RivaCube Financial Terminal", style="yellow bold")
from datetime import datetime
current_date = datetime.now()
day_suffix = "th" if 11 <= current_date.day <= 13 else {1: "st", 2: "nd", 3: "rd"}.get(current_date.day % 10, "th")
formatted_date = current_date.strftime(f"%A, %B {current_date.day}{day_suffix} %Y")
date_copyright = Text()
date_copyright.append(formatted_date, style="cyan italic")
date_copyright.append(" ", style="dim")
date_copyright.append("Copyright SLM IMPACT FINANCE 2020", style="dim")
self.console.print(date_copyright)
# Create market panel
market_panel = Panel(
self.get_market_panel(),
title="Markets Overview",
border_style="white",
box=box.ROUNDED,
width=65,
padding=(0, 1)
)
# Create news panels
eco_panel = Panel(
self.get_eco_news_panel(),
title="Economic News",
border_style="green",
box=box.ROUNDED,
width=160,
padding=(0, 1)
)
sdg_panel = Panel(
self.get_sdg_news_panel(),
title="SDG News",
border_style="blue",
box=box.ROUNDED,
width=160,
padding=(0, 1)
)
# Print panels: market on top, news panels side by side below
self.console.print("\n") # Add some spacing
self.console.print(market_panel)
self.console.print("\n") # Add spacing between panels
self.console.print(sdg_panel)
self.console.print("\n") # Add spacing between panels
self.console.print(eco_panel)
self.console.print("\n")
self.display_commands()
class RivaTerminal:
def __init__(self):
self.db = None
self.components_initialized = False
self.commands = {}
self.tickers = {}
self.current_mode = None
self.current_ticker = None
self.styles = {
'success': 'green',
'error': 'red bold',
'warning': 'yellow',
'info': 'cyan',
'prompt': 'blue',
'title': 'magenta bold',
'accent': 'yellow bold'
}
self._init_database()
if self.db:
self._init_components()
self._setup_autocomplete()
def _init_database(self) -> None:
max_retries = 3
for retry in range(max_retries):
try:
self.db = DatabaseConnection()
self.db.execute_query("SELECT 1")
return
except Exception as e:
if retry < max_retries - 1:
console.print(f"[{self.styles['warning']}]Retrying database connection... ({retry + 1}/{max_retries})[/]")
time.sleep(2)
console.print(f"[{self.styles['error']}]Database connection unavailable. Features will be limited.[/]")
self.db = None
def _init_components(self) -> None:
try:
if not self.db:
logger.error("Cannot initialize: No database connection")
return
self.dashboard = Dashboard(self.db)
self.markets_view = MarketsView(self.db)
self.sectors_view = SectorsView(self.db)
self.commodities_view = CommoditiesView(self.db)
self.forex_view = ForexView(self.db)
self.stocks_view = StocksView(self.db)
self.whispers = Whispers(self.db)
self.commands = {
'quit': self._handle_quit,
'help': lambda _: self.dashboard.display_commands(),
'main': lambda _: self.dashboard.display_welcome_screen(),
'marketsview': lambda _: self.markets_view.display(),
'sectorsview': lambda _: self.sectors_view.display(),
'commoditiesview': lambda _: self.commodities_view.display(),
'forexview': lambda _: self.forex_view.display(),
'stocksview': self._handle_stocks_view,
'whispers': lambda _: self.whispers.display(),
'portfolio': lambda _: console.print("[yellow]Portfolio module coming soon...[/]"),
'pricing': lambda _: console.print("[yellow]Pricing module coming soon...[/]"),
'reports': lambda _: console.print("[yellow]Reports module coming soon...[/]"),
'slm': lambda _: console.print("[yellow]SLM Reports coming soon...[/]"),
'talk': lambda _: console.print("[yellow]Revaldi LLM coming soon...[/]")
}
self.components_initialized = True
except Exception as e:
logger.error(f"Component initialization error: {e}")
self.components_initialized = False
console.print(f"[{self.styles['error']}]Component initialization failed.[/]")
def _setup_autocomplete(self) -> None:
try:
if not self.db:
return
query = """
SELECT DISTINCT
t.yf_ticker,
t.name,
s.name as sector,
z.name as zone
FROM tickers t
LEFT JOIN sectors s ON s.id = t.sector_id
LEFT JOIN zones z ON z.id = t.zone_id
WHERE t.yf_ticker IS NOT NULL
ORDER BY t.yf_ticker;
"""
tickers_df = self.db.execute_query(query)
self.tickers = {
row['yf_ticker']: {
'name': row['name'],
'sector': row['sector'] if pd.notna(row['sector']) else 'N/A',
'zone': row['zone'] if pd.notna(row['zone']) else 'N/A'
}
for _, row in tickers_df.iterrows()
}
readline.parse_and_bind('tab: complete')
readline.set_completer(self._completer)
readline.set_completer_delims(' \t\n;')
except Exception as e:
logger.error(f"Autocomplete setup error: {e}")
self.tickers = {}
def _completer(self, text: str, state: int) -> Optional[str]:
try:
if self.current_mode == 'stocks':
text = text.upper()
options = [
f"{ticker:<6} - {info['name']:<30} [{info['zone']}] ({info['sector']})"
for ticker, info in self.tickers.items()
if ticker.startswith(text)
]
options.sort()
return options[state] if state < len(options) else None
else:
buffer = readline.get_line_buffer()
if not buffer or buffer.endswith(' '):
options = [cmd for cmd in self.commands if cmd.startswith(text.lower())]
return sorted(options)[state] if state < len(options) else None
return None
except Exception as e:
logger.error(f"Autocomplete error: {e}")
return None
def _handle_stocks_view(self, args=None) -> bool:
try:
if args and args[0]:
ticker = args[0].split(' - ')[0].strip().upper()
if ticker in self.tickers:
return self.stocks_view.display(ticker)
console.print(f"[{self.styles['error']}]Unknown ticker: {ticker}[/]")
self._handle_stocks_view()
return True
self.current_mode = 'stocks'
self.current_ticker = None
console.print(f"\n[{self.styles['info']}]Enter stock ticker (Tab for suggestions)[/]")
console.print("[blue]Commands: 'main' for main menu, 'quit' to exit[/]")
while True:
try:
prompt = f"Stock ({self.current_ticker})> " if self.current_ticker else "\nStock> "
ticker_input = Prompt.ask(prompt, console=console)
if not ticker_input:
continue
ticker_input = ticker_input.lower()
if ticker_input == 'main':
self.current_mode = None
self.current_ticker = None
return self.dashboard.display_welcome_screen()
if ticker_input == 'quit':
if Prompt.ask("Exit terminal?", choices=['yes', 'no'], default='no') == 'yes':
return self._handle_quit()
continue
if self.current_ticker:
try:
insight_num = int(ticker_input)
if 1 <= insight_num <= 8:
self.handle_insights_command(self.current_ticker, ticker_input)
continue
except ValueError:
pass
ticker = ticker_input.split(' - ')[0].strip().upper()
if ticker in self.tickers:
self.current_ticker = ticker
self.stocks_view.display(ticker)
else:
console.print(f"[{self.styles['error']}]Unknown ticker: {ticker}[/]")
console.print("[blue]Press Tab for suggestions[/]")
except (KeyboardInterrupt, EOFError):
console.print("\n[blue]Use 'main' for main menu or 'quit' to exit[/]")
continue
except Exception as e:
console.print(f"\n[{self.styles['error']}]Error: {str(e)}[/]")
continue
except Exception as e:
logger.error(f"Stocks view error: {e}")
self.current_mode = None
self.current_ticker = None
return True
def handle_insights_command(self, ticker: str, choice: str) -> bool:
try:
from modules.signals.stocks_insights import StocksInsights, StocksInsightsView
insight_num = int(choice)
if 1 <= insight_num <= 8:
insights = StocksInsights(self.db)
view = StocksInsightsView(insights)
view.handle_insight_request(ticker, insight_num)
return True
return False
except Exception as e:
logger.error(f"Insights error: {e}")
console.print(f"[{self.styles['error']}]Error displaying analysis: {str(e)}[/]")
return False
def run(self) -> None:
console.clear()
self.dashboard.display_welcome_screen()
while True:
try:
prompt = "Stock> " if self.current_mode == 'stocks' else "Rivaldi> "
command_line = Prompt.ask(f"\n{prompt}", console=console)
if not command_line:
continue
if command_line.lower() == 'quit':
if Prompt.ask("Exit terminal?", choices=['yes', 'no'], default='no') == 'yes':
if not self.process_command('quit', None):
return
continue
parts = command_line.split()
if not self.process_command(parts[0].lower(), parts[1:] if len(parts) > 1 else None):
if parts[0].lower() == 'quit':
return
continue
except KeyboardInterrupt:
console.print("\n[blue]Use 'quit' to exit properly[/]")
continue
except EOFError:
console.print("\n[blue]Use 'quit' to exit properly[/]")
continue
except Exception as e:
console.print(f"\n[{self.styles['error']}]Command processing error[/]")
continue
def process_command(self, command: str, args: Optional[List[str]] = None) -> bool:
try:
if command in self.commands:
return self.commands[command](args)
console.print(f"[{self.styles['error']}]Unknown command: {command}[/]")
console.print("[blue]Type 'help' for available commands[/]")
return True
except Exception as e:
console.print(f"[{self.styles['error']}]Command error: {str(e)}[/]")
console.print("[blue]Type 'help' for available commands[/]")
return True
def _handle_quit(self, _=None) -> bool:
try:
if self.db:
self.db.close()
self.db = None
console.print(f"\n[{self.styles['success']}]Goodbye![/]")
return False
except Exception as e:
logger.error(f"Cleanup error: {e}")
return True
def main():
console.clear()
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
terminal = RivaTerminal()
terminal.run()
break
except Exception as e:
logger.error(f"Terminal error: {e}")
retry_count += 1
if retry_count < max_retries:
console.print("\n[yellow]Restarting terminal...[/]")
time.sleep(2)
else:
console.print(f"\n[red]Terminal failed to start after {max_retries} attempts.[/]")
continue
if __name__ == "__main__":
main()