diff --git a/backend/app/api/paper_trading.py b/backend/app/api/paper_trading.py index fa63325..9213567 100644 --- a/backend/app/api/paper_trading.py +++ b/backend/app/api/paper_trading.py @@ -10,6 +10,7 @@ from app.services.paper_trading_service import get_paper_trading_service from app.services.price_monitor_service import get_price_monitor_service from app.services.bitget_service import bitget_service from app.services.db_service import db_service +from app.services.runtime_status_service import get_runtime_status from app.utils.logger import logger from app.crypto_agent.crypto_agent import get_crypto_agent @@ -485,6 +486,11 @@ async def get_monitor_status(): # 始终显示配置的交易对价格 configured_symbols = settings.crypto_symbols.split(',') + for symbol in configured_symbols: + symbol = symbol.strip().upper() + if symbol: + monitor.subscribe_symbol(symbol) + # 获取价格 - 优先使用监控服务的缓存价格 latest_prices = dict(monitor.latest_prices) @@ -498,9 +504,11 @@ async def get_monitor_status(): return { "success": True, - "running": True, + "running": monitor.is_running(), + "mode": "websocket" if getattr(monitor, "_use_websocket", False) else "polling", "subscribed_symbols": configured_symbols, - "latest_prices": latest_prices + "latest_prices": latest_prices, + "execution_loop": get_runtime_status("price_monitor_loop"), } except Exception as e: logger.error(f"获取监控状态失败: {e}") diff --git a/backend/app/main.py b/backend/app/main.py index aea52e4..09c31a8 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -26,8 +26,14 @@ async def price_monitor_loop(): from app.services.bitget_service import bitget_service from app.services.feishu_service import get_feishu_paper_trading_service from app.services.telegram_service import get_telegram_service + from app.services.runtime_status_service import ( + mark_runtime_started, + mark_runtime_heartbeat, + mark_runtime_error, + ) logger.info("后台价格监控任务已启动(轮询模式)") + mark_runtime_started("price_monitor_loop", mode="rest_polling") feishu = get_feishu_paper_trading_service() # 使用 trading webhook telegram = get_telegram_service() @@ -138,6 +144,11 @@ async def price_monitor_loop(): try: # 获取活跃订单 active_orders = paper_trading.get_active_orders() + mark_runtime_heartbeat( + "price_monitor_loop", + active_orders=len(active_orders), + last_symbols=sorted({order.get('symbol') for order in active_orders if order.get('symbol')}), + ) if not active_orders: await asyncio.sleep(10) # 没有活跃订单时,10秒检查一次 continue @@ -316,6 +327,7 @@ async def price_monitor_loop(): except Exception as e: logger.error(f"价格监控循环出错: {e}") + mark_runtime_error("price_monitor_loop", str(e)) await asyncio.sleep(5) diff --git a/backend/app/services/bitget_websocket.py b/backend/app/services/bitget_websocket.py index 16e0bdc..1de87e8 100644 --- a/backend/app/services/bitget_websocket.py +++ b/backend/app/services/bitget_websocket.py @@ -111,12 +111,13 @@ class BitgetWebSocketClient: logger.info("Bitget WebSocket 已断开") - async def subscribe(self, symbols: list) -> bool: + async def subscribe(self, symbols: list, force: bool = False) -> bool: """ 订阅交易对价格 Args: symbols: 交易对列表,如 ['BTCUSDT', 'ETHUSDT'] + force: 是否强制重新发送订阅请求(用于重连后恢复订阅) Returns: 是否订阅成功 @@ -130,13 +131,12 @@ class BitgetWebSocketClient: # USDT-FUTURES = USDT 永续合约 args = [] for symbol in symbols: - if symbol not in self._subscribed_symbols: + if force or symbol not in self._subscribed_symbols: args.append({ "instType": "USDT-FUTURES", "channel": "ticker", "instId": symbol }) - self._subscribed_symbols.add(symbol) if not args: logger.info("所有交易对已订阅") @@ -148,6 +148,8 @@ class BitgetWebSocketClient: } await self._ws.send(json.dumps(message)) + for item in args: + self._subscribed_symbols.add(item["instId"]) logger.info(f"✅ 订阅 {len(args)} 个交易对: {[s['instId'] for s in args]}") return True @@ -397,7 +399,7 @@ class BitgetWebSocketClient: if await self.connect(): # 重新订阅之前的交易对 if self._subscribed_symbols: - await self.subscribe(list(self._subscribed_symbols)) + await self.subscribe(list(self._subscribed_symbols), force=True) self._reconnect_task = asyncio.create_task(reconnect()) diff --git a/backend/app/services/price_monitor_service.py b/backend/app/services/price_monitor_service.py index 2212491..92325fc 100644 --- a/backend/app/services/price_monitor_service.py +++ b/backend/app/services/price_monitor_service.py @@ -314,6 +314,9 @@ class PriceMonitorService: if callback not in self.price_callbacks: self.price_callbacks.append(callback) + if not self.running: + self.start() + def remove_price_callback(self, callback: Callable): """移除价格回调函数""" with self._lock: @@ -352,4 +355,7 @@ _price_monitor_service: Optional[PriceMonitorService] = None def get_price_monitor_service() -> PriceMonitorService: """获取价格监控服务单例""" # 直接使用类单例,不使用全局变量(避免 reload 时重置) - return PriceMonitorService() + service = PriceMonitorService() + if not service.running: + service.start() + return service diff --git a/backend/app/services/runtime_status_service.py b/backend/app/services/runtime_status_service.py new file mode 100644 index 0000000..c036104 --- /dev/null +++ b/backend/app/services/runtime_status_service.py @@ -0,0 +1,61 @@ +""" +运行时状态服务 + +用于暴露后台任务(如价格监控/模拟盘执行循环)的运行状态, +方便页面与 API 判断“前端价格链路”和“后台执行链路”是否正常。 +""" +from __future__ import annotations + +from copy import deepcopy +from datetime import datetime +from typing import Any, Dict + + +_runtime_status: Dict[str, Dict[str, Any]] = { + "price_monitor_loop": { + "running": False, + "started_at": None, + "last_heartbeat_at": None, + "last_error": "", + "mode": "rest_polling", + "active_orders": 0, + "last_symbols": [], + } +} + + +def mark_runtime_started(task_name: str, **extra: Any) -> None: + state = _runtime_status.setdefault(task_name, {}) + now = datetime.now().isoformat() + state["running"] = True + state["started_at"] = state.get("started_at") or now + state["last_heartbeat_at"] = now + state["last_error"] = "" + state.update(extra) + + +def mark_runtime_heartbeat(task_name: str, **extra: Any) -> None: + state = _runtime_status.setdefault(task_name, {}) + state["running"] = True + state["last_heartbeat_at"] = datetime.now().isoformat() + state.update(extra) + + +def mark_runtime_error(task_name: str, error: str, **extra: Any) -> None: + state = _runtime_status.setdefault(task_name, {}) + state["running"] = True + state["last_heartbeat_at"] = datetime.now().isoformat() + state["last_error"] = str(error or "") + state.update(extra) + + +def mark_runtime_stopped(task_name: str, **extra: Any) -> None: + state = _runtime_status.setdefault(task_name, {}) + state["running"] = False + state["last_heartbeat_at"] = datetime.now().isoformat() + state.update(extra) + + +def get_runtime_status(task_name: str) -> Dict[str, Any]: + return deepcopy(_runtime_status.get(task_name, {})) + diff --git a/frontend/trading.html b/frontend/trading.html index 5e194ae..24898b7 100644 --- a/frontend/trading.html +++ b/frontend/trading.html @@ -425,7 +425,12 @@
-
实时价格
+
+ 实时价格 + + {{ priceMonitor.mode === 'websocket' ? 'WebSocket' : '轮询' }} · {{ priceMonitor.running ? '运行中' : '未运行' }} + +
{{ symbol }} @@ -434,6 +439,22 @@
+
+
+ 模拟盘执行链路 + + {{ executionLoop.running ? '运行中' : '未运行' }} + +
+
+ 心跳: {{ executionLoop.last_heartbeat_at || '-' }}
+ 活跃订单: {{ executionLoop.active_orders ?? 0 }}
+ 标的: {{ (executionLoop.last_symbols || []).join(', ') || '-' }}
+ 错误: {{ executionLoop.last_error }} + 错误: 无 +
+
+