""" JWT令牌服务 """ from datetime import datetime, timedelta from typing import Optional, Dict from jose import JWTError, jwt from app.config import get_settings from app.utils.logger import logger class JWTService: """JWT服务类""" def __init__(self): """初始化JWT服务""" settings = get_settings() self.secret_key = settings.secret_key self.algorithm = settings.jwt_algorithm self.expire_days = settings.jwt_expire_days def create_access_token(self, user_id: int, phone: str) -> str: """ 创建访问令牌 Args: user_id: 用户ID phone: 手机号 Returns: JWT token """ expire = datetime.utcnow() + timedelta(days=self.expire_days) to_encode = { "sub": str(user_id), "phone": phone, "exp": expire, "iat": datetime.utcnow() } try: encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm) logger.info(f"创建JWT token成功: user_id={user_id}") return encoded_jwt except Exception as e: logger.error(f"创建JWT token失败: {e}") raise def verify_token(self, token: str) -> Dict: """ 验证令牌 Args: token: JWT token Returns: 解码后的payload Raises: ValueError: token无效或过期 """ try: payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm]) user_id = payload.get("sub") if user_id is None: raise ValueError("Token中缺少用户ID") return payload except JWTError as e: logger.warning(f"Token验证失败: {e}") raise ValueError(f"Token验证失败: {str(e)}") except Exception as e: logger.error(f"Token解析异常: {e}") raise ValueError(f"Token解析异常: {str(e)}") def decode_token_without_verification(self, token: str) -> Optional[Dict]: """ 不验证签名解码token(用于调试) Args: token: JWT token Returns: 解码后的payload或None """ try: payload = jwt.decode(token, options={"verify_signature": False}) return payload except Exception as e: logger.error(f"Token解码失败: {e}") return None # 创建全局实例 jwt_service = JWTService()