deliveryman-api/app/api/endpoints/wecom.py
2025-03-04 18:37:04 +08:00

161 lines
5.3 KiB
Python

from fastapi import APIRouter, Request, Response, Query
from app.core.wecombot import WecomBot
from app.models.database import get_db
from app.models.community import CommunityDB
from app.core.config import settings
import xml.etree.ElementTree as ET
import logging
import hashlib
import base64
from Crypto.Cipher import AES
import typing
import struct
from app.core.wecomclient import wecom_client
from app.models.user import UserDB
from app.api.deps import get_current_user
from fastapi import Depends
from sqlalchemy.orm import Session
from app.core.response import error_response, success_response
from pydantic import BaseModel
# Router that the WeCom callback endpoints defined in this module register on.
router = APIRouter()
def decrypt_msg(msg_encrypt: str, signature: str, timestamp: str, nonce: str) -> typing.Optional[str]:
    """Verify and decrypt an encrypted WeCom (企业微信) callback payload.

    The function first checks the SHA-1 signature of the request, then
    AES-CBC decrypts the payload, strips the padding, and finally checks that
    the trailing corp id matches the configured one.

    Args:
        msg_encrypt: Base64-encoded ciphertext (the ``Encrypt`` element of a
            callback body, or the ``echostr`` query parameter).
        signature: Signature sent by the caller (``msg_signature``).
        timestamp: Timestamp sent by the caller.
        nonce: Nonce sent by the caller.

    Returns:
        The decrypted message as a string, or ``None`` when the signature does
        not match, the payload is malformed, the corp id differs, or any
        unexpected error occurs (errors are logged, never raised).
    """
    # Function-scope import: only needed here for the constant-time comparison.
    import hmac

    try:
        token = settings.WECHAT_CORP_TOKEN
        encoding_aes_key = settings.WECHAT_CORP_ENCODING_AES_KEY
        corpid = settings.WECHAT_CORP_ID

        # 1. Verify the signature: SHA-1 over the sorted, concatenated parts.
        joined = "".join(sorted([token, timestamp, nonce, msg_encrypt]))
        calc_signature = hashlib.sha1(joined.encode()).hexdigest()
        # Compare as bytes in constant time: `signature` is attacker-supplied,
        # and compare_digest rejects non-ASCII str arguments.
        if not hmac.compare_digest(calc_signature.encode(), signature.encode()):
            return None

        # 2. Decrypt the payload. The first 16 bytes of the key double as IV.
        aes_key = base64.b64decode(encoding_aes_key + "=")
        aes = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])
        decrypted_text = aes.decrypt(base64.b64decode(msg_encrypt))
        if not decrypted_text:
            return None

        # 3. Strip the padding. The last byte holds the pad length; WeCom's
        # scheme pads to 32-byte blocks, so any other value means the payload
        # is corrupt (and a pad of 0 would otherwise slice to empty).
        pad = decrypted_text[-1]
        if not 1 <= pad <= 32:
            return None
        content = decrypted_text[:-pad]

        # 4. Split the plaintext: bytes 0-15 are skipped, bytes 16-19 hold the
        # big-endian message length, then the message, then the corp id.
        if len(content) < 20:
            return None
        xml_len = struct.unpack('!I', content[16:20])[0]
        xml_end = 20 + xml_len
        if xml_end > len(content):
            return None
        xml_content = content[20:xml_end]
        from_corpid = content[xml_end:]

        # 5. Reject messages addressed to a different corp id.
        if from_corpid.decode() != corpid:
            return None
        return xml_content.decode()
    except Exception:
        # Boundary handler: callers rely on None instead of an exception.
        logging.exception("解密企业微信消息失败")
        return None
@router.get("")
async def verify_callback(
    msg_signature: str = Query(..., description="签名"),
    timestamp: str = Query(..., description="时间戳"),
    nonce: str = Query(..., description="随机数"),
    echostr: str = Query(..., description="随机字符串")
):
    """Answer the callback-URL verification request.

    Decrypts ``echostr`` with ``decrypt_msg`` and echoes the plaintext back as
    ``text/plain``. Responds with HTTP 403 when decryption yields nothing or
    an unexpected error occurs.
    """
    try:
        plain_echo = decrypt_msg(
            msg_encrypt=echostr,
            signature=msg_signature,
            timestamp=timestamp,
            nonce=nonce,
        )
        if plain_echo:
            return Response(content=plain_echo, media_type="text/plain")
        # Signature mismatch or undecryptable payload.
        return Response(status_code=403)
    except Exception:
        logging.exception("验证回调配置失败")
        return Response(status_code=403)
