alphax/app/db/scheduler_db.py
2026-05-25 21:57:38 +08:00

409 lines
12 KiB
Python

"""PostgreSQL-backed scheduler configuration and runtime state."""
import json
from datetime import datetime
from app.db import altcoin_db
from app.db.postgres_connection import connect as pg_connect, ensure_migrations_once
# Process-local flag: init_scheduler_tables() flips it to True after
# ensure_migrations_once() has returned, so migrations are requested only
# once per process.
_SCHEDULER_INIT_DONE = False
def get_scheduler_conn():
    """Open a new connection for the scheduler tables via ``pg_connect``.

    The caller owns the returned connection and is expected to close it.
    """
    connection = pg_connect()
    return connection
def get_main_conn():
    """Open a connection through ``altcoin_db.get_conn``.

    Used in this module to read ``cron_run_log``; the caller is expected to
    close the returned connection.
    """
    connection = altcoin_db.get_conn()
    return connection
def _default_job(job_name, *, every_seconds, initial_delay, lock_group,
                 description, sort_order, command=None, args=()):
    """Build one default job definition as a plain dict.

    ``command`` defaults to ``job_name`` when omitted, and ``args`` is copied
    into a fresh list so every job owns its own argument list.
    """
    return {
        "job_name": job_name,
        "command": job_name if command is None else command,
        "args": list(args),
        "every_seconds": every_seconds,
        "initial_delay": initial_delay,
        "lock_group": lock_group,
        "description": description,
        "sort_order": sort_order,
    }


# Built-in job definitions that _seed_scheduler_tables() upserts into
# scheduler_job_config. Intervals and delays are in seconds; sort_order
# controls the listing order used by the config queries.
DEFAULT_JOBS = [
    _default_job(
        "event",
        args=("--limit", "4", "--max-seconds", "50"),
        every_seconds=60,
        initial_delay=5,
        lock_group="recommendation_write",
        description="事件/舆情驱动技术检查",
        sort_order=10,
    ),
    _default_job(
        "tracker",
        every_seconds=180,
        initial_delay=20,
        lock_group="tracking_write",
        description="推荐价格跟踪",
        sort_order=20,
    ),
    _default_job(
        "paper-trader",
        every_seconds=180,
        initial_delay=30,
        lock_group="paper_trading_write",
        description="策略交易账本同步",
        sort_order=25,
    ),
    _default_job(
        "market",
        every_seconds=180,
        initial_delay=35,
        lock_group="market_write",
        description="全市场快照采集",
        sort_order=28,
    ),
    _default_job(
        "confirm",
        every_seconds=600,
        initial_delay=40,
        lock_group="recommendation_write",
        description="确认层",
        sort_order=30,
    ),
    _default_job(
        "screener",
        every_seconds=900,
        initial_delay=80,
        lock_group="screening_write",
        description="粗筛/细筛",
        sort_order=40,
    ),
    _default_job(
        "sentiment",
        args=("--collect",),
        every_seconds=1800,
        initial_delay=120,
        lock_group="sentiment_write",
        description="舆情采集",
        sort_order=50,
    ),
    _default_job(
        "onchain",
        every_seconds=1800,
        initial_delay=150,
        lock_group="onchain_write",
        description="链上异动追踪",
        sort_order=55,
    ),
    _default_job(
        "llm-sentiment",
        command="llm-insights",
        args=("--scope", "sentiment", "--limit", "40"),
        every_seconds=1800,
        initial_delay=180,
        lock_group="llm_write",
        description="LLM 批量舆情分析",
        sort_order=60,
    ),
    _default_job(
        "review",
        every_seconds=86400,
        initial_delay=300,
        lock_group="review_write",
        description="复盘",
        sort_order=70,
    ),
]
def _now():
    """Return the current local time as a naive ISO-8601 string."""
    current = datetime.now()
    return current.isoformat()
def _dump(value):
    """Serialize ``value`` to a JSON string, keeping non-ASCII text readable.

    Any falsy value (``None``, empty list, ``0``, ...) is stored as ``[]``;
    objects JSON cannot encode natively are stringified with ``str``.
    """
    payload = value if value else []
    return json.dumps(payload, ensure_ascii=False, default=str)
def _load(value, fallback=None):
    """Decode a JSON string, tolerating values that are already decoded.

    - ``str``: parsed with ``json.loads``; ``fallback`` is returned if
      parsing fails.
    - ``None``: ``fallback`` is returned.
    - anything else: returned unchanged.
    """
    if not isinstance(value, str):
        return fallback if value is None else value
    try:
        return json.loads(value)
    except Exception:
        return fallback
