40 lines
1.3 KiB
Python
40 lines
1.3 KiB
Python
from datetime import datetime, timedelta
|
|
from typing import Dict
|
|
|
|
from fastapi import Cookie, HTTPException
|
|
from jose import JWTError, jwt
|
|
|
|
from app.config import get_settings
|
|
|
|
|
|
# Name of the cookie that carries the console access token; used as the
# cookie alias by require_console_access.
CONSOLE_AUTH_COOKIE = "console_access_token"
|
|
|
|
|
|
def create_console_access_token() -> str:
    """Issue a signed JWT that grants access to the console.

    The token carries three claims: ``scope`` (always ``"console_access"``),
    ``iat`` (issue time) and ``exp`` (expiry time). The lifetime is
    ``settings.console_access_expire_days`` days; a falsy value (``None``,
    ``0``, empty) falls back to 30, and the result is clamped to at least
    one day.

    Returns:
        The encoded token, signed with ``settings.secret_key`` using
        ``settings.jwt_algorithm``.
    """
    # Imported locally because the module-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    settings = get_settings()
    lifetime_days = max(1, int(settings.console_access_expire_days or 30))

    # Read the clock once so ``exp - iat`` is exactly the configured lifetime,
    # and use an aware UTC datetime instead of the deprecated ``utcnow()``.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "scope": "console_access",
        "exp": issued_at + timedelta(days=lifetime_days),
        "iat": issued_at,
    }
    return jwt.encode(payload, settings.secret_key, algorithm=settings.jwt_algorithm)
|
|
|
|
|
|
def verify_console_access_token(token: str) -> Dict:
    """Decode a console access token and return its claims.

    The token is decoded with ``settings.secret_key`` and is only accepted
    when signed with ``settings.jwt_algorithm``. Its ``scope`` claim must
    equal ``"console_access"``.

    Args:
        token: The encoded JWT string to check.

    Returns:
        The decoded claims dictionary.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError``, or when the
            ``scope`` claim is not ``"console_access"``.
    """
    cfg = get_settings()

    try:
        signing_key = cfg.secret_key
        accepted_algorithms = [cfg.jwt_algorithm]
        claims = jwt.decode(token, signing_key, algorithms=accepted_algorithms)
    except JWTError as decode_error:
        # Chain the decoding failure so the root cause stays in the traceback.
        raise HTTPException(
            status_code=401,
            detail="总控台访问已失效,请重新登录",
        ) from decode_error

    # A token that decodes correctly but carries another scope is rejected.
    if claims.get("scope") == "console_access":
        return claims
    raise HTTPException(status_code=401, detail="总控台访问凭证无效")
|
|
|
|
|
|
def require_console_access(
    console_access_token: str | None = Cookie(default=None, alias=CONSOLE_AUTH_COOKIE),
) -> Dict:
    """Require a valid console access token taken from the request cookie.

    The token is read from the cookie named by ``CONSOLE_AUTH_COOKIE`` and
    defaults to ``None`` when the cookie is absent.

    Args:
        console_access_token: The cookie value, or ``None`` if not supplied.

    Returns:
        The claims returned by ``verify_console_access_token``.

    Raises:
        HTTPException: 401 when the cookie is missing or empty; any
            exception raised by ``verify_console_access_token`` propagates.
    """
    if console_access_token:
        return verify_console_access_token(console_access_token)

    # Missing or empty cookie: the caller has not logged in to the console.
    raise HTTPException(status_code=401, detail="请先登录总控台")
|