from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.core.auth import hash_password, verify_password, create_access_token from app.core.deps import get_current_user from app.db.database import get_db from app.db.models import ClassMembership, User from app.schemas.auth import ( ChangePasswordRequest, InviteCodeClassPreview, LoginRequest, RegisterRequest, SendEmailCodeRequest, ) from app.schemas.user import TokenResponse, UserOut, build_user_out from app.services.email_service import ( send_account_activated_email, send_email_verification_code_email, ) from app.services.email_verification_service import ( ACTIVATION_EMAIL_PURPOSE, issue_email_verification_code, verify_email_code, ) from app.services.member_activation_service import get_class_by_invite_code, validate_registration router = APIRouter(prefix="/api/auth", tags=["auth"]) @router.get("/invite-code/{invite_code}", response_model=InviteCodeClassPreview) async def preview_invite_code_class(invite_code: str, db: AsyncSession = Depends(get_db)): class_ = await get_class_by_invite_code(db, invite_code.strip().upper()) if class_ is None: raise HTTPException(status_code=404, detail="邀请码无效") return InviteCodeClassPreview( id=class_.id, name=class_.name, cohort_year=class_.cohort_year, ) @router.post("/email-verification-code") async def send_activation_email_code( req: SendEmailCodeRequest, db: AsyncSession = Depends(get_db), ): existing = await db.execute(select(User).where(User.email == req.email)) if existing.scalar_one_or_none(): raise HTTPException(status_code=400, detail="该邮箱已注册") activation_target = await validate_registration(db, req.invite_code, req.student_id) if activation_target is None: raise HTTPException(status_code=400, detail="邀请码或学号无效,或账号已激活") code = await issue_email_verification_code( db, email=req.email, purpose=ACTIVATION_EMAIL_PURPOSE, ) sent = await send_email_verification_code_email(req.email, code) if sent is False: raise HTTPException(status_code=500, detail="邮件发送失败,请检查邮件服务配置") return {"message": "验证码已发送,请查收邮箱"} @router.post("/activate") async def activate_account(req: RegisterRequest, db: AsyncSession = Depends(get_db)): # 1. Check if email is already in use existing = await db.execute(select(User).where(User.email == req.email)) if existing.scalar_one_or_none(): raise HTTPException(status_code=400, detail="该邮箱已注册") # 2. Validate invite_code + student_id against inactive class member activation_target = await validate_registration(db, req.invite_code, req.student_id) if activation_target is None: raise HTTPException(status_code=400, detail="邀请码或学号无效,或账号已激活") is_code_valid = await verify_email_code( db, email=req.email, code=req.email_code, purpose=ACTIVATION_EMAIL_PURPOSE, ) if not is_code_valid: raise HTTPException(status_code=400, detail="邮箱验证码无效或已过期") user, class_id = activation_target user.email = req.email user.password_hash = hash_password(req.password) user.status = "approved" await db.commit() result = await db.execute( select(User) .options( selectinload(User.memberships), selectinload(User.memberships).selectinload(ClassMembership.class_), ) .where(User.id == user.id) ) user = result.scalar_one() user.set_active_membership(class_id) # 3. Issue token — activation and login in one step token = create_access_token({"sub": str(user.id), "role": user.role}) await send_account_activated_email(req.email) return { "message": "账号激活成功", "token": token, "user": build_user_out(user, class_id), } @router.post("/login", response_model=TokenResponse) async def login(req: LoginRequest, db: AsyncSession = Depends(get_db)): result = await db.execute( select(User) .options( selectinload(User.memberships), selectinload(User.memberships).selectinload(ClassMembership.class_), ) .where(User.email == req.email) ) user = result.scalar_one_or_none() if user is None: raise HTTPException(status_code=401, detail="邮箱或密码错误") if user.status == "inactive": raise HTTPException(status_code=401, detail="账号尚未激活") if user.status == "disabled": raise HTTPException(status_code=401, detail="账号已被禁用") if not user.password_hash or not verify_password(req.password, user.password_hash): raise HTTPException(status_code=401, detail="邮箱或密码错误") token = create_access_token({"sub": str(user.id), "role": user.role}) return TokenResponse( token=token, user=build_user_out(user), ) @router.get("/me", response_model=UserOut) async def get_me(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)): default_membership = user.get_default_membership() user_out = build_user_out(user, default_membership.class_id if default_membership else None) # Attach enabled_modules from active class if default_membership: from app.db.models import Class_ from sqlalchemy import select result = await db.execute(select(Class_).where(Class_.id == default_membership.class_id)) class_ = result.scalar_one_or_none() if class_: user_out.enabled_modules = class_.get_enabled_modules() return user_out @router.put("/change-password") async def change_password( req: ChangePasswordRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): if not user.password_hash or not verify_password(req.old_password, user.password_hash): raise HTTPException(status_code=400, detail="Old password is incorrect") user.password_hash = hash_password(req.new_password) await db.commit() return {"message": "Password changed successfully"}