from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List, Optional
from sqlalchemy import text
from app.models.merchant import (
    MerchantDB,
    MerchantCreate,
    MerchantUpdate,
    MerchantInfo,
    MerchantImageDB,
)
from app.models.database import get_db
from app.api.deps import get_admin_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel

router = APIRouter()


@router.post("", response_model=ResponseModel)
async def create_merchant(
    merchant: MerchantCreate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """Create a merchant together with its images (admin only).

    The merchant row is flushed first so its generated id is available for
    the image rows; everything is committed in a single transaction.

    Returns:
        A success response wrapping the created ``MerchantInfo``, or an
        error response with code 500 if anything fails (the transaction is
        rolled back in that case).
    """
    # Images live in their own table, so keep them out of the merchant row.
    merchant_data = merchant.model_dump(exclude={'images'})
    db_merchant = MerchantDB(**merchant_data)
    db.add(db_merchant)

    try:
        # Flush (without committing) so db_merchant.id is populated.
        db.flush()

        for image in merchant.images:
            db.add(
                MerchantImageDB(
                    merchant_id=db_merchant.id,
                    image_url=image.image_url,
                    sort=image.sort,
                )
            )

        db.commit()
        db.refresh(db_merchant)
        return success_response(data=MerchantInfo.model_validate(db_merchant))
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"创建失败: {str(e)}")


@router.put("/{merchant_id}", response_model=ResponseModel)
async def update_merchant(
    merchant_id: int,
    merchant: MerchantUpdate,
    db: Session = Depends(get_db),
    admin: UserDB = Depends(get_admin_user),
):
    """Update a merchant's fields and, optionally, replace its images (admin only).

    Only fields explicitly set in the request are written. When ``images``
    is provided (including an empty list), the merchant's existing images
    are deleted and replaced by the supplied ones.

    Returns:
        A success response wrapping the updated ``MerchantInfo``; an error
        response with code 404 if the merchant does not exist, or code 500
        if the update fails (the transaction is rolled back in that case).
    """
    db_merchant = db.query(MerchantDB).filter(
        MerchantDB.id == merchant_id
    ).first()

    if not db_merchant:
        return error_response(code=404, message="商家不存在")

    try:
        # exclude_unset: fields the client omitted must not overwrite
        # stored values with schema defaults.
        update_data = merchant.model_dump(exclude={'images'}, exclude_unset=True)
        for key, value in update_data.items():
            setattr(db_merchant, key, value)

        # The image replacement is inside the try block so that a failure
        # in the bulk delete also triggers the rollback below instead of
        # escaping as an unhandled exception.
        if merchant.images is not None:
            db.query(MerchantImageDB).filter(
                MerchantImageDB.merchant_id == merchant_id
            ).delete()

            for image in merchant.images:
                db.add(
                    MerchantImageDB(
                        merchant_id=merchant_id,
                        image_url=image.image_url,
                        sort=image.sort,
                    )
                )

        db.commit()
        db.refresh(db_merchant)
        return success_response(data=MerchantInfo.model_validate(db_merchant))
    except Exception as e:
        db.rollback()
        return error_response(code=500, message=f"更新失败: {str(e)}")


@router.get("/{merchant_id}", response_model=ResponseModel)
async def get_merchant(
    merchant_id: int,
    db: Session = Depends(get_db),
):
    """Return the details of a single merchant.

    Returns:
        A success response wrapping ``MerchantInfo``, or an error response
        with code 404 if no merchant has the given id.
    """
    merchant = db.query(MerchantDB).filter(
        MerchantDB.id == merchant_id
    ).first()

    if not merchant:
        return error_response(code=404, message="商家不存在")

    return success_response(data=MerchantInfo.model_validate(merchant))


@router.get("", response_model=ResponseModel)
async def list_merchants(
    longitude: Optional[float] = None,
    latitude: Optional[float] = None,
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db),
):
    """List merchants, paginated with ``skip`` / ``limit``.

    When both ``longitude`` and ``latitude`` are supplied, merchants are
    ordered by distance from that point and each item carries an extra
    ``distance`` key (whole metres, or ``None`` when the database could not
    compute one). Otherwise merchants are ordered by creation time, newest
    first.

    Returns:
        A success response wrapping the list, or an error response with
        code 400 if the supplied coordinates are outside the valid range.
    """
    if longitude is None or latitude is None:
        # No reference point: fall back to newest-first ordering.
        merchants = db.query(MerchantDB).order_by(
            MerchantDB.create_time.desc()
        ).offset(skip).limit(limit).all()

        return success_response(data=[
            MerchantInfo.model_validate(m) for m in merchants
        ])

    # MySQL's ST_Distance_Sphere raises for coordinates outside
    # (-180, 180] / [-90, 90]; reject them here so the client gets a clear
    # error instead of an unhandled database exception. NaN fails both
    # comparisons and is rejected as well.
    if not (-180 < longitude <= 180 and -90 <= latitude <= 90):
        return error_response(code=400, message="经纬度超出有效范围")

    # ST_Distance_Sphere returns the spherical distance in metres.
    rows = db.query(
        MerchantDB,
        text("ST_Distance_Sphere(point(longitude, latitude), point(:lon, :lat)) as distance")
    ).params(
        lon=longitude,
        lat=latitude
    ).order_by(
        text("distance")
    ).offset(skip).limit(limit).all()

    return success_response(data=[
        {
            **MerchantInfo.model_validate(row[0]).model_dump(),
            # A SQL NULL distance (e.g. a merchant row without stored
            # coordinates) arrives as None; round() would raise on it.
            "distance": round(row[1]) if row[1] is not None else None,
        }
        for row in rows
    ])