"""Strategy attribution read model based on opportunity and paper-trading conversion.""" import json import re from app.db.schema import get_conn def safe_list_json(value): try: if isinstance(value, list): return value if isinstance(value, str) and value.strip(): parsed = json.loads(value) return parsed if isinstance(parsed, list) else [] except Exception: pass return [] def safe_dict_json(value): try: if isinstance(value, dict): return value if isinstance(value, str) and value.strip(): parsed = json.loads(value) return parsed if isinstance(parsed, dict) else {} except Exception: pass return {} def get_strategy_insights(): """Strategy attribution based on opportunity and paper-trading conversion. Recommendation rows are opportunities/signals, not an execution ledger. Therefore this read model does not use recommendation.pnl_pct as strategy PnL. Paper-trading PnL is exposed only as an execution-conversion metric. """ conn = get_conn() rows = conn.execute( """ SELECT r.*, pt.id AS paper_trade_id, pt.status AS paper_status, pt.side AS paper_side, pt.source_status AS paper_source_status, pt.source_action AS paper_source_action, pt.realized_pnl_pct AS paper_realized_pnl_pct, pt.realized_pnl_usdt AS paper_realized_pnl_usdt, pt.pnl_pct AS paper_pnl_pct, pt.exit_reason AS paper_exit_reason, po.id AS paper_order_id, po.status AS paper_order_status FROM recommendation r LEFT JOIN paper_trades pt ON pt.recommendation_id = r.id LEFT JOIN paper_orders po ON po.recommendation_id = r.id ORDER BY r.rec_time DESC, r.id DESC """ ).fetchall() conn.close() items = [dict(r) for r in rows] actionable_statuses = {"buy_now", "wait_pullback"} total = len(items) actionable = [x for x in items if (x.get("execution_status") or "") in actionable_statuses] buy_now = [x for x in items if (x.get("execution_status") or "") == "buy_now"] paper_items = [x for x in items if x.get("paper_trade_id")] closed_paper = [x for x in paper_items if x.get("paper_status") == "closed"] paper_wins = [x for x in closed_paper if float(x.get("paper_realized_pnl_pct") or 0) > 0] paper_realized_usdt = round(sum(float(x.get("paper_realized_pnl_usdt") or 0) for x in closed_paper), 4) overview = { "total_opportunities": total, "actionable_count": len(actionable), "buy_now_count": len(buy_now), "paper_trade_count": len(paper_items), "closed_paper_trade_count": len(closed_paper), "paper_win_count": len(paper_wins), "paper_win_rate_pct": round(len(paper_wins) / len(closed_paper) * 100, 1) if closed_paper else 0, "paper_realized_pnl_usdt": paper_realized_usdt, "actionable_conversion_pct": round(len(actionable) / total * 100, 1) if total else 0, "paper_conversion_pct": round(len(paper_items) / len(buy_now) * 100, 1) if buy_now else 0, "definition": "策略归因只看机会转化和策略交易转化;收益只来自交易账本,不读取 recommendation.pnl_pct。", } def add_bucket(bucket_map, key, item): if not key: return b = bucket_map.setdefault(key, { "opportunity_count": 0, "actionable_count": 0, "buy_now_count": 0, "paper_trade_count": 0, "closed_paper_trade_count": 0, "paper_win_count": 0, "paper_realized_pnl_usdt": 0.0, }) execution_status = item.get("execution_status") or "" paper_status = item.get("paper_status") or "" b["opportunity_count"] += 1 if execution_status in actionable_statuses: b["actionable_count"] += 1 if execution_status == "buy_now": b["buy_now_count"] += 1 if item.get("paper_trade_id"): b["paper_trade_count"] += 1 if paper_status == "closed": b["closed_paper_trade_count"] += 1 pnl_pct = float(item.get("paper_realized_pnl_pct") or 0) if pnl_pct > 0: b["paper_win_count"] += 1 b["paper_realized_pnl_usdt"] += float(item.get("paper_realized_pnl_usdt") or 0) factor_map = {} env_map = {} version_map = {} evidence_map = {} trade_factor_map = {} trade_entry_map = {} trade_exit_map = {} trade_env_map = {} trade_evidence_map = {} trade_version_map = {} for item in items: labels = safe_list_json(item.get("signal_labels_json")) or safe_list_json(item.get("signals")) codes = safe_list_json(item.get("signal_codes_json")) for factor in labels: add_bucket(factor_map, str(factor).strip(), item) for code in codes: text = str(code or "").strip() if text.startswith(("sentiment_", "listing_", "ecosystem_")): add_bucket(evidence_map, "舆情:" + text, item) elif text.startswith(("dex_", "liquidity_", "exchange_", "whale_", "smart_money", "holder_")): add_bucket(evidence_map, "链上:" + text, item) mc = safe_dict_json(item.get("market_context_json")) for key in ("btc_trend", "market_regime", "altcoin_regime", "sentiment"): if mc.get(key): add_bucket(env_map, f"{key}:{mc.get(key)}", item) for bucket in env_buckets_from_market_context(mc): add_bucket(env_map, bucket, item) if item.get("strategy_version"): add_bucket(version_map, str(item.get("strategy_version")).strip(), item) if item.get("paper_status") == "closed": for factor in labels: add_trade_bucket(trade_factor_map, str(factor).strip(), item) add_trade_bucket(trade_entry_map, trade_entry_bucket(item), item) add_trade_bucket(trade_exit_map, item.get("paper_exit_reason") or "未记录退出原因", item) add_trade_bucket(trade_entry_map, f"方向:{item.get('paper_side') or item.get('side') or 'long'}", item) if item.get("paper_order_id"): add_trade_bucket(trade_entry_map, f"挂单路径:{item.get('paper_order_status') or 'filled'}", item) for bucket in env_buckets_from_market_context(mc): add_trade_bucket(trade_env_map, bucket, item) for code in codes: text = str(code or "").strip() if text.startswith(("sentiment_", "listing_", "ecosystem_")): add_trade_bucket(trade_evidence_map, "舆情:" + text, item) elif text.startswith(("dex_", "liquidity_", "exchange_", "whale_", "smart_money", "holder_", "onchain_")): add_trade_bucket(trade_evidence_map, "链上:" + text, item) if item.get("strategy_version"): add_trade_bucket(trade_version_map, str(item.get("strategy_version")).strip(), item) return { "overview": overview, "metric_definition": { "opportunity_count": "进入 opportunity/recommendation 表的机会样本数,不代表交易。", "actionable_count": "确认层输出 buy_now 或 wait_pullback 的样本数。", "paper_trade_count": "已经被策略交易账本执行的样本数。", "paper_realized_pnl_usdt": "仅来自交易账本的已平仓策略收益。", }, "factor_attribution": serialize_buckets("factor", factor_map)[:30], "market_environment": serialize_buckets("environment", env_map)[:20], "evidence_attribution": serialize_buckets("evidence", evidence_map)[:20], "version_performance": serialize_buckets("strategy_version", version_map, sort_by_version=True)[:20], "trade_attribution": { "definition": "交易级归因只统计已平仓策略交易,用 realized_pnl_usdt / realized_pnl_pct 衡量因子、入场路径、退出原因和环境的真实账本表现。", "factor": serialize_trade_buckets("factor", trade_factor_map)[:30], "entry_path": serialize_trade_buckets("entry_path", trade_entry_map)[:20], "exit_reason": serialize_trade_buckets("exit_reason", trade_exit_map)[:20], "market_environment": serialize_trade_buckets("environment", trade_env_map)[:20], "evidence": serialize_trade_buckets("evidence", trade_evidence_map)[:20], "strategy_version": serialize_trade_buckets("strategy_version", trade_version_map, sort_by_version=True)[:20], }, } def add_trade_bucket(bucket_map, key, item): if not key: return b = bucket_map.setdefault(key, { "closed_trade_count": 0, "win_count": 0, "loss_count": 0, "realized_pnl_usdt": 0.0, "pnl_pct_values": [], "best_pnl_pct": None, "worst_pnl_pct": None, }) pnl_pct = float(item.get("paper_realized_pnl_pct") or 0) pnl_usdt = float(item.get("paper_realized_pnl_usdt") or 0) b["closed_trade_count"] += 1 b["realized_pnl_usdt"] += pnl_usdt b["pnl_pct_values"].append(pnl_pct) if pnl_pct > 0: b["win_count"] += 1 elif pnl_pct < 0: b["loss_count"] += 1 b["best_pnl_pct"] = pnl_pct if b["best_pnl_pct"] is None else max(b["best_pnl_pct"], pnl_pct) b["worst_pnl_pct"] = pnl_pct if b["worst_pnl_pct"] is None else min(b["worst_pnl_pct"], pnl_pct) def trade_entry_bucket(item): source = str(item.get("paper_source_status") or item.get("execution_status") or "").strip() action = str(item.get("paper_source_action") or item.get("action_status") or "").strip() if source == "wait_pullback" or action == "等回踩": return "入场:回踩挂单成交" if source == "buy_now" or action == "可即刻买入": return "入场:现价确认" if source: return f"入场:{source}" return "入场:未标记" def env_buckets_from_market_context(mc): """Convert market_context_json numeric fields into attribution buckets.""" buckets = [] try: change_24h = float(mc.get("change_24h", 0) or 0) turn_1h = float(mc.get("turnover_acceleration_1h", 0) or 0) turn_4h = float(mc.get("turnover_acceleration_4h", 0) or 0) volume_24h = float(mc.get("volume_24h") or mc.get("quote_volume_24h") or 0) funding = float(mc.get("funding_rate", 0) or 0) except Exception: change_24h = turn_1h = turn_4h = volume_24h = funding = 0 if change_24h >= 8: buckets.append("24h涨幅:强势拉升≥8%") elif change_24h >= 3: buckets.append("24h涨幅:温和上涨3-8%") elif change_24h <= -3: buckets.append("24h涨幅:回撤≤-3%") else: buckets.append("24h涨幅:震荡-3~3%") if turn_1h >= 3: buckets.append("1h成交加速:爆量≥3x") elif turn_1h >= 1.5: buckets.append("1h成交加速:放量1.5-3x") elif turn_1h > 0: buckets.append("1h成交加速:平量<1.5x") if turn_4h >= 3: buckets.append("4h成交加速:爆量≥3x") elif turn_4h >= 1.5: buckets.append("4h成交加速:放量1.5-3x") elif turn_4h > 0: buckets.append("4h成交加速:平量<1.5x") if volume_24h >= 100_000_000: buckets.append("24h成交额:高流动性≥1亿") elif volume_24h >= 10_000_000: buckets.append("24h成交额:中等流动性1千万-1亿") elif volume_24h > 0: buckets.append("24h成交额:低流动性<1千万") if funding >= 0.0005: buckets.append("资金费率:多头拥挤") elif funding <= -0.0005: buckets.append("资金费率:空头拥挤") return buckets def serialize_buckets(name_key, bucket_map, sort_by_version=False): rows = [] for key, bucket in bucket_map.items(): rows.append({ name_key: key, "opportunity_count": bucket["opportunity_count"], "actionable_count": bucket["actionable_count"], "buy_now_count": bucket["buy_now_count"], "paper_trade_count": bucket["paper_trade_count"], "closed_paper_trade_count": bucket["closed_paper_trade_count"], "paper_win_count": bucket["paper_win_count"], "actionable_conversion_pct": round(bucket["actionable_count"] / bucket["opportunity_count"] * 100, 1) if bucket["opportunity_count"] else 0, "paper_conversion_pct": round(bucket["paper_trade_count"] / bucket["buy_now_count"] * 100, 1) if bucket["buy_now_count"] else 0, "paper_win_rate_pct": round(bucket["paper_win_count"] / bucket["closed_paper_trade_count"] * 100, 1) if bucket["closed_paper_trade_count"] else 0, "paper_realized_pnl_usdt": round(bucket["paper_realized_pnl_usdt"], 4), }) if sort_by_version: rows.sort(key=lambda x: (version_sort_key(x[name_key]), x["opportunity_count"], x["actionable_conversion_pct"]), reverse=True) else: rows.sort(key=lambda x: (-x["opportunity_count"], -x["actionable_conversion_pct"], x[name_key])) return rows def serialize_trade_buckets(name_key, bucket_map, sort_by_version=False): rows = [] for key, bucket in bucket_map.items(): pnl_values = bucket["pnl_pct_values"] closed = bucket["closed_trade_count"] rows.append({ name_key: key, "closed_trade_count": closed, "win_count": bucket["win_count"], "loss_count": bucket["loss_count"], "win_rate_pct": round(bucket["win_count"] / closed * 100, 1) if closed else 0, "realized_pnl_usdt": round(bucket["realized_pnl_usdt"], 4), "avg_realized_pnl_pct": round(sum(pnl_values) / len(pnl_values), 2) if pnl_values else 0, "best_pnl_pct": round(bucket["best_pnl_pct"] or 0, 2), "worst_pnl_pct": round(bucket["worst_pnl_pct"] or 0, 2), }) if sort_by_version: rows.sort(key=lambda x: (version_sort_key(x[name_key]), x["closed_trade_count"]), reverse=True) else: rows.sort(key=lambda x: (-x["closed_trade_count"], -x["realized_pnl_usdt"], x[name_key])) return rows def version_sort_key(version: str): text = str(version or '').strip() if text.startswith('v') or text.startswith('V'): text = text[1:] parts = [] for chunk in text.replace('-', '.').split('.'): if chunk.isdigit(): parts.append(int(chunk)) else: match = re.match(r'^(\d+)', chunk) if match: parts.append(int(match.group(1))) else: parts.append(chunk) return tuple(parts)