diff --git a/app/config/system_config.py b/app/config/system_config.py
index 7705400..c8ae0c0 100644
--- a/app/config/system_config.py
+++ b/app/config/system_config.py
@@ -132,6 +132,10 @@ def default_price_streamer_config():
         "refresh_symbols_seconds": _env_float("ALPHAX_PRICE_STREAMER_REFRESH_SYMBOLS_SECONDS", 20),
         "idle_sleep_seconds": _env_float("ALPHAX_PRICE_STREAMER_IDLE_SLEEP_SECONDS", 5),
         "reconnect_delay_seconds": _env_float("ALPHAX_PRICE_STREAMER_RECONNECT_DELAY_SECONDS", 5),
+        "ping_interval_seconds": _env_float("ALPHAX_PRICE_STREAMER_PING_INTERVAL_SECONDS", 30),
+        "ping_timeout_seconds": _env_float("ALPHAX_PRICE_STREAMER_PING_TIMEOUT_SECONDS", 60),
+        "close_timeout_seconds": _env_float("ALPHAX_PRICE_STREAMER_CLOSE_TIMEOUT_SECONDS", 5),
+        "transient_log_interval_seconds": _env_float("ALPHAX_PRICE_STREAMER_TRANSIENT_LOG_INTERVAL_SECONDS", 900),
         "max_stream_symbols": _env_int("ALPHAX_PRICE_STREAMER_MAX_SYMBOLS", 200),
         "include_actionable_recommendations": True,
         "include_open_paper_trades": True,
diff --git a/app/services/price_streamer.py b/app/services/price_streamer.py
index 998966f..5ee13cf 100644
--- a/app/services/price_streamer.py
+++ b/app/services/price_streamer.py
@@ -11,19 +11,24 @@ import json
 from datetime import datetime
 
 import websockets
+from websockets.exceptions import ConnectionClosed
 
 from app.config.system_config import price_streamer_config
 from app.db.altcoin_db import init_db, update_latest_price_cache
 from app.db.paper_trading import sync_recommendation
 from app.db.recommendation_queries import get_active_recommendations_deduped
 from app.db.schema import get_conn
-from app.db.system_logs import record_exception
+from app.db.system_logs import record_exception, record_system_error
 
 
 def _now() -> str:
     return datetime.now().isoformat()
 
 
+_LAST_TRANSIENT_LOG_AT = 0.0
+_TRANSIENT_DISCONNECT_COUNT = 0
+
+
 def _safe_float(value, default=0.0) -> float:
     try:
         if value is None or value == "":
@@ -159,6 +164,73 @@ def _parse_ticker_message(raw: str) -> tuple[str, float]:
     return symbol.upper(), price
 
 
+def _is_transient_ws_error(exc: BaseException) -> bool:
+    """Return True when *exc* looks like a routine websocket/network drop.
+
+    The caller handles such errors with a throttled warning and a reconnect
+    instead of a full exception record.
+    """
+    # ConnectionClosed is the base of ConnectionClosedOK/ConnectionClosedError and
+    # OSError is the base of the builtin TimeoutError/ConnectionError families;
+    # asyncio.TimeoutError is only a distinct class before Python 3.11.
+    if isinstance(exc, (ConnectionClosed, asyncio.TimeoutError, OSError)):
+        return True
+    # Fall back to the message text for errors that only describe the drop.
+    message = str(exc).lower()
+    return any(
+        needle in message
+        for needle in (
+            "keepalive ping timeout",
+            "no close frame received",
+            "connection closed",
+            "connection reset",
+            "temporarily unavailable",
+            "timed out",
+        )
+    )
+
+
+def _record_transient_disconnect(exc: BaseException, cfg: dict, symbols_count: int) -> None:
+    """Record a transient websocket disconnect as a throttled warning.
+
+    Every call increments the module-level disconnect counter, but at most one
+    warning per ``transient_log_interval_seconds`` (fallback 900, 0 disables the
+    throttle) is written through ``record_system_error``.
+    """
+    global _LAST_TRANSIENT_LOG_AT, _TRANSIENT_DISCONNECT_COUNT
+    _TRANSIENT_DISCONNECT_COUNT += 1
+    try:
+        loop_time = asyncio.get_running_loop().time()
+    except RuntimeError:
+        # No running event loop: without a clock the throttle cannot apply,
+        # so the warning is always attempted.
+        loop_time = 0.0
+    interval = max(0.0, _safe_float(cfg.get("transient_log_interval_seconds"), 900))
+    if interval > 0 and _LAST_TRANSIENT_LOG_AT and loop_time and loop_time - _LAST_TRANSIENT_LOG_AT < interval:
+        return
+    try:
+        record_system_error(
+            source="price_streamer",
+            level="warning",
+            error_type=exc.__class__.__name__,
+            message=f"websocket transient disconnect: {exc}",
+            status_code=0,
+            context={
+                "symbols_count": symbols_count,
+                "disconnect_count_since_start": _TRANSIENT_DISCONNECT_COUNT,
+                "reconnect_delay_seconds": cfg.get("reconnect_delay_seconds"),
+            },
+            fingerprint="price_streamer_transient_disconnect",
+        )
+    except Exception as log_exc:
+        # Best effort: a logging failure must never take the streamer down.
+        print(f"[price-streamer] failed to record transient disconnect: {log_exc}", flush=True)
+    else:
+        # Start the throttle window only once the warning was actually stored;
+        # otherwise a failed write would hide disconnects for a full interval.
+        _LAST_TRANSIENT_LOG_AT = loop_time or _LAST_TRANSIENT_LOG_AT
+
+
 async def run_forever() -> None:
     init_db()
     cfg = price_streamer_config()
