674 lines
22 KiB
Python
674 lines
22 KiB
Python
"""
|
||
DingTalk Notifier - 钉钉群机器人消息推送
|
||
|
||
支持功能:
|
||
- Markdown格式消息
|
||
- 交易信号格式化
|
||
- 错误重试
|
||
- 消息去重
|
||
"""
|
||
import logging
|
||
import json
|
||
import time
|
||
from typing import Dict, Any, Optional
|
||
from datetime import datetime
|
||
import hmac
|
||
import hashlib
|
||
import base64
|
||
import urllib.parse
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class DingTalkNotifier:
|
||
"""钉钉群机器人通知器"""
|
||
|
||
def __init__(
|
||
self,
|
||
webhook_url: Optional[str] = None,
|
||
secret: Optional[str] = None,
|
||
enabled: bool = True
|
||
):
|
||
"""
|
||
初始化钉钉通知器
|
||
|
||
Args:
|
||
webhook_url: 钉钉机器人webhook地址
|
||
secret: 钉钉机器人加签密钥(可选,增强安全性)
|
||
enabled: 是否启用通知
|
||
"""
|
||
self.webhook_url = webhook_url
|
||
self.secret = secret
|
||
self.enabled = enabled and webhook_url is not None
|
||
|
||
if self.enabled:
|
||
logger.info(f"📱 钉钉通知已启用 - Webhook: {webhook_url[:50]}...")
|
||
else:
|
||
logger.info("📱 钉钉通知未启用 (未配置webhook_url)")
|
||
|
||
# 统计信息
|
||
self.stats = {
|
||
'total_sent': 0,
|
||
'total_failed': 0,
|
||
'last_send_time': None
|
||
}
|
||
|
||
def _generate_sign(self, timestamp: int) -> str:
|
||
"""
|
||
生成钉钉机器人加签
|
||
|
||
Args:
|
||
timestamp: 时间戳(毫秒)
|
||
|
||
Returns:
|
||
签名字符串
|
||
"""
|
||
if not self.secret:
|
||
return ""
|
||
|
||
secret_enc = self.secret.encode('utf-8')
|
||
string_to_sign = f'{timestamp}\n{self.secret}'
|
||
string_to_sign_enc = string_to_sign.encode('utf-8')
|
||
|
||
hmac_code = hmac.new(
|
||
secret_enc,
|
||
string_to_sign_enc,
|
||
digestmod=hashlib.sha256
|
||
).digest()
|
||
|
||
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
|
||
return sign
|
||
|
||
def _build_webhook_url(self) -> str:
|
||
"""
|
||
构建带签名的webhook URL
|
||
|
||
Returns:
|
||
完整的webhook URL
|
||
"""
|
||
if not self.secret:
|
||
return self.webhook_url
|
||
|
||
timestamp = int(time.time() * 1000)
|
||
sign = self._generate_sign(timestamp)
|
||
|
||
return f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
|
||
|
||
def send_markdown(
|
||
self,
|
||
title: str,
|
||
text: str,
|
||
at_mobiles: Optional[list] = None,
|
||
at_all: bool = False
|
||
) -> bool:
|
||
"""
|
||
发送Markdown格式消息
|
||
|
||
Args:
|
||
title: 消息标题
|
||
text: Markdown格式文本
|
||
at_mobiles: @的手机号列表
|
||
at_all: 是否@所有人
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
if not self.enabled:
|
||
logger.debug("钉钉通知未启用,跳过发送")
|
||
return False
|
||
|
||
try:
|
||
import requests
|
||
|
||
url = self._build_webhook_url()
|
||
|
||
payload = {
|
||
"msgtype": "markdown",
|
||
"markdown": {
|
||
"title": title,
|
||
"text": text
|
||
},
|
||
"at": {
|
||
"atMobiles": at_mobiles or [],
|
||
"isAtAll": at_all
|
||
}
|
||
}
|
||
|
||
headers = {'Content-Type': 'application/json'}
|
||
|
||
response = requests.post(
|
||
url,
|
||
data=json.dumps(payload),
|
||
headers=headers,
|
||
timeout=5
|
||
)
|
||
|
||
result = response.json()
|
||
|
||
if result.get('errcode') == 0:
|
||
self.stats['total_sent'] += 1
|
||
self.stats['last_send_time'] = datetime.now().isoformat()
|
||
logger.info(f"✅ 钉钉消息发送成功: {title}")
|
||
return True
|
||
else:
|
||
self.stats['total_failed'] += 1
|
||
logger.error(f"❌ 钉钉消息发送失败: {result.get('errmsg')}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.stats['total_failed'] += 1
|
||
logger.error(f"❌ 钉钉消息发送异常: {e}", exc_info=True)
|
||
return False
|
||
|
||
def send_signal(self, aggregated_signal: Dict[str, Any]) -> bool:
|
||
"""
|
||
发送交易信号通知
|
||
|
||
Args:
|
||
aggregated_signal: 聚合后的交易信号
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
if not self.enabled:
|
||
return False
|
||
|
||
try:
|
||
# 格式化信号为Markdown
|
||
markdown = self._format_signal_markdown(aggregated_signal)
|
||
|
||
# 提取标题
|
||
signal_type = aggregated_signal.get('final_signal', 'HOLD')
|
||
confidence = aggregated_signal.get('final_confidence', 0)
|
||
|
||
title = f"🚨 交易信号: {signal_type} (置信度: {confidence:.0%})"
|
||
|
||
# 发送消息
|
||
return self.send_markdown(title, markdown)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 格式化交易信号失败: {e}", exc_info=True)
|
||
return False
|
||
|
||
def _format_signal_markdown(self, signal: Dict[str, Any]) -> str:
|
||
"""
|
||
格式化交易信号为Markdown文本(多时间级别版本)
|
||
|
||
支持两种格式:
|
||
1. 新格式: trades数组 (优先)
|
||
2. 旧格式: opportunities对象 (向后兼容)
|
||
|
||
Args:
|
||
signal: 聚合信号
|
||
|
||
Returns:
|
||
Markdown格式文本
|
||
"""
|
||
# 信号类型对应的emoji
|
||
signal_type = signal.get('final_signal', 'HOLD')
|
||
confidence = signal.get('final_confidence', 0)
|
||
|
||
signal_emoji = {
|
||
'BUY': '🟢',
|
||
'SELL': '🔴',
|
||
'HOLD': '🟡'
|
||
}
|
||
emoji = signal_emoji.get(signal_type, '⚪')
|
||
|
||
lines = []
|
||
|
||
# === 核心信号 ===
|
||
symbol = signal.get('symbol', 'BTC/USDT')
|
||
lines.append(f"# {emoji} {symbol} {signal_type}")
|
||
lines.append("")
|
||
lines.append(f"**综合置信度**: {confidence:.0%} | **时间**: {datetime.now().strftime('%H:%M')}")
|
||
lines.append("")
|
||
|
||
# === 当前价格 ===
|
||
levels = signal.get('levels', {})
|
||
current_price = levels.get('current_price', 0)
|
||
|
||
if current_price > 0:
|
||
lines.append(f"**当前价格**: ${current_price:,.2f}")
|
||
lines.append("")
|
||
|
||
# === 多时间级别分析 ===
|
||
lines.append("## 📊 多时间级别分析")
|
||
lines.append("")
|
||
|
||
# 获取LLM信号
|
||
llm_signal = signal.get('llm_signal') or {}
|
||
|
||
# 检测是否为新格式 (trades数组)
|
||
trades = llm_signal.get('trades', [])
|
||
if trades and isinstance(trades, list) and len(trades) >= 3:
|
||
# 新格式: trades数组
|
||
trades_by_tf = {t.get('timeframe'): t for t in trades if t.get('timeframe')}
|
||
|
||
# 短期分析
|
||
self._add_trade_section(
|
||
lines,
|
||
"短期 (5m/15m/1h)",
|
||
"⚡",
|
||
trades_by_tf.get('short', {}),
|
||
signal
|
||
)
|
||
|
||
# 中期分析
|
||
self._add_trade_section(
|
||
lines,
|
||
"中期 (4h/1d)",
|
||
"📈",
|
||
trades_by_tf.get('medium', {}),
|
||
signal
|
||
)
|
||
|
||
# 长期分析
|
||
self._add_trade_section(
|
||
lines,
|
||
"长期 (1d/1w)",
|
||
"📅",
|
||
trades_by_tf.get('long', {}),
|
||
signal
|
||
)
|
||
|
||
# 综合分析
|
||
analysis = llm_signal.get('analysis', {})
|
||
reason = analysis.get('summary', '') or llm_signal.get('reasoning', '')
|
||
else:
|
||
# 旧格式: opportunities对象
|
||
opportunities = llm_signal.get('opportunities', {})
|
||
recommendations = llm_signal.get('recommendations_by_timeframe', {})
|
||
|
||
# 短期分析
|
||
self._add_timeframe_section(
|
||
lines,
|
||
"短期 (5m/15m/1h)",
|
||
"⚡",
|
||
opportunities.get('short_term_5m_15m_1h', {}),
|
||
recommendations.get('short_term', ''),
|
||
signal
|
||
)
|
||
|
||
# 中期分析
|
||
self._add_timeframe_section(
|
||
lines,
|
||
"中期 (4h/1d)",
|
||
"📈",
|
||
opportunities.get('medium_term_4h_1d', {}),
|
||
recommendations.get('medium_term', ''),
|
||
signal
|
||
)
|
||
|
||
# 长期分析
|
||
self._add_timeframe_section(
|
||
lines,
|
||
"长期 (1d/1w)",
|
||
"📅",
|
||
opportunities.get('long_term_1d_1w', {}),
|
||
recommendations.get('long_term', ''),
|
||
signal
|
||
)
|
||
|
||
reason = llm_signal.get('reasoning', '') or self._get_brief_reason(signal)
|
||
|
||
# === 综合建议 ===
|
||
if reason:
|
||
lines.append("---")
|
||
lines.append("## 💡 综合分析")
|
||
lines.append("")
|
||
lines.append(f"{reason}")
|
||
lines.append("")
|
||
|
||
# === 关键价位 ===
|
||
key_levels = llm_signal.get('key_levels', {})
|
||
if key_levels:
|
||
support = key_levels.get('support', [])
|
||
resistance = key_levels.get('resistance', [])
|
||
if support or resistance:
|
||
lines.append("---")
|
||
lines.append("## 📍 关键价位")
|
||
lines.append("")
|
||
if support:
|
||
support_str = ", ".join([f"${p:,.0f}" for p in support[:3]])
|
||
lines.append(f"**支撑**: {support_str}")
|
||
if resistance:
|
||
resistance_str = ", ".join([f"${p:,.0f}" for p in resistance[:3]])
|
||
lines.append(f"**阻力**: {resistance_str}")
|
||
lines.append("")
|
||
|
||
# === 页脚 ===
|
||
lines.append("---")
|
||
lines.append("*仅供参考,不构成投资建议*")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _add_trade_section(
|
||
self,
|
||
lines: list,
|
||
timeframe_label: str,
|
||
emoji: str,
|
||
trade: Dict[str, Any],
|
||
signal: Dict[str, Any] = None
|
||
):
|
||
"""
|
||
添加单个时间级别的交易区块(新格式trades数组)
|
||
|
||
Args:
|
||
lines: 输出行列表
|
||
timeframe_label: 时间级别标签
|
||
emoji: emoji图标
|
||
trade: 该时间级别的交易信息
|
||
signal: 完整信号数据
|
||
"""
|
||
lines.append(f"### {emoji} {timeframe_label}")
|
||
lines.append("")
|
||
|
||
status = trade.get('status', 'INACTIVE')
|
||
is_active = status == 'ACTIVE'
|
||
|
||
if is_active:
|
||
direction = trade.get('direction', 'NONE')
|
||
entry = trade.get('entry', {})
|
||
exit_data = trade.get('exit', {})
|
||
position = trade.get('position', {})
|
||
risk_reward = trade.get('risk_reward', 0)
|
||
expected_profit = trade.get('expected_profit_pct', 0)
|
||
reasoning = trade.get('reasoning', '')
|
||
|
||
# 方向标识
|
||
direction_emoji = "🟢" if direction == "LONG" else "🔴" if direction == "SHORT" else "⚪"
|
||
lines.append(f"{direction_emoji} **方向**: {direction}")
|
||
lines.append("")
|
||
|
||
# 金字塔入场价格
|
||
entry_prices = []
|
||
for i in range(1, 5):
|
||
price = entry.get(f'price_{i}', 0)
|
||
pct = position.get(f'size_pct_{i}', 0)
|
||
if price > 0:
|
||
entry_prices.append(f"${price:,.0f}({pct}%)")
|
||
|
||
if entry_prices:
|
||
lines.append(f"**入场**: {' → '.join(entry_prices)}")
|
||
|
||
# 止损止盈
|
||
stop_loss = exit_data.get('stop_loss', 0)
|
||
tp1 = exit_data.get('take_profit_1', 0)
|
||
tp2 = exit_data.get('take_profit_2', 0)
|
||
tp3 = exit_data.get('take_profit_3', 0)
|
||
|
||
if stop_loss > 0:
|
||
lines.append(f"**止损**: ${stop_loss:,.0f}")
|
||
|
||
take_profits = []
|
||
if tp1 > 0:
|
||
take_profits.append(f"${tp1:,.0f}")
|
||
if tp2 > 0:
|
||
take_profits.append(f"${tp2:,.0f}")
|
||
if tp3 > 0:
|
||
take_profits.append(f"${tp3:,.0f}")
|
||
|
||
if take_profits:
|
||
lines.append(f"**止盈**: {' / '.join(take_profits)}")
|
||
|
||
# 风险回报比和预期盈利
|
||
if risk_reward > 0:
|
||
lines.append(f"**风险回报**: 1:{risk_reward:.1f}")
|
||
if expected_profit > 0:
|
||
lines.append(f"**预期盈利**: {expected_profit:.1f}%")
|
||
lines.append("")
|
||
|
||
# 理由
|
||
if reasoning:
|
||
lines.append(f"💭 {reasoning}")
|
||
lines.append("")
|
||
else:
|
||
# 无交易机会
|
||
reasoning = trade.get('reasoning', '')
|
||
if reasoning:
|
||
lines.append(f"💭 {reasoning}")
|
||
else:
|
||
lines.append("💭 暂无明确交易机会")
|
||
lines.append("")
|
||
|
||
def _add_timeframe_section(
|
||
self,
|
||
lines: list,
|
||
timeframe_label: str,
|
||
emoji: str,
|
||
opportunity: Dict[str, Any],
|
||
recommendation: str,
|
||
signal: Dict[str, Any] = None
|
||
):
|
||
"""
|
||
添加单个时间级别的分析区块
|
||
|
||
Args:
|
||
lines: 输出行列表
|
||
timeframe_label: 时间级别标签
|
||
emoji: emoji图标
|
||
opportunity: 该时间级别的交易机会
|
||
recommendation: 该时间级别的操作建议
|
||
signal: 完整信号数据(用于获取量化评分等)
|
||
"""
|
||
lines.append(f"### {emoji} {timeframe_label}")
|
||
lines.append("")
|
||
|
||
exists = opportunity.get('exists', False)
|
||
|
||
if exists:
|
||
direction = opportunity.get('direction', 'UNKNOWN')
|
||
entry = opportunity.get('entry_price', 0)
|
||
stop = opportunity.get('stop_loss', 0)
|
||
tp = opportunity.get('take_profit', 0)
|
||
reasoning = opportunity.get('reasoning', '')
|
||
|
||
# 方向标识
|
||
direction_emoji = "🟢" if direction == "LONG" else "🔴" if direction == "SHORT" else "⚪"
|
||
lines.append(f"{direction_emoji} **方向**: {direction}")
|
||
lines.append("")
|
||
|
||
# 价格信息
|
||
if entry and stop and tp:
|
||
lines.append(f"**入场**: ${entry:,.2f}")
|
||
lines.append(f"**止损**: ${stop:,.2f}")
|
||
lines.append(f"**止盈**: ${tp:,.2f}")
|
||
|
||
# 风险回报比
|
||
risk = abs(entry - stop)
|
||
reward = abs(tp - entry)
|
||
rr = reward / risk if risk > 0 else 0
|
||
lines.append(f"**风险回报**: 1:{rr:.1f}")
|
||
lines.append("")
|
||
|
||
# 理由
|
||
if reasoning:
|
||
lines.append(f"💭 {reasoning}")
|
||
lines.append("")
|
||
else:
|
||
# 无交易机会时,显示关键数据支撑
|
||
# 尝试从市场分析中获取该周期的数据
|
||
if signal:
|
||
market_analysis = signal.get('market_analysis', {})
|
||
# 显示量化评分(如果有)
|
||
quant_signal = signal.get('quantitative_signal', {})
|
||
if quant_signal:
|
||
composite = quant_signal.get('composite_score', 0)
|
||
lines.append(f"📊 **量化评分**: {composite:.1f}")
|
||
scores = quant_signal.get('scores', {})
|
||
if scores:
|
||
lines.append(f"- 趋势: {scores.get('trend', 0):.0f} | 动量: {scores.get('momentum', 0):.0f} | 订单流: {scores.get('orderflow', 0):.0f}")
|
||
lines.append("")
|
||
|
||
# 显示关键价格和技术指标
|
||
if market_analysis:
|
||
trend = market_analysis.get('trend', {})
|
||
momentum = market_analysis.get('momentum', {})
|
||
if trend or momentum:
|
||
lines.append(f"📈 **技术状态**:")
|
||
if trend:
|
||
lines.append(f"- 趋势: {trend.get('direction', 'unknown')} ({trend.get('strength', 'weak')})")
|
||
if momentum:
|
||
lines.append(f"- RSI: {momentum.get('rsi', 50):.0f} ({momentum.get('rsi_status', '中性')})")
|
||
lines.append(f"- MACD: {momentum.get('macd_signal', 'unknown')}")
|
||
lines.append("")
|
||
|
||
# 操作建议
|
||
if recommendation:
|
||
lines.append(f"💭 **操作建议**: {recommendation}")
|
||
lines.append("")
|
||
else:
|
||
lines.append("💭 暂无明确交易机会")
|
||
lines.append("")
|
||
|
||
def _get_best_trade_plan(self, signal: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
获取最优交易计划(优先中长线,盈利空间更大)
|
||
|
||
优先级:
|
||
1. 中长线机会 (swing) - 4h/1d/1w级别,盈利空间2%-5%+,适合通知
|
||
2. 日内机会 (intraday) - 5m/15m/1h级别,盈利空间0.5%-1%
|
||
3. 量化信号价格位 - 回退选项
|
||
|
||
Returns:
|
||
{
|
||
'entry': float,
|
||
'stop_loss': float,
|
||
'take_profit': float,
|
||
'type': 'swing'|'intraday'|'quant' # 交易类型
|
||
} or None
|
||
"""
|
||
llm = signal.get('llm_signal')
|
||
if llm and isinstance(llm, dict):
|
||
opportunities = llm.get('opportunities', {})
|
||
|
||
# 优先1:中长线机会(盈利空间大,适合手机通知)
|
||
swing = opportunities.get('swing', {})
|
||
if swing.get('exists'):
|
||
entry = swing.get('entry_price')
|
||
stop = swing.get('stop_loss')
|
||
tp = swing.get('take_profit')
|
||
|
||
if entry and stop and tp:
|
||
return {
|
||
'entry': float(entry),
|
||
'stop_loss': float(stop),
|
||
'take_profit': float(tp),
|
||
'type': 'swing'
|
||
}
|
||
|
||
# 优先2:日内机会(如果没有中长线机会)
|
||
intraday = opportunities.get('intraday', {})
|
||
if intraday.get('exists'):
|
||
entry = intraday.get('entry_price')
|
||
stop = intraday.get('stop_loss')
|
||
tp = intraday.get('take_profit')
|
||
|
||
if entry and stop and tp:
|
||
return {
|
||
'entry': float(entry),
|
||
'stop_loss': float(stop),
|
||
'take_profit': float(tp),
|
||
'type': 'intraday'
|
||
}
|
||
|
||
# 回退到量化信号的价格位
|
||
levels = signal.get('levels', {})
|
||
entry = levels.get('entry')
|
||
stop = levels.get('stop_loss')
|
||
tp = levels.get('take_profit_1')
|
||
|
||
if entry and stop and tp:
|
||
return {
|
||
'entry': float(entry),
|
||
'stop_loss': float(stop),
|
||
'take_profit': float(tp),
|
||
'type': 'quant'
|
||
}
|
||
|
||
return None
|
||
|
||
def _get_brief_reason(self, signal: Dict[str, Any]) -> str:
|
||
"""
|
||
获取简短的信号原因(1-2句话)
|
||
|
||
Returns:
|
||
简短原因描述
|
||
"""
|
||
reasons = []
|
||
|
||
# 优先使用LLM的推理(截取前100字)
|
||
llm = signal.get('llm_signal')
|
||
if llm and isinstance(llm, dict):
|
||
llm_reasoning = llm.get('reasoning', '')
|
||
if llm_reasoning:
|
||
# 取第一句话或前100字
|
||
brief = llm_reasoning.split('。')[0] + '。'
|
||
if len(brief) > 100:
|
||
brief = brief[:100] + '...'
|
||
return brief
|
||
|
||
# 如果有日内机会的说明
|
||
opportunities = llm.get('opportunities', {})
|
||
intraday = opportunities.get('intraday', {})
|
||
if intraday.get('exists') and intraday.get('reasoning'):
|
||
brief = intraday['reasoning']
|
||
if len(brief) > 100:
|
||
brief = brief[:100] + '...'
|
||
return brief
|
||
|
||
# 回退到量化信号的推理
|
||
quant = signal.get('quantitative_signal', {})
|
||
quant_reasoning = quant.get('reasoning', '')
|
||
if quant_reasoning:
|
||
return quant_reasoning
|
||
|
||
# 默认
|
||
return signal.get('recommendation', '系统分析建议关注')
|
||
|
||
def send_error(self, error_msg: str, context: Optional[str] = None) -> bool:
|
||
"""
|
||
发送错误通知
|
||
|
||
Args:
|
||
error_msg: 错误消息
|
||
context: 错误上下文
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
if not self.enabled:
|
||
return False
|
||
|
||
lines = []
|
||
lines.append("# ❌ 系统错误通知")
|
||
lines.append("")
|
||
lines.append(f"**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
lines.append(f"**错误**: {error_msg}")
|
||
|
||
if context:
|
||
lines.append("")
|
||
lines.append(f"**上下文**: {context}")
|
||
|
||
lines.append("")
|
||
lines.append("---")
|
||
lines.append("*请及时检查系统状态*")
|
||
|
||
markdown = "\n".join(lines)
|
||
return self.send_markdown("系统错误通知", markdown, at_all=True)
|
||
|
||
def get_stats(self) -> Dict[str, Any]:
|
||
"""获取统计信息"""
|
||
return {
|
||
'enabled': self.enabled,
|
||
'total_sent': self.stats['total_sent'],
|
||
'total_failed': self.stats['total_failed'],
|
||
'success_rate': (
|
||
self.stats['total_sent'] / (self.stats['total_sent'] + self.stats['total_failed'])
|
||
if (self.stats['total_sent'] + self.stats['total_failed']) > 0
|
||
else 0
|
||
),
|
||
'last_send_time': self.stats['last_send_time']
|
||
}
|