from fastapi import APIRouter, Request, Response, Query from app.core.wecombot import WecomBot from app.models.database import get_db from app.models.community import CommunityDB from app.core.config import settings import xml.etree.ElementTree as ET import logging import hashlib import base64 from Crypto.Cipher import AES import typing import struct from app.core.wecomclient import wecom_client from app.models.user import UserDB from app.api.deps import get_current_user from fastapi import Depends from sqlalchemy.orm import Session from app.core.response import error_response, success_response from pydantic import BaseModel router = APIRouter() def decrypt_msg(msg_encrypt: str, signature: str, timestamp: str, nonce: str) -> typing.Optional[str]: """解密企业微信消息""" try: token = settings.WECHAT_CORP_TOKEN encoding_aes_key = settings.WECHAT_CORP_ENCODING_AES_KEY corpid = settings.WECHAT_CORP_ID # 1. 验证签名 sort_list = [token, timestamp, nonce, msg_encrypt] sort_list.sort() sha1 = hashlib.sha1() sha1.update("".join(sort_list).encode()) calc_signature = sha1.hexdigest() if calc_signature != signature: return None # 2. 解密消息 aes_key = base64.b64decode(encoding_aes_key + "=") aes = AES.new(aes_key, AES.MODE_CBC, aes_key[:16]) # 解密 text = base64.b64decode(msg_encrypt) decrypted_text = aes.decrypt(text) # 去除补位 pad = decrypted_text[-1] if not isinstance(pad, int): pad = ord(pad) content = decrypted_text[:-pad] # 验证corpid xml_len = struct.unpack('!I', content[16:20])[0] xml_content = content[20 : 20 + xml_len] from_corpid = content[20 + xml_len:] if from_corpid.decode() != corpid: return None return xml_content.decode() except Exception as e: logging.exception("解密企业微信消息失败") return None @router.get("") async def verify_callback( msg_signature: str = Query(..., description="签名"), timestamp: str = Query(..., description="时间戳"), nonce: str = Query(..., description="随机数"), echostr: str = Query(..., description="随机字符串") ): """验证回调配置""" try: # 解密echostr decrypted_str = decrypt_msg(echostr, msg_signature, timestamp, nonce) if not decrypted_str: return Response(status_code=403) return Response(content=decrypted_str, media_type="text/plain") except Exception as e: logging.exception("验证回调配置失败") return Response(status_code=403) @router.post("") async def wechat_corp_callback( request: Request, msg_signature: str = Query(..., description="签名"), timestamp: str = Query(..., description="时间戳"), nonce: str = Query(..., description="随机数") ): """处理企业微信回调消息""" try: # 读取原始XML数据 body = await request.body() body_str = body.decode() # 解析XML获取加密消息 root = ET.fromstring(body_str) encrypt_msg = root.find('Encrypt').text # 解密消息 decrypted_msg = decrypt_msg(encrypt_msg, msg_signature, timestamp, nonce) if not decrypted_msg: return Response(content="success", media_type="text/plain") # 解析解密后的XML msg_root = ET.fromstring(decrypted_msg) print(f"企业微信回调消息:{decrypted_msg}") # 解析基本信息 msg_type = msg_root.find('MsgType').text print(f"msg_type: {msg_type}") # 处理事件消息 if msg_type == 'event': event = msg_root.find('Event').text print(f"event: {event}") # 处理进群事件 if event == 'change_external_chat': chat_id = msg_root.find('ChatId').text change_type = msg_root.find('ChangeType').text update_detail = msg_root.find('UpdateDetail') print(f"chat_id: {chat_id}") print(f"change_type: {change_type}") print(f"update_detail: {update_detail}") # 处理进群事件 if update_detail == 'add_member': userid = msg_root.find('MemChangeList')[0].find('Item').text print(f"userid: {userid}") unionid = await wecom_client.get_unionid_from_userid(userid) print(f"根据userid获取unionid结果: {unionid}") return Response(content="success", media_type="text/plain") except Exception as e: logging.exception("处理企业微信回调消息异常") return Response(content="success", media_type="text/plain") class UnionidToExternalUseridRequest(BaseModel): unionid: str openid: str @router.post("/unionid_to_external_userid") async def unionid_to_external_userid( request: UnionidToExternalUseridRequest ): """根据unionid获取external_userid""" result = await wecom_client.unionid_to_external_userid(request.unionid, request.openid) print(f"根据unionid获取external_userid结果: {result}") return success_response(message="获取external_userid成功", data=result)