339 lines
12 KiB
Python
339 lines
12 KiB
Python
"""Realtime Binance websocket price stream for paper trading.
|
|
|
|
The stream is intentionally separate from the cron tracker. Websocket ticks
|
|
drive paper-trading TP/SL decisions quickly, while cron remains a fallback.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime
|
|
|
|
import websockets
|
|
from websockets.exceptions import ConnectionClosed, ConnectionClosedError, ConnectionClosedOK
|
|
|
|
from app.config.system_config import price_streamer_config
|
|
from app.db.altcoin_db import init_db, update_latest_price_cache
|
|
from app.db.paper_trading import sync_recommendation
|
|
from app.db.recommendation_queries import get_active_recommendations_deduped
|
|
from app.db.schema import get_conn
|
|
from app.db.system_logs import record_exception, record_system_error
|
|
from app.services.live_trading_sync import sync_live_protection_from_paper, sync_paper_trade_to_live
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now().isoformat()
|
|
|
|
|
|
# Event-loop clock reading taken when a transient-disconnect warning was last
# recorded; 0.0 means none has been recorded yet.
_LAST_TRANSIENT_LOG_AT = 0.0
|
|
# Number of transient websocket disconnects seen since this process started.
_TRANSIENT_DISCONNECT_COUNT = 0
|
|
|
|
|
|
def _safe_float(value, default=0.0) -> float:
|
|
try:
|
|
if value is None or value == "":
|
|
return default
|
|
return float(value)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _safe_int(value, default=0) -> int:
|
|
try:
|
|
return int(value or 0)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _stream_name(symbol: str) -> str:
|
|
base = str(symbol or "").replace("/", "").lower()
|
|
return f"{base}@ticker" if base else ""
|
|
|
|
|
|
def _stream_url(symbols: list[str], cfg: dict | None = None) -> str:
    """Compose the combined-stream websocket URL for *symbols*.

    The base endpoint comes from ``cfg["stream_url"]`` (trailing slashes
    stripped), falling back to the public Binance endpoint. Symbols that
    produce an empty stream name are left out of the ``streams=`` list.
    """
    settings = cfg or price_streamer_config()
    endpoint = str(settings.get("stream_url") or "wss://stream.binance.com:9443/stream").rstrip("/")
    names = (_stream_name(item) for item in symbols)
    joined = "/".join(name for name in names if name)
    return f"{endpoint}?streams={joined}"
|
|
|
|
|
|
def _load_open_paper_trade_recs() -> list[dict]:
    """Load all open paper trades as recommendation-shaped dicts.

    Rows come from ``paper_trades`` where ``status='open'``, newest first
    (``opened_at`` then ``id``, both descending). Columns are aliased so that
    ``recommendation_id`` appears as ``id``, ``source_status`` as
    ``execution_status`` and ``source_action`` as ``action_status``.
    The connection is always closed, even when the query fails.
    """
    query = """
        SELECT
            p.recommendation_id AS id,
            p.symbol,
            p.entry_price,
            p.stop_loss,
            p.tp1,
            p.tp2,
            p.source_status AS execution_status,
            p.source_action AS action_status,
            p.strategy_version
        FROM paper_trades p
        WHERE p.status='open'
        ORDER BY p.opened_at DESC, p.id DESC
        """
    db = get_conn()
    try:
        fetched = db.execute(query).fetchall()
        return [dict(record) for record in fetched]
    finally:
        db.close()
|
|
|
|
|
|
def _load_pending_paper_order_recs() -> list[dict]:
    """Load all pending paper orders as recommendation-shaped dicts.

    Rows come from ``paper_orders`` where ``status='pending'``, newest first
    (``created_at`` then ``id``, both descending). ``target_price`` is exposed
    as ``entry_price`` and ``entry_plan_snapshot_json`` as ``entry_plan_json``.
    The connection is always closed, even when the query fails.
    """
    query = """
        SELECT
            po.recommendation_id AS id,
            po.symbol,
            po.target_price AS entry_price,
            po.stop_loss,
            po.tp1,
            po.tp2,
            po.source_status AS execution_status,
            po.source_action AS action_status,
            po.strategy_version,
            po.entry_plan_snapshot_json AS entry_plan_json
        FROM paper_orders po
        WHERE po.status='pending'
        ORDER BY po.created_at DESC, po.id DESC
        """
    db = get_conn()
    try:
        fetched = db.execute(query).fetchall()
        return [dict(record) for record in fetched]
    finally:
        db.close()
|
|
|
|
|
|
def _load_pending_paper_order_rec(symbol: str) -> dict | None:
    """Return the earliest-created pending paper order for *symbol*, if any.

    The symbol is stripped and upper-cased before lookup; an empty symbol
    returns ``None`` without touching the database. Among pending orders for
    the symbol, the one with the smallest ``created_at`` (then ``id``) is
    returned as a dict. The connection is always closed.
    """
    wanted = str(symbol or "").strip().upper()
    if not wanted:
        return None

    query = """
        SELECT
            po.recommendation_id AS id,
            po.symbol,
            po.side,
            po.target_price AS entry_price,
            po.stop_loss,
            po.tp1,
            po.tp2,
            po.source_status AS execution_status,
            po.source_action AS action_status,
            po.strategy_version,
            po.strategy_code,
            po.strategy_signal_id,
            po.strategy_snapshot_json,
            po.factor_roles_json,
            po.entry_plan_snapshot_json AS entry_plan_json
        FROM paper_orders po
        WHERE po.status='pending' AND po.symbol=%s
        ORDER BY po.created_at ASC, po.id ASC
        LIMIT 1
        """
    db = get_conn()
    try:
        found = db.execute(query, (wanted,)).fetchone()
        if not found:
            return None
        return dict(found)
    finally:
        db.close()
|
|
|
|
|
|
def load_stream_targets(limit: int | None = None, cfg: dict | None = None) -> dict[str, dict]:
    """Return symbol -> recommendation-like payload for websocket updates.

    Sources, in priority order:

    1. open paper trades,
    2. pending paper orders,
    3. actionable recommendations.

    Sources 1 and 2 are read when ``cfg["include_open_paper_trades"]`` is
    truthy (default True); source 3 when
    ``cfg["include_actionable_recommendations"]`` is truthy (default True).

    When a symbol appears in several sources, the higher-priority payload wins
    (unchanged from before). The result is capped at ``max_symbols`` entries.
    Entries are now inserted in priority order, so the cap drops
    recommendations first. Previously recommendations were inserted first and
    could fill the cap, dropping the open trades and pending orders.

    Args:
        limit: Optional cap on the number of symbols; falls back to
            ``cfg["max_stream_symbols"]`` and then to 200.
        cfg: Streamer configuration; loaded via ``price_streamer_config()``
            when omitted or empty.
    """
    cfg = cfg or price_streamer_config()
    # The explicit "or 200" keeps a missing setting from collapsing to 1.
    requested = limit or cfg.get("max_stream_symbols") or 200
    max_symbols = max(1, _safe_int(requested, 200))

    def _by_symbol(recs) -> dict[str, dict]:
        # Within one source the last row for a symbol wins, as before.
        grouped: dict[str, dict] = {}
        for rec in recs:
            symbol = str(rec.get("symbol") or "").strip().upper()
            if symbol:
                grouped[symbol] = rec
        return grouped

    targets: dict[str, dict] = {}

    if cfg.get("include_open_paper_trades", True):
        targets.update(_by_symbol(_load_open_paper_trade_recs()))
        for symbol, rec in _by_symbol(_load_pending_paper_order_recs()).items():
            targets.setdefault(symbol, rec)

    if cfg.get("include_actionable_recommendations", True):
        recs = get_active_recommendations_deduped(actionable_only=True, limit=max_symbols, with_meta=False)
        for symbol, rec in _by_symbol(recs).items():
            if symbol not in targets:
                targets[symbol] = dict(rec)

    return dict(list(targets.items())[:max_symbols])
|
|
|
|
|
|
def handle_price_tick(symbol: str, price: float, targets: dict[str, dict], event_time: str | None = None, cfg: dict | None = None) -> dict:
    """Apply one price tick: refresh the price cache and sync paper trading.

    Args:
        symbol: Market symbol; stripped and upper-cased before use.
        price: Tick price; must convert to a finite, positive float.
        targets: symbol -> recommendation-like payload used when no pending
            paper order exists for the symbol.
        event_time: Timestamp string for the tick; defaults to ``_now()``.
        cfg: Streamer configuration; loaded via ``price_streamer_config()``
            when omitted or empty.

    Returns:
        ``{"skipped": True, "reason": "invalid_tick"}`` for an unusable tick,
        otherwise ``{"updated_price": True, "paper_trading": ...}`` where the
        inner value is either a skip marker or the ``sync_recommendation``
        result, extended with ``live_sync`` / ``live_protection_sync`` when
        those follow-up syncs ran.
    """
    cfg = cfg or price_streamer_config()
    symbol = str(symbol or "").strip().upper()
    price = _safe_float(price)
    # The chained comparison also rejects NaN and inf; plain `price <= 0`
    # is False for NaN and would let it through.
    if not symbol or not (0.0 < price < float("inf")):
        return {"skipped": True, "reason": "invalid_tick"}
    event_time = event_time or _now()
    if cfg.get("update_latest_price_cache", True):
        update_latest_price_cache(symbol, price, updated_at=event_time, source="price_streamer")
    if not cfg.get("sync_paper_trading", True):
        return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "disabled_by_streamer"}}
    # Pending limit orders are executable state. Prefer the order snapshot over
    # any stale in-memory recommendation target so a websocket tick can fill the
    # order immediately when the price touches.
    rec = _load_pending_paper_order_rec(symbol) or targets.get(symbol)
    if not rec:
        return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "no_target"}}
    result = sync_recommendation(rec, price, event_time=event_time)
    trade_id = result.get("trade_id")
    if trade_id:
        # `paper_order` may be present but None; `.get(key, {})` would then
        # return None and the chained `.get` would raise AttributeError.
        paper_order = result.get("paper_order") or {}
        if result.get("opened") or paper_order.get("filled"):
            result["live_sync"] = sync_paper_trade_to_live(int(trade_id), execute=True)
        protection_flags = ("closed", "activated", "moved", "tightened", "trailing_stop")
        if any(result.get(flag) for flag in protection_flags):
            result["live_protection_sync"] = sync_live_protection_from_paper(limit=20)
    return {"updated_price": True, "paper_trading": result}
|
|
|
|
|
|
def _parse_ticker_message(raw: str) -> tuple[str, float]:
    """Extract ``(symbol, price)`` from one raw websocket JSON message.

    A combined-stream envelope is unwrapped through its ``"data"`` object; a
    bare dict payload is used directly; any other payload gives ``("", 0.0)``.
    The price is the first truthy value among the ``c``, ``p`` and
    ``lastPrice`` fields. A symbol ending in ``USDT`` is rewritten as
    ``BASE/USDT``, and the symbol is upper-cased.

    Raises whatever ``json.loads`` raises for malformed input.
    """
    payload = json.loads(raw)
    data: dict = {}
    if isinstance(payload, dict):
        inner = payload.get("data")
        data = inner if isinstance(inner, dict) else payload

    raw_price = data.get("c") or data.get("p") or data.get("lastPrice")
    symbol = str(data.get("s") or "")
    if symbol.endswith("USDT"):
        symbol = symbol[:-4] + "/USDT"
    return symbol.upper(), _safe_float(raw_price)
|
|
|
|
|
|
def _is_transient_ws_error(exc: BaseException) -> bool:
    """Tell whether *exc* looks like a recoverable websocket/network hiccup.

    True for websocket close exceptions, timeouts and ``OSError``; otherwise
    True when the lower-cased exception message contains one of the known
    disconnect phrases.
    """
    transient_types = (
        ConnectionClosed,
        ConnectionClosedError,
        ConnectionClosedOK,
        TimeoutError,
        asyncio.TimeoutError,
        OSError,
    )
    if isinstance(exc, transient_types):
        return True

    text = str(exc).lower()
    phrases = (
        "keepalive ping timeout",
        "no close frame received",
        "connection closed",
        "connection reset",
        "temporarily unavailable",
        "timed out",
    )
    for phrase in phrases:
        if phrase in text:
            return True
    return False
|
|
|
|
|
|
def _record_transient_disconnect(exc: BaseException, cfg: dict, symbols_count: int) -> None:
    """Count a transient disconnect and record a rate-limited warning for it.

    The module-level disconnect counter is always incremented. The warning is
    skipped when one was already recorded less than
    ``cfg["transient_log_interval_seconds"]`` (default 900) ago, measured on
    the running event loop's clock. Without a running loop, no throttling is
    applied. Failures while recording are deliberately ignored so logging can
    never break the reconnect path.
    """
    global _LAST_TRANSIENT_LOG_AT, _TRANSIENT_DISCONNECT_COUNT
    _TRANSIENT_DISCONNECT_COUNT += 1

    try:
        current = asyncio.get_running_loop().time()
    except Exception:
        current = 0.0

    interval = max(0.0, _safe_float(cfg.get("transient_log_interval_seconds"), 900))
    throttled = (
        interval > 0
        and _LAST_TRANSIENT_LOG_AT
        and current
        and current - _LAST_TRANSIENT_LOG_AT < interval
    )
    if throttled:
        return

    _LAST_TRANSIENT_LOG_AT = current or _LAST_TRANSIENT_LOG_AT
    details = {
        "symbols_count": symbols_count,
        "disconnect_count_since_start": _TRANSIENT_DISCONNECT_COUNT,
        "reconnect_delay_seconds": cfg.get("reconnect_delay_seconds"),
    }
    try:
        record_system_error(
            source="price_streamer",
            level="warning",
            error_type=exc.__class__.__name__,
            message=f"websocket transient disconnect: {exc}",
            status_code=0,
            context=details,
            fingerprint="price_streamer_transient_disconnect",
        )
    except Exception:
        pass
|
|
|
|
|
|
async def run_forever() -> None:
    """Run the price streamer loop; never returns normally.

    Initialises the database, then repeatedly loads the stream targets, opens
    one combined websocket for their symbols and feeds every usable tick to
    ``handle_price_tick``. Any exception tears the connection down, is
    reported (as a throttled warning when transient, otherwise through
    ``record_exception``) and is followed by a reconnect after a delay.

    When the streamer is disabled in config, the coroutine just sleeps
    forever; the config is not re-read in that state.
    """
    init_db()
    cfg = price_streamer_config()
    if not cfg.get("enabled", True):
        print("[price-streamer] disabled by system_config.price_streamer", flush=True)
        # Idle loop: keeps the coroutine alive without ever connecting.
        while True:
            await asyncio.sleep(max(1.0, _safe_float(cfg.get("idle_sleep_seconds"), 5)))

    # Timing/logging knobs are read once from the startup config; the per-cycle
    # config reload below does not update them.
    refresh_seconds = max(5.0, _safe_float(cfg.get("refresh_symbols_seconds"), 20))
    idle_sleep = max(1.0, _safe_float(cfg.get("idle_sleep_seconds"), 5))
    reconnect_delay = max(1.0, _safe_float(cfg.get("reconnect_delay_seconds"), 5))
    log_every = max(1, _safe_int(cfg.get("log_every_events"), 100))

    while True:
        # Reset before the try so the except handler always has a value.
        symbols_count = 0
        try:
            cfg = price_streamer_config()
            targets = load_stream_targets(cfg=cfg)
            symbols = sorted(targets.keys())
            symbols_count = len(symbols)
            if not symbols:
                print("[price-streamer] no active symbols, sleeping", flush=True)
                await asyncio.sleep(idle_sleep)
                continue
            url = _stream_url(symbols, cfg)
            print(f"[price-streamer] connecting symbols={len(symbols)}", flush=True)
            last_refresh = asyncio.get_running_loop().time()
            count = 0
            async with websockets.connect(
                url,
                ping_interval=max(5.0, _safe_float(cfg.get("ping_interval_seconds"), 30)),
                ping_timeout=max(5.0, _safe_float(cfg.get("ping_timeout_seconds"), 60)),
                close_timeout=max(1.0, _safe_float(cfg.get("close_timeout_seconds"), 5)),
            ) as ws:
                async for raw in ws:
                    symbol, price = _parse_ticker_message(raw)
                    # NOTE(review): nesting of the counter/log lines under this
                    # guard is assumed - confirm against the original layout.
                    if symbol and price > 0:
                        handle_price_tick(symbol, price, targets, event_time=_now(), cfg=cfg)
                        count += 1
                        if count % log_every == 0:
                            print(f"[price-streamer] ticks={count} latest={symbol} {price}", flush=True)
                    # Periodically reload targets; a changed symbol set needs a
                    # new connection because the stream list is part of the URL.
                    if asyncio.get_running_loop().time() - last_refresh >= refresh_seconds:
                        new_targets = load_stream_targets(cfg=cfg)
                        if sorted(new_targets.keys()) != symbols:
                            print("[price-streamer] symbol set changed, reconnecting", flush=True)
                            break
                        targets = new_targets
                        last_refresh = asyncio.get_running_loop().time()
        except Exception as exc:
            if _is_transient_ws_error(exc):
                print(f"[price-streamer] websocket disconnected, reconnecting: {exc}", flush=True)
                _record_transient_disconnect(exc, cfg, symbols_count)
            else:
                print(f"[price-streamer] error: {exc}", flush=True)
                try:
                    record_exception(exc, source="price_streamer")
                except Exception:
                    # Error reporting is best-effort; never let it stop the loop.
                    pass
            # NOTE(review): placing this back-off inside the except handler
            # (rather than at the end of every loop pass) is assumed - confirm.
            await asyncio.sleep(reconnect_delay)
|
|
|
|
|
|
def main():
    """Synchronous entry point: drive ``run_forever`` on a fresh event loop."""
    streamer = run_forever()
    asyncio.run(streamer)
|
|
|
|
|
|
# Names exported by `from <module> import *`; underscore-prefixed helpers are
# intentionally left out.
__all__ = [
    "handle_price_tick",
    "load_stream_targets",
    "main",
    "run_forever",
]
|
|
|
|
|
|
# Start the streamer when this file is executed directly as a script.
if __name__ == "__main__":
    main()
|