From de75b0419b497fa115622aa6453d59edd7fbd067 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Sat, 4 Apr 2026 23:12:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/crypto_agent/crypto_agent.py | 217 ++++++++++++++++-- .../crypto_agent/executor/base_executor.py | 67 ++++-- .../crypto_agent/market_signal_analyzer.py | 36 ++- 3 files changed, 284 insertions(+), 36 deletions(-) diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 50a9adc..9998258 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -509,6 +509,7 @@ class CryptoAgent: await self._check_and_set_pending_tp_sl_hyperliquid() if self.bitget: await self._check_and_set_pending_tp_sl_bitget() + await self._check_bitget_missing_tp_sl() # 兜底:检查缺少的 TP/SL 并补救 for symbol in self.symbols: await self.analyze_symbol(symbol) @@ -721,13 +722,13 @@ class CryptoAgent: if self.settings.paper_trading_enabled: logger.info(f"\n📊 【模拟盘】") paper_positions, paper_account, paper_pending = self._get_paper_trading_state() - paper_signal = self._select_signal_for_platform(valid_signals, 'PaperTrading') + paper_signal = self._select_signal_for_platform(valid_signals, 'PaperTrading', market_state=market_signal.get('market_state', '中性')) if paper_signal: logger.info( f" 采用信号: {paper_signal.get('timeframe', 'unknown')} | " f"{paper_signal.get('action')} | {paper_signal.get('confidence', 0)}%" ) - trading_signal = self._build_execution_signal(symbol, paper_signal, current_price) + trading_signal = self._build_execution_signal(symbol, paper_signal, current_price, market_signal) paper_decision = self.execute_signal_with_rules( trading_signal, 'PaperTrading', paper_account, paper_positions, paper_pending ) @@ -745,13 +746,13 @@ class CryptoAgent: if self.hyperliquid: logger.info(f"\n🔥 【Hyperliquid】") hl_positions, hl_account, hl_pending = self._get_hyperliquid_trading_state() - hl_signal = self._select_signal_for_platform(valid_signals, 'Hyperliquid') + hl_signal = self._select_signal_for_platform(valid_signals, 'Hyperliquid', market_state=market_signal.get('market_state', '中性')) if hl_signal: logger.info( f" 采用信号: {hl_signal.get('timeframe', 'unknown')} | " f"{hl_signal.get('action')} | {hl_signal.get('confidence', 0)}%" ) - trading_signal = self._build_execution_signal(symbol, hl_signal, current_price) + trading_signal = self._build_execution_signal(symbol, hl_signal, current_price, market_signal) hl_decision = self.execute_signal_with_rules( trading_signal, 'Hyperliquid', hl_account, hl_positions, hl_pending ) @@ -769,13 +770,13 @@ class CryptoAgent: if self.bitget: logger.info(f"\n🔥 【Bitget】") bg_positions, bg_account, bg_pending = self._get_bitget_trading_state() - bg_signal = self._select_signal_for_platform(valid_signals, 'Bitget') + bg_signal = self._select_signal_for_platform(valid_signals, 'Bitget', market_state=market_signal.get('market_state', '中性')) if bg_signal: logger.info( f" 采用信号: {bg_signal.get('timeframe', 'unknown')} | " f"{bg_signal.get('action')} | {bg_signal.get('confidence', 0)}%" ) - trading_signal = self._build_execution_signal(symbol, bg_signal, current_price) + trading_signal = self._build_execution_signal(symbol, bg_signal, current_price, market_signal) bg_decision = self.execute_signal_with_rules( trading_signal, 'Bitget', bg_account, bg_positions, bg_pending ) @@ -1223,25 +1224,48 @@ class CryptoAgent: sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True) return sorted_signals[0] - def _select_signal_for_platform(self, signals: List[Dict[str, Any]], platform_name: str) -> Optional[Dict[str, Any]]: - """根据平台偏好选择最适合执行的信号""" + def _select_signal_for_platform(self, signals: List[Dict[str, Any]], + platform_name: str, + market_state: str = '中性') -> Optional[Dict[str, Any]]: + """根据平台偏好和市场状态选择最适合执行的信号""" if not signals: return None + # 震荡市:趋势信号降权(信心 × 0.8),优先选择日内反转信号 + adjusted_signals = [] + for signal in signals: + s = dict(signal) # 不修改原信号 + confidence = s.get('confidence', 50) + lane = s.get('timeframe') or s.get('type') or 'unknown' + + if market_state == '震荡市' and lane == 'medium_term': + confidence = int(confidence * 0.8) + s['_regime_adjusted'] = True + s['_original_confidence'] = s.get('confidence', 50) + s['confidence'] = confidence + elif market_state in ('趋势市', '日内趋势') and lane == 'short_term': + pass # 趋势市不降权日内信号 + adjusted_signals.append(s) + lane_priority = self.PLATFORM_SIGNAL_PRIORITY.get(platform_name, ['short_term', 'medium_term']) by_lane: Dict[str, List[Dict[str, Any]]] = {} - for signal in signals: + for signal in adjusted_signals: lane = signal.get('timeframe') or signal.get('type') or 'unknown' by_lane.setdefault(lane, []).append(signal) for lane in lane_priority: candidates = by_lane.get(lane, []) if candidates: - return sorted(candidates, key=lambda item: item.get('confidence', 0), reverse=True)[0] + best = sorted(candidates, key=lambda item: item.get('confidence', 0), reverse=True)[0] + if best.get('_regime_adjusted'): + logger.info(f" 📊 震荡市降权: {lane} 原始信心={best.get('_original_confidence')} → 调整后={best.get('confidence')}") + return best - return sorted(signals, key=lambda item: item.get('confidence', 0), reverse=True)[0] + return sorted(adjusted_signals, key=lambda item: item.get('confidence', 0), reverse=True)[0] - def _build_execution_signal(self, symbol: str, signal: Dict[str, Any], current_price: float) -> Dict[str, Any]: + def _build_execution_signal(self, symbol: str, signal: Dict[str, Any], + current_price: float, + market_signal: Dict[str, Any] = None) -> Dict[str, Any]: """构建传给执行规则层的标准信号格式""" signal_type = signal.get('timeframe') or signal.get('type') or 'medium_term' position_size = signal.get('position_size') or self.SIGNAL_POSITION_SIZE_DEFAULTS.get(signal_type, 'light') @@ -1259,6 +1283,9 @@ class CryptoAgent: 'timeframe': signal_type, 'type': signal_type, 'position_size': position_size, + 'market_state': market_signal.get('market_state', '中性') if market_signal else '中性', + 'regime': (market_signal.get('range_metrics') or {}).get('regime', '') if market_signal else '', + 'funding_rate_data': market_signal.get('funding_rate_data') if market_signal else None, } def _get_signal_for_decision(self, market_signal: Dict[str, Any], decision: Dict[str, Any]) -> Dict[str, Any]: @@ -2254,6 +2281,25 @@ class CryptoAgent: default_positions=self.SIGNAL_POSITION_SIZE_DEFAULTS, ) + # 市场状态仓位调整:震荡市降低仓位,避免来回被止损 + regime = signal.get('regime', '') + REGIME_MARGIN_MULTIPLIERS = { + 'ranging': 0.5, # 震荡市:仓位减半 + 'transitional': 0.7, # 过渡期:仓位七折 + 'weak_trend': 0.9, # 弱趋势:仓位九折 + 'strong_trend': 1.0, # 强趋势:满仓 + } + regime_multiplier = REGIME_MARGIN_MULTIPLIERS.get(regime, 1.0) + if regime_multiplier < 1.0: + logger.info(f" 📊 市场状态调整: regime={regime}, 仓位系数={regime_multiplier}") + target_margin_pct *= regime_multiplier + + # 连败降温:连败时降低仓位 + streak_multiplier = signal.get('_streak_margin_multiplier', 1.0) + if streak_multiplier < 1.0: + logger.info(f" 📊 连败降温: 仓位系数={streak_multiplier}") + target_margin_pct *= streak_multiplier + margin, _, budget_reason = calculate_margin_and_position_value( balance=balance, available_margin=available, @@ -2386,6 +2432,53 @@ class CryptoAgent: # 无反向订单 → 正常开仓 return "OPEN", "无反向订单,正常开仓" + def _check_losing_streak(self, platform_name: str, max_lookback: int = 5) -> Dict[str, Any]: + """ + 检查近期交易连败情况 + + Returns: + { + 'losing_streak': int, + 'should_cool_down': bool, + 'margin_multiplier': float, + 'reason': str, + } + """ + recent_orders = [] + + if platform_name == 'PaperTrading' and self.paper_trading: + recent_orders = self.paper_trading.get_order_history(limit=max_lookback) + # Bitget/Hyperliquid 实盘暂不查询历史(API 限制),用空列表 + + if not recent_orders: + return {'losing_streak': 0, 'should_cool_down': False, 'margin_multiplier': 1.0, 'reason': ''} + + # 计算连败(从最近的交易往回数) + losing_streak = 0 + for order in recent_orders: + pnl = order.get('pnl_amount', 0) or 0 + if pnl < 0: + losing_streak += 1 + else: + break # 遇到盈利就停止计数 + + if losing_streak >= 3: + return { + 'losing_streak': losing_streak, + 'should_cool_down': True, + 'margin_multiplier': 0.3, + 'reason': f"连败 {losing_streak} 次,降温(仓位×0.3)" + } + elif losing_streak >= 2: + return { + 'losing_streak': losing_streak, + 'should_cool_down': True, + 'margin_multiplier': 0.5, + 'reason': f"连败 {losing_streak} 次,降温(仓位×0.5)" + } + + return {'losing_streak': losing_streak, 'should_cool_down': False, 'margin_multiplier': 1.0, 'reason': ''} + def _check_risk_control(self, signal: Dict[str, Any], platform_name: str, account: Dict[str, Any], @@ -2456,6 +2549,22 @@ class CryptoAgent: except: pass # 价格解析失败,跳过检查 + # 5. 资金费率检查(极端资金费率时拒绝开仓) + funding_data = signal.get('funding_rate_data') + if funding_data: + fr_pct = funding_data.get('funding_rate_percent', 0) or 0 + signal_action = signal.get('action', '') + if signal_action == 'buy' and fr_pct > 0.05: + return False, f"资金费率过热 {fr_pct:.4f}%(做多拥挤),拒绝做多" + elif signal_action == 'sell' and fr_pct < -0.05: + return False, f"资金费率过冷 {fr_pct:.4f}%(做空拥挤),拒绝做空" + + # 6. 连败降温检查(不拒绝,但降低仓位) + streak_info = self._check_losing_streak(platform_name) + if streak_info.get('should_cool_down'): + signal['_streak_margin_multiplier'] = streak_info['margin_multiplier'] + logger.warning(f"[{platform_name}] ⚠️ {streak_info['reason']}") + return True, "通过风控检查" def execute_signal_with_rules(self, signal: Dict[str, Any], @@ -3212,6 +3321,77 @@ class CryptoAgent: except Exception as e: logger.error(f"[Bitget] 检查挂单 TP/SL 补设异常: {e}") + async def _check_bitget_missing_tp_sl(self): + """定时检查 Bitget 持仓是否缺少止盈止损,缺少则从信号补救""" + if not self.bitget: + return + try: + positions = self.bitget.get_open_positions() + if not positions: + return + + for pos in positions: + symbol = pos.get('symbol', '') # e.g. "BTCUSDT" + if not symbol: + continue + + # 获取当前止盈止损 + coin = symbol.replace('USDT', '') + tp_sl = self.bitget.get_tp_sl_prices(coin) + has_tp = tp_sl.get('take_profit') is not None + has_sl = tp_sl.get('stop_loss') is not None + + if has_tp and has_sl: + continue # 都有,跳过 + + # 缺少 TP 或 SL,从信号数据库查找最近信号补救 + latest_signal = self.signal_db.get_latest_signal('crypto', symbol) + if not latest_signal: + missing = ('止盈' if not has_tp else '') + ('/' if not has_tp and not has_sl else '') + ('止损' if not has_sl else '') + logger.warning(f"[Bitget] ⚠️ {symbol} 缺少{missing},且无历史信号可补救") + continue + + tp_price = latest_signal.get('take_profit') + sl_price = latest_signal.get('stop_loss') + + if not tp_price and not sl_price: + logger.warning(f"[Bitget] ⚠️ {symbol} 缺少止盈止损,最近信号也无 TP/SL") + continue + + # 只补救缺少的 + set_tp = tp_price if not has_tp else None + set_sl = sl_price if not has_sl else None + + missing_parts = [] + if not has_tp: + missing_parts.append(f"TP={set_tp}") + if not has_sl: + missing_parts.append(f"SL={set_sl}") + missing_desc = ' & '.join(missing_parts) + + logger.warning(f"[Bitget] 🔧 {symbol} 缺少 {missing_desc},从信号补救...") + + # 用仓位实际大小设置 + size = abs(pos.get('size', 0)) + if size <= 0: + continue + + tp_sl_result = self.bitget.set_tp_sl( + symbol=coin, + is_long=pos.get('size', 0) > 0, + size=size, + tp_price=set_tp, + sl_price=set_sl, + ) + + if tp_sl_result.get('success'): + logger.info(f"[Bitget] ✅ 补救成功: {symbol} {missing_desc}") + else: + logger.warning(f"[Bitget] ⚠️ 补救失败: {tp_sl_result.get('error')}") + + except Exception as e: + logger.error(f"[Bitget] 止盈止损兜底检查异常: {e}") + def _calculate_hyperliquid_position_size(self, decision: Dict[str, Any], current_price: float) -> float: """ 计算 Hyperliquid 仓位大小(基于可用保证金和风控限制) @@ -3858,12 +4038,19 @@ class CryptoAgent: async def _check_position_management_all_platforms(self): """检查各平台的持仓管理(止盈/止损/移动止损)""" try: - # 获取当前价格 + # 获取当前价格和 ATR 波动率 current_prices = {} + volatility_data = {} for symbol in self.symbols: try: data = self.exchange.get_multi_timeframe_data(symbol) current_prices[symbol] = float(data['5m'].iloc[-1]['close']) + # 提取 1h ATR 占价格的百分比,用于动态移动止损 + if '1h' in data and 'atr' in data['1h'].columns: + atr_value = data['1h']['atr'].iloc[-1] + price_1h = data['1h']['close'].iloc[-1] + if atr_value and price_1h > 0: + volatility_data[symbol] = float(atr_value) / float(price_1h) except: continue @@ -3881,8 +4068,8 @@ class CryptoAgent: if not positions: continue - # 检查持仓管理 - actions = executor.check_position_management(positions, current_prices) + # 检查持仓管理(传递 ATR 波动率数据) + actions = executor.check_position_management(positions, current_prices, volatility_data) # 执行建议的操作 for action_info in actions: diff --git a/backend/app/crypto_agent/executor/base_executor.py b/backend/app/crypto_agent/executor/base_executor.py index e03b10d..371ee7d 100644 --- a/backend/app/crypto_agent/executor/base_executor.py +++ b/backend/app/crypto_agent/executor/base_executor.py @@ -174,10 +174,16 @@ class BaseExecutor(ABC): def check_position_management(self, positions: List[Dict], - current_prices: Dict[str, float]) -> List[Dict[str, Any]]: + current_prices: Dict[str, float], + volatility_data: Optional[Dict[str, float]] = None) -> List[Dict[str, Any]]: """ 持仓管理检查 + Args: + positions: 持仓列表 + current_prices: 当前价格 {symbol: price} + volatility_data: 波动率数据 {symbol: atr_pct}(1h ATR / price) + Returns: 建议的操作列表 """ @@ -230,25 +236,58 @@ class BaseExecutor(ABC): 'priority': 2 }) - # 规则3: 移动止损 - if pnl_pct >= 2: - current_sl = pos.get('stop_loss') - if side == 'buy' and current_sl and current_sl < entry_price: + # 规则3: 动态移动止损(基于 ATR 波动率) + current_sl = pos.get('stop_loss') + if not current_sl: + continue + + # 获取波动率:ATR 占价格的百分比(如 0.015 = 1.5%) + atr_pct = (volatility_data.get(symbol) if volatility_data else None) or 0.02 + # 波动率倍数:低波动(震荡)给更多空间,高波动(趋势)收得更紧 + atr_multiplier = max(1.5, 2.0 / atr_pct) + + # 两级移动止损 + # 第一级:盈利 >= 0.6 * ATR% * multiplier → SL 移到入场价 ± 1倍ATR(给足震荡空间) + # 第二级:盈利 >= 1.0 * ATR% * multiplier → SL 移到保本 + step1_threshold = atr_pct * 100 * atr_multiplier * 0.6 + step2_threshold = atr_pct * 100 * atr_multiplier * 1.0 + + if pnl_pct >= step2_threshold: + # 盈利充足,移到保本 + if (side == 'buy' and current_sl < entry_price) or \ + (side == 'sell' and current_sl > entry_price): actions.append({ 'symbol': symbol, 'action': 'MOVE_SL', 'new_sl': entry_price, - 'reason': f"盈利 {pnl_pct:.1f}% >= 2%,移动止损到入场价", - 'priority': 3 - }) - elif side == 'sell' and current_sl and current_sl > entry_price: - actions.append({ - 'symbol': symbol, - 'action': 'MOVE_SL', - 'new_sl': entry_price, - 'reason': f"盈利 {pnl_pct:.1f}% >= 2%,移动止损到入场价", + 'pnl_pct': pnl_pct, + 'reason': f"盈利 {pnl_pct:.1f}% >= {step2_threshold:.1f}%,移动止损到保本", 'priority': 3 }) + elif pnl_pct >= step1_threshold: + # 盈利初步,移到入场价 ± 1倍ATR(保留震荡空间,不急于保本) + if side == 'buy' and current_sl < entry_price: + new_sl = entry_price * (1 - atr_pct) + if new_sl > current_sl: + actions.append({ + 'symbol': symbol, + 'action': 'MOVE_SL', + 'new_sl': new_sl, + 'pnl_pct': pnl_pct, + 'reason': f"盈利 {pnl_pct:.1f}% >= {step1_threshold:.1f}%,移动止损到入场价-1ATR (${new_sl:.2f})", + 'priority': 3 + }) + elif side == 'sell' and current_sl > entry_price: + new_sl = entry_price * (1 + atr_pct) + if new_sl < current_sl: + actions.append({ + 'symbol': symbol, + 'action': 'MOVE_SL', + 'new_sl': new_sl, + 'pnl_pct': pnl_pct, + 'reason': f"盈利 {pnl_pct:.1f}% >= {step1_threshold:.1f}%,移动止损到入场价+1ATR (${new_sl:.2f})", + 'priority': 3 + }) # 按优先级排序 actions.sort(key=lambda x: x.get('priority', 99)) diff --git a/backend/app/crypto_agent/market_signal_analyzer.py b/backend/app/crypto_agent/market_signal_analyzer.py index 19e45aa..bce63b1 100644 --- a/backend/app/crypto_agent/market_signal_analyzer.py +++ b/backend/app/crypto_agent/market_signal_analyzer.py @@ -208,7 +208,7 @@ class MarketSignalAnalyzer: news_context = await self._get_news_context(symbol) # 3. 获取合约市场数据(资金费率、持仓量等) - futures_context = await self._get_futures_context(symbol) + futures_context, futures_market_data = await self._get_futures_context(symbol) # 4. 将日内和趋势拆成两次独立分析,避免一个 prompt 同时混做两件事 intraday_prompt = self._build_analysis_prompt( @@ -251,7 +251,24 @@ class MarketSignalAnalyzer: intraday_result = self._parse_llm_response(intraday_response or "", symbol) trend_result = self._parse_llm_response(trend_response or "", symbol) - return self._merge_lane_results(symbol, intraday_result, trend_result) + result = self._merge_lane_results(symbol, intraday_result, trend_result) + + # 携带量化 regime 数据到最终结果,供执行层使用 + if market_context.get('range_metrics'): + result['range_metrics'] = market_context['range_metrics'] + + # 携带资金费率数据到最终结果,供执行层风控使用 + if futures_market_data: + funding = futures_market_data.get('funding_rate') or {} + result['funding_rate_data'] = { + 'funding_rate': funding.get('funding_rate'), + 'funding_rate_percent': funding.get('funding_rate_percent', 0), + 'sentiment': funding.get('sentiment'), + 'sentiment_level': funding.get('sentiment_level'), + 'open_interest': futures_market_data.get('open_interest'), + } + + return result except Exception as e: logger.error(f"市场信号分析失败: {e}") @@ -385,6 +402,7 @@ class MarketSignalAnalyzer: 'trend': "\n".join(trend_parts), 'levels': "\n".join(levels_parts), 'range_warning': range_warning, + 'range_metrics': range_metrics, } def _get_session_open(self, df: Optional[pd.DataFrame]) -> Optional[float]: @@ -1184,8 +1202,12 @@ class MarketSignalAnalyzer: logger.warning(f"获取新闻失败: {e}") return "新闻获取失败" - async def _get_futures_context(self, symbol: str) -> str: - """获取合约市场数据(资金费率、持仓量、溢价率)""" + async def _get_futures_context(self, symbol: str) -> tuple: + """获取合约市场数据(资金费率、持仓量、溢价率) + + Returns: + (formatted_str, raw_market_data) - 格式化文本和原始数据 + """ try: loop = asyncio.get_event_loop() market_data = await loop.run_in_executor( @@ -1194,11 +1216,11 @@ class MarketSignalAnalyzer: symbol ) if not market_data: - return "" - return self._format_futures_context(symbol, market_data) + return "", None + return self._format_futures_context(symbol, market_data), market_data except Exception as e: logger.warning(f"获取 {symbol} 合约数据失败: {e}") - return "" + return "", None def _format_futures_context(self, symbol: str, market_data: Dict[str, Any]) -> str: """格式化高价值合约特征,避免大段说明性文本"""