tradusai/notifiers/dingtalk.py
2025-12-11 22:51:51 +08:00

674 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
DingTalk Notifier - 钉钉群机器人消息推送
支持功能:
- Markdown格式消息
- 交易信号格式化
- 错误重试
- 消息去重
"""
import logging
import json
import time
from typing import Dict, Any, Optional
from datetime import datetime
import hmac
import hashlib
import base64
import urllib.parse
logger = logging.getLogger(__name__)
class DingTalkNotifier:
"""钉钉群机器人通知器"""
def __init__(
self,
webhook_url: Optional[str] = None,
secret: Optional[str] = None,
enabled: bool = True
):
"""
初始化钉钉通知器
Args:
webhook_url: 钉钉机器人webhook地址
secret: 钉钉机器人加签密钥(可选,增强安全性)
enabled: 是否启用通知
"""
self.webhook_url = webhook_url
self.secret = secret
self.enabled = enabled and webhook_url is not None
if self.enabled:
logger.info(f"📱 钉钉通知已启用 - Webhook: {webhook_url[:50]}...")
else:
logger.info("📱 钉钉通知未启用 (未配置webhook_url)")
# 统计信息
self.stats = {
'total_sent': 0,
'total_failed': 0,
'last_send_time': None
}
def _generate_sign(self, timestamp: int) -> str:
"""
生成钉钉机器人加签
Args:
timestamp: 时间戳(毫秒)
Returns:
签名字符串
"""
if not self.secret:
return ""
secret_enc = self.secret.encode('utf-8')
string_to_sign = f'{timestamp}\n{self.secret}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(
secret_enc,
string_to_sign_enc,
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def _build_webhook_url(self) -> str:
"""
构建带签名的webhook URL
Returns:
完整的webhook URL
"""
if not self.secret:
return self.webhook_url
timestamp = int(time.time() * 1000)
sign = self._generate_sign(timestamp)
return f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
def send_markdown(
self,
title: str,
text: str,
at_mobiles: Optional[list] = None,
at_all: bool = False
) -> bool:
"""
发送Markdown格式消息
Args:
title: 消息标题
text: Markdown格式文本
at_mobiles: @的手机号列表
at_all: 是否@所有人
Returns:
是否发送成功
"""
if not self.enabled:
logger.debug("钉钉通知未启用,跳过发送")
return False
try:
import requests
url = self._build_webhook_url()
payload = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": text
},
"at": {
"atMobiles": at_mobiles or [],
"isAtAll": at_all
}
}
headers = {'Content-Type': 'application/json'}
response = requests.post(
url,
data=json.dumps(payload),
headers=headers,
timeout=5
)
result = response.json()
if result.get('errcode') == 0:
self.stats['total_sent'] += 1
self.stats['last_send_time'] = datetime.now().isoformat()
logger.info(f"✅ 钉钉消息发送成功: {title}")
return True
else:
self.stats['total_failed'] += 1
logger.error(f"❌ 钉钉消息发送失败: {result.get('errmsg')}")
return False
except Exception as e:
self.stats['total_failed'] += 1
logger.error(f"❌ 钉钉消息发送异常: {e}", exc_info=True)
return False
def send_signal(self, aggregated_signal: Dict[str, Any]) -> bool:
"""
发送交易信号通知
Args:
aggregated_signal: 聚合后的交易信号
Returns:
是否发送成功
"""
if not self.enabled:
return False
try:
# 格式化信号为Markdown
markdown = self._format_signal_markdown(aggregated_signal)
# 提取标题
signal_type = aggregated_signal.get('final_signal', 'HOLD')
confidence = aggregated_signal.get('final_confidence', 0)
title = f"🚨 交易信号: {signal_type} (置信度: {confidence:.0%})"
# 发送消息
return self.send_markdown(title, markdown)
except Exception as e:
logger.error(f"❌ 格式化交易信号失败: {e}", exc_info=True)
return False
def _format_signal_markdown(self, signal: Dict[str, Any]) -> str:
"""
格式化交易信号为Markdown文本多时间级别版本
支持两种格式:
1. 新格式: trades数组 (优先)
2. 旧格式: opportunities对象 (向后兼容)
Args:
signal: 聚合信号
Returns:
Markdown格式文本
"""
# 信号类型对应的emoji
signal_type = signal.get('final_signal', 'HOLD')
confidence = signal.get('final_confidence', 0)
signal_emoji = {
'BUY': '🟢',
'SELL': '🔴',
'HOLD': '🟡'
}
emoji = signal_emoji.get(signal_type, '')
lines = []
# === 核心信号 ===
symbol = signal.get('symbol', 'BTC/USDT')
lines.append(f"# {emoji} {symbol} {signal_type}")
lines.append("")
lines.append(f"**综合置信度**: {confidence:.0%} | **时间**: {datetime.now().strftime('%H:%M')}")
lines.append("")
# === 当前价格 ===
levels = signal.get('levels', {})
current_price = levels.get('current_price', 0)
if current_price > 0:
lines.append(f"**当前价格**: ${current_price:,.2f}")
lines.append("")
# === 多时间级别分析 ===
lines.append("## 📊 多时间级别分析")
lines.append("")
# 获取LLM信号
llm_signal = signal.get('llm_signal') or {}
# 检测是否为新格式 (trades数组)
trades = llm_signal.get('trades', [])
if trades and isinstance(trades, list) and len(trades) >= 3:
# 新格式: trades数组
trades_by_tf = {t.get('timeframe'): t for t in trades if t.get('timeframe')}
# 短期分析
self._add_trade_section(
lines,
"短期 (5m/15m/1h)",
"",
trades_by_tf.get('short', {}),
signal
)
# 中期分析
self._add_trade_section(
lines,
"中期 (4h/1d)",
"📈",
trades_by_tf.get('medium', {}),
signal
)
# 长期分析
self._add_trade_section(
lines,
"长期 (1d/1w)",
"📅",
trades_by_tf.get('long', {}),
signal
)
# 综合分析
analysis = llm_signal.get('analysis', {})
reason = analysis.get('summary', '') or llm_signal.get('reasoning', '')
else:
# 旧格式: opportunities对象
opportunities = llm_signal.get('opportunities', {})
recommendations = llm_signal.get('recommendations_by_timeframe', {})
# 短期分析
self._add_timeframe_section(
lines,
"短期 (5m/15m/1h)",
"",
opportunities.get('short_term_5m_15m_1h', {}),
recommendations.get('short_term', ''),
signal
)
# 中期分析
self._add_timeframe_section(
lines,
"中期 (4h/1d)",
"📈",
opportunities.get('medium_term_4h_1d', {}),
recommendations.get('medium_term', ''),
signal
)
# 长期分析
self._add_timeframe_section(
lines,
"长期 (1d/1w)",
"📅",
opportunities.get('long_term_1d_1w', {}),
recommendations.get('long_term', ''),
signal
)
reason = llm_signal.get('reasoning', '') or self._get_brief_reason(signal)
# === 综合建议 ===
if reason:
lines.append("---")
lines.append("## 💡 综合分析")
lines.append("")
lines.append(f"{reason}")
lines.append("")
# === 关键价位 ===
key_levels = llm_signal.get('key_levels', {})
if key_levels:
support = key_levels.get('support', [])
resistance = key_levels.get('resistance', [])
if support or resistance:
lines.append("---")
lines.append("## 📍 关键价位")
lines.append("")
if support:
support_str = ", ".join([f"${p:,.0f}" for p in support[:3]])
lines.append(f"**支撑**: {support_str}")
if resistance:
resistance_str = ", ".join([f"${p:,.0f}" for p in resistance[:3]])
lines.append(f"**阻力**: {resistance_str}")
lines.append("")
# === 页脚 ===
lines.append("---")
lines.append("*仅供参考,不构成投资建议*")
return "\n".join(lines)
def _add_trade_section(
self,
lines: list,
timeframe_label: str,
emoji: str,
trade: Dict[str, Any],
signal: Dict[str, Any] = None
):
"""
添加单个时间级别的交易区块新格式trades数组
Args:
lines: 输出行列表
timeframe_label: 时间级别标签
emoji: emoji图标
trade: 该时间级别的交易信息
signal: 完整信号数据
"""
lines.append(f"### {emoji} {timeframe_label}")
lines.append("")
status = trade.get('status', 'INACTIVE')
is_active = status == 'ACTIVE'
if is_active:
direction = trade.get('direction', 'NONE')
entry = trade.get('entry', {})
exit_data = trade.get('exit', {})
position = trade.get('position', {})
risk_reward = trade.get('risk_reward', 0)
expected_profit = trade.get('expected_profit_pct', 0)
reasoning = trade.get('reasoning', '')
# 方向标识
direction_emoji = "🟢" if direction == "LONG" else "🔴" if direction == "SHORT" else ""
lines.append(f"{direction_emoji} **方向**: {direction}")
lines.append("")
# 金字塔入场价格
entry_prices = []
for i in range(1, 5):
price = entry.get(f'price_{i}', 0)
pct = position.get(f'size_pct_{i}', 0)
if price > 0:
entry_prices.append(f"${price:,.0f}({pct}%)")
if entry_prices:
lines.append(f"**入场**: {''.join(entry_prices)}")
# 止损止盈
stop_loss = exit_data.get('stop_loss', 0)
tp1 = exit_data.get('take_profit_1', 0)
tp2 = exit_data.get('take_profit_2', 0)
tp3 = exit_data.get('take_profit_3', 0)
if stop_loss > 0:
lines.append(f"**止损**: ${stop_loss:,.0f}")
take_profits = []
if tp1 > 0:
take_profits.append(f"${tp1:,.0f}")
if tp2 > 0:
take_profits.append(f"${tp2:,.0f}")
if tp3 > 0:
take_profits.append(f"${tp3:,.0f}")
if take_profits:
lines.append(f"**止盈**: {' / '.join(take_profits)}")
# 风险回报比和预期盈利
if risk_reward > 0:
lines.append(f"**风险回报**: 1:{risk_reward:.1f}")
if expected_profit > 0:
lines.append(f"**预期盈利**: {expected_profit:.1f}%")
lines.append("")
# 理由
if reasoning:
lines.append(f"💭 {reasoning}")
lines.append("")
else:
# 无交易机会
reasoning = trade.get('reasoning', '')
if reasoning:
lines.append(f"💭 {reasoning}")
else:
lines.append("💭 暂无明确交易机会")
lines.append("")
def _add_timeframe_section(
self,
lines: list,
timeframe_label: str,
emoji: str,
opportunity: Dict[str, Any],
recommendation: str,
signal: Dict[str, Any] = None
):
"""
添加单个时间级别的分析区块
Args:
lines: 输出行列表
timeframe_label: 时间级别标签
emoji: emoji图标
opportunity: 该时间级别的交易机会
recommendation: 该时间级别的操作建议
signal: 完整信号数据(用于获取量化评分等)
"""
lines.append(f"### {emoji} {timeframe_label}")
lines.append("")
exists = opportunity.get('exists', False)
if exists:
direction = opportunity.get('direction', 'UNKNOWN')
entry = opportunity.get('entry_price', 0)
stop = opportunity.get('stop_loss', 0)
tp = opportunity.get('take_profit', 0)
reasoning = opportunity.get('reasoning', '')
# 方向标识
direction_emoji = "🟢" if direction == "LONG" else "🔴" if direction == "SHORT" else ""
lines.append(f"{direction_emoji} **方向**: {direction}")
lines.append("")
# 价格信息
if entry and stop and tp:
lines.append(f"**入场**: ${entry:,.2f}")
lines.append(f"**止损**: ${stop:,.2f}")
lines.append(f"**止盈**: ${tp:,.2f}")
# 风险回报比
risk = abs(entry - stop)
reward = abs(tp - entry)
rr = reward / risk if risk > 0 else 0
lines.append(f"**风险回报**: 1:{rr:.1f}")
lines.append("")
# 理由
if reasoning:
lines.append(f"💭 {reasoning}")
lines.append("")
else:
# 无交易机会时,显示关键数据支撑
# 尝试从市场分析中获取该周期的数据
if signal:
market_analysis = signal.get('market_analysis', {})
# 显示量化评分(如果有)
quant_signal = signal.get('quantitative_signal', {})
if quant_signal:
composite = quant_signal.get('composite_score', 0)
lines.append(f"📊 **量化评分**: {composite:.1f}")
scores = quant_signal.get('scores', {})
if scores:
lines.append(f"- 趋势: {scores.get('trend', 0):.0f} | 动量: {scores.get('momentum', 0):.0f} | 订单流: {scores.get('orderflow', 0):.0f}")
lines.append("")
# 显示关键价格和技术指标
if market_analysis:
trend = market_analysis.get('trend', {})
momentum = market_analysis.get('momentum', {})
if trend or momentum:
lines.append(f"📈 **技术状态**:")
if trend:
lines.append(f"- 趋势: {trend.get('direction', 'unknown')} ({trend.get('strength', 'weak')})")
if momentum:
lines.append(f"- RSI: {momentum.get('rsi', 50):.0f} ({momentum.get('rsi_status', '中性')})")
lines.append(f"- MACD: {momentum.get('macd_signal', 'unknown')}")
lines.append("")
# 操作建议
if recommendation:
lines.append(f"💭 **操作建议**: {recommendation}")
lines.append("")
else:
lines.append("💭 暂无明确交易机会")
lines.append("")
def _get_best_trade_plan(self, signal: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
获取最优交易计划(优先中长线,盈利空间更大)
优先级:
1. 中长线机会 (swing) - 4h/1d/1w级别盈利空间2%-5%+,适合通知
2. 日内机会 (intraday) - 5m/15m/1h级别盈利空间0.5%-1%
3. 量化信号价格位 - 回退选项
Returns:
{
'entry': float,
'stop_loss': float,
'take_profit': float,
'type': 'swing'|'intraday'|'quant' # 交易类型
} or None
"""
llm = signal.get('llm_signal')
if llm and isinstance(llm, dict):
opportunities = llm.get('opportunities', {})
# 优先1中长线机会盈利空间大适合手机通知
swing = opportunities.get('swing', {})
if swing.get('exists'):
entry = swing.get('entry_price')
stop = swing.get('stop_loss')
tp = swing.get('take_profit')
if entry and stop and tp:
return {
'entry': float(entry),
'stop_loss': float(stop),
'take_profit': float(tp),
'type': 'swing'
}
# 优先2日内机会如果没有中长线机会
intraday = opportunities.get('intraday', {})
if intraday.get('exists'):
entry = intraday.get('entry_price')
stop = intraday.get('stop_loss')
tp = intraday.get('take_profit')
if entry and stop and tp:
return {
'entry': float(entry),
'stop_loss': float(stop),
'take_profit': float(tp),
'type': 'intraday'
}
# 回退到量化信号的价格位
levels = signal.get('levels', {})
entry = levels.get('entry')
stop = levels.get('stop_loss')
tp = levels.get('take_profit_1')
if entry and stop and tp:
return {
'entry': float(entry),
'stop_loss': float(stop),
'take_profit': float(tp),
'type': 'quant'
}
return None
def _get_brief_reason(self, signal: Dict[str, Any]) -> str:
"""
获取简短的信号原因1-2句话
Returns:
简短原因描述
"""
reasons = []
# 优先使用LLM的推理截取前100字
llm = signal.get('llm_signal')
if llm and isinstance(llm, dict):
llm_reasoning = llm.get('reasoning', '')
if llm_reasoning:
# 取第一句话或前100字
brief = llm_reasoning.split('')[0] + ''
if len(brief) > 100:
brief = brief[:100] + '...'
return brief
# 如果有日内机会的说明
opportunities = llm.get('opportunities', {})
intraday = opportunities.get('intraday', {})
if intraday.get('exists') and intraday.get('reasoning'):
brief = intraday['reasoning']
if len(brief) > 100:
brief = brief[:100] + '...'
return brief
# 回退到量化信号的推理
quant = signal.get('quantitative_signal', {})
quant_reasoning = quant.get('reasoning', '')
if quant_reasoning:
return quant_reasoning
# 默认
return signal.get('recommendation', '系统分析建议关注')
def send_error(self, error_msg: str, context: Optional[str] = None) -> bool:
"""
发送错误通知
Args:
error_msg: 错误消息
context: 错误上下文
Returns:
是否发送成功
"""
if not self.enabled:
return False
lines = []
lines.append("# ❌ 系统错误通知")
lines.append("")
lines.append(f"**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"**错误**: {error_msg}")
if context:
lines.append("")
lines.append(f"**上下文**: {context}")
lines.append("")
lines.append("---")
lines.append("*请及时检查系统状态*")
markdown = "\n".join(lines)
return self.send_markdown("系统错误通知", markdown, at_all=True)
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
return {
'enabled': self.enabled,
'total_sent': self.stats['total_sent'],
'total_failed': self.stats['total_failed'],
'success_rate': (
self.stats['total_sent'] / (self.stats['total_sent'] + self.stats['total_failed'])
if (self.stats['total_sent'] + self.stats['total_failed']) > 0
else 0
),
'last_send_time': self.stats['last_send_time']
}