@@ -173,10 +245,12 @@ async def run_forever() -> None:
     log_every = max(1, _safe_int(cfg.get("log_every_events"), 100))
 
     while True:
+        symbols_count = 0
         try:
             cfg = price_streamer_config()
             targets = load_stream_targets(cfg=cfg)
             symbols = sorted(targets.keys())
+            symbols_count = len(symbols)
             if not symbols:
                 print("[price-streamer] no active symbols, sleeping", flush=True)
                 await asyncio.sleep(idle_sleep)
@@ -185,7 +259,12 @@ async def run_forever() -> None:
             print(f"[price-streamer] connecting symbols={len(symbols)}", flush=True)
             last_refresh = asyncio.get_running_loop().time()
             count = 0
-            async with websockets.connect(url, ping_interval=20, ping_timeout=20, close_timeout=5) as ws:
+            async with websockets.connect(
+                url,
+                ping_interval=max(5.0, _safe_float(cfg.get("ping_interval_seconds"), 30)),
+                ping_timeout=max(5.0, _safe_float(cfg.get("ping_timeout_seconds"), 60)),
+                close_timeout=max(1.0, _safe_float(cfg.get("close_timeout_seconds"), 5)),
+            ) as ws:
                 async for raw in ws:
                     symbol, price = _parse_ticker_message(raw)
                     if symbol and price > 0:
@@ -201,11 +280,15 @@ async def run_forever() -> None:
                         targets = new_targets
                         last_refresh = asyncio.get_running_loop().time()
         except Exception as exc:
-            print(f"[price-streamer] error: {exc}", flush=True)
-            try:
-                record_exception(exc, source="price_streamer")
-            except Exception:
-                pass
+            if _is_transient_ws_error(exc):
+                print(f"[price-streamer] websocket disconnected, reconnecting: {exc}", flush=True)
+                _record_transient_disconnect(exc, cfg, symbols_count)
+            else:
+                print(f"[price-streamer] error: {exc}", flush=True)
+                try:
+                    record_exception(exc, source="price_streamer")
+                except Exception:
+                    pass
             await asyncio.sleep(reconnect_delay)
 
 
diff --git a/tests/test_price_streamer.py b/tests/test_price_streamer.py
index d3e68d6..8ce5fb9 100644
--- a/tests/test_price_streamer.py
+++ b/tests/test_price_streamer.py
@@ -149,3 +149,65 @@ def test_price_streamer_parse_ticker_message():
 
     assert symbol == "WS/USDT"
     assert price == pytest.approx(101.5)
+
+
+def test_price_streamer_treats_keepalive_disconnect_as_transient():
+    exc = TimeoutError("sent 1011 (internal error) keepalive ping timeout; no close frame received")
+
+    assert price_streamer._is_transient_ws_error(exc) is True
+
+
+def test_price_streamer_transient_disconnect_logging_is_throttled(monkeypatch):
+    calls = []
+
+    class FakeLoop:
+        def __init__(self):
+            self.value = 1000.0
+
+        def time(self):
+            return self.value
+
+    fake_loop = FakeLoop()
+    monkeypatch.setattr(price_streamer, "_LAST_TRANSIENT_LOG_AT", 0.0)
+    monkeypatch.setattr(price_streamer, "_TRANSIENT_DISCONNECT_COUNT", 0)
+    monkeypatch.setattr(price_streamer.asyncio, "get_running_loop", lambda: fake_loop)
+    monkeypatch.setattr(price_streamer, "record_system_error", lambda **kwargs: calls.append(kwargs) or 1)
+
+    cfg = {"transient_log_interval_seconds": 900, "reconnect_delay_seconds": 5}
+    price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
+    fake_loop.value += 60
+    price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
+    fake_loop.value += 901
+    price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
+
+    assert len(calls) == 2
+    assert calls[0]["level"] == "warning"
+    assert calls[0]["fingerprint"] == "price_streamer_transient_disconnect"
+
+
+def test_price_streamer_does_not_treat_parse_error_as_transient():
+    assert price_streamer._is_transient_ws_error(ValueError("bad ticker payload")) is False
+
+
+def test_price_streamer_transient_disconnect_retries_after_failed_write(monkeypatch):
+    attempts = []
+
+    class FakeLoop:
+        def time(self):
+            return 1000.0
+
+    def failing_record(**kwargs):
+        attempts.append(kwargs)
+        raise RuntimeError("log store unavailable")
+
+    monkeypatch.setattr(price_streamer, "_LAST_TRANSIENT_LOG_AT", 0.0)
+    monkeypatch.setattr(price_streamer, "_TRANSIENT_DISCONNECT_COUNT", 0)
+    monkeypatch.setattr(price_streamer.asyncio, "get_running_loop", lambda: FakeLoop())
+    monkeypatch.setattr(price_streamer, "record_system_error", failing_record)
+
+    cfg = {"transient_log_interval_seconds": 900, "reconnect_delay_seconds": 5}
+    price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
+    price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
+
+    assert len(attempts) == 2
+    assert price_streamer._LAST_TRANSIENT_LOG_AT == 0.0