import json

from sqlalchemy import select, or_, func, case
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.models import ClassMembership, User
from app.schemas.user import UserPublic


async def search_directory(
    db: AsyncSession,
    class_id: int,
    search: str | None = None,
    industry: str | None = None,
    company: str | None = None,
    page: int = 1,
    page_size: int = 20,
) -> tuple[list[User], int]:
    """Return one page of approved users belonging to a class, plus the total.

    Args:
        db: Async session used to run both the count and the page query.
        class_id: Only users with a ``ClassMembership`` for this class match.
        search: Optional free-text term; matched case-insensitively as a
            substring of the user's name, company, or position.
        industry: Optional exact match on ``User.industry``.
        company: Optional case-insensitive substring match on ``User.company``.
        page: 1-based page number. Values below 1 are treated as page 1 so
            the query never receives a negative OFFSET.
        page_size: Maximum number of users returned for the page.

    Returns:
        A ``(users, total)`` tuple: the users on the requested page (with
        memberships and each membership's class eagerly loaded) and the
        number of rows matching the filters across all pages.

    Note:
        ``search`` and ``company`` are interpolated into LIKE patterns as-is,
        so ``%`` and ``_`` inside them behave as wildcards.
    """
    # Build the predicates once so the page query and the count query can
    # never drift apart.
    filters = [
        ClassMembership.class_id == class_id,
        User.status == "approved",
    ]
    if search:
        search_term = f"%{search}%"
        filters.append(
            or_(
                User.name.ilike(search_term),
                User.company.ilike(search_term),
                User.position.ilike(search_term),
            )
        )
    if industry:
        filters.append(User.industry == industry)
    if company:
        filters.append(User.company.ilike(f"%{company}%"))

    count_query = (
        select(func.count(User.id))
        .join(ClassMembership)
        .where(*filters)
    )
    total_result = await db.execute(count_query)
    total = total_result.scalar() or 0

    # Committee role priority: "班长" (1) > "副班长" (2) > any other role (3)
    # > no role (4).
    committee_order = case(
        (ClassMembership.committee_role.is_(None), 4),
        (ClassMembership.committee_role == "班长", 1),
        (ClassMembership.committee_role == "副班长", 2),
        else_=3,
    )

    # Never send a negative OFFSET to the database.
    offset = max(page - 1, 0) * page_size

    query = (
        select(User)
        # The chained loader loads User.memberships and, for each membership,
        # its class_; a separate selectinload(User.memberships) is redundant.
        .options(
            selectinload(User.memberships).selectinload(ClassMembership.class_)
        )
        .join(ClassMembership)
        .where(*filters)
        # User.id is the final tie-breaker: without a unique sort key,
        # OFFSET/LIMIT may repeat or skip rows between pages.
        .order_by(
            committee_order,
            ClassMembership.committee_role,
            User.name,
            User.id,
        )
        .offset(offset)
        .limit(page_size)
    )
    result = await db.execute(query)
    users = list(result.scalars().unique().all())
    return users, total


async def get_directory_role_counts(db: AsyncSession, class_id: int) -> dict[str, int]:
    """Count approved members of a class per membership role.

    Args:
        db: Async session used to run the aggregate query.
        class_id: Class whose memberships are counted.

    Returns:
        A dict with keys ``"student"``, ``"teacher"`` and ``"total"``.
        Roles other than student/teacher are ignored, so ``"total"`` is the
        sum of the student and teacher counts only.
    """
    result = await db.execute(
        select(ClassMembership.membership_role, func.count(ClassMembership.id))
        .join(User)
        .where(ClassMembership.class_id == class_id, User.status == "approved")
        .group_by(ClassMembership.membership_role)
    )
    counts = {"student": 0, "teacher": 0}
    for role, count in result.all():
        # Only the two known roles are reported; anything else is skipped.
        if role in counts:
            counts[role] = count
    counts["total"] = counts["student"] + counts["teacher"]
    return counts


def user_to_public(
    user: User, class_id: int | None = None, include_contact: bool = True
) -> UserPublic:
    """Convert a User model to its public profile, optionally hiding contact info.

    Args:
        user: The user to convert.
        class_id: When given, role fields come from
            ``user.get_membership(class_id)``; otherwise from
            ``user.get_default_membership()``.
        include_contact: When False, ``wechat_id`` and ``phone`` are set to
            None in the result.

    Returns:
        A ``UserPublic`` built from the user's fields. ``membership_role``
        and ``committee_role`` are None when no membership is found.
    """
    if class_id is not None:
        membership = user.get_membership(class_id)
    else:
        membership = user.get_default_membership()

    return UserPublic(
        id=user.id,
        name=user.name,
        student_id=user.student_id,
        membership_role=membership.membership_role if membership else None,
        industry=user.industry,
        company=user.company,
        position=user.position,
        committee_role=membership.committee_role if membership else None,
        wechat_id=user.wechat_id if include_contact else None,
        phone=user.phone if include_contact else None,
        avatar_url=user.avatar_url,
        bio=user.bio,
    )