#!/usr/bin/env python3
"""AlphaX Docker scheduler with lightweight process concurrency."""
from __future__ import annotations

import os
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path

# Make the project root importable so the ``app.*`` imports below resolve when
# this file is executed directly as a script.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from app.db.schema import init_db
from app.db.scheduler_db import (
    claim_manual_triggers,
    get_job_configs,
    init_scheduler_tables,
    update_manual_trigger,
    update_runtime,
)
from app.db.system_logs import record_system_error
from app.config.system_config import scheduler_config

PYTHON = sys.executable
DRY_RUN = None  # Test-only override; runtime reads system_config.scheduler.dry_run.


def _scheduler_settings() -> dict:
    """Return the scheduler settings with defaults applied.

    Reads ``scheduler_config()`` on every call. Missing or falsy numeric
    values fall back to the defaults below; ``dry_run`` defaults to ``True``
    when the key is absent.
    """
    cfg = scheduler_config() or {}
    return {
        "dry_run": bool(cfg.get("dry_run", True)),
        "poll_seconds": float(cfg.get("poll_seconds") or 1.0),
        "config_reload_seconds": float(cfg.get("config_reload_seconds") or 5.0),
        "pending_warn_seconds": float(cfg.get("pending_warn_seconds") or 30.0),
    }


def scheduler_dry_run() -> bool:
    """Return whether jobs should be skipped instead of executed.

    The module-level ``DRY_RUN`` override wins when it is not ``None``;
    otherwise the configured ``dry_run`` setting is used.
    """
    if DRY_RUN is not None:
        return bool(DRY_RUN)
    return bool(_scheduler_settings()["dry_run"])


@dataclass
class Job:
    """In-memory scheduling state for one configured job."""

    name: str
    command: str  # sub-command passed to ``python -m app.cli``
    every_seconds: int  # interval added to "now" after each automatic start
    args: tuple[str, ...] = ()  # extra CLI arguments appended after the command
    initial_delay: int = 0  # seconds added to the base time for the first run
    lock_group: str = ""  # jobs sharing a lock group never run concurrently
    enabled: bool = True
    description: str = ""
    next_run: float = 0.0  # epoch seconds of the next automatic run
    pending: bool = False  # True while a start attempt is blocked by a lock
    pending_since: float = 0.0  # epoch seconds when the job first became pending
    last_pending_log: float = 0.0  # epoch seconds of the last "pending" log line


@dataclass
class RunningJob:
    """Bookkeeping for a child process that has been started."""

    job: Job
    proc: subprocess.Popen
    started_at: float  # epoch seconds, used for duration and timeout checks
    started_iso: str  # ISO timestamp recorded as the start time
    run_kind: str = "auto"  # "auto" for scheduled runs, "manual" for triggers
    trigger_id: int = 0  # manual trigger id; 0 when there is none
    output_file: object | None = None  # temp file capturing stdout + stderr


