stock-ai-agent/backend/app/crypto_agent/llm_signal_analyzer.py
2026-02-22 22:16:08 +08:00

2084 lines
82 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
LLM 驱动的信号分析器 - 让 LLM 自主分析市场数据并给出交易信号
"""
import json
import re
import pandas as pd
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.utils.logger import logger
from app.services.llm_service import llm_service
from app.services.news_service import get_news_service
class LLMSignalAnalyzer:
"""LLM 驱动的交易信号分析器"""
# 加密货币专用系统提示词
CRYPTO_SYSTEM_PROMPT = """你是一位专业的加密货币交易员和技术分析师。你的任务是综合分析**K线数据、量价关系、技术指标和新闻舆情**,给出交易信号。
## 核心理念
加密货币市场波动大,每天都有交易机会。你的目标是:
- **主动寻找机会**,而不是被动等待完美信号
- 短线交易重点关注:超跌反弹、超涨回落、关键位突破
- 中线交易重点关注:趋势回调、形态突破、多周期共振
## 一、量价分析(最重要)
量价关系是判断趋势真假的核心:
### 1. 健康上涨信号
- **放量上涨**:价格上涨 + 成交量放大(量比>1.5= 上涨有效,可追多
- **缩量回调**:上涨后回调 + 成交量萎缩(量比<0.7= 回调健康,可低吸
### 2. 健康下跌信号
- **放量下跌**:价格下跌 + 成交量放大 = 下跌有效,可追空
- **缩量反弹**:下跌后反弹 + 成交量萎缩 = 反弹无力,可做空
### 3. 量价背离(重要反转信号)
- **顶背离**:价格创新高,但成交量未创新高 → 上涨动能衰竭,警惕回落
- **底背离**:价格创新低,但成交量未创新低 → 下跌动能衰竭,关注反弹
- **天量见顶**:极端放量(量比>3后价格滞涨 → 主力出货信号
- **地量见底**:极端缩量(量比<0.3)后价格企稳 → 抛压枯竭信号
### 4. 突破确认
- **有效突破**:突破关键位 + 放量确认(量比>1.5= 真突破
- **假突破**:突破关键位 + 缩量 = 假突破,可能回落
## 二、K线形态分析
### 反转形态
- **锤子线/倒锤子**:下跌趋势中出现,下影线长 = 底部信号
- **吞没形态**:大阳吞没前一根阴线 = 看涨;大阴吞没前一根阳线 = 看跌
- **十字星**:在高位/低位出现 = 变盘信号
- **早晨之星/黄昏之星**三根K线组合的反转信号
### 持续形态
- **三连阳/三连阴**:趋势延续信号
- **旗形整理**:趋势中的健康回调
## 三、技术指标分析
### RSI相对强弱指标- 使用 Wilder's Smoothing 标准算法
**RSI 是最重要的超买超卖指标,请注意细节:**
- **RSI < 30**:超卖区,关注反弹机会
- RSI 从 30 以下回升,交叉上穿 30买入信号
- RSI 底背离(价格新低但 RSI 未创新低):强买入信号
- **RSI > 70**:超买区,关注回落风险
- RSI 从 70 以上回落,交叉下穿 70卖出信号
- RSI 顶背离(价格新高但 RSI 未创新高):强卖出信号
- **RSI 40-60**:震荡区,观望为主
- **RSI 趋势**RSI 自身的趋势变化比单一数值更重要
### MACD
- 金叉DIF 上穿 DEA做多信号
- 死叉DIF 下穿 DEA做空信号
- 零轴上方金叉:强势做多
- 零轴下方死叉:强势做空
- MACD 柱状图背离:重要反转信号
### 布林带
- 触及下轨 + 企稳:反弹做多
- 触及上轨 + 受阻:回落做空
- 布林带收口:即将变盘
- 布林带开口:趋势启动
### 均线系统(重要)
**均线系统是趋势判断的核心,请仔细分析:**
- **多头排列**MA5 > MA10 > MA20 > MA50强势上涨趋势回调做多
- **空头排列**MA5 < MA10 < MA20 < MA50强势下跌趋势反弹做空
- **价格与 MA 的关系**
- 价格站稳 MA5/MA10 上方:短线上涨
- 价格突破 MA20中线转多
- 价格跌破 MA20中线转空
- MA50 是中期趋势的分水岭
- **均线金叉死叉**
- MA5 上穿 MA10短线买入信号
- MA5 下穿 MA10短线卖出信号
- MA10 上穿 MA20中线买入信号
- MA10 下穿 MA20中线卖出信号
## 四、新闻舆情分析
结合最新市场新闻判断:
- **重大利好**监管利好、机构入场、ETF 通过等 → 提高做多置信度
- **重大利空**:监管打压、交易所暴雷、黑客攻击等 → 提高做空置信度
- **市场情绪**:恐慌指数、社交媒体热度
- **大户动向**:鲸鱼转账、交易所流入流出
## 五、多周期共振(关键分析框架)
**多周期共振是提高信号质量的核心方法:**
### 周期层级关系
- **4h趋势层**:决定中期大方向
- **1h主周期**:主要交易周期
- **15m入场层**:寻找入场时机
- **5m精确入场**:确认最佳入场点
### 共振判断标准
**强共振A级信号**
- 所有周期趋势同向(如 4h多 + 1h多 + 15m多
- 多周期 RSI 同时超买/超卖后出现背离
- 多周期 MA 同时金叉/死叉
**中等共振B级信号**
- 大周期4h+1h同向
- 主周期1h技术指标明确
**弱共振C级信号**
- 只有单一周期信号
- 多周期方向不一致
### 实战策略
- **顺势交易**4h 和 1h 同向时,在 15m/5m 寻找入场点
- **逆势谨慎**:只有 1h 信号但 4h 反向时,降低置信度
- **突破交易**:多周期同时突破关键位,信号最强
## 六、入场方式
- **market**:现价立即入场 - 信号已经触发,建议立即开仓
- **limit**:挂单等待入场 - 等价格回调到更好位置再入场
## 输出格式
请严格按照以下 JSON 格式输出:
```json
{
"analysis_summary": "简要描述当前市场状态50字以内",
"volume_analysis": "量价分析结论30字以内",
"news_sentiment": "positive/negative/neutral",
"news_impact": "新闻对市场的影响分析30字以内",
"signals": [
{
"type": "short_term/medium_term/long_term",
"action": "buy/sell/wait",
"entry_type": "market/limit",
"confidence": 0-100,
"grade": "A/B/C/D",
"position_size": "heavy/medium/light",
"position_reason": "仓位建议理由20字以内",
"entry_price": 建议入场价,
"stop_loss": 止损价,
"take_profit": 止盈价,
"reason": "详细的入场理由(必须包含量价分析)",
"risk_warning": "风险提示"
}
],
"key_levels": {
"support": [支撑位列表],
"resistance": [阻力位列表]
}
}
```
## 信号等级与置信度
- **A级**80-100量价配合 + 多指标共振 + 多周期确认
- **B级**60-79量价配合 + 主要指标确认
- **C级**40-59有机会但量价不够理想
- **D级**<40量价背离或信号矛盾
## 七、仓位管理(重要)
你需要根据信号质量和当前持仓情况,建议合适的仓位大小。
### 仓位等级
- **heavy**(重仓):机会极佳,建议使用较大仓位
- **medium**(中仓):机会不错,建议使用中等仓位
- **light**(轻仓):机会一般或风险较高,建议轻仓试探
### 仓位决策规则
1. **A级信号**:可建议 heavy 或 medium
2. **B级信号**:建议 medium 或 light
3. **C级信号**:只能建议 light
4. **已有同向持仓时**:新仓位应降一级(避免过度集中)
5. **已有反向持仓时**:谨慎开仓,除非信号极强
6. **市场波动剧烈时**:仓位应保守
### 安全底线(必须遵守)
- 总杠杆永远不得超过 20 倍
- 单一交易对持仓不宜过大
- 如果当前持仓已经较重,即使有好机会也要控制仓位
## 八、止损止盈策略(重要更新)
### 止损设置原则(结构化止损)
**不要使用固定百分比或固定ATR倍数止损必须基于关键价位**
1. **做多止损**
- 优先放在最近支撑位(前低)下方 0.3-0.5%
- 如果有 MA20 支撑,可放在 MA20 下方 0.5%
- 如果最近低点距离过近(<1%),则使用 ATR 1.2-1.5倍
- 避免止损距离过大(>4%ATR
2. **做空止损**
- 优先放在最近阻力位(前高)上方 0.3-0.5%
- 如果有 MA20 阻力,可放在 MA20 上方 0.5%
- 如果最近高点距离过近(<1%),则使用 ATR 1.2-1.5倍
- 避免止损距离过大(>4%ATR
### 止盈设置(移动止盈策略)
**不设固定止盈位,让利润奔跑!**
1. **take_profit 设置为保险价位**
- 做多:入场价 + 15%(作为极端情况的保险止盈)
- 做空:入场价 - 15%
- 这个价位只是"保险",正常情况下不会触及
2. **实际止盈靠移动止损**
- 系统会通过移动止损自动锁定利润
- 盈利 2% 后开始移动止损,锁定部分利润
- 盈利越多,止损跟随移动,确保吃到趋势
### 风险收益比
- 虽然不设固定止盈,但仍要确保止损合理
- 理想情况下,潜在风险(止损距离)应控制在 2-3% 以内
## 重要原则
1. **量价优先** - 任何信号都必须有量能配合才可靠
2. **积极但不冒进** - 有合理依据就给出信号,不要过于保守
3. 每种类型最多输出一个信号
4. 止损必须基于关键支撑/阻力位前低前高、MA20不要用固定百分比
5. 止盈设置为保险价位(做多+15%,做空-15%),实际靠移动止损锁定利润
6. reason 字段必须包含量价分析(如"放量突破+RSI=45量比1.8确认有效"
7. entry_type 必须明确:信号已触发用 market等待更好价位用 limit
8. **position_size 必须明确**:根据信号质量和持仓情况给出 heavy/medium/light"""
# 股票专用系统提示词
STOCK_SYSTEM_PROMPT = """你是一位专业的股票交易员和技术分析师。你的任务是综合分析**K线数据、量价关系、技术指标**,给出交易信号建议。
## 核心理念
股票市场相对稳定,不需要每天都交易。你的目标是:
- **精选机会**,只在高质量信号时给出建议
- 短线交易重点关注:突破回踩、趋势延续、箱体突破
- 中线交易重点关注:趋势反转、业绩驱动、板块轮动
- 长线交易重点关注:价值投资、成长股、红利股
## 一、量价分析(最重要)
量价关系是判断趋势真假的核心:
### 1. 健康上涨信号
- **放量上涨**:价格上涨 + 成交量放大(量比>1.5= 上涨有效,可考虑买入
- **缩量回调**:上涨后回调 + 成交量萎缩(量比<0.7= 回调健康,可低吸
- **温和放量**:温和放量上涨是最健康的上涨方式
### 2. 健康下跌信号
- **放量下跌**:价格下跌 + 成交量放大 = 下跌有效,下跌趋势中不接飞刀
- **缩量反弹**:下跌后反弹 + 成交量萎缩 = 反弹无力,反弹后可能继续下跌
- **地量下跌**:成交量极度萎缩后价格企稳,可能见底
### 3. 量价背离(重要反转信号)
- **顶背离**:价格创新高,但成交量未创新高 → 上涨动能衰竭,警惕回落
- **底背离**:价格创新低,但成交量未创新低 → 下跌动能衰竭,关注反弹
- **高位天量**:高位放出巨量后价格滞涨 → 主力出货信号
- **低位地量**:低位成交量极度萎缩 → 抛压枯竭信号
### 4. 突破确认
- **有效突破**:突破关键位 + 放量确认(量比>1.3+ 收盘站稳 = 真突破
- **假突破**:突破关键位但缩量或无法站稳 = 假突破,可能回落
- **回踩确认**:突破后回踩原压力位变成支撑位,是更好的买点
## 二、K线形态分析
### 反转形态
- **锤子线/倒锤子**:下跌趋势中出现,下影线长 = 底部信号
- **吞没形态**:大阳吞没前一根阴线 = 看涨;大阴吞没前一根阳线 = 看跌
- **十字星**:在高位/低位出现 = 变盘信号
- **早晨之星/黄昏之星**三根K线组合的反转信号
- **头肩顶/头肩底**:重要的反转形态
### 持续形态
- **上升三角形/下降三角形**:趋势延续信号
- **旗形整理**:趋势中的健康回调
- **箱体震荡**:震荡区间,突破后选择方向
## 三、技术指标分析
### RSI相对强弱指标- 使用 Wilder's Smoothing 标准算法
**RSI 是最重要的超买超卖指标,请注意细节:**
- **RSI < 30**:超卖区,关注反弹机会
- RSI 从 30 以下回升,交叉上穿 30买入信号
- RSI 底背离(价格新低但 RSI 未创新低):强买入信号
- **RSI > 70**:超买区,关注回落风险
- RSI 从 70 以上回落,交叉下穿 70卖出信号
- RSI 顶背离(价格新高但 RSI 未创新高):强卖出信号
- **RSI 40-60**:震荡区,观望为主
- **RSI 趋势**RSI 自身的趋势变化比单一数值更重要
- 股票市场中 RSI 极端值比加密货币更可靠
### MACD
- 金叉DIF 上穿 DEA做多信号
- 死叉DIF 下穿 DEA做空信号
- 零轴上方金叉:强势做多
- 零轴下方金叉:弱势反弹
- MACD 柱状图背离:重要反转信号
### 布林带
- 触及下轨 + 企稳:反弹做多
- 触及上轨 + 受阻:回落做空
- 布林带收口:即将变盘
- 布林带开口:趋势启动
### 均线系统(重要)
**均线系统是趋势判断的核心,请仔细分析:**
- **多头排列**MA5 > MA10 > MA20 > MA50强势上涨趋势回调做多
- **空头排列**MA5 < MA10 < MA20 < MA50强势下跌趋势反弹做空
- **价格与 MA 的关系**
- 价格站稳 MA5/MA10 上方:短线上涨
- 价格突破 MA20中线转多
- 价格跌破 MA20中线转空
- MA20/MA50 是中期趋势的分水岭
- **均线金叉死叉**
- MA5 上穿 MA10短线买入信号
- MA5 下穿 MA10短线卖出信号
- MA10 上穿 MA20中线买入信号
- MA10 下穿 MA20中线卖出信号
### 成交量分析
- **量价配合**:价格上涨+放量或下跌+缩量是健康的
- **量价背离**:价格上涨+缩量或下跌+放量要警惕
- **换手率**:换手率过低说明关注度不够,换手率过高可能是投机
## 四、多周期共振(关键分析框架)
**多周期共振是提高信号质量的核心方法:**
### 周期层级关系
- **日线(趋势层)**:决定中长期大方向
- **4h/1h主周期**:主要交易周期
- **15m/5m入场层**:寻找最佳入场时机
### 共振判断标准
**强共振A级信号**
- 所有周期趋势同向(如日线多 + 4h多 + 1h多
- 多周期 RSI 同时超买/超卖后出现背离
- 多周期 MA 同时金叉/死叉
**中等共振B级信号**
- 大周期(日线+4h同向
- 主周期技术指标明确
**弱共振C级信号**
- 只有单一周期信号
- 多周期方向不一致
### 实战策略
- **顺势交易**:大周期和小周期同向时,信号最强
- **逆势谨慎**:只有小周期信号但大周期反向时,降低置信度
- **突破交易**:多周期同时突破关键位,信号最可靠
- 大周期决定方向,小周期决定入场时机
## 五、股票市场特殊性
### 与加密货币的区别
1. **交易时间**:股票有固定交易时间,收盘后无法交易
2. **波动性**:股票波动性通常低于加密货币
3. **T+1规则**部分市场如A股实行T+1当天买入第二天才能卖出
4. **涨跌停限制**:部分市场有涨跌停限制
5. **分红送转**:股票有分红、送股等除权除息事件
### 港股特殊性
- 无涨跌停限制
- T+0交易当天可买卖
- 有港币兑换考虑
- 受内地和美股双重影响
### 美股特殊性
- 无涨跌停限制(但有熔断机制)
- T+0交易当天可买卖
- 有盘前盘后交易
- 受财报季影响大
## 六、入场方式
- **market**:现价立即入场 - 信号已经触发,建议立即开仓
- **limit**:挂单等待入场 - 等价格回调到更好位置再入场
## 输出格式
请严格按照以下 JSON 格式输出:
```json
{
"analysis_summary": "简要描述当前市场状态50字以内",
"volume_analysis": "量价分析结论30字以内",
"news_sentiment": "positive/negative/neutral",
"news_impact": "新闻对市场的影响分析30字以内",
"signals": [
{
"type": "short_term/medium_term/long_term",
"action": "buy/sell/wait",
"entry_type": "market/limit",
"confidence": 0-100,
"grade": "A/B/C/D",
"position_size": "heavy/medium/light",
"position_reason": "仓位建议理由20字以内",
"entry_price": 建议入场价,
"stop_loss": 止损价,
"take_profit": 止盈价,
"reason": "详细的入场理由(必须包含量价分析)",
"risk_warning": "风险提示"
}
],
"key_levels": {
"support": [支撑位列表],
"resistance": [阻力位列表]
}
}
```
## 信号等级与置信度
- **A级**80-100量价配合 + 多指标共振 + 多周期确认 + 形态完美
- **B级**60-79量价配合 + 主要指标确认 + 形态清晰
- **C级**40-59有机会但量价不够理想或形态不完整
- **D级**<40量价背离或信号矛盾
## 七、仓位管理(重要)
股票交易不需要频繁交易,建议精选机会。
### 仓位等级
- **heavy**(重仓):机会极佳,建议使用较大仓位
- **medium**(中仓):机会不错,建议使用中等仓位
- **light**(轻仓):机会一般或风险较高,建议轻仓试探
### 仓位决策规则
1. **A级信号**:可建议 heavy 或 medium
2. **B级信号**:建议 medium 或 light
3. **C级信号**:只能建议 light
4. **已在高位或低位**:即使有好机会也要控制仓位
5. **市场整体环境**:大盘不好时要控制仓位
### 安全底线
- 单一股票仓位不宜超过总资金的 30%
- 同一行业股票不宜过度集中
- 保留现金储备应对市场变化
## 八、止损止盈策略
### 止损设置原则(结构化止损)
**止损必须基于关键价位,不要用固定百分比:**
1. **做多止损**
- 优先放在最近支撑位(前低)下方 2-3%
- 如果有 MA20/MA50 支撑,可放在均线下方 1-2%
- 如果最近低点距离过近(<3%),则使用 ATR 1.5-2倍
- 技术位止损通常在 3-8% 之间
2. **做空止损**
- 优先放在最近阻力位(前高)上方 2-3%
- 如果有 MA20/MA50 阻力,可放在均线上方 1-2%
- 如果最近高点距离过近(<3%),则使用 ATR 1.5-2倍
### 止盈设置
**股票可以设置合理的止盈目标:**
1. **短线止盈**
- 突破类:目标 8-15%
- 反弹类:目标 10-20%
2. **中线止盈**
- 趋势类:目标 20-40%
- 可以分批止盈,保护利润
3. **长线止盈**
- 价值投资:目标 50%+
- 关注基本面变化
### 移动止盈
- 盈利达到目标后,可以将止损移动到成本价以上
- 盈利 15% 后,开始移动止盈锁定利润
- 趋势强劲时,可以让利润奔跑
### 风险收益比
- 理想的风险收益比应该在 1:3 以上
- 即:潜在风险 3%,潜在收益 9% 以上
## 九、基本面分析(重要补充)
**对于股票交易,基本面分析是重要的参考维度:**
### 估值指标分析
- **PE市盈率**
- PE < 15估值偏低价值投资机会
- PE 15-25估值合理
- PE 25-40估值偏高
- PE > 40估值过高风险较大
- **PB市净率**PB < 1.5 通常表示被低估
- **PEG市盈率相对盈利增长比率**PEG < 1 表示被低估
### 盈利能力分析
- **ROE净资产收益率**
- ROE > 20%:优秀公司,盈利能力强
- ROE 15-20%:良好
- ROE 10-15%:一般
- ROE < 10%:盈利能力较弱
- **净利率**:净利率 > 20% 表示盈利质量高
- **毛利率**:毛利率 > 40% 表示有竞争优势
### 成长性分析
- **营收增长率**
- > 30%:高成长
- 20-30%:稳定成长
- 10-20%:一般成长
- < 10%:成长性不足
- **盈利增长率**:与营收增长同步更健康
### 财务健康分析
- **债务股本比**
- < 1财务健康
- 1-2可控范围
- > 2风险较高
- **流动比率**> 2 表示偿债能力强
### 基本面与技术面结合
1. **基本面优秀 + 技术面突破** = 高质量做多机会(可提高置信度)
2. **基本面差 + 技术面破位** = 高质量做空机会(可提高置信度)
3. **基本面优秀 + 技术面回调** = 低吸机会(中线/长线)
4. **基本面差 + 技术面上涨** = 谨慎(可能是诱多)
### 基本面评分参考
- **80分以上A级**:基本面优秀,技术信号确认时可提高置信度
- **60-80分B级**:基本面良好,可作为参考
- **40-60分C级**:基本面一般,主要依赖技术分析
- **40分以下D级**:基本面较差,降低信号置信度
## 重要原则
1. **量价优先** - 任何信号都必须有量能配合才可靠
2. **精选机会** - 股票不需要频繁交易,等待高质量信号
3. **多周期确认** - 日线决定方向,小周期决定入场
4. **结构止损** - 止损必须基于关键支撑/阻力位(前低前高、均线)
5. **合理止盈** - 根据交易周期设置合理的止盈目标
6. **基本面参考** - 结合基本面评分和技术面综合判断,提高信号质量
7. **reason 字段必须包含量价分析**(如"放量突破+RSI=45量比1.8确认有效"
8. **entry_type 必须明确**:信号已触发用 market等待更好价位用 limit
9. **position_size 必须明确**:根据信号质量给出 heavy/medium/light"""
# 兼容旧代码,使用加密货币提示词作为默认值
SYSTEM_PROMPT = CRYPTO_SYSTEM_PROMPT
def __init__(self, agent_type: str = "crypto"):
"""初始化分析器
Args:
agent_type: 智能体类型,支持 'crypto', 'stock', 'smart'
"""
from app.config import get_settings
from app.services.bitget_service import bitget_service
self.news_service = get_news_service()
self.binance_service = bitget_service # 使用 Bitget 服务
settings = get_settings()
# 根据智能体类型选择模型配置
model_config_map = {
'crypto': 'crypto_agent_model',
'stock': 'stock_agent_model',
'smart': 'smart_agent_model'
}
config_key = model_config_map.get(agent_type, 'crypto_agent_model')
self.model_override = getattr(settings, config_key, None)
self.agent_type = agent_type
agent_name_map = {
'crypto': '加密货币',
'stock': '股票', # 改为通用的"股票",具体市场类型会在分析时根据符号判断
'smart': '智能助手'
}
agent_name = agent_name_map.get(agent_type, '未知')
logger.info(f"LLM 信号分析器初始化完成({agent_name},模型: {self.model_override or '默认'}")
def _get_market_type(self, symbol: str) -> str:
"""根据股票代码判断市场类型"""
if symbol.endswith('.HK'):
return '港股'
else:
return '美股'
async def analyze(self, symbol: str, data: Dict[str, pd.DataFrame],
symbols: List[str] = None,
position_info: Dict[str, Any] = None,
fundamental_data: Dict[str, Any] = None,
fundamental_summary: str = "") -> Dict[str, Any]:
"""
使用 LLM 分析市场数据
Args:
symbol: 交易对,如 'BTCUSDT'
data: 多周期K线数据 {'5m': df, '15m': df, '1h': df, '4h': df}
symbols: 所有监控的交易对(用于过滤相关新闻)
position_info: 当前持仓信息,用于仓位管理决策
- account_balance: 账户余额
- total_position_value: 总持仓价值
- current_leverage: 当前杠杆倍数
- positions: 各交易对持仓列表
fundamental_data: 基本面数据(仅股票)
fundamental_summary: 基本面摘要文本(仅股票)
Returns:
分析结果
"""
try:
# 获取市场类型
market_type = self._get_market_type(symbol) if self.agent_type == 'stock' else ''
# 获取新闻数据
news_text = await self._get_news_context(symbol, symbols or [symbol])
# 获取合约市场数据(仅加密货币)
futures_data = None
if self.agent_type == 'crypto':
try:
futures_data = self.binance_service.get_futures_market_data(symbol)
if futures_data:
logger.info(f"{symbol} 资金费率: {futures_data.get('funding_rate', {}).get('funding_rate_percent', 0):.4f}% | "
f"情绪: {futures_data.get('market_sentiment', '')}")
except Exception as e:
logger.warning(f"获取 {symbol} 合约数据失败: {e}")
# 根据智能体类型选择提示词
if self.agent_type == 'stock':
system_prompt = self.STOCK_SYSTEM_PROMPT
else:
system_prompt = self.CRYPTO_SYSTEM_PROMPT
# 构建数据提示
data_prompt = self._build_data_prompt(symbol, data, news_text, position_info, futures_data,
fundamental_data, fundamental_summary)
# 调用 LLM使用异步方法避免阻塞事件循环
response = await llm_service.achat([
{"role": "system", "content": system_prompt},
{"role": "user", "content": data_prompt}
], model_override=self.model_override)
if not response:
logger.warning(f"{symbol} LLM 分析无响应")
return self._empty_result(symbol, "LLM 无响应")
# 解析响应
result = self._parse_response(response)
result['symbol'] = symbol
result['timestamp'] = datetime.now().isoformat()
# 记录日志
signals = result.get('signals', [])
if signals:
for sig in signals:
logger.info(f"{symbol} [{market_type}][{sig['type']}] {sig['action']} "
f"置信度:{sig['confidence']}% 等级:{sig['grade']} "
f"原因:{sig['reason'][:50]}...")
else:
logger.info(f"{symbol} [{market_type}] 无交易信号 - {result.get('analysis_summary', '观望')}")
return result
except Exception as e:
logger.error(f"{symbol} LLM 分析出错: {e}")
import traceback
logger.error(traceback.format_exc())
return self._empty_result(symbol, str(e))
async def _get_news_context(self, symbol: str, symbols: List[str]) -> str:
"""获取新闻上下文"""
try:
# 如果是股票类型,使用 Brave Search 搜索新闻
if self.agent_type == 'stock':
# 获取股票名称
from app.stock_agent.stock_agent import STOCK_NAMES
stock_name = STOCK_NAMES.get(symbol, '')
# 搜索股票新闻
news_list = await self.news_service.search_stock_news(symbol, stock_name)
if news_list:
return self.news_service.format_news_for_llm(news_list, max_items=5)
else:
return ""
else:
# 加密货币使用原有的 RSS 新闻
news_list = await self.news_service.get_latest_news(limit=50)
filtered = self.news_service.filter_relevant_news(
news_list, symbols=symbols, hours=4
)
return self.news_service.format_news_for_llm(filtered, max_items=10)
except Exception as e:
logger.warning(f"获取新闻上下文失败: {e}")
return ""
def _format_position_info(self, symbol: str, position_info: Dict[str, Any]) -> str:
"""格式化持仓信息供 LLM 参考"""
lines = []
# 账户概况
balance = position_info.get('account_balance', 0)
total_value = position_info.get('total_position_value', 0)
current_leverage = position_info.get('current_leverage', 0)
max_leverage = 20 # 最大杠杆限制
lines.append(f"- 账户余额: ${balance:,.2f}")
lines.append(f"- 总持仓价值: ${total_value:,.2f}")
lines.append(f"- 当前杠杆: {current_leverage:.1f}x / {max_leverage}x")
# 可用杠杆空间
available_leverage = max_leverage - current_leverage
if available_leverage > 0:
available_value = balance * available_leverage
lines.append(f"- 可开仓空间: ${available_value:,.2f} ({available_leverage:.1f}x)")
else:
lines.append("- ⚠️ 已达最大杠杆,不建议加仓")
# 当前交易对持仓
positions = position_info.get('positions', [])
symbol_positions = [p for p in positions if p.get('symbol') == symbol]
if symbol_positions:
lines.append(f"\n**{symbol} 当前持仓**:")
for pos in symbol_positions:
side = "做多" if pos.get('side') == 'long' else "做空"
entry = pos.get('entry_price', 0)
pnl = pos.get('pnl_percent', 0)
lines.append(f" - {side} @ ${entry:,.2f} | 盈亏: {pnl:+.2f}%")
else:
lines.append(f"\n**{symbol}**: 无持仓")
# 其他交易对持仓概况
other_positions = [p for p in positions if p.get('symbol') != symbol and p.get('status') == 'open']
if other_positions:
lines.append(f"\n**其他持仓**: {len(other_positions)}")
return "\n".join(lines)
def _format_fundamental_data(self, fundamental_data: Dict[str, Any]) -> str:
"""格式化基本面数据供 LLM 参考"""
lines = []
# 基本信息
company_name = fundamental_data.get('company_name', 'N/A')
sector = fundamental_data.get('sector', 'N/A')
industry = fundamental_data.get('industry', 'N/A')
market_cap = fundamental_data.get('market_cap', 0)
lines.append(f"**公司**: {company_name}")
lines.append(f"**行业**: {sector} / {industry}")
if market_cap:
lines.append(f"**市值**: ${market_cap:,.0f}")
# 基本面评分
score_data = fundamental_data.get('score', {})
total_score = score_data.get('total', 0)
rating = score_data.get('rating', 'N/A')
if total_score > 0:
lines.append(f"**基本面评分**: {total_score:.0f}/100 ({rating}级)")
# 估值指标
valuation = fundamental_data.get('valuation', {})
if valuation.get('pe_ratio'):
pe = valuation['pe_ratio']
pb = valuation.get('pb_ratio', 'N/A')
ps = valuation.get('ps_ratio', 'N/A')
lines.append(f"**估值**: PE={pe:.2f} | PB={pb} | PS={ps}")
# 盈利能力
profitability = fundamental_data.get('profitability', {})
if profitability.get('return_on_equity'):
roe = profitability['return_on_equity']
profit_margin = profitability.get('profit_margin')
gross_margin = profitability.get('gross_margin')
pm_str = f"{profit_margin:.1f}" if profit_margin is not None else "N/A"
gm_str = f"{gross_margin:.1f}" if gross_margin is not None else "N/A"
lines.append(f"**盈利**: ROE={roe:.2f}% | 净利率={pm_str}% | 毛利率={gm_str}%")
# 成长性
growth = fundamental_data.get('growth', {})
revenue_growth = growth.get('revenue_growth')
earnings_growth = growth.get('earnings_growth')
if revenue_growth is not None or earnings_growth is not None:
rg_str = f"{revenue_growth:.1f}" if revenue_growth is not None else "N/A"
eg_str = f"{earnings_growth:.1f}" if earnings_growth is not None else "N/A"
lines.append(f"**成长**: 营收增长={rg_str}% | 盈利增长={eg_str}%")
# 财务健康
financial = fundamental_data.get('financial_health', {})
if financial.get('debt_to_equity'):
debt_to_equity = financial['debt_to_equity']
current_ratio = financial.get('current_ratio')
cr_str = f"{current_ratio:.2f}" if current_ratio is not None else "N/A"
lines.append(f"**财务**: 债务股本比={debt_to_equity:.2f} | 流动比率={cr_str}")
# 分析师建议
analyst = fundamental_data.get('analyst', {})
if analyst.get('target_price'):
target_price = analyst['target_price']
recommendation = analyst.get('recommendation', 'N/A')
lines.append(f"**分析师**: 目标价=${target_price:.2f} | 建议={recommendation}")
return "\n".join(lines)
def _build_data_prompt(self, symbol: str, data: Dict[str, pd.DataFrame],
news_text: str = "", position_info: Dict[str, Any] = None,
futures_data: Dict[str, Any] = None,
fundamental_data: Dict[str, Any] = None,
fundamental_summary: str = "") -> str:
"""构建数据提示词"""
parts = [f"# {symbol} 市场数据分析\n"]
parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# 当前价格
current_price = 0
if '5m' in data and not data['5m'].empty:
current_price = float(data['5m'].iloc[-1]['close'])
parts.append(f"**当前价格**: ${current_price:,.2f}\n")
# === 新增:基本面数据(仅股票) ===
if fundamental_data and self.agent_type == 'stock':
parts.append("\n## 基本面分析")
if fundamental_summary:
parts.append(fundamental_summary)
else:
parts.append(self._format_fundamental_data(fundamental_data))
# === 新增:合约市场数据 ===
if futures_data and self.agent_type == 'crypto':
parts.append(self.binance_service.format_futures_data_for_llm(symbol, futures_data))
# === 新增:账户和持仓信息 ===
if position_info:
parts.append("\n## 账户与持仓状态")
parts.append(self._format_position_info(symbol, position_info))
# === 新增:关键价位分析 ===
key_levels = self._calculate_key_levels(data)
if key_levels:
parts.append("\n## 关键价位")
parts.append(key_levels)
# === 新增:多周期共振分析 ===
resonance = self._analyze_multi_timeframe_resonance(data)
if resonance:
parts.append("\n## 多周期共振")
parts.append(resonance)
# === 新增:市场结构分析 ===
structure = self._analyze_market_structure(data)
if structure:
parts.append("\n## 市场结构")
parts.append(structure)
# === 新增:波动率分析 ===
volatility = self._analyze_volatility(data)
if volatility:
parts.append("\n## 波动率分析")
parts.append(volatility)
# 各周期数据
for interval in ['4h', '1h', '15m', '5m']:
df = data.get(interval)
if df is None or df.empty:
continue
parts.append(f"\n## {interval.upper()} 周期数据")
# 最新指标(传入 df 以分析趋势变化)
latest = df.iloc[-1]
parts.append(self._format_indicators(latest, df))
# 最近 K 线数据
parts.append(self._format_recent_klines(df, interval))
# 添加新闻数据
if news_text and news_text != "暂无相关新闻":
parts.append(f"\n{news_text}")
parts.append("\n---")
parts.append("请综合分析以上技术数据和新闻舆情,判断是否存在短线、中线或长线的交易机会。")
parts.append("如果没有明确的交易机会signals 数组返回空即可。")
return "\n".join(parts)
def _calculate_key_levels(self, data: Dict[str, pd.DataFrame]) -> str:
"""计算关键支撑阻力位"""
lines = []
# 使用 4h 数据计算关键价位
df = data.get('4h')
if df is None or len(df) < 20:
return ""
current_price = float(df.iloc[-1]['close'])
# 1. 前高前低(最近 20 根 K 线)
recent = df.iloc[-20:]
recent_high = float(recent['high'].max())
recent_low = float(recent['low'].min())
# 2. 整数关口
round_levels = []
base = int(current_price / 1000) * 1000
for offset in [-2000, -1000, 0, 1000, 2000]:
level = base + offset
if level > 0:
round_levels.append(level)
# 3. 斐波那契回撤位(基于最近的高低点)
fib_levels = []
price_range = recent_high - recent_low
if price_range > 0:
fib_ratios = [0, 0.236, 0.382, 0.5, 0.618, 0.786, 1]
for ratio in fib_ratios:
fib_price = recent_low + price_range * ratio
fib_levels.append((ratio, fib_price))
# 构建输出
lines.append(f"- 近期高点: ${recent_high:,.2f}")
lines.append(f"- 近期低点: ${recent_low:,.2f}")
# 找出最近的支撑和阻力
supports = []
resistances = []
# 从斐波那契位找支撑阻力
for ratio, price in fib_levels:
if price < current_price * 0.995: # 低于当前价 0.5% 以上
supports.append(price)
elif price > current_price * 1.005: # 高于当前价 0.5% 以上
resistances.append(price)
# 添加整数关口
for level in round_levels:
if level < current_price * 0.995:
supports.append(level)
elif level > current_price * 1.005:
resistances.append(level)
# 排序并取最近的
supports = sorted(set(supports), reverse=True)[:3]
resistances = sorted(set(resistances))[:3]
if supports:
support_str = ", ".join([f"${s:,.0f}" for s in supports])
lines.append(f"- 支撑位: {support_str}")
if resistances:
resistance_str = ", ".join([f"${r:,.0f}" for r in resistances])
lines.append(f"- 阻力位: {resistance_str}")
# 当前价格位置
if recent_high > recent_low:
position = (current_price - recent_low) / (recent_high - recent_low) * 100
if position > 80:
pos_text = "接近高点,注意回调风险"
elif position < 20:
pos_text = "接近低点,关注反弹机会"
else:
pos_text = f"处于区间 {position:.0f}% 位置"
lines.append(f"- 价格位置: {pos_text}")
return "\n".join(lines)
def _analyze_multi_timeframe_resonance(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析多周期共振"""
trends = {}
for interval in ['4h', '1h', '15m', '5m']:
df = data.get(interval)
if df is None or len(df) < 10:
continue
# 判断趋势方向
ma5 = df['ma5'].iloc[-1] if 'ma5' in df.columns else None
ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else None
close = df['close'].iloc[-1]
if pd.notna(ma5) and pd.notna(ma20):
if close > ma5 > ma20:
trends[interval] = 'bullish'
elif close < ma5 < ma20:
trends[interval] = 'bearish'
else:
trends[interval] = 'neutral'
if len(trends) < 2:
return ""
lines = []
# 统计各方向数量
bullish_count = sum(1 for t in trends.values() if t == 'bullish')
bearish_count = sum(1 for t in trends.values() if t == 'bearish')
total = len(trends)
# 各周期趋势
trend_map = {'bullish': '📈多', 'bearish': '📉空', 'neutral': '➡️震荡'}
trend_str = " | ".join([f"{k}: {trend_map.get(v, v)}" for k, v in trends.items()])
lines.append(f"- 各周期趋势: {trend_str}")
# 共振判断
if bullish_count == total:
lines.append(f"- **强共振做多**: 所有周期均为多头排列")
elif bearish_count == total:
lines.append(f"- **强共振做空**: 所有周期均为空头排列")
elif bullish_count >= total * 0.75:
lines.append(f"- **偏多共振**: {bullish_count}/{total} 周期看多")
elif bearish_count >= total * 0.75:
lines.append(f"- **偏空共振**: {bearish_count}/{total} 周期看空")
else:
lines.append(f"- **无明显共振**: 多空分歧,建议观望")
return "\n".join(lines)
def _analyze_market_structure(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析市场结构(趋势、高低点)"""
df = data.get('1h')
if df is None or len(df) < 24:
return ""
lines = []
recent = df.iloc[-24:] # 最近 24 根 1h K 线
# 找出局部高低点
highs = []
lows = []
for i in range(2, len(recent) - 2):
# 局部高点:比前后两根都高
if (recent.iloc[i]['high'] > recent.iloc[i-1]['high'] and
recent.iloc[i]['high'] > recent.iloc[i-2]['high'] and
recent.iloc[i]['high'] > recent.iloc[i+1]['high'] and
recent.iloc[i]['high'] > recent.iloc[i+2]['high']):
highs.append((i, float(recent.iloc[i]['high'])))
# 局部低点:比前后两根都低
if (recent.iloc[i]['low'] < recent.iloc[i-1]['low'] and
recent.iloc[i]['low'] < recent.iloc[i-2]['low'] and
recent.iloc[i]['low'] < recent.iloc[i+1]['low'] and
recent.iloc[i]['low'] < recent.iloc[i+2]['low']):
lows.append((i, float(recent.iloc[i]['low'])))
# 判断趋势结构
if len(highs) >= 2 and len(lows) >= 2:
# 检查高点是否越来越高
higher_highs = all(highs[i][1] < highs[i+1][1] for i in range(len(highs)-1))
lower_highs = all(highs[i][1] > highs[i+1][1] for i in range(len(highs)-1))
# 检查低点是否越来越高
higher_lows = all(lows[i][1] < lows[i+1][1] for i in range(len(lows)-1))
lower_lows = all(lows[i][1] > lows[i+1][1] for i in range(len(lows)-1))
if higher_highs and higher_lows:
lines.append("- **上升趋势**: 更高的高点(HH) + 更高的低点(HL)")
elif lower_highs and lower_lows:
lines.append("- **下降趋势**: 更低的高点(LH) + 更低的低点(LL)")
elif higher_lows and lower_highs:
lines.append("- **收敛三角形**: 高点下移 + 低点上移,即将突破")
elif lower_lows and higher_highs:
lines.append("- **扩散形态**: 波动加大,方向不明")
else:
lines.append("- **震荡结构**: 无明显趋势")
else:
lines.append("- **结构不明**: 高低点不足,难以判断")
# 计算趋势强度
if len(recent) >= 10:
price_change = (float(recent.iloc[-1]['close']) - float(recent.iloc[0]['close'])) / float(recent.iloc[0]['close']) * 100
if abs(price_change) > 3:
direction = "上涨" if price_change > 0 else "下跌"
lines.append(f"- 24h 趋势: {direction} {abs(price_change):.1f}%")
return "\n".join(lines)
def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析波动率变化"""
df = data.get('1h')
if df is None or len(df) < 24 or 'atr' not in df.columns:
return ""
lines = []
# ATR 变化趋势
recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根
older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根
if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0:
return ""
atr_change = (recent_atr - older_atr) / older_atr * 100
current_atr = float(df['atr'].iloc[-1])
current_price = float(df['close'].iloc[-1])
atr_percent = current_atr / current_price * 100
lines.append(f"- 当前 ATR: ${current_atr:.2f} ({atr_percent:.2f}%)")
if atr_change > 20:
lines.append(f"- **波动率扩张**: ATR 上升 {atr_change:.0f}%,趋势可能启动")
elif atr_change < -20:
lines.append(f"- **波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将突破")
else:
lines.append(f"- 波动率稳定: ATR 变化 {atr_change:+.0f}%")
# 布林带宽度
if 'bb_upper' in df.columns and 'bb_lower' in df.columns:
bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100
bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100
if bb_width < bb_width_prev * 0.8:
lines.append(f"- **布林带收口**: 宽度 {bb_width:.1f}%,变盘信号")
elif bb_width > bb_width_prev * 1.2:
lines.append(f"- **布林带开口**: 宽度 {bb_width:.1f}%,趋势延续")
return "\n".join(lines)
return "\n".join(parts)
def _format_indicators(self, row: pd.Series, df: pd.DataFrame = None) -> str:
"""格式化指标数据(含趋势变化分析)"""
lines = []
# 价格
close = row.get('close', 0)
open_price = row.get('open', 0)
high = row.get('high', 0)
low = row.get('low', 0)
change = ((close - open_price) / open_price * 100) if open_price else 0
lines.append(f"- K线: O={open_price:.2f} H={high:.2f} L={low:.2f} C={close:.2f} ({change:+.2f}%)")
# 均线
ma5 = row.get('ma5', 0)
ma10 = row.get('ma10', 0)
ma20 = row.get('ma20', 0)
ma50 = row.get('ma50', 0)
if pd.notna(ma20):
# 判断均线排列
if pd.notna(ma5) and pd.notna(ma10):
if ma5 > ma10 > ma20:
ma_trend = "多头排列"
elif ma5 < ma10 < ma20:
ma_trend = "空头排列"
else:
ma_trend = "交织"
else:
ma_trend = ""
ma_str = f"- 均线: MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}"
if pd.notna(ma50):
ma_str += f", MA50={ma50:.2f}"
if ma_trend:
ma_str += f" ({ma_trend})"
lines.append(ma_str)
# RSI含趋势分析
rsi = row.get('rsi', 0)
if pd.notna(rsi):
rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性")
rsi_trend = self._analyze_indicator_trend(df, 'rsi', 6) if df is not None else ""
rsi_line = f"- RSI: {rsi:.1f} ({rsi_status})"
if rsi_trend:
rsi_line += f" {rsi_trend}"
lines.append(rsi_line)
# MACD含趋势分析
macd = row.get('macd', 0)
macd_signal = row.get('macd_signal', 0)
macd_hist = row.get('macd_hist', 0)
if pd.notna(macd):
macd_status = "多头" if macd > macd_signal else "空头"
macd_trend = self._analyze_macd_trend(df) if df is not None else ""
macd_line = f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})"
if macd_trend:
macd_line += f" {macd_trend}"
lines.append(macd_line)
# KDJ含金叉死叉检测
k = row.get('k', 0)
d = row.get('d', 0)
j = row.get('j', 0)
if pd.notna(k):
kdj_signal = self._detect_kdj_cross(df) if df is not None else ""
kdj_line = f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}"
if kdj_signal:
kdj_line += f" {kdj_signal}"
lines.append(kdj_line)
# 布林带(含位置分析)
bb_upper = row.get('bb_upper', 0)
bb_middle = row.get('bb_middle', 0)
bb_lower = row.get('bb_lower', 0)
if pd.notna(bb_upper):
# 判断价格在布林带中的位置
if close >= bb_upper:
bb_pos = "触及上轨"
elif close <= bb_lower:
bb_pos = "触及下轨"
elif close > bb_middle:
bb_pos = "中轨上方"
else:
bb_pos = "中轨下方"
lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f} ({bb_pos})")
# ATR
atr = row.get('atr', 0)
if pd.notna(atr):
lines.append(f"- ATR: {atr:.2f}")
# 成交量
volume = row.get('volume', 0)
volume_ratio = row.get('volume_ratio', 0)
if pd.notna(volume_ratio):
vol_status = "放量" if volume_ratio > 1.5 else ("缩量" if volume_ratio < 0.5 else "正常")
lines.append(f"- 成交量: {volume:.2f}, 量比={volume_ratio:.2f} ({vol_status})")
return "\n".join(lines)
def _analyze_indicator_trend(self, df: pd.DataFrame, indicator: str, lookback: int = 6) -> str:
"""分析指标趋势变化"""
if df is None or len(df) < lookback:
return ""
recent = df[indicator].iloc[-lookback:]
if recent.isna().any():
return ""
first_val = recent.iloc[0]
last_val = recent.iloc[-1]
change = last_val - first_val
# RSI 特殊处理
if indicator == 'rsi':
if first_val > 70 and last_val < 70:
return "[从超买回落]"
elif first_val < 30 and last_val > 30:
return "[从超卖反弹]"
elif change > 10:
return "[快速上升]"
elif change < -10:
return "[快速下降]"
return ""
def _analyze_macd_trend(self, df: pd.DataFrame, lookback: int = 6) -> str:
"""分析 MACD 趋势"""
if df is None or len(df) < lookback:
return ""
recent_hist = df['macd_hist'].iloc[-lookback:]
recent_macd = df['macd'].iloc[-lookback:]
recent_signal = df['macd_signal'].iloc[-lookback:]
if recent_hist.isna().any():
return ""
# 检测金叉死叉
for i in range(-3, 0):
if i - 1 >= -len(recent_macd):
prev_diff = recent_macd.iloc[i-1] - recent_signal.iloc[i-1]
curr_diff = recent_macd.iloc[i] - recent_signal.iloc[i]
if prev_diff < 0 and curr_diff > 0:
return "[刚刚金叉]"
elif prev_diff > 0 and curr_diff < 0:
return "[刚刚死叉]"
# 检测柱状图趋势
positive_count = sum(1 for x in recent_hist if x > 0)
hist_trend = recent_hist.iloc[-1] - recent_hist.iloc[-3] if len(recent_hist) >= 3 else 0
if positive_count == lookback and hist_trend > 0:
return "[红柱持续放大]"
elif positive_count == lookback and hist_trend < 0:
return "[红柱开始缩小]"
elif positive_count == 0 and hist_trend < 0:
return "[绿柱持续放大]"
elif positive_count == 0 and hist_trend > 0:
return "[绿柱开始缩小]"
return ""
def _detect_kdj_cross(self, df: pd.DataFrame, lookback: int = 3) -> str:
"""检测 KDJ 金叉死叉"""
if df is None or len(df) < lookback:
return ""
recent_k = df['k'].iloc[-lookback:]
recent_d = df['d'].iloc[-lookback:]
if recent_k.isna().any() or recent_d.isna().any():
return ""
# 检测最近的交叉
for i in range(-lookback + 1, 0):
prev_diff = recent_k.iloc[i-1] - recent_d.iloc[i-1]
curr_diff = recent_k.iloc[i] - recent_d.iloc[i]
if prev_diff < 0 and curr_diff > 0:
# 金叉位置判断
k_val = recent_k.iloc[i]
if k_val < 20:
return "[低位金叉,强买入信号]"
elif k_val < 50:
return "[中位金叉]"
else:
return "[高位金叉,谨慎]"
elif prev_diff > 0 and curr_diff < 0:
k_val = recent_k.iloc[i]
if k_val > 80:
return "[高位死叉,强卖出信号]"
elif k_val > 50:
return "[中位死叉]"
else:
return "[低位死叉,谨慎]"
return ""
def _format_recent_klines(self, df: pd.DataFrame, interval: str) -> str:
"""格式化最近 K 线(含量价分析)"""
# 根据周期决定显示数量
# 4h: 12根=2天, 1h: 24根=1天, 15m: 16根=4小时, 5m: 12根=1小时
count = {'4h': 12, '1h': 24, '15m': 16, '5m': 12}.get(interval, 12)
if len(df) < count:
count = len(df)
lines = [f"\n最近 {count} 根 K 线(含量价数据):"]
lines.append("| 时间 | 开盘 | 最高 | 最低 | 收盘 | 涨跌 | 成交量 | 量比 | RSI |")
lines.append("|------|------|------|------|------|------|--------|------|-----|")
for i in range(-count, 0):
row = df.iloc[i]
change = ((row['close'] - row['open']) / row['open'] * 100) if row['open'] else 0
change_str = f"{change:+.2f}%"
time_str = row['open_time'].strftime('%m-%d %H:%M') if pd.notna(row.get('open_time')) else 'N/A'
rsi = row.get('rsi', 0)
rsi_str = f"{rsi:.0f}" if pd.notna(rsi) else "-"
# 成交量和量比
volume = row.get('volume', 0)
volume_ratio = row.get('volume_ratio', 1.0)
if pd.notna(volume) and volume > 0:
# 格式化成交量大数字用K/M表示
if volume >= 1000000:
vol_str = f"{volume/1000000:.1f}M"
elif volume >= 1000:
vol_str = f"{volume/1000:.1f}K"
else:
vol_str = f"{volume:.0f}"
else:
vol_str = "-"
vol_ratio_str = f"{volume_ratio:.2f}" if pd.notna(volume_ratio) else "-"
lines.append(f"| {time_str} | {row['open']:.2f} | {row['high']:.2f} | "
f"{row['low']:.2f} | {row['close']:.2f} | {change_str} | {vol_str} | {vol_ratio_str} | {rsi_str} |")
# 添加量价分析提示
lines.append(self._analyze_volume_price(df, count))
return "\n".join(lines)
def _analyze_volume_price(self, df: pd.DataFrame, count: int) -> str:
"""分析量价关系"""
if len(df) < count:
return ""
recent = df.iloc[-count:]
analysis = []
# 计算价格趋势
price_change = (recent.iloc[-1]['close'] - recent.iloc[0]['close']) / recent.iloc[0]['close'] * 100
# 计算成交量趋势
vol_first_half = recent.iloc[:count//2]['volume'].mean() if 'volume' in recent.columns else 0
vol_second_half = recent.iloc[count//2:]['volume'].mean() if 'volume' in recent.columns else 0
if vol_first_half > 0 and vol_second_half > 0:
vol_change = (vol_second_half - vol_first_half) / vol_first_half * 100
# 量价分析
if price_change > 1: # 上涨
if vol_change > 20:
analysis.append("📈 **量价分析**: 放量上涨,上涨有效")
elif vol_change < -20:
analysis.append("⚠️ **量价分析**: 缩量上涨,警惕回调")
else:
analysis.append("➡️ **量价分析**: 量能平稳上涨")
elif price_change < -1: # 下跌
if vol_change > 20:
analysis.append("📉 **量价分析**: 放量下跌,下跌有效")
elif vol_change < -20:
analysis.append("💡 **量价分析**: 缩量下跌,关注企稳")
else:
analysis.append("➡️ **量价分析**: 量能平稳下跌")
else: # 横盘
if vol_change < -30:
analysis.append("🔄 **量价分析**: 缩量整理,等待方向")
else:
analysis.append("🔄 **量价分析**: 横盘震荡")
# 检测量价背离
if len(df) >= 10:
recent_10 = df.iloc[-10:]
# 检查是否有新高/新低
price_high_idx = recent_10['high'].idxmax()
price_low_idx = recent_10['low'].idxmin()
if 'volume' in recent_10.columns:
# 顶背离检测
if price_high_idx == recent_10.index[-1]: # 最新K线创新高
prev_high_idx = recent_10['high'].iloc[:-1].idxmax()
if recent_10.loc[price_high_idx, 'volume'] < recent_10.loc[prev_high_idx, 'volume'] * 0.8:
analysis.append("🔴 **顶背离**: 价格新高但量能不足,警惕回落")
# 底背离检测
if price_low_idx == recent_10.index[-1]: # 最新K线创新低
prev_low_idx = recent_10['low'].iloc[:-1].idxmin()
if recent_10.loc[price_low_idx, 'volume'] < recent_10.loc[prev_low_idx, 'volume'] * 0.8:
analysis.append("🟢 **底背离**: 价格新低但量能萎缩,关注反弹")
return "\n" + "\n".join(analysis) if analysis else ""
def _parse_response(self, response: str) -> Dict[str, Any]:
"""解析 LLM 响应"""
result = {
'raw_response': response,
'analysis_summary': '',
'signals': [],
'key_levels': {'support': [], 'resistance': []}
}
try:
json_str = None
# 尝试多种方式提取 JSON
# 1. 尝试提取 ```json ... ``` 代码块
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
if json_match:
json_str = json_match.group(1).strip()
logger.debug(f"从 ```json 代码块提取 JSON长度: {len(json_str)}")
else:
# 2. 尝试提取 ``` ... ``` 代码块(没有 json 标记)
code_match = re.search(r'```\s*([\s\S]*?)\s*```', response)
if code_match:
potential_json = code_match.group(1).strip()
# 检查是否像 JSON以 { 开头)
if potential_json.startswith('{'):
json_str = potential_json
logger.debug(f"从 ``` 代码块提取 JSON长度: {len(json_str)}")
# 3. 如果还没找到,尝试直接找到 { ... } 结构
if not json_str:
brace_match = re.search(r'\{[\s\S]*\}', response)
if brace_match:
json_str = brace_match.group(0).strip()
logger.debug(f"从花括号提取 JSON长度: {len(json_str)}")
# 4. 最后尝试直接解析整个响应
if not json_str:
json_str = response.strip()
logger.debug(f"直接使用整个响应作为 JSON长度: {len(json_str)}")
# 清理 JSON 字符串中的问题字符
json_str = self._clean_json_string(json_str)
# 解析 JSON
parsed = json.loads(json_str)
result['analysis_summary'] = parsed.get('analysis_summary', '')
result['signals'] = parsed.get('signals', [])
result['key_levels'] = parsed.get('key_levels', {'support': [], 'resistance': []})
# 验证和清理信号
valid_signals = []
for sig in result['signals']:
if self._validate_signal(sig):
valid_signals.append(sig)
result['signals'] = valid_signals
logger.info(f"JSON 解析成功: {len(valid_signals)} 个有效信号")
except json.JSONDecodeError as e:
logger.warning(f"LLM 响应不是有效 JSON: {e},尝试提取关键信息")
logger.debug(f"无法解析的 JSON 字符串: {json_str[:200] if json_str else response[:200]}...")
result['analysis_summary'] = self._extract_summary(response)
except Exception as e:
logger.error(f"解析响应时出错: {e}")
result['analysis_summary'] = self._extract_summary(response)
return result
def _validate_signal(self, signal: Dict[str, Any]) -> bool:
"""验证信号是否有效"""
required_fields = ['type', 'action', 'confidence', 'grade', 'reason']
for field in required_fields:
if field not in signal:
return False
# 验证类型
if signal['type'] not in ['short_term', 'medium_term', 'long_term']:
return False
def _clean_json_string(self, json_str: str) -> str:
"""清理 JSON 字符串中的问题字符"""
# 移除 BOM 标记
json_str = json_str.strip('\ufeff')
# 使用更健壮的方法:使用 json.JSONDecoder 的 raw_decode
# 但首先尝试简单的清理
try:
# 方法1: 直接解析,如果成功就不需要清理
json.loads(json_str)
return json_str
except json.JSONDecodeError:
pass
# 方法2: 移除控制字符(保留换行、制表符等常见字符)
# 控制字符范围: 0x00-0x1F除了 0x09(\t), 0x0A(\n), 0x0D(\r)
def remove_control_chars(s):
"""移除字符串值中的控制字符"""
result = []
in_string = False
escape = False
i = 0
while i < len(s):
char = s[i]
if escape:
# 转义模式下,保留所有字符
result.append(char)
escape = False
elif char == '\\':
# 开始转义
result.append(char)
escape = True
elif char == '"' and (i == 0 or s[i-1] != '\\'):
# 字符串边界
in_string = not in_string
result.append(char)
elif in_string:
# 在字符串内,检查是否为控制字符
code = ord(char)
if code < 0x20 and code not in (0x09, 0x0A, 0x0D):
# 控制字符,跳过或替换
if char == '\n':
result.append('\\n')
elif char == '\r':
result.append('\\r')
elif char == '\t':
result.append('\\t')
# 其他控制字符直接跳过
else:
result.append(char)
else:
# 不在字符串内,保留所有字符(包括空格、换行等)
result.append(char)
i += 1
return ''.join(result)
json_str = remove_control_chars(json_str)
return json_str
def _validate_signal(self, signal: Dict[str, Any]) -> bool:
"""验证信号是否有效"""
required_fields = ['type', 'action', 'confidence', 'grade', 'reason']
for field in required_fields:
if field not in signal:
return False
# 验证类型
if signal['type'] not in ['short_term', 'medium_term', 'long_term']:
return False
# 验证动作
if signal['action'] not in ['buy', 'sell', 'wait']:
return False
# wait 动作不算有效信号
if signal['action'] == 'wait':
return False
# 验证置信度(必须 >= 60 才算有效信号,即 B 级及以上)
confidence = signal.get('confidence', 0)
if not isinstance(confidence, (int, float)) or confidence < 60:
return False
# 验证入场类型(默认为 market
entry_type = signal.get('entry_type', 'market')
if entry_type not in ['market', 'limit']:
signal['entry_type'] = 'market' # 默认现价入场
# 验证仓位大小(默认根据等级设置)
position_size = signal.get('position_size', '')
if position_size not in ['heavy', 'medium', 'light']:
# 根据信号等级设置默认仓位
grade = signal.get('grade', 'C')
if grade == 'A':
signal['position_size'] = 'medium' # A级默认中仓
elif grade == 'B':
signal['position_size'] = 'light' # B级默认轻仓
else:
signal['position_size'] = 'light' # C级默认轻仓
return True
def _extract_summary(self, text: str) -> str:
"""从文本中提取摘要"""
text = text.strip()
if len(text) > 100:
return text[:100] + "..."
return text
def _empty_result(self, symbol: str, reason: str = "") -> Dict[str, Any]:
"""返回空结果"""
return {
'symbol': symbol,
'timestamp': datetime.now().isoformat(),
'analysis_summary': reason or '无法分析',
'signals': [],
'key_levels': {'support': [], 'resistance': []},
'error': reason
}
def get_best_signal(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
从分析结果中获取最佳信号
Args:
result: analyze() 的返回结果
Returns:
最佳信号,如果没有则返回 None
"""
signals = result.get('signals', [])
if not signals:
return None
# 按置信度排序
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
return sorted_signals[0]
def format_signal_message(self, signal: Dict[str, Any], symbol: str) -> str:
"""
格式化信号消息(用于 Telegram 通知)
Args:
signal: 信号数据
symbol: 交易对
Returns:
格式化的消息文本
"""
# 获取股票名称
from app.stock_agent.stock_agent import STOCK_NAMES
stock_name = STOCK_NAMES.get(symbol, '')
type_map = {
'short_term': '短线',
'medium_term': '中线',
'long_term': '长线'
}
action_map = {
'buy': '做多',
'sell': '做空'
}
signal_type = type_map.get(signal['type'], signal['type'])
action = action_map.get(signal['action'], signal['action'])
grade = signal.get('grade', 'C')
confidence = signal.get('confidence', 0)
entry_type = signal.get('entry_type', 'market')
# 等级图标
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
# 方向图标
action_icon = '🟢' if signal['action'] == 'buy' else '🔴'
# 入场类型
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 仓位大小
position_size = signal.get('position_size', 'light')
position_map = {'heavy': '重仓', 'medium': '中仓', 'light': '轻仓'}
position_icon = {'heavy': '🔥', 'medium': '📊', 'light': '🌱'}.get(position_size, '🌱')
position_text = position_map.get(position_size, '轻仓')
# 计算风险收益比
entry = signal.get('entry_price', 0)
sl = signal.get('stop_loss', 0)
tp = signal.get('take_profit', 0)
sl_percent = ((sl - entry) / entry * 100) if entry else 0
tp_percent = ((tp - entry) / entry * 100) if entry else 0
# 识别市场类型
if self.agent_type == 'crypto':
market_tag = '[加密货币] '
elif symbol.endswith('.HK'):
market_tag = '[港股] '
else:
market_tag = '[美股] '
# 构建标题(带股票名称和市场类型)
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
message = f"""📊 {market_tag}{symbol_display} {signal_type}信号
{action_icon} **方向**: {action}
{entry_type_icon} **入场**: {entry_type_text}
{position_icon} **仓位**: {position_text}
⭐ **等级**: {grade} {grade_icon}
📈 **置信度**: {confidence}%
💰 **入场价**: ${entry:,.2f}
🛑 **止损价**: ${sl:,.2f} ({sl_percent:+.1f}%)
🎯 **止盈价**: ${tp:,.2f} ({tp_percent:+.1f}%)
📝 **分析理由**:
{signal.get('reason', '')}
⚠️ **风险提示**:
{signal.get('risk_warning', '请注意风险控制')}"""
return message
def format_feishu_card(self, signal: Dict[str, Any], symbol: str) -> Dict[str, Any]:
"""
格式化飞书卡片消息
Args:
signal: 信号数据
symbol: 交易对
Returns:
包含 title, content, color 的字典
"""
# 获取股票名称
from app.stock_agent.stock_agent import STOCK_NAMES
stock_name = STOCK_NAMES.get(symbol, '')
type_map = {
'short_term': '短线',
'medium_term': '中线',
'long_term': '长线'
}
action_map = {
'buy': '做多',
'sell': '做空'
}
signal_type = type_map.get(signal['type'], signal['type'])
action = action_map.get(signal['action'], signal['action'])
grade = signal.get('grade', 'C')
confidence = signal.get('confidence', 0)
entry_type = signal.get('entry_type', 'market')
# 等级图标
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
# 入场类型
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 仓位大小
position_size = signal.get('position_size', 'light')
position_map = {'heavy': '重仓', 'medium': '中仓', 'light': '轻仓'}
position_icon = {'heavy': '🔥', 'medium': '📊', 'light': '🌱'}.get(position_size, '🌱')
position_text = position_map.get(position_size, '轻仓')
# 标题和颜色 - 区分加密货币/美股/港股
is_market_order = entry_type == 'market'
market_badge = '【现价】' if is_market_order else ''
# 识别市场类型
if self.agent_type == 'crypto':
market_tag = '[加密货币] '
elif symbol.endswith('.HK'):
market_tag = '[港股] '
else:
market_tag = '[美股] '
# 构建带名称的股票显示
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
if signal['action'] == 'buy':
title = f"🟢 {market_tag}{symbol_display} {signal_type}做多信号 {market_badge}"
color = "green"
else:
title = f"🔴 {market_tag}{symbol_display} {signal_type}做空信号 {market_badge}"
color = "red"
# 计算风险收益比
entry = signal.get('entry_price', 0)
sl = signal.get('stop_loss', 0)
tp = signal.get('take_profit', 0)
sl_percent = ((sl - entry) / entry * 100) if entry else 0
tp_percent = ((tp - entry) / entry * 100) if entry else 0
# 构建 Markdown 内容 - 现价时突出显示
if is_market_order:
# 现价入场,重点突出价格
content_parts = [
f"**{signal_type}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
f"{entry_type_icon} **入场方式**: {entry_type_text}",
f"{position_icon} **建议仓位**: {position_text}",
"",
f"💰 **⭐ 现价入场 ⭐**",
f"**>>> ${entry:,.2f} <<<**",
"",
f"🛑 **止损**: ${sl:,.2f} ({sl_percent:+.1f}%)",
f"🎯 **止盈**: ${tp:,.2f} ({tp_percent:+.1f}%)",
"",
f"📝 **分析理由**:",
f"{signal.get('reason', '')}",
"",
f"⚠️ **风险提示**:",
f"{signal.get('risk_warning', '请注意风险控制')}",
]
else:
# 挂单,正常显示
content_parts = [
f"**{signal_type}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
f"{entry_type_icon} **入场方式**: {entry_type_text}",
f"{position_icon} **建议仓位**: {position_text}",
"",
f"💰 **入场**: ${entry:,.2f}",
f"🛑 **止损**: ${sl:,.2f} ({sl_percent:+.1f}%)",
f"🎯 **止盈**: ${tp:,.2f} ({tp_percent:+.1f}%)",
"",
f"📝 **分析理由**:",
f"{signal.get('reason', '')}",
"",
f"⚠️ **风险提示**:",
f"{signal.get('risk_warning', '请注意风险控制')}",
]
return {
'title': title,
'content': '\n'.join(content_parts),
'color': color
}
# 持仓回顾分析的 System Prompt
POSITION_REVIEW_PROMPT = """你是一个专业的加密货币交易风险管理专家。你的任务是回顾现有持仓,根据最新市场行情决定是否需要调整。
## 你的职责
对于每个持仓,你需要分析:
1. 当前持仓状态(盈亏、持仓时间、风险敞口)
2. 最新市场行情(趋势、支撑阻力、技术指标)
3. 原有交易逻辑是否依然有效
4. 是否需要调整止损止盈
5. 是否需要平仓(部分或全部)
## 决策类型
### 1. HOLD保持
- 适用场景:行情符合预期,趋势延续
- 操作:不改变任何设置
### 2. ADJUST_SL_TP调整止损止盈
- 适用场景:
- **盈利状态**:趋势强劲,可以收紧止损锁定更多利润
- **亏损状态**:支撑/阻力位变化,需要调整止损到更合理位置
- **目标接近**:原止盈目标接近,但趋势仍强,可上移止盈
- 操作:更新 stop_loss 和/或 take_profit
### 3. PARTIAL_CLOSE部分平仓
- 适用场景:
- 盈利较大,但不确定性增加
- 重要阻力位附近,锁定部分利润
- 趋势有转弱迹象
- 操作:平掉 close_percent 比例的仓位
### 4. FULL_CLOSE全部平仓
- 适用场景:
- **止损型**:趋势明确反转,止损信号出现
- **止盈型**:目标达成,或出现更好的机会
- **风险型**:重大利空/利好的不确定性
- 操作:平掉全部仓位
## 调整原则
### 盈利状态(盈亏 > 0
1. **收紧止损**:如果盈利 > 2%,可以将止损移至保本或盈利 1% 位置
2. **部分止盈**:如果盈利 > 5% 且接近重要阻力位,可平掉 30-50% 仓位
3. **继续持有**:如果趋势强劲,可以放宽止损让利润奔跑
### 亏损状态(盈亏 < 0
1. **提前止损**:如果出现明确的反转信号,不要等止损触发
2. **调整止损**:如果关键支撑/阻力位变化,更新止损位置
3. **继续持有**:如果只是正常波动,原交易逻辑未变,继续持有
### 重要技术信号
1. **趋势反转**:多周期共振转反、跌破/突破关键 MA
2. **量价背离**:价格新高但成交量萎缩
3. **MACD 背离**:价格新高/新低但 MACD 未确认
4. **RSI 极端**RSI > 75 或 < 25 后掉头
## 输出格式
对于每个持仓,输出 JSON
```json
{
"order_id": "订单ID",
"action": "HOLD | ADJUST_SL_TP | PARTIAL_CLOSE | FULL_CLOSE",
"new_sl": 新止损价格(仅 ADJUST_SL_TP 时),
"new_tp": 新止盈价格(仅 ADJUST_SL_TP 时),
"close_percent": 平仓比例 0-100仅 PARTIAL_CLOSE 时),
"reason": "调整原因简明扼要20字以内"
}
```
## 重要原则
1. **主动管理**:不要被动等待止损触发,主动识别风险
2. **保护利润**:盈利状态下,优先考虑锁定利润
3. **果断止损**:亏损状态下,如果趋势反转,果断离场
4. **灵活调整**:根据最新行情,不局限于开仓时的判断
5. **考虑成本**:频繁调整会增加交易成本,只在有明确信号时调整
"""
async def review_positions(
self,
symbol: str,
positions: List[Dict[str, Any]],
data: Dict[str, pd.DataFrame]
) -> List[Dict[str, Any]]:
"""
回顾并分析现有持仓,给出调整建议
Args:
symbol: 交易对
positions: 持仓列表,每个持仓包含:
- order_id: 订单ID
- side: 'long' or 'short'
- entry_price: 开仓价格
- current_price: 当前价格
- stop_loss: 当前止损
- take_profit: 当前止盈
- quantity: 仓位数量
- pnl_percent: 盈亏百分比
- open_time: 开仓时间
data: 多周期K线数据
Returns:
调整建议列表
"""
if not positions:
return []
try:
# 构建持仓分析提示
prompt = self._build_position_review_prompt(symbol, positions, data)
# 调用 LLM使用异步方法避免阻塞事件循环
response = await llm_service.achat([
{"role": "system", "content": self.POSITION_REVIEW_PROMPT},
{"role": "user", "content": prompt}
], model_override=self.model_override)
if not response:
logger.warning(f"{symbol} 持仓回顾 LLM 分析无响应")
return []
# 解析响应
return self._parse_position_review_response(response)
except Exception as e:
logger.error(f"持仓回顾分析失败: {e}", exc_info=True)
return []
def _build_position_review_prompt(
self,
symbol: str,
positions: List[Dict[str, Any]],
data: Dict[str, pd.DataFrame]
) -> str:
"""构建持仓分析提示"""
lines = [f"# {symbol} 持仓回顾分析", f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]
lines.append("\n## 当前持仓")
for idx, pos in enumerate(positions, 1):
side_text = "做多 📈" if pos['side'] == 'long' else "做空 📉"
pnl_text = f"+{pos['pnl_percent']:.1f}%" if pos['pnl_percent'] >= 0 else f"{pos['pnl_percent']:.1f}%"
pnl_emoji = "" if pos['pnl_percent'] >= 0 else ""
lines.append(f"\n### 持仓 {idx}: {pos['order_id']}")
lines.append(f"- 方向: {side_text}")
lines.append(f"- 开仓价: ${pos['entry_price']:,.2f}")
lines.append(f"- 当前价: ${pos['current_price']:,.2f}")
lines.append(f"- 盈亏: {pnl_emoji} {pnl_text}")
lines.append(f"- 止损: ${pos['stop_loss']:,.2f}")
lines.append(f"- 止盈: ${pos['take_profit']:,.2f}")
lines.append(f"- 仓位: ${pos['quantity']:,.0f}")
# 计算持仓时间
if 'open_time' in pos:
open_time = pos['open_time']
if isinstance(open_time, str):
open_time = datetime.fromisoformat(open_time)
duration = datetime.now() - open_time
hours = duration.total_seconds() / 3600
lines.append(f"- 持仓时间: {hours:.1f} 小时")
# 添加市场分析
lines.append("\n## 最新市场分析")
# 使用 1h 和 4h 数据分析
for interval in ['4h', '1h']:
df = data.get(interval)
if df is None or len(df) < 20:
continue
latest = df.iloc[-1]
prev = df.iloc[-2]
lines.append(f"\n### {interval} 周期")
lines.append(f"- 当前价格: ${latest['close']:,.2f}")
lines.append(f"- 涨跌幅: {((latest['close'] - prev['close']) / prev['close'] * 100):+.2f}%")
if 'ma5' in df.columns and pd.notna(latest['ma5']):
lines.append(f"- MA5: ${latest['ma5']:,.2f}")
if 'ma20' in df.columns and pd.notna(latest['ma20']):
lines.append(f"- MA20: ${latest['ma20']:,.2f}")
if 'rsi' in df.columns and pd.notna(latest['rsi']):
rsi_val = latest['rsi']
rsi_status = "超买" if rsi_val > 70 else "超卖" if rsi_val < 30 else "正常"
lines.append(f"- RSI: {rsi_val:.1f} ({rsi_status})")
if 'macd' in df.columns and pd.notna(latest['macd']):
macd_trend = "多头" if latest['macd'] > 0 else "空头"
lines.append(f"- MACD: {latest['macd']:.4f} ({macd_trend})")
# 添加趋势判断
lines.append("\n## 请给出调整建议")
lines.append("对于每个持仓,请分析是否需要调整,并按 JSON 格式输出。")
return "\n".join(lines)
def _parse_position_review_response(self, response: str) -> List[Dict[str, Any]]:
"""解析持仓回顾响应"""
try:
# 尝试提取 JSON 数组
import json
import re
# 查找 JSON 数组
json_match = re.search(r'\[\s*\{.*?\}\s*\]', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
decisions = json.loads(json_str)
# 验证每个决策的格式
valid_decisions = []
for decision in decisions:
if 'order_id' in decision and 'action' in decision:
valid_decisions.append(decision)
else:
logger.warning(f"无效的决策格式: {decision}")
return valid_decisions
# 如果找不到 JSON 数组,尝试解析单个对象
json_match = re.search(r'\{[^{}]*"action"[^{}]*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
decision = json.loads(json_str)
if 'order_id' in decision and 'action' in decision:
return [decision]
logger.warning(f"无法解析持仓回顾响应: {response[:200]}")
return []
except json.JSONDecodeError as e:
logger.error(f"解析持仓回顾 JSON 失败: {e}")
return []
except Exception as e:
logger.error(f"解析持仓回顾响应时出错: {e}")
return []