from typing import List, Optional

from fastapi import APIRouter, Depends
from sqlalchemy import and_
from sqlalchemy.orm import Session

from app.api.deps import get_current_user
from app.core.response import success_response, error_response, ResponseModel
from app.models.address import AddressDB, AddressCreate, AddressUpdate, AddressInfo, AddressType
from app.models.community import CommunityDB
from app.models.community_building import CommunityBuildingDB
from app.models.database import get_db
from app.models.user import UserDB

router = APIRouter()


def _clear_default_addresses(db: Session, user_id, address_type) -> None:
    """Unset ``is_default`` on the user's default addresses of one type.

    Issues a bulk UPDATE on the session; the caller is responsible for
    committing.
    """
    db.query(AddressDB).filter(
        and_(
            AddressDB.address_type == address_type,
            AddressDB.user_id == user_id,
            # SQL expression, not a Python comparison: must stay ``==``.
            AddressDB.is_default == True,  # noqa: E712
        )
    ).update({"is_default": False})


def _get_owned_address(db: Session, address_id: int, user_id):
    """Return the address with ``address_id`` owned by ``user_id``, or None."""
    return db.query(AddressDB).filter(
        and_(
            AddressDB.id == address_id,
            AddressDB.user_id == user_id,
        )
    ).first()


@router.post("", response_model=ResponseModel)
async def create_address(
    address: AddressCreate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user),
):
    """Create an address for the current user.

    The community and building names are looked up by ID and stored on the
    new row. Returns a 404 error response when either record is missing.
    If the new address is flagged as default, the user's existing default
    of the same address type is unset first.
    """
    # Resolve the referenced records before touching any data, so a bad ID
    # yields a 404 response instead of an AttributeError.
    community = db.query(CommunityDB).filter(
        CommunityDB.id == address.community_id
    ).first()
    if community is None:
        return error_response(code=404, message="社区不存在")

    community_building = db.query(CommunityBuildingDB).filter(
        CommunityBuildingDB.id == address.community_building_id
    ).first()
    if community_building is None:
        return error_response(code=404, message="楼栋不存在")

    if address.is_default:
        _clear_default_addresses(db, current_user.userid, address.address_type)

    db_address = AddressDB(
        user_id=current_user.userid,
        community_name=community.name,
        community_building_name=community_building.building_name,
        **address.model_dump()
    )
    db.add(db_address)
    db.commit()
    db.refresh(db_address)
    return success_response(data=AddressInfo.model_validate(db_address))


@router.get("", response_model=ResponseModel)
async def get_addresses(
    community_id: Optional[int] = None,
    address_type: Optional[AddressType] = AddressType.PICKUP,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user),
):
    """List the current user's addresses.

    Results are filtered by ``address_type`` when it is truthy and by
    ``community_id`` when it is not None.
    """
    query = db.query(AddressDB).filter(
        AddressDB.user_id == current_user.userid
    )
    if address_type:
        query = query.filter(AddressDB.address_type == address_type)
    if community_id is not None:
        query = query.filter(AddressDB.community_id == community_id)

    addresses_list = [AddressInfo.model_validate(a) for a in query.all()]
    return success_response(data=addresses_list)


@router.put("/{address_id}", response_model=ResponseModel)
async def update_address(
    address_id: int,
    address: AddressUpdate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user),
):
    """Update one of the current user's addresses.

    Only fields explicitly set in the request are applied. When the
    community or building ID is supplied, the stored name is refreshed
    from the referenced record. Returns a 404 error response when the
    address, community or building does not exist.
    """
    db_address = _get_owned_address(db, address_id, current_user.userid)
    if not db_address:
        return error_response(code=404, message="地址不存在")

    update_data = address.model_dump(exclude_unset=True)

    # Validate referenced records before mutating anything, so a bad ID
    # cannot leave half-applied changes pending on the session.
    community = None
    if address.community_id:
        community = db.query(CommunityDB).filter(
            CommunityDB.id == address.community_id
        ).first()
        if community is None:
            return error_response(code=404, message="社区不存在")

    community_building = None
    if address.community_building_id:
        community_building = db.query(CommunityBuildingDB).filter(
            CommunityBuildingDB.id == address.community_building_id
        ).first()
        if community_building is None:
            return error_response(code=404, message="楼栋不存在")

    if update_data.get("is_default"):
        # The payload is partial: fall back to the stored type when the
        # request does not carry one, so the right group is cleared.
        target_type = update_data.get("address_type") or db_address.address_type
        _clear_default_addresses(db, current_user.userid, target_type)

    for key, value in update_data.items():
        setattr(db_address, key, value)

    if community is not None:
        db_address.community_name = community.name
    if community_building is not None:
        db_address.community_building_name = community_building.building_name

    db.commit()
    db.refresh(db_address)
    return success_response(data=AddressInfo.model_validate(db_address))


@router.delete("/{address_id}", response_model=ResponseModel)
async def delete_address(
    address_id: int,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user),
):
    """Delete one of the current user's addresses.

    Returns a 404 error response when no matching row was deleted.
    """
    deleted_rows = db.query(AddressDB).filter(
        and_(
            AddressDB.id == address_id,
            AddressDB.user_id == current_user.userid,
        )
    ).delete()
    if not deleted_rows:
        return error_response(code=404, message="地址不存在")

    db.commit()
    return success_response(message="地址已删除")


@router.post("/{address_id}/set-default", response_model=ResponseModel)
async def set_default_address(
    address_id: int,
    address_type: AddressType = AddressType.PICKUP,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user),
):
    """Mark one of the current user's addresses as the default.

    The previous default of the same address type is unset. The
    ``address_type`` parameter is kept for backward compatibility; the
    type stored on the address itself decides which defaults are cleared.
    Returns a 404 error response when the address does not exist.
    """
    # Look the address up first so a missing ID changes nothing.
    db_address = _get_owned_address(db, address_id, current_user.userid)
    if not db_address:
        return error_response(code=404, message="地址不存在")

    _clear_default_addresses(db, current_user.userid, db_address.address_type)
    db_address.is_default = True
    db.commit()
    db.refresh(db_address)
    return success_response(data=AddressInfo.model_validate(db_address))


@router.get("/{address_id}", response_model=ResponseModel)
async def get_address(
    address_id: int,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user),
):
    """Return the details of one of the current user's addresses.

    The community name is read from the joined community row rather than
    from the copy stored on the address.
    """
    # Inner join: an address whose community row is missing is reported
    # as not found.
    row = db.query(
        AddressDB,
        CommunityDB.name.label('community_name')
    ).join(
        CommunityDB,
        AddressDB.community_id == CommunityDB.id
    ).filter(
        AddressDB.id == address_id,
        AddressDB.user_id == current_user.userid  # users may only view their own addresses
    ).first()

    if not row:
        return error_response(code=404, message="地址不存在")

    record = row.AddressDB
    address_data = {
        "id": record.id,
        "community_id": record.community_id,
        "community_name": row.community_name,
        "community_building_id": record.community_building_id,
        "community_building_name": record.community_building_name,
        "address_detail": record.address_detail,
        "name": record.name,
        "phone": record.phone,
        "gender": record.gender,
        "is_default": record.is_default,
        "address_type": record.address_type,
        "longitude": record.longitude,
        "latitude": record.latitude
    }

    return success_response(data=AddressInfo(**address_data))