deliveryman-api/app/core/wechat.py

44 lines
1.7 KiB
Python

import aiohttp
from app.core.config import settings
class WeChatClient:
"""微信客户端"""
def __init__(self):
self.appid = settings.WECHAT_APPID
self.secret = settings.WECHAT_SECRET
self.access_token = None
async def get_access_token(self):
"""获取接口调用凭证"""
if self.access_token:
return self.access_token
async with aiohttp.ClientSession() as session:
url = f"https://api.weixin.qq.com/cgi-bin/token"
params = {
"grant_type": "client_credential",
"appid": self.appid,
"secret": self.secret
}
async with session.get(url, params=params) as response:
result = await response.json()
if "access_token" in result:
self.access_token = result["access_token"]
return self.access_token
raise Exception(result.get("errmsg", "获取access_token失败"))
async def get_phone_number(self, code: str):
"""获取用户手机号"""
access_token = await self.get_access_token()
async with aiohttp.ClientSession() as session:
url = f"https://api.weixin.qq.com/wxa/business/getuserphonenumber"
params = {"access_token": access_token}
data = {"code": code}
async with session.post(url, params=params, json=data) as response:
result = await response.json()
if result.get("errcode") == 0:
return result.get("phone_info")
raise Exception(result.get("errmsg", "获取手机号失败"))