109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
import aiohttp
|
|
import json
|
|
from app.core.config import settings
|
|
import redis
|
|
import time
|
|
from typing import Dict, Any, Optional
|
|
|
|
class MPClient:
|
|
"""微信公众号客户端"""
|
|
|
|
def __init__(self):
|
|
self.appid = settings.MP_APPID
|
|
self.secret = settings.MP_SECRET
|
|
|
|
async def get_access_token(self) -> str:
|
|
"""获取访问令牌"""
|
|
async with aiohttp.ClientSession() as session:
|
|
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.appid}&secret={self.secret}"
|
|
async with session.get(url) as response:
|
|
result = await response.json()
|
|
|
|
if "access_token" not in result:
|
|
raise Exception(f"获取access_token失败: {result}")
|
|
|
|
access_token = result["access_token"]
|
|
|
|
return access_token
|
|
|
|
async def send_template_message(
|
|
self,
|
|
openid: str,
|
|
template_id: str,
|
|
data: Dict[str, Any],
|
|
url: Optional[str] = None,
|
|
miniprogram: Optional[Dict[str, str]] = None
|
|
) -> bool:
|
|
"""
|
|
发送模板消息
|
|
:param openid: 用户openid
|
|
:param template_id: 模板ID
|
|
:param data: 模板数据
|
|
:param url: 点击跳转的链接(可选)
|
|
:param miniprogram: 跳转小程序信息(可选)
|
|
:return: 发送是否成功
|
|
"""
|
|
try:
|
|
access_token = await self.get_access_token()
|
|
api_url = f"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={access_token}"
|
|
|
|
message = {
|
|
"touser": openid,
|
|
"template_id": template_id,
|
|
"data": {
|
|
key: {
|
|
"value": value
|
|
} for key, value in data.items()
|
|
}
|
|
}
|
|
|
|
# 添加跳转链接
|
|
if url:
|
|
message["url"] = url
|
|
|
|
# 添加小程序跳转信息
|
|
if miniprogram:
|
|
message["miniprogram"] = miniprogram
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(api_url, json=message) as response:
|
|
result = await response.json()
|
|
|
|
if result.get("errcode") == 0:
|
|
return True
|
|
|
|
print(f"发送模板消息失败: {result}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"发送模板消息异常: {str(e)}")
|
|
return False
|
|
|
|
|
|
# 根据unionid获取用户信息
|
|
async def get_user_info(self, openid: str) -> Optional[Dict]:
|
|
"""
|
|
获取用户基本信息
|
|
:param openid: 用户openid
|
|
:return: 用户信息字典
|
|
"""
|
|
try:
|
|
access_token = await self.get_access_token()
|
|
url = f"https://api.weixin.qq.com/cgi-bin/user/info?access_token={access_token}&openid={openid}&lang=zh_CN"
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as response:
|
|
result = await response.json()
|
|
|
|
if "errcode" in result:
|
|
print(f"获取用户信息失败: {result}")
|
|
return None
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
print(f"获取用户信息异常: {str(e)}")
|
|
return None
|
|
|
|
# 创建全局实例
|
|
mp_client = MPClient() |