from fastapi import APIRouter, Request, Response, Query from app.core.wecombot import WecomBot from app.models.database import get_db from app.models.community import CommunityDB from app.core.config import settings import xml.etree.ElementTree as ET import logging import hashlib import base64 from Crypto.Cipher import AES import typing import struct router = APIRouter() def decrypt_msg(msg_encrypt: str, signature: str, timestamp: str, nonce: str) -> typing.Optional[str]: """解密企业微信消息""" try: token = settings.WECHAT_CORP_TOKEN encoding_aes_key = settings.WECHAT_CORP_ENCODING_AES_KEY corpid = settings.WECHAT_CORP_ID # 1. 验证签名 sort_list = [token, timestamp, nonce, msg_encrypt] sort_list.sort() sha1 = hashlib.sha1() sha1.update("".join(sort_list).encode()) calc_signature = sha1.hexdigest() if calc_signature != signature: return None # 2. 解密消息 aes_key = base64.b64decode(encoding_aes_key + "=") aes = AES.new(aes_key, AES.MODE_CBC, aes_key[:16]) # 解密 text = base64.b64decode(msg_encrypt) decrypted_text = aes.decrypt(text) # 去除补位 pad = decrypted_text[-1] if not isinstance(pad, int): pad = ord(pad) content = decrypted_text[:-pad] # 验证corpid xml_len = struct.unpack('!I', content[16:20])[0] xml_content = content[20 : 20 + xml_len] from_corpid = content[20 + xml_len:] if from_corpid.decode() != corpid: return None return xml_content.decode() except Exception as e: logging.exception("解密企业微信消息失败") return None @router.get("/callback") async def verify_callback( msg_signature: str = Query(..., description="签名"), timestamp: str = Query(..., description="时间戳"), nonce: str = Query(..., description="随机数"), echostr: str = Query(..., description="随机字符串") ): """验证回调配置""" try: # 解密echostr decrypted_str = decrypt_msg(echostr, msg_signature, timestamp, nonce) if not decrypted_str: return Response(status_code=403) return Response(content=decrypted_str, media_type="text/plain") except Exception as e: logging.exception("验证回调配置失败") return Response(status_code=403) @router.post("/callback") async def wechat_corp_callback( request: Request, msg_signature: str = Query(..., description="签名"), timestamp: str = Query(..., description="时间戳"), nonce: str = Query(..., description="随机数") ): """处理企业微信回调消息""" try: # 读取原始XML数据 body = await request.body() body_str = body.decode() # 解析XML获取加密消息 root = ET.fromstring(body_str) encrypt_msg = root.find('Encrypt').text # 解密消息 decrypted_msg = decrypt_msg(encrypt_msg, msg_signature, timestamp, nonce) if not decrypted_msg: return Response(content="success", media_type="text/plain") # 解析解密后的XML msg_root = ET.fromstring(decrypted_msg) print(f"企业微信回调消息:{decrypted_msg}") # 解析基本信息 msg_type = msg_root.find('MsgType').text # 处理事件消息 if msg_type == 'event': event = msg_root.find('Event').text # 处理进群事件 if event == 'change_external_chat': chat_id = msg_root.find('ChatId').text change_type = msg_root.find('ChangeType').text if change_type == 'add_member': # 获取数据库会话 db = next(get_db()) # 查找对应的小区 community = db.query(CommunityDB).filter( CommunityDB.webot_webhook.isnot(None) ).first() if community and community.webot_webhook: # 发送欢迎消息 wecom_bot = WecomBot() await wecom_bot.send_welcome_message(community.webot_webhook) return Response(content="success", media_type="text/plain") except Exception as e: logging.exception("处理企业微信回调消息异常") return Response(content="success", media_type="text/plain")