import redis import json import logging from app.core.config import settings from typing import List, Dict, Any, Optional from sqlalchemy.orm import Session from app.models.user import UserDB from datetime import datetime from app.models.user import UserRole class RedisClient: """Redis 客户端""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(RedisClient, cls).__new__(cls) try: cls._instance.client = redis.Redis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB, password=settings.REDIS_PASSWORD, decode_responses=True # 自动解码为字符串 ) # 测试连接 cls._instance.client.ping() logging.info("Redis 连接成功") except Exception as e: logging.error(f"Redis 连接失败: {str(e)}") cls._instance.client = None return cls._instance def get_client(self) -> redis.Redis: """获取 Redis 客户端""" return self.client def push_order_to_community_period_today_queue(self, community_time_period_id: int, order_id: str) -> bool: """ 添加新订单到今日队列 """ try: today_date_str = datetime.now().strftime("%Y-%m-%d") key = f"community_period:{community_time_period_id}:today_orders:{today_date_str}" self.client.lpush(key, order_id) # 设置过期时间为24小时 self.client.expire(key, 86400) return True except Exception as e: logging.error(f"添加新订单到今日队列失败: {str(e)}") return False def get_community_period_today_orders_count(self, community_time_period_id: int) -> int: """ 获取今日队列中的订单数量 """ try: today_date_str = datetime.now().strftime("%Y-%m-%d") key = f"community_period:{community_time_period_id}:today_orders:{today_date_str}" return self.client.llen(key) except Exception as e: logging.error(f"获取今日队列中的订单数量失败: {str(e)}") return 0 def push_order_to_queue(self, community_id: int, order_id: str, db: Session) -> bool: """ 添加新订单到社区队列 Args: user_id: 用户ID community_id: 社区ID order_id: 订单ID Returns: bool: 是否添加成功 """ try: # 查询所有社区的用户 users = db.query(UserDB).filter(UserDB.community_id == community_id, UserDB.roles == UserRole.DELIVERYMAN).all() for user in users: key = f"user:{user.userid}:community:{community_id}:new_orders" # 使用 LPUSH 将订单ID添加到列表头部 self.client.lpush(key, order_id) # 设置过期时间为24小时 self.client.expire(key, 86400) return True except Exception as e: logging.error(f"添加新订单到队列失败: {str(e)}") return False def pop_orders_from_queue(self, user_id: int, community_id: int, count: int = 10) -> List[str]: """ 获取社区新订单列表 Args: user_id: 用户ID community_id: 社区ID count: 获取数量,默认10个 Returns: List[str]: 订单ID列表 """ try: key = f"user:{user_id}:community:{community_id}:new_orders" # 获取订单列表, orders = self.client.lpop(key, count) return orders except Exception as e: logging.error(f"获取社区新订单列表失败: {str(e)}") return [] # 创建全局实例 redis_client = RedisClient()