diff --git a/app/core/order_lifecycle.py b/app/core/order_lifecycle.py index 910cb53..315db2b 100644 --- a/app/core/order_lifecycle.py +++ b/app/core/order_lifecycle.py @@ -10,6 +10,8 @@ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timedelta +from app.core.trade_math import normalize_side + def _safe_float(value, default: float = 0.0) -> float: try: @@ -46,7 +48,7 @@ def order_expires_at(event_time: str, expire_hours: float = 24.0) -> str: def order_touched(order: dict, current_price: float) -> bool: - side = str(order.get("side") or "long").lower() + side = normalize_side(order.get("side")) target = _safe_float(order.get("target_price")) price = _safe_float(current_price) if target <= 0 or price <= 0: @@ -60,7 +62,7 @@ def order_too_far(order: dict, current_price: float, threshold_pct: float = 12.0 threshold = max(0.0, _safe_float(threshold_pct, 12.0)) if threshold <= 0: return False - side = str(order.get("side") or "long").lower() + side = normalize_side(order.get("side")) target = _safe_float(order.get("target_price")) price = _safe_float(current_price) if target <= 0 or price <= 0: @@ -71,7 +73,7 @@ def order_too_far(order: dict, current_price: float, threshold_pct: float = 12.0 def order_rr(side: str, target: float, stop_loss: float, tp1: float) -> float: - if str(side or "long").lower() == "short": + if normalize_side(side) == "short": risk = _safe_float(stop_loss) - _safe_float(target) reward = _safe_float(target) - _safe_float(tp1) else: @@ -87,7 +89,7 @@ def order_distance_pct(side: str, current_price: float, target: float) -> float: target_price = _safe_float(target) if target_price <= 0 or price <= 0: return 999.0 - if str(side or "long").lower() == "short": + if normalize_side(side) == "short": return max(0.0, (target_price / price - 1) * 100) return max(0.0, (price / target_price - 1) * 100) diff --git a/app/core/trade_math.py b/app/core/trade_math.py index 4e0ecfe..6fd4686 100644 --- a/app/core/trade_math.py +++ b/app/core/trade_math.py @@ -13,7 +13,10 @@ def safe_float(value, default: float = 0.0) -> float: def normalize_side(side: str | None) -> str: - return "short" if str(side or "").strip().lower() == "short" else "long" + text = str(side or "").strip().lower() + if text in {"short", "sell", "空", "空头", "做空", "开空"} or "空" in text: + return "short" + return "long" def open_price(side: str, current_price: float, slippage_pct: float = 0.0) -> float: diff --git a/app/db/operations_dashboard.py b/app/db/operations_dashboard.py new file mode 100644 index 0000000..5ed137b --- /dev/null +++ b/app/db/operations_dashboard.py @@ -0,0 +1,438 @@ +"""Operations cockpit read model for system-wide health monitoring.""" + +from __future__ import annotations + +from datetime import datetime, timedelta + +from app.db.schema import get_conn +from app.db.scheduler_db import get_scheduler_overview + + +def _now() -> datetime: + return datetime.now() + + +def _safe_int(value, default: int = 0) -> int: + try: + return int(value or 0) + except Exception: + return default + + +def _safe_float(value, default: float = 0.0) -> float: + try: + return float(value or 0) + except Exception: + return default + + +def _parse_dt(value) -> datetime | None: + if isinstance(value, datetime): + return value.replace(tzinfo=None) + if not value: + return None + try: + return datetime.fromisoformat(str(value).replace("Z", "+00:00")).replace(tzinfo=None) + except Exception: + return None + + +def _iso(value) -> str: + dt = _parse_dt(value) + if dt: + return dt.isoformat(timespec="seconds") + return str(value or "") + + +def _age_seconds(value) -> int | None: + dt = _parse_dt(value) + if not dt: + return None + return max(0, int((_now() - dt).total_seconds())) + + +def _status_by_age(value, warn_seconds: int, danger_seconds: int, missing: str = "danger") -> str: + age = _age_seconds(value) + if age is None: + return missing + if age >= danger_seconds: + return "danger" + if age >= warn_seconds: + return "warn" + return "ok" + + +def _count(conn, sql: str, params=()) -> int: + try: + row = conn.execute(sql, params).fetchone() + return _safe_int(row[0] if row else 0) + except Exception: + return 0 + + +def _fetchone(conn, sql: str, params=()) -> dict: + try: + row = conn.execute(sql, params).fetchone() + return dict(row) if row else {} + except Exception: + return {} + + +def _fetchall(conn, sql: str, params=()) -> list[dict]: + try: + return [dict(r) for r in conn.execute(sql, params).fetchall()] + except Exception: + return [] + + +def _severity_rank(status: str) -> int: + return {"ok": 0, "warn": 1, "danger": 2}.get(str(status or ""), 0) + + +def _display_error_summary(message: str, source: str = "", error_type: str = "") -> str: + text = str(message or "").strip() + if not text: + return "" + lowered = text.lower() + source_text = f"{source} " if source else "" + if "alchemy" in lowered and ( + "ssl" in lowered + or "httpsconnectionpool" in lowered + or "max retries" in lowered + or "ssleoferror" in lowered + or "nameresolution" in lowered + or "name resolution" in lowered + ): + symbol = text.split(":", 1)[0] if "/USDT" in text.split(":", 1)[0] else "" + prefix = f"{symbol} · " if symbol else "" + return f"{prefix}Alchemy 链上数据源连接异常" + if "nodereal" in lowered and ( + "ssl" in lowered + or "httpsconnectionpool" in lowered + or "max retries" in lowered + or "timeout" in lowered + or "nameresolution" in lowered + or "name resolution" in lowered + ): + symbol = text.split(":", 1)[0] if "/USDT" in text.split(":", 1)[0] else "" + prefix = f"{symbol} · " if symbol else "" + return f"{prefix}NodeReal 链上数据源连接异常" + if "feishu" in lowered or "lark" in lowered or "webhook" in lowered: + return "飞书通知配置或发送异常" + if "timeout" in lowered or "timed out" in lowered: + return f"{source_text}请求超时".strip() + if "connectionpool" in lowered or "max retries" in lowered: + return f"{source_text}外部服务连接异常".strip() + if "ssl" in lowered or "ssleoferror" in lowered: + return f"{source_text}外部服务 SSL 连接异常".strip() + if len(text) > 72: + return text[:72].rstrip() + "..." + return text + + +def _overall_status(cards: list[dict], errors_24h: int) -> dict: + worst = "ok" + if errors_24h > 0: + worst = "warn" + for item in cards: + if _severity_rank(item.get("status")) > _severity_rank(worst): + worst = item.get("status") + label = {"ok": "运行正常", "warn": "需要关注", "danger": "存在异常"}.get(worst, "运行正常") + return { + "status": worst, + "label": label, + "summary": "所有关键链路近期有心跳" if worst == "ok" else "部分链路延迟、失败或数据源不新鲜", + } + + +def _job_display_name(job_name: str) -> str: + return { + "event": "事件检查", + "tracker": "价格跟踪", + "paper-trader": "策略交易", + "market": "市场快照", + "confirm": "交易确认", + "screener": "异动筛选", + "sentiment": "舆情采集", + "onchain": "链上追踪", + "llm-sentiment": "AI 舆情", + "review": "复盘迭代", + }.get(job_name, job_name) + + +def _build_scheduler_cards() -> tuple[list[dict], list[dict]]: + overview = get_scheduler_overview() + cards = [] + timeline = [] + for job in overview.get("jobs") or []: + runtime = job.get("runtime") or {} + latest = job.get("latest_cron") or {} + enabled = bool(job.get("enabled")) + last_time = runtime.get("last_finished_at") or latest.get("finished_at") or runtime.get("updated_at") + interval = max(30, _safe_int(job.get("every_seconds"), 300)) + status = "off" + if enabled: + status = _status_by_age(last_time, interval * 2, interval * 5, missing="warn") + if str(runtime.get("status") or "") in {"running", "pending"}: + status = "running" + if latest.get("run_status") == "error" or latest.get("error_message"): + status = "danger" + card = { + "job_name": job.get("job_name"), + "name": _job_display_name(job.get("job_name")), + "enabled": enabled, + "status": status, + "runtime_status": runtime.get("status") or ("disabled" if not enabled else "idle"), + "last_time": _iso(last_time), + "age_seconds": _age_seconds(last_time), + "next_run_at": _iso(runtime.get("next_run_at")), + "interval_seconds": interval, + "last_result": latest.get("run_status") or "", + "result_status": latest.get("result_status") or "", + "duration_ms": _safe_int(latest.get("duration_ms") or runtime.get("last_duration_ms")), + "error": _display_error_summary(latest.get("error_message") or runtime.get("last_error") or "", source=job.get("job_name") or ""), + } + cards.append(card) + if last_time: + timeline.append({ + "time": _iso(last_time), + "type": "scheduler", + "title": f"{card['name']}完成", + "status": card["status"], + "detail": card["result_status"] or card["last_result"] or card["runtime_status"], + }) + return cards, timeline + + +def _build_data_sources(conn) -> tuple[list[dict], list[dict]]: + now = _now() + sources = [] + timeline = [] + specs = [ + ("market", "市场快照", "market_snapshots", "snapshot_time", 600, 1800), + ("prices", "实时价格", "latest_price_cache", "updated_at", 360, 900), + ("sentiment", "新闻舆情", "event_news", "detected_at", 3600, 10800), + ("onchain", "链上事件", "onchain_raw_events", "detected_at", 7200, 21600), + ("llm", "AI 解读", "llm_insights", "updated_at", 7200, 21600), + ] + for code, name, table, time_col, warn, danger in specs: + row = _fetchone(conn, f"SELECT MAX({time_col}) AS last_time, COUNT(*) AS total FROM {table}") + count_24h = _count(conn, f"SELECT COUNT(*) FROM {table} WHERE {time_col} >= %s", ((now - timedelta(hours=24)).isoformat(),)) + last_time = row.get("last_time") + status = _status_by_age(last_time, warn, danger, missing="warn") + sources.append({ + "code": code, + "name": name, + "status": status, + "last_time": _iso(last_time), + "age_seconds": _age_seconds(last_time), + "count_24h": count_24h, + "total": _safe_int(row.get("total")), + }) + if last_time: + timeline.append({ + "time": _iso(last_time), + "type": "data", + "title": f"{name}更新", + "status": status, + "detail": f"近 24h {count_24h} 条", + }) + return sources, timeline + + +def _build_funnel(conn, since: str) -> list[dict]: + coverage = _fetchone( + conn, + """ + SELECT * + FROM screening_coverage_audit + ORDER BY scan_started_at DESC, id DESC + LIMIT 1 + """, + ) + confirm_count = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s", (since,)) + buy_now = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='buy_now'", (since,)) + wait_pullback = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='wait_pullback'", (since,)) + observe = _count(conn, "SELECT COUNT(*) FROM recommendation WHERE rec_time >= %s AND execution_status='observe'", (since,)) + orders = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE created_at >= %s", (since,)) + filled_orders = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE filled_at >= %s AND status='filled'", (since,)) + trades = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE opened_at >= %s", (since,)) + closed = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE closed_at >= %s AND status='closed'", (since,)) + reviews = _count(conn, "SELECT COUNT(*) FROM review_log WHERE review_time >= %s", (since,)) + return [ + { + "code": "universe", + "label": "交易宇宙", + "value": _safe_int(coverage.get("tradable_universe_count")), + "sub": f"原始 {_safe_int(coverage.get('usdt_pair_count'))} / 过滤 {_safe_int(coverage.get('universe_gate_count'))}", + "status": _status_by_age(coverage.get("scan_finished_at"), 1800, 7200, missing="warn"), + }, + { + "code": "discovery", + "label": "异动发现", + "value": _safe_int(coverage.get("coarse_candidate_count")), + "sub": f"强势榜 {_safe_int(coverage.get('top_gainer_discovery_count'))}", + "status": _status_by_age(coverage.get("scan_finished_at"), 1800, 7200, missing="warn"), + }, + { + "code": "quality", + "label": "质量验证", + "value": _safe_int(coverage.get("fine_qualified_count")), + "sub": f"拒绝 {_safe_int(coverage.get('quality_rejected_count'))}", + "status": _status_by_age(coverage.get("scan_finished_at"), 1800, 7200, missing="warn"), + }, + { + "code": "confirm", + "label": "交易确认", + "value": confirm_count, + "sub": f"可买 {buy_now} / 挂单 {wait_pullback} / 观察 {observe}", + "status": "ok" if confirm_count or observe or wait_pullback else "warn", + }, + { + "code": "execution", + "label": "策略交易", + "value": orders + trades, + "sub": f"挂单 {orders} / 成交 {filled_orders} / 开仓 {trades} / 平仓 {closed}", + "status": "ok", + }, + { + "code": "review", + "label": "复盘迭代", + "value": reviews, + "sub": "只统计窗口内复盘样本", + "status": "ok" if reviews else "warn", + }, + ] + + +def _build_trading(conn, since: str) -> dict: + open_count = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE status='open'") + pending_count = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status='pending'") + filled_24h = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status='filled' AND filled_at >= %s", (since,)) + canceled_24h = _count(conn, "SELECT COUNT(*) FROM paper_orders WHERE status IN ('canceled','expired','rejected') AND COALESCE(canceled_at, updated_at, created_at) >= %s", (since,)) + closed_24h = _count(conn, "SELECT COUNT(*) FROM paper_trades WHERE status='closed' AND closed_at >= %s", (since,)) + pnl = _safe_float(_fetchone(conn, "SELECT COALESCE(SUM(realized_pnl_usdt),0) AS v FROM paper_trades WHERE status='closed' AND closed_at >= %s", (since,)).get("v")) + recent_orders = _fetchall( + conn, + """ + SELECT symbol, side, status, target_price, fill_price, cancel_reason, + COALESCE(filled_at, canceled_at, updated_at, created_at) AS event_time + FROM paper_orders + ORDER BY COALESCE(filled_at, canceled_at, updated_at, created_at) DESC, id DESC + LIMIT 8 + """, + ) + return { + "open_positions": open_count, + "pending_orders": pending_count, + "filled_orders_24h": filled_24h, + "canceled_orders_24h": canceled_24h, + "closed_trades_24h": closed_24h, + "realized_pnl_24h": round(pnl, 4), + "recent_orders": recent_orders, + "status": "warn" if canceled_24h > max(3, filled_24h * 2) else "ok", + } + + +def _build_timeline(conn, base_events: list[dict], since: str) -> list[dict]: + timeline = list(base_events) + for row in _fetchall( + conn, + """ + SELECT event_time AS time, event_type, symbol, message + FROM paper_trade_events + WHERE event_time >= %s + ORDER BY event_time DESC, id DESC + LIMIT 12 + """, + (since,), + ): + timeline.append({ + "time": _iso(row.get("time")), + "type": "trade", + "title": f"{row.get('symbol') or '--'} · {row.get('event_type') or '交易事件'}", + "status": "ok", + "detail": _display_error_summary(row.get("message") or "", source="paper_trade"), + }) + for row in _fetchall( + conn, + """ + SELECT started_at AS time, job_name, run_status, result_status, error_message + FROM cron_run_log + WHERE started_at >= %s + ORDER BY started_at DESC, id DESC + LIMIT 12 + """, + (since,), + ): + status = "danger" if row.get("run_status") == "error" or row.get("error_message") else "ok" + timeline.append({ + "time": _iso(row.get("time")), + "type": "job", + "title": f"{row.get('job_name') or '--'} 运行", + "status": status, + "detail": _display_error_summary(row.get("error_message") or "", source=row.get("job_name") or "") + or row.get("result_status") + or row.get("run_status") + or "", + }) + timeline.sort(key=lambda x: _parse_dt(x.get("time")) or datetime.min, reverse=True) + return timeline[:28] + + +def get_operations_dashboard(hours: int = 24) -> dict: + hours = max(1, min(_safe_int(hours, 24), 168)) + since = (_now() - timedelta(hours=hours)).isoformat() + scheduler, scheduler_events = _build_scheduler_cards() + conn = get_conn() + try: + data_sources, source_events = _build_data_sources(conn) + funnel = _build_funnel(conn, since) + trading = _build_trading(conn, since) + errors_24h = _count(conn, "SELECT COUNT(*) FROM system_error_log WHERE created_at >= %s AND resolved_at=''", (since,)) + latest_error = _fetchone( + conn, + """ + SELECT created_at, source, error_type, message + FROM system_error_log + WHERE created_at >= %s AND resolved_at='' + ORDER BY created_at DESC, id DESC + LIMIT 1 + """, + (since,), + ) + if latest_error: + latest_error["display_message"] = _display_error_summary( + latest_error.get("message") or "", + source=latest_error.get("source") or "", + error_type=latest_error.get("error_type") or "", + ) + timeline = _build_timeline(conn, scheduler_events + source_events, since) + finally: + conn.close() + + health_inputs = [x for x in scheduler if x.get("enabled")] + data_sources + [trading] + overall = _overall_status(health_inputs, errors_24h) + return { + "hours": hours, + "generated_at": _now().isoformat(timespec="seconds"), + "overall": overall, + "summary": { + "enabled_jobs": sum(1 for x in scheduler if x.get("enabled")), + "running_jobs": sum(1 for x in scheduler if x.get("runtime_status") == "running"), + "data_sources_ok": sum(1 for x in data_sources if x.get("status") == "ok"), + "active_opportunities": _safe_int(next((x.get("value") for x in funnel if x.get("code") == "confirm"), 0)), + "pending_orders": trading.get("pending_orders", 0), + "open_positions": trading.get("open_positions", 0), + "system_errors": errors_24h, + "latest_error": latest_error, + }, + "scheduler": scheduler, + "data_sources": data_sources, + "funnel": funnel, + "trading": trading, + "timeline": timeline, + } + + +__all__ = ["get_operations_dashboard"] diff --git a/app/db/paper_trading.py b/app/db/paper_trading.py index a4dfb9b..aecd2d0 100644 --- a/app/db/paper_trading.py +++ b/app/db/paper_trading.py @@ -754,14 +754,16 @@ def _open_trade(conn, rec: dict, current_price: float, event_time: str, config: "leverage_risk": leverage_risk, }, } - global_ok, global_detail = _global_risk_entry_check(conn, rec, notional, cfg) - if not global_ok: - return { - "opened": False, - "skipped": True, - "reason": "global_risk_rejected", - "risk_detail": global_detail, - } + global_detail = rec.get("_prefilled_global_risk") if isinstance(rec.get("_prefilled_global_risk"), dict) else None + if global_detail is None: + global_ok, global_detail = _global_risk_entry_check(conn, rec, notional, cfg) + if not global_ok: + return { + "opened": False, + "skipped": True, + "reason": "global_risk_rejected", + "risk_detail": global_detail, + } adjusted_notional = _market_risk_adjusted_notional(notional, global_detail, cfg) if adjusted_notional != notional: plan["market_position_sizing"] = { @@ -1014,6 +1016,24 @@ def _cancel_paper_order(conn, order: dict, reason: str, event_time: str) -> dict return {"skipped": True, "reason": f"paper_order_{reason}", "paper_order_id": order["id"]} +def _touch_global_risk_cancel_reason(global_detail: dict | None) -> str: + """Only hard account/portfolio gates should cancel an already-touched order.""" + detail = global_detail if isinstance(global_detail, dict) else {} + decision = str(detail.get("decision") or "").strip() + position_multiplier = _safe_float(detail.get("position_multiplier"), 1.0) + if position_multiplier <= 0: + return "touch_position_multiplier_zero" + if decision == "block_critical": + return "touch_critical_risk" + if decision == "block_max_open_positions": + return "touch_max_open_positions" + if decision == "block_same_direction_concentration": + return "touch_same_direction_concentration" + if decision == "block_same_sector_concentration": + return "touch_same_sector_concentration" + return "" + + def _order_recommendation_cancel_reason(conn, rec: dict, order: dict) -> str: rec_id = _safe_int(rec.get("id") or order.get("recommendation_id")) if rec_id <= 0: @@ -1083,16 +1103,22 @@ def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_ base_notional = _safe_float(order.get("notional_usdt"), default_notional_usdt(cfg)) global_ok, global_detail = _global_risk_entry_check(conn, trade_rec, base_notional, cfg) if not global_ok: - # 触价后的限价单已经完成“等待成交”阶段。若此刻风控不允许开仓, - # 这张挂单必须结束,不能继续 pending 等待下一次风控放行,否则会在 - # 页面上出现“做多价格已低于目标价仍挂单”的错误状态。 - result = _cancel_paper_order(conn, order, "risk_paused_at_touch", event_time) - result.update({ - "target_price": order.get("target_price"), - "current_price": current_price, - "risk_detail": global_detail, - }) - return result + cancel_reason = _touch_global_risk_cancel_reason(global_detail) + if cancel_reason: + result = _cancel_paper_order(conn, order, cancel_reason, event_time) + result.update({ + "target_price": order.get("target_price"), + "current_price": current_price, + "risk_detail": global_detail, + }) + return result + global_detail = { + **(global_detail or {}), + "allow_new_entries": True, + "touch_soft_gate_overridden": True, + "touch_soft_gate_reason": (global_detail or {}).get("decision") or "soft_global_risk_gate", + } + trade_rec["_prefilled_global_risk"] = global_detail adjusted_notional = _market_risk_adjusted_notional(base_notional, global_detail, cfg) pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, adjusted_notional, event_time, cfg) if not pause_ok: diff --git a/app/services/onchain_monitor.py b/app/services/onchain_monitor.py index 7818dc1..53359a7 100644 --- a/app/services/onchain_monitor.py +++ b/app/services/onchain_monitor.py @@ -48,6 +48,32 @@ ERC20_SYMBOL_SELECTOR = "0x95d89b41" ERC20_NAME_SELECTOR = "0x06fdde03" ERC20_DECIMALS_SELECTOR = "0x313ce567" + +def _provider_error_summary(provider: str, chain: str = "", scope: str = "", symbol: str = "", exc: Exception | str = "") -> str: + provider_label = {"alchemy": "Alchemy", "nodereal": "NodeReal"}.get(str(provider or "").lower(), provider or "链上数据源") + chain_label = {"ethereum": "Ethereum", "bsc": "BSC"}.get(str(chain or "").lower(), chain or "") + scope_label = { + "logs": "映射代币日志", + "raw_logs": "原始转账流", + "metadata": "Token 资料", + }.get(str(scope or ""), scope or "链上数据") + text = str(exc or "") + reason = "采集失败" + lowered = text.lower() + if "name resolution" in lowered or "nameresolution" in lowered or "temporary failure in name resolution" in lowered: + reason = "DNS 解析异常" + elif "ssl" in lowered or "connectionpool" in lowered or "max retries" in lowered or "eof" in lowered: + reason = "连接异常" + elif "timeout" in lowered or "timed out" in lowered: + reason = "请求超时" + elif "rate" in lowered or "429" in lowered: + reason = "额度或限流" + elif "403" in lowered or "401" in lowered or "api_key" in lowered: + reason = "鉴权异常" + prefix = f"{symbol}:" if symbol else "" + chain_part = f"{chain_label} " if chain_label else "" + return f"{prefix}{provider_label} {chain_part}{scope_label} {reason}" + # --------------------------------------------------------------------------- # Known CEX hot/deposit wallet addresses (lowercase). # Sources: Etherscan/BscScan labeled addresses, Arkham, Nansen public tags. @@ -733,7 +759,7 @@ def fetch_nodereal_events(limit=60): if holder_event and insert_onchain_event(holder_event): events.append(holder_event) except Exception as exc: - errors.append(f"{mapping.get('symbol')}:nodereal_holder:{str(exc)[:160]}") + errors.append(_provider_error_summary("nodereal", chain=chain, scope="metadata", symbol=mapping.get("symbol"), exc=exc)) try: latest = client.block_number(chain) if latest <= 0: @@ -756,7 +782,7 @@ def fetch_nodereal_events(limit=60): if insert_onchain_event(event): events.append(event) except Exception as exc: - errors.append(f"{mapping.get('symbol')}:nodereal_logs:{str(exc)[:160]}") + errors.append(_provider_error_summary("nodereal", chain=chain, scope="logs", symbol=mapping.get("symbol"), exc=exc)) if not all_mappings: diagnostics["mapping_note"] = "no_strategy_mappings_raw_events_only" elif not chain_mappings: @@ -831,7 +857,7 @@ def fetch_alchemy_events(limit=60): if insert_onchain_event(event): events.append(event) except Exception as exc: - errors.append(f"{mapping.get('symbol')}:alchemy_logs:{str(exc)[:160]}") + errors.append(_provider_error_summary("alchemy", chain=chain, scope="logs", symbol=mapping.get("symbol"), exc=exc)) if not all_mappings: diagnostics["mapping_note"] = "no_strategy_mappings_raw_events_only" elif not chain_mappings: @@ -883,7 +909,7 @@ def fetch_nodereal_raw_events(client=None, cfg=None, limit=60): if insert_onchain_raw_event(item): inserted.append(item) except Exception as exc: - errors.append(f"{chain}:nodereal_raw_logs:{str(exc)[:160]}") + errors.append(_provider_error_summary("nodereal", chain=chain, scope="raw_logs", exc=exc)) return {"raw_events": inserted, "errors": errors} @@ -923,7 +949,7 @@ def fetch_alchemy_raw_events(client=None, cfg=None, limit=60): if insert_onchain_raw_event(item): inserted.append(item) except Exception as exc: - errors.append(f"{chain}:alchemy_raw_logs:{str(exc)[:160]}") + errors.append(_provider_error_summary("alchemy", chain=chain, scope="raw_logs", exc=exc)) return {"raw_events": inserted, "errors": errors} diff --git a/app/web/routes_admin.py b/app/web/routes_admin.py index d397ff1..373ff63 100644 --- a/app/web/routes_admin.py +++ b/app/web/routes_admin.py @@ -6,6 +6,7 @@ from app.db import auth_db from app.db import chat_assistant_db from app.db.analytics import get_cron_run_logs, get_cron_run_summary, get_pipeline_runs from app.db.data_export import build_data_export_bundle +from app.db.operations_dashboard import get_operations_dashboard from app.db.scheduler_db import ( enqueue_manual_trigger, get_job_config, @@ -51,6 +52,11 @@ def build_router(templates): require_admin(altcoin_session) return auth_db.get_admin_stats() + @router.get("/api/admin/operations-dashboard") + async def api_admin_operations_dashboard(hours: int = 24, altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return get_operations_dashboard(hours=hours) + @router.get("/api/admin/users") async def api_admin_users(search: str = "", offset: int = 0, limit: int = 50, tab: str = "all", altcoin_session: str = Cookie(default="")): require_admin(altcoin_session) diff --git a/app/web/routes_pages.py b/app/web/routes_pages.py index ff7bc88..c1b45b2 100644 --- a/app/web/routes_pages.py +++ b/app/web/routes_pages.py @@ -47,6 +47,10 @@ def build_router(templates, repo_root: Path, stock_report_template: str): user, redirect = require_page_user(request) if redirect: return redirect + try: + require_admin(request.cookies.get("altcoin_session", "")) + except HTTPException as exc: + return HTMLResponse(content=f"

需要管理员权限

{exc.detail}

返回机会中心", status_code=exc.status_code) return render_page("pipeline.html", request, active_nav="pipeline") @router.get("/chat", response_class=HTMLResponse) @@ -61,6 +65,10 @@ def build_router(templates, repo_root: Path, stock_report_template: str): user, redirect = require_page_user(request) if redirect: return redirect + try: + require_admin(request.cookies.get("altcoin_session", "")) + except HTTPException as exc: + return HTMLResponse(content=f"

需要管理员权限

{exc.detail}

返回机会中心", status_code=exc.status_code) return render_page("llm_insights.html", request, active_nav="llm_insights") @router.get("/cron", response_class=HTMLResponse) @@ -74,6 +82,17 @@ def build_router(templates, repo_root: Path, stock_report_template: str): return HTMLResponse(content=f"

需要管理员权限

{exc.detail}

返回看板", status_code=exc.status_code) return render_page("cron.html", request, active_nav="cron") + @router.get("/operations", response_class=HTMLResponse) + async def operations_page(request: Request): + user, redirect = require_page_user(request) + if redirect: + return redirect + try: + require_admin(request.cookies.get("altcoin_session", "")) + except HTTPException as exc: + return HTMLResponse(content=f"

需要管理员权限

{exc.detail}

返回看板", status_code=exc.status_code) + return templates.TemplateResponse(request=request, name="operations.html", context={"show_nav": False, "active_nav": "operations"}) + @router.get("/config", response_class=HTMLResponse) async def config_page(request: Request): user, redirect = require_page_user(request) @@ -167,6 +186,10 @@ def build_router(templates, repo_root: Path, stock_report_template: str): user, redirect = require_page_user(request) if redirect: return redirect + try: + require_admin(request.cookies.get("altcoin_session", "")) + except HTTPException as exc: + return HTMLResponse(content=f"

需要管理员权限

{exc.detail}

返回机会中心", status_code=exc.status_code) return render_page("strategy.html", request, active_nav="strategy") @router.get("/subscription", response_class=HTMLResponse) @@ -231,6 +254,10 @@ def build_router(templates, repo_root: Path, stock_report_template: str): user, redirect = require_page_user(request) if redirect: return redirect + try: + require_admin(request.cookies.get("altcoin_session", "")) + except HTTPException as exc: + return HTMLResponse(content=f"

需要管理员权限

{exc.detail}

返回机会中心", status_code=exc.status_code) return render_page("iteration.html", request, active_nav="iteration") @router.get("/stock-report", response_class=HTMLResponse) diff --git a/docs/PRODUCT_INFORMATION_ARCHITECTURE.md b/docs/PRODUCT_INFORMATION_ARCHITECTURE.md new file mode 100644 index 0000000..fbc1008 --- /dev/null +++ b/docs/PRODUCT_INFORMATION_ARCHITECTURE.md @@ -0,0 +1,37 @@ +# AlphaX Product Information Architecture + +AlphaX 的页面入口按“普通用户能直接理解”和“管理员/研发排障”分层。新增页面前先判断它属于哪一层,不要把工程日志、调度、provider、JSON 调用记录直接放进普通用户菜单。 + +## 用户菜单 + +普通用户只看到能帮助判断市场和机会的页面: + +- 机会中心:当前机会、机会归档、机会状态。 +- 市场总览:全市场环境、强势榜、成交额、资金费率、链上/舆情摘要。 +- 消息面:新闻源和 AI 舆情分析。 +- 链上观察:重要资金流、风险线索、相关币种。 +- AI 助手:对话式加密研究助手。 +- 订阅、邀请:账号商业功能。 + +用户页面应该避免这些词作为第一层展示:`cron`、`scheduler`、`provider`、`raw logs`、`pipeline`、`JSON`、`runtime config`、`strategy version`。如果必须保留,应转成用户能理解的说法,例如“系统自动刷新中”“链上监控正常”“进入机会检查”。 + +## 管理员菜单 + +管理员菜单保留少数高价值入口: + +- 运行大屏:投屏式系统运行状态。 +- 策略交易:paper/live 执行账本和操作日志。 +- 实盘控制台:交易所账号与真实执行控制。 +- 复盘中心:策略表现、证据贡献、迭代摘要。 +- 诊断中心:系统错误、链路批次、AI 调用、问答日志、数据导出等排障入口。 +- 配置中心、调度中心、用户管理。 + +深层工程页面可以保留路由和 API,但不应直接出现在侧边栏。优先从“诊断中心”进入。 + +## 设计原则 + +- 先给结论,再给证据。 +- 首页和普通用户页面只显示最重要状态,不展示完整工程流水。 +- 收益只来自策略交易账本,不把观察样本当收益。 +- 链上和舆情是机会发现与风险上下文,不直接表达买入指令。 +- 管理员页面可以保留工程细节,但需要聚合入口,避免侧边栏变成功能清单。 diff --git a/static/base.html b/static/base.html index 2407f93..35577da 100644 --- a/static/base.html +++ b/static/base.html @@ -249,6 +249,7 @@ a { color: inherit; text-decoration: none; } + @@ -259,20 +260,19 @@ a { color: inherit; text-decoration: none; } AlphaX Agent