deliveryman-api/app/api/endpoints/merchant_product.py
2025-01-17 00:30:46 +08:00

147 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List, Optional
from app.models.merchant_product import (
MerchantProductDB,
MerchantProductCreate,
MerchantProductUpdate,
MerchantProductInfo
)
from app.models.database import get_db
from app.api.deps import get_admin_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
from app.models.merchant import MerchantDB
router = APIRouter()
@router.post("", response_model=ResponseModel)
async def create_merchant_product(
product: MerchantProductCreate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""创建商家产品(管理员)"""
# 检查商家是否存在
merchant = db.query(MerchantDB).filter(
MerchantDB.id == product.merchant_id
).first()
if not merchant:
return error_response(code=404, message="商家不存在")
db_product = MerchantProductDB(**product.model_dump())
db.add(db_product)
try:
db.commit()
db.refresh(db_product)
return success_response(data=MerchantProductInfo.model_validate(db_product))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"创建失败: {str(e)}")
@router.put("/{product_id}", response_model=ResponseModel)
async def update_product(
product_id: int,
product: MerchantProductUpdate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""更新商家产品(管理员)"""
db_product = db.query(MerchantProductDB).filter(
MerchantProductDB.id == product_id
).first()
if not db_product:
return error_response(code=404, message="产品不存在")
update_data = product.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_product, key, value)
try:
db.commit()
db.refresh(db_product)
return success_response(data=MerchantProductInfo.model_validate(db_product))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"更新失败: {str(e)}")
@router.get("/list", response_model=ResponseModel)
async def list_merchant_products(
merchant_id: Optional[int] = None,
skip: int = 0,
limit: int = 20,
db: Session = Depends(get_db)
):
"""获取商品列表"""
# 联表查询商家信息
query = db.query(
MerchantProductDB,
MerchantDB.name.label('merchant_name')
).join(
MerchantDB,
MerchantProductDB.merchant_id == MerchantDB.id
)
# 如果指定了商家ID添加筛选条件
if merchant_id:
query = query.filter(MerchantProductDB.merchant_id == merchant_id)
total = query.count()
results = query.order_by(
MerchantProductDB.create_time.desc()
).offset(skip).limit(limit).all()
# 处理返回数据
products = []
for product, merchant_name in results:
product_info = MerchantProductInfo.model_validate(product)
products.append({
**product_info.model_dump(),
"merchant_name": merchant_name
})
return success_response(data={
"total": total,
"items": products
})
@router.get("/{product_id}", response_model=ResponseModel)
async def get_product(
product_id: int,
db: Session = Depends(get_db)
):
"""获取产品详情"""
product = db.query(MerchantProductDB).filter(
MerchantProductDB.id == product_id
).first()
if not product:
return error_response(code=404, message="产品不存在")
return success_response(data=MerchantProductInfo.model_validate(product))
@router.delete("/{product_id}", response_model=ResponseModel)
async def delete_product(
product_id: int,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""删除商家产品(管理员)"""
db_product = db.query(MerchantProductDB).filter(
MerchantProductDB.id == product_id
).first()
if not db_product:
return error_response(code=404, message="产品不存在")
try:
db.delete(db_product)
db.commit()
return success_response(message="删除成功")
except Exception as e:
db.rollback()
return error_response(code=500, message=f"删除失败: {str(e)}")