336 lines
13 KiB
Python
336 lines
13 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Body, Query, Path
|
||
from app.core.response import success_response, ResponseModel
|
||
from app.core.scheduler import scheduler
|
||
from app.api.deps import get_admin_user
|
||
from app.models.user import UserDB
|
||
from typing import List, Optional, Dict, Any, Union
|
||
from pydantic import BaseModel, Field
|
||
from datetime import datetime
|
||
import importlib
|
||
import inspect
|
||
import logging
|
||
from app.models.database import get_db_context
|
||
from sqlalchemy import text
|
||
import asyncio
|
||
import threading
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
router = APIRouter()
|
||
|
||
class JobInfo(BaseModel):
|
||
"""任务信息模型"""
|
||
id: str
|
||
name: str
|
||
next_run_time: Optional[datetime] = None
|
||
trigger: str
|
||
active: bool
|
||
|
||
class CronJobCreate(BaseModel):
|
||
"""Cron定时任务创建模型"""
|
||
job_id: str = Field(..., description="任务ID,唯一标识")
|
||
module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'")
|
||
function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'")
|
||
hour: Optional[Union[int, str]] = Field(None, description="小时 (0-23)")
|
||
minute: Optional[Union[int, str]] = Field(None, description="分钟 (0-59)")
|
||
second: Optional[Union[int, str]] = Field(None, description="秒 (0-59)")
|
||
day: Optional[Union[int, str]] = Field(None, description="日期 (1-31)")
|
||
month: Optional[Union[int, str]] = Field(None, description="月份 (1-12)")
|
||
day_of_week: Optional[Union[int, str]] = Field(None, description="工作日 (0-6 或 mon,tue,wed,thu,fri,sat,sun)")
|
||
replace_existing: bool = Field(True, description="如果任务已存在,是否替换")
|
||
|
||
class IntervalJobCreate(BaseModel):
|
||
"""间隔定时任务创建模型"""
|
||
job_id: str = Field(..., description="任务ID,唯一标识")
|
||
module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'")
|
||
function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'")
|
||
seconds: Optional[int] = Field(None, description="间隔秒数")
|
||
minutes: Optional[int] = Field(None, description="间隔分钟数")
|
||
hours: Optional[int] = Field(None, description="间隔小时数")
|
||
days: Optional[int] = Field(None, description="间隔天数")
|
||
replace_existing: bool = Field(True, description="如果任务已存在,是否替换")
|
||
|
||
@router.get("/jobs", response_model=ResponseModel)
|
||
async def get_jobs(
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""获取所有定时任务"""
|
||
jobs = scheduler.get_jobs()
|
||
job_list = []
|
||
|
||
for job in jobs:
|
||
job_info = {
|
||
"id": job.id,
|
||
"name": job.name or job.func.__name__,
|
||
"next_run_time": job.next_run_time,
|
||
"trigger": str(job.trigger),
|
||
"active": job.next_run_time is not None
|
||
}
|
||
job_list.append(job_info)
|
||
|
||
return success_response(data=job_list)
|
||
|
||
@router.post("/jobs/{job_id}/pause", response_model=ResponseModel)
|
||
async def pause_job(
|
||
job_id: str = Path(..., description="任务ID"),
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""暂停定时任务"""
|
||
success = scheduler.pause_job(job_id)
|
||
if not success:
|
||
raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法暂停")
|
||
|
||
return success_response(message=f"任务 {job_id} 已暂停")
|
||
|
||
@router.post("/jobs/{job_id}/resume", response_model=ResponseModel)
|
||
async def resume_job(
|
||
job_id: str = Path(..., description="任务ID"),
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""恢复定时任务"""
|
||
success = scheduler.resume_job(job_id)
|
||
if not success:
|
||
raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法恢复")
|
||
|
||
return success_response(message=f"任务 {job_id} 已恢复")
|
||
|
||
@router.delete("/jobs/{job_id}", response_model=ResponseModel)
|
||
async def remove_job(
|
||
job_id: str = Path(..., description="任务ID"),
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""删除定时任务"""
|
||
success = scheduler.remove_job(job_id)
|
||
if not success:
|
||
raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法删除")
|
||
|
||
return success_response(message=f"任务 {job_id} 已删除")
|
||
|
||
@router.get("/status", response_model=ResponseModel)
|
||
async def get_scheduler_status(
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""获取调度器状态"""
|
||
try:
|
||
# 测试数据库连接
|
||
db_status = "正常"
|
||
db_error = None
|
||
try:
|
||
with get_db_context() as db:
|
||
# 执行简单查询测试连接
|
||
result = db.execute(text("SELECT 1")).scalar()
|
||
if result != 1:
|
||
db_status = "异常"
|
||
db_error = "数据库连接测试失败"
|
||
except Exception as e:
|
||
db_status = "异常"
|
||
db_error = str(e)
|
||
logger.error(f"数据库连接测试失败: {str(e)}")
|
||
|
||
# 获取调度器状态
|
||
status = {
|
||
"running": scheduler.scheduler.running,
|
||
"job_count": len(scheduler.get_jobs()),
|
||
"timezone": str(scheduler.timezone),
|
||
"database_connection": db_status
|
||
}
|
||
|
||
if db_error:
|
||
status["database_error"] = db_error
|
||
|
||
# 获取所有任务的下一次执行时间
|
||
jobs = scheduler.get_jobs()
|
||
next_runs = []
|
||
for job in jobs:
|
||
next_runs.append({
|
||
"job_id": job.id,
|
||
"next_run_time": job.next_run_time,
|
||
"active": job.next_run_time is not None
|
||
})
|
||
|
||
status["next_runs"] = next_runs
|
||
|
||
return success_response(data=status)
|
||
except Exception as e:
|
||
logger.error(f"获取调度器状态失败: {str(e)}")
|
||
return success_response(
|
||
code=500,
|
||
message=f"获取调度器状态失败: {str(e)}",
|
||
data={
|
||
"running": scheduler.scheduler.running if hasattr(scheduler, "scheduler") else False,
|
||
"error": str(e)
|
||
}
|
||
)
|
||
|
||
@router.post("/jobs/cron", response_model=ResponseModel)
|
||
async def create_cron_job(
|
||
job_data: CronJobCreate,
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""创建Cron定时任务"""
|
||
try:
|
||
# 动态导入模块和函数
|
||
module = importlib.import_module(job_data.module_path)
|
||
func = getattr(module, job_data.function_name)
|
||
|
||
# 验证函数是否可调用
|
||
if not callable(func):
|
||
raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用")
|
||
|
||
# 构建cron参数
|
||
cron_params = {}
|
||
for field in ["hour", "minute", "second", "day", "month", "day_of_week"]:
|
||
value = getattr(job_data, field)
|
||
if value is not None:
|
||
cron_params[field] = value
|
||
|
||
# 添加任务
|
||
job = scheduler.add_cron_job(
|
||
func,
|
||
job_id=job_data.job_id,
|
||
**cron_params
|
||
)
|
||
|
||
return success_response(
|
||
message=f"Cron定时任务 {job_data.job_id} 创建成功",
|
||
data={
|
||
"job_id": job.id,
|
||
"next_run_time": job.next_run_time
|
||
}
|
||
)
|
||
except ImportError:
|
||
raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在")
|
||
except AttributeError:
|
||
raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在")
|
||
except Exception as e:
|
||
logger.error(f"创建Cron定时任务失败: {str(e)}")
|
||
raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")
|
||
|
||
@router.post("/jobs/interval", response_model=ResponseModel)
|
||
async def create_interval_job(
|
||
job_data: IntervalJobCreate,
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""创建间隔定时任务"""
|
||
try:
|
||
# 动态导入模块和函数
|
||
module = importlib.import_module(job_data.module_path)
|
||
func = getattr(module, job_data.function_name)
|
||
|
||
# 验证函数是否可调用
|
||
if not callable(func):
|
||
raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用")
|
||
|
||
# 构建间隔参数
|
||
interval_params = {}
|
||
for field in ["seconds", "minutes", "hours", "days"]:
|
||
value = getattr(job_data, field)
|
||
if value is not None:
|
||
interval_params[field] = value
|
||
|
||
# 添加任务
|
||
job = scheduler.add_interval_job(
|
||
func,
|
||
job_id=job_data.job_id,
|
||
**interval_params
|
||
)
|
||
|
||
return success_response(
|
||
message=f"间隔定时任务 {job_data.job_id} 创建成功",
|
||
data={
|
||
"job_id": job.id,
|
||
"next_run_time": job.next_run_time
|
||
}
|
||
)
|
||
except ImportError:
|
||
raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在")
|
||
except AttributeError:
|
||
raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在")
|
||
except Exception as e:
|
||
logger.error(f"创建间隔定时任务失败: {str(e)}")
|
||
raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")
|
||
|
||
@router.get("/available-tasks", response_model=ResponseModel)
|
||
async def get_available_tasks(
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""获取可用的定时任务函数"""
|
||
try:
|
||
# 导入任务模块
|
||
from app.tasks import daily_tasks
|
||
|
||
# 获取所有异步函数
|
||
tasks = []
|
||
for name, func in inspect.getmembers(daily_tasks):
|
||
if inspect.iscoroutinefunction(func) and not name.startswith('_'):
|
||
# 获取函数文档
|
||
doc = inspect.getdoc(func) or "无描述"
|
||
tasks.append({
|
||
"module": "app.tasks.daily_tasks",
|
||
"name": name,
|
||
"description": doc.split('\n')[0] # 获取第一行作为简短描述
|
||
})
|
||
|
||
return success_response(data=tasks)
|
||
except Exception as e:
|
||
logger.error(f"获取可用任务失败: {str(e)}")
|
||
raise HTTPException(status_code=500, detail=f"获取可用任务失败: {str(e)}")
|
||
|
||
@router.post("/jobs/{job_id}/run", response_model=ResponseModel)
|
||
async def run_job(
|
||
job_id: str = Path(..., description="任务ID"),
|
||
admin: UserDB = Depends(get_admin_user)
|
||
):
|
||
"""手动触发定时任务执行"""
|
||
try:
|
||
# 获取任务
|
||
job = scheduler.scheduler.get_job(job_id)
|
||
if not job:
|
||
raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在")
|
||
|
||
# 获取任务函数
|
||
func = job.func
|
||
|
||
# 检查是否为异步函数
|
||
is_async = inspect.iscoroutinefunction(func)
|
||
|
||
# 执行任务
|
||
if is_async:
|
||
# 对于异步函数,需要特殊处理
|
||
try:
|
||
# 获取或创建事件循环
|
||
try:
|
||
loop = asyncio.get_event_loop()
|
||
except RuntimeError:
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
|
||
# 运行异步任务
|
||
if loop.is_running():
|
||
# 如果循环已经在运行,使用create_task
|
||
future = asyncio.ensure_future(func())
|
||
# 可以添加回调函数处理结果
|
||
future.add_done_callback(lambda f: logger.info(f"任务 {job_id} 手动执行完成"))
|
||
message = f"任务 {job_id} 已触发异步执行"
|
||
else:
|
||
# 如果循环没有运行,直接运行到完成
|
||
loop.run_until_complete(func())
|
||
message = f"任务 {job_id} 已同步执行完成"
|
||
except Exception as e:
|
||
logger.error(f"手动执行异步任务 {job_id} 失败: {str(e)}")
|
||
raise HTTPException(status_code=500, detail=f"执行任务失败: {str(e)}")
|
||
else:
|
||
# 对于同步函数,直接执行
|
||
try:
|
||
func()
|
||
message = f"任务 {job_id} 已执行完成"
|
||
except Exception as e:
|
||
logger.error(f"手动执行任务 {job_id} 失败: {str(e)}")
|
||
raise HTTPException(status_code=500, detail=f"执行任务失败: {str(e)}")
|
||
|
||
return success_response(message=message)
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"手动触发任务 {job_id} 失败: {str(e)}")
|
||
raise HTTPException(status_code=500, detail=f"触发任务失败: {str(e)}") |