From d646007bc5f3f70f90ee6a657b8d32140462ac6d Mon Sep 17 00:00:00 2001 From: aaron <> Date: Mon, 8 Jun 2026 00:15:54 +0800 Subject: [PATCH] 1 --- app/services/paper_trader.py | 4 +++- app/services/price_streamer.py | 10 ++++++++- tests/test_price_streamer.py | 40 +++++++++++++++++++++++++++++++--- 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/app/services/paper_trader.py b/app/services/paper_trader.py index 9f30bc0..6ecea98 100644 --- a/app/services/paper_trader.py +++ b/app/services/paper_trader.py @@ -10,7 +10,7 @@ import ccxt from app.db.altcoin_db import init_db, log_cron_run, update_latest_price_cache from app.db.paper_trading import get_paper_trading_summary, sync_pending_paper_orders, sync_recommendation from app.db.recommendation_queries import get_active_recommendations_deduped -from app.services.live_trading_sync import sync_paper_trade_to_live +from app.services.live_trading_sync import sync_live_protection_from_paper, sync_paper_trade_to_live exchange = ccxt.binance({"enableRateLimit": True}) @@ -39,6 +39,7 @@ def run_once(limit: int = 100) -> dict: for item in pending_result.get("results", []): if item.get("trade_id") and (item.get("opened") or item.get("paper_order", {}).get("filled")): item["live_sync"] = sync_paper_trade_to_live(int(item["trade_id"]), execute=True) + protection_result = sync_live_protection_from_paper(limit=limit) output = { "status": "completed", "processed_count": len(results), @@ -48,6 +49,7 @@ def run_once(limit: int = 100) -> dict: "failed": failed, "results": results, "pending_results": pending_result.get("results", []), + "live_protection_sync": protection_result, "summary": get_paper_trading_summary(days=30), "run_time": datetime.now().isoformat(), } diff --git a/app/services/price_streamer.py b/app/services/price_streamer.py index 99550be..0a67a3e 100644 --- a/app/services/price_streamer.py +++ b/app/services/price_streamer.py @@ -19,7 +19,7 @@ from app.db.paper_trading import sync_recommendation from app.db.recommendation_queries import get_active_recommendations_deduped from app.db.schema import get_conn from app.db.system_logs import record_exception, record_system_error -from app.services.live_trading_sync import sync_paper_trade_to_live +from app.services.live_trading_sync import sync_live_protection_from_paper, sync_paper_trade_to_live def _now() -> str: @@ -152,6 +152,14 @@ def handle_price_tick(symbol: str, price: float, targets: dict[str, dict], event result = sync_recommendation(rec, price, event_time=event_time) if result.get("trade_id") and (result.get("opened") or result.get("paper_order", {}).get("filled")): result["live_sync"] = sync_paper_trade_to_live(int(result["trade_id"]), execute=True) + if result.get("trade_id") and ( + result.get("closed") + or result.get("activated") + or result.get("moved") + or result.get("tightened") + or result.get("trailing_stop") + ): + result["live_protection_sync"] = sync_live_protection_from_paper(limit=20) return {"updated_price": True, "paper_trading": result} diff --git a/tests/test_price_streamer.py b/tests/test_price_streamer.py index 3df49d5..939449d 100644 --- a/tests/test_price_streamer.py +++ b/tests/test_price_streamer.py @@ -1,5 +1,6 @@ import pytest +from app.core.strategy_registry import LONG_MOMENTUM_BREAKOUT_STRATEGY from app.db import altcoin_db from app.db.paper_trading import list_paper_orders, list_paper_trades, sync_recommendation from app.db.runtime_config_db import set_config @@ -37,25 +38,47 @@ def buy_now_rec(monkeypatch): "include_open_paper_trades": True, }) altcoin_db.init_db() + with altcoin_db.get_conn() as conn: + conn.execute( + "UPDATE recommendation SET status='archived', display_bucket='history' WHERE symbol=%s AND status='active'", + ("WS/USDT",), + ) + conn.commit() rec_id = altcoin_db.create_recommendation( symbol="WS/USDT", rec_state="爆发", rec_score=28, entry_price=100, - stop_loss=95, + stop_loss=96, tp1=106, tp2=112, signals=["当前15min即刻入场信号"], + strategy_code=LONG_MOMENTUM_BREAKOUT_STRATEGY, entry_plan={ "entry_action": "可即刻买入", + "strategy_code": LONG_MOMENTUM_BREAKOUT_STRATEGY, "entry_price": 100, - "stop_loss": 95, + "stop_loss": 96, "tp1": 106, "tp2": 112, "risk_reward_ok": True, "entry_trigger_confirmed": True, }, ) + with altcoin_db.get_conn() as conn: + conn.execute( + """ + UPDATE recommendation + SET action_status='可即刻买入', + execution_status='buy_now', + display_bucket='realtime', + lifecycle_state='buyable', + entry_triggered=0 + WHERE id=%s + """, + (rec_id,), + ) + conn.commit() return next(r for r in altcoin_db.get_active_recommendations_deduped(actionable_only=False) if r["id"] == rec_id) @@ -88,6 +111,8 @@ def test_price_streamer_tick_drives_paper_trailing_stop(monkeypatch, buy_now_rec monkeypatch.setenv("ALPHAX_PAPER_TRAILING_MIN_LOCK_PROFIT_PCT", "0.5") monkeypatch.setenv("ALPHAX_PAPER_TRAILING_DISTANCE_PCT", "1.5") targets = {"WS/USDT": buy_now_rec} + protection_calls = [] + monkeypatch.setattr(price_streamer, "sync_live_protection_from_paper", lambda **kwargs: protection_calls.append(kwargs) or {"ok": True}) price_streamer.handle_price_tick("WS/USDT", 100, targets, event_time="2026-05-16T10:00:00") activated = price_streamer.handle_price_tick("WS/USDT", 105, targets, event_time="2026-05-16T10:01:00") @@ -95,8 +120,11 @@ def test_price_streamer_tick_drives_paper_trailing_stop(monkeypatch, buy_now_rec closed = price_streamer.handle_price_tick("WS/USDT", trailing_stop * 0.999, targets, event_time="2026-05-16T10:02:00") assert activated["paper_trading"]["activated"] is True + assert activated["paper_trading"]["live_protection_sync"]["ok"] is True assert closed["paper_trading"]["closed"] is True assert closed["paper_trading"]["exit_reason"] == "trailing_stop" + assert closed["paper_trading"]["live_protection_sync"]["ok"] is True + assert len(protection_calls) >= 2 def test_price_streamer_tracks_open_paper_trade_without_active_rec(buy_now_rec): @@ -175,7 +203,13 @@ def test_price_streamer_prioritizes_pending_order_over_same_symbol_recommendatio stop_loss=95, tp1=106, signals=["当前15min即刻入场信号"], - entry_plan={"entry_action": "可即刻买入", "entry_trigger_confirmed": True, "risk_reward_ok": True}, + strategy_code=LONG_MOMENTUM_BREAKOUT_STRATEGY, + entry_plan={ + "entry_action": "可即刻买入", + "strategy_code": LONG_MOMENTUM_BREAKOUT_STRATEGY, + "entry_trigger_confirmed": True, + "risk_reward_ok": True, + }, ) with altcoin_db.get_conn() as conn: conn.execute(