@router.post("")
async def wechat_corp_callback(
    request: Request,
    msg_signature: str = Query(..., description="签名"),
    timestamp: str = Query(..., description="时间戳"),
    nonce: str = Query(..., description="随机数")
):
    """Handle an encrypted WeCom (企业微信) callback message.

    Reads the raw XML body, extracts and decrypts the ``Encrypt`` element via
    ``decrypt_msg``, then inspects the decrypted XML. Only
    ``change_external_chat`` events are examined; the welcome message for
    ``add_member`` updates is not sent yet (the call is still disabled).

    The endpoint always responds with the plain-text body ``success``, even
    when the payload is invalid or processing fails; failures are only logged.

    Args:
        request: Incoming request carrying the encrypted XML body.
        msg_signature: Signature query parameter used to verify the payload.
        timestamp: Timestamp query parameter used in the signature.
        nonce: Nonce query parameter used in the signature.
    """
    try:
        # Read the raw XML body.
        body = await request.body()
        body_str = body.decode()

        # NOTE(review): the body is untrusted until the signature has been
        # checked, and xml.etree is not hardened against maliciously
        # constructed XML - consider defusedxml if that is a concern.
        root = ET.fromstring(body_str)

        # findtext returns None instead of raising when the element is absent.
        encrypt_msg = root.findtext('Encrypt')
        if not encrypt_msg:
            logging.warning("企业微信回调消息缺少Encrypt字段")
            return Response(content="success", media_type="text/plain")

        # Verify and decrypt; None means bad signature or malformed payload.
        decrypted_msg = decrypt_msg(encrypt_msg, msg_signature, timestamp, nonce)
        if not decrypted_msg:
            return Response(content="success", media_type="text/plain")

        # Parse the decrypted XML.
        msg_root = ET.fromstring(decrypted_msg)
        logging.info("企业微信回调消息:%s", decrypted_msg)

        msg_type = msg_root.findtext('MsgType')
        logging.info("msg_type: %s", msg_type)

        # Only event messages are handled.
        if msg_type == 'event':
            event = msg_root.findtext('Event')
            logging.info("event: %s", event)

            # Customer group-chat change event.
            if event == 'change_external_chat':
                chat_id = msg_root.findtext('ChatId')
                change_type = msg_root.findtext('ChangeType')
                # May be None: a missing UpdateDetail element must not be
                # treated as an error.
                update_detail = msg_root.findtext('UpdateDetail')
                logging.info("chat_id: %s", chat_id)
                logging.info("change_type: %s", change_type)
                logging.info("update_detail: %s", update_detail)

                # A member joined the group chat.
                if update_detail == 'add_member':
                    logging.info("发送欢迎消息")
                    # TODO: sending is still disabled:
                    # await wecom_client.send_welcome_message(chat_id)

        return Response(content="success", media_type="text/plain")
    except Exception:
        # Still acknowledge with "success"; the failure is only logged.
        logging.exception("处理企业微信回调消息异常")
        return Response(content="success", media_type="text/plain")
class UnionidToExternalUseridRequest(BaseModel):
    """Request body for the ``/unionid_to_external_userid`` endpoint."""

    # unionid forwarded to wecom_client.unionid_to_external_userid.
    unionid: str
    # openid forwarded together with the unionid.
    openid: str
@router.post("/unionid_to_external_userid")
async def unionid_to_external_userid(
    request: UnionidToExternalUseridRequest
):
    """Resolve an external_userid from a unionid/openid pair.

    Forwards both identifiers to ``wecom_client.unionid_to_external_userid``,
    prints the result, and wraps it in ``success_response``.

    NOTE(review): no authentication dependency is attached to this route -
    confirm that is intended.
    """
    unionid, openid = request.unionid, request.openid
    lookup = await wecom_client.unionid_to_external_userid(unionid, openid)
    print(f"根据unionid获取external_userid结果: {lookup}")
    return success_response(message="获取external_userid成功", data=lookup)