"""Tests for ``price_streamer``: stream targets, tick handling and WS helpers."""

import pytest

from app.core.strategy_registry import LONG_MOMENTUM_BREAKOUT_STRATEGY
from app.db import altcoin_db
from app.db.paper_trading import list_paper_orders, list_paper_trades, sync_recommendation
from app.db.runtime_config_db import set_config
from app.services import price_streamer


def _paper_config(**overrides):
    """Return the baseline paper-trading config with ``overrides`` applied on top."""
    return {
        "enabled": True,
        "trade_notional_usdt": 5000,
        "trade_leverage": 5,
        "fee_rate": 0,
        "slippage_pct": 0,
        "entry_gate_enabled": False,
        "entry_min_rr": 1.2,
        "order_min_rr": 1.2,
        "max_cumulative_leverage": 0,
        "max_stop_loss_leverage_risk_pct": 0,
        "max_account_drawdown_pause_pct": 0,
        "pause_after_weak_entries": 0,
        "global_risk_gate_enabled": False,
        **overrides,
    }


def _streamer_config():
    """Return a fresh copy of the price-streamer config used by the DB-backed tests."""
    return {
        "enabled": True,
        "update_latest_price_cache": True,
        "sync_paper_trading": True,
        "include_actionable_recommendations": True,
        "include_open_paper_trades": True,
    }


def _prepare_runtime():
    """Store the paper-trading and streamer configs, then initialise the DB."""
    set_config("system", "paper_trading", _paper_config())
    set_config("system", "price_streamer", _streamer_config())
    altcoin_db.init_db()


def _pullback_plan():
    """Return a fresh entry plan dict for the wait-for-pullback scenario."""
    return {
        "entry_action": "等回踩",
        "entry_price": 95,
        "stop_loss": 90,
        "tp1": 105,
        "risk_reward_ok": True,
        "rr1": 2.0,
    }


def _create_pullback_recommendation(symbol, rec_score):
    """Create a pullback-style recommendation for ``symbol`` and return its id."""
    return altcoin_db.create_recommendation(
        symbol=symbol,
        rec_state="蓄力",
        rec_score=rec_score,
        entry_price=95,
        stop_loss=90,
        tp1=105,
        signals=["等待回踩"],
        entry_plan=_pullback_plan(),
    )


def _mark_wait_pullback(rec_id):
    """Set the stored recommendation ``rec_id`` to the wait_pullback / watch_pool state."""
    with altcoin_db.get_conn() as conn:
        conn.execute(
            "UPDATE recommendation SET execution_status='wait_pullback', action_status='等回踩', "
            "display_bucket='watch_pool' WHERE id=%s",
            (rec_id,),
        )
        conn.commit()


def _pullback_rec(rec_id, symbol):
    """Build the in-memory recommendation dict handed to ``handle_price_tick``."""
    return {
        "id": rec_id,
        "symbol": symbol,
        "execution_status": "wait_pullback",
        "action_status": "等回踩",
        "entry_price": 95,
        "stop_loss": 90,
        "tp1": 105,
        "entry_plan": _pullback_plan(),
    }


