alphax/app/db/strategy_insights.py
2026-06-02 13:31:06 +08:00

885 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Strategy attribution read model based on opportunity and paper-trading conversion."""
import json
import math
import re
from datetime import datetime, timedelta

from app.core.strategy_registry import normalize_strategy_code, registered_strategy_codes, strategy_definition, strategy_label
from app.db.schema import get_conn
def safe_list_json(value):
    """Coerce *value* into a list, decoding JSON text when necessary.

    A list is handed back unchanged. A non-blank string is parsed as JSON and
    kept only when the decoded value is a list. Every other input, including
    text that fails to parse, produces an empty list.
    """
    if isinstance(value, list):
        return value
    if not (isinstance(value, str) and value.strip()):
        return []
    try:
        decoded = json.loads(value)
    except Exception:
        # Best-effort helper: unparsable text is treated as "no data".
        return []
    if isinstance(decoded, list):
        return decoded
    return []
def safe_dict_json(value):
    """Coerce *value* into a dict, decoding JSON text when necessary.

    A dict is handed back unchanged. A non-blank string is parsed as JSON and
    kept only when the decoded value is a dict. Every other input, including
    text that fails to parse, produces an empty dict.
    """
    if isinstance(value, dict):
        return value
    if not (isinstance(value, str) and value.strip()):
        return {}
    try:
        decoded = json.loads(value)
    except Exception:
        # Best-effort helper: unparsable text is treated as "no data".
        return {}
    if isinstance(decoded, dict):
        return decoded
    return {}
def _safe_float(value, default=0.0):
try:
if value is None or value == "":
return default
return float(value)
except Exception:
return default
def _safe_int(value, default=0):
try:
if value is None or value == "":
return default
return int(value)
except Exception:
return default
def _pct(part, total):
return round(float(part or 0) / float(total or 0) * 100, 2) if total else 0.0
def _strategy_direction(definition) -> str:
direction = str((definition.entry_gate_config or {}).get("direction") or "long").strip().lower()
return "short" if direction == "short" else "long"
def _direction_label(direction: str) -> str:
return "" if str(direction or "").lower() == "short" else ""
def evaluate_strategy_decision(metrics: dict) -> dict:
    """Turn strategy metrics into an explicit lifecycle recommendation.

    This is advisory only. It does not mutate strategy configs; release/pause
    should still go through the strategy iteration gate.

    Args:
        metrics: Mapping read with ``.get`` for ``signal_count``,
            ``opportunity_count``, ``trade_count``, ``closed_trade_count``,
            ``win_rate_pct``, ``avg_realized_pnl_pct``, ``realized_pnl_usdt``,
            ``worst_pnl_pct``, ``order_fill_rate_pct`` and
            ``trade_conversion_pct``. Missing or unparsable values count as 0.

    Returns:
        A dict with ``decision``, ``decision_label``, ``evaluation_score``
        (clamped to 0-100, one decimal), ``reasons`` and ``next_actions``.
    """
    signal_count = _safe_int(metrics.get("signal_count"))
    opportunity_count = _safe_int(metrics.get("opportunity_count"))
    trade_count = _safe_int(metrics.get("trade_count"))
    closed_count = _safe_int(metrics.get("closed_trade_count"))
    win_rate = _safe_float(metrics.get("win_rate_pct"))
    avg_pnl = _safe_float(metrics.get("avg_realized_pnl_pct"))
    realized = _safe_float(metrics.get("realized_pnl_usdt"))
    worst = _safe_float(metrics.get("worst_pnl_pct"))
    fill_rate = _safe_float(metrics.get("order_fill_rate_pct"))
    trade_conversion = _safe_float(metrics.get("trade_conversion_pct"))

    # Score starts at a neutral 50 and is nudged by sample size, conversion,
    # win rate (only once trades have closed), PnL, fill rate and drawdown.
    score = 50.0
    score += min(signal_count, 40) * 0.25
    score += min(opportunity_count, 40) * 0.2
    score += min(trade_conversion, 40) * 0.25
    score += (win_rate - 50) * 0.35 if closed_count else 0
    score += max(-12, min(12, avg_pnl)) * 1.8
    score += 6 if realized > 0 else (-6 if realized < 0 else 0)
    score += 4 if fill_rate >= 40 else 0
    score -= 8 if worst <= -8 else 0
    score = round(max(0, min(100, score)), 1)

    # The pause branch fires when ANY of these holds. They are named
    # individually so the reported reasons match what actually triggered.
    low_win_rate = win_rate < 35
    poor_avg_pnl = avg_pnl <= -2
    losing_with_drawdown = realized < 0 and worst <= -6

    reasons = []
    next_actions = []
    if signal_count < 5 and opportunity_count < 5:
        decision = "collect_samples"
        decision_label = "样本不足"
        reasons.append("信号和机会样本不足,暂不判断优劣")
        next_actions.append("继续收集样本,不要直接调高权重")
    elif trade_count == 0:
        decision = "review_entry_gate"
        decision_label = "检查入场闸门"
        reasons.append("已有发现样本,但还没有进入策略交易")
        next_actions.append("检查 RR、买点距离、全局风控和挂单成交条件")
    elif closed_count < 5:
        decision = "gray"
        decision_label = "灰度观察"
        reasons.append("已有交易样本,但平仓数量不足 5 笔")
        next_actions.append("保持 paper-only,等更多已平仓样本")
    elif win_rate >= 55 and avg_pnl > 0 and realized > 0:
        decision = "promote"
        decision_label = "优先保留"
        reasons.append("胜率、平均收益和已实现收益同时为正")
        next_actions.append("允许维持或小幅提升策略权重,但仍需观察回撤")
    elif low_win_rate or poor_avg_pnl or losing_with_drawdown:
        decision = "pause"
        decision_label = "暂停/降权"
        if low_win_rate:
            reasons.append("胜率低于 35%")
        if poor_avg_pnl:
            reasons.append("平均收益不高于 -2%")
        if losing_with_drawdown:
            reasons.append("已实现收益为负,且最差单笔回撤不高于 -6%")
        next_actions.append("暂停新增真实跟单,只保留观察或降低仓位")
    elif fill_rate < 15 and opportunity_count >= 10:
        decision = "tune_entry"
        decision_label = "优化入场"
        reasons.append("机会样本不少,但挂单成交或执行转化偏低")
        next_actions.append("复查挂单价格、有效期、回踩距离和成交触发")
    else:
        decision = "keep"
        decision_label = "保留运行"
        reasons.append("当前表现没有触发暂停或升级条件")
        next_actions.append("继续按当前门槛运行并积累样本")
    return {
        "decision": decision,
        "decision_label": decision_label,
        "evaluation_score": score,
        "reasons": reasons,
        "next_actions": next_actions,
    }
def get_strategy_evaluation(days: int = 30) -> dict:
    """Build per-strategy discovery, execution and PnL metrics with a verdict.

    Every registered strategy gets a metrics bucket even when it has no rows;
    strategy codes that only appear in the database get one on demand. Raw
    codes are normalized before bucketing, and rows whose codes normalize to
    the same strategy are accumulated into one bucket.

    Args:
        days: Look-back window in days, clamped to 1..365 (unparsable -> 30).

    Returns:
        A dict with ``definition``, ``days``, ``generated_at``, ``summary``,
        ``decision_distribution`` and ``strategies`` (sorted best-first by
        evaluation score, realized PnL, closed trades and signal count).
    """
    days = max(1, min(_safe_int(days, 30), 365))
    since = (datetime.now() - timedelta(days=days)).isoformat()

    def new_metrics(code, definition):
        # Single template shared by pre-registered and lazily created buckets.
        direction = _strategy_direction(definition)
        return {
            "strategy_code": code,
            "strategy_name": definition.strategy_name,
            "description": definition.description,
            "mode": definition.mode,
            "status": definition.status,
            "direction": direction,
            "direction_label": _direction_label(direction),
            "signal_count": 0,
            "candidate_signal_count": 0,
            "observe_signal_count": 0,
            "avg_signal_confidence": 0.0,
            "opportunity_count": 0,
            "actionable_count": 0,
            "buy_now_count": 0,
            "wait_pullback_count": 0,
            "observe_count": 0,
            "order_count": 0,
            "filled_order_count": 0,
            "canceled_order_count": 0,
            "trade_count": 0,
            "long_trade_count": 0,
            "short_trade_count": 0,
            "open_trade_count": 0,
            "closed_trade_count": 0,
            "win_count": 0,
            "loss_count": 0,
            "realized_pnl_usdt": 0.0,
            "pnl_pct_values": [],
            "best_pnl_pct": None,
            "worst_pnl_pct": None,
        }

    metrics = {code: new_metrics(code, strategy_definition(code)) for code in registered_strategy_codes()}
    # bucket key -> [weighted confidence sum, number of non-null confidences]
    confidence_totals = {}

    def bucket(code):
        normalized = normalize_strategy_code(code)
        if normalized not in metrics:
            metrics[normalized] = new_metrics(normalized, strategy_definition(normalized))
        return metrics[normalized]

    conn = get_conn()
    try:
        # The GROUP BY queries group on the RAW code, so several rows can land
        # in the same normalized bucket; counts are therefore accumulated.
        for row in conn.execute(
            """
            SELECT strategy_code,
                   COUNT(*) AS signal_count,
                   SUM(CASE WHEN signal_status='candidate' THEN 1 ELSE 0 END) AS candidate_count,
                   SUM(CASE WHEN signal_status='observe' THEN 1 ELSE 0 END) AS observe_count,
                   AVG(confidence) AS avg_confidence,
                   COUNT(confidence) AS confidence_count
            FROM strategy_signals
            WHERE created_at >= %s
            GROUP BY strategy_code
            """,
            (since,),
        ).fetchall():
            b = bucket(row.get("strategy_code"))
            b["signal_count"] += _safe_int(row.get("signal_count"))
            b["candidate_signal_count"] += _safe_int(row.get("candidate_count"))
            b["observe_signal_count"] += _safe_int(row.get("observe_count"))
            confidence_count = _safe_int(row.get("confidence_count"))
            totals = confidence_totals.setdefault(b["strategy_code"], [0.0, 0])
            totals[0] += _safe_float(row.get("avg_confidence")) * confidence_count
            totals[1] += confidence_count
        for code, (confidence_sum, confidence_count) in confidence_totals.items():
            metrics[code]["avg_signal_confidence"] = (
                round(confidence_sum / confidence_count, 2) if confidence_count else 0.0
            )

        for row in conn.execute(
            """
            SELECT strategy_code,
                   COUNT(*) AS opportunity_count,
                   SUM(CASE WHEN execution_status IN ('buy_now','wait_pullback') THEN 1 ELSE 0 END) AS actionable_count,
                   SUM(CASE WHEN execution_status='buy_now' THEN 1 ELSE 0 END) AS buy_now_count,
                   SUM(CASE WHEN execution_status='wait_pullback' THEN 1 ELSE 0 END) AS wait_pullback_count,
                   SUM(CASE WHEN execution_status='observe' THEN 1 ELSE 0 END) AS observe_count
            FROM recommendation
            WHERE rec_time >= %s
            GROUP BY strategy_code
            """,
            (since,),
        ).fetchall():
            b = bucket(row.get("strategy_code"))
            for key in ("opportunity_count", "actionable_count", "buy_now_count", "wait_pullback_count", "observe_count"):
                b[key] += _safe_int(row.get(key))

        for row in conn.execute(
            """
            SELECT COALESCE(NULLIF(po.strategy_code, ''), r.strategy_code) AS strategy_code,
                   COUNT(*) AS order_count,
                   SUM(CASE WHEN po.status='filled' THEN 1 ELSE 0 END) AS filled_count,
                   SUM(CASE WHEN po.status IN ('canceled','rejected','expired') THEN 1 ELSE 0 END) AS canceled_count
            FROM paper_orders po
            LEFT JOIN recommendation r ON r.id = po.recommendation_id
            WHERE po.created_at >= %s
            GROUP BY COALESCE(NULLIF(po.strategy_code, ''), r.strategy_code)
            """,
            (since,),
        ).fetchall():
            b = bucket(row.get("strategy_code"))
            b["order_count"] += _safe_int(row.get("order_count"))
            b["filled_order_count"] += _safe_int(row.get("filled_count"))
            b["canceled_order_count"] += _safe_int(row.get("canceled_count"))

        # Trades opened in the window, plus trades closed in the window.
        for row in conn.execute(
            """
            SELECT COALESCE(NULLIF(pt.strategy_code, ''), r.strategy_code) AS strategy_code,
                   pt.side,
                   pt.status
            FROM paper_trades pt
            LEFT JOIN recommendation r ON r.id = pt.recommendation_id
            WHERE pt.opened_at >= %s
               OR (pt.status='closed' AND COALESCE(pt.closed_at, pt.updated_at, pt.opened_at) >= %s)
            """,
            (since, since),
        ).fetchall():
            b = bucket(row.get("strategy_code"))
            status = row.get("status") or ""
            side = str(row.get("side") or "long").strip().lower()
            b["trade_count"] += 1
            if side == "short":
                b["short_trade_count"] += 1
            else:
                b["long_trade_count"] += 1
            if status == "open":
                b["open_trade_count"] += 1

        # Realized PnL comes only from trades closed inside the window.
        for row in conn.execute(
            """
            SELECT COALESCE(NULLIF(pt.strategy_code, ''), r.strategy_code) AS strategy_code,
                   pt.side,
                   pt.realized_pnl_pct,
                   pt.realized_pnl_usdt
            FROM paper_trades pt
            LEFT JOIN recommendation r ON r.id = pt.recommendation_id
            WHERE pt.status='closed'
              AND COALESCE(pt.closed_at, pt.updated_at, pt.opened_at) >= %s
            """,
            (since,),
        ).fetchall():
            b = bucket(row.get("strategy_code"))
            b["closed_trade_count"] += 1
            pnl_pct = _safe_float(row.get("realized_pnl_pct"))
            pnl_usdt = _safe_float(row.get("realized_pnl_usdt"))
            b["realized_pnl_usdt"] += pnl_usdt
            b["pnl_pct_values"].append(pnl_pct)
            if pnl_pct > 0:
                b["win_count"] += 1
            elif pnl_pct < 0:
                b["loss_count"] += 1
            b["best_pnl_pct"] = pnl_pct if b["best_pnl_pct"] is None else max(b["best_pnl_pct"], pnl_pct)
            b["worst_pnl_pct"] = pnl_pct if b["worst_pnl_pct"] is None else min(b["worst_pnl_pct"], pnl_pct)
    finally:
        conn.close()

    strategies = []
    for item in metrics.values():
        # The raw PnL list is a working value only; it is not part of the output.
        values = item.pop("pnl_pct_values", [])
        item["realized_pnl_usdt"] = round(item["realized_pnl_usdt"], 4)
        item["avg_realized_pnl_pct"] = round(sum(values) / len(values), 4) if values else 0.0
        item["win_rate_pct"] = _pct(item["win_count"], item["closed_trade_count"])
        item["actionable_rate_pct"] = _pct(item["actionable_count"], item["opportunity_count"])
        item["trade_conversion_pct"] = _pct(item["trade_count"], item["opportunity_count"])
        item["order_fill_rate_pct"] = _pct(item["filled_order_count"], item["order_count"])
        item["candidate_signal_rate_pct"] = _pct(item["candidate_signal_count"], item["signal_count"])
        item["best_pnl_pct"] = item["best_pnl_pct"] if item["best_pnl_pct"] is not None else 0.0
        item["worst_pnl_pct"] = item["worst_pnl_pct"] if item["worst_pnl_pct"] is not None else 0.0
        item.update(evaluate_strategy_decision(item))
        strategies.append(item)
    strategies.sort(
        key=lambda x: (x["evaluation_score"], x["realized_pnl_usdt"], x["closed_trade_count"], x["signal_count"]),
        reverse=True,
    )

    decisions = {}
    for item in strategies:
        decisions[item["decision"]] = decisions.get(item["decision"], 0) + 1
    return {
        "definition": "策略评价按 strategy_code 独立统计发现、执行、成交、收益和回撤,并给出保留/灰度/暂停等建议;建议不直接改配置,仍需经过策略迭代闸门。",
        "days": days,
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "summary": {
            "strategy_count": len(strategies),
            "active_signal_strategy_count": sum(1 for x in strategies if x["signal_count"] > 0),
            "traded_strategy_count": sum(1 for x in strategies if x["trade_count"] > 0),
            "promote_count": decisions.get("promote", 0),
            "pause_count": decisions.get("pause", 0),
            "gray_count": decisions.get("gray", 0),
            "collect_samples_count": decisions.get("collect_samples", 0),
        },
        "decision_distribution": [{"name": k, "count": v} for k, v in sorted(decisions.items(), key=lambda x: (-x[1], x[0]))],
        "strategies": strategies,
    }
def get_strategy_insights(days: int | None = None):
    """Strategy attribution based on opportunity and paper-trading conversion.

    Recommendation rows are opportunities/signals, not an execution ledger.
    Therefore this read model does not use recommendation.pnl_pct as strategy
    PnL. Paper-trading PnL is exposed only as an execution-conversion metric.

    Args:
        days: Optional look-back window in days, clamped to 1..365. ``None``
            means no time filter.

    Returns:
        A dict with ``overview``, ``days``, ``metric_definition``, the
        opportunity-level attribution lists, ``trade_attribution`` (closed
        paper trades only) and ``watch_order_attribution``.
    """
    if days is not None:
        days = max(1, min(_safe_int(days, 30), 365))
        since = (datetime.now() - timedelta(days=days)).isoformat()
        rec_where = "WHERE r.rec_time >= %s"
        rec_params = (since,)
        trade_where = "WHERE pt.status='closed' AND COALESCE(pt.closed_at, pt.updated_at, pt.opened_at) >= %s"
        trade_params = (since,)
    else:
        rec_where = ""
        rec_params = ()
        trade_where = "WHERE pt.status='closed'"
        trade_params = ()

    # Only the fixed WHERE fragments above are interpolated into the SQL text;
    # the time bound itself is always passed as a bound parameter.
    # NOTE(review): the LEFT JOINs can return more than one row per
    # recommendation / trade if several paper trades or orders share a
    # recommendation_id — confirm the expected cardinality.
    conn = get_conn()
    try:
        rows = conn.execute(
            f"""
            SELECT
                r.*,
                pt.id AS paper_trade_id,
                pt.status AS paper_status,
                pt.side AS paper_side,
                pt.source_status AS paper_source_status,
                pt.source_action AS paper_source_action,
                pt.strategy_code AS paper_strategy_code,
                pt.realized_pnl_pct AS paper_realized_pnl_pct,
                pt.realized_pnl_usdt AS paper_realized_pnl_usdt,
                pt.pnl_pct AS paper_pnl_pct,
                pt.exit_reason AS paper_exit_reason,
                po.id AS paper_order_id,
                po.status AS paper_order_status,
                po.strategy_code AS paper_order_strategy_code
            FROM recommendation r
            LEFT JOIN paper_trades pt ON pt.recommendation_id = r.id
            LEFT JOIN paper_orders po ON po.recommendation_id = r.id
            {rec_where}
            ORDER BY r.rec_time DESC, r.id DESC
            """,
            rec_params,
        ).fetchall()
        trade_rows = conn.execute(
            f"""
            SELECT
                r.*,
                pt.id AS paper_trade_id,
                pt.status AS paper_status,
                pt.side AS paper_side,
                pt.source_status AS paper_source_status,
                pt.source_action AS paper_source_action,
                pt.strategy_code AS paper_strategy_code,
                pt.realized_pnl_pct AS paper_realized_pnl_pct,
                pt.realized_pnl_usdt AS paper_realized_pnl_usdt,
                pt.pnl_pct AS paper_pnl_pct,
                pt.exit_reason AS paper_exit_reason,
                po.id AS paper_order_id,
                po.status AS paper_order_status,
                po.strategy_code AS paper_order_strategy_code
            FROM paper_trades pt
            LEFT JOIN recommendation r ON r.id = pt.recommendation_id
            LEFT JOIN paper_orders po ON po.recommendation_id = pt.recommendation_id
            {trade_where}
            ORDER BY COALESCE(pt.closed_at, pt.updated_at, pt.opened_at) DESC, pt.id DESC
            """,
            trade_params,
        ).fetchall()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()

    items = [dict(r) for r in rows]
    trade_items = [dict(r) for r in trade_rows]
    actionable_statuses = {"buy_now", "wait_pullback"}
    # One prefix classification shared by the opportunity and trade loops so
    # both evidence views attribute the same signal codes.
    sentiment_prefixes = ("sentiment_", "listing_", "ecosystem_")
    onchain_prefixes = ("dex_", "liquidity_", "exchange_", "whale_", "smart_money", "holder_", "onchain_")

    total = len(items)
    actionable = [x for x in items if (x.get("execution_status") or "") in actionable_statuses]
    buy_now = [x for x in items if (x.get("execution_status") or "") == "buy_now"]
    paper_items = [x for x in items if x.get("paper_trade_id")]
    closed_paper = trade_items
    paper_wins = [x for x in closed_paper if float(x.get("paper_realized_pnl_pct") or 0) > 0]
    paper_realized_usdt = round(sum(float(x.get("paper_realized_pnl_usdt") or 0) for x in closed_paper), 4)
    overview = {
        "total_opportunities": total,
        "actionable_count": len(actionable),
        "buy_now_count": len(buy_now),
        "paper_trade_count": len(paper_items),
        "closed_paper_trade_count": len(closed_paper),
        "paper_win_count": len(paper_wins),
        "paper_win_rate_pct": round(len(paper_wins) / len(closed_paper) * 100, 1) if closed_paper else 0,
        "paper_realized_pnl_usdt": paper_realized_usdt,
        "actionable_conversion_pct": round(len(actionable) / total * 100, 1) if total else 0,
        "paper_conversion_pct": round(len(paper_items) / len(buy_now) * 100, 1) if buy_now else 0,
        "definition": "策略归因只看机会转化和策略交易转化;收益只来自交易账本,不读取 recommendation.pnl_pct。",
    }

    def add_bucket(bucket_map, key, item):
        # Opportunity-level counters for one attribution key.
        if not key:
            return
        b = bucket_map.setdefault(key, {
            "opportunity_count": 0,
            "actionable_count": 0,
            "buy_now_count": 0,
            "paper_trade_count": 0,
            "closed_paper_trade_count": 0,
            "paper_win_count": 0,
            "paper_realized_pnl_usdt": 0.0,
        })
        execution_status = item.get("execution_status") or ""
        paper_status = item.get("paper_status") or ""
        b["opportunity_count"] += 1
        if execution_status in actionable_statuses:
            b["actionable_count"] += 1
        if execution_status == "buy_now":
            b["buy_now_count"] += 1
        if item.get("paper_trade_id"):
            b["paper_trade_count"] += 1
        if paper_status == "closed":
            b["closed_paper_trade_count"] += 1
            pnl_pct = float(item.get("paper_realized_pnl_pct") or 0)
            if pnl_pct > 0:
                b["paper_win_count"] += 1
            b["paper_realized_pnl_usdt"] += float(item.get("paper_realized_pnl_usdt") or 0)

    factor_map = {}
    env_map = {}
    version_map = {}
    evidence_map = {}
    trade_factor_map = {}
    trade_entry_map = {}
    trade_exit_map = {}
    trade_env_map = {}
    trade_evidence_map = {}
    trade_version_map = {}
    strategy_code_map = {}
    trade_strategy_code_map = {}
    trade_factor_group_map = {}
    trade_regime_map = {}
    trade_score_band_map = {}
    watch_map = {}
    order_map = {}

    # Opportunity-level attribution: every recommendation row.
    for item in items:
        labels = safe_list_json(item.get("signal_labels_json")) or safe_list_json(item.get("signals"))
        codes = safe_list_json(item.get("signal_codes_json"))
        ep = safe_dict_json(item.get("entry_plan_json"))
        for factor in labels:
            add_bucket(factor_map, str(factor).strip(), item)
        for code in codes:
            text = str(code or "").strip()
            if text.startswith(sentiment_prefixes):
                add_bucket(evidence_map, "舆情:" + text, item)
            elif text.startswith(onchain_prefixes):
                add_bucket(evidence_map, "链上:" + text, item)
        mc = safe_dict_json(item.get("market_context_json"))
        market_regime = safe_dict_json(mc.get("market_regime")) or safe_dict_json(ep.get("market_regime"))
        regime_name = market_regime.get("regime") or mc.get("market_regime")
        for key in ("btc_trend", "market_regime", "altcoin_regime", "sentiment"):
            if mc.get(key):
                add_bucket(env_map, f"{key}:{mc.get(key)}", item)
        if regime_name:
            add_bucket(env_map, f"regime:{regime_name}", item)
        for bucket in env_buckets_from_market_context(mc):
            add_bucket(env_map, bucket, item)
        if item.get("strategy_version"):
            add_bucket(version_map, str(item.get("strategy_version")).strip(), item)
        strategy_code = normalize_strategy_code(item.get("strategy_code") or item.get("paper_strategy_code") or item.get("paper_order_strategy_code"))
        add_bucket(strategy_code_map, strategy_code, item)
        if (item.get("execution_status") or "") in {"observe", "wait_pullback"} or (item.get("display_bucket") or "") == "watch_pool":
            add_watch_bucket(watch_map, watch_bucket(item), item)
        if item.get("paper_order_id"):
            add_order_bucket(order_map, order_bucket(item), item)

    # Trade-level attribution: closed paper trades only.
    for item in trade_items:
        labels = safe_list_json(item.get("signal_labels_json")) or safe_list_json(item.get("signals"))
        codes = safe_list_json(item.get("signal_codes_json"))
        ep = safe_dict_json(item.get("entry_plan_json"))
        mc = safe_dict_json(item.get("market_context_json"))
        factor_breakdown = safe_dict_json(mc.get("factor_score_breakdown")) or safe_dict_json(ep.get("factor_score_breakdown"))
        score_components = safe_dict_json(mc.get("score_components")) or safe_dict_json(ep.get("score_components"))
        market_regime = safe_dict_json(mc.get("market_regime")) or safe_dict_json(ep.get("market_regime"))
        regime_name = market_regime.get("regime") or mc.get("market_regime")
        # The trade ledger's own strategy code wins over the recommendation's.
        strategy_code = normalize_strategy_code(item.get("paper_strategy_code") or item.get("strategy_code") or item.get("paper_order_strategy_code"))
        for factor in labels:
            add_trade_bucket(trade_factor_map, str(factor).strip(), item)
        for group in factor_groups_from_breakdown(factor_breakdown):
            add_trade_bucket(trade_factor_group_map, group, item)
        add_trade_bucket(trade_entry_map, trade_entry_bucket(item), item)
        add_trade_bucket(trade_exit_map, item.get("paper_exit_reason") or "未记录退出原因", item)
        add_trade_bucket(trade_entry_map, f"方向:{item.get('paper_side') or item.get('side') or 'long'}", item)
        if item.get("paper_order_id"):
            add_trade_bucket(trade_entry_map, f"挂单路径:{item.get('paper_order_status') or 'filled'}", item)
        add_trade_bucket(trade_score_band_map, score_band("机会分", score_components.get("opportunity_score")), item)
        add_trade_bucket(trade_score_band_map, score_band("买点分", score_components.get("entry_score")), item)
        add_trade_bucket(trade_score_band_map, score_band("风险分", score_components.get("risk_score")), item)
        if regime_name:
            add_trade_bucket(trade_regime_map, f"regime:{regime_name}", item)
        for bucket in env_buckets_from_market_context(mc):
            add_trade_bucket(trade_env_map, bucket, item)
        for code in codes:
            text = str(code or "").strip()
            if text.startswith(sentiment_prefixes):
                add_trade_bucket(trade_evidence_map, "舆情:" + text, item)
            elif text.startswith(onchain_prefixes):
                add_trade_bucket(trade_evidence_map, "链上:" + text, item)
        if item.get("strategy_version"):
            add_trade_bucket(trade_version_map, str(item.get("strategy_version")).strip(), item)
        add_trade_bucket(trade_strategy_code_map, strategy_code, item)

    return {
        "overview": overview,
        "days": days,
        "metric_definition": {
            "opportunity_count": "进入 opportunity/recommendation 表的机会样本数,不代表交易。",
            "actionable_count": "确认层输出 buy_now 或 wait_pullback 的样本数。",
            "paper_trade_count": "已经被策略交易账本执行的样本数。",
            "paper_realized_pnl_usdt": "仅来自交易账本的已平仓策略收益。",
        },
        "factor_attribution": serialize_buckets("factor", factor_map)[:30],
        "market_environment": serialize_buckets("environment", env_map)[:20],
        "evidence_attribution": serialize_buckets("evidence", evidence_map)[:20],
        "version_performance": serialize_buckets("strategy_version", version_map, sort_by_version=True)[:20],
        "strategy_performance": add_strategy_labels(serialize_buckets("strategy_code", strategy_code_map)[:20]),
        "trade_attribution": {
            "definition": "交易级归因只统计已平仓策略交易,用 realized_pnl_usdt / realized_pnl_pct 衡量因子、入场路径、退出原因和环境的真实账本表现。",
            "factor": serialize_trade_buckets("factor", trade_factor_map)[:30],
            "factor_group": serialize_trade_buckets("factor_group", trade_factor_group_map)[:20],
            "entry_path": serialize_trade_buckets("entry_path", trade_entry_map)[:20],
            "exit_reason": serialize_trade_buckets("exit_reason", trade_exit_map)[:20],
            "market_regime": serialize_trade_buckets("market_regime", trade_regime_map)[:20],
            "score_band": serialize_trade_buckets("score_band", trade_score_band_map)[:20],
            "market_environment": serialize_trade_buckets("environment", trade_env_map)[:20],
            "evidence": serialize_trade_buckets("evidence", trade_evidence_map)[:20],
            "strategy_version": serialize_trade_buckets("strategy_version", trade_version_map, sort_by_version=True)[:20],
            "strategy_code": add_strategy_labels(serialize_trade_buckets("strategy_code", trade_strategy_code_map)[:20]),
        },
        "watch_order_attribution": {
            "definition": "观察池和挂单池只评价机会是否推进,不计入交易收益;用于判断没买/等回踩是否合理。",
            "watch_pool": serialize_watch_buckets("watch_bucket", watch_map)[:20],
            "paper_orders": serialize_order_buckets("order_bucket", order_map)[:20],
        },
    }
def add_strategy_labels(rows):
    """Attach a normalized ``strategy_code`` and its ``strategy_name`` to each row.

    Rows are modified in place. The code is taken from ``strategy_code``, then
    ``name``, then the empty string. The same *rows* object is returned, so a
    falsy argument (``None``, ``[]``) comes back untouched.
    """
    for row in rows or []:
        raw_code = row.get("strategy_code") or row.get("name") or ""
        normalized = normalize_strategy_code(raw_code)
        row["strategy_code"] = normalized
        row["strategy_name"] = strategy_label(normalized)
    return rows
def add_trade_bucket(bucket_map, key, item):
    """Fold one closed trade into the trade-level bucket stored under *key*.

    Does nothing for a falsy *key*. Otherwise the bucket is created on first
    use and updated in place: trade count, win/loss counts, realized USDT PnL,
    the list of PnL percentages and the best/worst percentage seen so far.
    """
    if not key:
        return
    if key not in bucket_map:
        bucket_map[key] = {
            "closed_trade_count": 0,
            "win_count": 0,
            "loss_count": 0,
            "realized_pnl_usdt": 0.0,
            "pnl_pct_values": [],
            "best_pnl_pct": None,
            "worst_pnl_pct": None,
        }
    stats = bucket_map[key]
    # Missing / falsy PnL fields count as zero.
    pct = float(item.get("paper_realized_pnl_pct") or 0)
    usdt = float(item.get("paper_realized_pnl_usdt") or 0)

    stats["closed_trade_count"] += 1
    stats["realized_pnl_usdt"] += usdt
    stats["pnl_pct_values"].append(pct)
    if pct > 0:
        stats["win_count"] += 1
    elif pct < 0:
        stats["loss_count"] += 1

    best_so_far = stats["best_pnl_pct"]
    if best_so_far is None or pct > best_so_far:
        stats["best_pnl_pct"] = pct
    worst_so_far = stats["worst_pnl_pct"]
    if worst_so_far is None or pct < worst_so_far:
        stats["worst_pnl_pct"] = pct
def trade_entry_bucket(item):
    """Classify how a trade was entered and return its entry-path bucket name.

    The source status comes from ``paper_source_status`` (falling back to
    ``execution_status``) and the action from ``paper_source_action`` (falling
    back to ``action_status``). Pullback entries are checked before immediate
    entries; any other non-empty source is used verbatim.
    """
    source = str(item.get("paper_source_status") or item.get("execution_status") or "").strip()
    action = str(item.get("paper_source_action") or item.get("action_status") or "").strip()

    entered_on_pullback = source == "wait_pullback" or action == "等回踩"
    if entered_on_pullback:
        return "入场:回踩挂单成交"
    entered_at_market = source == "buy_now" or action == "可即刻买入"
    if entered_at_market:
        return "入场:现价确认"
    return f"入场:{source}" if source else "入场:未标记"
def factor_groups_from_breakdown(breakdown):
    """List the factor groups that contributed to a score breakdown.

    Args:
        breakdown: Decoded ``factor_score_breakdown`` JSON. When it has a dict
            under ``"groups"``, the names of groups with a non-zero score are
            returned in their stored order. Otherwise the distinct
            ``factor_group`` values of the ``"items"`` list are returned sorted.

    Returns:
        A list of group names; empty for non-dict or malformed input.
    """
    if not isinstance(breakdown, dict):
        return []

    def score_of(detail):
        # Stored JSON is not schema-checked: accept either {"score": x} or a
        # bare number, and treat anything unparsable as a zero score.
        raw = detail.get("score") if isinstance(detail, dict) else detail
        try:
            return float(raw or 0)
        except (TypeError, ValueError):
            return 0.0

    groups = breakdown.get("groups")
    if isinstance(groups, dict):
        return [str(name) for name, detail in groups.items() if score_of(detail) != 0]

    items = breakdown.get("items")
    if not isinstance(items, list):
        return []
    names = set()
    for entry in items:
        if not isinstance(entry, dict):
            continue  # skip malformed entries instead of raising
        group = str(entry.get("factor_group") or "").strip()
        if group:
            names.add(group)
    return sorted(names)
def score_band(label, value):
    """Return a bucket name that places a score into a coarse band.

    Args:
        label: Prefix naming the score (used verbatim in the bucket name).
        value: The score; anything ``float()`` cannot convert, or NaN, is
            reported as unknown.

    Returns:
        ``"<label>:未知"`` for unusable values, otherwise
        ``"<label>:<band>(<value>)"`` with the band chosen by the thresholds
        8 / 3 / 0.
    """
    try:
        n = float(value)
    except (TypeError, ValueError, OverflowError):
        return f"{label}:未知"
    if n != n:  # NaN compares unequal to itself and fits no band
        return f"{label}:未知"
    # NOTE(review): all four band labels were empty strings, so the band was
    # missing from every bucket name. The labels below are restored as
    # high / medium / low / negative — confirm the intended wording.
    if n >= 8:
        band = "高"
    elif n >= 3:
        band = "中"
    elif n >= 0:
        band = "低"
    else:
        band = "负"
    return f"{label}:{band}({n:g})"
def watch_bucket(item):
    """Return the watch-pool bucket name for an opportunity row.

    The status is ``execution_status``, then ``display_bucket``, then the
    literal ``"watch"``. The two known statuses have fixed names; any other
    status is shown verbatim, and a blank one is shown as unlabelled.
    """
    raw_status = item.get("execution_status") or item.get("display_bucket") or "watch"
    status = str(raw_status).strip()
    known_names = {
        "wait_pullback": "观察:等待回踩",
        "observe": "观察:普通观察",
    }
    if status in known_names:
        return known_names[status]
    shown = status if status else "未标记"
    return f"观察:{shown}"
def order_bucket(item):
    """Return the paper-order bucket name for a row.

    ``filled``, ``canceled`` and ``pending`` orders have fixed names. Any other
    order status is labelled with the source status
    (``paper_source_status`` / ``execution_status``) when one is present, and
    with the order status itself otherwise.
    """
    order_status = str(item.get("paper_order_status") or "unknown").strip()
    fixed_names = {
        "filled": "挂单:已成交",
        "canceled": "挂单:已取消",
        "pending": "挂单:等待中",
    }
    if order_status in fixed_names:
        return fixed_names[order_status]
    source_status = str(item.get("paper_source_status") or item.get("execution_status") or "").strip()
    return f"挂单:{source_status or order_status}"
def add_watch_bucket(bucket_map, key, item):
    """Fold one watched opportunity into the watch-pool bucket under *key*.

    Does nothing for a falsy *key*. Otherwise the bucket is created on first
    use and updated in place: it counts the opportunity, whether it has a paper
    trade, whether it has a paper order, and whether it was invalidated.
    """
    if not key:
        return
    if key not in bucket_map:
        bucket_map[key] = {
            "opportunity_count": 0,
            "executed_count": 0,
            "order_count": 0,
            "invalid_count": 0,
        }
    counters = bucket_map[key]
    counters["opportunity_count"] += 1
    if item.get("paper_trade_id"):
        counters["executed_count"] += 1
    if item.get("paper_order_id"):
        counters["order_count"] += 1

    execution_status = item.get("execution_status") or ""
    row_status = item.get("status") or ""
    invalidated = execution_status == "invalid" or row_status in {"expired", "invalid", "archived"}
    if invalidated:
        counters["invalid_count"] += 1
def add_order_bucket(bucket_map, key, item):
    """Fold one paper order into the order bucket stored under *key*.

    Does nothing for a falsy *key*. Otherwise the bucket is created on first
    use and updated in place: it counts the order, whether its status is
    ``filled`` or ``canceled``, and whether the row carries a paper trade.
    """
    if not key:
        return
    if key not in bucket_map:
        bucket_map[key] = {
            "order_count": 0,
            "filled_count": 0,
            "canceled_count": 0,
            "trade_count": 0,
        }
    counters = bucket_map[key]
    order_status = str(item.get("paper_order_status") or "")
    counters["order_count"] += 1
    if order_status == "filled":
        counters["filled_count"] += 1
    elif order_status == "canceled":
        counters["canceled_count"] += 1
    if item.get("paper_trade_id"):
        counters["trade_count"] += 1
def serialize_watch_buckets(name_key, bucket_map):
    """Turn watch-pool buckets into rows with percentage columns.

    Each row holds the bucket name under *name_key*, the raw counters, and the
    executed / order / invalid shares of the opportunity count (one decimal,
    ``0`` when the bucket has no opportunities). Rows are ordered by
    opportunity count descending, then by name.
    """
    def share(count, total):
        return round(count / total * 100, 1) if total else 0

    rows = [
        {
            name_key: name,
            **counters,
            "executed_pct": share(counters["executed_count"], counters["opportunity_count"]),
            "order_pct": share(counters["order_count"], counters["opportunity_count"]),
            "invalid_pct": share(counters["invalid_count"], counters["opportunity_count"]),
        }
        for name, counters in bucket_map.items()
    ]
    return sorted(rows, key=lambda row: (-row["opportunity_count"], row[name_key]))
def serialize_order_buckets(name_key, bucket_map):
    """Turn paper-order buckets into rows with fill and cancel percentages.

    Each row holds the bucket name under *name_key*, the raw counters, and the
    filled / canceled shares of the order count (one decimal, ``0`` when the
    bucket has no orders). Rows are ordered by order count descending, then by
    name.
    """
    def share(count, total):
        return round(count / total * 100, 1) if total else 0

    rows = [
        {
            name_key: name,
            **counters,
            "fill_pct": share(counters["filled_count"], counters["order_count"]),
            "cancel_pct": share(counters["canceled_count"], counters["order_count"]),
        }
        for name, counters in bucket_map.items()
    ]
    return sorted(rows, key=lambda row: (-row["order_count"], row[name_key]))
def env_buckets_from_market_context(mc):
    """Convert market_context_json numeric fields into attribution buckets.

    Args:
        mc: Decoded market context. Reads ``change_24h``,
            ``turnover_acceleration_1h``, ``turnover_acceleration_4h``,
            ``volume_24h`` (falling back to ``quote_volume_24h``) and
            ``funding_rate``. An object without ``.get`` is treated as empty.

    Returns:
        A list of bucket names. The 24h-change bucket is always present; the
        others appear only when their value crosses a threshold.
    """
    lookup = mc.get if hasattr(mc, "get") else {}.get

    def number(*keys):
        # Each field is parsed on its own so one malformed value no longer
        # discards the other metrics; missing or unparsable values become 0.
        raw = next((lookup(key) for key in keys if lookup(key)), 0)
        try:
            return float(raw)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    change_24h = number("change_24h")
    turn_1h = number("turnover_acceleration_1h")
    turn_4h = number("turnover_acceleration_4h")
    volume_24h = number("volume_24h", "quote_volume_24h")
    funding = number("funding_rate")

    buckets = []
    if change_24h >= 8:
        buckets.append("24h涨幅:强势拉升≥8%")
    elif change_24h >= 3:
        buckets.append("24h涨幅:温和上涨3-8%")
    elif change_24h <= -3:
        buckets.append("24h涨幅:回撤≤-3%")
    else:
        buckets.append("24h涨幅:震荡-3~3%")
    if turn_1h >= 3:
        buckets.append("1h成交加速:爆量≥3x")
    elif turn_1h >= 1.5:
        buckets.append("1h成交加速:放量1.5-3x")
    elif turn_1h > 0:
        buckets.append("1h成交加速:平量<1.5x")
    if turn_4h >= 3:
        buckets.append("4h成交加速:爆量≥3x")
    elif turn_4h >= 1.5:
        buckets.append("4h成交加速:放量1.5-3x")
    elif turn_4h > 0:
        buckets.append("4h成交加速:平量<1.5x")
    if volume_24h >= 100_000_000:
        buckets.append("24h成交额:高流动性≥1亿")
    elif volume_24h >= 10_000_000:
        buckets.append("24h成交额:中等流动性1千万-1亿")
    elif volume_24h > 0:
        buckets.append("24h成交额:低流动性<1千万")
    if funding >= 0.0005:
        buckets.append("资金费率:多头拥挤")
    elif funding <= -0.0005:
        buckets.append("资金费率:空头拥挤")
    return buckets
def serialize_buckets(name_key, bucket_map, sort_by_version=False):
    """Turn opportunity-level buckets into rows with conversion percentages.

    Each row holds the bucket name under *name_key*, the raw counters, the
    actionable / paper-trade / win percentages (one decimal, ``0`` when the
    denominator is zero) and the realized USDT PnL rounded to four decimals.

    With ``sort_by_version`` the rows are ordered newest version first, then by
    opportunity count and actionable conversion; otherwise by opportunity count
    and actionable conversion descending, then by name.
    """
    def share(count, total):
        return round(count / total * 100, 1) if total else 0

    rows = []
    for name, counters in bucket_map.items():
        opportunities = counters["opportunity_count"]
        row = {name_key: name}
        for field in (
            "opportunity_count",
            "actionable_count",
            "buy_now_count",
            "paper_trade_count",
            "closed_paper_trade_count",
            "paper_win_count",
        ):
            row[field] = counters[field]
        row["actionable_conversion_pct"] = share(counters["actionable_count"], opportunities)
        row["paper_conversion_pct"] = share(counters["paper_trade_count"], counters["buy_now_count"])
        row["paper_win_rate_pct"] = share(counters["paper_win_count"], counters["closed_paper_trade_count"])
        row["paper_realized_pnl_usdt"] = round(counters["paper_realized_pnl_usdt"], 4)
        rows.append(row)

    if sort_by_version:
        def ordering(row):
            return (version_sort_key(row[name_key]), row["opportunity_count"], row["actionable_conversion_pct"])
        rows.sort(key=ordering, reverse=True)
    else:
        def ordering(row):
            return (-row["opportunity_count"], -row["actionable_conversion_pct"], row[name_key])
        rows.sort(key=ordering)
    return rows
def serialize_trade_buckets(name_key, bucket_map, sort_by_version=False):
    """Turn trade-level buckets into rows with win rate and PnL statistics.

    Each row holds the bucket name under *name_key*, the closed / win / loss
    counts, the win rate (one decimal), realized USDT PnL (four decimals), the
    mean PnL percentage and the best / worst PnL percentage (two decimals).
    Missing best / worst values are reported as ``0``.

    With ``sort_by_version`` the rows are ordered newest version first, then by
    closed-trade count; otherwise by closed-trade count and realized PnL
    descending, then by name.
    """
    rows = []
    for name, stats in bucket_map.items():
        pct_values = stats["pnl_pct_values"]
        closed_count = stats["closed_trade_count"]
        win_rate = round(stats["win_count"] / closed_count * 100, 1) if closed_count else 0
        mean_pct = round(sum(pct_values) / len(pct_values), 2) if pct_values else 0
        rows.append({
            name_key: name,
            "closed_trade_count": closed_count,
            "win_count": stats["win_count"],
            "loss_count": stats["loss_count"],
            "win_rate_pct": win_rate,
            "realized_pnl_usdt": round(stats["realized_pnl_usdt"], 4),
            "avg_realized_pnl_pct": mean_pct,
            "best_pnl_pct": round(stats["best_pnl_pct"] or 0, 2),
            "worst_pnl_pct": round(stats["worst_pnl_pct"] or 0, 2),
        })

    if sort_by_version:
        def ordering(row):
            return (version_sort_key(row[name_key]), row["closed_trade_count"])
        rows.sort(key=ordering, reverse=True)
    else:
        def ordering(row):
            return (-row["closed_trade_count"], -row["realized_pnl_usdt"], row[name_key])
        rows.sort(key=ordering)
    return rows
def version_sort_key(version: str):
    """Build a sort key that orders version strings numerically.

    A leading ``v`` / ``V`` is dropped and the rest is split on ``.`` and
    ``-``. A chunk that starts with digits contributes that number; any other
    chunk contributes its text.

    Every part is emitted as a ``(kind, number, text)`` triple so that keys are
    always comparable with each other: mixing bare ints and strings in one
    tuple makes ``sorted`` raise ``TypeError`` as soon as a numeric chunk meets
    a textual one (e.g. ``"v1.2"`` vs ``"v1.beta"``). Textual chunks sort
    before numeric ones.

    Args:
        version: Version label; ``None`` is treated as the empty string.

    Returns:
        A tuple of ``(kind, number, text)`` triples usable as a sort key.
    """
    text = str(version or "").strip()
    if text[:1] in ("v", "V"):
        text = text[1:]
    parts = []
    for chunk in text.replace("-", ".").split("."):
        leading_digits = re.match(r"^(\d+)", chunk)
        if leading_digits:
            parts.append((1, int(leading_digits.group(1)), ""))
        else:
            parts.append((0, 0, chunk))
    return tuple(parts)