"""Tests for scheduler table seeding, the scheduler control API/page, and job locking."""

import json
import os
import re
import shutil
import subprocess
import sys

from fastapi.testclient import TestClient

# Make the project root importable when the tests are run from this directory.
PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)

# These imports must come after the sys.path adjustment above.
from app.db import altcoin_db  # noqa: E402
from app.db import scheduler_db  # noqa: E402
from app.web import web_server  # noqa: E402
import docker.scheduler as scheduler  # noqa: E402

# Captures the body of every <script> element, including multi-line bodies.
# NOTE(review): the pattern in the previous revision was an empty string, which
# can only yield empty matches; this is the reconstructed intent -- confirm.
_SCRIPT_BODY_RE = re.compile(r"<script\b[^>]*>([\s\S]*?)</script>")


def _init_temp_databases(monkeypatch, tmp_path):
    """Redirect both DB modules to files under ``tmp_path`` and initialise them."""
    db_path = tmp_path / "altcoin_monitor.db"
    sched_path = tmp_path / "scheduler_state.db"
    monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
    monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path))
    altcoin_db.init_db()
    scheduler_db.init_scheduler_tables()


def test_scheduler_tables_seed_defaults(monkeypatch, tmp_path):
    """Initialising the scheduler tables seeds the expected default job configs."""
    _init_temp_databases(monkeypatch, tmp_path)

    jobs = {item["job_name"]: item for item in scheduler_db.get_job_configs()}

    assert jobs["event"]["lock_group"] == "recommendation_write"
    assert jobs["confirm"]["lock_group"] == "recommendation_write"
    assert jobs["tracker"]["every_seconds"] == 180
    assert jobs["paper-trader"]["lock_group"] == "paper_trading_write"
    assert jobs["live-trading-sync"]["command"] == "live-trading-sync"
    assert jobs["live-trading-sync"]["lock_group"] == "live_trading_write"
    assert "onchain" not in jobs


def test_scheduler_init_retires_legacy_onchain_job(pg_conn):
    """Re-running init disables a legacy ``onchain`` job and skips its queued trigger."""
    # Plant an enabled legacy job plus a queued manual trigger for it.
    pg_conn.execute(
        """
        INSERT INTO scheduler_job_config
            (job_name, command, args_json, enabled, every_seconds, initial_delay,
             lock_group, description, sort_order, created_at, updated_at)
        VALUES ('onchain', 'onchain', '[]', 1, 60, 0, 'onchain', 'legacy', 99, NOW(), NOW())
        ON CONFLICT(job_name) DO UPDATE SET enabled=1, command='onchain', updated_at=NOW()
        """
    )
    pg_conn.execute(
        """
        INSERT INTO scheduler_manual_trigger
            (job_name, force, status, requested_by, requested_at)
        VALUES ('onchain', 1, 'queued', 'test', NOW())
        """
    )
    pg_conn.commit()

    # Presumably a run-once guard; clearing it lets init execute again -- confirm.
    scheduler_db._SCHEDULER_INIT_DONE = False
    scheduler_db.init_scheduler_tables()

    # The job is hidden from the public accessors...
    assert scheduler_db.get_job_config("onchain") is None
    assert "onchain" not in {item["job_name"] for item in scheduler_db.get_job_configs()}

    # ...but its row still exists, disabled and marked as retired.
    row = pg_conn.execute(
        "SELECT enabled, description FROM scheduler_job_config WHERE job_name='onchain'"
    ).fetchone()
    trigger = pg_conn.execute(
        "SELECT status, error_message FROM scheduler_manual_trigger "
        "WHERE job_name='onchain' ORDER BY id DESC LIMIT 1"
    ).fetchone()
    assert row["enabled"] == 0
    assert "已下线" in row["description"]  # "已下线" = "taken offline"
    assert trigger["status"] == "skipped"


