commit 055f47d88c296cfdedb31b97ae1181bd6807e9c3 Author: aaron <> Date: Fri May 22 00:38:22 2026 +0800 first commit diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..3e4b8c8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,9 @@ +.git +.venv +.pytest_cache +__pycache__ +*.pyc +data/*.jsonl +data/*.duckdb +data/*.lock +*.log diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c306c8c --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.venv/ +__pycache__/ +.pytest_cache/ +*.py[cod] +data/*.jsonl +data/*.duckdb +data/*.lock +data/*.log +*.log diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4356c1d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 + +WORKDIR /app + +COPY pyproject.toml README.md ./ +COPY src ./src + +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir . + +RUN useradd --create-home --shell /usr/sbin/nologin appuser \ + && mkdir -p /app/data \ + && chown -R appuser:appuser /app + +USER appuser + +EXPOSE 8765 + +CMD ["updown-dashboard", "--host", "0.0.0.0", "--port", "8765"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..1ae9853 --- /dev/null +++ b/README.md @@ -0,0 +1,98 @@ +# Polymarket BTC Up/Down Lab + +只读可行性验证项目,用来研究 Polymarket BTC Up/Down 5m 市场是否存在足够稳定的定价偏差。 + +当前版本不会连接钱包、不会签名、不会下单。它做三件事: + +- 从 Gamma API 读取事件、market、token id、`priceToBeat`。 +- 从 Polymarket CLOB WebSocket 读取 Up/Down 增量盘口。 +- 从 Polymarket RTDS Chainlink BTC/USD 获取实时价格,估算 `P(end >= priceToBeat)` 和扣费后的 taker edge。 +- 将实时观测和盘口事件写入 DuckDB。 + +## 安装 + +```bash +python3 -m venv .venv +.venv/bin/pip install -e ".[dev]" +``` + +## 观察指定市场 + +```bash +.venv/bin/updown-lab observe --slug btc-updown-5m-1779347700 --duration 60 +``` + +输出会写入 `data/observations.jsonl`,每行是一条可回放的 JSON 快照。 + +## 发现当前候选市场 + +```bash +.venv/bin/updown-lab discover --limit 10 +``` + +Gamma 的 recurring series 有时会返回尚未启用订单簿的未来市场,所以实盘观察前要看 `acceptingOrders`、`enableOrderBook`、`priceToBeat` 是否存在。 + +## 启动实时监控网站 + +```bash +.venv/bin/updown-dashboard --host 127.0.0.1 --port 8765 +``` + +然后打开: + +```text +http://127.0.0.1:8765 +``` + +监控台会按 5 分钟 Unix 时间边界自动切换当前 BTC Up/Down 市场,实时展示价格、盘口、fair value、taker edge 和最近日志。实时网站的数据源是 `data/updown.duckdb`。 + +如果 live 市场的 Gamma 元数据暂时没有 `priceToBeat`,仪表盘会优先使用 RTDS 边界 tick 作为 `rtds_boundary`。只有中途启动且缺少边界 tick 时才会临时显示 `proxy_start`,这种样本不会计入可信回测。 + +## 策略口径 + +第一版 fair value 使用短期布朗运动近似: + +```text +z = (current_price - price_to_beat) / (sigma * sqrt(seconds_remaining)) +fair_prob = normal_cdf(z) +``` + +其中 `sigma` 由最近价格收益率滚动估算,并设置了下限,避免临近结算时概率过度极端。 + +扣 taker fee 后的近似 edge: + +```text +buy_up_edge = fair_prob - up_ask - fee(up_ask) +buy_down_edge = (1 - fair_prob) - down_ask - fee(down_ask) +``` + +Polymarket crypto fee 近似: + +```text +fee = 0.07 * price * (1 - price) +``` + +## 下一步 + +- 用 DuckDB 回放页分析入场时机、滑点、手续费后 edge 和窗口胜率。 +- 增加进程锁/单实例启动保护,避免多个 dashboard 同时写一个 DuckDB 文件。 +- 用海外 VPS 长时间采集,比较 RTDS 延迟、边界 tick 命中率和 CLOB 盘口年龄。 + +## 海外 VPS 部署 + +部署脚本在 `deploy/` 目录。推荐美国东部 Ubuntu VPS,通过 SSH tunnel 访问本地绑定的 dashboard: + +```bash +deploy/sync_to_server.sh user@YOUR_SERVER_IP /opt/updown-dashboard +ssh user@YOUR_SERVER_IP +cd /opt/updown-dashboard +sudo bash deploy/install_ubuntu.sh +sudo bash deploy/install_systemd.sh +sudo systemctl start updown-dashboard +``` + +本地访问: + +```bash +ssh -L 8765:127.0.0.1:8765 user@YOUR_SERVER_IP +``` diff --git a/data/updown.duckdb.wal b/data/updown.duckdb.wal new file mode 100644 index 0000000..a0c456b Binary files /dev/null and b/data/updown.duckdb.wal differ diff --git a/deploy/README.md b/deploy/README.md new file mode 100644 index 0000000..68ccbbd --- /dev/null +++ b/deploy/README.md @@ -0,0 +1,66 @@ +# Overseas VPS Deployment + +This deployment keeps the dashboard bound to `127.0.0.1:8765` on the server. Access it with an SSH tunnel instead of exposing it publicly. + +## Recommended server + +- Ubuntu 22.04 or 24.04 +- US East preferred, e.g. Virginia / New York / New Jersey +- 1 vCPU / 1 GB RAM is enough for observation + +## One-time server setup + +On the remote server: + +```bash +sudo bash deploy/install_ubuntu.sh +``` + +## Deploy app + +Copy this project to the server, then from the project root: + +```bash +sudo bash deploy/install_systemd.sh +sudo systemctl start updown-dashboard +sudo systemctl status updown-dashboard +``` + +## Access dashboard + +From your laptop: + +```bash +ssh -L 8765:127.0.0.1:8765 user@YOUR_SERVER_IP +``` + +Then open: + +```text +http://127.0.0.1:8765 +``` + +## Logs and data + +```bash +docker compose logs -f +curl http://127.0.0.1:8765/api/state +curl http://127.0.0.1:8765/api/analytics +curl http://127.0.0.1:8765/api/health +``` + +Realtime observations and CLOB events are stored in `data/updown.duckdb`. + +## Compare local vs overseas + +Watch these fields: + +- `rtds_lag_ms` +- `start_boundary.offset_ms` +- `trusted_samples / total_samples` +- `paper.trades` +- `paper.pnl` +- `clob_book_age_ms` +- DuckDB `trusted_observations` + +The overseas deployment is useful only if these improve materially. diff --git a/deploy/check_remote.sh b/deploy/check_remote.sh new file mode 100755 index 0000000..bb397c4 --- /dev/null +++ b/deploy/check_remote.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +set -euo pipefail + +curl -sS http://127.0.0.1:8765/api/state | python3 -m json.tool +curl -sS http://127.0.0.1:8765/api/analytics | python3 -m json.tool +curl -sS http://127.0.0.1:8765/api/health | python3 -m json.tool diff --git a/deploy/install_systemd.sh b/deploy/install_systemd.sh new file mode 100755 index 0000000..a640da0 --- /dev/null +++ b/deploy/install_systemd.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ "${EUID}" -ne 0 ]]; then + echo "Run as root: sudo bash deploy/install_systemd.sh" >&2 + exit 1 +fi + +PROJECT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +SERVICE_FILE="/etc/systemd/system/updown-dashboard.service" + +cat > "${SERVICE_FILE}" <&2 + exit 1 +fi + +apt-get update +apt-get install -y ca-certificates curl gnupg git ufw chrony + +install -m 0755 -d /etc/apt/keyrings +if [[ ! -f /etc/apt/keyrings/docker.gpg ]]; then + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg + chmod a+r /etc/apt/keyrings/docker.gpg +fi + +. /etc/os-release +echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu ${VERSION_CODENAME} stable" \ + > /etc/apt/sources.list.d/docker.list + +apt-get update +apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin + +systemctl enable --now docker +systemctl enable --now chrony + +ufw allow OpenSSH +ufw --force enable + +echo "Server base setup complete." +echo "Optional: add your deploy user to docker group:" +echo " sudo usermod -aG docker \$USER" diff --git a/deploy/sync_to_server.sh b/deploy/sync_to_server.sh new file mode 100755 index 0000000..b1cc969 --- /dev/null +++ b/deploy/sync_to_server.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ $# -ne 2 ]]; then + echo "Usage: deploy/sync_to_server.sh user@host /remote/path" >&2 + exit 1 +fi + +TARGET="$1" +REMOTE_PATH="$2" + +rsync -az --delete \ + --exclude ".git" \ + --exclude ".venv" \ + --exclude ".pytest_cache" \ + --exclude "__pycache__" \ + --exclude "data/*.jsonl" \ + --exclude "data/*.duckdb" \ + --exclude "data/*.lock" \ + --exclude "data/*.log" \ + ./ "${TARGET}:${REMOTE_PATH}/" + +echo "Synced to ${TARGET}:${REMOTE_PATH}" diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..15eab1c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,17 @@ +services: + updown-dashboard: + build: . + container_name: updown-dashboard + restart: unless-stopped + ports: + - "127.0.0.1:8765:8765" + volumes: + - ./data:/app/data + environment: + - TZ=UTC + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8765/api/state', timeout=5).read()"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 20s diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e331433 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,32 @@ +[build-system] +requires = ["setuptools>=69", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "polymarket-btc-updown-lab" +version = "0.1.0" +description = "Read-only feasibility lab for Polymarket BTC Up/Down 5m markets." +requires-python = ">=3.11" +dependencies = [ + "aiohttp>=3.9", + "duckdb>=1.1", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", +] + +[project.scripts] +updown-lab = "poly_updown.cli:main" +updown-dashboard = "poly_updown.server:main" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.setuptools.package-data] +poly_updown = ["web/*"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["src"] diff --git a/src/poly_updown/__init__.py b/src/poly_updown/__init__.py new file mode 100644 index 0000000..2a131c2 --- /dev/null +++ b/src/poly_updown/__init__.py @@ -0,0 +1,5 @@ +"""Read-only Polymarket BTC Up/Down feasibility lab.""" + +__all__ = ["__version__"] + +__version__ = "0.1.0" diff --git a/src/poly_updown/analytics.py b/src/poly_updown/analytics.py new file mode 100644 index 0000000..7be1265 --- /dev/null +++ b/src/poly_updown/analytics.py @@ -0,0 +1,333 @@ +from __future__ import annotations + +import json +from collections import Counter, defaultdict +from collections import deque +from dataclasses import dataclass +from pathlib import Path +from typing import Any + + +DEFAULT_EDGE_THRESHOLD = 0.03 +DEFAULT_NOTIONAL = 25.0 +TRUSTED_BENCHMARK_SOURCES = {"gamma_priceToBeat", "rtds_boundary"} +TRUSTED_PRICE_SOURCES = {"polymarket_rtds_chainlink"} + + +@dataclass +class MarketStats: + slug: str + title: str | None + samples: int + start_time: str | None + end_time: str | None + first_seen: str | None + last_seen: str | None + benchmark_source: str | None + price_to_beat: float | None + first_price: float | None + last_price: float | None + final_direction: str | None + max_buy_up_edge: float | None + max_buy_down_edge: float | None + max_last60_up_edge: float | None + max_last60_down_edge: float | None + up_signals: int + down_signals: int + source_counts: dict[str, int] + paper_trades: list[dict[str, Any]] + paper_pnl: float + + def as_dict(self) -> dict[str, Any]: + return self.__dict__.copy() + + +def load_recent_market_stats( + path: Path, + *, + limit: int = 12, + max_rows: int = 12000, + edge_threshold: float = DEFAULT_EDGE_THRESHOLD, + notional: float = DEFAULT_NOTIONAL, +) -> dict[str, Any]: + rows = _read_jsonl(path, max_rows=max_rows) + return build_recent_market_stats( + rows, + path=str(path), + limit=limit, + edge_threshold=edge_threshold, + notional=notional, + ) + + +def build_recent_market_stats( + rows: list[dict[str, Any]], + *, + path: str, + limit: int = 12, + edge_threshold: float = DEFAULT_EDGE_THRESHOLD, + notional: float = DEFAULT_NOTIONAL, +) -> dict[str, Any]: + grouped: dict[str, list[dict[str, Any]]] = defaultdict(list) + for row in rows: + slug = (row.get("market") or {}).get("event_slug") + if slug: + grouped[slug].append(row) + + stats = [ + _build_stats(slug, samples, edge_threshold=edge_threshold, notional=notional) + for slug, samples in grouped.items() + ] + stats.sort(key=lambda item: item.last_seen or "", reverse=True) + return { + "path": path, + "total_samples": len(rows), + "trusted_samples": sum(_is_trusted_sample(row) for row in rows), + "market_count": len(stats), + "edge_threshold": edge_threshold, + "notional": notional, + "paper": _paper_summary(stats), + "markets": [item.as_dict() for item in stats[:limit]], + } + + +def _read_jsonl(path: Path, *, max_rows: int = 12000) -> list[dict[str, Any]]: + if not path.exists(): + return [] + rows: deque[dict[str, Any]] = deque(maxlen=max_rows) + with path.open("r", encoding="utf-8") as handle: + for line in handle: + line = line.strip() + if not line: + continue + try: + rows.append(json.loads(line)) + except json.JSONDecodeError: + continue + return list(rows) + + +def _build_stats( + slug: str, + samples: list[dict[str, Any]], + *, + edge_threshold: float, + notional: float, +) -> MarketStats: + samples.sort(key=lambda item: item.get("recorded_at") or "") + first = samples[0] + last = samples[-1] + market = last.get("market") or {} + + up_edges = _edge_values(samples, "buy_up_edge") + down_edges = _edge_values(samples, "buy_down_edge") + last60 = [item for item in samples if _seconds_remaining(item) <= 60] + source_counts = Counter( + ((item.get("tick") or {}).get("source") or "unknown") for item in samples + ) + + first_price = _tick_price(first) + last_price = _tick_price(last) + price_to_beat = _number(market.get("price_to_beat")) + + final_direction = None + if last_price is not None and price_to_beat is not None: + final_direction = "up" if last_price >= price_to_beat else "down" + paper_trades = _paper_trades( + samples, + final_direction, + edge_threshold=edge_threshold, + notional=notional, + ) + + return MarketStats( + slug=slug, + title=market.get("title"), + samples=len(samples), + start_time=market.get("start_time"), + end_time=market.get("end_time"), + first_seen=first.get("recorded_at"), + last_seen=last.get("recorded_at"), + benchmark_source=last.get("benchmark_source"), + price_to_beat=price_to_beat, + first_price=first_price, + last_price=last_price, + final_direction=final_direction, + max_buy_up_edge=max(up_edges) if up_edges else None, + max_buy_down_edge=max(down_edges) if down_edges else None, + max_last60_up_edge=_max_edge(last60, "buy_up_edge"), + max_last60_down_edge=_max_edge(last60, "buy_down_edge"), + up_signals=sum(value >= edge_threshold for value in up_edges), + down_signals=sum(value >= edge_threshold for value in down_edges), + source_counts=dict(source_counts), + paper_trades=paper_trades, + paper_pnl=sum(trade["pnl"] for trade in paper_trades), + ) + + +def _paper_summary(stats: list[MarketStats]) -> dict[str, Any]: + trades = [trade for stat in stats for trade in stat.paper_trades] + wins = [trade for trade in trades if trade["pnl"] > 0] + pnl = sum(trade["pnl"] for trade in trades) + return { + "trades": len(trades), + "wins": len(wins), + "win_rate": (len(wins) / len(trades)) if trades else None, + "pnl": pnl, + "avg_pnl": (pnl / len(trades)) if trades else None, + } + + +def _paper_trades( + samples: list[dict[str, Any]], + final_direction: str | None, + *, + edge_threshold: float, + notional: float, +) -> list[dict[str, Any]]: + if final_direction not in {"up", "down"}: + return [] + + trades: list[dict[str, Any]] = [] + seen_sides: set[str] = set() + for sample in samples: + if not _is_trusted_sample(sample): + continue + edge = sample.get("edge") or {} + for side in ("up", "down"): + if side in seen_sides: + continue + edge_value = _number(edge.get(f"buy_{side}_edge")) + fair_prob = _number(edge.get("fair_up")) + if side == "down" and fair_prob is not None: + fair_prob = 1 - fair_prob + execution = _simulate_taker_fill(sample, side, notional=notional) + if edge_value is None or fair_prob is None or execution is None: + continue + ask = execution["avg_price"] + fee = 0.07 * ask * (1 - ask) + effective_edge = fair_prob - ask - fee + if edge_value < edge_threshold or effective_edge < edge_threshold: + continue + won = side == final_direction + pnl_per_share = (1 - ask - fee) if won else (-ask - fee) + trades.append( + { + "side": side, + "recorded_at": sample.get("recorded_at"), + "seconds_remaining": _seconds_remaining(sample), + "price": ask, + "requested_notional": notional, + "filled_notional": execution["filled_notional"], + "shares": execution["shares"], + "levels_used": execution["levels_used"], + "fee": fee, + "edge": effective_edge, + "raw_edge": edge_value, + "slippage": ask - execution["best_ask"], + "won": won, + "pnl": pnl_per_share * execution["shares"], + "price_source": (sample.get("tick") or {}).get("source"), + "benchmark_source": sample.get("benchmark_source"), + } + ) + seen_sides.add(side) + return trades + + +def _is_trusted_sample(sample: dict[str, Any]) -> bool: + return ( + sample.get("benchmark_source") in TRUSTED_BENCHMARK_SOURCES + and (sample.get("tick") or {}).get("source") in TRUSTED_PRICE_SOURCES + and sample.get("start_boundary") is not None + ) + + +def _simulate_taker_fill( + sample: dict[str, Any], + side: str, + *, + notional: float, +) -> dict[str, Any] | None: + book = ((sample.get("books") or {}).get(side) or {}) + asks = book.get("asks") or [] + if not asks: + edge = sample.get("edge") or {} + ask = _number(edge.get(f"{side}_ask")) + if ask is None: + return None + return { + "avg_price": ask, + "best_ask": ask, + "filled_notional": notional, + "shares": notional / ask, + "levels_used": 1, + } + + remaining = notional + cost = 0.0 + shares = 0.0 + levels_used = 0 + best_ask = _number(asks[0].get("price")) + if best_ask is None: + return None + + for level in asks: + price = _number(level.get("price")) + size = _number(level.get("size")) + if price is None or size is None or price <= 0 or size <= 0: + continue + level_capacity = price * size + spend = min(remaining, level_capacity) + if spend <= 0: + break + cost += spend + shares += spend / price + remaining -= spend + levels_used += 1 + if remaining <= 1e-9: + break + + if remaining > 1e-6 or shares <= 0: + return None + return { + "avg_price": cost / shares, + "best_ask": best_ask, + "filled_notional": cost, + "shares": shares, + "levels_used": levels_used, + } + + +def _edge_values(samples: list[dict[str, Any]], field: str) -> list[float]: + values = [] + for sample in samples: + value = (sample.get("edge") or {}).get(field) + value = _number(value) + if value is not None: + values.append(value) + return values + + +def _max_edge(samples: list[dict[str, Any]], field: str) -> float | None: + values = _edge_values(samples, field) + return max(values) if values else None + + +def _seconds_remaining(sample: dict[str, Any]) -> float: + value = (sample.get("market") or {}).get("seconds_remaining") + parsed = _number(value) + return parsed if parsed is not None else float("inf") + + +def _tick_price(sample: dict[str, Any]) -> float | None: + return _number((sample.get("tick") or {}).get("price")) + + +def _number(value: Any) -> float | None: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None diff --git a/src/poly_updown/cli.py b/src/poly_updown/cli.py new file mode 100644 index 0000000..380383c --- /dev/null +++ b/src/poly_updown/cli.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import argparse +import asyncio +from pathlib import Path + +import aiohttp + +from .gamma import discover_btc_updown, fetch_event_by_slug +from .observer import observe_market + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="updown-lab") + subparsers = parser.add_subparsers(dest="command", required=True) + + inspect_parser = subparsers.add_parser("inspect", help="Inspect one event slug.") + inspect_parser.add_argument("--slug", required=True) + + discover_parser = subparsers.add_parser("discover", help="Discover BTC Up/Down 5m candidates.") + discover_parser.add_argument("--limit", type=int, default=10) + + observe_parser = subparsers.add_parser("observe", help="Run read-only live observation.") + observe_parser.add_argument("--slug", required=True) + observe_parser.add_argument("--duration", type=float, default=60) + observe_parser.add_argument("--output", type=Path, default=Path("data/observations.jsonl")) + observe_parser.add_argument("--poll-books", type=float, default=1.0) + observe_parser.add_argument("--rest-price", action="store_true", help="Poll Binance REST instead of WS.") + + return parser + + +async def _inspect(slug: str) -> None: + async with aiohttp.ClientSession() as session: + market = await fetch_event_by_slug(session, slug) + print_market(market) + + +async def _discover(limit: int) -> None: + async with aiohttp.ClientSession() as session: + markets = await discover_btc_updown(session, limit=limit) + for market in markets: + print_market(market) + print() + + +def print_market(market) -> None: + print(f"slug: {market.event_slug}") + print(f"title: {market.title}") + print(f"market_id: {market.market_id}") + print(f"condition_id: {market.condition_id}") + print(f"start_time: {market.start_time}") + print(f"end_time: {market.end_time}") + print(f"seconds_remaining: {market.seconds_remaining}") + print(f"price_to_beat: {market.price_to_beat}") + print(f"up_token_id: {market.up_token_id}") + print(f"down_token_id: {market.down_token_id}") + print(f"accepting_orders: {market.accepting_orders}") + print(f"enable_order_book: {market.enable_order_book}") + print(f"closed: {market.closed}") + print(f"best_bid/best_ask: {market.best_bid}/{market.best_ask}") + + +async def async_main(args: argparse.Namespace) -> None: + if args.command == "inspect": + await _inspect(args.slug) + elif args.command == "discover": + await _discover(args.limit) + elif args.command == "observe": + await observe_market( + slug=args.slug, + duration_s=args.duration, + output_path=args.output, + poll_books_s=args.poll_books, + use_websocket_price=not args.rest_price, + ) + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + asyncio.run(async_main(args)) + + +if __name__ == "__main__": + main() diff --git a/src/poly_updown/clob.py b/src/poly_updown/clob.py new file mode 100644 index 0000000..588abdf --- /dev/null +++ b/src/poly_updown/clob.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from typing import Any + +import aiohttp + +from .models import BookLevel, BookSnapshot + + +CLOB_BASE = "https://clob.polymarket.com" + + +def _parse_levels(levels: list[dict[str, Any]], *, reverse: bool) -> tuple[BookLevel, ...]: + parsed = [ + BookLevel(price=float(level["price"]), size=float(level["size"])) + for level in levels + if "price" in level and "size" in level + ] + return tuple(sorted(parsed, key=lambda level: level.price, reverse=reverse)) + + +async def fetch_book(session: aiohttp.ClientSession, token_id: str) -> BookSnapshot: + async with session.get(f"{CLOB_BASE}/book", params={"token_id": token_id}) as response: + response.raise_for_status() + payload = await response.json() + + if "error" in payload: + raise RuntimeError(payload["error"]) + + return BookSnapshot( + token_id=token_id, + bids=_parse_levels(payload.get("bids") or [], reverse=True), + asks=_parse_levels(payload.get("asks") or [], reverse=False), + raw=payload, + ) diff --git a/src/poly_updown/clob_ws.py b/src/poly_updown/clob_ws.py new file mode 100644 index 0000000..fd1a7d9 --- /dev/null +++ b/src/poly_updown/clob_ws.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from typing import Any + +import aiohttp + +from .models import BookLevel, BookSnapshot + + +CLOB_WS = "wss://ws-subscriptions-clob.polymarket.com/ws/market" + + +class ClobBookState: + def __init__(self) -> None: + self.books: dict[str, BookSnapshot] = {} + self.last_event_at: datetime | None = None + self.last_event_type: str | None = None + + @property + def age_ms(self) -> int | None: + if self.last_event_at is None: + return None + return int((datetime.now(timezone.utc) - self.last_event_at).total_seconds() * 1000) + + def handle(self, payload: dict[str, Any]) -> None: + event_type = payload.get("event_type") + self.last_event_at = datetime.now(timezone.utc) + self.last_event_type = event_type + if event_type == "book": + self._handle_book(payload) + elif event_type == "price_change": + for change in payload.get("price_changes") or []: + self._handle_price_change(change, payload) + elif event_type == "best_bid_ask": + self._handle_best_bid_ask(payload) + + def _handle_book(self, payload: dict[str, Any]) -> None: + asset_id = str(payload.get("asset_id")) + self.books[asset_id] = BookSnapshot( + token_id=asset_id, + bids=_levels(payload.get("bids") or [], reverse=True), + asks=_levels(payload.get("asks") or [], reverse=False), + raw=payload, + ) + + def _handle_price_change(self, change: dict[str, Any], parent: dict[str, Any]) -> None: + asset_id = str(change.get("asset_id")) + old = self.books.get(asset_id) + if old is None: + return + bids = {level.price: level.size for level in old.bids} + asks = {level.price: level.size for level in old.asks} + side = str(change.get("side", "")).upper() + price = _float_or_none(change.get("price")) + size = _float_or_none(change.get("size")) + if price is None or size is None: + return + target = bids if side == "BUY" else asks + if size <= 0: + target.pop(price, None) + else: + target[price] = size + self.books[asset_id] = BookSnapshot( + token_id=asset_id, + bids=tuple(BookLevel(price=p, size=s) for p, s in sorted(bids.items(), reverse=True)), + asks=tuple(BookLevel(price=p, size=s) for p, s in sorted(asks.items())), + raw=parent, + ) + + def _handle_best_bid_ask(self, payload: dict[str, Any]) -> None: + asset_id = str(payload.get("asset_id")) + old = self.books.get(asset_id) + if old is not None: + old.raw["best_bid_ask"] = payload + + +async def clob_market_stream( + session: aiohttp.ClientSession, + *, + asset_ids: list[str], + state: ClobBookState, + on_event=None, +) -> None: + subscribe = { + "assets_ids": asset_ids, + "type": "market", + "custom_feature_enabled": True, + } + while True: + try: + async with session.ws_connect(CLOB_WS, heartbeat=20) as ws: + await ws.send_json(subscribe) + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + payload = msg.json() + items = payload if isinstance(payload, list) else [payload] + for item in items: + if isinstance(item, dict) and item.get("event_type"): + state.handle(item) + if on_event: + on_event(item) + elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): + break + except Exception: + await asyncio.sleep(1) + + +def _levels(levels: list[dict[str, Any]], *, reverse: bool) -> tuple[BookLevel, ...]: + parsed = [] + for level in levels: + price = _float_or_none(level.get("price")) + size = _float_or_none(level.get("size")) + if price is not None and size is not None: + parsed.append(BookLevel(price=price, size=size)) + return tuple(sorted(parsed, key=lambda level: level.price, reverse=reverse)) + + +def _float_or_none(value: Any) -> float | None: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None diff --git a/src/poly_updown/duckstore.py b/src/poly_updown/duckstore.py new file mode 100644 index 0000000..b4e102f --- /dev/null +++ b/src/poly_updown/duckstore.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +import duckdb + + +class DuckStore: + def __init__(self, path: Path) -> None: + self.path = path + self.path.parent.mkdir(parents=True, exist_ok=True) + self._conn = duckdb.connect(str(path)) + self._init_schema() + + def close(self) -> None: + self._conn.close() + + def _init_schema(self) -> None: + self._conn.execute( + """ + create table if not exists observations ( + recorded_at timestamp, + slug varchar, + benchmark_source varchar, + price_source varchar, + btc_price double, + source_timestamp_ms bigint, + rtds_lag_ms bigint, + start_offset_ms bigint, + final_offset_ms bigint, + is_trusted_market boolean, + fair_up double, + buy_up_edge double, + buy_down_edge double, + up_bid double, + up_ask double, + down_bid double, + down_ask double, + payload json + ) + """ + ) + self._conn.execute( + """ + create table if not exists book_events ( + received_at timestamp, + event_type varchar, + asset_id varchar, + market varchar, + timestamp_ms bigint, + payload json + ) + """ + ) + + def write_observation(self, payload: dict[str, Any]) -> None: + market = payload.get("market") or {} + tick = payload.get("tick") or {} + edge = payload.get("edge") or {} + start = payload.get("start_boundary") or {} + final = payload.get("final_boundary") or {} + books = payload.get("books") or {} + up = books.get("up") or {} + down = books.get("down") or {} + self._conn.execute( + """ + insert into observations values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + [ + payload.get("recorded_at"), + market.get("event_slug"), + payload.get("benchmark_source"), + tick.get("source"), + tick.get("price"), + tick.get("source_timestamp_ms"), + payload.get("rtds_lag_ms"), + start.get("offset_ms"), + final.get("offset_ms"), + payload.get("is_trusted_market"), + edge.get("fair_up"), + edge.get("buy_up_edge"), + edge.get("buy_down_edge"), + up.get("best_bid"), + up.get("best_ask"), + down.get("best_bid"), + down.get("best_ask"), + json.dumps(payload, sort_keys=True), + ], + ) + + def write_book_event(self, payload: dict[str, Any], *, received_at: str) -> None: + self._conn.execute( + "insert into book_events values (?, ?, ?, ?, ?, ?)", + [ + received_at, + payload.get("event_type"), + payload.get("asset_id"), + payload.get("market"), + _int_or_none(payload.get("timestamp")), + json.dumps(payload, sort_keys=True), + ], + ) + + def recent_observations(self, *, limit: int = 12000) -> list[dict[str, Any]]: + rows = self._conn.execute( + """ + select cast(payload as varchar) + from observations + order by recorded_at desc + limit ? + """, + [limit], + ).fetchall() + payloads = [] + for (raw,) in reversed(rows): + try: + payloads.append(json.loads(raw)) + except (TypeError, json.JSONDecodeError): + continue + return payloads + + def health_summary(self) -> dict[str, Any]: + obs = self._conn.execute( + """ + select + count(*) as total, + sum(case when is_trusted_market then 1 else 0 end) as trusted, + avg(rtds_lag_ms) as avg_rtds_lag_ms, + max(recorded_at) as last_recorded_at + from observations + """ + ).fetchone() + books = self._conn.execute( + """ + select + count(*) as total, + max(received_at) as last_received_at + from book_events + """ + ).fetchone() + last_book = self._conn.execute( + """ + select event_type + from book_events + order by received_at desc + limit 1 + """ + ).fetchone() + return { + "path": str(self.path), + "observations": int(obs[0] or 0), + "trusted_observations": int(obs[1] or 0), + "avg_rtds_lag_ms": obs[2], + "last_recorded_at": str(obs[3]) if obs[3] is not None else None, + "book_events": int(books[0] or 0), + "last_book_event_at": str(books[1]) if books[1] is not None else None, + "last_book_event_type": last_book[0] if last_book else None, + } + + +def _int_or_none(value: Any) -> int | None: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None diff --git a/src/poly_updown/gamma.py b/src/poly_updown/gamma.py new file mode 100644 index 0000000..c85187b --- /dev/null +++ b/src/poly_updown/gamma.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import json +from typing import Any + +import aiohttp + +from .models import MarketInfo, parse_dt + + +GAMMA_BASE = "https://gamma-api.polymarket.com" + + +def _loads_json_array(value: str | list[Any] | None) -> list[Any]: + if value is None: + return [] + if isinstance(value, list): + return value + return json.loads(value) + + +def parse_market_info(event: dict[str, Any]) -> MarketInfo: + markets = event.get("markets") or [] + if not markets: + raise ValueError(f"event {event.get('slug')} has no markets") + + market = markets[0] + token_ids = _loads_json_array(market.get("clobTokenIds")) + if len(token_ids) < 2: + raise ValueError(f"market {market.get('id')} has no Up/Down token ids") + + metadata = event.get("eventMetadata") or {} + price_to_beat = metadata.get("priceToBeat") + if price_to_beat is not None: + price_to_beat = float(price_to_beat) + + start = parse_dt(event.get("startTime") or market.get("eventStartTime") or market.get("startDate")) + end = parse_dt(event.get("endDate") or market.get("endDate")) + + return MarketInfo( + event_slug=event["slug"], + title=event.get("title") or market.get("question") or event["slug"], + market_id=str(market.get("id", "")), + condition_id=str(market.get("conditionId", "")), + start_time=start, + end_time=end, + price_to_beat=price_to_beat, + up_token_id=str(token_ids[0]), + down_token_id=str(token_ids[1]), + accepting_orders=bool(market.get("acceptingOrders")), + enable_order_book=bool(market.get("enableOrderBook")), + closed=bool(event.get("closed") or market.get("closed")), + best_bid=_maybe_float(market.get("bestBid")), + best_ask=_maybe_float(market.get("bestAsk")), + ) + + +def _maybe_float(value: Any) -> float | None: + if value is None or value == "": + return None + return float(value) + + +async def fetch_event_by_slug(session: aiohttp.ClientSession, slug: str) -> MarketInfo: + async with session.get(f"{GAMMA_BASE}/events/slug/{slug}") as response: + response.raise_for_status() + event = await response.json() + return parse_market_info(event) + + +async def discover_btc_updown( + session: aiohttp.ClientSession, + *, + limit: int = 20, +) -> list[MarketInfo]: + params = { + "series_slug": "btc-up-or-down-5m", + "closed": "false", + "active": "true", + "limit": str(limit), + } + async with session.get(f"{GAMMA_BASE}/events", params=params) as response: + response.raise_for_status() + events = await response.json() + + infos: list[MarketInfo] = [] + for event in events: + try: + infos.append(parse_market_info(event)) + except (KeyError, TypeError, ValueError, json.JSONDecodeError): + continue + return infos diff --git a/src/poly_updown/live.py b/src/poly_updown/live.py new file mode 100644 index 0000000..809c005 --- /dev/null +++ b/src/poly_updown/live.py @@ -0,0 +1,530 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import aiohttp + +from .clob import fetch_book +from .clob_ws import ClobBookState, clob_market_stream +from .duckstore import DuckStore +from .gamma import fetch_event_by_slug +from .models import BookSnapshot, EdgeSnapshot, MarketInfo, PriceTick +from .prices import resilient_btc_price_stream +from .strategy import build_edge_snapshot +from .tick_buffer import BoundaryPrice, TickBuffer +from .volatility import RollingVolatility + + +@dataclass +class LiveState: + running: bool = False + market: MarketInfo | None = None + benchmark_source: str = "none" + latest_tick: PriceTick | None = None + up_book: BookSnapshot | None = None + down_book: BookSnapshot | None = None + edge: EdgeSnapshot | None = None + start_boundary: dict[str, Any] | None = None + final_boundary: dict[str, Any] | None = None + rtds_lag_ms: int | None = None + clob_book_age_ms: int | None = None + clob_last_event_type: str | None = None + is_trusted_market: bool = False + samples_written: int = 0 + last_sample_written_at: datetime | None = None + recorder_path: str | None = None + errors: list[str] = field(default_factory=list) + events: list[dict[str, Any]] = field(default_factory=list) + updated_at: datetime | None = None + + def add_event(self, message: str, *, level: str = "info") -> None: + self.events.append( + { + "at": datetime.now(timezone.utc).isoformat(), + "level": level, + "message": message, + } + ) + self.events = self.events[-80:] + + def add_error(self, message: str) -> None: + self.errors.append(f"{datetime.now(timezone.utc).isoformat()} {message}") + self.errors = self.errors[-20:] + self.add_event(message, level="error") + + def as_dict(self) -> dict[str, Any]: + now = datetime.now(timezone.utc) + state_age_ms = ( + int((now - self.updated_at).total_seconds() * 1000) + if self.updated_at + else None + ) + db_write_age_ms = ( + int((now - self.last_sample_written_at).total_seconds() * 1000) + if self.last_sample_written_at + else None + ) + return { + "running": self.running, + "updated_at": self.updated_at.isoformat() if self.updated_at else None, + "state_age_ms": state_age_ms, + "benchmark_source": self.benchmark_source, + "market": _market_dict(self.market), + "tick": _tick_dict(self.latest_tick), + "books": { + "up": _book_dict(self.up_book), + "down": _book_dict(self.down_book), + }, + "edge": self.edge.as_dict() if self.edge else None, + "start_boundary": self.start_boundary, + "final_boundary": self.final_boundary, + "rtds_lag_ms": self.rtds_lag_ms, + "clob_book_age_ms": self.clob_book_age_ms, + "clob_last_event_type": self.clob_last_event_type, + "is_trusted_market": self.is_trusted_market, + "samples_written": self.samples_written, + "last_sample_written_at": ( + self.last_sample_written_at.isoformat() + if self.last_sample_written_at + else None + ), + "db_write_age_ms": db_write_age_ms, + "health": _health_dict( + running=self.running, + state_age_ms=state_age_ms, + rtds_lag_ms=self.rtds_lag_ms, + clob_book_age_ms=self.clob_book_age_ms, + db_write_age_ms=db_write_age_ms, + has_recent_error=bool(self.errors), + ), + "recorder_path": self.recorder_path, + "errors": self.errors, + "events": self.events, + } + + +class LiveMonitor: + def __init__( + self, + *, + book_poll_s: float = 1.0, + market_refresh_s: float = 5.0, + publish_interval_s: float = 0.5, + stale_restart_s: float = 20.0, + duckdb_path: str = "data/updown.duckdb", + ) -> None: + self.book_poll_s = book_poll_s + self.market_refresh_s = market_refresh_s + self.publish_interval_s = publish_interval_s + self.stale_restart_s = stale_restart_s + self.state = LiveState() + self._duck = DuckStore(Path(duckdb_path)) + self.state.recorder_path = duckdb_path + self._task: asyncio.Task[None] | None = None + self._subscribers: set[asyncio.Queue[dict[str, Any]]] = set() + self._proxy_benchmarks: dict[str, float] = {} + self._rtds_boundary_benchmarks: dict[str, float] = {} + self._tick_buffer = TickBuffer() + self._start_boundaries: dict[str, BoundaryPrice] = {} + self._final_boundaries: dict[str, BoundaryPrice] = {} + self._clob_state = ClobBookState() + self._clob_task: asyncio.Task[None] | None = None + + def start(self) -> None: + if self._task is None or self._task.done(): + self.state.running = True + self._task = asyncio.create_task(self._supervise()) + + async def stop(self) -> None: + self.state.running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + if self._clob_task: + self._clob_task.cancel() + try: + await self._clob_task + except asyncio.CancelledError: + pass + self._duck.close() + + def subscribe(self) -> asyncio.Queue[dict[str, Any]]: + queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=5) + self._subscribers.add(queue) + queue.put_nowait(self.state.as_dict()) + return queue + + def unsubscribe(self, queue: asyncio.Queue[dict[str, Any]]) -> None: + self._subscribers.discard(queue) + + async def _publish(self) -> None: + payload = self.state.as_dict() + for queue in list(self._subscribers): + if queue.full(): + try: + queue.get_nowait() + except asyncio.QueueEmpty: + pass + await queue.put(payload) + + async def _supervise(self) -> None: + while self.state.running: + try: + await self._run_once() + except asyncio.CancelledError: + raise + except Exception as exc: + self.state.add_error(f"monitor restarted after failure: {exc}") + await asyncio.sleep(1) + + async def _run_once(self) -> None: + timeout = aiohttp.ClientTimeout(total=10) + vol = RollingVolatility() + next_market_refresh = 0.0 + next_book_poll = 0.0 + next_publish = 0.0 + + async with aiohttp.ClientSession(timeout=timeout) as session: + price_stream = resilient_btc_price_stream(session) + while self.state.running: + try: + tick = await asyncio.wait_for( + anext(price_stream), + timeout=self.stale_restart_s, + ) + except TimeoutError as exc: + raise RuntimeError("price stream stale") from exc + self.state.latest_tick = tick + self.state.updated_at = datetime.now(timezone.utc) + if tick.source == "polymarket_rtds_chainlink": + self._tick_buffer.add(tick) + if tick.source_timestamp_ms is not None: + self.state.rtds_lag_ms = int( + self.state.updated_at.timestamp() * 1000 + ) - tick.source_timestamp_ms + if self.state.market and self.state.benchmark_source == "proxy_start": + await self._refresh_market(session, tick) + vol.add(tick) + + now = asyncio.get_running_loop().time() + if now >= next_market_refresh: + await self._refresh_market(session, tick) + next_market_refresh = now + self.market_refresh_s + + if self.state.market and now >= next_book_poll: + await self._refresh_books(session) + next_book_poll = now + self.book_poll_s + + self._refresh_edge(tick, vol) + self._record_sample() + if now >= next_publish: + await self._publish() + next_publish = now + self.publish_interval_s + + async def _refresh_market(self, session: aiohttp.ClientSession, tick: PriceTick) -> None: + slug = current_btc_updown_slug() + if self.state.market and self.state.market.event_slug == slug: + if ( + self.state.market.price_to_beat is None + or ( + self.state.benchmark_source == "proxy_start" + and tick.source == "polymarket_rtds_chainlink" + ) + ): + await self._load_market(session, slug, tick) + return + + await self._load_market(session, slug, tick) + + async def _load_market( + self, + session: aiohttp.ClientSession, + slug: str, + tick: PriceTick, + ) -> None: + try: + market = await fetch_event_by_slug(session, slug) + except Exception as exc: + self.state.add_error(f"market fetch failed for {slug}: {exc}") + return + + old_slug = self.state.market.event_slug if self.state.market else None + if old_slug != market.event_slug: + self.state.add_event(f"switched to {market.event_slug}") + self.state.up_book = None + self.state.down_book = None + self.state.edge = None + self.state.start_boundary = None + self.state.final_boundary = None + self.state.is_trusted_market = False + await self._restart_clob_stream(session, market) + + if market.price_to_beat is None: + benchmark, source = self._benchmark_from_market(market, tick) + market = MarketInfo( + event_slug=market.event_slug, + title=market.title, + market_id=market.market_id, + condition_id=market.condition_id, + start_time=market.start_time, + end_time=market.end_time, + price_to_beat=benchmark, + up_token_id=market.up_token_id, + down_token_id=market.down_token_id, + accepting_orders=market.accepting_orders, + enable_order_book=market.enable_order_book, + closed=market.closed, + best_bid=market.best_bid, + best_ask=market.best_ask, + ) + self.state.benchmark_source = source + else: + self.state.benchmark_source = "gamma_priceToBeat" + + self.state.market = market + self._refresh_boundary_state(market) + + def _benchmark_from_market(self, market: MarketInfo, tick: PriceTick) -> tuple[float, str]: + if market.start_time is not None: + target_ms = int(market.start_time.timestamp() * 1000) + boundary = self._tick_buffer.nearest(target_ms) + if boundary is not None: + self._start_boundaries[market.event_slug] = boundary + self._rtds_boundary_benchmarks[market.event_slug] = boundary.price + return boundary.price, "rtds_boundary" + benchmark = self._proxy_benchmarks.setdefault(market.event_slug, tick.price) + return benchmark, "proxy_start" + + def _refresh_boundary_state(self, market: MarketInfo) -> None: + start = self._start_boundaries.get(market.event_slug) + if start is not None: + self.state.start_boundary = _boundary_dict(start) + + if market.end_time is not None: + final = self._tick_buffer.nearest(int(market.end_time.timestamp() * 1000)) + if final is not None: + self._final_boundaries[market.event_slug] = final + self.state.final_boundary = _boundary_dict(final) + self.state.is_trusted_market = ( + self.state.benchmark_source in {"gamma_priceToBeat", "rtds_boundary"} + and self.state.start_boundary is not None + ) + + async def _refresh_books(self, session: aiohttp.ClientSession) -> None: + market = self.state.market + if market is None: + return + up = self._clob_state.books.get(market.up_token_id) + down = self._clob_state.books.get(market.down_token_id) + self.state.clob_book_age_ms = self._clob_state.age_ms + self.state.clob_last_event_type = self._clob_state.last_event_type + if up is None or down is None: + rest_up, rest_down = await asyncio.gather( + _safe_book(session, market.up_token_id), + _safe_book(session, market.down_token_id), + ) + up = up or rest_up + down = down or rest_down + self.state.up_book = up + self.state.down_book = down + + async def _restart_clob_stream( + self, + session: aiohttp.ClientSession, + market: MarketInfo, + ) -> None: + if self._clob_task: + self._clob_task.cancel() + try: + await self._clob_task + except asyncio.CancelledError: + pass + self._clob_state = ClobBookState() + self._clob_task = asyncio.create_task( + clob_market_stream( + session, + asset_ids=[market.up_token_id, market.down_token_id], + state=self._clob_state, + on_event=self._record_book_event, + ) + ) + + def _record_book_event(self, payload: dict[str, Any]) -> None: + try: + self._duck.write_book_event( + payload, + received_at=datetime.now(timezone.utc).isoformat(), + ) + except Exception as exc: + self.state.add_error(f"duckdb book write failed: {exc}") + + def _refresh_edge(self, tick: PriceTick, vol: RollingVolatility) -> None: + market = self.state.market + sigma = vol.sigma_price_per_sqrt_second + if market is None or market.price_to_beat is None or sigma is None: + return + try: + self.state.edge = build_edge_snapshot( + market=market, + tick=tick, + sigma_price_per_sqrt_second=sigma, + up_book=self.state.up_book, + down_book=self.state.down_book, + ) + except Exception as exc: + self.state.add_error(f"edge refresh failed: {exc}") + + def _record_sample(self) -> None: + if self.state.market is None or self.state.latest_tick is None: + return + payload = self.state.as_dict() + payload["recorded_at"] = datetime.now(timezone.utc).isoformat() + try: + self._duck.write_observation(payload) + self.state.samples_written += 1 + self.state.last_sample_written_at = datetime.now(timezone.utc) + except Exception as exc: + self.state.add_error(f"sample write failed: {exc}") + + +async def _safe_book( + session: aiohttp.ClientSession, + token_id: str, +) -> BookSnapshot | None: + try: + return await fetch_book(session, token_id) + except Exception: + return None + + +def current_btc_updown_slug(now: datetime | None = None) -> str: + moment = now or datetime.now(timezone.utc) + timestamp = int(moment.timestamp()) + start = timestamp - (timestamp % 300) + return f"btc-updown-5m-{start}" + + +def _market_dict(market: MarketInfo | None) -> dict[str, Any] | None: + if market is None: + return None + return { + "event_slug": market.event_slug, + "title": market.title, + "market_id": market.market_id, + "condition_id": market.condition_id, + "start_time": market.start_time.isoformat() if market.start_time else None, + "end_time": market.end_time.isoformat() if market.end_time else None, + "seconds_remaining": market.seconds_remaining, + "price_to_beat": market.price_to_beat, + "up_token_id": market.up_token_id, + "down_token_id": market.down_token_id, + "accepting_orders": market.accepting_orders, + "enable_order_book": market.enable_order_book, + "closed": market.closed, + "best_bid": market.best_bid, + "best_ask": market.best_ask, + } + + +def _tick_dict(tick: PriceTick | None) -> dict[str, Any] | None: + if tick is None: + return None + return { + "source": tick.source, + "symbol": tick.symbol, + "price": tick.price, + "received_at": tick.received_at.isoformat(), + "source_timestamp_ms": tick.source_timestamp_ms, + } + + +def _book_dict(book: BookSnapshot | None) -> dict[str, Any] | None: + if book is None: + return None + return { + "token_id": book.token_id, + "best_bid": book.best_bid, + "best_ask": book.best_ask, + "bids": [{"price": item.price, "size": item.size} for item in book.bids[:8]], + "asks": [{"price": item.price, "size": item.size} for item in book.asks[:8]], + } + + +def _boundary_dict(boundary: BoundaryPrice) -> dict[str, Any]: + return { + "price": boundary.price, + "tick_timestamp_ms": boundary.tick_timestamp_ms, + "target_timestamp_ms": boundary.target_timestamp_ms, + "offset_ms": boundary.offset_ms, + "source": boundary.source, + } + + +def _health_dict( + *, + running: bool, + state_age_ms: int | None, + rtds_lag_ms: int | None, + clob_book_age_ms: int | None, + db_write_age_ms: int | None, + has_recent_error: bool, +) -> dict[str, Any]: + issues = [] + severity = 0 + + def add(level: int, message: str) -> None: + nonlocal severity + severity = max(severity, level) + issues.append(message) + + if not running: + add(2, "采集未运行") + if state_age_ms is None: + add(1, "尚未收到价格 tick") + elif state_age_ms > 20_000: + add(2, f"状态已停滞 {state_age_ms} ms") + elif state_age_ms > 5_000: + add(1, f"状态刷新偏慢 {state_age_ms} ms") + + if rtds_lag_ms is None: + add(1, "RTDS 延迟未知") + elif rtds_lag_ms > 30_000: + add(2, f"RTDS 延迟过高 {rtds_lag_ms} ms") + elif rtds_lag_ms > 10_000: + add(1, f"RTDS 延迟偏高 {rtds_lag_ms} ms") + + if clob_book_age_ms is None: + add(1, "CLOB 盘口尚未到达") + elif clob_book_age_ms > 10_000: + add(2, f"CLOB 盘口停滞 {clob_book_age_ms} ms") + elif clob_book_age_ms > 3_000: + add(1, f"CLOB 盘口偏旧 {clob_book_age_ms} ms") + + if db_write_age_ms is None: + add(1, "DuckDB 尚未写入") + elif db_write_age_ms > 20_000: + add(2, f"DuckDB 写入停滞 {db_write_age_ms} ms") + elif db_write_age_ms > 5_000: + add(1, f"DuckDB 写入偏慢 {db_write_age_ms} ms") + + if has_recent_error: + add(1, "近期有错误日志") + + labels = { + 0: ("ok", "正常"), + 1: ("warn", "注意"), + 2: ("bad", "异常"), + } + status, label = labels[severity] + return { + "status": status, + "label": label, + "issues": issues[:5], + } diff --git a/src/poly_updown/lockfile.py b/src/poly_updown/lockfile.py new file mode 100644 index 0000000..d8ae112 --- /dev/null +++ b/src/poly_updown/lockfile.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import fcntl +import os +from pathlib import Path + + +class InstanceLockError(RuntimeError): + pass + + +class InstanceLock: + def __init__(self, path: Path) -> None: + self.path = path + self._handle = None + + def acquire(self) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + self._handle = self.path.open("a+", encoding="utf-8") + try: + fcntl.flock(self._handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError as exc: + raise InstanceLockError( + f"another updown-dashboard process is already running; lock={self.path}" + ) from exc + self._handle.seek(0) + self._handle.truncate() + self._handle.write(f"{os.getpid()}\n") + self._handle.flush() + + def release(self) -> None: + if self._handle is None: + return + fcntl.flock(self._handle.fileno(), fcntl.LOCK_UN) + self._handle.close() + self._handle = None + + def __enter__(self) -> InstanceLock: + self.acquire() + return self + + def __exit__(self, *_) -> None: + self.release() diff --git a/src/poly_updown/models.py b/src/poly_updown/models.py new file mode 100644 index 0000000..45d810a --- /dev/null +++ b/src/poly_updown/models.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any + + +def parse_dt(value: str | None) -> datetime | None: + if not value: + return None + normalized = value.replace("Z", "+00:00") + try: + return datetime.fromisoformat(normalized).astimezone(timezone.utc) + except ValueError: + return None + + +@dataclass(frozen=True) +class MarketInfo: + event_slug: str + title: str + market_id: str + condition_id: str + start_time: datetime | None + end_time: datetime | None + price_to_beat: float | None + up_token_id: str + down_token_id: str + accepting_orders: bool + enable_order_book: bool + closed: bool + best_bid: float | None = None + best_ask: float | None = None + + @property + def seconds_remaining(self) -> float | None: + if self.end_time is None: + return None + return max(0.0, (self.end_time - datetime.now(timezone.utc)).total_seconds()) + + +@dataclass(frozen=True) +class BookLevel: + price: float + size: float + + +@dataclass(frozen=True) +class BookSnapshot: + token_id: str + bids: tuple[BookLevel, ...] + asks: tuple[BookLevel, ...] + raw: dict[str, Any] + + @property + def best_bid(self) -> float | None: + return self.bids[0].price if self.bids else None + + @property + def best_ask(self) -> float | None: + return self.asks[0].price if self.asks else None + + +@dataclass(frozen=True) +class PriceTick: + source: str + symbol: str + price: float + received_at: datetime + source_timestamp_ms: int | None = None + + +@dataclass(frozen=True) +class EdgeSnapshot: + observed_at: datetime + slug: str + price_source: str + btc_price: float + price_to_beat: float + seconds_remaining: float + volatility_per_sqrt_second: float + fair_up: float + up_bid: float | None + up_ask: float | None + down_bid: float | None + down_ask: float | None + buy_up_edge: float | None + buy_down_edge: float | None + + def as_dict(self) -> dict[str, Any]: + return { + "observed_at": self.observed_at.isoformat(), + "slug": self.slug, + "price_source": self.price_source, + "btc_price": self.btc_price, + "price_to_beat": self.price_to_beat, + "seconds_remaining": self.seconds_remaining, + "volatility_per_sqrt_second": self.volatility_per_sqrt_second, + "fair_up": self.fair_up, + "up_bid": self.up_bid, + "up_ask": self.up_ask, + "down_bid": self.down_bid, + "down_ask": self.down_ask, + "buy_up_edge": self.buy_up_edge, + "buy_down_edge": self.buy_down_edge, + } diff --git a/src/poly_updown/observer.py b/src/poly_updown/observer.py new file mode 100644 index 0000000..fada60e --- /dev/null +++ b/src/poly_updown/observer.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path + +import aiohttp + +from .clob import fetch_book +from .gamma import fetch_event_by_slug +from .models import BookSnapshot, PriceTick +from .prices import binance_trade_stream, poll_binance_price +from .storage import JsonlWriter +from .strategy import build_edge_snapshot +from .volatility import RollingVolatility + + +async def _safe_fetch_book( + session: aiohttp.ClientSession, + token_id: str, +) -> BookSnapshot | None: + try: + return await fetch_book(session, token_id) + except Exception: + return None + + +async def observe_market( + *, + slug: str, + duration_s: float, + output_path: Path, + poll_books_s: float = 1.0, + use_websocket_price: bool = True, +) -> None: + timeout = aiohttp.ClientTimeout(total=20) + async with aiohttp.ClientSession(timeout=timeout) as session: + market = await fetch_event_by_slug(session, slug) + if market.price_to_beat is None: + raise RuntimeError(f"{slug} has no priceToBeat yet") + + writer = JsonlWriter(output_path) + vol = RollingVolatility() + latest_tick: PriceTick | None = None + latest_up_book: BookSnapshot | None = None + latest_down_book: BookSnapshot | None = None + started_at = asyncio.get_running_loop().time() + next_book_poll = 0.0 + + price_stream = ( + binance_trade_stream(session) + if use_websocket_price + else poll_binance_price(session, interval_s=1.0) + ) + + async for tick in price_stream: + latest_tick = tick + vol.add(tick) + + now = asyncio.get_running_loop().time() + if now >= next_book_poll: + latest_up_book, latest_down_book = await asyncio.gather( + _safe_fetch_book(session, market.up_token_id), + _safe_fetch_book(session, market.down_token_id), + ) + next_book_poll = now + poll_books_s + + sigma = vol.sigma_price_per_sqrt_second + if sigma is not None and latest_tick is not None: + snapshot = build_edge_snapshot( + market=market, + tick=latest_tick, + sigma_price_per_sqrt_second=sigma, + up_book=latest_up_book, + down_book=latest_down_book, + ) + writer.write(snapshot) + print( + f"{snapshot.observed_at.isoformat()} " + f"btc={snapshot.btc_price:.2f} " + f"beat={snapshot.price_to_beat:.2f} " + f"t={snapshot.seconds_remaining:.1f}s " + f"fair_up={snapshot.fair_up:.3f} " + f"up={snapshot.up_bid}/{snapshot.up_ask} " + f"down={snapshot.down_bid}/{snapshot.down_ask} " + f"edge_up={_fmt(snapshot.buy_up_edge)} " + f"edge_down={_fmt(snapshot.buy_down_edge)}", + flush=True, + ) + + if now - started_at >= duration_s: + break + + +def _fmt(value: float | None) -> str: + return "n/a" if value is None else f"{value:+.4f}" diff --git a/src/poly_updown/prices.py b/src/poly_updown/prices.py new file mode 100644 index 0000000..6892a7d --- /dev/null +++ b/src/poly_updown/prices.py @@ -0,0 +1,165 @@ +from __future__ import annotations + +import asyncio +import json +from collections.abc import AsyncIterator +from datetime import datetime, timezone + +import aiohttp + +from .models import PriceTick + + +BINANCE_WS = "wss://stream.binance.com:9443/ws/btcusdt@trade" +BINANCE_REST = "https://api.binance.com/api/v3/ticker/price" +POLYMARKET_RTDS_WS = "wss://ws-live-data.polymarket.com" + + +async def binance_trade_stream(session: aiohttp.ClientSession) -> AsyncIterator[PriceTick]: + async with session.ws_connect(BINANCE_WS, heartbeat=20) as ws: + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + payload = msg.json() + price = float(payload["p"]) + yield PriceTick( + source="binance_ws", + symbol="BTCUSDT", + price=price, + received_at=datetime.now(timezone.utc), + source_timestamp_ms=int(payload.get("T") or payload.get("E") or 0) or None, + ) + elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): + break + + +async def poll_binance_price( + session: aiohttp.ClientSession, + *, + interval_s: float = 1.0, +) -> AsyncIterator[PriceTick]: + while True: + async with session.get(BINANCE_REST, params={"symbol": "BTCUSDT"}) as response: + response.raise_for_status() + payload = await response.json() + yield PriceTick( + source="binance_rest", + symbol="BTCUSDT", + price=float(payload["price"]), + received_at=datetime.now(timezone.utc), + source_timestamp_ms=None, + ) + await asyncio.sleep(interval_s) + + +async def polymarket_chainlink_btc_stream( + session: aiohttp.ClientSession, +) -> AsyncIterator[PriceTick]: + subscribe = { + "action": "subscribe", + "subscriptions": [ + { + "topic": "crypto_prices_chainlink", + "type": "*", + "filters": json.dumps({"symbol": "btc/usd"}), + } + ], + } + async with session.ws_connect(POLYMARKET_RTDS_WS, heartbeat=20) as ws: + await ws.send_json(subscribe) + ping_task = asyncio.create_task(_ping_loop(ws)) + try: + while True: + msg = await ws.receive(timeout=8) + if msg.type == aiohttp.WSMsgType.TEXT: + if not msg.data: + continue + payload = msg.json() + for tick in _parse_chainlink_message(payload): + yield tick + elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): + break + elif msg.type == aiohttp.WSMsgType.CLOSE: + break + finally: + ping_task.cancel() + try: + await ping_task + except asyncio.CancelledError: + pass + + +async def resilient_btc_price_stream( + session: aiohttp.ClientSession, +) -> AsyncIterator[PriceTick]: + while True: + try: + async for tick in polymarket_chainlink_btc_stream(session): + yield tick + except Exception: + await asyncio.sleep(1) + + +async def _limited_binance_fallback( + session: aiohttp.ClientSession, + *, + attempts: int = 10, +) -> AsyncIterator[PriceTick]: + count = 0 + async for tick in poll_binance_price(session, interval_s=1.0): + yield tick + count += 1 + if count >= attempts: + break + + +async def _ping_loop(ws: aiohttp.ClientWebSocketResponse) -> None: + while True: + await asyncio.sleep(5) + await ws.send_str("PING") + + +def _parse_chainlink_message(message: dict) -> list[PriceTick]: + if message.get("topic") not in {"crypto_prices_chainlink", "crypto_prices"}: + return [] + payload = message.get("payload") or {} + if payload.get("symbol") != "btc/usd": + return [] + + now = datetime.now(timezone.utc) + if "data" in payload: + ticks = [] + for item in payload.get("data") or []: + value = item.get("value") + if value is not None: + ticks.append( + PriceTick( + source="polymarket_rtds_chainlink", + symbol="btc/usd", + price=float(value), + received_at=now, + source_timestamp_ms=_int_or_none(item.get("timestamp")), + ) + ) + return ticks + + value = payload.get("value") + if value is None: + return [] + return [ + PriceTick( + source="polymarket_rtds_chainlink", + symbol="btc/usd", + price=float(value), + received_at=now, + source_timestamp_ms=_int_or_none(payload.get("timestamp") or message.get("timestamp")), + ) + ] + + +def _int_or_none(value) -> int | None: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None diff --git a/src/poly_updown/server.py b/src/poly_updown/server.py new file mode 100644 index 0000000..a4042bd --- /dev/null +++ b/src/poly_updown/server.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import argparse +import asyncio +import json +from pathlib import Path +import sys + +import aiohttp +from aiohttp import web + +from .analytics import build_recent_market_stats +from .live import LiveMonitor +from .lockfile import InstanceLock, InstanceLockError + + +WEB_ROOT = Path(__file__).with_name("web") + + +async def index(_: web.Request) -> web.FileResponse: + return web.FileResponse(WEB_ROOT / "index.html") + + +async def state(request: web.Request) -> web.Response: + monitor: LiveMonitor = request.app["monitor"] + return web.json_response(monitor.state.as_dict()) + + +async def analytics(request: web.Request) -> web.Response: + monitor: LiveMonitor = request.app["monitor"] + path = monitor.state.recorder_path or "data/updown.duckdb" + rows = monitor._duck.recent_observations(limit=12000) + return web.json_response(build_recent_market_stats(rows, path=path)) + + +async def health(request: web.Request) -> web.Response: + monitor: LiveMonitor = request.app["monitor"] + state = monitor.state.as_dict() + return web.json_response( + { + "state": state, + "state_age_ms": state.get("state_age_ms"), + "duckdb": monitor._duck.health_summary(), + } + ) + + +async def stream(request: web.Request) -> web.StreamResponse: + monitor: LiveMonitor = request.app["monitor"] + queue = monitor.subscribe() + response = web.StreamResponse( + status=200, + headers={ + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + await response.prepare(request) + try: + while True: + payload = await queue.get() + data = json.dumps(payload, separators=(",", ":")) + await response.write(f"data: {data}\n\n".encode("utf-8")) + except asyncio.CancelledError: + raise + except (ConnectionResetError, aiohttp.ClientConnectionResetError): + pass + finally: + monitor.unsubscribe(queue) + return response + + +async def on_startup(app: web.Application) -> None: + app["monitor"].start() + + +async def on_cleanup(app: web.Application) -> None: + await app["monitor"].stop() + + +def create_app() -> web.Application: + app = web.Application() + app["monitor"] = LiveMonitor(duckdb_path="data/updown.duckdb") + app.router.add_get("/", index) + app.router.add_get("/api/state", state) + app.router.add_get("/api/analytics", analytics) + app.router.add_get("/api/health", health) + app.router.add_get("/api/stream", stream) + app.router.add_static("/static", WEB_ROOT, show_index=False) + app.on_startup.append(on_startup) + app.on_cleanup.append(on_cleanup) + return app + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="updown-dashboard") + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=8765) + parser.add_argument("--lock-file", type=Path, default=Path("data/updown-dashboard.lock")) + parser.add_argument("--no-lock", action="store_true") + return parser + + +def main() -> None: + args = build_parser().parse_args() + if args.no_lock: + web.run_app(create_app(), host=args.host, port=args.port) + return + try: + with InstanceLock(args.lock_file): + web.run_app(create_app(), host=args.host, port=args.port) + except InstanceLockError as exc: + print(str(exc), file=sys.stderr) + raise SystemExit(2) from exc + + +if __name__ == "__main__": + main() diff --git a/src/poly_updown/storage.py b/src/poly_updown/storage.py new file mode 100644 index 0000000..a69979f --- /dev/null +++ b/src/poly_updown/storage.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from .models import EdgeSnapshot + + +class JsonlWriter: + def __init__(self, path: Path) -> None: + self.path = path + self.path.parent.mkdir(parents=True, exist_ok=True) + + def write(self, snapshot: EdgeSnapshot) -> None: + self.write_dict(snapshot.as_dict()) + + def write_dict(self, payload: dict[str, Any]) -> None: + with self.path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(payload, sort_keys=True) + "\n") diff --git a/src/poly_updown/strategy.py b/src/poly_updown/strategy.py new file mode 100644 index 0000000..a32a73e --- /dev/null +++ b/src/poly_updown/strategy.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import math +from datetime import datetime, timezone + +from .models import BookSnapshot, EdgeSnapshot, MarketInfo, PriceTick + + +TAKER_FEE_RATE = 0.07 + + +def normal_cdf(value: float) -> float: + return 0.5 * (1.0 + math.erf(value / math.sqrt(2.0))) + + +def crypto_taker_fee(price: float) -> float: + return TAKER_FEE_RATE * price * (1.0 - price) + + +def fair_up_probability( + *, + current_price: float, + price_to_beat: float, + seconds_remaining: float, + sigma_price_per_sqrt_second: float, + latency_buffer_s: float = 2.0, +) -> float: + effective_seconds = max(1.0, seconds_remaining + latency_buffer_s) + denominator = sigma_price_per_sqrt_second * math.sqrt(effective_seconds) + if denominator <= 0: + return 0.5 + z = (current_price - price_to_beat) / denominator + return min(0.98, max(0.02, normal_cdf(z))) + + +def taker_buy_edge(fair_prob: float, ask: float | None) -> float | None: + if ask is None: + return None + return fair_prob - ask - crypto_taker_fee(ask) + + +def build_edge_snapshot( + *, + market: MarketInfo, + tick: PriceTick, + sigma_price_per_sqrt_second: float, + up_book: BookSnapshot | None, + down_book: BookSnapshot | None, +) -> EdgeSnapshot: + if market.price_to_beat is None: + raise ValueError("market has no price_to_beat") + + seconds_remaining = market.seconds_remaining + if seconds_remaining is None: + raise ValueError("market has no end_time") + + fair_up = fair_up_probability( + current_price=tick.price, + price_to_beat=market.price_to_beat, + seconds_remaining=seconds_remaining, + sigma_price_per_sqrt_second=sigma_price_per_sqrt_second, + ) + up_bid = up_book.best_bid if up_book else market.best_bid + up_ask = up_book.best_ask if up_book else market.best_ask + down_bid = down_book.best_bid if down_book else None + down_ask = down_book.best_ask if down_book else None + + return EdgeSnapshot( + observed_at=datetime.now(timezone.utc), + slug=market.event_slug, + price_source=tick.source, + btc_price=tick.price, + price_to_beat=market.price_to_beat, + seconds_remaining=seconds_remaining, + volatility_per_sqrt_second=sigma_price_per_sqrt_second, + fair_up=fair_up, + up_bid=up_bid, + up_ask=up_ask, + down_bid=down_bid, + down_ask=down_ask, + buy_up_edge=taker_buy_edge(fair_up, up_ask), + buy_down_edge=taker_buy_edge(1.0 - fair_up, down_ask), + ) diff --git a/src/poly_updown/tick_buffer.py b/src/poly_updown/tick_buffer.py new file mode 100644 index 0000000..dc5f7c9 --- /dev/null +++ b/src/poly_updown/tick_buffer.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from collections import deque +from dataclasses import dataclass + +from .models import PriceTick + + +@dataclass(frozen=True) +class BoundaryPrice: + price: float + tick_timestamp_ms: int + target_timestamp_ms: int + offset_ms: int + source: str + + +class TickBuffer: + def __init__(self, *, max_age_ms: int = 10 * 60 * 1000) -> None: + self.max_age_ms = max_age_ms + self._ticks: deque[PriceTick] = deque() + + def add(self, tick: PriceTick) -> None: + if tick.source_timestamp_ms is None: + return + self._ticks.append(tick) + cutoff = tick.source_timestamp_ms - self.max_age_ms + while self._ticks and (self._ticks[0].source_timestamp_ms or 0) < cutoff: + self._ticks.popleft() + + def nearest(self, target_timestamp_ms: int, *, max_offset_ms: int = 2_000) -> BoundaryPrice | None: + best: PriceTick | None = None + best_offset: int | None = None + for tick in self._ticks: + if tick.source_timestamp_ms is None: + continue + offset = abs(tick.source_timestamp_ms - target_timestamp_ms) + if best_offset is None or offset < best_offset: + best = tick + best_offset = offset + + if best is None or best_offset is None or best_offset > max_offset_ms: + return None + return BoundaryPrice( + price=best.price, + tick_timestamp_ms=best.source_timestamp_ms or 0, + target_timestamp_ms=target_timestamp_ms, + offset_ms=(best.source_timestamp_ms or 0) - target_timestamp_ms, + source=best.source, + ) + + def latest(self) -> PriceTick | None: + return self._ticks[-1] if self._ticks else None diff --git a/src/poly_updown/volatility.py b/src/poly_updown/volatility.py new file mode 100644 index 0000000..1ef03e0 --- /dev/null +++ b/src/poly_updown/volatility.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import math +from collections import deque +from dataclasses import dataclass, field + +from .models import PriceTick + + +@dataclass +class RollingVolatility: + window_s: float = 60.0 + floor_bps_per_sqrt_second: float = 0.35 + ticks: deque[PriceTick] = field(default_factory=deque) + + def add(self, tick: PriceTick) -> None: + self.ticks.append(tick) + cutoff = tick.received_at.timestamp() - self.window_s + while self.ticks and self.ticks[0].received_at.timestamp() < cutoff: + self.ticks.popleft() + + @property + def sigma_price_per_sqrt_second(self) -> float | None: + if len(self.ticks) < 3: + return None + + returns: list[float] = [] + for previous, current in zip(self.ticks, list(self.ticks)[1:]): + dt = current.received_at.timestamp() - previous.received_at.timestamp() + if dt <= 0: + continue + returns.append((current.price - previous.price) / math.sqrt(dt)) + + if len(returns) < 2: + return None + + mean = sum(returns) / len(returns) + variance = sum((item - mean) ** 2 for item in returns) / (len(returns) - 1) + observed = math.sqrt(variance) + floor = self.ticks[-1].price * self.floor_bps_per_sqrt_second / 10_000 + return max(observed, floor) diff --git a/src/poly_updown/web/app.js b/src/poly_updown/web/app.js new file mode 100644 index 0000000..badf275 --- /dev/null +++ b/src/poly_updown/web/app.js @@ -0,0 +1,396 @@ +const els = { + connectionDot: document.getElementById("connectionDot"), + connectionText: document.getElementById("connectionText"), + btcPrice: document.getElementById("btcPrice"), + priceSource: document.getElementById("priceSource"), + confidenceText: document.getElementById("confidenceText"), + healthCard: document.getElementById("healthCard"), + healthText: document.getElementById("healthText"), + healthDetail: document.getElementById("healthDetail"), + fairUp: document.getElementById("fairUp"), + benchmarkSource: document.getElementById("benchmarkSource"), + timeLeft: document.getElementById("timeLeft"), + marketWindow: document.getElementById("marketWindow"), + bestEdge: document.getElementById("bestEdge"), + bestEdgeSide: document.getElementById("bestEdgeSide"), + samplesWritten: document.getElementById("samplesWritten"), + recorderPath: document.getElementById("recorderPath"), + rtdsLag: document.getElementById("rtdsLag"), + startOffset: document.getElementById("startOffset"), + clobAge: document.getElementById("clobAge"), + clobEvent: document.getElementById("clobEvent"), + paperPnlTop: document.getElementById("paperPnlTop"), + paperWinRateTop: document.getElementById("paperWinRateTop"), + marketTitle: document.getElementById("marketTitle"), + marketBadge: document.getElementById("marketBadge"), + marketSlug: document.getElementById("marketSlug"), + priceToBeat: document.getElementById("priceToBeat"), + acceptingOrders: document.getElementById("acceptingOrders"), + volatility: document.getElementById("volatility"), + livePriceSource: document.getElementById("livePriceSource"), + lastTick: document.getElementById("lastTick"), + upEdge: document.getElementById("upEdge"), + downEdge: document.getElementById("downEdge"), + upEdgeBar: document.getElementById("upEdgeBar"), + downEdgeBar: document.getElementById("downEdgeBar"), + upBook: document.getElementById("upBook"), + downBook: document.getElementById("downBook"), + eventLog: document.getElementById("eventLog"), + analyticsMeta: document.getElementById("analyticsMeta"), + analyticsTable: document.getElementById("analyticsTable"), + paperMeta: document.getElementById("paperMeta"), + paperSummary: document.getElementById("paperSummary"), + paperTable: document.getElementById("paperTable"), +}; + +function fmtMoney(value) { + if (value === null || value === undefined || Number.isNaN(value)) return "--"; + return Number(value).toLocaleString(undefined, { + minimumFractionDigits: 2, + maximumFractionDigits: 2, + }); +} + +function fmtPct(value) { + if (value === null || value === undefined || Number.isNaN(value)) return "--"; + return `${(Number(value) * 100).toFixed(1)}%`; +} + +function fmtEdge(value) { + if (value === null || value === undefined || Number.isNaN(value)) return "--"; + const sign = value >= 0 ? "+" : ""; + return `${sign}${(value * 100).toFixed(2)}¢`; +} + +function fmtPnl(value) { + if (value === null || value === undefined || Number.isNaN(value)) return "--"; + const sign = value >= 0 ? "+" : ""; + return `${sign}${Number(value).toFixed(4)}`; +} + +function fmtSeconds(value) { + if (value === null || value === undefined || Number.isNaN(value)) return "--"; + const seconds = Math.max(0, Math.floor(value)); + const min = Math.floor(seconds / 60); + const sec = seconds % 60; + return `${min}:${String(sec).padStart(2, "0")}`; +} + +function compactTime(value) { + if (!value) return "--"; + return new Date(value).toLocaleTimeString([], { + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }); +} + +function setConnection(state, text) { + els.connectionDot.className = `dot ${state}`; + els.connectionText.textContent = text; +} + +function render(state) { + const market = state.market; + const tick = state.tick; + const edge = state.edge; + + els.btcPrice.textContent = tick ? `$${fmtMoney(tick.price)}` : "--"; + els.priceSource.textContent = tick ? `${compactSource(tick.source)} · ${compactTime(tick.received_at)}` : "--"; + els.fairUp.textContent = edge ? fmtPct(edge.fair_up) : "--"; + els.benchmarkSource.textContent = state.benchmark_source || "--"; + els.confidenceText.textContent = confidenceLabel(state); + renderHealth(state.health); + els.timeLeft.textContent = market ? fmtSeconds(market.seconds_remaining) : "--"; + els.marketWindow.textContent = market + ? `${compactTime(market.start_time)} - ${compactTime(market.end_time)}` + : "--"; + + const upEdge = edge?.buy_up_edge; + const downEdge = edge?.buy_down_edge; + const best = bestEdge(upEdge, downEdge); + els.bestEdge.textContent = fmtEdge(best.value); + els.bestEdgeSide.textContent = best.side || "暂无信号"; + els.samplesWritten.textContent = state.samples_written ?? "--"; + els.recorderPath.textContent = state.recorder_path || "--"; + els.rtdsLag.textContent = state.rtds_lag_ms === null || state.rtds_lag_ms === undefined + ? "--" + : `${state.rtds_lag_ms} ms`; + els.startOffset.textContent = state.start_boundary + ? `${state.start_boundary.offset_ms} ms` + : "--"; + els.clobAge.textContent = state.clob_book_age_ms === null || state.clob_book_age_ms === undefined + ? "--" + : `${state.clob_book_age_ms} ms`; + els.clobEvent.textContent = state.clob_last_event_type || "--"; + + els.marketTitle.textContent = market?.title || "等待市场"; + els.marketSlug.textContent = market?.event_slug || "--"; + els.priceToBeat.textContent = market?.price_to_beat ? `$${fmtMoney(market.price_to_beat)}` : "--"; + els.acceptingOrders.textContent = market ? String(market.accepting_orders).toUpperCase() : "--"; + els.volatility.textContent = edge ? fmtMoney(edge.volatility_per_sqrt_second) : "--"; + els.livePriceSource.textContent = tick?.source || "--"; + els.lastTick.textContent = tick ? compactTime(tick.received_at) : "--"; + + if (market) { + const open = market.accepting_orders && !market.closed; + els.marketBadge.textContent = open ? "交易中" : "已关闭"; + els.marketBadge.className = `badge ${open ? "open" : "closed"}`; + } + + els.upEdge.textContent = fmtEdge(upEdge); + els.downEdge.textContent = fmtEdge(downEdge); + renderEdgeBar(els.upEdgeBar, upEdge); + renderEdgeBar(els.downEdgeBar, downEdge); + renderBook(els.upBook, state.books?.up); + renderBook(els.downBook, state.books?.down); + renderEvents(state.events || []); +} + +function compactSource(source) { + if (!source) return "--"; + return source + .replace("polymarket_rtds_chainlink", "RTDS Chainlink") + .replace("binance_rest", "Binance REST") + .replace("binance_ws", "Binance WS"); +} + +function confidenceLabel(state) { + const benchmark = state.benchmark_source; + const source = state.tick?.source; + if (state.is_trusted_market && source === "polymarket_rtds_chainlink") { + return "高"; + } + if (source) { + return "低"; + } + return "--"; +} + +function renderHealth(health) { + const status = health?.status || "warn"; + els.healthText.textContent = health?.label || "--"; + els.healthDetail.textContent = health?.issues?.length + ? health.issues.join(" · ") + : "RTDS / CLOB / DuckDB 正常"; + els.healthCard.className = `metric health-card ${status}`; +} + +function bestEdge(up, down) { + const candidates = [ + { side: "买 Up", value: up }, + { side: "买 Down", value: down }, + ].filter((item) => item.value !== null && item.value !== undefined); + if (!candidates.length) return { side: "", value: null }; + return candidates.sort((a, b) => b.value - a.value)[0]; +} + +function renderEdgeBar(el, value) { + if (value === null || value === undefined || Number.isNaN(value)) { + el.style.width = "0"; + el.style.left = "50%"; + el.className = "bar"; + return; + } + const capped = Math.max(-0.12, Math.min(0.12, value)); + const width = Math.abs(capped) / 0.12 * 50; + el.style.width = `${width}%`; + el.style.left = capped < 0 ? `${50 - width}%` : "50%"; + el.className = `bar ${capped < 0 ? "negative" : ""}`; +} + +function renderBook(el, book) { + if (!book) { + el.innerHTML = '
暂无实时盘口快照
'; + return; + } + const levels = [ + ...(book.asks || []).slice(0, 4).reverse().map((level) => ({ ...level, side: "ask" })), + ...(book.bids || []).slice(0, 4).map((level) => ({ ...level, side: "bid" })), + ]; + if (!levels.length) { + el.innerHTML = '
盘口为空
'; + return; + } + const maxSize = Math.max(...levels.map((level) => level.size), 1); + el.innerHTML = levels + .map((level) => { + const width = Math.max(4, Math.min(100, (level.size / maxSize) * 100)); + return ` +
+ ${level.side.toUpperCase()} + + ${Number(level.price).toFixed(2)} +
+ `; + }) + .join(""); +} + +function renderEvents(events) { + els.eventLog.innerHTML = events + .slice() + .reverse() + .slice(0, 14) + .map((event) => { + return ` +
+ ${compactTime(event.at)} + ${event.level} + ${escapeHtml(event.message)} +
+ `; + }) + .join(""); +} + +function escapeHtml(value) { + return String(value) + .replaceAll("&", "&") + .replaceAll("<", "<") + .replaceAll(">", ">") + .replaceAll('"', """); +} + +function connect() { + setConnection("", "连接中"); + const stream = new EventSource("/api/stream"); + stream.onopen = () => setConnection("live", "实时"); + stream.onmessage = (event) => { + queueRender(JSON.parse(event.data)); + scheduleAnalyticsRefresh(); + }; + stream.onerror = () => { + setConnection("offline", "重连中"); + }; +} + +let analyticsRefreshAt = 0; +let pendingState = null; +let renderFrame = null; + +function queueRender(state) { + pendingState = state; + if (renderFrame !== null) return; + renderFrame = requestAnimationFrame(() => { + renderFrame = null; + if (pendingState) { + render(pendingState); + pendingState = null; + } + }); +} + +function scheduleAnalyticsRefresh() { + const now = Date.now(); + if (now < analyticsRefreshAt) return; + analyticsRefreshAt = now + 15000; + refreshAnalytics(); +} + +async function refreshAnalytics() { + try { + const response = await fetch("/api/analytics"); + const data = await response.json(); + renderAnalytics(data); + } catch (error) { + els.analyticsMeta.textContent = "错误"; + } +} + +function renderAnalytics(data) { + els.analyticsMeta.textContent = `${data.market_count} 个市场 · ${data.trusted_samples}/${data.total_samples} 可信样本`; + renderPaper(data); + const markets = data.markets || []; + if (!markets.length) { + els.analyticsTable.innerHTML = '
暂无市场记录
'; + return; + } + els.analyticsTable.innerHTML = ` +
+ 市场 + 样本 + Up 最大 + Down 最大 + 最后 60 秒 + 信号 + 价格源 +
+ ${markets.map(renderAnalyticsRow).join("")} + `; +} + +function renderPaper(data) { + const paper = data.paper || {}; + els.paperMeta.textContent = `${data.edge_threshold ? (data.edge_threshold * 100).toFixed(1) : "--"}¢ 阈值 · $${data.notional || "--"}`; + els.paperPnlTop.textContent = fmtPnl(paper.pnl); + els.paperWinRateTop.textContent = `${paper.trades ?? 0} 笔 · 胜率 ${fmtPct(paper.win_rate)}`; + els.paperSummary.innerHTML = ` +
可信交易数${paper.trades ?? 0}
+
胜率${fmtPct(paper.win_rate)}
+
总 PnL${fmtPnl(paper.pnl)}
+
平均 PnL${fmtPnl(paper.avg_pnl)}
+ `; + + const trades = (data.markets || []).flatMap((market) => + (market.paper_trades || []).map((trade) => ({ ...trade, slug: market.slug })), + ); + if (!trades.length) { + els.paperTable.innerHTML = '
暂无虚拟交易
'; + return; + } + els.paperTable.innerHTML = ` +
+ 市场 + 方向 + 入场 + Edge + 滑点 + 结果 + 容量 +
+ ${trades.slice(0, 16).map(renderPaperRow).join("")} + `; +} + +function renderPaperRow(trade) { + const wonLabel = trade.won ? "赢" : "亏"; + const sideLabel = trade.side === "up" ? "买 Up" : "买 Down"; + return ` +
+ ${escapeHtml(trade.slug)} + ${sideLabel} + ${Number(trade.price).toFixed(2)} + ${fmtEdge(trade.edge)} + ${fmtEdge(trade.slippage)} + ${wonLabel} ${fmtPnl(trade.pnl)} + $${fmtMoney(trade.filled_notional)} · ${trade.levels_used}档 · ${fmtSeconds(trade.seconds_remaining)} +
+ `; +} + +function renderAnalyticsRow(market) { + const sources = Object.entries(market.source_counts || {}) + .map(([name, count]) => `${name.replace("polymarket_", "pm_")}:${count}`) + .join(" · "); + const last60 = Math.max( + market.max_last60_up_edge ?? -Infinity, + market.max_last60_down_edge ?? -Infinity, + ); + const direction = market.final_direction || "open"; + const directionLabel = { up: "上涨", down: "下跌", open: "进行中" }[direction] || direction; + return ` +
+ ${escapeHtml(market.slug)} + ${market.samples} + ${fmtEdge(market.max_buy_up_edge)} + ${fmtEdge(market.max_buy_down_edge)} + ${Number.isFinite(last60) ? fmtEdge(last60) : "--"} + ${market.up_signals}U / ${market.down_signals}D + ${directionLabel} ${escapeHtml(sources)} +
+ `; +} + +connect(); +refreshAnalytics(); diff --git a/src/poly_updown/web/index.html b/src/poly_updown/web/index.html new file mode 100644 index 0000000..0d2e5ee --- /dev/null +++ b/src/poly_updown/web/index.html @@ -0,0 +1,185 @@ + + + + + + BTC Up/Down Lab + + + +
+
+
+

Polymarket 5 分钟监控

+

BTC 涨跌实验台

+
+
+ + 连接中 +
+
+ +
+
+ 当前信号 + -- + 暂无信号 +
+
+ Paper PnL + -- + 等待回放 +
+
+ 数据可信度 + -- + -- +
+
+ 系统健康 + -- + 等待 tick +
+
+ 剩余时间 + -- + -- +
+
+ BTC / 样本 + -- + -- 条 · -- +
+
+ +
+
+
+
+

当前合约

+

等待市场

+
+ SYNC +
+
+
+
Slug
+
--
+
+
+
起始基准价
+
--
+
+
+
是否接单
+
--
+
+
+
波动 / sqrt(s)
+
--
+
+
+
价格源
+
--
+
+
+
最近报价
+
--
+
+
+
上涨公允概率
+
--
+
+
+
数据库
+
--
+
+
+
RTDS 延迟
+
--
+
+
+
起点偏移
+
--
+
+
+
CLOB 盘口年龄
+
--
+
+
+
CLOB 最近事件
+
--
+
+
+
+ +
+
+
+

吃单检测

+

扣费后优势

+
+
+
+
+ 买 Up +
+ -- +
+
+ 买 Down +
+ -- +
+
+
+ +
+
+

Up 盘口

+
+
+
+ +
+
+

Down 盘口

+
+
+
+ +
+
+
+

Paper 策略

+

虚拟吃单回测

+
+ 等待 +
+
+
+
+ +
+
+
+

市场回放

+

近期证据

+
+ LOADING +
+
+
+ +
+
+

运行日志

+
+
+
+
+
+ + + + diff --git a/src/poly_updown/web/styles.css b/src/poly_updown/web/styles.css new file mode 100644 index 0000000..7cf1527 --- /dev/null +++ b/src/poly_updown/web/styles.css @@ -0,0 +1,483 @@ +:root { + --ink: #14120e; + --muted: #6d695e; + --line: #d8d1c4; + --paper: #f3eee4; + --panel: #fbf7ee; + --green: #0d8f61; + --red: #c54432; + --amber: #d08b18; + --blue: #225f91; + --shadow: 0 18px 50px rgba(33, 29, 21, 0.12); +} + +* { + box-sizing: border-box; +} + +body { + margin: 0; + min-height: 100vh; + color: var(--ink); + background: + linear-gradient(90deg, rgba(20, 18, 14, 0.045) 1px, transparent 1px), + linear-gradient(180deg, rgba(20, 18, 14, 0.045) 1px, transparent 1px), + var(--paper); + background-size: 26px 26px; + font-family: "Avenir Next", "Gill Sans", Helvetica, sans-serif; +} + +button, +input, +select { + font: inherit; +} + +.shell { + width: min(1480px, calc(100vw - 32px)); + margin: 0 auto; + padding: 28px 0 36px; +} + +.topbar { + display: flex; + align-items: end; + justify-content: space-between; + gap: 24px; + padding: 18px 0 22px; + border-bottom: 3px solid var(--ink); +} + +.kicker { + margin: 0 0 6px; + color: var(--muted); + font-size: 12px; + font-weight: 800; + letter-spacing: 0.12em; + text-transform: uppercase; +} + +h1, +h2 { + margin: 0; + letter-spacing: 0; +} + +h1 { + font-family: Georgia, "Times New Roman", serif; + font-size: clamp(34px, 5vw, 76px); + line-height: 0.92; +} + +h2 { + font-size: 20px; + line-height: 1.1; +} + +.status-strip { + display: inline-flex; + align-items: center; + gap: 10px; + min-width: 142px; + justify-content: center; + padding: 10px 14px; + border: 2px solid var(--ink); + background: var(--panel); + box-shadow: 5px 5px 0 var(--ink); + font-size: 13px; + font-weight: 800; + text-transform: uppercase; +} + +.dot { + width: 11px; + height: 11px; + border-radius: 999px; + background: var(--amber); +} + +.dot.live { + background: var(--green); +} + +.dot.offline { + background: var(--red); +} + +.ticker-grid { + display: grid; + grid-template-columns: 1.35fr 1.05fr 1.05fr 1.05fr 1fr 1.35fr; + gap: 14px; + margin: 22px 0 14px; +} + +.metric, +.panel { + border: 2px solid var(--ink); + background: var(--panel); + box-shadow: var(--shadow); +} + +.metric { + min-height: 126px; + padding: 16px; + display: flex; + flex-direction: column; + justify-content: space-between; +} + +.metric.primary { + background: #e8f1ed; +} + +.health-card.ok { + background: #e8f1ed; +} + +.health-card.warn { + background: #fff0c2; +} + +.health-card.bad { + background: #f4c2b9; +} + +.signal-card { + border-width: 3px; + box-shadow: 7px 7px 0 var(--ink); +} + +.label, +.metric small, +dt { + color: var(--muted); + font-size: 12px; + font-weight: 800; + letter-spacing: 0.08em; + text-transform: uppercase; +} + +.metric strong { + display: block; + margin-top: 8px; + font-family: Georgia, "Times New Roman", serif; + font-size: clamp(26px, 3.6vw, 48px); + line-height: 1; +} + +.metric small { + display: block; + min-height: 16px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.main-grid { + display: grid; + grid-template-columns: 1.35fr 1fr 1fr; + gap: 14px; + align-items: start; +} + +.panel { + padding: 16px; +} + +.market-panel { + grid-column: span 2; +} + +.edge-panel { + min-height: 100%; +} + +.log-panel, +.replay-panel, +.paper-panel { + grid-column: span 3; +} + +.panel-head { + display: flex; + align-items: start; + justify-content: space-between; + gap: 16px; + padding-bottom: 14px; + border-bottom: 1px solid var(--line); +} + +.panel-head.compact { + align-items: center; +} + +.badge { + flex: 0 0 auto; + padding: 7px 10px; + border: 2px solid var(--ink); + background: #f4d35e; + font-size: 12px; + font-weight: 900; +} + +.badge.closed { + background: #e9a39a; +} + +.badge.open { + background: #9bd7b7; +} + +.facts { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 12px; + margin: 16px 0 0; +} + +.facts div { + min-width: 0; + padding: 12px; + background: rgba(255, 255, 255, 0.48); + border: 1px solid var(--line); +} + +dd { + margin: 5px 0 0; + min-height: 20px; + overflow-wrap: anywhere; + font-weight: 800; +} + +.edge-bars { + display: grid; + gap: 18px; + margin-top: 18px; +} + +.edge-row { + display: grid; + grid-template-columns: 78px 1fr 82px; + gap: 12px; + align-items: center; + font-size: 13px; + font-weight: 900; +} + +.bar-track { + height: 18px; + border: 2px solid var(--ink); + background: linear-gradient(90deg, #f0b4a9 0 50%, #a9d9bf 50% 100%); + position: relative; +} + +.bar-track::after { + content: ""; + position: absolute; + left: 50%; + top: -4px; + bottom: -4px; + width: 2px; + background: var(--ink); +} + +.bar { + position: absolute; + top: 2px; + bottom: 2px; + left: 50%; + width: 0; + background: var(--blue); + transition: width 180ms ease, left 180ms ease, background 180ms ease; +} + +.bar.negative { + background: var(--red); +} + +.book { + display: grid; + gap: 6px; + margin-top: 12px; + min-height: 248px; +} + +.book-row { + display: grid; + grid-template-columns: 42px 1fr 70px; + align-items: center; + gap: 8px; + min-height: 24px; + font-size: 12px; + font-variant-numeric: tabular-nums; +} + +.book-row .side { + font-weight: 900; +} + +.book-row .depth { + position: relative; + height: 18px; + border: 1px solid var(--line); + background: rgba(255, 255, 255, 0.55); + overflow: hidden; +} + +.book-row .fill { + display: block; + height: 100%; + width: var(--w, 0%); + background: rgba(34, 95, 145, 0.35); +} + +.book-row.ask .fill { + background: rgba(197, 68, 50, 0.35); +} + +.empty { + color: var(--muted); + font-size: 13px; + font-weight: 800; + padding: 18px 0; +} + +.event-log { + display: grid; + gap: 8px; + max-height: 220px; + overflow: auto; + padding-top: 12px; +} + +.event { + display: grid; + grid-template-columns: 184px 68px 1fr; + gap: 10px; + align-items: baseline; + padding: 9px 10px; + border: 1px solid var(--line); + background: rgba(255, 255, 255, 0.5); + font-size: 13px; +} + +.event b { + color: var(--blue); + text-transform: uppercase; +} + +.event.error b { + color: var(--red); +} + +.analytics-table { + display: grid; + gap: 8px; + padding-top: 12px; +} + +.analytics-row { + display: grid; + grid-template-columns: minmax(170px, 1.35fr) 68px 84px 84px 88px 98px 1fr; + gap: 10px; + align-items: center; + min-height: 46px; + padding: 9px 10px; + border: 1px solid var(--line); + background: rgba(255, 255, 255, 0.5); + font-size: 12px; + font-variant-numeric: tabular-nums; +} + +.analytics-row.header { + color: var(--muted); + font-size: 11px; + font-weight: 900; + letter-spacing: 0.08em; + text-transform: uppercase; + background: transparent; +} + +.analytics-row b { + overflow-wrap: anywhere; +} + +.pill { + display: inline-flex; + align-items: center; + justify-content: center; + min-height: 24px; + padding: 3px 7px; + border: 1px solid var(--ink); + font-weight: 900; +} + +.pill.up { + background: #9bd7b7; +} + +.pill.down { + background: #e9a39a; +} + +.pill.open { + background: #f4d35e; +} + +.paper-summary { + display: grid; + grid-template-columns: repeat(4, minmax(0, 1fr)); + gap: 10px; + padding-top: 12px; +} + +.paper-summary div { + padding: 12px; + border: 1px solid var(--line); + background: rgba(255, 255, 255, 0.5); +} + +.paper-summary span { + display: block; + color: var(--muted); + font-size: 11px; + font-weight: 900; + letter-spacing: 0.08em; + text-transform: uppercase; +} + +.paper-summary b { + display: block; + margin-top: 7px; + font-size: 20px; +} + +@media (max-width: 980px) { + .ticker-grid, + .main-grid, + .facts { + grid-template-columns: 1fr; + } + + .market-panel, + .log-panel, + .replay-panel, + .paper-panel { + grid-column: auto; + } + + .paper-summary { + grid-template-columns: 1fr; + } + + .topbar { + align-items: start; + flex-direction: column; + } + + .event, + .analytics-row { + grid-template-columns: 1fr; + gap: 3px; + } +} diff --git a/src/polymarket_btc_updown_lab.egg-info/PKG-INFO b/src/polymarket_btc_updown_lab.egg-info/PKG-INFO new file mode 100644 index 0000000..a35f862 --- /dev/null +++ b/src/polymarket_btc_updown_lab.egg-info/PKG-INFO @@ -0,0 +1,9 @@ +Metadata-Version: 2.4 +Name: polymarket-btc-updown-lab +Version: 0.1.0 +Summary: Read-only feasibility lab for Polymarket BTC Up/Down 5m markets. +Requires-Python: >=3.11 +Requires-Dist: aiohttp>=3.9 +Requires-Dist: duckdb>=1.1 +Provides-Extra: dev +Requires-Dist: pytest>=8.0; extra == "dev" diff --git a/src/polymarket_btc_updown_lab.egg-info/SOURCES.txt b/src/polymarket_btc_updown_lab.egg-info/SOURCES.txt new file mode 100644 index 0000000..b1a411e --- /dev/null +++ b/src/polymarket_btc_updown_lab.egg-info/SOURCES.txt @@ -0,0 +1,33 @@ +README.md +pyproject.toml +src/poly_updown/__init__.py +src/poly_updown/analytics.py +src/poly_updown/cli.py +src/poly_updown/clob.py +src/poly_updown/clob_ws.py +src/poly_updown/duckstore.py +src/poly_updown/gamma.py +src/poly_updown/live.py +src/poly_updown/models.py +src/poly_updown/observer.py +src/poly_updown/prices.py +src/poly_updown/server.py +src/poly_updown/storage.py +src/poly_updown/strategy.py +src/poly_updown/tick_buffer.py +src/poly_updown/volatility.py +src/poly_updown/web/app.js +src/poly_updown/web/index.html +src/poly_updown/web/styles.css +src/polymarket_btc_updown_lab.egg-info/PKG-INFO +src/polymarket_btc_updown_lab.egg-info/SOURCES.txt +src/polymarket_btc_updown_lab.egg-info/dependency_links.txt +src/polymarket_btc_updown_lab.egg-info/entry_points.txt +src/polymarket_btc_updown_lab.egg-info/requires.txt +src/polymarket_btc_updown_lab.egg-info/top_level.txt +tests/test_analytics.py +tests/test_dashboard_assets.py +tests/test_gamma.py +tests/test_live.py +tests/test_prices.py +tests/test_strategy.py \ No newline at end of file diff --git a/src/polymarket_btc_updown_lab.egg-info/dependency_links.txt b/src/polymarket_btc_updown_lab.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/polymarket_btc_updown_lab.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/src/polymarket_btc_updown_lab.egg-info/entry_points.txt b/src/polymarket_btc_updown_lab.egg-info/entry_points.txt new file mode 100644 index 0000000..3ae1094 --- /dev/null +++ b/src/polymarket_btc_updown_lab.egg-info/entry_points.txt @@ -0,0 +1,3 @@ +[console_scripts] +updown-dashboard = poly_updown.server:main +updown-lab = poly_updown.cli:main diff --git a/src/polymarket_btc_updown_lab.egg-info/requires.txt b/src/polymarket_btc_updown_lab.egg-info/requires.txt new file mode 100644 index 0000000..f0e1152 --- /dev/null +++ b/src/polymarket_btc_updown_lab.egg-info/requires.txt @@ -0,0 +1,5 @@ +aiohttp>=3.9 +duckdb>=1.1 + +[dev] +pytest>=8.0 diff --git a/src/polymarket_btc_updown_lab.egg-info/top_level.txt b/src/polymarket_btc_updown_lab.egg-info/top_level.txt new file mode 100644 index 0000000..10ca30a --- /dev/null +++ b/src/polymarket_btc_updown_lab.egg-info/top_level.txt @@ -0,0 +1 @@ +poly_updown diff --git a/tests/test_analytics.py b/tests/test_analytics.py new file mode 100644 index 0000000..e03498f --- /dev/null +++ b/tests/test_analytics.py @@ -0,0 +1,33 @@ +from pathlib import Path + +from poly_updown.analytics import load_recent_market_stats + + +def test_load_recent_market_stats_groups_samples(tmp_path: Path) -> None: + path = tmp_path / "samples.jsonl" + path.write_text( + "\n".join( + [ + '{"recorded_at":"2026-05-21T15:20:01Z","benchmark_source":"rtds_boundary","start_boundary":{"price":100,"offset_ms":500},"market":{"event_slug":"m1","title":"M1","start_time":"s","end_time":"e","seconds_remaining":120,"price_to_beat":100},"tick":{"source":"polymarket_rtds_chainlink","price":99},"books":{"up":{"asks":[{"price":0.52,"size":100}]},"down":{"asks":[{"price":0.48,"size":100}]}},"edge":{"fair_up":0.55,"buy_up_edge":0.01,"buy_down_edge":0.02,"up_ask":0.52,"down_ask":0.48}}', + '{"recorded_at":"2026-05-21T15:20:02Z","benchmark_source":"rtds_boundary","start_boundary":{"price":100,"offset_ms":500},"market":{"event_slug":"m1","title":"M1","start_time":"s","end_time":"e","seconds_remaining":50,"price_to_beat":100},"tick":{"source":"polymarket_rtds_chainlink","price":101},"books":{"up":{"asks":[{"price":0.52,"size":100}]},"down":{"asks":[{"price":0.48,"size":100}]}},"edge":{"fair_up":0.58,"buy_up_edge":0.04,"buy_down_edge":-0.01,"up_ask":0.52,"down_ask":0.48}}', + ] + ), + encoding="utf-8", + ) + + result = load_recent_market_stats(path, edge_threshold=0.03) + + market = result["markets"][0] + assert result["total_samples"] == 2 + assert result["trusted_samples"] == 2 + assert market["slug"] == "m1" + assert market["max_buy_up_edge"] == 0.04 + assert market["max_last60_up_edge"] == 0.04 + assert market["up_signals"] == 1 + assert market["final_direction"] == "up" + assert market["paper_trades"][0]["side"] == "up" + assert market["paper_trades"][0]["won"] is True + assert market["paper_trades"][0]["filled_notional"] == 25.0 + assert market["paper_trades"][0]["levels_used"] == 1 + assert result["paper"]["trades"] == 1 + assert result["paper"]["wins"] == 1 diff --git a/tests/test_dashboard_assets.py b/tests/test_dashboard_assets.py new file mode 100644 index 0000000..0ec188e --- /dev/null +++ b/tests/test_dashboard_assets.py @@ -0,0 +1,28 @@ +from pathlib import Path + + +WEB_ROOT = Path(__file__).parents[1] / "src" / "poly_updown" / "web" + + +def test_dashboard_assets_reference_expected_mounts() -> None: + html = (WEB_ROOT / "index.html").read_text(encoding="utf-8") + + assert "/static/styles.css" in html + assert "/static/app.js" in html + assert "BTC 涨跌实验台" in html + assert "当前信号" in html + assert 'id="paperPnlTop"' in html + assert 'id="btcPrice"' in html + assert 'id="upBook"' in html + assert 'id="eventLog"' in html + assert 'id="analyticsTable"' in html + assert 'id="paperTable"' in html + + +def test_dashboard_script_uses_sse_stream() -> None: + script = (WEB_ROOT / "app.js").read_text(encoding="utf-8") + + assert 'new EventSource("/api/stream")' in script + assert 'fetch("/api/analytics")' in script + assert "renderBook" in script + assert "renderEdgeBar" in script diff --git a/tests/test_gamma.py b/tests/test_gamma.py new file mode 100644 index 0000000..b1396c5 --- /dev/null +++ b/tests/test_gamma.py @@ -0,0 +1,31 @@ +from poly_updown.gamma import parse_market_info + + +def test_parse_market_info_extracts_updown_tokens() -> None: + event = { + "slug": "btc-updown-5m-test", + "title": "Bitcoin Up or Down", + "startTime": "2026-05-21T07:15:00Z", + "endDate": "2026-05-21T07:20:00Z", + "closed": False, + "eventMetadata": {"priceToBeat": 77525.3}, + "markets": [ + { + "id": "1", + "conditionId": "0xabc", + "clobTokenIds": '["up", "down"]', + "acceptingOrders": True, + "enableOrderBook": True, + "closed": False, + "bestBid": 0.51, + "bestAsk": 0.52, + } + ], + } + + market = parse_market_info(event) + + assert market.price_to_beat == 77525.3 + assert market.up_token_id == "up" + assert market.down_token_id == "down" + assert market.accepting_orders is True diff --git a/tests/test_live.py b/tests/test_live.py new file mode 100644 index 0000000..14584cf --- /dev/null +++ b/tests/test_live.py @@ -0,0 +1,9 @@ +from datetime import datetime, timezone + +from poly_updown.live import current_btc_updown_slug + + +def test_current_slug_uses_five_minute_floor() -> None: + moment = datetime(2026, 5, 21, 15, 7, 42, tzinfo=timezone.utc) + + assert current_btc_updown_slug(moment) == "btc-updown-5m-1779375900" diff --git a/tests/test_prices.py b/tests/test_prices.py new file mode 100644 index 0000000..727b09c --- /dev/null +++ b/tests/test_prices.py @@ -0,0 +1,34 @@ +from poly_updown.prices import _parse_chainlink_message + + +def test_parse_chainlink_rtds_message() -> None: + ticks = _parse_chainlink_message( + { + "topic": "crypto_prices", + "type": "update", + "payload": { + "symbol": "btc/usd", + "data": [ + {"timestamp": 1779376675000, "value": 77123.45}, + {"timestamp": 1779376676000, "value": 77124.45}, + ], + }, + } + ) + + assert len(ticks) == 2 + tick = ticks[-1] + assert tick is not None + assert tick.source == "polymarket_rtds_chainlink" + assert tick.symbol == "btc/usd" + assert tick.price == 77124.45 + + +def test_parse_chainlink_rtds_ignores_other_symbols() -> None: + assert _parse_chainlink_message( + { + "topic": "crypto_prices", + "type": "update", + "payload": {"symbol": "eth/usd", "value": 3000}, + } + ) == [] diff --git a/tests/test_strategy.py b/tests/test_strategy.py new file mode 100644 index 0000000..cfaaaa6 --- /dev/null +++ b/tests/test_strategy.py @@ -0,0 +1,29 @@ +from poly_updown.strategy import crypto_taker_fee, fair_up_probability + + +def test_crypto_taker_fee_peaks_near_half() -> None: + assert round(crypto_taker_fee(0.5), 4) == 0.0175 + assert crypto_taker_fee(0.1) < crypto_taker_fee(0.5) + + +def test_fair_probability_moves_with_distance() -> None: + base = fair_up_probability( + current_price=100.0, + price_to_beat=100.0, + seconds_remaining=60.0, + sigma_price_per_sqrt_second=1.0, + ) + high = fair_up_probability( + current_price=102.0, + price_to_beat=100.0, + seconds_remaining=60.0, + sigma_price_per_sqrt_second=1.0, + ) + low = fair_up_probability( + current_price=98.0, + price_to_beat=100.0, + seconds_remaining=60.0, + sigma_price_per_sqrt_second=1.0, + ) + + assert low < base < high