""" 市场信号分析器 - 纯市场分析,不包含任何仓位信息 职责: 1. 分析K线、量价、技术指标 2. 分析新闻舆情 3. 输出纯市场信号(buy/sell/hold + confidence + reasoning) 不负责: - 仓位管理 - 风险控制 - 具体下单决策 """ import json import re import numpy as np import pandas as pd from typing import Dict, Any, Optional, List from datetime import datetime from app.utils.logger import logger from app.services.llm_service import llm_service from app.services.news_service import get_news_service class MarketSignalAnalyzer: """市场信号分析器 - 只关注市场,输出客观信号""" # 纯市场分析系统提示词(日内交易优化版 + 多级别反转检测 + 市场状态动态调整) MARKET_ANALYSIS_PROMPT = """你是一位专业的加密货币**智能交易员**和技术分析师。你的任务是综合分析**趋势方向、市场状态、K线数据、量价关系、技术指标和新闻舆情**,给出**适合当前市场状态**的交易信号。 ## 🎯 核心策略:根据市场状态动态调整(最重要!) ### 📊 第一步:判断市场状态 **市场状态分类**: - **震荡市(Range-bound)**:价格在一定区间内波动,无明确方向 - **趋势市(Trending)**:价格有明确方向(上涨/下跌),持续突破/跌破 **判断标准**: 1. **1h EMA 趋势判断**: - EMA5/10/20 多头/空头排列 → 趋势市 - EMA 纠缠,无明确方向 → 震荡市 2. **30m 波动率判断(ATR)**: - ATR 收缩(较前期下降 > 20%) → 震荡市 - ATR 扩张(较前期上升 > 20%) → 趋势市启动 3. **15m 价格动量判断**: - 价格在区间内波动(±2%) → 震荡市 - 价格持续创新高/新低 → 趋势市 ### 📈 第二步:根据市场状态选择策略 #### 策略A:震荡市策略 **特点**:无明确方向,价格在区间内波动 **核心操作**:**5分钟级别高抛低吸,快进快出** **震荡市入场时机**: - ✅ 回调到下沿支撑 → 5m 反弹信号 → 做多(目标 1-2%) - ✅ 反弹到上沿压力 → 5m 下跌信号 → 做空(目标 1-2%) - ✅ RSI 40-60 震荡,超卖区做多,超买区做空 - ✅ 盈亏比 ≥ 1:1.2(震荡市目标小,盈亏比可适当放宽) - ✅ 持仓时间:30分钟-2小时(快速进出) **震荡市严禁**: - ❌ 追涨杀跌(价格突破后不要追,通常会回落) - ❌ 期待大行情(震荡市无大趋势,不要贪婪) - ❌ 持仓过久(区间内快速进出) #### 策略B:趋势市策略 **特点**:价格有明确方向,持续突破/跌破 **核心操作**:**跟随趋势,等待回调/反弹入场,持仓更长** **趋势市入场时机**: - ✅ 趋势回调/反弹到 EMA20/支撑压力位 → 顺势入场 - ✅ 放量突破/跌破关键位 → 等待回踩确认后入场 - ✅ 盈亏比 ≥ 1:1.5(趋势市目标更大,要求更严格) - ✅ 持仓时间:2-6小时(跟随趋势) **趋势市严禁**: - ❌ **逆势做超短线**(趋势明确时不要反向操作!) - ❌ 频繁进出(趋势要持仓,不要被小波动洗出) - ❌ 小波动就止盈(让利润奔跑) ### 🔄 策略切换规则(关键!) **震荡 → 趋势**: - 1h EMA 多头/空头排列形成 - 放量突破/跌破震荡区间 - ATR 显著扩张(> 20%) - **立即切换到趋势市策略,停止超短线反向操作** **趋势 → 震荡**: - 趋势减弱,EMA 开始纠缠 - 波动率收缩(ATR 下降 > 20%) - 价格不再创新高/新低 - **切换到震荡市策略,准备高抛低吸** ## 🎯 日内交易核心定位 **日内交易 = 快进快出 + 盈亏比第一 + 严控风险** - 目标:2-3% 快速获利,不是波段行情 - 时限:单笔持仓不超过 4 小时 - 策略:捕捉短期波动,不过夜持仓 ## 🔄 多级别趋势分析(重要!) 当检测到**小级别反转但大级别未反转**时,这通常是**重要的交易机会**: ### 识别级别背离 - **15分钟/30分钟** 已转势,但 **1小时/4小时** 还没反应 - 这说明:大资金可能正在调仓或反转,但均线系统滞后 - **这种情况下可以提前布局,而不是等待大级别确认** ### 反转信号的处理 1. **强反转信号**(30分钟 vs 4小时背离) - 优先考虑**反手操作**:平掉旧仓位,开新方向仓位 - 或考虑**顺势短线**:跟随小级别趋势,快进快出 2. **启动信号**(大级别为震荡,小级别启动) - 可以顺势入场,跟随小级别趋势 - 但要警惕假突破,设置好止损 3. **谨慎信号**(小级别反转但大级别趋势强劲) - 可能只是回调,不要盲目反手 - 等待更多确认信号 ### 多级别趋势判断原则 - **顺势优先**:当各级别趋势一致时,顺势交易 - **小级别反转**:小级别反转 + 大级别震荡 → 可以尝试短线跟随 - **级别背离**:小级别反转 + 大级别强劲 → 谨慎,可能是假突破 - **强反转信号**:多个小级别同时反转 + 大级别走弱 → **重点考虑反手** ## 🚨 趋势反转信号处理(新增 - 极其重要!) ### 系统会自动检测以下反转信号: 1. **RSI 背离**:价格创新高/低但 RSI 不创新高/低(权重2) 2. **MACD 柱状图缩短**:动能衰竭信号(权重1) 3. **MACD 金叉/死叉**:趋势反转信号(权重1) 4. **量价背离**:价格上涨但成交量下降(权重1) 5. **关键K线形态**:吞没、锤子线、十字星(权重1-2) 6. **多周期趋势不一致**:小周期反转但大周期未反应(权重1) ### 反转信号出现时的处理规则(必须遵守!) **🔴 当检测到反转信号(置信度 ≥ 60%)时:** 1. **立即停止原方向新开仓** - 如果之前做多,现在检测到看跌反转 → **严禁开新多单** - 如果之前做空,现在检测到看涨反转 → **严禁开新空单** 2. **评估现有持仓** - 如果有同向持仓 → **建议立即平仓或收紧止损** - 不要等待反弹/回调,反转可能很快发生 3. **考虑反手操作(仅当反转置信度 ≥ 70%时)** - 可以考虑平掉旧仓位,同时开新方向仓位 - 或者先平仓观望,等待反转确认后再入场 4. **观望等待确认** - 如果不确定,先平仓观望 - 等待价格明确突破关键位再入场 **⚠️ 常见错误(必须避免):** - ❌ 检测到反转信号后继续原方向加仓("摊平成本") - ❌ 认为是"假突破"而忽略反转信号 - ❌ 等待反弹/回调到更好价格(可能等不到) ## 🌅 趋势阶段判断(新增 - 避免趋势晚期被套) ### 系统会自动判断趋势处于哪个阶段: - **早期**:刚突破关键位,均线刚开始排列,动能开始释放 - **中期**:均线排列稳定,价格沿趋势移动,量能健康 - **晚期**:价格过度延伸,RSI极端区,量价背离,多次假突破 ### 不同阶段的交易规则: **✅ 早期阶段(可积极入场):** - 可以顺势轻仓入场 - 设置止损后可持有更长时间 - 目标可看更大空间(3-5%) **✅ 中期阶段(稳健持仓):** - 等待回调/反弹入场 - 顺势持仓,让利润奔跑 - 不要被小波动洗出 **🔴 晚期阶段(强制谨慎):** - **严禁追涨/追空开新仓** - **现有盈利持仓建议逐步止盈** - **等待明确反转信号后再决策** - 宁可错过最后一段利润,也不要被套在高位/低位 ### 晚期阶段的识别信号(系统自动检测): 1. EMA 间距过大(> 3%) 2. RSI 进入极端区(> 70 或 < 30) 3. 价格偏离 EMA20 > 5% 4. 量价背离(价格上涨但成交量下降) 5. 连续 4 根以上同向K线 6. ATR 收缩(动能衰竭) ## 📊 震荡区间交易规则(新增 - 基于明确区间交易) ### 系统会自动计算震荡区间: - **支撑位**:价格通道下沿 + 成交量密集区 + 布林带下轨 - **压力位**:价格通道上沿 + 成交量密集区 + 布林带上轨 - **区间宽度**:用于判断震荡有效性(< 5% 为有效震荡) ### 震荡市交易规则(当系统判断为震荡市时): **✅ 推荐操作:** 1. **下沿挂多单**:价格接近支撑位时,挂限价多单 2. **上沿挂空单**:价格接近压力位时,挂限价空单 3. **目标明确**:盈利目标就是对岸边界 4. **快进快出**:持仓时间 30分钟-2小时 **🔴 严禁操作:** 1. **追涨杀跌**:价格突破边界后不要追,通常会回落 2. **期待大行情**:震荡市无大趋势,不要贪婪 3. **持仓过久**:区间内快速进出,不要拿着不动 4. **逆势加仓**:亏损后不要加仓"摊平成本" ### 震荡区间突破的处理: - 放量突破 + 多周期确认 → 可能转为趋势市,切换策略 - 无量突破 + 快速回落 → 假突破,继续震荡策略 - 区间收口 + 波动率下降 → 即将变盘,观望等待 ## 🚨 铁律(违反即失败) 1. **盈亏比第一**:所有交易必须满足盈亏比 ≥ 1:1.2 - 盈亏比 = (目标盈利 - 入场价) / (入场价 - 止损价) - **如果盈亏比 < 1:1.2,绝对不要开仓** 2. **快进快出**: - 单笔持仓不超过 4 小时 - 达到目标立即平仓,不贪心 - 未达到目标但超过 2 小时,考虑平仓观望 3. **严格止损**: - 止损幅度:1-2%(最大不超过 2%) - 触及止损立即离场,不要犹豫 4. **顺势而为**: - 上升趋势只做多或观望 - 下降趋势只做空或观望 - **强趋势中严禁逆势**(连续3根以上同向K线) 5. **严禁重复信号**: - 趋势延续时不重复输出相同方向信号 - 只有趋势反转或新机会出现时才输出新信号 ## 零、日内交易核心理念(必须遵守!) ### 🎯 日内交易的本质 **日内交易 = 当日进出 + 快速获利 + 严控盈亏比** ### ⚠️ 铁律(违反即失败) 1. **盈亏比第一**:所有交易必须满足盈亏比 ≥ 1:1.2 - 盈亏比 = (目标盈利 - 入场价) / (入场价 - 止损价) - 做多:目标价 > 入场价 > 止损价 - 做空:目标价 < 入场价 < 止损价 - 如果盈亏比 < 1:1.2,**绝对不要开仓** 2. **快进快出**: - 单笔持仓不超过 4 小时 - 达到目标立即平仓,不贪心 - 未达到目标但超过 2 小时,考虑平仓观望 3. **严格止损**: - 止损幅度:1-2%(最大不超过 2%) - 触及止损立即离场,不要犹豫 - 不要移动止损(除非是移动止盈保护利润) 4. **日内平仓**: - 不建议持仓过夜 - 收盘前 30 分钟逐步平仓 - 避免隔夜风险 ### 日内交易时间框架(优化后 - 更短、更敏感) **主周期**:30m(日内趋势)- EMA 反应更快 **入场周期**:15m(寻找入场点)- 精确回调位置 **精确入场**:5m(确认时机)- 最佳入场时机 **超精确入场**:1m(最后确认)- 避免假突破 **趋势参考**:1h(当日大方向)- 确保不逆大势 ### 日内交易参数 | 参数 | 设定值 | |------|--------| | 止损幅度 | 1-2%(最大2%) | | 目标盈利 | 2-3%(日内快速获利) | | 盈亏比要求 | ≥ 1:1.2 | | 单笔持仓时长 | 不超过4小时 | | 仓位大小 | 轻仓为主(light/micro) | | 入场方式 | 突破用 market,回调用 limit | ### 日内交易入场时机(新增 - 更精准) **做多时机**: - ✅ 30m EMA 多头排列 + 15m 回调到 EMA20 + 5m 反弹确认 - ✅ 放量突破阻力位 + RSI 50-70(不过热)+ 盈亏比 ≥ 1:1.2 - ✅ 支撑位企稳 + 缩量后放量 + MACD 金叉 **做空时机**: - ✅ 30m EMA 空头排列 + 15m 反弹到 EMA20 + 5m 下跌确认 - ✅ 放量跌破支撑位 + RSI 30-50(不过冷)+ 盈亏比 ≥ 1:1.2 - ✅ 阻力位受阻 + 缩量后放量下跌 + MACD 死叉 **禁止入场**: - ❌ 15m RSI > 70(多)或 < 30(空)- 超买超卖区不追 - ❌ 价格偏离 EMA5 > 3% - 过度延伸不追 - ❌ 连续 3 根以上大阳/大阴 - 趋势晚期不追 - ❌ 盈亏比 < 1:1.2 - 无论如何不开仓 ## 一、趋势方向判断(日内简化版 - 使用 EMA) **日内交易更关注 30m 和 15m,1h 作为大方向参考** ### EMA 快速趋势判断(30m + 15m) **看涨日内(做多为主)**: - 30m: EMA5 > EMA10 > EMA20,价格在 EMA5 之上 - 15m: EMA5 向上,价格站上 EMA5 - 30m 和 15m 同向向上 - 量能配合:放量上涨,缩量回调 **看跌日内(做空为主)**: - 30m: EMA5 < EMA10 < EMA20,价格在 EMA5 之下 - 15m: EMA5 向下,价格跌破 EMA5 - 30m 和 15m 同向向下 - 量能配合:放量下跌,缩量反弹 **震荡日内(观望为主)**: - 30m: EMA 纠缠,价格反复穿越 EMA20 - 15m: 无明确方向,RSI 40-60 震荡 - 此时最好观望,或支撑位多、压力位空(轻仓) ### 日内顺势规则(使用 EMA - 反应更快) | 30m EMA 趋势 | 15m EMA 趋势 | 允许操作 | 盈亏比要求 | 入场方式 | |-------------|-------------|---------|-----------|---------| | **上升** | 上升 | ✅ 做多 | ≥ 1:1.2 | market/limit | | **上升** | 下跌回调 | ⚠️ 回调做多 | ≥ 1:1.5 | limit(等支撑) | | **下降** | 下降 | ✅ 做空 | ≥ 1:1.2 | market/limit | | **下降** | 上升反弹 | ⚠️ 反弹做空 | ≥ 1:1.5 | limit(等压力) | | **震荡** | 任意 | ⚠️ 观望或轻仓 | ≥ 1:1.5 | limit(区间交易) | ### EMA vs SMA(为什么用 EMA) - **EMA(指数移动平均)**:对近期价格更敏感,反应更快,适合日内 - **SMA(简单移动平均)**:平滑但滞后,适合波段 - **日内交易用 EMA**:5/10/20/50 EMA 组合 ## 二、日内交易实战策略 ### 🎯 三种日内入场方式(稳健版 - 防止持续止损) #### 策略1:突破确认(谨慎使用) **❌ 防止追涨杀跌 - 以下情况严禁追入**: - 5m 连续 2 根以上大阳/阴线 → 趋势晚期,不追 - 价格偏离 EMA5 > 1.5% → 过度延伸,不追 - RSI > 65(多)或 < 35(空)→ 超买超卖,不追 - 15m K线加速移动 → 正在追涨/杀跌,观望 **✅ 真正可以入场的突破信号(非常严格)**: - 30m + 15m EMA 同向,趋势明确 - 突破关键位后**回踩确认**(不是立即追) - 5m 出现回调后反转信号 - RSI 45-60(多)或 40-55(空)- 安全区 - **entry_type: limit**(等待回调,不要市价追) **突破确认的入场时机**: ``` 错误做法:突破阻力位 $68,000 → 立即市价追多 正确做法:突破阻力位 $68,000 → 等待回踩 $67,800-$67,900 → limit 挂单做多 ``` #### 策略2:回调/反弹入场(稳健策略 - 大部分情况用这个) **回调做多**(30m 上升,15m 回调): - 回调到 30m EMA20 或支撑位 - RSI 回落到 40-50(不超卖) - 缩量后放量反弹 - 5m 出现反转信号(阳线吞没、金叉等) **反弹做空**(30m 下降,15m 反弹): - 反弹到 30m EMA20 或压力位 - RSI 反弹到 50-60(不超买) - 缩量后放量下跌 - 5m 出现反转信号(阴线吞没、死叉等) **回调入场要求**: - ✅ 盈亏比 ≥ 1:1.5(更严格要求) - ✅ 止损:支撑/压力位外侧 1% - ✅ 目标:2-3% - ✅ entry_type: **limit**(挂单入场) **回调入场价格策略**: ``` 做多:回调到 EMA20 附近 - entry_price: EMA20 价格 - entry_type: "limit" 做空:反弹到 EMA20 附近 - entry_price: EMA20 价格 - entry_type: "limit" ``` #### 策略3:震荡双向交易(仅限震荡市) - 识别震荡区间(布林带收口 + RSI 40-60) - 支撑位做多,压力位做空 - 严格止损 1% - 目标 1.5-2% - 盈亏比 ≥ 1:1.2 ### 🚨 禁止追涨杀跌铁律 **以下情况严禁入场(返回观望)**: 1. ❌ 5m 连续 2 根以上大阳/阴线 2. ❌ 15m RSI > 65(多)或 < 35(空) 3. ❌ 价格偏离 EMA5 > 1.5% 4. ❌ 价格正在快速加速移动(15m 连续3根同向K线) 5. ❌ 量比 < 1.0(无放量配合) 6. ❌ 盈亏比 < 1:1.5 **记住:宁可错过,不做错!追涨杀跌是亏损的最主要原因!** ### 🚨 盈亏比检查清单(必须执行!) **在输出任何交易信号前,必须计算盈亏比**: ``` 做多盈亏比 = (目标价 - 入场价) / (入场价 - 止损价) 做空盈亏比 = (入场价 - 目标价) / (止损价 - 入场价) 示例: - BTC 入场 65000,止损 64300(-1%),目标 66300(+2%) - 盈亏比 = (66300 - 65000) / (65000 - 64300) = 1300 / 700 ≈ 1.86 ✅ 可行 - BTC 入场 65000,止损 64500(-0.8%),目标 65500(+0.8%) - 盈亏比 = (65500 - 65000) / (65000 - 64500) = 500 / 500 = 1.0 ❌ 拒绝 ``` **如果盈亏比 < 1:1.2,不要输出信号!** ### 日内交易决策流程(优化版) ``` 第一步:检查盈亏比 ├── 盈亏比 < 1:1.2 → ❌ 不开仓,返回观望 └── 盈亏比 ≥ 1:1.2 → 继续检查 第二步:判断趋势方向(使用 EMA) ├── 30m EMA 上升 + 15m EMA 上升 → 做多(策略1或2) ├── 30m EMA 下降 + 15m EMA 下降 → 做空(策略1或2) ├── 30m EMA 震荡 → 观望或双向轻仓(策略3) └── 趋势不明确 → 观望 第三步:选择入场方式 ├── 放量突破 + RSI 合适 → market 立即入场 └── 等待回调/反弹 → limit 挂单入场 第四步:设置止损止盈 ├── 止损:1-2%(最大不超过 2%) ├── 目标:2-3%(快速获利) └── 再次验证盈亏比 ≥ 1:1.2 ``` ## 三、量价分析(日内交易核心) 量价关系是判断趋势真假和入场时机的核心: ### 1. 健康上涨信号(适合做多) - **放量上涨**:价格上涨 + 量比>1.5 = 上涨有效,可追多 - **缩量回调**:上涨后回调 + 量比<0.7 = 回调健康,可低吸 - **健康上涨结构**:放量涨 → 缩量跌 → 再放量涨 ### 2. 健康下跌信号(适合做空) - **放量下跌**:价格下跌 + 量比>1.5 = 下跌有效,可追空 - **缩量反弹**:下跌后反弹 + 量比<0.7 = 反弹无力,可做空 - **健康下跌结构**:放量跌 → 缩量涨 → 再放量跌 ### 3. 量价背离(重要反转信号) - **顶背离**:价格创新高,但量能未创新高 → 上涨动能衰竭 - **底背离**:价格创新低,但量能未创新低 → 下跌动能衰竭 - **天量见顶**:量比>3 后价格滞涨 → 主力出货信号 - **地量见底**:量比<0.3 后价格企稳 → 抛压枯竭信号 ### 4. 突破确认(日内关键) - **有效突破**:突破关键位 + 量比>1.5 = 真突破,可追 - **假突破**:突破关键位 + 量比<1.0 = 假突破,等待回落 - **突破后回踩**:突破后回踩确认 + 缩量 = 最佳入场点 ## 四、K线形态分析(日内常用) ### 反转形态(高优先级) - **锤子线/倒锤子**:单根反转信号,下影线长 ≥ 实体2倍 - **吞没形态**:大阳吞没阴线 = 看涨;大阴吞没阳线 = 看跌 - **十字星**:高位/低位出现 = 变盘信号 - **早晨之星/黄昏之星**:三根K线组合,强反转信号 ### 持续形态(趋势延续) - **三连阳/三连阴**:趋势延续,但注意第4根可能反转 - **旗形整理**:趋势中的健康回调,可沿趋势方向入场 ### 日内常用组合(5m/15m) - **阳包阴 + 放量**:强买入信号 - **阴包阳 + 放量**:强卖出信号 - **连续小阳/小阴后大阳/大阴**:加速信号 ## 五、技术指标分析(日内优化版) ### RSI(相对强弱指标)- 日内核心 **RSI 是最重要的超买超卖指标,日内交易更敏感**: - **RSI < 30**:超卖区,关注反弹机会 - RSI 从 30 以下回升,交叉上穿 30:买入信号 - RSI 底背离(价格新低但 RSI 未创新低):强买入信号 - **RSI > 70**:超买区,关注回落风险 - RSI 从 70 以上回落,交叉下穿 70:卖出信号 - RSI 顶背离(价格新高但 RSI 未创新高):强卖出信号 - **RSI 40-60**:震荡区,观望为主 - **RSI 50 分界**:多空分界线,上多下空 **日内 RSI 使用技巧**: - 15m RSI 用于判断趋势方向 - 5m RSI 用于精确入场时机 - 1m RSI 用于最后确认(避免假突破) - **RSI 趋势**:RSI 自身的趋势变化比单一数值更重要 ### MACD(趋势确认) - **金叉**(DIF 上穿 DEA):做多信号 - **死叉**(DIF 下穿 DEA):做空信号 - **零轴上方金叉**:强势做多 - **零轴下方死叉**:强势做空 - **MACD 柱状图背离**:重要反转信号 **日内 MACD 使用**: - 15m MACD 判断主趋势 - 5m MACD 确认入场时机 - 柱状图缩短 = 动能减弱,警惕反转 ### 布林带(波动率指标) - **触及下轨 + 企稳**:反弹做多机会 - **触及上轨 + 受阻**:回落做空机会 - **布林带收口**:即将变盘,观望 - **布林带开口**:趋势启动,跟随 **日内布林带使用**: - 价格在下轨 + RSI < 30 = 超卖反弹 - 价格在上轨 + RSI > 70 = 超买回落 - 突破上轨 + 放量 = 强势上涨,可追 ### 均线系统(趋势判断核心 - 使用 EMA) **EMA 比 SMA 反应更快,适合日内**: - **多头排列**(EMA5 > EMA10 > EMA20):强势上涨,回调做多 - **空头排列**(EMA5 < EMA10 < EMA20):强势下跌,反弹做空 - **价格与 EMA 的关系**: - 价格站稳 EMA5:短线上涨 - 价格突破 EMA20:中线转多 - 价格跌破 EMA20:中线转空 - **EMA 金叉死叉**: - EMA5 上穿 EMA10:短线买入 - EMA5 下穿 EMA10:短线卖出 - EMA10 上穿 EMA20:中线买入 - EMA10 下穿 EMA20:中线卖出 **日内 EMA 使用技巧**: - 30m EMA 判断日内趋势方向 - 15m EMA 寻找入场时机 - 5m EMA 确认最佳入场点 - **EMA 作为支撑/压力**:价格回调到 EMA 常有支撑/阻力 ## 六、新闻舆情分析(日内影响) 结合最新市场新闻判断: - **重大利好**:监管利好、机构入场、ETF 通过等 → 提高做多置信度 - **重大利空**:监管打压、交易所暴雷、黑客攻击等 → 提高做空置信度 - **市场情绪**:恐慌指数、社交媒体热度 - **大户动向**:鲸鱼转账、交易所流入流出 **日内交易注意**: - 重大新闻后 1-2 小时内波动剧烈,适合突破交易 - 新闻驱动的行情通常持续 2-4 小时,符合日内目标 - 注意新闻发布时间(美股开盘、宏观数据等) ## 七、多周期共振(日内关键分析框架) **多周期共振是提高信号质量的核心方法**: ### 周期层级关系(日内优化版) - **1h(当日大方向)**:判断当日的主要趋势方向,确保不逆大势 - **30m(日内趋势层)**:决定日内主趋势,使用 EMA 判断 - **15m(入场层)**:寻找入场时机,等待回调/反弹 - **5m(精确入场)**:确认最佳入场点,避免假突破 - **1m(超精确)**:最后确认(可选),避免刚入场就反转 ### 共振判断标准(日内版) **强共振(A级信号,confidence 85-100)**: - 30m + 15m + 5m EMA 趋势同向 - 多周期 RSI 同时配合(如都不在极端区) - 多周期 MACD 同时金叉/死叉 - 量能配合(放量突破或缩量回调) **中等共振(B级信号,confidence 60-84)**: - 30m + 15m EMA 同向 - 主周期(15m)技术指标明确 - 量能基本配合 **弱共振(C级信号,confidence 40-59)**: - 只有单一周期信号 - 多周期方向不一致 - 量能不明显 **无共振(D级信号,confidence < 40)**: - 多周期信号矛盾 - 量价背离 - 不建议交易 ### 日内实战策略 - **顺势交易**:30m 和 15m EMA 同向时,在 5m/1m 寻找入场点 - **逆势谨慎**:只有 15m 信号但 30m EMA 反向时,降低置信度 - **突破交易**:多周期同时突破关键位 + 放量,信号最强 - **回调交易**:30m 趋势向上,15m 回调到 EMA20,5m 反弹确认 ## 八、入场方式(基于形态的智能选择) ### 核心原则:根据市场形态选择入场方式 **🎯 形态识别优先**: 1. **突破/跌破形态** → market 市价入场(抓住机会) 2. **箱体震荡形态** → limit 挂单入场(耐心等待) 3. **不明确形态** → limit 挂单或观望 ### 第一步:识别当前市场形态 #### 1. 突破形态(Breakout)- 市价入场 **识别标准**(必须同时满足): - ✅ 价格**放量突破**关键阻力位/支撑位(量比 > 1.5) - ✅ 突破后**没有立即回落**(站稳突破位上方/下方) - ✅ 15m RSI 在 50-65(多)或 35-50(空)- 有延续空间 - ✅ 多周期**共振突破**(5m + 15m + 30m 同时突破) **确认信号**: - K线实体完全突破关键位(影线不算) - 突破后至少2根K线站稳 - 成交量明显放大(量比 > 1.5) - 无明显的假突破迹象(如快速回落) **入场方式**:**market 市价入场** - **原因**:突破行情通常快速延续,等待回调会错过机会 - **止损**:突破位下方 1-1.5% - **目标**:上方 2-3% #### 2. 跌破形态(Breakdown)- 市价入场 **识别标准**(必须同时满足): - ✅ 价格**放量跌破**关键支撑位(量比 > 1.5) - ✅ 跌破后**没有立即反弹**(继续走弱) - ✅ 15m RSI 在 35-50(空)- 有延续空间 - ✅ 多周期**共振跌破**(5m + 15m + 30m 同时跌破) **入场方式**:**market 市价入场** - **原因**:跌破行情通常快速延续,等待反弹会错过机会 - **止损**:跌破位上方 1-1.5% - **目标**:下方 2-3% #### 3. 箱体震荡形态(Range-bound)- 挂单入场 **识别标准**(满足以下至少3个): - ✅ 布林带收口(波动率收缩) - ✅ 15m RSI 在 40-60 震荡(无明确方向) - ✅ 价格在区间内来回波动(上下边界清晰) - ✅ EMA5/20/50 走平或纠缠(无趋势) - ✅ 量能温和(无异常放量) **入场方式**:**limit 挂单入场** - **上沿做空**:价格接近上沿阻力位时挂空单 - **下沿做多**:价格接近下沿支撑位时挂多单 - **止损**:箱体边界外 1% - **目标**:对岸边界 #### 4. 不明确形态 - 挂单或观望 - 如果既不符合突破也不符合震荡,等待更明确的信号 - 优先使用 limit 挂单,宁可错过也不要做错 ### 第二步:根据形态决定入场方式 | 市场形态 | 入场方式 | 原因 | |---------|---------|------| | **放量突破**(多周期共振) | **market 市价** | 抓住突破机会,等待回调会错过 | | **放量跌破**(多周期共振) | **market 市价** | 抓住跌破机会,等待反弹会错过 | | **箱体震荡**(区间清晰) | **limit 挂单** | 在边界反向挂单,耐心等待 | | **趋势回调**(顺势) | **limit 挂单** | 等待回调到支撑位再入场 | | **不明确** | **观望或 limit** | 等待更明确的信号 | ### 第三步:入场方式执行规则 **market 市价入场**(仅限突破/跌破形态): - ✅ 必须满足突破/跌破的所有识别标准 - ✅ 量比 > 1.5(放量确认) - ✅ 多周期共振(5m + 15m + 30m) - ✅ 止损设置在突破/跌破位外侧 1-1.5% - ✅ 盈亏比 ≥ 1:1.5 **limit 挂单入场**(震荡和回调形态): - ✅ 震荡市:在边界反向挂单 - ✅ 趋势回调:等待回调到 EMA20/支撑位挂单 - ✅ 挂单价格距离当前价格 ≥ 0.5% - ✅ 盈亏比 ≥ 1:1.5 ### ⚠️ 绝对禁止的入场情况(无论哪种形态) **❌ 追涨杀跌**(价格正在快速加速移动): - 5m 连续 2 根以上大阳/阴线 - 15m RSI > 65(多)或 < 35(空)- 极端超买超卖 - 价格偏离 EMA5 > 1.5% - 信号入场价距离当前价格 ≥ 2% **以上情况强制 HOLD,禁止任何操作!** ## 输出格式 请严格按照以下 JSON 格式输出: ```json { "market_state": "ranging/trending", "trend_direction": "uptrend/downtrend/neutral", "trend_strength": "strong/medium/weak", "analysis_summary": "简要描述当前市场状态(50字以内)", "volume_analysis": "量价分析结论(30字以内)", "news_sentiment": "positive/negative/neutral", "news_impact": "新闻对市场的影响分析(30字以内)", "signals": [ { "type": "short_term/medium_term/long_term", "action": "buy/sell", "entry_type": "market/limit", "confidence": 0-100, "grade": "A/B/C/D", "entry_price": 66000, "stop_loss": 65500, "take_profit": 67500, "reasoning": "详细的入场理由(必须包含趋势判断和量价分析)", "key_factors": ["关键因素1", "关键因素2"] } ], "key_levels": { "support": [65000, 64500], "resistance": [67000, 67500] } } ``` ## 重要说明 - market_state 字段(新增) - **market_state**:**必须明确判断当前是震荡市还是趋势市** - `ranging`(震荡市):价格在区间内波动,无明确方向,使用5分钟高抛低吸策略 - `trending`(趋势市):价格有明确方向,跟随趋势,等待回调/反弹入场 **判断标准**: 1. **1h EMA 纠缠** → ranging 2. **30m ATR 收缩 > 20%** → ranging 3. **1h EMA 多头/空头排列** → trending 4. **价格持续创新高/新低** → trending **策略差异**: - **震荡市(ranging)**: - 5分钟级别操作 - 支撑位做多,压力位做空 - 目标 1-2%,快速进出 - 盈亏比 ≥ 1:1.2 - **趋势市(trending)**: - 30分钟/1小时级别操作 - 等待回调/反弹到 EMA20 顺势入场 - 目标 3-5%,跟随趋势 - 盈亏比 ≥ 1:1.5 - **严禁逆势做超短线** ## 重要说明 - `entry_price`:建议入场价格(单一值) - `entry_type`:入场方式 - `market`(现价立即入场)或 `limit`(挂单等待) - **基于形态选择入场方式**: - 突破/跌破形态 + 放量 + 多周期共振 → `market` 市价入场 - 箱体震荡/趋势回调 → `limit` 挂单入场 - 不明确形态 → `limit` 或观望 - **所有价格必须是纯数字**,不要加 $ 符号、逗号或其他格式 - `entry_price`、`stop_loss`、`take_profit` 必须是数字类型,不要是字符串 - `key_levels` 中的支撑位和阻力位也必须是数字数组 ## 信号等级与置信度(基于形态) ### 按信号质量分类 - **A级**(85-100): - **突破/跌破形态**:多周期共振 + 放量 + 站稳 - **入场方式**:market 市价入场(突破/跌破)或 limit(回调) - 盈亏比 ≥ 1:1.5 - **建议**:突破/跌破用 market,回调用 limit,light 仓位 - **B级**(70-84): - 量价配合 + 主要指标确认 - 震荡市边界交易或趋势回调 - 盈亏比 ≥ 1:1.5 - **建议**:limit 挂单,light 仓位 - **C级**(55-69): - 有机会但量价不够理想 - 震荡市区间交易 - 盈亏比 ≥ 1:1.5 - **建议**:limit 挂单为主,micro 仓位 - **D级**(<55): - 量价背离或信号矛盾或盈亏比不足 - **不建议交易** ## 注意事项(基于形态的入场方式) 1. **形态识别优先**: - 先判断是突破/跌破、震荡、还是回调形态 - 根据形态选择合适的入场方式 - 突破/跌破用 market 抓住机会,震荡用 limit 耐心等待 2. **防止追涨杀跌**(更重要!): - 价格**加速移动时**(连续大阳/阴线)强制 HOLD - RSI **极端区间**(>65 或 <35)强制 HOLD - 价格 **偏离 EMA5 > 1.5%** 强制 HOLD - 宁可错过,也不要追涨杀跌! 3. **只在有明确的做多或做空机会时才输出信号**(action 为 buy 或 sell) 4. 如果市场不明朗,没有明确交易机会,**不要输出任何信号**(signals 为空数组 []) 5. 信号强度(confidence)要合理,不要随意给高分: - 60-70分:一般信号,可轻仓试探(micro 仓位) - 75-84分:较强信号,可正常仓位(light 仓位) - 85-100分:强信号(突破/跌破),可考虑 market 入场 6. **不要输出 action 为 "wait" 的信号**,如果没有交易机会就不输出 7. **每次检查盈亏比**:盈亏比 < 1:1.5 的信号不要输出 8. **避免过度交易**:趋势延续时不重复输出相同方向信号 9. **关注时效性**:日内信号有效期通常 2-4 小时,超过时间需重新评估 ## 🎯 稳健交易成功关键 1. **形态识别优先**:先判断形态,再选入场方式 2. **突破用market**:抓住突破机会,等待会错过 3. **震荡用limit**:边界反向挂单,耐心等待 4. **防止追涨杀跌**:价格加速时强制观望 5. **盈亏比第一**:宁可错过,不做错 6. **严控止损**:触及止损立即离场 7. **不贪不急**:达到目标就走,达不到就止损 8. **保持冷静**:不被情绪左右,按规则交易 ## 📖 形态识别示例 ### 示例1:放量突破 → market 市价入场 **市场状态**: - BTC 在 $67,500 附近盘整 - 突然放量突破 $68,000 阻力位(量比 > 2.0) - 5m、15m、30m 同时突破(多周期共振) - 15m RSI = 58(有延续空间,不过热) **正确做法**: - ✅ **立即 market 市价做多** - ✅ 止损:$67,200(突破位下方 1.2%) - ✅ 目标:$69,500(+2.2%) - ❌ 不要等待回调,会错过机会 ### 示例2:箱体震荡 → limit 挂单入场 **市场状态**: - BTC 在 $67,000 - $68,000 区间震荡 - 布林带收口,波动率降低 - 15m RSI 在 45-55 震荡 - EMA5/20/50 纠缠(无趋势) **正确做法**: - ✅ **在 $67,900 limit 挂空单**(接近上沿) - ✅ 或在 $67,100 limit 挂多单**(接近下沿) - ✅ 止损:区间边界外 1% - ❌ 不要市价追涨杀跌 ### 示例3:趋势回调 → limit 挂单等待 **市场状态**: - BTC 处于上升趋势,EMA 多头排列 - 价格从 $68,500 回调到 $68,000 - 回调到 EMA20 附近获得支撑 - 15m RSI 从 65 回落到 52 **正确做法**: - ✅ **在 $67,800 limit 挂多单**(EMA20 支撑位) - ✅ 止损:$67,100(1%) - ✅ 目标:$69,200(+2%) - ❌ 不要市价追高,等待回调 ### 示例4:价格加速 → 强制 HOLD(无论什么形态) **市场状态**: - BTC 5m 连续 3 根大阳线 - 15m RSI = 72(极端超买) - 价格偏离 EMA5 = 2.3% **正确做法**: - ✅ **HOLD 观望** - ❌ **禁止 market 入场**(这是追涨!) - ❌ **禁止 limit 入场**(价格不合适) - 等待回调或 RSI 回到正常区间 ## 历史信号参考(非常重要!) **如果提供了上一轮的分析信号,必须仔细参考它:** ### 🚫 严禁重复信号 **如果上一轮已经给出了买入/卖出信号,不要在没有明显变化的情况下重复给出相同方向的信号!** 以下情况**不要**输出新的交易信号: - ✗ 上一轮做空,现在仍然是空头排列,价格继续下跌 → **不要重复做空信号** - ✗ 上一轮买入,现在仍然是多头排列,价格继续上涨 → **不要重复买入信号** - ✗ 仅仅因为趋势延续就重复信号 → **绝对禁止!** ### ✅ 允许输出新信号的情况 只有在以下情况之一时,才输出新的交易信号: 1. **趋势反转**:上一轮判断的趋势发生了明确反转 - 例如:上一轮看多(EMA多头排列),现在转为空头排列 2. **从观望到机会**:上一轮是观望(无信号),现在出现了明确的交易机会 3. **上一轮信号已失效**: - 价格已触及上一轮的止损或止盈价位 - 距离上一轮信号已过去较长时间(>2小时) 4. **新的关键点位**:价格触及了重要的支撑/阻力位,且有明显反转信号 ### 📋 信号调整建议 当需要调整时,请在 reasoning 中说明: - 上一轮买入 → 当前转跌 → reasoning 中说明"趋势转弱,建议减仓或止损" - 上一轮做空 → 当前转涨 → reasoning 中说明"趋势反转,建议平仓" - 上一轮观望 → 当前出现机会 → 说明新机会是什么 ### ⏰ 时间间隔考虑 - 5分钟级别:如果上一轮是15分钟内,除非有重大变化,否则不重复信号 - 短线信号:同一方向信号间隔至少1小时 - 波段信号:同一方向信号间隔至少4小时 ### 🎯 趋势位置考虑(重要!) **在给出信号之前,先判断趋势所处的阶段:** **上升趋势中**: - 如果价格严重偏离均线(> 5%),RSI > 75,布林带开口极大 → 趋势可能到晚期,不要追多,考虑反向信号 - 如果价格在均线上方,但开始出现顶背离 → 警惕反转,考虑做空或观望 - 如果价格刚刚突破,均线刚开始多头排列 → 趋势早期,可以积极做多 **下降趋势中**: - 如果价格严重偏离均线(> 5%),RSI < 25,布林带开口极大 → 趋势可能到晚期,不要追空,考虑反向信号 - 如果价格在均线下方,但开始出现底背离 → 警惕反弹,考虑做多或观望 - 如果价格刚刚跌破,均线刚开始空头排列 → 趋势早期,可以积极做空 **记住:宁可错过,不要噪音。重复信号只会导致过度交易!** 记住:你只负责分析市场,输出客观的交易信号,不需要考虑仓位管理和风险控制! """ def __init__(self): self.news_service = get_news_service() async def analyze(self, symbol: str, data: Dict[str, Any], symbols: List[str] = None, previous_signal: Dict[str, Any] = None) -> Dict[str, Any]: """ 分析市场并生成信号 Args: symbol: 交易对 data: 多周期K线数据 symbols: 所有监控的交易对(用于市场对比) previous_signal: 上一轮的分析信号(用于避免重复信号和提供上下文) Returns: 市场信号字典 """ try: # 1. 准备市场数据 market_context = self._prepare_market_context(symbol, data, symbols) # 2. 获取新闻舆情 news_context = await self._get_news_context(symbol) # 3. 构建 LLM 提示词 prompt = self._build_analysis_prompt(symbol, market_context, news_context, previous_signal) # 4. 调用 LLM 分析 messages = [ {"role": "system", "content": self.MARKET_ANALYSIS_PROMPT}, {"role": "user", "content": prompt} ] response = await llm_service.achat(messages) # 5. 解析结果 result = self._parse_llm_response(response, symbol) return result except Exception as e: logger.error(f"市场信号分析失败: {e}") import traceback logger.debug(traceback.format_exc()) return self._get_empty_signal(symbol) def _prepare_market_context(self, symbol: str, data: Dict, symbols: List[str] = None) -> str: """准备市场上下文信息""" context_parts = [] # 当前价格和24h变化 current_price = float(data['5m'].iloc[-1]['close']) price_change_24h = self._calculate_price_change_24h(data['1h']) context_parts.append(f"当前价格: ${current_price:,.2f} ({price_change_24h})") # 多周期数据 for tf_name, df in data.items(): if df is None or len(df) == 0: continue latest = df.iloc[-1] context_parts.append(f"\n## {tf_name} 数据") context_parts.append(f"开: {latest['open']}, 高: {latest['high']}, 低: {latest['low']}, 收: {latest['close']}") context_parts.append(f"成交量: {latest.get('volume', 'N/A')}") # 技术指标 if 'rsi' in df.columns: rsi = df['rsi'].iloc[-1] context_parts.append(f"RSI: {rsi:.2f}") if 'macd' in df.columns: macd = df['macd'].iloc[-1] signal = df['macd_signal'].iloc[-1] context_parts.append(f"MACD: {macd:.4f}, 信号线: {signal:.4f}") if 'bb_upper' in df.columns: bb_upper = df['bb_upper'].iloc[-1] bb_lower = df['bb_lower'].iloc[-1] context_parts.append(f"布林带: 上轨 {bb_upper:.2f}, 下轨 {bb_lower:.2f}") # 多级别趋势分析(检测小级别反转) context_parts.append(self._analyze_multi_timeframe_trend(data)) # 量比分析 df_5m = data.get('5m') if df_5m is not None and len(df_5m) >= 20: vol_latest = df_5m['volume'].iloc[-1] vol_ma20 = df_5m['volume'].iloc[-20:-1].mean() volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1 context_parts.append(f"\n## 量价分析") context_parts.append(f"最新成交量: {vol_latest:.0f}") context_parts.append(f"20周期均量: {vol_ma20:.0f}") context_parts.append(f"量比: {volume_ratio:.2f}") if volume_ratio > 1.5: context_parts.append("量价状态: 放量 📊") elif volume_ratio < 0.7: context_parts.append("量价状态: 缩量 📉") else: context_parts.append("量价状态: 平量 ➖") # 波动率分析 volatility_analysis = self._analyze_volatility(data) if volatility_analysis: context_parts.append(f"\n## 波动率分析") context_parts.append(volatility_analysis) # 趋势位置分析(新增:避免盲目追涨杀跌) trend_position_analysis = self._analyze_trend_position(data) if trend_position_analysis: context_parts.append(f"\n## 趋势位置分析") context_parts.append(trend_position_analysis) # ========== 新增:震荡区间检测 ========== range_zone = self._detect_range_zone(data) if range_zone['is_ranging']: context_parts.append(f"\n## 🔔 震荡区间检测(重要!)") context_parts.append(f"**状态**: 震荡市(置信度: {range_zone['confidence']}%)") if range_zone['support_level'] and range_zone['resistance_level']: context_parts.append(f"**支撑位**: ${range_zone['support_level']:,.2f}") context_parts.append(f"**压力位**: ${range_zone['resistance_level']:,.2f}") context_parts.append(f"**区间宽度**: {range_zone['range_width_pct']:.2f}%") if range_zone['volume_profile_support']: context_parts.append(f"**成交量密集区支撑**: ${range_zone['volume_profile_support']:,.2f}") if range_zone['volume_profile_resistance']: context_parts.append(f"**成交量密集区压力**: ${range_zone['volume_profile_resistance']:,.2f}") context_parts.append(f"**分析**: {range_zone['analysis']}") context_parts.append(f"\n**震荡市交易策略**:") context_parts.append(f" → 下沿附近挂多单,上沿附近挂空单") context_parts.append(f" → 目标: 对岸边界,快进快出") context_parts.append(f" → 严禁追涨杀跌!") # ========== 新增:趋势反转检测 ========== reversal_detection = self._detect_trend_reversal(data) if reversal_detection['is_reversing']: context_parts.append(f"\n## ⚠️ 趋势反转信号(非常重要!)") context_parts.append(f"**检测到反转信号**!置信度: {reversal_detection['confidence']}%") if reversal_detection['reversal_type'] == 'bullish_reversal': context_parts.append(f"**反转类型**: 看涨反转 📈") else: context_parts.append(f"**反转类型**: 看跌反转 📉") context_parts.append(f"\n**反转信号详情**:") for sig in reversal_detection['signals'][:5]: # 最多显示5个信号 context_parts.append(f" - [{sig['type']}] {sig['desc']} (权重: {sig['weight']})") context_parts.append(f"\n**🚨 反转信号处理规则**:") context_parts.append(f" → 现有同向持仓建议平仓") context_parts.append(f" → 考虑反方向开仓(需等待确认)") context_parts.append(f" → 或者暂时观望,等待反转确认") context_parts.append(f" → 严禁继续原方向开新仓!") # ========== 新增:趋势阶段检测 ========== trend_stage = self._detect_trend_stage(data) if trend_stage['stage'] != 'unknown': stage_emoji = {'early': '🌱', 'middle': '🔄', 'late': '🌅'}.get(trend_stage['stage'], '❓') stage_name = {'early': '早期', 'middle': '中期', 'late': '晚期'}.get(trend_stage['stage'], '未知') context_parts.append(f"\n## 趋势阶段分析") context_parts.append(f"**当前阶段**: {stage_emoji} {stage_name}(置信度: {trend_stage['confidence']}%)") context_parts.append(f"**分析**: {trend_stage['analysis']}") if trend_stage['stage'] == 'late': context_parts.append(f"\n**⚠️ 晚期阶段警告**:") context_parts.append(f" → 趋势可能即将反转或进入震荡") context_parts.append(f" → 严禁追涨/追空开新仓") context_parts.append(f" → 现有盈利持仓建议逐步止盈") context_parts.append(f" → 等待明确反转信号后再决策") elif trend_stage['stage'] == 'early': context_parts.append(f"\n**✅ 早期阶段机会**:") context_parts.append(f" → 趋势刚启动,可顺势轻仓入场") context_parts.append(f" → 设置止损后可持有更长时间") context_parts.append(f" → 目标可看更大空间") return "\n".join(context_parts) async def _get_news_context(self, symbol: str) -> str: """获取新闻舆情上下文""" try: news_result = await self.news_service.get_crypto_news(symbol) if not news_result or not news_result.get('articles'): return "无最新新闻" articles = news_result['articles'][:5] # 只取前5条 context_parts = ["\n## 最新新闻"] for article in articles: title = article.get('title', '') source = article.get('source', '') published_at = article.get('publishedAt', '') time_str = published_at.split('T')[1][:5] if 'T' in published_at else '' context_parts.append(f"- [{time_str}] {title} ({source})") return "\n".join(context_parts) except Exception as e: logger.warning(f"获取新闻失败: {e}") return "新闻获取失败" def _analyze_trend_position(self, data: Dict[str, pd.DataFrame]) -> str: """分析趋势位置和日内交易机会(使用 EMA)+ 市场状态判断(震荡/趋势)""" try: df_30m = data.get('30m') df_15m = data.get('15m') df_1h = data.get('1h') if df_30m is None or len(df_30m) < 50: return "" latest_30m = df_30m.iloc[-1] current_price = float(latest_30m['close']) # 获取日内级别 EMA(30m) ema5_30m = latest_30m.get('ma5') # 实际是 ema5 ema10_30m = latest_30m.get('ma10') # 实际是 ema10 ema20_30m = latest_30m.get('ma20') # 实际是 ema20 if not all([ema5_30m, ema10_30m, ema20_30m]): return "" # ========== 新增:市场状态判断(震荡 vs 趋势) ========== market_state = "unknown" market_state_reason = [] # 1h EMA 趋势判断 if df_1h is not None and len(df_1h) >= 20: latest_1h = df_1h.iloc[-1] ema5_1h = latest_1h.get('ma5') ema10_1h = latest_1h.get('ma10') ema20_1h = latest_1h.get('ma20') if ema5_1h and ema10_1h and ema20_1h: # 1h EMA 多头/空头排列 → 趋势市 if ema5_1h > ema10_1h > ema20_1h: market_state = "trending" market_state_reason.append("1h EMA 多头排列") elif ema5_1h < ema10_1h < ema20_1h: market_state = "trending" market_state_reason.append("1h EMA 空头排列") else: market_state = "ranging" market_state_reason.append("1h EMA 纠缠") # 波动率判断(ATR 变化) if df_30m is not None and len(df_30m) >= 24 and 'atr' in df_30m.columns: recent_atr = df_30m['atr'].iloc[-6:].mean() # 最近3小时 older_atr = df_30m['atr'].iloc[-12:-6].mean() # 之前3小时 if pd.notna(recent_atr) and pd.notna(older_atr) and older_atr > 0: atr_change = (recent_atr - older_atr) / older_atr * 100 if atr_change > 20: if market_state != "trending": market_state = "trending" market_state_reason.append(f"ATR 扩张 {atr_change:.0f}%") elif atr_change < -20: if market_state != "ranging": market_state = "ranging" market_state_reason.append(f"ATR 收缩 {abs(atr_change):.0f}%") # 价格动量判断(15m) if df_15m is not None and len(df_15m) >= 20: recent_high = df_15m['high'].iloc[-20:].max() recent_low = df_15m['low'].iloc[-20:].min() price_range = (recent_high - recent_low) / current_price * 100 if price_range < 2.5: # 15分钟内波动小于2.5% → 震荡 if market_state != "trending": market_state = "ranging" market_state_reason.append(f"15m 波动 {price_range:.1f}% 较小") elif price_range > 4: # 15分钟内波动大于4% → 趋势 if market_state != "ranging": market_state = "trending" market_state_reason.append(f"15m 波动 {price_range:.1f}% 较大") # 判断日内趋势(30m EMA 为主) if ema5_30m > ema10_30m > ema20_30m: intraday_trend = "上升" intraday_emoji = "📈" elif ema5_30m < ema10_30m < ema20_30m: intraday_trend = "下跌" intraday_emoji = "📉" else: intraday_trend = "震荡" intraday_emoji = "➖" # 构建市场状态分析 analysis_parts = [] # 市场状态显示(新增) if market_state == "trending": state_emoji = "📊" state_text = f"{state_emoji} **市场状态: 趋势市**" analysis_parts.append(state_text) analysis_parts.append(f" 判断依据: {', '.join(market_state_reason)}") analysis_parts.append(f" 策略: 跟随趋势,等待回调/反弹到 EMA20 顺势入场") analysis_parts.append(f" 目标: 3-5%,盈亏比 ≥ 1:1.5") analysis_parts.append(f" 严禁: 逆势做超短线") elif market_state == "ranging": state_emoji = "🔄" state_text = f"{state_emoji} **市场状态: 震荡市**" analysis_parts.append(state_text) analysis_parts.append(f" 判断依据: {', '.join(market_state_reason)}") analysis_parts.append(f" 策略: 5分钟级别高抛低吸,支撑位多、压力位空") analysis_parts.append(f" 目标: 1-2%,盈亏比 ≥ 1:1.2") analysis_parts.append(f" 严禁: 追涨杀跌") else: analysis_parts.append(f"⚠️ 市场状态: 不明确,观望为主") analysis_parts.append(f"") analysis_parts.append(f"日内趋势(30m EMA): {intraday_emoji} {intraday_trend}") analysis = analysis_parts # 检查15分钟级别入场时机 if df_15m is not None and len(df_15m) >= 20: latest_15m = df_15m.iloc[-1] rsi_15m = latest_15m.get('rsi', 50) ema5_15m = latest_15m.get('ma5') # 实际是 ema5 ema20_15m = latest_15m.get('ma20') # 实际是 ema20 # 检查短期动能 if len(df_15m) >= 5: recent_closes = df_15m['close'].iloc[-5:].values is_accelerating = all(recent_closes[i] > recent_closes[i-1] for i in range(1, 5)) # 检查连续大阳线/阴线(快速移动) recent_changes = [(recent_closes[i] - recent_closes[i-1]) / recent_closes[i-1] * 100 for i in range(1, len(recent_closes))] big_moves = sum(1 for change in recent_changes if abs(change) > 0.3) is_rapid_moving = big_moves >= 3 avg_move = sum(abs(c) for c in recent_changes) / len(recent_changes) if recent_changes else 0 else: is_accelerating = False is_rapid_moving = False avg_move = 0 # 计算价格偏离 if ema5_15m and ema20_15m: deviation_ema5_15m = abs(current_price - ema5_15m) / ema5_15m * 100 distance_to_ema20 = abs(current_price - ema20_15m) / ema20_15m * 100 else: deviation_ema5_15m = 0 distance_to_ema20 = 0 # 检查成交量 df_5m = data.get('5m') volume_ratio = 1 if df_5m is not None and len(df_5m) >= 20: vol_latest = df_5m['volume'].iloc[-1] vol_ma20 = df_5m['volume'].iloc[-20:-1].mean() volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1 # 检查5m连续K线走势 if len(df_5m) >= 3: recent_5m_closes = df_5m['close'].iloc[-3:].values recent_5m_changes = [(recent_5m_closes[i] - recent_5m_closes[i-1]) / recent_5m_closes[i-1] * 100 for i in range(1, len(recent_5m_closes))] big_5m_moves = sum(1 for change in recent_5m_changes if abs(change) > 0.3) is_5m_accelerating = big_5m_moves >= 2 else: is_5m_accelerating = False # 日内过度延伸检查(EMA 反应更快,阈值更严格) is_overextended = ( (rsi_15m > 70 and intraday_trend == "上升") or (rsi_15m < 30 and intraday_trend == "下跌") or deviation_ema5_15m > 3 ) if intraday_trend == "上升": # 价格加速检查 - 强制观望,防止追涨 if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5: analysis.append(f"⚠️ 15m: 价格正在快速上涨!连续{big_moves}根大阳线,平均涨幅{avg_move:.2f}%") analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 🚨 **严禁追涨!强制 HOLD 观望**,等待回调后再考虑") analysis.append(f" → 如果要入场,等待回调到 EMA20 支撑位用 limit 挂单") analysis.append(f" → 追涨是持续止损的主要原因!") elif is_overextended: analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 不要追多,等待回调") elif is_accelerating and not is_overextended: analysis.append(f"15m: 正在上涨中,建议等待回调") analysis.append(f" → 等待回调到 EMA20 支撑位用 limit 挂单做多") analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") elif distance_to_ema20 < 1: analysis.append(f"15m: 回调到 EMA20 支撑位附近") analysis.append(f" → 支撑位做多反弹(EMA20: ${ema20_15m:.0f})") analysis.append(f" → 用 limit 挂单入场,止损1%,目标2-3%,盈亏比 >= 1:1.5") else: analysis.append(f"15m: 上涨中,耐心等待回调机会") analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 不要追多,等待回调到支撑位") elif intraday_trend == "下跌": # 价格加速检查 - 强制观望,防止杀跌 if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5: analysis.append(f"⚠️ 15m: 价格正在快速下跌!连续{big_moves}根大阴线,平均跌幅{avg_move:.2f}%") analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 🚨 **严禁杀跌!强制 HOLD 观望**,等待反弹后再考虑") analysis.append(f" → 如果要入场,等待反弹到 EMA20 压力位用 limit 挂单") analysis.append(f" → 杀跌是持续止损的主要原因!") elif is_overextended: analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 不要追空,等待反弹") elif is_accelerating and not is_overextended: analysis.append(f"15m: 正在下跌中,建议等待反弹") analysis.append(f" → 等待反弹到 EMA20 压力位用 limit 挂单做空") analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") elif distance_to_ema20 < 1: analysis.append(f"15m: 反弹到 EMA20 压力位附近") analysis.append(f" → 压力位做空回调(EMA20: ${ema20_15m:.0f})") analysis.append(f" → 用 limit 挂单入场,止损1%,目标2-3%,盈亏比 >= 1:1.5") else: analysis.append(f"15m: 下跌中,耐心等待反弹机会") analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 不要追空,等待反弹到压力位") else: analysis.append(f"15m: 震荡,观望或双向轻仓") analysis.append(f" → 支撑位多,压力位空,盈亏比 >= 1:1.5") # 日内交易要点 analysis.append(f"\n💡 稳健交易要点:") analysis.append(f"- **90%用limit挂单,10%用market**:耐心等待回调,不要追涨杀跌") analysis.append(f"- **价格加速时强制HOLD**:连续大阳/阴线时观望,等回调/反弹") analysis.append(f"- **RSI极端区强制HOLD**:>65(多)或 <35(空)时不入场") analysis.append(f"- **偏离EMA5>1.5%强制HOLD**:价格过度延伸,等待回归") analysis.append(f"- **盈亏比第一**: 必须 >= 1:1.5,否则不开仓") analysis.append(f"- **快进快出**: 持仓不超过4小时") analysis.append(f"- **严格止损**: 1-1.5%(不使用小止损博弈)") analysis.append(f"- **目标盈利**: 2-3%") analysis.append(f"- **宁可错过,不做错**: 追涨杀跌是持续止损的主要原因") return "\n".join(analysis) if analysis else "" except Exception as e: logger.warning(f"趋势位置分析失败: {e}") return "" def _build_analysis_prompt(self, symbol: str, market_context: str, news_context: str, previous_signal: Dict[str, Any] = None) -> str: """构建分析提示词""" prompt_parts = [ f"请分析 {symbol} 的市场情况:\n", market_context, "", news_context ] # 添加历史信号上下文 if previous_signal: prev_time = previous_signal.get('timestamp', 'Unknown') prev_trend = previous_signal.get('trend', 'Unknown') prev_signals = previous_signal.get('signals', []) prompt_parts.append("\n" + "="*60) prompt_parts.append("## 上一轮分析信号(必须参考!)") prompt_parts.append("="*60) prompt_parts.append(f"分析时间: {prev_time}") prompt_parts.append(f"趋势判断: {prev_trend}") if prev_signals: prompt_parts.append("\n之前给出的信号:") for i, sig in enumerate(prev_signals, 1): action = sig.get('action', 'N/A') confidence = sig.get('confidence', 0) timeframe = sig.get('timeframe', 'unknown') type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'} type_text = type_map.get(timeframe, timeframe) entry = sig.get('entry_price', 'N/A') sl = sig.get('stop_loss', 'N/A') tp = sig.get('take_profit', 'N/A') reasoning = sig.get('reasoning', 'N/A') prompt_parts.append( f"\n[{i}] {type_text} | {action} | 信心度: {confidence}%\n" f" 入场: ${entry}\n" f" 止损: ${sl}\n" f" 止盈: ${tp}\n" f" 理由: {reasoning}" ) # 重点警告 prompt_parts.append("\n" + "!"*60) prompt_parts.append("🚨 严禁重复信号!") prompt_parts.append("!"*60) prompt_parts.append("如果上一轮已经给出了相同方向的信号(做空/做多),") prompt_parts.append("且趋势没有发生明确反转,") prompt_parts.append("绝对不要重复给出相同方向的信号!") prompt_parts.append("") prompt_parts.append("只有在以下情况才输出新信号:") prompt_parts.append(" ✓ 趋势发生了明确的反转") prompt_parts.append(" ✓ 上一轮是观望,现在出现了新的明确机会") prompt_parts.append(" ✓ 价格已触及上一轮的止损/止盈价位") prompt_parts.append("") prompt_parts.append("以下情况不要输出信号:") prompt_parts.append(" ✗ 趋势延续,只是价格继续向同一方向移动") prompt_parts.append(" ✗ 仅仅因为均线排列仍然有效") prompt_parts.append(" ✗ 没有明显的市场变化") else: prompt_parts.append("\n上一轮没有给出交易信号(市场观望建议)") prompt_parts.append("\n你可以基于当前市场情况给出新的信号。") prompt_parts.append("\n" + "="*60) prompt_parts.append("\n请根据以上数据,给出你的市场判断和交易信号。") return "\n".join(prompt_parts) def _analyze_multi_timeframe_trend(self, data: Dict[str, Any]) -> str: """ 多级别趋势分析 - 检测小级别反转信号 目的:识别小级别(15m/30m)已经反转,但大级别(1h/4h)还未反应的情况 这样可以提前捕捉反转信号,而不是等待均线系统确认 """ context_parts = ["\n## 🔄 多级别趋势分析(检测反转信号)"] # 定义各级别 timeframes = { '5m': ('超短线', 5), '15m': ('短线', 15), '30m': ('日内', 30), '1h': ('小时', 60), '4h': ('趋势', 240) } trend_status = {} # 存储各级别趋势状态 # 分析各级别趋势 for tf, (tf_name, minutes) in timeframes.items(): df = data.get(tf) if df is None or len(df) < 10: continue latest = df.iloc[-1] prev = df.iloc[-2] # 1. 均线趋势判断 ma5 = latest.get('ma5', 0) ma10 = latest.get('ma10', 0) ma20 = latest.get('ma20', 0) ma_trend = None if ma5 and ma10 and ma20: if ma5 > ma10 > ma20: ma_trend = 'bull' elif ma5 < ma10 < ma20: ma_trend = 'bear' else: ma_trend = 'neutral' # 2. MACD 趋势判断 macd_trend = None if 'macd' in df.columns and 'macd_signal' in df.columns: macd = df['macd'].iloc[-1] signal = df['macd_signal'].iloc[-1] hist = df.get('macd_hist', pd.Series([0])).iloc[-1] if macd > 0 and signal > 0: macd_trend = 'bull' elif macd < 0 and signal < 0: macd_trend = 'bear' else: macd_trend = 'neutral' # 3. 价格动量(最近3根K线) close_3 = df['close'].iloc[-3] close_2 = df['close'].iloc[-2] close_1 = df['close'].iloc[-1] price_momentum = 'up' if close_1 > close_3 else 'down' if close_1 < close_3 else 'flat' # 综合判断趋势 if ma_trend == 'bull' and (macd_trend == 'bull' or price_momentum == 'up'): trend = 'bull' elif ma_trend == 'bear' and (macd_trend == 'bear' or price_momentum == 'down'): trend = 'bear' elif price_momentum == 'up' and macd_trend == 'bull': trend = 'bull' elif price_momentum == 'down' and macd_trend == 'bear': trend = 'bear' else: trend = 'neutral' trend_status[tf] = { 'name': tf_name, 'trend': trend, 'ma_trend': ma_trend, 'macd_trend': macd_trend, 'momentum': price_momentum, 'price': float(latest['close']), 'change_3': ((close_1 - close_3) / close_3 * 100) if close_3 > 0 else 0 } # 生成多级别趋势报告 if not trend_status: context_parts.append("⚠️ 数据不足,无法进行多级别分析") return "\n".join(context_parts) # 检测反转信号 reversal_signals = [] # 1. 小级别反转但大级别未反转 if ('15m' in trend_status and '1h' in trend_status and trend_status['15m']['trend'] != trend_status['1h']['trend'] and trend_status['15m']['trend'] != 'neutral'): small_tf = trend_status['15m'] large_tf = trend_status['1h'] reversal_type = "🔄 反转信号" if large_tf['trend'] != 'neutral' else "⚡ 启动信号" reversal_signals.append( f"{reversal_type}: 15分钟[{small_tf['trend']}] vs 1小时[{large_tf['trend']}]" ) reversal_signals.append( f" 15分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}" ) # 2. 30分钟反转但4小时未反转(更强的反转信号) if ('30m' in trend_status and '4h' in trend_status and trend_status['30m']['trend'] != trend_status['4h']['trend'] and trend_status['30m']['trend'] != 'neutral'): small_tf = trend_status['30m'] large_tf = trend_status['4h'] reversal_type = "🔄 强反转" if large_tf['trend'] != 'neutral' else "⚡ 趋势启动" reversal_signals.append( f"{reversal_type}: 30分钟[{small_tf['trend']}] vs 4小时[{large_tf['trend']}]" ) reversal_signals.append( f" 30分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}" ) # 添加各级别趋势详情 context_parts.append("\n各级别趋势状态:") for tf in ['5m', '15m', '30m', '1h', '4h']: if tf in trend_status: status = trend_status[tf] trend_icon = {'bull': '📈', 'bear': '📉', 'neutral': '➡️'}.get(status['trend'], '❓') context_parts.append( f" {tf} ({status['name']}): {trend_icon} {status['trend']} " f"| 动量: {status['change_3']:+.2f}% | 价格: ${status['price']:.2f}" ) # 添加反转信号 if reversal_signals: context_parts.append("\n⚠️ 检测到级别背离/反转信号:") context_parts.extend(reversal_signals) context_parts.append("\n💡 提示: 小级别已反转但大级别滞后,可考虑:") context_parts.append(" - 反手操作(平掉旧仓位,开新方向仓位)") context_parts.append(" - 顺势短线(跟随小级别趋势,快进快出)") context_parts.append(" - 等待大级别确认(避免假突破)") else: context_parts.append("\n✅ 各级别趋势一致,无反转信号") return "\n".join(context_parts) def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]: """解析 LLM 响应""" try: # 尝试提取 JSON json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) if json_match: json_str = json_match.group(1) else: json_match = re.search(r'\{[\s\S]*\}', response) if json_match: json_str = json_match.group(0) else: raise ValueError("无法找到 JSON 响应") # 清理 JSON 字符串(移除可能导致解析错误的注释等) json_str = self._clean_json_string(json_str) logger.debug(f"解析的 JSON 字符串: {json_str[:500]}...") # 打印前500字符用于调试 result = json.loads(json_str) # 清理价格字段 - 转换为 float result = self._clean_price_fields(result) # 添加元数据 result['symbol'] = symbol result['timestamp'] = datetime.now().isoformat() result['raw_response'] = response # 兼容处理:确保 signals 中的字段与旧格式一致 if 'signals' in result: for sig in result['signals']: # LLM 输出的 "type" 是 timeframe (short_term/medium_term/long_term) # 需要映射为 "timeframe",而 "action" 才是 buy/sell/wait if 'type' in sig: # 如果 type 是 short_term/medium_term/long_term,映射为 timeframe if sig['type'] in ['short_term', 'medium_term', 'long_term']: sig['timeframe'] = sig.pop('type') # 如果 type 是 buy/sell/wait,映射为 action elif sig['type'] in ['buy', 'sell', 'wait']: sig['action'] = sig.pop('type') # 确保 action 字段存在 if 'action' not in sig and 'timeframe' in sig: # 从 reasoning 或其他字段推断 action sig['action'] = 'wait' # 确保 grade 字段存在 if 'grade' not in sig: # 根据 confidence 推断 grade confidence = sig.get('confidence', 0) if confidence >= 80: sig['grade'] = 'A' elif confidence >= 60: sig['grade'] = 'B' elif confidence >= 40: sig['grade'] = 'C' else: sig['grade'] = 'D' # 处理趋势字段 - 优先使用 LLM 返回的趋势字段,否则从信号推断 if 'trend_direction' not in result or 'trend_strength' not in result: # 从 signals 中推断趋势 if 'signals' in result and result['signals']: best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0)) action = best_signal.get('action', 'wait') confidence = best_signal.get('confidence', 0) # 推断趋势方向(如果 LLM 没有提供) if 'trend_direction' not in result: if action == 'buy': result['trend_direction'] = 'uptrend' elif action == 'sell': result['trend_direction'] = 'downtrend' else: result['trend_direction'] = 'neutral' # 推断趋势强度(如果 LLM 没有提供) if 'trend_strength' not in result: result['trend_strength'] = 'strong' if confidence >= 70 else 'medium' if confidence >= 50 else 'weak' # 从信号中推断 market_state(用于向后兼容) if 'signals' in result and result['signals']: best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0)) action = best_signal.get('action', 'wait') confidence = best_signal.get('confidence', 0) trend_direction = result.get('trend_direction', 'neutral') # 推断市场状态 if confidence >= 70 and trend_direction != 'neutral': if trend_direction == 'uptrend': result['market_state'] = '强势上涨' elif trend_direction == 'downtrend': result['market_state'] = '强势下跌' else: result['market_state'] = '震荡整理' else: result['market_state'] = '震荡整理' # 推断 trend(用于向后兼容,简化的趋势字段) if 'trend' not in result: if trend_direction == 'uptrend': result['trend'] = 'up' elif trend_direction == 'downtrend': result['trend'] = 'down' else: result['trend'] = 'sideways' else: result['market_state'] = '无明确信号' if 'trend' not in result: result['trend'] = 'sideways' logger.info(f"✅ 市场信号分析完成: {symbol}") logger.debug(f"市场信号: {json.dumps(result, ensure_ascii=False, indent=2)}") return result except Exception as e: logger.warning(f"解析 LLM 响应失败: {e}") logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符 return self._get_empty_signal(symbol) def _clean_json_string(self, json_str: str) -> str: """清理 JSON 字符串,移除可能导致解析错误的内容""" # 移除单行注释 // ... json_str = re.sub(r'//.*?(?=\n|$)', '', json_str) # 移除多行注释 /* ... */ json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str) # 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}) json_str = re.sub(r',\s*([}\]])', r'\1', json_str) return json_str def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: """清理价格字段,转换为 float""" def clean_price(price_value): if price_value is None: return None if isinstance(price_value, (int, float)): return float(price_value) if isinstance(price_value, str): # 移除 $ 符号和逗号 cleaned = price_value.replace('$', '').replace(',', '').strip() if cleaned: try: return float(cleaned) except ValueError: return None return None # 清理 key_levels 中的支撑位和阻力位 if 'key_levels' in data and data['key_levels']: key_levels = data['key_levels'] if 'support' in key_levels: data['key_levels']['support'] = [clean_price(s) for s in key_levels['support']] if 'resistance' in key_levels: data['key_levels']['resistance'] = [clean_price(r) for r in key_levels['resistance']] # 清理 signals 中的价格字段 if 'signals' in data: # 标记需要移除的信号索引 signals_to_remove = [] for idx, sig in enumerate(data['signals']): price_fields = ['entry_price', 'stop_loss', 'take_profit'] for field in price_fields: if field in sig: sig[field] = clean_price(sig[field]) # 验证止损止盈价格的合理性 entry_price = sig.get('entry_price') stop_loss = sig.get('stop_loss') take_profit = sig.get('take_profit') action = sig.get('action', '') if entry_price and entry_price > 0: MAX_REASONABLE_DEVIATION = 0.50 # 50% has_invalid_price = False # 检查止损 if stop_loss is not None: deviation = abs(stop_loss - entry_price) / entry_price if deviation > MAX_REASONABLE_DEVIATION: logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止损价格不合理: entry={entry_price}, stop_loss={stop_loss}, 偏离={deviation*100:.1f}%") has_invalid_price = True elif action == 'buy' and stop_loss >= entry_price: logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 < entry") has_invalid_price = True elif action == 'sell' and stop_loss <= entry_price: logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 > entry") has_invalid_price = True # 检查止盈 if take_profit is not None: deviation = abs(take_profit - entry_price) / entry_price if deviation > MAX_REASONABLE_DEVIATION: logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止盈价格不合理: entry={entry_price}, take_profit={take_profit}, 偏离={deviation*100:.1f}%") has_invalid_price = True elif action == 'buy' and take_profit <= entry_price: logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止盈错误: entry={entry_price}, take_profit={take_profit} 应该 > entry") has_invalid_price = True elif action == 'sell' and take_profit >= entry_price: logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止盈错误: entry={entry_price}, take_profit={take_profit} 应该 < entry") has_invalid_price = True # 如果价格不合理,降低等级为 D 或移除信号 if has_invalid_price: original_grade = sig.get('grade', 'C') sig['grade'] = 'D' sig['confidence'] = 0 # 添加错误说明 if 'reasoning' in sig: sig['reasoning'] = f"[价格异常] {sig['reasoning']}" logger.error(f"❌ [{data.get('symbol', '')}] 信号价格异常,等级从 {original_grade} 降为 D,止损止盈已清空") # 清空不合理的价格 sig['stop_loss'] = None sig['take_profit'] = None return data def _calculate_price_change_24h(self, df) -> str: """计算24小时涨跌幅""" try: if df is None or len(df) < 24: return "N/A" current_price = float(df['close'].iloc[-1]) price_24h_ago = float(df['close'].iloc[-24]) change = ((current_price - price_24h_ago) / price_24h_ago) * 100 sign = "+" if change >= 0 else "" return f"{sign}{change:.2f}%" except Exception as e: logger.debug(f"计算24h涨跌失败: {e}") return "N/A" def _get_empty_signal(self, symbol: str) -> Dict[str, Any]: """返回空信号""" return { 'symbol': symbol, 'trend_direction': 'neutral', 'trend_strength': 'weak', 'analysis_summary': 'unknown', 'volume_analysis': '分析失败', 'news_sentiment': 'neutral', 'news_impact': '无', 'market_state': '分析失败', 'trend': 'sideways', 'signals': [], 'key_levels': {}, 'timestamp': datetime.now().isoformat(), 'error': '信号分析失败' } def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str: """分析波动率变化(使用 30m 作为日内主周期)""" df = data.get('30m') if df is None or len(df) < 24 or 'atr' not in df.columns: return "" lines = [] # ATR 变化趋势 recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根(3小时) older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根 if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0: return "" atr_change = (recent_atr - older_atr) / older_atr * 100 current_atr = float(df['atr'].iloc[-1]) current_price = float(df['close'].iloc[-1]) atr_percent = current_atr / current_price * 100 lines.append(f"当前 ATR (30m): ${current_atr:.2f} ({atr_percent:.2f}%)") if atr_change > 20: lines.append(f"**波动率扩张**: ATR 上升 {atr_change:.0f}%,日内趋势可能启动") elif atr_change < -20: lines.append(f"**波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将变盘") else: lines.append(f"波动率稳定: ATR 变化 {atr_change:+.0f}%") # 布林带宽度 if 'bb_upper' in df.columns and 'bb_lower' in df.columns: bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100 bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100 if bb_width < bb_width_prev * 0.8: lines.append(f"**布林带收口**: 宽度 {bb_width:.1f}%,变盘信号") elif bb_width > bb_width_prev * 1.2: lines.append(f"**布林带开口**: 宽度 {bb_width:.1f}%,趋势延续") return "\n".join(lines) def _detect_range_zone(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]: """ 检测震荡区间 - 计算明确的支撑位和压力位 使用多种方法综合判断: 1. 价格通道(最近N根K线的最高/最低价) 2. 成交量密集区(Volume Profile) 3. 布林带 4. EMA支撑/压力 """ result = { 'is_ranging': False, 'support_level': None, 'resistance_level': None, 'range_width_pct': None, 'confidence': 0, 'volume_profile_support': None, 'volume_profile_resistance': None, 'analysis': '' } try: df_30m = data.get('30m') df_1h = data.get('1h') df_15m = data.get('15m') if df_30m is None or len(df_30m) < 48: # 需要至少48根K线(24小时) return result current_price = float(df_30m['close'].iloc[-1]) # ========== 1. 价格通道分析 ========== # 使用最近24-48根K线(12-24小时)计算价格通道 lookback_periods = [24, 36, 48] price_channels = [] for period in lookback_periods: if len(df_30m) >= period: period_data = df_30m.iloc[-period:] high = period_data['high'].max() low = period_data['low'].min() price_channels.append({'high': high, 'low': low, 'width': high - low}) # 选择波动最稳定的通道(宽度变化最小的) if price_channels: avg_width = sum(pc['width'] for pc in price_channels) / len(price_channels) selected_channel = min(price_channels, key=lambda pc: abs(pc['width'] - avg_width)) support = selected_channel['low'] resistance = selected_channel['high'] range_width = resistance - support range_width_pct = (range_width / current_price) * 100 # 震荡区间判断标准 # 1. 区间宽度 < 5%(震荡市) # 2. 价格在区间中位数附近 # 3. EMA 纠缠 is_narrow_range = range_width_pct < 5.0 price_in_middle = (current_price - support) / range_width > 0.3 and \ (current_price - support) / range_width < 0.7 # EMA 纠缠检查 ema5 = df_30m['ma5'].iloc[-1] if 'ma5' in df_30m.columns else None ema10 = df_30m['ma10'].iloc[-1] if 'ma10' in df_30m.columns else None ema20 = df_30m['ma20'].iloc[-1] if 'ma20' in df_30m.columns else None ema_entangled = False if all([ema5, ema10, ema20]): ema_spread = (max(ema5, ema10, ema20) - min(ema5, ema10, ema20)) / current_price * 100 ema_entangled = ema_spread < 1.0 # EMA 排列差距 < 1% # ========== 2. 成交量密集区分析 ========== volume_profile_support = None volume_profile_resistance = None if len(df_30m) >= 48: # 找出成交量最大的价格区间 df_30m_copy = df_30m.iloc[-48:].copy() df_30m_copy['avg_price'] = (df_30m_copy['high'] + df_30m_copy['low'] + df_30m_copy['close']) / 3 df_30m_copy['volume_weight'] = df_30m_copy['volume'] * df_30m_copy['avg_price'] # 按价格分层,找出高成交量区域 price_bins = pd.cut(df_30m_copy['avg_price'], bins=10) volume_by_price = df_30m_copy.groupby(price_bins, observed=True)['volume'].sum() if len(volume_by_price) > 0: # 高成交量区作为支撑/压力 max_vol_bin = volume_by_price.idxmax() if max_vol_bin is not None: vp_level = (max_vol_bin.left + max_vol_bin.right) / 2 if vp_level < current_price * 0.98: volume_profile_support = float(vp_level) elif vp_level > current_price * 1.02: volume_profile_resistance = float(vp_level) # ========== 3. 布林带支撑/压力 ========== bb_support = None bb_resistance = None if 'bb_lower' in df_30m.columns and 'bb_upper' in df_30m.columns: bb_support = float(df_30m['bb_lower'].iloc[-1]) bb_resistance = float(df_30m['bb_upper'].iloc[-1]) # ========== 4. 关键价格点综合 ========== # 综合多个指标得出最可靠的支撑/压力位 support_candidates = [] resistance_candidates = [] if support: support_candidates.append(support) if volume_profile_support: support_candidates.append(volume_profile_support) if bb_support: support_candidates.append(bb_support) if resistance: resistance_candidates.append(resistance) if volume_profile_resistance: resistance_candidates.append(volume_profile_resistance) if bb_resistance: resistance_candidates.append(bb_resistance) # 取中位数作为最终的支撑/压力位 final_support = np.median(support_candidates) if support_candidates else None final_resistance = np.median(resistance_candidates) if resistance_candidates else None # ========== 5. 计算置信度 ========== confidence = 0 reasons = [] if is_narrow_range: confidence += 30 reasons.append(f"区间窄({range_width_pct:.1f}%)") if price_in_middle: confidence += 20 reasons.append("价格在中部") if ema_entangled: confidence += 25 reasons.append("EMA纠缠") # 成交量分布检查 - 如果成交量在区间两端较小,说明是有效震荡 if len(df_30m) >= 24: recent_vol = df_30m['volume'].iloc[-12:].mean() older_vol = df_30m['volume'].iloc[-24:-12].mean() if abs(recent_vol - older_vol) / older_vol < 0.3: confidence += 15 reasons.append("成交量平稳") # 价格反弹次数 - 检查在支撑/压力位附近是否有多次反弹 if final_support and final_resistance: bounce_count = 0 for i in range(-24, 0): if i >= -len(df_30m): row = df_30m.iloc[i] # 检查是否在支撑位附近反弹 if abs(row['low'] - final_support) / final_support < 0.005 and row['close'] > row['open']: bounce_count += 1 # 检查是否在压力位附近回落 if abs(row['high'] - final_resistance) / final_resistance < 0.005 and row['close'] < row['open']: bounce_count += 1 if bounce_count >= 2: confidence += 10 reasons.append(f"边界反弹{bounce_count}次") result.update({ 'is_ranging': confidence >= 60, 'support_level': float(final_support) if final_support else None, 'resistance_level': float(final_resistance) if final_resistance else None, 'range_width_pct': range_width_pct, 'confidence': confidence, 'volume_profile_support': volume_profile_support, 'volume_profile_resistance': volume_profile_resistance, 'analysis': f"震荡判断: {confidence}% ({', '.join(reasons) if reasons else '无'})" }) except Exception as e: logger.warning(f"震荡区间检测失败: {e}") import traceback logger.debug(traceback.format_exc()) return result def _detect_trend_reversal(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]: """ 检测趋势反转信号 综合多个指标判断趋势是否可能反转: 1. RSI 背离(价格创新高但RSI不创新高 / 价格创新低但RSI不创新低) 2. MACD 柱状图缩短/背离 3. 量价背离(价格上涨但成交量下降) 4. 关键K线形态(吞没、锤子线、十字星等) 5. 多周期趋势不一致 """ result = { 'is_reversing': False, 'reversal_type': None, # 'bullish_reversal' or 'bearish_reversal' 'confidence': 0, 'signals': [], 'analysis': '' } try: df_15m = data.get('15m') df_30m = data.get('30m') df_1h = data.get('1h') if df_15m is None or len(df_15m) < 30: return result reversal_signals = [] bullish_signals = 0 bearish_signals = 0 # ========== 1. RSI 背离检测 ========== if 'rsi' in df_15m.columns and len(df_15m) >= 20: recent_5 = df_15m.iloc[-5:] prev_5 = df_15m.iloc[-15:-10] # 顶背离(看跌反转) recent_high = recent_5['high'].max() recent_rsi_at_high = recent_5.loc[recent_5['high'] == recent_high, 'rsi'].values[0] prev_high = prev_5['high'].max() prev_rsi_at_high = prev_5.loc[prev_5['high'] == prev_high, 'rsi'].values[0] if recent_high > prev_high and recent_rsi_at_high < prev_rsi_at_high: bearish_signals += 2 reversal_signals.append({ 'type': 'rsi_divergence', 'direction': 'bearish', 'weight': 2, 'desc': 'RSI顶背离:价格创新高但RSI不创新高' }) # 底背离(看涨反转) recent_low = recent_5['low'].min() recent_rsi_at_low = recent_5.loc[recent_5['low'] == recent_low, 'rsi'].values[0] prev_low = prev_5['low'].min() prev_rsi_at_low = prev_5.loc[prev_5['low'] == prev_low, 'rsi'].values[0] if recent_low < prev_low and recent_rsi_at_low > prev_rsi_at_low: bullish_signals += 2 reversal_signals.append({ 'type': 'rsi_divergence', 'direction': 'bullish', 'weight': 2, 'desc': 'RSI底背离:价格创新低但RSI不创新低' }) # ========== 2. MACD 柱状图分析 ========== if 'macd_hist' in df_15m.columns and len(df_15m) >= 10: hist_recent = df_15m['macd_hist'].iloc[-3:].values hist_prev = df_15m['macd_hist'].iloc[-6:-3].values # MACD 柱状图缩短 = 动能衰竭 if all(h > 0 for h in hist_prev): # 之前是正向 if hist_recent[2] < hist_recent[1] < hist_recent[0]: # 持续缩短 bearish_signals += 1 reversal_signals.append({ 'type': 'macd_histogram', 'direction': 'bearish', 'weight': 1, 'desc': 'MACD柱状图持续缩短:上涨动能衰竭' }) if all(h < 0 for h in hist_prev): # 之前是负向 if hist_recent[2] > hist_recent[1] > hist_recent[0]: # 持续收窄 bullish_signals += 1 reversal_signals.append({ 'type': 'macd_histogram', 'direction': 'bullish', 'weight': 1, 'desc': 'MACD柱状图持续收窄:下跌动能衰竭' }) # MACD 金叉/死叉 if len(df_15m) >= 2: macd_current = df_15m['macd'].iloc[-1] signal_current = df_15m['macd_signal'].iloc[-1] macd_prev = df_15m['macd'].iloc[-2] signal_prev = df_15m['macd_signal'].iloc[-2] # 金叉 if macd_prev <= signal_prev and macd_current > signal_current: bullish_signals += 1 reversal_signals.append({ 'type': 'macd_cross', 'direction': 'bullish', 'weight': 1, 'desc': 'MACD金叉' }) # 死叉 if macd_prev >= signal_prev and macd_current < signal_current: bearish_signals += 1 reversal_signals.append({ 'type': 'macd_cross', 'direction': 'bearish', 'weight': 1, 'desc': 'MACD死叉' }) # ========== 3. 量价背离检测 ========== if 'volume' in df_15m.columns and len(df_15m) >= 10: recent_price_change = (df_15m['close'].iloc[-1] - df_15m['close'].iloc[-5]) / df_15m['close'].iloc[-5] recent_volume = df_15m['volume'].iloc[-5:].mean() older_volume = df_15m['volume'].iloc[-10:-5].mean() # 价格上涨但成交量下降(量价背离) if recent_price_change > 0.01 and recent_volume < older_volume * 0.8: bearish_signals += 1 reversal_signals.append({ 'type': 'volume_divergence', 'direction': 'bearish', 'weight': 1, 'desc': '量价背离:价格上涨但成交量萎缩' }) # 价格下跌但成交量下降(可能见底) if recent_price_change < -0.01 and recent_volume < older_volume * 0.7: bullish_signals += 1 reversal_signals.append({ 'type': 'volume_divergence', 'direction': 'bullish', 'weight': 1, 'desc': '下跌缩量:抛压枯竭,可能见底' }) # ========== 4. 关键K线形态检测 ========== if len(df_15m) >= 3: latest = df_15m.iloc[-1] prev = df_15m.iloc[-2] # 吞没形态 open_latest, close_latest = latest['open'], latest['close'] open_prev, close_prev = prev['open'], prev['close'] # 阳包阴(看涨) if (close_latest > open_latest and # 当前是阳线 close_prev < open_prev and # 前一个是阴线 open_latest <= close_prev and # 开盘价低于前一个收盘价 close_latest >= open_prev): # 收盘价高于前一个开盘价 bullish_signals += 2 reversal_signals.append({ 'type': 'candlestick', 'direction': 'bullish', 'weight': 2, 'desc': '阳包阴吞没形态(强反转信号)' }) # 阴包阳(看跌) if (close_latest < open_latest and # 当前是阴线 close_prev > open_prev and # 前一个是阳线 open_latest >= close_prev and # 开盘价高于前一个收盘价 close_latest <= open_prev): # 收盘价低于前一个开盘价 bearish_signals += 2 reversal_signals.append({ 'type': 'candlestick', 'direction': 'bearish', 'weight': 2, 'desc': '阴包阳吞没形态(强反转信号)' }) # 锤子线/倒锤子 body_size = abs(close_latest - open_latest) upper_shadow = df_15m['high'].iloc[-1] - max(open_latest, close_latest) lower_shadow = min(open_latest, close_latest) - df_15m['low'].iloc[-1] # 锤子线(看涨) if lower_shadow >= body_size * 2 and upper_shadow < body_size * 0.5: bullish_signals += 1 reversal_signals.append({ 'type': 'candlestick', 'direction': 'bullish', 'weight': 1, 'desc': '锤子线(底部反转信号)' }) # 倒锤子(看跌) if upper_shadow >= body_size * 2 and lower_shadow < body_size * 0.5: bearish_signals += 1 reversal_signals.append({ 'type': 'candlestick', 'direction': 'bearish', 'weight': 1, 'desc': '倒锤子线(顶部反转信号)' }) # ========== 5. 多周期趋势不一致 ========== trend_15m = self._get_trend_direction(df_15m) trend_30m = self._get_trend_direction(df_30m) trend_1h = self._get_trend_direction(df_1h) # 小周期反转但大周期未反应 if trend_15m and trend_1h and trend_15m != trend_1h: if trend_15m == 'bull' and trend_1h == 'bear': bullish_signals += 1 reversal_signals.append({ 'type': 'timeframe_divergence', 'direction': 'bullish', 'weight': 1, 'desc': '15分钟转多但1小时仍看空(潜在反转)' }) elif trend_15m == 'bear' and trend_1h == 'bull': bearish_signals += 1 reversal_signals.append({ 'type': 'timeframe_divergence', 'direction': 'bearish', 'weight': 1, 'desc': '15分钟转空但1小时仍看多(潜在反转)' }) # ========== 计算反转信号强度 ========== total_signals = len(reversal_signals) if total_signals >= 3: # 至少3个反转信号才认为可能反转 if bullish_signals >= bearish_signals + 2: result['is_reversing'] = True result['reversal_type'] = 'bullish_reversal' result['confidence'] = min(90, bullish_signals * 15) result['signals'] = [s for s in reversal_signals if s['direction'] == 'bullish'] elif bearish_signals >= bullish_signals + 2: result['is_reversing'] = True result['reversal_type'] = 'bearish_reversal' result['confidence'] = min(90, bearish_signals * 15) result['signals'] = [s for s in reversal_signals if s['direction'] == 'bearish'] if reversal_signals: result['analysis'] = f"检测到 {len(reversal_signals)} 个反转信号" else: result['analysis'] = "无反转信号" except Exception as e: logger.warning(f"趋势反转检测失败: {e}") import traceback logger.debug(traceback.format_exc()) return result def _get_trend_direction(self, df: pd.DataFrame) -> str: """获取趋势方向:bull/bear/neutral""" if df is None or len(df) < 10: return 'neutral' try: # 使用EMA判断 ma5 = df['ma5'].iloc[-1] if 'ma5' in df.columns else None ma10 = df['ma10'].iloc[-1] if 'ma10' in df.columns else None ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else None if ma5 and ma10 and ma20: if ma5 > ma10 > ma20: return 'bull' elif ma5 < ma10 < ma20: return 'bear' # 使用MACD判断 if 'macd' in df.columns and 'macd_signal' in df.columns: macd = df['macd'].iloc[-1] signal = df['macd_signal'].iloc[-1] if macd > signal and macd > 0: return 'bull' elif macd < signal and macd < 0: return 'bear' except Exception as e: logger.debug(f"趋势方向判断失败: {e}") return 'neutral' def _detect_trend_stage(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]: """ 检测趋势阶段:早期/中期/晚期 判断标准: 1. 早期:刚突破关键位,均线刚开始排列,动能开始释放 2. 中期:均线排列稳定,价格沿趋势移动,量能健康 3. 晚期:价格过度延伸,RSI极端区,量价背离,多次假突破 """ result = { 'stage': 'unknown', # 'early', 'middle', 'late' 'confidence': 0, 'signals': [], 'analysis': '' } try: df_30m = data.get('30m') df_1h = data.get('1h') if df_30m is None or len(df_30m) < 30: return result current_price = float(df_30m['close'].iloc[-1]) stage_signals = [] early_score = 0 middle_score = 0 late_score = 0 # ========== 1. EMA 排列状态 ========== ema5 = df_30m['ma5'].iloc[-1] if 'ma5' in df_30m.columns else None ema10 = df_30m['ma10'].iloc[-1] if 'ma10' in df_30m.columns else None ema20 = df_30m['ma20'].iloc[-1] if 'ma20' in df_30m.columns else None ema50 = df_30m['ma50'].iloc[-1] if 'ma50' in df_30m.columns else None if all([ema5, ema10, ema20, ema50]): # 检查EMA排列是否形成 if ema5 > ema10 > ema20 > ema50: # 多头排列 # 检查排列刚刚形成(早期)还是已经稳定(中期/晚期) ema5_cross_ma20 = False if len(df_30m) >= 10: # 检查最近10根内是否发生过金叉 for i in range(-10, 0): if df_30m['ma5'].iloc[i] > df_30m['ma20'].iloc[i]: if i > -10 and df_30m['ma5'].iloc[i-1] <= df_30m['ma20'].iloc[i-1]: ema5_cross_ma20 = True break if ema5_cross_ma20: early_score += 30 stage_signals.append("EMA排列刚形成(早期)") else: # 检查EMA间距 ema_spread = (ema5 - ema20) / ema20 * 100 if ema_spread > 3: late_score += 20 stage_signals.append(f"EMA间距过大({ema_spread:.1f}%) - 可能过度延伸") else: middle_score += 20 stage_signals.append("EMA排列稳定(中期)") elif ema5 < ema10 < ema20 < ema50: # 空头排列 ema5_cross_ma20 = False if len(df_30m) >= 10: for i in range(-10, 0): if df_30m['ma5'].iloc[i] < df_30m['ma20'].iloc[i]: if i > -10 and df_30m['ma5'].iloc[i-1] >= df_30m['ma20'].iloc[i-1]: ema5_cross_ma20 = True break if ema5_cross_ma20: early_score += 30 stage_signals.append("EMA排列刚形成(早期)") else: ema_spread = (ema20 - ema5) / ema20 * 100 if ema_spread > 3: late_score += 20 stage_signals.append(f"EMA间距过大({ema_spread:.1f}%) - 可能过度延伸") else: middle_score += 20 stage_signals.append("EMA排列稳定(中期)") # ========== 2. RSI 状态 ========== if 'rsi' in df_30m.columns: rsi_current = df_30m['rsi'].iloc[-1] rsi_prev = df_30m['rsi'].iloc[-5:-1].values # RSI极端区 - 晚期信号 if rsi_current > 70: late_score += 25 stage_signals.append(f"RSI超买({rsi_current:.0f}) - 趋势晚期") elif rsi_current < 30: late_score += 25 stage_signals.append(f"RSI超卖({rsi_current:.0f}) - 趋势晚期") elif 50 <= rsi_current <= 65: middle_score += 15 stage_signals.append(f"RSI健康({rsi_current:.0f}) - 趋势中期") elif 40 <= rsi_current <= 60: early_score += 10 stage_signals.append(f"RSI中性({rsi_current:.0f}) - 可能早期") # RSI趋势检查 if len(rsi_prev) >= 3: rsi_trend = "up" if rsi_current > rsi_prev[-1] else "down" if rsi_current < rsi_prev[-1] else "flat" if rsi_trend == "flat": late_score += 10 stage_signals.append("RSI走平 - 动能衰竭") # ========== 3. 价格偏离度 ========== if ema20: deviation = abs(current_price - ema20) / ema20 * 100 if deviation > 5: late_score += 30 stage_signals.append(f"价格偏离EMA20 {deviation:.1f}% - 过度延伸") elif deviation > 3: late_score += 15 stage_signals.append(f"价格偏离EMA20 {deviation:.1f}% - 警戒区域") elif deviation < 1: if early_score < middle_score: # 只在不是明显早期时加分 middle_score += 10 stage_signals.append("价格贴近EMA20 - 趋势稳固") # ========== 4. 量价关系 ========== if 'volume' in df_30m.columns and len(df_30m) >= 10: recent_vol = df_30m['volume'].iloc[-5:].mean() older_vol = df_30m['volume'].iloc[-10:-5].mean() vol_change = (recent_vol - older_vol) / older_vol * 100 price_change_5 = (df_30m['close'].iloc[-1] - df_30m['close'].iloc[-5]) / df_30m['close'].iloc[-5] * 100 # 价格上涨但成交量下降(量价背离)- 晚期信号 if price_change_5 > 1 and vol_change < -20: late_score += 20 stage_signals.append(f"量价背离(涨{price_change_5:.1f}%量减{vol_change:.0f}%)- 可能见顶") elif price_change_5 < -1 and vol_change < -20: late_score += 20 stage_signals.append(f"量价背离(跌{price_change_5:.1f}%量减{vol_change:.0f}%)- 可能见底") elif price_change_5 > 1 and vol_change > 30: early_score += 15 stage_signals.append(f"放量上涨(涨{price_change_5:.1f}%量增{vol_change:.0f}%)- 可能早期") elif price_change_5 < -1 and vol_change > 30: early_score += 15 stage_signals.append(f"放量下跌(跌{price_change_5:.1f}%量增{vol_change:.0f}%)- 可能早期") # ========== 5. 波动率状态 ========== if 'atr' in df_30m.columns and len(df_30m) >= 20: recent_atr = df_30m['atr'].iloc[-5:].mean() older_atr = df_30m['atr'].iloc[-15:-5].mean() atr_change = (recent_atr - older_atr) / older_atr * 100 if older_atr > 0 else 0 if atr_change > 30: early_score += 10 stage_signals.append(f"ATR扩张({atr_change:.0f}%) - 趋势启动") elif atr_change < -30: late_score += 10 stage_signals.append(f"ATR收缩({atr_change:.0f}%) - 动能衰竭") # ========== 6. 连续同向K线数量 ========== if len(df_30m) >= 5: recent_closes = df_30m['close'].iloc[-5:].values consecutive_up = sum(1 for i in range(1, len(recent_closes)) if recent_closes[i] > recent_closes[i-1]) consecutive_down = sum(1 for i in range(1, len(recent_closes)) if recent_closes[i] < recent_closes[i-1]) if consecutive_up >= 4: late_score += 15 stage_signals.append(f"连续{consecutive_up}根阳线 - 可能过度") elif consecutive_down >= 4: late_score += 15 stage_signals.append(f"连续{consecutive_down}根阴线 - 可能过度") # ========== 综合判断趋势阶段 ========== scores = { 'early': early_score, 'middle': middle_score, 'late': late_score } max_score = max(scores.values()) if max_score < 20: result['stage'] = 'unknown' result['analysis'] = "趋势阶段不明确" elif max_score == late_score and late_score >= 40: result['stage'] = 'late' result['confidence'] = min(95, late_score) result['signals'] = stage_signals result['analysis'] = f"⚠️ 趋势晚期({late_score}分)- " + "; ".join(stage_signals[:3]) elif max_score == early_score and early_score >= 30: result['stage'] = 'early' result['confidence'] = min(90, early_score) result['signals'] = stage_signals result['analysis'] = f"趋势早期({early_score}分)- " + "; ".join(stage_signals[:3]) else: result['stage'] = 'middle' result['confidence'] = min(85, middle_score) result['signals'] = stage_signals result['analysis'] = f"趋势中期({middle_score}分)- " + "; ".join(stage_signals[:3]) except Exception as e: logger.warning(f"趋势阶段检测失败: {e}") import traceback logger.debug(traceback.format_exc()) return result