alphax/app/db/operations_dashboard.py
2026-06-07 21:23:12 +08:00

422 lines
16 KiB
Python

"""Operations cockpit read model for system-wide health monitoring."""
from __future__ import annotations

import logging
from datetime import datetime, timedelta

from app.db.scheduler_db import get_scheduler_overview
from app.db.schema import get_conn
# Job names that are skipped when building scheduler cards and the cron-run
# timeline in this module.
RETIRED_RUNTIME_JOBS = {"onchain"}
def _now() -> datetime:
return datetime.now()
def _safe_int(value, default: int = 0) -> int:
try:
return int(value or 0)
except Exception:
return default
def _safe_float(value, default: float = 0.0) -> float:
try:
return float(value or 0)
except Exception:
return default
def _parse_dt(value) -> datetime | None:
if isinstance(value, datetime):
return value.replace(tzinfo=None)
if not value:
return None
try:
return datetime.fromisoformat(str(value).replace("Z", "+00:00")).replace(tzinfo=None)
except Exception:
return None
def _iso(value) -> str:
    """Render ``value`` as an ISO-8601 string with second precision.

    Values that ``_parse_dt`` cannot interpret are returned as ``str(value)``
    (or ``""`` when falsy).
    """
    parsed = _parse_dt(value)
    if not parsed:
        return str(value or "")
    return parsed.isoformat(timespec="seconds")
def _age_seconds(value) -> int | None:
    """Return whole seconds elapsed since ``value``, never negative.

    Returns ``None`` when ``value`` cannot be parsed by ``_parse_dt``.
    """
    stamp = _parse_dt(value)
    if not stamp:
        return None
    elapsed = (_now() - stamp).total_seconds()
    # Timestamps in the future are clamped to an age of zero.
    return max(0, int(elapsed))
def _status_by_age(value, warn_seconds: int, danger_seconds: int, missing: str = "danger") -> str:
    """Classify the freshness of a timestamp.

    Args:
        value: Timestamp understood by ``_age_seconds``.
        warn_seconds: Age at or above which the result is ``"warn"``.
        danger_seconds: Age at or above which the result is ``"danger"``.
        missing: Status returned when the age cannot be determined.

    Returns:
        ``"danger"``, ``"warn"``, ``"ok"`` or ``missing``.
    """
    elapsed = _age_seconds(value)
    if elapsed is None:
        return missing
    # Check the stricter threshold first so it wins over "warn".
    for threshold, label in ((danger_seconds, "danger"), (warn_seconds, "warn")):
        if elapsed >= threshold:
            return label
    return "ok"
def _count(conn, sql: str, params=()) -> int:
try:
row = conn.execute(sql, params).fetchone()
return _safe_int(row[0] if row else 0)
except Exception:
return 0
def _fetchone(conn, sql: str, params=()) -> dict:
try:
row = conn.execute(sql, params).fetchone()
return dict(row) if row else {}
except Exception:
return {}
def _fetchall(conn, sql: str, params=()) -> list[dict]:
try:
return [dict(r) for r in conn.execute(sql, params).fetchall()]
except Exception:
return []
def _severity_rank(status: str) -> int:
return {"ok": 0, "warn": 1, "danger": 2}.get(str(status or ""), 0)
def _display_error_summary(message: str, source: str = "", error_type: str = "") -> str:
text = str(message or "").strip()
if not text:
return ""
lowered = text.lower()
source_text = f"{source} " if source else ""
if "feishu" in lowered or "lark" in lowered or "webhook" in lowered:
return "飞书通知配置或发送异常"
if "timeout" in lowered or "timed out" in lowered:
return f"{source_text}请求超时".strip()
if "connectionpool" in lowered or "max retries" in lowered:
return f"{source_text}外部服务连接异常".strip()
if "ssl" in lowered or "ssleoferror" in lowered:
return f"{source_text}外部服务 SSL 连接异常".strip()
if len(text) > 72:
return text[:72].rstrip() + "..."
return text
def _overall_status(cards: list[dict], errors_24h: int) -> dict:
    """Derive the headline health status from component cards.

    Args:
        cards: Dicts carrying a ``status`` key.
        errors_24h: Count of unresolved errors; any positive value raises the
            baseline from ``ok`` to ``warn``.

    Returns:
        Dict with ``status``, ``label`` and ``summary``.
    """
    baseline = "warn" if errors_24h > 0 else "ok"
    # ``max`` returns the first maximal element, so the baseline and earlier
    # cards win ties, matching a strict "greater than" replacement loop.
    worst = max(
        [baseline, *(card.get("status") for card in cards)],
        key=_severity_rank,
    )
    labels = {"ok": "运行正常", "warn": "需要关注", "danger": "存在异常"}
    if worst == "ok":
        summary = "所有关键链路近期有心跳"
    else:
        summary = "部分链路延迟、失败或数据源不新鲜"
    return {
        "status": worst,
        "label": labels.get(worst, "运行正常"),
        "summary": summary,
    }
