99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
import json
|
|
from sqlalchemy import select, or_, func, case
|
|
from sqlalchemy.orm import selectinload
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.db.models import ClassMembership, User
|
|
from app.schemas.user import UserPublic
|
|
|
|
|
|
async def search_directory(
|
|
db: AsyncSession,
|
|
class_id: int,
|
|
search: str | None = None,
|
|
industry: str | None = None,
|
|
company: str | None = None,
|
|
page: int = 1,
|
|
page_size: int = 20,
|
|
) -> tuple[list[User], int]:
|
|
"""Search active members in a class."""
|
|
query = (
|
|
select(User)
|
|
.options(
|
|
selectinload(User.memberships),
|
|
selectinload(User.memberships).selectinload(ClassMembership.class_),
|
|
)
|
|
.join(ClassMembership)
|
|
.where(ClassMembership.class_id == class_id, User.status == "approved")
|
|
)
|
|
count_query = (
|
|
select(func.count(User.id))
|
|
.join(ClassMembership)
|
|
.where(ClassMembership.class_id == class_id, User.status == "approved")
|
|
)
|
|
|
|
if search:
|
|
search_term = f"%{search}%"
|
|
query = query.where(
|
|
or_(
|
|
User.name.ilike(search_term),
|
|
User.company.ilike(search_term),
|
|
User.position.ilike(search_term),
|
|
)
|
|
)
|
|
count_query = count_query.where(
|
|
or_(
|
|
User.name.ilike(search_term),
|
|
User.company.ilike(search_term),
|
|
User.position.ilike(search_term),
|
|
)
|
|
)
|
|
|
|
if industry:
|
|
query = query.where(User.industry == industry)
|
|
count_query = count_query.where(User.industry == industry)
|
|
|
|
if company:
|
|
company_term = f"%{company}%"
|
|
query = query.where(User.company.ilike(company_term))
|
|
count_query = count_query.where(User.company.ilike(company_term))
|
|
|
|
total_result = await db.execute(count_query)
|
|
total = total_result.scalar() or 0
|
|
|
|
# Committee role priority: 班长(1) > 副班长(2) > other roles(3) > no role(4)
|
|
committee_order = case(
|
|
(ClassMembership.committee_role == None, 4),
|
|
(ClassMembership.committee_role == "班长", 1),
|
|
(ClassMembership.committee_role == "副班长", 2),
|
|
else_=3,
|
|
)
|
|
|
|
result = await db.execute(
|
|
query.order_by(committee_order, ClassMembership.committee_role, User.name)
|
|
.offset((page - 1) * page_size)
|
|
.limit(page_size)
|
|
)
|
|
users = list(result.scalars().unique().all())
|
|
return users, total
|
|
|
|
|
|
def user_to_public(
|
|
user: User, class_id: int | None = None, include_contact: bool = True
|
|
) -> UserPublic:
|
|
"""Convert User model to public profile, optionally hiding contact info."""
|
|
membership = user.get_membership(class_id) if class_id is not None else user.get_default_membership()
|
|
return UserPublic(
|
|
id=user.id,
|
|
name=user.name,
|
|
student_id=user.student_id,
|
|
industry=user.industry,
|
|
company=user.company,
|
|
position=user.position,
|
|
committee_role=membership.committee_role if membership else None,
|
|
wechat_id=user.wechat_id if include_contact else None,
|
|
phone=user.phone if include_contact else None,
|
|
avatar_url=user.avatar_url,
|
|
bio=user.bio,
|
|
)
|