""" 通知模块 - 支持钉钉webhook通知 """ import requests import json import time import hmac import hashlib import base64 import urllib.parse from typing import Dict, Any, Optional from loguru import logger from datetime import datetime class DingTalkNotifier: """钉钉机器人通知器""" def __init__(self, webhook_url: str, secret: str = None): """ 初始化钉钉通知器 Args: webhook_url: 钉钉机器人webhook地址 secret: 加签密钥,如果提供则启用加签验证 """ self.webhook_url = webhook_url self.secret = secret self.session = requests.Session() logger.info(f"钉钉通知器初始化完成 {'(已启用加签)' if secret else '(未启用加签)'}") def _generate_signature(self, timestamp: str) -> str: """ 生成钉钉加签 Args: timestamp: 时间戳字符串 Returns: 加签结果 """ if not self.secret: return "" string_to_sign = f"{timestamp}\n{self.secret}" hmac_code = hmac.new( self.secret.encode('utf-8'), string_to_sign.encode('utf-8'), digestmod=hashlib.sha256 ).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return sign def _get_signed_url(self) -> str: """ 获取带加签的webhook URL Returns: 带加签的URL,如果未配置密钥则返回原URL """ if not self.secret: return self.webhook_url timestamp = str(round(time.time() * 1000)) sign = self._generate_signature(timestamp) # 添加时间戳和签名参数 separator = '&' if '?' in self.webhook_url else '?' return f"{self.webhook_url}{separator}timestamp={timestamp}&sign={sign}" def send_text_message(self, content: str, at_all: bool = False, at_mobiles: list = None) -> bool: """ 发送文本消息 Args: content: 消息内容 at_all: 是否@所有人 at_mobiles: @指定手机号列表 Returns: 发送是否成功 """ try: data = { "msgtype": "text", "text": { "content": content } } # 添加@功能 if at_all or at_mobiles: data["at"] = {} if at_all: data["at"]["isAtAll"] = True if at_mobiles: data["at"]["atMobiles"] = at_mobiles response = self.session.post( self._get_signed_url(), json=data, headers={'Content-Type': 'application/json'}, timeout=10 ) if response.status_code == 200: result = response.json() if result.get('errcode') == 0: logger.info("钉钉消息发送成功") return True else: logger.error(f"钉钉消息发送失败: {result.get('errmsg', '未知错误')}") return False else: logger.error(f"钉钉API请求失败: HTTP {response.status_code}") return False except Exception as e: logger.error(f"发送钉钉消息异常: {e}") return False def send_markdown_message(self, title: str, text: str, at_all: bool = False, at_mobiles: list = None) -> bool: """ 发送Markdown格式消息 Args: title: 消息标题 text: Markdown格式的消息内容 at_all: 是否@所有人 at_mobiles: @指定手机号列表 Returns: 发送是否成功 """ try: data = { "msgtype": "markdown", "markdown": { "title": title, "text": text } } # 添加@功能 if at_all or at_mobiles: data["at"] = {} if at_all: data["at"]["isAtAll"] = True if at_mobiles: data["at"]["atMobiles"] = at_mobiles response = self.session.post( self._get_signed_url(), json=data, headers={'Content-Type': 'application/json'}, timeout=10 ) if response.status_code == 200: result = response.json() if result.get('errcode') == 0: logger.info("钉钉Markdown消息发送成功") return True else: logger.error(f"钉钉Markdown消息发送失败: {result.get('errmsg', '未知错误')}") return False else: logger.error(f"钉钉API请求失败: HTTP {response.status_code}") return False except Exception as e: logger.error(f"发送钉钉Markdown消息异常: {e}") return False def send_strategy_summary_message(self, title: str, markdown_text: str) -> bool: """ 发送策略汇总消息(Markdown格式) Args: title: 消息标题 markdown_text: Markdown格式的消息内容 Returns: 发送是否成功 """ return self.send_markdown_message(title, markdown_text) def send_strategy_signal(self, stock_code: str, stock_name: str, timeframe: str, signal_type: str, price: float, signal_date: str = None, additional_info: Dict[str, Any] = None) -> bool: """ 发送策略信号通知 Args: stock_code: 股票代码 stock_name: 股票名称 timeframe: 时间周期 signal_type: 信号类型 price: 当前价格 signal_date: 信号发生的时间(K线时间) additional_info: 额外信息 Returns: 发送是否成功 """ try: # 使用信号时间或当前时间 display_time = signal_date if signal_date else datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 构建Markdown消息 title = f"📈 {signal_type}信号提醒" markdown_text = f""" # 📈 {signal_type}信号提醒 **股票信息:** - 代码: `{stock_code}` - 名称: `{stock_name}` - 价格: `{price}` 元 - 时间周期: `{timeframe}` **信号时间:** {display_time} **策略说明:** 两阳线+阴线+阳线形态突破 --- *量化交易系统自动发送* """ # 添加额外信息 if additional_info: markdown_text += "\n**额外信息:**\n" for key, value in additional_info.items(): markdown_text += f"- {key}: `{value}`\n" return self.send_markdown_message(title, markdown_text) except Exception as e: logger.error(f"发送策略信号通知异常: {e}") return False class NotificationManager: """通知管理器""" def __init__(self, config: Dict[str, Any]): """ 初始化通知管理器 Args: config: 通知配置 """ self.config = config self.dingtalk_notifier = None # 初始化钉钉通知器 dingtalk_config = config.get('dingtalk', {}) if dingtalk_config.get('enabled', False): webhook_url = dingtalk_config.get('webhook_url') secret = dingtalk_config.get('secret') if webhook_url: self.dingtalk_notifier = DingTalkNotifier(webhook_url, secret) logger.info("钉钉通知器已启用") else: logger.warning("钉钉通知已启用但未配置webhook_url") def send_strategy_signal(self, stock_code: str, stock_name: str, timeframe: str, signal_type: str, price: float, signal_date: str = None, additional_info: Dict[str, Any] = None) -> bool: """ 发送策略信号到所有启用的通知渠道 Args: stock_code: 股票代码 stock_name: 股票名称 timeframe: 时间周期 signal_type: 信号类型 price: 当前价格 signal_date: 信号发生的时间(K线时间) additional_info: 额外信息 Returns: 是否至少有一个渠道发送成功 """ success = False # 钉钉通知 if self.dingtalk_notifier: if self.dingtalk_notifier.send_strategy_signal( stock_code, stock_name, timeframe, signal_type, price, signal_date, additional_info ): success = True # 记录到日志 logger.info(f"策略信号: {signal_type} | {stock_code}({stock_name}) | {timeframe} | {price}元") if additional_info: logger.info(f"额外信息: {additional_info}") return success def send_strategy_summary(self, all_signals: Dict[str, Any], scan_stats: Dict[str, Any] = None) -> bool: """ 发送策略信号汇总通知(支持分组发送) Args: all_signals: 所有信号的汇总数据 {stock_code: {timeframe: [signals]}} scan_stats: 扫描统计信息 Returns: 发送是否成功 """ if not all_signals: return False try: from datetime import datetime import math current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 收集所有信号详情 all_signal_details = [] total_signals = 0 total_stocks = len(all_signals) for stock_code, stock_results in all_signals.items(): for timeframe, signals in stock_results.items(): for signal in signals: total_signals += 1 all_signal_details.append({ 'stock_code': stock_code, 'stock_name': signal.get('stock_name', '未知'), 'timeframe': timeframe, 'signal_date': signal['date'], 'price': signal['breakout_price'], 'turnover': signal.get('turnover_ratio', 0), 'breakout_pct': signal.get('breakout_pct', 0), 'ema20_status': '✅上方' if signal.get('above_ema20', False) else '❌下方' }) # 如果没有信号,直接返回 if total_signals == 0: return True # 按10个信号为一组分批发送 signals_per_group = 10 total_groups = math.ceil(total_signals / signals_per_group) success_count = 0 for group_idx in range(total_groups): start_idx = group_idx * signals_per_group end_idx = min(start_idx + signals_per_group, total_signals) group_signals = all_signal_details[start_idx:end_idx] # 构建当前组的消息 if total_groups > 1: title = f"📈 K线形态策略信号汇总 ({group_idx + 1}/{total_groups})" else: title = f"📈 K线形态策略信号汇总" markdown_text = f""" # {title} **扫描统计:** - 扫描时间: `{current_time}` - 总信号数: `{total_signals}` 个 - 本组信号: `{len(group_signals)}` 个 ({start_idx + 1}-{end_idx}) - 涉及股票: `{total_stocks}` 只 """ # 添加扫描范围信息 if scan_stats: markdown_text += f""" **扫描范围:** - 扫描股票总数: `{scan_stats.get('total_scanned', 'N/A')}` - 数据源: `{scan_stats.get('data_source', '热门股票')}` """ markdown_text += "\n**信号详情:**\n" # 添加当前组的信号详情 for i, signal in enumerate(group_signals, start_idx + 1): markdown_text += f""" {i}. **{signal['stock_code']} - {signal['stock_name']}** - K线时间: `{signal['signal_date']}` - 时间周期: `{signal['timeframe']}` - 当前价格: `{signal['price']:.2f}元` - 突破幅度: `{signal['breakout_pct']:.2f}%` - 换手率: `{signal['turnover']:.2f}%` - EMA20: `{signal['ema20_status']}` """ markdown_text += """ --- **策略说明:** 两阳线+阴线+阳线形态突破 *量化交易系统自动发送* """ # 发送当前组的通知 if self.dingtalk_notifier: if self.dingtalk_notifier.send_markdown_message(title, markdown_text): success_count += 1 logger.info(f"📱 发送信号汇总第{group_idx + 1}组成功 ({len(group_signals)}个信号)") else: logger.error(f"📱 发送信号汇总第{group_idx + 1}组失败") # 避免发送过快,添加短暂延迟 if group_idx < total_groups - 1: # 不是最后一组 import time time.sleep(1) # 1秒延迟 return success_count > 0 except Exception as e: logger.error(f"发送策略汇总通知异常: {e}") return False def send_pullback_alerts(self, pullback_alerts: list) -> bool: """ 发送价格回踩阴线最高点的特殊提醒 Args: pullback_alerts: 回踩提醒信号列表 Returns: 发送是否成功 """ if not pullback_alerts or not self.dingtalk_notifier: return False try: current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 按5个提醒为一组分批发送 alerts_per_group = 5 import math total_groups = math.ceil(len(pullback_alerts) / alerts_per_group) success_count = 0 for group_idx in range(total_groups): start_idx = group_idx * alerts_per_group end_idx = min(start_idx + alerts_per_group, len(pullback_alerts)) group_alerts = pullback_alerts[start_idx:end_idx] # 构建标题 title = f"⚠️ 价格回踩阴线最高点提醒 ({group_idx + 1}/{total_groups})" # 构建详细信息 alert_details = [] for i, alert in enumerate(group_alerts, 1): alert_detail = f""" **{start_idx + i}. {alert['stock_code']}({alert['stock_name']})** - 📅 原信号: {alert['signal_date']} | 当前: {alert['current_date']} - ⏰ 间隔: {alert['days_since_signal']}天 | 周期: {alert['timeframe']} - 💰 阴线高点: {alert['yin_high']:.2f}元 | 当时突破价: {alert['breakout_price']:.2f}元 - 📉 当前价格: {alert['current_price']:.2f}元 | 今日最低: {alert['current_low']:.2f}元 - 📊 回调幅度: {alert['pullback_pct']:.2f}% | 距阴线高点: {alert['distance_to_yin_high']:.2f}% """ alert_details.append(alert_detail) # 构建完整的Markdown消息 markdown_text = f""" # ⚠️ 价格回踩阴线最高点提醒 **🚨 重要提醒:** 以下股票在"两阳+阴+阳"形态突破后,价格回踩至阴线最高点附近,请关注支撑情况! **📊 本批提醒数量:** {len(group_alerts)}个 **🕐 检查时间:** {current_time} --- {''.join(alert_details)} --- **💡 操作建议:** - 关注是否在阴线最高点获得有效支撑 - 如跌破阴线最高点需要重新评估形态有效性 - 建议结合成交量和其他技术指标综合判断 **⚠️ 风险提示:** 本提醒仅供参考,投资需谨慎! """ # 发送消息 if self.dingtalk_notifier.send_markdown_message(title, markdown_text): success_count += 1 logger.info(f"📱 回踩提醒第{group_idx + 1}组发送成功 ({len(group_alerts)}个提醒)") else: logger.error(f"📱 回踩提醒第{group_idx + 1}组发送失败") # 避免发送过快,添加短暂延迟 if group_idx < total_groups - 1: import time time.sleep(1) # 1秒延迟 return success_count > 0 except Exception as e: logger.error(f"发送回踩提醒通知异常: {e}") return False def send_test_message(self) -> bool: """发送测试消息""" if self.dingtalk_notifier: return self.dingtalk_notifier.send_text_message("量化交易系统通知测试 ✅") return False if __name__ == "__main__": # 测试代码 # 注意: 需要有效的钉钉webhook地址才能测试 test_config = { 'dingtalk': { 'enabled': False, # 设置为True并提供webhook_url进行测试 'webhook_url': 'https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN' } } notifier = NotificationManager(test_config) print("通知管理器初始化完成")