deliveryman-api/app/api/endpoints/health.py
2025-03-10 21:15:01 +08:00

140 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from sqlalchemy import text
from app.models.database import get_db, get_active_sessions_count, get_long_running_sessions
from app.core.response import success_response, ResponseModel
from app.core.db_monitor import DBConnectionMonitor
from typing import Optional, List
# Router that the endpoints in this module register themselves on via @router.get(...).
router = APIRouter()
@router.get("/health", response_model=ResponseModel)
async def health_check(db: Session = Depends(get_db)):
    """Report API liveness together with database connectivity details.

    Runs ``SELECT 1`` through the injected session as a connectivity probe.
    A failing probe is not raised; its message is reported in the payload
    as ``"unhealthy: <error>"``. The payload also carries the
    ``connection_pool`` entry from ``DBConnectionMonitor.get_all_stats()``,
    the active session count, and the number of sessions returned by
    ``get_long_running_sessions(threshold_seconds=30)``.

    Args:
        db: Session provided by the ``get_db`` dependency.

    Returns:
        The result of ``success_response`` wrapping the status payload. The
        top-level ``status`` is always ``"ok"``; the probe outcome lives
        under ``database.status``.
    """
    # Cheapest possible round trip to confirm the database answers.
    # NOTE(review): this is a synchronous Session call inside an async
    # handler, so it executes on the event loop - confirm that is intended.
    try:
        db.execute(text("SELECT 1")).scalar()
    except Exception as exc:
        probe_result = f"unhealthy: {exc!s}"
    else:
        probe_result = "healthy"

    # Gather monitor and session figures in the same order as before.
    monitor_stats = DBConnectionMonitor.get_all_stats()
    session_total = get_active_sessions_count()
    slow_sessions = get_long_running_sessions(threshold_seconds=30)

    database_report = {
        "status": probe_result,
        "connection_pool": monitor_stats["connection_pool"],
        "active_sessions": session_total,
        "long_running_sessions": len(slow_sessions),
    }
    return success_response(data={"status": "ok", "database": database_report})
@router.get("/stats", response_model=ResponseModel)
async def performance_stats(
    include_slow_queries: bool = Query(False, description="是否包含慢查询记录"),
    include_long_sessions: bool = Query(False, description="是否包含长时间运行的会话详情"),
    min_duration: Optional[float] = Query(None, description="慢查询最小持续时间(秒)"),
    table_filter: Optional[str] = Query(None, description="按表名筛选慢查询"),
    query_type: Optional[str] = Query(None, description="按查询类型筛选SELECT, INSERT, UPDATE, DELETE"),
    limit: int = Query(20, ge=1, le=100, description="返回的慢查询数量限制"),
    db: Session = Depends(get_db)
):
    """Return the monitor's statistics, optionally with slow-query details.

    Args:
        include_slow_queries: When true, the ``slow_queries`` list is kept
            (filtered, sorted by duration descending and cut to ``limit``)
            and ``slow_queries_total_count`` / ``slow_queries_filtered_count``
            are added. When false, the list is replaced by
            ``slow_queries_count``.
        include_long_sessions: When true, the long-running sessions are
            returned under ``sessions.long_running``; otherwise only their
            number is returned under ``sessions.long_running_count``.
        min_duration: Keep only slow queries whose ``duration`` is at least
            this value. ``None`` disables the filter.
        table_filter: Case-insensitive substring matched against ``table``.
        query_type: Case-insensitive exact match against ``query_type``.
        limit: Maximum number of slow-query records returned.
        db: Session provided by the ``get_db`` dependency; unused in the body.

    Returns:
        The result of ``success_response`` wrapping the statistics.
    """
    monitor_stats = DBConnectionMonitor.get_all_stats()

    # Shape the response on shallow copies so nothing below (key deletion,
    # truncation to `limit`, added counters) is written into the structure
    # handed back by the monitor.
    # NOTE(review): whether get_all_stats() exposes live internal state is not
    # visible from this module; copying keeps the endpoint safe either way.
    all_stats = dict(monitor_stats)
    performance = dict(monitor_stats["performance_stats"])
    all_stats["performance_stats"] = performance

    if "slow_queries" in performance:
        slow_queries = performance["slow_queries"]
        if include_slow_queries:
            filtered_queries = slow_queries
            # `or 0` / `or ""` also cover records where the key exists but
            # holds None, which would otherwise raise inside the filters.
            if min_duration is not None:
                filtered_queries = [
                    q for q in filtered_queries
                    if (q.get("duration") or 0) >= min_duration
                ]
            if table_filter:
                table_needle = table_filter.lower()
                filtered_queries = [
                    q for q in filtered_queries
                    if table_needle in (q.get("table") or "").lower()
                ]
            if query_type:
                wanted_type = query_type.upper()
                filtered_queries = [
                    q for q in filtered_queries
                    if (q.get("query_type") or "").upper() == wanted_type
                ]

            # Slowest first, then cap the number of returned records.
            sorted_queries = sorted(
                filtered_queries,
                key=lambda q: q.get("duration") or 0,
                reverse=True,
            )[:limit]

            performance["slow_queries"] = sorted_queries
            performance["slow_queries_total_count"] = len(slow_queries)
            performance["slow_queries_filtered_count"] = len(filtered_queries)
        else:
            # Details were not requested: expose only how many there are.
            performance["slow_queries_count"] = len(slow_queries)
            del performance["slow_queries"]

    # Session figures; the long-running lookup uses a 30 second threshold.
    sessions = {"active_count": get_active_sessions_count()}
    long_running = get_long_running_sessions(threshold_seconds=30)
    if include_long_sessions:
        sessions["long_running"] = long_running
    else:
        sessions["long_running_count"] = len(long_running)
    all_stats["sessions"] = sessions

    return success_response(data=all_stats)
