From a2c248ff4d6a474aa780c6853b9ab0e2d6e17525 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Tue, 4 Mar 2025 11:07:35 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=A4=BE=E5=8C=BA=E9=85=8D?= =?UTF-8?q?=E9=80=81=E6=97=B6=E6=AE=B5=E7=9A=84=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/community_timeperiod.py | 242 ++++++++++++++++++++++ app/api/endpoints/order.py | 22 +- app/core/redis_client.py | 49 ++--- app/main.py | 3 +- app/models/community_timeperiod.py | 56 +++++ 5 files changed, 325 insertions(+), 47 deletions(-) create mode 100644 app/api/endpoints/community_timeperiod.py create mode 100644 app/models/community_timeperiod.py diff --git a/app/api/endpoints/community_timeperiod.py b/app/api/endpoints/community_timeperiod.py new file mode 100644 index 0000000..645d4ba --- /dev/null +++ b/app/api/endpoints/community_timeperiod.py @@ -0,0 +1,242 @@ +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session +from app.models.database import get_db +from app.models.user import UserDB +from app.api.deps import get_admin_user +from app.core.response import success_response, error_response, ResponseModel +from app.models.community_timeperiod import ( + CommunityTimePeriodDB, + CommunityTimePeriodCreate, + CommunityTimePeriodUpdate, + CommunityTimePeriodInfo, + CommunityTimePeriodWithDetail, + CommunityTimePeriodBatchCreate +) +from app.models.timeperiod import TimePeriodDB, TimePeriodInfo +from app.models.community import CommunityDB, CommunityInfo +from typing import List, Optional +import logging +from sqlalchemy.orm import joinedload + +router = APIRouter() + +@router.post("", response_model=ResponseModel) +async def create_community_time_period( + time_period: CommunityTimePeriodCreate, + db: Session = Depends(get_db), + admin: UserDB = Depends(get_admin_user) +): + """为社区添加配送时段""" + try: + # 检查社区是否存在 + community = db.query(CommunityDB).filter(CommunityDB.id == time_period.community_id).first() + if not community: + return error_response(code=404, message="社区不存在") + + # 检查时段是否存在 + time_period_obj = db.query(TimePeriodDB).filter(TimePeriodDB.id == time_period.time_period_id).first() + if not time_period_obj: + return error_response(code=404, message="配送时段不存在") + + # 检查是否已经存在 + existing = db.query(CommunityTimePeriodDB).filter( + CommunityTimePeriodDB.community_id == time_period.community_id, + CommunityTimePeriodDB.time_period_id == time_period.time_period_id + ).first() + + if existing: + return error_response(code=400, message="该社区已经添加了此配送时段") + + # 创建新的关联 + db_community_time_period = CommunityTimePeriodDB( + community_id=time_period.community_id, + time_period_id=time_period.time_period_id, + capacity=time_period.capacity + ) + + db.add(db_community_time_period) + db.commit() + db.refresh(db_community_time_period) + + return success_response(message="社区配送时段添加成功", data=CommunityTimePeriodInfo.model_validate(db_community_time_period)) + + except Exception as e: + db.rollback() + logging.exception(f"添加社区配送时段失败: {str(e)}") + return error_response(code=500, message="添加社区配送时段失败,请稍后重试") + +@router.post("/batch", response_model=ResponseModel) +async def batch_create_community_time_period( + batch: CommunityTimePeriodBatchCreate, + db: Session = Depends(get_db), + admin: UserDB = Depends(get_admin_user) +): + """批量为社区添加配送时段""" + try: + # 检查社区是否存在 + community = db.query(CommunityDB).filter(CommunityDB.id == batch.community_id).first() + if not community: + return error_response(code=404, message="社区不存在") + + # 检查时段是否存在 + time_periods = db.query(TimePeriodDB).filter(TimePeriodDB.id.in_([time_period_capacity.time_period_id for time_period_capacity in batch.time_period_capacity_list])).all() + if len(time_periods) != len([time_period_capacity.time_period_id for time_period_capacity in batch.time_period_capacity_list]): + return error_response(code=404, message="部分配送时段不存在") + + # 先删除该社区的所有配送时段 + db.query(CommunityTimePeriodDB).filter( + CommunityTimePeriodDB.community_id == batch.community_id + ).delete() + + # 批量添加新的配送时段 + for time_period_capacity in batch.time_period_capacity_list: + db_community_time_period = CommunityTimePeriodDB( + community_id=batch.community_id, + time_period_id=time_period_capacity.time_period_id, + capacity=time_period_capacity.capacity + ) + db.add(db_community_time_period) + + db.commit() + + return success_response(message="社区配送时段批量添加成功") + + except Exception as e: + db.rollback() + logging.exception(f"批量添加社区配送时段失败: {str(e)}") + return error_response(code=500, message="批量添加社区配送时段失败,请稍后重试") + +@router.get("/community/{community_id}", response_model=ResponseModel) +async def get_community_time_periods( + community_id: int, + db: Session = Depends(get_db) +): + """获取社区的配送时段列表""" + try: + # 查询社区的配送时段 + community_time_periods = db.query(CommunityTimePeriodDB).filter( + CommunityTimePeriodDB.community_id == community_id + ).all() + + # 获取时段详情 + result = [] + for ctp in community_time_periods: + time_period = db.query(TimePeriodDB).filter(TimePeriodDB.id == ctp.time_period_id).first() + if time_period: + ctp_info = CommunityTimePeriodWithDetail.model_validate(ctp) + ctp_info.time_period = TimePeriodInfo.model_validate(time_period) + result.append(ctp_info) + + # 按时段开始时间排序 + result.sort(key=lambda x: x.time_period.from_time if x.time_period else None) + + return success_response(data=result) + + except Exception as e: + logging.exception(f"获取社区配送时段列表失败: {str(e)}") + return error_response(code=500, message="获取社区配送时段列表失败,请稍后重试") + +@router.put("/{community_time_period_id}", response_model=ResponseModel) +async def update_community_time_period( + community_time_period_id: int, + update_data: CommunityTimePeriodUpdate, + db: Session = Depends(get_db), + admin: UserDB = Depends(get_admin_user) +): + """更新社区配送时段""" + try: + # 查询社区配送时段 + community_time_period = db.query(CommunityTimePeriodDB).filter( + CommunityTimePeriodDB.id == community_time_period_id + ).first() + + if not community_time_period: + return error_response(code=404, message="社区配送时段不存在") + + # 更新运力 + community_time_period.capacity = update_data.capacity + + db.commit() + db.refresh(community_time_period) + + return success_response(message="社区配送时段更新成功", data=CommunityTimePeriodInfo.model_validate(community_time_period)) + + except Exception as e: + db.rollback() + logging.exception(f"更新社区配送时段失败: {str(e)}") + return error_response(code=500, message="更新社区配送时段失败,请稍后重试") + +@router.delete("/{community_time_period_id}", response_model=ResponseModel) +async def delete_community_time_period( + community_time_period_id: int, + db: Session = Depends(get_db), + admin: UserDB = Depends(get_admin_user) +): + """删除社区配送时段""" + try: + # 查询社区配送时段 + community_time_period = db.query(CommunityTimePeriodDB).filter( + CommunityTimePeriodDB.id == community_time_period_id + ).first() + + if not community_time_period: + return error_response(code=404, message="社区配送时段不存在") + + # 删除 + db.delete(community_time_period) + db.commit() + + return success_response(message="社区配送时段删除成功") + + except Exception as e: + db.rollback() + logging.exception(f"删除社区配送时段失败: {str(e)}") + return error_response(code=500, message="删除社区配送时段失败,请稍后重试") + + +# 根据社区进行 group 的配送时段列表 +@router.get("/group_by_community", response_model=ResponseModel) +async def get_group_by_community( + skip: int = 0, + limit: int = 10, + db: Session = Depends(get_db) +): + """根据社区进行 group 的配送时段列表""" + try: + #1. 查询有配送时段数据的社区 + community_time_periods = db.query(CommunityTimePeriodDB).all() + communities = db.query(CommunityDB).offset(skip).limit(limit).all() + + #2. 查询社区的配送时段 + community_time_periods = db.query(CommunityTimePeriodDB, TimePeriodDB.name.label("time_period_name"),TimePeriodDB.from_time.label("time_period_from_time"), TimePeriodDB.to_time.label("time_period_to_time")).join(TimePeriodDB).filter( + CommunityTimePeriodDB.community_id.in_([community.id for community in communities]) + ).all() + + #3. 根据社区进行 group 的配送时段列表 + communities_with_time_periods = [] + for community in communities: + community_time_periods = [ctp for ctp in community_time_periods if ctp.CommunityTimePeriodDB.community_id == community.id] + communities_with_time_periods.append({ + "community_id": community.id, + "community_name": community.name, + "time_periods": [{ + "time_period_name": ctp.time_period_name, + "time_period_from_time": ctp.time_period_from_time, + "time_period_to_time": ctp.time_period_to_time, + "capacity": ctp.CommunityTimePeriodDB.capacity + } for ctp in community_time_periods] + }) + + return success_response(data=communities_with_time_periods) + + except Exception as e: + logging.exception(f"获取社区配送时段列表失败: {str(e)}") + return error_response(code=500, message="获取社区配送时段列表失败,请稍后重试") + + + + + + + + diff --git a/app/api/endpoints/order.py b/app/api/endpoints/order.py index 96ee488..fee25fb 100644 --- a/app/api/endpoints/order.py +++ b/app/api/endpoints/order.py @@ -334,7 +334,12 @@ async def create_order( # 添加到新订单队列 if db_order.address_community_id: - redis_client.push_order_to_queue(db_order.address_community_id, db_order.orderid) + background_tasks.add_task( + redis_client.push_order_to_queue, + db_order.address_community_id, + db_order.orderid, + db + ) return success_response( message="订单创建成功", @@ -1362,19 +1367,6 @@ async def deliveryman_order_summary( "yesterday_count": yesterday_total, "today_count": today_total }) - -@router.get("/deliveryman/new_orders_checked", response_model=ResponseModel) -async def deliveryman_new_orders_checked( - db: Session = Depends(get_db), - deliveryman: UserDB = Depends(get_deliveryman_user) -): - """获取新订单已检查标记""" - success = redis_client.remove_orders(deliveryman.community_id) - - return success_response(data={ - "message": "新订单已检查标记已移除", - "success": success - }) @router.get("/deliveryman/check_new_order", response_model=ResponseModel) @@ -1385,7 +1377,7 @@ async def deliveryman_check_new_orders( """检查新订单""" # 从Redis获取新订单ID列表 - order_ids = redis_client.get_orders_from_queue(deliveryman.community_id, 10) + order_ids = redis_client.pop_orders_from_queue(deliveryman.userid, deliveryman.community_id, 10) if not order_ids: return success_response(data={ diff --git a/app/core/redis_client.py b/app/core/redis_client.py index 89ec5bd..a7ba4ed 100644 --- a/app/core/redis_client.py +++ b/app/core/redis_client.py @@ -3,7 +3,8 @@ import json import logging from app.core.config import settings from typing import List, Dict, Any, Optional - +from sqlalchemy.orm import Session +from app.models.user import UserDB class RedisClient: """Redis 客户端""" @@ -32,11 +33,12 @@ class RedisClient: """获取 Redis 客户端""" return self.client - def push_order_to_queue(self, community_id: int, order_id: str) -> bool: + def push_order_to_queue(self, community_id: int, order_id: str, db: Session) -> bool: """ 添加新订单到社区队列 - Args: + Args: + user_id: 用户ID community_id: 社区ID order_id: 订单ID @@ -44,21 +46,25 @@ class RedisClient: bool: 是否添加成功 """ try: - key = f"community:{community_id}:new_orders" - # 使用 LPUSH 将订单ID添加到列表头部 - self.client.lpush(key, order_id) - # 设置过期时间为24小时 - self.client.expire(key, 86400) + # 查询所有社区的用户 + users = db.query(UserDB).filter(UserDB.community_id == community_id).all() + for user in users: + key = f"user:{user.userid}:community:{community_id}:new_orders" + # 使用 LPUSH 将订单ID添加到列表头部 + self.client.lpush(key, order_id) + # 设置过期时间为24小时 + self.client.expire(key, 86400) return True except Exception as e: logging.error(f"添加新订单到队列失败: {str(e)}") return False - def get_orders_from_queue(self, community_id: int, count: int = 10) -> List[str]: + def pop_orders_from_queue(self, user_id: int, community_id: int, count: int = 10) -> List[str]: """ 获取社区新订单列表 Args: + user_id: 用户ID community_id: 社区ID count: 获取数量,默认10个 @@ -66,32 +72,13 @@ class RedisClient: List[str]: 订单ID列表 """ try: - key = f"community:{community_id}:new_orders" - # 获取订单列表,并不移除 - orders = self.client.lrange(key, 0, count-1) + key = f"user:{user_id}:community:{community_id}:new_orders" + # 获取订单列表, + orders = self.client.lpop(key, count) return orders except Exception as e: logging.error(f"获取社区新订单列表失败: {str(e)}") return [] - - def remove_orders(self, community_id: int) -> bool: - """ - 从社区队列中移除订单 - - Args: - community_id: 社区ID - - Returns: - bool: 是否移除成功 - """ - try: - key = f"community:{community_id}:new_orders" - # 使用 ltrim 移除队列中的所有元素 - self.client.ltrim(key, 0, -1) - return True - except Exception as e: - logging.error(f"从队列中移除订单失败: {str(e)}") - return False # 创建全局实例 redis_client = RedisClient() \ No newline at end of file diff --git a/app/main.py b/app/main.py index 4b58098..586c72a 100644 --- a/app/main.py +++ b/app/main.py @@ -1,6 +1,6 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from app.api.endpoints import wechat,user, address, community, station, order, coupon, community_building, upload, merchant, merchant_product, merchant_order, point, config, merchant_category, log, account,merchant_pay_order, message, bank_card, withdraw, mp, point_product, point_product_order, coupon_activity, ocr, dashboard, wecom, feedback, timeperiod +from app.api.endpoints import wechat,user, address, community, station, order, coupon, community_building, upload, merchant, merchant_product, merchant_order, point, config, merchant_category, log, account,merchant_pay_order, message, bank_card, withdraw, mp, point_product, point_product_order, coupon_activity, ocr, dashboard, wecom, feedback, timeperiod, community_timeperiod from app.models.database import Base, engine from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse @@ -77,6 +77,7 @@ app.include_router(log.router, prefix="/api/logs", tags=["系统日志"]) app.include_router(ocr.router, prefix="/api/ai/ocr", tags=["图像识别"]) app.include_router(feedback.router, prefix="/api/feedback", tags=["反馈"]) app.include_router(timeperiod.router, prefix="/api/time-periods", tags=["配送时段"]) +app.include_router(community_timeperiod.router, prefix="/api/community-time-periods", tags=["社区配送时段"]) @app.get("/") async def root(): diff --git a/app/models/community_timeperiod.py b/app/models/community_timeperiod.py new file mode 100644 index 0000000..1ef97e7 --- /dev/null +++ b/app/models/community_timeperiod.py @@ -0,0 +1,56 @@ +from sqlalchemy import Column, Integer, ForeignKey, DateTime, UniqueConstraint +from sqlalchemy.sql import func +from pydantic import BaseModel, Field +from typing import Optional, List +from datetime import datetime +from .database import Base +from .timeperiod import TimePeriodInfo + +# 数据库模型 +class CommunityTimePeriodDB(Base): + """社区配送时段关联表""" + __tablename__ = "community_time_periods" + + id = Column(Integer, primary_key=True, autoincrement=True) + community_id = Column(Integer, ForeignKey("communities.id"), nullable=False, index=True) + time_period_id = Column(Integer, ForeignKey("time_periods.id"), nullable=False) + capacity = Column(Integer, default=0, nullable=False) # 运力(订单量),0表示无限制 + create_time = Column(DateTime(timezone=True), server_default=func.now()) + update_time = Column(DateTime(timezone=True), onupdate=func.now()) + + # 确保每个社区的每个时段只有一条记录 + __table_args__ = ( + UniqueConstraint('community_id', 'time_period_id', name='uix_community_time_period'), + ) + +# Pydantic 模型 +class CommunityTimePeriodCreate(BaseModel): + community_id: int = Field(..., description="社区ID") + time_period_id: int = Field(..., description="配送时段ID") + capacity: int = Field(0, description="运力(订单量),0表示无限制") + +class CommunityTimePeriodUpdate(BaseModel): + capacity: int = Field(..., description="运力(订单量),0表示无限制") + +class CommunityTimePeriodInfo(BaseModel): + id: int + community_id: int + time_period_id: int + capacity: int + create_time: datetime + update_time: Optional[datetime] = None + + class Config: + from_attributes = True + +class CommunityTimePeriodWithDetail(CommunityTimePeriodInfo): + time_period: Optional[TimePeriodInfo] = None + +class CommunityTimePeriodWithCapacity(BaseModel): + community_id: int + time_period_id: int + capacity: int + +class CommunityTimePeriodBatchCreate(BaseModel): + community_id: int = Field(..., description="社区ID") + time_period_capacity_list: List[CommunityTimePeriodWithCapacity] = Field(..., description="配送时段ID列表") \ No newline at end of file