529 lines
24 KiB
Python
529 lines
24 KiB
Python
"""Synchronize strategy-trading ledger entries to configured live accounts."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
|
|
from app.db.live_trading import (
|
|
_safe_float,
|
|
_safe_int,
|
|
create_live_order_intent,
|
|
get_live_account,
|
|
get_live_order_intent,
|
|
list_enabled_live_accounts,
|
|
record_live_order_event,
|
|
update_live_order_intent,
|
|
_row,
|
|
)
|
|
from app.config.system_config import live_trading_config
|
|
from app.db.schema import get_conn, init_db
|
|
from app.integrations.binance_live import LiveTradingConfigError, build_binance_client
|
|
from app.services.live_trading_account import sync_live_account_snapshots
|
|
|
|
|
|
def _now() -> str:
    """Return the current local wall-clock time as an ISO-8601 string.

    The value comes from ``datetime.now()`` without a timezone argument, so
    the string carries no UTC offset.
    """
    current = datetime.now()
    return current.isoformat()
|
|
|
|
|
|
def _side_to_exchange(side: str) -> tuple[str, str]:
    """Translate a position side into ``(open_side, close_side)`` order sides.

    Only ``"short"`` (case-insensitive) maps to ``("sell", "buy")``; every
    other value, including empty strings and ``None``, is treated as long and
    maps to ``("buy", "sell")``.
    """
    normalized = str(side or "long").lower()
    is_short = normalized == "short"
    return ("sell", "buy") if is_short else ("buy", "sell")
|
|
|
|
|
|
def _paper_trade(paper_trade_id: int) -> dict:
    """Load a single ``paper_trades`` row by id.

    Returns the row passed through ``_row``, or an empty dict when no row
    matches. The connection is always closed before returning.
    """
    connection = get_conn()
    try:
        params = (_safe_int(paper_trade_id),)
        cursor = connection.execute("SELECT * FROM paper_trades WHERE id=%s", params)
        record = cursor.fetchone()
    finally:
        connection.close()
    if not record:
        return {}
    return _row(record)
|
|
|
|
|
|
def _existing_intent_for_paper_trade(paper_trade_id: int, account_id: int) -> dict:
    """Find the newest still-relevant sync intent for a paper trade on one account.

    Only ``paper_trade_sync`` intents whose status is neither ``blocked`` nor
    ``error`` are considered. Returns the row as a plain dict, or ``{}`` when
    no such intent exists.
    """
    query = """
        SELECT *
        FROM live_order_intents
        WHERE paper_trade_id=%s
          AND account_id=%s
          AND source_type='paper_trade_sync'
          AND status NOT IN ('blocked','error')
        ORDER BY id DESC
        LIMIT 1
    """
    connection = get_conn()
    try:
        params = (_safe_int(paper_trade_id), _safe_int(account_id))
        match = connection.execute(query, params).fetchone()
    finally:
        connection.close()
    if match:
        return dict(match)
    return {}
|
|
|
|
|
|
def _open_unsynced_paper_trades(limit: int = 20) -> list[dict]:
    """Return the most recently opened paper trades whose status is ``open``.

    ``limit`` is clamped to the range 1..100. Despite the function name, the
    query does not exclude trades that already have live intents.
    """
    query = """
        SELECT pt.*
        FROM paper_trades pt
        WHERE pt.status='open'
        ORDER BY pt.opened_at DESC, pt.id DESC
        LIMIT %s
    """
    connection = get_conn()
    try:
        row_cap = max(1, min(_safe_int(limit, 20), 100))
        fetched = connection.execute(query, (row_cap,)).fetchall()
    finally:
        connection.close()
    trades = []
    for record in fetched:
        trades.append(dict(record))
    return trades
|
|
|
|
|
|
def _risk_for_account(account: dict) -> dict:
    """Return the account's ``risk_config`` mapping, or ``{}`` if it is not a dict."""
    risk_config = account.get("risk_config")
    if isinstance(risk_config, dict):
        return risk_config
    return {}