@pytest.fixture
def buy_now_rec(monkeypatch):
    """Create a buy-now WS/USDT recommendation and return its deduped active row.

    Existing active WS/USDT recommendations are archived first, then the new
    row is updated in place to the buy_now / realtime / buyable state.
    """
    _prepare_runtime()
    with altcoin_db.get_conn() as conn:
        conn.execute(
            "UPDATE recommendation SET status='archived', display_bucket='history' "
            "WHERE symbol=%s AND status='active'",
            ("WS/USDT",),
        )
        conn.commit()

    rec_id = altcoin_db.create_recommendation(
        symbol="WS/USDT",
        rec_state="爆发",
        rec_score=28,
        entry_price=100,
        stop_loss=96,
        tp1=106,
        tp2=112,
        signals=["当前15min即刻入场信号"],
        strategy_code=LONG_MOMENTUM_BREAKOUT_STRATEGY,
        entry_plan={
            "entry_action": "可即刻买入",
            "strategy_code": LONG_MOMENTUM_BREAKOUT_STRATEGY,
            "entry_price": 100,
            "stop_loss": 96,
            "tp1": 106,
            "tp2": 112,
            "risk_reward_ok": True,
            "entry_trigger_confirmed": True,
        },
    )

    with altcoin_db.get_conn() as conn:
        conn.execute(
            """
            UPDATE recommendation
            SET action_status='可即刻买入',
                execution_status='buy_now',
                display_bucket='realtime',
                lifecycle_state='buyable',
                entry_triggered=0
            WHERE id=%s
            """,
            (rec_id,),
        )
        conn.commit()

    active_rows = altcoin_db.get_active_recommendations_deduped(actionable_only=False)
    return next(row for row in active_rows if row["id"] == rec_id)


def test_price_streamer_loads_actionable_targets(buy_now_rec):
    """The buy-now recommendation shows up in the loaded stream targets."""
    targets = price_streamer.load_stream_targets()

    assert "WS/USDT" in targets
    assert targets["WS/USDT"]["id"] == buy_now_rec["id"]


def test_price_streamer_tick_opens_and_closes_paper_trade(buy_now_rec):
    """A tick at entry opens the paper trade; a tick at tp1 closes it."""
    symbol = "WS/USDT"

    opened = price_streamer.handle_price_tick(
        symbol, 100, {symbol: buy_now_rec}, event_time="2026-05-16T10:00:00"
    )
    reloaded = price_streamer.load_stream_targets()
    closed = price_streamer.handle_price_tick(
        symbol, 106, reloaded, event_time="2026-05-16T10:01:00"
    )

    assert opened["paper_trading"]["opened"] is True
    assert closed["paper_trading"]["closed"] is True
    assert closed["paper_trading"]["exit_reason"] == "tp1"
    trade = list_paper_trades()["items"][0]
    assert trade["status"] == "closed"
    assert trade["notional_usdt"] == pytest.approx(5000.0)


def test_price_streamer_tick_drives_paper_trailing_stop(monkeypatch, buy_now_rec):
    """Ticks activate the trailing stop and then close the trade through it."""
    trailing_env = {
        "ALPHAX_PAPER_TRAILING_STOP_ENABLED": "1",
        "ALPHAX_PAPER_TRAILING_MODE": "fixed",
        "ALPHAX_PAPER_TRAILING_ACTIVATE_PNL_PCT": "3",
        "ALPHAX_PAPER_TRAILING_MIN_LOCK_PROFIT_PCT": "0.5",
        "ALPHAX_PAPER_TRAILING_DISTANCE_PCT": "1.5",
    }
    for env_name, env_value in trailing_env.items():
        monkeypatch.setenv(env_name, env_value)

    symbol = "WS/USDT"
    targets = {symbol: buy_now_rec}
    protection_calls = []

    def _record_protection_sync(**kwargs):
        # Stand-in for the live protection sync: remember the call, report success.
        protection_calls.append(kwargs)
        return {"ok": True}

    monkeypatch.setattr(price_streamer, "sync_live_protection_from_paper", _record_protection_sync)

    price_streamer.handle_price_tick(symbol, 100, targets, event_time="2026-05-16T10:00:00")
    activated = price_streamer.handle_price_tick(
        symbol, 105, targets, event_time="2026-05-16T10:01:00"
    )
    trailing_stop = activated["paper_trading"]["trailing_stop"]
    # Tick just below the reported trailing stop level.
    closed = price_streamer.handle_price_tick(
        symbol, trailing_stop * 0.999, targets, event_time="2026-05-16T10:02:00"
    )

    assert activated["paper_trading"]["activated"] is True
    assert activated["paper_trading"]["live_protection_sync"]["ok"] is True
    assert closed["paper_trading"]["closed"] is True
    assert closed["paper_trading"]["exit_reason"] == "trailing_stop"
    assert closed["paper_trading"]["live_protection_sync"]["ok"] is True
    assert len(protection_calls) >= 2


