""" Hyperliquid 交易服务 - ClawFi 集成 """ import os from typing import Dict, Any, Optional, List from datetime import datetime from app.config import get_settings from app.utils.logger import logger try: from hyperliquid.info import Info from hyperliquid.exchange import Exchange from eth_account import Account HYPERLIQUID_AVAILABLE = True except ImportError: HYPERLIQUID_AVAILABLE = False logger.warning("Hyperliquid SDK 未安装,请运行: npx clawfi-hyperliquid-skill") class HyperliquidTradingService: """Hyperliquid 交易服务(ClawFi 集成)""" def __init__(self): """初始化 Hyperliquid 交易服务""" if not HYPERLIQUID_AVAILABLE: raise ImportError("Hyperliquid SDK 未安装") self.settings = get_settings() # 从环境变量加载认证信息 self.wallet_address = os.getenv("CLAWFI_WALLET_ADDRESS") self.private_key = os.getenv("CLAWFI_PRIVATE_KEY") if not self.wallet_address or not self.private_key: raise ValueError( "缺少 Hyperliquid 认证信息。请运行: " "npx clawfi-hyperliquid-skill --wallet=0x... --key=0x..." ) # 风控配置 self.max_total_leverage = self.settings.hyperliquid_max_total_leverage self.circuit_breaker_drawdown = self.settings.hyperliquid_circuit_breaker_drawdown self.max_single_position = self.settings.hyperliquid_max_single_position # 初始化 SDK self.info = Info(base_url="https://api.hyperliquid.xyz") account = Account.from_key(self.private_key) self.exchange = Exchange(account, base_url="https://api.hyperliquid.xyz", account_address=self.wallet_address) # 初始账户价值(用于熔断检查) self.initial_balance: Optional[float] = None self._initialize_account() logger.info(f"Hyperliquid 交易服务初始化完成") logger.info(f" 钱包地址: {self.wallet_address}") logger.info(f" 总杠杆上限: {self.max_total_leverage}x") logger.info(f" 熔断阈值: {self.circuit_breaker_drawdown * 100}%") def _initialize_account(self): """初始化账户信息""" try: state = self.get_account_state() self.initial_balance = state["account_value"] logger.info(f" 初始账户价值: ${self.initial_balance:,.2f}") except Exception as e: logger.error(f"初始化账户失败: {e}") raise def get_account_state(self) -> Dict[str, Any]: """获取账户状态""" try: state = self.info.user_state(self.wallet_address) margin_summary = state.get("marginSummary", {}) account_value = float(margin_summary.get("accountValue", 0)) total_margin_used = float(margin_summary.get("totalMarginUsed", 0)) return { "account_value": account_value, "total_margin_used": total_margin_used, "available_balance": account_value - total_margin_used, "positions": state.get("assetPositions", []), "margin_summary": margin_summary } except Exception as e: logger.error(f"获取账户状态失败: {e}") raise def check_risk_limits(self) -> Dict[str, Any]: """ 检查风险限制(ClawFi 强制规则) Returns: 风险检查结果 """ state = self.get_account_state() current_value = state["account_value"] # 计算回撤 if self.initial_balance is None: self.initial_balance = current_value drawdown = (self.initial_balance - current_value) / self.initial_balance if self.initial_balance > 0 else 0 # 10% 熔断检查 circuit_breaker_triggered = drawdown >= self.circuit_breaker_drawdown if circuit_breaker_triggered: logger.error(f"🚨 触发 10% 熔断!当前回撤: {drawdown * 100:.2f}%") # 平掉所有持仓 self.market_close_all() raise Exception(f"触发 10% 熔断 - 所有持仓已平仓(回撤: {drawdown * 100:.2f}%)") return { "initial_balance": self.initial_balance, "current_value": current_value, "drawdown": drawdown, "drawdown_percent": drawdown * 100, "circuit_breaker_triggered": circuit_breaker_triggered, "safe_to_trade": not circuit_breaker_triggered } def update_leverage(self, symbol: str, leverage: int): """ 更新杠杆(必须在开仓前调用) Args: symbol: 交易对(如 "BTC") leverage: 杠杆倍数(≤10) """ if leverage > 10: raise ValueError(f"杠杆不能超过 10x(ClawFi 规则),当前: {leverage}x") try: result = self.exchange.update_leverage(leverage, symbol, is_cross=False) logger.info(f"更新杠杆: {symbol} → {leverage}x") return result except Exception as e: logger.error(f"更新杠杆失败: {e}") raise def place_market_order( self, symbol: str, is_buy: bool, size: float, reduce_only: bool = False ) -> Dict[str, Any]: """ 下市价单 Args: symbol: 交易对(如 "BTC") is_buy: True=做多,False=做空 size: 数量 reduce_only: 是否仅平仓 """ # 风险检查 self.check_risk_limits() try: result = self.exchange.market_open(symbol, is_buy, size, reduce_only=reduce_only) side = "买入" if is_buy else "卖出" logger.info(f"✅ Hyperliquid 市价单: {side} {symbol} {size}") return { "success": True, "symbol": symbol, "side": "buy" if is_buy else "sell", "size": size, "result": result } except Exception as e: logger.error(f"下单失败: {e}") return { "success": False, "error": str(e) } def place_limit_order( self, symbol: str, is_buy: bool, size: float, price: float, reduce_only: bool = False ) -> Dict[str, Any]: """下限价单""" self.check_risk_limits() try: result = self.exchange.order(symbol, is_buy, size, price, {"limit": {"tif": "Gtc"}}, reduce_only=reduce_only) side = "买入" if is_buy else "卖出" logger.info(f"✅ Hyperliquid 限价单: {side} {symbol} {size} @ ${price}") return { "success": True, "symbol": symbol, "side": "buy" if is_buy else "sell", "size": size, "price": price, "result": result } except Exception as e: logger.error(f"下单失败: {e}") return { "success": False, "error": str(e) } def get_open_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]: """ 获取所有挂单(包括止盈止损订单) Args: symbol: 可选,指定币种 Returns: 挂单列表 """ try: # Hyperliquid 没有直接的获取挂单 API,需要通过 user_state # 注意:这个方法可能需要根据实际 API 调整 state = self.info.user_state(self.wallet_address) open_orders = state.get("openOrders", []) orders = [] for order in open_orders: coin = order.get("coin") if symbol and coin != symbol: continue orders.append({ "order_id": order.get("oid"), "symbol": coin, "side": order.get("side"), "size": float(order.get("totalSz", 0)), "price": float(order.get("limitPx", 0)), "is_reduce_only": order.get("reduceOnly", False), "order_type": order.get("orderType", {}) }) return orders except Exception as e: logger.error(f"获取挂单失败: {e}") return [] def get_tp_sl_prices(self, symbol: str) -> Dict[str, Optional[float]]: """ 获取指定币种的止盈止损价格 Args: symbol: 币种(如 "BTC") Returns: {'take_profit': price, 'stop_loss': price} """ try: orders = self.get_open_orders(symbol) tp_price = None sl_price = None for order in orders: if not order.get("is_reduce_only"): continue order_type = order.get("order_type", {}) # 止盈:限价单 if "limit" in order_type and order["price"] > 0: tp_price = order["price"] # 止损:触发单 if "trigger" in order_type: trigger_px = order_type.get("trigger", {}).get("triggerPx") if trigger_px: sl_price = float(trigger_px) return { "take_profit": tp_price, "stop_loss": sl_price } except Exception as e: logger.error(f"获取止盈止损价格失败: {e}") return {"take_profit": None, "stop_loss": None} def set_tp_sl( self, symbol: str, is_long: bool, size: float, tp_price: Optional[float] = None, sl_price: Optional[float] = None ) -> Dict[str, Any]: """ 设置止盈止损(开仓后调用) Args: symbol: 币种(如 "BTC") is_long: 是否多头 size: 数量 tp_price: 止盈价格(可选) sl_price: 止损价格(可选) Returns: 执行结果 """ try: results = [] close_is_buy = not is_long # 平多头=卖出,平空头=买入 # 设置止盈(限价单) if tp_price: tp_result = self.exchange.order( symbol, close_is_buy, size, tp_price, {"limit": {"tif": "Gtc"}}, reduce_only=True ) results.append({"type": "take_profit", "result": tp_result}) logger.info(f"✅ 设置止盈: {symbol} @ ${tp_price}") # 设置止损(触发单) if sl_price: # 触发价格需要稍微偏离(避免滑点问题) exec_px = sl_price * 0.999 if close_is_buy else sl_price * 1.001 sl_result = self.exchange.order( symbol, close_is_buy, size, exec_px, {"trigger": {"triggerPx": sl_price, "isMarket": True, "tpsl": "sl"}}, reduce_only=True ) results.append({"type": "stop_loss", "result": sl_result}) logger.info(f"✅ 设置止损: {symbol} @ ${sl_price}(触发)") return { "success": True, "results": results } except Exception as e: logger.error(f"设置止盈止损失败: {e}") return { "success": False, "error": str(e) } def cancel_tp_sl_orders(self, symbol: str) -> Dict[str, Any]: """ 取消指定币种的所有止盈止损订单 Args: symbol: 币种(如 "BTC") Returns: 取消结果 """ try: orders = self.get_open_orders(symbol) cancelled_count = 0 for order in orders: if order.get("is_reduce_only"): result = self.exchange.cancel(symbol, order["order_id"]) if result.get("status") == "ok": cancelled_count += 1 logger.info(f"✅ 取消 {symbol} 的止盈止损订单: {cancelled_count} 个") return { "success": True, "cancelled_count": cancelled_count } except Exception as e: logger.error(f"取消止盈止损订单失败: {e}") return { "success": False, "error": str(e) } def cancel_order(self, symbol: str, order_id: int) -> Dict[str, Any]: """取消订单""" try: result = self.exchange.cancel(symbol, order_id) logger.info(f"取消订单: {symbol} #{order_id}") return {"success": True, "result": result} except Exception as e: logger.error(f"取消订单失败: {e}") return {"success": False, "error": str(e)} def cancel_all_orders(self, symbol: Optional[str] = None) -> Dict[str, Any]: """取消所有订单""" try: result = self.exchange.cancel_all_orders(symbol) logger.info(f"取消所有订单: {symbol or '全部'}") return {"success": True, "result": result} except Exception as e: logger.error(f"取消所有订单失败: {e}") return {"success": False, "error": str(e)} def market_close_all(self) -> Dict[str, Any]: """紧急平仓所有持仓(熔断时使用)""" try: state = self.get_account_state() positions = state["positions"] results = [] for pos in positions: position_data = pos.get("position", {}) coin = position_data.get("coin") size = float(position_data.get("szi", 0)) if size == 0: continue # 取消该币种的所有挂单(包括止盈止损) self.cancel_all_orders(coin) is_long = size > 0 result = self.place_market_order( symbol=coin, is_buy=not is_long, # 平多头=卖出,平空头=买入 size=abs(size), reduce_only=True ) results.append(result) logger.info(f"🚨 紧急平仓完成,共平仓 {len(results)} 个持仓") return {"success": True, "closed_positions": len(results), "results": results} except Exception as e: logger.error(f"紧急平仓失败: {e}") return {"success": False, "error": str(e)} def get_open_positions(self) -> List[Dict[str, Any]]: """获取所有持仓""" try: state = self.get_account_state() positions = [] for pos in state["positions"]: position_data = pos.get("position", {}) coin = position_data.get("coin") size = float(position_data.get("szi", 0)) if size == 0: continue positions.append({ "coin": coin, "size": size, # 正数=多头,负数=空头 "entry_price": float(position_data.get("entryPx", 0)), "unrealized_pnl": float(position_data.get("unrealizedPnl", 0)), "leverage": position_data.get("leverage", {}).get("value"), "liquidation_price": position_data.get("liquidationPx"), "position": position_data # 保留原始数据 }) return positions except Exception as e: logger.error(f"获取持仓失败: {e}") return [] def get_position_for_symbol(self, symbol: str) -> Optional[Dict[str, Any]]: """获取指定币种的持仓""" positions = self.get_open_positions() for pos in positions: if pos["coin"] == symbol: return pos return None # 单例 _hyperliquid_service_instance = None def get_hyperliquid_service() -> Optional[HyperliquidTradingService]: """获取 Hyperliquid 交易服务单例""" global _hyperliquid_service_instance settings = get_settings() # 如果未启用,返回 None if not settings.hyperliquid_trading_enabled: return None if _hyperliquid_service_instance is None: try: _hyperliquid_service_instance = HyperliquidTradingService() except Exception as e: logger.error(f"初始化 Hyperliquid 服务失败: {e}") return None return _hyperliquid_service_instance