"""Realtime Binance websocket price stream for paper trading. The stream is intentionally separate from the cron tracker. Websocket ticks drive paper-trading TP/SL decisions quickly, while cron remains a fallback. """ from __future__ import annotations import asyncio import json from datetime import datetime import websockets from websockets.exceptions import ConnectionClosed, ConnectionClosedError, ConnectionClosedOK from app.config.system_config import price_streamer_config from app.db.altcoin_db import init_db, update_latest_price_cache from app.db.paper_trading import sync_recommendation from app.db.recommendation_queries import get_active_recommendations_deduped from app.db.schema import get_conn from app.db.system_logs import record_exception, record_system_error from app.services.live_trading_sync import sync_live_protection_from_paper, sync_paper_trade_to_live def _now() -> str: return datetime.now().isoformat() _LAST_TRANSIENT_LOG_AT = 0.0 _TRANSIENT_DISCONNECT_COUNT = 0 def _safe_float(value, default=0.0) -> float: try: if value is None or value == "": return default return float(value) except Exception: return default def _safe_int(value, default=0) -> int: try: return int(value or 0) except Exception: return default def _stream_name(symbol: str) -> str: base = str(symbol or "").replace("/", "").lower() return f"{base}@ticker" if base else "" def _stream_url(symbols: list[str], cfg: dict | None = None) -> str: cfg = cfg or price_streamer_config() base_url = str(cfg.get("stream_url") or "wss://stream.binance.com:9443/stream").rstrip("/") streams = "/".join(_stream_name(s) for s in symbols if _stream_name(s)) return f"{base_url}?streams={streams}" def _load_open_paper_trade_recs() -> list[dict]: conn = get_conn() try: rows = conn.execute( """ SELECT p.recommendation_id AS id, p.symbol, p.entry_price, p.stop_loss, p.tp1, p.tp2, p.source_status AS execution_status, p.source_action AS action_status, p.strategy_version FROM paper_trades p WHERE p.status='open' ORDER BY p.opened_at DESC, p.id DESC """ ).fetchall() return [dict(r) for r in rows] finally: conn.close() def _load_pending_paper_order_recs() -> list[dict]: conn = get_conn() try: rows = conn.execute( """ SELECT po.recommendation_id AS id, po.symbol, po.side, po.target_price AS entry_price, po.stop_loss, po.tp1, po.tp2, po.source_status AS execution_status, po.source_action AS action_status, po.strategy_version, po.strategy_code, po.strategy_signal_id, po.strategy_snapshot_json, po.factor_roles_json, po.entry_plan_snapshot_json AS entry_plan_json FROM paper_orders po WHERE po.status='pending' ORDER BY po.created_at DESC, po.id DESC """ ).fetchall() return [dict(r) for r in rows] finally: conn.close() def _load_pending_paper_order_rec(symbol: str) -> dict | None: symbol = str(symbol or "").strip().upper() if not symbol: return None conn = get_conn() try: row = conn.execute( """ SELECT po.recommendation_id AS id, po.symbol, po.side, po.target_price AS entry_price, po.stop_loss, po.tp1, po.tp2, po.source_status AS execution_status, po.source_action AS action_status, po.strategy_version, po.strategy_code, po.strategy_signal_id, po.strategy_snapshot_json, po.factor_roles_json, po.entry_plan_snapshot_json AS entry_plan_json FROM paper_orders po WHERE po.status='pending' AND po.symbol=%s ORDER BY po.created_at ASC, po.id ASC LIMIT 1 """, (symbol,), ).fetchone() return dict(row) if row else None finally: conn.close() def load_stream_targets(limit: int | None = None, cfg: dict | None = None) -> dict[str, dict]: """Return symbol -> recommendation-like payload for websocket updates.""" cfg = cfg or price_streamer_config() max_symbols = max(1, _safe_int(limit or cfg.get("max_stream_symbols"), 200)) targets: dict[str, dict] = {} # Execution state must never be crowded out by a large watch/recommendation # set. Pending orders and open positions drive fills, TP/SL, trailing stops, # and live protection sync; recommendations only use the remaining capacity. if cfg.get("include_open_paper_trades", True): for rec in _load_pending_paper_order_recs(): symbol = str(rec.get("symbol") or "").strip().upper() if symbol: targets[symbol] = rec if len(targets) >= max_symbols: return targets for rec in _load_open_paper_trade_recs(): symbol = str(rec.get("symbol") or "").strip().upper() if symbol: targets[symbol] = rec if len(targets) >= max_symbols: return targets if cfg.get("include_actionable_recommendations", True): remaining = max(0, max_symbols - len(targets)) if remaining > 0: for rec in get_active_recommendations_deduped(actionable_only=True, limit=remaining, with_meta=False): symbol = str(rec.get("symbol") or "").strip().upper() if symbol and symbol not in targets: targets[symbol] = dict(rec) if len(targets) >= max_symbols: break return targets def handle_price_tick(symbol: str, price: float, targets: dict[str, dict], event_time: str | None = None, cfg: dict | None = None) -> dict: cfg = cfg or price_streamer_config() symbol = str(symbol or "").strip().upper() price = _safe_float(price) if not symbol or price <= 0: return {"skipped": True, "reason": "invalid_tick"} event_time = event_time or _now() if cfg.get("update_latest_price_cache", True): update_latest_price_cache(symbol, price, updated_at=event_time, source="price_streamer") if not cfg.get("sync_paper_trading", True): return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "disabled_by_streamer"}} # Pending limit orders are executable state. Prefer the order snapshot over # any stale in-memory recommendation target so a websocket tick can fill the # order immediately when the price touches. rec = _load_pending_paper_order_rec(symbol) or targets.get(symbol) if not rec: return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "no_target"}} result = sync_recommendation(rec, price, event_time=event_time) if result.get("trade_id") and (result.get("opened") or result.get("paper_order", {}).get("filled")): result["live_sync"] = sync_paper_trade_to_live(int(result["trade_id"]), execute=True) if result.get("trade_id") and ( result.get("closed") or result.get("activated") or result.get("moved") or result.get("tightened") or result.get("trailing_stop") ): result["live_protection_sync"] = sync_live_protection_from_paper(limit=20) return {"updated_price": True, "paper_trading": result} def _parse_ticker_message(raw: str) -> tuple[str, float]: payload = json.loads(raw) data = payload.get("data") if isinstance(payload, dict) else None if not isinstance(data, dict): data = payload if isinstance(payload, dict) else {} symbol = str(data.get("s") or "") price = _safe_float(data.get("c") or data.get("p") or data.get("lastPrice")) if symbol.endswith("USDT"): symbol = f"{symbol[:-4]}/USDT" return symbol.upper(), price def _is_transient_ws_error(exc: BaseException) -> bool: if isinstance(exc, (ConnectionClosed, ConnectionClosedError, ConnectionClosedOK, TimeoutError, asyncio.TimeoutError, OSError)): return True message = str(exc).lower() return any( needle in message for needle in ( "keepalive ping timeout", "no close frame received", "connection closed", "connection reset", "temporarily unavailable", "timed out", ) ) def _record_transient_disconnect(exc: BaseException, cfg: dict, symbols_count: int) -> None: global _LAST_TRANSIENT_LOG_AT, _TRANSIENT_DISCONNECT_COUNT _TRANSIENT_DISCONNECT_COUNT += 1 loop_time = 0.0 try: loop_time = asyncio.get_running_loop().time() except Exception: loop_time = 0.0 interval = max(0.0, _safe_float(cfg.get("transient_log_interval_seconds"), 900)) if interval > 0 and _LAST_TRANSIENT_LOG_AT and loop_time and loop_time - _LAST_TRANSIENT_LOG_AT < interval: return _LAST_TRANSIENT_LOG_AT = loop_time or _LAST_TRANSIENT_LOG_AT try: record_system_error( source="price_streamer", level="warning", error_type=exc.__class__.__name__, message=f"websocket transient disconnect: {exc}", status_code=0, context={ "symbols_count": symbols_count, "disconnect_count_since_start": _TRANSIENT_DISCONNECT_COUNT, "reconnect_delay_seconds": cfg.get("reconnect_delay_seconds"), }, fingerprint="price_streamer_transient_disconnect", ) except Exception: pass async def run_forever() -> None: init_db() cfg = price_streamer_config() if not cfg.get("enabled", True): print("[price-streamer] disabled by system_config.price_streamer", flush=True) while True: await asyncio.sleep(max(1.0, _safe_float(cfg.get("idle_sleep_seconds"), 5))) refresh_seconds = max(5.0, _safe_float(cfg.get("refresh_symbols_seconds"), 20)) idle_sleep = max(1.0, _safe_float(cfg.get("idle_sleep_seconds"), 5)) reconnect_delay = max(1.0, _safe_float(cfg.get("reconnect_delay_seconds"), 5)) log_every = max(1, _safe_int(cfg.get("log_every_events"), 100)) while True: symbols_count = 0 try: cfg = price_streamer_config() targets = load_stream_targets(cfg=cfg) symbols = sorted(targets.keys()) symbols_count = len(symbols) if not symbols: print("[price-streamer] no active symbols, sleeping", flush=True) await asyncio.sleep(idle_sleep) continue url = _stream_url(symbols, cfg) print(f"[price-streamer] connecting symbols={len(symbols)}", flush=True) last_refresh = asyncio.get_running_loop().time() count = 0 async with websockets.connect( url, ping_interval=max(5.0, _safe_float(cfg.get("ping_interval_seconds"), 30)), ping_timeout=max(5.0, _safe_float(cfg.get("ping_timeout_seconds"), 60)), close_timeout=max(1.0, _safe_float(cfg.get("close_timeout_seconds"), 5)), ) as ws: async for raw in ws: symbol, price = _parse_ticker_message(raw) if symbol and price > 0: handle_price_tick(symbol, price, targets, event_time=_now(), cfg=cfg) count += 1 if count % log_every == 0: print(f"[price-streamer] ticks={count} latest={symbol} {price}", flush=True) if asyncio.get_running_loop().time() - last_refresh >= refresh_seconds: new_targets = load_stream_targets(cfg=cfg) if sorted(new_targets.keys()) != symbols: print("[price-streamer] symbol set changed, reconnecting", flush=True) break targets = new_targets last_refresh = asyncio.get_running_loop().time() except Exception as exc: if _is_transient_ws_error(exc): print(f"[price-streamer] websocket disconnected, reconnecting: {exc}", flush=True) _record_transient_disconnect(exc, cfg, symbols_count) else: print(f"[price-streamer] error: {exc}", flush=True) try: record_exception(exc, source="price_streamer") except Exception: pass await asyncio.sleep(reconnect_delay) def main(): asyncio.run(run_forever()) __all__ = [ "handle_price_tick", "load_stream_targets", "main", "run_forever", ] if __name__ == "__main__": main()