hku-class/backend/app/api/auth.py
2026-04-27 15:40:24 +08:00

131 lines
4.7 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.auth import hash_password, verify_password, create_access_token
from app.core.deps import get_current_user
from app.db.database import get_db
from app.db.models import ClassMembership, User
from app.schemas.auth import (
ChangePasswordRequest,
InviteCodeClassPreview,
LoginRequest,
RegisterRequest,
)
from app.schemas.user import TokenResponse, UserOut, build_user_out
from app.services.member_activation_service import get_class_by_invite_code, validate_registration
# Router for the authentication endpoints; every route registered on it is
# served under the /api/auth prefix and tagged "auth".
router = APIRouter(prefix="/api/auth", tags=["auth"])
@router.get("/invite-code/{invite_code}", response_model=InviteCodeClassPreview)
async def preview_invite_code_class(invite_code: str, db: AsyncSession = Depends(get_db)):
    """Return a short preview of the class that an invite code points to.

    The code is stripped of surrounding whitespace and upper-cased before it
    is handed to ``get_class_by_invite_code``.

    Raises:
        HTTPException: 404 when the lookup returns ``None``.
    """
    normalized_code = invite_code.strip().upper()
    matched_class = await get_class_by_invite_code(db, normalized_code)
    if matched_class is not None:
        return InviteCodeClassPreview(
            id=matched_class.id,
            name=matched_class.name,
            cohort_year=matched_class.cohort_year,
        )
    raise HTTPException(status_code=404, detail="邀请码无效")
@router.post("/activate")
async def activate_account(req: RegisterRequest, db: AsyncSession = Depends(get_db)):
    """Activate an account from an invite code + student id and log the user in.

    Flow:
      1. Reject the request if a user with ``req.email`` already exists.
      2. Resolve the invite code and student id through
         ``validate_registration``, which yields a ``(user, class_id)`` pair
         or ``None``.
      3. Store the email and password hash, mark the user ``"approved"``,
         commit, reload the user with memberships, and issue an access token.

    Returns:
        A dict with the keys ``message``, ``token`` and ``user``.

    Raises:
        HTTPException: 400 when the email is already registered, or when
            ``validate_registration`` returns ``None``.
    """
    # 1. Check if email is already in use
    existing = await db.execute(select(User).where(User.email == req.email))
    if existing.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="该邮箱已注册")

    # 2. Validate invite_code + student_id against inactive class member.
    # The code is normalized exactly like the invite-code preview endpoint
    # does (strip + upper-case), so a code that previews successfully is not
    # rejected here just because of casing or surrounding whitespace.
    invite_code = req.invite_code.strip().upper()
    activation_target = await validate_registration(db, invite_code, req.student_id)
    if activation_target is None:
        raise HTTPException(status_code=400, detail="邀请码或学号无效,或账号已激活")

    user, class_id = activation_target
    user.email = req.email
    user.password_hash = hash_password(req.password)
    user.status = "approved"
    await db.commit()

    # Reload the user with memberships (and each membership's class) eagerly
    # loaded, so they are available when the response payload is built.
    result = await db.execute(
        select(User)
        .options(
            selectinload(User.memberships),
            selectinload(User.memberships).selectinload(ClassMembership.class_),
        )
        .where(User.id == user.id)
    )
    user = result.scalar_one()
    # NOTE(review): this runs after the commit and nothing is committed
    # afterwards; confirm set_active_membership is not expected to persist.
    user.set_active_membership(class_id)

    # 3. Issue token — activation and login in one step
    token = create_access_token({"sub": str(user.id), "role": user.role})
    return {
        "message": "账号激活成功",
        "token": token,
        "user": build_user_out(user, class_id),
    }
@router.post("/login", response_model=TokenResponse)
async def login(req: LoginRequest, db: AsyncSession = Depends(get_db)):
    """Authenticate by email and password and return a token plus user payload.

    Raises:
        HTTPException: 401 when no user has the given email, when the user's
            status is ``"inactive"`` or ``"disabled"``, when the user has no
            password hash, or when the password does not verify.
    """
    query = (
        select(User)
        .options(
            selectinload(User.memberships),
            selectinload(User.memberships).selectinload(ClassMembership.class_),
        )
        .where(User.email == req.email)
    )
    user = (await db.execute(query)).scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=401, detail="邮箱或密码错误")

    # NOTE(review): the status is reported before the password is verified,
    # so a caller who only knows the email can learn the account status.
    # Confirm this is intended.
    blocked_statuses = (
        ("inactive", "账号尚未激活"),
        ("disabled", "账号已被禁用"),
    )
    for status, detail in blocked_statuses:
        if user.status == status:
            raise HTTPException(status_code=401, detail=detail)

    # A missing hash counts as a failed check; verify_password is only called
    # when a hash is present.
    password_ok = bool(user.password_hash) and verify_password(
        req.password, user.password_hash
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="邮箱或密码错误")

    access_token = create_access_token({"sub": str(user.id), "role": user.role})
    return TokenResponse(token=access_token, user=build_user_out(user))
@router.get("/me", response_model=UserOut)
async def get_me(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """Return the authenticated user's profile.

    The payload is built for the user's default membership (or with ``None``
    as the class id when there is none). When a default membership exists and
    its class row is found, ``enabled_modules`` on the payload is filled from
    that class.
    """
    # Function-scope import; `select` comes from the module-level import.
    from app.db.models import Class_

    default_membership = user.get_default_membership()
    if not default_membership:
        return build_user_out(user, None)

    user_out = build_user_out(user, default_membership.class_id)

    # Attach enabled_modules from the default membership's class.
    result = await db.execute(
        select(Class_).where(Class_.id == default_membership.class_id)
    )
    class_ = result.scalar_one_or_none()
    if class_:
        user_out.enabled_modules = class_.get_enabled_modules()
    return user_out
@router.put("/change-password")
async def change_password(
    req: ChangePasswordRequest,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Replace the authenticated user's password after checking the old one.

    Returns:
        A dict with a single ``message`` key on success.

    Raises:
        HTTPException: 400 when the user has no password hash or the supplied
            old password does not verify against it.
    """
    current_hash = user.password_hash
    # verify_password is only consulted when a hash is actually stored.
    old_password_ok = bool(current_hash) and verify_password(req.old_password, current_hash)
    if not old_password_ok:
        raise HTTPException(status_code=400, detail="Old password is incorrect")

    user.password_hash = hash_password(req.new_password)
    await db.commit()
    return {"message": "Password changed successfully"}