# deliveryman-api/app/core/wecomclient.py
# 2025-02-27 18:41:49 +08:00
#
# 75 lines
# 2.5 KiB
# Python

import asyncio
import json
import logging
import time
from typing import Any, Dict, List, Optional

import requests

from app.core.config import settings
class WecomClient:
    """WeCom (企业微信) API client.

    Holds the corp credentials read from ``settings``, caches the access
    token in memory until shortly before it expires, and exchanges
    mini-program login codes for WeCom user identifiers.

    All failures are logged and reported to the caller as ``None``; the
    public coroutines do not raise.
    """

    API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
    # Seconds ``requests`` waits on connect/read before giving up. Without a
    # timeout a stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 10
    # Treat the token as expired this many seconds before the server-reported
    # expiry, so a token is never used right at the edge of its lifetime.
    TOKEN_EXPIRY_MARGIN = 200

    def __init__(self):
        self.corp_id = settings.WECHAT_CORP_ID
        self.corp_secret = settings.WECHAT_CORP_SECRET
        self.access_token: Optional[str] = None
        # Unix timestamp after which ``access_token`` must be refreshed.
        self.token_expires_at: float = 0

    async def _get_json(self, path: str, params: Dict[str, Any]) -> Any:
        """Send a GET request to ``API_BASE/path`` and return the decoded JSON.

        The blocking ``requests`` call runs in the event loop's default
        executor so other coroutines keep running while the request is in
        flight. Query values are passed via ``params`` so that ``requests``
        URL-encodes them instead of them being spliced into the URL verbatim.

        Raises:
            Whatever ``requests.get`` or ``response.json()`` raise; callers
            are expected to handle those.
        """
        url = f"{self.API_BASE}/{path}"

        def _fetch() -> Any:
            response = requests.get(
                url, params=params, timeout=self.REQUEST_TIMEOUT
            )
            return response.json()

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _fetch)

    async def get_access_token(self) -> Optional[str]:
        """Return a valid access token, fetching a new one when needed.

        Returns:
            The cached token while it is still valid, otherwise a freshly
            fetched one. ``None`` if the request fails or the API reports a
            non-zero ``errcode``.
        """
        # Serve from cache while the token has not reached its early expiry.
        if self.access_token and time.time() < self.token_expires_at:
            return self.access_token

        # NOTE: there is no lock here; concurrent callers may each fetch a
        # token while a refresh is in flight, and the last response wins.
        try:
            result = await self._get_json(
                "gettoken",
                {"corpid": self.corp_id, "corpsecret": self.corp_secret},
            )
            if result.get("errcode") != 0:
                logging.error(f"获取企业微信访问令牌失败: {result}")
                return None

            # Compute the expiry before touching the cache so a malformed
            # response cannot leave a token stored with a stale expiry.
            # A missing or too-short ``expires_in`` yields a zero lifetime,
            # i.e. the token is returned but not reused from cache.
            lifetime = (result.get("expires_in") or 0) - self.TOKEN_EXPIRY_MARGIN
            expires_at = time.time() + max(lifetime, 0)

            self.access_token = result.get("access_token")
            self.token_expires_at = expires_at
            return self.access_token
        except Exception as e:
            logging.exception(f"获取企业微信访问令牌异常: {str(e)}")
            return None

    async def code2session(self, code: str) -> Optional[Dict[str, Any]]:
        """Exchange a mini-program login code for a WeCom user identity.

        Args:
            code: The login code supplied by the mini-program client.

        Returns:
            A dict with the ``userid`` and ``pending_id`` fields of the API
            response (either may be ``None`` if absent), or ``None`` when no
            access token is available, the request fails, or the API reports
            a non-zero ``errcode``.
        """
        token = await self.get_access_token()
        if not token:
            return None

        try:
            result = await self._get_json(
                "miniprogram/jscode2session",
                {
                    "access_token": token,
                    "js_code": code,
                    "grant_type": "authorization_code",
                },
            )
            if result.get("errcode") != 0:
                logging.error(f"小程序code转换失败: {result}")
                return None

            return {
                "userid": result.get("userid"),
                "pending_id": result.get("pending_id"),
            }
        except Exception as e:
            logging.exception(f"小程序code转换异常: {str(e)}")
            return None
# Module-level client instance, created at import time; its constructor reads
# the corp credentials from ``settings``.
wecom_client = WecomClient()