def _job_display_name(job_name: str) -> str:
return {
"event": "事件检查",
"tracker": "价格跟踪",
"paper-trader": "策略交易",
"market": "市场快照",
"confirm": "交易确认",
"screener": "异动筛选",
"sentiment": "舆情采集",
"llm-sentiment": "AI 舆情",
"review": "复盘迭代",
}.get(job_name, job_name)
def _build_scheduler_cards() -> tuple[list[dict], list[dict]]:
    """Build one status card per scheduler job plus matching timeline events.

    Reads the job list from ``get_scheduler_overview()`` and skips jobs named
    in ``RETIRED_RUNTIME_JOBS``.

    Returns:
        ``(cards, timeline)``: ``cards`` holds one dict per job; ``timeline``
        holds one event for each job that has a last-activity timestamp.
    """
    overview = get_scheduler_overview()
    cards = []
    timeline = []
    for job in overview.get("jobs") or []:
        if str(job.get("job_name") or "") in RETIRED_RUNTIME_JOBS:
            continue
        runtime = job.get("runtime") or {}
        latest = job.get("latest_cron") or {}
        enabled = bool(job.get("enabled"))
        # Last activity: first non-empty of runtime finish time, latest cron
        # finish time, runtime update time.
        last_time = runtime.get("last_finished_at") or latest.get("finished_at") or runtime.get("updated_at")
        # The interval is floored at 30 seconds.
        interval = max(30, _safe_int(job.get("every_seconds"), 300))
        # Status is resolved by successive overrides; later checks win.
        # NOTE(review): the running/error overrides below are assumed to apply
        # to disabled jobs as well — confirm the intended nesting.
        status = "off"
        if enabled:
            # Freshness thresholds: warn at 2x the interval, danger at 5x.
            status = _status_by_age(last_time, interval * 2, interval * 5, missing="warn")
        if str(runtime.get("status") or "") in {"running", "pending"}:
            status = "running"
        if latest.get("run_status") == "error" or latest.get("error_message"):
            status = "danger"
        card = {
            "job_name": job.get("job_name"),
            "name": _job_display_name(job.get("job_name")),
            "enabled": enabled,
            "status": status,
            "runtime_status": runtime.get("status") or ("disabled" if not enabled else "idle"),
            "last_time": _iso(last_time),
            "age_seconds": _age_seconds(last_time),
            "next_run_at": _iso(runtime.get("next_run_at")),
            "interval_seconds": interval,
            "last_result": latest.get("run_status") or "",
            "result_status": latest.get("result_status") or "",
            "duration_ms": _safe_int(latest.get("duration_ms") or runtime.get("last_duration_ms")),
            "error": _display_error_summary(latest.get("error_message") or runtime.get("last_error") or "", source=job.get("job_name") or ""),
        }
        cards.append(card)
        # Only jobs with a known last-activity time appear on the timeline.
        if last_time:
            timeline.append({
                "time": _iso(last_time),
                "type": "scheduler",
                "title": f"{card['name']}完成",
                "status": card["status"],
                "detail": card["result_status"] or card["last_result"] or card["runtime_status"],
            })
    return cards, timeline
