trading.ai/test_dingtalk.py
aaron 283901df18 Initial commit: A股量化交易系统
主要功能:
- K线形态策略: 两阳+阴+阳突破形态识别
- 信号时间修复: 使用K线时间而非发送时间
- 换手率约束: 最后阳线换手率不超过40%
- 汇总通知: 钉钉webhook单次发送所有信号
- 数据获取: 支持AKShare数据源
- 舆情分析: 北向资金、热门股票等

技术特性:
- 支持日线/周线/月线多时间周期
- EMA20趋势确认
- 实体比例验证
- 突破价格确认
- 流动性约束检查

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-16 15:59:48 +08:00

96 lines
2.8 KiB
Python

#!/usr/bin/env python3
"""
测试钉钉通知功能
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from utils.notification import DingTalkNotifier, NotificationManager
import yaml
def test_dingtalk_with_secret():
"""测试带加签的钉钉通知"""
print("🔧 测试钉钉加签功能...")
# 测试加签生成
webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"
secret = "SEC6e9dbd71d4addd2c4e673fb72d686293b342da5ae48da2f8ec788a68de99f981"
notifier = DingTalkNotifier(webhook_url, secret)
# 生成签名URL
signed_url = notifier._get_signed_url()
print(f"✅ 签名URL生成成功")
print(f"📄 原始URL: {webhook_url}")
print(f"🔐 签名URL: {signed_url}")
# 检查URL格式
if "timestamp=" in signed_url and "sign=" in signed_url:
print("✅ 加签参数正确添加")
else:
print("❌ 加签参数缺失")
return False
return True
def test_notification_manager():
"""测试通知管理器配置"""
print("\n🔧 测试通知管理器配置...")
# 从配置文件读取配置
try:
with open('config/config.yaml', 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
notification_config = config.get('notification', {})
print(f"✅ 配置文件加载成功")
print(f"📄 钉钉配置: {notification_config.get('dingtalk', {})}")
# 创建通知管理器
notifier_manager = NotificationManager(notification_config)
if notifier_manager.dingtalk_notifier:
print("✅ 钉钉通知器初始化成功")
if notifier_manager.dingtalk_notifier.secret:
print("✅ 加签密钥配置正确")
else:
print("❌ 加签密钥未配置")
return False
else:
print("❌ 钉钉通知器未启用")
return False
return True
except Exception as e:
print(f"❌ 配置测试失败: {e}")
return False
def main():
print("=" * 60)
print(" 钉钉通知功能测试")
print("=" * 60)
# 测试加签功能
test1_passed = test_dingtalk_with_secret()
# 测试配置管理
test2_passed = test_notification_manager()
print("\n" + "=" * 60)
print("测试结果:")
print(f"🔐 加签功能测试: {'✅ 通过' if test1_passed else '❌ 失败'}")
print(f"⚙️ 配置管理测试: {'✅ 通过' if test2_passed else '❌ 失败'}")
if test1_passed and test2_passed:
print("\n🎉 所有测试通过!钉钉通知功能配置正确")
print("💡 注意: 需要提供完整的webhook URL才能发送实际消息")
else:
print("\n❌ 部分测试失败,请检查配置")
print("=" * 60)
if __name__ == "__main__":
main()