deliveryman-api/app/core/redis_client.py
2025-03-12 00:32:34 +08:00

123 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import redis
import json
import logging
from app.core.config import settings
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from app.models.user import UserDB
from datetime import datetime
from app.models.user import UserRole
from datetime import date
class RedisClient:
"""Redis 客户端"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(RedisClient, cls).__new__(cls)
try:
cls._instance.client = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB,
password=settings.REDIS_PASSWORD,
decode_responses=True # 自动解码为字符串
)
# 测试连接
cls._instance.client.ping()
logging.info("Redis 连接成功")
except Exception as e:
logging.error(f"Redis 连接失败: {str(e)}")
cls._instance.client = None
return cls._instance
def get_client(self) -> redis.Redis:
"""获取 Redis 客户端"""
return self.client
def push_order_to_community_period_queue(self, delivery_date: date,community_time_period_id: int, order_id: str) -> bool:
"""
添加新订单到今日队列
"""
try:
today_date_str = delivery_date.strftime("%Y-%m-%d")
key = f"community_period:{community_time_period_id}:today_orders:{today_date_str}"
self.client.lpush(key, order_id)
# 设置过期时间为24小时
self.client.expire(key, 86400)
return True
except Exception as e:
logging.error(f"添加新订单到今日队列失败: {str(e)}")
return False
def get_community_period_orders_count(self, delivery_date: date, community_time_period_id: int) -> int:
"""
获取今日队列中的订单数量
"""
try:
today_date_str = delivery_date.strftime("%Y-%m-%d")
key = f"community_period:{community_time_period_id}:today_orders:{today_date_str}"
return self.client.llen(key)
except Exception as e:
logging.error(f"获取今日队列中的订单数量失败: {str(e)}")
return 0
def push_new_order_to_queue(self, community_id: int, order_id: str, db: Session) -> bool:
"""
添加新订单到社区队列
Args:
user_id: 用户ID
community_id: 社区ID
order_id: 订单ID
Returns:
bool: 是否添加成功
"""
try:
# 查询所有社区的用户
users = db.query(UserDB).filter(UserDB.community_id == community_id).all()
for user in users:
key = f"deliveryman:{user.userid}:new_orders"
# 使用 LPUSH 将订单ID添加到列表头部
self.client.lpush(key, order_id)
# 设置过期时间为24小时
self.client.expire(key, 86400)
return True
except Exception as e:
logging.error(f"添加新订单到队列失败: {str(e)}")
return False
def pop_new_orders_from_queue(self, user_id: int) -> List[str]:
"""
获取社区新订单列表
Args:
user_id: 用户ID
community_id: 社区ID
count: 获取数量默认10个
Returns:
List[str]: 订单ID列表
"""
try:
key = f"deliveryman:{user_id}:new_orders"
# 获取所有订单号
orders = []
while True:
order = self.client.lpop(key)
if order is None:
break
orders.append(order)
return orders
except Exception as e:
logging.error(f"获取社区新订单列表失败: {str(e)}")
return []
# 创建全局实例
redis_client = RedisClient()