diff --git a/.gitignore b/.gitignore index 50e2816..8e8a685 100644 --- a/.gitignore +++ b/.gitignore @@ -39,4 +39,8 @@ ENV/ # Local development .env.local -*.db \ No newline at end of file +*.db + +# sqlite +*.sqlite +*.sqlite3 \ No newline at end of file diff --git a/app/api/endpoints/merchant_order.py b/app/api/endpoints/merchant_order.py index f9a5c7b..8a855b8 100644 --- a/app/api/endpoints/merchant_order.py +++ b/app/api/endpoints/merchant_order.py @@ -310,6 +310,9 @@ async def complete_order( if not order: return error_response(code=404, message="订单不存在") + if merchant_user.userid != order.merchant.user_id: + return error_response(code=403, message="不是你的订单,无权限完成") + if order.status not in [MerchantOrderStatus.DELIVERING, MerchantOrderStatus.PICKUP_READY]: return error_response(code=400, message="订单状态不正确") @@ -326,24 +329,31 @@ async def complete_order( order_id = order.order_id ) + # 对商家进行结算 + account_manager = AccountManager(db) + settlement_amount = float(order.MerchantProductDB.settlement_amount) * order.MerchantOrderDB.qty + if settlement_amount > 0: + account_manager.change_balance( + user_id=order.merchant.user_id, + amount=settlement_amount, + description=order.merchant_product.name, + transaction_id=order.order_id + ) - user = db.query(UserDB).filter( - UserDB.userid == order.user_id - ).first() - - product = db.query(MerchantProductDB).filter( - MerchantProductDB.id == order.merchant_product_id - ).first() - if product: - product.sold_total += order.qty + # 更新商品销量 + if order.merchant_product: + order.merchant_product.sold_total += order.qty db.commit() # 发送商家订单完成消息 + user = db.query(UserDB).filter( + UserDB.userid == order.user_id + ).first() if user and user.mp_openid: data={ "character_string7": order_id, - "thing5": product.name, + "thing5": order.merchant_product.name, "character_string24": order.qty, "time10": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } @@ -467,61 +477,53 @@ async def verify_order( ): """核销订单""" # 查询订单及相关信息 - order = db.query( - MerchantOrderDB, - MerchantProductDB, - MerchantDB - ).join( - MerchantProductDB, - MerchantOrderDB.merchant_product_id == MerchantProductDB.id - ).join( - MerchantDB, - MerchantProductDB.merchant_id == MerchantDB.id - ).filter( + order = db.query(MerchantOrderDB).filter( MerchantOrderDB.order_verify_code == request.verify_code, - MerchantOrderDB.status == MerchantOrderStatus.PICKUP_READY, - MerchantDB.user_id == merchant_user.userid + MerchantOrderDB.status == MerchantOrderStatus.PICKUP_READY ).first() if not order: return error_response(code=404, message="订单不存在或已核销") + if merchant_user.userid != order.merchant.user_id: + return error_response(code=403, message="不是你的订单,无权限核销") + try: # 更新核销时间和核销用户 - order.MerchantOrderDB.verify_time = datetime.now(timezone.utc) - order.MerchantOrderDB.verify_user_id = merchant_user.userid - order.MerchantOrderDB.status = MerchantOrderStatus.COMPLETED + order.verify_time = datetime.now(timezone.utc) + order.verify_user_id = merchant_user.userid + order.status = MerchantOrderStatus.COMPLETED # 如果有积分奖励,赠送积分 - if order.MerchantProductDB.gift_points > 0: + if order.gift_points > 0: point_manager = PointManager(db) point_manager.add_points( - user_id=order.MerchantOrderDB.user_id, - points=order.MerchantProductDB.gift_points, - description=f"团购券核销奖励", - order_id=order.MerchantOrderDB.order_id + user_id=order.user_id, + points=order.gift_points, + description=order.merchant_product.name, + order_id=order.order_id ) # 对商家进行结算 account_manager = AccountManager(db) - settlement_amount = float(order.MerchantProductDB.settlement_amount) + settlement_amount = float(order.settlement_amount) * order.qty if settlement_amount > 0: account_manager.change_balance( - user_id=order.MerchantDB.user_id, + user_id=order.merchant.user_id, amount=settlement_amount, - description=f"团购券核销", - transaction_id=order.MerchantOrderDB.order_id + description=order.merchant_product.name, + transaction_id=order.order_id ) # 更新商品销量 - if order.MerchantProductDB: - order.MerchantProductDB.sold_total += order.MerchantOrderDB.qty + if order.merchant_product: + order.merchant_product.sold_total += order.qty db.commit() return success_response( message="核销成功", - data=MerchantOrderInfo.model_validate(order.MerchantOrderDB) + data=MerchantOrderInfo.model_validate(order) ) except Exception as e: db.rollback() diff --git a/app/api/endpoints/merchant_product.py b/app/api/endpoints/merchant_product.py index cc6bdc8..900f64f 100644 --- a/app/api/endpoints/merchant_product.py +++ b/app/api/endpoints/merchant_product.py @@ -8,7 +8,7 @@ from app.models.merchant_product import ( MerchantProductInfo ) from app.models.database import get_db -from app.api.deps import get_admin_user +from app.api.deps import get_admin_user, get_merchant_user from app.models.user import UserDB from app.core.response import success_response, error_response, ResponseModel from app.models.merchant import MerchantDB @@ -77,15 +77,42 @@ async def update_product( db.rollback() return error_response(code=500, message=f"更新失败: {str(e)}") +@router.get("/merchant_product_list", response_model=ResponseModel) +async def merchant_product_list( + db: Session = Depends(get_db), + current_user: UserDB = Depends(get_merchant_user) +): + """获取商家产品列表(商家端)""" + + merchant = db.query(MerchantDB).filter(MerchantDB.user_id == current_user.userid).first() + if not merchant: + return error_response(code=404, message="商家不存在") + + products = db.query(MerchantProductDB).filter(MerchantProductDB.merchant_id == merchant.id).all() + + product_list = [] + for product in products: + product_list.append( + { + **MerchantProductInfo.model_validate(product).model_dump(), + "merchant": MerchantInfo.model_validate(merchant).model_dump(), + "total_sales_amount": 0, + "total_profit_amount": 0 + } + ) + + return success_response(data=product_list) + + @router.get("/list", response_model=ResponseModel) async def list_merchant_products( user_id: Optional[int] = None, community_id: Optional[int] = None, skip: int = 0, limit: int = 20, - db: Session = Depends(get_db) + db: Session = Depends(get_db), ): - """获取商品列表""" + """获取商品列表(用户端)""" # 联表查询商家信息 query = db.query(MerchantProductDB).options( joinedload(MerchantProductDB.merchant) diff --git a/app/api/endpoints/order.py b/app/api/endpoints/order.py index ac0ea51..d6135fe 100644 --- a/app/api/endpoints/order.py +++ b/app/api/endpoints/order.py @@ -639,10 +639,10 @@ async def deliveryman_get_order_status_count( ) # 待接单的订单,只显示今天以及今天以前的订单 - if status == OrderStatus.CREATED: - query = query.filter( - ShippingOrderDB.delivery_date <= datetime.now().date() - ) + # if status == OrderStatus.CREATED: + # query = query.filter( + # ShippingOrderDB.delivery_date <= datetime.now().date() + # ) count = query.count() result.append({ @@ -699,10 +699,10 @@ async def deliveryman_get_community_building_order_count( ) # 待接单的订单,只显示今天以及今天以前的订单 - if OrderStatus.CREATED in status: - query = query.filter( - ShippingOrderDB.delivery_date <= datetime.now().date() - ) + # if OrderStatus.CREATED in status: + # query = query.filter( + # ShippingOrderDB.delivery_date <= datetime.now().date() + # ) building_order_count = query.group_by( ShippingOrderDB.address_community_building_id @@ -958,8 +958,8 @@ async def deliveryman_orders( query = query.filter(ShippingOrderDB.deliveryman_user_id == deliveryman.userid) # 待接单的订单,只显示今天以及今天以前的订单 - if OrderStatus.CREATED in statuses: - query = query.filter(ShippingOrderDB.delivery_date <= datetime.now().date()) + # if OrderStatus.CREATED in statuses: + # query = query.filter(ShippingOrderDB.delivery_date <= datetime.now().date()) # 楼栋筛选 if building_id: @@ -1242,21 +1242,23 @@ async def deliveryman_complete_order( db.commit() - # 如果当前订单是首单,如果有邀请人,给邀请人发放优惠券 - if order.is_first_order and order_user.referral_code: + # 有邀请人,给邀请人积分奖励 + if order_user.referral_code: # 查询邀请人 invite_user = db.query(UserDB).filter( UserDB.user_code == order_user.referral_code ).first() if invite_user: - expire_time = datetime.now() + timedelta(days=settings.FIRST_ORDER_REFERRAL_COUPON_EXPIRE_DAYS) - manager = CouponManager(db) - manager.add_coupon( + points = settings.FIRST_ORDER_REFERRAL_POINT if order.is_first_order else settings.COMMON_ORDER_REFERRAL_POINT + desc = f"蜜友首单奖励" if order.is_first_order else f"蜜友下单奖励" + # 邀请人赠送积分 + point_manager = PointManager(db) + point_manager.add_points( user_id=invite_user.userid, - coupon_id=settings.FIRST_ORDER_REFERRAL_COUPON_ID, - expire_time=expire_time, - count=settings.FIRST_ORDER_REFERRAL_COUPON_COUNT + points=points, + description=desc, + order_id=order.orderid ) # 发送企业微信消息 @@ -1505,10 +1507,12 @@ async def get_orders( "coupon_discount_amount": order.coupon_discount_amount, "point_discount_amount": order.point_discount_amount, "cancel_reason": order.cancel_reason, + "additional_fee_amount": order.additional_fee_amount, "is_delivery_cancel": order.cancel_user_id == order.deliveryman_user_id, "complete_images": order.optimized_complete_images, "completed_time": order.completed_time, "final_amount": order.final_amount, + "is_first_order": order.is_first_order, "packages": package_list, "address": { "name": order.address_customer_name, diff --git a/app/api/endpoints/wecom.py b/app/api/endpoints/wecom.py index 1d86d5a..1e657c1 100644 --- a/app/api/endpoints/wecom.py +++ b/app/api/endpoints/wecom.py @@ -15,11 +15,16 @@ from app.models.user import UserDB from app.api.deps import get_current_user from fastapi import Depends from sqlalchemy.orm import Session -from app.core.response import error_response, success_response +from app.core.response import error_response, success_response, ResponseModel from pydantic import BaseModel +from app.models.wecom_external_chat import WecomExternalChatDB, WecomExternalChatInfo, WecomExternalChatMemberDB, WecomExternalChatMemberInfo +from datetime import datetime router = APIRouter() +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + def decrypt_msg(msg_encrypt: str, signature: str, timestamp: str, nonce: str) -> typing.Optional[str]: """解密企业微信消息""" try: @@ -61,7 +66,7 @@ def decrypt_msg(msg_encrypt: str, signature: str, timestamp: str, nonce: str) -> return xml_content.decode() except Exception as e: - logging.exception("解密企业微信消息失败") + logger.exception("解密企业微信消息失败") return None @router.get("") @@ -81,7 +86,7 @@ async def verify_callback( return Response(content=decrypted_str, media_type="text/plain") except Exception as e: - logging.exception("验证回调配置失败") + logger.exception("验证回调配置失败") return Response(status_code=403) @router.post("") @@ -109,43 +114,102 @@ async def wechat_corp_callback( # 解析解密后的XML msg_root = ET.fromstring(decrypted_msg) - print(f"企业微信回调消息:{decrypted_msg}") + logger.info(f"企业微信回调消息:{decrypted_msg}") # 解析基本信息 msg_type = msg_root.find('MsgType').text - print(f"msg_type: {msg_type}") + logger.info(f"msg_type: {msg_type}") # 处理事件消息 if msg_type == 'event': event = msg_root.find('Event').text - print(f"event: {event}") + logger.info(f"event: {event}") - # 处理进群事件 + # 处理外部群聊变更事件 if event == 'change_external_chat': chat_id = msg_root.find('ChatId').text change_type = msg_root.find('ChangeType').text update_detail = msg_root.find('UpdateDetail').text - - print(f"chat_id: {chat_id}") - print(f"change_type: {change_type}") - print(f"update_detail: {update_detail}") - # 处理进群事件 - + join_user_id = None + quit_user_ids = [] + + # 处理成员加入事件 if update_detail == 'add_member': - print(f"发送欢迎消息") - # 发送欢迎消息 - # await wecom_client.send_welcome_message(chat_id) + logger.info(f"有新成员加入群聊") + join_scene_elem = msg_root.find('JoinScene') + join_scene = int(join_scene_elem.text) if join_scene_elem is not None else 0 + logger.info(f"加入场景: {join_scene}") + + # 获取加入成员列表 + mem_change_list = msg_root.find('MemChangeList') + if mem_change_list is not None: + for item in mem_change_list.findall('Item'): + join_user_id = item.text + if join_user_id: + logger.info(f"从MemChangeList中获取到新加入成员: {join_user_id}") + # 处理群聊变更事件 - 添加单个成员 + await wecom_client.handle_chat_change_event( + chat_id=chat_id, + change_type=change_type, + update_detail=update_detail, + join_user_id=join_user_id + ) + + # 兼容旧格式,如果找不到MemChangeList,尝试JoinUserID + elif msg_root.find('JoinUserID') is not None: + join_user_id = msg_root.find('JoinUserID').text + if join_user_id: + logger.info(f"从JoinUserID中获取到新加入成员: {join_user_id}") + # 处理群聊变更事件 - 添加单个成员 + await wecom_client.handle_chat_change_event( + chat_id=chat_id, + change_type=change_type, + update_detail=update_detail, + join_user_id=join_user_id + ) + + # 处理成员离开事件 + elif update_detail == 'del_member': + logger.info(f"有成员离开群聊") + quit_scene_elem = msg_root.find('QuitScene') + quit_scene = int(quit_scene_elem.text) if quit_scene_elem is not None else 0 + + # 获取离开成员列表 + mem_change_list = msg_root.find('MemChangeList') + if mem_change_list is not None: + for item in mem_change_list.findall('Item'): + quit_user_id = item.text + if quit_user_id: + quit_user_ids.append(quit_user_id) + # 处理成员离开 + await wecom_client.handle_chat_change_event( + chat_id=chat_id, + change_type=change_type, + update_detail=update_detail, + join_user_id=quit_user_id # 复用这个参数名来表示离开的用户ID + ) + + logger.info(f"离开场景: {quit_scene}, 离开成员: {quit_user_ids}") + logger.info(f"chat_id: {chat_id}, change_type: {change_type}, update_detail: {update_detail}, join_user_id: {join_user_id}, quit_user_ids: {quit_user_ids}") + + # 处理其他群聊变更事件(不是add_member和del_member) + if update_detail != 'add_member' and update_detail != 'del_member': + # 其他群聊变更事件 + await wecom_client.handle_chat_change_event( + chat_id=chat_id, + change_type=change_type, + update_detail=update_detail + ) return Response(content="success", media_type="text/plain") except Exception as e: - logging.exception("处理企业微信回调消息异常") - return Response(content="success", media_type="text/plain") - + logger.exception("处理企业微信回调消息异常") + return Response(content="success", media_type="text/plain") class UnionidToExternalUseridRequest(BaseModel): unionid: str @@ -158,4 +222,316 @@ async def unionid_to_external_userid( """根据unionid获取external_userid""" result = await wecom_client.unionid_to_external_userid(request.unionid, request.openid) print(f"根据unionid获取external_userid结果: {result}") - return success_response(message="获取external_userid成功", data=result) \ No newline at end of file + return success_response(message="获取external_userid成功", data=result) + +@router.get("/external-chats", response_model=ResponseModel) +async def get_external_chats( + db: Session = Depends(get_db), + current_user: UserDB = Depends(get_current_user) +): + """获取企业微信外部群聊列表""" + try: + # 检查是否为管理员 + if current_user.userid != settings.PLATFORM_USER_ID: + return error_response(code=403, message="权限不足") + + # 获取群聊列表 + chats = db.query(WecomExternalChatDB).filter( + WecomExternalChatDB.is_active == True + ).order_by(WecomExternalChatDB.update_time.desc()).all() + + # 转换为Pydantic模型 + chat_list = [WecomExternalChatInfo.model_validate(chat) for chat in chats] + + return success_response(message="获取群聊列表成功", data=chat_list) + except Exception as e: + logger.exception("获取群聊列表异常") + return error_response(code=500, message=f"获取群聊列表失败: {str(e)}") + +@router.get("/external-chats/{chat_id}/members", response_model=ResponseModel) +async def get_external_chat_members( + chat_id: str, + db: Session = Depends(get_db), + current_user: UserDB = Depends(get_current_user) +): + """获取企业微信外部群聊成员列表""" + try: + # 检查是否为管理员 + if current_user.userid != settings.PLATFORM_USER_ID: + return error_response(code=403, message="权限不足") + + # 检查群聊是否存在 + chat = db.query(WecomExternalChatDB).filter( + WecomExternalChatDB.chat_id == chat_id + ).first() + + if not chat: + return error_response(code=404, message="群聊不存在") + + # 获取成员列表 + members = db.query(WecomExternalChatMemberDB).filter( + WecomExternalChatMemberDB.chat_id == chat_id + ).order_by(WecomExternalChatMemberDB.join_time.desc()).all() + + # 转换为Pydantic模型 + member_list = [WecomExternalChatMemberInfo.model_validate(member) for member in members] + + return success_response(message="获取群聊成员列表成功", data=member_list) + except Exception as e: + logger.exception("获取群聊成员列表异常") + return error_response(code=500, message=f"获取群聊成员列表失败: {str(e)}") + +@router.post("/sync-chat/{chat_id}", response_model=ResponseModel) +async def sync_external_chat( + chat_id: str, + db: Session = Depends(get_db), + current_user: UserDB = Depends(get_current_user) +): + """同步企业微信外部群聊信息""" + try: + # 检查是否为管理员 + if current_user.userid != settings.PLATFORM_USER_ID: + return error_response(code=403, message="权限不足") + + # 获取群聊信息 + result = await wecom_client.handle_chat_change_event( + chat_id=chat_id, + change_type="create", + update_detail="" + ) + + if not result: + return error_response(code=500, message="同步群聊信息失败") + + return success_response(message="同步群聊信息成功") + except Exception as e: + logger.exception("同步群聊信息异常") + return error_response(code=500, message=f"同步群聊信息失败: {str(e)}") + +@router.get("/chat-dashboard") +async def chat_dashboard( + current_user: UserDB = Depends(get_current_user), + db: Session = Depends(get_db) +): + """显示企业微信外部群聊信息的HTML页面""" + # 检查是否为管理员 + if current_user.userid != settings.PLATFORM_USER_ID: + return Response(content="权限不足", media_type="text/html") + + # 获取群聊列表 + chats = db.query(WecomExternalChatDB).filter( + WecomExternalChatDB.is_active == True + ).order_by(WecomExternalChatDB.update_time.desc()).all() + + # 生成HTML + html = """ + + + + 企业微信外部群聊信息 + + + + + + +
+

