stock-ai-agent/backend/app/crypto_agent/executor/bitget_executor.py
2026-03-28 22:14:32 +08:00

322 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Bitget 实盘交易执行器
"""
from typing import Dict, Any, List, Optional
from app.crypto_agent.executor.base_executor import BaseExecutor
from app.services.bitget_live_trading_service import get_bitget_live_service
from app.utils.logger import logger
import re
class BitgetExecutor(BaseExecutor):
    """Live-trading executor that routes agent decisions to Bitget.

    All exchange access goes through the service object returned by
    ``get_bitget_live_service()``. ``decide_order_type``,
    ``calculate_effective_margin`` and ``send_execution_notification`` are
    called on ``self`` but not defined here; they are presumably inherited
    from ``BaseExecutor`` -- confirm against that module.

    NOTE(review): the service calls are made synchronously from ``async``
    methods. If they perform blocking network I/O they will stall the event
    loop -- confirm against the service implementation.
    """

    # Leverage applied when a decision does not carry a usable one.
    _DEFAULT_LEVERAGE = 5
    # Upper bound on the leverage a decision may request.
    _MAX_LEVERAGE = 10
    # Ceiling (seconds) for the un-jittered back-off and for server hints.
    _MAX_BACKOFF_SECONDS = 60
    # 2 ** 6 already exceeds the ceiling, so larger exponents are pointless.
    _MAX_BACKOFF_EXPONENT = 6
    # Matches a server hint such as "wait 5s" / "wait 10 s".
    _WAIT_HINT_RE = re.compile(r'wait\s+(\d+)\s*s', re.IGNORECASE)
    # Lower-case substrings that mark an error message as a rate-limit error.
    _RATE_LIMIT_INDICATORS = (
        'rate limit',
        'too many requests',
        '429',
        'limit exceeded',
        '请求频率',
    )

    def __init__(self):
        """Register the executor as "Bitget" and bind the live service."""
        super().__init__("Bitget")
        self.bitget = get_bitget_live_service()

    # ==================== Internal helpers ====================

    @staticmethod
    def _strip_quote(symbol: Optional[str]) -> str:
        """Return *symbol* with every ``USDT`` occurrence removed.

        ``None`` is treated as an empty string.
        """
        return (symbol or '').replace('USDT', '')

    @staticmethod
    def _reported_failure(result: Any) -> bool:
        """Return True only if *result* is a dict with a falsy ``success``."""
        return (
            isinstance(result, dict)
            and 'success' in result
            and not result['success']
        )

    async def _notify_best_effort(self,
                                  operation: str,
                                  symbol: str,
                                  result: Dict[str, Any],
                                  details: Optional[Dict[str, Any]] = None) -> None:
        """Send an execution notification without letting it affect the trade.

        A notification error is logged and swallowed: the exchange request
        has already been made by the time this runs, so the caller must still
        receive the real trade result.
        """
        kwargs: Dict[str, Any] = {
            'operation': operation,
            'symbol': symbol,
            'result': result,
        }
        # Only forward ``details`` when the caller supplied it, so calls that
        # never passed it keep their original shape.
        if details is not None:
            kwargs['details'] = details
        try:
            await self.send_execution_notification(**kwargs)
        except Exception as e:
            logger.warning(f"Bitget 通知发送失败({operation} {symbol}): {e}")

    # ==================== Core execution methods ====================

    async def execute_open(self, decision: Dict[str, Any],
                           current_price: float) -> Dict[str, Any]:
        """Open a position described by *decision*.

        Keys read from *decision*: ``symbol``, ``action`` (``buy``/``sell``,
        case-insensitive), ``margin`` (falling back to ``quantity``),
        ``entry_price`` (falling back to *current_price*), ``stop_loss``,
        ``take_profit`` and ``leverage`` (default 5, capped at 10).

        Returns the service's order result. The dict may additionally carry
        ``tp_sl_warning`` (TP/SL could not be set on a filled order) or
        ``pending_tp_sl`` (order not filled yet; TP/SL must be set later).
        On validation failure or exception a ``{'success': False,
        'error': ...}`` dict is returned instead.
        """
        try:
            symbol = self._strip_quote(decision.get('symbol', ''))
            if not symbol:
                logger.error("Bitget 开仓失败: 缺少交易对 symbol")
                return {'success': False, 'error': '缺少交易对 symbol'}

            # Anything other than an explicit buy/sell must be rejected:
            # treating "not buy" as "sell" would open a live short position
            # on a missing or misspelled action.
            raw_action = decision.get('action')
            action = str(raw_action or '').strip().lower()
            if action not in ('buy', 'sell'):
                logger.error(f"Bitget 开仓失败: 不支持的开仓方向 {raw_action!r}")
                return {
                    'success': False,
                    'error': f'不支持的开仓方向: {raw_action!r}'
                }
            is_buy = action == 'buy'

            margin = decision.get('margin', decision.get('quantity', 0))
            entry_price = decision.get('entry_price') or current_price
            stop_loss = decision.get('stop_loss')
            take_profit = decision.get('take_profit')
            # Resolve leverage before sizing so the contract count matches
            # the leverage that is actually set on the exchange below.
            leverage = min(
                decision.get('leverage') or self._DEFAULT_LEVERAGE,
                self._MAX_LEVERAGE,
            )

            # Decide the order type
            order_type, order_reason = self.decide_order_type(decision, current_price)
            logger.info(f" 订单类型: {order_reason}")

            # Adjust the margin (reserve room for fees)
            account_state = self.bitget.get_account_state()
            available = account_state.get('available_balance', 0)
            adjusted_margin = self.calculate_effective_margin(available, margin)

            # Convert the margin into a number of contracts
            contracts = self._calculate_contracts(
                symbol, adjusted_margin, entry_price, leverage
            )
            if contracts < 1:
                return {
                    'success': False,
                    'error': f'仓位计算结果 {contracts} 张,低于最小下单量'
                }

            # Set leverage on the exchange
            self.bitget.update_leverage(symbol, leverage)

            # Place the order
            if order_type == 'market':
                result = self.bitget.place_market_order(
                    symbol, is_buy=is_buy, size=contracts
                )
            else:
                result = self.bitget.place_limit_order(
                    symbol, is_buy=is_buy, size=contracts, price=entry_price
                )
            if not result.get('success'):
                return result

            # A result without an explicit status is treated as filled.
            order_status = result.get('order_status', 'filled')

            # TP/SL can only be attached once the order has been filled.
            if stop_loss or take_profit:
                if order_status == 'filled':
                    tp_sl_result = self.bitget.set_tp_sl(
                        symbol=symbol,
                        is_long=is_buy,
                        size=contracts,
                        tp_price=take_profit,
                        sl_price=stop_loss
                    )
                    if not tp_sl_result.get('success'):
                        logger.warning(f" ⚠️ 止盈止损设置失败: {tp_sl_result.get('error')}")
                        result['tp_sl_warning'] = tp_sl_result.get('error')
                else:
                    # Order is still resting: hand the levels back to the
                    # caller, which is expected to apply them after the fill.
                    logger.info(f" 📌 挌单中TP/SL 将在成交后设置")
                    result['pending_tp_sl'] = {
                        'stop_loss': stop_loss,
                        'take_profit': take_profit
                    }

            logger.info(f" ✅ 开仓成功: {symbol} {contracts}张 @ ${order_type}")

            await self._notify_best_effort(
                operation='OPEN',
                symbol=symbol,
                result=result,
                details={
                    'size': contracts,
                    'price': entry_price,
                    'margin': adjusted_margin,
                    'leverage': leverage,
                    'stop_loss': stop_loss,
                    'take_profit': take_profit,
                    'order_type': order_type
                }
            )
            return result
        except Exception as e:
            logger.error(f"Bitget 开仓失败: {e}")
            error_result = {'success': False, 'error': str(e)}
            await self._notify_best_effort(
                operation='OPEN',
                symbol=decision.get('symbol', ''),
                result=error_result
            )
            return error_result

    async def execute_close(self, decision: Dict[str, Any],
                            current_price: float) -> Dict[str, Any]:
        """Close positions.

        With an empty/missing ``orders_to_close`` the service's
        ``market_close_all()`` is invoked and its result returned as-is.
        Note that this call takes no symbol argument.

        With a non-empty ``orders_to_close`` NO exchange request is sent:
        every id is reported as ``success: True`` (pre-existing placeholder
        behaviour, kept so callers see the same return shape).
        """
        try:
            symbol = self._strip_quote(decision.get('symbol', ''))
            orders_to_close = decision.get('orders_to_close', [])

            if not orders_to_close:
                # Close every open position
                result = self.bitget.market_close_all()
                if self._reported_failure(result):
                    logger.warning(f" ❌ 平仓失败: {result.get('error')}")
                else:
                    logger.info(f" ✅ 平仓所有持仓")
                await self._notify_best_effort(
                    operation='CLOSE',
                    symbol=symbol,
                    result=result
                )
                return result

            # NOTE(review): closing individual orders is not implemented.
            # The ids are echoed back as successful without contacting the
            # exchange, so the positions stay open. Mapping an order id to a
            # position needs service support that is not visible here.
            logger.warning(" ⚠️ 按订单平仓尚未实现,未向交易所发送任何平仓请求")
            results = [
                {'order_id': order_id, 'success': True}
                for order_id in orders_to_close
            ]
            success_result = {'success': True, 'results': results}
            await self._notify_best_effort(
                operation='CLOSE',
                symbol=symbol,
                result=success_result
            )
            return success_result
        except Exception as e:
            logger.error(f"Bitget 平仓失败: {e}")
            error_result = {'success': False, 'error': str(e)}
            await self._notify_best_effort(
                operation='CLOSE',
                symbol=decision.get('symbol', ''),
                result=error_result
            )
            return error_result

    async def execute_cancel(self, order_id: str, symbol: str) -> Dict[str, Any]:
        """Cancel *order_id* on *symbol* and return the service's result.

        On exception a ``{'success': False, 'error': ..., 'order_id': ...}``
        dict is returned.
        """
        try:
            result = self.bitget.cancel_order(self._strip_quote(symbol), order_id)
            if self._reported_failure(result):
                logger.warning(f" ❌ 撤单失败: {order_id} - {result.get('error')}")
            else:
                logger.info(f" ✅ 撤单成功: {order_id}")
            await self._notify_best_effort(
                operation='CANCEL',
                symbol=symbol,
                result=result,
                details={'order_id': order_id}
            )
            return result
        except Exception as e:
            logger.error(f"Bitget 撤单失败: {e}")
            error_result = {'success': False, 'error': str(e), 'order_id': order_id}
            await self._notify_best_effort(
                operation='CANCEL',
                symbol=symbol,
                result=error_result,
                details={'order_id': order_id}
            )
            return error_result

    async def set_stop_loss_take_profit(self,
                                        symbol: str,
                                        order_id: str,
                                        stop_loss: Optional[float],
                                        take_profit: Optional[float],
                                        position_size: float) -> Dict[str, Any]:
        """Attach TP/SL levels to the open position on *symbol*.

        The direction is taken from the matching entry of
        ``get_open_positions()`` (matched on its ``coin`` field).
        *order_id* is accepted for interface compatibility but not used.

        Failures are reported under both ``message`` (the key this method
        has always used) and ``error`` (the key used by the other methods).
        """
        try:
            coin = self._strip_quote(symbol)
            # The service needs the position direction.
            positions = self.bitget.get_open_positions()
            pos = next((p for p in positions if p.get('coin') == coin), None)
            if not pos:
                text = f'找不到 {symbol} 的持仓'
                return {'success': False, 'message': text, 'error': text}

            # Assumes ``size`` is signed (positive = long) -- TODO confirm
            # against the service's position schema.
            is_long = pos['size'] > 0
            result = self.bitget.set_tp_sl(
                symbol=coin,
                is_long=is_long,
                size=position_size,
                tp_price=take_profit,
                sl_price=stop_loss
            )
            if result.get('success'):
                logger.info(f" ✅ 止盈止损设置成功: SL=${stop_loss}, TP={take_profit}")
            return result
        except Exception as e:
            logger.error(f"Bitget 设置止盈止损失败: {e}")
            return {'success': False, 'message': str(e), 'error': str(e)}

    def should_set_tp_sl_on_order(self) -> bool:
        """Return False: Bitget does not support TP/SL at order placement."""
        return False

    # ==================== Platform-specific configuration ====================

    def get_market_order_threshold(self) -> float:
        """Market-order threshold: 0.2%."""
        return 0.2

    def get_pending_order_timeout(self) -> float:
        """Pending-order timeout: 24 hours (Bitget liquidity is good)."""
        return 24.0

    def get_position_exit_rules(self) -> tuple:
        """Position exit rules: (target profit 3%, max holding time 6h)."""
        return (3.0, 6.0)

    def get_fee_rate(self) -> float:
        """Fee rate: 0.06% (taker), expressed as a fraction."""
        return 0.0006

    def get_max_retries(self) -> int:
        """Maximum retries: 5 (Bitget API rate limiting is strict)."""
        return 5

    def is_rate_limit_error(self, error_msg: str) -> bool:
        """Return True if *error_msg* contains a known rate-limit marker.

        Matching is case-insensitive; ``None`` is treated as an empty message.
        """
        haystack = str(error_msg or '').lower()
        return any(marker in haystack for marker in self._RATE_LIMIT_INDICATORS)

    def get_rate_limit_wait_time(self, error_msg: str, attempt: int) -> float:
        """Return how many seconds to wait before retrying after a rate limit.

        If *error_msg* contains a hint of the form ``wait <N>s`` that value is
        used, capped at 60 seconds. Otherwise the wait is an exponential
        back-off ``min(2 ** attempt, 60)`` multiplied by a random jitter in
        ``[0.5, 1.5]`` (so the jittered value can exceed 60 seconds).
        """
        hint = self._WAIT_HINT_RE.search(str(error_msg or ''))
        if hint:
            return float(min(int(hint.group(1)), self._MAX_BACKOFF_SECONDS))

        # Kept as a local import, as in the original, so this class does not
        # require a change to the module's import block.
        import random

        # Clamp the exponent first: the result is identical to
        # min(2 ** attempt, 60) but never builds a huge integer.
        base_wait = min(
            2 ** min(attempt, self._MAX_BACKOFF_EXPONENT),
            self._MAX_BACKOFF_SECONDS,
        )
        return base_wait * random.uniform(0.5, 1.5)

    def get_price_update_threshold(self) -> float:
        """Price-update threshold: 0.5%."""
        return 0.5

    # ==================== Helper methods ====================

    def _calculate_contracts(self, symbol: str, margin: float, price: float,
                             leverage: float = _DEFAULT_LEVERAGE) -> int:
        """Convert a margin amount into a whole number of contracts.

        ``margin * leverage`` is the notional position value; dividing by
        *price* gives the coin amount, and dividing that by the contract size
        reported by the service gives the contract count, truncated toward
        zero.

        *leverage* defaults to 5, which is the value this method previously
        hard-coded, so existing three-argument calls are unaffected.

        Returns 0 when the inputs are unusable or any error occurs.
        """
        try:
            # Contract specification
            contract_size = self.bitget.get_contract_size(self._strip_quote(symbol))
            if not price or price <= 0 or not contract_size or contract_size <= 0:
                logger.error(
                    f"计算合约张数失败: 无效的价格或合约面值 "
                    f"(price={price}, contract_size={contract_size})"
                )
                return 0

            # Notional position value
            position_value = margin * leverage
            # Amount of the underlying coin
            coin_amount = position_value / price
            # Whole contracts only
            contracts = int(coin_amount / contract_size)
            logger.info(f" 仓位计算: ${margin:.2f} USD → {coin_amount:.6f} {symbol}{contracts}")
            return contracts
        except Exception as e:
            logger.error(f"计算合约张数失败: {e}")
            return 0