stock-ai-agent/backend/app/crypto_agent/llm_signal_analyzer.py
2026-02-07 17:31:38 +08:00

544 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
LLM 驱动的信号分析器 - 让 LLM 自主分析市场数据并给出交易信号
"""
import json
import re
import pandas as pd
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.utils.logger import logger
from app.services.llm_service import llm_service
from app.services.news_service import get_news_service
class LLMSignalAnalyzer:
"""LLM 驱动的交易信号分析器"""
# 系统提示词 - 让 LLM 自主分析
SYSTEM_PROMPT = """你是一位专业的加密货币交易员和技术分析师。你的任务是综合分析市场数据和新闻舆情,**积极寻找交易机会**。
## 核心理念
加密货币市场波动大,每天都有交易机会。你的目标是:
- **主动寻找机会**,而不是被动等待完美信号
- 短线交易重点关注:超跌反弹、超涨回落、关键位突破
- 中线交易重点关注:趋势回调、形态突破、多周期共振
## 你的分析方法
运用以下技术分析方法寻找入场点:
- **趋势判断**:均线排列、高低点结构
- **动量指标**RSI 超买超卖(<30 超卖机会,>70 超买机会、MACD 金叉死叉
- **支撑阻力**:关键价位的突破或反弹
- **K线形态**:锤子线、吞没形态、十字星等反转信号
- **布林带**:触及下轨反弹、触及上轨回落、收口后突破
## 日内交易机会识别
以下情况应该给出**短线信号**(置信度 60-75
1. RSI < 30 且出现止跌迹象(超跌反弹做多)
2. RSI > 70 且出现滞涨迹象(超涨回落做空)
3. 价格触及布林带下轨并企稳(反弹做多)
4. 价格触及布林带上轨并受阻(回落做空)
5. 5分钟/15分钟级别 MACD 金叉/死叉 + 量能配合
6. 关键支撑位/阻力位的突破或反弹
## 波段交易机会识别
以下情况应该给出**中线信号**(置信度 70-85
1. 1小时级别趋势明确 + 回调到均线支撑
2. 4小时级别形态突破三角形、旗形等
3. 多周期 RSI 共振(如 1H 和 4H 同时超卖)
4. 重大利好/利空消息 + 技术面配合
## 新闻舆情分析
结合最新市场新闻:
- 重大利好/利空消息
- 市场情绪(恐慌/贪婪)
- 大户/机构动向
## 入场方式
- **market**:现价立即入场 - 信号已经触发,建议立即开仓
- **limit**:挂单等待入场 - 等价格回调到更好位置再入场
## 输出格式
请严格按照以下 JSON 格式输出:
```json
{
"analysis_summary": "简要描述当前市场状态50字以内",
"news_sentiment": "positive/negative/neutral",
"news_impact": "新闻对市场的影响分析30字以内",
"signals": [
{
"type": "short_term/medium_term/long_term",
"action": "buy/sell/wait",
"entry_type": "market/limit",
"confidence": 0-100,
"grade": "A/B/C/D",
"entry_price": 建议入场价,
"stop_loss": 止损价,
"take_profit": 止盈价,
"reason": "详细的入场理由",
"risk_warning": "风险提示"
}
],
"key_levels": {
"support": [支撑位列表],
"resistance": [阻力位列表]
}
}
```
## 信号等级与置信度
- **A级**80-100多重信号共振强烈建议入场
- **B级**60-79信号较好可以入场
- **C级**40-59有机会但需谨慎
- **D级**<40不建议交易
## 重要原则
1. **积极但不冒进** - 有合理依据就给出信号,不要过于保守
2. 每种类型最多输出一个信号
3. 止损必须明确,风险收益比至少 1:1.5
4. reason 字段要具体说明技术依据(如"15M RSI=28 超卖MACD 即将金叉"
5. entry_type 必须明确:信号已触发用 market等待更好价位用 limit
6. 短线信号止损控制在 1-2%,中线信号止损控制在 2-4%"""
def __init__(self):
"""初始化分析器"""
self.news_service = get_news_service()
logger.info("LLM 信号分析器初始化完成(含新闻舆情)")
async def analyze(self, symbol: str, data: Dict[str, pd.DataFrame],
symbols: List[str] = None) -> Dict[str, Any]:
"""
使用 LLM 分析市场数据
Args:
symbol: 交易对,如 'BTCUSDT'
data: 多周期K线数据 {'5m': df, '15m': df, '1h': df, '4h': df}
symbols: 所有监控的交易对(用于过滤相关新闻)
Returns:
分析结果
"""
try:
# 获取新闻数据
news_text = await self._get_news_context(symbol, symbols or [symbol])
# 构建数据提示
data_prompt = self._build_data_prompt(symbol, data, news_text)
# 调用 LLM
response = llm_service.chat([
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": data_prompt}
])
if not response:
logger.warning(f"{symbol} LLM 分析无响应")
return self._empty_result(symbol, "LLM 无响应")
# 解析响应
result = self._parse_response(response)
result['symbol'] = symbol
result['timestamp'] = datetime.now().isoformat()
# 记录日志
signals = result.get('signals', [])
if signals:
for sig in signals:
logger.info(f"{symbol} [{sig['type']}] {sig['action']} "
f"置信度:{sig['confidence']}% 等级:{sig['grade']} "
f"原因:{sig['reason'][:50]}...")
else:
logger.info(f"{symbol} 无交易信号 - {result.get('analysis_summary', '观望')}")
return result
except Exception as e:
logger.error(f"{symbol} LLM 分析出错: {e}")
import traceback
logger.error(traceback.format_exc())
return self._empty_result(symbol, str(e))
async def _get_news_context(self, symbol: str, symbols: List[str]) -> str:
"""获取新闻上下文"""
try:
# 获取最新新闻
all_news = await self.news_service.get_latest_news(limit=30)
# 过滤相关新闻最近4小时
relevant_news = self.news_service.filter_relevant_news(
all_news, symbols=symbols, hours=4
)
if not relevant_news:
return "暂无相关新闻"
# 格式化新闻
return self.news_service.format_news_for_llm(relevant_news, max_items=8)
except Exception as e:
logger.warning(f"获取新闻失败: {e}")
return "新闻数据暂时不可用"
def _build_data_prompt(self, symbol: str, data: Dict[str, pd.DataFrame],
news_text: str = "") -> str:
"""构建数据提示词"""
parts = [f"# {symbol} 市场数据分析\n"]
parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# 当前价格
if '5m' in data and not data['5m'].empty:
current_price = float(data['5m'].iloc[-1]['close'])
parts.append(f"**当前价格**: ${current_price:,.2f}\n")
# 各周期数据
for interval in ['4h', '1h', '15m', '5m']:
df = data.get(interval)
if df is None or df.empty:
continue
parts.append(f"\n## {interval.upper()} 周期数据")
# 最新指标
latest = df.iloc[-1]
parts.append(self._format_indicators(latest))
# 最近 K 线数据
parts.append(self._format_recent_klines(df, interval))
# 添加新闻数据
if news_text and news_text != "暂无相关新闻":
parts.append(f"\n{news_text}")
parts.append("\n---")
parts.append("请综合分析以上技术数据和新闻舆情,判断是否存在短线、中线或长线的交易机会。")
parts.append("如果没有明确的交易机会signals 数组返回空即可。")
return "\n".join(parts)
def _format_indicators(self, row: pd.Series) -> str:
"""格式化指标数据"""
lines = []
# 价格
close = row.get('close', 0)
open_price = row.get('open', 0)
high = row.get('high', 0)
low = row.get('low', 0)
change = ((close - open_price) / open_price * 100) if open_price else 0
lines.append(f"- K线: O={open_price:.2f} H={high:.2f} L={low:.2f} C={close:.2f} ({change:+.2f}%)")
# 均线
ma5 = row.get('ma5', 0)
ma10 = row.get('ma10', 0)
ma20 = row.get('ma20', 0)
ma50 = row.get('ma50', 0)
if pd.notna(ma20):
ma_str = f"- 均线: MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}"
if pd.notna(ma50):
ma_str += f", MA50={ma50:.2f}"
lines.append(ma_str)
# RSI
rsi = row.get('rsi', 0)
if pd.notna(rsi):
rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性")
lines.append(f"- RSI: {rsi:.1f} ({rsi_status})")
# MACD
macd = row.get('macd', 0)
macd_signal = row.get('macd_signal', 0)
macd_hist = row.get('macd_hist', 0)
if pd.notna(macd):
macd_status = "多头" if macd > macd_signal else "空头"
lines.append(f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})")
# KDJ
k = row.get('k', 0)
d = row.get('d', 0)
j = row.get('j', 0)
if pd.notna(k):
lines.append(f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}")
# 布林带
bb_upper = row.get('bb_upper', 0)
bb_middle = row.get('bb_middle', 0)
bb_lower = row.get('bb_lower', 0)
if pd.notna(bb_upper):
lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f}")
# ATR
atr = row.get('atr', 0)
if pd.notna(atr):
lines.append(f"- ATR: {atr:.2f}")
# 成交量
volume = row.get('volume', 0)
volume_ratio = row.get('volume_ratio', 0)
if pd.notna(volume_ratio):
vol_status = "放量" if volume_ratio > 1.5 else ("缩量" if volume_ratio < 0.5 else "正常")
lines.append(f"- 成交量: {volume:.2f}, 量比={volume_ratio:.2f} ({vol_status})")
return "\n".join(lines)
def _format_recent_klines(self, df: pd.DataFrame, interval: str) -> str:
"""格式化最近 K 线"""
# 根据周期决定显示数量
count = {'4h': 6, '1h': 12, '15m': 8, '5m': 6}.get(interval, 6)
if len(df) < count:
count = len(df)
lines = [f"\n最近 {count} 根 K 线:"]
lines.append("| 时间 | 开盘 | 最高 | 最低 | 收盘 | 涨跌 | RSI |")
lines.append("|------|------|------|------|------|------|-----|")
for i in range(-count, 0):
row = df.iloc[i]
change = ((row['close'] - row['open']) / row['open'] * 100) if row['open'] else 0
change_str = f"{change:+.2f}%"
time_str = row['open_time'].strftime('%m-%d %H:%M') if pd.notna(row.get('open_time')) else 'N/A'
rsi = row.get('rsi', 0)
rsi_str = f"{rsi:.0f}" if pd.notna(rsi) else "-"
lines.append(f"| {time_str} | {row['open']:.2f} | {row['high']:.2f} | "
f"{row['low']:.2f} | {row['close']:.2f} | {change_str} | {rsi_str} |")
return "\n".join(lines)
def _parse_response(self, response: str) -> Dict[str, Any]:
"""解析 LLM 响应"""
result = {
'raw_response': response,
'analysis_summary': '',
'signals': [],
'key_levels': {'support': [], 'resistance': []}
}
try:
# 尝试提取 JSON
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
if json_match:
json_str = json_match.group(1)
else:
# 尝试直接解析
json_str = response
parsed = json.loads(json_str)
result['analysis_summary'] = parsed.get('analysis_summary', '')
result['signals'] = parsed.get('signals', [])
result['key_levels'] = parsed.get('key_levels', {'support': [], 'resistance': []})
# 验证和清理信号
valid_signals = []
for sig in result['signals']:
if self._validate_signal(sig):
valid_signals.append(sig)
result['signals'] = valid_signals
except json.JSONDecodeError:
logger.warning("LLM 响应不是有效 JSON尝试提取关键信息")
result['analysis_summary'] = self._extract_summary(response)
return result
def _validate_signal(self, signal: Dict[str, Any]) -> bool:
"""验证信号是否有效"""
required_fields = ['type', 'action', 'confidence', 'grade', 'reason']
for field in required_fields:
if field not in signal:
return False
# 验证类型
if signal['type'] not in ['short_term', 'medium_term', 'long_term']:
return False
# 验证动作
if signal['action'] not in ['buy', 'sell', 'wait']:
return False
# wait 动作不算有效信号
if signal['action'] == 'wait':
return False
# 验证置信度(必须 >= 60 才算有效信号,即 B 级及以上)
confidence = signal.get('confidence', 0)
if not isinstance(confidence, (int, float)) or confidence < 60:
return False
# 验证入场类型(默认为 market
entry_type = signal.get('entry_type', 'market')
if entry_type not in ['market', 'limit']:
signal['entry_type'] = 'market' # 默认现价入场
return True
def _extract_summary(self, text: str) -> str:
"""从文本中提取摘要"""
text = text.strip()
if len(text) > 100:
return text[:100] + "..."
return text
def _empty_result(self, symbol: str, reason: str = "") -> Dict[str, Any]:
"""返回空结果"""
return {
'symbol': symbol,
'timestamp': datetime.now().isoformat(),
'analysis_summary': reason or '无法分析',
'signals': [],
'key_levels': {'support': [], 'resistance': []},
'error': reason
}
def get_best_signal(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
从分析结果中获取最佳信号
Args:
result: analyze() 的返回结果
Returns:
最佳信号,如果没有则返回 None
"""
signals = result.get('signals', [])
if not signals:
return None
# 按置信度排序
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
return sorted_signals[0]
def format_signal_message(self, signal: Dict[str, Any], symbol: str) -> str:
"""
格式化信号消息(用于 Telegram 通知)
Args:
signal: 信号数据
symbol: 交易对
Returns:
格式化的消息文本
"""
type_map = {
'short_term': '短线',
'medium_term': '中线',
'long_term': '长线'
}
action_map = {
'buy': '做多',
'sell': '做空'
}
signal_type = type_map.get(signal['type'], signal['type'])
action = action_map.get(signal['action'], signal['action'])
grade = signal.get('grade', 'C')
confidence = signal.get('confidence', 0)
entry_type = signal.get('entry_type', 'market')
# 等级图标
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
# 方向图标
action_icon = '🟢' if signal['action'] == 'buy' else '🔴'
# 入场类型
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 计算风险收益比
entry = signal.get('entry_price', 0)
sl = signal.get('stop_loss', 0)
tp = signal.get('take_profit', 0)
sl_percent = ((sl - entry) / entry * 100) if entry else 0
tp_percent = ((tp - entry) / entry * 100) if entry else 0
message = f"""📊 {symbol} {signal_type}信号
{action_icon} **方向**: {action}
{entry_type_icon} **入场**: {entry_type_text}
⭐ **等级**: {grade} {grade_icon}
📈 **置信度**: {confidence}%
💰 **入场价**: ${entry:,.2f}
🛑 **止损价**: ${sl:,.2f} ({sl_percent:+.1f}%)
🎯 **止盈价**: ${tp:,.2f} ({tp_percent:+.1f}%)
📝 **分析理由**:
{signal.get('reason', '')}
⚠️ **风险提示**:
{signal.get('risk_warning', '请注意风险控制')}"""
return message
def format_feishu_card(self, signal: Dict[str, Any], symbol: str) -> Dict[str, Any]:
"""
格式化飞书卡片消息
Args:
signal: 信号数据
symbol: 交易对
Returns:
包含 title, content, color 的字典
"""
type_map = {
'short_term': '短线',
'medium_term': '中线',
'long_term': '长线'
}
action_map = {
'buy': '做多',
'sell': '做空'
}
signal_type = type_map.get(signal['type'], signal['type'])
action = action_map.get(signal['action'], signal['action'])
grade = signal.get('grade', 'C')
confidence = signal.get('confidence', 0)
entry_type = signal.get('entry_type', 'market')
# 等级图标
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
# 入场类型
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 标题和颜色
if signal['action'] == 'buy':
title = f"🟢 {symbol} {signal_type}做多信号 [{entry_type_text}]"
color = "green"
else:
title = f"🔴 {symbol} {signal_type}做空信号 [{entry_type_text}]"
color = "red"
# 计算风险收益比
entry = signal.get('entry_price', 0)
sl = signal.get('stop_loss', 0)
tp = signal.get('take_profit', 0)
sl_percent = ((sl - entry) / entry * 100) if entry else 0
tp_percent = ((tp - entry) / entry * 100) if entry else 0
# 构建 Markdown 内容
content_parts = [
f"**{signal_type}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
f"{entry_type_icon} **入场方式**: {entry_type_text}",
"",
f"💰 **入场**: ${entry:,.2f}",
f"🛑 **止损**: ${sl:,.2f} ({sl_percent:+.1f}%)",
f"🎯 **止盈**: ${tp:,.2f} ({tp_percent:+.1f}%)",
"",
f"📝 **分析理由**:",
f"{signal.get('reason', '')}",
"",
f"⚠️ **风险提示**:",
f"{signal.get('risk_warning', '请注意风险控制')}",
]
return {
'title': title,
'content': '\n'.join(content_parts),
'color': color
}