import redis import json import logging from app.core.config import settings from typing import List, Dict, Any, Optional from sqlalchemy.orm import Session from app.models.user import UserDB class RedisClient: """Redis 客户端""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(RedisClient, cls).__new__(cls) try: cls._instance.client = redis.Redis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB, password=settings.REDIS_PASSWORD, decode_responses=True # 自动解码为字符串 ) # 测试连接 cls._instance.client.ping() logging.info("Redis 连接成功") except Exception as e: logging.error(f"Redis 连接失败: {str(e)}") cls._instance.client = None return cls._instance def get_client(self) -> redis.Redis: """获取 Redis 客户端""" return self.client def push_order_to_queue(self, community_id: int, order_id: str, db: Session) -> bool: """ 添加新订单到社区队列 Args: user_id: 用户ID community_id: 社区ID order_id: 订单ID Returns: bool: 是否添加成功 """ try: # 查询所有社区的用户 users = db.query(UserDB).filter(UserDB.community_id == community_id).all() for user in users: key = f"user:{user.userid}:community:{community_id}:new_orders" # 使用 LPUSH 将订单ID添加到列表头部 self.client.lpush(key, order_id) # 设置过期时间为24小时 self.client.expire(key, 86400) return True except Exception as e: logging.error(f"添加新订单到队列失败: {str(e)}") return False def pop_orders_from_queue(self, user_id: int, community_id: int, count: int = 10) -> List[str]: """ 获取社区新订单列表 Args: user_id: 用户ID community_id: 社区ID count: 获取数量,默认10个 Returns: List[str]: 订单ID列表 """ try: key = f"user:{user_id}:community:{community_id}:new_orders" # 获取订单列表, orders = self.client.lpop(key, count) return orders except Exception as e: logging.error(f"获取社区新订单列表失败: {str(e)}") return [] # 创建全局实例 redis_client = RedisClient()