diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 1fa6216..54e3e3c 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -206,26 +206,19 @@ class CryptoAgent: logger.info(f" 模拟交易: 已启用") logger.info("=" * 60 + "\n") - # 启动 WebSocket 价格监控(替代轮询模式) + # 启动价格监控(轮询模式) if self.paper_trading_enabled and self.symbols: try: - from app.services.websocket_monitor import get_ws_price_monitor - self.ws_monitor = get_ws_price_monitor() - - for symbol in self.symbols: - self.ws_monitor.subscribe_symbol(symbol) - # 同时注册回调(用于模拟交易的价格触发) - self.ws_monitor.add_price_callback(self._on_price_update) - - logger.info(f"已启动 WebSocket 价格监控: {', '.join(self.symbols)}") - except Exception as e: - logger.error(f"WebSocket 价格监控启动失败,将使用轮询模式: {e}") - # 降级到轮询模式 + from app.services.price_monitor_service import get_price_monitor_service self.price_monitor = get_price_monitor_service() + for symbol in self.symbols: self.price_monitor.subscribe_symbol(symbol) self.price_monitor.add_price_callback(self._on_price_update) + logger.info(f"已启动轮询模式价格监控: {', '.join(self.symbols)}") + except Exception as e: + logger.error(f"价格监控启动失败: {e}") # 发送启动通知 await self.feishu.send_text( @@ -267,11 +260,7 @@ class CryptoAgent: """停止运行""" self.running = False - # 停止 WebSocket 价格监控 - if hasattr(self, 'ws_monitor') and self.ws_monitor: - self.ws_monitor.stop() - - # 停止轮询价格监控(如果存在) + # 停止轮询价格监控 if hasattr(self, 'price_monitor') and self.price_monitor: self.price_monitor.stop() diff --git a/backend/app/main.py b/backend/app/main.py index c4c7f0f..f8e49d2 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -21,18 +21,17 @@ _crypto_agent_task = None async def price_monitor_loop(): - """后台价格监控循环 - 使用 WebSocket 实时检查止盈止损""" + """后台价格监控循环 - 使用轮询检查止盈止损""" from app.services.paper_trading_service import get_paper_trading_service + from app.services.binance_service import binance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service - from app.services.websocket_monitor import get_ws_price_monitor - logger.info("后台价格监控任务已启动(WebSocket 模式)") + logger.info("后台价格监控任务已启动(轮询模式)") feishu = get_feishu_service() telegram = get_telegram_service() paper_trading = get_paper_trading_service() - ws_monitor = get_ws_price_monitor() # 价格更新回调 - 检查止盈止损 async def on_price_update(symbol: str, price: float): @@ -104,43 +103,97 @@ async def price_monitor_loop(): except Exception as e: logger.error(f"处理 {symbol} 价格更新失败: {e}") - # 注册 WebSocket 回调 - ws_monitor.add_price_callback(lambda s, p: asyncio.create_task(on_price_update(s, p))) - - # 持续监控活跃订单,动态订阅/取消订阅 - monitored_symbols = set() - + # 持续监控活跃订单 while True: try: # 获取活跃订单 active_orders = paper_trading.get_active_orders() - - # 获取需要监控的交易对 - current_symbols = set(order.get('symbol') for order in active_orders if order.get('symbol')) - - # 订阅新交易对 - new_symbols = current_symbols - monitored_symbols - for symbol in new_symbols: - ws_monitor.subscribe_symbol(symbol) - logger.info(f"后台监控订阅 {symbol}") - - # 取消订阅不再需要的交易对(可选,这里保留订阅以便快速响应) - # unmonitored_symbols = monitored_symbols - current_symbols - # for symbol in unmonitored_symbols: - # ws_monitor.unsubscribe_symbol(symbol) - - monitored_symbols = current_symbols - - # 没有活跃订单时,等待 10 秒 if not active_orders: - await asyncio.sleep(10) - else: - # 有活跃订单时,每 30 秒检查一次订阅状态 - await asyncio.sleep(30) + await asyncio.sleep(10) # 没有活跃订单时,10秒检查一次 + continue + + # 获取所有需要的交易对 + symbols = set(order.get('symbol') for order in active_orders if order.get('symbol')) + + # 获取价格并检查止盈止损 + for symbol in symbols: + try: + price = binance_service.get_current_price(symbol) + if not price: + continue + + # 检查止盈止损 + triggered = paper_trading.check_price_triggers(symbol, price) + + # 发送通知 + for result in triggered: + status = result.get('status', '') + event_type = result.get('event_type', 'order_closed') + + # 处理挂单成交事件 + if event_type == 'order_filled': + side_text = "做多" if result.get('side') == 'long' else "做空" + grade = result.get('signal_grade', 'N/A') + + message = f"""✅ 挂单成交 + +交易对: {result.get('symbol')} +方向: {side_text} +等级: {grade} +挂单价: ${result.get('entry_price', 0):,.2f} +成交价: ${result.get('filled_price', 0):,.2f} +仓位: ${result.get('quantity', 0):,.0f} +止损: ${result.get('stop_loss', 0):,.2f} +止盈: ${result.get('take_profit', 0):,.2f}""" + + # 发送通知 + await feishu.send_text(message) + await telegram.send_message(message) + logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}") + continue + + # 处理订单平仓事件 + is_win = result.get('is_win', False) + + if status == 'closed_tp': + emoji = "🎯" + status_text = "止盈平仓" + elif status == 'closed_sl': + emoji = "🛑" + status_text = "止损平仓" + elif status == 'closed_be': + emoji = "🔒" + status_text = "保本止损" + else: + emoji = "📤" + status_text = "平仓" + + win_text = "盈利" if is_win else "亏损" + side_text = "做多" if result.get('side') == 'long' else "做空" + + message = f"""{emoji} 订单{status_text} + +交易对: {result.get('symbol')} +方向: {side_text} +入场: ${result.get('entry_price', 0):,.2f} +出场: ${result.get('exit_price', 0):,.2f} +{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f}) +持仓时间: {result.get('hold_duration', 'N/A')}""" + + # 发送通知 + await feishu.send_text(message) + await telegram.send_message(message) + logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}") + + except Exception as e: + logger.error(f"检查 {symbol} 价格失败: {e}") + + # 每 3 秒检查一次 + await asyncio.sleep(3) except Exception as e: logger.error(f"价格监控循环出错: {e}") - await asyncio.sleep(60) + await asyncio.sleep(5) async def periodic_report_loop(): diff --git a/backend/app/services/binance_service.py b/backend/app/services/binance_service.py index 2bad1a7..4619e07 100644 --- a/backend/app/services/binance_service.py +++ b/backend/app/services/binance_service.py @@ -1,6 +1,6 @@ """ Binance 数据服务 - 获取加密货币 K 线数据和技术指标 -使用 requests 直接调用 REST API,避免与 WebSocket 的事件循环冲突 +使用 requests 直接调用 REST API """ import pandas as pd import numpy as np diff --git a/backend/app/services/websocket_monitor.py b/backend/app/services/websocket_monitor.py deleted file mode 100644 index b53b140..0000000 --- a/backend/app/services/websocket_monitor.py +++ /dev/null @@ -1,284 +0,0 @@ -""" -WebSocket 价格监控服务 - 使用 Binance WebSocket API 实现实时价格推送 -""" -import json -import asyncio -import threading -from typing import Dict, List, Callable, Optional, Set -from datetime import datetime -import websockets -from app.utils.logger import logger - - -class WebSocketPriceMonitor: - """WebSocket 实时价格监控服务""" - - # Binance WebSocket 端点 - BASE_WS_URL = "wss://stream.binance.com:9443/ws" - - def __init__(self): - """初始化 WebSocket 价格监控服务""" - self._ws = None - self._loop = None - self._thread = None - self._running = False - self._subscribed_symbols: Set[str] = set() - self._price_callbacks: List[Callable[[str, float], None]] = [] - self._latest_prices: Dict[str, float] = {} - self._lock = threading.Lock() - self._last_heartbeat: Optional[datetime] = None - - # 连接和重连配置 - self._reconnect_delay = 5 # 重连延迟(秒) - self._max_reconnect_attempts = 10 - - logger.info("WebSocket 价格监控服务初始化完成") - - def is_running(self) -> bool: - """检查服务是否在运行""" - return self._running and self._ws is not None and self._running - - def subscribe_symbol(self, symbol: str): - """ - 订阅交易对的价格推送 - - Args: - symbol: 交易对,如 "BTCUSDT" - """ - symbol = symbol.upper() - - need_start = False - with self._lock: - if symbol in self._subscribed_symbols: - logger.debug(f"[WS:{id(self)}] {symbol} 已订阅,跳过") - return - - self._subscribed_symbols.add(symbol) - - # 检查是否需要启动服务 - if not self.is_running(): - need_start = True - - # 在锁外启动服务(避免死锁) - if need_start: - self.start() - - # 在锁外获取当前价格(避免阻塞) - self._fetch_current_price(symbol) - - logger.info(f"[WS:{id(self)}] 已订阅 {symbol} 价格更新 (当前订阅: {self._subscribed_symbols})") - - def unsubscribe_symbol(self, symbol: str): - """取消订阅交易对""" - symbol = symbol.upper() - - with self._lock: - if symbol in self._subscribed_symbols: - self._subscribed_symbols.discard(symbol) - self._latest_prices.pop(symbol, None) - logger.info(f"[WS:{id(self)}] 已取消订阅 {symbol}") - - # 如果没有订阅了,可以考虑断开连接 - if not self._subscribed_symbols: - logger.info(f"[WS:{id(self)}] 没有订阅的交易对,准备断开连接") - - def add_price_callback(self, callback: Callable[[str, float], None]): - """添加价格更新回调函数""" - with self._lock: - if callback not in self._price_callbacks: - self._price_callbacks.append(callback) - - def remove_price_callback(self, callback: Callable): - """移除价格回调函数""" - with self._lock: - if callback in self._price_callbacks: - self._price_callbacks.remove(callback) - - def get_latest_price(self, symbol: str) -> Optional[float]: - """获取交易对的最新缓存价格""" - return self._latest_prices.get(symbol.upper()) - - def get_subscribed_symbols(self) -> List[str]: - """获取已订阅的交易对列表""" - with self._lock: - return list(self._subscribed_symbols) - - def start(self): - """启动 WebSocket 连接""" - with self._lock: - if self._running: - logger.debug(f"[WS:{id(self)}] WebSocket 服务已在运行") - return - - self._running = True - - # 在新线程中运行事件循环 - self._thread = threading.Thread(target=self._run_event_loop, daemon=True) - self._thread.start() - - def stop(self): - """停止 WebSocket 连接""" - with self._lock: - if not self._running: - return - - self._running = False - - # 关闭 WebSocket 连接 - if self._loop and self._loop.is_running(): - self._loop.call_soon_threadsafe(self._close_ws()) - - if self._thread and self._thread.is_alive(): - self._thread.join(timeout=5) - - logger.info(f"[WS:{id(self)}] WebSocket 价格监控服务已停止") - - def _run_event_loop(self): - """运行 WebSocket 事件循环(在单独线程中)""" - # 创建新的事件循环 - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) - - try: - self._loop.run_until_complete(self._connect_and_listen()) - except Exception as e: - logger.error(f"[WS:{id(self)}] WebSocket 事件循环出错: {e}") - finally: - self._loop.close() - - async def _connect_and_listen(self): - """连接并监听 WebSocket 消息""" - retry_count = 0 - - while self._running and retry_count < self._max_reconnect_attempts: - try: - # 构建订阅流 - with self._lock: - symbols = list(self._subscribed_symbols) - - if not symbols: - # 没有订阅的交易对,等待订阅 - logger.debug(f"[WS:{id(self)}] 没有订阅的交易对,等待 5 秒") - await asyncio.sleep(5) - continue - - # 构建 WebSocket 流 - streams = [] - for symbol in symbols: - streams.append(f"{symbol.lower()}@ticker") - - # Binance 组合流 URL 格式: /stream?streams=btcusdt@ticker/ethusdt@ticker - url = f"{self.BASE_WS_URL}/stream?streams={'/'.join(streams)}" - - logger.info(f"[WS:{id(self)}] 正在连接 WebSocket... (订阅: {', '.join(symbols)})") - logger.debug(f"[WS:{id(self)}] WebSocket URL: {url}") - - async with websockets.connect(url, ping_interval=30) as ws: - self._ws = ws - retry_count = 0 # 连接成功,重置重试计数 - self._last_heartbeat = datetime.now() - - logger.info(f"[WS:{id(self)}] WebSocket 已连接") - - # 监听消息 - async for message in self._ws: - await self._on_message(message) - - except websockets.exceptions.ConnectionClosed as e: - logger.warning(f"[WS:{id(self)}] WebSocket 连接关闭: {e}") - except websockets.exceptions.ConnectionError as e: - logger.error(f"[WS:{id(self)}] WebSocket 连接错误: {e}") - except Exception as e: - logger.error(f"[WS:{id(self)}] WebSocket 异常: {e}") - - # 检查是否需要重连 - with self._lock: - should_reconnect = self._running and self._subscribed_symbols and retry_count < self._max_reconnect_attempts - - if should_reconnect: - retry_count += 1 - logger.info(f"[WS:{id(self)}] 将在 {self._reconnect_delay} 秒后重连... (尝试 {retry_count}/{self._max_reconnect_attempts})") - await asyncio.sleep(self._reconnect_delay) - else: - if self._running: - logger.warning(f"[WS:{id(self)}] 达到最大重连次数,停止服务") - self._running = False - break - - async def _on_message(self, message): - """处理 WebSocket 消息""" - try: - data = json.loads(message) - - # 处理不同的消息类型 - if data.get('e') == '24hrTicker': # 24小时价格变动 - symbol = data.get('s') - if symbol: - # 解析价格 - price = float(data.get('c', 0)) # 当前价格 - self._update_price(symbol.upper(), price) - - elif data.get('result') is not None and isinstance(data['result'], list): - # 多个交易对的价格推送 - for item in data['result']: - symbol = item.get('s') - if symbol: - price = float(item.get('c', 0)) - self._update_price(symbol.upper(), price) - - except json.JSONDecodeError as e: - logger.error(f"[WS:{id(self)}] 解析 WebSocket 消息失败: {e}") - except Exception as e: - logger.error(f"[WS:{id(self)}] 处理 WebSocket 消息出错: {e}") - - def _update_price(self, symbol: str, price: float): - """更新价格并触发回调""" - old_price = self._latest_prices.get(symbol) - - # 只有价格变化时才触发回调 - if old_price != price: - self._latest_prices[symbol] = price - - # 调用所有注册的回调函数 - with self._lock: - callbacks = self._price_callbacks.copy() - - # 在线程中执行回调 - for callback in callbacks: - try: - callback(symbol, price) - except Exception as e: - logger.error(f"[WS:{id(self)}] 价格回调执行出错: {e}") - - async def _close_ws(self): - """关闭 WebSocket 连接""" - if self._ws: - await self._ws.close() - self._ws = None - logger.info(f"[WS:{id(self)}] WebSocket 连接已关闭") - - def _fetch_current_price(self, symbol: str): - """立即获取当前价格(WebSocket 连接建立前的临时方案)""" - try: - import requests - url = f"https://api.binance.com/api/v3/ticker/price?symbol={symbol}" - response = requests.get(url, timeout=5) - if response.status_code == 200: - data = response.json() - price = float(data['price']) - self._latest_prices[symbol] = price - logger.debug(f"[WS:{id(self)}] 获取 {symbol} 当前价格: ${price:,.2f}") - except Exception as e: - logger.warning(f"[WS:{id(self)}] 获取 {symbol} 当前价格失败: {e}") - - -# 全局单例 -_ws_monitor: Optional[WebSocketPriceMonitor] = None - - -def get_ws_price_monitor() -> WebSocketPriceMonitor: - """获取 WebSocket 价格监控服务单例""" - global _ws_monitor - if _ws_monitor is None: - _ws_monitor = WebSocketPriceMonitor() - return _ws_monitor diff --git a/backend/requirements.txt b/backend/requirements.txt index 7f06878..3fa51a8 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -10,7 +10,6 @@ pydantic==2.5.3 pydantic-settings==2.1.0 python-dotenv==1.0.0 slowapi==0.1.9 -websockets>=13.0 pandas>=2.2.0 numpy>=1.26.0 python-multipart==0.0.6 diff --git a/backend/run_crypto_agent.py b/backend/run_crypto_agent.py index c3e4e6a..d8d9857 100644 --- a/backend/run_crypto_agent.py +++ b/backend/run_crypto_agent.py @@ -34,6 +34,6 @@ if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: - # 静默处理 Ctrl+C,避免 WebSocket 关闭时的错误刷屏 + # 静默处理 Ctrl+C print("\n程序已退出") sys.exit(0) diff --git a/scripts/test_websocket_monitor.py b/scripts/test_websocket_monitor.py deleted file mode 100644 index 63478f1..0000000 --- a/scripts/test_websocket_monitor.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python3 -""" -测试 WebSocket 价格监控服务 -""" -import sys -import os -import asyncio -import threading -import time - -# 确保路径正确 -script_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.dirname(script_dir) -backend_dir = os.path.join(project_root, 'backend') -sys.path.insert(0, backend_dir) - -from app.services.websocket_monitor import get_ws_price_monitor -from app.utils.logger import logger - - -# 用于接收价格更新的队列 -price_updates = [] - - -def on_price_update(symbol: str, price: float): - """价格更新回调""" - price_updates.append((symbol, price)) - print(f"📊 [回调] {symbol}: ${price:,.2f}") - - -async def main(): - print("=" * 60) - print("🔌 测试 WebSocket 价格监控服务") - print("=" * 60) - - ws_monitor = get_ws_price_monitor() - - # 订阅几个交易对 - symbols = ['BTCUSDT', 'ETHUSDT'] - - print(f"\n订阅交易对: {', '.join(symbols)}") - for symbol in symbols: - print(f" 订阅 {symbol}...") - ws_monitor.subscribe_symbol(symbol) - - # 注册回调 - print("注册价格回调...") - ws_monitor.add_price_callback(on_price_update) - - print("\n等待价格推送(30秒)...") - print("提示: WebSocket 连接可能需要几秒钟建立...") - print("-" * 60) - - # 给 WebSocket 线程一些时间启动 - await asyncio.sleep(2) - - # 检查状态 - print(f"\n📡 WebSocket 运行状态: {ws_monitor.is_running()}") - print(f"📡 已订阅交易对: {ws_monitor.get_subscribed_symbols()}") - print(f"📡 WebSocket 线程存活: {ws_monitor._thread.is_alive() if ws_monitor._thread else 'None'}") - print(f"📡 事件循环: {'已创建' if ws_monitor._loop else '未创建'}") - print("-" * 60) - - # 等待价格更新 - start_time = time.time() - last_price_count = 0 - - for i in range(30): - await asyncio.sleep(1) - - # 每5秒打印一次状态 - if (i + 1) % 5 == 0: - elapsed = time.time() - start_time - current_count = len(price_updates) - new_updates = current_count - last_price_count - last_price_count = current_count - - print(f"\n⏱️ 已运行 {i+1} 秒 | 收到 {current_count} 次价格更新 (最近5秒: +{new_updates})") - print(f" WebSocket 状态: {'🟢 运行中' if ws_monitor.is_running() else '🔴 未运行'}") - - # 显示当前缓存的价格 - for symbol in symbols: - price = ws_monitor.get_latest_price(symbol) - if price: - print(f" 📊 {symbol}: ${price:,.2f}") - - # 显示获取到的价格 - print("\n" + "=" * 60) - print("📊 最终结果:") - print("=" * 60) - print(f" 共收到 {len(price_updates)} 次价格更新") - for symbol in symbols: - price = ws_monitor.get_latest_price(symbol) - if price: - print(f" {symbol}: ${price:,.2f}") - else: - print(f" {symbol}: 未获取到价格") - - # 停止服务 - print("\n停止服务...") - ws_monitor.stop() - - # 等待线程结束 - if ws_monitor._thread: - ws_monitor._thread.join(timeout=3) - print(f"线程已结束: {not ws_monitor._thread.is_alive()}") - - print("=" * 60) - print("✅ 测试完成") - print("=" * 60) - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\n\n⚠️ 测试中断") - sys.exit(0)