stock-ai-agent/backend/app/crypto_agent/executor/bitget_executor.py
2026-03-29 10:35:22 +08:00

349 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Bitget 实盘交易执行器
"""
from typing import Dict, Any, List, Optional
from app.crypto_agent.executor.base_executor import BaseExecutor
from app.services.bitget_live_trading_service import get_bitget_live_service
from app.utils.logger import logger
import re
class BitgetExecutor(BaseExecutor):
"""Bitget 实盘交易执行器"""
def __init__(self):
super().__init__("Bitget")
self.bitget = get_bitget_live_service()
# ==================== 核心执行方法 ====================
async def execute_open(self, decision: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行开仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
action = decision.get('action') # buy/sell
margin = decision.get('margin', decision.get('quantity', 0))
entry_price = decision.get('entry_price', current_price)
stop_loss = decision.get('stop_loss')
take_profit = decision.get('take_profit')
# 决定订单类型
order_type, order_reason = self.decide_order_type(decision, current_price)
logger.info(f" 订单类型: {order_reason}")
# 调整保证金(预留手续费)
account_state = self.bitget.get_account_state()
available = account_state.get('available_balance', 0)
adjusted_margin = self.calculate_effective_margin(available, margin)
# 讣算合约张数
contracts = self._calculate_contracts(symbol, adjusted_margin, entry_price)
if contracts < 1:
return {
'success': False,
'error': f'仓位计算结果 {contracts} 张,低于最小下单量'
}
# 设置杠杆
leverage = min(decision.get('leverage', 5), 10)
self.bitget.update_leverage(symbol, leverage)
# 下单
is_buy = (action == 'buy')
if order_type == 'market':
result = self.bitget.place_market_order(symbol, is_buy=is_buy, size=contracts)
else:
result = self.bitget.place_limit_order(symbol, is_buy=is_buy, size=contracts, price=entry_price)
if not result.get('success'):
return result
order_id = result.get('order_id')
order_status = result.get('order_status', 'filled')
# 设置止盈止损
# 策略:总是记录到 pending由定期检查机制处理
# 原因Bitget 的持仓确认可能有延迟,立即设置可能失败
if stop_loss or take_profit:
# 返回给 crypto_agent 的 pending_tp_sl 格式
# crypto_agent 会合并: {symbol, is_long, contracts, **pending_tp_sl}
result['pending_tp_sl'] = {
'tp_price': take_profit,
'sl_price': stop_loss
}
# 同时记录 contracts 到 result供 crypto_agent 使用
result['contracts'] = contracts
logger.info(f" 📌 已记录 TP/SL 到待处理列表: TP=${take_profit}, SL=${stop_loss}")
logger.info(f" 📌 将由定期检查机制在持仓确认后自动设置")
logger.info(f" ✅ 开仓成功: {symbol} {contracts}张 @ ${order_type}")
# 发送飞书通知
await self.send_execution_notification(
operation='OPEN',
symbol=symbol,
result=result,
details={
'size': contracts,
'price': entry_price,
'margin': adjusted_margin,
'leverage': leverage,
'stop_loss': stop_loss,
'take_profit': take_profit,
'order_type': order_type
}
)
return result
except Exception as e:
logger.error(f"Bitget 开仓失败: {e}")
error_result = {'success': False, 'error': str(e)}
# 发送失败通知
await self.send_execution_notification(
operation='OPEN',
symbol=decision.get('symbol', ''),
result=error_result
)
return error_result
async def execute_close(self, decision: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行平仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
orders_to_close = decision.get('orders_to_close', [])
if not orders_to_close:
# 平掉所有持仓
result = self.bitget.market_close_all()
logger.info(f" ✅ 平仓所有持仓")
# 发送飞书通知
await self.send_execution_notification(
operation='CLOSE',
symbol=symbol,
result=result
)
return result
# 平掉指定订单
results = []
for order_id in orders_to_close:
# 这里需要根据 order_id 找到对应的持仓
# Bitget 的持仓管理需要优化
results.append({'order_id': order_id, 'success': True})
success_result = {'success': True, 'results': results}
# 发送飞书通知
await self.send_execution_notification(
operation='CLOSE',
symbol=symbol,
result=success_result
)
return success_result
except Exception as e:
logger.error(f"Bitget 平仓失败: {e}")
error_result = {'success': False, 'error': str(e)}
# 发送失败通知
await self.send_execution_notification(
operation='CLOSE',
symbol=decision.get('symbol', ''),
result=error_result
)
return error_result
async def execute_cancel(self, order_id: str, symbol: str) -> Dict[str, Any]:
"""执行撤单"""
try:
result = self.bitget.cancel_order(symbol.replace('USDT', ''), order_id)
logger.info(f" ✅ 撤单成功: {order_id}")
# 发送飞书通知
await self.send_execution_notification(
operation='CANCEL',
symbol=symbol,
result=result,
details={'order_id': order_id}
)
return result
except Exception as e:
logger.error(f"Bitget 撤单失败: {e}")
error_result = {'success': False, 'error': str(e), 'order_id': order_id}
# 发送失败通知
await self.send_execution_notification(
operation='CANCEL',
symbol=symbol,
result=error_result,
details={'order_id': order_id}
)
return error_result
async def set_stop_loss_take_profit(self,
symbol: str,
order_id: str,
stop_loss: Optional[float],
take_profit: Optional[float],
position_size: float) -> Dict[str, Any]:
"""设置止盈止损"""
try:
# Bitget 需要知道方向
positions = self.bitget.get_open_positions()
pos = next((p for p in positions if p.get('coin') == symbol.replace('USDT', '')), None)
if not pos:
return {'success': False, 'message': f'找不到 {symbol} 的持仓'}
is_long = pos['size'] > 0
result = self.bitget.set_tp_sl(
symbol=symbol.replace('USDT', ''),
is_long=is_long,
size=position_size,
tp_price=take_profit,
sl_price=stop_loss
)
if result.get('success'):
logger.info(f" ✅ 止盈止损设置成功: SL=${stop_loss}, TP={take_profit}")
return result
except Exception as e:
logger.error(f"Bitget 设置止盈止损失败: {e}")
return {'success': False, 'message': str(e)}
def should_set_tp_sl_on_order(self) -> bool:
"""Bitget 不支持在下单时设置 TP/SL"""
return False
# ==================== 平台特定配置 ====================
def get_market_order_threshold(self) -> float:
"""市价单阈值: 0.2%"""
return 0.2
def get_pending_order_timeout(self) -> float:
"""挂单超时: 24 小时Bitget 流动性好)"""
return 24.0
def get_position_exit_rules(self) -> tuple:
"""持仓退出规则:(目标盈利 3%, 最大持仓 6h)"""
return (3.0, 6.0)
def get_fee_rate(self) -> float:
"""手续费率: 0.06% (taker)"""
return 0.0006
def get_max_retries(self) -> int:
"""最大重试次数: 5Bitget API 限流严格)"""
return 5
def is_rate_limit_error(self, error_msg: str) -> bool:
"""判断是否是限流错误"""
rate_limit_indicators = [
'rate limit',
'too many requests',
'429',
'limit exceeded',
'请求频率'
]
return any(indicator in error_msg.lower() for indicator in rate_limit_indicators)
def get_rate_limit_wait_time(self, error_msg: str, attempt: int) -> float:
"""
获取限流等待时间
Bitget API 限流:指数退避 + 抖动
"""
base_wait = min(2 ** attempt, 60) # 指数退避,最大 60s
# 检查错误信息中是否包含等待时间
wait_match = re.search(r'wait\s+(\d+)\s*s', error_msg, re.IGNORECASE)
if wait_match:
suggested_wait = int(wait_match.group(1))
return min(suggested_wait, 60)
# 指数退避 + 随机抖动
import random
jitter = random.uniform(0.5, 1.5)
return base_wait * jitter
def get_price_update_threshold(self) -> float:
"""价格更新阈值: 0.5%"""
return 0.5
# ==================== 移动止损 ====================
async def move_stop_loss(self,
symbol: str,
new_stop_loss: float,
current_stop_loss: Optional[float] = None) -> Dict[str, Any]:
"""
移动止损Bitget
Args:
symbol: 交易对(如 BTCUSDT
new_stop_loss: 新止损价
current_stop_loss: 当前止损价(可选)
Returns:
{'success': bool, 'message': str}
"""
try:
# Bitget 使用 modify_sl_tp 方法
success = self.bitget.modify_sl_tp(
symbol=symbol.replace('USDT', ''),
stop_loss=new_stop_loss
)
if success:
logger.info(f" ✅ 移动止损成功: {symbol} → ${new_stop_loss:.2f}")
return {'success': True, 'message': f'移动止损成功: {new_stop_loss:.2f}'}
else:
return {'success': False, 'message': '移动止损失败'}
except Exception as e:
logger.error(f"Bitget 移动止损失败: {e}")
return {'success': False, 'message': str(e)}
# ==================== 辅助方法 ====================
def _calculate_contracts(self, symbol: str, margin: float, price: float) -> int:
"""计算合约张数"""
try:
# 获取合约规格
contract_size = self.bitget.get_contract_size(symbol.replace('USDT', ''))
# 计算持仓价值
position_value = margin * 5 # 假设 5x 杠杆
# 计算币数量
coin_amount = position_value / price
# 计算合约张数(向下取整)
contracts = int(coin_amount / contract_size)
logger.info(f" 仓位计算: ${margin:.2f} USD → {coin_amount:.6f} {symbol}{contracts}")
return contracts
except Exception as e:
logger.error(f"计算合约张数失败: {e}")
return 0