增加定时任务

This commit is contained in:
aaron 2025-03-11 08:09:04 +08:00
parent be946cc639
commit da0b42cd1d
7 changed files with 491 additions and 7 deletions

View File

@ -0,0 +1,230 @@
from fastapi import APIRouter, Depends, HTTPException, Body, Query, Path
from app.core.response import success_response, ResponseModel
from app.core.scheduler import scheduler
from app.api.deps import get_admin_user
from app.models.user import UserDB
from typing import List, Optional, Dict, Any, Union
from pydantic import BaseModel, Field
from datetime import datetime
import importlib
import inspect
import logging
logger = logging.getLogger(__name__)
router = APIRouter()
class JobInfo(BaseModel):
"""任务信息模型"""
id: str
name: str
next_run_time: Optional[datetime] = None
trigger: str
active: bool
class CronJobCreate(BaseModel):
"""Cron定时任务创建模型"""
job_id: str = Field(..., description="任务ID唯一标识")
module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'")
function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'")
hour: Optional[Union[int, str]] = Field(None, description="小时 (0-23)")
minute: Optional[Union[int, str]] = Field(None, description="分钟 (0-59)")
second: Optional[Union[int, str]] = Field(None, description="秒 (0-59)")
day: Optional[Union[int, str]] = Field(None, description="日期 (1-31)")
month: Optional[Union[int, str]] = Field(None, description="月份 (1-12)")
day_of_week: Optional[Union[int, str]] = Field(None, description="工作日 (0-6 或 mon,tue,wed,thu,fri,sat,sun)")
replace_existing: bool = Field(True, description="如果任务已存在,是否替换")
class IntervalJobCreate(BaseModel):
"""间隔定时任务创建模型"""
job_id: str = Field(..., description="任务ID唯一标识")
module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'")
function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'")
seconds: Optional[int] = Field(None, description="间隔秒数")
minutes: Optional[int] = Field(None, description="间隔分钟数")
hours: Optional[int] = Field(None, description="间隔小时数")
days: Optional[int] = Field(None, description="间隔天数")
replace_existing: bool = Field(True, description="如果任务已存在,是否替换")
@router.get("/jobs", response_model=ResponseModel)
async def get_jobs(
admin: UserDB = Depends(get_admin_user)
):
"""获取所有定时任务"""
jobs = scheduler.get_jobs()
job_list = []
for job in jobs:
job_info = {
"id": job.id,
"name": job.name or job.func.__name__,
"next_run_time": job.next_run_time,
"trigger": str(job.trigger),
"active": job.next_run_time is not None
}
job_list.append(job_info)
return success_response(data=job_list)
@router.post("/jobs/{job_id}/pause", response_model=ResponseModel)
async def pause_job(
job_id: str = Path(..., description="任务ID"),
admin: UserDB = Depends(get_admin_user)
):
"""暂停定时任务"""
success = scheduler.pause_job(job_id)
if not success:
raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法暂停")
return success_response(message=f"任务 {job_id} 已暂停")
@router.post("/jobs/{job_id}/resume", response_model=ResponseModel)
async def resume_job(
job_id: str = Path(..., description="任务ID"),
admin: UserDB = Depends(get_admin_user)
):
"""恢复定时任务"""
success = scheduler.resume_job(job_id)
if not success:
raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法恢复")
return success_response(message=f"任务 {job_id} 已恢复")
@router.delete("/jobs/{job_id}", response_model=ResponseModel)
async def remove_job(
job_id: str = Path(..., description="任务ID"),
admin: UserDB = Depends(get_admin_user)
):
"""删除定时任务"""
success = scheduler.remove_job(job_id)
if not success:
raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法删除")
return success_response(message=f"任务 {job_id} 已删除")
@router.get("/status", response_model=ResponseModel)
async def get_scheduler_status(
admin: UserDB = Depends(get_admin_user)
):
"""获取调度器状态"""
status = {
"running": scheduler.scheduler.running,
"job_count": len(scheduler.get_jobs()),
"timezone": str(scheduler.timezone)
}
return success_response(data=status)
@router.post("/jobs/cron", response_model=ResponseModel)
async def create_cron_job(
job_data: CronJobCreate,
admin: UserDB = Depends(get_admin_user)
):
"""创建Cron定时任务"""
try:
# 动态导入模块和函数
module = importlib.import_module(job_data.module_path)
func = getattr(module, job_data.function_name)
# 验证函数是否可调用
if not callable(func):
raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用")
# 构建cron参数
cron_params = {}
for field in ["hour", "minute", "second", "day", "month", "day_of_week"]:
value = getattr(job_data, field)
if value is not None:
cron_params[field] = value
# 添加任务
job = scheduler.add_cron_job(
func,
job_id=job_data.job_id,
**cron_params
)
return success_response(
message=f"Cron定时任务 {job_data.job_id} 创建成功",
data={
"job_id": job.id,
"next_run_time": job.next_run_time
}
)
except ImportError:
raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在")
except AttributeError:
raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在")
except Exception as e:
logger.error(f"创建Cron定时任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")
@router.post("/jobs/interval", response_model=ResponseModel)
async def create_interval_job(
job_data: IntervalJobCreate,
admin: UserDB = Depends(get_admin_user)
):
"""创建间隔定时任务"""
try:
# 动态导入模块和函数
module = importlib.import_module(job_data.module_path)
func = getattr(module, job_data.function_name)
# 验证函数是否可调用
if not callable(func):
raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用")
# 构建间隔参数
interval_params = {}
for field in ["seconds", "minutes", "hours", "days"]:
value = getattr(job_data, field)
if value is not None:
interval_params[field] = value
# 添加任务
job = scheduler.add_interval_job(
func,
job_id=job_data.job_id,
**interval_params
)
return success_response(
message=f"间隔定时任务 {job_data.job_id} 创建成功",
data={
"job_id": job.id,
"next_run_time": job.next_run_time
}
)
except ImportError:
raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在")
except AttributeError:
raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在")
except Exception as e:
logger.error(f"创建间隔定时任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}")
@router.get("/available-tasks", response_model=ResponseModel)
async def get_available_tasks(
admin: UserDB = Depends(get_admin_user)
):
"""获取可用的定时任务函数"""
try:
# 导入任务模块
from app.tasks import daily_tasks
# 获取所有异步函数
tasks = []
for name, func in inspect.getmembers(daily_tasks):
if inspect.iscoroutinefunction(func) and not name.startswith('_'):
# 获取函数文档
doc = inspect.getdoc(func) or "无描述"
tasks.append({
"module": "app.tasks.daily_tasks",
"name": name,
"description": doc.split('\n')[0] # 获取第一行作为简短描述
})
return success_response(data=tasks)
except Exception as e:
logger.error(f"获取可用任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取可用任务失败: {str(e)}")

