227 lines
7.0 KiB
Python
227 lines
7.0 KiB
Python
import json
|
||
import requests
|
||
from typing import List, Dict, Union, Optional
|
||
from datetime import datetime
|
||
from app.core.utils import CommonUtils
|
||
from app.models.order import OrderStatus
|
||
from app.models.order import ShippingOrderDB
|
||
import aiohttp
|
||
from app.models.community import CommunityDB
|
||
from app.models.user import UserDB
|
||
from sqlalchemy.orm import Session
|
||
import logging
|
||
|
||
class WecomBot:
|
||
"""企业微信机器人工具类"""
|
||
|
||
def __init__(self, webhook_url: str = None):
|
||
"""
|
||
初始化机器人
|
||
|
||
Args:
|
||
webhook_url: 机器人webhook地址
|
||
"""
|
||
self.webhook_url = webhook_url
|
||
|
||
async def send_text(self, content: str, mentioned_list: List[str] = None, mentioned_mobile_list: List[str] = None, webhook_url: str = None) -> bool:
|
||
"""
|
||
发送文本消息
|
||
|
||
Args:
|
||
content: 消息内容
|
||
mentioned_list: 需要@的用户ID列表
|
||
mentioned_mobile_list: 需要@的手机号列表
|
||
|
||
Returns:
|
||
bool: 是否发送成功
|
||
"""
|
||
data = {
|
||
"msgtype": "text",
|
||
"text": {
|
||
"content": content
|
||
}
|
||
}
|
||
|
||
if mentioned_list:
|
||
data["text"]["mentioned_list"] = mentioned_list
|
||
if mentioned_mobile_list:
|
||
data["text"]["mentioned_mobile_list"] = mentioned_mobile_list
|
||
|
||
return await self._send(data, webhook_url)
|
||
|
||
async def send_markdown(self, content: str, webhook_url: str = None) -> bool:
|
||
"""
|
||
发送markdown消息
|
||
|
||
Args:
|
||
content: markdown格式的消息内容
|
||
|
||
Returns:
|
||
bool: 是否发送成功
|
||
"""
|
||
data = {
|
||
"msgtype": "markdown",
|
||
"markdown": {
|
||
"content": content
|
||
}
|
||
}
|
||
return await self._send(data, webhook_url)
|
||
|
||
async def send_image(self, base64: str, md5: str, webhook_url: str = None) -> bool:
|
||
"""
|
||
发送图片消息
|
||
|
||
Args:
|
||
base64: 图片base64编码
|
||
md5: 图片MD5值
|
||
|
||
Returns:
|
||
bool: 是否发送成功
|
||
"""
|
||
data = {
|
||
"msgtype": "image",
|
||
"image": {
|
||
"base64": base64,
|
||
"md5": md5
|
||
}
|
||
}
|
||
return await self._send(data)
|
||
|
||
async def send_news(self, articles: List[Dict[str, str]], webhook_url: str = None) -> bool:
|
||
"""
|
||
发送图文消息
|
||
|
||
Args:
|
||
articles: 图文消息列表,每个消息包含title、description、url、picurl
|
||
|
||
Returns:
|
||
bool: 是否发送成功
|
||
"""
|
||
data = {
|
||
"msgtype": "news",
|
||
"news": {
|
||
"articles": articles
|
||
}
|
||
}
|
||
return await self._send(data, webhook_url)
|
||
|
||
async def send_template_card(self, template_card: Dict, webhook_url: str = None) -> bool:
|
||
"""
|
||
发送模板卡片消息
|
||
|
||
Args:
|
||
template_card: 模板卡片数据
|
||
|
||
Returns:
|
||
bool: 是否发送成功
|
||
"""
|
||
data = {
|
||
"msgtype": "template_card",
|
||
"template_card": template_card
|
||
}
|
||
return await self._send(data)
|
||
|
||
async def _send(self, data: Dict, webhook_url: str = None) -> bool:
|
||
"""
|
||
发送消息
|
||
|
||
Args:
|
||
data: 消息数据
|
||
|
||
Returns:
|
||
bool: 是否发送成功
|
||
"""
|
||
try:
|
||
headers = {'Content-Type': 'application/json'}
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.post(
|
||
webhook_url or self.webhook_url,
|
||
headers=headers,
|
||
data=json.dumps(data)
|
||
) as response:
|
||
result = await response.json()
|
||
|
||
if result.get('errcode') == 0:
|
||
return True
|
||
else:
|
||
print(f"发送失败: {result.get('errmsg')}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"发送异常: {str(e)}")
|
||
return False
|
||
|
||
def send_order_notification(self,db: Session, shipping_order: ShippingOrderDB, notify_type: OrderStatus = OrderStatus.CREATED) -> bool:
|
||
"""
|
||
发送订单通知
|
||
|
||
Args:
|
||
order_data: 订单数据
|
||
notify_type: 通知类型(create/complete/cancel)
|
||
|
||
Returns:
|
||
bool: 是否发送成功
|
||
"""
|
||
|
||
# 获取webhook
|
||
webhook = None
|
||
if shipping_order.address_community_id:
|
||
community = db.query(CommunityDB).filter(
|
||
CommunityDB.id == shipping_order.address_community_id
|
||
).first()
|
||
if community:
|
||
webhook = community.webot_webhook
|
||
|
||
if not webhook:
|
||
raise ValueError("不支持的通知类型")
|
||
|
||
deliveryman = None
|
||
if shipping_order.deliveryman_user_id:
|
||
deliveryman = db.query(UserDB).filter(
|
||
UserDB.userid == shipping_order.deliveryman_user_id
|
||
).first()
|
||
|
||
try:
|
||
if notify_type == OrderStatus.CREATED:
|
||
title = "🆕 新订单通知"
|
||
color = "yellow"
|
||
elif notify_type == OrderStatus.RECEIVED:
|
||
title = "🚚 已接单通知"
|
||
color = "blue"
|
||
elif notify_type == OrderStatus.COMPLETED:
|
||
title = "✅ 配送完成通知"
|
||
color = "green"
|
||
elif notify_type == OrderStatus.CANCELLED:
|
||
title = "❌ 订单取消通知"
|
||
color = "red"
|
||
else:
|
||
raise ValueError("不支持的通知类型")
|
||
|
||
content = f"""# {title}
|
||
|
||
> 订单号: {shipping_order.orderid}
|
||
> 下单时间: {CommonUtils.get_asia_datetime(shipping_order.create_time)}
|
||
> 包裹数量: {shipping_order.package_count}个
|
||
> 订单金额: ¥{shipping_order.original_amount}
|
||
> 实付金额: ¥{shipping_order.final_amount}
|
||
> 订单状态: <font color="{color}">{shipping_order.status.status_text}</font>
|
||
|
||
**收件信息**
|
||
> 姓名: {shipping_order.address_customer_name}
|
||
> 电话: {shipping_order.address_customer_phone[:3]}\*\*\*\*{shipping_order.address_customer_phone[7:]}
|
||
> 地址: {shipping_order.address_community_name} | {shipping_order.address_community_building_name} {shipping_order.address_detail[0:3]} \*\*\*\*
|
||
"""
|
||
|
||
if shipping_order.deliveryman_user_id:
|
||
content += f"""
|
||
**配送员信息**
|
||
> 姓名: {deliveryman.nickname}
|
||
> 电话: {deliveryman.phone[:3]}\*\*\*\*{deliveryman.phone[7:]}
|
||
"""
|
||
|
||
return self.send_markdown(content, webhook)
|
||
|
||
except Exception as e:
|
||
logging.exception(f"发送企业微信消息失败: {str(e)}")
|
||
raise e |