"""Live monitor for the current Polymarket BTC up/down 5-minute market.

``LiveMonitor`` consumes a BTC price stream, keeps the current market's Gamma
metadata and CLOB order books (stream first, ``fetch_book`` as a fallback)
fresh, derives an edge snapshot, records every sample to DuckDB and publishes
the resulting ``LiveState`` to subscriber queues.
"""

from __future__ import annotations

import asyncio
from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import aiohttp

from .clob import fetch_book
from .clob_ws import ClobBookState, clob_market_stream
from .duckstore import DuckStore
from .gamma import fetch_event_by_slug
from .models import BookSnapshot, EdgeSnapshot, MarketInfo, PriceTick
from .prices import resilient_btc_price_stream
from .strategy import build_edge_snapshot
from .tick_buffer import BoundaryPrice, TickBuffer
from .volatility import RollingVolatility

# Tick ``source`` value of the Polymarket RTDS Chainlink feed.
_RTDS_SOURCE = "polymarket_rtds_chainlink"
# How many entries LiveState keeps in ``events`` / ``errors``.
_MAX_EVENTS = 80
_MAX_ERRORS = 20
# An error older than this no longer counts as "recent" in the health summary.
_RECENT_ERROR_WINDOW_S = 300.0


@dataclass
class LiveState:
    """Mutable snapshot of everything the live monitor currently knows.

    The monitor mutates this in place; ``as_dict`` renders it into the plain
    dict that is published to subscribers and written to DuckDB.
    """

    running: bool = False
    market: MarketInfo | None = None
    # Where market.price_to_beat came from: "none" until a market is loaded,
    # then "gamma_priceToBeat", "rtds_boundary" or "proxy_start".
    benchmark_source: str = "none"
    latest_tick: PriceTick | None = None
    up_book: BookSnapshot | None = None
    down_book: BookSnapshot | None = None
    edge: EdgeSnapshot | None = None
    # Serialized BoundaryPrice (see _boundary_dict) near market start / end.
    start_boundary: dict[str, Any] | None = None
    final_boundary: dict[str, Any] | None = None
    # Local receive time minus the source timestamp of the latest RTDS tick.
    rtds_lag_ms: int | None = None
    # Copied from the ClobBookState on every book refresh.
    clob_book_age_ms: int | None = None
    clob_last_event_type: str | None = None
    is_trusted_market: bool = False
    samples_written: int = 0
    last_sample_written_at: datetime | None = None
    recorder_path: str | None = None
    # Newest _MAX_ERRORS error strings, each prefixed with an ISO timestamp.
    errors: list[str] = field(default_factory=list)
    # Newest _MAX_EVENTS events as {"at", "level", "message"} dicts.
    events: list[dict[str, Any]] = field(default_factory=list)
    # Local time at which the latest price tick was ingested.
    updated_at: datetime | None = None
    # Time of the most recent add_error call; drives the "recent error" flag.
    last_error_at: datetime | None = None

    def add_event(self, message: str, *, level: str = "info") -> None:
        """Append a timestamped event, keeping only the newest ``_MAX_EVENTS``."""
        self.events.append(
            {
                "at": datetime.now(timezone.utc).isoformat(),
                "level": level,
                "message": message,
            }
        )
        self.events = self.events[-_MAX_EVENTS:]

    def add_error(self, message: str) -> None:
        """Record an error in ``errors`` (newest ``_MAX_ERRORS``) and as an event."""
        now = datetime.now(timezone.utc)
        self.errors.append(f"{now.isoformat()} {message}")
        self.errors = self.errors[-_MAX_ERRORS:]
        self.last_error_at = now
        self.add_event(message, level="error")

    def as_dict(self) -> dict[str, Any]:
        """Render the state as a plain dict, including a derived health summary."""
        now = datetime.now(timezone.utc)
        state_age_ms = _age_ms(now, self.updated_at)
        db_write_age_ms = _age_ms(now, self.last_sample_written_at)
        # Only errors inside the recency window degrade health; without this
        # the first transient error would pin the status to "warn" forever.
        # (Errors appended without add_error have no timestamp -> count them.)
        has_recent_error = bool(self.errors) and (
            self.last_error_at is None
            or (now - self.last_error_at).total_seconds() <= _RECENT_ERROR_WINDOW_S
        )
        return {
            "running": self.running,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
            "state_age_ms": state_age_ms,
            "benchmark_source": self.benchmark_source,
            "market": _market_dict(self.market),
            "tick": _tick_dict(self.latest_tick),
            "books": {
                "up": _book_dict(self.up_book),
                "down": _book_dict(self.down_book),
            },
            "edge": self.edge.as_dict() if self.edge else None,
            "start_boundary": self.start_boundary,
            "final_boundary": self.final_boundary,
            "rtds_lag_ms": self.rtds_lag_ms,
            "clob_book_age_ms": self.clob_book_age_ms,
            "clob_last_event_type": self.clob_last_event_type,
            "is_trusted_market": self.is_trusted_market,
            "samples_written": self.samples_written,
            "last_sample_written_at": (
                self.last_sample_written_at.isoformat()
                if self.last_sample_written_at
                else None
            ),
            "db_write_age_ms": db_write_age_ms,
            "health": _health_dict(
                running=self.running,
                state_age_ms=state_age_ms,
                rtds_lag_ms=self.rtds_lag_ms,
                clob_book_age_ms=self.clob_book_age_ms,
                db_write_age_ms=db_write_age_ms,
                has_recent_error=has_recent_error,
            ),
            "recorder_path": self.recorder_path,
            "errors": self.errors,
            "events": self.events,
        }


