deliveryman-api/app/api/endpoints/merchant.py
2025-01-06 00:47:03 +08:00

142 lines
4.5 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List, Optional
from sqlalchemy import text
from app.models.merchant import (
MerchantDB,
MerchantCreate,
MerchantUpdate,
MerchantInfo,
MerchantImageDB
)
from app.models.database import get_db
from app.api.deps import get_admin_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
# Router on which the merchant endpoints defined in this module are registered.
router = APIRouter()
@router.post("", response_model=ResponseModel)
async def create_merchant(
    merchant: MerchantCreate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Create a merchant and its image rows (guarded by ``get_admin_user``).

    The merchant row is built from every payload field except ``images``;
    one ``MerchantImageDB`` row is then created per entry in
    ``merchant.images``. Everything is committed together.

    Args:
        merchant: Creation payload; ``images`` entries supply ``image_url``
            and ``sort``.
        db: Database session provided by ``get_db``.
        admin: User resolved by ``get_admin_user``; not otherwise used here.

    Returns:
        ``success_response`` wrapping the created merchant as ``MerchantInfo``,
        or ``error_response`` with code 500 (after a rollback) if anything
        raises while flushing, committing or building the response.
    """
    base_fields = merchant.model_dump(exclude={'images'})
    new_merchant = MerchantDB(**base_fields)
    db.add(new_merchant)

    try:
        # Flush first so the merchant's id is available for the image rows.
        db.flush()
        # The generator is consumed lazily, so each image row is built and
        # added to the session one at a time.
        db.add_all(
            MerchantImageDB(
                merchant_id=new_merchant.id,
                image_url=img.image_url,
                sort=img.sort,
            )
            for img in merchant.images
        )
        db.commit()
        db.refresh(new_merchant)
        payload = MerchantInfo.model_validate(new_merchant)
        return success_response(data=payload)
    except Exception as exc:
        db.rollback()
        return error_response(code=500, message=f"创建失败: {str(exc)}")
@router.put("/{merchant_id}", response_model=ResponseModel)
async def update_merchant(
    merchant_id: int,
    merchant: MerchantUpdate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user)
):
    """Update a merchant's fields and, optionally, replace its images.

    Only fields explicitly set in the payload are written to the merchant
    row. When ``merchant.images`` is not ``None`` (an empty list included),
    all existing image rows for the merchant are deleted and replaced by the
    supplied ones; when it is ``None`` the images are left untouched.

    Args:
        merchant_id: Primary key of the merchant to update.
        merchant: Update payload.
        db: Database session provided by ``get_db``.
        admin: User resolved by ``get_admin_user``; not otherwise used here.

    Returns:
        ``error_response`` with code 404 if the merchant does not exist;
        ``success_response`` wrapping the refreshed merchant as
        ``MerchantInfo`` on success; ``error_response`` with code 500 (after
        a rollback) if any step of the update raises.
    """
    db_merchant = db.query(MerchantDB).filter(
        MerchantDB.id == merchant_id
    ).first()
    if not db_merchant:
        return error_response(code=404, message="商家不存在")

    # All mutation happens inside the try block: the bulk delete below emits
    # SQL immediately (and, if the session autoflushes, flushes the pending
    # attribute changes first), so it can fail before commit() is reached.
    # Keeping it here guarantees a rollback and a uniform error response.
    try:
        # Apply only the fields the client actually sent.
        update_data = merchant.model_dump(exclude={'images'}, exclude_unset=True)
        for key, value in update_data.items():
            setattr(db_merchant, key, value)

        if merchant.images is not None:
            # Replace the image set wholesale: drop the old rows, add the new.
            db.query(MerchantImageDB).filter(
                MerchantImageDB.merchant_id == merchant_id
            ).delete()
            for image in merchant.images:
                db.add(MerchantImageDB(
                    merchant_id=merchant_id,
                    image_url=image.image_url,
                    sort=image.sort
                ))

        db.commit()
        db.refresh(db_merchant)
        return success_response(data=MerchantInfo.model_validate(db_merchant))
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"更新失败: {str(e)}")
@router.get("/{merchant_id}", response_model=ResponseModel)
async def get_merchant(
    merchant_id: int,
    db: Session = Depends(get_db)
):
    """Fetch a single merchant by id.

    Args:
        merchant_id: Primary key of the merchant to look up.
        db: Database session provided by ``get_db``.

    Returns:
        ``success_response`` wrapping the merchant as ``MerchantInfo``, or
        ``error_response`` with code 404 when no such merchant exists.
    """
    by_id = db.query(MerchantDB).filter(MerchantDB.id == merchant_id)
    found = by_id.first()
    if found:
        return success_response(data=MerchantInfo.model_validate(found))
    return error_response(code=404, message="商家不存在")
@router.get("", response_model=ResponseModel)
async def list_merchants(
    longitude: Optional[float] = None,
    latitude: Optional[float] = None,
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db)
):
    """List merchants, nearest first when a location is supplied.

    When both ``longitude`` and ``latitude`` are given, merchants are ordered
    by the distance computed with MySQL's ``ST_Distance_Sphere`` and each item
    carries an extra ``distance`` key. Otherwise (either coordinate missing)
    merchants are ordered by ``create_time`` descending and no distance is
    included.

    Args:
        longitude: Longitude of the reference point, or ``None``.
        latitude: Latitude of the reference point, or ``None``.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).
        db: Database session provided by ``get_db``.

    Returns:
        ``success_response`` whose data is a list of ``MerchantInfo`` objects
        (no location) or a list of dicts made of the ``MerchantInfo`` fields
        plus ``distance`` in whole metres (location supplied). ``distance``
        is ``None`` when the database could not compute one.
    """
    if longitude is None or latitude is None:
        # No usable reference point: fall back to newest-first ordering.
        merchants = (
            db.query(MerchantDB)
            .order_by(MerchantDB.create_time.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )
        return success_response(data=[
            MerchantInfo.model_validate(m) for m in merchants
        ])

    # ST_Distance_Sphere yields the distance in metres; the reference point is
    # passed as bound parameters rather than interpolated into the SQL text.
    rows = (
        db.query(
            MerchantDB,
            text("ST_Distance_Sphere(point(longitude, latitude), point(:lon, :lat)) as distance")
        )
        .params(lon=longitude, lat=latitude)
        .order_by(text("distance"))
        .offset(skip)
        .limit(limit)
        .all()
    )

    return success_response(data=[
        {
            **MerchantInfo.model_validate(merchant).model_dump(),
            # The distance comes back as NULL when an argument is NULL
            # (presumably a merchant row without coordinates -- confirm the
            # columns are nullable); round(None) would raise, so pass None on.
            "distance": None if distance is None else round(distance),
        }
        for merchant, distance in rows
    ])