stock-ai-agent/backend/app/crypto_agent/executor/bitget_executor.py
2026-04-27 22:09:22 +08:00

586 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Bitget 实盘交易执行器
"""
from typing import Dict, Any, List, Optional
from app.crypto_agent.executor.base_executor import BaseExecutor
from app.services.bitget_live_trading_service import get_bitget_live_service
from app.utils.logger import logger
import re
class BitgetExecutor(BaseExecutor):
    """Live-trading executor for the Bitget exchange.

    Turns trading decisions into calls on the Bitget live-trading service:
    opening/closing positions, cancelling orders and maintaining the
    take-profit / stop-loss protection orders.
    """
def __init__(self, service=None, account_id: str = "default"):
    """Bind the executor to one Bitget account.

    Args:
        service: Optional pre-built live-trading service. When falsy, the
            service is looked up via ``get_bitget_live_service``.
        account_id: Account identifier; blank or missing values fall back
            to ``"default"``.
    """
    super().__init__("Bitget")
    cleaned_id = (account_id or "default").strip()
    self.account_id = cleaned_id if cleaned_id else "default"
    if service:
        self.bitget = service
    else:
        self.bitget = get_bitget_live_service(self.account_id)
    # Per-position bookkeeping used by check_position_management.
    self._position_protection_state: Dict[str, Dict[str, Any]] = {}
def _notification_context(self) -> Dict[str, str]:
account_id = getattr(self, 'account_id', 'default') or 'default'
return {
'account_id': account_id,
'target_key': f'Bitget:{account_id}',
}
# ==================== 核心执行方法 ====================
async def execute_open(self, decision: Dict[str, Any],
                       current_price: float) -> Dict[str, Any]:
    """Open a position on Bitget from a trading decision.

    Args:
        decision: Decision dict. Reads ``symbol``, ``signal_action`` (falling
            back to ``action``), ``margin`` (falling back to ``quantity``),
            ``entry_price``, ``stop_loss``, ``take_profit`` and ``leverage``.
        current_price: Latest market price; used as the entry price when the
            decision does not carry one.

    Returns:
        The order result dict from the Bitget service, enriched with the
        sizing fields ``contracts``, ``margin``, ``leverage``, ``order_type``,
        ``entry_price``, ``actual_position_value`` and ``effective_leverage``.
        ``pending_tp_sl`` is added when some or all of the TP/SL protection
        still has to be placed later. Validation failures and exceptions
        yield ``{'success': False, 'error': ...}``; exceptions additionally
        trigger a failure notification.
    """
    try:
        symbol = decision.get('symbol', '').replace('USDT', '')
        raw_action = decision.get('signal_action', decision.get('action'))  # buy/sell
        action = str(raw_action or '').strip().lower()
        if action not in ('buy', 'sell'):
            # Previously anything that was not exactly 'buy' (e.g. 'BUY',
            # 'hold', None) fell through to a sell order. Refuse instead of
            # guessing the direction of a live trade.
            return {
                'success': False,
                'error': f'无效的开仓方向: {raw_action!r}',
            }
        is_buy = (action == 'buy')

        margin = decision.get('margin', decision.get('quantity', 0))
        entry_price = decision.get('entry_price', current_price)
        stop_loss = decision.get('stop_loss')
        take_profit = decision.get('take_profit')
        # Leverage is hard-capped at 10x regardless of what the decision asks for.
        leverage = min(decision.get('leverage', self.bitget.settings.bitget_default_leverage), 10)

        # Decide between a market and a limit order.
        order_type, order_reason = self.decide_order_type(decision, current_price)
        logger.info(f" 订单类型: {order_reason}")

        # Adjust the margin against the available balance (fee reserve).
        account_state = self.bitget.get_account_state()
        available = account_state.get('available_balance', 0)
        adjusted_margin = self.calculate_effective_margin(available, margin)

        # The contract count must be computed with the leverage actually used.
        contracts = self._calculate_contracts(symbol, adjusted_margin, entry_price, leverage)
        actual_position_value = contracts * self.bitget.get_contract_size(symbol) * entry_price
        leverage_ok, leverage_reason, effective_leverage = self.validate_effective_leverage(
            decision,
            adjusted_margin,
            actual_position_value,
        )
        if contracts < 1:
            return {
                'success': False,
                'error': (
                    f'仓位计算结果 {contracts} 张,低于最小下单量 '
                    f'(保证金=${adjusted_margin:.2f}, 杠杆={leverage}x)'
                )
            }
        if not leverage_ok:
            return {
                'success': False,
                'error': leverage_reason,
                'effective_leverage': effective_leverage,
            }

        # Set the leverage on the exchange before placing the order.
        self.bitget.update_leverage(symbol, leverage)

        # Place the order.
        if order_type == 'market':
            result = self.bitget.place_market_order(symbol, is_buy=is_buy, size=contracts)
        else:
            result = self.bitget.place_limit_order(symbol, is_buy=is_buy, size=contracts, price=entry_price)
        if not result.get('success'):
            return result

        order_id = result.get('order_id')
        order_status = result.get('order_status', 'filled')
        result['contracts'] = contracts
        result['margin'] = adjusted_margin
        result['leverage'] = leverage
        result['order_type'] = order_type
        result['entry_price'] = entry_price
        result['actual_position_value'] = actual_position_value
        result['effective_leverage'] = effective_leverage

        # Take-profit / stop-loss protection.
        if stop_loss or take_profit:
            if order_status == 'filled':
                # The order is already filled: place TP/SL right away.
                try:
                    tp_sl_result = self.bitget.set_tp_sl(
                        symbol=symbol,
                        is_long=is_buy,
                        size=contracts,
                        tp_price=take_profit,
                        sl_price=stop_loss
                    )
                    tp_set = tp_sl_result.get('tp_set', False)
                    sl_set = tp_sl_result.get('sl_set', False)
                    if tp_set and sl_set:
                        logger.info(f" ✅ 止盈止损已设置: TP={take_profit}, SL={stop_loss}")
                    elif tp_set or sl_set:
                        # Partial success: only the missing side goes to pending.
                        result['pending_tp_sl'] = {
                            'tp_price': take_profit if not tp_set else None,
                            'sl_price': stop_loss if not sl_set else None
                        }
                        set_text = "TP" if tp_set else "SL"
                        fail_text = "TP" if not tp_set else "SL"
                        logger.warning(f" ⚠️ 止盈止损部分成功: {set_text}已设, {fail_text}待补设")
                        result['tp_sl_warning'] = f"{fail_text}设置失败,已加入待补设列表"
                    else:
                        # Both sides failed: queue both for a later retry.
                        result['pending_tp_sl'] = {
                            'tp_price': take_profit,
                            'sl_price': stop_loss
                        }
                        errors = tp_sl_result.get('errors', [])
                        logger.warning(f" ⚠️ 止盈止损设置失败,已加入待补设列表: {errors}")
                        result['tp_sl_warning'] = f"TP/SL设置失败: {'; '.join(errors)}"
                except Exception as tp_sl_err:
                    # The position is open; never fail the open because the
                    # protection call raised. Queue it for a retry instead.
                    logger.error(f" ⚠️ 止盈止损设置异常: {tp_sl_err}")
                    result['pending_tp_sl'] = {
                        'tp_price': take_profit,
                        'sl_price': stop_loss
                    }
                    result['tp_sl_warning'] = str(tp_sl_err)
            else:
                # Order not filled yet: defer TP/SL until the position exists.
                result['pending_tp_sl'] = {
                    'tp_price': take_profit,
                    'sl_price': stop_loss
                }
                logger.info(f" 📌 限价单待成交TP/SL 将在成交后自动设置: TP={take_profit}, SL={stop_loss}")

        # The original message printed "$" followed by the order type; log the
        # entry price after the "$" and keep the order type alongside it.
        logger.info(f" ✅ 开仓成功: {symbol} {contracts}张 @ ${entry_price} ({order_type})")
        # The success notification is sent by crypto_agent to avoid duplicating
        # the execution summary.
        return result
    except Exception as e:
        logger.error(f"Bitget 开仓失败: {e}")
        error_result = {'success': False, 'error': str(e)}
        # Send the failure notification.
        await self.send_execution_notification(
            operation='OPEN',
            symbol=decision.get('symbol', ''),
            result=error_result,
            details=self._notification_context()
        )
        return error_result
async def execute_close(self, decision: Dict[str, Any],
                        current_price: float) -> Dict[str, Any]:
    """Close the whole position for the decision's symbol at market.

    Args:
        decision: Reads ``symbol`` and the optional ``orders_to_close`` list.
        current_price: Unused here; part of the executor interface.

    Returns:
        The service's close result, with ``requested_order_ids`` echoed back
        when order ids were supplied, or ``{'success': False, 'error': ...}``
        when an exception occurred. A notification is sent in every case.
    """
    try:
        symbol = decision.get('symbol', '').replace('USDT', '')
        requested_ids = decision.get('orders_to_close', [])
        # Bitget aggregates positions per symbol, so individual order ids
        # cannot be closed selectively.
        outcome = self.bitget.market_close_position(symbol)
        if not outcome.get('success'):
            logger.warning(f" ⚠️ 平仓失败: {symbol} - {outcome.get('error', '未知错误')}")
        else:
            logger.info(f" ✅ 平仓成功: {symbol}")
        if requested_ids:
            outcome['requested_order_ids'] = requested_ids
        # Notify (Feishu) about the close attempt.
        await self.send_execution_notification(
            operation='CLOSE',
            symbol=symbol,
            result=outcome,
            details=self._notification_context(),
        )
        return outcome
    except Exception as e:
        logger.error(f"Bitget 平仓失败: {e}")
        failure = {'success': False, 'error': str(e)}
        # Notify about the failure using the raw symbol from the decision.
        await self.send_execution_notification(
            operation='CLOSE',
            symbol=decision.get('symbol', ''),
            result=failure,
            details=self._notification_context(),
        )
        return failure
async def execute_cancel(self, order_id: str, symbol: str) -> Dict[str, Any]:
    """Cancel one order and send a notification with the outcome.

    Args:
        order_id: Exchange order id to cancel.
        symbol: Trading pair; the ``USDT`` suffix is stripped for the service call.

    Returns:
        The service's cancel result, or
        ``{'success': False, 'error': ..., 'order_id': ...}`` on exception.
    """
    try:
        coin = symbol.replace('USDT', '')
        outcome = self.bitget.cancel_order(coin, order_id)
        if outcome.get('success'):
            logger.info(f" ✅ 撤单成功: {order_id}")
        # Notify (Feishu) whether or not the cancel succeeded.
        details = {'order_id': order_id}
        details.update(self._notification_context())
        await self.send_execution_notification(
            operation='CANCEL',
            symbol=symbol,
            result=outcome,
            details=details,
        )
        return outcome
    except Exception as e:
        logger.error(f"Bitget 撤单失败: {e}")
        failure = {'success': False, 'error': str(e), 'order_id': order_id}
        # Notify about the failure.
        failure_details = {'order_id': order_id}
        failure_details.update(self._notification_context())
        await self.send_execution_notification(
            operation='CANCEL',
            symbol=symbol,
            result=failure,
            details=failure_details,
        )
        return failure
async def set_stop_loss_take_profit(self,
                                    symbol: str,
                                    order_id: str,
                                    stop_loss: Optional[float],
                                    take_profit: Optional[float],
                                    position_size: float) -> Dict[str, Any]:
    """Place TP/SL protection for the open position of ``symbol``.

    The direction is taken from the sign of the open position's ``size``.

    Args:
        symbol: Trading pair; the ``USDT`` suffix is stripped for lookups.
        order_id: Unused here; part of the executor interface.
        stop_loss: Stop-loss price, or ``None``.
        take_profit: Take-profit price, or ``None``.
        position_size: Size passed through to the service.

    Returns:
        The service's ``set_tp_sl`` result, or
        ``{'success': False, 'message': ...}`` when no position is found or
        an exception occurs.
    """
    try:
        # Bitget needs the position direction, so look the position up first.
        open_positions = self.bitget.get_open_positions()
        match = None
        for candidate in open_positions:
            if candidate.get('coin') == symbol.replace('USDT', ''):
                match = candidate
                break
        if not match:
            return {'success': False, 'message': f'找不到 {symbol} 的持仓'}
        outcome = self.bitget.set_tp_sl(
            symbol=symbol.replace('USDT', ''),
            is_long=match['size'] > 0,
            size=position_size,
            tp_price=take_profit,
            sl_price=stop_loss,
        )
        if outcome.get('success'):
            logger.info(f" ✅ 止盈止损设置成功: SL=${stop_loss}, TP={take_profit}")
        return outcome
    except Exception as e:
        logger.error(f"Bitget 设置止盈止损失败: {e}")
        return {'success': False, 'message': str(e)}
def should_set_tp_sl_on_order(self) -> bool:
    """Report that Bitget does not support attaching TP/SL when placing an order."""
    supports_inline_tp_sl = False
    return supports_inline_tp_sl
# ==================== 平台特定配置 ====================
def get_market_order_threshold(self) -> float:
    """Return the market-order threshold: 0.2 (percent)."""
    threshold_pct = 0.2
    return threshold_pct
def get_pending_order_timeout(self) -> float:
    """Return the pending-order timeout: 24 hours (Bitget has good liquidity)."""
    timeout_hours = 24.0
    return timeout_hours
def get_position_exit_rules(self) -> tuple:
    """Return the exit rules as a pair of infinities.

    Bitget relies on protection orders and the trailing stop to manage
    profit, so no fixed-profit automatic close is configured.
    """
    unbounded = float('inf')
    return (unbounded, unbounded)
def get_fee_rate(self) -> float:
    """Return the fee rate: 0.06% (taker), expressed as a fraction."""
    taker_fee_rate = 0.0006
    return taker_fee_rate
def get_max_retries(self) -> int:
    """Return the maximum retry count: 5 (Bitget API rate limiting is strict)."""
    retry_limit = 5
    return retry_limit
def is_rate_limit_error(self, error_msg: str) -> bool:
    """Return True when ``error_msg`` looks like a rate-limit error.

    The match is a case-insensitive substring search for a fixed set of
    markers. ``None``/empty input yields False, and non-string values
    (for example an exception object) are converted with ``str()`` rather
    than raising ``AttributeError``.

    Args:
        error_msg: Error text (or object) returned by the API call.
    """
    if not error_msg:
        return False
    lowered = str(error_msg).lower()
    rate_limit_indicators = (
        'rate limit',
        'too many requests',
        '429',
        'limit exceeded',
        '请求频率',
    )
    return any(indicator in lowered for indicator in rate_limit_indicators)
def get_rate_limit_wait_time(self, error_msg: str, attempt: int) -> float:
    """Return how long to wait before retrying after a rate-limit error.

    If the message contains a hint of the form ``wait <N>s``, that value is
    used, capped at 60. Otherwise the wait is ``min(2 ** attempt, 60)``
    multiplied by a random jitter factor drawn from ``[0.5, 1.5]``.

    Args:
        error_msg: Error text returned by the API call.
        attempt: Retry attempt number used as the backoff exponent.
    """
    import random

    # Exponential backoff base, capped at 60s before jitter is applied.
    backoff = min(2 ** attempt, 60)
    # Prefer a wait time suggested by the error message itself.
    hint = re.search(r'wait\s+(\d+)\s*s', error_msg, re.IGNORECASE)
    if hint:
        return min(int(hint.group(1)), 60)
    # No hint: exponential backoff with random jitter.
    return backoff * random.uniform(0.5, 1.5)
def get_price_update_threshold(self) -> float:
    """Return the price-update threshold: 0.5 (percent)."""
    threshold_pct = 0.5
    return threshold_pct
# ==================== 移动止损 ====================
async def move_stop_loss(self,
                         symbol: str,
                         new_stop_loss: float,
                         current_stop_loss: Optional[float] = None) -> Dict[str, Any]:
    """Move the stop-loss of an open Bitget position.

    The currently configured take-profit price is read back from the
    service and re-submitted together with the new stop-loss.

    Args:
        symbol: Trading pair (e.g. BTCUSDT).
        new_stop_loss: New stop-loss price.
        current_stop_loss: Current stop-loss price (optional, not used here).

    Returns:
        {'success': bool, 'message': str}
    """
    try:
        held = self.bitget.get_position_for_symbol(symbol)
        if not held:
            return {'success': False, 'message': f'找不到 {symbol} 的持仓'}
        existing = self.bitget.get_tp_sl_prices(symbol.replace('USDT', ''))
        outcome = self.bitget.set_tp_sl(
            symbol=symbol.replace('USDT', ''),
            is_long=held['size'] > 0,
            size=abs(held['size']),
            tp_price=existing.get('take_profit'),
            sl_price=new_stop_loss,
        )
        if not outcome.get('success'):
            problems = outcome.get('errors', [])
            message = '; '.join(problems) if problems else '移动止损失败'
            return {'success': False, 'message': message}
        logger.info(f" ✅ 移动止损成功: {symbol} → ${new_stop_loss:.2f}")
        return {'success': True, 'message': f'移动止损成功: {new_stop_loss:.2f}'}
    except Exception as e:
        logger.error(f"Bitget 移动止损失败: {e}")
        return {'success': False, 'message': str(e)}
async def move_protection_levels(self,
                                 symbol: str,
                                 new_stop_loss: float,
                                 new_take_profit: Optional[float] = None,
                                 current_stop_loss: Optional[float] = None) -> Dict[str, Any]:
    """Update the stop-loss and take-profit protection orders together.

    Args:
        symbol: Trading pair (e.g. BTCUSDT).
        new_stop_loss: New stop-loss price.
        new_take_profit: New take-profit price; when ``None`` the price
            currently reported by the service is re-submitted.
        current_stop_loss: Current stop-loss price (optional, not used here).

    Returns:
        ``{'success': True, 'message': ..., 'take_profit': ...}`` on success,
        otherwise ``{'success': False, 'message': ...}``.
    """
    try:
        held = self.bitget.get_position_for_symbol(symbol)
        if not held:
            return {'success': False, 'message': f'找不到 {symbol} 的持仓'}
        existing = self.bitget.get_tp_sl_prices(symbol.replace('USDT', ''))
        if new_take_profit is not None:
            target_tp = new_take_profit
        else:
            target_tp = existing.get('take_profit')
        outcome = self.bitget.set_tp_sl(
            symbol=symbol.replace('USDT', ''),
            is_long=held['size'] > 0,
            size=abs(held['size']),
            tp_price=target_tp,
            sl_price=new_stop_loss,
        )
        if not outcome.get('success'):
            problems = outcome.get('errors', [])
            return {'success': False, 'message': '; '.join(problems) if problems else '保护单更新失败'}
        sl_text = f"{new_stop_loss:.2f}"
        # A non-numeric take-profit is shown as "unchanged".
        tp_text = f'${target_tp:.2f}' if isinstance(target_tp, (int, float)) else '保持'
        logger.info(
            f" ✅ Bitget 保护单更新成功: {symbol} "
            f"SL→${sl_text} TP→{tp_text}"
        )
        return {
            'success': True,
            'message': f'保护单更新成功: SL={new_stop_loss:.2f}',
            'take_profit': target_tp,
        }
    except Exception as e:
        logger.error(f"Bitget 更新保护单失败: {e}")
        return {'success': False, 'message': str(e)}
def _protection_state_key(self, position: Dict[str, Any]) -> str:
symbol = str(position.get('symbol') or '').upper()
side = str(position.get('side') or '')
entry_price = float(position.get('entry_price', 0) or 0)
return f"{self.account_id}:{symbol}:{side}:{entry_price:.8f}"
def export_position_protection_state(self) -> Dict[str, Dict[str, Any]]:
    """Return a copy of the protection state, one shallow-copied dict per key."""
    snapshot: Dict[str, Dict[str, Any]] = {}
    for state_key, state in self._position_protection_state.items():
        snapshot[state_key] = dict(state)
    return snapshot
def _compute_trailing_take_profit(self,
position: Dict[str, Any],
current_price: float,
new_stop_loss: float,
current_take_profit: Optional[float]) -> Optional[float]:
if not self.bitget.settings.bitget_dynamic_tp_enabled:
return current_take_profit
if not isinstance(current_take_profit, (int, float)) or current_take_profit <= 0:
return current_take_profit
side = position.get('side')
entry_price = float(position.get('entry_price', 0) or 0)
distance_ratio = float(self.bitget.settings.bitget_dynamic_tp_distance_ratio or 0.8)
if side == 'buy':
original_tp_distance = float(current_take_profit) - entry_price
candidate = new_stop_loss + (original_tp_distance * distance_ratio)
if candidate <= current_price:
return current_take_profit
return max(float(current_take_profit), candidate)
original_tp_distance = entry_price - float(current_take_profit)
candidate = new_stop_loss - (original_tp_distance * distance_ratio)
if candidate >= current_price:
return current_take_profit
return min(float(current_take_profit), candidate)
def check_position_management(self,
                              positions: List[Dict],
                              current_prices: Dict[str, float],
                              volatility_data: Optional[Dict[str, float]] = None) -> List[Dict[str, Any]]:
    """Propose stop-loss moves: break-even first, then a stepped trailing stop.

    For every position with a numeric stop-loss, the highest PnL percentage
    seen so far is tracked in ``self._position_protection_state``. Once the
    current PnL reaches the trailing threshold, a stop that locks in
    ``trailing_ratio`` of the peak PnL is proposed, provided it improves on
    the current stop by at least ``min_move_step`` percentage points and
    stays on the triggerable side of the current price. Otherwise, once the
    PnL reaches the break-even threshold and the stop is still on the
    losing side of the entry, the stop is moved to the entry price.

    State entries for positions that are no longer passed in are dropped.

    Args:
        positions: Position dicts; reads ``symbol``, ``entry_price``,
            ``side`` ('buy'/'sell'), ``stop_loss`` and ``take_profit``.
        current_prices: Latest price per symbol; the entry price is used
            when a symbol is missing.
        volatility_data: Unused here; part of the executor interface.

    Returns:
        A list of ``MOVE_SL`` action dicts sorted by ``priority``.
    """
    actions: List[Dict[str, Any]] = []
    active_keys = set()
    settings = self.bitget.settings
    breakeven_threshold = float(settings.bitget_breakeven_threshold or 1.0)
    trailing_enabled = bool(settings.bitget_trailing_stop_enabled)
    trailing_threshold = breakeven_threshold * float(settings.bitget_trailing_stop_threshold_multiplier or 2.0)
    trailing_ratio = float(settings.bitget_trailing_stop_ratio or 0.5)
    min_move_step = float(settings.bitget_trailing_min_move_step or 0.4)

    for pos in positions:
        symbol = pos.get('symbol')
        current_price = float(current_prices.get(symbol, pos.get('entry_price', 0)) or 0)
        entry_price = float(pos.get('entry_price', 0) or 0)
        side = pos.get('side')
        current_sl = pos.get('stop_loss')
        current_tp = pos.get('take_profit')
        # Positions without usable prices, side or an existing stop are skipped.
        if current_price <= 0 or entry_price <= 0 or side not in {'buy', 'sell'} or not isinstance(current_sl, (int, float)):
            continue
        sl_price = float(current_sl)
        is_long = side == 'buy'

        if is_long:
            pnl_pct = (current_price - entry_price) / entry_price * 100
        else:
            pnl_pct = (entry_price - current_price) / entry_price * 100

        state_key = self._protection_state_key(pos)
        active_keys.add(state_key)
        state = self._position_protection_state.setdefault(state_key, {
            'max_pnl_pct': pnl_pct,
            'breakeven_done': False,
            'trailing_active': False,
        })
        state['max_pnl_pct'] = max(float(state.get('max_pnl_pct', pnl_pct) or pnl_pct), pnl_pct)
        max_pnl_pct = float(state['max_pnl_pct'])

        if trailing_enabled and pnl_pct >= trailing_threshold:
            locked_profit_pct = max_pnl_pct * trailing_ratio
            if is_long:
                new_sl = entry_price * (1 + locked_profit_pct / 100)
                current_locked_pct = (sl_price - entry_price) / entry_price * 100
                improves = new_sl > sl_price
                # The stop is derived from the PEAK PnL, so after a retrace it
                # can end up above the market; such a stop cannot be placed
                # as a long's stop-loss.
                triggerable = new_sl < current_price
            else:
                new_sl = entry_price * (1 - locked_profit_pct / 100)
                current_locked_pct = (entry_price - sl_price) / entry_price * 100
                improves = new_sl < sl_price
                # Mirror image for shorts: the stop must stay above the market.
                triggerable = new_sl > current_price
            can_move = improves and triggerable and (locked_profit_pct - current_locked_pct) >= min_move_step
            if can_move:
                new_tp = self._compute_trailing_take_profit(pos, current_price, new_sl, current_tp)
                state['trailing_active'] = True
                state['last_move_reason'] = 'trailing'
                state['last_new_sl'] = new_sl
                state['last_new_tp'] = new_tp
                actions.append({
                    'symbol': symbol,
                    'action': 'MOVE_SL',
                    'new_sl': new_sl,
                    'new_tp': new_tp,
                    'pnl_pct': pnl_pct,
                    'reason': f"最高盈利 {max_pnl_pct:.1f}% ,锁定利润 {locked_profit_pct:.1f}% ,上移止损",
                    'priority': 3,
                })
                continue

        if pnl_pct >= breakeven_threshold:
            # Break-even only applies while the stop is still on the losing
            # side of the entry price.
            sl_on_losing_side = (sl_price < entry_price) if is_long else (sl_price > entry_price)
            if sl_on_losing_side:
                state['breakeven_done'] = True
                state['last_move_reason'] = 'breakeven'
                state['last_new_sl'] = entry_price
                state['last_new_tp'] = current_tp
                actions.append({
                    'symbol': symbol,
                    'action': 'MOVE_SL',
                    'new_sl': entry_price,
                    'new_tp': current_tp,
                    'pnl_pct': pnl_pct,
                    'reason': f"盈利 {pnl_pct:.1f}% >= {breakeven_threshold:.1f}% ,止损移到保本",
                    'priority': 3,
                })

    # Forget positions that were not part of this pass.
    stale_keys = [key for key in self._position_protection_state if key not in active_keys]
    for key in stale_keys:
        self._position_protection_state.pop(key, None)

    actions.sort(key=lambda x: x.get('priority', 99))
    return actions
# ==================== 辅助方法 ====================
def _calculate_contracts(self, symbol: str, margin: float, price: float, leverage: int) -> int:
"""计算合约张数"""
try:
# 获取合约规格
contract_size = self.bitget.get_contract_size(symbol.replace('USDT', ''))
# 计算持仓价值
position_value = margin * leverage
# 计算币数量
coin_amount = position_value / price
# 计算合约张数(向下取整)
contracts = int(coin_amount / contract_size)
logger.info(
f" 仓位计算: ${margin:.2f} × {leverage}x = ${position_value:.2f} "
f"{coin_amount:.6f} {symbol}{contracts}"
)
return contracts
except Exception as e:
logger.error(f"计算合约张数失败: {e}")
return 0