diff --git a/app/core/global_risk.py b/app/core/global_risk.py index 9aa0b38..fac5f0e 100644 --- a/app/core/global_risk.py +++ b/app/core/global_risk.py @@ -85,9 +85,20 @@ def _directional_market_gate(regime: dict, side: str, base_risk_level: str, base } -def _portfolio_snapshot(conn, account_equity: float, additional_notional: float) -> dict: +def _portfolio_snapshot(conn, account_equity: float, additional_notional: float, exclude_order_id: int = 0) -> dict: open_rows = conn.execute("SELECT notional_usdt, pnl_pct FROM paper_trades WHERE status='open'").fetchall() - pending_notional = _safe_float(conn.execute("SELECT COALESCE(SUM(notional_usdt),0) FROM paper_orders WHERE status='pending'").fetchone()[0]) + exclude_order_id = _safe_int(exclude_order_id) + pending_where = "status='pending'" + pending_params = [] + if exclude_order_id > 0: + pending_where += " AND id<>%s" + pending_params.append(exclude_order_id) + pending_notional = _safe_float( + conn.execute( + f"SELECT COALESCE(SUM(notional_usdt),0) FROM paper_orders WHERE {pending_where}", + tuple(pending_params), + ).fetchone()[0] + ) open_notional = 0.0 unrealized = 0.0 for row in open_rows: @@ -173,7 +184,8 @@ def evaluate_global_risk( overview = get_crypto_market_overview(allow_live_fallback=False) regime = classify_market_regime(overview) account_equity = max(1.0, _safe_float(cfg.get("account_equity_usdt"), 20000.0)) - portfolio = _portfolio_snapshot(conn, account_equity, additional_notional) + exclude_order_id = _safe_int((rec or {}).get("exclude_order_id") or (rec or {}).get("paper_order_id")) + portfolio = _portfolio_snapshot(conn, account_equity, additional_notional, exclude_order_id=exclude_order_id) concentration = _concentration_snapshot(conn, rec) side = _side_from_rec(rec) rec_score = _safe_float((rec or {}).get("rec_score") or (rec or {}).get("score")) diff --git a/app/db/paper_trading.py b/app/db/paper_trading.py index 7bbec0c..a4dfb9b 100644 --- a/app/db/paper_trading.py +++ b/app/db/paper_trading.py @@ -314,6 +314,40 @@ def _strategy_lineage_from_rec(rec: dict) -> dict: } +def _rec_with_order_snapshot(rec: dict, order: dict, fill_price: float | None = None) -> dict: + """Use the executable order snapshot as the trade-time source of truth.""" + merged = dict(rec or {}) + plan = _entry_plan(merged) + order_plan = _loads_json((order or {}).get("entry_plan_snapshot_json"), {}) + if isinstance(order_plan, dict) and order_plan: + plan.update(order_plan) + side = normalize_side((order or {}).get("side") or plan.get("side") or merged.get("side")) + entry_price = _safe_float(fill_price if fill_price is not None else (order or {}).get("target_price")) + plan.update({ + "side": side, + "entry_price": entry_price or _safe_float(plan.get("entry_price") or merged.get("entry_price")), + "stop_loss": _safe_float((order or {}).get("stop_loss") or plan.get("stop_loss") or merged.get("stop_loss")), + "tp1": _safe_float((order or {}).get("tp1") or plan.get("tp1") or plan.get("take_profit_1") or merged.get("tp1")), + "tp2": _safe_float((order or {}).get("tp2") or plan.get("tp2") or plan.get("take_profit_2") or merged.get("tp2")), + }) + merged.update({ + "side": side, + "entry_plan": plan, + "entry_price": plan["entry_price"], + "stop_loss": plan["stop_loss"], + "tp1": plan["tp1"], + "tp2": plan["tp2"], + "strategy_version": (order or {}).get("strategy_version") or merged.get("strategy_version"), + "strategy_code": (order or {}).get("strategy_code") or merged.get("strategy_code"), + "strategy_signal_id": (order or {}).get("strategy_signal_id") or merged.get("strategy_signal_id"), + "strategy_snapshot_json": (order or {}).get("strategy_snapshot_json") or merged.get("strategy_snapshot_json"), + "factor_roles_json": (order or {}).get("factor_roles_json") or merged.get("factor_roles_json"), + "paper_order_id": (order or {}).get("id"), + "exclude_order_id": (order or {}).get("id"), + }) + return merged + + def _strategy_lineage_from_trade_or_order(item: dict) -> dict: code = normalize_strategy_code(item.get("strategy_code")) snapshot = _loads_json(item.get("strategy_snapshot_json"), {}) @@ -1043,10 +1077,11 @@ def _order_payload_from_rec(rec: dict, current_price: float, event_time: str, co def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict: fill_price = _safe_float(order.get("target_price")) or current_price - cfg = _paper_cfg_for_rec(rec, config) - stop_loss = _safe_float(order.get("stop_loss") or _entry_plan(rec).get("stop_loss") or rec.get("stop_loss")) + trade_rec = _rec_with_order_snapshot(rec, order, fill_price) + cfg = _paper_cfg_for_rec(trade_rec, config) + stop_loss = _safe_float(order.get("stop_loss") or _entry_plan(trade_rec).get("stop_loss") or trade_rec.get("stop_loss")) base_notional = _safe_float(order.get("notional_usdt"), default_notional_usdt(cfg)) - global_ok, global_detail = _global_risk_entry_check(conn, rec, base_notional, cfg) + global_ok, global_detail = _global_risk_entry_check(conn, trade_rec, base_notional, cfg) if not global_ok: # 触价后的限价单已经完成“等待成交”阶段。若此刻风控不允许开仓, # 这张挂单必须结束,不能继续 pending 等待下一次风控放行,否则会在 @@ -1062,9 +1097,8 @@ def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_ pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, adjusted_notional, event_time, cfg) if not pause_ok: return _cancel_paper_order(conn, order, pause_reason, event_time) - trade_rec = dict(rec) plan = _entry_plan(trade_rec) - plan.setdefault("entry_price", fill_price) + plan["entry_price"] = fill_price if adjusted_notional != base_notional: plan["market_position_sizing"] = { "base_notional_usdt": base_notional, @@ -1634,12 +1668,13 @@ def sync_pending_paper_orders(limit: int = 100, event_time: str = "", config: di "stop_loss": item.get("rec_stop_loss") or item.get("stop_loss"), "tp1": item.get("rec_tp1") or item.get("tp1"), "tp2": item.get("rec_tp2") or item.get("tp2"), - "strategy_version": item.get("rec_strategy_version") or item.get("strategy_version"), - "strategy_code": item.get("rec_strategy_code") or item.get("strategy_code"), - "strategy_signal_id": item.get("rec_strategy_signal_id") or item.get("strategy_signal_id"), - "strategy_snapshot_json": item.get("rec_strategy_snapshot_json") or item.get("strategy_snapshot_json"), - "factor_roles_json": item.get("rec_factor_roles_json") or item.get("factor_roles_json"), - "entry_plan_json": item.get("entry_plan_json") or item.get("entry_plan_snapshot_json"), + "side": item.get("side"), + "strategy_version": item.get("strategy_version") or item.get("rec_strategy_version"), + "strategy_code": item.get("strategy_code") or item.get("rec_strategy_code"), + "strategy_signal_id": item.get("strategy_signal_id") or item.get("rec_strategy_signal_id"), + "strategy_snapshot_json": item.get("strategy_snapshot_json") or item.get("rec_strategy_snapshot_json"), + "factor_roles_json": item.get("factor_roles_json") or item.get("rec_factor_roles_json"), + "entry_plan_json": item.get("entry_plan_snapshot_json") or item.get("entry_plan_json"), "market_context_json": item.get("market_context_json"), "derivatives_context_json": item.get("derivatives_context_json"), "sector_context_json": item.get("sector_context_json"), diff --git a/tests/test_paper_trading.py b/tests/test_paper_trading.py index e7411ab..b43202a 100644 --- a/tests/test_paper_trading.py +++ b/tests/test_paper_trading.py @@ -864,6 +864,71 @@ def test_touched_wait_pullback_order_cancels_when_global_risk_pauses(monkeypatch assert canceled["cancel_reason"] == "risk_paused_at_touch" +def test_touched_order_uses_order_snapshot_side_for_global_risk(monkeypatch, pg_conn): + monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1") + monkeypatch.setenv("ALPHAX_PAPER_GLOBAL_RISK_GATE_ENABLED", "1") + captured = [] + + def fake_global_risk(**kwargs): + captured.append(dict(kwargs.get("rec") or {})) + return { + "allow_new_entries": True, + "decision": "allow", + "risk_level": "medium", + "position_multiplier": 1, + "reasons": ["测试允许成交"], + } + + monkeypatch.setattr("app.db.paper_trading.evaluate_global_risk", fake_global_risk) + altcoin_db.init_db() + pg_conn.execute( + """ + INSERT INTO recommendation ( + id, symbol, rec_time, rec_state, rec_score, entry_price, + status, execution_status, action_status, display_bucket, entry_triggered, + stop_loss, tp1, tp2, strategy_code, entry_plan_json + ) VALUES ( + 301, 'SNAPSHORT/USDT', '2026-05-16T10:00:00', '蓄力', 88, 105, + 'active', 'wait_pullback', '等回踩', 'watch_pool', 0, + 95, 115, 122, 'volume_ignition_1h_v1', %s + ) + """, + (json.dumps({"side": "long", "entry_action": "等回踩", "entry_price": 105, "stop_loss": 95, "tp1": 115, "rr1": 2.0}, ensure_ascii=False),), + ) + pg_conn.execute( + """ + INSERT INTO paper_orders ( + recommendation_id, symbol, side, order_type, status, + source_status, source_action, target_price, current_price_at_create, + notional_usdt, stop_loss, tp1, tp2, strategy_version, strategy_code, + strategy_signal_id, strategy_snapshot_json, factor_roles_json, + entry_plan_snapshot_json, created_at, updated_at, expires_at + ) VALUES ( + 301, 'SNAPSHORT/USDT', 'short', 'limit', 'pending', + 'wait_pullback', '等反抽', 105, 100, + 5000, 110, 95, 90, 'v-test', 'breakdown_retest_short_1h_v1', + 0, %s, '{}', %s, '2026-05-16T10:00:00', '2026-05-16T10:00:00', '2026-05-17T10:00:00' + ) + """, + ( + json.dumps({"strategy_code": "breakdown_retest_short_1h_v1"}, ensure_ascii=False), + json.dumps({"side": "short", "entry_action": "等反抽", "entry_price": 105, "stop_loss": 110, "tp1": 95, "rr1": 2.0}, ensure_ascii=False), + ), + ) + pg_conn.commit() + altcoin_db.update_latest_price_cache("SNAPSHORT/USDT", 106, updated_at="2026-05-16T10:05:00", source="test") + + result = sync_pending_paper_orders(event_time="2026-05-16T10:05:00") + + assert result["filled_count"] == 1 + assert captured + assert all(x.get("side") == "short" for x in captured) + assert all(x.get("strategy_code") == "breakdown_retest_short_1h_v1" for x in captured) + trade = list_paper_trades()["items"][0] + assert trade["side"] == "short" + assert trade["strategy_code"] == "breakdown_retest_short_1h_v1" + + def test_wait_pullback_order_cancels_when_recommendation_invalid(monkeypatch): monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1") altcoin_db.init_db()