alphax/app/db/strategy_rule_queries.py
2026-05-20 00:57:46 +08:00

637 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Strategy rule candidates and failure-pattern queries."""
import json
from datetime import datetime
from app.config.config_loader import get_meta
from app.db.schema import get_conn
def loads_json_field(value, fallback):
    """Decode a JSON-typed column value, falling back on missing or malformed data.

    Args:
        value: Raw column value. Strings are parsed as JSON; any other
            non-None value (e.g. an already-decoded list/dict) is returned
            unchanged.
        fallback: Returned when ``value`` is None, is a string holding JSON
            ``null``, or is a string that is not valid JSON.

    Returns:
        The decoded value, ``value`` itself, or ``fallback``.
    """
    if value is None:
        return fallback
    if not isinstance(value, str):
        # Non-string values may already be decoded by the driver; pass through.
        return value
    try:
        decoded = json.loads(value)
    except (ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError (including the empty string);
        # RecursionError covers pathologically nested input.
        return fallback
    # Treat a stored JSON ``null`` like SQL NULL so callers that immediately
    # call ``.get`` or iterate on the result receive the fallback shape.
    return fallback if decoded is None else decoded
def upsert_strategy_rule_candidate(source, rule_type, signal_name, rule_description,
                                   support_count=0, success_count=0, fail_count=0,
                                   avg_pnl=0, max_gain=0, max_drawdown=0,
                                   confidence_score=0, sample_size=0, status="candidate",
                                   release_version="", notes="", source_ref=""):
    """Insert or update a research-only strategy rule candidate.

    A candidate is identified by the exact (source, rule_type, signal_name,
    rule_description) tuple, with None normalized to "". If a matching row
    exists (the newest by id), its metrics, status, release_version and
    notes are overwritten and created_at is reset to now; source_ref is
    only replaced when a non-empty value is supplied. Otherwise a new row
    is inserted.

    Returns:
        The id of the updated or inserted candidate row.
    """
    identity = (source or "", rule_type or "", signal_name or "", rule_description or "")
    now = datetime.now().isoformat()
    conn = get_conn()
    try:
        existing = conn.execute("""
            SELECT id FROM strategy_rule_candidate
            WHERE source=%s AND rule_type=%s AND signal_name=%s AND rule_description=%s
            ORDER BY id DESC LIMIT 1
        """, identity).fetchone()
        if existing:
            candidate_id = existing["id"]
            conn.execute("""
                UPDATE strategy_rule_candidate
                SET support_count=%s, success_count=%s, fail_count=%s, avg_pnl=%s, max_gain=%s,
                    max_drawdown=%s, confidence_score=%s, sample_size=%s, status=%s,
                    release_version=%s, notes=%s, source_ref=COALESCE(NULLIF(%s, ''), source_ref), created_at=%s
                WHERE id=%s
            """, (support_count, success_count, fail_count, avg_pnl, max_gain, max_drawdown,
                  confidence_score, sample_size, status, release_version or "", notes or "",
                  source_ref or "", now, candidate_id))
        else:
            cur = conn.execute("""
                INSERT INTO strategy_rule_candidate (
                    created_at, source, rule_type, signal_name, rule_description,
                    support_count, success_count, fail_count, avg_pnl, max_gain, max_drawdown,
                    confidence_score, sample_size, status, release_version, notes, source_ref
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (now, *identity,
                  support_count, success_count, fail_count, avg_pnl, max_gain, max_drawdown,
                  confidence_score, sample_size, status, release_version or "", notes or "",
                  source_ref or ""))
            candidate_id = cur.fetchone()["id"]
        conn.commit()
    finally:
        # Always release the connection, even when a statement fails.
        conn.close()
    return candidate_id
