diff --git a/AGENTS.md b/AGENTS.md index 13a42c0..7cac156 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -99,6 +99,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 负责策略交易账本同步和 paper 执行适配。TP/SL、移动止盈、仓位健康、仓位 sizing、账户级风控等可复用交易能力不应长期绑定在 paper trading 层;新增能力优先沉到 `app/core/*` 或独立 execution/risk 模块,再由 paper/live 适配调用。 8. `app/db/live_trading.py` / `app/web/routes_live_trading.py` 负责实盘控制台:多交易所/多 API 账户配置、账号级风控、交易所接口验收和执行审计事件。页面不再使用“订单意图”作为产品概念,也不区分 Demo/正式环境,实际环境由 endpoint/API key 配置决定。 + 实盘控制台页面默认只读取 PostgreSQL 中的账户快照,不应在首屏加载时直接阻塞调用交易所 API。`live-trading-sync` 调度任务负责定时同步余额、持仓、挂单、订单历史到 `live_account_snapshots`,并按配置把策略交易 open 仓位同步到实盘账户;手动“立即同步”只是强制刷新同一份 DB 快照。 9. `app/services/review_engine.py` 负责复盘与策略自迭代,包括信号绩效、漏选复盘、规则候选、版本演进。 @@ -244,7 +245,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - `app/db/paper_trading.py` - 模拟交易账本、仓位、成交事件和资金口径。 - `app/db/live_trading.py` - - 实盘控制台账本,多 API 账户、账号级风控、交易所接口验收与执行事件;不保存真实 API secret。 + - 实盘控制台账本,多 API 账户、账号级风控、交易所接口验收、账户快照与执行事件;不保存真实 API secret。 - `app/db/market_db.py` - 市场快照。 - `app/db/system_logs.py` @@ -273,6 +274,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - `paper_orders` - `paper_trade_events` - `live_trade_accounts` +- `live_account_snapshots` - `live_order_intents` - `live_order_events` - `market_snapshots` @@ -388,6 +390,7 @@ python -m app.cli market python -m app.cli review python -m app.cli event python -m app.cli sentiment --collect +python -m app.cli live-trading-sync python -m app.cli llm-insights --scope sentiment --limit 40 ``` @@ -401,6 +404,14 @@ docker compose exec alphax-web python -m app.cli live-trading-smoke --account-id 该命令会依次测试余额/行情、设置杠杆、市价单、止盈单、止损单、限价挂单、撤单、最后市价平仓,并写入 `live_order_events`。不要把真实 API key 写入数据库或聊天;只在环境变量中保存密钥,`live_trade_accounts` 只保存 env key 名。 +实盘日常同步入口: + +```bash +docker compose exec alphax-web python -m app.cli live-trading-sync --limit 20 +``` + +该命令会先同步所有启用账号的交易所快照到 `live_account_snapshots`,再根据 `live_trading` 配置把未同步的策略交易 open 仓位写入/提交实盘执行。页面读取快照,不应为了展示余额、持仓、挂单而直接访问交易所。 + ### 8.3 测试与校验 常用回归命令: diff --git a/app/cli.py b/app/cli.py index 81e0aee..b726d98 100644 --- a/app/cli.py +++ b/app/cli.py @@ -3,7 +3,7 @@ import argparse import sys -from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, live_trading_smoke, market_overview, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor +from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, live_trading_smoke, live_trading_sync, market_overview, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor def build_parser(): @@ -51,6 +51,12 @@ def build_parser(): live_smoke.add_argument("--notional-usdt", type=float, default=10.0, help="测试名义金额,默认 10U") live_smoke.add_argument("--leverage", type=float, default=1.0, help="测试杠杆,默认 1x") + live_sync = subparsers.add_parser("live-trading-sync", help="同步实盘账户快照,并把策略交易同步到实盘") + live_sync.add_argument("--limit", type=int, default=20, help="本轮最多同步的策略交易持仓数量") + live_sync.add_argument("--no-execute", action="store_true", help="只创建/检查同步意图,不提交交易所") + live_sync.add_argument("--skip-snapshots", action="store_true", help="跳过账户余额/持仓/订单快照同步") + live_sync.add_argument("--skip-paper-sync", action="store_true", help="跳过策略交易到实盘同步") + repair_strategy = subparsers.add_parser("repair-strategy-direction", help="修复策略方向与交易方向不一致的推荐数据") repair_strategy.add_argument("--limit", type=int, default=500, help="最多扫描的 recommendation 数量") repair_strategy.add_argument("--dry-run", action="store_true", help="只预览不写库") @@ -126,6 +132,15 @@ def main(): notional_usdt=args.notional_usdt, leverage=args.leverage, ) + if args.command == "live-trading-sync": + result = live_trading_sync.run_live_trading_sync( + limit=args.limit, + execute=not args.no_execute, + sync_snapshots=not args.skip_snapshots, + sync_paper=not args.skip_paper_sync, + ) + print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2, default=str)) + return result if args.command == "repair-strategy-direction": from app.db.strategy_direction_repair import repair_strategy_direction_mismatches diff --git a/app/db/live_trading.py b/app/db/live_trading.py index 14904cd..86dfd65 100644 --- a/app/db/live_trading.py +++ b/app/db/live_trading.py @@ -65,7 +65,7 @@ def _row(row) -> dict: if not row: return {} item = dict(row) - for key in ("permissions_json", "risk_config_json", "risk_check_json", "request_json", "response_json", "payload_json"): + for key in ("permissions_json", "risk_config_json", "risk_check_json", "request_json", "response_json", "payload_json", "snapshot_json"): if key in item: item[key.replace("_json", "")] = _loads(item.pop(key), {}) for key in ("testnet", "reduce_only"): @@ -270,6 +270,68 @@ def list_enabled_live_accounts() -> list[dict]: return [_row(r) for r in rows] +def upsert_live_account_snapshot( + account_id: int, + snapshot: dict, + *, + status: str = "ok", + error_message: str = "", + synced_at: str = "", +) -> dict: + account_id = _safe_int(account_id) + if account_id <= 0: + return {"ok": False, "reason": "invalid_account_id"} + now = _now() + synced_at = synced_at or now + conn = get_conn() + try: + row = conn.execute( + """ + INSERT INTO live_account_snapshots ( + account_id, status, error_message, snapshot_json, synced_at, created_at, updated_at + ) + VALUES (%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT(account_id) DO UPDATE SET + status=excluded.status, + error_message=excluded.error_message, + snapshot_json=excluded.snapshot_json, + synced_at=excluded.synced_at, + updated_at=excluded.updated_at + RETURNING * + """, + ( + account_id, + str(status or "ok"), + str(error_message or "")[:1000], + _dumps(snapshot or {}), + synced_at, + now, + now, + ), + ).fetchone() + conn.commit() + finally: + conn.close() + item = _row(row) + item["ok"] = True + return item + + +def get_live_account_snapshot(account_id: int) -> dict: + account_id = _safe_int(account_id) + if account_id <= 0: + return {} + conn = get_conn() + try: + row = conn.execute( + "SELECT * FROM live_account_snapshots WHERE account_id=%s", + (account_id,), + ).fetchone() + finally: + conn.close() + return _row(row) + + def _config_for_account(account: dict | None = None) -> dict: cfg = get_effective_live_trading_config() if account: @@ -565,6 +627,14 @@ def get_live_trading_summary() -> dict: "SELECT * FROM live_order_intents ORDER BY updated_at DESC, id DESC LIMIT 8" ).fetchall() account_count = conn.execute("SELECT COUNT(*) FROM live_trade_accounts").fetchone()[0] + snapshot_rows = conn.execute( + """ + SELECT status, COUNT(*) AS count, MAX(synced_at) AS latest_synced_at + FROM live_account_snapshots + GROUP BY status + ORDER BY status + """ + ).fetchall() finally: conn.close() return { @@ -580,6 +650,8 @@ def get_live_trading_summary() -> dict: "max_cumulative_leverage": risk["max_cumulative_leverage"], "max_daily_order_count": risk["max_daily_order_count"], "account_count": account_count, + "snapshot_status": {r["status"]: r["count"] for r in snapshot_rows}, + "latest_snapshot_synced_at": max([str(r["latest_synced_at"] or "") for r in snapshot_rows] or [""]), "intent_status": {r["status"]: r["count"] for r in status_rows}, "latest_intents": [_row(r) for r in latest_rows], } diff --git a/app/db/migrations/0021_live_account_snapshots.sql b/app/db/migrations/0021_live_account_snapshots.sql new file mode 100644 index 0000000..4add8d6 --- /dev/null +++ b/app/db/migrations/0021_live_account_snapshots.sql @@ -0,0 +1,15 @@ +-- Persist exchange account snapshots so the live-trading console reads DB state +-- instead of blocking page load on exchange API calls. + +CREATE TABLE IF NOT EXISTS live_account_snapshots ( + account_id INTEGER PRIMARY KEY REFERENCES live_trade_accounts(id) ON DELETE CASCADE, + status TEXT NOT NULL DEFAULT 'ok', + error_message TEXT NOT NULL DEFAULT '', + snapshot_json TEXT NOT NULL DEFAULT '{}', + synced_at TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL DEFAULT '', + updated_at TEXT NOT NULL DEFAULT '' +); + +CREATE INDEX IF NOT EXISTS idx_live_account_snapshots_synced_at + ON live_account_snapshots(synced_at DESC); diff --git a/app/db/scheduler_db.py b/app/db/scheduler_db.py index 05df4cc..a63d7b1 100644 --- a/app/db/scheduler_db.py +++ b/app/db/scheduler_db.py @@ -51,6 +51,16 @@ DEFAULT_JOBS = [ "description": "策略交易账本同步", "sort_order": 25, }, + { + "job_name": "live-trading-sync", + "command": "live-trading-sync", + "args": ["--limit", "20"], + "every_seconds": 120, + "initial_delay": 45, + "lock_group": "live_trading_write", + "description": "实盘账户快照与策略交易同步", + "sort_order": 27, + }, { "job_name": "market", "command": "market", diff --git a/app/services/live_trading_account.py b/app/services/live_trading_account.py index 52f991c..64f3682 100644 --- a/app/services/live_trading_account.py +++ b/app/services/live_trading_account.py @@ -2,12 +2,26 @@ from __future__ import annotations -from app.db.live_trading import _safe_float, get_live_account, list_live_order_events, list_live_order_intents +from datetime import datetime + +from app.db.live_trading import ( + _safe_float, + get_live_account, + get_live_account_snapshot, + list_enabled_live_accounts, + list_live_order_events, + list_live_order_intents, + upsert_live_account_snapshot, +) from app.integrations.binance_live import LiveTradingConfigError, build_binance_client _ACCOUNT_OVERVIEW_CACHE: dict[int, dict] = {} +def _now() -> str: + return datetime.now().isoformat() + + def _compact_balance(balance: dict) -> dict: total = balance.get("total") if isinstance(balance.get("total"), dict) else {} free = balance.get("free") if isinstance(balance.get("free"), dict) else {} @@ -176,6 +190,41 @@ def _cached_overview(account_id: int) -> dict | None: return cached +def _exchange_snapshot_payload(overview: dict) -> dict: + return { + "balance": overview.get("balance") or {"assets": [], "usdt": {"free": 0, "used": 0, "total": 0}}, + "positions": overview.get("positions") or [], + "open_orders": overview.get("open_orders") or [], + "order_history": overview.get("order_history") or [], + "exchange_cache": overview.get("exchange_cache") or {}, + "errors": overview.get("errors") or [], + } + + +def _merge_snapshot(overview: dict, snapshot_row: dict) -> dict: + payload = snapshot_row.get("snapshot") if isinstance(snapshot_row.get("snapshot"), dict) else {} + if not payload: + return overview + for key in ("balance", "positions", "open_orders", "order_history", "errors"): + if key in payload: + overview[key] = payload.get(key) + synced_at = snapshot_row.get("synced_at") or "" + status = snapshot_row.get("status") or "" + error_message = snapshot_row.get("error_message") or "" + overview["exchange_cache"] = { + **(payload.get("exchange_cache") or {}), + "cached": True, + "loaded": bool((payload.get("exchange_cache") or {}).get("loaded", True)), + "requires_refresh": False, + "source": "database", + "status": status, + "synced_at": synced_at, + } + if status == "error" and error_message and error_message not in (overview.get("errors") or []): + overview.setdefault("errors", []).append(error_message) + return overview + + def get_live_account_overview(account_id: int, *, history_limit: int = 30, refresh: bool = False, client_factory=None) -> dict: account = get_live_account(account_id) if not account: @@ -195,6 +244,9 @@ def get_live_account_overview(account_id: int, *, history_limit: int = 30, refre if account.get("status") != "enabled": return overview if not refresh: + snapshot = get_live_account_snapshot(account_id) + if snapshot: + return _merge_snapshot(overview, snapshot) cached = _cached_overview(account_id) if cached: cached["account"] = account @@ -202,13 +254,29 @@ def get_live_account_overview(account_id: int, *, history_limit: int = 30, refre cached["intent_history"] = overview["intent_history"] cached["events"] = overview["events"] return cached - overview["exchange_cache"]["reason"] = "点击刷新交易所数据后读取余额、持仓和订单" + overview["exchange_cache"]["reason"] = "等待后台实盘同步生成账户快照" return overview + synced_at = _now() try: client = client_factory(account) if client_factory else build_binance_client(account, require_testnet=True) client.load_markets() except Exception as exc: overview["errors"].append(f"账户连接失败:{exc}") + overview["exchange_cache"] = { + "cached": False, + "loaded": False, + "requires_refresh": True, + "source": "exchange", + "status": "error", + "synced_at": synced_at, + } + upsert_live_account_snapshot( + account_id, + _exchange_snapshot_payload(overview), + status="error", + error_message=overview["errors"][0], + synced_at=synced_at, + ) return overview try: overview["balance"] = _compact_balance(client.fetch_balance()) @@ -229,5 +297,64 @@ def get_live_account_overview(account_id: int, *, history_limit: int = 30, refre order_history, order_history_errors = _fetch_order_history_by_symbol(client, symbols, history_limit) overview["order_history"] = order_history overview["errors"].extend(order_history_errors) - overview["exchange_cache"] = {"cached": False, "loaded": True, "requires_refresh": False} + overview["exchange_cache"] = { + "cached": False, + "loaded": True, + "requires_refresh": False, + "source": "exchange", + "status": "error" if overview["errors"] else "ok", + "synced_at": synced_at, + } + upsert_live_account_snapshot( + account_id, + _exchange_snapshot_payload(overview), + status="error" if overview["errors"] else "ok", + error_message=";".join(overview["errors"][:3]), + synced_at=synced_at, + ) return _cache_overview(account_id, overview) + + +def sync_live_account_snapshots(*, account_ids: list[int] | None = None, history_limit: int = 30, client_factory=None) -> dict: + selected = {int(x) for x in (account_ids or []) if int(x or 0) > 0} + accounts = list_enabled_live_accounts() + if selected: + accounts = [account for account in accounts if int(account.get("id") or 0) in selected] + items = [] + ok_count = 0 + error_count = 0 + for account in accounts: + try: + overview = get_live_account_overview( + account["id"], + history_limit=history_limit, + refresh=True, + client_factory=client_factory, + ) + status = (overview.get("exchange_cache") or {}).get("status") or ("error" if overview.get("errors") else "ok") + if status == "ok": + ok_count += 1 + else: + error_count += 1 + items.append({ + "account_id": account["id"], + "account_code": account.get("account_code"), + "status": status, + "synced_at": (overview.get("exchange_cache") or {}).get("synced_at"), + "errors": overview.get("errors") or [], + }) + except Exception as exc: + error_count += 1 + items.append({ + "account_id": account.get("id"), + "account_code": account.get("account_code"), + "status": "error", + "errors": [str(exc)], + }) + return { + "ok": error_count == 0, + "total": len(accounts), + "ok_count": ok_count, + "error_count": error_count, + "items": items, + } diff --git a/app/services/live_trading_sync.py b/app/services/live_trading_sync.py index 187adbb..75d7548 100644 --- a/app/services/live_trading_sync.py +++ b/app/services/live_trading_sync.py @@ -18,6 +18,7 @@ from app.db.live_trading import ( from app.config.system_config import live_trading_config from app.db.schema import get_conn, init_db from app.integrations.binance_live import LiveTradingConfigError, build_binance_client +from app.services.live_trading_account import sync_live_account_snapshots def _now() -> str: @@ -279,3 +280,36 @@ def sync_open_paper_trades_to_live(*, limit: int = 20, execute: bool = True, cli "execute": execute, "results": results, } + + +def run_live_trading_sync( + *, + limit: int = 20, + execute: bool = True, + sync_snapshots: bool = True, + sync_paper: bool = True, + client_factory=None, +) -> dict: + """Single scheduler entrypoint for live account snapshots and paper sync.""" + init_db() + cfg = live_trading_config() + snapshot_result = {"skipped": True, "reason": "snapshot_sync_disabled"} + paper_result = {"skipped": True, "reason": "paper_sync_disabled"} + if sync_snapshots: + snapshot_result = sync_live_account_snapshots(client_factory=client_factory) + if sync_paper: + if not bool(cfg.get("enabled")): + paper_result = {"ok": False, "skipped": True, "reason": "live_trading_disabled", "results": []} + else: + paper_result = sync_open_paper_trades_to_live( + limit=limit, + execute=execute, + client_factory=client_factory, + ) + return { + "ok": bool(snapshot_result.get("ok", True)) and bool(paper_result.get("ok", True)), + "enabled": bool(cfg.get("enabled")), + "execute": execute, + "snapshots": snapshot_result, + "paper_sync": paper_result, + } diff --git a/static/live_trading.html b/static/live_trading.html index e2be172..9427702 100644 --- a/static/live_trading.html +++ b/static/live_trading.html @@ -37,7 +37,7 @@
-
资金与持仓
默认显示本地缓存,手动刷新才请求交易所
+
资金与持仓
默认读取数据库快照,后台定时同步交易所数据
@@ -119,7 +119,9 @@ function renderKpis(){var a=selectedAccountObj(),o=state.overview||{},risk=o.ris function fillAccountForm(id){var x=(state.accounts||[]).find(function(a){return Number(a.id)===Number(id)})||{},r=x.risk_config||{};$('accountCode').value=x.account_code||'';$('accountStatus').value=x.status||'disabled';$('accountExchange').value=x.exchange||'binance';$('accountMarket').value=x.market_type||'um_futures';$('apiKeyEnv').value=x.api_key_env||'ALPHAX_BINANCE_API_KEY';$('apiSecretEnv').value=x.api_secret_env||'ALPHAX_BINANCE_API_SECRET';$('maxOrderMargin').value=r.max_order_margin_usdt||10;$('maxSymbolLeverage').value=r.max_symbol_leverage||1;$('maxCumulativeLeverage').value=r.max_cumulative_leverage||1;$('allowedSymbols').value=(r.allowed_symbols||[]).join(',');if($('saveAccountBtn'))$('saveAccountBtn').textContent=Number(id)>0?'保存修改':'新增账号'} function resetForm(){state.selectedId=0;state.overview=null;renderAccounts();fillAccountForm(0);renderKpis();renderOverview();document.querySelectorAll('.tab').forEach(function(x){x.classList.remove('active')});document.querySelectorAll('.tab-panel').forEach(function(x){x.classList.remove('active')});document.querySelector('.tab[onclick*="config"]').classList.add('active');$('configPane').classList.add('active')} function info(k,v){return '
'+esc(k)+''+esc(v)+'
'} -function renderOverview(){var a=selectedAccountObj(),o=state.overview||{},risk=o.risk||{},errors=o.errors||[],cache=o.exchange_cache||{};if($('exchangeCacheNote'))$('exchangeCacheNote').textContent=cache.loaded?(cache.cached?'交易所数据来自缓存':'交易所数据已刷新'):(cache.reason||'默认显示本地缓存,手动刷新才请求交易所');$('accountInfo').innerHTML=[info('账号状态',a.status||'--'),info('API Key 变量',a.api_key_env||'--'),info('每单保证金上限',fmt(risk.max_order_margin_usdt,2)+' USDT'),info('单币杠杆上限',fmt(risk.max_symbol_leverage,2)+'x'),info('累计杠杆上限',fmt(risk.max_cumulative_leverage,2)+'x'),info('允许交易币种',risk.symbol_policy==='all'?'全部币种':(risk.allowed_symbols||[]).join(', '))].join('')+(errors.length?'
账户数据读取异常:'+esc(errors[0])+'
':'')+(!cache.loaded&&a.status==='enabled'?'
为避免页面打开被 Binance API 拖慢,余额、持仓和订单默认不自动刷新。点击“刷新交易所数据”后读取。
':'');renderPositions();renderOpenOrders();renderOrderHistory();renderEvents()} +function renderOverview(){var a=selectedAccountObj(),o=state.overview||{},risk=o.risk||{},errors=o.errors||[],cache=o.exchange_cache||{},syncText=cache.synced_at?(' · 同步 '+time(cache.synced_at)):''; +if($('exchangeCacheNote'))$('exchangeCacheNote').textContent=cache.loaded?(cache.source==='database'?'读取数据库快照'+syncText:'交易所数据已同步'+syncText):(cache.reason||'等待后台同步生成账户快照'); +$('accountInfo').innerHTML=[info('账号状态',a.status||'--'),info('API Key 变量',a.api_key_env||'--'),info('每单保证金上限',fmt(risk.max_order_margin_usdt,2)+' USDT'),info('单币杠杆上限',fmt(risk.max_symbol_leverage,2)+'x'),info('累计杠杆上限',fmt(risk.max_cumulative_leverage,2)+'x'),info('允许交易币种',risk.symbol_policy==='all'?'全部币种':(risk.allowed_symbols||[]).join(', '))].join('')+(errors.length?'
账户数据读取异常:'+esc(errors[0])+'
':'')+(!cache.loaded&&a.status==='enabled'?'
后台实盘同步还没有生成该账号的账户快照。可以等待调度器下一轮同步,或点击“立即同步”。
':'');renderPositions();renderOpenOrders();renderOrderHistory();renderEvents()} function table(headers,rows,empty){if(!rows.length)return '
'+esc(empty||'暂无数据')+'
';return ''+headers.map(function(h){return ''}).join('')+''+rows.join('')+'
'+esc(h)+'
'} function renderPositions(){var rows=(state.overview?.positions||[]).map(function(x){var lev=Number(x.leverage||0);return ''+esc(x.symbol)+''+badge(x.side_label||sideText(x.side))+''+fmt(x.contracts,6)+''+fmt(x.position_value_usdt,2)+' U'+fmt(x.entry_price,6)+''+fmt(x.mark_price,6)+''+fmt(x.unrealized_pnl,4)+''+fmt(lev,2)+'x'});$('positions').innerHTML=table(['币种','方向','数量','仓位价值','开仓价','标记价','未实现盈亏','杠杆'],rows,'当前账号暂无持仓')} function renderOpenOrders(){var rows=(state.overview?.open_orders||[]).map(function(x){return ''+time(x.timestamp)+''+esc(x.symbol)+''+esc(x.type)+''+badge(sideText(x.side))+''+fmt(x.price,8)+''+fmt(x.amount,6)+''+badge(x.status)+''});$('openOrders').innerHTML=table(['时间','币种','类型','方向','价格','数量','状态'],rows,'当前账号暂无挂单')} @@ -131,7 +133,7 @@ async function saveAccount(){var allowed=String($('allowedSymbols').value||'').s async function deleteAccount(){if(!state.selectedId){alert('请先选择要删除的账号');return}var acct=selectedAccountObj();if(!confirm('确认删除账号配置:'+(acct.account_code||state.selectedId)+'?\\n历史订单和调用日志会保留。'))return;var resp=await fetch('/api/live-trading/accounts/'+state.selectedId,{method:'DELETE'});var d=await resp.json().catch(function(){return {}});if(!resp.ok){alert('删除失败:'+(d.detail||'unknown_error'));return}state.selectedId=0;await loadAll()} async function runSmoke(){if(!state.selectedId){alert('请先选择账号');return}var acct=selectedAccountObj(),risk=acct.risk_config||{},lev=Number(risk.max_symbol_leverage||1),margin=Number($('smokeMargin').value||risk.max_order_margin_usdt||10),notional=margin*Math.max(1,lev);var btn=$('smokeBtn');btn.disabled=true;btn.textContent='验收中...';try{var resp=await fetch('/api/live-trading/smoke/binance',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({account_id:state.selectedId,symbol:$('smokeSymbol').value,notional_usdt:notional,leverage:lev})});var d=await resp.json();if(!resp.ok||d.detail){alert('接口验收失败:'+(d.detail||JSON.stringify(d).slice(0,220)))}await loadAll()}catch(e){alert('接口验收请求失败:'+e.message)}finally{btn.disabled=false;btn.textContent='开始验收'}} async function loadOverview(refresh){if(!state.selectedId){state.overview=null;renderAll();return}state.overview=await (await fetch('/api/live-trading/accounts/'+state.selectedId+'/overview?refresh='+(refresh?1:0)+'&_ts='+Date.now(),{cache:'no-store'})).json();renderAll()} -async function refreshExchangeData(){if(!state.selectedId){alert('请先选择账号');return}var btn=$('refreshExchangeBtn');if(btn){btn.disabled=true;btn.textContent='刷新中...'}try{await loadOverview(true)}finally{if(btn){btn.disabled=false;btn.textContent='刷新交易所数据'}}} +async function refreshExchangeData(){if(!state.selectedId){alert('请先选择账号');return}var btn=$('refreshExchangeBtn');if(btn){btn.disabled=true;btn.textContent='同步中...'}try{await loadOverview(true)}finally{if(btn){btn.disabled=false;btn.textContent='立即同步'}}} async function loadAll(){try{var s=await (await fetch('/api/live-trading/summary?_ts='+Date.now(),{cache:'no-store'})).json();var a=await (await fetch('/api/live-trading/accounts?_ts='+Date.now(),{cache:'no-store'})).json();var e=await (await fetch('/api/live-trading/events?limit=100&_ts='+Date.now(),{cache:'no-store'})).json();state.summary=s;state.accounts=a.items||[];state.events=e.items||[];if(!state.selectedId&&state.accounts[0])state.selectedId=state.accounts[0].id;await loadOverview()}catch(e){$('kpis').innerHTML='
状态加载失败'+esc(e.message)+'
'}} loadAll(); diff --git a/tests/conftest.py b/tests/conftest.py index 0576a17..f73044a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -77,6 +77,7 @@ _ID_TABLES = { "paper_orders", "paper_trades", "paper_trade_events", + "live_account_snapshots", "live_trade_accounts", "live_order_intents", "live_order_events", diff --git a/tests/test_live_trading.py b/tests/test_live_trading.py index 05311de..9f4a535 100644 --- a/tests/test_live_trading.py +++ b/tests/test_live_trading.py @@ -5,6 +5,7 @@ from app.db.live_trading import ( create_live_order_intents_for_accounts, delete_live_account, get_live_account, + get_live_account_snapshot, list_live_accounts, list_live_order_intents, upsert_live_account, @@ -14,7 +15,7 @@ from app.integrations.binance_live import build_binance_client from app.services.live_trading_account import get_live_account_overview from app.services import live_trading_account from app.services.live_trading_smoke import run_binance_testnet_smoke -from app.services.live_trading_sync import sync_paper_trade_to_live +from app.services.live_trading_sync import run_live_trading_sync, sync_paper_trade_to_live from app.web import web_server from app.db.schema import get_conn @@ -174,6 +175,46 @@ def test_live_account_overview_refresh_compacts_position_value_side_and_leverage assert pos["leverage_source"] == "account_config" +def test_live_account_overview_refresh_persists_database_snapshot_and_reads_without_exchange(monkeypatch): + account = upsert_live_account( + account_code="binance_snapshot_cache", + status="enabled", + risk_config={"max_order_margin_usdt": 10, "max_symbol_leverage": 2, "allowed_symbols": ["BTC/USDT"]}, + ) + + class Client: + def load_markets(self): + return {} + + def fetch_balance(self): + return {"total": {"USDT": 1234}, "free": {"USDT": 1200}, "used": {"USDT": 34}} + + def fetch_positions(self, symbols=None): + return [] + + def fetch_open_orders(self, symbol=None): + return [] + + def fetch_orders(self, symbol=None, limit=30): + return [] + + refreshed = get_live_account_overview(account["id"], refresh=True, client_factory=lambda account: Client()) + snapshot = get_live_account_snapshot(account["id"]) + + def fail_build(*args, **kwargs): + raise AssertionError("database snapshot read must not call exchange") + + monkeypatch.setattr(live_trading_account, "build_binance_client", fail_build) + cached = get_live_account_overview(account["id"], refresh=False) + + assert refreshed["balance"]["usdt"]["total"] == 1234 + assert snapshot["status"] == "ok" + assert snapshot["snapshot"]["balance"]["usdt"]["total"] == 1234 + assert cached["balance"]["usdt"]["total"] == 1234 + assert cached["exchange_cache"]["source"] == "database" + assert cached["exchange_cache"]["cached"] is True + + def test_live_account_overview_fetches_order_history_per_symbol(): account = upsert_live_account( account_code="binance_overview_orders_by_symbol", @@ -496,6 +537,39 @@ def test_paper_trade_sync_executes_scaled_live_order_once(): assert again["items"][0]["reason"] == "already_synced" +def test_live_trading_sync_job_refreshes_snapshots_and_syncs_open_paper_trade(): + set_config("system", "live_trading", { + "enabled": True, + "execution_mode": "exchange_api", + "require_human_approval": False, + "exchange": "binance", + "market_type": "um_futures", + "testnet": True, + "risk": {"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "allowed_symbols": []}, + }, source="test") + account = upsert_live_account( + account_code="binance_scheduler_sync", + status="enabled", + risk_config={"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "max_cumulative_leverage": 5, "allowed_symbols": []}, + ) + trade = _insert_paper_trade(symbol="BTC/USDT") + fake = _FakeBinanceClient() + fake.fetch_balance = lambda: {"total": {"USDT": 1000}, "free": {"USDT": 900}, "used": {"USDT": 100}} + fake.fetch_open_orders = lambda symbol=None: [] + fake.fetch_orders = lambda symbol=None, limit=30: [] + + result = run_live_trading_sync(limit=20, execute=True, client_factory=lambda acct: fake) + snapshot = get_live_account_snapshot(account["id"]) + intents = list_live_order_intents(account_id=account["id"])["items"] + + assert result["ok"] is True + assert result["snapshots"]["ok_count"] == 1 + assert result["paper_sync"]["processed_count"] == 1 + assert snapshot["snapshot"]["balance"]["usdt"]["total"] == 1000 + assert intents[0]["paper_trade_id"] == trade["id"] + assert intents[0]["status"] == "submitted" + + def test_binance_testnet_smoke_covers_market_limit_cancel_tp_sl_interfaces(): account = upsert_live_account( account_code="binance_testnet", diff --git a/tests/test_scheduler_control.py b/tests/test_scheduler_control.py index 443560e..e17925d 100644 --- a/tests/test_scheduler_control.py +++ b/tests/test_scheduler_control.py @@ -29,6 +29,8 @@ def test_scheduler_tables_seed_defaults(monkeypatch, tmp_path): assert jobs["confirm"]["lock_group"] == "recommendation_write" assert jobs["tracker"]["every_seconds"] == 180 assert jobs["paper-trader"]["lock_group"] == "paper_trading_write" + assert jobs["live-trading-sync"]["command"] == "live-trading-sync" + assert jobs["live-trading-sync"]["lock_group"] == "live_trading_write" assert "onchain" not in jobs