import json
import logging
import time
from typing import Any, Dict, List, Optional

import aiohttp
import requests

from app.core.config import settings

logger = logging.getLogger(__name__)


class WecomClient:
    """Asynchronous client for a small subset of the WeCom (企业微信) server API.

    Every public coroutine is best-effort: network or API failures are logged
    and reported through the return value (``None`` / ``False``) instead of
    being raised to the caller.
    """

    _API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"

    # Upper bound for one HTTP round trip; aiohttp's default is five minutes.
    _REQUEST_TIMEOUT_SECONDS = 10

    # Refresh the cached token this long before the server-declared expiry so
    # a token never expires while a request using it is in flight.
    _TOKEN_EXPIRY_MARGIN_SECONDS = 300

    # Text of the group welcome message.
    # NOTE(review): the text is a single line here; confirm whether line
    # breaks were intended between the sections.
    _WELCOME_TEXT = "🥳 欢迎您进群,在群内可以享受📦【代取快递】跑腿服务。 ‼ 微信下单,快递到家 ‼ 🎁 新人礼包 𝟏 赠送𝟏𝟓张【𝟑元跑腿券】 𝟐 赠送𝟔枚鲜鸡蛋【首次下单】 ━ ━ ━ ━ ━🎊━ ━ ━ ━ ━ ↓点击↓小程序领券下单 &"

    def __init__(self):
        """Read the corp credentials from settings and start with no token."""
        self.corp_id = settings.WECHAT_CORP_ID
        self.corp_secret = settings.WECHAT_CORP_SECRET
        # Setting ``access_token`` back to None forces a refresh on next use.
        self.access_token = None
        # time.monotonic() deadline after which the cached token is stale.
        self._token_expires_at = 0.0

    async def _request_json(
        self,
        method: str,
        url: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        json_body: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send one HTTP request and return the decoded JSON response body.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            url: Absolute URL of the API endpoint.
            params: Query-string parameters, if any.
            json_body: JSON request body, if any.

        Raises:
            Exception: whatever aiohttp raises for connection, timeout or
                JSON-decoding errors; callers translate these into their own
                failure value.
        """
        timeout = aiohttp.ClientTimeout(total=self._REQUEST_TIMEOUT_SECONDS)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.request(
                method, url, params=params, json=json_body
            ) as response:
                return await response.json()

    async def get_access_token(self) -> Optional[str]:
        """Return a valid access token, or ``None`` when it cannot be obtained.

        The token is cached on the instance and reused until shortly before
        the lifetime announced by the server (``expires_in``) runs out, so the
        token endpoint is not called once per API request.
        """
        if self.access_token and time.monotonic() < self._token_expires_at:
            return self.access_token

        try:
            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/gettoken",
                params={"corpid": self.corp_id, "corpsecret": self.corp_secret},
            )
            if result.get("errcode") != 0:
                logger.error("获取access_token异常: 获取access_token失败: %s", result)
                return None

            self.access_token = result["access_token"]
            # A missing ``expires_in`` yields a zero lifetime, i.e. no caching.
            lifetime = int(result.get("expires_in", 0)) - self._TOKEN_EXPIRY_MARGIN_SECONDS
            self._token_expires_at = time.monotonic() + max(lifetime, 0)
            return self.access_token
        except Exception as e:
            logger.error("获取access_token异常: %s", e)
            return None

    async def code2session(self, js_code: str) -> Optional[Dict[str, Any]]:
        """Exchange a mini-program login code for the user's session data.

        Args:
            js_code: Temporary login credential produced by the mini program.

        Returns:
            A dict with the keys ``userid``, ``pending_id`` and ``session_key``
            (each may be ``None`` if absent from the response), or ``None``
            when the token could not be obtained or the API call failed.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logger.error("code2session异常: 获取access_token失败")
                return None

            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/miniprogram/jscode2session",
                params={
                    "access_token": access_token,
                    "js_code": js_code,
                    "grant_type": "authorization_code",
                },
            )
            if result.get("errcode") != 0:
                logger.error("code2session失败: %s", result)
                return None

            return {
                "userid": result.get("userid"),
                "pending_id": result.get("pending_id"),
                "session_key": result.get("session_key"),
            }
        except Exception as e:
            logger.error("code2session异常: %s", e)
            return None

    async def get_unionid_from_userid(self, userid: str) -> Optional[str]:
        """Look up the unionid of an external contact.

        Args:
            userid: Sent to the API as ``external_userid``.

        Returns:
            The contact's unionid, or ``None`` when it is not present in the
            response, the API reports an error, or the request fails.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logger.error("get_unionid_from_userid异常: 获取access_token失败")
                return None

            result = await self._request_json(
                "GET",
                f"{self._API_BASE}/externalcontact/get",
                params={"access_token": access_token, "external_userid": userid},
            )
            if result.get("errcode", 0) != 0:
                logger.error("get_unionid_from_userid失败: %s", result)
                return None

            # The API nests the contact's fields under "external_contact";
            # the top-level lookup is kept only as a fallback.
            contact = result.get("external_contact") or {}
            return contact.get("unionid") or result.get("unionid")
        except Exception as e:
            logger.error("get_unionid_from_userid异常: %s", e)
            return None

    async def unionid_to_external_userid(
        self, unionid: str, openid: str
    ) -> Optional[Dict[str, Any]]:
        """Convert a unionid/openid pair through the id-conversion endpoint.

        Args:
            unionid: The user's unionid.
            openid: The user's openid.

        Returns:
            The decoded API response as-is (callers must inspect ``errcode``
            and extract the id themselves), or ``None`` when the token could
            not be obtained or the request fails.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                # Without this guard the request would carry "access_token=None".
                logger.error("unionid_to_external_userid异常: 获取access_token失败")
                return None

            return await self._request_json(
                "POST",
                f"{self._API_BASE}/idconvert/unionid_to_external_userid",
                params={"access_token": access_token},
                json_body={"unionid": unionid, "openid": openid, "subject_type": 1},
            )
        except Exception as e:
            logger.error("unionid_to_external_userid异常: %s", e)
            return None

    async def send_welcome_message(self, user_info: Dict[str, Any], group_id: str) -> bool:
        """Send the fixed welcome text to a group chat.

        Args:
            user_info: Currently unused; kept for interface compatibility.
            group_id: Sent to the API as ``chatid``.

        Returns:
            ``True`` when the API reports ``errcode == 0``, ``False`` otherwise.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logger.error("获取access_token失败")
                return False

            # NOTE(review): "chatid" is the addressing field of the
            # appchat/send endpoint, while message/send addresses recipients
            # via touser/toparty/totag + agentid - confirm the intended endpoint.
            payload = {
                "chatid": group_id,
                "msgtype": "text",
                "text": {"content": self._WELCOME_TEXT},
            }
            # The token belongs in the query string; the API does not read it
            # from the JSON body.
            result = await self._request_json(
                "POST",
                f"{self._API_BASE}/message/send",
                params={"access_token": access_token},
                json_body=payload,
            )
            if result.get("errcode") != 0:
                logger.error("发送欢迎消息失败: %s", result)
                return False
            return True
        except Exception as e:
            logger.error("发送欢迎消息异常: %s", e)
            return False


# Shared module-level client instance.
wecom_client = WecomClient()