This commit is contained in:
aaron 2026-06-02 23:50:41 +08:00
parent 8d8b5c6f15
commit 48e27bacf3
3 changed files with 126 additions and 14 deletions

View File

@ -85,9 +85,20 @@ def _directional_market_gate(regime: dict, side: str, base_risk_level: str, base
}
def _portfolio_snapshot(conn, account_equity: float, additional_notional: float) -> dict:
def _portfolio_snapshot(conn, account_equity: float, additional_notional: float, exclude_order_id: int = 0) -> dict:
open_rows = conn.execute("SELECT notional_usdt, pnl_pct FROM paper_trades WHERE status='open'").fetchall()
pending_notional = _safe_float(conn.execute("SELECT COALESCE(SUM(notional_usdt),0) FROM paper_orders WHERE status='pending'").fetchone()[0])
exclude_order_id = _safe_int(exclude_order_id)
pending_where = "status='pending'"
pending_params = []
if exclude_order_id > 0:
pending_where += " AND id<>%s"
pending_params.append(exclude_order_id)
pending_notional = _safe_float(
conn.execute(
f"SELECT COALESCE(SUM(notional_usdt),0) FROM paper_orders WHERE {pending_where}",
tuple(pending_params),
).fetchone()[0]
)
open_notional = 0.0
unrealized = 0.0
for row in open_rows:
@ -173,7 +184,8 @@ def evaluate_global_risk(
overview = get_crypto_market_overview(allow_live_fallback=False)
regime = classify_market_regime(overview)
account_equity = max(1.0, _safe_float(cfg.get("account_equity_usdt"), 20000.0))
portfolio = _portfolio_snapshot(conn, account_equity, additional_notional)
exclude_order_id = _safe_int((rec or {}).get("exclude_order_id") or (rec or {}).get("paper_order_id"))
portfolio = _portfolio_snapshot(conn, account_equity, additional_notional, exclude_order_id=exclude_order_id)
concentration = _concentration_snapshot(conn, rec)
side = _side_from_rec(rec)
rec_score = _safe_float((rec or {}).get("rec_score") or (rec or {}).get("score"))

View File

@ -314,6 +314,40 @@ def _strategy_lineage_from_rec(rec: dict) -> dict:
}
def _rec_with_order_snapshot(rec: dict, order: dict, fill_price: float | None = None) -> dict:
"""Use the executable order snapshot as the trade-time source of truth."""
merged = dict(rec or {})
plan = _entry_plan(merged)
order_plan = _loads_json((order or {}).get("entry_plan_snapshot_json"), {})
if isinstance(order_plan, dict) and order_plan:
plan.update(order_plan)
side = normalize_side((order or {}).get("side") or plan.get("side") or merged.get("side"))
entry_price = _safe_float(fill_price if fill_price is not None else (order or {}).get("target_price"))
plan.update({
"side": side,
"entry_price": entry_price or _safe_float(plan.get("entry_price") or merged.get("entry_price")),
"stop_loss": _safe_float((order or {}).get("stop_loss") or plan.get("stop_loss") or merged.get("stop_loss")),
"tp1": _safe_float((order or {}).get("tp1") or plan.get("tp1") or plan.get("take_profit_1") or merged.get("tp1")),
"tp2": _safe_float((order or {}).get("tp2") or plan.get("tp2") or plan.get("take_profit_2") or merged.get("tp2")),
})
merged.update({
"side": side,
"entry_plan": plan,
"entry_price": plan["entry_price"],
"stop_loss": plan["stop_loss"],
"tp1": plan["tp1"],
"tp2": plan["tp2"],
"strategy_version": (order or {}).get("strategy_version") or merged.get("strategy_version"),
"strategy_code": (order or {}).get("strategy_code") or merged.get("strategy_code"),
"strategy_signal_id": (order or {}).get("strategy_signal_id") or merged.get("strategy_signal_id"),
"strategy_snapshot_json": (order or {}).get("strategy_snapshot_json") or merged.get("strategy_snapshot_json"),
"factor_roles_json": (order or {}).get("factor_roles_json") or merged.get("factor_roles_json"),
"paper_order_id": (order or {}).get("id"),
"exclude_order_id": (order or {}).get("id"),
})
return merged
def _strategy_lineage_from_trade_or_order(item: dict) -> dict:
code = normalize_strategy_code(item.get("strategy_code"))
snapshot = _loads_json(item.get("strategy_snapshot_json"), {})
@ -1043,10 +1077,11 @@ def _order_payload_from_rec(rec: dict, current_price: float, event_time: str, co
def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
fill_price = _safe_float(order.get("target_price")) or current_price
cfg = _paper_cfg_for_rec(rec, config)
stop_loss = _safe_float(order.get("stop_loss") or _entry_plan(rec).get("stop_loss") or rec.get("stop_loss"))
trade_rec = _rec_with_order_snapshot(rec, order, fill_price)
cfg = _paper_cfg_for_rec(trade_rec, config)
stop_loss = _safe_float(order.get("stop_loss") or _entry_plan(trade_rec).get("stop_loss") or trade_rec.get("stop_loss"))
base_notional = _safe_float(order.get("notional_usdt"), default_notional_usdt(cfg))
global_ok, global_detail = _global_risk_entry_check(conn, rec, base_notional, cfg)
global_ok, global_detail = _global_risk_entry_check(conn, trade_rec, base_notional, cfg)
if not global_ok:
# 触价后的限价单已经完成“等待成交”阶段。若此刻风控不允许开仓,
# 这张挂单必须结束,不能继续 pending 等待下一次风控放行,否则会在
@ -1062,9 +1097,8 @@ def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_
pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, adjusted_notional, event_time, cfg)
if not pause_ok:
return _cancel_paper_order(conn, order, pause_reason, event_time)
trade_rec = dict(rec)
plan = _entry_plan(trade_rec)
plan.setdefault("entry_price", fill_price)
plan["entry_price"] = fill_price
if adjusted_notional != base_notional:
plan["market_position_sizing"] = {
"base_notional_usdt": base_notional,
@ -1634,12 +1668,13 @@ def sync_pending_paper_orders(limit: int = 100, event_time: str = "", config: di
"stop_loss": item.get("rec_stop_loss") or item.get("stop_loss"),
"tp1": item.get("rec_tp1") or item.get("tp1"),
"tp2": item.get("rec_tp2") or item.get("tp2"),
"strategy_version": item.get("rec_strategy_version") or item.get("strategy_version"),
"strategy_code": item.get("rec_strategy_code") or item.get("strategy_code"),
"strategy_signal_id": item.get("rec_strategy_signal_id") or item.get("strategy_signal_id"),
"strategy_snapshot_json": item.get("rec_strategy_snapshot_json") or item.get("strategy_snapshot_json"),
"factor_roles_json": item.get("rec_factor_roles_json") or item.get("factor_roles_json"),
"entry_plan_json": item.get("entry_plan_json") or item.get("entry_plan_snapshot_json"),
"side": item.get("side"),
"strategy_version": item.get("strategy_version") or item.get("rec_strategy_version"),
"strategy_code": item.get("strategy_code") or item.get("rec_strategy_code"),
"strategy_signal_id": item.get("strategy_signal_id") or item.get("rec_strategy_signal_id"),
"strategy_snapshot_json": item.get("strategy_snapshot_json") or item.get("rec_strategy_snapshot_json"),
"factor_roles_json": item.get("factor_roles_json") or item.get("rec_factor_roles_json"),
"entry_plan_json": item.get("entry_plan_snapshot_json") or item.get("entry_plan_json"),
"market_context_json": item.get("market_context_json"),
"derivatives_context_json": item.get("derivatives_context_json"),
"sector_context_json": item.get("sector_context_json"),

View File

@ -864,6 +864,71 @@ def test_touched_wait_pullback_order_cancels_when_global_risk_pauses(monkeypatch
assert canceled["cancel_reason"] == "risk_paused_at_touch"
def test_touched_order_uses_order_snapshot_side_for_global_risk(monkeypatch, pg_conn):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
monkeypatch.setenv("ALPHAX_PAPER_GLOBAL_RISK_GATE_ENABLED", "1")
captured = []
def fake_global_risk(**kwargs):
captured.append(dict(kwargs.get("rec") or {}))
return {
"allow_new_entries": True,
"decision": "allow",
"risk_level": "medium",
"position_multiplier": 1,
"reasons": ["测试允许成交"],
}
monkeypatch.setattr("app.db.paper_trading.evaluate_global_risk", fake_global_risk)
altcoin_db.init_db()
pg_conn.execute(
"""
INSERT INTO recommendation (
id, symbol, rec_time, rec_state, rec_score, entry_price,
status, execution_status, action_status, display_bucket, entry_triggered,
stop_loss, tp1, tp2, strategy_code, entry_plan_json
) VALUES (
301, 'SNAPSHORT/USDT', '2026-05-16T10:00:00', '蓄力', 88, 105,
'active', 'wait_pullback', '等回踩', 'watch_pool', 0,
95, 115, 122, 'volume_ignition_1h_v1', %s
)
""",
(json.dumps({"side": "long", "entry_action": "等回踩", "entry_price": 105, "stop_loss": 95, "tp1": 115, "rr1": 2.0}, ensure_ascii=False),),
)
pg_conn.execute(
"""
INSERT INTO paper_orders (
recommendation_id, symbol, side, order_type, status,
source_status, source_action, target_price, current_price_at_create,
notional_usdt, stop_loss, tp1, tp2, strategy_version, strategy_code,
strategy_signal_id, strategy_snapshot_json, factor_roles_json,
entry_plan_snapshot_json, created_at, updated_at, expires_at
) VALUES (
301, 'SNAPSHORT/USDT', 'short', 'limit', 'pending',
'wait_pullback', '等反抽', 105, 100,
5000, 110, 95, 90, 'v-test', 'breakdown_retest_short_1h_v1',
0, %s, '{}', %s, '2026-05-16T10:00:00', '2026-05-16T10:00:00', '2026-05-17T10:00:00'
)
""",
(
json.dumps({"strategy_code": "breakdown_retest_short_1h_v1"}, ensure_ascii=False),
json.dumps({"side": "short", "entry_action": "等反抽", "entry_price": 105, "stop_loss": 110, "tp1": 95, "rr1": 2.0}, ensure_ascii=False),
),
)
pg_conn.commit()
altcoin_db.update_latest_price_cache("SNAPSHORT/USDT", 106, updated_at="2026-05-16T10:05:00", source="test")
result = sync_pending_paper_orders(event_time="2026-05-16T10:05:00")
assert result["filled_count"] == 1
assert captured
assert all(x.get("side") == "short" for x in captured)
assert all(x.get("strategy_code") == "breakdown_retest_short_1h_v1" for x in captured)
trade = list_paper_trades()["items"][0]
assert trade["side"] == "short"
assert trade["strategy_code"] == "breakdown_retest_short_1h_v1"
def test_wait_pullback_order_cancels_when_recommendation_invalid(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
altcoin_db.init_db()