import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Union

import aiohttp
import requests
from sqlalchemy.orm import Session

from app.core.utils import CommonUtils
from app.models.order import OrderStatus
from app.models.order import ShippingOrderDB
from app.models.community import CommunityDB
from app.models.user import UserDB
from app.core.config import settings

logger = logging.getLogger(__name__)


class WecomBot:
    """Helper for posting messages to a WeCom (Enterprise WeChat) group-robot webhook.

    Every ``send_*`` method builds the JSON payload for one message type and
    delegates to :meth:`_send`.  A ``webhook_url`` passed to a method takes
    precedence over the one given to the constructor.
    """

    # Upper bound, in seconds, for one webhook round trip so that a stalled
    # endpoint cannot block the calling coroutine indefinitely.
    REQUEST_TIMEOUT_SECONDS = 10

    # Markdown-escaped "****" used to mask sensitive parts of phone numbers
    # and addresses (emits a literal backslash before each asterisk).
    _MASK = "\\*\\*\\*\\*"

    # (status, title) pairs for the order notifications this bot can render.
    _ORDER_TITLES = (
        (OrderStatus.CREATED, "🆕 新订单通知"),
        (OrderStatus.RECEIVED, "🚚 已接单通知"),
        (OrderStatus.COMPLETED, "✅ 配送完成通知"),
        (OrderStatus.CANCELLED, "❌ 订单取消通知"),
    )

    def __init__(self, webhook_url: Optional[str] = None):
        """Create a bot.

        Args:
            webhook_url: Default webhook address used when a send call does
                not supply its own.
        """
        self.webhook_url = webhook_url

    async def send_text(
        self,
        content: str,
        mentioned_list: Optional[List[str]] = None,
        mentioned_mobile_list: Optional[List[str]] = None,
        webhook_url: Optional[str] = None,
    ) -> bool:
        """Send a plain-text message.

        Args:
            content: Message text.
            mentioned_list: User IDs to @-mention; omitted from the payload
                when empty.
            mentioned_mobile_list: Mobile numbers to @-mention; omitted from
                the payload when empty.
            webhook_url: Optional per-call webhook override.

        Returns:
            True if the webhook reported success, otherwise False.
        """
        text: Dict = {"content": content}
        if mentioned_list:
            text["mentioned_list"] = mentioned_list
        if mentioned_mobile_list:
            text["mentioned_mobile_list"] = mentioned_mobile_list
        return await self._send({"msgtype": "text", "text": text}, webhook_url)

    async def send_markdown(self, content: str, webhook_url: Optional[str] = None) -> bool:
        """Send a markdown message.

        Args:
            content: Markdown-formatted message body.
            webhook_url: Optional per-call webhook override.

        Returns:
            True if the webhook reported success, otherwise False.
        """
        data = {"msgtype": "markdown", "markdown": {"content": content}}
        return await self._send(data, webhook_url)

    async def send_image(self, base64: str, md5: str, webhook_url: Optional[str] = None) -> bool:
        """Send an image message.

        Args:
            base64: Base64-encoded image content.
            md5: MD5 digest of the image.
            webhook_url: Optional per-call webhook override.

        Returns:
            True if the webhook reported success, otherwise False.
        """
        data = {"msgtype": "image", "image": {"base64": base64, "md5": md5}}
        # Forward the override; it used to be dropped here, so the image was
        # always posted to the constructor's webhook.
        return await self._send(data, webhook_url)

    async def send_news(self, articles: List[Dict[str, str]], webhook_url: Optional[str] = None) -> bool:
        """Send a news (rich link) message.

        Args:
            articles: Article dicts; each carries ``title``, ``description``,
                ``url`` and ``picurl``.
            webhook_url: Optional per-call webhook override.

        Returns:
            True if the webhook reported success, otherwise False.
        """
        data = {"msgtype": "news", "news": {"articles": articles}}
        return await self._send(data, webhook_url)

    async def send_template_card(self, template_card: Dict, webhook_url: Optional[str] = None) -> bool:
        """Send a template-card message.

        Args:
            template_card: Template-card payload, passed through unchanged.
            webhook_url: Optional per-call webhook override.

        Returns:
            True if the webhook reported success, otherwise False.
        """
        data = {"msgtype": "template_card", "template_card": template_card}
        # Forward the override; it used to be dropped here as well.
        return await self._send(data, webhook_url)

    async def _send(self, data: Dict, webhook_url: Optional[str] = None) -> bool:
        """POST a payload to the webhook and interpret the response.

        Never raises: any failure (missing URL, transport error, timeout,
        non-JSON response, non-zero ``errcode``) is logged and reported as
        False.

        Args:
            data: JSON-serialisable message payload.
            webhook_url: Optional override for the instance's webhook.

        Returns:
            True if the response JSON has ``errcode == 0``, otherwise False.
        """
        target_url = webhook_url or self.webhook_url
        if not target_url:
            logger.error("发送失败: 未配置webhook地址")
            return False

        try:
            headers = {"Content-Type": "application/json"}
            timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    target_url,
                    headers=headers,
                    data=json.dumps(data),
                ) as response:
                    result = await response.json()
            if result.get("errcode") == 0:
                return True
            logger.error("发送失败: %s", result.get("errmsg"))
            return False
        except Exception as exc:
            # Best-effort notification channel: report the failure to the
            # caller via the return value rather than propagating.
            logger.error("发送异常: %s", exc)
            return False

    @classmethod
    def _mask_phone(cls, phone) -> str:
        """Return ``phone`` with characters 4-7 replaced by the mask.

        A missing phone number is treated as an empty string so that a
        notification is still produced.
        """
        digits = str(phone or "")
        return f"{digits[:3]}{cls._MASK}{digits[7:]}"

    @classmethod
    def _order_title(cls, notify_type: OrderStatus) -> str:
        """Return the notification title for ``notify_type``.

        Raises:
            ValueError: If no title is defined for the status.
        """
        for status, title in cls._ORDER_TITLES:
            if notify_type == status:
                return title
        raise ValueError("不支持的通知类型")

    @classmethod
    def _build_order_markdown(
        cls,
        title: str,
        shipping_order: ShippingOrderDB,
        deliveryman: Optional[UserDB],
    ) -> str:
        """Render the markdown body of an order notification.

        The customer's phone and address are partially masked.  The courier
        section is appended only when a courier record is available.

        NOTE(review): one field per line with a blank line between sections;
        confirm the rendering in the WeCom client.
        """
        address_prefix = (shipping_order.address_detail or "")[0:3]
        lines = [
            f"# {title}",
            f"> **环境:{settings.ENV_NAME}**",
            "",
            "**所属小区**",
            f"> {shipping_order.address_community_name}",
            "",
            "**订单信息**",
            f"> 订单号: {shipping_order.orderid}",
            f"> 下单时间: {CommonUtils.get_asia_datetime(shipping_order.create_time)}",
            f"> 包裹数量: {shipping_order.package_count}个",
            f"> 订单金额: ¥{shipping_order.original_amount}",
            f"> 实付金额: ¥{shipping_order.final_amount}",
            f"> 订单状态: {shipping_order.status.status_text}",
            "",
            "**收件信息**",
            f"> 姓名: {shipping_order.address_customer_name}",
            f"> 电话: {cls._mask_phone(shipping_order.address_customer_phone)}",
            f"> 地址: {shipping_order.address_community_building_name} {address_prefix} {cls._MASK}",
            "",
        ]
        # The order may reference a courier whose user row no longer exists;
        # skip the section instead of failing the whole notification.
        if shipping_order.deliveryman_user_id and deliveryman is not None:
            lines += [
                "",
                "**配送员信息**",
                f"> 姓名: {deliveryman.nickname}",
                f"> 电话: {cls._mask_phone(deliveryman.phone)}",
                "",
            ]
        return "\n".join(lines)

    async def send_order_notification(
        self,
        db: Session,
        shipping_order: ShippingOrderDB,
        notify_type: OrderStatus = OrderStatus.CREATED,
    ) -> bool:
        """Send an order-status notification to the order's community webhook.

        The webhook is read from the ``CommunityDB`` row referenced by
        ``shipping_order.address_community_id``; the instance's own
        ``webhook_url`` is not used as a fallback.

        Args:
            db: SQLAlchemy session used to look up the community and courier.
            shipping_order: Order to describe in the message.
            notify_type: Status being announced (CREATED, RECEIVED, COMPLETED
                or CANCELLED).

        Returns:
            True if the message was delivered; False if the community has no
            webhook configured or the webhook call failed.

        Raises:
            ValueError: If ``notify_type`` is not one of the supported
                statuses (only checked once a webhook has been found).
            Exception: Any error raised while rendering the message is
                logged and re-raised.
        """
        # Resolve the community-specific webhook.
        webhook = None
        if shipping_order.address_community_id:
            community = (
                db.query(CommunityDB)
                .filter(CommunityDB.id == shipping_order.address_community_id)
                .first()
            )
            if community:
                webhook = community.webot_webhook
        if not webhook:
            return False

        deliveryman = None
        if shipping_order.deliveryman_user_id:
            deliveryman = (
                db.query(UserDB)
                .filter(UserDB.userid == shipping_order.deliveryman_user_id)
                .first()
            )

        try:
            title = self._order_title(notify_type)
            content = self._build_order_markdown(title, shipping_order, deliveryman)
            return await self.send_markdown(content, webhook)
        except Exception as e:
            logger.exception("发送企业微信消息失败: %s", e)
            raise