deliveryman-api/app/api/endpoints/merchant_pay_order.py
2025-02-06 22:35:19 +09:00

223 lines
7.0 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import Optional, List
from app.models.merchant_pay_order import (
MerchantPayOrderDB,
MerchantPayOrderCreate,
MerchantPayOrderInfo,
MerchantPayOrderStatus)
from app.models.merchant import MerchantDB
from app.models.database import get_db
from app.api.deps import get_current_user, get_admin_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
from datetime import datetime, timezone
from app.core.config import settings
from pydantic import BaseModel
from app.core.utils import CommonUtils
# Router on which the merchant pay-order endpoints in this module are registered.
router = APIRouter()
class CalculatePointsRequest(BaseModel):
    """Request body for the gift-points calculation endpoint."""
    # ID of the merchant whose gift-points rate is applied.
    merchant_id: int
    # Payment amount the gift points are calculated from.
    amount: float
@router.post("/calculate-points", response_model=ResponseModel)
async def calculate_gift_points(
    request: CalculatePointsRequest,
    db: Session = Depends(get_db)
):
    """Preview how many gift points a payment of the given amount would earn.

    Args:
        request: Body carrying the merchant ID and the payment amount.
        db: Database session supplied by ``get_db``.

    Returns:
        ``success_response`` whose data holds ``gift_points`` as a float, or
        ``error_response`` with code 404 when the merchant is not found.
    """
    # Look the merchant up first; without it there is no rate to apply.
    merchant_lookup = db.query(MerchantDB).filter(
        MerchantDB.id == request.merchant_id
    )
    found_merchant = merchant_lookup.first()
    if not found_merchant:
        return error_response(code=404, message="商家不存在")

    # The merchant's rate is a percentage, so turn it into a fraction before
    # scaling by the configured point ratio.
    rate_fraction = float(found_merchant.pay_gift_points_rate) / 100
    earned_points = request.amount * rate_fraction * settings.POINT_RATIO

    payload = {"gift_points": float(earned_points)}
    return success_response(data=payload)
