This commit is contained in:
aaron 2026-06-08 00:02:01 +08:00
parent db2080a8ef
commit 458ef4002f
5 changed files with 307 additions and 2 deletions

View File

@ -100,6 +100,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组
8. `app/db/live_trading.py` / `app/web/routes_live_trading.py`
负责实盘控制台:多交易所/多 API 账户配置、账号级风控、交易所接口验收和执行审计事件。页面不再使用“订单意图”作为产品概念,也不区分 Demo/正式环境,实际环境由 endpoint/API key 配置决定。
实盘控制台页面默认只读取 PostgreSQL 中的账户快照,不应在首屏加载时直接阻塞调用交易所 API。`live-trading-sync` 调度任务负责定时同步余额、持仓、挂单、订单历史到 `live_account_snapshots`,并按配置把策略交易 open 仓位同步到实盘账户;手动“立即同步”只是强制刷新同一份 DB 快照。
实盘同步不能只做开仓:已提交到交易所的实盘仓位必须持续跟随 paper trading 的保护状态。paper 移动止盈/保护价上移时要替换实盘止损保护单paper 平仓时,要撤保护单并用 reduce-only 市价单同步实盘平仓paper 挂单取消/过期时,相关 live entry intent 也必须撤单或标记取消,并写入 `live_order_events`
9. `app/services/review_engine.py`
负责复盘与策略自迭代,包括信号绩效、漏选复盘、规则候选、版本演进。
@ -411,6 +412,7 @@ docker compose exec alphax-web python -m app.cli live-trading-sync --limit 20
```
该命令会先同步所有启用账号的交易所快照到 `live_account_snapshots`,再根据 `live_trading` 配置把未同步的策略交易 open 仓位写入/提交实盘执行。页面读取快照,不应为了展示余额、持仓、挂单而直接访问交易所。
同一个同步入口还会执行实盘保护同步:同步 paper trailing stop 到实盘 stop order同步 paper 平仓到实盘 reduce-only 平仓,并同步 paper 挂单取消到实盘撤单。
### 8.3 测试与校验

View File

@ -56,6 +56,7 @@ def build_parser():
live_sync.add_argument("--no-execute", action="store_true", help="只创建/检查同步意图,不提交交易所")
live_sync.add_argument("--skip-snapshots", action="store_true", help="跳过账户余额/持仓/订单快照同步")
live_sync.add_argument("--skip-paper-sync", action="store_true", help="跳过策略交易到实盘同步")
live_sync.add_argument("--skip-protection-sync", action="store_true", help="跳过实盘保护单/平仓/撤单同步")
repair_strategy = subparsers.add_parser("repair-strategy-direction", help="修复策略方向与交易方向不一致的推荐数据")
repair_strategy.add_argument("--limit", type=int, default=500, help="最多扫描的 recommendation 数量")
@ -138,6 +139,7 @@ def main():
execute=not args.no_execute,
sync_snapshots=not args.skip_snapshots,
sync_paper=not args.skip_paper_sync,
sync_protection=not args.skip_protection_sync,
)
print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2, default=str))
return result

View File

@ -599,6 +599,7 @@ def update_live_order_intent(intent_id: int, **fields) -> dict:
intent_id = _safe_int(intent_id)
allowed = {
"status", "reason", "quantity", "price", "exchange_order_id",
"stop_loss", "take_profit",
"response_json", "submitted_at", "finished_at", "updated_at",
}
updates = []

View File

@ -108,6 +108,49 @@ def _position_notional(position: dict) -> float:
return abs(_safe_float(position.get("notional") or info.get("notional")))
def _order_identifier(order: dict | None) -> dict:
data = order if isinstance(order, dict) else {}
return {
"id": str(data.get("id") or data.get("orderId") or data.get("algoId") or ""),
"algo_id": str(data.get("algoId") or data.get("id") or ""),
"client_algo_id": str(data.get("clientAlgoId") or data.get("clientOrderId") or ""),
}
def _protection_orders(intent: dict) -> dict:
response = intent.get("response") if isinstance(intent.get("response"), dict) else {}
return {
"stop_loss_order": response.get("stop_loss_order") if isinstance(response.get("stop_loss_order"), dict) else {},
"take_profit_order": response.get("take_profit_order") if isinstance(response.get("take_profit_order"), dict) else {},
"market_order": response.get("market_order") if isinstance(response.get("market_order"), dict) else {},
"live_protection": response.get("live_protection") if isinstance(response.get("live_protection"), dict) else {},
"raw": response,
}
def _cancel_protection_orders(client, intent: dict, keys: tuple[str, ...] = ("stop_loss_order", "take_profit_order")) -> list[dict]:
canceled = []
orders = _protection_orders(intent)
for key in keys:
ident = _order_identifier(orders.get(key))
try:
if ident["algo_id"] and hasattr(client, "cancel_algo_order"):
canceled.append({"key": key, "result": client.cancel_algo_order(algo_id=ident["algo_id"])})
elif ident["client_algo_id"] and hasattr(client, "cancel_algo_order"):
canceled.append({"key": key, "result": client.cancel_algo_order(client_algo_id=ident["client_algo_id"])})
elif ident["id"]:
canceled.append({"key": key, "result": client.cancel_order(ident["id"], intent.get("symbol"))})
except Exception as exc:
canceled.append({"key": key, "error": str(exc)})
return canceled
def _merge_intent_response(intent: dict, patch: dict) -> dict:
response = intent.get("response") if isinstance(intent.get("response"), dict) else {}
merged = {**response, **(patch or {})}
return update_live_order_intent(intent["id"], response_json=merged, updated_at=_now())
def _check_live_cumulative_leverage(client, account: dict, additional_notional: float) -> tuple[bool, dict]:
risk = _risk_for_account(account)
cap = _safe_float(risk.get("max_cumulative_leverage"), 0)
@ -282,12 +325,178 @@ def sync_open_paper_trades_to_live(*, limit: int = 20, execute: bool = True, cli
}
def _submitted_live_intents(limit: int = 100) -> list[dict]:
conn = get_conn()
try:
rows = conn.execute(
"""
SELECT loi.*, pt.status AS paper_status, pt.trailing_stop AS paper_trailing_stop,
pt.stop_loss AS paper_stop_loss, pt.tp1 AS paper_tp1, pt.current_price AS paper_current_price,
pt.closed_at AS paper_closed_at, pt.exit_reason AS paper_exit_reason
FROM live_order_intents loi
LEFT JOIN paper_trades pt ON pt.id=loi.paper_trade_id
WHERE loi.source_type='paper_trade_sync'
AND loi.status='submitted'
ORDER BY loi.updated_at ASC, loi.id ASC
LIMIT %s
""",
(max(1, min(_safe_int(limit, 100), 300)),),
).fetchall()
finally:
conn.close()
return [_row(r) for r in rows]
def _paper_canceled_live_order_intents(limit: int = 100) -> list[dict]:
conn = get_conn()
try:
rows = conn.execute(
"""
SELECT loi.*, po.status AS paper_order_status, po.cancel_reason AS paper_order_cancel_reason
FROM live_order_intents loi
JOIN paper_orders po ON po.id=loi.paper_order_id
WHERE loi.paper_order_id > 0
AND loi.status IN ('prepared','submitted','submitting')
AND po.status IN ('canceled','expired','rejected')
ORDER BY loi.updated_at ASC, loi.id ASC
LIMIT %s
""",
(max(1, min(_safe_int(limit, 100), 300)),),
).fetchall()
finally:
conn.close()
return [_row(r) for r in rows]
def _sync_live_stop_to_paper(intent: dict, account: dict, *, client=None) -> dict:
paper_stop = _safe_float(intent.get("paper_trailing_stop") or intent.get("paper_stop_loss") or intent.get("stop_loss"))
current_stop = _safe_float(intent.get("stop_loss"))
if paper_stop <= 0:
return {"ok": True, "action": "hold", "reason": "missing_paper_stop"}
side = str(intent.get("side") or "long").lower()
tighter = paper_stop < current_stop - 1e-12 if side == "short" else paper_stop > current_stop + 1e-12
if current_stop > 0 and not tighter:
return {"ok": True, "action": "hold", "reason": "stop_not_tighter", "paper_stop": paper_stop, "current_stop": current_stop}
client = client or build_binance_client(account, require_testnet=True)
client.load_markets()
_, close_side = _side_to_exchange(side)
amount = _safe_float(intent.get("quantity"))
if amount <= 0:
return {"ok": False, "action": "error", "reason": "missing_live_quantity"}
canceled = _cancel_protection_orders(client, intent, keys=("stop_loss_order",))
new_stop = client.create_stop_loss_order(intent["symbol"], close_side, amount, paper_stop)
updated = _merge_intent_response(intent, {
"stop_loss_order": new_stop,
"live_protection": {
**(_protection_orders(intent).get("live_protection") or {}),
"stop_loss": paper_stop,
"synced_at": _now(),
"source": "paper_trailing_stop" if _safe_float(intent.get("paper_trailing_stop")) > 0 else "paper_stop_loss",
},
})
update_live_order_intent(intent["id"], stop_loss=paper_stop, updated_at=_now())
record_live_order_event(intent["id"], "live_protection_stop_replace", "submitted", "replace_live_stop_with_paper_protection", {
"previous_stop": current_stop,
"new_stop": paper_stop,
"canceled": canceled,
"new_stop_order": new_stop,
"updated_intent": updated.get("id"),
})
return {"ok": True, "action": "replace_stop", "previous_stop": current_stop, "new_stop": paper_stop}
def _sync_live_close_from_paper(intent: dict, account: dict, *, client=None) -> dict:
client = client or build_binance_client(account, require_testnet=True)
client.load_markets()
_, close_side = _side_to_exchange(intent.get("side"))
amount = _safe_float(intent.get("quantity"))
if amount <= 0:
return {"ok": False, "action": "error", "reason": "missing_live_quantity"}
canceled = _cancel_protection_orders(client, intent)
close_order = client.create_market_order(intent["symbol"], close_side, amount, {
"reduceOnly": True,
"newClientOrderId": f"alphax_close_{intent['id']}_{int(datetime.now().timestamp())}",
})
finished_at = _now()
_merge_intent_response(intent, {
"live_close_order": close_order,
"live_close_reason": intent.get("paper_exit_reason") or "paper_trade_closed",
"protection_canceled_on_close": canceled,
})
update_live_order_intent(
intent["id"],
status="closed",
reason=f"paper_trade_closed:{intent.get('paper_exit_reason') or ''}",
finished_at=finished_at,
updated_at=finished_at,
)
record_live_order_event(intent["id"], "live_sync_close", "closed", "paper_trade_closed_reduce_only_live_close", {
"paper_trade_id": intent.get("paper_trade_id"),
"paper_exit_reason": intent.get("paper_exit_reason"),
"close_order": close_order,
"canceled": canceled,
})
return {"ok": True, "action": "close", "close_order": close_order}
def sync_live_protection_from_paper(*, limit: int = 100, client_factory=None) -> dict:
intents = _submitted_live_intents(limit=limit)
results = []
for intent in intents:
account = get_live_account(intent.get("account_id"))
if not account or account.get("status") != "enabled":
continue
client = client_factory(account) if client_factory else None
try:
if intent.get("paper_status") == "closed":
result = _sync_live_close_from_paper(intent, account, client=client)
elif intent.get("paper_status") == "open":
result = _sync_live_stop_to_paper(intent, account, client=client)
else:
result = {"ok": True, "action": "hold", "reason": f"paper_status_{intent.get('paper_status')}"}
except Exception as exc:
record_live_order_event(intent["id"], "live_protection_error", "error", str(exc), {
"paper_trade_id": intent.get("paper_trade_id"),
"paper_status": intent.get("paper_status"),
})
result = {"ok": False, "action": "error", "reason": str(exc)}
results.append({"intent_id": intent.get("id"), "paper_trade_id": intent.get("paper_trade_id"), **result})
canceled_results = []
for intent in _paper_canceled_live_order_intents(limit=limit):
account = get_live_account(intent.get("account_id"))
client = client_factory(account) if client_factory and account else None
try:
if client is None and account:
client = build_binance_client(account, require_testnet=True)
if client:
client.load_markets()
exchange_order_id = str(intent.get("exchange_order_id") or "")
cancel_result = client.cancel_order(exchange_order_id, intent.get("symbol")) if exchange_order_id else {"skipped": True, "reason": "missing_exchange_order_id"}
else:
cancel_result = {"skipped": True, "reason": "missing_account"}
update_live_order_intent(intent["id"], status="canceled", reason=f"paper_order_{intent.get('paper_order_status')}", updated_at=_now(), finished_at=_now())
record_live_order_event(intent["id"], "live_order_cancel", "canceled", "paper_order_canceled_or_expired", {
"paper_order_id": intent.get("paper_order_id"),
"paper_order_status": intent.get("paper_order_status"),
"paper_order_cancel_reason": intent.get("paper_order_cancel_reason"),
"cancel_result": cancel_result,
})
canceled_results.append({"intent_id": intent.get("id"), "ok": True, "action": "cancel_entry"})
except Exception as exc:
record_live_order_event(intent["id"], "live_order_cancel_error", "error", str(exc), {"paper_order_id": intent.get("paper_order_id")})
canceled_results.append({"intent_id": intent.get("id"), "ok": False, "action": "error", "reason": str(exc)})
ok = all(x.get("ok", True) for x in [*results, *canceled_results])
return {"ok": ok, "processed_count": len(results), "canceled_entry_count": len(canceled_results), "results": results, "canceled_entries": canceled_results}
def run_live_trading_sync(
*,
limit: int = 20,
execute: bool = True,
sync_snapshots: bool = True,
sync_paper: bool = True,
sync_protection: bool = True,
client_factory=None,
) -> dict:
"""Single scheduler entrypoint for live account snapshots and paper sync."""
@ -295,6 +504,7 @@ def run_live_trading_sync(
cfg = live_trading_config()
snapshot_result = {"skipped": True, "reason": "snapshot_sync_disabled"}
paper_result = {"skipped": True, "reason": "paper_sync_disabled"}
protection_result = {"skipped": True, "reason": "live_protection_sync_disabled"}
if sync_snapshots:
snapshot_result = sync_live_account_snapshots(client_factory=client_factory)
if sync_paper:
@ -306,10 +516,13 @@ def run_live_trading_sync(
execute=execute,
client_factory=client_factory,
)
if sync_protection:
protection_result = sync_live_protection_from_paper(limit=limit, client_factory=client_factory)
return {
"ok": bool(snapshot_result.get("ok", True)) and bool(paper_result.get("ok", True)),
"ok": bool(snapshot_result.get("ok", True)) and bool(paper_result.get("ok", True)) and bool(protection_result.get("ok", True)),
"enabled": bool(cfg.get("enabled")),
"execute": execute,
"snapshots": snapshot_result,
"paper_sync": paper_result,
"protection_sync": protection_result,
}

View File

@ -7,6 +7,7 @@ from app.db.live_trading import (
get_live_account,
get_live_account_snapshot,
list_live_accounts,
list_live_order_events,
list_live_order_intents,
upsert_live_account,
)
@ -15,7 +16,7 @@ from app.integrations.binance_live import build_binance_client
from app.services.live_trading_account import get_live_account_overview
from app.services import live_trading_account
from app.services.live_trading_smoke import run_binance_testnet_smoke
from app.services.live_trading_sync import run_live_trading_sync, sync_paper_trade_to_live
from app.services.live_trading_sync import run_live_trading_sync, sync_live_protection_from_paper, sync_paper_trade_to_live
from app.web import web_server
from app.db.schema import get_conn
@ -472,6 +473,10 @@ class _FakeBinanceClient:
self.calls.append(("cancel_order", order_id, symbol))
return {"id": order_id, "status": "canceled"}
def cancel_algo_order(self, *, algo_id=None, client_algo_id=None):
self.calls.append(("cancel_algo_order", algo_id, client_algo_id))
return {"algoId": algo_id, "clientAlgoId": client_algo_id, "status": "canceled"}
def create_stop_loss_order(self, symbol, side, amount, stop_price, params=None):
return self._order("stop_loss", symbol, side, amount, stop_price, params or {})
@ -577,6 +582,88 @@ def test_live_trading_sync_job_refreshes_snapshots_and_syncs_open_paper_trade():
assert intents[0]["status"] == "submitted"
def test_live_protection_sync_replaces_live_stop_when_paper_trailing_moves():
set_config("system", "live_trading", {
"enabled": True,
"execution_mode": "exchange_api",
"require_human_approval": False,
"exchange": "binance",
"market_type": "um_futures",
"testnet": True,
"risk": {"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "allowed_symbols": []},
}, source="test")
account = upsert_live_account(
account_code="binance_protection_move",
status="enabled",
risk_config={"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "max_cumulative_leverage": 5, "allowed_symbols": []},
)
trade = _insert_paper_trade(symbol="BTC/USDT")
fake = _FakeBinanceClient()
fake.fetch_balance = lambda: {"total": {"USDT": 1000}}
sync_paper_trade_to_live(trade["id"], account_ids=[account["id"]], execute=True, client_factory=lambda acct: fake)
conn = get_conn()
try:
conn.execute("UPDATE paper_trades SET trailing_stop=0.105, updated_at='2026-05-22T00:10:00' WHERE id=%s", (trade["id"],))
conn.commit()
finally:
conn.close()
result = sync_live_protection_from_paper(client_factory=lambda acct: fake)
intent = list_live_order_intents(account_id=account["id"])["items"][0]
events = list_live_order_events(limit=20)["items"]
call_names = [c[0] for c in fake.calls]
assert result["ok"] is True
assert result["results"][0]["action"] == "replace_stop"
assert intent["stop_loss"] == 0.105
assert "cancel_algo_order" in call_names
assert call_names.count("stop_loss") == 2
assert any(e["event_type"] == "live_protection_stop_replace" for e in events)
def test_live_protection_sync_closes_live_position_when_paper_trade_closed():
set_config("system", "live_trading", {
"enabled": True,
"execution_mode": "exchange_api",
"require_human_approval": False,
"exchange": "binance",
"market_type": "um_futures",
"testnet": True,
"risk": {"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "allowed_symbols": []},
}, source="test")
account = upsert_live_account(
account_code="binance_protection_close",
status="enabled",
risk_config={"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "max_cumulative_leverage": 5, "allowed_symbols": []},
)
trade = _insert_paper_trade(symbol="BTC/USDT")
fake = _FakeBinanceClient()
fake.fetch_balance = lambda: {"total": {"USDT": 1000}}
sync_paper_trade_to_live(trade["id"], account_ids=[account["id"]], execute=True, client_factory=lambda acct: fake)
conn = get_conn()
try:
conn.execute(
"UPDATE paper_trades SET status='closed', exit_reason='trailing_stop', closed_at='2026-05-22T00:20:00', updated_at='2026-05-22T00:20:00' WHERE id=%s",
(trade["id"],),
)
conn.commit()
finally:
conn.close()
result = sync_live_protection_from_paper(client_factory=lambda acct: fake)
intent = list_live_order_intents(account_id=account["id"])["items"][0]
events = list_live_order_events(limit=20)["items"]
market_orders = [c for c in fake.calls if c[0] == "market_order"]
assert result["ok"] is True
assert result["results"][0]["action"] == "close"
assert intent["status"] == "closed"
assert market_orders[-1][-1]["reduceOnly"] is True
assert any(e["event_type"] == "live_sync_close" for e in events)
def test_binance_testnet_smoke_covers_market_limit_cancel_tp_sl_interfaces():
account = upsert_live_account(
account_code="binance_testnet",