650 lines
24 KiB
Python
650 lines
24 KiB
Python
"""
|
||
LLM 驱动的信号分析器 - 让 LLM 自主分析市场数据并给出交易信号
|
||
"""
|
||
import json
|
||
import re
|
||
import pandas as pd
|
||
from typing import Dict, Any, Optional, List
|
||
from datetime import datetime
|
||
from app.utils.logger import logger
|
||
from app.services.llm_service import llm_service
|
||
from app.services.news_service import get_news_service
|
||
|
||
|
||
class LLMSignalAnalyzer:
|
||
"""LLM 驱动的交易信号分析器"""
|
||
|
||
# 系统提示词 - 让 LLM 自主分析
|
||
SYSTEM_PROMPT = """你是一位专业的加密货币交易员和技术分析师。你的任务是综合分析**K线数据、量价关系、技术指标和新闻舆情**,给出交易信号。
|
||
|
||
## 核心理念
|
||
加密货币市场波动大,每天都有交易机会。你的目标是:
|
||
- **主动寻找机会**,而不是被动等待完美信号
|
||
- 短线交易重点关注:超跌反弹、超涨回落、关键位突破
|
||
- 中线交易重点关注:趋势回调、形态突破、多周期共振
|
||
|
||
## 一、量价分析(最重要)
|
||
量价关系是判断趋势真假的核心:
|
||
|
||
### 1. 健康上涨信号
|
||
- **放量上涨**:价格上涨 + 成交量放大(量比>1.5)= 上涨有效,可追多
|
||
- **缩量回调**:上涨后回调 + 成交量萎缩(量比<0.7)= 回调健康,可低吸
|
||
|
||
### 2. 健康下跌信号
|
||
- **放量下跌**:价格下跌 + 成交量放大 = 下跌有效,可追空
|
||
- **缩量反弹**:下跌后反弹 + 成交量萎缩 = 反弹无力,可做空
|
||
|
||
### 3. 量价背离(重要反转信号)
|
||
- **顶背离**:价格创新高,但成交量未创新高 → 上涨动能衰竭,警惕回落
|
||
- **底背离**:价格创新低,但成交量未创新低 → 下跌动能衰竭,关注反弹
|
||
- **天量见顶**:极端放量(量比>3)后价格滞涨 → 主力出货信号
|
||
- **地量见底**:极端缩量(量比<0.3)后价格企稳 → 抛压枯竭信号
|
||
|
||
### 4. 突破确认
|
||
- **有效突破**:突破关键位 + 放量确认(量比>1.5)= 真突破
|
||
- **假突破**:突破关键位 + 缩量 = 假突破,可能回落
|
||
|
||
## 二、K线形态分析
|
||
### 反转形态
|
||
- **锤子线/倒锤子**:下跌趋势中出现,下影线长 = 底部信号
|
||
- **吞没形态**:大阳吞没前一根阴线 = 看涨;大阴吞没前一根阳线 = 看跌
|
||
- **十字星**:在高位/低位出现 = 变盘信号
|
||
- **早晨之星/黄昏之星**:三根K线组合的反转信号
|
||
|
||
### 持续形态
|
||
- **三连阳/三连阴**:趋势延续信号
|
||
- **旗形整理**:趋势中的健康回调
|
||
|
||
## 三、技术指标分析
|
||
### RSI(相对强弱指标)
|
||
- RSI < 30:超卖区,关注反弹机会
|
||
- RSI > 70:超买区,关注回落风险
|
||
- RSI 背离:价格与 RSI 走势相反 = 重要反转信号
|
||
|
||
### MACD
|
||
- 金叉(DIF 上穿 DEA):做多信号
|
||
- 死叉(DIF 下穿 DEA):做空信号
|
||
- 零轴上方金叉:强势做多
|
||
- 零轴下方死叉:强势做空
|
||
- MACD 柱状图背离:重要反转信号
|
||
|
||
### 布林带
|
||
- 触及下轨 + 企稳:反弹做多
|
||
- 触及上轨 + 受阻:回落做空
|
||
- 布林带收口:即将变盘
|
||
- 布林带开口:趋势启动
|
||
|
||
### 均线系统
|
||
- 多头排列(MA5>MA10>MA20):上涨趋势
|
||
- 空头排列(MA5<MA10<MA20):下跌趋势
|
||
- 价格回踩均线支撑:低吸机会
|
||
- 价格反弹均线压力:做空机会
|
||
|
||
## 四、新闻舆情分析
|
||
结合最新市场新闻判断:
|
||
- **重大利好**:监管利好、机构入场、ETF 通过等 → 提高做多置信度
|
||
- **重大利空**:监管打压、交易所暴雷、黑客攻击等 → 提高做空置信度
|
||
- **市场情绪**:恐慌指数、社交媒体热度
|
||
- **大户动向**:鲸鱼转账、交易所流入流出
|
||
|
||
## 五、多周期共振
|
||
- 4H + 1H 同向 = 中线信号更可靠
|
||
- 1H + 15M 同向 = 短线信号更可靠
|
||
- 多周期 RSI 同时超买/超卖 = 强反转信号
|
||
|
||
## 六、入场方式
|
||
- **market**:现价立即入场 - 信号已经触发,建议立即开仓
|
||
- **limit**:挂单等待入场 - 等价格回调到更好位置再入场
|
||
|
||
## 输出格式
|
||
请严格按照以下 JSON 格式输出:
|
||
|
||
```json
|
||
{
|
||
"analysis_summary": "简要描述当前市场状态(50字以内)",
|
||
"volume_analysis": "量价分析结论(30字以内)",
|
||
"news_sentiment": "positive/negative/neutral",
|
||
"news_impact": "新闻对市场的影响分析(30字以内)",
|
||
"signals": [
|
||
{
|
||
"type": "short_term/medium_term/long_term",
|
||
"action": "buy/sell/wait",
|
||
"entry_type": "market/limit",
|
||
"confidence": 0-100,
|
||
"grade": "A/B/C/D",
|
||
"entry_price": 建议入场价,
|
||
"stop_loss": 止损价,
|
||
"take_profit": 止盈价,
|
||
"reason": "详细的入场理由(必须包含量价分析)",
|
||
"risk_warning": "风险提示"
|
||
}
|
||
],
|
||
"key_levels": {
|
||
"support": [支撑位列表],
|
||
"resistance": [阻力位列表]
|
||
}
|
||
}
|
||
```
|
||
|
||
## 信号等级与置信度
|
||
- **A级**(80-100):量价配合 + 多指标共振 + 多周期确认
|
||
- **B级**(60-79):量价配合 + 主要指标确认
|
||
- **C级**(40-59):有机会但量价不够理想
|
||
- **D级**(<40):量价背离或信号矛盾
|
||
|
||
## 重要原则
|
||
1. **量价优先** - 任何信号都必须有量能配合才可靠
|
||
2. **积极但不冒进** - 有合理依据就给出信号,不要过于保守
|
||
3. 每种类型最多输出一个信号
|
||
4. 止损必须明确,风险收益比至少 1:1.5
|
||
5. reason 字段必须包含量价分析(如"放量突破+RSI=45,量比1.8确认有效")
|
||
6. entry_type 必须明确:信号已触发用 market,等待更好价位用 limit
|
||
7. 短线信号止损控制在 1-2%,中线信号止损控制在 2-4%"""
|
||
|
||
def __init__(self):
|
||
"""初始化分析器"""
|
||
self.news_service = get_news_service()
|
||
logger.info("LLM 信号分析器初始化完成(含新闻舆情)")
|
||
|
||
async def analyze(self, symbol: str, data: Dict[str, pd.DataFrame],
|
||
symbols: List[str] = None) -> Dict[str, Any]:
|
||
"""
|
||
使用 LLM 分析市场数据
|
||
|
||
Args:
|
||
symbol: 交易对,如 'BTCUSDT'
|
||
data: 多周期K线数据 {'5m': df, '15m': df, '1h': df, '4h': df}
|
||
symbols: 所有监控的交易对(用于过滤相关新闻)
|
||
|
||
Returns:
|
||
分析结果
|
||
"""
|
||
try:
|
||
# 获取新闻数据
|
||
news_text = await self._get_news_context(symbol, symbols or [symbol])
|
||
|
||
# 构建数据提示
|
||
data_prompt = self._build_data_prompt(symbol, data, news_text)
|
||
|
||
# 调用 LLM
|
||
response = llm_service.chat([
|
||
{"role": "system", "content": self.SYSTEM_PROMPT},
|
||
{"role": "user", "content": data_prompt}
|
||
])
|
||
|
||
if not response:
|
||
logger.warning(f"{symbol} LLM 分析无响应")
|
||
return self._empty_result(symbol, "LLM 无响应")
|
||
|
||
# 解析响应
|
||
result = self._parse_response(response)
|
||
result['symbol'] = symbol
|
||
result['timestamp'] = datetime.now().isoformat()
|
||
|
||
# 记录日志
|
||
signals = result.get('signals', [])
|
||
if signals:
|
||
for sig in signals:
|
||
logger.info(f"{symbol} [{sig['type']}] {sig['action']} "
|
||
f"置信度:{sig['confidence']}% 等级:{sig['grade']} "
|
||
f"原因:{sig['reason'][:50]}...")
|
||
else:
|
||
logger.info(f"{symbol} 无交易信号 - {result.get('analysis_summary', '观望')}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"{symbol} LLM 分析出错: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
return self._empty_result(symbol, str(e))
|
||
|
||
async def _get_news_context(self, symbol: str, symbols: List[str]) -> str:
|
||
"""获取新闻上下文(暂时禁用)"""
|
||
# 暂时禁用新闻获取,只做技术面分析
|
||
return ""
|
||
|
||
def _build_data_prompt(self, symbol: str, data: Dict[str, pd.DataFrame],
|
||
news_text: str = "") -> str:
|
||
"""构建数据提示词"""
|
||
parts = [f"# {symbol} 市场数据分析\n"]
|
||
parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||
|
||
# 当前价格
|
||
if '5m' in data and not data['5m'].empty:
|
||
current_price = float(data['5m'].iloc[-1]['close'])
|
||
parts.append(f"**当前价格**: ${current_price:,.2f}\n")
|
||
|
||
# 各周期数据
|
||
for interval in ['4h', '1h', '15m', '5m']:
|
||
df = data.get(interval)
|
||
if df is None or df.empty:
|
||
continue
|
||
|
||
parts.append(f"\n## {interval.upper()} 周期数据")
|
||
|
||
# 最新指标
|
||
latest = df.iloc[-1]
|
||
parts.append(self._format_indicators(latest))
|
||
|
||
# 最近 K 线数据
|
||
parts.append(self._format_recent_klines(df, interval))
|
||
|
||
# 添加新闻数据
|
||
if news_text and news_text != "暂无相关新闻":
|
||
parts.append(f"\n{news_text}")
|
||
|
||
parts.append("\n---")
|
||
parts.append("请综合分析以上技术数据和新闻舆情,判断是否存在短线、中线或长线的交易机会。")
|
||
parts.append("如果没有明确的交易机会,signals 数组返回空即可。")
|
||
|
||
return "\n".join(parts)
|
||
|
||
def _format_indicators(self, row: pd.Series) -> str:
|
||
"""格式化指标数据"""
|
||
lines = []
|
||
|
||
# 价格
|
||
close = row.get('close', 0)
|
||
open_price = row.get('open', 0)
|
||
high = row.get('high', 0)
|
||
low = row.get('low', 0)
|
||
change = ((close - open_price) / open_price * 100) if open_price else 0
|
||
lines.append(f"- K线: O={open_price:.2f} H={high:.2f} L={low:.2f} C={close:.2f} ({change:+.2f}%)")
|
||
|
||
# 均线
|
||
ma5 = row.get('ma5', 0)
|
||
ma10 = row.get('ma10', 0)
|
||
ma20 = row.get('ma20', 0)
|
||
ma50 = row.get('ma50', 0)
|
||
if pd.notna(ma20):
|
||
ma_str = f"- 均线: MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}"
|
||
if pd.notna(ma50):
|
||
ma_str += f", MA50={ma50:.2f}"
|
||
lines.append(ma_str)
|
||
|
||
# RSI
|
||
rsi = row.get('rsi', 0)
|
||
if pd.notna(rsi):
|
||
rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性")
|
||
lines.append(f"- RSI: {rsi:.1f} ({rsi_status})")
|
||
|
||
# MACD
|
||
macd = row.get('macd', 0)
|
||
macd_signal = row.get('macd_signal', 0)
|
||
macd_hist = row.get('macd_hist', 0)
|
||
if pd.notna(macd):
|
||
macd_status = "多头" if macd > macd_signal else "空头"
|
||
lines.append(f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})")
|
||
|
||
# KDJ
|
||
k = row.get('k', 0)
|
||
d = row.get('d', 0)
|
||
j = row.get('j', 0)
|
||
if pd.notna(k):
|
||
lines.append(f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}")
|
||
|
||
# 布林带
|
||
bb_upper = row.get('bb_upper', 0)
|
||
bb_middle = row.get('bb_middle', 0)
|
||
bb_lower = row.get('bb_lower', 0)
|
||
if pd.notna(bb_upper):
|
||
lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f}")
|
||
|
||
# ATR
|
||
atr = row.get('atr', 0)
|
||
if pd.notna(atr):
|
||
lines.append(f"- ATR: {atr:.2f}")
|
||
|
||
# 成交量
|
||
volume = row.get('volume', 0)
|
||
volume_ratio = row.get('volume_ratio', 0)
|
||
if pd.notna(volume_ratio):
|
||
vol_status = "放量" if volume_ratio > 1.5 else ("缩量" if volume_ratio < 0.5 else "正常")
|
||
lines.append(f"- 成交量: {volume:.2f}, 量比={volume_ratio:.2f} ({vol_status})")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _format_recent_klines(self, df: pd.DataFrame, interval: str) -> str:
|
||
"""格式化最近 K 线(含量价分析)"""
|
||
# 根据周期决定显示数量
|
||
count = {'4h': 6, '1h': 12, '15m': 8, '5m': 6}.get(interval, 6)
|
||
|
||
if len(df) < count:
|
||
count = len(df)
|
||
|
||
lines = [f"\n最近 {count} 根 K 线(含量价数据):"]
|
||
lines.append("| 时间 | 开盘 | 最高 | 最低 | 收盘 | 涨跌 | 成交量 | 量比 | RSI |")
|
||
lines.append("|------|------|------|------|------|------|--------|------|-----|")
|
||
|
||
for i in range(-count, 0):
|
||
row = df.iloc[i]
|
||
change = ((row['close'] - row['open']) / row['open'] * 100) if row['open'] else 0
|
||
change_str = f"{change:+.2f}%"
|
||
time_str = row['open_time'].strftime('%m-%d %H:%M') if pd.notna(row.get('open_time')) else 'N/A'
|
||
rsi = row.get('rsi', 0)
|
||
rsi_str = f"{rsi:.0f}" if pd.notna(rsi) else "-"
|
||
|
||
# 成交量和量比
|
||
volume = row.get('volume', 0)
|
||
volume_ratio = row.get('volume_ratio', 1.0)
|
||
if pd.notna(volume) and volume > 0:
|
||
# 格式化成交量(大数字用K/M表示)
|
||
if volume >= 1000000:
|
||
vol_str = f"{volume/1000000:.1f}M"
|
||
elif volume >= 1000:
|
||
vol_str = f"{volume/1000:.1f}K"
|
||
else:
|
||
vol_str = f"{volume:.0f}"
|
||
else:
|
||
vol_str = "-"
|
||
|
||
vol_ratio_str = f"{volume_ratio:.2f}" if pd.notna(volume_ratio) else "-"
|
||
|
||
lines.append(f"| {time_str} | {row['open']:.2f} | {row['high']:.2f} | "
|
||
f"{row['low']:.2f} | {row['close']:.2f} | {change_str} | {vol_str} | {vol_ratio_str} | {rsi_str} |")
|
||
|
||
# 添加量价分析提示
|
||
lines.append(self._analyze_volume_price(df, count))
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _analyze_volume_price(self, df: pd.DataFrame, count: int) -> str:
|
||
"""分析量价关系"""
|
||
if len(df) < count:
|
||
return ""
|
||
|
||
recent = df.iloc[-count:]
|
||
analysis = []
|
||
|
||
# 计算价格趋势
|
||
price_change = (recent.iloc[-1]['close'] - recent.iloc[0]['close']) / recent.iloc[0]['close'] * 100
|
||
|
||
# 计算成交量趋势
|
||
vol_first_half = recent.iloc[:count//2]['volume'].mean() if 'volume' in recent.columns else 0
|
||
vol_second_half = recent.iloc[count//2:]['volume'].mean() if 'volume' in recent.columns else 0
|
||
|
||
if vol_first_half > 0 and vol_second_half > 0:
|
||
vol_change = (vol_second_half - vol_first_half) / vol_first_half * 100
|
||
|
||
# 量价分析
|
||
if price_change > 1: # 上涨
|
||
if vol_change > 20:
|
||
analysis.append("📈 **量价分析**: 放量上涨,上涨有效")
|
||
elif vol_change < -20:
|
||
analysis.append("⚠️ **量价分析**: 缩量上涨,警惕回调")
|
||
else:
|
||
analysis.append("➡️ **量价分析**: 量能平稳上涨")
|
||
elif price_change < -1: # 下跌
|
||
if vol_change > 20:
|
||
analysis.append("📉 **量价分析**: 放量下跌,下跌有效")
|
||
elif vol_change < -20:
|
||
analysis.append("💡 **量价分析**: 缩量下跌,关注企稳")
|
||
else:
|
||
analysis.append("➡️ **量价分析**: 量能平稳下跌")
|
||
else: # 横盘
|
||
if vol_change < -30:
|
||
analysis.append("🔄 **量价分析**: 缩量整理,等待方向")
|
||
else:
|
||
analysis.append("🔄 **量价分析**: 横盘震荡")
|
||
|
||
# 检测量价背离
|
||
if len(df) >= 10:
|
||
recent_10 = df.iloc[-10:]
|
||
# 检查是否有新高/新低
|
||
price_high_idx = recent_10['high'].idxmax()
|
||
price_low_idx = recent_10['low'].idxmin()
|
||
|
||
if 'volume' in recent_10.columns:
|
||
# 顶背离检测
|
||
if price_high_idx == recent_10.index[-1]: # 最新K线创新高
|
||
prev_high_idx = recent_10['high'].iloc[:-1].idxmax()
|
||
if recent_10.loc[price_high_idx, 'volume'] < recent_10.loc[prev_high_idx, 'volume'] * 0.8:
|
||
analysis.append("🔴 **顶背离**: 价格新高但量能不足,警惕回落")
|
||
|
||
# 底背离检测
|
||
if price_low_idx == recent_10.index[-1]: # 最新K线创新低
|
||
prev_low_idx = recent_10['low'].iloc[:-1].idxmin()
|
||
if recent_10.loc[price_low_idx, 'volume'] < recent_10.loc[prev_low_idx, 'volume'] * 0.8:
|
||
analysis.append("🟢 **底背离**: 价格新低但量能萎缩,关注反弹")
|
||
|
||
return "\n" + "\n".join(analysis) if analysis else ""
|
||
|
||
def _parse_response(self, response: str) -> Dict[str, Any]:
|
||
"""解析 LLM 响应"""
|
||
result = {
|
||
'raw_response': response,
|
||
'analysis_summary': '',
|
||
'signals': [],
|
||
'key_levels': {'support': [], 'resistance': []}
|
||
}
|
||
|
||
try:
|
||
# 尝试提取 JSON
|
||
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
|
||
if json_match:
|
||
json_str = json_match.group(1)
|
||
else:
|
||
# 尝试直接解析
|
||
json_str = response
|
||
|
||
parsed = json.loads(json_str)
|
||
|
||
result['analysis_summary'] = parsed.get('analysis_summary', '')
|
||
result['signals'] = parsed.get('signals', [])
|
||
result['key_levels'] = parsed.get('key_levels', {'support': [], 'resistance': []})
|
||
|
||
# 验证和清理信号
|
||
valid_signals = []
|
||
for sig in result['signals']:
|
||
if self._validate_signal(sig):
|
||
valid_signals.append(sig)
|
||
result['signals'] = valid_signals
|
||
|
||
except json.JSONDecodeError:
|
||
logger.warning("LLM 响应不是有效 JSON,尝试提取关键信息")
|
||
result['analysis_summary'] = self._extract_summary(response)
|
||
|
||
return result
|
||
|
||
def _validate_signal(self, signal: Dict[str, Any]) -> bool:
|
||
"""验证信号是否有效"""
|
||
required_fields = ['type', 'action', 'confidence', 'grade', 'reason']
|
||
for field in required_fields:
|
||
if field not in signal:
|
||
return False
|
||
|
||
# 验证类型
|
||
if signal['type'] not in ['short_term', 'medium_term', 'long_term']:
|
||
return False
|
||
|
||
# 验证动作
|
||
if signal['action'] not in ['buy', 'sell', 'wait']:
|
||
return False
|
||
|
||
# wait 动作不算有效信号
|
||
if signal['action'] == 'wait':
|
||
return False
|
||
|
||
# 验证置信度(必须 >= 60 才算有效信号,即 B 级及以上)
|
||
confidence = signal.get('confidence', 0)
|
||
if not isinstance(confidence, (int, float)) or confidence < 60:
|
||
return False
|
||
|
||
# 验证入场类型(默认为 market)
|
||
entry_type = signal.get('entry_type', 'market')
|
||
if entry_type not in ['market', 'limit']:
|
||
signal['entry_type'] = 'market' # 默认现价入场
|
||
|
||
return True
|
||
|
||
def _extract_summary(self, text: str) -> str:
|
||
"""从文本中提取摘要"""
|
||
text = text.strip()
|
||
if len(text) > 100:
|
||
return text[:100] + "..."
|
||
return text
|
||
|
||
def _empty_result(self, symbol: str, reason: str = "") -> Dict[str, Any]:
|
||
"""返回空结果"""
|
||
return {
|
||
'symbol': symbol,
|
||
'timestamp': datetime.now().isoformat(),
|
||
'analysis_summary': reason or '无法分析',
|
||
'signals': [],
|
||
'key_levels': {'support': [], 'resistance': []},
|
||
'error': reason
|
||
}
|
||
|
||
def get_best_signal(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
从分析结果中获取最佳信号
|
||
|
||
Args:
|
||
result: analyze() 的返回结果
|
||
|
||
Returns:
|
||
最佳信号,如果没有则返回 None
|
||
"""
|
||
signals = result.get('signals', [])
|
||
if not signals:
|
||
return None
|
||
|
||
# 按置信度排序
|
||
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
|
||
return sorted_signals[0]
|
||
|
||
def format_signal_message(self, signal: Dict[str, Any], symbol: str) -> str:
|
||
"""
|
||
格式化信号消息(用于 Telegram 通知)
|
||
|
||
Args:
|
||
signal: 信号数据
|
||
symbol: 交易对
|
||
|
||
Returns:
|
||
格式化的消息文本
|
||
"""
|
||
type_map = {
|
||
'short_term': '短线',
|
||
'medium_term': '中线',
|
||
'long_term': '长线'
|
||
}
|
||
action_map = {
|
||
'buy': '做多',
|
||
'sell': '做空'
|
||
}
|
||
|
||
signal_type = type_map.get(signal['type'], signal['type'])
|
||
action = action_map.get(signal['action'], signal['action'])
|
||
grade = signal.get('grade', 'C')
|
||
confidence = signal.get('confidence', 0)
|
||
entry_type = signal.get('entry_type', 'market')
|
||
|
||
# 等级图标
|
||
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '')
|
||
|
||
# 方向图标
|
||
action_icon = '🟢' if signal['action'] == 'buy' else '🔴'
|
||
|
||
# 入场类型
|
||
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
|
||
entry_type_icon = '⚡' if entry_type == 'market' else '⏳'
|
||
|
||
# 计算风险收益比
|
||
entry = signal.get('entry_price', 0)
|
||
sl = signal.get('stop_loss', 0)
|
||
tp = signal.get('take_profit', 0)
|
||
sl_percent = ((sl - entry) / entry * 100) if entry else 0
|
||
tp_percent = ((tp - entry) / entry * 100) if entry else 0
|
||
|
||
message = f"""📊 {symbol} {signal_type}信号
|
||
|
||
{action_icon} **方向**: {action}
|
||
{entry_type_icon} **入场**: {entry_type_text}
|
||
⭐ **等级**: {grade} {grade_icon}
|
||
📈 **置信度**: {confidence}%
|
||
|
||
💰 **入场价**: ${entry:,.2f}
|
||
🛑 **止损价**: ${sl:,.2f} ({sl_percent:+.1f}%)
|
||
🎯 **止盈价**: ${tp:,.2f} ({tp_percent:+.1f}%)
|
||
|
||
📝 **分析理由**:
|
||
{signal.get('reason', '无')}
|
||
|
||
⚠️ **风险提示**:
|
||
{signal.get('risk_warning', '请注意风险控制')}"""
|
||
|
||
return message
|
||
|
||
def format_feishu_card(self, signal: Dict[str, Any], symbol: str) -> Dict[str, Any]:
|
||
"""
|
||
格式化飞书卡片消息
|
||
|
||
Args:
|
||
signal: 信号数据
|
||
symbol: 交易对
|
||
|
||
Returns:
|
||
包含 title, content, color 的字典
|
||
"""
|
||
type_map = {
|
||
'short_term': '短线',
|
||
'medium_term': '中线',
|
||
'long_term': '长线'
|
||
}
|
||
action_map = {
|
||
'buy': '做多',
|
||
'sell': '做空'
|
||
}
|
||
|
||
signal_type = type_map.get(signal['type'], signal['type'])
|
||
action = action_map.get(signal['action'], signal['action'])
|
||
grade = signal.get('grade', 'C')
|
||
confidence = signal.get('confidence', 0)
|
||
entry_type = signal.get('entry_type', 'market')
|
||
|
||
# 等级图标
|
||
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '')
|
||
|
||
# 入场类型
|
||
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
|
||
entry_type_icon = '⚡' if entry_type == 'market' else '⏳'
|
||
|
||
# 标题和颜色
|
||
if signal['action'] == 'buy':
|
||
title = f"🟢 {symbol} {signal_type}做多信号 [{entry_type_text}]"
|
||
color = "green"
|
||
else:
|
||
title = f"🔴 {symbol} {signal_type}做空信号 [{entry_type_text}]"
|
||
color = "red"
|
||
|
||
# 计算风险收益比
|
||
entry = signal.get('entry_price', 0)
|
||
sl = signal.get('stop_loss', 0)
|
||
tp = signal.get('take_profit', 0)
|
||
sl_percent = ((sl - entry) / entry * 100) if entry else 0
|
||
tp_percent = ((tp - entry) / entry * 100) if entry else 0
|
||
|
||
# 构建 Markdown 内容
|
||
content_parts = [
|
||
f"**{signal_type}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
|
||
f"{entry_type_icon} **入场方式**: {entry_type_text}",
|
||
"",
|
||
f"💰 **入场**: ${entry:,.2f}",
|
||
f"🛑 **止损**: ${sl:,.2f} ({sl_percent:+.1f}%)",
|
||
f"🎯 **止盈**: ${tp:,.2f} ({tp_percent:+.1f}%)",
|
||
"",
|
||
f"📝 **分析理由**:",
|
||
f"{signal.get('reason', '无')}",
|
||
"",
|
||
f"⚠️ **风险提示**:",
|
||
f"{signal.get('risk_warning', '请注意风险控制')}",
|
||
]
|
||
|
||
return {
|
||
'title': title,
|
||
'content': '\n'.join(content_parts),
|
||
'color': color
|
||
}
|