171
app/core/scheduler.py Normal file
View File

@ -0,0 +1,171 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime
import logging
import pytz
logger = logging.getLogger(__name__)
class TaskScheduler:
"""定时任务调度器"""
def __init__(self, timezone='Asia/Shanghai'):
"""初始化调度器"""
self.timezone = pytz.timezone(timezone)
# 配置作业存储和执行器
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20), # 默认线程池
'processpool': ProcessPoolExecutor(5) # 进程池
}
job_defaults = {
'coalesce': False, # 是否合并执行
'max_instances': 3, # 最大实例数
'misfire_grace_time': 60 # 任务错过执行时间的宽限时间(秒)
}
# 创建调度器
self.scheduler = AsyncIOScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=self.timezone
)
def start(self):
"""启动调度器"""
if not self.scheduler.running:
self.scheduler.start()
logger.info("定时任务调度器已启动")
def shutdown(self):
"""关闭调度器"""
if self.scheduler.running:
self.scheduler.shutdown()
logger.info("定时任务调度器已关闭")
def add_cron_job(self, func, job_id=None, **trigger_args):
"""添加Cron定时任务
参数:
func: 要执行的函数
job_id: 任务ID如果不指定则自动生成
trigger_args: cron触发器参数:
year (int|str) 4位数年份
month (int|str) 月份 (1-12)
day (int|str) 日期 (1-31)
week (int|str) ISO周数 (1-53)
day_of_week (int|str) 工作日 (0-6 mon,tue,wed,thu,fri,sat,sun)
hour (int|str) 小时 (0-23)
minute (int|str) 分钟 (0-59)
second (int|str) (0-59)
示例:
# 每天凌晨2点执行
scheduler.add_cron_job(my_function, hour=2)
# 每周一、三、五的下午5:30执行
scheduler.add_cron_job(my_function, day_of_week='mon,wed,fri', hour=17, minute=30)
"""
trigger = CronTrigger(**trigger_args, timezone=self.timezone)
return self.scheduler.add_job(
func,
trigger=trigger,
id=job_id,
replace_existing=True
)
def add_interval_job(self, func, seconds=0, minutes=0, hours=0, days=0, job_id=None):
"""添加间隔定时任务
参数:
func: 要执行的函数
seconds: 间隔秒数
minutes: 间隔分钟数
hours: 间隔小时数
days: 间隔天数
job_id: 任务ID如果不指定则自动生成
示例:
# 每30分钟执行一次
scheduler.add_interval_job(my_function, minutes=30)
"""
trigger = IntervalTrigger(
seconds=seconds,
minutes=minutes,
hours=hours,
days=days,
timezone=self.timezone
)
return self.scheduler.add_job(
func,
trigger=trigger,
id=job_id,
replace_existing=True
)
def add_one_time_job(self, func, run_date, job_id=None):
"""添加一次性定时任务
参数:
func: 要执行的函数
run_date: 运行日期时间
job_id: 任务ID如果不指定则自动生成
示例:
# 在指定时间执行一次
scheduler.add_one_time_job(my_function, datetime(2023, 12, 31, 23, 59, 59))
"""
trigger = DateTrigger(run_date=run_date, timezone=self.timezone)
return self.scheduler.add_job(
func,
trigger=trigger,
id=job_id,
replace_existing=True
)
def remove_job(self, job_id):
"""移除定时任务"""
try:
self.scheduler.remove_job(job_id)
logger.info(f"已移除任务: {job_id}")
return True
except Exception as e:
logger.error(f"移除任务失败: {job_id}, 错误: {str(e)}")
return False
def get_jobs(self):
"""获取所有定时任务"""
return self.scheduler.get_jobs()
def pause_job(self, job_id):
"""暂停定时任务"""
try:
self.scheduler.pause_job(job_id)
logger.info(f"已暂停任务: {job_id}")
return True
except Exception as e:
logger.error(f"暂停任务失败: {job_id}, 错误: {str(e)}")
return False
def resume_job(self, job_id):
"""恢复定时任务"""
try:
self.scheduler.resume_job(job_id)
logger.info(f"已恢复任务: {job_id}")
return True
except Exception as e:
logger.error(f"恢复任务失败: {job_id}, 错误: {str(e)}")
return False
# 创建全局调度器实例
scheduler = TaskScheduler()

