deliveryman-api/app/api/endpoints/mp.py
2025-02-19 00:09:53 +08:00

102 lines
3.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Request, Response
from app.core.mpclient import mp_client
from app.core.response import success_response, error_response
from app.models.database import get_db
from app.models.user import UserDB
import xml.etree.ElementTree as ET
from datetime import datetime
import hashlib
import time
from app.core.config import settings
import logging
from typing import Optional
# Router on which the WeChat Official Account (MP) endpoints below are registered.
router = APIRouter()
def check_signature(signature: str, timestamp: str, nonce: str) -> bool:
    """Verify the signature sent by the WeChat server.

    The expected signature is the SHA-1 hex digest of the configured token,
    ``timestamp`` and ``nonce`` sorted lexicographically and concatenated.

    Args:
        signature: Signature supplied with the request.
        timestamp: Timestamp supplied with the request.
        nonce: Nonce supplied with the request.

    Returns:
        True when the computed digest equals ``signature``; False otherwise,
        including when no token is configured.
    """
    # Local import: hmac is not among this module's top-level imports.
    import hmac

    token = settings.MP_TOKEN  # MP_TOKEN must be defined in the settings
    if not token:
        # Fail closed: an unset token cannot be sorted with str values (None)
        # and an empty token would make the signature computable by anyone.
        return False

    # Sort lexicographically, concatenate, then hash with SHA-1.
    joined = "".join(sorted([token, timestamp, nonce]))
    expected = hashlib.sha1(joined.encode('utf-8')).hexdigest()

    # Constant-time comparison; compare bytes because compare_digest rejects
    # non-ASCII str arguments and ``signature`` is caller-controlled.
    return hmac.compare_digest(
        expected.encode('utf-8'),
        signature.encode('utf-8'),
    )
@router.get("")
async def verify_server(
    signature: str,
    timestamp: str,
    nonce: str,
    echostr: str
):
    """Confirm that this server address is valid for the WeChat platform.

    Replies with ``echostr`` as plain text when the signature check passes,
    and with an empty 403 response otherwise.
    """
    signature_ok = check_signature(signature, timestamp, nonce)
    if not signature_ok:
        # Reject requests whose signature does not match.
        return Response(status_code=403)
    return Response(content=echostr, media_type="text/plain")
def _bind_mp_openid(unionid: str, openid: str) -> None:
    """Store ``openid`` as ``mp_openid`` on the user whose unionid matches, if any."""
    # Keep a handle on the iterator returned by get_db() so it can be closed
    # explicitly instead of leaving session cleanup to garbage collection.
    # NOTE(review): assumes get_db() is a generator that releases the session
    # when closed -- confirm against app.models.database.
    db_iter = get_db()
    db = next(db_iter)
    try:
        user = db.query(UserDB).filter(UserDB.unionid == unionid).first()
        if user:
            user.mp_openid = openid
            db.commit()
    finally:
        close = getattr(db_iter, "close", None)
        if close is not None:
            close()


@router.post("")
async def handle_server(request: Request):
    """Handle messages and events pushed by the WeChat server.

    Parses the XML request body. For a ``subscribe`` event, fetches the
    follower's user info through ``mp_client`` and, when it carries a
    ``unionid`` matching an existing user, stores the sender's openid on that
    user as ``mp_openid``.

    Always replies with an empty plain-text body, which signals successful
    receipt; failures are logged instead of being propagated.

    NOTE(review): the request signature is not verified here, unlike the GET
    handshake -- consider validating the ``signature``/``timestamp``/``nonce``
    query parameters with ``check_signature`` before trusting the payload.
    """
    try:
        # Read the raw XML payload.
        body = await request.body()
        # NOTE(review): xml.etree.ElementTree is not hardened against
        # maliciously constructed XML (e.g. entity expansion); this payload
        # comes from the network.
        root = ET.fromstring(body)
        print(f"微信公众号消息:{body}")

        # findtext() yields None for a missing element instead of raising.
        msg_type = root.findtext('MsgType')
        from_user = root.findtext('FromUserName')

        if msg_type == 'event' and from_user:
            event = (root.findtext('Event') or '').lower()
            if event == 'subscribe':  # follow event
                user_info = await mp_client.get_user_info(from_user)
                print(f"微信公众号用户信息:{user_info}")
                unionid = user_info.get('unionid') if user_info else None
                # Only bind on a real unionid: comparing the column with None
                # would match users that have no unionid at all.
                if unionid:
                    _bind_mp_openid(unionid, from_user)
    except Exception:
        # Top-level boundary: log and still acknowledge the push below.
        logging.exception("处理微信消息异常")
    # An empty string tells the WeChat server the push was received.
    return Response(content="", media_type="text/plain")
# Template-message endpoint; accepts openid, template_id, data, url, miniprogram.
@router.post("/template/send")
async def send_template_message(
    openid: str,
    template_id: str,
    data: dict,
    url: Optional[str] = None,
    miniprogram: Optional[dict] = None,
):
    """Send a template message through the MP client.

    All five arguments are forwarded positionally, in order, to
    ``mp_client.send_template_message``; a success response is then returned.
    """
    # Example payload:
    # {"data":{"phrase2":"快递代取","character_string13":"123","time4":"2025-02-25 11:11:11","thing15":"天悦龙庭10栋","thing10":"周龙"}}
    # NOTE(review): the client's return value is not inspected, so success is
    # reported whenever the call does not raise.
    await mp_client.send_template_message(
        openid,
        template_id,
        data,
        url,
        miniprogram,
    )
    return success_response(message="模板消息发送成功")