diff --git a/app/db/live_trading.py b/app/db/live_trading.py index 1e212fc..3ef693c 100644 --- a/app/db/live_trading.py +++ b/app/db/live_trading.py @@ -373,6 +373,64 @@ def list_live_order_intents(limit: int = 50, offset: int = 0, status: str = "", return {"items": [_row(r) for r in rows], "total": total, "limit": limit, "offset": offset} +def get_live_order_intent(intent_id: int) -> dict: + intent_id = _safe_int(intent_id) + if intent_id <= 0: + return {} + conn = get_conn() + try: + row = conn.execute("SELECT * FROM live_order_intents WHERE id=%s", (intent_id,)).fetchone() + finally: + conn.close() + return _row(row) + + +def update_live_order_intent(intent_id: int, **fields) -> dict: + intent_id = _safe_int(intent_id) + allowed = { + "status", "reason", "quantity", "price", "exchange_order_id", + "response_json", "submitted_at", "finished_at", "updated_at", + } + updates = [] + params = [] + for key, value in fields.items(): + if key not in allowed: + continue + column_value = _dumps(value) if key == "response_json" else value + updates.append(f"{key}=%s") + params.append(column_value) + if not updates or intent_id <= 0: + return get_live_order_intent(intent_id) + params.append(intent_id) + conn = get_conn() + try: + row = conn.execute( + f"UPDATE live_order_intents SET {', '.join(updates)} WHERE id=%s RETURNING *", + tuple(params), + ).fetchone() + conn.commit() + finally: + conn.close() + return _row(row) + + +def record_live_order_event(intent_id: int, event_type: str, status: str, message: str = "", payload=None) -> dict: + conn = get_conn() + try: + row = conn.execute( + """ + INSERT INTO live_order_events (intent_id, event_type, status, message, payload_json, event_time) + VALUES (%s,%s,%s,%s,%s,%s) + RETURNING * + """, + (_safe_int(intent_id), event_type, status, message, _dumps(payload or {}), _now()), + ).fetchone() + conn.commit() + finally: + conn.close() + return _row(row) + + def list_live_order_events(limit: int = 80, offset: int = 0, intent_id: int = 0) -> dict: limit = max(1, min(_safe_int(limit, 80), 200)) offset = max(0, _safe_int(offset)) diff --git a/app/services/live_trading_sync.py b/app/services/live_trading_sync.py new file mode 100644 index 0000000..d8bf939 --- /dev/null +++ b/app/services/live_trading_sync.py @@ -0,0 +1,275 @@ +"""Synchronize strategy-trading ledger entries to configured live accounts.""" + +from __future__ import annotations + +from datetime import datetime + +from app.db.live_trading import ( + _safe_float, + _safe_int, + create_live_order_intent, + get_live_account, + get_live_order_intent, + list_enabled_live_accounts, + record_live_order_event, + update_live_order_intent, + _row, +) +from app.db.schema import get_conn, init_db +from app.integrations.binance_live import LiveTradingConfigError, build_binance_client + + +def _now() -> str: + return datetime.now().isoformat() + + +def _side_to_exchange(side: str) -> tuple[str, str]: + side = str(side or "long").lower() + if side == "short": + return "sell", "buy" + return "buy", "sell" + + +def _paper_trade(paper_trade_id: int) -> dict: + conn = get_conn() + try: + row = conn.execute("SELECT * FROM paper_trades WHERE id=%s", (_safe_int(paper_trade_id),)).fetchone() + finally: + conn.close() + return _row(row) if row else {} + + +def _existing_intent_for_paper_trade(paper_trade_id: int, account_id: int) -> dict: + conn = get_conn() + try: + row = conn.execute( + """ + SELECT * + FROM live_order_intents + WHERE paper_trade_id=%s + AND account_id=%s + AND source_type='paper_trade_sync' + AND status NOT IN ('blocked','error') + ORDER BY id DESC + LIMIT 1 + """, + (_safe_int(paper_trade_id), _safe_int(account_id)), + ).fetchone() + finally: + conn.close() + return dict(row) if row else {} + + +def _open_unsynced_paper_trades(limit: int = 20) -> list[dict]: + conn = get_conn() + try: + rows = conn.execute( + """ + SELECT pt.* + FROM paper_trades pt + WHERE pt.status='open' + ORDER BY pt.opened_at DESC, pt.id DESC + LIMIT %s + """, + (max(1, min(_safe_int(limit, 20), 100)),), + ).fetchall() + finally: + conn.close() + return [dict(r) for r in rows] + + +def _risk_for_account(account: dict) -> dict: + return account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {} + + +def _live_sizing(paper_trade: dict, account: dict) -> dict: + risk = _risk_for_account(account) + paper_leverage = max(1.0, _safe_float(paper_trade.get("leverage"), 1)) + max_leverage = max(1.0, _safe_float(risk.get("max_symbol_leverage"), paper_leverage)) + leverage = min(paper_leverage, max_leverage) + max_margin = _safe_float(risk.get("max_order_margin_usdt"), 0) + paper_notional = _safe_float(paper_trade.get("notional_usdt")) + if max_margin > 0: + notional = min(paper_notional, max_margin * leverage) if paper_notional > 0 else max_margin * leverage + else: + notional = paper_notional + return { + "notional_usdt": round(max(0.0, notional), 8), + "leverage": leverage, + "paper_notional_usdt": paper_notional, + "sizing_mode": "account_risk_cap" if max_margin > 0 else "paper_notional", + } + + +def _position_notional(position: dict) -> float: + info = position.get("info") if isinstance(position.get("info"), dict) else {} + return abs(_safe_float(position.get("notional") or info.get("notional"))) + + +def _check_live_cumulative_leverage(client, account: dict, additional_notional: float) -> tuple[bool, dict]: + risk = _risk_for_account(account) + cap = _safe_float(risk.get("max_cumulative_leverage"), 0) + if cap <= 0: + return True, {"disabled": True, "max_cumulative_leverage": cap} + balance = client.fetch_balance() + total = balance.get("total") if isinstance(balance.get("total"), dict) else {} + equity = _safe_float(total.get("USDT")) + positions = client.fetch_positions(None) if hasattr(client, "fetch_positions") else [] + open_notional = sum(_position_notional(p) for p in positions or []) + projected = open_notional + max(0.0, _safe_float(additional_notional)) + projected_leverage = projected / equity if equity > 0 else 0 + detail = { + "account_equity_usdt": round(equity, 8), + "open_notional_usdt": round(open_notional, 8), + "additional_notional_usdt": round(additional_notional, 8), + "projected_notional_usdt": round(projected, 8), + "projected_cumulative_leverage": round(projected_leverage, 6), + "max_cumulative_leverage": cap, + } + return projected_leverage <= cap + 1e-12, detail + + +def execute_live_order_intent(intent_id: int, *, client=None) -> dict: + intent = get_live_order_intent(intent_id) + if not intent: + raise LiveTradingConfigError("live order intent not found") + if intent.get("status") not in ("prepared", "pending_approval"): + return {"ok": False, "reason": f"intent_status_{intent.get('status')}", "intent": intent} + account = get_live_account(intent.get("account_id")) + if not account: + raise LiveTradingConfigError("live account not found") + if account.get("status") != "enabled": + raise LiveTradingConfigError("live account disabled") + symbol = str(intent.get("symbol") or "").upper() + notional = _safe_float(intent.get("notional_usdt")) + leverage = max(1.0, _safe_float(intent.get("leverage"), 1)) + if notional <= 0: + raise LiveTradingConfigError("live order notional is zero") + client = client or build_binance_client(account, require_testnet=True) + client.load_markets() + min_notional = client.min_notional(symbol) if hasattr(client, "min_notional") else 0.0 + if min_notional > 0 and notional < min_notional: + raise LiveTradingConfigError(f"{symbol} minimum notional is {min_notional:g} USDT; live sync notional is {notional:g} USDT") + ok, leverage_detail = _check_live_cumulative_leverage(client, account, notional) + if not ok: + update_live_order_intent(intent_id, status="blocked", reason="live_cumulative_leverage_exceeded", updated_at=_now()) + record_live_order_event(intent_id, "live_sync_blocked", "blocked", "live_cumulative_leverage_exceeded", leverage_detail) + return {"ok": False, "reason": "live_cumulative_leverage_exceeded", "risk": leverage_detail} + + open_side, close_side = _side_to_exchange(intent.get("side")) + ticker = client.fetch_ticker(symbol) + last = _safe_float(ticker.get("last") or ticker.get("close")) + amount = client.amount_to_precision(symbol, notional / last) if last > 0 else 0 + if amount <= 0: + raise LiveTradingConfigError("calculated live order amount is zero") + submitted_at = _now() + update_live_order_intent(intent_id, status="submitting", quantity=amount, submitted_at=submitted_at, updated_at=submitted_at) + record_live_order_event(intent_id, "live_sync_submit", "submitting", "submitting_live_market_order", {"amount": amount, "notional_usdt": notional}) + + market_order = None + stop_order = None + take_profit_order = None + try: + client.set_leverage(symbol, leverage) + market_order = client.create_market_order(symbol, open_side, amount, {"newClientOrderId": f"alphax_live_{intent_id}_{int(datetime.now().timestamp())}"}) + stop_loss = _safe_float(intent.get("stop_loss")) + take_profit = _safe_float(intent.get("take_profit")) + if stop_loss > 0: + stop_order = client.create_stop_loss_order(symbol, close_side, amount, stop_loss) + if take_profit > 0: + take_profit_order = client.create_take_profit_order(symbol, close_side, amount, take_profit) + finished_at = _now() + response = {"market_order": market_order, "stop_loss_order": stop_order, "take_profit_order": take_profit_order, "risk": leverage_detail} + updated = update_live_order_intent( + intent_id, + status="submitted", + reason="live_order_submitted", + exchange_order_id=str((market_order or {}).get("id") or (market_order or {}).get("orderId") or ""), + response_json=response, + finished_at=finished_at, + updated_at=finished_at, + ) + record_live_order_event(intent_id, "live_sync_submitted", "submitted", "live_order_submitted", response) + return {"ok": True, "intent": updated, "market_order": market_order, "stop_loss_order": stop_order, "take_profit_order": take_profit_order} + except Exception as exc: + failed_at = _now() + update_live_order_intent(intent_id, status="error", reason=str(exc), response_json={"market_order": market_order}, finished_at=failed_at, updated_at=failed_at) + record_live_order_event(intent_id, "live_sync_error", "error", str(exc), {"market_order": market_order}) + raise + + +def sync_paper_trade_to_live( + paper_trade_id: int, + *, + account_ids: list[int] | None = None, + execute: bool = True, + client_factory=None, +) -> dict: + init_db() + trade = _paper_trade(paper_trade_id) + if not trade: + return {"ok": False, "reason": "paper_trade_not_found", "items": []} + if trade.get("status") != "open": + return {"ok": False, "reason": f"paper_trade_{trade.get('status')}", "items": []} + accounts = list_enabled_live_accounts() + selected = {_safe_int(x) for x in (account_ids or []) if _safe_int(x) > 0} + if selected: + accounts = [a for a in accounts if _safe_int(a.get("id")) in selected] + if not accounts: + return {"ok": False, "reason": "no_enabled_accounts", "items": []} + + items = [] + for account in accounts: + existing = _existing_intent_for_paper_trade(trade.get("id"), account.get("id")) + if existing: + items.append({ + "account_id": account["id"], + "intent": get_live_order_intent(existing["id"]), + "sizing": {}, + "executed": existing.get("status") == "submitted", + "skipped": True, + "reason": "already_synced", + }) + continue + sizing = _live_sizing(trade, account) + payload = { + "account_id": account["id"], + "symbol": trade.get("symbol"), + "side": trade.get("side") or "long", + "order_type": "market", + "price": _safe_float(trade.get("entry_price")), + "stop_loss": _safe_float(trade.get("stop_loss")), + "take_profit": _safe_float(trade.get("tp1")), + "notional_usdt": sizing["notional_usdt"], + "leverage": sizing["leverage"], + "recommendation_id": _safe_int(trade.get("recommendation_id")), + "paper_trade_id": _safe_int(trade.get("id")), + } + intent = create_live_order_intent(payload, source_type="paper_trade_sync", source_id=_safe_int(trade.get("id"))) + item = {"account_id": account["id"], "intent": intent, "sizing": sizing, "executed": False} + if execute and intent.get("status") == "prepared": + factory_client = client_factory(account) if client_factory else None + try: + item["execution"] = execute_live_order_intent(intent["id"], client=factory_client) + item["executed"] = bool(item["execution"].get("ok")) + except Exception as exc: + item["execution"] = {"ok": False, "reason": str(exc)} + items.append(item) + return {"ok": True, "paper_trade_id": _safe_int(paper_trade_id), "execute": execute, "items": items, "total": len(items)} + + +def sync_open_paper_trades_to_live(*, limit: int = 20, execute: bool = True, client_factory=None) -> dict: + trades = _open_unsynced_paper_trades(limit=limit) + results = [] + for trade in trades: + results.append(sync_paper_trade_to_live( + trade["id"], + execute=execute, + client_factory=client_factory, + )) + return { + "ok": True, + "processed_count": len(results), + "execute": execute, + "results": results, + } diff --git a/app/services/paper_trader.py b/app/services/paper_trader.py index 6c0112c..32f62cd 100644 --- a/app/services/paper_trader.py +++ b/app/services/paper_trader.py @@ -10,6 +10,7 @@ import ccxt from app.db.altcoin_db import init_db, log_cron_run, update_latest_price_cache from app.db.paper_trading import get_paper_trading_summary, sync_recommendation from app.db.recommendation_queries import get_active_recommendations_deduped +from app.services.live_trading_sync import sync_paper_trade_to_live exchange = ccxt.binance({"enableRateLimit": True}) @@ -28,6 +29,8 @@ def run_once(limit: int = 100) -> dict: event_time = datetime.now().isoformat() update_latest_price_cache(symbol, current_price, updated_at=event_time, source="paper_trader") result = sync_recommendation(rec, current_price, event_time=event_time) + if result.get("trade_id") and (result.get("opened") or result.get("paper_order", {}).get("filled")): + result["live_sync"] = sync_paper_trade_to_live(int(result["trade_id"]), execute=True) result.update({"symbol": symbol, "rec_id": rec.get("id"), "current_price": current_price}) results.append(result) except Exception as exc: diff --git a/app/web/routes_live_trading.py b/app/web/routes_live_trading.py index 29585c3..64cbbf2 100644 --- a/app/web/routes_live_trading.py +++ b/app/web/routes_live_trading.py @@ -13,6 +13,7 @@ from app.db.live_trading import ( ) from app.services.live_trading_smoke import run_binance_testnet_smoke from app.services.live_trading_account import get_live_account_overview +from app.services.live_trading_sync import sync_open_paper_trades_to_live, sync_paper_trade_to_live from app.integrations.binance_live import LiveTradingConfigError from app.web.shared import require_admin @@ -96,6 +97,27 @@ async def api_live_trading_from_paper(paper_trade_id: int, payload: dict = Body( return prepare_intent_from_paper_trade(paper_trade_id, account_ids=account_ids if isinstance(account_ids, list) else None) +@router.post("/api/live-trading/sync/from-paper/{paper_trade_id}") +async def api_live_trading_sync_from_paper(paper_trade_id: int, payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + account_ids = payload.get("account_ids") if isinstance(payload, dict) else None + execute = bool(payload.get("execute", True)) if isinstance(payload, dict) else True + return sync_paper_trade_to_live( + paper_trade_id, + account_ids=account_ids if isinstance(account_ids, list) else None, + execute=execute, + ) + + +@router.post("/api/live-trading/sync/open-paper-trades") +async def api_live_trading_sync_open_paper_trades(payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return sync_open_paper_trades_to_live( + limit=int(payload.get("limit") or 20), + execute=bool(payload.get("execute", True)), + ) + + @router.get("/api/live-trading/events") async def api_live_trading_events( limit: int = 80, diff --git a/tests/test_live_trading.py b/tests/test_live_trading.py index 2a6bddc..ddaa690 100644 --- a/tests/test_live_trading.py +++ b/tests/test_live_trading.py @@ -13,7 +13,9 @@ from app.db.runtime_config_db import set_config from app.integrations.binance_live import build_binance_client from app.services.live_trading_account import get_live_account_overview from app.services.live_trading_smoke import run_binance_testnet_smoke +from app.services.live_trading_sync import sync_paper_trade_to_live from app.web import web_server +from app.db.schema import get_conn def _login_user(email: str, password: str = "StrongPass123", admin: bool = False) -> str: @@ -250,6 +252,10 @@ class _FakeBinanceClient: self.calls.append(("fetch_ticker", symbol)) return {"last": 100.0} + def fetch_positions(self, symbols=None): + self.calls.append(("fetch_positions", symbols)) + return [] + def set_leverage(self, symbol, leverage): self.calls.append(("set_leverage", symbol, leverage)) return {"symbol": symbol, "leverage": leverage} @@ -284,6 +290,71 @@ class _FakeBinanceClient: return self._order("take_profit", symbol, side, amount, stop_price, params or {}) +def _insert_paper_trade(symbol="DOGE/USDT", notional=5000, leverage=5): + conn = get_conn() + try: + row = conn.execute( + """ + INSERT INTO paper_trades ( + recommendation_id, symbol, side, status, opened_at, + entry_price, qty, notional_usdt, margin_usdt, leverage, + stop_loss, tp1, tp2, max_price, min_price, current_price, + pnl_pct, fee_usdt, source_status, source_action, strategy_version, + created_at, updated_at + ) + VALUES (%s,%s,'long','open','2026-05-22T00:00:00',0.1,100,%s,%s,%s,0.09,0.12,0.13,0.1,0.1,0.1,0,0,'buy_now','buy_now','test','2026-05-22T00:00:00','2026-05-22T00:00:00') + RETURNING * + """, + (990000 + int(notional), symbol, notional, notional / leverage, leverage), + ).fetchone() + conn.commit() + finally: + conn.close() + return dict(row) + + +def test_paper_trade_sync_executes_scaled_live_order_once(): + set_config("system", "live_trading", { + "enabled": True, + "execution_mode": "exchange_api", + "require_human_approval": False, + "exchange": "binance", + "market_type": "um_futures", + "testnet": True, + "risk": {"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "allowed_symbols": []}, + }, source="test") + account = upsert_live_account( + account_code="binance_auto_sync", + status="enabled", + risk_config={"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "max_cumulative_leverage": 5, "allowed_symbols": []}, + ) + trade = _insert_paper_trade() + fake = _FakeBinanceClient() + fake.fetch_balance = lambda: {"total": {"USDT": 1000}} + + result = sync_paper_trade_to_live( + trade["id"], + account_ids=[account["id"]], + execute=True, + client_factory=lambda acct: fake, + ) + again = sync_paper_trade_to_live( + trade["id"], + account_ids=[account["id"]], + execute=True, + client_factory=lambda acct: fake, + ) + + call_names = [c[0] for c in fake.calls] + assert result["ok"] is True + assert result["items"][0]["executed"] is True + assert result["items"][0]["sizing"]["notional_usdt"] == 40 + assert "market_order" in call_names + assert "stop_loss" in call_names + assert "take_profit" in call_names + assert again["items"][0]["reason"] == "already_synced" + + def test_binance_testnet_smoke_covers_market_limit_cancel_tp_sl_interfaces(): account = upsert_live_account( account_code="binance_testnet",