From ab63e8541cc2e9cd619ec3c4a0b763d17a95bec1 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Wed, 12 Mar 2025 11:19:38 +0800 Subject: [PATCH] update --- app/api/endpoints/scheduler.py | 122 ++++++++++++++++++++++++++++++--- app/core/config.py | 7 +- app/main.py | 5 +- app/models/database.py | 39 ++++++++++- app/tasks/daily_tasks.py | 26 ++++--- jobs.sqlite | Bin 24576 -> 24576 bytes 6 files changed, 174 insertions(+), 25 deletions(-) diff --git a/app/api/endpoints/scheduler.py b/app/api/endpoints/scheduler.py index a7b1bb0..49253c7 100644 --- a/app/api/endpoints/scheduler.py +++ b/app/api/endpoints/scheduler.py @@ -9,6 +9,10 @@ from datetime import datetime import importlib import inspect import logging +from app.models.database import get_db_context +from sqlalchemy import text +import asyncio +import threading logger = logging.getLogger(__name__) @@ -107,13 +111,56 @@ async def get_scheduler_status( admin: UserDB = Depends(get_admin_user) ): """获取调度器状态""" - status = { - "running": scheduler.scheduler.running, - "job_count": len(scheduler.get_jobs()), - "timezone": str(scheduler.timezone) - } - - return success_response(data=status) + try: + # 测试数据库连接 + db_status = "正常" + db_error = None + try: + with get_db_context() as db: + # 执行简单查询测试连接 + result = db.execute(text("SELECT 1")).scalar() + if result != 1: + db_status = "异常" + db_error = "数据库连接测试失败" + except Exception as e: + db_status = "异常" + db_error = str(e) + logger.error(f"数据库连接测试失败: {str(e)}") + + # 获取调度器状态 + status = { + "running": scheduler.scheduler.running, + "job_count": len(scheduler.get_jobs()), + "timezone": str(scheduler.timezone), + "database_connection": db_status + } + + if db_error: + status["database_error"] = db_error + + # 获取所有任务的下一次执行时间 + jobs = scheduler.get_jobs() + next_runs = [] + for job in jobs: + next_runs.append({ + "job_id": job.id, + "next_run_time": job.next_run_time, + "active": job.next_run_time is not None + }) + + status["next_runs"] = next_runs + + return success_response(data=status) + except Exception as e: + logger.error(f"获取调度器状态失败: {str(e)}") + return success_response( + code=500, + message=f"获取调度器状态失败: {str(e)}", + data={ + "running": scheduler.scheduler.running if hasattr(scheduler, "scheduler") else False, + "error": str(e) + } + ) @router.post("/jobs/cron", response_model=ResponseModel) async def create_cron_job( @@ -227,4 +274,63 @@ async def get_available_tasks( return success_response(data=tasks) except Exception as e: logger.error(f"获取可用任务失败: {str(e)}") - raise HTTPException(status_code=500, detail=f"获取可用任务失败: {str(e)}") \ No newline at end of file + raise HTTPException(status_code=500, detail=f"获取可用任务失败: {str(e)}") + +@router.post("/jobs/{job_id}/run", response_model=ResponseModel) +async def run_job( + job_id: str = Path(..., description="任务ID"), + admin: UserDB = Depends(get_admin_user) +): + """手动触发定时任务执行""" + try: + # 获取任务 + job = scheduler.scheduler.get_job(job_id) + if not job: + raise HTTPException(status_code=404, detail=f"任务 {job_id} 不存在") + + # 获取任务函数 + func = job.func + + # 检查是否为异步函数 + is_async = inspect.iscoroutinefunction(func) + + # 执行任务 + if is_async: + # 对于异步函数,需要特殊处理 + try: + # 获取或创建事件循环 + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # 运行异步任务 + if loop.is_running(): + # 如果循环已经在运行,使用create_task + future = asyncio.ensure_future(func()) + # 可以添加回调函数处理结果 + future.add_done_callback(lambda f: logger.info(f"任务 {job_id} 手动执行完成")) + message = f"任务 {job_id} 已触发异步执行" + else: + # 如果循环没有运行,直接运行到完成 + loop.run_until_complete(func()) + message = f"任务 {job_id} 已同步执行完成" + except Exception as e: + logger.error(f"手动执行异步任务 {job_id} 失败: {str(e)}") + raise HTTPException(status_code=500, detail=f"执行任务失败: {str(e)}") + else: + # 对于同步函数,直接执行 + try: + func() + message = f"任务 {job_id} 已执行完成" + except Exception as e: + logger.error(f"手动执行任务 {job_id} 失败: {str(e)}") + raise HTTPException(status_code=500, detail=f"执行任务失败: {str(e)}") + + return success_response(message=message) + except HTTPException: + raise + except Exception as e: + logger.error(f"手动触发任务 {job_id} 失败: {str(e)}") + raise HTTPException(status_code=500, detail=f"触发任务失败: {str(e)}") \ No newline at end of file diff --git a/app/core/config.py b/app/core/config.py index 946a9da..4628507 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -7,6 +7,7 @@ class Settings(BaseSettings): DEBUG: bool = True # 开发模式标志 API_V1_STR: str = "/api/v1" PROJECT_NAME: str = "FastAPI 项目" + ENV_NAME: str = "开发环境" API_BASE_URL: str = "https://api-dev.beefast.co" @@ -123,7 +124,9 @@ class Settings(BaseSettings): class DevSettings(Settings): DEBUG: bool = True # 开发模式标志 API_V1_STR: str = "/api/v1" - PROJECT_NAME: str = "FastAPI 项目 (开发环境)" + PROJECT_NAME: str = "FastAPI 项目 (测试环境)" + ENV_NAME: str = "测试环境" + API_BASE_URL: str = "https://api-dev.beefast.co" @@ -148,7 +151,7 @@ class ProdSettings(Settings): DEBUG: bool = False # 生产模式标志 API_V1_STR: str = "/api/v1" PROJECT_NAME: str = "FastAPI 项目 (生产环境)" - + ENV_NAME: str = "生产环境" API_BASE_URL: str = "https://api.beefast.co" # 数据库配置 diff --git a/app/main.py b/app/main.py index 1366472..418b538 100644 --- a/app/main.py +++ b/app/main.py @@ -119,9 +119,10 @@ async def exception_handler(request, exc): # 发送企业微信消息 wecom_bot = WecomBot(settings.URL_WECOMBOT_SYS_EXCEPTION) - env = os.getenv("APP_ENV", "dev").lower() + exception_log = f"""**API异常** + +**环境:{settings.ENV_NAME}** - exception_log = f"""{env}环境**API异常** **请求信息** > 请求方法:{request.method} > 请求URL:{request.url} diff --git a/app/models/database.py b/app/models/database.py index 163c771..8ae80b6 100644 --- a/app/models/database.py +++ b/app/models/database.py @@ -18,10 +18,11 @@ pymysql.install_as_MySQLdb() engine = create_engine( settings.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True, # 自动处理断开的连接 - pool_recycle=3600, # 连接回收时间(1小时) + pool_recycle=1800, # 连接回收时间(30分钟) pool_size=10, # 连接池大小 max_overflow=20, # 允许的最大连接数超出pool_size的数量 - pool_timeout=30 # 获取连接的超时时间(秒 + pool_timeout=30, # 获取连接的超时时间(秒) + echo_pool=settings.DEBUG # 在调试模式下记录连接池事件 ) # 创建线程安全的会话工厂 @@ -41,6 +42,40 @@ def get_db_monitor(): from app.core.db_monitor import DBConnectionMonitor return DBConnectionMonitor +# 添加连接池事件监听器 +@event.listens_for(engine, "connect") +def connect(dbapi_connection, connection_record): + """连接创建时的回调""" + logger.debug("数据库连接已创建") + +@event.listens_for(engine, "checkout") +def checkout(dbapi_connection, connection_record, connection_proxy): + """连接从池中取出时的回调""" + # 记录连接被取出的时间 + connection_record.info['checkout_time'] = time.time() + + # 验证连接是否有效 + try: + # 执行简单查询测试连接 + cursor = dbapi_connection.cursor() + cursor.execute("SELECT 1") + cursor.close() + except Exception as e: + # 如果连接无效,断开它 + logger.warning(f"检测到无效的数据库连接: {str(e)}") + connection_proxy._pool.dispose() + raise + +@event.listens_for(engine, "checkin") +def checkin(dbapi_connection, connection_record): + """连接归还到池中时的回调""" + checkout_time = connection_record.info.get('checkout_time') + if checkout_time: + # 计算连接被使用的时间 + used_time = time.time() - checkout_time + if used_time > 10: # 记录使用时间超过10秒的连接 + logger.warning(f"数据库连接使用时间较长: {used_time:.2f}秒") + # 添加事件监听器,记录长时间运行的查询 @event.listens_for(engine, "before_cursor_execute") def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): diff --git a/app/tasks/daily_tasks.py b/app/tasks/daily_tasks.py index a203ba7..4957ccd 100644 --- a/app/tasks/daily_tasks.py +++ b/app/tasks/daily_tasks.py @@ -151,14 +151,16 @@ async def daily_community_order_statistics(): logger.error(f"保存订单统计数据到数据库失败: {str(e)}") # 生成报告消息 - message = f"""## {yesterday_str} 小区订单统计报告 - + message = f"""### {yesterday_str} 小区订单统计报告 + +**环境:{settings.ENV_NAME}** + ### 总计 -- 订单总数: {total_order_count} -- 订单总金额: ¥{total_original_amount:.2f} -- 支付总金额: ¥{total_final_amount:.2f} -- 有订单小区数: {total_communities} -- 总小区数: {len(community_stats)} +> - 订单总数: {total_order_count} +> - 订单总金额: ¥{total_original_amount:.2f} +> - 支付总金额: ¥{total_final_amount:.2f} +> - 有订单小区数: {total_communities} +> - 总小区数: {len(community_stats)} ### 小区排名 (前5名) """ @@ -166,10 +168,11 @@ async def daily_community_order_statistics(): # 添加前5个小区的数据 for i, item in enumerate(communities_with_orders[:5], 1): message += f""" -{i}. **{item['community_name']}** - - 订单数: {item['order_count']} - - 订单金额: ¥{item['total_original_amount']:.2f} - - 支付金额: ¥{item['total_final_amount']:.2f} +> {i}. **{item['community_name']}** + > - 订单数: {item['order_count']} + > - 订单金额: ¥{item['total_original_amount']:.2f} + > - 支付金额: ¥{item['total_final_amount']:.2f} + """ # 发送企业微信消息 @@ -209,6 +212,7 @@ def run_daily_community_order_statistics(): except Exception as e: logger.error(f"运行每日小区订单统计任务失败: {str(e)}") + def register_daily_tasks(): """注册所有每日定时任务""" diff --git a/jobs.sqlite b/jobs.sqlite index 1f6f50d4a2e5eba01ec91f80c6311457f43c9e66..b6dcd28672a0ba76bbf5234bbaafa524612735a6 100644 GIT binary patch delta 220 zcmZoTz}Rqrae_2s`a~IL#`KK|OZd5%*u@yw#kdzU?cCVN!9H0`T!s1K%?+Dv#RFK( zc$pbMU`mEW+mxUwrFuX%8v{c}6f>(X%S67E#LS$^_>`ph9Ly|?AiJ1hjLrY;85ID|PeVHZ delta 139 zcmZoTz}Rqrae_3X&qNt#MxTudOZd4M*~J*x#n{D|c5ZCsV4o}|uCm$6J(-1tfq^A? zV`Dsv02lu&21fo<4E(3~UxDCeL4#fVlTYa@s4*}whygJg-~#d)`M)#pf9L-Jf2h+hAbk#