from datetime import datetime, timedelta, timezone from typing import Optional from jose import JWTError, jwt from app.core.config import settings from fastapi import Response from passlib.context import CryptContext # 密码加密上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() to_encode = { "sub": data.get("phone") } if expires_delta: to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta}) else: to_encode.update({"exp": datetime.now(timezone.utc) + timedelta(days=180)}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256") return encoded_jwt def set_jwt_cookie(response: Response, token: str): """设置JWT cookie""" response.set_cookie( key="access_token", value=token, httponly=True, # 防止JavaScript访问 secure=not settings.DEBUG, # 生产环境使用HTTPS samesite="Lax", # CSRF保护 expires=datetime.now(timezone.utc) + timedelta(days=180), max_age=180*24*60*60 # 30天的秒数 ) def clear_jwt_cookie(response: Response): """清除JWT cookie""" response.delete_cookie( key="access_token", httponly=True, # secure=not settings.DEBUG, samesite="lax" ) def verify_token(token: str) -> Optional[str]: try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) sub: str = payload.get("sub") print(f"payload: {payload}") return sub except JWTError: return None def get_password_hash(password: str) -> str: """获取密码哈希值""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """验证密码""" return pwd_context.verify(plain_password, hashed_password) def decode_jwt(token: str) -> dict: """解码 JWT token 获取完整信息""" try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) return { "phone": payload.get("sub"), } except: return None