|
|
|
|
|
|
def _live_sizing(paper_trade: dict, account: dict) -> dict:
    """Derive live order notional and leverage from a paper trade and account risk.

    Leverage is the paper trade's leverage (at least 1) limited by the
    account's ``max_symbol_leverage`` (also at least 1). When the account sets
    a positive ``max_order_margin_usdt``, the notional is capped at
    ``margin * leverage`` (and defaults to that cap if the paper trade has no
    positive notional); otherwise the paper notional is used unchanged.

    Returns a dict with ``notional_usdt``, ``leverage``,
    ``paper_notional_usdt`` and ``sizing_mode``.
    """
    risk = _risk_for_account(account)

    paper_leverage = max(1.0, _safe_float(paper_trade.get("leverage"), 1))
    leverage_cap = max(1.0, _safe_float(risk.get("max_symbol_leverage"), paper_leverage))
    leverage = min(paper_leverage, leverage_cap)

    margin_cap = _safe_float(risk.get("max_order_margin_usdt"), 0)
    paper_notional = _safe_float(paper_trade.get("notional_usdt"))
    margin_capped = margin_cap > 0

    if not margin_capped:
        notional = paper_notional
    elif paper_notional > 0:
        notional = min(paper_notional, margin_cap * leverage)
    else:
        notional = margin_cap * leverage

    sizing_mode = "account_risk_cap" if margin_capped else "paper_notional"
    return {
        "notional_usdt": round(max(0.0, notional), 8),
        "leverage": leverage,
        "paper_notional_usdt": paper_notional,
        "sizing_mode": sizing_mode,
    }
|
|
|
|
|
|
def _position_notional(position: dict) -> float:
    """Return the absolute notional of an exchange position.

    Uses the top-level ``notional`` field when truthy, otherwise the one
    nested under ``info`` (if ``info`` is a dict).
    """
    raw_info = position.get("info")
    info = raw_info if isinstance(raw_info, dict) else {}
    notional = position.get("notional") or info.get("notional")
    return abs(_safe_float(notional))
|
|
|
|
|
|
def _order_identifier(order: dict | None) -> dict:
    """Extract the identifiers of an exchange order as strings.

    Returns ``id``, ``algo_id`` and ``client_algo_id``; each is the first
    truthy candidate field of ``order`` converted to ``str``, or ``""`` when
    none is present (or ``order`` is not a dict).
    """
    fields = order if isinstance(order, dict) else {}

    def first_present(*names: str) -> str:
        # Mirrors an ``a or b or c or ""`` chain: the first truthy value wins.
        for name in names:
            value = fields.get(name)
            if value:
                return str(value)
        return ""

    return {
        "id": first_present("id", "orderId", "algoId"),
        "algo_id": first_present("algoId", "id"),
        "client_algo_id": first_present("clientAlgoId", "clientOrderId"),
    }
|
|
|
|
|
|
def _protection_orders(intent: dict) -> dict:
    """Pull the known order sections out of an intent's stored ``response``.

    Each of ``stop_loss_order``, ``take_profit_order``, ``market_order`` and
    ``live_protection`` is returned as the stored dict, or ``{}`` when absent
    or not a dict. ``raw`` holds the whole response (``{}`` if it is not a
    dict).
    """
    stored = intent.get("response")
    response = stored if isinstance(stored, dict) else {}

    sections = ("stop_loss_order", "take_profit_order", "market_order", "live_protection")
    orders = {}
    for name in sections:
        section = response.get(name)
        orders[name] = section if isinstance(section, dict) else {}
    orders["raw"] = response
    return orders
|
|
|
|
|
|
def _cancel_protection_orders(client, intent: dict, keys: tuple[str, ...] = ("stop_loss_order", "take_profit_order")) -> list[dict]:
    """Best-effort cancel of the protection orders recorded on an intent.

    For each key the stored order's identifiers are resolved and the first
    applicable route is used: algo id via ``client.cancel_algo_order``, client
    algo id via ``client.cancel_algo_order``, then plain ``client.cancel_order``.
    Orders with no usable identifier are skipped silently.

    Returns one entry per attempted cancel: ``{"key", "result"}`` on success
    or ``{"key", "error"}`` when the client raised. Failures never propagate.
    """
    outcomes = []
    known_orders = _protection_orders(intent)
    for key in keys:
        ident = _order_identifier(known_orders.get(key))
        try:
            if ident["algo_id"] and hasattr(client, "cancel_algo_order"):
                result = client.cancel_algo_order(algo_id=ident["algo_id"])
            elif ident["client_algo_id"] and hasattr(client, "cancel_algo_order"):
                result = client.cancel_algo_order(client_algo_id=ident["client_algo_id"])
            elif ident["id"]:
                result = client.cancel_order(ident["id"], intent.get("symbol"))
            else:
                # Nothing identifiable to cancel for this key.
                continue
        except Exception as exc:
            outcomes.append({"key": key, "error": str(exc)})
        else:
            outcomes.append({"key": key, "result": result})
    return outcomes
