stock-ai-agent/backend/app/services/bitget_trading_api_sdk.py
2026-02-23 00:22:17 +08:00

572 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Bitget 实盘交易 API (基于 CCXT SDK)
处理 Bitget 合约交易的所有 API 调用,包括:
- 下单(开仓/平仓)
- 撤单
- 查询订单
- 查询持仓
- 查询账户余额
- 设置杠杆
使用 CCXT 统一交易所接口,提供更稳定的 API 交互。
"""
import ccxt
from typing import Dict, List, Optional, Any
from datetime import datetime
from app.utils.logger import logger
class BitgetTradingAPI:
"""Bitget 实盘交易 API (基于 CCXT)"""
def __init__(self, api_key: str, api_secret: str, passphrase: str = "", use_testnet: bool = True):
"""
初始化 Bitget 交易 API
Args:
api_key: API Key
api_secret: API Secret
passphrase: API Passphrase (Bitget 不需要,保留兼容性)
use_testnet: 是否使用测试网
"""
self.api_key = api_key
self.api_secret = api_secret
self.use_testnet = use_testnet
# 创建 CCXT Bitget 实例
config = {
'apiKey': api_key,
'secret': api_secret,
'enableRateLimit': True, # 启用速率限制
'options': {
'defaultType': 'swap', # 使用永续合约 API
}
}
# CCXT Bitget 使用 password 字段存储 passphrase
# 如果提供了 passphrase添加到配置中
if passphrase:
config['password'] = passphrase
self.exchange = ccxt.bitget(config)
# 设置测试网sandbox 模式)
if use_testnet:
if hasattr(self.exchange, 'set_sandbox_mode'):
self.exchange.set_sandbox_mode(True)
logger.info("✅ Bitget 测试网模式已启用")
logger.info(f"Bitget 交易 API 初始化完成 ({'测试网' if use_testnet else '生产网'})")
# ==================== 订单操作 ====================
def place_order(self, symbol: str, side: str, order_type: str,
size: float, price: float = None, client_order_id: str = None) -> Optional[Dict]:
"""
下单
Args:
symbol: 交易对 (如 BTC/USDT:USDT)
side: 订单方向 (buy/sell)
order_type: 订单类型 (limit/market)
size: 数量(张数)
price: 价格(限价单必需)
client_order_id: 自定义订单ID
Returns:
订单信息
"""
try:
# CCXT 标准化交易对格式
ccxt_symbol = self._standardize_symbol(symbol)
# 构建订单参数
params = {}
if client_order_id:
params['clientOrderId'] = client_order_id
# 下单
if order_type == 'market':
order = self.exchange.create_market_order(
symbol=ccxt_symbol,
side=side,
amount=size,
params=params
)
else: # limit
if not price:
logger.error("限价单必须指定价格")
return None
order = self.exchange.create_limit_order(
symbol=ccxt_symbol,
side=side,
amount=size,
price=price,
params=params
)
logger.info(f"✅ 下单成功: {symbol} {side} {size}张 @ {price or '市价'}")
logger.debug(f"订单详情: {order}")
return order
except ccxt.BaseError as e:
logger.error(f"❌ 下单失败: {e}")
return None
except Exception as e:
logger.error(f"❌ 下单异常: {e}")
return None
def cancel_order(self, symbol: str, order_id: str = None, client_order_id: str = None) -> bool:
"""
撤单
Args:
symbol: 交易对
order_id: 订单ID
client_order_id: 自定义订单ID
Returns:
是否成功
"""
try:
ccxt_symbol = self._standardize_symbol(symbol)
if order_id:
self.exchange.cancel_order(order_id, ccxt_symbol)
elif client_order_id:
self.exchange.cancel_order_by_client_order_id(client_order_id, ccxt_symbol)
else:
logger.error("必须提供 order_id 或 client_order_id")
return False
logger.info(f"✅ 撤单成功: {symbol} order_id={order_id or client_order_id}")
return True
except ccxt.BaseError as e:
logger.error(f"❌ 撤单失败: {e}")
return False
except Exception as e:
logger.error(f"❌ 撤单异常: {e}")
return False
def cancel_all_orders(self, symbol: str) -> bool:
"""
撤销所有挂单
Args:
symbol: 交易对
Returns:
是否成功
"""
try:
ccxt_symbol = self._standardize_symbol(symbol)
self.exchange.cancel_all_orders(ccxt_symbol)
logger.info(f"✅ 撤销所有挂单成功: {symbol}")
return True
except ccxt.BaseError as e:
logger.error(f"❌ 撤销所有挂单失败: {e}")
return False
except Exception as e:
logger.error(f"❌ 撤销所有挂单异常: {e}")
return False
def close_position(self, symbol: str, side: str, size: float = None,
price: float = None) -> Optional[Dict]:
"""
平仓
Args:
symbol: 交易对
side: 平仓方向 (buy=平空仓/sell=平多仓)
size: 平仓数量(不传则全部平仓)
price: 平仓价格(不传则市价)
Returns:
订单信息
"""
try:
ccxt_symbol = self._standardize_symbol(symbol)
# 获取当前持仓
positions = self.get_position(symbol)
if not positions:
logger.warning(f"没有找到 {symbol} 的持仓")
return None
position = positions[0]
current_size = abs(float(position.get('contracts', 0)))
if current_size == 0:
logger.warning(f"{symbol} 持仓数量为 0无需平仓")
return None
# 如果没有指定平仓数量,则全部平仓
close_size = size if size else current_size
# 执行平仓
order_type = 'limit' if price else 'market'
order = self.place_order(
symbol=symbol,
side=side,
order_type=order_type,
size=close_size,
price=price
)
if order:
logger.info(f"✅ 平仓成功: {symbol} {side} {close_size}")
return order
return None
except Exception as e:
logger.error(f"❌ 平仓异常: {e}")
return None
# ==================== 查询操作 ====================
def get_order(self, symbol: str, order_id: str = None, client_order_id: str = None) -> Optional[Dict]:
"""
查询订单
Args:
symbol: 交易对
order_id: 订单ID
client_order_id: 自定义订单ID
Returns:
订单信息
"""
try:
ccxt_symbol = self._standardize_symbol(symbol)
if order_id:
order = self.exchange.fetch_order(order_id, ccxt_symbol)
elif client_order_id:
order = self.exchange.fetch_order_by_client_order_id(client_order_id, ccxt_symbol)
else:
logger.error("必须提供 order_id 或 client_order_id")
return None
return order
except ccxt.BaseError as e:
logger.error(f"❌ 查询订单失败: {e}")
return None
except Exception as e:
logger.error(f"❌ 查询订单异常: {e}")
return None
def get_open_orders(self, symbol: str = None) -> List[Dict]:
"""
查询当前挂单
Args:
symbol: 交易对(不传则查询所有)
Returns:
挂单列表
"""
try:
ccxt_symbol = self._standardize_symbol(symbol) if symbol else None
orders = self.exchange.fetch_open_orders(ccxt_symbol)
logger.debug(f"查询到 {len(orders)} 个挂单")
return orders
except ccxt.BaseError as e:
logger.error(f"❌ 查询挂单失败: {e}")
return []
except Exception as e:
logger.error(f"❌ 查询挂单异常: {e}")
return []
def get_history_orders(self, symbol: str, limit: int = 100) -> List[Dict]:
"""
查询历史订单
Args:
symbol: 交易对
limit: 返回数量
Returns:
历史订单列表
"""
try:
ccxt_symbol = self._standardize_symbol(symbol)
orders = self.exchange.fetch_my_trades(ccxt_symbol, limit=limit)
logger.debug(f"查询到 {len(orders)} 条历史订单")
return orders
except ccxt.BaseError as e:
logger.error(f"❌ 查询历史订单失败: {e}")
return []
except Exception as e:
logger.error(f"❌ 查询历史订单异常: {e}")
return []
# ==================== 持仓操作 ====================
def get_position(self, symbol: str = None) -> List[Dict]:
"""
查询持仓
Args:
symbol: 交易对(不传则查询所有)
Returns:
持仓列表
"""
try:
# 获取所有持仓
positions = self.exchange.fetch_positions()
# 筛选非零持仓
active_positions = []
for pos in positions:
size = float(pos.get('contracts', 0))
if size != 0:
# 如果指定了交易对,进行过滤
if symbol:
ccxt_symbol = self._standardize_symbol(symbol)
if pos['symbol'] == ccxt_symbol:
active_positions.append(pos)
else:
active_positions.append(pos)
logger.debug(f"查询到 {len(active_positions)} 个持仓")
return active_positions
except ccxt.BaseError as e:
logger.error(f"❌ 查询持仓失败: {e}")
return []
except Exception as e:
logger.error(f"❌ 查询持仓异常: {e}")
return []
def get_single_position(self, symbol: str) -> Optional[Dict]:
"""
查询单个交易对的持仓
Args:
symbol: 交易对
Returns:
持仓信息
"""
positions = self.get_position(symbol)
if positions:
return positions[0]
return None
def get_closed_orders(self, symbol: str = None, limit: int = 100) -> List[Dict]:
"""
查询历史成交订单(从交易所获取)
Args:
symbol: 交易对(不传则查询所有)
limit: 返回数量
Returns:
历史订单列表
"""
try:
# 使用 CCXT 的 fetch_closed_orders 或 fetch_my_trades
if symbol:
ccxt_symbol = self._standardize_symbol(symbol)
orders = self.exchange.fetch_closed_orders(ccxt_symbol, limit=limit)
else:
orders = self.exchange.fetch_closed_orders(limit=limit)
logger.debug(f"查询到 {len(orders)} 条历史订单")
return orders
except ccxt.BaseError as e:
logger.error(f"❌ 查询历史订单失败: {e}")
return []
except Exception as e:
logger.error(f"❌ 查询历史订单异常: {e}")
return []
# ==================== 账户操作 ====================
def get_balance(self) -> Dict:
"""
查询账户余额
Returns:
余额信息 {USDT: {available: "...", frozen: "...", locked: "..."}}
"""
try:
balance = self.exchange.fetch_balance()
# 转换为统一格式
result = {}
for currency, info in balance.get('total', {}).items():
if info and info > 0: # 只返回有余额的币种
result[currency] = {
'available': str(balance.get('free', {}).get(currency, 0)),
'frozen': str(balance.get('used', {}).get(currency, 0)),
'locked': '0'
}
logger.debug(f"账户余额: {result}")
return result
except ccxt.BaseError as e:
logger.error(f"❌ 查询余额失败: {e}")
return {}
except Exception as e:
logger.error(f"❌ 查询余额异常: {e}")
return {}
def get_account_info(self) -> Dict:
"""
查询账户信息
Returns:
账户信息
"""
try:
balance = self.exchange.fetch_balance()
return balance
except ccxt.BaseError as e:
logger.error(f"❌ 查询账户信息失败: {e}")
return {}
except Exception as e:
logger.error(f"❌ 查询账户信息异常: {e}")
return {}
def set_leverage(self, symbol: str, leverage: int, hold_side: str = "long") -> bool:
"""
设置杠杆倍数
Args:
symbol: 交易对
leverage: 杠杆倍数 (1-125)
hold_side: 持仓方向 (long/short) - CCXT 通常设置为 both
Returns:
是否成功
"""
try:
ccxt_symbol = self._standardize_symbol(symbol)
# CCXT 设置杠杆
self.exchange.set_leverage(leverage, ccxt_symbol)
logger.info(f"✅ 设置杠杆成功: {symbol} {leverage}x")
return True
except ccxt.BaseError as e:
logger.error(f"❌ 设置杠杆失败: {e}")
return False
except Exception as e:
logger.error(f"❌ 设置杠杆异常: {e}")
return False
# ==================== 辅助方法 ====================
def _standardize_symbol(self, symbol: str) -> str:
"""
标准化交易对格式为 CCXT 格式
Args:
symbol: 原始交易对 (如 BTCUSDT)
Returns:
CCXT 标准格式 (如 BTC/USDT:USDT)
"""
# 如果已经是 CCXT 格式,直接返回
if '/' in symbol:
return symbol
# 简单的转换逻辑(可以根据实际情况扩展)
# 例如BTCUSDT -> BTC/USDT:USDT
if symbol.endswith('USDT'):
base = symbol[:-4] # 去掉 USDT
return f"{base}/USDT:USDT"
# 默认返回原值
return symbol
def test_connection(self) -> bool:
"""
测试 API 连接
Returns:
是否连接成功
"""
try:
balance = self.get_balance()
if balance is not None:
usdt_balance = balance.get('USDT', {}).get('available', 'N/A')
logger.info(f"✅ API 连接成功USDT 余额: {usdt_balance}")
return True
return False
except Exception as e:
logger.error(f"❌ API 连接失败: {e}")
return False
def close(self):
"""关闭连接"""
if self.exchange:
# CCXT exchange 对象不需要显式关闭
# 但可以清理 WebSocket 连接(如果有的话)
try:
if hasattr(self.exchange, 'close'):
self.exchange.close()
except Exception as e:
logger.debug(f"关闭连接时出错(可忽略): {e}")
logger.info("Bitget API 连接已关闭")
# 全局实例(延迟初始化)
_trading_api: Optional[BitgetTradingAPI] = None
def get_bitget_trading_api() -> Optional[BitgetTradingAPI]:
"""
获取 Bitget 交易 API 实例(单例)
Returns:
BitgetTradingAPI 实例或 None如果未配置
"""
global _trading_api
if _trading_api:
return _trading_api
from app.config import get_settings
settings = get_settings()
# 检查是否配置了 API Key
if not settings.bitget_api_key or not settings.bitget_api_secret:
logger.warning("Bitget API Key 未配置,实盘交易功能不可用")
return None
# 创建实例
_trading_api = BitgetTradingAPI(
api_key=settings.bitget_api_key,
api_secret=settings.bitget_api_secret,
passphrase=settings.bitget_passphrase,
use_testnet=settings.bitget_use_testnet
)
return _trading_api
def reset_bitget_trading_api():
"""重置全局实例(用于测试或配置更新)"""
global _trading_api
if _trading_api:
_trading_api.close()
_trading_api = None
logger.info("Bitget API 实例已重置")