View File

@ -1,6 +1,6 @@
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from app.api.endpoints import wechat,user, address, community, station, order, coupon, community_building, upload, merchant, merchant_product, merchant_order, point, config, merchant_category, log, account,merchant_pay_order, message, bank_card, withdraw, mp, point_product, point_product_order, coupon_activity, dashboard, wecom, feedback, timeperiod, community_timeperiod, order_additional_fee, ai, community_set, community_set_mapping, community_profit_sharing, partner, health from app.api.endpoints import wechat,user, address, community, station, order, coupon, community_building, upload, merchant, merchant_product, merchant_order, point, config, merchant_category, log, account,merchant_pay_order, message, bank_card, withdraw, mp, point_product, point_product_order, coupon_activity, dashboard, wecom, feedback, timeperiod, community_timeperiod, order_additional_fee, ai, community_set, community_set_mapping, community_profit_sharing, partner, health, scheduler
from app.models.database import Base, engine from app.models.database import Base, engine
from fastapi.exceptions import RequestValidationError from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
@ -16,15 +16,32 @@ from app.api.endpoints import feedback
from starlette.middleware.sessions import SessionMiddleware from starlette.middleware.sessions import SessionMiddleware
import os import os
from app.core.db_monitor import setup_db_monitor from app.core.db_monitor import setup_db_monitor
from app.tasks import init_scheduler
from contextlib import asynccontextmanager
# 创建数据库表 # 创建数据库表
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)
# 定义应用生命周期上下文管理器
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时执行
# 初始化并启动任务调度器
init_scheduler()
yield # 应用运行期间
# 关闭时执行
# 关闭任务调度器
from app.core.scheduler import scheduler
scheduler.shutdown()
app = FastAPI( app = FastAPI(
title="Beefast 蜂快到家", title="Beefast 蜂快到家",
description="API 文档", description="API 文档",
version="1.0.0", version="1.0.0",
docs_url="/docs" if settings.DEBUG else None docs_url="/docs" if settings.DEBUG else None,
lifespan=lifespan # 使用lifespan上下文管理器
) )
# 设置数据库连接监控 # 设置数据库连接监控
@ -45,8 +62,10 @@ app.add_middleware(
app.add_middleware(RequestLoggerMiddleware) app.add_middleware(RequestLoggerMiddleware)
app.add_middleware(SessionMiddleware, secret_key=settings.SECRET_KEY) app.add_middleware(SessionMiddleware, secret_key=settings.SECRET_KEY)
# 添加定时任务路由
app.include_router(scheduler.router, prefix="/api/scheduler", tags=["定时任务管理"])
# 添加用户路由 # 添加其他路由
app.include_router(ai.router, prefix="/api/ai", tags=["AI服务"]) app.include_router(ai.router, prefix="/api/ai", tags=["AI服务"])
app.include_router(dashboard.router, prefix="/api/dashboard", tags=["仪表盘"]) app.include_router(dashboard.router, prefix="/api/dashboard", tags=["仪表盘"])
@ -86,13 +105,10 @@ app.include_router(log.router, prefix="/api/logs", tags=["系统日志"])
app.include_router(feedback.router, prefix="/api/feedback", tags=["反馈"]) app.include_router(feedback.router, prefix="/api/feedback", tags=["反馈"])
app.include_router(health.router, prefix="/api/health", tags=["系统健康检查"]) app.include_router(health.router, prefix="/api/health", tags=["系统健康检查"])
@app.get("/") @app.get("/")
async def root(): async def root():
return {"message": "欢迎使用 Beefast 蜂快到家 API"} return {"message": "欢迎使用 Beefast 蜂快到家 API"}
@app.exception_handler(Exception) @app.exception_handler(Exception)
async def exception_handler(request, exc): async def exception_handler(request, exc):
env = "测试环境" if settings.DEBUG else "生产环境" env = "测试环境" if settings.DEBUG else "生产环境"

23
app/tasks/__init__.py Normal file
View File

@ -0,0 +1,23 @@
import logging
from app.core.scheduler import scheduler
from app.tasks.daily_tasks import register_daily_tasks
logger = logging.getLogger(__name__)
def init_scheduler():
"""初始化并启动任务调度器"""
try:
# 注册所有定时任务
register_daily_tasks()
# 这里可以添加其他类型的定时任务注册
# register_weekly_tasks()
# register_monthly_tasks()
# 启动调度器
scheduler.start()
logger.info("任务调度器初始化完成并已启动")
except Exception as e:
logger.error(f"任务调度器初始化失败: {str(e)}")
raise

43
app/tasks/daily_tasks.py Normal file
View File

@ -0,0 +1,43 @@
import logging
import asyncio
from datetime import datetime, timedelta
from app.models.database import get_db_context
from app.core.scheduler import scheduler
from sqlalchemy import text
import json
import os
logger = logging.getLogger(__name__)
async def daily_statistics_report():
"""每日统计报告任务"""
logger.info(f"开始生成每日统计报告: {datetime.now()}")
try:
with get_db_context() as db:
#获取昨日时间
yesterday = datetime.now() - timedelta(days=1)
yesterday_start = datetime.combine(yesterday, datetime.min.time())
yesterday_end = datetime.combine(yesterday, datetime.max.time())
except Exception as e:
logger.error(f"生成每日统计报告失败: {str(e)}")
def register_daily_tasks():
"""注册所有每日定时任务"""
# 每天早上8点生成统计报告
scheduler.add_cron_job(
daily_statistics_report,
hour=8,
minute=0,
job_id="daily_stats_report"
)
logger.info("已注册所有每日定时任务")

BIN
jobs.sqlite Normal file

Binary file not shown.

View File

@ -19,3 +19,4 @@ pillow>=9.0.0
pytz==2024.1 pytz==2024.1
dashscope>=1.13.0 dashscope>=1.13.0
itsdangerous==2.2.0 itsdangerous==2.2.0
apscheduler==3.11.0