deliveryman-api/app/api/endpoints/community_set_mapping.py
2025-03-09 23:37:33 +08:00

163 lines
5.9 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, Path
from sqlalchemy.orm import Session
from typing import List, Optional
from pydantic import BaseModel, Field
from app.models.database import get_db
from app.models.community_set_mapping import CommunitySetMapping
from app.models.community_set import CommunitySet
from app.models.community import CommunityDB
from app.api.deps import get_current_user, get_admin_user
from app.models.user import UserDB
from datetime import datetime
from app.core.response import success_response, error_response
router = APIRouter()
# 请求和响应模型
class CommunitySetMappingCreate(BaseModel):
set_id: int = Field(..., ge=1, description="社区集合ID")
community_id: int = Field(..., ge=1, description="社区ID")
class CommunitySetMappingResponse(BaseModel):
id: int
set_id: int
community_id: int
create_time: datetime
update_time: datetime
# 包含社区集合名称和社区名称
set_name: Optional[str] = None
community_name: Optional[str] = None
address: Optional[str] = None
class Config:
from_attributes = True
# 创建社区集合映射
@router.post("/", response_model=CommunitySetMappingResponse)
async def create_community_set_mapping(
mapping: CommunitySetMappingCreate,
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_admin_user)
):
"""创建社区集合映射"""
# 检查社区集合是否存在
community_set = db.query(CommunitySet).filter(CommunitySet.id == mapping.set_id).first()
if not community_set:
return error_response(code=404, message="社区集合不存在")
# 检查社区是否存在
community = db.query(CommunityDB).filter(CommunityDB.id == mapping.community_id).first()
if not community:
return error_response(code=404, message="社区不存在")
# 检查映射是否已存在
existing_mapping = db.query(CommunitySetMapping).filter(
CommunitySetMapping.set_id == mapping.set_id,
CommunitySetMapping.community_id == mapping.community_id
).first()
if existing_mapping:
return error_response(code=400, message="该映射关系已存在")
# 创建新映射
new_mapping = CommunitySetMapping(
set_id=mapping.set_id,
community_id=mapping.community_id
)
db.add(new_mapping)
db.commit()
db.refresh(new_mapping)
# 添加社区集合名称和社区名称
result = CommunitySetMappingResponse.model_validate(new_mapping)
result.set_name = community_set.set_name
result.community_name = community.name
return success_response(data=result)
# 获取社区集合的所有社区
@router.get("/set/{set_id}/communities", response_model=List[CommunitySetMappingResponse])
async def get_communities_by_set(
set_id: int = Path(..., ge=1),
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100)
):
"""获取社区集合的所有社区"""
# 检查社区集合是否存在
community_set = db.query(CommunitySet).filter(CommunitySet.id == set_id).first()
if not community_set:
return error_response(code=404, message="社区集合不存在")
# 获取映射
mappings = db.query(CommunitySetMapping).filter(
CommunitySetMapping.set_id == set_id
).offset(skip).limit(limit).all()
# 添加社区集合名称和社区名称
results = []
for mapping in mappings:
community = db.query(CommunityDB).filter(CommunityDB.id == mapping.community_id).first()
result = CommunitySetMappingResponse.model_validate(mapping)
result.set_name = community_set.set_name
result.community_name = community.name if community else None
result.address = community.address if community else None
results.append(result)
total = db.query(CommunitySetMapping).filter(
CommunitySetMapping.set_id == set_id
).count()
return success_response(data={"total": total, "items": results})
# 获取社区所属的所有集合
@router.get("/community/{community_id}/sets", response_model=List[CommunitySetMappingResponse])
async def get_sets_by_community(
community_id: int = Path(..., ge=1),
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_current_user),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100)
):
"""获取社区所属的所有集合"""
# 检查社区是否存在
community = db.query(CommunityDB).filter(CommunityDB.id == community_id).first()
if not community:
return error_response(code=404, message="社区不存在")
# 获取映射
mappings = db.query(CommunitySetMapping).filter(
CommunitySetMapping.community_id == community_id
).offset(skip).limit(limit).all()
# 添加社区集合名称和社区名称
results = []
for mapping in mappings:
community_set = db.query(CommunitySet).filter(CommunitySet.id == mapping.set_id).first()
result = CommunitySetMappingResponse.model_validate(mapping)
result.set_name = community_set.set_name if community_set else None
result.community_name = community.name
results.append(result)
return success_response(data=results)
# 删除社区集合映射
@router.delete("/{mapping_id}", response_model=dict)
async def delete_community_set_mapping(
mapping_id: int = Path(..., ge=1),
db: Session = Depends(get_db),
current_user: UserDB = Depends(get_admin_user)
):
"""删除社区集合映射"""
mapping = db.query(CommunitySetMapping).filter(CommunitySetMapping.id == mapping_id).first()
if not mapping:
return error_response(code=404, message="映射关系不存在")
db.delete(mapping)
db.commit()
return success_response(data={"message": "映射关系已删除"})