deliveryman-api/app/api/endpoints/merchant_product.py
2025-03-28 14:44:48 +08:00

186 lines
6.6 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List, Optional
from app.models.merchant_product import (
MerchantProductDB,
MerchantProductCreate,
MerchantProductUpdate,
MerchantProductInfo
)
from app.models.database import get_db
from app.api.deps import get_admin_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
from app.models.merchant import MerchantDB
from sqlalchemy import func
from sqlalchemy.orm import joinedload
from app.models.merchant_product import OperationType, DeliveryType, DeliveryTimeType
from app.models.merchant import MerchantInfo
from app.models.address import AddressDB, AddressInfo, AddressType
from app.api.deps import get_current_user
from app.core.config import settings
# Router on which the merchant-product endpoints defined in this module are registered.
router = APIRouter()
@router.post("", response_model=ResponseModel)
async def create_merchant_product(
    product: MerchantProductCreate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Create a merchant product (admin).

    The merchant referenced by ``product.merchant_id`` is looked up first; if
    it is missing, an error response with code 404 is returned. Otherwise a
    new ``MerchantProductDB`` row is built from the request payload, added to
    the session and committed.

    Any exception raised while committing, refreshing or serialising the new
    row rolls the session back and is reported as an error response with
    code 500 that carries the exception text.
    """
    # Make sure the owning merchant exists before inserting anything.
    merchant_lookup = db.query(MerchantDB).filter(
        MerchantDB.id == product.merchant_id
    )
    if not merchant_lookup.first():
        return error_response(code=404, message="商家不存在")

    payload = product.model_dump()
    new_row = MerchantProductDB(**payload)
    db.add(new_row)

    try:
        db.commit()
        # Reload the row so the response reflects its committed state.
        db.refresh(new_row)
        created = MerchantProductInfo.model_validate(new_row)
        return success_response(data=created)
    except Exception as exc:
        db.rollback()
        return error_response(code=500, message=f"创建失败: {str(exc)}")
@router.put("/{product_id}", response_model=ResponseModel)
async def update_product(
    product_id: int,
    product: MerchantProductUpdate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Update a merchant product (admin).

    Only fields that were explicitly sent in the request *and* are not
    ``None`` are copied onto the stored row; everything else is left
    untouched. An error response with code 404 is returned when no product
    with ``product_id`` exists, and one with code 500 (after a rollback)
    when committing, refreshing or serialising fails.
    """
    stored = (
        db.query(MerchantProductDB)
        .filter(MerchantProductDB.id == product_id)
        .first()
    )
    if not stored:
        return error_response(code=404, message="产品不存在")

    # exclude_unset drops fields the client did not send; None values are
    # skipped as well, so a field cannot be cleared through this endpoint.
    changes = {
        field: new_value
        for field, new_value in product.model_dump(exclude_unset=True).items()
        if new_value is not None
    }
    for field, new_value in changes.items():
        setattr(stored, field, new_value)

    try:
        db.commit()
        db.refresh(stored)
        updated = MerchantProductInfo.model_validate(stored)
        return success_response(data=updated)
    except Exception as exc:
        db.rollback()
        return error_response(code=500, message=f"更新失败: {str(exc)}")
def _format_pickup_window(product):
    """Return the pickup window label of ``product``, or ``None``.

    The label is ``"<MM>月<DD> <HH:MM>~<HH:MM>"`` and is only produced when
    the delivery date and both pickup times are set.
    """
    if not (product.delivery_date and product.pickup_time_from and product.pickup_time_to):
        return None
    day = product.delivery_date.strftime('%m月%d')
    start = product.pickup_time_from.strftime('%H:%M')
    end = product.pickup_time_to.strftime('%H:%M')
    return f"{day} {start}~{end}"


@router.get("/list", response_model=ResponseModel)
async def list_merchant_products(
    user_id: Optional[int] = None,
    community_id: Optional[int] = None,
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db)
):
    """List merchant products with their merchant, paginated.

    Args:
        user_id: When given, only products of the merchant owned by this user
            are returned. If the user owns no merchant the result is empty
            (previously the filter was silently dropped and every product
            was returned).
        community_id: When given, only products whose merchant belongs to
            this community are returned.
        skip: Number of rows to skip; negative values are treated as 0.
        limit: Maximum number of rows to return; negative values are
            treated as 0.
        db: Database session.

    Returns:
        A success response whose data holds ``total`` (number of matching
        products before pagination) and ``items`` (the requested page).
        Rows are ordered by ``recommend`` ascending, then ``create_time``
        descending.
    """
    # Eager-load the merchant so the loop below does not issue one query per row.
    query = db.query(MerchantProductDB).options(
        joinedload(MerchantProductDB.merchant)
    )

    if user_id:
        merchant = db.query(MerchantDB).filter(MerchantDB.user_id == user_id).first()
        if not merchant:
            # The caller asked for one user's products and that user has no
            # merchant: nothing can match, so do not fall back to "all".
            return success_response(data={
                "total": 0,
                "items": []
            })
        query = query.filter(MerchantProductDB.merchant_id == merchant.id)

    if community_id:
        query = query.filter(MerchantProductDB.merchant.has(community_id=community_id))

    total = query.count()

    # Negative OFFSET/LIMIT values are rejected by most databases; clamp them
    # instead of letting the request fail.
    results = query.order_by(
        MerchantProductDB.recommend.asc(),
        MerchantProductDB.create_time.desc()
    ).offset(max(skip, 0)).limit(max(limit, 0)).all()

    products = []
    for product in results:
        product_info = MerchantProductInfo.model_validate(product)
        # A product whose merchant relationship is empty must not break the
        # whole listing, so it is returned with "merchant": None.
        merchant_info = (
            MerchantInfo.model_validate(product.merchant)
            if product.merchant is not None
            else None
        )
        products.append({
            **product_info.model_dump(),
            "merchant": merchant_info,
            "delivery_type_name": "配送到家" if product.delivery_type == DeliveryType.DELIVERY else "自提",
            "delivery_time_type_name": "小时达" if product.delivery_time_type == DeliveryTimeType.IMMEDIATE else "社区团购",
            "pickup_time_format": _format_pickup_window(product)
        })

    return success_response(data={
        "total": total,
        "items": products
    })