class LiveMonitor:
    """Supervised background task that keeps a ``LiveState`` up to date.

    Args:
        book_poll_s: Minimum seconds between order-book refreshes.
        market_refresh_s: Minimum seconds between market (slug) refreshes.
        publish_interval_s: Minimum seconds between subscriber publishes.
        stale_restart_s: If no price tick arrives within this many seconds the
            current run is torn down and restarted by the supervisor.
        duckdb_path: DuckDB file used for observations and book events.
    """

    def __init__(
        self,
        *,
        book_poll_s: float = 1.0,
        market_refresh_s: float = 5.0,
        publish_interval_s: float = 0.5,
        stale_restart_s: float = 20.0,
        duckdb_path: str = "data/updown.duckdb",
    ) -> None:
        self.book_poll_s = book_poll_s
        self.market_refresh_s = market_refresh_s
        self.publish_interval_s = publish_interval_s
        self.stale_restart_s = stale_restart_s
        self.state = LiveState()
        self._duck = DuckStore(Path(duckdb_path))
        self.state.recorder_path = duckdb_path
        self._task: asyncio.Task[None] | None = None
        self._subscribers: set[asyncio.Queue[dict[str, Any]]] = set()
        # Per-slug caches of substitute benchmarks / recorded boundary ticks.
        self._proxy_benchmarks: dict[str, float] = {}
        self._rtds_boundary_benchmarks: dict[str, float] = {}
        self._tick_buffer = TickBuffer()
        self._start_boundaries: dict[str, BoundaryPrice] = {}
        self._final_boundaries: dict[str, BoundaryPrice] = {}
        self._clob_state = ClobBookState()
        self._clob_task: asyncio.Task[None] | None = None

    def start(self) -> None:
        """Start the supervisor task unless one is already running."""
        if self._task is not None and not self._task.done():
            return
        self.state.running = True
        self._task = asyncio.create_task(self._supervise())

    async def stop(self) -> None:
        """Stop the supervisor and the CLOB stream, then close DuckDB.

        DuckDB is closed even if shutting the tasks down raises.
        """
        self.state.running = False
        try:
            if self._task:
                self._task.cancel()
                try:
                    await self._task
                except asyncio.CancelledError:
                    pass
            await self._stop_clob_task()
        finally:
            self._duck.close()

    def subscribe(self) -> asyncio.Queue[dict[str, Any]]:
        """Return a new bounded queue (maxsize 5) primed with the current state."""
        queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=5)
        self._subscribers.add(queue)
        queue.put_nowait(self.state.as_dict())
        return queue

    def unsubscribe(self, queue: asyncio.Queue[dict[str, Any]]) -> None:
        """Stop publishing to *queue*; unknown queues are ignored."""
        self._subscribers.discard(queue)

    async def _publish(self) -> None:
        """Push the current state to every subscriber.

        A full queue has its oldest snapshot dropped first, so a slow consumer
        never blocks the monitor loop.
        """
        payload = self.state.as_dict()
        for queue in list(self._subscribers):
            if queue.full():
                try:
                    queue.get_nowait()
                except asyncio.QueueEmpty:
                    pass
            # Room was just made (no await in between), so this cannot fail.
            queue.put_nowait(payload)

    async def _supervise(self) -> None:
        """Re-run ``_run_once`` after any failure until ``state.running`` is cleared."""
        while self.state.running:
            try:
                await self._run_once()
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                self.state.add_error(f"monitor restarted after failure: {exc}")
                await asyncio.sleep(1)

    async def _run_once(self) -> None:
        """Run one HTTP session: consume ticks and refresh market/books/edge/records.

        Raises ``RuntimeError("price stream stale")`` when no tick arrives within
        ``stale_restart_s``; other failures propagate to ``_supervise``. The CLOB
        stream task and the price stream are shut down before the session is
        closed, so a restarted run never keeps using objects bound to a closed
        session.
        """
        timeout = aiohttp.ClientTimeout(total=10)
        vol = RollingVolatility()
        loop = asyncio.get_running_loop()
        next_market_refresh = 0.0
        next_book_poll = 0.0
        next_publish = 0.0
        async with aiohttp.ClientSession(timeout=timeout) as session:
            price_stream = resilient_btc_price_stream(session)
            try:
                while self.state.running:
                    try:
                        tick = await asyncio.wait_for(
                            anext(price_stream),
                            timeout=self.stale_restart_s,
                        )
                    # asyncio.TimeoutError is distinct from the builtin on 3.10
                    # and an alias of it on 3.11+, so this catches both.
                    except asyncio.TimeoutError as exc:
                        raise RuntimeError("price stream stale") from exc
                    self._ingest_tick(tick)

                    if self.state.market and self.state.benchmark_source == "proxy_start":
                        # Try to upgrade a proxy benchmark as soon as possible.
                        # NOTE(review): this may refetch Gamma on every RTDS tick
                        # while the benchmark stays "proxy_start".
                        await self._refresh_market(session, tick)

                    vol.add(tick)
                    now = loop.time()
                    if now >= next_market_refresh:
                        await self._refresh_market(session, tick)
                        await self._ensure_clob_stream(session)
                        next_market_refresh = now + self.market_refresh_s
                    if self.state.market and now >= next_book_poll:
                        await self._refresh_books(session)
                        next_book_poll = now + self.book_poll_s
                    self._refresh_edge(tick, vol)
                    self._record_sample()
                    if now >= next_publish:
                        await self._publish()
                        next_publish = now + self.publish_interval_s
            finally:
                await self._stop_clob_task()
                # Best-effort cleanup; must not mask the exception that ended the run.
                with suppress(Exception):
                    await price_stream.aclose()

    def _ingest_tick(self, tick: PriceTick) -> None:
        """Store *tick* as the latest tick; buffer RTDS ticks and update their lag."""
        received = datetime.now(timezone.utc)
        self.state.latest_tick = tick
        self.state.updated_at = received
        if tick.source != _RTDS_SOURCE:
            return
        self._tick_buffer.add(tick)
        if tick.source_timestamp_ms is not None:
            self.state.rtds_lag_ms = (
                int(received.timestamp() * 1000) - tick.source_timestamp_ms
            )

    async def _refresh_market(self, session: aiohttp.ClientSession, tick: PriceTick) -> None:
        """Keep ``state.market`` on the current 5-minute slug.

        Loads the market when the slug rolled over (or none is loaded yet). It
        also reloads the same slug while its benchmark is missing, or while the
        benchmark is still the ``proxy_start`` fallback and *tick* is an RTDS
        tick that might allow an upgrade.
        """
        slug = current_btc_updown_slug()
        market = self.state.market
        if market is None or market.event_slug != slug:
            await self._load_market(session, slug, tick)
            return
        benchmark_missing = market.price_to_beat is None
        can_upgrade_proxy = (
            self.state.benchmark_source == "proxy_start" and tick.source == _RTDS_SOURCE
        )
        if benchmark_missing or can_upgrade_proxy:
            await self._load_market(session, slug, tick)

    async def _ensure_clob_stream(self, session: aiohttp.ClientSession) -> None:
        """(Re)start the CLOB stream when a market is loaded but no stream is alive.

        This covers a stream task that ended or crashed on its own. It also
        covers a supervisor restart that kept the same market, since
        ``_load_market`` only restarts the stream on a slug change.
        """
        market = self.state.market
        if market is None:
            return
        if self._clob_task is not None and not self._clob_task.done():
            return
        self.state.add_event(f"restarting CLOB stream for {market.event_slug}")
        await self._restart_clob_stream(session, market)

    async def _load_market(
        self,
        session: aiohttp.ClientSession,
        slug: str,
        tick: PriceTick,
    ) -> None:
        """Fetch *slug* from Gamma and install it as the current market.

        On a slug change the per-market state is reset and the CLOB stream is
        restarted for the new tokens. When Gamma has no ``price_to_beat``, a
        substitute benchmark is used (see ``_benchmark_from_market``). A failed
        fetch is logged and leaves the current market untouched.
        """
        try:
            market = await fetch_event_by_slug(session, slug)
        except Exception as exc:
            self.state.add_error(f"market fetch failed for {slug}: {exc}")
            return

        old_slug = self.state.market.event_slug if self.state.market else None
        if old_slug != market.event_slug:
            self.state.add_event(f"switched to {market.event_slug}")
            self.state.up_book = None
            self.state.down_book = None
            self.state.edge = None
            self.state.start_boundary = None
            self.state.final_boundary = None
            self.state.is_trusted_market = False
            await self._restart_clob_stream(session, market)

        if market.price_to_beat is None:
            benchmark, source = self._benchmark_from_market(market, tick)
            market = MarketInfo(
                event_slug=market.event_slug,
                title=market.title,
                market_id=market.market_id,
                condition_id=market.condition_id,
                start_time=market.start_time,
                end_time=market.end_time,
                price_to_beat=benchmark,
                up_token_id=market.up_token_id,
                down_token_id=market.down_token_id,
                accepting_orders=market.accepting_orders,
                enable_order_book=market.enable_order_book,
                closed=market.closed,
                best_bid=market.best_bid,
                best_ask=market.best_ask,
            )
            self.state.benchmark_source = source
        else:
            self.state.benchmark_source = "gamma_priceToBeat"

        self.state.market = market
        self._refresh_boundary_state(market)

    def _benchmark_from_market(self, market: MarketInfo, tick: PriceTick) -> tuple[float, str]:
        """Pick a substitute ``price_to_beat`` when Gamma does not provide one.

        Returns ``(price, "rtds_boundary")`` when the tick buffer yields a tick
        for the market's start time (the boundary is remembered per slug).
        Otherwise it returns ``(price, "proxy_start")``. The proxy price is the
        tick price from the first fallback call for this slug, then reused.
        """
        if market.start_time is not None:
            target_ms = int(market.start_time.timestamp() * 1000)
            boundary = self._tick_buffer.nearest(target_ms)
            if boundary is not None:
                self._start_boundaries[market.event_slug] = boundary
                self._rtds_boundary_benchmarks[market.event_slug] = boundary.price
                return boundary.price, "rtds_boundary"
        benchmark = self._proxy_benchmarks.setdefault(market.event_slug, tick.price)
        return benchmark, "proxy_start"

    def _refresh_boundary_state(self, market: MarketInfo) -> None:
        """Publish the recorded start/final boundary ticks and the trust flag."""
        start = self._start_boundaries.get(market.event_slug)
        if start is not None:
            self.state.start_boundary = _boundary_dict(start)
        if market.end_time is not None:
            # NOTE(review): this is also queried before end_time has passed;
            # confirm TickBuffer.nearest rejects ticks far from the target.
            final = self._tick_buffer.nearest(int(market.end_time.timestamp() * 1000))
            if final is not None:
                self._final_boundaries[market.event_slug] = final
                self.state.final_boundary = _boundary_dict(final)
        # NOTE(review): a "gamma_priceToBeat" market only gets a start boundary
        # if _benchmark_from_market ran for its slug earlier -- confirm intended.
        self.state.is_trusted_market = (
            self.state.benchmark_source in {"gamma_priceToBeat", "rtds_boundary"}
            and self.state.start_boundary is not None
        )

    async def _refresh_books(self, session: aiohttp.ClientSession) -> None:
        """Update both books, preferring the CLOB stream state.

        ``fetch_book`` is only called when the stream has no book yet for one
        of the two tokens. A failed fetch yields ``None`` for that side.
        """
        market = self.state.market
        if market is None:
            return
        books = self._clob_state.books
        up = books.get(market.up_token_id)
        down = books.get(market.down_token_id)
        self.state.clob_book_age_ms = self._clob_state.age_ms
        self.state.clob_last_event_type = self._clob_state.last_event_type
        if up is None or down is None:
            rest_up, rest_down = await asyncio.gather(
                _safe_book(session, market.up_token_id),
                _safe_book(session, market.down_token_id),
            )
            if up is None:
                up = rest_up
            if down is None:
                down = rest_down
        self.state.up_book = up
        self.state.down_book = down

    async def _restart_clob_stream(
        self,
        session: aiohttp.ClientSession,
        market: MarketInfo,
    ) -> None:
        """Replace any running CLOB stream with a fresh one for *market*'s tokens."""
        await self._stop_clob_task()
        self._clob_state = ClobBookState()
        self._clob_task = asyncio.create_task(
            clob_market_stream(
                session,
                asset_ids=[market.up_token_id, market.down_token_id],
                state=self._clob_state,
                on_event=self._record_book_event,
            )
        )

    async def _stop_clob_task(self) -> None:
        """Cancel and await the CLOB stream task, if any, and forget it.

        A task that already died with an exception re-raises it when awaited.
        That exception is logged rather than propagated, so a crashed stream
        cannot block a market switch, a run teardown or ``stop()``.
        """
        task, self._clob_task = self._clob_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            self.state.add_error(f"clob stream failed: {exc}")

    def _record_book_event(self, payload: dict[str, Any]) -> None:
        """Persist one raw CLOB stream event; write failures are only logged."""
        try:
            self._duck.write_book_event(
                payload,
                received_at=datetime.now(timezone.utc).isoformat(),
            )
        except Exception as exc:
            self.state.add_error(f"duckdb book write failed: {exc}")

    def _refresh_edge(self, tick: PriceTick, vol: RollingVolatility) -> None:
        """Recompute the edge snapshot once a benchmark and a volatility estimate exist."""
        market = self.state.market
        sigma = vol.sigma_price_per_sqrt_second
        if market is None or market.price_to_beat is None or sigma is None:
            return
        try:
            self.state.edge = build_edge_snapshot(
                market=market,
                tick=tick,
                sigma_price_per_sqrt_second=sigma,
                up_book=self.state.up_book,
                down_book=self.state.down_book,
            )
        except Exception as exc:
            self.state.add_error(f"edge refresh failed: {exc}")

    def _record_sample(self) -> None:
        """Write the current state as one observation row (needs market + tick)."""
        if self.state.market is None or self.state.latest_tick is None:
            return
        payload = self.state.as_dict()
        payload["recorded_at"] = datetime.now(timezone.utc).isoformat()
        try:
            self._duck.write_observation(payload)
            self.state.samples_written += 1
            self.state.last_sample_written_at = datetime.now(timezone.utc)
        except Exception as exc:
            self.state.add_error(f"sample write failed: {exc}")


