44 lines
1.7 KiB
Python
44 lines
1.7 KiB
Python
import aiohttp
|
|
from app.core.config import settings
|
|
|
|
class WeChatAPIError(Exception):
    """Raised when a WeChat API response reports a failure.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working. ``str(error)`` is the message taken from the response (or the
    fallback text), and ``errcode`` is the numeric code from the response
    when one was present.
    """

    def __init__(self, message, errcode=None):
        super().__init__(message)
        self.errcode = errcode


class WeChatClient:
    """WeChat client.

    Wraps the server-side WeChat endpoints used by this module and caches
    the interface access token on the instance until shortly before the
    lifetime reported by the token endpoint runs out.
    """

    TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token"
    PHONE_NUMBER_URL = "https://api.weixin.qq.com/wxa/business/getuserphonenumber"

    # Lifetime (seconds) assumed when the token response has no `expires_in`.
    _DEFAULT_TOKEN_TTL = 7200
    # Refresh this many seconds early so a token never expires mid-request.
    _TOKEN_REFRESH_MARGIN = 300
    # Error codes meaning the supplied access token is invalid or expired
    # (per WeChat's global return-code table); the cached token is discarded
    # when one of these comes back.
    _STALE_TOKEN_ERRCODES = frozenset({40001, 40014, 42001})

    def __init__(self):
        self.appid = settings.WECHAT_APPID
        self.secret = settings.WECHAT_SECRET
        self.access_token = None
        # Monotonic-clock deadline after which `access_token` is refreshed.
        self._token_expires_at = 0.0

    @staticmethod
    def _now():
        """Return the current monotonic clock reading in seconds."""
        # Function-scope import keeps this class self-contained without
        # touching the module's import block.
        import time

        return time.monotonic()

    def _token_is_fresh(self):
        """Return True when a cached token exists and its deadline has not passed."""
        if not self.access_token:
            return False
        # getattr: tolerate instances created without running __init__.
        return self._now() < getattr(self, "_token_expires_at", 0.0)

    def _remember_token(self, token, expires_in):
        """Cache *token* and compute its refresh deadline from *expires_in* seconds."""
        usable_for = max(int(expires_in) - self._TOKEN_REFRESH_MARGIN, 0)
        self.access_token = token
        self._token_expires_at = self._now() + usable_for

    async def get_access_token(self):
        """Get the interface access token.

        Returns the cached token while it is still fresh; otherwise requests
        a new one with the configured appid/secret and caches it together
        with the lifetime reported in the response.

        Returns:
            The access token string.

        Raises:
            WeChatAPIError: the response did not contain ``access_token``.
        """
        if self._token_is_fresh():
            return self.access_token

        params = {
            "grant_type": "client_credential",
            "appid": self.appid,
            "secret": self.secret,
        }
        async with aiohttp.ClientSession() as session:
            async with session.get(self.TOKEN_URL, params=params) as response:
                result = await response.json()

        if "access_token" not in result:
            raise WeChatAPIError(
                result.get("errmsg", "获取access_token失败"),
                result.get("errcode"),
            )

        self._remember_token(
            result["access_token"],
            result.get("expires_in", self._DEFAULT_TOKEN_TTL),
        )
        return self.access_token

    async def get_phone_number(self, code: str):
        """Get the user's phone number.

        Args:
            code: The code sent as the ``code`` field of the request body.

        Returns:
            The ``phone_info`` value of the response when ``errcode`` is 0.

        Raises:
            WeChatAPIError: the response ``errcode`` was not 0, or obtaining
                the access token failed.
        """
        access_token = await self.get_access_token()

        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.PHONE_NUMBER_URL,
                params={"access_token": access_token},
                json={"code": code},
            ) as response:
                result = await response.json()

        errcode = result.get("errcode")
        if errcode == 0:
            return result.get("phone_info")

        if errcode in self._STALE_TOKEN_ERRCODES:
            # The server rejected our token: forget it so the next call
            # fetches a new one instead of failing the same way again.
            self.access_token = None
        raise WeChatAPIError(result.get("errmsg", "获取手机号失败"), errcode)