deliveryman-api/app/api/endpoints/user.py

597 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, HTTPException, Depends, Response, Body
from sqlalchemy.orm import Session
from app.models.user import UserLogin ,UserInfo, ResetPasswordRequest,PhoneLoginRequest,VerifyCodeRequest, UserDB, UserUpdate, UserRole, UserPasswordLogin, ReferralUserInfo, generate_user_code, ChangePasswordRequest
from app.models.coupon import CouponDB, UserCouponDB, CouponStatus
from app.api.deps import get_current_user, get_admin_user
from app.models.database import get_db
import random
import string
import redis
from app.core.config import settings
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.sms.v20210111 import sms_client, models
from app.core.security import create_access_token, set_jwt_cookie, clear_jwt_cookie, get_password_hash, verify_password
from app.core.response import success_response, error_response, ResponseModel
from pydantic import BaseModel, Field
from typing import List
from typing import Optional
from datetime import datetime, timedelta
from sqlalchemy import text
from app.models.community import CommunityDB
from app.models.user_auth import UserAuthDB, UserAuthCreate, UserAuthInfo
from app.core.qcloud import qcloud_manager
from app.models.merchant import MerchantDB
from app.models.address import AddressDB, AddressInfo
from app.models.user import UserUpdateRoles, UserUpdateDeliveryCommissionRate
from app.models.order import ShippingOrderDB, OrderStatus
from app.core.redis_client import redis_client
# Router on which the endpoints declared below in this module are registered.
router = APIRouter()
@router.post("/send-code")
async def send_verify_code(request: VerifyCodeRequest):
    """Send an SMS verification code to the requested phone number.

    The code returned by the SMS helper is cached in Redis under
    ``verify_code:<phone>`` with a TTL of
    ``settings.VERIFICATION_CODE_EXPIRE_SECONDS``.

    Returns:
        A success response once the code is sent and cached, or an error
        response containing the exception text if any step raises.
    """
    target_phone = request.phone
    cache_key = f"verify_code:{target_phone}"
    try:
        # The helper also returns a request id; it is not used here.
        sms_code, _request_id = await qcloud_manager.send_sms_code(target_phone)
        raw_client = redis_client.get_client()
        raw_client.setex(
            cache_key,
            settings.VERIFICATION_CODE_EXPIRE_SECONDS,
            sms_code,
        )
        return success_response(message="验证码已发送")
    except Exception as e:
        return error_response(message=f"发送验证码失败: {str(e)}")
@router.post("/login")
async def login(
    user_login: UserLogin,
    db: Session = Depends(get_db),
    response: Response = None
):
    """Log in (registering on first use) with a phone number and SMS code.

    The submitted code is compared with the value cached under
    ``verify_code:<phone>``; on a match the cached code is deleted. If no
    user exists for the phone, a new one is created. An access token is then
    issued and, when a response object is available, also set as a JWT cookie.

    Returns:
        A success response with the user info, access token and token type,
        or an error response when the code is missing, expired or wrong.
    """
    phone = user_login.phone
    verify_code = user_login.verify_code

    # Check the verification code.
    stored_code = redis_client.get_client().get(f"verify_code:{phone}")
    # A Redis client created without decode_responses=True returns bytes;
    # normalise to str so the comparison below cannot fail on type alone.
    if isinstance(stored_code, bytes):
        stored_code = stored_code.decode("utf-8")
    if not stored_code or stored_code != verify_code:
        return error_response(message="验证码错误或已过期")
    redis_client.delete(f"verify_code:{phone}")

    # Find the user, or create one on first login.
    user = db.query(UserDB).filter(UserDB.phone == phone).first()
    if not user:
        user_code = generate_user_code(db)
        # NOTE(review): every auto-created account gets the same fixed
        # initial password, which the password-login endpoint will accept.
        # Consider a random/unusable initial password instead.
        user = UserDB(
            nickname=f"蜜友{phone[-4:]}",
            phone=phone,
            user_code=user_code,
            referral_code=user_login.referral_code,
            password=get_password_hash("123456"),
            roles=[UserRole.USER]
        )
        db.add(user)
        db.flush()  # flush before commit, as in the original flow
        db.commit()
        db.refresh(user)

    # Issue the access token.
    access_token = create_access_token(
        data={"phone": user.phone,"userid":user.userid}
    )
    # Set the JWT cookie when a response object was injected.
    if response:
        set_jwt_cookie(response, access_token)
    return success_response(
        message="登录成功",
        data={
            "user": UserInfo.model_validate(user),
            "access_token": access_token,
            "token_type": "bearer"
        }
    )