def now_str() -> str:
    """Return the local time formatted for log prefixes."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def iso_now() -> str:
    """Return the local time as an ISO-8601 string."""
    return datetime.now().isoformat()


def env_for_child() -> dict[str, str]:
    """Return a copy of the environment for child processes.

    ``PYTHONUNBUFFERED`` is set to ``"1"`` unless it is already defined.
    """
    env = os.environ.copy()
    env.setdefault("PYTHONUNBUFFERED", "1")
    return env


def _tail(text: str, limit: int = 8000) -> str:
    """Return at most the last ``limit`` characters of ``text`` ("" if falsy)."""
    value = text or ""
    return value[-limit:] if len(value) > limit else value


def _next_run_iso(ts: float) -> str:
    """Convert an epoch timestamp to an ISO string; a falsy ``ts`` yields ""."""
    return datetime.fromtimestamp(ts).isoformat() if ts else ""


def load_jobs(
    existing: dict[str, Job],
    base: float,
    running: dict[str, RunningJob] | None = None,
) -> dict[str, Job]:
    """Build the job table from ``get_job_configs()``.

    Scheduling state (``next_run`` and the pending fields) is carried over
    from ``existing`` for jobs that were already known; new jobs get their
    first run at ``base + initial_delay``.

    Args:
        existing: Previously loaded jobs keyed by name.
        base: Epoch seconds used as the origin for new jobs' first run.
        running: Currently running jobs; their runtime rows are left untouched.

    Returns:
        A new dict of ``Job`` objects keyed by job name.
    """
    running = running or {}
    loaded = {}
    for cfg in get_job_configs():
        name = cfg["job_name"]
        old = existing.get(name)
        every = int(cfg.get("every_seconds") or 60)
        initial_delay = int(cfg.get("initial_delay") or 0)
        job = Job(
            name=name,
            command=cfg.get("command") or name,
            every_seconds=every,
            args=tuple(cfg.get("args") or ()),
            initial_delay=initial_delay,
            lock_group=cfg.get("lock_group") or name,
            enabled=bool(cfg.get("enabled")),
            description=cfg.get("description") or "",
            next_run=old.next_run if old else base + initial_delay,
            pending=old.pending if old else False,
            pending_since=old.pending_since if old else 0.0,
            last_pending_log=old.last_pending_log if old else 0.0,
        )
        loaded[name] = job
        # Do not overwrite the "running" status of a job that is in flight.
        if name not in running:
            if not job.enabled:
                status = "disabled"
            elif job.pending:
                status = "pending"
            else:
                status = "idle"
            update_runtime(
                name,
                next_run_at=_next_run_iso(job.next_run),
                status=status,
            )
    return loaded


def _lock_busy(job: Job, running: dict[str, RunningJob]) -> bool:
    """Return True if ``job`` or any job in its lock group is running."""
    if job.name in running:
        return True
    return any(item.job.lock_group == job.lock_group for item in running.values())


def _mark_pending(job: Job, reason: str) -> None:
    """Flag ``job`` as waiting on a lock and record that in the runtime table.

    The "pending" log line is throttled to once per ``pending_warn_seconds``.
    """
    now = time.time()
    pending_warn_seconds = _scheduler_settings()["pending_warn_seconds"]
    if not job.pending:
        job.pending = True
        job.pending_since = now
    if now - job.last_pending_log >= pending_warn_seconds:
        print(f"[{now_str()}] [scheduler] pending {job.name}: {reason}", flush=True)
        job.last_pending_log = now
    update_runtime(
        job.name,
        status="pending",
        locked_by=reason,
        next_run_at=_next_run_iso(job.next_run),
    )


def start_job(
    job: Job,
    running: dict[str, RunningJob],
    run_kind: str = "auto",
    trigger_id: int = 0,
) -> bool:
    """Try to start ``job`` as a child process.

    Args:
        job: The job to start.
        running: Table of running jobs; the new process is registered here.
        run_kind: Label stored with the run ("auto" or "manual").
        trigger_id: Manual trigger id to update, or 0 for none.

    Returns:
        ``False`` when the job is blocked by a lock (it is marked pending),
        ``True`` when it was started or skipped because of dry-run mode.

    Raises:
        OSError: Propagated from ``subprocess.Popen`` if the child cannot be
            spawned. The capture file is closed before re-raising.
    """
    if _lock_busy(job, running):
        blocker = "same_job" if job.name in running else f"lock:{job.lock_group}"
        _mark_pending(job, blocker)
        return False

    cmd = [PYTHON, "-m", "app.cli", job.command, *job.args]
    print(
        f"[{now_str()}] [scheduler] start {job.name} kind={run_kind}: {' '.join(cmd)}",
        flush=True,
    )
    job.pending = False
    job.pending_since = 0.0
    job.last_pending_log = 0.0

    if scheduler_dry_run():
        print(f"[{now_str()}] [scheduler] DRY_RUN=1 skip {job.name}", flush=True)
        update_runtime(
            job.name,
            status="idle" if job.enabled else "disabled",
            pid=0,
            run_kind=run_kind,
            trigger_id=trigger_id,
            locked_by="",
            last_started_at=iso_now(),
            last_finished_at=iso_now(),
            last_exit_code=0,
            last_duration_ms=0,
            last_error="dry_run",
            output_tail="DRY_RUN=1",
        )
        if trigger_id:
            update_manual_trigger(
                trigger_id,
                status="skipped",
                finished_at=iso_now(),
                output_tail="DRY_RUN=1",
            )
        return True

    started_iso = iso_now()
    # The child writes stdout and stderr into this file; it is read back and
    # closed in finish_running_jobs() once the process has exited.
    output_file = tempfile.TemporaryFile(mode="w+t", encoding="utf-8", errors="replace")
    try:
        proc = subprocess.Popen(
            cmd,
            cwd=ROOT,
            env=env_for_child(),
            text=True,
            stdout=output_file,
            stderr=subprocess.STDOUT,
        )
    except BaseException:
        # Nothing else owns the capture file yet, so close it here to avoid
        # leaking the descriptor, then let the original error propagate.
        output_file.close()
        raise

    running[job.name] = RunningJob(
        job=job,
        proc=proc,
        started_at=time.time(),
        started_iso=started_iso,
        run_kind=run_kind,
        trigger_id=trigger_id,
        output_file=output_file,
    )
    update_runtime(
        job.name,
        status="running",
        pid=proc.pid,
        run_kind=run_kind,
        trigger_id=trigger_id,
        locked_by=job.lock_group,
        last_started_at=started_iso,
        last_error="",
        output_tail="",
    )
    if trigger_id:
        update_manual_trigger(trigger_id, status="running", started_at=started_iso)
    return True


def finish_running_jobs(running: dict[str, RunningJob]) -> None:
    """Reap finished children, enforce timeouts, and persist their results.

    A job is killed once it has run longer than ``max(every_seconds * 2, 600)``
    seconds. Jobs that are still alive after the check are left in ``running``
    and revisited on the next call. Finished jobs are removed from ``running``.
    """
    # Iterate over a snapshot because finished entries are popped in the loop.
    for name, item in list(running.items()):
        proc = item.proc
        timeout = max(item.job.every_seconds * 2, 600)
        elapsed = time.time() - item.started_at
        killed_by_timeout = False
        if proc.poll() is None and elapsed > timeout:
            proc.kill()
            killed_by_timeout = True
            print(
                f"[{now_str()}] [scheduler] timeout {name} after {elapsed:.1f}s",
                flush=True,
            )
        if proc.poll() is None:
            # Still running (or killed but not yet reaped); check again later.
            continue

        out = ""
        try:
            if item.output_file is not None:
                item.output_file.seek(0)
                out = item.output_file.read()
        except Exception as exc:
            # Best effort: losing the captured output must not stop the
            # scheduler, but the failure is reported instead of hidden.
            out = ""
            print(
                f"[{now_str()}] [scheduler] could not read output of {name}: {exc}",
                flush=True,
            )
        finally:
            try:
                if item.output_file is not None:
                    item.output_file.close()
            except Exception:
                # Closing is best effort; the file is temporary either way.
                pass

        duration_ms = int((time.time() - item.started_at) * 1000)
        exit_code = int(proc.returncode or 0)
        output_tail = _tail(out.strip())
        status = "idle" if item.job.enabled else "disabled"
        err = "" if exit_code == 0 else f"exit={exit_code}"
        print(
            f"[{now_str()}] [scheduler] done {name} exit={exit_code} duration={duration_ms/1000:.1f}s",
            flush=True,
        )
        if output_tail:
            print(output_tail, flush=True)

        if exit_code != 0:
            # The kill may have been issued on an earlier pass, so elapsed time
            # is also accepted as evidence that the timeout was reached.
            timed_out = killed_by_timeout or elapsed >= timeout
            error_type = f"{name}_exit_{exit_code}"
            message = f"scheduler job {name} failed with exit={exit_code}"
            # A return code of -9 means the child was terminated by signal 9.
            if timed_out and exit_code == -9:
                error_type = f"{name}_timeout_killed"
                message = f"scheduler job {name} exceeded timeout={int(timeout)}s and was killed"
            elif exit_code == -9:
                error_type = f"{name}_sigkill"
                message = f"scheduler job {name} received SIGKILL before scheduler timeout; check container resources and external API stalls"
            record_system_error(
                source="scheduler",
                level="error",
                error_type=error_type,
                message=message,
                stack_trace=output_tail,
                status_code=exit_code,
                context={
                    "job_name": name,
                    "run_kind": item.run_kind,
                    "trigger_id": item.trigger_id,
                    "duration_ms": duration_ms,
                    "timeout_seconds": int(timeout),
                    "killed_by_timeout": timed_out,
                },
            )

        update_runtime(
            name,
            status=status,
            pid=0,
            run_kind=item.run_kind,
            trigger_id=item.trigger_id,
            locked_by="",
            next_run_at=_next_run_iso(item.job.next_run),
            last_finished_at=iso_now(),
            last_exit_code=exit_code,
            last_duration_ms=duration_ms,
            last_error=err,
            output_tail=output_tail,
        )
        if item.trigger_id:
            update_manual_trigger(
                item.trigger_id,
                status="success" if exit_code == 0 else "error",
                finished_at=iso_now(),
                exit_code=exit_code,
                duration_ms=duration_ms,
                output_tail=output_tail,
                error_message=err,
            )
        running.pop(name, None)


def handle_manual_triggers(jobs: dict[str, Job], running: dict[str, RunningJob]) -> None:
    """Claim up to ten manual triggers and try to start their jobs.

    Unknown jobs are marked "error"; disabled jobs are marked "rejected"
    unless the trigger carries a truthy ``force``; triggers whose job could
    not start because of a lock are marked "pending".
    """
    for trigger in claim_manual_triggers(limit=10):
        job = jobs.get(trigger.get("job_name"))
        trigger_id = int(trigger.get("id") or 0)
        if not job:
            update_manual_trigger(
                trigger_id,
                status="error",
                error_message="unknown job",
                finished_at=iso_now(),
            )
            continue
        if not job.enabled and not trigger.get("force"):
            update_manual_trigger(
                trigger_id,
                status="rejected",
                error_message="job disabled",
                finished_at=iso_now(),
            )
            continue
        if start_job(job, running, run_kind="manual", trigger_id=trigger_id):
            continue
        update_manual_trigger(trigger_id, status="pending")


def schedule_due_jobs(jobs: dict[str, Job], running: dict[str, RunningJob]) -> None:
    """Start every enabled job that is pending or whose ``next_run`` has passed.

    Jobs are visited in ascending ``next_run`` order. After a successful
    start the next run is set to "now + every_seconds".
    """
    now = time.time()
    for job in sorted(jobs.values(), key=lambda item: item.next_run):
        if not job.enabled:
            update_runtime(job.name, status="disabled", next_run_at="")
            continue
        if job.pending or now >= job.next_run:
            if start_job(job, running, run_kind="auto"):
                job.next_run = time.time() + job.every_seconds
                update_runtime(job.name, next_run_at=_next_run_iso(job.next_run))


def main() -> None:
    """Initialise the database tables and run the scheduler loop forever."""
    init_db()
    init_scheduler_tables()
    base = time.time()
    jobs: dict[str, Job] = {}
    running: dict[str, RunningJob] = {}
    jobs = load_jobs(jobs, base, running)
    last_reload = time.time()
    settings = _scheduler_settings()
    print(
        f"[{now_str()}] [scheduler] started jobs={len(jobs)} dry_run={settings['dry_run']} mode=concurrent",
        flush=True,
    )
    while True:
        finish_running_jobs(running)
        # Settings are re-read each iteration so config changes take effect
        # without restarting the process.
        settings = _scheduler_settings()
        if time.time() - last_reload >= settings["config_reload_seconds"]:
            jobs = load_jobs(jobs, time.time(), running)
            last_reload = time.time()
        handle_manual_triggers(jobs, running)
        schedule_due_jobs(jobs, running)
        time.sleep(settings["poll_seconds"])


if __name__ == "__main__":
    main()