From 514aceaac68bd0b97a3b42c935b8587a69826220 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Fri, 20 Feb 2026 22:49:23 +0800 Subject: [PATCH] update --- backend/app/crypto_agent/crypto_agent.py | 34 ++- backend/app/main.py | 150 +++++++----- backend/app/services/websocket_monitor.py | 279 ++++++++++++++++++++++ scripts/test_websocket_monitor.py | 92 +++++++ 4 files changed, 483 insertions(+), 72 deletions(-) create mode 100644 backend/app/services/websocket_monitor.py create mode 100644 scripts/test_websocket_monitor.py diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 904ca2e..1fa6216 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -206,11 +206,26 @@ class CryptoAgent: logger.info(f" 模拟交易: 已启用") logger.info("=" * 60 + "\n") - # 启动价格监控 - if self.paper_trading_enabled and self.price_monitor: - for symbol in self.symbols: - self.price_monitor.subscribe_symbol(symbol) - logger.info(f"已启动 WebSocket 价格监控: {', '.join(self.symbols)}") + # 启动 WebSocket 价格监控(替代轮询模式) + if self.paper_trading_enabled and self.symbols: + try: + from app.services.websocket_monitor import get_ws_price_monitor + self.ws_monitor = get_ws_price_monitor() + + for symbol in self.symbols: + self.ws_monitor.subscribe_symbol(symbol) + # 同时注册回调(用于模拟交易的价格触发) + self.ws_monitor.add_price_callback(self._on_price_update) + + logger.info(f"已启动 WebSocket 价格监控: {', '.join(self.symbols)}") + except Exception as e: + logger.error(f"WebSocket 价格监控启动失败,将使用轮询模式: {e}") + # 降级到轮询模式 + self.price_monitor = get_price_monitor_service() + for symbol in self.symbols: + self.price_monitor.subscribe_symbol(symbol) + self.price_monitor.add_price_callback(self._on_price_update) + logger.info(f"已启动轮询模式价格监控: {', '.join(self.symbols)}") # 发送启动通知 await self.feishu.send_text( @@ -251,8 +266,15 @@ class CryptoAgent: def stop(self): """停止运行""" self.running = False - if self.price_monitor: + + # 停止 WebSocket 价格监控 + if hasattr(self, 'ws_monitor') and self.ws_monitor: + self.ws_monitor.stop() + + # 停止轮询价格监控(如果存在) + if hasattr(self, 'price_monitor') and self.price_monitor: self.price_monitor.stop() + logger.info("加密货币智能体已停止") async def analyze_symbol(self, symbol: str): diff --git a/backend/app/main.py b/backend/app/main.py index 321849f..c4c7f0f 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -21,50 +21,37 @@ _crypto_agent_task = None async def price_monitor_loop(): - """后台价格监控循环 - 检查止盈止损""" + """后台价格监控循环 - 使用 WebSocket 实时检查止盈止损""" from app.services.paper_trading_service import get_paper_trading_service - from app.services.binance_service import binance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service + from app.services.websocket_monitor import get_ws_price_monitor - logger.info("后台价格监控任务已启动") + logger.info("后台价格监控任务已启动(WebSocket 模式)") - while True: + feishu = get_feishu_service() + telegram = get_telegram_service() + paper_trading = get_paper_trading_service() + ws_monitor = get_ws_price_monitor() + + # 价格更新回调 - 检查止盈止损 + async def on_price_update(symbol: str, price: float): + """价格更新时检查止盈止损""" try: - paper_trading = get_paper_trading_service() - feishu = get_feishu_service() - telegram = get_telegram_service() + # 检查止盈止损 + triggered = paper_trading.check_price_triggers(symbol, price) - # 获取活跃订单 - active_orders = paper_trading.get_active_orders() - if not active_orders: - await asyncio.sleep(10) # 没有活跃订单时,10秒检查一次 - continue + # 发送通知 + for result in triggered: + status = result.get('status', '') + event_type = result.get('event_type', 'order_closed') - # 获取所有需要的交易对 - symbols = set(order.get('symbol') for order in active_orders if order.get('symbol')) + # 处理挂单成交事件 + if event_type == 'order_filled': + side_text = "做多" if result.get('side') == 'long' else "做空" + grade = result.get('signal_grade', 'N/A') - # 获取价格并检查止盈止损 - for symbol in symbols: - try: - price = binance_service.get_current_price(symbol) - if not price: - continue - - # 检查止盈止损 - triggered = paper_trading.check_price_triggers(symbol, price) - - # 发送通知 - for result in triggered: - status = result.get('status', '') - event_type = result.get('event_type', 'order_closed') - - # 处理挂单成交事件 - if event_type == 'order_filled': - side_text = "做多" if result.get('side') == 'long' else "做空" - grade = result.get('signal_grade', 'N/A') - - message = f"""✅ 挂单成交 + message = f"""✅ 挂单成交 交易对: {result.get('symbol')} 方向: {side_text} @@ -75,32 +62,32 @@ async def price_monitor_loop(): 止损: ${result.get('stop_loss', 0):,.2f} 止盈: ${result.get('take_profit', 0):,.2f}""" - # 发送通知 - await feishu.send_text(message) - await telegram.send_message(message) - logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}") - continue + # 发送通知 + await feishu.send_text(message) + await telegram.send_message(message) + logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}") + continue - # 处理订单平仓事件 - is_win = result.get('is_win', False) + # 处理订单平仓事件 + is_win = result.get('is_win', False) - if status == 'closed_tp': - emoji = "🎯" - status_text = "止盈平仓" - elif status == 'closed_sl': - emoji = "🛑" - status_text = "止损平仓" - elif status == 'closed_be': - emoji = "🔒" - status_text = "保本止损" - else: - emoji = "📤" - status_text = "平仓" + if status == 'closed_tp': + emoji = "🎯" + status_text = "止盈平仓" + elif status == 'closed_sl': + emoji = "🛑" + status_text = "止损平仓" + elif status == 'closed_be': + emoji = "🔒" + status_text = "保本止损" + else: + emoji = "📤" + status_text = "平仓" - win_text = "盈利" if is_win else "亏损" - side_text = "做多" if result.get('side') == 'long' else "做空" + win_text = "盈利" if is_win else "亏损" + side_text = "做多" if result.get('side') == 'long' else "做空" - message = f"""{emoji} 订单{status_text} + message = f"""{emoji} 订单{status_text} 交易对: {result.get('symbol')} 方向: {side_text} @@ -109,20 +96,51 @@ async def price_monitor_loop(): {win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f}) 持仓时间: {result.get('hold_duration', 'N/A')}""" - # 发送通知 - await feishu.send_text(message) - await telegram.send_message(message) - logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}") + # 发送通知 + await feishu.send_text(message) + await telegram.send_message(message) + logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}") - except Exception as e: - logger.error(f"检查 {symbol} 价格失败: {e}") + except Exception as e: + logger.error(f"处理 {symbol} 价格更新失败: {e}") - # 每 3 秒检查一次 - await asyncio.sleep(3) + # 注册 WebSocket 回调 + ws_monitor.add_price_callback(lambda s, p: asyncio.create_task(on_price_update(s, p))) + + # 持续监控活跃订单,动态订阅/取消订阅 + monitored_symbols = set() + + while True: + try: + # 获取活跃订单 + active_orders = paper_trading.get_active_orders() + + # 获取需要监控的交易对 + current_symbols = set(order.get('symbol') for order in active_orders if order.get('symbol')) + + # 订阅新交易对 + new_symbols = current_symbols - monitored_symbols + for symbol in new_symbols: + ws_monitor.subscribe_symbol(symbol) + logger.info(f"后台监控订阅 {symbol}") + + # 取消订阅不再需要的交易对(可选,这里保留订阅以便快速响应) + # unmonitored_symbols = monitored_symbols - current_symbols + # for symbol in unmonitored_symbols: + # ws_monitor.unsubscribe_symbol(symbol) + + monitored_symbols = current_symbols + + # 没有活跃订单时,等待 10 秒 + if not active_orders: + await asyncio.sleep(10) + else: + # 有活跃订单时,每 30 秒检查一次订阅状态 + await asyncio.sleep(30) except Exception as e: logger.error(f"价格监控循环出错: {e}") - await asyncio.sleep(5) + await asyncio.sleep(60) async def periodic_report_loop(): diff --git a/backend/app/services/websocket_monitor.py b/backend/app/services/websocket_monitor.py new file mode 100644 index 0000000..81572e8 --- /dev/null +++ b/backend/app/services/websocket_monitor.py @@ -0,0 +1,279 @@ +""" +WebSocket 价格监控服务 - 使用 Binance WebSocket API 实现实时价格推送 +""" +import json +import asyncio +import threading +from typing import Dict, List, Callable, Optional, Set +from datetime import datetime +import websockets +from app.utils.logger import logger + + +class WebSocketPriceMonitor: + """WebSocket 实时价格监控服务""" + + # Binance WebSocket 端点 + BASE_WS_URL = "wss://stream.binance.com:9443/ws" + + def __init__(self): + """初始化 WebSocket 价格监控服务""" + self._ws = None + self._loop = None + self._thread = None + self._running = False + self._subscribed_symbols: Set[str] = set() + self._price_callbacks: List[Callable[[str, float], None]] = [] + self._latest_prices: Dict[str, float] = {} + self._lock = threading.Lock() + self._last_heartbeat: Optional[datetime] = None + + # 连接和重连配置 + self._reconnect_delay = 5 # 重连延迟(秒) + self._max_reconnect_attempts = 10 + + logger.info("WebSocket 价格监控服务初始化完成") + + def is_running(self) -> bool: + """检查服务是否在运行""" + return self._running and self._ws is not None and self._running + + def subscribe_symbol(self, symbol: str): + """ + 订阅交易对的价格推送 + + Args: + symbol: 交易对,如 "BTCUSDT" + """ + symbol = symbol.upper() + + with self._lock: + if symbol in self._subscribed_symbols: + logger.debug(f"[WS:{id(self)}] {symbol} 已订阅,跳过") + return + + self._subscribed_symbols.add(symbol) + + # 如果服务未运行,自动启动 + if not self.is_running(): + self.start() + + # 立即获取当前价格 + self._fetch_current_price(symbol) + + logger.info(f"[WS:{id(self)}] 已订阅 {symbol} 价格更新 (当前订阅: {self._subscribed_symbols})") + + def unsubscribe_symbol(self, symbol: str): + """取消订阅交易对""" + symbol = symbol.upper() + + with self._lock: + if symbol in self._subscribed_symbols: + self._subscribed_symbols.discard(symbol) + self._latest_prices.pop(symbol, None) + logger.info(f"[WS:{id(self)}] 已取消订阅 {symbol}") + + # 如果没有订阅了,可以考虑断开连接 + if not self._subscribed_symbols: + logger.info(f"[WS:{id(self)}] 没有订阅的交易对,准备断开连接") + + def add_price_callback(self, callback: Callable[[str, float], None]): + """添加价格更新回调函数""" + with self._lock: + if callback not in self._price_callbacks: + self._price_callbacks.append(callback) + + def remove_price_callback(self, callback: Callable): + """移除价格回调函数""" + with self._lock: + if callback in self._price_callbacks: + self._price_callbacks.remove(callback) + + def get_latest_price(self, symbol: str) -> Optional[float]: + """获取交易对的最新缓存价格""" + return self._latest_prices.get(symbol.upper()) + + def get_subscribed_symbols(self) -> List[str]: + """获取已订阅的交易对列表""" + with self._lock: + return list(self._subscribed_symbols) + + def start(self): + """启动 WebSocket 连接""" + with self._lock: + if self._running: + logger.debug(f"[WS:{id(self)}] WebSocket 服务已在运行") + return + + self._running = True + + # 在新线程中运行事件循环 + self._thread = threading.Thread(target=self._run_event_loop, daemon=True) + self._thread.start() + + def stop(self): + """停止 WebSocket 连接""" + with self._lock: + if not self._running: + return + + self._running = False + + # 关闭 WebSocket 连接 + if self._loop and self._loop.is_running(): + self._loop.call_soon_threadsafe(self._close_ws()) + + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + + logger.info(f"[WS:{id(self)}] WebSocket 价格监控服务已停止") + + def _run_event_loop(self): + """运行 WebSocket 事件循环(在单独线程中)""" + # 创建新的事件循环 + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + try: + self._loop.run_until_complete(self._connect_and_listen()) + except Exception as e: + logger.error(f"[WS:{id(self)}] WebSocket 事件循环出错: {e}") + finally: + self._loop.close() + + async def _connect_and_listen(self): + """连接并监听 WebSocket 消息""" + retry_count = 0 + + while self._running and retry_count < self._max_reconnect_attempts: + try: + # 构建订阅流 + with self._lock: + symbols = list(self._subscribed_symbols) + + if not symbols: + # 没有订阅的交易对,等待订阅 + logger.debug(f"[WS:{id(self)}] 没有订阅的交易对,等待 5 秒") + await asyncio.sleep(5) + continue + + # 构建 WebSocket 流 + streams = [] + for symbol in symbols: + streams.append(f"{symbol.lower()}@ticker") + + # Binance 组合流 URL 格式: /stream?streams=btcusdt@ticker/ethusdt@ticker + url = f"{self.BASE_WS_URL}/stream?streams={'/'.join(streams)}" + + logger.info(f"[WS:{id(self)}] 正在连接 WebSocket... (订阅: {', '.join(symbols)})") + logger.debug(f"[WS:{id(self)}] WebSocket URL: {url}") + + async with websockets.connect(url, ping_interval=30) as ws: + self._ws = ws + retry_count = 0 # 连接成功,重置重试计数 + self._last_heartbeat = datetime.now() + + logger.info(f"[WS:{id(self)}] WebSocket 已连接") + + # 监听消息 + async for message in self._ws: + await self._on_message(message) + + except websockets.exceptions.ConnectionClosed as e: + logger.warning(f"[WS:{id(self)}] WebSocket 连接关闭: {e}") + except websockets.exceptions.ConnectionError as e: + logger.error(f"[WS:{id(self)}] WebSocket 连接错误: {e}") + except Exception as e: + logger.error(f"[WS:{id(self)}] WebSocket 异常: {e}") + + # 检查是否需要重连 + with self._lock: + should_reconnect = self._running and self._subscribed_symbols and retry_count < self._max_reconnect_attempts + + if should_reconnect: + retry_count += 1 + logger.info(f"[WS:{id(self)}] 将在 {self._reconnect_delay} 秒后重连... (尝试 {retry_count}/{self._max_reconnect_attempts})") + await asyncio.sleep(self._reconnect_delay) + else: + if self._running: + logger.warning(f"[WS:{id(self)}] 达到最大重连次数,停止服务") + self._running = False + break + + async def _on_message(self, message): + """处理 WebSocket 消息""" + try: + data = json.loads(message) + + # 处理不同的消息类型 + if data.get('e') == '24hrTicker': # 24小时价格变动 + symbol = data.get('s') + if symbol: + # 解析价格 + price = float(data.get('c', 0)) # 当前价格 + self._update_price(symbol.upper(), price) + + elif data.get('result') is not None and isinstance(data['result'], list): + # 多个交易对的价格推送 + for item in data['result']: + symbol = item.get('s') + if symbol: + price = float(item.get('c', 0)) + self._update_price(symbol.upper(), price) + + except json.JSONDecodeError as e: + logger.error(f"[WS:{id(self)}] 解析 WebSocket 消息失败: {e}") + except Exception as e: + logger.error(f"[WS:{id(self)}] 处理 WebSocket 消息出错: {e}") + + def _update_price(self, symbol: str, price: float): + """更新价格并触发回调""" + old_price = self._latest_prices.get(symbol) + + # 只有价格变化时才触发回调 + if old_price != price: + self._latest_prices[symbol] = price + + # 调用所有注册的回调函数 + with self._lock: + callbacks = self._price_callbacks.copy() + + # 在线程中执行回调 + for callback in callbacks: + try: + callback(symbol, price) + except Exception as e: + logger.error(f"[WS:{id(self)}] 价格回调执行出错: {e}") + + async def _close_ws(self): + """关闭 WebSocket 连接""" + if self._ws: + await self._ws.close() + self._ws = None + logger.info(f"[WS:{id(self)}] WebSocket 连接已关闭") + + def _fetch_current_price(self, symbol: str): + """立即获取当前价格(WebSocket 连接建立前的临时方案)""" + try: + import requests + url = f"https://api.binance.com/api/v3/ticker/price?symbol={symbol}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + data = response.json() + price = float(data['price']) + self._latest_prices[symbol] = price + logger.debug(f"[WS:{id(self)}] 获取 {symbol} 当前价格: ${price:,.2f}") + except Exception as e: + logger.warning(f"[WS:{id(self)}] 获取 {symbol} 当前价格失败: {e}") + + +# 全局单例 +_ws_monitor: Optional[WebSocketPriceMonitor] = None + + +def get_ws_price_monitor() -> WebSocketPriceMonitor: + """获取 WebSocket 价格监控服务单例""" + global _ws_monitor + if _ws_monitor is None: + _ws_monitor = WebSocketPriceMonitor() + return _ws_monitor diff --git a/scripts/test_websocket_monitor.py b/scripts/test_websocket_monitor.py new file mode 100644 index 0000000..bf1fff5 --- /dev/null +++ b/scripts/test_websocket_monitor.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +测试 WebSocket 价格监控服务 +""" +import sys +import os +import asyncio + +# 确保路径正确 +script_dir = os.path.dirname(os.path.abspath(__file__)) +project_root = os.path.dirname(script_dir) +backend_dir = os.path.join(project_root, 'backend') +sys.path.insert(0, backend_dir) + +from app.services.websocket_monitor import get_ws_price_monitor +from app.utils.logger import logger + + +def on_price_update(symbol: str, price: float): + """价格更新回调""" + print(f"📊 {symbol}: ${price:,.2f}") + + +async def main(): + print("=" * 60) + print("🔌 测试 WebSocket 价格监控服务") + print("=" * 60) + + ws_monitor = get_ws_price_monitor() + + # 订阅几个交易对 + symbols = ['BTCUSDT', 'ETHUSDT'] + + print(f"\n订阅交易对: {', '.join(symbols)}") + for symbol in symbols: + ws_monitor.subscribe_symbol(symbol) + + # 注册回调 + ws_monitor.add_price_callback(on_price_update) + + print("\n等待价格推送(30秒)...") + print("提示: WebSocket 连接可能需要几秒钟建立...") + print("-" * 60) + + # 使用 asyncio.sleep 而不是 time.sleep,让事件循环运行 + connection_check = 0 + for i in range(30): + await asyncio.sleep(1) + + # 检查 WebSocket 运行状态 + if i == 2: + print(f"📡 WebSocket 运行状态: {ws_monitor.is_running()}") + print(f"📡 已订阅交易对: {ws_monitor.get_subscribed_symbols()}") + + # 每秒检查一次价格 + for symbol in symbols: + price = ws_monitor.get_latest_price(symbol) + if price: + print(f"📊 {symbol}: ${price:,.2f}") + + # 每5秒打印一次状态 + connection_check += 1 + if connection_check >= 5: + connection_check = 0 + print(f"⏱️ 已运行 {i+1} 秒 | WebSocket 状态: {'🟢 运行中' if ws_monitor.is_running() else '🔴 未运行'}") + + # 显示获取到的价格 + print("\n" + "=" * 60) + print("📊 获取到的价格:") + print("=" * 60) + for symbol in symbols: + price = ws_monitor.get_latest_price(symbol) + if price: + print(f" {symbol}: ${price:,.2f}") + else: + print(f" {symbol}: 未获取到价格") + + # 停止服务 + print("\n停止服务...") + ws_monitor.stop() + + print("=" * 60) + print("✅ 测试完成") + print("=" * 60) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n\n⚠️ 测试中断") + sys.exit(0)