637 lines
31 KiB
Python
637 lines
31 KiB
Python
"""Strategy rule candidates and failure-pattern queries."""
|
||
|
||
import json
|
||
from datetime import datetime
|
||
|
||
from app.config.config_loader import get_meta
|
||
from app.db.schema import get_conn
|
||
|
||
|
||
def loads_json_field(value, fallback):
    """Decode a JSON column value, returning ``fallback`` when nothing usable is stored.

    Args:
        value: Raw column value. Strings are parsed with ``json.loads``; any
            other non-``None`` object is passed through unchanged.
        fallback: Returned when ``value`` is ``None``, cannot be parsed, or
            decodes to JSON ``null``.

    Returns:
        The decoded (or passed-through) value, or ``fallback``.
    """
    if value is None:
        return fallback
    if not isinstance(value, str):
        return value
    try:
        parsed = json.loads(value)
    except Exception:
        # Deliberately best-effort: a corrupt column must not break the caller.
        return fallback
    # A stored JSON ``null`` carries no data. Treat it like a missing value so
    # callers that immediately call ``.get`` / iterate on the result stay safe.
    return fallback if parsed is None else parsed
|
||
|
||
|
||
def upsert_strategy_rule_candidate(source, rule_type, signal_name, rule_description,
                                   support_count=0, success_count=0, fail_count=0,
                                   avg_pnl=0, max_gain=0, max_drawdown=0,
                                   confidence_score=0, sample_size=0, status="candidate",
                                   release_version="", notes="", source_ref=""):
    """Insert or update a research-only strategy rule candidate.

    A candidate is identified by the exact tuple
    ``(source, rule_type, signal_name, rule_description)``; the newest row
    (highest id) with that identity is updated, otherwise a new row is
    inserted. On update an empty ``source_ref`` keeps the stored value, and
    ``created_at`` is overwritten with the current time.

    Returns:
        The id of the updated or newly inserted candidate row.
    """
    identity = (source or "", rule_type or "", signal_name or "", rule_description or "")
    metrics = (support_count, success_count, fail_count, avg_pnl, max_gain, max_drawdown,
               confidence_score, sample_size, status)
    trailer = (release_version or "", notes or "", source_ref or "")
    now = datetime.now().isoformat()

    conn = get_conn()
    try:
        existing = conn.execute("""
            SELECT id FROM strategy_rule_candidate
            WHERE source=%s AND rule_type=%s AND signal_name=%s AND rule_description=%s
            ORDER BY id DESC LIMIT 1
        """, identity).fetchone()
        if existing:
            candidate_id = existing["id"]
            conn.execute("""
                UPDATE strategy_rule_candidate
                SET support_count=%s, success_count=%s, fail_count=%s, avg_pnl=%s, max_gain=%s,
                    max_drawdown=%s, confidence_score=%s, sample_size=%s, status=%s,
                    release_version=%s, notes=%s, source_ref=COALESCE(NULLIF(%s, ''), source_ref), created_at=%s
                WHERE id=%s
            """, (*metrics, *trailer, now, candidate_id))
        else:
            cur = conn.execute("""
                INSERT INTO strategy_rule_candidate (
                    created_at, source, rule_type, signal_name, rule_description,
                    support_count, success_count, fail_count, avg_pnl, max_gain, max_drawdown,
                    confidence_score, sample_size, status, release_version, notes, source_ref
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (now, *identity, *metrics, *trailer))
            candidate_id = cur.fetchone()["id"]
        conn.commit()
    finally:
        # Always release the connection, even when a statement fails.
        conn.close()
    return candidate_id
|
||
|
||
|
||
def record_strategy_failure_pattern(symbol, version="", failure_type="", failure_reason="",
                                    signal_combo=None, market_context=None,
                                    entry_quality_issue="", pnl_pct=0, max_drawdown_pct=0, lesson=""):
    """Record a failure-pattern sample for later attribution.

    ``signal_combo`` and ``market_context`` are stored as JSON text (values
    that are not JSON-serializable are stringified); falsy text arguments are
    stored as empty strings and falsy numeric arguments as 0.
    """
    params = (
        datetime.now().isoformat(), symbol or "", version or "", failure_type or "",
        failure_reason or "", json.dumps(signal_combo or [], ensure_ascii=False, default=str),
        json.dumps(market_context or {}, ensure_ascii=False, default=str),
        entry_quality_issue or "", pnl_pct or 0, max_drawdown_pct or 0, lesson or "",
    )
    conn = get_conn()
    try:
        conn.execute("""
            INSERT INTO strategy_failure_pattern (
                created_at, symbol, version, failure_type, failure_reason, signal_combo,
                market_context_json, entry_quality_issue, pnl_pct, max_drawdown_pct, lesson
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, params)
        conn.commit()
    finally:
        # Always release the connection, even when the insert fails.
        conn.close()
|
||
|
||
|
||
def get_strategy_rule_candidates(limit=50, status=None):
    """Read strategy rule candidates.

    Args:
        limit: Maximum number of rows to return.
        status: Optional status filter; when falsy, all statuses are returned.

    Returns:
        A list of row dicts ordered by confidence, then sample size, then
        ``created_at`` (all descending).
    """
    # Only the fixed WHERE fragment is interpolated; values stay parameterized.
    where = "WHERE status=%s" if status else ""
    params = (status, limit) if status else (limit,)
    conn = get_conn()
    try:
        rows = conn.execute(f"""
            SELECT * FROM strategy_rule_candidate
            {where}
            ORDER BY confidence_score DESC, sample_size DESC, created_at DESC
            LIMIT %s
        """, params).fetchall()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()
    return [dict(r) for r in rows]
|
||
|
||
|
||
def update_strategy_rule_candidate_status(candidate_id, status, release_version="", notes_append=""):
    """Update candidate lifecycle status.

    Args:
        candidate_id: Id of the candidate row.
        status: New status; a falsy value is stored as ``"candidate"``.
        release_version: New release version; empty keeps the stored one.
        notes_append: Optional text appended to the notes as a new
            timestamp-prefixed line.

    Returns:
        ``False`` when no row has ``candidate_id``, ``True`` after the update.
    """
    conn = get_conn()
    try:
        row = conn.execute("SELECT notes FROM strategy_rule_candidate WHERE id=%s", (candidate_id,)).fetchone()
        if not row:
            return False
        now = datetime.now().isoformat()
        notes = (row["notes"] or "").strip()
        if notes_append:
            notes = (notes + "\n" if notes else "") + f"[{now}] {notes_append}"
        conn.execute("""
            UPDATE strategy_rule_candidate
            SET status=%s, release_version=COALESCE(NULLIF(%s, ''), release_version), notes=%s, created_at=%s
            WHERE id=%s
        """, (status or "candidate", release_version or "", notes, now, candidate_id))
        conn.commit()
        return True
    finally:
        # Covers the not-found path and any failing statement alike.
        conn.close()
|
||
|
||
|
||
def get_strategy_failure_patterns(limit=50):
    """Read failure-pattern rows, newest first.

    Each returned dict has ``signal_combo`` decoded from JSON (default ``[]``)
    and an extra ``market_context`` key decoded from ``market_context_json``
    (default ``{}``).
    """
    conn = get_conn()
    try:
        rows = conn.execute("""
            SELECT * FROM strategy_failure_pattern
            ORDER BY created_at DESC
            LIMIT %s
        """, (limit,)).fetchall()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()

    items = []
    for row in rows:
        item = dict(row)
        item["signal_combo"] = loads_json_field(item.get("signal_combo"), [])
        item["market_context"] = loads_json_field(item.get("market_context_json"), {})
        items.append(item)
    return items
|
||
|
||
|
||
def candidate_signal_key(signal_text):
    """Map free-form signal text to a short normalized key.

    The first marker (in the order listed below) that occurs in the text
    decides the key. When no marker occurs, the first 12 characters of the
    text are returned instead.
    """
    # Order matters: the first marker found wins (e.g. "N倍放量" is tested
    # before "放量", and "静K→动K" before "动K").
    ordered_markers = (
        ("量价齐飞", "vp_fly"),
        ("N倍放量", "vol_Nx"),
        ("放量", "1h_vol"),
        ("供需区突破", "zone_break"),
        ("供给区突破", "zone_break"),
        ("站稳突破", "zone_break"),
        ("起爆点", "ignition"),
        ("静K→动K", "ignition"),
        ("静K蓄力", "sk_accum"),
        ("连续3K", "cont3k"),
        ("连续K", "cont_k"),
        ("Q≥7", "q7_break"),
        ("动K", "dyn_k"),
        ("过期", "stale_signal"),
        ("历史", "stale_signal"),
        ("追高", "chase_high"),
        ("假突破", "false_breakout"),
        ("量价背离", "vp_divergence"),
    )
    raw = str(signal_text) if signal_text else ""
    matched = next((code for marker, code in ordered_markers if marker in raw), None)
    if matched is None:
        return raw[:12]
    return matched
|
||
|
||
|
||
def get_factor_recency_fixed_at():
    """Factor-recency fix time. Older recommendations are dirty-history references.

    Reads ``factor_recency_fixed_at`` from the meta config, falling back to
    ``clean_review_started_at``.

    Returns:
        The configured value as a stripped string, or ``""`` when neither key
        is set or the meta config cannot be loaded.
    """
    try:
        meta = get_meta() or {}
    except Exception:
        # Best-effort: an unreadable config means "no clean window configured".
        meta = {}
    fixed_at = meta.get("factor_recency_fixed_at") or meta.get("clean_review_started_at") or ""
    # str() guards against a non-string config value, which has no .strip().
    return str(fixed_at).strip()
|
||
|
||
|
||
def is_dirty_history_candidate(candidate):
    """Return True when the candidate is marked as a dirty-history reference.

    A candidate qualifies when its ``source`` is one of the two history
    sources, its ``source_ref`` contains ``dirty_history``, or its ``notes``
    contain the dirty-history marker text.
    """
    def _text(field):
        # Missing / None / falsy fields compare as the empty string.
        return str(candidate.get(field) or "")

    if _text("source") in ("history_review_auto", "dirty_history_reference"):
        return True
    if "dirty_history" in _text("source_ref"):
        return True
    return "污染历史" in _text("notes")
|
||
|
||
|
||
def candidate_status_for_metrics(rule_type, sample_size, confidence, avg_pnl, current_status="candidate",
                                 min_gray_samples=10, min_gray_confidence=65,
                                 min_reject_samples=8, max_reject_confidence=35, max_reject_avg_pnl=-3):
    """Derive candidate lifecycle status from clean sample metrics.

    Rules, evaluated in order:
      1. ``active`` is sticky and never changed here.
      2. ``gray`` when there are enough samples and confidence, and either
         ``avg_pnl`` is positive or the rule is a ``penalty`` rule.
      3. ``rejected`` when there are at least ``min_reject_samples`` samples
         and either a non-penalty rule has confidence below
         ``max_reject_confidence`` or ``avg_pnl`` is at or below
         ``max_reject_avg_pnl``.
      4. An existing ``gray`` / ``rejected`` status is kept.
      5. Otherwise ``candidate``.

    Args:
        rule_type: Rule kind; ``"penalty"`` relaxes the pnl/confidence checks.
        sample_size: Number of matched samples.
        confidence: Confidence score in percent.
        avg_pnl: Average pnl in percent.
        current_status: Status currently stored for the candidate.
        min_gray_samples: Minimum samples for promotion to ``gray``.
        min_gray_confidence: Minimum confidence for promotion to ``gray``.
        min_reject_samples: Minimum samples before rejection is considered.
        max_reject_confidence: Non-penalty rules below this are rejected.
        max_reject_avg_pnl: Rules with ``avg_pnl`` at or below this are rejected.

    Returns:
        One of ``"active"``, ``"gray"``, ``"rejected"``, ``"candidate"``.
    """
    if current_status == "active":
        return "active"

    is_penalty = rule_type == "penalty"
    if sample_size >= min_gray_samples and confidence >= min_gray_confidence and (avg_pnl > 0 or is_penalty):
        return "gray"

    low_confidence = not is_penalty and confidence < max_reject_confidence
    if sample_size >= min_reject_samples and (low_confidence or avg_pnl <= max_reject_avg_pnl):
        return "rejected"

    if current_status in ("gray", "rejected"):
        return current_status
    return "candidate"
|
||
|
||
|
||
def classify_failure_type_from_text(review):
    """Local classifier for historical failure-pattern backfill.

    Builds one text from the review's ``triggered_signals``, ``miss_signals``
    and ``lesson`` and returns the first matching failure category.

    The keyword rules are checked in order. The generic "no forward-looking
    signal mentioned" fallback is evaluated only after the sideways and
    drawdown checks, so that those more specific categories stay reachable.

    Args:
        review: Mapping with optional keys ``triggered_signals``,
            ``miss_signals``, ``lesson``, ``pnl_48h`` and ``outcome``.

    Returns:
        A ``(failure_type, failure_reason)`` tuple of strings.
    """
    def _as_list(value):
        # Tolerate a bare string (or tuple) where a list of signals is expected.
        if not value:
            return []
        return [value] if isinstance(value, str) else list(value)

    signals = _as_list(review.get("triggered_signals"))
    miss = _as_list(review.get("miss_signals"))
    lesson = review.get("lesson") or ""
    text = " ".join(str(x) for x in signals + miss) + " " + str(lesson)
    pnl = float(review.get("pnl_48h") or 0)
    outcome = review.get("outcome") or ""

    # (keywords, failure_type, failure_reason); first rule with any keyword hit wins.
    keyword_rules = (
        (("过期", "历史", "旧放量", "age_bars", "已过期", "小时前", "旧起爆"),
         "过期因子误判", "历史放量/起爆/突破不能当作当前触发信号,必须做时效闸门"),
        (("假突破", "突破失败", "未站稳", "冲高回落"),
         "假突破", "突破后没有站稳或快速回落,需要增加站稳/承接确认"),
        (("量价背离", "缩量上涨", "放量下跌", "无量拉升"),
         "量价背离", "价格动作与成交量不匹配,量能确认不足"),
        (("高位", "追高", "涨幅过大", "乖离"),
         "追高风险", "入场位置偏高,盈亏比和回撤风险恶化"),
        (("承接不足", "无承接", "上影线", "砸盘"),
         "高位无承接", "高位出现抛压但缺少买盘承接"),
        (("板块退潮", "热点退潮", "龙头走弱", "板块分歧"),
         "板块退潮", "板块热度回落,个币信号容易失效"),
        (("BTC", "大盘", "反向共振", "系统性"),
         "BTC/大盘反向共振", "大盘方向与个币信号冲突,需要宏观/主流币过滤"),
        (("止损", "盈亏比", "RR", "止盈"),
         "止损/盈亏比不合理", "止损或止盈结构不合理,导致信号收益风险不匹配"),
        (("滞后", "MACD", "RSI"),
         "滞后信号追高", "滞后指标占比高,容易形成事后确认/追高失败"),
    )
    for keywords, failure_type, reason in keyword_rules:
        if any(k in text for k in keywords):
            return failure_type, reason

    lacks_forward_looking = ("前瞻信号不足", "缺少量价/PA等前瞻性确认")
    if "缺乏前瞻" in text:
        return lacks_forward_looking
    if "横盘" in text or outcome == "横盘":
        return "信号强度不足", "触发后未形成有效爆发,确认条件偏弱"
    if "回撤" in text or pnl < -3:
        return "入场点太晚", "入场后回撤/亏损明显,买点可能滞后或确认过慢"
    # Generic fallback: nothing in the review mentions a forward-looking signal.
    if "前瞻" not in text:
        return lacks_forward_looking
    return "未分类失败", "需要继续积累样本做二级归因"
|
||
|
||
|
||
def backfill_strategy_failure_patterns(limit=2000, dry_run=False):
    """Backfill failure-pattern rows from historical review_log, deduped by rec_id.

    Reads the newest ``limit`` review rows whose outcome is 失败 or 横盘,
    skips those whose ``rec_id`` is already recorded inside an existing
    pattern's ``market_context_json``, classifies the rest and (unless
    ``dry_run``) inserts one pattern row each.

    Args:
        limit: Maximum number of review rows to scan.
        dry_run: When true, nothing is written; the returned counts describe
            what a real run would do.

    Returns:
        A dict with ``dry_run``, ``scanned``, ``inserted``,
        ``skipped_existing``, ``type_counts`` and up to 10 ``examples``.
    """
    inserted = 0
    skipped = 0
    type_counts = {}
    examples = []

    conn = get_conn()
    try:
        rows = conn.execute("""
            SELECT rl.*, r.strategy_version, r.max_drawdown_pct
            FROM review_log rl
            LEFT JOIN recommendation r ON r.id = rl.rec_id
            WHERE rl.outcome IN ('失败','横盘')
            ORDER BY rl.review_time DESC
            LIMIT %s
        """, (limit,)).fetchall()

        existing = set()
        for row in conn.execute("SELECT market_context_json FROM strategy_failure_pattern").fetchall():
            ctx = loads_json_field(row["market_context_json"], {})
            # Contexts written by other code paths may not be JSON objects.
            if isinstance(ctx, dict) and ctx.get("rec_id") is not None:
                existing.add(str(ctx["rec_id"]))

        for row in rows:
            item = dict(row)
            rec_id = item.get("rec_id")
            if str(rec_id) in existing:
                skipped += 1
                continue
            triggered = loads_json_field(item.get("triggered_signals"), []) or []
            miss = loads_json_field(item.get("miss_signals"), []) or []
            item["triggered_signals"] = triggered
            item["miss_signals"] = miss
            failure_type, reason = classify_failure_type_from_text(item)
            type_counts[failure_type] = type_counts.get(failure_type, 0) + 1
            if len(examples) < 10:
                examples.append({"rec_id": rec_id, "symbol": item.get("symbol"), "failure_type": failure_type, "reason": reason})
            if not dry_run:
                context = {
                    "source": "history_backfill",
                    "rec_id": rec_id,
                    "outcome": item.get("outcome"),
                    "review_time": item.get("review_time"),
                }
                conn.execute("""
                    INSERT INTO strategy_failure_pattern (
                        created_at, symbol, version, failure_type, failure_reason, signal_combo,
                        market_context_json, entry_quality_issue, pnl_pct, max_drawdown_pct, lesson
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """, (
                    datetime.now().isoformat(), item.get("symbol") or "", item.get("strategy_version") or "",
                    failure_type, reason, json.dumps(triggered, ensure_ascii=False, default=str),
                    json.dumps(context, ensure_ascii=False, default=str),
                    reason, float(item.get("pnl_48h") or 0), float(item.get("max_drawdown_pct") or 0), item.get("lesson") or "",
                ))
            # Remember the rec_id in both modes so a dry run reports the same
            # dedup result a real run would produce.
            existing.add(str(rec_id))
            inserted += 1

        if not dry_run:
            conn.commit()
    finally:
        # Always release the connection; uncommitted inserts are discarded on failure.
        conn.close()

    return {"dry_run": dry_run, "scanned": len(rows), "inserted": inserted, "skipped_existing": skipped, "type_counts": type_counts, "examples": examples}
|
||
|
||
|
||
def generate_candidates_from_review_history(min_samples=20, min_bonus_confidence=55, max_penalty_confidence=40, dry_run=False):
    """Generate strategy rule candidates from historical review_log.

    Every review row is bucketed under each normalized signal key found in
    its triggered / hit / miss signals. Buckets with at least ``min_samples``
    rows become a ``bonus`` candidate (confidence >= ``min_bonus_confidence``
    and positive average pnl) or a ``penalty`` candidate (confidence <=
    ``max_penalty_confidence``); everything else is skipped.

    Args:
        min_samples: Minimum rows per signal key.
        min_bonus_confidence: Confidence threshold (percent) for bonus rules.
        max_penalty_confidence: Confidence ceiling (percent) for penalty rules.
        dry_run: When true, candidates are computed but not written.

    Returns:
        A dict with ``dry_run``, ``review_rows``, ``generated_count`` and the
        first 80 ``generated`` candidates (largest sample first).
    """
    conn = get_conn()
    try:
        rows = conn.execute("""
            SELECT rl.*, r.max_drawdown_pct
            FROM review_log rl
            LEFT JOIN recommendation r ON r.id = rl.rec_id
            ORDER BY rl.review_time DESC
        """).fetchall()
    finally:
        # This connection only reads. Release it before the per-candidate
        # upserts below, each of which opens and commits its own connection.
        conn.close()

    buckets = {}
    for row in rows:
        item = dict(row)
        triggered = loads_json_field(item.get("triggered_signals"), []) or []
        hit = loads_json_field(item.get("hit_signals"), []) or []
        miss = loads_json_field(item.get("miss_signals"), []) or []
        keys = {candidate_signal_key(x) for x in [*triggered, *hit, *miss] if str(x).strip()}
        outcome = item.get("outcome")
        pnl = float(item.get("pnl_48h") or 0)
        drawdown = float(item.get("max_drawdown_pct") or 0)
        for key in keys:
            bucket = buckets.setdefault(key, {"sample_size": 0, "success_count": 0, "fail_count": 0, "pnl_values": [], "dd_values": []})
            bucket["sample_size"] += 1
            if outcome == "爆发":
                bucket["success_count"] += 1
            elif outcome in ("失败", "横盘"):
                bucket["fail_count"] += 1
            bucket["pnl_values"].append(pnl)
            bucket["dd_values"].append(drawdown)

    generated = []
    for key, bucket in buckets.items():
        sample = bucket["sample_size"]
        if sample < min_samples:
            continue
        successes = bucket["success_count"]
        failures = bucket["fail_count"]
        pnl_values = bucket["pnl_values"]
        dd_values = bucket["dd_values"]
        resolved = successes + failures
        # NOTE(review): with no resolved outcomes confidence is 0, which
        # classifies the signal as a penalty rule — confirm that is intended.
        confidence = round(successes / resolved * 100, 1) if resolved else 0
        avg_pnl = round(sum(pnl_values) / len(pnl_values), 2) if pnl_values else 0
        max_gain = round(max(pnl_values), 2) if pnl_values else 0
        max_drawdown = round(min(dd_values), 2) if dd_values else 0

        if confidence >= min_bonus_confidence and avg_pnl > 0:
            rule_type = "bonus"
        elif confidence <= max_penalty_confidence:
            rule_type = "penalty"
        else:
            # "observe" buckets are neither promoted nor penalized.
            continue

        status = candidate_status_for_metrics(rule_type, sample, confidence, avg_pnl, "candidate")
        factor_label = "加分" if rule_type == "bonus" else "惩罚"
        desc = f"历史样本候选{factor_label}因子:{key},样本{sample},成功{successes},失败/横盘{failures},置信{confidence}%,均值{avg_pnl}%"
        candidate = {
            "source": "dirty_history_reference",
            "rule_type": rule_type,
            "signal_name": key,
            "rule_description": desc,
            "support_count": sample,
            "success_count": successes,
            "fail_count": failures,
            "avg_pnl": avg_pnl,
            "max_gain": max_gain,
            "max_drawdown": max_drawdown,
            "confidence_score": confidence,
            "sample_size": sample,
            "status": status,
            "source_ref": f"dirty_history:{key}",
        }
        generated.append(candidate)
        if not dry_run:
            upsert_strategy_rule_candidate(
                **candidate,
                notes="历史review_log自动生成:候选规则仍需灰度验证后才可发布",
            )

    generated.sort(key=lambda x: (-x["sample_size"], x["rule_type"], -x["confidence_score"]))
    return {"dry_run": dry_run, "review_rows": len(rows), "generated_count": len(generated), "generated": generated[:80]}
|
||
|
||
|
||
def dry_run_strategy_candidate_performance(min_gray_samples=10, min_gray_confidence=65):
    """Evaluate strategy candidates without writing DB state.

    Loads all candidates, the review rows (restricted to recommendations at
    or after the factor-recency fix time when one is configured) and all
    failure patterns, then evaluates every candidate in memory.

    Args:
        min_gray_samples: Minimum samples for a candidate to reach ``gray``.
        min_gray_confidence: Minimum confidence for a candidate to reach ``gray``.

    Returns:
        A report dict with counts per resulting status, the gate policy text
        and the first 80 evaluated candidates (gray-ready first, then by
        sample size and confidence, descending).
    """
    clean_started_at = get_factor_recency_fixed_at()
    review_sql = """
        SELECT rl.*, r.strategy_version, r.max_drawdown_pct, r.rec_time
        FROM review_log rl
        LEFT JOIN recommendation r ON r.id = rl.rec_id
    """
    review_params = ()
    if clean_started_at:
        review_sql += " WHERE r.rec_time >= %s"
        review_params = (clean_started_at,)
    review_sql += " ORDER BY rl.review_time DESC"

    conn = get_conn()
    try:
        candidates = [dict(r) for r in conn.execute("SELECT * FROM strategy_rule_candidate").fetchall()]
        if review_params:
            review_rows = conn.execute(review_sql, review_params).fetchall()
        else:
            review_rows = conn.execute(review_sql).fetchall()
        failure_rows = [dict(r) for r in conn.execute("SELECT * FROM strategy_failure_pattern ORDER BY created_at DESC").fetchall()]
    finally:
        # Always release the connection, even when a query fails.
        conn.close()

    try:
        current_version = str(get_meta().get("strategy_version") or "").strip()
    except Exception:
        # Best-effort: the version is informational only.
        current_version = ""

    review_items = _build_review_items(review_rows)
    evaluated = [
        _evaluate_candidate(candidate, review_items, failure_rows, min_gray_samples, min_gray_confidence)
        for candidate in candidates
    ]

    def _count(status):
        return sum(1 for x in evaluated if x.get("dry_run_status") == status)

    ranked = sorted(
        evaluated,
        key=lambda x: (x.get("dry_run_status") != "gray", -float(x.get("sample_size") or 0), -float(x.get("confidence_score") or 0)),
    )
    return {
        "dry_run": True,
        "current_version": current_version,
        "review_sample_count": len(review_items),
        "clean_started_at": clean_started_at,
        "sample_window": "clean_after_factor_recency_fix" if clean_started_at else "all_history",
        "dirty_history_candidate_count": sum(1 for c in candidates if is_dirty_history_candidate(c)),
        "candidate_count": len(candidates),
        "gray_ready_count": _count("gray"),
        "active_count": _count("active"),
        "rejected_count": _count("rejected"),
        "would_bump_version": False,
        "release_reason": "dry-run只评估候选规则表现,不执行 learned_rules 写入或版本升级",
        "gate_policy": {
            "gray": f"sample_size≥{min_gray_samples} 且 confidence≥{min_gray_confidence} 且 avg_pnl>0(penalty规则可不要求avg_pnl>0)",
            "reject": "sample_size≥8 且 confidence<35 或 avg_pnl≤-3",
            "release": "dry-run不发布;正式发布仍由复盘发布闸门统一控制",
        },
        "evaluated_candidates": ranked[:80],
    }
|
||
|
||
|
||
def refresh_strategy_candidate_performance(min_gray_samples=10, min_gray_confidence=65):
    """Refresh candidate metrics and status from clean review/failure samples.

    ``active`` candidates are left untouched and omitted from the result.
    Dirty-history candidates are reported but not written. Every other
    candidate gets its metrics, status, notes and ``created_at`` rewritten
    from the in-memory evaluation.

    An audit line is appended to the notes only when the evaluation summary
    differs from the one in the last note line, so repeated refreshes with
    unchanged results do not grow the notes.

    Args:
        min_gray_samples: Minimum samples for a candidate to reach ``gray``.
        min_gray_confidence: Minimum confidence for a candidate to reach ``gray``.

    Returns:
        A list with one summary dict per reported candidate.
    """
    clean_started_at = get_factor_recency_fixed_at()
    updated = []

    conn = get_conn()
    try:
        candidates = conn.execute("SELECT * FROM strategy_rule_candidate").fetchall()
        if clean_started_at:
            review_rows = conn.execute("""
                SELECT rl.*, r.strategy_version, r.max_drawdown_pct, r.rec_time
                FROM review_log rl
                LEFT JOIN recommendation r ON r.id = rl.rec_id
                WHERE r.rec_time >= %s
                ORDER BY rl.review_time DESC
            """, (clean_started_at,)).fetchall()
        else:
            review_rows = conn.execute("""
                SELECT rl.*, r.strategy_version, r.max_drawdown_pct, r.rec_time
                FROM review_log rl
                LEFT JOIN recommendation r ON r.id = rl.rec_id
                ORDER BY rl.review_time DESC
            """).fetchall()
        failure_rows = [dict(row) for row in conn.execute("SELECT * FROM strategy_failure_pattern ORDER BY created_at DESC").fetchall()]
        review_items = _build_review_items(review_rows)

        for row in candidates:
            candidate = dict(row)
            candidate_id = candidate["id"]
            if (candidate.get("status") or "candidate") == "active":
                continue

            evaluated = _evaluate_candidate(candidate, review_items, failure_rows, min_gray_samples, min_gray_confidence)
            if evaluated.get("dry_run_status") == "dirty_history":
                updated.append({
                    "id": candidate_id,
                    "signal_name": candidate.get("signal_name") or "",
                    "source": candidate.get("source") or "",
                    "rule_type": candidate.get("rule_type") or "",
                    "sample_size": 0,
                    "success_count": 0,
                    "fail_count": 0,
                    "confidence_score": candidate.get("confidence_score") or 0,
                    "avg_pnl": candidate.get("avg_pnl") or 0,
                    "status": "dirty_history",
                    "description": candidate.get("rule_description") or "",
                    "gate_reason": "污染历史参考,不参与干净样本刷新",
                })
                continue

            now = datetime.now().isoformat()
            note = (candidate.get("notes") or "").strip()
            summary = (
                f"自动评估: 样本{evaluated['sample_size']}, "
                f"成功{evaluated['success_count']}, 失败{evaluated['fail_count']}, "
                f"置信{evaluated['confidence_score']}%, avg_pnl={evaluated['avg_pnl']}%, "
                f"status={evaluated['dry_run_status']}"
            )
            # Compare without the timestamp prefix: a timestamped line is
            # never already present, which would defeat the duplicate check.
            last_line = note.rsplit("\n", 1)[-1] if note else ""
            if not last_line.endswith(summary):
                note = (note + "\n" if note else "") + f"[{now}] {summary}"

            conn.execute("""
                UPDATE strategy_rule_candidate
                SET support_count=%s, success_count=%s, fail_count=%s, avg_pnl=%s, max_gain=%s,
                    max_drawdown=%s, confidence_score=%s, sample_size=%s, status=%s, notes=%s, created_at=%s
                WHERE id=%s
            """, (
                evaluated["sample_size"], evaluated["success_count"], evaluated["fail_count"],
                evaluated["avg_pnl"], evaluated["max_gain"], evaluated["max_drawdown"],
                evaluated["confidence_score"], evaluated["sample_size"], evaluated["dry_run_status"],
                note, now, candidate_id,
            ))
            updated.append({
                "id": candidate_id,
                "signal_name": candidate.get("signal_name") or "",
                "source": candidate.get("source") or "",
                "rule_type": candidate.get("rule_type") or "",
                "sample_size": evaluated["sample_size"],
                "success_count": evaluated["success_count"],
                "fail_count": evaluated["fail_count"],
                "confidence_score": evaluated["confidence_score"],
                "avg_pnl": evaluated["avg_pnl"],
                "status": evaluated["dry_run_status"],
                "description": candidate.get("rule_description") or "",
            })
        conn.commit()
    finally:
        # Always release the connection; uncommitted updates are discarded on failure.
        conn.close()
    return updated
|
||
|
||
|
||
def _build_review_items(review_rows):
    """Convert review rows into dicts enriched with signal lookup fields.

    Each returned dict is a copy of the row plus:
      * ``signal_keys`` - set of normalized keys of all its signals
      * ``all_signal_text`` - all signals joined by single spaces
    Signals are taken from the triggered, hit and miss JSON columns, in
    that order.
    """
    def _enrich(row):
        record = dict(row)
        signals = []
        for column in ("triggered_signals", "hit_signals", "miss_signals"):
            signals.extend(loads_json_field(record.get(column), []) or [])
        record["signal_keys"] = set(map(candidate_signal_key, signals))
        record["all_signal_text"] = " ".join(map(str, signals))
        return record

    return [_enrich(row) for row in review_rows]
|
||
|
||
|
||
def _evaluate_candidate(candidate, review_items, failure_rows, min_gray_samples, min_gray_confidence):
    """Evaluate one candidate against in-memory review and failure samples.

    Dirty-history and ``active`` candidates short-circuit with a fixed
    result. Failure-style candidates (failure attribution source, a
    ``failure:`` source_ref, or ``penalty`` rule type) are matched against
    ``failure_rows``; all others are matched against ``review_items``.

    A candidate whose lookup key is empty matches no samples; an empty key
    must not act as a wildcard.

    Args:
        candidate: Candidate row as a dict.
        review_items: Items produced by ``_build_review_items``.
        failure_rows: Failure-pattern rows as dicts.
        min_gray_samples: Minimum samples for promotion to ``gray``.
        min_gray_confidence: Minimum confidence for promotion to ``gray``.

    Returns:
        The candidate dict with refreshed metrics plus ``dry_run_status``,
        ``release_gate_passed`` and ``gate_reason``.
    """
    status = candidate.get("status") or "candidate"
    source = candidate.get("source") or ""
    rule_type = candidate.get("rule_type") or ""
    signal_name = candidate.get("signal_name") or ""
    source_ref = candidate.get("source_ref") or ""

    if is_dirty_history_candidate(candidate):
        return {
            **candidate,
            "sample_size": 0,
            "support_count": 0,
            "success_count": 0,
            "fail_count": 0,
            "dry_run_status": "dirty_history",
            "release_gate_passed": False,
            "gate_reason": "因子时效修复前的污染历史参考:不参与干净样本统计,不允许发布",
        }
    if status == "active":
        return {**candidate, "dry_run_status": "active", "release_gate_passed": True, "gate_reason": "已正式生效,不参与dry-run降级"}

    stored_confidence = float(candidate.get("confidence_score") or 0)
    if source.startswith("dual_attribution_failure") or source_ref.startswith("failure:") or rule_type == "penalty":
        failure_type = signal_name or source_ref.replace("failure:", "")
        # Guard the empty key: "" is a substring of every reason and would
        # otherwise match all failure rows.
        matched = [
            row for row in failure_rows
            if failure_type and (
                (row.get("failure_type") or "") == failure_type
                or failure_type in (row.get("failure_reason") or "")
            )
        ]
        sample_size = len(matched)
        success_count = 0
        fail_count = sample_size
        pnl_values = [float(row.get("pnl_pct") or 0) for row in matched]
        # Failure samples carry no success rate; confidence grows with the
        # number of matching failures and is capped at 95.
        confidence = round(min(95, 45 + fail_count * 8), 1) if sample_size else stored_confidence
    else:
        key = signal_name or source_ref.replace("review:", "")
        # ``key`` equals ``signal_name`` whenever the latter is set, so one
        # substring test suffices; testing an empty ``signal_name`` separately
        # would match every item.
        matched = [
            item for item in review_items
            if key and (key in item["signal_keys"] or key in item["all_signal_text"])
        ]
        sample_size = len(matched)
        success_count = sum(1 for row in matched if row.get("outcome") == "爆发")
        fail_count = sum(1 for row in matched if row.get("outcome") in ("失败", "横盘"))
        pnl_values = [float(row.get("pnl_48h") or 0) for row in matched]
        resolved = success_count + fail_count
        confidence = round(success_count / resolved * 100, 1) if resolved else stored_confidence
    dd_values = [float(row.get("max_drawdown_pct") or 0) for row in matched]

    # Without matching samples, fall back to the metrics already stored.
    avg_pnl = round(sum(pnl_values) / len(pnl_values), 2) if pnl_values else float(candidate.get("avg_pnl") or 0)
    max_gain = round(max(pnl_values), 2) if pnl_values else float(candidate.get("max_gain") or 0)
    max_drawdown = round(min(dd_values), 2) if dd_values else float(candidate.get("max_drawdown") or 0)
    dry_status = candidate_status_for_metrics(rule_type, sample_size, confidence, avg_pnl, status, min_gray_samples, min_gray_confidence)

    if dry_status == "gray":
        gate_reason = f"样本{sample_size}≥{min_gray_samples},置信{confidence}%≥{min_gray_confidence},avg_pnl={avg_pnl}%:可进入灰度,仍不升版"
    elif dry_status == "rejected":
        gate_reason = f"样本{sample_size}已足够但置信/收益不达标:淘汰,不允许发布"
    else:
        gate_reason = f"样本{sample_size}或置信{confidence}%不足:只研究不发布"

    return {
        **candidate,
        "sample_size": sample_size,
        "support_count": sample_size,
        "success_count": success_count,
        "fail_count": fail_count,
        "avg_pnl": avg_pnl,
        "max_gain": max_gain,
        "max_drawdown": max_drawdown,
        "confidence_score": confidence,
        "dry_run_status": dry_status,
        "release_gate_passed": dry_status in ("gray", "active"),
        "gate_reason": gate_reason,
    }
|
||
|
||
|
||
def get_strategy_iteration_dashboard(days=30):
    """Assemble the strategy-iteration dashboard payload.

    Combines the iteration summary for the last ``days`` days, up to 80
    candidates and 80 failure patterns, the latest 40 iteration logs, and a
    fresh dry-run evaluation into one dict with an ``overview`` section of
    per-status / per-source / per-type / per-decision counts.
    """
    from app.db.review_queries import get_strategy_iteration_logs, get_strategy_iteration_summary

    def _tally(records, field, default_label):
        # Count records per value of ``field``; falsy values use the default label.
        counts = {}
        for record in records:
            label = record.get(field) or default_label
            counts[label] = counts.get(label, 0) + 1
        return counts

    summary = get_strategy_iteration_summary(days=days)
    candidates = get_strategy_rule_candidates(limit=80)
    failures = get_strategy_failure_patterns(limit=80)
    logs = get_strategy_iteration_logs(limit=40)

    status_counts = _tally(candidates, "status", "candidate")
    source_counts = _tally(candidates, "source", "unknown")
    failure_counts = _tally(failures, "failure_type", "未分类")
    release_counts = _tally(logs, "release_decision", "unknown")

    dry_run = dry_run_strategy_candidate_performance()
    latest_log = logs[0] if logs else {}

    # Most frequent failure types first; ties are broken by type name.
    ranked_failures = sorted(failure_counts.items(), key=lambda pair: (-pair[1], pair[0]))
    dry_run_defaults = (
        ("review_sample_count", 0),
        ("clean_started_at", ""),
        ("sample_window", "all_history"),
        ("dirty_history_candidate_count", 0),
        ("candidate_count", 0),
        ("gray_ready_count", 0),
        ("rejected_count", 0),
        ("would_bump_version", False),
        ("release_reason", ""),
    )

    overview = {
        "total_logs": len(logs),
        "candidate_count": len(candidates),
        "candidate_status_counts": status_counts,
        "candidate_source_counts": source_counts,
        "failure_type_counts": [{"type": name, "count": count} for name, count in ranked_failures],
        "release_decision_counts": release_counts,
        "latest_release_decision": latest_log.get("release_decision") or "hold",
        "latest_release_reason": latest_log.get("release_reason") or latest_log.get("version_change_summary") or "暂无发布决策说明",
        "dry_run_summary": {field: dry_run.get(field, default) for field, default in dry_run_defaults},
    }
    return {
        "summary": summary,
        "overview": overview,
        "dry_run": dry_run,
        "candidates": candidates,
        "failures": failures,
        "logs": logs,
    }
|