import json from datetime import datetime from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.db.models import Timeline, User from app.schemas.timeline import TimelineCreate, TimelineUpdate from app.services.notification_service import create_notifications_for_class async def create_timeline( db: AsyncSession, class_id: int, author_id: int, data: TimelineCreate ) -> Timeline: post = Timeline( class_id=class_id, author_id=author_id, title=data.title, content=data.content, ) db.add(post) await db.commit() await db.refresh(post) # Send notifications + email to class members content_preview = (data.content[:100] + "...") if data.content and len(data.content) > 100 else (data.content or "") await create_notifications_for_class( db, class_id, "timeline", f"新大事件: {data.title}", content=content_preview, related_id=post.id, email_subject=f"HKU ICB - 新大事件: {data.title}", email_body=f"
{content_preview}
" if content_preview else None, email_action_path="/timeline", ) return post async def update_timeline( db: AsyncSession, post: Timeline, data: TimelineUpdate ) -> Timeline: for field, value in data.model_dump(exclude_unset=True).items(): setattr(post, field, value) await db.commit() await db.refresh(post) return post async def delete_timeline(db: AsyncSession, post: Timeline): await db.delete(post) await db.commit() async def get_timeline_by_id(db: AsyncSession, post_id: int) -> Timeline | None: result = await db.execute(select(Timeline).where(Timeline.id == post_id)) return result.scalar_one_or_none() async def list_timelines( db: AsyncSession, class_id: int, page: int = 1, page_size: int = 20 ) -> tuple[list[Timeline], int]: total_result = await db.execute( select(func.count(Timeline.id)).where(Timeline.class_id == class_id) ) total = total_result.scalar() or 0 result = await db.execute( select(Timeline) .options(selectinload(Timeline.author)) .where(Timeline.class_id == class_id) .order_by(Timeline.created_at.desc()) .offset((page - 1) * page_size) .limit(page_size) ) posts = list(result.scalars().all()) return posts, total async def add_images_to_timeline(db: AsyncSession, post: Timeline, urls: list[str]): existing = post.get_image_urls_list() existing.extend(urls) post.set_image_urls_list(existing) await db.commit() await db.refresh(post)