api/app/api/deps.py
2025-04-09 21:45:42 +08:00

62 lines
1.8 KiB
Python

from typing import Generator, Optional
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.ext.asyncio import AsyncSession
from app.core import security
from app.core.config import settings
from app.models.users import User
from app.schemas.user import User as UserSchema
from app.core.security import verify_token
from app.db.database import AsyncSessionLocal
from sqlalchemy import select
import logging
# OAuth2 password-bearer scheme; tokenUrl points at the endpoint that issues JWT access tokens.
reusable_oauth2 = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)
# Module-level logger named after this module; its level is explicitly set to DEBUG here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
async def get_db():
    """Yield a database session for use as a FastAPI dependency.

    A new session is created from ``AsyncSessionLocal`` and handed to the
    caller; when the caller is done (normally or via an exception) the
    ``async with`` block exits and releases the session.

    Yields:
        The session object produced by ``AsyncSessionLocal()``.
    """
    # NOTE(review): assumes AsyncSessionLocal is a SQLAlchemy async session
    # factory, whose context manager already closes the session on exit.
    # Under that assumption an additional explicit ``session.close()`` in a
    # ``finally`` clause is a redundant second close, so it is omitted.
    async with AsyncSessionLocal() as session:
        yield session
def _extract_access_token(request: Request) -> Optional[str]:
    """Return the access token carried by *request*, or ``None`` if absent.

    The ``Authorization`` header takes precedence: when it is present, the
    second space-separated field is used as the token. When the header is
    missing, the token is read from ``request.session["access_token"]``.
    """
    auth_header = request.headers.get("Authorization")
    if auth_header:
        # Expected shape is "<scheme> <token>". A header with no second field
        # (e.g. just "Bearer") is treated as "no token" instead of letting an
        # IndexError escape and surface as a 500 response.
        fields = auth_header.split(" ")
        return fields[1] if len(fields) > 1 else None
    return request.session.get("access_token")


async def get_current_user(
    request: Request,
    db: AsyncSession = Depends(get_db)
) -> UserSchema:
    """Resolve the currently authenticated user for a request.

    The access token is taken from the ``Authorization`` header or, failing
    that, from the session. It is passed to ``verify_token``; the value that
    returns is matched against ``User.openid`` to load the user row.

    Args:
        request: Incoming request; its headers and session are inspected.
        db: Database session used for the user lookup.

    Returns:
        The first ``User`` row whose ``openid`` equals the verified subject.
        NOTE(review): this is the ORM ``User`` object selected from the
        database, although the signature is annotated with the schema type.

    Raises:
        HTTPException: 401 when no token is supplied, the ``Authorization``
            header is malformed, token verification or the lookup raises,
            the verified subject is empty, or no matching user exists.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="未提供认证信息",
        headers={"WWW-Authenticate": "Bearer"},
    )

    access_token = _extract_access_token(request)
    if not access_token:
        raise unauthorized

    # Only the operations that can fail unexpectedly live inside the try, so
    # the ordinary "no such user" outcome below is not caught and logged as
    # an error by this function's own handler.
    user = None
    try:
        sub = verify_token(access_token)
        if sub:
            result = await db.execute(select(User).filter(User.openid == sub))
            user = result.scalars().first()
    except Exception as e:
        # Failures are still reported to the client as 401 (as before), but
        # the traceback is recorded so verification/DB errors stay diagnosable.
        logger.exception(f"获取当前登录用户失败: {e}")
        raise unauthorized from e

    if user:
        return user
    raise unauthorized