"""Binance live-trading adapter. Only transport details live here. Business safety checks and audit logging stay in the live_trading DB/service layer. """ from __future__ import annotations import hashlib import hmac import json import os import time from dataclasses import dataclass from pathlib import Path from urllib.parse import urlencode import requests import ccxt class LiveTradingConfigError(RuntimeError): pass def _cache_dir() -> Path: return Path(os.getenv("ALPHAX_EXCHANGE_CACHE_DIR", "/app/data/exchange_cache")) def _markets_cache_ttl_seconds() -> int: try: return max(300, int(os.getenv("ALPHAX_EXCHANGE_INFO_CACHE_SECONDS", "86400") or 86400)) except Exception: return 86400 def _safe_cache_key(value: str) -> str: return "".join(ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in value) @dataclass class BinanceLiveClient: exchange: object market_type: str = "um_futures" def _markets_cache_path(self) -> Path: sandbox = "sandbox" if bool(getattr(self.exchange, "sandbox", False)) else "live" exchange_id = str(getattr(self.exchange, "id", "binance") or "binance") return _cache_dir() / f"{_safe_cache_key(exchange_id)}_{_safe_cache_key(self.market_type)}_{sandbox}_markets.json" def _restore_cached_markets(self, *, allow_stale: bool = False) -> bool: path = self._markets_cache_path() if not path.exists(): return False try: payload = json.loads(path.read_text(encoding="utf-8")) cached_at = float(payload.get("cached_at") or 0) age = time.time() - cached_at if not allow_stale and age > _markets_cache_ttl_seconds(): return False markets = payload.get("markets") or {} markets_by_id = payload.get("markets_by_id") or {} symbols = payload.get("symbols") or sorted(markets.keys()) ids = payload.get("ids") or sorted(markets_by_id.keys()) if not markets: return False self.exchange.markets = markets self.exchange.markets_by_id = markets_by_id self.exchange.symbols = symbols self.exchange.ids = ids self.exchange.markets_loading = None return True except Exception: return False def _store_markets_cache(self, markets) -> None: if not markets: return path = self._markets_cache_path() try: path.parent.mkdir(parents=True, exist_ok=True) payload = { "cached_at": time.time(), "exchange_id": str(getattr(self.exchange, "id", "binance") or "binance"), "market_type": self.market_type, "symbols": list(getattr(self.exchange, "symbols", []) or sorted(markets.keys())), "ids": list(getattr(self.exchange, "ids", []) or []), "markets": markets, "markets_by_id": getattr(self.exchange, "markets_by_id", {}) or {}, } path.write_text(json.dumps(payload, ensure_ascii=False, default=str), encoding="utf-8") except Exception: pass def load_markets(self): if self._restore_cached_markets(): return self.exchange.markets try: markets = self.exchange.load_markets() self._store_markets_cache(markets) return markets except Exception: if self._restore_cached_markets(allow_stale=True): return self.exchange.markets raise def fetch_balance(self): return self.exchange.fetch_balance() def fetch_ticker(self, symbol: str): return self.exchange.fetch_ticker(symbol) def fetch_positions(self, symbols: list[str] | None = None): if hasattr(self.exchange, "fetch_positions"): return self.exchange.fetch_positions(symbols) return [] def fetch_open_orders(self, symbol: str | None = None): return self.exchange.fetch_open_orders(symbol) def fetch_orders(self, symbol: str | None = None, limit: int = 30): if hasattr(self.exchange, "fetch_orders"): return self.exchange.fetch_orders(symbol, None, limit) return [] def set_leverage(self, symbol: str, leverage: float): if hasattr(self.exchange, "set_leverage"): return self.exchange.set_leverage(int(leverage), symbol) return {"skipped": True, "reason": "exchange_does_not_support_set_leverage"} def amount_to_precision(self, symbol: str, amount: float) -> float: try: return float(self.exchange.amount_to_precision(symbol, amount)) except Exception: return float(amount) def price_to_precision(self, symbol: str, price: float) -> str: try: return str(self.exchange.price_to_precision(symbol, price)) except Exception: return str(price) def min_notional(self, symbol: str) -> float: try: market = self.exchange.market(symbol) limits = market.get("limits") if isinstance(market, dict) else {} cost = limits.get("cost") if isinstance(limits.get("cost"), dict) else {} value = cost.get("min") return float(value or 0) except Exception: return 0.0 def create_market_order(self, symbol: str, side: str, amount: float, params: dict | None = None): return self.exchange.create_order(symbol, "market", side, amount, None, params or {}) def create_limit_order(self, symbol: str, side: str, amount: float, price: float, params: dict | None = None): return self.exchange.create_order(symbol, "limit", side, amount, price, params or {}) def cancel_order(self, order_id: str, symbol: str): return self.exchange.cancel_order(order_id, symbol) def _market_id(self, symbol: str) -> str: try: return str(self.exchange.market(symbol).get("id") or symbol.replace("/", "")) except Exception: return symbol.replace("/", "") def _signed_fapi_request(self, method: str, path: str, params: dict) -> dict: timestamp = int(self.exchange.milliseconds()) if hasattr(self.exchange, "milliseconds") else 0 payload = {**(params or {}), "timestamp": timestamp} query = urlencode(payload, doseq=True) secret = str(getattr(self.exchange, "secret", "") or "") signature = hmac.new(secret.encode("utf-8"), query.encode("utf-8"), hashlib.sha256).hexdigest() signed_query = f"{query}&signature={signature}" urls = getattr(self.exchange, "urls", {}) if isinstance(getattr(self.exchange, "urls", {}), dict) else {} api_urls = urls.get("api") if isinstance(urls.get("api"), dict) else {} base_url = str(api_urls.get("fapiPrivate") or "https://fapi.binance.com/fapi/v1").rstrip("/") url = f"{base_url}{path}" headers = {"X-MBX-APIKEY": str(getattr(self.exchange, "apiKey", "") or "")} response = requests.request(method.upper(), url, params=signed_query, headers=headers, timeout=15) try: data = response.json() except Exception: data = {"raw": response.text} if response.status_code >= 400 or (isinstance(data, dict) and int(data.get("code", 0) or 0) < 0): raise ccxt.ExchangeError(f"binanceusdm {data}") return data def _create_algo_order(self, symbol: str, side: str, order_type: str, amount: float, trigger_price: float, params: dict | None = None): merged = { "algoType": "CONDITIONAL", "symbol": self._market_id(symbol), "side": side.upper(), "type": order_type, "quantity": self.exchange.amount_to_precision(symbol, amount), "triggerPrice": self.price_to_precision(symbol, trigger_price), "workingType": "CONTRACT_PRICE", "reduceOnly": "true", **(params or {}), } merged.pop("stopPrice", None) return self._signed_fapi_request("POST", "/algoOrder", merged) def create_stop_loss_order(self, symbol: str, side: str, amount: float, stop_price: float, params: dict | None = None): return self._create_algo_order(symbol, side, "STOP_MARKET", amount, stop_price, params) def create_take_profit_order(self, symbol: str, side: str, amount: float, stop_price: float, params: dict | None = None): return self._create_algo_order(symbol, side, "TAKE_PROFIT_MARKET", amount, stop_price, params) def cancel_algo_order(self, *, algo_id: str | int | None = None, client_algo_id: str | None = None): params: dict = {} if algo_id: params["algoId"] = algo_id if client_algo_id: params["clientAlgoId"] = client_algo_id if not params: raise ValueError("algo_id or client_algo_id is required") return self._signed_fapi_request("DELETE", "/algoOrder", params) def build_binance_client(account: dict, *, require_testnet: bool = True) -> BinanceLiveClient: market_type = str(account.get("market_type") or "um_futures") testnet = bool(account.get("testnet", True)) risk_config = account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} sandbox_mode = str(risk_config.get("sandbox_mode") or os.getenv("ALPHAX_LIVE_TRADING_SANDBOX_MODE", "demo")).strip().lower() if require_testnet and not testnet: raise LiveTradingConfigError("mainnet execution is not allowed by this smoke tester") api_key_env = str(account.get("api_key_env") or "ALPHAX_BINANCE_API_KEY") api_secret_env = str(account.get("api_secret_env") or "ALPHAX_BINANCE_API_SECRET") api_key = os.getenv(api_key_env, "").strip() api_secret = os.getenv(api_secret_env, "").strip() if not api_key or not api_secret: raise LiveTradingConfigError(f"missing Binance credentials env: {api_key_env}/{api_secret_env}") klass = ccxt.binanceusdm if market_type == "um_futures" else ccxt.binance exchange = klass({ "apiKey": api_key, "secret": api_secret, "enableRateLimit": True, "options": { "defaultType": "future" if market_type == "um_futures" else "spot", "fetchCurrencies": False, "warnOnFetchOpenOrdersWithoutSymbol": False, }, }) if testnet and sandbox_mode == "demo" and isinstance(getattr(exchange, "urls", None), dict) and exchange.urls.get("demo"): exchange.urls["api"] = exchange.urls["demo"] elif hasattr(exchange, "set_sandbox_mode"): exchange.set_sandbox_mode(testnet) return BinanceLiveClient(exchange=exchange, market_type=market_type) __all__ = ["BinanceLiveClient", "LiveTradingConfigError", "build_binance_client"]