api/app/services/wechat.py
2025-04-09 11:06:26 +08:00

52 lines
1.6 KiB
Python

import httpx
from typing import Optional, Dict, Any, Tuple
from app.core.config import settings
from app.core.exceptions import BusinessError
class WechatLoginError(Exception):
    """WeChat login error."""
async def code2session(code: str) -> Tuple[str, Optional[str]]:
    """Exchange a WeChat temporary login code for an openid and unionid.

    Issues a GET request to the WeChat ``jscode2session`` endpoint with the
    app id and secret read from ``settings``.

    Args:
        code: Temporary login credential sent as the ``js_code`` parameter.

    Returns:
        Tuple[str, Optional[str]]: ``(openid, unionid)``. ``unionid`` is
        ``None`` when the response does not contain that key.

    Raises:
        BusinessError: Always with ``code=500`` — when the HTTP request
            fails, the response body cannot be decoded as a JSON object,
            the response carries a non-zero ``errcode``, or the response
            has no ``openid``.
    """
    url = "https://api.weixin.qq.com/sns/jscode2session"
    params = {
        "appid": settings.WECHAT_APP_ID,
        "secret": settings.WECHAT_APP_SECRET,
        "js_code": code,
        "grant_type": "authorization_code",
    }

    # Keep only the network call and body decoding inside the try block so
    # the BusinessError instances raised by the validation below are never
    # caught and re-wrapped.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, params=params)
            result = response.json()
    except httpx.RequestError as e:
        raise BusinessError(f"网络请求失败: {str(e)}", code=500) from e
    except Exception as e:
        # Service boundary: callers only expect BusinessError, so any other
        # failure (e.g. a body that is not valid JSON) is converted here,
        # with the original exception preserved as the cause.
        raise BusinessError(f"未知错误: {str(e)}", code=500) from e

    # Valid JSON is not necessarily an object (it may be null, a list, ...);
    # reject anything else explicitly instead of failing on .get() below.
    if not isinstance(result, dict):
        raise BusinessError("未知错误: 微信接口返回的数据格式无效", code=500)

    # An absent errcode is treated the same as errcode == 0 (success).
    if result.get("errcode", 0) != 0:
        raise BusinessError(f"微信登录失败: {result.get('errmsg', '未知错误')}", code=500)

    openid = result.get("openid")
    if not openid:
        raise BusinessError("无法获取openid", code=500)

    # unionid may legitimately be missing, in which case None is returned.
    return openid, result.get("unionid")