# (removed: scraper file-listing artifacts — "517 lines / 18 KiB / Python / Executable File")
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import signal
|
|
import logging
|
|
import psutil
|
|
import subprocess
|
|
from datetime import datetime
|
|
from apscheduler.schedulers.blocking import BlockingScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
from apscheduler.events import (
|
|
EVENT_JOB_ERROR,
|
|
EVENT_JOB_EXECUTED,
|
|
EVENT_JOB_MISSED
|
|
)
|
|
|
|
class Config:
    """Static configuration: logging setup, system health-check thresholds,
    and script-execution limits."""

    # Logging
    LOG_FILE = "scheduler.log"
    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
    LOG_LEVEL = logging.INFO

    # Resource-usage thresholds, in percent of capacity
    CPU_WARNING_THRESHOLD = 80
    CPU_CRITICAL_THRESHOLD = 90
    MEMORY_WARNING_THRESHOLD = 80
    MEMORY_CRITICAL_THRESHOLD = 90
    DISK_WARNING_THRESHOLD = 80
    DISK_CRITICAL_THRESHOLD = 90

    # Script-execution limits
    SCRIPT_TIMEOUT = 3600  # 1 hour
    MAX_RETRIES = 3
    RETRY_DELAY = 300  # 5 minutes

    # Flipped to True by find_conda() when a conda activation script is found.
    USE_CONDA = False
|
|
|
|
# All update scripts are expected to live alongside this file; the two
# "riv" directories currently both point at the same location.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
RIVDROITE_DIR = BASE_DIR
RIVGAUCHE_DIR = BASE_DIR
|
|
|
|
def find_conda():
    """Locate a conda/mamba shell activation script.

    Probes a fixed list of well-known install locations. If one exists,
    Config.USE_CONDA is switched on (side effect) and the path is returned;
    otherwise None.
    """
    candidates = (
        "/home/cube/mambaforge/etc/profile.d/conda.sh",
        "/home/cube/miniconda3/etc/profile.d/conda.sh",
        "/home/cube/anaconda3/etc/profile.d/conda.sh",
        os.path.expanduser("~/mambaforge/etc/profile.d/conda.sh"),
        os.path.expanduser("~/miniconda3/etc/profile.d/conda.sh"),
        os.path.expanduser("~/anaconda3/etc/profile.d/conda.sh"),
    )
    for candidate in candidates:
        if os.path.exists(candidate):
            Config.USE_CONDA = True
            return candidate
    return None


# Resolved once at import time; None when no conda installation was found.
CONDA_PATH = find_conda()
|
|
|
|
def setup_logging():
    """Configure root logging to both the console and Config.LOG_FILE.

    Exits the process with status 1 if logging cannot be initialized,
    since the scheduler is useless without a log trail.
    """
    try:
        handlers = [
            logging.StreamHandler(),
            logging.FileHandler(Config.LOG_FILE),
        ]
        logging.basicConfig(
            level=Config.LOG_LEVEL,
            format=Config.LOG_FORMAT,
            handlers=handlers,
        )
        logging.info("Logging system initialized")
    except Exception as e:
        print(f"Failed to initialize logging: {str(e)}")
        sys.exit(1)
|
|
|
|
def verify_dependencies():
    """Verify the runtime environment before starting the scheduler.

    Missing script directories are fatal (returns False). Missing optional
    Python packages only log a warning. Returns True when usable.
    """
    try:
        if not os.path.exists(RIVDROITE_DIR):
            logging.error(f"RivDroite directory not found: {RIVDROITE_DIR}")
            return False
        if not os.path.exists(RIVGAUCHE_DIR):
            logging.error(f"RivGauche directory not found: {RIVGAUCHE_DIR}")
            return False

        # Optional packages: presence is checked but absence is non-fatal,
        # since individual update scripts may not all be scheduled.
        try:
            import pandas
            import numpy
            import psycopg2
            import requests
        except ImportError as e:
            logging.warning(f"Missing optional Python package: {str(e)}")

        return True
    except Exception as e:
        logging.error(f"Dependency verification failed: {str(e)}")
        return False
