# deliveryman-api/app/core/wecomclient.py
# 2025-03-03 00:07:23 +08:00
#
# 125 lines
# 4.8 KiB
# Python

import requests
import json
import time
import logging
from app.core.config import settings
from typing import Dict, Any, Optional, List
import aiohttp
class WecomClient:
    """Async client for a handful of WeCom (Enterprise WeChat) server APIs.

    Credentials come from ``settings.WECHAT_CORP_ID`` and
    ``settings.WECHAT_CORP_SECRET``. Every public method is best-effort:
    failures are logged and signalled through the return value (``None`` or
    ``False``) rather than raised to the caller.
    """

    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
    # Fallback lifetime (seconds) used when the gettoken response carries no
    # ``expires_in`` field.
    _DEFAULT_TOKEN_TTL = 7200
    # Refresh this many seconds before the advertised expiry so a token is
    # not handed out right as it lapses.
    _TOKEN_REFRESH_MARGIN = 300
    # WeCom error codes for an invalid / expired access_token; seeing one
    # means the cached token must be discarded.
    _TOKEN_REJECTED_ERRCODES = frozenset({40014, 42001})

    # Class-level defaults keep the cache check safe even on an instance
    # whose ``__init__`` has not run.
    access_token: Optional[str] = None
    _token_expires_at: float = 0.0

    def __init__(self):
        self.corp_id = settings.WECHAT_CORP_ID
        self.corp_secret = settings.WECHAT_CORP_SECRET
        self.access_token = None
        # time.monotonic() deadline after which the cached token is stale.
        self._token_expires_at = 0.0

    async def _call(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send one request to ``_API_BASE/path`` and return the decoded JSON.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Endpoint path relative to ``_API_BASE``.
            params: Query-string parameters.
            payload: JSON request body, if any.

        Raises:
            Exception: Whatever the HTTP layer or JSON decoding raises;
                callers are expected to handle it.
        """
        url = f"{self._API_BASE}/{path}"
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, url, params=params, json=payload
            ) as response:
                result = await response.json()
        if result.get("errcode") in self._TOKEN_REJECTED_ERRCODES:
            # The server rejected our token: forget it so the next call
            # fetches a fresh one instead of reusing a dead token.
            self.access_token = None
            self._token_expires_at = 0.0
        return result

    async def get_access_token(self) -> Optional[str]:
        """Return a valid access token, fetching a new one only when needed.

        The token is cached on the instance until shortly before the
        ``expires_in`` reported by the server, so repeated calls do not hit
        the ``gettoken`` endpoint every time.

        Returns:
            The access token, or ``None`` if it could not be obtained.
        """
        if self.access_token and time.monotonic() < self._token_expires_at:
            return self.access_token
        try:
            result = await self._call(
                "GET",
                "gettoken",
                params={"corpid": self.corp_id, "corpsecret": self.corp_secret},
            )
            if result.get("errcode") != 0:
                raise RuntimeError(f"获取access_token失败: {result}")
            ttl = int(result.get("expires_in", self._DEFAULT_TOKEN_TTL))
            self.access_token = result["access_token"]
            self._token_expires_at = time.monotonic() + max(
                ttl - self._TOKEN_REFRESH_MARGIN, 0
            )
            return self.access_token
        except Exception as e:
            logging.error(f"获取access_token异常: {str(e)}")
            return None

    async def code2session(self, js_code: str) -> Optional[Dict[str, Any]]:
        """Validate a mini-program login code via ``miniprogram/jscode2session``.

        Args:
            js_code: The temporary login code obtained by the mini program.

        Returns:
            A dict with ``userid``, ``pending_id`` and ``session_key`` taken
            from the response when ``errcode`` is 0; ``None`` otherwise.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                raise RuntimeError("获取access_token失败")
            result = await self._call(
                "GET",
                "miniprogram/jscode2session",
                params={
                    "access_token": access_token,
                    "js_code": js_code,
                    "grant_type": "authorization_code",
                },
            )
            if result.get("errcode") != 0:
                logging.error(f"code2session失败: {result}")
                return None
            return {
                "userid": result.get("userid"),
                "pending_id": result.get("pending_id"),
                "session_key": result.get("session_key"),
            }
        except Exception as e:
            logging.error(f"code2session异常: {str(e)}")
            return None

    async def unionid_to_external_userid(
        self, unionid: str, openid: str
    ) -> Optional[Dict[str, Any]]:
        """Call ``corpgroup/unionid_to_external_userid`` for a WeChat user.

        Args:
            unionid: The user's unionid.
            openid: The user's openid.

        Returns:
            The raw decoded response payload (callers must inspect
            ``errcode`` themselves), or ``None`` when no access token is
            available or the request fails.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                # Without this guard the request would be sent with the
                # literal string "None" as its token.
                logging.error("获取access_token失败")
                return None
            return await self._call(
                "POST",
                "corpgroup/unionid_to_external_userid",
                params={"access_token": access_token},
                payload={
                    "unionid": unionid,
                    "openid": openid,
                    "corpid": self.corp_id,
                },
            )
        except Exception as e:
            logging.error(f"unionid_to_external_userid异常: {str(e)}")
            return None

    async def send_welcome_message(self, user_info: Dict[str, Any], group_id: str) -> bool:
        """Send a text welcome message that mentions the new member.

        Args:
            user_info: Must contain a ``name`` key; it is interpolated into
                the message text.
            group_id: Sent to the API as ``chatid``.

        Returns:
            ``True`` when the API answers with ``errcode`` 0, else ``False``.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logging.error("获取access_token失败")
                return False
            # NOTE(review): ``chatid`` looks like the app group-chat API
            # (``appchat/send``) rather than ``message/send`` — confirm the
            # intended endpoint against the WeCom docs.
            result = await self._call(
                "POST",
                "message/send",
                # The token belongs in the query string; the API does not
                # read it from the JSON body.
                params={"access_token": access_token},
                payload={
                    "chatid": group_id,
                    "msgtype": "text",
                    "text": {
                        "content": f"欢迎 @{user_info['name']} 加入群聊!\n我们为您准备了专属优惠券,请查收~"
                    },
                },
            )
            if result.get("errcode") == 0:
                return True
            logging.error(f"发送欢迎消息失败: {result}")
            return False
        except Exception as e:
            logging.error(f"发送欢迎消息异常: {str(e)}")
            return False
# Module-level WecomClient instance, created when this module is imported.
wecom_client = WecomClient()