from datetime import datetime, timedelta, timezone from typing import Optional from jose import JWTError, jwt from app.core.config import settings from fastapi import Response from passlib.context import CryptContext # 密码加密上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() # 更新 token 数据,不设置过期时间 to_encode.update({ "userid": data.get("userid"), "phone": data.get("phone") }) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm="HS256") return encoded_jwt def set_jwt_cookie(response: Response, token: str): """设置JWT cookie""" response.set_cookie( key="access_token", value=f"Bearer {token}", httponly=True, # 防止JavaScript访问 # secure=not settings.DEBUG, # 生产环境使用HTTPS samesite="lax", # CSRF保护 max_age=None if settings.ACCESS_TOKEN_EXPIRE_MINUTES is None else settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) def clear_jwt_cookie(response: Response): """清除JWT cookie""" response.delete_cookie( key="access_token", httponly=True, # secure=not settings.DEBUG, samesite="lax" ) def verify_token(token: str) -> Optional[str]: try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) phone: str = payload.get("phone") if phone is None: return None return phone except JWTError: return None def get_password_hash(password: str) -> str: """获取密码哈希值""" return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """验证密码""" return pwd_context.verify(plain_password, hashed_password) def decode_jwt(token: str) -> dict: """解码 JWT token 获取完整信息""" try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"]) return { "userid": payload.get("userid"), "phone": payload.get("phone") } except: return None