37 lines
1.3 KiB
Python
37 lines
1.3 KiB
Python
from fastapi import Depends, HTTPException, Header, Cookie
|
||
from typing import Optional
|
||
from sqlalchemy.orm import Session
|
||
from app.models.database import get_db
|
||
from app.models.user import UserDB
|
||
from app.core.security import verify_token
|
||
|
||
async def get_current_user(
|
||
authorization: Optional[str] = Header(None),
|
||
access_token: Optional[str] = Cookie(None),
|
||
db: Session = Depends(get_db)
|
||
) -> UserDB:
|
||
# 优先使用Header中的token,其次使用Cookie中的token
|
||
token = None
|
||
if authorization and authorization.startswith("Bearer "):
|
||
token = authorization.split(" ")[1]
|
||
elif access_token and access_token.startswith("Bearer "):
|
||
token = access_token.split(" ")[1]
|
||
|
||
if not token:
|
||
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
|
||
|
||
phone = verify_token(token)
|
||
if not phone:
|
||
raise HTTPException(status_code=401, detail="Token已过期或无效")
|
||
|
||
user = db.query(UserDB).filter(UserDB.phone == phone).first()
|
||
if not user:
|
||
raise HTTPException(status_code=401, detail="用户未登录")
|
||
return user
|
||
|
||
async def get_admin_user(
|
||
current_user: UserDB = Depends(get_current_user)
|
||
) -> UserDB:
|
||
if not current_user.is_admin:
|
||
raise HTTPException(status_code=403, detail="需要管理员权限")
|
||
return current_user |