stock-ai-agent/backend/app/crypto_agent/signal_analyzer.py
2026-02-06 00:34:08 +08:00

1108 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
信号分析器 - 多周期技术分析和 LLM 深度分析
"""
import pandas as pd
from typing import Dict, Any, Optional, List
from app.utils.logger import logger
from app.services.llm_service import llm_service
class SignalAnalyzer:
"""交易信号分析器 - 波段交易优化版"""
# LLM 系统提示词 - 波段交易版
CRYPTO_ANALYST_PROMPT = """你是一位经验丰富的加密货币波段交易员,专注于捕捉 1-7 天的中等波段行情。
## 交易风格
- **波段交易**:持仓 1-7 天,不做超短线
- **顺势回调**:在趋势中寻找回调入场机会
- **风险控制**:单笔亏损不超过本金 2%
## 多周期分析框架
1. **4H 周期**:判断主趋势方向和强度
- 趋势明确:价格在 MA20 同侧运行 3 根以上 K 线
- 趋势强度:看 MACD 柱状图是否放大
2. **1H 周期**:确认趋势 + 寻找回调位置
- 上涨趋势中:等待回调到 MA20 或前低支撑
- 下跌趋势中:等待反弹到 MA20 或前高阻力
3. **15M 周期**:入场信号确认
- 做多RSI 从超卖回升 + MACD 金叉 + K 线企稳
- 做空RSI 从超买回落 + MACD 死叉 + K 线见顶
## 入场条件(波段做多)
1. 4H 趋势向上(价格 > MA20MACD > 0 或底背离)
2. 1H 回调到支撑位MA20 附近或前低)
3. 15M 出现止跌信号RSI < 40 回升,或 MACD 金叉)
4. 止损明确(前低下方),风险收益比 >= 1:2
## 入场条件(波段做空)
1. 4H 趋势向下(价格 < MA20MACD < 0 或顶背离)
2. 1H 反弹到阻力位MA20 附近或前高)
3. 15M 出现见顶信号RSI > 60 回落,或 MACD 死叉)
4. 止损明确(前高上方),风险收益比 >= 1:2
## 特殊情况处理
- **极度超卖RSI < 20**:不追空,等待反弹做多机会
- **极度超买RSI > 80**:不追多,等待回调做空机会
- **震荡市**:观望,等待突破方向
## 输出格式JSON
```json
{
"market_structure": {
"trend": "uptrend/downtrend/sideways",
"strength": "strong/moderate/weak",
"phase": "impulse/correction/reversal"
},
"key_levels": {
"resistance": [阻力位1, 阻力位2],
"support": [支撑位1, 支撑位2]
},
"signal": {
"quality": "A/B/C/D",
"action": "buy/sell/wait",
"confidence": 0-100,
"entry_zone": [入场区间下限, 入场区间上限],
"stop_loss": 止损价,
"targets": [目标1, 目标2],
"reason": "入场理由"
},
"risk_warning": "风险提示"
}
```
信号质量说明:
- A级趋势明确 + 回调到位 + 多重信号共振(置信度 80+
- B级趋势明确 + 信号较好(置信度 60-80
- C级有机会但需要更多确认置信度 40-60
- D级不建议交易置信度 < 40
重要:波段交易要有耐心,宁可错过也不要在不理想的位置入场。"""
def __init__(self):
"""初始化信号分析器"""
logger.info("信号分析器初始化完成")
# ==================== K线形态识别 ====================
def _detect_candlestick_patterns(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
识别 K 线形态
Args:
df: K线数据至少需要3根K线
Returns:
{
'bullish_patterns': [...], # 看涨形态
'bearish_patterns': [...], # 看跌形态
'pattern_weight': float # 形态权重
}
"""
if len(df) < 3:
return {'bullish_patterns': [], 'bearish_patterns': [], 'pattern_weight': 0}
bullish = []
bearish = []
weight = 0
# 获取最近3根K线
curr = df.iloc[-1]
prev = df.iloc[-2]
prev2 = df.iloc[-3]
curr_body = curr['close'] - curr['open']
curr_body_abs = abs(curr_body)
curr_range = curr['high'] - curr['low']
prev_body = prev['close'] - prev['open']
prev_body_abs = abs(prev_body)
# 避免除零
if curr_range == 0:
curr_range = 0.0001
# === 锤子线 / 倒锤子线 ===
upper_shadow = curr['high'] - max(curr['open'], curr['close'])
lower_shadow = min(curr['open'], curr['close']) - curr['low']
# 锤子线:下影线长,实体小,出现在下跌后
if lower_shadow > curr_body_abs * 2 and upper_shadow < curr_body_abs * 0.5:
if prev_body < 0: # 前一根是阴线
bullish.append("锤子线")
weight += 1.5
# 倒锤子线:上影线长,实体小,出现在下跌后
if upper_shadow > curr_body_abs * 2 and lower_shadow < curr_body_abs * 0.5:
if prev_body < 0:
bullish.append("倒锤子线")
weight += 1
# 上吊线:锤子线形态但出现在上涨后
if lower_shadow > curr_body_abs * 2 and upper_shadow < curr_body_abs * 0.5:
if prev_body > 0:
bearish.append("上吊线")
weight -= 1.5
# === 吞没形态 ===
# 看涨吞没:阳线实体完全包住前一根阴线
if curr_body > 0 and prev_body < 0:
if curr['open'] <= prev['close'] and curr['close'] >= prev['open']:
if curr_body_abs > prev_body_abs * 1.2:
bullish.append("看涨吞没")
weight += 2
# 看跌吞没:阴线实体完全包住前一根阳线
if curr_body < 0 and prev_body > 0:
if curr['open'] >= prev['close'] and curr['close'] <= prev['open']:
if curr_body_abs > prev_body_abs * 1.2:
bearish.append("看跌吞没")
weight -= 2
# === 十字星 ===
if curr_body_abs < curr_range * 0.1: # 实体很小
if upper_shadow > curr_range * 0.3 and lower_shadow > curr_range * 0.3:
# 十字星本身是中性的需要结合前一根K线判断
if prev_body > 0:
bearish.append("十字星(上涨后)")
weight -= 1
elif prev_body < 0:
bullish.append("十字星(下跌后)")
weight += 1
# === 早晨之星 / 黄昏之星 (3根K线形态) ===
prev2_body = prev2['close'] - prev2['open']
prev_range = prev['high'] - prev['low'] if prev['high'] != prev['low'] else 0.0001
# 早晨之星:大阴线 + 小实体(星) + 大阳线
if prev2_body < 0 and abs(prev2_body) > prev_range * 0.5: # 第一根大阴线
if abs(prev_body) < prev_range * 0.3: # 第二根小实体
if curr_body > 0 and curr_body_abs > curr_range * 0.5: # 第三根大阳线
if curr['close'] > (prev2['open'] + prev2['close']) / 2:
bullish.append("早晨之星")
weight += 2.5
# 黄昏之星:大阳线 + 小实体(星) + 大阴线
if prev2_body > 0 and prev2_body > prev_range * 0.5:
if abs(prev_body) < prev_range * 0.3:
if curr_body < 0 and curr_body_abs > curr_range * 0.5:
if curr['close'] < (prev2['open'] + prev2['close']) / 2:
bearish.append("黄昏之星")
weight -= 2.5
return {
'bullish_patterns': bullish,
'bearish_patterns': bearish,
'pattern_weight': weight
}
# ==================== 支撑阻力位计算 ====================
def _calculate_support_resistance(self, df: pd.DataFrame, current_price: float) -> Dict[str, Any]:
"""
计算支撑位和阻力位
Args:
df: K线数据建议使用1H或4H数据
current_price: 当前价格
Returns:
{
'supports': [支撑位1, 支撑位2],
'resistances': [阻力位1, 阻力位2],
'nearest_support': float,
'nearest_resistance': float,
'at_support': bool,
'at_resistance': bool
}
"""
if len(df) < 20:
return {
'supports': [], 'resistances': [],
'nearest_support': 0, 'nearest_resistance': 0,
'at_support': False, 'at_resistance': False
}
# 方法1使用近期高低点
highs = df['high'].tail(50).values
lows = df['low'].tail(50).values
# 找局部高点和低点
local_highs = []
local_lows = []
for i in range(2, len(highs) - 2):
# 局部高点
if highs[i] > highs[i-1] and highs[i] > highs[i-2] and \
highs[i] > highs[i+1] and highs[i] > highs[i+2]:
local_highs.append(highs[i])
# 局部低点
if lows[i] < lows[i-1] and lows[i] < lows[i-2] and \
lows[i] < lows[i+1] and lows[i] < lows[i+2]:
local_lows.append(lows[i])
# 方法2使用均线作为动态支撑阻力
ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns and pd.notna(df['ma20'].iloc[-1]) else 0
ma50 = df['ma50'].iloc[-1] if 'ma50' in df.columns and pd.notna(df['ma50'].iloc[-1]) else 0
# 方法3布林带
bb_upper = df['bb_upper'].iloc[-1] if 'bb_upper' in df.columns and pd.notna(df['bb_upper'].iloc[-1]) else 0
bb_lower = df['bb_lower'].iloc[-1] if 'bb_lower' in df.columns and pd.notna(df['bb_lower'].iloc[-1]) else 0
# 合并所有支撑位(低于当前价格)
all_supports = []
if local_lows:
all_supports.extend([l for l in local_lows if l < current_price])
if ma20 and ma20 < current_price:
all_supports.append(ma20)
if ma50 and ma50 < current_price:
all_supports.append(ma50)
if bb_lower and bb_lower < current_price:
all_supports.append(bb_lower)
# 合并所有阻力位(高于当前价格)
all_resistances = []
if local_highs:
all_resistances.extend([h for h in local_highs if h > current_price])
if ma20 and ma20 > current_price:
all_resistances.append(ma20)
if ma50 and ma50 > current_price:
all_resistances.append(ma50)
if bb_upper and bb_upper > current_price:
all_resistances.append(bb_upper)
# 排序并去重(合并相近的价位)
supports = sorted(set(all_supports), reverse=True)[:3] # 最近的3个支撑
resistances = sorted(set(all_resistances))[:3] # 最近的3个阻力
# 找最近的支撑和阻力
nearest_support = supports[0] if supports else 0
nearest_resistance = resistances[0] if resistances else 0
# 判断是否在支撑/阻力位附近1%范围内)
at_support = nearest_support > 0 and abs(current_price - nearest_support) / current_price < 0.01
at_resistance = nearest_resistance > 0 and abs(current_price - nearest_resistance) / current_price < 0.01
return {
'supports': supports,
'resistances': resistances,
'nearest_support': nearest_support,
'nearest_resistance': nearest_resistance,
'at_support': at_support,
'at_resistance': at_resistance
}
# ==================== 成交量分析 ====================
def _analyze_volume(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
分析成交量
Args:
df: K线数据需要包含 volume, volume_ma20, volume_ratio
Returns:
{
'volume_signal': 'high' | 'normal' | 'low',
'volume_trend': 'increasing' | 'decreasing' | 'stable',
'volume_confirms': bool, # 成交量是否确认价格走势
'volume_weight': float
}
"""
if len(df) < 5 or 'volume' not in df.columns:
return {
'volume_signal': 'normal',
'volume_trend': 'stable',
'volume_confirms': False,
'volume_weight': 0
}
latest = df.iloc[-1]
prev = df.iloc[-2]
# 量比判断
volume_ratio = latest.get('volume_ratio', 1)
if pd.isna(volume_ratio):
volume_ratio = 1
if volume_ratio > 2:
volume_signal = 'high'
elif volume_ratio < 0.5:
volume_signal = 'low'
else:
volume_signal = 'normal'
# 成交量趋势最近5根K线
recent_volumes = df['volume'].tail(5)
if len(recent_volumes) >= 5:
first_half = recent_volumes.iloc[:2].mean()
second_half = recent_volumes.iloc[-2:].mean()
if second_half > first_half * 1.3:
volume_trend = 'increasing'
elif second_half < first_half * 0.7:
volume_trend = 'decreasing'
else:
volume_trend = 'stable'
else:
volume_trend = 'stable'
# 判断成交量是否确认价格走势
price_up = latest['close'] > prev['close']
volume_up = latest['volume'] > prev['volume']
# 价涨量增 或 价跌量缩 = 确认
volume_confirms = (price_up and volume_up) or (not price_up and not volume_up)
# 计算权重
weight = 0
if volume_signal == 'high' and volume_confirms:
weight = 1.5
elif volume_signal == 'high' and not volume_confirms:
weight = -0.5 # 放量但不确认,可能是假突破
elif volume_signal == 'low':
weight = -0.5 # 缩量,信号可靠性降低
return {
'volume_signal': volume_signal,
'volume_trend': volume_trend,
'volume_confirms': volume_confirms,
'volume_weight': weight
}
# ==================== 5M 精确入场 ====================
def _analyze_5m_entry(self, m5_data: pd.DataFrame, action: str) -> Dict[str, Any]:
"""
使用 5M 数据寻找精确入场点
Args:
m5_data: 5分钟K线数据
action: 'buy''sell'
Returns:
{
'entry_confirmed': bool,
'entry_reasons': [...],
'entry_weight': float
}
"""
if len(m5_data) < 5:
return {'entry_confirmed': False, 'entry_reasons': [], 'entry_weight': 0}
latest = m5_data.iloc[-1]
prev = m5_data.iloc[-2]
reasons = []
weight = 0
# 获取指标
rsi = latest.get('rsi', 50)
prev_rsi = prev.get('rsi', 50)
macd = latest.get('macd', 0)
macd_signal = latest.get('macd_signal', 0)
prev_macd = prev.get('macd', 0)
prev_macd_signal = prev.get('macd_signal', 0)
if action == 'buy':
# 5M RSI 从超卖回升
if pd.notna(rsi) and pd.notna(prev_rsi):
if rsi < 40 and rsi > prev_rsi:
reasons.append("5M RSI回升")
weight += 1
if rsi < 30:
reasons.append("5M RSI超卖")
weight += 0.5
# 5M MACD 金叉
if pd.notna(macd) and pd.notna(prev_macd):
if prev_macd <= prev_macd_signal and macd > macd_signal:
reasons.append("5M MACD金叉")
weight += 1.5
# 5M K线企稳阳线
if latest['close'] > latest['open']:
if prev['close'] < prev['open']: # 前一根是阴线
reasons.append("5M阳线反转")
weight += 1
elif action == 'sell':
# 5M RSI 从超买回落
if pd.notna(rsi) and pd.notna(prev_rsi):
if rsi > 60 and rsi < prev_rsi:
reasons.append("5M RSI回落")
weight += 1
if rsi > 70:
reasons.append("5M RSI超买")
weight += 0.5
# 5M MACD 死叉
if pd.notna(macd) and pd.notna(prev_macd):
if prev_macd >= prev_macd_signal and macd < macd_signal:
reasons.append("5M MACD死叉")
weight += 1.5
# 5M K线见顶阴线
if latest['close'] < latest['open']:
if prev['close'] > prev['open']:
reasons.append("5M阴线反转")
weight += 1
entry_confirmed = weight >= 2
return {
'entry_confirmed': entry_confirmed,
'entry_reasons': reasons,
'entry_weight': weight
}
def analyze_trend(self, h1_data: pd.DataFrame, h4_data: pd.DataFrame) -> Dict[str, Any]:
"""
分析趋势方向和强度(波段交易优化版)
Args:
h1_data: 1小时K线数据含技术指标
h4_data: 4小时K线数据含技术指标
Returns:
{
'direction': 'bullish' | 'bearish' | 'neutral',
'strength': 'strong' | 'moderate' | 'weak',
'phase': 'impulse' | 'correction' | 'reversal',
'h4_score': float,
'h1_score': float
}
"""
if h1_data.empty or h4_data.empty:
return {
'direction': 'neutral',
'strength': 'weak',
'phase': 'sideways',
'h4_score': 0,
'h1_score': 0
}
# 获取最新数据
h1_latest = h1_data.iloc[-1]
h4_latest = h4_data.iloc[-1]
# 计算各周期的趋势得分
h4_score, h4_details = self._calculate_trend_score(h4_latest)
h1_score, h1_details = self._calculate_trend_score(h1_latest)
# 判断趋势方向4H 为主)
if h4_score > 0.3:
direction = 'bullish'
elif h4_score < -0.3:
direction = 'bearish'
else:
direction = 'neutral'
# 判断趋势强度
strength = self._assess_trend_strength(h4_data, h1_data)
# 判断当前阶段(是主升/主跌还是回调)
phase = self._detect_market_phase(h4_data, h1_data, direction)
# 检查极端情况
h4_rsi = h4_latest.get('rsi', 50)
extreme_warning = ""
if pd.notna(h4_rsi):
if h4_rsi < 20:
extreme_warning = f" [RSI={h4_rsi:.1f} 极度超卖]"
phase = 'oversold'
elif h4_rsi > 80:
extreme_warning = f" [RSI={h4_rsi:.1f} 极度超买]"
phase = 'overbought'
logger.info(f"趋势分析: 方向={direction}, 强度={strength}, 阶段={phase} | "
f"4H={h4_score:.2f}{h4_details}, 1H={h1_score:.2f}{h1_details}{extreme_warning}")
return {
'direction': direction,
'strength': strength,
'phase': phase,
'h4_score': h4_score,
'h1_score': h1_score
}
def _assess_trend_strength(self, h4_data: pd.DataFrame, h1_data: pd.DataFrame) -> str:
"""评估趋势强度"""
if len(h4_data) < 5:
return 'weak'
h4_latest = h4_data.iloc[-1]
# 检查 MACD 柱状图是否放大
macd_hist = h4_data['macd_hist'].tail(5)
macd_expanding = False
if len(macd_hist) >= 3:
recent_abs = abs(macd_hist.iloc[-1])
prev_abs = abs(macd_hist.iloc[-3])
if recent_abs > prev_abs * 1.2:
macd_expanding = True
# 检查价格是否持续在 MA20 同侧
close_prices = h4_data['close'].tail(5)
ma20_values = h4_data['ma20'].tail(5)
consistent_side = True
if pd.notna(ma20_values.iloc[-1]):
above_ma = close_prices > ma20_values
consistent_side = above_ma.all() or (~above_ma).all()
# 检查 RSI 是否在趋势区间
rsi = h4_latest.get('rsi', 50)
rsi_trending = 40 < rsi < 60 # 中性区间表示趋势不强
if macd_expanding and consistent_side and not rsi_trending:
return 'strong'
elif consistent_side:
return 'moderate'
else:
return 'weak'
def _detect_market_phase(self, h4_data: pd.DataFrame, h1_data: pd.DataFrame,
direction: str) -> str:
"""检测市场阶段(主升/主跌 vs 回调)"""
if len(h1_data) < 10:
return 'unknown'
h1_latest = h1_data.iloc[-1]
h4_latest = h4_data.iloc[-1]
# 获取 1H 的短期趋势
h1_ma5 = h1_latest.get('ma5', 0)
h1_ma20 = h1_latest.get('ma20', 0)
h1_close = h1_latest.get('close', 0)
if not (pd.notna(h1_ma5) and pd.notna(h1_ma20)):
return 'unknown'
# 判断 1H 是否在回调
if direction == 'bullish':
# 上涨趋势中1H 价格回落到 MA20 附近 = 回调
if h1_close < h1_ma5 and h1_close > h1_ma20 * 0.98:
return 'correction' # 回调中,可能是入场机会
elif h1_close > h1_ma5:
return 'impulse' # 主升浪
elif direction == 'bearish':
# 下跌趋势中1H 价格反弹到 MA20 附近 = 反弹
if h1_close > h1_ma5 and h1_close < h1_ma20 * 1.02:
return 'correction' # 反弹中,可能是做空机会
elif h1_close < h1_ma5:
return 'impulse' # 主跌浪
return 'sideways'
def _calculate_trend_score(self, data: pd.Series) -> tuple:
"""
计算单周期趋势得分
Args:
data: 包含技术指标的数据行
Returns:
(得分, 详情字符串)
"""
score = 0.0
count = 0
details = []
# 价格与均线关系
if 'close' in data and 'ma20' in data and pd.notna(data['ma20']):
if data['close'] > data['ma20']:
score += 1
details.append("价格>MA20")
else:
score -= 1
details.append("价格<MA20")
count += 1
# MA5 与 MA20 关系
if 'ma5' in data and 'ma20' in data and pd.notna(data['ma5']) and pd.notna(data['ma20']):
if data['ma5'] > data['ma20']:
score += 1
details.append("MA5>MA20")
else:
score -= 1
details.append("MA5<MA20")
count += 1
# MACD
if 'macd' in data and 'macd_signal' in data and pd.notna(data['macd']):
if data['macd'] > data['macd_signal']:
score += 1
details.append("MACD多")
else:
score -= 1
details.append("MACD空")
count += 1
# RSI
if 'rsi' in data and pd.notna(data['rsi']):
if data['rsi'] > 50:
score += 0.5
else:
score -= 0.5
count += 0.5
final_score = score / count if count > 0 else 0
detail_str = f"({','.join(details)})" if details else ""
return final_score, detail_str
def analyze_entry_signal(self, m5_data: pd.DataFrame, m15_data: pd.DataFrame,
trend: Dict[str, Any], h1_data: pd.DataFrame = None) -> Dict[str, Any]:
"""
分析 15M 进场信号(波段交易优化版 - 增强版)
新增功能:
- 成交量确认
- K线形态识别
- 支撑阻力位判断
- 5M精确入场
Args:
m5_data: 5分钟K线数据用于精确入场
m15_data: 15分钟K线数据主要入场周期
trend: 趋势分析结果
h1_data: 1小时K线数据用于支撑阻力位计算可选
Returns:
{
'action': 'buy' | 'sell' | 'hold',
'confidence': 0-100,
'signal_grade': 'A' | 'B' | 'C' | 'D',
'reasons': [...],
'indicators': {...},
'patterns': {...},
'levels': {...},
'volume_analysis': {...}
}
"""
if m5_data.empty or m15_data.empty:
return {'action': 'hold', 'confidence': 0, 'signal_grade': 'D',
'reasons': ['数据不足'], 'indicators': {}}
# 兼容旧格式(如果 trend 是字符串)
if isinstance(trend, str):
trend_direction = trend
trend_phase = 'unknown'
trend_strength = 'moderate'
else:
trend_direction = trend.get('direction', 'neutral')
trend_phase = trend.get('phase', 'unknown')
trend_strength = trend.get('strength', 'moderate')
m15_latest = m15_data.iloc[-1]
current_price = float(m15_latest['close'])
# 收集信号
buy_signals = []
sell_signals = []
signal_weights = {'buy': 0, 'sell': 0}
# ==================== 1. 传统技术指标信号 ====================
# === RSI 信号 ===
if 'rsi' in m15_latest and pd.notna(m15_latest['rsi']):
rsi = m15_latest['rsi']
if rsi < 30:
buy_signals.append(f"RSI超卖({rsi:.1f})")
signal_weights['buy'] += 2
elif rsi < 40 and len(m15_data) >= 2:
prev_rsi = m15_data.iloc[-2].get('rsi', 50)
if pd.notna(prev_rsi) and rsi > prev_rsi:
buy_signals.append(f"RSI回升({prev_rsi:.1f}{rsi:.1f})")
signal_weights['buy'] += 1.5
elif rsi > 70:
sell_signals.append(f"RSI超买({rsi:.1f})")
signal_weights['sell'] += 2
elif rsi > 60 and len(m15_data) >= 2:
prev_rsi = m15_data.iloc[-2].get('rsi', 50)
if pd.notna(prev_rsi) and rsi < prev_rsi:
sell_signals.append(f"RSI回落({prev_rsi:.1f}{rsi:.1f})")
signal_weights['sell'] += 1.5
# === MACD 信号 ===
if len(m15_data) >= 2:
prev = m15_data.iloc[-2]
if 'macd' in m15_latest and 'macd_signal' in m15_latest:
if pd.notna(m15_latest['macd']) and pd.notna(prev['macd']):
if prev['macd'] <= prev['macd_signal'] and m15_latest['macd'] > m15_latest['macd_signal']:
buy_signals.append("MACD金叉")
signal_weights['buy'] += 2
elif prev['macd'] >= prev['macd_signal'] and m15_latest['macd'] < m15_latest['macd_signal']:
sell_signals.append("MACD死叉")
signal_weights['sell'] += 2
elif abs(m15_latest['macd_hist']) < abs(prev['macd_hist']) * 0.7:
if m15_latest['macd_hist'] > 0:
sell_signals.append("MACD动能减弱")
signal_weights['sell'] += 0.5
else:
buy_signals.append("MACD动能减弱")
signal_weights['buy'] += 0.5
# === 布林带信号 ===
if 'close' in m15_latest and 'bb_lower' in m15_latest and 'bb_upper' in m15_latest:
if pd.notna(m15_latest['bb_lower']) and pd.notna(m15_latest['bb_upper']):
bb_middle = m15_latest.get('bb_middle', (m15_latest['bb_upper'] + m15_latest['bb_lower']) / 2)
if m15_latest['close'] < m15_latest['bb_lower']:
buy_signals.append("触及布林下轨")
signal_weights['buy'] += 1.5
elif m15_latest['close'] > m15_latest['bb_upper']:
sell_signals.append("触及布林上轨")
signal_weights['sell'] += 1.5
elif len(m15_data) >= 2:
prev_close = m15_data.iloc[-2]['close']
if prev_close < bb_middle and m15_latest['close'] > bb_middle:
buy_signals.append("突破布林中轨")
signal_weights['buy'] += 1
elif prev_close > bb_middle and m15_latest['close'] < bb_middle:
sell_signals.append("跌破布林中轨")
signal_weights['sell'] += 1
# === KDJ 信号 ===
if 'k' in m15_latest and 'd' in m15_latest and len(m15_data) >= 2:
prev = m15_data.iloc[-2]
if pd.notna(m15_latest['k']) and pd.notna(prev['k']):
if prev['k'] <= prev['d'] and m15_latest['k'] > m15_latest['d']:
if m15_latest['k'] < 30:
buy_signals.append("KDJ低位金叉")
signal_weights['buy'] += 1.5
else:
buy_signals.append("KDJ金叉")
signal_weights['buy'] += 0.5
elif prev['k'] >= prev['d'] and m15_latest['k'] < m15_latest['d']:
if m15_latest['k'] > 70:
sell_signals.append("KDJ高位死叉")
signal_weights['sell'] += 1.5
else:
sell_signals.append("KDJ死叉")
signal_weights['sell'] += 0.5
# ==================== 2. K线形态识别 ====================
patterns = self._detect_candlestick_patterns(m15_data)
if patterns['bullish_patterns']:
buy_signals.extend(patterns['bullish_patterns'])
signal_weights['buy'] += patterns['pattern_weight']
if patterns['bearish_patterns']:
sell_signals.extend(patterns['bearish_patterns'])
signal_weights['sell'] += abs(patterns['pattern_weight'])
# ==================== 3. 成交量分析 ====================
volume_analysis = self._analyze_volume(m15_data)
if volume_analysis['volume_signal'] == 'high':
if volume_analysis['volume_confirms']:
# 放量确认
if signal_weights['buy'] > signal_weights['sell']:
buy_signals.append(f"放量确认(量比{m15_latest.get('volume_ratio', 1):.1f})")
signal_weights['buy'] += volume_analysis['volume_weight']
else:
sell_signals.append(f"放量确认(量比{m15_latest.get('volume_ratio', 1):.1f})")
signal_weights['sell'] += volume_analysis['volume_weight']
else:
# 放量不确认,可能是假信号
if signal_weights['buy'] > signal_weights['sell']:
buy_signals.append("放量但不确认(警惕)")
signal_weights['buy'] += volume_analysis['volume_weight'] # 负权重
else:
sell_signals.append("放量但不确认(警惕)")
signal_weights['sell'] += volume_analysis['volume_weight']
# ==================== 4. 支撑阻力位分析 ====================
levels = {}
if h1_data is not None and not h1_data.empty:
levels = self._calculate_support_resistance(h1_data, current_price)
if levels.get('at_support') and trend_direction == 'bullish':
buy_signals.append(f"触及支撑位({levels['nearest_support']:.2f})")
signal_weights['buy'] += 1.5
if levels.get('at_resistance') and trend_direction == 'bearish':
sell_signals.append(f"触及阻力位({levels['nearest_resistance']:.2f})")
signal_weights['sell'] += 1.5
# ==================== 5. 根据趋势和阶段决定动作 ====================
action = 'hold'
confidence = 0
reasons = []
signal_grade = 'D'
# 波段交易核心逻辑:在回调中寻找入场机会
if trend_direction == 'bullish':
if trend_phase == 'correction' and signal_weights['buy'] >= 3:
action = 'buy'
confidence = min(40 + signal_weights['buy'] * 10, 95)
reasons = buy_signals + [f"上涨趋势回调({trend_strength})"]
signal_grade = 'A' if confidence >= 80 else ('B' if confidence >= 60 else 'C')
elif trend_phase == 'impulse' and signal_weights['buy'] >= 4:
action = 'buy'
confidence = min(30 + signal_weights['buy'] * 8, 80)
reasons = buy_signals + ["主升浪追多"]
signal_grade = 'B' if confidence >= 60 else 'C'
elif trend_phase in ['oversold', 'overbought']:
reasons = ['极端行情,等待企稳']
elif trend_direction == 'bearish':
if trend_phase == 'correction' and signal_weights['sell'] >= 3:
action = 'sell'
confidence = min(40 + signal_weights['sell'] * 10, 95)
reasons = sell_signals + [f"下跌趋势反弹({trend_strength})"]
signal_grade = 'A' if confidence >= 80 else ('B' if confidence >= 60 else 'C')
elif trend_phase == 'impulse' and signal_weights['sell'] >= 4:
action = 'sell'
confidence = min(30 + signal_weights['sell'] * 8, 80)
reasons = sell_signals + ["主跌浪追空"]
signal_grade = 'B' if confidence >= 60 else 'C'
elif trend_phase in ['oversold', 'overbought']:
reasons = ['极端行情,等待企稳']
else: # neutral
reasons = ['趋势不明确,观望']
# ==================== 6. 5M 精确入场确认 ====================
if action != 'hold' and not m5_data.empty:
entry_5m = self._analyze_5m_entry(m5_data, action)
if entry_5m['entry_confirmed']:
confidence = min(confidence + 10, 95)
reasons.extend(entry_5m['entry_reasons'])
if signal_grade == 'B':
signal_grade = 'A'
elif signal_grade == 'C':
signal_grade = 'B'
elif entry_5m['entry_weight'] < 1:
# 5M 没有确认,降低置信度
confidence = max(confidence - 10, 0)
reasons.append("5M未确认入场")
if not reasons:
reasons = ['信号不足,继续观望']
# 收集指标数据
indicators = {}
for col in ['rsi', 'macd', 'macd_signal', 'macd_hist', 'k', 'd', 'j', 'close', 'ma20', 'volume_ratio']:
if col in m15_latest and pd.notna(m15_latest[col]):
indicators[col] = float(m15_latest[col])
# 记录详细日志(简化版,详细日志在 crypto_agent 中输出)
if action != 'hold':
logger.debug(f"信号详情: {action} {confidence}% {signal_grade} | 买权重={signal_weights['buy']:.1f} 卖权重={signal_weights['sell']:.1f}")
return {
'action': action,
'confidence': confidence,
'signal_grade': signal_grade,
'reasons': reasons,
'indicators': indicators,
'trend_info': {
'direction': trend_direction,
'phase': trend_phase,
'strength': trend_strength
},
'patterns': patterns,
'volume_analysis': volume_analysis,
'levels': levels,
'signal_weights': signal_weights
}
async def llm_analyze(self, data: Dict[str, pd.DataFrame], signal: Dict[str, Any],
symbol: str) -> Dict[str, Any]:
"""
使用 LLM 进行深度分析
Args:
data: 多周期K线数据
signal: 初步信号分析结果
symbol: 交易对
Returns:
LLM 分析结果(结构化数据)
"""
try:
# 构建分析提示
prompt = self._build_analysis_prompt(data, signal, symbol)
# 调用 LLM
response = llm_service.chat([
{"role": "system", "content": self.CRYPTO_ANALYST_PROMPT},
{"role": "user", "content": prompt}
])
if response:
logger.info(f"LLM 分析完成: {symbol}")
# 解析 JSON 响应
return self._parse_llm_response(response)
else:
return {"error": "LLM 分析暂时不可用", "raw": ""}
except Exception as e:
logger.error(f"LLM 分析失败: {e}")
return {"error": str(e), "raw": ""}
def _parse_llm_response(self, response: str) -> Dict[str, Any]:
"""
解析 LLM 的 JSON 响应
Args:
response: LLM 原始响应
Returns:
解析后的结构化数据
"""
import json
import re
result = {
"raw": response,
"parsed": None,
"summary": ""
}
try:
# 尝试提取 JSON 块
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
if json_match:
json_str = json_match.group(1)
else:
# 尝试直接解析整个响应
json_str = response
# 解析 JSON
parsed = json.loads(json_str)
result["parsed"] = parsed
# 生成摘要
if parsed:
recommendation = parsed.get("recommendation", {})
action = recommendation.get("action", "wait")
confidence = recommendation.get("confidence", 0)
reason = recommendation.get("reason", "")
if action == "wait":
result["summary"] = f"建议观望。{parsed.get('risk_warning', '')}"
else:
action_text = "做多" if action == "buy" else "做空"
result["summary"] = f"建议{action_text},置信度{confidence}%。{reason}"
except json.JSONDecodeError:
# JSON 解析失败,提取关键信息
logger.warning("LLM 响应不是有效 JSON尝试提取关键信息")
result["summary"] = self._extract_summary_from_text(response)
return result
def _extract_summary_from_text(self, text: str) -> str:
"""从非 JSON 文本中提取摘要"""
# 简单提取前 200 字符作为摘要
text = text.strip()
if len(text) > 200:
return text[:200] + "..."
return text
def _build_analysis_prompt(self, data: Dict[str, pd.DataFrame], signal: Dict[str, Any],
symbol: str) -> str:
"""构建 LLM 分析提示 - 优化版"""
parts = [f"# {symbol} 技术分析数据\n"]
# 当前价格
current_price = float(data['5m'].iloc[-1]['close'])
parts.append(f"**当前价格**: ${current_price:,.2f}\n")
# 添加各周期指标摘要
for interval in ['4h', '1h', '15m']:
df = data.get(interval)
if df is None or df.empty:
continue
latest = df.iloc[-1]
parts.append(f"\n## {interval.upper()} 周期指标")
# 价格与均线关系
close = latest.get('close', 0)
ma20 = latest.get('ma20', 0)
ma50 = latest.get('ma50', 0)
if pd.notna(ma20):
position = "上方" if close > ma20 else "下方"
parts.append(f"- 价格在 MA20 {position} (MA20={ma20:.2f})")
if pd.notna(ma50):
parts.append(f"- MA50: {ma50:.2f}")
# RSI
rsi = latest.get('rsi', 0)
if pd.notna(rsi):
rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性")
parts.append(f"- RSI: {rsi:.1f} ({rsi_status})")
# MACD
macd = latest.get('macd', 0)
macd_signal = latest.get('macd_signal', 0)
if pd.notna(macd) and pd.notna(macd_signal):
macd_status = "多头" if macd > macd_signal else "空头"
parts.append(f"- MACD: {macd:.2f}, Signal: {macd_signal:.2f} ({macd_status})")
# 布林带
bb_upper = latest.get('bb_upper', 0)
bb_lower = latest.get('bb_lower', 0)
if pd.notna(bb_upper) and pd.notna(bb_lower):
parts.append(f"- 布林带: 上轨={bb_upper:.2f}, 下轨={bb_lower:.2f}")
# 添加最近 5 根 15M K 线(让 LLM 看形态)
parts.append("\n## 最近 5 根 15M K线")
parts.append("| 时间 | 开盘 | 最高 | 最低 | 收盘 | 涨跌 |")
parts.append("|------|------|------|------|------|------|")
df_15m = data.get('15m')
if df_15m is not None and len(df_15m) >= 5:
for i in range(-5, 0):
row = df_15m.iloc[i]
change = ((row['close'] - row['open']) / row['open']) * 100
change_str = f"+{change:.2f}%" if change >= 0 else f"{change:.2f}%"
time_str = row['open_time'].strftime('%H:%M') if pd.notna(row['open_time']) else 'N/A'
parts.append(f"| {time_str} | {row['open']:.2f} | {row['high']:.2f} | {row['low']:.2f} | {row['close']:.2f} | {change_str} |")
# 计算关键价位
parts.append("\n## 关键价位参考")
df_1h = data.get('1h')
if df_1h is not None and len(df_1h) >= 20:
recent_high = df_1h['high'].tail(20).max()
recent_low = df_1h['low'].tail(20).min()
parts.append(f"- 近期高点: ${recent_high:,.2f}")
parts.append(f"- 近期低点: ${recent_low:,.2f}")
# 初步信号分析结果
parts.append(f"\n## 规则引擎初步判断")
parts.append(f"- 趋势: {signal.get('trend', 'unknown')}")
parts.append(f"- 信号: {signal.get('action', 'hold')}")
parts.append(f"- 置信度: {signal.get('confidence', 0)}%")
parts.append(f"- 触发原因: {', '.join(signal.get('reasons', []))}")
parts.append("\n---")
parts.append("请基于以上数据进行分析,严格按照 JSON 格式输出你的判断。")
return "\n".join(parts)
def calculate_stop_loss_take_profit(self, price: float, action: str,
atr: float) -> Dict[str, float]:
"""
计算止损止盈位置
Args:
price: 当前价格
action: 'buy''sell'
atr: ATR 值
Returns:
{'stop_loss': float, 'take_profit': float}
"""
if action == 'buy':
stop_loss = price - atr * 2
take_profit = price + atr * 3
elif action == 'sell':
stop_loss = price + atr * 2
take_profit = price - atr * 3
else:
stop_loss = 0
take_profit = 0
return {
'stop_loss': round(stop_loss, 2),
'take_profit': round(take_profit, 2)
}