197 lines
7.4 KiB
Python
197 lines
7.4 KiB
Python
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import shutil
|
|
|
|
from fastapi.testclient import TestClient
|
|
|
|
PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
|
if PROJECT_DIR not in sys.path:
|
|
sys.path.insert(0, PROJECT_DIR)
|
|
|
|
from app.db import altcoin_db
|
|
from app.db import scheduler_db
|
|
from app.web import web_server
|
|
import docker.scheduler as scheduler
|
|
|
|
|
|
def test_scheduler_tables_seed_defaults(monkeypatch, tmp_path):
|
|
db_path = tmp_path / "altcoin_monitor.db"
|
|
sched_path = tmp_path / "scheduler_state.db"
|
|
monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
|
|
monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path))
|
|
altcoin_db.init_db()
|
|
scheduler_db.init_scheduler_tables()
|
|
|
|
jobs = {item["job_name"]: item for item in scheduler_db.get_job_configs()}
|
|
assert jobs["event"]["lock_group"] == "recommendation_write"
|
|
assert jobs["confirm"]["lock_group"] == "recommendation_write"
|
|
assert jobs["tracker"]["every_seconds"] == 180
|
|
assert jobs["paper-trader"]["lock_group"] == "paper_trading_write"
|
|
assert "onchain" not in jobs
|
|
|
|
|
|
def test_scheduler_init_retires_legacy_onchain_job(pg_conn):
|
|
pg_conn.execute(
|
|
"""
|
|
INSERT INTO scheduler_job_config (job_name, command, args_json, enabled, every_seconds, initial_delay, lock_group, description, sort_order, created_at, updated_at)
|
|
VALUES ('onchain', 'onchain', '[]', 1, 60, 0, 'onchain', 'legacy', 99, NOW(), NOW())
|
|
ON CONFLICT(job_name) DO UPDATE SET enabled=1, command='onchain', updated_at=NOW()
|
|
"""
|
|
)
|
|
pg_conn.execute(
|
|
"""
|
|
INSERT INTO scheduler_manual_trigger (job_name, force, status, requested_by, requested_at)
|
|
VALUES ('onchain', 1, 'queued', 'test', NOW())
|
|
"""
|
|
)
|
|
pg_conn.commit()
|
|
scheduler_db._SCHEDULER_INIT_DONE = False
|
|
|
|
scheduler_db.init_scheduler_tables()
|
|
|
|
assert scheduler_db.get_job_config("onchain") is None
|
|
assert "onchain" not in {item["job_name"] for item in scheduler_db.get_job_configs()}
|
|
row = pg_conn.execute("SELECT enabled, description FROM scheduler_job_config WHERE job_name='onchain'").fetchone()
|
|
trigger = pg_conn.execute("SELECT status, error_message FROM scheduler_manual_trigger WHERE job_name='onchain' ORDER BY id DESC LIMIT 1").fetchone()
|
|
assert row["enabled"] == 0
|
|
assert "已下线" in row["description"]
|
|
assert trigger["status"] == "skipped"
|
|
|
|
|
|
def test_scheduler_control_api_and_page(monkeypatch, tmp_path):
|
|
db_path = tmp_path / "altcoin_monitor.db"
|
|
sched_path = tmp_path / "scheduler_state.db"
|
|
monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
|
|
monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path))
|
|
monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db)
|
|
altcoin_db.init_db()
|
|
scheduler_db.init_scheduler_tables()
|
|
|
|
client = TestClient(web_server.app)
|
|
page = client.get("/cron")
|
|
assert page.status_code == 200
|
|
assert "调度中心" in page.text
|
|
scripts = re.findall(r"<script>([\s\S]*?)</script>", page.text)
|
|
scheduler_scripts = [s for s in scripts if "/api/scheduler/jobs" in s]
|
|
assert scheduler_scripts
|
|
if shutil.which("node"):
|
|
subprocess.run(
|
|
["node", "-e", f"new Function({scheduler_scripts[-1]!r})"],
|
|
check=True,
|
|
cwd=PROJECT_DIR,
|
|
)
|
|
assert "data-action=\"trigger\"" in page.text
|
|
assert "onchange=\"toggleJob" not in page.text
|
|
assert "onclick=\"triggerJob" not in page.text
|
|
|
|
resp = client.get("/api/scheduler/jobs")
|
|
assert resp.status_code == 200
|
|
assert any(item["job_name"] == "event" for item in resp.json()["jobs"])
|
|
|
|
toggle = client.post("/api/scheduler/jobs/event/toggle", json={"enabled": False})
|
|
assert toggle.status_code == 200
|
|
assert scheduler_db.get_job_config("event")["enabled"] is False
|
|
|
|
blocked = client.post("/api/scheduler/jobs/event/trigger", json={"force": False})
|
|
assert blocked.status_code == 409
|
|
|
|
forced = client.post("/api/scheduler/jobs/event/trigger", json={"force": True})
|
|
assert forced.status_code == 200
|
|
triggers = scheduler_db.list_manual_triggers()
|
|
assert triggers[0]["job_name"] == "event"
|
|
assert triggers[0]["force"] == 1
|
|
|
|
interval = client.post("/api/scheduler/jobs/tracker/interval", json={"every_seconds": 240})
|
|
assert interval.status_code == 200
|
|
assert scheduler_db.get_job_config("tracker")["every_seconds"] == 240
|
|
|
|
|
|
class _FakeProc:
|
|
_next_pid = 100
|
|
|
|
def __init__(self, cmd, **kwargs):
|
|
self.cmd = cmd
|
|
self.pid = _FakeProc._next_pid
|
|
_FakeProc._next_pid += 1
|
|
self.returncode = None
|
|
self.stdout = self
|
|
|
|
def poll(self):
|
|
return self.returncode
|
|
|
|
def read(self):
|
|
return "ok"
|
|
|
|
def kill(self):
|
|
self.returncode = -9
|
|
|
|
|
|
def test_scheduler_starts_different_lock_groups_concurrently(monkeypatch, tmp_path):
|
|
db_path = tmp_path / "altcoin_monitor.db"
|
|
sched_path = tmp_path / "scheduler_state.db"
|
|
monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
|
|
monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path))
|
|
altcoin_db.init_db()
|
|
scheduler_db.init_scheduler_tables()
|
|
monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc)
|
|
monkeypatch.setattr(scheduler, "DRY_RUN", False)
|
|
|
|
jobs = {
|
|
"tracker": scheduler.Job("tracker", "tracker", 180, lock_group="tracking_write", enabled=True, next_run=0),
|
|
"screener": scheduler.Job("screener", "screener", 900, lock_group="screening_write", enabled=True, next_run=0),
|
|
}
|
|
running = {}
|
|
|
|
scheduler.schedule_due_jobs(jobs, running)
|
|
|
|
assert set(running) == {"tracker", "screener"}
|
|
|
|
|
|
def test_scheduler_blocks_shared_lock_and_prevents_reentry(monkeypatch, tmp_path):
|
|
db_path = tmp_path / "altcoin_monitor.db"
|
|
sched_path = tmp_path / "scheduler_state.db"
|
|
monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
|
|
monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path))
|
|
altcoin_db.init_db()
|
|
scheduler_db.init_scheduler_tables()
|
|
monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc)
|
|
monkeypatch.setattr(scheduler, "DRY_RUN", False)
|
|
|
|
jobs = {
|
|
"event": scheduler.Job("event", "event", 60, lock_group="recommendation_write", enabled=True, next_run=0),
|
|
"confirm": scheduler.Job("confirm", "confirm", 600, lock_group="recommendation_write", enabled=True, next_run=0),
|
|
}
|
|
running = {}
|
|
|
|
scheduler.schedule_due_jobs(jobs, running)
|
|
|
|
assert "event" in running
|
|
assert "confirm" not in running
|
|
assert jobs["confirm"].pending is True
|
|
|
|
again = scheduler.start_job(jobs["event"], running)
|
|
assert again is False
|
|
assert len(running) == 1
|
|
|
|
|
|
def test_disabled_job_does_not_auto_run_but_manual_force_can_start(monkeypatch, tmp_path):
|
|
db_path = tmp_path / "altcoin_monitor.db"
|
|
sched_path = tmp_path / "scheduler_state.db"
|
|
monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
|
|
monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path))
|
|
altcoin_db.init_db()
|
|
scheduler_db.init_scheduler_tables()
|
|
monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc)
|
|
monkeypatch.setattr(scheduler, "DRY_RUN", False)
|
|
|
|
job = scheduler.Job("event", "event", 60, lock_group="recommendation_write", enabled=False, next_run=0)
|
|
running = {}
|
|
|
|
scheduler.schedule_due_jobs({"event": job}, running)
|
|
assert running == {}
|
|
|
|
assert scheduler.start_job(job, running, run_kind="manual", trigger_id=1) is True
|
|
assert "event" in running
|