deliveryman-api/app/api/endpoints/address.py
2025-03-25 12:42:59 +08:00

225 lines
7.3 KiB
Python

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import and_
from typing import List, Optional
from app.models.address import AddressDB, AddressCreate, AddressUpdate, AddressInfo, AddressType
from app.models.community import CommunityDB
from app.models.database import get_db
from app.api.deps import get_current_user
from app.models.user import UserDB
from app.core.response import success_response, error_response, ResponseModel
from app.models.community_building import CommunityBuildingDB
# Router that the address endpoints below register themselves on.
router = APIRouter()
@router.post("", response_model=ResponseModel)
async def create_address(
    address: AddressCreate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Create an address for the current user.

    When ``community_id`` is supplied, the community name (and, when
    ``community_building_id`` is also supplied, the building name) is looked
    up and stored on the new row alongside the submitted fields.

    Returns a success response wrapping the created ``AddressInfo``, or a
    404 error response when a referenced community or building does not
    exist.
    """
    # Resolve referenced records first, so that a bad id is reported before
    # anything in the session has been modified.
    denormalized_names = {}
    if address.community_id:
        community = db.query(CommunityDB).filter(
            CommunityDB.id == address.community_id
        ).first()
        if community is None:
            return error_response(code=404, message="社区不存在")
        denormalized_names["community_name"] = community.name

        # The building is optional; only look it up when an id was given.
        if address.community_building_id:
            community_building = db.query(CommunityBuildingDB).filter(
                CommunityBuildingDB.id == address.community_building_id
            ).first()
            if community_building is None:
                return error_response(code=404, message="楼栋不存在")
            denormalized_names["community_building_name"] = (
                community_building.building_name
            )

    # Only one default per (user, address type): clear the previous one.
    if address.is_default:
        db.query(AddressDB).filter(
            and_(
                AddressDB.address_type == address.address_type,
                AddressDB.user_id == current_user.userid,
                AddressDB.is_default == True
            )
        ).update({"is_default": False})

    db_address = AddressDB(
        user_id=current_user.userid,
        **denormalized_names,
        **address.model_dump()
    )
    db.add(db_address)
    db.commit()
    db.refresh(db_address)
    return success_response(data=AddressInfo.model_validate(db_address))
@router.get("", response_model=ResponseModel)
async def get_addresses(
    community_id: Optional[int] = None,
    address_type: Optional[AddressType] = AddressType.PICKUP,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """List the current user's addresses.

    The result is always restricted to the caller's own rows. It is further
    narrowed by ``address_type`` when that value is truthy and by
    ``community_id`` when that value is not ``None``.
    """
    # Collect the criteria up front; filter() ANDs them together.
    criteria = [AddressDB.user_id == current_user.userid]
    if address_type:
        criteria.append(AddressDB.address_type == address_type)
    if community_id is not None:
        criteria.append(AddressDB.community_id == community_id)

    rows = db.query(AddressDB).filter(*criteria).all()
    return success_response(
        data=[AddressInfo.model_validate(row) for row in rows]
    )
@router.put("/{address_id}", response_model=ResponseModel)
async def update_address(
    address_id: int,
    address: AddressUpdate,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Update one of the current user's addresses.

    Only fields explicitly present in the request body are written. When a
    community or building id is supplied, the matching stored name is
    refreshed as well.

    Returns a success response wrapping the updated ``AddressInfo``, or a
    404 error response when the address, the referenced community, or the
    referenced building does not exist.
    """
    db_address = db.query(AddressDB).filter(
        and_(
            AddressDB.id == address_id,
            AddressDB.user_id == current_user.userid
        )
    ).first()
    if not db_address:
        return error_response(code=404, message="地址不存在")

    update_data = address.model_dump(exclude_unset=True)

    # Resolve referenced records before touching the row, so a bad id does
    # not leave half-applied changes pending in the session.
    community = None
    if address.community_id:
        community = db.query(CommunityDB).filter(
            CommunityDB.id == address.community_id
        ).first()
        if community is None:
            return error_response(code=404, message="社区不存在")

    community_building = None
    if address.community_building_id:
        community_building = db.query(CommunityBuildingDB).filter(
            CommunityBuildingDB.id == address.community_building_id
        ).first()
        if community_building is None:
            return error_response(code=404, message="楼栋不存在")

    if update_data.get("is_default"):
        # The request may omit address_type; in that case the row keeps its
        # current type, so that is the group whose default must be cleared.
        target_type = update_data.get("address_type")
        if target_type is None:
            target_type = db_address.address_type
        db.query(AddressDB).filter(
            and_(
                AddressDB.address_type == target_type,
                AddressDB.user_id == current_user.userid,
                AddressDB.is_default == True
            )
        ).update({"is_default": False})

    for key, value in update_data.items():
        setattr(db_address, key, value)

    # Keep the stored names in sync with the ids that were supplied.
    if community is not None:
        db_address.community_name = community.name
    if community_building is not None:
        db_address.community_building_name = community_building.building_name

    db.commit()
    db.refresh(db_address)
    return success_response(data=AddressInfo.model_validate(db_address))
@router.delete("/{address_id}", response_model=ResponseModel)
async def delete_address(
    address_id: int,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Delete one of the current user's addresses.

    Returns a success response when a row was removed, or a 404 error
    response when no row with that id belongs to the caller.
    """
    # Scope the delete to the caller's own rows.
    owned_by_caller = and_(
        AddressDB.id == address_id,
        AddressDB.user_id == current_user.userid
    )
    deleted_count = db.query(AddressDB).filter(owned_by_caller).delete()

    if deleted_count:
        db.commit()
        return success_response(message="地址已删除")
    return error_response(code=404, message="地址不存在")
@router.post("/{address_id}/set-default", response_model=ResponseModel)
async def set_default_address(
    address_id: int,
    address_type: AddressType = AddressType.PICKUP,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Mark one of the current user's addresses as the default.

    The previous default of the same address type is cleared first. The
    type used is the one stored on the target address; ``address_type`` is
    only a fallback for rows that have no stored type.

    Returns a success response wrapping the updated ``AddressInfo``, or a
    404 error response when the address does not belong to the caller.
    """
    # Look the target up first so a 404 leaves the existing default intact.
    db_address = db.query(AddressDB).filter(
        and_(
            AddressDB.id == address_id,
            AddressDB.user_id == current_user.userid
        )
    ).first()
    if not db_address:
        return error_response(code=404, message="地址不存在")

    # Clear the default within the target's own type; using a mismatched
    # query parameter here could leave two defaults in one type.
    target_type = db_address.address_type
    if target_type is None:
        target_type = address_type

    db.query(AddressDB).filter(
        and_(
            AddressDB.address_type == target_type,
            AddressDB.user_id == current_user.userid,
            AddressDB.is_default == True
        )
    ).update({"is_default": False})

    db_address.is_default = True
    db.commit()
    db.refresh(db_address)
    return success_response(data=AddressInfo.model_validate(db_address))
@router.get("/{address_id}", response_model=ResponseModel)
async def get_address(
    address_id: int,
    db: Session = Depends(get_db),
    current_user: UserDB = Depends(get_current_user)
):
    """Fetch the details of one of the current user's addresses.

    The community name is taken from the joined community row when there is
    one, otherwise from the name stored on the address itself.

    Returns a success response wrapping an ``AddressInfo``, or a 404 error
    response when no address with that id belongs to the caller.
    """
    # Outer join: an address saved without a community must still be found.
    row = db.query(
        AddressDB,
        CommunityDB.name.label('community_name')
    ).outerjoin(
        CommunityDB,
        AddressDB.community_id == CommunityDB.id
    ).filter(
        AddressDB.id == address_id,
        AddressDB.user_id == current_user.userid  # callers may only read their own addresses
    ).first()
    if not row:
        return error_response(code=404, message="地址不存在")

    record = row.AddressDB
    community_name = row.community_name
    if community_name is None:
        # No matching community row; fall back to the name stored on the address.
        community_name = record.community_name

    # Build the response payload.
    address_data = {
        "id": record.id,
        "community_id": record.community_id,
        "community_name": community_name,
        "community_building_id": record.community_building_id,
        "community_building_name": record.community_building_name,
        "address_detail": record.address_detail,
        "name": record.name,
        "phone": record.phone,
        "gender": record.gender,
        "is_default": record.is_default,
        "address_type": record.address_type,
        "longitude": record.longitude,
        "latitude": record.latitude
    }
    return success_response(data=AddressInfo(**address_data))