|
|
|
|
|
|
def _merge_intent_response(intent: dict, patch: dict) -> dict:
    """Shallow-merge ``patch`` into the intent's stored response and persist it.

    Keys in ``patch`` override existing response keys. Returns whatever
    ``update_live_order_intent`` returns.
    """
    stored = intent.get("response")
    merged = dict(stored) if isinstance(stored, dict) else {}
    merged.update(patch or {})
    return update_live_order_intent(intent["id"], response_json=merged, updated_at=_now())
|
|
|
|
|
|
def _check_live_cumulative_leverage(client, account: dict, additional_notional: float) -> tuple[bool, dict]:
    """Check whether adding ``additional_notional`` keeps the account under its leverage cap.

    Projected leverage is ``(open position notional + additional notional) /
    USDT equity``, compared against the account's ``max_cumulative_leverage``.

    Args:
        client: Exchange client; must provide ``fetch_balance`` and may
            provide ``fetch_positions``.
        account: Live account record; its ``risk_config`` supplies the cap.
        additional_notional: Notional (USDT) of the order about to be placed.
            Negative values are treated as zero exposure.

    Returns:
        ``(allowed, detail)``. When the cap is unset or non-positive the check
        is disabled and always allowed. When equity is not positive, leverage
        is undefined: ``projected_cumulative_leverage`` is reported as ``0.0``
        together with ``equity_non_positive: True`` and the check is allowed
        only if there is no projected exposure at all.
    """
    risk = _risk_for_account(account)
    cap = _safe_float(risk.get("max_cumulative_leverage"), 0)
    if cap <= 0:
        return True, {"disabled": True, "max_cumulative_leverage": cap}

    balance = client.fetch_balance()
    balance = balance if isinstance(balance, dict) else {}
    total = balance.get("total") if isinstance(balance.get("total"), dict) else {}
    equity = _safe_float(total.get("USDT"))

    positions = client.fetch_positions(None) if hasattr(client, "fetch_positions") else []
    open_notional = sum(_position_notional(p) for p in positions or [] if isinstance(p, dict))

    # Normalize once so rounding below cannot fail on None / string input.
    requested = _safe_float(additional_notional)
    projected = open_notional + max(0.0, requested)

    detail = {
        "account_equity_usdt": round(equity, 8),
        "open_notional_usdt": round(open_notional, 8),
        "additional_notional_usdt": round(requested, 8),
        "projected_notional_usdt": round(projected, 8),
        "projected_cumulative_leverage": 0.0,
        "max_cumulative_leverage": cap,
    }

    if equity <= 0:
        # Fail closed: with no equity any exposure is effectively unbounded
        # leverage. Previously this case reported leverage 0 and passed.
        detail["equity_non_positive"] = True
        return projected <= 0, detail

    projected_leverage = projected / equity
    detail["projected_cumulative_leverage"] = round(projected_leverage, 6)
    return projected_leverage <= cap + 1e-12, detail
