330 lines
11 KiB
Python
Executable File
330 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""AlphaX Docker scheduler with lightweight process concurrency."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Project root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# Prepend the root to the import path (at most once) so the `app.*` imports
# below resolve when this file is executed directly as a script.
if all(entry != str(ROOT) for entry in sys.path):
    sys.path.insert(0, str(ROOT))
|
|
|
|
from app.db.schema import init_db
|
|
from app.db.scheduler_db import (
|
|
claim_manual_triggers,
|
|
get_job_configs,
|
|
init_scheduler_tables,
|
|
update_manual_trigger,
|
|
update_runtime,
|
|
)
|
|
from app.db.system_logs import record_system_error
|
|
from app.config.system_config import scheduler_config
|
|
|
|
# Interpreter used to launch job subprocesses (`<PYTHON> -m app.cli ...`);
# it is the same interpreter that runs this scheduler.
PYTHON = sys.executable

# When not None, this value takes precedence over the configured dry_run flag
# (see scheduler_dry_run()).
DRY_RUN = None  # Test-only override; runtime reads system_config.scheduler.dry_run.
|
|
|
|
|
|
def _scheduler_settings() -> dict:
    """Read the scheduler section of system config and normalise its values.

    ``dry_run`` defaults to True when the key is absent. Numeric settings that
    are missing or falsy (None, 0, "") fall back to their defaults: 1s poll
    interval, 5s config reload interval, 30s between repeated "pending" logs.
    """
    raw = scheduler_config() or {}

    def _number(key: str, fallback: float) -> float:
        return float(raw.get(key) or fallback)

    return {
        "dry_run": bool(raw.get("dry_run", True)),
        "poll_seconds": _number("poll_seconds", 1.0),
        "config_reload_seconds": _number("config_reload_seconds", 5.0),
        "pending_warn_seconds": _number("pending_warn_seconds", 30.0),
    }
|
|
|
|
|
|
def scheduler_dry_run() -> bool:
    """Return True when jobs should be skipped rather than executed.

    The module-level ``DRY_RUN`` override wins whenever it is set; otherwise
    the value comes from the scheduler config.
    """
    override = DRY_RUN
    if override is None:
        return bool(_scheduler_settings()["dry_run"])
    return bool(override)
|
|
|
|
|
|
@dataclass
class Job:
    """One configured job: static settings plus mutable scheduling state.

    load_jobs() rebuilds instances from get_job_configs() on every config
    reload and carries the scheduling-state fields over from the previous
    instance of the same name.
    """

    # --- Static configuration ---
    name: str
    command: str  # subcommand passed to `python -m app.cli`
    every_seconds: int  # seconds between automatic runs
    args: tuple[str, ...] = ()  # extra CLI arguments appended after `command`
    initial_delay: int = 0  # seconds after a job is first loaded before its first automatic run
    lock_group: str = ""  # jobs sharing a lock group never run concurrently (load_jobs defaults it to the job name)
    enabled: bool = True
    description: str = ""
    # --- Mutable scheduling state ---
    next_run: float = 0.0  # epoch seconds (time.time()) of the next automatic run
    pending: bool = False  # True while a due run is blocked by the job itself or its lock group
    pending_since: float = 0.0  # epoch seconds when the job last became pending
    last_pending_log: float = 0.0  # epoch seconds of the last "pending" log line (used for throttling)
|
|
|
|
|
|
@dataclass
class RunningJob:
    """A spawned job process that finish_running_jobs() has not reaped yet."""

    job: Job  # the Job instance the run was started from
    proc: subprocess.Popen  # child process running `python -m app.cli ...`
    started_at: float  # epoch seconds (time.time()), used for timeout and duration
    started_iso: str  # local ISO-8601 start time written to runtime/trigger records
    run_kind: str = "auto"  # "auto" for scheduled runs, "manual" for triggered runs
    trigger_id: int = 0  # manual trigger id; 0 for automatic runs
    output_file: object | None = None  # temp file holding combined stdout/stderr; closed when the run is reaped
