import json import requests from typing import List, Dict, Union, Optional from datetime import datetime from app.core.utils import CommonUtils from app.models.order import OrderStatus from app.models.order import ShippingOrderDB class WecomBot: """企业微信机器人工具类""" def __init__(self, webhook_url: str): """ 初始化机器人 Args: webhook_url: 机器人webhook地址 """ self.webhook_url = webhook_url def send_text(self, content: str, mentioned_list: List[str] = None, mentioned_mobile_list: List[str] = None) -> bool: """ 发送文本消息 Args: content: 消息内容 mentioned_list: 需要@的用户ID列表 mentioned_mobile_list: 需要@的手机号列表 Returns: bool: 是否发送成功 """ data = { "msgtype": "text", "text": { "content": content } } if mentioned_list: data["text"]["mentioned_list"] = mentioned_list if mentioned_mobile_list: data["text"]["mentioned_mobile_list"] = mentioned_mobile_list return self._send(data) def send_markdown(self, content: str) -> bool: """ 发送markdown消息 Args: content: markdown格式的消息内容 Returns: bool: 是否发送成功 """ data = { "msgtype": "markdown", "markdown": { "content": content } } return self._send(data) def send_image(self, base64: str, md5: str) -> bool: """ 发送图片消息 Args: base64: 图片base64编码 md5: 图片MD5值 Returns: bool: 是否发送成功 """ data = { "msgtype": "image", "image": { "base64": base64, "md5": md5 } } return self._send(data) def send_news(self, articles: List[Dict[str, str]]) -> bool: """ 发送图文消息 Args: articles: 图文消息列表,每个消息包含title、description、url、picurl Returns: bool: 是否发送成功 """ data = { "msgtype": "news", "news": { "articles": articles } } return self._send(data) def send_template_card(self, template_card: Dict) -> bool: """ 发送模板卡片消息 Args: template_card: 模板卡片数据 Returns: bool: 是否发送成功 """ data = { "msgtype": "template_card", "template_card": template_card } return self._send(data) def _send(self, data: Dict) -> bool: """ 发送消息 Args: data: 消息数据 Returns: bool: 是否发送成功 """ try: headers = {'Content-Type': 'application/json'} response = requests.post( self.webhook_url, headers=headers, data=json.dumps(data) ) result = response.json() if result.get('errcode') == 0: return True else: print(f"发送失败: {result.get('errmsg')}") return False except Exception as e: print(f"发送异常: {str(e)}") return False def send_order_notification(self, shipping_order: ShippingOrderDB, notify_type: OrderStatus = OrderStatus.CREATED) -> bool: """ 发送订单通知 Args: order_data: 订单数据 notify_type: 通知类型(create/complete/cancel) Returns: bool: 是否发送成功 """ try: if notify_type == OrderStatus.CREATED: title = "🆕 新订单通知" color = "green" elif notify_type == OrderStatus.COMPLETED: title = "✅ 订单完成通知" color = "blue" elif notify_type == OrderStatus.CANCELLED: title = "❌ 订单取消通知" color = "red" else: raise ValueError("不支持的通知类型") content = f"""# {title} > 订单号: {shipping_order.orderid} > 下单时间: {CommonUtils.get_asia_datetime(shipping_order.create_time)} > 包裹数量: {shipping_order.package_count}个 > 订单金额: ¥{shipping_order.original_amount} > 积分抵扣: ¥{shipping_order.point_discount_amount} > 实付金额: ¥{shipping_order.final_amount} > 配送方式: {shipping_order.delivery_method} **收件信息** > 姓名: {shipping_order.address_customer_name} > 电话: {shipping_order.address_customer_phone} > 地址: {shipping_order.address_community_name} | {shipping_order.address_community_building_name} {shipping_order.address_detail} """ return self.send_markdown(content) except Exception as e: print(f"发送订单通知异常: {str(e)}") return False