# deliveryman-api/app/core/mpclient.py
# 2025-03-08 13:58:34 +08:00
#
# 112 lines
# 3.7 KiB
# Python

import aiohttp
import json
from app.core.config import settings
import redis
import time
from typing import Dict, Any, Optional
class MPClient:
    """Client for the WeChat Official Account (MP) HTTP API.

    Wraps three endpoints under ``https://api.weixin.qq.com/cgi-bin``:
    ``token``, ``message/template/send`` and ``user/info``.

    The access token is cached in memory on the instance and reused until
    shortly before the ``expires_in`` value reported by the API, so sending
    a message does not trigger a token request every time. If an API call
    answers with a token-invalid errcode, the token is refreshed and the
    call is retried once.

    NOTE(review): the cache is per instance / per process. If several
    processes share one appid, a shared store may be preferable - confirm
    against the deployment.
    """

    _API_BASE = "https://api.weixin.qq.com/cgi-bin"

    # Seconds subtracted from ``expires_in`` so that a token is never used
    # right at the edge of its lifetime.
    _TOKEN_REFRESH_MARGIN = 300

    # errcodes treated as "access token invalid or expired"; seeing one of
    # them triggers a single refresh-and-retry.
    _TOKEN_INVALID_ERRCODES = frozenset({40001, 40014, 42001})

    # Class-level defaults so the cache attributes always exist, even on an
    # instance that was created without running ``__init__``.
    _access_token: Optional[str] = None
    _token_expires_at: float = 0.0

    def __init__(self):
        """Read the app credentials from settings and start with an empty token cache."""
        self.appid = settings.MP_APPID
        self.secret = settings.MP_SECRET
        self._access_token = None
        self._token_expires_at = 0.0

    @staticmethod
    async def _request_json(
        method: str,
        url: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Perform one HTTP request and return the decoded JSON body.

        :param method: HTTP method name, e.g. ``"GET"`` or ``"POST"``
        :param url: absolute URL without query string
        :param params: query parameters; aiohttp URL-encodes them
        :param payload: optional JSON request body
        :return: the decoded JSON document
        """
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, url, params=params, json=payload
            ) as response:
                # content_type=None turns off aiohttp's Content-Type check so
                # a JSON body served with another MIME type is still parsed.
                return await response.json(content_type=None)

    @staticmethod
    def _build_template_message(
        openid: str,
        template_id: str,
        data: Dict[str, Any],
        url: Optional[str] = None,
        miniprogram: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Build the request body for ``message/template/send``.

        Every entry of ``data`` is wrapped as ``{"value": ...}``; ``url`` and
        ``miniprogram`` are only included when they are truthy.
        """
        message: Dict[str, Any] = {
            "touser": openid,
            "template_id": template_id,
            "data": {key: {"value": value} for key, value in data.items()},
        }
        if url:
            message["url"] = url
        if miniprogram:
            message["miniprogram"] = miniprogram
        return message

    async def get_access_token(self, force_refresh: bool = False) -> str:
        """Return an access token, reusing the cached one while it is valid.

        :param force_refresh: skip the cache and always request a new token
        :return: the access token string
        :raises RuntimeError: when the response contains no ``access_token``
        """
        if (
            not force_refresh
            and self._access_token
            and time.monotonic() < self._token_expires_at
        ):
            return self._access_token

        result = await self._request_json(
            "GET",
            f"{self._API_BASE}/token",
            params={
                "grant_type": "client_credential",
                "appid": self.appid,
                "secret": self.secret,
            },
        )
        if not isinstance(result, dict) or "access_token" not in result:
            raise RuntimeError(f"获取access_token失败: {result}")

        token = result["access_token"]
        # A missing or malformed ``expires_in`` yields a zero lifetime, i.e.
        # the token is simply not reused.
        try:
            lifetime = float(result.get("expires_in", 0))
        except (TypeError, ValueError):
            lifetime = 0.0
        self._access_token = token
        self._token_expires_at = time.monotonic() + max(
            lifetime - self._TOKEN_REFRESH_MARGIN, 0.0
        )
        return token

    async def _call_api(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Call an authenticated endpoint, retrying once on a stale token.

        :param method: HTTP method name
        :param path: endpoint path relative to the ``cgi-bin`` base URL
        :param params: extra query parameters (``access_token`` is added here)
        :param payload: optional JSON request body
        :return: the decoded JSON response of the last attempt
        """
        result: Any = None
        for attempt in range(2):
            # The second attempt only happens after a token-invalid answer,
            # so it must bypass the cache.
            token = await self.get_access_token(force_refresh=attempt > 0)
            query: Dict[str, Any] = {"access_token": token}
            if params:
                query.update(params)
            result = await self._request_json(
                method, f"{self._API_BASE}/{path}", params=query, payload=payload
            )
            if (
                not isinstance(result, dict)
                or result.get("errcode") not in self._TOKEN_INVALID_ERRCODES
            ):
                break
        return result

    async def send_template_message(
        self,
        openid: str,
        template_id: str,
        data: Dict[str, Any],
        url: Optional[str] = None,
        miniprogram: Optional[Dict[str, str]] = None
    ) -> bool:
        """Send a template message to one user.

        :param openid: recipient's openid
        :param template_id: template ID
        :param data: template fields; each value is wrapped as ``{"value": v}``
        :param url: link opened when the message is tapped (optional)
        :param miniprogram: mini program jump info (optional)
        :return: ``True`` when the API answers ``errcode == 0``, else ``False``
        """
        # Sending is best-effort: any failure is reported and turned into
        # ``False`` instead of propagating to the caller.
        try:
            message = self._build_template_message(
                openid, template_id, data, url, miniprogram
            )
            print(f"发送模板消息: {message}")
            result = await self._call_api(
                "POST", "message/template/send", payload=message
            )
            print(f"发送模板消息结果: {result}")
            return result.get("errcode") == 0
        except Exception as e:
            print(f"发送模板消息异常: {str(e)}")
            return False

    async def get_user_info(self, openid: str) -> Optional[Dict]:
        """Fetch a user's basic profile by openid.

        :param openid: the user's openid
        :return: the response dict, or ``None`` when the API reports a
            non-zero ``errcode`` or the request fails
        """
        try:
            result = await self._call_api(
                "GET", "user/info", params={"openid": openid, "lang": "zh_CN"}
            )
            # Only a non-zero errcode is an error, matching how
            # send_template_message interprets ``errcode == 0``.
            if result.get("errcode"):
                print(f"获取用户信息失败: {result}")
                return None
            return result
        except Exception as e:
            print(f"获取用户信息异常: {str(e)}")
            return None
# Create the module-level MPClient instance.
mp_client = MPClient()