deliveryman-api/app/api/endpoints/coupon.py
2025-04-06 20:20:43 +08:00

272 lines
8.4 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List, Optional
from app.models.coupon import (
CouponDB,
UserCouponDB,
CouponCreate,
CouponUpdate,
CouponInfo,
UserCouponCreate,
UserCouponInfo,
CouponStatus,
CouponIssueRecordDB,
CouponIssueRecordCreate,
CouponIssueRecordInfo
)
from app.models.database import get_db
from app.api.deps import get_admin_user, get_current_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
from datetime import datetime, timezone
from app.core.coupon_manager import CouponManager
router = APIRouter()
@router.post("", response_model=ResponseModel)
async def create_coupon(
coupon: CouponCreate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""创建优惠券(管理员)"""
db_coupon = CouponDB(**coupon.model_dump())
db.add(db_coupon)
try:
db.commit()
db.refresh(db_coupon)
return success_response(data=CouponInfo.model_validate(db_coupon))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"创建优惠券失败: {str(e)}")
@router.put("/{coupon_id}/use", response_model=ResponseModel)
async def use_coupon(
coupon_id: int,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
""" 使用优惠券 """
coupon = db.query(UserCouponDB).filter(
UserCouponDB.id == coupon_id,
UserCouponDB.user_id == current_user.userid,
UserCouponDB.status == CouponStatus.UNUSED,
UserCouponDB.expire_time > datetime.now()
).first()
if not coupon:
return error_response(code=404, message="优惠券不存在")
coupon.status = CouponStatus.USED
coupon.used_time = datetime.now()
db.commit()
return success_response(data=UserCouponInfo.model_validate(coupon))
@router.put("/{coupon_id}", response_model=ResponseModel)
async def update_coupon(
coupon_id: int,
coupon: CouponUpdate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""更新优惠券(管理员)"""
db_coupon = db.query(CouponDB).filter(CouponDB.id == coupon_id).first()
if not db_coupon:
return error_response(code=404, message="优惠券不存在")
update_data = coupon.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_coupon, key, value)
try:
db.commit()
db.refresh(db_coupon)
return success_response(data=CouponInfo.model_validate(db_coupon))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"更新优惠券失败: {str(e)}")
@router.post("/issue", response_model=ResponseModel)
async def issue_coupon(
user_coupon: UserCouponCreate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""发放优惠券给用户(管理员)"""
# 查询优惠券信息
coupon = db.query(CouponDB).filter(CouponDB.id == user_coupon.coupon_id).first()
if not coupon:
return error_response(code=404, message="优惠券不存在")
# 查询用户信息
user = db.query(UserDB).filter(UserDB.userid == user_coupon.user_id).first()
if not user:
return error_response(code=404, message="用户不存在")
# 创建发放记录
issue_record = CouponIssueRecordDB(
coupon_id=coupon.id,
user_id=user_coupon.user_id,
operator_id=admin.userid,
count=user_coupon.count,
remark=f"{admin.nickname}: {user_coupon.remark}" if user_coupon.remark else f"{admin.nickname} 手动发放"
)
db.add(issue_record)
# 批量创建用户优惠券
manager = CouponManager(coupon)
user_coupons = []
for _ in range(user_coupon.count):
user_coupon_obj = manager.add_coupon(
user_id=user_coupon.user_id,
coupon_id=coupon.id,
expire_time=user_coupon.expire_time
)
db.add(user_coupon_obj)
user_coupons.append(user_coupon_obj)
try:
db.commit()
db.refresh(issue_record)
return success_response(
message=f"成功发放 {user_coupon.count} 张优惠券",
data={
"issue_record": CouponIssueRecordInfo(
id=issue_record.id,
coupon_id=issue_record.coupon_id,
user_id=issue_record.user_id,
operator_id=issue_record.operator_id,
count=issue_record.count,
issue_time=issue_record.issue_time,
remark=issue_record.remark,
coupon_name=coupon.name,
user_nickname=user.nickname,
operator_nickname=admin.nickname
),
"coupon_count": len(user_coupons)
}
)
except Exception as e:
db.rollback()
return error_response(code=500, message=f"发放优惠券失败: {str(e)}")
@router.get("/user/list", response_model=ResponseModel)
async def get_user_coupons(
skip: int = 0,
limit: int = 10,
status: Optional[CouponStatus] = None,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""获取用户的优惠券列表"""
query = db.query(UserCouponDB).filter(
UserCouponDB.user_id == current_user.userid
)
# 如果指定了状态,添加状态过滤
if status:
query = query.filter(UserCouponDB.status == status)
# 更新过期状态
now = datetime.now(timezone.utc)
db.query(UserCouponDB).filter(
UserCouponDB.user_id == current_user.userid,
UserCouponDB.status == CouponStatus.UNUSED,
UserCouponDB.expire_time < now
).update({"status": CouponStatus.EXPIRED})
db.commit()
# 获取分页数据
coupons = query.order_by(
UserCouponDB.create_time.desc()
).offset(skip).limit(limit).all()
return success_response(
data=[UserCouponInfo.model_validate(c) for c in coupons]
)
@router.get("/list", response_model=ResponseModel)
async def get_all_coupons(
skip: int = 0,
limit: int = 10,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""获取所有优惠券列表(管理员)"""
query = db.query(CouponDB).order_by(
CouponDB.create_time.desc()
)
total = query.count()
items = query.offset(skip).limit(limit).all()
return success_response(
data={
"total": total,
"items": [CouponInfo.model_validate(c) for c in items]
}
)
@router.get("/issue/records", response_model=ResponseModel)
async def get_issue_records(
skip: int = 0,
limit: int = 10,
coupon_id: Optional[int] = None,
user_id: Optional[int] = None,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""获取优惠券发放记录(管理员)"""
query = db.query(CouponIssueRecordDB)
# 添加过滤条件
if coupon_id:
query = query.filter(CouponIssueRecordDB.coupon_id == coupon_id)
if user_id:
query = query.filter(CouponIssueRecordDB.user_id == user_id)
# 计算总数
total = query.count()
# 获取数据
records = query.order_by(
CouponIssueRecordDB.issue_time.desc()
).offset(skip).limit(limit).all()
# 构建响应数据
result = []
for record in records:
# 获取关联信息
coupon = db.query(CouponDB).filter(CouponDB.id == record.coupon_id).first()
user = db.query(UserDB).filter(UserDB.userid == record.user_id).first()
operator = db.query(UserDB).filter(UserDB.userid == record.operator_id).first()
record_info = CouponIssueRecordInfo(
id=record.id,
coupon_id=record.coupon_id,
user_id=record.user_id,
operator_id=record.operator_id,
count=record.count,
issue_time=record.issue_time,
remark=record.remark,
coupon_name=coupon.name if coupon else None,
user_nickname=user.nickname if user else None,
operator_nickname=operator.nickname if operator else None
)
result.append(record_info)
return success_response(
data={
"total": total,
"items": result
}
)