|
|
|
|
|
|
def execute_live_order_intent(intent_id: int, *, client=None) -> dict:
    """Submit a prepared live order intent to the exchange as a market order.

    Flow: validate the intent and its account, enforce the exchange minimum
    notional and the account's cumulative-leverage cap, place the market
    entry, then place the optional stop-loss / take-profit orders.

    Once the market entry has been accepted the intent is always persisted as
    ``submitted``, even when a protection order fails. Marking it ``error`` at
    that point would hide an open position from the protection/close sync and
    let the paper-trade sync (which ignores ``error`` intents when
    de-duplicating) open the same position again on its next run.

    Args:
        intent_id: Id of the live order intent to execute.
        client: Optional pre-built exchange client. When omitted one is built
            with ``build_binance_client(account, require_testnet=True)``.

    Returns:
        ``{"ok": True, ...}`` with the updated intent and the exchange orders
        (plus ``protection_errors`` when a stop-loss / take-profit order could
        not be placed), or ``{"ok": False, "reason": ...}`` when the intent is
        not in an executable status or the leverage cap blocks it.

    Raises:
        LiveTradingConfigError: Missing intent or account, disabled account,
            zero notional or amount, or notional below the exchange minimum.
        Exception: Whatever the client raises while setting leverage or
            placing the market entry; the intent is marked ``error`` first.
    """
    intent = get_live_order_intent(intent_id)
    if not intent:
        raise LiveTradingConfigError("live order intent not found")
    if intent.get("status") not in ("prepared", "pending_approval"):
        return {"ok": False, "reason": f"intent_status_{intent.get('status')}", "intent": intent}

    account = get_live_account(intent.get("account_id"))
    if not account:
        raise LiveTradingConfigError("live account not found")
    if account.get("status") != "enabled":
        raise LiveTradingConfigError("live account disabled")

    symbol = str(intent.get("symbol") or "").upper()
    notional = _safe_float(intent.get("notional_usdt"))
    leverage = max(1.0, _safe_float(intent.get("leverage"), 1))
    if notional <= 0:
        raise LiveTradingConfigError("live order notional is zero")

    client = client or build_binance_client(account, require_testnet=True)
    client.load_markets()

    # Coerce so a None / string minimum from the client cannot break the comparison.
    min_notional = _safe_float(client.min_notional(symbol)) if hasattr(client, "min_notional") else 0.0
    if min_notional > 0 and notional < min_notional:
        raise LiveTradingConfigError(f"{symbol} minimum notional is {min_notional:g} USDT; live sync notional is {notional:g} USDT")

    ok, leverage_detail = _check_live_cumulative_leverage(client, account, notional)
    if not ok:
        update_live_order_intent(intent_id, status="blocked", reason="live_cumulative_leverage_exceeded", updated_at=_now())
        record_live_order_event(intent_id, "live_sync_blocked", "blocked", "live_cumulative_leverage_exceeded", leverage_detail)
        return {"ok": False, "reason": "live_cumulative_leverage_exceeded", "risk": leverage_detail}

    open_side, close_side = _side_to_exchange(intent.get("side"))
    ticker = client.fetch_ticker(symbol)
    last = _safe_float(ticker.get("last") or ticker.get("close"))
    # Precision helpers may hand back a formatted string; normalize to a float
    # so the zero check and the stored quantity are numeric.
    amount = _safe_float(client.amount_to_precision(symbol, notional / last)) if last > 0 else 0.0
    if amount <= 0:
        raise LiveTradingConfigError("calculated live order amount is zero")

    submitted_at = _now()
    update_live_order_intent(intent_id, status="submitting", quantity=amount, submitted_at=submitted_at, updated_at=submitted_at)
    record_live_order_event(intent_id, "live_sync_submit", "submitting", "submitting_live_market_order", {"amount": amount, "notional_usdt": notional})

    # --- Entry: a failure here means no order was accepted, so "error" is accurate.
    market_order = None
    try:
        client.set_leverage(symbol, leverage)
        market_order = client.create_market_order(symbol, open_side, amount, {"newClientOrderId": f"alphax_live_{intent_id}_{int(datetime.now().timestamp())}"})
    except Exception as exc:
        failed_at = _now()
        update_live_order_intent(intent_id, status="error", reason=str(exc), response_json={"market_order": market_order}, finished_at=failed_at, updated_at=failed_at)
        record_live_order_event(intent_id, "live_sync_error", "error", str(exc), {"market_order": market_order})
        raise

    # --- Protection: the position is live from here on. Each order is tried
    # independently so one failure neither skips the other nor loses the ids
    # of orders that were placed.
    stop_order = None
    take_profit_order = None
    protection_errors: dict[str, str] = {}
    stop_loss = _safe_float(intent.get("stop_loss"))
    take_profit = _safe_float(intent.get("take_profit"))
    if stop_loss > 0:
        try:
            stop_order = client.create_stop_loss_order(symbol, close_side, amount, stop_loss)
        except Exception as exc:
            protection_errors["stop_loss_order"] = str(exc)
    if take_profit > 0:
        try:
            take_profit_order = client.create_take_profit_order(symbol, close_side, amount, take_profit)
        except Exception as exc:
            protection_errors["take_profit_order"] = str(exc)

    finished_at = _now()
    response = {"market_order": market_order, "stop_loss_order": stop_order, "take_profit_order": take_profit_order, "risk": leverage_detail}
    reason = "live_order_submitted"
    extra_fields = {}
    if protection_errors:
        response["protection_errors"] = protection_errors
        reason = "live_order_submitted_protection_failed"
        if "stop_loss_order" in protection_errors:
            # No live stop exists. Clearing the stored stop lets the
            # protection sync see "no current stop" and place one from the
            # paper trade instead of holding on an equal value.
            extra_fields["stop_loss"] = 0

    updated = update_live_order_intent(
        intent_id,
        status="submitted",
        reason=reason,
        exchange_order_id=str((market_order or {}).get("id") or (market_order or {}).get("orderId") or ""),
        response_json=response,
        finished_at=finished_at,
        updated_at=finished_at,
        **extra_fields,
    )
    record_live_order_event(intent_id, "live_sync_submitted", "submitted", reason, response)
    if protection_errors:
        record_live_order_event(
            intent_id,
            "live_protection_error",
            "error",
            "; ".join(f"{key}: {message}" for key, message in protection_errors.items()),
            {"protection_errors": protection_errors, "market_order": market_order},
        )

    result = {"ok": True, "intent": updated, "market_order": market_order, "stop_loss_order": stop_order, "take_profit_order": take_profit_order}
    if protection_errors:
        result["protection_errors"] = protection_errors
    return result
