"""Account-centric read model for live trading console.""" from __future__ import annotations from datetime import datetime from app.db.live_trading import ( _safe_float, get_live_account, get_live_account_snapshot, list_live_account_equity_history, list_enabled_live_accounts, list_live_order_events, list_live_order_intents, record_live_account_equity_snapshot, upsert_live_account_snapshot, ) from app.integrations.binance_live import LiveTradingConfigError, build_binance_client _ACCOUNT_OVERVIEW_CACHE: dict[int, dict] = {} def _now() -> str: return datetime.now().isoformat() def _compact_balance(balance: dict) -> dict: total = balance.get("total") if isinstance(balance.get("total"), dict) else {} free = balance.get("free") if isinstance(balance.get("free"), dict) else {} used = balance.get("used") if isinstance(balance.get("used"), dict) else {} assets = [] for asset in sorted(set(total) | set(free) | set(used)): total_value = _safe_float(total.get(asset)) free_value = _safe_float(free.get(asset)) used_value = _safe_float(used.get(asset)) if abs(total_value) > 0 or abs(free_value) > 0 or abs(used_value) > 0: assets.append({"asset": asset, "free": free_value, "used": used_value, "total": total_value}) return { "assets": assets, "usdt": { "free": _safe_float(free.get("USDT")), "used": _safe_float(used.get("USDT")), "total": _safe_float(total.get("USDT")), }, } def _position_side_label(side: str) -> str: side = str(side or "").strip().lower() if side in {"long", "buy"}: return "多" if side in {"short", "sell"}: return "空" return "--" def _position_pnl_pct(unrealized_pnl: float, margin: float, position_value: float) -> float: if margin > 0: return round(unrealized_pnl / margin * 100.0, 6) if position_value > 0: return round(unrealized_pnl / position_value * 100.0, 6) return 0.0 def _display_symbol(symbol: str) -> str: """Normalize ccxt futures symbols like BANK/USDT:USDT for UI display.""" value = str(symbol or "").strip().upper() if ":" in value: value = value.split(":", 1)[0] if value and "/" not in value and value.endswith("USDT"): value = value[:-4] + "/USDT" return value def _safe_bool(value) -> bool: if isinstance(value, bool): return value if value is None: return False if isinstance(value, (int, float)): return value != 0 text = str(value).strip().lower() return text in {"1", "true", "yes", "y", "on"} def _compact_position(item: dict, account: dict | None = None) -> dict: info = item.get("info") if isinstance(item.get("info"), dict) else {} contracts = _safe_float(item.get("contracts") or info.get("positionAmt")) notional = _safe_float(item.get("notional") or info.get("notional")) entry_price = _safe_float(item.get("entryPrice") or info.get("entryPrice")) mark_price = _safe_float(item.get("markPrice") or info.get("markPrice")) position_value = abs(notional) if position_value <= 0 and abs(contracts) > 0 and mark_price > 0: position_value = abs(contracts) * mark_price margin = _safe_float( item.get("initialMargin") or item.get("collateral") or info.get("initialMargin") or info.get("positionInitialMargin") or info.get("isolatedWallet") ) leverage = _safe_float(item.get("leverage") or info.get("leverage")) leverage_source = "exchange" if leverage <= 0 and position_value > 0 and margin > 0: leverage = position_value / margin leverage_source = "computed" if leverage <= 0 and account: risk = account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} leverage = _safe_float(risk.get("max_symbol_leverage"), 0) leverage_source = "account_config" if leverage > 0 else "missing" side = item.get("side") or ("long" if contracts > 0 else ("short" if contracts < 0 else "")) return { "symbol": _display_symbol(item.get("symbol") or info.get("symbol")), "side": side, "side_label": _position_side_label(side), "contracts": abs(contracts), "entry_price": entry_price, "mark_price": mark_price, "notional": notional, "position_value_usdt": position_value, "margin_usdt": margin, "unrealized_pnl": _safe_float(item.get("unrealizedPnl") or info.get("unrealizedProfit")), "leverage": leverage, "leverage_source": leverage_source, } def _compact_order(item: dict) -> dict: info = item.get("info") if isinstance(item.get("info"), dict) else {} return { "id": str(item.get("id") or info.get("orderId") or ""), "client_order_id": item.get("clientOrderId") or info.get("clientOrderId") or "", "symbol": _display_symbol(item.get("symbol") or info.get("symbol")), "type": item.get("type") or info.get("type"), "side": item.get("side") or info.get("side"), "status": item.get("status") or info.get("status"), "price": _safe_float(item.get("price") or info.get("price")), "amount": _safe_float(item.get("amount") or info.get("origQty")), "filled": _safe_float(item.get("filled") or info.get("executedQty")), "average": _safe_float(item.get("average") or info.get("avgPrice")), "realized_pnl": _safe_float(item.get("realizedPnl") or info.get("realizedPnl") or info.get("realizedProfit")), "reduce_only": _safe_bool(item.get("reduceOnly") if item.get("reduceOnly") is not None else info.get("reduceOnly")), "position_side": item.get("positionSide") or info.get("positionSide") or "", "timestamp": item.get("datetime") or item.get("timestamp") or info.get("updateTime") or info.get("time"), } def _enrich_positions(positions: list[dict]) -> list[dict]: enriched = [] for item in positions or []: row = dict(item) unrealized = _safe_float(row.get("unrealized_pnl")) margin = _safe_float(row.get("margin_usdt")) value = _safe_float(row.get("position_value_usdt")) row["pnl_pct"] = _position_pnl_pct(unrealized, margin, value) if unrealized > 0: row["pnl_status"] = "profit" elif unrealized < 0: row["pnl_status"] = "loss" else: row["pnl_status"] = "flat" enriched.append(row) return enriched def _normalize_symbol(symbol: str) -> str: return _display_symbol(symbol) def _order_history_symbols(account: dict, overview: dict) -> list[str]: """Build the smallest safe symbol set for Binance order-history queries.""" risk = account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} symbols: list[str] = [] for raw in risk.get("allowed_symbols") or []: symbol = _normalize_symbol(raw) if symbol: symbols.append(symbol) for row in (overview.get("positions") or []) + (overview.get("open_orders") or []): symbol = _normalize_symbol(row.get("symbol")) if symbol: symbols.append(symbol) for row in overview.get("intent_history") or []: symbol = _normalize_symbol(row.get("symbol")) if symbol: symbols.append(symbol) result = [] seen = set() for symbol in symbols: key = symbol.upper() if key not in seen: seen.add(key) result.append(symbol) return result[:20] def _fetch_order_history_by_symbol(client, symbols: list[str], limit: int) -> tuple[list[dict], list[str]]: orders = [] errors = [] if not symbols: return orders, errors per_symbol_limit = max(1, min(int(limit or 30), 50)) for symbol in symbols: try: orders.extend(_compact_order(o) for o in client.fetch_orders(symbol, limit=per_symbol_limit)) except Exception as exc: errors.append(f"订单历史读取失败 {symbol}:{exc}") orders.sort(key=lambda x: str(x.get("timestamp") or ""), reverse=True) return orders[: max(1, int(limit or 30))], errors def _account_risk_view(account: dict) -> dict: risk = account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} allowed = [str(x).strip().upper() for x in risk.get("allowed_symbols", []) if str(x).strip()] max_leverage = _safe_float(risk.get("max_symbol_leverage"), 1) margin = _safe_float(risk.get("max_order_margin_usdt"), 0) return { "max_order_margin_usdt": margin, "max_symbol_leverage": max_leverage, "max_order_notional_usdt": _safe_float(risk.get("max_order_notional_usdt"), margin * max(1.0, max_leverage)), "max_cumulative_leverage": _safe_float(risk.get("max_cumulative_leverage"), 1), "max_daily_order_count": int(risk.get("max_daily_order_count") or 0), "allowed_symbols": allowed, "symbol_policy": "all" if not allowed else "allowlist", } def _cache_overview(account_id: int, overview: dict) -> dict: _ACCOUNT_OVERVIEW_CACHE[int(account_id)] = overview return overview def _cached_overview(account_id: int) -> dict | None: item = _ACCOUNT_OVERVIEW_CACHE.get(int(account_id)) if not item: return None cached = dict(item) cached["exchange_cache"] = {**(cached.get("exchange_cache") or {}), "cached": True} return cached def _exchange_snapshot_payload(overview: dict) -> dict: return { "balance": overview.get("balance") or {"assets": [], "usdt": {"free": 0, "used": 0, "total": 0}}, "positions": overview.get("positions") or [], "open_orders": overview.get("open_orders") or [], "order_history": overview.get("order_history") or [], "exchange_cache": overview.get("exchange_cache") or {}, "errors": overview.get("errors") or [], "performance": overview.get("performance") or {}, "historical_positions": overview.get("historical_positions") or [], } def _current_equity_metrics(overview: dict) -> dict: balance = overview.get("balance") if isinstance(overview.get("balance"), dict) else {} usdt = balance.get("usdt") if isinstance(balance.get("usdt"), dict) else {} positions = overview.get("positions") or [] equity = _safe_float(usdt.get("total")) available = _safe_float(usdt.get("free")) used = _safe_float(usdt.get("used")) unrealized = sum(_safe_float(x.get("unrealized_pnl")) for x in positions) position_value = sum(abs(_safe_float(x.get("position_value_usdt"))) for x in positions) return { "equity_usdt": round(equity, 8), "wallet_balance_usdt": round(equity - unrealized, 8), "available_usdt": round(available, 8), "used_margin_usdt": round(used, 8), "unrealized_pnl_usdt": round(unrealized, 8), "open_position_value_usdt": round(position_value, 8), "position_count": len(positions), } def _max_drawdown(history: list[dict]) -> tuple[float, float]: peak = 0.0 max_dd_pct = 0.0 max_dd_usdt = 0.0 for row in history: equity = _safe_float(row.get("equity_usdt")) if equity <= 0: continue peak = max(peak, equity) if peak <= 0: continue dd_usdt = max(0.0, peak - equity) dd_pct = dd_usdt / peak * 100.0 if dd_pct > max_dd_pct: max_dd_pct = dd_pct max_dd_usdt = dd_usdt return round(max_dd_pct, 6), round(max_dd_usdt, 8) def _order_ts(order: dict) -> str: return str(order.get("timestamp") or "") def _order_price(order: dict) -> float: return _safe_float(order.get("average") or order.get("price")) def _order_amount(order: dict) -> float: return _safe_float(order.get("filled") or order.get("amount")) def _order_direction(order: dict, *, closing: bool = False) -> str: side = str(order.get("side") or "").strip().lower() position_side = str(order.get("position_side") or "").strip().lower() if position_side in {"long", "short"}: return position_side if closing: if side == "sell": return "long" if side == "buy": return "short" if side == "buy": return "long" if side == "sell": return "short" return "" def _realized_pnl_pct(side: str, entry_price: float, exit_price: float) -> float: if entry_price <= 0 or exit_price <= 0: return 0.0 if side == "short": return round((entry_price / exit_price - 1.0) * 100.0, 6) return round((exit_price / entry_price - 1.0) * 100.0, 6) def _historical_positions_from_orders(orders: list[dict]) -> list[dict]: """Build a position-level history from exchange order history. Binance futures order history is order-centric. For the console we pair open orders with later reduce-only/realized-PnL orders so users can see entry price, exit price and whether the completed position made money. """ active_lots: dict[tuple[str, str], list[dict]] = {} rows = [] sorted_orders = sorted(orders or [], key=_order_ts) for order in sorted_orders: status = str(order.get("status") or "").lower() if status not in {"closed", "filled"}: continue symbol = _display_symbol(order.get("symbol") or "") if not symbol: continue amount = _order_amount(order) price = _order_price(order) pnl = _safe_float(order.get("realized_pnl")) reduce_only = _safe_bool(order.get("reduce_only")) is_close = reduce_only or abs(pnl) > 0 direction = _order_direction(order, closing=is_close) if not direction: continue key = (symbol, direction) if not is_close: active_lots.setdefault(key, []).append({ "time": order.get("timestamp") or "", "entry_price": price, "amount": amount, }) continue lot = active_lots.get(key, []).pop(0) if active_lots.get(key) else {} entry_price = _safe_float(lot.get("entry_price")) exit_price = price if pnl == 0 and entry_price > 0 and exit_price > 0 and amount > 0: raw_pnl = (exit_price - entry_price) * amount if direction == "long" else (entry_price - exit_price) * amount pnl = round(raw_pnl, 8) pnl_pct = _realized_pnl_pct(direction, entry_price, exit_price) result = "盈利" if pnl > 0 else "亏损" if pnl < 0 else "未知" rows.append({ "time": order.get("timestamp") or "", "symbol": symbol, "side": direction, "side_label": _position_side_label(direction), "entry_price": entry_price, "exit_price": exit_price, "price": exit_price, "amount": amount, "realized_pnl": pnl, "realized_pnl_pct": pnl_pct, "result": result, "status": order.get("status") or "", }) if rows: return sorted(rows, key=lambda x: str(x.get("time") or ""), reverse=True)[:30] for order in orders or []: status = str(order.get("status") or "").lower() if status not in {"closed", "filled"}: continue symbol = _display_symbol(order.get("symbol") or "") pnl = _safe_float(order.get("realized_pnl")) result = "盈利" if pnl > 0 else "亏损" if pnl < 0 else "未知" rows.append({ "time": order.get("timestamp") or "", "symbol": symbol, "side": order.get("side") or "", "side_label": _position_side_label(order.get("side")), "entry_price": 0, "exit_price": _order_price(order), "price": _order_price(order), "amount": _order_amount(order), "realized_pnl": pnl, "realized_pnl_pct": 0, "result": result, "status": order.get("status") or "", }) return rows[:30] def _attach_performance(overview: dict, account_id: int, *, record_history: bool = False, snapshot_at: str = "") -> dict: overview["positions"] = _enrich_positions(overview.get("positions") or []) metrics = _current_equity_metrics(overview) if record_history and metrics["equity_usdt"] > 0: record_live_account_equity_snapshot(account_id, **metrics, snapshot_at=snapshot_at or _now()) history = list_live_account_equity_history(account_id) if not history and metrics["equity_usdt"] > 0: history = [{**metrics, "snapshot_at": snapshot_at or _now()}] baseline = _safe_float(history[0].get("equity_usdt")) if history else metrics["equity_usdt"] peak = max([_safe_float(x.get("equity_usdt")) for x in history] + [metrics["equity_usdt"], 0.0]) total_pnl = metrics["equity_usdt"] - baseline if baseline > 0 else 0.0 return_pct = total_pnl / baseline * 100.0 if baseline > 0 else 0.0 current_dd_usdt = max(0.0, peak - metrics["equity_usdt"]) current_dd_pct = current_dd_usdt / peak * 100.0 if peak > 0 else 0.0 max_dd_pct, max_dd_usdt = _max_drawdown(history + [{**metrics, "snapshot_at": snapshot_at or _now()}]) overview["performance"] = { **metrics, "baseline_equity_usdt": round(baseline, 8), "total_pnl_usdt": round(total_pnl, 8), "return_pct": round(return_pct, 6), "peak_equity_usdt": round(peak, 8), "current_drawdown_usdt": round(current_dd_usdt, 8), "current_drawdown_pct": round(current_dd_pct, 6), "max_drawdown_usdt": max_dd_usdt, "max_drawdown_pct": max_dd_pct, "history_points": len(history), "basis": "按首次同步净值计算,未单独扣除充值/提现影响", } overview["historical_positions"] = _historical_positions_from_orders(overview.get("order_history") or []) return overview def _merge_snapshot(overview: dict, snapshot_row: dict) -> dict: payload = snapshot_row.get("snapshot") if isinstance(snapshot_row.get("snapshot"), dict) else {} if not payload: return overview for key in ("balance", "positions", "open_orders", "order_history", "errors", "performance", "historical_positions"): if key in payload: overview[key] = payload.get(key) synced_at = snapshot_row.get("synced_at") or "" status = snapshot_row.get("status") or "" error_message = snapshot_row.get("error_message") or "" overview["exchange_cache"] = { **(payload.get("exchange_cache") or {}), "cached": True, "loaded": bool((payload.get("exchange_cache") or {}).get("loaded", True)), "requires_refresh": False, "source": "database", "status": status, "synced_at": synced_at, } if status == "error" and error_message and error_message not in (overview.get("errors") or []): overview.setdefault("errors", []).append(error_message) return _attach_performance(overview, int(overview.get("account", {}).get("id") or 0)) def get_live_account_overview(account_id: int, *, history_limit: int = 30, refresh: bool = False, client_factory=None) -> dict: account = get_live_account(account_id) if not account: raise LiveTradingConfigError("live account not found") overview = { "account": account, "risk": _account_risk_view(account), "balance": {"assets": [], "usdt": {"free": 0, "used": 0, "total": 0}}, "positions": [], "open_orders": [], "order_history": [], "historical_positions": [], "intent_history": list_live_order_intents(limit=history_limit, account_id=account_id).get("items", []), "events": list_live_order_events(limit=history_limit).get("items", []), "performance": {}, "exchange_cache": {"cached": False, "loaded": False, "requires_refresh": True}, "errors": [], } if account.get("status") != "enabled": return overview if not refresh: snapshot = get_live_account_snapshot(account_id) if snapshot: return _merge_snapshot(overview, snapshot) cached = _cached_overview(account_id) if cached: cached["account"] = account cached["risk"] = overview["risk"] cached["intent_history"] = overview["intent_history"] cached["events"] = overview["events"] return cached overview["exchange_cache"]["reason"] = "等待后台实盘同步生成账户快照" return overview synced_at = _now() try: client = client_factory(account) if client_factory else build_binance_client(account, require_testnet=True) client.load_markets() except Exception as exc: overview["errors"].append(f"账户连接失败:{exc}") overview["exchange_cache"] = { "cached": False, "loaded": False, "requires_refresh": True, "source": "exchange", "status": "error", "synced_at": synced_at, } overview = _attach_performance(overview, account_id, record_history=False, snapshot_at=synced_at) upsert_live_account_snapshot( account_id, _exchange_snapshot_payload(overview), status="error", error_message=overview["errors"][0], synced_at=synced_at, ) return overview try: overview["balance"] = _compact_balance(client.fetch_balance()) except Exception as exc: overview["errors"].append(f"余额读取失败:{exc}") try: overview["positions"] = [ item for item in (_compact_position(p, account) for p in client.fetch_positions(None)) if abs(_safe_float(item.get("contracts"))) > 0 ] except Exception as exc: overview["errors"].append(f"持仓读取失败:{exc}") try: overview["open_orders"] = [_compact_order(o) for o in client.fetch_open_orders(None)] except Exception as exc: overview["errors"].append(f"挂单读取失败:{exc}") symbols = _order_history_symbols(account, overview) order_history, order_history_errors = _fetch_order_history_by_symbol(client, symbols, history_limit) overview["order_history"] = order_history overview["errors"].extend(order_history_errors) overview["exchange_cache"] = { "cached": False, "loaded": True, "requires_refresh": False, "source": "exchange", "status": "error" if overview["errors"] else "ok", "synced_at": synced_at, } overview = _attach_performance(overview, account_id, record_history=True, snapshot_at=synced_at) upsert_live_account_snapshot( account_id, _exchange_snapshot_payload(overview), status="error" if overview["errors"] else "ok", error_message=";".join(overview["errors"][:3]), synced_at=synced_at, ) return _cache_overview(account_id, overview) def sync_live_account_snapshots(*, account_ids: list[int] | None = None, history_limit: int = 30, client_factory=None) -> dict: selected = {int(x) for x in (account_ids or []) if int(x or 0) > 0} accounts = list_enabled_live_accounts() if selected: accounts = [account for account in accounts if int(account.get("id") or 0) in selected] items = [] ok_count = 0 error_count = 0 for account in accounts: try: overview = get_live_account_overview( account["id"], history_limit=history_limit, refresh=True, client_factory=client_factory, ) status = (overview.get("exchange_cache") or {}).get("status") or ("error" if overview.get("errors") else "ok") if status == "ok": ok_count += 1 else: error_count += 1 items.append({ "account_id": account["id"], "account_code": account.get("account_code"), "status": status, "synced_at": (overview.get("exchange_cache") or {}).get("synced_at"), "errors": overview.get("errors") or [], }) except Exception as exc: error_count += 1 items.append({ "account_id": account.get("id"), "account_code": account.get("account_code"), "status": "error", "errors": [str(exc)], }) return { "ok": error_count == 0, "total": len(accounts), "ok_count": ok_count, "error_count": error_count, "items": items, }