alphax/app/services/live_trading_smoke.py
2026-05-22 23:17:37 +08:00

221 lines
9.0 KiB
Python

"""End-to-end smoke tests for exchange execution interfaces."""
from __future__ import annotations
import json
from datetime import datetime
from app.db.live_trading import _dumps, _safe_float, get_live_account
from app.db.schema import get_conn, init_db
from app.integrations.binance_live import LiveTradingConfigError, build_binance_client
def _now() -> str:
return datetime.now().isoformat()
def _normalize_symbol(symbol: str) -> str:
value = str(symbol or "").strip().upper()
if value and "/" not in value and value.endswith("USDT"):
value = value[:-4] + "/USDT"
return value
def _record_event(intent_id: int, event_type: str, status: str, message: str = "", payload=None) -> None:
    """Insert one row into ``live_order_events`` and commit it.

    Args:
        intent_id: Owning intent id; falsy values are stored as ``0``.
        event_type: Event name written to the ``event_type`` column.
        status: Status label written to the ``status`` column.
        message: Optional free-text message.
        payload: Optional payload serialised with ``_dumps``; a falsy
            payload is stored as an empty dict.
    """
    insert_sql = """
        INSERT INTO live_order_events (intent_id, event_type, status, message, payload_json, event_time)
        VALUES (%s,%s,%s,%s,%s,%s)
        """
    conn = get_conn()
    try:
        row = (
            int(intent_id or 0),
            event_type,
            status,
            message,
            _dumps(payload or {}),
            _now(),
        )
        conn.execute(insert_sql, row)
        conn.commit()
    finally:
        # Always release the connection, even when the insert or commit fails.
        conn.close()
def _side_to_exchange(side: str) -> tuple[str, str]:
side = str(side or "long").lower()
if side == "short":
return "sell", "buy"
return "buy", "sell"
def _extract_order_id(order: dict) -> str:
return str((order or {}).get("id") or (order or {}).get("orderId") or "")
def _extract_algo_id(order: dict) -> str:
return str((order or {}).get("algoId") or (order or {}).get("clientAlgoId") or "")
def _cancel_conditional_order(client, order: dict, symbol: str):
algo_id = (order or {}).get("algoId")
client_algo_id = (order or {}).get("clientAlgoId")
if (algo_id or client_algo_id) and hasattr(client, "cancel_algo_order"):
return client.cancel_algo_order(algo_id=algo_id, client_algo_id=client_algo_id)
order_id = _extract_order_id(order)
if order_id:
return client.cancel_order(order_id, symbol)
return {"skipped": True, "reason": "missing_order_id"}
def _test_price(anchor_price: float, side: str, distance_pct: float) -> float:
if side == "buy":
return round(anchor_price * (1 - distance_pct / 100), 8)
return round(anchor_price * (1 + distance_pct / 100), 8)
def _compact_result(name: str, result):
if name == "load_markets" and isinstance(result, dict):
return {"markets_loaded": len(result)}
if name == "fetch_balance" and isinstance(result, dict):
total = result.get("total") if isinstance(result.get("total"), dict) else {}
free = result.get("free") if isinstance(result.get("free"), dict) else {}
return {
"USDT": {
"free": _safe_float(free.get("USDT")),
"total": _safe_float(total.get("USDT")),
}
}
if name == "fetch_ticker" and isinstance(result, dict):
return {
"symbol": result.get("symbol"),
"last": result.get("last") or result.get("close"),
"percentage": result.get("percentage"),
}
if name == "fetch_positions" and isinstance(result, list):
return [
{
"symbol": item.get("symbol"),
"contracts": item.get("contracts"),
"positionAmt": (item.get("info") or {}).get("positionAmt") if isinstance(item, dict) else None,
}
for item in result
]
if isinstance(result, dict):
return {
k: v
for k, v in result.items()
if k in {"id", "orderId", "algoId", "clientOrderId", "clientAlgoId", "status", "symbol", "side", "type", "orderType", "price", "triggerPrice", "amount", "average", "filled", "code", "msg"}
} or result
return result
def _position_amount(position: dict) -> float:
    """Return the size of *position* as a float.

    Prefers the ``contracts`` field and falls back to ``info.positionAmt``
    when ``contracts`` is missing or falsy. The chosen value is converted
    with ``_safe_float``.
    """
    data = position or {}
    info = data.get("info")
    # ``info`` may be present but None (or not a dict at all); treat that
    # as "no raw payload" instead of raising AttributeError on ``.get``.
    if not isinstance(info, dict):
        info = {}
    return _safe_float(data.get("contracts") or info.get("positionAmt"))
