deliveryman-api/app/api/endpoints/community_building.py
2025-03-22 17:04:17 +08:00

211 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session, joinedload
from typing import List, Optional
from app.models.community_building import (
CommunityBuildingDB,
CommunityBuildingCreate,
CommunityBuildingUpdate,
CommunityBuildingInfo,
CommunityBuildingCreateBatch
)
from app.models.community import CommunityDB
from app.models.database import get_db
from app.api.deps import get_admin_user, get_current_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
router = APIRouter()
@router.get("/group_by_community", response_model=ResponseModel)
async def get_buildings_group_by_community(
db: Session = Depends(get_db),
skip: int = 0,
limit: int = 10,
current_user: UserDB = Depends(get_admin_user)
):
"""获取楼栋列表,按社区分组"""
query = db.query(CommunityDB.id, CommunityDB.name, CommunityBuildingDB.id, CommunityBuildingDB.building_name).join(CommunityBuildingDB, CommunityBuildingDB.community_id == CommunityDB.id)
results = query.all()
# 按社区分组
grouped_results = {}
for community_id, community_name, building_id, building_name in results:
if community_id not in grouped_results:
grouped_results[community_id] = {
"community_id": community_id,
"community_name": community_name,
"buildings": []
}
grouped_results[community_id]["buildings"].append({"building_id": building_id, "building_name": building_name})
return success_response(data=list(grouped_results.values()))
@router.get("/list", response_model=ResponseModel)
async def get_buildings(
community_id: Optional[int] = None,
user_id: Optional[int] = None,
skip: int = 0,
limit: int = 1000,
db: Session = Depends(get_db)
):
"""获取楼栋列表
Args:
community_id: 小区ID
user_id: 用户ID用于获取用户所属小区的楼栋
skip: 跳过记录数
limit: 返回记录数
"""
# 联表查询,获取社区名称
query = db.query(
CommunityBuildingDB,
CommunityDB.name.label('community_name')
).join(
CommunityDB,
CommunityBuildingDB.community_id == CommunityDB.id
)
# 如果指定了用户ID查询用户所属小区
if user_id:
user = db.query(UserDB).filter(UserDB.userid == user_id).first()
if not user or not user.community_id:
return error_response(code=400, message="用户未关联小区")
query = query.filter(CommunityBuildingDB.community_id == user.community_id)
# 如果指定了小区ID直接筛选
elif community_id:
query = query.filter(CommunityBuildingDB.community_id == community_id)
query = query.order_by(CommunityBuildingDB.id.asc())
# 获取总数
total = query.count()
# 查询数据
results = query.offset(skip).limit(limit).all()
# 处理返回数据
building_list = []
for building, community_name in results:
building_info = CommunityBuildingInfo.model_validate(building)
building_info.community_name = community_name
building_list.append(building_info)
return success_response(data={
"total": total,
"items": building_list
})
@router.post("/batch", response_model=ResponseModel)
async def create_buildings(
batch: CommunityBuildingCreateBatch,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""批量创建楼栋(管理员)"""
# 检查社区是否存在
community = db.query(CommunityDB).filter(CommunityDB.id == batch.community_id).first()
if not community:
return error_response(code=404, message="社区不存在")
# 批量创建楼栋
buildings = []
for building_name in batch.building_names:
building = CommunityBuildingDB(community_id=batch.community_id, building_name=building_name)
buildings.append(building)
db.add_all(buildings)
try:
db.commit()
# 不要尝试刷新整个列表
# db.refresh(buildings)
# 查询刚刚创建的楼栋
created_buildings = db.query(CommunityBuildingDB).filter(
CommunityBuildingDB.community_id == batch.community_id,
CommunityBuildingDB.building_name.in_(batch.building_names)
).all()
# 转换为响应模型
building_info_list = [CommunityBuildingInfo.model_validate(building) for building in created_buildings]
return success_response(data=building_info_list)
except Exception as e:
db.rollback()
return error_response(code=500, message=f"创建失败: {str(e)}")
@router.post("", response_model=ResponseModel)
async def create_building(
building: CommunityBuildingCreate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""创建楼栋(管理员)"""
# 检查是否已存在相同编号的楼栋
exists = db.query(CommunityBuildingDB).filter(
CommunityBuildingDB.community_id == building.community_id,
CommunityBuildingDB.building_name == building.building_name
).first()
if exists:
return error_response(code=400, message="该楼栋编号已存在")
db_building = CommunityBuildingDB(**building.model_dump())
db.add(db_building)
try:
db.commit()
db.refresh(db_building)
return success_response(data=CommunityBuildingInfo.model_validate(db_building))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"创建失败: {str(e)}")
@router.put("/{building_id}", response_model=ResponseModel)
async def update_building(
building_id: int,
building: CommunityBuildingUpdate,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""更新楼栋信息(管理员)"""
db_building = db.query(CommunityBuildingDB).filter(
CommunityBuildingDB.id == building_id
).first()
if not db_building:
return error_response(code=404, message="楼栋不存在")
update_data = building.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_building, key, value)
try:
db.commit()
db.refresh(db_building)
return success_response(data=CommunityBuildingInfo.model_validate(db_building))
except Exception as e:
db.rollback()
return error_response(code=500, message=f"更新失败: {str(e)}")
@router.delete("/{building_id}", response_model=ResponseModel)
async def delete_building(
building_id: int,
db: Session = Depends(get_db),
admin: UserDB = Depends(get_admin_user)
):
"""删除楼栋(管理员)"""
db_building = db.query(CommunityBuildingDB).filter(
CommunityBuildingDB.id == building_id
).first()
if not db_building:
return error_response(code=404, message="楼栋不存在")
try:
db.delete(db_building)
db.commit()
return success_response(message="删除成功")
except Exception as e:
db.rollback()
return error_response(code=500, message=f"删除失败: {str(e)}")