|
|
|
|
class ScriptExecutor:
    """Runs Python scripts as subprocesses, optionally inside a conda
    environment, with a hard timeout and a bounded retry policy."""

    def __init__(self):
        # Path to the conda activation script resolved at import time, or None.
        self.conda_path = CONDA_PATH
        self.env_setup_complete = False

    def setup_conda_env(self):
        """Source the conda activation script once as a sanity check.

        Returns True on success, or when conda is disabled / already checked.
        NOTE(review): `source` runs in a child shell, so it cannot alter this
        process's environment — this only validates that the script sources
        cleanly. run_script() re-sources it inside each command anyway.
        """
        if not Config.USE_CONDA or self.env_setup_complete:
            return True

        try:
            if self.conda_path:
                cmd = f'source {self.conda_path}'
                subprocess.run(cmd, shell=True, executable='/bin/bash', check=True)
                self.env_setup_complete = True
                return True
            return False
        except subprocess.CalledProcessError as e:
            logging.error(f"Failed to setup conda environment: {str(e)}")
            return False
        except Exception as e:
            logging.error(f"Unexpected error during conda setup: {str(e)}")
            return False

    def run_script(self, script_path, env_name="base"):
        """Execute one script and return True iff it exits with code 0.

        The command is built as a shell string because `source`/`conda
        activate` require a shell. script_path and env_name are interpolated
        unquoted, so they must come from trusted configuration — never from
        user input.
        """
        if not os.path.exists(script_path):
            logging.error(f"Script not found: {script_path}")
            return False

        try:
            script_name = os.path.basename(script_path)
            logging.info(f"Starting {script_name}")

            if Config.USE_CONDA and self.conda_path:
                if not self.setup_conda_env():
                    return False
                cmd = f'source {self.conda_path} && conda activate {env_name} && python {script_path}'
            else:
                cmd = f'python {script_path}'

            process = subprocess.Popen(
                cmd,
                shell=True,
                executable='/bin/bash',
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            try:
                stdout, stderr = process.communicate(timeout=Config.SCRIPT_TIMEOUT)

                if process.returncode != 0:
                    logging.error(f"Script {script_name} failed with return code {process.returncode}")
                    if stderr:
                        logging.error(f"Error output: {stderr.decode()}")
                    return False

                if stdout:
                    logging.debug(f"Script output: {stdout.decode()}")
                return True

            except subprocess.TimeoutExpired:
                process.kill()
                # Reap the killed child and drain its pipes so it does not
                # linger as a zombie (the timeout recipe from the subprocess
                # docs: kill() then communicate()).
                process.communicate()
                logging.error(f"Script {script_name} timed out after {Config.SCRIPT_TIMEOUT} seconds")
                return False

        except Exception as e:
            logging.error(f"Error executing {script_path}: {str(e)}")
            return False

    def run_script_with_retry(self, script_path, env_name="base"):
        """Run a script, retrying up to Config.MAX_RETRIES times total with a
        Config.RETRY_DELAY pause between attempts. Returns True on the first
        success, False once all attempts are exhausted."""
        retries = 0
        while retries < Config.MAX_RETRIES:
            if self.run_script(script_path, env_name):
                return True

            retries += 1
            if retries < Config.MAX_RETRIES:
                logging.warning(f"Retrying {script_path} in {Config.RETRY_DELAY} seconds... (Attempt {retries + 1}/{Config.MAX_RETRIES})")
                time.sleep(Config.RETRY_DELAY)

        logging.error(f"Script {script_path} failed after {Config.MAX_RETRIES} attempts")
        return False
|
|
|
|
class SystemHealthCheck:
    """Samples CPU, memory, and disk usage plus directory accessibility,
    classifies each against Config thresholds, and aggregates them into a
    status report."""

    def __init__(self):
        self.warnings = []
        self.critical_issues = []
        # Last measured usage values, cached so run_health_check() can build
        # its report without re-sampling psutil. This keeps the reported
        # usage consistent with the value that produced each status, and
        # avoids raising while building the report when a check failed.
        self.last_cpu = None
        self.last_memory = None
        self.last_disk = None

    def check_cpu_usage(self):
        """Sample CPU usage over a 1-second window and classify it.

        Returns "OK", "WARNING", "CRITICAL", or "ERROR" (sampling failed).
        """
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            self.last_cpu = cpu_percent

            if cpu_percent >= Config.CPU_CRITICAL_THRESHOLD:
                self.critical_issues.append(f"CPU usage critical: {cpu_percent}%")
                return "CRITICAL"
            elif cpu_percent >= Config.CPU_WARNING_THRESHOLD:
                self.warnings.append(f"CPU usage high: {cpu_percent}%")
                return "WARNING"
            return "OK"

        except Exception as e:
            self.critical_issues.append(f"CPU check failed: {str(e)}")
            return "ERROR"

    def check_memory_usage(self):
        """Sample virtual-memory usage and classify it.

        Returns "OK", "WARNING", "CRITICAL", or "ERROR" (sampling failed).
        """
        try:
            memory = psutil.virtual_memory()
            memory_percent = memory.percent
            self.last_memory = memory_percent

            if memory_percent >= Config.MEMORY_CRITICAL_THRESHOLD:
                self.critical_issues.append(f"Memory usage critical: {memory_percent}%")
                return "CRITICAL"
            elif memory_percent >= Config.MEMORY_WARNING_THRESHOLD:
                self.warnings.append(f"Memory usage high: {memory_percent}%")
                return "WARNING"
            return "OK"

        except Exception as e:
            self.critical_issues.append(f"Memory check failed: {str(e)}")
            return "ERROR"

    def check_disk_usage(self):
        """Sample root-filesystem usage and classify it.

        Returns "OK", "WARNING", "CRITICAL", or "ERROR" (sampling failed).
        """
        try:
            disk = psutil.disk_usage('/')
            disk_percent = disk.percent
            self.last_disk = disk_percent

            if disk_percent >= Config.DISK_CRITICAL_THRESHOLD:
                self.critical_issues.append(f"Disk usage critical: {disk_percent}%")
                return "CRITICAL"
            elif disk_percent >= Config.DISK_WARNING_THRESHOLD:
                self.warnings.append(f"Disk usage high: {disk_percent}%")
                return "WARNING"
            return "OK"

        except Exception as e:
            self.critical_issues.append(f"Disk check failed: {str(e)}")
            return "ERROR"

    def check_script_directories(self):
        """Verify both script directories exist and are readable.

        Returns "OK", "CRITICAL" (a directory is missing or unreadable),
        or "ERROR" (the check itself raised).
        """
        try:
            if not os.path.exists(RIVDROITE_DIR) or not os.access(RIVDROITE_DIR, os.R_OK):
                self.critical_issues.append(f"Cannot access RivDroite directory: {RIVDROITE_DIR}")
                return "CRITICAL"

            if not os.path.exists(RIVGAUCHE_DIR) or not os.access(RIVGAUCHE_DIR, os.R_OK):
                self.critical_issues.append(f"Cannot access RivGauche directory: {RIVGAUCHE_DIR}")
                return "CRITICAL"

            return "OK"

        except Exception as e:
            self.critical_issues.append(f"Directory check failed: {str(e)}")
            return "ERROR"

    def run_health_check(self):
        """Run all checks and return a timestamped report dict.

        The overall status combines individual statuses with precedence
        CRITICAL > WARNING > ERROR > OK. Accumulated warnings and critical
        issues are reset on every run and also logged.
        """
        self.warnings = []
        self.critical_issues = []

        cpu_status = self.check_cpu_usage()
        memory_status = self.check_memory_usage()
        disk_status = self.check_disk_usage()
        dir_status = self.check_script_directories()

        statuses = [cpu_status, memory_status, disk_status, dir_status]
        if "CRITICAL" in statuses:
            system_status = "CRITICAL"
        elif "WARNING" in statuses:
            system_status = "WARNING"
        elif "ERROR" in statuses:
            system_status = "ERROR"
        else:
            system_status = "OK"

        report = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "system": {
                "status": system_status,
                "warnings": self.warnings,
                "critical_issues": self.critical_issues
            },
            "metrics": {
                # Reuse the values measured by the checks above instead of
                # sampling psutil again: the previous implementation could
                # report usage that disagreed with the computed status, and
                # crashed here if a check had already failed.
                "cpu": {
                    "status": cpu_status,
                    "usage": self.last_cpu
                },
                "memory": {
                    "status": memory_status,
                    "usage": self.last_memory
                },
                "disk": {
                    "status": disk_status,
                    "usage": self.last_disk
                },
                "directories": {
                    "status": dir_status
                }
            }
        }

        if self.warnings:
            logging.warning("System warnings:\n" + "\n".join(self.warnings))
        if self.critical_issues:
            logging.error("Critical issues:\n" + "\n".join(self.critical_issues))

        return report
