deliveryman-api/app/api/endpoints/wecom.py
2025-03-30 13:41:18 +08:00

537 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Request, Response, Query
from app.core.wecombot import WecomBot
from app.models.database import get_db
from app.models.community import CommunityDB
from app.core.config import settings
import xml.etree.ElementTree as ET
import logging
import hashlib
import base64
from Crypto.Cipher import AES
import typing
import struct
from app.core.wecomclient import wecom_client
from app.models.user import UserDB
from app.api.deps import get_current_user
from fastapi import Depends
from sqlalchemy.orm import Session
from app.core.response import error_response, success_response, ResponseModel
from pydantic import BaseModel
from app.models.wecom_external_chat import WecomExternalChatDB, WecomExternalChatInfo, WecomExternalChatMemberDB, WecomExternalChatMemberInfo
from datetime import datetime
# Router on which every endpoint in this module is registered.
router = APIRouter()
# Module-level logger named after this module; its own threshold is set to DEBUG here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def decrypt_msg(msg_encrypt: str, signature: str, timestamp: str, nonce: str) -> typing.Optional[str]:
    """Verify the signature of an encrypted WeCom payload and decrypt it.

    Args:
        msg_encrypt: Base64-encoded ciphertext (the ``Encrypt`` element of a
            callback body, or the ``echostr`` of a verification request).
        signature: Signature supplied by the caller (``msg_signature``).
        timestamp: Timestamp supplied by the caller; part of the signed data.
        nonce: Nonce supplied by the caller; part of the signed data.

    Returns:
        The decrypted message text, or ``None`` when the signature does not
        match, the padding is invalid, the corp id embedded in the payload
        differs from ``settings.WECHAT_CORP_ID``, or any step raises.
    """
    # Function-scope import so this block does not depend on a file-level change.
    import hmac

    try:
        token = settings.WECHAT_CORP_TOKEN
        encoding_aes_key = settings.WECHAT_CORP_ENCODING_AES_KEY
        corpid = settings.WECHAT_CORP_ID

        # 1. Verify the signature: SHA-1 over the lexicographically sorted,
        #    concatenated token / timestamp / nonce / ciphertext.
        signed_parts = sorted([token, timestamp, nonce, msg_encrypt])
        calc_signature = hashlib.sha1("".join(signed_parts).encode()).hexdigest()
        # Constant-time comparison; compare bytes so a non-ASCII signature is
        # simply a mismatch instead of a TypeError.
        if not hmac.compare_digest(calc_signature.encode(), signature.encode()):
            return None

        # 2. Decrypt: AES-CBC, IV is the first 16 bytes of the key.
        aes_key = base64.b64decode(encoding_aes_key + "=")
        aes = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])
        decrypted_text = aes.decrypt(base64.b64decode(msg_encrypt))

        # Strip the padding; the last byte holds the padding length.
        pad = decrypted_text[-1]
        if not isinstance(pad, int):
            pad = ord(pad)
        # A pad of 0, or one that covers the whole plaintext, would leave
        # nothing to parse below; reject it explicitly.
        if not 0 < pad < len(decrypted_text):
            return None
        content = decrypted_text[:-pad]

        # Layout: 16 leading bytes (skipped) | 4-byte big-endian length | xml | corp id.
        xml_len = struct.unpack('!I', content[16:20])[0]
        xml_content = content[20:20 + xml_len]
        from_corpid = content[20 + xml_len:]

        # The trailing corp id must match our configured one.
        if from_corpid.decode() != corpid:
            return None

        return xml_content.decode()
    except Exception:
        logger.exception("解密企业微信消息失败")
        return None
