stock-ai-agent/backend/app/astock_agent/notifier.py
2026-02-27 09:54:17 +08:00

339 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
钉钉通知模块
格式化并发送板块异动通知
"""
import json
import hmac
import hashlib
import base64
import time
import requests
from typing import Dict, List
from datetime import datetime
from urllib.parse import quote
from app.utils.logger import logger
class DingTalkNotifier:
"""钉钉通知器"""
def __init__(self, webhook: str, secret: str = None):
"""
初始化通知器
Args:
webhook: 钉钉机器人 Webhook URL
secret: 加签密钥(可选)
"""
self.webhook = webhook
self.secret = secret
def _sign(self, timestamp: int) -> str:
"""
生成签名
Args:
timestamp: 时间戳(毫秒)
Returns:
签名字符串
"""
if not self.secret:
return ""
secret_enc = self.secret.encode('utf-8')
string_to_sign = f'{timestamp}\n{self.secret}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = base64.b64encode(hmac_code).decode('utf-8')
return sign
def _build_url(self) -> str:
"""
构建带签名的 Webhook URL
Returns:
完整的 Webhook URL
"""
if not self.secret:
return self.webhook
timestamp = int(time.time() * 1000)
sign = self._sign(timestamp)
sign_encoded = quote(sign, safe='')
return f"{self.webhook}&timestamp={timestamp}&sign={sign_encoded}"
def send_sector_alert(self, sector_data: Dict, top_stocks: List[Dict], reason: str = "") -> bool:
"""
发送板块异动提醒
Args:
sector_data: 板块数据
top_stocks: 龙头股列表
reason: 异动原因
Returns:
是否发送成功
"""
try:
# 构建消息卡片
card = self._format_sector_card(sector_data, top_stocks, reason)
# 构建请求数据
data = {
"msgtype": "markdown",
"markdown": {
"title": f"🔥 {sector_data['name']} 异动提醒",
"text": card
}
}
# 构建带签名的 URL
url = self._build_url()
# 发送请求
headers = {"Content-Type": "application/json;charset=utf-8"}
response = requests.post(
url,
data=json.dumps(data),
headers=headers,
timeout=10
)
result = response.json()
if result.get("errcode") == 0:
logger.info(f"钉钉通知发送成功: {sector_data['name']}")
return True
else:
logger.error(f"钉钉通知发送失败: {result}")
return False
except Exception as e:
logger.error(f"发送钉钉通知异常: {e}")
return False
def _format_sector_card(self, sector_data: Dict, top_stocks: List[Dict], reason: str) -> str:
"""
格式化板块异动卡片
Args:
sector_data: 板块数据
top_stocks: 龙头股列表
reason: 异动原因
Returns:
Markdown 格式的消息内容
"""
lines = []
# 标题
lines.append("### 🔥 A股板块异动提醒")
lines.append("")
# 基本信息
change_pct = sector_data['change_pct']
change_icon = "📈" if change_pct > 0 else "📉"
lines.append(f"**异动板块**: {sector_data['name']} {change_icon} {change_pct:+.2f}%")
lines.append(f"**异动时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"**异动类型**: 涨幅突增 | {reason if reason else '资金集中流入'}")
lines.append("")
# 板块概况
lines.append("#### 📊 板块概况")
lines.append(f"- 涨幅: {change_pct:+.2f}%")
lines.append(f"- 涨跌额: {sector_data.get('change_amount', 0):+.2f}")
if sector_data.get('amount', 0) > 0:
amount = sector_data['amount']
if amount >= 100000:
amount_str = f"{amount/100000:.1f}亿"
else:
amount_str = f"{amount/10000:.1f}"
lines.append(f"- 成交额: {amount_str}")
if sector_data.get('leading_stock'):
lines.append(f"- 领涨股: {sector_data['leading_stock']}")
lines.append("")
# 龙头股
if top_stocks:
lines.append("#### 🏆 龙头股 Top " + str(len(top_stocks)))
lines.append("")
for idx, stock in enumerate(top_stocks, 1):
# 价格格式化
price = stock['price']
change_pct = stock['change_pct']
# 涨跌幅图标
if change_pct >= 9.9:
change_icon = "🚀"
elif change_pct >= 5:
change_icon = ""
elif change_pct > 0:
change_icon = "📈"
elif change_pct > -3:
change_icon = ""
else:
change_icon = "📉"
lines.append(f"**{idx}. {stock['name']} ({stock['code']})**")
lines.append(f" 现价: ¥{price:.2f} ({change_icon} {change_pct:+.2f}%)")
lines.append(f" 成交额: {self._format_amount(stock['amount'])}")
lines.append(f" 换手率: {stock['turnover']:.2f}%")
lines.append(f" 涨速: {stock['speed_level']}")
if stock.get('volume_ratio', 1) > 2:
lines.append(f" 量比: {stock['volume_ratio']:.2f} 🔥")
lines.append("")
lines.append("---")
lines.append(f"📊 综合评分: {top_stocks[0]['score']:.1f}")
return "\n".join(lines)
def _format_amount(self, amount: float) -> str:
"""
格式化成交额
Args:
amount: 成交额(元)
Returns:
格式化后的字符串
"""
if amount >= 100000000:
return f"{amount/100000000:.2f}亿"
elif amount >= 10000:
return f"{amount/10000:.2f}"
else:
return f"{amount:.0f}"
def send_summary(self, total_sectors: int, total_stocks: int) -> bool:
"""
发送监控汇总
Args:
total_sectors: 异动板块总数
total_stocks: 龙头股总数
Returns:
是否发送成功
"""
try:
lines = []
lines.append("### 📋 A股板块监控汇总")
lines.append("")
lines.append(f"**监控时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
lines.append("")
lines.append("#### 📊 今日统计")
lines.append(f"- 异动板块: {total_sectors}")
lines.append(f"- 龙头股: {total_stocks}")
lines.append("")
lines.append("---")
lines.append(f"⏰ 下次更新: {datetime.now().strftime('%H:%M')}")
card = "\n".join(lines)
# 构建请求数据
data = {
"msgtype": "markdown",
"markdown": {
"title": "📋 A股板块监控汇总",
"text": card
}
}
# 构建带签名的 URL
url = self._build_url()
# 发送请求
headers = {"Content-Type": "application/json;charset=utf-8"}
response = requests.post(
url,
data=json.dumps(data),
headers=headers,
timeout=10
)
result = response.json()
if result.get("errcode") == 0:
logger.info(f"钉钉汇总发送成功")
return True
else:
logger.error(f"钉钉汇总发送失败: {result}")
return False
except Exception as e:
logger.error(f"发送钉钉汇总异常: {e}")
return False
def send_error(self, error_msg: str) -> bool:
"""
发送错误通知
Args:
error_msg: 错误信息
Returns:
是否发送成功
"""
try:
lines = []
lines.append("### ❌ A股板块监控异常")
lines.append("")
lines.append(f"**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
lines.append("")
lines.append(f"```\n{error_msg}\n```")
card = "\n".join(lines)
# 构建请求数据
data = {
"msgtype": "markdown",
"markdown": {
"title": "❌ A股板块监控异常",
"text": card
}
}
# 构建带签名的 URL
url = self._build_url()
# 发送请求
headers = {"Content-Type": "application/json;charset=utf-8"}
response = requests.post(
url,
data=json.dumps(data),
headers=headers,
timeout=10
)
result = response.json()
return result.get("errcode") == 0
except Exception as e:
logger.error(f"发送错误通知异常: {e}")
return False
# 全局单例
_notifier: DingTalkNotifier = None
def get_dingtalk_notifier() -> DingTalkNotifier:
"""获取钉钉通知器单例"""
global _notifier
if _notifier is None:
from app.config import get_settings
settings = get_settings()
# 优先使用A股专用配置否则使用通用配置
webhook = settings.dingtalk_astock_webhook or settings.dingtalk_webhook_url
secret = settings.dingtalk_astock_secret or settings.dingtalk_secret
if webhook:
_notifier = DingTalkNotifier(webhook, secret)
return _notifier