|
|
|
|
|
|
def sync_paper_trade_to_live(
    paper_trade_id: int,
    *,
    account_ids: list[int] | None = None,
    execute: bool = True,
    client_factory=None,
) -> dict:
    """Mirror one open paper trade onto every enabled live account.

    For each target account an order intent is created (unless a non-blocked,
    non-errored sync intent already exists for that trade and account) and,
    when ``execute`` is true and the intent is ``prepared``, submitted via
    ``execute_live_order_intent``.

    Args:
        paper_trade_id: Id of the paper trade to mirror.
        account_ids: Optional subset of account ids; only positive ids that
            are also enabled accounts are used. ``None`` / empty means all
            enabled accounts.
        execute: When false, intents are created but not submitted.
        client_factory: Optional callable ``account -> client`` used instead
            of building a client inside ``execute_live_order_intent``.

    Returns:
        ``{"ok": False, "reason": ..., "items": []}`` when live trading is
        disabled / not in ``exchange_api`` mode, the trade is missing or not
        open, or no account qualifies. Otherwise ``{"ok": True, ...}`` with
        one item per account. Per-account failures (including a failing
        ``client_factory``) are reported in that item's ``execution`` entry
        and do not stop the remaining accounts.
    """
    init_db()
    cfg = live_trading_config()
    if not bool(cfg.get("enabled")):
        return {"ok": False, "reason": "live_trading_disabled", "items": []}
    if str(cfg.get("execution_mode") or "exchange_api").strip().lower() != "exchange_api":
        return {"ok": False, "reason": "live_trading_not_exchange_api", "items": []}

    trade = _paper_trade(paper_trade_id)
    if not trade:
        return {"ok": False, "reason": "paper_trade_not_found", "items": []}
    if trade.get("status") != "open":
        return {"ok": False, "reason": f"paper_trade_{trade.get('status')}", "items": []}

    accounts = list_enabled_live_accounts()
    selected = {_safe_int(x) for x in (account_ids or []) if _safe_int(x) > 0}
    if selected:
        accounts = [a for a in accounts if _safe_int(a.get("id")) in selected]
    if not accounts:
        return {"ok": False, "reason": "no_enabled_accounts", "items": []}

    items = []
    for account in accounts:
        existing = _existing_intent_for_paper_trade(trade.get("id"), account.get("id"))
        if existing:
            items.append({
                "account_id": account["id"],
                "intent": get_live_order_intent(existing["id"]),
                "sizing": {},
                "executed": existing.get("status") == "submitted",
                "skipped": True,
                "reason": "already_synced",
            })
            continue

        sizing = _live_sizing(trade, account)
        payload = {
            "account_id": account["id"],
            "symbol": trade.get("symbol"),
            "side": trade.get("side") or "long",
            "order_type": "market",
            "price": _safe_float(trade.get("entry_price")),
            "stop_loss": _safe_float(trade.get("stop_loss")),
            "take_profit": _safe_float(trade.get("tp1")),
            "notional_usdt": sizing["notional_usdt"],
            "leverage": sizing["leverage"],
            "recommendation_id": _safe_int(trade.get("recommendation_id")),
            "paper_trade_id": _safe_int(trade.get("id")),
        }
        intent = create_live_order_intent(payload, source_type="paper_trade_sync", source_id=_safe_int(trade.get("id")))
        item = {"account_id": account["id"], "intent": intent, "sizing": sizing, "executed": False}
        if execute and intent.get("status") == "prepared":
            try:
                # Built inside the try so a failing factory is reported for
                # this account only instead of aborting the whole sync.
                factory_client = client_factory(account) if client_factory else None
                item["execution"] = execute_live_order_intent(intent["id"], client=factory_client)
                item["executed"] = bool(item["execution"].get("ok"))
            except Exception as exc:
                item["execution"] = {"ok": False, "reason": str(exc)}
        items.append(item)

    return {"ok": True, "paper_trade_id": _safe_int(paper_trade_id), "execute": execute, "items": items, "total": len(items)}
