""" 信号分析器 - 多周期技术分析和 LLM 深度分析 """ import pandas as pd from typing import Dict, Any, Optional, List from app.utils.logger import logger from app.services.llm_service import llm_service class SignalAnalyzer: """交易信号分析器 - 波段交易优化版""" # LLM 系统提示词 - 波段交易版 CRYPTO_ANALYST_PROMPT = """你是一位经验丰富的加密货币波段交易员,专注于捕捉 1-7 天的中等波段行情。 ## 交易风格 - **波段交易**:持仓 1-7 天,不做超短线 - **顺势回调**:在趋势中寻找回调入场机会 - **风险控制**:单笔亏损不超过本金 2% ## 多周期分析框架 1. **4H 周期**:判断主趋势方向和强度 - 趋势明确:价格在 MA20 同侧运行 3 根以上 K 线 - 趋势强度:看 MACD 柱状图是否放大 2. **1H 周期**:确认趋势 + 寻找回调位置 - 上涨趋势中:等待回调到 MA20 或前低支撑 - 下跌趋势中:等待反弹到 MA20 或前高阻力 3. **15M 周期**:入场信号确认 - 做多:RSI 从超卖回升 + MACD 金叉 + K 线企稳 - 做空:RSI 从超买回落 + MACD 死叉 + K 线见顶 ## 入场条件(波段做多) 1. 4H 趋势向上(价格 > MA20,MACD > 0 或底背离) 2. 1H 回调到支撑位(MA20 附近或前低) 3. 15M 出现止跌信号(RSI < 40 回升,或 MACD 金叉) 4. 止损明确(前低下方),风险收益比 >= 1:2 ## 入场条件(波段做空) 1. 4H 趋势向下(价格 < MA20,MACD < 0 或顶背离) 2. 1H 反弹到阻力位(MA20 附近或前高) 3. 15M 出现见顶信号(RSI > 60 回落,或 MACD 死叉) 4. 止损明确(前高上方),风险收益比 >= 1:2 ## 特殊情况处理 - **极度超卖(RSI < 20)**:不追空,等待反弹做多机会 - **极度超买(RSI > 80)**:不追多,等待回调做空机会 - **震荡市**:观望,等待突破方向 ## 输出格式(JSON) ```json { "market_structure": { "trend": "uptrend/downtrend/sideways", "strength": "strong/moderate/weak", "phase": "impulse/correction/reversal" }, "key_levels": { "resistance": [阻力位1, 阻力位2], "support": [支撑位1, 支撑位2] }, "signal": { "quality": "A/B/C/D", "action": "buy/sell/wait", "confidence": 0-100, "entry_zone": [入场区间下限, 入场区间上限], "stop_loss": 止损价, "targets": [目标1, 目标2], "reason": "入场理由" }, "risk_warning": "风险提示" } ``` 信号质量说明: - A级:趋势明确 + 回调到位 + 多重信号共振(置信度 80+) - B级:趋势明确 + 信号较好(置信度 60-80) - C级:有机会但需要更多确认(置信度 40-60) - D级:不建议交易(置信度 < 40) 重要:波段交易要有耐心,宁可错过也不要在不理想的位置入场。""" def __init__(self): """初始化信号分析器""" logger.info("信号分析器初始化完成") # ==================== K线形态识别 ==================== def _detect_candlestick_patterns(self, df: pd.DataFrame) -> Dict[str, Any]: """ 识别 K 线形态 Args: df: K线数据(至少需要3根K线) Returns: { 'bullish_patterns': [...], # 看涨形态 'bearish_patterns': [...], # 看跌形态 'pattern_weight': float # 形态权重 } """ if len(df) < 3: return {'bullish_patterns': [], 'bearish_patterns': [], 'pattern_weight': 0} bullish = [] bearish = [] weight = 0 # 获取最近3根K线 curr = df.iloc[-1] prev = df.iloc[-2] prev2 = df.iloc[-3] curr_body = curr['close'] - curr['open'] curr_body_abs = abs(curr_body) curr_range = curr['high'] - curr['low'] prev_body = prev['close'] - prev['open'] prev_body_abs = abs(prev_body) # 避免除零 if curr_range == 0: curr_range = 0.0001 # === 锤子线 / 倒锤子线 === upper_shadow = curr['high'] - max(curr['open'], curr['close']) lower_shadow = min(curr['open'], curr['close']) - curr['low'] # 锤子线:下影线长,实体小,出现在下跌后 if lower_shadow > curr_body_abs * 2 and upper_shadow < curr_body_abs * 0.5: if prev_body < 0: # 前一根是阴线 bullish.append("锤子线") weight += 1.5 # 倒锤子线:上影线长,实体小,出现在下跌后 if upper_shadow > curr_body_abs * 2 and lower_shadow < curr_body_abs * 0.5: if prev_body < 0: bullish.append("倒锤子线") weight += 1 # 上吊线:锤子线形态但出现在上涨后 if lower_shadow > curr_body_abs * 2 and upper_shadow < curr_body_abs * 0.5: if prev_body > 0: bearish.append("上吊线") weight -= 1.5 # === 吞没形态 === # 看涨吞没:阳线实体完全包住前一根阴线 if curr_body > 0 and prev_body < 0: if curr['open'] <= prev['close'] and curr['close'] >= prev['open']: if curr_body_abs > prev_body_abs * 1.2: bullish.append("看涨吞没") weight += 2 # 看跌吞没:阴线实体完全包住前一根阳线 if curr_body < 0 and prev_body > 0: if curr['open'] >= prev['close'] and curr['close'] <= prev['open']: if curr_body_abs > prev_body_abs * 1.2: bearish.append("看跌吞没") weight -= 2 # === 十字星 === if curr_body_abs < curr_range * 0.1: # 实体很小 if upper_shadow > curr_range * 0.3 and lower_shadow > curr_range * 0.3: # 十字星本身是中性的,需要结合前一根K线判断 if prev_body > 0: bearish.append("十字星(上涨后)") weight -= 1 elif prev_body < 0: bullish.append("十字星(下跌后)") weight += 1 # === 早晨之星 / 黄昏之星 (3根K线形态) === prev2_body = prev2['close'] - prev2['open'] prev_range = prev['high'] - prev['low'] if prev['high'] != prev['low'] else 0.0001 # 早晨之星:大阴线 + 小实体(星) + 大阳线 if prev2_body < 0 and abs(prev2_body) > prev_range * 0.5: # 第一根大阴线 if abs(prev_body) < prev_range * 0.3: # 第二根小实体 if curr_body > 0 and curr_body_abs > curr_range * 0.5: # 第三根大阳线 if curr['close'] > (prev2['open'] + prev2['close']) / 2: bullish.append("早晨之星") weight += 2.5 # 黄昏之星:大阳线 + 小实体(星) + 大阴线 if prev2_body > 0 and prev2_body > prev_range * 0.5: if abs(prev_body) < prev_range * 0.3: if curr_body < 0 and curr_body_abs > curr_range * 0.5: if curr['close'] < (prev2['open'] + prev2['close']) / 2: bearish.append("黄昏之星") weight -= 2.5 return { 'bullish_patterns': bullish, 'bearish_patterns': bearish, 'pattern_weight': weight } # ==================== 支撑阻力位计算 ==================== def _calculate_support_resistance(self, df: pd.DataFrame, current_price: float) -> Dict[str, Any]: """ 计算支撑位和阻力位 Args: df: K线数据(建议使用1H或4H数据) current_price: 当前价格 Returns: { 'supports': [支撑位1, 支撑位2], 'resistances': [阻力位1, 阻力位2], 'nearest_support': float, 'nearest_resistance': float, 'at_support': bool, 'at_resistance': bool } """ if len(df) < 20: return { 'supports': [], 'resistances': [], 'nearest_support': 0, 'nearest_resistance': 0, 'at_support': False, 'at_resistance': False } # 方法1:使用近期高低点 highs = df['high'].tail(50).values lows = df['low'].tail(50).values # 找局部高点和低点 local_highs = [] local_lows = [] for i in range(2, len(highs) - 2): # 局部高点 if highs[i] > highs[i-1] and highs[i] > highs[i-2] and \ highs[i] > highs[i+1] and highs[i] > highs[i+2]: local_highs.append(highs[i]) # 局部低点 if lows[i] < lows[i-1] and lows[i] < lows[i-2] and \ lows[i] < lows[i+1] and lows[i] < lows[i+2]: local_lows.append(lows[i]) # 方法2:使用均线作为动态支撑阻力 ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns and pd.notna(df['ma20'].iloc[-1]) else 0 ma50 = df['ma50'].iloc[-1] if 'ma50' in df.columns and pd.notna(df['ma50'].iloc[-1]) else 0 # 方法3:布林带 bb_upper = df['bb_upper'].iloc[-1] if 'bb_upper' in df.columns and pd.notna(df['bb_upper'].iloc[-1]) else 0 bb_lower = df['bb_lower'].iloc[-1] if 'bb_lower' in df.columns and pd.notna(df['bb_lower'].iloc[-1]) else 0 # 合并所有支撑位(低于当前价格) all_supports = [] if local_lows: all_supports.extend([l for l in local_lows if l < current_price]) if ma20 and ma20 < current_price: all_supports.append(ma20) if ma50 and ma50 < current_price: all_supports.append(ma50) if bb_lower and bb_lower < current_price: all_supports.append(bb_lower) # 合并所有阻力位(高于当前价格) all_resistances = [] if local_highs: all_resistances.extend([h for h in local_highs if h > current_price]) if ma20 and ma20 > current_price: all_resistances.append(ma20) if ma50 and ma50 > current_price: all_resistances.append(ma50) if bb_upper and bb_upper > current_price: all_resistances.append(bb_upper) # 排序并去重(合并相近的价位) supports = sorted(set(all_supports), reverse=True)[:3] # 最近的3个支撑 resistances = sorted(set(all_resistances))[:3] # 最近的3个阻力 # 找最近的支撑和阻力 nearest_support = supports[0] if supports else 0 nearest_resistance = resistances[0] if resistances else 0 # 判断是否在支撑/阻力位附近(1%范围内) at_support = nearest_support > 0 and abs(current_price - nearest_support) / current_price < 0.01 at_resistance = nearest_resistance > 0 and abs(current_price - nearest_resistance) / current_price < 0.01 return { 'supports': supports, 'resistances': resistances, 'nearest_support': nearest_support, 'nearest_resistance': nearest_resistance, 'at_support': at_support, 'at_resistance': at_resistance } # ==================== 成交量分析 ==================== def _analyze_volume(self, df: pd.DataFrame) -> Dict[str, Any]: """ 分析成交量 Args: df: K线数据(需要包含 volume, volume_ma20, volume_ratio) Returns: { 'volume_signal': 'high' | 'normal' | 'low', 'volume_trend': 'increasing' | 'decreasing' | 'stable', 'volume_confirms': bool, # 成交量是否确认价格走势 'volume_weight': float } """ if len(df) < 5 or 'volume' not in df.columns: return { 'volume_signal': 'normal', 'volume_trend': 'stable', 'volume_confirms': False, 'volume_weight': 0 } latest = df.iloc[-1] prev = df.iloc[-2] # 量比判断 volume_ratio = latest.get('volume_ratio', 1) if pd.isna(volume_ratio): volume_ratio = 1 if volume_ratio > 2: volume_signal = 'high' elif volume_ratio < 0.5: volume_signal = 'low' else: volume_signal = 'normal' # 成交量趋势(最近5根K线) recent_volumes = df['volume'].tail(5) if len(recent_volumes) >= 5: first_half = recent_volumes.iloc[:2].mean() second_half = recent_volumes.iloc[-2:].mean() if second_half > first_half * 1.3: volume_trend = 'increasing' elif second_half < first_half * 0.7: volume_trend = 'decreasing' else: volume_trend = 'stable' else: volume_trend = 'stable' # 判断成交量是否确认价格走势 price_up = latest['close'] > prev['close'] volume_up = latest['volume'] > prev['volume'] # 价涨量增 或 价跌量缩 = 确认 volume_confirms = (price_up and volume_up) or (not price_up and not volume_up) # 计算权重 weight = 0 if volume_signal == 'high' and volume_confirms: weight = 1.5 elif volume_signal == 'high' and not volume_confirms: weight = -0.5 # 放量但不确认,可能是假突破 elif volume_signal == 'low': weight = -0.5 # 缩量,信号可靠性降低 return { 'volume_signal': volume_signal, 'volume_trend': volume_trend, 'volume_confirms': volume_confirms, 'volume_weight': weight } # ==================== 短线信号分析 ==================== def _analyze_short_term_signal(self, m5_data: pd.DataFrame, m15_data: pd.DataFrame, trend_direction: str) -> Dict[str, Any]: """ 分析短线交易信号(超跌反弹/超涨回落) 短线信号特点: - 不依赖大趋势方向,主要看短周期超买超卖 - 快进快出,持仓时间短 - 置信度上限较低,建议轻仓 Args: m5_data: 5分钟K线数据 m15_data: 15分钟K线数据 trend_direction: 大趋势方向(用于顺势加分) Returns: { 'action': 'buy' | 'sell' | 'hold', 'confidence': 0-100, 'reasons': [...], 'signal_type': 'short_term' } """ if len(m5_data) < 5 or len(m15_data) < 5: return {'action': 'hold', 'confidence': 0, 'reasons': [], 'signal_type': 'short_term'} m5_latest = m5_data.iloc[-1] m5_prev = m5_data.iloc[-2] m15_latest = m15_data.iloc[-1] m15_prev = m15_data.iloc[-2] buy_score = 0 sell_score = 0 buy_reasons = [] sell_reasons = [] # === 15M RSI 超卖/超买 === m15_rsi = m15_latest.get('rsi', 50) if pd.notna(m15_rsi): if m15_rsi < 25: buy_reasons.append(f"15M RSI极度超卖({m15_rsi:.1f})") buy_score += 3 elif m15_rsi < 35: buy_reasons.append(f"15M RSI超卖({m15_rsi:.1f})") buy_score += 2 elif m15_rsi > 75: sell_reasons.append(f"15M RSI极度超买({m15_rsi:.1f})") sell_score += 3 elif m15_rsi > 65: sell_reasons.append(f"15M RSI超买({m15_rsi:.1f})") sell_score += 2 # === 5M RSI 反转确认 === m5_rsi = m5_latest.get('rsi', 50) m5_prev_rsi = m5_prev.get('rsi', 50) if pd.notna(m5_rsi) and pd.notna(m5_prev_rsi): # 超卖后回升 if m5_rsi < 40 and m5_rsi > m5_prev_rsi and m5_prev_rsi < 35: buy_reasons.append(f"5M RSI反转回升({m5_prev_rsi:.1f}→{m5_rsi:.1f})") buy_score += 2 # 超买后回落 if m5_rsi > 60 and m5_rsi < m5_prev_rsi and m5_prev_rsi > 65: sell_reasons.append(f"5M RSI反转回落({m5_prev_rsi:.1f}→{m5_rsi:.1f})") sell_score += 2 # === 布林带触轨 === if 'bb_lower' in m15_latest and pd.notna(m15_latest['bb_lower']): if m15_latest['close'] <= m15_latest['bb_lower']: buy_reasons.append("15M触及布林下轨") buy_score += 2 elif m15_latest['close'] >= m15_latest['bb_upper']: sell_reasons.append("15M触及布林上轨") sell_score += 2 # === KDJ 超卖/超买 === m15_k = m15_latest.get('k', 50) m15_d = m15_latest.get('d', 50) if pd.notna(m15_k) and pd.notna(m15_d): if m15_k < 20 and m15_d < 20: buy_reasons.append(f"15M KDJ超卖区(K={m15_k:.1f})") buy_score += 1.5 elif m15_k > 80 and m15_d > 80: sell_reasons.append(f"15M KDJ超买区(K={m15_k:.1f})") sell_score += 1.5 # === 5M K线反转形态 === if m5_latest['close'] > m5_latest['open'] and m5_prev['close'] < m5_prev['open']: # 阴转阳 if m5_rsi < 40: buy_reasons.append("5M阴转阳反转") buy_score += 1.5 elif m5_latest['close'] < m5_latest['open'] and m5_prev['close'] > m5_prev['open']: # 阳转阴 if m5_rsi > 60: sell_reasons.append("5M阳转阴反转") sell_score += 1.5 # === 5M MACD 金叉/死叉 === m5_macd = m5_latest.get('macd', 0) m5_macd_signal = m5_latest.get('macd_signal', 0) m5_prev_macd = m5_prev.get('macd', 0) m5_prev_macd_signal = m5_prev.get('macd_signal', 0) if pd.notna(m5_macd) and pd.notna(m5_prev_macd): if m5_prev_macd <= m5_prev_macd_signal and m5_macd > m5_macd_signal: buy_reasons.append("5M MACD金叉") buy_score += 1.5 elif m5_prev_macd >= m5_prev_macd_signal and m5_macd < m5_macd_signal: sell_reasons.append("5M MACD死叉") sell_score += 1.5 # === 顺势加分 === if trend_direction == 'bullish' and buy_score > 0: buy_score += 1 buy_reasons.append("顺大势做多") elif trend_direction == 'bearish' and sell_score > 0: sell_score += 1 sell_reasons.append("顺大势做空") # === 决策 === action = 'hold' confidence = 0 reasons = [] # 短线信号阈值较低,但置信度上限也低 if buy_score >= 4 and buy_score > sell_score: action = 'buy' confidence = min(35 + buy_score * 6, 65) # 短线最高65% reasons = buy_reasons + ["📈 短线超跌反弹"] elif sell_score >= 4 and sell_score > buy_score: action = 'sell' confidence = min(35 + sell_score * 6, 65) reasons = sell_reasons + ["📉 短线超涨回落"] return { 'action': action, 'confidence': confidence, 'reasons': reasons, 'signal_type': 'short_term', 'scores': {'buy': buy_score, 'sell': sell_score} } # ==================== 5M 精确入场 ==================== def _analyze_5m_entry(self, m5_data: pd.DataFrame, action: str) -> Dict[str, Any]: """ 使用 5M 数据寻找精确入场点 Args: m5_data: 5分钟K线数据 action: 'buy' 或 'sell' Returns: { 'entry_confirmed': bool, 'entry_reasons': [...], 'entry_weight': float } """ if len(m5_data) < 5: return {'entry_confirmed': False, 'entry_reasons': [], 'entry_weight': 0} latest = m5_data.iloc[-1] prev = m5_data.iloc[-2] reasons = [] weight = 0 # 获取指标 rsi = latest.get('rsi', 50) prev_rsi = prev.get('rsi', 50) macd = latest.get('macd', 0) macd_signal = latest.get('macd_signal', 0) prev_macd = prev.get('macd', 0) prev_macd_signal = prev.get('macd_signal', 0) if action == 'buy': # 5M RSI 从超卖回升 if pd.notna(rsi) and pd.notna(prev_rsi): if rsi < 40 and rsi > prev_rsi: reasons.append("5M RSI回升") weight += 1 if rsi < 30: reasons.append("5M RSI超卖") weight += 0.5 # 5M MACD 金叉 if pd.notna(macd) and pd.notna(prev_macd): if prev_macd <= prev_macd_signal and macd > macd_signal: reasons.append("5M MACD金叉") weight += 1.5 # 5M K线企稳(阳线) if latest['close'] > latest['open']: if prev['close'] < prev['open']: # 前一根是阴线 reasons.append("5M阳线反转") weight += 1 elif action == 'sell': # 5M RSI 从超买回落 if pd.notna(rsi) and pd.notna(prev_rsi): if rsi > 60 and rsi < prev_rsi: reasons.append("5M RSI回落") weight += 1 if rsi > 70: reasons.append("5M RSI超买") weight += 0.5 # 5M MACD 死叉 if pd.notna(macd) and pd.notna(prev_macd): if prev_macd >= prev_macd_signal and macd < macd_signal: reasons.append("5M MACD死叉") weight += 1.5 # 5M K线见顶(阴线) if latest['close'] < latest['open']: if prev['close'] > prev['open']: reasons.append("5M阴线反转") weight += 1 entry_confirmed = weight >= 2 return { 'entry_confirmed': entry_confirmed, 'entry_reasons': reasons, 'entry_weight': weight } def analyze_trend(self, h1_data: pd.DataFrame, h4_data: pd.DataFrame) -> Dict[str, Any]: """ 分析趋势方向和强度(波段交易优化版) Args: h1_data: 1小时K线数据(含技术指标) h4_data: 4小时K线数据(含技术指标) Returns: { 'direction': 'bullish' | 'bearish' | 'neutral', 'strength': 'strong' | 'moderate' | 'weak', 'phase': 'impulse' | 'correction' | 'reversal', 'h4_score': float, 'h1_score': float } """ if h1_data.empty or h4_data.empty: return { 'direction': 'neutral', 'strength': 'weak', 'phase': 'sideways', 'h4_score': 0, 'h1_score': 0 } # 获取最新数据 h1_latest = h1_data.iloc[-1] h4_latest = h4_data.iloc[-1] # 计算各周期的趋势得分 h4_score, h4_details = self._calculate_trend_score(h4_latest) h1_score, h1_details = self._calculate_trend_score(h1_latest) # 判断趋势方向(4H 为主) if h4_score > 0.3: direction = 'bullish' elif h4_score < -0.3: direction = 'bearish' else: direction = 'neutral' # 判断趋势强度 strength = self._assess_trend_strength(h4_data, h1_data) # 判断当前阶段(是主升/主跌还是回调) phase = self._detect_market_phase(h4_data, h1_data, direction) # 检查极端情况 h4_rsi = h4_latest.get('rsi', 50) extreme_warning = "" if pd.notna(h4_rsi): if h4_rsi < 20: extreme_warning = f" [RSI={h4_rsi:.1f} 极度超卖]" phase = 'oversold' elif h4_rsi > 80: extreme_warning = f" [RSI={h4_rsi:.1f} 极度超买]" phase = 'overbought' logger.info(f"趋势分析: 方向={direction}, 强度={strength}, 阶段={phase} | " f"4H={h4_score:.2f}{h4_details}, 1H={h1_score:.2f}{h1_details}{extreme_warning}") return { 'direction': direction, 'strength': strength, 'phase': phase, 'h4_score': h4_score, 'h1_score': h1_score } def _assess_trend_strength(self, h4_data: pd.DataFrame, h1_data: pd.DataFrame) -> str: """评估趋势强度""" if len(h4_data) < 5: return 'weak' h4_latest = h4_data.iloc[-1] # 检查 MACD 柱状图是否放大 macd_hist = h4_data['macd_hist'].tail(5) macd_expanding = False if len(macd_hist) >= 3: recent_abs = abs(macd_hist.iloc[-1]) prev_abs = abs(macd_hist.iloc[-3]) if recent_abs > prev_abs * 1.2: macd_expanding = True # 检查价格是否持续在 MA20 同侧 close_prices = h4_data['close'].tail(5) ma20_values = h4_data['ma20'].tail(5) consistent_side = True if pd.notna(ma20_values.iloc[-1]): above_ma = close_prices > ma20_values consistent_side = above_ma.all() or (~above_ma).all() # 检查 RSI 是否在趋势区间 rsi = h4_latest.get('rsi', 50) rsi_trending = 40 < rsi < 60 # 中性区间表示趋势不强 if macd_expanding and consistent_side and not rsi_trending: return 'strong' elif consistent_side: return 'moderate' else: return 'weak' def _detect_market_phase(self, h4_data: pd.DataFrame, h1_data: pd.DataFrame, direction: str) -> str: """检测市场阶段(主升/主跌 vs 回调)""" if len(h1_data) < 10: return 'unknown' h1_latest = h1_data.iloc[-1] h4_latest = h4_data.iloc[-1] # 获取 1H 的短期趋势 h1_ma5 = h1_latest.get('ma5', 0) h1_ma20 = h1_latest.get('ma20', 0) h1_close = h1_latest.get('close', 0) if not (pd.notna(h1_ma5) and pd.notna(h1_ma20)): return 'unknown' # 判断 1H 是否在回调 if direction == 'bullish': # 上涨趋势中,1H 价格回落到 MA20 附近 = 回调 if h1_close < h1_ma5 and h1_close > h1_ma20 * 0.98: return 'correction' # 回调中,可能是入场机会 elif h1_close > h1_ma5: return 'impulse' # 主升浪 elif direction == 'bearish': # 下跌趋势中,1H 价格反弹到 MA20 附近 = 反弹 if h1_close > h1_ma5 and h1_close < h1_ma20 * 1.02: return 'correction' # 反弹中,可能是做空机会 elif h1_close < h1_ma5: return 'impulse' # 主跌浪 return 'sideways' def _calculate_trend_score(self, data: pd.Series) -> tuple: """ 计算单周期趋势得分 Args: data: 包含技术指标的数据行 Returns: (得分, 详情字符串) """ score = 0.0 count = 0 details = [] # 价格与均线关系 if 'close' in data and 'ma20' in data and pd.notna(data['ma20']): if data['close'] > data['ma20']: score += 1 details.append("价格>MA20") else: score -= 1 details.append("价格 data['ma20']: score += 1 details.append("MA5>MA20") else: score -= 1 details.append("MA5 data['macd_signal']: score += 1 details.append("MACD多") else: score -= 1 details.append("MACD空") count += 1 # RSI if 'rsi' in data and pd.notna(data['rsi']): if data['rsi'] > 50: score += 0.5 else: score -= 0.5 count += 0.5 final_score = score / count if count > 0 else 0 detail_str = f"({','.join(details)})" if details else "" return final_score, detail_str def analyze_entry_signal(self, m5_data: pd.DataFrame, m15_data: pd.DataFrame, trend: Dict[str, Any], h1_data: pd.DataFrame = None) -> Dict[str, Any]: """ 分析 15M 进场信号(波段交易优化版 - 增强版) 新增功能: - 成交量确认 - K线形态识别 - 支撑阻力位判断 - 5M精确入场 Args: m5_data: 5分钟K线数据(用于精确入场) m15_data: 15分钟K线数据(主要入场周期) trend: 趋势分析结果 h1_data: 1小时K线数据(用于支撑阻力位计算,可选) Returns: { 'action': 'buy' | 'sell' | 'hold', 'confidence': 0-100, 'signal_grade': 'A' | 'B' | 'C' | 'D', 'reasons': [...], 'indicators': {...}, 'patterns': {...}, 'levels': {...}, 'volume_analysis': {...} } """ if m5_data.empty or m15_data.empty: return {'action': 'hold', 'confidence': 0, 'signal_grade': 'D', 'reasons': ['数据不足'], 'indicators': {}} # 兼容旧格式(如果 trend 是字符串) if isinstance(trend, str): trend_direction = trend trend_phase = 'unknown' trend_strength = 'moderate' else: trend_direction = trend.get('direction', 'neutral') trend_phase = trend.get('phase', 'unknown') trend_strength = trend.get('strength', 'moderate') m15_latest = m15_data.iloc[-1] current_price = float(m15_latest['close']) # 收集信号 buy_signals = [] sell_signals = [] signal_weights = {'buy': 0, 'sell': 0} # ==================== 1. 传统技术指标信号 ==================== # === RSI 信号 === if 'rsi' in m15_latest and pd.notna(m15_latest['rsi']): rsi = m15_latest['rsi'] if rsi < 30: buy_signals.append(f"RSI超卖({rsi:.1f})") signal_weights['buy'] += 2 elif rsi < 40 and len(m15_data) >= 2: prev_rsi = m15_data.iloc[-2].get('rsi', 50) if pd.notna(prev_rsi) and rsi > prev_rsi: buy_signals.append(f"RSI回升({prev_rsi:.1f}→{rsi:.1f})") signal_weights['buy'] += 1.5 elif rsi > 70: sell_signals.append(f"RSI超买({rsi:.1f})") signal_weights['sell'] += 2 elif rsi > 60 and len(m15_data) >= 2: prev_rsi = m15_data.iloc[-2].get('rsi', 50) if pd.notna(prev_rsi) and rsi < prev_rsi: sell_signals.append(f"RSI回落({prev_rsi:.1f}→{rsi:.1f})") signal_weights['sell'] += 1.5 # === MACD 信号 === if len(m15_data) >= 2: prev = m15_data.iloc[-2] if 'macd' in m15_latest and 'macd_signal' in m15_latest: if pd.notna(m15_latest['macd']) and pd.notna(prev['macd']): if prev['macd'] <= prev['macd_signal'] and m15_latest['macd'] > m15_latest['macd_signal']: buy_signals.append("MACD金叉") signal_weights['buy'] += 2 elif prev['macd'] >= prev['macd_signal'] and m15_latest['macd'] < m15_latest['macd_signal']: sell_signals.append("MACD死叉") signal_weights['sell'] += 2 elif abs(m15_latest['macd_hist']) < abs(prev['macd_hist']) * 0.7: if m15_latest['macd_hist'] > 0: sell_signals.append("MACD动能减弱") signal_weights['sell'] += 0.5 else: buy_signals.append("MACD动能减弱") signal_weights['buy'] += 0.5 # === 布林带信号 === if 'close' in m15_latest and 'bb_lower' in m15_latest and 'bb_upper' in m15_latest: if pd.notna(m15_latest['bb_lower']) and pd.notna(m15_latest['bb_upper']): bb_middle = m15_latest.get('bb_middle', (m15_latest['bb_upper'] + m15_latest['bb_lower']) / 2) if m15_latest['close'] < m15_latest['bb_lower']: buy_signals.append("触及布林下轨") signal_weights['buy'] += 1.5 elif m15_latest['close'] > m15_latest['bb_upper']: sell_signals.append("触及布林上轨") signal_weights['sell'] += 1.5 elif len(m15_data) >= 2: prev_close = m15_data.iloc[-2]['close'] if prev_close < bb_middle and m15_latest['close'] > bb_middle: buy_signals.append("突破布林中轨") signal_weights['buy'] += 1 elif prev_close > bb_middle and m15_latest['close'] < bb_middle: sell_signals.append("跌破布林中轨") signal_weights['sell'] += 1 # === KDJ 信号 === if 'k' in m15_latest and 'd' in m15_latest and len(m15_data) >= 2: prev = m15_data.iloc[-2] if pd.notna(m15_latest['k']) and pd.notna(prev['k']): if prev['k'] <= prev['d'] and m15_latest['k'] > m15_latest['d']: if m15_latest['k'] < 30: buy_signals.append("KDJ低位金叉") signal_weights['buy'] += 1.5 else: buy_signals.append("KDJ金叉") signal_weights['buy'] += 0.5 elif prev['k'] >= prev['d'] and m15_latest['k'] < m15_latest['d']: if m15_latest['k'] > 70: sell_signals.append("KDJ高位死叉") signal_weights['sell'] += 1.5 else: sell_signals.append("KDJ死叉") signal_weights['sell'] += 0.5 # ==================== 2. K线形态识别 ==================== patterns = self._detect_candlestick_patterns(m15_data) if patterns['bullish_patterns']: buy_signals.extend(patterns['bullish_patterns']) signal_weights['buy'] += patterns['pattern_weight'] if patterns['bearish_patterns']: sell_signals.extend(patterns['bearish_patterns']) signal_weights['sell'] += abs(patterns['pattern_weight']) # ==================== 3. 成交量分析 ==================== volume_analysis = self._analyze_volume(m15_data) if volume_analysis['volume_signal'] == 'high': if volume_analysis['volume_confirms']: # 放量确认 if signal_weights['buy'] > signal_weights['sell']: buy_signals.append(f"放量确认(量比{m15_latest.get('volume_ratio', 1):.1f})") signal_weights['buy'] += volume_analysis['volume_weight'] else: sell_signals.append(f"放量确认(量比{m15_latest.get('volume_ratio', 1):.1f})") signal_weights['sell'] += volume_analysis['volume_weight'] else: # 放量不确认,可能是假信号 if signal_weights['buy'] > signal_weights['sell']: buy_signals.append("放量但不确认(警惕)") signal_weights['buy'] += volume_analysis['volume_weight'] # 负权重 else: sell_signals.append("放量但不确认(警惕)") signal_weights['sell'] += volume_analysis['volume_weight'] # ==================== 4. 支撑阻力位分析 ==================== levels = {} if h1_data is not None and not h1_data.empty: levels = self._calculate_support_resistance(h1_data, current_price) if levels.get('at_support') and trend_direction == 'bullish': buy_signals.append(f"触及支撑位({levels['nearest_support']:.2f})") signal_weights['buy'] += 1.5 if levels.get('at_resistance') and trend_direction == 'bearish': sell_signals.append(f"触及阻力位({levels['nearest_resistance']:.2f})") signal_weights['sell'] += 1.5 # ==================== 5. 根据趋势和阶段决定动作(中长线信号)==================== action = 'hold' confidence = 0 reasons = [] signal_grade = 'D' signal_type = 'swing' # 默认波段信号 # 波段交易核心逻辑:在回调中寻找入场机会 if trend_direction == 'bullish': if trend_phase == 'correction' and signal_weights['buy'] >= 3: action = 'buy' confidence = min(40 + signal_weights['buy'] * 10, 95) reasons = buy_signals + [f"📊 波段信号: 上涨趋势回调({trend_strength})"] signal_grade = 'A' if confidence >= 80 else ('B' if confidence >= 60 else 'C') elif trend_phase == 'impulse' and signal_weights['buy'] >= 4: action = 'buy' confidence = min(30 + signal_weights['buy'] * 8, 80) reasons = buy_signals + ["📊 波段信号: 主升浪追多"] signal_grade = 'B' if confidence >= 60 else 'C' elif trend_phase == 'oversold' and signal_weights['buy'] >= 3: # 极度超卖时允许抄底,但降低置信度并提示风险 action = 'buy' confidence = min(30 + signal_weights['buy'] * 8, 70) # 最高70% reasons = buy_signals + ["📊 波段信号: 极度超卖抄底(高风险)", "建议轻仓试探"] signal_grade = 'C' # 最高C级 elif trend_phase == 'overbought': reasons = ['极度超买,不宜追多'] elif trend_direction == 'bearish': if trend_phase == 'correction' and signal_weights['sell'] >= 3: action = 'sell' confidence = min(40 + signal_weights['sell'] * 10, 95) reasons = sell_signals + [f"📊 波段信号: 下跌趋势反弹({trend_strength})"] signal_grade = 'A' if confidence >= 80 else ('B' if confidence >= 60 else 'C') elif trend_phase == 'impulse' and signal_weights['sell'] >= 4: action = 'sell' confidence = min(30 + signal_weights['sell'] * 8, 80) reasons = sell_signals + ["📊 波段信号: 主跌浪追空"] signal_grade = 'B' if confidence >= 60 else 'C' elif trend_phase == 'overbought' and signal_weights['sell'] >= 3: # 极度超买时允许做空,但降低置信度并提示风险 action = 'sell' confidence = min(30 + signal_weights['sell'] * 8, 70) # 最高70% reasons = sell_signals + ["📊 波段信号: 极度超买摸顶(高风险)", "建议轻仓试探"] signal_grade = 'C' # 最高C级 elif trend_phase == 'oversold': reasons = ['极度超卖,不宜追空'] # ==================== 5.5 短线信号检测(中长线无信号时)==================== # 如果中长线没有触发信号,检查短线超跌反弹/超涨回落机会 if action == 'hold': short_term = self._analyze_short_term_signal(m5_data, m15_data, trend_direction) if short_term['action'] != 'hold' and short_term['confidence'] >= 50: action = short_term['action'] confidence = short_term['confidence'] reasons = short_term['reasons'] signal_type = 'short_term' signal_grade = 'C' # 短线信号最高C级 # 如果还是没有信号 if action == 'hold' and trend_direction == 'neutral': reasons = ['趋势不明确,观望'] # ==================== 6. 5M 精确入场确认 ==================== if action != 'hold' and not m5_data.empty: entry_5m = self._analyze_5m_entry(m5_data, action) if entry_5m['entry_confirmed']: confidence = min(confidence + 10, 95) reasons.extend(entry_5m['entry_reasons']) if signal_grade == 'B': signal_grade = 'A' elif signal_grade == 'C': signal_grade = 'B' elif entry_5m['entry_weight'] < 1: # 5M 没有确认,降低置信度 confidence = max(confidence - 10, 0) reasons.append("5M未确认入场") if not reasons: reasons = ['信号不足,继续观望'] # 收集指标数据 indicators = {} for col in ['rsi', 'macd', 'macd_signal', 'macd_hist', 'k', 'd', 'j', 'close', 'ma20', 'volume_ratio']: if col in m15_latest and pd.notna(m15_latest[col]): indicators[col] = float(m15_latest[col]) # 记录详细日志(简化版,详细日志在 crypto_agent 中输出) if action != 'hold': type_text = "短线" if signal_type == 'short_term' else "波段" logger.debug(f"信号详情: [{type_text}] {action} {confidence}% {signal_grade} | 买权重={signal_weights['buy']:.1f} 卖权重={signal_weights['sell']:.1f}") return { 'action': action, 'confidence': confidence, 'signal_grade': signal_grade, 'signal_type': signal_type, # 'swing' 波段 | 'short_term' 短线 'reasons': reasons, 'indicators': indicators, 'trend_info': { 'direction': trend_direction, 'phase': trend_phase, 'strength': trend_strength }, 'patterns': patterns, 'volume_analysis': volume_analysis, 'levels': levels, 'signal_weights': signal_weights } async def llm_analyze(self, data: Dict[str, pd.DataFrame], signal: Dict[str, Any], symbol: str) -> Dict[str, Any]: """ 使用 LLM 进行深度分析 Args: data: 多周期K线数据 signal: 初步信号分析结果 symbol: 交易对 Returns: LLM 分析结果(结构化数据) """ try: # 构建分析提示 prompt = self._build_analysis_prompt(data, signal, symbol) # 调用 LLM response = llm_service.chat([ {"role": "system", "content": self.CRYPTO_ANALYST_PROMPT}, {"role": "user", "content": prompt} ]) if response: logger.info(f"LLM 分析完成: {symbol}") # 解析 JSON 响应 return self._parse_llm_response(response) else: return {"error": "LLM 分析暂时不可用", "raw": ""} except Exception as e: logger.error(f"LLM 分析失败: {e}") return {"error": str(e), "raw": ""} def _parse_llm_response(self, response: str) -> Dict[str, Any]: """ 解析 LLM 的 JSON 响应 Args: response: LLM 原始响应 Returns: 解析后的结构化数据 """ import json import re result = { "raw": response, "parsed": None, "summary": "" } try: # 尝试提取 JSON 块 json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) if json_match: json_str = json_match.group(1) else: # 尝试直接解析整个响应 json_str = response # 解析 JSON parsed = json.loads(json_str) result["parsed"] = parsed # 生成摘要 if parsed: recommendation = parsed.get("recommendation", {}) action = recommendation.get("action", "wait") confidence = recommendation.get("confidence", 0) reason = recommendation.get("reason", "") if action == "wait": result["summary"] = f"建议观望。{parsed.get('risk_warning', '')}" else: action_text = "做多" if action == "buy" else "做空" result["summary"] = f"建议{action_text},置信度{confidence}%。{reason}" except json.JSONDecodeError: # JSON 解析失败,提取关键信息 logger.warning("LLM 响应不是有效 JSON,尝试提取关键信息") result["summary"] = self._extract_summary_from_text(response) return result def _extract_summary_from_text(self, text: str) -> str: """从非 JSON 文本中提取摘要""" # 简单提取前 200 字符作为摘要 text = text.strip() if len(text) > 200: return text[:200] + "..." return text def _build_analysis_prompt(self, data: Dict[str, pd.DataFrame], signal: Dict[str, Any], symbol: str) -> str: """构建 LLM 分析提示 - 优化版""" parts = [f"# {symbol} 技术分析数据\n"] # 当前价格 current_price = float(data['5m'].iloc[-1]['close']) parts.append(f"**当前价格**: ${current_price:,.2f}\n") # 添加各周期指标摘要 for interval in ['4h', '1h', '15m']: df = data.get(interval) if df is None or df.empty: continue latest = df.iloc[-1] parts.append(f"\n## {interval.upper()} 周期指标") # 价格与均线关系 close = latest.get('close', 0) ma20 = latest.get('ma20', 0) ma50 = latest.get('ma50', 0) if pd.notna(ma20): position = "上方" if close > ma20 else "下方" parts.append(f"- 价格在 MA20 {position} (MA20={ma20:.2f})") if pd.notna(ma50): parts.append(f"- MA50: {ma50:.2f}") # RSI rsi = latest.get('rsi', 0) if pd.notna(rsi): rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性") parts.append(f"- RSI: {rsi:.1f} ({rsi_status})") # MACD macd = latest.get('macd', 0) macd_signal = latest.get('macd_signal', 0) if pd.notna(macd) and pd.notna(macd_signal): macd_status = "多头" if macd > macd_signal else "空头" parts.append(f"- MACD: {macd:.2f}, Signal: {macd_signal:.2f} ({macd_status})") # 布林带 bb_upper = latest.get('bb_upper', 0) bb_lower = latest.get('bb_lower', 0) if pd.notna(bb_upper) and pd.notna(bb_lower): parts.append(f"- 布林带: 上轨={bb_upper:.2f}, 下轨={bb_lower:.2f}") # 添加最近 5 根 15M K 线(让 LLM 看形态) parts.append("\n## 最近 5 根 15M K线") parts.append("| 时间 | 开盘 | 最高 | 最低 | 收盘 | 涨跌 |") parts.append("|------|------|------|------|------|------|") df_15m = data.get('15m') if df_15m is not None and len(df_15m) >= 5: for i in range(-5, 0): row = df_15m.iloc[i] change = ((row['close'] - row['open']) / row['open']) * 100 change_str = f"+{change:.2f}%" if change >= 0 else f"{change:.2f}%" time_str = row['open_time'].strftime('%H:%M') if pd.notna(row['open_time']) else 'N/A' parts.append(f"| {time_str} | {row['open']:.2f} | {row['high']:.2f} | {row['low']:.2f} | {row['close']:.2f} | {change_str} |") # 计算关键价位 parts.append("\n## 关键价位参考") df_1h = data.get('1h') if df_1h is not None and len(df_1h) >= 20: recent_high = df_1h['high'].tail(20).max() recent_low = df_1h['low'].tail(20).min() parts.append(f"- 近期高点: ${recent_high:,.2f}") parts.append(f"- 近期低点: ${recent_low:,.2f}") # 初步信号分析结果 parts.append(f"\n## 规则引擎初步判断") parts.append(f"- 趋势: {signal.get('trend', 'unknown')}") parts.append(f"- 信号: {signal.get('action', 'hold')}") parts.append(f"- 置信度: {signal.get('confidence', 0)}%") parts.append(f"- 触发原因: {', '.join(signal.get('reasons', []))}") parts.append("\n---") parts.append("请基于以上数据进行分析,严格按照 JSON 格式输出你的判断。") return "\n".join(parts) def calculate_stop_loss_take_profit(self, price: float, action: str, atr: float) -> Dict[str, float]: """ 计算止损止盈位置 Args: price: 当前价格 action: 'buy' 或 'sell' atr: ATR 值 Returns: {'stop_loss': float, 'take_profit': float} """ if action == 'buy': stop_loss = price - atr * 2 take_profit = price + atr * 3 elif action == 'sell': stop_loss = price + atr * 2 take_profit = price - atr * 3 else: stop_loss = 0 take_profit = 0 return { 'stop_loss': round(stop_loss, 2), 'take_profit': round(take_profit, 2) }