from datetime import datetime, timedelta, timezone from typing import Optional from jose import JWTError, jwt from app.core.config import settings from fastapi import Response from passlib.context import CryptContext # 密码加密上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() to_encode = { "sub": data.get("phone") } if expires_delta: to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta}) else: to_encode.update({"exp": datetime.now(timezone.utc) + timedelta(days=180)}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256") return encoded_jwt def verify_token(token: str) -> Optional[str]: try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) sub: str = payload.get("sub") return sub except JWTError: return None def get_password_hash(password: str) -> str: """获取密码哈希值""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """验证密码""" return pwd_context.verify(plain_password, hashed_password) def decode_jwt(token: str) -> dict: """解码 JWT token 获取完整信息""" try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) return { "phone": payload.get("sub"), } except: return None