"""
Runtime status service.

Exposes the running state of background tasks (for example the price
monitor / paper-trading execution loop) so that pages and APIs can tell
whether the "frontend price chain" and the "backend execution chain" are
healthy.

State is kept in a module-level dict keyed by task name. Timestamps are
ISO-8601 strings produced by ``datetime.now()`` (naive, local time).
No locking is performed in this module.
"""
from __future__ import annotations

from copy import deepcopy
from datetime import datetime
from typing import Any, Dict

# Per-task status records. "price_monitor_loop" is pre-seeded with its
# default shape; any other task name gets an empty record on first use.
_runtime_status: Dict[str, Dict[str, Any]] = {
    "price_monitor_loop": {
        "running": False,
        "started_at": None,
        "last_heartbeat_at": None,
        "last_error": "",
        "mode": "rest_polling",
        "active_orders": 0,
        "last_symbols": [],
    }
}


def _now_iso() -> str:
    """Return the current local time as an ISO-8601 string."""
    return datetime.now().isoformat()


def _touch(task_name: str, running: bool) -> Dict[str, Any]:
    """Fetch (or create) the record for *task_name*, set its ``running``
    flag and refresh ``last_heartbeat_at``; return the live record."""
    state = _runtime_status.setdefault(task_name, {})
    state["running"] = running
    state["last_heartbeat_at"] = _now_iso()
    return state


def mark_runtime_started(task_name: str, **extra: Any) -> None:
    """Record that *task_name* has started.

    Sets ``running`` to True, clears ``last_error`` and refreshes
    ``last_heartbeat_at``. ``started_at`` is set to the current time when
    the task was not already running (first start or restart after a stop)
    or when no start time is recorded; repeated calls while the task is
    running keep the existing ``started_at``.

    Args:
        task_name: Key of the task record to update (created if missing).
        **extra: Additional fields merged into the record last, so they
            override any of the fields set above.
    """
    state = _runtime_status.setdefault(task_name, {})
    now = _now_iso()
    # A start that follows a stop begins a new run, so its start time must
    # not be inherited from the previous run.
    was_running = bool(state.get("running"))
    state["running"] = True
    if not was_running or not state.get("started_at"):
        state["started_at"] = now
    state["last_heartbeat_at"] = now
    state["last_error"] = ""
    state.update(extra)


def mark_runtime_heartbeat(task_name: str, **extra: Any) -> None:
    """Record a heartbeat for *task_name*.

    Sets ``running`` to True and refreshes ``last_heartbeat_at``; any
    ``extra`` fields are merged into the record afterwards.
    """
    _touch(task_name, True).update(extra)


def mark_runtime_error(task_name: str, error: str, **extra: Any) -> None:
    """Record an error for *task_name* without marking it stopped.

    ``running`` stays True and ``last_heartbeat_at`` is refreshed.
    ``last_error`` is stored as ``str(error or "")``, so a falsy *error*
    becomes the empty string. ``extra`` fields are merged in last.
    """
    state = _touch(task_name, True)
    state["last_error"] = str(error or "")
    state.update(extra)


def mark_runtime_stopped(task_name: str, **extra: Any) -> None:
    """Record that *task_name* has stopped.

    Sets ``running`` to False and refreshes ``last_heartbeat_at``;
    ``last_error`` and ``started_at`` are left untouched. ``extra`` fields
    are merged in last.
    """
    _touch(task_name, False).update(extra)


def get_runtime_status(task_name: str) -> Dict[str, Any]:
    """Return a deep copy of the record for *task_name*.

    An unknown task name yields an empty dict and does not create a
    record. The copy means callers cannot mutate the stored state.
    """
    return deepcopy(_runtime_status.get(task_name, {}))