1
This commit is contained in:
parent
f83c1949ac
commit
346c35d664
@ -373,6 +373,64 @@ def list_live_order_intents(limit: int = 50, offset: int = 0, status: str = "",
|
||||
return {"items": [_row(r) for r in rows], "total": total, "limit": limit, "offset": offset}
|
||||
|
||||
|
||||
def get_live_order_intent(intent_id: int) -> dict:
|
||||
intent_id = _safe_int(intent_id)
|
||||
if intent_id <= 0:
|
||||
return {}
|
||||
conn = get_conn()
|
||||
try:
|
||||
row = conn.execute("SELECT * FROM live_order_intents WHERE id=%s", (intent_id,)).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
return _row(row)
|
||||
|
||||
|
||||
def update_live_order_intent(intent_id: int, **fields) -> dict:
|
||||
intent_id = _safe_int(intent_id)
|
||||
allowed = {
|
||||
"status", "reason", "quantity", "price", "exchange_order_id",
|
||||
"response_json", "submitted_at", "finished_at", "updated_at",
|
||||
}
|
||||
updates = []
|
||||
params = []
|
||||
for key, value in fields.items():
|
||||
if key not in allowed:
|
||||
continue
|
||||
column_value = _dumps(value) if key == "response_json" else value
|
||||
updates.append(f"{key}=%s")
|
||||
params.append(column_value)
|
||||
if not updates or intent_id <= 0:
|
||||
return get_live_order_intent(intent_id)
|
||||
params.append(intent_id)
|
||||
conn = get_conn()
|
||||
try:
|
||||
row = conn.execute(
|
||||
f"UPDATE live_order_intents SET {', '.join(updates)} WHERE id=%s RETURNING *",
|
||||
tuple(params),
|
||||
).fetchone()
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
return _row(row)
|
||||
|
||||
|
||||
def record_live_order_event(intent_id: int, event_type: str, status: str, message: str = "", payload=None) -> dict:
|
||||
conn = get_conn()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"""
|
||||
INSERT INTO live_order_events (intent_id, event_type, status, message, payload_json, event_time)
|
||||
VALUES (%s,%s,%s,%s,%s,%s)
|
||||
RETURNING *
|
||||
""",
|
||||
(_safe_int(intent_id), event_type, status, message, _dumps(payload or {}), _now()),
|
||||
).fetchone()
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
return _row(row)
|
||||
|
||||
|
||||
def list_live_order_events(limit: int = 80, offset: int = 0, intent_id: int = 0) -> dict:
|
||||
limit = max(1, min(_safe_int(limit, 80), 200))
|
||||
offset = max(0, _safe_int(offset))
|
||||
|
||||
275
app/services/live_trading_sync.py
Normal file
275
app/services/live_trading_sync.py
Normal file
@ -0,0 +1,275 @@
|
||||
"""Synchronize strategy-trading ledger entries to configured live accounts."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from app.db.live_trading import (
|
||||
_safe_float,
|
||||
_safe_int,
|
||||
create_live_order_intent,
|
||||
get_live_account,
|
||||
get_live_order_intent,
|
||||
list_enabled_live_accounts,
|
||||
record_live_order_event,
|
||||
update_live_order_intent,
|
||||
_row,
|
||||
)
|
||||
from app.db.schema import get_conn, init_db
|
||||
from app.integrations.binance_live import LiveTradingConfigError, build_binance_client
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now().isoformat()
|
||||
|
||||
|
||||
def _side_to_exchange(side: str) -> tuple[str, str]:
|
||||
side = str(side or "long").lower()
|
||||
if side == "short":
|
||||
return "sell", "buy"
|
||||
return "buy", "sell"
|
||||
|
||||
|
||||
def _paper_trade(paper_trade_id: int) -> dict:
|
||||
conn = get_conn()
|
||||
try:
|
||||
row = conn.execute("SELECT * FROM paper_trades WHERE id=%s", (_safe_int(paper_trade_id),)).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
return _row(row) if row else {}
|
||||
|
||||
|
||||
def _existing_intent_for_paper_trade(paper_trade_id: int, account_id: int) -> dict:
|
||||
conn = get_conn()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"""
|
||||
SELECT *
|
||||
FROM live_order_intents
|
||||
WHERE paper_trade_id=%s
|
||||
AND account_id=%s
|
||||
AND source_type='paper_trade_sync'
|
||||
AND status NOT IN ('blocked','error')
|
||||
ORDER BY id DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
(_safe_int(paper_trade_id), _safe_int(account_id)),
|
||||
).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
return dict(row) if row else {}
|
||||
|
||||
|
||||
def _open_unsynced_paper_trades(limit: int = 20) -> list[dict]:
|
||||
conn = get_conn()
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT pt.*
|
||||
FROM paper_trades pt
|
||||
WHERE pt.status='open'
|
||||
ORDER BY pt.opened_at DESC, pt.id DESC
|
||||
LIMIT %s
|
||||
""",
|
||||
(max(1, min(_safe_int(limit, 20), 100)),),
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
def _risk_for_account(account: dict) -> dict:
|
||||
return account.get("risk_config") if isinstance(account.get("risk_config"), dict) else {}
|
||||
|
||||
|
||||
def _live_sizing(paper_trade: dict, account: dict) -> dict:
|
||||
risk = _risk_for_account(account)
|
||||
paper_leverage = max(1.0, _safe_float(paper_trade.get("leverage"), 1))
|
||||
max_leverage = max(1.0, _safe_float(risk.get("max_symbol_leverage"), paper_leverage))
|
||||
leverage = min(paper_leverage, max_leverage)
|
||||
max_margin = _safe_float(risk.get("max_order_margin_usdt"), 0)
|
||||
paper_notional = _safe_float(paper_trade.get("notional_usdt"))
|
||||
if max_margin > 0:
|
||||
notional = min(paper_notional, max_margin * leverage) if paper_notional > 0 else max_margin * leverage
|
||||
else:
|
||||
notional = paper_notional
|
||||
return {
|
||||
"notional_usdt": round(max(0.0, notional), 8),
|
||||
"leverage": leverage,
|
||||
"paper_notional_usdt": paper_notional,
|
||||
"sizing_mode": "account_risk_cap" if max_margin > 0 else "paper_notional",
|
||||
}
|
||||
|
||||
|
||||
def _position_notional(position: dict) -> float:
|
||||
info = position.get("info") if isinstance(position.get("info"), dict) else {}
|
||||
return abs(_safe_float(position.get("notional") or info.get("notional")))
|
||||
|
||||
|
||||
def _check_live_cumulative_leverage(client, account: dict, additional_notional: float) -> tuple[bool, dict]:
|
||||
risk = _risk_for_account(account)
|
||||
cap = _safe_float(risk.get("max_cumulative_leverage"), 0)
|
||||
if cap <= 0:
|
||||
return True, {"disabled": True, "max_cumulative_leverage": cap}
|
||||
balance = client.fetch_balance()
|
||||
total = balance.get("total") if isinstance(balance.get("total"), dict) else {}
|
||||
equity = _safe_float(total.get("USDT"))
|
||||
positions = client.fetch_positions(None) if hasattr(client, "fetch_positions") else []
|
||||
open_notional = sum(_position_notional(p) for p in positions or [])
|
||||
projected = open_notional + max(0.0, _safe_float(additional_notional))
|
||||
projected_leverage = projected / equity if equity > 0 else 0
|
||||
detail = {
|
||||
"account_equity_usdt": round(equity, 8),
|
||||
"open_notional_usdt": round(open_notional, 8),
|
||||
"additional_notional_usdt": round(additional_notional, 8),
|
||||
"projected_notional_usdt": round(projected, 8),
|
||||
"projected_cumulative_leverage": round(projected_leverage, 6),
|
||||
"max_cumulative_leverage": cap,
|
||||
}
|
||||
return projected_leverage <= cap + 1e-12, detail
|
||||
|
||||
|
||||
def execute_live_order_intent(intent_id: int, *, client=None) -> dict:
|
||||
intent = get_live_order_intent(intent_id)
|
||||
if not intent:
|
||||
raise LiveTradingConfigError("live order intent not found")
|
||||
if intent.get("status") not in ("prepared", "pending_approval"):
|
||||
return {"ok": False, "reason": f"intent_status_{intent.get('status')}", "intent": intent}
|
||||
account = get_live_account(intent.get("account_id"))
|
||||
if not account:
|
||||
raise LiveTradingConfigError("live account not found")
|
||||
if account.get("status") != "enabled":
|
||||
raise LiveTradingConfigError("live account disabled")
|
||||
symbol = str(intent.get("symbol") or "").upper()
|
||||
notional = _safe_float(intent.get("notional_usdt"))
|
||||
leverage = max(1.0, _safe_float(intent.get("leverage"), 1))
|
||||
if notional <= 0:
|
||||
raise LiveTradingConfigError("live order notional is zero")
|
||||
client = client or build_binance_client(account, require_testnet=True)
|
||||
client.load_markets()
|
||||
min_notional = client.min_notional(symbol) if hasattr(client, "min_notional") else 0.0
|
||||
if min_notional > 0 and notional < min_notional:
|
||||
raise LiveTradingConfigError(f"{symbol} minimum notional is {min_notional:g} USDT; live sync notional is {notional:g} USDT")
|
||||
ok, leverage_detail = _check_live_cumulative_leverage(client, account, notional)
|
||||
if not ok:
|
||||
update_live_order_intent(intent_id, status="blocked", reason="live_cumulative_leverage_exceeded", updated_at=_now())
|
||||
record_live_order_event(intent_id, "live_sync_blocked", "blocked", "live_cumulative_leverage_exceeded", leverage_detail)
|
||||
return {"ok": False, "reason": "live_cumulative_leverage_exceeded", "risk": leverage_detail}
|
||||
|
||||
open_side, close_side = _side_to_exchange(intent.get("side"))
|
||||
ticker = client.fetch_ticker(symbol)
|
||||
last = _safe_float(ticker.get("last") or ticker.get("close"))
|
||||
amount = client.amount_to_precision(symbol, notional / last) if last > 0 else 0
|
||||
if amount <= 0:
|
||||
raise LiveTradingConfigError("calculated live order amount is zero")
|
||||
submitted_at = _now()
|
||||
update_live_order_intent(intent_id, status="submitting", quantity=amount, submitted_at=submitted_at, updated_at=submitted_at)
|
||||
record_live_order_event(intent_id, "live_sync_submit", "submitting", "submitting_live_market_order", {"amount": amount, "notional_usdt": notional})
|
||||
|
||||
market_order = None
|
||||
stop_order = None
|
||||
take_profit_order = None
|
||||
try:
|
||||
client.set_leverage(symbol, leverage)
|
||||
market_order = client.create_market_order(symbol, open_side, amount, {"newClientOrderId": f"alphax_live_{intent_id}_{int(datetime.now().timestamp())}"})
|
||||
stop_loss = _safe_float(intent.get("stop_loss"))
|
||||
take_profit = _safe_float(intent.get("take_profit"))
|
||||
if stop_loss > 0:
|
||||
stop_order = client.create_stop_loss_order(symbol, close_side, amount, stop_loss)
|
||||
if take_profit > 0:
|
||||
take_profit_order = client.create_take_profit_order(symbol, close_side, amount, take_profit)
|
||||
finished_at = _now()
|
||||
response = {"market_order": market_order, "stop_loss_order": stop_order, "take_profit_order": take_profit_order, "risk": leverage_detail}
|
||||
updated = update_live_order_intent(
|
||||
intent_id,
|
||||
status="submitted",
|
||||
reason="live_order_submitted",
|
||||
exchange_order_id=str((market_order or {}).get("id") or (market_order or {}).get("orderId") or ""),
|
||||
response_json=response,
|
||||
finished_at=finished_at,
|
||||
updated_at=finished_at,
|
||||
)
|
||||
record_live_order_event(intent_id, "live_sync_submitted", "submitted", "live_order_submitted", response)
|
||||
return {"ok": True, "intent": updated, "market_order": market_order, "stop_loss_order": stop_order, "take_profit_order": take_profit_order}
|
||||
except Exception as exc:
|
||||
failed_at = _now()
|
||||
update_live_order_intent(intent_id, status="error", reason=str(exc), response_json={"market_order": market_order}, finished_at=failed_at, updated_at=failed_at)
|
||||
record_live_order_event(intent_id, "live_sync_error", "error", str(exc), {"market_order": market_order})
|
||||
raise
|
||||
|
||||
|
||||
def sync_paper_trade_to_live(
|
||||
paper_trade_id: int,
|
||||
*,
|
||||
account_ids: list[int] | None = None,
|
||||
execute: bool = True,
|
||||
client_factory=None,
|
||||
) -> dict:
|
||||
init_db()
|
||||
trade = _paper_trade(paper_trade_id)
|
||||
if not trade:
|
||||
return {"ok": False, "reason": "paper_trade_not_found", "items": []}
|
||||
if trade.get("status") != "open":
|
||||
return {"ok": False, "reason": f"paper_trade_{trade.get('status')}", "items": []}
|
||||
accounts = list_enabled_live_accounts()
|
||||
selected = {_safe_int(x) for x in (account_ids or []) if _safe_int(x) > 0}
|
||||
if selected:
|
||||
accounts = [a for a in accounts if _safe_int(a.get("id")) in selected]
|
||||
if not accounts:
|
||||
return {"ok": False, "reason": "no_enabled_accounts", "items": []}
|
||||
|
||||
items = []
|
||||
for account in accounts:
|
||||
existing = _existing_intent_for_paper_trade(trade.get("id"), account.get("id"))
|
||||
if existing:
|
||||
items.append({
|
||||
"account_id": account["id"],
|
||||
"intent": get_live_order_intent(existing["id"]),
|
||||
"sizing": {},
|
||||
"executed": existing.get("status") == "submitted",
|
||||
"skipped": True,
|
||||
"reason": "already_synced",
|
||||
})
|
||||
continue
|
||||
sizing = _live_sizing(trade, account)
|
||||
payload = {
|
||||
"account_id": account["id"],
|
||||
"symbol": trade.get("symbol"),
|
||||
"side": trade.get("side") or "long",
|
||||
"order_type": "market",
|
||||
"price": _safe_float(trade.get("entry_price")),
|
||||
"stop_loss": _safe_float(trade.get("stop_loss")),
|
||||
"take_profit": _safe_float(trade.get("tp1")),
|
||||
"notional_usdt": sizing["notional_usdt"],
|
||||
"leverage": sizing["leverage"],
|
||||
"recommendation_id": _safe_int(trade.get("recommendation_id")),
|
||||
"paper_trade_id": _safe_int(trade.get("id")),
|
||||
}
|
||||
intent = create_live_order_intent(payload, source_type="paper_trade_sync", source_id=_safe_int(trade.get("id")))
|
||||
item = {"account_id": account["id"], "intent": intent, "sizing": sizing, "executed": False}
|
||||
if execute and intent.get("status") == "prepared":
|
||||
factory_client = client_factory(account) if client_factory else None
|
||||
try:
|
||||
item["execution"] = execute_live_order_intent(intent["id"], client=factory_client)
|
||||
item["executed"] = bool(item["execution"].get("ok"))
|
||||
except Exception as exc:
|
||||
item["execution"] = {"ok": False, "reason": str(exc)}
|
||||
items.append(item)
|
||||
return {"ok": True, "paper_trade_id": _safe_int(paper_trade_id), "execute": execute, "items": items, "total": len(items)}
|
||||
|
||||
|
||||
def sync_open_paper_trades_to_live(*, limit: int = 20, execute: bool = True, client_factory=None) -> dict:
|
||||
trades = _open_unsynced_paper_trades(limit=limit)
|
||||
results = []
|
||||
for trade in trades:
|
||||
results.append(sync_paper_trade_to_live(
|
||||
trade["id"],
|
||||
execute=execute,
|
||||
client_factory=client_factory,
|
||||
))
|
||||
return {
|
||||
"ok": True,
|
||||
"processed_count": len(results),
|
||||
"execute": execute,
|
||||
"results": results,
|
||||
}
|
||||
@ -10,6 +10,7 @@ import ccxt
|
||||
from app.db.altcoin_db import init_db, log_cron_run, update_latest_price_cache
|
||||
from app.db.paper_trading import get_paper_trading_summary, sync_recommendation
|
||||
from app.db.recommendation_queries import get_active_recommendations_deduped
|
||||
from app.services.live_trading_sync import sync_paper_trade_to_live
|
||||
|
||||
|
||||
exchange = ccxt.binance({"enableRateLimit": True})
|
||||
@ -28,6 +29,8 @@ def run_once(limit: int = 100) -> dict:
|
||||
event_time = datetime.now().isoformat()
|
||||
update_latest_price_cache(symbol, current_price, updated_at=event_time, source="paper_trader")
|
||||
result = sync_recommendation(rec, current_price, event_time=event_time)
|
||||
if result.get("trade_id") and (result.get("opened") or result.get("paper_order", {}).get("filled")):
|
||||
result["live_sync"] = sync_paper_trade_to_live(int(result["trade_id"]), execute=True)
|
||||
result.update({"symbol": symbol, "rec_id": rec.get("id"), "current_price": current_price})
|
||||
results.append(result)
|
||||
except Exception as exc:
|
||||
|
||||
@ -13,6 +13,7 @@ from app.db.live_trading import (
|
||||
)
|
||||
from app.services.live_trading_smoke import run_binance_testnet_smoke
|
||||
from app.services.live_trading_account import get_live_account_overview
|
||||
from app.services.live_trading_sync import sync_open_paper_trades_to_live, sync_paper_trade_to_live
|
||||
from app.integrations.binance_live import LiveTradingConfigError
|
||||
from app.web.shared import require_admin
|
||||
|
||||
@ -96,6 +97,27 @@ async def api_live_trading_from_paper(paper_trade_id: int, payload: dict = Body(
|
||||
return prepare_intent_from_paper_trade(paper_trade_id, account_ids=account_ids if isinstance(account_ids, list) else None)
|
||||
|
||||
|
||||
@router.post("/api/live-trading/sync/from-paper/{paper_trade_id}")
|
||||
async def api_live_trading_sync_from_paper(paper_trade_id: int, payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")):
|
||||
require_admin(altcoin_session)
|
||||
account_ids = payload.get("account_ids") if isinstance(payload, dict) else None
|
||||
execute = bool(payload.get("execute", True)) if isinstance(payload, dict) else True
|
||||
return sync_paper_trade_to_live(
|
||||
paper_trade_id,
|
||||
account_ids=account_ids if isinstance(account_ids, list) else None,
|
||||
execute=execute,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/api/live-trading/sync/open-paper-trades")
|
||||
async def api_live_trading_sync_open_paper_trades(payload: dict = Body(default={}), altcoin_session: str = Cookie(default="")):
|
||||
require_admin(altcoin_session)
|
||||
return sync_open_paper_trades_to_live(
|
||||
limit=int(payload.get("limit") or 20),
|
||||
execute=bool(payload.get("execute", True)),
|
||||
)
|
||||
|
||||
|
||||
@router.get("/api/live-trading/events")
|
||||
async def api_live_trading_events(
|
||||
limit: int = 80,
|
||||
|
||||
@ -13,7 +13,9 @@ from app.db.runtime_config_db import set_config
|
||||
from app.integrations.binance_live import build_binance_client
|
||||
from app.services.live_trading_account import get_live_account_overview
|
||||
from app.services.live_trading_smoke import run_binance_testnet_smoke
|
||||
from app.services.live_trading_sync import sync_paper_trade_to_live
|
||||
from app.web import web_server
|
||||
from app.db.schema import get_conn
|
||||
|
||||
|
||||
def _login_user(email: str, password: str = "StrongPass123", admin: bool = False) -> str:
|
||||
@ -250,6 +252,10 @@ class _FakeBinanceClient:
|
||||
self.calls.append(("fetch_ticker", symbol))
|
||||
return {"last": 100.0}
|
||||
|
||||
def fetch_positions(self, symbols=None):
|
||||
self.calls.append(("fetch_positions", symbols))
|
||||
return []
|
||||
|
||||
def set_leverage(self, symbol, leverage):
|
||||
self.calls.append(("set_leverage", symbol, leverage))
|
||||
return {"symbol": symbol, "leverage": leverage}
|
||||
@ -284,6 +290,71 @@ class _FakeBinanceClient:
|
||||
return self._order("take_profit", symbol, side, amount, stop_price, params or {})
|
||||
|
||||
|
||||
def _insert_paper_trade(symbol="DOGE/USDT", notional=5000, leverage=5):
|
||||
conn = get_conn()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"""
|
||||
INSERT INTO paper_trades (
|
||||
recommendation_id, symbol, side, status, opened_at,
|
||||
entry_price, qty, notional_usdt, margin_usdt, leverage,
|
||||
stop_loss, tp1, tp2, max_price, min_price, current_price,
|
||||
pnl_pct, fee_usdt, source_status, source_action, strategy_version,
|
||||
created_at, updated_at
|
||||
)
|
||||
VALUES (%s,%s,'long','open','2026-05-22T00:00:00',0.1,100,%s,%s,%s,0.09,0.12,0.13,0.1,0.1,0.1,0,0,'buy_now','buy_now','test','2026-05-22T00:00:00','2026-05-22T00:00:00')
|
||||
RETURNING *
|
||||
""",
|
||||
(990000 + int(notional), symbol, notional, notional / leverage, leverage),
|
||||
).fetchone()
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
return dict(row)
|
||||
|
||||
|
||||
def test_paper_trade_sync_executes_scaled_live_order_once():
|
||||
set_config("system", "live_trading", {
|
||||
"enabled": True,
|
||||
"execution_mode": "exchange_api",
|
||||
"require_human_approval": False,
|
||||
"exchange": "binance",
|
||||
"market_type": "um_futures",
|
||||
"testnet": True,
|
||||
"risk": {"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "allowed_symbols": []},
|
||||
}, source="test")
|
||||
account = upsert_live_account(
|
||||
account_code="binance_auto_sync",
|
||||
status="enabled",
|
||||
risk_config={"max_order_margin_usdt": 20, "max_symbol_leverage": 2, "max_cumulative_leverage": 5, "allowed_symbols": []},
|
||||
)
|
||||
trade = _insert_paper_trade()
|
||||
fake = _FakeBinanceClient()
|
||||
fake.fetch_balance = lambda: {"total": {"USDT": 1000}}
|
||||
|
||||
result = sync_paper_trade_to_live(
|
||||
trade["id"],
|
||||
account_ids=[account["id"]],
|
||||
execute=True,
|
||||
client_factory=lambda acct: fake,
|
||||
)
|
||||
again = sync_paper_trade_to_live(
|
||||
trade["id"],
|
||||
account_ids=[account["id"]],
|
||||
execute=True,
|
||||
client_factory=lambda acct: fake,
|
||||
)
|
||||
|
||||
call_names = [c[0] for c in fake.calls]
|
||||
assert result["ok"] is True
|
||||
assert result["items"][0]["executed"] is True
|
||||
assert result["items"][0]["sizing"]["notional_usdt"] == 40
|
||||
assert "market_order" in call_names
|
||||
assert "stop_loss" in call_names
|
||||
assert "take_profit" in call_names
|
||||
assert again["items"][0]["reason"] == "already_synced"
|
||||
|
||||
|
||||
def test_binance_testnet_smoke_covers_market_limit_cancel_tp_sl_interfaces():
|
||||
account = upsert_live_account(
|
||||
account_code="binance_testnet",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user