deliveryman-api/app/core/wecomclient.py
2025-03-30 11:39:03 +08:00

418 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import time
import logging
from app.core.config import settings
from typing import Dict, Any, Optional, List
import aiohttp
logging.basicConfig(level=logging.INFO)
class WecomClient:
"""企业微信客户端"""
def __init__(self):
self.corp_id = settings.WECHAT_CORP_ID
self.corp_secret = settings.WECHAT_CORP_SECRET
self.access_token = None
async def get_access_token(self) -> str:
"""获取访问令牌"""
try:
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
params = {
"corpid": self.corp_id,
"corpsecret": self.corp_secret
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
if result.get("errcode") == 0:
self.access_token = result["access_token"]
return self.access_token
else:
raise Exception(f"获取access_token失败: {result}")
except Exception as e:
logging.error(f"获取access_token异常: {str(e)}")
return None
async def code2session(self, js_code: str) -> Optional[Dict[str, Any]]:
"""
小程序登录凭证校验
"""
try:
# 1. 获取 access_token
access_token = await self.get_access_token()
if not access_token:
raise Exception("获取access_token失败")
# 2. 调用 jscode2session 接口
url = "https://qyapi.weixin.qq.com/cgi-bin/miniprogram/jscode2session"
params = {
"access_token": access_token,
"js_code": js_code,
"grant_type": "authorization_code"
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
if result.get("errcode") == 0:
return {
"userid": result.get("userid"),
"pending_id": result.get("pending_id"),
"session_key": result.get("session_key")
}
else:
logging.error(f"code2session失败: {result}")
return None
except Exception as e:
logging.error(f"code2session异常: {str(e)}")
return None
async def get_unionid_from_userid(self, userid: str) -> Optional[str]:
"""根据userid获取unionid"""
try:
access_token = await self.get_access_token()
if not access_token:
raise Exception("获取access_token失败")
url = f"https://qyapi.weixin.qq.com/cgi-bin/externalcontact/get"
params = {
"access_token": access_token,
"external_userid": userid
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
return result.get("unionid")
except Exception as e:
logging.error(f"get_unionid_from_userid异常: {str(e)}")
return None
async def unionid_to_external_userid(self, unionid: str, openid: str) -> Optional[str]:
"""根据unionid获取external_userid"""
try:
url = f"https://qyapi.weixin.qq.com/cgi-bin/idconvert/unionid_to_external_userid?access_token={await self.get_access_token()}"
params = {
"unionid": unionid,
"openid": openid,
"subject_type": 1
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=params) as response:
result = await response.json()
return result
except Exception as e:
logging.error(f"unionid_to_external_userid异常: {str(e)}")
return None
async def send_welcome_message(self, chat_id: str, user_id: str = None) -> bool:
"""发送欢迎消息
Args:
chat_id: 群聊ID
user_id: 用户ID如果指定则发送私信否则发送群消息
Returns:
bool: 是否发送成功
"""
try:
# 1. 获取 access_token
access_token = await self.get_access_token()
if not access_token:
logging.error("获取access_token失败")
return False
welcome_text = f"""🥳 欢迎您进群,在群内可以享受📦【代取快递】跑腿服务。
‼ 微信下单,快递到家 ‼
🎁 新人礼包
𝟏 赠送𝟏𝟓张【𝟑元跑腿券】
𝟐 赠送𝟔枚鲜鸡蛋【首次下单】
━ ━ ━ ━ ━🎊━ ━ ━ ━ ━
↓点击↓小程序领券下单 &"""
# 2. 发送欢迎消息
if user_id:
# 发送私信
url = f"https://qyapi.weixin.qq.com/cgi-bin/externalcontact/send_welcome_msg?access_token={access_token}"
data = {
"welcome_code": user_id, # 这里使用user_id作为临时的欢迎码
"text": {
"content": welcome_text
}
}
else:
# 发送群消息
url = f"https://qyapi.weixin.qq.com/cgi-bin/appchat/send?access_token={access_token}"
data = {
"chatid": chat_id,
"msgtype": "text",
"text": {
"content": welcome_text
},
"safe": 0
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data) as response:
result = await response.json()
if result.get("errcode") == 0:
return True
else:
logging.error(f"发送欢迎消息失败: {result}")
return False
except Exception as e:
logging.error(f"发送欢迎消息异常: {str(e)}")
return False
async def get_external_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
"""获取外部群聊信息
Args:
chat_id: 群聊ID
Returns:
Dict: 群聊信息
"""
try:
access_token = await self.get_access_token()
if not access_token:
logging.error("获取access_token失败")
return None
url = f"https://qyapi.weixin.qq.com/cgi-bin/externalcontact/groupchat/get?access_token={access_token}"
data = {
"chat_id": chat_id
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data) as response:
result = await response.json()
if result.get("errcode") == 0:
return result.get("group_chat")
else:
logging.error(f"获取外部群聊信息失败: {result}")
return None
except Exception as e:
logging.error(f"获取外部群聊信息异常: {str(e)}")
return None
async def get_external_contact_info(self, external_userid: str) -> Optional[Dict[str, Any]]:
"""获取外部联系人信息
Args:
external_userid: 外部联系人ID
Returns:
Dict: 外部联系人信息
"""
try:
access_token = await self.get_access_token()
if not access_token:
logging.error("获取access_token失败")
return None
url = f"https://qyapi.weixin.qq.com/cgi-bin/externalcontact/get?access_token={access_token}"
params = {
"external_userid": external_userid
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
if result.get("errcode") == 0:
return result.get("external_contact")
else:
logging.error(f"获取外部联系人信息失败: {result}")
return None
except Exception as e:
logging.error(f"获取外部联系人信息异常: {str(e)}")
return None
async def handle_chat_change_event(self, chat_id: str, change_type: str, update_detail: str, join_user_id: str = None) -> bool:
"""处理群聊变更事件
Args:
chat_id: 群聊ID
change_type: 变更类型 create/update/dismiss
update_detail: 变更详情 add_member/del_member/change_owner/change_name/change_notice
join_user_id: 加入的用户ID
Returns:
bool: 处理是否成功
"""
from app.models.wecom_external_chat import WecomExternalChatDB, WecomExternalChatMemberDB
from app.models.wecom_external_chat import WecomExternalChatCreate, WecomExternalChatMemberCreate
from app.models.database import SessionLocal
try:
db = SessionLocal()
# 群创建事件
if change_type == "create":
# 获取群聊信息
chat_info = await self.get_external_chat_info(chat_id)
if not chat_info:
return False
# 保存群聊信息
chat_db = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.chat_id == chat_id).first()
if not chat_db:
chat_create = WecomExternalChatCreate(
chat_id=chat_id,
name=chat_info.get("name"),
owner=chat_info.get("owner"),
member_count=len(chat_info.get("member_list", [])),
notice=chat_info.get("notice")
)
chat_db = WecomExternalChatDB(**chat_create.dict())
db.add(chat_db)
db.commit()
# 保存群成员信息
for member in chat_info.get("member_list", []):
user_id = member.get("userid")
member_type = member.get("type")
# 检查成员是否已存在
member_db = db.query(WecomExternalChatMemberDB).filter(
WecomExternalChatMemberDB.chat_id == chat_id,
WecomExternalChatMemberDB.user_id == user_id
).first()
if not member_db:
# 获取外部联系人详情
user_info = None
if member_type == "EXTERNAL":
user_info = await self.get_external_contact_info(user_id)
member_create = WecomExternalChatMemberCreate(
chat_id=chat_id,
user_id=user_id,
type=member_type,
name=user_info.get("name") if user_info else None,
unionid=user_info.get("unionid") if user_info else None
)
member_db = WecomExternalChatMemberDB(**member_create.dict())
db.add(member_db)
db.commit()
return True
# 群更新事件 - 添加成员
elif change_type == "update" and update_detail == "add_member":
if not join_user_id:
return False
# 检查群聊是否存在
chat_db = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.chat_id == chat_id).first()
if not chat_db:
# 获取群聊信息并创建
chat_info = await self.get_external_chat_info(chat_id)
if chat_info:
chat_create = WecomExternalChatCreate(
chat_id=chat_id,
name=chat_info.get("name"),
owner=chat_info.get("owner"),
member_count=len(chat_info.get("member_list", [])),
notice=chat_info.get("notice")
)
chat_db = WecomExternalChatDB(**chat_create.dict())
db.add(chat_db)
db.commit()
# 检查成员是否已存在
member_db = db.query(WecomExternalChatMemberDB).filter(
WecomExternalChatMemberDB.chat_id == chat_id,
WecomExternalChatMemberDB.user_id == join_user_id
).first()
if not member_db:
# 判断是内部成员还是外部联系人
member_type = "EXTERNAL" # 默认为外部联系人
# 获取外部联系人详情
user_info = await self.get_external_contact_info(join_user_id)
member_create = WecomExternalChatMemberCreate(
chat_id=chat_id,
user_id=join_user_id,
type=member_type,
name=user_info.get("name") if user_info else None,
unionid=user_info.get("unionid") if user_info else None
)
member_db = WecomExternalChatMemberDB(**member_create.dict())
db.add(member_db)
db.commit()
# 发送欢迎消息
if member_type == "EXTERNAL":
await self.send_welcome_message(chat_id)
# 更新发送欢迎消息状态
member_db.welcome_sent = True
db.commit()
# 更新群成员数量
if chat_db:
chat_db.member_count = chat_db.member_count + 1
db.commit()
return True
# 群更新事件 - 移除成员
elif change_type == "update" and update_detail == "del_member":
if not join_user_id:
return False
# 删除成员记录
member_db = db.query(WecomExternalChatMemberDB).filter(
WecomExternalChatMemberDB.chat_id == chat_id,
WecomExternalChatMemberDB.user_id == join_user_id
).first()
if member_db:
db.delete(member_db)
db.commit()
# 更新群成员数量
chat_db = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.chat_id == chat_id).first()
if chat_db:
chat_db.member_count = max(0, chat_db.member_count - 1)
db.commit()
return True
# 群解散事件
elif change_type == "dismiss":
# 标记群聊为非活跃
chat_db = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.chat_id == chat_id).first()
if chat_db:
chat_db.is_active = False
db.commit()
return True
return False
except Exception as e:
logging.error(f"处理群聊变更事件异常: {str(e)}")
if 'db' in locals():
db.close()
return False
finally:
if 'db' in locals():
db.close()
wecom_client = WecomClient()