@router.get("")
async def verify_callback(
    msg_signature: str = Query(..., description="签名"),
    timestamp: str = Query(..., description="时间戳"),
    nonce: str = Query(..., description="随机数"),
    echostr: str = Query(..., description="随机字符串"),
):
    """Answer the callback-URL verification request.

    Decrypts ``echostr`` with ``decrypt_msg`` and echoes the plaintext back as
    ``text/plain``. Responds with HTTP 403 when decryption yields nothing or
    anything raises.
    """
    try:
        echo_plain = decrypt_msg(echostr, msg_signature, timestamp, nonce)
        if echo_plain:
            return Response(content=echo_plain, media_type="text/plain")
        # Signature mismatch or undecryptable payload.
        return Response(status_code=403)
    except Exception:
        logger.exception("验证回调配置失败")
        return Response(status_code=403)
def _int_from_element(elem, default: int = 0) -> int:
    """Parse an XML element's text as int; return ``default`` if absent or invalid."""
    if elem is None or elem.text is None:
        return default
    try:
        return int(elem.text)
    except ValueError:
        return default


def _changed_member_ids(msg_root) -> typing.Optional[typing.List[str]]:
    """Return the non-empty ``Item`` texts under ``MemChangeList``, or ``None`` if that element is absent."""
    mem_change_list = msg_root.find('MemChangeList')
    if mem_change_list is None:
        return None
    return [item.text for item in mem_change_list.findall('Item') if item.text]


async def _handle_external_chat_change(msg_root) -> None:
    """Dispatch one ``change_external_chat`` event to ``wecom_client``.

    Member joins and departures are forwarded one member at a time; every
    other change is forwarded once without a member id.
    """
    chat_id = msg_root.findtext('ChatId')
    change_type = msg_root.findtext('ChangeType')
    # UpdateDetail is only sent with "update" changes; per the WeCom
    # customer-group event docs, create/dismiss notifications omit it.
    # Default to "" (the value the sync endpoint in this file also passes)
    # so those events are no longer dropped by an AttributeError.
    update_detail = msg_root.findtext('UpdateDetail', default='')
    if not chat_id:
        logger.warning("企业微信群聊变更事件缺少ChatId,已忽略")
        return

    join_user_id = None
    quit_user_ids = []

    if update_detail == 'add_member':
        logger.info("有新成员加入群聊")
        join_scene = _int_from_element(msg_root.find('JoinScene'))
        logger.info(f"加入场景: {join_scene}")

        member_ids = _changed_member_ids(msg_root)
        if member_ids is not None:
            for join_user_id in member_ids:
                logger.info(f"从MemChangeList中获取到新加入成员: {join_user_id}")
                await wecom_client.handle_chat_change_event(
                    chat_id=chat_id,
                    change_type=change_type,
                    update_detail=update_detail,
                    join_user_id=join_user_id
                )
        else:
            # Legacy format: no MemChangeList, a single JoinUserID instead.
            join_user_id = msg_root.findtext('JoinUserID')
            if join_user_id:
                logger.info(f"从JoinUserID中获取到新加入成员: {join_user_id}")
                await wecom_client.handle_chat_change_event(
                    chat_id=chat_id,
                    change_type=change_type,
                    update_detail=update_detail,
                    join_user_id=join_user_id
                )

    elif update_detail == 'del_member':
        logger.info("有成员离开群聊")
        quit_scene = _int_from_element(msg_root.find('QuitScene'))

        # Handle each departed member inside the loop so none is skipped and
        # no unbound name is used when the list is missing or empty.
        for quit_user_id in _changed_member_ids(msg_root) or []:
            quit_user_ids.append(quit_user_id)
            await wecom_client.handle_chat_change_event(
                chat_id=chat_id,
                change_type=change_type,
                update_detail=update_detail,
                join_user_id=quit_user_id  # parameter reused to carry the departing user's id
            )

        logger.info(f"离开场景: {quit_scene}, 离开成员: {quit_user_ids}")

    logger.info(f"chat_id: {chat_id}, change_type: {change_type}, update_detail: {update_detail}, join_user_id: {join_user_id}, quit_user_ids: {quit_user_ids}")

    # Every change other than a member join/leave is forwarded as-is.
    if update_detail not in ('add_member', 'del_member'):
        await wecom_client.handle_chat_change_event(
            chat_id=chat_id,
            change_type=change_type,
            update_detail=update_detail
        )


