deliveryman-api/app/api/endpoints/wecom.py
2025-02-27 19:19:11 +08:00

180 lines
6.0 KiB
Python

from fastapi import APIRouter, Request, Response, Query
from app.core.wecombot import WecomBot
from app.models.database import get_db
from app.models.community import CommunityDB
from app.core.config import settings
import xml.etree.ElementTree as ET
import logging
import hashlib
import base64
from Crypto.Cipher import AES
import typing
import struct
from app.core.wecomclient import wecom_client
from app.models.user import UserDB
from app.api.deps import get_current_user
from fastapi import Depends
from sqlalchemy.orm import Session
from app.core.response import error_response, success_response
from pydantic import BaseModel
# Router that collects the WeCom (Enterprise WeChat) endpoints defined in this module.
router = APIRouter()
def decrypt_msg(msg_encrypt: str, signature: str, timestamp: str, nonce: str) -> typing.Optional[str]:
    """Verify and decrypt an encrypted WeCom (Enterprise WeChat) payload.

    The signature is the SHA-1 hex digest of the lexicographically sorted
    concatenation of the configured token, ``timestamp``, ``nonce`` and
    ``msg_encrypt``.  The payload is Base64-encoded AES-CBC ciphertext whose
    plaintext is laid out as: a 16-byte prefix (ignored here), a 4-byte
    big-endian message length, the message itself, then the corp id.

    Args:
        msg_encrypt: Base64-encoded ciphertext.
        signature: Signature supplied by the caller for this payload.
        timestamp: Timestamp that was used when computing the signature.
        nonce: Nonce that was used when computing the signature.

    Returns:
        The decrypted message text, or ``None`` when the signature does not
        match, the padding is invalid, the embedded corp id differs from the
        configured one, or any error occurs while decrypting.
    """
    # Imported locally so this function stays self-contained.
    import hmac

    try:
        token = settings.WECHAT_CORP_TOKEN
        encoding_aes_key = settings.WECHAT_CORP_ENCODING_AES_KEY
        corpid = settings.WECHAT_CORP_ID

        # 1. Verify the signature.
        joined = "".join(sorted([token, timestamp, nonce, msg_encrypt]))
        calc_signature = hashlib.sha1(joined.encode()).hexdigest()
        # Constant-time comparison; compare bytes so that a non-ASCII
        # signature from the query string is rejected instead of raising.
        if not hmac.compare_digest(calc_signature.encode(), signature.encode()):
            return None

        # 2. Decrypt. The configured key is Base64 without its trailing "=",
        # and the IV is the first 16 bytes of the decoded key.
        aes_key = base64.b64decode(encoding_aes_key + "=")
        aes = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])
        decrypted_text = aes.decrypt(base64.b64decode(msg_encrypt))
        if not decrypted_text:
            return None

        # 3. Strip the padding. The last byte is the pad length; WeCom pads
        # to a 32-byte block, so anything outside 1..32 is corrupt input.
        pad = decrypted_text[-1]
        if not 1 <= pad <= 32:
            return None
        content = decrypted_text[:-pad]

        # 4. Split the plaintext and check the trailing corp id.
        xml_len = struct.unpack('!I', content[16:20])[0]
        xml_content = content[20 : 20 + xml_len]
        from_corpid = content[20 + xml_len:]
        if from_corpid.decode() != corpid:
            return None
        return xml_content.decode()
    except Exception:
        # Best-effort boundary: callers treat None as "reject this payload".
        logging.exception("解密企业微信消息失败")
        return None
@router.get("")
async def verify_callback(
    msg_signature: str = Query(..., description="签名"),
    timestamp: str = Query(..., description="时间戳"),
    nonce: str = Query(..., description="随机数"),
    echostr: str = Query(..., description="随机字符串")
):
    """Answer the callback-URL verification request.

    Decrypts ``echostr`` with ``decrypt_msg`` and echoes the plaintext back
    as ``text/plain``.  Responds with HTTP 403 when decryption yields nothing
    or an unexpected error occurs.
    """
    try:
        plain_echo = decrypt_msg(echostr, msg_signature, timestamp, nonce)
        if plain_echo:
            return Response(content=plain_echo, media_type="text/plain")
    except Exception:
        logging.exception("验证回调配置失败")
    # Reached on an empty decrypt result as well as on any error above.
    return Response(status_code=403)
