stock-ai-agent/backend/app/services/bitget_websocket.py
2026-02-25 23:28:04 +08:00

434 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Bitget WebSocket 实时价格服务
通过 WebSocket 订阅实时 ticker 价格更新
"""
import asyncio
import json
import logging
from typing import Dict, Callable, Optional, Set, Any
from datetime import datetime
try:
import websockets
WEBSOCKETS_AVAILABLE = True
except ImportError:
WEBSOCKETS_AVAILABLE = False
from app.utils.logger import logger
class BitgetWebSocketClient:
"""
Bitget WebSocket 客户端 - 实时价格订阅
使用异步 WebSocket 连接获取实时价格更新
"""
# Bitget WebSocket v2 端点
WS_URL = "wss://ws.bitget.com/v2/ws/public" # 公共频道 v2
# 心跳间隔(秒)
HEARTBEAT_INTERVAL = 25
# 重连间隔(秒)
RECONNECT_INTERVAL = 5
# 订阅限制:每个连接最多订阅 50 个交易对
MAX_SUBSCRIPTIONS = 50
def __init__(self):
"""初始化 WebSocket 客户端"""
if not WEBSOCKETS_AVAILABLE:
raise ImportError("需要安装 websockets 库: pip install websockets")
self._ws: Optional[websockets.WebSocketClientProtocol] = None
self._running = False
self._subscribed_symbols: Set[str] = set()
# 价格缓存
self._prices: Dict[str, float] = {}
# 回调函数
self._callbacks: Dict[str, Set[Callable]] = {}
# 心跳任务
self._heartbeat_task: Optional[asyncio.Task] = None
self._reconnect_task: Optional[asyncio.Task] = None
logger.info("Bitget WebSocket 客户端初始化完成")
async def connect(self) -> bool:
"""
连接到 Bitget WebSocket
Returns:
连接是否成功
"""
try:
logger.info(f"正在连接 Bitget WebSocket: {self.WS_URL}")
self._ws = await websockets.connect(
self.WS_URL,
ping_interval=self.HEARTBEAT_INTERVAL,
ping_timeout=10,
close_timeout=10
)
self._running = True
logger.info("✅ Bitget WebSocket 连接成功")
# 启动消息接收循环
asyncio.create_task(self._message_loop())
# 启动心跳任务
self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
return True
except Exception as e:
logger.error(f"❌ Bitget WebSocket 连接失败: {e}")
return False
async def disconnect(self):
"""断开 WebSocket 连接"""
logger.info("正在断开 Bitget WebSocket...")
self._running = False
# 取消心跳任务
if self._heartbeat_task:
self._heartbeat_task.cancel()
self._heartbeat_task = None
# 取消重连任务
if self._reconnect_task:
self._reconnect_task.cancel()
self._reconnect_task = None
# 关闭 WebSocket 连接
if self._ws:
await self._ws.close()
self._ws = None
logger.info("Bitget WebSocket 已断开")
async def subscribe(self, symbols: list) -> bool:
"""
订阅交易对价格
Args:
symbols: 交易对列表,如 ['BTCUSDT', 'ETHUSDT']
Returns:
是否订阅成功
"""
if not self._ws or not self._running:
logger.warning("WebSocket 未连接,无法订阅")
return False
try:
# 构建订阅消息 (根据 Bitget 官方文档)
# USDT-FUTURES = USDT 永续合约
args = []
for symbol in symbols:
if symbol not in self._subscribed_symbols:
args.append({
"instType": "USDT-FUTURES",
"channel": "ticker",
"instId": symbol
})
self._subscribed_symbols.add(symbol)
if not args:
logger.info("所有交易对已订阅")
return True
message = {
"op": "subscribe",
"args": args
}
await self._ws.send(json.dumps(message))
logger.info(f"✅ 订阅 {len(args)} 个交易对: {[s['instId'] for s in args]}")
return True
except Exception as e:
logger.error(f"订阅失败: {e}")
return False
async def unsubscribe(self, symbols: list) -> bool:
"""
取消订阅交易对价格
Args:
symbols: 交易对列表
Returns:
是否取消成功
"""
if not self._ws or not self._running:
return False
try:
args = []
for symbol in symbols:
if symbol in self._subscribed_symbols:
args.append({
"instType": "USDT-FUTURES",
"channel": "ticker",
"instId": symbol
})
self._subscribed_symbols.discard(symbol)
if not args:
return True
message = {
"op": "unsubscribe",
"args": args
}
await self._ws.send(json.dumps(message))
logger.info(f"取消订阅 {len(args)} 个交易对")
return True
except Exception as e:
logger.error(f"取消订阅失败: {e}")
return False
def get_price(self, symbol: str) -> Optional[float]:
"""
获取交易对的最新价格(从缓存)
Args:
symbol: 交易对
Returns:
最新价格,如果未订阅则返回 None
"""
return self._prices.get(symbol)
def get_all_prices(self) -> Dict[str, float]:
"""
获取所有已订阅交易对的价格
Returns:
{symbol: price} 字典
"""
return self._prices.copy()
def on_price_update(self, symbol: str, callback: Callable[[str, float, Dict], None]):
"""
注册价格更新回调
Args:
symbol: 交易对,'*' 表示所有交易对
callback: 回调函数 (symbol, price, data) -> None
"""
if symbol not in self._callbacks:
self._callbacks[symbol] = set()
self._callbacks[symbol].add(callback)
logger.debug(f"注册价格回调: {symbol}")
def off_price_update(self, symbol: str, callback: Callable):
"""
取消价格更新回调
Args:
symbol: 交易对
callback: 回调函数
"""
if symbol in self._callbacks:
self._callbacks[symbol].discard(callback)
async def _message_loop(self):
"""消息接收循环"""
try:
async for message in self._ws:
await self._handle_message(message)
except websockets.ConnectionClosed:
logger.warning("WebSocket 连接已关闭")
if self._running:
# 自动重连
self._schedule_reconnect()
except Exception as e:
logger.error(f"消息循环错误: {e}")
if self._running:
self._schedule_reconnect()
async def _handle_message(self, message: str):
"""
处理接收到的消息 (v2 API 格式)
Args:
message: WebSocket 消息
"""
try:
data = json.loads(message)
# 调试:记录所有收到的消息
logger.info(f"📨 收到消息: action={data.get('action', 'unknown')}, event={data.get('event', 'none')}")
# v2 API: 订阅/取消订阅确认事件 (使用 event 字段)
if data.get('event') == 'subscribe':
logger.info(f"✅ 订阅确认: {data}")
return
if data.get('event') == 'unsubscribe':
logger.info(f"✅ 取消订阅确认: {data}")
return
if data.get('event') == 'error':
logger.error(f"❌ WebSocket 错误: {data}")
return
# v2 API: ticker 数据格式 (使用 action 字段)
# {"action": "snapshot" or "update", "data": [...], "arg": {...}}
if 'data' in data and isinstance(data['data'], list):
# 处理 data 数组中的每个 ticker
for ticker_item in data['data']:
if 'instId' in ticker_item or 'lastPr' in ticker_item:
self._process_ticker(ticker_item)
except json.JSONDecodeError:
logger.warning(f"无法解析消息: {message[:100]}")
except Exception as e:
logger.error(f"处理消息错误: {e}")
def _process_ticker(self, ticker: Dict[str, Any]):
"""
处理 ticker 数据 (v2 API 格式)
Args:
ticker: ticker 数据,包含 instId 和 lastPr 字段
"""
try:
# v2 API: 直接从 ticker 获取 instId 和 lastPr
symbol = ticker.get('instId', '')
price_str = ticker.get('lastPr', '0')
if not symbol:
logger.debug(f"跳过无效 ticker (无 instId): {ticker}")
return
try:
price = float(price_str)
except (ValueError, TypeError):
logger.debug(f"跳过无效 ticker (价格无效): symbol={symbol}, price_str={price_str}")
return
if price == 0:
logger.debug(f"跳过无效 ticker (价格为0): symbol={symbol}")
return
# 更新价格缓存
old_price = self._prices.get(symbol)
self._prices[symbol] = price
# 触发回调
self._trigger_callbacks(symbol, price, ticker)
# 价格变化日志 - 改为 info 级别方便调试
logger.info(f"💰 {symbol}: ${price:,.2f}")
if old_price and old_price != price:
change = ((price - old_price) / old_price) * 100
logger.debug(f" 变化: {change:+.2f}%")
except Exception as e:
logger.error(f"解析 ticker 数据错误: {e}")
def _trigger_callbacks(self, symbol: str, price: float, data: Dict):
"""
触发价格更新回调
Args:
symbol: 交易对
price: 价格
data: 完整的 ticker 数据
"""
# 触发该交易对的回调
if symbol in self._callbacks:
for callback in self._callbacks[symbol]:
try:
callback(symbol, price, data)
except Exception as e:
logger.error(f"回调函数错误 ({symbol}): {e}")
# 触发全局回调('*'
if '*' in self._callbacks:
for callback in self._callbacks['*']:
try:
callback(symbol, price, data)
except Exception as e:
logger.error(f"全局回调函数错误: {e}")
async def _heartbeat_loop(self):
"""心跳循环"""
while self._running and self._ws:
try:
await asyncio.sleep(self.HEARTBEAT_INTERVAL)
# 发送 ping
await self._ws.ping()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"心跳错误: {e}")
self._schedule_reconnect()
break
def _schedule_reconnect(self):
"""安排重连"""
if not self._running:
return
if self._reconnect_task and not self._reconnect_task.done():
return # 已经有重连任务在运行
async def reconnect():
await asyncio.sleep(self.RECONNECT_INTERVAL)
if self._running:
logger.info("尝试重新连接...")
await self.disconnect()
if await self.connect():
# 重新订阅之前的交易对
if self._subscribed_symbols:
await self.subscribe(list(self._subscribed_symbols))
self._reconnect_task = asyncio.create_task(reconnect())
@property
def is_connected(self) -> bool:
"""是否已连接"""
return self._ws is not None and self._running
@property
def subscribed_symbols(self) -> Set[str]:
"""已订阅的交易对"""
return self._subscribed_symbols.copy()
# 全局实例
_bitget_ws_client: Optional[BitgetWebSocketClient] = None
def get_bitget_ws_client() -> Optional[BitgetWebSocketClient]:
"""
获取 Bitget WebSocket 客户端单例
Returns:
WebSocket 客户端实例,如果 websockets 库未安装则返回 None
"""
global _bitget_ws_client
if _bitget_ws_client is None:
try:
_bitget_ws_client = BitgetWebSocketClient()
except ImportError:
logger.warning("websockets 库未安装WebSocket 功能不可用")
return None
return _bitget_ws_client