This commit is contained in:
aaron 2026-02-09 22:45:30 +08:00
parent eeecc51379
commit 935877b1d1

View File

@ -211,10 +211,35 @@ class LLMSignalAnalyzer:
parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# 当前价格 # 当前价格
current_price = 0
if '5m' in data and not data['5m'].empty: if '5m' in data and not data['5m'].empty:
current_price = float(data['5m'].iloc[-1]['close']) current_price = float(data['5m'].iloc[-1]['close'])
parts.append(f"**当前价格**: ${current_price:,.2f}\n") parts.append(f"**当前价格**: ${current_price:,.2f}\n")
# === 新增:关键价位分析 ===
key_levels = self._calculate_key_levels(data)
if key_levels:
parts.append("\n## 关键价位")
parts.append(key_levels)
# === 新增:多周期共振分析 ===
resonance = self._analyze_multi_timeframe_resonance(data)
if resonance:
parts.append("\n## 多周期共振")
parts.append(resonance)
# === 新增:市场结构分析 ===
structure = self._analyze_market_structure(data)
if structure:
parts.append("\n## 市场结构")
parts.append(structure)
# === 新增:波动率分析 ===
volatility = self._analyze_volatility(data)
if volatility:
parts.append("\n## 波动率分析")
parts.append(volatility)
# 各周期数据 # 各周期数据
for interval in ['4h', '1h', '15m', '5m']: for interval in ['4h', '1h', '15m', '5m']:
df = data.get(interval) df = data.get(interval)
@ -223,9 +248,9 @@ class LLMSignalAnalyzer:
parts.append(f"\n## {interval.upper()} 周期数据") parts.append(f"\n## {interval.upper()} 周期数据")
# 最新指标 # 最新指标(传入 df 以分析趋势变化)
latest = df.iloc[-1] latest = df.iloc[-1]
parts.append(self._format_indicators(latest)) parts.append(self._format_indicators(latest, df))
# 最近 K 线数据 # 最近 K 线数据
parts.append(self._format_recent_klines(df, interval)) parts.append(self._format_recent_klines(df, interval))
@ -240,8 +265,242 @@ class LLMSignalAnalyzer:
return "\n".join(parts) return "\n".join(parts)
def _format_indicators(self, row: pd.Series) -> str: def _calculate_key_levels(self, data: Dict[str, pd.DataFrame]) -> str:
"""格式化指标数据""" """计算关键支撑阻力位"""
lines = []
# 使用 4h 数据计算关键价位
df = data.get('4h')
if df is None or len(df) < 20:
return ""
current_price = float(df.iloc[-1]['close'])
# 1. 前高前低(最近 20 根 K 线)
recent = df.iloc[-20:]
recent_high = float(recent['high'].max())
recent_low = float(recent['low'].min())
# 2. 整数关口
round_levels = []
base = int(current_price / 1000) * 1000
for offset in [-2000, -1000, 0, 1000, 2000]:
level = base + offset
if level > 0:
round_levels.append(level)
# 3. 斐波那契回撤位(基于最近的高低点)
fib_levels = []
price_range = recent_high - recent_low
if price_range > 0:
fib_ratios = [0, 0.236, 0.382, 0.5, 0.618, 0.786, 1]
for ratio in fib_ratios:
fib_price = recent_low + price_range * ratio
fib_levels.append((ratio, fib_price))
# 构建输出
lines.append(f"- 近期高点: ${recent_high:,.2f}")
lines.append(f"- 近期低点: ${recent_low:,.2f}")
# 找出最近的支撑和阻力
supports = []
resistances = []
# 从斐波那契位找支撑阻力
for ratio, price in fib_levels:
if price < current_price * 0.995: # 低于当前价 0.5% 以上
supports.append(price)
elif price > current_price * 1.005: # 高于当前价 0.5% 以上
resistances.append(price)
# 添加整数关口
for level in round_levels:
if level < current_price * 0.995:
supports.append(level)
elif level > current_price * 1.005:
resistances.append(level)
# 排序并取最近的
supports = sorted(set(supports), reverse=True)[:3]
resistances = sorted(set(resistances))[:3]
if supports:
support_str = ", ".join([f"${s:,.0f}" for s in supports])
lines.append(f"- 支撑位: {support_str}")
if resistances:
resistance_str = ", ".join([f"${r:,.0f}" for r in resistances])
lines.append(f"- 阻力位: {resistance_str}")
# 当前价格位置
if recent_high > recent_low:
position = (current_price - recent_low) / (recent_high - recent_low) * 100
if position > 80:
pos_text = "接近高点,注意回调风险"
elif position < 20:
pos_text = "接近低点,关注反弹机会"
else:
pos_text = f"处于区间 {position:.0f}% 位置"
lines.append(f"- 价格位置: {pos_text}")
return "\n".join(lines)
def _analyze_multi_timeframe_resonance(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析多周期共振"""
trends = {}
for interval in ['4h', '1h', '15m', '5m']:
df = data.get(interval)
if df is None or len(df) < 10:
continue
# 判断趋势方向
ma5 = df['ma5'].iloc[-1] if 'ma5' in df.columns else None
ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else None
close = df['close'].iloc[-1]
if pd.notna(ma5) and pd.notna(ma20):
if close > ma5 > ma20:
trends[interval] = 'bullish'
elif close < ma5 < ma20:
trends[interval] = 'bearish'
else:
trends[interval] = 'neutral'
if len(trends) < 2:
return ""
lines = []
# 统计各方向数量
bullish_count = sum(1 for t in trends.values() if t == 'bullish')
bearish_count = sum(1 for t in trends.values() if t == 'bearish')
total = len(trends)
# 各周期趋势
trend_map = {'bullish': '📈多', 'bearish': '📉空', 'neutral': '➡️震荡'}
trend_str = " | ".join([f"{k}: {trend_map.get(v, v)}" for k, v in trends.items()])
lines.append(f"- 各周期趋势: {trend_str}")
# 共振判断
if bullish_count == total:
lines.append(f"- **强共振做多**: 所有周期均为多头排列")
elif bearish_count == total:
lines.append(f"- **强共振做空**: 所有周期均为空头排列")
elif bullish_count >= total * 0.75:
lines.append(f"- **偏多共振**: {bullish_count}/{total} 周期看多")
elif bearish_count >= total * 0.75:
lines.append(f"- **偏空共振**: {bearish_count}/{total} 周期看空")
else:
lines.append(f"- **无明显共振**: 多空分歧,建议观望")
return "\n".join(lines)
def _analyze_market_structure(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析市场结构(趋势、高低点)"""
df = data.get('1h')
if df is None or len(df) < 24:
return ""
lines = []
recent = df.iloc[-24:] # 最近 24 根 1h K 线
# 找出局部高低点
highs = []
lows = []
for i in range(2, len(recent) - 2):
# 局部高点:比前后两根都高
if (recent.iloc[i]['high'] > recent.iloc[i-1]['high'] and
recent.iloc[i]['high'] > recent.iloc[i-2]['high'] and
recent.iloc[i]['high'] > recent.iloc[i+1]['high'] and
recent.iloc[i]['high'] > recent.iloc[i+2]['high']):
highs.append((i, float(recent.iloc[i]['high'])))
# 局部低点:比前后两根都低
if (recent.iloc[i]['low'] < recent.iloc[i-1]['low'] and
recent.iloc[i]['low'] < recent.iloc[i-2]['low'] and
recent.iloc[i]['low'] < recent.iloc[i+1]['low'] and
recent.iloc[i]['low'] < recent.iloc[i+2]['low']):
lows.append((i, float(recent.iloc[i]['low'])))
# 判断趋势结构
if len(highs) >= 2 and len(lows) >= 2:
# 检查高点是否越来越高
higher_highs = all(highs[i][1] < highs[i+1][1] for i in range(len(highs)-1))
lower_highs = all(highs[i][1] > highs[i+1][1] for i in range(len(highs)-1))
# 检查低点是否越来越高
higher_lows = all(lows[i][1] < lows[i+1][1] for i in range(len(lows)-1))
lower_lows = all(lows[i][1] > lows[i+1][1] for i in range(len(lows)-1))
if higher_highs and higher_lows:
lines.append("- **上升趋势**: 更高的高点(HH) + 更高的低点(HL)")
elif lower_highs and lower_lows:
lines.append("- **下降趋势**: 更低的高点(LH) + 更低的低点(LL)")
elif higher_lows and lower_highs:
lines.append("- **收敛三角形**: 高点下移 + 低点上移,即将突破")
elif lower_lows and higher_highs:
lines.append("- **扩散形态**: 波动加大,方向不明")
else:
lines.append("- **震荡结构**: 无明显趋势")
else:
lines.append("- **结构不明**: 高低点不足,难以判断")
# 计算趋势强度
if len(recent) >= 10:
price_change = (float(recent.iloc[-1]['close']) - float(recent.iloc[0]['close'])) / float(recent.iloc[0]['close']) * 100
if abs(price_change) > 3:
direction = "上涨" if price_change > 0 else "下跌"
lines.append(f"- 24h 趋势: {direction} {abs(price_change):.1f}%")
return "\n".join(lines)
def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析波动率变化"""
df = data.get('1h')
if df is None or len(df) < 24 or 'atr' not in df.columns:
return ""
lines = []
# ATR 变化趋势
recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根
older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根
if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0:
return ""
atr_change = (recent_atr - older_atr) / older_atr * 100
current_atr = float(df['atr'].iloc[-1])
current_price = float(df['close'].iloc[-1])
atr_percent = current_atr / current_price * 100
lines.append(f"- 当前 ATR: ${current_atr:.2f} ({atr_percent:.2f}%)")
if atr_change > 20:
lines.append(f"- **波动率扩张**: ATR 上升 {atr_change:.0f}%,趋势可能启动")
elif atr_change < -20:
lines.append(f"- **波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将突破")
else:
lines.append(f"- 波动率稳定: ATR 变化 {atr_change:+.0f}%")
# 布林带宽度
if 'bb_upper' in df.columns and 'bb_lower' in df.columns:
bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100
bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100
if bb_width < bb_width_prev * 0.8:
lines.append(f"- **布林带收口**: 宽度 {bb_width:.1f}%,变盘信号")
elif bb_width > bb_width_prev * 1.2:
lines.append(f"- **布林带开口**: 宽度 {bb_width:.1f}%,趋势延续")
return "\n".join(lines)
return "\n".join(parts)
def _format_indicators(self, row: pd.Series, df: pd.DataFrame = None) -> str:
"""格式化指标数据(含趋势变化分析)"""
lines = [] lines = []
# 价格 # 价格
@ -258,38 +517,71 @@ class LLMSignalAnalyzer:
ma20 = row.get('ma20', 0) ma20 = row.get('ma20', 0)
ma50 = row.get('ma50', 0) ma50 = row.get('ma50', 0)
if pd.notna(ma20): if pd.notna(ma20):
# 判断均线排列
if pd.notna(ma5) and pd.notna(ma10):
if ma5 > ma10 > ma20:
ma_trend = "多头排列"
elif ma5 < ma10 < ma20:
ma_trend = "空头排列"
else:
ma_trend = "交织"
else:
ma_trend = ""
ma_str = f"- 均线: MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}" ma_str = f"- 均线: MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}"
if pd.notna(ma50): if pd.notna(ma50):
ma_str += f", MA50={ma50:.2f}" ma_str += f", MA50={ma50:.2f}"
if ma_trend:
ma_str += f" ({ma_trend})"
lines.append(ma_str) lines.append(ma_str)
# RSI # RSI(含趋势分析)
rsi = row.get('rsi', 0) rsi = row.get('rsi', 0)
if pd.notna(rsi): if pd.notna(rsi):
rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性") rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性")
lines.append(f"- RSI: {rsi:.1f} ({rsi_status})") rsi_trend = self._analyze_indicator_trend(df, 'rsi', 6) if df is not None else ""
rsi_line = f"- RSI: {rsi:.1f} ({rsi_status})"
if rsi_trend:
rsi_line += f" {rsi_trend}"
lines.append(rsi_line)
# MACD # MACD(含趋势分析)
macd = row.get('macd', 0) macd = row.get('macd', 0)
macd_signal = row.get('macd_signal', 0) macd_signal = row.get('macd_signal', 0)
macd_hist = row.get('macd_hist', 0) macd_hist = row.get('macd_hist', 0)
if pd.notna(macd): if pd.notna(macd):
macd_status = "多头" if macd > macd_signal else "空头" macd_status = "多头" if macd > macd_signal else "空头"
lines.append(f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})") macd_trend = self._analyze_macd_trend(df) if df is not None else ""
macd_line = f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})"
if macd_trend:
macd_line += f" {macd_trend}"
lines.append(macd_line)
# KDJ # KDJ(含金叉死叉检测)
k = row.get('k', 0) k = row.get('k', 0)
d = row.get('d', 0) d = row.get('d', 0)
j = row.get('j', 0) j = row.get('j', 0)
if pd.notna(k): if pd.notna(k):
lines.append(f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}") kdj_signal = self._detect_kdj_cross(df) if df is not None else ""
kdj_line = f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}"
if kdj_signal:
kdj_line += f" {kdj_signal}"
lines.append(kdj_line)
# 布林带 # 布林带(含位置分析)
bb_upper = row.get('bb_upper', 0) bb_upper = row.get('bb_upper', 0)
bb_middle = row.get('bb_middle', 0) bb_middle = row.get('bb_middle', 0)
bb_lower = row.get('bb_lower', 0) bb_lower = row.get('bb_lower', 0)
if pd.notna(bb_upper): if pd.notna(bb_upper):
lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f}") # 判断价格在布林带中的位置
if close >= bb_upper:
bb_pos = "触及上轨"
elif close <= bb_lower:
bb_pos = "触及下轨"
elif close > bb_middle:
bb_pos = "中轨上方"
else:
bb_pos = "中轨下方"
lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f} ({bb_pos})")
# ATR # ATR
atr = row.get('atr', 0) atr = row.get('atr', 0)
@ -305,10 +597,110 @@ class LLMSignalAnalyzer:
return "\n".join(lines) return "\n".join(lines)
def _analyze_indicator_trend(self, df: pd.DataFrame, indicator: str, lookback: int = 6) -> str:
"""分析指标趋势变化"""
if df is None or len(df) < lookback:
return ""
recent = df[indicator].iloc[-lookback:]
if recent.isna().any():
return ""
first_val = recent.iloc[0]
last_val = recent.iloc[-1]
change = last_val - first_val
# RSI 特殊处理
if indicator == 'rsi':
if first_val > 70 and last_val < 70:
return "[从超买回落]"
elif first_val < 30 and last_val > 30:
return "[从超卖反弹]"
elif change > 10:
return "[快速上升]"
elif change < -10:
return "[快速下降]"
return ""
def _analyze_macd_trend(self, df: pd.DataFrame, lookback: int = 6) -> str:
"""分析 MACD 趋势"""
if df is None or len(df) < lookback:
return ""
recent_hist = df['macd_hist'].iloc[-lookback:]
recent_macd = df['macd'].iloc[-lookback:]
recent_signal = df['macd_signal'].iloc[-lookback:]
if recent_hist.isna().any():
return ""
# 检测金叉死叉
for i in range(-3, 0):
if i - 1 >= -len(recent_macd):
prev_diff = recent_macd.iloc[i-1] - recent_signal.iloc[i-1]
curr_diff = recent_macd.iloc[i] - recent_signal.iloc[i]
if prev_diff < 0 and curr_diff > 0:
return "[刚刚金叉]"
elif prev_diff > 0 and curr_diff < 0:
return "[刚刚死叉]"
# 检测柱状图趋势
positive_count = sum(1 for x in recent_hist if x > 0)
hist_trend = recent_hist.iloc[-1] - recent_hist.iloc[-3] if len(recent_hist) >= 3 else 0
if positive_count == lookback and hist_trend > 0:
return "[红柱持续放大]"
elif positive_count == lookback and hist_trend < 0:
return "[红柱开始缩小]"
elif positive_count == 0 and hist_trend < 0:
return "[绿柱持续放大]"
elif positive_count == 0 and hist_trend > 0:
return "[绿柱开始缩小]"
return ""
def _detect_kdj_cross(self, df: pd.DataFrame, lookback: int = 3) -> str:
"""检测 KDJ 金叉死叉"""
if df is None or len(df) < lookback:
return ""
recent_k = df['k'].iloc[-lookback:]
recent_d = df['d'].iloc[-lookback:]
if recent_k.isna().any() or recent_d.isna().any():
return ""
# 检测最近的交叉
for i in range(-lookback + 1, 0):
prev_diff = recent_k.iloc[i-1] - recent_d.iloc[i-1]
curr_diff = recent_k.iloc[i] - recent_d.iloc[i]
if prev_diff < 0 and curr_diff > 0:
# 金叉位置判断
k_val = recent_k.iloc[i]
if k_val < 20:
return "[低位金叉,强买入信号]"
elif k_val < 50:
return "[中位金叉]"
else:
return "[高位金叉,谨慎]"
elif prev_diff > 0 and curr_diff < 0:
k_val = recent_k.iloc[i]
if k_val > 80:
return "[高位死叉,强卖出信号]"
elif k_val > 50:
return "[中位死叉]"
else:
return "[低位死叉,谨慎]"
return ""
def _format_recent_klines(self, df: pd.DataFrame, interval: str) -> str: def _format_recent_klines(self, df: pd.DataFrame, interval: str) -> str:
"""格式化最近 K 线(含量价分析)""" """格式化最近 K 线(含量价分析)"""
# 根据周期决定显示数量 # 根据周期决定显示数量
count = {'4h': 6, '1h': 12, '15m': 8, '5m': 6}.get(interval, 6) # 4h: 12根=2天, 1h: 24根=1天, 15m: 16根=4小时, 5m: 12根=1小时
count = {'4h': 12, '1h': 24, '15m': 16, '5m': 12}.get(interval, 12)
if len(df) < count: if len(df) < count:
count = len(df) count = len(df)