"""End-to-end smoke tests for exchange execution interfaces.""" from __future__ import annotations import json from datetime import datetime from app.db.live_trading import _dumps, _safe_float, get_live_account from app.db.schema import get_conn, init_db from app.integrations.binance_live import LiveTradingConfigError, build_binance_client def _now() -> str: return datetime.now().isoformat() def _normalize_symbol(symbol: str) -> str: value = str(symbol or "").strip().upper() if value and "/" not in value and value.endswith("USDT"): value = value[:-4] + "/USDT" return value def _record_event(intent_id: int, event_type: str, status: str, message: str = "", payload=None) -> None: conn = get_conn() try: conn.execute( """ INSERT INTO live_order_events (intent_id, event_type, status, message, payload_json, event_time) VALUES (%s,%s,%s,%s,%s,%s) """, (int(intent_id or 0), event_type, status, message, _dumps(payload or {}), _now()), ) conn.commit() finally: conn.close() def _side_to_exchange(side: str) -> tuple[str, str]: side = str(side or "long").lower() if side == "short": return "sell", "buy" return "buy", "sell" def _extract_order_id(order: dict) -> str: return str((order or {}).get("id") or (order or {}).get("orderId") or "") def _extract_algo_id(order: dict) -> str: return str((order or {}).get("algoId") or (order or {}).get("clientAlgoId") or "") def _cancel_conditional_order(client, order: dict, symbol: str): algo_id = (order or {}).get("algoId") client_algo_id = (order or {}).get("clientAlgoId") if (algo_id or client_algo_id) and hasattr(client, "cancel_algo_order"): return client.cancel_algo_order(algo_id=algo_id, client_algo_id=client_algo_id) order_id = _extract_order_id(order) if order_id: return client.cancel_order(order_id, symbol) return {"skipped": True, "reason": "missing_order_id"} def _test_price(anchor_price: float, side: str, distance_pct: float) -> float: if side == "buy": return round(anchor_price * (1 - distance_pct / 100), 8) return round(anchor_price * (1 + distance_pct / 100), 8) def _compact_result(name: str, result): if name == "load_markets" and isinstance(result, dict): return {"markets_loaded": len(result)} if name == "fetch_balance" and isinstance(result, dict): total = result.get("total") if isinstance(result.get("total"), dict) else {} free = result.get("free") if isinstance(result.get("free"), dict) else {} return { "USDT": { "free": _safe_float(free.get("USDT")), "total": _safe_float(total.get("USDT")), } } if name == "fetch_ticker" and isinstance(result, dict): return { "symbol": result.get("symbol"), "last": result.get("last") or result.get("close"), "percentage": result.get("percentage"), } if name == "fetch_positions" and isinstance(result, list): return [ { "symbol": item.get("symbol"), "contracts": item.get("contracts"), "positionAmt": (item.get("info") or {}).get("positionAmt") if isinstance(item, dict) else None, } for item in result ] if isinstance(result, dict): return { k: v for k, v in result.items() if k in {"id", "orderId", "algoId", "clientOrderId", "clientAlgoId", "status", "symbol", "side", "type", "orderType", "price", "triggerPrice", "amount", "average", "filled", "code", "msg"} } or result return result def _position_amount(position: dict) -> float: return _safe_float((position or {}).get("contracts") or (position or {}).get("info", {}).get("positionAmt")) def run_binance_testnet_smoke( *, account_id: int, symbol: str = "BTC/USDT", notional_usdt: float = 10.0, leverage: float = 1.0, intent_id: int = 0, client=None, ) -> dict: init_db() account = get_live_account(account_id) if not account: raise LiveTradingConfigError("live account not found") if str(account.get("exchange") or "binance") != "binance": raise LiveTradingConfigError("only Binance smoke test is implemented") if not bool(account.get("testnet", True)): raise LiveTradingConfigError("account is not enabled for the configured exchange endpoint") symbol = _normalize_symbol(symbol) notional_usdt = max(5.0, _safe_float(notional_usdt, 10.0)) leverage = max(1.0, _safe_float(leverage, 1.0)) client = client or build_binance_client(account, require_testnet=True) side = str(account.get("risk_config", {}).get("smoke_side") or "long") open_side, close_side = _side_to_exchange(side) steps: list[dict] = [] def step(name: str, fn): try: result = fn() compact = _compact_result(name, result) item = {"step": name, "ok": True, "result": compact} _record_event(intent_id, f"smoke_{name}", "ok", "", compact) except Exception as exc: item = {"step": name, "ok": False, "error": str(exc)} _record_event(intent_id, f"smoke_{name}", "error", str(exc), {}) steps.append(item) raise steps.append(item) return result step("load_markets", client.load_markets) step("fetch_balance", client.fetch_balance) ticker = step("fetch_ticker", lambda: client.fetch_ticker(symbol)) last = _safe_float((ticker or {}).get("last") or (ticker or {}).get("close")) if last <= 0: raise LiveTradingConfigError("ticker price is unavailable") min_notional = client.min_notional(symbol) if hasattr(client, "min_notional") else 0.0 if min_notional > 0 and notional_usdt < min_notional: raise LiveTradingConfigError( f"{symbol} minimum notional is {min_notional:g} USDT; current test notional is {notional_usdt:g} USDT" ) amount = client.amount_to_precision(symbol, notional_usdt / last) if amount <= 0: raise LiveTradingConfigError("calculated order amount is zero") step("set_leverage", lambda: client.set_leverage(symbol, leverage)) if hasattr(client, "fetch_positions"): positions = step("fetch_positions", lambda: client.fetch_positions([symbol])) open_positions = [p for p in positions or [] if abs(_position_amount(p)) > 0] if open_positions: raise LiveTradingConfigError("symbol has existing position; close it before smoke test") market_order = None close_market = None try: market_order = step("market_order", lambda: client.create_market_order(symbol, open_side, amount, {"newClientOrderId": f"alphax_smoke_mkt_{int(datetime.now().timestamp())}"})) stop_price = round(last * 0.99, 8) take_profit_price = round(last * 1.01, 8) stop_loss = step("stop_loss", lambda: client.create_stop_loss_order(symbol, close_side, amount, stop_price)) take_profit = step("take_profit", lambda: client.create_take_profit_order(symbol, close_side, amount, take_profit_price)) limit_price = _test_price(last, open_side, 5.0) limit_order = step("limit_order", lambda: client.create_limit_order(symbol, open_side, amount, limit_price, {"timeInForce": "GTC"})) limit_order_id = _extract_order_id(limit_order) if limit_order_id: step("cancel_limit_order", lambda: client.cancel_order(limit_order_id, symbol)) for name, order in (("cancel_stop_loss", stop_loss), ("cancel_take_profit", take_profit)): if _extract_order_id(order) or _extract_algo_id(order): step(name, lambda item=order: _cancel_conditional_order(client, item, symbol)) close_market = step("close_market_order", lambda: client.create_market_order(symbol, close_side, amount, {"reduceOnly": True})) finally: if market_order and not close_market: try: close_market = step("emergency_close_market_order", lambda: client.create_market_order(symbol, close_side, amount, {"reduceOnly": True})) except Exception as exc: _record_event(intent_id, "smoke_emergency_close_failed", "error", str(exc), {}) summary = { "ok": all(x["ok"] for x in steps), "account_id": account_id, "side": side, "symbol": symbol, "notional_usdt": notional_usdt, "leverage": leverage, "amount": amount, "market_order_id": _extract_order_id(market_order), "close_order_id": _extract_order_id(close_market), "steps": steps, } _record_event(intent_id, "smoke_completed", "ok", "binance_exchange_smoke_completed", summary) return summary def main(account_id: int, symbol: str = "BTC/USDT", notional_usdt: float = 10.0, leverage: float = 1.0): result = run_binance_testnet_smoke(account_id=account_id, symbol=symbol, notional_usdt=notional_usdt, leverage=leverage) print(json.dumps(result, ensure_ascii=False, indent=2, default=str)) return result