def _seed_scheduler_tables(conn):
    """Upsert every entry of ``DEFAULT_JOBS`` using the given connection.

    For each job this writes two statements:

    * an upsert into ``scheduler_job_config`` -- on conflict the command,
      args, initial delay, lock group, description and sort order are
      refreshed from the defaults, while ``enabled``, ``every_seconds`` and
      ``updated_at`` keep their stored values;
    * an insert of an ``idle`` row into ``scheduler_runtime_status`` that is
      skipped when the row already exists.

    The caller is responsible for committing and closing ``conn``.
    """
    upsert_config_sql = """
        INSERT INTO scheduler_job_config (
        job_name, command, args_json, enabled, every_seconds, initial_delay,
        lock_group, description, sort_order, created_at, updated_at
        ) VALUES (%s, %s, %s, 1, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT(job_name) DO UPDATE SET
        command=excluded.command,
        args_json=excluded.args_json,
        initial_delay=excluded.initial_delay,
        lock_group=excluded.lock_group,
        description=excluded.description,
        sort_order=excluded.sort_order,
        updated_at=scheduler_job_config.updated_at
        """
    ensure_runtime_sql = """
        INSERT INTO scheduler_runtime_status (job_name, status, updated_at)
        VALUES (%s, 'idle', %s)
        ON CONFLICT(job_name) DO NOTHING
        """
    stamp = _now()  # one timestamp shared by every row written in this pass
    for spec in DEFAULT_JOBS:
        config_params = (
            spec["job_name"],
            spec["command"],
            _dump(spec.get("args")),
            int(spec["every_seconds"]),
            int(spec.get("initial_delay") or 0),
            spec.get("lock_group") or "",
            spec.get("description") or "",
            int(spec.get("sort_order") or 0),
            stamp,  # created_at
            stamp,  # updated_at
        )
        conn.execute(upsert_config_sql, config_params)
        conn.execute(ensure_runtime_sql, (spec["job_name"], stamp))
def init_scheduler_tables():
    """Ensure migrations have been requested, then seed the default jobs.

    ``ensure_migrations_once`` is invoked only until it has returned
    successfully once in this process; the flag is set afterwards, so a
    failing migration is retried on the next call. Seeding runs on every
    call, on a fresh connection that is always closed.
    """
    global _SCHEDULER_INIT_DONE
    migrations_pending = not _SCHEDULER_INIT_DONE
    if migrations_pending:
        ensure_migrations_once()
        _SCHEDULER_INIT_DONE = True
    seed_conn = get_scheduler_conn()
    try:
        _seed_scheduler_tables(seed_conn)
        seed_conn.commit()
    finally:
        seed_conn.close()
def get_job_configs():
    """Return all job configurations ordered by ``sort_order`` then name.

    Each row is converted to a dict in which ``args_json`` is replaced by a
    decoded ``args`` value (``[]`` when missing or undecodable) and
    ``enabled`` is coerced to ``bool``.
    """
    init_scheduler_tables()
    conn = get_scheduler_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM scheduler_job_config ORDER BY sort_order ASC, job_name ASC"
        ).fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()
    jobs = []
    for row in rows:
        item = dict(row)
        item["args"] = _load(item.pop("args_json", "[]"), [])
        item["enabled"] = bool(item.get("enabled"))
        jobs.append(item)
    return jobs
def get_job_config(job_name):
    """Return the configuration dict for ``job_name``, or ``None`` if absent.

    The returned dict carries a decoded ``args`` value in place of
    ``args_json`` and a boolean ``enabled``.
    """
    init_scheduler_tables()
    conn = get_scheduler_conn()
    try:
        row = conn.execute(
            "SELECT * FROM scheduler_job_config WHERE job_name=%s", (job_name,)
        ).fetchone()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()
    if not row:
        return None
    item = dict(row)
    item["args"] = _load(item.pop("args_json", "[]"), [])
    item["enabled"] = bool(item.get("enabled"))
    return item
def set_job_enabled(job_name, enabled):
    """Enable or disable a job.

    ``enabled`` is stored as ``1``/``0`` based on its truthiness.

    Returns:
        ``True`` if a row for ``job_name`` was updated, ``False`` otherwise.
    """
    init_scheduler_tables()
    now = _now()
    conn = get_scheduler_conn()
    try:
        cur = conn.execute(
            "UPDATE scheduler_job_config SET enabled=%s, updated_at=%s WHERE job_name=%s",
            (1 if enabled else 0, now, job_name),
        )
        # Read the row count while the connection is still open.
        updated = cur.rowcount > 0
        conn.commit()
    finally:
        # Close even when the statement or commit fails (no leaked connection).
        conn.close()
    return updated
