From 458ef4002f26c7caa56356d3b79882d574c6c884 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Mon, 8 Jun 2026 00:02:01 +0800 Subject: [PATCH] 1 --- AGENTS.md | 2 + app/cli.py | 2 + app/db/live_trading.py | 1 + app/services/live_trading_sync.py | 215 +++++++++++++++++++++++++++++- tests/test_live_trading.py | 89 ++++++++++++- 5 files changed, 307 insertions(+), 2 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 7cac156..7c8cc48 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -100,6 +100,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 8. `app/db/live_trading.py` / `app/web/routes_live_trading.py` 负责实盘控制台:多交易所/多 API 账户配置、账号级风控、交易所接口验收和执行审计事件。页面不再使用“订单意图”作为产品概念,也不区分 Demo/正式环境,实际环境由 endpoint/API key 配置决定。 实盘控制台页面默认只读取 PostgreSQL 中的账户快照,不应在首屏加载时直接阻塞调用交易所 API。`live-trading-sync` 调度任务负责定时同步余额、持仓、挂单、订单历史到 `live_account_snapshots`,并按配置把策略交易 open 仓位同步到实盘账户;手动“立即同步”只是强制刷新同一份 DB 快照。 + 实盘同步不能只做开仓:已提交到交易所的实盘仓位必须持续跟随 paper trading 的保护状态。paper 移动止盈/保护价上移时,要替换实盘止损保护单;paper 平仓时,要撤保护单并用 reduce-only 市价单同步实盘平仓;paper 挂单取消/过期时,相关 live entry intent 也必须撤单或标记取消,并写入 `live_order_events`。 9. `app/services/review_engine.py` 负责复盘与策略自迭代,包括信号绩效、漏选复盘、规则候选、版本演进。 @@ -411,6 +412,7 @@ docker compose exec alphax-web python -m app.cli live-trading-sync --limit 20 ``` 该命令会先同步所有启用账号的交易所快照到 `live_account_snapshots`,再根据 `live_trading` 配置把未同步的策略交易 open 仓位写入/提交实盘执行。页面读取快照,不应为了展示余额、持仓、挂单而直接访问交易所。 +同一个同步入口还会执行实盘保护同步:同步 paper trailing stop 到实盘 stop order,同步 paper 平仓到实盘 reduce-only 平仓,并同步 paper 挂单取消到实盘撤单。 ### 8.3 测试与校验 diff --git a/app/cli.py b/app/cli.py index b726d98..e485701 100644 --- a/app/cli.py +++ b/app/cli.py @@ -56,6 +56,7 @@ def build_parser(): live_sync.add_argument("--no-execute", action="store_true", help="只创建/检查同步意图,不提交交易所") live_sync.add_argument("--skip-snapshots", action="store_true", help="跳过账户余额/持仓/订单快照同步") live_sync.add_argument("--skip-paper-sync", action="store_true", help="跳过策略交易到实盘同步") + live_sync.add_argument("--skip-protection-sync", action="store_true", help="跳过实盘保护单/平仓/撤单同步") repair_strategy = subparsers.add_parser("repair-strategy-direction", help="修复策略方向与交易方向不一致的推荐数据") repair_strategy.add_argument("--limit", type=int, default=500, help="最多扫描的 recommendation 数量") @@ -138,6 +139,7 @@ def main(): execute=not args.no_execute, sync_snapshots=not args.skip_snapshots, sync_paper=not args.skip_paper_sync, + sync_protection=not args.skip_protection_sync, ) print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2, default=str)) return result diff --git a/app/db/live_trading.py b/app/db/live_trading.py index 1afa200..8ff0e2b 100644 --- a/app/db/live_trading.py +++ b/app/db/live_trading.py @@ -599,6 +599,7 @@ def update_live_order_intent(intent_id: int, **fields) -> dict: intent_id = _safe_int(intent_id) allowed = { "status", "reason", "quantity", "price", "exchange_order_id", + "stop_loss", "take_profit", "response_json", "submitted_at", "finished_at", "updated_at", } updates = [] diff --git a/app/services/live_trading_sync.py b/app/services/live_trading_sync.py index 75d7548..68328cd 100644 --- a/app/services/live_trading_sync.py +++ b/app/services/live_trading_sync.py @@ -108,6 +108,49 @@ def _position_notional(position: dict) -> float: return abs(_safe_float(position.get("notional") or info.get("notional"))) +def _order_identifier(order: dict | None) -> dict: + data = order if isinstance(order, dict) else {} + return { + "id": str(data.get("id") or data.get("orderId") or data.get("algoId") or ""), + "algo_id": str(data.get("algoId") or data.get("id") or ""), + "client_algo_id": str(data.get("clientAlgoId") or data.get("clientOrderId") or ""), + } + + +def _protection_orders(intent: dict) -> dict: + response = intent.get("response") if isinstance(intent.get("response"), dict) else {} + return { + "stop_loss_order": response.get("stop_loss_order") if isinstance(response.get("stop_loss_order"), dict) else {}, + "take_profit_order": response.get("take_profit_order") if isinstance(response.get("take_profit_order"), dict) else {}, + "market_order": response.get("market_order") if isinstance(response.get("market_order"), dict) else {}, + "live_protection": response.get("live_protection") if isinstance(response.get("live_protection"), dict) else {}, + "raw": response, + } + + +def _cancel_protection_orders(client, intent: dict, keys: tuple[str, ...] = ("stop_loss_order", "take_profit_order")) -> list[dict]: + canceled = [] + orders = _protection_orders(intent) + for key in keys: + ident = _order_identifier(orders.get(key)) + try: + if ident["algo_id"] and hasattr(client, "cancel_algo_order"): + canceled.append({"key": key, "result": client.cancel_algo_order(algo_id=ident["algo_id"])}) + elif ident["client_algo_id"] and hasattr(client, "cancel_algo_order"): + canceled.append({"key": key, "result": client.cancel_algo_order(client_algo_id=ident["client_algo_id"])}) + elif ident["id"]: + canceled.append({"key": key, "result": client.cancel_order(ident["id"], intent.get("symbol"))}) + except Exception as exc: + canceled.append({"key": key, "error": str(exc)}) + return canceled + + +def _merge_intent_response(intent: dict, patch: dict) -> dict: + response = intent.get("response") if isinstance(intent.get("response"), dict) else {} + merged = {**response, **(patch or {})} + return update_live_order_intent(intent["id"], response_json=merged, updated_at=_now()) + + def _check_live_cumulative_leverage(client, account: dict, additional_notional: float) -> tuple[bool, dict]: risk = _risk_for_account(account) cap = _safe_float(risk.get("max_cumulative_leverage"), 0) @@ -282,12 +325,178 @@ def sync_open_paper_trades_to_live(*, limit: int = 20, execute: bool = True, cli } +def _submitted_live_intents(limit: int = 100) -> list[dict]: + conn = get_conn() + try: + rows = conn.execute( + """ + SELECT loi.*, pt.status AS paper_status, pt.trailing_stop AS paper_trailing_stop, + pt.stop_loss AS paper_stop_loss, pt.tp1 AS paper_tp1, pt.current_price AS paper_current_price, + pt.closed_at AS paper_closed_at, pt.exit_reason AS paper_exit_reason + FROM live_order_intents loi + LEFT JOIN paper_trades pt ON pt.id=loi.paper_trade_id + WHERE loi.source_type='paper_trade_sync' + AND loi.status='submitted' + ORDER BY loi.updated_at ASC, loi.id ASC + LIMIT %s + """, + (max(1, min(_safe_int(limit, 100), 300)),), + ).fetchall() + finally: + conn.close() + return [_row(r) for r in rows] + + +def _paper_canceled_live_order_intents(limit: int = 100) -> list[dict]: + conn = get_conn() + try: + rows = conn.execute( + """ + SELECT loi.*, po.status AS paper_order_status, po.cancel_reason AS paper_order_cancel_reason + FROM live_order_intents loi + JOIN paper_orders po ON po.id=loi.paper_order_id + WHERE loi.paper_order_id > 0 + AND loi.status IN ('prepared','submitted','submitting') + AND po.status IN ('canceled','expired','rejected') + ORDER BY loi.updated_at ASC, loi.id ASC + LIMIT %s + """, + (max(1, min(_safe_int(limit, 100), 300)),), + ).fetchall() + finally: + conn.close() + return [_row(r) for r in rows] + + +def _sync_live_stop_to_paper(intent: dict, account: dict, *, client=None) -> dict: + paper_stop = _safe_float(intent.get("paper_trailing_stop") or intent.get("paper_stop_loss") or intent.get("stop_loss")) + current_stop = _safe_float(intent.get("stop_loss")) + if paper_stop <= 0: + return {"ok": True, "action": "hold", "reason": "missing_paper_stop"} + side = str(intent.get("side") or "long").lower() + tighter = paper_stop < current_stop - 1e-12 if side == "short" else paper_stop > current_stop + 1e-12 + if current_stop > 0 and not tighter: + return {"ok": True, "action": "hold", "reason": "stop_not_tighter", "paper_stop": paper_stop, "current_stop": current_stop} + client = client or build_binance_client(account, require_testnet=True) + client.load_markets() + _, close_side = _side_to_exchange(side) + amount = _safe_float(intent.get("quantity")) + if amount <= 0: + return {"ok": False, "action": "error", "reason": "missing_live_quantity"} + canceled = _cancel_protection_orders(client, intent, keys=("stop_loss_order",)) + new_stop = client.create_stop_loss_order(intent["symbol"], close_side, amount, paper_stop) + updated = _merge_intent_response(intent, { + "stop_loss_order": new_stop, + "live_protection": { + **(_protection_orders(intent).get("live_protection") or {}), + "stop_loss": paper_stop, + "synced_at": _now(), + "source": "paper_trailing_stop" if _safe_float(intent.get("paper_trailing_stop")) > 0 else "paper_stop_loss", + }, + }) + update_live_order_intent(intent["id"], stop_loss=paper_stop, updated_at=_now()) + record_live_order_event(intent["id"], "live_protection_stop_replace", "submitted", "replace_live_stop_with_paper_protection", { + "previous_stop": current_stop, + "new_stop": paper_stop, + "canceled": canceled, + "new_stop_order": new_stop, + "updated_intent": updated.get("id"), + }) + return {"ok": True, "action": "replace_stop", "previous_stop": current_stop, "new_stop": paper_stop} + + +def _sync_live_close_from_paper(intent: dict, account: dict, *, client=None) -> dict: + client = client or build_binance_client(account, require_testnet=True) + client.load_markets() + _, close_side = _side_to_exchange(intent.get("side")) + amount = _safe_float(intent.get("quantity")) + if amount <= 0: + return {"ok": False, "action": "error", "reason": "missing_live_quantity"} + canceled = _cancel_protection_orders(client, intent) + close_order = client.create_market_order(intent["symbol"], close_side, amount, { + "reduceOnly": True, + "newClientOrderId": f"alphax_close_{intent['id']}_{int(datetime.now().timestamp())}", + }) + finished_at = _now() + _merge_intent_response(intent, { + "live_close_order": close_order, + "live_close_reason": intent.get("paper_exit_reason") or "paper_trade_closed", + "protection_canceled_on_close": canceled, + }) + update_live_order_intent( + intent["id"], + status="closed", + reason=f"paper_trade_closed:{intent.get('paper_exit_reason') or ''}", + finished_at=finished_at, + updated_at=finished_at, + ) + record_live_order_event(intent["id"], "live_sync_close", "closed", "paper_trade_closed_reduce_only_live_close", { + "paper_trade_id": intent.get("paper_trade_id"), + "paper_exit_reason": intent.get("paper_exit_reason"), + "close_order": close_order, + "canceled": canceled, + }) + return {"ok": True, "action": "close", "close_order": close_order} + + +def sync_live_protection_from_paper(*, limit: int = 100, client_factory=None) -> dict: + intents = _submitted_live_intents(limit=limit) + results = [] + for intent in intents: + account = get_live_account(intent.get("account_id")) + if not account or account.get("status") != "enabled": + continue + client = client_factory(account) if client_factory else None + try: + if intent.get("paper_status") == "closed": + result = _sync_live_close_from_paper(intent, account, client=client) + elif intent.get("paper_status") == "open": + result = _sync_live_stop_to_paper(intent, account, client=client) + else: + result = {"ok": True, "action": "hold", "reason": f"paper_status_{intent.get('paper_status')}"} + except Exception as exc: + record_live_order_event(intent["id"], "live_protection_error", "error", str(exc), { + "paper_trade_id": intent.get("paper_trade_id"), + "paper_status": intent.get("paper_status"), + }) + result = {"ok": False, "action": "error", "reason": str(exc)} + results.append({"intent_id": intent.get("id"), "paper_trade_id": intent.get("paper_trade_id"), **result}) + + canceled_results = [] + for intent in _paper_canceled_live_order_intents(limit=limit): + account = get_live_account(intent.get("account_id")) + client = client_factory(account) if client_factory and account else None + try: + if client is None and account: + client = build_binance_client(account, require_testnet=True) + if client: + client.load_markets() + exchange_order_id = str(intent.get("exchange_order_id") or "") + cancel_result = client.cancel_order(exchange_order_id, intent.get("symbol")) if exchange_order_id else {"skipped": True, "reason": "missing_exchange_order_id"} + else: + cancel_result = {"skipped": True, "reason": "missing_account"} + update_live_order_intent(intent["id"], status="canceled", reason=f"paper_order_{intent.get('paper_order_status')}", updated_at=_now(), finished_at=_now()) + record_live_order_event(intent["id"], "live_order_cancel", "canceled", "paper_order_canceled_or_expired", { + "paper_order_id": intent.get("paper_order_id"), + "paper_order_status": intent.get("paper_order_status"), + "paper_order_cancel_reason": intent.get("paper_order_cancel_reason"), + "cancel_result": cancel_result, + }) + canceled_results.append({"intent_id": intent.get("id"), "ok": True, "action": "cancel_entry"}) + except Exception as exc: + record_live_order_event(intent["id"], "live_order_cancel_error", "error", str(exc), {"paper_order_id": intent.get("paper_order_id")}) + canceled_results.append({"intent_id": intent.get("id"), "ok": False, "action": "error", "reason": str(exc)}) + ok = all(x.get("ok", True) for x in [*results, *canceled_results]) + return {"ok": ok, "processed_count": len(results), "canceled_entry_count": len(canceled_results), "results": results, "canceled_entries": canceled_results} + + def run_live_trading_sync( *, limit: int = 20, execute: bool = True, sync_snapshots: bool = True, sync_paper: bool = True, + sync_protection: bool = True, client_factory=None, ) -> dict: """Single scheduler entrypoint for live account snapshots and paper sync.""" @@ -295,6 +504,7 @@ def run_live_trading_sync( cfg = live_trading_config() snapshot_result = {"skipped": True, "reason": "snapshot_sync_disabled"} paper_result = {"skipped": True, "reason": "paper_sync_disabled"} + protection_result = {"skipped": True, "reason": "live_protection_sync_disabled"} if sync_snapshots: snapshot_result = sync_live_account_snapshots(client_factory=client_factory) if sync_paper: @@ -306,10 +516,13 @@ def run_live_trading_sync( execute=execute, client_factory=client_factory, ) + if sync_protection: + protection_result = sync_live_protection_from_paper(limit=limit, client_factory=client_factory) return { - "ok": bool(snapshot_result.get("ok", True)) and bool(paper_result.get("ok", True)), + "ok": bool(snapshot_result.get("ok", True)) and bool(paper_result.get("ok", True)) and bool(protection_result.get("ok", True)), "enabled": bool(cfg.get("enabled")), "execute": execute, "snapshots": snapshot_result, "paper_sync": paper_result, + "protection_sync": protection_result, } diff --git a/tests/test_live_trading.py b/tests/test_live_trading.py index 95d54c1..0e54637 100644 --- a/tests/test_live_trading.py +++ b/tests/test_live_trading.py @@ -7,6 +7,7 @@ from app.db.live_trading import ( get_live_account, get_live_account_snapshot, list_live_accounts, + list_live_order_events, list_live_order_intents, upsert_live_account, ) @@ -15,7 +16,7 @@ from app.integrations.binance_live import build_binance_client from app.services.live_trading_account import get_live_account_overview from app.services import live_trading_account from app.services.live_trading_smoke import run_binance_testnet_smoke -from app.services.live_trading_sync import run_live_trading_sync, sync_paper_trade_to_live +from app.services.live_trading_sync import run_live_trading_sync, sync_live_protection_from_paper, sync_paper_trade_to_live from app.web import web_server from app.db.schema import get_conn @@ -472,6 +473,10 @@ class _FakeBinanceClient: self.calls.append(("cancel_order", order_id, symbol)) return {"id": order_id, "status": "canceled"} + def cancel_algo_order(self, *, algo_id=None, client_algo_id=None): + self.calls.append(("cancel_algo_order", algo_id, client_algo_id)) + return {"algoId": algo_id, "clientAlgoId": client_algo_id, "status": "canceled"} + def create_stop_loss_order(self, symbol, side, amount, stop_price, params=None): return self._order("stop_loss", symbol, side, amount, stop_price, params or {}) @@ -577,6 +582,88 @@ def test_live_trading_sync_job_refreshes_snapshots_and_syncs_open_paper_trade(): assert intents[0]["status"] == "submitted" +def test_live_protection_sync_replaces_live_stop_when_paper_trailing_moves(): + set_config("system", "live_trading", { + "enabled": True, + "execution_mode": "exchange_api", + "require_human_approval": False, + "exchange": "binance", + "market_type": "um_futures", + "testnet": True, + "risk": {"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "allowed_symbols": []}, + }, source="test") + account = upsert_live_account( + account_code="binance_protection_move", + status="enabled", + risk_config={"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "max_cumulative_leverage": 5, "allowed_symbols": []}, + ) + trade = _insert_paper_trade(symbol="BTC/USDT") + fake = _FakeBinanceClient() + fake.fetch_balance = lambda: {"total": {"USDT": 1000}} + sync_paper_trade_to_live(trade["id"], account_ids=[account["id"]], execute=True, client_factory=lambda acct: fake) + + conn = get_conn() + try: + conn.execute("UPDATE paper_trades SET trailing_stop=0.105, updated_at='2026-05-22T00:10:00' WHERE id=%s", (trade["id"],)) + conn.commit() + finally: + conn.close() + + result = sync_live_protection_from_paper(client_factory=lambda acct: fake) + intent = list_live_order_intents(account_id=account["id"])["items"][0] + events = list_live_order_events(limit=20)["items"] + call_names = [c[0] for c in fake.calls] + + assert result["ok"] is True + assert result["results"][0]["action"] == "replace_stop" + assert intent["stop_loss"] == 0.105 + assert "cancel_algo_order" in call_names + assert call_names.count("stop_loss") == 2 + assert any(e["event_type"] == "live_protection_stop_replace" for e in events) + + +def test_live_protection_sync_closes_live_position_when_paper_trade_closed(): + set_config("system", "live_trading", { + "enabled": True, + "execution_mode": "exchange_api", + "require_human_approval": False, + "exchange": "binance", + "market_type": "um_futures", + "testnet": True, + "risk": {"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "allowed_symbols": []}, + }, source="test") + account = upsert_live_account( + account_code="binance_protection_close", + status="enabled", + risk_config={"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "max_cumulative_leverage": 5, "allowed_symbols": []}, + ) + trade = _insert_paper_trade(symbol="BTC/USDT") + fake = _FakeBinanceClient() + fake.fetch_balance = lambda: {"total": {"USDT": 1000}} + sync_paper_trade_to_live(trade["id"], account_ids=[account["id"]], execute=True, client_factory=lambda acct: fake) + + conn = get_conn() + try: + conn.execute( + "UPDATE paper_trades SET status='closed', exit_reason='trailing_stop', closed_at='2026-05-22T00:20:00', updated_at='2026-05-22T00:20:00' WHERE id=%s", + (trade["id"],), + ) + conn.commit() + finally: + conn.close() + + result = sync_live_protection_from_paper(client_factory=lambda acct: fake) + intent = list_live_order_intents(account_id=account["id"])["items"][0] + events = list_live_order_events(limit=20)["items"] + market_orders = [c for c in fake.calls if c[0] == "market_order"] + + assert result["ok"] is True + assert result["results"][0]["action"] == "close" + assert intent["status"] == "closed" + assert market_orders[-1][-1]["reduceOnly"] is True + assert any(e["event_type"] == "live_sync_close" for e in events) + + def test_binance_testnet_smoke_covers_market_limit_cancel_tp_sl_interfaces(): account = upsert_live_account( account_code="binance_testnet",