"""Service functions for class timeline posts, their likes and their comments."""

import json
from datetime import datetime

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.db.models import Timeline, TimelineLike, TimelineComment, User
from app.schemas.timeline import TimelineCreate, TimelineUpdate, TimelineCommentCreate
from app.services.notification_service import create_notifications_for_class

# Maximum number of content characters copied into a notification preview.
_PREVIEW_LENGTH = 100


def _page_offset(page: int, page_size: int) -> int:
    """Return the row offset for a 1-based ``page``, clamped so it is never negative.

    A ``page`` below 1 would otherwise produce a negative OFFSET, which several
    database backends reject; such requests are treated as the first page.
    """
    return max((page - 1) * page_size, 0)


async def _count_likes(db: AsyncSession, post_id: int) -> int:
    """Return the number of ``TimelineLike`` rows attached to ``post_id``."""
    count_result = await db.execute(
        select(func.count(TimelineLike.id)).where(TimelineLike.post_id == post_id)
    )
    return count_result.scalar() or 0


async def create_timeline(
    db: AsyncSession, class_id: int, author_id: int, data: TimelineCreate
) -> Timeline:
    """Create a timeline post and notify the class about it.

    Args:
        db: Session used for the insert and passed on to the notification call.
        class_id: Class the post belongs to.
        author_id: User id stored as the post's author.
        data: Payload providing ``title`` and ``content``.

    Returns:
        The committed and refreshed ``Timeline`` row.
    """
    post = Timeline(
        class_id=class_id,
        author_id=author_id,
        title=data.title,
        content=data.content,
    )
    db.add(post)
    await db.commit()
    await db.refresh(post)

    # Truncate long content to a short preview; missing content becomes "".
    body = data.content or ""
    if len(body) > _PREVIEW_LENGTH:
        content_preview = body[:_PREVIEW_LENGTH] + "..."
    else:
        content_preview = body

    # The post is already committed at this point, so an exception raised by
    # the notification call propagates to the caller without undoing the post.
    await create_notifications_for_class(
        db,
        class_id,
        "timeline",
        f"新动态: {data.title}",
        content=content_preview,
        related_id=post.id,
        email_subject=f"HKU ICB - 新动态: {data.title}",
        # No email body is supplied when there is nothing to preview.
        email_body=f"\n{content_preview}\n" if content_preview else None,
        email_action_path="/timeline",
    )
    return post


async def update_timeline(
    db: AsyncSession, post: Timeline, data: TimelineUpdate
) -> Timeline:
    """Apply the explicitly provided fields of ``data`` to ``post`` and commit.

    Only fields the client actually set are written (``exclude_unset=True``),
    so omitted fields keep their current values.

    Returns:
        The same ``post`` instance, refreshed after the commit.
    """
    changes = data.model_dump(exclude_unset=True)
    for field, value in changes.items():
        setattr(post, field, value)
    await db.commit()
    await db.refresh(post)
    return post


async def delete_timeline(db: AsyncSession, post: Timeline) -> None:
    """Delete ``post`` and commit the transaction."""
    await db.delete(post)
    await db.commit()