def set_job_interval(job_name, every_seconds):
    """Set the run interval of a job, with a floor of 30 seconds.

    ``every_seconds`` is converted with ``int`` (falsy values count as 0), so
    a non-numeric value raises before any database work is done.

    Returns:
        ``True`` if a row for ``job_name`` was updated, ``False`` otherwise.
    """
    seconds = max(30, int(every_seconds or 0))
    init_scheduler_tables()
    now = _now()
    conn = get_scheduler_conn()
    try:
        cur = conn.execute(
            "UPDATE scheduler_job_config SET every_seconds=%s, updated_at=%s WHERE job_name=%s",
            (seconds, now, job_name),
        )
        # Read the row count while the connection is still open.
        updated = cur.rowcount > 0
        conn.commit()
    finally:
        # Close even when the statement or commit fails (no leaked connection).
        conn.close()
    return updated
def update_runtime(job_name, **fields):
    """Create or update the runtime-status row of ``job_name``.

    Only keyword arguments whose names are in the allow-list below are
    written; anything else is silently ignored. ``updated_at`` is always
    refreshed. The row is inserted first if it does not exist yet.

    Raises:
        Whatever the database layer raises. There is deliberately no
        "create the table and retry" fallback: the helper it used to call
        (``_create_runtime_table``) is not defined in this module, so that
        path could only raise ``NameError`` and hide the real error.
    """
    init_scheduler_tables()
    allowed = {
        "status", "pid", "run_kind", "trigger_id", "locked_by", "next_run_at",
        "last_started_at", "last_finished_at", "last_exit_code", "last_duration_ms",
        "last_error", "output_tail",
    }
    values = {k: v for k, v in fields.items() if k in allowed}
    values["updated_at"] = _now()
    # Column names come only from the allow-list above, so interpolating them
    # into the statement is safe; the values themselves stay parameterized.
    assignments = ", ".join(f"{column}=%s" for column in values)
    conn = get_scheduler_conn()
    try:
        conn.execute(
            "INSERT INTO scheduler_runtime_status (job_name, updated_at) VALUES (%s, %s) ON CONFLICT(job_name) DO NOTHING",
            (job_name, values["updated_at"]),
        )
        conn.execute(
            f"UPDATE scheduler_runtime_status SET {assignments} WHERE job_name=%s",
            (*values.values(), job_name),
        )
        conn.commit()
    finally:
        # Always release the connection; closing without a commit leaves the
        # failed transaction unapplied.
        conn.close()
def enqueue_manual_trigger(job_name, force=False, requested_by=""):
    """Queue a manual run request for ``job_name``.

    Args:
        job_name: Name of a configured job.
        force: Stored as ``1``/``0`` based on truthiness.
        requested_by: Free-form requester label; falsy values are stored as "".

    Returns:
        The id of the new ``scheduler_manual_trigger`` row, or ``None`` when
        no configuration exists for ``job_name``.
    """
    init_scheduler_tables()
    if not get_job_config(job_name):
        return None
    conn = get_scheduler_conn()
    try:
        cursor = conn.execute(
            """
            INSERT INTO scheduler_manual_trigger (job_name, force, status, requested_by, requested_at)
            VALUES (%s, %s, 'queued', %s, %s)
            RETURNING id
            """,
            (job_name, 1 if force else 0, requested_by or "", _now()),
        )
        trigger_id = cursor.fetchone()["id"]
        conn.commit()
    finally:
        # Close even when the insert or commit fails (no leaked connection).
        conn.close()
    return trigger_id
def claim_manual_triggers(limit=10):
    """Return up to ``limit`` manual triggers that are still waiting to run.

    Rows with status ``queued`` or ``pending`` are returned oldest first as
    plain dicts. A falsy ``limit`` falls back to 10.

    NOTE(review): despite the name, this only reads the rows -- it does not
    change their status. Presumably callers mark a trigger as started via
    ``update_manual_trigger``; confirm if several schedulers can run at once.
    """
    init_scheduler_tables()
    conn = get_scheduler_conn()
    try:
        rows = conn.execute(
            """
            SELECT * FROM scheduler_manual_trigger
            WHERE status IN ('queued', 'pending')
            ORDER BY requested_at ASC, id ASC
            LIMIT %s
            """,
            (int(limit or 10),),
        ).fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()
    return [dict(row) for row in rows]
