stock-ai-agent/scripts/test_websocket_monitor.py
2026-02-20 22:49:23 +08:00

93 lines
2.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
测试 WebSocket 价格监控服务
"""
import sys
import os
import asyncio
# 确保路径正确
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
backend_dir = os.path.join(project_root, 'backend')
sys.path.insert(0, backend_dir)
from app.services.websocket_monitor import get_ws_price_monitor
from app.utils.logger import logger
def on_price_update(symbol: str, price: float):
"""价格更新回调"""
print(f"📊 {symbol}: ${price:,.2f}")
async def main():
print("=" * 60)
print("🔌 测试 WebSocket 价格监控服务")
print("=" * 60)
ws_monitor = get_ws_price_monitor()
# 订阅几个交易对
symbols = ['BTCUSDT', 'ETHUSDT']
print(f"\n订阅交易对: {', '.join(symbols)}")
for symbol in symbols:
ws_monitor.subscribe_symbol(symbol)
# 注册回调
ws_monitor.add_price_callback(on_price_update)
print("\n等待价格推送30秒...")
print("提示: WebSocket 连接可能需要几秒钟建立...")
print("-" * 60)
# 使用 asyncio.sleep 而不是 time.sleep让事件循环运行
connection_check = 0
for i in range(30):
await asyncio.sleep(1)
# 检查 WebSocket 运行状态
if i == 2:
print(f"📡 WebSocket 运行状态: {ws_monitor.is_running()}")
print(f"📡 已订阅交易对: {ws_monitor.get_subscribed_symbols()}")
# 每秒检查一次价格
for symbol in symbols:
price = ws_monitor.get_latest_price(symbol)
if price:
print(f"📊 {symbol}: ${price:,.2f}")
# 每5秒打印一次状态
connection_check += 1
if connection_check >= 5:
connection_check = 0
print(f"⏱️ 已运行 {i+1} 秒 | WebSocket 状态: {'🟢 运行中' if ws_monitor.is_running() else '🔴 未运行'}")
# 显示获取到的价格
print("\n" + "=" * 60)
print("📊 获取到的价格:")
print("=" * 60)
for symbol in symbols:
price = ws_monitor.get_latest_price(symbol)
if price:
print(f" {symbol}: ${price:,.2f}")
else:
print(f" {symbol}: 未获取到价格")
# 停止服务
print("\n停止服务...")
ws_monitor.stop()
print("=" * 60)
print("✅ 测试完成")
print("=" * 60)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n⚠️ 测试中断")
sys.exit(0)