import requests import json import logging import hashlib import hmac import base64 import urllib.parse import time from datetime import datetime from typing import List, Optional from technical_analyzer import CoinSignal class DingTalkNotifier: def __init__(self, webhook_url: str = None, secret: str = None): """初始化钉钉通知器 Args: webhook_url: 钉钉机器人的webhook URL secret: 钉钉机器人的加签密钥(可选) """ self.webhook_url = webhook_url self.secret = secret self.enabled = webhook_url is not None and webhook_url.strip() != "" if not self.enabled: logging.warning("钉钉webhook URL未配置,通知功能已禁用") elif self.secret: logging.info("钉钉通知已启用(加签模式)") else: logging.info("钉钉通知已启用(普通模式)") def send_coin_selection_notification(self, signals: List[CoinSignal]) -> bool: """发送选币结果通知 Args: signals: 选币信号列表 Returns: bool: 发送是否成功 """ if not self.enabled: logging.info("钉钉通知未启用,跳过发送") return False if not signals: return self._send_no_signals_notification() try: # 按多头和空头分组 long_signals = [s for s in signals if s.signal_type == "LONG"] short_signals = [s for s in signals if s.signal_type == "SHORT"] # 构建消息内容 message = self._build_selection_message(long_signals, short_signals) # 发送消息 return self._send_message(message) except Exception as e: logging.error(f"发送钉钉通知失败: {e}") return False def _build_selection_message(self, long_signals: List[CoinSignal], short_signals: List[CoinSignal]) -> dict: """构建选币结果消息""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 构建markdown格式消息 markdown_text = f"# 🎯 加密货币选币结果\n\n" markdown_text += f"**时间**: {current_time}\n" markdown_text += f"**总计**: {len(long_signals + short_signals)}个信号 (多头{len(long_signals)}个, 空头{len(short_signals)}个)\n\n" # 多头信号 if long_signals: markdown_text += "## 📈 多头信号\n\n" for i, signal in enumerate(long_signals, 1): confidence_emoji = "🔥" if signal.confidence == "高" else "⚡" if signal.confidence == "中" else "💡" strategy_emoji = self._get_strategy_emoji(signal.strategy_type) markdown_text += f"### {i}. {signal.symbol} {confidence_emoji}\n" markdown_text += f"- **策略**: {strategy_emoji} {signal.strategy_type}\n" markdown_text += f"- **评分**: {signal.score:.1f}分 ({signal.confidence}信心)\n" markdown_text += f"- **建议**: {signal.action_suggestion}\n" markdown_text += f"- **入场**: ${signal.entry_price:.4f}\n" markdown_text += f"- **止损**: ${signal.stop_loss:.4f} ({self._get_percentage_change(signal.entry_price, signal.stop_loss):.1f}%)\n" markdown_text += f"- **止盈**: ${signal.take_profit:.4f} ({self._get_percentage_change(signal.entry_price, signal.take_profit):.1f}%)\n" markdown_text += f"- **风险回报比**: 1:{signal.risk_reward_ratio:.2f}\n" markdown_text += f"- **持仓周期**: {signal.holding_period}天\n\n" # 空头信号 if short_signals: markdown_text += "## 📉 空头信号\n\n" for i, signal in enumerate(short_signals, 1): confidence_emoji = "🔥" if signal.confidence == "高" else "⚡" if signal.confidence == "中" else "💡" strategy_emoji = self._get_strategy_emoji(signal.strategy_type) markdown_text += f"### {i}. {signal.symbol} {confidence_emoji}\n" markdown_text += f"- **策略**: {strategy_emoji} {signal.strategy_type}\n" markdown_text += f"- **评分**: {signal.score:.1f}分 ({signal.confidence}信心)\n" markdown_text += f"- **建议**: {signal.action_suggestion}\n" markdown_text += f"- **入场**: ${signal.entry_price:.4f}\n" markdown_text += f"- **止损**: ${signal.stop_loss:.4f} ({self._get_percentage_change(signal.entry_price, signal.stop_loss):.1f}%)\n" markdown_text += f"- **止盈**: ${signal.take_profit:.4f} ({self._get_percentage_change(signal.entry_price, signal.take_profit):.1f}%)\n" markdown_text += f"- **风险回报比**: 1:{signal.risk_reward_ratio:.2f}\n" markdown_text += f"- **持仓周期**: {signal.holding_period}天\n\n" # 添加风险提示 markdown_text += "---\n" markdown_text += "⚠️ **风险提示**: 投资有风险,决策需谨慎。本信号仅供参考,不构成投资建议。\n" markdown_text += "🤖 *由AI选币系统自动生成*" return { "msgtype": "markdown", "markdown": { "title": f"选币结果 ({len(long_signals + short_signals)}个信号)", "text": markdown_text } } def _send_no_signals_notification(self) -> bool: """发送无信号通知""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") message = { "msgtype": "text", "text": { "content": f"📊 加密货币选币结果\n\n" f"时间: {current_time}\n" f"结果: 暂无符合条件的投资机会\n" f"建议: 继续观察市场动态\n\n" f"🤖 由AI选币系统自动生成" } } return self._send_message(message) def _generate_sign_url(self) -> str: """生成带加签的webhook URL""" if not self.secret: return self.webhook_url # 当前时间戳(毫秒) timestamp = str(round(time.time() * 1000)) # 拼接字符串 string_to_sign = f"{timestamp}\n{self.secret}" string_to_sign_enc = string_to_sign.encode('utf-8') secret_enc = self.secret.encode('utf-8') # HMAC-SHA256加密 hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() # Base64编码 sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 拼接最终URL if '?' in self.webhook_url: signed_url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}" else: signed_url = f"{self.webhook_url}?timestamp={timestamp}&sign={sign}" return signed_url def _send_message(self, message: dict) -> bool: """发送消息到钉钉""" try: # 生成签名URL(如果配置了密钥) url = self._generate_sign_url() headers = {'Content-Type': 'application/json'} response = requests.post( url, data=json.dumps(message), headers=headers, timeout=10 ) if response.status_code == 200: result = response.json() if result.get('errcode') == 0: logging.info("钉钉通知发送成功") return True else: logging.error(f"钉钉通知发送失败: {result.get('errmsg', '未知错误')}") return False else: logging.error(f"钉钉通知HTTP请求失败: {response.status_code}") return False except Exception as e: logging.error(f"发送钉钉消息异常: {e}") return False def _get_strategy_emoji(self, strategy: str) -> str: """获取策略对应的emoji""" strategy_emojis = { "超短线": "⚡", "短线": "🏃", "中线": "🚶", "波段": "🌊", "长线": "🐢", "趋势": "📈" } return strategy_emojis.get(strategy, "📊") def _get_percentage_change(self, entry_price: float, target_price: float) -> float: """计算价格变化百分比""" if entry_price == 0: return 0 return ((target_price - entry_price) / entry_price) * 100 def send_test_message(self) -> bool: """发送测试消息""" if not self.enabled: print("钉钉webhook URL未配置,无法发送测试消息") return False test_message = { "msgtype": "text", "text": { "content": f"🤖 加密货币选币系统测试消息\n\n" f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" f"状态: 系统运行正常\n" f"功能: 钉钉通知已配置成功" } } success = self._send_message(test_message) if success: print("✅ 钉钉测试消息发送成功") else: print("❌ 钉钉测试消息发送失败") return success # 测试脚本 if __name__ == "__main__": # 测试用例 webhook_url = input("请输入钉钉机器人webhook URL: ").strip() if webhook_url: notifier = DingTalkNotifier(webhook_url) notifier.send_test_message() else: print("未提供webhook URL,退出测试")