def test_scheduler_control_api_and_page(monkeypatch, tmp_path):
    """The /cron page renders and the scheduler JSON endpoints update job state."""
    _init_temp_databases(monkeypatch, tmp_path)
    monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db)
    client = TestClient(web_server.app)

    page = client.get("/cron")
    assert page.status_code == 200
    assert "调度中心" in page.text  # "调度中心" = "Scheduler Center"

    # Find the inline script that talks to the scheduler API.
    scripts = _SCRIPT_BODY_RE.findall(page.text)
    scheduler_scripts = [s for s in scripts if "/api/scheduler/jobs" in s]
    assert scheduler_scripts

    # When node is available, make sure the script at least parses as JavaScript.
    # json.dumps emits a valid JS string literal; Python's repr() is not
    # guaranteed to (e.g. its \U escapes are not JavaScript escapes).
    if shutil.which("node"):
        script_literal = json.dumps(scheduler_scripts[-1])
        subprocess.run(
            ["node", "-e", f"new Function({script_literal})"],
            check=True,
            cwd=PROJECT_DIR,
        )

    # The page uses data-action attributes rather than inline event handlers.
    assert "data-action=\"trigger\"" in page.text
    assert "onchange=\"toggleJob" not in page.text
    assert "onclick=\"triggerJob" not in page.text

    resp = client.get("/api/scheduler/jobs")
    assert resp.status_code == 200
    assert any(item["job_name"] == "event" for item in resp.json()["jobs"])

    toggle = client.post("/api/scheduler/jobs/event/toggle", json={"enabled": False})
    assert toggle.status_code == 200
    assert scheduler_db.get_job_config("event")["enabled"] is False

    # A disabled job rejects a non-forced trigger but accepts a forced one.
    blocked = client.post("/api/scheduler/jobs/event/trigger", json={"force": False})
    assert blocked.status_code == 409

    forced = client.post("/api/scheduler/jobs/event/trigger", json={"force": True})
    assert forced.status_code == 200
    triggers = scheduler_db.list_manual_triggers()
    assert triggers[0]["job_name"] == "event"
    assert triggers[0]["force"] == 1

    interval = client.post("/api/scheduler/jobs/tracker/interval", json={"every_seconds": 240})
    assert interval.status_code == 200
    assert scheduler_db.get_job_config("tracker")["every_seconds"] == 240


class _FakeProc:
    """Minimal stand-in for ``subprocess.Popen`` that never spawns a process."""

    # Class-level counter so each fake process gets a distinct pid.
    _next_pid = 100

    def __init__(self, cmd, **kwargs):
        self.cmd = cmd
        self.pid = _FakeProc._next_pid
        _FakeProc._next_pid += 1
        # None means "still running", mirroring Popen.returncode.
        self.returncode = None
        # The fake doubles as its own stdout stream (see ``read``).
        self.stdout = self

    def poll(self):
        """Return the current return code (``None`` until ``kill`` is called)."""
        return self.returncode

    def read(self):
        """Return canned output."""
        return "ok"

    def kill(self):
        """Mark the fake process as killed."""
        self.returncode = -9


def _use_fake_popen(monkeypatch):
    """Replace the scheduler's Popen with ``_FakeProc`` and turn off DRY_RUN."""
    monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc)
    monkeypatch.setattr(scheduler, "DRY_RUN", False)


def test_scheduler_starts_different_lock_groups_concurrently(monkeypatch, tmp_path):
    """Due jobs in different lock groups are both started in one scheduling pass."""
    _init_temp_databases(monkeypatch, tmp_path)
    _use_fake_popen(monkeypatch)

    jobs = {
        "tracker": scheduler.Job(
            "tracker", "tracker", 180, lock_group="tracking_write", enabled=True, next_run=0
        ),
        "screener": scheduler.Job(
            "screener", "screener", 900, lock_group="screening_write", enabled=True, next_run=0
        ),
    }
    running = {}

    scheduler.schedule_due_jobs(jobs, running)

    assert set(running) == {"tracker", "screener"}


def test_scheduler_blocks_shared_lock_and_prevents_reentry(monkeypatch, tmp_path):
    """Jobs sharing a lock group run one at a time, and a running job is not restarted."""
    _init_temp_databases(monkeypatch, tmp_path)
    _use_fake_popen(monkeypatch)

    jobs = {
        "event": scheduler.Job(
            "event", "event", 60, lock_group="recommendation_write", enabled=True, next_run=0
        ),
        "confirm": scheduler.Job(
            "confirm", "confirm", 600, lock_group="recommendation_write", enabled=True, next_run=0
        ),
    }
    running = {}

    scheduler.schedule_due_jobs(jobs, running)

    # Only the first job of the shared lock group starts; the other is left pending.
    assert "event" in running
    assert "confirm" not in running
    assert jobs["confirm"].pending is True

    # Starting an already-running job is refused.
    again = scheduler.start_job(jobs["event"], running)
    assert again is False
    assert len(running) == 1


def test_disabled_job_does_not_auto_run_but_manual_force_can_start(monkeypatch, tmp_path):
    """A disabled job is skipped by the scheduler but can be started manually."""
    _init_temp_databases(monkeypatch, tmp_path)
    _use_fake_popen(monkeypatch)

    job = scheduler.Job(
        "event", "event", 60, lock_group="recommendation_write", enabled=False, next_run=0
    )
    running = {}

    scheduler.schedule_due_jobs({"event": job}, running)
    assert running == {}

    assert scheduler.start_job(job, running, run_kind="manual", trigger_id=1) is True
    assert "event" in running