RivaCube/scheduler.py
2025-02-04 19:31:18 +01:00

517 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import importlib
import logging
import os
import shlex
import signal
import subprocess
import sys
import time
from datetime import datetime

import psutil
from apscheduler.events import (
    EVENT_JOB_ERROR,
    EVENT_JOB_EXECUTED,
    EVENT_JOB_MISSED,
)
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
class Config:
    """Static settings for the scheduler.

    Groups the logging setup, the health-check thresholds and the
    script-execution limits read by the rest of this module.
    """

    # --- Logging ---------------------------------------------------------
    LOG_FILE = "scheduler.log"
    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
    LOG_LEVEL = logging.INFO

    # --- Health-check thresholds (compared against psutil percentages) ---
    CPU_WARNING_THRESHOLD = 80
    CPU_CRITICAL_THRESHOLD = 90
    MEMORY_WARNING_THRESHOLD = 80
    MEMORY_CRITICAL_THRESHOLD = 90
    DISK_WARNING_THRESHOLD = 80
    DISK_CRITICAL_THRESHOLD = 90

    # --- Script execution ------------------------------------------------
    SCRIPT_TIMEOUT = 60 * 60  # seconds a script may run (1 hour)
    MAX_RETRIES = 3  # total attempts per script
    RETRY_DELAY = 5 * 60  # seconds to wait between attempts (5 minutes)

    # Switched to True by find_conda() when a conda.sh script is located.
    USE_CONDA = False
# Absolute path of the directory containing this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Directory that script names are joined onto in TaskManager.run_script;
# currently the same location as BASE_DIR.
RIVDROITE_DIR = BASE_DIR
# Only checked for existence/readability in this file; also aliases BASE_DIR.
RIVGAUCHE_DIR = BASE_DIR
def find_conda():
    """Locate a conda activation script (``conda.sh``).

    Fixed ``/home/cube`` installs are probed first, then the same install
    names under the current user's home directory.

    Returns:
        The first candidate path that exists, or ``None`` when none do.

    Side effects:
        Sets ``Config.USE_CONDA`` to ``True`` when a script is found.
    """
    fixed_locations = [
        "/home/cube/mambaforge/etc/profile.d/conda.sh",
        "/home/cube/miniconda3/etc/profile.d/conda.sh",
        "/home/cube/anaconda3/etc/profile.d/conda.sh",
    ]
    home_locations = [
        "~/mambaforge/etc/profile.d/conda.sh",
        "~/miniconda3/etc/profile.d/conda.sh",
        "~/anaconda3/etc/profile.d/conda.sh",
    ]
    candidates = fixed_locations + [
        os.path.expanduser(location) for location in home_locations
    ]

    found = next(
        (candidate for candidate in candidates if os.path.exists(candidate)),
        None,
    )
    if found is not None:
        Config.USE_CONDA = True
    return found
# Resolved once at import time; None when no conda.sh was found.
# find_conda() also sets Config.USE_CONDA as a side effect of this call.
CONDA_PATH = find_conda()
def setup_logging():
    """Configure root logging to the console and to ``Config.LOG_FILE``.

    If the handlers cannot be set up, the error is printed and the process
    exits with status 1.
    """
    try:
        console_handler = logging.StreamHandler()
        file_handler = logging.FileHandler(Config.LOG_FILE)
        logging.basicConfig(
            level=Config.LOG_LEVEL,
            format=Config.LOG_FORMAT,
            handlers=[console_handler, file_handler],
        )
        logging.info("Logging system initialized")
    except Exception as err:
        print(f"Failed to initialize logging: {err}")
        sys.exit(1)
