修复 wechat 解析返回。

This commit is contained in:
aaron 2025-01-22 13:46:50 +08:00
parent 66db8f0a1e
commit 5b5aa13c29

View File

@ -25,61 +25,67 @@ class WeChatClient:
self.secret = settings.WECHAT_SECRET
async def get_access_token(self):
"""获取接口调用凭证"""
if self.access_token:
return self.access_token
"""获取小程序全局接口调用凭据"""
url = "https://api.weixin.qq.com/cgi-bin/token"
params = {
"grant_type": "client_credential",
"appid": self.appid,
"secret": self.secret
}
async with aiohttp.ClientSession() as session:
url = f"https://api.weixin.qq.com/cgi-bin/token"
params = {
"grant_type": "client_credential",
"appid": self.appid,
"secret": self.secret
}
async with session.get(url, params=params) as response:
result = await response.json()
if "access_token" in result:
self.access_token = result["access_token"]
return self.access_token
raise Exception(result.get("errmsg", "获取access_token失败"))
text = await response.text()
try:
result = json.loads(text)
except json.JSONDecodeError:
raise Exception(f"解析微信返回数据失败: {text}")
async def get_phone_number(self, code: str):
if "errcode" in result and result["errcode"] != 0:
raise Exception(result.get("errmsg", "获取access_token失败"))
return result.get("access_token")
async def get_phone_number(self, code: str) -> dict:
"""获取用户手机号"""
access_token = await self.get_access_token()
url = f"https://api.weixin.qq.com/wxa/business/getuserphonenumber?access_token={access_token}"
async with aiohttp.ClientSession() as session:
url = f"https://api.weixin.qq.com/wxa/business/getuserphonenumber"
params = {"access_token": access_token}
data = {"code": code}
async with session.post(url, json={"code": code}) as response:
text = await response.text()
try:
result = json.loads(text)
except json.JSONDecodeError:
raise Exception(f"解析微信返回数据失败: {text}")
async with session.post(url, params=params, json=data) as response:
result = await response.json()
if result.get("errcode") == 0:
return result.get("phone_info")
raise Exception(result.get("errmsg", "获取手机号失败"))
if result.get("errcode", 0) != 0:
raise Exception(result.get("errmsg", "获取手机号失败"))
return result.get("phone_info", {})
async def code2session(self, code: str) -> dict:
"""通过 code 获取用户 openid
"""通过 code 获取用户 openid"""
url = f"https://api.weixin.qq.com/sns/jscode2session"
params = {
"appid": self.appid,
"secret": self.secret,
"js_code": code,
"grant_type": "authorization_code"
}
Args:
code: 登录凭证
Returns:
dict: 包含 openid 等信息
"""
async with aiohttp.ClientSession() as session:
url = "https://api.weixin.qq.com/sns/jscode2session"
params = {
"appid": self.appid,
"secret": self.secret,
"js_code": code,
"grant_type": "authorization_code"
}
async with session.get(url, params=params) as response:
result = await response.json()
if "openid" in result:
return result
raise Exception(result.get("errmsg", "获取openid失败"))
text = await response.text()
try:
result = json.loads(text)
except json.JSONDecodeError:
raise Exception(f"解析微信返回数据失败: {text}")
if "errcode" in result and result["errcode"] != 0:
raise Exception(result.get("errmsg", "获取openid失败"))
return result