from fastapi import APIRouter, Depends, HTTPException, Body, Query, Path from app.core.response import success_response, ResponseModel from app.core.scheduler import scheduler from app.api.deps import get_admin_user from app.models.user import UserDB from typing import List, Optional, Dict, Any, Union from pydantic import BaseModel, Field from datetime import datetime import importlib import inspect import logging from app.models.database import get_db_context from sqlalchemy import text import asyncio import threading logger = logging.getLogger(__name__) router = APIRouter() class JobInfo(BaseModel): """任务信息模型""" id: str name: str next_run_time: Optional[datetime] = None trigger: str active: bool class CronJobCreate(BaseModel): """Cron定时任务创建模型""" job_id: str = Field(..., description="任务ID,唯一标识") module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'") function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'") hour: Optional[Union[int, str]] = Field(None, description="小时 (0-23)") minute: Optional[Union[int, str]] = Field(None, description="分钟 (0-59)") second: Optional[Union[int, str]] = Field(None, description="秒 (0-59)") day: Optional[Union[int, str]] = Field(None, description="日期 (1-31)") month: Optional[Union[int, str]] = Field(None, description="月份 (1-12)") day_of_week: Optional[Union[int, str]] = Field(None, description="工作日 (0-6 或 mon,tue,wed,thu,fri,sat,sun)") replace_existing: bool = Field(True, description="如果任务已存在,是否替换") class IntervalJobCreate(BaseModel): """间隔定时任务创建模型""" job_id: str = Field(..., description="任务ID,唯一标识") module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'") function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'") seconds: Optional[int] = Field(None, description="间隔秒数") minutes: Optional[int] = Field(None, description="间隔分钟数") hours: Optional[int] = Field(None, description="间隔小时数") days: Optional[int] = Field(None, description="间隔天数") replace_existing: bool = Field(True, description="如果任务已存在,是否替换") @router.get("/jobs", response_model=ResponseModel) async def get_jobs( admin: UserDB = Depends(get_admin_user) ): """获取所有定时任务""" jobs = scheduler.get_jobs() job_list = [] for job in jobs: job_info = { "id": job.id, "name": job.name or job.func.__name__, "next_run_time": job.next_run_time, "trigger": str(job.trigger), "active": job.next_run_time is not None } job_list.append(job_info) return success_response(data=job_list) @router.post("/jobs/{job_id}/pause", response_model=ResponseModel) async def pause_job( job_id: str = Path(..., description="任务ID"), admin: UserDB = Depends(get_admin_user) ): """暂停定时任务""" success = scheduler.pause_job(job_id) if not success: raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法暂停") return success_response(message=f"任务 {job_id} 已暂停") @router.post("/jobs/{job_id}/resume", response_model=ResponseModel) async def resume_job( job_id: str = Path(..., description="任务ID"), admin: UserDB = Depends(get_admin_user) ): """恢复定时任务""" success = scheduler.resume_job(job_id) if not success: raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法恢复") return success_response(message=f"任务 {job_id} 已恢复") @router.delete("/jobs/{job_id}", response_model=ResponseModel) async def remove_job( job_id: str = Path(..., description="任务ID"), admin: UserDB = Depends(get_admin_user) ): """删除定时任务""" success = scheduler.remove_job(job_id) if not success: raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法删除") return success_response(message=f"任务 {job_id} 已删除") @router.get("/status", response_model=ResponseModel) async def get_scheduler_status( admin: UserDB = Depends(get_admin_user) ): """获取调度器状态""" try: # 测试数据库连接 db_status = "正常" db_error = None try: with get_db_context() as db: # 执行简单查询测试连接 result = db.execute(text("SELECT 1")).scalar() if result != 1: db_status = "异常" db_error = "数据库连接测试失败" except Exception as e: db_status = "异常" db_error = str(e) logger.error(f"数据库连接测试失败: {str(e)}") # 获取调度器状态 status = { "running": scheduler.scheduler.running, "job_count": len(scheduler.get_jobs()), "timezone": str(scheduler.timezone), "database_connection": db_status } if db_error: status["database_error"] = db_error # 获取所有任务的下一次执行时间 jobs = scheduler.get_jobs() next_runs = [] for job in jobs: next_runs.append({ "job_id": job.id, "next_run_time": job.next_run_time, "active": job.next_run_time is not None }) status["next_runs"] = next_runs return success_response(data=status) except Exception as e: logger.error(f"获取调度器状态失败: {str(e)}") return success_response( code=500, message=f"获取调度器状态失败: {str(e)}", data={ "running": scheduler.scheduler.running if hasattr(scheduler, "scheduler") else False, "error": str(e) } ) @router.post("/jobs/cron", response_model=ResponseModel) async def create_cron_job( job_data: CronJobCreate, admin: UserDB = Depends(get_admin_user) ): """创建Cron定时任务""" try: # 动态导入模块和函数 module = importlib.import_module(job_data.module_path) func = getattr(module, job_data.function_name) # 验证函数是否可调用 if not callable(func): raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用") # 构建cron参数 cron_params = {} for field in ["hour", "minute", "second", "day", "month", "day_of_week"]: value = getattr(job_data, field) if value is not None: cron_params[field] = value # 添加任务 job = scheduler.add_cron_job( func, job_id=job_data.job_id, **cron_params ) return success_response( message=f"Cron定时任务 {job_data.job_id} 创建成功", data={ "job_id": job.id, "next_run_time": job.next_run_time } ) except ImportError: raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在") except AttributeError: raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在") except Exception as e: logger.error(f"创建Cron定时任务失败: {str(e)}") raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}") @router.post("/jobs/interval", response_model=ResponseModel) async def create_interval_job( job_data: IntervalJobCreate, admin: UserDB = Depends(get_admin_user) ): """创建间隔定时任务""" try: # 动态导入模块和函数 module = importlib.import_module(job_data.module_path) func = getattr(module, job_data.function_name) # 验证函数是否可调用 if not callable(func): raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用") # 构建间隔参数 interval_params = {} for field in ["seconds", "minutes", "hours", "days"]: value = getattr(job_data, field) if value is not None: interval_params[field] = value # 添加任务 job = scheduler.add_interval_job( func, job_id=job_data.job_id, **interval_params ) return success_response( message=f"间隔定时任务 {job_data.job_id} 创建成功", data={ "job_id": job.id, "next_run_time": job.next_run_time } ) except ImportError: raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在") except AttributeError: raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在") except Exception as e: logger.error(f"创建间隔定时任务失败: {str(e)}") raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}") @router.get("/available-tasks", response_model=ResponseModel) async def get_available_tasks( admin: UserDB = Depends(get_admin_user) ): """获取可用的定时任务函数""" try: # 导入任务模块 from app.tasks import daily_tasks # 获取所有异步函数 tasks = [] for name, func in inspect.getmembers(daily_tasks): if inspect.iscoroutinefunction(func) and not name.startswith('_'): # 获取函数文档 doc = inspect.getdoc(func) or "无描述" tasks.append({ "module": "app.tasks.daily_tasks", "name": name, "description": doc.split('\n')[0] # 获取第一行作为简短描述 }) return success_response(data=tasks) except Exception as e: logger.error(f"获取可用任务失败: {str(e)}") raise HTTPException(status_code=500, detail=f"获取可用任务失败: {str(e)}") @router.post("/jobs/{job_id}/run", response_model=ResponseModel) async def run_job( job_id: str = Path(..., description="任务ID"), admin: UserDB = Depends(get_admin_user) ): """手动触发定时任务执行""" try: # 获取任务 job = scheduler.scheduler.get_job(job_id) if not job: raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在") # 获取任务函数 func = job.func # 检查是否为异步函数 is_async = inspect.iscoroutinefunction(func) # 执行任务 if is_async: # 对于异步函数,需要特殊处理 try: # 获取或创建事件循环 try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 运行异步任务 if loop.is_running(): # 如果循环已经在运行,使用create_task future = asyncio.ensure_future(func()) # 可以添加回调函数处理结果 future.add_done_callback(lambda f: logger.info(f"任务 {job_id} 手动执行完成")) message = f"任务 {job_id} 已触发异步执行" else: # 如果循环没有运行,直接运行到完成 loop.run_until_complete(func()) message = f"任务 {job_id} 已同步执行完成" except Exception as e: logger.error(f"手动执行异步任务 {job_id} 失败: {str(e)}") raise HTTPException(status_code=500, detail=f"执行任务失败: {str(e)}") else: # 对于同步函数,直接执行 try: func() message = f"任务 {job_id} 已执行完成" except Exception as e: logger.error(f"手动执行任务 {job_id} 失败: {str(e)}") raise HTTPException(status_code=500, detail=f"执行任务失败: {str(e)}") return success_response(message=message) except HTTPException: raise except Exception as e: logger.error(f"手动触发任务 {job_id} 失败: {str(e)}") raise HTTPException(status_code=500, detail=f"触发任务失败: {str(e)}")