deliveryman-api/app/core/security.py
2025-03-06 15:04:10 +08:00

73 lines
2.2 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from app.core.config import settings
from fastapi import Response
from passlib.context import CryptContext
# Password hashing context; bcrypt is the only scheme configured here.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed HS256 JWT whose subject is the caller's phone number.

    Only ``data["phone"]`` is carried into the token (as the ``sub`` claim);
    every other key in ``data`` is ignored.

    Args:
        data: Mapping expected to contain a ``"phone"`` entry. When the key
            is absent the ``sub`` claim is set to ``None``.
        expires_delta: Lifetime of the token. When omitted (or falsy, e.g. a
            zero ``timedelta``) the token expires 180 days from now.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY``.
    """
    # A falsy delta falls back to the 180-day default, as before.
    lifetime = expires_delta or timedelta(days=180)
    claims = {
        "sub": data.get("phone"),
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    # The claims are deliberately not printed: they contain the user's phone
    # number and should not end up on stdout.
    return jwt.encode(claims, settings.SECRET_KEY, algorithm="HS256")
def set_jwt_cookie(response: Response, token: str):
    """Store the JWT on the response as the ``access_token`` cookie.

    The cookie lifetime is ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` converted
    to seconds; when that setting is ``None`` no ``max_age`` is set.
    """
    expire_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES
    if expire_minutes is None:
        lifetime_seconds = None
    else:
        lifetime_seconds = expire_minutes * 60

    # NOTE: the ``secure`` flag is not passed here.
    response.set_cookie(
        key="access_token",
        value=token,
        httponly=True,  # keep the cookie out of reach of JavaScript
        samesite="lax",  # CSRF protection
        max_age=lifetime_seconds,
    )
def clear_jwt_cookie(response: Response):
    """Remove the ``access_token`` cookie from the client via the response."""
    # Same attributes that are used when the cookie is set; the ``secure``
    # flag is not passed here either.
    cookie_attrs = {
        "key": "access_token",
        "httponly": True,
        "samesite": "lax",
    }
    response.delete_cookie(**cookie_attrs)
def verify_token(token: str) -> Optional[str]:
    """Validate a JWT and return its subject.

    Args:
        token: Encoded JWT, checked against ``settings.SECRET_KEY`` using
            HS256.

    Returns:
        The token's ``sub`` claim, or ``None`` when decoding raises
        ``JWTError`` or the payload has no ``sub`` entry.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
    except JWTError:
        return None
    # The decoded payload is intentionally not printed: it holds user data.
    return payload.get("sub")
def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash via ``pwd_context``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def decode_jwt(token: str) -> Optional[dict]:
    """Decode a JWT and return the information it carries.

    Args:
        token: Encoded JWT, checked against ``settings.SECRET_KEY`` using
            HS256.

    Returns:
        ``{"phone": <sub claim>}`` when the token decodes successfully, or
        ``None`` when decoding fails for any reason.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
    except Exception:
        # Keep the best-effort "None on any failure" contract, but no longer
        # swallow KeyboardInterrupt/SystemExit as the bare ``except:`` did.
        return None
    return {"phone": payload.get("sub")}