deliveryman-api/app/api/endpoints/merchant_product.py
2025-03-26 23:52:12 +08:00

184 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List, Optional
from app.models.merchant_product import (
MerchantProductDB,
MerchantProductCreate,
MerchantProductUpdate,
MerchantProductInfo
)
from app.models.database import get_db
from app.api.deps import get_admin_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
from app.models.merchant import MerchantDB
from sqlalchemy import func
from sqlalchemy.orm import joinedload
from app.models.merchant_product import OperationType, DeliveryType, DeliveryTimeType
from app.models.merchant import MerchantInfo
from app.models.address import AddressDB, AddressInfo, AddressType
from app.api.deps import get_current_user
from app.core.config import settings
# Router on which every merchant-product endpoint in this module is registered.
router = APIRouter()
@router.post("", response_model=ResponseModel)
async def create_merchant_product(
    product: MerchantProductCreate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Create a merchant product (administrator endpoint).

    Args:
        product: Payload describing the product to create; its
            ``merchant_id`` must reference an existing merchant.
        db: Database session supplied by the ``get_db`` dependency.
        admin: User resolved by the ``get_admin_user`` dependency; not used
            in the body, it only gates access to the endpoint.

    Returns:
        ``error_response`` with code 404 when the merchant is not found,
        ``success_response`` wrapping the stored product as
        ``MerchantProductInfo`` on success, or ``error_response`` with code
        500 (after a rollback) when persisting or serialising fails.
    """
    # The product must be attached to a merchant that actually exists.
    owner = (
        db.query(MerchantDB)
        .filter(MerchantDB.id == product.merchant_id)
        .first()
    )
    if not owner:
        return error_response(code=404, message="商家不存在")

    payload = product.model_dump()
    new_product = MerchantProductDB(**payload)
    db.add(new_product)

    try:
        db.commit()
        # Reload the row so database-generated values are present.
        db.refresh(new_product)
        created = MerchantProductInfo.model_validate(new_product)
        return success_response(data=created)
    except Exception as exc:
        db.rollback()
        return error_response(code=500, message=f"创建失败: {exc!s}")
@router.put("/{product_id}", response_model=ResponseModel)
async def update_product(
    product_id: int,
    product: MerchantProductUpdate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Update an existing merchant product (administrator endpoint).

    Args:
        product_id: Primary key of the product to modify.
        product: Partial update payload.
        db: Database session supplied by the ``get_db`` dependency.
        admin: User resolved by the ``get_admin_user`` dependency; not used
            in the body, it only gates access to the endpoint.

    Returns:
        ``error_response`` with code 404 when the product is not found,
        ``success_response`` wrapping the updated product as
        ``MerchantProductInfo`` on success, or ``error_response`` with code
        500 (after a rollback) when persisting or serialising fails.
    """
    target = (
        db.query(MerchantProductDB)
        .filter(MerchantProductDB.id == product_id)
        .first()
    )
    if not target:
        return error_response(code=404, message="产品不存在")

    # Only fields the client explicitly sent are considered, and among those
    # a None value is skipped, so this endpoint never clears a column.
    submitted = product.model_dump(exclude_unset=True)
    for field_name in submitted:
        new_value = submitted[field_name]
        if new_value is None:
            continue
        setattr(target, field_name, new_value)

    try:
        db.commit()
        db.refresh(target)
        updated = MerchantProductInfo.model_validate(target)
        return success_response(data=updated)
    except Exception as exc:
        db.rollback()
        return error_response(code=500, message=f"更新失败: {exc!s}")
@router.get("/list", response_model=ResponseModel)
async def list_merchant_products(
    merchant_id: Optional[int] = None,
    community_id: Optional[int] = None,
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db)
):
    """Return a page of merchant products with display-friendly fields.

    Args:
        merchant_id: When truthy, only products of this merchant are listed.
        community_id: When truthy, only products whose merchant has this
            ``community_id`` are listed.
        skip: Number of rows to skip; negative values are treated as 0.
        limit: Maximum number of rows to return; negative values are
            treated as 0.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``success_response`` whose data holds ``total`` (count of all rows
        matching the filters, ignoring paging) and ``items`` (the dumped
        ``MerchantProductInfo`` of each row plus ``delivery_type_name``,
        ``delivery_time_type_name`` and ``pickup_time_format``).
    """
    # Negative paging values would otherwise be forwarded to the database as
    # OFFSET/LIMIT and surface as an unhandled error; clamp them instead.
    skip = max(skip, 0)
    limit = max(limit, 0)

    # Load the related merchant together with each product.
    query = db.query(MerchantProductDB).options(
        joinedload(MerchantProductDB.merchant)
    )

    # Optional filters; falsy values (None or 0) mean "no filter".
    if merchant_id:
        query = query.filter(MerchantProductDB.merchant_id == merchant_id)
    if community_id:
        query = query.filter(
            MerchantProductDB.merchant.has(community_id=community_id)
        )

    # Count before paging so the client can compute the number of pages.
    total = query.count()

    # NOTE(review): `recommend` is sorted ascending, so lower values come
    # first -- confirm this matches the intended "recommended" ordering.
    results = (
        query.order_by(
            MerchantProductDB.recommend.asc(),
            MerchantProductDB.create_time.desc(),
        )
        .offset(skip)
        .limit(limit)
        .all()
    )

    products = []
    for product in results:
        product_info = MerchantProductInfo.model_validate(product)

        # The pickup window is only rendered when all three parts are set.
        if (
            product.delivery_date
            and product.pickup_time_from
            and product.pickup_time_to
        ):
            pickup_time_format = (
                f"{product.delivery_date.strftime('%m月%d')} "
                f"{product.pickup_time_from.strftime('%H:%M')}"
                f"~{product.pickup_time_to.strftime('%H:%M')}"
            )
        else:
            pickup_time_format = None

        products.append({
            **product_info.model_dump(),
            "delivery_type_name": (
                "配送到家"
                if product.delivery_type == DeliveryType.DELIVERY
                else "自提"
            ),
            "delivery_time_type_name": (
                "小时达"
                if product.delivery_time_type == DeliveryTimeType.IMMEDIATE
                else "社区团购"
            ),
            "pickup_time_format": pickup_time_format,
        })

    return success_response(data={
        "total": total,
        "items": products
    })