def test_price_streamer_tracks_open_paper_trade_without_active_rec(buy_now_rec):
    """After syncing the recommendation at entry, its symbol is a stream target."""
    sync_recommendation(buy_now_rec, 100, event_time="2026-05-16T10:00:00")

    targets = price_streamer.load_stream_targets()

    assert "WS/USDT" in targets
    assert targets["WS/USDT"]["id"] == buy_now_rec["id"]


def test_price_streamer_fills_pending_paper_order():
    """A tick above entry creates a paper order; a later tick below entry fills it."""
    symbol = "PBO/USDT"
    _prepare_runtime()
    rec_id = _create_pullback_recommendation(symbol, 22)
    _mark_wait_pullback(rec_id)
    rec = _pullback_rec(rec_id, symbol)

    created = price_streamer.handle_price_tick(
        symbol, 100, {symbol: rec}, event_time="2026-05-16T10:00:00"
    )
    targets = price_streamer.load_stream_targets()
    filled = price_streamer.handle_price_tick(
        symbol, 94.9, targets, event_time="2026-05-16T10:05:00"
    )

    assert created["paper_trading"]["reason"] == "paper_order_created"
    assert "PBO/USDT" in targets
    assert filled["paper_trading"]["opened"] is True
    assert list_paper_orders(status="filled")["total"] == 1
    assert list_paper_trades()["items"][0]["entry_price"] == pytest.approx(95)


def test_price_streamer_tick_fills_pending_order_even_when_targets_are_stale():
    """The pending order still fills when the tick arrives with an empty targets map."""
    symbol = "STALEPBO/USDT"
    _prepare_runtime()
    rec_id = _create_pullback_recommendation(symbol, 22)
    _mark_wait_pullback(rec_id)
    rec = _pullback_rec(rec_id, symbol)

    created = price_streamer.handle_price_tick(
        symbol, 100, {symbol: rec}, event_time="2026-05-16T10:00:00"
    )
    filled = price_streamer.handle_price_tick(
        symbol, 94.9, {}, event_time="2026-05-16T10:05:00"
    )

    assert created["paper_trading"]["reason"] == "paper_order_created"
    assert filled["paper_trading"]["opened"] is True
    assert filled["paper_trading"]["paper_order"]["filled"] is True
    assert list_paper_orders(status="filled")["total"] == 1
    assert list_paper_trades()["items"][0]["symbol"] == "STALEPBO/USDT"


def test_price_streamer_prioritizes_pending_order_over_same_symbol_recommendation():
    """With a pending order and a second rec on one symbol, the pending rec is the target."""
    symbol = "DUP/USDT"
    _prepare_runtime()
    pending_id = _create_pullback_recommendation(symbol, 24)
    # The second recommendation is created before the pending one is marked
    # wait_pullback; keep this ordering.
    active_id = altcoin_db.create_recommendation(
        symbol=symbol,
        rec_state="爆发",
        rec_score=28,
        entry_price=100,
        stop_loss=95,
        tp1=106,
        signals=["当前15min即刻入场信号"],
        strategy_code=LONG_MOMENTUM_BREAKOUT_STRATEGY,
        entry_plan={
            "entry_action": "可即刻买入",
            "strategy_code": LONG_MOMENTUM_BREAKOUT_STRATEGY,
            "entry_trigger_confirmed": True,
            "risk_reward_ok": True,
        },
    )
    _mark_wait_pullback(pending_id)
    pending_rec = _pullback_rec(pending_id, symbol)

    price_streamer.handle_price_tick(
        symbol, 100, {symbol: pending_rec}, event_time="2026-05-16T10:00:00"
    )
    targets = price_streamer.load_stream_targets()
    filled = price_streamer.handle_price_tick(
        symbol, 94.9, targets, event_time="2026-05-16T10:05:00"
    )

    assert active_id > 0
    assert targets["DUP/USDT"]["id"] == pending_id
    assert filled["paper_trading"]["opened"] is True
    assert list_paper_orders(status="filled")["total"] == 1
    assert list_paper_trades()["items"][0]["recommendation_id"] == pending_id


