""" 测试钉钉通知服务 """ import asyncio import sys import os # 添加项目路径 sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from app.services.dingtalk_service import get_dingtalk_service async def test_dingtalk(): """测试钉钉消息发送""" dingtalk = get_dingtalk_service() print("=" * 50) print("测试钉钉通知服务") print("=" * 50) print(f"服务状态: {'启用' if dingtalk.enabled else '禁用'}") print(f"Webhook URL: {dingtalk.webhook_url[:50]}...") print(f"Secret: {dingtalk.secret[:20]}...") print() # 测试 1: 发送文本消息 print("测试 1: 发送文本消息...") success = await dingtalk.send_text("📊 钉钉通知服务测试\n\n这是一条测试消息,来自 Stock Agent 系统。") print(f"结果: {'✅ 成功' if success else '❌ 失败'}") print() # 等待一下 await asyncio.sleep(2) # 测试 2: 发送 Markdown 消息 print("测试 2: 发送 Markdown 消息...") markdown_content = """### 📈 交易信号测试 > **交易对**: BTCUSDT > **方向**: 🟢 做多 > **价格**: $95,000.00 > **信心度**: 85% **分析理由**: - 突破关键阻力位 - 量价配合良好 - 多周期共振向上 *来自 Stock Agent 系统* """ success = await dingtalk.send_markdown("交易信号", markdown_content) print(f"结果: {'✅ 成功' if success else '❌ 失败'}") print() await asyncio.sleep(2) # 测试 3: 发送 ActionCard 消息 print("测试 3: 发送 ActionCard 消息...") card_content = """### 📊 模拟交易报告 #### 统计概览 - 总交易次数: 50 - 胜率: 65% - 总盈亏: +15.2% #### 最近交易 | 交易对 | 方向 | 盈亏 | |--------|------|------| | BTCUSDT | 做多 | +5.2% | | ETHUSDT | 做空 | +2.1% | """ success = await dingtalk.send_action_card( title="📊 4小时交易报告", content=card_content, btn_orientation="0", btn_title="查看详情", btn_url="https://example.com" ) print(f"结果: {'✅ 成功' if success else '❌ 失败'}") print() # 测试 4: 发送交易信号 print("测试 4: 发送交易信号...") signal = { 'action': 'buy', 'symbol': 'BTCUSDT', 'price': 95000, 'trend': 'uptrend', 'confidence': 85, 'agent_type': 'crypto' } success = await dingtalk.send_trading_signal(signal) print(f"结果: {'✅ 成功' if success else '❌ 失败'}") print() print("=" * 50) print("测试完成!") print("=" * 50) if __name__ == "__main__": asyncio.run(test_dingtalk())