stock-ai-agent/backend/app/services/dingtalk_service.py
2026-02-26 20:46:56 +08:00

320 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
钉钉通知服务 - 通过群机器人发送消息
"""
import httpx
import hmac
import hashlib
import base64
import json
from typing import Dict, Any, Optional
from datetime import datetime
from urllib.parse import quote
from app.utils.logger import logger
from app.config import get_settings
class DingTalkService:
"""钉钉群机器人通知服务"""
def __init__(self, webhook_url: str = "", secret: str = ""):
"""
初始化钉钉服务
Args:
webhook_url: 钉钉群机器人 Webhook URL
secret: 加签密钥
"""
settings = get_settings()
self.webhook_url = webhook_url or getattr(settings, 'dingtalk_webhook_url', '')
self.secret = secret or getattr(settings, 'dingtalk_secret', '')
# 检查配置开关和必要参数是否都有效
config_enabled = getattr(settings, 'dingtalk_enabled', True)
self.enabled = config_enabled and bool(self.webhook_url)
if not config_enabled:
logger.info("钉钉通知已通过配置禁用")
elif self.enabled:
logger.info(f"钉钉通知服务初始化完成")
else:
logger.warning("钉钉 Webhook URL 未配置,通知功能已禁用")
def _generate_sign(self, timestamp: int) -> str:
"""
生成钉钉签名
Args:
timestamp: 当前时间戳(毫秒)
Returns:
签名字符串
"""
if not self.secret:
return ""
secret_enc = self.secret.encode('utf-8')
string_to_sign = f'{timestamp}\n{self.secret}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = base64.b64encode(hmac_code).decode('utf-8')
return sign
def _build_url(self) -> str:
"""
构建带签名的 Webhook URL
Returns:
完整的 Webhook URL
"""
if not self.secret:
return self.webhook_url
timestamp = int(datetime.now().timestamp() * 1000)
sign = self._generate_sign(timestamp)
sign_encoded = quote(sign, safe='')
return f"{self.webhook_url}&timestamp={timestamp}&sign={sign_encoded}"
async def send_text(self, content: str, at_mobiles: list = None, at_user_ids: list = None) -> bool:
"""
发送文本消息
Args:
content: 消息内容
at_mobiles: @的手机号列表
at_user_ids: @的用户ID列表
Returns:
是否发送成功
"""
if not self.enabled:
logger.warning("钉钉服务未启用,跳过发送")
return False
data = {
"msgtype": "text",
"text": {
"content": content
}
}
# 添加 @ 信息
if at_mobiles or at_user_ids:
data["at"] = {
"atMobiles": at_mobiles or [],
"atUserIds": at_user_ids or [],
"isAtAll": False
}
return await self._send(data)
async def send_markdown(self, title: str, content: str) -> bool:
"""
发送 Markdown 消息
Args:
title: 消息标题
content: Markdown 格式的内容
Returns:
是否发送成功
"""
if not self.enabled:
logger.warning("钉钉服务未启用,跳过发送")
return False
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": content
}
}
return await self._send(data)
async def send_link(self, title: str, text: str, message_url: str, pic_url: str = "") -> bool:
"""
发送链接消息
Args:
title: 消息标题
text: 消息内容
message_url: 点击消息跳转的 URL
pic_url: 图片 URL
Returns:
是否发送成功
"""
if not self.enabled:
logger.warning("钉钉服务未启用,跳过发送")
return False
data = {
"msgtype": "link",
"link": {
"title": title,
"text": text,
"messageUrl": message_url
}
}
if pic_url:
data["link"]["picUrl"] = pic_url
return await self._send(data)
async def send_action_card(self, title: str, content: str, btn_orientation: str = "0",
btn_title: str = "", btn_url: str = "") -> bool:
"""
发送 ActionCard 消息(卡片消息)
Args:
title: 标题
content: Markdown 格式的内容
btn_orientation: 按钮排列方向0-竖直1-横向
btn_title: 按钮标题
btn_url: 按钮跳转链接
Returns:
是否发送成功
"""
if not self.enabled:
logger.warning("钉钉服务未启用,跳过发送")
return False
data = {
"msgtype": "actionCard",
"actionCard": {
"title": title,
"text": content,
"btnOrientation": btn_orientation
}
}
if btn_title and btn_url:
data["actionCard"]["btns"] = [
{
"title": btn_title,
"actionURL": btn_url
}
]
return await self._send(data)
async def send_feed_card(self, links: list) -> bool:
"""
发送 FeedCard 消息(多条链接)
Args:
links: 链接列表,每个元素包含 title, messageURL, picURL
Returns:
是否发送成功
"""
if not self.enabled:
logger.warning("钉钉服务未启用,跳过发送")
return False
data = {
"msgtype": "feedCard",
"feedCard": {
"links": links
}
}
return await self._send(data)
async def send_trading_signal(self, signal: Dict[str, Any]) -> bool:
"""
发送交易信号消息Markdown 格式)
Args:
signal: 交易信号数据
Returns:
是否发送成功
"""
if not self.enabled:
logger.warning("钉钉服务未启用,跳过发送")
return False
action = signal.get('action', 'hold')
symbol = signal.get('symbol', 'UNKNOWN')
price = signal.get('price', 0)
trend = signal.get('trend', 'neutral')
confidence = signal.get('confidence', 0)
agent_type = signal.get('agent_type', 'crypto')
# 操作方向映射
action_map = {
'buy': '🟢 做多',
'sell': '🔴 做空',
'hold': '⏸️ 观望',
'close': '❌ 平仓'
}
# 市场类型映射
market_map = {
'crypto': '[加密货币]',
'stock': '[股票]'
}
action_text = action_map.get(action, action)
market_text = market_map.get(agent_type, '')
title = f"{market_text} {symbol} {action_text} 信号"
content = f"""### {title}
> **操作**: {action_text}
> **价格**: ${price:,.2f}
> **趋势**: {trend}
> **信心度**: {confidence}%
*信号来源: Stock Agent*
"""
return await self.send_markdown(title, content)
async def _send(self, data: Dict[str, Any]) -> bool:
"""
发送消息到钉钉
Args:
data: 消息数据
Returns:
是否发送成功
"""
try:
url = self._build_url()
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(url, json=data)
result = response.json()
if result.get('errcode') == 0:
logger.info(f"钉钉消息发送成功")
return True
else:
logger.error(f"钉钉消息发送失败: {result.get('errmsg', 'Unknown error')}")
return False
except Exception as e:
logger.error(f"钉钉消息发送异常: {e}")
return False
# 全局单例
_dingtalk_service: Optional[DingTalkService] = None
def get_dingtalk_service() -> DingTalkService:
"""获取钉钉服务单例"""
global _dingtalk_service
if _dingtalk_service is None:
_dingtalk_service = DingTalkService()
return _dingtalk_service