def test_price_streamer_subscription_prioritizes_execution_symbols(monkeypatch):
    """With ``limit=2``, pending-order and open-trade symbols take the slots."""
    pending_recs = [{"id": 1, "symbol": "PUMP/USDT", "side": "long", "entry_price": 0.001636}]
    open_trade_recs = [{"id": 2, "symbol": "WLD/USDT", "side": "long", "entry_price": 0.5}]
    active_recs = [
        {"id": 3, "symbol": "AAA/USDT"},
        {"id": 4, "symbol": "BBB/USDT"},
        {"id": 5, "symbol": "CCC/USDT"},
    ]
    monkeypatch.setattr(price_streamer, "_load_pending_paper_order_recs", lambda: pending_recs)
    monkeypatch.setattr(price_streamer, "_load_open_paper_trade_recs", lambda: open_trade_recs)
    monkeypatch.setattr(
        price_streamer,
        "get_active_recommendations_deduped",
        lambda **kwargs: active_recs,
    )

    targets = price_streamer.load_stream_targets(
        limit=2,
        cfg={
            "include_actionable_recommendations": True,
            "include_open_paper_trades": True,
        },
    )

    assert list(targets.keys()) == ["PUMP/USDT", "WLD/USDT"]


def test_price_streamer_builds_binance_combined_stream_url():
    """Symbols are lower-cased, slash-stripped and joined as ``@ticker`` streams."""
    url = price_streamer._stream_url(
        ["BTC/USDT", "ETH/USDT"],
        {"stream_url": "wss://example.test/stream"},
    )

    assert url == "wss://example.test/stream?streams=btcusdt@ticker/ethusdt@ticker"


def test_price_streamer_parse_ticker_message():
    """A combined-stream ticker payload yields the slash symbol and a float price."""
    raw_message = '{"stream":"wsusdt@ticker","data":{"s":"WSUSDT","c":"101.5"}}'

    symbol, price = price_streamer._parse_ticker_message(raw_message)

    assert symbol == "WS/USDT"
    assert price == pytest.approx(101.5)


def test_price_streamer_treats_keepalive_disconnect_as_transient():
    """A keepalive ping timeout is classified as a transient websocket error."""
    exc = TimeoutError(
        "sent 1011 (internal error) keepalive ping timeout; no close frame received"
    )

    assert price_streamer._is_transient_ws_error(exc) is True


def test_price_streamer_transient_disconnect_logging_is_throttled(monkeypatch):
    """Three disconnects at +0s, +60s and +961s produce only two error records."""
    calls = []

    class FakeLoop:
        """Minimal event-loop stand-in exposing a controllable ``time()``."""

        def __init__(self):
            self.value = 1000.0

        def time(self):
            return self.value

    def _capture_system_error(**kwargs):
        calls.append(kwargs)
        return 1

    fake_loop = FakeLoop()
    monkeypatch.setattr(price_streamer, "_LAST_TRANSIENT_LOG_AT", 0.0)
    monkeypatch.setattr(price_streamer, "_TRANSIENT_DISCONNECT_COUNT", 0)
    monkeypatch.setattr(price_streamer.asyncio, "get_running_loop", lambda: fake_loop)
    monkeypatch.setattr(price_streamer, "record_system_error", _capture_system_error)
    cfg = {"transient_log_interval_seconds": 900, "reconnect_delay_seconds": 5}

    price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
    fake_loop.value += 60
    price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
    fake_loop.value += 901
    price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)

    assert len(calls) == 2
    assert calls[0]["level"] == "warning"
    assert calls[0]["fingerprint"] == "price_streamer_transient_disconnect"