"""Operations cockpit read model for system-wide health monitoring.""" from __future__ import annotations import json from datetime import datetime, timedelta from app.db.schema import get_conn from app.db.scheduler_db import get_scheduler_overview RETIRED_RUNTIME_JOBS = {"onchain"} def _now() -> datetime: return datetime.now() def _safe_int(value, default: int = 0) -> int: try: return int(value or 0) except Exception: return default def _safe_float(value, default: float = 0.0) -> float: try: return float(value or 0) except Exception: return default def _parse_dt(value) -> datetime | None: if isinstance(value, datetime): return value.replace(tzinfo=None) if not value: return None try: return datetime.fromisoformat(str(value).replace("Z", "+00:00")).replace(tzinfo=None) except Exception: return None def _iso(value) -> str: dt = _parse_dt(value) if dt: return dt.isoformat(timespec="seconds") return str(value or "") def _age_seconds(value) -> int | None: dt = _parse_dt(value) if not dt: return None return max(0, int((_now() - dt).total_seconds())) def _status_by_age(value, warn_seconds: int, danger_seconds: int, missing: str = "danger") -> str: age = _age_seconds(value) if age is None: return missing if age >= danger_seconds: return "danger" if age >= warn_seconds: return "warn" return "ok" def _count(conn, sql: str, params=()) -> int: try: row = conn.execute(sql, params).fetchone() return _safe_int(row[0] if row else 0) except Exception: return 0 def _fetchone(conn, sql: str, params=()) -> dict: try: row = conn.execute(sql, params).fetchone() return dict(row) if row else {} except Exception: return {} def _fetchall(conn, sql: str, params=()) -> list[dict]: try: return [dict(r) for r in conn.execute(sql, params).fetchall()] except Exception: return [] def _severity_rank(status: str) -> int: return {"ok": 0, "warn": 1, "danger": 2}.get(str(status or ""), 0) def _display_error_summary(message: str, source: str = "", error_type: str = "") -> str: text = str(message or "").strip() if not text: return "" lowered = text.lower() source_text = f"{source} " if source else "" if "feishu" in lowered or "lark" in lowered or "webhook" in lowered: return "飞书通知配置或发送异常" if "timeout" in lowered or "timed out" in lowered: return f"{source_text}请求超时".strip() if "connectionpool" in lowered or "max retries" in lowered: return f"{source_text}外部服务连接异常".strip() if "ssl" in lowered or "ssleoferror" in lowered: return f"{source_text}外部服务 SSL 连接异常".strip() if len(text) > 72: return text[:72].rstrip() + "..." return text def _overall_status(cards: list[dict], errors_24h: int) -> dict: worst = "ok" if errors_24h > 0: worst = "warn" for item in cards: if _severity_rank(item.get("status")) > _severity_rank(worst): worst = item.get("status") label = {"ok": "运行正常", "warn": "需要关注", "danger": "存在异常"}.get(worst, "运行正常") return { "status": worst, "label": label, "summary": "所有关键链路近期有心跳" if worst == "ok" else "部分链路延迟、失败或数据源不新鲜", } def _job_display_name(job_name: str) -> str: return { "event": "事件检查", "tracker": "价格跟踪", "paper-trader": "策略交易", "market": "市场快照", "confirm": "交易确认", "screener": "异动筛选", "sentiment": "舆情采集", "llm-sentiment": "AI 舆情", "review": "复盘迭代", }.get(job_name, job_name) def _build_scheduler_cards() -> tuple[list[dict], list[dict]]: overview = get_scheduler_overview() cards = [] timeline = [] for job in overview.get("jobs") or []: if str(job.get("job_name") or "") in RETIRED_RUNTIME_JOBS: continue runtime = job.get("runtime") or {} latest = job.get("latest_cron") or {} enabled = bool(job.get("enabled")) last_time = runtime.get("last_finished_at") or latest.get("finished_at") or runtime.get("updated_at") interval = max(30, _safe_int(job.get("every_seconds"), 300)) status = "off" if enabled: status = _status_by_age(last_time, interval * 2, interval * 5, missing="warn") if str(runtime.get("status") or "") in {"running", "pending"}: status = "running" if latest.get("run_status") == "error" or latest.get("error_message"): status = "danger" card = { "job_name": job.get("job_name"), "name": _job_display_name(job.get("job_name")), "enabled": enabled, "status": status, "runtime_status": runtime.get("status") or ("disabled" if not enabled else "idle"), "last_time": _iso(last_time), "age_seconds": _age_seconds(last_time), "next_run_at": _iso(runtime.get("next_run_at")), "interval_seconds": interval, "last_result": latest.get("run_status") or "", "result_status": latest.get("result_status") or "", "duration_ms": _safe_int(latest.get("duration_ms") or runtime.get("last_duration_ms")), "error": _display_error_summary(latest.get("error_message") or runtime.get("last_error") or "", source=job.get("job_name") or ""), } cards.append(card) if last_time: timeline.append({ "time": _iso(last_time), "type": "scheduler", "title": f"{card['name']}完成", "status": card["status"], "detail": card["result_status"] or card["last_result"] or card["runtime_status"], }) return cards, timeline def _build_data_sources(conn) -> tuple[list[dict], list[dict]]: now = _now() sources = [] timeline = [] specs = [ ("market", "市场快照", "market_snapshots", "snapshot_time", 600, 1800), ("prices", "实时价格", "latest_price_cache", "updated_at", 360, 900), ("sentiment", "新闻舆情", "event_news", "detected_at", 3600, 10800), ("llm", "AI 解读", "llm_insights", "updated_at", 7200, 21600), ] for code, name, table, time_col, warn, danger in specs: row = _fetchone(conn, f"SELECT MAX({time_col}) AS last_time, COUNT(*) AS total FROM {table}") count_24h = _count(conn, f"SELECT COUNT(*) FROM {table} WHERE {time_col} >= %s", ((now - timedelta(hours=24)).isoformat(),)) last_time = row.get("last_time") status = _status_by_age(last_time, warn, danger, missing="warn") sources.append({ "code": code, "name": name, "status": status, "last_time": _iso(last_time), "age_seconds": _age_seconds(last_time), "count_24h": count_24h, "total": _safe_int(row.get("total")), }) if last_time: timeline.append({ "time": _iso(last_time), "type": "data", "title": f"{name}更新", "status": status, "detail": f"近 24h {count_24h} 条", }) return sources, timeline def _build_funnel(conn, since: str) -> list[dict]: coverage = _fetchone( conn, """ SELECT * FROM screening_coverage_audit ORDER BY scan_started_at DESC, id DESC LIMIT 1 """, ) confirm_count = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s", (since,)) buy_now = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='buy_now'", (since,)) wait_pullback = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='wait_pullback'", (since,)) observe = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='observe'", (since,)) orders = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE created_at >= %s", (since,)) filled_orders = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE filled_at >= %s AND status='filled'", (since,)) trades = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE opened_at >= %s", (since,)) closed = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE closed_at >= %s AND status='closed'", (since,)) reviews = _count(conn, "SELECT COUNT(*) FROM review_log WHERE review_time >= %s", (since,)) return [ { "code": "universe", "label": "交易宇宙", "value": _safe_int(coverage.get("tradable_universe_count")), "sub": f"原始 {_safe_int(coverage.get('usdt_pair_count'))} / 过滤 {_safe_int(coverage.get('universe_gate_count'))}", "status": _status_by_age(coverage.get("scan_finished_at"), 1800, 7200, missing="warn"), }, { "code": "discovery", "label": "异动发现", "value": _safe_int(coverage.get("coarse_candidate_count")), "sub": f"强势榜 {_safe_int(coverage.get('top_gainer_discovery_count'))}", "status": _status_by_age(coverage.get("scan_finished_at"), 1800, 7200, missing="warn"), }, { "code": "quality", "label": "质量验证", "value": _safe_int(coverage.get("fine_qualified_count")), "sub": f"拒绝 {_safe_int(coverage.get('quality_rejected_count'))}", "status": _status_by_age(coverage.get("scan_finished_at"), 1800, 7200, missing="warn"), }, { "code": "confirm", "label": "交易确认", "value": confirm_count, "sub": f"可买 {buy_now} / 挂单 {wait_pullback} / 观察 {observe}", "status": "ok" if confirm_count or observe or wait_pullback else "warn", }, { "code": "execution", "label": "策略交易", "value": orders + trades, "sub": f"挂单 {orders} / 成交 {filled_orders} / 开仓 {trades} / 平仓 {closed}", "status": "ok", }, { "code": "review", "label": "复盘迭代", "value": reviews, "sub": "只统计窗口内复盘样本", "status": "ok" if reviews else "warn", }, ] def _build_trading(conn, since: str) -> dict: open_count = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE status='open'") pending_count = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status='pending'") filled_24h = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status='filled' AND filled_at >= %s", (since,)) canceled_24h = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status IN ('canceled','expired','rejected') AND COALESCE(canceled_at, updated_at, created_at) >= %s", (since,)) closed_24h = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE status='closed' AND closed_at >= %s", (since,)) pnl = _safe_float(_fetchone(conn, "SELECT COALESCE(SUM(realized_pnl_usdt),0) AS v FROM paper_trades WHERE status='closed' AND closed_at >= %s", (since,)).get("v")) recent_orders = _fetchall( conn, """ SELECT symbol, side, status, target_price, fill_price, cancel_reason, COALESCE(filled_at, canceled_at, updated_at, created_at) AS event_time FROM paper_orders ORDER BY COALESCE(filled_at, canceled_at, updated_at, created_at) DESC, id DESC LIMIT 8 """, ) return { "open_positions": open_count, "pending_orders": pending_count, "filled_orders_24h": filled_24h, "canceled_orders_24h": canceled_24h, "closed_trades_24h": closed_24h, "realized_pnl_24h": round(pnl, 4), "recent_orders": recent_orders, "status": "warn" if canceled_24h > max(3, filled_24h * 2) else "ok", } def _build_timeline(conn, base_events: list[dict], since: str) -> list[dict]: timeline = list(base_events) seen_trade_events = set() for row in _fetchall( conn, """ SELECT event_time AS time, event_type, symbol, message, recommendation_id, strategy_code, detail_json FROM paper_trade_events WHERE event_time >= %s ORDER BY event_time DESC, id DESC LIMIT 80 """, (since,), ): detail = {} try: raw_detail = row.get("detail_json") detail = json.loads(raw_detail) if isinstance(raw_detail, str) and raw_detail.strip() else (raw_detail or {}) except Exception: detail = {} if row.get("event_type") == "paper_gate_reject": reject_key = ( row.get("recommendation_id") or detail.get("recommendation_id") or detail.get("rec_id") or "", row.get("strategy_code") or detail.get("strategy_code") or "", detail.get("reason") or "", tuple(detail.get("gate_reasons") if isinstance(detail.get("gate_reasons"), list) else []), ) if reject_key in seen_trade_events: continue seen_trade_events.add(reject_key) timeline.append({ "time": _iso(row.get("time")), "type": "trade", "title": f"{row.get('symbol') or '--'} · {row.get('event_type') or '交易事件'}", "status": "warn" if row.get("event_type") == "paper_gate_reject" else "ok", "detail": _display_error_summary(row.get("message") or "", source="paper_trade"), }) if len(seen_trade_events) >= 12: break for row in _fetchall( conn, """ SELECT started_at AS time, job_name, run_status, result_status, error_message FROM cron_run_log WHERE started_at >= %s ORDER BY started_at DESC, id DESC LIMIT 12 """, (since,), ): if str(row.get("job_name") or "") in RETIRED_RUNTIME_JOBS: continue status = "danger" if row.get("run_status") == "error" or row.get("error_message") else "ok" timeline.append({ "time": _iso(row.get("time")), "type": "job", "title": f"{row.get('job_name') or '--'} 运行", "status": status, "detail": _display_error_summary(row.get("error_message") or "", source=row.get("job_name") or "") or row.get("result_status") or row.get("run_status") or "", }) timeline.sort(key=lambda x: _parse_dt(x.get("time")) or datetime.min, reverse=True) return timeline[:28] def get_operations_dashboard(hours: int = 24) -> dict: hours = max(1, min(_safe_int(hours, 24), 168)) since = (_now() - timedelta(hours=hours)).isoformat() scheduler, scheduler_events = _build_scheduler_cards() conn = get_conn() try: data_sources, source_events = _build_data_sources(conn) funnel = _build_funnel(conn, since) trading = _build_trading(conn, since) errors_24h = _count(conn, "SELECT COUNT(*) FROM system_error_log WHERE created_at >= %s AND resolved_at=''", (since,)) latest_error = _fetchone( conn, """ SELECT created_at, source, error_type, message FROM system_error_log WHERE created_at >= %s AND resolved_at='' ORDER BY created_at DESC, id DESC LIMIT 1 """, (since,), ) if latest_error: latest_error["display_message"] = _display_error_summary( latest_error.get("message") or "", source=latest_error.get("source") or "", error_type=latest_error.get("error_type") or "", ) timeline = _build_timeline(conn, scheduler_events + source_events, since) finally: conn.close() health_inputs = [x for x in scheduler if x.get("enabled")] + data_sources + [trading] overall = _overall_status(health_inputs, errors_24h) return { "hours": hours, "generated_at": _now().isoformat(timespec="seconds"), "overall": overall, "summary": { "enabled_jobs": sum(1 for x in scheduler if x.get("enabled")), "running_jobs": sum(1 for x in scheduler if x.get("runtime_status") == "running"), "data_sources_ok": sum(1 for x in data_sources if x.get("status") == "ok"), "active_opportunities": _safe_int(next((x.get("value") for x in funnel if x.get("code") == "confirm"), 0)), "pending_orders": trading.get("pending_orders", 0), "open_positions": trading.get("open_positions", 0), "system_errors": errors_24h, "latest_error": latest_error, }, "scheduler": scheduler, "data_sources": data_sources, "funnel": funnel, "trading": trading, "timeline": timeline, } __all__ = ["get_operations_dashboard"]