""" DingTalk Notifier - 钉钉群机器人消息推送 支持功能: - Markdown格式消息 - 交易信号格式化 - 错误重试 - 消息去重 """ import logging import json import time from typing import Dict, Any, Optional from datetime import datetime import hmac import hashlib import base64 import urllib.parse logger = logging.getLogger(__name__) class DingTalkNotifier: """钉钉群机器人通知器""" def __init__( self, webhook_url: Optional[str] = None, secret: Optional[str] = None, enabled: bool = True ): """ 初始化钉钉通知器 Args: webhook_url: 钉钉机器人webhook地址 secret: 钉钉机器人加签密钥(可选,增强安全性) enabled: 是否启用通知 """ self.webhook_url = webhook_url self.secret = secret self.enabled = enabled and webhook_url is not None if self.enabled: logger.info(f"📱 钉钉通知已启用 - Webhook: {webhook_url[:50]}...") else: logger.info("📱 钉钉通知未启用 (未配置webhook_url)") # 统计信息 self.stats = { 'total_sent': 0, 'total_failed': 0, 'last_send_time': None } def _generate_sign(self, timestamp: int) -> str: """ 生成钉钉机器人加签 Args: timestamp: 时间戳(毫秒) Returns: 签名字符串 """ if not self.secret: return "" secret_enc = self.secret.encode('utf-8') string_to_sign = f'{timestamp}\n{self.secret}' string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new( secret_enc, string_to_sign_enc, digestmod=hashlib.sha256 ).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return sign def _build_webhook_url(self) -> str: """ 构建带签名的webhook URL Returns: 完整的webhook URL """ if not self.secret: return self.webhook_url timestamp = int(time.time() * 1000) sign = self._generate_sign(timestamp) return f"{self.webhook_url}×tamp={timestamp}&sign={sign}" def send_markdown( self, title: str, text: str, at_mobiles: Optional[list] = None, at_all: bool = False ) -> bool: """ 发送Markdown格式消息 Args: title: 消息标题 text: Markdown格式文本 at_mobiles: @的手机号列表 at_all: 是否@所有人 Returns: 是否发送成功 """ if not self.enabled: logger.debug("钉钉通知未启用,跳过发送") return False try: import requests url = self._build_webhook_url() payload = { "msgtype": "markdown", "markdown": { "title": title, "text": text }, "at": { "atMobiles": at_mobiles or [], "isAtAll": at_all } } headers = {'Content-Type': 'application/json'} response = requests.post( url, data=json.dumps(payload), headers=headers, timeout=5 ) result = response.json() if result.get('errcode') == 0: self.stats['total_sent'] += 1 self.stats['last_send_time'] = datetime.now().isoformat() logger.info(f"✅ 钉钉消息发送成功: {title}") return True else: self.stats['total_failed'] += 1 logger.error(f"❌ 钉钉消息发送失败: {result.get('errmsg')}") return False except Exception as e: self.stats['total_failed'] += 1 logger.error(f"❌ 钉钉消息发送异常: {e}", exc_info=True) return False def send_signal(self, aggregated_signal: Dict[str, Any]) -> bool: """ 发送交易信号通知 Args: aggregated_signal: 聚合后的交易信号 Returns: 是否发送成功 """ if not self.enabled: return False try: # 格式化信号为Markdown markdown = self._format_signal_markdown(aggregated_signal) # 提取标题 signal_type = aggregated_signal.get('final_signal', 'HOLD') confidence = aggregated_signal.get('final_confidence', 0) title = f"🚨 交易信号: {signal_type} (置信度: {confidence:.0%})" # 发送消息 return self.send_markdown(title, markdown) except Exception as e: logger.error(f"❌ 格式化交易信号失败: {e}", exc_info=True) return False def _format_signal_markdown(self, signal: Dict[str, Any]) -> str: """ 格式化交易信号为Markdown文本(多时间级别版本) 支持两种格式: 1. 新格式: trades数组 (优先) 2. 旧格式: opportunities对象 (向后兼容) Args: signal: 聚合信号 Returns: Markdown格式文本 """ # 信号类型对应的emoji signal_type = signal.get('final_signal', 'HOLD') confidence = signal.get('final_confidence', 0) signal_emoji = { 'BUY': '🟢', 'SELL': '🔴', 'HOLD': '🟡' } emoji = signal_emoji.get(signal_type, '⚪') lines = [] # === 核心信号 === symbol = signal.get('symbol', 'BTC/USDT') lines.append(f"# {emoji} {symbol} {signal_type}") lines.append("") lines.append(f"**综合置信度**: {confidence:.0%} | **时间**: {datetime.now().strftime('%H:%M')}") lines.append("") # === 当前价格 === levels = signal.get('levels', {}) current_price = levels.get('current_price', 0) if current_price > 0: lines.append(f"**当前价格**: ${current_price:,.2f}") lines.append("") # === 多时间级别分析 === lines.append("## 📊 多时间级别分析") lines.append("") # 获取LLM信号 llm_signal = signal.get('llm_signal') or {} # 检测是否为新格式 (trades数组) trades = llm_signal.get('trades', []) if trades and isinstance(trades, list) and len(trades) >= 3: # 新格式: trades数组 trades_by_tf = {t.get('timeframe'): t for t in trades if t.get('timeframe')} # 短期分析 self._add_trade_section( lines, "短期 (5m/15m/1h)", "⚡", trades_by_tf.get('short', {}), signal ) # 中期分析 self._add_trade_section( lines, "中期 (4h/1d)", "📈", trades_by_tf.get('medium', {}), signal ) # 长期分析 self._add_trade_section( lines, "长期 (1d/1w)", "📅", trades_by_tf.get('long', {}), signal ) # 综合分析 analysis = llm_signal.get('analysis', {}) reason = analysis.get('summary', '') or llm_signal.get('reasoning', '') else: # 旧格式: opportunities对象 opportunities = llm_signal.get('opportunities', {}) recommendations = llm_signal.get('recommendations_by_timeframe', {}) # 短期分析 self._add_timeframe_section( lines, "短期 (5m/15m/1h)", "⚡", opportunities.get('short_term_5m_15m_1h', {}), recommendations.get('short_term', ''), signal ) # 中期分析 self._add_timeframe_section( lines, "中期 (4h/1d)", "📈", opportunities.get('medium_term_4h_1d', {}), recommendations.get('medium_term', ''), signal ) # 长期分析 self._add_timeframe_section( lines, "长期 (1d/1w)", "📅", opportunities.get('long_term_1d_1w', {}), recommendations.get('long_term', ''), signal ) reason = llm_signal.get('reasoning', '') or self._get_brief_reason(signal) # === 综合建议 === if reason: lines.append("---") lines.append("## 💡 综合分析") lines.append("") lines.append(f"{reason}") lines.append("") # === 关键价位 === key_levels = llm_signal.get('key_levels', {}) if key_levels: support = key_levels.get('support', []) resistance = key_levels.get('resistance', []) if support or resistance: lines.append("---") lines.append("## 📍 关键价位") lines.append("") if support: support_str = ", ".join([f"${p:,.0f}" for p in support[:3]]) lines.append(f"**支撑**: {support_str}") if resistance: resistance_str = ", ".join([f"${p:,.0f}" for p in resistance[:3]]) lines.append(f"**阻力**: {resistance_str}") lines.append("") # === 页脚 === lines.append("---") lines.append("*仅供参考,不构成投资建议*") return "\n".join(lines) def _add_trade_section( self, lines: list, timeframe_label: str, emoji: str, trade: Dict[str, Any], signal: Dict[str, Any] = None ): """ 添加单个时间级别的交易区块(新格式trades数组) Args: lines: 输出行列表 timeframe_label: 时间级别标签 emoji: emoji图标 trade: 该时间级别的交易信息 signal: 完整信号数据 """ lines.append(f"### {emoji} {timeframe_label}") lines.append("") status = trade.get('status', 'INACTIVE') is_active = status == 'ACTIVE' if is_active: direction = trade.get('direction', 'NONE') entry = trade.get('entry', {}) exit_data = trade.get('exit', {}) position = trade.get('position', {}) risk_reward = trade.get('risk_reward', 0) expected_profit = trade.get('expected_profit_pct', 0) reasoning = trade.get('reasoning', '') # 方向标识 direction_emoji = "🟢" if direction == "LONG" else "🔴" if direction == "SHORT" else "⚪" lines.append(f"{direction_emoji} **方向**: {direction}") lines.append("") # 金字塔入场价格 entry_prices = [] for i in range(1, 5): price = entry.get(f'price_{i}', 0) pct = position.get(f'size_pct_{i}', 0) if price > 0: entry_prices.append(f"${price:,.0f}({pct}%)") if entry_prices: lines.append(f"**入场**: {' → '.join(entry_prices)}") # 止损止盈 stop_loss = exit_data.get('stop_loss', 0) tp1 = exit_data.get('take_profit_1', 0) tp2 = exit_data.get('take_profit_2', 0) tp3 = exit_data.get('take_profit_3', 0) if stop_loss > 0: lines.append(f"**止损**: ${stop_loss:,.0f}") take_profits = [] if tp1 > 0: take_profits.append(f"${tp1:,.0f}") if tp2 > 0: take_profits.append(f"${tp2:,.0f}") if tp3 > 0: take_profits.append(f"${tp3:,.0f}") if take_profits: lines.append(f"**止盈**: {' / '.join(take_profits)}") # 风险回报比和预期盈利 if risk_reward > 0: lines.append(f"**风险回报**: 1:{risk_reward:.1f}") if expected_profit > 0: lines.append(f"**预期盈利**: {expected_profit:.1f}%") lines.append("") # 理由 if reasoning: lines.append(f"💭 {reasoning}") lines.append("") else: # 无交易机会 reasoning = trade.get('reasoning', '') if reasoning: lines.append(f"💭 {reasoning}") else: lines.append("💭 暂无明确交易机会") lines.append("") def _add_timeframe_section( self, lines: list, timeframe_label: str, emoji: str, opportunity: Dict[str, Any], recommendation: str, signal: Dict[str, Any] = None ): """ 添加单个时间级别的分析区块 Args: lines: 输出行列表 timeframe_label: 时间级别标签 emoji: emoji图标 opportunity: 该时间级别的交易机会 recommendation: 该时间级别的操作建议 signal: 完整信号数据(用于获取量化评分等) """ lines.append(f"### {emoji} {timeframe_label}") lines.append("") exists = opportunity.get('exists', False) if exists: direction = opportunity.get('direction', 'UNKNOWN') entry = opportunity.get('entry_price', 0) stop = opportunity.get('stop_loss', 0) tp = opportunity.get('take_profit', 0) reasoning = opportunity.get('reasoning', '') # 方向标识 direction_emoji = "🟢" if direction == "LONG" else "🔴" if direction == "SHORT" else "⚪" lines.append(f"{direction_emoji} **方向**: {direction}") lines.append("") # 价格信息 if entry and stop and tp: lines.append(f"**入场**: ${entry:,.2f}") lines.append(f"**止损**: ${stop:,.2f}") lines.append(f"**止盈**: ${tp:,.2f}") # 风险回报比 risk = abs(entry - stop) reward = abs(tp - entry) rr = reward / risk if risk > 0 else 0 lines.append(f"**风险回报**: 1:{rr:.1f}") lines.append("") # 理由 if reasoning: lines.append(f"💭 {reasoning}") lines.append("") else: # 无交易机会时,显示关键数据支撑 # 尝试从市场分析中获取该周期的数据 if signal: market_analysis = signal.get('market_analysis', {}) # 显示量化评分(如果有) quant_signal = signal.get('quantitative_signal', {}) if quant_signal: composite = quant_signal.get('composite_score', 0) lines.append(f"📊 **量化评分**: {composite:.1f}") scores = quant_signal.get('scores', {}) if scores: lines.append(f"- 趋势: {scores.get('trend', 0):.0f} | 动量: {scores.get('momentum', 0):.0f} | 订单流: {scores.get('orderflow', 0):.0f}") lines.append("") # 显示关键价格和技术指标 if market_analysis: trend = market_analysis.get('trend', {}) momentum = market_analysis.get('momentum', {}) if trend or momentum: lines.append(f"📈 **技术状态**:") if trend: lines.append(f"- 趋势: {trend.get('direction', 'unknown')} ({trend.get('strength', 'weak')})") if momentum: lines.append(f"- RSI: {momentum.get('rsi', 50):.0f} ({momentum.get('rsi_status', '中性')})") lines.append(f"- MACD: {momentum.get('macd_signal', 'unknown')}") lines.append("") # 操作建议 if recommendation: lines.append(f"💭 **操作建议**: {recommendation}") lines.append("") else: lines.append("💭 暂无明确交易机会") lines.append("") def _get_best_trade_plan(self, signal: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ 获取最优交易计划(优先中长线,盈利空间更大) 优先级: 1. 中长线机会 (swing) - 4h/1d/1w级别,盈利空间2%-5%+,适合通知 2. 日内机会 (intraday) - 5m/15m/1h级别,盈利空间0.5%-1% 3. 量化信号价格位 - 回退选项 Returns: { 'entry': float, 'stop_loss': float, 'take_profit': float, 'type': 'swing'|'intraday'|'quant' # 交易类型 } or None """ llm = signal.get('llm_signal') if llm and isinstance(llm, dict): opportunities = llm.get('opportunities', {}) # 优先1:中长线机会(盈利空间大,适合手机通知) swing = opportunities.get('swing', {}) if swing.get('exists'): entry = swing.get('entry_price') stop = swing.get('stop_loss') tp = swing.get('take_profit') if entry and stop and tp: return { 'entry': float(entry), 'stop_loss': float(stop), 'take_profit': float(tp), 'type': 'swing' } # 优先2:日内机会(如果没有中长线机会) intraday = opportunities.get('intraday', {}) if intraday.get('exists'): entry = intraday.get('entry_price') stop = intraday.get('stop_loss') tp = intraday.get('take_profit') if entry and stop and tp: return { 'entry': float(entry), 'stop_loss': float(stop), 'take_profit': float(tp), 'type': 'intraday' } # 回退到量化信号的价格位 levels = signal.get('levels', {}) entry = levels.get('entry') stop = levels.get('stop_loss') tp = levels.get('take_profit_1') if entry and stop and tp: return { 'entry': float(entry), 'stop_loss': float(stop), 'take_profit': float(tp), 'type': 'quant' } return None def _get_brief_reason(self, signal: Dict[str, Any]) -> str: """ 获取简短的信号原因(1-2句话) Returns: 简短原因描述 """ reasons = [] # 优先使用LLM的推理(截取前100字) llm = signal.get('llm_signal') if llm and isinstance(llm, dict): llm_reasoning = llm.get('reasoning', '') if llm_reasoning: # 取第一句话或前100字 brief = llm_reasoning.split('。')[0] + '。' if len(brief) > 100: brief = brief[:100] + '...' return brief # 如果有日内机会的说明 opportunities = llm.get('opportunities', {}) intraday = opportunities.get('intraday', {}) if intraday.get('exists') and intraday.get('reasoning'): brief = intraday['reasoning'] if len(brief) > 100: brief = brief[:100] + '...' return brief # 回退到量化信号的推理 quant = signal.get('quantitative_signal', {}) quant_reasoning = quant.get('reasoning', '') if quant_reasoning: return quant_reasoning # 默认 return signal.get('recommendation', '系统分析建议关注') def send_error(self, error_msg: str, context: Optional[str] = None) -> bool: """ 发送错误通知 Args: error_msg: 错误消息 context: 错误上下文 Returns: 是否发送成功 """ if not self.enabled: return False lines = [] lines.append("# ❌ 系统错误通知") lines.append("") lines.append(f"**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") lines.append(f"**错误**: {error_msg}") if context: lines.append("") lines.append(f"**上下文**: {context}") lines.append("") lines.append("---") lines.append("*请及时检查系统状态*") markdown = "\n".join(lines) return self.send_markdown("系统错误通知", markdown, at_all=True) def get_stats(self) -> Dict[str, Any]: """获取统计信息""" return { 'enabled': self.enabled, 'total_sent': self.stats['total_sent'], 'total_failed': self.stats['total_failed'], 'success_rate': ( self.stats['total_sent'] / (self.stats['total_sent'] + self.stats['total_failed']) if (self.stats['total_sent'] + self.stats['total_failed']) > 0 else 0 ), 'last_send_time': self.stats['last_send_time'] }