From da0b42cd1dccb3cdfe05e90a8b069853f3c8df24 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Tue, 11 Mar 2025 08:09:04 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/endpoints/scheduler.py | 230 +++++++++++++++++++++++++++++++++ app/core/scheduler.py | 171 ++++++++++++++++++++++++ app/main.py | 28 +++- app/tasks/__init__.py | 23 ++++ app/tasks/daily_tasks.py | 43 ++++++ jobs.sqlite | Bin 0 -> 16384 bytes requirements.txt | 3 +- 7 files changed, 491 insertions(+), 7 deletions(-) create mode 100644 app/api/endpoints/scheduler.py create mode 100644 app/core/scheduler.py create mode 100644 app/tasks/__init__.py create mode 100644 app/tasks/daily_tasks.py create mode 100644 jobs.sqlite diff --git a/app/api/endpoints/scheduler.py b/app/api/endpoints/scheduler.py new file mode 100644 index 0000000..a7b1bb0 --- /dev/null +++ b/app/api/endpoints/scheduler.py @@ -0,0 +1,230 @@ +from fastapi import APIRouter, Depends, HTTPException, Body, Query, Path +from app.core.response import success_response, ResponseModel +from app.core.scheduler import scheduler +from app.api.deps import get_admin_user +from app.models.user import UserDB +from typing import List, Optional, Dict, Any, Union +from pydantic import BaseModel, Field +from datetime import datetime +import importlib +import inspect +import logging + +logger = logging.getLogger(__name__) + +router = APIRouter() + +class JobInfo(BaseModel): + """任务信息模型""" + id: str + name: str + next_run_time: Optional[datetime] = None + trigger: str + active: bool + +class CronJobCreate(BaseModel): + """Cron定时任务创建模型""" + job_id: str = Field(..., description="任务ID,唯一标识") + module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'") + function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'") + hour: Optional[Union[int, str]] = Field(None, description="小时 (0-23)") + minute: Optional[Union[int, str]] = Field(None, description="分钟 (0-59)") + second: Optional[Union[int, str]] = Field(None, description="秒 (0-59)") + day: Optional[Union[int, str]] = Field(None, description="日期 (1-31)") + month: Optional[Union[int, str]] = Field(None, description="月份 (1-12)") + day_of_week: Optional[Union[int, str]] = Field(None, description="工作日 (0-6 或 mon,tue,wed,thu,fri,sat,sun)") + replace_existing: bool = Field(True, description="如果任务已存在,是否替换") + +class IntervalJobCreate(BaseModel): + """间隔定时任务创建模型""" + job_id: str = Field(..., description="任务ID,唯一标识") + module_path: str = Field(..., description="模块路径,如'app.tasks.daily_tasks'") + function_name: str = Field(..., description="函数名称,如'daily_database_cleanup'") + seconds: Optional[int] = Field(None, description="间隔秒数") + minutes: Optional[int] = Field(None, description="间隔分钟数") + hours: Optional[int] = Field(None, description="间隔小时数") + days: Optional[int] = Field(None, description="间隔天数") + replace_existing: bool = Field(True, description="如果任务已存在,是否替换") + +@router.get("/jobs", response_model=ResponseModel) +async def get_jobs( + admin: UserDB = Depends(get_admin_user) +): + """获取所有定时任务""" + jobs = scheduler.get_jobs() + job_list = [] + + for job in jobs: + job_info = { + "id": job.id, + "name": job.name or job.func.__name__, + "next_run_time": job.next_run_time, + "trigger": str(job.trigger), + "active": job.next_run_time is not None + } + job_list.append(job_info) + + return success_response(data=job_list) + +@router.post("/jobs/{job_id}/pause", response_model=ResponseModel) +async def pause_job( + job_id: str = Path(..., description="任务ID"), + admin: UserDB = Depends(get_admin_user) +): + """暂停定时任务""" + success = scheduler.pause_job(job_id) + if not success: + raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法暂停") + + return success_response(message=f"任务 {job_id} 已暂停") + +@router.post("/jobs/{job_id}/resume", response_model=ResponseModel) +async def resume_job( + job_id: str = Path(..., description="任务ID"), + admin: UserDB = Depends(get_admin_user) +): + """恢复定时任务""" + success = scheduler.resume_job(job_id) + if not success: + raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法恢复") + + return success_response(message=f"任务 {job_id} 已恢复") + +@router.delete("/jobs/{job_id}", response_model=ResponseModel) +async def remove_job( + job_id: str = Path(..., description="任务ID"), + admin: UserDB = Depends(get_admin_user) +): + """删除定时任务""" + success = scheduler.remove_job(job_id) + if not success: + raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在或无法删除") + + return success_response(message=f"任务 {job_id} 已删除") + +@router.get("/status", response_model=ResponseModel) +async def get_scheduler_status( + admin: UserDB = Depends(get_admin_user) +): + """获取调度器状态""" + status = { + "running": scheduler.scheduler.running, + "job_count": len(scheduler.get_jobs()), + "timezone": str(scheduler.timezone) + } + + return success_response(data=status) + +@router.post("/jobs/cron", response_model=ResponseModel) +async def create_cron_job( + job_data: CronJobCreate, + admin: UserDB = Depends(get_admin_user) +): + """创建Cron定时任务""" + try: + # 动态导入模块和函数 + module = importlib.import_module(job_data.module_path) + func = getattr(module, job_data.function_name) + + # 验证函数是否可调用 + if not callable(func): + raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用") + + # 构建cron参数 + cron_params = {} + for field in ["hour", "minute", "second", "day", "month", "day_of_week"]: + value = getattr(job_data, field) + if value is not None: + cron_params[field] = value + + # 添加任务 + job = scheduler.add_cron_job( + func, + job_id=job_data.job_id, + **cron_params + ) + + return success_response( + message=f"Cron定时任务 {job_data.job_id} 创建成功", + data={ + "job_id": job.id, + "next_run_time": job.next_run_time + } + ) + except ImportError: + raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在") + except AttributeError: + raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在") + except Exception as e: + logger.error(f"创建Cron定时任务失败: {str(e)}") + raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}") + +@router.post("/jobs/interval", response_model=ResponseModel) +async def create_interval_job( + job_data: IntervalJobCreate, + admin: UserDB = Depends(get_admin_user) +): + """创建间隔定时任务""" + try: + # 动态导入模块和函数 + module = importlib.import_module(job_data.module_path) + func = getattr(module, job_data.function_name) + + # 验证函数是否可调用 + if not callable(func): + raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不可调用") + + # 构建间隔参数 + interval_params = {} + for field in ["seconds", "minutes", "hours", "days"]: + value = getattr(job_data, field) + if value is not None: + interval_params[field] = value + + # 添加任务 + job = scheduler.add_interval_job( + func, + job_id=job_data.job_id, + **interval_params + ) + + return success_response( + message=f"间隔定时任务 {job_data.job_id} 创建成功", + data={ + "job_id": job.id, + "next_run_time": job.next_run_time + } + ) + except ImportError: + raise HTTPException(status_code=400, detail=f"模块 {job_data.module_path} 不存在") + except AttributeError: + raise HTTPException(status_code=400, detail=f"函数 {job_data.function_name} 不存在") + except Exception as e: + logger.error(f"创建间隔定时任务失败: {str(e)}") + raise HTTPException(status_code=500, detail=f"创建任务失败: {str(e)}") + +@router.get("/available-tasks", response_model=ResponseModel) +async def get_available_tasks( + admin: UserDB = Depends(get_admin_user) +): + """获取可用的定时任务函数""" + try: + # 导入任务模块 + from app.tasks import daily_tasks + + # 获取所有异步函数 + tasks = [] + for name, func in inspect.getmembers(daily_tasks): + if inspect.iscoroutinefunction(func) and not name.startswith('_'): + # 获取函数文档 + doc = inspect.getdoc(func) or "无描述" + tasks.append({ + "module": "app.tasks.daily_tasks", + "name": name, + "description": doc.split('\n')[0] # 获取第一行作为简短描述 + }) + + return success_response(data=tasks) + except Exception as e: + logger.error(f"获取可用任务失败: {str(e)}") + raise HTTPException(status_code=500, detail=f"获取可用任务失败: {str(e)}") \ No newline at end of file diff --git a/app/core/scheduler.py b/app/core/scheduler.py new file mode 100644 index 0000000..349d5c6 --- /dev/null +++ b/app/core/scheduler.py @@ -0,0 +1,171 @@ +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.date import DateTrigger +from datetime import datetime +import logging +import pytz + +logger = logging.getLogger(__name__) + +class TaskScheduler: + """定时任务调度器""" + + def __init__(self, timezone='Asia/Shanghai'): + """初始化调度器""" + self.timezone = pytz.timezone(timezone) + + # 配置作业存储和执行器 + jobstores = { + 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') + } + + executors = { + 'default': ThreadPoolExecutor(20), # 默认线程池 + 'processpool': ProcessPoolExecutor(5) # 进程池 + } + + job_defaults = { + 'coalesce': False, # 是否合并执行 + 'max_instances': 3, # 最大实例数 + 'misfire_grace_time': 60 # 任务错过执行时间的宽限时间(秒) + } + + # 创建调度器 + self.scheduler = AsyncIOScheduler( + jobstores=jobstores, + executors=executors, + job_defaults=job_defaults, + timezone=self.timezone + ) + + def start(self): + """启动调度器""" + if not self.scheduler.running: + self.scheduler.start() + logger.info("定时任务调度器已启动") + + def shutdown(self): + """关闭调度器""" + if self.scheduler.running: + self.scheduler.shutdown() + logger.info("定时任务调度器已关闭") + + def add_cron_job(self, func, job_id=None, **trigger_args): + """添加Cron定时任务 + + 参数: + func: 要执行的函数 + job_id: 任务ID,如果不指定则自动生成 + trigger_args: cron触发器参数,如: + year (int|str) – 4位数年份 + month (int|str) – 月份 (1-12) + day (int|str) – 日期 (1-31) + week (int|str) – ISO周数 (1-53) + day_of_week (int|str) – 工作日 (0-6 或 mon,tue,wed,thu,fri,sat,sun) + hour (int|str) – 小时 (0-23) + minute (int|str) – 分钟 (0-59) + second (int|str) – 秒 (0-59) + + 示例: + # 每天凌晨2点执行 + scheduler.add_cron_job(my_function, hour=2) + + # 每周一、三、五的下午5:30执行 + scheduler.add_cron_job(my_function, day_of_week='mon,wed,fri', hour=17, minute=30) + """ + trigger = CronTrigger(**trigger_args, timezone=self.timezone) + return self.scheduler.add_job( + func, + trigger=trigger, + id=job_id, + replace_existing=True + ) + + def add_interval_job(self, func, seconds=0, minutes=0, hours=0, days=0, job_id=None): + """添加间隔定时任务 + + 参数: + func: 要执行的函数 + seconds: 间隔秒数 + minutes: 间隔分钟数 + hours: 间隔小时数 + days: 间隔天数 + job_id: 任务ID,如果不指定则自动生成 + + 示例: + # 每30分钟执行一次 + scheduler.add_interval_job(my_function, minutes=30) + """ + trigger = IntervalTrigger( + seconds=seconds, + minutes=minutes, + hours=hours, + days=days, + timezone=self.timezone + ) + return self.scheduler.add_job( + func, + trigger=trigger, + id=job_id, + replace_existing=True + ) + + def add_one_time_job(self, func, run_date, job_id=None): + """添加一次性定时任务 + + 参数: + func: 要执行的函数 + run_date: 运行日期时间 + job_id: 任务ID,如果不指定则自动生成 + + 示例: + # 在指定时间执行一次 + scheduler.add_one_time_job(my_function, datetime(2023, 12, 31, 23, 59, 59)) + """ + trigger = DateTrigger(run_date=run_date, timezone=self.timezone) + return self.scheduler.add_job( + func, + trigger=trigger, + id=job_id, + replace_existing=True + ) + + def remove_job(self, job_id): + """移除定时任务""" + try: + self.scheduler.remove_job(job_id) + logger.info(f"已移除任务: {job_id}") + return True + except Exception as e: + logger.error(f"移除任务失败: {job_id}, 错误: {str(e)}") + return False + + def get_jobs(self): + """获取所有定时任务""" + return self.scheduler.get_jobs() + + def pause_job(self, job_id): + """暂停定时任务""" + try: + self.scheduler.pause_job(job_id) + logger.info(f"已暂停任务: {job_id}") + return True + except Exception as e: + logger.error(f"暂停任务失败: {job_id}, 错误: {str(e)}") + return False + + def resume_job(self, job_id): + """恢复定时任务""" + try: + self.scheduler.resume_job(job_id) + logger.info(f"已恢复任务: {job_id}") + return True + except Exception as e: + logger.error(f"恢复任务失败: {job_id}, 错误: {str(e)}") + return False + +# 创建全局调度器实例 +scheduler = TaskScheduler() \ No newline at end of file diff --git a/app/main.py b/app/main.py index 79d7b87..e386ac4 100644 --- a/app/main.py +++ b/app/main.py @@ -1,6 +1,6 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from app.api.endpoints import wechat,user, address, community, station, order, coupon, community_building, upload, merchant, merchant_product, merchant_order, point, config, merchant_category, log, account,merchant_pay_order, message, bank_card, withdraw, mp, point_product, point_product_order, coupon_activity, dashboard, wecom, feedback, timeperiod, community_timeperiod, order_additional_fee, ai, community_set, community_set_mapping, community_profit_sharing, partner, health +from app.api.endpoints import wechat,user, address, community, station, order, coupon, community_building, upload, merchant, merchant_product, merchant_order, point, config, merchant_category, log, account,merchant_pay_order, message, bank_card, withdraw, mp, point_product, point_product_order, coupon_activity, dashboard, wecom, feedback, timeperiod, community_timeperiod, order_additional_fee, ai, community_set, community_set_mapping, community_profit_sharing, partner, health, scheduler from app.models.database import Base, engine from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse @@ -16,15 +16,32 @@ from app.api.endpoints import feedback from starlette.middleware.sessions import SessionMiddleware import os from app.core.db_monitor import setup_db_monitor +from app.tasks import init_scheduler +from contextlib import asynccontextmanager # 创建数据库表 Base.metadata.create_all(bind=engine) +# 定义应用生命周期上下文管理器 +@asynccontextmanager +async def lifespan(app: FastAPI): + # 启动时执行 + # 初始化并启动任务调度器 + init_scheduler() + + yield # 应用运行期间 + + # 关闭时执行 + # 关闭任务调度器 + from app.core.scheduler import scheduler + scheduler.shutdown() + app = FastAPI( title="Beefast 蜂快到家", description="API 文档", version="1.0.0", - docs_url="/docs" if settings.DEBUG else None + docs_url="/docs" if settings.DEBUG else None, + lifespan=lifespan # 使用lifespan上下文管理器 ) # 设置数据库连接监控 @@ -45,8 +62,10 @@ app.add_middleware( app.add_middleware(RequestLoggerMiddleware) app.add_middleware(SessionMiddleware, secret_key=settings.SECRET_KEY) +# 添加定时任务路由 +app.include_router(scheduler.router, prefix="/api/scheduler", tags=["定时任务管理"]) -# 添加用户路由 +# 添加其他路由 app.include_router(ai.router, prefix="/api/ai", tags=["AI服务"]) app.include_router(dashboard.router, prefix="/api/dashboard", tags=["仪表盘"]) @@ -86,13 +105,10 @@ app.include_router(log.router, prefix="/api/logs", tags=["系统日志"]) app.include_router(feedback.router, prefix="/api/feedback", tags=["反馈"]) app.include_router(health.router, prefix="/api/health", tags=["系统健康检查"]) - @app.get("/") async def root(): return {"message": "欢迎使用 Beefast 蜂快到家 API"} - - @app.exception_handler(Exception) async def exception_handler(request, exc): env = "测试环境" if settings.DEBUG else "生产环境" diff --git a/app/tasks/__init__.py b/app/tasks/__init__.py new file mode 100644 index 0000000..5bb39ef --- /dev/null +++ b/app/tasks/__init__.py @@ -0,0 +1,23 @@ +import logging +from app.core.scheduler import scheduler +from app.tasks.daily_tasks import register_daily_tasks + +logger = logging.getLogger(__name__) + +def init_scheduler(): + """初始化并启动任务调度器""" + try: + # 注册所有定时任务 + register_daily_tasks() + + # 这里可以添加其他类型的定时任务注册 + # register_weekly_tasks() + # register_monthly_tasks() + + # 启动调度器 + scheduler.start() + + logger.info("任务调度器初始化完成并已启动") + except Exception as e: + logger.error(f"任务调度器初始化失败: {str(e)}") + raise \ No newline at end of file diff --git a/app/tasks/daily_tasks.py b/app/tasks/daily_tasks.py new file mode 100644 index 0000000..17ec90a --- /dev/null +++ b/app/tasks/daily_tasks.py @@ -0,0 +1,43 @@ +import logging +import asyncio +from datetime import datetime, timedelta +from app.models.database import get_db_context +from app.core.scheduler import scheduler +from sqlalchemy import text +import json +import os + +logger = logging.getLogger(__name__) + + + +async def daily_statistics_report(): + """每日统计报告任务""" + logger.info(f"开始生成每日统计报告: {datetime.now()}") + + try: + with get_db_context() as db: + + #获取昨日时间 + yesterday = datetime.now() - timedelta(days=1) + yesterday_start = datetime.combine(yesterday, datetime.min.time()) + yesterday_end = datetime.combine(yesterday, datetime.max.time()) + + + + except Exception as e: + logger.error(f"生成每日统计报告失败: {str(e)}") + + +def register_daily_tasks(): + """注册所有每日定时任务""" + + # 每天早上8点生成统计报告 + scheduler.add_cron_job( + daily_statistics_report, + hour=8, + minute=0, + job_id="daily_stats_report" + ) + + logger.info("已注册所有每日定时任务") \ No newline at end of file diff --git a/jobs.sqlite b/jobs.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..90423dd099c2db8aa7aae1db7ce1c72812ae4925 GIT binary patch literal 16384 zcmeI2UyIvD5Wpo{=YLnnAy6)T2o<5BHk=*YKD87Y-^HPa9lP~8{|Z5@rM0y=S#q_j z#uo_W`cN*gFXgr$p`W494^jFx+E0+jj&$~&eLknJZ8M96Mx&jbou6h6v)2AMZ6>HW z48k!H=BL_KP1m(Mrm1OK7ES}sc}sz?c(2bNyycqJYW{D<(i<&(g=?iu=~uX5Ljp(u z2_OL^fCP{L54=NbIga0-tJe8Ub+5Ry<&E{eY5kR-QG3}`6bupy>_?JkJS*k#RY+4e5c*r zS+e-Lw|BqMdt$blPs}oNDuv3Ism#`XRo4=N{5ub7c#A|LNbXm8)-OhohE&VN=~jeO z-~2^`0~-=R0!RP}AOR$R1dsp{Kmz{;0>_W_jk@tu#v#l*Oo{|({xV)V!FuU>8Z z{K@QpxtFRev)|K@v%ps^U8b0$U^+e%i+WKt7^AMn~~uSEX*+=6kz?7253 zvm#`p5e=36V4v^IoM+zq9>aCQ_#Rc7qz^6*)1`+cF^Er$< zqEPsVP@RA}e#-n|0OGeG?!_YL^H$_fn0??;^`q)3S!{4dZtuIqAGw4M0}>6~8?tae@P)fP9rvc( z7{_Ymest{KdaB$?GEaB=%&c^m9CnA7+om0IxZX1Vh|+@LM~J*Ck>AzLX=QDA1*Uyegp|I zAFwxbwB2zZv^4i&OIxwv)DC>-EYvj?RA!zrAz(Rlq5)*vl-f{(LvCc}i6IRVj<~AG z%)#?Sda2;OZLM8j*T#%PJk%P6#HPvihiZKxb9O*H%5AD%%i@?!Em$^!_%`LLm6pY& z9R*l7ar(y#ZBaZ?yM^qZ>GzVVUsLY&c)dim0{p(|rN1;dupt2?fCP{L5=7.3.1 pillow>=9.0.0 pytz==2024.1 dashscope>=1.13.0 -itsdangerous==2.2.0 \ No newline at end of file +itsdangerous==2.2.0 +apscheduler==3.11.0 \ No newline at end of file