hku-class/backend/app/services/user_service.py
2026-04-27 09:21:20 +08:00

223 lines
6.7 KiB
Python

from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models import ClassMembership, User
from app.core.auth import hash_password
from app.schemas.user import TeacherCreateRequest, TeacherAssignRequest, UserUpdate
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Look up a single user by e-mail address.

    The user's memberships, and the class attached to each membership,
    are requested through ``selectinload`` loader options.

    Args:
        db: Async session used to execute the query.
        email: Value compared for equality against ``User.email``.

    Returns:
        The matching ``User``, or ``None`` when no row matches.
    """
    membership_loader = selectinload(User.memberships)
    class_loader = selectinload(User.memberships).selectinload(
        ClassMembership.class_
    )
    stmt = select(User).options(membership_loader, class_loader)
    stmt = stmt.where(User.email == email)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Look up a single user by primary key.

    The user's memberships, and the class attached to each membership,
    are requested through ``selectinload`` loader options.

    Args:
        db: Async session used to execute the query.
        user_id: Value compared for equality against ``User.id``.

    Returns:
        The matching ``User``, or ``None`` when no row matches.
    """
    membership_loader = selectinload(User.memberships)
    class_loader = selectinload(User.memberships).selectinload(
        ClassMembership.class_
    )
    stmt = select(User).options(membership_loader, class_loader)
    stmt = stmt.where(User.id == user_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def update_profile(db: AsyncSession, user: User, data: UserUpdate) -> User:
    """Apply the explicitly-set fields of ``data`` to ``user`` and persist them.

    Only fields present in ``data.model_dump(exclude_unset=True)`` are
    written. A changed e-mail is first checked against existing users.

    Args:
        db: Async session used for the uniqueness query, commit and refresh.
        user: The user object to mutate.
        data: Update payload; unset fields are ignored.

    Returns:
        The same ``user`` object after commit and refresh.

    Raises:
        ValueError: If the new e-mail already belongs to a user row.
    """
    changes = data.model_dump(exclude_unset=True)

    # An e-mail change must pass a uniqueness lookup before it is applied.
    email_changed = "email" in changes and changes["email"] != user.email
    if email_changed:
        lookup = select(User).where(User.email == changes["email"])
        clash = (await db.execute(lookup)).scalar_one_or_none()
        if clash:
            raise ValueError("该邮箱已被使用")
        user.email = changes.pop("email")

    # Whatever is left in the payload is copied onto the user verbatim.
    for attr_name in changes:
        setattr(user, attr_name, changes[attr_name])

    await db.commit()
    await db.refresh(user)
    return user
async def update_user_status(
    db: AsyncSession, user_id: int, status: str, role: str | None = None
) -> User | None:
    """Set a user's status, and optionally their role, by user id.

    Args:
        db: Async session used for the lookup, commit and refresh.
        user_id: Primary key of the user to modify.
        status: New value assigned to ``user.status``.
        role: New value for ``user.role``; left untouched when ``None``.

    Returns:
        The updated ``User``, or ``None`` when no user has that id (in
        which case nothing is committed).
    """
    target = await get_user_by_id(db, user_id)
    if target is not None:
        target.status = status
        if role is not None:
            target.role = role
        await db.commit()
        await db.refresh(target)
    return target
async def update_user_role(db: AsyncSession, user: User, role: str) -> User:
    """Assign ``role`` to ``user`` and persist the change.

    Args:
        db: Async session used to commit and refresh.
        user: The user object to mutate.
        role: New value assigned to ``user.role``.

    Returns:
        The same ``user`` object after commit and refresh.
    """
    user.role = role

    await db.commit()
    await db.refresh(user)

    return user
async def update_user_committee_role(
    db: AsyncSession, user: User, class_id: int, committee_role: str | None
) -> User:
    """Set the committee role on the user's membership of one class.

    Args:
        db: Async session used to commit and refresh.
        user: User whose membership is looked up via ``get_membership``.
        class_id: Class whose membership should be updated.
        committee_role: New value for ``membership.committee_role``;
            ``None`` is stored as-is.

    Returns:
        The same ``user`` object after commit and refresh.

    Raises:
        ValueError: If ``user.get_membership(class_id)`` returns ``None``.
    """
    target_membership = user.get_membership(class_id)
    if target_membership is not None:
        target_membership.committee_role = committee_role
        await db.commit()
        await db.refresh(user)
        return user

    raise ValueError("User is not a member of the class")
async def update_user_class_permissions(
    db: AsyncSession, user: User, class_id: int, class_permissions: list[str]
) -> User:
    """Replace the class permissions on the user's membership of one class.

    Args:
        db: Async session used to commit and refresh.
        user: User whose membership is looked up via ``get_membership``.
        class_id: Class whose membership should be updated.
        class_permissions: Passed unchanged to
            ``membership.set_class_permissions``.

    Returns:
        The same ``user`` object after commit and refresh.

    Raises:
        ValueError: If ``user.get_membership(class_id)`` returns ``None``.
    """
    target_membership = user.get_membership(class_id)
    if target_membership is not None:
        target_membership.set_class_permissions(class_permissions)
        await db.commit()
        await db.refresh(user)
        return user

    raise ValueError("User is not a member of the class")
async def list_users(
    db: AsyncSession,
    page: int = 1,
    page_size: int = 20,
    class_id: int | None = None,
    status: str | None = None,
    role: str | None = None,
) -> tuple[list[User], int]:
    """Return one page of users together with the total match count.

    Users are ordered newest first by ``created_at``, with ``id`` as a
    tie-breaker so that rows sharing a timestamp keep a stable position
    across pages. Memberships and their classes are requested through
    ``selectinload`` loader options.

    Args:
        db: Async session used to run the count and page queries.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Maximum rows per page; negative values are treated as 0.
        class_id: When given, only users with a membership in this class.
        status: When given, only users whose ``status`` equals this value.
        role: When given, only users whose ``role`` equals this value.

    Returns:
        ``(users, total)`` where ``users`` is the requested page and
        ``total`` is the number of rows matching the filters.
    """
    # Clamp paging inputs so a bad request can never produce a negative
    # OFFSET or LIMIT, which some databases reject outright.
    page = max(page, 1)
    page_size = max(page_size, 0)

    query = select(User).options(
        selectinload(User.memberships),
        selectinload(User.memberships).selectinload(ClassMembership.class_),
    )
    count_query = select(func.count(User.id))

    # Collect the filters once and apply them to both statements so the
    # page query and the count query cannot drift apart.
    conditions = []
    if class_id is not None:
        query = query.join(ClassMembership)
        count_query = count_query.join(ClassMembership)
        conditions.append(ClassMembership.class_id == class_id)
    if status is not None:
        conditions.append(User.status == status)
    if role is not None:
        conditions.append(User.role == role)
    if conditions:
        query = query.where(*conditions)
        count_query = count_query.where(*conditions)

    total = (await db.execute(count_query)).scalar() or 0

    # created_at alone is not unique; without the id tie-breaker the
    # database may order equal timestamps differently on each request,
    # making rows repeat or disappear between pages.
    query = query.order_by(User.created_at.desc(), User.id.desc())
    query = query.offset((page - 1) * page_size).limit(page_size)

    result = await db.execute(query)
    users = list(result.scalars().unique().all())
    return users, total
async def create_or_assign_teacher(
    db: AsyncSession,
    data: TeacherCreateRequest,
) -> tuple[User, bool, bool]:
    """Create a teacher account for a class, or attach an existing teacher.

    If a user with ``data.email`` already exists it must have the role
    ``"teacher"``; a ``ClassMembership`` for ``data.class_id`` is added
    when the user does not have one yet. Otherwise a new approved teacher
    account is created together with its membership.

    In every successful path ``set_active_membership(data.class_id)`` is
    called on the returned user.

    Args:
        db: Async session used for lookups, inserts and commits.
        data: Request carrying ``email``, ``password``, ``name`` and
            ``class_id``.

    Returns:
        ``(user, created, assigned)`` where ``created`` is ``True`` only
        when a new account was inserted and ``assigned`` is ``True`` when
        a new class membership was inserted.

    Raises:
        ValueError: If the e-mail belongs to a non-teacher account, or if
            the user cannot be reloaded after the commit.
    """
    existing_user = await get_user_by_email(db, data.email)

    if existing_user is not None:
        if existing_user.role != "teacher":
            raise ValueError("该邮箱已存在且不是老师账号")

        if existing_user.get_membership(data.class_id) is not None:
            # Already a member of the class: nothing to write.
            existing_user.set_active_membership(data.class_id)
            return existing_user, False, False

        # Read the primary key BEFORE committing. With the default
        # expire_on_commit=True the instance is expired by commit(), and
        # touching an expired attribute on an AsyncSession is an implicit
        # lazy load that fails instead of awaiting.
        existing_user_id = existing_user.id
        db.add(
            ClassMembership(
                user_id=existing_user_id,
                class_id=data.class_id,
                membership_role="teacher",
            )
        )
        await db.commit()

        refreshed = await get_user_by_id(db, existing_user_id)
        if refreshed is None:
            raise ValueError("老师账号创建失败")
        refreshed.set_active_membership(data.class_id)
        return refreshed, False, True

    user = User(
        email=data.email,
        password_hash=hash_password(data.password),
        name=data.name.strip(),
        student_id=None,
        role="teacher",
        status="approved",
    )
    db.add(user)
    # Flush so the new user's id is available for the membership row.
    await db.flush()

    # Same reason as above: keep the id in a local so nothing reads the
    # ORM instance after commit().
    new_user_id = user.id
    db.add(
        ClassMembership(
            user_id=new_user_id,
            class_id=data.class_id,
            membership_role="teacher",
        )
    )
    await db.commit()

    refreshed = await get_user_by_id(db, new_user_id)
    if refreshed is None:
        raise ValueError("老师账号创建失败")
    refreshed.set_active_membership(data.class_id)
    return refreshed, True, True
async def assign_existing_teacher_to_class(
    db: AsyncSession,
    data: TeacherAssignRequest,
) -> tuple[User, bool]:
    """Attach an existing teacher account to a class.

    The account is looked up by ``data.email`` and must have the role
    ``"teacher"``. A ``ClassMembership`` for ``data.class_id`` is added
    when the teacher does not have one yet. In every successful path
    ``set_active_membership(data.class_id)`` is called on the returned
    user.

    Args:
        db: Async session used for the lookup, insert and commit.
        data: Request carrying ``email`` and ``class_id``.

    Returns:
        ``(user, assigned)`` where ``assigned`` is ``True`` when a new
        membership was inserted and ``False`` when one already existed.

    Raises:
        ValueError: If no account has that e-mail, the account is not a
            teacher, or the user cannot be reloaded after the commit.
    """
    user = await get_user_by_email(db, data.email)
    if user is None:
        raise ValueError("未找到该邮箱对应的老师账号")
    if user.role != "teacher":
        raise ValueError("该邮箱对应的账号不是老师")

    if user.get_membership(data.class_id) is not None:
        # Already a member of the class: nothing to write.
        user.set_active_membership(data.class_id)
        return user, False

    # Read the primary key BEFORE committing. With the default
    # expire_on_commit=True the instance is expired by commit(), and
    # touching an expired attribute on an AsyncSession is an implicit
    # lazy load that fails instead of awaiting.
    user_id = user.id
    db.add(
        ClassMembership(
            user_id=user_id,
            class_id=data.class_id,
            membership_role="teacher",
        )
    )
    await db.commit()

    refreshed = await get_user_by_id(db, user_id)
    if refreshed is None:
        raise ValueError("老师分配失败")
    refreshed.set_active_membership(data.class_id)
    return refreshed, True