This commit is contained in:
aaron 2026-05-20 16:50:46 +08:00
parent f6ae9c4ed6
commit fc11c73413
8 changed files with 935 additions and 85 deletions

View File

@ -0,0 +1,29 @@
CREATE TABLE IF NOT EXISTS paper_orders (
id BIGSERIAL PRIMARY KEY,
recommendation_id BIGINT NOT NULL UNIQUE,
symbol TEXT NOT NULL,
side TEXT NOT NULL DEFAULT 'long',
order_type TEXT NOT NULL DEFAULT 'limit',
status TEXT NOT NULL DEFAULT 'pending',
source_status TEXT DEFAULT '',
source_action TEXT DEFAULT '',
target_price DOUBLE PRECISION NOT NULL,
current_price_at_create DOUBLE PRECISION DEFAULT 0,
fill_price DOUBLE PRECISION DEFAULT 0,
notional_usdt DOUBLE PRECISION DEFAULT 0,
stop_loss DOUBLE PRECISION DEFAULT 0,
tp1 DOUBLE PRECISION DEFAULT 0,
tp2 DOUBLE PRECISION DEFAULT 0,
strategy_version TEXT DEFAULT '',
entry_plan_snapshot_json TEXT DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
expires_at TEXT DEFAULT '',
filled_at TEXT DEFAULT '',
canceled_at TEXT DEFAULT '',
cancel_reason TEXT DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_paper_orders_status_updated ON paper_orders(status, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_paper_orders_symbol_status ON paper_orders(symbol, status);
CREATE INDEX IF NOT EXISTS idx_paper_orders_recommendation ON paper_orders(recommendation_id);

View File

@ -105,6 +105,13 @@ def _entry_plan(rec: dict) -> dict:
return _loads_json(rec.get("entry_plan_json"), {})
def _parse_time(value: str):
try:
return datetime.fromisoformat(str(value or ""))
except Exception:
return None
def _open_price(current_price: float, config: dict | None = None) -> float:
return round(current_price * (1 + default_slippage_pct(config) / 100), 12)
@ -181,57 +188,158 @@ def _record_event(conn, trade_id: int, rec_id: int, symbol: str, event_type: str
)
def _push_event_card(event_type: str, trade: dict, result: dict, event_time: str = "") -> None:
def _fmt_price(value) -> str:
price = _safe_float(value)
if price <= 0:
return "--"
return f"${price:.8g}"
def _fmt_pct(value) -> str:
pct = _safe_float(value)
sign = "+" if pct > 0 else ""
return f"{sign}{pct:.2f}%"
def _card_field(label: str, value) -> dict:
return {
"tag": "div",
"text": {"tag": "lark_md", "content": f"**{label}**\n{value}"},
}
def _card_note(content: str) -> dict:
return {"tag": "note", "elements": [{"tag": "plain_text", "content": content}]}
def _push_paper_card(event_type: str, symbol: str, title: str, template: str, fields: list[tuple[str, str]], note: str = "", event_time: str = "") -> None:
try:
symbol = str(trade.get("symbol") or "")
title = {
"open": f"📒 模拟交易开仓 — {symbol.replace('/USDT', '')}",
"close": f"🏁 模拟交易平仓 — {symbol.replace('/USDT', '')}",
"trailing_activate": f"🛡️ 模拟交易移动止盈启动 — {symbol.replace('/USDT', '')}",
"trailing_move": f"🛡️ 模拟交易移动止盈上移 — {symbol.replace('/USDT', '')}",
}.get(event_type)
if not title:
return
card = {
"metadata": {"source": "paper_trading", "event_type": event_type},
elements = []
if fields:
elements.append({"tag": "column_set", "flex_mode": "none", "background_style": "default", "columns": [
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field(label, value)]}
for label, value in fields
]})
if note:
elements.append(_card_note(note))
if event_time:
elements.append(_card_note(f"时间: {event_time}"))
push_card({
"metadata": {"source": "paper_trading", "event_type": event_type, "symbol": symbol},
"config": {"wide_screen_mode": True},
"header": {
"template": "blue" if event_type == "open" else ("yellow" if event_type.startswith("trailing") else "red"),
"template": template,
"title": {"tag": "plain_text", "content": title},
},
"elements": [
{
"tag": "div",
"text": {
"tag": "lark_md",
"content": (
f"**币种**: {symbol}\n"
f"**事件**: {event_type}\n"
f"**成交价**: ${result.get('entry_price') or result.get('exit_price') or result.get('trailing_stop') or trade.get('current_price') or 0}\n"
f"**时间**: {event_time or ''}"
),
},
}
],
}
push_card(card)
"elements": elements,
})
except Exception:
pass
def _open_trade(conn, rec: dict, current_price: float, event_time: str) -> dict:
def _push_event_card(event_type: str, trade: dict, result: dict, event_time: str = "") -> None:
symbol = str(trade.get("symbol") or "")
short_symbol = symbol.replace("/USDT", "")
if event_type == "open":
_push_paper_card(
event_type,
symbol,
f"模拟交易开仓 - {short_symbol}",
"blue",
[
("成交价", _fmt_price(result.get("entry_price"))),
("名义仓位", f"{_safe_float(result.get('notional_usdt')):.2f} USDT"),
("杠杆/保证金", f"{_safe_float(result.get('leverage')):.1f}x / {_safe_float(result.get('margin_usdt')):.2f} USDT"),
],
"仅用于策略收益验证,不代表真实成交。",
event_time,
)
return
if event_type == "close":
_push_paper_card(
event_type,
symbol,
f"模拟交易平仓 - {short_symbol}",
"red" if _safe_float(result.get("pnl_usdt")) < 0 else "green",
[
("退出价", _fmt_price(result.get("exit_price"))),
("收益率", _fmt_pct(result.get("pnl_pct"))),
("收益额", f"{_safe_float(result.get('pnl_usdt')):.2f} USDT"),
("原因", result.get("exit_reason") or "--"),
],
"收益只来自 paper_trades 模拟账本。",
event_time,
)
return
if event_type.startswith("trailing"):
_push_paper_card(
event_type,
symbol,
f"模拟交易移动止盈{'启动' if event_type == 'trailing_activate' else '上移'} - {short_symbol}",
"yellow",
[
("保护价", _fmt_price(result.get("trailing_stop"))),
("当前收益", _fmt_pct(result.get("pnl_pct"))),
("动作", "启动保护" if event_type == "trailing_activate" else "上移保护价"),
],
"移动止盈用于锁定浮盈,仍以模拟账本为准。",
event_time,
)
def _push_order_created_card(order: dict, event_time: str = "") -> None:
symbol = str(order.get("symbol") or "")
target = _safe_float(order.get("target_price"))
current = _safe_float(order.get("current_price_at_create"))
distance = round((current / target - 1) * 100, 2) if target and current else 0
_push_paper_card(
"paper_order_create",
symbol,
f"模拟挂单创建 - {symbol.replace('/USDT', '')}",
"wathet",
[
("目标价", _fmt_price(target)),
("当前价", _fmt_price(current)),
("距目标", _fmt_pct(distance)),
("有效期", order.get("expires_at") or "--"),
],
"等回踩机会已进入模拟挂单,触价后才会进入模拟持仓。",
event_time,
)
def _push_order_filled_card(order: dict, result: dict, event_time: str = "") -> None:
symbol = str(order.get("symbol") or "")
_push_paper_card(
"paper_order_fill",
symbol,
f"模拟挂单成交并开仓 - {symbol.replace('/USDT', '')}",
"green",
[
("挂单价", _fmt_price(order.get("target_price"))),
("成交价", _fmt_price(result.get("entry_price") or order.get("fill_price"))),
("名义仓位", f"{_safe_float(result.get('notional_usdt')):.2f} USDT"),
("来源", order.get("source_status") or "wait_pullback"),
],
"价格触达理想入场位,模拟挂单已转为 paper trade。",
event_time,
)
def _open_trade(conn, rec: dict, current_price: float, event_time: str, config: dict | None = None, push_open_card: bool = True) -> dict:
cfg = _paper_cfg(config)
rec_id = _safe_int(rec.get("id"))
symbol = str(rec.get("symbol") or "").strip().upper()
plan = _entry_plan(rec)
entry_price = _open_price(current_price)
notional = default_notional_usdt()
leverage = default_leverage()
margin = default_margin_usdt()
entry_price = _open_price(current_price, cfg)
notional = default_notional_usdt(cfg)
leverage = default_leverage(cfg)
margin = default_margin_usdt(cfg)
qty = round(notional / entry_price, 12) if entry_price > 0 else 0
stop_loss = _safe_float(rec.get("stop_loss") or plan.get("stop_loss"))
tp1 = _safe_float(rec.get("tp1") or plan.get("tp1") or plan.get("take_profit_1"))
tp2 = _safe_float(rec.get("tp2") or plan.get("tp2") or plan.get("take_profit_2"))
fee = round(notional * default_fee_rate(), 8)
fee = round(notional * default_fee_rate(cfg), 8)
now = event_time or _now()
row = conn.execute(
"""
@ -285,14 +393,13 @@ def _open_trade(conn, rec: dict, current_price: float, event_time: str) -> dict:
"leverage": leverage,
"qty": qty,
"fee_usdt": fee,
"slippage_pct": default_slippage_pct(),
"slippage_pct": default_slippage_pct(cfg),
"source_status": rec.get("execution_status") or "",
"source_action": rec.get("action_status") or "",
},
now,
)
_push_event_card("open", {"symbol": symbol}, {"entry_price": entry_price}, now)
return {
result = {
"opened": True,
"trade_id": trade_id,
"entry_price": entry_price,
@ -301,6 +408,262 @@ def _open_trade(conn, rec: dict, current_price: float, event_time: str) -> dict:
"margin_usdt": margin,
"leverage": leverage,
}
if push_open_card:
_push_event_card("open", {"symbol": symbol}, result, now)
return result
def _is_wait_pullback(rec: dict) -> bool:
plan = _entry_plan(rec)
execution_status = str(rec.get("execution_status") or "").strip()
action_status = str(rec.get("action_status") or "").strip()
entry_action = str(plan.get("entry_action") or "").strip()
return execution_status == "wait_pullback" or action_status == "等回踩" or entry_action == "等回踩"
def _paper_order_target_price(rec: dict) -> float:
plan = _entry_plan(rec)
return _safe_float(
plan.get("limit_price")
or plan.get("order_price")
or plan.get("entry_price")
or rec.get("entry_price")
)
def _paper_order_expires_at(event_time: str, config: dict | None = None) -> str:
cfg = _paper_cfg(config)
hours = max(0.25, _safe_float(cfg.get("order_expire_hours"), 24.0))
base = _parse_time(event_time) or datetime.now()
return (base + timedelta(hours=hours)).isoformat()
def _paper_order_touched(order: dict, current_price: float) -> bool:
side = str(order.get("side") or "long").lower()
target = _safe_float(order.get("target_price"))
if target <= 0 or current_price <= 0:
return False
if side == "short":
return current_price >= target
return current_price <= target
def _paper_order_too_far(order: dict, current_price: float, config: dict | None = None) -> bool:
cfg = _paper_cfg(config)
threshold_pct = max(0.0, _safe_float(cfg.get("order_cancel_far_from_entry_pct"), 12.0))
if threshold_pct <= 0:
return False
side = str(order.get("side") or "long").lower()
target = _safe_float(order.get("target_price"))
if target <= 0 or current_price <= 0:
return False
if side == "short":
return current_price < target * (1 - threshold_pct / 100)
return current_price > target * (1 + threshold_pct / 100)
def _cancel_paper_order(conn, order: dict, reason: str, event_time: str) -> dict:
conn.execute(
"""
UPDATE paper_orders
SET status='canceled', cancel_reason=%s, canceled_at=%s, updated_at=%s
WHERE id=%s AND status='pending'
""",
(reason, event_time, event_time, order["id"]),
)
return {"skipped": True, "reason": f"paper_order_{reason}", "paper_order_id": order["id"]}
def _order_recommendation_cancel_reason(conn, rec: dict, order: dict) -> str:
rec_id = _safe_int(rec.get("id") or order.get("recommendation_id"))
if rec_id <= 0:
return ""
row = conn.execute(
"""
SELECT status, execution_status, lifecycle_state, display_bucket
FROM recommendation
WHERE id=%s
""",
(rec_id,),
).fetchone()
if not row:
return "recommendation_missing"
row = dict(row)
status = str(row.get("status") or "").strip().lower()
execution_status = str(row.get("execution_status") or "").strip().lower()
lifecycle_state = str(row.get("lifecycle_state") or "").strip().lower()
display_bucket = str(row.get("display_bucket") or "").strip().lower()
if status in {"expired", "invalid", "archived", "stopped_out", "closed"}:
return "recommendation_invalid"
if execution_status in {"invalid", "expired", "archived", "stopped_out"}:
return "recommendation_invalid"
if lifecycle_state in {"invalid", "expired", "archived", "stopped_out"}:
return "recommendation_invalid"
if display_bucket in {"history", "archive", "archived"}:
return "recommendation_invalid"
return ""
def _order_payload_from_rec(rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
cfg = _paper_cfg(config)
plan = _entry_plan(rec)
return {
"recommendation_id": _safe_int(rec.get("id")),
"symbol": str(rec.get("symbol") or "").strip().upper(),
"side": str(plan.get("side") or rec.get("side") or "long").strip().lower() or "long",
"order_type": "limit",
"status": "pending",
"source_status": str(rec.get("execution_status") or ""),
"source_action": str(rec.get("action_status") or plan.get("entry_action") or ""),
"target_price": _paper_order_target_price(rec),
"current_price_at_create": current_price,
"notional_usdt": default_notional_usdt(cfg),
"stop_loss": _safe_float(rec.get("stop_loss") or plan.get("stop_loss")),
"tp1": _safe_float(rec.get("tp1") or plan.get("tp1") or plan.get("take_profit_1")),
"tp2": _safe_float(rec.get("tp2") or plan.get("tp2") or plan.get("take_profit_2")),
"strategy_version": str(rec.get("strategy_version") or ""),
"entry_plan_snapshot_json": json.dumps(plan, ensure_ascii=False, default=str),
"created_at": event_time,
"updated_at": event_time,
"expires_at": _paper_order_expires_at(event_time, cfg),
}
def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
fill_price = _safe_float(order.get("target_price")) or current_price
trade_rec = dict(rec)
plan = _entry_plan(trade_rec)
plan.setdefault("entry_price", fill_price)
trade_rec["entry_plan"] = plan
trade_rec["entry_price"] = fill_price
result = _open_trade(conn, trade_rec, fill_price, event_time, config=config, push_open_card=False)
if result.get("opened"):
order = {**order, "fill_price": fill_price}
conn.execute(
"""
UPDATE paper_orders
SET status='filled', fill_price=%s, filled_at=%s, updated_at=%s
WHERE id=%s
""",
(fill_price, event_time, event_time, order["id"]),
)
result["paper_order"] = {"filled": True, "order_id": order["id"], "fill_price": fill_price}
_push_order_filled_card(order, result, event_time)
stop_loss = _safe_float(rec.get("stop_loss") or _entry_plan(rec).get("stop_loss") or order.get("stop_loss"))
if stop_loss > 0 and current_price <= stop_loss:
trade = conn.execute("SELECT * FROM paper_trades WHERE id=%s", (result["trade_id"],)).fetchone()
if trade:
close_result = _close_trade(conn, dict(trade), current_price, "stop_loss_same_tick", event_time)
result.update({
"closed": True,
"exit_reason": close_result.get("exit_reason"),
"pnl_pct": close_result.get("pnl_pct"),
"pnl_usdt": close_result.get("pnl_usdt"),
"same_tick_stop_loss": True,
})
return result
if result.get("reason") == "already_exists":
conn.execute(
"""
UPDATE paper_orders
SET status='filled', fill_price=%s, filled_at=%s, updated_at=%s
WHERE id=%s
""",
(fill_price, event_time, event_time, order["id"]),
)
result["paper_order"] = {"filled": False, "order_id": order["id"], "fill_price": fill_price}
return result
def _sync_wait_pullback_order(conn, rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
cfg = _paper_cfg(config)
rec_id = _safe_int(rec.get("id"))
order = conn.execute("SELECT * FROM paper_orders WHERE recommendation_id=%s", (rec_id,)).fetchone()
if order:
order = dict(order)
if order.get("status") != "pending":
return {"skipped": True, "reason": f"paper_order_{order.get('status')}", "paper_order_id": order.get("id")}
cancel_reason = _order_recommendation_cancel_reason(conn, rec, order)
if cancel_reason:
return _cancel_paper_order(conn, order, cancel_reason, event_time)
if _paper_order_touched(order, current_price):
return _fill_paper_order(conn, order, rec, current_price, event_time, cfg)
expires_at = _parse_time(order.get("expires_at"))
now = _parse_time(event_time) or datetime.now()
if expires_at and now > expires_at:
conn.execute(
"""
UPDATE paper_orders
SET status='expired', cancel_reason='expired', canceled_at=%s, updated_at=%s
WHERE id=%s
""",
(event_time, event_time, order["id"]),
)
return {"skipped": True, "reason": "paper_order_expired", "paper_order_id": order["id"]}
if _paper_order_too_far(order, current_price, cfg):
return _cancel_paper_order(conn, order, "too_far_from_entry", event_time)
conn.execute("UPDATE paper_orders SET updated_at=%s WHERE id=%s", (event_time, order["id"]))
return {
"skipped": True,
"reason": "paper_order_pending",
"paper_order_id": order["id"],
"target_price": order.get("target_price"),
"current_price": current_price,
}
payload = _order_payload_from_rec(rec, current_price, event_time, cfg)
if payload["recommendation_id"] <= 0 or not payload["symbol"] or payload["target_price"] <= 0:
return {"skipped": True, "reason": "invalid_paper_order"}
row = conn.execute(
"""
INSERT INTO paper_orders (
recommendation_id, symbol, side, order_type, status,
source_status, source_action, target_price, current_price_at_create,
notional_usdt, stop_loss, tp1, tp2, strategy_version,
entry_plan_snapshot_json, created_at, updated_at, expires_at
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT(recommendation_id) DO NOTHING
RETURNING id
""",
(
payload["recommendation_id"],
payload["symbol"],
payload["side"],
payload["order_type"],
payload["status"],
payload["source_status"],
payload["source_action"],
payload["target_price"],
payload["current_price_at_create"],
payload["notional_usdt"],
payload["stop_loss"],
payload["tp1"],
payload["tp2"],
payload["strategy_version"],
payload["entry_plan_snapshot_json"],
payload["created_at"],
payload["updated_at"],
payload["expires_at"],
),
).fetchone()
order_id = row["id"] if row else None
order = {"id": order_id, **payload}
if _paper_order_touched(order, current_price):
return _fill_paper_order(conn, order, rec, current_price, event_time, cfg)
cancel_reason = _order_recommendation_cancel_reason(conn, rec, order)
if cancel_reason:
return _cancel_paper_order(conn, order, cancel_reason, event_time)
if _paper_order_too_far(order, current_price, cfg):
return _cancel_paper_order(conn, order, "too_far_from_entry", event_time)
result = {
"skipped": True,
"reason": "paper_order_created",
"paper_order_id": order_id,
"target_price": payload["target_price"],
"current_price": current_price,
}
_push_order_created_card(order, event_time)
return result
def _close_trade(conn, trade: dict, current_price: float, reason: str, event_time: str) -> dict:
@ -353,7 +716,12 @@ def _close_trade(conn, trade: dict, current_price: float, reason: str, event_tim
{"realized_pnl_usdt": pnl_usdt, "fee_usdt": total_fee},
now,
)
_push_event_card("close", trade, {"exit_price": exit_price}, now)
_push_event_card(
"close",
trade,
{"exit_price": exit_price, "exit_reason": reason, "pnl_pct": pnl_pct, "pnl_usdt": pnl_usdt},
now,
)
return {"closed": True, "trade_id": trade["id"], "exit_reason": reason, "pnl_pct": pnl_pct, "pnl_usdt": pnl_usdt}
@ -469,6 +837,7 @@ def sync_recommendation(rec: dict, current_price: float, event_time: str = "") -
execution_status = str(rec.get("execution_status") or "").strip()
action_status = str(rec.get("action_status") or "").strip()
event_time = event_time or _now()
cfg = paper_trading_config()
conn = get_conn()
try:
@ -479,13 +848,15 @@ def sync_recommendation(rec: dict, current_price: float, event_time: str = "") -
result = _update_open_trade(conn, trade, current_price, event_time)
conn.commit()
return result
conn.close()
return {"skipped": True, "reason": "already_closed", "trade_id": trade.get("id")}
if execution_status != "buy_now" and action_status != "可即刻买入":
conn.close()
if _is_wait_pullback(rec):
result = _sync_wait_pullback_order(conn, rec, current_price, event_time, cfg)
conn.commit()
return result
return {"skipped": True, "reason": "not_buy_now"}
result = _open_trade(conn, rec, current_price, event_time)
result = _open_trade(conn, rec, current_price, event_time, config=cfg)
conn.commit()
return result
except Exception:
@ -511,6 +882,7 @@ def get_paper_trading_summary(days: int = 30) -> dict:
""",
(cutoff,),
).fetchall()
pending_order_count = conn.execute("SELECT COUNT(*) FROM paper_orders WHERE status='pending'").fetchone()[0]
finally:
conn.close()
cfg = paper_trading_config()
@ -535,6 +907,7 @@ def get_paper_trading_summary(days: int = 30) -> dict:
"closed_count": len(closed_items),
"win_count": len(wins),
"loss_count": len(losses),
"pending_order_count": int(pending_order_count or 0),
"win_rate": round(len(wins) / len(closed_items) * 100, 2) if closed_items else 0,
"realized_pnl_usdt": total_realized,
"avg_realized_pnl_pct": avg_realized_pct,
@ -593,6 +966,51 @@ def list_paper_trades(limit: int = 50, offset: int = 0, status: str = "") -> dic
}
def list_paper_orders(limit: int = 50, offset: int = 0, status: str = "") -> dict:
limit = max(1, min(_safe_int(limit, 50), 200))
offset = max(0, _safe_int(offset, 0))
status = str(status or "").strip()
where = ""
params = []
if status in {"pending", "filled", "canceled", "expired", "rejected"}:
where = "WHERE status=%s"
params.append(status)
conn = get_conn()
try:
total = conn.execute(f"SELECT COUNT(*) FROM paper_orders {where}", tuple(params)).fetchone()[0]
rows = conn.execute(
f"""
SELECT po.*, lpc.price AS latest_market_price, lpc.updated_at AS latest_market_price_updated_at
FROM paper_orders po
LEFT JOIN latest_price_cache lpc ON lpc.symbol = po.symbol
{where}
ORDER BY po.created_at DESC, po.id DESC
LIMIT %s OFFSET %s
""",
tuple(params + [limit, offset]),
).fetchall()
finally:
conn.close()
items = []
for row in rows:
item = dict(row)
item["entry_plan_snapshot"] = _loads_json(item.pop("entry_plan_snapshot_json", "{}"), {})
latest_market = _safe_float(item.get("latest_market_price"))
item["latest_price"] = latest_market if latest_market > 0 else _safe_float(item.get("current_price_at_create"))
item["latest_price_updated_at"] = item.get("latest_market_price_updated_at") or item.get("updated_at") or ""
target = _safe_float(item.get("target_price"))
latest = _safe_float(item.get("latest_price"))
item["distance_to_target_pct"] = round((latest / target - 1) * 100, 4) if target and latest else 0
items.append(item)
return {
"items": items,
"total": int(total or 0),
"limit": limit,
"offset": offset,
"has_more": offset + len(items) < int(total or 0),
}
def list_paper_trade_events(limit: int = 80, offset: int = 0, symbol: str = "", event_type: str = "") -> dict:
limit = max(1, min(_safe_int(limit, 80), 200))
offset = max(0, _safe_int(offset, 0))

