diff --git a/app/cli.py b/app/cli.py index 26e879c..f26c185 100644 --- a/app/cli.py +++ b/app/cli.py @@ -3,7 +3,7 @@ import argparse import sys -from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, onchain_monitor, paper_trader, price_tracker, review_engine, sentiment_monitor +from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, onchain_monitor, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor def build_parser(): @@ -21,6 +21,8 @@ def build_parser(): paper = subparsers.add_parser("paper-trader", help="运行模拟交易账本同步") paper.add_argument("--limit", type=int, default=100, help="本轮最多处理的可执行推荐数量") + subparsers.add_parser("price-streamer", help="运行 websocket 实时价格流") + review = subparsers.add_parser("review", help="运行复盘") review.add_argument("--compact", action="store_true", help="输出紧凑 JSON") review.add_argument("--no-push", action="store_true", help="只运行复盘,不发飞书") @@ -55,6 +57,8 @@ def main(): return price_tracker.main() if args.command == "paper-trader": return paper_trader.main(limit=args.limit) + if args.command == "price-streamer": + return price_streamer.main() if args.command == "review": return review_engine.run_review(push_enabled=not args.no_push, compact=args.compact) if args.command == "event": diff --git a/app/config/system_config.py b/app/config/system_config.py index db29c3d..2af298e 100644 --- a/app/config/system_config.py +++ b/app/config/system_config.py @@ -13,6 +13,7 @@ from app.db.runtime_config_db import ( get_notification_config, get_onchain_config, get_paper_trading_config, + get_price_streamer_config, get_scheduler_config, get_sentiment_config, seed_system_defaults, @@ -97,6 +98,23 @@ def default_paper_trading_config(): } +def default_price_streamer_config(): + return { + "enabled": _env_bool("ALPHAX_PRICE_STREAMER_ENABLED", True), + "provider": _env_str("ALPHAX_PRICE_STREAMER_PROVIDER", "binance_spot"), + "stream_url": _env_str("ALPHAX_PRICE_STREAMER_URL", "wss://stream.binance.com:9443/stream"), + "refresh_symbols_seconds": _env_float("ALPHAX_PRICE_STREAMER_REFRESH_SYMBOLS_SECONDS", 20), + "idle_sleep_seconds": _env_float("ALPHAX_PRICE_STREAMER_IDLE_SLEEP_SECONDS", 5), + "reconnect_delay_seconds": _env_float("ALPHAX_PRICE_STREAMER_RECONNECT_DELAY_SECONDS", 5), + "max_stream_symbols": _env_int("ALPHAX_PRICE_STREAMER_MAX_SYMBOLS", 200), + "include_actionable_recommendations": True, + "include_open_paper_trades": True, + "update_latest_price_cache": True, + "sync_paper_trading": True, + "log_every_events": _env_int("ALPHAX_PRICE_STREAMER_LOG_EVERY_EVENTS", 100), + } + + def default_sentiment_config(): return { "enabled": _env_bool("ALPHAX_SENTIMENT_ENABLED", True), @@ -263,6 +281,7 @@ def seed_runtime_system_defaults(): "llm": (default_llm_config(), "LLM provider and module switches; API key remains in env"), "onchain": (default_onchain_config(), "On-chain provider and signal thresholds; API keys remain in env"), "paper_trading": (default_paper_trading_config(), "Paper trading account and execution model"), + "price_streamer": (default_price_streamer_config(), "Realtime websocket price streamer settings"), "sentiment": (default_sentiment_config(), "Sentiment monitoring settings"), "event_driven": (default_event_driven_config(), "Event/news driven screening settings"), "monitoring": (default_monitoring_config(), "Monitoring and audit settings"), @@ -301,6 +320,14 @@ def paper_trading_config(): return cfg or default_paper_trading_config() +def price_streamer_config(): + cfg = get_price_streamer_config(default=None) + if cfg is None: + _seed_one("price_streamer", default_price_streamer_config(), "Realtime websocket price streamer settings") + cfg = get_price_streamer_config(default=None) + return cfg or default_price_streamer_config() + + def notification_config(): cfg = get_notification_config(default=None) if cfg is None: @@ -367,6 +394,7 @@ __all__ = [ "default_notification_config", "default_onchain_config", "default_paper_trading_config", + "default_price_streamer_config", "default_scheduler_config", "default_sentiment_config", "email_config", @@ -376,6 +404,7 @@ __all__ = [ "notification_config", "onchain_config", "paper_trading_config", + "price_streamer_config", "scheduler_config", "sentiment_config", "seed_runtime_system_defaults", diff --git a/app/db/runtime_config_db.py b/app/db/runtime_config_db.py index f409bd9..264bd1e 100644 --- a/app/db/runtime_config_db.py +++ b/app/db/runtime_config_db.py @@ -221,6 +221,14 @@ def set_paper_trading_config(value, updated_by="", source="manual"): return set_config("system", "paper_trading", value, description="Paper trading account and execution model", source=source, updated_by=updated_by) +def get_price_streamer_config(default=None): + return get_config("system", "price_streamer", default=default) + + +def set_price_streamer_config(value, updated_by="", source="manual"): + return set_config("system", "price_streamer", value, description="Realtime websocket price streamer settings", source=source, updated_by=updated_by) + + def get_notification_config(default=None): return get_config("system", "notification", default=default) @@ -276,6 +284,7 @@ __all__ = [ "get_notification_config", "get_onchain_config", "get_paper_trading_config", + "get_price_streamer_config", "get_scheduler_config", "get_sentiment_config", "get_learned_rules_config", @@ -292,6 +301,7 @@ __all__ = [ "set_notification_config", "set_onchain_config", "set_paper_trading_config", + "set_price_streamer_config", "set_scheduler_config", "set_sentiment_config", "seed_system_defaults", diff --git a/app/db/scheduler_db.py b/app/db/scheduler_db.py index a8cb1a5..66a8148 100644 --- a/app/db/scheduler_db.py +++ b/app/db/scheduler_db.py @@ -376,6 +376,7 @@ def _display_job_name(job_name): "sentiment": "舆情", "onchain": "链上", "llm-sentiment": "AI舆情", + "paper-trader": "模拟交易", "review": "复盘", }.get(job_name, job_name) diff --git a/app/services/price_streamer.py b/app/services/price_streamer.py new file mode 100644 index 0000000..e0496f2 --- /dev/null +++ b/app/services/price_streamer.py @@ -0,0 +1,194 @@ +"""Realtime Binance websocket price stream for paper trading. + +The stream is intentionally separate from the cron tracker. Websocket ticks +drive paper-trading TP/SL decisions quickly, while cron remains a fallback. +""" + +from __future__ import annotations + +import asyncio +import json +from datetime import datetime + +import websockets + +from app.config.system_config import price_streamer_config +from app.db.altcoin_db import init_db, update_latest_price_cache +from app.db.paper_trading import sync_recommendation +from app.db.recommendation_queries import get_active_recommendations_deduped +from app.db.schema import get_conn +from app.db.system_logs import record_exception + + +def _now() -> str: + return datetime.now().isoformat() + + +def _safe_float(value, default=0.0) -> float: + try: + if value is None or value == "": + return default + return float(value) + except Exception: + return default + + +def _safe_int(value, default=0) -> int: + try: + return int(value or 0) + except Exception: + return default + + +def _stream_name(symbol: str) -> str: + base = str(symbol or "").replace("/", "").lower() + return f"{base}@ticker" if base else "" + + +def _stream_url(symbols: list[str], cfg: dict | None = None) -> str: + cfg = cfg or price_streamer_config() + base_url = str(cfg.get("stream_url") or "wss://stream.binance.com:9443/stream").rstrip("/") + streams = "/".join(_stream_name(s) for s in symbols if _stream_name(s)) + return f"{base_url}?streams={streams}" + + +def _load_open_paper_trade_recs() -> list[dict]: + conn = get_conn() + try: + rows = conn.execute( + """ + SELECT + p.recommendation_id AS id, + p.symbol, + p.entry_price, + p.stop_loss, + p.tp1, + p.tp2, + p.source_status AS execution_status, + p.source_action AS action_status, + p.strategy_version + FROM paper_trades p + WHERE p.status='open' + ORDER BY p.opened_at DESC, p.id DESC + """ + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +def load_stream_targets(limit: int | None = None, cfg: dict | None = None) -> dict[str, dict]: + """Return symbol -> recommendation-like payload for websocket updates.""" + cfg = cfg or price_streamer_config() + max_symbols = max(1, _safe_int(limit or cfg.get("max_stream_symbols"), 200)) + targets: dict[str, dict] = {} + + if cfg.get("include_actionable_recommendations", True): + for rec in get_active_recommendations_deduped(actionable_only=True, limit=max_symbols, with_meta=False): + symbol = str(rec.get("symbol") or "").strip().upper() + if symbol: + targets[symbol] = dict(rec) + + if cfg.get("include_open_paper_trades", True): + for rec in _load_open_paper_trade_recs(): + symbol = str(rec.get("symbol") or "").strip().upper() + if symbol: + targets.setdefault(symbol, rec) + + return dict(list(targets.items())[:max_symbols]) + + +def handle_price_tick(symbol: str, price: float, targets: dict[str, dict], event_time: str | None = None, cfg: dict | None = None) -> dict: + cfg = cfg or price_streamer_config() + symbol = str(symbol or "").strip().upper() + price = _safe_float(price) + if not symbol or price <= 0: + return {"skipped": True, "reason": "invalid_tick"} + event_time = event_time or _now() + if cfg.get("update_latest_price_cache", True): + update_latest_price_cache(symbol, price, updated_at=event_time, source="price_streamer") + if not cfg.get("sync_paper_trading", True): + return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "disabled_by_streamer"}} + rec = targets.get(symbol) + if not rec: + return {"updated_price": True, "paper_trading": {"skipped": True, "reason": "no_target"}} + result = sync_recommendation(rec, price, event_time=event_time) + return {"updated_price": True, "paper_trading": result} + + +def _parse_ticker_message(raw: str) -> tuple[str, float]: + payload = json.loads(raw) + data = payload.get("data") if isinstance(payload, dict) else None + if not isinstance(data, dict): + data = payload if isinstance(payload, dict) else {} + symbol = str(data.get("s") or "") + price = _safe_float(data.get("c") or data.get("p") or data.get("lastPrice")) + if symbol.endswith("USDT"): + symbol = f"{symbol[:-4]}/USDT" + return symbol.upper(), price + + +async def run_forever() -> None: + init_db() + cfg = price_streamer_config() + if not cfg.get("enabled", True): + print("[price-streamer] disabled by system_config.price_streamer", flush=True) + while True: + await asyncio.sleep(max(1.0, _safe_float(cfg.get("idle_sleep_seconds"), 5))) + + refresh_seconds = max(5.0, _safe_float(cfg.get("refresh_symbols_seconds"), 20)) + idle_sleep = max(1.0, _safe_float(cfg.get("idle_sleep_seconds"), 5)) + reconnect_delay = max(1.0, _safe_float(cfg.get("reconnect_delay_seconds"), 5)) + log_every = max(1, _safe_int(cfg.get("log_every_events"), 100)) + + while True: + try: + cfg = price_streamer_config() + targets = load_stream_targets(cfg=cfg) + symbols = sorted(targets.keys()) + if not symbols: + print("[price-streamer] no active symbols, sleeping", flush=True) + await asyncio.sleep(idle_sleep) + continue + url = _stream_url(symbols, cfg) + print(f"[price-streamer] connecting symbols={len(symbols)}", flush=True) + last_refresh = asyncio.get_running_loop().time() + count = 0 + async with websockets.connect(url, ping_interval=20, ping_timeout=20, close_timeout=5) as ws: + async for raw in ws: + symbol, price = _parse_ticker_message(raw) + if symbol and price > 0: + handle_price_tick(symbol, price, targets, event_time=_now(), cfg=cfg) + count += 1 + if count % log_every == 0: + print(f"[price-streamer] ticks={count} latest={symbol} {price}", flush=True) + if asyncio.get_running_loop().time() - last_refresh >= refresh_seconds: + new_targets = load_stream_targets(cfg=cfg) + if sorted(new_targets.keys()) != symbols: + print("[price-streamer] symbol set changed, reconnecting", flush=True) + break + targets = new_targets + last_refresh = asyncio.get_running_loop().time() + except Exception as exc: + print(f"[price-streamer] error: {exc}", flush=True) + try: + record_exception(exc, source="price_streamer") + except Exception: + pass + await asyncio.sleep(reconnect_delay) + + +def main(): + asyncio.run(run_forever()) + + +__all__ = [ + "handle_price_tick", + "load_stream_targets", + "main", + "run_forever", +] + + +if __name__ == "__main__": + main() diff --git a/docker-compose.yml b/docker-compose.yml index 2a0449f..6869699 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,5 +73,24 @@ services: - ./logs:/app/logs - ./rules.yaml:/app/rules.yaml + alphax-price-streamer: + image: alphax:local + container_name: alphax-price-streamer + restart: unless-stopped + depends_on: + alphax-web: + condition: service_started + env_file: + - .env + environment: + ALPHAX_ENV: "${ALPHAX_ENV:-dev}" + ALPHAX_DB_BACKEND: "postgres" + DATABASE_URL: "${DATABASE_URL:-postgresql://alphax:alphax_dev_password@postgres:5432/alphax_dev}" + command: ["price-streamer"] + volumes: + - ./data:/app/data + - ./logs:/app/logs + - ./rules.yaml:/app/rules.yaml + volumes: postgres_data: diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 303cdd9..35b4277 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -11,6 +11,9 @@ case "${1:-web}" in scheduler) exec python /app/docker/scheduler.py ;; + price-streamer) + exec python -m app.cli price-streamer + ;; once) shift exec python "$@" diff --git a/requirements.txt b/requirements.txt index 6149ac3..e680f32 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ pytest==8.3.4 httpx==0.28.1 jinja2==3.1.6 psycopg[binary,pool]==3.2.9 +websockets==15.0.1 diff --git a/tests/test_price_streamer.py b/tests/test_price_streamer.py new file mode 100644 index 0000000..17c0215 --- /dev/null +++ b/tests/test_price_streamer.py @@ -0,0 +1,89 @@ +import pytest + +from app.db import altcoin_db +from app.db.paper_trading import list_paper_trades, sync_recommendation +from app.db.runtime_config_db import set_config +from app.services import price_streamer + + +@pytest.fixture +def buy_now_rec(monkeypatch): + set_config("system", "paper_trading", { + "enabled": True, + "trade_notional_usdt": 5000, + "trade_leverage": 5, + "fee_rate": 0, + "slippage_pct": 0, + }) + set_config("system", "price_streamer", { + "enabled": True, + "update_latest_price_cache": True, + "sync_paper_trading": True, + "include_actionable_recommendations": True, + "include_open_paper_trades": True, + }) + altcoin_db.init_db() + rec_id = altcoin_db.create_recommendation( + symbol="WS/USDT", + rec_state="爆发", + rec_score=28, + entry_price=100, + stop_loss=95, + tp1=106, + tp2=112, + signals=["当前15min即刻入场信号"], + entry_plan={ + "entry_action": "可即刻买入", + "entry_price": 100, + "stop_loss": 95, + "tp1": 106, + "tp2": 112, + "risk_reward_ok": True, + "entry_trigger_confirmed": True, + }, + ) + return next(r for r in altcoin_db.get_active_recommendations_deduped(actionable_only=False) if r["id"] == rec_id) + + +def test_price_streamer_loads_actionable_targets(buy_now_rec): + targets = price_streamer.load_stream_targets() + + assert "WS/USDT" in targets + assert targets["WS/USDT"]["id"] == buy_now_rec["id"] + + +def test_price_streamer_tick_opens_and_closes_paper_trade(buy_now_rec): + targets = {"WS/USDT": buy_now_rec} + + opened = price_streamer.handle_price_tick("WS/USDT", 100, targets, event_time="2026-05-16T10:00:00") + targets = price_streamer.load_stream_targets() + closed = price_streamer.handle_price_tick("WS/USDT", 106, targets, event_time="2026-05-16T10:01:00") + + assert opened["paper_trading"]["opened"] is True + assert closed["paper_trading"]["closed"] is True + assert closed["paper_trading"]["exit_reason"] == "tp1" + trade = list_paper_trades()["items"][0] + assert trade["status"] == "closed" + assert trade["notional_usdt"] == pytest.approx(5000.0) + + +def test_price_streamer_tracks_open_paper_trade_without_active_rec(buy_now_rec): + sync_recommendation(buy_now_rec, 100, event_time="2026-05-16T10:00:00") + + targets = price_streamer.load_stream_targets() + + assert "WS/USDT" in targets + assert targets["WS/USDT"]["id"] == buy_now_rec["id"] + + +def test_price_streamer_builds_binance_combined_stream_url(): + url = price_streamer._stream_url(["BTC/USDT", "ETH/USDT"], {"stream_url": "wss://example.test/stream"}) + + assert url == "wss://example.test/stream?streams=btcusdt@ticker/ethusdt@ticker" + + +def test_price_streamer_parse_ticker_message(): + symbol, price = price_streamer._parse_ticker_message('{"stream":"wsusdt@ticker","data":{"s":"WSUSDT","c":"101.5"}}') + + assert symbol == "WS/USDT" + assert price == pytest.approx(101.5) diff --git a/tests/test_runtime_config.py b/tests/test_runtime_config.py index 0db3665..cfc14d2 100644 --- a/tests/test_runtime_config.py +++ b/tests/test_runtime_config.py @@ -179,7 +179,7 @@ def test_runtime_config_api_can_manage_system_config(): def test_runtime_config_api_seeds_all_system_defaults_when_listing(): - for key in ["llm", "onchain", "paper_trading", "notification", "email", "bootstrap_admin", "scheduler"]: + for key in ["llm", "onchain", "paper_trading", "price_streamer", "notification", "email", "bootstrap_admin", "scheduler"]: delete_config("system", key) client = TestClient(web_server.app) @@ -187,7 +187,7 @@ def test_runtime_config_api_seeds_all_system_defaults_when_listing(): assert resp.status_code == 200 keys = {x["config_key"] for x in resp.json()["items"]} - for key in ["llm", "onchain", "paper_trading", "notification", "email", "bootstrap_admin", "scheduler"]: + for key in ["llm", "onchain", "paper_trading", "price_streamer", "notification", "email", "bootstrap_admin", "scheduler"]: assert key in keys diff --git a/tests/test_scheduler_control.py b/tests/test_scheduler_control.py index c34b9c2..ced8bd9 100644 --- a/tests/test_scheduler_control.py +++ b/tests/test_scheduler_control.py @@ -28,6 +28,7 @@ def test_scheduler_tables_seed_defaults(monkeypatch, tmp_path): assert jobs["event"]["lock_group"] == "recommendation_write" assert jobs["confirm"]["lock_group"] == "recommendation_write" assert jobs["tracker"]["every_seconds"] == 180 + assert jobs["paper-trader"]["lock_group"] == "paper_trading_write" assert jobs["onchain"]["lock_group"] == "onchain_write"