alphax/app/analysis/reverse_analysis.py
2026-05-29 10:09:30 +08:00

669 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
逆向分析模块 — 从涨幅榜复盘,提取起爆前共性特征,发现新规律
核心逻辑:
1. 拉Binance 24h涨幅榜Top N把“已经涨起来的币”只作为事后标签。
2. 对未被推荐的暴涨币截掉最近24h起爆段只回溯其起爆前/启动点K线。
3. 用full_pa_analysis()提取涨前特征连续K、起爆点、供需区(Q≥7)、静K蓄力、量价模式。
4. 检查板块联动(同板块是否有其他币也暴涨),并用未大涨高成交额样本做对照组。
5. 统计涨前共性特征占比和对照组lift只生成候选规则不能直接变成追涨买入依据。
6. 返回结构化结果供feishu推送和策略迭代中心展示。
"""
import json
import os
import sys
import time
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import requests
import pandas as pd
sys.path.insert(0, os.path.dirname(__file__))
from app.db.altcoin_db import get_conn, record_missed_explosion, upsert_strategy_rule_candidate
from app.core.pa_engine import full_pa_analysis, classify_candles, calc_atr
from app.core.sector_map import get_sector_for_coin, COIN_TO_SECTORS, SECTOR_MEMBERS
from app.config import config_loader
from app.services.altcoin_screener import (
STABLECOINS,
WRAPPED,
GOLD_METAL,
EXCLUDED_BASES,
EXCLUDED_BASE_SUFFIXES,
)
BINANCE_API = "https://api.binance.com/api/v3"
FEATURE_LABELS = {
"has_ignition_point": "起爆点(静K→动K转折)",
"has_q7_zone": "Q≥7高质量供需区",
"has_q7_demand_nearby": "Q≥7需求区在起爆价附近(<3%)",
"has_continuous_k": "连续K多头加速",
"has_static_accumulation": "静K蓄力(占比≥15%)",
"has_volume_surge_before": "起爆前2倍+放量",
"has_bullish_breakout_pattern": "连续5K收盘价抬高",
}
def _is_altcoin_usdt_symbol(symbol_str):
if not symbol_str or not symbol_str.endswith("USDT"):
return False
base = symbol_str.replace("USDT", "").upper()
if base in STABLECOINS or base in WRAPPED or base in GOLD_METAL:
return False
if base in EXCLUDED_BASES or base.endswith(EXCLUDED_BASE_SUFFIXES):
return False
return base.isascii()
# ==================== 数据获取 ====================
def fetch_24h_tickers():
"""获取Binance所有USDT交易对的24h行情"""
try:
resp = requests.get(f"{BINANCE_API}/ticker/24hr", timeout=15)
if resp.status_code != 200:
return []
return resp.json()
except Exception as e:
print(f"[reverse_analysis] fetch_24h_tickers failed: {e}")
return []
def fetch_klines_before(symbol, lookback_hours=72, interval="1h"):
"""
获取起爆前的K线数据
取最近(lookback_hours+24)根1H K线截掉最后24h起爆段只看起爆前的蓄力期
"""
total_limit = lookback_hours + 24 # 起爆前 + 起爆段
try:
resp = requests.get(f"{BINANCE_API}/klines", params={
"symbol": symbol.replace("/", ""),
"interval": interval,
"limit": total_limit,
}, timeout=10)
if resp.status_code != 200:
return [], []
raw = resp.json()
klines = [{"time": k[0], "open": float(k[1]), "high": float(k[2]),
"low": float(k[3]), "close": float(k[4]), "volume": float(k[5])}
for k in raw]
# 分割前lookback_hours是起爆前后24h是起爆段
explosion_start = max(0, len(klines) - 24)
pre_explosion = klines[:explosion_start]
explosion_segment = klines[explosion_start:]
return pre_explosion, explosion_segment
except Exception as e:
print(f"[reverse_analysis] fetch_klines_before({symbol}) failed: {e}")
return [], []
def get_recommended_symbols(hours=72):
"""获取过去N小时内被推荐过的币种列表"""
conn = get_conn()
cutoff = (datetime.now() - timedelta(hours=float(hours or 72))).isoformat()
rows = conn.execute("""
SELECT symbol FROM recommendation
WHERE rec_time >= %s
""", (cutoff,)).fetchall()
conn.close()
return set(r["symbol"] for r in rows)
# ==================== 特征提取 ====================
def extract_pre_explosion_features(symbol, pre_klines, explosion_klines, config):
"""
对起爆前K线做full_pa_analysis提取特征字典
"""
features = {
"has_ignition_point": False,
"ignition_count": 0,
"ignition_details": [],
"has_q7_zone": False,
"q7_zones": [],
"q_max": 0,
"has_continuous_k": False,
"continuous_k_count": 0,
"continuous_k_details": [],
"has_static_accumulation": False,
"static_k_count": 0,
"static_ratio": 0.0,
"has_volume_surge_before": False,
"vol_surge_ratio": 0.0,
"has_bullish_breakout_pattern": False,
}
if not pre_klines or len(pre_klines) < 30:
return features
# 转DataFrame做PA分析
df = pd.DataFrame(pre_klines)
df["time"] = pd.to_datetime(df["time"], unit="ms")
pa_result = full_pa_analysis(df, timeframe="1h")
feature_config = config.get("feature_extraction", {})
# 1. 起爆点特征
if feature_config.get("check_ignition", True):
ignition_points = pa_result.get("ignition_points", [])
bullish_ignitions = [ip for ip in ignition_points if ip["direction"] == 1]
features["has_ignition_point"] = len(bullish_ignitions) > 0
features["ignition_count"] = len(bullish_ignitions)
features["ignition_details"] = bullish_ignitions
# 2. 供需区特征特别是Q≥7
if feature_config.get("check_supply_demand", True):
zones = pa_result.get("zones", [])
q7_zones = [z for z in zones if z["q_score"] >= 7]
demand_q7 = [z for z in q7_zones if z["type"] == "demand"]
features["has_q7_zone"] = len(q7_zones) > 0
features["q7_zones"] = q7_zones
features["q_max"] = max((z["q_score"] for z in zones), default=0)
# 特别标注是否有Q≥7需求区在起爆前价格附近
if demand_q7 and len(pre_klines) > 0:
last_price = pre_klines[-1]["close"]
nearby_demand = [z for z in demand_q7 if abs(z["top"] - last_price) / last_price < 0.03]
features["has_q7_demand_nearby"] = len(nearby_demand) > 0
# 3. 连续K加速特征
if feature_config.get("check_continuous_k", True):
continuous_k = pa_result.get("continuous_k", [])
bullish_cont = [ck for ck in continuous_k if ck["type"] == "bullish_continue"]
features["has_continuous_k"] = len(bullish_cont) > 0
features["continuous_k_count"] = len(bullish_cont)
features["continuous_k_details"] = bullish_cont
# 4. 静K蓄力特征
candles_class = pa_result.get("candles_class", [])
static_ks = [c for c in candles_class if c["type"] == "static"]
features["static_k_count"] = len(static_ks)
features["static_ratio"] = len(static_ks) / len(candles_class) if candles_class else 0
features["has_static_accumulation"] = features["static_ratio"] >= 0.15 # 静K占比≥15%
# 5. 起爆前放量特征
if feature_config.get("check_volume_pattern", True) and len(pre_klines) >= 20:
recent_10 = pre_klines[-10:]
older_10 = pre_klines[-20:-10]
avg_recent_vol = sum(k["volume"] for k in recent_10) / len(recent_10)
avg_older_vol = sum(k["volume"] for k in older_10) / len(older_10) if older_10 else 1
vol_surge_ratio = avg_recent_vol / avg_older_vol if avg_older_vol > 0 else 0
features["has_volume_surge_before"] = vol_surge_ratio >= 2.0
features["vol_surge_ratio"] = round(vol_surge_ratio, 2)
# 6. 多头突破模式最后几根K线close连续抬高
if len(pre_klines) >= 5:
last_5_closes = [k["close"] for k in pre_klines[-5:]]
bullish_breakout = all(last_5_closes[i] > last_5_closes[i - 1] for i in range(1, len(last_5_closes)))
features["has_bullish_breakout_pattern"] = bullish_breakout
# 7. 4H 和 1D 多周期分析从config读取lookback_hours默认168/720
lookback_4h = config.get("lookback_hours_4h", 168)
lookback_1d = config.get("lookback_hours_1d", 720)
features["has_4h_ignition"] = False
features["has_daily_demand_zone"] = False
features["daily_trend_up"] = False
try:
# 4H K线分析
pre_4h, _ = fetch_klines_before(symbol, lookback_4h, interval="4h")
if pre_4h and len(pre_4h) >= 8:
df_4h = pd.DataFrame(pre_4h)
df_4h["time"] = pd.to_datetime(df_4h["time"], unit="ms")
pa_4h = full_pa_analysis(df_4h, timeframe="4h")
ignition_4h = pa_4h.get("ignition_points", [])
features["has_4h_ignition"] = any(ip["direction"] == 1 for ip in ignition_4h)
except Exception:
pass
try:
# 1D K线分析
pre_1d, _ = fetch_klines_before(symbol, lookback_1d, interval="1d")
if pre_1d and len(pre_1d) >= 5:
df_1d = pd.DataFrame(pre_1d)
df_1d["time"] = pd.to_datetime(df_1d["time"], unit="ms")
pa_1d = full_pa_analysis(df_1d, timeframe="1d")
zones_1d = pa_1d.get("zones", [])
demand_zones_1d = [z for z in zones_1d if z["type"] == "demand" and z["q_score"] >= 5]
features["has_daily_demand_zone"] = len(demand_zones_1d) > 0
# 日线趋势最后5根日线close是否整体向上
closes_1d = [k["close"] for k in pre_1d[-5:]]
features["daily_trend_up"] = closes_1d[-1] > closes_1d[0] if len(closes_1d) >= 3 else False
except Exception:
pass
return features
def check_sector_alignment(symbol, top_gainers, config):
"""
检查板块联动:同板块其他币是否也在涨幅榜
返回: {sector_name, sector_coin_count, sector_top_gainer_count, is_sector_hot}
"""
if not config.get("feature_extraction", {}).get("check_sector_alignment", True):
return {"sector": "", "sector_coin_count": 0, "sector_top_gainer_count": 0, "is_sector_hot": False}
sectors = get_sector_for_coin(symbol)
if not sectors:
return {"sector": "未知", "sector_coin_count": 0, "sector_top_gainer_count": 0, "is_sector_hot": False}
# 主板块取第一个
primary_sector = sectors[0]
sector_coins = SECTOR_MEMBERS.get(primary_sector, [])
# 统计同板块有多少币在涨幅榜
sector_in_gainers = []
for g in top_gainers:
if g["symbol"] in sector_coins:
sector_in_gainers.append(g)
# 板块联动阈值≥3只同板块币涨幅榜 → 板块热
is_hot = len(sector_in_gainers) >= 3
return {
"sector": primary_sector,
"sector_coin_count": len(sector_coins),
"sector_top_gainer_count": len(sector_in_gainers),
"sector_gainer_symbols": [g["symbol"] for g in sector_in_gainers],
"is_sector_hot": is_hot,
}
# ==================== 共性特征统计与规律发现 ====================
def compute_pattern_summary(all_features, total_count, control_features=None):
"""
统计所有top gainer起爆前窗口的共性特征占比
返回: [{feature_name, count, percentage, description}]
"""
if total_count == 0:
return []
feature_counts = Counter()
feature_details = defaultdict(list)
for feat in all_features:
for key, value in feat.items():
if key.startswith("has_") and value:
feature_counts[key] += 1
# 收集具体描述
detail_key = key.replace("has_", "") + "_details"
if detail_key in feat and feat[detail_key]:
feature_details[key].extend(feat[detail_key])
control_features = control_features or []
control_total = len(control_features)
control_counts = Counter()
for feat in control_features:
for key, value in feat.items():
if key.startswith("has_") and value:
control_counts[key] += 1
summary = []
for feat_key, count in feature_counts.most_common():
pct = round(count / total_count * 100, 1)
control_count = control_counts.get(feat_key, 0)
control_pct = round(control_count / control_total * 100, 1) if control_total else 0.0
lift = round((pct + 1.0) / (control_pct + 1.0), 2) if control_total else 0.0
label = FEATURE_LABELS.get(feat_key, feat_key)
summary.append({
"feature": feat_key,
"label": label,
"count": count,
"percentage": pct,
"control_count": control_count,
"control_percentage": control_pct,
"lift": lift,
"total": total_count,
"control_total": control_total,
})
# 补充即使未达到阈值的特征也记录只是不触发learned_rule
for feat_key, label in FEATURE_LABELS.items():
if feat_key not in feature_counts:
summary.append({
"feature": feat_key,
"label": label,
"count": 0,
"percentage": 0.0,
"control_count": control_counts.get(feat_key, 0),
"control_percentage": round(control_counts.get(feat_key, 0) / control_total * 100, 1) if control_total else 0.0,
"lift": 0.0,
"total": total_count,
"control_total": control_total,
})
# 排序:优先看相对对照组的提升,再看涨幅榜占比
summary.sort(key=lambda x: (x.get("lift", 0), x["percentage"]), reverse=True)
return summary
def _rule_for_feature(feat_key, pct, lift=0):
lift_note = f",相对对照组提升{lift}x" if lift else ""
if feat_key == "has_ignition_point":
return {
"type": "bonus",
"description": f"涨幅榜{pct}%有起爆点(静K→动K){lift_note} → 起爆点是爆发前候选信号",
"conditions": {"has_ignition_point": True},
"score_adjust": 2,
"source": "reverse_analysis",
}
if feat_key == "has_q7_zone":
return {
"type": "bonus",
"description": f"涨幅榜{pct}%有Q≥7供需区{lift_note} → 高质量供需区是爆发支撑候选",
"conditions": {"has_q7_zone": True},
"score_adjust": 2,
"source": "reverse_analysis",
}
if feat_key == "has_q7_demand_nearby":
return {
"type": "bonus",
"description": f"涨幅榜{pct}%有Q≥7需求区在起爆价附近{lift_note} → 需求区支撑是爆发候选因子",
"conditions": {"has_q7_demand_nearby": True},
"score_adjust": 3,
"source": "reverse_analysis",
}
if feat_key == "has_continuous_k":
return {
"type": "bonus",
"description": f"涨幅榜{pct}%有连续K多头加速{lift_note} → 趋势加速是爆发前兆候选",
"conditions": {"has_continuous_k_bullish": True},
"score_adjust": 2,
"source": "reverse_analysis",
}
if feat_key == "has_static_accumulation":
return {
"type": "bonus",
"description": f"涨幅榜{pct}%有静K蓄力(占比≥15%){lift_note} → 蓄力是爆发候选条件",
"conditions": {"static_ratio_min": 0.15},
"score_adjust": 1,
"source": "reverse_analysis",
}
if feat_key == "has_volume_surge_before":
return {
"type": "bonus",
"description": f"涨幅榜{pct}%起爆前2倍+放量{lift_note} → 量能先行是爆发预警候选",
"conditions": {"vol_surge_ratio_min": 2.0},
"score_adjust": 2,
"source": "reverse_analysis",
}
if feat_key == "has_bullish_breakout_pattern":
return {
"type": "bonus",
"description": f"涨幅榜{pct}%连续5K收盘价抬高{lift_note} → 价格加速是爆发前形态候选",
"conditions": {"has_bullish_breakout_pattern": True},
"score_adjust": 1,
"source": "reverse_analysis",
}
return None
def discover_new_rules(pattern_summary, all_features, sector_alignments, significance_pct=60.0, min_lift=1.5):
"""
当共性特征占比≥significance_pct时自动生成learned_rule
返回: [{rule_id, description, conditions, score_adjust}]
"""
new_rules = []
total_analyzed = len(all_features)
significant_features = [
p for p in pattern_summary
if p["percentage"] >= significance_pct and (not p.get("control_total") or p.get("lift", 0) >= min_lift)
]
for pattern in significant_features:
feat_key = pattern["feature"]
pct = pattern["percentage"]
count = int(pattern.get("count") or 0)
lift = float(pattern.get("lift") or 0)
rule = _rule_for_feature(feat_key, pct, lift=lift)
if not rule:
continue
# 新体系:逆向分析只生成候选规则,不直接写 learned_rules避免涨幅榜小样本污染已发布策略。
rule["candidate_id"] = upsert_strategy_rule_candidate(
source="reverse_analysis",
rule_type=rule.get("type", "bonus"),
signal_name=feat_key,
rule_description=rule.get("description", ""),
support_count=int(count),
success_count=int(count),
fail_count=int(pattern.get("control_count") or 0),
confidence_score=round(min(95, pct * max(lift, 1.0) / 2), 1),
sample_size=int(total_analyzed),
status="candidate",
notes=f"逆向涨幅榜规律,只分析起爆前窗口并做对照组校验(lift={lift}),用于提前埋伏候选,不能作为涨后追买依据,仍需等待推荐样本验证后再发布",
source_ref=f"reverse:{feat_key}",
)
new_rules.append(rule)
# 检查板块联动规律
hot_sectors = [sa for sa in sector_alignments if sa.get("is_sector_hot")]
if len(hot_sectors) >= 2:
# 多板块联动规律
sector_names = [sa["sector"] for sa in hot_sectors]
rule = {
"type": "bonus",
"description": f"多板块联动({', '.join(sector_names)}) → 市场整体情绪升温,蓄力币起爆概率高",
"conditions": {"multi_sector_hot": True, "sectors": sector_names},
"score_adjust": 2,
"source": "reverse_analysis",
}
rule["candidate_id"] = upsert_strategy_rule_candidate(
source="reverse_analysis",
rule_type=rule.get("type", "bonus"),
signal_name="multi_sector_hot",
rule_description=rule.get("description", ""),
support_count=len(hot_sectors),
success_count=len(hot_sectors),
fail_count=0,
confidence_score=60,
sample_size=len(sector_alignments),
status="candidate",
notes="板块联动候选规律,用于提前识别市场情绪扩散,不能作为涨后追买依据,需等待推荐样本验证后再发布",
source_ref="reverse:multi_sector_hot",
)
new_rules.append(rule)
return new_rules
# ==================== 主流程 ====================
def run_reverse_analysis():
"""
执行完整逆向分析流程:
1. 拉涨幅榜Top N只把它作为“哪些币后来涨了”的标签
2. 过滤掉已推荐的币
3. 对每个暴涨币回溯起爆前K线截掉涨后窗口再做PA分析
4. 统计共性特征,发现规律
5. 写入DB返回结构化结果
"""
config = config_loader.get_reverse_params()
top_n = config.get("top_n_gainers", 30)
lookback_hours = config.get("lookback_hours", 72)
min_gain_pct = config.get("min_gain_pct", 10.0)
significance_pct = config.get("significance_threshold_pct", 60.0)
control_sample_size = config.get("control_sample_size", top_n)
min_lift = config.get("min_lift", 1.5)
# 1. 拉涨幅榜
tickers = fetch_24h_tickers()
if not tickers:
print("[reverse_analysis] 无法获取24h行情数据")
return {"error": "无法获取24h行情数据", "top_gainers": [], "pattern_summary": [], "new_rules": []}
# 排序涨幅榜
gainers = []
eligible_universe = []
for t in tickers:
symbol_str = t["symbol"]
if not _is_altcoin_usdt_symbol(symbol_str):
continue
base = symbol_str.replace("USDT", "")
formatted = f"{base}/USDT"
change_pct = float(t["priceChangePercent"])
volume_24h = float(t["quoteVolume"])
last_price = float(t["lastPrice"])
item = {
"symbol": formatted,
"gain_pct": round(change_pct, 2),
"price": last_price,
"volume_24h": volume_24h,
"sector": get_sector_for_coin(formatted),
}
eligible_universe.append(item)
if change_pct >= min_gain_pct:
gainers.append(item)
# 按涨幅排序取Top N
gainers.sort(key=lambda x: x["gain_pct"], reverse=True)
gainers = gainers[:top_n]
# 2. 过滤掉已推荐的币
recommended = get_recommended_symbols(hours=lookback_hours)
unrecommended_gainers = [g for g in gainers if g["symbol"] not in recommended]
print(f"[reverse_analysis] 涨幅榜共{len(gainers)}只>{min_gain_pct}%, 其中{len(unrecommended_gainers)}只未被推荐")
gainer_symbols = {g["symbol"] for g in gainers}
control_candidates = [
item for item in eligible_universe
if item["symbol"] not in gainer_symbols and item["gain_pct"] < min_gain_pct
]
# 对照组取成交额较高但未大涨的山寨,保证“没涨”不是因为死币没流动性。
control_candidates.sort(key=lambda x: (x["volume_24h"], -abs(x["gain_pct"])), reverse=True)
control_group = control_candidates[:control_sample_size]
# 3. 对每个未被推荐的暴涨币做PA分析
all_features = []
sector_alignments = []
missed_details = []
for gainer in unrecommended_gainers:
symbol = gainer["symbol"]
pre_klines, explosion_klines = fetch_klines_before(symbol, lookback_hours, "1h")
features = extract_pre_explosion_features(symbol, pre_klines, explosion_klines, config)
all_features.append(features)
# 板块联动检测
sector_alignment = check_sector_alignment(symbol, gainers, config)
sector_alignments.append(sector_alignment)
gainer["sector_info"] = sector_alignment
# 构建DB记录的特征列表用于features_detected字段
detected_features = []
for key, value in features.items():
if key.startswith("has_") and value:
detected_features.append(key.replace("has_", ""))
elif key in ("ignition_count", "q_max", "static_ratio", "vol_surge_ratio", "static_k_count", "continuous_k_count"):
detected_features.append(f"{key}={value}")
# 计算起爆前价格(涨幅反推)
price_before = gainer["price"] / (1 + gainer["gain_pct"] / 100)
# 判断为什么没推荐(简化版)
conn = get_conn()
screened = conn.execute("""
SELECT symbol, state, score, signals FROM screening_log
WHERE symbol=%s AND layer='细筛'
ORDER BY scan_time DESC LIMIT 1
""", (symbol,)).fetchone()
conn.close()
if screened:
reason = f"细筛淘汰(state={screened['state']}, score={screened['score']})"
else:
reason = "粗筛未通过(涨幅或量价不达标)"
# 写入DB
record_missed_explosion(
symbol=symbol,
price_at_detect=gainer["price"],
price_before=price_before,
gain_pct=gainer["gain_pct"],
reason_missed=reason,
features_detected=detected_features,
lesson=f"起爆前PA特征: {', '.join(detected_features[:5])}; 板块: {sector_alignment.get('sector', '未知')}",
)
missed_details.append({
"symbol": symbol,
"gain_pct": gainer["gain_pct"],
"sector": sector_alignment.get("sector", "未知"),
"is_sector_hot": sector_alignment.get("is_sector_hot", False),
"reason_missed": reason,
"features_detected": detected_features,
"q_max": features.get("q_max", 0),
"ignition_count": features.get("ignition_count", 0),
"static_ratio": features.get("static_ratio", 0),
"vol_surge_ratio": features.get("vol_surge_ratio", 0),
})
control_features = []
control_details = []
for item in control_group:
symbol = item["symbol"]
pre_klines, explosion_klines = fetch_klines_before(symbol, lookback_hours, "1h")
features = extract_pre_explosion_features(symbol, pre_klines, explosion_klines, config)
control_features.append(features)
control_details.append({
"symbol": symbol,
"gain_pct": item["gain_pct"],
"volume_24h": item["volume_24h"],
"features": [key for key, value in features.items() if key.startswith("has_") and value],
})
# 4. 统计共性特征
total_analyzed = len(all_features)
pattern_summary = compute_pattern_summary(all_features, total_analyzed, control_features=control_features)
# 5. 发现新规律
new_rules = discover_new_rules(
pattern_summary,
all_features,
sector_alignments,
significance_pct,
min_lift=min_lift,
)
# 更新meta
config_loader.update_meta("last_reverse_analysis", datetime.now().isoformat())
# 6. 返回结构化结果
results = {
"timestamp": datetime.now().isoformat(),
"analysis_scope": "pre_explosion_only",
"feature_window": {
"lookback_hours": lookback_hours,
"excluded_recent_hours": 24,
"label_usage": "top_gainers_are_outcome_labels_only",
"note": "涨幅榜只用于标记哪些币后来涨了;因子只从起爆前/启动点窗口提取,不能使用涨后结果做追买依据。",
},
"total_gainers": len(gainers),
"total_unrecommended": len(unrecommended_gainers),
"total_analyzed": total_analyzed,
"control_analyzed": len(control_features),
"top_gainers": gainers[:10], # feishu推送只取TOP10
"missed_details": missed_details,
"control_details": control_details[:10],
"pattern_summary": pattern_summary,
"new_rules": new_rules,
"sector_alignments": sector_alignments,
}
print(f"[reverse_analysis] 完成: 分析{total_analyzed}只, 发现{len(new_rules)}条新规律")
return results
if __name__ == "__main__":
results = run_reverse_analysis()
print(json.dumps(results, ensure_ascii=False, indent=2))