deliveryman-api/app/core/wecomclient.py
2025-02-27 13:12:04 +08:00

93 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aiohttp
from app.core.config import settings
import logging
from typing import Optional, Dict
class WecomClient:
def __init__(self):
self.corpid = settings.WECHAT_CORP_ID
self.secret = settings.WECHAT_CORP_SECRET # 注意:这里使用普通的 secret不是 provider_secret
self.access_token = None
async def get_access_token(self) -> str:
"""获取企业微信普通access_token"""
try:
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
params = {
"corpid": self.corpid,
"corpsecret": self.secret
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
if result.get("errcode") == 0:
self.access_token = result["access_token"]
print(f"企业微信access_token: {self.access_token}")
return self.access_token
else:
logging.error(f"获取access_token返回: {result}")
raise Exception(f"获取access_token失败: {result}")
except Exception as e:
logging.error(f"获取access_token失败: {str(e)}")
raise
async def miniprogram_to_userid(self, openid: str) -> Optional[Dict]:
"""
转换小程序身份到企业微信
可以使用code或openid
"""
try:
# 构建请求参数
url = "https://qyapi.weixin.qq.com/cgi-bin/miniprogram/jscode2session"
params = {
"access_token": await self.get_access_token(),
"appid": settings.WECHAT_APPID,
}
# 根据传入参数类型选择
params["openid"] = openid
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
logging.info(f"转换结果: {result}")
if result.get("errcode") == 0:
return {
"userid": result.get("userid"), # 如果是企业成员返回userid
"pending_id": result.get("pending_id"), # 如果是外部联系人返回pending_id
"openid": result.get("openid"),
"session_key": result.get("session_key")
}
else:
logging.error(f"身份转换失败: {result}")
return None
except Exception as e:
logging.error(f"身份转换失败: {str(e)}")
return None
async def get_user_info(self, userid: str) -> Optional[Dict]:
"""获取企业成员信息"""
try:
url = "https://qyapi.weixin.qq.com/cgi-bin/user/get"
params = {
"access_token": await self.get_access_token(),
"userid": userid
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
if result.get("errcode") == 0:
return result
else:
logging.error(f"获取用户信息失败: {result}")
return None
except Exception as e:
logging.error(f"获取用户信息失败: {str(e)}")
return None