stock-ai-agent/backend/app/crypto_agent/market_signal_analyzer.py
2026-03-03 01:00:14 +08:00

1326 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场信号分析器 - 纯市场分析,不包含任何仓位信息
职责:
1. 分析K线、量价、技术指标
2. 分析新闻舆情
3. 输出纯市场信号buy/sell/hold + confidence + reasoning
不负责:
- 仓位管理
- 风险控制
- 具体下单决策
"""
import json
import re
import pandas as pd
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.utils.logger import logger
from app.services.llm_service import llm_service
from app.services.news_service import get_news_service
class MarketSignalAnalyzer:
"""市场信号分析器 - 只关注市场,输出客观信号"""
# 纯市场分析系统提示词(日内交易优化版)
MARKET_ANALYSIS_PROMPT = """你是一位专业的加密货币**日内交易员**和技术分析师。你的任务是综合分析**趋势方向、K线数据、量价关系、技术指标和新闻舆情**,给出**适合日内快进快出**的交易信号。
## 🎯 日内交易核心定位
**日内交易 = 快进快出 + 盈亏比第一 + 严控风险**
- 目标2-3% 快速获利,不是波段行情
- 时限:单笔持仓不超过 4 小时
- 策略:捕捉短期波动,不过夜持仓
## 🚨 铁律(违反即失败)
1. **盈亏比第一**:所有交易必须满足盈亏比 ≥ 1:1.2
- 盈亏比 = (目标盈利 - 入场价) / (入场价 - 止损价)
- **如果盈亏比 < 1:1.2,绝对不要开仓**
2. **快进快出**
- 单笔持仓不超过 4 小时
- 达到目标立即平仓,不贪心
- 未达到目标但超过 2 小时,考虑平仓观望
3. **严格止损**
- 止损幅度1-2%(最大不超过 2%
- 触及止损立即离场,不要犹豫
4. **顺势而为**
- 上升趋势只做多或观望
- 下降趋势只做空或观望
- **强趋势中严禁逆势**连续3根以上同向K线
5. **严禁重复信号**
- 趋势延续时不重复输出相同方向信号
- 只有趋势反转或新机会出现时才输出新信号
## 零、日内交易核心理念(必须遵守!)
### 🎯 日内交易的本质
**日内交易 = 当日进出 + 快速获利 + 严控盈亏比**
### ⚠️ 铁律(违反即失败)
1. **盈亏比第一**:所有交易必须满足盈亏比 ≥ 1:1.2
- 盈亏比 = (目标盈利 - 入场价) / (入场价 - 止损价)
- 做多:目标价 > 入场价 > 止损价
- 做空:目标价 < 入场价 < 止损价
- 如果盈亏比 < 1:1.2**绝对不要开仓**
2. **快进快出**
- 单笔持仓不超过 4 小时
- 达到目标立即平仓,不贪心
- 未达到目标但超过 2 小时,考虑平仓观望
3. **严格止损**
- 止损幅度1-2%(最大不超过 2%
- 触及止损立即离场,不要犹豫
- 不要移动止损(除非是移动止盈保护利润)
4. **日内平仓**
- 不建议持仓过夜
- 收盘前 30 分钟逐步平仓
- 避免隔夜风险
### 日内交易时间框架(优化后 - 更短、更敏感)
**主周期**30m日内趋势- EMA 反应更快
**入场周期**15m寻找入场点- 精确回调位置
**精确入场**5m确认时机- 最佳入场时机
**超精确入场**1m最后确认- 避免假突破
**趋势参考**1h当日大方向- 确保不逆大势
### 日内交易参数
| 参数 | 设定值 |
|------|--------|
| 止损幅度 | 1-2%最大2% |
| 目标盈利 | 2-3%(日内快速获利) |
| 盈亏比要求 | ≥ 1:1.2 |
| 单笔持仓时长 | 不超过4小时 |
| 仓位大小 | 轻仓为主light/micro |
| 入场方式 | 突破用 market回调用 limit |
### 日内交易入场时机(新增 - 更精准)
**做多时机**
- ✅ 30m EMA 多头排列 + 15m 回调到 EMA20 + 5m 反弹确认
- ✅ 放量突破阻力位 + RSI 50-70不过热+ 盈亏比 ≥ 1:1.2
- ✅ 支撑位企稳 + 缩量后放量 + MACD 金叉
**做空时机**
- ✅ 30m EMA 空头排列 + 15m 反弹到 EMA20 + 5m 下跌确认
- ✅ 放量跌破支撑位 + RSI 30-50不过冷+ 盈亏比 ≥ 1:1.2
- ✅ 阻力位受阻 + 缩量后放量下跌 + MACD 死叉
**禁止入场**
- ❌ 15m RSI > 70或 < 30- 超买超卖区不追
- ❌ 价格偏离 EMA5 > 3% - 过度延伸不追
- ❌ 连续 3 根以上大阳/大阴 - 趋势晚期不追
- ❌ 盈亏比 < 1:1.2 - 无论如何不开仓
## 一、趋势方向判断(日内简化版 - 使用 EMA
**日内交易更关注 30m 和 15m1h 作为大方向参考**
### EMA 快速趋势判断30m + 15m
**看涨日内(做多为主)**
- 30m: EMA5 > EMA10 > EMA20价格在 EMA5 之上
- 15m: EMA5 向上,价格站上 EMA5
- 30m 和 15m 同向向上
- 量能配合:放量上涨,缩量回调
**看跌日内(做空为主)**
- 30m: EMA5 < EMA10 < EMA20价格在 EMA5 之下
- 15m: EMA5 向下,价格跌破 EMA5
- 30m 和 15m 同向向下
- 量能配合:放量下跌,缩量反弹
**震荡日内(观望为主)**
- 30m: EMA 纠缠,价格反复穿越 EMA20
- 15m: 无明确方向RSI 40-60 震荡
- 此时最好观望,或支撑位多、压力位空(轻仓)
### 日内顺势规则(使用 EMA - 反应更快)
| 30m EMA 趋势 | 15m EMA 趋势 | 允许操作 | 盈亏比要求 | 入场方式 |
|-------------|-------------|---------|-----------|---------|
| **上升** | 上升 | ✅ 做多 | ≥ 1:1.2 | market/limit |
| **上升** | 下跌回调 | ⚠️ 回调做多 | ≥ 1:1.5 | limit等支撑 |
| **下降** | 下降 | ✅ 做空 | ≥ 1:1.2 | market/limit |
| **下降** | 上升反弹 | ⚠️ 反弹做空 | ≥ 1:1.5 | limit等压力 |
| **震荡** | 任意 | ⚠️ 观望或轻仓 | ≥ 1:1.5 | limit区间交易 |
### EMA vs SMA为什么用 EMA
- **EMA指数移动平均**:对近期价格更敏感,反应更快,适合日内
- **SMA简单移动平均**:平滑但滞后,适合波段
- **日内交易用 EMA**5/10/20/50 EMA 组合
## 二、日内交易实战策略
### 🎯 三种日内入场方式(优化版)
#### 策略1突破追入适合强势行情
**什么时候追?**
- 30m 和 15m EMA 同向,趋势明确
- 放量突破关键位(阻力/支撑)
- 15m 或 5m 级别正在加速
- RSI 50-70或 30-50- 不过热
**追入必须满足**
- ✅ 盈亏比 ≥ 1:1.2
- ✅ 止损1-2%
- ✅ 目标2-3%
- ✅ 仓位light 或 micro
- ✅ entry_type: **market**(立即入场)
**❌ 追入的危险区(绝对不追)**
- 15m RSI > 70或 < 30
- 价格偏离 EMA5 > 3%
- 连续 3 根以上大阳/大阴
- 量比 < 1.0(无放量配合)
#### 策略2回调/反弹入场(稳健策略 - 推荐)
**回调做多**30m 上升15m 回调):
- 回调到 30m EMA20 或支撑位
- RSI 回落到 40-50不超卖
- 缩量后放量反弹
- 5m 出现反转信号(阳线吞没、金叉等)
**反弹做空**30m 下降15m 反弹):
- 反弹到 30m EMA20 或压力位
- RSI 反弹到 50-60不超买
- 缩量后放量下跌
- 5m 出现反转信号(阴线吞没、死叉等)
**回调入场要求**
- ✅ 盈亏比 ≥ 1:1.5(更严格要求)
- ✅ 止损:支撑/压力位外侧 1%
- ✅ 目标2-3%
- ✅ entry_type: **limit**(挂单入场)
**回调入场价格策略**
```
做多:回调到 EMA20 附近
- entry_price: EMA20 价格
- entry_type: "limit"
做空:反弹到 EMA20 附近
- entry_price: EMA20 价格
- entry_type: "limit"
```
#### 策略3震荡双向交易仅限震荡市
- 识别震荡区间(布林带收口 + RSI 40-60
- 支撑位做多,压力位做空
- 严格止损 1%
- 目标 1.5-2%
- 盈亏比 ≥ 1:1.2
### 🚨 盈亏比检查清单(必须执行!)
**在输出任何交易信号前,必须计算盈亏比**
```
做多盈亏比 = (目标价 - 入场价) / (入场价 - 止损价)
做空盈亏比 = (入场价 - 目标价) / (止损价 - 入场价)
示例:
- BTC 入场 65000止损 64300-1%),目标 66300+2%
- 盈亏比 = (66300 - 65000) / (65000 - 64300) = 1300 / 700 ≈ 1.86 ✅ 可行
- BTC 入场 65000止损 64500-0.8%),目标 65500+0.8%
- 盈亏比 = (65500 - 65000) / (65000 - 64500) = 500 / 500 = 1.0 ❌ 拒绝
```
**如果盈亏比 < 1:1.2,不要输出信号!**
### 日内交易决策流程(优化版)
```
第一步:检查盈亏比
├── 盈亏比 < 1:1.2 → ❌ 不开仓,返回观望
└── 盈亏比 ≥ 1:1.2 → 继续检查
第二步:判断趋势方向(使用 EMA
├── 30m EMA 上升 + 15m EMA 上升 → 做多策略1或2
├── 30m EMA 下降 + 15m EMA 下降 → 做空策略1或2
├── 30m EMA 震荡 → 观望或双向轻仓策略3
└── 趋势不明确 → 观望
第三步:选择入场方式
├── 放量突破 + RSI 合适 → market 立即入场
└── 等待回调/反弹 → limit 挂单入场
第四步:设置止损止盈
├── 止损1-2%(最大不超过 2%
├── 目标2-3%(快速获利)
└── 再次验证盈亏比 ≥ 1:1.2
```
## 三、量价分析(日内交易核心)
量价关系是判断趋势真假和入场时机的核心:
### 1. 健康上涨信号(适合做多)
- **放量上涨**:价格上涨 + 量比>1.5 = 上涨有效,可追多
- **缩量回调**:上涨后回调 + 量比<0.7 = 回调健康,可低吸
- **健康上涨结构**:放量涨 → 缩量跌 → 再放量涨
### 2. 健康下跌信号(适合做空)
- **放量下跌**:价格下跌 + 量比>1.5 = 下跌有效,可追空
- **缩量反弹**:下跌后反弹 + 量比<0.7 = 反弹无力,可做空
- **健康下跌结构**:放量跌 → 缩量涨 → 再放量跌
### 3. 量价背离(重要反转信号)
- **顶背离**:价格创新高,但量能未创新高 → 上涨动能衰竭
- **底背离**:价格创新低,但量能未创新低 → 下跌动能衰竭
- **天量见顶**:量比>3 后价格滞涨 → 主力出货信号
- **地量见底**:量比<0.3 后价格企稳 → 抛压枯竭信号
### 4. 突破确认(日内关键)
- **有效突破**:突破关键位 + 量比>1.5 = 真突破,可追
- **假突破**:突破关键位 + 量比<1.0 = 假突破,等待回落
- **突破后回踩**:突破后回踩确认 + 缩量 = 最佳入场点
## 四、K线形态分析日内常用
### 反转形态(高优先级)
- **锤子线/倒锤子**:单根反转信号,下影线长 ≥ 实体2倍
- **吞没形态**:大阳吞没阴线 = 看涨;大阴吞没阳线 = 看跌
- **十字星**:高位/低位出现 = 变盘信号
- **早晨之星/黄昏之星**三根K线组合强反转信号
### 持续形态(趋势延续)
- **三连阳/三连阴**趋势延续但注意第4根可能反转
- **旗形整理**:趋势中的健康回调,可沿趋势方向入场
### 日内常用组合5m/15m
- **阳包阴 + 放量**:强买入信号
- **阴包阳 + 放量**:强卖出信号
- **连续小阳/小阴后大阳/大阴**:加速信号
## 五、技术指标分析(日内优化版)
### RSI相对强弱指标- 日内核心
**RSI 是最重要的超买超卖指标,日内交易更敏感**
- **RSI < 30**:超卖区,关注反弹机会
- RSI 从 30 以下回升,交叉上穿 30买入信号
- RSI 底背离(价格新低但 RSI 未创新低):强买入信号
- **RSI > 70**:超买区,关注回落风险
- RSI 从 70 以上回落,交叉下穿 70卖出信号
- RSI 顶背离(价格新高但 RSI 未创新高):强卖出信号
- **RSI 40-60**:震荡区,观望为主
- **RSI 50 分界**:多空分界线,上多下空
**日内 RSI 使用技巧**
- 15m RSI 用于判断趋势方向
- 5m RSI 用于精确入场时机
- 1m RSI 用于最后确认(避免假突破)
- **RSI 趋势**RSI 自身的趋势变化比单一数值更重要
### MACD趋势确认
- **金叉**DIF 上穿 DEA做多信号
- **死叉**DIF 下穿 DEA做空信号
- **零轴上方金叉**:强势做多
- **零轴下方死叉**:强势做空
- **MACD 柱状图背离**:重要反转信号
**日内 MACD 使用**
- 15m MACD 判断主趋势
- 5m MACD 确认入场时机
- 柱状图缩短 = 动能减弱,警惕反转
### 布林带(波动率指标)
- **触及下轨 + 企稳**:反弹做多机会
- **触及上轨 + 受阻**:回落做空机会
- **布林带收口**:即将变盘,观望
- **布林带开口**:趋势启动,跟随
**日内布林带使用**
- 价格在下轨 + RSI < 30 = 超卖反弹
- 价格在上轨 + RSI > 70 = 超买回落
- 突破上轨 + 放量 = 强势上涨,可追
### 均线系统(趋势判断核心 - 使用 EMA
**EMA 比 SMA 反应更快,适合日内**
- **多头排列**EMA5 > EMA10 > EMA20强势上涨回调做多
- **空头排列**EMA5 < EMA10 < EMA20强势下跌反弹做空
- **价格与 EMA 的关系**
- 价格站稳 EMA5短线上涨
- 价格突破 EMA20中线转多
- 价格跌破 EMA20中线转空
- **EMA 金叉死叉**
- EMA5 上穿 EMA10短线买入
- EMA5 下穿 EMA10短线卖出
- EMA10 上穿 EMA20中线买入
- EMA10 下穿 EMA20中线卖出
**日内 EMA 使用技巧**
- 30m EMA 判断日内趋势方向
- 15m EMA 寻找入场时机
- 5m EMA 确认最佳入场点
- **EMA 作为支撑/压力**:价格回调到 EMA 常有支撑/阻力
## 六、新闻舆情分析(日内影响)
结合最新市场新闻判断:
- **重大利好**监管利好、机构入场、ETF 通过等 → 提高做多置信度
- **重大利空**:监管打压、交易所暴雷、黑客攻击等 → 提高做空置信度
- **市场情绪**:恐慌指数、社交媒体热度
- **大户动向**:鲸鱼转账、交易所流入流出
**日内交易注意**
- 重大新闻后 1-2 小时内波动剧烈,适合突破交易
- 新闻驱动的行情通常持续 2-4 小时,符合日内目标
- 注意新闻发布时间(美股开盘、宏观数据等)
## 七、多周期共振(日内关键分析框架)
**多周期共振是提高信号质量的核心方法**
### 周期层级关系(日内优化版)
- **1h当日大方向**:判断当日的主要趋势方向,确保不逆大势
- **30m日内趋势层**:决定日内主趋势,使用 EMA 判断
- **15m入场层**:寻找入场时机,等待回调/反弹
- **5m精确入场**:确认最佳入场点,避免假突破
- **1m超精确**:最后确认(可选),避免刚入场就反转
### 共振判断标准(日内版)
**强共振A级信号confidence 85-100**
- 30m + 15m + 5m EMA 趋势同向
- 多周期 RSI 同时配合(如都不在极端区)
- 多周期 MACD 同时金叉/死叉
- 量能配合(放量突破或缩量回调)
**中等共振B级信号confidence 60-84**
- 30m + 15m EMA 同向
- 主周期15m技术指标明确
- 量能基本配合
**弱共振C级信号confidence 40-59**
- 只有单一周期信号
- 多周期方向不一致
- 量能不明显
**无共振D级信号confidence < 40**
- 多周期信号矛盾
- 量价背离
- 不建议交易
### 日内实战策略
- **顺势交易**30m 和 15m EMA 同向时,在 5m/1m 寻找入场点
- **逆势谨慎**:只有 15m 信号但 30m EMA 反向时,降低置信度
- **突破交易**:多周期同时突破关键位 + 放量,信号最强
- **回调交易**30m 趋势向上15m 回调到 EMA205m 反弹确认
## 八、入场方式(日内优化)
根据市场分析综合判断入场方式:
### market现价立即入场- 两种场景
#### 场景1强趋势突破稳健型
使用场景:
- ✅ 强共振信号A级confidence ≥ 85
- ✅ 放量突破关键位,趋势明确
- ✅ 多周期同时突破,等待可能错过机会
- ✅ 市场波动大,价格变化快
- ✅ 15m RSI 50-70或 30-50- 不极端
- **止损设置**正常止损1-1.5%),正常仓位
- **盈亏比要求**:≥ 1:1.5
#### 场景2快速突破博弈激进型新增
使用场景:
- ✅ **价格正在快速移动**5m K线连续2-3根同向大阳/阴线)
- ✅ **放量突破关键阻力/支撑**(量比 > 1.5
- ✅ **价格偏离 EMA5/EMA15 > 0.5%**,趋势加速中
- ✅ **突破后回调可能性小**(强势突破不回头)
- ⚠️ **可以用更小止损**0.8-1%更快止盈1.5-2%
- ⚠️ **仓位减半**micro 仓位),降低单笔风险
- ⚠️ **盈亏比要求**:≥ 1:1.5(虽然止损小,但目标也近)
**快速博弈示例**
```
BTC 当前价格 $68,000突然放量突破 $68,200 阻力位
→ 5m 连续3根阳线价格从 $67,800 涨到 $68,300
→ 量比 2.0,价格偏离 EMA5 约 0.8%
→ 决策market 现价做多 @ $68,300
→ 止损:$67,700-0.88%,小止损快速离场)
→ 止盈:$69,200+1.32%,快速获利)
→ 盈亏比1.5 ✅
→ 仓位micro1%),降低风险
```
**为什么快速突破用小止损?**
- 突破后如果立即回调,说明是假突破,快速止损
- 真突破会继续走,小止损不会被扫
- 用小止损换取更多交易机会
### limit挂单等待入场
使用场景:
- ✅ 信号强度中等B/C 级)
- ✅ 市场横盘整理,价格在区间内波动
- ✅ 等待回调到支撑位EMA20、前期低点
- ✅ 等待反弹到压力位EMA20、前期高点
- ✅ 希望获得更优成交价格
- ✅ 当前价格距离关键位 > 0.5%
- ❌ **价格正在快速移动时不要用 limit**
**重要**
- 必须同时输出 `entry_price`(建议入场价)和 `entry_type`(入场方式)
- 入场方式由你的市场分析判断,不是简单的价格距离计算
- **优先选择 market 入场**,只有明确回调/反弹机会时才用 limit
## 输出格式
请严格按照以下 JSON 格式输出:
```json
{
"trend_direction": "uptrend/downtrend/neutral",
"trend_strength": "strong/medium/weak",
"analysis_summary": "简要描述当前市场状态50字以内",
"volume_analysis": "量价分析结论30字以内",
"news_sentiment": "positive/negative/neutral",
"news_impact": "新闻对市场的影响分析30字以内",
"signals": [
{
"type": "short_term/medium_term/long_term",
"action": "buy/sell",
"entry_type": "market/limit",
"confidence": 0-100,
"grade": "A/B/C/D",
"entry_price": 66000,
"stop_loss": 65500,
"take_profit": 67500,
"reasoning": "详细的入场理由(必须包含趋势判断和量价分析)",
"key_factors": ["关键因素1", "关键因素2"]
}
],
"key_levels": {
"support": [65000, 64500],
"resistance": [67000, 67500]
}
}
```
## 重要说明
- `entry_price`:建议入场价格(单一值)
- `entry_type`:入场方式 - `market`(现价立即入场)或 `limit`(挂单等待)
- **所有价格必须是纯数字**,不要加 $ 符号、逗号或其他格式
- `entry_price`、`stop_loss`、`take_profit` 必须是数字类型,不要是字符串
- `key_levels` 中的支撑位和阻力位也必须是数字数组
## 信号等级与置信度(日内优化版)
### 按信号质量分类
- **A级**85-100
- 强共振:多周期同向 + 多指标共振 + 放量突破
- 快速突破5m 连续大阳/阴线 + 量比 > 1.5 + 加速移动
- 盈亏比 ≥ 1:1.5
- **建议**market 入场,可考虑 medium 仓位
- **B级**70-84
- 量价配合 + 主要指标确认
- 突破但量能不足,或回调/反弹机会明确
- 盈亏比 ≥ 1:1.2
- **建议**:根据价格移动速度选择 market/limitlight 仓位
- **C级**55-69
- 有机会但量价不够理想
- 震荡市区间交易
- 盈亏比 ≥ 1:1.2
- **建议**limit 挂单为主micro/light 仓位
- **D级**<55
- 量价背离或信号矛盾或盈亏比不足
- **不建议交易**
### 快速突破特别评级(加分项)
当出现以下情况时,可以提升评级:
1. ⭐⭐⭐ 5m 连续3根以上大阳/阴线 → +10 分
2. ⭐⭐⭐ 量比 > 2.0(巨量突破) → +8 分
3. ⭐⭐ 价格偏离 EMA5 > 1%(强势加速) → +5 分
4. ⭐⭐ 多周期同时突破5m+15m+30m → +5 分
5. ⭐ RSI 快速穿过 50趋势确认 → +3 分
**示例**:基础 B 级75分+ 5m 连续3根阳线+10+ 量比2.5+8= 93 分A级
## 注意事项(日内交易重点)
1. **优先使用 market 入场**
- 日内交易最重要的是**抓住机会**,而不是等最完美的价格
- 价格快速移动时,用 market 入场,用小止损控制风险
- 只有在明确回调/反弹机会时才用 limit 挂单
2. **只在有明确的做多或做空机会时才输出信号**action 为 buy 或 sell
3. 如果市场不明朗,没有明确交易机会,**不要输出任何信号**signals 为空数组 []
4. 信号强度confidence要合理不要随意给高分
- 60-70分一般信号可轻仓试探micro 仓位)
- 75-84分较强信号可正常仓位light 仓位)
- 85-100分强信号可考虑 medium 仓位
5. **不要输出 action 为 "wait" 的信号**,如果没有交易机会就不输出
6. **每次检查盈亏比**:盈亏比 < 1:1.2 的信号不要输出
7. **避免过度交易**:趋势延续时不重复输出相同方向信号
8. **关注时效性**:日内信号有效期通常 2-4 小时,超过时间需重新评估
## 快速突破的识别标准market 入场信号)
当出现以下情况时,**强烈建议使用 market 入场**
1. ✅ 5m 连续 2-3 根大阳线/阴线(实体 > 0.3%
2. ✅ 价格突破关键阻力/支撑后加速(偏离突破位 > 0.5%
3. ✅ 量比 > 1.5,放量确认突破有效
4. ✅ 价格偏离 EMA5 > 0.5%,趋势加速中
5. ✅ RSI 快速上升/下降5m 内变化 > 10
**快速突破时的止损策略**
- 止损可以设置得更窄0.8-1%),因为:
- 真突破会继续走,不会被小止损扫掉
- 假突破立即止损,损失小
- 用更多小止损博弈换取大盈利
## 日内交易特殊注意事项
1. **不持仓过夜**:收盘前 30 分钟逐步平仓
2. **快进快出**达到目标2-3%)立即平仓,不贪心
3. **严格止损**:触及止损立即离场,不要幻想
4. **避免追涨杀跌**:价格过度延伸时(偏离 EMA5 > 3%)不追
5. **关注量能**:无量配合的突破不追,容易是假突破
6. **多周期确认**5m/15m/30m 同向才入场,提高胜率
## 🎯 日内交易成功关键
1. **盈亏比第一**:宁可错过,不做错
2. **顺势而为**:趋势方向正确,成功率才能高
3. **快速止损**:日内交易,止损就是止错
4. **不贪不急**:达到目标就走,达不到就止损
5. **保持冷静**:不被情绪左右,按规则交易
## 历史信号参考(非常重要!)
**如果提供了上一轮的分析信号,必须仔细参考它:**
### 🚫 严禁重复信号
**如果上一轮已经给出了买入/卖出信号,不要在没有明显变化的情况下重复给出相同方向的信号!**
以下情况**不要**输出新的交易信号:
- ✗ 上一轮做空,现在仍然是空头排列,价格继续下跌 → **不要重复做空信号**
- ✗ 上一轮买入,现在仍然是多头排列,价格继续上涨 → **不要重复买入信号**
- ✗ 仅仅因为趋势延续就重复信号 → **绝对禁止!**
### ✅ 允许输出新信号的情况
只有在以下情况之一时,才输出新的交易信号:
1. **趋势反转**:上一轮判断的趋势发生了明确反转
- 例如上一轮看多EMA多头排列现在转为空头排列
2. **从观望到机会**:上一轮是观望(无信号),现在出现了明确的交易机会
3. **上一轮信号已失效**
- 价格已触及上一轮的止损或止盈价位
- 距离上一轮信号已过去较长时间(>2小时
4. **新的关键点位**:价格触及了重要的支撑/阻力位,且有明显反转信号
### 📋 信号调整建议
当需要调整时,请在 reasoning 中说明:
- 上一轮买入 → 当前转跌 → reasoning 中说明"趋势转弱,建议减仓或止损"
- 上一轮做空 → 当前转涨 → reasoning 中说明"趋势反转,建议平仓"
- 上一轮观望 → 当前出现机会 → 说明新机会是什么
### ⏰ 时间间隔考虑
- 5分钟级别如果上一轮是15分钟内除非有重大变化否则不重复信号
- 短线信号同一方向信号间隔至少1小时
- 波段信号同一方向信号间隔至少4小时
### 🎯 趋势位置考虑(重要!)
**在给出信号之前,先判断趋势所处的阶段:**
**上升趋势中**
- 如果价格严重偏离均线(> 5%RSI > 75布林带开口极大
→ 趋势可能到晚期,不要追多,考虑反向信号
- 如果价格在均线上方,但开始出现顶背离
→ 警惕反转,考虑做空或观望
- 如果价格刚刚突破,均线刚开始多头排列
→ 趋势早期,可以积极做多
**下降趋势中**
- 如果价格严重偏离均线(> 5%RSI < 25布林带开口极大
→ 趋势可能到晚期,不要追空,考虑反向信号
- 如果价格在均线下方,但开始出现底背离
→ 警惕反弹,考虑做多或观望
- 如果价格刚刚跌破,均线刚开始空头排列
→ 趋势早期,可以积极做空
**记住:宁可错过,不要噪音。重复信号只会导致过度交易!**
记住:你只负责分析市场,输出客观的交易信号,不需要考虑仓位管理和风险控制!
"""
def __init__(self):
self.news_service = get_news_service()
async def analyze(self, symbol: str, data: Dict[str, Any],
symbols: List[str] = None,
previous_signal: Dict[str, Any] = None) -> Dict[str, Any]:
"""
分析市场并生成信号
Args:
symbol: 交易对
data: 多周期K线数据
symbols: 所有监控的交易对(用于市场对比)
previous_signal: 上一轮的分析信号(用于避免重复信号和提供上下文)
Returns:
市场信号字典
"""
try:
# 1. 准备市场数据
market_context = self._prepare_market_context(symbol, data, symbols)
# 2. 获取新闻舆情
news_context = await self._get_news_context(symbol)
# 3. 构建 LLM 提示词
prompt = self._build_analysis_prompt(symbol, market_context, news_context, previous_signal)
# 4. 调用 LLM 分析
messages = [
{"role": "system", "content": self.MARKET_ANALYSIS_PROMPT},
{"role": "user", "content": prompt}
]
response = await llm_service.achat(messages)
# 5. 解析结果
result = self._parse_llm_response(response, symbol)
return result
except Exception as e:
logger.error(f"市场信号分析失败: {e}")
import traceback
logger.debug(traceback.format_exc())
return self._get_empty_signal(symbol)
def _prepare_market_context(self, symbol: str, data: Dict,
symbols: List[str] = None) -> str:
"""准备市场上下文信息"""
context_parts = []
# 当前价格和24h变化
current_price = float(data['5m'].iloc[-1]['close'])
price_change_24h = self._calculate_price_change_24h(data['1h'])
context_parts.append(f"当前价格: ${current_price:,.2f} ({price_change_24h})")
# 多周期数据
for tf_name, df in data.items():
if df is None or len(df) == 0:
continue
latest = df.iloc[-1]
context_parts.append(f"\n## {tf_name} 数据")
context_parts.append(f"开: {latest['open']}, 高: {latest['high']}, 低: {latest['low']}, 收: {latest['close']}")
context_parts.append(f"成交量: {latest.get('volume', 'N/A')}")
# 技术指标
if 'rsi' in df.columns:
rsi = df['rsi'].iloc[-1]
context_parts.append(f"RSI: {rsi:.2f}")
if 'macd' in df.columns:
macd = df['macd'].iloc[-1]
signal = df['macd_signal'].iloc[-1]
context_parts.append(f"MACD: {macd:.4f}, 信号线: {signal:.4f}")
if 'bb_upper' in df.columns:
bb_upper = df['bb_upper'].iloc[-1]
bb_lower = df['bb_lower'].iloc[-1]
context_parts.append(f"布林带: 上轨 {bb_upper:.2f}, 下轨 {bb_lower:.2f}")
# 均线系统(使用 30m 作为日内主周期)
context_parts.append(f"\n## 均线系统 (30m 日内主趋势)")
df_30m = data.get('30m')
if df_30m is not None and len(df_30m) > 0:
latest = df_30m.iloc[-1]
context_parts.append(f"EMA5: {latest.get('ma5', 'N/A')}")
context_parts.append(f"EMA10: {latest.get('ma10', 'N/A')}")
context_parts.append(f"EMA20: {latest.get('ma20', 'N/A')}")
context_parts.append(f"EMA50: {latest.get('ma50', 'N/A')}")
# 判断均线排列
ma5 = latest.get('ma5', 0)
ma10 = latest.get('ma10', 0)
ma20 = latest.get('ma20', 0)
ma50 = latest.get('ma50', 0)
if all([ma5, ma10, ma20, ma50]):
if ma5 > ma10 > ma20 > ma50:
context_parts.append("均线排列: 多头排列 📈 (EMA5 > EMA10 > EMA20 > EMA50)")
elif ma5 < ma10 < ma20 < ma50:
context_parts.append("均线排列: 空头排列 📉 (EMA5 < EMA10 < EMA20 < EMA50)")
else:
context_parts.append("均线排列: 交织,方向不明")
# 量比分析
df_5m = data.get('5m')
if df_5m is not None and len(df_5m) >= 20:
vol_latest = df_5m['volume'].iloc[-1]
vol_ma20 = df_5m['volume'].iloc[-20:-1].mean()
volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1
context_parts.append(f"\n## 量价分析")
context_parts.append(f"最新成交量: {vol_latest:.0f}")
context_parts.append(f"20周期均量: {vol_ma20:.0f}")
context_parts.append(f"量比: {volume_ratio:.2f}")
if volume_ratio > 1.5:
context_parts.append("量价状态: 放量 📊")
elif volume_ratio < 0.7:
context_parts.append("量价状态: 缩量 📉")
else:
context_parts.append("量价状态: 平量 ")
# 波动率分析
volatility_analysis = self._analyze_volatility(data)
if volatility_analysis:
context_parts.append(f"\n## 波动率分析")
context_parts.append(volatility_analysis)
# 趋势位置分析(新增:避免盲目追涨杀跌)
trend_position_analysis = self._analyze_trend_position(data)
if trend_position_analysis:
context_parts.append(f"\n## 趋势位置分析")
context_parts.append(trend_position_analysis)
return "\n".join(context_parts)
async def _get_news_context(self, symbol: str) -> str:
"""获取新闻舆情上下文"""
try:
news_result = await self.news_service.get_crypto_news(symbol)
if not news_result or not news_result.get('articles'):
return "无最新新闻"
articles = news_result['articles'][:5] # 只取前5条
context_parts = ["\n## 最新新闻"]
for article in articles:
title = article.get('title', '')
source = article.get('source', '')
published_at = article.get('publishedAt', '')
time_str = published_at.split('T')[1][:5] if 'T' in published_at else ''
context_parts.append(f"- [{time_str}] {title} ({source})")
return "\n".join(context_parts)
except Exception as e:
logger.warning(f"获取新闻失败: {e}")
return "新闻获取失败"
def _analyze_trend_position(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析趋势位置和日内交易机会(使用 EMA"""
try:
df_30m = data.get('30m')
df_15m = data.get('15m')
if df_30m is None or len(df_30m) < 50:
return ""
latest_30m = df_30m.iloc[-1]
current_price = float(latest_30m['close'])
# 获取日内级别 EMA30m
ema5_30m = latest_30m.get('ma5') # 实际是 ema5
ema10_30m = latest_30m.get('ma10') # 实际是 ema10
ema20_30m = latest_30m.get('ma20') # 实际是 ema20
if not all([ema5_30m, ema10_30m, ema20_30m]):
return ""
# 判断日内趋势30m EMA 为主)
if ema5_30m > ema10_30m > ema20_30m:
intraday_trend = "上升"
intraday_emoji = "📈"
elif ema5_30m < ema10_30m < ema20_30m:
intraday_trend = "下跌"
intraday_emoji = "📉"
else:
intraday_trend = "震荡"
intraday_emoji = ""
analysis = [f"日内趋势(30m EMA): {intraday_emoji} {intraday_trend}"]
# 检查15分钟级别入场时机
if df_15m is not None and len(df_15m) >= 20:
latest_15m = df_15m.iloc[-1]
rsi_15m = latest_15m.get('rsi', 50)
ema5_15m = latest_15m.get('ma5') # 实际是 ema5
ema20_15m = latest_15m.get('ma20') # 实际是 ema20
# 检查短期动能
if len(df_15m) >= 5:
recent_closes = df_15m['close'].iloc[-5:].values
is_accelerating = all(recent_closes[i] > recent_closes[i-1] for i in range(1, 5))
# 检查连续大阳线/阴线(快速移动)
recent_changes = [(recent_closes[i] - recent_closes[i-1]) / recent_closes[i-1] * 100
for i in range(1, len(recent_closes))]
big_moves = sum(1 for change in recent_changes if abs(change) > 0.3)
is_rapid_moving = big_moves >= 3
avg_move = sum(abs(c) for c in recent_changes) / len(recent_changes) if recent_changes else 0
else:
is_accelerating = False
is_rapid_moving = False
avg_move = 0
# 计算价格偏离
if ema5_15m and ema20_15m:
deviation_ema5_15m = abs(current_price - ema5_15m) / ema5_15m * 100
distance_to_ema20 = abs(current_price - ema20_15m) / ema20_15m * 100
else:
deviation_ema5_15m = 0
distance_to_ema20 = 0
# 检查成交量
df_5m = data.get('5m')
volume_ratio = 1
if df_5m is not None and len(df_5m) >= 20:
vol_latest = df_5m['volume'].iloc[-1]
vol_ma20 = df_5m['volume'].iloc[-20:-1].mean()
volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1
# 检查5m连续K线走势
if len(df_5m) >= 3:
recent_5m_closes = df_5m['close'].iloc[-3:].values
recent_5m_changes = [(recent_5m_closes[i] - recent_5m_closes[i-1]) / recent_5m_closes[i-1] * 100
for i in range(1, len(recent_5m_closes))]
big_5m_moves = sum(1 for change in recent_5m_changes if abs(change) > 0.3)
is_5m_accelerating = big_5m_moves >= 2
else:
is_5m_accelerating = False
# 日内过度延伸检查EMA 反应更快,阈值更严格)
is_overextended = (
(rsi_15m > 70 and intraday_trend == "上升") or
(rsi_15m < 30 and intraday_trend == "下跌") or
deviation_ema5_15m > 3
)
if intraday_trend == "上升":
# 快速突破检测优先使用market入场
if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5:
analysis.append(f"🚀 15m: 快速突破!连续{big_moves}根大阳线,平均涨幅{avg_move:.2f}%")
analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → ⚡ **强烈建议 market 现价做多**,不要等回调")
analysis.append(f" → 止损0.8-1%小止损快速离场目标1.5-2%(快速获利)")
analysis.append(f" → 盈亏比要求 >= 1:1.5")
analysis.append(f" → 仓位micro1%),用小止损博弈快速行情")
elif is_accelerating and volume_ratio > 1.3 and not is_overextended:
analysis.append(f"15m: 正在加速上涨,放量突破")
analysis.append(f" → 建议 market 入场做多")
analysis.append(f" → 止损1-1.5%目标2-3%,盈亏比 >= 1:1.5")
elif distance_to_ema20 < 1 and deviation_ema5_15m > 1.5:
analysis.append(f"15m: 回调到 EMA20 支撑位")
analysis.append(f" → 支撑位做多反弹EMA20: ${ema20_15m:.0f}")
analysis.append(f" → 止损1%目标2-3%,盈亏比 >= 1:1.5")
elif is_overextended:
analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → 不要追多,等待回调")
else:
analysis.append(f"15m: 上涨中,可以轻仓做多")
analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
elif intraday_trend == "下跌":
# 快速突破检测优先使用market入场
if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5:
analysis.append(f"🚀 15m: 快速突破!连续{big_moves}根大阴线,平均跌幅{avg_move:.2f}%")
analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → ⚡ **强烈建议 market 现价做空**,不要等反弹")
analysis.append(f" → 止损0.8-1%小止损快速离场目标1.5-2%(快速获利)")
analysis.append(f" → 盈亏比要求 >= 1:1.5")
analysis.append(f" → 仓位micro1%),用小止损博弈快速行情")
elif is_accelerating and volume_ratio > 1.3 and not is_overextended:
analysis.append(f"15m: 正在加速下跌,放量跌破")
analysis.append(f" → 建议 market 入场做空")
analysis.append(f" → 止损1-1.5%目标2-3%,盈亏比 >= 1:1.5")
elif distance_to_ema20 < 1 and deviation_ema5_15m > 1.5:
analysis.append(f"15m: 反弹到 EMA20 压力位")
analysis.append(f" → 压力位做空回调EMA20: ${ema20_15m:.0f}")
analysis.append(f" → 止损1%目标2-3%,盈亏比 >= 1:1.5")
elif is_overextended:
analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
analysis.append(f" → 不要追空,等待反弹")
else:
analysis.append(f"15m: 下跌中,可以轻仓做空")
analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%")
else:
analysis.append(f"15m: 震荡,观望或双向轻仓")
analysis.append(f" → 支撑位多,压力位空,盈亏比 >= 1:1.5")
# 日内交易要点
analysis.append(f"\n💡 日内交易要点:")
analysis.append(f"- **优先使用 market 入场**:抓住机会 > 等待完美价格")
analysis.append(f"- 快速移动时用小止损0.8-1%+ 小仓位micro博弈")
analysis.append(f"- 只有明确回调/反弹机会才用 limit 挂单")
analysis.append(f"- 使用 EMA指数移动平均反应更快")
analysis.append(f"- 盈亏比第一: 必须 >= 1:1.5")
analysis.append(f"- 快进快出: 持仓不超过4小时")
analysis.append(f"- 严格止损: 1-1.5%快速突破时0.8-1%")
analysis.append(f"- 目标盈利: 1.5-3%(根据止损调整)")
return "\n".join(analysis) if analysis else ""
except Exception as e:
logger.warning(f"趋势位置分析失败: {e}")
return ""
def _build_analysis_prompt(self, symbol: str, market_context: str,
news_context: str,
previous_signal: Dict[str, Any] = None) -> str:
"""构建分析提示词"""
prompt_parts = [
f"请分析 {symbol} 的市场情况:\n",
market_context,
"",
news_context
]
# 添加历史信号上下文
if previous_signal:
prev_time = previous_signal.get('timestamp', 'Unknown')
prev_trend = previous_signal.get('trend', 'Unknown')
prev_signals = previous_signal.get('signals', [])
prompt_parts.append("\n" + "="*60)
prompt_parts.append("## 上一轮分析信号(必须参考!)")
prompt_parts.append("="*60)
prompt_parts.append(f"分析时间: {prev_time}")
prompt_parts.append(f"趋势判断: {prev_trend}")
if prev_signals:
prompt_parts.append("\n之前给出的信号:")
for i, sig in enumerate(prev_signals, 1):
action = sig.get('action', 'N/A')
confidence = sig.get('confidence', 0)
timeframe = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(timeframe, timeframe)
entry = sig.get('entry_price', 'N/A')
sl = sig.get('stop_loss', 'N/A')
tp = sig.get('take_profit', 'N/A')
reasoning = sig.get('reasoning', 'N/A')
prompt_parts.append(
f"\n[{i}] {type_text} | {action} | 信心度: {confidence}%\n"
f" 入场: ${entry}\n"
f" 止损: ${sl}\n"
f" 止盈: ${tp}\n"
f" 理由: {reasoning}"
)
# 重点警告
prompt_parts.append("\n" + "!"*60)
prompt_parts.append("🚨 严禁重复信号!")
prompt_parts.append("!"*60)
prompt_parts.append("如果上一轮已经给出了相同方向的信号(做空/做多),")
prompt_parts.append("且趋势没有发生明确反转,")
prompt_parts.append("绝对不要重复给出相同方向的信号!")
prompt_parts.append("")
prompt_parts.append("只有在以下情况才输出新信号:")
prompt_parts.append(" ✓ 趋势发生了明确的反转")
prompt_parts.append(" ✓ 上一轮是观望,现在出现了新的明确机会")
prompt_parts.append(" ✓ 价格已触及上一轮的止损/止盈价位")
prompt_parts.append("")
prompt_parts.append("以下情况不要输出信号:")
prompt_parts.append(" ✗ 趋势延续,只是价格继续向同一方向移动")
prompt_parts.append(" ✗ 仅仅因为均线排列仍然有效")
prompt_parts.append(" ✗ 没有明显的市场变化")
else:
prompt_parts.append("\n上一轮没有给出交易信号(市场观望建议)")
prompt_parts.append("\n你可以基于当前市场情况给出新的信号。")
prompt_parts.append("\n" + "="*60)
prompt_parts.append("\n请根据以上数据,给出你的市场判断和交易信号。")
return "\n".join(prompt_parts)
def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]:
"""解析 LLM 响应"""
try:
# 尝试提取 JSON
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
if json_match:
json_str = json_match.group(1)
else:
json_match = re.search(r'\{[\s\S]*\}', response)
if json_match:
json_str = json_match.group(0)
else:
raise ValueError("无法找到 JSON 响应")
# 清理 JSON 字符串(移除可能导致解析错误的注释等)
json_str = self._clean_json_string(json_str)
logger.debug(f"解析的 JSON 字符串: {json_str[:500]}...") # 打印前500字符用于调试
result = json.loads(json_str)
# 清理价格字段 - 转换为 float
result = self._clean_price_fields(result)
# 添加元数据
result['symbol'] = symbol
result['timestamp'] = datetime.now().isoformat()
result['raw_response'] = response
# 兼容处理:确保 signals 中的字段与旧格式一致
if 'signals' in result:
for sig in result['signals']:
# LLM 输出的 "type" 是 timeframe (short_term/medium_term/long_term)
# 需要映射为 "timeframe",而 "action" 才是 buy/sell/wait
if 'type' in sig:
# 如果 type 是 short_term/medium_term/long_term映射为 timeframe
if sig['type'] in ['short_term', 'medium_term', 'long_term']:
sig['timeframe'] = sig.pop('type')
# 如果 type 是 buy/sell/wait映射为 action
elif sig['type'] in ['buy', 'sell', 'wait']:
sig['action'] = sig.pop('type')
# 确保 action 字段存在
if 'action' not in sig and 'timeframe' in sig:
# 从 reasoning 或其他字段推断 action
sig['action'] = 'wait'
# 确保 grade 字段存在
if 'grade' not in sig:
# 根据 confidence 推断 grade
confidence = sig.get('confidence', 0)
if confidence >= 80:
sig['grade'] = 'A'
elif confidence >= 60:
sig['grade'] = 'B'
elif confidence >= 40:
sig['grade'] = 'C'
else:
sig['grade'] = 'D'
# 处理趋势字段 - 优先使用 LLM 返回的趋势字段,否则从信号推断
if 'trend_direction' not in result or 'trend_strength' not in result:
# 从 signals 中推断趋势
if 'signals' in result and result['signals']:
best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0))
action = best_signal.get('action', 'wait')
confidence = best_signal.get('confidence', 0)
# 推断趋势方向(如果 LLM 没有提供)
if 'trend_direction' not in result:
if action == 'buy':
result['trend_direction'] = 'uptrend'
elif action == 'sell':
result['trend_direction'] = 'downtrend'
else:
result['trend_direction'] = 'neutral'
# 推断趋势强度(如果 LLM 没有提供)
if 'trend_strength' not in result:
result['trend_strength'] = 'strong' if confidence >= 70 else 'medium' if confidence >= 50 else 'weak'
# 从信号中推断 market_state用于向后兼容
if 'signals' in result and result['signals']:
best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0))
action = best_signal.get('action', 'wait')
confidence = best_signal.get('confidence', 0)
trend_direction = result.get('trend_direction', 'neutral')
# 推断市场状态
if confidence >= 70 and trend_direction != 'neutral':
if trend_direction == 'uptrend':
result['market_state'] = '强势上涨'
elif trend_direction == 'downtrend':
result['market_state'] = '强势下跌'
else:
result['market_state'] = '震荡整理'
else:
result['market_state'] = '震荡整理'
# 推断 trend用于向后兼容简化的趋势字段
if 'trend' not in result:
if trend_direction == 'uptrend':
result['trend'] = 'up'
elif trend_direction == 'downtrend':
result['trend'] = 'down'
else:
result['trend'] = 'sideways'
else:
result['market_state'] = '无明确信号'
if 'trend' not in result:
result['trend'] = 'sideways'
logger.info(f"✅ 市场信号分析完成: {symbol}")
logger.debug(f"市场信号: {json.dumps(result, ensure_ascii=False, indent=2)}")
return result
except Exception as e:
logger.warning(f"解析 LLM 响应失败: {e}")
logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符
return self._get_empty_signal(symbol)
def _clean_json_string(self, json_str: str) -> str:
"""清理 JSON 字符串,移除可能导致解析错误的内容"""
# 移除单行注释 // ...
json_str = re.sub(r'//.*?(?=\n|$)', '', json_str)
# 移除多行注释 /* ... */
json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str)
# 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
return json_str
def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""清理价格字段,转换为 float"""
def clean_price(price_value):
if price_value is None:
return None
if isinstance(price_value, (int, float)):
return float(price_value)
if isinstance(price_value, str):
# 移除 $ 符号和逗号
cleaned = price_value.replace('$', '').replace(',', '').strip()
if cleaned:
try:
return float(cleaned)
except ValueError:
return None
return None
# 清理 key_levels 中的支撑位和阻力位
if 'key_levels' in data and data['key_levels']:
key_levels = data['key_levels']
if 'support' in key_levels:
data['key_levels']['support'] = [clean_price(s) for s in key_levels['support']]
if 'resistance' in key_levels:
data['key_levels']['resistance'] = [clean_price(r) for r in key_levels['resistance']]
# 清理 signals 中的价格字段
if 'signals' in data:
# 标记需要移除的信号索引
signals_to_remove = []
for idx, sig in enumerate(data['signals']):
price_fields = ['entry_price', 'stop_loss', 'take_profit']
for field in price_fields:
if field in sig:
sig[field] = clean_price(sig[field])
# 验证止损止盈价格的合理性
entry_price = sig.get('entry_price')
stop_loss = sig.get('stop_loss')
take_profit = sig.get('take_profit')
action = sig.get('action', '')
if entry_price and entry_price > 0:
MAX_REASONABLE_DEVIATION = 0.50 # 50%
has_invalid_price = False
# 检查止损
if stop_loss is not None:
deviation = abs(stop_loss - entry_price) / entry_price
if deviation > MAX_REASONABLE_DEVIATION:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止损价格不合理: entry={entry_price}, stop_loss={stop_loss}, 偏离={deviation*100:.1f}%")
has_invalid_price = True
elif action == 'buy' and stop_loss >= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 < entry")
has_invalid_price = True
elif action == 'sell' and stop_loss <= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 > entry")
has_invalid_price = True
# 检查止盈
if take_profit is not None:
deviation = abs(take_profit - entry_price) / entry_price
if deviation > MAX_REASONABLE_DEVIATION:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止盈价格不合理: entry={entry_price}, take_profit={take_profit}, 偏离={deviation*100:.1f}%")
has_invalid_price = True
elif action == 'buy' and take_profit <= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止盈错误: entry={entry_price}, take_profit={take_profit} 应该 > entry")
has_invalid_price = True
elif action == 'sell' and take_profit >= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止盈错误: entry={entry_price}, take_profit={take_profit} 应该 < entry")
has_invalid_price = True
# 如果价格不合理,降低等级为 D 或移除信号
if has_invalid_price:
original_grade = sig.get('grade', 'C')
sig['grade'] = 'D'
sig['confidence'] = 0
# 添加错误说明
if 'reasoning' in sig:
sig['reasoning'] = f"[价格异常] {sig['reasoning']}"
logger.error(f"❌ [{data.get('symbol', '')}] 信号价格异常,等级从 {original_grade} 降为 D止损止盈已清空")
# 清空不合理的价格
sig['stop_loss'] = None
sig['take_profit'] = None
return data
def _calculate_price_change_24h(self, df) -> str:
"""计算24小时涨跌幅"""
try:
if df is None or len(df) < 24:
return "N/A"
current_price = float(df['close'].iloc[-1])
price_24h_ago = float(df['close'].iloc[-24])
change = ((current_price - price_24h_ago) / price_24h_ago) * 100
sign = "+" if change >= 0 else ""
return f"{sign}{change:.2f}%"
except Exception as e:
logger.debug(f"计算24h涨跌失败: {e}")
return "N/A"
def _get_empty_signal(self, symbol: str) -> Dict[str, Any]:
"""返回空信号"""
return {
'symbol': symbol,
'trend_direction': 'neutral',
'trend_strength': 'weak',
'analysis_summary': 'unknown',
'volume_analysis': '分析失败',
'news_sentiment': 'neutral',
'news_impact': '',
'market_state': '分析失败',
'trend': 'sideways',
'signals': [],
'key_levels': {},
'timestamp': datetime.now().isoformat(),
'error': '信号分析失败'
}
def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析波动率变化(使用 30m 作为日内主周期)"""
df = data.get('30m')
if df is None or len(df) < 24 or 'atr' not in df.columns:
return ""
lines = []
# ATR 变化趋势
recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根3小时
older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根
if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0:
return ""
atr_change = (recent_atr - older_atr) / older_atr * 100
current_atr = float(df['atr'].iloc[-1])
current_price = float(df['close'].iloc[-1])
atr_percent = current_atr / current_price * 100
lines.append(f"当前 ATR (30m): ${current_atr:.2f} ({atr_percent:.2f}%)")
if atr_change > 20:
lines.append(f"**波动率扩张**: ATR 上升 {atr_change:.0f}%,日内趋势可能启动")
elif atr_change < -20:
lines.append(f"**波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将变盘")
else:
lines.append(f"波动率稳定: ATR 变化 {atr_change:+.0f}%")
# 布林带宽度
if 'bb_upper' in df.columns and 'bb_lower' in df.columns:
bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100
bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100
if bb_width < bb_width_prev * 0.8:
lines.append(f"**布林带收口**: 宽度 {bb_width:.1f}%,变盘信号")
elif bb_width > bb_width_prev * 1.2:
lines.append(f"**布林带开口**: 宽度 {bb_width:.1f}%,趋势延续")
return "\n".join(lines)