"""Synchronize strategy-trading ledger entries to configured live accounts.""" from __future__ import annotations from datetime import datetime from app.db.live_trading import ( _safe_float, _safe_int, create_live_order_intent, get_live_account, get_live_order_intent, list_enabled_live_accounts, record_live_order_event, update_live_order_intent, _row, ) from app.config.system_config import live_trading_config from app.db.schema import get_conn, init_db from app.integrations.binance_live import LiveTradingConfigError, build_binance_client from app.services.live_trading_account import sync_live_account_snapshots def _now() -> str: return datetime.now().isoformat() def _side_to_exchange(side: str) -> tuple[str, str]: side = str(side or "long").lower() if side == "short": return "sell", "buy" return "buy", "sell" def _paper_trade(paper_trade_id: int) -> dict: conn = get_conn() try: row = conn.execute("SELECT * FROM paper_trades WHERE id=%s", (_safe_int(paper_trade_id),)).fetchone() finally: conn.close() return _row(row) if row else {} def _existing_intent_for_paper_trade(paper_trade_id: int, account_id: int) -> dict: conn = get_conn() try: row = conn.execute( """ SELECT * FROM live_order_intents WHERE paper_trade_id=%s AND account_id=%s AND source_type='paper_trade_sync' AND status NOT IN ('blocked','error') ORDER BY id DESC LIMIT 1 """, (_safe_int(paper_trade_id), _safe_int(account_id)), ).fetchone() finally: conn.close() return dict(row) if row else {} def _open_unsynced_paper_trades(limit: int = 20) -> list[dict]: conn = get_conn() try: rows = conn.execute( """ SELECT pt.* FROM paper_trades pt WHERE pt.status='open' ORDER BY pt.opened_at DESC, pt.id DESC LIMIT %s """, (max(1, min(_safe_int(limit, 20), 100)),), ).fetchall() finally: conn.close() return [dict(r) for r in rows] def _risk_for_account(account: dict) -> dict: return account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} def _live_sizing(paper_trade: dict, account: dict) -> dict: risk = _risk_for_account(account) paper_leverage = max(1.0, _safe_float(paper_trade.get("leverage"), 1)) max_leverage = max(1.0, _safe_float(risk.get("max_symbol_leverage"), paper_leverage)) leverage = min(paper_leverage, max_leverage) max_margin = _safe_float(risk.get("max_order_margin_usdt"), 0) paper_notional = _safe_float(paper_trade.get("notional_usdt")) if max_margin > 0: notional = min(paper_notional, max_margin * leverage) if paper_notional > 0 else max_margin * leverage else: notional = paper_notional return { "notional_usdt": round(max(0.0, notional), 8), "leverage": leverage, "paper_notional_usdt": paper_notional, "sizing_mode": "account_risk_cap" if max_margin > 0 else "paper_notional", } def _position_notional(position: dict) -> float: info = position.get("info") if isinstance(position.get("info"), dict) else {} return abs(_safe_float(position.get("notional") or info.get("notional"))) def _order_identifier(order: dict | None) -> dict: data = order if isinstance(order, dict) else {} return { "id": str(data.get("id") or data.get("orderId") or data.get("algoId") or ""), "algo_id": str(data.get("algoId") or data.get("id") or ""), "client_algo_id": str(data.get("clientAlgoId") or data.get("clientOrderId") or ""), } def _protection_orders(intent: dict) -> dict: response = intent.get("response") if isinstance(intent.get("response"), dict) else {} return { "stop_loss_order": response.get("stop_loss_order") if isinstance(response.get("stop_loss_order"), dict) else {}, "take_profit_order": response.get("take_profit_order") if isinstance(response.get("take_profit_order"), dict) else {}, "market_order": response.get("market_order") if isinstance(response.get("market_order"), dict) else {}, "live_protection": response.get("live_protection") if isinstance(response.get("live_protection"), dict) else {}, "raw": response, } def _cancel_protection_orders(client, intent: dict, keys: tuple[str, ...] = ("stop_loss_order", "take_profit_order")) -> list[dict]: canceled = [] orders = _protection_orders(intent) for key in keys: ident = _order_identifier(orders.get(key)) try: if ident["algo_id"] and hasattr(client, "cancel_algo_order"): canceled.append({"key": key, "result": client.cancel_algo_order(algo_id=ident["algo_id"])}) elif ident["client_algo_id"] and hasattr(client, "cancel_algo_order"): canceled.append({"key": key, "result": client.cancel_algo_order(client_algo_id=ident["client_algo_id"])}) elif ident["id"]: canceled.append({"key": key, "result": client.cancel_order(ident["id"], intent.get("symbol"))}) except Exception as exc: canceled.append({"key": key, "error": str(exc)}) return canceled def _merge_intent_response(intent: dict, patch: dict) -> dict: response = intent.get("response") if isinstance(intent.get("response"), dict) else {} merged = {**response, **(patch or {})} return update_live_order_intent(intent["id"], response_json=merged, updated_at=_now()) def _check_live_cumulative_leverage(client, account: dict, additional_notional: float) -> tuple[bool, dict]: risk = _risk_for_account(account) cap = _safe_float(risk.get("max_cumulative_leverage"), 0) if cap <= 0: return True, {"disabled": True, "max_cumulative_leverage": cap} balance = client.fetch_balance() total = balance.get("total") if isinstance(balance.get("total"), dict) else {} equity = _safe_float(total.get("USDT")) positions = client.fetch_positions(None) if hasattr(client, "fetch_positions") else [] open_notional = sum(_position_notional(p) for p in positions or []) projected = open_notional + max(0.0, _safe_float(additional_notional)) projected_leverage = projected / equity if equity > 0 else 0 detail = { "account_equity_usdt": round(equity, 8), "open_notional_usdt": round(open_notional, 8), "additional_notional_usdt": round(additional_notional, 8), "projected_notional_usdt": round(projected, 8), "projected_cumulative_leverage": round(projected_leverage, 6), "max_cumulative_leverage": cap, } return projected_leverage <= cap + 1e-12, detail def execute_live_order_intent(intent_id: int, *, client=None) -> dict: intent = get_live_order_intent(intent_id) if not intent: raise LiveTradingConfigError("live order intent not found") if intent.get("status") not in ("prepared", "pending_approval"): return {"ok": False, "reason": f"intent_status_{intent.get('status')}", "intent": intent} account = get_live_account(intent.get("account_id")) if not account: raise LiveTradingConfigError("live account not found") if account.get("status") != "enabled": raise LiveTradingConfigError("live account disabled") symbol = str(intent.get("symbol") or "").upper() notional = _safe_float(intent.get("notional_usdt")) leverage = max(1.0, _safe_float(intent.get("leverage"), 1)) if notional <= 0: raise LiveTradingConfigError("live order notional is zero") client = client or build_binance_client(account, require_testnet=True) client.load_markets() min_notional = client.min_notional(symbol) if hasattr(client, "min_notional") else 0.0 if min_notional > 0 and notional < min_notional: raise LiveTradingConfigError(f"{symbol} minimum notional is {min_notional:g} USDT; live sync notional is {notional:g} USDT") ok, leverage_detail = _check_live_cumulative_leverage(client, account, notional) if not ok: update_live_order_intent(intent_id, status="blocked", reason="live_cumulative_leverage_exceeded", updated_at=_now()) record_live_order_event(intent_id, "live_sync_blocked", "blocked", "live_cumulative_leverage_exceeded", leverage_detail) return {"ok": False, "reason": "live_cumulative_leverage_exceeded", "risk": leverage_detail} open_side, close_side = _side_to_exchange(intent.get("side")) ticker = client.fetch_ticker(symbol) last = _safe_float(ticker.get("last") or ticker.get("close")) amount = client.amount_to_precision(symbol, notional / last) if last > 0 else 0 if amount <= 0: raise LiveTradingConfigError("calculated live order amount is zero") submitted_at = _now() update_live_order_intent(intent_id, status="submitting", quantity=amount, submitted_at=submitted_at, updated_at=submitted_at) record_live_order_event(intent_id, "live_sync_submit", "submitting", "submitting_live_market_order", {"amount": amount, "notional_usdt": notional}) market_order = None stop_order = None take_profit_order = None try: client.set_leverage(symbol, leverage) market_order = client.create_market_order(symbol, open_side, amount, {"newClientOrderId": f"alphax_live_{intent_id}_{int(datetime.now().timestamp())}"}) stop_loss = _safe_float(intent.get("stop_loss")) take_profit = _safe_float(intent.get("take_profit")) if stop_loss > 0: stop_order = client.create_stop_loss_order(symbol, close_side, amount, stop_loss) if take_profit > 0: take_profit_order = client.create_take_profit_order(symbol, close_side, amount, take_profit) finished_at = _now() response = {"market_order": market_order, "stop_loss_order": stop_order, "take_profit_order": take_profit_order, "risk": leverage_detail} updated = update_live_order_intent( intent_id, status="submitted", reason="live_order_submitted", exchange_order_id=str((market_order or {}).get("id") or (market_order or {}).get("orderId") or ""), response_json=response, finished_at=finished_at, updated_at=finished_at, ) record_live_order_event(intent_id, "live_sync_submitted", "submitted", "live_order_submitted", response) return {"ok": True, "intent": updated, "market_order": market_order, "stop_loss_order": stop_order, "take_profit_order": take_profit_order} except Exception as exc: failed_at = _now() update_live_order_intent(intent_id, status="error", reason=str(exc), response_json={"market_order": market_order}, finished_at=failed_at, updated_at=failed_at) record_live_order_event(intent_id, "live_sync_error", "error", str(exc), {"market_order": market_order}) raise def sync_paper_trade_to_live( paper_trade_id: int, *, account_ids: list[int] | None = None, execute: bool = True, client_factory=None, ) -> dict: init_db() cfg = live_trading_config() if not bool(cfg.get("enabled")): return {"ok": False, "reason": "live_trading_disabled", "items": []} if str(cfg.get("execution_mode") or "exchange_api").strip().lower() != "exchange_api": return {"ok": False, "reason": "live_trading_not_exchange_api", "items": []} trade = _paper_trade(paper_trade_id) if not trade: return {"ok": False, "reason": "paper_trade_not_found", "items": []} if trade.get("status") != "open": return {"ok": False, "reason": f"paper_trade_{trade.get('status')}", "items": []} accounts = list_enabled_live_accounts() selected = {_safe_int(x) for x in (account_ids or []) if _safe_int(x) > 0} if selected: accounts = [a for a in accounts if _safe_int(a.get("id")) in selected] if not accounts: return {"ok": False, "reason": "no_enabled_accounts", "items": []} items = [] for account in accounts: existing = _existing_intent_for_paper_trade(trade.get("id"), account.get("id")) if existing: items.append({ "account_id": account["id"], "intent": get_live_order_intent(existing["id"]), "sizing": {}, "executed": existing.get("status") == "submitted", "skipped": True, "reason": "already_synced", }) continue sizing = _live_sizing(trade, account) payload = { "account_id": account["id"], "symbol": trade.get("symbol"), "side": trade.get("side") or "long", "order_type": "market", "price": _safe_float(trade.get("entry_price")), "stop_loss": _safe_float(trade.get("stop_loss")), "take_profit": _safe_float(trade.get("tp1")), "notional_usdt": sizing["notional_usdt"], "leverage": sizing["leverage"], "recommendation_id": _safe_int(trade.get("recommendation_id")), "paper_trade_id": _safe_int(trade.get("id")), } intent = create_live_order_intent(payload, source_type="paper_trade_sync", source_id=_safe_int(trade.get("id"))) item = {"account_id": account["id"], "intent": intent, "sizing": sizing, "executed": False} if execute and intent.get("status") == "prepared": factory_client = client_factory(account) if client_factory else None try: item["execution"] = execute_live_order_intent(intent["id"], client=factory_client) item["executed"] = bool(item["execution"].get("ok")) except Exception as exc: item["execution"] = {"ok": False, "reason": str(exc)} items.append(item) return {"ok": True, "paper_trade_id": _safe_int(paper_trade_id), "execute": execute, "items": items, "total": len(items)} def sync_open_paper_trades_to_live(*, limit: int = 20, execute: bool = True, client_factory=None) -> dict: trades = _open_unsynced_paper_trades(limit=limit) results = [] for trade in trades: results.append(sync_paper_trade_to_live( trade["id"], execute=execute, client_factory=client_factory, )) return { "ok": True, "processed_count": len(results), "execute": execute, "results": results, } def _submitted_live_intents(limit: int = 100) -> list[dict]: conn = get_conn() try: rows = conn.execute( """ SELECT loi.*, pt.status AS paper_status, pt.trailing_stop AS paper_trailing_stop, pt.stop_loss AS paper_stop_loss, pt.tp1 AS paper_tp1, pt.current_price AS paper_current_price, pt.closed_at AS paper_closed_at, pt.exit_reason AS paper_exit_reason FROM live_order_intents loi LEFT JOIN paper_trades pt ON pt.id=loi.paper_trade_id WHERE loi.source_type='paper_trade_sync' AND loi.status='submitted' ORDER BY loi.updated_at ASC, loi.id ASC LIMIT %s """, (max(1, min(_safe_int(limit, 100), 300)),), ).fetchall() finally: conn.close() return [_row(r) for r in rows] def _paper_canceled_live_order_intents(limit: int = 100) -> list[dict]: conn = get_conn() try: rows = conn.execute( """ SELECT loi.*, po.status AS paper_order_status, po.cancel_reason AS paper_order_cancel_reason FROM live_order_intents loi JOIN paper_orders po ON po.id=loi.paper_order_id WHERE loi.paper_order_id > 0 AND loi.status IN ('prepared','submitted','submitting') AND po.status IN ('canceled','expired','rejected') ORDER BY loi.updated_at ASC, loi.id ASC LIMIT %s """, (max(1, min(_safe_int(limit, 100), 300)),), ).fetchall() finally: conn.close() return [_row(r) for r in rows] def _sync_live_stop_to_paper(intent: dict, account: dict, *, client=None) -> dict: paper_stop = _safe_float(intent.get("paper_trailing_stop") or intent.get("paper_stop_loss") or intent.get("stop_loss")) current_stop = _safe_float(intent.get("stop_loss")) if paper_stop <= 0: return {"ok": True, "action": "hold", "reason": "missing_paper_stop"} side = str(intent.get("side") or "long").lower() tighter = paper_stop < current_stop - 1e-12 if side == "short" else paper_stop > current_stop + 1e-12 if current_stop > 0 and not tighter: return {"ok": True, "action": "hold", "reason": "stop_not_tighter", "paper_stop": paper_stop, "current_stop": current_stop} client = client or build_binance_client(account, require_testnet=True) client.load_markets() _, close_side = _side_to_exchange(side) amount = _safe_float(intent.get("quantity")) if amount <= 0: return {"ok": False, "action": "error", "reason": "missing_live_quantity"} canceled = _cancel_protection_orders(client, intent, keys=("stop_loss_order",)) new_stop = client.create_stop_loss_order(intent["symbol"], close_side, amount, paper_stop) updated = _merge_intent_response(intent, { "stop_loss_order": new_stop, "live_protection": { **(_protection_orders(intent).get("live_protection") or {}), "stop_loss": paper_stop, "synced_at": _now(), "source": "paper_trailing_stop" if _safe_float(intent.get("paper_trailing_stop")) > 0 else "paper_stop_loss", }, }) update_live_order_intent(intent["id"], stop_loss=paper_stop, updated_at=_now()) record_live_order_event(intent["id"], "live_protection_stop_replace", "submitted", "replace_live_stop_with_paper_protection", { "previous_stop": current_stop, "new_stop": paper_stop, "canceled": canceled, "new_stop_order": new_stop, "updated_intent": updated.get("id"), }) return {"ok": True, "action": "replace_stop", "previous_stop": current_stop, "new_stop": paper_stop} def _sync_live_close_from_paper(intent: dict, account: dict, *, client=None) -> dict: client = client or build_binance_client(account, require_testnet=True) client.load_markets() _, close_side = _side_to_exchange(intent.get("side")) amount = _safe_float(intent.get("quantity")) if amount <= 0: return {"ok": False, "action": "error", "reason": "missing_live_quantity"} canceled = _cancel_protection_orders(client, intent) close_order = client.create_market_order(intent["symbol"], close_side, amount, { "reduceOnly": True, "newClientOrderId": f"alphax_close_{intent['id']}_{int(datetime.now().timestamp())}", }) finished_at = _now() _merge_intent_response(intent, { "live_close_order": close_order, "live_close_reason": intent.get("paper_exit_reason") or "paper_trade_closed", "protection_canceled_on_close": canceled, }) update_live_order_intent( intent["id"], status="closed", reason=f"paper_trade_closed:{intent.get('paper_exit_reason') or ''}", finished_at=finished_at, updated_at=finished_at, ) record_live_order_event(intent["id"], "live_sync_close", "closed", "paper_trade_closed_reduce_only_live_close", { "paper_trade_id": intent.get("paper_trade_id"), "paper_exit_reason": intent.get("paper_exit_reason"), "close_order": close_order, "canceled": canceled, }) return {"ok": True, "action": "close", "close_order": close_order} def sync_live_protection_from_paper(*, limit: int = 100, client_factory=None) -> dict: intents = _submitted_live_intents(limit=limit) results = [] for intent in intents: account = get_live_account(intent.get("account_id")) if not account or account.get("status") != "enabled": continue client = client_factory(account) if client_factory else None try: if intent.get("paper_status") == "closed": result = _sync_live_close_from_paper(intent, account, client=client) elif intent.get("paper_status") == "open": result = _sync_live_stop_to_paper(intent, account, client=client) else: result = {"ok": True, "action": "hold", "reason": f"paper_status_{intent.get('paper_status')}"} except Exception as exc: record_live_order_event(intent["id"], "live_protection_error", "error", str(exc), { "paper_trade_id": intent.get("paper_trade_id"), "paper_status": intent.get("paper_status"), }) result = {"ok": False, "action": "error", "reason": str(exc)} results.append({"intent_id": intent.get("id"), "paper_trade_id": intent.get("paper_trade_id"), **result}) canceled_results = [] for intent in _paper_canceled_live_order_intents(limit=limit): account = get_live_account(intent.get("account_id")) client = client_factory(account) if client_factory and account else None try: if client is None and account: client = build_binance_client(account, require_testnet=True) if client: client.load_markets() exchange_order_id = str(intent.get("exchange_order_id") or "") cancel_result = client.cancel_order(exchange_order_id, intent.get("symbol")) if exchange_order_id else {"skipped": True, "reason": "missing_exchange_order_id"} else: cancel_result = {"skipped": True, "reason": "missing_account"} update_live_order_intent(intent["id"], status="canceled", reason=f"paper_order_{intent.get('paper_order_status')}", updated_at=_now(), finished_at=_now()) record_live_order_event(intent["id"], "live_order_cancel", "canceled", "paper_order_canceled_or_expired", { "paper_order_id": intent.get("paper_order_id"), "paper_order_status": intent.get("paper_order_status"), "paper_order_cancel_reason": intent.get("paper_order_cancel_reason"), "cancel_result": cancel_result, }) canceled_results.append({"intent_id": intent.get("id"), "ok": True, "action": "cancel_entry"}) except Exception as exc: record_live_order_event(intent["id"], "live_order_cancel_error", "error", str(exc), {"paper_order_id": intent.get("paper_order_id")}) canceled_results.append({"intent_id": intent.get("id"), "ok": False, "action": "error", "reason": str(exc)}) ok = all(x.get("ok", True) for x in [*results, *canceled_results]) return {"ok": ok, "processed_count": len(results), "canceled_entry_count": len(canceled_results), "results": results, "canceled_entries": canceled_results} def run_live_trading_sync( *, limit: int = 20, execute: bool = True, sync_snapshots: bool = True, sync_paper: bool = True, sync_protection: bool = True, client_factory=None, ) -> dict: """Single scheduler entrypoint for live account snapshots and paper sync.""" init_db() cfg = live_trading_config() snapshot_result = {"skipped": True, "reason": "snapshot_sync_disabled"} paper_result = {"skipped": True, "reason": "paper_sync_disabled"} protection_result = {"skipped": True, "reason": "live_protection_sync_disabled"} if sync_snapshots: snapshot_result = sync_live_account_snapshots(client_factory=client_factory) if sync_paper: if not bool(cfg.get("enabled")): paper_result = {"ok": False, "skipped": True, "reason": "live_trading_disabled", "results": []} else: paper_result = sync_open_paper_trades_to_live( limit=limit, execute=execute, client_factory=client_factory, ) if sync_protection: protection_result = sync_live_protection_from_paper(limit=limit, client_factory=client_factory) return { "ok": bool(snapshot_result.get("ok", True)) and bool(paper_result.get("ok", True)) and bool(protection_result.get("ok", True)), "enabled": bool(cfg.get("enabled")), "execute": execute, "snapshots": snapshot_result, "paper_sync": paper_result, "protection_sync": protection_result, }