|
|
|
|
|
|
def sync_open_paper_trades_to_live(*, limit: int = 20, execute: bool = True, client_factory=None) -> dict:
    """Run ``sync_paper_trade_to_live`` for each recently opened paper trade.

    Trades come from ``_open_unsynced_paper_trades(limit=limit)`` and are
    processed in the order returned. The summary always reports ``ok: True``;
    per-trade outcomes are in ``results``.
    """
    open_trades = _open_unsynced_paper_trades(limit=limit)
    outcomes = [
        sync_paper_trade_to_live(trade["id"], execute=execute, client_factory=client_factory)
        for trade in open_trades
    ]
    summary = {"ok": True, "processed_count": len(outcomes)}
    summary["execute"] = execute
    summary["results"] = outcomes
    return summary
|
|
|
|
|
|
def _submitted_live_intents(limit: int = 100) -> list[dict]:
    """List ``submitted`` paper-sync intents joined with their paper trade state.

    Each row carries the intent columns plus ``paper_*`` columns from the
    linked paper trade (via LEFT JOIN, so they may be null). Rows are ordered
    least recently updated first and ``limit`` is clamped to 1..300.
    """
    query = """
        SELECT loi.*, pt.status AS paper_status, pt.trailing_stop AS paper_trailing_stop,
               pt.stop_loss AS paper_stop_loss, pt.tp1 AS paper_tp1, pt.current_price AS paper_current_price,
               pt.closed_at AS paper_closed_at, pt.exit_reason AS paper_exit_reason
        FROM live_order_intents loi
        LEFT JOIN paper_trades pt ON pt.id=loi.paper_trade_id
        WHERE loi.source_type='paper_trade_sync'
          AND loi.status='submitted'
        ORDER BY loi.updated_at ASC, loi.id ASC
        LIMIT %s
    """
    connection = get_conn()
    try:
        row_cap = max(1, min(_safe_int(limit, 100), 300))
        fetched = connection.execute(query, (row_cap,)).fetchall()
    finally:
        connection.close()
    intents = []
    for record in fetched:
        intents.append(_row(record))
    return intents
|
|
|
|
|
|
def _paper_canceled_live_order_intents(limit: int = 100) -> list[dict]:
    """List active live intents whose originating paper order is no longer live.

    Selects intents in status ``prepared``, ``submitted`` or ``submitting``
    that reference a paper order now ``canceled``, ``expired`` or
    ``rejected``, and adds that order's status and cancel reason. Rows are
    ordered least recently updated first and ``limit`` is clamped to 1..300.
    """
    query = """
        SELECT loi.*, po.status AS paper_order_status, po.cancel_reason AS paper_order_cancel_reason
        FROM live_order_intents loi
        JOIN paper_orders po ON po.id=loi.paper_order_id
        WHERE loi.paper_order_id > 0
          AND loi.status IN ('prepared','submitted','submitting')
          AND po.status IN ('canceled','expired','rejected')
        ORDER BY loi.updated_at ASC, loi.id ASC
        LIMIT %s
    """
    connection = get_conn()
    try:
        row_cap = max(1, min(_safe_int(limit, 100), 300))
        fetched = connection.execute(query, (row_cap,)).fetchall()
    finally:
        connection.close()
    intents = []
    for record in fetched:
        intents.append(_row(record))
    return intents
|
|
|
|
|
|
def _sync_live_stop_to_paper(intent: dict, account: dict, *, client=None) -> dict:
    """Move the live stop-loss to the paper trade's stop when that is tighter.

    The target stop is the paper trailing stop, else the paper stop-loss, else
    the intent's own stop. "Tighter" means lower for shorts and higher for
    longs. If the intent already has a stop and the target is not tighter,
    nothing is done. Otherwise the recorded stop order is cancelled, a new one
    is created, and the intent's response and ``stop_loss`` are updated.

    Returns a dict with ``ok`` and ``action`` (``hold``, ``error`` or
    ``replace_stop``) plus context fields.
    """
    trailing = intent.get("paper_trailing_stop")
    paper_stop = _safe_float(trailing or intent.get("paper_stop_loss") or intent.get("stop_loss"))
    current_stop = _safe_float(intent.get("stop_loss"))
    if paper_stop <= 0:
        return {"ok": True, "action": "hold", "reason": "missing_paper_stop"}

    side = str(intent.get("side") or "long").lower()
    if side == "short":
        tighter = paper_stop < current_stop - 1e-12
    else:
        tighter = paper_stop > current_stop + 1e-12
    if current_stop > 0 and not tighter:
        return {"ok": True, "action": "hold", "reason": "stop_not_tighter", "paper_stop": paper_stop, "current_stop": current_stop}

    client = client or build_binance_client(account, require_testnet=True)
    client.load_markets()
    close_side = _side_to_exchange(side)[1]
    amount = _safe_float(intent.get("quantity"))
    if amount <= 0:
        return {"ok": False, "action": "error", "reason": "missing_live_quantity"}

    # The old stop is cancelled first (best effort), then the replacement is placed.
    canceled = _cancel_protection_orders(client, intent, keys=("stop_loss_order",))
    new_stop = client.create_stop_loss_order(intent["symbol"], close_side, amount, paper_stop)

    live_protection = dict(_protection_orders(intent).get("live_protection") or {})
    live_protection["stop_loss"] = paper_stop
    live_protection["synced_at"] = _now()
    if _safe_float(intent.get("paper_trailing_stop")) > 0:
        live_protection["source"] = "paper_trailing_stop"
    else:
        live_protection["source"] = "paper_stop_loss"

    updated = _merge_intent_response(intent, {"stop_loss_order": new_stop, "live_protection": live_protection})
    update_live_order_intent(intent["id"], stop_loss=paper_stop, updated_at=_now())

    event_detail = {
        "previous_stop": current_stop,
        "new_stop": paper_stop,
        "canceled": canceled,
        "new_stop_order": new_stop,
        "updated_intent": updated.get("id"),
    }
    record_live_order_event(intent["id"], "live_protection_stop_replace", "submitted", "replace_live_stop_with_paper_protection", event_detail)
    return {"ok": True, "action": "replace_stop", "previous_stop": current_stop, "new_stop": paper_stop}