@router.get("/slow-queries", response_model=ResponseModel)
async def get_slow_queries(
    min_duration: Optional[float] = Query(0.5, description="最小持续时间(秒)"),
    table_filter: Optional[str] = Query(None, description="按表名筛选"),
    query_type: Optional[str] = Query(None, description="按查询类型筛选SELECT, INSERT, UPDATE, DELETE"),
    path_filter: Optional[str] = Query(None, description="按API路径筛选"),
    limit: int = Query(50, ge=1, le=100, description="返回的记录数量限制"),
    db: Session = Depends(get_db)
):
    """Return recorded slow queries, narrowed by the optional filters.

    Args:
        min_duration: Keep only records whose ``duration`` is at least this
            value (default 0.5). ``None`` disables the filter.
        table_filter: Case-insensitive substring matched against ``table``.
        query_type: Case-insensitive exact match against ``query_type``.
        path_filter: Case-insensitive substring matched against ``path``.
        limit: Maximum number of records returned.
        db: Session provided by the ``get_db`` dependency; unused in the body.

    Returns:
        The result of ``success_response`` wrapping a dict with
        ``total_count`` (all recorded slow queries), ``filtered_count``
        (after filtering), ``displayed_count`` (after applying ``limit``) and
        ``queries``. When at least one record is displayed,
        ``avg_duration``, ``max_duration`` and ``min_duration`` are added;
        they are computed over the displayed records only.
    """
    all_stats = DBConnectionMonitor.get_all_stats()
    slow_queries = all_stats["performance_stats"].get("slow_queries", [])

    filtered_queries = slow_queries
    # `or 0` / `or ""` also cover records where the key exists but holds
    # None, which would otherwise raise inside the filters.
    if min_duration is not None:
        filtered_queries = [
            q for q in filtered_queries
            if (q.get("duration") or 0) >= min_duration
        ]
    if table_filter:
        table_needle = table_filter.lower()
        filtered_queries = [
            q for q in filtered_queries
            if table_needle in (q.get("table") or "").lower()
        ]
    if query_type:
        wanted_type = query_type.upper()
        filtered_queries = [
            q for q in filtered_queries
            if (q.get("query_type") or "").upper() == wanted_type
        ]
    if path_filter:
        path_needle = path_filter.lower()
        filtered_queries = [
            q for q in filtered_queries
            if path_needle in (q.get("path") or "").lower()
        ]

    # Slowest first, then cap the number of returned records.
    sorted_queries = sorted(
        filtered_queries,
        key=lambda q: q.get("duration") or 0,
        reverse=True,
    )[:limit]

    stats = {
        "total_count": len(slow_queries),
        "filtered_count": len(filtered_queries),
        "displayed_count": len(sorted_queries),
        "queries": sorted_queries,
    }

    # Aggregates only make sense when something is displayed.
    if sorted_queries:
        durations = [q.get("duration") for q in sorted_queries]
        # Missing or None durations count as 0, matching the filter and sort.
        durations = [0 if d is None else d for d in durations]
        stats["avg_duration"] = sum(durations) / len(durations)
        stats["max_duration"] = max(durations)
        stats["min_duration"] = min(durations)

    return success_response(data=stats)