244 lines
9.9 KiB
Python
244 lines
9.9 KiB
Python
import requests
|
||
import json
|
||
import logging
|
||
import hashlib
|
||
import hmac
|
||
import base64
|
||
import urllib.parse
|
||
import time
|
||
from datetime import datetime
|
||
from typing import List, Optional
|
||
from technical_analyzer import CoinSignal
|
||
|
||
class DingTalkNotifier:
|
||
def __init__(self, webhook_url: str = None, secret: str = None):
|
||
"""初始化钉钉通知器
|
||
|
||
Args:
|
||
webhook_url: 钉钉机器人的webhook URL
|
||
secret: 钉钉机器人的加签密钥(可选)
|
||
"""
|
||
self.webhook_url = webhook_url
|
||
self.secret = secret
|
||
self.enabled = webhook_url is not None and webhook_url.strip() != ""
|
||
|
||
if not self.enabled:
|
||
logging.warning("钉钉webhook URL未配置,通知功能已禁用")
|
||
elif self.secret:
|
||
logging.info("钉钉通知已启用(加签模式)")
|
||
else:
|
||
logging.info("钉钉通知已启用(普通模式)")
|
||
|
||
def send_coin_selection_notification(self, signals: List[CoinSignal]) -> bool:
|
||
"""发送选币结果通知
|
||
|
||
Args:
|
||
signals: 选币信号列表
|
||
|
||
Returns:
|
||
bool: 发送是否成功
|
||
"""
|
||
if not self.enabled:
|
||
logging.info("钉钉通知未启用,跳过发送")
|
||
return False
|
||
|
||
if not signals:
|
||
return self._send_no_signals_notification()
|
||
|
||
try:
|
||
# 按多头和空头分组
|
||
long_signals = [s for s in signals if s.signal_type == "LONG"]
|
||
short_signals = [s for s in signals if s.signal_type == "SHORT"]
|
||
|
||
# 构建消息内容
|
||
message = self._build_selection_message(long_signals, short_signals)
|
||
|
||
# 发送消息
|
||
return self._send_message(message)
|
||
|
||
except Exception as e:
|
||
logging.error(f"发送钉钉通知失败: {e}")
|
||
return False
|
||
|
||
def _build_selection_message(self, long_signals: List[CoinSignal], short_signals: List[CoinSignal]) -> dict:
|
||
"""构建选币结果消息"""
|
||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
# 构建markdown格式消息
|
||
markdown_text = f"# 🎯 加密货币选币结果\n\n"
|
||
markdown_text += f"**时间**: {current_time}\n"
|
||
markdown_text += f"**总计**: {len(long_signals + short_signals)}个信号 (多头{len(long_signals)}个, 空头{len(short_signals)}个)\n\n"
|
||
|
||
# 多头信号
|
||
if long_signals:
|
||
markdown_text += "## 📈 多头信号\n\n"
|
||
for i, signal in enumerate(long_signals, 1):
|
||
confidence_emoji = "🔥" if signal.confidence == "高" else "⚡" if signal.confidence == "中" else "💡"
|
||
strategy_emoji = self._get_strategy_emoji(signal.strategy_type)
|
||
|
||
markdown_text += f"### {i}. {signal.symbol} {confidence_emoji}\n"
|
||
markdown_text += f"- **策略**: {strategy_emoji} {signal.strategy_type}\n"
|
||
markdown_text += f"- **评分**: {signal.score:.1f}分 ({signal.confidence}信心)\n"
|
||
markdown_text += f"- **建议**: {signal.action_suggestion}\n"
|
||
markdown_text += f"- **入场**: ${signal.entry_price:.4f}\n"
|
||
markdown_text += f"- **止损**: ${signal.stop_loss:.4f} ({self._get_percentage_change(signal.entry_price, signal.stop_loss):.1f}%)\n"
|
||
markdown_text += f"- **止盈**: ${signal.take_profit:.4f} ({self._get_percentage_change(signal.entry_price, signal.take_profit):.1f}%)\n"
|
||
markdown_text += f"- **风险回报比**: 1:{signal.risk_reward_ratio:.2f}\n"
|
||
markdown_text += f"- **持仓周期**: {signal.holding_period}天\n\n"
|
||
|
||
# 空头信号
|
||
if short_signals:
|
||
markdown_text += "## 📉 空头信号\n\n"
|
||
for i, signal in enumerate(short_signals, 1):
|
||
confidence_emoji = "🔥" if signal.confidence == "高" else "⚡" if signal.confidence == "中" else "💡"
|
||
strategy_emoji = self._get_strategy_emoji(signal.strategy_type)
|
||
|
||
markdown_text += f"### {i}. {signal.symbol} {confidence_emoji}\n"
|
||
markdown_text += f"- **策略**: {strategy_emoji} {signal.strategy_type}\n"
|
||
markdown_text += f"- **评分**: {signal.score:.1f}分 ({signal.confidence}信心)\n"
|
||
markdown_text += f"- **建议**: {signal.action_suggestion}\n"
|
||
markdown_text += f"- **入场**: ${signal.entry_price:.4f}\n"
|
||
markdown_text += f"- **止损**: ${signal.stop_loss:.4f} ({self._get_percentage_change(signal.entry_price, signal.stop_loss):.1f}%)\n"
|
||
markdown_text += f"- **止盈**: ${signal.take_profit:.4f} ({self._get_percentage_change(signal.entry_price, signal.take_profit):.1f}%)\n"
|
||
markdown_text += f"- **风险回报比**: 1:{signal.risk_reward_ratio:.2f}\n"
|
||
markdown_text += f"- **持仓周期**: {signal.holding_period}天\n\n"
|
||
|
||
# 添加风险提示
|
||
markdown_text += "---\n"
|
||
markdown_text += "⚠️ **风险提示**: 投资有风险,决策需谨慎。本信号仅供参考,不构成投资建议。\n"
|
||
markdown_text += "🤖 *由AI选币系统自动生成*"
|
||
|
||
return {
|
||
"msgtype": "markdown",
|
||
"markdown": {
|
||
"title": f"选币结果 ({len(long_signals + short_signals)}个信号)",
|
||
"text": markdown_text
|
||
}
|
||
}
|
||
|
||
def _send_no_signals_notification(self) -> bool:
|
||
"""发送无信号通知"""
|
||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
message = {
|
||
"msgtype": "text",
|
||
"text": {
|
||
"content": f"📊 加密货币选币结果\n\n"
|
||
f"时间: {current_time}\n"
|
||
f"结果: 暂无符合条件的投资机会\n"
|
||
f"建议: 继续观察市场动态\n\n"
|
||
f"🤖 由AI选币系统自动生成"
|
||
}
|
||
}
|
||
|
||
return self._send_message(message)
|
||
|
||
def _generate_sign_url(self) -> str:
|
||
"""生成带加签的webhook URL"""
|
||
if not self.secret:
|
||
return self.webhook_url
|
||
|
||
# 当前时间戳(毫秒)
|
||
timestamp = str(round(time.time() * 1000))
|
||
|
||
# 拼接字符串
|
||
string_to_sign = f"{timestamp}\n{self.secret}"
|
||
string_to_sign_enc = string_to_sign.encode('utf-8')
|
||
secret_enc = self.secret.encode('utf-8')
|
||
|
||
# HMAC-SHA256加密
|
||
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
|
||
|
||
# Base64编码
|
||
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
|
||
|
||
# 拼接最终URL
|
||
if '?' in self.webhook_url:
|
||
signed_url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
|
||
else:
|
||
signed_url = f"{self.webhook_url}?timestamp={timestamp}&sign={sign}"
|
||
|
||
return signed_url
|
||
|
||
def _send_message(self, message: dict) -> bool:
|
||
"""发送消息到钉钉"""
|
||
try:
|
||
# 生成签名URL(如果配置了密钥)
|
||
url = self._generate_sign_url()
|
||
|
||
headers = {'Content-Type': 'application/json'}
|
||
response = requests.post(
|
||
url,
|
||
data=json.dumps(message),
|
||
headers=headers,
|
||
timeout=10
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('errcode') == 0:
|
||
logging.info("钉钉通知发送成功")
|
||
return True
|
||
else:
|
||
logging.error(f"钉钉通知发送失败: {result.get('errmsg', '未知错误')}")
|
||
return False
|
||
else:
|
||
logging.error(f"钉钉通知HTTP请求失败: {response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logging.error(f"发送钉钉消息异常: {e}")
|
||
return False
|
||
|
||
def _get_strategy_emoji(self, strategy: str) -> str:
|
||
"""获取策略对应的emoji"""
|
||
strategy_emojis = {
|
||
"超短线": "⚡",
|
||
"短线": "🏃",
|
||
"中线": "🚶",
|
||
"波段": "🌊",
|
||
"长线": "🐢",
|
||
"趋势": "📈"
|
||
}
|
||
return strategy_emojis.get(strategy, "📊")
|
||
|
||
def _get_percentage_change(self, entry_price: float, target_price: float) -> float:
|
||
"""计算价格变化百分比"""
|
||
if entry_price == 0:
|
||
return 0
|
||
return ((target_price - entry_price) / entry_price) * 100
|
||
|
||
def send_test_message(self) -> bool:
|
||
"""发送测试消息"""
|
||
if not self.enabled:
|
||
print("钉钉webhook URL未配置,无法发送测试消息")
|
||
return False
|
||
|
||
test_message = {
|
||
"msgtype": "text",
|
||
"text": {
|
||
"content": f"🤖 加密货币选币系统测试消息\n\n"
|
||
f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||
f"状态: 系统运行正常\n"
|
||
f"功能: 钉钉通知已配置成功"
|
||
}
|
||
}
|
||
|
||
success = self._send_message(test_message)
|
||
if success:
|
||
print("✅ 钉钉测试消息发送成功")
|
||
else:
|
||
print("❌ 钉钉测试消息发送失败")
|
||
|
||
return success
|
||
|
||
# 测试脚本
|
||
if __name__ == "__main__":
|
||
# 测试用例
|
||
webhook_url = input("请输入钉钉机器人webhook URL: ").strip()
|
||
|
||
if webhook_url:
|
||
notifier = DingTalkNotifier(webhook_url)
|
||
notifier.send_test_message()
|
||
else:
|
||
print("未提供webhook URL,退出测试") |