|
|
|
|
|
|
def _sync_live_close_from_paper(intent: dict, account: dict, *, client=None) -> dict:
    """Close the live position for an intent whose paper trade has closed.

    Cancels the recorded protection orders (best effort), sends a reduce-only
    market order on the closing side for the intent's stored quantity, then
    marks the intent ``closed`` and records the event.

    Returns ``{"ok": True, "action": "close", ...}`` on success, or an
    ``error`` result when the intent has no positive quantity.
    NOTE(review): the exchange position is not inspected first, so a position
    already closed on the exchange presumably makes the close order fail;
    confirm against the client wrapper.
    """
    client = client or build_binance_client(account, require_testnet=True)
    client.load_markets()

    close_side = _side_to_exchange(intent.get("side"))[1]
    amount = _safe_float(intent.get("quantity"))
    if amount <= 0:
        return {"ok": False, "action": "error", "reason": "missing_live_quantity"}

    canceled = _cancel_protection_orders(client, intent)
    order_params = {
        "reduceOnly": True,
        "newClientOrderId": f"alphax_close_{intent['id']}_{int(datetime.now().timestamp())}",
    }
    close_order = client.create_market_order(intent["symbol"], close_side, amount, order_params)
    finished_at = _now()

    response_patch = {
        "live_close_order": close_order,
        "live_close_reason": intent.get("paper_exit_reason") or "paper_trade_closed",
        "protection_canceled_on_close": canceled,
    }
    _merge_intent_response(intent, response_patch)

    close_reason = f"paper_trade_closed:{intent.get('paper_exit_reason') or ''}"
    update_live_order_intent(
        intent["id"],
        status="closed",
        reason=close_reason,
        finished_at=finished_at,
        updated_at=finished_at,
    )

    event_detail = {
        "paper_trade_id": intent.get("paper_trade_id"),
        "paper_exit_reason": intent.get("paper_exit_reason"),
        "close_order": close_order,
        "canceled": canceled,
    }
    record_live_order_event(intent["id"], "live_sync_close", "closed", "paper_trade_closed_reduce_only_live_close", event_detail)
    return {"ok": True, "action": "close", "close_order": close_order}
