from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.models.database import get_db
from app.models.request_log import RequestLogDB
from app.core.response import success_response, error_response, ResponseModel
from app.api.deps import get_admin_user
from app.models.user import UserDB
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import and_

router = APIRouter()


@router.get("/request-logs", response_model=ResponseModel)
async def get_request_logs(
    start_time: Optional[datetime] = Query(None, description="开始时间"),
    end_time: Optional[datetime] = Query(None, description="结束时间"),
    path: Optional[str] = Query(None, description="请求路径"),
    method: Optional[str] = Query(None, description="请求方法"),
    user_id: Optional[int] = Query(None, description="用户ID"),
    # ge=0 rejects negative values at validation time instead of letting them
    # reach OFFSET/LIMIT, where they fail or behave differently per database.
    # NOTE(review): limit has no upper bound; consider capping it.
    skip: int = Query(0, ge=0, description="分页偏移"),
    limit: int = Query(20, ge=0, description="每页数量"),
    db: Session = Depends(get_db),
    # Not used in the body; the dependency itself restricts the endpoint to
    # administrators.
    current_user: UserDB = Depends(get_admin_user),
):
    """Query recorded request logs with optional filters and pagination.

    NOTE(review): this is an ``async def`` handler that calls a synchronous
    SQLAlchemy ``Session``; confirm that blocking the event loop is acceptable.

    Args:
        start_time: Only include logs with ``create_time >= start_time``.
        end_time: Only include logs with ``create_time <= end_time``.
        path: Substring to match against the request path (SQL ``LIKE``).
            ``%`` and ``_`` in the value are not escaped and act as wildcards.
        method: HTTP method; upper-cased before comparison.
        user_id: Only include logs recorded for this user ID.
        skip: Pagination offset (must be >= 0).
        limit: Page size (must be >= 0).
        db: Database session.
        current_user: Authenticated administrator.

    Returns:
        The value of ``success_response`` called with a dict holding
        ``total`` (number of rows matching the filters, ignoring pagination)
        and ``items`` (the requested page, newest first).
    """
    # Build the filter list. Compare against None explicitly so that falsy
    # but valid values (e.g. user_id == 0) are not silently ignored.
    conditions = []
    if start_time is not None:
        conditions.append(RequestLogDB.create_time >= start_time)
    if end_time is not None:
        conditions.append(RequestLogDB.create_time <= end_time)
    if path:
        conditions.append(RequestLogDB.path.like(f"%{path}%"))
    if method:
        conditions.append(RequestLogDB.method == method.upper())
    if user_id is not None:
        conditions.append(RequestLogDB.user_id == user_id)

    query = db.query(RequestLogDB)
    if conditions:
        query = query.filter(and_(*conditions))

    # Total number of matching rows, counted before pagination is applied.
    total = query.count()

    # Newest first. The id tie-breaker gives a total order, so rows sharing a
    # create_time cannot be duplicated or skipped across pages.
    logs = (
        query.order_by(
            RequestLogDB.create_time.desc(),
            RequestLogDB.id.desc(),
        )
        .offset(skip)
        .limit(limit)
        .all()
    )

    # Convert ORM rows to plain dicts for the response payload.
    log_list = [
        {
            "id": log.id,
            "path": log.path,
            "method": log.method,
            "body": log.body,
            "user_id": log.user_id,
            "ip_address": log.ip_address,
            "status_code": log.status_code,
            "response_time": log.response_time,
            "create_time": log.create_time,
        }
        for log in logs
    ]

    return success_response(data={
        "total": total,
        "items": log_list,
    })