156 lines
6.0 KiB
Python
156 lines
6.0 KiB
Python
import requests
|
||
import json
|
||
import time
|
||
import logging
|
||
from app.core.config import settings
|
||
from typing import Dict, Any, Optional, List
|
||
import aiohttp
|
||
class WecomClient:
    """Async client for the WeCom (企业微信) server-side HTTP API.

    Credentials are read from ``settings.WECHAT_CORP_ID`` and
    ``settings.WECHAT_CORP_SECRET``. Every public coroutine is best-effort:
    failures are logged and reported through a ``None``/``False`` return
    value instead of an exception.
    """

    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"

    # Lifetime (seconds) assumed when the gettoken response carries no usable
    # ``expires_in``; WeCom documents 7200 as the usual value.
    _DEFAULT_TOKEN_TTL = 7200
    # Refresh this many seconds before the reported expiry so a token never
    # expires between the cache check and the request that uses it.
    _TOKEN_REFRESH_MARGIN = 300

    # Class-level default so an instance created without __init__ simply
    # behaves as "nothing cached yet".
    _token_expires_at = 0.0

    _WELCOME_TEXT = (
        "🥳 欢迎您进群,在群内可以享受📦【代取快递】跑腿服务。\n"
        "\n"
        "‼ 微信下单,快递到家 ‼\n"
        "\n"
        "🎁 新人礼包\n"
        "𝟏 赠送𝟏𝟓张【𝟑元跑腿券】\n"
        "𝟐 赠送𝟔枚鲜鸡蛋【首次下单】\n"
        "━ ━ ━ ━ ━🎊━ ━ ━ ━ ━\n"
        "↓点击↓小程序领券下单 &"
    )

    def __init__(self):
        self.corp_id = settings.WECHAT_CORP_ID
        self.corp_secret = settings.WECHAT_CORP_SECRET
        self.access_token = None
        # time.monotonic() deadline after which the cached token is refetched.
        self._token_expires_at = 0.0

    @staticmethod
    async def _request_json(method, url, *, params=None, payload=None):
        """Send one HTTP request and return the decoded JSON response body."""
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, url, params=params, json=payload
            ) as response:
                return await response.json()

    @staticmethod
    def _extract_unionid(result: Dict[str, Any]) -> Optional[str]:
        """Pull ``unionid`` out of an ``externalcontact/get`` response.

        The documented response nests it under ``external_contact``; the
        top-level key is still consulted as a fallback.
        """
        contact = result.get("external_contact")
        if isinstance(contact, dict) and contact.get("unionid"):
            return contact["unionid"]
        return result.get("unionid")

    async def get_access_token(self) -> Optional[str]:
        """Return a valid access token, fetching a new one only when needed.

        The token is cached on the instance until shortly before its reported
        expiry, because the gettoken endpoint is rate-limited and is meant to
        be called rarely.

        Returns:
            The access token, or ``None`` if it could not be obtained (the
            failure is logged).
        """
        if self.access_token and time.monotonic() < self._token_expires_at:
            return self.access_token

        try:
            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/gettoken",
                params={"corpid": self.corp_id, "corpsecret": self.corp_secret},
            )
            if result.get("errcode") != 0:
                logging.error("获取access_token异常: 获取access_token失败: %s", result)
                return None

            token = result["access_token"]
            ttl = result.get("expires_in")
            if not isinstance(ttl, (int, float)) or ttl <= 0:
                ttl = self._DEFAULT_TOKEN_TTL
            self.access_token = token
            self._token_expires_at = time.monotonic() + max(
                ttl - self._TOKEN_REFRESH_MARGIN, 0
            )
            return token
        except Exception as e:
            logging.error("获取access_token异常: %s", e)
            return None

    async def code2session(self, js_code: str) -> Optional[Dict[str, Any]]:
        """Exchange a mini-program login code for session information.

        Args:
            js_code: Temporary login code obtained by the mini-program client.

        Returns:
            A dict with ``userid``, ``pending_id`` and ``session_key`` (each
            ``None`` when absent from the response), or ``None`` on failure.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logging.error("code2session异常: 获取access_token失败")
                return None

            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/miniprogram/jscode2session",
                params={
                    "access_token": access_token,
                    "js_code": js_code,
                    "grant_type": "authorization_code",
                },
            )
            if result.get("errcode") != 0:
                logging.error("code2session失败: %s", result)
                return None

            return {
                "userid": result.get("userid"),
                "pending_id": result.get("pending_id"),
                "session_key": result.get("session_key"),
            }
        except Exception as e:
            logging.error("code2session异常: %s", e)
            return None

    async def get_unionid_from_userid(self, userid: str) -> Optional[str]:
        """Look up the unionid of an external contact.

        Args:
            userid: Sent to the API as ``external_userid``.

        Returns:
            The unionid, or ``None`` when the call fails or the response does
            not contain one.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logging.error("get_unionid_from_userid异常: 获取access_token失败")
                return None

            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/externalcontact/get",
                params={"access_token": access_token, "external_userid": userid},
            )
            # A missing errcode is tolerated; only an explicit non-zero code
            # is treated as an API error.
            if result.get("errcode") not in (0, None):
                logging.error("get_unionid_from_userid失败: %s", result)
                return None

            return self._extract_unionid(result)
        except Exception as e:
            logging.error("get_unionid_from_userid异常: %s", e)
            return None

    async def unionid_to_external_userid(
        self, unionid: str, openid: str
    ) -> Optional[Dict[str, Any]]:
        """Convert a unionid/openid pair via ``idconvert/unionid_to_external_userid``.

        Args:
            unionid: WeChat unionid of the user.
            openid: WeChat openid of the user.

        Returns:
            The raw JSON response dict as returned by the API (not just the
            ``external_userid`` field), or ``None`` when no access token is
            available or the request raises.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                # Without this guard the literal string "None" would be sent
                # as the token.
                logging.error("unionid_to_external_userid异常: 获取access_token失败")
                return None

            return await self._request_json(
                "POST",
                f"{self._API_BASE}/idconvert/unionid_to_external_userid",
                params={"access_token": access_token},
                payload={"unionid": unionid, "openid": openid, "subject_type": 1},
            )
        except Exception as e:
            logging.error("unionid_to_external_userid异常: %s", e)
            return None

    async def send_welcome_message(self, group_id: str) -> bool:
        """Send the fixed welcome text for the given group.

        Args:
            group_id: Group identifier placed in the request body.

        Returns:
            ``True`` when the API answers with ``errcode == 0``, otherwise
            ``False`` (the failure is logged).
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logging.error("获取access_token失败")
                return False

            # NOTE(review): the public message/send documentation describes
            # touser/toparty/totag + agentid rather than group_id; confirm this
            # payload shape (or the intended endpoint) against the WeCom docs.
            payload = {
                "group_id": group_id,
                "msgtype": "text",
                "text": {"content": self._WELCOME_TEXT},
            }
            # The token belongs in the query string, not in the JSON body.
            result = await self._request_json(
                "POST",
                f"{self._API_BASE}/message/send",
                params={"access_token": access_token},
                payload=payload,
            )
            if result.get("errcode") == 0:
                return True

            logging.error("发送欢迎消息失败: %s", result)
            return False
        except Exception as e:
            logging.error("发送欢迎消息异常: %s", e)
            return False
|
||
|
||
# Module-level client instance created at import time.
wecom_client = WecomClient()