""" Bitget 实盘交易 API (基于 CCXT SDK) 处理 Bitget 合约交易的所有 API 调用,包括: - 下单(开仓/平仓) - 撤单 - 查询订单 - 查询持仓 - 查询账户余额 - 设置杠杆 使用 CCXT 统一交易所接口,提供更稳定的 API 交互。 """ import ccxt from typing import Dict, List, Optional, Any from datetime import datetime from app.utils.logger import logger class BitgetTradingAPI: """Bitget 实盘交易 API (基于 CCXT)""" def __init__(self, api_key: str, api_secret: str, passphrase: str = "", use_testnet: bool = True): """ 初始化 Bitget 交易 API Args: api_key: API Key api_secret: API Secret passphrase: API Passphrase (Bitget 不需要,保留兼容性) use_testnet: 是否使用测试网 """ self.api_key = api_key self.api_secret = api_secret self.use_testnet = use_testnet # 创建 CCXT Bitget 实例 config = { 'apiKey': api_key, 'secret': api_secret, 'enableRateLimit': True, # 启用速率限制 'options': { 'defaultType': 'swap', # 使用永续合约 API } } # CCXT Bitget 使用 password 字段存储 passphrase # 如果提供了 passphrase,添加到配置中 if passphrase: config['password'] = passphrase self.exchange = ccxt.bitget(config) # 设置测试网(手动配置 API 端点) if use_testnet: # Bitget 测试网端点 self.exchange.urls['api']['swap'] = 'https://capi.bitget.com' self.exchange.urls['api']['swapPrivate'] = 'https://capi.bitget.com' self.exchange.urls['api']['swapPublic'] = 'https://capi.bitget.com' # 同时设置通用的 API URL self.exchange.urls['api'] = { 'swap': 'https://capi.bitget.com', 'swapPrivate': 'https://capi.bitget.com', 'swapPublic': 'https://capi.bitget.com', } logger.info("✅ Bitget 测试网模式已启用 (capi.bitget.com)") logger.info(f"Bitget 交易 API 初始化完成 ({'测试网' if use_testnet else '生产网'})") # ==================== 订单操作 ==================== def place_order(self, symbol: str, side: str, order_type: str, size: float, price: float = None, client_order_id: str = None, stop_loss: float = None, take_profit: float = None) -> Optional[Dict]: """ 下单 Args: symbol: 交易对 (如 BTC/USDT:USDT) side: 订单方向 (buy/sell) order_type: 订单类型 (limit/market) size: 数量(张数) price: 价格(限价单必需) client_order_id: 自定义订单ID stop_loss: 止损价格(可选) take_profit: 止盈价格(可选) Returns: 订单信息 """ try: # CCXT 标准化交易对格式 ccxt_symbol = self._standardize_symbol(symbol) # 手动获取合约规格(不依赖 CCXT 的 contractSize) # Bitget 永续合约规格 if 'BTC' in symbol: contract_size = 0.01 # BTC 每张 0.01 BTC elif 'ETH' in symbol: contract_size = 0.1 # ETH 每张 0.1 ETH elif 'SOL' in symbol: contract_size = 1 # SOL 每张 1 SOL elif 'BNB' in symbol: contract_size = 0.1 # BNB 每张 0.1 BNB elif 'XRP' in symbol: contract_size = 10 # XRP 每张 10 XRP elif 'DOGE' in symbol: contract_size = 100 # DOGE 每张 100 DOGE elif 'MATIC' in symbol or 'POL' in symbol: contract_size = 10 # MATIC 每张 10 MATIC elif 'AVAX' in symbol: contract_size = 1 # AVAX 每张 1 AVAX elif 'LINK' in symbol: contract_size = 1 # LINK 每张 1 LINK elif 'UNI' in symbol: contract_size = 1 # UNI 每张 1 UNI elif 'ATOM' in symbol: contract_size = 1 # ATOM 每张 1 ATOM elif 'LTC' in symbol: contract_size = 0.1 # LTC 每张 0.1 LTC elif 'BCH' in symbol: contract_size = 0.1 # BCH 每张 0.1 BCH elif 'FIL' in symbol: contract_size = 1 # FIL 每张 1 FIL elif 'DOT' in symbol: contract_size = 1 # DOT 每张 1 DOT else: # 默认尝试从市场信息获取 try: market = self.exchange.market(ccxt_symbol) contract_size = market.get('contractSize', 1) logger.info(f"从市场信息获取合约规格: {contract_size}") except Exception: contract_size = 1 logger.warning(f"无法确定 {symbol} 的合约规格,使用默认值 1") # 计算实际下单数量(张数 × 合约规格) # Bitget 的 amount 参数是币的数量,不是张数 actual_amount = size * contract_size # 构建订单参数 # 单向持仓模式 + 联合保证金模式 params = { 'tdMode': 'cross', # 联合保证金模式(全仓) 'marginCoin': 'USDT', # 保证金币种 'holdMode': 'oneWay', # 单向持仓模式 } if client_order_id: params['clientOrderId'] = client_order_id # 添加止损止盈参数 if stop_loss: params['stopLoss'] = str(stop_loss) if take_profit: params['takeProfit'] = str(take_profit) # Bitget 合约交易特殊参数 params['holdMode'] = 'oneWay' # 单向持仓模式 # 调试:打印下单参数 logger.info(f"下单参数: symbol={ccxt_symbol}, type={order_type}, side={side}, 张数={size}, 实际amount={actual_amount}") logger.debug(f"完整参数: {params}") # 下单 - 直接使用通用 create_order 方法 order = self.exchange.create_order( symbol=ccxt_symbol, type=order_type, side=side, amount=actual_amount, # 使用实际币数量(张数 × 合约规格) price=price, params=params ) logger.info(f"✅ 下单成功: {symbol} {side} {size}张 @ {price or '市价'}") if stop_loss or take_profit: logger.info(f" 止损: {stop_loss}, 止盈: {take_profit}") logger.debug(f"订单详情: {order}") return order except ccxt.BaseError as e: logger.error(f"❌ 下单失败: {e}") return None except Exception as e: logger.error(f"❌ 下单异常: {e}") return None def cancel_order(self, symbol: str, order_id: str = None, client_order_id: str = None) -> bool: """ 撤单 Args: symbol: 交易对 order_id: 订单ID client_order_id: 自定义订单ID Returns: 是否成功 """ try: ccxt_symbol = self._standardize_symbol(symbol) if order_id: self.exchange.cancel_order(order_id, ccxt_symbol) elif client_order_id: self.exchange.cancel_order_by_client_order_id(client_order_id, ccxt_symbol) else: logger.error("必须提供 order_id 或 client_order_id") return False logger.info(f"✅ 撤单成功: {symbol} order_id={order_id or client_order_id}") return True except ccxt.BaseError as e: logger.error(f"❌ 撤单失败: {e}") return False except Exception as e: logger.error(f"❌ 撤单异常: {e}") return False def cancel_all_orders(self, symbol: str) -> bool: """ 撤销所有挂单 Args: symbol: 交易对 Returns: 是否成功 """ try: ccxt_symbol = self._standardize_symbol(symbol) self.exchange.cancel_all_orders(ccxt_symbol) logger.info(f"✅ 撤销所有挂单成功: {symbol}") return True except ccxt.BaseError as e: logger.error(f"❌ 撤销所有挂单失败: {e}") return False except Exception as e: logger.error(f"❌ 撤销所有挂单异常: {e}") return False def close_position(self, symbol: str, side: str = None, size: float = None, price: float = None) -> Optional[Dict]: """ 平仓(双向持仓模式) Args: symbol: 交易对 side: 平仓方向(可选,不传则自动判断) - None: 自动判断(查询持仓后决定) - 'buy': 平空仓 - 'sell': 平多仓 size: 平仓数量(不传则全部平仓) price: 平仓价格(不传则市价) Returns: 订单信息 """ try: ccxt_symbol = self._standardize_symbol(symbol) # 获取当前持仓 positions = self.get_position(symbol) if not positions: logger.warning(f"没有找到 {symbol} 的持仓") return None # 查找有持仓的仓位 position = None for pos in positions: contracts = float(pos.get('contracts', 0)) if contracts != 0: position = pos break if not position: logger.warning(f"{symbol} 持仓数量为 0,无需平仓") return None current_size = abs(float(position.get('contracts', 0))) pos_side = position.get('side') # 'long' or 'short' # 如果没有指定平仓方向,根据持仓方向自动判断 if side is None: # 多仓用 sell 平,空仓用 buy 平 side = 'sell' if pos_side == 'long' else 'buy' # 如果没有指定平仓数量,则全部平仓 close_size = size if size else current_size logger.info(f"平仓: {symbol} 持仓方向={pos_side}, 平仓方向={side}, 数量={close_size}") # 执行平仓 order_type = 'limit' if price else 'market' order = self.place_order( symbol=symbol, side=side, order_type=order_type, size=close_size, price=price ) if order: logger.info(f"✅ 平仓成功: {symbol} {side} {close_size}张") return order return None except Exception as e: logger.error(f"❌ 平仓异常: {e}") return None def set_trailing_stop(self, symbol: str, callback_rate: float = None, activation_price: float = None) -> bool: """ 设置移动止损 Args: symbol: 交易对 callback_rate: 回调比例(如 0.01 表示 1%) activation_price: 激活价格 Returns: 是否成功 """ try: ccxt_symbol = self._standardize_symbol(symbol) # 获取当前持仓 positions = self.get_position(symbol) if not positions: logger.warning(f"没有找到 {symbol} 的持仓") return False position = positions[0] current_size = float(position.get('contracts', 0)) if current_size == 0: logger.warning(f"{symbol} 持仓数量为 0") return False # 使用 CCXT 的私人 API 设置移动止损 # Bitget 需要通过私人API调用 params = { 'symbol': ccxt_symbol, 'trailingStopCallbackRate': callback_rate, } if activation_price: params['trailingStopActivationPrice'] = activation_price self.exchange.private_mix_post_modify_contract_trailing_stop(params) logger.info(f"✅ 设置移动止损成功: {symbol} callback={callback_rate}") return True except ccxt.BaseError as e: logger.error(f"❌ 设置移动止损失败: {e}") return False except Exception as e: logger.error(f"❌ 设置移动止损异常: {e}") return False def modify_sl_tp(self, symbol: str, stop_loss: float = None, take_profit: float = None) -> bool: """ 修改持仓的止损止盈 注意:Bitget 的止损止盈需要在开仓时设置,或者通过下独立的止损/止盈计划订单。 对于单向持仓模式,我们使用计划订单来实现。 Args: symbol: 交易对 stop_loss: 止损价格 take_profit: 止盈价格 Returns: 是否成功 """ try: ccxt_symbol = self._standardize_symbol(symbol) # 获取当前持仓 positions = self.get_position(symbol) if not positions: logger.warning(f"没有找到 {symbol} 的持仓") return False # 查找有持仓的仓位 position = None for pos in positions: if float(pos.get('contracts', 0)) != 0: position = pos break if not position: logger.warning(f"{symbol} 持仓数量为 0") return False contracts = float(position.get('contracts', 0)) pos_side = position.get('side') mark_price = float(position.get('markPrice', 0)) logger.info(f"当前持仓: {symbol} {pos_side} {contracts}张, 标记价={mark_price}") # 验证价格 if pos_side == 'long': if stop_loss and stop_loss >= mark_price: logger.error(f"做多止损必须低于当前价格: SL={stop_loss}, Mark={mark_price}") return False if take_profit and take_profit <= mark_price: logger.error(f"做多止盈必须高于当前价格: TP={take_profit}, Mark={mark_price}") return False else: if stop_loss and stop_loss <= mark_price: logger.error(f"做空止损必须高于当前价格: SL={stop_loss}, Mark={mark_price}") return False if take_profit and take_profit >= mark_price: logger.error(f"做空止盈必须低于当前价格: TP={take_profit}, Mark={mark_price}") return False # 使用独立的止损/止盈计划订单 # 注意:这种方式需要在平仓时也取消这些计划订单 # 获取合约规格(用于转换张数为币数量) if 'BTC' in symbol: contract_size = 0.01 elif 'ETH' in symbol: contract_size = 0.1 elif 'SOL' in symbol: contract_size = 1 elif 'BNB' in symbol: contract_size = 0.1 elif 'XRP' in symbol: contract_size = 10 elif 'DOGE' in symbol: contract_size = 100 elif 'MATIC' in symbol or 'POL' in symbol: contract_size = 10 else: contract_size = 1 # 将张数转换为币数量 contracts_amount = contracts * contract_size orders_created = [] # 止损单 if stop_loss: sl_side = 'sell' if pos_side == 'long' else 'buy' try: # 使用普通的 create_order 创建止损市价单 sl_order = self.exchange.create_order( symbol=ccxt_symbol, type='stop_market', side=sl_side, amount=contracts_amount, # 使用币数量,不是张数 price=None, params={ 'stopPrice': stop_loss, 'triggerBy': 'mark_price', 'tdMode': 'cross', 'marginCoin': 'USDT', 'reduceOnly': True, # 只平仓 } ) orders_created.append(('止损', sl_order)) logger.info(f"✅ 止损单已下: {sl_side} {contracts}张 ({contracts_amount}币) @ ${stop_loss}") except Exception as e: logger.warning(f"下止损单失败: {e}") # 止盈单 if take_profit: tp_side = 'sell' if pos_side == 'long' else 'buy' try: # 使用普通的 create_order 创建止盈限价单 tp_order = self.exchange.create_order( symbol=ccxt_symbol, type='limit', side=tp_side, amount=contracts_amount, # 使用币数量,不是张数 price=take_profit, params={ 'tdMode': 'cross', 'marginCoin': 'USDT', 'reduceOnly': True, # 只平仓 } ) orders_created.append(('止盈', tp_order)) logger.info(f"✅ 止盈单已下: {tp_side} {contracts}张 ({contracts_amount}币) @ ${take_profit}") except Exception as e: logger.warning(f"下止盈单失败: {e}") if orders_created: logger.info(f"✅ 止损止盈设置完成: SL={stop_loss}, TP={take_profit}") logger.info(f" 注意: 止损止盈以独立订单形式存在,平仓时需要同时取消") return True else: logger.warning(f"⚠️ 未能成功下任何止损止盈单") return False except ccxt.BaseError as e: logger.error(f"❌ 修改止损止盈失败: {e}") return False except Exception as e: logger.error(f"❌ 修改止损止盈异常: {e}") import traceback logger.debug(traceback.format_exc()) return False # ==================== 查询操作 ==================== def get_order(self, symbol: str, order_id: str = None, client_order_id: str = None) -> Optional[Dict]: """ 查询订单 Args: symbol: 交易对 order_id: 订单ID client_order_id: 自定义订单ID Returns: 订单信息 """ try: ccxt_symbol = self._standardize_symbol(symbol) if order_id: order = self.exchange.fetch_order(order_id, ccxt_symbol) elif client_order_id: order = self.exchange.fetch_order_by_client_order_id(client_order_id, ccxt_symbol) else: logger.error("必须提供 order_id 或 client_order_id") return None return order except ccxt.BaseError as e: logger.error(f"❌ 查询订单失败: {e}") return None except Exception as e: logger.error(f"❌ 查询订单异常: {e}") return None def get_open_orders(self, symbol: str = None) -> List[Dict]: """ 查询当前挂单 Args: symbol: 交易对(不传则查询所有) Returns: 挂单列表 """ try: ccxt_symbol = self._standardize_symbol(symbol) if symbol else None orders = self.exchange.fetch_open_orders(ccxt_symbol) logger.debug(f"查询到 {len(orders)} 个挂单") return orders except ccxt.BaseError as e: logger.error(f"❌ 查询挂单失败: {e}") return [] except Exception as e: logger.error(f"❌ 查询挂单异常: {e}") return [] def get_history_orders(self, symbol: str, limit: int = 100) -> List[Dict]: """ 查询历史订单 Args: symbol: 交易对 limit: 返回数量 Returns: 历史订单列表 """ try: ccxt_symbol = self._standardize_symbol(symbol) orders = self.exchange.fetch_my_trades(ccxt_symbol, limit=limit) logger.debug(f"查询到 {len(orders)} 条历史订单") return orders except ccxt.BaseError as e: logger.error(f"❌ 查询历史订单失败: {e}") return [] except Exception as e: logger.error(f"❌ 查询历史订单异常: {e}") return [] # ==================== 持仓操作 ==================== def get_position(self, symbol: str = None) -> List[Dict]: """ 查询持仓 Args: symbol: 交易对(不传则查询所有) Returns: 持仓列表 """ try: # 获取所有持仓 positions = self.exchange.fetch_positions() # 筛选非零持仓 active_positions = [] for pos in positions: size = float(pos.get('contracts', 0)) if size != 0: # 如果指定了交易对,进行过滤 if symbol: ccxt_symbol = self._standardize_symbol(symbol) if pos['symbol'] == ccxt_symbol: active_positions.append(pos) else: active_positions.append(pos) logger.debug(f"查询到 {len(active_positions)} 个持仓") return active_positions except ccxt.BaseError as e: logger.error(f"❌ 查询持仓失败: {e}") return [] except Exception as e: logger.error(f"❌ 查询持仓异常: {e}") return [] def get_single_position(self, symbol: str) -> Optional[Dict]: """ 查询单个交易对的持仓 Args: symbol: 交易对 Returns: 持仓信息 """ positions = self.get_position(symbol) if positions: return positions[0] return None def get_closed_orders(self, symbol: str = None, limit: int = 100) -> List[Dict]: """ 查询历史成交订单(从交易所获取) Args: symbol: 交易对(不传则查询所有) limit: 返回数量 Returns: 历史订单列表 """ try: # 使用 CCXT 的 fetch_my_trades 获取历史成交记录(包含盈亏信息) if symbol: ccxt_symbol = self._standardize_symbol(symbol) trades = self.exchange.fetch_my_trades(ccxt_symbol, limit=limit) else: trades = self.exchange.fetch_my_trades(limit=limit) logger.debug(f"查询到 {len(trades)} 条历史成交记录") return trades except ccxt.BaseError as e: logger.error(f"❌ 查询历史成交失败: {e}") return [] except Exception as e: logger.error(f"❌ 查询历史成交异常: {e}") return [] # ==================== 账户操作 ==================== def get_balance(self) -> Dict: """ 查询账户余额 Returns: 余额信息 {USDT: {available: "...", frozen: "...", locked: "..."}} """ try: balance = self.exchange.fetch_balance() # 转换为统一格式 result = {} for currency, info in balance.get('total', {}).items(): if info and info > 0: # 只返回有余额的币种 result[currency] = { 'available': str(balance.get('free', {}).get(currency, 0)), 'frozen': str(balance.get('used', {}).get(currency, 0)), 'locked': '0' } logger.debug(f"账户余额: {result}") return result except ccxt.BaseError as e: logger.error(f"❌ 查询余额失败: {e}") return {} except Exception as e: logger.error(f"❌ 查询余额异常: {e}") return {} def get_account_info(self) -> Dict: """ 查询账户信息 Returns: 账户信息 """ try: balance = self.exchange.fetch_balance() return balance except ccxt.BaseError as e: logger.error(f"❌ 查询账户信息失败: {e}") return {} except Exception as e: logger.error(f"❌ 查询账户信息异常: {e}") return {} def set_leverage(self, symbol: str, leverage: int, hold_side: str = "long") -> bool: """ 设置杠杆倍数 Args: symbol: 交易对 leverage: 杠杆倍数 (1-125) hold_side: 持仓方向 (long/short) - CCXT 通常设置为 both Returns: 是否成功 """ try: ccxt_symbol = self._standardize_symbol(symbol) # CCXT 设置杠杆 self.exchange.set_leverage(leverage, ccxt_symbol) logger.info(f"✅ 设置杠杆成功: {symbol} {leverage}x") return True except ccxt.BaseError as e: logger.error(f"❌ 设置杠杆失败: {e}") return False except Exception as e: logger.error(f"❌ 设置杠杆异常: {e}") return False # ==================== 辅助方法 ==================== def _standardize_symbol(self, symbol: str) -> str: """ 标准化交易对格式为 CCXT 格式 Args: symbol: 原始交易对 (如 BTCUSDT) Returns: CCXT 标准格式 (如 BTC/USDT:USDT) """ # 如果已经是 CCXT 格式,直接返回 if '/' in symbol: return symbol # 简单的转换逻辑(可以根据实际情况扩展) # 例如:BTCUSDT -> BTC/USDT:USDT if symbol.endswith('USDT'): base = symbol[:-4] # 去掉 USDT return f"{base}/USDT:USDT" # 默认返回原值 return symbol def test_connection(self) -> bool: """ 测试 API 连接 Returns: 是否连接成功 """ try: balance = self.get_balance() if balance is not None: usdt_balance = balance.get('USDT', {}).get('available', 'N/A') logger.info(f"✅ API 连接成功,USDT 余额: {usdt_balance}") return True return False except Exception as e: logger.error(f"❌ API 连接失败: {e}") return False def close(self): """关闭连接""" if self.exchange: # CCXT exchange 对象不需要显式关闭 # 但可以清理 WebSocket 连接(如果有的话) try: if hasattr(self.exchange, 'close'): self.exchange.close() except Exception as e: logger.debug(f"关闭连接时出错(可忽略): {e}") logger.info("Bitget API 连接已关闭") # 全局实例(延迟初始化) _trading_api: Optional[BitgetTradingAPI] = None def get_bitget_trading_api() -> Optional[BitgetTradingAPI]: """ 获取 Bitget 交易 API 实例(单例) Returns: BitgetTradingAPI 实例或 None(如果未配置) """ global _trading_api # 如果已有实例,检查它是否仍然有效 if _trading_api: try: # 尝试获取余额来验证连接是否仍然有效 _trading_api.get_balance() return _trading_api except Exception as e: logger.warning(f"Bitget API 实例已失效({e}),将重新创建") _trading_api = None from app.config import get_settings settings = get_settings() # 检查是否配置了 API Key if not settings.bitget_api_key or not settings.bitget_api_secret: logger.warning("Bitget API Key 未配置,实盘交易功能不可用") return None # 创建实例 _trading_api = BitgetTradingAPI( api_key=settings.bitget_api_key, api_secret=settings.bitget_api_secret, passphrase=settings.bitget_passphrase, use_testnet=settings.bitget_use_testnet ) return _trading_api def reset_bitget_trading_api(): """重置全局实例(用于测试或配置更新)""" global _trading_api if _trading_api: _trading_api.close() _trading_api = None logger.info("Bitget API 实例已重置")