astock-agent/backend/app/api/debug.py
2026-05-19 11:25:21 +08:00

501 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Debug API — 系统日志与运行状态"""
import json
import os
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends
from sqlalchemy import text
from app.core.deps import get_current_admin
from app.db.database import get_db
from app.config import settings, is_trading_hours
router = APIRouter(prefix="/api/debug", tags=["debug"])
@router.get("/errors")
async def get_errors(
limit: int = 50,
source: str = None,
level: str = None,
days: int = 7,
_admin: dict = Depends(get_current_admin),
):
"""获取错误日志(管理员)"""
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
async with get_db() as db:
conditions = ["created_at >= :start"]
params = {"start": start}
if source:
conditions.append("source = :source")
params["source"] = source
if level:
conditions.append("level = :level")
params["level"] = level
where = " AND ".join(conditions)
# 总数
count_result = await db.execute(
text(f"SELECT COUNT(*) FROM error_logs WHERE {where}"), params
)
total = count_result.scalar() or 0
# 查询
params["limit"] = limit
result = await db.execute(
text(
f"SELECT id, source, level, message, detail, created_at "
f"FROM error_logs WHERE {where} "
f"ORDER BY created_at DESC LIMIT :limit"
),
params,
)
rows = result.fetchall()
errors = []
for row in rows:
r = row._mapping
errors.append({
"id": r["id"],
"source": r["source"],
"level": r["level"],
"message": r["message"],
"detail": r["detail"] or "",
"created_at": str(r["created_at"]) if r["created_at"] else "",
})
# 可选的 source/level 列表(用于前端过滤)
sources_result = await db.execute(
text("SELECT DISTINCT source FROM error_logs ORDER BY source")
)
sources = [r[0] for r in sources_result.fetchall()]
levels_result = await db.execute(
text("SELECT DISTINCT level FROM error_logs ORDER BY level")
)
levels = [r[0] for r in levels_result.fetchall()]
return {
"total": total,
"errors": errors,
"sources": sources,
"levels": levels,
}
@router.delete("/errors")
async def clear_errors(
days: int = 30,
_admin: dict = Depends(get_current_admin),
):
"""清除旧错误日志(管理员)"""
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
async with get_db() as db:
result = await db.execute(
text("DELETE FROM error_logs WHERE created_at < :cutoff"),
{"cutoff": cutoff},
)
deleted = result.rowcount
await db.commit()
return {"status": "ok", "deleted": deleted}
@router.get("/system")
async def system_status(_admin: dict = Depends(get_current_admin)):
"""系统运行状态摘要(管理员)"""
from app.engine.recommender import _scan_running, _scan_lock
async with get_db() as db:
# 各表数据量
tables_counts = {}
for t in ["recommendations", "sector_heat", "market_temperature",
"recommendation_tracking", "stock_diagnoses",
"error_logs", "scan_process_logs", "users"]:
result = await db.execute(text(f"SELECT COUNT(*) FROM {t}"))
tables_counts[t] = result.scalar() or 0
# 最近 24h 错误数
since = (datetime.now() - timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
result = await db.execute(
text("SELECT COUNT(*) FROM error_logs WHERE created_at >= :since"),
{"since": since},
)
recent_errors = result.scalar() or 0
# 最近错误
result = await db.execute(
text("SELECT source, message, created_at FROM error_logs ORDER BY created_at DESC LIMIT 5")
)
last_errors = [
{"source": r[0], "message": r[1], "created_at": str(r[2])}
for r in result.fetchall()
]
# 数据库文件大小
db_path = settings.database_url.replace("sqlite:///", "")
db_size_mb = 0
if os.path.exists(db_path):
db_size_mb = round(os.path.getsize(db_path) / 1024 / 1024, 2)
return {
"is_trading": is_trading_hours(),
"scan_running": _scan_running,
"scan_locked": _scan_lock.locked(),
"recent_errors": recent_errors,
"last_errors": last_errors,
"tables_counts": tables_counts,
"db_size_mb": db_size_mb,
}
def _decode_detail(raw: str | None) -> dict:
if not raw:
return {}
try:
parsed = json.loads(raw)
return parsed if isinstance(parsed, dict) else {"value": parsed}
except Exception:
return {"raw": raw}
def _row_observation(row) -> dict:
r = row._mapping
return {
"id": r["id"],
"scan_session": r["scan_session"],
"scan_mode": r["scan_mode"] or "",
"ts_code": r["ts_code"],
"name": r["name"],
"theme_name": r["theme_name"] or "",
"stock_role": r["stock_role"] or "",
"action_plan": r["action_plan"] or "观察",
"final_score": r["final_score"] or 0,
"catalyst_score": r["catalyst_score"] or 0,
"theme_money_score": r["theme_money_score"] or 0,
"stock_money_score": r["stock_money_score"] or 0,
"emotion_role_score": r["emotion_role_score"] or 0,
"timing_score": r["timing_score"] or 0,
"entry_signal_type": r["entry_signal_type"] or "none",
"elimination_reason": r["elimination_reason"] or "",
"detail": _decode_detail(r["detail_json"]),
"created_at": str(r["created_at"]) if r["created_at"] else "",
}
@router.get("/scan-logs")
async def get_scan_logs(
limit: int = 100,
scan_session: str = None,
days: int = 7,
_admin: dict = Depends(get_current_admin),
):
"""获取筛选过程日志(管理员)"""
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
limit = max(1, min(limit, 300))
async with get_db() as db:
selected_session = scan_session
if not selected_session:
latest = await db.execute(
text(
"SELECT scan_session FROM scan_process_logs "
"WHERE created_at >= :start "
"ORDER BY created_at DESC LIMIT 1"
),
{"start": start},
)
selected_session = latest.scalar()
if not selected_session:
return {"scan_session": None, "logs": []}
result = await db.execute(
text(
"SELECT id, scan_session, scan_mode, stage, stage_label, status, "
"input_count, output_count, filtered_count, summary, detail_json, created_at "
"FROM scan_process_logs "
"WHERE created_at >= :start AND scan_session = :scan_session "
"ORDER BY created_at ASC LIMIT :limit"
),
{"start": start, "scan_session": selected_session, "limit": limit},
)
rows = result.fetchall()
logs = []
for row in rows:
r = row._mapping
logs.append({
"id": r["id"],
"scan_session": r["scan_session"],
"scan_mode": r["scan_mode"] or "",
"stage": r["stage"],
"stage_label": r["stage_label"],
"status": r["status"] or "ok",
"input_count": r["input_count"] or 0,
"output_count": r["output_count"] or 0,
"filtered_count": r["filtered_count"] or 0,
"summary": r["summary"] or "",
"detail": _decode_detail(r["detail_json"]),
"created_at": str(r["created_at"]) if r["created_at"] else "",
})
return {
"scan_session": selected_session,
"logs": logs,
}
@router.get("/research-observations")
async def get_research_observations(
scan_session: str = None,
limit: int = 80,
days: int = 7,
_admin: dict = Depends(get_current_admin),
):
"""获取候选股投研观察记录(管理员)"""
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
limit = max(1, min(limit, 200))
async with get_db() as db:
selected_session = scan_session
if not selected_session:
latest = await db.execute(
text(
"SELECT scan_session FROM research_observations "
"WHERE created_at >= :start ORDER BY created_at DESC LIMIT 1"
),
{"start": start},
)
selected_session = latest.scalar()
if not selected_session:
return {"scan_session": None, "observations": [], "reason_counts": {}}
result = await db.execute(
text(
"SELECT id, scan_session, scan_mode, ts_code, name, theme_name, stock_role, "
"action_plan, final_score, catalyst_score, theme_money_score, stock_money_score, "
"emotion_role_score, timing_score, entry_signal_type, elimination_reason, detail_json, created_at "
"FROM research_observations "
"WHERE created_at >= :start AND scan_session = :scan_session "
"ORDER BY final_score DESC LIMIT :limit"
),
{"start": start, "scan_session": selected_session, "limit": limit},
)
rows = result.fetchall()
observations = [_row_observation(row) for row in rows]
reason_counts = {}
for item in observations:
for part in (item["elimination_reason"] or "未知").split(""):
if not part:
continue
reason_counts[part] = reason_counts.get(part, 0) + 1
return {
"scan_session": selected_session,
"observations": observations,
"reason_counts": reason_counts,
}
@router.get("/scan-sessions")
async def get_scan_sessions(
days: int = 7,
limit: int = 30,
_admin: dict = Depends(get_current_admin),
):
"""获取筛选会话摘要(管理员)"""
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
limit = max(1, min(limit, 100))
async with get_db() as db:
result = await db.execute(
text(
"SELECT scan_session, scan_mode, stage, status, input_count, "
"output_count, filtered_count, summary, created_at "
"FROM scan_process_logs "
"WHERE created_at >= :start "
"ORDER BY created_at DESC LIMIT 1000"
),
{"start": start},
)
rows = result.fetchall()
sessions = {}
order = []
for row in rows:
r = row._mapping
session_id = r["scan_session"]
if session_id not in sessions:
sessions[session_id] = {
"scan_session": session_id,
"scan_mode": r["scan_mode"] or "",
"created_at": str(r["created_at"]) if r["created_at"] else "",
"stage_count": 0,
"status": "ok",
"input_count": 0,
"final_count": 0,
"drop_count": 0,
"last_summary": r["summary"] or "",
}
order.append(session_id)
item = sessions[session_id]
item["stage_count"] += 1
item["input_count"] = max(item["input_count"], int(r["input_count"] or 0))
item["drop_count"] += int(r["filtered_count"] or 0)
if r["stage"] == "final_filter" and item["final_count"] == 0:
item["final_count"] = int(r["output_count"] or 0)
item["last_summary"] = r["summary"] or item["last_summary"]
status = (r["status"] or "ok").lower()
if status in {"failed", "error", "critical"}:
item["status"] = "failed"
elif status in {"warning", "empty"} and item["status"] == "ok":
item["status"] = status
return {
"sessions": [sessions[sid] for sid in order[:limit]],
}
@router.get("/data-source-health")
async def get_data_source_health(
days: int = 7,
_admin: dict = Depends(get_current_admin),
):
"""数据源健康摘要(管理员,只读)。"""
start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
known_sources = ["eastmoney", "tencent", "tushare", "akshare", "sina", "news", "tushare_news"]
health = {
source: {
"source": source,
"status": "ok",
"error_count": 0,
"warning_count": 0,
"last_error": "",
"last_seen_at": "",
}
for source in known_sources
}
async with get_db() as db:
result = await db.execute(
text(
"SELECT source, level, message, created_at FROM error_logs "
"WHERE created_at >= :start "
"ORDER BY created_at DESC LIMIT 500"
),
{"start": start},
)
rows = result.fetchall()
table_rows = {}
for table_name, sql in {
"market_temperature": "SELECT trade_date, created_at FROM market_temperature ORDER BY id DESC LIMIT 1",
"sector_heat": "SELECT trade_date, created_at FROM sector_heat ORDER BY id DESC LIMIT 1",
"recommendations": "SELECT created_at FROM recommendations ORDER BY id DESC LIMIT 1",
"news_items": "SELECT created_at FROM news_items ORDER BY id DESC LIMIT 1",
"catalysts": "SELECT created_at FROM catalysts ORDER BY id DESC LIMIT 1",
}.items():
row = (await db.execute(text(sql))).fetchone()
table_rows[table_name] = dict(row._mapping) if row else {}
for row in rows:
r = row._mapping
source_text = str(r["source"] or "").lower()
matched = next((source for source in known_sources if source in source_text), source_text or "unknown")
if matched not in health:
health[matched] = {
"source": matched,
"status": "ok",
"error_count": 0,
"warning_count": 0,
"last_error": "",
"last_seen_at": "",
}
item = health[matched]
level_text = str(r["level"] or "error").lower()
if level_text in {"warning", "warn"}:
item["warning_count"] += 1
if item["status"] == "ok":
item["status"] = "warning"
else:
item["error_count"] += 1
item["status"] = "error"
if not item["last_error"]:
item["last_error"] = r["message"] or ""
item["last_seen_at"] = str(r["created_at"] or "")
return {
"days": days,
"sources": sorted(health.values(), key=lambda item: (item["status"] != "error", item["status"] != "warning", item["source"])),
"freshness": {
key: {k: str(v or "") for k, v in value.items()}
for key, value in table_rows.items()
},
"generated_at": datetime.now().isoformat(),
}
@router.get("/tasks")
async def get_tasks(_admin: dict = Depends(get_current_admin)):
"""后台任务中心摘要(管理员,只读)。"""
from app.engine.scheduler import scheduler
from app.engine.recommender import _scan_running, _scan_lock
jobs = []
for job in scheduler.get_jobs():
jobs.append({
"id": job.id,
"name": job.name,
"next_run_time": str(job.next_run_time) if job.next_run_time else "",
"trigger": str(job.trigger),
})
async with get_db() as db:
recent_scan = await db.execute(
text(
"SELECT scan_session, scan_mode, stage, status, output_count, summary, created_at "
"FROM scan_process_logs ORDER BY created_at DESC LIMIT 12"
)
)
recent_errors = await db.execute(
text(
"SELECT source, level, message, created_at FROM error_logs "
"ORDER BY created_at DESC LIMIT 8"
)
)
return {
"scheduler_running": scheduler.running,
"scan_running": _scan_running,
"scan_locked": _scan_lock.locked(),
"job_count": len(jobs),
"jobs": sorted(jobs, key=lambda item: item["next_run_time"] or "9999"),
"recent_scan_logs": [
{
"scan_session": r._mapping["scan_session"],
"scan_mode": r._mapping["scan_mode"] or "",
"stage": r._mapping["stage"],
"status": r._mapping["status"] or "ok",
"output_count": r._mapping["output_count"] or 0,
"summary": r._mapping["summary"] or "",
"created_at": str(r._mapping["created_at"] or ""),
}
for r in recent_scan.fetchall()
],
"recent_errors": [
{
"source": r._mapping["source"],
"level": r._mapping["level"],
"message": r._mapping["message"],
"created_at": str(r._mapping["created_at"] or ""),
}
for r in recent_errors.fetchall()
],
"generated_at": datetime.now().isoformat(),
}