deliveryman-api/app/core/wecomclient.py
2025-03-30 14:28:46 +08:00

477 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import time
import logging
from app.core.config import settings
from typing import Dict, Any, Optional, List
import aiohttp
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Lower this logger's own threshold to DEBUG.
logger.setLevel(logging.DEBUG)
class WecomClient:
    """Client for the WeCom (企业微信) server API.

    Wraps the handful of endpoints this service needs (mini-program login,
    external contacts, customer group chats, internal members) and keeps the
    local group-chat tables in sync with chat-change callbacks.

    The access token is cached on the instance until shortly before it
    expires, so API calls do not hit ``gettoken`` every time.
    """

    # Base URL shared by every endpoint used below.
    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
    # Token lifetime (seconds) assumed when the reply carries no ``expires_in``.
    _DEFAULT_TOKEN_TTL = 7200
    # Refresh this many seconds before the reported expiry.
    _TOKEN_REFRESH_MARGIN = 300
    # errcodes meaning "token invalid / expired": drop the cache and retry once.
    _TOKEN_INVALID_ERRCODES = (40014, 42001)
    # Placeholder stored when no name could be obtained for a member.
    _UNKNOWN_MEMBER_NAME = "未知成员"

    def __init__(self):
        """Read corp credentials from settings and start with an empty token cache."""
        self.corp_id = settings.WECHAT_CORP_ID
        self.corp_secret = settings.WECHAT_CORP_SECRET
        self.access_token = None
        # ``time.monotonic()`` deadline after which the cached token is refetched.
        self._token_expires_at = 0.0

    # ------------------------------------------------------------------
    # Token handling / low-level transport
    # ------------------------------------------------------------------
    def _invalidate_token(self) -> None:
        """Forget the cached token so the next call fetches a fresh one."""
        self.access_token = None
        self._token_expires_at = 0.0

    async def get_access_token(self) -> Optional[str]:
        """Return a valid access token, fetching a new one only when needed.

        Returns:
            The token string, or ``None`` if it could not be obtained (the
            failure is logged, never raised).
        """
        if self.access_token and time.monotonic() < self._token_expires_at:
            return self.access_token
        try:
            url = f"{self._API_BASE}/gettoken"
            params = {
                "corpid": self.corp_id,
                "corpsecret": self.corp_secret,
            }
            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params) as response:
                    result = await response.json()
            if result.get("errcode") != 0:
                raise Exception(f"获取access_token失败: {result}")
            self.access_token = result["access_token"]
            ttl = int(result.get("expires_in") or self._DEFAULT_TOKEN_TTL)
            self._token_expires_at = time.monotonic() + max(
                0, ttl - self._TOKEN_REFRESH_MARGIN
            )
            return self.access_token
        except Exception as e:
            logger.error(f"获取access_token异常: {str(e)}")
            return None

    async def _call_api(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Call one API endpoint with the access token attached and return its JSON.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            path: Endpoint path relative to ``_API_BASE`` (leading slash included).
            params: Extra query-string parameters. Passing them here instead of
                formatting them into the URL gets them URL-encoded.
            payload: JSON body for POST endpoints.

        Returns:
            The decoded JSON reply; callers inspect ``errcode`` themselves.

        Raises:
            RuntimeError: if no access token could be obtained.
            Exception: transport / decoding errors from aiohttp propagate.
        """
        url = f"{self._API_BASE}{path}"
        result: Dict[str, Any] = {}
        for attempt in range(2):
            access_token = await self.get_access_token()
            if not access_token:
                raise RuntimeError("获取access_token失败")
            query = {"access_token": access_token}
            query.update(params or {})
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method, url, params=query, json=payload
                ) as response:
                    result = await response.json()
            # A cached token can be revoked before its expiry; refetch and
            # retry exactly once in that case.
            if attempt == 0 and result.get("errcode") in self._TOKEN_INVALID_ERRCODES:
                self._invalidate_token()
                continue
            break
        return result

    # ------------------------------------------------------------------
    # API wrappers
    # ------------------------------------------------------------------
    async def code2session(self, js_code: str) -> Optional[Dict[str, Any]]:
        """Validate a mini-program login code.

        Args:
            js_code: The temporary login code sent by the mini-program.

        Returns:
            A dict with ``userid``, ``pending_id`` and ``session_key`` on
            success, ``None`` on any failure (logged).
        """
        try:
            result = await self._call_api(
                "GET",
                "/miniprogram/jscode2session",
                params={
                    "js_code": js_code,
                    "grant_type": "authorization_code",
                },
            )
            if result.get("errcode") == 0:
                return {
                    "userid": result.get("userid"),
                    "pending_id": result.get("pending_id"),
                    "session_key": result.get("session_key"),
                }
            logger.error(f"code2session失败: {result}")
            return None
        except Exception as e:
            logger.error(f"code2session异常: {str(e)}")
            return None

    async def get_unionid_from_userid(self, userid: str) -> Optional[str]:
        """Look up the unionid of an external contact.

        Args:
            userid: The external contact's ``external_userid``.

        Returns:
            The unionid, or ``None`` if the lookup failed or the contact
            record carries no unionid.
        """
        try:
            # The unionid lives inside the ``external_contact`` object of the
            # reply, which is what get_external_contact_info() returns.
            contact = await self.get_external_contact_info(userid)
            return contact.get("unionid") if contact else None
        except Exception as e:
            logger.error(f"get_unionid_from_userid异常: {str(e)}")
            return None

    async def unionid_to_external_userid(
        self, unionid: str, openid: str
    ) -> Optional[Dict[str, Any]]:
        """Convert a unionid/openid pair via ``idconvert/unionid_to_external_userid``.

        Args:
            unionid: The WeChat unionid.
            openid: The WeChat openid.

        Returns:
            The raw JSON reply of the endpoint (``errcode`` is not checked
            here; callers read the fields they need), or ``None`` when the
            request itself failed.
        """
        try:
            return await self._call_api(
                "POST",
                "/idconvert/unionid_to_external_userid",
                payload={
                    "unionid": unionid,
                    "openid": openid,
                    "subject_type": 1,
                },
            )
        except Exception as e:
            logger.error(f"unionid_to_external_userid异常: {str(e)}")
            return None

    async def get_external_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the details of a customer group chat.

        Args:
            chat_id: Group chat ID.

        Returns:
            The ``group_chat`` object of the reply, or ``None`` on failure.
        """
        try:
            result = await self._call_api(
                "POST",
                "/externalcontact/groupchat/get",
                # need_name=1 asks the API to include member names in
                # member_list; they are used as a fallback display name.
                payload={"chat_id": chat_id, "need_name": 1},
            )
            if result.get("errcode") == 0:
                return result.get("group_chat")
            logger.error(f"获取外部群聊信息失败: {result}")
            return None
        except Exception as e:
            logger.error(f"获取外部群聊信息异常: {str(e)}")
            return None

    async def get_external_contact_info(
        self, external_userid: str
    ) -> Optional[Dict[str, Any]]:
        """Fetch the details of an external contact.

        Args:
            external_userid: External contact ID.

        Returns:
            The ``external_contact`` object of the reply, or ``None`` on failure.
        """
        try:
            result = await self._call_api(
                "GET",
                "/externalcontact/get",
                params={"external_userid": external_userid},
            )
            if result.get("errcode") == 0:
                logger.info(f"获取外部联系人信息成功: {result}")
                return result.get("external_contact")
            logger.error(f"获取外部联系人信息失败: {result}")
            return None
        except Exception as e:
            logger.error(f"获取外部联系人信息异常: {str(e)}")
            return None

    async def get_internal_user_info(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the details of an internal (corp) member.

        Args:
            user_id: The member's userid.

        Returns:
            The full JSON reply on success, ``None`` on failure.
        """
        try:
            result = await self._call_api(
                "GET", "/user/get", params={"userid": user_id}
            )
            if result.get("errcode") == 0:
                logger.info(f"获取内部成员信息成功: {result}")
                return result
            logger.error(f"获取内部成员信息失败: {result}")
            return None
        except Exception as e:
            logger.error(f"获取内部成员信息异常: {str(e)}")
            return None

    # ------------------------------------------------------------------
    # Group-chat change events
    # ------------------------------------------------------------------
    async def handle_chat_change_event(
        self,
        chat_id: str,
        change_type: str,
        update_detail: str,
        join_user_id: str = None,
    ) -> bool:
        """Apply a group-chat change callback to the local tables.

        Args:
            chat_id: Group chat ID.
            change_type: ``create`` / ``update`` / ``dismiss``.
            update_detail: For ``update`` events: ``add_member`` or
                ``del_member`` are handled; anything else is ignored.
            join_user_id: ID of the member who joined / left.

        Returns:
            ``True`` if the event was handled, ``False`` if it was ignored,
            lacked required data, or failed (failures are logged).
        """
        # Imported here rather than at module top, as in the original code
        # (presumably to avoid an import cycle -- TODO confirm).
        from app.models import wecom_external_chat as chat_models
        from app.models.database import SessionLocal

        db = None
        try:
            db = SessionLocal()
            if change_type == "create":
                return await self._on_chat_created(db, chat_models, chat_id)
            if change_type == "update" and update_detail == "add_member":
                return await self._on_member_added(
                    db, chat_models, chat_id, join_user_id
                )
            if change_type == "update" and update_detail == "del_member":
                return self._on_member_removed(db, chat_models, chat_id, join_user_id)
            if change_type == "dismiss":
                return self._on_chat_dismissed(db, chat_models, chat_id)
            return False
        except Exception as e:
            # logger.exception records the traceback together with the message.
            logger.exception(f"处理群聊变更事件异常: {str(e)}")
            return False
        finally:
            # Single cleanup point for every exit path.
            if db is not None:
                db.close()

    @staticmethod
    def _member_type_from_code(type_code: Any) -> str:
        """Map the numeric member type (1 = internal) to a label; anything else is EXTERNAL."""
        return "INTERNAL" if type_code == 1 else "EXTERNAL"

    @staticmethod
    def _find_chat(db, models, chat_id: str):
        """Return the stored chat row for ``chat_id``, or ``None``."""
        return (
            db.query(models.WecomExternalChatDB)
            .filter(models.WecomExternalChatDB.chat_id == chat_id)
            .first()
        )

    @staticmethod
    def _find_member(db, models, chat_id: str, user_id: str):
        """Return the stored member row for (``chat_id``, ``user_id``), or ``None``."""
        return (
            db.query(models.WecomExternalChatMemberDB)
            .filter(
                models.WecomExternalChatMemberDB.chat_id == chat_id,
                models.WecomExternalChatMemberDB.user_id == user_id,
            )
            .first()
        )

    @staticmethod
    def _create_chat_record(db, models, chat_id: str, chat_info: Dict[str, Any]):
        """Insert and commit a chat row built from the API's group-chat object."""
        chat_create = models.WecomExternalChatCreate(
            chat_id=chat_id,
            name=chat_info.get("name"),
            owner=chat_info.get("owner"),
            member_count=len(chat_info.get("member_list", [])),
            notice=chat_info.get("notice"),
        )
        chat_db = models.WecomExternalChatDB(**chat_create.model_dump())
        db.add(chat_db)
        db.commit()
        return chat_db

    def _save_member(
        self,
        db,
        models,
        chat_id: str,
        user_id: str,
        member_type: str,
        name: Optional[str],
        unionid: Optional[str],
    ) -> None:
        """Insert and commit a member row, substituting a placeholder for a missing name."""
        member_create = models.WecomExternalChatMemberCreate(
            chat_id=chat_id,
            user_id=user_id,
            type=member_type,
            name=name or self._UNKNOWN_MEMBER_NAME,
            unionid=unionid,
        )
        db.add(models.WecomExternalChatMemberDB(**member_create.model_dump()))
        db.commit()

    async def _on_chat_created(self, db, models, chat_id: str) -> bool:
        """Handle ``create``: store the chat and every member not yet recorded."""
        chat_info = await self.get_external_chat_info(chat_id)
        if not chat_info:
            logger.warning(f"无法获取群聊信息: {chat_id}")
            return False
        logger.info(f"获取到群聊信息: {chat_info}")

        if not self._find_chat(db, models, chat_id):
            self._create_chat_record(db, models, chat_id, chat_info)

        for member in chat_info.get("member_list", []):
            user_id = member.get("userid")
            type_code = member.get("type")
            member_type = self._member_type_from_code(type_code)
            logger.info(f"成员类型: {member_type}(原始值:{type_code}), 成员ID: {user_id}")

            if self._find_member(db, models, chat_id, user_id):
                continue

            # The lookup helpers log and return None on failure; they never
            # raise, so no try/except is needed around them.
            if member_type == "EXTERNAL":
                user_info = await self.get_external_contact_info(user_id)
                logger.info(f"获取到外部联系人详情: {user_info}")
            else:
                user_info = await self.get_internal_user_info(user_id)
                logger.info(f"获取到内部成员详情: {user_info}")

            details = user_info or {}
            # Fall back to whatever the group-chat member entry itself
            # carries when the detail lookup produced nothing.
            member_name = details.get("name") or member.get("name")
            member_unionid = None
            if member_type == "EXTERNAL":
                member_unionid = details.get("unionid") or member.get("unionid")

            self._save_member(
                db, models, chat_id, user_id, member_type, member_name, member_unionid
            )
        return True

    async def _on_member_added(
        self, db, models, chat_id: str, join_user_id: Optional[str]
    ) -> bool:
        """Handle ``update/add_member``: record a newly joined external member."""
        if not join_user_id:
            return False

        chat_info = None
        chat_created_now = False
        chat_db = self._find_chat(db, models, chat_id)
        if not chat_db:
            chat_info = await self.get_external_chat_info(chat_id)
            if chat_info:
                chat_db = self._create_chat_record(db, models, chat_id, chat_info)
                chat_created_now = True

        # Already recorded (e.g. the callback was delivered twice): nothing to do.
        if self._find_member(db, models, chat_id, join_user_id):
            return True

        # Reuse the chat info fetched above instead of requesting it again.
        if not chat_info:
            chat_info = await self.get_external_chat_info(chat_id)

        member_type = "EXTERNAL"  # default when the member is not listed
        listed_member: Dict[str, Any] = {}
        for member in (chat_info or {}).get("member_list", []):
            if member.get("userid") == join_user_id:
                listed_member = member
                type_code = member.get("type")
                member_type = self._member_type_from_code(type_code)
                logger.info(
                    f"从群聊详情中确定成员类型: {member_type}(原始值:{type_code}), 成员: {join_user_id}"
                )
                break

        # Internal members that join later are deliberately not recorded.
        if member_type == "INTERNAL":
            logger.info(f"成员 {join_user_id} 是内部成员,不做处理")
            return True

        member_name = None
        member_unionid = None
        user_info = await self.get_external_contact_info(join_user_id)
        if user_info:
            member_name = user_info.get("name")
            member_unionid = user_info.get("unionid")
            logger.info(f"获取到外部联系人详情: {member_name}, unionid: {member_unionid}")

        # Keep the values from the chat detail as a fallback when the
        # contact lookup yields nothing.
        member_name = member_name or listed_member.get("name")
        member_unionid = member_unionid or listed_member.get("unionid")

        self._save_member(
            db, models, chat_id, join_user_id, member_type, member_name, member_unionid
        )

        # A chat row created during this event was seeded from a member list
        # that already includes the joiner, so only bump pre-existing rows.
        if chat_db and not chat_created_now:
            chat_db.member_count = (chat_db.member_count or 0) + 1
            db.commit()
        return True

    def _on_member_removed(
        self, db, models, chat_id: str, join_user_id: Optional[str]
    ) -> bool:
        """Handle ``update/del_member``: delete the member row and decrement the count."""
        if not join_user_id:
            return False
        logger.info(f"处理成员离开群聊事件: 群ID={chat_id}, 成员ID={join_user_id}")

        member_db = self._find_member(db, models, chat_id, join_user_id)
        if member_db:
            logger.info(f"从数据库中删除成员记录: {member_db.user_id}, 姓名: {member_db.name}")
            db.delete(member_db)
            db.commit()
        else:
            logger.warning(f"未找到要删除的成员记录: chat_id={chat_id}, user_id={join_user_id}")

        chat_db = self._find_chat(db, models, chat_id)
        if chat_db:
            # Never go below zero, and tolerate a NULL count.
            chat_db.member_count = max(0, (chat_db.member_count or 0) - 1)
            logger.info(f"更新群成员数量: {chat_db.member_count}")
            db.commit()
        else:
            logger.warning(f"未找到群聊记录: chat_id={chat_id}")
        return True

    def _on_chat_dismissed(self, db, models, chat_id: str) -> bool:
        """Handle ``dismiss``: mark the stored chat as inactive, if it exists."""
        chat_db = self._find_chat(db, models, chat_id)
        if chat_db:
            chat_db.is_active = False
            db.commit()
            logger.info(f"群聊解散: {chat_id}")
        return True
# Module-level client instance, created when this module is imported.
wecom_client = WecomClient()