deliveryman-api/app/api/endpoints/coupon_activity.py
2025-04-07 18:41:08 +08:00

329 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from app.models.coupon_activity import (
CouponActivityDB,
CouponActivityCreate,
CouponActivityUpdate,
CouponActivityInfo
)
from app.models.coupon_receive_record import CouponReceiveRecordDB
from app.models.coupon import CouponDB, UserCouponDB, CouponStatus, CouponInfo
from app.models.database import get_db
from app.api.deps import get_current_user, get_admin_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
from typing import Optional, List
from datetime import datetime, time, timedelta
from app.core.coupon_manager import CouponManager
from app.core.wechat import WeChatClient
import random
import string
from app.core.qcloud import qcloud_manager
router = APIRouter()
@router.post("", response_model=ResponseModel)
async def create_coupon_activity(
activity: CouponActivityCreate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""创建优惠券活动(管理员)"""
# 检查优惠券是否存在
for coupon_id in activity.coupon_config.keys():
coupon = db.query(CouponDB).filter(CouponDB.id == coupon_id).first()
if not coupon:
return error_response(code=404, message=f"优惠券ID {coupon_id} 不存在")
# 检查数量是否大于0
if activity.coupon_config[coupon_id]['count'] <= 0:
return error_response(code=400, message=f"优惠券ID {coupon_id} 的数量必须大于0")
if activity.coupon_config[coupon_id]['days'] <= 0:
return error_response(code=400, message=f"优惠券ID {coupon_id} 的有效天数必须大于0")
db_activity = CouponActivityDB(**activity.model_dump())
db.add(db_activity)
try:
db.commit()
db.refresh(db_activity)
return success_response(data=CouponActivityInfo.model_validate(db_activity))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"创建失败: {str(e)}")
@router.get("/{activity_id}", response_model=ResponseModel)
async def get_coupon_activity(
activity_id: int,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""获取优惠券活动详情"""
activity = db.query(CouponActivityDB).filter(
CouponActivityDB.id == activity_id
).first()
if not activity:
return error_response(code=404, message="活动不存在")
# 获取活动对应的优惠券,并设置数量
coupons = db.query(CouponDB).filter(
CouponDB.id.in_(activity.coupon_config.keys())
).all()
activity_data = CouponActivityInfo.model_validate(activity).model_dump()
coupon_list = []
for coupon in coupons:
coupon_info = CouponInfo.model_validate(coupon).model_dump()
coupon_info.update({'count': activity_data['coupon_config'][coupon.id]['count']})
coupon_info.update({'available_days': activity_data['coupon_config'][coupon.id]['days']})
coupon_list.append(coupon_info)
activity_data.update({'coupons': coupon_list})
# 检查总领取次数是否超过限制
can_receive = True
if activity.total_limit > 0:
user_receive_count = db.query(func.count(CouponReceiveRecordDB.id)).filter(
CouponReceiveRecordDB.activity_id == activity_id,
).scalar()
if user_receive_count >= activity.total_limit:
can_receive = False
# 检查当前是否可以领取
current_time = datetime.now().time()
if current_time < activity.daily_start_time or current_time > activity.daily_end_time:
can_receive = False
activity_data.update({'can_receive': can_receive})
# 检查活动是否结束
is_end = activity.end_time < datetime.now()
activity_data.update({'can_receive': not is_end})
activity_data.update({'is_end': is_end})
return success_response(data=activity_data)
@router.get("", response_model=ResponseModel)
async def get_coupon_activities(
is_active: Optional[bool] = None,
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db)
):
"""获取优惠券活动列表"""
query = db.query(CouponActivityDB)
if is_active is not None:
query = query.filter(CouponActivityDB.is_active == is_active)
total = query.count()
activities = query.order_by(CouponActivityDB.create_time.desc())\
.offset(skip)\
.limit(limit)\
.all()
# 查询每个活动被领取的次数
activities_data = []
for activity in activities:
receive_count = db.query(func.sum(CouponReceiveRecordDB.receive_count)).filter(
CouponReceiveRecordDB.activity_id == activity.id,
).scalar()
activity_data = CouponActivityInfo.model_validate(activity).model_dump()
activity_data.update({'receive_count': receive_count})
activities_data.append(activity_data)
return success_response(data={
"total": total,
"items": activities_data
})
async def check_activity_can_receive(
activity_id: int,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""检查活动是否可以领取"""
# 查询活动
activity = db.query(CouponActivityDB).filter(
CouponActivityDB.id == activity_id
).first()
if not activity:
return False, "活动不存在", None
if not activity.is_active:
return False, "活动已结束", activity
# 检查领取时间
current_time = datetime.now().time()
if current_time < activity.daily_start_time or current_time > activity.daily_end_time:
return False, "不在领取时间范围内", activity
# 检查总领取次数
total_receive_count = db.query(func.sum(CouponReceiveRecordDB.receive_count)).filter(
CouponReceiveRecordDB.activity_id == activity_id
).scalar()
total = total_receive_count if total_receive_count else 0
if activity.total_limit > 0 and total >= activity.total_limit:
return False, f"优惠券活动已领完", activity
# 检查用户领取次数
record = db.query(CouponReceiveRecordDB).filter(
CouponReceiveRecordDB.user_id == current_user.userid,
CouponReceiveRecordDB.activity_id == activity_id
).first()
if record:
if activity.user_limit > 0 and record.receive_count >= activity.user_limit:
return False, "你已经领过了", activity
return True, "可领取", activity
@router.post("/{activity_id}/receive", response_model=ResponseModel)
async def receive_coupons(
activity_id: int,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""领取优惠券"""
can_receive, message, activity = await check_activity_can_receive(activity_id, db, current_user)
if not can_receive:
return error_response(code=400, message=message)
try:
# 发放优惠券
for coupon_id, config in activity.coupon_config.items():
coupon = db.query(CouponDB).filter(CouponDB.id == coupon_id).first()
if coupon:
today = datetime.now().date()
#过期时间15 天
expire_time = datetime.combine(today, datetime.max.time()) + timedelta(days=config['days'])
count=config['count']
for _ in range(count):
user_coupon = UserCouponDB(
user_id=current_user.userid,
coupon_id=coupon_id,
coupon_name=coupon.name,
coupon_amount=coupon.amount,
coupon_type=coupon.coupon_type,
expire_time=expire_time,
)
db.add(user_coupon)
# 检查是否领取过优惠券
receive_record = db.query(CouponReceiveRecordDB).filter(
CouponReceiveRecordDB.user_id == current_user.userid,
CouponReceiveRecordDB.activity_id == activity_id,
CouponReceiveRecordDB.receive_date == today
).first()
if receive_record:
receive_record.receive_count += 1
else:
record = CouponReceiveRecordDB(
user_id=current_user.userid,
activity_id=activity_id,
receive_date=today,
receive_count=1
)
db.add(record)
db.commit()
return success_response(message="领取成功")
except Exception as e:
db.rollback()
return error_response(code=500, message=f"领取失败: {str(e)}")
@router.get("/{activity_id}/check_receive", response_model=ResponseModel)
async def check_receive(
activity_id: int,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""检查是否领取过优惠券"""
can_receive, message, activity = await check_activity_can_receive(activity_id, db, current_user)
return success_response(data={
"can_receive": can_receive,
"message": message
})
@router.get("/{activity_id}/get_url_link", response_model=ResponseModel)
async def get_url_link(
activity_id: int,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user)
):
"""获取URL链接"""
activity = db.query(CouponActivityDB).filter(
CouponActivityDB.id == activity_id
).first()
if not activity:
return error_response(code=404, message="活动不存在")
if activity.qr_code:
return success_response(data=activity.qr_code)
# 获取URL链接
wechat_client = WeChatClient()
image_data = await wechat_client.get_wx_code(path=f"pages/my/promation/activities/index", query=f"id={activity_id}")
random_str = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
key = f"qr_code/{current_user.user_code}_{random_str}.png"
url = await qcloud_manager.upload_file_bytes(image_data, key)
activity.qr_code = url
db.commit()
return success_response(data=url)
@router.put("/{activity_id}", response_model=ResponseModel)
async def update_coupon_activity(
activity_id: int,
activity: CouponActivityUpdate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""更新优惠券活动(管理员)"""
db_activity = db.query(CouponActivityDB).filter(
CouponActivityDB.id == activity_id
).first()
if not db_activity:
return error_response(code=404, message="活动不存在")
# 检查优惠券是否存在
if activity.coupon_config:
for coupon_id, config in activity.coupon_config.items():
coupon = db.query(CouponDB).filter(CouponDB.id == coupon_id).first()
if not coupon:
return error_response(code=404, message=f"优惠券ID {coupon_id} 不存在")
# 检查数量是否大于0
if config['count'] <= 0:
return error_response(code=400, message=f"优惠券ID {coupon_id} 的数量必须大于0")
if config['days'] <= 0:
return error_response(code=400, message=f"优惠券ID {coupon_id} 的有效天数必须大于0")
update_data = activity.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_activity, key, value)
try:
db.commit()
db.refresh(db_activity)
return success_response(data=CouponActivityInfo.model_validate(db_activity))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"更新失败: {str(e)}")