def _build_data_sources(conn) -> tuple[list[dict], list[dict]]:
    """Report freshness and volume for each monitored data table.

    Args:
        conn: Open database connection passed to the query helpers.

    Returns:
        ``(sources, timeline)``: ``sources`` holds one dict per table;
        ``timeline`` holds an event for each table that has a latest row time.
    """
    # (code, display name, table, timestamp column, warn seconds, danger seconds)
    specs = [
        ("market", "市场快照", "market_snapshots", "snapshot_time", 600, 1800),
        ("prices", "实时价格", "latest_price_cache", "updated_at", 360, 900),
        ("sentiment", "新闻舆情", "event_news", "detected_at", 3600, 10800),
        ("llm", "AI 解读", "llm_insights", "updated_at", 7200, 21600),
    ]
    # The 24-hour cutoff is the same for every table, so derive it once.
    day_ago = (_now() - timedelta(hours=24)).isoformat()
    sources: list[dict] = []
    timeline: list[dict] = []
    for code, name, table, time_col, warn, danger in specs:
        # Table and column names come only from the literal specs above.
        stats = _fetchone(conn, f"SELECT MAX({time_col}) AS last_time, COUNT(*) AS total FROM {table}")
        count_24h = _count(conn, f"SELECT COUNT(*) FROM {table} WHERE {time_col} >= %s", (day_ago,))
        last_time = stats.get("last_time")
        status = _status_by_age(last_time, warn, danger, missing="warn")
        entry = {
            "code": code,
            "name": name,
            "status": status,
            "last_time": _iso(last_time),
            "age_seconds": _age_seconds(last_time),
            "count_24h": count_24h,
            "total": _safe_int(stats.get("total")),
        }
        sources.append(entry)
        if not last_time:
            continue
        timeline.append({
            "time": _iso(last_time),
            "type": "data",
            "title": f"{name}更新",
            "status": status,
            "detail": f"近 24h {count_24h}",
        })
    return sources, timeline
def _build_funnel(conn, since: str) -> list[dict]:
    """Summarise the screening-to-review pipeline as ordered funnel stages.

    Args:
        conn: Open database connection passed to the query helpers.
        since: Lower bound (ISO timestamp string) for all windowed counts.

    Returns:
        Six stage dicts in order (``universe``, ``discovery``, ``quality``,
        ``confirm``, ``execution``, ``review``), each with ``code``,
        ``label``, ``value``, ``sub`` and ``status``.
    """
    coverage = _fetchone(
        conn,
        """
        SELECT *
        FROM screening_coverage_audit
        ORDER BY scan_started_at DESC, id DESC
        LIMIT 1
        """,
    )
    confirm_count = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s", (since,))
    buy_now = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='buy_now'", (since,))
    wait_pullback = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='wait_pullback'", (since,))
    observe = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='observe'", (since,))
    orders = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE created_at >= %s", (since,))
    filled_orders = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE filled_at >= %s AND status='filled'", (since,))
    trades = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE opened_at >= %s", (since,))
    closed = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE closed_at >= %s AND status='closed'", (since,))
    reviews = _count(conn, "SELECT COUNT(*) FROM review_log WHERE review_time >= %s", (since,))

    # The three scan-derived stages read the same audit row, so evaluate its
    # freshness once: they can then never disagree at a threshold boundary.
    scan_status = _status_by_age(coverage.get("scan_finished_at"), 1800, 7200, missing="warn")

    return [
        {
            "code": "universe",
            "label": "交易宇宙",
            "value": _safe_int(coverage.get("tradable_universe_count")),
            "sub": f"原始 {_safe_int(coverage.get('usdt_pair_count'))} / 过滤 {_safe_int(coverage.get('universe_gate_count'))}",
            "status": scan_status,
        },
        {
            "code": "discovery",
            "label": "异动发现",
            "value": _safe_int(coverage.get("coarse_candidate_count")),
            "sub": f"强势榜 {_safe_int(coverage.get('top_gainer_discovery_count'))}",
            "status": scan_status,
        },
        {
            "code": "quality",
            "label": "质量验证",
            "value": _safe_int(coverage.get("fine_qualified_count")),
            "sub": f"拒绝 {_safe_int(coverage.get('quality_rejected_count'))}",
            "status": scan_status,
        },
        {
            "code": "confirm",
            "label": "交易确认",
            "value": confirm_count,
            "sub": f"可买 {buy_now} / 挂单 {wait_pullback} / 观察 {observe}",
            "status": "ok" if confirm_count or observe or wait_pullback else "warn",
        },
        {
            "code": "execution",
            "label": "策略交易",
            "value": orders + trades,
            "sub": f"挂单 {orders} / 成交 {filled_orders} / 开仓 {trades} / 平仓 {closed}",
            "status": "ok",
        },
        {
            "code": "review",
            "label": "复盘迭代",
            "value": reviews,
            "sub": "只统计窗口内复盘样本",
            "status": "ok" if reviews else "warn",
        },
    ]