def verify_dependencies():
    """Check that the script directories exist and probe optional packages.

    A missing directory is fatal. A missing Python package only produces a
    warning, and every package is probed individually so that all missing
    ones are reported rather than just the first.

    Returns:
        ``True`` when both directories exist, ``False`` when one is missing
        or an unexpected error occurs during verification.
    """
    try:
        required_dirs = (
            ("RivDroite", RIVDROITE_DIR),
            ("RivGauche", RIVGAUCHE_DIR),
        )
        for label, directory in required_dirs:
            # isdir (not exists): a regular file at this path is not usable.
            if not os.path.isdir(directory):
                logging.error(f"{label} directory not found: {directory}")
                return False

        for package in ("pandas", "numpy", "psycopg2", "requests"):
            try:
                importlib.import_module(package)
            except ImportError as e:
                # Optional: warn and keep probing the remaining packages.
                logging.warning(f"Missing optional Python package: {str(e)}")
        return True
    except Exception as e:
        logging.error(f"Dependency verification failed: {str(e)}")
        return False
class ScriptExecutor:
    """Run update scripts as child processes, optionally through conda.

    When ``Config.USE_CONDA`` is set and a ``conda.sh`` path is known, a
    script runs in a bash shell that sources ``conda.sh`` and activates the
    requested environment. Otherwise it runs with the ``python`` found on
    ``PATH``.
    """

    def __init__(self):
        # Path to conda.sh resolved at import time, or None.
        self.conda_path = CONDA_PATH
        # Set once the conda.sh probe has succeeded so it is not repeated.
        self.env_setup_complete = False

    @staticmethod
    def _decode_output(data):
        """Decode captured process output without raising on invalid UTF-8.

        A strict decode would raise ``UnicodeDecodeError`` on arbitrary
        bytes, which made a *successful* script look like a failure.
        """
        if not data:
            return ""
        return data.decode("utf-8", errors="replace")

    def setup_conda_env(self):
        """Check once that the conda activation script can be sourced.

        The probe runs in a throwaway shell, so it does not change this
        process's environment; ``run_script`` sources the script again in
        the shell that actually runs the job.

        Returns:
            ``True`` when conda is disabled, already verified, or the probe
            succeeds; ``False`` when no path is known or the probe fails.
        """
        if not Config.USE_CONDA or self.env_setup_complete:
            return True
        if not self.conda_path:
            return False
        try:
            subprocess.run(
                f"source {shlex.quote(self.conda_path)}",
                shell=True,
                executable='/bin/bash',
                check=True,
            )
        except subprocess.CalledProcessError as e:
            logging.error(f"Failed to setup conda environment: {str(e)}")
            return False
        except Exception as e:
            logging.error(f"Unexpected error during conda setup: {str(e)}")
            return False
        self.env_setup_complete = True
        return True

    def run_script(self, script_path, env_name="base"):
        """Run one script and wait for it, bounded by ``Config.SCRIPT_TIMEOUT``.

        Args:
            script_path: Path of the Python script to execute.
            env_name: Conda environment to activate (used only in conda mode).

        Returns:
            ``True`` if the script exited with status 0, ``False`` if it is
            missing, failed, timed out, or could not be started.
        """
        if not os.path.exists(script_path):
            logging.error(f"Script not found: {script_path}")
            return False
        try:
            script_name = os.path.basename(script_path)
            logging.info(f"Starting {script_name}")

            if Config.USE_CONDA and self.conda_path:
                if not self.setup_conda_env():
                    return False
                # Quote every interpolated value so paths with spaces or
                # shell metacharacters cannot break or alter the command.
                # `exec` replaces the shell with the interpreter, so a
                # kill() on timeout reaches the script itself instead of
                # only the wrapping shell.
                command = (
                    f"source {shlex.quote(self.conda_path)}"
                    f" && conda activate {shlex.quote(env_name)}"
                    f" && exec python {shlex.quote(script_path)}"
                )
                popen_options = {"shell": True, "executable": "/bin/bash"}
            else:
                # No shell needed: argv form avoids quoting issues and makes
                # the child the interpreter process itself.
                command = ["python", script_path]
                popen_options = {}

            process = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                **popen_options,
            )
            try:
                stdout, stderr = process.communicate(timeout=Config.SCRIPT_TIMEOUT)
            except subprocess.TimeoutExpired:
                process.kill()
                # Reap the child and drain its pipes; communicate() does not
                # do this on its own after a timeout.
                process.communicate()
                logging.error(f"Script {script_name} timed out after {Config.SCRIPT_TIMEOUT} seconds")
                return False

            if process.returncode != 0:
                logging.error(f"Script {script_name} failed with return code {process.returncode}")
                if stderr:
                    logging.error(f"Error output: {self._decode_output(stderr)}")
                return False
            if stdout:
                logging.debug(f"Script output: {self._decode_output(stdout)}")
            return True
        except Exception as e:
            logging.error(f"Error executing {script_path}: {str(e)}")
            return False

    def run_script_with_retry(self, script_path, env_name="base"):
        """Run a script up to ``Config.MAX_RETRIES`` times.

        Sleeps ``Config.RETRY_DELAY`` seconds between attempts.

        Returns:
            ``True`` as soon as one attempt succeeds, ``False`` when every
            attempt fails.
        """
        for attempt in range(1, Config.MAX_RETRIES + 1):
            if self.run_script(script_path, env_name):
                return True
            if attempt < Config.MAX_RETRIES:
                logging.warning(f"Retrying {script_path} in {Config.RETRY_DELAY} seconds... (Attempt {attempt + 1}/{Config.MAX_RETRIES})")
                time.sleep(Config.RETRY_DELAY)
        logging.error(f"Script {script_path} failed after {Config.MAX_RETRIES} attempts")
        return False
