from fastapi import APIRouter, Request, Response from app.core.mpclient import mp_client from app.core.response import success_response, error_response from app.models.database import get_db from app.models.user import UserDB import xml.etree.ElementTree as ET from datetime import datetime import hashlib import time from app.core.config import settings import logging router = APIRouter() def check_signature(signature: str, timestamp: str, nonce: str) -> bool: """验证微信服务器签名""" token = settings.MP_TOKEN # 在配置文件中添加 MP_TOKEN # 按字典序排序 temp_list = [token, timestamp, nonce] temp_list.sort() # 拼接字符串 temp_str = "".join(temp_list) # SHA1加密 sign = hashlib.sha1(temp_str.encode('utf-8')).hexdigest() # 与微信发送的签名对比 return sign == signature @router.get("") async def verify_server( signature: str, timestamp: str, nonce: str, echostr: str ): """验证服务器地址的有效性""" if check_signature(signature, timestamp, nonce): return Response(content=echostr, media_type="text/plain") return Response(status_code=403) @router.post("") async def handle_server(request: Request): """处理微信服务器推送的消息和事件""" try: # 读取原始XML数据 body = await request.body() root = ET.fromstring(body) logging.info(f"微信公众号消息:{root}") # 解析基本信息 msg_type = root.find('MsgType').text from_user = root.find('FromUserName').text to_user = root.find('ToUserName').text create_time = int(root.find('CreateTime').text) # 获取数据库会话 db = next(get_db()) # 处理不同类型的消息和事件 if msg_type == 'event': event = root.find('Event').text.lower() if event == 'subscribe': # 关注事件 # 获取用户信息 user_info = await mp_client.get_user_info(from_user) if user_info: # 查找或创建用户 user = db.query(UserDB).filter( UserDB.unionid == user_info.get('unionid') ).first() if user: # 更新用户信息 user.mp_openid = from_user db.commit() return Response(content="", media_type="text/plain") except Exception as e: logging.exception("处理微信消息异常") # 返回空字符串表示接收成功 return Response(content="", media_type="text/plain")