from datetime import datetime, timedelta, timezone from typing import Optional from jose import JWTError, jwt from app.core.config import settings from fastapi import Response from passlib.context import CryptContext # 密码加密上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() if expires_delta is not None: expire = datetime.now(timezone.utc) + expires_delta to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256") return encoded_jwt def set_jwt_cookie(response: Response, token: str): """设置JWT cookie""" response.set_cookie( key="access_token", value=f"Bearer {token}", httponly=True, # 防止JavaScript访问 secure=not settings.DEBUG, # 生产环境使用HTTPS samesite="lax", # CSRF保护 max_age=None if settings.ACCESS_TOKEN_EXPIRE_MINUTES is None else settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) def clear_jwt_cookie(response: Response): """清除JWT cookie""" response.delete_cookie( key="access_token", httponly=True, secure=not settings.DEBUG, samesite="lax" ) def verify_token(token: str) -> Optional[str]: try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) phone: str = payload.get("sub") if phone is None: return None return phone except JWTError: return None def get_password_hash(password: str) -> str: """获取密码哈希值""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """验证密码""" return pwd_context.verify(plain_password, hashed_password)