173 lines
5.2 KiB
Python
173 lines
5.2 KiB
Python
from fastapi import APIRouter, Depends
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from app.models.coupon import (
|
|
CouponDB,
|
|
UserCouponDB,
|
|
CouponCreate,
|
|
CouponUpdate,
|
|
CouponInfo,
|
|
UserCouponCreate,
|
|
UserCouponInfo,
|
|
CouponStatus
|
|
)
|
|
from app.models.database import get_db
|
|
from app.api.deps import get_admin_user, get_current_user
|
|
from app.models.user import UserDB
|
|
from app.core.response import success_response, error_response, ResponseModel
|
|
from datetime import datetime, timezone
|
|
from app.core.coupon_manager import CouponManager
|
|
|
|
# Router object that the coupon endpoints defined below register themselves on.
router = APIRouter()
|
|
|
|
@router.post("", response_model=ResponseModel)
async def create_coupon(
    coupon: CouponCreate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """Create a coupon (admin only).

    Builds a ``CouponDB`` row from the dumped request payload, persists it and
    returns the refreshed row as ``CouponInfo`` inside a success response.
    Any exception raised while committing, refreshing or building the
    response rolls the session back and yields a code-500 error response.
    """
    new_row = CouponDB(**coupon.model_dump())
    db.add(new_row)

    try:
        db.commit()
        db.refresh(new_row)
        payload = CouponInfo.model_validate(new_row)
        return success_response(data=payload)
    except Exception as exc:
        db.rollback()
        failure_text = f"创建优惠券失败: {exc!s}"
        return error_response(code=500, message=failure_text)
|
|
|
|
|
|
@router.put("/{coupon_id}/use", response_model=ResponseModel)
async def use_coupon(
    coupon_id: int,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user),
):
    """Mark one of the current user's coupons as used.

    The coupon must belong to the authenticated user, still be ``UNUSED`` and
    have an ``expire_time`` in the future; otherwise a 404 error response is
    returned. On success the coupon is switched to ``USED``, stamped with the
    time of use and returned as ``UserCouponInfo``. A failing commit rolls the
    session back and returns a code-500 error response.
    """
    # Take one timezone-aware UTC timestamp and use it for both the expiry
    # check and ``used_time``. get_user_coupons expires coupons against
    # datetime.now(timezone.utc); using naive local time here let the two
    # endpoints disagree about whether a coupon has expired.
    # NOTE(review): assumes expire_time is stored in UTC — confirm against the model.
    now = datetime.now(timezone.utc)

    coupon = (
        db.query(UserCouponDB)
        .filter(
            UserCouponDB.id == coupon_id,
            UserCouponDB.user_id == current_user.userid,
            UserCouponDB.status == CouponStatus.UNUSED,
            UserCouponDB.expire_time > now,
        )
        .first()
    )

    if not coupon:
        return error_response(code=404, message="优惠券不存在")

    coupon.status = CouponStatus.USED
    coupon.used_time = now

    # Same commit/rollback handling as the other write endpoints in this
    # module, so a failed commit does not leave the session in a dirty state.
    try:
        db.commit()
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"使用优惠券失败: {str(e)}")

    return success_response(data=UserCouponInfo.model_validate(coupon))