企业微信外部群聊信息

+ +
+
+
总群聊数
+
-
+
+
+
总成员数
+
-
+
+
+
活跃群聊数
+
-
+
+
+ """ + + if not chats: + html += "

暂无群聊信息

" + else: + for chat in chats: + html += f""" +
+

{chat.name or '未命名群聊'} ({chat.chat_id})

+

+ 创建时间: {chat.create_time.strftime('%Y-%m-%d %H:%M:%S')}
+ 更新时间: {chat.update_time.strftime('%Y-%m-%d %H:%M:%S') if chat.update_time else '无'}
+ 成员数量: {chat.member_count}
+ 群主: {chat.owner or '未知'}
+ 公告: {chat.notice or '无'}
+

+ + +
+
+ """ + + html += """ +
+ + + """ + + return Response(content=html, media_type="text/html") + +@router.get("/chat-stats", response_model=ResponseModel) +async def get_chat_stats( + db: Session = Depends(get_db), + current_user: UserDB = Depends(get_current_user) +): + """获取企业微信外部群聊统计数据""" + try: + # 检查是否为管理员 + if current_user.userid != settings.PLATFORM_USER_ID: + return error_response(code=403, message="权限不足") + + # 计算统计数据 + total_chats = db.query(WecomExternalChatDB).count() + active_chats = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.is_active == True).count() + total_members = db.query(WecomExternalChatMemberDB).count() + + # 获取今日新增成员数 + today = datetime.now().date() + today_start = datetime.combine(today, datetime.min.time()) + today_members = db.query(WecomExternalChatMemberDB).filter( + WecomExternalChatMemberDB.join_time >= today_start + ).count() + + # 返回统计数据 + stats = { + "total_chats": total_chats, + "active_chats": active_chats, + "total_members": total_members, + "today_members": today_members + } + + return success_response(message="获取统计数据成功", data=stats) + except Exception as e: + logger.exception("获取统计数据异常") + return error_response(code=500, message=f"获取统计数据失败: {str(e)}") \ No newline at end of file diff --git a/app/core/config.py b/app/core/config.py index e88ad32..0a69037 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -29,10 +29,11 @@ class Settings(BaseSettings): ORDER_EXTRA_PACKAGE_PRICE: float = 0.5 # 额外包裹费用 ORDER_EXTRA_PACKAGE_THRESHOLD: int = 5 # 额外收费阈值 - # 邀请新人赠送优惠券ID - FIRST_ORDER_REFERRAL_COUPON_ID: int = 1 - FIRST_ORDER_REFERRAL_COUPON_COUNT: int = 1 - FIRST_ORDER_REFERRAL_COUPON_EXPIRE_DAYS: int = 7 + # 邀请新人赠送积分 + FIRST_ORDER_REFERRAL_POINT: int = 5 + COMMON_ORDER_REFERRAL_POINT: int = 1 + + # JWT 配置 SECRET_KEY: str = "s10GmiRMmplfYWXYZLSsE3X36Ld4gVZxHgAcdqFGC20v3llv7UdOeWLBEEP3e40p" @@ -133,6 +134,12 @@ class Settings(BaseSettings): # 千问 API 配置 QWEN_API_KEY: str = "sk-caa199589f1c451aaac471fad2986e28" QWEN_API_URL: str = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation" + + # UniSMS 配置 + UNISMS_ACCESS_KEY_ID: str = "xxxxxx" # 替换为您的 UniSMS Access Key ID + UNISMS_ACCESS_KEY_SECRET: str = "xxxxxx" # 替换为您的 UniSMS Access Key Secret + UNISMS_SIGNATURE: str = "蜂快到家" # 短信签名 + UNISMS_VERIFICATION_TEMPLATE_ID: str = "pub_verif_8dgk" # 验证码短信模板ID class Config: case_sensitive = True @@ -187,10 +194,6 @@ class ProdSettings(Settings): REDIS_PASSWORD: str = "redis_tjcZif" VERIFICATION_CODE_EXPIRE_SECONDS: int = 300 - FIRST_ORDER_REFERRAL_COUPON_ID: int = 2 - FIRST_ORDER_REFERRAL_COUPON_COUNT: int = 1 - FIRST_ORDER_REFERRAL_COUPON_EXPIRE_DAYS: int = 3 - class Config: env_file = ".env.prod" diff --git a/app/core/unisms.py b/app/core/unisms.py new file mode 100644 index 0000000..b5308ea --- /dev/null +++ b/app/core/unisms.py @@ -0,0 +1,144 @@ +import json +import time +import hmac +import hashlib +import base64 +import uuid +import logging +import aiohttp +from typing import List, Dict, Any, Optional +from app.core.config import settings + +class UniSMSClient: + """UniSMS短信客户端""" + + def __init__(self): + self.access_key_id = settings.UNISMS_ACCESS_KEY_ID + self.access_key_secret = settings.UNISMS_ACCESS_KEY_SECRET + self.base_url = "https://uni.apistd.com" + self.endpoint = "/2022-12-28/sms/send" + self.signature_method = "HMAC-SHA256" + self.signature_version = "1" + + def _generate_signature(self, string_to_sign: str) -> str: + """生成签名""" + key = self.access_key_secret.encode('utf-8') + message = string_to_sign.encode('utf-8') + sign = hmac.new(key, message, digestmod=hashlib.sha256).digest() + return base64.b64encode(sign).decode('utf-8') + + def _build_request_headers(self, body: Dict[str, Any]) -> Dict[str, str]: + """构建请求头""" + timestamp = str(int(time.time())) + nonce = str(uuid.uuid4()).replace('-', '') + content_md5 = hashlib.md5(json.dumps(body).encode('utf-8')).hexdigest() + + # 构建待签名字符串 + string_to_sign = "\n".join([ + self.endpoint, + timestamp, + nonce, + content_md5 + ]) + + # 生成签名 + signature = self._generate_signature(string_to_sign) + + # 构建请求头 + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "X-Uni-Timestamp": timestamp, + "X-Uni-Nonce": nonce, + "X-Uni-Content-MD5": content_md5, + "X-Uni-Signature-Method": self.signature_method, + "X-Uni-Signature-Version": self.signature_version, + "X-Uni-Signature": signature, + "X-Uni-AccessKeyId": self.access_key_id, + } + + return headers + + async def send_sms( + self, + to: str, + signature: str, + template_id: str, + template_data: Dict[str, Any] = None + ) -> Dict[str, Any]: + """ + 发送短信 + + Args: + to: 接收短信的手机号码 + signature: 短信签名 + template_id: 短信模板ID + template_data: 短信模板参数 + + Returns: + Dict[str, Any]: 发送结果 + """ + try: + # 构建请求体 + body = { + "to": to, + "signature": signature, + "templateId": template_id, + } + + if template_data: + body["templateData"] = template_data + + # 构建请求头 + headers = self._build_request_headers(body) + + # 发送请求 + url = f"{self.base_url}{self.endpoint}" + + async with aiohttp.ClientSession() as session: + async with session.post(url, json=body, headers=headers) as response: + result = await response.json() + logging.info(f"UniSMS响应: {result}") + + if result.get("status") != "success": + logging.error(f"发送短信失败: {result}") + return { + "success": False, + "error": result.get("message", "发送短信失败") + } + + return { + "success": True, + "message_id": result.get("data", {}).get("messageId", ""), + "fee": result.get("data", {}).get("fee", 0) + } + + except Exception as e: + logging.error(f"发送短信异常: {str(e)}") + return { + "success": False, + "error": f"发送短信异常: {str(e)}" + } + + async def send_verification_code(self, phone: str, code: str) -> Dict[str, Any]: + """ + 发送验证码短信 + + Args: + phone: 手机号码 + code: 验证码 + + Returns: + Dict[str, Any]: 发送结果 + """ + # 使用验证码短信模板 + template_id = settings.UNISMS_VERIFICATION_TEMPLATE_ID + signature = settings.UNISMS_SIGNATURE + template_data = {"code": code} + + return await self.send_sms( + to=phone, + signature=signature, + template_id=template_id, + template_data=template_data + ) \ No newline at end of file diff --git a/app/core/wecomclient.py b/app/core/wecomclient.py index b5393b6..b9f5a1d 100644 --- a/app/core/wecomclient.py +++ b/app/core/wecomclient.py @@ -5,6 +5,10 @@ import logging from app.core.config import settings from typing import Dict, Any, Optional, List import aiohttp + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + class WecomClient: """企业微信客户端""" @@ -32,7 +36,7 @@ class WecomClient: else: raise Exception(f"获取access_token失败: {result}") except Exception as e: - logging.error(f"获取access_token异常: {str(e)}") + logger.error(f"获取access_token异常: {str(e)}") return None async def code2session(self, js_code: str) -> Optional[Dict[str, Any]]: @@ -64,11 +68,11 @@ class WecomClient: "session_key": result.get("session_key") } else: - logging.error(f"code2session失败: {result}") + logger.error(f"code2session失败: {result}") return None except Exception as e: - logging.error(f"code2session异常: {str(e)}") + logger.error(f"code2session异常: {str(e)}") return None async def get_unionid_from_userid(self, userid: str) -> Optional[str]: @@ -90,7 +94,7 @@ class WecomClient: return result.get("unionid") except Exception as e: - logging.error(f"get_unionid_from_userid异常: {str(e)}") + logger.error(f"get_unionid_from_userid异常: {str(e)}") return None async def unionid_to_external_userid(self, unionid: str, openid: str) -> Optional[str]: @@ -109,48 +113,365 @@ class WecomClient: return result except Exception as e: - logging.error(f"unionid_to_external_userid异常: {str(e)}") + logger.error(f"unionid_to_external_userid异常: {str(e)}") return None - async def send_welcome_message(self, chat_id: str) -> bool: - """发送欢迎消息""" + async def get_external_chat_info(self, chat_id: str) -> Optional[Dict[str, Any]]: + """获取外部群聊信息 + + Args: + chat_id: 群聊ID + + Returns: + Dict: 群聊信息 + """ try: - # 1. 获取 access_token access_token = await self.get_access_token() if not access_token: - logging.error("获取access_token失败") - return False + logger.error("获取access_token失败") + return None - # 2. 发送欢迎消息 - url = f"https://qyapi.weixin.qq.com/cgi-bin/appchat/send?access_token={access_token}" + url = f"https://qyapi.weixin.qq.com/cgi-bin/externalcontact/groupchat/get?access_token={access_token}" data = { - "chatid": chat_id, - "msgtype": "text", - "text": { - "content": f"""🥳 欢迎您进群,在群内可以享受📦【代取快递】跑腿服务。 - - ‼ 微信下单,快递到家 ‼ - - 🎁 新人礼包 - 𝟏 赠送𝟏𝟓张【𝟑元跑腿券】 - 𝟐 赠送𝟔枚鲜鸡蛋【首次下单】 - ━ ━ ━ ━ ━🎊━ ━ ━ ━ ━ - ↓点击↓小程序领券下单 &""" - }, - "safe": 0 + "chat_id": chat_id } - + async with aiohttp.ClientSession() as session: async with session.post(url, json=data) as response: result = await response.json() if result.get("errcode") == 0: - return True + return result.get("group_chat") else: - logging.error(f"发送欢迎消息失败: {result}") - return False - + logger.error(f"获取外部群聊信息失败: {result}") + return None except Exception as e: - logging.error(f"发送欢迎消息异常: {str(e)}") + logger.error(f"获取外部群聊信息异常: {str(e)}") + return None + + async def get_external_contact_info(self, external_userid: str) -> Optional[Dict[str, Any]]: + """获取外部联系人信息 + + Args: + external_userid: 外部联系人ID + + Returns: + Dict: 外部联系人信息 + """ + try: + access_token = await self.get_access_token() + if not access_token: + logger.error("获取access_token失败") + return None + + url = f"https://qyapi.weixin.qq.com/cgi-bin/externalcontact/get?access_token={access_token}" + params = { + "external_userid": external_userid + } + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + result = await response.json() + if result.get("errcode") == 0: + logger.info(f"获取外部联系人信息成功: {result}") + return result.get("external_contact") + else: + logger.error(f"获取外部联系人信息失败: {result}") + return None + except Exception as e: + logger.error(f"获取外部联系人信息异常: {str(e)}") + return None + + async def get_internal_user_info(self, user_id: str) -> Optional[Dict[str, Any]]: + """获取企业内部成员信息 + + Args: + user_id: 企业成员userid + + Returns: + Dict: 企业成员信息 + """ + try: + access_token = await self.get_access_token() + if not access_token: + logger.error("获取access_token失败") + return None + + url = f"https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={access_token}&userid={user_id}" + + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + result = await response.json() + if result.get("errcode") == 0: + logger.info(f"获取内部成员信息成功: {result}") + return result + else: + logger.error(f"获取内部成员信息失败: {result}") + return None + except Exception as e: + logger.error(f"获取内部成员信息异常: {str(e)}") + return None + + async def handle_chat_change_event(self, chat_id: str, change_type: str, update_detail: str, join_user_id: str = None) -> bool: + """处理群聊变更事件 + + Args: + chat_id: 群聊ID + change_type: 变更类型 create/update/dismiss + update_detail: 变更详情 add_member/del_member/change_owner/change_name/change_notice + join_user_id: 加入/离开的用户ID + + Returns: + bool: 处理是否成功 + """ + from app.models.wecom_external_chat import WecomExternalChatDB, WecomExternalChatMemberDB + from app.models.wecom_external_chat import WecomExternalChatCreate, WecomExternalChatMemberCreate + from app.models.database import SessionLocal + + try: + db = SessionLocal() + + # 群创建事件 + if change_type == "create": + # 获取群聊信息 + chat_info = await self.get_external_chat_info(chat_id) + if not chat_info: + logger.warning(f"无法获取群聊信息: {chat_id}") + return False + + logger.info(f"获取到群聊信息: {chat_info}") + + # 保存群聊信息 + chat_db = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.chat_id == chat_id).first() + if not chat_db: + chat_create = WecomExternalChatCreate( + chat_id=chat_id, + name=chat_info.get("name"), + owner=chat_info.get("owner"), + member_count=len(chat_info.get("member_list", [])), + notice=chat_info.get("notice") + ) + chat_db = WecomExternalChatDB(**chat_create.model_dump()) + db.add(chat_db) + db.commit() + + # 保存群成员信息 + for member in chat_info.get("member_list", []): + user_id = member.get("userid") + # 处理数字类型: 1=企业成员, 2=外部联系人 + member_type_num = member.get("type") + if member_type_num == 1: + member_type = "INTERNAL" + elif member_type_num == 2: + member_type = "EXTERNAL" + else: + member_type = "EXTERNAL" # 默认为外部联系人 + + logger.info(f"成员类型: {member_type}(原始值:{member_type_num}), 成员ID: {user_id}") + + # 检查成员是否已存在 + member_db = db.query(WecomExternalChatMemberDB).filter( + WecomExternalChatMemberDB.chat_id == chat_id, + WecomExternalChatMemberDB.user_id == user_id + ).first() + + if not member_db: + # 获取成员详情 - 根据成员类型调用不同API + user_info = None + + if member_type == "EXTERNAL": + try: + user_info = await self.get_external_contact_info(user_id) + logger.info(f"获取到外部联系人详情: {user_info}") + except Exception as e: + logger.warning(f"获取外部联系人信息失败: {user_id}, 错误: {str(e)}") + elif member_type == "INTERNAL": + try: + user_info = await self.get_internal_user_info(user_id) + logger.info(f"获取到内部成员详情: {user_info}") + except Exception as e: + logger.warning(f"获取内部成员信息失败: {user_id}, 错误: {str(e)}") + + # 获取名称和unionid (内部成员可能没有unionid) + member_name = None + member_unionid = None + + if user_info: + if member_type == "EXTERNAL": + member_name = user_info.get("name") + member_unionid = user_info.get("unionid") + elif member_type == "INTERNAL": + member_name = user_info.get("name") + # 注意: 内部成员可能没有unionid字段 + + # 如果API获取失败,则使用群聊详情中的信息 + if not member_name: + member_name = "未知成员" # 没有任何名称信息时的默认值 + + member_create = WecomExternalChatMemberCreate( + chat_id=chat_id, + user_id=user_id, + type=member_type, + name=member_name, + unionid=member_unionid + ) + member_db = WecomExternalChatMemberDB(**member_create.model_dump()) + db.add(member_db) + + db.commit() + return True + + # 群更新事件 - 添加成员 + elif change_type == "update" and update_detail == "add_member": + if not join_user_id: + return False + + # 检查群聊是否存在 + chat_db = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.chat_id == chat_id).first() + if not chat_db: + # 获取群聊信息并创建 + chat_info = await self.get_external_chat_info(chat_id) + if chat_info: + chat_create = WecomExternalChatCreate( + chat_id=chat_id, + name=chat_info.get("name"), + owner=chat_info.get("owner"), + member_count=len(chat_info.get("member_list", [])), + notice=chat_info.get("notice") + ) + chat_db = WecomExternalChatDB(**chat_create.model_dump()) + db.add(chat_db) + db.commit() + + # 检查成员是否已存在 + member_db = db.query(WecomExternalChatMemberDB).filter( + WecomExternalChatMemberDB.chat_id == chat_id, + WecomExternalChatMemberDB.user_id == join_user_id + ).first() + + if not member_db: + # 尝试获取群聊成员信息来确定成员类型 + member_type = "EXTERNAL" # 默认为外部联系人 + member_name = None + user_info = None + + # 获取群聊详情,查找成员类型 + try: + chat_info = await self.get_external_chat_info(chat_id) + if chat_info and "member_list" in chat_info: + for member in chat_info["member_list"]: + if member.get("userid") == join_user_id: + # 处理数字类型: 1=企业成员, 2=外部联系人 + member_type_num = member.get("type") + if member_type_num == 1: + member_type = "INTERNAL" + elif member_type_num == 2: + member_type = "EXTERNAL" + else: + member_type = "EXTERNAL" # 默认为外部联系人 + + member_name = member.get("name") + logger.info(f"从群聊详情中确定成员类型: {member_type}(原始值:{member_type_num}), 成员: {join_user_id}") + break + except Exception as e: + logger.warning(f"获取群聊详情失败: {str(e)}") + + # 如果是内部成员,不做任何处理 + if member_type == "INTERNAL": + logger.info(f"成员 {join_user_id} 是内部成员,不做处理") + return True + + # 如果是外部联系人,获取详细信息 + user_info = None + member_name = None + member_unionid = None + + if member_type == "EXTERNAL": + try: + user_info = await self.get_external_contact_info(join_user_id) + if user_info: + member_name = user_info.get("name") + member_unionid = user_info.get("unionid") + logger.info(f"获取到外部联系人详情: {member_name}, unionid: {member_unionid}") + except Exception as e: + logger.warning(f"获取外部联系人信息失败,可能是内部成员: {str(e)}") + if "84061" in str(e) or "not external contact" in str(e): + logger.info(f"用户 {join_user_id} 可能是内部成员,不做处理") + return True + + # 如果API获取失败,则使用群聊详情中的信息 + if not member_name: + member_name = "未知成员" # 没有任何名称信息时的默认值 + + member_create = WecomExternalChatMemberCreate( + chat_id=chat_id, + user_id=join_user_id, + type=member_type, + name=member_name, + unionid=member_unionid + ) + member_db = WecomExternalChatMemberDB(**member_create.model_dump()) + db.add(member_db) + db.commit() + + # 更新群成员数量 + if chat_db: + chat_db.member_count = chat_db.member_count + 1 + db.commit() + + return True + + # 群更新事件 - 移除成员 + elif change_type == "update" and update_detail == "del_member": + if not join_user_id: + return False + + logger.info(f"处理成员离开群聊事件: 群ID={chat_id}, 成员ID={join_user_id}") + + # 删除成员记录 + member_db = db.query(WecomExternalChatMemberDB).filter( + WecomExternalChatMemberDB.chat_id == chat_id, + WecomExternalChatMemberDB.user_id == join_user_id + ).first() + + if member_db: + logger.info(f"从数据库中删除成员记录: {member_db.user_id}, 姓名: {member_db.name}") + db.delete(member_db) + db.commit() + else: + logger.warning(f"未找到要删除的成员记录: chat_id={chat_id}, user_id={join_user_id}") + + # 更新群成员数量 + chat_db = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.chat_id == chat_id).first() + if chat_db: + chat_db.member_count = max(0, chat_db.member_count - 1) + logger.info(f"更新群成员数量: {chat_db.member_count}") + db.commit() + else: + logger.warning(f"未找到群聊记录: chat_id={chat_id}") + + return True + + # 群解散事件 + elif change_type == "dismiss": + # 标记群聊为非活跃 + chat_db = db.query(WecomExternalChatDB).filter(WecomExternalChatDB.chat_id == chat_id).first() + if chat_db: + chat_db.is_active = False + db.commit() + logger.info(f"群聊解散: {chat_id}") + + return True + return False + except Exception as e: + logger.error(f"处理群聊变更事件异常: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + if 'db' in locals(): + db.close() + return False + finally: + if 'db' in locals(): + db.close() wecom_client = WecomClient() \ No newline at end of file diff --git a/app/models/wecom_external_chat.py b/app/models/wecom_external_chat.py new file mode 100644 index 0000000..bf30312 --- /dev/null +++ b/app/models/wecom_external_chat.py @@ -0,0 +1,98 @@ +from sqlalchemy import Column, String, Integer, DateTime, JSON, Boolean, ForeignKey, Text +from sqlalchemy.sql import func +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Any +from datetime import datetime +from .database import Base + +class WecomExternalChatDB(Base): + """企业微信外部群聊表""" + __tablename__ = "wecom_external_chats" + + id = Column(Integer, primary_key=True, autoincrement=True) + chat_id = Column(String(64), nullable=False, unique=True, index=True) # 群聊ID + name = Column(String(100), nullable=True) # 群名称 + owner = Column(String(64), nullable=True) # 群主ID + create_time = Column(DateTime(timezone=True), server_default=func.now()) + update_time = Column(DateTime(timezone=True), onupdate=func.now()) + member_count = Column(Integer, nullable=False, default=0) # 成员数量 + notice = Column(String(500), nullable=True) # 群公告 + is_active = Column(Boolean, nullable=False, default=True) # 是否活跃 + +class WecomExternalChatMemberDB(Base): + """企业微信外部群聊成员表""" + __tablename__ = "wecom_external_chat_members" + + id = Column(Integer, primary_key=True, autoincrement=True) + chat_id = Column(String(64), nullable=False, index=True) # 群聊ID + user_id = Column(String(64), nullable=False, index=True) # 用户ID + type = Column(String(32), nullable=False) # 成员类型: INTERNAL(内部成员)、EXTERNAL(外部联系人) + join_time = Column(DateTime(timezone=True), server_default=func.now()) # 加入时间 + unionid = Column(String(64), nullable=True) # 微信unionid + name = Column(String(100), nullable=True) # 成员名称 + mobile = Column(String(20), nullable=True) # 手机号 + welcome_sent = Column(Boolean, nullable=False, default=False) # 是否已发送欢迎消息 + + # 设置联合唯一索引 + __table_args__ = ( + {"mysql_charset": "utf8mb4"}, + ) + +# Pydantic 模型 +class WecomExternalChatCreate(BaseModel): + chat_id: str + name: Optional[str] = None + owner: Optional[str] = None + member_count: int = 0 + notice: Optional[str] = None + is_active: bool = True + +class WecomExternalChatUpdate(BaseModel): + name: Optional[str] = None + owner: Optional[str] = None + member_count: Optional[int] = None + notice: Optional[str] = None + is_active: Optional[bool] = None + +class WecomExternalChatInfo(BaseModel): + id: int + chat_id: str + name: Optional[str] = None + owner: Optional[str] = None + create_time: datetime + update_time: Optional[datetime] = None + member_count: int = 0 + notice: Optional[str] = None + is_active: bool = True + + class Config: + from_attributes = True + +class WecomExternalChatMemberCreate(BaseModel): + chat_id: str + user_id: str + type: str + unionid: Optional[str] = None + name: Optional[str] = None + mobile: Optional[str] = None + welcome_sent: bool = False + +class WecomExternalChatMemberUpdate(BaseModel): + unionid: Optional[str] = None + name: Optional[str] = None + mobile: Optional[str] = None + welcome_sent: Optional[bool] = None + +class WecomExternalChatMemberInfo(BaseModel): + id: int + chat_id: str + user_id: str + type: str + join_time: datetime + unionid: Optional[str] = None + name: Optional[str] = None + mobile: Optional[str] = None + welcome_sent: bool + + class Config: + from_attributes = True \ No newline at end of file