trading.ai/dingtalk_notifier.py
2025-08-14 10:06:19 +08:00

244 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import logging
import hashlib
import hmac
import base64
import urllib.parse
import time
from datetime import datetime
from typing import List, Optional
from technical_analyzer import CoinSignal
class DingTalkNotifier:
def __init__(self, webhook_url: str = None, secret: str = None):
"""初始化钉钉通知器
Args:
webhook_url: 钉钉机器人的webhook URL
secret: 钉钉机器人的加签密钥(可选)
"""
self.webhook_url = webhook_url
self.secret = secret
self.enabled = webhook_url is not None and webhook_url.strip() != ""
if not self.enabled:
logging.warning("钉钉webhook URL未配置通知功能已禁用")
elif self.secret:
logging.info("钉钉通知已启用(加签模式)")
else:
logging.info("钉钉通知已启用(普通模式)")
def send_coin_selection_notification(self, signals: List[CoinSignal]) -> bool:
"""发送选币结果通知
Args:
signals: 选币信号列表
Returns:
bool: 发送是否成功
"""
if not self.enabled:
logging.info("钉钉通知未启用,跳过发送")
return False
if not signals:
return self._send_no_signals_notification()
try:
# 按多头和空头分组
long_signals = [s for s in signals if s.signal_type == "LONG"]
short_signals = [s for s in signals if s.signal_type == "SHORT"]
# 构建消息内容
message = self._build_selection_message(long_signals, short_signals)
# 发送消息
return self._send_message(message)
except Exception as e:
logging.error(f"发送钉钉通知失败: {e}")
return False
def _build_selection_message(self, long_signals: List[CoinSignal], short_signals: List[CoinSignal]) -> dict:
"""构建选币结果消息"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 构建markdown格式消息
markdown_text = f"# 🎯 加密货币选币结果\n\n"
markdown_text += f"**时间**: {current_time}\n"
markdown_text += f"**总计**: {len(long_signals + short_signals)}个信号 (多头{len(long_signals)}个, 空头{len(short_signals)}个)\n\n"
# 多头信号
if long_signals:
markdown_text += "## 📈 多头信号\n\n"
for i, signal in enumerate(long_signals, 1):
confidence_emoji = "🔥" if signal.confidence == "" else "" if signal.confidence == "" else "💡"
strategy_emoji = self._get_strategy_emoji(signal.strategy_type)
markdown_text += f"### {i}. {signal.symbol} {confidence_emoji}\n"
markdown_text += f"- **策略**: {strategy_emoji} {signal.strategy_type}\n"
markdown_text += f"- **评分**: {signal.score:.1f}分 ({signal.confidence}信心)\n"
markdown_text += f"- **建议**: {signal.action_suggestion}\n"
markdown_text += f"- **入场**: ${signal.entry_price:.4f}\n"
markdown_text += f"- **止损**: ${signal.stop_loss:.4f} ({self._get_percentage_change(signal.entry_price, signal.stop_loss):.1f}%)\n"
markdown_text += f"- **止盈**: ${signal.take_profit:.4f} ({self._get_percentage_change(signal.entry_price, signal.take_profit):.1f}%)\n"
markdown_text += f"- **风险回报比**: 1:{signal.risk_reward_ratio:.2f}\n"
markdown_text += f"- **持仓周期**: {signal.holding_period}\n\n"
# 空头信号
if short_signals:
markdown_text += "## 📉 空头信号\n\n"
for i, signal in enumerate(short_signals, 1):
confidence_emoji = "🔥" if signal.confidence == "" else "" if signal.confidence == "" else "💡"
strategy_emoji = self._get_strategy_emoji(signal.strategy_type)
markdown_text += f"### {i}. {signal.symbol} {confidence_emoji}\n"
markdown_text += f"- **策略**: {strategy_emoji} {signal.strategy_type}\n"
markdown_text += f"- **评分**: {signal.score:.1f}分 ({signal.confidence}信心)\n"
markdown_text += f"- **建议**: {signal.action_suggestion}\n"
markdown_text += f"- **入场**: ${signal.entry_price:.4f}\n"
markdown_text += f"- **止损**: ${signal.stop_loss:.4f} ({self._get_percentage_change(signal.entry_price, signal.stop_loss):.1f}%)\n"
markdown_text += f"- **止盈**: ${signal.take_profit:.4f} ({self._get_percentage_change(signal.entry_price, signal.take_profit):.1f}%)\n"
markdown_text += f"- **风险回报比**: 1:{signal.risk_reward_ratio:.2f}\n"
markdown_text += f"- **持仓周期**: {signal.holding_period}\n\n"
# 添加风险提示
markdown_text += "---\n"
markdown_text += "⚠️ **风险提示**: 投资有风险,决策需谨慎。本信号仅供参考,不构成投资建议。\n"
markdown_text += "🤖 *由AI选币系统自动生成*"
return {
"msgtype": "markdown",
"markdown": {
"title": f"选币结果 ({len(long_signals + short_signals)}个信号)",
"text": markdown_text
}
}
def _send_no_signals_notification(self) -> bool:
"""发送无信号通知"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = {
"msgtype": "text",
"text": {
"content": f"📊 加密货币选币结果\n\n"
f"时间: {current_time}\n"
f"结果: 暂无符合条件的投资机会\n"
f"建议: 继续观察市场动态\n\n"
f"🤖 由AI选币系统自动生成"
}
}
return self._send_message(message)
def _generate_sign_url(self) -> str:
"""生成带加签的webhook URL"""
if not self.secret:
return self.webhook_url
# 当前时间戳(毫秒)
timestamp = str(round(time.time() * 1000))
# 拼接字符串
string_to_sign = f"{timestamp}\n{self.secret}"
string_to_sign_enc = string_to_sign.encode('utf-8')
secret_enc = self.secret.encode('utf-8')
# HMAC-SHA256加密
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
# Base64编码
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
# 拼接最终URL
if '?' in self.webhook_url:
signed_url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
else:
signed_url = f"{self.webhook_url}?timestamp={timestamp}&sign={sign}"
return signed_url
def _send_message(self, message: dict) -> bool:
"""发送消息到钉钉"""
try:
# 生成签名URL如果配置了密钥
url = self._generate_sign_url()
headers = {'Content-Type': 'application/json'}
response = requests.post(
url,
data=json.dumps(message),
headers=headers,
timeout=10
)
if response.status_code == 200:
result = response.json()
if result.get('errcode') == 0:
logging.info("钉钉通知发送成功")
return True
else:
logging.error(f"钉钉通知发送失败: {result.get('errmsg', '未知错误')}")
return False
else:
logging.error(f"钉钉通知HTTP请求失败: {response.status_code}")
return False
except Exception as e:
logging.error(f"发送钉钉消息异常: {e}")
return False
def _get_strategy_emoji(self, strategy: str) -> str:
"""获取策略对应的emoji"""
strategy_emojis = {
"超短线": "",
"短线": "🏃",
"中线": "🚶",
"波段": "🌊",
"长线": "🐢",
"趋势": "📈"
}
return strategy_emojis.get(strategy, "📊")
def _get_percentage_change(self, entry_price: float, target_price: float) -> float:
"""计算价格变化百分比"""
if entry_price == 0:
return 0
return ((target_price - entry_price) / entry_price) * 100
def send_test_message(self) -> bool:
"""发送测试消息"""
if not self.enabled:
print("钉钉webhook URL未配置无法发送测试消息")
return False
test_message = {
"msgtype": "text",
"text": {
"content": f"🤖 加密货币选币系统测试消息\n\n"
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
f"状态: 系统运行正常\n"
f"功能: 钉钉通知已配置成功"
}
}
success = self._send_message(test_message)
if success:
print("✅ 钉钉测试消息发送成功")
else:
print("❌ 钉钉测试消息发送失败")
return success
# 测试脚本
if __name__ == "__main__":
# 测试用例
webhook_url = input("请输入钉钉机器人webhook URL: ").strip()
if webhook_url:
notifier = DingTalkNotifier(webhook_url)
notifier.send_test_message()
else:
print("未提供webhook URL退出测试")