"""User endpoints: SMS codes, login, profile, roles, referrals and real-name auth."""

import json
import logging
import random
import string
from datetime import datetime, timedelta
from typing import List, Optional

import redis
from fastapi import (
    APIRouter,
    BackgroundTasks,
    Body,
    Depends,
    HTTPException,
    Request,
    Response,
)
from pydantic import BaseModel, Field
from sqlalchemy import text
from sqlalchemy.orm import Session
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import (
    TencentCloudSDKException,
)
from tencentcloud.sms.v20210111 import sms_client, models

# Local imports are kept in their original relative order on purpose, so that
# module initialisation order inside the ``app`` package does not change.
from app.models.user import (
    UserLogin,
    UserInfo,
    ResetPasswordRequest,
    PhoneLoginRequest,
    VerifyCodeRequest,
    UserDB,
    UserUpdate,
    UserRole,
    UserPasswordLogin,
    ReferralUserInfo,
    generate_user_code,
    ChangePasswordRequest,
    UserUpdateRoles,
    DeliverymanSetDelivering,
)
from app.models.coupon import CouponDB, UserCouponDB, CouponStatus
from app.api.deps import get_current_user, get_admin_user
from app.models.database import get_db
from app.core.config import settings
from app.core.security import create_access_token, get_password_hash, verify_password
from app.core.response import success_response, error_response, ResponseModel
from app.models.community import CommunityDB
from app.models.user_auth import UserAuthDB, UserAuthCreate, UserAuthInfo
from app.core.qcloud import qcloud_manager
from app.models.merchant import MerchantDB
from app.models.address import AddressDB, AddressInfo
from app.models.order import ShippingOrderDB, OrderStatus
from app.core.redis_client import redis_client
from app.core.wecombot import WecomBot

logger = logging.getLogger(__name__)

router = APIRouter()

# Roles for which a password login must be backed by the same role on the user.
_LOGIN_CHECKED_ROLES = (
    UserRole.ADMIN,
    UserRole.PARTNER,
    UserRole.MERCHANT,
    UserRole.DELIVERYMAN,
    UserRole.USER,
)


def _mask_phone(phone):
    """Return *phone* with characters 4-7 replaced by ``****``.

    Falsy values (``None`` or an empty string) are returned unchanged so that
    callers do not crash on users without a phone number.
    """
    if not phone:
        return phone
    return f"{phone[:3]}****{phone[7:]}"


@router.post("/send-code")
async def send_verify_code(request: VerifyCodeRequest):
    """Send an SMS verification code to ``request.phone``.

    A per-phone Redis key throttles how often a code can be requested; the
    code itself is stored in Redis under ``verify_code:{phone}``.
    """
    phone = request.phone
    client = redis_client.get_client()

    # The mere existence of the limit key means a code was sent recently.
    # Testing for existence (instead of comparing the stored value with the
    # phone) also works when the Redis client returns bytes.
    if client.get(f"verify_code_limit:{phone}") is not None:
        return error_response(message="验证码发送频率过高")

    code, request_id = await qcloud_manager.send_sms_code(phone)
    # Never log the code itself: it is a credential.
    logger.info("verification code sent, request_id: %s", request_id)

    # Store the code with its expiry.
    client.setex(
        f"verify_code:{phone}",
        settings.VERIFICATION_CODE_EXPIRE_SECONDS,
        code,
    )
    # Start the throttling window for this phone number.
    client.setex(
        f"verify_code_limit:{phone}",
        settings.VERIFICATION_CODE_LIMIT_SECONDS,
        phone,
    )

    return success_response(message="验证码已发送")