def _compute_gift_points(rate, sale_price, point_ratio):
    """Return the gift points for a product.

    The whole-number part of ``rate / 100 * sale_price`` is multiplied by
    ``point_ratio``. The arithmetic is done with ``Decimal`` because binary
    floats can land just below the exact value (``29 / 100 * 100`` evaluates
    to ``28.999999999999996``), which truncation would turn into one point
    too few. A missing rate or price yields zero points.
    """
    # Imported locally so this block does not depend on a file-level import.
    from decimal import Decimal

    if rate is None or sale_price is None:
        whole_points = 0
    else:
        # Going through str() keeps the decimal digits as written instead of
        # importing the binary representation error of a float.
        exact = Decimal(str(rate)) * Decimal(str(sale_price)) / Decimal(100)
        whole_points = int(exact)
    return whole_points * point_ratio


@router.get("/{product_id}", response_model=ResponseModel)
async def get_product(
    product_id: int,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Return the details of one product.

    The response data contains the product fields plus ``gift_points``, the
    product's ``merchant`` (``None`` when the relationship is empty), display
    names for the delivery type and delivery time type, and a formatted
    pickup window (``None`` unless the delivery date and both pickup times
    are set). When the current user has a default address of type
    ``AddressType.COMMON`` it is added as ``default_address``.

    An error response with code 404 is returned when no product with
    ``product_id`` exists.
    """
    product = db.query(MerchantProductDB).filter(
        MerchantProductDB.id == product_id
    ).first()
    if not product:
        return error_response(code=404, message="产品不存在")

    product_info = MerchantProductInfo.model_validate(product)

    # Validating a missing merchant would raise; report it as None instead.
    merchant_data = None
    if product.merchant is not None:
        merchant_data = MerchantInfo.model_validate(product.merchant).model_dump()

    pickup_time_format = None
    if product.delivery_date and product.pickup_time_from and product.pickup_time_to:
        pickup_time_format = f"{product.delivery_date.strftime('%m月%d')} {product.pickup_time_from.strftime('%H:%M')}~{product.pickup_time_to.strftime('%H:%M')}"

    result = {
        **product_info.model_dump(),
        "gift_points": _compute_gift_points(
            product.gift_points_rate,
            product.sale_price,
            settings.POINT_RATIO,
        ),
        "merchant": merchant_data,
        "delivery_type_name": "配送到家" if product.delivery_type == DeliveryType.DELIVERY else "自提",
        "delivery_time_type_name": "小时达" if product.delivery_time_type == DeliveryTimeType.IMMEDIATE else "社区团购",
        "pickup_time_format": pickup_time_format
    }

    # Attach the current user's default address, if there is one.
    default_address = db.query(AddressDB).filter(
        AddressDB.user_id == current_user.userid,
        AddressDB.is_default == True,  # noqa: E712 - builds a SQL expression, not a Python comparison
        AddressDB.address_type == AddressType.COMMON
    ).first()
    if default_address:
        result["default_address"] = AddressInfo.model_validate(default_address)

    return success_response(data=result)
@router.delete("/{product_id}", response_model=ResponseModel)
async def delete_product(
    product_id: int,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Delete a merchant product (admin).

    Returns an error response with code 404 when no product with
    ``product_id`` exists. If deleting or committing raises, the session is
    rolled back and an error response with code 500 carrying the exception
    text is returned; otherwise a success response is returned.
    """
    lookup = db.query(MerchantProductDB).filter(
        MerchantProductDB.id == product_id
    )
    target = lookup.first()
    if not target:
        return error_response(code=404, message="产品不存在")

    try:
        db.delete(target)
        db.commit()
        return success_response(message="删除成功")
    except Exception as exc:
        db.rollback()
        return error_response(code=500, message=f"删除失败: {str(exc)}")