stock-ai-agent/backend/app/crypto_agent/executor/hyperliquid_executor.py
2026-03-30 21:29:23 +08:00

329 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Hyperliquid 实盘交易执行器
"""
from typing import Dict, Any, List, Optional
from app.crypto_agent.executor.base_executor import BaseExecutor
from app.services.hyperliquid_trading_service import get_hyperliquid_service
from app.utils.logger import logger
class HyperliquidExecutor(BaseExecutor):
"""Hyperliquid 实盘交易执行器"""
def __init__(self):
super().__init__("Hyperliquid")
self.hyperliquid = get_hyperliquid_service()
# ==================== 核心执行方法 ====================
async def execute_open(self, decision: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行开仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
action = decision.get('signal_action', decision.get('action')) # buy/sell
margin = decision.get('margin', decision.get('quantity', 0))
entry_price = decision.get('entry_price', current_price)
stop_loss = decision.get('stop_loss')
take_profit = decision.get('take_profit')
# 决定订单类型
order_type, order_reason = self.decide_order_type(decision, current_price)
logger.info(f" 订单类型: {order_reason}")
# 调整保证金(预留手续费)
account_state = self.hyperliquid.get_account_state()
available = account_state.get('available_balance', 0)
adjusted_margin = self.calculate_effective_margin(available, margin)
# 计算仓位大小
leverage = min(decision.get('leverage', 10), 10)
position_size = self._calculate_position_size(symbol, adjusted_margin, entry_price, leverage)
if position_size <= 0:
return {
'success': False,
'error': f'仓位计算失败: {position_size}'
}
# 设置杠杆
self.hyperliquid.update_leverage(symbol, leverage)
# 下单
is_buy = (action == 'buy')
if order_type == 'market':
# 市价单
result = self.hyperliquid.place_market_order(
symbol=symbol,
is_buy=is_buy,
size=position_size,
reduce_only=False
)
else:
# 限价单
result = self.hyperliquid.place_limit_order(
symbol=symbol,
is_buy=is_buy,
size=position_size,
price=entry_price,
reduce_only=False
)
if not result.get('success'):
return result
order_id = result.get('order_id')
order_status = result.get('order_status', 'filled')
logger.info(f" ✅ 开仓成功: {symbol} {position_size} @ ${order_type}")
# 发送飞书通知
await self.send_execution_notification(
operation='OPEN',
symbol=symbol,
result=result,
details={
'size': position_size,
'price': entry_price,
'margin': adjusted_margin,
'leverage': leverage,
'stop_loss': stop_loss,
'take_profit': take_profit,
'order_type': order_type
}
)
# 成交后单独设置止盈止损Hyperliquid 不支持下单时设置 TP/SL
if order_status == 'filled' and (stop_loss or take_profit):
tp_sl_result = self.hyperliquid.set_tp_sl(
symbol=symbol,
is_long=is_buy,
size=position_size,
tp_price=take_profit,
sl_price=stop_loss
)
if not tp_sl_result.get('success'):
logger.warning(f" ⚠️ 止盈止损设置失败: {tp_sl_result.get('message')}")
result['tp_sl_warning'] = tp_sl_result.get('message')
return result
except Exception as e:
logger.error(f"Hyperliquid 开仓失败: {e}")
error_result = {'success': False, 'error': str(e)}
# 发送失败通知
await self.send_execution_notification(
operation='OPEN',
symbol=decision.get('symbol', ''),
result=error_result
)
return error_result
async def execute_close(self, decision: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行平仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
orders_to_close = decision.get('orders_to_close', [])
result = self.hyperliquid.market_close_position(symbol)
if result.get('success'):
logger.info(f" ✅ 平仓成功: {symbol}")
else:
logger.warning(f" ⚠️ 平仓失败: {symbol} - {result.get('error', '未知错误')}")
if orders_to_close:
result['requested_order_ids'] = orders_to_close
# 发送飞书通知
await self.send_execution_notification(
operation='CLOSE',
symbol=symbol,
result=result
)
return result
except Exception as e:
logger.error(f"Hyperliquid 平仓失败: {e}")
error_result = {'success': False, 'error': str(e)}
# 发送失败通知
await self.send_execution_notification(
operation='CLOSE',
symbol=decision.get('symbol', ''),
result=error_result
)
return error_result
async def execute_cancel(self, order_id: str, symbol: str) -> Dict[str, Any]:
"""执行撤单"""
try:
result = self.hyperliquid.cancel_order(symbol.replace('USDT', ''), order_id)
logger.info(f" ✅ 撤单成功: {order_id}")
# 发送飞书通知
await self.send_execution_notification(
operation='CANCEL',
symbol=symbol,
result=result,
details={'order_id': order_id}
)
return result
except Exception as e:
logger.error(f"Hyperliquid 撤单失败: {e}")
error_result = {'success': False, 'error': str(e), 'order_id': order_id}
# 发送失败通知
await self.send_execution_notification(
operation='CANCEL',
symbol=symbol,
result=error_result,
details={'order_id': order_id}
)
return error_result
async def set_stop_loss_take_profit(self,
symbol: str,
order_id: str,
stop_loss: Optional[float],
take_profit: Optional[float],
position_size: float) -> Dict[str, Any]:
"""设置止盈止损"""
try:
# Hyperliquid 的 TP/SL 设置方式可能需要查文档
# 这里假设有类似的方法
result = self.hyperliquid.set_tp_sl(
symbol=symbol.replace('USDT', ''),
size=position_size,
tp_price=take_profit,
sl_price=stop_loss
)
if result.get('success'):
logger.info(f" ✅ 止盈止损设置成功: SL=${stop_loss}, TP={take_profit}")
return result
except Exception as e:
logger.error(f"Hyperliquid 设置止盈止损失败: {e}")
return {'success': False, 'message': str(e)}
def should_set_tp_sl_on_order(self) -> bool:
"""Hyperliquid 支持在下单时设置 TP/SL"""
return True
# ==================== 平台特定配置 ====================
def get_market_order_threshold(self) -> float:
"""市价单阈值: 0.1% (Hyperliquid 流动性好)"""
return 0.1
def get_pending_order_timeout(self) -> float:
"""挂单超时: 4 小时"""
return 4.0
def get_position_exit_rules(self) -> tuple:
"""持仓退出规则: (目标盈利 2.5%, 无最大持仓时间限制)"""
return (2.5, float('inf'))
def get_fee_rate(self) -> float:
"""手续费率: 0.05% (taker)"""
return 0.0005
def get_max_retries(self) -> int:
"""最大重试次数: 5"""
return 5
def is_rate_limit_error(self, error_msg: str) -> bool:
"""判断是否是限流错误"""
rate_limit_indicators = [
'rate limit',
'too many requests',
'429',
'limit exceeded'
]
return any(indicator in error_msg.lower() for indicator in rate_limit_indicators)
def get_rate_limit_wait_time(self, error_msg: str, attempt: int) -> float:
"""获取限流等待时间"""
import random
base_wait = min(2 ** attempt, 30) # 指数退避,最大 30s
jitter = random.uniform(0.5, 1.5)
return base_wait * jitter
def get_price_update_threshold(self) -> float:
"""价格更新阈值: 0.5%"""
return 0.5
# ==================== 辅助方法 ====================
def _calculate_position_size(self, symbol: str, margin: float, price: float, leverage: int) -> float:
"""计算仓位大小(按 Hyperliquid szDecimals 精度截断)"""
try:
import math
# Hyperliquid 的仓位计算
position_value = margin * leverage
position_size = position_value / price
# 按 Hyperliquid 要求的精度截断szDecimals
sz_decimals = self.hyperliquid.get_sz_decimals(symbol)
factor = 10 ** sz_decimals
position_size = math.floor(position_size * factor) / factor
logger.info(f" 仓位计算: ${margin:.2f} × {leverage}x = ${position_value:.2f}{position_size:.6f} {symbol} (精度: {sz_decimals}位)")
return position_size
except Exception as e:
logger.error(f"计算仓位大小失败: {e}")
return 0
# ==================== 移动止损 ====================
async def move_stop_loss(self,
symbol: str,
new_stop_loss: float,
current_stop_loss: Optional[float] = None) -> Dict[str, Any]:
"""
移动止损Hyperliquid
Args:
symbol: 交易对
new_stop_loss: 新止损价
current_stop_loss: 当前止损价(可选)
Returns:
{'success': bool, 'message': str}
"""
try:
position = self.hyperliquid.get_position_for_symbol(symbol)
if not position:
return {'success': False, 'message': f'找不到 {symbol} 的持仓'}
tp_sl_prices = self.hyperliquid.get_tp_sl_prices(symbol.replace('USDT', ''))
result = self.hyperliquid.set_tp_sl(
symbol=symbol.replace('USDT', ''),
is_long=position['size'] > 0,
size=abs(position['size']),
tp_price=tp_sl_prices.get('take_profit'),
sl_price=new_stop_loss
)
if result.get('success', False):
logger.info(f" ✅ 移动止损成功: {symbol} → ${new_stop_loss:.2f}")
return {'success': True, 'message': f'移动止损成功: {new_stop_loss:.2f}'}
else:
return {'success': False, 'message': result.get('error', result.get('message', '移动止损失败'))}
except Exception as e:
logger.error(f"Hyperliquid 移动止损失败: {e}")
return {'success': False, 'message': str(e)}