hku-class/backend/app/services/schedule_service.py
2026-04-27 11:04:00 +08:00

106 lines
3.5 KiB
Python

import html
from datetime import datetime, timezone

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.models import Schedule
from app.schemas.schedule import ScheduleCreate, ScheduleUpdate
from app.services.notification_service import create_notifications_for_class
async def create_schedule(
    db: AsyncSession, class_id: int, data: ScheduleCreate
) -> Schedule:
    """Create a schedule entry for a class and notify the class about it.

    The entry is built from ``data.model_dump()``. For entries whose type is
    ``"deadline"`` and that carry an ``end_time``, ``start_time`` is
    overwritten with ``end_time`` so the due time doubles as the sort/query
    key. The row is committed and refreshed before
    ``create_notifications_for_class`` is called with the notification and
    e-mail texts.

    Args:
        db: Session used to persist the entry and passed on to the
            notification helper.
        class_id: Id of the class the entry belongs to.
        data: Validated creation payload.

    Returns:
        The persisted and refreshed ``Schedule`` instance.
    """
    payload = data.model_dump()
    if payload.get("type") == "deadline" and payload.get("end_time") is not None:
        # Deadlines are anchored on the due time only; reuse start_time as the
        # sort/query key.
        payload["start_time"] = payload["end_time"]

    item = Schedule(class_id=class_id, **payload)
    db.add(item)
    await db.commit()
    await db.refresh(item)

    # Send notifications + email to class members.
    # After the normalisation above, start_time already holds the due time for
    # deadlines, so it is the time to display for every entry type.
    effective_time = payload["start_time"]
    time_str = effective_time.strftime("%Y-%m-%d %H:%M")
    location_info = f" · {data.location}" if data.location else ""

    # The e-mail body is HTML: escape the user-supplied parts so a title or
    # location containing markup cannot inject tags into the message.
    safe_title = html.escape(data.title)
    safe_location_info = html.escape(location_info)

    await create_notifications_for_class(
        db,
        class_id,
        "schedule",
        f"新排期: {data.title}",
        content=f"{time_str}{location_info}",
        related_id=item.id,
        email_subject=f"HKU ICB - 新排期: {data.title}",
        email_body=(
            f"<p><strong>{safe_title}</strong></p>"
            f"<p>时间: {time_str}{safe_location_info}</p>"
        ),
        email_action_path="/schedule",
    )
    return item
async def update_schedule(
    db: AsyncSession, item: Schedule, data: ScheduleUpdate
) -> Schedule:
    """Apply a partial update to an existing schedule entry and persist it.

    Only the fields explicitly set on ``data`` are written. When the entry is
    (or becomes) a ``"deadline"`` with a non-null ``end_time``, ``start_time``
    is forced to that ``end_time``.

    Args:
        db: Session used to commit and refresh ``item``.
        item: The schedule entry to modify in place.
        data: Update payload; unset fields are ignored.

    Returns:
        The same ``item`` after commit and refresh.
    """
    changes = data.model_dump(exclude_unset=True)

    # Work out the type and end time the entry will have once the update is
    # applied: the incoming value when present, the stored one otherwise.
    stored_type, stored_end = item.type, item.end_time
    resulting_type = changes.get("type", stored_type)
    resulting_end = changes.get("end_time", stored_end)

    is_dated_deadline = resulting_type == "deadline" and resulting_end is not None
    if is_dated_deadline:
        # Keep start_time aligned with the due time for deadlines.
        changes["start_time"] = resulting_end

    for name, new_value in changes.items():
        setattr(item, name, new_value)

    await db.commit()
    await db.refresh(item)
    return item
async def delete_schedule(db: AsyncSession, item: Schedule):
    """Delete ``item`` through ``db`` and commit the change.

    Args:
        db: Session the deletion is issued on.
        item: The schedule entry to remove.
    """
    await db.delete(item)
    # Commit right away so the deletion is not left pending on the session.
    await db.commit()
async def get_schedule_by_id(db: AsyncSession, schedule_id: int) -> Schedule | None:
    """Look up a single schedule entry by its id.

    Args:
        db: Session the query is executed on.
        schedule_id: Value matched against ``Schedule.id``.

    Returns:
        The matching ``Schedule``, or ``None`` when no row matches.
    """
    stmt = select(Schedule).where(Schedule.id == schedule_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def list_schedules(
    db: AsyncSession,
    class_id: int,
    schedule_type: str | None = None,
    page: int = 1,
    page_size: int = 50,
) -> tuple[list[Schedule], int]:
    """Return one page of a class's schedule entries plus the total count.

    Entries are ordered by ``start_time`` descending, with ``id`` descending
    as a tie-breaker so that entries sharing a ``start_time`` keep a stable
    position across pages.

    Args:
        db: Session the queries are executed on.
        class_id: Only entries of this class are returned.
        schedule_type: Optional filter on ``Schedule.type``; a falsy value
            (``None`` or ``""``) disables the filter.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Maximum number of entries per page; negative values are
            treated as 0.

    Returns:
        ``(items, total)`` where ``items`` is the requested page and ``total``
        is the number of entries matching the filters across all pages.
    """
    # Build the filter list once so the page query and the count query can
    # never drift apart.
    criteria = [Schedule.class_id == class_id]
    if schedule_type:
        criteria.append(Schedule.type == schedule_type)

    count_result = await db.execute(
        select(func.count(Schedule.id)).where(*criteria)
    )
    total = count_result.scalar() or 0

    # Clamp the paging inputs so a negative OFFSET / LIMIT is never sent to
    # the database.
    safe_page = max(page, 1)
    safe_size = max(page_size, 0)

    page_result = await db.execute(
        select(Schedule)
        .where(*criteria)
        # start_time alone is not unique; without the id tie-breaker, rows
        # with equal start_time could be repeated or skipped between pages.
        .order_by(Schedule.start_time.desc(), Schedule.id.desc())
        .offset((safe_page - 1) * safe_size)
        .limit(safe_size)
    )
    items = list(page_result.scalars().all())
    return items, total
async def get_upcoming_schedules(
    db: AsyncSession, class_id: int, limit: int = 10
) -> list[Schedule]:
    """Return the next schedule entries of a class, soonest first.

    An entry qualifies when its ``start_time`` is at or after the current
    UTC time.

    Args:
        db: Session the query is executed on.
        class_id: Only entries of this class are returned.
        limit: Maximum number of entries to return.

    Returns:
        Up to ``limit`` entries ordered by ``start_time`` ascending.
    """
    cutoff = datetime.now(timezone.utc)
    stmt = (
        select(Schedule)
        .where(Schedule.class_id == class_id)
        .where(Schedule.start_time >= cutoff)
        .order_by(Schedule.start_time.asc())
        .limit(limit)
    )
    rows = await db.execute(stmt)
    return list(rows.scalars().all())