def _build_trading(conn, since: str) -> dict:
    """Collect paper-trading counters and the most recent order activity.

    Args:
        conn: Open database connection passed to the query helpers.
        since: Lower bound (ISO timestamp string) for the windowed counters.

    Returns:
        Dict of counters, realised PnL rounded to 4 decimals, the 8 most
        recent orders, and a ``status`` that is ``"warn"`` when cancellations
        exceed ``max(3, 2 * fills)``.
    """
    window = (since,)
    open_positions = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE status='open'")
    pending_orders = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status='pending'")
    filled = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status='filled' AND filled_at >= %s", window)
    canceled = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status IN ('canceled','expired','rejected') AND COALESCE(canceled_at, updated_at, created_at) >= %s", window)
    closed = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE status='closed' AND closed_at >= %s", window)

    pnl_row = _fetchone(conn, "SELECT COALESCE(SUM(realized_pnl_usdt),0) AS v FROM paper_trades WHERE status='closed' AND closed_at >= %s", window)
    realized = _safe_float(pnl_row.get("v"))

    recent_orders = _fetchall(
        conn,
        """
        SELECT symbol, side, status, target_price, fill_price, cancel_reason,
        COALESCE(filled_at, canceled_at, updated_at, created_at) AS event_time
        FROM paper_orders
        ORDER BY COALESCE(filled_at, canceled_at, updated_at, created_at) DESC, id DESC
        LIMIT 8
        """,
    )

    cancel_limit = max(3, filled * 2)
    if canceled > cancel_limit:
        status = "warn"
    else:
        status = "ok"
    return {
        "open_positions": open_positions,
        "pending_orders": pending_orders,
        "filled_orders_24h": filled,
        "canceled_orders_24h": canceled,
        "closed_trades_24h": closed,
        "realized_pnl_24h": round(realized, 4),
        "recent_orders": recent_orders,
        "status": status,
    }
