"""Async data-access helpers for users and their class memberships."""

from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.models import ClassMembership, User
from app.core.auth import hash_password
from app.schemas.user import TeacherCreateRequest, TeacherAssignRequest, UserUpdate


def _select_users_with_memberships():
    """Return ``select(User)`` that eagerly loads memberships and their classes.

    The chained ``selectinload`` loads ``User.memberships`` and, for each
    membership, ``ClassMembership.class_``; a separate un-chained
    ``selectinload(User.memberships)`` is therefore not needed.
    """
    return select(User).options(
        selectinload(User.memberships).selectinload(ClassMembership.class_)
    )


def _require_membership(user: User, class_id: int) -> ClassMembership:
    """Return ``user``'s membership for ``class_id`` or raise ``ValueError``."""
    membership = user.get_membership(class_id)
    if membership is None:
        raise ValueError("User is not a member of the class")
    return membership


async def _add_teacher_membership(
    db: AsyncSession, user_id: int, class_id: int, failure_message: str
) -> User:
    """Attach a teacher membership, commit, and return the re-fetched user.

    ``user_id`` is taken as a plain int so that no ORM attribute has to be
    read after the commit (which could trigger an implicit lazy load if the
    session expires instances on commit).

    Raises:
        ValueError: with ``failure_message`` if the user cannot be re-fetched.
    """
    db.add(
        ClassMembership(
            user_id=user_id,
            class_id=class_id,
            membership_role="teacher",
        )
    )
    await db.commit()

    # NOTE(review): if the session is configured with expire_on_commit=False,
    # an already-loaded ``memberships`` collection may not be repopulated by
    # this re-fetch -- confirm against the session factory.
    refreshed = await get_user_by_id(db, user_id)
    if refreshed is None:
        raise ValueError(failure_message)
    refreshed.set_active_membership(class_id)
    return refreshed


async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Return the user with this email (memberships eagerly loaded), or None."""
    result = await db.execute(
        _select_users_with_memberships().where(User.email == email)
    )
    return result.scalar_one_or_none()


async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Return the user with this id (memberships eagerly loaded), or None."""
    result = await db.execute(
        _select_users_with_memberships().where(User.id == user_id)
    )
    return result.scalar_one_or_none()


async def update_profile(db: AsyncSession, user: User, data: UserUpdate) -> User:
    """Apply the explicitly-set fields of ``data`` to ``user`` and commit.

    Raises:
        ValueError: if the email is being changed to one that already exists.
    """
    update_data = data.model_dump(exclude_unset=True)

    # Handle email change with uniqueness check.
    if "email" in update_data and update_data["email"] != user.email:
        existing = await db.execute(
            select(User).where(User.email == update_data["email"])
        )
        if existing.scalar_one_or_none():
            raise ValueError("该邮箱已被使用")
        user.email = update_data.pop("email")

    for field, value in update_data.items():
        setattr(user, field, value)

    await db.commit()
    await db.refresh(user)
    return user


async def update_user_status(
    db: AsyncSession, user_id: int, status: str, role: str | None = None
) -> User | None:
    """Set a user's status (and optionally role); return None if not found."""
    user = await get_user_by_id(db, user_id)
    if user is None:
        return None

    user.status = status
    if role is not None:
        user.role = role

    await db.commit()
    await db.refresh(user)
    return user


async def update_user_role(db: AsyncSession, user: User, role: str) -> User:
    """Set ``user.role``, commit, and return the refreshed user."""
    user.role = role
    await db.commit()
    await db.refresh(user)
    return user


async def update_user_committee_role(
    db: AsyncSession, user: User, class_id: int, committee_role: str | None
) -> User:
    """Set the committee role on the user's membership in ``class_id``.

    Raises:
        ValueError: if the user has no membership in that class.
    """
    membership = _require_membership(user, class_id)
    membership.committee_role = committee_role
    await db.commit()
    await db.refresh(user)
    return user


