From 60a5410907b76266ed28630596d502a578ce3e3e Mon Sep 17 00:00:00 2001 From: aaron <> Date: Fri, 6 Feb 2026 23:35:16 +0800 Subject: [PATCH] update --- backend/app/crypto_agent/crypto_agent.py | 9 +- backend/app/services/binance_service.py | 56 ++++++----- backend/app/services/price_monitor_service.py | 22 +---- frontend/paper-trading.html | 1 + test_websocket.py | 94 ------------------- 5 files changed, 47 insertions(+), 135 deletions(-) delete mode 100644 test_websocket.py diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index afe743f..b2ea31a 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -52,6 +52,7 @@ class CryptoAgent: # 运行状态 self.running = False + self._event_loop = None # 保存主事件循环引用 logger.info(f"加密货币智能体初始化完成,监控交易对: {self.symbols}") if self.paper_trading_enabled: @@ -66,8 +67,11 @@ class CryptoAgent: triggered = self.paper_trading.check_price_triggers(symbol, price) for result in triggered: - # 异步发送平仓通知 - asyncio.create_task(self._notify_order_closed(result)) + # 使用 asyncio.run_coroutine_threadsafe 从 WebSocket 线程安全地调度协程 + if self._event_loop and self._event_loop.is_running(): + asyncio.run_coroutine_threadsafe(self._notify_order_closed(result), self._event_loop) + else: + logger.warning(f"无法发送平仓通知: 事件循环不可用") async def _notify_order_closed(self, result: Dict[str, Any]): """发送订单平仓通知""" @@ -123,6 +127,7 @@ class CryptoAgent: async def run(self): """主运行循环 - 在5的倍数分钟执行""" self.running = True + self._event_loop = asyncio.get_event_loop() # 保存事件循环引用 # 启动横幅 logger.info("\n" + "=" * 60) diff --git a/backend/app/services/binance_service.py b/backend/app/services/binance_service.py index 4870444..bd65f73 100644 --- a/backend/app/services/binance_service.py +++ b/backend/app/services/binance_service.py @@ -1,39 +1,41 @@ """ Binance 数据服务 - 获取加密货币 K 线数据和技术指标 +使用 requests 直接调用 REST API,避免与 WebSocket 的事件循环冲突 """ import pandas as pd import numpy as np +import requests from typing import Dict, List, Optional, Any -from binance.client import Client -from binance.enums import ( - KLINE_INTERVAL_5MINUTE, - KLINE_INTERVAL_15MINUTE, - KLINE_INTERVAL_1HOUR, - KLINE_INTERVAL_4HOUR -) from app.utils.logger import logger class BinanceService: - """Binance 数据服务""" + """Binance 数据服务(使用 requests 直接调用 REST API)""" # K线周期映射 INTERVALS = { - '5m': KLINE_INTERVAL_5MINUTE, - '15m': KLINE_INTERVAL_15MINUTE, - '1h': KLINE_INTERVAL_1HOUR, - '4h': KLINE_INTERVAL_4HOUR + '5m': '5m', + '15m': '15m', + '1h': '1h', + '4h': '4h' } + # Binance API 基础 URL + BASE_URL = "https://api.binance.com" + def __init__(self, api_key: str = "", api_secret: str = ""): """ - 初始化 Binance 客户端 + 初始化 Binance 服务 Args: api_key: API 密钥(可选,公开数据不需要) api_secret: API 密钥(可选) """ - self.client = Client(api_key=api_key, api_secret=api_secret) + self._api_key = api_key + self._api_secret = api_secret + self._session = requests.Session() + if api_key: + self._session.headers.update({'X-MBX-APIKEY': api_key}) logger.info("Binance 服务初始化完成") def get_klines(self, symbol: str, interval: str, limit: int = 100) -> pd.DataFrame: @@ -50,11 +52,15 @@ class BinanceService: """ try: binance_interval = self.INTERVALS.get(interval, interval) - klines = self.client.get_klines( - symbol=symbol, - interval=binance_interval, - limit=limit - ) + url = f"{self.BASE_URL}/api/v3/klines" + params = { + 'symbol': symbol, + 'interval': binance_interval, + 'limit': limit + } + response = self._session.get(url, params=params, timeout=10) + response.raise_for_status() + klines = response.json() return self._parse_klines(klines) except Exception as e: logger.error(f"获取 {symbol} {interval} K线数据失败: {e}") @@ -223,7 +229,11 @@ class BinanceService: def get_current_price(self, symbol: str) -> Optional[float]: """获取当前价格""" try: - ticker = self.client.get_symbol_ticker(symbol=symbol) + url = f"{self.BASE_URL}/api/v3/ticker/price" + params = {'symbol': symbol} + response = self._session.get(url, params=params, timeout=10) + response.raise_for_status() + ticker = response.json() return float(ticker['price']) except Exception as e: logger.error(f"获取 {symbol} 当前价格失败: {e}") @@ -232,7 +242,11 @@ class BinanceService: def get_24h_stats(self, symbol: str) -> Optional[Dict[str, Any]]: """获取 24 小时统计数据""" try: - stats = self.client.get_ticker(symbol=symbol) + url = f"{self.BASE_URL}/api/v3/ticker/24hr" + params = {'symbol': symbol} + response = self._session.get(url, params=params, timeout=10) + response.raise_for_status() + stats = response.json() return { 'price': float(stats['lastPrice']), 'price_change': float(stats['priceChange']), diff --git a/backend/app/services/price_monitor_service.py b/backend/app/services/price_monitor_service.py index 52de027..9dffb86 100644 --- a/backend/app/services/price_monitor_service.py +++ b/backend/app/services/price_monitor_service.py @@ -10,9 +10,10 @@ from typing import Dict, List, Callable, Optional, Set from app.utils.logger import logger from app.config import get_settings -# 抑制 binance 库的 WebSocket 错误日志 -logging.getLogger('binance.websocket.reconnecting_websocket').setLevel(logging.CRITICAL) -logging.getLogger('binance.websocket.threaded_stream').setLevel(logging.CRITICAL) +# 抑制 binance 库的 WebSocket 错误日志(正确的 logger 名称) +logging.getLogger('binance.ws.threaded_stream').setLevel(logging.CRITICAL) +logging.getLogger('binance.ws.reconnecting_websocket').setLevel(logging.CRITICAL) +logging.getLogger('binance.ws').setLevel(logging.WARNING) # 只显示警告及以上 class SuppressOutput: @@ -77,21 +78,6 @@ class PriceMonitorService: # 延迟导入,避免在模块加载时就创建事件循环 from binance import ThreadedWebsocketManager - # Monkey patch: 抑制 binance 库的 "Read loop has been closed" 错误消息 - try: - from binance.ws import reconnecting_websocket - original_print = print - - def filtered_print(*args, **kwargs): - # 过滤掉 binance 的 read loop 错误消息 - if args and "Read loop" in str(args[0]): - return - original_print(*args, **kwargs) - - reconnecting_websocket.print = filtered_print - except Exception: - pass # 如果 patch 失败,继续运行 - self.twm = ThreadedWebsocketManager( api_key=self.settings.binance_api_key or "", api_secret=self.settings.binance_api_secret or "" diff --git a/frontend/paper-trading.html b/frontend/paper-trading.html index eab6ab5..1a51394 100644 --- a/frontend/paper-trading.html +++ b/frontend/paper-trading.html @@ -14,6 +14,7 @@ .trading-container { max-width: 1400px; + min-width: 800px; margin: 0 auto; } diff --git a/test_websocket.py b/test_websocket.py deleted file mode 100644 index a774e0b..0000000 --- a/test_websocket.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python3 -""" -测试 Binance WebSocket 连接 -""" -import time -import sys - -def test_websocket(): - print("=" * 50) - print("Binance WebSocket 连接测试") - print("=" * 50) - - # 1. 测试基本网络连接 - print("\n1. 测试网络连接...") - try: - import socket - socket.setdefaulttimeout(10) - socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect(("stream.binance.com", 443)) - print(" ✅ 可以连接到 stream.binance.com:443") - except Exception as e: - print(f" ❌ 无法连接到 Binance 服务器: {e}") - print(" 提示: 可能需要使用代理或 VPN") - return False - - # 2. 测试 REST API - print("\n2. 测试 REST API...") - try: - import requests - resp = requests.get("https://api.binance.com/api/v3/ping", timeout=10) - if resp.status_code == 200: - print(" ✅ REST API 正常") - else: - print(f" ❌ REST API 返回状态码: {resp.status_code}") - except Exception as e: - print(f" ❌ REST API 请求失败: {e}") - - # 3. 测试 WebSocket 连接 - print("\n3. 测试 WebSocket 连接...") - try: - from binance import ThreadedWebsocketManager - - received_data = [] - - def handle_message(msg): - if msg.get('e') == 'error': - print(f" ❌ WebSocket 错误: {msg}") - else: - symbol = msg.get('s', 'unknown') - price = msg.get('c', 'unknown') - received_data.append(msg) - print(f" 📊 收到数据: {symbol} = ${price}") - - print(" 正在启动 WebSocket...") - twm = ThreadedWebsocketManager() - twm.start() - - # 等待启动 - time.sleep(2) - - print(" 正在订阅 BTCUSDT...") - twm.start_symbol_ticker_socket(callback=handle_message, symbol="BTCUSDT") - - # 等待数据 - print(" 等待数据 (10秒)...") - for i in range(10): - time.sleep(1) - if received_data: - print(f"\n ✅ WebSocket 连接正常!已收到 {len(received_data)} 条数据") - twm.stop() - return True - - print(" ❌ 10秒内未收到任何数据") - twm.stop() - return False - - except Exception as e: - print(f" ❌ WebSocket 测试失败: {e}") - import traceback - traceback.print_exc() - return False - - -if __name__ == "__main__": - success = test_websocket() - print("\n" + "=" * 50) - if success: - print("结论: WebSocket 连接正常") - else: - print("结论: WebSocket 连接有问题") - print("\n可能的解决方案:") - print("1. 检查网络连接") - print("2. 使用代理/VPN (某些地区无法直接访问 Binance)") - print("3. 检查防火墙设置") - print("=" * 50)