diff --git a/backend/app/crypto_agent/llm_signal_analyzer.py b/backend/app/crypto_agent/llm_signal_analyzer.py index ca0017e..9dea52b 100644 --- a/backend/app/crypto_agent/llm_signal_analyzer.py +++ b/backend/app/crypto_agent/llm_signal_analyzer.py @@ -211,10 +211,35 @@ class LLMSignalAnalyzer: parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") # 当前价格 + current_price = 0 if '5m' in data and not data['5m'].empty: current_price = float(data['5m'].iloc[-1]['close']) parts.append(f"**当前价格**: ${current_price:,.2f}\n") + # === 新增:关键价位分析 === + key_levels = self._calculate_key_levels(data) + if key_levels: + parts.append("\n## 关键价位") + parts.append(key_levels) + + # === 新增:多周期共振分析 === + resonance = self._analyze_multi_timeframe_resonance(data) + if resonance: + parts.append("\n## 多周期共振") + parts.append(resonance) + + # === 新增:市场结构分析 === + structure = self._analyze_market_structure(data) + if structure: + parts.append("\n## 市场结构") + parts.append(structure) + + # === 新增:波动率分析 === + volatility = self._analyze_volatility(data) + if volatility: + parts.append("\n## 波动率分析") + parts.append(volatility) + # 各周期数据 for interval in ['4h', '1h', '15m', '5m']: df = data.get(interval) @@ -223,9 +248,9 @@ class LLMSignalAnalyzer: parts.append(f"\n## {interval.upper()} 周期数据") - # 最新指标 + # 最新指标(传入 df 以分析趋势变化) latest = df.iloc[-1] - parts.append(self._format_indicators(latest)) + parts.append(self._format_indicators(latest, df)) # 最近 K 线数据 parts.append(self._format_recent_klines(df, interval)) @@ -240,8 +265,242 @@ class LLMSignalAnalyzer: return "\n".join(parts) - def _format_indicators(self, row: pd.Series) -> str: - """格式化指标数据""" + def _calculate_key_levels(self, data: Dict[str, pd.DataFrame]) -> str: + """计算关键支撑阻力位""" + lines = [] + + # 使用 4h 数据计算关键价位 + df = data.get('4h') + if df is None or len(df) < 20: + return "" + + current_price = float(df.iloc[-1]['close']) + + # 1. 前高前低(最近 20 根 K 线) + recent = df.iloc[-20:] + recent_high = float(recent['high'].max()) + recent_low = float(recent['low'].min()) + + # 2. 整数关口 + round_levels = [] + base = int(current_price / 1000) * 1000 + for offset in [-2000, -1000, 0, 1000, 2000]: + level = base + offset + if level > 0: + round_levels.append(level) + + # 3. 斐波那契回撤位(基于最近的高低点) + fib_levels = [] + price_range = recent_high - recent_low + if price_range > 0: + fib_ratios = [0, 0.236, 0.382, 0.5, 0.618, 0.786, 1] + for ratio in fib_ratios: + fib_price = recent_low + price_range * ratio + fib_levels.append((ratio, fib_price)) + + # 构建输出 + lines.append(f"- 近期高点: ${recent_high:,.2f}") + lines.append(f"- 近期低点: ${recent_low:,.2f}") + + # 找出最近的支撑和阻力 + supports = [] + resistances = [] + + # 从斐波那契位找支撑阻力 + for ratio, price in fib_levels: + if price < current_price * 0.995: # 低于当前价 0.5% 以上 + supports.append(price) + elif price > current_price * 1.005: # 高于当前价 0.5% 以上 + resistances.append(price) + + # 添加整数关口 + for level in round_levels: + if level < current_price * 0.995: + supports.append(level) + elif level > current_price * 1.005: + resistances.append(level) + + # 排序并取最近的 + supports = sorted(set(supports), reverse=True)[:3] + resistances = sorted(set(resistances))[:3] + + if supports: + support_str = ", ".join([f"${s:,.0f}" for s in supports]) + lines.append(f"- 支撑位: {support_str}") + if resistances: + resistance_str = ", ".join([f"${r:,.0f}" for r in resistances]) + lines.append(f"- 阻力位: {resistance_str}") + + # 当前价格位置 + if recent_high > recent_low: + position = (current_price - recent_low) / (recent_high - recent_low) * 100 + if position > 80: + pos_text = "接近高点,注意回调风险" + elif position < 20: + pos_text = "接近低点,关注反弹机会" + else: + pos_text = f"处于区间 {position:.0f}% 位置" + lines.append(f"- 价格位置: {pos_text}") + + return "\n".join(lines) + + def _analyze_multi_timeframe_resonance(self, data: Dict[str, pd.DataFrame]) -> str: + """分析多周期共振""" + trends = {} + + for interval in ['4h', '1h', '15m', '5m']: + df = data.get(interval) + if df is None or len(df) < 10: + continue + + # 判断趋势方向 + ma5 = df['ma5'].iloc[-1] if 'ma5' in df.columns else None + ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else None + close = df['close'].iloc[-1] + + if pd.notna(ma5) and pd.notna(ma20): + if close > ma5 > ma20: + trends[interval] = 'bullish' + elif close < ma5 < ma20: + trends[interval] = 'bearish' + else: + trends[interval] = 'neutral' + + if len(trends) < 2: + return "" + + lines = [] + + # 统计各方向数量 + bullish_count = sum(1 for t in trends.values() if t == 'bullish') + bearish_count = sum(1 for t in trends.values() if t == 'bearish') + total = len(trends) + + # 各周期趋势 + trend_map = {'bullish': '📈多', 'bearish': '📉空', 'neutral': '➡️震荡'} + trend_str = " | ".join([f"{k}: {trend_map.get(v, v)}" for k, v in trends.items()]) + lines.append(f"- 各周期趋势: {trend_str}") + + # 共振判断 + if bullish_count == total: + lines.append(f"- **强共振做多**: 所有周期均为多头排列") + elif bearish_count == total: + lines.append(f"- **强共振做空**: 所有周期均为空头排列") + elif bullish_count >= total * 0.75: + lines.append(f"- **偏多共振**: {bullish_count}/{total} 周期看多") + elif bearish_count >= total * 0.75: + lines.append(f"- **偏空共振**: {bearish_count}/{total} 周期看空") + else: + lines.append(f"- **无明显共振**: 多空分歧,建议观望") + + return "\n".join(lines) + + def _analyze_market_structure(self, data: Dict[str, pd.DataFrame]) -> str: + """分析市场结构(趋势、高低点)""" + df = data.get('1h') + if df is None or len(df) < 24: + return "" + + lines = [] + recent = df.iloc[-24:] # 最近 24 根 1h K 线 + + # 找出局部高低点 + highs = [] + lows = [] + + for i in range(2, len(recent) - 2): + # 局部高点:比前后两根都高 + if (recent.iloc[i]['high'] > recent.iloc[i-1]['high'] and + recent.iloc[i]['high'] > recent.iloc[i-2]['high'] and + recent.iloc[i]['high'] > recent.iloc[i+1]['high'] and + recent.iloc[i]['high'] > recent.iloc[i+2]['high']): + highs.append((i, float(recent.iloc[i]['high']))) + + # 局部低点:比前后两根都低 + if (recent.iloc[i]['low'] < recent.iloc[i-1]['low'] and + recent.iloc[i]['low'] < recent.iloc[i-2]['low'] and + recent.iloc[i]['low'] < recent.iloc[i+1]['low'] and + recent.iloc[i]['low'] < recent.iloc[i+2]['low']): + lows.append((i, float(recent.iloc[i]['low']))) + + # 判断趋势结构 + if len(highs) >= 2 and len(lows) >= 2: + # 检查高点是否越来越高 + higher_highs = all(highs[i][1] < highs[i+1][1] for i in range(len(highs)-1)) + lower_highs = all(highs[i][1] > highs[i+1][1] for i in range(len(highs)-1)) + + # 检查低点是否越来越高 + higher_lows = all(lows[i][1] < lows[i+1][1] for i in range(len(lows)-1)) + lower_lows = all(lows[i][1] > lows[i+1][1] for i in range(len(lows)-1)) + + if higher_highs and higher_lows: + lines.append("- **上升趋势**: 更高的高点(HH) + 更高的低点(HL)") + elif lower_highs and lower_lows: + lines.append("- **下降趋势**: 更低的高点(LH) + 更低的低点(LL)") + elif higher_lows and lower_highs: + lines.append("- **收敛三角形**: 高点下移 + 低点上移,即将突破") + elif lower_lows and higher_highs: + lines.append("- **扩散形态**: 波动加大,方向不明") + else: + lines.append("- **震荡结构**: 无明显趋势") + else: + lines.append("- **结构不明**: 高低点不足,难以判断") + + # 计算趋势强度 + if len(recent) >= 10: + price_change = (float(recent.iloc[-1]['close']) - float(recent.iloc[0]['close'])) / float(recent.iloc[0]['close']) * 100 + if abs(price_change) > 3: + direction = "上涨" if price_change > 0 else "下跌" + lines.append(f"- 24h 趋势: {direction} {abs(price_change):.1f}%") + + return "\n".join(lines) + + def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str: + """分析波动率变化""" + df = data.get('1h') + if df is None or len(df) < 24 or 'atr' not in df.columns: + return "" + + lines = [] + + # ATR 变化趋势 + recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根 + older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根 + + if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0: + return "" + + atr_change = (recent_atr - older_atr) / older_atr * 100 + + current_atr = float(df['atr'].iloc[-1]) + current_price = float(df['close'].iloc[-1]) + atr_percent = current_atr / current_price * 100 + + lines.append(f"- 当前 ATR: ${current_atr:.2f} ({atr_percent:.2f}%)") + + if atr_change > 20: + lines.append(f"- **波动率扩张**: ATR 上升 {atr_change:.0f}%,趋势可能启动") + elif atr_change < -20: + lines.append(f"- **波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将突破") + else: + lines.append(f"- 波动率稳定: ATR 变化 {atr_change:+.0f}%") + + # 布林带宽度 + if 'bb_upper' in df.columns and 'bb_lower' in df.columns: + bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100 + bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100 + + if bb_width < bb_width_prev * 0.8: + lines.append(f"- **布林带收口**: 宽度 {bb_width:.1f}%,变盘信号") + elif bb_width > bb_width_prev * 1.2: + lines.append(f"- **布林带开口**: 宽度 {bb_width:.1f}%,趋势延续") + + return "\n".join(lines) + + return "\n".join(parts) + + def _format_indicators(self, row: pd.Series, df: pd.DataFrame = None) -> str: + """格式化指标数据(含趋势变化分析)""" lines = [] # 价格 @@ -258,38 +517,71 @@ class LLMSignalAnalyzer: ma20 = row.get('ma20', 0) ma50 = row.get('ma50', 0) if pd.notna(ma20): + # 判断均线排列 + if pd.notna(ma5) and pd.notna(ma10): + if ma5 > ma10 > ma20: + ma_trend = "多头排列" + elif ma5 < ma10 < ma20: + ma_trend = "空头排列" + else: + ma_trend = "交织" + else: + ma_trend = "" ma_str = f"- 均线: MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}" if pd.notna(ma50): ma_str += f", MA50={ma50:.2f}" + if ma_trend: + ma_str += f" ({ma_trend})" lines.append(ma_str) - # RSI + # RSI(含趋势分析) rsi = row.get('rsi', 0) if pd.notna(rsi): rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性") - lines.append(f"- RSI: {rsi:.1f} ({rsi_status})") + rsi_trend = self._analyze_indicator_trend(df, 'rsi', 6) if df is not None else "" + rsi_line = f"- RSI: {rsi:.1f} ({rsi_status})" + if rsi_trend: + rsi_line += f" {rsi_trend}" + lines.append(rsi_line) - # MACD + # MACD(含趋势分析) macd = row.get('macd', 0) macd_signal = row.get('macd_signal', 0) macd_hist = row.get('macd_hist', 0) if pd.notna(macd): macd_status = "多头" if macd > macd_signal else "空头" - lines.append(f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})") + macd_trend = self._analyze_macd_trend(df) if df is not None else "" + macd_line = f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})" + if macd_trend: + macd_line += f" {macd_trend}" + lines.append(macd_line) - # KDJ + # KDJ(含金叉死叉检测) k = row.get('k', 0) d = row.get('d', 0) j = row.get('j', 0) if pd.notna(k): - lines.append(f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}") + kdj_signal = self._detect_kdj_cross(df) if df is not None else "" + kdj_line = f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}" + if kdj_signal: + kdj_line += f" {kdj_signal}" + lines.append(kdj_line) - # 布林带 + # 布林带(含位置分析) bb_upper = row.get('bb_upper', 0) bb_middle = row.get('bb_middle', 0) bb_lower = row.get('bb_lower', 0) if pd.notna(bb_upper): - lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f}") + # 判断价格在布林带中的位置 + if close >= bb_upper: + bb_pos = "触及上轨" + elif close <= bb_lower: + bb_pos = "触及下轨" + elif close > bb_middle: + bb_pos = "中轨上方" + else: + bb_pos = "中轨下方" + lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f} ({bb_pos})") # ATR atr = row.get('atr', 0) @@ -305,10 +597,110 @@ class LLMSignalAnalyzer: return "\n".join(lines) + def _analyze_indicator_trend(self, df: pd.DataFrame, indicator: str, lookback: int = 6) -> str: + """分析指标趋势变化""" + if df is None or len(df) < lookback: + return "" + + recent = df[indicator].iloc[-lookback:] + if recent.isna().any(): + return "" + + first_val = recent.iloc[0] + last_val = recent.iloc[-1] + change = last_val - first_val + + # RSI 特殊处理 + if indicator == 'rsi': + if first_val > 70 and last_val < 70: + return "[从超买回落]" + elif first_val < 30 and last_val > 30: + return "[从超卖反弹]" + elif change > 10: + return "[快速上升]" + elif change < -10: + return "[快速下降]" + + return "" + + def _analyze_macd_trend(self, df: pd.DataFrame, lookback: int = 6) -> str: + """分析 MACD 趋势""" + if df is None or len(df) < lookback: + return "" + + recent_hist = df['macd_hist'].iloc[-lookback:] + recent_macd = df['macd'].iloc[-lookback:] + recent_signal = df['macd_signal'].iloc[-lookback:] + + if recent_hist.isna().any(): + return "" + + # 检测金叉死叉 + for i in range(-3, 0): + if i - 1 >= -len(recent_macd): + prev_diff = recent_macd.iloc[i-1] - recent_signal.iloc[i-1] + curr_diff = recent_macd.iloc[i] - recent_signal.iloc[i] + if prev_diff < 0 and curr_diff > 0: + return "[刚刚金叉]" + elif prev_diff > 0 and curr_diff < 0: + return "[刚刚死叉]" + + # 检测柱状图趋势 + positive_count = sum(1 for x in recent_hist if x > 0) + hist_trend = recent_hist.iloc[-1] - recent_hist.iloc[-3] if len(recent_hist) >= 3 else 0 + + if positive_count == lookback and hist_trend > 0: + return "[红柱持续放大]" + elif positive_count == lookback and hist_trend < 0: + return "[红柱开始缩小]" + elif positive_count == 0 and hist_trend < 0: + return "[绿柱持续放大]" + elif positive_count == 0 and hist_trend > 0: + return "[绿柱开始缩小]" + + return "" + + def _detect_kdj_cross(self, df: pd.DataFrame, lookback: int = 3) -> str: + """检测 KDJ 金叉死叉""" + if df is None or len(df) < lookback: + return "" + + recent_k = df['k'].iloc[-lookback:] + recent_d = df['d'].iloc[-lookback:] + + if recent_k.isna().any() or recent_d.isna().any(): + return "" + + # 检测最近的交叉 + for i in range(-lookback + 1, 0): + prev_diff = recent_k.iloc[i-1] - recent_d.iloc[i-1] + curr_diff = recent_k.iloc[i] - recent_d.iloc[i] + + if prev_diff < 0 and curr_diff > 0: + # 金叉位置判断 + k_val = recent_k.iloc[i] + if k_val < 20: + return "[低位金叉,强买入信号]" + elif k_val < 50: + return "[中位金叉]" + else: + return "[高位金叉,谨慎]" + elif prev_diff > 0 and curr_diff < 0: + k_val = recent_k.iloc[i] + if k_val > 80: + return "[高位死叉,强卖出信号]" + elif k_val > 50: + return "[中位死叉]" + else: + return "[低位死叉,谨慎]" + + return "" + def _format_recent_klines(self, df: pd.DataFrame, interval: str) -> str: """格式化最近 K 线(含量价分析)""" # 根据周期决定显示数量 - count = {'4h': 6, '1h': 12, '15m': 8, '5m': 6}.get(interval, 6) + # 4h: 12根=2天, 1h: 24根=1天, 15m: 16根=4小时, 5m: 12根=1小时 + count = {'4h': 12, '1h': 24, '15m': 16, '5m': 12}.get(interval, 12) if len(df) < count: count = len(df)