@router.post("")
async def wechat_corp_callback(
    request: Request,
    msg_signature: str = Query(..., description="签名"),
    timestamp: str = Query(..., description="时间戳"),
    nonce: str = Query(..., description="随机数")
):
    """Handle an encrypted WeCom callback message.

    Reads the raw XML body, decrypts its ``Encrypt`` element with
    ``decrypt_msg`` and, for ``change_external_chat`` events, forwards the
    change to ``wecom_client.handle_chat_change_event``.

    Always responds with the plain-text body ``success``, including when the
    payload cannot be decrypted or processing raises; failures are logged.
    """
    try:
        body = await request.body()
        body_str = body.decode()

        # NOTE(review): this XML is parsed before the signature is verified;
        # consider a hardened parser if entity-expansion attacks are a concern.
        root = ET.fromstring(body_str)
        encrypt_msg = root.findtext('Encrypt')
        if not encrypt_msg:
            logger.warning("企业微信回调消息缺少Encrypt字段")
            return Response(content="success", media_type="text/plain")

        decrypted_msg = decrypt_msg(encrypt_msg, msg_signature, timestamp, nonce)
        if not decrypted_msg:
            return Response(content="success", media_type="text/plain")

        msg_root = ET.fromstring(decrypted_msg)
        logger.info(f"企业微信回调消息:{decrypted_msg}")

        msg_type = msg_root.findtext('MsgType')
        logger.info(f"msg_type: {msg_type}")

        if msg_type == 'event':
            event = msg_root.findtext('Event')
            logger.info(f"event: {event}")

            if event == 'change_external_chat':
                await _handle_external_chat_change(msg_root)

        return Response(content="success", media_type="text/plain")
    except Exception:
        logger.exception("处理企业微信回调消息异常")
        return Response(content="success", media_type="text/plain")
class UnionidToExternalUseridRequest(BaseModel):
    """Request body of the ``/unionid_to_external_userid`` endpoint."""
    # Both fields are passed unchanged to wecom_client.unionid_to_external_userid.
    unionid: str
    openid: str
@router.post("/unionid_to_external_userid")
async def unionid_to_external_userid(
    request: UnionidToExternalUseridRequest
):
    """Look up the external_userid for a unionid/openid pair.

    Delegates to ``wecom_client.unionid_to_external_userid`` and wraps
    whatever it returns in a success response.
    """
    # NOTE(review): unlike the other endpoints in this module, this one has no
    # get_current_user dependency - confirm that it is meant to be public.
    result = await wecom_client.unionid_to_external_userid(request.unionid, request.openid)
    # Use the module logger (as the rest of the file does) rather than print().
    logger.info("根据unionid获取external_userid结果: %s", result)
    return success_response(message="获取external_userid成功", data=result)