class SystemHealthCheck:
    """Collect CPU, memory, disk and directory health into one report.

    Each ``check_*`` method returns ``"OK"``, ``"WARNING"``, ``"CRITICAL"``
    or ``"ERROR"`` (the check itself failed) and records messages in
    ``warnings`` / ``critical_issues``. Usage checks also store the value
    they measured in ``metrics`` so the report shows the same number the
    status was derived from.
    """

    # Most severe first. A failed check ("ERROR") also records a critical
    # issue, so it must outrank a plain "WARNING".
    _SEVERITY_ORDER = ("CRITICAL", "ERROR", "WARNING")

    def __init__(self):
        self.warnings = []
        self.critical_issues = []
        # Last measured usage percentages, keyed by "cpu"/"memory"/"disk".
        self.metrics = {}

    @classmethod
    def _overall_status(cls, statuses):
        """Return the most severe status present in *statuses*, else ``"OK"``."""
        for level in cls._SEVERITY_ORDER:
            if level in statuses:
                return level
        return "OK"

    def _check_usage(self, label, metric_key, read_percent,
                     warning_threshold, critical_threshold):
        """Measure one usage percentage and classify it against thresholds.

        Args:
            label: Name used in the recorded messages (e.g. ``"CPU"``).
            metric_key: Key under which the value is stored in ``metrics``.
            read_percent: Zero-argument callable returning the percentage.
            warning_threshold: Value at or above which a warning is recorded.
            critical_threshold: Value at or above which the result is critical.

        Returns:
            ``"CRITICAL"``, ``"WARNING"``, ``"OK"``, or ``"ERROR"`` when
            *read_percent* raises.
        """
        try:
            percent = read_percent()
            self.metrics[metric_key] = percent
            if percent >= critical_threshold:
                self.critical_issues.append(f"{label} usage critical: {percent}%")
                return "CRITICAL"
            if percent >= warning_threshold:
                self.warnings.append(f"{label} usage high: {percent}%")
                return "WARNING"
            return "OK"
        except Exception as e:
            self.critical_issues.append(f"{label} check failed: {str(e)}")
            return "ERROR"

    def check_cpu_usage(self):
        """Classify CPU usage sampled over a one-second interval."""
        return self._check_usage(
            "CPU",
            "cpu",
            lambda: psutil.cpu_percent(interval=1),
            Config.CPU_WARNING_THRESHOLD,
            Config.CPU_CRITICAL_THRESHOLD,
        )

    def check_memory_usage(self):
        """Classify virtual-memory usage."""
        return self._check_usage(
            "Memory",
            "memory",
            lambda: psutil.virtual_memory().percent,
            Config.MEMORY_WARNING_THRESHOLD,
            Config.MEMORY_CRITICAL_THRESHOLD,
        )

    def check_disk_usage(self):
        """Classify disk usage of the root filesystem (``/``)."""
        return self._check_usage(
            "Disk",
            "disk",
            lambda: psutil.disk_usage('/').percent,
            Config.DISK_WARNING_THRESHOLD,
            Config.DISK_CRITICAL_THRESHOLD,
        )

    def check_script_directories(self):
        """Verify that both script directories exist and are readable.

        Returns:
            ``"CRITICAL"`` for the first inaccessible directory, ``"OK"``
            when both are accessible, ``"ERROR"`` if the check itself fails.
        """
        try:
            directories = (
                ("RivDroite", RIVDROITE_DIR),
                ("RivGauche", RIVGAUCHE_DIR),
            )
            for label, directory in directories:
                if not os.path.exists(directory) or not os.access(directory, os.R_OK):
                    self.critical_issues.append(f"Cannot access {label} directory: {directory}")
                    return "CRITICAL"
            return "OK"
        except Exception as e:
            self.critical_issues.append(f"Directory check failed: {str(e)}")
            return "ERROR"

    def run_health_check(self):
        """Run every check and build the health report.

        Returns:
            A dict with ``timestamp``, a ``system`` section (overall status,
            warnings, critical issues) and a ``metrics`` section holding the
            per-check status plus the usage value that was measured
            (``None`` when that measurement failed).
        """
        self.warnings = []
        self.critical_issues = []
        self.metrics = {}

        cpu_status = self.check_cpu_usage()
        memory_status = self.check_memory_usage()
        disk_status = self.check_disk_usage()
        dir_status = self.check_script_directories()

        system_status = self._overall_status(
            [cpu_status, memory_status, disk_status, dir_status]
        )

        # Reuse the values measured above instead of sampling psutil again:
        # a second cpu_percent() call right after the first is meaningless,
        # and an unguarded psutil call here could raise out of the report.
        report = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "system": {
                "status": system_status,
                "warnings": self.warnings,
                "critical_issues": self.critical_issues
            },
            "metrics": {
                "cpu": {
                    "status": cpu_status,
                    "usage": self.metrics.get("cpu")
                },
                "memory": {
                    "status": memory_status,
                    "usage": self.metrics.get("memory")
                },
                "disk": {
                    "status": disk_status,
                    "usage": self.metrics.get("disk")
                },
                "directories": {
                    "status": dir_status
                }
            }
        }

        if self.warnings:
            logging.warning("System warnings:\n" + "\n".join(self.warnings))
        if self.critical_issues:
            logging.error("Critical issues:\n" + "\n".join(self.critical_issues))
        return report
