This commit is contained in:
aaron 2026-02-07 01:03:12 +08:00
parent 5107b0ed09
commit 17995c0a0b

View File

@ -1,217 +1,129 @@
""" """
价格监控服务 - 使用 Binance WebSocket 实时监控价格 价格监控服务 - 使用轮询方式获取实时价格更稳定
""" """
import threading import threading
import time import time
import sys import requests
import os
import logging
from typing import Dict, List, Callable, Optional, Set from typing import Dict, List, Callable, Optional, Set
from app.utils.logger import logger from app.utils.logger import logger
from app.config import get_settings from app.config import get_settings
# 抑制 binance 库的 WebSocket 错误日志(正确的 logger 名称)
logging.getLogger('binance.ws.threaded_stream').setLevel(logging.CRITICAL)
logging.getLogger('binance.ws.reconnecting_websocket').setLevel(logging.CRITICAL)
logging.getLogger('binance.ws').setLevel(logging.WARNING) # 只显示警告及以上
class SuppressOutput:
"""临时抑制 stdout/stderr 输出"""
def __init__(self, suppress_stderr=True, suppress_stdout=False):
self.suppress_stderr = suppress_stderr
self.suppress_stdout = suppress_stdout
self._stderr = None
self._stdout = None
self._devnull = None
def __enter__(self):
self._devnull = open(os.devnull, 'w')
if self.suppress_stderr:
self._stderr = sys.stderr
sys.stderr = self._devnull
if self.suppress_stdout:
self._stdout = sys.stdout
sys.stdout = self._devnull
return self
def __exit__(self, *args):
if self._stderr:
sys.stderr = self._stderr
if self._stdout:
sys.stdout = self._stdout
if self._devnull:
self._devnull.close()
class PriceMonitorService: class PriceMonitorService:
"""实时价格监控服务""" """实时价格监控服务(轮询模式)"""
# Binance API
BASE_URL = "https://api.binance.com"
def __init__(self): def __init__(self):
"""初始化价格监控服务""" """初始化价格监控服务"""
self.settings = get_settings() self.settings = get_settings()
self.twm = None
self.running = False self.running = False
self.subscribed_symbols: Dict[str, str] = {} # symbol -> stream_name self.subscribed_symbols: Set[str] = set()
self.price_callbacks: List[Callable[[str, float], None]] = [] self.price_callbacks: List[Callable[[str, float], None]] = []
self.latest_prices: Dict[str, float] = {} self.latest_prices: Dict[str, float] = {}
self._lock = threading.Lock() self._lock = threading.Lock()
self._pending_symbols: List[str] = [] # 待订阅的交易对 self._poll_thread: Optional[threading.Thread] = None
self._reconnecting = False # 是否正在重连 self._poll_interval = 3 # 轮询间隔(秒)
self._desired_symbols: Set[str] = set() # 期望订阅的交易对(用于重连) self._session = requests.Session()
self._stop_requested = False # 是否请求停止(区分主动停止和意外断开)
self._last_message_time: Dict[str, float] = {} # 上次收到消息的时间
self._health_check_thread = None
logger.info("价格监控服务初始化完成") logger.info("价格监控服务初始化完成(轮询模式)")
def start(self): def start(self):
"""启动 WebSocket 管理器(在独立线程中)""" """启动价格轮询"""
if self.running: if self.running:
logger.warning("价格监控服务已在运行") logger.warning("价格监控服务已在运行")
return return
self._stop_requested = False
def _start_in_thread():
try:
# 延迟导入,避免在模块加载时就创建事件循环
from binance import ThreadedWebsocketManager
self.twm = ThreadedWebsocketManager(
api_key=self.settings.binance_api_key or "",
api_secret=self.settings.binance_api_secret or ""
)
self.twm.start()
self.running = True self.running = True
self._reconnecting = False
logger.info("WebSocket 管理器已启动")
# 等待 WebSocket 完全启动
time.sleep(1)
# 订阅待处理的交易对
for symbol in self._pending_symbols:
self._do_subscribe(symbol)
self._pending_symbols.clear()
# 重连时恢复之前的订阅
for symbol in self._desired_symbols:
if symbol not in self.subscribed_symbols:
self._do_subscribe(symbol)
# 启动健康检查
self._start_health_check()
def _poll_loop():
logger.info(f"价格轮询已启动,间隔 {self._poll_interval}")
while self.running:
try:
self._fetch_prices()
except Exception as e: except Exception as e:
logger.error(f"启动 WebSocket 管理器失败: {e}") logger.error(f"获取价格失败: {e}")
import traceback
logger.error(traceback.format_exc())
# 启动失败,尝试重连
if not self._stop_requested:
self._schedule_reconnect()
# 在独立线程中启动 # 等待下一次轮询
thread = threading.Thread(target=_start_in_thread, daemon=True) for _ in range(self._poll_interval * 10):
thread.start() if not self.running:
def _start_health_check(self):
"""启动健康检查线程"""
def _check_health():
while self.running and not self._stop_requested:
time.sleep(30) # 每30秒检查一次
if not self.running or self._stop_requested:
break break
time.sleep(0.1)
# 检查是否有超过60秒没收到消息的交易对 self._poll_thread = threading.Thread(target=_poll_loop, daemon=True)
now = time.time() self._poll_thread.start()
for symbol in list(self._desired_symbols):
last_time = self._last_message_time.get(symbol, now)
if now - last_time > 60:
logger.warning(f"{symbol} 超过60秒未收到数据触发重连")
self._schedule_reconnect()
break
self._health_check_thread = threading.Thread(target=_check_health, daemon=True)
self._health_check_thread.start()
def stop(self): def stop(self):
"""停止 WebSocket 管理器""" """停止价格轮询"""
# 标记为主动停止
self._stop_requested = True
if not self.running: if not self.running:
return return
# 先标记为停止,防止回调继续处理
self.running = False self.running = False
logger.info("价格监控服务已停止")
def _fetch_prices(self):
"""获取所有订阅交易对的价格"""
if not self.subscribed_symbols:
return
symbols = list(self.subscribed_symbols)
# 如果只有少量交易对,逐个获取
if len(symbols) <= 3:
for symbol in symbols:
self._fetch_single_price(symbol)
else:
# 批量获取所有价格
self._fetch_all_prices(symbols)
def _fetch_single_price(self, symbol: str):
"""获取单个交易对价格"""
try: try:
# 抑制关闭时的错误输出binance 库用 print 输出错误) url = f"{self.BASE_URL}/api/v3/ticker/price"
with SuppressOutput(suppress_stderr=True, suppress_stdout=True): response = self._session.get(url, params={'symbol': symbol}, timeout=5)
# 先停止所有 socket 订阅 response.raise_for_status()
if self.twm: data = response.json()
for _, stream_name in list(self.subscribed_symbols.items()): price = float(data['price'])
try: self._update_price(symbol, price)
self.twm.stop_socket(stream_name)
except:
pass
# 等待一小段时间让 socket 关闭
time.sleep(0.5)
# 然后停止管理器
try:
self.twm.stop()
except:
pass
self.subscribed_symbols.clear()
self._desired_symbols.clear()
self._last_message_time.clear()
logger.info("WebSocket 管理器已停止")
except Exception as e: except Exception as e:
# 忽略关闭时的错误 logger.debug(f"获取 {symbol} 价格失败: {e}")
pass
def _schedule_reconnect(self, delay: int = 5): def _fetch_all_prices(self, symbols: List[str]):
"""安排重连""" """批量获取价格"""
if self._stop_requested or self._reconnecting:
return
self._reconnecting = True
logger.warning(f"WebSocket 连接断开,{delay} 秒后尝试重连...")
def _reconnect():
time.sleep(delay)
if not self._stop_requested:
self._do_reconnect()
thread = threading.Thread(target=_reconnect, daemon=True)
thread.start()
def _do_reconnect(self):
"""执行重连"""
if self._stop_requested:
return
logger.info("正在重新连接 WebSocket...")
# 清理旧连接(抑制错误输出)
with SuppressOutput(suppress_stderr=True, suppress_stdout=True):
try: try:
if self.twm: url = f"{self.BASE_URL}/api/v3/ticker/price"
self.twm.stop() response = self._session.get(url, timeout=10)
except: response.raise_for_status()
pass all_prices = response.json()
self.twm = None # 过滤出订阅的交易对
self.running = False symbol_set = set(symbols)
self.subscribed_symbols.clear() for item in all_prices:
symbol = item['symbol']
if symbol in symbol_set:
price = float(item['price'])
self._update_price(symbol, price)
except Exception as e:
logger.debug(f"批量获取价格失败: {e}")
# 重新启动 def _update_price(self, symbol: str, price: float):
self.start() """更新价格并触发回调"""
old_price = self.latest_prices.get(symbol)
# 只有价格变化时才触发回调
if old_price != price:
self.latest_prices[symbol] = price
# 调用所有注册的回调函数
with self._lock:
callbacks = self.price_callbacks.copy()
for callback in callbacks:
try:
callback(symbol, price)
except Exception as e:
logger.error(f"价格回调执行出错: {e}")
def subscribe_symbol(self, symbol: str): def subscribe_symbol(self, symbol: str):
""" """
@ -222,52 +134,27 @@ class PriceMonitorService:
""" """
symbol = symbol.upper() symbol = symbol.upper()
# 记录期望订阅的交易对(用于重连恢复)
self._desired_symbols.add(symbol)
if symbol in self.subscribed_symbols: if symbol in self.subscribed_symbols:
logger.debug(f"已订阅 {symbol}") logger.debug(f"已订阅 {symbol}")
return return
if not self.running: self.subscribed_symbols.add(symbol)
# 如果还没启动,先加入待订阅列表,然后启动
if symbol not in self._pending_symbols:
self._pending_symbols.append(symbol)
self.start()
return
self._do_subscribe(symbol)
def _do_subscribe(self, symbol: str):
"""实际执行订阅"""
if not self.twm or not self.running:
return
try:
stream_name = self.twm.start_symbol_ticker_socket(
callback=self._handle_price_update,
symbol=symbol
)
self.subscribed_symbols[symbol] = stream_name
self._last_message_time[symbol] = time.time()
logger.info(f"已订阅 {symbol} 价格更新") logger.info(f"已订阅 {symbol} 价格更新")
except Exception as e:
logger.error(f"订阅 {symbol} 失败: {e}") # 如果服务未启动,自动启动
if not self.running:
self.start()
# 立即获取一次价格
self._fetch_single_price(symbol)
def unsubscribe_symbol(self, symbol: str): def unsubscribe_symbol(self, symbol: str):
"""取消订阅交易对""" """取消订阅交易对"""
symbol = symbol.upper() symbol = symbol.upper()
if symbol not in self.subscribed_symbols: if symbol in self.subscribed_symbols:
return self.subscribed_symbols.discard(symbol)
self.latest_prices.pop(symbol, None)
try:
stream_name = self.subscribed_symbols[symbol]
self.twm.stop_socket(stream_name)
del self.subscribed_symbols[symbol]
self._desired_symbols.discard(symbol)
logger.info(f"已取消订阅 {symbol}") logger.info(f"已取消订阅 {symbol}")
except Exception as e:
logger.error(f"取消订阅 {symbol} 失败: {e}")
def add_price_callback(self, callback: Callable[[str, float], None]): def add_price_callback(self, callback: Callable[[str, float], None]):
""" """
@ -286,70 +173,27 @@ class PriceMonitorService:
if callback in self.price_callbacks: if callback in self.price_callbacks:
self.price_callbacks.remove(callback) self.price_callbacks.remove(callback)
def _handle_price_update(self, msg: Dict):
"""处理 WebSocket 价格更新消息"""
# 如果服务已停止或正在重连,忽略消息
if not self.running or self._reconnecting or self._stop_requested:
return
try:
# 检查错误消息
if msg.get('e') == 'error':
error_type = msg.get('type', '')
error_msg = str(msg.get('m', ''))
# 这些错误通常是正常的连接关闭,不需要记录
ignored_errors = ['ReadLoopClosed', 'ConnectionClosed', 'WebSocketClosed', 'read loop']
if error_type in ignored_errors or any(e.lower() in error_msg.lower() for e in ignored_errors):
# 如果不是主动停止,触发重连
if not self._stop_requested and self.running:
self.running = False
self._schedule_reconnect()
return
# 其他错误记录日志(但不刷屏)
if self.running and not self._stop_requested:
logger.warning(f"WebSocket 消息: {msg}")
return
symbol = msg.get('s') # 交易对
price_str = msg.get('c') # 最新价格
if not symbol or not price_str:
return
price = float(price_str)
# 更新最新价格缓存和消息时间
self.latest_prices[symbol] = price
self._last_message_time[symbol] = time.time()
# 调用所有注册的回调函数
with self._lock:
callbacks = self.price_callbacks.copy()
for callback in callbacks:
try:
callback(symbol, price)
except Exception as e:
logger.error(f"价格回调执行出错: {e}")
except Exception as e:
if self.running and not self._stop_requested:
logger.error(f"处理价格更新出错: {e}")
def get_latest_price(self, symbol: str) -> Optional[float]: def get_latest_price(self, symbol: str) -> Optional[float]:
"""获取交易对的最新缓存价格""" """获取交易对的最新缓存价格"""
return self.latest_prices.get(symbol.upper()) return self.latest_prices.get(symbol.upper())
def get_all_prices(self) -> Dict[str, float]:
"""获取所有订阅交易对的最新价格"""
return self.latest_prices.copy()
def get_subscribed_symbols(self) -> List[str]: def get_subscribed_symbols(self) -> List[str]:
"""获取已订阅的交易对列表""" """获取已订阅的交易对列表"""
return list(self.subscribed_symbols.keys()) return list(self.subscribed_symbols)
def is_running(self) -> bool: def is_running(self) -> bool:
"""检查服务是否在运行""" """检查服务是否在运行"""
return self.running return self.running
def set_poll_interval(self, seconds: int):
"""设置轮询间隔(秒)"""
self._poll_interval = max(1, seconds)
logger.info(f"轮询间隔已设置为 {self._poll_interval}")
# 全局单例 # 全局单例
_price_monitor_service: Optional[PriceMonitorService] = None _price_monitor_service: Optional[PriceMonitorService] = None