deliveryman-api/app/api/endpoints/coupon.py
2025-02-12 21:03:47 +08:00

149 lines
4.6 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List, Optional
from app.models.coupon import (
CouponDB,
UserCouponDB,
CouponCreate,
CouponUpdate,
CouponInfo,
UserCouponCreate,
UserCouponInfo,
CouponStatus
)
from app.models.database import get_db
from app.api.deps import get_admin_user, get_current_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
from datetime import datetime, timezone
# Router instance that the coupon endpoints in this module register themselves on.
router = APIRouter()
@router.post("", response_model=ResponseModel)
async def create_coupon(
    coupon: CouponCreate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """Create a coupon (administrator endpoint).

    Args:
        coupon: Request payload; its dumped fields are passed as keyword
            arguments to ``CouponDB``.
        db: Database session supplied by ``get_db``.
        admin: Result of the ``get_admin_user`` dependency; not read in the
            body, it is declared so the dependency runs for this route.

    Returns:
        A success response wrapping the stored coupon as ``CouponInfo``, or
        an error response with code 500 if committing (or serializing the
        result) raises.
    """
    fields = coupon.model_dump()
    new_coupon = CouponDB(**fields)
    db.add(new_coupon)

    try:
        db.commit()
        # Reload the row so database-populated columns are present.
        db.refresh(new_coupon)
        coupon_info = CouponInfo.model_validate(new_coupon)
        return success_response(data=coupon_info)
    except Exception as e:
        # Undo the pending insert before reporting the failure.
        db.rollback()
        return error_response(code=500, message=f"创建优惠券失败: {str(e)}")
@router.put("/{coupon_id}", response_model=ResponseModel)
async def update_coupon(
    coupon_id: int,
    coupon: CouponUpdate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """Update an existing coupon (administrator endpoint).

    Only the fields explicitly present in the request payload are written
    (``exclude_unset=True``); everything else on the row is left as is.

    Args:
        coupon_id: Primary key of the coupon to modify.
        coupon: Partial update payload.
        db: Database session supplied by ``get_db``.
        admin: Result of the ``get_admin_user`` dependency; not read in the
            body, it is declared so the dependency runs for this route.

    Returns:
        A success response wrapping the updated coupon as ``CouponInfo``;
        an error response with code 404 when no coupon has that id; or an
        error response with code 500 if committing raises.
    """
    target = db.query(CouponDB).filter(CouponDB.id == coupon_id).first()
    if not target:
        return error_response(code=404, message="优惠券不存在")

    # Copy each explicitly supplied field onto the ORM row.
    changes = coupon.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    try:
        db.commit()
        db.refresh(target)
        coupon_info = CouponInfo.model_validate(target)
        return success_response(data=coupon_info)
    except Exception as e:
        # Discard the pending changes before reporting the failure.
        db.rollback()
        return error_response(code=500, message=f"更新优惠券失败: {str(e)}")
@router.post("/issue", response_model=ResponseModel)
async def issue_coupon(
    user_coupon: UserCouponCreate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Issue one or more copies of a coupon to a user (administrator endpoint).

    The coupon's name and amount are copied onto every issued
    ``UserCouponDB`` row, so each row carries its own snapshot of those
    values.

    Args:
        user_coupon: Request payload providing ``coupon_id``, ``user_id``,
            ``count`` and ``expire_time``.
        db: Database session supplied by ``get_db``.
        admin: Result of the ``get_admin_user`` dependency; not read in the
            body, it is declared so the dependency runs for this route.

    Returns:
        A success response listing the issued coupons as ``UserCouponInfo``;
        an error response with code 404 when the coupon does not exist;
        code 400 when ``count`` is smaller than 1; or code 500 if committing
        raises.
    """
    # Look up the coupon template being issued.
    coupon = db.query(CouponDB).filter(CouponDB.id == user_coupon.coupon_id).first()
    if not coupon:
        return error_response(code=404, message="优惠券不存在")

    # A non-positive count would make range() yield nothing, and the request
    # would be reported as a success even though no coupon was issued.
    if user_coupon.count < 1:
        return error_response(code=400, message="发放数量必须大于 0")

    # Build all user-coupon rows up front and stage them in a single call.
    issued_coupons = [
        UserCouponDB(
            user_id=user_coupon.user_id,
            coupon_id=coupon.id,
            coupon_name=coupon.name,
            coupon_amount=coupon.amount,
            expire_time=user_coupon.expire_time,
        )
        for _ in range(user_coupon.count)
    ]
    db.add_all(issued_coupons)

    try:
        db.commit()
        return success_response(
            message=f"成功发放 {user_coupon.count} 张优惠券",
            data=[UserCouponInfo.model_validate(c) for c in issued_coupons]
        )
    except Exception as e:
        # Drop every staged row so a partial batch is never persisted.
        db.rollback()
        return error_response(code=500, message=f"发放优惠券失败: {str(e)}")
@router.get("/user/list", response_model=ResponseModel)
async def get_user_coupons(
    skip: int = 0,
    limit: int = 10,
    status: Optional[CouponStatus] = None,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Return a page of the current user's coupons, newest first.

    Before reading, every ``UNUSED`` coupon of the user whose
    ``expire_time`` is earlier than the current time is marked ``EXPIRED``,
    so the returned statuses are up to date.

    Args:
        skip: Number of rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        status: Optional status filter; ``None`` returns every status.
        db: Database session supplied by ``get_db``.
        current_user: Authenticated user; ``userid`` selects the coupons.

    Returns:
        A success response wrapping a list of ``UserCouponInfo``, or an
        error response with code 500 if the expiry update cannot be
        committed.
    """
    # Refresh expiry state first.
    # NOTE(review): compares against a timezone-aware UTC "now"; assumes
    # expire_time values are comparable with that - confirm the column type.
    now = datetime.now(timezone.utc)
    try:
        db.query(UserCouponDB).filter(
            UserCouponDB.user_id == current_user.userid,
            UserCouponDB.status == CouponStatus.UNUSED,
            UserCouponDB.expire_time < now
        ).update({"status": CouponStatus.EXPIRED})
        db.commit()
    except Exception as e:
        # Match the other write paths in this module: roll back and report
        # through the response envelope instead of leaving the session in a
        # failed transaction.
        db.rollback()
        return error_response(code=500, message=f"更新优惠券状态失败: {str(e)}")

    query = db.query(UserCouponDB).filter(
        UserCouponDB.user_id == current_user.userid
    )
    # Compare against None explicitly so a falsy enum member (for example an
    # int-backed member with value 0) is still applied as a filter.
    if status is not None:
        query = query.filter(UserCouponDB.status == status)

    # Fetch the requested page, newest coupons first.
    coupons = query.order_by(
        UserCouponDB.create_time.desc()
    ).offset(skip).limit(limit).all()
    return success_response(
        data=[UserCouponInfo.model_validate(c) for c in coupons]
    )
@router.get("/list", response_model=ResponseModel)
async def get_all_coupons(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """Return a page of all coupons, newest first (administrator endpoint).

    Args:
        skip: Number of rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        db: Database session supplied by ``get_db``.
        admin: Result of the ``get_admin_user`` dependency; not read in the
            body, it is declared so the dependency runs for this route.

    Returns:
        A success response wrapping a list of ``CouponInfo``.
    """
    newest_first = db.query(CouponDB).order_by(CouponDB.create_time.desc())
    page = newest_first.offset(skip).limit(limit).all()
    serialized = list(map(CouponInfo.model_validate, page))
    return success_response(data=serialized)