"""Cookie-based access tokens for the admin console.

A console token is a signed JWT whose only claims are ``scope``, ``exp`` and
``iat``. It carries no subject or token id, so every valid token grants the
same access and nothing in this module can revoke a single token before it
expires.
"""

from datetime import datetime, timedelta
from typing import Dict

from fastapi import Cookie, HTTPException
from jose import JWTError, jwt

from app.config import get_settings

# Name of the cookie that carries the console JWT.
CONSOLE_AUTH_COOKIE = "console_access_token"


def create_console_access_token() -> str:
    """Mint a signed console-access JWT.

    The lifetime comes from ``settings.console_access_expire_days``: a missing
    or zero value falls back to 30 days, and anything below one day is raised
    to one day.

    Returns:
        The encoded JWT, signed with ``settings.secret_key`` using
        ``settings.jwt_algorithm``.
    """
    cfg = get_settings()
    lifetime_days = max(1, int(cfg.console_access_expire_days or 30))
    expires_at = datetime.utcnow() + timedelta(days=lifetime_days)
    claims = {
        "scope": "console_access",
        "exp": expires_at,
        "iat": datetime.utcnow(),
    }
    return jwt.encode(claims, cfg.secret_key, algorithm=cfg.jwt_algorithm)


def verify_console_access_token(token: str) -> Dict:
    """Validate a console JWT and return its claims.

    Args:
        token: The encoded JWT taken from the console cookie.

    Returns:
        The decoded claims.

    Raises:
        HTTPException: 401 when decoding fails for any reason (bad signature,
            malformed token, expired token, ...) or when the ``scope`` claim is
            not ``"console_access"``.
    """
    cfg = get_settings()
    try:
        # Only the configured algorithm is accepted, so the token header
        # cannot select a different one.
        # NOTE(review): an "exp" claim is checked when present; nothing here
        # requires it to be present. Confirm whether that matters for tokens
        # signed with the same key elsewhere.
        claims = jwt.decode(
            token,
            cfg.secret_key,
            algorithms=[cfg.jwt_algorithm],
        )
    except JWTError as exc:
        # Every decode failure gets the same message ("console access has
        # expired, please log in again"), so the response does not reveal
        # why the token was rejected.
        raise HTTPException(
            status_code=401,
            detail="总控台访问已失效,请重新登录",
        ) from exc

    # A correctly signed token minted for another purpose must not open the
    # console; the scope claim is what tells the two apart.
    scope_ok = payload_scope_matches = claims.get("scope") == "console_access"
    if not scope_ok:
        # Message: "console access credential is invalid".
        raise HTTPException(status_code=401, detail="总控台访问凭证无效")
    return claims


def require_console_access(
    console_access_token: str | None = Cookie(default=None, alias=CONSOLE_AUTH_COOKIE),
) -> Dict:
    """FastAPI dependency that gates a route on a valid console cookie.

    Args:
        console_access_token: Value of the ``CONSOLE_AUTH_COOKIE`` cookie, or
            ``None`` when the request does not carry it.

    Returns:
        The decoded token claims.

    Raises:
        HTTPException: 401 when the cookie is absent or empty, or when the
            token fails verification.
    """
    if console_access_token:
        return verify_console_access_token(console_access_token)
    # Message: "please log in to the console first".
    raise HTTPException(status_code=401, detail="请先登录总控台")