deliveryman-api/app/core/wecomclient.py
2025-03-04 15:27:30 +08:00

156 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import time
import logging
from app.core.config import settings
from typing import Dict, Any, Optional, List
import aiohttp
class WecomClient:
    """Asynchronous client for the WeCom (企业微信) server API endpoints used by this app.

    Every public coroutine is best-effort: failures are logged and reported
    through the return value (``None`` / ``False``) instead of being raised.
    """

    # Common prefix of every endpoint called below.
    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"
    # Lifetime (seconds) assumed when the token response carries no "expires_in".
    _DEFAULT_TOKEN_TTL = 7200
    # Refresh this many seconds early so a token never expires mid-request.
    _TOKEN_REFRESH_MARGIN = 300

    # Text body pushed by send_welcome_message (contains no placeholders).
    _WELCOME_TEXT = """🥳 欢迎您进群,在群内可以享受📦【代取快递】跑腿服务。
‼ 微信下单,快递到家 ‼
🎁 新人礼包
𝟏 赠送𝟏𝟓张【𝟑元跑腿券】
𝟐 赠送𝟔枚鲜鸡蛋【首次下单】
━ ━ ━ ━ ━🎊━ ━ ━ ━ ━
↓点击↓小程序领券下单 &"""

    def __init__(self):
        """Read the corp credentials from settings and start with an empty token cache."""
        self.corp_id = settings.WECHAT_CORP_ID
        self.corp_secret = settings.WECHAT_CORP_SECRET
        self.access_token = None
        # time.monotonic() deadline after which the cached token is refetched.
        self._token_expires_at = 0.0

    @staticmethod
    async def _request_json(
        method: str,
        url: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send one HTTP request and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            url: Absolute endpoint URL.
            params: Query-string parameters.
            payload: JSON request body, if any.

        Raises:
            Whatever aiohttp raises on connection or decoding failures;
            callers translate those into their own failure return value.
        """
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, params=params, json=payload) as response:
                return await response.json()

    async def get_access_token(self, *, force_refresh: bool = False) -> Optional[str]:
        """Return a valid access token, fetching a new one only when needed.

        The token is cached on the instance until shortly before the lifetime
        reported by the server elapses, so repeated API calls do not hit the
        rate-limited ``gettoken`` endpoint every time.

        Args:
            force_refresh: Ignore the cache and request a new token, e.g.
                after the server rejected the cached one.

        Returns:
            The access token, or ``None`` if it could not be obtained.
        """
        cache_is_fresh = (
            bool(self.access_token) and time.monotonic() < self._token_expires_at
        )
        if cache_is_fresh and not force_refresh:
            return self.access_token

        try:
            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/gettoken",
                params={"corpid": self.corp_id, "corpsecret": self.corp_secret},
            )
            if result.get("errcode") != 0:
                logging.error(f"获取access_token失败: {result}")
                return None

            self.access_token = result["access_token"]
            ttl = result.get("expires_in", self._DEFAULT_TOKEN_TTL)
            self._token_expires_at = time.monotonic() + max(
                ttl - self._TOKEN_REFRESH_MARGIN, 0
            )
            return self.access_token
        except Exception as e:
            logging.error(f"获取access_token异常: {str(e)}")
            return None

    async def code2session(self, js_code: str) -> Optional[Dict[str, Any]]:
        """Exchange a mini-program login code for the user's session data.

        Args:
            js_code: Temporary login credential obtained by the mini program.

        Returns:
            A dict with ``userid``, ``pending_id`` and ``session_key`` (each
            may be ``None`` if absent from the response), or ``None`` when the
            token is unavailable, the API reports an error, or the request fails.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logging.error("code2session异常: 获取access_token失败")
                return None

            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/miniprogram/jscode2session",
                params={
                    "access_token": access_token,
                    "js_code": js_code,
                    "grant_type": "authorization_code",
                },
            )
            if result.get("errcode") != 0:
                logging.error(f"code2session失败: {result}")
                return None

            return {
                "userid": result.get("userid"),
                "pending_id": result.get("pending_id"),
                "session_key": result.get("session_key"),
            }
        except Exception as e:
            logging.error(f"code2session异常: {str(e)}")
            return None

    async def get_unionid_from_userid(self, userid: str) -> Optional[str]:
        """Look up the unionid of an external contact.

        Args:
            userid: Sent to the API as ``external_userid``.

        Returns:
            The contact's unionid, or ``None`` when it is not available, the
            API reports an error, or the request fails.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logging.error("get_unionid_from_userid异常: 获取access_token失败")
                return None

            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/externalcontact/get",
                params={"access_token": access_token, "external_userid": userid},
            )
            if result.get("errcode") not in (None, 0):
                logging.error(f"get_unionid_from_userid失败: {result}")
                return None

            # The API nests contact details under "external_contact"; a
            # top-level "unionid" is still honoured as a fallback.
            contact = result.get("external_contact") or {}
            return contact.get("unionid") or result.get("unionid")
        except Exception as e:
            logging.error(f"get_unionid_from_userid异常: {str(e)}")
            return None

    async def unionid_to_external_userid(
        self, unionid: str, openid: str
    ) -> Optional[Dict[str, Any]]:
        """Convert a unionid/openid pair into external_userid information.

        Args:
            unionid: The user's unionid.
            openid: The user's openid.

        Returns:
            The raw JSON response of the conversion endpoint (callers inspect
            ``errcode`` and the id fields themselves), or ``None`` when the
            token is unavailable or the request fails.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                # Without this guard the request would carry "access_token=None".
                logging.error("unionid_to_external_userid异常: 获取access_token失败")
                return None

            return await self._request_json(
                "POST",
                f"{self._API_BASE}/idconvert/unionid_to_external_userid",
                params={"access_token": access_token},
                payload={"unionid": unionid, "openid": openid, "subject_type": 1},
            )
        except Exception as e:
            logging.error(f"unionid_to_external_userid异常: {str(e)}")
            return None

    async def send_welcome_message(self, user_info: Dict[str, Any], group_id: str) -> bool:
        """Send the fixed welcome text for a group.

        Args:
            user_info: Currently unused; kept for interface compatibility.
            group_id: Forwarded to the API as ``group_id``.

        Returns:
            ``True`` if the API answered with ``errcode == 0``, else ``False``.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logging.error("获取access_token失败")
                return False

            # NOTE(review): message/send is documented with touser/toparty/totag
            # and agentid rather than group_id; confirm the intended endpoint.
            result = await self._request_json(
                "POST",
                f"{self._API_BASE}/message/send",
                # The token belongs in the query string, not in the JSON body.
                params={"access_token": access_token},
                payload={
                    "group_id": group_id,
                    "msgtype": "text",
                    "text": {"content": self._WELCOME_TEXT},
                },
            )
            if result.get("errcode") == 0:
                return True

            logging.error(f"发送欢迎消息失败: {result}")
            return False
        except Exception as e:
            logging.error(f"发送欢迎消息异常: {str(e)}")
            return False
# Shared module-level client instance; constructing it reads the corp
# credentials (WECHAT_CORP_ID / WECHAT_CORP_SECRET) from settings at import time.
wecom_client = WecomClient()