def _build_timeline(conn, base_events: list[dict], since: str) -> list[dict]:
    """Merge supplied events with recent trade and cron-run events.

    Args:
        conn: Open database connection passed to the query helpers.
        base_events: Events already collected by the caller; not mutated.
        since: Lower bound (ISO timestamp string) for the queried events.

    Returns:
        At most 28 events, newest first. Events whose ``time`` cannot be
        parsed sort last.
    """
    trade_rows = _fetchall(
        conn,
        """
        SELECT event_time AS time, event_type, symbol, message
        FROM paper_trade_events
        WHERE event_time >= %s
        ORDER BY event_time DESC, id DESC
        LIMIT 12
        """,
        (since,),
    )
    trade_events = [
        {
            "time": _iso(row.get("time")),
            "type": "trade",
            "title": f"{row.get('symbol') or '--'} · {row.get('event_type') or '交易事件'}",
            "status": "ok",
            "detail": _display_error_summary(row.get("message") or "", source="paper_trade"),
        }
        for row in trade_rows
    ]

    job_rows = _fetchall(
        conn,
        """
        SELECT started_at AS time, job_name, run_status, result_status, error_message
        FROM cron_run_log
        WHERE started_at >= %s
        ORDER BY started_at DESC, id DESC
        LIMIT 12
        """,
        (since,),
    )
    job_events = []
    for row in job_rows:
        # Retired jobs are dropped after the LIMIT has been applied.
        if str(row.get("job_name") or "") in RETIRED_RUNTIME_JOBS:
            continue
        failed = row.get("run_status") == "error" or row.get("error_message")
        # Prefer the error summary, then result status, then run status.
        detail = (
            _display_error_summary(row.get("error_message") or "", source=row.get("job_name") or "")
            or row.get("result_status")
            or row.get("run_status")
            or ""
        )
        job_events.append({
            "time": _iso(row.get("time")),
            "type": "job",
            "title": f"{row.get('job_name') or '--'} 运行",
            "status": "danger" if failed else "ok",
            "detail": detail,
        })

    merged = [*base_events, *trade_events, *job_events]
    ordered = sorted(
        merged,
        key=lambda event: _parse_dt(event.get("time")) or datetime.min,
        reverse=True,
    )
    return ordered[:28]
def get_operations_dashboard(hours: int = 24) -> dict:
    """Assemble the complete operations cockpit payload.

    Args:
        hours: Look-back window in hours; coerced with ``_safe_int`` and
            clamped to the range 1..168.

    Returns:
        Dict with ``hours``, ``generated_at``, ``overall``, ``summary``,
        ``scheduler``, ``data_sources``, ``funnel``, ``trading`` and
        ``timeline``.
    """
    hours = max(1, min(_safe_int(hours, 24), 168))
    window_start = _now() - timedelta(hours=hours)
    since = window_start.isoformat()
    scheduler, scheduler_events = _build_scheduler_cards()

    conn = get_conn()
    try:
        data_sources, source_events = _build_data_sources(conn)
        funnel = _build_funnel(conn, since)
        trading = _build_trading(conn, since)
        errors_24h = _count(conn, "SELECT COUNT(*) FROM system_error_log WHERE created_at >= %s AND resolved_at=''", (since,))
        latest_error = _fetchone(
            conn,
            """
            SELECT created_at, source, error_type, message
            FROM system_error_log
            WHERE created_at >= %s AND resolved_at=''
            ORDER BY created_at DESC, id DESC
            LIMIT 1
            """,
            (since,),
        )
        if latest_error:
            # Attach a condensed message alongside the raw one.
            latest_error["display_message"] = _display_error_summary(
                latest_error.get("message") or "",
                source=latest_error.get("source") or "",
                error_type=latest_error.get("error_type") or "",
            )
        timeline = _build_timeline(conn, scheduler_events + source_events, since)
    finally:
        conn.close()

    # Only enabled jobs contribute to the headline health status.
    enabled_cards = [card for card in scheduler if card.get("enabled")]
    overall = _overall_status(enabled_cards + data_sources + [trading], errors_24h)

    running_cards = [card for card in scheduler if card.get("runtime_status") == "running"]
    healthy_sources = [src for src in data_sources if src.get("status") == "ok"]
    confirm_value = next(
        (stage.get("value") for stage in funnel if stage.get("code") == "confirm"),
        0,
    )
    summary = {
        "enabled_jobs": len(enabled_cards),
        "running_jobs": len(running_cards),
        "data_sources_ok": len(healthy_sources),
        "active_opportunities": _safe_int(confirm_value),
        "pending_orders": trading.get("pending_orders", 0),
        "open_positions": trading.get("open_positions", 0),
        "system_errors": errors_24h,
        "latest_error": latest_error,
    }
    return {
        "hours": hours,
        "generated_at": _now().isoformat(timespec="seconds"),
        "overall": overall,
        "summary": summary,
        "scheduler": scheduler,
        "data_sources": data_sources,
        "funnel": funnel,
        "trading": trading,
        "timeline": timeline,
    }
# Only the dashboard entry point is exported; all other names are private helpers.
__all__ = ["get_operations_dashboard"]