async def _safe_book(
    session: aiohttp.ClientSession,
    token_id: str,
) -> BookSnapshot | None:
    """Fetch one book via ``fetch_book``; any failure deliberately yields ``None``."""
    try:
        return await fetch_book(session, token_id)
    except Exception:
        return None


def current_btc_updown_slug(now: datetime | None = None) -> str:
    """Return the slug of the BTC up/down 5-minute market containing *now*.

    *now* defaults to the current UTC time. The slug suffix is the Unix
    timestamp of the period start, aligned down to a multiple of 300 s.
    """
    moment = now or datetime.now(timezone.utc)
    timestamp = int(moment.timestamp())
    start = timestamp - (timestamp % 300)
    return f"btc-updown-5m-{start}"


def _age_ms(now: datetime, then: datetime | None) -> int | None:
    """Milliseconds elapsed from *then* to *now*, or ``None`` if *then* is unset."""
    if then is None:
        return None
    return int((now - then).total_seconds() * 1000)


def _market_dict(market: MarketInfo | None) -> dict[str, Any] | None:
    """Serialize a MarketInfo for publishing / recording."""
    if market is None:
        return None
    return {
        "event_slug": market.event_slug,
        "title": market.title,
        "market_id": market.market_id,
        "condition_id": market.condition_id,
        "start_time": market.start_time.isoformat() if market.start_time else None,
        "end_time": market.end_time.isoformat() if market.end_time else None,
        "seconds_remaining": market.seconds_remaining,
        "price_to_beat": market.price_to_beat,
        "up_token_id": market.up_token_id,
        "down_token_id": market.down_token_id,
        "accepting_orders": market.accepting_orders,
        "enable_order_book": market.enable_order_book,
        "closed": market.closed,
        "best_bid": market.best_bid,
        "best_ask": market.best_ask,
    }