@router.get("/info", response_model=ResponseModel)
async def get_user_info(
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Return the current user's profile.

    The payload is the ``UserInfo`` dump of the user, extended with the user's
    community (when set), the default address (when one exists) and the number
    of unused coupons.
    """
    # Default address together with the name of the community it belongs to.
    default_address = db.query(
        AddressDB, CommunityDB.name.label('community_name')
    ).join(
        CommunityDB, AddressDB.community_id == CommunityDB.id
    ).filter(
        AddressDB.user_id == current_user.userid,
        AddressDB.is_default == True  # noqa: E712 - SQLAlchemy column expression
    ).first()

    user_data = UserInfo.model_validate(current_user).model_dump()

    # Community the user belongs to.
    if current_user.community_id:
        user_data['community_id'] = current_user.community_id
        community = db.query(CommunityDB).filter(
            CommunityDB.id == current_user.community_id
        ).first()
        if community:
            user_data['community_name'] = community.name

    if default_address:
        address = default_address.AddressDB
        user_data['default_address'] = AddressInfo(
            id=address.id,
            community_id=address.community_id,
            community_name=default_address.community_name,
            community_building_id=address.community_building_id,
            community_building_name=address.community_building_name,
            address_detail=address.address_detail,
            name=address.name,
            phone=address.phone,
            gender=address.gender,
            is_default=address.is_default,
        )

    # Number of coupons the user has not used yet.
    user_data['coupon_count'] = db.query(UserCouponDB).filter(
        UserCouponDB.user_id == current_user.userid,
        UserCouponDB.status == CouponStatus.UNUSED
    ).count()

    return success_response(data=user_data)


@router.post("/logout", response_model=ResponseModel)
async def logout(
    response: Response,
    request: Request,
    current_user: UserDB = Depends(get_current_user)
):
    """Log the current user out by clearing the request session."""
    request.session.clear()
    return success_response(message="退出登录成功")


@router.put("/update", response_model=ResponseModel)
async def update_user_info(
    update_data: UserUpdate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Update the current user's profile with the fields present in the request."""
    # Only fields that were explicitly sent by the client are applied.
    update_fields = update_data.model_dump(exclude_unset=True)
    if not update_fields:
        return error_response(code=400, message="没有提供要更新的字段")

    for field, value in update_fields.items():
        setattr(current_user, field, value)

    try:
        db.commit()
        db.refresh(current_user)
        return success_response(
            message="用户信息更新成功",
            data=UserInfo.model_validate(current_user)
        )
    except Exception as e:
        db.rollback()
        logger.exception("failed to update user info")
        return error_response(code=500, message=f"更新失败: {str(e)}")


@router.put("/roles", response_model=ResponseModel)
async def update_user_roles(
    update_data: UserUpdateRoles,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Replace a user's roles (admin only).

    The role list must not be empty, and ``UserRole.USER`` is always kept.
    """
    user = db.query(UserDB).filter(UserDB.userid == update_data.user_id).first()
    if not user:
        return error_response(code=404, message="用户不存在")

    # At least one role is required.
    if not update_data.roles:
        return error_response(code=400, message="用户必须至少有一个角色")

    # Always include the plain user role, then drop duplicates while keeping
    # the requested order (a set would store the roles in arbitrary order).
    # The request model itself is left untouched.
    user.roles = list(dict.fromkeys([*update_data.roles, UserRole.USER]))

    try:
        db.commit()
        db.refresh(user)
        return success_response(
            message="用户角色更新成功",
            data=UserInfo.model_validate(user)
        )
    except Exception as e:
        db.rollback()
        logger.exception("failed to update user roles")
        return error_response(code=500, message=f"更新失败: {str(e)}")


@router.post("/password-login", response_model=ResponseModel)
async def password_login(
    login_data: UserPasswordLogin,
    db: Session = Depends(get_db),
    response: Response = None,
    request: Request = None
):
    """Log in with phone number and password.

    On success an access token is created, stored in the session and returned
    together with the user's info.
    """
    # Do not log ``login_data`` itself: it contains the plaintext password.
    logger.info("password login attempt, role: %s", login_data.role)

    user = db.query(UserDB).filter(UserDB.phone == login_data.phone).first()
    if not user:
        return error_response(code=401, message="用户不存在")

    if not user.password:
        return error_response(code=401, message="请先设置密码")

    if not verify_password(login_data.password, user.password):
        return error_response(code=401, message="密码错误")

    # The requested login role must be one the user actually holds.
    if login_data.role in _LOGIN_CHECKED_ROLES and login_data.role not in user.roles:
        return error_response(code=401, message="你没有登录权限")

    if login_data.role == UserRole.MERCHANT:
        # A merchant account must be linked to a merchant record.
        merchant = db.query(MerchantDB).filter(MerchantDB.user_id == user.userid).first()
        if not merchant:
            return error_response(code=401, message="商家账户,请先关联商家")

    # A deliveryman account must have a community assigned.
    if login_data.role == UserRole.DELIVERYMAN and not user.community_id:
        return error_response(code=401, message="配送员账户,请先设置归属小区")

    # Create the access token and remember it in the session.
    access_token = create_access_token(data={"phone": user.phone})
    request.session["access_token"] = access_token

    return success_response(
        data={
            "access_token": access_token,
            "token_type": "bearer",
            "user": UserInfo.model_validate(user)
        }
    )


@router.get("/referrals", response_model=ResponseModel)
async def get_referral_users(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """List the users invited by the current user (paginated).

    Each item carries ``is_place_order`` (whether the user has at least one
    completed order) and a masked phone number.
    """
    invited = db.query(UserDB).filter(
        UserDB.referral_code == current_user.user_code
    )
    referral_users = invited.order_by(
        UserDB.create_time.desc()
    ).offset(skip).limit(limit).all()
    total = invited.count()

    # One query for the whole page instead of one COUNT per invited user.
    users_with_orders = set()
    if referral_users:
        rows = db.query(ShippingOrderDB.userid).filter(
            ShippingOrderDB.userid.in_([user.userid for user in referral_users]),
            ShippingOrderDB.status == OrderStatus.COMPLETED
        ).distinct().all()
        users_with_orders = {row[0] for row in rows}

    user_list = []
    for user in referral_users:
        item = ReferralUserInfo.model_validate(user).model_dump()
        item['is_place_order'] = user.userid in users_with_orders
        item['phone'] = _mask_phone(item['phone'])
        user_list.append(item)

    return success_response(data={
        "total": total,
        "items": user_list
    })


@router.get("/list", response_model=ResponseModel)
async def get_list(
    skip: int = 0,
    limit: int = 10,
    role: Optional[str] = None,
    phone: Optional[str] = None,  # exact match on the phone number
    community_id: Optional[int] = None,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """List users with their community name (admin only, paginated).

    Optional filters: ``role`` (contained in the user's roles), ``phone``
    (exact match) and ``community_id`` (exact match). Phone numbers in the
    result are masked.
    """
    query = db.query(
        UserDB,
        CommunityDB.name.label('community_name')
    ).outerjoin(
        CommunityDB,
        UserDB.community_id == CommunityDB.id
    )

    if role:
        # ``role`` comes from the query string: pass it as a bound parameter
        # (JSON-encoded, as JSON_CONTAINS expects a JSON document) instead of
        # interpolating it into the SQL text.
        query = query.filter(
            text("JSON_CONTAINS(users.roles, :role_json)").bindparams(
                role_json=json.dumps(role)
            )
        )

    if phone:
        query = query.filter(UserDB.phone == phone)

    if community_id:
        query = query.filter(UserDB.community_id == community_id)

    total = query.count()
    results = query.order_by(
        UserDB.create_time.desc()
    ).offset(skip).limit(limit).all()

    user_list = []
    for user, community_name in results:
        user_info = UserInfo.model_validate(user)
        user_info.phone = _mask_phone(user_info.phone)
        user_list.append({
            **user_info.model_dump(),
            "community_name": community_name
        })

    return success_response(data={
        "total": total,
        "items": user_list
    })


@router.post("/reset-password", response_model=ResponseModel)
async def reset_password(
    request: ResetPasswordRequest,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)  # admin only
):
    """Set a new password for the given user (admin only)."""
    user = db.query(UserDB).filter(UserDB.userid == request.user_id).first()
    if not user:
        return error_response(code=404, message="用户不存在")

    user.password = get_password_hash(request.new_password)

    try:
        db.commit()
        return success_response(
            message="密码重置成功",
            data={
                "userid": user.userid,
                "nickname": user.nickname,
                "phone": _mask_phone(user.phone)
            }
        )
    except Exception as e:
        db.rollback()
        logger.exception("failed to reset password")
        return error_response(code=500, message=f"密码重置失败: {str(e)}")


class UpdateUserCommunityRequest(BaseModel):
    """Request body for assigning a user to a community."""

    user_id: int
    # None clears the user's community.
    community_id: Optional[int] = None


@router.put("/community", response_model=ResponseModel)
async def update_user_community(
    request: UpdateUserCommunityRequest,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)  # admin only
):
    """Set or clear the community a user belongs to (admin only)."""
    user = db.query(UserDB).filter(UserDB.userid == request.user_id).first()
    if not user:
        return error_response(code=404, message="用户不存在")

    # When a community is given it must exist.
    if request.community_id is not None:
        community = db.query(CommunityDB).filter(
            CommunityDB.id == request.community_id
        ).first()
        if not community:
            return error_response(code=404, message="小区不存在")

    try:
        user.community_id = request.community_id
        db.commit()
        return success_response(
            message="更新成功",
            data=UserInfo.model_validate(user)
        )
    except Exception as e:
        db.rollback()
        logger.exception("failed to update user community")
        return error_response(code=500, message=f"更新失败: {str(e)}")


@router.post("/change-password", response_model=ResponseModel)
async def change_password(
    request: ChangePasswordRequest,
    db: Session = Depends(get_db)
):
    """Change a user's password after checking the SMS verification code.

    NOTE(review): nothing here limits the number of attempts per code; consider
    adding a throttle if one is not enforced elsewhere.
    """
    user = db.query(UserDB).filter(UserDB.phone == request.phone).first()
    if not user:
        return error_response(message="用户不存在")

    client = redis_client.get_client()
    # The same key is used for reading and deleting the code.
    code_key = f"verify_code:{user.phone}"

    redis_code = client.get(code_key)
    if not redis_code:
        return error_response(message="验证码已过期")

    # The Redis client may return bytes; normalise before comparing so that a
    # correct code is not rejected.
    if isinstance(redis_code, bytes):
        redis_code = redis_code.decode("utf-8")

    if redis_code != request.verify_code:
        return error_response(message="验证码错误")

    try:
        user.password = get_password_hash(request.new_password)
        db.commit()

        # The code is single-use.
        client.delete(code_key)

        return success_response(message="密码修改成功")
    except Exception as e:
        db.rollback()
        logger.exception("failed to change password")
        return error_response(code=500, message=f"修改密码失败: {str(e)}")


@router.put("/deliveryman/set_delivering", response_model=ResponseModel)
async def deliveryman_set_delivering(
    background_tasks: BackgroundTasks,
    request: DeliverymanSetDelivering,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Set whether the current deliveryman is delivering.

    When delivering is switched off, a WeCom notification is scheduled as a
    background task.
    """
    current_user.is_delivering = request.is_delivering
    db.commit()

    if not request.is_delivering:
        # Guard against users without a community so the request does not
        # fail after the change has already been committed.
        community = current_user.community
        community_name = community.name if community is not None else ""

        wecom_bot = WecomBot()
        # NOTE(review): the request-scoped session is handed to a background
        # task; confirm it is still usable when the task runs.
        background_tasks.add_task(
            wecom_bot.send_deliveryman_close_delivering,
            db,
            current_user,
            community_name
        )

    return success_response(message="设置成功", data={
        "is_delivering": current_user.is_delivering
    })


@router.post("/auth", response_model=ResponseModel)
async def create_user_auth(
    auth: UserAuthCreate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Perform real-name authentication for the current user.

    The name and ID number are checked via ``qcloud_manager.verify_id_card``;
    on success an auth record is stored and the user is marked as authenticated.
    """
    if current_user.is_auth:
        return error_response(code=400, message="用户已完成实名认证")

    # A user may only have one auth record.
    exists = db.query(UserAuthDB).filter(
        UserAuthDB.user_id == current_user.userid
    ).first()
    if exists:
        return error_response(code=400, message="该用户已有认证记录")

    try:
        verify_result = await qcloud_manager.verify_id_card(
            id_card=auth.id_number,
            name=auth.name
        )

        # "0" means name and ID number match; anything else is a mismatch.
        if verify_result["Result"] != "0":
            return error_response(
                code=400,
                message=f"实名认证失败: {verify_result['Description']}"
            )

        auth_record = UserAuthDB(
            user_id=current_user.userid,
            name=auth.name,
            id_number=auth.id_number
        )
        db.add(auth_record)

        current_user.is_auth = True
        db.commit()

        return success_response(data=UserAuthInfo.model_validate(auth_record))
    except Exception as e:
        db.rollback()
        logger.exception("real-name authentication failed")
        return error_response(code=500, message=f"认证失败: {str(e)}")


@router.get("/auth", response_model=ResponseModel)
async def get_user_auth(
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Return the current user's auth record with the ID number masked."""
    auth_record = db.query(UserAuthDB).filter(
        UserAuthDB.user_id == current_user.userid
    ).first()
    if not auth_record:
        return error_response(code=404, message="未找到认证记录")

    # Keep the first and last four characters, replace the middle with ten '*'.
    id_number = auth_record.id_number
    masked_id_number = f"{id_number[:4]}{'*' * 10}{id_number[-4:]}"

    auth_info = UserAuthInfo.model_validate(auth_record)
    auth_info.id_number = masked_id_number
    return success_response(data=auth_info)


# Look a user up by phone number.
# NOTE(review): this route declares no authentication dependency, so it returns
# user info to any caller - confirm that this is intended.
@router.get("/search_by_phone/{phone}", response_model=ResponseModel)
async def search_user_by_phone(
    phone: str,
    role: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """Find a user by exact phone number, optionally requiring a role."""
    user = db.query(UserDB).filter(UserDB.phone == phone).first()
    if not user:
        return error_response(code=404, message="用户不存在")

    if role:
        # De-duplicate the roles; this is not committed here.
        user.roles = list(set(user.roles))
        if role not in user.roles:
            return error_response(code=404, message="该用户没有运营商权限")

    return success_response(data=UserInfo.model_validate(user))