"""WeCom (企业微信) callback endpoints and account-binding routes."""

import base64
import hashlib
import hmac
import logging
import struct
import typing
import xml.etree.ElementTree as ET

from Crypto.Cipher import AES
from fastapi import APIRouter, Depends, Query, Request, Response
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.api.deps import get_current_user
from app.core.config import settings
from app.core.response import error_response, success_response
from app.core.wecombot import WecomBot
from app.core.wecomclient import wecom_client
from app.models.community import CommunityDB
from app.models.database import get_db
from app.models.user import UserDB

router = APIRouter()

# WeCom pads the plaintext with PKCS#7 to a multiple of 32 bytes, so a valid
# trailing pad byte is always in the range 1..32.
_PKCS7_BLOCK_SIZE = 32


def decrypt_msg(
    msg_encrypt: str, signature: str, timestamp: str, nonce: str
) -> typing.Optional[str]:
    """Verify and decrypt an encrypted WeCom callback payload.

    Args:
        msg_encrypt: Base64-encoded ciphertext (the ``Encrypt`` field or the
            ``echostr`` query parameter).
        signature: Signature supplied by the caller (``msg_signature``).
        timestamp: Timestamp query parameter, part of the signed data.
        nonce: Nonce query parameter, part of the signed data.

    Returns:
        The decrypted message text, or ``None`` when the signature does not
        match, the padding is invalid, the embedded corp id differs from
        ``settings.WECHAT_CORP_ID``, or any error occurs while decrypting.
    """
    try:
        token = settings.WECHAT_CORP_TOKEN
        encoding_aes_key = settings.WECHAT_CORP_ENCODING_AES_KEY
        corpid = settings.WECHAT_CORP_ID

        # 1. Verify the signature: SHA-1 over the sorted, concatenated parts.
        signed_parts = sorted([token, timestamp, nonce, msg_encrypt])
        calc_signature = hashlib.sha1("".join(signed_parts).encode()).hexdigest()
        # Constant-time comparison; the signature is attacker-controlled input.
        if not hmac.compare_digest(calc_signature.encode(), signature.encode()):
            return None

        # 2. Decrypt: AES-CBC, IV is the first 16 bytes of the key.
        aes_key = base64.b64decode(encoding_aes_key + "=")
        aes = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])
        decrypted_text = aes.decrypt(base64.b64decode(msg_encrypt))

        # Strip the padding, rejecting pad bytes that cannot be valid.
        pad = decrypted_text[-1]
        if not 1 <= pad <= _PKCS7_BLOCK_SIZE:
            return None
        content = decrypted_text[:-pad]

        # Layout: 16 bytes skipped | 4-byte big-endian length | xml | corp id
        xml_len = struct.unpack('!I', content[16:20])[0]
        xml_content = content[20:20 + xml_len]
        from_corpid = content[20 + xml_len:]

        # The trailing corp id must match the configured one.
        if from_corpid.decode() != corpid:
            return None

        return xml_content.decode()
    except Exception:
        logging.exception("解密企业微信消息失败")
        return None


@router.get("")
async def verify_callback(
    msg_signature: str = Query(..., description="签名"),
    timestamp: str = Query(..., description="时间戳"),
    nonce: str = Query(..., description="随机数"),
    echostr: str = Query(..., description="随机字符串")
):
    """Answer the callback-URL verification request.

    Decrypts ``echostr`` and echoes the plaintext back as ``text/plain``.
    Responds with HTTP 403 when decryption fails or yields an empty value.
    """
    try:
        decrypted_str = decrypt_msg(echostr, msg_signature, timestamp, nonce)
        if not decrypted_str:
            return Response(status_code=403)

        return Response(content=decrypted_str, media_type="text/plain")
    except Exception:
        logging.exception("验证回调配置失败")
        return Response(status_code=403)


@router.post("")
async def wechat_corp_callback(
    request: Request,
    msg_signature: str = Query(..., description="签名"),
    timestamp: str = Query(..., description="时间戳"),
    nonce: str = Query(..., description="随机数")
):
    """Handle an encrypted callback message.

    Always responds with the plain-text body ``success``, including when the
    payload cannot be parsed or decrypted; failures are only logged.
    """
    try:
        # Read the raw XML envelope and pull out the encrypted payload.
        # NOTE(review): the body is untrusted XML parsed with the stdlib
        # parser, which does not guard against entity-expansion attacks.
        body = await request.body()
        root = ET.fromstring(body.decode())
        encrypt_msg = root.findtext('Encrypt')
        if not encrypt_msg:
            logging.warning("企业微信回调消息缺少Encrypt字段")
            return Response(content="success", media_type="text/plain")

        decrypted_msg = decrypt_msg(encrypt_msg, msg_signature, timestamp, nonce)
        if not decrypted_msg:
            return Response(content="success", media_type="text/plain")

        # Parse the decrypted XML message.
        msg_root = ET.fromstring(decrypted_msg)
        logging.info("企业微信回调消息:%s", decrypted_msg)

        msg_type = msg_root.findtext('MsgType')
        if msg_type == 'event':
            event = msg_root.findtext('Event')

            # Customer group-chat change events.
            if event == 'change_external_chat':
                chat_id = msg_root.findtext('ChatId')
                change_type = msg_root.findtext('ChangeType')

                if change_type == 'add_member':
                    # TODO: the welcome message is currently disabled. It used
                    # to look up a CommunityDB row with a webot_webhook and
                    # call WecomBot().send_welcome_message(webhook). When it
                    # is re-enabled, obtain the DB session in a way that
                    # guarantees it is closed; the previous bare
                    # next(get_db()) never released it explicitly.
                    logging.info(
                        "change_external_chat add_member: chat_id=%s", chat_id
                    )

        return Response(content="success", media_type="text/plain")

    except Exception:
        logging.exception("处理企业微信回调消息异常")
        return Response(content="success", media_type="text/plain")


class RefreshRequest(BaseModel):
    """Request body for ``POST /refresh``."""

    # Code passed to wecom_client.code2session.
    code: str


@router.get("/access_token")
async def get_access_token():
    """Return the access token obtained from ``wecom_client``."""
    # NOTE(review): this route has no authentication dependency, so any
    # caller that can reach it receives the access token. Confirm whether it
    # should require Depends(get_current_user) like the /refresh route.
    access_token = await wecom_client.get_access_token()
    return success_response(message="获取企业微信access_token成功", data=access_token)


@router.post("/refresh")
async def refresh(
    request: RefreshRequest,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Bind the current user to the identity resolved from ``request.code``.

    Returns an error response when the user already has a ``wecom_userid`` or
    ``wecom_pending_id``, or when the lookup yields no usable identifier.
    Otherwise stores the ``userid`` (preferred) or ``pending_id`` on the user,
    commits, and returns a success response.
    """
    if current_user and (current_user.wecom_userid or current_user.wecom_pending_id):
        return error_response(code=400, message="用户已绑定企业微信")

    user_info = await wecom_client.code2session(request.code)
    logging.info("获取到的企业微信用户信息: %s", user_info)
    if not user_info:
        return error_response(code=400, message="获取企业微信用户信息失败")

    userid = user_info.get("userid")
    pending_id = user_info.get("pending_id")
    if userid:
        current_user.wecom_userid = userid
    elif pending_id:
        current_user.wecom_pending_id = pending_id
    else:
        # Neither identifier came back: do not commit and report success
        # for a binding that did not happen.
        return error_response(code=400, message="获取企业微信用户信息失败")

    db.commit()
    db.refresh(current_user)
    return success_response(message="企业微信用户信息刷新成功")