223 lines
6.7 KiB
Python
223 lines
6.7 KiB
Python
from sqlalchemy import select, func
|
|
from sqlalchemy.orm import selectinload
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.db.models import ClassMembership, User
|
|
from app.core.auth import hash_password
|
|
from app.schemas.user import TeacherCreateRequest, TeacherAssignRequest, UserUpdate
|
|
|
|
|
|
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
|
|
result = await db.execute(
|
|
select(User)
|
|
.options(
|
|
selectinload(User.memberships),
|
|
selectinload(User.memberships).selectinload(ClassMembership.class_),
|
|
)
|
|
.where(User.email == email)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
|
|
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
|
|
result = await db.execute(
|
|
select(User)
|
|
.options(
|
|
selectinload(User.memberships),
|
|
selectinload(User.memberships).selectinload(ClassMembership.class_),
|
|
)
|
|
.where(User.id == user_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
|
|
async def update_profile(db: AsyncSession, user: User, data: UserUpdate) -> User:
|
|
update_data = data.model_dump(exclude_unset=True)
|
|
|
|
# Handle email change with uniqueness check
|
|
if "email" in update_data and update_data["email"] != user.email:
|
|
existing = await db.execute(
|
|
select(User).where(User.email == update_data["email"])
|
|
)
|
|
if existing.scalar_one_or_none():
|
|
raise ValueError("该邮箱已被使用")
|
|
user.email = update_data.pop("email")
|
|
|
|
for field, value in update_data.items():
|
|
setattr(user, field, value)
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
async def update_user_status(
|
|
db: AsyncSession, user_id: int, status: str, role: str | None = None
|
|
) -> User | None:
|
|
user = await get_user_by_id(db, user_id)
|
|
if user is None:
|
|
return None
|
|
user.status = status
|
|
if role is not None:
|
|
user.role = role
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
async def update_user_role(
|
|
db: AsyncSession, user: User, role: str
|
|
) -> User:
|
|
user.role = role
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
async def update_user_committee_role(
|
|
db: AsyncSession, user: User, class_id: int, committee_role: str | None
|
|
) -> User:
|
|
membership = user.get_membership(class_id)
|
|
if membership is None:
|
|
raise ValueError("User is not a member of the class")
|
|
membership.committee_role = committee_role
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
async def update_user_class_permissions(
|
|
db: AsyncSession, user: User, class_id: int, class_permissions: list[str]
|
|
) -> User:
|
|
membership = user.get_membership(class_id)
|
|
if membership is None:
|
|
raise ValueError("User is not a member of the class")
|
|
membership.set_class_permissions(class_permissions)
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
async def list_users(
|
|
db: AsyncSession,
|
|
page: int = 1,
|
|
page_size: int = 20,
|
|
class_id: int | None = None,
|
|
status: str | None = None,
|
|
role: str | None = None,
|
|
) -> tuple[list[User], int]:
|
|
query = select(User).options(
|
|
selectinload(User.memberships),
|
|
selectinload(User.memberships).selectinload(ClassMembership.class_),
|
|
)
|
|
count_query = select(func.count(User.id))
|
|
|
|
if class_id is not None:
|
|
query = query.join(ClassMembership).where(ClassMembership.class_id == class_id)
|
|
count_query = count_query.join(ClassMembership).where(ClassMembership.class_id == class_id)
|
|
if status is not None:
|
|
query = query.where(User.status == status)
|
|
count_query = count_query.where(User.status == status)
|
|
if role is not None:
|
|
query = query.where(User.role == role)
|
|
count_query = count_query.where(User.role == role)
|
|
|
|
total_result = await db.execute(count_query)
|
|
total = total_result.scalar() or 0
|
|
|
|
query = query.order_by(User.created_at.desc())
|
|
query = query.offset((page - 1) * page_size).limit(page_size)
|
|
result = await db.execute(query)
|
|
users = list(result.scalars().unique().all())
|
|
|
|
return users, total
|
|
|
|
|
|
async def create_or_assign_teacher(
|
|
db: AsyncSession,
|
|
data: TeacherCreateRequest,
|
|
) -> tuple[User, bool, bool]:
|
|
existing_user = await get_user_by_email(db, data.email)
|
|
created = False
|
|
assigned = False
|
|
|
|
if existing_user is not None:
|
|
if existing_user.role != "teacher":
|
|
raise ValueError("该邮箱已存在且不是老师账号")
|
|
|
|
membership = existing_user.get_membership(data.class_id)
|
|
if membership is None:
|
|
db.add(
|
|
ClassMembership(
|
|
user_id=existing_user.id,
|
|
class_id=data.class_id,
|
|
membership_role="teacher",
|
|
)
|
|
)
|
|
assigned = True
|
|
await db.commit()
|
|
refreshed = await get_user_by_id(db, existing_user.id)
|
|
if refreshed is None:
|
|
raise ValueError("老师账号创建失败")
|
|
refreshed.set_active_membership(data.class_id)
|
|
return refreshed, created, assigned
|
|
|
|
existing_user.set_active_membership(data.class_id)
|
|
return existing_user, created, assigned
|
|
|
|
user = User(
|
|
email=data.email,
|
|
password_hash=hash_password(data.password),
|
|
name=data.name.strip(),
|
|
student_id=None,
|
|
role="teacher",
|
|
status="approved",
|
|
)
|
|
db.add(user)
|
|
await db.flush()
|
|
db.add(
|
|
ClassMembership(
|
|
user_id=user.id,
|
|
class_id=data.class_id,
|
|
membership_role="teacher",
|
|
)
|
|
)
|
|
await db.commit()
|
|
created = True
|
|
assigned = True
|
|
|
|
refreshed = await get_user_by_id(db, user.id)
|
|
if refreshed is None:
|
|
raise ValueError("老师账号创建失败")
|
|
refreshed.set_active_membership(data.class_id)
|
|
return refreshed, created, assigned
|
|
|
|
|
|
async def assign_existing_teacher_to_class(
|
|
db: AsyncSession,
|
|
data: TeacherAssignRequest,
|
|
) -> tuple[User, bool]:
|
|
user = await get_user_by_email(db, data.email)
|
|
if user is None:
|
|
raise ValueError("未找到该邮箱对应的老师账号")
|
|
if user.role != "teacher":
|
|
raise ValueError("该邮箱对应的账号不是老师")
|
|
|
|
membership = user.get_membership(data.class_id)
|
|
if membership is not None:
|
|
user.set_active_membership(data.class_id)
|
|
return user, False
|
|
|
|
db.add(
|
|
ClassMembership(
|
|
user_id=user.id,
|
|
class_id=data.class_id,
|
|
membership_role="teacher",
|
|
)
|
|
)
|
|
await db.commit()
|
|
refreshed = await get_user_by_id(db, user.id)
|
|
if refreshed is None:
|
|
raise ValueError("老师分配失败")
|
|
refreshed.set_active_membership(data.class_id)
|
|
return refreshed, True
|