|
|
|
|
|
|
def sync_live_protection_from_paper(*, limit: int = 100, client_factory=None) -> dict:
    """Bring live positions and pending entries in line with their paper counterparts.

    Two passes are made:

    1. Submitted paper-sync intents: if the paper trade is closed the live
       position is closed; if it is open the live stop is tightened to the
       paper stop; any other paper status is held. Intents whose account is
       missing or not enabled are skipped.
    2. Active intents whose paper order was canceled / expired / rejected:
       the exchange order is cancelled when an exchange order id is known,
       and the intent is marked ``canceled``.

    Args:
        limit: Maximum rows fetched per pass (clamped by the query helpers).
        client_factory: Optional callable ``account -> client``. A failing
            factory is reported as an error for that intent only.

    Returns:
        Summary with per-intent ``results`` and ``canceled_entries``; ``ok``
        is false if any individual item failed. Exceptions from individual
        items are recorded as events and never propagate.
    """
    results = []
    for intent in _submitted_live_intents(limit=limit):
        account = get_live_account(intent.get("account_id"))
        if not account or account.get("status") != "enabled":
            continue
        paper_status = intent.get("paper_status")
        try:
            # Inside the try so one broken factory call cannot abort the pass.
            client = client_factory(account) if client_factory else None
            if paper_status == "closed":
                result = _sync_live_close_from_paper(intent, account, client=client)
            elif paper_status == "open":
                result = _sync_live_stop_to_paper(intent, account, client=client)
            else:
                result = {"ok": True, "action": "hold", "reason": f"paper_status_{paper_status}"}
        except Exception as exc:
            record_live_order_event(intent["id"], "live_protection_error", "error", str(exc), {
                "paper_trade_id": intent.get("paper_trade_id"),
                "paper_status": paper_status,
            })
            result = {"ok": False, "action": "error", "reason": str(exc)}
        results.append({"intent_id": intent.get("id"), "paper_trade_id": intent.get("paper_trade_id"), **result})

    canceled_results = []
    for intent in _paper_canceled_live_order_intents(limit=limit):
        account = get_live_account(intent.get("account_id"))
        exchange_order_id = str(intent.get("exchange_order_id") or "")
        try:
            if not account:
                cancel_result = {"skipped": True, "reason": "missing_account"}
            elif not exchange_order_id:
                # Nothing was sent to the exchange, so no client (and no
                # market load) is needed just to mark the intent canceled.
                cancel_result = {"skipped": True, "reason": "missing_exchange_order_id"}
            else:
                client = client_factory(account) if client_factory else None
                if client is None:
                    client = build_binance_client(account, require_testnet=True)
                client.load_markets()
                cancel_result = client.cancel_order(exchange_order_id, intent.get("symbol"))

            canceled_at = _now()
            update_live_order_intent(intent["id"], status="canceled", reason=f"paper_order_{intent.get('paper_order_status')}", updated_at=canceled_at, finished_at=canceled_at)
            record_live_order_event(intent["id"], "live_order_cancel", "canceled", "paper_order_canceled_or_expired", {
                "paper_order_id": intent.get("paper_order_id"),
                "paper_order_status": intent.get("paper_order_status"),
                "paper_order_cancel_reason": intent.get("paper_order_cancel_reason"),
                "cancel_result": cancel_result,
            })
            canceled_results.append({"intent_id": intent.get("id"), "ok": True, "action": "cancel_entry"})
        except Exception as exc:
            record_live_order_event(intent["id"], "live_order_cancel_error", "error", str(exc), {"paper_order_id": intent.get("paper_order_id")})
            canceled_results.append({"intent_id": intent.get("id"), "ok": False, "action": "error", "reason": str(exc)})

    ok = all(x.get("ok", True) for x in [*results, *canceled_results])
    return {"ok": ok, "processed_count": len(results), "canceled_entry_count": len(canceled_results), "results": results, "canceled_entries": canceled_results}
|
|
|
|
|
|
def run_live_trading_sync(
    *,
    limit: int = 20,
    execute: bool = True,
    sync_snapshots: bool = True,
    sync_paper: bool = True,
    sync_protection: bool = True,
    client_factory=None,
) -> dict:
    """Single scheduler entrypoint for live account snapshots and paper sync.

    Runs, in order and each only if its flag is set: account snapshot sync,
    paper-trade sync (skipped with ``live_trading_disabled`` when the config
    is not enabled), and live protection sync. Stages that are switched off
    report a ``skipped`` placeholder. The overall ``ok`` is true only if no
    stage reports ``ok`` as falsy.
    """
    init_db()
    cfg = live_trading_config()

    if sync_snapshots:
        snapshot_result = sync_live_account_snapshots(client_factory=client_factory)
    else:
        snapshot_result = {"skipped": True, "reason": "snapshot_sync_disabled"}

    if not sync_paper:
        paper_result = {"skipped": True, "reason": "paper_sync_disabled"}
    elif not bool(cfg.get("enabled")):
        paper_result = {"ok": False, "skipped": True, "reason": "live_trading_disabled", "results": []}
    else:
        paper_result = sync_open_paper_trades_to_live(
            limit=limit,
            execute=execute,
            client_factory=client_factory,
        )

    if sync_protection:
        protection_result = sync_live_protection_from_paper(limit=limit, client_factory=client_factory)
    else:
        protection_result = {"skipped": True, "reason": "live_protection_sync_disabled"}

    stages = (snapshot_result, paper_result, protection_result)
    overall_ok = all(bool(stage.get("ok", True)) for stage in stages)
    return {
        "ok": overall_ok,
        "enabled": bool(cfg.get("enabled")),
        "execute": execute,
        "snapshots": snapshot_result,
        "paper_sync": paper_result,
        "protection_sync": protection_result,
    }
|