94 lines
2.5 KiB
Python
94 lines
2.5 KiB
Python
"""
|
||
JWT令牌服务
|
||
"""
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Dict
|
||
from jose import JWTError, jwt
|
||
from app.config import get_settings
|
||
from app.utils.logger import logger
|
||
|
||
|
||
class JWTService:
|
||
"""JWT服务类"""
|
||
|
||
def __init__(self):
|
||
"""初始化JWT服务"""
|
||
settings = get_settings()
|
||
self.secret_key = settings.secret_key
|
||
self.algorithm = settings.jwt_algorithm
|
||
self.expire_days = settings.jwt_expire_days
|
||
|
||
def create_access_token(self, user_id: int, phone: str) -> str:
|
||
"""
|
||
创建访问令牌
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
phone: 手机号
|
||
|
||
Returns:
|
||
JWT token
|
||
"""
|
||
expire = datetime.utcnow() + timedelta(days=self.expire_days)
|
||
to_encode = {
|
||
"sub": str(user_id),
|
||
"phone": phone,
|
||
"exp": expire,
|
||
"iat": datetime.utcnow()
|
||
}
|
||
|
||
try:
|
||
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
|
||
logger.info(f"创建JWT token成功: user_id={user_id}")
|
||
return encoded_jwt
|
||
except Exception as e:
|
||
logger.error(f"创建JWT token失败: {e}")
|
||
raise
|
||
|
||
def verify_token(self, token: str) -> Dict:
|
||
"""
|
||
验证令牌
|
||
|
||
Args:
|
||
token: JWT token
|
||
|
||
Returns:
|
||
解码后的payload
|
||
|
||
Raises:
|
||
ValueError: token无效或过期
|
||
"""
|
||
try:
|
||
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
|
||
user_id = payload.get("sub")
|
||
if user_id is None:
|
||
raise ValueError("Token中缺少用户ID")
|
||
return payload
|
||
except JWTError as e:
|
||
logger.warning(f"Token验证失败: {e}")
|
||
raise ValueError(f"Token验证失败: {str(e)}")
|
||
except Exception as e:
|
||
logger.error(f"Token解析异常: {e}")
|
||
raise ValueError(f"Token解析异常: {str(e)}")
|
||
|
||
def decode_token_without_verification(self, token: str) -> Optional[Dict]:
|
||
"""
|
||
不验证签名解码token(用于调试)
|
||
|
||
Args:
|
||
token: JWT token
|
||
|
||
Returns:
|
||
解码后的payload或None
|
||
"""
|
||
try:
|
||
payload = jwt.decode(token, options={"verify_signature": False})
|
||
return payload
|
||
except Exception as e:
|
||
logger.error(f"Token解码失败: {e}")
|
||
return None
|
||
|
||
|
||
# 创建全局实例
|
||
jwt_service = JWTService()
|