@router.get("/external-chats", response_model=ResponseModel)
async def get_external_chats(
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """List the active WeCom external group chats, most recently updated first.

    Only the platform user (``settings.PLATFORM_USER_ID``) may call this;
    anyone else receives a 403 error response. Any exception is logged and
    turned into a 500 error response.
    """
    try:
        # Admin check.
        if current_user.userid != settings.PLATFORM_USER_ID:
            return error_response(code=403, message="权限不足")

        # Active chats, newest update first.
        query = db.query(WecomExternalChatDB)
        query = query.filter(WecomExternalChatDB.is_active == True)
        query = query.order_by(WecomExternalChatDB.update_time.desc())

        # Convert the ORM rows into Pydantic models.
        chat_list = []
        for row in query.all():
            chat_list.append(WecomExternalChatInfo.model_validate(row))

        return success_response(message="获取群聊列表成功", data=chat_list)
    except Exception as exc:
        logger.exception("获取群聊列表异常")
        return error_response(code=500, message=f"获取群聊列表失败: {str(exc)}")
@router.get("/external-chats/{chat_id}/members", response_model=ResponseModel)
async def get_external_chat_members(
    chat_id: str,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """List the members of one WeCom external group chat, latest joiners first.

    Only the platform user may call this (403 otherwise). Returns a 404 error
    response when no chat with ``chat_id`` is stored, and a 500 error response
    when anything raises.
    """
    try:
        # Admin check.
        if current_user.userid != settings.PLATFORM_USER_ID:
            return error_response(code=403, message="权限不足")

        # The chat must exist.
        chat_row = (
            db.query(WecomExternalChatDB)
            .filter(WecomExternalChatDB.chat_id == chat_id)
            .first()
        )
        if not chat_row:
            return error_response(code=404, message="群聊不存在")

        # Members of that chat, newest join first.
        member_query = db.query(WecomExternalChatMemberDB)
        member_query = member_query.filter(WecomExternalChatMemberDB.chat_id == chat_id)
        member_query = member_query.order_by(WecomExternalChatMemberDB.join_time.desc())

        # Convert the ORM rows into Pydantic models.
        member_list = []
        for row in member_query.all():
            member_list.append(WecomExternalChatMemberInfo.model_validate(row))

        return success_response(message="获取群聊成员列表成功", data=member_list)
    except Exception as exc:
        logger.exception("获取群聊成员列表异常")
        return error_response(code=500, message=f"获取群聊成员列表失败: {str(exc)}")
@router.post("/sync-chat/{chat_id}", response_model=ResponseModel)
async def sync_external_chat(
    chat_id: str,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Trigger a re-sync of one WeCom external group chat.

    Only the platform user may call this (403 otherwise). The sync is done by
    calling ``wecom_client.handle_chat_change_event`` with a ``"create"``
    change; a falsy result or an exception yields a 500 error response.
    """
    try:
        # Admin check.
        if current_user.userid != settings.PLATFORM_USER_ID:
            return error_response(code=403, message="权限不足")

        # Re-use the change-event handler to refresh the chat.
        synced = await wecom_client.handle_chat_change_event(
            chat_id=chat_id,
            change_type="create",
            update_detail="",
        )
        if synced:
            return success_response(message="同步群聊信息成功")
        return error_response(code=500, message="同步群聊信息失败")
    except Exception as exc:
        logger.exception("同步群聊信息异常")
        return error_response(code=500, message=f"同步群聊信息失败: {str(exc)}")
# Static part of the dashboard page: styles, client-side script and the stats
# panel. Values coming from the API are HTML-escaped on the client before they
# are written into innerHTML.
_CHAT_DASHBOARD_HEAD = """
<!DOCTYPE html>
<html>
<head>
    <title>企业微信外部群聊信息</title>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <style>
        body { font-family: Arial, sans-serif; margin: 0; padding: 20px; }
        h1, h2 { color: #333; }
        table { border-collapse: collapse; width: 100%; margin-bottom: 20px; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f2f2f2; }
        tr:nth-child(even) { background-color: #f9f9f9; }
        .container { max-width: 1200px; margin: 0 auto; }
        .btn {
            display: inline-block;
            padding: 6px 12px;
            margin-bottom: 0;
            font-size: 14px;
            font-weight: 400;
            line-height: 1.42857143;
            text-align: center;
            white-space: nowrap;
            vertical-align: middle;
            cursor: pointer;
            background-image: none;
            border: 1px solid transparent;
            border-radius: 4px;
            color: #fff;
            background-color: #337ab7;
            text-decoration: none;
        }
        .btn:hover { background-color: #286090; }
        .stats {
            background-color: #f5f5f5;
            border: 1px solid #ddd;
            border-radius: 4px;
            padding: 15px;
            margin-bottom: 20px;
        }
        .stat-item {
            display: inline-block;
            margin-right: 30px;
            font-size: 16px;
        }
        .stat-number {
            font-size: 24px;
            font-weight: bold;
            color: #337ab7;
        }
    </style>
    <script>
        function escapeHtml(value) {
            return String(value)
                .replace(/&/g, '&amp;')
                .replace(/</g, '&lt;')
                .replace(/>/g, '&gt;')
                .replace(/"/g, '&quot;')
                .replace(/'/g, '&#39;');
        }
        function loadMembers(chatId) {
            fetch(`/api/wecom/external-chats/${encodeURIComponent(chatId)}/members`)
                .then(response => response.json())
                .then(data => {
                    if (data.code === 0) {
                        const members = data.data;
                        let html = '<table>';
                        html += '<tr><th>ID</th><th>用户ID</th><th>类型</th><th>姓名</th><th>加入时间</th><th>是否已发送欢迎</th></tr>';
                        members.forEach(member => {
                            html += `<tr>
                                <td>${escapeHtml(member.id)}</td>
                                <td>${escapeHtml(member.user_id)}</td>
                                <td>${escapeHtml(member.type)}</td>
                                <td>${escapeHtml(member.name || '未知')}</td>
                                <td>${escapeHtml(new Date(member.join_time).toLocaleString())}</td>
                                <td>${member.welcome_sent ? '是' : '否'}</td>
                            </tr>`;
                        });
                        html += '</table>';
                        document.getElementById('members-' + chatId).innerHTML = html;
                    } else {
                        alert('获取成员列表失败: ' + data.message);
                    }
                })
                .catch(error => {
                    console.error('Error:', error);
                    alert('获取成员列表失败');
                });
        }
        function syncChat(chatId) {
            fetch(`/api/wecom/sync-chat/${encodeURIComponent(chatId)}`, {
                method: 'POST'
            })
                .then(response => response.json())
                .then(data => {
                    if (data.code === 0) {
                        alert('同步成功');
                        location.reload();
                    } else {
                        alert('同步失败: ' + data.message);
                    }
                })
                .catch(error => {
                    console.error('Error:', error);
                    alert('同步失败');
                });
        }
        function getChatStats() {
            fetch(`/api/wecom/chat-stats`)
                .then(response => response.json())
                .then(data => {
                    if (data.code === 0) {
                        const stats = data.data;
                        document.getElementById('total-chats').innerText = stats.total_chats;
                        document.getElementById('total-members').innerText = stats.total_members;
                        document.getElementById('active-chats').innerText = stats.active_chats;
                    } else {
                        console.error('获取统计数据失败:', data.message);
                    }
                })
                .catch(error => {
                    console.error('Error:', error);
                });
        }
        // 页面加载完成后获取统计数据
        window.onload = function() {
            getChatStats();
        };
    </script>
</head>
<body>
    <div class="container">
        <h1>企业微信外部群聊信息</h1>
        <div class="stats">
            <div class="stat-item">
                <div>总群聊数</div>
                <div class="stat-number" id="total-chats">-</div>
            </div>
            <div class="stat-item">
                <div>总成员数</div>
                <div class="stat-number" id="total-members">-</div>
            </div>
            <div class="stat-item">
                <div>活跃群聊数</div>
                <div class="stat-number" id="active-chats">-</div>
            </div>
        </div>
"""

# Closes the container, body and document opened by _CHAT_DASHBOARD_HEAD.
_CHAT_DASHBOARD_TAIL = """
    </div>
</body>
</html>
"""


def _render_chat_card(chat) -> str:
    """Render one chat row as an HTML card with every dynamic value escaped."""
    # Function-scope import so this block does not depend on a file-level change.
    from html import escape

    def fmt_time(value) -> str:
        # Either timestamp may be missing; render an empty cell instead of crashing.
        return value.strftime('%Y-%m-%d %H:%M:%S') if value else ''

    # Chat name, owner and notice originate outside this service, so they are
    # escaped before being placed into markup.
    chat_id = escape(str(chat.chat_id))
    name = escape(str(chat.name or '未命名群聊'))
    owner = escape(str(chat.owner or '未知'))
    notice = escape(str(chat.notice or ''))
    member_count = escape(str(chat.member_count))
    created = escape(fmt_time(chat.create_time))
    updated = escape(fmt_time(chat.update_time))

    # The chat id reaches the script through a data attribute rather than an
    # inline JS string literal, so it cannot break out of the handler code.
    return f"""
        <div style="margin-bottom: 30px; border: 1px solid #ddd; padding: 15px; border-radius: 5px;">
            <h2>{name} ({chat_id})</h2>
            <p>
                <strong>创建时间:</strong> {created}<br>
                <strong>更新时间:</strong> {updated}<br>
                <strong>成员数量:</strong> {member_count}<br>
                <strong>群主:</strong> {owner}<br>
                <strong>公告:</strong> {notice}<br>
            </p>
            <button class="btn" data-chat-id="{chat_id}" onclick="loadMembers(this.dataset.chatId)">查看成员</button>
            <button class="btn" data-chat-id="{chat_id}" onclick="syncChat(this.dataset.chatId)">同步群信息</button>
            <div id="members-{chat_id}" style="margin-top: 15px;"></div>
        </div>
"""


@router.get("/chat-dashboard")
async def chat_dashboard(
    current_user: UserDB = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Serve an HTML dashboard of the active WeCom external group chats.

    Only the platform user (``settings.PLATFORM_USER_ID``) gets the dashboard;
    anyone else receives a short "权限不足" HTML body. The page lists active
    chats, most recently updated first, and its script loads members and
    statistics from the JSON endpoints under ``/api/wecom``.
    """
    # Admin check.
    if current_user.userid != settings.PLATFORM_USER_ID:
        return Response(content="权限不足", media_type="text/html")

    # Active chats, newest update first.
    chats = db.query(WecomExternalChatDB).filter(
        WecomExternalChatDB.is_active == True
    ).order_by(WecomExternalChatDB.update_time.desc()).all()

    # Assemble the page from parts and join once at the end.
    parts = [_CHAT_DASHBOARD_HEAD]
    if not chats:
        parts.append("<p>暂无群聊信息</p>")
    else:
        parts.extend(_render_chat_card(chat) for chat in chats)
    parts.append(_CHAT_DASHBOARD_TAIL)

    return Response(content="".join(parts), media_type="text/html")
@router.get("/chat-stats", response_model=ResponseModel)
async def get_chat_stats(
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Return aggregate counts for WeCom external group chats.

    Only the platform user may call this (403 otherwise). The payload holds
    the number of chats, active chats, members, and members whose
    ``join_time`` is at or after local midnight today. Any exception is
    logged and turned into a 500 error response.
    """
    try:
        # Admin check.
        if current_user.userid != settings.PLATFORM_USER_ID:
            return error_response(code=403, message="权限不足")

        # Overall counts.
        chat_total = db.query(WecomExternalChatDB).count()
        chat_active = (
            db.query(WecomExternalChatDB)
            .filter(WecomExternalChatDB.is_active == True)
            .count()
        )
        member_total = db.query(WecomExternalChatMemberDB).count()

        # Members who joined since the start of today (naive local time).
        midnight = datetime.combine(datetime.now().date(), datetime.min.time())
        joined_today = (
            db.query(WecomExternalChatMemberDB)
            .filter(WecomExternalChatMemberDB.join_time >= midnight)
            .count()
        )

        stats = dict(
            total_chats=chat_total,
            active_chats=chat_active,
            total_members=member_total,
            today_members=joined_today,
        )
        return success_response(message="获取统计数据成功", data=stats)
    except Exception as exc:
        logger.exception("获取统计数据异常")
        return error_response(code=500, message=f"获取统计数据失败: {str(exc)}")