from fastapi import APIRouter, Depends, HTTPException, Body, Query, Path from app.core.response import success_response, ResponseModel from app.core.scheduler import scheduler from app.api.deps import get_admin_user from app.models.user import UserDB from typing import List, Optional, Dict, Any, Union from pydantic import BaseModel, Field from datetime import datetime import importlib import inspect import logging logger = logging.getLogger(__name__) router = APIRouter() class JobInfo(BaseModel): """任务信息模型""" id: str name: str next_run_time: Optional[datetime] = None trigger: str active: bool class CronJobCreate(BaseModel): """Cron定时任务创建模型""" job_id: str = Field(..., description="任务ID,唯一标识") module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'") function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'") hour: Optional[Union[int, str]] = Field(None, description="小时 (0-23)") minute: Optional[Union[int, str]] = Field(None, description="分钟 (0-59)") second: Optional[Union[int, str]] = Field(None, description="秒 (0-59)") day: Optional[Union[int, str]] = Field(None, description="日期 (1-31)") month: Optional[Union[int, str]] = Field(None, description="月份 (1-12)") day_of_week: Optional[Union[int, str]] = Field(None, description="工作日 (0-6 或 mon,tue,wed,thu,fri,sat,sun)") replace_existing: bool = Field(True, description="如果任务已存在,是否替换") class IntervalJobCreate(BaseModel): """间隔定时任务创建模型""" job_id: str = Field(..., description="任务ID,唯一标识") module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'") function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'") seconds: Optional[int] = Field(None, description="间隔秒数") minutes: Optional[int] = Field(None, description="间隔分钟数") hours: Optional[int] = Field(None, description="间隔小时数") days: Optional[int] = Field(None, description="间隔天数") replace_existing: bool = Field(True, description="如果任务已存在,是否替换") @router.get("/jobs", response_model=ResponseModel) async def get_jobs( admin: UserDB = Depends(get_admin_user) ): """获取所有定时任务""" jobs = scheduler.get_jobs() job_list = [] for job in jobs: job_info = { "id": job.id, "name": job.name or job.func.__name__, "next_run_time": job.next_run_time, "trigger": str(job.trigger), "active": job.next_run_time is not None } job_list.append(job_info) return success_response(data=job_list) @router.post("/jobs/{job_id}/pause", response_model=ResponseModel) async def pause_job( job_id: str = Path(..., description="任务ID"), admin: UserDB = Depends(get_admin_user) ): """暂停定时任务""" success = scheduler.pause_job(job_id) if not success: raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法暂停") return success_response(message=f"任务 {job_id} 已暂停") @router.post("/jobs/{job_id}/resume", response_model=ResponseModel) async def resume_job( job_id: str = Path(..., description="任务ID"), admin: UserDB = Depends(get_admin_user) ): """恢复定时任务""" success = scheduler.resume_job(job_id) if not success: raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法恢复") return success_response(message=f"任务 {job_id} 已恢复") @router.delete("/jobs/{job_id}", response_model=ResponseModel) async def remove_job( job_id: str = Path(..., description="任务ID"), admin: UserDB = Depends(get_admin_user) ): """删除定时任务""" success = scheduler.remove_job(job_id) if not success: raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法删除") return success_response(message=f"任务 {job_id} 已删除") @router.get("/status", response_model=ResponseModel) async def get_scheduler_status( admin: UserDB = Depends(get_admin_user) ): """获取调度器状态""" status = { "running": scheduler.scheduler.running, "job_count": len(scheduler.get_jobs()), "timezone": str(scheduler.timezone) } return success_response(data=status) @router.post("/jobs/cron", response_model=ResponseModel) async def create_cron_job( job_data: CronJobCreate, admin: UserDB = Depends(get_admin_user) ): """创建Cron定时任务""" try: # 动态导入模块和函数 module = importlib.import_module(job_data.module_path) func = getattr(module, job_data.function_name) # 验证函数是否可调用 if not callable(func): raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用") # 构建cron参数 cron_params = {} for field in ["hour", "minute", "second", "day", "month", "day_of_week"]: value = getattr(job_data, field) if value is not None: cron_params[field] = value # 添加任务 job = scheduler.add_cron_job( func, job_id=job_data.job_id, **cron_params ) return success_response( message=f"Cron定时任务 {job_data.job_id} 创建成功", data={ "job_id": job.id, "next_run_time": job.next_run_time } ) except ImportError: raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在") except AttributeError: raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在") except Exception as e: logger.error(f"创建Cron定时任务失败: {str(e)}") raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}") @router.post("/jobs/interval", response_model=ResponseModel) async def create_interval_job( job_data: IntervalJobCreate, admin: UserDB = Depends(get_admin_user) ): """创建间隔定时任务""" try: # 动态导入模块和函数 module = importlib.import_module(job_data.module_path) func = getattr(module, job_data.function_name) # 验证函数是否可调用 if not callable(func): raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用") # 构建间隔参数 interval_params = {} for field in ["seconds", "minutes", "hours", "days"]: value = getattr(job_data, field) if value is not None: interval_params[field] = value # 添加任务 job = scheduler.add_interval_job( func, job_id=job_data.job_id, **interval_params ) return success_response( message=f"间隔定时任务 {job_data.job_id} 创建成功", data={ "job_id": job.id, "next_run_time": job.next_run_time } ) except ImportError: raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在") except AttributeError: raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在") except Exception as e: logger.error(f"创建间隔定时任务失败: {str(e)}") raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}") @router.get("/available-tasks", response_model=ResponseModel) async def get_available_tasks( admin: UserDB = Depends(get_admin_user) ): """获取可用的定时任务函数""" try: # 导入任务模块 from app.tasks import daily_tasks # 获取所有异步函数 tasks = [] for name, func in inspect.getmembers(daily_tasks): if inspect.iscoroutinefunction(func) and not name.startswith('_'): # 获取函数文档 doc = inspect.getdoc(func) or "无描述" tasks.append({ "module": "app.tasks.daily_tasks", "name": name, "description": doc.split('\n')[0] # 获取第一行作为简短描述 }) return success_response(data=tasks) except Exception as e: logger.error(f"获取可用任务失败: {str(e)}") raise HTTPException(status_code=500, detail=f"获取可用任务失败: {str(e)}")