alphax/app/services/price_streamer.py
2026-05-16 23:54:43 +08:00

195 lines
7.1 KiB
Python

"""Realtime Binance websocket price stream for paper trading.
The stream is intentionally separate from the cron tracker. Websocket ticks
drive paper-trading TP/SL decisions quickly, while cron remains a fallback.
"""
from __future__ import annotations
import asyncio
import json
from datetime import datetime
import websockets
from app.config.system_config import price_streamer_config
from app.db.altcoin_db import init_db, update_latest_price_cache
from app.db.paper_trading import sync_recommendation
from app.db.recommendation_queries import get_active_recommendations_deduped
from app.db.schema import get_conn
from app.db.system_logs import record_exception
def _now() -> str:
return datetime.now().isoformat()
def _safe_float(value, default=0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except Exception:
return default
def _safe_int(value, default=0) -> int:
try:
return int(value or 0)
except Exception:
return default
def _stream_name(symbol: str) -> str:
base = str(symbol or "").replace("/", "").lower()
return f"{base}@ticker" if base else ""
def _stream_url(symbols: list[str], cfg: dict | None = None) -> str:
cfg = cfg or price_streamer_config()
base_url = str(cfg.get("stream_url") or "wss://stream.binance.com:9443/stream").rstrip("/")
streams = "/".join(_stream_name(s) for s in symbols if _stream_name(s))
return f"{base_url}?streams={streams}"
def _load_open_paper_trade_recs() -> list[dict]:
conn = get_conn()
try:
rows = conn.execute(
"""
SELECT
p.recommendation_id AS id,
p.symbol,
p.entry_price,
p.stop_loss,
p.tp1,
p.tp2,
p.source_status AS execution_status,
p.source_action AS action_status,
p.strategy_version
FROM paper_trades p
WHERE p.status='open'
ORDER BY p.opened_at DESC, p.id DESC
"""
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def load_stream_targets(limit: int | None = None, cfg: dict | None = None) -> dict[str, dict]:
"""Return symbol -> recommendation-like payload for websocket updates."""
cfg = cfg or price_streamer_config()
max_symbols = max(1, _safe_int(limit or cfg.get("max_stream_symbols"), 200))
targets: dict[str, dict] = {}
if cfg.get("include_actionable_recommendations", True):
for rec in get_active_recommendations_deduped(actionable_only=True, limit=max_symbols, with_meta=False):
symbol = str(rec.get("symbol") or "").strip().upper()
if symbol:
targets[symbol] = dict(rec)
if cfg.get("include_open_paper_trades", True):
for rec in _load_open_paper_trade_recs():
symbol = str(rec.get("symbol") or "").strip().upper()
if symbol:
targets.setdefault(symbol, rec)
return dict(list(targets.items())[:max_symbols])
def handle_price_tick(symbol: str, price: float, targets: dict[str, dict], event_time: str | None = None, cfg: dict | None = None) -> dict:
cfg = cfg or price_streamer_config()
symbol = str(symbol or "").strip().upper()
price = _safe_float(price)
if not symbol or price <= 0:
return {"skipped": True, "reason": "invalid_tick"}
event_time = event_time or _now()
if cfg.get("update_latest_price_cache", True):
update_latest_price_cache(symbol, price, updated_at=event_time, source="price_streamer")
if not cfg.get("sync_paper_trading", True):
return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "disabled_by_streamer"}}
rec = targets.get(symbol)
if not rec:
return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "no_target"}}
result = sync_recommendation(rec, price, event_time=event_time)
return {"updated_price": True, "paper_trading": result}
def _parse_ticker_message(raw: str) -> tuple[str, float]:
payload = json.loads(raw)
data = payload.get("data") if isinstance(payload, dict) else None
if not isinstance(data, dict):
data = payload if isinstance(payload, dict) else {}
symbol = str(data.get("s") or "")
price = _safe_float(data.get("c") or data.get("p") or data.get("lastPrice"))
if symbol.endswith("USDT"):
symbol = f"{symbol[:-4]}/USDT"
return symbol.upper(), price
async def run_forever() -> None:
init_db()
cfg = price_streamer_config()
if not cfg.get("enabled", True):
print("[price-streamer] disabled by system_config.price_streamer", flush=True)
while True:
await asyncio.sleep(max(1.0, _safe_float(cfg.get("idle_sleep_seconds"), 5)))
refresh_seconds = max(5.0, _safe_float(cfg.get("refresh_symbols_seconds"), 20))
idle_sleep = max(1.0, _safe_float(cfg.get("idle_sleep_seconds"), 5))
reconnect_delay = max(1.0, _safe_float(cfg.get("reconnect_delay_seconds"), 5))
log_every = max(1, _safe_int(cfg.get("log_every_events"), 100))
while True:
try:
cfg = price_streamer_config()
targets = load_stream_targets(cfg=cfg)
symbols = sorted(targets.keys())
if not symbols:
print("[price-streamer] no active symbols, sleeping", flush=True)
await asyncio.sleep(idle_sleep)
continue
url = _stream_url(symbols, cfg)
print(f"[price-streamer] connecting symbols={len(symbols)}", flush=True)
last_refresh = asyncio.get_running_loop().time()
count = 0
async with websockets.connect(url, ping_interval=20, ping_timeout=20, close_timeout=5) as ws:
async for raw in ws:
symbol, price = _parse_ticker_message(raw)
if symbol and price > 0:
handle_price_tick(symbol, price, targets, event_time=_now(), cfg=cfg)
count += 1
if count % log_every == 0:
print(f"[price-streamer] ticks={count} latest={symbol} {price}", flush=True)
if asyncio.get_running_loop().time() - last_refresh >= refresh_seconds:
new_targets = load_stream_targets(cfg=cfg)
if sorted(new_targets.keys()) != symbols:
print("[price-streamer] symbol set changed, reconnecting", flush=True)
break
targets = new_targets
last_refresh = asyncio.get_running_loop().time()
except Exception as exc:
print(f"[price-streamer] error: {exc}", flush=True)
try:
record_exception(exc, source="price_streamer")
except Exception:
pass
await asyncio.sleep(reconnect_delay)
def main():
asyncio.run(run_forever())
__all__ = [
"handle_price_tick",
"load_stream_targets",
"main",
"run_forever",
]
if __name__ == "__main__":
main()