alphax/app/db/paper_trading.py
2026-06-04 10:11:35 +08:00

2330 lines
99 KiB
Python

"""Paper trading ledger for separating signal quality from trade PnL."""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta
from app.config.system_config import paper_trading_config
from app.core.global_risk import evaluate_global_risk
from app.core.order_lifecycle import (
evaluate_limit_order,
order_distance_pct,
order_expires_at,
order_rr,
)
from app.core.position_health import evaluate_position_health
from app.core.trailing_stop import evaluate_trailing_stop, normalize_trailing_config
from app.core.trade_math import (
close_price as side_close_price,
normalize_side,
open_price as side_open_price,
pnl_pct as side_pnl_pct,
should_hit_trailing_stop,
should_stop_loss,
should_take_profit,
stop_loss_distance_pct as side_stop_loss_distance_pct,
tighter_stop,
)
from app.core.strategy_registry import normalize_strategy_code, strategy_label, strategy_paper_config
from app.db.schema import get_conn
from app.db.system_logs import record_system_error
from app.integrations.feishu_push import push_card
def _now() -> str:
return datetime.now().isoformat()
def _safe_float(value, default: float = 0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except Exception:
return default
def _safe_int(value, default: int = 0) -> int:
try:
return int(value or 0)
except Exception:
return default
def paper_trading_enabled() -> bool:
    """Return whether paper trading is enabled in the system config (default ``True``)."""
    settings = paper_trading_config()
    flag = settings.get("enabled", True)
    return bool(flag)
def _paper_cfg(config: dict | None = None) -> dict:
return config if isinstance(config, dict) else paper_trading_config()
def default_account_equity_usdt(config: dict | None = None) -> float:
    """Return the ``account_equity_usdt`` setting (fallback 20000.0), floored at 1.0."""
    raw_equity = _paper_cfg(config).get("account_equity_usdt")
    equity = _safe_float(raw_equity, 20000.0)
    return max(1.0, equity)
def default_leverage(config: dict | None = None) -> float:
    """Return the ``trade_leverage`` setting (fallback 5.0), floored at 1.0."""
    raw_leverage = _paper_cfg(config).get("trade_leverage")
    leverage = _safe_float(raw_leverage, 5.0)
    return max(1.0, leverage)
def default_notional_usdt(config: dict | None = None) -> float:
    """Return the ``trade_notional_usdt`` setting (fallback 5000.0), floored at 1.0."""
    raw_notional = _paper_cfg(config).get("trade_notional_usdt")
    notional = _safe_float(raw_notional, 5000.0)
    return max(1.0, notional)
def default_margin_usdt(config: dict | None = None) -> float:
    """Return the default margin: default notional divided by default leverage, rounded to 8 places."""
    settings = _paper_cfg(config)
    notional = default_notional_usdt(settings)
    leverage = default_leverage(settings)
    return round(notional / leverage, 8)
def default_fee_rate(config: dict | None = None) -> float:
    """Return the ``fee_rate`` setting (fallback 0.001), never negative."""
    raw_rate = _paper_cfg(config).get("fee_rate")
    rate = _safe_float(raw_rate, 0.001)
    return max(0.0, rate)
def default_slippage_pct(config: dict | None = None) -> float:
    """Return the ``slippage_pct`` setting (fallback 0.05), never negative."""
    raw_slippage = _paper_cfg(config).get("slippage_pct")
    slippage = _safe_float(raw_slippage, 0.05)
    return max(0.0, slippage)
def max_cumulative_leverage(config: dict | None = None) -> float:
    """Return the ``max_cumulative_leverage`` setting (fallback 5.0), never negative."""
    raw_cap = _paper_cfg(config).get("max_cumulative_leverage")
    cap = _safe_float(raw_cap, 5.0)
    return max(0.0, cap)
def _cumulative_leverage_check(conn, additional_notional: float, config: dict | None = None, exclude_rec_id: int = 0) -> tuple[bool, dict]:
    """Check whether adding *additional_notional* keeps total leverage under the cap.

    Sums the notional of open trades and pending orders (optionally leaving
    out rows belonging to *exclude_rec_id*), adds the new notional and divides
    by account equity. A cap of 0 or less disables the check.

    Returns ``(allowed, detail)`` where *detail* holds the figures used.
    """
    cfg = _paper_cfg(config)
    equity = default_account_equity_usdt(cfg)
    cap = max_cumulative_leverage(cfg)
    if cap <= 0:
        return True, {"max_cumulative_leverage": cap, "disabled": True}

    excluded = _safe_int(exclude_rec_id)
    has_exclusion = excluded > 0
    extra_clause = " AND recommendation_id<>%s" if has_exclusion else ""
    params = (excluded,) if has_exclusion else ()

    def _total_notional(table: str, status: str) -> float:
        # Table and status are fixed literals supplied below; only the
        # recommendation id travels as a bound parameter.
        sql = f"SELECT COALESCE(SUM(notional_usdt),0) FROM {table} WHERE status='{status}'{extra_clause}"
        return _safe_float(conn.execute(sql, params).fetchone()[0])

    open_total = _total_notional("paper_trades", "open")
    pending_total = _total_notional("paper_orders", "pending")
    extra = max(0.0, _safe_float(additional_notional))
    projected_total = open_total + pending_total + extra
    if equity > 0:
        projected_leverage = projected_total / equity
    else:
        projected_leverage = 0

    detail = {
        "account_equity_usdt": equity,
        "open_notional_usdt": round(open_total, 8),
        "pending_notional_usdt": round(pending_total, 8),
        "additional_notional_usdt": round(extra, 8),
        "projected_notional_usdt": round(projected_total, 8),
        "projected_cumulative_leverage": round(projected_leverage, 6),
        "max_cumulative_leverage": cap,
    }
    # Small epsilon so a projection sitting exactly on the cap is accepted.
    allowed = projected_leverage <= cap + 1e-12
    return allowed, detail
def _portfolio_drawdown_check(conn, additional_notional: float, config: dict | None = None) -> tuple[bool, dict]:
    """Check that unrealized losses on open trades stay under the drawdown pause cap.

    The cap comes from ``max_account_drawdown_pause_pct``; 0 or less disables
    the check. Returns ``(allowed, detail)``.
    """
    cfg = _paper_cfg(config)
    pause_cap = max(0.0, _safe_float(cfg.get("max_account_drawdown_pause_pct"), 0))
    if pause_cap <= 0:
        return True, {"disabled": True, "max_account_drawdown_pause_pct": pause_cap}

    equity = default_account_equity_usdt(cfg)
    positions = conn.execute("SELECT notional_usdt, pnl_pct FROM paper_trades WHERE status='open'").fetchall()
    unrealized = 0
    for position in positions:
        unrealized += _safe_float(position["notional_usdt"]) * _safe_float(position["pnl_pct"]) / 100

    # Only losses count toward drawdown; gains clamp to zero.
    if equity > 0:
        drawdown_pct = abs(min(0.0, unrealized)) / equity * 100
    else:
        drawdown_pct = 0

    detail = {
        "account_equity_usdt": equity,
        "open_unrealized_pnl_usdt": round(unrealized, 8),
        "open_drawdown_pct": round(drawdown_pct, 6),
        "max_account_drawdown_pause_pct": pause_cap,
        "additional_notional_usdt": round(max(0.0, _safe_float(additional_notional)), 8),
    }
    allowed = drawdown_pct <= pause_cap + 1e-12
    return allowed, detail
def _weak_entries_check(conn, event_time: str, config: dict | None = None) -> tuple[bool, dict]:
    """Block new entries when the most recent trades all failed to show enough profit.

    Loads up to ``pause_after_weak_entries`` trades opened within the last
    ``weak_entry_window_hours`` and computes each one's best favourable move.
    Entries are refused only when that many samples exist and every one peaked
    below ``weak_entry_min_max_pnl_pct``. A limit of 0 disables the check.

    Returns ``(allowed, detail)``.
    """
    cfg = _paper_cfg(config)
    limit = max(0, _safe_int(cfg.get("pause_after_weak_entries"), 0))
    if limit <= 0:
        return True, {"disabled": True, "pause_after_weak_entries": limit}

    window_hours = max(0.1, _safe_float(cfg.get("weak_entry_window_hours"), 6.0))
    threshold = max(0.0, _safe_float(cfg.get("weak_entry_min_max_pnl_pct"), 1.0))
    reference = _parse_time(event_time or _now()) or datetime.now()
    since = (reference - timedelta(hours=window_hours)).isoformat()
    recent_trades = conn.execute(
        """
        SELECT symbol, side, opened_at, entry_price, max_price, min_price
        FROM paper_trades
        WHERE opened_at >= %s
        ORDER BY opened_at DESC, id DESC
        LIMIT %s
        """,
        (since, limit),
    ).fetchall()

    def _sample(trade_row) -> dict:
        # Best excursion: lowest price for shorts, highest for longs,
        # never worse than the entry itself.
        side = normalize_side(trade_row.get("side"))
        entry = _safe_float(trade_row["entry_price"])
        if side == "short":
            best = min(_safe_float(trade_row.get("min_price")) or entry, entry)
            peak_pnl = (entry / best - 1) * 100 if entry > 0 and best > 0 else 0
        else:
            best = max(_safe_float(trade_row.get("max_price")) or entry, entry)
            peak_pnl = (best / entry - 1) * 100 if entry > 0 else 0
        return {
            "symbol": trade_row["symbol"],
            "side": side,
            "opened_at": trade_row["opened_at"],
            "max_pnl_pct": round(peak_pnl, 6),
        }

    samples = [_sample(trade_row) for trade_row in recent_trades]
    should_pause = len(samples) >= limit and all(s["max_pnl_pct"] < threshold for s in samples)
    detail = {
        "pause_after_weak_entries": limit,
        "weak_entry_window_hours": window_hours,
        "weak_entry_min_max_pnl_pct": threshold,
        "samples": samples,
    }
    return not should_pause, detail
def _portfolio_entry_pause_check(conn, additional_notional: float, event_time: str, config: dict | None = None) -> tuple[bool, str, dict]:
    """Run the drawdown check, then the weak-entries check, stopping at the first failure.

    Returns ``(allowed, reason, detail)``; *reason* is empty when allowed.
    """
    drawdown_ok, drawdown_detail = _portfolio_drawdown_check(conn, additional_notional, config)
    if drawdown_ok:
        weak_ok, weak_detail = _weak_entries_check(conn, event_time, config)
        if weak_ok:
            return True, "", {"drawdown": drawdown_detail, "weak_entries": weak_detail}
        return False, "weak_entries_pause", {"weak_entries": weak_detail}
    return False, "portfolio_drawdown_pause", {"drawdown": drawdown_detail}
def _global_risk_entry_check(conn, rec: dict, additional_notional: float, config: dict | None = None) -> tuple[bool, dict]:
    """Evaluate global risk for *rec* and report whether new entries are allowed.

    Returns ``(allowed, detail)``; *allowed* is the detail's
    ``allow_new_entries`` flag, treated as true when absent.
    """
    risk = evaluate_global_risk(
        conn=conn,
        config=_paper_cfg(config),
        rec=rec,
        additional_notional=additional_notional,
    )
    allowed = bool(risk.get("allow_new_entries", True))
    return allowed, risk
def _market_risk_adjusted_notional(base_notional: float, risk_detail: dict | None, config: dict | None = None) -> float:
    """Scale *base_notional* by the risk detail's ``position_multiplier``.

    A missing or non-positive multiplier is treated as 1.0. The result never
    drops below ``min_trade_notional_usdt`` (itself at least 1.0) and is
    rounded to 8 places.
    """
    cfg = _paper_cfg(config)
    risk = risk_detail if isinstance(risk_detail, dict) else {}
    factor = _safe_float(risk.get("position_multiplier"), 1.0)
    if factor <= 0:
        factor = 1.0
    scaled = _safe_float(base_notional) * factor
    floor = max(1.0, _safe_float(cfg.get("min_trade_notional_usdt"), 1.0))
    return round(max(floor, scaled), 8)
def _trailing_config() -> dict:
    """Return the normalized trailing-stop config plus push throttling settings.

    Adds ``move_push_min_interval_seconds`` and ``move_push_min_step_pct``,
    both clamped to be non-negative, read from the paper trading config.
    """
    cfg = paper_trading_config()
    trailing = normalize_trailing_config(cfg)
    interval_seconds = _safe_int(cfg.get("trailing_move_push_min_interval_seconds"), 300)
    step_pct = _safe_float(cfg.get("trailing_move_push_min_step_pct"), 2.0)
    trailing["move_push_min_interval_seconds"] = max(0, interval_seconds)
    trailing["move_push_min_step_pct"] = max(0.0, step_pct)
    return trailing
def _parse_time(value: str) -> datetime | None:
if not value:
return None
try:
return datetime.fromisoformat(str(value).replace("Z", "+00:00"))
except Exception:
return None
def _should_emit_trailing_move(conn, trade: dict, new_trail: float, event_time: str, cfg: dict) -> bool:
    """Decide whether a trailing-stop move is worth recording or pushing.

    A move is emitted when any of these holds:

    * no earlier ``trailing_activate``/``trailing_move`` event exists;
    * at least ``move_push_min_interval_seconds`` passed since the last one;
    * the step threshold is disabled or the last stop price is unusable;
    * the stop moved in the trade's favour by ``move_push_min_step_pct``.

    The step is measured in the favourable direction for the trade's side:
    upward for longs, downward for shorts. Previously only upward moves
    counted, so short trades could never satisfy the step rule.
    """
    last = conn.execute(
        """
        SELECT event_time, price
        FROM paper_trade_events
        WHERE trade_id=%s AND event_type IN ('trailing_activate','trailing_move')
        ORDER BY event_time DESC, id DESC
        LIMIT 1
        """,
        (trade["id"],),
    ).fetchone()
    if not last:
        return True

    min_interval = _safe_int(cfg.get("move_push_min_interval_seconds"), 300)
    if min_interval > 0:
        current_ts = _parse_time(event_time or _now())
        last_ts = _parse_time(last["event_time"])
        if current_ts and last_ts:
            try:
                elapsed = (current_ts - last_ts).total_seconds()
            except TypeError:
                # Naive and aware timestamps cannot be subtracted; fall
                # through to the step rule instead of crashing.
                elapsed = None
            if elapsed is not None and elapsed >= min_interval:
                return True

    last_trail = _safe_float(last["price"])
    min_step_pct = _safe_float(cfg.get("move_push_min_step_pct"), 2.0)
    if min_step_pct <= 0 or last_trail <= 0:
        return True

    if normalize_side(trade.get("side")) == "short":
        step_pct = (1 - new_trail / last_trail) * 100
    else:
        step_pct = (new_trail / last_trail - 1) * 100
    return step_pct >= min_step_pct
def _loads_json(value, fallback=None):
try:
if isinstance(value, str) and value.strip():
return json.loads(value)
if value:
return value
except Exception:
pass
return fallback if fallback is not None else {}
def _entry_plan(rec: dict) -> dict:
plan = rec.get("entry_plan")
if isinstance(plan, dict):
return plan
return _loads_json(rec.get("entry_plan_json"), {})
def _strategy_code_from_rec(rec: dict) -> str:
    """Return the normalized strategy code from *rec*, falling back to its entry plan."""
    entry_plan = _entry_plan(rec)
    raw_code = rec.get("strategy_code") or entry_plan.get("strategy_code")
    return normalize_strategy_code(raw_code)
def _paper_cfg_for_rec(rec: dict, config: dict | None = None) -> dict:
    """Return a copy of the paper config overlaid with the rec's strategy-specific settings."""
    merged_cfg = dict(_paper_cfg(config) or {})
    strategy_code = _strategy_code_from_rec(rec)
    overrides = strategy_paper_config(strategy_code)
    merged_cfg.update(overrides)
    return merged_cfg
def _strategy_lineage_from_rec(rec: dict) -> dict:
    """Build the strategy lineage (code, name, signal id, snapshot, factor roles) for *rec*.

    When the recommendation carries no usable snapshot, a minimal
    compatibility snapshot is synthesised from the recommendation itself.
    A decoded snapshot that is not a dict (for example a JSON list) is treated
    as missing, so ``setdefault`` below cannot fail; this matches the
    isinstance guard in ``_strategy_lineage_from_trade_or_order``.

    Returns the lineage with both the decoded objects and their JSON forms.
    """
    code = _strategy_code_from_rec(rec)
    name = strategy_label(code)
    signal_id = _safe_int(rec.get("strategy_signal_id"))
    snapshot = _loads_json(rec.get("strategy_snapshot_json"), {})
    roles = _loads_json(rec.get("factor_roles_json"), {})
    if not isinstance(roles, dict):
        roles = {}
    if not isinstance(snapshot, dict) or not snapshot:
        snapshot = {
            "strategy_code": code,
            "strategy_name": name,
            "strategy_version": rec.get("strategy_version") or "",
            "symbol": rec.get("symbol") or "",
            "source": "recommendation_compat",
        }
    snapshot.setdefault("strategy_code", code)
    snapshot.setdefault("strategy_name", name)
    return {
        "strategy_code": code,
        "strategy_name": name,
        "strategy_signal_id": signal_id,
        "strategy_snapshot": snapshot,
        "factor_roles": roles,
        "strategy_snapshot_json": json.dumps(snapshot, ensure_ascii=False, default=str),
        "factor_roles_json": json.dumps(roles, ensure_ascii=False, default=str),
    }
def _rec_with_order_snapshot(rec: dict, order: dict, fill_price: float | None = None) -> dict:
    """Use the executable order snapshot as the trade-time source of truth.

    Returns a new recommendation dict whose side, prices, entry plan and
    strategy lineage prefer the values stored on *order*, falling back to the
    plan and then to *rec*. *fill_price*, when given, becomes the entry price;
    otherwise the order's target price is used.

    The entry plan is copied before merging, so the caller's
    ``rec["entry_plan"]`` dict is no longer mutated as a side effect.
    """
    order = order or {}
    merged = dict(rec or {})
    base_plan = _entry_plan(merged)
    plan = dict(base_plan) if isinstance(base_plan, dict) else {}
    order_plan = _loads_json(order.get("entry_plan_snapshot_json"), {})
    if isinstance(order_plan, dict) and order_plan:
        plan.update(order_plan)
    side = normalize_side(order.get("side") or plan.get("side") or merged.get("side"))
    entry_price = _safe_float(fill_price if fill_price is not None else order.get("target_price"))
    plan.update({
        "side": side,
        "entry_price": entry_price or _safe_float(plan.get("entry_price") or merged.get("entry_price")),
        "stop_loss": _safe_float(order.get("stop_loss") or plan.get("stop_loss") or merged.get("stop_loss")),
        "tp1": _safe_float(order.get("tp1") or plan.get("tp1") or plan.get("take_profit_1") or merged.get("tp1")),
        "tp2": _safe_float(order.get("tp2") or plan.get("tp2") or plan.get("take_profit_2") or merged.get("tp2")),
    })
    merged.update({
        "side": side,
        "entry_plan": plan,
        "entry_price": plan["entry_price"],
        "stop_loss": plan["stop_loss"],
        "tp1": plan["tp1"],
        "tp2": plan["tp2"],
        "strategy_version": order.get("strategy_version") or merged.get("strategy_version"),
        "strategy_code": order.get("strategy_code") or merged.get("strategy_code"),
        "strategy_signal_id": order.get("strategy_signal_id") or merged.get("strategy_signal_id"),
        "strategy_snapshot_json": order.get("strategy_snapshot_json") or merged.get("strategy_snapshot_json"),
        "factor_roles_json": order.get("factor_roles_json") or merged.get("factor_roles_json"),
        "paper_order_id": order.get("id"),
        "exclude_order_id": order.get("id"),
    })
    return merged
def _strategy_lineage_from_trade_or_order(item: dict) -> dict:
    """Extract strategy lineage fields from a stored trade or order row.

    Snapshot and factor roles are decoded from their JSON columns; anything
    that does not decode to a dict is replaced by an empty dict.
    """
    code = normalize_strategy_code(item.get("strategy_code"))
    decoded_snapshot = _loads_json(item.get("strategy_snapshot_json"), {})
    decoded_roles = _loads_json(item.get("factor_roles_json"), {})
    if not isinstance(decoded_snapshot, dict):
        decoded_snapshot = {}
    if not isinstance(decoded_roles, dict):
        decoded_roles = {}
    return {
        "strategy_code": code,
        "strategy_name": strategy_label(code),
        "strategy_signal_id": _safe_int(item.get("strategy_signal_id")),
        "strategy_snapshot": decoded_snapshot,
        "factor_roles": decoded_roles,
    }
def _parse_time(value: str):
try:
return datetime.fromisoformat(str(value or ""))
except Exception:
return None
def _open_price(current_price: float, config: dict | None = None, side: str = "long") -> float:
    """Return the entry fill price for *side* using the configured slippage."""
    slippage_pct = default_slippage_pct(config)
    return side_open_price(side, current_price, slippage_pct)
def _close_price(current_price: float, config: dict | None = None, side: str = "long") -> float:
    """Return the exit fill price for *side* using the configured slippage."""
    slippage_pct = default_slippage_pct(config)
    return side_close_price(side, current_price, slippage_pct)
def _trade_pnl_pct(entry_price: float, current_price: float, side: str = "long") -> float:
    """Return the side-aware PnL percentage between entry and current price."""
    pnl = side_pnl_pct(side, entry_price, current_price)
    return pnl
def _stop_loss_distance_pct(side: str, entry_price: float, stop_loss: float) -> float:
    """Return the side-aware distance from entry to stop loss, in percent."""
    distance = side_stop_loss_distance_pct(side, entry_price, stop_loss)
    return distance
def _stop_loss_leverage_risk_pct(side: str, entry_price: float, stop_loss: float, leverage: float) -> float:
    """Return stop-loss distance times leverage (leverage floored at 1.0), rounded to 6 places."""
    distance = _stop_loss_distance_pct(side, entry_price, stop_loss)
    effective_leverage = max(1.0, _safe_float(leverage, 1.0))
    return round(distance * effective_leverage, 6)
def _risk_adjusted_leverage(side: str, entry_price: float, stop_loss: float, leverage: float, config: dict | None = None) -> tuple[float, dict]:
    """Lower leverage so that hitting the stop loss stays within the risk cap.

    The base leverage is returned unchanged when the cap is disabled, the
    stop distance is unusable, the risk is already within the cap, dynamic
    leverage is switched off, or the allowed leverage would drop below
    ``dynamic_leverage_min``. Otherwise leverage is reduced to the allowed
    value, rounded to 4 places.

    Returns ``(leverage, detail)`` where *detail* records the calculation.
    """
    cfg = _paper_cfg(config)
    dynamic_enabled = bool(cfg.get("dynamic_leverage_enabled", True))
    base = max(1.0, _safe_float(leverage, 1.0))
    risk_cap = max(0.0, _safe_float(cfg.get("max_stop_loss_leverage_risk_pct"), 0))
    distance = _stop_loss_distance_pct(side, entry_price, stop_loss)
    base_risk = round(distance * base, 6)
    detail = {
        "dynamic_leverage_enabled": dynamic_enabled,
        "base_leverage": base,
        "leverage": base,
        "stop_loss_distance_pct": distance,
        "stop_loss_leverage_risk_pct": base_risk,
        "max_stop_loss_leverage_risk_pct": risk_cap,
        "adjusted": False,
    }

    nothing_to_adjust = risk_cap <= 0 or distance <= 0 or base_risk <= risk_cap + 1e-12
    if nothing_to_adjust or not dynamic_enabled:
        return base, detail

    allowed = risk_cap / distance
    leverage_floor = max(1.0, _safe_float(cfg.get("dynamic_leverage_min"), 3.0))
    detail["allowed_leverage"] = round(allowed, 6)
    detail["min_dynamic_leverage"] = leverage_floor
    if allowed + 1e-12 < leverage_floor:
        # Too tight to trade at an acceptable leverage; leave it to the caller's gate.
        return base, detail

    reduced = round(min(base, allowed), 4)
    detail["leverage"] = reduced
    detail["stop_loss_leverage_risk_pct"] = round(distance * reduced, 6)
    detail["adjusted"] = reduced < base
    return reduced, detail
def _trade_rr(side: str, entry_price: float, stop_loss: float, tp1: float) -> float:
    """Return the risk/reward ratio to TP1, delegating to ``_paper_order_rr``."""
    ratio = _paper_order_rr(side, entry_price, stop_loss, tp1)
    return ratio
def _account_return_pct(pnl_usdt: float, account_equity: float | None = None, config: dict | None = None) -> float:
    """Return *pnl_usdt* as a percentage of account equity, rounded to 4 places.

    Equity falls back to the configured default and is floored at 1.0.
    """
    fallback_equity = default_account_equity_usdt(config)
    equity = max(1.0, _safe_float(account_equity, fallback_equity))
    pnl = _safe_float(pnl_usdt)
    return round(pnl / equity * 100, 4)
def _margin_roi_pct(pnl_usdt: float, margin_usdt: float, config: dict | None = None) -> float:
    """Return *pnl_usdt* as a percentage of the margin, rounded to 4 places.

    Margin falls back to the configured default and is floored at 1.0.
    """
    fallback_margin = default_margin_usdt(config)
    margin = max(1.0, _safe_float(margin_usdt, fallback_margin))
    pnl = _safe_float(pnl_usdt)
    return round(pnl / margin * 100, 4)
def _trade_margin(trade: dict, config: dict | None = None) -> float:
    """Return the trade's margin, deriving it from notional and leverage when not stored."""
    stored_margin = _safe_float(trade.get("margin_usdt"))
    if stored_margin <= 0:
        fallback_leverage = default_leverage(config)
        leverage = max(1.0, _safe_float(trade.get("leverage"), fallback_leverage))
        notional = _safe_float(trade.get("notional_usdt"))
        return round(notional / leverage, 8)
    return stored_margin
def _decorate_trade(trade: dict, config: dict | None = None) -> dict:
    """Return a copy of *trade* enriched with derived sizing, PnL and lineage fields.

    Closed trades report returns on realized PnL; all others use unrealized
    PnL computed from ``pnl_pct`` and notional.
    """
    cfg = _paper_cfg(config)
    decorated = dict(trade)
    notional = _safe_float(decorated.get("notional_usdt"), default_notional_usdt(cfg))
    leverage = max(1.0, _safe_float(decorated.get("leverage"), default_leverage(cfg)))
    margin = _trade_margin(
        {"margin_usdt": decorated.get("margin_usdt"), "notional_usdt": notional, "leverage": leverage},
        cfg,
    )
    unrealized = round(notional * _safe_float(decorated.get("pnl_pct")) / 100, 8)
    realized = _safe_float(decorated.get("realized_pnl_usdt"))
    if decorated.get("status") == "closed":
        effective_pnl = realized
    else:
        effective_pnl = unrealized

    decorated["notional_usdt"] = notional
    decorated["leverage"] = leverage
    decorated["margin_usdt"] = margin
    decorated["unrealized_pnl_usdt"] = unrealized
    decorated["margin_roi_pct"] = _margin_roi_pct(effective_pnl, margin, cfg)
    decorated["account_return_pct"] = _account_return_pct(effective_pnl, config=cfg)
    decorated["account_equity_usdt"] = default_account_equity_usdt(cfg)

    # Prefer the live market price; fall back to the trade's last stored price.
    market_price = _safe_float(decorated.get("latest_market_price"))
    if market_price > 0:
        decorated["latest_price"] = market_price
    else:
        decorated["latest_price"] = _safe_float(decorated.get("current_price"))
    decorated["latest_price_updated_at"] = (
        decorated.get("latest_market_price_updated_at") or decorated.get("updated_at") or ""
    )
    decorated.update(_strategy_lineage_from_trade_or_order(decorated))
    return decorated
def _record_event(conn, trade_id: int, rec_id: int, symbol: str, event_type: str, price: float, pnl_pct: float, message: str, detail=None, event_time: str = ""):
    """Insert one row into ``paper_trade_events``.

    Strategy code and signal id come from *detail* when present; otherwise
    they are looked up on the trade row (best effort). The detail payload is
    stored as JSON, with strategy code and name filled in when absent.
    """
    payload = dict(detail or {})
    code = str(payload.get("strategy_code") or "").strip()
    signal_id = _safe_int(payload.get("strategy_signal_id"))
    lineage_missing = not code or signal_id <= 0
    if lineage_missing and _safe_int(trade_id) > 0:
        try:
            trade_row = conn.execute(
                "SELECT strategy_code, strategy_signal_id FROM paper_trades WHERE id=%s",
                (_safe_int(trade_id),),
            ).fetchone()
            if trade_row:
                code = code or trade_row.get("strategy_code") or ""
                signal_id = signal_id or _safe_int(trade_row.get("strategy_signal_id"))
        except Exception:
            # Lineage lookup is optional; the event is still recorded without it.
            pass
    code = normalize_strategy_code(code)
    payload.setdefault("strategy_code", code)
    payload.setdefault("strategy_name", strategy_label(code))
    values = (
        trade_id,
        rec_id,
        symbol,
        event_type,
        event_time or _now(),
        price,
        pnl_pct,
        message,
        json.dumps(payload, ensure_ascii=False, default=str),
        code,
        signal_id,
    )
    conn.execute(
        """
        INSERT INTO paper_trade_events (
        trade_id, recommendation_id, symbol, event_type, event_time,
        price, pnl_pct, message, detail_json, strategy_code, strategy_signal_id
        ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        """,
        values,
    )
def _fmt_price(value) -> str:
    """Format a positive price as ``$`` plus 8 significant digits, else ``--``."""
    price = _safe_float(value)
    return f"${price:.8g}" if price > 0 else "--"
def _fmt_pct(value) -> str:
    """Format a percentage with two decimals and an explicit ``+`` for positive values."""
    pct = _safe_float(value)
    prefix = "+" if pct > 0 else ""
    return prefix + f"{pct:.2f}%"
def _side_label(side: str) -> str:
    """Return the display label for a trade side, used in card titles and fields.

    Both branches previously returned an empty string, which left the
    direction field and the parenthesised side in every card title blank.

    NOTE(review): the original label text is not recoverable from this file;
    "做多" (long) and "做空" (short) are chosen to match the cards' Chinese
    wording. Confirm against the intended copy.
    """
    return "做空" if normalize_side(side) == "short" else "做多"
def _side_action(side: str, long_text: str, short_text: str) -> str:
    """Pick *short_text* for short trades and *long_text* for everything else."""
    if normalize_side(side) == "short":
        return short_text
    return long_text
def _card_field(label: str, value) -> dict:
return {
"tag": "div",
"text": {"tag": "lark_md", "content": f"**{label}**\n{value}"},
}
def _card_note(content: str) -> dict:
return {"tag": "note", "elements": [{"tag": "plain_text", "content": content}]}
def _card_md(content: str) -> dict:
return {"tag": "div", "text": {"tag": "lark_md", "content": content}}
def _push_paper_card(event_type: str, symbol: str, title: str, template: str, fields: list[tuple[str, str]], note: str = "", event_time: str = "") -> None:
    """Build and push a paper trading card; never raises.

    *fields* become one column set with a weighted column per
    ``(label, value)`` pair, followed by optional note and time lines. A
    failed push or any exception is recorded as a system warning.
    """
    try:
        body = []
        if fields:
            columns = [
                {"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field(label, value)]}
                for label, value in fields
            ]
            body.append({
                "tag": "column_set",
                "flex_mode": "none",
                "background_style": "default",
                "columns": columns,
            })
        if note:
            body.append(_card_note(note))
        if event_time:
            body.append(_card_note(f"时间: {event_time}"))

        card = {
            "metadata": {"source": "paper_trading", "event_type": event_type, "symbol": symbol},
            "config": {"wide_screen_mode": True},
            "header": {
                "template": template,
                "title": {"tag": "plain_text", "content": title},
            },
            "elements": body,
        }
        ok, result = push_card(card)
        if not ok:
            record_system_error(
                source="paper_trading",
                level="warning",
                error_type="FeishuPushFailed",
                message=f"Feishu push failed for {event_type} {symbol}: {str(result)[:500]}",
                status_code=0,
                context={"event_type": event_type, "symbol": symbol, "push_result": result},
                fingerprint=f"paper_trading_feishu_push_failed:{event_type}:{symbol}",
            )
    except Exception as exc:
        # Notifications are best effort: log the failure and move on.
        record_system_error(
            source="paper_trading",
            level="warning",
            error_type=exc.__class__.__name__,
            message=f"Feishu push exception for {event_type} {symbol}: {str(exc)[:500]}",
            status_code=0,
            context={"event_type": event_type, "symbol": symbol},
            fingerprint=f"paper_trading_feishu_push_exception:{event_type}:{symbol}",
        )
def _push_custom_paper_card(card: dict) -> tuple[bool, object]:
    """Push a prebuilt card, returning ``(False, error_text)`` instead of raising."""
    try:
        outcome = push_card(card)
    except Exception as exc:
        return False, str(exc)
    return outcome
def _push_event_card(event_type: str, trade: dict, result: dict, event_time: str = "") -> None:
    """Push the notification card for a trade lifecycle event.

    Handles ``open``, ``close`` and any event type starting with
    ``trailing``; other event types are ignored.
    """
    symbol = str(trade.get("symbol") or "")
    base_asset = symbol.replace("/USDT", "")
    side = normalize_side(result.get("side") or trade.get("side"))
    direction = _side_label(side)

    if event_type == "open":
        title = f"交易开仓({direction}) - {base_asset}"
        template = "blue"
        fields = [
            ("方向", direction),
            ("成交价", _fmt_price(result.get("entry_price"))),
            ("名义仓位", f"{_safe_float(result.get('notional_usdt')):.2f} USDT"),
            ("杠杆/保证金", f"{_safe_float(result.get('leverage')):.1f}x / {_safe_float(result.get('margin_usdt')):.2f} USDT"),
        ]
        note = "策略信号已进入持仓跟踪。"
    elif event_type == "close":
        exit_reason = str(result.get("exit_reason") or "--")
        if exit_reason == "trailing_stop":
            title_prefix = "移动止盈成交平仓"
        else:
            title_prefix = "交易平仓"
        title = f"{title_prefix}({direction}) - {base_asset}"
        # Red header for a losing close, green otherwise.
        template = "red" if _safe_float(result.get("pnl_usdt")) < 0 else "green"
        fields = [
            ("方向", direction),
            ("退出价", _fmt_price(result.get("exit_price"))),
            ("收益率", _fmt_pct(result.get("pnl_pct"))),
            ("收益额", f"{_safe_float(result.get('pnl_usdt')):.2f} USDT"),
            ("原因", exit_reason),
        ]
        note = "收益以交易账本记录为准。"
    elif event_type.startswith("trailing"):
        is_activation = event_type == "trailing_activate"
        title = f"移动止盈{'启动' if is_activation else '调整'}({direction}) - {base_asset}"
        template = "yellow"
        if is_activation:
            action = "启动保护"
        else:
            action = _side_action(side, "上移保护价", "下移保护价")
        fields = [
            ("方向", direction),
            ("保护价", _fmt_price(result.get("trailing_stop"))),
            ("当前收益", _fmt_pct(result.get("pnl_pct"))),
            ("动作", action),
        ]
        note = "移动止盈用于锁定浮盈。"
    else:
        return

    _push_paper_card(event_type, symbol, title, template, fields, note, event_time)
def _push_order_created_card(order: dict, event_time: str = "") -> None:
    """Push the card announcing that a pending paper order was created."""
    symbol = str(order.get("symbol") or "")
    side = normalize_side(order.get("side"))
    direction = _side_label(side)
    target_price = _safe_float(order.get("target_price"))
    price_at_create = _safe_float(order.get("current_price_at_create"))
    # Distance only makes sense when both prices are known.
    if target_price and price_at_create:
        distance = order_distance_pct(side, price_at_create, target_price)
    else:
        distance = 0
    title = f"挂单创建({direction}) - {symbol.replace('/USDT', '')}"
    fields = [
        ("方向", direction),
        ("目标价", _fmt_price(target_price)),
        ("当前价", _fmt_price(price_at_create)),
        ("距目标", _fmt_pct(distance)),
        ("有效期", order.get("expires_at") or "--"),
    ]
    note = _side_action(side, "等回踩机会已进入挂单,触价后进入持仓。", "等反抽机会已进入挂单,触价后进入持仓。")
    _push_paper_card("paper_order_create", symbol, title, "wathet", fields, note, event_time)
def _push_order_filled_card(order: dict, result: dict, event_time: str = "") -> None:
    """Push the card announcing that a pending order filled and opened a position."""
    symbol = str(order.get("symbol") or "")
    side = normalize_side(result.get("side") or order.get("side"))
    direction = _side_label(side)
    title = f"挂单成交并开仓({direction}) - {symbol.replace('/USDT', '')}"
    fill_price = result.get("entry_price") or order.get("fill_price")
    fields = [
        ("方向", direction),
        ("挂单价", _fmt_price(order.get("target_price"))),
        ("成交价", _fmt_price(fill_price)),
        ("名义仓位", f"{_safe_float(result.get('notional_usdt')):.2f} USDT"),
        ("来源", order.get("source_status") or "wait_pullback"),
    ]
    note = _side_action(side, "价格触达理想回踩位,挂单已转为持仓。", "价格触达理想反抽位,挂单已转为持仓。")
    _push_paper_card("paper_order_fill", symbol, title, "green", fields, note, event_time)
def _open_trade(conn, rec: dict, current_price: float, event_time: str, config: dict | None = None, push_open_card: bool = True) -> dict:
    """Open a paper trade for *rec* at *current_price* if every gate passes.

    Checks run in this order, each able to skip the trade:

    1. entry gate (when ``entry_gate_enabled``): minimum rec score, minimum
       risk/reward and stop-loss leverage risk, with optional leverage cut;
    2. global risk (or the prefilled ``rec["_prefilled_global_risk"]``),
       which may also scale the notional;
    3. portfolio pause checks (drawdown, weak entries);
    4. cumulative leverage cap.

    On success the trade row and an ``open`` event are inserted and,
    optionally, the open card is pushed.

    Returns a dict with ``opened`` set; skipped trades carry ``reason`` and
    detail, and a duplicate recommendation yields ``reason="already_exists"``.
    """
    cfg = _paper_cfg_for_rec(rec, config)
    rec_id = _safe_int(rec.get("id"))
    symbol = str(rec.get("symbol") or "").strip().upper()
    plan = _entry_plan(rec)
    side = normalize_side(plan.get("side") or rec.get("side"))
    entry_price = _open_price(current_price, cfg, side)
    notional = default_notional_usdt(cfg)
    leverage = default_leverage(cfg)
    stop_loss = _safe_float(plan.get("stop_loss") or rec.get("stop_loss"))
    tp1 = _safe_float(plan.get("tp1") or plan.get("take_profit_1") or rec.get("tp1"))
    rec_score = _safe_float(rec.get("rec_score") or rec.get("score"))
    if rec_score <= 0 and rec_id > 0:
        row = conn.execute("SELECT rec_score FROM recommendation WHERE id=%s", (rec_id,)).fetchone()
        rec_score = _safe_float(row["rec_score"] if row else 0)
    # Stays empty when the entry gate is disabled, so the sizing annotation
    # below needs no locals() introspection.
    leverage_risk: dict = {}
    if bool(cfg.get("entry_gate_enabled", True)):
        calc_rr = _trade_rr(side, entry_price, stop_loss, tp1)
        rr_candidates = [
            _safe_float(plan.get("rr1")),
            _safe_float(plan.get("rr1_live")),
            calc_rr,
        ]
        rr = max([x for x in rr_candidates if x > 0], default=0.0)
        min_rr = max(0.0, _safe_float(cfg.get("entry_min_rr"), 0))
        min_score = max(0.0, _safe_float(cfg.get("entry_min_rec_score"), 0))
        adjusted_leverage, leverage_risk = _risk_adjusted_leverage(side, entry_price, stop_loss, leverage, cfg)
        sl_risk = _safe_float(leverage_risk.get("stop_loss_leverage_risk_pct"))
        max_sl_risk = _safe_float(leverage_risk.get("max_stop_loss_leverage_risk_pct"))
        leverage = adjusted_leverage
        entry_reasons = []
        if rec_score < min_score:
            entry_reasons.append("rec_score_below_min")
        if rr <= 0:
            entry_reasons.append("missing_rr")
        elif rr < min_rr:
            entry_reasons.append("rr_below_min")
        if max_sl_risk > 0 and sl_risk > max_sl_risk:
            entry_reasons.append("stop_loss_leverage_risk_exceeded")
        if entry_reasons:
            return {
                "opened": False,
                "skipped": True,
                "reason": "entry_gate_rejected",
                "gate_reasons": entry_reasons,
                "gate_detail": {
                    "rec_score": rec_score,
                    "min_rec_score": min_score,
                    "rr1": round(rr, 4) if rr > 0 else 0,
                    "min_rr": min_rr,
                    "stop_loss_leverage_risk_pct": sl_risk,
                    "max_stop_loss_leverage_risk_pct": max_sl_risk,
                    "entry_price": entry_price,
                    "stop_loss": stop_loss,
                    "tp1": tp1,
                    "leverage": leverage,
                    "leverage_risk": leverage_risk,
                },
            }
    # A prefilled global-risk verdict is used as-is without re-evaluating.
    global_detail = rec.get("_prefilled_global_risk") if isinstance(rec.get("_prefilled_global_risk"), dict) else None
    if global_detail is None:
        global_ok, global_detail = _global_risk_entry_check(conn, rec, notional, cfg)
        if not global_ok:
            return {
                "opened": False,
                "skipped": True,
                "reason": "global_risk_rejected",
                "risk_detail": global_detail,
            }
    adjusted_notional = _market_risk_adjusted_notional(notional, global_detail, cfg)
    if adjusted_notional != notional:
        plan["market_position_sizing"] = {
            "base_notional_usdt": notional,
            "adjusted_notional_usdt": adjusted_notional,
            "position_multiplier": global_detail.get("position_multiplier"),
            "risk_level": global_detail.get("risk_level"),
            "decision": global_detail.get("decision"),
        }
    notional = adjusted_notional
    if leverage_risk.get("adjusted"):
        plan["leverage_sizing"] = leverage_risk
    pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, notional, event_time, cfg)
    if not pause_ok:
        return {"opened": False, "skipped": True, "reason": pause_reason, "risk_detail": pause_detail}
    leverage_ok, leverage_detail = _cumulative_leverage_check(conn, notional, cfg, exclude_rec_id=rec_id)
    if not leverage_ok:
        return {
            "opened": False,
            "skipped": True,
            "reason": "cumulative_leverage_exceeded",
            "risk_detail": leverage_detail,
        }
    margin = round(notional / leverage, 8) if leverage > 0 else default_margin_usdt(cfg)
    qty = round(notional / entry_price, 12) if entry_price > 0 else 0
    tp2 = _safe_float(rec.get("tp2") or plan.get("tp2") or plan.get("take_profit_2"))
    fee = round(notional * default_fee_rate(cfg), 8)
    now = event_time or _now()
    lineage = _strategy_lineage_from_rec(rec)
    # ON CONFLICT ... DO NOTHING returns no row when a trade already exists
    # for this recommendation.
    row = conn.execute(
        """
        INSERT INTO paper_trades (
        recommendation_id, symbol, side, status, opened_at,
        entry_price, qty, notional_usdt, margin_usdt, leverage, stop_loss, tp1, tp2,
        max_price, min_price, current_price, pnl_pct, fee_usdt,
        source_status, source_action, strategy_version, strategy_code, strategy_signal_id,
        strategy_snapshot_json, factor_roles_json, created_at, updated_at
        ) VALUES (%s,%s,%s,'open',%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,0,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT(recommendation_id) DO NOTHING
        RETURNING id
        """,
        (
            rec_id,
            symbol,
            side,
            now,
            entry_price,
            qty,
            notional,
            margin,
            leverage,
            stop_loss,
            tp1,
            tp2,
            entry_price,
            entry_price,
            entry_price,
            fee,
            rec.get("execution_status") or "",
            rec.get("action_status") or "",
            rec.get("strategy_version") or "",
            lineage["strategy_code"],
            lineage["strategy_signal_id"],
            lineage["strategy_snapshot_json"],
            lineage["factor_roles_json"],
            now,
            now,
        ),
    ).fetchone()
    if not row:
        return {"opened": False, "reason": "already_exists"}
    trade_id = row["id"]
    _record_event(
        conn,
        trade_id,
        rec_id,
        symbol,
        "open",
        entry_price,
        0.0,
        "交易开仓:策略信号已进入持仓跟踪",
        {
            "notional_usdt": notional,
            "margin_usdt": margin,
            "leverage": leverage,
            "qty": qty,
            "fee_usdt": fee,
            "slippage_pct": default_slippage_pct(cfg),
            "source_status": rec.get("execution_status") or "",
            "source_action": rec.get("action_status") or "",
            "strategy_code": lineage["strategy_code"],
            "strategy_name": lineage["strategy_name"],
            "strategy_signal_id": lineage["strategy_signal_id"],
            "strategy_snapshot": lineage["strategy_snapshot"],
            "factor_roles": lineage["factor_roles"],
            "market_regime": global_detail.get("market_regime") or _entry_plan(rec).get("market_regime") or {},
            "global_risk": global_detail,
            "score_components": _entry_plan(rec).get("score_components") or {},
            "decision_log": _entry_plan(rec).get("decision_log") or {},
        },
        now,
    )
    result = {
        "opened": True,
        "trade_id": trade_id,
        "side": side,
        "entry_price": entry_price,
        "qty": qty,
        "notional_usdt": notional,
        "margin_usdt": margin,
        "leverage": leverage,
        "global_risk": global_detail,
        "market_regime": global_detail.get("market_regime") or _entry_plan(rec).get("market_regime") or {},
    }
    if push_open_card:
        _push_event_card("open", {"symbol": symbol}, result, now)
    return result
def _is_wait_pullback(rec: dict) -> bool:
    """Return whether *rec* is flagged as a wait-for-pullback entry.

    True when the execution status is ``wait_pullback`` or either the action
    status or the plan's entry action reads "等回踩".
    """
    plan = _entry_plan(rec)
    execution_status = str(rec.get("execution_status") or "").strip()
    pullback_markers = (
        str(rec.get("action_status") or "").strip(),
        str(plan.get("entry_action") or "").strip(),
    )
    if execution_status == "wait_pullback":
        return True
    return "等回踩" in pullback_markers
def _paper_order_target_price(rec: dict) -> float:
    """Return the order target price.

    Takes the first truthy value among the plan's ``limit_price``,
    ``order_price`` and ``entry_price``, then falls back to
    ``rec["entry_price"]``.
    """
    plan = _entry_plan(rec)
    for key in ("limit_price", "order_price", "entry_price"):
        candidate = plan.get(key)
        if candidate:
            return _safe_float(candidate)
    return _safe_float(rec.get("entry_price"))
def _paper_order_expires_at(event_time: str, config: dict | None = None) -> str:
    """Return the order expiry time, using ``order_expire_hours`` (fallback 24.0)."""
    settings = _paper_cfg(config)
    expire_hours = _safe_float(settings.get("order_expire_hours"), 24.0)
    return order_expires_at(event_time, expire_hours)
def _paper_order_rr(side: str, target: float, stop_loss: float, tp1: float) -> float:
    """Return the risk/reward ratio of an order, delegating to ``order_rr``."""
    ratio = order_rr(side, target, stop_loss, tp1)
    return ratio
def _paper_order_distance_pct(side: str, current_price: float, target: float) -> float:
    """Return the distance from current price to target, delegating to ``order_distance_pct``."""
    distance = order_distance_pct(side, current_price, target)
    return distance
def _paper_order_gate(rec: dict, current_price: float, config: dict | None = None, conn=None) -> tuple[bool, list[str], dict]:
    """Decide whether a recommendation may be turned into a pending limit order.

    Args:
        rec: Recommendation dict (entry plan, prices, score, id).
        current_price: Latest price, used for the distance-to-entry checks.
        config: Optional paper-trading config override.
        conn: Optional DB connection. When given it is used to look up a
            missing ``rec_score`` and to run the cumulative leverage check.

    Returns:
        ``(allowed, reasons, detail)`` where ``allowed`` is True only when no
        rejection reason was collected, ``reasons`` lists the rejection codes
        in evaluation order and ``detail`` echoes the values that were judged.
        When ``order_gate_enabled`` is false the gate passes immediately with
        ``{"gate_enabled": False}``.
    """
    settings = _paper_cfg_for_rec(rec, config)
    gate_enabled = bool(settings.get("order_gate_enabled", True))
    if not gate_enabled:
        return True, [], {"gate_enabled": False}

    entry_plan = _entry_plan(rec)
    side = str(entry_plan.get("side") or rec.get("side") or "long").strip().lower() or "long"
    target = _paper_order_target_price(rec)
    stop_loss = _safe_float(entry_plan.get("stop_loss") or rec.get("stop_loss"))
    tp1 = _safe_float(entry_plan.get("tp1") or entry_plan.get("take_profit_1") or rec.get("tp1"))
    planned_rr = _safe_float(entry_plan.get("rr1") or entry_plan.get("rr1_live"))
    calc_rr = _paper_order_rr(side, target, stop_loss, tp1)
    # Wait-pullback orders must be judged at the intended limit price, not at
    # the stale confirmation price. A buy-now RR can be invalid while the
    # pullback target is perfectly tradeable.
    effective_rr = max(planned_rr, calc_rr)

    min_rr = max(0.0, _safe_float(settings.get("order_min_rr"), 1.2))
    min_rec_score = max(0.0, _safe_float(settings.get("order_min_rec_score"), 20.0))
    min_distance = max(0.0, _safe_float(settings.get("order_min_distance_to_entry_pct"), 0.0))

    rec_id = _safe_int(rec.get("id"))
    rec_score = _safe_float(rec.get("rec_score") or rec.get("score"))
    if rec_score <= 0 and conn is not None and rec_id > 0:
        # The in-memory rec carries no usable score: fall back to the stored one.
        stored = conn.execute("SELECT rec_score FROM recommendation WHERE id=%s", (rec_id,)).fetchone()
        rec_score = _safe_float(stored["rec_score"] if stored else 0)

    leverage_ok, leverage_detail = True, {}
    if conn is not None:
        leverage_ok, leverage_detail = _cumulative_leverage_check(
            conn,
            default_notional_usdt(settings),
            settings,
            exclude_rec_id=rec_id,
        )

    distance_pct = _paper_order_distance_pct(side, current_price, target)
    max_distance = max(0.0, _safe_float(settings.get("order_max_distance_to_entry_pct"), 8.0))
    opportunity_level = str(entry_plan.get("opportunity_level") or rec.get("opportunity_level") or "").strip()
    level_max_action = str(entry_plan.get("max_action") or "").strip()
    risk_reward_ok = entry_plan.get("risk_reward_ok")
    trigger_ok = entry_plan.get("entry_trigger_confirmed") is True or _safe_int(rec.get("entry_triggered")) == 1

    # RR recomputed at the limit price may overrule an explicit plan rejection.
    target_rr_confirms = calc_rr + 1e-9 >= min_rr
    rr_overruled = risk_reward_ok is False and target_rr_confirms
    require_rr_ok = bool(settings.get("order_require_risk_reward_ok", True))
    require_trigger = bool(settings.get("order_require_current_trigger", False))

    # (failed?, rejection code) in the order the codes must be reported.
    checks = (
        (target <= 0, "missing_target_price"),
        (stop_loss <= 0, "missing_stop_loss"),
        (tp1 <= 0, "missing_tp1"),
        (target > 0 and stop_loss > 0 and tp1 > 0 and calc_rr <= 0, "invalid_risk_geometry"),
        (risk_reward_ok is False and not target_rr_confirms, "risk_reward_rejected"),
        (require_rr_ok and risk_reward_ok is not True and not rr_overruled, "risk_reward_not_confirmed"),
        (rec_score < min_rec_score, "rec_score_below_min"),
        (not leverage_ok, "cumulative_leverage_exceeded"),
        (effective_rr > 0 and effective_rr < min_rr, "rr_below_min"),
        (effective_rr <= 0, "missing_rr"),
        (distance_pct < min_distance, "too_close_to_entry"),
        (distance_pct > max_distance, "too_far_from_entry"),
        (
            opportunity_level in {"momentum_watch", "theme_trend"} or level_max_action == "observe",
            "observe_only_opportunity",
        ),
        (require_trigger and not trigger_ok, "missing_current_trigger"),
    )
    reasons = [code for failed, code in checks if failed]

    detail = {
        "target_price": target,
        "stop_loss": stop_loss,
        "tp1": tp1,
        "rr1": round(effective_rr, 4) if effective_rr > 0 else 0,
        "calc_rr1": round(calc_rr, 4) if calc_rr > 0 else 0,
        "distance_to_entry_pct": round(distance_pct, 4),
        "min_distance_to_entry_pct": min_distance,
        "max_distance_to_entry_pct": max_distance,
        "min_rr": min_rr,
        "rec_score": rec_score,
        "min_rec_score": min_rec_score,
        "leverage": leverage_detail,
        "opportunity_level": opportunity_level,
        "entry_trigger_confirmed": trigger_ok,
    }
    return not reasons, reasons, detail
def _cancel_paper_order(conn, order: dict, reason: str, event_time: str) -> dict:
conn.execute(
"""
UPDATE paper_orders
SET status='canceled', cancel_reason=%s, canceled_at=%s, updated_at=%s
WHERE id=%s AND status='pending'
""",
(reason, event_time, event_time, order["id"]),
)
return {"skipped": True, "reason": f"paper_order_{reason}", "paper_order_id": order["id"]}
def _touch_global_risk_cancel_reason(global_detail: dict | None) -> str:
    """Map a failed global-risk check to a hard cancel reason, if any.

    Only hard account/portfolio gates should cancel an already-touched order.
    A touched pullback/retest order already waited for a planned price, and
    broad market risk is noisy for altcoins, so it should mostly resize or
    annotate the fill rather than cancel it. Cancel only when
    account/portfolio constraints say the system cannot add exposure.

    Returns:
        A ``touch_*`` cancel reason, or ``""`` when the gate is soft and the
        fill may proceed.
    """
    hard_decisions = {
        "block_max_open_positions": "touch_max_open_positions",
        "block_same_direction_concentration": "touch_same_direction_concentration",
        "block_same_sector_concentration": "touch_same_sector_concentration",
    }
    info = global_detail if isinstance(global_detail, dict) else {}
    decision = str(info.get("decision") or "").strip()
    raw_portfolio = info.get("portfolio")
    portfolio = raw_portfolio if isinstance(raw_portfolio, dict) else {}
    reasons_text = " ".join(str(item) for item in (info.get("reasons") or []))
    drawdown = _safe_float(portfolio.get("unrealized_drawdown_pct"))
    critical_drawdown = _safe_float(info.get("critical_drawdown_pct") or info.get("max_drawdown_critical"))

    hard_reason = hard_decisions.get(decision)
    if hard_reason:
        return hard_reason

    # Account-level drawdown: flagged in the reason text or past the
    # critical threshold carried in the detail.
    drawdown_critical = critical_drawdown > 0 and drawdown >= critical_drawdown
    if "账户浮亏" in reasons_text or drawdown_critical:
        return "touch_position_multiplier_zero"
    return ""
def _order_recommendation_cancel_reason(conn, rec: dict, order: dict) -> str:
    """Return why an order's source recommendation no longer supports it.

    The recommendation row is re-read from the database. The result is
    ``"recommendation_missing"`` when the row is gone,
    ``"recommendation_invalid"`` when any of its state columns holds a
    terminal value, and ``""`` otherwise (also when no positive
    recommendation id can be resolved from ``rec`` or ``order``).
    """
    recommendation_id = _safe_int(rec.get("id") or order.get("recommendation_id"))
    if recommendation_id <= 0:
        return ""
    fetched = conn.execute(
        """
        SELECT status, execution_status, lifecycle_state, display_bucket
        FROM recommendation
        WHERE id=%s
        """,
        (recommendation_id,),
    ).fetchone()
    if not fetched:
        return "recommendation_missing"
    current = dict(fetched)

    terminal = {"invalid", "expired", "archived", "stopped_out"}
    # Column -> values (compared after strip/lower) that invalidate the order.
    blocked_by_column = (
        ("status", terminal | {"closed"}),
        ("execution_status", terminal),
        ("lifecycle_state", terminal),
        ("display_bucket", {"history", "archive", "archived"}),
    )
    for column, blocked_values in blocked_by_column:
        normalized = str(current.get(column) or "").strip().lower()
        if normalized in blocked_values:
            return "recommendation_invalid"
    return ""
def _order_payload_from_rec(rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
    """Build the column values for a new pending limit order from ``rec``.

    Prices and side prefer the entry plan and fall back to the recommendation
    itself. The entry plan is snapshotted as JSON. ``event_time`` is used for
    both ``created_at`` and ``updated_at``, and the expiry is derived from it.
    """
    settings = _paper_cfg_for_rec(rec, config)
    entry_plan = _entry_plan(rec)
    lineage = _strategy_lineage_from_rec(rec)

    side = str(entry_plan.get("side") or rec.get("side") or "long").strip().lower() or "long"
    source_action = str(rec.get("action_status") or entry_plan.get("entry_action") or "")
    stop_loss = _safe_float(entry_plan.get("stop_loss") or rec.get("stop_loss"))
    tp1 = _safe_float(entry_plan.get("tp1") or entry_plan.get("take_profit_1") or rec.get("tp1"))
    tp2 = _safe_float(entry_plan.get("tp2") or entry_plan.get("take_profit_2") or rec.get("tp2"))
    lineage_keys = (
        "strategy_code",
        "strategy_name",
        "strategy_signal_id",
        "strategy_snapshot_json",
        "factor_roles_json",
    )

    payload = {
        "recommendation_id": _safe_int(rec.get("id")),
        "symbol": str(rec.get("symbol") or "").strip().upper(),
        "side": side,
        "order_type": "limit",
        "status": "pending",
        "source_status": str(rec.get("execution_status") or ""),
        "source_action": source_action,
        "target_price": _paper_order_target_price(rec),
        "current_price_at_create": current_price,
        "notional_usdt": default_notional_usdt(settings),
        "stop_loss": stop_loss,
        "tp1": tp1,
        "tp2": tp2,
        "strategy_version": str(rec.get("strategy_version") or ""),
    }
    payload.update({key: lineage[key] for key in lineage_keys})
    payload["entry_plan_snapshot_json"] = json.dumps(entry_plan, ensure_ascii=False, default=str)
    payload["created_at"] = event_time
    payload["updated_at"] = event_time
    payload["expires_at"] = _paper_order_expires_at(event_time, settings)
    return payload
def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
    """Fill a touched pending limit order by opening a paper trade.

    The fill price is the order's ``target_price``, falling back to
    ``current_price`` when the target is missing or zero.

    Flow:
        1. Run the global-risk entry check. A failed check cancels the order
           only when ``_touch_global_risk_cancel_reason`` reports a hard
           gate; otherwise the detail is flagged as an overridden soft gate
           and attached to the trade record as ``_prefilled_global_risk``.
        2. Run the portfolio entry-pause check on the risk-adjusted notional;
           a failure cancels the order.
        3. Open the trade via ``_open_trade`` using the order's own notional.
        4. On success mark the order filled, push the fill card, and close
           the new trade right away (``stop_loss_same_tick``) when
           ``current_price`` is already through the stop loss.
        5. When ``_open_trade`` reports ``already_exists`` the order is still
           marked filled, with ``paper_order.filled`` set to False.

    Returns:
        The ``_open_trade`` result (possibly enriched with ``paper_order``
        and same-tick close fields) or the cancel result dict.
    """
    fill_price = _safe_float(order.get("target_price")) or current_price
    trade_rec = _rec_with_order_snapshot(rec, order, fill_price)
    cfg = _paper_cfg_for_rec(trade_rec, config)
    base_notional = _safe_float(order.get("notional_usdt"), default_notional_usdt(cfg))

    def _mark_order_filled() -> None:
        # Shared by the fresh-fill path and the already-existing-trade path.
        conn.execute(
            """
            UPDATE paper_orders
            SET status='filled', fill_price=%s, filled_at=%s, updated_at=%s
            WHERE id=%s
            """,
            (fill_price, event_time, event_time, order["id"]),
        )

    global_ok, global_detail = _global_risk_entry_check(conn, trade_rec, base_notional, cfg)
    if not global_ok:
        cancel_reason = _touch_global_risk_cancel_reason(global_detail)
        if cancel_reason:
            result = _cancel_paper_order(conn, order, cancel_reason, event_time)
            result.update({
                "target_price": order.get("target_price"),
                "current_price": current_price,
                "risk_detail": global_detail,
            })
            return result
        # Soft gate: let the touched order fill, but record that it was overridden.
        global_detail = {
            **(global_detail or {}),
            "allow_new_entries": True,
            "touch_soft_gate_overridden": True,
            "touch_soft_gate_reason": (global_detail or {}).get("decision") or "soft_global_risk_gate",
        }
        trade_rec["_prefilled_global_risk"] = global_detail

    adjusted_notional = _market_risk_adjusted_notional(base_notional, global_detail, cfg)
    pause_ok, pause_reason, _pause_detail = _portfolio_entry_pause_check(conn, adjusted_notional, event_time, cfg)
    if not pause_ok:
        return _cancel_paper_order(conn, order, pause_reason, event_time)

    plan = _entry_plan(trade_rec)
    plan["entry_price"] = fill_price
    if adjusted_notional != base_notional:
        plan["market_position_sizing"] = {
            "base_notional_usdt": base_notional,
            "adjusted_notional_usdt": adjusted_notional,
            "position_multiplier": global_detail.get("position_multiplier"),
            "risk_level": global_detail.get("risk_level"),
            "decision": global_detail.get("decision"),
        }
    trade_rec["entry_plan"] = plan
    trade_rec["entry_price"] = fill_price
    # Filled limit orders should keep the notional decided when the order was
    # created; risk sizing is still applied once inside _open_trade.
    fill_cfg = {**cfg, "trade_notional_usdt": base_notional}
    result = _open_trade(conn, trade_rec, fill_price, event_time, config=fill_cfg, push_open_card=False)

    if result.get("opened"):
        filled_order = {**order, "fill_price": fill_price}
        _mark_order_filled()
        result["paper_order"] = {"filled": True, "order_id": filled_order["id"], "fill_price": fill_price}
        _push_order_filled_card(filled_order, result, event_time)
        # The fill happens at the target price, but the market may already be
        # through the stop on this same tick: close immediately in that case.
        stop_loss = _safe_float(rec.get("stop_loss") or _entry_plan(rec).get("stop_loss") or filled_order.get("stop_loss"))
        side = normalize_side(filled_order.get("side") or _entry_plan(rec).get("side") or rec.get("side"))
        if should_stop_loss(side, current_price, stop_loss):
            trade = conn.execute("SELECT * FROM paper_trades WHERE id=%s", (result["trade_id"],)).fetchone()
            if trade:
                close_result = _close_trade(conn, dict(trade), current_price, "stop_loss_same_tick", event_time)
                result.update({
                    "closed": True,
                    "exit_reason": close_result.get("exit_reason"),
                    "pnl_pct": close_result.get("pnl_pct"),
                    "pnl_usdt": close_result.get("pnl_usdt"),
                    "same_tick_stop_loss": True,
                })
        return result

    if result.get("reason") == "already_exists":
        # A trade already exists for this recommendation: retire the order
        # without claiming that this call performed the fill.
        _mark_order_filled()
        result["paper_order"] = {"filled": False, "order_id": order["id"], "fill_price": fill_price}
    return result
def _sync_wait_pullback_order(conn, rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
    """Create or advance the pending limit order for a wait-pullback recommendation.

    Existing order (looked up by ``recommendation_id``):
        * non-pending -> skipped with ``paper_order_<status>``;
        * source recommendation no longer valid -> canceled;
        * lifecycle says ``fill`` -> ``_fill_paper_order``;
        * lifecycle says ``cancel`` with reason ``expired`` -> status set to
          ``expired``; any other cancel -> ``_cancel_paper_order``;
        * otherwise only ``updated_at`` is refreshed and the order stays
          pending.

    No existing order:
        * the order gate must pass, and the payload must carry a
          recommendation id, a symbol and a positive target price;
        * the order is inserted and immediately evaluated, so it can be
          filled or canceled within the same call;
        * otherwise the "order created" card is pushed.

    Returns:
        A result dict; skip results carry ``skipped`` and ``reason``, fills
        return whatever ``_fill_paper_order`` returns.
    """
    cfg = _paper_cfg_for_rec(rec, config)
    rec_id = _safe_int(rec.get("id"))
    order = conn.execute("SELECT * FROM paper_orders WHERE recommendation_id=%s", (rec_id,)).fetchone()
    if order:
        order = dict(order)
        if order.get("status") != "pending":
            return {"skipped": True, "reason": f"paper_order_{order.get('status')}", "paper_order_id": order.get("id")}
        # Recommendation validity is checked before the price lifecycle here.
        cancel_reason = _order_recommendation_cancel_reason(conn, rec, order)
        if cancel_reason:
            return _cancel_paper_order(conn, order, cancel_reason, event_time)
        lifecycle = evaluate_limit_order(order=order, current_price=current_price, event_time=event_time, config=cfg).as_dict()
        if lifecycle["action"] == "fill":
            return _fill_paper_order(conn, order, rec, current_price, event_time, cfg)
        if lifecycle["action"] == "cancel" and lifecycle["reason"] == "expired":
            # Expiry gets its own terminal status instead of 'canceled'.
            conn.execute(
                """
                UPDATE paper_orders
                SET status='expired', cancel_reason='expired', canceled_at=%s, updated_at=%s
                WHERE id=%s
                """,
                (event_time, event_time, order["id"]),
            )
            return {"skipped": True, "reason": "paper_order_expired", "paper_order_id": order["id"]}
        if lifecycle["action"] == "cancel":
            return _cancel_paper_order(conn, order, str(lifecycle["reason"] or "canceled"), event_time)
        # Still waiting: only refresh the heartbeat timestamp.
        conn.execute("UPDATE paper_orders SET updated_at=%s WHERE id=%s", (event_time, order["id"]))
        return {
            "skipped": True,
            "reason": "paper_order_pending",
            "paper_order_id": order["id"],
            "target_price": order.get("target_price"),
            "current_price": current_price,
            "order_lifecycle": lifecycle,
        }
    gate_ok, gate_reasons, gate_detail = _paper_order_gate(rec, current_price, cfg, conn=conn)
    if not gate_ok:
        return {
            "skipped": True,
            "reason": "paper_order_gate_rejected",
            "gate_reasons": gate_reasons,
            "gate_detail": gate_detail,
            "target_price": gate_detail.get("target_price"),
            "current_price": current_price,
        }
    payload = _order_payload_from_rec(rec, current_price, event_time, cfg)
    if payload["recommendation_id"] <= 0 or not payload["symbol"] or payload["target_price"] <= 0:
        return {"skipped": True, "reason": "invalid_paper_order"}
    # NOTE: payload["strategy_name"] is built but has no column in this INSERT.
    row = conn.execute(
        """
        INSERT INTO paper_orders (
        recommendation_id, symbol, side, order_type, status,
        source_status, source_action, target_price, current_price_at_create,
        notional_usdt, stop_loss, tp1, tp2, strategy_version, strategy_code,
        strategy_signal_id, strategy_snapshot_json, factor_roles_json,
        entry_plan_snapshot_json, created_at, updated_at, expires_at
        ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT(recommendation_id) DO NOTHING
        RETURNING id
        """,
        (
            payload["recommendation_id"],
            payload["symbol"],
            payload["side"],
            payload["order_type"],
            payload["status"],
            payload["source_status"],
            payload["source_action"],
            payload["target_price"],
            payload["current_price_at_create"],
            payload["notional_usdt"],
            payload["stop_loss"],
            payload["tp1"],
            payload["tp2"],
            payload["strategy_version"],
            payload["strategy_code"],
            payload["strategy_signal_id"],
            payload["strategy_snapshot_json"],
            payload["factor_roles_json"],
            payload["entry_plan_snapshot_json"],
            payload["created_at"],
            payload["updated_at"],
            payload["expires_at"],
        ),
    ).fetchone()
    # NOTE(review): if the insert is skipped by ON CONFLICT no row comes back,
    # so order_id is None and the UPDATE-by-id statements in the fill/cancel
    # helpers below would match nothing — confirm this is intended.
    order_id = row["id"] if row else None
    order = {"id": order_id, **payload}
    lifecycle = evaluate_limit_order(order=order, current_price=current_price, event_time=event_time, config=cfg).as_dict()
    # For a freshly created order the fill check runs before the
    # recommendation-validity check (the reverse of the existing-order path).
    if lifecycle["action"] == "fill":
        return _fill_paper_order(conn, order, rec, current_price, event_time, cfg)
    cancel_reason = _order_recommendation_cancel_reason(conn, rec, order)
    if cancel_reason:
        return _cancel_paper_order(conn, order, cancel_reason, event_time)
    if lifecycle["action"] == "cancel":
        return _cancel_paper_order(conn, order, str(lifecycle["reason"] or "canceled"), event_time)
    result = {
        "skipped": True,
        "reason": "paper_order_created",
        "paper_order_id": order_id,
        "target_price": payload["target_price"],
        "current_price": current_price,
        "gate_detail": gate_detail,
        "order_lifecycle": lifecycle,
    }
    _push_order_created_card(order, event_time)
    return result
def _close_trade(conn, trade: dict, current_price: float, reason: str, event_time: str, detail: dict | None = None) -> dict:
    """Close an open paper trade, record the close event and push a card.

    The exit price comes from ``_close_price`` for the trade's side. Realized
    PnL in USDT is ``notional * pnl_pct / 100`` minus the total fee, where the
    total fee is the fee already stored on the trade plus a closing fee at
    ``default_fee_rate()``.

    Args:
        conn: DB connection used for the UPDATE and the event insert.
        trade: Trade row as a dict.
        current_price: Market price at close time.
        reason: Exit reason stored on the trade and in the event text.
        event_time: Close timestamp; ``_now()`` is used when empty.
        detail: Extra fields merged into the event payload.

    Returns:
        ``{"closed": True, "trade_id", "exit_reason", "pnl_pct", "pnl_usdt"}``.
    """
    trade_id = trade["id"]
    side = normalize_side(trade.get("side"))
    exit_price = _close_price(current_price, side=side)
    pnl_pct = _trade_pnl_pct(_safe_float(trade.get("entry_price")), exit_price, side)

    notional = _safe_float(trade.get("notional_usdt"))
    closing_fee = round(notional * default_fee_rate(), 8)
    total_fee = round(_safe_float(trade.get("fee_usdt")) + closing_fee, 8)
    pnl_usdt = round(notional * pnl_pct / 100 - total_fee, 8)
    closed_at = event_time or _now()

    update_params = (
        closed_at,
        exit_price,
        exit_price,
        pnl_pct,
        pnl_pct,
        pnl_usdt,
        total_fee,
        reason,
        closed_at,
        trade_id,
    )
    conn.execute(
        """
        UPDATE paper_trades
        SET status='closed',
        closed_at=%s,
        exit_price=%s,
        current_price=%s,
        pnl_pct=%s,
        realized_pnl_pct=%s,
        realized_pnl_usdt=%s,
        fee_usdt=%s,
        exit_reason=%s,
        updated_at=%s
        WHERE id=%s AND status='open'
        """,
        update_params,
    )

    event_payload = {"realized_pnl_usdt": pnl_usdt, "fee_usdt": total_fee, **(detail or {})}
    _record_event(
        conn,
        trade_id,
        trade["recommendation_id"],
        trade["symbol"],
        "close",
        exit_price,
        pnl_pct,
        f"交易平仓:{reason}",
        event_payload,
        closed_at,
    )
    card_fields = {
        "side": side,
        "exit_price": exit_price,
        "exit_reason": reason,
        "pnl_pct": pnl_pct,
        "pnl_usdt": pnl_usdt,
    }
    _push_event_card("close", trade, card_fields, closed_at)
    return {
        "closed": True,
        "trade_id": trade_id,
        "exit_reason": reason,
        "pnl_pct": pnl_pct,
        "pnl_usdt": pnl_usdt,
    }
def _apply_position_health_guard(
    conn,
    trade: dict,
    current_price: float,
    pnl_pct: float,
    event_time: str,
    config: dict | None = None,
) -> dict:
    """Evaluate position health for an open trade and act on the decision.

    Global risk is evaluated first (only when both
    ``position_guard_critical_exit_enabled`` and ``global_risk_gate_enabled``
    are on); a failure there is captured as an error detail instead of being
    raised. The health decision then either closes the trade, tightens the
    protective stop (recording a ``position_guard_tighten`` event), or leaves
    the trade untouched.

    Returns:
        The ``_close_trade`` result on close, otherwise a dict with
        ``updated``, ``tightened`` and ``position_health`` (plus
        ``trailing_stop`` when the stop was tightened).
    """
    settings = _paper_cfg(config)
    guard_enabled = bool(settings.get("position_guard_critical_exit_enabled", True))
    risk_detail = {}
    if guard_enabled and bool(settings.get("global_risk_gate_enabled", True)):
        try:
            risk_detail = evaluate_global_risk(conn=conn, config=settings, rec=trade, additional_notional=0.0)
        except Exception as exc:
            # Best effort: a failing risk evaluation must not block the guard.
            risk_detail = {"error": exc.__class__.__name__, "message": str(exc)[:200]}

    health = evaluate_position_health(
        trade=trade,
        current_price=current_price,
        event_time=event_time,
        config=settings,
        global_risk=risk_detail,
    )
    decision = health.as_dict()
    action = decision.get("action")
    unchanged = {"updated": True, "tightened": False, "position_health": decision}

    if action == "close":
        exit_reason = str(decision.get("reason") or "position_health_exit")
        return _close_trade(conn, trade, current_price, exit_reason, event_time, {"position_health": decision})
    if action != "tighten_stop":
        return unchanged

    guard_stop = _safe_float(decision.get("guard_stop"))
    current_trail = _safe_float(trade.get("trailing_stop"))
    next_stop = tighter_stop(normalize_side(trade.get("side")), current_trail, guard_stop)
    stop_moved = guard_stop > 0 and abs(next_stop - current_trail) > 1e-12
    if not stop_moved:
        return unchanged

    _record_event(
        conn,
        trade["id"],
        trade["recommendation_id"],
        trade["symbol"],
        "position_guard_tighten",
        guard_stop,
        pnl_pct,
        f"仓位健康保护:收紧保护价 {guard_stop:.8g}",
        {"position_health": decision},
        event_time,
    )
    return {"updated": True, "tightened": True, "trailing_stop": next_stop, "position_health": decision}
def _update_trailing_stop(conn, trade: dict, current_price: float, pnl_pct: float, event_time: str) -> tuple[float, dict]:
    """Evaluate the trailing stop for an open trade and emit events if it moved.

    Returns:
        ``(trailing_stop, info)``. When the stop was neither activated nor
        moved, the stop is the decision's value (or the trade's current one
        when that is zero) and ``info`` only describes the evaluation.
        Otherwise the new stop is returned; an event is recorded and a card
        pushed on activation, or when ``_should_emit_trailing_move`` allows a
        move notification.
    """
    trailing_cfg = _trailing_config()
    current_trail = _safe_float(trade.get("trailing_stop"))
    decision = evaluate_trailing_stop(
        position=trade,
        current_price=current_price,
        pnl_pct=pnl_pct,
        config=trailing_cfg,
    ).as_dict()
    pick = decision.get
    activated = pick("activated")

    if not (activated or pick("moved")):
        idle_info = {
            "activated": False,
            "moved": False,
            "trailing_mode": pick("trailing_mode"),
            "volatility_pct": pick("volatility_pct"),
            "activate_pnl_pct": pick("activate_pnl_pct"),
            "distance_pct": pick("distance_pct"),
            "trailing_decision": decision,
        }
        return _safe_float(pick("trailing_stop")) or current_trail, idle_info

    new_trail = _safe_float(pick("trailing_stop"))
    fallback_event = "trailing_activate" if activated else "trailing_move"
    event_type = str(pick("event_type") or fallback_event)
    if activated:
        action_text = "激活"
    else:
        action_text = "上移"

    # Activations always notify; plain moves go through the emit check.
    should_emit = bool(activated) or _should_emit_trailing_move(conn, trade, new_trail, event_time, trailing_cfg)
    if should_emit:
        event_payload = {
            "current_price": current_price,
            "previous_trailing_stop": pick("previous_trailing_stop"),
            "trailing_stop": new_trail,
            "activate_pnl_pct": pick("activate_pnl_pct"),
            "distance_pct": pick("distance_pct"),
            "tier_label": pick("tier_label"),
            "trailing_mode": pick("trailing_mode"),
            "volatility_pct": pick("volatility_pct"),
            "base_activate_pnl_pct": pick("base_activate_pnl_pct"),
            "base_distance_pct": pick("base_distance_pct"),
            "min_lock_profit_pct": pick("min_lock_profit_pct"),
            "notification_throttled": False,
            "trailing_decision": decision,
        }
        _record_event(
            conn,
            trade["id"],
            trade["recommendation_id"],
            trade["symbol"],
            event_type,
            new_trail,
            pnl_pct,
            f"移动止盈{action_text}:保护价 {new_trail:.8g}",
            event_payload,
            event_time,
        )
        card_fields = {
            "side": normalize_side(trade.get("side")),
            "trailing_stop": new_trail,
            "pnl_pct": pnl_pct,
        }
        _push_event_card(event_type, trade, card_fields, event_time)

    moved_info = {
        "activated": bool(activated),
        "moved": bool(pick("moved")),
        "trailing_stop": new_trail,
        "previous_trailing_stop": pick("previous_trailing_stop"),
        "distance_pct": pick("distance_pct"),
        "activate_pnl_pct": pick("activate_pnl_pct"),
        "tier_label": pick("tier_label"),
        "trailing_mode": pick("trailing_mode"),
        "volatility_pct": pick("volatility_pct"),
        "notification_emitted": should_emit,
        "trailing_decision": decision,
    }
    return new_trail, moved_info
def _update_open_trade(conn, trade: dict, current_price: float, event_time: str) -> dict:
    """Advance one open paper trade to ``current_price``.

    Exit checks run first, in priority order: stop loss, trailing stop, TP2,
    TP1. The first one that triggers closes the trade. Otherwise the trailing
    stop is re-evaluated, the position-health guard is applied (it may close
    the trade or tighten the stop), and the trade row is refreshed with the
    new price extremes, trailing stop and PnL.

    The tick timestamp is resolved once so that every event and the row
    update written during this call share the same time, even when
    ``event_time`` is empty.

    Returns:
        The close result when the trade was closed, otherwise a dict with
        ``updated``, ``trade_id`` and ``pnl_pct`` merged with the trailing
        and guard results.
    """
    tick_time = event_time or _now()
    entry_price = _safe_float(trade.get("entry_price"))
    side = normalize_side(trade.get("side"))
    new_max = max(_safe_float(trade.get("max_price")) or entry_price, current_price)
    new_min = min(_safe_float(trade.get("min_price")) or entry_price, current_price)
    pnl_pct = _trade_pnl_pct(entry_price, current_price, side)

    # (predicate, price level, exit reason), highest priority first.
    exit_checks = (
        (should_stop_loss, _safe_float(trade.get("stop_loss")), "stop_loss"),
        (should_hit_trailing_stop, _safe_float(trade.get("trailing_stop")), "trailing_stop"),
        (should_take_profit, _safe_float(trade.get("tp2")), "tp2"),
        (should_take_profit, _safe_float(trade.get("tp1")), "tp1"),
    )
    for is_hit, level, reason in exit_checks:
        if is_hit(side, current_price, level):
            return _close_trade(conn, trade, current_price, reason, tick_time)

    trailing_stop, trailing_result = _update_trailing_stop(conn, trade, current_price, pnl_pct, tick_time)
    # The guard must see this tick's extremes and the freshly updated stop.
    guarded_trade = {**trade, "max_price": new_max, "min_price": new_min, "trailing_stop": trailing_stop}
    guard_result = _apply_position_health_guard(conn, guarded_trade, current_price, pnl_pct, tick_time)
    if guard_result.get("closed"):
        return guard_result
    if guard_result.get("tightened"):
        trailing_stop = tighter_stop(side, trailing_stop, _safe_float(guard_result.get("trailing_stop")))

    conn.execute(
        """
        UPDATE paper_trades
        SET current_price=%s,
        max_price=%s,
        min_price=%s,
        trailing_stop=%s,
        pnl_pct=%s,
        updated_at=%s
        WHERE id=%s AND status='open'
        """,
        (current_price, new_max, new_min, trailing_stop, pnl_pct, tick_time, trade["id"]),
    )
    return {"updated": True, "trade_id": trade["id"], "pnl_pct": pnl_pct, **trailing_result, **guard_result}
def sync_recommendation(rec: dict, current_price: float, event_time: str = "") -> dict:
    """Open or update the paper trade that belongs to one recommendation.

    This is intentionally independent from recommendation PnL fields. A
    recommendation can be a signal; only this ledger represents simulated
    execution.

    Behaviour by state:
        * a trade already exists: an open one is updated (any still-pending
          order is marked filled first), a closed one is skipped;
        * not a buy-now signal: wait-pullback recommendations are routed to
          the pending-order sync, everything else is skipped;
        * buy-now signal: a pending order is canceled as
          ``upgraded_to_buy_now`` and a trade is opened at ``current_price``.

    Any exception rolls the transaction back and is re-raised; the
    connection is always closed.
    """
    if not paper_trading_enabled():
        return {"enabled": False, "skipped": True, "reason": "disabled"}

    rec_id = _safe_int(rec.get("id"))
    symbol = str(rec.get("symbol") or "").strip().upper()
    current_price = _safe_float(current_price)
    if rec_id <= 0 or not symbol or current_price <= 0:
        return {"enabled": True, "skipped": True, "reason": "invalid_input"}

    execution_status = str(rec.get("execution_status") or "").strip()
    action_status = str(rec.get("action_status") or "").strip()
    event_time = event_time or _now()
    cfg = paper_trading_config()
    conn = get_conn()
    try:
        existing = conn.execute("SELECT * FROM paper_trades WHERE recommendation_id=%s", (rec_id,)).fetchone()
        if existing:
            existing = dict(existing)
            if existing.get("status") != "open":
                return {"skipped": True, "reason": "already_closed", "trade_id": existing.get("id")}
            # A trade is live, so any order still pending for it is stale:
            # record it as filled at the trade's entry price.
            fill_params = (
                _safe_float(existing.get("entry_price")),
                existing.get("opened_at") or event_time,
                event_time,
                rec_id,
            )
            conn.execute(
                """
                UPDATE paper_orders
                SET status='filled',
                fill_price=%s,
                filled_at=COALESCE(NULLIF(filled_at,''), %s),
                updated_at=%s
                WHERE recommendation_id=%s AND status='pending'
                """,
                fill_params,
            )
            outcome = _update_open_trade(conn, existing, current_price, event_time)
            conn.commit()
            return outcome

        immediate_entry = execution_status == "buy_now" or action_status == "可即刻买入"
        if not immediate_entry:
            if not _is_wait_pullback(rec):
                return {"skipped": True, "reason": "not_buy_now"}
            outcome = _sync_wait_pullback_order(conn, rec, current_price, event_time, cfg)
            conn.commit()
            return outcome

        pending_order = conn.execute(
            "SELECT * FROM paper_orders WHERE recommendation_id=%s AND status='pending'",
            (rec_id,),
        ).fetchone()
        if pending_order:
            # The signal was upgraded to an immediate entry; retire the limit order.
            conn.execute(
                """
                UPDATE paper_orders
                SET status='canceled',
                cancel_reason='upgraded_to_buy_now',
                canceled_at=%s,
                updated_at=%s
                WHERE recommendation_id=%s AND status='pending'
                """,
                (event_time, event_time, rec_id),
            )
        outcome = _open_trade(conn, rec, current_price, event_time, config=cfg)
        if pending_order:
            outcome["paper_order"] = {
                "canceled": True,
                "order_id": pending_order["id"],
                "cancel_reason": "upgraded_to_buy_now",
            }
        conn.commit()
        return outcome
    except Exception:
        conn.rollback()
        raise
    finally:
        try:
            conn.close()
        except Exception:
            pass
def sync_pending_paper_orders(limit: int = 100, event_time: str = "", config: dict | None = None) -> dict:
    """Reconcile pending limit orders against the latest shared price cache.

    The strategy runner can miss an existing order if the recommendation is
    later derived back into observe status. Pending orders are executable
    state, so they need their own reconciliation pass based on the same
    latest_price_cache used by the Web page.

    Args:
        limit: Maximum number of pending orders to process (clamped to 1..500).
        event_time: Timestamp for this run; ``_now()`` is used when empty.
        config: Optional paper-trading config override.

    Returns:
        A summary dict with ``processed_count``, ``filled_count``,
        ``canceled_count``, the per-order ``results`` and ``run_time``.
        ``canceled_count`` counts every ``paper_order_*`` result that is not
        one of the known non-cancel outcomes (pending, created, gate
        rejected, expired, filled), because cancel results are reported as
        ``paper_order_<cancel reason>``.

    Raises:
        Any exception from the sync is re-raised after a rollback.
    """
    if not paper_trading_enabled():
        return {"enabled": False, "processed_count": 0, "results": []}
    limit = max(1, min(_safe_int(limit, 100), 500))
    event_time = event_time or _now()
    cfg = _paper_cfg(config)

    order_columns = (
        "id",
        "recommendation_id",
        "symbol",
        "side",
        "order_type",
        "status",
        "source_status",
        "source_action",
        "target_price",
        "current_price_at_create",
        "fill_price",
        "notional_usdt",
        "stop_loss",
        "tp1",
        "tp2",
        "strategy_version",
        "strategy_code",
        "strategy_signal_id",
        "strategy_snapshot_json",
        "factor_roles_json",
        "entry_plan_snapshot_json",
        "created_at",
        "updated_at",
        "expires_at",
        "filled_at",
        "canceled_at",
        "cancel_reason",
    )
    # "paper_order_*" reasons that do NOT mean the order was canceled.
    non_cancel_reasons = frozenset({
        "paper_order_pending",
        "paper_order_created",
        "paper_order_gate_rejected",
        "paper_order_expired",
        "paper_order_filled",
    })

    def _is_cancel(result: dict) -> bool:
        reason = str(result.get("reason") or "")
        return reason.startswith("paper_order_") and reason not in non_cancel_reasons

    conn = get_conn()
    results = []
    try:
        rows = conn.execute(
            """
            SELECT
            po.*,
            r.status AS rec_status,
            r.rec_state,
            r.rec_score,
            r.action_status,
            r.execution_status,
            r.lifecycle_state,
            r.display_bucket,
            r.entry_price,
            r.current_price AS rec_current_price,
            r.stop_loss AS rec_stop_loss,
            r.tp1 AS rec_tp1,
            r.tp2 AS rec_tp2,
            r.strategy_version AS rec_strategy_version,
            r.strategy_code AS rec_strategy_code,
            r.strategy_signal_id AS rec_strategy_signal_id,
            r.strategy_snapshot_json AS rec_strategy_snapshot_json,
            r.factor_roles_json AS rec_factor_roles_json,
            r.entry_plan_json,
            r.market_context_json,
            r.derivatives_context_json,
            r.sector_context_json,
            lpc.price AS latest_price,
            lpc.updated_at AS latest_price_updated_at
            FROM paper_orders po
            LEFT JOIN recommendation r ON r.id = po.recommendation_id
            LEFT JOIN latest_price_cache lpc ON lpc.symbol = po.symbol
            WHERE po.status='pending'
            ORDER BY po.created_at ASC, po.id ASC
            LIMIT %s
            """,
            (limit,),
        ).fetchall()
        for row in rows:
            item = dict(row)
            current_price = _safe_float(item.get("latest_price"))
            order = {column: item.get(column) for column in order_columns}
            # Rebuild a recommendation-shaped dict. Live recommendation
            # values win for state and prices; the order's own snapshot wins
            # for strategy lineage and the entry plan.
            rec = {
                "id": item.get("recommendation_id"),
                "symbol": item.get("symbol"),
                "status": item.get("rec_status") or "active",
                "rec_state": item.get("rec_state"),
                "rec_score": item.get("rec_score"),
                "action_status": item.get("action_status") or item.get("source_action"),
                "execution_status": item.get("execution_status") or item.get("source_status"),
                "lifecycle_state": item.get("lifecycle_state"),
                "display_bucket": item.get("display_bucket"),
                "entry_price": item.get("entry_price") or item.get("target_price"),
                "current_price": item.get("rec_current_price") or current_price,
                "stop_loss": item.get("rec_stop_loss") or item.get("stop_loss"),
                "tp1": item.get("rec_tp1") or item.get("tp1"),
                "tp2": item.get("rec_tp2") or item.get("tp2"),
                "side": item.get("side"),
                "strategy_version": item.get("strategy_version") or item.get("rec_strategy_version"),
                "strategy_code": item.get("strategy_code") or item.get("rec_strategy_code"),
                "strategy_signal_id": item.get("strategy_signal_id") or item.get("rec_strategy_signal_id"),
                "strategy_snapshot_json": item.get("strategy_snapshot_json") or item.get("rec_strategy_snapshot_json"),
                "factor_roles_json": item.get("factor_roles_json") or item.get("rec_factor_roles_json"),
                "entry_plan_json": item.get("entry_plan_snapshot_json") or item.get("entry_plan_json"),
                "market_context_json": item.get("market_context_json"),
                "derivatives_context_json": item.get("derivatives_context_json"),
                "sector_context_json": item.get("sector_context_json"),
            }
            if current_price <= 0:
                # No usable cached price: leave the order untouched this run.
                result = {
                    "skipped": True,
                    "reason": "missing_latest_price",
                    "paper_order_id": order.get("id"),
                    "symbol": order.get("symbol"),
                    "target_price": order.get("target_price"),
                }
            else:
                result = _sync_wait_pullback_order(conn, rec, current_price, event_time, cfg)
                result.update({
                    "symbol": order.get("symbol"),
                    "latest_price": current_price,
                    "latest_price_updated_at": item.get("latest_price_updated_at") or "",
                })
            results.append(result)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        try:
            conn.close()
        except Exception:
            pass
    return {
        "enabled": True,
        "processed_count": len(results),
        "filled_count": sum(1 for r in results if r.get("paper_order", {}).get("filled")),
        "canceled_count": sum(1 for r in results if _is_cancel(r)),
        "results": results,
        "run_time": event_time,
    }
def get_paper_trading_summary(days: int = 30) -> dict:
    """Summarize the paper-trading account over the last ``days`` days.

    All currently open trades are included regardless of age; closed trades
    are limited to those closed (or, lacking a close time, updated/opened)
    within the window. ``days`` is clamped to 1..365.

    Returns:
        A dict with trade counts, win rate, realized/unrealized PnL, account
        balance and return figures, margin/leverage usage, and the configured
        default sizing parameters.
    """
    days = max(1, min(_safe_int(days, 30), 365))
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    conn = get_conn()
    try:
        open_rows = conn.execute(
            """
            SELECT * FROM paper_trades
            WHERE status='open'
            ORDER BY opened_at DESC, id DESC
            """
        ).fetchall()
        closed_rows = conn.execute(
            """
            SELECT * FROM paper_trades
            WHERE status='closed'
            AND COALESCE(closed_at, updated_at, opened_at) >= %s
            ORDER BY COALESCE(closed_at, updated_at, opened_at) DESC, id DESC
            """,
            (cutoff,),
        ).fetchall()
        pending_order_count = conn.execute("SELECT COUNT(*) FROM paper_orders WHERE status='pending'").fetchone()[0]
    finally:
        conn.close()

    cfg = paper_trading_config()
    items = []
    for raw in (*open_rows, *closed_rows):
        items.append(_decorate_trade(dict(raw), cfg))
    open_items = [trade for trade in items if trade.get("status") == "open"]
    closed_items = [trade for trade in items if trade.get("status") == "closed"]

    def _total(trades, field) -> float:
        # Sum one numeric field across trades, tolerating missing values.
        return sum(_safe_float(trade.get(field)) for trade in trades)

    closed_count = len(closed_items)
    win_count = sum(1 for trade in closed_items if _safe_float(trade.get("realized_pnl_pct")) > 0)
    # Break-even trades (pnl == 0) are counted as losses.
    loss_count = sum(1 for trade in closed_items if _safe_float(trade.get("realized_pnl_pct")) <= 0)

    total_realized = round(_total(closed_items, "realized_pnl_usdt"), 4)
    if closed_items:
        avg_realized_pct = round(_total(closed_items, "realized_pnl_pct") / closed_count, 4)
        win_rate = round(win_count / closed_count * 100, 2)
    else:
        avg_realized_pct = 0
        win_rate = 0
    open_unrealized = round(_total(open_items, "unrealized_pnl_usdt"), 4)
    total_pnl = round(total_realized + open_unrealized, 4)
    allocated_margin = round(_total(open_items, "margin_usdt"), 4)
    open_position_value = round(_total(open_items, "notional_usdt"), 4)
    initial_equity = default_account_equity_usdt(cfg)
    current_balance = round(initial_equity + total_pnl, 4)
    cumulative_leverage = round(open_position_value / current_balance, 4) if current_balance > 0 else 0

    return {
        "days": days,
        "total": len(items),
        "open_count": len(open_items),
        "closed_count": closed_count,
        "win_count": win_count,
        "loss_count": loss_count,
        "pending_order_count": int(pending_order_count or 0),
        "win_rate": win_rate,
        "realized_pnl_usdt": total_realized,
        "avg_realized_pnl_pct": avg_realized_pct,
        "open_unrealized_pnl_usdt": open_unrealized,
        "total_pnl_usdt": total_pnl,
        "initial_equity_usdt": initial_equity,
        "account_equity_usdt": initial_equity,
        "current_balance_usdt": current_balance,
        "account_realized_return_pct": _account_return_pct(total_realized),
        "account_unrealized_return_pct": _account_return_pct(open_unrealized),
        "account_total_return_pct": _account_return_pct(total_pnl),
        "allocated_margin_usdt": allocated_margin,
        "open_position_value_usdt": open_position_value,
        "cumulative_leverage": cumulative_leverage,
        "available_equity_usdt": round(current_balance - allocated_margin, 4),
        "margin_usdt": default_margin_usdt(cfg),
        "leverage": default_leverage(cfg),
        "notional_usdt": default_notional_usdt(cfg),
        "fee_rate": default_fee_rate(cfg),
        "slippage_pct": default_slippage_pct(cfg),
    }
def get_paper_trading_performance(days: int = 30) -> dict:
    """Build a day-by-day equity curve for the paper account.

    The window covers the last ``days`` calendar days ending today; ``days``
    is clamped to 1..365. Realized PnL of trades closed before the window is
    folded into the starting equity, and the unrealized PnL of open trades is
    added only to today's point.

    Returns:
        A dict with the window length, initial/current equity, total PnL and
        return, the maximum drawdown (USDT and percent), and ``points`` with
        one entry per day.
    """
    days = max(1, min(_safe_int(days, 30), 365))
    cfg = paper_trading_config()
    initial_equity = default_account_equity_usdt(cfg)
    today = datetime.now().date()
    start_date = today - timedelta(days=days - 1)

    conn = get_conn()
    try:
        closed_rows = conn.execute(
            """
            SELECT closed_at, realized_pnl_usdt
            FROM paper_trades
            WHERE status='closed' AND closed_at IS NOT NULL
            ORDER BY closed_at ASC, id ASC
            """
        ).fetchall()
        open_unrealized = conn.execute(
            """
            SELECT COALESCE(SUM(notional_usdt * pnl_pct / 100.0), 0)
            FROM paper_trades
            WHERE status='open'
            """
        ).fetchone()[0]
    finally:
        conn.close()

    # Bucket realized PnL by close date. Closes before the window only shift
    # the starting equity; closes dated after today are ignored.
    realized_by_day = {}
    realized_before = 0.0
    for row in closed_rows:
        closed_at = _parse_time(row["closed_at"])
        pnl = _safe_float(row["realized_pnl_usdt"])
        if not closed_at:
            continue
        closed_date = closed_at.date()
        if closed_date < start_date:
            realized_before += pnl
        elif closed_date <= today:
            bucket = closed_date.isoformat()
            realized_by_day[bucket] = realized_by_day.get(bucket, 0.0) + pnl

    floating_pnl = _safe_float(open_unrealized)
    points = []
    cumulative_realized = realized_before
    prev_equity = initial_equity + realized_before
    peak_equity = max(initial_equity, prev_equity)
    worst_drawdown_pct = 0.0
    worst_drawdown_usdt = 0.0

    for day in (start_date + timedelta(days=step) for step in range(days)):
        key = day.isoformat()
        cumulative_realized += round(realized_by_day.get(key, 0.0), 8)
        # Open positions are marked to market only on the final (today) point.
        unrealized = floating_pnl if day == today else 0.0
        equity = round(initial_equity + cumulative_realized + unrealized, 8)

        daily_pnl = round(equity - prev_equity, 8)
        if prev_equity > 0:
            daily_return_pct = round(daily_pnl / prev_equity * 100, 6)
        else:
            daily_return_pct = 0.0

        peak_equity = max(peak_equity, equity)
        drawdown_usdt = round(max(0.0, peak_equity - equity), 8)
        if peak_equity > 0:
            drawdown_pct = round(drawdown_usdt / peak_equity * 100, 6)
        else:
            drawdown_pct = 0.0
        worst_drawdown_pct = max(worst_drawdown_pct, drawdown_pct)
        worst_drawdown_usdt = max(worst_drawdown_usdt, drawdown_usdt)

        points.append(
            {
                "date": key,
                "equity_usdt": equity,
                "daily_pnl_usdt": daily_pnl,
                "daily_return_pct": daily_return_pct,
                "realized_pnl_usdt": round(cumulative_realized, 8),
                "unrealized_pnl_usdt": round(unrealized, 8),
                "return_pct": _account_return_pct(equity - initial_equity, initial_equity, cfg),
                "drawdown_usdt": drawdown_usdt,
                "drawdown_pct": drawdown_pct,
            }
        )
        prev_equity = equity

    final_equity = points[-1]["equity_usdt"] if points else initial_equity
    total_pnl = round(final_equity - initial_equity, 8)
    return {
        "days": days,
        "initial_equity_usdt": initial_equity,
        "current_equity_usdt": final_equity,
        "total_pnl_usdt": total_pnl,
        "total_return_pct": _account_return_pct(total_pnl, initial_equity, cfg),
        "max_drawdown_usdt": round(worst_drawdown_usdt, 8),
        "max_drawdown_pct": round(worst_drawdown_pct, 6),
        "points": points,
    }
def list_paper_trades(limit: int = 50, offset: int = 0, status: str = "", strategy_code: str = "", side: str = "") -> dict:
    """Return one page of paper trades, newest first.

    Args:
        limit: Page size, clamped to 1..200.
        offset: Number of rows to skip; negative values become 0.
        status: Applied only when it is ``"open"`` or ``"closed"``.
        strategy_code: Applied (after normalization) when non-empty.
        side: Applied only when it is ``"long"`` or ``"short"``
            (case-insensitive).

    Returns:
        A dict with decorated ``items``, the filtered ``total``, the effective
        ``limit``/``offset`` and a ``has_more`` flag.
    """
    limit = max(1, min(_safe_int(limit, 50), 200))
    offset = max(0, _safe_int(offset, 0))

    conditions = []
    args = []

    status = str(status or "").strip()
    if status in {"open", "closed"}:
        conditions.append("status=%s")
        args.append(status)

    strategy_code = str(strategy_code or "").strip()
    if strategy_code:
        conditions.append("strategy_code=%s")
        args.append(normalize_strategy_code(strategy_code))

    side = str(side or "").strip().lower()
    if side in {"long", "short"}:
        conditions.append("LOWER(COALESCE(side, 'long'))=%s")
        args.append(side)

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    conn = get_conn()
    try:
        count_row = conn.execute(f"SELECT COUNT(*) FROM paper_trades {where}", tuple(args)).fetchone()
        total = count_row[0]
        page_sql = f"""
            SELECT pt.*, lpc.price AS latest_market_price, lpc.updated_at AS latest_market_price_updated_at
            FROM paper_trades pt
            LEFT JOIN latest_price_cache lpc ON lpc.symbol = pt.symbol
            {where}
            ORDER BY pt.opened_at DESC, pt.id DESC
            LIMIT %s OFFSET %s
            """
        rows = conn.execute(page_sql, tuple(args + [limit, offset])).fetchall()
    finally:
        conn.close()

    cfg = paper_trading_config()
    total_count = int(total or 0)
    return {
        "items": [_decorate_trade(dict(row), cfg) for row in rows],
        "total": total_count,
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(rows) < total_count,
    }
def list_paper_orders(limit: int = 50, offset: int = 0, status: str = "", strategy_code: str = "", side: str = "") -> dict:
    """Return one page of paper orders, newest first.

    Args:
        limit: Page size, clamped to 1..200.
        offset: Number of rows to skip; negative values become 0.
        status: Applied only when it is one of ``pending``, ``filled``,
            ``canceled``, ``expired`` or ``rejected``.
        strategy_code: Applied (after normalization) when non-empty.
        side: Applied only when it is ``"long"`` or ``"short"``
            (case-insensitive).

    Returns:
        A dict with enriched ``items``, the filtered ``total``, the effective
        ``limit``/``offset`` and a ``has_more`` flag.
    """
    limit = max(1, min(_safe_int(limit, 50), 200))
    offset = max(0, _safe_int(offset, 0))

    conditions = []
    args = []

    status = str(status or "").strip()
    if status in {"pending", "filled", "canceled", "expired", "rejected"}:
        conditions.append("status=%s")
        args.append(status)

    strategy_code = str(strategy_code or "").strip()
    if strategy_code:
        conditions.append("strategy_code=%s")
        args.append(normalize_strategy_code(strategy_code))

    side = str(side or "").strip().lower()
    if side in {"long", "short"}:
        conditions.append("LOWER(COALESCE(side, 'long'))=%s")
        args.append(side)

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    conn = get_conn()
    try:
        count_row = conn.execute(f"SELECT COUNT(*) FROM paper_orders {where}", tuple(args)).fetchone()
        total = count_row[0]
        page_sql = f"""
            SELECT po.*, lpc.price AS latest_market_price, lpc.updated_at AS latest_market_price_updated_at
            FROM paper_orders po
            LEFT JOIN latest_price_cache lpc ON lpc.symbol = po.symbol
            {where}
            ORDER BY po.created_at DESC, po.id DESC
            LIMIT %s OFFSET %s
            """
        rows = conn.execute(page_sql, tuple(args + [limit, offset])).fetchall()
    finally:
        conn.close()

    def enrich(row) -> dict:
        """Attach the parsed entry plan, latest price and strategy lineage to one order."""
        order = dict(row)
        order["entry_plan_snapshot"] = _loads_json(order.pop("entry_plan_snapshot_json", "{}"), {})
        # Prefer the cached market price; otherwise use the price recorded at creation.
        cached_price = _safe_float(order.get("latest_market_price"))
        if cached_price > 0:
            order["latest_price"] = cached_price
        else:
            order["latest_price"] = _safe_float(order.get("current_price_at_create"))
        order["latest_price_updated_at"] = (
            order.get("latest_market_price_updated_at") or order.get("updated_at") or ""
        )
        order.update(_strategy_lineage_from_trade_or_order(order))
        target = _safe_float(order.get("target_price"))
        latest = _safe_float(order.get("latest_price"))
        if target and latest:
            order["distance_to_target_pct"] = round(
                order_distance_pct(order.get("side") or "long", latest, target), 4
            )
        else:
            order["distance_to_target_pct"] = 0
        return order

    items = [enrich(row) for row in rows]
    total_count = int(total or 0)
    return {
        "items": items,
        "total": total_count,
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(items) < total_count,
    }
def list_paper_trade_events(limit: int = 80, offset: int = 0, symbol: str = "", event_type: str = "", strategy_code: str = "", side: str = "") -> dict:
    """Return one page of paper trade events joined with their parent trade.

    Args:
        limit: Page size, clamped to 1..200.
        offset: Number of rows to skip; negative values become 0.
        symbol: Upper-cased and matched against the event symbol when non-empty.
        event_type: Matched against the event type when non-empty.
        strategy_code: Matched (after normalization) against the event's
            strategy code, falling back to the trade's code when the event's
            is empty.
        side: Applied to the parent trade's side only when it is ``"long"``
            or ``"short"`` (case-insensitive).

    Returns:
        A dict with ``items``, the filtered ``total``, the effective
        ``limit``/``offset`` and a ``has_more`` flag.
    """
    limit = max(1, min(_safe_int(limit, 80), 200))
    offset = max(0, _safe_int(offset, 0))
    symbol = str(symbol or "").strip().upper()
    event_type = str(event_type or "").strip()
    strategy_code = str(strategy_code or "").strip()

    conditions = []
    args = []
    if symbol:
        conditions.append("e.symbol=%s")
        args.append(symbol)
    if event_type:
        conditions.append("e.event_type=%s")
        args.append(event_type)
    if strategy_code:
        conditions.append("COALESCE(NULLIF(e.strategy_code, ''), t.strategy_code)=%s")
        args.append(normalize_strategy_code(strategy_code))
    side = str(side or "").strip().lower()
    if side in {"long", "short"}:
        conditions.append("LOWER(COALESCE(t.side, 'long'))=%s")
        args.append(side)
    where_sql = ("WHERE " + " AND ".join(conditions)) if conditions else ""

    count_sql = f"""
        SELECT COUNT(*)
        FROM paper_trade_events e
        LEFT JOIN paper_trades t ON t.id = e.trade_id
        {where_sql}
        """
    page_sql = f"""
        SELECT
        e.*,
        t.status AS trade_status,
        t.entry_price,
        t.exit_price,
        t.notional_usdt,
        t.margin_usdt,
        t.leverage,
        t.exit_reason,
        t.side AS trade_side,
        t.opened_at,
        t.closed_at,
        t.strategy_code AS trade_strategy_code,
        t.strategy_signal_id AS trade_strategy_signal_id
        FROM paper_trade_events e
        LEFT JOIN paper_trades t ON t.id = e.trade_id
        {where_sql}
        ORDER BY e.event_time DESC, e.id DESC
        LIMIT %s OFFSET %s
        """

    conn = get_conn()
    try:
        total = conn.execute(count_sql, tuple(args)).fetchone()[0]
        rows = conn.execute(page_sql, tuple(args + [limit, offset])).fetchall()
    finally:
        conn.close()

    def enrich(row) -> dict:
        """Parse the detail payload and resolve strategy/side fields for one event."""
        event = dict(row)
        event["detail"] = _loads_json(event.pop("detail_json", "{}"), {})
        code = normalize_strategy_code(event.get("strategy_code") or event.get("trade_strategy_code"))
        event["strategy_code"] = code
        event["strategy_name"] = strategy_label(code)
        event["strategy_signal_id"] = _safe_int(
            event.get("strategy_signal_id") or event.get("trade_strategy_signal_id")
        )
        event["side"] = normalize_side(event.get("trade_side"))
        return event

    items = [enrich(row) for row in rows]
    total_count = int(total or 0)
    return {
        "items": items,
        "total": total_count,
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(items) < total_count,
    }
def delete_paper_trade(trade_id: int) -> dict:
    """Delete one paper trade together with its events.

    Returns:
        ``{"deleted": False, "reason": ...}`` when the id is invalid or no
        such trade exists; otherwise a dict with ``deleted=True``, the trade's
        id, symbol and status, and the number of events removed.
    """
    trade_id = _safe_int(trade_id)
    if trade_id <= 0:
        return {"deleted": False, "reason": "invalid_trade_id"}

    key = (trade_id,)
    conn = get_conn()
    try:
        trade = conn.execute(
            "SELECT id, recommendation_id, symbol, status FROM paper_trades WHERE id=%s", key
        ).fetchone()
        if not trade:
            return {"deleted": False, "reason": "not_found", "trade_id": trade_id}

        # Count first so the response can report how many events were removed.
        event_count = conn.execute(
            "SELECT COUNT(*) FROM paper_trade_events WHERE trade_id=%s", key
        ).fetchone()[0]
        conn.execute("DELETE FROM paper_trade_events WHERE trade_id=%s", key)
        conn.execute("DELETE FROM paper_trades WHERE id=%s", key)
        conn.commit()

        result = {"deleted": True, "trade_id": trade_id}
        result["symbol"] = trade["symbol"]
        result["status"] = trade["status"]
        result["deleted_events"] = int(event_count or 0)
        return result
    finally:
        conn.close()
def delete_paper_order(order_id: int) -> dict:
    """Delete one paper order by id.

    Returns:
        ``{"deleted": False, "reason": ...}`` when the id is invalid or no
        such order exists; otherwise a dict with ``deleted=True`` and the
        order's id, symbol and status.
    """
    order_id = _safe_int(order_id)
    if order_id <= 0:
        return {"deleted": False, "reason": "invalid_order_id"}

    key = (order_id,)
    conn = get_conn()
    try:
        order = conn.execute(
            "SELECT id, recommendation_id, symbol, status FROM paper_orders WHERE id=%s", key
        ).fetchone()
        if not order:
            return {"deleted": False, "reason": "not_found", "order_id": order_id}

        conn.execute("DELETE FROM paper_orders WHERE id=%s", key)
        conn.commit()

        result = {"deleted": True, "order_id": order_id}
        result["symbol"] = order["symbol"]
        result["status"] = order["status"]
        return result
    finally:
        conn.close()
def reset_paper_trading_data(scope: str = "all") -> dict:
    """Bulk-delete paper trading rows for the requested scope.

    Supported scopes:
        ``all``            every event, trade and order
        ``events``         every trade event
        ``orders``         every order
        ``trades``         every trade and the events of those trades
        ``open_trades``    trades with status ``open`` and their events
        ``closed_trades``  trades with status ``closed`` and their events
        ``completed``      closed trades (with events) plus orders whose
                           status is filled, expired, canceled or rejected

    Returns:
        ``{"reset": False, "reason": "invalid_scope", ...}`` for an unknown
        scope; otherwise ``{"reset": True, "scope": ..., "deleted": {...}}``
        with per-table deletion counts.
    """
    scope = str(scope or "all").strip().lower()
    allowed = {"all", "trades", "orders", "events", "open_trades", "closed_trades", "completed"}
    if scope not in allowed:
        return {"reset": False, "reason": "invalid_scope", "scope": scope, "allowed_scopes": sorted(allowed)}

    conn = get_conn()
    try:
        deleted = {"trades": 0, "orders": 0, "events": 0}

        def count_all(table: str) -> int:
            """Return the row count of a whole table (table name is a fixed literal)."""
            return int(conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] or 0)

        def delete_trades(where_sql: str = "", params: tuple = ()) -> None:
            """Delete the matching trades and the events attached to them."""
            matched = conn.execute(f"SELECT id FROM paper_trades {where_sql}", params).fetchall()
            ids = [int(row["id"]) for row in matched]
            if not ids:
                return
            event_count = conn.execute(
                "SELECT COUNT(*) FROM paper_trade_events WHERE trade_id = ANY(%s)", (ids,)
            ).fetchone()[0]
            conn.execute("DELETE FROM paper_trade_events WHERE trade_id = ANY(%s)", (ids,))
            conn.execute("DELETE FROM paper_trades WHERE id = ANY(%s)", (ids,))
            deleted["events"] += int(event_count or 0)
            deleted["trades"] += len(ids)

        def delete_orders(where_sql: str = "", params: tuple = ()) -> None:
            """Delete the matching orders and record how many there were."""
            count = conn.execute(f"SELECT COUNT(*) FROM paper_orders {where_sql}", params).fetchone()[0]
            conn.execute(f"DELETE FROM paper_orders {where_sql}", params)
            deleted["orders"] += int(count or 0)

        if scope == "all":
            # Count everything before wiping so the totals reflect what was removed.
            deleted["events"] += count_all("paper_trade_events")
            deleted["trades"] += count_all("paper_trades")
            deleted["orders"] += count_all("paper_orders")
            for table in ("paper_trade_events", "paper_trades", "paper_orders"):
                conn.execute(f"DELETE FROM {table}")
        elif scope == "events":
            deleted["events"] += count_all("paper_trade_events")
            conn.execute("DELETE FROM paper_trade_events")
        elif scope == "orders":
            delete_orders()
        elif scope == "trades":
            delete_trades()
        elif scope == "open_trades":
            delete_trades("WHERE status='open'")
        elif scope in ("closed_trades", "completed"):
            delete_trades("WHERE status='closed'")
            if scope == "completed":
                delete_orders("WHERE status IN ('filled','expired','canceled','rejected')")

        conn.commit()
        return {"reset": True, "scope": scope, "deleted": deleted}
    finally:
        conn.close()
def _report_trade_line(item: dict, closed: bool = False) -> str:
    """Format one trade as a markdown bullet for the trading report card.

    Args:
        item: Trade dict; reads ``symbol``, ``side`` and either the realized
            fields (``realized_pnl_pct``, ``realized_pnl_usdt``,
            ``exit_reason``) or the open-position fields (``pnl_pct``,
            ``unrealized_pnl_usdt``).
        closed: Selects the realized fields and shows the exit reason instead
            of the "holding" label.

    Returns:
        A single markdown line: symbol, direction, PnL percent, PnL in USDT
        and status.
    """
    symbol = item.get("symbol") or "--"
    # Both branches previously produced an empty string, so the direction
    # column was always blank; use distinct labels for short and long.
    is_short = str(item.get("side") or "long").lower() == "short"
    side = "做空" if is_short else "做多"
    pnl = _safe_float(item.get("realized_pnl_pct") if closed else item.get("pnl_pct"))
    pnl_usdt = _safe_float(item.get("realized_pnl_usdt") if closed else item.get("unrealized_pnl_usdt"))
    status = item.get("exit_reason") if closed else "持仓中"
    return f"- **{symbol}** · {side} · {_fmt_pct(pnl)} · {pnl_usdt:+.2f} USDT · {status or '--'}"
def _report_order_line(item: dict) -> str:
    """Format one pending order as a markdown bullet for the trading report card.

    Args:
        item: Order dict; reads ``symbol``, ``side``, ``target_price`` and
            ``distance_to_target_pct``.

    Returns:
        A single markdown line: symbol, direction, target price and the
        distance to the target in percent.
    """
    symbol = item.get("symbol") or "--"
    # Both branches previously produced an empty string, so the direction
    # column was always blank; use distinct labels for short and long.
    is_short = str(item.get("side") or "long").lower() == "short"
    side = "做空" if is_short else "做多"
    distance = _safe_float(item.get("distance_to_target_pct"))
    return f"- **{symbol}** · {side} · 目标 {_fmt_price(item.get('target_price'))} · 距目标 {_fmt_pct(distance)}"
def _paper_running_days() -> float:
    """Return how many days have elapsed since the first paper trade or order.

    The start is the earliest of ``MIN(opened_at)`` over trades and
    ``MIN(created_at)`` over orders. Returns 0.0 when no start time can be
    parsed; otherwise at least 1.0.
    """
    earliest_sql = """
        SELECT MIN(first_at) AS first_at
        FROM (
        SELECT MIN(opened_at) AS first_at FROM paper_trades
        UNION ALL
        SELECT MIN(created_at) AS first_at FROM paper_orders
        ) x
        WHERE first_at IS NOT NULL AND first_at <> ''
        """
    conn = get_conn()
    try:
        row = conn.execute(earliest_sql).fetchone()
    finally:
        conn.close()

    started = _parse_time(row["first_at"] if row else "")
    if not started:
        return 0.0

    # Compare in the start time's own timezone so aware and naive values never mix.
    now = datetime.now(started.tzinfo or None)
    elapsed_seconds = max(0.0, (now - started).total_seconds())
    return max(1.0, elapsed_seconds / 86400)
def _periodized_return_pct(total_return_pct: float, running_days: float, period_days: float) -> float:
if running_days <= 0:
return 0.0
growth = 1 + total_return_pct / 100
if growth <= 0:
return -100.0
return round((growth ** (period_days / running_days) - 1) * 100, 4)
def _periodized_return_label(total_return_pct: float, running_days: float, period_days: float) -> str:
if running_days < 30:
return "样本不足"
return _fmt_pct(_periodized_return_pct(total_return_pct, running_days, period_days))
def send_paper_trading_report(days: int = 30) -> dict:
    """Assemble the paper trading report card and push it.

    The card shows account equity, return figures, win rate, trade counts, a
    performance overview and up to five open trades, pending orders and
    recently closed trades.

    Args:
        days: Reporting window passed to the summary, clamped to 1..365.

    Returns:
        A dict with the push outcome (``ok``, ``push_result``), the running
        days, the monthly/annualized returns (``None`` with fewer than 30
        running days) and their labels, the summary and the send time.
    """
    days = max(1, min(_safe_int(days, 30), 365))
    summary = get_paper_trading_summary(days=days)
    open_trades = list_paper_trades(limit=5, status="open").get("items", [])
    closed_trades = list_paper_trades(limit=5, status="closed").get("items", [])
    pending_orders = list_paper_orders(limit=5, status="pending").get("items", [])

    total_pnl = _safe_float(summary.get("total_pnl_usdt"))
    realized = _safe_float(summary.get("realized_pnl_usdt"))
    unrealized = _safe_float(summary.get("open_unrealized_pnl_usdt"))
    initial_equity = _safe_float(summary.get("initial_equity_usdt"))
    current_balance = _safe_float(summary.get("current_balance_usdt"))
    return_pct = _safe_float(summary.get("account_total_return_pct"))
    win_rate = _safe_float(summary.get("win_rate"))

    running_days = _paper_running_days()
    # Periodized figures are only reported once there are 30 days of history.
    if running_days >= 30:
        monthly_return_pct = _periodized_return_pct(return_pct, running_days, 30)
        annualized_return_pct = _periodized_return_pct(return_pct, running_days, 365)
    else:
        monthly_return_pct = None
        annualized_return_pct = None
    monthly_return_label = _periodized_return_label(return_pct, running_days, 30)
    annualized_return_label = _periodized_return_label(return_pct, running_days, 365)

    def column(label: str, value: str) -> dict:
        """Build one equally weighted column holding a single labelled field."""
        return {"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field(label, value)]}

    def column_row(*cells: tuple) -> dict:
        """Build a column set from ``(label, value)`` pairs."""
        return {
            "tag": "column_set",
            "flex_mode": "none",
            "background_style": "default",
            "columns": [column(label, value) for label, value in cells],
        }

    def section(heading: str, lines: list, empty_text: str) -> dict:
        """Build a markdown section, falling back to ``empty_text`` when there are no lines."""
        return _card_md(heading + ("\n".join(lines) if lines else empty_text))

    equity_row = column_row(
        ("初始资金", f"{initial_equity:.2f} USDT"),
        ("当前资金", f"{current_balance:.2f} USDT"),
        ("运行天数", f"{running_days:.1f}"),
    )
    return_row = column_row(
        ("账户收益率", _fmt_pct(return_pct)),
        ("月化收益率", monthly_return_label),
        ("年化收益率", annualized_return_label),
    )
    activity_row = column_row(
        ("总收益", f"{total_pnl:+.2f} USDT"),
        ("成功率", f"{_fmt_pct(win_rate)} ({summary.get('win_count', 0)}/{summary.get('closed_count', 0)})"),
        ("交易数量", f"持仓 {summary.get('open_count', 0)} / 平仓 {summary.get('closed_count', 0)} / 挂单 {summary.get('pending_order_count', 0)}"),
    )
    overview = _card_md(
        "**战绩概览**\n"
        f"- 已实现: {realized:+.2f} USDT\n"
        f"- 浮动收益: {unrealized:+.2f} USDT\n"
        f"- 平均平仓收益率: {_fmt_pct(summary.get('avg_realized_pnl_pct'))}\n"
        f"- 持仓名义价值: {_safe_float(summary.get('open_position_value_usdt')):.2f} USDT\n"
        f"- 已占用保证金: {_safe_float(summary.get('allocated_margin_usdt')):.2f} USDT\n"
        f"- 可用资金: {_safe_float(summary.get('available_equity_usdt')):.2f} USDT\n"
        f"- 累计杠杆: {_safe_float(summary.get('cumulative_leverage')):.2f}x"
    )
    open_section = section(
        "**当前持仓**\n", [_report_trade_line(trade) for trade in open_trades], "- 暂无持仓"
    )
    pending_section = section(
        "**等待成交**\n", [_report_order_line(order) for order in pending_orders], "- 暂无挂单"
    )
    closed_section = section(
        "**最近平仓**\n",
        [_report_trade_line(trade, closed=True) for trade in closed_trades],
        "- 暂无平仓记录",
    )
    footer = _card_note(f"报告周期: 最近 {days} 天 · 发送时间: {_now()}")

    card = {
        "metadata": {"source": "paper_trading", "event_type": "trade_report", "symbol": "ALL"},
        "config": {"wide_screen_mode": True},
        "header": {
            # Green header for a non-negative total PnL, red otherwise.
            "template": "green" if total_pnl >= 0 else "red",
            "title": {"tag": "plain_text", "content": f"交易报告 - 最近 {days}"},
        },
        "elements": [
            equity_row,
            return_row,
            activity_row,
            overview,
            open_section,
            pending_section,
            closed_section,
            footer,
        ],
    }
    ok, push_result = _push_custom_paper_card(card)

    return {
        "ok": bool(ok),
        "days": days,
        "running_days": running_days,
        "monthly_return_pct": monthly_return_pct,
        "annualized_return_pct": annualized_return_pct,
        "monthly_return_label": monthly_return_label,
        "annualized_return_label": annualized_return_label,
        "summary": summary,
        "sent_at": _now(),
        "push_result": push_result,
    }