@router.get("/info", response_model=ResponseModel)
async def get_user_info(
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Return the current user's profile with related summary data.

    The payload is the serialized ``UserInfo`` plus, when available:
    the user's community id/name, the default address (joined with its
    community name), and the number of unused coupons.
    """
    me = current_user

    # Default address together with the name of its community.
    address_row = (
        db.query(AddressDB, CommunityDB.name.label('community_name'))
        .join(CommunityDB, AddressDB.community_id == CommunityDB.id)
        .filter(
            AddressDB.user_id == me.userid,
            AddressDB.is_default == True,  # noqa: E712 - SQL expression, not a Python bool test
        )
        .first()
    )

    payload = UserInfo.model_validate(me).model_dump()

    # Community the user belongs to.
    if me.community_id:
        payload['community_id'] = me.community_id
        home_community = (
            db.query(CommunityDB)
            .filter(CommunityDB.id == me.community_id)
            .first()
        )
        if home_community:
            payload['community_name'] = home_community.name

    # Default address, if one was found.
    if address_row:
        addr = address_row.AddressDB
        payload['default_address'] = AddressInfo(
            id=addr.id,
            community_id=addr.community_id,
            community_name=address_row.community_name,
            community_building_id=addr.community_building_id,
            community_building_name=addr.community_building_name,
            address_detail=addr.address_detail,
            name=addr.name,
            phone=addr.phone,
            gender=addr.gender,
            is_default=addr.is_default,
        )

    # Number of coupons the user has not used yet.
    unused_coupons = db.query(UserCouponDB).filter(
        UserCouponDB.user_id == me.userid,
        UserCouponDB.status == CouponStatus.UNUSED
    )
    payload['coupon_count'] = unused_coupons.count()

    return success_response(data=payload)
@router.post("/phone-login", response_model=ResponseModel)
async def phone_login(
    request: PhoneLoginRequest,
    db: Session = Depends(get_db),
    response: Response = None
):
    """Log in with a phone number only (test environment).

    No verification code is checked, so the endpoint is rejected unless
    ``settings.DEBUG`` is truthy. If no user exists for the phone, one is
    created. An access token is issued and, when a response object is
    available, also set as a JWT cookie.

    Returns:
        A success response with the user info, access token and token type,
        or a 400 error response outside the test environment.
    """
    if not settings.DEBUG:
        # The previous message claimed the *test* environment lacks support,
        # which is the opposite of what this guard enforces.
        return error_response(code=400, message="仅测试环境支持手机号登录")

    # Find the user, or create one on first login.
    user = db.query(UserDB).filter(UserDB.phone == request.phone).first()
    if not user:
        user_code = generate_user_code(db)
        # NOTE(review): every auto-created account gets the same fixed
        # initial password, which the password-login endpoint will accept.
        user = UserDB(
            nickname=f"蜜友{request.phone[-4:]}",
            phone=request.phone,
            user_code=user_code,
            referral_code=request.referral_code,
            password=get_password_hash("123456"),
            roles=[UserRole.USER]
        )
        db.add(user)
        db.flush()
        db.commit()
        db.refresh(user)

    # Issue the access token.
    access_token = create_access_token(
        data={"phone": user.phone,"userid":user.userid}
    )
    # Set the JWT cookie when a response object was injected.
    if response:
        set_jwt_cookie(response, access_token)
    return success_response(
        message="登录成功",
        data={
            "user": UserInfo.model_validate(user),
            "access_token": access_token,
            "token_type": "bearer"
        }
    )
@router.post("/logout", response_model=ResponseModel)
async def logout(
    response: Response,
    current_user: UserDB = Depends(get_current_user)
):
    """Log out by clearing the JWT cookie on the outgoing response.

    ``current_user`` is not read in the body; it is declared only as a
    dependency on ``get_current_user``.
    """
    clear_jwt_cookie(response)
    result = success_response(message="退出登录成功")
    return result
@router.put("/update", response_model=ResponseModel)
async def update_user_info(
    update_data: UserUpdate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Update the current user's profile with the fields sent by the client.

    Only fields explicitly set on the request model are applied. The change
    is committed and the refreshed user is returned; on any exception the
    transaction is rolled back and a 500 error response is returned.
    """
    changes = update_data.model_dump(exclude_unset=True)
    if not changes:
        return error_response(code=400, message="没有提供要更新的字段")

    # Copy each provided value onto the ORM object.
    for attr_name in changes:
        setattr(current_user, attr_name, changes[attr_name])

    try:
        db.commit()
        db.refresh(current_user)
        refreshed = UserInfo.model_validate(current_user)
        return success_response(
            message="用户信息更新成功",
            data=refreshed
        )
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"更新失败: {str(e)}")
@router.put("/delivery-commission-rate", response_model=ResponseModel)
async def update_delivery_commission_rate(
    update_data: UserUpdateDeliveryCommissionRate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_admin_user)
):
    """Update a user's delivery commission rate (admin only).

    Looks up the target user by ``update_data.user_id`` and stores the new
    rate. Commit failures are rolled back and reported as a 500 error
    response, matching the other mutating endpoints in this module.

    Returns:
        A success response, a 404 error response if the user does not exist,
        or a 500 error response if the commit fails.
    """
    user = db.query(UserDB).filter(UserDB.userid == update_data.user_id).first()
    if not user:
        return error_response(code=404, message="用户不存在")

    user.delivery_commission_rate = update_data.delivery_commission_rate
    try:
        db.commit()
        db.refresh(user)
        return success_response(message="配送佣金比例更新成功")
    except Exception as e:
        # Leave the session clean instead of propagating a raw exception.
        db.rollback()
        return error_response(code=500, message=f"更新失败: {str(e)}")