|
|
|
|
class TaskManager:
    """Facade the scheduled jobs call into: runs update scripts (with retry)
    and periodic system health checks."""

    def __init__(self):
        self.script_executor = ScriptExecutor()
        self.health_checker = SystemHealthCheck()

    def run_script(self, script_name, env_name="base"):
        """Run script_name, resolved relative to RIVDROITE_DIR, with retries.

        Returns True on success, False once all retry attempts fail.
        """
        script_path = os.path.join(RIVDROITE_DIR, script_name)
        return self.script_executor.run_script_with_retry(script_path, env_name)

    def run_health_check(self):
        """Run a system health check.

        Returns True when the check completed (even with a degraded status);
        False only when the check itself raised.
        """
        logging.info("Starting system health check")
        try:
            health_report = self.health_checker.run_health_check()
            status = health_report['system']['status']
            if status in ('CRITICAL', 'ERROR'):
                # Previously a silent `pass`: surface the degraded status so
                # it is visible in the log even without an alerting hook.
                logging.error(f"System health status: {status}")
            return True
        except Exception as e:
            logging.error(f"Health check failed: {str(e)}")
            return False
|
|
|
|
class SchedulerManager:
    """Owns the BlockingScheduler: registers all cron/interval jobs, logs the
    schedule, and handles startup and shutdown."""

    def __init__(self):
        self.scheduler = BlockingScheduler()
        self.task_manager = TaskManager()
        self.setup_job_listeners()

    def setup_job_listeners(self):
        """Attach a listener that logs job success, failure, and missed runs."""
        def job_listener(event):
            if not hasattr(event, 'job_id'):
                return
            if event.exception:
                logging.error(f'Job {event.job_id} failed: {str(event.exception)}')
            elif event.code == EVENT_JOB_MISSED:
                logging.warning(f'Job {event.job_id} missed scheduled run time')
            else:
                logging.info(f'Job {event.job_id} completed successfully')

        self.scheduler.add_listener(
            job_listener,
            EVENT_JOB_ERROR | EVENT_JOB_EXECUTED | EVENT_JOB_MISSED
        )

    def add_jobs(self):
        """Register every scheduled job.

        Returns True when all jobs were added, False if registration failed.
        """
        run = self.task_manager.run_script
        try:
            # Weekday midnight batch.
            for job_id, script in [
                ('commodities', 'update_commodities.py'),
                ('economics', 'update_economics.py'),
                ('indexes', 'update_indexes.py'),
                ('stocks', 'update_stocks.py'),
                ('stocksnews', 'update_stocksnews.py'),
                ('forex', 'update_forex.py'),
            ]:
                self.scheduler.add_job(
                    run,
                    CronTrigger(day_of_week='mon-fri', hour=0, minute=0),
                    args=[script],
                    id=job_id,
                )

            # Weekday 2 AM batch.
            for job_id, script in [
                ('statements', 'update_statements.py'),
                ('technicals', 'update_technicals.py'),
                ('consensus', 'update_consensus.py'),
                ('financials', 'update_financials.py'),
            ]:
                self.scheduler.add_job(
                    run,
                    CronTrigger(day_of_week='mon-fri', hour=2, minute=0),
                    args=[script],
                    id=job_id,
                )

            # Weekly Sunday batch, each job at its own hour.
            for job_id, script, hour in [
                ('valuations', 'update_valuations.py', 6),
                ('metrics', 'update_metrics.py', 4),
                ('transcripts', 'update_transcripts.py', 8),
            ]:
                self.scheduler.add_job(
                    run,
                    CronTrigger(day_of_week='sun', hour=hour, minute=0),
                    args=[script],
                    id=job_id,
                )

            # Weekday twice-daily batch: midnight and midday, with the hour
            # suffixed onto the job id to keep ids unique.
            for job_id, script in [
                ('econews', 'update_econews.py'),
                ('sdgnews', 'update_sdgnews.py'),
                ('usmarkets', 'update_USmarkets.py'),
            ]:
                for hour in (0, 12):
                    self.scheduler.add_job(
                        run,
                        CronTrigger(day_of_week='mon-fri', hour=hour, minute=0),
                        args=[script],
                        id=f"{job_id}_{hour}",
                    )

            # Hourly health check, with an immediate first run.
            self.scheduler.add_job(
                self.task_manager.run_health_check,
                IntervalTrigger(hours=1),
                id='health_check',
                next_run_time=datetime.now()
            )

            return True

        except Exception as e:
            logging.error(f"Error adding jobs: {str(e)}")
            return False

    def print_schedule(self):
        """Log a formatted table of all registered jobs."""
        try:
            row = "{:<20} {:<30} {:<30}"
            divider = "-" * 80
            logging.info("\nScheduled Jobs:")
            logging.info(divider)
            logging.info(row.format("Job ID", "Next Run Time", "Schedule"))
            logging.info(divider)

            for job in self.scheduler.get_jobs():
                try:
                    next_run = "Pending"
                    if getattr(job, 'next_run_time', None):
                        next_run = job.next_run_time.strftime('%Y-%m-%d %H:%M:%S')

                    if isinstance(job.trigger, CronTrigger):
                        schedule = f"Cron: {job.trigger}"
                    elif isinstance(job.trigger, IntervalTrigger):
                        schedule = f"Every {job.trigger.interval}"
                    else:
                        schedule = str(job.trigger)

                    logging.info(row.format(str(job.id), next_run, schedule))
                except Exception as e:
                    # One malformed job must not abort the whole listing.
                    logging.warning(f"Could not print details for job {job.id}: {str(e)}")
                    continue

            logging.info(divider)
        except Exception as e:
            logging.error(f"Error printing schedule: {str(e)}")

    def start(self):
        """Register jobs, log the schedule, and run the blocking scheduler
        until interrupted or a fatal error occurs."""
        try:
            if not self.add_jobs():
                raise Exception("Failed to add jobs to scheduler")

            self.print_schedule()
            logging.info("Starting scheduler...")
            self.scheduler.start()

        except (KeyboardInterrupt, SystemExit):
            self.shutdown()
        except Exception as e:
            logging.error(f"Scheduler error: {str(e)}")
            self.shutdown()
            raise

    def shutdown(self):
        """Stop the scheduler if it is running; logs but never raises."""
        try:
            logging.info("Shutting down scheduler...")
            if getattr(self.scheduler, 'running', False):
                self.scheduler.shutdown()
            logging.info("Scheduler shutdown complete")
        except Exception as e:
            logging.error(f"Error during shutdown: {str(e)}")
|
|
|
|
def main():
    """Entry point: set up logging, verify the environment, wire signal
    handlers for clean shutdown, and run the scheduler."""
    setup_logging()
    logging.info("Starting RivaCube Scheduler...")

    if not verify_dependencies():
        logging.error("Failed to verify dependencies. Exiting.")
        sys.exit(1)

    scheduler_manager = SchedulerManager()

    def signal_handler(signum, frame):
        # SIGINT/SIGTERM: stop the scheduler cleanly, then exit 0.
        logging.info("Received shutdown signal...")
        scheduler_manager.shutdown()
        sys.exit(0)

    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal_handler)

    try:
        scheduler_manager.start()
    except Exception as e:
        logging.error(f"Fatal error: {str(e)}")
        sys.exit(1)
|
|
|
|
# Run the scheduler only when executed directly, not when imported.
if __name__ == "__main__":
    main()
|