"""RivaCube financial terminal.

Interactive command-line dashboard built on Rich. It shows a market
overview fetched from the Financial Modeling Prep (FMP) REST API, recent
news rows read from the project database, and dispatches typed commands
to the view objects in ``modules.signals``.
"""
import logging
import os
import readline
import sys
import time
from datetime import datetime
from typing import Optional, List

import pandas as pd
import requests
from dotenv import load_dotenv
from rich import box
from rich.columns import Columns
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.prompt import Prompt
from rich.table import Table
from rich.text import Text

# The project root must be on sys.path before the local ``modules`` imports.
project_root = os.path.dirname(os.path.abspath(__file__))
sys.path.append(project_root)

from modules.database import DatabaseConnection
from modules.signals.markets_view import MarketsView
from modules.signals.sectors_view import SectorsView
from modules.signals.commodities_view import CommoditiesView
from modules.signals.forex_view import ForexView
from modules.signals.stocks_view import StocksView
from modules.signals.whispers import Whispers

load_dotenv()

logging.basicConfig(
    level=logging.WARNING,
    format='%(message)s',
    handlers=[
        logging.FileHandler('rivaterminal.log', mode='a'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
console = Console()

# Base URL of the FMP v3 REST API and the per-request timeout (seconds).
# Without a timeout a stalled connection would freeze the welcome screen.
FMP_BASE_URL = 'https://financialmodelingprep.com/api/v3'
FMP_REQUEST_TIMEOUT = 10

# NOTE(review): the ASCII art below is reproduced exactly as it appears in the
# reviewed copy of this file, where its line breaks look to have been lost.
# Restore the original multi-line art from version control if it is available.
LOGO = """ * ***** ** **** * * ** *** * ** ** **** * *** ** * *** * ** ** ******* **** * * ** ** * **** ** ** ******* *** ****** * ** *** ** ** ***** * * *** ******** ***** * * ** *********** * ** *** **** # ******** *** *** ** ******** **** *** ********** * ** ** *** ** ***** *** **+** * ***# *** ** ***** ***+* ******** ***** #* *** *** **++** * *** ** ** **++** ** ******* *** ** ** ** *** ******** *** **** #*********** *** *+******** *** *+* *+* **** *+* *********+** **** ***++**** ** ***** *+** **+* *** **+ *** ***** *+* **+* ***+* **** #***++***** *+**** *+** *+* *** +* #** ** *** *+* **+* **+* **** **** *** *+** **** *** *** #*** **** *** *** **+* **** **** ******** ** **+* *+******* *** *** *** **+* *** *+* **+* **** **** ************ ***** *+****+ *** *** *** *+*******++* *+* **+* **** **** ** *+** **** *** *+ +* ************** *+* **+* **+* **** ****+****** *+** **** *** **+** *** **** *** **+* ***+* **** **** ** *+** **** 
*** *+* *+* *+* *++++++* **++++++++* **** *** * """


class Dashboard:
    """Welcome screen: logo, market overview, news panels and command list."""

    def __init__(self, db_connection):
        """Store the database handle used by the news queries.

        Args:
            db_connection: object exposing ``execute_query(sql)``; the panel
                code treats its result as a pandas DataFrame.
        """
        self.db = db_connection
        self.console = Console()
        self.logo = LOGO

    def display_commands(self):
        """Print the table of available commands."""
        table = Table(title="Available Commands", box=box.HEAVY_EDGE)
        table.add_column("Command", style="cyan")
        table.add_column("Description", style="white")

        # NOTE(review): "news" is advertised here but RivaTerminal registers
        # no handler for it, so typing it reports an unknown command.
        commands = [
            ("main", "Return to main screen"),
            ("marketsview", "View market indices and performance"),
            ("sectorsview", "View sector performance and analysis"),
            ("commoditiesview", "View commodities markets"),
            ("forexview", "View forex markets"),
            ("stocksview", "View stock analysis and data"),
            ("whispers", "View market whispers and insights"),
            ("news", "View market news [-d days] [-s source] [-t ticker]"),
            ("portfolio", "Portfolio management (coming soon)"),
            ("pricing", "Options pricing tools (coming soon)"),
            ("reports", "Financial reports (coming soon)"),
            ("slm", "SLM Reports (coming soon)"),
            ("talk", "Revaldi LLM (coming soon)"),
            ("help", "Display this help message"),
            ("quit", "Exit the terminal")
        ]

        for command, description in commands:
            table.add_row(command, description)

        self.console.print(table)

    @staticmethod
    def _fetch_first_record(path, api_key):
        """Return the first record of an FMP list response, or None.

        None is returned for a non-200 status or an empty payload. Network
        and decoding errors propagate so the caller can log them per symbol.
        """
        response = requests.get(
            f'{FMP_BASE_URL}/{path}',
            params={'apikey': api_key},
            timeout=FMP_REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return None
        payload = response.json()
        return payload[0] if payload else None

    def get_market_overview(self):
        """Fetch index, commodity and EUR/USD quotes from the FMP API.

        Returns:
            pandas.DataFrame with columns ``symbol``, ``index_name``,
            ``last_price`` and ``price_change_pct``; empty when the API key
            is missing or every request fails. A failure for one symbol is
            logged and does not affect the others.
        """
        api_key = os.getenv('FMP_API_KEY')
        if not api_key:
            logger.error("FMP API key not found in environment variables")
            return pd.DataFrame()

        # Indices first, then commodities: both use the same /quote endpoint
        # and response fields, and this is the display order.
        quoted_instruments = [
            {'symbol': '^SPX', 'name': 'S&P500'},
            {'symbol': '^IXIC', 'name': 'Nasdaq'},
            {'symbol': '^STOXX50E', 'name': 'EuroStoxx50'},
            {'symbol': '^N225', 'name': 'Nikkei'},
            {'symbol': '^HSI', 'name': 'HangSeng'},
            {'symbol': 'MSCIWORLD', 'name': 'MSCI World'},
            {'symbol': 'GCUSD', 'name': 'Gold'},
            {'symbol': 'SIUSD', 'name': 'Silver'},
        ]

        market_data = []
        for instrument in quoted_instruments:
            symbol = instrument['symbol']
            try:
                record = self._fetch_first_record(f'quote/{symbol}', api_key)
                if record is None:
                    continue
                market_data.append({
                    'symbol': symbol,
                    'index_name': instrument['name'],
                    'last_price': float(record['price']),
                    'price_change_pct': float(record['changesPercentage'])
                })
            except Exception as e:
                logger.error("Error fetching data for %s: %s", symbol, e)

        # Forex uses a different endpoint and different field names.
        try:
            record = self._fetch_first_record('fx/EURUSD', api_key)
            if record is not None:
                market_data.append({
                    'symbol': 'EURUSD',
                    'index_name': 'EUR/USD',
                    'last_price': float(record['bid']),
                    'price_change_pct': float(record['changes']) * 100
                })
        except Exception as e:
            logger.error("Error fetching data for EURUSD: %s", e)

        return pd.DataFrame(market_data)

    def get_market_panel(self):
        """Build the market overview table, or a placeholder Text if empty."""
        market_data = self.get_market_overview()
        if market_data is None or market_data.empty:
            return Text("No market data available")

        table = Table(
            show_header=True,
            header_style="white",
            box=box.MINIMAL_DOUBLE_HEAD,
            border_style="white",
            padding=(0, 2)
        )
        table.add_column("Index", style="white", width=25, justify="left")
        table.add_column("Last Price", style="white", justify="right", width=15)
        table.add_column("Change", style="white", justify="right", width=12)

        for _, row in market_data.iterrows():
            change_color = "red" if row['price_change_pct'] < 0 else "green"

            # FX gets four decimals, metals two without a thousands
            # separator, everything else two with one.
            if row['index_name'] == 'EUR/USD':
                formatted_price = f"{row['last_price']:.4f}"
            elif row['index_name'] in ['Gold', 'Silver']:
                formatted_price = f"{row['last_price']:.2f}"
            else:
                formatted_price = f"{row['last_price']:,.2f}"

            table.add_row(
                str(row['index_name']),
                formatted_price,
                f"{row['price_change_pct']:+.2f}%",
                style=change_color
            )
        return table

    def get_eco_news(self):
        """Query up to 8 ``econews`` rows from the last two days.

        Returns:
            The result of ``self.db.execute_query``; an empty DataFrame if
            the query raises.
        """
        query = """
            SELECT 'ECO' as news_type, title, source, published_at
            FROM econews
            WHERE published_at >= CURRENT_DATE - INTERVAL '2 days'
            ORDER BY published_at DESC
            LIMIT 8;
        """
        try:
            return self.db.execute_query(query)
        except Exception as e:
            logger.error("Economic news query failed: %s", e)
            return pd.DataFrame()

    def get_sdg_news(self):
        """Query up to 8 categorised ``news`` rows from the last two days.

        Returns:
            The result of ``self.db.execute_query``; an empty DataFrame if
            the query raises.
        """
        query = """
            SELECT 'SDG' as news_type, title, category, date
            FROM news
            WHERE date >= CURRENT_DATE - INTERVAL '2 days'
            AND category IS NOT NULL
            ORDER BY date DESC
            LIMIT 8;
        """
        try:
            return self.db.execute_query(query)
        except Exception as e:
            logger.error("SDG news query failed: %s", e)
            return pd.DataFrame()

    @staticmethod
    def _truncate_title(title):
        """Cut titles longer than 152 characters to 150 plus an ellipsis."""
        return title[:150] + "..." if len(title) > 152 else title

    def _build_news_table(self, news_data, time_column, tag_markup):
        """Render news rows as a one-column table of "HH:MM TAG title" lines.

        Args:
            news_data: DataFrame with a ``title`` column and ``time_column``.
            time_column: name of the column holding the publication time.
            tag_markup: Rich markup placed between the time and the title.

        A row that cannot be formatted is logged and skipped so one bad
        record does not take down the whole panel.
        """
        table = Table(
            show_header=False,
            box=box.MINIMAL_DOUBLE_HEAD,
            border_style="white",
            width=155,
            padding=(0, 1)
        )
        table.add_column(width=65, overflow="fold")

        for _, row in news_data.iterrows():
            try:
                published_time = pd.to_datetime(row[time_column]).strftime('%H:%M')
                # Titles are external text: escape them so square brackets
                # are not interpreted as Rich markup tags.
                title = escape(self._truncate_title(row['title']))
                table.add_row(f"[dim]{published_time}[/dim] {tag_markup} {title}")
            except Exception as e:
                logger.error("News row processing error: %s", e)
        return table

    def get_eco_news_panel(self):
        """Build the economic news table, or a placeholder Text if empty."""
        news_data = self.get_eco_news()
        if news_data is None or news_data.empty:
            return Text("No economic news available")
        return self._build_news_table(
            news_data, 'published_at', "[green]ECO[/green] 📈"
        )

    def get_sdg_news_panel(self):
        """Build the SDG news table.

        Returns a placeholder Text when there are no rows, and a Text
        carrying the error message if building the panel fails.
        """
        try:
            news_data = self.get_sdg_news()
            if news_data.empty:
                return Text("No SDG news available")
            return self._build_news_table(
                news_data, 'date', "[blue]SDG[/blue] 🌍"
            )
        except Exception as e:
            logger.error("Panel creation error: %s", e)
            return Text(f"Error: {str(e)}")

    def display_welcome_screen(self):
        """Print the logo, date line, market and news panels, and commands."""
        self.console.print(self.logo, style="cyan")
        self.console.print("\nWelcome to RivaCube Financial Terminal", style="yellow bold")

        current_date = datetime.now()
        # 11th-13th are the exceptions to the 1st/2nd/3rd rule.
        if 11 <= current_date.day <= 13:
            day_suffix = "th"
        else:
            day_suffix = {1: "st", 2: "nd", 3: "rd"}.get(current_date.day % 10, "th")
        formatted_date = current_date.strftime(
            f"%A, %B {current_date.day}{day_suffix} %Y"
        )

        date_copyright = Text()
        date_copyright.append(formatted_date, style="cyan italic")
        date_copyright.append(" ‒ ", style="dim")
        date_copyright.append("Copyright SLM IMPACT FINANCE 2020", style="dim")
        self.console.print(date_copyright)

        market_panel = Panel(
            self.get_market_panel(),
            title="Markets Overview",
            border_style="white",
            box=box.ROUNDED,
            width=65,
            padding=(0, 1)
        )
        eco_panel = Panel(
            self.get_eco_news_panel(),
            title="Economic News",
            border_style="green",
            box=box.ROUNDED,
            width=160,
            padding=(0, 1)
        )
        sdg_panel = Panel(
            self.get_sdg_news_panel(),
            title="SDG News",
            border_style="blue",
            box=box.ROUNDED,
            width=160,
            padding=(0, 1)
        )

        # Panels are stacked vertically: markets, SDG news, economic news.
        self.console.print("\n")
        self.console.print(market_panel)
        self.console.print("\n")
        self.console.print(sdg_panel)
        self.console.print("\n")
        self.console.print(eco_panel)
        self.console.print("\n")
        self.display_commands()


class RivaTerminal:
    """Interactive command loop tying the dashboard and the views together."""

    def __init__(self):
        """Connect to the database, build the views and set up completion."""
        self.db = None
        self.components_initialized = False
        self.commands = {}
        self.tickers = {}
        self.current_mode = None
        self.current_ticker = None
        # Set by _handle_quit; run() stops as soon as it is True, whichever
        # prompt the quit came from.
        self._exit_requested = False
        # Defined up front so a failed initialisation leaves None rather
        # than a missing attribute.
        self.dashboard = None
        self.markets_view = None
        self.sectors_view = None
        self.commodities_view = None
        self.forex_view = None
        self.stocks_view = None
        self.whispers = None
        self.styles = {
            'success': 'green',
            'error': 'red bold',
            'warning': 'yellow',
            'info': 'cyan',
            'prompt': 'blue',
            'title': 'magenta bold',
            'accent': 'yellow bold'
        }

        self._init_database()
        if self.db:
            self._init_components()
            self._setup_autocomplete()

    def _init_database(self) -> None:
        """Open the database connection, trying up to three times.

        Leaves ``self.db`` as None when every attempt fails.
        """
        max_retries = 3
        for retry in range(max_retries):
            try:
                self.db = DatabaseConnection()
                self.db.execute_query("SELECT 1")
                return
            except Exception as e:
                logger.warning(
                    "Database connection attempt %d failed: %s", retry + 1, e
                )
                if retry < max_retries - 1:
                    console.print(f"[{self.styles['warning']}]Retrying database connection... ({retry + 1}/{max_retries})[/]")
                    time.sleep(2)

        console.print(f"[{self.styles['error']}]Database connection unavailable. Features will be limited.[/]")
        self.db = None

    def _init_components(self) -> None:
        """Create the dashboard and views and register the command table."""
        try:
            if not self.db:
                logger.error("Cannot initialize: No database connection")
                return

            self.dashboard = Dashboard(self.db)
            self.markets_view = MarketsView(self.db)
            self.sectors_view = SectorsView(self.db)
            self.commodities_view = CommoditiesView(self.db)
            self.forex_view = ForexView(self.db)
            self.stocks_view = StocksView(self.db)
            self.whispers = Whispers(self.db)

            # Every handler takes the argument list (possibly None).
            self.commands = {
                'quit': self._handle_quit,
                'help': lambda _: self.dashboard.display_commands(),
                'main': lambda _: self.dashboard.display_welcome_screen(),
                'marketsview': lambda _: self.markets_view.display(),
                'sectorsview': lambda _: self.sectors_view.display(),
                'commoditiesview': lambda _: self.commodities_view.display(),
                'forexview': lambda _: self.forex_view.display(),
                'stocksview': self._handle_stocks_view,
                'whispers': lambda _: self.whispers.display(),
                'portfolio': lambda _: console.print("[yellow]Portfolio module coming soon...[/]"),
                'pricing': lambda _: console.print("[yellow]Pricing module coming soon...[/]"),
                'reports': lambda _: console.print("[yellow]Reports module coming soon...[/]"),
                'slm': lambda _: console.print("[yellow]SLM Reports coming soon...[/]"),
                'talk': lambda _: console.print("[yellow]Revaldi LLM coming soon...[/]")
            }
            self.components_initialized = True
        except Exception as e:
            logger.error("Component initialization error: %s", e)
            self.components_initialized = False
            console.print(f"[{self.styles['error']}]Component initialization failed.[/]")

    def _setup_autocomplete(self) -> None:
        """Load the ticker list and install the readline Tab completer."""
        try:
            if not self.db:
                return

            query = """
                SELECT DISTINCT t.yf_ticker, t.name, s.name as sector, z.name as zone
                FROM tickers t
                LEFT JOIN sectors s ON s.id = t.sector_id
                LEFT JOIN zones z ON z.id = t.zone_id
                WHERE t.yf_ticker IS NOT NULL
                ORDER BY t.yf_ticker;
            """
            tickers_df = self.db.execute_query(query)
            self.tickers = {
                row['yf_ticker']: {
                    'name': row['name'],
                    'sector': row['sector'] if pd.notna(row['sector']) else 'N/A',
                    'zone': row['zone'] if pd.notna(row['zone']) else 'N/A'
                }
                for _, row in tickers_df.iterrows()
            }

            readline.parse_and_bind('tab: complete')
            readline.set_completer(self._completer)
            readline.set_completer_delims(' \t\n;')
        except Exception as e:
            logger.error("Autocomplete setup error: %s", e)
            self.tickers = {}

    def _completer(self, text: str, state: int) -> Optional[str]:
        """readline completion callback.

        In stocks mode it offers "TICKER - name [zone] (sector)" entries for
        tickers starting with ``text``; otherwise it offers command names.

        Args:
            text: the word being completed.
            state: index of the candidate readline is asking for.

        Returns:
            The ``state``-th candidate, or None when there are no more.
        """
        try:
            if self.current_mode == 'stocks':
                text = text.upper()
                options = sorted(
                    f"{ticker:<6} - {info['name']:<30} [{info['zone']}] ({info['sector']})"
                    for ticker, info in self.tickers.items()
                    if ticker.startswith(text)
                )
                return options[state] if state < len(options) else None

            # NOTE(review): commands are only offered when the line is empty
            # or ends with a space, so a partially typed command is never
            # completed - confirm whether that is intended.
            buffer = readline.get_line_buffer()
            if not buffer or buffer.endswith(' '):
                options = sorted(
                    cmd for cmd in self.commands if cmd.startswith(text.lower())
                )
                return options[state] if state < len(options) else None
            return None
        except Exception as e:
            logger.error("Autocomplete error: %s", e)
            return None

    def _handle_stocks_view(self, args=None) -> bool:
        """Show one ticker, or enter the interactive stock prompt.

        Args:
            args: optional argument list; ``args[0]`` may be a ticker or a
                completion entry of the form "TICKER - name ...".

        Returns:
            Whatever the delegated call returns; True after an unknown
            ticker or an unexpected error. A confirmed 'quit' goes through
            ``_handle_quit``, which flags the main loop to stop.
        """
        try:
            if args and args[0]:
                ticker = args[0].split(' - ')[0].strip().upper()
                if ticker in self.tickers:
                    return self.stocks_view.display(ticker)
                console.print(f"[{self.styles['error']}]Unknown ticker: {ticker}[/]")
                self._handle_stocks_view()
                return True

            self.current_mode = 'stocks'
            self.current_ticker = None

            console.print(f"\n[{self.styles['info']}]Enter stock ticker (Tab for suggestions)[/]")
            console.print("[blue]Commands: 'main' for main menu, 'quit' to exit[/]")

            while True:
                try:
                    prompt = f"Stock ({self.current_ticker})> " if self.current_ticker else "\nStock> "
                    ticker_input = Prompt.ask(prompt, console=console)
                    if not ticker_input:
                        continue

                    ticker_input = ticker_input.lower()

                    if ticker_input == 'main':
                        self.current_mode = None
                        self.current_ticker = None
                        return self.dashboard.display_welcome_screen()

                    if ticker_input == 'quit':
                        if Prompt.ask("Exit terminal?", choices=['yes', 'no'], default='no') == 'yes':
                            return self._handle_quit()
                        continue

                    # With a ticker selected, 1-8 picks an insight report.
                    if self.current_ticker:
                        try:
                            insight_num = int(ticker_input)
                            if 1 <= insight_num <= 8:
                                self.handle_insights_command(self.current_ticker, ticker_input)
                                continue
                        except ValueError:
                            pass

                    ticker = ticker_input.split(' - ')[0].strip().upper()
                    if ticker in self.tickers:
                        self.current_ticker = ticker
                        self.stocks_view.display(ticker)
                    else:
                        console.print(f"[{self.styles['error']}]Unknown ticker: {ticker}[/]")
                        console.print("[blue]Press Tab for suggestions[/]")

                except (KeyboardInterrupt, EOFError):
                    console.print("\n[blue]Use 'main' for main menu or 'quit' to exit[/]")
                    continue
                except Exception as e:
                    console.print(f"\n[{self.styles['error']}]Error: {str(e)}[/]")
                    continue

        except Exception as e:
            logger.error("Stocks view error: %s", e)
            self.current_mode = None
            self.current_ticker = None
            return True

    def handle_insights_command(self, ticker: str, choice: str) -> bool:
        """Display insight number ``choice`` (1-8) for ``ticker``.

        Returns:
            True when the request was dispatched; False when the number is
            out of range or an error occurred.
        """
        try:
            # Imported here rather than at module level, as in the original.
            from modules.signals.stocks_insights import StocksInsights, StocksInsightsView

            insight_num = int(choice)
            if not 1 <= insight_num <= 8:
                return False

            insights = StocksInsights(self.db)
            view = StocksInsightsView(insights)
            view.handle_insight_request(ticker, insight_num)
            return True
        except Exception as e:
            logger.error("Insights error: %s", e)
            console.print(f"[{self.styles['error']}]Error displaying analysis: {str(e)}[/]")
            return False

    def run(self) -> None:
        """Show the welcome screen and process commands until the user quits."""
        console.clear()

        # Without a database the dashboard and command table were never
        # created; stop with a clear message instead of an AttributeError.
        if not self.components_initialized or self.dashboard is None:
            console.print(f"[{self.styles['error']}]Terminal components are not initialized; cannot start.[/]")
            return

        self.dashboard.display_welcome_screen()

        while not self._exit_requested:
            try:
                prompt = "Stock> " if self.current_mode == 'stocks' else "Rivaldi> "
                command_line = Prompt.ask(f"\n{prompt}", console=console)

                # Whitespace-only input has no command word to dispatch.
                command_line = command_line.strip() if command_line else ""
                if not command_line:
                    continue

                if command_line.lower() == 'quit':
                    if Prompt.ask("Exit terminal?", choices=['yes', 'no'], default='no') == 'yes':
                        self.process_command('quit', None)
                    continue

                parts = command_line.split()
                self.process_command(parts[0].lower(), parts[1:] or None)

            except KeyboardInterrupt:
                console.print("\n[blue]Use 'quit' to exit properly[/]")
                continue
            except EOFError:
                console.print("\n[blue]Use 'quit' to exit properly[/]")
                continue
            except Exception as e:
                logger.error("Command processing error: %s", e)
                console.print(f"\n[{self.styles['error']}]Command processing error[/]")
                continue

    def process_command(self, command: str, args: Optional[List[str]] = None) -> bool:
        """Dispatch ``command`` to its registered handler.

        Returns:
            The handler's return value; True for an unknown command or when
            the handler raises.
        """
        try:
            if command in self.commands:
                return self.commands[command](args)

            console.print(f"[{self.styles['error']}]Unknown command: {command}[/]")
            console.print("[blue]Type 'help' for available commands[/]")
            return True
        except Exception as e:
            console.print(f"[{self.styles['error']}]Command error: {str(e)}[/]")
            console.print("[blue]Type 'help' for available commands[/]")
            return True

    def _handle_quit(self, _=None) -> bool:
        """Close the database connection and request loop termination.

        A failure while closing is logged but does not block the exit;
        otherwise the user could never leave, because Ctrl-C and EOF are
        swallowed by the prompts.

        Returns:
            False, the "stop" signal for command handlers.
        """
        try:
            if self.db:
                self.db.close()
        except Exception as e:
            logger.error("Cleanup error: %s", e)
        finally:
            self.db = None
            self._exit_requested = True

        console.print(f"\n[{self.styles['success']}]Goodbye![/]")
        return False


def main():
    """Start the terminal, restarting up to three times on a crash."""
    console.clear()
    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            terminal = RivaTerminal()
            terminal.run()
            break
        except Exception as e:
            logger.error("Terminal error: %s", e)
            retry_count += 1
            if retry_count < max_retries:
                console.print("\n[yellow]Restarting terminal...[/]")
                time.sleep(2)
            else:
                console.print(f"\n[red]Terminal failed to start after {max_retries} attempts.[/]")


if __name__ == "__main__":
    main()