def _tick_dict(tick: PriceTick | None) -> dict[str, Any] | None:
    """Serialize a PriceTick for publishing / recording."""
    if tick is None:
        return None
    return {
        "source": tick.source,
        "symbol": tick.symbol,
        "price": tick.price,
        "received_at": tick.received_at.isoformat(),
        "source_timestamp_ms": tick.source_timestamp_ms,
    }


def _book_dict(book: BookSnapshot | None) -> dict[str, Any] | None:
    """Serialize a BookSnapshot, keeping only the first 8 levels per side."""
    if book is None:
        return None
    return {
        "token_id": book.token_id,
        "best_bid": book.best_bid,
        "best_ask": book.best_ask,
        "bids": [{"price": item.price, "size": item.size} for item in book.bids[:8]],
        "asks": [{"price": item.price, "size": item.size} for item in book.asks[:8]],
    }


def _boundary_dict(boundary: BoundaryPrice) -> dict[str, Any]:
    """Serialize a BoundaryPrice for publishing / recording."""
    return {
        "price": boundary.price,
        "tick_timestamp_ms": boundary.tick_timestamp_ms,
        "target_timestamp_ms": boundary.target_timestamp_ms,
        "offset_ms": boundary.offset_ms,
        "source": boundary.source,
    }


def _health_dict(
    *,
    running: bool,
    state_age_ms: int | None,
    rtds_lag_ms: int | None,
    clob_book_age_ms: int | None,
    db_write_age_ms: int | None,
    has_recent_error: bool,
) -> dict[str, Any]:
    """Summarize monitor health as ``{"status", "label", "issues"}``.

    Each check may raise the overall severity (0 = ok, 1 = warn, 2 = bad) and
    add a (Chinese, user-facing) issue message. At most 5 issues are returned.
    """
    issues: list[str] = []
    severity = 0

    def add(level: int, message: str) -> None:
        nonlocal severity
        severity = max(severity, level)
        issues.append(message)

    # Collector not running.
    if not running:
        add(2, "采集未运行")

    # Freshness of the last ingested price tick: none yet / stalled / slow.
    if state_age_ms is None:
        add(1, "尚未收到价格 tick")
    elif state_age_ms > 20_000:
        add(2, f"状态已停滞 {state_age_ms} ms")
    elif state_age_ms > 5_000:
        add(1, f"状态刷新偏慢 {state_age_ms} ms")

    # RTDS lag: unknown / too high / elevated.
    if rtds_lag_ms is None:
        add(1, "RTDS 延迟未知")
    elif rtds_lag_ms > 30_000:
        add(2, f"RTDS 延迟过高 {rtds_lag_ms} ms")
    elif rtds_lag_ms > 10_000:
        add(1, f"RTDS 延迟偏高 {rtds_lag_ms} ms")

    # CLOB book age: not arrived / stalled / stale.
    if clob_book_age_ms is None:
        add(1, "CLOB 盘口尚未到达")
    elif clob_book_age_ms > 10_000:
        add(2, f"CLOB 盘口停滞 {clob_book_age_ms} ms")
    elif clob_book_age_ms > 3_000:
        add(1, f"CLOB 盘口偏旧 {clob_book_age_ms} ms")

    # DuckDB write age: nothing written / stalled / slow.
    if db_write_age_ms is None:
        add(1, "DuckDB 尚未写入")
    elif db_write_age_ms > 20_000:
        add(2, f"DuckDB 写入停滞 {db_write_age_ms} ms")
    elif db_write_age_ms > 5_000:
        add(1, f"DuckDB 写入偏慢 {db_write_age_ms} ms")

    # Recent errors in the log.
    if has_recent_error:
        add(1, "近期有错误日志")

    labels = {
        0: ("ok", "正常"),
        1: ("warn", "注意"),
        2: ("bad", "异常"),
    }
    status, label = labels[severity]
    return {
        "status": status,
        "label": label,
        "issues": issues[:5],
    }