diff --git a/backend/app/services/websocket_monitor.py b/backend/app/services/websocket_monitor.py index 81572e8..b53b140 100644 --- a/backend/app/services/websocket_monitor.py +++ b/backend/app/services/websocket_monitor.py @@ -47,6 +47,7 @@ class WebSocketPriceMonitor: """ symbol = symbol.upper() + need_start = False with self._lock: if symbol in self._subscribed_symbols: logger.debug(f"[WS:{id(self)}] {symbol} 已订阅,跳过") @@ -54,12 +55,16 @@ class WebSocketPriceMonitor: self._subscribed_symbols.add(symbol) - # 如果服务未运行,自动启动 + # 检查是否需要启动服务 if not self.is_running(): - self.start() + need_start = True - # 立即获取当前价格 - self._fetch_current_price(symbol) + # 在锁外启动服务(避免死锁) + if need_start: + self.start() + + # 在锁外获取当前价格(避免阻塞) + self._fetch_current_price(symbol) logger.info(f"[WS:{id(self)}] 已订阅 {symbol} 价格更新 (当前订阅: {self._subscribed_symbols})") diff --git a/scripts/test_websocket_monitor.py b/scripts/test_websocket_monitor.py index bf1fff5..63478f1 100644 --- a/scripts/test_websocket_monitor.py +++ b/scripts/test_websocket_monitor.py @@ -5,6 +5,8 @@ import sys import os import asyncio +import threading +import time # 确保路径正确 script_dir = os.path.dirname(os.path.abspath(__file__)) @@ -16,9 +18,14 @@ from app.services.websocket_monitor import get_ws_price_monitor from app.utils.logger import logger +# 用于接收价格更新的队列 +price_updates = [] + + def on_price_update(symbol: str, price: float): """价格更新回调""" - print(f"📊 {symbol}: ${price:,.2f}") + price_updates.append((symbol, price)) + print(f"📊 [回调] {symbol}: ${price:,.2f}") async def main(): @@ -33,41 +40,55 @@ async def main(): print(f"\n订阅交易对: {', '.join(symbols)}") for symbol in symbols: + print(f" 订阅 {symbol}...") ws_monitor.subscribe_symbol(symbol) # 注册回调 + print("注册价格回调...") ws_monitor.add_price_callback(on_price_update) print("\n等待价格推送(30秒)...") print("提示: WebSocket 连接可能需要几秒钟建立...") print("-" * 60) - # 使用 asyncio.sleep 而不是 time.sleep,让事件循环运行 - connection_check = 0 + # 给 WebSocket 线程一些时间启动 + await asyncio.sleep(2) + + # 检查状态 + print(f"\n📡 WebSocket 运行状态: {ws_monitor.is_running()}") + print(f"📡 已订阅交易对: {ws_monitor.get_subscribed_symbols()}") + print(f"📡 WebSocket 线程存活: {ws_monitor._thread.is_alive() if ws_monitor._thread else 'None'}") + print(f"📡 事件循环: {'已创建' if ws_monitor._loop else '未创建'}") + print("-" * 60) + + # 等待价格更新 + start_time = time.time() + last_price_count = 0 + for i in range(30): await asyncio.sleep(1) - # 检查 WebSocket 运行状态 - if i == 2: - print(f"📡 WebSocket 运行状态: {ws_monitor.is_running()}") - print(f"📡 已订阅交易对: {ws_monitor.get_subscribed_symbols()}") - - # 每秒检查一次价格 - for symbol in symbols: - price = ws_monitor.get_latest_price(symbol) - if price: - print(f"📊 {symbol}: ${price:,.2f}") - # 每5秒打印一次状态 - connection_check += 1 - if connection_check >= 5: - connection_check = 0 - print(f"⏱️ 已运行 {i+1} 秒 | WebSocket 状态: {'🟢 运行中' if ws_monitor.is_running() else '🔴 未运行'}") + if (i + 1) % 5 == 0: + elapsed = time.time() - start_time + current_count = len(price_updates) + new_updates = current_count - last_price_count + last_price_count = current_count + + print(f"\n⏱️ 已运行 {i+1} 秒 | 收到 {current_count} 次价格更新 (最近5秒: +{new_updates})") + print(f" WebSocket 状态: {'🟢 运行中' if ws_monitor.is_running() else '🔴 未运行'}") + + # 显示当前缓存的价格 + for symbol in symbols: + price = ws_monitor.get_latest_price(symbol) + if price: + print(f" 📊 {symbol}: ${price:,.2f}") # 显示获取到的价格 print("\n" + "=" * 60) - print("📊 获取到的价格:") + print("📊 最终结果:") print("=" * 60) + print(f" 共收到 {len(price_updates)} 次价格更新") for symbol in symbols: price = ws_monitor.get_latest_price(symbol) if price: @@ -79,6 +100,11 @@ async def main(): print("\n停止服务...") ws_monitor.stop() + # 等待线程结束 + if ws_monitor._thread: + ws_monitor._thread.join(timeout=3) + print(f"线程已结束: {not ws_monitor._thread.is_alive()}") + print("=" * 60) print("✅ 测试完成") print("=" * 60)