class TaskManager:
    """Job entry points used by the scheduler: run a script, run a health check."""

    def __init__(self):
        self.script_executor = ScriptExecutor()
        self.health_checker = SystemHealthCheck()

    def run_script(self, script_name, env_name="base"):
        """Run *script_name* from ``RIVDROITE_DIR`` with retries.

        Args:
            script_name: File name of the script, relative to ``RIVDROITE_DIR``.
            env_name: Environment name forwarded to the script executor.

        Returns:
            The executor's result: ``True`` on success, ``False`` once all
            attempts have failed.
        """
        script_path = os.path.join(RIVDROITE_DIR, script_name)
        return self.script_executor.run_script_with_retry(script_path, env_name)

    def run_health_check(self):
        """Run the system health check and log an unhealthy overall status.

        Returns:
            ``True`` when the check completed (whatever status it found),
            ``False`` when running the check or reading its report raised.
        """
        logging.info("Starting system health check")
        try:
            health_report = self.health_checker.run_health_check()
            status = health_report['system']['status']
            if status in ('CRITICAL', 'ERROR'):
                # This branch used to be a bare `pass`; make the unhealthy
                # state visible instead of silently ignoring it.
                logging.error(f"System health check reported status: {status}")
            return True
        except Exception as e:
            logging.error(f"Health check failed: {str(e)}")
            return False