@router.put("/roles", response_model=ResponseModel)
async def update_user_roles(
    update_data: UserUpdateRoles,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Replace a user's roles (admin only).

    The submitted list must be non-empty. ``UserRole.USER`` is always kept,
    and duplicates are removed while preserving the submitted order so the
    stored value is deterministic.

    Returns:
        A success response with the updated user, a 404/400 error response
        for an unknown user or empty role list, or a 500 error response if
        the commit fails.
    """
    user = db.query(UserDB).filter(UserDB.userid == update_data.user_id).first()
    if not user:
        return error_response(code=404, message="用户不存在")

    # At least one role is required.
    if not update_data.roles:
        return error_response(code=400, message="用户必须至少有一个角色")

    # Work on a copy so the request model is not mutated.
    roles = list(update_data.roles)
    # The plain-user role must always be present.
    if UserRole.USER not in roles:
        roles.append(UserRole.USER)

    # dict.fromkeys de-duplicates while keeping first-seen order; a set would
    # yield an arbitrary order that can differ between processes.
    user.roles = list(dict.fromkeys(roles))

    try:
        db.commit()
        db.refresh(user)
        return success_response(
            message="用户角色更新成功",
            data=UserInfo.model_validate(user)
        )
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"更新失败: {str(e)}")
@router.post("/password-login", response_model=ResponseModel)
async def password_login(
    login_data: UserPasswordLogin,
    db: Session = Depends(get_db)
):
    """Log in with phone number and password.

    After the password is verified, role-specific preconditions are checked
    against ``login_data.role``:

    * ADMIN: the user must hold the admin role.
    * MERCHANT (when the user holds that role): a merchant record must be
      linked to the user.
    * DELIVERYMAN (when the user holds that role): the user must have a
      community assigned.

    Returns:
        A success response with the access token, token type and user info,
        or a 401 error response describing the failed check.
    """
    account = db.query(UserDB).filter(UserDB.phone == login_data.phone).first()
    if not account:
        return error_response(code=401, message="用户不存在")
    if not account.password:
        return error_response(code=401, message="请先设置密码")
    if not verify_password(login_data.password, account.password):
        return error_response(code=401, message="密码错误")

    requested_role = login_data.role

    if requested_role == UserRole.ADMIN and UserRole.ADMIN not in account.roles:
        return error_response(code=401, message="管理员账户,请先设置管理员角色")

    # NOTE(review): unlike the admin check, the two checks below are skipped
    # when the user does not hold the requested role - confirm this is intended.
    if requested_role == UserRole.MERCHANT and UserRole.MERCHANT in account.roles:
        # A merchant record must reference this user's id.
        linked_merchant = (
            db.query(MerchantDB)
            .filter(MerchantDB.user_id == account.userid)
            .first()
        )
        if not linked_merchant:
            return error_response(code=401, message="商家账户,请先关联商家")

    if (
        requested_role == UserRole.DELIVERYMAN
        and UserRole.DELIVERYMAN in account.roles
        and not account.community_id
    ):
        return error_response(code=401, message="配送员账户,请先设置归属小区")

    # Issue the access token.
    token = create_access_token(data={"phone": account.phone,"userid":account.userid})
    return success_response(
        data={
            "access_token": token,
            "token_type": "bearer",
            "user": UserInfo.model_validate(account)
        }
    )
@router.get("/referrals", response_model=ResponseModel)
async def get_referral_users(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """List the users invited by the current user, newest first.

    A user counts as invited when their ``referral_code`` equals the current
    user's ``user_code``. Each item carries ``is_place_order`` (whether the
    invited user has at least one completed shipping order) and a masked
    phone number.

    Args:
        skip: Number of rows to skip (pagination offset).
        limit: Maximum number of rows to return.

    Returns:
        A success response with ``total`` (all invited users) and ``items``
        (the requested page).
    """
    invited_query = db.query(UserDB).filter(
        UserDB.referral_code == current_user.user_code
    )
    referral_users = invited_query.order_by(
        UserDB.create_time.desc()
    ).offset(skip).limit(limit).all()
    total = invited_query.count()

    # Resolve "has a completed order" for the whole page in one query
    # instead of issuing one COUNT per invited user.
    page_user_ids = [user.userid for user in referral_users]
    users_with_orders = set()
    if page_user_ids:
        rows = db.query(ShippingOrderDB.userid).filter(
            ShippingOrderDB.userid.in_(page_user_ids),
            ShippingOrderDB.status == OrderStatus.COMPLETED
        ).distinct().all()
        users_with_orders = {row[0] for row in rows}

    user_list = []
    for user in referral_users:
        u = ReferralUserInfo.model_validate(user).model_dump()
        u['is_place_order'] = user.userid in users_with_orders
        # Mask the middle of the phone number; skip masking if it is empty.
        if u['phone']:
            u['phone'] = f"{u['phone'][:3]}****{u['phone'][7:]}"
        user_list.append(u)

    return success_response(data={
        "total": total,
        "items": user_list
    })
@router.get("/list", response_model=ResponseModel)
async def get_list(
    skip: int = 0,
    limit: int = 10,
    role: Optional[str] = None,
    phone: Optional[str] = None,  # exact-match phone filter
    community_id: Optional[int] = None,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """List users with their community name (admin only).

    Args:
        skip: Number of rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        role: When given, keep only users whose JSON ``roles`` contains it.
        phone: When given, keep only the user with exactly this phone.
        community_id: When given (and non-zero), keep only users of that
            community.

    Returns:
        A success response with ``total`` (rows matching the filters) and
        ``items`` (the requested page, phone numbers masked).
    """
    import json  # local import: only used to build the JSON_CONTAINS candidate

    # Outer join so users without a community are still listed.
    query = db.query(
        UserDB,
        CommunityDB.name.label('community_name')
    ).outerjoin(
        CommunityDB,
        UserDB.community_id == CommunityDB.id
    )

    if role:
        # ``role`` comes straight from the query string, so it is passed as a
        # bound parameter rather than interpolated into the SQL text.
        query = query.filter(
            text("JSON_CONTAINS(users.roles, :role_json)").bindparams(
                role_json=json.dumps(role)
            )
        )
    if phone:
        query = query.filter(UserDB.phone == phone)
    if community_id:
        query = query.filter(UserDB.community_id == community_id)

    total = query.count()
    results = query.order_by(
        UserDB.create_time.desc()
    ).offset(skip).limit(limit).all()

    def mask_phone(phone: str) -> str:
        """Keep the first 3 characters and everything from index 7 on."""
        return f"{phone[:3]}****{phone[7:]}"

    user_list = []
    for user, community_name in results:
        user_info = UserInfo.model_validate(user)
        user_info.phone = mask_phone(user_info.phone)
        user_list.append({
            **user_info.model_dump(),
            "community_name": community_name
        })

    return success_response(data={
        "total": total,
        "items": user_list
    })
@router.post("/reset-password", response_model=ResponseModel)
async def reset_password(
    request: ResetPasswordRequest,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)  # admin-only operation
):
    """Reset a user's password to the supplied value (admin only).

    Returns:
        A success response with the user's id, nickname and masked phone,
        a 404 error response if the user does not exist, or a 500 error
        response (after rollback) if anything in the commit path raises.
    """
    target = db.query(UserDB).filter(UserDB.userid == request.user_id).first()
    if not target:
        return error_response(code=404, message="用户不存在")

    # Store only the hash of the new password.
    target.password = get_password_hash(request.new_password)

    try:
        db.commit()
        summary = {
            "userid": target.userid,
            "nickname": target.nickname,
            # Mask the middle of the phone number.
            "phone": f"{target.phone[:3]}****{target.phone[7:]}"
        }
        return success_response(
            message="密码重置成功",
            data=summary
        )
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"密码重置失败: {str(e)}")
# Request body accepted by the PUT /community endpoint below.
class UpdateUserCommunityRequest(BaseModel):
    # ID of the user whose community is being changed.
    user_id: int
    # Target community ID; None means clear the user's community.
    community_id: Optional[int] = None
@router.put("/community", response_model=ResponseModel)
async def update_user_community(
    request: UpdateUserCommunityRequest,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)  # admin-only operation
):
    """Assign a user to a community, or clear the assignment (admin only).

    When ``request.community_id`` is not None the community must exist;
    None clears the user's community.

    Returns:
        A success response with the updated user, a 404 error response if
        the user or community is missing, or a 500 error response (after
        rollback) if the update fails.
    """
    target = db.query(UserDB).filter(UserDB.userid == request.user_id).first()
    if not target:
        return error_response(code=404, message="用户不存在")

    new_community_id = request.community_id
    if new_community_id is not None:
        # Make sure the referenced community exists.
        community_lookup = db.query(CommunityDB).filter(
            CommunityDB.id == new_community_id
        )
        if not community_lookup.first():
            return error_response(code=404, message="小区不存在")

    try:
        target.community_id = new_community_id
        db.commit()
        updated = UserInfo.model_validate(target)
        return success_response(
            message="更新成功",
            data=updated
        )
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"更新失败: {str(e)}")
@router.post("/change-password", response_model=ResponseModel)
async def change_password(
    request: ChangePasswordRequest,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)  # the logged-in user
):
    """Change the current user's password, authorised by an SMS code.

    The submitted code must match the value cached under
    ``verify_code:<phone>`` for the current user's phone. On success the new
    password hash is committed and the cached code is deleted.

    Returns:
        A success response, an error response if the code is expired or
        wrong, or a 500 error response (after rollback) if the update fails.
    """
    code_key = f"verify_code:{current_user.phone}"

    # Read through the same raw client that send_verify_code writes with, so
    # the value is fetched exactly as it was stored.
    redis_code = redis_client.get_client().get(code_key)
    # A Redis client created without decode_responses=True returns bytes;
    # normalise to str so the comparison below cannot fail on type alone.
    if isinstance(redis_code, bytes):
        redis_code = redis_code.decode("utf-8")

    if not redis_code:
        return error_response(message="验证码已过期")
    if redis_code != request.verify_code:
        return error_response(message="验证码错误")

    try:
        current_user.password = get_password_hash(request.new_password)
        db.commit()
        # Delete the cached code once the password has been changed.
        redis_client.delete(code_key)
        return success_response(message="密码修改成功")
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"修改密码失败: {str(e)}")
@router.post("/auth", response_model=ResponseModel)
async def create_user_auth(
    auth: UserAuthCreate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Perform real-name verification for the current user.

    Rejects users already flagged as verified or already having a
    verification record. Otherwise the name / ID number pair is checked via
    ``qcloud_manager.verify_id_card``; on a ``"0"`` result a record is stored
    and the user is flagged as verified.

    Returns:
        A success response with the new record, a 400 error response for an
        already-verified user or a failed check, or a 500 error response
        (after rollback) if anything raises.
    """
    if current_user.is_auth:
        return error_response(code=400, message="用户已完成实名认证")

    # Refuse to create a second verification record for the same user.
    previous_record = db.query(UserAuthDB).filter(
        UserAuthDB.user_id == current_user.userid
    ).first()
    if previous_record:
        return error_response(code=400, message="该用户已有认证记录")

    try:
        outcome = await qcloud_manager.verify_id_card(
            id_card=auth.id_number,
            name=auth.name
        )
        # "0" means the name and ID number match; anything else is a failure.
        if outcome["Result"] != "0":
            failure_message = f"实名认证失败: {outcome['Description']}"
            return error_response(code=400, message=failure_message)

        new_record = UserAuthDB(
            user_id=current_user.userid,
            name=auth.name,
            id_number=auth.id_number
        )
        db.add(new_record)
        # Flag the user as verified in the same transaction.
        current_user.is_auth = True
        db.commit()
        return success_response(data=UserAuthInfo.model_validate(new_record))
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"认证失败: {str(e)}")
@router.get("/auth", response_model=ResponseModel)
async def get_user_auth(
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Return the current user's real-name verification record.

    The ID number is masked before being returned: the first and last four
    characters are kept and everything in between is replaced by ``*``.
    Values of eight characters or fewer are masked entirely so the kept
    prefix and suffix can never overlap and reveal the whole number.

    Returns:
        A success response with the masked record, or a 404 error response
        if the user has no verification record.
    """
    auth_record = db.query(UserAuthDB).filter(
        UserAuthDB.user_id == current_user.userid
    ).first()
    if not auth_record:
        return error_response(code=404, message="未找到认证记录")

    id_number = auth_record.id_number
    visible = 4  # characters kept at each end
    if len(id_number) > 2 * visible:
        # Size the mask from the real length (10 stars for an 18-char ID).
        hidden = "*" * (len(id_number) - 2 * visible)
        masked_id_number = f"{id_number[:visible]}{hidden}{id_number[-visible:]}"
    else:
        masked_id_number = "*" * len(id_number)

    auth_info = UserAuthInfo.model_validate(auth_record)
    auth_info.id_number = masked_id_number
    return success_response(data=auth_info)