"""Client for the WeChat Official Account (公众号) HTTP API."""

# NOTE: ``json`` and ``redis`` are not used by the code in this module as seen
# here; the imports are kept because other code may rely on them.
import json
import logging
import time
from typing import Any, Dict, Optional

import aiohttp
import redis

from app.core.config import settings

logger = logging.getLogger(__name__)

# Common prefix of every endpoint used by this module.
_API_BASE = "https://api.weixin.qq.com/cgi-bin"

# Upper bound for one HTTP round trip; aiohttp's default is 5 minutes, which
# would stall a caller for that long on a hung connection.
_REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=10)

# Lifetime (seconds) assumed when the token response omits ``expires_in``.
_DEFAULT_TOKEN_TTL = 7200

# Refresh this many seconds before the reported expiry so a token is never
# used right at the edge of its lifetime.
_TOKEN_EXPIRY_MARGIN = 300

# WeChat error codes meaning "the access_token is invalid or expired"
# (40001 invalid credential, 40014 invalid access_token, 42001 expired).
_STALE_TOKEN_ERRCODES = frozenset({40001, 40014, 42001})


class MPClientError(Exception):
    """Raised when the WeChat API returns an unusable response."""


class MPClient:
    """WeChat Official Account client.

    The access token is cached on the instance until shortly before it
    expires, so repeated API calls do not request a new token each time.
    """

    def __init__(self):
        self.appid = settings.MP_APPID
        self.secret = settings.MP_SECRET
        # Cached token and the ``time.monotonic()`` value after which it must
        # be refreshed.
        self._access_token: Optional[str] = None
        self._token_deadline: float = 0.0

    async def _request_json(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, str]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Perform one HTTP request and return the decoded JSON object.

        :param method: HTTP method, e.g. ``"GET"`` or ``"POST"``
        :param path: endpoint path appended to the API base URL
        :param params: query parameters (URL-encoded by aiohttp)
        :param payload: JSON body, if any
        :return: the response body decoded as a dict
        :raises MPClientError: if the body is not a JSON object
        """
        async with aiohttp.ClientSession(timeout=_REQUEST_TIMEOUT) as session:
            async with session.request(
                method, f"{_API_BASE}{path}", params=params, json=payload
            ) as response:
                # content_type=None disables aiohttp's Content-Type check so a
                # JSON body served with a non-JSON header is still decoded.
                result = await response.json(content_type=None)
        if not isinstance(result, dict):
            raise MPClientError(f"unexpected response: {result!r}")
        return result

    async def _authorized_request(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, str]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Call an endpoint with ``access_token`` added to the query string.

        If WeChat rejects the token as invalid/expired (for example because
        another process refreshed it), the token is re-fetched and the call is
        retried exactly once.
        """
        result: Dict[str, Any] = {}
        for force_refresh in (False, True):
            token = await self.get_access_token(force_refresh=force_refresh)
            query = dict(params or {}, access_token=token)
            result = await self._request_json(
                method, path, params=query, payload=payload
            )
            if result.get("errcode") not in _STALE_TOKEN_ERRCODES:
                break
        return result

    async def get_access_token(self, *, force_refresh: bool = False) -> str:
        """Return a valid access token, fetching a new one only when needed.

        :param force_refresh: ignore the cached token and request a new one
        :return: the access token string
        :raises MPClientError: if the response carries no ``access_token``
        """
        cached = self._access_token
        if not force_refresh and cached and time.monotonic() < self._token_deadline:
            return cached

        result = await self._request_json(
            "GET",
            "/token",
            params={
                "grant_type": "client_credential",
                "appid": self.appid,
                "secret": self.secret,
            },
        )
        if "access_token" not in result:
            raise MPClientError(f"获取access_token失败: {result}")

        access_token = result["access_token"]
        try:
            ttl = int(result.get("expires_in", _DEFAULT_TOKEN_TTL))
        except (TypeError, ValueError):
            ttl = _DEFAULT_TOKEN_TTL
        self._access_token = access_token
        self._token_deadline = time.monotonic() + max(ttl - _TOKEN_EXPIRY_MARGIN, 0)
        return access_token

    async def send_template_message(
        self,
        openid: str,
        template_id: str,
        data: Dict[str, Any],
        url: Optional[str] = None,
        miniprogram: Optional[Dict[str, str]] = None
    ) -> bool:
        """
        Send a template message.

        Never raises: any error is logged and reported as ``False``.

        :param openid: recipient's openid
        :param template_id: template ID
        :param data: template data; each value is wrapped as ``{"value": v}``
        :param url: link opened when the message is tapped (optional)
        :param miniprogram: mini-program jump info (optional)
        :return: ``True`` if WeChat answered with ``errcode == 0``
        """
        try:
            message: Dict[str, Any] = {
                "touser": openid,
                "template_id": template_id,
                "data": {key: {"value": value} for key, value in data.items()},
            }
            # Optional jump targets are only sent when provided.
            if url:
                message["url"] = url
            if miniprogram:
                message["miniprogram"] = miniprogram

            logger.debug("发送模板消息: %s", message)
            result = await self._authorized_request(
                "POST", "/message/template/send", payload=message
            )
            logger.debug("发送模板消息结果: %s", result)

            if result.get("errcode") == 0:
                return True
            logger.warning("发送模板消息失败: %s", result)
            return False
        except Exception as e:
            # Best-effort API: callers only get a boolean, so log and swallow.
            logger.exception("发送模板消息异常: %s", e)
            return False

    async def get_user_info(self, openid: str) -> Optional[Dict]:
        """
        Fetch a user's basic profile by openid.

        Never raises: any error is logged and reported as ``None``.

        :param openid: the user's openid
        :return: the user-info dict, or ``None`` on failure
        """
        try:
            result = await self._authorized_request(
                "GET",
                "/user/info",
                params={"openid": openid, "lang": "zh_CN"},
            )
            # A missing or zero errcode means success; only a non-zero code
            # is an error.
            if result.get("errcode"):
                logger.warning("获取用户信息失败: %s", result)
                return None
            return result
        except Exception as e:
            # Best-effort API: log and signal failure with None.
            logger.exception("获取用户信息异常: %s", e)
            return None


# Module-level shared instance.
mp_client = MPClient()