@router.get("/{product_id}", response_model=ResponseModel)
async def get_product(
    product_id: int,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Return the detail view of a single product.

    Args:
        product_id: Primary key of the product to load.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: User resolved by the ``get_current_user`` dependency;
            its ``userid`` is used to look up the default address.

    Returns:
        ``error_response`` with code 404 when the product is not found.
        Otherwise ``success_response`` whose data is the dumped
        ``MerchantProductInfo`` extended with ``gift_points``, the dumped
        ``merchant``, ``delivery_type_name``, ``delivery_time_type_name``,
        ``pickup_time_format`` and, only when the user has one,
        ``default_address`` (an ``AddressInfo`` instance).
    """
    # Function-scope import keeps this block self-contained.
    from decimal import Decimal

    product = db.query(MerchantProductDB).filter(
        MerchantProductDB.id == product_id
    ).first()
    if not product:
        return error_response(code=404, message="产品不存在")

    product_info = MerchantProductInfo.model_validate(product)
    merchant = MerchantInfo.model_validate(product.merchant)

    # gift points = floor(rate% * sale price) * POINT_RATIO.
    # Computed in exact decimal arithmetic: with binary floats,
    # 29 / 100 * 100 evaluates to 28.999999999999996 and truncates to 28.
    # A missing rate or price is treated as 0 instead of raising TypeError.
    rate = Decimal(str(product.gift_points_rate or 0))
    price = Decimal(str(product.sale_price or 0))
    gift_points = int(rate * price / 100) * settings.POINT_RATIO

    # The pickup window is only rendered when all three parts are set.
    if (
        product.delivery_date
        and product.pickup_time_from
        and product.pickup_time_to
    ):
        pickup_time_format = (
            f"{product.delivery_date.strftime('%m月%d')} "
            f"{product.pickup_time_from.strftime('%H:%M')}"
            f"~{product.pickup_time_to.strftime('%H:%M')}"
        )
    else:
        pickup_time_format = None

    result = {
        **product_info.model_dump(),
        "gift_points": gift_points,
        "merchant": merchant.model_dump(),
        "delivery_type_name": (
            "配送到家"
            if product.delivery_type == DeliveryType.DELIVERY
            else "自提"
        ),
        "delivery_time_type_name": (
            "小时达"
            if product.delivery_time_type == DeliveryTimeType.IMMEDIATE
            else "社区团购"
        ),
        "pickup_time_format": pickup_time_format,
    }

    # Attach the caller's default common address, if one exists.
    default_address = db.query(AddressDB).filter(
        AddressDB.user_id == current_user.userid,
        AddressDB.is_default == True,
        AddressDB.address_type == AddressType.COMMON
    ).first()
    if default_address:
        result["default_address"] = AddressInfo.model_validate(default_address)

    return success_response(data=result)
@router.delete("/{product_id}", response_model=ResponseModel)
async def delete_product(
    product_id: int,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Delete a merchant product (administrator endpoint).

    Args:
        product_id: Primary key of the product to remove.
        db: Database session supplied by the ``get_db`` dependency.
        admin: User resolved by the ``get_admin_user`` dependency; not used
            in the body, it only gates access to the endpoint.

    Returns:
        ``error_response`` with code 404 when the product is not found,
        ``success_response`` with a confirmation message on success, or
        ``error_response`` with code 500 (after a rollback) when the
        deletion fails.
    """
    lookup = db.query(MerchantProductDB).filter(
        MerchantProductDB.id == product_id
    )
    doomed = lookup.first()
    if not doomed:
        return error_response(code=404, message="产品不存在")

    try:
        db.delete(doomed)
        db.commit()
        return success_response(message="删除成功")
    except Exception as exc:
        # Undo the pending delete so the session stays usable.
        db.rollback()
        return error_response(code=500, message=f"删除失败: {exc!s}")