This commit is contained in:
aaron 2026-05-21 08:58:10 +08:00
parent c5d8e343ea
commit cc4a0c7eb7
3 changed files with 108 additions and 7 deletions

View File

@ -132,6 +132,10 @@ def default_price_streamer_config():
"refresh_symbols_seconds": _env_float("ALPHAX_PRICE_STREAMER_REFRESH_SYMBOLS_SECONDS", 20),
"idle_sleep_seconds": _env_float("ALPHAX_PRICE_STREAMER_IDLE_SLEEP_SECONDS", 5),
"reconnect_delay_seconds": _env_float("ALPHAX_PRICE_STREAMER_RECONNECT_DELAY_SECONDS", 5),
"ping_interval_seconds": _env_float("ALPHAX_PRICE_STREAMER_PING_INTERVAL_SECONDS", 30),
"ping_timeout_seconds": _env_float("ALPHAX_PRICE_STREAMER_PING_TIMEOUT_SECONDS", 60),
"close_timeout_seconds": _env_float("ALPHAX_PRICE_STREAMER_CLOSE_TIMEOUT_SECONDS", 5),
"transient_log_interval_seconds": _env_float("ALPHAX_PRICE_STREAMER_TRANSIENT_LOG_INTERVAL_SECONDS", 900),
"max_stream_symbols": _env_int("ALPHAX_PRICE_STREAMER_MAX_SYMBOLS", 200),
"include_actionable_recommendations": True,
"include_open_paper_trades": True,

View File

@ -11,19 +11,24 @@ import json
from datetime import datetime
import websockets
from websockets.exceptions import ConnectionClosed, ConnectionClosedError, ConnectionClosedOK
from app.config.system_config import price_streamer_config
from app.db.altcoin_db import init_db, update_latest_price_cache
from app.db.paper_trading import sync_recommendation
from app.db.recommendation_queries import get_active_recommendations_deduped
from app.db.schema import get_conn
from app.db.system_logs import record_exception
from app.db.system_logs import record_exception, record_system_error
def _now() -> str:
return datetime.now().isoformat()
_LAST_TRANSIENT_LOG_AT = 0.0
_TRANSIENT_DISCONNECT_COUNT = 0
def _safe_float(value, default=0.0) -> float:
try:
if value is None or value == "":
@ -159,6 +164,53 @@ def _parse_ticker_message(raw: str) -> tuple[str, float]:
return symbol.upper(), price
def _is_transient_ws_error(exc: BaseException) -> bool:
if isinstance(exc, (ConnectionClosed, ConnectionClosedError, ConnectionClosedOK, TimeoutError, asyncio.TimeoutError, OSError)):
return True
message = str(exc).lower()
return any(
needle in message
for needle in (
"keepalive ping timeout",
"no close frame received",
"connection closed",
"connection reset",
"temporarily unavailable",
"timed out",
)
)
def _record_transient_disconnect(exc: BaseException, cfg: dict, symbols_count: int) -> None:
global _LAST_TRANSIENT_LOG_AT, _TRANSIENT_DISCONNECT_COUNT
_TRANSIENT_DISCONNECT_COUNT += 1
loop_time = 0.0
try:
loop_time = asyncio.get_running_loop().time()
except Exception:
loop_time = 0.0
interval = max(0.0, _safe_float(cfg.get("transient_log_interval_seconds"), 900))
if interval > 0 and _LAST_TRANSIENT_LOG_AT and loop_time and loop_time - _LAST_TRANSIENT_LOG_AT < interval:
return
_LAST_TRANSIENT_LOG_AT = loop_time or _LAST_TRANSIENT_LOG_AT
try:
record_system_error(
source="price_streamer",
level="warning",
error_type=exc.__class__.__name__,
message=f"websocket transient disconnect: {exc}",
status_code=0,
context={
"symbols_count": symbols_count,
"disconnect_count_since_start": _TRANSIENT_DISCONNECT_COUNT,
"reconnect_delay_seconds": cfg.get("reconnect_delay_seconds"),
},
fingerprint="price_streamer_transient_disconnect",
)
except Exception:
pass
async def run_forever() -> None:
init_db()
cfg = price_streamer_config()
@ -173,10 +225,12 @@ async def run_forever() -> None:
log_every = max(1, _safe_int(cfg.get("log_every_events"), 100))
while True:
symbols_count = 0
try:
cfg = price_streamer_config()
targets = load_stream_targets(cfg=cfg)
symbols = sorted(targets.keys())
symbols_count = len(symbols)
if not symbols:
print("[price-streamer] no active symbols, sleeping", flush=True)
await asyncio.sleep(idle_sleep)
@ -185,7 +239,12 @@ async def run_forever() -> None:
print(f"[price-streamer] connecting symbols={len(symbols)}", flush=True)
last_refresh = asyncio.get_running_loop().time()
count = 0
async with websockets.connect(url, ping_interval=20, ping_timeout=20, close_timeout=5) as ws:
async with websockets.connect(
url,
ping_interval=max(5.0, _safe_float(cfg.get("ping_interval_seconds"), 30)),
ping_timeout=max(5.0, _safe_float(cfg.get("ping_timeout_seconds"), 60)),
close_timeout=max(1.0, _safe_float(cfg.get("close_timeout_seconds"), 5)),
) as ws:
async for raw in ws:
symbol, price = _parse_ticker_message(raw)
if symbol and price > 0:
@ -201,11 +260,15 @@ async def run_forever() -> None:
targets = new_targets
last_refresh = asyncio.get_running_loop().time()
except Exception as exc:
print(f"[price-streamer] error: {exc}", flush=True)
try:
record_exception(exc, source="price_streamer")
except Exception:
pass
if _is_transient_ws_error(exc):
print(f"[price-streamer] websocket disconnected, reconnecting: {exc}", flush=True)
_record_transient_disconnect(exc, cfg, symbols_count)
else:
print(f"[price-streamer] error: {exc}", flush=True)
try:
record_exception(exc, source="price_streamer")
except Exception:
pass
await asyncio.sleep(reconnect_delay)

View File

@ -149,3 +149,37 @@ def test_price_streamer_parse_ticker_message():
assert symbol == "WS/USDT"
assert price == pytest.approx(101.5)
def test_price_streamer_treats_keepalive_disconnect_as_transient():
exc = TimeoutError("sent 1011 (internal error) keepalive ping timeout; no close frame received")
assert price_streamer._is_transient_ws_error(exc) is True
def test_price_streamer_transient_disconnect_logging_is_throttled(monkeypatch):
calls = []
class FakeLoop:
def __init__(self):
self.value = 1000.0
def time(self):
return self.value
fake_loop = FakeLoop()
monkeypatch.setattr(price_streamer, "_LAST_TRANSIENT_LOG_AT", 0.0)
monkeypatch.setattr(price_streamer, "_TRANSIENT_DISCONNECT_COUNT", 0)
monkeypatch.setattr(price_streamer.asyncio, "get_running_loop", lambda: fake_loop)
monkeypatch.setattr(price_streamer, "record_system_error", lambda **kwargs: calls.append(kwargs) or 1)
cfg = {"transient_log_interval_seconds": 900, "reconnect_delay_seconds": 5}
price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
fake_loop.value += 60
price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
fake_loop.value += 901
price_streamer._record_transient_disconnect(TimeoutError("keepalive ping timeout"), cfg, 12)
assert len(calls) == 2
assert calls[0]["level"] == "warning"
assert calls[0]["fingerprint"] == "price_streamer_transient_disconnect"