"""SQLite-backed scheduler configuration and runtime state.""" import json import os import sqlite3 from datetime import datetime from pathlib import Path from app.db import altcoin_db REPO_ROOT = Path(__file__).resolve().parents[2] SCHEDULER_DB_PATH = os.getenv("ALPHAX_SCHEDULER_DB_PATH", str(REPO_ROOT / "data" / "scheduler_state.db")) def get_scheduler_conn(): path = Path(SCHEDULER_DB_PATH) path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(path), timeout=30, isolation_level=None) conn.row_factory = sqlite3.Row conn.execute("PRAGMA busy_timeout=30000") conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") return conn def get_main_conn(): return altcoin_db.get_conn() DEFAULT_JOBS = [ { "job_name": "event", "command": "event", "args": [], "every_seconds": 60, "initial_delay": 5, "lock_group": "recommendation_write", "description": "事件/舆情驱动技术检查", "sort_order": 10, }, { "job_name": "tracker", "command": "tracker", "args": [], "every_seconds": 180, "initial_delay": 20, "lock_group": "tracking_write", "description": "推荐价格跟踪", "sort_order": 20, }, { "job_name": "confirm", "command": "confirm", "args": [], "every_seconds": 600, "initial_delay": 40, "lock_group": "recommendation_write", "description": "确认层", "sort_order": 30, }, { "job_name": "screener", "command": "screener", "args": [], "every_seconds": 900, "initial_delay": 80, "lock_group": "screening_write", "description": "粗筛/细筛", "sort_order": 40, }, { "job_name": "sentiment", "command": "sentiment", "args": ["--collect"], "every_seconds": 1800, "initial_delay": 120, "lock_group": "sentiment_write", "description": "舆情采集", "sort_order": 50, }, { "job_name": "onchain", "command": "onchain", "args": [], "every_seconds": 1800, "initial_delay": 150, "lock_group": "onchain_write", "description": "链上异动追踪", "sort_order": 55, }, { "job_name": "llm-sentiment", "command": "llm-insights", "args": ["--scope", "sentiment", "--limit", "40"], "every_seconds": 1800, "initial_delay": 180, "lock_group": "llm_write", "description": "LLM 批量舆情分析", "sort_order": 60, }, { "job_name": "review", "command": "review", "args": [], "every_seconds": 86400, "initial_delay": 300, "lock_group": "review_write", "description": "复盘", "sort_order": 70, }, ] def _now(): return datetime.now().isoformat() def _dump(value): return json.dumps(value or [], ensure_ascii=False, default=str) def _load(value, fallback=None): try: return json.loads(value) if isinstance(value, str) else (value if value is not None else fallback) except Exception: return fallback def _create_runtime_table(conn): conn.execute("DROP TABLE IF EXISTS scheduler_runtime_status") conn.execute( """ CREATE TABLE scheduler_runtime_status ( job_name TEXT PRIMARY KEY, status TEXT DEFAULT 'idle', pid INTEGER DEFAULT 0, run_kind TEXT DEFAULT '', trigger_id INTEGER DEFAULT 0, locked_by TEXT DEFAULT '', next_run_at TEXT DEFAULT '', last_started_at TEXT DEFAULT '', last_finished_at TEXT DEFAULT '', last_exit_code INTEGER DEFAULT 0, last_duration_ms INTEGER DEFAULT 0, last_error TEXT DEFAULT '', output_tail TEXT DEFAULT '', updated_at TEXT NOT NULL ) """ ) def _create_config_table(conn): conn.execute("DROP TABLE IF EXISTS scheduler_job_config") conn.execute( """ CREATE TABLE scheduler_job_config ( job_name TEXT PRIMARY KEY, command TEXT NOT NULL, args_json TEXT DEFAULT '[]', enabled INTEGER DEFAULT 1, every_seconds INTEGER NOT NULL, initial_delay INTEGER DEFAULT 0, lock_group TEXT DEFAULT '', description TEXT DEFAULT '', sort_order INTEGER DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) def _create_manual_trigger_table(conn): conn.execute("DROP INDEX IF EXISTS idx_scheduler_trigger_status") conn.execute("DROP TABLE IF EXISTS scheduler_manual_trigger") conn.execute( """ CREATE TABLE scheduler_manual_trigger ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_name TEXT NOT NULL, force INTEGER DEFAULT 0, status TEXT DEFAULT 'queued', requested_by TEXT DEFAULT '', requested_at TEXT NOT NULL, started_at TEXT DEFAULT '', finished_at TEXT DEFAULT '', exit_code INTEGER DEFAULT 0, duration_ms INTEGER DEFAULT 0, output_tail TEXT DEFAULT '', error_message TEXT DEFAULT '' ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_scheduler_trigger_status ON scheduler_manual_trigger(status, requested_at)") def _reset_scheduler_tables(conn): _create_manual_trigger_table(conn) _create_runtime_table(conn) _create_config_table(conn) def _seed_scheduler_tables(conn): now = _now() for job in DEFAULT_JOBS: conn.execute( """ INSERT INTO scheduler_job_config ( job_name, command, args_json, enabled, every_seconds, initial_delay, lock_group, description, sort_order, created_at, updated_at ) VALUES (?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(job_name) DO UPDATE SET command=excluded.command, args_json=excluded.args_json, initial_delay=excluded.initial_delay, lock_group=excluded.lock_group, description=excluded.description, sort_order=excluded.sort_order, updated_at=scheduler_job_config.updated_at """, ( job["job_name"], job["command"], _dump(job.get("args")), int(job["every_seconds"]), int(job.get("initial_delay") or 0), job.get("lock_group") or "", job.get("description") or "", int(job.get("sort_order") or 0), now, now, ), ) conn.execute( """ INSERT INTO scheduler_runtime_status (job_name, status, updated_at) VALUES (?, 'idle', ?) ON CONFLICT(job_name) DO NOTHING """, (job["job_name"], now), ) def init_scheduler_tables(): conn = get_scheduler_conn() conn.execute( """ CREATE TABLE IF NOT EXISTS scheduler_job_config ( job_name TEXT PRIMARY KEY, command TEXT NOT NULL, args_json TEXT DEFAULT '[]', enabled INTEGER DEFAULT 1, every_seconds INTEGER NOT NULL, initial_delay INTEGER DEFAULT 0, lock_group TEXT DEFAULT '', description TEXT DEFAULT '', sort_order INTEGER DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) try: conn.execute( """ CREATE TABLE IF NOT EXISTS scheduler_runtime_status ( job_name TEXT PRIMARY KEY, status TEXT DEFAULT 'idle', pid INTEGER DEFAULT 0, run_kind TEXT DEFAULT '', trigger_id INTEGER DEFAULT 0, locked_by TEXT DEFAULT '', next_run_at TEXT DEFAULT '', last_started_at TEXT DEFAULT '', last_finished_at TEXT DEFAULT '', last_exit_code INTEGER DEFAULT 0, last_duration_ms INTEGER DEFAULT 0, last_error TEXT DEFAULT '', output_tail TEXT DEFAULT '', updated_at TEXT NOT NULL ) """ ) conn.execute("SELECT COUNT(*) FROM scheduler_runtime_status").fetchone() except Exception: _create_runtime_table(conn) conn.execute( """ CREATE TABLE IF NOT EXISTS scheduler_manual_trigger ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_name TEXT NOT NULL, force INTEGER DEFAULT 0, status TEXT DEFAULT 'queued', requested_by TEXT DEFAULT '', requested_at TEXT NOT NULL, started_at TEXT DEFAULT '', finished_at TEXT DEFAULT '', exit_code INTEGER DEFAULT 0, duration_ms INTEGER DEFAULT 0, output_tail TEXT DEFAULT '', error_message TEXT DEFAULT '' ) """ ) conn.execute("CREATE INDEX IF NOT EXISTS idx_scheduler_trigger_status ON scheduler_manual_trigger(status, requested_at)") try: _seed_scheduler_tables(conn) except Exception: _reset_scheduler_tables(conn) _seed_scheduler_tables(conn) conn.commit() conn.close() def get_job_configs(): init_scheduler_tables() conn = get_scheduler_conn() rows = conn.execute("SELECT * FROM scheduler_job_config ORDER BY sort_order ASC, job_name ASC").fetchall() conn.close() jobs = [] for row in rows: item = dict(row) item["args"] = _load(item.pop("args_json", "[]"), []) item["enabled"] = bool(item.get("enabled")) jobs.append(item) return jobs def get_job_config(job_name): init_scheduler_tables() conn = get_scheduler_conn() row = conn.execute("SELECT * FROM scheduler_job_config WHERE job_name=?", (job_name,)).fetchone() conn.close() if not row: return None item = dict(row) item["args"] = _load(item.pop("args_json", "[]"), []) item["enabled"] = bool(item.get("enabled")) return item def set_job_enabled(job_name, enabled): init_scheduler_tables() now = _now() conn = get_scheduler_conn() cur = conn.execute( "UPDATE scheduler_job_config SET enabled=?, updated_at=? WHERE job_name=?", (1 if enabled else 0, now, job_name), ) conn.commit() conn.close() return cur.rowcount > 0 def set_job_interval(job_name, every_seconds): seconds = max(30, int(every_seconds or 0)) init_scheduler_tables() now = _now() conn = get_scheduler_conn() cur = conn.execute( "UPDATE scheduler_job_config SET every_seconds=?, updated_at=? WHERE job_name=?", (seconds, now, job_name), ) conn.commit() conn.close() return cur.rowcount > 0 def update_runtime(job_name, **fields): init_scheduler_tables() allowed = { "status", "pid", "run_kind", "trigger_id", "locked_by", "next_run_at", "last_started_at", "last_finished_at", "last_exit_code", "last_duration_ms", "last_error", "output_tail", } values = {k: v for k, v in fields.items() if k in allowed} values["updated_at"] = _now() conn = get_scheduler_conn() try: conn.execute( "INSERT INTO scheduler_runtime_status (job_name, updated_at) VALUES (?, ?) ON CONFLICT(job_name) DO NOTHING", (job_name, values["updated_at"]), ) assignments = ", ".join([f"{k}=?" for k in values]) conn.execute( f"UPDATE scheduler_runtime_status SET {assignments} WHERE job_name=?", (*values.values(), job_name), ) except Exception: _create_runtime_table(conn) conn.execute( "INSERT INTO scheduler_runtime_status (job_name, updated_at) VALUES (?, ?)", (job_name, values["updated_at"]), ) assignments = ", ".join([f"{k}=?" for k in values]) conn.execute( f"UPDATE scheduler_runtime_status SET {assignments} WHERE job_name=?", (*values.values(), job_name), ) conn.commit() conn.close() def enqueue_manual_trigger(job_name, force=False, requested_by=""): init_scheduler_tables() if not get_job_config(job_name): return None conn = get_scheduler_conn() cur = conn.execute( """ INSERT INTO scheduler_manual_trigger (job_name, force, status, requested_by, requested_at) VALUES (?, ?, 'queued', ?, ?) """, (job_name, 1 if force else 0, requested_by or "", _now()), ) conn.commit() trigger_id = cur.lastrowid conn.close() return trigger_id def claim_manual_triggers(limit=10): init_scheduler_tables() conn = get_scheduler_conn() rows = conn.execute( """ SELECT * FROM scheduler_manual_trigger WHERE status IN ('queued', 'pending') ORDER BY requested_at ASC, id ASC LIMIT ? """, (int(limit or 10),), ).fetchall() conn.close() return [dict(row) for row in rows] def update_manual_trigger(trigger_id, **fields): init_scheduler_tables() allowed = {"status", "started_at", "finished_at", "exit_code", "duration_ms", "output_tail", "error_message"} values = {k: v for k, v in fields.items() if k in allowed} if not values: return conn = get_scheduler_conn() assignments = ", ".join([f"{k}=?" for k in values]) conn.execute( f"UPDATE scheduler_manual_trigger SET {assignments} WHERE id=?", (*values.values(), int(trigger_id)), ) conn.commit() conn.close() def list_manual_triggers(limit=30): init_scheduler_tables() limit = max(1, min(int(limit or 30), 100)) conn = get_scheduler_conn() rows = conn.execute( "SELECT * FROM scheduler_manual_trigger ORDER BY requested_at DESC, id DESC LIMIT ?", (limit,), ).fetchall() conn.close() return [dict(row) for row in rows] def get_scheduler_overview(): init_scheduler_tables() conn = get_scheduler_conn() configs = conn.execute("SELECT * FROM scheduler_job_config ORDER BY sort_order ASC, job_name ASC").fetchall() runtime_rows = conn.execute("SELECT * FROM scheduler_runtime_status").fetchall() conn.close() try: main_conn = get_main_conn() latest_rows = main_conn.execute( """ SELECT c.* FROM cron_run_log c JOIN ( SELECT job_name, MAX(id) AS max_id FROM cron_run_log GROUP BY job_name ) x ON x.max_id = c.id """ ).fetchall() main_conn.close() except Exception: latest_rows = [] runtime = {row["job_name"]: dict(row) for row in runtime_rows} latest = {row["job_name"]: dict(row) for row in latest_rows} jobs = [] for row in configs: item = dict(row) item["args"] = _load(item.pop("args_json", "[]"), []) item["enabled"] = bool(item.get("enabled")) item["runtime"] = runtime.get(item["job_name"], {}) item["latest_cron"] = latest.get(_display_job_name(item["job_name"]), latest.get(item["job_name"], {})) jobs.append(item) return {"jobs": jobs, "updated_at": _now()} def _display_job_name(job_name): return { "event": "事件舆情", "tracker": "跟踪", "confirm": "确认", "screener": "粗筛", "sentiment": "舆情", "onchain": "链上", "llm-sentiment": "AI舆情", "review": "复盘", }.get(job_name, job_name) __all__ = [ "DEFAULT_JOBS", "claim_manual_triggers", "enqueue_manual_trigger", "get_job_config", "get_job_configs", "get_scheduler_overview", "get_scheduler_conn", "init_scheduler_tables", "list_manual_triggers", "set_job_enabled", "set_job_interval", "update_manual_trigger", "update_runtime", ]