From 1e4d2e39d90f9ba1c60e2aef8d8240115365a0e4 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Mon, 8 Jun 2026 23:24:51 +0800 Subject: [PATCH] 1 --- app/services/price_streamer.py | 30 +++++++++++++++++++++++------- tests/test_price_streamer.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/app/services/price_streamer.py b/app/services/price_streamer.py index 4b845b8..25da145 100644 --- a/app/services/price_streamer.py +++ b/app/services/price_streamer.py @@ -91,6 +91,7 @@ def _load_pending_paper_order_recs() -> list[dict]: SELECT po.recommendation_id AS id, po.symbol, + po.side, po.target_price AS entry_price, po.stop_loss, po.tp1, @@ -98,6 +99,10 @@ def _load_pending_paper_order_recs() -> list[dict]: po.source_status AS execution_status, po.source_action AS action_status, po.strategy_version, + po.strategy_code, + po.strategy_signal_id, + po.strategy_snapshot_json, + po.factor_roles_json, po.entry_plan_snapshot_json AS entry_plan_json FROM paper_orders po WHERE po.status='pending' @@ -151,24 +156,35 @@ def load_stream_targets(limit: int | None = None, cfg: dict | None = None) -> di max_symbols = max(1, _safe_int(limit or cfg.get("max_stream_symbols"), 200)) targets: dict[str, dict] = {} - if cfg.get("include_actionable_recommendations", True): - for rec in get_active_recommendations_deduped(actionable_only=True, limit=max_symbols, with_meta=False): - symbol = str(rec.get("symbol") or "").strip().upper() - if symbol: - targets[symbol] = dict(rec) - + # Execution state must never be crowded out by a large watch/recommendation + # set. Pending orders and open positions drive fills, TP/SL, trailing stops, + # and live protection sync; recommendations only use the remaining capacity. if cfg.get("include_open_paper_trades", True): for rec in _load_pending_paper_order_recs(): symbol = str(rec.get("symbol") or "").strip().upper() if symbol: targets[symbol] = rec + if len(targets) >= max_symbols: + return targets for rec in _load_open_paper_trade_recs(): symbol = str(rec.get("symbol") or "").strip().upper() if symbol: targets[symbol] = rec + if len(targets) >= max_symbols: + return targets - return dict(list(targets.items())[:max_symbols]) + if cfg.get("include_actionable_recommendations", True): + remaining = max(0, max_symbols - len(targets)) + if remaining > 0: + for rec in get_active_recommendations_deduped(actionable_only=True, limit=remaining, with_meta=False): + symbol = str(rec.get("symbol") or "").strip().upper() + if symbol and symbol not in targets: + targets[symbol] = dict(rec) + if len(targets) >= max_symbols: + break + + return targets def handle_price_tick(symbol: str, price: float, targets: dict[str, dict], event_time: str | None = None, cfg: dict | None = None) -> dict: diff --git a/tests/test_price_streamer.py b/tests/test_price_streamer.py index fa0f35e..b111031 100644 --- a/tests/test_price_streamer.py +++ b/tests/test_price_streamer.py @@ -268,6 +268,38 @@ def test_price_streamer_prioritizes_pending_order_over_same_symbol_recommendatio assert list_paper_trades()["items"][0]["recommendation_id"] == pending_id +def test_price_streamer_subscription_prioritizes_execution_symbols(monkeypatch): + monkeypatch.setattr( + price_streamer, + "_load_pending_paper_order_recs", + lambda: [{"id": 1, "symbol": "PUMP/USDT", "side": "long", "entry_price": 0.001636}], + ) + monkeypatch.setattr( + price_streamer, + "_load_open_paper_trade_recs", + lambda: [{"id": 2, "symbol": "WLD/USDT", "side": "long", "entry_price": 0.5}], + ) + monkeypatch.setattr( + price_streamer, + "get_active_recommendations_deduped", + lambda **kwargs: [ + {"id": 3, "symbol": "AAA/USDT"}, + {"id": 4, "symbol": "BBB/USDT"}, + {"id": 5, "symbol": "CCC/USDT"}, + ], + ) + + targets = price_streamer.load_stream_targets( + limit=2, + cfg={ + "include_actionable_recommendations": True, + "include_open_paper_trades": True, + }, + ) + + assert list(targets.keys()) == ["PUMP/USDT", "WLD/USDT"] + + def test_price_streamer_builds_binance_combined_stream_url(): url = price_streamer._stream_url(["BTC/USDT", "ETH/USDT"], {"stream_url": "wss://example.test/stream"})