diff --git a/backend/app/services/price_monitor_service.py b/backend/app/services/price_monitor_service.py index 9dffb86..056f38c 100644 --- a/backend/app/services/price_monitor_service.py +++ b/backend/app/services/price_monitor_service.py @@ -1,217 +1,129 @@ """ -价格监控服务 - 使用 Binance WebSocket 实时监控价格 +价格监控服务 - 使用轮询方式获取实时价格(更稳定) """ import threading import time -import sys -import os -import logging +import requests from typing import Dict, List, Callable, Optional, Set from app.utils.logger import logger from app.config import get_settings -# 抑制 binance 库的 WebSocket 错误日志(正确的 logger 名称) -logging.getLogger('binance.ws.threaded_stream').setLevel(logging.CRITICAL) -logging.getLogger('binance.ws.reconnecting_websocket').setLevel(logging.CRITICAL) -logging.getLogger('binance.ws').setLevel(logging.WARNING) # 只显示警告及以上 - - -class SuppressOutput: - """临时抑制 stdout/stderr 输出""" - def __init__(self, suppress_stderr=True, suppress_stdout=False): - self.suppress_stderr = suppress_stderr - self.suppress_stdout = suppress_stdout - self._stderr = None - self._stdout = None - self._devnull = None - - def __enter__(self): - self._devnull = open(os.devnull, 'w') - if self.suppress_stderr: - self._stderr = sys.stderr - sys.stderr = self._devnull - if self.suppress_stdout: - self._stdout = sys.stdout - sys.stdout = self._devnull - return self - - def __exit__(self, *args): - if self._stderr: - sys.stderr = self._stderr - if self._stdout: - sys.stdout = self._stdout - if self._devnull: - self._devnull.close() - class PriceMonitorService: - """实时价格监控服务""" + """实时价格监控服务(轮询模式)""" + + # Binance API + BASE_URL = "https://api.binance.com" def __init__(self): """初始化价格监控服务""" self.settings = get_settings() - self.twm = None self.running = False - self.subscribed_symbols: Dict[str, str] = {} # symbol -> stream_name + self.subscribed_symbols: Set[str] = set() self.price_callbacks: List[Callable[[str, float], None]] = [] self.latest_prices: Dict[str, float] = {} self._lock = threading.Lock() - self._pending_symbols: List[str] = [] # 待订阅的交易对 - self._reconnecting = False # 是否正在重连 - self._desired_symbols: Set[str] = set() # 期望订阅的交易对(用于重连) - self._stop_requested = False # 是否请求停止(区分主动停止和意外断开) - self._last_message_time: Dict[str, float] = {} # 上次收到消息的时间 - self._health_check_thread = None + self._poll_thread: Optional[threading.Thread] = None + self._poll_interval = 3 # 轮询间隔(秒) + self._session = requests.Session() - logger.info("价格监控服务初始化完成") + logger.info("价格监控服务初始化完成(轮询模式)") def start(self): - """启动 WebSocket 管理器(在独立线程中)""" + """启动价格轮询""" if self.running: logger.warning("价格监控服务已在运行") return - self._stop_requested = False + self.running = True - def _start_in_thread(): - try: - # 延迟导入,避免在模块加载时就创建事件循环 - from binance import ThreadedWebsocketManager + def _poll_loop(): + logger.info(f"价格轮询已启动,间隔 {self._poll_interval} 秒") + while self.running: + try: + self._fetch_prices() + except Exception as e: + logger.error(f"获取价格失败: {e}") - self.twm = ThreadedWebsocketManager( - api_key=self.settings.binance_api_key or "", - api_secret=self.settings.binance_api_secret or "" - ) - self.twm.start() - self.running = True - self._reconnecting = False - logger.info("WebSocket 管理器已启动") - - # 等待 WebSocket 完全启动 - time.sleep(1) - - # 订阅待处理的交易对 - for symbol in self._pending_symbols: - self._do_subscribe(symbol) - self._pending_symbols.clear() - - # 重连时恢复之前的订阅 - for symbol in self._desired_symbols: - if symbol not in self.subscribed_symbols: - self._do_subscribe(symbol) - - # 启动健康检查 - self._start_health_check() - - except Exception as e: - logger.error(f"启动 WebSocket 管理器失败: {e}") - import traceback - logger.error(traceback.format_exc()) - # 启动失败,尝试重连 - if not self._stop_requested: - self._schedule_reconnect() - - # 在独立线程中启动 - thread = threading.Thread(target=_start_in_thread, daemon=True) - thread.start() - - def _start_health_check(self): - """启动健康检查线程""" - def _check_health(): - while self.running and not self._stop_requested: - time.sleep(30) # 每30秒检查一次 - - if not self.running or self._stop_requested: - break - - # 检查是否有超过60秒没收到消息的交易对 - now = time.time() - for symbol in list(self._desired_symbols): - last_time = self._last_message_time.get(symbol, now) - if now - last_time > 60: - logger.warning(f"{symbol} 超过60秒未收到数据,触发重连") - self._schedule_reconnect() + # 等待下一次轮询 + for _ in range(self._poll_interval * 10): + if not self.running: break + time.sleep(0.1) - self._health_check_thread = threading.Thread(target=_check_health, daemon=True) - self._health_check_thread.start() + self._poll_thread = threading.Thread(target=_poll_loop, daemon=True) + self._poll_thread.start() def stop(self): - """停止 WebSocket 管理器""" - # 标记为主动停止 - self._stop_requested = True - + """停止价格轮询""" if not self.running: return - # 先标记为停止,防止回调继续处理 self.running = False + logger.info("价格监控服务已停止") + def _fetch_prices(self): + """获取所有订阅交易对的价格""" + if not self.subscribed_symbols: + return + + symbols = list(self.subscribed_symbols) + + # 如果只有少量交易对,逐个获取 + if len(symbols) <= 3: + for symbol in symbols: + self._fetch_single_price(symbol) + else: + # 批量获取所有价格 + self._fetch_all_prices(symbols) + + def _fetch_single_price(self, symbol: str): + """获取单个交易对价格""" try: - # 抑制关闭时的错误输出(binance 库用 print 输出错误) - with SuppressOutput(suppress_stderr=True, suppress_stdout=True): - # 先停止所有 socket 订阅 - if self.twm: - for _, stream_name in list(self.subscribed_symbols.items()): - try: - self.twm.stop_socket(stream_name) - except: - pass - - # 等待一小段时间让 socket 关闭 - time.sleep(0.5) - - # 然后停止管理器 - try: - self.twm.stop() - except: - pass - - self.subscribed_symbols.clear() - self._desired_symbols.clear() - self._last_message_time.clear() - logger.info("WebSocket 管理器已停止") + url = f"{self.BASE_URL}/api/v3/ticker/price" + response = self._session.get(url, params={'symbol': symbol}, timeout=5) + response.raise_for_status() + data = response.json() + price = float(data['price']) + self._update_price(symbol, price) except Exception as e: - # 忽略关闭时的错误 - pass + logger.debug(f"获取 {symbol} 价格失败: {e}") - def _schedule_reconnect(self, delay: int = 5): - """安排重连""" - if self._stop_requested or self._reconnecting: - return + def _fetch_all_prices(self, symbols: List[str]): + """批量获取价格""" + try: + url = f"{self.BASE_URL}/api/v3/ticker/price" + response = self._session.get(url, timeout=10) + response.raise_for_status() + all_prices = response.json() - self._reconnecting = True - logger.warning(f"WebSocket 连接断开,{delay} 秒后尝试重连...") + # 过滤出订阅的交易对 + symbol_set = set(symbols) + for item in all_prices: + symbol = item['symbol'] + if symbol in symbol_set: + price = float(item['price']) + self._update_price(symbol, price) + except Exception as e: + logger.debug(f"批量获取价格失败: {e}") - def _reconnect(): - time.sleep(delay) - if not self._stop_requested: - self._do_reconnect() + def _update_price(self, symbol: str, price: float): + """更新价格并触发回调""" + old_price = self.latest_prices.get(symbol) - thread = threading.Thread(target=_reconnect, daemon=True) - thread.start() + # 只有价格变化时才触发回调 + if old_price != price: + self.latest_prices[symbol] = price - def _do_reconnect(self): - """执行重连""" - if self._stop_requested: - return + # 调用所有注册的回调函数 + with self._lock: + callbacks = self.price_callbacks.copy() - logger.info("正在重新连接 WebSocket...") - - # 清理旧连接(抑制错误输出) - with SuppressOutput(suppress_stderr=True, suppress_stdout=True): - try: - if self.twm: - self.twm.stop() - except: - pass - - self.twm = None - self.running = False - self.subscribed_symbols.clear() - - # 重新启动 - self.start() + for callback in callbacks: + try: + callback(symbol, price) + except Exception as e: + logger.error(f"价格回调执行出错: {e}") def subscribe_symbol(self, symbol: str): """ @@ -222,52 +134,27 @@ class PriceMonitorService: """ symbol = symbol.upper() - # 记录期望订阅的交易对(用于重连恢复) - self._desired_symbols.add(symbol) - if symbol in self.subscribed_symbols: logger.debug(f"已订阅 {symbol}") return + self.subscribed_symbols.add(symbol) + logger.info(f"已订阅 {symbol} 价格更新") + + # 如果服务未启动,自动启动 if not self.running: - # 如果还没启动,先加入待订阅列表,然后启动 - if symbol not in self._pending_symbols: - self._pending_symbols.append(symbol) self.start() - return - self._do_subscribe(symbol) - - def _do_subscribe(self, symbol: str): - """实际执行订阅""" - if not self.twm or not self.running: - return - - try: - stream_name = self.twm.start_symbol_ticker_socket( - callback=self._handle_price_update, - symbol=symbol - ) - self.subscribed_symbols[symbol] = stream_name - self._last_message_time[symbol] = time.time() - logger.info(f"已订阅 {symbol} 价格更新") - except Exception as e: - logger.error(f"订阅 {symbol} 失败: {e}") + # 立即获取一次价格 + self._fetch_single_price(symbol) def unsubscribe_symbol(self, symbol: str): """取消订阅交易对""" symbol = symbol.upper() - if symbol not in self.subscribed_symbols: - return - - try: - stream_name = self.subscribed_symbols[symbol] - self.twm.stop_socket(stream_name) - del self.subscribed_symbols[symbol] - self._desired_symbols.discard(symbol) + if symbol in self.subscribed_symbols: + self.subscribed_symbols.discard(symbol) + self.latest_prices.pop(symbol, None) logger.info(f"已取消订阅 {symbol}") - except Exception as e: - logger.error(f"取消订阅 {symbol} 失败: {e}") def add_price_callback(self, callback: Callable[[str, float], None]): """ @@ -286,70 +173,27 @@ class PriceMonitorService: if callback in self.price_callbacks: self.price_callbacks.remove(callback) - def _handle_price_update(self, msg: Dict): - """处理 WebSocket 价格更新消息""" - # 如果服务已停止或正在重连,忽略消息 - if not self.running or self._reconnecting or self._stop_requested: - return - - try: - # 检查错误消息 - if msg.get('e') == 'error': - error_type = msg.get('type', '') - error_msg = str(msg.get('m', '')) - - # 这些错误通常是正常的连接关闭,不需要记录 - ignored_errors = ['ReadLoopClosed', 'ConnectionClosed', 'WebSocketClosed', 'read loop'] - if error_type in ignored_errors or any(e.lower() in error_msg.lower() for e in ignored_errors): - # 如果不是主动停止,触发重连 - if not self._stop_requested and self.running: - self.running = False - self._schedule_reconnect() - return - - # 其他错误记录日志(但不刷屏) - if self.running and not self._stop_requested: - logger.warning(f"WebSocket 消息: {msg}") - return - - symbol = msg.get('s') # 交易对 - price_str = msg.get('c') # 最新价格 - - if not symbol or not price_str: - return - - price = float(price_str) - - # 更新最新价格缓存和消息时间 - self.latest_prices[symbol] = price - self._last_message_time[symbol] = time.time() - - # 调用所有注册的回调函数 - with self._lock: - callbacks = self.price_callbacks.copy() - - for callback in callbacks: - try: - callback(symbol, price) - except Exception as e: - logger.error(f"价格回调执行出错: {e}") - - except Exception as e: - if self.running and not self._stop_requested: - logger.error(f"处理价格更新出错: {e}") - def get_latest_price(self, symbol: str) -> Optional[float]: """获取交易对的最新缓存价格""" return self.latest_prices.get(symbol.upper()) + def get_all_prices(self) -> Dict[str, float]: + """获取所有订阅交易对的最新价格""" + return self.latest_prices.copy() + def get_subscribed_symbols(self) -> List[str]: """获取已订阅的交易对列表""" - return list(self.subscribed_symbols.keys()) + return list(self.subscribed_symbols) def is_running(self) -> bool: """检查服务是否在运行""" return self.running + def set_poll_interval(self, seconds: int): + """设置轮询间隔(秒)""" + self._poll_interval = max(1, seconds) + logger.info(f"轮询间隔已设置为 {self._poll_interval} 秒") + # 全局单例 _price_monitor_service: Optional[PriceMonitorService] = None