#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""RivaCube scheduler.

Launches data-update scripts as subprocesses (optionally inside a conda
environment) on cron/interval schedules managed by APScheduler, and runs an
hourly system health check (CPU / memory / disk / directory access).
"""

import logging
import os
import signal
import subprocess
import sys
import time
from datetime import datetime

import psutil
from apscheduler.events import (
    EVENT_JOB_ERROR,
    EVENT_JOB_EXECUTED,
    EVENT_JOB_MISSED,
)
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger


class Config:
    """Static configuration: logging, alert thresholds, retry policy."""

    LOG_FILE = "scheduler.log"
    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
    LOG_LEVEL = logging.INFO

    # Resource-usage alert thresholds, in percent.
    CPU_WARNING_THRESHOLD = 80
    CPU_CRITICAL_THRESHOLD = 90
    MEMORY_WARNING_THRESHOLD = 80
    MEMORY_CRITICAL_THRESHOLD = 90
    DISK_WARNING_THRESHOLD = 80
    DISK_CRITICAL_THRESHOLD = 90

    SCRIPT_TIMEOUT = 3600  # 1 hour: hard limit before a script is killed
    MAX_RETRIES = 3
    RETRY_DELAY = 300  # 5 minutes between retry attempts

    # Flipped to True by find_conda() when a conda activation script exists.
    USE_CONDA = False


# Script directories; both default to the directory containing this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
RIVDROITE_DIR = BASE_DIR
RIVGAUCHE_DIR = BASE_DIR


def find_conda():
    """Return the first existing conda.sh activation script path, or None.

    Side effect: sets ``Config.USE_CONDA`` to True when a script is found.
    """
    possible_paths = [
        "/home/cube/mambaforge/etc/profile.d/conda.sh",
        "/home/cube/miniconda3/etc/profile.d/conda.sh",
        "/home/cube/anaconda3/etc/profile.d/conda.sh",
        os.path.expanduser("~/mambaforge/etc/profile.d/conda.sh"),
        os.path.expanduser("~/miniconda3/etc/profile.d/conda.sh"),
        os.path.expanduser("~/anaconda3/etc/profile.d/conda.sh"),
    ]
    for path in possible_paths:
        if os.path.exists(path):
            Config.USE_CONDA = True
            return path
    return None


CONDA_PATH = find_conda()


def setup_logging():
    """Configure root logging to both the console and Config.LOG_FILE.

    Exits the process with status 1 if logging cannot be initialized.
    """
    try:
        logging.basicConfig(
            level=Config.LOG_LEVEL,
            format=Config.LOG_FORMAT,
            handlers=[
                logging.StreamHandler(),
                logging.FileHandler(Config.LOG_FILE),
            ],
        )
        logging.info("Logging system initialized")
    except Exception as e:
        print(f"Failed to initialize logging: {str(e)}")
        sys.exit(1)


def verify_dependencies():
    """Check that the script directories exist and optional packages import.

    Returns:
        bool: False when a directory is missing or an unexpected error
        occurs. A missing optional package only logs a warning — the
        scheduler still starts.
    """
    try:
        if not os.path.exists(RIVDROITE_DIR):
            logging.error(f"RivDroite directory not found: {RIVDROITE_DIR}")
            return False
        if not os.path.exists(RIVGAUCHE_DIR):
            logging.error(f"RivGauche directory not found: {RIVGAUCHE_DIR}")
            return False
        try:
            # Imported only to verify availability; intentionally unused.
            import pandas  # noqa: F401
            import numpy  # noqa: F401
            import psycopg2  # noqa: F401
            import requests  # noqa: F401
        except ImportError as e:
            logging.warning(f"Missing optional Python package: {str(e)}")
        return True
    except Exception as e:
        logging.error(f"Dependency verification failed: {str(e)}")
        return False


class ScriptExecutor:
    """Runs update scripts as subprocesses, with optional conda activation."""

    def __init__(self):
        self.conda_path = CONDA_PATH       # path to conda.sh, or None
        self.env_setup_complete = False    # one-time conda verification flag

    def setup_conda_env(self):
        """Verify the conda activation script can be sourced.

        NOTE(review): sourcing in a child shell cannot change this process's
        environment, so this call only validates that conda.sh is sourceable;
        the real activation happens inside the command built by run_script().

        Returns:
            bool: True when conda is disabled, already verified, or the
            source command succeeded; False otherwise.
        """
        if not Config.USE_CONDA or self.env_setup_complete:
            return True
        try:
            if self.conda_path:
                cmd = f'source {self.conda_path}'
                subprocess.run(cmd, shell=True, executable='/bin/bash', check=True)
                self.env_setup_complete = True
                return True
            return False
        except subprocess.CalledProcessError as e:
            logging.error(f"Failed to setup conda environment: {str(e)}")
            return False
        except Exception as e:
            logging.error(f"Unexpected error during conda setup: {str(e)}")
            return False

    def run_script(self, script_path, env_name="base"):
        """Execute one script, enforcing Config.SCRIPT_TIMEOUT.

        Args:
            script_path: Path of the Python script to run.
            env_name: Conda environment to activate (when conda is in use).

        Returns:
            bool: True when the script exits with status 0 within the timeout.
        """
        if not os.path.exists(script_path):
            logging.error(f"Script not found: {script_path}")
            return False
        try:
            script_name = os.path.basename(script_path)
            logging.info(f"Starting {script_name}")

            if Config.USE_CONDA and self.conda_path:
                if not self.setup_conda_env():
                    return False
                cmd = f'source {self.conda_path} && conda activate {env_name} && python {script_path}'
            else:
                cmd = f'python {script_path}'

            process = subprocess.Popen(
                cmd,
                shell=True,
                executable='/bin/bash',
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            try:
                stdout, stderr = process.communicate(timeout=Config.SCRIPT_TIMEOUT)
                if process.returncode != 0:
                    logging.error(f"Script {script_name} failed with return code {process.returncode}")
                    if stderr:
                        logging.error(f"Error output: {stderr.decode()}")
                    return False
                if stdout:
                    logging.debug(f"Script output: {stdout.decode()}")
                return True
            except subprocess.TimeoutExpired:
                process.kill()
                # BUG FIX: reap the killed child so it does not linger as a
                # zombie and its pipes are drained.
                process.communicate()
                logging.error(f"Script {script_name} timed out after {Config.SCRIPT_TIMEOUT} seconds")
                return False
        except Exception as e:
            logging.error(f"Error executing {script_path}: {str(e)}")
            return False

    def run_script_with_retry(self, script_path, env_name="base"):
        """Run a script, retrying up to Config.MAX_RETRIES times on failure.

        Returns:
            bool: True as soon as one attempt succeeds; False after all
            attempts fail.
        """
        retries = 0
        while retries < Config.MAX_RETRIES:
            if self.run_script(script_path, env_name):
                return True
            retries += 1
            if retries < Config.MAX_RETRIES:
                logging.warning(f"Retrying {script_path} in {Config.RETRY_DELAY} seconds... (Attempt {retries + 1}/{Config.MAX_RETRIES})")
                time.sleep(Config.RETRY_DELAY)
        logging.error(f"Script {script_path} failed after {Config.MAX_RETRIES} attempts")
        return False


class SystemHealthCheck:
    """Collects CPU / memory / disk / directory status into a report."""

    def __init__(self):
        self.warnings = []          # non-fatal findings from the last run
        self.critical_issues = []   # fatal findings from the last run

    def check_cpu_usage(self):
        """Return "OK"/"WARNING"/"CRITICAL"/"ERROR" for current CPU load."""
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            if cpu_percent >= Config.CPU_CRITICAL_THRESHOLD:
                self.critical_issues.append(f"CPU usage critical: {cpu_percent}%")
                return "CRITICAL"
            elif cpu_percent >= Config.CPU_WARNING_THRESHOLD:
                self.warnings.append(f"CPU usage high: {cpu_percent}%")
                return "WARNING"
            return "OK"
        except Exception as e:
            self.critical_issues.append(f"CPU check failed: {str(e)}")
            return "ERROR"

    def check_memory_usage(self):
        """Return "OK"/"WARNING"/"CRITICAL"/"ERROR" for virtual memory usage."""
        try:
            memory = psutil.virtual_memory()
            memory_percent = memory.percent
            if memory_percent >= Config.MEMORY_CRITICAL_THRESHOLD:
                self.critical_issues.append(f"Memory usage critical: {memory_percent}%")
                return "CRITICAL"
            elif memory_percent >= Config.MEMORY_WARNING_THRESHOLD:
                self.warnings.append(f"Memory usage high: {memory_percent}%")
                return "WARNING"
            return "OK"
        except Exception as e:
            self.critical_issues.append(f"Memory check failed: {str(e)}")
            return "ERROR"

    def check_disk_usage(self):
        """Return "OK"/"WARNING"/"CRITICAL"/"ERROR" for root filesystem usage."""
        try:
            disk = psutil.disk_usage('/')
            disk_percent = disk.percent
            if disk_percent >= Config.DISK_CRITICAL_THRESHOLD:
                self.critical_issues.append(f"Disk usage critical: {disk_percent}%")
                return "CRITICAL"
            elif disk_percent >= Config.DISK_WARNING_THRESHOLD:
                self.warnings.append(f"Disk usage high: {disk_percent}%")
                return "WARNING"
            return "OK"
        except Exception as e:
            self.critical_issues.append(f"Disk check failed: {str(e)}")
            return "ERROR"

    def check_script_directories(self):
        """Return "OK"/"CRITICAL"/"ERROR" for script-directory readability."""
        try:
            if not os.path.exists(RIVDROITE_DIR) or not os.access(RIVDROITE_DIR, os.R_OK):
                self.critical_issues.append(f"Cannot access RivDroite directory: {RIVDROITE_DIR}")
                return "CRITICAL"
            if not os.path.exists(RIVGAUCHE_DIR) or not os.access(RIVGAUCHE_DIR, os.R_OK):
                self.critical_issues.append(f"Cannot access RivGauche directory: {RIVGAUCHE_DIR}")
                return "CRITICAL"
            return "OK"
        except Exception as e:
            self.critical_issues.append(f"Directory check failed: {str(e)}")
            return "ERROR"

    def run_health_check(self):
        """Run all checks and return a dict report; logs warnings/criticals.

        Returns:
            dict: {"timestamp", "system": {...}, "metrics": {...}} where the
            overall status is CRITICAL > WARNING > ERROR > OK.
        """
        self.warnings = []
        self.critical_issues = []

        cpu_status = self.check_cpu_usage()
        memory_status = self.check_memory_usage()
        disk_status = self.check_disk_usage()
        dir_status = self.check_script_directories()

        statuses = [cpu_status, memory_status, disk_status, dir_status]
        if "CRITICAL" in statuses:
            system_status = "CRITICAL"
        elif "WARNING" in statuses:
            system_status = "WARNING"
        elif "ERROR" in statuses:
            system_status = "ERROR"
        else:
            system_status = "OK"

        report = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "system": {
                "status": system_status,
                "warnings": self.warnings,
                "critical_issues": self.critical_issues,
            },
            "metrics": {
                "cpu": {
                    "status": cpu_status,
                    "usage": psutil.cpu_percent(),
                },
                "memory": {
                    "status": memory_status,
                    "usage": psutil.virtual_memory().percent,
                },
                "disk": {
                    "status": disk_status,
                    "usage": psutil.disk_usage('/').percent,
                },
                "directories": {
                    "status": dir_status,
                },
            },
        }

        if self.warnings:
            logging.warning("System warnings:\n" + "\n".join(self.warnings))
        if self.critical_issues:
            logging.error("Critical issues:\n" + "\n".join(self.critical_issues))
        return report


class TaskManager:
    """Facade the scheduler jobs call: runs scripts and health checks."""

    def __init__(self):
        self.script_executor = ScriptExecutor()
        self.health_checker = SystemHealthCheck()

    def run_script(self, script_name, env_name="base"):
        """Run a script (by file name, relative to RIVDROITE_DIR) with retries."""
        script_path = os.path.join(RIVDROITE_DIR, script_name)
        return self.script_executor.run_script_with_retry(script_path, env_name)

    def run_health_check(self):
        """Run a full system health check and log any critical findings.

        Returns:
            bool: True when the check completed (even with critical issues);
            False only when the check itself raised.
        """
        logging.info("Starting system health check")
        try:
            health_report = self.health_checker.run_health_check()
            if health_report['system']['status'] in ['CRITICAL', 'ERROR']:
                # BUG FIX: this branch previously did nothing ("pass"),
                # silently discarding critical findings.
                logging.error(
                    "Health check reported %s status: %s",
                    health_report['system']['status'],
                    health_report['system']['critical_issues'],
                )
            return True
        except Exception as e:
            logging.error(f"Health check failed: {str(e)}")
            return False


class SchedulerManager:
    """Owns the BlockingScheduler: registers jobs, listeners, and lifecycle."""

    def __init__(self):
        self.scheduler = BlockingScheduler()
        self.task_manager = TaskManager()
        self.setup_job_listeners()

    def setup_job_listeners(self):
        """Attach a listener that logs job success, failure, and missed runs."""
        def job_listener(event):
            if hasattr(event, 'job_id'):
                # BUG FIX: missed-run events may not carry an ``exception``
                # attribute; use getattr instead of direct access.
                if getattr(event, 'exception', None):
                    logging.error(f'Job {event.job_id} failed: {str(event.exception)}')
                elif event.code == EVENT_JOB_MISSED:
                    logging.warning(f'Job {event.job_id} missed scheduled run time')
                else:
                    logging.info(f'Job {event.job_id} completed successfully')

        self.scheduler.add_listener(
            job_listener,
            EVENT_JOB_ERROR | EVENT_JOB_EXECUTED | EVENT_JOB_MISSED,
        )

    def add_jobs(self):
        """Register every scheduled job.

        Schedules: weekday midnight updates, weekday 2 AM updates, Sunday
        weekly updates (per-job hour), twice-daily weekday updates (00:00 and
        12:00), and an hourly health check that also fires immediately.

        Returns:
            bool: True on success, False when registration raised.
        """
        try:
            # Midnight updates (weekdays)
            midnight_jobs = [
                ('commodities', self.task_manager.run_script, 'update_commodities.py'),
                ('economics', self.task_manager.run_script, 'update_economics.py'),
                ('indexes', self.task_manager.run_script, 'update_indexes.py'),
                ('stocks', self.task_manager.run_script, 'update_stocks.py'),
                ('stocksnews', self.task_manager.run_script, 'update_stocksnews.py'),
                ('forex', self.task_manager.run_script, 'update_forex.py'),
            ]
            for job_id, func, script in midnight_jobs:
                self.scheduler.add_job(
                    func,
                    CronTrigger(day_of_week='mon-fri', hour=0, minute=0),
                    args=[script],
                    id=job_id,
                )

            # 2 AM updates (weekdays)
            early_morning_jobs = [
                ('statements', self.task_manager.run_script, 'update_statements.py'),
                ('technicals', self.task_manager.run_script, 'update_technicals.py'),
                ('consensus', self.task_manager.run_script, 'update_consensus.py'),
                ('financials', self.task_manager.run_script, 'update_financials.py'),
            ]
            for job_id, func, script in early_morning_jobs:
                self.scheduler.add_job(
                    func,
                    CronTrigger(day_of_week='mon-fri', hour=2, minute=0),
                    args=[script],
                    id=job_id,
                )

            # Weekly Sunday updates (each with its own start hour)
            sunday_jobs = [
                ('valuations', self.task_manager.run_script, 'update_valuations.py', 6),
                ('metrics', self.task_manager.run_script, 'update_metrics.py', 4),
                ('transcripts', self.task_manager.run_script, 'update_transcripts.py', 8),
            ]
            for job_id, func, script, hour in sunday_jobs:
                self.scheduler.add_job(
                    func,
                    CronTrigger(day_of_week='sun', hour=hour, minute=0),
                    args=[script],
                    id=job_id,
                )

            # Twice daily updates (midnight and midday, weekdays)
            twice_daily_jobs = [
                ('econews', self.task_manager.run_script, 'update_econews.py'),
                ('sdgnews', self.task_manager.run_script, 'update_sdgnews.py'),
                ('usmarkets', self.task_manager.run_script, 'update_USmarkets.py'),
            ]
            for job_id, func, script in twice_daily_jobs:
                for hour in [0, 12]:  # Midnight and Midday
                    self.scheduler.add_job(
                        func,
                        CronTrigger(day_of_week='mon-fri', hour=hour, minute=0),
                        args=[script],
                        id=f"{job_id}_{hour}",
                    )

            # Health check (hourly; next_run_time=now fires it immediately)
            self.scheduler.add_job(
                self.task_manager.run_health_check,
                IntervalTrigger(hours=1),
                id='health_check',
                next_run_time=datetime.now(),
            )
            return True
        except Exception as e:
            logging.error(f"Error adding jobs: {str(e)}")
            return False

    def print_schedule(self):
        """Log a table of all registered jobs with next run time and trigger."""
        try:
            logging.info("\nScheduled Jobs:")
            logging.info("-" * 80)
            format_str = "{:<20} {:<30} {:<30}"
            logging.info(format_str.format("Job ID", "Next Run Time", "Schedule"))
            logging.info("-" * 80)
            for job in self.scheduler.get_jobs():
                try:
                    if hasattr(job, 'next_run_time') and job.next_run_time:
                        next_run = job.next_run_time.strftime('%Y-%m-%d %H:%M:%S')
                    else:
                        next_run = "Pending"
                    if isinstance(job.trigger, CronTrigger):
                        schedule = f"Cron: {job.trigger}"
                    elif isinstance(job.trigger, IntervalTrigger):
                        schedule = f"Every {job.trigger.interval}"
                    else:
                        schedule = str(job.trigger)
                    logging.info(format_str.format(
                        str(job.id),
                        next_run,
                        schedule,
                    ))
                except Exception as e:
                    logging.warning(f"Could not print details for job {job.id}: {str(e)}")
                    continue
            logging.info("-" * 80)
        except Exception as e:
            logging.error(f"Error printing schedule: {str(e)}")

    def start(self):
        """Register jobs and start the blocking scheduler loop.

        Raises:
            Exception: re-raised after shutdown on unexpected scheduler errors.
        """
        try:
            if not self.add_jobs():
                raise Exception("Failed to add jobs to scheduler")
            self.print_schedule()
            logging.info("Starting scheduler...")
            self.scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            self.shutdown()
        except Exception as e:
            logging.error(f"Scheduler error: {str(e)}")
            self.shutdown()
            raise

    def shutdown(self):
        """Stop the scheduler if it is running; safe to call repeatedly."""
        try:
            logging.info("Shutting down scheduler...")
            if hasattr(self.scheduler, 'running') and self.scheduler.running:
                self.scheduler.shutdown()
            logging.info("Scheduler shutdown complete")
        except Exception as e:
            logging.error(f"Error during shutdown: {str(e)}")


def main():
    """Entry point: set up logging, verify dependencies, run the scheduler."""
    setup_logging()
    logging.info("Starting RivaCube Scheduler...")

    if not verify_dependencies():
        logging.error("Failed to verify dependencies. Exiting.")
        sys.exit(1)

    scheduler_manager = SchedulerManager()

    def signal_handler(signum, frame):
        # Graceful stop on SIGINT/SIGTERM.
        logging.info("Received shutdown signal...")
        scheduler_manager.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        scheduler_manager.start()
    except Exception as e:
        logging.error(f"Fatal error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()