deliveryman-api/app/core/wecombot.py
2025-02-24 11:37:51 +08:00

227 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
from typing import List, Dict, Union, Optional
from datetime import datetime
from app.core.utils import CommonUtils
from app.models.order import OrderStatus
from app.models.order import ShippingOrderDB
import aiohttp
from app.models.community import CommunityDB
from app.models.user import UserDB
from sqlalchemy.orm import Session
import logging
class WecomBot:
"""企业微信机器人工具类"""
def __init__(self, webhook_url: str = None):
"""
初始化机器人
Args:
webhook_url: 机器人webhook地址
"""
self.webhook_url = webhook_url
async def send_text(self, content: str, mentioned_list: List[str] = None, mentioned_mobile_list: List[str] = None, webhook_url: str = None) -> bool:
"""
发送文本消息
Args:
content: 消息内容
mentioned_list: 需要@的用户ID列表
mentioned_mobile_list: 需要@的手机号列表
Returns:
bool: 是否发送成功
"""
data = {
"msgtype": "text",
"text": {
"content": content
}
}
if mentioned_list:
data["text"]["mentioned_list"] = mentioned_list
if mentioned_mobile_list:
data["text"]["mentioned_mobile_list"] = mentioned_mobile_list
return await self._send(data, webhook_url)
async def send_markdown(self, content: str, webhook_url: str = None) -> bool:
"""
发送markdown消息
Args:
content: markdown格式的消息内容
Returns:
bool: 是否发送成功
"""
data = {
"msgtype": "markdown",
"markdown": {
"content": content
}
}
return await self._send(data, webhook_url)
async def send_image(self, base64: str, md5: str, webhook_url: str = None) -> bool:
"""
发送图片消息
Args:
base64: 图片base64编码
md5: 图片MD5值
Returns:
bool: 是否发送成功
"""
data = {
"msgtype": "image",
"image": {
"base64": base64,
"md5": md5
}
}
return await self._send(data)
async def send_news(self, articles: List[Dict[str, str]], webhook_url: str = None) -> bool:
"""
发送图文消息
Args:
articles: 图文消息列表每个消息包含title、description、url、picurl
Returns:
bool: 是否发送成功
"""
data = {
"msgtype": "news",
"news": {
"articles": articles
}
}
return await self._send(data, webhook_url)
async def send_template_card(self, template_card: Dict, webhook_url: str = None) -> bool:
"""
发送模板卡片消息
Args:
template_card: 模板卡片数据
Returns:
bool: 是否发送成功
"""
data = {
"msgtype": "template_card",
"template_card": template_card
}
return await self._send(data)
async def _send(self, data: Dict, webhook_url: str = None) -> bool:
"""
发送消息
Args:
data: 消息数据
Returns:
bool: 是否发送成功
"""
try:
headers = {'Content-Type': 'application/json'}
async with aiohttp.ClientSession() as session:
async with session.post(
webhook_url or self.webhook_url,
headers=headers,
data=json.dumps(data)
) as response:
result = await response.json()
if result.get('errcode') == 0:
return True
else:
print(f"发送失败: {result.get('errmsg')}")
return False
except Exception as e:
print(f"发送异常: {str(e)}")
return False
async def send_order_notification(self,db: Session, shipping_order: ShippingOrderDB, notify_type: OrderStatus = OrderStatus.CREATED) -> bool:
"""
发送订单通知
Args:
order_data: 订单数据
notify_type: 通知类型(create/complete/cancel)
Returns:
bool: 是否发送成功
"""
# 获取webhook
webhook = None
if shipping_order.address_community_id:
community = db.query(CommunityDB).filter(
CommunityDB.id == shipping_order.address_community_id
).first()
if community:
webhook = community.webot_webhook
if not webhook:
return False
deliveryman = None
if shipping_order.deliveryman_user_id:
deliveryman = db.query(UserDB).filter(
UserDB.userid == shipping_order.deliveryman_user_id
).first()
try:
if notify_type == OrderStatus.CREATED:
title = "🆕 新订单通知"
color = "yellow"
elif notify_type == OrderStatus.RECEIVED:
title = "🚚 已接单通知"
color = "blue"
elif notify_type == OrderStatus.COMPLETED:
title = "✅ 配送完成通知"
color = "green"
elif notify_type == OrderStatus.CANCELLED:
title = "❌ 订单取消通知"
color = "red"
else:
raise ValueError("不支持的通知类型")
content = f"""# {title}
> 订单号: {shipping_order.orderid}
> 下单时间: {CommonUtils.get_asia_datetime(shipping_order.create_time)}
> 包裹数量: {shipping_order.package_count}
> 订单金额: ¥{shipping_order.original_amount}
> 实付金额: ¥{shipping_order.final_amount}
> 订单状态: <font color="{color}">{shipping_order.status.status_text}</font>
**收件信息**
> 姓名: {shipping_order.address_customer_name}
> 电话: {shipping_order.address_customer_phone[:3]}\*\*\*\*{shipping_order.address_customer_phone[7:]}
> 地址: {shipping_order.address_community_name} {shipping_order.address_community_building_name} {shipping_order.address_detail[0:3]} \*\*\*\*
"""
if shipping_order.deliveryman_user_id:
content += f"""
**配送员信息**
> 姓名: {deliveryman.nickname}
> 电话: {deliveryman.phone[:3]}\*\*\*\*{deliveryman.phone[7:]}
"""
return await self.send_markdown(content, webhook)
except Exception as e:
logging.exception(f"发送企业微信消息失败: {str(e)}")
raise e