View File

@ -77,6 +77,32 @@ def _load_open_paper_trade_recs() -> list[dict]:
conn.close()
def _load_pending_paper_order_recs() -> list[dict]:
conn = get_conn()
try:
rows = conn.execute(
"""
SELECT
po.recommendation_id AS id,
po.symbol,
po.target_price AS entry_price,
po.stop_loss,
po.tp1,
po.tp2,
po.source_status AS execution_status,
po.source_action AS action_status,
po.strategy_version,
po.entry_plan_snapshot_json AS entry_plan_json
FROM paper_orders po
WHERE po.status='pending'
ORDER BY po.created_at DESC, po.id DESC
"""
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def load_stream_targets(limit: int | None = None, cfg: dict | None = None) -> dict[str, dict]:
"""Return symbol -> recommendation-like payload for websocket updates."""
cfg = cfg or price_streamer_config()
@ -95,6 +121,11 @@ def load_stream_targets(limit: int | None = None, cfg: dict | None = None) -> di
if symbol:
targets.setdefault(symbol, rec)
for rec in _load_pending_paper_order_recs():
symbol = str(rec.get("symbol") or "").strip().upper()
if symbol:
targets.setdefault(symbol, rec)
return dict(list(targets.items())[:max_symbols])

View File

@ -1,6 +1,6 @@
from fastapi import APIRouter, Cookie
from app.db.paper_trading import get_paper_trading_summary, list_paper_trade_events, list_paper_trades
from app.db.paper_trading import get_paper_trading_summary, list_paper_orders, list_paper_trade_events, list_paper_trades
from app.web.shared import require_admin
@ -24,6 +24,17 @@ async def api_paper_trading_trades(
return list_paper_trades(limit=limit, offset=offset, status=status)
@router.get("/api/paper-trading/orders")
async def api_paper_trading_orders(
limit: int = 50,
offset: int = 0,
status: str = "",
altcoin_session: str = Cookie(default=""),
):
require_admin(altcoin_session)
return list_paper_orders(limit=limit, offset=offset, status=status)
@router.get("/api/paper-trading/events")
async def api_paper_trading_events(
limit: int = 80,

File diff suppressed because one or more lines are too long

View File

@ -74,6 +74,7 @@ _ID_TABLES = {
"screening_log",
"recommendation",
"price_tracking",
"paper_orders",
"paper_trades",
"paper_trade_events",
"cron_run_log",

View File

@ -3,7 +3,7 @@ import json
import pytest
from app.db import altcoin_db
from app.db.paper_trading import get_paper_trading_summary, list_paper_trade_events, list_paper_trades, sync_recommendation
from app.db.paper_trading import get_paper_trading_summary, list_paper_orders, list_paper_trade_events, list_paper_trades, sync_recommendation
@pytest.fixture
@ -124,6 +124,255 @@ def test_observation_does_not_open_paper_trade(monkeypatch):
assert list_paper_trades()["total"] == 0
def test_wait_pullback_creates_pending_paper_order(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_NOTIONAL_USDT", "100")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_FEE_RATE", "0")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_SLIPPAGE_PCT", "0")
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="WAIT/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
tp2=112,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105},
)
with altcoin_db.get_conn() as conn:
conn.execute(
"UPDATE recommendation SET execution_status='wait_pullback', action_status='等回踩', display_bucket='watch_pool' WHERE id=%s",
(rec_id,),
)
conn.commit()
rec = {"id": rec_id, "symbol": "WAIT/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "tp2": 112, "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105}}
result = sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00")
assert result["skipped"] is True
assert result["reason"] == "paper_order_created"
assert result["target_price"] == pytest.approx(95)
assert list_paper_trades()["total"] == 0
orders = list_paper_orders()["items"]
assert len(orders) == 1
assert orders[0]["status"] == "pending"
assert orders[0]["symbol"] == "WAIT/USDT"
def test_wait_pullback_paper_order_pushes_created_card(monkeypatch):
pushed = []
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
monkeypatch.setattr("app.db.paper_trading.push_card", lambda card: pushed.append(card) or (True, {"StatusCode": 0}))
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="PUSHORD/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95},
)
rec = {"id": rec_id, "symbol": "PUSHORD/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "entry_plan": {"entry_action": "等回踩", "entry_price": 95}}
sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00")
assert pushed
card = pushed[0]
assert card["metadata"]["event_type"] == "paper_order_create"
assert "模拟挂单创建" in card["header"]["title"]["content"]
assert card["elements"][0]["tag"] == "column_set"
def test_wait_pullback_paper_order_fills_when_price_touches(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_NOTIONAL_USDT", "100")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_FEE_RATE", "0")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_SLIPPAGE_PCT", "0")
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="FILL/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
tp2=112,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105},
)
with altcoin_db.get_conn() as conn:
conn.execute(
"UPDATE recommendation SET execution_status='wait_pullback', action_status='等回踩', display_bucket='watch_pool' WHERE id=%s",
(rec_id,),
)
conn.commit()
rec = {"id": rec_id, "symbol": "FILL/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "tp2": 112, "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105}}
created = sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00")
filled = sync_recommendation(rec, 94.9, event_time="2026-05-16T10:05:00")
assert created["reason"] == "paper_order_created"
assert filled["opened"] is True
assert filled["paper_order"]["filled"] is True
trade = list_paper_trades()["items"][0]
assert trade["entry_price"] == pytest.approx(95)
order = list_paper_orders(status="filled")["items"][0]
assert order["fill_price"] == pytest.approx(95)
def test_wait_pullback_order_cancels_when_recommendation_invalid(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="CANCEL/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105},
)
rec = {"id": rec_id, "symbol": "CANCEL/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105}}
sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00")
with altcoin_db.get_conn() as conn:
conn.execute("UPDATE recommendation SET status='invalid', execution_status='invalid' WHERE id=%s", (rec_id,))
conn.commit()
result = sync_recommendation(rec, 100, event_time="2026-05-16T10:05:00")
assert result["reason"] == "paper_order_recommendation_invalid"
order = list_paper_orders(status="canceled")["items"][0]
assert order["cancel_reason"] == "recommendation_invalid"
def test_wait_pullback_order_cancels_when_price_runs_too_far(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="FAR/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105},
)
rec = {"id": rec_id, "symbol": "FAR/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105}}
sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00")
result = sync_recommendation(rec, 108, event_time="2026-05-16T10:05:00")
assert result["reason"] == "paper_order_too_far_from_entry"
order = list_paper_orders(status="canceled")["items"][0]
assert order["cancel_reason"] == "too_far_from_entry"
assert list_paper_trades()["total"] == 0
def test_wait_pullback_order_fills_before_same_tick_stop_loss(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_FEE_RATE", "0")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_SLIPPAGE_PCT", "0")
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="GAP/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105},
)
rec = {"id": rec_id, "symbol": "GAP/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105}}
sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00")
result = sync_recommendation(rec, 89, event_time="2026-05-16T10:05:00")
assert result["opened"] is True
assert result["closed"] is True
assert result["same_tick_stop_loss"] is True
assert result["exit_reason"] == "stop_loss_same_tick"
order = list_paper_orders(status="filled")["items"][0]
assert order["fill_price"] == pytest.approx(95)
trade = list_paper_trades()["items"][0]
assert trade["status"] == "closed"
assert trade["entry_price"] == pytest.approx(95)
assert trade["exit_price"] == pytest.approx(89)
def test_wait_pullback_paper_order_fill_pushes_single_combined_card(monkeypatch):
pushed = []
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_FEE_RATE", "0")
monkeypatch.setenv("ALPHAX_PAPER_TRADE_SLIPPAGE_PCT", "0")
monkeypatch.setattr("app.db.paper_trading.push_card", lambda card: pushed.append(card) or (True, {"StatusCode": 0}))
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="FILLPUSH/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105},
)
rec = {"id": rec_id, "symbol": "FILLPUSH/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105}}
sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00")
sync_recommendation(rec, 94.9, event_time="2026-05-16T10:05:00")
event_types = [card["metadata"]["event_type"] for card in pushed]
assert event_types == ["paper_order_create", "paper_order_fill"]
assert "模拟挂单成交并开仓" in pushed[1]["header"]["title"]["content"]
assert "open" not in event_types
def test_paper_trade_open_push_card_is_structured(monkeypatch, buy_now_rec):
pushed = []
monkeypatch.setattr("app.db.paper_trading.push_card", lambda card: pushed.append(card) or (True, {"StatusCode": 0}))
sync_recommendation(buy_now_rec, 100, event_time="2026-05-16T10:00:00")
assert pushed
card = pushed[0]
assert card["metadata"]["event_type"] == "open"
assert "模拟交易开仓" in card["header"]["title"]["content"]
assert card["elements"][0]["tag"] == "column_set"
def test_summary_counts_pending_paper_orders(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="COUNT/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95},
)
with altcoin_db.get_conn() as conn:
conn.execute(
"UPDATE recommendation SET execution_status='wait_pullback', action_status='等回踩', display_bucket='watch_pool' WHERE id=%s",
(rec_id,),
)
conn.commit()
rec = {"id": rec_id, "symbol": "COUNT/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "entry_plan": {"entry_action": "等回踩", "entry_price": 95}}
sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00")
assert get_paper_trading_summary(days=30)["pending_order_count"] == 1
def test_open_paper_trade_closes_on_tp1_and_summary_counts_win(buy_now_rec):
sync_recommendation(buy_now_rec, 100, event_time="2026-05-16T10:00:00")
result = sync_recommendation(buy_now_rec, 106, event_time="2026-05-16T10:05:00")

View File

@ -1,7 +1,7 @@
import pytest
from app.db import altcoin_db
from app.db.paper_trading import list_paper_trades, sync_recommendation
from app.db.paper_trading import list_paper_orders, list_paper_trades, sync_recommendation
from app.db.runtime_config_db import set_config
from app.services import price_streamer
@ -93,6 +93,51 @@ def test_price_streamer_tracks_open_paper_trade_without_active_rec(buy_now_rec):
assert targets["WS/USDT"]["id"] == buy_now_rec["id"]
def test_price_streamer_fills_pending_paper_order():
set_config("system", "paper_trading", {
"enabled": True,
"trade_notional_usdt": 5000,
"trade_leverage": 5,
"fee_rate": 0,
"slippage_pct": 0,
})
set_config("system", "price_streamer", {
"enabled": True,
"update_latest_price_cache": True,
"sync_paper_trading": True,
"include_actionable_recommendations": True,
"include_open_paper_trades": True,
})
altcoin_db.init_db()
rec_id = altcoin_db.create_recommendation(
symbol="PBO/USDT",
rec_state="蓄力",
rec_score=22,
entry_price=95,
stop_loss=90,
tp1=105,
signals=["等待回踩"],
entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105},
)
with altcoin_db.get_conn() as conn:
conn.execute(
"UPDATE recommendation SET execution_status='wait_pullback', action_status='等回踩', display_bucket='watch_pool' WHERE id=%s",
(rec_id,),
)
conn.commit()
rec = {"id": rec_id, "symbol": "PBO/USDT", "execution_status": "wait_pullback", "action_status": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105}}
created = price_streamer.handle_price_tick("PBO/USDT", 100, {"PBO/USDT": rec}, event_time="2026-05-16T10:00:00")
targets = price_streamer.load_stream_targets()
filled = price_streamer.handle_price_tick("PBO/USDT", 94.9, targets, event_time="2026-05-16T10:05:00")
assert created["paper_trading"]["reason"] == "paper_order_created"
assert "PBO/USDT" in targets
assert filled["paper_trading"]["opened"] is True
assert list_paper_orders(status="filled")["total"] == 1
assert list_paper_trades()["items"][0]["entry_price"] == pytest.approx(95)
def test_price_streamer_builds_binance_combined_stream_url():
url = price_streamer._stream_url(["BTC/USDT", "ETH/USDT"], {"stream_url": "wss://example.test/stream"})