deliveryman-api/app/api/endpoints/user.py
2025-04-02 15:16:17 +08:00

593 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, HTTPException, Depends, Response, Body
from sqlalchemy.orm import Session
from app.models.user import UserLogin ,UserInfo, ResetPasswordRequest,PhoneLoginRequest,VerifyCodeRequest, UserDB, UserUpdate, UserRole, UserPasswordLogin, ReferralUserInfo, generate_user_code, ChangePasswordRequest
from app.models.coupon import CouponDB, UserCouponDB, CouponStatus
from app.api.deps import get_current_user, get_admin_user
from app.models.database import get_db
import random
import string
import redis
from app.core.config import settings
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.sms.v20210111 import sms_client, models
from app.core.security import create_access_token, get_password_hash, verify_password
from app.core.response import success_response, error_response, ResponseModel
from pydantic import BaseModel, Field
from typing import List
from typing import Optional
from datetime import datetime, timedelta
from sqlalchemy import text
from app.models.community import CommunityDB
from app.models.user_auth import UserAuthDB, UserAuthCreate, UserAuthInfo
from app.models.address import AddressType
from app.core.qcloud import qcloud_manager
from app.models.merchant import MerchantDB
from app.models.address import AddressDB, AddressInfo
from app.models.user import UserUpdateRoles, DeliverymanSetDelivering
from app.models.order import ShippingOrderDB, OrderStatus
from app.core.redis_client import redis_client
import logging
from fastapi import Request
from app.core.wecombot import WecomBot
from fastapi import BackgroundTasks
from app.core.wechat import WeChatClient
from app.core.qcloud import qcloud_manager
router = APIRouter()
@router.post("/send-code")
async def send_verify_code(request: VerifyCodeRequest):
"""发送验证码"""
phone = request.phone
# 通过 redis 实现验证码发送频率限制
stored_phone = redis_client.get_client().get(f"verify_code_limit:{phone}")
if phone == stored_phone:
return error_response(message="验证码发送频率过高")
# 发送验证码
code, request_id = await qcloud_manager.send_sms_code(phone)
print(f"验证码发送code: {code}, request_id: {request_id}")
# 存储验证码到 Redis
redis_client.get_client().setex(
f"verify_code:{phone}",
settings.VERIFICATION_CODE_EXPIRE_SECONDS,
code
)
# 设置验证码发送频率限制
redis_client.get_client().setex(
f"verify_code_limit:{phone}",
settings.VERIFICATION_CODE_LIMIT_SECONDS,
phone,
)
return success_response(message="验证码已发送")
@router.get("/qr_code", response_model=ResponseModel)
async def get_qr_code(
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
if current_user.qr_code_url:
return success_response(message="获取用户二维码成功",data={
"url": current_user.qr_code_url
})
wechat_client = WeChatClient()
image_data = await wechat_client.get_wx_code(path=f"pages/help/index/index", query=f"shared_user_code={current_user.user_code}")
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
key = f"qr_code/{current_user.user_code}_{random_str}.png"
url = await qcloud_manager.upload_file_bytes(image_data, key)
current_user.qr_code_url = url
db.commit()
"""获取用户二维码"""
return success_response(message="获取用户二维码成功",data={
"url": current_user.qr_code_url
})
@router.get("/info", response_model=ResponseModel)
async def get_user_info(
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""获取用户信息"""
# 获取用户默认地址
default_address = db.query(AddressDB, CommunityDB.name.label('community_name')).join(
CommunityDB,
AddressDB.community_id == CommunityDB.id
).filter(
AddressDB.address_type == AddressType.PICKUP,
AddressDB.user_id == current_user.userid,
AddressDB.is_default == True
).first()
# 构建返回数据
user_info = UserInfo.model_validate(current_user)
user_data = user_info.model_dump()
# 获取用户归属小区信息
if current_user.community_id:
user_data['community_id'] = current_user.community_id
community = db.query(CommunityDB).filter(CommunityDB.id == current_user.community_id).first()
if community:
user_data['community_name'] = community.name
# 获取认证信息
auth_record = db.query(UserAuthDB).filter(UserAuthDB.user_id == current_user.userid).first()
if auth_record:
user_data['auth_name'] = auth_record.name
else:
user_data['auth_name'] = current_user.nickname
# 处理默认地址
if default_address:
address_data = {
"id": default_address.AddressDB.id,
"community_id": default_address.AddressDB.community_id,
"community_name": default_address.community_name,
"community_building_id": default_address.AddressDB.community_building_id,
"community_building_name": default_address.AddressDB.community_building_name,
"address_detail": default_address.AddressDB.address_detail,
"name": default_address.AddressDB.name,
"phone": default_address.AddressDB.phone,
"gender": default_address.AddressDB.gender,
"is_default": default_address.AddressDB.is_default
}
user_data['default_address'] = AddressInfo(**address_data)
# 查询用户未使用的优惠券数量
coupon_count = db.query(UserCouponDB).filter(
UserCouponDB.user_id == current_user.userid,
UserCouponDB.status == CouponStatus.UNUSED
).count()
user_data['coupon_count'] = coupon_count
return success_response(data=user_data)
@router.post("/logout", response_model=ResponseModel)
async def logout(
response: Response,
request: Request,
current_user: UserDB = Depends(get_current_user)
):
"""退出登录"""
request.session.clear()
return success_response(message="退出登录成功")
@router.put("/update", response_model=ResponseModel)
async def update_user_info(
update_data: UserUpdate,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""更新用户信息"""
# 获取非空的更新字段
update_fields = update_data.model_dump(exclude_unset=True)
if not update_fields:
return error_response(code=400, message="没有提供要更新的字段")
# 更新字段
for field, value in update_fields.items():
setattr(current_user, field, value)
try:
db.commit()
db.refresh(current_user)
return success_response(
message="用户信息更新成功",
data=UserInfo.model_validate(current_user)
)
except Exception as e:
db.rollback()
return error_response(code=500, message=f"更新失败: {str(e)}")
@router.put("/roles", response_model=ResponseModel)
async def update_user_roles(
update_data: UserUpdateRoles,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""更新用户角色(管理员)"""
user = db.query(UserDB).filter(UserDB.userid == update_data.user_id).first()
if not user:
return error_response(code=404, message="用户不存在")
# 确保至少有一个角色
if not update_data.roles:
return error_response(code=400, message="用户必须至少有一个角色")
# 确保普通用户角色始终存在
if UserRole.USER not in update_data.roles:
update_data.roles.append(UserRole.USER)
# 更新角色
user.roles = list(set(update_data.roles)) # 去重
try:
db.commit()
db.refresh(user)
return success_response(
message="用户角色更新成功",
data=UserInfo.model_validate(user)
)
except Exception as e:
db.rollback()
return error_response(code=500, message=f"更新失败: {str(e)}")
@router.post("/password-login", response_model=ResponseModel)
async def password_login(
login_data: UserPasswordLogin,
db: Session = Depends(get_db),
response: Response = None,
request: Request = None
):
"""密码登录"""
print(f"login_data: {login_data}")
user = db.query(UserDB).filter(UserDB.phone == login_data.phone).first()
print(f"user: {user}")
if not user:
return error_response(code=401, message="用户不存在")
if not user.password:
return error_response(code=401, message="请先设置密码")
if not verify_password(login_data.password, user.password):
return error_response(code=401, message="密码错误")
if login_data.role == UserRole.ADMIN and UserRole.ADMIN not in user.roles:
return error_response(code=401, message="你没有登录权限")
if login_data.role == UserRole.PARTNER and UserRole.PARTNER not in user.roles:
return error_response(code=401, message="你没有登录权限")
if login_data.role == UserRole.MERCHANT and UserRole.MERCHANT not in user.roles:
return error_response(code=401, message="你没有登录权限")
if login_data.role == UserRole.DELIVERYMAN and UserRole.DELIVERYMAN not in user.roles:
return error_response(code=401, message="你没有登录权限")
if login_data.role == UserRole.USER and UserRole.USER not in user.roles:
return error_response(code=401, message="你没有登录权限")
if login_data.role == UserRole.MERCHANT and UserRole.MERCHANT in user.roles:
# 检查是否有商家设置了当前用户 id
merchant = db.query(MerchantDB).filter(MerchantDB.user_id == user.userid).first()
if not merchant:
return error_response(code=401, message="商家账户,请先关联商家")
if login_data.role == UserRole.DELIVERYMAN and UserRole.DELIVERYMAN in user.roles and not user.community_id:
return error_response(code=401, message="配送员账户,请先设置归属小区")
# 生成访问令牌
access_token = create_access_token(data={"phone": user.phone})
request.session["access_token"] = access_token
return success_response(
data={
"access_token": access_token,
"token_type": "bearer",
"user": UserInfo.model_validate(user)
}
)
@router.get("/referrals", response_model=ResponseModel)
async def get_referral_users(
skip: int = 0,
limit: int = 10,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""获取我邀请的用户列表"""
referral_users = db.query(UserDB).filter(
UserDB.referral_code == current_user.user_code
).order_by(
UserDB.create_time.desc()
).offset(skip).limit(limit).all()
total = db.query(UserDB).filter(
UserDB.referral_code == current_user.user_code
).count()
user_list = []
# 获取用户是否下单
for user in referral_users:
u = ReferralUserInfo.model_validate(user).model_dump()
order_count = db.query(ShippingOrderDB).filter(
ShippingOrderDB.userid == user.userid,
ShippingOrderDB.status == OrderStatus.COMPLETED
).count()
u['is_place_order'] = order_count > 0
u['phone'] = f"{u['phone'][:3]}****{u['phone'][7:]}"
user_list.append(u)
return success_response(data={
"total": total,
"items": user_list
})
@router.get("/list", response_model=ResponseModel)
async def get_list(
skip: int = 0,
limit: int = 10,
role: Optional[str] = None,
phone: Optional[str] = None, # 手机号精确查询
community_id: Optional[int] = None,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""获取用户列表(管理员)"""
# 使用 join 查询用户和小区信息
query = db.query(
UserDB,
CommunityDB.name.label('community_name')
).outerjoin(
CommunityDB,
UserDB.community_id == CommunityDB.id
)
# 如果指定了角色,添加角色筛选条件
if role:
query = query.filter(text(f"JSON_CONTAINS(users.roles, '\"{role}\"')"))
# 如果指定了手机号,添加精确匹配条件
if phone:
query = query.filter(UserDB.phone == phone)
# 如果指定了小区ID添加精确匹配条件
if community_id:
query = query.filter(UserDB.community_id == community_id)
total = query.count()
results = query.order_by(
UserDB.create_time.desc()
).offset(skip).limit(limit).all()
# 处理手机号脱敏
def mask_phone(phone: str) -> str:
return f"{phone[:3]}****{phone[7:]}"
user_list = []
for user, community_name in results:
user_info = UserInfo.model_validate(user)
user_info.phone = mask_phone(user_info.phone) # 手机号脱敏
user_list.append({
**user_info.model_dump(),
"community_name": community_name
})
return success_response(data={
"total": total,
"items": user_list
})
@router.post("/reset-password", response_model=ResponseModel)
async def reset_password(
request: ResetPasswordRequest,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user) # 仅管理员可操作
):
"""重置用户密码(管理员)"""
# 查找用户
user = db.query(UserDB).filter(UserDB.userid == request.user_id).first()
if not user:
return error_response(code=404, message="用户不存在")
# 重置密码
hashed_password = get_password_hash(request.new_password)
user.password = hashed_password
try:
db.commit()
return success_response(
message="密码重置成功",
data={
"userid": user.userid,
"nickname": user.nickname,
"phone": f"{user.phone[:3]}****{user.phone[7:]}" # 手机号脱敏
}
)
except Exception as e:
db.rollback()
return error_response(code=500, message=f"密码重置失败: {str(e)}")
class UpdateUserCommunityRequest(BaseModel):
user_id: int
community_id: Optional[int] = None # None 表示清除归属小区
@router.put("/community", response_model=ResponseModel)
async def update_user_community(
request: UpdateUserCommunityRequest,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user) # 仅管理员可操作
):
"""更新用户归属小区(管理员)"""
# 查找用户
user = db.query(UserDB).filter(UserDB.userid == request.user_id).first()
if not user:
return error_response(code=404, message="用户不存在")
# 如果指定了小区ID验证小区是否存在
if request.community_id is not None:
community = db.query(CommunityDB).filter(
CommunityDB.id == request.community_id
).first()
if not community:
return error_response(code=404, message="小区不存在")
try:
# 更新用户归属小区
user.community_id = request.community_id
db.commit()
return success_response(
message="更新成功",
data=UserInfo.model_validate(user)
)
except Exception as e:
db.rollback()
return error_response(code=500, message=f"更新失败: {str(e)}")
@router.post("/change-password", response_model=ResponseModel)
async def change_password(
request: ChangePasswordRequest,
db: Session = Depends(get_db)
):
"""通过短信验证码修改密码"""
user = db.query(UserDB).filter(UserDB.phone == request.phone).first()
if not user:
return error_response(message="用户不存在")
# 验证短信验证码
redis_code = redis_client.get_client().get(f"verify_code:{user.phone}")
if not redis_code:
return error_response(message="验证码已过期")
if redis_code != request.verify_code:
return error_response(message="验证码错误")
try:
# 更新密码
user.password = get_password_hash(request.new_password)
db.commit()
# 删除验证码
redis_client.get_client().delete(f"verify_code:{request.phone}")
return success_response(message="密码修改成功")
except Exception as e:
db.rollback()
return error_response(code=500, message=f"修改密码失败: {str(e)}")
@router.put("/deliveryman/set_delivering", response_model=ResponseModel)
async def deliveryman_set_delivering(
background_tasks: BackgroundTasks,
request: DeliverymanSetDelivering,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""配送员设置是否在配送中"""
current_user.is_delivering = request.is_delivering
db.commit()
# 发送企微通知
if not request.is_delivering:
wecom_bot = WecomBot()
background_tasks.add_task(
wecom_bot.send_deliveryman_close_delivering,
db,
current_user,
current_user.community.name
)
return success_response(message="设置成功",data={
"is_delivering": current_user.is_delivering
})
@router.post("/auth", response_model=ResponseModel)
async def create_user_auth(
auth: UserAuthCreate,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""用户实名认证"""
# 检查是否已认证
if current_user.is_auth:
return error_response(code=400, message="用户已完成实名认证")
# 检查是否存在认证记录
exists = db.query(UserAuthDB).filter(
UserAuthDB.user_id == current_user.userid
).first()
if exists:
return error_response(code=400, message="该用户已有认证记录")
try:
#调用实名认证
verify_result = await qcloud_manager.verify_id_card(
id_card=auth.id_number,
name=auth.name
)
# 验证不通过
if verify_result["Result"] != "0": # 0 表示一致,其他值表示不一致
return error_response(
code=400,
message=f"实名认证失败: {verify_result['Description']}"
)
# 创建认证记录
auth_record = UserAuthDB(
user_id=current_user.userid,
name=auth.name,
id_number=auth.id_number
)
db.add(auth_record)
# 更新用户认证状态
current_user.is_auth = True
db.commit()
return success_response(data=UserAuthInfo.model_validate(auth_record))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"认证失败: {str(e)}")
@router.get("/auth", response_model=ResponseModel)
async def get_user_auth(
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""获取用户认证信息"""
auth_record = db.query(UserAuthDB).filter(
UserAuthDB.user_id == current_user.userid
).first()
if not auth_record:
return error_response(code=404, message="未找到认证记录")
# 身份证号脱敏保留前4位和后4位中间用*代替
masked_id_number = f"{auth_record.id_number[:4]}{'*' * 10}{auth_record.id_number[-4:]}"
auth_info = UserAuthInfo.model_validate(auth_record)
auth_info.id_number = masked_id_number
return success_response(data=auth_info)
#通过手机号搜索用户
@router.get("/search_by_phone/{phone}", response_model=ResponseModel)
async def search_user_by_phone(
phone: str,
role: Optional[str] = None,
db: Session = Depends(get_db)
):
"""通过手机号搜索用户"""
user = db.query(UserDB).filter(UserDB.phone == phone).first()
if not user:
return error_response(code=404, message="用户不存在")
if role:
user.roles = list(set(user.roles)) # 去重
if role not in user.roles:
return error_response(code=404, message="该用户没有运营商权限")
return success_response(data=UserInfo.model_validate(user))