alphax/app/integrations/binance_live.py
2026-05-24 20:44:22 +08:00

256 lines
11 KiB
Python

"""Binance live-trading adapter.
Only transport details live here. Business safety checks and audit logging stay
in the live_trading DB/service layer.
"""
from __future__ import annotations
import hashlib
import hmac
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urlencode
import requests
import ccxt
class LiveTradingConfigError(RuntimeError):
pass
def _cache_dir() -> Path:
return Path(os.getenv("ALPHAX_EXCHANGE_CACHE_DIR", "/app/data/exchange_cache"))
def _markets_cache_ttl_seconds() -> int:
try:
return max(300, int(os.getenv("ALPHAX_EXCHANGE_INFO_CACHE_SECONDS", "86400") or 86400))
except Exception:
return 86400
def _safe_cache_key(value: str) -> str:
return "".join(ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in value)
@dataclass
class BinanceLiveClient:
exchange: object
market_type: str = "um_futures"
def _markets_cache_path(self) -> Path:
sandbox = "sandbox" if bool(getattr(self.exchange, "sandbox", False)) else "live"
exchange_id = str(getattr(self.exchange, "id", "binance") or "binance")
return _cache_dir() / f"{_safe_cache_key(exchange_id)}_{_safe_cache_key(self.market_type)}_{sandbox}_markets.json"
def _restore_cached_markets(self, *, allow_stale: bool = False) -> bool:
path = self._markets_cache_path()
if not path.exists():
return False
try:
payload = json.loads(path.read_text(encoding="utf-8"))
cached_at = float(payload.get("cached_at") or 0)
age = time.time() - cached_at
if not allow_stale and age > _markets_cache_ttl_seconds():
return False
markets = payload.get("markets") or {}
markets_by_id = payload.get("markets_by_id") or {}
symbols = payload.get("symbols") or sorted(markets.keys())
ids = payload.get("ids") or sorted(markets_by_id.keys())
if not markets:
return False
self.exchange.markets = markets
self.exchange.markets_by_id = markets_by_id
self.exchange.symbols = symbols
self.exchange.ids = ids
self.exchange.markets_loading = None
return True
except Exception:
return False
def _store_markets_cache(self, markets) -> None:
if not markets:
return
path = self._markets_cache_path()
try:
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"cached_at": time.time(),
"exchange_id": str(getattr(self.exchange, "id", "binance") or "binance"),
"market_type": self.market_type,
"symbols": list(getattr(self.exchange, "symbols", []) or sorted(markets.keys())),
"ids": list(getattr(self.exchange, "ids", []) or []),
"markets": markets,
"markets_by_id": getattr(self.exchange, "markets_by_id", {}) or {},
}
path.write_text(json.dumps(payload, ensure_ascii=False, default=str), encoding="utf-8")
except Exception:
pass
def load_markets(self):
if self._restore_cached_markets():
return self.exchange.markets
try:
markets = self.exchange.load_markets()
self._store_markets_cache(markets)
return markets
except Exception:
if self._restore_cached_markets(allow_stale=True):
return self.exchange.markets
raise
def fetch_balance(self):
return self.exchange.fetch_balance()
def fetch_ticker(self, symbol: str):
return self.exchange.fetch_ticker(symbol)
def fetch_positions(self, symbols: list[str] | None = None):
if hasattr(self.exchange, "fetch_positions"):
return self.exchange.fetch_positions(symbols)
return []
def fetch_open_orders(self, symbol: str | None = None):
return self.exchange.fetch_open_orders(symbol)
def fetch_orders(self, symbol: str | None = None, limit: int = 30):
if hasattr(self.exchange, "fetch_orders"):
return self.exchange.fetch_orders(symbol, None, limit)
return []
def set_leverage(self, symbol: str, leverage: float):
if hasattr(self.exchange, "set_leverage"):
return self.exchange.set_leverage(int(leverage), symbol)
return {"skipped": True, "reason": "exchange_does_not_support_set_leverage"}
def amount_to_precision(self, symbol: str, amount: float) -> float:
try:
return float(self.exchange.amount_to_precision(symbol, amount))
except Exception:
return float(amount)
def price_to_precision(self, symbol: str, price: float) -> str:
try:
return str(self.exchange.price_to_precision(symbol, price))
except Exception:
return str(price)
def min_notional(self, symbol: str) -> float:
try:
market = self.exchange.market(symbol)
limits = market.get("limits") if isinstance(market, dict) else {}
cost = limits.get("cost") if isinstance(limits.get("cost"), dict) else {}
value = cost.get("min")
return float(value or 0)
except Exception:
return 0.0
def create_market_order(self, symbol: str, side: str, amount: float, params: dict | None = None):
return self.exchange.create_order(symbol, "market", side, amount, None, params or {})
def create_limit_order(self, symbol: str, side: str, amount: float, price: float, params: dict | None = None):
return self.exchange.create_order(symbol, "limit", side, amount, price, params or {})
def cancel_order(self, order_id: str, symbol: str):
return self.exchange.cancel_order(order_id, symbol)
def _market_id(self, symbol: str) -> str:
try:
return str(self.exchange.market(symbol).get("id") or symbol.replace("/", ""))
except Exception:
return symbol.replace("/", "")
def _signed_fapi_request(self, method: str, path: str, params: dict) -> dict:
timestamp = int(self.exchange.milliseconds()) if hasattr(self.exchange, "milliseconds") else 0
payload = {**(params or {}), "timestamp": timestamp}
query = urlencode(payload, doseq=True)
secret = str(getattr(self.exchange, "secret", "") or "")
signature = hmac.new(secret.encode("utf-8"), query.encode("utf-8"), hashlib.sha256).hexdigest()
signed_query = f"{query}&signature={signature}"
urls = getattr(self.exchange, "urls", {}) if isinstance(getattr(self.exchange, "urls", {}), dict) else {}
api_urls = urls.get("api") if isinstance(urls.get("api"), dict) else {}
base_url = str(api_urls.get("fapiPrivate") or "https://fapi.binance.com/fapi/v1").rstrip("/")
url = f"{base_url}{path}"
headers = {"X-MBX-APIKEY": str(getattr(self.exchange, "apiKey", "") or "")}
response = requests.request(method.upper(), url, params=signed_query, headers=headers, timeout=15)
try:
data = response.json()
except Exception:
data = {"raw": response.text}
if response.status_code >= 400 or (isinstance(data, dict) and int(data.get("code", 0) or 0) < 0):
raise ccxt.ExchangeError(f"binanceusdm {data}")
return data
def _create_algo_order(self, symbol: str, side: str, order_type: str, amount: float, trigger_price: float, params: dict | None = None):
merged = {
"algoType": "CONDITIONAL",
"symbol": self._market_id(symbol),
"side": side.upper(),
"type": order_type,
"quantity": self.exchange.amount_to_precision(symbol, amount),
"triggerPrice": self.price_to_precision(symbol, trigger_price),
"workingType": "CONTRACT_PRICE",
"reduceOnly": "true",
**(params or {}),
}
merged.pop("stopPrice", None)
return self._signed_fapi_request("POST", "/algoOrder", merged)
def create_stop_loss_order(self, symbol: str, side: str, amount: float, stop_price: float, params: dict | None = None):
return self._create_algo_order(symbol, side, "STOP_MARKET", amount, stop_price, params)
def create_take_profit_order(self, symbol: str, side: str, amount: float, stop_price: float, params: dict | None = None):
return self._create_algo_order(symbol, side, "TAKE_PROFIT_MARKET", amount, stop_price, params)
def cancel_algo_order(self, *, algo_id: str | int | None = None, client_algo_id: str | None = None):
params: dict = {}
if algo_id:
params["algoId"] = algo_id
if client_algo_id:
params["clientAlgoId"] = client_algo_id
if not params:
raise ValueError("algo_id or client_algo_id is required")
return self._signed_fapi_request("DELETE", "/algoOrder", params)
def build_binance_client(account: dict, *, require_testnet: bool = True) -> BinanceLiveClient:
market_type = str(account.get("market_type") or "um_futures")
testnet = bool(account.get("testnet", True))
risk_config = account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {}
sandbox_mode = str(risk_config.get("sandbox_mode") or os.getenv("ALPHAX_LIVE_TRADING_SANDBOX_MODE", "demo")).strip().lower()
if require_testnet and not testnet:
raise LiveTradingConfigError("mainnet execution is not allowed by this smoke tester")
api_key_env = str(account.get("api_key_env") or "ALPHAX_BINANCE_API_KEY")
api_secret_env = str(account.get("api_secret_env") or "ALPHAX_BINANCE_API_SECRET")
api_key = os.getenv(api_key_env, "").strip()
api_secret = os.getenv(api_secret_env, "").strip()
if not api_key or not api_secret:
raise LiveTradingConfigError(f"missing Binance credentials env: {api_key_env}/{api_secret_env}")
klass = ccxt.binanceusdm if market_type == "um_futures" else ccxt.binance
exchange = klass({
"apiKey": api_key,
"secret": api_secret,
"enableRateLimit": True,
"options": {
"defaultType": "future" if market_type == "um_futures" else "spot",
"fetchCurrencies": False,
"warnOnFetchOpenOrdersWithoutSymbol": False,
},
})
if testnet and sandbox_mode == "demo" and isinstance(getattr(exchange, "urls", None), dict) and exchange.urls.get("demo"):
exchange.urls["api"] = exchange.urls["demo"]
elif hasattr(exchange, "set_sandbox_mode"):
exchange.set_sandbox_mode(testnet)
return BinanceLiveClient(exchange=exchange, market_type=market_type)
__all__ = ["BinanceLiveClient", "LiveTradingConfigError", "build_binance_client"]