From 42da844a80adbcd895601941c7bdba66abedebef Mon Sep 17 00:00:00 2001 From: aaron <> Date: Mon, 8 Jun 2026 09:54:44 +0800 Subject: [PATCH] 1 --- AGENTS.md | 4 ++-- app/db/scheduler_db.py | 2 +- app/services/price_streamer.py | 41 +++++++++++++++++++++++++++++++++- tests/test_price_streamer.py | 38 +++++++++++++++++++++++++++++++ 4 files changed, 81 insertions(+), 4 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 7c8cc48..8d1753a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -92,11 +92,11 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 4. `app/services/event_driven_screener.py` 负责事件/舆情驱动的快速触发检查,是多策略发现层的补充入口。 5. `app/services/price_streamer.py` - 负责实时价格缓存,不等同于完整推荐状态跟踪。 + 负责实时价格缓存和策略交易实时执行触发。挂单触价、已开仓 TP/SL、移动止盈、持仓保护这类依赖最新价的动作,应优先由 `price-streamer` 的 websocket tick 驱动;`paper-trader` 调度只能作为补偿任务,不能作为主要成交时钟。 6. `app/services/price_tracker.py` 负责可执行推荐的价格跟踪、状态迁移和动态风险提示。 7. `app/services/paper_trader.py` - 负责策略交易账本同步和 paper 执行适配。TP/SL、移动止盈、仓位健康、仓位 sizing、账户级风控等可复用交易能力不应长期绑定在 paper trading 层;新增能力优先沉到 `app/core/*` 或独立 execution/risk 模块,再由 paper/live 适配调用。 + 负责策略交易账本补偿同步和 paper 执行适配。它用于处理调度兜底、服务重启后的漏 tick 补账、从可执行推荐创建新持仓/挂单、以及同步 live protection;不要依赖它的 180 秒轮询来完成挂单成交。TP/SL、移动止盈、仓位健康、仓位 sizing、账户级风控等可复用交易能力不应长期绑定在 paper trading 层;新增能力优先沉到 `app/core/*` 或独立 execution/risk 模块,再由 paper/live 适配调用。 8. `app/db/live_trading.py` / `app/web/routes_live_trading.py` 负责实盘控制台:多交易所/多 API 账户配置、账号级风控、交易所接口验收和执行审计事件。页面不再使用“订单意图”作为产品概念,也不区分 Demo/正式环境,实际环境由 endpoint/API key 配置决定。 实盘控制台页面默认只读取 PostgreSQL 中的账户快照,不应在首屏加载时直接阻塞调用交易所 API。`live-trading-sync` 调度任务负责定时同步余额、持仓、挂单、订单历史到 `live_account_snapshots`,并按配置把策略交易 open 仓位同步到实盘账户;手动“立即同步”只是强制刷新同一份 DB 快照。 diff --git a/app/db/scheduler_db.py b/app/db/scheduler_db.py index a63d7b1..8977219 100644 --- a/app/db/scheduler_db.py +++ b/app/db/scheduler_db.py @@ -48,7 +48,7 @@ DEFAULT_JOBS = [ "every_seconds": 180, "initial_delay": 30, "lock_group": "paper_trading_write", - "description": "策略交易账本同步", + "description": "策略交易补偿同步(实时成交由 price-streamer 驱动)", "sort_order": 25, }, { diff --git a/app/services/price_streamer.py b/app/services/price_streamer.py index 0a67a3e..4b845b8 100644 --- a/app/services/price_streamer.py +++ b/app/services/price_streamer.py @@ -109,6 +109,42 @@ def _load_pending_paper_order_recs() -> list[dict]: conn.close() +def _load_pending_paper_order_rec(symbol: str) -> dict | None: + symbol = str(symbol or "").strip().upper() + if not symbol: + return None + conn = get_conn() + try: + row = conn.execute( + """ + SELECT + po.recommendation_id AS id, + po.symbol, + po.side, + po.target_price AS entry_price, + po.stop_loss, + po.tp1, + po.tp2, + po.source_status AS execution_status, + po.source_action AS action_status, + po.strategy_version, + po.strategy_code, + po.strategy_signal_id, + po.strategy_snapshot_json, + po.factor_roles_json, + po.entry_plan_snapshot_json AS entry_plan_json + FROM paper_orders po + WHERE po.status='pending' AND po.symbol=%s + ORDER BY po.created_at ASC, po.id ASC + LIMIT 1 + """, + (symbol,), + ).fetchone() + return dict(row) if row else None + finally: + conn.close() + + def load_stream_targets(limit: int | None = None, cfg: dict | None = None) -> dict[str, dict]: """Return symbol -> recommendation-like payload for websocket updates.""" cfg = cfg or price_streamer_config() @@ -146,7 +182,10 @@ def handle_price_tick(symbol: str, price: float, targets: dict[str, dict], event update_latest_price_cache(symbol, price, updated_at=event_time, source="price_streamer") if not cfg.get("sync_paper_trading", True): return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "disabled_by_streamer"}} - rec = targets.get(symbol) + # Pending limit orders are executable state. Prefer the order snapshot over + # any stale in-memory recommendation target so a websocket tick can fill the + # order immediately when the price touches. + rec = _load_pending_paper_order_rec(symbol) or targets.get(symbol) if not rec: return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "no_target"}} result = sync_recommendation(rec, price, event_time=event_time) diff --git a/tests/test_price_streamer.py b/tests/test_price_streamer.py index 939449d..fa0f35e 100644 --- a/tests/test_price_streamer.py +++ b/tests/test_price_streamer.py @@ -175,6 +175,44 @@ def test_price_streamer_fills_pending_paper_order(): assert list_paper_trades()["items"][0]["entry_price"] == pytest.approx(95) +def test_price_streamer_tick_fills_pending_order_even_when_targets_are_stale(): + set_config("system", "paper_trading", _paper_config()) + set_config("system", "price_streamer", { + "enabled": True, + "update_latest_price_cache": True, + "sync_paper_trading": True, + "include_actionable_recommendations": True, + "include_open_paper_trades": True, + }) + altcoin_db.init_db() + rec_id = altcoin_db.create_recommendation( + symbol="STALEPBO/USDT", + rec_state="蓄力", + rec_score=22, + entry_price=95, + stop_loss=90, + tp1=105, + signals=["等待回踩"], + entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "risk_reward_ok": True, "rr1": 2.0}, + ) + with altcoin_db.get_conn() as conn: + conn.execute( + "UPDATE recommendation SET execution_status='wait_pullback', action_status='等回踩', display_bucket='watch_pool' WHERE id=%s", + (rec_id,), + ) + conn.commit() + rec = {"id": rec_id, "symbol": "STALEPBO/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "risk_reward_ok": True, "rr1": 2.0}} + + created = price_streamer.handle_price_tick("STALEPBO/USDT", 100, {"STALEPBO/USDT": rec}, event_time="2026-05-16T10:00:00") + filled = price_streamer.handle_price_tick("STALEPBO/USDT", 94.9, {}, event_time="2026-05-16T10:05:00") + + assert created["paper_trading"]["reason"] == "paper_order_created" + assert filled["paper_trading"]["opened"] is True + assert filled["paper_trading"]["paper_order"]["filled"] is True + assert list_paper_orders(status="filled")["total"] == 1 + assert list_paper_trades()["items"][0]["symbol"] == "STALEPBO/USDT" + + def test_price_streamer_prioritizes_pending_order_over_same_symbol_recommendation(): set_config("system", "paper_trading", _paper_config()) set_config("system", "price_streamer", {