from typing import Optional, Dict import aiohttp import logging from app.core.config import settings class WecomClient: def __init__(self): self.corpid = settings.WECHAT_CORP_ID self.provider_secret = settings.WECHAT_CORP_SECRET self.access_token = None async def get_provider_token(self) -> str: """获取服务商token""" try: url = "https://qyapi.weixin.qq.com/cgi-bin/service/get_provider_token" data = { "corpid": self.corpid, "provider_secret": self.provider_secret } async with aiohttp.ClientSession() as session: async with session.post(url, json=data) as response: result = await response.json() if result.get("errcode") == 0: self.access_token = result["provider_access_token"] return self.access_token else: raise Exception(f"获取provider_token失败: {result}") except Exception as e: logging.error(f"获取provider_token失败: {str(e)}") raise async def miniprogram_to_userid(self, openid: str) -> Optional[Dict]: """ 小程序openid转换为企业微信userid """ try: url = "https://qyapi.weixin.qq.com/cgi-bin/service/miniprogram/jscode2session" params = { "provider_access_token": await self.get_provider_token(), "appid": settings.WECHAT_APPID, "code": openid } async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: result = await response.json() if result.get("errcode") == 0: data = { "userid": result.get("userid"), "pending_id": result.get("pending_id") } return data else: logging.error(f"openid转换失败: {result}") return None except Exception as e: logging.error(f"openid转换失败: {str(e)}") return None