alphax/app/services/review_engine.py
2026-06-01 21:11:33 +08:00

1638 lines
68 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
山寨币复盘引擎 — 自我迭代的核心闭环
1. 48h归因复盘:推荐后48h判断爆发/横盘/失败,归因信号
2. 信号绩效统计:每个信号类型的命中率,动态调权
3. 漏选复盘:没选但后来爆发的币,逆推漏选原因
4. 逆向分析:涨幅榜复盘,提取起爆前共性特征,发现规律
5. 规律提炼:跨多轮复盘数据自动生成learned_rule
6. 输出JSON结果 + 飞书推送复盘报告 + 逆向分析报告
"""
import json
import os
import sys
import time
from datetime import datetime, timedelta
from collections import defaultdict, Counter
sys.path.insert(0, os.path.dirname(__file__))
from app.db.altcoin_db import (
get_conn, record_review, update_signal_performance,
get_signal_weights, record_missed_explosion, get_review_stats, log_strategy_iteration,
upsert_strategy_rule_candidate, record_strategy_failure_pattern,
get_strategy_rule_candidates, update_strategy_rule_candidate_status,
refresh_strategy_candidate_performance,
)
from app.core.pa_engine import classify_candles, calc_atr, full_pa_analysis
from app.core.opportunity_lifecycle import derive_display_bucket, normalize_action_status
from app.core.signal_taxonomy import signal_code, signal_label_for_code, signal_codes
from app.config.config_loader import (
get_review_params, update_meta, get_learned_rules, add_learned_rule,
get_rules_snapshot, diff_rule_snapshots, get_meta, update_signal_weight,
promote_candidate_rule_to_learned_rule, bump_strategy_patch_version,
)
from app.analysis import reverse_analysis
import requests
# Base URL of the Binance REST API (v3); the kline and ticker requests below are built from it.
BINANCE_API = "https://api.binance.com/api/v3"
# ==================== Signal category mapping ====================
# Category of each known signal label:
#   前瞻 = forward-looking (volume/price behaviour)
#   滞后 = lagging (MACD / RSI / moving averages)
#   PA   = price action (supply/demand zones, dynamic/static candles)
# Insertion order matters: get_signal_category returns the category of the
# FIRST key that occurs as a substring of the signal text.
SIGNAL_CATEGORY_MAP = {
    "N倍放量": "前瞻",
    "量价齐飞": "前瞻",
    "暴力突破供给区": "前瞻",
    "静K→动K起爆": "前瞻",
    "1H放量(3.2倍)": "前瞻",
    "1H放量": "前瞻",
    "4H MACD金叉!": "滞后",
    "4H MACD柱扩张↑": "滞后",
    "4H MACD柱收缩↓": "滞后",
    "MACD金叉": "滞后",
    "4H RSI拐点": "滞后",
    "RSI拐点": "滞后",
    "4H 均线多头排列": "滞后",
    "均线多头排列": "滞后",
    "4H 3静K蓄力": "PA",
    "静K蓄力": "PA",
    "连续3K多头加速": "PA",
    "连续K加速": "PA",
    "Q≥7供需区突破": "PA",
    "供给区突破": "PA",
    "动K起爆": "PA",
}


def get_signal_category(signal_text):
    """Infer the category of a signal from its text.

    The known labels in ``SIGNAL_CATEGORY_MAP`` are tried first (substring
    match, in map order). Unknown texts fall back to keyword groups, checked
    in the order lagging -> forward-looking -> PA.

    Returns one of "前瞻", "滞后", "PA", or "未知" when nothing matches.
    """
    known = next(
        (category for label, category in SIGNAL_CATEGORY_MAP.items() if label in signal_text),
        None,
    )
    if known is not None:
        return known

    # Keyword fallback for labels that are not in the map.
    fallback_rules = (
        (("MACD", "RSI", "均线"), "滞后"),
        (("放量", "量价", "突破"), "前瞻"),
        (("静K", "动K", "蓄力", "Q"), "PA"),
    )
    for keywords, category in fallback_rules:
        if any(word in signal_text for word in keywords):
            return category
    return "未知"
# ==================== 爆发判定标准从config_loader读取 ====================
def _get_thresholds():
    """Read the review thresholds from config_loader, applying defaults.

    Returns a dict with the hit/fail/missed-explosion percentages, the sample
    minimums for re-weighting and killing signals, the weight floor, the kill
    hit-rate threshold and the per-category base weights.
    """
    defaults = (
        ("hit_threshold_pct", 5.0),
        ("fail_threshold_pct", -3.0),
        ("missed_explosion_pct", 20.0),
        ("min_samples_for_weight", 12),
        ("weight_floor", 0.0),
        ("hit_rate_kill_threshold", 0.10),
        ("kill_min_samples", 20),
        ("category_base_weights", {"前瞻": 2.0, "PA": 1.5, "滞后": 0.5}),
    )
    configured = get_review_params()
    return {name: configured.get(name, fallback) for name, fallback in defaults}
def _get_strategy_revision_started_at():
    """Return the start time of the current strategy revision, or "" if unset.

    Any failure while reading the meta section is treated as "not set".
    """
    try:
        meta = get_meta()
    except Exception:
        meta = None
    if not meta:
        meta = {}
    started_at = meta.get("strategy_revision_started_at")
    if not started_at:
        started_at = ""
    return started_at.strip()
def _get_reviewable_recommendations(now=None):
    """Fetch every not-yet-reviewed recommendation that reached execution.

    Watch-pool entries, untriggered wait-for-pullback entries and coarse-screen
    rows are excluded by the SQL filter, so that "opportunity discovered" is
    not scored as "trade recommended".

    Args:
        now: Reference time; defaults to ``datetime.now()``.

    Returns:
        The rows returned by the query, oldest recommendation first.
    """
    now = now or datetime.now()
    revision_started_at = _get_strategy_revision_started_at()
    executable_filter = """
        AND (
            COALESCE(entry_triggered,0)=1
            OR status IN ('hit_tp1','hit_tp2','stopped_out')
            OR COALESCE(execution_status,'') IN ('buy_now','completed','holding')
            OR COALESCE(action_status,'') IN ('可即刻买入','止盈1','止盈2','跟踪止盈','止损')
        )
        AND NOT (
            status='active'
            AND COALESCE(entry_triggered,0)=0
            AND (
                COALESCE(execution_status,'') IN ('wait_pullback','observe')
                OR COALESCE(display_bucket,'watch_pool')='watch_pool'
                OR COALESCE(action_status,'') IN ('等回踩','观察','持有','')
            )
        )
    """
    # NOTE(review): recommendations become eligible after 1 day, while the
    # review itself evaluates a 48h price window — confirm this is intended.
    time_conditions = ["rec_time <= %s"]
    params = [(now - timedelta(days=1)).isoformat()]
    if revision_started_at:
        # Only samples produced by the current strategy revision are reviewed.
        time_conditions.append("rec_time >= %s")
        params.append(revision_started_at)
    where_clause = " AND ".join(time_conditions)
    query = f"""
        SELECT * FROM recommendation
        WHERE {where_clause}
        AND id NOT IN (SELECT rec_id FROM review_log)
        {executable_filter}
        ORDER BY rec_time ASC
    """
    conn = get_conn()
    try:
        return conn.execute(query, tuple(params)).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()
def _is_reviewable_execution(rec):
    """Tell whether a recommendation reached execution and may be scored.

    Terminal statuses (take-profit / stopped out) always qualify. Otherwise an
    untriggered entry that is still waiting or observing is rejected, and the
    remaining rows qualify when the entry triggered or the execution/action
    status marks a live or finished trade.
    """
    status = str(rec.get("status") or "active").strip()
    action = normalize_action_status(rec.get("action_status"), status)
    execution_status = str(rec.get("execution_status") or "").strip()

    if status in ("hit_tp1", "hit_tp2", "stopped_out"):
        return True

    bucket = derive_display_bucket(status, action, execution_status)
    if rec.get("entry_triggered"):
        return True

    still_waiting = (
        execution_status in ("wait_pullback", "observe")
        or bucket.get("display_bucket") == "watch_pool"
        or action in ("等回踩", "观察", "持有")
    )
    if still_waiting:
        return False

    # The terminal-status case was already handled above.
    return (
        execution_status in ("buy_now", "completed", "holding")
        or action in ("可即刻买入", "止盈1", "止盈2", "跟踪止盈", "止损")
    )
def _window_price_metrics(rec, hours=48):
"""Calculate realized review-window metrics from price_tracking snapshots."""
entry_price = float(rec.get("entry_price") or 0)
rec_time = rec.get("rec_time") or ""
rec_id = rec.get("id")
if not entry_price or not rec_time or not rec_id:
return {
"pnl_pct": 0,
"max_pnl_pct": 0,
"min_pnl_pct": 0,
"source": "insufficient_tracking",
"sample_count": 0,
"quality": "insufficient",
"reason": "missing_entry_or_rec_time",
}
try:
start = datetime.fromisoformat(str(rec_time))
except Exception:
start = None
if not start:
return {
"pnl_pct": 0,
"max_pnl_pct": 0,
"min_pnl_pct": 0,
"source": "insufficient_tracking",
"sample_count": 0,
"quality": "insufficient",
"reason": "invalid_rec_time",
}
end = start + timedelta(hours=hours)
conn = get_conn()
rows = conn.execute(
"""
SELECT price, track_time FROM price_tracking
WHERE rec_id=%s AND track_time >= %s AND track_time <= %s
ORDER BY track_time ASC
""",
(rec_id, start.isoformat(), end.isoformat()),
).fetchall()
conn.close()
if not rows:
return {
"pnl_pct": 0,
"max_pnl_pct": 0,
"min_pnl_pct": 0,
"source": "insufficient_tracking",
"sample_count": 0,
"quality": "insufficient",
"reason": "missing_price_tracking_window",
}
prices = [float(r["price"] or 0) for r in rows if float(r["price"] or 0) > 0]
if not prices:
return {
"pnl_pct": 0,
"max_pnl_pct": 0,
"min_pnl_pct": 0,
"source": "insufficient_tracking",
"sample_count": 0,
"quality": "insufficient",
"reason": "invalid_price_tracking_prices",
}
close_price = prices[-1]
return {
"pnl_pct": round((close_price / entry_price - 1) * 100, 2),
"max_pnl_pct": round((max(prices) / entry_price - 1) * 100, 2),
"min_pnl_pct": round((min(prices) / entry_price - 1) * 100, 2),
"source": "price_tracking",
"sample_count": len(prices),
"quality": "complete",
"reason": "",
}
# ==================== 1. 推荐归因复盘 ====================
def fetch_klines(symbol, interval="1h", limit=96):
    """Fetch candles from Binance (the defaults give 96 one-hour candles, i.e. 4 days).

    Returns a list of dicts with ``time``, ``open``, ``high``, ``low``,
    ``close`` and ``volume``; an empty list on a non-200 response or any error.
    """
    try:
        query = {
            "symbol": symbol.replace("/", ""),
            "interval": interval,
            "limit": limit,
        }
        resp = requests.get(f"{BINANCE_API}/klines", params=query, timeout=10)
        if resp.status_code != 200:
            return []
        candles = []
        for entry in resp.json():
            candles.append({
                "time": entry[0],
                "open": float(entry[1]),
                "high": float(entry[2]),
                "low": float(entry[3]),
                "close": float(entry[4]),
                "volume": float(entry[5]),
            })
        return candles
    except Exception:
        # Best effort: callers treat an empty list as "no data".
        return []
def get_current_price(symbol):
    """Return the current Binance ticker price for ``symbol``, or 0 on any failure."""
    price = 0
    try:
        resp = requests.get(
            f"{BINANCE_API}/ticker/price",
            params={"symbol": symbol.replace("/", "")},
            timeout=5,
        )
        if resp.status_code == 200:
            price = float(resp.json()["price"])
    except Exception:
        # Best effort: 0 signals "price unavailable".
        price = 0
    return price
def _verify_signal_in_post_rec_pa(signal_text, post_rec_pa_result):
"""
更精确的信号归因检查推荐时触发的信号模式是否在推荐后PA中真正延续
例如:"量价齐飞" → 推荐后是否放量持续? "静K→动K起爆" → 推荐后动K是否延续
"""
if not post_rec_pa_result:
return False # 无法验证则不归为命中
# 检查动K延续
candles_class = post_rec_pa_result.get("candles_class", [])
dynamic_ks = [c for c in candles_class if c["type"] == "dynamic" and c["direction"] == 1]
has_dynamic_continuation = len(dynamic_ks) >= 2
# 检查放量持续
continuous_k = post_rec_pa_result.get("continuous_k", [])
has_continuous_k_bull = any(ck["type"] == "bullish_continue" and ck["length"] >= 3 for ck in continuous_k)
# 检查供需区突破延续
zones = post_rec_pa_result.get("zones", [])
demand_zones_nearby = [z for z in zones if z["type"] == "demand" and z["q_score"] >= 7]
# 检查起爆点延续
ignition_points = post_rec_pa_result.get("ignition_points", [])
bullish_ignitions = [ip for ip in ignition_points if ip["direction"] == 1]
# 根据信号文本逐个匹配
if "量价齐飞" in signal_text or "放量" in signal_text:
# 量价齐飞 → 推荐后需有连续K多头加速或动K延续
return has_dynamic_continuation or has_continuous_k_bull
elif "静K" in signal_text and "动K" in signal_text or "起爆" in signal_text:
# 静K→动K起爆 → 推荐后需有新的起爆点或动K延续
return len(bullish_ignitions) > 0 or has_dynamic_continuation
elif "Q≥7" in signal_text or "供给区突破" in signal_text or "供需区" in signal_text:
# 供需区突破 → 推荐后需有Q≥7需求区支撑
return len(demand_zones_nearby) > 0
elif "连续K" in signal_text or "加速" in signal_text:
# 连续K加速 → 推荐后需有连续K延续
return has_continuous_k_bull
elif "蓄力" in signal_text:
# 静K蓄力 → 推荐后蓄力结束转为动K
return len(bullish_ignitions) > 0 or has_dynamic_continuation
# 其他前瞻/PA信号有动K延续就视为确认
return has_dynamic_continuation
def review_recommendation(rec):
    """Review one recommendation over its 48h window and attribute the result to its signals.

    Steps:
      1. Skip samples that never reached execution (nothing is recorded).
      2. Compute window PnL from price_tracking; when the window is
         incomplete a placeholder review is recorded and the sample is
         excluded from signal performance.
      3. Classify the outcome as 爆发 / 失败 / 横盘 from the thresholds.
      4. Re-analyse the post-recommendation candles and mark each triggered
         signal as hit or miss.
      5. Persist the review and update per-signal performance.

    Args:
        rec: Recommendation mapping with at least ``symbol``, ``rec_time``,
            ``id`` and ``signals``.

    Returns:
        A dict describing the review; ``skipped`` is True for samples that
        were not scored.
    """
    thresholds = _get_thresholds()
    symbol = rec["symbol"]
    rec_time = rec["rec_time"]
    rec_id = rec["id"]
    signals_raw = rec["signals"]

    if not _is_reviewable_execution(rec):
        return {
            "rec_id": rec_id,
            "symbol": symbol,
            "outcome": "未执行",
            "pnl_48h": 0,
            "max_pnl_48h": 0,
            "triggered_signals": [],
            "hit_signals": [],
            "miss_signals": [],
            "lesson": "观察池/等回踩未触发样本不进入推荐绩效复盘",
            "skipped": True,
        }

    # Resolve the signal codes: stored codes win, otherwise derive them from the labels.
    try:
        signals = json.loads(signals_raw) if isinstance(signals_raw, str) else signals_raw
    except Exception:
        signals = []
    codes_raw = rec.get("signal_codes_json") or ""
    try:
        signal_code_list = json.loads(codes_raw) if isinstance(codes_raw, str) and codes_raw else []
    except Exception:
        signal_code_list = []
    if not signal_code_list:
        signal_code_list = signal_codes(signals)
    review_signals = signal_code_list or ["unknown"]

    # Fixed 48h window from price_tracking, so the price at review time does
    # not contaminate the historical result.
    metrics = _window_price_metrics(rec, hours=48)
    pnl_pct = metrics["pnl_pct"]
    max_pnl_pct = metrics["max_pnl_pct"]
    min_pnl_pct = metrics.get("min_pnl_pct", 0)

    if metrics.get("quality") != "complete":
        lesson = (
            "48h price_tracking窗口样本不足复盘只记录占位不进入推荐绩效/信号权重。"
            f"原因: {metrics.get('reason') or metrics.get('source')}"
        )
        record_review(rec_id, symbol, "样本不足", 0, 0, review_signals, [], [], lesson)
        return {
            "rec_id": rec_id,
            "symbol": symbol,
            "outcome": "样本不足",
            "pnl_48h": 0,
            "max_pnl_48h": 0,
            "triggered_signals": review_signals,
            "hit_signals": [],
            "miss_signals": [],
            "lesson": lesson,
            "window_metrics": metrics,
            "skipped": True,
        }

    # Outcome: a sufficient peak gain wins over a drawdown inside the same window.
    if max_pnl_pct >= thresholds["hit_threshold_pct"]:
        outcome = "爆发"
    elif pnl_pct <= thresholds["fail_threshold_pct"] or min_pnl_pct <= thresholds["fail_threshold_pct"]:
        outcome = "失败"
    else:
        outcome = "横盘"

    hit_signals = []
    miss_signals = []

    # Re-run the PA engine on the candles that follow the recommendation.
    klines = fetch_klines(symbol, "1h", 96)
    post_rec_pa_result = None
    if klines and len(klines) >= 10:
        import pandas as pd
        df = pd.DataFrame(klines)
        df["time"] = pd.to_datetime(df["time"], unit="ms")

        # Index of the first candle at or after the recommendation time.
        rec_ts = _parse_ts(rec_time)
        post_rec_start = 0
        for i, k in enumerate(klines):
            if k["time"] >= rec_ts:
                post_rec_start = i
                break

        # post_rec_start == 0 means "not found" or "older than the fetched
        # range"; in both cases there is no usable pre/post split.
        if 0 < post_rec_start < len(df):
            # PA analysis on up to 24 post-recommendation 1H candles.
            post_rec_df = df.iloc[post_rec_start:post_rec_start + 24].copy()
            if len(post_rec_df) >= 10:
                post_rec_pa_result = full_pa_analysis(post_rec_df, timeframe="1h")

            # Any dynamic candle within the first 12 post-recommendation candles?
            atr = calc_atr(df)
            classification = classify_candles(df, atr)
            for i in range(post_rec_start, min(post_rec_start + 12, len(classification))):
                if classification[i]["type"] == "dynamic":
                    hit_signals.append("动K延续")
                    break

            # Sustained volume: first 3 post-recommendation candles vs the
            # candles immediately BEFORE the recommendation. The baseline must
            # not include post-recommendation candles, otherwise the expansion
            # is compared against itself.
            if post_rec_start + 3 <= len(klines):
                avg_vol = sum(k["volume"] for k in klines[post_rec_start:post_rec_start + 3]) / 3
                baseline = klines[max(0, post_rec_start - 10):post_rec_start]
                pre_vol = sum(k["volume"] for k in baseline) / len(baseline)
                if pre_vol > 0 and avg_vol > pre_vol * 2:
                    hit_signals.append("放量持续")

    # Attribute the outcome to each triggered signal.
    for code in review_signals:
        label = signal_label_for_code(code)
        sig_cat = get_signal_category(label)

        if outcome == "爆发":
            if sig_cat in ("前瞻", "PA"):
                # Forward-looking / PA signals must show real continuation;
                # unconfirmed ones count as a tagged partial hit.
                if _verify_signal_in_post_rec_pa(label, post_rec_pa_result):
                    hit_signals.append(code)
                else:
                    hit_signals.append(f"{code}:unverified")
            else:
                # Lagging indicators only confirm after the fact.
                miss_signals.append(code)
        elif outcome == "失败":
            # Failed trade: every triggered signal was a false signal.
            miss_signals.append(code)
        elif sig_cat in ("滞后", "前瞻", "PA"):
            # Sideways: the signal was not strong enough.
            miss_signals.append(code)

    lesson = _generate_lesson(outcome, hit_signals, miss_signals, review_signals)
    if metrics.get("source") == "price_tracking":
        lesson = (lesson + "; " if lesson else "") + f"复盘收益来自48h price_tracking窗口({metrics.get('sample_count', 0)}个快照)"

    record_review(rec_id, symbol, outcome, pnl_pct, max_pnl_pct,
                  review_signals, hit_signals, miss_signals, lesson)

    # Update the per-signal performance statistics.
    is_hit = outcome == "爆发"
    for code in review_signals:
        update_signal_performance(code, get_signal_category(signal_label_for_code(code)), is_hit, pnl_pct)

    return {
        "rec_id": rec_id,
        "symbol": symbol,
        "outcome": outcome,
        "pnl_48h": pnl_pct,
        "max_pnl_48h": max_pnl_pct,
        "triggered_signals": review_signals,
        "hit_signals": hit_signals,
        "miss_signals": miss_signals,
        "lesson": lesson,
        "window_metrics": metrics,
    }
def _parse_ts(ts_str):
"""解析时间字符串为timestamp"""
try:
dt = datetime.fromisoformat(ts_str)
return dt.timestamp() * 1000 # Binance用毫秒
except Exception:
return 0
def _generate_lesson(outcome, hit_signals, miss_signals, all_signals):
    """Build the lesson text for a review from its outcome and signal attribution.

    Args:
        outcome: "爆发", "失败", or anything else (treated as sideways).
        hit_signals: Signals counted as hits; entries whose continuation was
            not confirmed carry an "unverified" marker.
        miss_signals: Signals counted as misses.
        all_signals: Every signal that triggered the recommendation.

    Returns:
        The lessons joined with "; " (possibly an empty string).
    """
    # review_recommendation tags unconfirmed hits with ":unverified"; the
    # older "(未延续)" marker is still recognised.
    unverified_markers = (":unverified", "(未延续)")

    def is_unverified(sig):
        return any(marker in sig for marker in unverified_markers)

    def without_marker(sig):
        for marker in unverified_markers:
            sig = sig.replace(marker, "")
        return sig

    # NOTE(review): signals arrive here as stored codes, while
    # review_recommendation categorises signal_label_for_code(code) — confirm
    # get_signal_category classifies raw codes as intended.
    lessons = []
    if outcome == "爆发":
        if hit_signals:
            hit_cats = [get_signal_category(s) for s in hit_signals if not is_unverified(s)]
            if "前瞻" in hit_cats:
                lessons.append("前瞻信号(量价行为)是真正预判爆发的关键")
            if "PA" in hit_cats:
                lessons.append("PA信号(供需区/动K静K)有效确认了爆发方向")
            unverified = [s for s in hit_signals if is_unverified(s)]
            if unverified:
                names = ", ".join(without_marker(s) for s in unverified[:2])
                lessons.append(f"部分信号({names})虽触发但未延续,需更强的确认条件")
        if "滞后" in [get_signal_category(s) for s in miss_signals]:
            lessons.append("滞后指标(MACD/RSI)只是事后确认,不具前瞻性")
    elif outcome == "失败":
        lagging = [s for s in miss_signals if get_signal_category(s) == "滞后"]
        if lagging:
            lessons.append(f"滞后指标({','.join(lagging[:2])})产生假信号,追高失败")
        if not any(get_signal_category(s) == "前瞻" for s in all_signals):
            lessons.append("缺乏前瞻性量价信号,仅靠滞后指标选币风险极高")
    else:  # sideways
        lessons.append("信号强度不足,推荐后无明显爆发行为")
        lagging = [s for s in miss_signals if get_signal_category(s) == "滞后"]
        if lagging:
            lessons.append(f"滞后信号({','.join(lagging[:2])})无法预判启动时机")
    return "; ".join(lessons)
# ==================== 2. 信号权重动态调整 ====================
def adjust_signal_weights():
    """Produce weight-adjustment suggestions from signal performance.

    Review / self-learning only researches and accumulates candidates: this
    function must not bypass the release gate and write signal_performance
    weights, otherwise the report would say "pattern found" while the live
    strategy had already been changed silently. Every returned line therefore
    carries the "research only, not applied" suffix.

    Returns:
        A list of human-readable suggestion strings (possibly empty).
    """
    thresholds = _get_thresholds()
    weights = get_signal_weights()
    category_base = thresholds["category_base_weights"]
    kill_hit_rate_pct = thresholds["hit_rate_kill_threshold"] * 100
    research_suffix = "(研究建议,未直接生效)"

    adjustments = []
    for sig_type, data in weights.items():
        hit_rate = data["hit_rate"]
        total = data["total_count"]
        old_weight = data["weight"]

        # Too few samples: no suggestion at all.
        if total < thresholds["min_samples_for_weight"]:
            continue

        # Suggested weight = hit_rate / 50 * category base weight.
        base = category_base.get(data["category"], 1.0)
        new_weight = round(hit_rate / 50 * base, 2)

        notes = []
        # Persistently failing signal with enough samples -> floor weight.
        if hit_rate < kill_hit_rate_pct and total >= thresholds["kill_min_samples"]:
            new_weight = thresholds["weight_floor"]
            notes.append(
                f"⚠️ {sig_type} 命中率{hit_rate}%<{kill_hit_rate_pct:g}%, "
                f"淘汰(权重→{new_weight:g})"
            )

        # Only report weight moves of at least 0.5.
        if abs(new_weight - old_weight) >= 0.5:
            notes.append(f"{sig_type}: {old_weight}→{new_weight} (命中率{hit_rate}%)")

        # Suggestions only: going live requires the candidate-rule release
        # gate, so every line for this signal gets the suffix.
        adjustments.extend(note + research_suffix for note in notes)
    return adjustments
def _apply_daily_factor_weight_governance():
    """Promote good factors and suppress weak factors after each review run.

    This is intentionally conservative: it only uses accumulated
    ``signal_performance`` rows, respects minimum sample thresholds, and
    writes the resulting weight through ``update_signal_weight``.

    Returns:
        A list of dicts, one per changed signal, with ``signal``, ``action``
        (淘汰 / 降权 / 升权), ``old_weight``, ``new_weight``, ``sample_size``,
        ``hit_rate`` and ``avg_pnl``.
    """
    thresholds = _get_thresholds()
    weights = get_signal_weights()
    min_samples = max(12, int(thresholds.get("min_samples_for_weight", 12) or 12))
    kill_min_samples = max(min_samples, int(thresholds.get("kill_min_samples", 20) or 20))
    kill_hit_rate = float(thresholds.get("hit_rate_kill_threshold", 0.10) or 0.10) * 100

    # _get_thresholds does not carry the "signal_deprecation" section, so it
    # is read from the review params; otherwise the configured warn threshold
    # would silently be replaced by the default.
    deprecation_cfg = thresholds.get("signal_deprecation")
    if not deprecation_cfg:
        deprecation_cfg = (get_review_params() or {}).get("signal_deprecation") or {}
    warn_hit_rate = float(deprecation_cfg.get("hit_rate_warn_threshold", 0.20) or 0.20) * 100

    category_base = thresholds.get("category_base_weights") or {"前瞻": 2.0, "PA": 1.5, "滞后": 0.5}

    changes = []
    for sig_type, data in sorted(weights.items()):
        total = int(data.get("total_count") or 0)
        if total < min_samples:
            continue
        hit_rate = float(data.get("hit_rate") or 0)
        avg_pnl = float(data.get("avg_pnl") or 0)
        old_weight = float(data.get("weight") or 0)
        category = data.get("category") or "未知"
        base = float(category_base.get(category, 1.0) or 1.0)
        new_weight = old_weight
        action = ""

        # Expectancy-first governance: avg_pnl is the per-trade expectancy and the
        # source of truth for profitability. hit_rate alone must not kill or demote
        # a low-hit-rate but high-payoff signal (typical of breakout/trend setups),
        # nor promote a high-hit-rate but net-losing one.
        if total >= kill_min_samples and avg_pnl <= 0 and hit_rate < kill_hit_rate:
            new_weight = 0.0
            action = "淘汰"
        elif avg_pnl <= -3 or (avg_pnl <= 0 and hit_rate < warn_hit_rate):
            new_weight = round(max(0.0, old_weight * 0.5), 3)
            action = "降权"
        elif avg_pnl > 0 and (hit_rate >= 55 or avg_pnl >= 2):
            # Promotion target is capped at 4.0 and never below the current weight.
            target = round(min(4.0, max(old_weight, (hit_rate / 50) * base, (1 + avg_pnl / 5) * base)), 3)
            if target > old_weight:
                new_weight = target
                action = "升权"

        if action and abs(new_weight - old_weight) >= 0.001:
            update_signal_weight(sig_type, new_weight)
            changes.append({
                "signal": sig_type,
                "action": action,
                "old_weight": round(old_weight, 4),
                "new_weight": round(new_weight, 4),
                "sample_size": total,
                "hit_rate": round(hit_rate, 2),
                "avg_pnl": round(avg_pnl, 2),
            })
    return changes
# ==================== 2.5 信号淘汰机制 ====================
def _deprecate_low_performance_signals():
    """Create deprecation / down-weight candidates for low-performance signals.

    Nothing is applied here: no weight is changed and no learned rule is
    written. Each low hit-rate signal is submitted as a "penalty" candidate
    through ``upsert_strategy_rule_candidate``.

    Returns:
        A list of human-readable action strings, one per candidate created.
    """
    deprecation_config = get_review_params().get("signal_deprecation", {})
    if not deprecation_config.get("enabled", True):
        return []

    min_samples = deprecation_config.get("min_samples", 10)
    # Config values are fractions; hit rates are compared as percentages.
    hit_rate_deprecate_threshold = deprecation_config.get("hit_rate_deprecate_threshold", 0.10) * 100
    hit_rate_warn_threshold = deprecation_config.get("hit_rate_warn_threshold", 0.20) * 100
    min_tracking_days = deprecation_config.get("min_tracking_days", 3)

    weights = get_signal_weights()
    now = datetime.now()

    def submit_candidate(sig_type, data, total_count, hit_rate, desc, notes, source_ref):
        """Push one penalty candidate into the candidate pool and return its id."""
        return upsert_strategy_rule_candidate(
            source="signal_deprecation",
            rule_type="penalty",
            signal_name=sig_type,
            rule_description=desc,
            support_count=total_count,
            success_count=int(data.get("hit_count", 0) or 0),
            fail_count=int(data.get("miss_count", 0) or 0),
            confidence_score=round(max(0, 100 - hit_rate), 1),
            sample_size=total_count,
            status="candidate",
            notes=notes,
            source_ref=source_ref,
        )

    actions = []
    for sig_type, data in weights.items():
        total_count = data.get("total_count", 0) or 0
        hit_rate = data.get("hit_rate", 0) or 0
        current_weight = data.get("weight", 0) or 0
        last_updated = data.get("last_updated", "")

        if total_count < min_samples:
            continue

        # Skip signals whose stats were updated less than min_tracking_days ago.
        if last_updated:
            try:
                last_dt = datetime.fromisoformat(last_updated)
                if (now - last_dt).days < min_tracking_days:
                    continue
            except Exception:
                # Unparseable or incomparable timestamp: do not block the check.
                pass

        if hit_rate < hit_rate_deprecate_threshold:
            desc = (
                f"信号 [{sig_type}] 命中率{hit_rate}%<{hit_rate_deprecate_threshold}%,"
                f"候选淘汰/降权,样本{total_count}"
            )
            cid = submit_candidate(
                sig_type, data, total_count, hit_rate, desc,
                notes="低绩效信号候选淘汰:只入候选池,不直接改权重/learned_rules",
                source_ref=f"signal_deprecation:{sig_type}",
            )
            actions.append(f"⚠️ 信号淘汰候选: {sig_type}(命中率{hit_rate}%, 样本{total_count}, candidate#{cid})")
        elif hit_rate < hit_rate_warn_threshold:
            # Weights are fractional, so halve as a float; integer floor
            # division would report e.g. 1.5 -> 0 instead of 1.5 -> 0.75.
            new_weight = round(max(0.0, float(current_weight) * 0.5), 3)
            desc = (
                f"信号 [{sig_type}] 命中率{hit_rate}%<{hit_rate_warn_threshold}%,"
                f"候选降权 {current_weight}→{new_weight},样本{total_count}"
            )
            cid = submit_candidate(
                sig_type, data, total_count, hit_rate, desc,
                notes="低绩效信号候选降权:只入候选池,不直接改权重/learned_rules",
                source_ref=f"signal_deprecation_warn:{sig_type}",
            )
            actions.append(
                f"⚠️ 信号降权候选: {sig_type}(命中率{hit_rate}%, "
                f"{current_weight}→{new_weight}, candidate#{cid})"
            )
    return actions
# ==================== 3. 漏选复盘 ====================
def scan_missed_explosions(now=None):
    """Find coins that surged in the last 24h without being recommended.

    A USDT pair counts as missed when its 24h change reaches
    ``missed_explosion_pct`` and it was not recommended in the last 24h
    (limited to the current strategy revision when one is set). Each miss is
    stored through ``record_missed_explosion``.

    Args:
        now: Reference time; defaults to ``datetime.now()``.

    Returns:
        A list of dicts with ``symbol``, ``gain_pct``, ``reason`` and
        ``lesson``; empty when the ticker request fails.
    """
    thresholds = _get_thresholds()
    missed_pct = thresholds["missed_explosion_pct"]
    now = now or datetime.now()
    revision_started_at = _get_strategy_revision_started_at()

    # Lower time bounds: last 24h, plus the revision start when configured.
    time_params = [(now - timedelta(days=1)).isoformat()]
    if revision_started_at:
        time_params.append(revision_started_at)
    rec_where = " AND ".join("rec_time >= %s" for _ in time_params)
    scan_where = " AND ".join("scan_time >= %s" for _ in time_params)

    conn = get_conn()
    try:
        recommended = conn.execute(
            f"SELECT symbol FROM recommendation WHERE {rec_where}",
            tuple(time_params),
        ).fetchall()
        screened = conn.execute(
            f"""
            SELECT symbol, state, score, signals FROM screening_log
            WHERE layer='细筛' AND {scan_where}
            """,
            tuple(time_params),
        ).fetchall()
    finally:
        # Release the connection even when a query raises.
        conn.close()
    recommended_symbols = {r["symbol"] for r in recommended}
    screened_info = {r["symbol"]: dict(r) for r in screened}

    # 24h statistics for every Binance pair.
    try:
        resp = requests.get(f"{BINANCE_API}/ticker/24hr", timeout=10)
        if resp.status_code != 200:
            return []
        tickers = resp.json()
    except Exception:
        return []
    if not isinstance(tickers, list):
        # An error payload is a dict, not a ticker list.
        return []

    quote = "USDT"
    missed = []
    for t in tickers:
        try:
            symbol_str = t["symbol"]
            if not symbol_str.endswith(quote):
                continue
            # Strip only the trailing quote asset.
            base = symbol_str[:-len(quote)]
            if base in ("BTC", "ETH", "BNB", "USDT"):
                continue
            change_pct = float(t["priceChangePercent"])
            current_price = float(t["lastPrice"])
            volume_24h = float(t["quoteVolume"])
        except (KeyError, TypeError, ValueError, AttributeError):
            # One malformed ticker must not abort the whole scan.
            continue

        formatted = f"{base}/USDT"
        if change_pct < missed_pct or formatted in recommended_symbols:
            continue

        # Why was it missed: rejected by the fine screen, or never got there?
        info = screened_info.get(formatted)
        if info is not None:
            reason = f"细筛淘汰(state={info['state']}, score={info['score']})"
            raw_signals = info["signals"]
            try:
                features = json.loads(raw_signals) if isinstance(raw_signals, str) else raw_signals
            except ValueError:
                features = []
            if features is None:
                features = []
            elif not isinstance(features, list):
                features = [features]
            lesson = f"细筛给了score={info['score']}, 但实际涨幅{change_pct}%→评分标准可能有误"
        else:
            reason = "粗筛未通过(涨幅或量价不达标)"
            features = []
            lesson = f"粗筛条件太严格,错过了{change_pct}%涨幅的币"

        detected_features = [f"24h涨{change_pct}%"]
        if volume_24h > 0:
            detected_features.append(f"24h成交额${volume_24h/1e6:.1f}M")

        # Third argument: the price 24h ago, back-computed from the change.
        record_missed_explosion(
            formatted, current_price, current_price / (1 + change_pct / 100),
            change_pct, reason, detected_features + features, lesson
        )
        missed.append({
            "symbol": formatted,
            "gain_pct": change_pct,
            "reason": reason,
            "lesson": lesson,
        })
    return missed
# ==================== 4. 规律提炼(跨多轮复盘) ====================
def _extract_rules_from_review():
    """Mine recurring signal combinations from recent successful reviews.

    Reads the latest 100 review_log rows with outcome "爆发", counts the
    forward-looking + PA combinations (and each single PA signal), and submits
    every combination seen at least 3 times as a "bonus" candidate through
    ``upsert_strategy_rule_candidate``, unless an existing learned rule has a
    description starting with the same 30 characters.

    Returns:
        A list of rule dicts with ``type``, ``description``, ``conditions``,
        ``score_adjust``, ``source`` and ``candidate_id``.
    """
    conn = get_conn()
    try:
        reviews = conn.execute("""
            SELECT outcome, triggered_signals, hit_signals FROM review_log
            WHERE outcome = '爆发'
            ORDER BY review_time DESC LIMIT 100
        """).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if len(reviews) < 3:
        return []

    # NOTE(review): triggered_signals is stored as signal codes, while
    # get_signal_category matches label text — confirm codes are classified
    # as intended.
    combo_counter = Counter()
    for r in reviews:
        raw = r["triggered_signals"]
        try:
            triggered = json.loads(raw) if isinstance(raw, str) else raw
        except (TypeError, ValueError):
            triggered = []
        if not triggered:
            continue

        # Forward-looking + PA together are the most valuable combinations.
        foresight = [s for s in triggered if get_signal_category(s) == "前瞻"]
        pa_signals = [s for s in triggered if get_signal_category(s) == "PA"]

        if foresight and pa_signals:
            # Short keys, sorted within each category, identify the combination.
            foresight_keys = sorted(_signal_key(s) for s in foresight)
            pa_keys = sorted(_signal_key(s) for s in pa_signals)
            combo_counter[tuple(foresight_keys + pa_keys)] += 1

        # Single PA signals are tracked as well.
        for s in pa_signals:
            combo_counter[(_signal_key(s),)] += 1

    new_rules = []
    existing_rules = get_learned_rules()
    for combo, count in combo_counter.most_common():
        if count < 3:
            continue

        sig_names = [_key_to_label(k) for k in combo]
        description = f"爆发案例中{count}次出现{', '.join(sig_names)}组合 → 此信号组合预测爆发有效"

        already_exists = any(
            er.get("description", "").startswith(description[:30])
            for er in existing_rules
        )
        if already_exists:
            continue

        rule = {
            "type": "bonus",
            "description": description,
            "conditions": {"signal_combo": list(combo), "min_combo_count": 3},
            "score_adjust": 2 if len(combo) >= 2 else 1,
            "source": "review_pattern",
        }
        # Submit to the candidate pool instead of the published strategy.
        rule["candidate_id"] = upsert_strategy_rule_candidate(
            source="review_pattern",
            rule_type=rule.get("type", "bonus"),
            signal_name="+".join(combo),
            rule_description=description,
            support_count=count,
            success_count=count,
            fail_count=0,
            avg_pnl=0,
            max_gain=0,
            max_drawdown=0,
            confidence_score=round(min(95, 45 + count * 8), 1),
            sample_size=count,
            status="candidate",
            notes="由成功样本提炼,等待灰度验证/样本门槛",
        )
        new_rules.append(rule)
    return new_rules
def _signal_key(signal_text):
"""提取信号关键词(简化信号文本用于组合标识)"""
key_map = {
"N倍放量": "vol_Nx",
"量价齐飞": "vp_fly",
"暴力突破供给区": "zone_break",
"静K→动K起爆": "ignition",
"1H放量(3.2倍)": "1h_vol",
"1H放量": "1h_vol",
"4H 3静K蓄力": "3sk_accum",
"静K蓄力": "sk_accum",
"连续3K多头加速": "cont3k",
"连续K加速": "cont_k",
"Q≥7供需区突破": "q7_break",
"供给区突破": "zone_break",
"动K起爆": "dyn_k",
}
for key, short in key_map.items():
if key in signal_text:
return short
# 未知信号取前6字符
return signal_text[:6]
def _key_to_label(key):
"""将信号关键词转换为可读标签"""
label_map = {
"vol_Nx": "N倍放量",
"vp_fly": "量价齐飞",
"zone_break": "供需区突破",
"ignition": "起爆点(静K→动K)",
"1h_vol": "1H放量",
"3sk_accum": "3静K蓄力",
"sk_accum": "静K蓄力",
"cont3k": "连续3K加速",
"cont_k": "连续K加速",
"q7_break": "Q≥7供需区突破",
"dyn_k": "动K起爆",
}
return label_map.get(key, key)
# ==================== 5. 主流程 ====================
def _compute_effect_summary(now, lookback_days=7):
    """Summarise review outcomes inside the look-back window.

    The window starts at the later of ``now - lookback_days`` and the current
    strategy revision start. Placeholder reviews recorded with outcome
    "样本不足" (insufficient price tracking, stored with pnl 0) are excluded,
    so they dilute neither the rates nor the average PnL.

    Args:
        now: Reference time.
        lookback_days: Window length in days.

    Returns:
        A dict with ``window_days``, ``review_count_window``, ``hit_rate_pct``,
        ``fail_rate_pct``, ``flat_rate_pct`` and ``avg_pnl``.
    """
    start_iso = (now - timedelta(days=lookback_days)).isoformat()
    revision_started_at = _get_strategy_revision_started_at()
    effective_start = (
        revision_started_at
        if revision_started_at and revision_started_at > start_iso
        else start_iso
    )

    conn = get_conn()
    try:
        rows = conn.execute("""
            SELECT outcome, pnl_48h FROM review_log
            WHERE review_time >= %s
            ORDER BY review_time DESC
        """, (effective_start,)).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    # Only scored reviews take part in the statistics.
    scored = [r for r in rows if r["outcome"] != "样本不足"]
    total = len(scored)
    if total == 0:
        return {
            "window_days": lookback_days,
            "review_count_window": 0,
            "hit_rate_pct": 0,
            "fail_rate_pct": 0,
            "flat_rate_pct": 0,
            "avg_pnl": 0,
        }

    outcome_counts = Counter(r["outcome"] for r in scored)
    pnl_total = sum(float(r["pnl_48h"] or 0) for r in scored)
    return {
        "window_days": lookback_days,
        "review_count_window": total,
        "hit_rate_pct": round(outcome_counts["爆发"] / total * 100, 1),
        "fail_rate_pct": round(outcome_counts["失败"] / total * 100, 1),
        "flat_rate_pct": round(outcome_counts["横盘"] / total * 100, 1),
        "avg_pnl": round(pnl_total / total, 2),
    }
def _scan_stable_fiat_pollution(now, lookback_days=7):
    """Report excluded bases that leaked into screening or recommendations.

    A symbol counts as polluted when its base is listed in
    ``reverse_analysis.STABLECOINS`` / ``EXCLUDED_BASES`` ("explicit") or ends
    with one of ``reverse_analysis.EXCLUDED_BASE_SUFFIXES`` ("suffix").

    Args:
        now: Reference time.
        lookback_days: Window length in days; the window starts at the later
            of that cutoff and the current strategy revision start.

    Returns:
        A dict with hit counts, the affected symbols, per-layer counts, up to
        10 examples per source and ``status`` ("clean" or "polluted").
    """
    start_iso = (now - timedelta(days=lookback_days)).isoformat()
    revision_started_at = _get_strategy_revision_started_at()
    effective_start = max(start_iso, revision_started_at) if revision_started_at else start_iso

    # A set gives constant-time membership for the per-row classification.
    watch_bases = frozenset(
        list(getattr(reverse_analysis, "STABLECOINS", set()) or [])
        + list(getattr(reverse_analysis, "EXCLUDED_BASES", set()) or [])
    )
    suffixes = tuple(getattr(reverse_analysis, "EXCLUDED_BASE_SUFFIXES", tuple()) or tuple())

    conn = get_conn()
    try:
        screening_rows = conn.execute(
            "SELECT layer, symbol, scan_time FROM screening_log WHERE scan_time >= %s ORDER BY scan_time DESC",
            (effective_start,),
        ).fetchall()
        recommendation_rows = conn.execute(
            "SELECT symbol, rec_time FROM recommendation WHERE rec_time >= %s ORDER BY rec_time DESC",
            (effective_start,),
        ).fetchall()
    finally:
        # Release the connection even when a query raises.
        conn.close()

    def classify(symbol):
        """Return "explicit" / "suffix" for a polluted symbol, else None."""
        if not symbol or "/" not in symbol:
            return None
        base = symbol.split("/")[0].strip().upper()
        if base in watch_bases:
            return "explicit"
        if suffixes and base.endswith(suffixes):
            return "suffix"
        return None

    screening_hits = []
    layer_counts = defaultdict(int)
    for row in screening_rows:
        symbol = row["symbol"]
        reason = classify(symbol)
        if not reason:
            continue
        screening_hits.append({
            "symbol": symbol,
            "layer": row["layer"],
            "time": row["scan_time"],
            "reason": reason,
        })
        layer_counts[row["layer"] or "未知"] += 1

    recommendation_hits = []
    for row in recommendation_rows:
        symbol = row["symbol"]
        reason = classify(symbol)
        if not reason:
            continue
        recommendation_hits.append({
            "symbol": symbol,
            "time": row["rec_time"],
            "reason": reason,
        })

    unique_screening_symbols = sorted({item["symbol"] for item in screening_hits})
    unique_recommendation_symbols = sorted({item["symbol"] for item in recommendation_hits})
    contaminated_symbols = sorted(set(unique_screening_symbols) | set(unique_recommendation_symbols))
    return {
        "window_days": lookback_days,
        "effective_start": effective_start,
        "screening_hit_count": len(screening_hits),
        "recommendation_hit_count": len(recommendation_hits),
        "contaminated_symbol_count": len(contaminated_symbols),
        "screening_symbols": unique_screening_symbols,
        "recommendation_symbols": unique_recommendation_symbols,
        "contaminated_symbols": contaminated_symbols,
        "layer_counts": dict(layer_counts),
        "screening_examples": screening_hits[:10],
        "recommendation_examples": recommendation_hits[:10],
        "status": "clean" if not contaminated_symbols else "polluted",
    }
def _safe_json_load(value, default=None):
if default is None:
default = []
if value is None:
return default
if isinstance(value, (list, dict)):
return value
try:
return json.loads(value)
except Exception:
return default
def _classify_failure_type(review):
"""失败样本二级归因。
重点防止“历史事件冒充当前信号”:凡出现过期/历史/旧放量/age_bars等提示
优先归为过期因子误判,后续作为策略质量治理的高优先级失败模式。
"""
signals = review.get("triggered_signals") or []
miss = review.get("miss_signals") or []
lesson = review.get("lesson") or ""
text = " ".join([str(x) for x in signals + miss]) + " " + lesson
pnl = float(review.get("pnl_48h") or 0)
outcome = review.get("outcome") or ""
if any(k in text for k in ["过期", "历史", "旧放量", "age_bars", "已过期", "小时前", "旧起爆"]):
return "过期因子误判", "历史放量/起爆/突破不能当作当前触发信号,必须做时效闸门"
if any(k in text for k in ["假突破", "突破失败", "未站稳", "冲高回落"]):
return "假突破", "突破后没有站稳或快速回落,需要增加站稳/承接确认"
if any(k in text for k in ["量价背离", "缩量上涨", "放量下跌", "无量拉升"]):
return "量价背离", "价格动作与成交量不匹配,量能确认不足"
if any(k in text for k in ["高位", "追高", "涨幅过大", "乖离"]):
return "追高风险", "入场位置偏高,盈亏比和回撤风险恶化"
if any(k in text for k in ["承接不足", "无承接", "上影线", "砸盘"]):
return "高位无承接", "高位出现抛压但缺少买盘承接"
if any(k in text for k in ["板块退潮", "热点退潮", "龙头走弱", "板块分歧"]):
return "板块退潮", "板块热度回落,个币信号容易失效"
if any(k in text for k in ["BTC", "大盘", "反向共振", "系统性"]):
return "BTC/大盘反向共振", "大盘方向与个币信号冲突,需要宏观/主流币过滤"
if any(k in text for k in ["止损", "盈亏比", "RR", "止盈"]):
return "止损/盈亏比不合理", "止损或止盈结构不合理,导致信号收益风险不匹配"
if "滞后" in text or "MACD" in text or "RSI" in text:
return "滞后信号追高", "滞后指标占比高,容易形成事后确认/追高失败"
if "缺乏前瞻" in text or "前瞻" not in text:
return "前瞻信号不足", "缺少量价/PA等前瞻性确认"
if "横盘" in lesson or outcome == "横盘":
return "信号强度不足", "触发后未形成有效爆发,确认条件偏弱"
if "回撤" in text or pnl < -3:
return "入场点太晚", "入场后回撤/亏损明显,买点可能滞后或确认过慢"
return "未分类失败", "需要继续积累样本做二级归因"
def _build_dual_attribution(results, effect_summary):
"""双向归因:成功因子 + 失败原因,并生成候选规则。"""
reviews = results.get("review_details") or []
success_reviews = [r for r in reviews if r.get("outcome") == "爆发"]
failure_reviews = [r for r in reviews if r.get("outcome") in ("失败", "横盘")]
signal_success = Counter()
signal_failure = Counter()
failure_types = Counter()
candidate_rules = []
for r in success_reviews:
for sig in r.get("hit_signals") or r.get("triggered_signals") or []:
signal_success[_signal_key(str(sig))] += 1
for r in failure_reviews:
ftype, reason = _classify_failure_type(r)
failure_types[ftype] += 1
symbol = r.get("symbol") or ""
record_strategy_failure_pattern(
symbol=symbol,
version="",
failure_type=ftype,
failure_reason=reason,
signal_combo=r.get("triggered_signals") or [],
market_context={"outcome": r.get("outcome")},
entry_quality_issue=reason,
pnl_pct=float(r.get("pnl_48h") or 0),
max_drawdown_pct=0,
lesson=r.get("lesson") or "",
)
for sig in r.get("miss_signals") or r.get("triggered_signals") or []:
signal_failure[_signal_key(str(sig))] += 1
for key, cnt in signal_success.most_common(8):
fail_cnt = signal_failure.get(key, 0)
sample = cnt + fail_cnt
if sample < 2:
continue
confidence = round((cnt / sample) * 100, 1)
desc = f"成功样本中高频因子:{_key_to_label(key)},成功{cnt}次/失败{fail_cnt}"
cid = upsert_strategy_rule_candidate(
source="dual_attribution_success",
rule_type="bonus",
signal_name=key,
rule_description=desc,
support_count=sample,
success_count=cnt,
fail_count=fail_cnt,
confidence_score=confidence,
sample_size=sample,
status="candidate" if sample < 10 or confidence < 65 else "gray",
notes="双向归因生成:成功因子需和失败样本对照,避免只看成功过拟合",
source_ref=f"review:{key}",
)
candidate_rules.append({"id": cid, "type": "bonus", "signal": key, "description": desc, "confidence_score": confidence, "sample_size": sample, "status": "candidate" if sample < 10 or confidence < 65 else "gray"})
for ftype, cnt in failure_types.most_common(6):
desc = f"失败模式:{ftype} 出现{cnt}次,需要降低同类触发权重或增加确认条件"
cid = upsert_strategy_rule_candidate(
source="dual_attribution_failure",
rule_type="penalty",
signal_name=ftype,
rule_description=desc,
support_count=cnt,
success_count=0,
fail_count=cnt,
confidence_score=round(min(90, 45 + cnt * 10), 1),
sample_size=cnt,
status="candidate",
notes="失败归因生成:先入候选池,不立即改已发布策略",
source_ref=f"failure:{ftype}",
)
candidate_rules.append({"id": cid, "type": "penalty", "signal": ftype, "description": desc, "confidence_score": round(min(90, 45 + cnt * 10), 1), "sample_size": cnt, "status": "candidate"})
success_analysis = {
"sample_count": len(success_reviews),
"top_success_factors": [{"signal": k, "label": _key_to_label(k), "count": v} for k, v in signal_success.most_common(10)],
}
failure_analysis = {
"sample_count": len(failure_reviews),
"failure_types": [{"type": k, "count": v} for k, v in failure_types.most_common(10)],
"top_failure_signals": [{"signal": k, "label": _key_to_label(k), "count": v} for k, v in signal_failure.most_common(10)],
}
resolved = len(success_reviews) + len(failure_reviews)
hit_rate = (effect_summary or {}).get("hit_rate_pct", 0)
avg_pnl = (effect_summary or {}).get("avg_pnl", 0)
if resolved < 20:
release_decision = "hold"
release_reason = f"有效样本{resolved}<20本轮只研究不发布避免样本不足乱升版"
confidence_level = "low"
promotion_state = "research_only"
elif hit_rate >= 55 and avg_pnl > 0 and candidate_rules:
release_decision = "gray"
release_reason = f"有效样本{resolved},命中率{hit_rate}%,均值{avg_pnl}%,候选规则进入灰度观察"
confidence_level = "medium"
promotion_state = "gray"
else:
release_decision = "hold"
release_reason = f"有效样本{resolved},但命中率/收益或候选规则稳定性不足,继续研究"
confidence_level = "medium" if resolved >= 20 else "low"
promotion_state = "research_only"
return {
"success_analysis": success_analysis,
"failure_analysis": failure_analysis,
"candidate_rules": candidate_rules,
"candidate_performance": [],
"release_decision": release_decision,
"release_reason": release_reason,
"confidence_level": confidence_level,
"promotion_state": promotion_state,
}
def _build_iteration_log(results, current_meta, now, config_diff=None, effect_summary=None, pollution_summary=None):
hit_count = sum(1 for r in results.get("review_details", []) if r.get("outcome") == "爆发")
fail_count = sum(1 for r in results.get("review_details", []) if r.get("outcome") == "失败")
flat_count = sum(1 for r in results.get("review_details", []) if r.get("outcome") == "横盘")
insufficient_count = sum(1 for r in results.get("review_details", []) if r.get("outcome") == "样本不足")
effective_review_count = hit_count + fail_count + flat_count
findings = []
problems = []
actions = []
changed_rules = []
related_symbols = []
strategy_version = str(current_meta.get("strategy_version") or "").strip()
version_change_parts = []
for item in results.get("review_details", [])[:12]:
symbol = item.get("symbol")
if symbol:
related_symbols.append(symbol)
lesson = item.get("lesson")
if lesson and len(findings) < 6:
findings.append(f"{symbol}: {lesson}")
if item.get("outcome") == "失败" and symbol and len(problems) < 5:
problems.append(f"{symbol} 复盘结果为失败,需检查触发信号是否偏滞后或追高")
elif item.get("outcome") == "横盘" and symbol and len(problems) < 5:
problems.append(f"{symbol} 仅横盘,说明信号强度不足或确认条件不够")
elif item.get("outcome") == "样本不足" and symbol and len(problems) < 5:
problems.append(f"{symbol} 缺少48h price_tracking窗口已跳过绩效计权")
for adj in results.get("weight_adjustments", [])[:8]:
actions.append(adj)
changed_rules.append({"type": "weight_adjustment", "detail": adj})
if len(version_change_parts) < 4:
version_change_parts.append(f"权重调整:{adj}")
for upd in results.get("factor_weight_updates", [])[:8]:
detail = (
f"{upd.get('signal')}: {upd.get('action')} "
f"{upd.get('old_weight')}{upd.get('new_weight')} "
f"(样本{upd.get('sample_size')}, 命中{upd.get('hit_rate')}%, 均值{upd.get('avg_pnl')}%)"
)
actions.append(detail)
changed_rules.append({"type": "factor_weight_governance", **upd})
if len(version_change_parts) < 4:
version_change_parts.append(f"因子{upd.get('action')}{upd.get('signal')}")
for rule in results.get("new_learned_rules", [])[:6]:
desc = rule.get("description", "")
if desc:
actions.append(f"新增规律: {desc}")
changed_rules.append({
"type": "learned_rule",
"rule_id": rule.get("rule_id"),
"description": desc,
"score_adjust": rule.get("score_adjust"),
})
if len(version_change_parts) < 4:
version_change_parts.append(f"新增规律:{desc}")
# 信号淘汰/降权操作
for dep_action in results.get("signal_deprecations", [])[:6]:
actions.append(dep_action)
changed_rules.append({"type": "signal_deprecation", "detail": dep_action})
if len(version_change_parts) < 4:
version_change_parts.append(f"信号淘汰:{dep_action}")
reverse_results = results.get("reverse_analysis") or {}
if reverse_results and not reverse_results.get("error"):
summary = reverse_results.get("summary")
if summary:
findings.append(f"逆向分析: {summary}")
if reverse_results.get("new_rules"):
for rule in reverse_results.get("new_rules", [])[:4]:
desc = rule.get("description", "")
if desc:
actions.append(f"逆向分析新增规律: {desc}")
changed_rules.append({
"type": "reverse_rule",
"rule_id": rule.get("rule_id"),
"description": desc,
})
if len(version_change_parts) < 4:
version_change_parts.append(f"逆向分析规律:{desc}")
elif reverse_results.get("error"):
problems.append(f"逆向分析失败: {reverse_results.get('error')}")
for item in results.get("missed_explosions", [])[:5]:
symbol = item.get("symbol")
if symbol:
related_symbols.append(symbol)
problems.append(f"漏选 {symbol}: {item.get('reason') or '未记录原因'}")
lesson = item.get("lesson")
if lesson and len(findings) < 8:
findings.append(f"{symbol}: {lesson}")
if not findings:
findings.append("本轮暂无新增高价值发现,更多是常规复盘巡检。")
if not problems:
problems.append("本轮未发现显著新问题,但仍需继续观察样本规模是否足够。")
if not actions:
actions.append("本轮未触发显式参数改动,先继续积累样本。")
diff = config_diff or {"changed": [], "added": [], "removed": []}
if not version_change_parts:
changed_paths = [item.get("path") for item in (diff.get("changed") or []) if item.get("path") and not str(item.get("path")).startswith("meta.")]
added_paths = [item.get("path") for item in (diff.get("added") or []) if item.get("path") and not str(item.get("path")).startswith("meta.")]
removed_paths = [item.get("path") for item in (diff.get("removed") or []) if item.get("path") and not str(item.get("path")).startswith("meta.")]
if changed_paths:
version_change_parts.append(f"参数改动:{', '.join(changed_paths[:3])}")
if added_paths:
version_change_parts.append(f"新增配置:{', '.join(added_paths[:3])}")
if removed_paths:
version_change_parts.append(f"移除配置:{', '.join(removed_paths[:3])}")
version_change_summary = f"{strategy_version or '当前版本'}" + "".join(version_change_parts[:4]) if version_change_parts else "本轮暂无显式规则改动,继续积累样本"
title = f"{current_meta.get('iteration_count', 0) + 1}轮复盘迭代"
summary = results.get("summary") or ""
metrics = {
"reviews_done": results.get("reviews_done", 0),
"effective_review_count": effective_review_count,
"hit_count": hit_count,
"fail_count": fail_count,
"flat_count": flat_count,
"insufficient_tracking_count": insufficient_count,
"missed_explosions": len(results.get("missed_explosions", [])),
"weight_adjustments": len(results.get("weight_adjustments", [])),
"factor_weight_updates": len(results.get("factor_weight_updates", [])),
"signal_deprecations": len(results.get("signal_deprecations", [])),
"candidate_rules": len(results.get("candidate_rules", [])),
"new_learned_rules": 0,
}
if pollution_summary:
contaminated_count = int(pollution_summary.get("contaminated_symbol_count") or 0)
screening_hits = int(pollution_summary.get("screening_hit_count") or 0)
recommendation_hits = int(pollution_summary.get("recommendation_hit_count") or 0)
metrics["stable_fiat_pollution_symbols"] = contaminated_count
metrics["stable_fiat_screening_hits"] = screening_hits
metrics["stable_fiat_recommendation_hits"] = recommendation_hits
if contaminated_count > 0:
polluted_symbols = pollution_summary.get("contaminated_symbols") or []
symbol_preview = ", ".join(polluted_symbols[:4]) if polluted_symbols else "--"
findings.append(f"稳定币/法币污染巡检: 窗口内发现 {contaminated_count} 个污染币对,示例 {symbol_preview}")
problems.append(f"稳定币/法币污染仍存在:筛选命中 {screening_hits} 次,推荐命中 {recommendation_hits}")
actions.append("已将稳定币/法币污染巡检写入 iteration log后续可直接在复盘页查看")
if len(version_change_parts) < 4:
version_change_parts.append("新增稳定币/法币污染巡检日志")
else:
findings.append("稳定币/法币污染巡检通过:窗口内未发现污染币对进入筛选或推荐")
return {
"run_date": now.date().isoformat(),
"trigger_source": "daily_review",
"title": title,
"summary": summary,
"findings": findings[:8],
"problems": problems[:8],
"actions": actions[:10],
"changed_rules": changed_rules[:12],
"metrics": metrics,
"related_symbols": list(dict.fromkeys(related_symbols))[:12],
"config_diff": diff,
"effect_summary": effect_summary or {},
"pollution_summary": pollution_summary or {},
"strategy_version": strategy_version,
"version_change_summary": version_change_summary,
}
def _release_candidate_rules_if_ready(dual_attribution, effect_summary):
"""正式发布闸门:只有样本和效果达标,才把灰度候选写入 learned_rules 并升版。"""
candidates = get_strategy_rule_candidates(limit=30, status="gray")
resolved = int((dual_attribution.get("success_analysis") or {}).get("sample_count", 0)) + int((dual_attribution.get("failure_analysis") or {}).get("sample_count", 0))
hit_rate = float((effect_summary or {}).get("hit_rate_pct") or 0)
avg_pnl = float((effect_summary or {}).get("avg_pnl") or 0)
releaseable = [
c for c in candidates
if int(c.get("sample_size") or 0) >= 10
and float(c.get("confidence_score") or 0) >= 65
and not str(c.get("source") or "").startswith("dirty_history")
and "dirty_history" not in str(c.get("source_ref") or "")
and "污染历史" not in str(c.get("notes") or "")
]
if resolved < 20 or hit_rate < 55 or avg_pnl <= 0 or not releaseable:
return {
"released": False,
"release_decision": dual_attribution.get("release_decision") or "hold",
"release_reason": dual_attribution.get("release_reason") or f"发布闸门未通过:有效样本{resolved}、命中率{hit_rate}%、均值{avg_pnl}%、可发布候选{len(releaseable)}",
"released_rules": [],
"new_version": "",
}
note = f"候选规则正式发布{len(releaseable[:5])}条;样本{resolved};命中率{hit_rate}%;均值{avg_pnl}%"
old_ver, new_ver = bump_strategy_patch_version(note)
released = []
for cand in releaseable[:5]:
rule_id = promote_candidate_rule_to_learned_rule(cand, release_version=new_ver)
update_strategy_rule_candidate_status(cand["id"], "active", release_version=new_ver, notes_append=f"发布闸门通过,{old_ver}{new_ver}learned_rule={rule_id}")
released.append({"candidate_id": cand["id"], "rule_id": rule_id, "description": cand.get("rule_description"), "new_version": new_ver})
return {
"released": True,
"release_decision": "release",
"release_reason": note,
"released_rules": released,
"new_version": new_ver,
}
def _iteration_log_dual_fields(dual_attribution):
"""Keep only fields supported by log_strategy_iteration()."""
allowed = {
"success_analysis",
"failure_analysis",
"candidate_rules",
"release_decision",
"release_reason",
"confidence_level",
"promotion_state",
}
return {k: v for k, v in (dual_attribution or {}).items() if k in allowed}
def run_review(push_enabled: bool = True, compact: bool = False):
    """Run the full review pipeline and print the results as JSON.

    Steps, in order: review every reviewable recommendation, adjust signal
    weights and apply factor-weight governance, deprecate low-performance
    signals, scan for missed explosions, extract candidate rules, run the
    reverse analysis, build the text summary, update the iteration metadata,
    run dual attribution and the release gate, and persist the iteration log.

    Args:
        push_enabled: Accepted for CLI compatibility; not referenced inside
            this function body.
        compact: When true the JSON is printed without indentation.

    Returns:
        The ``results`` dict that was printed.
    """
    before_rules = get_rules_snapshot()
    now = datetime.now()

    # Recommendations old enough to be reviewed.
    reviewable = _get_reviewable_recommendations(now)

    results = {
        "reviews_done": 0,
        "review_details": [],
        "weight_adjustments": [],
        "factor_weight_updates": [],
        "signal_deprecations": [],
        "missed_explosions": [],
        "new_learned_rules": [],
        "candidate_rules": [],
        "candidate_performance": [],
        "reverse_analysis": None,
        "summary": "",
    }

    # 1. Attribution review of each recommendation.
    for rec in reviewable:
        results["review_details"].append(review_recommendation(dict(rec)))
        results["reviews_done"] += 1

    # 2. Signal weight adjustment.
    results["weight_adjustments"] = adjust_signal_weights()
    results["factor_weight_updates"] = _apply_daily_factor_weight_governance()

    # 2.5 Deprecate / down-weight signals with a low hit rate.
    results["signal_deprecations"] = _deprecate_low_performance_signals()

    # 3. Missed-explosion review.
    results["missed_explosions"] = scan_missed_explosions()

    # 4. Rule extraction across review rounds; results stay candidates.
    results["candidate_rules"] = _extract_rules_from_review()
    results["new_learned_rules"] = []

    # 5. Reverse analysis of the gainers list. Its failure must not abort the
    # run. Per the original note, reverse_analysis writes its own candidates,
    # so nothing is upserted again here.
    try:
        results["reverse_analysis"] = reverse_analysis.run_reverse_analysis()
    except Exception as e:
        print(f"[review_engine] 逆向分析失败: {e}")
        results["reverse_analysis"] = {"error": str(e)}

    # 6. Build the summary.
    def count_outcome(label):
        return sum(1 for r in results["review_details"] if r["outcome"] == label)

    hit_count = count_outcome("爆发")
    fail_count = count_outcome("失败")
    flat_count = count_outcome("横盘")
    insufficient_count = count_outcome("样本不足")
    effective_review_count = hit_count + fail_count + flat_count

    # Signal performance ranking, best hit rate first.
    weights = get_signal_weights()
    sig_summary = [
        f"{sig}({data['category']}): 命中{data['hit_rate']}% 权重{data['weight']}"
        for sig, data in sorted(weights.items(), key=lambda x: x[1]["hit_rate"], reverse=True)
    ]

    results["summary"] = (
        f"本次复盘{results['reviews_done']}条推荐: "
        f"有效计权{effective_review_count}条,样本不足{insufficient_count}条;"
        f"爆发{hit_count} 横盘{flat_count} 失败{fail_count} | "
        f"漏选爆发{len(results['missed_explosions'])}只 | "
        f"权重调整{len(results['weight_adjustments'])}项 | "
        f"因子生效调整{len(results['factor_weight_updates'])}项 | "
        f"信号淘汰{len(results['signal_deprecations'])}项 | "
        f"候选规则{len(results.get('candidate_rules', []))}\n"
        f"信号绩效排名: {', '.join(sig_summary[:5])}"
    )
    if results["signal_deprecations"]:
        results["summary"] += (
            f"\n信号淘汰: {'; '.join(results['signal_deprecations'][:5])}"
        )

    # 8. Update iteration metadata.
    update_meta("last_review", now.isoformat())
    current_meta = {}
    try:
        current_meta = get_meta()
    except Exception as e:
        # Best effort: continue with empty meta, but report the failure on
        # stderr so the JSON printed on stdout stays parseable.
        print(f"[review_engine] 读取meta失败: {e}", file=sys.stderr)

    after_rules = get_rules_snapshot()
    config_diff = diff_rule_snapshots(before_rules, after_rules)
    effect_summary = _compute_effect_summary(now, lookback_days=7)
    pollution_summary = _scan_stable_fiat_pollution(now, lookback_days=7)
    dual_attribution = _build_dual_attribution(results, effect_summary)
    results["candidate_performance"] = refresh_strategy_candidate_performance()

    release_gate = _release_candidate_rules_if_ready(dual_attribution, effect_summary)
    released = bool(release_gate.get("released"))
    if released:
        # A release bumps the version and rewrites rules; re-read both.
        current_meta = get_meta()
        after_rules = get_rules_snapshot()
        config_diff = diff_rule_snapshots(before_rules, after_rules)

    iteration_log = _build_iteration_log(
        results,
        current_meta,
        now,
        config_diff=config_diff,
        effect_summary=effect_summary,
        pollution_summary=pollution_summary,
    )
    iteration_log.update(_iteration_log_dual_fields(dual_attribution))
    iteration_log["release_decision"] = release_gate.get("release_decision") or dual_attribution.get("release_decision")
    iteration_log["release_reason"] = release_gate.get("release_reason") or dual_attribution.get("release_reason")
    if released:
        released_rules = release_gate.get("released_rules", [])
        iteration_log["strategy_version"] = release_gate.get("new_version") or iteration_log.get("strategy_version")
        iteration_log["version_change_summary"] = release_gate.get("release_reason")
        iteration_log["changed_rules"] = (iteration_log.get("changed_rules") or []) + [
            {"type": "candidate_release", **r} for r in released_rules
        ]
        iteration_log["actions"] = (iteration_log.get("actions") or []) + [
            f"正式发布候选规则:{len(released_rules)}"
        ]
    elif dual_attribution.get("release_decision") == "hold":
        iteration_log["version_change_summary"] = dual_attribution.get("release_reason") or iteration_log.get("version_change_summary")
        iteration_log["changed_rules"] = []
        iteration_log["actions"] = (iteration_log.get("actions") or []) + ["候选规则已入池,本轮未正式发布新版本"]
    log_strategy_iteration(**iteration_log)

    # `or 0` tolerates counters that are missing or stored as None.
    update_meta("total_reviews", (current_meta.get("total_reviews") or 0) + 1)
    update_meta("iteration_count", (current_meta.get("iteration_count") or 0) + 1)

    print(json.dumps(results, ensure_ascii=False, indent=None if compact else 2))
    return results
if __name__ == "__main__":
    # Command-line entry point: parse the two switches and run one review.
    import argparse

    cli = argparse.ArgumentParser(description="AlphaX Agent 复盘引擎")
    cli.add_argument("--no-push", action="store_true", help="只运行复盘,不发送飞书通知")
    cli.add_argument("--compact", action="store_true", help="输出紧凑 JSON便于脚本消费")
    options = cli.parse_args()
    # --no-push is inverted into the push_enabled keyword.
    run_review(push_enabled=not options.no_push, compact=options.compact)