def _is_add_member_event(msg_root: ET.Element) -> bool:
    """Return True when the message is an external-chat ``add_member`` event."""
    return (
        msg_root.findtext('MsgType') == 'event'
        and msg_root.findtext('Event') == 'change_external_chat'
        and msg_root.findtext('ChangeType') == 'add_member'
    )


@router.post("")
async def wechat_corp_callback(
    request: Request,
    msg_signature: str = Query(..., description="签名"),
    timestamp: str = Query(..., description="时间戳"),
    nonce: str = Query(..., description="随机数")
):
    """Handle an encrypted WeCom callback message.

    Reads the XML request body, decrypts its ``Encrypt`` element with
    ``decrypt_msg`` and, for an external-chat ``add_member`` event, sends a
    welcome message through the webhook of a community that has one.

    Always responds with the plain-text body ``success``, including when the
    payload cannot be decrypted or processing fails; failures are logged.
    """
    try:
        # Read the raw XML body and pull out the encrypted payload.
        body = await request.body()
        root = ET.fromstring(body.decode())
        encrypt_msg = root.findtext('Encrypt')
        if not encrypt_msg:
            logging.warning("企业微信回调消息缺少Encrypt字段")
            return Response(content="success", media_type="text/plain")

        decrypted_msg = decrypt_msg(encrypt_msg, msg_signature, timestamp, nonce)
        if not decrypted_msg:
            return Response(content="success", media_type="text/plain")

        msg_root = ET.fromstring(decrypted_msg)
        logging.info("企业微信回调消息:%s", decrypted_msg)

        if _is_add_member_event(msg_root):
            # Keep a handle on the generator so it can be closed; calling
            # next(get_db()) alone would never run the generator's cleanup.
            # Assumes get_db is a generator function (it is also used with
            # Depends in this module) - TODO confirm.
            db_gen = get_db()
            try:
                db = next(db_gen)
                # NOTE(review): this picks the first community that has a
                # webhook; it is not keyed by the ChatId in the event.
                community = db.query(CommunityDB).filter(
                    CommunityDB.webot_webhook.isnot(None)
                ).first()
                webhook = community.webot_webhook if community else None
            finally:
                db_gen.close()

            if webhook:
                # Send the welcome message after the DB session is released.
                await WecomBot().send_welcome_message(webhook)
    except Exception:
        # Deliberate best-effort: log and still acknowledge the callback.
        logging.exception("处理企业微信回调消息异常")
    return Response(content="success", media_type="text/plain")
# Request body for the POST /refresh endpoint.
class RefreshRequest(BaseModel):
    # Code that the endpoint passes to wecom_client.code2session.
    code: str
@router.get("/access_token")
async def get_access_token():
    """Fetch the WeCom access token from the shared client and return it in a success response."""
    # NOTE(review): no auth dependency is declared on this route in this
    # module - confirm it is protected where the router is mounted.
    token = await wecom_client.get_access_token()
    return success_response(data=token, message="获取企业微信access_token成功")
@router.post("/refresh")
async def refresh(
    request: RefreshRequest,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Bind the current user to a WeCom identity obtained from a login code.

    Exchanges ``request.code`` through ``wecom_client.code2session`` and
    stores the returned ``userid`` on the user, or the ``pending_id`` when no
    ``userid`` is present.

    Returns:
        An error response (code 400) when the user already has a WeCom id or
        pending id, or when the exchange yields neither identifier; otherwise
        a success response after the change is committed.
    """
    if current_user and (current_user.wecom_userid or current_user.wecom_pending_id):
        return error_response(code=400, message="用户已绑定企业微信")

    user_info = await wecom_client.code2session(request.code)
    # NOTE(review): the payload may contain sensitive fields - consider
    # redacting before logging.
    logging.info("获取到的企业微信用户信息: %s", user_info)
    if not user_info:
        return error_response(code=400, message="获取企业微信用户信息失败")

    userid = user_info.get("userid")
    pending_id = user_info.get("pending_id")
    if userid:
        current_user.wecom_userid = userid
    elif pending_id:
        current_user.wecom_pending_id = pending_id
    else:
        # Neither identifier came back: report failure instead of committing
        # a None pending id and claiming success.
        return error_response(code=400, message="获取企业微信用户信息失败")

    db.commit()
    db.refresh(current_user)
    return success_response(message="企业微信用户信息刷新成功")