deliveryman-api/app/core/wecomclient.py
2025-02-27 12:29:24 +08:00

91 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aiohttp
from app.core.config import settings
import logging
from typing import Optional, Dict
class WecomClient:
def __init__(self):
self.corpid = settings.WECHAT_CORP_ID
self.secret = settings.WECHAT_CORP_SECRET # 注意:这里使用普通的 secret不是 provider_secret
self.access_token = None
async def get_access_token(self) -> str:
"""获取企业微信普通access_token"""
try:
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
params = {
"corpid": self.corpid,
"corpsecret": self.secret
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
if result.get("errcode") == 0:
self.access_token = result["access_token"]
print(f"企业微信access_token: {self.access_token}")
return self.access_token
else:
logging.error(f"获取access_token返回: {result}")
raise Exception(f"获取access_token失败: {result}")
except Exception as e:
logging.error(f"获取access_token失败: {str(e)}")
raise
async def miniprogram_to_userid(self, code: str) -> Optional[Dict]:
"""
小程序code转换为企业微信userid
注意这里使用code而不是openid
"""
try:
url = "https://qyapi.weixin.qq.com/cgi-bin/miniprogram/jscode2session"
params = {
"access_token": await self.get_access_token(),
"js_code": code,
"grant_type": "authorization_code"
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
print(result)
if result.get("errcode") == 0:
data = {
"userid": result.get("userid"),
"pending_id": result.get("pending_id"),
"session_key": result.get("session_key")
}
return data
else:
logging.error(f"code转换失败: {result}")
return None
except Exception as e:
logging.error(f"code转换失败: {str(e)}")
return None
async def get_user_info(self, userid: str) -> Optional[Dict]:
"""获取企业成员信息"""
try:
url = "https://qyapi.weixin.qq.com/cgi-bin/user/get"
params = {
"access_token": await self.get_access_token(),
"userid": userid
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
result = await response.json()
if result.get("errcode") == 0:
return result
else:
logging.error(f"获取用户信息失败: {result}")
return None
except Exception as e:
logging.error(f"获取用户信息失败: {str(e)}")
return None