from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.db.models import Announcement, User
from app.schemas.announcement import AnnouncementCreate, AnnouncementUpdate
from app.services.notification_service import create_notifications_for_class

# Maximum number of characters of announcement content copied into the
# notification/email preview before it is truncated with "...".
_PREVIEW_MAX_CHARS = 100


def _make_content_preview(content: str | None, limit: int = _PREVIEW_MAX_CHARS) -> str:
    """Return *content* shortened to at most *limit* characters plus "...".

    Empty or ``None`` content yields an empty string; content that already
    fits within *limit* is returned unchanged.
    """
    if not content:
        return ""
    if len(content) > limit:
        return content[:limit] + "..."
    return content


async def create_announcement(
    db: AsyncSession, class_id: int, author_id: int, data: AnnouncementCreate
) -> Announcement:
    """Persist a new announcement and notify the class about it.

    The announcement is committed and refreshed first, then
    ``create_notifications_for_class`` is called with a preview of the
    content. An exception raised by the notification call propagates to the
    caller even though the announcement row is already committed.

    Args:
        db: Session used for the insert and passed on to the notifier.
        class_id: Class the announcement belongs to.
        author_id: User recorded as the author.
        data: Payload providing ``title``, ``content`` and ``is_pinned``.

    Returns:
        The refreshed ``Announcement`` instance.
    """
    announcement = Announcement(
        class_id=class_id,
        author_id=author_id,
        title=data.title,
        content=data.content,
        is_pinned=data.is_pinned,
    )
    db.add(announcement)
    await db.commit()
    # Refresh so database-populated attributes (e.g. the id used below) are loaded.
    await db.refresh(announcement)

    # Send notifications + email to class members
    content_preview = _make_content_preview(data.content)
    # NOTE(review): the body literal is only newlines around the preview; it
    # looks like surrounding markup may have been lost upstream - confirm.
    # It is written with explicit "\n" escapes because a single-quoted
    # f-string cannot contain raw line breaks in its literal text.
    email_body = f"\n\n{content_preview}\n\n" if content_preview else None
    await create_notifications_for_class(
        db,
        class_id,
        "announcement",
        f"新公告: {data.title}",
        content=content_preview,
        related_id=announcement.id,
        email_subject=f"HKU ICB - 新公告: {data.title}",
        email_body=email_body,
        email_action_path="/announcements",
    )
    return announcement


async def update_announcement(
    db: AsyncSession, announcement: Announcement, data: AnnouncementUpdate
) -> Announcement:
    """Apply the explicitly-set fields of *data* to *announcement* and commit.

    Only fields the client actually supplied (``exclude_unset=True``) are
    written, so omitted fields keep their current values.

    Returns:
        The same ``Announcement`` instance, refreshed after the commit.
    """
    changes = data.model_dump(exclude_unset=True)
    for field, value in changes.items():
        setattr(announcement, field, value)
    await db.commit()
    await db.refresh(announcement)
    return announcement


async def delete_announcement(db: AsyncSession, announcement: Announcement) -> None:
    """Delete *announcement* and commit the transaction."""
    await db.delete(announcement)
    await db.commit()


async def get_announcement_by_id(
    db: AsyncSession, announcement_id: int
) -> Announcement | None:
    """Fetch one announcement by primary key with its author eagerly loaded.

    Returns:
        The matching ``Announcement``, or ``None`` when no row has that id.
    """
    query = (
        select(Announcement)
        .options(selectinload(Announcement.author))
        .where(Announcement.id == announcement_id)
    )
    result = await db.execute(query)
    return result.scalar_one_or_none()


async def list_announcements(
    db: AsyncSession, class_id: int, page: int = 1, page_size: int = 20
) -> tuple[list[Announcement], int]:
    """Return one page of a class's announcements plus the total count.

    Pinned announcements come first, then newest first; ``id`` is the final
    tie-breaker so rows with equal timestamps keep a stable position across
    pages.

    Args:
        db: Session used for both queries.
        class_id: Class whose announcements are listed.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Maximum rows per page; negative values are treated as 0.

    Returns:
        ``(announcements, total)`` where ``total`` counts every announcement
        of the class regardless of paging.
    """
    # Clamp so a bad page/page_size never produces a negative OFFSET/LIMIT.
    page = max(page, 1)
    page_size = max(page_size, 0)

    total_result = await db.execute(
        select(func.count(Announcement.id)).where(Announcement.class_id == class_id)
    )
    total = total_result.scalar() or 0

    result = await db.execute(
        select(Announcement)
        .options(selectinload(Announcement.author))
        .where(Announcement.class_id == class_id)
        .order_by(
            Announcement.is_pinned.desc(),
            Announcement.created_at.desc(),
            Announcement.id.desc(),
        )
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    announcements = list(result.scalars().all())
    return announcements, total