stock-ai-agent/backend/app/crypto_agent/market_signal_analyzer.py
2026-03-23 23:55:35 +08:00

1875 lines
88 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场信号分析器 - 纯市场分析,不包含任何仓位信息
职责:
1. 分析K线、量价、技术指标
2. 分析新闻舆情
3. 输出纯市场信号buy/sell/hold + confidence + reasoning
不负责:
- 仓位管理
- 风险控制
- 具体下单决策
"""
import json
import re
import asyncio
import numpy as np
import pandas as pd
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.utils.logger import logger
from app.services.llm_service import llm_service
from app.services.news_service import get_news_service
from app.services.bitget_service import bitget_service
class MarketSignalAnalyzer:
"""市场信号分析器 - 只关注市场,输出客观信号"""
# 市场分析系统提示词Al Brooks价格行为学 + 供需 + 量价)
MARKET_ANALYSIS_PROMPT = """你是一位专业的加密货币日内交易员,采用 Al Brooks 价格行为学分析方法。
核心原则:价格行为 + 供需区域 + 量价关系 决定入场技术指标RSI/ATR仅作辅助参考。
满足 2-3 个入场条件即可交易,不追求所有条件同时满足。
## 第一步:判断市场状态
**价格结构判断优先于EMA**
- 上升趋势连续更高高点HH+ 更高低点HL
- 下降趋势连续更低高点LH+ 更低低点LL
- 震荡区间:高点不再创新高,低点不再创新低,价格横向震荡
**趋势强弱**
- 强趋势大实体K线居多回调幅度 < 前段波动的50%,回调缩量
- 弱趋势 / 变盘预警K线实体缩小、大影线增多、回调超过50%、多次假突破
## 第二步:识别供需区域
**供给区(压力/阻力)**
- 前期显著高点,价格曾在此快速下行的区域
- 多次测试未能有效突破的价格区间
- 当日开盘价、整数关口
**需求区(支撑)**
- 前期显著低点,价格曾在此快速上行的区域
- 多次测试未能有效跌破的价格区间
- 当日开盘价、整数关口
## 第三步:寻找入场机会
### A. 趋势市:顺势回调入场
满足以下 **2-3 项**即可入场,不需全部满足:
1. **两段回调**Al Brooks 核心上升趋势中回调分两波A段下→小反弹→B段再下B段低点出现看涨K线 → 做多;下降趋势反向同理
2. **关键位反转**:价格回调至供需区/前期高低点/整数关口出现明确反转K线吞没、锤子、大阳/阴线)
3. **量价确认**:回调缩量(量比<0.8),在支撑位出现放量反弹(量比>1.2
4. **强趋势K线**出现大实体顺势K线实体 > 影线总和),表明方向明确
5. **二次入场**(更可靠):第一次突破/回调往往是陷阱;等待回测确认后的第二次方向运动再入场
**entry_type 选择**
- 价格正在快速移动中 → limit 挂单等待回调
- 回调已到位出现反转K线 → market 立即入场
### B. 震荡市:区间边界反向入场
1. 价格接近区间上沿(供给区)+ 看跌K线形态 → 做空
2. 价格接近区间下沿(需求区)+ 看涨K线形态 → 做多
3. 需有量价配合入场K线相对放量
### C. 量价过滤(关键!)
**允许入场**
- ✅ 突破时量比 > 1.2(放量突破,真突破概率高)
- ✅ 回调缩量(量比 < 0.8)后,在支撑位出现放量反转
**禁止入场**
- ❌ 突破时量比 < 0.8(无量突破,假突破概率高)
- ❌ 连续 3 根以上大实体同向K线加速中不追
- ❌ 盈亏比 < 1:1.2
## 第四步:止损与目标
**止损设置**Al Brooks信号K线止损法
- 做多止损信号K线反转K线低点下方 0.2-0.3% 缓冲
- 做空止损信号K线高点上方 0.2-0.3% 缓冲
- 用 1.5×ATR(30m) 验证合理性,止损范围参考 0.8-2.5%
**目标设置**
- 下一个供需区域(做多看上方压力,做空看下方支撑)
- 最低盈亏比1:1.2(强信号可要求 1:1.5
- 日内目标2-3%
## 资金费率快速判断
| 情绪 | 做多操作 | 做空操作 |
|------|---------|---------|
| 极度贪婪(>+0.1% | 降低置信度,不加仓 | 可适当提高优先级 |
| 中性 | 正常操作 | 正常操作 |
| 极度恐惧(<-0.1% | 可适当提高优先级 | 降低置信度,不加仓 |
## 历史信号参考
若已有上一轮信号,只在以下情况输出新信号:
- 趋势结构发生明确反转HH/HL → LH/LL或反向
- 新入场价与上一轮差距 ≥ 1.5%(出现新的供需位)
- 距上一轮信号超过 2 小时
- 价格已触及上一轮止损或止盈
## 输出格式(严格遵守)
```json
{
"market_state": "ranging/trending",
"trend_direction": "uptrend/downtrend/neutral",
"trend_strength": "strong/medium/weak",
"analysis_summary": "市场状态简述30字以内",
"key_levels": {
"support": [数字, 数字],
"resistance": [数字, 数字]
},
"signals": [
{
"type": "short_term/medium_term",
"action": "buy/sell",
"entry_type": "market/limit",
"confidence": 0-100,
"grade": "A/B/C",
"entry_price": 数字,
"stop_loss": 数字,
"take_profit": 数字,
"reasoning": "入场依据:供需位+K线形态+量价状态"
}
]
}
```
**输出规则**
- 无明确信号时signals 为空数组 []
- 盈亏比 < 1:1.2 不输出信号
- 最多输出 2 个最强信号
- entry_price / stop_loss / take_profit 必须是纯数字,不加 $ 或逗号
你只负责分析市场信号,不负责仓位管理和风险控制。
"""
def __init__(self):
self.news_service = get_news_service()
self.exchange = bitget_service
async def analyze(self, symbol: str, data: Dict[str, Any],
symbols: List[str] = None,
previous_signal: Dict[str, Any] = None) -> Dict[str, Any]:
"""
分析市场并生成信号
Args:
symbol: 交易对
data: 多周期K线数据
symbols: 所有监控的交易对(用于市场对比)
previous_signal: 上一轮的分析信号(用于避免重复信号和提供上下文)
Returns:
市场信号字典
"""
try:
# 1. 准备市场数据
market_context = self._prepare_market_context(symbol, data, symbols)
# 2. 获取新闻舆情
news_context = await self._get_news_context(symbol)
# 3. 获取合约市场数据(资金费率、持仓量等)
futures_context = await self._get_futures_context(symbol)
# 4. 构建 LLM 提示词
prompt = self._build_analysis_prompt(symbol, market_context, news_context, previous_signal, futures_context)
# 5. 调用 LLM 分析
messages = [
{"role": "system", "content": self.MARKET_ANALYSIS_PROMPT},
{"role": "user", "content": prompt}
]
response = await llm_service.achat(messages)
# 6. 解析结果
result = self._parse_llm_response(response, symbol)
return result
except Exception as e:
logger.error(f"市场信号分析失败: {e}")
import traceback
logger.debug(traceback.format_exc())
return self._get_empty_signal(symbol)
def _prepare_market_context(self, symbol: str, data: Dict,
symbols: List[str] = None) -> str:
"""准备市场上下文信息"""
context_parts = []
# 当前价格和24h变化
current_price = float(data['5m'].iloc[-1]['close'])
price_change_24h = self._calculate_price_change_24h(data['1h'])
context_parts.append(f"当前价格: ${current_price:,.2f} ({price_change_24h})")
# 当日开盘价(供需区域参考)
df_1h = data.get('1h')
if df_1h is not None and len(df_1h) > 0:
# 取今天0点后的第一根1h K线作为当日开盘
try:
now = pd.Timestamp.now()
today_start = now.normalize() # 今天0:00
today_bars = df_1h[df_1h.index >= today_start] if hasattr(df_1h.index, 'normalize') else df_1h.iloc[-24:]
day_open = float(today_bars.iloc[0]['open']) if len(today_bars) > 0 else float(df_1h.iloc[-24]['open'])
except Exception:
day_open = float(df_1h.iloc[-1]['open'])
context_parts.append(f"当日开盘价: {day_open:.1f}(供需参考)")
# 多周期数据价格行为K线 + 技术指标摘要)
# 主要时间周期显示多根K线辅助周期显示摘要
# 15m/30m加大根数以识别波段高低点和供需区
PRICE_ACTION_BARS = {'5m': 10, '15m': 16, '30m': 12, '1h': 6}
for tf_name, df in data.items():
if df is None or len(df) == 0:
continue
context_parts.append(f"\n## {tf_name} 数据")
n_bars = PRICE_ACTION_BARS.get(tf_name, 0)
if n_bars > 0 and len(df) >= 2:
# 预计算量比基准用前20根K线均量排除显示窗口以外的数据
vol_window = df['volume'].iloc[-(n_bars + 20):-n_bars] if len(df) > n_bars + 20 else df['volume'].iloc[:max(1, len(df) - n_bars)]
vol_ma = vol_window.mean() if len(vol_window) > 0 else df['volume'].mean()
# 显示最近 N 根K线价格行为分析
bars = df.iloc[-n_bars:] if len(df) >= n_bars else df
total = len(bars)
bar_lines = []
for i, (_, row) in enumerate(bars.iterrows()):
offset = -(total - 1 - i) # 最新=0往前为负
o, h, l, c = float(row['open']), float(row['high']), float(row['low']), float(row['close'])
vol = float(row.get('volume', 0))
body = abs(c - o)
upper_wick = h - max(o, c)
lower_wick = min(o, c) - l
direction = "" if c >= o else ""
# 量比标注:放量/缩量/平量
if vol_ma > 0:
vr = vol / vol_ma
if vr >= 1.5:
vol_tag = f"量比:{vr:.1f}🔥"
elif vr >= 1.2:
vol_tag = f"量比:{vr:.1f}"
elif vr <= 0.6:
vol_tag = f"量比:{vr:.1f}↓↓"
elif vr <= 0.8:
vol_tag = f"量比:{vr:.1f}"
else:
vol_tag = f"量比:{vr:.1f}"
else:
vol_tag = ""
label = " ← 当前" if offset == 0 else ""
bar_lines.append(
f" [{offset:+d}] {direction}线 开:{o:.1f} 高:{h:.1f} 低:{l:.1f} 收:{c:.1f} "
f"实体:{body:.1f} 上影:{upper_wick:.1f} 下影:{lower_wick:.1f} {vol_tag}{label}"
)
context_parts.append("\n".join(bar_lines))
else:
# 辅助周期只显示最新一根
latest = df.iloc[-1]
context_parts.append(f"开: {latest['open']}, 高: {latest['high']}, 低: {latest['low']}, 收: {latest['close']}")
context_parts.append(f"成交量: {latest.get('volume', 'N/A')}")
# 技术指标摘要RSI + ATR去掉 MACD/BB 的逐周期输出)
indicators = []
if 'rsi' in df.columns and not pd.isna(df['rsi'].iloc[-1]):
indicators.append(f"RSI: {df['rsi'].iloc[-1]:.1f}")
if 'atr' in df.columns and not pd.isna(df['atr'].iloc[-1]):
indicators.append(f"ATR: {df['atr'].iloc[-1]:.2f}")
if 'ema20' in df.columns and not pd.isna(df['ema20'].iloc[-1]):
indicators.append(f"EMA20: {df['ema20'].iloc[-1]:.1f}")
if indicators:
context_parts.append(" 指标: " + " | ".join(indicators))
# 多级别趋势分析(检测小级别反转)
context_parts.append(self._analyze_multi_timeframe_trend(data))
# 量比分析
df_5m = data.get('5m')
if df_5m is not None and len(df_5m) >= 20:
vol_latest = df_5m['volume'].iloc[-1]
vol_ma20 = df_5m['volume'].iloc[-20:-1].mean()
volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1
context_parts.append(f"\n## 量价分析")
context_parts.append(f"最新成交量: {vol_latest:.0f}")
context_parts.append(f"20周期均量: {vol_ma20:.0f}")
context_parts.append(f"量比: {volume_ratio:.2f}")
if volume_ratio > 1.5:
context_parts.append("量价状态: 放量 📊")
elif volume_ratio < 0.7:
context_parts.append("量价状态: 缩量 📉")
else:
context_parts.append("量价状态: 平量 ")
# 波动率分析
volatility_analysis = self._analyze_volatility(data)
if volatility_analysis:
context_parts.append(f"\n## 波动率分析")
context_parts.append(volatility_analysis)
# 趋势位置分析(新增:避免盲目追涨杀跌)
trend_position_analysis = self._analyze_trend_position(data)
if trend_position_analysis:
context_parts.append(f"\n## 趋势位置分析")
context_parts.append(trend_position_analysis)
# ========== 新增:震荡区间检测 ==========
range_zone = self._detect_range_zone(data)
if range_zone['is_ranging']:
context_parts.append(f"\n## 🔔 震荡区间检测(重要!)")
context_parts.append(f"**状态**: 震荡市(置信度: {range_zone['confidence']}%")
if range_zone['support_level'] and range_zone['resistance_level']:
context_parts.append(f"**支撑位**: ${range_zone['support_level']:,.2f}")
context_parts.append(f"**压力位**: ${range_zone['resistance_level']:,.2f}")
context_parts.append(f"**区间宽度**: {range_zone['range_width_pct']:.2f}%")
if range_zone['volume_profile_support']:
context_parts.append(f"**成交量密集区支撑**: ${range_zone['volume_profile_support']:,.2f}")
if range_zone['volume_profile_resistance']:
context_parts.append(f"**成交量密集区压力**: ${range_zone['volume_profile_resistance']:,.2f}")
context_parts.append(f"**分析**: {range_zone['analysis']}")
context_parts.append(f"\n**震荡市交易策略**:")
context_parts.append(f" → 下沿附近挂多单,上沿附近挂空单")
context_parts.append(f" → 目标: 对岸边界,快进快出")
context_parts.append(f" → 严禁追涨杀跌!")
# ========== 新增:趋势反转检测 ==========
reversal_detection = self._detect_trend_reversal(data)
if reversal_detection['is_reversing']:
context_parts.append(f"\n## ⚠️ 趋势反转信号(非常重要!)")
context_parts.append(f"**检测到反转信号**!置信度: {reversal_detection['confidence']}%")
if reversal_detection['reversal_type'] == 'bullish_reversal':
context_parts.append(f"**反转类型**: 看涨反转 📈")
else:
context_parts.append(f"**反转类型**: 看跌反转 📉")
context_parts.append(f"\n**反转信号详情**:")
for sig in reversal_detection['signals'][:5]: # 最多显示5个信号
context_parts.append(f" - [{sig['type']}] {sig['desc']} (权重: {sig['weight']})")
context_parts.append(f"\n**🚨 反转信号处理规则**:")
context_parts.append(f" → 现有同向持仓建议平仓")
context_parts.append(f" → 考虑反方向开仓(需等待确认)")
context_parts.append(f" → 或者暂时观望,等待反转确认")
context_parts.append(f" → 严禁继续原方向开新仓!")
# ========== 新增:趋势阶段检测 ==========
trend_stage = self._detect_trend_stage(data)
if trend_stage['stage'] != 'unknown':
stage_emoji = {'early': '🌱', 'middle': '🔄', 'late': '🌅'}.get(trend_stage['stage'], '')
stage_name = {'early': '早期', 'middle': '中期', 'late': '晚期'}.get(trend_stage['stage'], '未知')
context_parts.append(f"\n## 趋势阶段分析")
context_parts.append(f"**当前阶段**: {stage_emoji} {stage_name}(置信度: {trend_stage['confidence']}%")
context_parts.append(f"**分析**: {trend_stage['analysis']}")
if trend_stage['stage'] == 'late':
context_parts.append(f"\n**⚠️ 晚期阶段警告**:")
context_parts.append(f" → 趋势可能即将反转或进入震荡")
context_parts.append(f" → 严禁追涨/追空开新仓")
context_parts.append(f" → 现有盈利持仓建议逐步止盈")
context_parts.append(f" → 等待明确反转信号后再决策")
elif trend_stage['stage'] == 'early':
context_parts.append(f"\n**✅ 早期阶段机会**:")
context_parts.append(f" → 趋势刚启动,可顺势轻仓入场")
context_parts.append(f" → 设置止损后可持有更长时间")
context_parts.append(f" → 目标可看更大空间")
return "\n".join(context_parts)
async def _get_news_context(self, symbol: str) -> str:
"""获取新闻舆情上下文"""
try:
news_result = await self.news_service.get_crypto_news(symbol)
if not news_result or not news_result.get('articles'):
return "无最新新闻"
articles = news_result['articles'][:5] # 只取前5条
context_parts = ["\n## 最新新闻"]
for article in articles:
title = article.get('title', '')
source = article.get('source', '')
published_at = article.get('publishedAt', '')
time_str = published_at.split('T')[1][:5] if 'T' in published_at else ''
context_parts.append(f"- [{time_str}] {title} ({source})")
return "\n".join(context_parts)
except Exception as e:
logger.warning(f"获取新闻失败: {e}")
return "新闻获取失败"
async def _get_futures_context(self, symbol: str) -> str:
"""获取合约市场数据(资金费率、持仓量、溢价率)"""
try:
loop = asyncio.get_event_loop()
market_data = await loop.run_in_executor(
None,
self.exchange.get_futures_market_data,
symbol
)
if not market_data:
return ""
return self.exchange.format_futures_data_for_llm(symbol, market_data)
except Exception as e:
logger.warning(f"获取 {symbol} 合约数据失败: {e}")
return ""
def _analyze_trend_position(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析趋势位置和日内交易机会(使用 EMA+ 市场状态判断(震荡/趋势)"""
try:
df_30m = data.get('30m')
df_15m = data.get('15m')
df_1h = data.get('1h')
if df_30m is None or len(df_30m) < 50:
return ""
latest_30m = df_30m.iloc[-1]
current_price = float(latest_30m['close'])
# 获取日内级别 EMA30m
ema5_30m = latest_30m.get('ma5') # 实际是 ema5
ema10_30m = latest_30m.get('ma10') # 实际是 ema10
ema20_30m = latest_30m.get('ma20') # 实际是 ema20
if not all([ema5_30m, ema10_30m, ema20_30m]):
return ""
# ========== 新增:市场状态判断(震荡 vs 趋势) ==========
market_state = "unknown"
market_state_reason = []
# 1h EMA 趋势判断
if df_1h is not None and len(df_1h) >= 20:
latest_1h = df_1h.iloc[-1]
ema5_1h = latest_1h.get('ma5')
ema10_1h = latest_1h.get('ma10')
ema20_1h = latest_1h.get('ma20')
if ema5_1h and ema10_1h and ema20_1h:
# 1h EMA 多头/空头排列 → 趋势市
if ema5_1h > ema10_1h > ema20_1h:
market_state = "trending"
market_state_reason.append("1h EMA 多头排列")
elif ema5_1h < ema10_1h < ema20_1h:
market_state = "trending"
market_state_reason.append("1h EMA 空头排列")
else:
market_state = "ranging"
market_state_reason.append("1h EMA 纠缠")
# 波动率判断ATR 变化)
if df_30m is not None and len(df_30m) >= 24 and 'atr' in df_30m.columns:
recent_atr = df_30m['atr'].iloc[-6:].mean() # 最近3小时
older_atr = df_30m['atr'].iloc[-12:-6].mean() # 之前3小时
if pd.notna(recent_atr) and pd.notna(older_atr) and older_atr > 0:
atr_change = (recent_atr - older_atr) / older_atr * 100
if atr_change > 20:
if market_state != "trending":
market_state = "trending"
market_state_reason.append(f"ATR 扩张 {atr_change:.0f}%")
elif atr_change < -20:
if market_state != "ranging":
market_state = "ranging"
market_state_reason.append(f"ATR 收缩 {abs(atr_change):.0f}%")
# 价格动量判断15m
if df_15m is not None and len(df_15m) >= 20:
recent_high = df_15m['high'].iloc[-20:].max()
recent_low = df_15m['low'].iloc[-20:].min()
price_range = (recent_high - recent_low) / current_price * 100
if price_range < 2.5: # 15分钟内波动小于2.5% → 震荡
if market_state != "trending":
market_state = "ranging"
market_state_reason.append(f"15m 波动 {price_range:.1f}% 较小")
elif price_range > 4: # 15分钟内波动大于4% → 趋势
if market_state != "ranging":
market_state = "trending"
market_state_reason.append(f"15m 波动 {price_range:.1f}% 较大")
# 判断日内趋势30m EMA 为主)
if ema5_30m > ema10_30m > ema20_30m:
intraday_trend = "上升"
intraday_emoji = "📈"
elif ema5_30m < ema10_30m < ema20_30m:
intraday_trend = "下跌"
intraday_emoji = "📉"
else:
intraday_trend = "震荡"
intraday_emoji = ""
# 构建市场状态分析
analysis_parts = []
# 市场状态显示(新增)
if market_state == "trending":
state_emoji = "📊"
state_text = f"{state_emoji} **市场状态: 趋势市**"
analysis_parts.append(state_text)
analysis_parts.append(f" 判断依据: {', '.join(market_state_reason)}")
analysis_parts.append(f" 策略: 跟随趋势,等待回调/反弹到 EMA20 顺势入场")
analysis_parts.append(f" 目标: 3-5%,盈亏比 ≥ 1:1.5")
analysis_parts.append(f" 严禁: 逆势做超短线")
elif market_state == "ranging":
state_emoji = "🔄"
state_text = f"{state_emoji} **市场状态: 震荡市**"
analysis_parts.append(state_text)
analysis_parts.append(f" 判断依据: {', '.join(market_state_reason)}")
analysis_parts.append(f" 策略: 5分钟级别高抛低吸支撑位多、压力位空")
analysis_parts.append(f" 目标: 1-2%,盈亏比 ≥ 1:1.5")
analysis_parts.append(f" 严禁: 追涨杀跌")
else:
analysis_parts.append(f"⚠️ 市场状态: 不明确,观望为主")
analysis_parts.append(f"")
analysis_parts.append(f"日内趋势(30m EMA): {intraday_emoji} {intraday_trend}")
analysis = analysis_parts
# 检查15分钟级别入场时机
if df_15m is not None and len(df_15m) >= 20:
latest_15m = df_15m.iloc[-1]
rsi_15m = latest_15m.get('rsi', 50)
ema5_15m = latest_15m.get('ma5') # 实际是 ema5
ema20_15m = latest_15m.get('ma20') # 实际是 ema20
# 检查短期动能
if len(df_15m) >= 5:
recent_closes = df_15m['close'].iloc[-5:].values
is_accelerating = all(recent_closes[i] > recent_closes[i-1] for i in range(1, 5))
# 检查连续大阳线/阴线(快速移动)
recent_changes = [(recent_closes[i] - recent_closes[i-1]) / recent_closes[i-1] * 100
for i in range(1, len(recent_closes))]
big_moves = sum(1 for change in recent_changes if abs(change) > 0.3)
is_rapid_moving = big_moves >= 3
avg_move = sum(abs(c) for c in recent_changes) / len(recent_changes) if recent_changes else 0
else:
is_accelerating = False
is_rapid_moving = False
avg_move = 0
# 计算价格偏离
if ema5_15m and ema20_15m:
deviation_ema5_15m = abs(current_price - ema5_15m) / ema5_15m * 100
distance_to_ema20 = abs(current_price - ema20_15m) / ema20_15m * 100
else:
deviation_ema5_15m = 0
distance_to_ema20 = 0
# 检查成交量
df_5m = data.get('5m')
volume_ratio = 1
if df_5m is not None and len(df_5m) >= 20:
vol_latest = df_5m['volume'].iloc[-1]
vol_ma20 = df_5m['volume'].iloc[-20:-1].mean()
volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1
# 检查5m连续K线走势
if len(df_5m) >= 3:
recent_5m_closes = df_5m['close'].iloc[-3:].values
recent_5m_changes = [(recent_5m_closes[i] - recent_5m_closes[i-1]) / recent_5m_closes[i-1] * 100
for i in range(1, len(recent_5m_closes))]
big_5m_moves = sum(1 for change in recent_5m_changes if abs(change) > 0.3)
is_5m_accelerating = big_5m_moves >= 2
else:
is_5m_accelerating = False
# 日内过度延伸检查EMA 反应更快,阈值更严格)
is_overextended = (
(rsi_15m > 70 and intraday_trend == "上升") or
(rsi_15m < 30 and intraday_trend == "下跌") or
deviation_ema5_15m > 3
)
if intraday_trend == "上升":
# 价格加速检查 - 强制观望,防止追涨
if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5:
analysis.append(f"⚠️ 15m: 价格正在快速上涨!连续{big_moves}根大阳线,平均涨幅{avg_move:.2f}%")
analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → 🚨 **严禁追涨!强制 HOLD 观望**,等待回调后再考虑")
analysis.append(f" → 如果要入场,等待回调到 EMA20 支撑位用 limit 挂单")
analysis.append(f" → 追涨是持续止损的主要原因!")
elif is_overextended:
analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → 不要追多,等待回调")
elif is_accelerating and not is_overextended:
analysis.append(f"15m: 正在上涨中,建议等待回调")
analysis.append(f" → 等待回调到 EMA20 支撑位用 limit 挂单做多")
analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
elif distance_to_ema20 < 1:
analysis.append(f"15m: 回调到 EMA20 支撑位附近")
analysis.append(f" → 支撑位做多反弹EMA20: ${ema20_15m:.0f}")
analysis.append(f" → 用 limit 挂单入场止损1%目标2-3%,盈亏比 >= 1:1.5")
else:
analysis.append(f"15m: 上涨中,耐心等待回调机会")
analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → 不要追多,等待回调到支撑位")
elif intraday_trend == "下跌":
# 价格加速检查 - 强制观望,防止杀跌
if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5:
analysis.append(f"⚠️ 15m: 价格正在快速下跌!连续{big_moves}根大阴线,平均跌幅{avg_move:.2f}%")
analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → 🚨 **严禁杀跌!强制 HOLD 观望**,等待反弹后再考虑")
analysis.append(f" → 如果要入场,等待反弹到 EMA20 压力位用 limit 挂单")
analysis.append(f" → 杀跌是持续止损的主要原因!")
elif is_overextended:
analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → 不要追空,等待反弹")
elif is_accelerating and not is_overextended:
analysis.append(f"15m: 正在下跌中,建议等待反弹")
analysis.append(f" → 等待反弹到 EMA20 压力位用 limit 挂单做空")
analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
elif distance_to_ema20 < 1:
analysis.append(f"15m: 反弹到 EMA20 压力位附近")
analysis.append(f" → 压力位做空回调EMA20: ${ema20_15m:.0f}")
analysis.append(f" → 用 limit 挂单入场止损1%目标2-3%,盈亏比 >= 1:1.5")
else:
analysis.append(f"15m: 下跌中,耐心等待反弹机会")
analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → 不要追空,等待反弹到压力位")
else:
analysis.append(f"15m: 震荡,观望或双向轻仓")
analysis.append(f" → 支撑位多,压力位空,盈亏比 >= 1:1.5")
# 日内交易要点
analysis.append(f"\n💡 稳健交易要点:")
analysis.append(f"- **90%用limit挂单10%用market**:耐心等待回调,不要追涨杀跌")
analysis.append(f"- **价格加速时强制HOLD**:连续大阳/阴线时观望,等回调/反弹")
analysis.append(f"- **RSI极端区强制HOLD**>70或 <30时不入场")
analysis.append(f"- **偏离EMA5>1.5%强制HOLD**:价格过度延伸,等待回归")
analysis.append(f"- **盈亏比第一**: 必须 >= 1:1.5,否则不开仓")
analysis.append(f"- **快进快出**: 持仓不超过4小时")
analysis.append(f"- **止损设置**: 优先 1.5×ATR(30m),参考范围 0.8-2.5%")
analysis.append(f"- **目标盈利**: 2-3%")
analysis.append(f"- **宁可错过,不做错**: 追涨杀跌是持续止损的主要原因")
return "\n".join(analysis) if analysis else ""
except Exception as e:
logger.warning(f"趋势位置分析失败: {e}")
return ""
def _build_analysis_prompt(self, symbol: str, market_context: str,
news_context: str,
previous_signal: Dict[str, Any] = None,
futures_context: str = "") -> str:
"""构建分析提示词"""
prompt_parts = [
f"请分析 {symbol} 的市场情况:\n",
market_context,
"",
news_context
]
# 添加合约市场数据(资金费率等)
if futures_context:
prompt_parts.append("")
prompt_parts.append(futures_context)
# 添加历史信号上下文
if previous_signal:
prev_time = previous_signal.get('timestamp', 'Unknown')
prev_trend = previous_signal.get('trend', 'Unknown')
prev_signals = previous_signal.get('signals', [])
prompt_parts.append("\n" + "="*60)
prompt_parts.append("## 上一轮分析信号(必须参考!)")
prompt_parts.append("="*60)
prompt_parts.append(f"分析时间: {prev_time}")
prompt_parts.append(f"趋势判断: {prev_trend}")
if prev_signals:
prompt_parts.append("\n之前给出的信号:")
for i, sig in enumerate(prev_signals, 1):
action = sig.get('action', 'N/A')
confidence = sig.get('confidence', 0)
timeframe = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(timeframe, timeframe)
entry = sig.get('entry_price', 'N/A')
sl = sig.get('stop_loss', 'N/A')
tp = sig.get('take_profit', 'N/A')
reasoning = sig.get('reasoning', 'N/A')
prompt_parts.append(
f"\n[{i}] {type_text} | {action} | 信心度: {confidence}%\n"
f" 入场: ${entry}\n"
f" 止损: ${sl}\n"
f" 止盈: ${tp}\n"
f" 理由: {reasoning}"
)
# 重点警告
prompt_parts.append("\n" + "!"*60)
prompt_parts.append("🚨 严禁重复信号!")
prompt_parts.append("!"*60)
prompt_parts.append("如果上一轮已经给出了相同方向的信号(做空/做多),")
prompt_parts.append("且趋势没有发生明确反转,")
prompt_parts.append("绝对不要重复给出相同方向的信号!")
prompt_parts.append("")
prompt_parts.append("只有在以下情况才输出新信号:")
prompt_parts.append(" ✓ 趋势发生了明确的反转")
prompt_parts.append(" ✓ 上一轮是观望,现在出现了新的明确机会")
prompt_parts.append(" ✓ 价格已触及上一轮的止损/止盈价位")
prompt_parts.append("")
prompt_parts.append("以下情况不要输出信号:")
prompt_parts.append(" ✗ 趋势延续,只是价格继续向同一方向移动")
prompt_parts.append(" ✗ 仅仅因为均线排列仍然有效")
prompt_parts.append(" ✗ 没有明显的市场变化")
else:
prompt_parts.append("\n上一轮没有给出交易信号(市场观望建议)")
prompt_parts.append("\n你可以基于当前市场情况给出新的信号。")
prompt_parts.append("\n" + "="*60)
prompt_parts.append("\n请根据以上数据,给出你的市场判断和交易信号。")
return "\n".join(prompt_parts)
def _analyze_multi_timeframe_trend(self, data: Dict[str, Any]) -> str:
"""
多级别趋势分析 - 检测小级别反转信号
目的识别小级别15m/30m已经反转但大级别1h/4h还未反应的情况
这样可以提前捕捉反转信号,而不是等待均线系统确认
"""
context_parts = ["\n## 🔄 多级别趋势分析(检测反转信号)"]
# 定义各级别
timeframes = {
'5m': ('超短线', 5),
'15m': ('短线', 15),
'30m': ('日内', 30),
'1h': ('小时', 60),
'4h': ('趋势', 240)
}
trend_status = {} # 存储各级别趋势状态
# 分析各级别趋势
for tf, (tf_name, minutes) in timeframes.items():
df = data.get(tf)
if df is None or len(df) < 10:
continue
latest = df.iloc[-1]
prev = df.iloc[-2]
# 1. 均线趋势判断
ma5 = latest.get('ma5', 0)
ma10 = latest.get('ma10', 0)
ma20 = latest.get('ma20', 0)
ma_trend = None
if ma5 and ma10 and ma20:
if ma5 > ma10 > ma20:
ma_trend = 'bull'
elif ma5 < ma10 < ma20:
ma_trend = 'bear'
else:
ma_trend = 'neutral'
# 2. MACD 趋势判断
macd_trend = None
if 'macd' in df.columns and 'macd_signal' in df.columns:
macd = df['macd'].iloc[-1]
signal = df['macd_signal'].iloc[-1]
hist = df.get('macd_hist', pd.Series([0])).iloc[-1]
if macd > 0 and signal > 0:
macd_trend = 'bull'
elif macd < 0 and signal < 0:
macd_trend = 'bear'
else:
macd_trend = 'neutral'
# 3. 价格动量最近3根K线
close_3 = df['close'].iloc[-3]
close_2 = df['close'].iloc[-2]
close_1 = df['close'].iloc[-1]
price_momentum = 'up' if close_1 > close_3 else 'down' if close_1 < close_3 else 'flat'
# 综合判断趋势
if ma_trend == 'bull' and (macd_trend == 'bull' or price_momentum == 'up'):
trend = 'bull'
elif ma_trend == 'bear' and (macd_trend == 'bear' or price_momentum == 'down'):
trend = 'bear'
elif price_momentum == 'up' and macd_trend == 'bull':
trend = 'bull'
elif price_momentum == 'down' and macd_trend == 'bear':
trend = 'bear'
else:
trend = 'neutral'
trend_status[tf] = {
'name': tf_name,
'trend': trend,
'ma_trend': ma_trend,
'macd_trend': macd_trend,
'momentum': price_momentum,
'price': float(latest['close']),
'change_3': ((close_1 - close_3) / close_3 * 100) if close_3 > 0 else 0
}
# 生成多级别趋势报告
if not trend_status:
context_parts.append("⚠️ 数据不足,无法进行多级别分析")
return "\n".join(context_parts)
# 检测反转信号
reversal_signals = []
# 1. 小级别反转但大级别未反转
if ('15m' in trend_status and '1h' in trend_status and
trend_status['15m']['trend'] != trend_status['1h']['trend'] and
trend_status['15m']['trend'] != 'neutral'):
small_tf = trend_status['15m']
large_tf = trend_status['1h']
reversal_type = "🔄 反转信号" if large_tf['trend'] != 'neutral' else "⚡ 启动信号"
reversal_signals.append(
f"{reversal_type}: 15分钟[{small_tf['trend']}] vs 1小时[{large_tf['trend']}]"
)
reversal_signals.append(
f" 15分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}"
)
# 2. 30分钟反转但4小时未反转更强的反转信号
if ('30m' in trend_status and '4h' in trend_status and
trend_status['30m']['trend'] != trend_status['4h']['trend'] and
trend_status['30m']['trend'] != 'neutral'):
small_tf = trend_status['30m']
large_tf = trend_status['4h']
reversal_type = "🔄 强反转" if large_tf['trend'] != 'neutral' else "⚡ 趋势启动"
reversal_signals.append(
f"{reversal_type}: 30分钟[{small_tf['trend']}] vs 4小时[{large_tf['trend']}]"
)
reversal_signals.append(
f" 30分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}"
)
# 添加各级别趋势详情
context_parts.append("\n各级别趋势状态:")
for tf in ['5m', '15m', '30m', '1h', '4h']:
if tf in trend_status:
status = trend_status[tf]
trend_icon = {'bull': '📈', 'bear': '📉', 'neutral': '➡️'}.get(status['trend'], '')
context_parts.append(
f" {tf} ({status['name']}): {trend_icon} {status['trend']} "
f"| 动量: {status['change_3']:+.2f}% | 价格: ${status['price']:.2f}"
)
# 添加反转信号
if reversal_signals:
context_parts.append("\n⚠️ 检测到级别背离/反转信号:")
context_parts.extend(reversal_signals)
context_parts.append("\n💡 提示: 小级别已反转但大级别滞后,可考虑:")
context_parts.append(" - 反手操作(平掉旧仓位,开新方向仓位)")
context_parts.append(" - 顺势短线(跟随小级别趋势,快进快出)")
context_parts.append(" - 等待大级别确认(避免假突破)")
else:
context_parts.append("\n✅ 各级别趋势一致,无反转信号")
return "\n".join(context_parts)
def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]:
"""解析 LLM 响应"""
try:
# 尝试提取 JSON
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
if json_match:
json_str = json_match.group(1)
else:
json_match = re.search(r'\{[\s\S]*\}', response)
if json_match:
json_str = json_match.group(0)
else:
raise ValueError("无法找到 JSON 响应")
# 清理 JSON 字符串(移除可能导致解析错误的注释等)
json_str = self._clean_json_string(json_str)
logger.debug(f"解析的 JSON 字符串: {json_str[:500]}...") # 打印前500字符用于调试
result = json.loads(json_str)
# 清理价格字段 - 转换为 float
result = self._clean_price_fields(result)
# 添加元数据
result['symbol'] = symbol
result['timestamp'] = datetime.now().isoformat()
result['raw_response'] = response
# 兼容处理:确保 signals 中的字段与旧格式一致
if 'signals' in result:
for sig in result['signals']:
# LLM 输出的 "type" 是 timeframe (short_term/medium_term/long_term)
# 需要映射为 "timeframe",而 "action" 才是 buy/sell/wait
if 'type' in sig:
# 如果 type 是 short_term/medium_term/long_term映射为 timeframe
if sig['type'] in ['short_term', 'medium_term', 'long_term']:
sig['timeframe'] = sig.pop('type')
# 如果 type 是 buy/sell/wait映射为 action
elif sig['type'] in ['buy', 'sell', 'wait']:
sig['action'] = sig.pop('type')
# 确保 action 字段存在
if 'action' not in sig and 'timeframe' in sig:
# 从 reasoning 或其他字段推断 action
sig['action'] = 'wait'
# 确保 grade 字段存在
if 'grade' not in sig:
# 根据 confidence 推断 grade
confidence = sig.get('confidence', 0)
if confidence >= 80:
sig['grade'] = 'A'
elif confidence >= 60:
sig['grade'] = 'B'
elif confidence >= 40:
sig['grade'] = 'C'
else:
sig['grade'] = 'D'
# 处理趋势字段 - 优先使用 LLM 返回的趋势字段,否则从信号推断
if 'trend_direction' not in result or 'trend_strength' not in result:
# 从 signals 中推断趋势
if 'signals' in result and result['signals']:
best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0))
action = best_signal.get('action', 'wait')
confidence = best_signal.get('confidence', 0)
# 推断趋势方向(如果 LLM 没有提供)
if 'trend_direction' not in result:
if action == 'buy':
result['trend_direction'] = 'uptrend'
elif action == 'sell':
result['trend_direction'] = 'downtrend'
else:
result['trend_direction'] = 'neutral'
# 推断趋势强度(如果 LLM 没有提供)
if 'trend_strength' not in result:
result['trend_strength'] = 'strong' if confidence >= 70 else 'medium' if confidence >= 50 else 'weak'
# 从信号中推断 market_state用于向后兼容
if 'signals' in result and result['signals']:
best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0))
action = best_signal.get('action', 'wait')
confidence = best_signal.get('confidence', 0)
trend_direction = result.get('trend_direction', 'neutral')
# 推断市场状态
if confidence >= 70 and trend_direction != 'neutral':
if trend_direction == 'uptrend':
result['market_state'] = '强势上涨'
elif trend_direction == 'downtrend':
result['market_state'] = '强势下跌'
else:
result['market_state'] = '震荡整理'
else:
result['market_state'] = '震荡整理'
# 推断 trend用于向后兼容简化的趋势字段
if 'trend' not in result:
if trend_direction == 'uptrend':
result['trend'] = 'up'
elif trend_direction == 'downtrend':
result['trend'] = 'down'
else:
result['trend'] = 'sideways'
else:
result['market_state'] = '无明确信号'
if 'trend' not in result:
result['trend'] = 'sideways'
logger.info(f"✅ 市场信号分析完成: {symbol}")
logger.debug(f"市场信号: {json.dumps(result, ensure_ascii=False, indent=2)}")
return result
except Exception as e:
logger.warning(f"解析 LLM 响应失败: {e}")
logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符
return self._get_empty_signal(symbol)
def _clean_json_string(self, json_str: str) -> str:
"""清理 JSON 字符串,移除可能导致解析错误的内容"""
# 移除单行注释 // ...
json_str = re.sub(r'//.*?(?=\n|$)', '', json_str)
# 移除多行注释 /* ... */
json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str)
# 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
return json_str
def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""清理价格字段,转换为 float"""
def clean_price(price_value):
if price_value is None:
return None
if isinstance(price_value, (int, float)):
return float(price_value)
if isinstance(price_value, str):
# 移除 $ 符号和逗号
cleaned = price_value.replace('$', '').replace(',', '').strip()
if cleaned:
try:
return float(cleaned)
except ValueError:
return None
return None
# 清理 key_levels 中的支撑位和阻力位
if 'key_levels' in data and data['key_levels']:
key_levels = data['key_levels']
if 'support' in key_levels:
data['key_levels']['support'] = [clean_price(s) for s in key_levels['support']]
if 'resistance' in key_levels:
data['key_levels']['resistance'] = [clean_price(r) for r in key_levels['resistance']]
# 清理 signals 中的价格字段
if 'signals' in data:
# 标记需要移除的信号索引
signals_to_remove = []
for sig in data['signals']:
price_fields = ['entry_price', 'stop_loss', 'take_profit']
for field in price_fields:
if field in sig:
sig[field] = clean_price(sig[field])
# 验证止损止盈价格的合理性
entry_price = sig.get('entry_price')
stop_loss = sig.get('stop_loss')
take_profit = sig.get('take_profit')
action = sig.get('action', '')
if entry_price and entry_price > 0:
MAX_REASONABLE_DEVIATION = 0.50 # 50%
has_invalid_price = False
# 检查止损
if stop_loss is not None:
deviation = abs(stop_loss - entry_price) / entry_price
if deviation > MAX_REASONABLE_DEVIATION:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止损价格不合理: entry={entry_price}, stop_loss={stop_loss}, 偏离={deviation*100:.1f}%")
has_invalid_price = True
elif action == 'buy' and stop_loss >= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 < entry")
has_invalid_price = True
elif action == 'sell' and stop_loss <= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 > entry")
has_invalid_price = True
# 检查止盈
if take_profit is not None:
deviation = abs(take_profit - entry_price) / entry_price
if deviation > MAX_REASONABLE_DEVIATION:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止盈价格不合理: entry={entry_price}, take_profit={take_profit}, 偏离={deviation*100:.1f}%")
has_invalid_price = True
elif action == 'buy' and take_profit <= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止盈错误: entry={entry_price}, take_profit={take_profit} 应该 > entry")
has_invalid_price = True
elif action == 'sell' and take_profit >= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止盈错误: entry={entry_price}, take_profit={take_profit} 应该 < entry")
has_invalid_price = True
# 如果价格不合理,降低等级为 D 或移除信号
if has_invalid_price:
original_grade = sig.get('grade', 'C')
sig['grade'] = 'D'
sig['confidence'] = 0
# 添加错误说明
if 'reasoning' in sig:
sig['reasoning'] = f"[价格异常] {sig['reasoning']}"
logger.error(f"❌ [{data.get('symbol', '')}] 信号价格异常,等级从 {original_grade} 降为 D止损止盈已清空")
# 清空不合理的价格
sig['stop_loss'] = None
sig['take_profit'] = None
return data
def _calculate_price_change_24h(self, df) -> str:
"""计算24小时涨跌幅"""
try:
if df is None or len(df) < 24:
return "N/A"
current_price = float(df['close'].iloc[-1])
price_24h_ago = float(df['close'].iloc[-24])
change = ((current_price - price_24h_ago) / price_24h_ago) * 100
sign = "+" if change >= 0 else ""
return f"{sign}{change:.2f}%"
except Exception as e:
logger.debug(f"计算24h涨跌失败: {e}")
return "N/A"
def _get_empty_signal(self, symbol: str) -> Dict[str, Any]:
"""返回空信号"""
return {
'symbol': symbol,
'trend_direction': 'neutral',
'trend_strength': 'weak',
'analysis_summary': 'unknown',
'volume_analysis': '分析失败',
'news_sentiment': 'neutral',
'news_impact': '',
'market_state': '分析失败',
'trend': 'sideways',
'signals': [],
'key_levels': {},
'timestamp': datetime.now().isoformat(),
'error': '信号分析失败'
}
def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析波动率变化(使用 30m 作为日内主周期)"""
df = data.get('30m')
if df is None or len(df) < 24 or 'atr' not in df.columns:
return ""
lines = []
# ATR 变化趋势
recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根3小时
older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根
if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0:
return ""
atr_change = (recent_atr - older_atr) / older_atr * 100
current_atr = float(df['atr'].iloc[-1])
current_price = float(df['close'].iloc[-1])
atr_percent = current_atr / current_price * 100
lines.append(f"当前 ATR (30m): ${current_atr:.2f} ({atr_percent:.2f}%)")
if atr_change > 20:
lines.append(f"**波动率扩张**: ATR 上升 {atr_change:.0f}%,日内趋势可能启动")
elif atr_change < -20:
lines.append(f"**波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将变盘")
else:
lines.append(f"波动率稳定: ATR 变化 {atr_change:+.0f}%")
# 布林带宽度
if 'bb_upper' in df.columns and 'bb_lower' in df.columns:
bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100
bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100
if bb_width < bb_width_prev * 0.8:
lines.append(f"**布林带收口**: 宽度 {bb_width:.1f}%,变盘信号")
elif bb_width > bb_width_prev * 1.2:
lines.append(f"**布林带开口**: 宽度 {bb_width:.1f}%,趋势延续")
return "\n".join(lines)
def _detect_range_zone(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""
检测震荡区间 - 计算明确的支撑位和压力位
使用多种方法综合判断:
1. 价格通道最近N根K线的最高/最低价)
2. 成交量密集区Volume Profile
3. 布林带
4. EMA支撑/压力
"""
result = {
'is_ranging': False,
'support_level': None,
'resistance_level': None,
'range_width_pct': None,
'confidence': 0,
'volume_profile_support': None,
'volume_profile_resistance': None,
'analysis': ''
}
try:
df_30m = data.get('30m')
df_1h = data.get('1h')
df_15m = data.get('15m')
if df_30m is None or len(df_30m) < 48: # 需要至少48根K线24小时
return result
current_price = float(df_30m['close'].iloc[-1])
# ========== 1. 价格通道分析 ==========
# 使用最近24-48根K线12-24小时计算价格通道
lookback_periods = [24, 36, 48]
price_channels = []
for period in lookback_periods:
if len(df_30m) >= period:
period_data = df_30m.iloc[-period:]
high = period_data['high'].max()
low = period_data['low'].min()
price_channels.append({'high': high, 'low': low, 'width': high - low})
# 选择波动最稳定的通道(宽度变化最小的)
if price_channels:
avg_width = sum(pc['width'] for pc in price_channels) / len(price_channels)
selected_channel = min(price_channels,
key=lambda pc: abs(pc['width'] - avg_width))
support = selected_channel['low']
resistance = selected_channel['high']
range_width = resistance - support
range_width_pct = (range_width / current_price) * 100
# 震荡区间判断标准
# 1. 区间宽度 < 5%(震荡市)
# 2. 价格在区间中位数附近
# 3. EMA 纠缠
is_narrow_range = range_width_pct < 5.0
price_in_middle = (current_price - support) / range_width > 0.3 and \
(current_price - support) / range_width < 0.7
# EMA 纠缠检查
ema5 = df_30m['ma5'].iloc[-1] if 'ma5' in df_30m.columns else None
ema10 = df_30m['ma10'].iloc[-1] if 'ma10' in df_30m.columns else None
ema20 = df_30m['ma20'].iloc[-1] if 'ma20' in df_30m.columns else None
ema_entangled = False
if all([ema5, ema10, ema20]):
ema_spread = (max(ema5, ema10, ema20) - min(ema5, ema10, ema20)) / current_price * 100
ema_entangled = ema_spread < 1.0 # EMA 排列差距 < 1%
# ========== 2. 成交量密集区分析 ==========
volume_profile_support = None
volume_profile_resistance = None
if len(df_30m) >= 48:
# 找出成交量最大的价格区间
df_30m_copy = df_30m.iloc[-48:].copy()
df_30m_copy['avg_price'] = (df_30m_copy['high'] + df_30m_copy['low'] + df_30m_copy['close']) / 3
df_30m_copy['volume_weight'] = df_30m_copy['volume'] * df_30m_copy['avg_price']
# 按价格分层,找出高成交量区域
price_bins = pd.cut(df_30m_copy['avg_price'], bins=10)
volume_by_price = df_30m_copy.groupby(price_bins, observed=True)['volume'].sum()
if len(volume_by_price) > 0:
# 高成交量区作为支撑/压力
max_vol_bin = volume_by_price.idxmax()
if max_vol_bin is not None:
vp_level = (max_vol_bin.left + max_vol_bin.right) / 2
if vp_level < current_price * 0.98:
volume_profile_support = float(vp_level)
elif vp_level > current_price * 1.02:
volume_profile_resistance = float(vp_level)
# ========== 3. 布林带支撑/压力 ==========
bb_support = None
bb_resistance = None
if 'bb_lower' in df_30m.columns and 'bb_upper' in df_30m.columns:
bb_support = float(df_30m['bb_lower'].iloc[-1])
bb_resistance = float(df_30m['bb_upper'].iloc[-1])
# ========== 4. 关键价格点综合 ==========
# 综合多个指标得出最可靠的支撑/压力位
support_candidates = []
resistance_candidates = []
if support:
support_candidates.append(support)
if volume_profile_support:
support_candidates.append(volume_profile_support)
if bb_support:
support_candidates.append(bb_support)
if resistance:
resistance_candidates.append(resistance)
if volume_profile_resistance:
resistance_candidates.append(volume_profile_resistance)
if bb_resistance:
resistance_candidates.append(bb_resistance)
# 取中位数作为最终的支撑/压力位
final_support = np.median(support_candidates) if support_candidates else None
final_resistance = np.median(resistance_candidates) if resistance_candidates else None
# ========== 5. 计算置信度 ==========
confidence = 0
reasons = []
if is_narrow_range:
confidence += 30
reasons.append(f"区间窄({range_width_pct:.1f}%)")
if price_in_middle:
confidence += 20
reasons.append("价格在中部")
if ema_entangled:
confidence += 25
reasons.append("EMA纠缠")
# 成交量分布检查 - 如果成交量在区间两端较小,说明是有效震荡
if len(df_30m) >= 24:
recent_vol = df_30m['volume'].iloc[-12:].mean()
older_vol = df_30m['volume'].iloc[-24:-12].mean()
if abs(recent_vol - older_vol) / older_vol < 0.3:
confidence += 15
reasons.append("成交量平稳")
# 价格反弹次数 - 检查在支撑/压力位附近是否有多次反弹
if final_support and final_resistance:
bounce_count = 0
for i in range(-24, 0):
if i >= -len(df_30m):
row = df_30m.iloc[i]
# 检查是否在支撑位附近反弹
if abs(row['low'] - final_support) / final_support < 0.005 and row['close'] > row['open']:
bounce_count += 1
# 检查是否在压力位附近回落
if abs(row['high'] - final_resistance) / final_resistance < 0.005 and row['close'] < row['open']:
bounce_count += 1
if bounce_count >= 2:
confidence += 10
reasons.append(f"边界反弹{bounce_count}")
result.update({
'is_ranging': confidence >= 60,
'support_level': float(final_support) if final_support else None,
'resistance_level': float(final_resistance) if final_resistance else None,
'range_width_pct': range_width_pct,
'confidence': confidence,
'volume_profile_support': volume_profile_support,
'volume_profile_resistance': volume_profile_resistance,
'analysis': f"震荡判断: {confidence}% ({', '.join(reasons) if reasons else ''})"
})
except Exception as e:
logger.warning(f"震荡区间检测失败: {e}")
import traceback
logger.debug(traceback.format_exc())
return result
def _detect_trend_reversal(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""
检测趋势反转信号
综合多个指标判断趋势是否可能反转:
1. RSI 背离价格创新高但RSI不创新高 / 价格创新低但RSI不创新低
2. MACD 柱状图缩短/背离
3. 量价背离(价格上涨但成交量下降)
4. 关键K线形态吞没、锤子线、十字星等
5. 多周期趋势不一致
"""
result = {
'is_reversing': False,
'reversal_type': None, # 'bullish_reversal' or 'bearish_reversal'
'confidence': 0,
'signals': [],
'analysis': ''
}
try:
df_15m = data.get('15m')
df_30m = data.get('30m')
df_1h = data.get('1h')
if df_15m is None or len(df_15m) < 30:
return result
reversal_signals = []
bullish_signals = 0
bearish_signals = 0
# ========== 1. RSI 背离检测 ==========
if 'rsi' in df_15m.columns and len(df_15m) >= 20:
recent_5 = df_15m.iloc[-5:]
prev_5 = df_15m.iloc[-15:-10]
# 顶背离(看跌反转)
recent_high = recent_5['high'].max()
recent_rsi_at_high = recent_5.loc[recent_5['high'] == recent_high, 'rsi'].values[0]
prev_high = prev_5['high'].max()
prev_rsi_at_high = prev_5.loc[prev_5['high'] == prev_high, 'rsi'].values[0]
if recent_high > prev_high and recent_rsi_at_high < prev_rsi_at_high:
bearish_signals += 2
reversal_signals.append({
'type': 'rsi_divergence',
'direction': 'bearish',
'weight': 2,
'desc': 'RSI顶背离价格创新高但RSI不创新高'
})
# 底背离(看涨反转)
recent_low = recent_5['low'].min()
recent_rsi_at_low = recent_5.loc[recent_5['low'] == recent_low, 'rsi'].values[0]
prev_low = prev_5['low'].min()
prev_rsi_at_low = prev_5.loc[prev_5['low'] == prev_low, 'rsi'].values[0]
if recent_low < prev_low and recent_rsi_at_low > prev_rsi_at_low:
bullish_signals += 2
reversal_signals.append({
'type': 'rsi_divergence',
'direction': 'bullish',
'weight': 2,
'desc': 'RSI底背离价格创新低但RSI不创新低'
})
# ========== 2. MACD 柱状图分析 ==========
if 'macd_hist' in df_15m.columns and len(df_15m) >= 10:
hist_recent = df_15m['macd_hist'].iloc[-3:].values
hist_prev = df_15m['macd_hist'].iloc[-6:-3].values
# MACD 柱状图缩短 = 动能衰竭
if all(h > 0 for h in hist_prev): # 之前是正向
if hist_recent[2] < hist_recent[1] < hist_recent[0]: # 持续缩短
bearish_signals += 1
reversal_signals.append({
'type': 'macd_histogram',
'direction': 'bearish',
'weight': 1,
'desc': 'MACD柱状图持续缩短上涨动能衰竭'
})
if all(h < 0 for h in hist_prev): # 之前是负向
if hist_recent[2] > hist_recent[1] > hist_recent[0]: # 持续收窄
bullish_signals += 1
reversal_signals.append({
'type': 'macd_histogram',
'direction': 'bullish',
'weight': 1,
'desc': 'MACD柱状图持续收窄下跌动能衰竭'
})
# MACD 金叉/死叉
if len(df_15m) >= 2:
macd_current = df_15m['macd'].iloc[-1]
signal_current = df_15m['macd_signal'].iloc[-1]
macd_prev = df_15m['macd'].iloc[-2]
signal_prev = df_15m['macd_signal'].iloc[-2]
# 金叉
if macd_prev <= signal_prev and macd_current > signal_current:
bullish_signals += 1
reversal_signals.append({
'type': 'macd_cross',
'direction': 'bullish',
'weight': 1,
'desc': 'MACD金叉'
})
# 死叉
if macd_prev >= signal_prev and macd_current < signal_current:
bearish_signals += 1
reversal_signals.append({
'type': 'macd_cross',
'direction': 'bearish',
'weight': 1,
'desc': 'MACD死叉'
})
# ========== 3. 量价背离检测 ==========
if 'volume' in df_15m.columns and len(df_15m) >= 10:
recent_price_change = (df_15m['close'].iloc[-1] - df_15m['close'].iloc[-5]) / df_15m['close'].iloc[-5]
recent_volume = df_15m['volume'].iloc[-5:].mean()
older_volume = df_15m['volume'].iloc[-10:-5].mean()
# 价格上涨但成交量下降(量价背离)
if recent_price_change > 0.01 and recent_volume < older_volume * 0.8:
bearish_signals += 1
reversal_signals.append({
'type': 'volume_divergence',
'direction': 'bearish',
'weight': 1,
'desc': '量价背离:价格上涨但成交量萎缩'
})
# 价格下跌但成交量下降(可能见底)
if recent_price_change < -0.01 and recent_volume < older_volume * 0.7:
bullish_signals += 1
reversal_signals.append({
'type': 'volume_divergence',
'direction': 'bullish',
'weight': 1,
'desc': '下跌缩量:抛压枯竭,可能见底'
})
# ========== 4. 关键K线形态检测 ==========
if len(df_15m) >= 3:
latest = df_15m.iloc[-1]
prev = df_15m.iloc[-2]
# 吞没形态
open_latest, close_latest = latest['open'], latest['close']
open_prev, close_prev = prev['open'], prev['close']
# 阳包阴(看涨)
if (close_latest > open_latest and # 当前是阳线
close_prev < open_prev and # 前一个是阴线
open_latest <= close_prev and # 开盘价低于前一个收盘价
close_latest >= open_prev): # 收盘价高于前一个开盘价
bullish_signals += 2
reversal_signals.append({
'type': 'candlestick',
'direction': 'bullish',
'weight': 2,
'desc': '阳包阴吞没形态(强反转信号)'
})
# 阴包阳(看跌)
if (close_latest < open_latest and # 当前是阴线
close_prev > open_prev and # 前一个是阳线
open_latest >= close_prev and # 开盘价高于前一个收盘价
close_latest <= open_prev): # 收盘价低于前一个开盘价
bearish_signals += 2
reversal_signals.append({
'type': 'candlestick',
'direction': 'bearish',
'weight': 2,
'desc': '阴包阳吞没形态(强反转信号)'
})
# 锤子线/倒锤子
body_size = abs(close_latest - open_latest)
upper_shadow = df_15m['high'].iloc[-1] - max(open_latest, close_latest)
lower_shadow = min(open_latest, close_latest) - df_15m['low'].iloc[-1]
# 锤子线(看涨)
if lower_shadow >= body_size * 2 and upper_shadow < body_size * 0.5:
bullish_signals += 1
reversal_signals.append({
'type': 'candlestick',
'direction': 'bullish',
'weight': 1,
'desc': '锤子线(底部反转信号)'
})
# 倒锤子(看跌)
if upper_shadow >= body_size * 2 and lower_shadow < body_size * 0.5:
bearish_signals += 1
reversal_signals.append({
'type': 'candlestick',
'direction': 'bearish',
'weight': 1,
'desc': '倒锤子线(顶部反转信号)'
})
# ========== 5. 多周期趋势不一致 ==========
trend_15m = self._get_trend_direction(df_15m)
trend_30m = self._get_trend_direction(df_30m)
trend_1h = self._get_trend_direction(df_1h)
# 小周期反转但大周期未反应
if trend_15m and trend_1h and trend_15m != trend_1h:
if trend_15m == 'bull' and trend_1h == 'bear':
bullish_signals += 1
reversal_signals.append({
'type': 'timeframe_divergence',
'direction': 'bullish',
'weight': 1,
'desc': '15分钟转多但1小时仍看空潜在反转'
})
elif trend_15m == 'bear' and trend_1h == 'bull':
bearish_signals += 1
reversal_signals.append({
'type': 'timeframe_divergence',
'direction': 'bearish',
'weight': 1,
'desc': '15分钟转空但1小时仍看多潜在反转'
})
# ========== 计算反转信号强度 ==========
total_signals = len(reversal_signals)
if total_signals >= 3:
# 至少3个反转信号才认为可能反转
if bullish_signals >= bearish_signals + 2:
result['is_reversing'] = True
result['reversal_type'] = 'bullish_reversal'
result['confidence'] = min(90, bullish_signals * 15)
result['signals'] = [s for s in reversal_signals if s['direction'] == 'bullish']
elif bearish_signals >= bullish_signals + 2:
result['is_reversing'] = True
result['reversal_type'] = 'bearish_reversal'
result['confidence'] = min(90, bearish_signals * 15)
result['signals'] = [s for s in reversal_signals if s['direction'] == 'bearish']
if reversal_signals:
result['analysis'] = f"检测到 {len(reversal_signals)} 个反转信号"
else:
result['analysis'] = "无反转信号"
except Exception as e:
logger.warning(f"趋势反转检测失败: {e}")
import traceback
logger.debug(traceback.format_exc())
return result
def _get_trend_direction(self, df: pd.DataFrame) -> str:
"""获取趋势方向bull/bear/neutral"""
if df is None or len(df) < 10:
return 'neutral'
try:
# 使用EMA判断
ma5 = df['ma5'].iloc[-1] if 'ma5' in df.columns else None
ma10 = df['ma10'].iloc[-1] if 'ma10' in df.columns else None
ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else None
if ma5 and ma10 and ma20:
if ma5 > ma10 > ma20:
return 'bull'
elif ma5 < ma10 < ma20:
return 'bear'
# 使用MACD判断
if 'macd' in df.columns and 'macd_signal' in df.columns:
macd = df['macd'].iloc[-1]
signal = df['macd_signal'].iloc[-1]
if macd > signal and macd > 0:
return 'bull'
elif macd < signal and macd < 0:
return 'bear'
except Exception as e:
logger.debug(f"趋势方向判断失败: {e}")
return 'neutral'
def _detect_trend_stage(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""
检测趋势阶段:早期/中期/晚期
判断标准:
1. 早期:刚突破关键位,均线刚开始排列,动能开始释放
2. 中期:均线排列稳定,价格沿趋势移动,量能健康
3. 晚期价格过度延伸RSI极端区量价背离多次假突破
"""
result = {
'stage': 'unknown', # 'early', 'middle', 'late'
'confidence': 0,
'signals': [],
'analysis': ''
}
try:
df_30m = data.get('30m')
df_1h = data.get('1h')
if df_30m is None or len(df_30m) < 30:
return result
current_price = float(df_30m['close'].iloc[-1])
stage_signals = []
early_score = 0
middle_score = 0
late_score = 0
# ========== 1. EMA 排列状态 ==========
ema5 = df_30m['ma5'].iloc[-1] if 'ma5' in df_30m.columns else None
ema10 = df_30m['ma10'].iloc[-1] if 'ma10' in df_30m.columns else None
ema20 = df_30m['ma20'].iloc[-1] if 'ma20' in df_30m.columns else None
ema50 = df_30m['ma50'].iloc[-1] if 'ma50' in df_30m.columns else None
if all([ema5, ema10, ema20, ema50]):
# 检查EMA排列是否形成
if ema5 > ema10 > ema20 > ema50:
# 多头排列
# 检查排列刚刚形成(早期)还是已经稳定(中期/晚期)
ema5_cross_ma20 = False
if len(df_30m) >= 10:
# 检查最近10根内是否发生过金叉
for i in range(-10, 0):
if df_30m['ma5'].iloc[i] > df_30m['ma20'].iloc[i]:
if i > -10 and df_30m['ma5'].iloc[i-1] <= df_30m['ma20'].iloc[i-1]:
ema5_cross_ma20 = True
break
if ema5_cross_ma20:
early_score += 30
stage_signals.append("EMA排列刚形成早期")
else:
# 检查EMA间距
ema_spread = (ema5 - ema20) / ema20 * 100
if ema_spread > 3:
late_score += 20
stage_signals.append(f"EMA间距过大({ema_spread:.1f}%) - 可能过度延伸")
else:
middle_score += 20
stage_signals.append("EMA排列稳定中期")
elif ema5 < ema10 < ema20 < ema50:
# 空头排列
ema5_cross_ma20 = False
if len(df_30m) >= 10:
for i in range(-10, 0):
if df_30m['ma5'].iloc[i] < df_30m['ma20'].iloc[i]:
if i > -10 and df_30m['ma5'].iloc[i-1] >= df_30m['ma20'].iloc[i-1]:
ema5_cross_ma20 = True
break
if ema5_cross_ma20:
early_score += 30
stage_signals.append("EMA排列刚形成早期")
else:
ema_spread = (ema20 - ema5) / ema20 * 100
if ema_spread > 3:
late_score += 20
stage_signals.append(f"EMA间距过大({ema_spread:.1f}%) - 可能过度延伸")
else:
middle_score += 20
stage_signals.append("EMA排列稳定中期")
# ========== 2. RSI 状态 ==========
if 'rsi' in df_30m.columns:
rsi_current = df_30m['rsi'].iloc[-1]
rsi_prev = df_30m['rsi'].iloc[-5:-1].values
# RSI极端区 - 晚期信号
if rsi_current > 70:
late_score += 25
stage_signals.append(f"RSI超买({rsi_current:.0f}) - 趋势晚期")
elif rsi_current < 30:
late_score += 25
stage_signals.append(f"RSI超卖({rsi_current:.0f}) - 趋势晚期")
elif 50 <= rsi_current <= 65:
middle_score += 15
stage_signals.append(f"RSI健康({rsi_current:.0f}) - 趋势中期")
elif 40 <= rsi_current <= 60:
early_score += 10
stage_signals.append(f"RSI中性({rsi_current:.0f}) - 可能早期")
# RSI趋势检查
if len(rsi_prev) >= 3:
rsi_trend = "up" if rsi_current > rsi_prev[-1] else "down" if rsi_current < rsi_prev[-1] else "flat"
if rsi_trend == "flat":
late_score += 10
stage_signals.append("RSI走平 - 动能衰竭")
# ========== 3. 价格偏离度 ==========
if ema20:
deviation = abs(current_price - ema20) / ema20 * 100
if deviation > 5:
late_score += 30
stage_signals.append(f"价格偏离EMA20 {deviation:.1f}% - 过度延伸")
elif deviation > 3:
late_score += 15
stage_signals.append(f"价格偏离EMA20 {deviation:.1f}% - 警戒区域")
elif deviation < 1:
if early_score < middle_score: # 只在不是明显早期时加分
middle_score += 10
stage_signals.append("价格贴近EMA20 - 趋势稳固")
# ========== 4. 量价关系 ==========
if 'volume' in df_30m.columns and len(df_30m) >= 10:
recent_vol = df_30m['volume'].iloc[-5:].mean()
older_vol = df_30m['volume'].iloc[-10:-5].mean()
vol_change = (recent_vol - older_vol) / older_vol * 100
price_change_5 = (df_30m['close'].iloc[-1] - df_30m['close'].iloc[-5]) / df_30m['close'].iloc[-5] * 100
# 价格上涨但成交量下降(量价背离)- 晚期信号
if price_change_5 > 1 and vol_change < -20:
late_score += 20
stage_signals.append(f"量价背离(涨{price_change_5:.1f}%量减{vol_change:.0f}%- 可能见顶")
elif price_change_5 < -1 and vol_change < -20:
late_score += 20
stage_signals.append(f"量价背离(跌{price_change_5:.1f}%量减{vol_change:.0f}%- 可能见底")
elif price_change_5 > 1 and vol_change > 30:
early_score += 15
stage_signals.append(f"放量上涨(涨{price_change_5:.1f}%量增{vol_change:.0f}%- 可能早期")
elif price_change_5 < -1 and vol_change > 30:
early_score += 15
stage_signals.append(f"放量下跌(跌{price_change_5:.1f}%量增{vol_change:.0f}%- 可能早期")
# ========== 5. 波动率状态 ==========
if 'atr' in df_30m.columns and len(df_30m) >= 20:
recent_atr = df_30m['atr'].iloc[-5:].mean()
older_atr = df_30m['atr'].iloc[-15:-5].mean()
atr_change = (recent_atr - older_atr) / older_atr * 100 if older_atr > 0 else 0
if atr_change > 30:
early_score += 10
stage_signals.append(f"ATR扩张({atr_change:.0f}%) - 趋势启动")
elif atr_change < -30:
late_score += 10
stage_signals.append(f"ATR收缩({atr_change:.0f}%) - 动能衰竭")
# ========== 6. 连续同向K线数量 ==========
if len(df_30m) >= 5:
recent_closes = df_30m['close'].iloc[-5:].values
consecutive_up = sum(1 for i in range(1, len(recent_closes)) if recent_closes[i] > recent_closes[i-1])
consecutive_down = sum(1 for i in range(1, len(recent_closes)) if recent_closes[i] < recent_closes[i-1])
if consecutive_up >= 4:
late_score += 15
stage_signals.append(f"连续{consecutive_up}根阳线 - 可能过度")
elif consecutive_down >= 4:
late_score += 15
stage_signals.append(f"连续{consecutive_down}根阴线 - 可能过度")
# ========== 综合判断趋势阶段 ==========
scores = {
'early': early_score,
'middle': middle_score,
'late': late_score
}
max_score = max(scores.values())
if max_score < 20:
result['stage'] = 'unknown'
result['analysis'] = "趋势阶段不明确"
elif max_score == late_score and late_score >= 40:
result['stage'] = 'late'
result['confidence'] = min(95, late_score)
result['signals'] = stage_signals
result['analysis'] = f"⚠️ 趋势晚期({late_score}分)- " + "; ".join(stage_signals[:3])
elif max_score == early_score and early_score >= 30:
result['stage'] = 'early'
result['confidence'] = min(90, early_score)
result['signals'] = stage_signals
result['analysis'] = f"趋势早期({early_score}分)- " + "; ".join(stage_signals[:3])
else:
result['stage'] = 'middle'
result['confidence'] = min(85, middle_score)
result['signals'] = stage_signals
result['analysis'] = f"趋势中期({middle_score}分)- " + "; ".join(stage_signals[:3])
except Exception as e:
logger.warning(f"趋势阶段检测失败: {e}")
import traceback
logger.debug(traceback.format_exc())
return result