async def update_user_class_permissions(
    db: AsyncSession, user: User, class_id: int, class_permissions: list[str]
) -> User:
    """Replace the class permissions on the user's membership in ``class_id``.

    Raises:
        ValueError: if the user has no membership in that class.
    """
    membership = _require_membership(user, class_id)
    membership.set_class_permissions(class_permissions)
    await db.commit()
    await db.refresh(user)
    return user


async def list_users(
    db: AsyncSession,
    page: int = 1,
    page_size: int = 20,
    class_id: int | None = None,
    status: str | None = None,
    role: str | None = None,
) -> tuple[list[User], int]:
    """Return one page of users plus the total count matching the filters.

    Users are ordered newest first. ``page`` is 1-based; values below 1 are
    treated as the first page.
    """
    query = _select_users_with_memberships()
    count_query = select(func.count(User.id))

    # Collect the filters once so the page query and the count query cannot
    # drift apart.
    filters = []
    if class_id is not None:
        query = query.join(ClassMembership)
        count_query = count_query.join(ClassMembership)
        filters.append(ClassMembership.class_id == class_id)
    if status is not None:
        filters.append(User.status == status)
    if role is not None:
        filters.append(User.role == role)
    if filters:
        query = query.where(*filters)
        count_query = count_query.where(*filters)

    total_result = await db.execute(count_query)
    total = total_result.scalar() or 0

    # Tie-break on the primary key: rows sharing a created_at value would
    # otherwise have no defined order and could repeat or vanish across pages.
    query = query.order_by(User.created_at.desc(), User.id.desc())
    # Clamp so page <= 0 cannot produce a negative OFFSET.
    offset = max(page - 1, 0) * page_size
    query = query.offset(offset).limit(page_size)

    result = await db.execute(query)
    users = list(result.scalars().unique().all())
    return users, total


async def create_or_assign_teacher(
    db: AsyncSession,
    data: TeacherCreateRequest,
) -> tuple[User, bool, bool]:
    """Create a teacher account, or attach an existing teacher to a class.

    Returns:
        ``(user, created, assigned)`` where ``created`` is True when a new
        account was inserted and ``assigned`` is True when a new class
        membership was inserted.

    Raises:
        ValueError: if the email belongs to a non-teacher account, or the
            user cannot be re-fetched after the commit.
    """
    existing_user = await get_user_by_email(db, data.email)

    if existing_user is not None:
        if existing_user.role != "teacher":
            raise ValueError("该邮箱已存在且不是老师账号")

        if existing_user.get_membership(data.class_id) is not None:
            # Already in the class: nothing to persist.
            existing_user.set_active_membership(data.class_id)
            return existing_user, False, False

        refreshed = await _add_teacher_membership(
            db, existing_user.id, data.class_id, "老师账号创建失败"
        )
        return refreshed, False, True

    user = User(
        email=data.email,
        password_hash=hash_password(data.password),
        name=data.name.strip(),
        student_id=None,
        role="teacher",
        status="approved",
    )
    db.add(user)
    # Flush so the new row gets its primary key before the membership
    # references it.
    await db.flush()

    refreshed = await _add_teacher_membership(
        db, user.id, data.class_id, "老师账号创建失败"
    )
    return refreshed, True, True


async def assign_existing_teacher_to_class(
    db: AsyncSession,
    data: TeacherAssignRequest,
) -> tuple[User, bool]:
    """Attach an existing teacher account to a class.

    Returns:
        ``(user, assigned)`` where ``assigned`` is False when the teacher
        already had a membership in the class.

    Raises:
        ValueError: if no account has this email, the account is not a
            teacher, or the user cannot be re-fetched after the commit.
    """
    user = await get_user_by_email(db, data.email)
    if user is None:
        raise ValueError("未找到该邮箱对应的老师账号")
    if user.role != "teacher":
        raise ValueError("该邮箱对应的账号不是老师")

    if user.get_membership(data.class_id) is not None:
        user.set_active_membership(data.class_id)
        return user, False

    refreshed = await _add_teacher_membership(
        db, user.id, data.class_id, "老师分配失败"
    )
    return refreshed, True