|
|
|
|
|
|
@router.put("/{coupon_id}", response_model=ResponseModel)
async def update_coupon(
    coupon_id: int,
    coupon: CouponUpdate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """Update a coupon (admin only).

    Looks the coupon up by ``coupon_id`` and returns a 404 error response when
    no row matches. Only the fields explicitly set in the request payload are
    copied onto the row. The refreshed row is returned as ``CouponInfo``; any
    exception during commit/refresh/response building rolls the session back
    and yields a code-500 error response.
    """
    target = db.query(CouponDB).filter(CouponDB.id == coupon_id).first()
    if not target:
        return error_response(code=404, message="优惠券不存在")

    # exclude_unset keeps fields the client did not send from being overwritten.
    changes = coupon.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    try:
        db.commit()
        db.refresh(target)
        payload = CouponInfo.model_validate(target)
        return success_response(data=payload)
    except Exception as exc:
        db.rollback()
        failure_text = f"更新优惠券失败: {exc!s}"
        return error_response(code=500, message=failure_text)
|
|
|
|
@router.post("/issue", response_model=ResponseModel)
async def issue_coupon(
    user_coupon: UserCouponCreate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """Issue coupons to a user (admin only).

    Looks up the coupon named by ``user_coupon.coupon_id`` and returns a 404
    error response when it does not exist. Otherwise asks ``CouponManager`` to
    add ``user_coupon.count`` coupons for ``user_coupon.user_id`` and commits.
    Any exception while issuing or committing rolls the session back and
    yields a code-500 error response.
    """
    # Look up the coupon definition being issued.
    coupon = db.query(CouponDB).filter(CouponDB.id == user_coupon.coupon_id).first()
    if not coupon:
        return error_response(code=404, message="优惠券不存在")

    try:
        # The previous code called add_coupon(count=N) inside a loop of N
        # iterations, which requests N*N coupons while reporting N. Call it
        # once with the requested quantity instead.
        # NOTE(review): assumes add_coupon honours ``count`` — confirm in CouponManager.
        # The guard keeps the old behaviour of issuing nothing for count <= 0.
        if user_coupon.count > 0:
            manager = CouponManager(coupon)
            manager.add_coupon(
                user_id=user_coupon.user_id,
                coupon_id=coupon.id,
                count=user_coupon.count,
                expire_time=user_coupon.expire_time,
            )

        db.commit()
        return success_response(
            message=f"成功发放 {user_coupon.count} 张优惠券"
        )
    except Exception as e:
        # Issuing now happens inside the try block so that a failure in
        # add_coupon is rolled back as well, not only a failing commit.
        db.rollback()
        return error_response(code=500, message=f"发放优惠券失败: {str(e)}")
|
|
|
|
@router.get("/user/list", response_model=ResponseModel)
async def get_user_coupons(
    skip: int = 0,
    limit: int = 10,
    status: Optional[CouponStatus] = None,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user),
):
    """Return a page of the current user's coupons, newest first.

    Before reading, every ``UNUSED`` coupon of the user whose ``expire_time``
    is earlier than the current UTC time is switched to ``EXPIRED`` and
    committed, so this GET endpoint also writes to the database. If that
    update fails the session is rolled back and a code-500 error response is
    returned.

    Args:
        skip: Number of rows to skip (offset).
        limit: Maximum number of rows to return.
        status: Optional status filter; ``None`` means no filtering.
    """
    query = db.query(UserCouponDB).filter(
        UserCouponDB.user_id == current_user.userid
    )

    # Compare against None explicitly: a plain truthiness test would silently
    # drop the filter for any status member that evaluates as falsy
    # (for example an int-valued member equal to 0).
    if status is not None:
        query = query.filter(UserCouponDB.status == status)

    # Refresh expiry state so the page below reflects up-to-date statuses.
    now = datetime.now(timezone.utc)
    try:
        db.query(UserCouponDB).filter(
            UserCouponDB.user_id == current_user.userid,
            UserCouponDB.status == CouponStatus.UNUSED,
            UserCouponDB.expire_time < now,
        ).update({"status": CouponStatus.EXPIRED})
        db.commit()
    except Exception as e:
        # Same rollback-and-report handling as the write endpoints in this module.
        db.rollback()
        return error_response(code=500, message=f"更新优惠券状态失败: {str(e)}")

    # Fetch the requested page.
    coupons = (
        query.order_by(UserCouponDB.create_time.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )

    return success_response(
        data=[UserCouponInfo.model_validate(c) for c in coupons]
    )
|
|
|
|
@router.get("/list", response_model=ResponseModel)
async def get_all_coupons(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """List all coupons (admin only), newest first.

    ``skip`` and ``limit`` are applied as offset and page size; each row is
    converted to ``CouponInfo`` and the list is returned in a success response.
    """
    newest_first = db.query(CouponDB).order_by(CouponDB.create_time.desc())
    page_rows = newest_first.offset(skip).limit(limit).all()

    infos = []
    for row in page_rows:
        infos.append(CouponInfo.model_validate(row))

    return success_response(data=infos)