astock-agent/backend/app/core/deps.py
2026-04-08 00:28:01 +08:00

58 lines
1.7 KiB
Python

"""FastAPI 认证依赖注入"""
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy import select
from app.core.auth import decode_access_token
from app.db.database import get_db
from app.db.tables import users_table
# Shared HTTP Bearer security scheme; get_current_user depends on it to
# receive the credentials sent in the request's Authorization header.
security = HTTPBearer()
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response that always carries the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Extract and validate the user from an Authorization Bearer token.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        The matching row of ``users_table`` converted to a plain ``dict``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, when its
            ``sub`` claim is missing or not an integer, or when the user
            does not exist or is not active.
    """
    payload = decode_access_token(credentials.credentials)
    if payload is None:
        raise _unauthorized("Token 无效或已过期")

    raw_user_id = payload.get("sub")
    if raw_user_id is None:
        raise _unauthorized("Token 格式错误")

    # A non-numeric "sub" claim is a malformed token, not a server error:
    # convert defensively so the client gets a 401 instead of a 500.
    try:
        user_id = int(raw_user_id)
    except (TypeError, ValueError, OverflowError):
        raise _unauthorized("Token 格式错误") from None

    async with get_db() as db:
        result = await db.execute(
            select(users_table).where(users_table.c.id == user_id)
        )
        user = result.mappings().first()

    if user is None or not user["is_active"]:
        raise _unauthorized("用户不存在或已禁用")

    return dict(user)
async def get_current_admin(
    current_user: dict = Depends(get_current_user),
) -> dict:
    """Require that the authenticated user has the admin role.

    Args:
        current_user: User mapping resolved by ``get_current_user``.

    Returns:
        The same ``current_user`` mapping, unchanged, when its ``role``
        is ``"admin"``.

    Raises:
        HTTPException: 403 when the user's ``role`` is anything else.
    """
    has_admin_role = current_user["role"] == "admin"
    if has_admin_role:
        return current_user

    forbidden = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="需要管理员权限",
    )
    raise forbidden