def update_manual_trigger(trigger_id, **fields):
    """Update selected columns of one manual-trigger row.

    Only keyword arguments named in the allow-list below are written; if
    none remain, the function returns without touching the trigger table.
    ``trigger_id`` is converted with ``int``.
    """
    init_scheduler_tables()
    allowed = {"status", "started_at", "finished_at", "exit_code", "duration_ms", "output_tail", "error_message"}
    values = {k: v for k, v in fields.items() if k in allowed}
    if not values:
        return
    # Column names come only from the allow-list above, so interpolating them
    # into the statement is safe; the values themselves stay parameterized.
    assignments = ", ".join(f"{column}=%s" for column in values)
    conn = get_scheduler_conn()
    try:
        conn.execute(
            f"UPDATE scheduler_manual_trigger SET {assignments} WHERE id=%s",
            (*values.values(), int(trigger_id)),
        )
        conn.commit()
    finally:
        # Close even when int(trigger_id), the update or the commit fails.
        conn.close()
def list_manual_triggers(limit=30):
    """Return the most recent manual triggers, newest first, as dicts.

    ``limit`` is clamped to the range 1..100; a falsy value falls back to 30.
    """
    init_scheduler_tables()
    limit = max(1, min(int(limit or 30), 100))
    conn = get_scheduler_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM scheduler_manual_trigger ORDER BY requested_at DESC, id DESC LIMIT %s",
            (limit,),
        ).fetchall()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()
    return [dict(row) for row in rows]
def get_scheduler_overview():
    """Return every job's config together with its runtime and latest cron row.

    Returns:
        ``{"jobs": [...], "updated_at": <ISO timestamp>}`` where each job dict
        is its configuration row (decoded ``args``, boolean ``enabled``) plus:

        * ``runtime``: the matching ``scheduler_runtime_status`` row, or ``{}``;
        * ``latest_cron``: the newest ``cron_run_log`` row for the job, looked
          up first by its display name and then by its raw name, or ``{}``.
    """
    init_scheduler_tables()
    conn = get_scheduler_conn()
    try:
        configs = conn.execute(
            "SELECT * FROM scheduler_job_config ORDER BY sort_order ASC, job_name ASC"
        ).fetchall()
        runtime_rows = conn.execute("SELECT * FROM scheduler_runtime_status").fetchall()
    finally:
        # Close even when a query fails so the connection is not leaked.
        conn.close()
    # The cron log is read on a separate connection on a best-effort basis:
    # any failure there degrades to "no latest run" instead of breaking the
    # overview.
    try:
        main_conn = get_main_conn()
        try:
            latest_rows = main_conn.execute(
                """
                SELECT c.*
                FROM cron_run_log c
                JOIN (
                SELECT job_name, MAX(id) AS max_id
                FROM cron_run_log
                GROUP BY job_name
                ) x ON x.max_id = c.id
                """
            ).fetchall()
        finally:
            # Release the main connection even when the query fails.
            main_conn.close()
    except Exception:
        latest_rows = []
    runtime = {row["job_name"]: dict(row) for row in runtime_rows}
    latest = {row["job_name"]: dict(row) for row in latest_rows}
    jobs = []
    for row in configs:
        item = dict(row)
        item["args"] = _load(item.pop("args_json", "[]"), [])
        item["enabled"] = bool(item.get("enabled"))
        item["runtime"] = runtime.get(item["job_name"], {})
        item["latest_cron"] = latest.get(_display_job_name(item["job_name"]), latest.get(item["job_name"], {}))
        jobs.append(item)
    return {"jobs": jobs, "updated_at": _now()}
def _display_job_name(job_name):
    """Translate a scheduler job name into its display label.

    ``get_scheduler_overview`` tries this label first when matching
    ``cron_run_log`` rows. Names without a label are returned unchanged.
    """
    labels = dict([
        ("event", "事件舆情"),
        ("tracker", "跟踪"),
        ("confirm", "确认"),
        ("screener", "粗筛"),
        ("sentiment", "舆情"),
        ("onchain", "链上"),
        ("llm-sentiment", "AI舆情"),
        ("paper-trader", "策略交易"),
        ("review", "复盘"),
    ])
    if job_name in labels:
        return labels[job_name]
    return job_name
# Names exported by ``from <module> import *``. get_main_conn and the
# underscore-prefixed helpers are not listed.
__all__ = [
    "DEFAULT_JOBS",
    "claim_manual_triggers",
    "enqueue_manual_trigger",
    "get_job_config",
    "get_job_configs",
    "get_scheduler_overview",
    "get_scheduler_conn",
    "init_scheduler_tables",
    "list_manual_triggers",
    "set_job_enabled",
    "set_job_interval",
    "update_manual_trigger",
    "update_runtime",
]