@router.post("", response_model=ResponseModel)
async def create_pay_order(
    order: MerchantPayOrderCreate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Create a pay order for the current user at the given merchant.

    Looks up the merchant, computes the gift points from the merchant's
    ``pay_gift_points_rate`` (treated as a percentage) and
    ``settings.POINT_RATIO``, persists a new ``MerchantPayOrderDB`` row and
    returns it as a ``MerchantPayOrderInfo``.

    Args:
        order: Request body carrying ``merchant_id`` and ``amount``.
        db: Database session supplied by ``get_db``.
        current_user: Authenticated user; its ``userid`` owns the new order.

    Returns:
        ``success_response`` wrapping the created order; ``error_response``
        with code 404 when the merchant does not exist, or with code 500 when
        the order could not be written to the database.
    """
    # Make sure the merchant exists before creating anything.
    merchant = db.query(MerchantDB).filter(
        MerchantDB.id == order.merchant_id
    ).first()
    if not merchant:
        return error_response(code=404, message="商家不存在")

    # Read the name up front so the response needs no extra merchant query
    # once the order has been committed.
    merchant_name = merchant.name

    # Same formula as the /calculate-points endpoint, with explicit float
    # conversion so the stored value matches the preview and the arithmetic
    # cannot fail on mixed numeric types (e.g. a float amount multiplied by a
    # Decimal rate).
    gift_points = (
        float(order.amount)
        * (float(merchant.pay_gift_points_rate) / 100)
        * settings.POINT_RATIO
    )

    db_order = MerchantPayOrderDB(
        order_id=CommonUtils.generate_order_id('P'),
        merchant_id=order.merchant_id,
        user_id=current_user.userid,
        amount=order.amount,
        gift_points=gift_points
    )

    # Only the database write is guarded: a failure while building the
    # response after a successful commit must not be reported as
    # "order creation failed", because the order already exists.
    try:
        db.add(db_order)
        db.commit()
        db.refresh(db_order)
    except Exception as e:
        db.rollback()
        # NOTE(review): str(e) may expose driver/SQL details to the client;
        # the message is kept unchanged for compatibility with existing callers.
        return error_response(code=500, message=f"创建订单失败: {str(e)}")

    # db_order was refreshed above, so it already carries the values
    # generated by the database (id, status, timestamps).
    return success_response(data=MerchantPayOrderInfo(
        id=db_order.id,
        order_id=db_order.order_id,
        merchant_id=db_order.merchant_id,
        user_id=db_order.user_id,
        amount=float(db_order.amount),
        gift_points=float(db_order.gift_points),
        status=db_order.status,
        create_time=db_order.create_time,
        update_time=db_order.update_time,
        pay_time=db_order.pay_time,
        merchant_name=merchant_name
    ))
@router.get("", response_model=ResponseModel)
async def list_pay_orders(
    status: Optional[MerchantPayOrderStatus] = None,
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """List the current user's pay orders, newest first.

    Args:
        status: Optional order status to filter by; ``None`` means no filter.
        skip: Number of orders to skip; negative values are treated as 0.
        limit: Maximum number of orders to return.
        db: Database session supplied by ``get_db``.
        current_user: Authenticated user whose orders are listed.

    Returns:
        ``success_response`` whose data holds ``total`` (number of matching
        orders before pagination) and ``items`` (one ``MerchantPayOrderInfo``
        per order on the requested page, including merchant name and image).
    """
    # Fetch each order together with its merchant in one joined query.
    query = db.query(
        MerchantPayOrderDB,
        MerchantDB
    ).join(
        MerchantDB,
        MerchantPayOrderDB.merchant_id == MerchantDB.id
    ).filter(
        MerchantPayOrderDB.user_id == current_user.userid,
    )

    # Compare against None explicitly so a falsy enum member (for example an
    # integer status of 0) is still applied as a filter.
    if status is not None:
        query = query.filter(MerchantPayOrderDB.status == status)

    # Count before pagination so the client can page through all results.
    total = query.count()

    # A negative OFFSET is rejected by most databases; clamp it to 0.
    rows = query.order_by(
        MerchantPayOrderDB.create_time.desc()
    ).offset(max(skip, 0)).limit(limit).all()

    order_list = [
        MerchantPayOrderInfo(
            id=pay_order.id,
            order_id=pay_order.order_id,
            merchant_id=pay_order.merchant_id,
            user_id=pay_order.user_id,
            amount=float(pay_order.amount),
            gift_points=float(pay_order.gift_points),
            status=pay_order.status,
            create_time=pay_order.create_time,
            update_time=pay_order.update_time,
            pay_time=pay_order.pay_time,
            merchant_name=merchant.name,
            merchant_image=merchant.brand_image_url
        ) for pay_order, merchant in rows
    ]

    return success_response(data={
        "total": total,
        "items": order_list
    })
@router.get("/admin/list", response_model=ResponseModel)
async def admin_list_pay_orders(
    status: Optional[MerchantPayOrderStatus] = None,
    user_id: Optional[int] = None,
    order_id: Optional[str] = None,
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db),
    admin_user: UserDB = Depends(get_admin_user)
):
    """List pay orders across all users, newest first (admin endpoint).

    Args:
        status: Optional order status to filter by; ``None`` means no filter.
        user_id: Optional owner user ID to filter by; ``None`` means no filter.
        order_id: Optional order number to match exactly; empty or ``None``
            means no filter.
        skip: Number of orders to skip; negative values are treated as 0.
        limit: Maximum number of orders to return.
        db: Database session supplied by ``get_db``.
        admin_user: Resolved by ``get_admin_user``; not used in the body, the
            dependency itself gates access to this endpoint.

    Returns:
        ``success_response`` whose data holds ``total`` (number of matching
        orders before pagination) and ``items`` (one ``MerchantPayOrderInfo``
        per order on the requested page, including the merchant name).
    """
    # Select the merchant name through the join that is already needed,
    # instead of issuing a second IN (...) query for the names afterwards.
    query = db.query(MerchantPayOrderDB, MerchantDB.name).join(
        MerchantDB,
        MerchantPayOrderDB.merchant_id == MerchantDB.id
    )

    # Compare against None explicitly so falsy-but-valid values (a status
    # enum member equal to 0, a user ID of 0) are still applied as filters.
    if status is not None:
        query = query.filter(MerchantPayOrderDB.status == status)
    if user_id is not None:
        query = query.filter(MerchantPayOrderDB.user_id == user_id)
    # An empty order_id keeps meaning "no filter", as before.
    if order_id:
        query = query.filter(MerchantPayOrderDB.order_id == order_id)

    # Count before pagination so the client can page through all results.
    total = query.count()

    # A negative OFFSET is rejected by most databases; clamp it to 0.
    rows = query.order_by(
        MerchantPayOrderDB.create_time.desc()
    ).offset(max(skip, 0)).limit(limit).all()

    order_list = [
        MerchantPayOrderInfo(
            id=pay_order.id,
            order_id=pay_order.order_id,
            merchant_id=pay_order.merchant_id,
            user_id=pay_order.user_id,
            amount=float(pay_order.amount),
            gift_points=float(pay_order.gift_points),
            status=pay_order.status,
            create_time=pay_order.create_time,
            update_time=pay_order.update_time,
            pay_time=pay_order.pay_time,
            merchant_name=merchant_name
        ) for pay_order, merchant_name in rows
    ]

    return success_response(data={
        "total": total,
        "items": order_list
    })