from fastapi import APIRouter, Depends, Query from sqlalchemy.orm import Session from sqlalchemy import text from app.models.database import get_db, get_active_sessions_count, get_long_running_sessions from app.core.response import success_response, ResponseModel from app.core.db_monitor import DBConnectionMonitor from typing import Optional, List router = APIRouter() @router.get("/health", response_model=ResponseModel) async def health_check(db: Session = Depends(get_db)): """健康检查端点,检查API和数据库连接状态""" # 尝试执行简单查询以验证数据库连接 try: db.execute(text("SELECT 1")).scalar() db_status = "healthy" except Exception as e: db_status = f"unhealthy: {str(e)}" # 获取连接池状态和性能统计 all_stats = DBConnectionMonitor.get_all_stats() # 获取活跃会话信息 active_sessions_count = get_active_sessions_count() long_running_sessions = get_long_running_sessions(threshold_seconds=30) return success_response(data={ "status": "ok", "database": { "status": db_status, "connection_pool": all_stats["connection_pool"], "active_sessions": active_sessions_count, "long_running_sessions": len(long_running_sessions) } }) @router.get("/stats", response_model=ResponseModel) async def performance_stats( include_slow_queries: bool = Query(False, description="是否包含慢查询记录"), include_long_sessions: bool = Query(False, description="是否包含长时间运行的会话详情"), min_duration: Optional[float] = Query(None, description="慢查询最小持续时间(秒)"), table_filter: Optional[str] = Query(None, description="按表名筛选慢查询"), query_type: Optional[str] = Query(None, description="按查询类型筛选(SELECT, INSERT, UPDATE, DELETE)"), limit: int = Query(20, ge=1, le=100, description="返回的慢查询数量限制"), db: Session = Depends(get_db) ): """获取详细的性能统计信息""" # 获取所有统计信息 all_stats = DBConnectionMonitor.get_all_stats() # 处理慢查询记录 if include_slow_queries and "slow_queries" in all_stats["performance_stats"]: # 获取原始慢查询列表 slow_queries = all_stats["performance_stats"]["slow_queries"] # 应用过滤条件 filtered_queries = slow_queries if min_duration is not None: filtered_queries = [q for q in filtered_queries if q.get("duration", 0) >= min_duration] if table_filter: filtered_queries = [q for q in filtered_queries if table_filter.lower() in q.get("table", "").lower()] if query_type: filtered_queries = [q for q in filtered_queries if q.get("query_type", "").upper() == query_type.upper()] # 按持续时间排序并限制数量 sorted_queries = sorted(filtered_queries, key=lambda x: x.get("duration", 0), reverse=True)[:limit] # 更新统计信息 all_stats["performance_stats"]["slow_queries"] = sorted_queries all_stats["performance_stats"]["slow_queries_total_count"] = len(slow_queries) all_stats["performance_stats"]["slow_queries_filtered_count"] = len(filtered_queries) elif "slow_queries" in all_stats["performance_stats"]: # 如果不包含慢查询记录,只返回计数 all_stats["performance_stats"]["slow_queries_count"] = len(all_stats["performance_stats"]["slow_queries"]) del all_stats["performance_stats"]["slow_queries"] # 添加会话信息 all_stats["sessions"] = { "active_count": get_active_sessions_count() } # 如果需要包含长时间运行的会话详情 if include_long_sessions: all_stats["sessions"]["long_running"] = get_long_running_sessions(threshold_seconds=30) else: all_stats["sessions"]["long_running_count"] = len(get_long_running_sessions(threshold_seconds=30)) return success_response(data=all_stats) @router.get("/slow-queries", response_model=ResponseModel) async def get_slow_queries( min_duration: Optional[float] = Query(0.5, description="最小持续时间(秒)"), table_filter: Optional[str] = Query(None, description="按表名筛选"), query_type: Optional[str] = Query(None, description="按查询类型筛选(SELECT, INSERT, UPDATE, DELETE)"), path_filter: Optional[str] = Query(None, description="按API路径筛选"), limit: int = Query(50, ge=1, le=100, description="返回的记录数量限制"), db: Session = Depends(get_db) ): """获取慢查询记录,支持多种过滤条件""" # 获取所有慢查询 all_stats = DBConnectionMonitor.get_all_stats() slow_queries = all_stats["performance_stats"].get("slow_queries", []) # 应用过滤条件 filtered_queries = slow_queries if min_duration is not None: filtered_queries = [q for q in filtered_queries if q.get("duration", 0) >= min_duration] if table_filter: filtered_queries = [q for q in filtered_queries if table_filter.lower() in q.get("table", "").lower()] if query_type: filtered_queries = [q for q in filtered_queries if q.get("query_type", "").upper() == query_type.upper()] if path_filter: filtered_queries = [q for q in filtered_queries if path_filter.lower() in q.get("path", "").lower()] # 按持续时间排序并限制数量 sorted_queries = sorted(filtered_queries, key=lambda x: x.get("duration", 0), reverse=True)[:limit] # 计算统计信息 stats = { "total_count": len(slow_queries), "filtered_count": len(filtered_queries), "displayed_count": len(sorted_queries), "queries": sorted_queries } # 如果有查询,计算平均持续时间 if sorted_queries: stats["avg_duration"] = sum(q.get("duration", 0) for q in sorted_queries) / len(sorted_queries) stats["max_duration"] = max(q.get("duration", 0) for q in sorted_queries) stats["min_duration"] = min(q.get("duration", 0) for q in sorted_queries) return success_response(data=stats)