def record_strategy_failure_pattern(symbol, version="", failure_type="", failure_reason="",
                                    signal_combo=None, market_context=None,
                                    entry_quality_issue="", pnl_pct=0, max_drawdown_pct=0, lesson=""):
    """Record one failure-pattern sample for later attribution.

    ``signal_combo`` (default []) and ``market_context`` (default {}) are
    stored as JSON text; values json cannot encode natively are stringified
    via ``default=str``. Falsy text fields are stored as "" and falsy
    numeric fields as 0.
    """
    values = (
        datetime.now().isoformat(), symbol or "", version or "", failure_type or "",
        failure_reason or "", json.dumps(signal_combo or [], ensure_ascii=False, default=str),
        json.dumps(market_context or {}, ensure_ascii=False, default=str),
        entry_quality_issue or "", pnl_pct or 0, max_drawdown_pct or 0, lesson or "",
    )
    conn = get_conn()
    try:
        conn.execute("""
            INSERT INTO strategy_failure_pattern (
                created_at, symbol, version, failure_type, failure_reason, signal_combo,
                market_context_json, entry_quality_issue, pnl_pct, max_drawdown_pct, lesson
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, values)
        conn.commit()
    finally:
        # Always release the connection, even when the insert fails.
        conn.close()
def get_strategy_rule_candidates(limit=50, status=None):
    """Read strategy rule candidates, best-supported first.

    Args:
        limit: Maximum number of rows to return.
        status: Optional lifecycle status filter (e.g. "candidate", "gray");
            a falsy value disables filtering.

    Returns:
        List of row dicts ordered by confidence_score, sample_size and
        created_at, all descending.
    """
    # Only this fixed fragment is interpolated into the SQL text; the status
    # value itself is always sent as a bound parameter.
    where = "WHERE status=%s" if status else ""
    params = (status, limit) if status else (limit,)
    conn = get_conn()
    try:
        rows = conn.execute(f"""
            SELECT * FROM strategy_rule_candidate
            {where}
            ORDER BY confidence_score DESC, sample_size DESC, created_at DESC
            LIMIT %s
        """, params).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
def update_strategy_rule_candidate_status(candidate_id, status, release_version="", notes_append=""):
    """Update a candidate's lifecycle status.

    Args:
        candidate_id: Row id in strategy_rule_candidate.
        status: New status; a falsy value is stored as "candidate".
        release_version: Replaces the stored release_version only when non-empty.
        notes_append: If given, appended to notes as a new "[timestamp] text" line.

    Returns:
        True if the candidate existed and was updated, False otherwise.
    """
    conn = get_conn()
    try:
        row = conn.execute("SELECT notes FROM strategy_rule_candidate WHERE id=%s", (candidate_id,)).fetchone()
        if not row:
            return False
        # One timestamp for both the note stamp and created_at keeps them aligned.
        now = datetime.now().isoformat()
        notes = (row["notes"] or "").strip()
        if notes_append:
            notes = (notes + "\n" if notes else "") + f"[{now}] {notes_append}"
        conn.execute("""
            UPDATE strategy_rule_candidate
            SET status=%s, release_version=COALESCE(NULLIF(%s, ''), release_version), notes=%s, created_at=%s
            WHERE id=%s
        """, (status or "candidate", release_version or "", notes, now, candidate_id))
        conn.commit()
        return True
    finally:
        conn.close()
def get_strategy_failure_patterns(limit=50):
    """Read the newest failure-pattern rows with their JSON columns decoded.

    Each returned dict is the raw row plus ``signal_combo`` (decoded,
    default []) and ``market_context`` (decoded from market_context_json,
    default {}).
    """
    conn = get_conn()
    try:
        rows = conn.execute("""
            SELECT * FROM strategy_failure_pattern
            ORDER BY created_at DESC
            LIMIT %s
        """, (limit,)).fetchall()
    finally:
        conn.close()
    items = []
    for row in rows:
        item = dict(row)
        item["signal_combo"] = loads_json_field(item.get("signal_combo"), [])
        item["market_context"] = loads_json_field(item.get("market_context_json"), {})
        items.append(item)
    return items
def candidate_signal_key(signal_text):
    """Map free-form signal text to a short canonical key for attribution.

    Markers are tried in a fixed priority order (for example "N倍放量" before
    the more general "放量"), and the first marker found as a substring
    decides the key. Text matching no marker is cut to its first 12
    characters. Falsy input becomes "".
    """
    marker_priority = (
        ("量价齐飞", "vp_fly"),
        ("N倍放量", "vol_Nx"),
        ("放量", "1h_vol"),
        ("供需区突破", "zone_break"),
        ("供给区突破", "zone_break"),
        ("站稳突破", "zone_break"),
        ("起爆点", "ignition"),
        ("静K→动K", "ignition"),
        ("静K蓄力", "sk_accum"),
        ("连续3K", "cont3k"),
        ("连续K", "cont_k"),
        ("Q≥7", "q7_break"),
        ("动K", "dyn_k"),
        ("过期", "stale_signal"),
        ("历史", "stale_signal"),
        ("追高", "chase_high"),
        ("假突破", "false_breakout"),
        ("量价背离", "vp_divergence"),
    )
    normalized = str(signal_text or "")
    return next(
        (canonical for marker, canonical in marker_priority if marker in normalized),
        normalized[:12],
    )
def get_factor_recency_fixed_at():
    """Return the cut-off timestamp string separating clean from dirty history.

    Uses meta ``factor_recency_fixed_at``, falling back to
    ``clean_review_started_at``; the chosen value is stripped. Returns ""
    when neither is set or when meta cannot be loaded. Recommendations
    older than this cut-off are dirty-history references.
    """
    try:
        meta = get_meta() or {}
    except Exception:
        # Meta lookup is best-effort; without it there is no cut-off.
        return ""
    for field in ("factor_recency_fixed_at", "clean_review_started_at"):
        value = meta.get(field)
        if value:
            return value.strip()
    return ""
def is_dirty_history_candidate(candidate):
    """Return True when *candidate* is a dirty-history (pre-fix) reference.

    A candidate is dirty if its source is "history_review_auto" or
    "dirty_history_reference", its source_ref contains "dirty_history", or
    its notes contain the "污染历史" marker.
    """
    if str(candidate.get("source") or "") in ("history_review_auto", "dirty_history_reference"):
        return True
    if "dirty_history" in str(candidate.get("source_ref") or ""):
        return True
    return "污染历史" in str(candidate.get("notes") or "")
def candidate_status_for_metrics(rule_type, sample_size, confidence, avg_pnl, current_status="candidate",
                                 min_gray_samples=10, min_gray_confidence=65,
                                 min_reject_samples=8, max_reject_confidence=35, max_reject_avg_pnl=-3):
    """Derive a candidate's lifecycle status from its clean-sample metrics.

    Rules, checked in order:
      1. ``current_status == "active"`` is kept as "active".
      2. "gray" when sample_size >= min_gray_samples and
         confidence >= min_gray_confidence and avg_pnl > 0. Penalty rules
         skip the avg_pnl requirement.
      3. "rejected" when sample_size >= min_reject_samples and either
         confidence < max_reject_confidence (non-penalty rules only) or
         avg_pnl <= max_reject_avg_pnl.
      4. Otherwise an existing "gray"/"rejected" status is kept; anything
         else becomes "candidate".

    The reject thresholds default to 8 samples, confidence 35 and avg_pnl -3.
    """
    if current_status == "active":
        return "active"
    if sample_size >= min_gray_samples and confidence >= min_gray_confidence and (avg_pnl > 0 or rule_type == "penalty"):
        return "gray"
    # NOTE(review): for penalty rules a strongly negative avg_pnl supports the
    # penalty, yet it still triggers rejection here. This applies between
    # min_reject_samples and min_gray_samples (or when confidence is below the
    # gray bar). Confirm this asymmetry is intended.
    low_confidence = rule_type != "penalty" and confidence < max_reject_confidence
    if sample_size >= min_reject_samples and (low_confidence or avg_pnl <= max_reject_avg_pnl):
        return "rejected"
    if current_status in ("gray", "rejected"):
        return current_status
    return "candidate"
def classify_failure_type_from_text(review):
    """Classify a failed or sideways review into a failure type with keyword rules.

    Used for historical failure-pattern backfill. The review's triggered
    and missed signals plus its lesson are joined into one text blob. The
    blob is matched against keyword groups in priority order, and the first
    hit wins. When no group matches, the outcome/pnl fallbacks below apply.

    Args:
        review: Mapping with optional keys ``triggered_signals`` and
            ``miss_signals`` (iterables of signals), ``lesson``,
            ``pnl_48h`` (numeric) and ``outcome``.

    Returns:
        A ``(failure_type, reason)`` tuple of display strings.
    """
    signals = review.get("triggered_signals") or []
    miss = review.get("miss_signals") or []
    lesson = review.get("lesson") or ""
    # list() lets a tuple/list mix (or any iterable) be concatenated safely.
    text = " ".join(str(x) for x in list(signals) + list(miss)) + " " + str(lesson)
    pnl = float(review.get("pnl_48h") or 0)
    outcome = review.get("outcome") or ""

    keyword_rules = (
        (("过期", "历史", "旧放量", "age_bars", "已过期", "小时前", "旧起爆"),
         "过期因子误判", "历史放量/起爆/突破不能当作当前触发信号,必须做时效闸门"),
        (("假突破", "突破失败", "未站稳", "冲高回落"),
         "假突破", "突破后没有站稳或快速回落,需要增加站稳/承接确认"),
        (("量价背离", "缩量上涨", "放量下跌", "无量拉升"),
         "量价背离", "价格动作与成交量不匹配,量能确认不足"),
        (("高位", "追高", "涨幅过大", "乖离"),
         "追高风险", "入场位置偏高,盈亏比和回撤风险恶化"),
        (("承接不足", "无承接", "上影线", "砸盘"),
         "高位无承接", "高位出现抛压但缺少买盘承接"),
        (("板块退潮", "热点退潮", "龙头走弱", "板块分歧"),
         "板块退潮", "板块热度回落,个币信号容易失效"),
        (("BTC", "大盘", "反向共振", "系统性"),
         "BTC/大盘反向共振", "大盘方向与个币信号冲突,需要宏观/主流币过滤"),
        (("止损", "盈亏比", "RR", "止盈"),
         "止损/盈亏比不合理", "止损或止盈结构不合理,导致信号收益风险不匹配"),
        (("滞后", "MACD", "RSI"),
         "滞后信号追高", "滞后指标占比高,容易形成事后确认/追高失败"),
    )
    for keywords, failure_type, reason in keyword_rules:
        if any(k in text for k in keywords):
            return failure_type, reason

    # Only an explicit "lacks forward-looking signals" remark qualifies here.
    # Testing for the mere absence of "前瞻" would match almost every review
    # and make the outcome/pnl fallbacks below unreachable.
    if "缺乏前瞻" in text:
        return "前瞻信号不足", "缺少量价/PA等前瞻性确认"
    if "横盘" in text or outcome == "横盘":
        return "信号强度不足", "触发后未形成有效爆发,确认条件偏弱"
    if "回撤" in text or pnl < -3:
        return "入场点太晚", "入场后回撤/亏损明显,买点可能滞后或确认过慢"
    return "未分类失败", "需要继续积累样本做二级归因"
def backfill_strategy_failure_patterns(limit=2000, dry_run=False):
    """Backfill failure-pattern rows from historical review_log, deduped by rec_id.

    Scans the newest ``limit`` review_log rows with outcome '失败' or '横盘'
    and classifies each with classify_failure_type_from_text. Unless
    ``dry_run`` is set, it inserts one strategy_failure_pattern row per
    rec_id not already recorded. The rec_id is stored inside
    market_context_json and read back from there for deduplication.

    Returns:
        Summary dict: dry_run flag, rows scanned, rows inserted (or that
        would be inserted in dry-run), rows skipped as existing, counts per
        failure type, and up to 10 example classifications.
    """
    conn = get_conn()
    try:
        rows = conn.execute("""
            SELECT rl.*, r.strategy_version, r.max_drawdown_pct
            FROM review_log rl
            LEFT JOIN recommendation r ON r.id = rl.rec_id
            WHERE rl.outcome IN ('失败','横盘')
            ORDER BY rl.review_time DESC
            LIMIT %s
        """, (limit,)).fetchall()
        existing = set()
        for row in conn.execute("SELECT market_context_json FROM strategy_failure_pattern").fetchall():
            ctx = loads_json_field(row["market_context_json"], {})
            # Only JSON objects can carry a rec_id; other shapes are ignored
            # instead of crashing on .get().
            if isinstance(ctx, dict) and ctx.get("rec_id") is not None:
                existing.add(str(ctx["rec_id"]))
        inserted = 0
        skipped = 0
        type_counts = {}
        examples = []
        for row in rows:
            item = dict(row)
            rec_id = item.get("rec_id")
            if str(rec_id) in existing:
                skipped += 1
                continue
            triggered = loads_json_field(item.get("triggered_signals"), []) or []
            miss = loads_json_field(item.get("miss_signals"), []) or []
            item["triggered_signals"] = triggered
            item["miss_signals"] = miss
            failure_type, reason = classify_failure_type_from_text(item)
            type_counts[failure_type] = type_counts.get(failure_type, 0) + 1
            if len(examples) < 10:
                examples.append({"rec_id": rec_id, "symbol": item.get("symbol"), "failure_type": failure_type, "reason": reason})
            if not dry_run:
                conn.execute("""
                    INSERT INTO strategy_failure_pattern (
                        created_at, symbol, version, failure_type, failure_reason, signal_combo,
                        market_context_json, entry_quality_issue, pnl_pct, max_drawdown_pct, lesson
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """, (
                    datetime.now().isoformat(), item.get("symbol") or "", item.get("strategy_version") or "",
                    failure_type, reason, json.dumps(triggered, ensure_ascii=False, default=str),
                    json.dumps({"source": "history_backfill", "rec_id": rec_id, "outcome": item.get("outcome"),
                                "review_time": item.get("review_time")}, ensure_ascii=False, default=str),
                    reason, float(item.get("pnl_48h") or 0), float(item.get("max_drawdown_pct") or 0),
                    item.get("lesson") or "",
                ))
            # Mark as seen even in dry-run so duplicate rec_ids in this batch are skipped.
            existing.add(str(rec_id))
            inserted += 1
        if not dry_run:
            conn.commit()
    finally:
        conn.close()
    return {"dry_run": dry_run, "scanned": len(rows), "inserted": inserted, "skipped_existing": skipped, "type_counts": type_counts, "examples": examples}
def generate_candidates_from_review_history(min_samples=20, min_bonus_confidence=55, max_penalty_confidence=40, dry_run=False):
    """Generate strategy rule candidates from historical review_log.

    Every review is bucketed under the canonical key of each of its
    triggered/hit/missed signals. Buckets with at least ``min_samples``
    reviews are scored:

    * confidence is the success share among resolved (爆发 vs 失败/横盘) reviews;
    * "bonus" when confidence >= min_bonus_confidence and avg_pnl > 0;
    * "penalty" when confidence <= max_penalty_confidence;
    * anything else is skipped.

    Candidates are tagged source "dirty_history_reference". Unless
    ``dry_run`` is set, each is upserted via upsert_strategy_rule_candidate.

    Returns:
        Dict with the dry_run flag, the number of review rows read,
        generated_count, and up to 80 candidate dicts. The candidates are
        sorted by sample size (desc), then rule_type, then confidence (desc).
    """
    conn = get_conn()
    try:
        rows = conn.execute("""
            SELECT rl.*, r.max_drawdown_pct
            FROM review_log rl
            LEFT JOIN recommendation r ON r.id = rl.rec_id
            ORDER BY rl.review_time DESC
        """).fetchall()
    finally:
        # Release the read connection before the upserts below, each of which
        # opens its own connection via get_conn().
        conn.close()

    buckets = {}
    for row in rows:
        item = dict(row)
        triggered = loads_json_field(item.get("triggered_signals"), []) or []
        hit = loads_json_field(item.get("hit_signals"), []) or []
        miss = loads_json_field(item.get("miss_signals"), []) or []
        keys = {candidate_signal_key(x) for x in list(triggered) + list(hit) + list(miss) if str(x).strip()}
        if not keys:
            continue
        for key in keys:
            bucket = buckets.setdefault(key, {"sample_size": 0, "success_count": 0, "fail_count": 0, "pnl_values": [], "dd_values": []})
            bucket["sample_size"] += 1
            if item.get("outcome") == "爆发":
                bucket["success_count"] += 1
            elif item.get("outcome") in ("失败", "横盘"):
                bucket["fail_count"] += 1
            bucket["pnl_values"].append(float(item.get("pnl_48h") or 0))
            bucket["dd_values"].append(float(item.get("max_drawdown_pct") or 0))

    generated = []
    for key, bucket in buckets.items():
        sample = bucket["sample_size"]
        if sample < min_samples:
            continue
        success_count = bucket["success_count"]
        fail_count = bucket["fail_count"]
        pnl_values = bucket["pnl_values"]
        dd_values = bucket["dd_values"]
        resolved = success_count + fail_count
        confidence = round(success_count / resolved * 100, 1) if resolved else 0
        avg_pnl = round(sum(pnl_values) / len(pnl_values), 2) if pnl_values else 0
        max_gain = round(max(pnl_values), 2) if pnl_values else 0
        max_drawdown = round(min(dd_values), 2) if dd_values else 0
        if confidence >= min_bonus_confidence and avg_pnl > 0:
            rule_type, label = "bonus", "加分"
        elif confidence <= max_penalty_confidence:
            rule_type, label = "penalty", "惩罚"
        else:
            # "observe": neither clearly helpful nor clearly harmful.
            continue
        status = candidate_status_for_metrics(rule_type, sample, confidence, avg_pnl, "candidate")
        # NOTE(review): the description embeds the live counts, and
        # upsert_strategy_rule_candidate matches on rule_description. A re-run
        # with changed counts therefore inserts a new row instead of updating
        # the previous one. Confirm that this accumulation is intended.
        desc = (f"历史样本候选{label}因子:{key},样本{sample},成功{success_count},"
                f"失败/横盘{fail_count},置信{confidence}%,均值{avg_pnl}%")
        candidate = {
            "source": "dirty_history_reference",
            "rule_type": rule_type,
            "signal_name": key,
            "rule_description": desc,
            "support_count": sample,
            "success_count": success_count,
            "fail_count": fail_count,
            "avg_pnl": avg_pnl,
            "max_gain": max_gain,
            "max_drawdown": max_drawdown,
            "confidence_score": confidence,
            "sample_size": sample,
            "status": status,
            "source_ref": f"dirty_history:{key}",
        }
        generated.append(candidate)
        if not dry_run:
            upsert_strategy_rule_candidate(
                source=candidate["source"], rule_type=rule_type, signal_name=key,
                rule_description=desc, support_count=sample, success_count=success_count,
                fail_count=fail_count, avg_pnl=avg_pnl, max_gain=max_gain,
                max_drawdown=max_drawdown, confidence_score=confidence, sample_size=sample,
                status=status, notes="历史review_log自动生成候选规则仍需灰度验证后才可发布",
                source_ref=candidate["source_ref"],
            )
    generated.sort(key=lambda x: (-x["sample_size"], x["rule_type"], -x["confidence_score"]))
    return {"dry_run": dry_run, "review_rows": len(rows), "generated_count": len(generated), "generated": generated[:80]}
def dry_run_strategy_candidate_performance(min_gray_samples=10, min_gray_confidence=65):
    """Evaluate strategy candidates without writing DB state.

    Every stored candidate is re-scored with _evaluate_candidate against
    review_log rows and all strategy_failure_pattern rows. Review rows are
    restricted to recommendations with rec_time at or after
    get_factor_recency_fixed_at() when that is set; otherwise all history is used.

    Returns:
        Summary dict with sample/candidate counts, the gate policy text and
        up to 80 evaluated candidates, gray-ready ones first.
    """
    clean_started_at = get_factor_recency_fixed_at()
    conn = get_conn()
    try:
        candidates = [dict(r) for r in conn.execute("SELECT * FROM strategy_rule_candidate").fetchall()]
        if clean_started_at:
            review_rows = conn.execute("""
                SELECT rl.*, r.strategy_version, r.max_drawdown_pct, r.rec_time
                FROM review_log rl
                LEFT JOIN recommendation r ON r.id = rl.rec_id
                WHERE r.rec_time >= %s
                ORDER BY rl.review_time DESC
            """, (clean_started_at,)).fetchall()
        else:
            review_rows = conn.execute("""
                SELECT rl.*, r.strategy_version, r.max_drawdown_pct, r.rec_time
                FROM review_log rl
                LEFT JOIN recommendation r ON r.id = rl.rec_id
                ORDER BY rl.review_time DESC
            """).fetchall()
        failure_rows = [dict(r) for r in conn.execute("SELECT * FROM strategy_failure_pattern ORDER BY created_at DESC").fetchall()]
    finally:
        conn.close()
    try:
        current_version = str(get_meta().get("strategy_version") or "").strip()
    except Exception:
        # Version is informational only; a missing meta must not fail the dry-run.
        current_version = ""
    review_items = _build_review_items(review_rows)
    evaluated = [
        _evaluate_candidate(candidate, review_items, failure_rows, min_gray_samples, min_gray_confidence)
        for candidate in candidates
    ]
    gray_ready = [x for x in evaluated if x.get("dry_run_status") == "gray"]
    active_ready = [x for x in evaluated if x.get("dry_run_status") == "active"]
    rejected = [x for x in evaluated if x.get("dry_run_status") == "rejected"]
    return {
        "dry_run": True,
        "current_version": current_version,
        "review_sample_count": len(review_items),
        "clean_started_at": clean_started_at,
        "sample_window": "clean_after_factor_recency_fix" if clean_started_at else "all_history",
        "dirty_history_candidate_count": sum(1 for c in candidates if is_dirty_history_candidate(c)),
        "candidate_count": len(candidates),
        "gray_ready_count": len(gray_ready),
        "active_count": len(active_ready),
        "rejected_count": len(rejected),
        "would_bump_version": False,
        "release_reason": "dry-run只评估候选规则表现不执行 learned_rules 写入或版本升级",
        "gate_policy": {
            "gray": f"sample_size≥{min_gray_samples} 且 confidence≥{min_gray_confidence} 且 avg_pnl>0penalty规则可不要求avg_pnl>0",
            "reject": "sample_size≥8 且 confidence<35 或 avg_pnl≤-3",
            "release": "dry-run不发布正式发布仍由复盘发布闸门统一控制",
        },
        "evaluated_candidates": sorted(evaluated, key=lambda x: (x.get("dry_run_status") != "gray", -float(x.get("sample_size") or 0), -float(x.get("confidence_score") or 0)))[:80],
    }
def refresh_strategy_candidate_performance(min_gray_samples=10, min_gray_confidence=65):
    """Refresh stored candidate metrics and status from clean review/failure samples.

    Every non-active candidate is re-evaluated with _evaluate_candidate
    against review_log rows and all strategy_failure_pattern rows. Review
    rows are restricted to recommendations at or after
    get_factor_recency_fixed_at() when that is set.

    * Dirty-history candidates are reported but not written.
    * For all other candidates, metrics, status and created_at are
      overwritten.
    * An audit line is appended to notes unless the latest note line already
      records the same evaluation.

    Returns:
        List of per-candidate summary dicts; active candidates are omitted.
    """
    clean_started_at = get_factor_recency_fixed_at()
    conn = get_conn()
    try:
        candidates = conn.execute("SELECT * FROM strategy_rule_candidate").fetchall()
        if clean_started_at:
            review_rows = conn.execute("""
                SELECT rl.*, r.strategy_version, r.max_drawdown_pct, r.rec_time
                FROM review_log rl
                LEFT JOIN recommendation r ON r.id = rl.rec_id
                WHERE r.rec_time >= %s
                ORDER BY rl.review_time DESC
            """, (clean_started_at,)).fetchall()
        else:
            review_rows = conn.execute("""
                SELECT rl.*, r.strategy_version, r.max_drawdown_pct, r.rec_time
                FROM review_log rl
                LEFT JOIN recommendation r ON r.id = rl.rec_id
                ORDER BY rl.review_time DESC
            """).fetchall()
        failure_rows = [dict(row) for row in conn.execute("SELECT * FROM strategy_failure_pattern ORDER BY created_at DESC").fetchall()]
        review_items = _build_review_items(review_rows)
        updated = []
        for row in candidates:
            candidate = dict(row)
            candidate_id = candidate["id"]
            if (candidate.get("status") or "candidate") == "active":
                continue
            evaluated = _evaluate_candidate(candidate, review_items, failure_rows, min_gray_samples, min_gray_confidence)
            if evaluated.get("dry_run_status") == "dirty_history":
                updated.append({
                    "id": candidate_id,
                    "signal_name": candidate.get("signal_name") or "",
                    "source": candidate.get("source") or "",
                    "rule_type": candidate.get("rule_type") or "",
                    "sample_size": 0,
                    "success_count": 0,
                    "fail_count": 0,
                    "confidence_score": candidate.get("confidence_score") or 0,
                    "avg_pnl": candidate.get("avg_pnl") or 0,
                    "status": "dirty_history",
                    "description": candidate.get("rule_description") or "",
                    "gate_reason": "污染历史参考,不参与干净样本刷新",
                })
                continue
            audit_body = (
                f"自动评估: 样本{evaluated['sample_size']}, "
                f"成功{evaluated['success_count']}, 失败{evaluated['fail_count']}, "
                f"置信{evaluated['confidence_score']}%, avg_pnl={evaluated['avg_pnl']}%, "
                f"status={evaluated['dry_run_status']}"
            )
            note = (candidate.get("notes") or "").strip()
            # Audit lines start with a fresh timestamp, so compare only the
            # metrics part against the latest line. This keeps identical
            # refreshes from piling up duplicate audit entries.
            if not note.rsplit("\n", 1)[-1].endswith(audit_body):
                note = (note + "\n" if note else "") + f"[{datetime.now().isoformat()}] {audit_body}"
            conn.execute("""
                UPDATE strategy_rule_candidate
                SET support_count=%s, success_count=%s, fail_count=%s, avg_pnl=%s, max_gain=%s,
                    max_drawdown=%s, confidence_score=%s, sample_size=%s, status=%s, notes=%s, created_at=%s
                WHERE id=%s
            """, (
                evaluated["sample_size"], evaluated["success_count"], evaluated["fail_count"],
                evaluated["avg_pnl"], evaluated["max_gain"], evaluated["max_drawdown"],
                evaluated["confidence_score"], evaluated["sample_size"], evaluated["dry_run_status"],
                note, datetime.now().isoformat(), candidate_id,
            ))
            updated.append({
                "id": candidate_id,
                "signal_name": candidate.get("signal_name") or "",
                "source": candidate.get("source") or "",
                "rule_type": candidate.get("rule_type") or "",
                "sample_size": evaluated["sample_size"],
                "success_count": evaluated["success_count"],
                "fail_count": evaluated["fail_count"],
                "confidence_score": evaluated["confidence_score"],
                "avg_pnl": evaluated["avg_pnl"],
                "status": evaluated["dry_run_status"],
                "description": candidate.get("rule_description") or "",
            })
        conn.commit()
    finally:
        conn.close()
    return updated
def _build_review_items(review_rows):
    """Convert review rows to dicts annotated for candidate signal matching.

    Each returned dict is the row plus:
      * ``signal_keys``: the set of candidate_signal_key() values for all
        triggered, hit and missed signals;
      * ``all_signal_text``: those raw signals stringified and space-joined.
    """
    def collect_signals(record):
        # Concatenate the three JSON signal columns in a fixed order.
        collected = []
        for column in ("triggered_signals", "hit_signals", "miss_signals"):
            collected.extend(loads_json_field(record.get(column), []) or [])
        return collected

    annotated = []
    for row in review_rows:
        record = dict(row)
        signals = collect_signals(record)
        record["signal_keys"] = set(map(candidate_signal_key, signals))
        record["all_signal_text"] = " ".join(map(str, signals))
        annotated.append(record)
    return annotated
def _evaluate_candidate(candidate, review_items, failure_rows, min_gray_samples, min_gray_confidence):
    """Recompute a candidate's metrics and dry-run lifecycle status.

    * Dirty-history candidates short-circuit with zeroed counts and status
      "dirty_history".
    * Active candidates keep status "active".
    * Penalty / failure-derived candidates are measured against failure-pattern
      rows matched by failure_type or by a substring of failure_reason.
    * All others are measured against annotated review items matched by
      signal key or by a substring of the raw signal text.

    When nothing matches, the stored confidence/avg_pnl/max_gain/max_drawdown
    values are kept. An empty match key matches nothing.

    Returns:
        A copy of ``candidate`` updated with sample/success/fail counts,
        metrics, ``dry_run_status``, ``release_gate_passed`` and ``gate_reason``.
    """
    status = candidate.get("status") or "candidate"
    source = candidate.get("source") or ""
    rule_type = candidate.get("rule_type") or ""
    signal_name = candidate.get("signal_name") or ""
    source_ref = candidate.get("source_ref") or ""
    if is_dirty_history_candidate(candidate):
        return {
            **candidate,
            "sample_size": 0,
            "support_count": 0,
            "success_count": 0,
            "fail_count": 0,
            "dry_run_status": "dirty_history",
            "release_gate_passed": False,
            "gate_reason": "因子时效修复前的污染历史参考:不参与干净样本统计,不允许发布",
        }
    if status == "active":
        return {**candidate, "dry_run_status": "active", "release_gate_passed": True, "gate_reason": "已正式生效不参与dry-run降级"}
    if source.startswith("dual_attribution_failure") or source_ref.startswith("failure:") or rule_type == "penalty":
        failure_type = signal_name or source_ref.replace("failure:", "")
        # An empty key would match every row ("" in s is always True).
        matched = [
            row for row in failure_rows
            if (row.get("failure_type") or "") == failure_type
            or failure_type in (row.get("failure_reason") or "")
        ] if failure_type else []
        sample_size = len(matched)
        success_count = 0
        fail_count = sample_size
        pnl_values = [float(row.get("pnl_pct") or 0) for row in matched]
        dd_values = [float(row.get("max_drawdown_pct") or 0) for row in matched]
        confidence = round(min(95, 45 + fail_count * 8), 1) if sample_size else float(candidate.get("confidence_score") or 0)
    else:
        key = signal_name or source_ref.replace("review:", "")
        # When signal_name is set it equals key, so matching on key covers it.
        # An empty key must not match everything via "" in text.
        matched = [
            item for item in review_items
            if key and (key in item["signal_keys"] or key in item["all_signal_text"])
        ]
        sample_size = len(matched)
        success_count = sum(1 for row in matched if row.get("outcome") == "爆发")
        fail_count = sum(1 for row in matched if row.get("outcome") in ("失败", "横盘"))
        pnl_values = [float(row.get("pnl_48h") or 0) for row in matched]
        dd_values = [float(row.get("max_drawdown_pct") or 0) for row in matched]
        resolved = success_count + fail_count
        confidence = round(success_count / resolved * 100, 1) if resolved else float(candidate.get("confidence_score") or 0)
    avg_pnl = round(sum(pnl_values) / len(pnl_values), 2) if pnl_values else float(candidate.get("avg_pnl") or 0)
    max_gain = round(max(pnl_values), 2) if pnl_values else float(candidate.get("max_gain") or 0)
    max_drawdown = round(min(dd_values), 2) if dd_values else float(candidate.get("max_drawdown") or 0)
    dry_status = candidate_status_for_metrics(rule_type, sample_size, confidence, avg_pnl, status, min_gray_samples, min_gray_confidence)
    gate_passed = dry_status in ("gray", "active")
    if dry_status == "gray":
        gate_reason = f"样本{sample_size}{min_gray_samples},置信{confidence}%≥{min_gray_confidence}avg_pnl={avg_pnl}%:可进入灰度,仍不升版"
    elif dry_status == "rejected":
        gate_reason = f"样本{sample_size}已足够但置信/收益不达标:淘汰,不允许发布"
    else:
        gate_reason = f"样本{sample_size}或置信{confidence}%不足:只研究不发布"
    return {
        **candidate,
        "sample_size": sample_size,
        "support_count": sample_size,
        "success_count": success_count,
        "fail_count": fail_count,
        "avg_pnl": avg_pnl,
        "max_gain": max_gain,
        "max_drawdown": max_drawdown,
        "confidence_score": confidence,
        "dry_run_status": dry_status,
        "release_gate_passed": gate_passed,
        "gate_reason": gate_reason,
    }
def get_strategy_iteration_dashboard(days=30):
    """Build the strategy-iteration dashboard payload.

    Combines the following:
      * the iteration summary for the last ``days`` days;
      * up to 80 candidates, 80 failure patterns and 40 iteration logs;
      * per-field counts over those lists;
      * a fresh candidate dry-run evaluation.
    """
    # Imported inside the function, as in the original layout; presumably
    # to avoid an import cycle (unverified).
    from app.db.review_queries import get_strategy_iteration_logs, get_strategy_iteration_summary

    def count_by(records, field, default):
        # Occurrences per field value, falling back to `default` for falsy values.
        tally = {}
        for record in records:
            label = record.get(field) or default
            tally[label] = tally.get(label, 0) + 1
        return tally

    summary = get_strategy_iteration_summary(days=days)
    candidates = get_strategy_rule_candidates(limit=80)
    failures = get_strategy_failure_patterns(limit=80)
    logs = get_strategy_iteration_logs(limit=40)

    status_counts = count_by(candidates, "status", "candidate")
    source_counts = count_by(candidates, "source", "unknown")
    failure_counts = count_by(failures, "failure_type", "未分类")
    release_counts = count_by(logs, "release_decision", "unknown")
    ranked_failures = sorted(failure_counts.items(), key=lambda pair: (-pair[1], pair[0]))

    dry_run = dry_run_strategy_candidate_performance()
    latest_log = logs[0] if logs else {}
    summary_defaults = (
        ("review_sample_count", 0),
        ("clean_started_at", ""),
        ("sample_window", "all_history"),
        ("dirty_history_candidate_count", 0),
        ("candidate_count", 0),
        ("gray_ready_count", 0),
        ("rejected_count", 0),
        ("would_bump_version", False),
        ("release_reason", ""),
    )
    dry_run_summary = {name: dry_run.get(name, fallback) for name, fallback in summary_defaults}

    overview = {
        "total_logs": len(logs),
        "candidate_count": len(candidates),
        "candidate_status_counts": status_counts,
        "candidate_source_counts": source_counts,
        "failure_type_counts": [{"type": label, "count": total} for label, total in ranked_failures],
        "release_decision_counts": release_counts,
        "latest_release_decision": latest_log.get("release_decision") or "hold",
        "latest_release_reason": latest_log.get("release_reason") or latest_log.get("version_change_summary") or "暂无发布决策说明",
        "dry_run_summary": dry_run_summary,
    }
    return {
        "summary": summary,
        "overview": overview,
        "dry_run": dry_run,
        "candidates": candidates,
        "failures": failures,
        "logs": logs,
    }