async def get_timeline_by_id(db: AsyncSession, post_id: int) -> Timeline | None:
    """Fetch one post by primary key, or ``None`` when it does not exist.

    The author, likes, comments and each comment's author are eager-loaded.
    """
    stmt = (
        select(Timeline)
        .options(
            selectinload(Timeline.author),
            selectinload(Timeline.likes),
            selectinload(Timeline.comments).selectinload(TimelineComment.author),
        )
        .where(Timeline.id == post_id)
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


async def list_timelines(
    db: AsyncSession, class_id: int, page: int = 1, page_size: int = 20
) -> tuple[list[Timeline], int]:
    """Return one page of a class's posts, newest first, plus the total count.

    Args:
        db: Session used for both queries.
        class_id: Class whose posts are listed.
        page: 1-based page number; values below 1 are treated as the first page.
        page_size: Maximum number of posts returned.

    Returns:
        ``(posts, total)`` where ``total`` counts every post of the class, not
        just the returned page.

    Note:
        Author, likes and comments are eager-loaded, but comment authors are
        not (unlike ``get_timeline_by_id``).
    """
    total_result = await db.execute(
        select(func.count(Timeline.id)).where(Timeline.class_id == class_id)
    )
    total = total_result.scalar() or 0

    stmt = (
        select(Timeline)
        .options(
            selectinload(Timeline.author),
            selectinload(Timeline.likes),
            selectinload(Timeline.comments),
        )
        .where(Timeline.class_id == class_id)
        .order_by(Timeline.created_at.desc())
        .offset(_page_offset(page, page_size))
        .limit(page_size)
    )
    result = await db.execute(stmt)
    posts = list(result.scalars().all())
    return posts, total


async def add_images_to_timeline(
    db: AsyncSession, post: Timeline, urls: list[str]
) -> None:
    """Append ``urls`` to the post's image URL list, then commit and refresh."""
    existing = post.get_image_urls_list()
    existing.extend(urls)
    post.set_image_urls_list(existing)
    await db.commit()
    await db.refresh(post)


async def toggle_like(db: AsyncSession, post_id: int, user_id: int) -> dict:
    """Flip the like state of ``user_id`` on ``post_id``.

    An existing like row is removed; otherwise a new one is inserted.

    Returns:
        ``{"liked": bool, "like_count": int}`` where ``liked`` is the state
        after the toggle and ``like_count`` is counted after the commit.
    """
    result = await db.execute(
        select(TimelineLike).where(
            TimelineLike.post_id == post_id,
            TimelineLike.user_id == user_id,
        )
    )
    existing = result.scalar_one_or_none()

    liked = existing is None
    if liked:
        db.add(TimelineLike(post_id=post_id, user_id=user_id))
    else:
        await db.delete(existing)
    await db.commit()

    # Count after the commit so the figure reflects this toggle.
    return {"liked": liked, "like_count": await _count_likes(db, post_id)}


async def create_comment(
    db: AsyncSession, post_id: int, author_id: int, data: TimelineCommentCreate
) -> TimelineComment:
    """Create a comment on ``post_id`` and return it with its author loaded.

    Args:
        db: Session used for the insert and the follow-up select.
        post_id: Post the comment is attached to.
        author_id: User id stored as the comment's author.
        data: Payload providing ``content``.
    """
    comment = TimelineComment(
        post_id=post_id,
        author_id=author_id,
        content=data.content,
    )
    db.add(comment)
    await db.commit()
    await db.refresh(comment)

    # Re-select with the author relationship eager-loaded for the response.
    result = await db.execute(
        select(TimelineComment)
        .options(selectinload(TimelineComment.author))
        .where(TimelineComment.id == comment.id)
    )
    return result.scalar_one()


async def delete_comment(db: AsyncSession, comment: TimelineComment) -> None:
    """Delete ``comment`` and commit the transaction."""
    await db.delete(comment)
    await db.commit()


async def get_comment_by_id(
    db: AsyncSession, comment_id: int
) -> TimelineComment | None:
    """Fetch one comment (author eager-loaded) by id, or ``None`` if absent."""
    stmt = (
        select(TimelineComment)
        .options(selectinload(TimelineComment.author))
        .where(TimelineComment.id == comment_id)
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()


async def list_comments(
    db: AsyncSession, post_id: int, page: int = 1, page_size: int = 50
) -> tuple[list[TimelineComment], int]:
    """Return one page of a post's comments, oldest first, plus the total count.

    Args:
        db: Session used for both queries.
        post_id: Post whose comments are listed.
        page: 1-based page number; values below 1 are treated as the first page.
        page_size: Maximum number of comments returned.

    Returns:
        ``(comments, total)`` where ``total`` counts every comment on the post,
        not just the returned page. Each comment's author is eager-loaded.
    """
    total_result = await db.execute(
        select(func.count(TimelineComment.id)).where(TimelineComment.post_id == post_id)
    )
    total = total_result.scalar() or 0

    stmt = (
        select(TimelineComment)
        .options(selectinload(TimelineComment.author))
        .where(TimelineComment.post_id == post_id)
        .order_by(TimelineComment.created_at.asc())
        .offset(_page_offset(page, page_size))
        .limit(page_size)
    )
    result = await db.execute(stmt)
    return list(result.scalars().all()), total