aaron 2025-03-12 11:19:38 +08:00
parent 64cdc5d648
commit ab63e8541c
6 changed files with 174 additions and 25 deletions

View File

@@ -9,6 +9,10 @@ from datetime import datetime
import importlib
import inspect
import logging
from app.models.database import get_db_context
from sqlalchemy import text
import asyncio
import threading
logger = logging.getLogger(__name__)
@@ -107,13 +111,56 @@ async def get_scheduler_status(
    admin: UserDB = Depends(get_admin_user)
):
    """Get the scheduler status"""
    status = {
        "running": scheduler.scheduler.running,
        "job_count": len(scheduler.get_jobs()),
        "timezone": str(scheduler.timezone)
    }
    return success_response(data=status)
    try:
        # Test the database connection
        db_status = "normal"
        db_error = None
        try:
            with get_db_context() as db:
                # Run a simple query to verify the connection
                result = db.execute(text("SELECT 1")).scalar()
                if result != 1:
                    db_status = "abnormal"
                    db_error = "Database connection test failed"
        except Exception as e:
            db_status = "abnormal"
            db_error = str(e)
            logger.error(f"Database connection test failed: {str(e)}")

        # Collect the scheduler status
        status = {
            "running": scheduler.scheduler.running,
            "job_count": len(scheduler.get_jobs()),
            "timezone": str(scheduler.timezone),
            "database_connection": db_status
        }
        if db_error:
            status["database_error"] = db_error

        # Collect the next run time of every job
        jobs = scheduler.get_jobs()
        next_runs = []
        for job in jobs:
            next_runs.append({
                "job_id": job.id,
                "next_run_time": job.next_run_time,
                "active": job.next_run_time is not None
            })
        status["next_runs"] = next_runs

        return success_response(data=status)
    except Exception as e:
        logger.error(f"Failed to get scheduler status: {str(e)}")
        return success_response(
            code=500,
            message=f"Failed to get scheduler status: {str(e)}",
            data={
                "running": scheduler.scheduler.running if hasattr(scheduler, "scheduler") else False,
                "error": str(e)
            }
        )
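
The health check above relies on get_db_context from app.models.database, which is not shown in this commit. A minimal sketch of the assumed contract, with SessionLocal as a hypothetical session-factory name; the real helper may differ:

from contextlib import contextmanager
from app.models.database import SessionLocal  # hypothetical factory name

@contextmanager
def get_db_context():
    # Yield a session and make sure it is always returned to the pool.
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
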
@router.post("/jobs/cron", response_model=ResponseModel)
async def create_cron_job(
@@ -227,4 +274,63 @@ async def get_available_tasks(
        return success_response(data=tasks)
    except Exception as e:
        logger.error(f"Failed to get available tasks: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to get available tasks: {str(e)}")
@router.post("/jobs/{job_id}/run", response_model=ResponseModel)
async def run_job(
    job_id: str = Path(..., description="Job ID"),
    admin: UserDB = Depends(get_admin_user)
):
    """Manually trigger a scheduled job"""
    try:
        # Look up the job
        job = scheduler.scheduler.get_job(job_id)
        if not job:
            raise HTTPException(status_code=404, detail=f"Job {job_id} does not exist")

        # Get the job's callable
        func = job.func

        # Check whether it is a coroutine function
        is_async = inspect.iscoroutinefunction(func)

        # Execute the job
        if is_async:
            # Async functions need special handling
            try:
                # Get or create an event loop
                try:
                    loop = asyncio.get_event_loop()
                except RuntimeError:
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)

                # Run the coroutine
                if loop.is_running():
                    # If the loop is already running, schedule it as a task
                    future = asyncio.ensure_future(func())
                    # A callback can be attached to handle the result
                    future.add_done_callback(lambda f: logger.info(f"Manual run of job {job_id} finished"))
                    message = f"Job {job_id} has been triggered asynchronously"
                else:
                    # If the loop is not running, run the coroutine to completion
                    loop.run_until_complete(func())
                    message = f"Job {job_id} has been executed synchronously"
            except Exception as e:
                logger.error(f"Failed to manually run async job {job_id}: {str(e)}")
                raise HTTPException(status_code=500, detail=f"Failed to run job: {str(e)}")
        else:
            # Sync functions can be called directly
            try:
                func()
                message = f"Job {job_id} has been executed"
            except Exception as e:
                logger.error(f"Failed to manually run job {job_id}: {str(e)}")
                raise HTTPException(status_code=500, detail=f"Failed to run job: {str(e)}")

        return success_response(message=message)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to trigger job {job_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to trigger job: {str(e)}")
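
The handler above calls job.func directly in the request worker. If scheduler.scheduler is an APScheduler instance, as the surrounding code suggests, a hedged alternative is to move the job's next fire time to now and let the scheduler's own executor handle sync and async callables uniformly; a sketch, not what this commit does:

from datetime import datetime

def trigger_job_now(job_id: str) -> None:
    # Reschedule the job to fire immediately; APScheduler's executor runs it.
    scheduler.scheduler.modify_job(job_id, next_run_time=datetime.now())
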

View File

@@ -7,6 +7,7 @@ class Settings(BaseSettings):
    DEBUG: bool = True  # Development mode flag
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "FastAPI Project"
    ENV_NAME: str = "Development environment"
    API_BASE_URL: str = "https://api-dev.beefast.co"
@@ -123,7 +124,9 @@ class Settings(BaseSettings):
class DevSettings(Settings):
    DEBUG: bool = True  # Development mode flag
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "FastAPI Project (development environment)"
    PROJECT_NAME: str = "FastAPI Project (test environment)"
    ENV_NAME: str = "Test environment"
    API_BASE_URL: str = "https://api-dev.beefast.co"
@@ -148,7 +151,7 @@ class ProdSettings(Settings):
    DEBUG: bool = False  # Production mode flag
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "FastAPI Project (production environment)"
    ENV_NAME: str = "Production environment"
    API_BASE_URL: str = "https://api.beefast.co"
    # Database configuration
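
The commit does not show how DevSettings or ProdSettings gets picked. A minimal sketch, assuming the same APP_ENV switch used in the exception handler below; the real selector may be named and structured differently:

import os

def get_settings() -> Settings:
    # Hypothetical selector; "prod" vs. "dev" values are assumptions.
    env = os.getenv("APP_ENV", "dev").lower()
    if env == "prod":
        return ProdSettings()
    return DevSettings()

settings = get_settings()
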

View File

@@ -119,9 +119,10 @@ async def exception_handler(request, exc):
    # Send a WeCom (WeChat Work) notification
    wecom_bot = WecomBot(settings.URL_WECOMBOT_SYS_EXCEPTION)
    env = os.getenv("APP_ENV", "dev").lower()
    exception_log = f"""**API Exception**
**Environment: {settings.ENV_NAME}**
    exception_log = f"""{env} environment **API Exception**
**Request info**
> Method: {request.method}
> URL: {request.url}
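
WecomBot is a project-internal helper not included in this commit. A sketch of what sending this markdown message to a WeCom group-robot webhook might look like; the class shape and the send_markdown method name are assumptions, only the webhook payload format is standard:

import requests

class WecomBot:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_markdown(self, content: str) -> None:
        # Standard WeCom group-robot payload for markdown messages.
        payload = {"msgtype": "markdown", "markdown": {"content": content}}
        requests.post(self.webhook_url, json=payload, timeout=5)
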

View File

@@ -18,10 +18,11 @@ pymysql.install_as_MySQLdb()
engine = create_engine(
    settings.SQLALCHEMY_DATABASE_URL,
    pool_pre_ping=True,       # Automatically handle dropped connections
    pool_recycle=3600,        # Recycle connections after 1 hour
    pool_recycle=1800,        # Recycle connections after 30 minutes
    pool_size=10,             # Connection pool size
    max_overflow=20,          # Maximum connections allowed beyond pool_size
    pool_timeout=30           # Timeout in seconds for acquiring a connection
    pool_timeout=30,          # Timeout in seconds for acquiring a connection
    echo_pool=settings.DEBUG  # Log connection pool events in debug mode
)
# Create a thread-safe session factory

@@ -41,6 +42,40 @@ def get_db_monitor():
    from app.core.db_monitor import DBConnectionMonitor
    return DBConnectionMonitor

# Add connection pool event listeners
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    """Callback fired when a connection is created"""
    logger.debug("Database connection created")

@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
    """Callback fired when a connection is checked out of the pool"""
    # Record when the connection was checked out
    connection_record.info['checkout_time'] = time.time()
    # Verify that the connection is still usable
    try:
        # Run a simple query to test the connection
        cursor = dbapi_connection.cursor()
        cursor.execute("SELECT 1")
        cursor.close()
    except Exception as e:
        # If the connection is invalid, dispose of it
        logger.warning(f"Detected an invalid database connection: {str(e)}")
        connection_proxy._pool.dispose()
        raise

@event.listens_for(engine, "checkin")
def checkin(dbapi_connection, connection_record):
    """Callback fired when a connection is returned to the pool"""
    checkout_time = connection_record.info.get('checkout_time')
    if checkout_time:
        # Compute how long the connection was in use
        used_time = time.time() - checkout_time
        if used_time > 10:  # Log connections held for more than 10 seconds
            logger.warning(f"Database connection held for a long time: {used_time:.2f}s")

# Add an event listener to log long-running queries
@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
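
The hunk ends at the before_cursor_execute definition. For context, a common SQLAlchemy recipe (a sketch under that assumption, not necessarily what this commit contains) pairs it with after_cursor_execute to time statements and log slow queries:

@event.listens_for(engine, "before_cursor_execute")
def _start_query_timer(conn, cursor, statement, parameters, context, executemany):
    # Stash the start time on the connection's info dict.
    conn.info.setdefault("query_start_time", []).append(time.time())

@event.listens_for(engine, "after_cursor_execute")
def _log_slow_queries(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.time() - conn.info["query_start_time"].pop(-1)
    if elapsed > 1.0:  # threshold is an assumption
        logger.warning(f"Slow query ({elapsed:.2f}s): {statement[:200]}")
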

View File

@@ -151,14 +151,16 @@ async def daily_community_order_statistics():
        logger.error(f"Failed to save order statistics to the database: {str(e)}")

    # Build the report message
    message = f"""## {yesterday_str} Community Order Statistics Report
    message = f"""### {yesterday_str} Community Order Statistics Report
**Environment: {settings.ENV_NAME}**

### Totals
- Total orders: {total_order_count}
- Total order amount: ¥{total_original_amount:.2f}
- Total paid amount: ¥{total_final_amount:.2f}
- Communities with orders: {total_communities}
- Total communities: {len(community_stats)}
> - Total orders: {total_order_count}
> - Total order amount: ¥{total_original_amount:.2f}
> - Total paid amount: ¥{total_final_amount:.2f}
> - Communities with orders: {total_communities}
> - Total communities: {len(community_stats)}

### Community Ranking (Top 5)
"""
@@ -166,10 +168,11 @@ async def daily_community_order_statistics():
    # Append the top 5 communities
    for i, item in enumerate(communities_with_orders[:5], 1):
        message += f"""
{i}. **{item['community_name']}**
- Orders: {item['order_count']}
- Order amount: ¥{item['total_original_amount']:.2f}
- Paid amount: ¥{item['total_final_amount']:.2f}
> {i}. **{item['community_name']}**
> - Orders: {item['order_count']}
> - Order amount: ¥{item['total_original_amount']:.2f}
> - Paid amount: ¥{item['total_final_amount']:.2f}
"""

    # Send the WeCom notification
@@ -209,6 +212,7 @@ def run_daily_community_order_statistics():
    except Exception as e:
        logger.error(f"Failed to run the daily community order statistics task: {str(e)}")

def register_daily_tasks():
    """Register all daily scheduled tasks"""

Binary file not shown.