class SchedulerManager:
    """Own the ``BlockingScheduler``: register jobs, log job events, start and stop."""

    # (job id, script) pairs run at 00:00 on weekdays.
    _WEEKDAY_MIDNIGHT_JOBS = (
        ('commodities', 'update_commodities.py'),
        ('economics', 'update_economics.py'),
        ('indexes', 'update_indexes.py'),
        ('stocks', 'update_stocks.py'),
        ('stocksnews', 'update_stocksnews.py'),
        ('forex', 'update_forex.py'),
    )
    # (job id, script) pairs run at 02:00 on weekdays.
    _WEEKDAY_EARLY_MORNING_JOBS = (
        ('statements', 'update_statements.py'),
        ('technicals', 'update_technicals.py'),
        ('consensus', 'update_consensus.py'),
        ('financials', 'update_financials.py'),
    )
    # (job id, script, hour) triples run on Sundays at the given hour.
    _SUNDAY_JOBS = (
        ('valuations', 'update_valuations.py', 6),
        ('metrics', 'update_metrics.py', 4),
        ('transcripts', 'update_transcripts.py', 8),
    )
    # (job id prefix, script) pairs run at 00:00 and 12:00 on weekdays.
    _TWICE_DAILY_JOBS = (
        ('econews', 'update_econews.py'),
        ('sdgnews', 'update_sdgnews.py'),
        ('usmarkets', 'update_USmarkets.py'),
    )

    def __init__(self):
        self.scheduler = BlockingScheduler()
        self.task_manager = TaskManager()
        self.setup_job_listeners()

    @staticmethod
    def _describe_job_event(event):
        """Map a job event to a ``(logging level, message)`` pair.

        The jobs registered here signal failure by returning ``False``
        rather than raising, so a ``False`` return value is reported as an
        error instead of being logged as a success.
        """
        if event.exception:
            return logging.ERROR, f'Job {event.job_id} failed: {str(event.exception)}'
        if getattr(event, 'retval', None) is False:
            return logging.ERROR, f'Job {event.job_id} reported failure (returned False)'
        if event.code == EVENT_JOB_MISSED:
            return logging.WARNING, f'Job {event.job_id} missed scheduled run time'
        return logging.INFO, f'Job {event.job_id} completed successfully'

    def setup_job_listeners(self):
        """Register a listener that logs job errors, completions and misses."""
        def job_listener(event):
            if hasattr(event, 'job_id'):
                level, message = self._describe_job_event(event)
                logging.log(level, message)

        self.scheduler.add_listener(
            job_listener,
            EVENT_JOB_ERROR | EVENT_JOB_EXECUTED | EVENT_JOB_MISSED
        )

    def _add_script_job(self, job_id, script, day_of_week, hour):
        """Schedule *script* via ``TaskManager.run_script`` at ``hour``:00 on *day_of_week*."""
        self.scheduler.add_job(
            self.task_manager.run_script,
            CronTrigger(day_of_week=day_of_week, hour=hour, minute=0),
            args=[script],
            id=job_id
        )

    def add_jobs(self):
        """Register every scheduled job.

        Returns:
            ``True`` when all jobs were added, ``False`` if adding any job
            raised (the error is logged).
        """
        try:
            for job_id, script in self._WEEKDAY_MIDNIGHT_JOBS:
                self._add_script_job(job_id, script, 'mon-fri', 0)

            for job_id, script in self._WEEKDAY_EARLY_MORNING_JOBS:
                self._add_script_job(job_id, script, 'mon-fri', 2)

            for job_id, script, hour in self._SUNDAY_JOBS:
                self._add_script_job(job_id, script, 'sun', hour)

            for job_id, script in self._TWICE_DAILY_JOBS:
                for hour in (0, 12):  # midnight and midday
                    self._add_script_job(f"{job_id}_{hour}", script, 'mon-fri', hour)

            # Hourly health check; next_run_time makes the first run immediate.
            self.scheduler.add_job(
                self.task_manager.run_health_check,
                IntervalTrigger(hours=1),
                id='health_check',
                next_run_time=datetime.now()
            )
            return True
        except Exception as e:
            logging.error(f"Error adding jobs: {str(e)}")
            return False

    def print_schedule(self):
        """Log a table of job id, next run time and trigger for every job."""
        try:
            logging.info("\nScheduled Jobs:")
            logging.info("-" * 80)
            format_str = "{:<20} {:<30} {:<30}"
            logging.info(format_str.format("Job ID", "Next Run Time", "Schedule"))
            logging.info("-" * 80)
            for job in self.scheduler.get_jobs():
                try:
                    # Guarded with hasattr: the attribute may be absent
                    # (assumed: before the scheduler has started).
                    if hasattr(job, 'next_run_time') and job.next_run_time:
                        next_run = job.next_run_time.strftime('%Y-%m-%d %H:%M:%S')
                    else:
                        next_run = "Pending"

                    if isinstance(job.trigger, CronTrigger):
                        schedule = f"Cron: {job.trigger}"
                    elif isinstance(job.trigger, IntervalTrigger):
                        schedule = f"Every {job.trigger.interval}"
                    else:
                        schedule = str(job.trigger)

                    logging.info(format_str.format(
                        str(job.id),
                        next_run,
                        schedule
                    ))
                except Exception as e:
                    logging.warning(f"Could not print details for job {job.id}: {str(e)}")
                    continue
            logging.info("-" * 80)
        except Exception as e:
            logging.error(f"Error printing schedule: {str(e)}")

    def start(self):
        """Add the jobs, log the schedule and run the scheduler (blocking).

        Raises:
            Exception: re-raised after shutdown when job registration or the
                scheduler itself fails. ``KeyboardInterrupt`` and
                ``SystemExit`` trigger a shutdown and are not re-raised.
        """
        try:
            if not self.add_jobs():
                raise Exception("Failed to add jobs to scheduler")
            self.print_schedule()
            logging.info("Starting scheduler...")
            self.scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            self.shutdown()
        except Exception as e:
            logging.error(f"Scheduler error: {str(e)}")
            self.shutdown()
            raise

    def shutdown(self):
        """Stop the scheduler if it is running; errors are logged, not raised."""
        try:
            logging.info("Shutting down scheduler...")
            if hasattr(self.scheduler, 'running') and self.scheduler.running:
                self.scheduler.shutdown()
            logging.info("Scheduler shutdown complete")
        except Exception as e:
            logging.error(f"Error during shutdown: {str(e)}")
def main():
    """Entry point: set up logging, verify dependencies and run the scheduler.

    Exits with status 1 when dependency verification fails or the scheduler
    raises, and with status 0 after a SIGINT/SIGTERM-triggered shutdown.
    """
    setup_logging()
    logging.info("Starting RivaCube Scheduler...")

    if not verify_dependencies():
        logging.error("Failed to verify dependencies. Exiting.")
        sys.exit(1)

    scheduler_manager = SchedulerManager()

    def handle_shutdown_signal(signum, frame):
        # Stop the scheduler first, then end the process cleanly.
        logging.info("Received shutdown signal...")
        scheduler_manager.shutdown()
        sys.exit(0)

    for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(shutdown_signal, handle_shutdown_signal)

    try:
        scheduler_manager.start()
    except Exception as err:
        logging.error(f"Fatal error: {err}")
        sys.exit(1)
# Run the scheduler only when this file is executed as a script.
if __name__ == "__main__":
    main()