""" Hyperliquid 实盘交易执行器 """ from typing import Dict, Any, List, Optional from app.crypto_agent.executor.base_executor import BaseExecutor from app.services.hyperliquid_trading_service import get_hyperliquid_service from app.utils.logger import logger class HyperliquidExecutor(BaseExecutor): """Hyperliquid 实盘交易执行器""" def __init__(self): super().__init__("Hyperliquid") self.hyperliquid = get_hyperliquid_service() # ==================== 核心执行方法 ==================== async def execute_open(self, decision: Dict[str, Any], current_price: float) -> Dict[str, Any]: """执行开仓""" try: symbol = decision.get('symbol', '').replace('USDT', '') action = decision.get('action') # buy/sell margin = decision.get('margin', decision.get('quantity', 0)) entry_price = decision.get('entry_price', current_price) stop_loss = decision.get('stop_loss') take_profit = decision.get('take_profit') # 决定订单类型 order_type, order_reason = self.decide_order_type(decision, current_price) logger.info(f" 订单类型: {order_reason}") # 调整保证金(预留手续费) account_state = self.hyperliquid.get_account_state() available = account_state.get('available_balance', 0) adjusted_margin = self.calculate_effective_margin(available, margin) # 计算仓位大小 leverage = min(decision.get('leverage', 5), 10) position_size = self._calculate_position_size(symbol, adjusted_margin, entry_price, leverage) if position_size <= 0: return { 'success': False, 'error': f'仓位计算失败: {position_size}' } # 设置杠杆 self.hyperliquid.update_leverage(symbol, leverage) # 下单(Hyperliquid 支持在下单时设置 TP/SL) is_buy = (action == 'buy') order_params = { 'symbol': symbol, 'is_buy': is_buy, 'size': position_size, 'price': entry_price if order_type == 'limit' else None, 'order_type': order_type, } # 如果支持下单时设置 TP/SL,添加到参数中 if stop_loss or take_profit: if self.should_set_tp_sl_on_order(): order_params['sl'] = stop_loss order_params['tp'] = take_profit logger.info(f" 下单时设置 TP/SL: SL=${stop_loss}, TP={take_profit}") result = self.hyperliquid.place_order(**order_params) if not result.get('success'): return result order_id = result.get('order_id') order_status = result.get('order_status', 'filled') logger.info(f" ✅ 开仓成功: {symbol} {position_size} @ ${order_type}") # 发送飞书通知 await self.send_execution_notification( operation='OPEN', symbol=symbol, result=result, details={ 'size': position_size, 'price': entry_price, 'margin': adjusted_margin, 'leverage': leverage, 'stop_loss': stop_loss, 'take_profit': take_profit, 'order_type': order_type } ) # 如果成交且未在下单时设置 TP/SL,单独设置 if order_status == 'filled' and not self.should_set_tp_sl_on_order(): if stop_loss or take_profit: tp_sl_result = await self.set_stop_loss_take_profit( symbol, order_id, stop_loss, take_profit, position_size ) if not tp_sl_result.get('success'): result['tp_sl_warning'] = tp_sl_result.get('message') return result except Exception as e: logger.error(f"Hyperliquid 开仓失败: {e}") error_result = {'success': False, 'error': str(e)} # 发送失败通知 await self.send_execution_notification( operation='OPEN', symbol=decision.get('symbol', ''), result=error_result ) return error_result async def execute_close(self, decision: Dict[str, Any], current_price: float) -> Dict[str, Any]: """执行平仓""" try: symbol = decision.get('symbol', '').replace('USDT', '') orders_to_close = decision.get('orders_to_close', []) if not orders_to_close: # 平掉所有持仓 result = self.hyperliquid.market_close_all() logger.info(f" ✅ 平仓所有持仓") # 发送飞书通知 await self.send_execution_notification( operation='CLOSE', symbol=symbol, result=result ) return result # 平掉指定订单 results = [] for order_id in orders_to_close: close_result = self.hyperliquid.close_position(symbol, order_id) results.append({ 'order_id': order_id, 'success': close_result.get('success', False) }) success_result = {'success': True, 'results': results} # 发送飞书通知 await self.send_execution_notification( operation='CLOSE', symbol=symbol, result=success_result ) return success_result except Exception as e: logger.error(f"Hyperliquid 平仓失败: {e}") error_result = {'success': False, 'error': str(e)} # 发送失败通知 await self.send_execution_notification( operation='CLOSE', symbol=decision.get('symbol', ''), result=error_result ) return error_result async def execute_cancel(self, order_id: str, symbol: str) -> Dict[str, Any]: """执行撤单""" try: result = self.hyperliquid.cancel_order(symbol.replace('USDT', ''), order_id) logger.info(f" ✅ 撤单成功: {order_id}") # 发送飞书通知 await self.send_execution_notification( operation='CANCEL', symbol=symbol, result=result, details={'order_id': order_id} ) return result except Exception as e: logger.error(f"Hyperliquid 撤单失败: {e}") error_result = {'success': False, 'error': str(e), 'order_id': order_id} # 发送失败通知 await self.send_execution_notification( operation='CANCEL', symbol=symbol, result=error_result, details={'order_id': order_id} ) return error_result async def set_stop_loss_take_profit(self, symbol: str, order_id: str, stop_loss: Optional[float], take_profit: Optional[float], position_size: float) -> Dict[str, Any]: """设置止盈止损""" try: # Hyperliquid 的 TP/SL 设置方式可能需要查文档 # 这里假设有类似的方法 result = self.hyperliquid.set_tp_sl( symbol=symbol.replace('USDT', ''), size=position_size, tp_price=take_profit, sl_price=stop_loss ) if result.get('success'): logger.info(f" ✅ 止盈止损设置成功: SL=${stop_loss}, TP={take_profit}") return result except Exception as e: logger.error(f"Hyperliquid 设置止盈止损失败: {e}") return {'success': False, 'message': str(e)} def should_set_tp_sl_on_order(self) -> bool: """Hyperliquid 支持在下单时设置 TP/SL""" return True # ==================== 平台特定配置 ==================== def get_market_order_threshold(self) -> float: """市价单阈值: 0.1% (Hyperliquid 流动性好)""" return 0.1 def get_pending_order_timeout(self) -> float: """挂单超时: 4 小时""" return 4.0 def get_position_exit_rules(self) -> tuple: """持仓退出规则: (目标盈利 2.5%, 最大持仓 4h)""" return (2.5, 4.0) def get_fee_rate(self) -> float: """手续费率: 0.05% (taker)""" return 0.0005 def get_max_retries(self) -> int: """最大重试次数: 5""" return 5 def is_rate_limit_error(self, error_msg: str) -> bool: """判断是否是限流错误""" rate_limit_indicators = [ 'rate limit', 'too many requests', '429', 'limit exceeded' ] return any(indicator in error_msg.lower() for indicator in rate_limit_indicators) def get_rate_limit_wait_time(self, error_msg: str, attempt: int) -> float: """获取限流等待时间""" import random base_wait = min(2 ** attempt, 30) # 指数退避,最大 30s jitter = random.uniform(0.5, 1.5) return base_wait * jitter def get_price_update_threshold(self) -> float: """价格更新阈值: 0.5%""" return 0.5 # ==================== 辅助方法 ==================== def _calculate_position_size(self, symbol: str, margin: float, price: float, leverage: int) -> float: """计算仓位大小""" try: # Hyperliquid 的仓位计算 position_value = margin * leverage position_size = position_value / price logger.info(f" 仓位计算: ${margin:.2f} × {leverage}x = ${position_value:.2f} → {position_size:.6f} {symbol}") return position_size except Exception as e: logger.error(f"计算仓位大小失败: {e}") return 0 # ==================== 移动止损 ==================== async def move_stop_loss(self, symbol: str, new_stop_loss: float, current_stop_loss: Optional[float] = None) -> Dict[str, Any]: """ 移动止损(Hyperliquid) Args: symbol: 交易对 new_stop_loss: 新止损价 current_stop_loss: 当前止损价(可选) Returns: {'success': bool, 'message': str} """ try: # Hyperliquid 使用 set_tp_sl 方法(只传 sl_price) result = self.hyperliquid.set_tp_sl( symbol=symbol.replace('USDT', ''), sl_price=new_stop_loss ) if result.get('success', False): logger.info(f" ✅ 移动止损成功: {symbol} → ${new_stop_loss:.2f}") return {'success': True, 'message': f'移动止损成功: {new_stop_loss:.2f}'} else: return {'success': False, 'message': result.get('message', '移动止损失败')} except Exception as e: logger.error(f"Hyperliquid 移动止损失败: {e}") return {'success': False, 'message': str(e)}