alphax/app/db/paper_trading.py
2026-05-28 23:47:37 +08:00

2261 lines
95 KiB
Python

"""Paper trading ledger for separating signal quality from trade PnL."""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta
from app.config.system_config import paper_trading_config
from app.core.global_risk import evaluate_global_risk
from app.core.strategy_registry import normalize_strategy_code, strategy_label, strategy_paper_config
from app.db.schema import get_conn
from app.db.system_logs import record_system_error
from app.integrations.feishu_push import push_card
def _now() -> str:
return datetime.now().isoformat()
def _safe_float(value, default: float = 0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except Exception:
return default
def _safe_int(value, default: int = 0) -> int:
try:
return int(value or 0)
except Exception:
return default
def paper_trading_enabled() -> bool:
return bool(paper_trading_config().get("enabled", True))
def _paper_cfg(config: dict | None = None) -> dict:
return config if isinstance(config, dict) else paper_trading_config()
def default_account_equity_usdt(config: dict | None = None) -> float:
return max(1.0, _safe_float(_paper_cfg(config).get("account_equity_usdt"), 20000.0))
def default_leverage(config: dict | None = None) -> float:
return max(1.0, _safe_float(_paper_cfg(config).get("trade_leverage"), 5.0))
def default_notional_usdt(config: dict | None = None) -> float:
return max(1.0, _safe_float(_paper_cfg(config).get("trade_notional_usdt"), 5000.0))
def default_margin_usdt(config: dict | None = None) -> float:
cfg = _paper_cfg(config)
return round(default_notional_usdt(cfg) / default_leverage(cfg), 8)
def default_fee_rate(config: dict | None = None) -> float:
return max(0.0, _safe_float(_paper_cfg(config).get("fee_rate"), 0.001))
def default_slippage_pct(config: dict | None = None) -> float:
return max(0.0, _safe_float(_paper_cfg(config).get("slippage_pct"), 0.05))
def max_cumulative_leverage(config: dict | None = None) -> float:
return max(0.0, _safe_float(_paper_cfg(config).get("max_cumulative_leverage"), 5.0))
def _cumulative_leverage_check(conn, additional_notional: float, config: dict | None = None, exclude_rec_id: int = 0) -> tuple[bool, dict]:
cfg = _paper_cfg(config)
equity = default_account_equity_usdt(cfg)
cap = max_cumulative_leverage(cfg)
if cap <= 0:
return True, {"max_cumulative_leverage": cap, "disabled": True}
exclude_rec_id = _safe_int(exclude_rec_id)
open_params = []
pending_params = []
open_where = "status='open'"
pending_where = "status='pending'"
if exclude_rec_id > 0:
open_where += " AND recommendation_id<>%s"
pending_where += " AND recommendation_id<>%s"
open_params.append(exclude_rec_id)
pending_params.append(exclude_rec_id)
open_notional = _safe_float(conn.execute(f"SELECT COALESCE(SUM(notional_usdt),0) FROM paper_trades WHERE {open_where}", tuple(open_params)).fetchone()[0])
pending_notional = _safe_float(conn.execute(f"SELECT COALESCE(SUM(notional_usdt),0) FROM paper_orders WHERE {pending_where}", tuple(pending_params)).fetchone()[0])
add = max(0.0, _safe_float(additional_notional))
projected_notional = open_notional + pending_notional + add
projected_leverage = projected_notional / equity if equity > 0 else 0
detail = {
"account_equity_usdt": equity,
"open_notional_usdt": round(open_notional, 8),
"pending_notional_usdt": round(pending_notional, 8),
"additional_notional_usdt": round(add, 8),
"projected_notional_usdt": round(projected_notional, 8),
"projected_cumulative_leverage": round(projected_leverage, 6),
"max_cumulative_leverage": cap,
}
return projected_leverage <= cap + 1e-12, detail
def _portfolio_drawdown_check(conn, additional_notional: float, config: dict | None = None) -> tuple[bool, dict]:
cfg = _paper_cfg(config)
cap = max(0.0, _safe_float(cfg.get("max_account_drawdown_pause_pct"), 0))
if cap <= 0:
return True, {"disabled": True, "max_account_drawdown_pause_pct": cap}
equity = default_account_equity_usdt(cfg)
open_rows = conn.execute("SELECT notional_usdt, pnl_pct FROM paper_trades WHERE status='open'").fetchall()
unrealized = sum(_safe_float(r["notional_usdt"]) * _safe_float(r["pnl_pct"]) / 100 for r in open_rows)
drawdown_pct = abs(min(0.0, unrealized)) / equity * 100 if equity > 0 else 0
detail = {
"account_equity_usdt": equity,
"open_unrealized_pnl_usdt": round(unrealized, 8),
"open_drawdown_pct": round(drawdown_pct, 6),
"max_account_drawdown_pause_pct": cap,
"additional_notional_usdt": round(max(0.0, _safe_float(additional_notional)), 8),
}
return drawdown_pct <= cap + 1e-12, detail
def _weak_entries_check(conn, event_time: str, config: dict | None = None) -> tuple[bool, dict]:
cfg = _paper_cfg(config)
limit = max(0, _safe_int(cfg.get("pause_after_weak_entries"), 0))
if limit <= 0:
return True, {"disabled": True, "pause_after_weak_entries": limit}
window_hours = max(0.1, _safe_float(cfg.get("weak_entry_window_hours"), 6.0))
threshold = max(0.0, _safe_float(cfg.get("weak_entry_min_max_pnl_pct"), 1.0))
now = _parse_time(event_time or _now()) or datetime.now()
since = (now - timedelta(hours=window_hours)).isoformat()
rows = conn.execute(
"""
SELECT symbol, opened_at, entry_price, max_price
FROM paper_trades
WHERE opened_at >= %s
ORDER BY opened_at DESC, id DESC
LIMIT %s
""",
(since, limit),
).fetchall()
samples = []
for row in rows:
entry = _safe_float(row["entry_price"])
max_pnl = (max(_safe_float(row["max_price"]), entry) / entry - 1) * 100 if entry > 0 else 0
samples.append({"symbol": row["symbol"], "opened_at": row["opened_at"], "max_pnl_pct": round(max_pnl, 6)})
enough = len(samples) >= limit
all_weak = enough and all(x["max_pnl_pct"] < threshold for x in samples)
detail = {
"pause_after_weak_entries": limit,
"weak_entry_window_hours": window_hours,
"weak_entry_min_max_pnl_pct": threshold,
"samples": samples,
}
return not all_weak, detail
def _portfolio_entry_pause_check(conn, additional_notional: float, event_time: str, config: dict | None = None) -> tuple[bool, str, dict]:
drawdown_ok, drawdown = _portfolio_drawdown_check(conn, additional_notional, config)
if not drawdown_ok:
return False, "portfolio_drawdown_pause", {"drawdown": drawdown}
weak_ok, weak = _weak_entries_check(conn, event_time, config)
if not weak_ok:
return False, "weak_entries_pause", {"weak_entries": weak}
return True, "", {"drawdown": drawdown, "weak_entries": weak}
def _global_risk_entry_check(conn, rec: dict, additional_notional: float, config: dict | None = None) -> tuple[bool, dict]:
detail = evaluate_global_risk(
conn=conn,
config=_paper_cfg(config),
rec=rec,
additional_notional=additional_notional,
)
return bool(detail.get("allow_new_entries", True)), detail
def _market_risk_adjusted_notional(base_notional: float, risk_detail: dict | None, config: dict | None = None) -> float:
cfg = _paper_cfg(config)
detail = risk_detail if isinstance(risk_detail, dict) else {}
multiplier = _safe_float(detail.get("position_multiplier"), 1.0)
if multiplier <= 0:
multiplier = 1.0
adjusted = _safe_float(base_notional) * multiplier
min_notional = max(1.0, _safe_float(cfg.get("min_trade_notional_usdt"), 1.0))
return round(max(min_notional, adjusted), 8)
def _trailing_config() -> dict:
cfg = paper_trading_config()
return {
"enabled": bool(cfg.get("trailing_stop_enabled", True)),
"mode": str(cfg.get("trailing_mode") or "volatility").strip().lower(),
"activate_pnl_pct": max(0.0, _safe_float(cfg.get("trailing_activate_pnl_pct"), 3.0)),
"min_lock_profit_pct": max(0.0, _safe_float(cfg.get("trailing_min_lock_profit_pct"), 0.5)),
"distance_pct": max(0.1, _safe_float(cfg.get("trailing_distance_pct"), 1.5)),
"vol_min_activation_pct": max(0.0, _safe_float(cfg.get("trailing_volatility_min_activation_pct"), 2.5)),
"vol_max_activation_pct": max(0.1, _safe_float(cfg.get("trailing_volatility_max_activation_pct"), 8.0)),
"vol_activation_mult": max(0.0, _safe_float(cfg.get("trailing_volatility_activation_mult"), 0.6)),
"vol_min_distance_pct": max(0.1, _safe_float(cfg.get("trailing_volatility_min_distance_pct"), 1.2)),
"vol_max_distance_pct": max(0.1, _safe_float(cfg.get("trailing_volatility_max_distance_pct"), 8.0)),
"vol_distance_mult": max(0.0, _safe_float(cfg.get("trailing_volatility_distance_mult"), 0.7)),
"move_push_min_interval_seconds": max(0, _safe_int(cfg.get("trailing_move_push_min_interval_seconds"), 300)),
"move_push_min_step_pct": max(0.0, _safe_float(cfg.get("trailing_move_push_min_step_pct"), 2.0)),
"tiers": cfg.get("trailing_tiers") if isinstance(cfg.get("trailing_tiers"), list) else [],
}
def _trailing_distance_pct(pnl_pct: float, cfg: dict) -> tuple[float, str]:
distance = _safe_float(cfg.get("distance_pct"), 1.5)
label = ""
tiers = cfg.get("tiers") or []
for tier in sorted((t for t in tiers if isinstance(t, dict)), key=lambda x: _safe_float(x.get("min_pnl_pct")), reverse=True):
if pnl_pct >= _safe_float(tier.get("min_pnl_pct")):
distance = max(0.1, _safe_float(tier.get("distance_pct"), distance))
label = str(tier.get("label") or "")
break
return distance, label
def _clamp(value: float, min_value: float, max_value: float) -> float:
low = min(min_value, max_value)
high = max(min_value, max_value)
return max(low, min(high, value))
def _trade_observed_volatility_pct(trade: dict, current_price: float) -> float:
entry = _safe_float(trade.get("entry_price"))
if entry <= 0 or current_price <= 0:
return 0.0
high = max(_safe_float(trade.get("max_price")) or entry, current_price, entry)
low = min(_safe_float(trade.get("min_price")) or entry, current_price, entry)
return round(max(0.0, (high - low) / entry * 100), 6)
def _dynamic_trailing_profile(trade: dict, current_price: float, pnl_pct: float, cfg: dict) -> dict:
base_activate = _safe_float(cfg.get("activate_pnl_pct"), 3.0)
base_distance, tier_label = _trailing_distance_pct(pnl_pct, cfg)
volatility_pct = _trade_observed_volatility_pct(trade, current_price)
if str(cfg.get("mode") or "volatility").lower() != "volatility":
return {
"mode": "fixed",
"volatility_pct": volatility_pct,
"activate_pnl_pct": base_activate,
"distance_pct": base_distance,
"tier_label": tier_label,
}
dynamic_activate = max(base_activate, volatility_pct * _safe_float(cfg.get("vol_activation_mult"), 0.6))
dynamic_activate = _clamp(
dynamic_activate,
_safe_float(cfg.get("vol_min_activation_pct"), 2.5),
_safe_float(cfg.get("vol_max_activation_pct"), 8.0),
)
dynamic_distance = max(base_distance, volatility_pct * _safe_float(cfg.get("vol_distance_mult"), 0.7))
dynamic_distance = _clamp(
dynamic_distance,
_safe_float(cfg.get("vol_min_distance_pct"), 1.2),
_safe_float(cfg.get("vol_max_distance_pct"), 8.0),
)
label = tier_label or "波动率"
return {
"mode": "volatility",
"volatility_pct": volatility_pct,
"activate_pnl_pct": round(dynamic_activate, 6),
"distance_pct": round(dynamic_distance, 6),
"tier_label": label,
"base_activate_pnl_pct": base_activate,
"base_distance_pct": base_distance,
}
def _parse_time(value: str) -> datetime | None:
if not value:
return None
try:
return datetime.fromisoformat(str(value).replace("Z", "+00:00"))
except Exception:
return None
def _should_emit_trailing_move(conn, trade: dict, new_trail: float, event_time: str, cfg: dict) -> bool:
last = conn.execute(
"""
SELECT event_time, price
FROM paper_trade_events
WHERE trade_id=%s AND event_type IN ('trailing_activate','trailing_move')
ORDER BY event_time DESC, id DESC
LIMIT 1
""",
(trade["id"],),
).fetchone()
if not last:
return True
min_interval = _safe_int(cfg.get("move_push_min_interval_seconds"), 300)
if min_interval > 0:
current_ts = _parse_time(event_time or _now())
last_ts = _parse_time(last["event_time"])
if current_ts and last_ts and (current_ts - last_ts).total_seconds() >= min_interval:
return True
last_trail = _safe_float(last["price"])
min_step_pct = _safe_float(cfg.get("move_push_min_step_pct"), 2.0)
if min_step_pct <= 0 or last_trail <= 0:
return True
return (new_trail / last_trail - 1) * 100 >= min_step_pct
def _loads_json(value, fallback=None):
try:
if isinstance(value, str) and value.strip():
return json.loads(value)
if value:
return value
except Exception:
pass
return fallback if fallback is not None else {}
def _entry_plan(rec: dict) -> dict:
plan = rec.get("entry_plan")
if isinstance(plan, dict):
return plan
return _loads_json(rec.get("entry_plan_json"), {})
def _strategy_code_from_rec(rec: dict) -> str:
plan = _entry_plan(rec)
return normalize_strategy_code(rec.get("strategy_code") or plan.get("strategy_code"))
def _paper_cfg_for_rec(rec: dict, config: dict | None = None) -> dict:
cfg = dict(_paper_cfg(config) or {})
cfg.update(strategy_paper_config(_strategy_code_from_rec(rec)))
return cfg
def _strategy_lineage_from_rec(rec: dict) -> dict:
code = _strategy_code_from_rec(rec)
signal_id = _safe_int(rec.get("strategy_signal_id"))
snapshot = _loads_json(rec.get("strategy_snapshot_json"), {})
roles = _loads_json(rec.get("factor_roles_json"), {})
if not snapshot:
snapshot = {
"strategy_code": code,
"strategy_name": strategy_label(code),
"strategy_version": rec.get("strategy_version") or "",
"symbol": rec.get("symbol") or "",
"source": "recommendation_compat",
}
snapshot.setdefault("strategy_code", code)
snapshot.setdefault("strategy_name", strategy_label(code))
return {
"strategy_code": code,
"strategy_name": strategy_label(code),
"strategy_signal_id": signal_id,
"strategy_snapshot": snapshot,
"factor_roles": roles if isinstance(roles, dict) else {},
"strategy_snapshot_json": json.dumps(snapshot, ensure_ascii=False, default=str),
"factor_roles_json": json.dumps(roles if isinstance(roles, dict) else {}, ensure_ascii=False, default=str),
}
def _strategy_lineage_from_trade_or_order(item: dict) -> dict:
code = normalize_strategy_code(item.get("strategy_code"))
snapshot = _loads_json(item.get("strategy_snapshot_json"), {})
roles = _loads_json(item.get("factor_roles_json"), {})
return {
"strategy_code": code,
"strategy_name": strategy_label(code),
"strategy_signal_id": _safe_int(item.get("strategy_signal_id")),
"strategy_snapshot": snapshot if isinstance(snapshot, dict) else {},
"factor_roles": roles if isinstance(roles, dict) else {},
}
def _parse_time(value: str):
try:
return datetime.fromisoformat(str(value or ""))
except Exception:
return None
def _open_price(current_price: float, config: dict | None = None) -> float:
return round(current_price * (1 + default_slippage_pct(config) / 100), 12)
def _close_price(current_price: float, config: dict | None = None) -> float:
return round(current_price * (1 - default_slippage_pct(config) / 100), 12)
def _trade_pnl_pct(entry_price: float, current_price: float) -> float:
if entry_price <= 0 or current_price <= 0:
return 0.0
return round((current_price / entry_price - 1) * 100, 4)
def _stop_loss_distance_pct(side: str, entry_price: float, stop_loss: float) -> float:
if entry_price <= 0 or stop_loss <= 0:
return 0.0
if str(side or "long").lower() == "short":
return max(0.0, (stop_loss / entry_price - 1) * 100)
return max(0.0, (1 - stop_loss / entry_price) * 100)
def _stop_loss_leverage_risk_pct(side: str, entry_price: float, stop_loss: float, leverage: float) -> float:
return round(_stop_loss_distance_pct(side, entry_price, stop_loss) * max(1.0, _safe_float(leverage, 1.0)), 6)
def _risk_adjusted_leverage(side: str, entry_price: float, stop_loss: float, leverage: float, config: dict | None = None) -> tuple[float, dict]:
cfg = _paper_cfg(config)
base_leverage = max(1.0, _safe_float(leverage, 1.0))
max_sl_risk = max(0.0, _safe_float(cfg.get("max_stop_loss_leverage_risk_pct"), 0))
distance_pct = _stop_loss_distance_pct(side, entry_price, stop_loss)
original_risk = round(distance_pct * base_leverage, 6)
detail = {
"dynamic_leverage_enabled": bool(cfg.get("dynamic_leverage_enabled", True)),
"base_leverage": base_leverage,
"leverage": base_leverage,
"stop_loss_distance_pct": distance_pct,
"stop_loss_leverage_risk_pct": original_risk,
"max_stop_loss_leverage_risk_pct": max_sl_risk,
"adjusted": False,
}
if max_sl_risk <= 0 or distance_pct <= 0 or original_risk <= max_sl_risk + 1e-12:
return base_leverage, detail
if not bool(cfg.get("dynamic_leverage_enabled", True)):
return base_leverage, detail
allowed_leverage = max_sl_risk / distance_pct
min_leverage = max(1.0, _safe_float(cfg.get("dynamic_leverage_min"), 3.0))
detail["allowed_leverage"] = round(allowed_leverage, 6)
detail["min_dynamic_leverage"] = min_leverage
if allowed_leverage + 1e-12 < min_leverage:
return base_leverage, detail
adjusted = round(min(base_leverage, allowed_leverage), 4)
detail.update({
"leverage": adjusted,
"stop_loss_leverage_risk_pct": round(distance_pct * adjusted, 6),
"adjusted": adjusted < base_leverage,
})
return adjusted, detail
def _trade_rr(side: str, entry_price: float, stop_loss: float, tp1: float) -> float:
return _paper_order_rr(side, entry_price, stop_loss, tp1)
def _account_return_pct(pnl_usdt: float, account_equity: float | None = None, config: dict | None = None) -> float:
equity = max(1.0, _safe_float(account_equity, default_account_equity_usdt(config)))
return round(_safe_float(pnl_usdt) / equity * 100, 4)
def _margin_roi_pct(pnl_usdt: float, margin_usdt: float, config: dict | None = None) -> float:
margin = max(1.0, _safe_float(margin_usdt, default_margin_usdt(config)))
return round(_safe_float(pnl_usdt) / margin * 100, 4)
def _trade_margin(trade: dict, config: dict | None = None) -> float:
margin = _safe_float(trade.get("margin_usdt"))
if margin > 0:
return margin
leverage = max(1.0, _safe_float(trade.get("leverage"), default_leverage(config)))
return round(_safe_float(trade.get("notional_usdt")) / leverage, 8)
def _decorate_trade(trade: dict, config: dict | None = None) -> dict:
cfg = _paper_cfg(config)
item = dict(trade)
notional = _safe_float(item.get("notional_usdt"), default_notional_usdt(cfg))
leverage = max(1.0, _safe_float(item.get("leverage"), default_leverage(cfg)))
margin = _trade_margin({"margin_usdt": item.get("margin_usdt"), "notional_usdt": notional, "leverage": leverage}, cfg)
unrealized = round(notional * _safe_float(item.get("pnl_pct")) / 100, 8)
realized = _safe_float(item.get("realized_pnl_usdt"))
effective_pnl = realized if item.get("status") == "closed" else unrealized
item["notional_usdt"] = notional
item["leverage"] = leverage
item["margin_usdt"] = margin
item["unrealized_pnl_usdt"] = unrealized
item["margin_roi_pct"] = _margin_roi_pct(effective_pnl, margin, cfg)
item["account_return_pct"] = _account_return_pct(effective_pnl, config=cfg)
item["account_equity_usdt"] = default_account_equity_usdt(cfg)
latest_market = _safe_float(item.get("latest_market_price"))
item["latest_price"] = latest_market if latest_market > 0 else _safe_float(item.get("current_price"))
item["latest_price_updated_at"] = item.get("latest_market_price_updated_at") or item.get("updated_at") or ""
item.update(_strategy_lineage_from_trade_or_order(item))
return item
def _record_event(conn, trade_id: int, rec_id: int, symbol: str, event_type: str, price: float, pnl_pct: float, message: str, detail=None, event_time: str = ""):
detail = dict(detail or {})
strategy_code = str(detail.get("strategy_code") or "").strip()
strategy_signal_id = _safe_int(detail.get("strategy_signal_id"))
if (not strategy_code or strategy_signal_id <= 0) and _safe_int(trade_id) > 0:
try:
row = conn.execute(
"SELECT strategy_code, strategy_signal_id FROM paper_trades WHERE id=%s",
(_safe_int(trade_id),),
).fetchone()
if row:
strategy_code = strategy_code or row.get("strategy_code") or ""
strategy_signal_id = strategy_signal_id or _safe_int(row.get("strategy_signal_id"))
except Exception:
pass
strategy_code = normalize_strategy_code(strategy_code)
detail.setdefault("strategy_code", strategy_code)
detail.setdefault("strategy_name", strategy_label(strategy_code))
conn.execute(
"""
INSERT INTO paper_trade_events (
trade_id, recommendation_id, symbol, event_type, event_time,
price, pnl_pct, message, detail_json, strategy_code, strategy_signal_id
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""",
(
trade_id,
rec_id,
symbol,
event_type,
event_time or _now(),
price,
pnl_pct,
message,
json.dumps(detail, ensure_ascii=False, default=str),
strategy_code,
strategy_signal_id,
),
)
def _fmt_price(value) -> str:
price = _safe_float(value)
if price <= 0:
return "--"
return f"${price:.8g}"
def _fmt_pct(value) -> str:
pct = _safe_float(value)
sign = "+" if pct > 0 else ""
return f"{sign}{pct:.2f}%"
def _card_field(label: str, value) -> dict:
return {
"tag": "div",
"text": {"tag": "lark_md", "content": f"**{label}**\n{value}"},
}
def _card_note(content: str) -> dict:
return {"tag": "note", "elements": [{"tag": "plain_text", "content": content}]}
def _card_md(content: str) -> dict:
return {"tag": "div", "text": {"tag": "lark_md", "content": content}}
def _push_paper_card(event_type: str, symbol: str, title: str, template: str, fields: list[tuple[str, str]], note: str = "", event_time: str = "") -> None:
try:
elements = []
if fields:
elements.append({"tag": "column_set", "flex_mode": "none", "background_style": "default", "columns": [
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field(label, value)]}
for label, value in fields
]})
if note:
elements.append(_card_note(note))
if event_time:
elements.append(_card_note(f"时间: {event_time}"))
ok, result = push_card({
"metadata": {"source": "paper_trading", "event_type": event_type, "symbol": symbol},
"config": {"wide_screen_mode": True},
"header": {
"template": template,
"title": {"tag": "plain_text", "content": title},
},
"elements": elements,
})
if not ok:
record_system_error(
source="paper_trading",
level="warning",
error_type="FeishuPushFailed",
message=f"Feishu push failed for {event_type} {symbol}: {str(result)[:500]}",
status_code=0,
context={"event_type": event_type, "symbol": symbol, "push_result": result},
fingerprint=f"paper_trading_feishu_push_failed:{event_type}:{symbol}",
)
except Exception as exc:
record_system_error(
source="paper_trading",
level="warning",
error_type=exc.__class__.__name__,
message=f"Feishu push exception for {event_type} {symbol}: {str(exc)[:500]}",
status_code=0,
context={"event_type": event_type, "symbol": symbol},
fingerprint=f"paper_trading_feishu_push_exception:{event_type}:{symbol}",
)
def _push_custom_paper_card(card: dict) -> tuple[bool, object]:
try:
return push_card(card)
except Exception as exc:
return False, str(exc)
def _push_event_card(event_type: str, trade: dict, result: dict, event_time: str = "") -> None:
symbol = str(trade.get("symbol") or "")
short_symbol = symbol.replace("/USDT", "")
if event_type == "open":
_push_paper_card(
event_type,
symbol,
f"交易开仓 - {short_symbol}",
"blue",
[
("成交价", _fmt_price(result.get("entry_price"))),
("名义仓位", f"{_safe_float(result.get('notional_usdt')):.2f} USDT"),
("杠杆/保证金", f"{_safe_float(result.get('leverage')):.1f}x / {_safe_float(result.get('margin_usdt')):.2f} USDT"),
],
"策略信号已进入持仓跟踪。",
event_time,
)
return
if event_type == "close":
exit_reason = str(result.get("exit_reason") or "--")
title_prefix = "移动止盈成交平仓" if exit_reason == "trailing_stop" else "交易平仓"
_push_paper_card(
event_type,
symbol,
f"{title_prefix} - {short_symbol}",
"red" if _safe_float(result.get("pnl_usdt")) < 0 else "green",
[
("退出价", _fmt_price(result.get("exit_price"))),
("收益率", _fmt_pct(result.get("pnl_pct"))),
("收益额", f"{_safe_float(result.get('pnl_usdt')):.2f} USDT"),
("原因", exit_reason),
],
"收益以交易账本记录为准。",
event_time,
)
return
if event_type.startswith("trailing"):
_push_paper_card(
event_type,
symbol,
f"移动止盈{'启动' if event_type == 'trailing_activate' else '上移'} - {short_symbol}",
"yellow",
[
("保护价", _fmt_price(result.get("trailing_stop"))),
("当前收益", _fmt_pct(result.get("pnl_pct"))),
("动作", "启动保护" if event_type == "trailing_activate" else "上移保护价"),
],
"移动止盈用于锁定浮盈。",
event_time,
)
def _push_order_created_card(order: dict, event_time: str = "") -> None:
symbol = str(order.get("symbol") or "")
target = _safe_float(order.get("target_price"))
current = _safe_float(order.get("current_price_at_create"))
distance = round((current / target - 1) * 100, 2) if target and current else 0
_push_paper_card(
"paper_order_create",
symbol,
f"挂单创建 - {symbol.replace('/USDT', '')}",
"wathet",
[
("目标价", _fmt_price(target)),
("当前价", _fmt_price(current)),
("距目标", _fmt_pct(distance)),
("有效期", order.get("expires_at") or "--"),
],
"等回踩机会已进入挂单,触价后进入持仓。",
event_time,
)
def _push_order_filled_card(order: dict, result: dict, event_time: str = "") -> None:
symbol = str(order.get("symbol") or "")
_push_paper_card(
"paper_order_fill",
symbol,
f"挂单成交并开仓 - {symbol.replace('/USDT', '')}",
"green",
[
("挂单价", _fmt_price(order.get("target_price"))),
("成交价", _fmt_price(result.get("entry_price") or order.get("fill_price"))),
("名义仓位", f"{_safe_float(result.get('notional_usdt')):.2f} USDT"),
("来源", order.get("source_status") or "wait_pullback"),
],
"价格触达理想入场位,挂单已转为持仓。",
event_time,
)
def _open_trade(conn, rec: dict, current_price: float, event_time: str, config: dict | None = None, push_open_card: bool = True) -> dict:
cfg = _paper_cfg_for_rec(rec, config)
rec_id = _safe_int(rec.get("id"))
symbol = str(rec.get("symbol") or "").strip().upper()
plan = _entry_plan(rec)
entry_price = _open_price(current_price, cfg)
notional = default_notional_usdt(cfg)
side = str(plan.get("side") or rec.get("side") or "long").strip().lower() or "long"
leverage = default_leverage(cfg)
stop_loss = _safe_float(plan.get("stop_loss") or rec.get("stop_loss"))
tp1 = _safe_float(plan.get("tp1") or plan.get("take_profit_1") or rec.get("tp1"))
rec_score = _safe_float(rec.get("rec_score") or rec.get("score"))
if rec_score <= 0 and rec_id > 0:
row = conn.execute("SELECT rec_score FROM recommendation WHERE id=%s", (rec_id,)).fetchone()
rec_score = _safe_float(row["rec_score"] if row else 0)
if bool(cfg.get("entry_gate_enabled", True)):
calc_rr = _trade_rr(side, entry_price, stop_loss, tp1)
rr_candidates = [
_safe_float(plan.get("rr1")),
_safe_float(plan.get("rr1_live")),
calc_rr,
]
rr = max([x for x in rr_candidates if x > 0], default=0.0)
min_rr = max(0.0, _safe_float(cfg.get("entry_min_rr"), 0))
min_score = max(0.0, _safe_float(cfg.get("entry_min_rec_score"), 0))
adjusted_leverage, leverage_risk = _risk_adjusted_leverage(side, entry_price, stop_loss, leverage, cfg)
sl_risk = _safe_float(leverage_risk.get("stop_loss_leverage_risk_pct"))
max_sl_risk = _safe_float(leverage_risk.get("max_stop_loss_leverage_risk_pct"))
leverage = adjusted_leverage
entry_reasons = []
if rec_score < min_score:
entry_reasons.append("rec_score_below_min")
if rr <= 0:
entry_reasons.append("missing_rr")
elif rr < min_rr:
entry_reasons.append("rr_below_min")
if max_sl_risk > 0 and sl_risk > max_sl_risk:
entry_reasons.append("stop_loss_leverage_risk_exceeded")
if entry_reasons:
return {
"opened": False,
"skipped": True,
"reason": "entry_gate_rejected",
"gate_reasons": entry_reasons,
"gate_detail": {
"rec_score": rec_score,
"min_rec_score": min_score,
"rr1": round(rr, 4) if rr > 0 else 0,
"min_rr": min_rr,
"stop_loss_leverage_risk_pct": sl_risk,
"max_stop_loss_leverage_risk_pct": max_sl_risk,
"entry_price": entry_price,
"stop_loss": stop_loss,
"tp1": tp1,
"leverage": leverage,
"leverage_risk": leverage_risk,
},
}
global_ok, global_detail = _global_risk_entry_check(conn, rec, notional, cfg)
if not global_ok:
return {
"opened": False,
"skipped": True,
"reason": "global_risk_rejected",
"risk_detail": global_detail,
}
adjusted_notional = _market_risk_adjusted_notional(notional, global_detail, cfg)
if adjusted_notional != notional:
plan["market_position_sizing"] = {
"base_notional_usdt": notional,
"adjusted_notional_usdt": adjusted_notional,
"position_multiplier": global_detail.get("position_multiplier"),
"risk_level": global_detail.get("risk_level"),
"decision": global_detail.get("decision"),
}
notional = adjusted_notional
if "leverage_risk" in locals() and leverage_risk.get("adjusted"):
plan["leverage_sizing"] = leverage_risk
pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, notional, event_time, cfg)
if not pause_ok:
return {"opened": False, "skipped": True, "reason": pause_reason, "risk_detail": pause_detail}
leverage_ok, leverage_detail = _cumulative_leverage_check(conn, notional, cfg, exclude_rec_id=rec_id)
if not leverage_ok:
return {
"opened": False,
"skipped": True,
"reason": "cumulative_leverage_exceeded",
"risk_detail": leverage_detail,
}
margin = round(notional / leverage, 8) if leverage > 0 else default_margin_usdt(cfg)
qty = round(notional / entry_price, 12) if entry_price > 0 else 0
tp2 = _safe_float(rec.get("tp2") or plan.get("tp2") or plan.get("take_profit_2"))
fee = round(notional * default_fee_rate(cfg), 8)
now = event_time or _now()
lineage = _strategy_lineage_from_rec(rec)
row = conn.execute(
"""
INSERT INTO paper_trades (
recommendation_id, symbol, side, status, opened_at,
entry_price, qty, notional_usdt, margin_usdt, leverage, stop_loss, tp1, tp2,
max_price, min_price, current_price, pnl_pct, fee_usdt,
source_status, source_action, strategy_version, strategy_code, strategy_signal_id,
strategy_snapshot_json, factor_roles_json, created_at, updated_at
) VALUES (%s,%s,'long','open',%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,0,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT(recommendation_id) DO NOTHING
RETURNING id
""",
(
rec_id,
symbol,
now,
entry_price,
qty,
notional,
margin,
leverage,
stop_loss,
tp1,
tp2,
entry_price,
entry_price,
entry_price,
fee,
rec.get("execution_status") or "",
rec.get("action_status") or "",
rec.get("strategy_version") or "",
lineage["strategy_code"],
lineage["strategy_signal_id"],
lineage["strategy_snapshot_json"],
lineage["factor_roles_json"],
now,
now,
),
).fetchone()
if not row:
return {"opened": False, "reason": "already_exists"}
trade_id = row["id"]
_record_event(
conn,
trade_id,
rec_id,
symbol,
"open",
entry_price,
0.0,
"交易开仓:策略信号已进入持仓跟踪",
{
"notional_usdt": notional,
"margin_usdt": margin,
"leverage": leverage,
"qty": qty,
"fee_usdt": fee,
"slippage_pct": default_slippage_pct(cfg),
"source_status": rec.get("execution_status") or "",
"source_action": rec.get("action_status") or "",
"strategy_code": lineage["strategy_code"],
"strategy_name": lineage["strategy_name"],
"strategy_signal_id": lineage["strategy_signal_id"],
"strategy_snapshot": lineage["strategy_snapshot"],
"factor_roles": lineage["factor_roles"],
"market_regime": global_detail.get("market_regime") or _entry_plan(rec).get("market_regime") or {},
"global_risk": global_detail,
"score_components": _entry_plan(rec).get("score_components") or {},
"decision_log": _entry_plan(rec).get("decision_log") or {},
},
now,
)
result = {
"opened": True,
"trade_id": trade_id,
"entry_price": entry_price,
"qty": qty,
"notional_usdt": notional,
"margin_usdt": margin,
"leverage": leverage,
"global_risk": global_detail,
"market_regime": global_detail.get("market_regime") or _entry_plan(rec).get("market_regime") or {},
}
if push_open_card:
_push_event_card("open", {"symbol": symbol}, result, now)
return result
def _is_wait_pullback(rec: dict) -> bool:
plan = _entry_plan(rec)
execution_status = str(rec.get("execution_status") or "").strip()
action_status = str(rec.get("action_status") or "").strip()
entry_action = str(plan.get("entry_action") or "").strip()
return execution_status == "wait_pullback" or action_status == "等回踩" or entry_action == "等回踩"
def _paper_order_target_price(rec: dict) -> float:
plan = _entry_plan(rec)
return _safe_float(
plan.get("limit_price")
or plan.get("order_price")
or plan.get("entry_price")
or rec.get("entry_price")
)
def _paper_order_expires_at(event_time: str, config: dict | None = None) -> str:
cfg = _paper_cfg(config)
hours = max(0.25, _safe_float(cfg.get("order_expire_hours"), 24.0))
base = _parse_time(event_time) or datetime.now()
return (base + timedelta(hours=hours)).isoformat()
def _paper_order_touched(order: dict, current_price: float) -> bool:
side = str(order.get("side") or "long").lower()
target = _safe_float(order.get("target_price"))
if target <= 0 or current_price <= 0:
return False
if side == "short":
return current_price >= target
return current_price <= target
def _paper_order_too_far(order: dict, current_price: float, config: dict | None = None) -> bool:
cfg = _paper_cfg(config)
threshold_pct = max(0.0, _safe_float(cfg.get("order_cancel_far_from_entry_pct"), 12.0))
if threshold_pct <= 0:
return False
side = str(order.get("side") or "long").lower()
target = _safe_float(order.get("target_price"))
if target <= 0 or current_price <= 0:
return False
if side == "short":
return current_price < target * (1 - threshold_pct / 100)
return current_price > target * (1 + threshold_pct / 100)
def _paper_order_rr(side: str, target: float, stop_loss: float, tp1: float) -> float:
if side == "short":
risk = stop_loss - target
reward = target - tp1
else:
risk = target - stop_loss
reward = tp1 - target
if risk <= 0 or reward <= 0:
return 0.0
return reward / risk
def _paper_order_distance_pct(side: str, current_price: float, target: float) -> float:
if target <= 0 or current_price <= 0:
return 999.0
if side == "short":
return max(0.0, (target / current_price - 1) * 100)
return max(0.0, (current_price / target - 1) * 100)
def _paper_order_gate(rec: dict, current_price: float, config: dict | None = None, conn=None) -> tuple[bool, list[str], dict]:
cfg = _paper_cfg_for_rec(rec, config)
if not bool(cfg.get("order_gate_enabled", True)):
return True, [], {"gate_enabled": False}
plan = _entry_plan(rec)
side = str(plan.get("side") or rec.get("side") or "long").strip().lower() or "long"
target = _paper_order_target_price(rec)
stop_loss = _safe_float(plan.get("stop_loss") or rec.get("stop_loss"))
tp1 = _safe_float(plan.get("tp1") or plan.get("take_profit_1") or rec.get("tp1"))
rr = _safe_float(plan.get("rr1") or plan.get("rr1_live"))
calc_rr = _paper_order_rr(side, target, stop_loss, tp1)
# Wait-pullback orders must be judged at the intended limit price, not at
# the stale confirmation price. A buy-now RR can be invalid while the
# pullback target is perfectly tradeable.
effective_rr = max(rr, calc_rr)
min_rr = max(0.0, _safe_float(cfg.get("order_min_rr"), 1.2))
min_rec_score = max(0.0, _safe_float(cfg.get("order_min_rec_score"), 20.0))
min_distance = max(0.0, _safe_float(cfg.get("order_min_distance_to_entry_pct"), 0.0))
rec_score = _safe_float(rec.get("rec_score") or rec.get("score"))
if rec_score <= 0 and conn is not None and _safe_int(rec.get("id")) > 0:
row = conn.execute("SELECT rec_score FROM recommendation WHERE id=%s", (_safe_int(rec.get("id")),)).fetchone()
rec_score = _safe_float(row["rec_score"] if row else 0)
leverage_ok = True
leverage_detail = {}
if conn is not None:
leverage_ok, leverage_detail = _cumulative_leverage_check(
conn,
default_notional_usdt(cfg),
cfg,
exclude_rec_id=_safe_int(rec.get("id")),
)
distance_pct = _paper_order_distance_pct(side, current_price, target)
max_distance = max(0.0, _safe_float(cfg.get("order_max_distance_to_entry_pct"), 8.0))
opportunity_level = str(plan.get("opportunity_level") or rec.get("opportunity_level") or "").strip()
level_max_action = str(plan.get("max_action") or "").strip()
risk_reward_ok = plan.get("risk_reward_ok")
trigger_ok = plan.get("entry_trigger_confirmed") is True or _safe_int(rec.get("entry_triggered")) == 1
reasons = []
if target <= 0:
reasons.append("missing_target_price")
if stop_loss <= 0:
reasons.append("missing_stop_loss")
if tp1 <= 0:
reasons.append("missing_tp1")
if target > 0 and stop_loss > 0 and tp1 > 0 and calc_rr <= 0:
reasons.append("invalid_risk_geometry")
target_rr_confirms = calc_rr + 1e-9 >= min_rr
if risk_reward_ok is False and not target_rr_confirms:
reasons.append("risk_reward_rejected")
if bool(cfg.get("order_require_risk_reward_ok", True)) and risk_reward_ok is not True and not (
risk_reward_ok is False and target_rr_confirms
):
reasons.append("risk_reward_not_confirmed")
if rec_score < min_rec_score:
reasons.append("rec_score_below_min")
if not leverage_ok:
reasons.append("cumulative_leverage_exceeded")
if effective_rr > 0 and effective_rr < min_rr:
reasons.append("rr_below_min")
if effective_rr <= 0:
reasons.append("missing_rr")
if distance_pct < min_distance:
reasons.append("too_close_to_entry")
if distance_pct > max_distance:
reasons.append("too_far_from_entry")
if opportunity_level in {"momentum_watch", "theme_trend"} or level_max_action == "observe":
reasons.append("observe_only_opportunity")
if bool(cfg.get("order_require_current_trigger", False)) and not trigger_ok:
reasons.append("missing_current_trigger")
return not reasons, reasons, {
"target_price": target,
"stop_loss": stop_loss,
"tp1": tp1,
"rr1": round(effective_rr, 4) if effective_rr > 0 else 0,
"calc_rr1": round(calc_rr, 4) if calc_rr > 0 else 0,
"distance_to_entry_pct": round(distance_pct, 4),
"min_distance_to_entry_pct": min_distance,
"max_distance_to_entry_pct": max_distance,
"min_rr": min_rr,
"rec_score": rec_score,
"min_rec_score": min_rec_score,
"leverage": leverage_detail,
"opportunity_level": opportunity_level,
"entry_trigger_confirmed": trigger_ok,
}
def _cancel_paper_order(conn, order: dict, reason: str, event_time: str) -> dict:
conn.execute(
"""
UPDATE paper_orders
SET status='canceled', cancel_reason=%s, canceled_at=%s, updated_at=%s
WHERE id=%s AND status='pending'
""",
(reason, event_time, event_time, order["id"]),
)
return {"skipped": True, "reason": f"paper_order_{reason}", "paper_order_id": order["id"]}
def _order_recommendation_cancel_reason(conn, rec: dict, order: dict) -> str:
rec_id = _safe_int(rec.get("id") or order.get("recommendation_id"))
if rec_id <= 0:
return ""
row = conn.execute(
"""
SELECT status, execution_status, lifecycle_state, display_bucket
FROM recommendation
WHERE id=%s
""",
(rec_id,),
).fetchone()
if not row:
return "recommendation_missing"
row = dict(row)
status = str(row.get("status") or "").strip().lower()
execution_status = str(row.get("execution_status") or "").strip().lower()
lifecycle_state = str(row.get("lifecycle_state") or "").strip().lower()
display_bucket = str(row.get("display_bucket") or "").strip().lower()
if status in {"expired", "invalid", "archived", "stopped_out", "closed"}:
return "recommendation_invalid"
if execution_status in {"invalid", "expired", "archived", "stopped_out"}:
return "recommendation_invalid"
if lifecycle_state in {"invalid", "expired", "archived", "stopped_out"}:
return "recommendation_invalid"
if display_bucket in {"history", "archive", "archived"}:
return "recommendation_invalid"
return ""
def _order_payload_from_rec(rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
cfg = _paper_cfg_for_rec(rec, config)
plan = _entry_plan(rec)
lineage = _strategy_lineage_from_rec(rec)
return {
"recommendation_id": _safe_int(rec.get("id")),
"symbol": str(rec.get("symbol") or "").strip().upper(),
"side": str(plan.get("side") or rec.get("side") or "long").strip().lower() or "long",
"order_type": "limit",
"status": "pending",
"source_status": str(rec.get("execution_status") or ""),
"source_action": str(rec.get("action_status") or plan.get("entry_action") or ""),
"target_price": _paper_order_target_price(rec),
"current_price_at_create": current_price,
"notional_usdt": default_notional_usdt(cfg),
"stop_loss": _safe_float(plan.get("stop_loss") or rec.get("stop_loss")),
"tp1": _safe_float(plan.get("tp1") or plan.get("take_profit_1") or rec.get("tp1")),
"tp2": _safe_float(plan.get("tp2") or plan.get("take_profit_2") or rec.get("tp2")),
"strategy_version": str(rec.get("strategy_version") or ""),
"strategy_code": lineage["strategy_code"],
"strategy_name": lineage["strategy_name"],
"strategy_signal_id": lineage["strategy_signal_id"],
"strategy_snapshot_json": lineage["strategy_snapshot_json"],
"factor_roles_json": lineage["factor_roles_json"],
"entry_plan_snapshot_json": json.dumps(plan, ensure_ascii=False, default=str),
"created_at": event_time,
"updated_at": event_time,
"expires_at": _paper_order_expires_at(event_time, cfg),
}
def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
fill_price = _safe_float(order.get("target_price")) or current_price
cfg = _paper_cfg_for_rec(rec, config)
stop_loss = _safe_float(order.get("stop_loss") or _entry_plan(rec).get("stop_loss") or rec.get("stop_loss"))
base_notional = _safe_float(order.get("notional_usdt"), default_notional_usdt(cfg))
global_ok, global_detail = _global_risk_entry_check(conn, rec, base_notional, cfg)
if not global_ok:
# Market/account risk is a temporary execution gate, not proof that the
# original limit order is invalid. Keep the order pending so it can fill
# later if risk improves, instead of instantly wiping every touched order.
conn.execute("UPDATE paper_orders SET updated_at=%s WHERE id=%s", (event_time, order["id"]))
return {
"skipped": True,
"reason": "paper_order_risk_paused",
"paper_order_id": order.get("id"),
"target_price": order.get("target_price"),
"current_price": current_price,
"risk_detail": global_detail,
}
adjusted_notional = _market_risk_adjusted_notional(base_notional, global_detail, cfg)
pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, adjusted_notional, event_time, cfg)
if not pause_ok:
return _cancel_paper_order(conn, order, pause_reason, event_time)
trade_rec = dict(rec)
plan = _entry_plan(trade_rec)
plan.setdefault("entry_price", fill_price)
if adjusted_notional != base_notional:
plan["market_position_sizing"] = {
"base_notional_usdt": base_notional,
"adjusted_notional_usdt": adjusted_notional,
"position_multiplier": global_detail.get("position_multiplier"),
"risk_level": global_detail.get("risk_level"),
"decision": global_detail.get("decision"),
}
trade_rec["entry_plan"] = plan
trade_rec["entry_price"] = fill_price
# Filled limit orders should keep the notional decided when the order was
# created; risk sizing is still applied once inside _open_trade.
fill_cfg = {**cfg, "trade_notional_usdt": base_notional}
result = _open_trade(conn, trade_rec, fill_price, event_time, config=fill_cfg, push_open_card=False)
if result.get("opened"):
order = {**order, "fill_price": fill_price}
conn.execute(
"""
UPDATE paper_orders
SET status='filled', fill_price=%s, filled_at=%s, updated_at=%s
WHERE id=%s
""",
(fill_price, event_time, event_time, order["id"]),
)
result["paper_order"] = {"filled": True, "order_id": order["id"], "fill_price": fill_price}
_push_order_filled_card(order, result, event_time)
stop_loss = _safe_float(rec.get("stop_loss") or _entry_plan(rec).get("stop_loss") or order.get("stop_loss"))
if stop_loss > 0 and current_price <= stop_loss:
trade = conn.execute("SELECT * FROM paper_trades WHERE id=%s", (result["trade_id"],)).fetchone()
if trade:
close_result = _close_trade(conn, dict(trade), current_price, "stop_loss_same_tick", event_time)
result.update({
"closed": True,
"exit_reason": close_result.get("exit_reason"),
"pnl_pct": close_result.get("pnl_pct"),
"pnl_usdt": close_result.get("pnl_usdt"),
"same_tick_stop_loss": True,
})
return result
if result.get("reason") == "already_exists":
conn.execute(
"""
UPDATE paper_orders
SET status='filled', fill_price=%s, filled_at=%s, updated_at=%s
WHERE id=%s
""",
(fill_price, event_time, event_time, order["id"]),
)
result["paper_order"] = {"filled": False, "order_id": order["id"], "fill_price": fill_price}
return result
def _sync_wait_pullback_order(conn, rec: dict, current_price: float, event_time: str, config: dict | None = None) -> dict:
cfg = _paper_cfg_for_rec(rec, config)
rec_id = _safe_int(rec.get("id"))
order = conn.execute("SELECT * FROM paper_orders WHERE recommendation_id=%s", (rec_id,)).fetchone()
if order:
order = dict(order)
if order.get("status") != "pending":
return {"skipped": True, "reason": f"paper_order_{order.get('status')}", "paper_order_id": order.get("id")}
cancel_reason = _order_recommendation_cancel_reason(conn, rec, order)
if cancel_reason:
return _cancel_paper_order(conn, order, cancel_reason, event_time)
if _paper_order_touched(order, current_price):
return _fill_paper_order(conn, order, rec, current_price, event_time, cfg)
expires_at = _parse_time(order.get("expires_at"))
now = _parse_time(event_time) or datetime.now()
if expires_at and now > expires_at:
conn.execute(
"""
UPDATE paper_orders
SET status='expired', cancel_reason='expired', canceled_at=%s, updated_at=%s
WHERE id=%s
""",
(event_time, event_time, order["id"]),
)
return {"skipped": True, "reason": "paper_order_expired", "paper_order_id": order["id"]}
if _paper_order_too_far(order, current_price, cfg):
return _cancel_paper_order(conn, order, "too_far_from_entry", event_time)
conn.execute("UPDATE paper_orders SET updated_at=%s WHERE id=%s", (event_time, order["id"]))
return {
"skipped": True,
"reason": "paper_order_pending",
"paper_order_id": order["id"],
"target_price": order.get("target_price"),
"current_price": current_price,
}
gate_ok, gate_reasons, gate_detail = _paper_order_gate(rec, current_price, cfg, conn=conn)
if not gate_ok:
return {
"skipped": True,
"reason": "paper_order_gate_rejected",
"gate_reasons": gate_reasons,
"gate_detail": gate_detail,
"target_price": gate_detail.get("target_price"),
"current_price": current_price,
}
payload = _order_payload_from_rec(rec, current_price, event_time, cfg)
if payload["recommendation_id"] <= 0 or not payload["symbol"] or payload["target_price"] <= 0:
return {"skipped": True, "reason": "invalid_paper_order"}
row = conn.execute(
"""
INSERT INTO paper_orders (
recommendation_id, symbol, side, order_type, status,
source_status, source_action, target_price, current_price_at_create,
notional_usdt, stop_loss, tp1, tp2, strategy_version, strategy_code,
strategy_signal_id, strategy_snapshot_json, factor_roles_json,
entry_plan_snapshot_json, created_at, updated_at, expires_at
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT(recommendation_id) DO NOTHING
RETURNING id
""",
(
payload["recommendation_id"],
payload["symbol"],
payload["side"],
payload["order_type"],
payload["status"],
payload["source_status"],
payload["source_action"],
payload["target_price"],
payload["current_price_at_create"],
payload["notional_usdt"],
payload["stop_loss"],
payload["tp1"],
payload["tp2"],
payload["strategy_version"],
payload["strategy_code"],
payload["strategy_signal_id"],
payload["strategy_snapshot_json"],
payload["factor_roles_json"],
payload["entry_plan_snapshot_json"],
payload["created_at"],
payload["updated_at"],
payload["expires_at"],
),
).fetchone()
order_id = row["id"] if row else None
order = {"id": order_id, **payload}
if _paper_order_touched(order, current_price):
return _fill_paper_order(conn, order, rec, current_price, event_time, cfg)
cancel_reason = _order_recommendation_cancel_reason(conn, rec, order)
if cancel_reason:
return _cancel_paper_order(conn, order, cancel_reason, event_time)
if _paper_order_too_far(order, current_price, cfg):
return _cancel_paper_order(conn, order, "too_far_from_entry", event_time)
result = {
"skipped": True,
"reason": "paper_order_created",
"paper_order_id": order_id,
"target_price": payload["target_price"],
"current_price": current_price,
"gate_detail": gate_detail,
}
_push_order_created_card(order, event_time)
return result
def _close_trade(conn, trade: dict, current_price: float, reason: str, event_time: str) -> dict:
entry_price = _safe_float(trade.get("entry_price"))
exit_price = _close_price(current_price)
pnl_pct = _trade_pnl_pct(entry_price, exit_price)
notional = _safe_float(trade.get("notional_usdt"))
open_fee = _safe_float(trade.get("fee_usdt"))
close_fee = round(notional * default_fee_rate(), 8)
total_fee = round(open_fee + close_fee, 8)
pnl_usdt = round(notional * pnl_pct / 100 - total_fee, 8)
now = event_time or _now()
conn.execute(
"""
UPDATE paper_trades
SET status='closed',
closed_at=%s,
exit_price=%s,
current_price=%s,
pnl_pct=%s,
realized_pnl_pct=%s,
realized_pnl_usdt=%s,
fee_usdt=%s,
exit_reason=%s,
updated_at=%s
WHERE id=%s AND status='open'
""",
(
now,
exit_price,
exit_price,
pnl_pct,
pnl_pct,
pnl_usdt,
total_fee,
reason,
now,
trade["id"],
),
)
_record_event(
conn,
trade["id"],
trade["recommendation_id"],
trade["symbol"],
"close",
exit_price,
pnl_pct,
f"交易平仓:{reason}",
{"realized_pnl_usdt": pnl_usdt, "fee_usdt": total_fee},
now,
)
_push_event_card(
"close",
trade,
{"exit_price": exit_price, "exit_reason": reason, "pnl_pct": pnl_pct, "pnl_usdt": pnl_usdt},
now,
)
return {"closed": True, "trade_id": trade["id"], "exit_reason": reason, "pnl_pct": pnl_pct, "pnl_usdt": pnl_usdt}
def _update_trailing_stop(conn, trade: dict, current_price: float, pnl_pct: float, event_time: str) -> tuple[float, dict]:
cfg = _trailing_config()
current_trail = _safe_float(trade.get("trailing_stop"))
if not cfg.get("enabled"):
return current_trail, {"activated": False, "moved": False}
entry_price = _safe_float(trade.get("entry_price"))
if entry_price <= 0 or current_price <= 0:
return current_trail, {"activated": False, "moved": False}
profile = _dynamic_trailing_profile(trade, current_price, pnl_pct, cfg)
activate_pnl_pct = _safe_float(profile.get("activate_pnl_pct"), cfg.get("activate_pnl_pct"))
if pnl_pct < activate_pnl_pct:
return current_trail, {
"activated": False,
"moved": False,
"trailing_mode": profile.get("mode"),
"volatility_pct": profile.get("volatility_pct"),
"activate_pnl_pct": activate_pnl_pct,
}
distance_pct = _safe_float(profile.get("distance_pct"), cfg.get("distance_pct"))
tier_label = str(profile.get("tier_label") or "")
protection_floor = entry_price * (1 + _safe_float(cfg.get("min_lock_profit_pct")) / 100)
candidate = current_price * (1 - distance_pct / 100)
new_trail = round(max(current_trail, protection_floor, candidate), 12)
activated = current_trail <= 0 and new_trail > 0
moved = current_trail > 0 and new_trail > current_trail + 1e-12
if not activated and not moved:
return current_trail, {"activated": False, "moved": False}
event_type = "trailing_activate" if activated else "trailing_move"
action_text = "激活" if activated else "上移"
should_emit = activated or _should_emit_trailing_move(conn, trade, new_trail, event_time, cfg)
if should_emit:
message = f"移动止盈{action_text}:保护价 {new_trail:.8g}"
_record_event(
conn,
trade["id"],
trade["recommendation_id"],
trade["symbol"],
event_type,
new_trail,
pnl_pct,
message,
{
"current_price": current_price,
"previous_trailing_stop": current_trail,
"trailing_stop": new_trail,
"activate_pnl_pct": activate_pnl_pct,
"distance_pct": distance_pct,
"tier_label": tier_label,
"trailing_mode": profile.get("mode"),
"volatility_pct": profile.get("volatility_pct"),
"base_activate_pnl_pct": profile.get("base_activate_pnl_pct"),
"base_distance_pct": profile.get("base_distance_pct"),
"min_lock_profit_pct": cfg.get("min_lock_profit_pct"),
"notification_throttled": False,
},
event_time,
)
_push_event_card(event_type, trade, {"trailing_stop": new_trail, "pnl_pct": pnl_pct}, event_time)
return new_trail, {
"activated": activated,
"moved": moved,
"trailing_stop": new_trail,
"previous_trailing_stop": current_trail,
"distance_pct": distance_pct,
"activate_pnl_pct": activate_pnl_pct,
"tier_label": tier_label,
"trailing_mode": profile.get("mode"),
"volatility_pct": profile.get("volatility_pct"),
"notification_emitted": should_emit,
}
def _update_open_trade(conn, trade: dict, current_price: float, event_time: str) -> dict:
entry_price = _safe_float(trade.get("entry_price"))
old_max = _safe_float(trade.get("max_price")) or entry_price
old_min = _safe_float(trade.get("min_price")) or entry_price
new_max = max(old_max, current_price)
new_min = min(old_min, current_price)
pnl_pct = _trade_pnl_pct(entry_price, current_price)
stop_loss = _safe_float(trade.get("stop_loss"))
trailing_stop = _safe_float(trade.get("trailing_stop"))
tp2 = _safe_float(trade.get("tp2"))
tp1 = _safe_float(trade.get("tp1"))
reason = ""
if stop_loss > 0 and current_price <= stop_loss:
reason = "stop_loss"
elif trailing_stop > 0 and current_price <= trailing_stop:
reason = "trailing_stop"
elif tp2 > 0 and current_price >= tp2:
reason = "tp2"
elif tp1 > 0 and current_price >= tp1:
reason = "tp1"
if reason:
return _close_trade(conn, trade, current_price, reason, event_time)
trailing_stop, trailing_result = _update_trailing_stop(conn, trade, current_price, pnl_pct, event_time or _now())
conn.execute(
"""
UPDATE paper_trades
SET current_price=%s,
max_price=%s,
min_price=%s,
trailing_stop=%s,
pnl_pct=%s,
updated_at=%s
WHERE id=%s AND status='open'
""",
(current_price, new_max, new_min, trailing_stop, pnl_pct, event_time or _now(), trade["id"]),
)
return {"updated": True, "trade_id": trade["id"], "pnl_pct": pnl_pct, **trailing_result}
def sync_recommendation(rec: dict, current_price: float, event_time: str = "") -> dict:
"""Open/update paper trade for one recommendation.
This is intentionally independent from recommendation PnL fields. A
recommendation can be a signal; only this ledger represents simulated
execution.
"""
if not paper_trading_enabled():
return {"enabled": False, "skipped": True, "reason": "disabled"}
rec_id = _safe_int(rec.get("id"))
symbol = str(rec.get("symbol") or "").strip().upper()
current_price = _safe_float(current_price)
if rec_id <= 0 or not symbol or current_price <= 0:
return {"enabled": True, "skipped": True, "reason": "invalid_input"}
execution_status = str(rec.get("execution_status") or "").strip()
action_status = str(rec.get("action_status") or "").strip()
event_time = event_time or _now()
cfg = paper_trading_config()
conn = get_conn()
try:
trade = conn.execute("SELECT * FROM paper_trades WHERE recommendation_id=%s", (rec_id,)).fetchone()
if trade:
trade = dict(trade)
if trade.get("status") == "open":
conn.execute(
"""
UPDATE paper_orders
SET status='filled',
fill_price=%s,
filled_at=COALESCE(NULLIF(filled_at,''), %s),
updated_at=%s
WHERE recommendation_id=%s AND status='pending'
""",
(
_safe_float(trade.get("entry_price")),
trade.get("opened_at") or event_time,
event_time,
rec_id,
),
)
result = _update_open_trade(conn, trade, current_price, event_time)
conn.commit()
return result
return {"skipped": True, "reason": "already_closed", "trade_id": trade.get("id")}
if execution_status != "buy_now" and action_status != "可即刻买入":
if _is_wait_pullback(rec):
result = _sync_wait_pullback_order(conn, rec, current_price, event_time, cfg)
conn.commit()
return result
return {"skipped": True, "reason": "not_buy_now"}
pending_order = conn.execute(
"SELECT * FROM paper_orders WHERE recommendation_id=%s AND status='pending'",
(rec_id,),
).fetchone()
if pending_order:
conn.execute(
"""
UPDATE paper_orders
SET status='canceled',
cancel_reason='upgraded_to_buy_now',
canceled_at=%s,
updated_at=%s
WHERE recommendation_id=%s AND status='pending'
""",
(event_time, event_time, rec_id),
)
result = _open_trade(conn, rec, current_price, event_time, config=cfg)
if pending_order:
result["paper_order"] = {
"canceled": True,
"order_id": pending_order["id"],
"cancel_reason": "upgraded_to_buy_now",
}
conn.commit()
return result
except Exception:
conn.rollback()
raise
finally:
try:
conn.close()
except Exception:
pass
def sync_pending_paper_orders(limit: int = 100, event_time: str = "", config: dict | None = None) -> dict:
"""Reconcile pending limit orders against the latest shared price cache.
The strategy runner can miss an existing order if the recommendation is
later derived back into observe status. Pending orders are executable
state, so they need their own reconciliation pass based on the same
latest_price_cache used by the Web page.
"""
if not paper_trading_enabled():
return {"enabled": False, "processed_count": 0, "results": []}
limit = max(1, min(_safe_int(limit, 100), 500))
event_time = event_time or _now()
cfg = _paper_cfg(config)
conn = get_conn()
results = []
try:
rows = conn.execute(
"""
SELECT
po.*,
r.status AS rec_status,
r.rec_state,
r.rec_score,
r.action_status,
r.execution_status,
r.lifecycle_state,
r.display_bucket,
r.entry_price,
r.current_price AS rec_current_price,
r.stop_loss AS rec_stop_loss,
r.tp1 AS rec_tp1,
r.tp2 AS rec_tp2,
r.strategy_version AS rec_strategy_version,
r.strategy_code AS rec_strategy_code,
r.strategy_signal_id AS rec_strategy_signal_id,
r.strategy_snapshot_json AS rec_strategy_snapshot_json,
r.factor_roles_json AS rec_factor_roles_json,
r.entry_plan_json,
r.market_context_json,
r.derivatives_context_json,
r.sector_context_json,
lpc.price AS latest_price,
lpc.updated_at AS latest_price_updated_at
FROM paper_orders po
LEFT JOIN recommendation r ON r.id = po.recommendation_id
LEFT JOIN latest_price_cache lpc ON lpc.symbol = po.symbol
WHERE po.status='pending'
ORDER BY po.created_at ASC, po.id ASC
LIMIT %s
""",
(limit,),
).fetchall()
for row in rows:
item = dict(row)
current_price = _safe_float(item.get("latest_price"))
order = {k: item.get(k) for k in [
"id",
"recommendation_id",
"symbol",
"side",
"order_type",
"status",
"source_status",
"source_action",
"target_price",
"current_price_at_create",
"fill_price",
"notional_usdt",
"stop_loss",
"tp1",
"tp2",
"strategy_version",
"strategy_code",
"strategy_signal_id",
"strategy_snapshot_json",
"factor_roles_json",
"entry_plan_snapshot_json",
"created_at",
"updated_at",
"expires_at",
"filled_at",
"canceled_at",
"cancel_reason",
]}
rec = {
"id": item.get("recommendation_id"),
"symbol": item.get("symbol"),
"status": item.get("rec_status") or "active",
"rec_state": item.get("rec_state"),
"rec_score": item.get("rec_score"),
"action_status": item.get("action_status") or item.get("source_action"),
"execution_status": item.get("execution_status") or item.get("source_status"),
"lifecycle_state": item.get("lifecycle_state"),
"display_bucket": item.get("display_bucket"),
"entry_price": item.get("entry_price") or item.get("target_price"),
"current_price": item.get("rec_current_price") or current_price,
"stop_loss": item.get("rec_stop_loss") or item.get("stop_loss"),
"tp1": item.get("rec_tp1") or item.get("tp1"),
"tp2": item.get("rec_tp2") or item.get("tp2"),
"strategy_version": item.get("rec_strategy_version") or item.get("strategy_version"),
"strategy_code": item.get("rec_strategy_code") or item.get("strategy_code"),
"strategy_signal_id": item.get("rec_strategy_signal_id") or item.get("strategy_signal_id"),
"strategy_snapshot_json": item.get("rec_strategy_snapshot_json") or item.get("strategy_snapshot_json"),
"factor_roles_json": item.get("rec_factor_roles_json") or item.get("factor_roles_json"),
"entry_plan_json": item.get("entry_plan_json") or item.get("entry_plan_snapshot_json"),
"market_context_json": item.get("market_context_json"),
"derivatives_context_json": item.get("derivatives_context_json"),
"sector_context_json": item.get("sector_context_json"),
}
if current_price <= 0:
result = {
"skipped": True,
"reason": "missing_latest_price",
"paper_order_id": order.get("id"),
"symbol": order.get("symbol"),
"target_price": order.get("target_price"),
}
else:
result = _sync_wait_pullback_order(conn, rec, current_price, event_time, cfg)
result.update({
"symbol": order.get("symbol"),
"latest_price": current_price,
"latest_price_updated_at": item.get("latest_price_updated_at") or "",
})
results.append(result)
conn.commit()
except Exception:
conn.rollback()
raise
finally:
try:
conn.close()
except Exception:
pass
return {
"enabled": True,
"processed_count": len(results),
"filled_count": sum(1 for r in results if r.get("paper_order", {}).get("filled")),
"canceled_count": sum(1 for r in results if str(r.get("reason") or "").startswith("paper_order_") and "canceled" in str(r.get("reason") or "")),
"results": results,
"run_time": event_time,
}
def get_paper_trading_summary(days: int = 30) -> dict:
days = max(1, min(_safe_int(days, 30), 365))
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
conn = get_conn()
try:
rows = conn.execute(
"""
SELECT * FROM paper_trades
WHERE opened_at >= %s
ORDER BY opened_at DESC, id DESC
""",
(cutoff,),
).fetchall()
pending_order_count = conn.execute("SELECT COUNT(*) FROM paper_orders WHERE status='pending'").fetchone()[0]
finally:
conn.close()
cfg = paper_trading_config()
items = [_decorate_trade(dict(r), cfg) for r in rows]
open_items = [x for x in items if x.get("status") == "open"]
closed_items = [x for x in items if x.get("status") == "closed"]
wins = [x for x in closed_items if _safe_float(x.get("realized_pnl_pct")) > 0]
losses = [x for x in closed_items if _safe_float(x.get("realized_pnl_pct")) <= 0]
total_realized = round(sum(_safe_float(x.get("realized_pnl_usdt")) for x in closed_items), 4)
avg_realized_pct = round(sum(_safe_float(x.get("realized_pnl_pct")) for x in closed_items) / len(closed_items), 4) if closed_items else 0
open_unrealized = round(sum(_safe_float(x.get("unrealized_pnl_usdt")) for x in open_items), 4)
total_pnl = round(total_realized + open_unrealized, 4)
allocated_margin = round(sum(_safe_float(x.get("margin_usdt")) for x in open_items), 4)
open_position_value = round(sum(_safe_float(x.get("notional_usdt")) for x in open_items), 4)
initial_equity = default_account_equity_usdt(cfg)
current_balance = round(initial_equity + total_pnl, 4)
cumulative_leverage = round(open_position_value / current_balance, 4) if current_balance > 0 else 0
return {
"days": days,
"total": len(items),
"open_count": len(open_items),
"closed_count": len(closed_items),
"win_count": len(wins),
"loss_count": len(losses),
"pending_order_count": int(pending_order_count or 0),
"win_rate": round(len(wins) / len(closed_items) * 100, 2) if closed_items else 0,
"realized_pnl_usdt": total_realized,
"avg_realized_pnl_pct": avg_realized_pct,
"open_unrealized_pnl_usdt": open_unrealized,
"total_pnl_usdt": total_pnl,
"initial_equity_usdt": initial_equity,
"account_equity_usdt": initial_equity,
"current_balance_usdt": current_balance,
"account_realized_return_pct": _account_return_pct(total_realized),
"account_unrealized_return_pct": _account_return_pct(open_unrealized),
"account_total_return_pct": _account_return_pct(total_pnl),
"allocated_margin_usdt": allocated_margin,
"open_position_value_usdt": open_position_value,
"cumulative_leverage": cumulative_leverage,
"available_equity_usdt": round(current_balance - allocated_margin, 4),
"margin_usdt": default_margin_usdt(cfg),
"leverage": default_leverage(cfg),
"notional_usdt": default_notional_usdt(cfg),
"fee_rate": default_fee_rate(cfg),
"slippage_pct": default_slippage_pct(cfg),
}
def get_paper_trading_performance(days: int = 30) -> dict:
days = max(1, min(_safe_int(days, 30), 365))
cfg = paper_trading_config()
initial_equity = default_account_equity_usdt(cfg)
today = datetime.now().date()
start_date = today - timedelta(days=days - 1)
conn = get_conn()
try:
closed_rows = conn.execute(
"""
SELECT closed_at, realized_pnl_usdt
FROM paper_trades
WHERE status='closed' AND closed_at IS NOT NULL
ORDER BY closed_at ASC, id ASC
"""
).fetchall()
open_unrealized = conn.execute(
"""
SELECT COALESCE(SUM(notional_usdt * pnl_pct / 100.0), 0)
FROM paper_trades
WHERE status='open'
"""
).fetchone()[0]
finally:
conn.close()
daily_realized = {}
realized_before = 0.0
for row in closed_rows:
closed_at = _parse_time(row["closed_at"])
pnl = _safe_float(row["realized_pnl_usdt"])
if not closed_at:
continue
closed_date = closed_at.date()
if closed_date < start_date:
realized_before += pnl
continue
if closed_date > today:
continue
key = closed_date.isoformat()
daily_realized[key] = daily_realized.get(key, 0.0) + pnl
points = []
cumulative_realized = realized_before
prev_equity = initial_equity + realized_before
peak_equity = max(initial_equity, prev_equity)
max_drawdown_pct = 0.0
max_drawdown_usdt = 0.0
open_unrealized = _safe_float(open_unrealized)
for idx in range(days):
day = start_date + timedelta(days=idx)
key = day.isoformat()
realized = round(daily_realized.get(key, 0.0), 8)
cumulative_realized += realized
unrealized = open_unrealized if day == today else 0.0
equity = round(initial_equity + cumulative_realized + unrealized, 8)
daily_pnl = round(equity - prev_equity, 8)
daily_return_pct = round(daily_pnl / prev_equity * 100, 6) if prev_equity > 0 else 0.0
peak_equity = max(peak_equity, equity)
drawdown_usdt = round(max(0.0, peak_equity - equity), 8)
drawdown_pct = round(drawdown_usdt / peak_equity * 100, 6) if peak_equity > 0 else 0.0
max_drawdown_pct = max(max_drawdown_pct, drawdown_pct)
max_drawdown_usdt = max(max_drawdown_usdt, drawdown_usdt)
points.append(
{
"date": key,
"equity_usdt": equity,
"daily_pnl_usdt": daily_pnl,
"daily_return_pct": daily_return_pct,
"realized_pnl_usdt": round(cumulative_realized, 8),
"unrealized_pnl_usdt": round(unrealized, 8),
"return_pct": _account_return_pct(equity - initial_equity, initial_equity, cfg),
"drawdown_usdt": drawdown_usdt,
"drawdown_pct": drawdown_pct,
}
)
prev_equity = equity
total_pnl = round((points[-1]["equity_usdt"] - initial_equity) if points else 0.0, 8)
return {
"days": days,
"initial_equity_usdt": initial_equity,
"current_equity_usdt": points[-1]["equity_usdt"] if points else initial_equity,
"total_pnl_usdt": total_pnl,
"total_return_pct": _account_return_pct(total_pnl, initial_equity, cfg),
"max_drawdown_usdt": round(max_drawdown_usdt, 8),
"max_drawdown_pct": round(max_drawdown_pct, 6),
"points": points,
}
def list_paper_trades(limit: int = 50, offset: int = 0, status: str = "", strategy_code: str = "") -> dict:
limit = max(1, min(_safe_int(limit, 50), 200))
offset = max(0, _safe_int(offset, 0))
status = str(status or "").strip()
where = ""
params = []
clauses = []
if status in {"open", "closed"}:
clauses.append("status=%s")
params.append(status)
strategy_code = str(strategy_code or "").strip()
if strategy_code:
clauses.append("strategy_code=%s")
params.append(normalize_strategy_code(strategy_code))
where = "WHERE " + " AND ".join(clauses) if clauses else ""
conn = get_conn()
try:
total = conn.execute(f"SELECT COUNT(*) FROM paper_trades {where}", tuple(params)).fetchone()[0]
rows = conn.execute(
f"""
SELECT pt.*, lpc.price AS latest_market_price, lpc.updated_at AS latest_market_price_updated_at
FROM paper_trades pt
LEFT JOIN latest_price_cache lpc ON lpc.symbol = pt.symbol
{where}
ORDER BY pt.opened_at DESC, pt.id DESC
LIMIT %s OFFSET %s
""",
tuple(params + [limit, offset]),
).fetchall()
finally:
conn.close()
cfg = paper_trading_config()
return {
"items": [_decorate_trade(dict(r), cfg) for r in rows],
"total": int(total or 0),
"limit": limit,
"offset": offset,
"has_more": offset + len(rows) < int(total or 0),
}
def list_paper_orders(limit: int = 50, offset: int = 0, status: str = "", strategy_code: str = "") -> dict:
limit = max(1, min(_safe_int(limit, 50), 200))
offset = max(0, _safe_int(offset, 0))
status = str(status or "").strip()
where = ""
params = []
clauses = []
if status in {"pending", "filled", "canceled", "expired", "rejected"}:
clauses.append("status=%s")
params.append(status)
strategy_code = str(strategy_code or "").strip()
if strategy_code:
clauses.append("strategy_code=%s")
params.append(normalize_strategy_code(strategy_code))
where = "WHERE " + " AND ".join(clauses) if clauses else ""
conn = get_conn()
try:
total = conn.execute(f"SELECT COUNT(*) FROM paper_orders {where}", tuple(params)).fetchone()[0]
rows = conn.execute(
f"""
SELECT po.*, lpc.price AS latest_market_price, lpc.updated_at AS latest_market_price_updated_at
FROM paper_orders po
LEFT JOIN latest_price_cache lpc ON lpc.symbol = po.symbol
{where}
ORDER BY po.created_at DESC, po.id DESC
LIMIT %s OFFSET %s
""",
tuple(params + [limit, offset]),
).fetchall()
finally:
conn.close()
items = []
for row in rows:
item = dict(row)
item["entry_plan_snapshot"] = _loads_json(item.pop("entry_plan_snapshot_json", "{}"), {})
latest_market = _safe_float(item.get("latest_market_price"))
item["latest_price"] = latest_market if latest_market > 0 else _safe_float(item.get("current_price_at_create"))
item["latest_price_updated_at"] = item.get("latest_market_price_updated_at") or item.get("updated_at") or ""
item.update(_strategy_lineage_from_trade_or_order(item))
target = _safe_float(item.get("target_price"))
latest = _safe_float(item.get("latest_price"))
item["distance_to_target_pct"] = round((latest / target - 1) * 100, 4) if target and latest else 0
items.append(item)
return {
"items": items,
"total": int(total or 0),
"limit": limit,
"offset": offset,
"has_more": offset + len(items) < int(total or 0),
}
def list_paper_trade_events(limit: int = 80, offset: int = 0, symbol: str = "", event_type: str = "") -> dict:
limit = max(1, min(_safe_int(limit, 80), 200))
offset = max(0, _safe_int(offset, 0))
symbol = str(symbol or "").strip().upper()
event_type = str(event_type or "").strip()
where = []
params = []
if symbol:
where.append("e.symbol=%s")
params.append(symbol)
if event_type:
where.append("e.event_type=%s")
params.append(event_type)
where_sql = "WHERE " + " AND ".join(where) if where else ""
conn = get_conn()
try:
total = conn.execute(
f"SELECT COUNT(*) FROM paper_trade_events e {where_sql}",
tuple(params),
).fetchone()[0]
rows = conn.execute(
f"""
SELECT
e.*,
t.status AS trade_status,
t.entry_price,
t.exit_price,
t.notional_usdt,
t.margin_usdt,
t.leverage,
t.exit_reason,
t.opened_at,
t.closed_at,
t.strategy_code AS trade_strategy_code,
t.strategy_signal_id AS trade_strategy_signal_id
FROM paper_trade_events e
LEFT JOIN paper_trades t ON t.id = e.trade_id
{where_sql}
ORDER BY e.event_time DESC, e.id DESC
LIMIT %s OFFSET %s
""",
tuple(params + [limit, offset]),
).fetchall()
finally:
conn.close()
items = []
for row in rows:
item = dict(row)
item["detail"] = _loads_json(item.pop("detail_json", "{}"), {})
item["strategy_code"] = normalize_strategy_code(item.get("strategy_code") or item.get("trade_strategy_code"))
item["strategy_name"] = strategy_label(item["strategy_code"])
item["strategy_signal_id"] = _safe_int(item.get("strategy_signal_id") or item.get("trade_strategy_signal_id"))
items.append(item)
return {
"items": items,
"total": int(total or 0),
"limit": limit,
"offset": offset,
"has_more": offset + len(items) < int(total or 0),
}
def delete_paper_trade(trade_id: int) -> dict:
trade_id = _safe_int(trade_id)
if trade_id <= 0:
return {"deleted": False, "reason": "invalid_trade_id"}
conn = get_conn()
try:
trade = conn.execute("SELECT id, recommendation_id, symbol, status FROM paper_trades WHERE id=%s", (trade_id,)).fetchone()
if not trade:
return {"deleted": False, "reason": "not_found", "trade_id": trade_id}
event_count = conn.execute("SELECT COUNT(*) FROM paper_trade_events WHERE trade_id=%s", (trade_id,)).fetchone()[0]
conn.execute("DELETE FROM paper_trade_events WHERE trade_id=%s", (trade_id,))
conn.execute("DELETE FROM paper_trades WHERE id=%s", (trade_id,))
conn.commit()
return {
"deleted": True,
"trade_id": trade_id,
"symbol": trade["symbol"],
"status": trade["status"],
"deleted_events": int(event_count or 0),
}
finally:
conn.close()
def delete_paper_order(order_id: int) -> dict:
order_id = _safe_int(order_id)
if order_id <= 0:
return {"deleted": False, "reason": "invalid_order_id"}
conn = get_conn()
try:
order = conn.execute("SELECT id, recommendation_id, symbol, status FROM paper_orders WHERE id=%s", (order_id,)).fetchone()
if not order:
return {"deleted": False, "reason": "not_found", "order_id": order_id}
conn.execute("DELETE FROM paper_orders WHERE id=%s", (order_id,))
conn.commit()
return {
"deleted": True,
"order_id": order_id,
"symbol": order["symbol"],
"status": order["status"],
}
finally:
conn.close()
def reset_paper_trading_data(scope: str = "all") -> dict:
scope = str(scope or "all").strip().lower()
allowed = {"all", "trades", "orders", "events", "open_trades", "closed_trades", "completed"}
if scope not in allowed:
return {"reset": False, "reason": "invalid_scope", "scope": scope, "allowed_scopes": sorted(allowed)}
conn = get_conn()
try:
deleted = {"trades": 0, "orders": 0, "events": 0}
def delete_trades(where_sql: str = "", params: tuple = ()) -> None:
rows = conn.execute(f"SELECT id FROM paper_trades {where_sql}", params).fetchall()
ids = [int(r["id"]) for r in rows]
if not ids:
return
event_count = conn.execute("SELECT COUNT(*) FROM paper_trade_events WHERE trade_id = ANY(%s)", (ids,)).fetchone()[0]
conn.execute("DELETE FROM paper_trade_events WHERE trade_id = ANY(%s)", (ids,))
conn.execute("DELETE FROM paper_trades WHERE id = ANY(%s)", (ids,))
deleted["events"] += int(event_count or 0)
deleted["trades"] += len(ids)
def delete_orders(where_sql: str = "", params: tuple = ()) -> None:
count = conn.execute(f"SELECT COUNT(*) FROM paper_orders {where_sql}", params).fetchone()[0]
conn.execute(f"DELETE FROM paper_orders {where_sql}", params)
deleted["orders"] += int(count or 0)
if scope == "all":
deleted["events"] += int(conn.execute("SELECT COUNT(*) FROM paper_trade_events").fetchone()[0] or 0)
deleted["trades"] += int(conn.execute("SELECT COUNT(*) FROM paper_trades").fetchone()[0] or 0)
deleted["orders"] += int(conn.execute("SELECT COUNT(*) FROM paper_orders").fetchone()[0] or 0)
conn.execute("DELETE FROM paper_trade_events")
conn.execute("DELETE FROM paper_trades")
conn.execute("DELETE FROM paper_orders")
elif scope == "events":
deleted["events"] += int(conn.execute("SELECT COUNT(*) FROM paper_trade_events").fetchone()[0] or 0)
conn.execute("DELETE FROM paper_trade_events")
elif scope == "orders":
delete_orders()
elif scope == "trades":
delete_trades()
elif scope == "open_trades":
delete_trades("WHERE status='open'")
elif scope == "closed_trades":
delete_trades("WHERE status='closed'")
elif scope == "completed":
delete_trades("WHERE status='closed'")
delete_orders("WHERE status IN ('filled','expired','canceled','rejected')")
conn.commit()
return {"reset": True, "scope": scope, "deleted": deleted}
finally:
conn.close()
def _report_trade_line(item: dict, closed: bool = False) -> str:
symbol = item.get("symbol") or "--"
side = "" if str(item.get("side") or "long").lower() == "short" else ""
pnl = _safe_float(item.get("realized_pnl_pct") if closed else item.get("pnl_pct"))
pnl_usdt = _safe_float(item.get("realized_pnl_usdt") if closed else item.get("unrealized_pnl_usdt"))
status = item.get("exit_reason") if closed else "持仓中"
return f"- **{symbol}** · {side} · {_fmt_pct(pnl)} · {pnl_usdt:+.2f} USDT · {status or '--'}"
def _report_order_line(item: dict) -> str:
symbol = item.get("symbol") or "--"
side = "" if str(item.get("side") or "long").lower() == "short" else ""
distance = _safe_float(item.get("distance_to_target_pct"))
return f"- **{symbol}** · {side} · 目标 {_fmt_price(item.get('target_price'))} · 距目标 {_fmt_pct(distance)}"
def _paper_running_days() -> float:
conn = get_conn()
try:
row = conn.execute(
"""
SELECT MIN(first_at) AS first_at
FROM (
SELECT MIN(opened_at) AS first_at FROM paper_trades
UNION ALL
SELECT MIN(created_at) AS first_at FROM paper_orders
) x
WHERE first_at IS NOT NULL AND first_at <> ''
"""
).fetchone()
finally:
conn.close()
first_at = _parse_time(row["first_at"] if row else "")
if not first_at:
return 0.0
now = datetime.now(first_at.tzinfo) if first_at.tzinfo else datetime.now()
seconds = max(0.0, (now - first_at).total_seconds())
return max(1.0, seconds / 86400)
def _periodized_return_pct(total_return_pct: float, running_days: float, period_days: float) -> float:
if running_days <= 0:
return 0.0
growth = 1 + total_return_pct / 100
if growth <= 0:
return -100.0
return round((growth ** (period_days / running_days) - 1) * 100, 4)
def _periodized_return_label(total_return_pct: float, running_days: float, period_days: float) -> str:
if running_days < 30:
return "样本不足"
return _fmt_pct(_periodized_return_pct(total_return_pct, running_days, period_days))
def send_paper_trading_report(days: int = 30) -> dict:
days = max(1, min(_safe_int(days, 30), 365))
summary = get_paper_trading_summary(days=days)
open_trades = list_paper_trades(limit=5, status="open").get("items", [])
closed_trades = list_paper_trades(limit=5, status="closed").get("items", [])
pending_orders = list_paper_orders(limit=5, status="pending").get("items", [])
total_pnl = _safe_float(summary.get("total_pnl_usdt"))
realized = _safe_float(summary.get("realized_pnl_usdt"))
unrealized = _safe_float(summary.get("open_unrealized_pnl_usdt"))
initial_equity = _safe_float(summary.get("initial_equity_usdt"))
current_balance = _safe_float(summary.get("current_balance_usdt"))
return_pct = _safe_float(summary.get("account_total_return_pct"))
win_rate = _safe_float(summary.get("win_rate"))
running_days = _paper_running_days()
monthly_return_pct = _periodized_return_pct(return_pct, running_days, 30) if running_days >= 30 else None
annualized_return_pct = _periodized_return_pct(return_pct, running_days, 365) if running_days >= 30 else None
monthly_return_label = _periodized_return_label(return_pct, running_days, 30)
annualized_return_label = _periodized_return_label(return_pct, running_days, 365)
template = "green" if total_pnl >= 0 else "red"
elements = [
{
"tag": "column_set",
"flex_mode": "none",
"background_style": "default",
"columns": [
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("初始资金", f"{initial_equity:.2f} USDT")]},
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("当前资金", f"{current_balance:.2f} USDT")]},
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("运行天数", f"{running_days:.1f}")]},
],
},
{
"tag": "column_set",
"flex_mode": "none",
"background_style": "default",
"columns": [
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("账户收益率", _fmt_pct(return_pct))]},
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("月化收益率", monthly_return_label)]},
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("年化收益率", annualized_return_label)]},
],
},
{
"tag": "column_set",
"flex_mode": "none",
"background_style": "default",
"columns": [
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("总收益", f"{total_pnl:+.2f} USDT")]},
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("成功率", f"{_fmt_pct(win_rate)} ({summary.get('win_count', 0)}/{summary.get('closed_count', 0)})")]},
{"tag": "column", "width": "weighted", "weight": 1, "elements": [_card_field("交易数量", f"持仓 {summary.get('open_count', 0)} / 平仓 {summary.get('closed_count', 0)} / 挂单 {summary.get('pending_order_count', 0)}")]},
],
},
_card_md(
"**战绩概览**\n"
f"- 已实现: {realized:+.2f} USDT\n"
f"- 浮动收益: {unrealized:+.2f} USDT\n"
f"- 平均平仓收益率: {_fmt_pct(summary.get('avg_realized_pnl_pct'))}\n"
f"- 持仓名义价值: {_safe_float(summary.get('open_position_value_usdt')):.2f} USDT\n"
f"- 已占用保证金: {_safe_float(summary.get('allocated_margin_usdt')):.2f} USDT\n"
f"- 可用资金: {_safe_float(summary.get('available_equity_usdt')):.2f} USDT\n"
f"- 累计杠杆: {_safe_float(summary.get('cumulative_leverage')):.2f}x"
),
_card_md("**当前持仓**\n" + ("\n".join(_report_trade_line(x) for x in open_trades) if open_trades else "- 暂无持仓")),
_card_md("**等待成交**\n" + ("\n".join(_report_order_line(x) for x in pending_orders) if pending_orders else "- 暂无挂单")),
_card_md("**最近平仓**\n" + ("\n".join(_report_trade_line(x, closed=True) for x in closed_trades) if closed_trades else "- 暂无平仓记录")),
_card_note(f"报告周期: 最近 {days} 天 · 发送时间: {_now()}"),
]
ok, push_result = _push_custom_paper_card({
"metadata": {"source": "paper_trading", "event_type": "trade_report", "symbol": "ALL"},
"config": {"wide_screen_mode": True},
"header": {
"template": template,
"title": {"tag": "plain_text", "content": f"交易报告 - 最近 {days}"},
},
"elements": elements,
})
return {
"ok": bool(ok),
"days": days,
"running_days": running_days,
"monthly_return_pct": monthly_return_pct,
"annualized_return_pct": annualized_return_pct,
"monthly_return_label": monthly_return_label,
"annualized_return_label": annualized_return_label,
"summary": summary,
"sent_at": _now(),
"push_result": push_result,
}