def run_binance_testnet_smoke(
    *,
    account_id: int,
    symbol: str = "BTC/USDT",
    notional_usdt: float = 10.0,
    leverage: float = 1.0,
    intent_id: int = 0,
    client=None,
) -> dict:
    """Exercise the Binance execution interface end to end for one account.

    The run loads markets, balance and ticker, sets leverage, opens a small
    market position, places a stop-loss, a take-profit and a limit order 5%
    away from the last price, cancels all three, and closes the position.
    Each step is appended to the returned ``steps`` list and persisted via
    ``_record_event``.

    If any step fails after the position was opened, the position is closed
    and every order that is still resting is cancelled on a best-effort
    basis before the original exception propagates.

    Args:
        account_id: Identifier passed to ``get_live_account``.
        symbol: Market symbol; ``BTCUSDT`` style input is normalised.
        notional_usdt: Target order value in USDT; floored at 5.
        leverage: Leverage passed to ``client.set_leverage``; floored at 1.
        intent_id: Intent id attached to every recorded event.
        client: Optional pre-built client. When omitted, one is created with
            ``build_binance_client(account, require_testnet=True)``.

    Returns:
        Summary dict with ``ok``, the effective parameters, the opening and
        closing order ids, and the list of executed ``steps``.

    Raises:
        LiveTradingConfigError: The account is missing, is not a Binance
            account, is not flagged as testnet, the ticker has no price, the
            notional is below the market minimum, the computed amount is
            zero, or the symbol already has an open position.
        Exception: Any error raised by a step is re-raised after cleanup.
    """
    init_db()
    account = get_live_account(account_id)
    if not account:
        raise LiveTradingConfigError("live account not found")
    if str(account.get("exchange") or "binance") != "binance":
        raise LiveTradingConfigError("only Binance smoke test is implemented")
    if not bool(account.get("testnet", True)):
        raise LiveTradingConfigError("account is not enabled for the configured exchange endpoint")

    symbol = _normalize_symbol(symbol)
    notional_usdt = max(5.0, _safe_float(notional_usdt, 10.0))
    leverage = max(1.0, _safe_float(leverage, 1.0))
    client = client or build_binance_client(account, require_testnet=True)
    # ``risk_config`` may be stored as None; fall back to an empty mapping.
    risk_config = account.get("risk_config") or {}
    side = str(risk_config.get("smoke_side") or "long")
    open_side, close_side = _side_to_exchange(side)
    steps: list[dict] = []

    def step(name: str, fn):
        """Run *fn*, log the outcome as a ``smoke_<name>`` event, re-raise on failure."""
        # NOTE(review): event recording shares the try with the exchange call,
        # so a failing _record_event after a successful call also surfaces as
        # a step failure and the call's result is lost to the caller.
        try:
            result = fn()
            compact = _compact_result(name, result)
            item = {"step": name, "ok": True, "result": compact}
            _record_event(intent_id, f"smoke_{name}", "ok", "", compact)
        except Exception as exc:
            item = {"step": name, "ok": False, "error": str(exc)}
            _record_event(intent_id, f"smoke_{name}", "error", str(exc), {})
            steps.append(item)
            raise
        steps.append(item)
        return result

    def cleanup(name: str, failure_event: str, fn):
        """Run a cleanup step; record a failure instead of masking the original error."""
        try:
            return step(name, fn)
        except Exception as exc:
            _record_event(intent_id, failure_event, "error", str(exc), {})
            return None

    step("load_markets", client.load_markets)
    step("fetch_balance", client.fetch_balance)
    ticker = step("fetch_ticker", lambda: client.fetch_ticker(symbol))
    last = _safe_float((ticker or {}).get("last") or (ticker or {}).get("close"))
    if last <= 0:
        raise LiveTradingConfigError("ticker price is unavailable")

    # Coerce before comparing so a None/str return cannot raise TypeError.
    min_notional = _safe_float(client.min_notional(symbol)) if hasattr(client, "min_notional") else 0.0
    if min_notional > 0 and notional_usdt < min_notional:
        raise LiveTradingConfigError(
            f"{symbol} minimum notional is {min_notional:g} USDT; current test notional is {notional_usdt:g} USDT"
        )
    amount = client.amount_to_precision(symbol, notional_usdt / last)
    # Compare numerically but keep passing the client's own value to orders.
    if _safe_float(amount) <= 0:
        raise LiveTradingConfigError("calculated order amount is zero")

    step("set_leverage", lambda: client.set_leverage(symbol, leverage))
    if hasattr(client, "fetch_positions"):
        positions = step("fetch_positions", lambda: client.fetch_positions([symbol]))
        open_positions = [p for p in positions or [] if abs(_position_amount(p)) > 0]
        if open_positions:
            raise LiveTradingConfigError("symbol has existing position; close it before smoke test")

    market_order = None
    close_market = None
    # Orders placed by this run that are still live on the exchange, keyed by
    # the name of the step that cancels them. Emptied as cancels succeed.
    resting: dict[str, dict] = {}
    try:
        market_order = step("market_order", lambda: client.create_market_order(symbol, open_side, amount, {"newClientOrderId": f"alphax_smoke_mkt_{int(datetime.now().timestamp())}"}))

        # Triggers must sit on the correct side of the market for the
        # position direction: a long stops out below and takes profit above,
        # a short is the mirror image. A trigger on the wrong side would
        # fire (or be rejected) immediately.
        if open_side == "buy":
            stop_price = round(last * 0.99, 8)
            take_profit_price = round(last * 1.01, 8)
        else:
            stop_price = round(last * 1.01, 8)
            take_profit_price = round(last * 0.99, 8)

        stop_loss = step("stop_loss", lambda: client.create_stop_loss_order(symbol, close_side, amount, stop_price))
        resting["cancel_stop_loss"] = stop_loss
        take_profit = step("take_profit", lambda: client.create_take_profit_order(symbol, close_side, amount, take_profit_price))
        resting["cancel_take_profit"] = take_profit

        # Place the limit order 5% away from the market so it rests unfilled.
        limit_price = _test_price(last, open_side, 5.0)
        limit_order = step("limit_order", lambda: client.create_limit_order(symbol, open_side, amount, limit_price, {"timeInForce": "GTC"}))
        resting["cancel_limit_order"] = limit_order
        limit_order_id = _extract_order_id(limit_order)
        if limit_order_id:
            step("cancel_limit_order", lambda: client.cancel_order(limit_order_id, symbol))
        del resting["cancel_limit_order"]

        for name in ("cancel_stop_loss", "cancel_take_profit"):
            order = resting[name]
            if _extract_order_id(order) or _extract_algo_id(order):
                step(name, lambda item=order: _cancel_conditional_order(client, item, symbol))
            del resting[name]

        close_market = step("close_market_order", lambda: client.create_market_order(symbol, close_side, amount, {"reduceOnly": True}))
    finally:
        # Flatten the position first, then remove anything still resting so a
        # failed run leaves neither exposure nor stray orders behind.
        if market_order and not close_market:
            close_market = cleanup(
                "emergency_close_market_order",
                "smoke_emergency_close_failed",
                lambda: client.create_market_order(symbol, close_side, amount, {"reduceOnly": True}),
            )
        for name, order in list(resting.items()):
            if _extract_order_id(order) or _extract_algo_id(order):
                cleanup(
                    f"emergency_{name}",
                    f"smoke_emergency_{name}_failed",
                    lambda item=order: _cancel_conditional_order(client, item, symbol),
                )

    summary = {
        "ok": all(x["ok"] for x in steps),
        "account_id": account_id,
        "side": side,
        "symbol": symbol,
        "notional_usdt": notional_usdt,
        "leverage": leverage,
        "amount": amount,
        "market_order_id": _extract_order_id(market_order),
        "close_order_id": _extract_order_id(close_market),
        "steps": steps,
    }
    _record_event(intent_id, "smoke_completed", "ok", "binance_exchange_smoke_completed", summary)
    return summary
def main(account_id: int, symbol: str = "BTC/USDT", notional_usdt: float = 10.0, leverage: float = 1.0):
    """Run the Binance smoke test, print its summary as indented JSON, and return it."""
    summary = run_binance_testnet_smoke(
        account_id=account_id,
        symbol=symbol,
        notional_usdt=notional_usdt,
        leverage=leverage,
    )
    rendered = json.dumps(summary, ensure_ascii=False, indent=2, default=str)
    print(rendered)
    return summary