stock-ai-agent/backend/app/crypto_agent/llm_signal_analyzer.py
2026-02-07 01:43:24 +08:00

544 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
LLM 驱动的信号分析器 - 让 LLM 自主分析市场数据并给出交易信号
"""
import json
import re
import pandas as pd
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.utils.logger import logger
from app.services.llm_service import llm_service
from app.services.news_service import get_news_service
class LLMSignalAnalyzer:
"""LLM 驱动的交易信号分析器"""
# 系统提示词 - 让 LLM 自主分析
SYSTEM_PROMPT = """你是一位专业的加密货币技术分析师。你的任务是综合分析市场数据和新闻舆情,判断是否存在交易机会。
## 你的分析方法
你可以自由运用你所知道的任何技术分析方法,包括但不限于:
- 趋势分析(均线、趋势线、高低点)
- 动量指标RSI、MACD、KDJ 等)
- 波动率分析布林带、ATR
- 价格形态K线形态、图表形态
- 支撑阻力位
- 成交量分析
- 多周期共振
## 新闻舆情分析
你还需要结合最新的市场新闻进行分析:
- 重大利好/利空消息
- 市场情绪(恐慌/贪婪)
- 大户/机构动向
- 监管政策变化
- 宏观经济影响
## 信号类型
请判断是否存在以下三种类型的交易机会:
1. **短线信号**(持仓 4小时 - 1天
- 适合快速的超跌反弹或超涨回落
- 风险较高,需要快速止盈止损
2. **中线信号**(持仓 1-7 天)
- 波段交易,顺势回调入场
- 风险适中,有明确的止损止盈
3. **长线信号**(持仓 1周以上
- 趋势交易,大级别趋势确认
- 风险较低,止损较宽
## 入场方式
你需要明确指定入场方式:
- **market**:现价立即入场 - 当前价格就是好的入场点,建议立即开仓
- **limit**:挂单等待入场 - 等价格回调/突破到指定位置再入场
## 输出格式
请严格按照以下 JSON 格式输出你的分析结果:
```json
{
"analysis_summary": "简要描述当前市场状态50字以内",
"news_sentiment": "positive/negative/neutral",
"news_impact": "新闻对市场的影响分析30字以内",
"signals": [
{
"type": "short_term/medium_term/long_term",
"action": "buy/sell/wait",
"entry_type": "market/limit",
"confidence": 0-100,
"grade": "A/B/C/D",
"entry_price": 建议入场价,
"stop_loss": 止损价,
"take_profit": 止盈价,
"reason": "详细的入场理由,说明你看到了什么技术信号和消息面因素",
"risk_warning": "风险提示"
}
],
"key_levels": {
"support": [支撑位列表],
"resistance": [阻力位列表]
}
}
```
## 信号等级说明
- **A级**:技术面+消息面共振高置信度80+),强烈建议入场
- **B级**信号较好置信度中等60-80可以入场
- **C级**有机会但需谨慎40-60轻仓试探
- **D级**:不建议交易(<40继续观望
## 重要原则
1. 宁可错过,不要做错 - 没有明确信号时输出空的 signals 数组
2. 每种类型最多输出一个信号
3. 止损必须明确,风险收益比至少 1:1.5
4. 如果市场混乱或数据不足,直接建议观望
5. reason 字段要具体说明你看到了什么(如"15M RSI 从 25 回升到 35同时 MACD 金叉,且有大户加仓消息"
6. 消息面和技术面冲突时,优先考虑技术面,但要在 risk_warning 中提示
7. entry_type 必须明确:如果当前价格合适立即入场用 market如果需要等待更好价位用 limit
8. limit 挂单的 entry_price 应该是你期望的入场价位,而不是当前价格"""
def __init__(self):
"""初始化分析器"""
self.news_service = get_news_service()
logger.info("LLM 信号分析器初始化完成(含新闻舆情)")
async def analyze(self, symbol: str, data: Dict[str, pd.DataFrame],
symbols: List[str] = None) -> Dict[str, Any]:
"""
使用 LLM 分析市场数据
Args:
symbol: 交易对,如 'BTCUSDT'
data: 多周期K线数据 {'5m': df, '15m': df, '1h': df, '4h': df}
symbols: 所有监控的交易对(用于过滤相关新闻)
Returns:
分析结果
"""
try:
# 获取新闻数据
news_text = await self._get_news_context(symbol, symbols or [symbol])
# 构建数据提示
data_prompt = self._build_data_prompt(symbol, data, news_text)
# 调用 LLM
response = llm_service.chat([
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": data_prompt}
])
if not response:
logger.warning(f"{symbol} LLM 分析无响应")
return self._empty_result(symbol, "LLM 无响应")
# 解析响应
result = self._parse_response(response)
result['symbol'] = symbol
result['timestamp'] = datetime.now().isoformat()
# 记录日志
signals = result.get('signals', [])
if signals:
for sig in signals:
logger.info(f"{symbol} [{sig['type']}] {sig['action']} "
f"置信度:{sig['confidence']}% 等级:{sig['grade']} "
f"原因:{sig['reason'][:50]}...")
else:
logger.info(f"{symbol} 无交易信号 - {result.get('analysis_summary', '观望')}")
return result
except Exception as e:
logger.error(f"{symbol} LLM 分析出错: {e}")
import traceback
logger.error(traceback.format_exc())
return self._empty_result(symbol, str(e))
async def _get_news_context(self, symbol: str, symbols: List[str]) -> str:
"""获取新闻上下文"""
try:
# 获取最新新闻
all_news = await self.news_service.get_latest_news(limit=30)
# 过滤相关新闻最近4小时
relevant_news = self.news_service.filter_relevant_news(
all_news, symbols=symbols, hours=4
)
if not relevant_news:
return "暂无相关新闻"
# 格式化新闻
return self.news_service.format_news_for_llm(relevant_news, max_items=8)
except Exception as e:
logger.warning(f"获取新闻失败: {e}")
return "新闻数据暂时不可用"
def _build_data_prompt(self, symbol: str, data: Dict[str, pd.DataFrame],
news_text: str = "") -> str:
"""构建数据提示词"""
parts = [f"# {symbol} 市场数据分析\n"]
parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# 当前价格
if '5m' in data and not data['5m'].empty:
current_price = float(data['5m'].iloc[-1]['close'])
parts.append(f"**当前价格**: ${current_price:,.2f}\n")
# 各周期数据
for interval in ['4h', '1h', '15m', '5m']:
df = data.get(interval)
if df is None or df.empty:
continue
parts.append(f"\n## {interval.upper()} 周期数据")
# 最新指标
latest = df.iloc[-1]
parts.append(self._format_indicators(latest))
# 最近 K 线数据
parts.append(self._format_recent_klines(df, interval))
# 添加新闻数据
if news_text and news_text != "暂无相关新闻":
parts.append(f"\n{news_text}")
parts.append("\n---")
parts.append("请综合分析以上技术数据和新闻舆情,判断是否存在短线、中线或长线的交易机会。")
parts.append("如果没有明确的交易机会signals 数组返回空即可。")
return "\n".join(parts)
def _format_indicators(self, row: pd.Series) -> str:
"""格式化指标数据"""
lines = []
# 价格
close = row.get('close', 0)
open_price = row.get('open', 0)
high = row.get('high', 0)
low = row.get('low', 0)
change = ((close - open_price) / open_price * 100) if open_price else 0
lines.append(f"- K线: O={open_price:.2f} H={high:.2f} L={low:.2f} C={close:.2f} ({change:+.2f}%)")
# 均线
ma5 = row.get('ma5', 0)
ma10 = row.get('ma10', 0)
ma20 = row.get('ma20', 0)
ma50 = row.get('ma50', 0)
if pd.notna(ma20):
ma_str = f"- 均线: MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}"
if pd.notna(ma50):
ma_str += f", MA50={ma50:.2f}"
lines.append(ma_str)
# RSI
rsi = row.get('rsi', 0)
if pd.notna(rsi):
rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性")
lines.append(f"- RSI: {rsi:.1f} ({rsi_status})")
# MACD
macd = row.get('macd', 0)
macd_signal = row.get('macd_signal', 0)
macd_hist = row.get('macd_hist', 0)
if pd.notna(macd):
macd_status = "多头" if macd > macd_signal else "空头"
lines.append(f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})")
# KDJ
k = row.get('k', 0)
d = row.get('d', 0)
j = row.get('j', 0)
if pd.notna(k):
lines.append(f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}")
# 布林带
bb_upper = row.get('bb_upper', 0)
bb_middle = row.get('bb_middle', 0)
bb_lower = row.get('bb_lower', 0)
if pd.notna(bb_upper):
lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f}")
# ATR
atr = row.get('atr', 0)
if pd.notna(atr):
lines.append(f"- ATR: {atr:.2f}")
# 成交量
volume = row.get('volume', 0)
volume_ratio = row.get('volume_ratio', 0)
if pd.notna(volume_ratio):
vol_status = "放量" if volume_ratio > 1.5 else ("缩量" if volume_ratio < 0.5 else "正常")
lines.append(f"- 成交量: {volume:.2f}, 量比={volume_ratio:.2f} ({vol_status})")
return "\n".join(lines)
def _format_recent_klines(self, df: pd.DataFrame, interval: str) -> str:
"""格式化最近 K 线"""
# 根据周期决定显示数量
count = {'4h': 6, '1h': 12, '15m': 8, '5m': 6}.get(interval, 6)
if len(df) < count:
count = len(df)
lines = [f"\n最近 {count} 根 K 线:"]
lines.append("| 时间 | 开盘 | 最高 | 最低 | 收盘 | 涨跌 | RSI |")
lines.append("|------|------|------|------|------|------|-----|")
for i in range(-count, 0):
row = df.iloc[i]
change = ((row['close'] - row['open']) / row['open'] * 100) if row['open'] else 0
change_str = f"{change:+.2f}%"
time_str = row['open_time'].strftime('%m-%d %H:%M') if pd.notna(row.get('open_time')) else 'N/A'
rsi = row.get('rsi', 0)
rsi_str = f"{rsi:.0f}" if pd.notna(rsi) else "-"
lines.append(f"| {time_str} | {row['open']:.2f} | {row['high']:.2f} | "
f"{row['low']:.2f} | {row['close']:.2f} | {change_str} | {rsi_str} |")
return "\n".join(lines)
def _parse_response(self, response: str) -> Dict[str, Any]:
"""解析 LLM 响应"""
result = {
'raw_response': response,
'analysis_summary': '',
'signals': [],
'key_levels': {'support': [], 'resistance': []}
}
try:
# 尝试提取 JSON
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
if json_match:
json_str = json_match.group(1)
else:
# 尝试直接解析
json_str = response
parsed = json.loads(json_str)
result['analysis_summary'] = parsed.get('analysis_summary', '')
result['signals'] = parsed.get('signals', [])
result['key_levels'] = parsed.get('key_levels', {'support': [], 'resistance': []})
# 验证和清理信号
valid_signals = []
for sig in result['signals']:
if self._validate_signal(sig):
valid_signals.append(sig)
result['signals'] = valid_signals
except json.JSONDecodeError:
logger.warning("LLM 响应不是有效 JSON尝试提取关键信息")
result['analysis_summary'] = self._extract_summary(response)
return result
def _validate_signal(self, signal: Dict[str, Any]) -> bool:
"""验证信号是否有效"""
required_fields = ['type', 'action', 'confidence', 'grade', 'reason']
for field in required_fields:
if field not in signal:
return False
# 验证类型
if signal['type'] not in ['short_term', 'medium_term', 'long_term']:
return False
# 验证动作
if signal['action'] not in ['buy', 'sell', 'wait']:
return False
# wait 动作不算有效信号
if signal['action'] == 'wait':
return False
# 验证置信度(必须 >= 80 才算有效信号)
confidence = signal.get('confidence', 0)
if not isinstance(confidence, (int, float)) or confidence < 80:
return False
# 验证入场类型(默认为 market
entry_type = signal.get('entry_type', 'market')
if entry_type not in ['market', 'limit']:
signal['entry_type'] = 'market' # 默认现价入场
return True
def _extract_summary(self, text: str) -> str:
"""从文本中提取摘要"""
text = text.strip()
if len(text) > 100:
return text[:100] + "..."
return text
def _empty_result(self, symbol: str, reason: str = "") -> Dict[str, Any]:
"""返回空结果"""
return {
'symbol': symbol,
'timestamp': datetime.now().isoformat(),
'analysis_summary': reason or '无法分析',
'signals': [],
'key_levels': {'support': [], 'resistance': []},
'error': reason
}
def get_best_signal(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
从分析结果中获取最佳信号
Args:
result: analyze() 的返回结果
Returns:
最佳信号,如果没有则返回 None
"""
signals = result.get('signals', [])
if not signals:
return None
# 按置信度排序
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
return sorted_signals[0]
def format_signal_message(self, signal: Dict[str, Any], symbol: str) -> str:
"""
格式化信号消息(用于 Telegram 通知)
Args:
signal: 信号数据
symbol: 交易对
Returns:
格式化的消息文本
"""
type_map = {
'short_term': '短线',
'medium_term': '中线',
'long_term': '长线'
}
action_map = {
'buy': '做多',
'sell': '做空'
}
signal_type = type_map.get(signal['type'], signal['type'])
action = action_map.get(signal['action'], signal['action'])
grade = signal.get('grade', 'C')
confidence = signal.get('confidence', 0)
entry_type = signal.get('entry_type', 'market')
# 等级图标
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
# 方向图标
action_icon = '🟢' if signal['action'] == 'buy' else '🔴'
# 入场类型
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 计算风险收益比
entry = signal.get('entry_price', 0)
sl = signal.get('stop_loss', 0)
tp = signal.get('take_profit', 0)
sl_percent = ((sl - entry) / entry * 100) if entry else 0
tp_percent = ((tp - entry) / entry * 100) if entry else 0
message = f"""📊 {symbol} {signal_type}信号
{action_icon} **方向**: {action}
{entry_type_icon} **入场**: {entry_type_text}
⭐ **等级**: {grade} {grade_icon}
📈 **置信度**: {confidence}%
💰 **入场价**: ${entry:,.2f}
🛑 **止损价**: ${sl:,.2f} ({sl_percent:+.1f}%)
🎯 **止盈价**: ${tp:,.2f} ({tp_percent:+.1f}%)
📝 **分析理由**:
{signal.get('reason', '')}
⚠️ **风险提示**:
{signal.get('risk_warning', '请注意风险控制')}"""
return message
def format_feishu_card(self, signal: Dict[str, Any], symbol: str) -> Dict[str, Any]:
"""
格式化飞书卡片消息
Args:
signal: 信号数据
symbol: 交易对
Returns:
包含 title, content, color 的字典
"""
type_map = {
'short_term': '短线',
'medium_term': '中线',
'long_term': '长线'
}
action_map = {
'buy': '做多',
'sell': '做空'
}
signal_type = type_map.get(signal['type'], signal['type'])
action = action_map.get(signal['action'], signal['action'])
grade = signal.get('grade', 'C')
confidence = signal.get('confidence', 0)
entry_type = signal.get('entry_type', 'market')
# 等级图标
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
# 入场类型
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 标题和颜色
if signal['action'] == 'buy':
title = f"🟢 {symbol} {signal_type}做多信号 [{entry_type_text}]"
color = "green"
else:
title = f"🔴 {symbol} {signal_type}做空信号 [{entry_type_text}]"
color = "red"
# 计算风险收益比
entry = signal.get('entry_price', 0)
sl = signal.get('stop_loss', 0)
tp = signal.get('take_profit', 0)
sl_percent = ((sl - entry) / entry * 100) if entry else 0
tp_percent = ((tp - entry) / entry * 100) if entry else 0
# 构建 Markdown 内容
content_parts = [
f"**{signal_type}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
f"{entry_type_icon} **入场方式**: {entry_type_text}",
"",
f"💰 **入场**: ${entry:,.2f}",
f"🛑 **止损**: ${sl:,.2f} ({sl_percent:+.1f}%)",
f"🎯 **止盈**: ${tp:,.2f} ({tp_percent:+.1f}%)",
"",
f"📝 **分析理由**:",
f"{signal.get('reason', '')}",
"",
f"⚠️ **风险提示**:",
f"{signal.get('risk_warning', '请注意风险控制')}",
]
return {
'title': title,
'content': '\n'.join(content_parts),
'color': color
}