import json
import logging
import time
from typing import Any, Dict, List, Optional

import aiohttp
import requests

from app.core.config import settings

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Common prefix of every WeCom endpoint called by this module.
_API_BASE = "https://qyapi.weixin.qq.com/cgi-bin"

# Token lifetime assumed when the gettoken response carries no ``expires_in``.
_DEFAULT_TOKEN_TTL_SECONDS = 7200

# The cached token is renewed this many seconds before it would expire, so a
# request never goes out with a token that lapses while in flight.
_TOKEN_REFRESH_MARGIN_SECONDS = 60

# Welcome text sent to new group members.
# NOTE(review): reproduced exactly as it appears in this file (single line,
# space separated) -- confirm whether line breaks were intended.
_WELCOME_TEXT = "🥳 欢迎您进群,在群内可以享受📦【代取快递】跑腿服务。 ‼ 微信下单,快递到家 ‼ 🎁 新人礼包 𝟏 赠送𝟏𝟓张【𝟑元跑腿券】 𝟐 赠送𝟔枚鲜鸡蛋【首次下单】 ━ ━ ━ ━ ━🎊━ ━ ━ ━ ━ ↓点击↓小程序领券下单 &"


class WecomClient:
    """Async client for the WeCom (Enterprise WeChat) HTTP API.

    Every public method is best-effort: failures are logged and reported
    through the return value (``None`` / ``False``) instead of an exception.
    """

    def __init__(self):
        self.corp_id = settings.WECHAT_CORP_ID
        self.corp_secret = settings.WECHAT_CORP_SECRET
        self.access_token = None
        # time.monotonic() deadline after which ``access_token`` is refetched.
        self._token_expires_at = 0.0

    async def _request_json(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send one request to ``_API_BASE/path`` and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Endpoint path relative to ``_API_BASE``.
            params: Query-string parameters.
            payload: Object sent as the JSON request body.

        Raises:
            Whatever aiohttp raises for transport or decoding errors; the
            public methods catch and log these.
        """
        url = f"{_API_BASE}/{path}"
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, params=params, json=payload) as response:
                return await response.json()

    async def get_access_token(self) -> Optional[str]:
        """Return a valid access token, or ``None`` when it cannot be obtained.

        The token is cached on the instance and reused until shortly before
        the lifetime reported by the server runs out, so that gettoken is not
        called once per API request.
        """
        if self.access_token and time.monotonic() < self._token_expires_at:
            return self.access_token

        try:
            result = await self._request_json(
                "GET",
                "gettoken",
                params={"corpid": self.corp_id, "corpsecret": self.corp_secret},
            )
            token = result.get("access_token")
            if result.get("errcode") != 0 or not token:
                logger.error("获取access_token失败: %s", result)
                return None

            ttl = result.get("expires_in") or _DEFAULT_TOKEN_TTL_SECONDS
            self.access_token = token
            self._token_expires_at = (
                time.monotonic() + max(0, ttl - _TOKEN_REFRESH_MARGIN_SECONDS)
            )
            return token
        except Exception as exc:
            # Boundary handler: callers rely on this method never raising.
            logger.exception("获取access_token异常: %s", exc)
            return None

    async def code2session(self, js_code: str) -> Optional[Dict[str, Any]]:
        """Validate a mini-program login credential.

        Args:
            js_code: Login code obtained by the mini program.

        Returns:
            A dict with the keys ``userid``, ``pending_id`` and
            ``session_key`` taken from the response, or ``None`` on failure.
        """
        try:
            # 1. Obtain the access token.
            access_token = await self.get_access_token()
            if not access_token:
                logger.error("获取access_token失败")
                return None

            # 2. Call the jscode2session endpoint.
            result = await self._request_json(
                "GET",
                "miniprogram/jscode2session",
                params={
                    "access_token": access_token,
                    "js_code": js_code,
                    "grant_type": "authorization_code",
                },
            )
            if result.get("errcode") != 0:
                logger.error("code2session失败: %s", result)
                return None

            return {
                "userid": result.get("userid"),
                "pending_id": result.get("pending_id"),
                "session_key": result.get("session_key"),
            }
        except Exception as exc:
            logger.exception("code2session异常: %s", exc)
            return None

    async def get_unionid_from_userid(self, userid: str) -> Optional[str]:
        """Look up the unionid of an external contact.

        Args:
            userid: Sent to the API as ``external_userid``.

        Returns:
            The unionid, or ``None`` when it is absent or the call failed.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logger.error("获取access_token失败")
                return None

            result = await self._request_json(
                "GET",
                "externalcontact/get",
                params={"access_token": access_token, "external_userid": userid},
            )
            # The documented response nests the contact fields (including
            # unionid) under "external_contact"; the top-level lookup is kept
            # only as a fallback for the shape the code read before.
            contact = result.get("external_contact") or {}
            return contact.get("unionid") or result.get("unionid")
        except Exception as exc:
            logger.exception("get_unionid_from_userid异常: %s", exc)
            return None

    async def unionid_to_external_userid(
        self, unionid: str, openid: str
    ) -> Optional[Dict[str, Any]]:
        """Convert a unionid/openid pair through idconvert/unionid_to_external_userid.

        Args:
            unionid: The user's unionid.
            openid: The user's openid.

        Returns:
            The complete decoded response (not only the external_userid), so
            callers must inspect ``errcode`` themselves; ``None`` when no
            access token is available or the request failed.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                # Without this guard the literal text "None" would be sent as
                # the token.
                logger.error("获取access_token失败")
                return None

            return await self._request_json(
                "POST",
                "idconvert/unionid_to_external_userid",
                params={"access_token": access_token},
                payload={"unionid": unionid, "openid": openid, "subject_type": 1},
            )
        except Exception as exc:
            logger.exception("unionid_to_external_userid异常: %s", exc)
            return None

    async def send_welcome_message(self, chat_id: str, user_id: Optional[str] = None) -> bool:
        """Send the welcome message.

        Args:
            chat_id: Group chat ID.
            user_id: User ID; when given, a private message is sent instead
                of a group message.

        Returns:
            bool: Whether the message was sent successfully.
        """
        try:
            # 1. Obtain the access token.
            access_token = await self.get_access_token()
            if not access_token:
                logger.error("获取access_token失败")
                return False

            # 2. Send the welcome message.
            if user_id:
                # Private message.
                path = "externalcontact/send_welcome_msg"
                data = {
                    # user_id is used as a temporary welcome code here.
                    "welcome_code": user_id,
                    "text": {"content": _WELCOME_TEXT},
                }
            else:
                # Group message.
                path = "appchat/send"
                data = {
                    "chatid": chat_id,
                    "msgtype": "text",
                    "text": {"content": _WELCOME_TEXT},
                    "safe": 0,
                }

            result = await self._request_json(
                "POST", path, params={"access_token": access_token}, payload=data
            )
            if result.get("errcode") == 0:
                return True

            logger.error("发送欢迎消息失败: %s", result)
            return False
        except Exception as exc:
            logger.exception("发送欢迎消息异常: %s", exc)
            return False

    async def get_external_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]:
        """Fetch the details of an external group chat.

        Args:
            chat_id: Group chat ID.

        Returns:
            Dict: The ``group_chat`` object of the response, or ``None`` on
            failure.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logger.error("获取access_token失败")
                return None

            result = await self._request_json(
                "POST",
                "externalcontact/groupchat/get",
                params={"access_token": access_token},
                payload={"chat_id": chat_id},
            )
            if result.get("errcode") == 0:
                return result.get("group_chat")

            logger.error("获取外部群聊信息失败: %s", result)
            return None
        except Exception as exc:
            logger.exception("获取外部群聊信息异常: %s", exc)
            return None

    async def get_external_contact_info(self, external_userid: str) -> Optional[Dict[str, Any]]:
        """Fetch the details of an external contact.

        Args:
            external_userid: External contact ID.

        Returns:
            Dict: The ``external_contact`` object of the response, or
            ``None`` on failure.
        """
        try:
            access_token = await self.get_access_token()
            if not access_token:
                logger.error("获取access_token失败")
                return None

            result = await self._request_json(
                "GET",
                "externalcontact/get",
                params={"access_token": access_token, "external_userid": external_userid},
            )
            if result.get("errcode") == 0:
                return result.get("external_contact")

            logger.error("获取外部联系人信息失败: %s", result)
            return None
        except Exception as exc:
            logger.exception("获取外部联系人信息异常: %s", exc)
            return None

    async def handle_chat_change_event(
        self,
        chat_id: str,
        change_type: str,
        update_detail: str,
        join_user_id: Optional[str] = None,
    ) -> bool:
        """Apply a group-chat change event to the local database.

        Args:
            chat_id: Group chat ID.
            change_type: Change type: create/update/dismiss.
            update_detail: Change detail:
                add_member/del_member/change_owner/change_name/change_notice.
                Only add_member and del_member are handled.
            join_user_id: ID of the user that joined (add_member) or is to be
                removed (del_member).

        Returns:
            bool: Whether the event was handled successfully; ``False`` for
            unsupported events and for any error.
        """
        # Imported lazily, as in the original code (presumably to avoid a
        # circular import -- confirm before hoisting to module level).
        from app.models.database import SessionLocal

        db = None
        try:
            db = SessionLocal()

            if change_type == "create":
                return await self._on_chat_created(db, chat_id)
            if change_type == "update" and update_detail == "add_member":
                return await self._on_member_added(db, chat_id, join_user_id)
            if change_type == "update" and update_detail == "del_member":
                return self._on_member_removed(db, chat_id, join_user_id)
            if change_type == "dismiss":
                return self._on_chat_dismissed(db, chat_id)
            return False
        except Exception as exc:
            logger.exception("处理群聊变更事件异常: %s", exc)
            return False
        finally:
            # Single cleanup point: the session is closed exactly once
            # whether the handler succeeded, returned early or raised.
            if db is not None:
                db.close()

    async def _on_chat_created(self, db, chat_id: str) -> bool:
        """Store a newly created chat together with its current member list."""
        chat_info = await self.get_external_chat_info(chat_id)
        if not chat_info:
            return False

        if self._find_chat(db, chat_id) is None:
            self._create_chat_record(db, chat_id, chat_info)

        for member in chat_info.get("member_list", []):
            user_id = member.get("userid")
            member_type = member.get("type")
            if self._find_member(db, chat_id, user_id) is not None:
                continue

            # Contact details exist only for external contacts.
            user_info = None
            if self._is_external_member(member_type):
                user_info = await self.get_external_contact_info(user_id)
            self._create_member_record(db, chat_id, user_id, member_type, user_info)
        return True

    async def _on_member_added(self, db, chat_id: str, join_user_id: Optional[str]) -> bool:
        """Record a member that joined, send the welcome message, bump the count."""
        if not join_user_id:
            return False

        chat_db = self._find_chat(db, chat_id)
        if chat_db is None:
            # Unknown chat: fetch it and create the record on the fly.
            chat_info = await self.get_external_chat_info(chat_id)
            if chat_info:
                chat_db = self._create_chat_record(db, chat_id, chat_info)

        if self._find_member(db, chat_id, join_user_id) is not None:
            # Already recorded: nothing to add, greet or count again.
            return True

        # Joining users are treated as external contacts by default.
        member_type = "EXTERNAL"
        user_info = await self.get_external_contact_info(join_user_id)
        member_db = self._create_member_record(
            db, chat_id, join_user_id, member_type, user_info
        )

        # Only flag the member as welcomed when the message really went out.
        if await self.send_welcome_message(chat_id):
            member_db.welcome_sent = True
            db.commit()

        if chat_db is not None:
            # NOTE(review): when the chat record was created just above, its
            # count may already include this member -- confirm.
            chat_db.member_count = (chat_db.member_count or 0) + 1
            db.commit()
        return True

    def _on_member_removed(self, db, chat_id: str, join_user_id: Optional[str]) -> bool:
        """Delete a member record and decrement the chat's member count."""
        if not join_user_id:
            return False

        member_db = self._find_member(db, chat_id, join_user_id)
        if member_db is not None:
            db.delete(member_db)
            db.commit()

            chat_db = self._find_chat(db, chat_id)
            if chat_db is not None:
                chat_db.member_count = max(0, (chat_db.member_count or 0) - 1)
                db.commit()
        return True

    def _on_chat_dismissed(self, db, chat_id: str) -> bool:
        """Mark a dismissed chat as inactive."""
        chat_db = self._find_chat(db, chat_id)
        if chat_db is not None:
            chat_db.is_active = False
            db.commit()
        return True

    @staticmethod
    def _is_external_member(member_type: Any) -> bool:
        """Tell whether a member ``type`` value denotes an external contact.

        Accepts the string the code compared against before as well as the
        numeric value 2 -- presumably what groupchat/get reports for external
        contacts; verify against the API reference.
        """
        return member_type in ("EXTERNAL", 2, "2")

    @staticmethod
    def _find_chat(db, chat_id: str):
        """Return the stored chat row for ``chat_id``, or ``None``."""
        from app.models.wecom_external_chat import WecomExternalChatDB

        return (
            db.query(WecomExternalChatDB)
            .filter(WecomExternalChatDB.chat_id == chat_id)
            .first()
        )

    @staticmethod
    def _find_member(db, chat_id: str, user_id: Optional[str]):
        """Return the stored member row for ``(chat_id, user_id)``, or ``None``."""
        from app.models.wecom_external_chat import WecomExternalChatMemberDB

        return (
            db.query(WecomExternalChatMemberDB)
            .filter(
                WecomExternalChatMemberDB.chat_id == chat_id,
                WecomExternalChatMemberDB.user_id == user_id,
            )
            .first()
        )

    @staticmethod
    def _create_chat_record(db, chat_id: str, chat_info: Dict[str, Any]):
        """Insert and commit a chat row built from ``chat_info``; return the row."""
        from app.models.wecom_external_chat import WecomExternalChatCreate, WecomExternalChatDB

        chat_create = WecomExternalChatCreate(
            chat_id=chat_id,
            name=chat_info.get("name"),
            owner=chat_info.get("owner"),
            member_count=len(chat_info.get("member_list", [])),
            notice=chat_info.get("notice"),
        )
        chat_db = WecomExternalChatDB(**chat_create.dict())
        db.add(chat_db)
        db.commit()
        return chat_db

    @staticmethod
    def _create_member_record(
        db,
        chat_id: str,
        user_id: Optional[str],
        member_type: Any,
        user_info: Optional[Dict[str, Any]],
    ):
        """Insert and commit a member row; return the row.

        ``name`` and ``unionid`` are taken from ``user_info`` when it is
        available and stored as ``None`` otherwise.
        """
        from app.models.wecom_external_chat import (
            WecomExternalChatMemberCreate,
            WecomExternalChatMemberDB,
        )

        member_create = WecomExternalChatMemberCreate(
            chat_id=chat_id,
            user_id=user_id,
            type=member_type,
            name=user_info.get("name") if user_info else None,
            unionid=user_info.get("unionid") if user_info else None,
        )
        member_db = WecomExternalChatMemberDB(**member_create.dict())
        db.add(member_db)
        db.commit()
        return member_db


wecom_client = WecomClient()