import redis import json import logging from app.core.config import settings from typing import List, Dict, Any, Optional class RedisClient: """Redis 客户端""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(RedisClient, cls).__new__(cls) try: cls._instance.client = redis.Redis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB, password=settings.REDIS_PASSWORD, decode_responses=True # 自动解码为字符串 ) # 测试连接 cls._instance.client.ping() logging.info("Redis 连接成功") except Exception as e: logging.error(f"Redis 连接失败: {str(e)}") cls._instance.client = None return cls._instance def get_client(self) -> redis.Redis: """获取 Redis 客户端""" return self.client def push_order_to_queue(self, community_id: int, order_id: str) -> bool: """ 添加新订单到社区队列 Args: community_id: 社区ID order_id: 订单ID Returns: bool: 是否添加成功 """ try: key = f"community:{community_id}:new_orders" # 使用 LPUSH 将订单ID添加到列表头部 self.client.lpush(key, order_id) # 设置过期时间为24小时 self.client.expire(key, 86400) return True except Exception as e: logging.error(f"添加新订单到队列失败: {str(e)}") return False def get_orders_from_queue(self, community_id: int, count: int = 10) -> List[str]: """ 获取社区新订单列表 Args: community_id: 社区ID count: 获取数量,默认10个 Returns: List[str]: 订单ID列表 """ try: key = f"community:{community_id}:new_orders" # 获取订单列表,并不移除 orders = self.client.lrange(key, 0, count-1) return orders except Exception as e: logging.error(f"获取社区新订单列表失败: {str(e)}") return [] def remove_orders(self, community_id: int) -> bool: """ 从社区队列中移除订单 Args: community_id: 社区ID Returns: bool: 是否移除成功 """ try: key = f"community:{community_id}:new_orders" # 使用 ltrim 移除队列中的所有元素 self.client.ltrim(key, 0, -1) return True except Exception as e: logging.error(f"从队列中移除订单失败: {str(e)}") return False # 创建全局实例 redis_client = RedisClient()