|
|
|
|
|
|
def now_str() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS`` (log prefix format)."""
    moment = datetime.now()
    return format(moment, "%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def iso_now() -> str:
    """Return the current local (naive) time in ISO-8601 format."""
    moment = datetime.now()
    return moment.isoformat()
|
|
|
|
|
|
def env_for_child() -> dict[str, str]:
    """Return a copy of the current environment for spawned jobs.

    ``PYTHONUNBUFFERED`` is set to "1" unless the scheduler's own environment
    already defines it, in which case that value is kept.
    """
    unbuffered = os.environ.get("PYTHONUNBUFFERED", "1")
    return {**os.environ, "PYTHONUNBUFFERED": unbuffered}
|
|
|
|
|
|
def _tail(text: str, limit: int = 8000) -> str:
    """Return at most the last ``limit`` characters of ``text``.

    ``None`` or empty input yields ``""``; so does a non-positive ``limit``.
    """
    value = text or ""
    # Guard explicitly: value[-0:] is value[0:], i.e. the whole string, and a
    # negative limit would slice from the front instead of keeping a tail.
    if limit <= 0:
        return ""
    # Slicing past the start clamps, so short strings come back unchanged.
    return value[-limit:]
|
|
|
|
|
|
def _next_run_iso(ts: float) -> str:
    """Format an epoch timestamp as local ISO-8601; a falsy timestamp maps to ""."""
    if not ts:
        return ""
    return datetime.fromtimestamp(ts).isoformat()
|
|
|
|
|
|
def load_jobs(existing: dict[str, Job], base: float, running: dict[str, RunningJob] | None = None) -> dict[str, Job]:
    """Rebuild the job table from ``get_job_configs()``.

    Static settings are read fresh from each config row:
    * ``every_seconds`` defaults to 60 and ``initial_delay`` to 0.
    * ``command`` and ``lock_group`` default to the job name.
    * A missing ``enabled`` counts as disabled.

    Scheduling state (``next_run``, ``pending``, ``pending_since``,
    ``last_pending_log``) is copied from ``existing`` when a job of the same
    name was already loaded. A newly seen job first becomes due at
    ``base + initial_delay``.

    For each job NOT present in ``running``, the runtime row gets its next run
    time and a disabled/pending/idle status. In-flight runs are skipped so
    their live status is not overwritten.
    """
    active = running or {}
    table: dict[str, Job] = {}
    for cfg in get_job_configs():
        job_name = cfg["job_name"]
        previous = existing.get(job_name)
        interval = int(cfg.get("every_seconds") or 60)
        job = Job(
            name=job_name,
            command=cfg.get("command") or job_name,
            every_seconds=interval,
            args=tuple(cfg.get("args") or ()),
            initial_delay=int(cfg.get("initial_delay") or 0),
            lock_group=cfg.get("lock_group") or job_name,
            enabled=bool(cfg.get("enabled")),
            description=cfg.get("description") or "",
        )
        if previous is None:
            job.next_run = base + job.initial_delay
        else:
            # Preserve where the previous instance was in its schedule.
            job.next_run = previous.next_run
            job.pending = previous.pending
            job.pending_since = previous.pending_since
            job.last_pending_log = previous.last_pending_log
        table[job_name] = job

        if job_name in active:
            continue
        if not job.enabled:
            state = "disabled"
        elif job.pending:
            state = "pending"
        else:
            state = "idle"
        update_runtime(job_name, next_run_at=_next_run_iso(job.next_run), status=state)
    return table
|
|
|
|
|
|
def _lock_busy(job: Job, running: dict[str, RunningJob]) -> bool:
    """Return True if ``job`` cannot start now.

    A job is blocked while a run of the same name is in flight, or while any
    in-flight run belongs to the same ``lock_group``.
    """
    return job.name in running or any(
        other.job.lock_group == job.lock_group for other in running.values()
    )
|
|
|
|
|
|
def _mark_pending(job: Job, reason: str) -> None:
    """Flag ``job`` as blocked by ``reason`` and persist that state.

    ``pending_since`` is stamped only on the transition into pending. The log
    line is throttled to at most once per ``pending_warn_seconds``. The
    runtime row is written on every call, with status "pending" and the
    blocker in ``locked_by``.
    """
    stamp = time.time()
    warn_every = _scheduler_settings()["pending_warn_seconds"]
    if not job.pending:
        job.pending, job.pending_since = True, stamp
    if stamp - job.last_pending_log >= warn_every:
        print(f"[{now_str()}] [scheduler] pending {job.name}: {reason}", flush=True)
        job.last_pending_log = stamp
    update_runtime(
        job.name,
        status="pending",
        locked_by=reason,
        next_run_at=_next_run_iso(job.next_run),
    )
|
|
|
|
|
|
def _record_spawn_failure(job: Job, run_kind: str, trigger_id: int, started_iso: str, exc: OSError) -> None:
    """Record a failed process spawn in the system error log, the runtime row and (if any) the trigger row."""
    error = f"spawn failed: {exc}"
    print(f"[{now_str()}] [scheduler] {error} ({job.name})", flush=True)
    record_system_error(
        source="scheduler",
        level="error",
        error_type=f"{job.name}_spawn_failed",
        message=f"scheduler job {job.name} failed to start: {exc}",
        stack_trace=repr(exc),
        status_code=-1,
        context={"job_name": job.name, "run_kind": run_kind, "trigger_id": trigger_id},
    )
    finished_iso = iso_now()
    update_runtime(
        job.name,
        status="idle" if job.enabled else "disabled",
        pid=0,
        run_kind=run_kind,
        trigger_id=trigger_id,
        locked_by="",
        last_started_at=started_iso,
        last_finished_at=finished_iso,
        last_exit_code=-1,
        last_duration_ms=0,
        last_error=error,
        output_tail="",
    )
    if trigger_id:
        update_manual_trigger(
            trigger_id,
            status="error",
            started_at=started_iso,
            finished_at=finished_iso,
            exit_code=-1,
            error_message=error,
        )


def start_job(job: Job, running: dict[str, RunningJob], run_kind: str = "auto", trigger_id: int = 0) -> bool:
    """Try to launch one run of ``job`` in the background.

    The child runs ``<PYTHON> -m app.cli <command> <args...>`` with ``ROOT`` as
    its working directory. Its stdout and stderr go into one temporary file.

    Args:
        job: job to run; its pending state is cleared once the lock check passes.
        running: in-flight runs keyed by job name; a started run is added here.
        run_kind: "auto" for scheduled runs, "manual" for triggered runs.
        trigger_id: manual trigger row to update, or 0 for none.

    Returns:
        False if the job itself or another job in its lock group is running;
        the job is marked pending instead. True when the attempt was consumed:
        the child was started, dry-run mode skipped it, or the spawn failed
        and the failure was recorded.
    """
    if _lock_busy(job, running):
        blocker = "same_job" if job.name in running else f"lock:{job.lock_group}"
        _mark_pending(job, blocker)
        return False

    cmd = [PYTHON, "-m", "app.cli", job.command, *job.args]
    print(f"[{now_str()}] [scheduler] start {job.name} kind={run_kind}: {' '.join(cmd)}", flush=True)
    job.pending = False
    job.pending_since = 0.0
    job.last_pending_log = 0.0

    if scheduler_dry_run():
        print(f"[{now_str()}] [scheduler] DRY_RUN=1 skip {job.name}", flush=True)
        update_runtime(
            job.name,
            status="idle" if job.enabled else "disabled",
            pid=0,
            run_kind=run_kind,
            trigger_id=trigger_id,
            locked_by="",
            last_started_at=iso_now(),
            last_finished_at=iso_now(),
            last_exit_code=0,
            last_duration_ms=0,
            last_error="dry_run",
            output_tail="DRY_RUN=1",
        )
        if trigger_id:
            update_manual_trigger(trigger_id, status="skipped", finished_at=iso_now(), output_tail="DRY_RUN=1")
        return True

    started_iso = iso_now()
    output_file = tempfile.TemporaryFile(mode="w+t", encoding="utf-8", errors="replace")
    try:
        proc = subprocess.Popen(
            cmd,
            cwd=ROOT,
            env=env_for_child(),
            text=True,
            stdout=output_file,
            stderr=subprocess.STDOUT,
        )
    except OSError as exc:
        # Spawning failed (e.g. fork/exec error or fd exhaustion). Close the
        # capture file, which nothing else would close, and record the failure
        # instead of letting the exception kill the whole scheduler loop.
        # Returning True lets the caller advance the schedule rather than retry
        # on every poll.
        output_file.close()
        _record_spawn_failure(job, run_kind, trigger_id, started_iso, exc)
        return True

    # From here on finish_running_jobs() owns proc and output_file.
    running[job.name] = RunningJob(
        job=job,
        proc=proc,
        started_at=time.time(),
        started_iso=started_iso,
        run_kind=run_kind,
        trigger_id=trigger_id,
        output_file=output_file,
    )
    update_runtime(
        job.name,
        status="running",
        pid=proc.pid,
        run_kind=run_kind,
        trigger_id=trigger_id,
        locked_by=job.lock_group,
        last_started_at=started_iso,
        last_error="",
        output_tail="",
    )
    if trigger_id:
        update_manual_trigger(trigger_id, status="running", started_at=started_iso)
    return True
|
|
|
|
|
|
def _collect_output(item: RunningJob) -> str:
    """Read everything a finished run wrote to its capture file, then close the file.

    This is best-effort by design. A read failure yields "" and close errors
    are ignored, so a broken capture file never prevents the run's result
    from being recorded.
    """
    out = ""
    try:
        if item.output_file:
            item.output_file.seek(0)
            out = item.output_file.read()
    except Exception:
        out = ""
    finally:
        try:
            if item.output_file:
                item.output_file.close()
        except Exception:
            pass
    return out


def finish_running_jobs(running: dict[str, RunningJob]) -> None:
    """Reap finished or timed-out runs and persist their results.

    For each entry in ``running``:

    * A child still alive after ``max(2 * every_seconds, 600)`` seconds is
      killed, and the scheduler waits briefly for it to exit.
    * A child that is still alive afterwards is left for a later pass.
    * Otherwise its output is read and logged, and the tail is written to the
      runtime row and, for manual runs, to the trigger row. A non-zero exit is
      also reported via ``record_system_error``. The entry is then removed
      from ``running``.
    """
    for name, item in list(running.items()):
        proc = item.proc
        timeout = max(item.job.every_seconds * 2, 600)
        elapsed = time.time() - item.started_at
        timed_out = False
        if proc.poll() is None and elapsed > timeout:
            proc.kill()
            timed_out = True
            print(f"[{now_str()}] [scheduler] timeout {name} after {elapsed:.1f}s", flush=True)
            # poll() right after kill() usually still reports the child as
            # alive. That would defer recording this run to a later loop and
            # repeat the kill/log while it lingers, so wait briefly to reap it.
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                pass
        if proc.poll() is None:
            continue

        out = _collect_output(item)
        duration_ms = int((time.time() - item.started_at) * 1000)
        exit_code = int(proc.returncode or 0)
        output_tail = _tail(out.strip())
        status = "idle" if item.job.enabled else "disabled"
        err = "" if exit_code == 0 else f"exit={exit_code}"
        if timed_out and exit_code != 0:
            # Make scheduler-enforced kills distinguishable from ordinary failures.
            err = f"{err} (timeout after {elapsed:.1f}s)"
        print(f"[{now_str()}] [scheduler] done {name} exit={exit_code} duration={duration_ms/1000:.1f}s", flush=True)
        if output_tail:
            print(output_tail, flush=True)
        if exit_code != 0:
            record_system_error(
                source="scheduler",
                level="error",
                error_type=f"{name}_exit_{exit_code}",
                message=f"scheduler job {name} failed with exit={exit_code}",
                stack_trace=output_tail,
                status_code=exit_code,
                context={
                    "job_name": name,
                    "run_kind": item.run_kind,
                    "trigger_id": item.trigger_id,
                    "timed_out": timed_out,
                },
            )
        update_runtime(
            name,
            status=status,
            pid=0,
            run_kind=item.run_kind,
            trigger_id=item.trigger_id,
            locked_by="",
            next_run_at=_next_run_iso(item.job.next_run),
            last_finished_at=iso_now(),
            last_exit_code=exit_code,
            last_duration_ms=duration_ms,
            last_error=err,
            output_tail=output_tail,
        )
        if item.trigger_id:
            update_manual_trigger(
                item.trigger_id,
                status="success" if exit_code == 0 else "error",
                finished_at=iso_now(),
                exit_code=exit_code,
                duration_ms=duration_ms,
                output_tail=output_tail,
                error_message=err,
            )
        running.pop(name, None)
|
|
|
|
|
|
def handle_manual_triggers(jobs: dict[str, Job], running: dict[str, RunningJob]) -> None:
    """Claim up to 10 manual triggers and try to start the requested jobs.

    Each trigger ends in one of these states:
    * "error" if the job name is unknown.
    * "rejected" if the job is disabled and the trigger is not forced.
    * Handed to start_job() otherwise.
    * "pending" if start_job() reports the job is blocked by a lock.
    """
    for trigger in claim_manual_triggers(limit=10):
        trigger_id = int(trigger.get("id") or 0)
        job = jobs.get(trigger.get("job_name"))
        if job is None:
            update_manual_trigger(trigger_id, status="error", error_message="unknown job", finished_at=iso_now())
            continue
        if not (job.enabled or trigger.get("force")):
            update_manual_trigger(trigger_id, status="rejected", error_message="job disabled", finished_at=iso_now())
            continue
        started = start_job(job, running, run_kind="manual", trigger_id=trigger_id)
        if not started:
            update_manual_trigger(trigger_id, status="pending")
|
|
|
|
|
|
def schedule_due_jobs(jobs: dict[str, Job], running: dict[str, RunningJob]) -> None:
    """Start every enabled job that is due or pending, earliest ``next_run`` first.

    After a successful start, the job's ``next_run`` moves to
    ``now + every_seconds`` and is persisted.

    Disabled jobs get their runtime row set to "disabled" with no next run.
    The exception is a disabled job that is still in ``running`` (a forced
    manual run). Overwriting its status would hide the live "running" state,
    which is also why load_jobs() leaves in-flight jobs alone.
    """
    now = time.time()
    for job in sorted(jobs.values(), key=lambda item: item.next_run):
        if not job.enabled:
            if job.name not in running:
                update_runtime(job.name, status="disabled", next_run_at="")
            continue
        if job.pending or now >= job.next_run:
            if start_job(job, running, run_kind="auto"):
                job.next_run = time.time() + job.every_seconds
                update_runtime(job.name, next_run_at=_next_run_iso(job.next_run))
|
|
|
|
|
|
def main() -> None:
    """Initialise the database, then run the scheduler loop forever.

    Each iteration does the following, in order:
    1. Reap finished runs.
    2. Reload job configs every ``config_reload_seconds``.
    3. Serve manual triggers.
    4. Start due jobs.
    5. Sleep ``poll_seconds``.

    Settings are re-read on every iteration.
    """
    init_db()
    init_scheduler_tables()
    running: dict[str, RunningJob] = {}
    jobs: dict[str, Job] = load_jobs({}, time.time(), running)
    last_reload = time.time()

    startup = _scheduler_settings()
    print(f"[{now_str()}] [scheduler] started jobs={len(jobs)} dry_run={startup['dry_run']} mode=concurrent", flush=True)

    while True:
        finish_running_jobs(running)
        current = _scheduler_settings()
        reload_due = time.time() - last_reload >= current["config_reload_seconds"]
        if reload_due:
            jobs = load_jobs(jobs, time.time(), running)
            last_reload = time.time()
        handle_manual_triggers(jobs, running)
        schedule_due_jobs(jobs, running)
        time.sleep(current["poll_seconds"])


if __name__ == "__main__":
    main()
|