diff --git a/.env.example b/.env.example index f3c0e97..45248bf 100644 --- a/.env.example +++ b/.env.example @@ -89,9 +89,38 @@ ALPHAX_PAPER_TRAILING_VOL_DISTANCE_MULT=0.7 ALPHAX_PAPER_TRAILING_MOVE_PUSH_MIN_INTERVAL_SECONDS=300 ALPHAX_PAPER_TRAILING_MOVE_PUSH_MIN_STEP_PCT=2 +# 实盘准备模块。默认关闭且 dry-run,只生成订单意图,不真实下单。 +# 多 API 账号请在页面中配置不同 account_code 和不同 env key 名。 +ALPHAX_LIVE_TRADING_ENABLED=0 +ALPHAX_LIVE_TRADING_EXECUTION_MODE=exchange_api +ALPHAX_LIVE_TRADING_REQUIRE_HUMAN_APPROVAL=1 +ALPHAX_LIVE_TRADING_EXCHANGE=binance +ALPHAX_LIVE_TRADING_MARKET_TYPE=um_futures +ALPHAX_LIVE_TRADING_TESTNET=1 +ALPHAX_LIVE_TRADING_SANDBOX_MODE=demo +ALPHAX_LIVE_TRADING_ACCOUNT_CODE=binance_um_futures_main +ALPHAX_BINANCE_API_KEY_ENV=ALPHAX_BINANCE_API_KEY +ALPHAX_BINANCE_API_SECRET_ENV=ALPHAX_BINANCE_API_SECRET +ALPHAX_BINANCE_API_KEY= +ALPHAX_BINANCE_API_SECRET= +# 建议先使用 Binance Futures Testnet key 跑接口 smoke test。 +# 多账号可新增类似 ALPHAX_BINANCE_SUB1_API_KEY / ALPHAX_BINANCE_SUB1_API_SECRET,并在页面配置 env key 名。 +ALPHAX_BINANCE_TESTNET_API_KEY= +ALPHAX_BINANCE_TESTNET_API_SECRET= +ALPHAX_LIVE_TRADING_DEFAULT_LEVERAGE=1 +ALPHAX_LIVE_TRADING_MAX_ORDER_MARGIN_USDT=10 +ALPHAX_LIVE_TRADING_MAX_ORDER_NOTIONAL_USDT=50 +ALPHAX_LIVE_TRADING_MAX_SYMBOL_LEVERAGE=1 +ALPHAX_LIVE_TRADING_MAX_CUMULATIVE_LEVERAGE=1 +ALPHAX_LIVE_TRADING_MAX_DAILY_ORDER_COUNT=5 +ALPHAX_LIVE_TRADING_ALLOWED_SYMBOLS= + ALPHAX_SYSTEM_ERROR_FEISHU_ENABLED=0 ALPHAX_SYSTEM_ERROR_FEISHU_WEBHOOK= +ALPHAX_BINANCE_DEMO_API_KEY=r7dHchnHGVeyDU6rNUnZgZHZpqRpzWjqTzDAB46sUVDua5mp5amW7KSrltDipSuk +ALPHAX_BINANCE_DEMO_API_SECRET=jLKzapcO0iPtyxdPgKMK0FKMXLHpkg1EuhNYNHGUqCISwuJmuX7kQ6nardqK4K2Y + # 邮箱验证码 SMTP 配置。没有配置时,注册验证码只会生成,不会发邮件。 ASTOCK_SMTP_HOST= ASTOCK_SMTP_PORT=465 diff --git a/AGENTS.md b/AGENTS.md index cbe5d35..48c713d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -79,7 +79,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 ### 4.1 推荐系统业务闭环 -建议把系统理解为 8 个层次: +建议把系统理解为 9 个层次: 1. `app/services/market_overview.py` 采集全市场快照,为行情环境、涨幅榜和市场温度提供数据。 @@ -95,7 +95,9 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 负责可执行推荐的价格跟踪、状态迁移和动态风险提示。 7. `app/services/paper_trader.py` 负责模拟交易账本同步,真实 TP/SL、移动止盈、杠杆和资金口径在 paper trading 层管理。 -8. `app/services/review_engine.py` +8. `app/db/live_trading.py` / `app/web/routes_live_trading.py` + 负责实盘控制台:多交易所/多 API 账户配置、账号级风控、交易所接口验收和执行审计事件。页面不再使用“订单意图”作为产品概念,也不区分 Demo/正式环境,实际环境由 endpoint/API key 配置决定。 +9. `app/services/review_engine.py` 负责复盘与策略自迭代,包括信号绩效、漏选复盘、规则候选、版本演进。 ### 4.1.1 链上数据源 @@ -117,6 +119,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - `app/web/routes_strategy.py` - `app/web/routes_onchain.py` - `app/web/routes_paper_trading.py` +- `app/web/routes_live_trading.py` - `app/web/routes_market.py` - `app/web/routes_admin.py` - `app/web/routes_pages.py` @@ -184,6 +187,8 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - 运行时系统配置。 - `app/db/paper_trading.py` - 模拟交易账本、仓位、成交事件和资金口径。 +- `app/db/live_trading.py` + - 实盘控制台账本,多 API 账户、账号级风控、交易所接口验收与执行事件;不保存真实 API secret。 - `app/db/market_db.py` - 市场快照。 - `app/db/system_logs.py` @@ -206,7 +211,11 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - `scheduler_runtime_status` - `scheduler_manual_trigger` - `paper_trades` +- `paper_orders` - `paper_trade_events` +- `live_trade_accounts` +- `live_order_intents` +- `live_order_events` - `market_snapshots` - `sentiment_events` - `onchain_*` @@ -224,6 +233,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - `app/config/rules_schema.py` 负责规则结构校验。 - `app/config/system_config.py` 负责运行时系统配置,如 scheduler dry run、poll interval 等。 - `system_config` / `strategy_runtime_config` 等 PostgreSQL 表承载运行态配置。 +- `live_trading` runtime config 使用 `execution_mode=exchange_api` 表示真实调用当前配置的交易所 API endpoint;API key/secret 只通过环境变量名引用;多账户配置保存在 `live_trade_accounts`。 如果要改筛选阈值、确认门槛、止盈止损、动态权重逻辑,优先检查 `rules.yaml` 和 `app/config/config_loader.py`。如果要改调度行为或系统开关,优先检查 runtime config,而不是只看环境变量。 @@ -232,7 +242,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - `/app` - 真实实现层,按职责拆成 `services`, `db`, `core`, `config`, `integrations`, `analysis`, `web` - `/static` - - 页面文件,如 `app.html`, `pipeline.html`, `paper_trading.html`, `review_center.html`, `market.html`, `onchain.html`, `chat.html` + - 页面文件,如 `app.html`, `pipeline.html`, `paper_trading.html`, `live_trading.html`, `review_center.html`, `market.html`, `onchain.html`, `chat.html` - `/tests` - 状态机、认证订阅、推荐链路、调度、模拟交易、行情、复盘、前端页面约束等回归测试 - `/scripts` @@ -315,6 +325,14 @@ python -m app.cli llm-insights --scope sentiment --limit 40 Docker 内建议通过 `docker compose exec alphax-web python -m app.cli ...` 执行,确保使用容器内 `DATABASE_URL` 和依赖环境。 +实盘接口 smoke test 会调用当前配置的 Binance Futures API endpoint: + +```bash +docker compose exec alphax-web python -m app.cli live-trading-smoke --account-id 1 --symbol BTC/USDT --notional-usdt 10 --leverage 1 +``` + +该命令会依次测试余额/行情、设置杠杆、市价单、止盈单、止损单、限价挂单、撤单、最后市价平仓,并写入 `live_order_events`。不要把真实 API key 写入数据库或聊天;只在环境变量中保存密钥,`live_trade_accounts` 只保存 env key 名。 + ### 8.3 测试与校验 常用回归命令: diff --git a/app/cli.py b/app/cli.py index 82cbdc4..abb4079 100644 --- a/app/cli.py +++ b/app/cli.py @@ -3,7 +3,7 @@ import argparse import sys -from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, market_overview, onchain_monitor, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor +from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, live_trading_smoke, market_overview, onchain_monitor, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor def build_parser(): @@ -43,6 +43,12 @@ def build_parser(): llm.add_argument("--scope", choices=["recommendations", "sentiment", "sentiment-events", "review"], default="recommendations") llm.add_argument("--limit", type=int, default=30) + live_smoke = subparsers.add_parser("live-trading-smoke", help="运行 Binance 合约接口 smoke test") + live_smoke.add_argument("--account-id", type=int, required=True, help="live_trade_accounts.id") + live_smoke.add_argument("--symbol", default="BTC/USDT", help="测试交易对") + live_smoke.add_argument("--notional-usdt", type=float, default=10.0, help="测试名义金额,默认 10U") + live_smoke.add_argument("--leverage", type=float, default=1.0, help="测试杠杆,默认 1x") + return parser @@ -100,6 +106,13 @@ def main(): result = llm_insights.run(scope=args.scope, limit=args.limit) print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2)) return result + if args.command == "live-trading-smoke": + return live_trading_smoke.main( + account_id=args.account_id, + symbol=args.symbol, + notional_usdt=args.notional_usdt, + leverage=args.leverage, + ) parser.error(f"unknown command: {args.command}") diff --git a/app/config/system_config.py b/app/config/system_config.py index 4a2c88e..95964a5 100644 --- a/app/config/system_config.py +++ b/app/config/system_config.py @@ -10,6 +10,7 @@ from app.db.runtime_config_db import ( get_email_config, get_event_driven_config, get_llm_config, + get_live_trading_config, get_monitoring_config, get_notification_config, get_onchain_config, @@ -132,6 +133,32 @@ def default_paper_trading_config(): } +def default_live_trading_config(): + return { + "enabled": _env_bool("ALPHAX_LIVE_TRADING_ENABLED", False), + "execution_mode": _env_str("ALPHAX_LIVE_TRADING_EXECUTION_MODE", "exchange_api"), + "require_human_approval": _env_bool("ALPHAX_LIVE_TRADING_REQUIRE_HUMAN_APPROVAL", True), + "exchange": _env_str("ALPHAX_LIVE_TRADING_EXCHANGE", "binance"), + "market_type": _env_str("ALPHAX_LIVE_TRADING_MARKET_TYPE", "um_futures"), + "testnet": _env_bool("ALPHAX_LIVE_TRADING_TESTNET", True), + "sandbox_mode": _env_str("ALPHAX_LIVE_TRADING_SANDBOX_MODE", "demo"), + "account_code": _env_str("ALPHAX_LIVE_TRADING_ACCOUNT_CODE", "binance_um_futures_main"), + "api_key_env": _env_str("ALPHAX_BINANCE_API_KEY_ENV", "ALPHAX_BINANCE_API_KEY"), + "api_secret_env": _env_str("ALPHAX_BINANCE_API_SECRET_ENV", "ALPHAX_BINANCE_API_SECRET"), + "supported_exchanges": ["binance"], + "supported_market_types": ["spot", "um_futures"], + "default_leverage": _env_float("ALPHAX_LIVE_TRADING_DEFAULT_LEVERAGE", 1), + "risk": { + "max_order_margin_usdt": _env_float("ALPHAX_LIVE_TRADING_MAX_ORDER_MARGIN_USDT", 10), + "max_order_notional_usdt": _env_float("ALPHAX_LIVE_TRADING_MAX_ORDER_NOTIONAL_USDT", 50), + "max_symbol_leverage": _env_float("ALPHAX_LIVE_TRADING_MAX_SYMBOL_LEVERAGE", 1), + "max_cumulative_leverage": _env_float("ALPHAX_LIVE_TRADING_MAX_CUMULATIVE_LEVERAGE", 1), + "max_daily_order_count": _env_int("ALPHAX_LIVE_TRADING_MAX_DAILY_ORDER_COUNT", 5), + "allowed_symbols": _env_list("ALPHAX_LIVE_TRADING_ALLOWED_SYMBOLS", []), + }, + } + + def default_price_streamer_config(): return { "enabled": _env_bool("ALPHAX_PRICE_STREAMER_ENABLED", True), @@ -319,6 +346,7 @@ def seed_runtime_system_defaults(): "llm": (default_llm_config(), "LLM provider and module switches; API key remains in env"), "onchain": (default_onchain_config(), "On-chain provider and signal thresholds; API keys remain in env"), "paper_trading": (default_paper_trading_config(), "Paper trading account and execution model"), + "live_trading": (default_live_trading_config(), "Live trading exchange, account and risk settings; API secrets remain in env"), "price_streamer": (default_price_streamer_config(), "Realtime websocket price streamer settings"), "sentiment": (default_sentiment_config(), "Sentiment monitoring settings"), "event_driven": (default_event_driven_config(), "Event/news driven screening settings"), @@ -358,6 +386,14 @@ def paper_trading_config(): return deep_merge(default_paper_trading_config(), cfg or {}) +def live_trading_config(): + cfg = get_live_trading_config(default=None) + if cfg is None: + _seed_one("live_trading", default_live_trading_config(), "Live trading exchange, account and risk settings; API secrets remain in env") + cfg = get_live_trading_config(default=None) + return deep_merge(default_live_trading_config(), cfg or {}) + + def price_streamer_config(): cfg = get_price_streamer_config(default=None) if cfg is None: @@ -428,6 +464,7 @@ __all__ = [ "default_email_config", "default_event_driven_config", "default_llm_config", + "default_live_trading_config", "default_monitoring_config", "default_notification_config", "default_onchain_config", @@ -438,6 +475,7 @@ __all__ = [ "email_config", "event_driven_config", "llm_config", + "live_trading_config", "monitoring_config", "notification_config", "onchain_config", diff --git a/app/db/live_trading.py b/app/db/live_trading.py new file mode 100644 index 0000000..1e212fc --- /dev/null +++ b/app/db/live_trading.py @@ -0,0 +1,449 @@ +"""Live trading account, risk and execution audit helpers.""" + +from __future__ import annotations + +import json +from datetime import datetime + +from app.config.system_config import live_trading_config +from app.db.schema import get_conn + + +def _now() -> str: + return datetime.now().isoformat() + + +def _loads(value, fallback=None): + try: + if isinstance(value, str) and value.strip(): + return json.loads(value) + if isinstance(value, (dict, list)): + return value + except Exception: + pass + return fallback if fallback is not None else {} + + +def _dumps(value) -> str: + return json.dumps(value if value is not None else {}, ensure_ascii=False, sort_keys=True, default=str) + + +def _safe_float(value, default: float = 0.0) -> float: + try: + if value is None or value == "": + return default + return float(value) + except Exception: + return default + + +def _safe_int(value, default: int = 0) -> int: + try: + return int(value or 0) + except Exception: + return default + + +def _normalize_symbol(symbol: str) -> str: + value = str(symbol or "").strip().upper() + if value and "/" not in value and value.endswith("USDT"): + value = value[:-4] + "/USDT" + return value + + +def _deep_merge(base: dict, override: dict) -> dict: + merged = dict(base or {}) + for key, value in (override or {}).items(): + if isinstance(value, dict) and isinstance(merged.get(key), dict): + merged[key] = _deep_merge(merged[key], value) + else: + merged[key] = value + return merged + + +def _row(row) -> dict: + if not row: + return {} + item = dict(row) + for key in ("permissions_json", "risk_config_json", "risk_check_json", "request_json", "response_json", "payload_json"): + if key in item: + item[key.replace("_json", "")] = _loads(item.pop(key), {}) + for key in ("testnet", "reduce_only"): + if key in item: + item[key] = bool(item[key]) + return item + + +def get_effective_live_trading_config() -> dict: + return live_trading_config() + + +def upsert_live_account( + account_code: str = "", + *, + exchange: str = "", + market_type: str = "", + testnet: bool | None = None, + status: str = "", + api_key_env: str = "", + api_secret_env: str = "", + permissions: dict | None = None, + risk_config: dict | None = None, +) -> dict: + cfg = get_effective_live_trading_config() + now = _now() + account_code = account_code or str(cfg.get("account_code") or "binance_um_futures") + exchange = exchange or str(cfg.get("exchange") or "binance") + market_type = market_type or str(cfg.get("market_type") or "um_futures") + if testnet is None: + testnet = bool(cfg.get("testnet", True)) + status = status or ("enabled" if bool(cfg.get("enabled")) else "disabled") + api_key_env = api_key_env or str(cfg.get("api_key_env") or "ALPHAX_BINANCE_API_KEY") + api_secret_env = api_secret_env or str(cfg.get("api_secret_env") or "ALPHAX_BINANCE_API_SECRET") + permissions = permissions if isinstance(permissions, dict) else {"trade": False, "read": True} + risk_config = risk_config if isinstance(risk_config, dict) else cfg.get("risk", {}) + + conn = get_conn() + try: + row = conn.execute( + """ + INSERT INTO live_trade_accounts ( + account_code, exchange, market_type, testnet, status, + api_key_env, api_secret_env, permissions_json, risk_config_json, + created_at, updated_at + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON CONFLICT(account_code) DO UPDATE SET + exchange=excluded.exchange, + market_type=excluded.market_type, + testnet=excluded.testnet, + status=excluded.status, + api_key_env=excluded.api_key_env, + api_secret_env=excluded.api_secret_env, + permissions_json=excluded.permissions_json, + risk_config_json=excluded.risk_config_json, + updated_at=excluded.updated_at + RETURNING * + """, + ( + account_code, + exchange, + market_type, + int(bool(testnet)), + status, + api_key_env, + api_secret_env, + _dumps(permissions), + _dumps(risk_config), + now, + now, + ), + ).fetchone() + conn.commit() + finally: + conn.close() + return _row(row) + + +def list_live_accounts() -> dict: + conn = get_conn() + try: + rows = conn.execute("SELECT * FROM live_trade_accounts ORDER BY updated_at DESC, id DESC").fetchall() + finally: + conn.close() + return {"items": [_row(r) for r in rows], "total": len(rows)} + + +def get_live_account(account_id: int) -> dict: + account_id = _safe_int(account_id) + if account_id <= 0: + return {} + conn = get_conn() + try: + row = conn.execute("SELECT * FROM live_trade_accounts WHERE id=%s", (account_id,)).fetchone() + finally: + conn.close() + return _row(row) + + +def delete_live_account(account_id: int) -> dict: + account_id = _safe_int(account_id) + if account_id <= 0: + return {"ok": False, "reason": "invalid_account_id"} + conn = get_conn() + try: + row = conn.execute("DELETE FROM live_trade_accounts WHERE id=%s RETURNING *", (account_id,)).fetchone() + conn.commit() + finally: + conn.close() + if not row: + return {"ok": False, "reason": "account_not_found"} + return {"ok": True, "account": _row(row)} + + +def list_enabled_live_accounts() -> list[dict]: + conn = get_conn() + try: + rows = conn.execute( + "SELECT * FROM live_trade_accounts WHERE status='enabled' ORDER BY id" + ).fetchall() + finally: + conn.close() + return [_row(r) for r in rows] + + +def _config_for_account(account: dict | None = None) -> dict: + cfg = get_effective_live_trading_config() + if account: + account_risk = account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} + cfg = _deep_merge(cfg, { + "exchange": account.get("exchange") or cfg.get("exchange"), + "market_type": account.get("market_type") or cfg.get("market_type"), + "testnet": account.get("testnet", cfg.get("testnet")), + "sandbox_mode": account_risk.get("sandbox_mode") or cfg.get("sandbox_mode"), + "risk": _deep_merge(cfg.get("risk") or {}, account_risk), + }) + return cfg + + +def _risk_settings(cfg: dict) -> dict: + risk = cfg.get("risk") if isinstance(cfg.get("risk"), dict) else {} + max_symbol_leverage = _safe_float(risk.get("max_symbol_leverage"), _safe_float(cfg.get("max_symbol_leverage"), 1)) + max_order_margin = _safe_float(risk.get("max_order_margin_usdt"), _safe_float(cfg.get("max_order_margin_usdt"), 0)) + max_order_notional = _safe_float(risk.get("max_order_notional_usdt"), _safe_float(cfg.get("max_order_notional_usdt"), 0)) + if max_order_notional <= 0 and max_order_margin > 0: + max_order_notional = max_order_margin * max(1.0, max_symbol_leverage) + return { + "max_order_margin_usdt": max_order_margin, + "max_order_notional_usdt": max_order_notional, + "max_symbol_leverage": max_symbol_leverage, + "max_cumulative_leverage": _safe_float(risk.get("max_cumulative_leverage"), _safe_float(cfg.get("max_cumulative_leverage"), 1)), + "max_daily_order_count": _safe_int(risk.get("max_daily_order_count"), _safe_int(cfg.get("max_daily_order_count"), 0)), + "allowed_symbols": [str(x).upper() for x in (risk.get("allowed_symbols") or cfg.get("allowed_symbols") or []) if str(x).strip()], + } + + +def _risk_check(payload: dict, cfg: dict, account: dict | None = None) -> tuple[str, str, dict]: + symbol = _normalize_symbol(payload.get("symbol")) + notional = _safe_float(payload.get("notional_usdt")) + leverage = _safe_float(payload.get("leverage"), _safe_float(cfg.get("default_leverage"), 1)) + risk = _risk_settings(cfg) + allowed_symbols = risk["allowed_symbols"] + max_notional = risk["max_order_notional_usdt"] + max_margin = risk["max_order_margin_usdt"] + max_leverage = risk["max_symbol_leverage"] + margin = notional / leverage if leverage > 0 else notional + checks = { + "enabled": bool(cfg.get("enabled")), + "execution_mode": cfg.get("execution_mode", "exchange_api"), + "require_human_approval": bool(cfg.get("require_human_approval", True)), + "account_id": _safe_int((account or {}).get("id")), + "account_code": (account or {}).get("account_code", ""), + "symbol": symbol, + "notional_usdt": notional, + "margin_usdt": margin, + "max_order_margin_usdt": max_margin, + "max_order_notional_usdt": max_notional, + "leverage": leverage, + "max_symbol_leverage": max_leverage, + "max_cumulative_leverage": risk["max_cumulative_leverage"], + "allowed_symbols": allowed_symbols, + } + if not symbol: + return "blocked", "missing_symbol", checks + if not bool(cfg.get("enabled")): + return "blocked", "live_trading_disabled", checks + if account and account.get("status") != "enabled": + return "blocked", "account_disabled", checks + if allowed_symbols and symbol not in allowed_symbols: + return "blocked", "symbol_not_allowed", checks + if max_margin > 0 and margin > max_margin: + return "blocked", "margin_exceeds_limit", checks + if max_notional > 0 and notional > max_notional: + return "blocked", "notional_exceeds_limit", checks + if max_leverage > 0 and leverage > max_leverage: + return "blocked", "leverage_exceeds_limit", checks + if bool(cfg.get("require_human_approval", True)): + return "pending_approval", "waiting_human_approval", checks + mode = str(cfg.get("execution_mode") or "exchange_api").strip().lower() + if mode in ("exchange_api", "demo"): + return "prepared", "exchange_ready_for_executor", checks + return "prepared", "ready_for_executor", checks + + +def create_live_order_intent(payload: dict, *, source_type: str = "manual", source_id: int = 0) -> dict: + account = get_live_account(_safe_int(payload.get("account_id"))) + cfg = _config_for_account(account) + now = _now() + symbol = _normalize_symbol(payload.get("symbol")) + side = str(payload.get("side") or "long").strip().lower() + if side not in ("long", "short"): + side = "long" + status, reason, risk = _risk_check({**payload, "symbol": symbol, "side": side}, cfg, account) + conn = get_conn() + try: + row = conn.execute( + """ + INSERT INTO live_order_intents ( + source_type, source_id, recommendation_id, paper_trade_id, paper_order_id, + account_id, exchange, market_type, symbol, side, position_side, order_type, + status, reason, quantity, price, stop_loss, take_profit, notional_usdt, + leverage, reduce_only, client_order_id, risk_check_json, request_json, + created_at, updated_at + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + RETURNING * + """, + ( + source_type, + _safe_int(source_id), + _safe_int(payload.get("recommendation_id")), + _safe_int(payload.get("paper_trade_id")), + _safe_int(payload.get("paper_order_id")), + _safe_int(payload.get("account_id")), + str(account.get("exchange") or cfg.get("exchange") or payload.get("exchange") or "binance"), + str(account.get("market_type") or cfg.get("market_type") or payload.get("market_type") or "um_futures"), + symbol, + side, + side, + str(payload.get("order_type") or "market").lower(), + status, + reason, + _safe_float(payload.get("quantity")), + _safe_float(payload.get("price")), + _safe_float(payload.get("stop_loss")), + _safe_float(payload.get("take_profit")), + _safe_float(payload.get("notional_usdt")), + _safe_float(payload.get("leverage"), _safe_float(cfg.get("default_leverage"), 1)), + int(bool(payload.get("reduce_only"))), + str(payload.get("client_order_id") or ""), + _dumps(risk), + _dumps(payload), + now, + now, + ), + ).fetchone() + conn.execute( + """ + INSERT INTO live_order_events (intent_id, event_type, status, message, payload_json, event_time) + VALUES (%s,%s,%s,%s,%s,%s) + """, + (row["id"], "intent_created", status, reason, _dumps(risk), now), + ) + conn.commit() + finally: + conn.close() + return _row(row) + + +def create_live_order_intents_for_accounts(payload: dict, account_ids: list[int] | None = None, *, source_type: str = "manual", source_id: int = 0) -> dict: + accounts = list_enabled_live_accounts() + selected = {_safe_int(x) for x in (account_ids or []) if _safe_int(x) > 0} + if selected: + accounts = [a for a in accounts if _safe_int(a.get("id")) in selected] + if not accounts: + return {"ok": False, "reason": "no_enabled_accounts", "items": []} + items = [] + for account in accounts: + items.append(create_live_order_intent({**payload, "account_id": account["id"]}, source_type=source_type, source_id=source_id)) + return {"ok": True, "items": items, "total": len(items)} + + +def list_live_order_intents(limit: int = 50, offset: int = 0, status: str = "", account_id: int = 0) -> dict: + limit = max(1, min(_safe_int(limit, 50), 200)) + offset = max(0, _safe_int(offset)) + params: list = [] + clauses: list[str] = [] + if status: + clauses.append("status=%s") + params.append(status) + if _safe_int(account_id) > 0: + clauses.append("account_id=%s") + params.append(_safe_int(account_id)) + where = f"WHERE {' AND '.join(clauses)}" if clauses else "" + conn = get_conn() + try: + total = conn.execute(f"SELECT COUNT(*) FROM live_order_intents {where}", tuple(params)).fetchone()[0] + rows = conn.execute( + f"SELECT * FROM live_order_intents {where} ORDER BY updated_at DESC, id DESC LIMIT %s OFFSET %s", + tuple(params + [limit, offset]), + ).fetchall() + finally: + conn.close() + return {"items": [_row(r) for r in rows], "total": total, "limit": limit, "offset": offset} + + +def list_live_order_events(limit: int = 80, offset: int = 0, intent_id: int = 0) -> dict: + limit = max(1, min(_safe_int(limit, 80), 200)) + offset = max(0, _safe_int(offset)) + params: list = [] + where = "" + if _safe_int(intent_id) > 0: + where = "WHERE intent_id=%s" + params.append(_safe_int(intent_id)) + conn = get_conn() + try: + total = conn.execute(f"SELECT COUNT(*) FROM live_order_events {where}", tuple(params)).fetchone()[0] + rows = conn.execute( + f"SELECT * FROM live_order_events {where} ORDER BY event_time DESC, id DESC LIMIT %s OFFSET %s", + tuple(params + [limit, offset]), + ).fetchall() + finally: + conn.close() + return {"items": [_row(r) for r in rows], "total": total, "limit": limit, "offset": offset} + + +def prepare_intent_from_paper_trade(paper_trade_id: int, account_ids: list[int] | None = None) -> dict: + conn = get_conn() + try: + trade = conn.execute("SELECT * FROM paper_trades WHERE id=%s", (_safe_int(paper_trade_id),)).fetchone() + finally: + conn.close() + if not trade: + return {"ok": False, "reason": "paper_trade_not_found"} + payload = { + "symbol": trade["symbol"], + "side": trade.get("side") or "long", + "order_type": "market", + "price": _safe_float(trade.get("entry_price")), + "stop_loss": _safe_float(trade.get("stop_loss")), + "take_profit": _safe_float(trade.get("tp1")), + "notional_usdt": _safe_float(trade.get("notional_usdt")), + "leverage": _safe_float(trade.get("leverage"), 1), + "recommendation_id": _safe_int(trade.get("recommendation_id")), + "paper_trade_id": _safe_int(trade.get("id")), + } + result = create_live_order_intents_for_accounts(payload, account_ids=account_ids, source_type="paper_trade", source_id=_safe_int(trade.get("id"))) + return result if result.get("ok") else {"ok": False, "reason": result.get("reason", "intent_create_failed"), "items": result.get("items", [])} + + +def get_live_trading_summary() -> dict: + cfg = get_effective_live_trading_config() + risk = _risk_settings(cfg) + conn = get_conn() + try: + status_rows = conn.execute( + "SELECT status, COUNT(*) AS count FROM live_order_intents GROUP BY status ORDER BY status" + ).fetchall() + latest_rows = conn.execute( + "SELECT * FROM live_order_intents ORDER BY updated_at DESC, id DESC LIMIT 8" + ).fetchall() + account_count = conn.execute("SELECT COUNT(*) FROM live_trade_accounts").fetchone()[0] + finally: + conn.close() + return { + "enabled": bool(cfg.get("enabled")), + "execution_mode": cfg.get("execution_mode", "exchange_api"), + "exchange": cfg.get("exchange", "binance"), + "market_type": cfg.get("market_type", "um_futures"), + "testnet": bool(cfg.get("testnet", True)), + "require_human_approval": bool(cfg.get("require_human_approval", True)), + "max_order_margin_usdt": risk["max_order_margin_usdt"], + "max_order_notional_usdt": risk["max_order_notional_usdt"], + "max_symbol_leverage": risk["max_symbol_leverage"], + "max_cumulative_leverage": risk["max_cumulative_leverage"], + "max_daily_order_count": risk["max_daily_order_count"], + "account_count": account_count, + "intent_status": {r["status"]: r["count"] for r in status_rows}, + "latest_intents": [_row(r) for r in latest_rows], + } diff --git a/app/db/migrations/0012_live_trading.sql b/app/db/migrations/0012_live_trading.sql new file mode 100644 index 0000000..0416b30 --- /dev/null +++ b/app/db/migrations/0012_live_trading.sql @@ -0,0 +1,71 @@ +CREATE TABLE IF NOT EXISTS live_trade_accounts ( + id BIGSERIAL PRIMARY KEY, + account_code TEXT NOT NULL UNIQUE, + exchange TEXT NOT NULL DEFAULT 'binance', + market_type TEXT NOT NULL DEFAULT 'um_futures', + testnet INTEGER NOT NULL DEFAULT 1, + status TEXT NOT NULL DEFAULT 'disabled', + api_key_env TEXT DEFAULT '', + api_secret_env TEXT DEFAULT '', + permissions_json TEXT DEFAULT '{}', + risk_config_json TEXT DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + last_checked_at TEXT DEFAULT '' +); + +CREATE INDEX IF NOT EXISTS idx_live_trade_accounts_status ON live_trade_accounts(status, updated_at DESC); + +CREATE TABLE IF NOT EXISTS live_order_intents ( + id BIGSERIAL PRIMARY KEY, + source_type TEXT NOT NULL DEFAULT 'manual', + source_id BIGINT DEFAULT 0, + recommendation_id BIGINT DEFAULT 0, + paper_trade_id BIGINT DEFAULT 0, + paper_order_id BIGINT DEFAULT 0, + account_id BIGINT DEFAULT 0, + exchange TEXT NOT NULL DEFAULT 'binance', + market_type TEXT NOT NULL DEFAULT 'um_futures', + symbol TEXT NOT NULL, + side TEXT NOT NULL DEFAULT 'long', + position_side TEXT NOT NULL DEFAULT 'long', + order_type TEXT NOT NULL DEFAULT 'market', + status TEXT NOT NULL DEFAULT 'blocked', + reason TEXT DEFAULT '', + quantity DOUBLE PRECISION DEFAULT 0, + price DOUBLE PRECISION DEFAULT 0, + stop_loss DOUBLE PRECISION DEFAULT 0, + take_profit DOUBLE PRECISION DEFAULT 0, + notional_usdt DOUBLE PRECISION DEFAULT 0, + leverage DOUBLE PRECISION DEFAULT 1, + reduce_only INTEGER NOT NULL DEFAULT 0, + client_order_id TEXT DEFAULT '', + exchange_order_id TEXT DEFAULT '', + risk_check_json TEXT DEFAULT '{}', + request_json TEXT DEFAULT '{}', + response_json TEXT DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + approved_at TEXT DEFAULT '', + submitted_at TEXT DEFAULT '', + finished_at TEXT DEFAULT '' +); + +CREATE INDEX IF NOT EXISTS idx_live_order_intents_status_updated ON live_order_intents(status, updated_at DESC); +CREATE INDEX IF NOT EXISTS idx_live_order_intents_symbol_status ON live_order_intents(symbol, status); +CREATE INDEX IF NOT EXISTS idx_live_order_intents_source ON live_order_intents(source_type, source_id); +CREATE INDEX IF NOT EXISTS idx_live_order_intents_recommendation ON live_order_intents(recommendation_id); +CREATE INDEX IF NOT EXISTS idx_live_order_intents_paper_trade ON live_order_intents(paper_trade_id); + +CREATE TABLE IF NOT EXISTS live_order_events ( + id BIGSERIAL PRIMARY KEY, + intent_id BIGINT NOT NULL, + event_type TEXT NOT NULL, + status TEXT DEFAULT '', + message TEXT DEFAULT '', + payload_json TEXT DEFAULT '{}', + event_time TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_live_order_events_intent_time ON live_order_events(intent_id, event_time DESC); +CREATE INDEX IF NOT EXISTS idx_live_order_events_type_time ON live_order_events(event_type, event_time DESC); diff --git a/app/db/runtime_config_db.py b/app/db/runtime_config_db.py index 264bd1e..c8280d6 100644 --- a/app/db/runtime_config_db.py +++ b/app/db/runtime_config_db.py @@ -221,6 +221,14 @@ def set_paper_trading_config(value, updated_by="", source="manual"): return set_config("system", "paper_trading", value, description="Paper trading account and execution model", source=source, updated_by=updated_by) +def get_live_trading_config(default=None): + return get_config("system", "live_trading", default=default) + + +def set_live_trading_config(value, updated_by="", source="manual"): + return set_config("system", "live_trading", value, description="Live trading exchange, account and risk settings; API secrets stay in env", source=source, updated_by=updated_by) + + def get_price_streamer_config(default=None): return get_config("system", "price_streamer", default=default) diff --git a/app/integrations/binance_live.py b/app/integrations/binance_live.py new file mode 100644 index 0000000..f5ac3cb --- /dev/null +++ b/app/integrations/binance_live.py @@ -0,0 +1,179 @@ +"""Binance live-trading adapter. + +Only transport details live here. Business safety checks and audit logging stay +in the live_trading DB/service layer. +""" + +from __future__ import annotations + +import os +import hashlib +import hmac +from dataclasses import dataclass +from urllib.parse import urlencode + +import requests + +import ccxt + + +class LiveTradingConfigError(RuntimeError): + pass + + +@dataclass +class BinanceLiveClient: + exchange: object + market_type: str = "um_futures" + + def load_markets(self): + return self.exchange.load_markets() + + def fetch_balance(self): + return self.exchange.fetch_balance() + + def fetch_ticker(self, symbol: str): + return self.exchange.fetch_ticker(symbol) + + def fetch_positions(self, symbols: list[str] | None = None): + if hasattr(self.exchange, "fetch_positions"): + return self.exchange.fetch_positions(symbols) + return [] + + def fetch_open_orders(self, symbol: str | None = None): + return self.exchange.fetch_open_orders(symbol) + + def fetch_orders(self, symbol: str | None = None, limit: int = 30): + if hasattr(self.exchange, "fetch_orders"): + return self.exchange.fetch_orders(symbol, None, limit) + return [] + + def set_leverage(self, symbol: str, leverage: float): + if hasattr(self.exchange, "set_leverage"): + return self.exchange.set_leverage(int(leverage), symbol) + return {"skipped": True, "reason": "exchange_does_not_support_set_leverage"} + + def amount_to_precision(self, symbol: str, amount: float) -> float: + try: + return float(self.exchange.amount_to_precision(symbol, amount)) + except Exception: + return float(amount) + + def price_to_precision(self, symbol: str, price: float) -> str: + try: + return str(self.exchange.price_to_precision(symbol, price)) + except Exception: + return str(price) + + def min_notional(self, symbol: str) -> float: + try: + market = self.exchange.market(symbol) + limits = market.get("limits") if isinstance(market, dict) else {} + cost = limits.get("cost") if isinstance(limits.get("cost"), dict) else {} + value = cost.get("min") + return float(value or 0) + except Exception: + return 0.0 + + def create_market_order(self, symbol: str, side: str, amount: float, params: dict | None = None): + return self.exchange.create_order(symbol, "market", side, amount, None, params or {}) + + def create_limit_order(self, symbol: str, side: str, amount: float, price: float, params: dict | None = None): + return self.exchange.create_order(symbol, "limit", side, amount, price, params or {}) + + def cancel_order(self, order_id: str, symbol: str): + return self.exchange.cancel_order(order_id, symbol) + + def _market_id(self, symbol: str) -> str: + try: + return str(self.exchange.market(symbol).get("id") or symbol.replace("/", "")) + except Exception: + return symbol.replace("/", "") + + def _signed_fapi_request(self, method: str, path: str, params: dict) -> dict: + timestamp = int(self.exchange.milliseconds()) if hasattr(self.exchange, "milliseconds") else 0 + payload = {**(params or {}), "timestamp": timestamp} + query = urlencode(payload, doseq=True) + secret = str(getattr(self.exchange, "secret", "") or "") + signature = hmac.new(secret.encode("utf-8"), query.encode("utf-8"), hashlib.sha256).hexdigest() + signed_query = f"{query}&signature={signature}" + urls = getattr(self.exchange, "urls", {}) if isinstance(getattr(self.exchange, "urls", {}), dict) else {} + api_urls = urls.get("api") if isinstance(urls.get("api"), dict) else {} + base_url = str(api_urls.get("fapiPrivate") or "https://fapi.binance.com/fapi/v1").rstrip("/") + url = f"{base_url}{path}" + headers = {"X-MBX-APIKEY": str(getattr(self.exchange, "apiKey", "") or "")} + response = requests.request(method.upper(), url, params=signed_query, headers=headers, timeout=15) + try: + data = response.json() + except Exception: + data = {"raw": response.text} + if response.status_code >= 400 or (isinstance(data, dict) and int(data.get("code", 0) or 0) < 0): + raise ccxt.ExchangeError(f"binanceusdm {data}") + return data + + def _create_algo_order(self, symbol: str, side: str, order_type: str, amount: float, trigger_price: float, params: dict | None = None): + merged = { + "algoType": "CONDITIONAL", + "symbol": self._market_id(symbol), + "side": side.upper(), + "type": order_type, + "quantity": self.exchange.amount_to_precision(symbol, amount), + "triggerPrice": self.price_to_precision(symbol, trigger_price), + "workingType": "CONTRACT_PRICE", + "reduceOnly": "true", + **(params or {}), + } + merged.pop("stopPrice", None) + return self._signed_fapi_request("POST", "/algoOrder", merged) + + def create_stop_loss_order(self, symbol: str, side: str, amount: float, stop_price: float, params: dict | None = None): + return self._create_algo_order(symbol, side, "STOP_MARKET", amount, stop_price, params) + + def create_take_profit_order(self, symbol: str, side: str, amount: float, stop_price: float, params: dict | None = None): + return self._create_algo_order(symbol, side, "TAKE_PROFIT_MARKET", amount, stop_price, params) + + def cancel_algo_order(self, *, algo_id: str | int | None = None, client_algo_id: str | None = None): + params: dict = {} + if algo_id: + params["algoId"] = algo_id + if client_algo_id: + params["clientAlgoId"] = client_algo_id + if not params: + raise ValueError("algo_id or client_algo_id is required") + return self._signed_fapi_request("DELETE", "/algoOrder", params) + + +def build_binance_client(account: dict, *, require_testnet: bool = True) -> BinanceLiveClient: + market_type = str(account.get("market_type") or "um_futures") + testnet = bool(account.get("testnet", True)) + risk_config = account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} + sandbox_mode = str(risk_config.get("sandbox_mode") or os.getenv("ALPHAX_LIVE_TRADING_SANDBOX_MODE", "demo")).strip().lower() + if require_testnet and not testnet: + raise LiveTradingConfigError("mainnet execution is not allowed by this smoke tester") + + api_key_env = str(account.get("api_key_env") or "ALPHAX_BINANCE_API_KEY") + api_secret_env = str(account.get("api_secret_env") or "ALPHAX_BINANCE_API_SECRET") + api_key = os.getenv(api_key_env, "").strip() + api_secret = os.getenv(api_secret_env, "").strip() + if not api_key or not api_secret: + raise LiveTradingConfigError(f"missing Binance credentials env: {api_key_env}/{api_secret_env}") + + klass = ccxt.binanceusdm if market_type == "um_futures" else ccxt.binance + exchange = klass({ + "apiKey": api_key, + "secret": api_secret, + "enableRateLimit": True, + "options": { + "defaultType": "future" if market_type == "um_futures" else "spot", + "fetchCurrencies": False, + "warnOnFetchOpenOrdersWithoutSymbol": False, + }, + }) + if testnet and sandbox_mode == "demo" and isinstance(getattr(exchange, "urls", None), dict) and exchange.urls.get("demo"): + exchange.urls["api"] = exchange.urls["demo"] + elif hasattr(exchange, "set_sandbox_mode"): + exchange.set_sandbox_mode(testnet) + return BinanceLiveClient(exchange=exchange, market_type=market_type) + + +__all__ = ["BinanceLiveClient", "LiveTradingConfigError", "build_binance_client"] diff --git a/app/services/live_trading_account.py b/app/services/live_trading_account.py new file mode 100644 index 0000000..b60f1dc --- /dev/null +++ b/app/services/live_trading_account.py @@ -0,0 +1,121 @@ +"""Account-centric read model for live trading console.""" + +from __future__ import annotations + +from app.db.live_trading import _safe_float, get_live_account, list_live_order_events, list_live_order_intents +from app.integrations.binance_live import LiveTradingConfigError, build_binance_client + + +def _compact_balance(balance: dict) -> dict: + total = balance.get("total") if isinstance(balance.get("total"), dict) else {} + free = balance.get("free") if isinstance(balance.get("free"), dict) else {} + used = balance.get("used") if isinstance(balance.get("used"), dict) else {} + assets = [] + for asset in sorted(set(total) | set(free) | set(used)): + total_value = _safe_float(total.get(asset)) + free_value = _safe_float(free.get(asset)) + used_value = _safe_float(used.get(asset)) + if abs(total_value) > 0 or abs(free_value) > 0 or abs(used_value) > 0: + assets.append({"asset": asset, "free": free_value, "used": used_value, "total": total_value}) + return { + "assets": assets, + "usdt": { + "free": _safe_float(free.get("USDT")), + "used": _safe_float(used.get("USDT")), + "total": _safe_float(total.get("USDT")), + }, + } + + +def _compact_position(item: dict) -> dict: + info = item.get("info") if isinstance(item.get("info"), dict) else {} + contracts = _safe_float(item.get("contracts") or info.get("positionAmt")) + notional = _safe_float(item.get("notional") or info.get("notional")) + return { + "symbol": item.get("symbol") or info.get("symbol"), + "side": item.get("side") or ("long" if contracts > 0 else ("short" if contracts < 0 else "")), + "contracts": contracts, + "entry_price": _safe_float(item.get("entryPrice") or info.get("entryPrice")), + "mark_price": _safe_float(item.get("markPrice") or info.get("markPrice")), + "notional": notional, + "unrealized_pnl": _safe_float(item.get("unrealizedPnl") or info.get("unrealizedProfit")), + "leverage": _safe_float(item.get("leverage") or info.get("leverage")), + } + + +def _compact_order(item: dict) -> dict: + info = item.get("info") if isinstance(item.get("info"), dict) else {} + return { + "id": str(item.get("id") or info.get("orderId") or ""), + "client_order_id": item.get("clientOrderId") or info.get("clientOrderId") or "", + "symbol": item.get("symbol") or info.get("symbol"), + "type": item.get("type") or info.get("type"), + "side": item.get("side") or info.get("side"), + "status": item.get("status") or info.get("status"), + "price": _safe_float(item.get("price") or info.get("price")), + "amount": _safe_float(item.get("amount") or info.get("origQty")), + "filled": _safe_float(item.get("filled") or info.get("executedQty")), + "average": _safe_float(item.get("average") or info.get("avgPrice")), + "timestamp": item.get("datetime") or item.get("timestamp") or info.get("updateTime") or info.get("time"), + } + + +def _account_risk_view(account: dict) -> dict: + risk = account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} + allowed = [str(x).strip().upper() for x in risk.get("allowed_symbols", []) if str(x).strip()] + max_leverage = _safe_float(risk.get("max_symbol_leverage"), 1) + margin = _safe_float(risk.get("max_order_margin_usdt"), 0) + return { + "max_order_margin_usdt": margin, + "max_symbol_leverage": max_leverage, + "max_order_notional_usdt": _safe_float(risk.get("max_order_notional_usdt"), margin * max(1.0, max_leverage)), + "max_cumulative_leverage": _safe_float(risk.get("max_cumulative_leverage"), 1), + "max_daily_order_count": int(risk.get("max_daily_order_count") or 0), + "allowed_symbols": allowed, + "symbol_policy": "all" if not allowed else "allowlist", + } + + +def get_live_account_overview(account_id: int, *, history_limit: int = 30) -> dict: + account = get_live_account(account_id) + if not account: + raise LiveTradingConfigError("live account not found") + overview = { + "account": account, + "risk": _account_risk_view(account), + "balance": {"assets": [], "usdt": {"free": 0, "used": 0, "total": 0}}, + "positions": [], + "open_orders": [], + "order_history": [], + "intent_history": list_live_order_intents(limit=history_limit, account_id=account_id).get("items", []), + "events": list_live_order_events(limit=history_limit).get("items", []), + "errors": [], + } + if account.get("status") != "enabled": + return overview + try: + client = build_binance_client(account, require_testnet=True) + client.load_markets() + except Exception as exc: + overview["errors"].append(f"账户连接失败:{exc}") + return overview + try: + overview["balance"] = _compact_balance(client.fetch_balance()) + except Exception as exc: + overview["errors"].append(f"余额读取失败:{exc}") + try: + overview["positions"] = [ + item for item in (_compact_position(p) for p in client.fetch_positions(None)) + if abs(_safe_float(item.get("contracts"))) > 0 + ] + except Exception as exc: + overview["errors"].append(f"持仓读取失败:{exc}") + try: + overview["open_orders"] = [_compact_order(o) for o in client.fetch_open_orders(None)] + except Exception as exc: + overview["errors"].append(f"挂单读取失败:{exc}") + try: + overview["order_history"] = [_compact_order(o) for o in client.fetch_orders(None, limit=history_limit)] + except Exception as exc: + overview["errors"].append(f"订单历史读取失败:{exc}") + return overview diff --git a/app/services/live_trading_smoke.py b/app/services/live_trading_smoke.py new file mode 100644 index 0000000..21554b3 --- /dev/null +++ b/app/services/live_trading_smoke.py @@ -0,0 +1,220 @@ +"""End-to-end smoke tests for exchange execution interfaces.""" + +from __future__ import annotations + +import json +from datetime import datetime + +from app.db.live_trading import _dumps, _safe_float, get_live_account +from app.db.schema import get_conn, init_db +from app.integrations.binance_live import LiveTradingConfigError, build_binance_client + + +def _now() -> str: + return datetime.now().isoformat() + + +def _normalize_symbol(symbol: str) -> str: + value = str(symbol or "").strip().upper() + if value and "/" not in value and value.endswith("USDT"): + value = value[:-4] + "/USDT" + return value + + +def _record_event(intent_id: int, event_type: str, status: str, message: str = "", payload=None) -> None: + conn = get_conn() + try: + conn.execute( + """ + INSERT INTO live_order_events (intent_id, event_type, status, message, payload_json, event_time) + VALUES (%s,%s,%s,%s,%s,%s) + """, + (int(intent_id or 0), event_type, status, message, _dumps(payload or {}), _now()), + ) + conn.commit() + finally: + conn.close() + + +def _side_to_exchange(side: str) -> tuple[str, str]: + side = str(side or "long").lower() + if side == "short": + return "sell", "buy" + return "buy", "sell" + + +def _extract_order_id(order: dict) -> str: + return str((order or {}).get("id") or (order or {}).get("orderId") or "") + + +def _extract_algo_id(order: dict) -> str: + return str((order or {}).get("algoId") or (order or {}).get("clientAlgoId") or "") + + +def _cancel_conditional_order(client, order: dict, symbol: str): + algo_id = (order or {}).get("algoId") + client_algo_id = (order or {}).get("clientAlgoId") + if (algo_id or client_algo_id) and hasattr(client, "cancel_algo_order"): + return client.cancel_algo_order(algo_id=algo_id, client_algo_id=client_algo_id) + order_id = _extract_order_id(order) + if order_id: + return client.cancel_order(order_id, symbol) + return {"skipped": True, "reason": "missing_order_id"} + + +def _test_price(anchor_price: float, side: str, distance_pct: float) -> float: + if side == "buy": + return round(anchor_price * (1 - distance_pct / 100), 8) + return round(anchor_price * (1 + distance_pct / 100), 8) + + +def _compact_result(name: str, result): + if name == "load_markets" and isinstance(result, dict): + return {"markets_loaded": len(result)} + if name == "fetch_balance" and isinstance(result, dict): + total = result.get("total") if isinstance(result.get("total"), dict) else {} + free = result.get("free") if isinstance(result.get("free"), dict) else {} + return { + "USDT": { + "free": _safe_float(free.get("USDT")), + "total": _safe_float(total.get("USDT")), + } + } + if name == "fetch_ticker" and isinstance(result, dict): + return { + "symbol": result.get("symbol"), + "last": result.get("last") or result.get("close"), + "percentage": result.get("percentage"), + } + if name == "fetch_positions" and isinstance(result, list): + return [ + { + "symbol": item.get("symbol"), + "contracts": item.get("contracts"), + "positionAmt": (item.get("info") or {}).get("positionAmt") if isinstance(item, dict) else None, + } + for item in result + ] + if isinstance(result, dict): + return { + k: v + for k, v in result.items() + if k in {"id", "orderId", "algoId", "clientOrderId", "clientAlgoId", "status", "symbol", "side", "type", "orderType", "price", "triggerPrice", "amount", "average", "filled", "code", "msg"} + } or result + return result + + +def _position_amount(position: dict) -> float: + return _safe_float((position or {}).get("contracts") or (position or {}).get("info", {}).get("positionAmt")) + + +def run_binance_testnet_smoke( + *, + account_id: int, + symbol: str = "BTC/USDT", + notional_usdt: float = 10.0, + leverage: float = 1.0, + intent_id: int = 0, + client=None, +) -> dict: + init_db() + account = get_live_account(account_id) + if not account: + raise LiveTradingConfigError("live account not found") + if str(account.get("exchange") or "binance") != "binance": + raise LiveTradingConfigError("only Binance smoke test is implemented") + if not bool(account.get("testnet", True)): + raise LiveTradingConfigError("account is not enabled for the configured exchange endpoint") + + symbol = _normalize_symbol(symbol) + notional_usdt = max(5.0, _safe_float(notional_usdt, 10.0)) + leverage = max(1.0, _safe_float(leverage, 1.0)) + client = client or build_binance_client(account, require_testnet=True) + side = str(account.get("risk_config", {}).get("smoke_side") or "long") + open_side, close_side = _side_to_exchange(side) + steps: list[dict] = [] + + def step(name: str, fn): + try: + result = fn() + compact = _compact_result(name, result) + item = {"step": name, "ok": True, "result": compact} + _record_event(intent_id, f"smoke_{name}", "ok", "", compact) + except Exception as exc: + item = {"step": name, "ok": False, "error": str(exc)} + _record_event(intent_id, f"smoke_{name}", "error", str(exc), {}) + steps.append(item) + raise + steps.append(item) + return result + + step("load_markets", client.load_markets) + step("fetch_balance", client.fetch_balance) + ticker = step("fetch_ticker", lambda: client.fetch_ticker(symbol)) + last = _safe_float((ticker or {}).get("last") or (ticker or {}).get("close")) + if last <= 0: + raise LiveTradingConfigError("ticker price is unavailable") + + min_notional = client.min_notional(symbol) if hasattr(client, "min_notional") else 0.0 + if min_notional > 0 and notional_usdt < min_notional: + raise LiveTradingConfigError( + f"{symbol} minimum notional is {min_notional:g} USDT; current test notional is {notional_usdt:g} USDT" + ) + + amount = client.amount_to_precision(symbol, notional_usdt / last) + if amount <= 0: + raise LiveTradingConfigError("calculated order amount is zero") + + step("set_leverage", lambda: client.set_leverage(symbol, leverage)) + if hasattr(client, "fetch_positions"): + positions = step("fetch_positions", lambda: client.fetch_positions([symbol])) + open_positions = [p for p in positions or [] if abs(_position_amount(p)) > 0] + if open_positions: + raise LiveTradingConfigError("symbol has existing position; close it before smoke test") + market_order = None + close_market = None + try: + market_order = step("market_order", lambda: client.create_market_order(symbol, open_side, amount, {"newClientOrderId": f"alphax_smoke_mkt_{int(datetime.now().timestamp())}"})) + + stop_price = round(last * 0.99, 8) + take_profit_price = round(last * 1.01, 8) + stop_loss = step("stop_loss", lambda: client.create_stop_loss_order(symbol, close_side, amount, stop_price)) + take_profit = step("take_profit", lambda: client.create_take_profit_order(symbol, close_side, amount, take_profit_price)) + + limit_price = _test_price(last, open_side, 5.0) + limit_order = step("limit_order", lambda: client.create_limit_order(symbol, open_side, amount, limit_price, {"timeInForce": "GTC"})) + limit_order_id = _extract_order_id(limit_order) + if limit_order_id: + step("cancel_limit_order", lambda: client.cancel_order(limit_order_id, symbol)) + + for name, order in (("cancel_stop_loss", stop_loss), ("cancel_take_profit", take_profit)): + if _extract_order_id(order) or _extract_algo_id(order): + step(name, lambda item=order: _cancel_conditional_order(client, item, symbol)) + + close_market = step("close_market_order", lambda: client.create_market_order(symbol, close_side, amount, {"reduceOnly": True})) + finally: + if market_order and not close_market: + try: + close_market = step("emergency_close_market_order", lambda: client.create_market_order(symbol, close_side, amount, {"reduceOnly": True})) + except Exception as exc: + _record_event(intent_id, "smoke_emergency_close_failed", "error", str(exc), {}) + summary = { + "ok": all(x["ok"] for x in steps), + "account_id": account_id, + "side": side, + "symbol": symbol, + "notional_usdt": notional_usdt, + "leverage": leverage, + "amount": amount, + "market_order_id": _extract_order_id(market_order), + "close_order_id": _extract_order_id(close_market), + "steps": steps, + } + _record_event(intent_id, "smoke_completed", "ok", "binance_exchange_smoke_completed", summary) + return summary + + +def main(account_id: int, symbol: str = "BTC/USDT", notional_usdt: float = 10.0, leverage: float = 1.0): + result = run_binance_testnet_smoke(account_id=account_id, symbol=symbol, notional_usdt=notional_usdt, leverage=leverage) + print(json.dumps(result, ensure_ascii=False, indent=2, default=str)) + return result diff --git a/app/web/routes_live_trading.py b/app/web/routes_live_trading.py new file mode 100644 index 0000000..29585c3 --- /dev/null +++ b/app/web/routes_live_trading.py @@ -0,0 +1,129 @@ +from fastapi import APIRouter, Body, Cookie, HTTPException + +from app.db.live_trading import ( + create_live_order_intent, + delete_live_account, + get_live_trading_summary, + create_live_order_intents_for_accounts, + list_live_accounts, + list_live_order_events, + list_live_order_intents, + prepare_intent_from_paper_trade, + upsert_live_account, +) +from app.services.live_trading_smoke import run_binance_testnet_smoke +from app.services.live_trading_account import get_live_account_overview +from app.integrations.binance_live import LiveTradingConfigError +from app.web.shared import require_admin + + +router = APIRouter() + + +@router.get("/api/live-trading/summary") +async def api_live_trading_summary(altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return get_live_trading_summary() + + +@router.get("/api/live-trading/accounts") +async def api_live_trading_accounts(altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return list_live_accounts() + + +@router.get("/api/live-trading/accounts/{account_id}/overview") +async def api_live_trading_account_overview(account_id: int, altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return get_live_account_overview(account_id) + + +@router.post("/api/live-trading/accounts") +async def api_live_trading_account(payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return upsert_live_account( + account_code=payload.get("account_code", ""), + exchange=payload.get("exchange", ""), + market_type=payload.get("market_type", ""), + testnet=payload.get("testnet") if "testnet" in payload else None, + status=payload.get("status", ""), + api_key_env=payload.get("api_key_env", ""), + api_secret_env=payload.get("api_secret_env", ""), + permissions=payload.get("permissions") if isinstance(payload.get("permissions"), dict) else None, + risk_config=payload.get("risk_config") if isinstance(payload.get("risk_config"), dict) else None, + ) + + +@router.delete("/api/live-trading/accounts/{account_id}") +async def api_live_trading_delete_account(account_id: int, altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + result = delete_live_account(account_id) + if not result.get("ok"): + raise HTTPException(status_code=404, detail=result.get("reason", "account_not_found")) + return result + + +@router.post("/api/live-trading/accounts/default") +async def api_live_trading_default_account(altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return upsert_live_account() + + +@router.get("/api/live-trading/intents") +async def api_live_trading_intents( + limit: int = 50, + offset: int = 0, + status: str = "", + altcoin_session: str = Cookie(default=""), +): + require_admin(altcoin_session) + return list_live_order_intents(limit=limit, offset=offset, status=status) + + +@router.post("/api/live-trading/intents") +async def api_live_trading_create_intent(payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + account_ids = payload.pop("account_ids", None) + if isinstance(account_ids, list) and account_ids: + return create_live_order_intents_for_accounts(payload, account_ids=account_ids, source_type="manual") + return create_live_order_intent(payload, source_type="manual") + + +@router.post("/api/live-trading/intents/from-paper/{paper_trade_id}") +async def api_live_trading_from_paper(paper_trade_id: int, payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + account_ids = payload.get("account_ids") if isinstance(payload, dict) else None + return prepare_intent_from_paper_trade(paper_trade_id, account_ids=account_ids if isinstance(account_ids, list) else None) + + +@router.get("/api/live-trading/events") +async def api_live_trading_events( + limit: int = 80, + offset: int = 0, + intent_id: int = 0, + altcoin_session: str = Cookie(default=""), +): + require_admin(altcoin_session) + return list_live_order_events(limit=limit, offset=offset, intent_id=intent_id) + + +@router.post("/api/live-trading/smoke/binance") +async def api_live_trading_binance_smoke(payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + try: + return run_binance_testnet_smoke( + account_id=int(payload.get("account_id") or 0), + symbol=str(payload.get("symbol") or "BTC/USDT"), + notional_usdt=float(payload.get("notional_usdt") or 10), + leverage=float(payload.get("leverage") or 1), + intent_id=int(payload.get("intent_id") or 0), + ) + except LiveTradingConfigError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + except Exception as exc: + raise HTTPException(status_code=502, detail=f"exchange smoke failed: {exc}") from exc + + +@router.post("/api/live-trading/smoke/binance-testnet") +async def api_live_trading_binance_testnet_smoke(payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")): + return await api_live_trading_binance_smoke(payload=payload, altcoin_session=altcoin_session) diff --git a/app/web/routes_pages.py b/app/web/routes_pages.py index 8a0dc77..774a5f4 100644 --- a/app/web/routes_pages.py +++ b/app/web/routes_pages.py @@ -140,6 +140,17 @@ def build_router(templates, repo_root: Path, stock_report_template: str): return HTMLResponse(content=f"
{exc.detail}
返回看板", status_code=exc.status_code) return render_page("paper_trading.html", request, active_nav="paper_trading") + @router.get("/live-trading", response_class=HTMLResponse) + async def live_trading_page(request: Request): + user, redirect = require_page_user(request) + if redirect: + return redirect + try: + require_admin(request.cookies.get("altcoin_session", "")) + except HTTPException as exc: + return HTMLResponse(content=f"{exc.detail}
返回看板", status_code=exc.status_code) + return render_page("live_trading.html", request, active_nav="live_trading") + @router.get("/review-center", response_class=HTMLResponse) async def review_center_page(request: Request): user, redirect = require_page_user(request) diff --git a/app/web/web_server.py b/app/web/web_server.py index f4c074a..2a08803 100644 --- a/app/web/web_server.py +++ b/app/web/web_server.py @@ -19,6 +19,7 @@ from app.web.routes_auth import router as auth_router from app.web.routes_chat import router as chat_router from app.web.routes_content import build_router as build_content_router from app.web.routes_market import router as market_router +from app.web.routes_live_trading import router as live_trading_router from app.web.routes_onchain import router as onchain_router from app.web.routes_paper_trading import router as paper_trading_router from app.web.routes_pages import build_router as build_pages_router @@ -52,6 +53,7 @@ app.include_router(review_center_router) app.include_router(strategy_router) app.include_router(onchain_router) app.include_router(paper_trading_router) +app.include_router(live_trading_router) app.include_router(market_router) app.include_router(build_admin_router(templates)) app.include_router(build_content_router(REPO_ROOT)) diff --git a/static/base.html b/static/base.html index 4c377c9..3f14215 100644 --- a/static/base.html +++ b/static/base.html @@ -184,6 +184,7 @@ a { color: inherit; text-decoration: none; } 邀请 策略交易 + 实盘控制台 复盘中心 日志中心 AI 记录 diff --git a/static/live_trading.html b/static/live_trading.html new file mode 100644 index 0000000..cd25e1c --- /dev/null +++ b/static/live_trading.html @@ -0,0 +1,136 @@ +{% extends "base.html" %} +{% block title %}AlphaX Agent — 实盘控制台{% endblock %} +{% block extra_head_css %} + +{% endblock %} +{% block content %} +以交易账号为核心管理 API 环境变量、资金余额、仓位、订单和账号级风控。币种限制留空时表示不限制,策略同步到实盘前会按当前账号配置逐一校验。
+