163 lines
5.9 KiB
Python
163 lines
5.9 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query, Path
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from pydantic import BaseModel, Field
|
|
from app.models.database import get_db
|
|
from app.models.community_set_mapping import CommunitySetMapping
|
|
from app.models.community_set import CommunitySet
|
|
from app.models.community import CommunityDB
|
|
from app.api.deps import get_current_user, get_admin_user
|
|
from app.models.user import UserDB
|
|
from datetime import datetime
|
|
from app.core.response import success_response, error_response
|
|
|
|
router = APIRouter()
|
|
|
|
# 请求和响应模型
|
|
class CommunitySetMappingCreate(BaseModel):
|
|
set_id: int = Field(..., ge=1, description="社区集合ID")
|
|
community_id: int = Field(..., ge=1, description="社区ID")
|
|
|
|
class CommunitySetMappingResponse(BaseModel):
|
|
id: int
|
|
set_id: int
|
|
community_id: int
|
|
create_time: datetime
|
|
update_time: datetime
|
|
|
|
# 包含社区集合名称和社区名称
|
|
set_name: Optional[str] = None
|
|
community_name: Optional[str] = None
|
|
address: Optional[str] = None
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
# 创建社区集合映射
|
|
@router.post("/", response_model=CommunitySetMappingResponse)
|
|
async def create_community_set_mapping(
|
|
mapping: CommunitySetMappingCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: UserDB = Depends(get_admin_user)
|
|
):
|
|
"""创建社区集合映射"""
|
|
# 检查社区集合是否存在
|
|
community_set = db.query(CommunitySet).filter(CommunitySet.id == mapping.set_id).first()
|
|
if not community_set:
|
|
return error_response(code=404, message="社区集合不存在")
|
|
|
|
# 检查社区是否存在
|
|
community = db.query(CommunityDB).filter(CommunityDB.id == mapping.community_id).first()
|
|
if not community:
|
|
return error_response(code=404, message="社区不存在")
|
|
|
|
# 检查映射是否已存在
|
|
existing_mapping = db.query(CommunitySetMapping).filter(
|
|
CommunitySetMapping.set_id == mapping.set_id,
|
|
CommunitySetMapping.community_id == mapping.community_id
|
|
).first()
|
|
|
|
if existing_mapping:
|
|
return error_response(code=400, message="该映射关系已存在")
|
|
|
|
# 创建新映射
|
|
new_mapping = CommunitySetMapping(
|
|
set_id=mapping.set_id,
|
|
community_id=mapping.community_id
|
|
)
|
|
|
|
db.add(new_mapping)
|
|
db.commit()
|
|
db.refresh(new_mapping)
|
|
|
|
# 添加社区集合名称和社区名称
|
|
result = CommunitySetMappingResponse.model_validate(new_mapping)
|
|
result.set_name = community_set.set_name
|
|
result.community_name = community.name
|
|
|
|
return success_response(data=result)
|
|
|
|
# 获取社区集合的所有社区
|
|
@router.get("/set/{set_id}/communities", response_model=List[CommunitySetMappingResponse])
|
|
async def get_communities_by_set(
|
|
set_id: int = Path(..., ge=1),
|
|
db: Session = Depends(get_db),
|
|
current_user: UserDB = Depends(get_current_user),
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(100, ge=1, le=100)
|
|
):
|
|
"""获取社区集合的所有社区"""
|
|
# 检查社区集合是否存在
|
|
community_set = db.query(CommunitySet).filter(CommunitySet.id == set_id).first()
|
|
if not community_set:
|
|
return error_response(code=404, message="社区集合不存在")
|
|
|
|
# 获取映射
|
|
mappings = db.query(CommunitySetMapping).filter(
|
|
CommunitySetMapping.set_id == set_id
|
|
).offset(skip).limit(limit).all()
|
|
|
|
# 添加社区集合名称和社区名称
|
|
results = []
|
|
for mapping in mappings:
|
|
community = db.query(CommunityDB).filter(CommunityDB.id == mapping.community_id).first()
|
|
result = CommunitySetMappingResponse.model_validate(mapping)
|
|
result.set_name = community_set.set_name
|
|
result.community_name = community.name if community else None
|
|
result.address = community.address if community else None
|
|
|
|
results.append(result)
|
|
|
|
total = db.query(CommunitySetMapping).filter(
|
|
CommunitySetMapping.set_id == set_id
|
|
).count()
|
|
|
|
return success_response(data={"total": total, "items": results})
|
|
|
|
# 获取社区所属的所有集合
|
|
@router.get("/community/{community_id}/sets", response_model=List[CommunitySetMappingResponse])
|
|
async def get_sets_by_community(
|
|
community_id: int = Path(..., ge=1),
|
|
db: Session = Depends(get_db),
|
|
current_user: UserDB = Depends(get_current_user),
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(100, ge=1, le=100)
|
|
):
|
|
"""获取社区所属的所有集合"""
|
|
# 检查社区是否存在
|
|
community = db.query(CommunityDB).filter(CommunityDB.id == community_id).first()
|
|
if not community:
|
|
return error_response(code=404, message="社区不存在")
|
|
|
|
# 获取映射
|
|
mappings = db.query(CommunitySetMapping).filter(
|
|
CommunitySetMapping.community_id == community_id
|
|
).offset(skip).limit(limit).all()
|
|
|
|
# 添加社区集合名称和社区名称
|
|
results = []
|
|
for mapping in mappings:
|
|
community_set = db.query(CommunitySet).filter(CommunitySet.id == mapping.set_id).first()
|
|
result = CommunitySetMappingResponse.model_validate(mapping)
|
|
result.set_name = community_set.set_name if community_set else None
|
|
result.community_name = community.name
|
|
results.append(result)
|
|
|
|
return success_response(data=results)
|
|
|
|
# 删除社区集合映射
|
|
@router.delete("/{mapping_id}", response_model=dict)
|
|
async def delete_community_set_mapping(
|
|
mapping_id: int = Path(..., ge=1),
|
|
db: Session = Depends(get_db),
|
|
current_user: UserDB = Depends(get_admin_user)
|
|
):
|
|
"""删除社区集合映射"""
|
|
mapping = db.query(CommunitySetMapping).filter(CommunitySetMapping.id == mapping_id).first()
|
|
|
|
if not mapping:
|
|
return error_response(code=404, message="映射关系不存在")
|
|
|
|
db.delete(mapping)
|
|
db.commit()
|
|
|
|
return success_response(data={"message": "映射关系已删除"}) |