This commit is contained in:
aaron 2026-05-19 09:21:51 +08:00
parent 74a0121a6d
commit 01e38675fe
4 changed files with 180 additions and 6 deletions

View File

@ -105,6 +105,8 @@ def screening_stage_meta(layer: str, detail: Dict[str, Any] | None = None, state
def discovery_source_types(candidate: Dict[str, Any]) -> List[str]:
sources: List[str] = []
if candidate.get("top_gainer_24h"):
sources.append("cex_top_gainer")
if candidate.get("vp_data") or candidate.get("turnover_acceleration_1h") or candidate.get("turnover_acceleration_4h"):
sources.append("cex")
if candidate.get("static_accumulation") or candidate.get("higher_lows") or candidate.get("compression_surge"):

View File

@ -12,6 +12,7 @@ from typing import Any, Iterable
SIGNAL_CODE_LABELS = {
"vp_fly_1h_current": "1H当前量价齐飞",
"cex_top_gainer_24h": "CEX 24h强势榜异动",
"vp_fly_1h_stale": "1H历史量价齐飞",
"volume_divergence_1h": "1H量价背离",
"static_accum_4h": "4H静K蓄力",
@ -49,6 +50,7 @@ SIGNAL_CODE_LABELS = {
_PATTERNS = [
("cex_top_gainer_24h", ("24h强势榜",)),
("vp_fly_1h_stale", ("历史放量阳线", "历史量价齐飞", "量价齐飞已过期")),
("vp_fly_1h_current", ("量价齐飞", "量价齐飞K")),
("volume_divergence_1h", ("量价背离", "放量但无量价齐飞")),

View File

@ -574,6 +574,13 @@ def _build_signal_recency(cand):
"""把粗筛/细筛命中的信号按 current/stale 标记,避免旧形态冒充当下机会。"""
current = []
stale = []
if cand.get("top_gainer_24h"):
current.append({
"type": "cex_top_gainer",
"label": "当前24h强势榜异动",
"timeframe": "24h",
"change_24h": cand.get("change_24h", 0),
})
vp = cand.get("vp_data") or {}
if vp.get("vp_fly_count", 0) > 0:
current.append({"type": "volume_price", "label": "当前1H量价齐飞", "timeframe": "1h", "age_hours": vp.get("latest_vp_age_hours")})
@ -591,6 +598,63 @@ def _build_signal_recency(cand):
return {"status": status, "current": current, "stale": stale}
def _is_top_gainer_candidate(symbol, info, *, min_volume=None, threshold=None):
"""把涨幅榜强势本身纳入发现层,避免已启动币被粗筛静默跳过。"""
volume = float((info or {}).get("volume_24h") or 0)
change = float((info or {}).get("change_24h") or 0)
if min_volume is None:
min_volume = MEME_MIN_24H_VOLUME_USD if is_meme_coin(symbol) else MIN_24H_VOLUME_USD
if threshold is None:
threshold = get_burst_threshold(symbol) * 1.5
return volume >= float(min_volume or 0) and change >= float(threshold or 0)
def _top_gainer_signal(symbol, change, volume):
return f"24h强势榜异动({float(change or 0):.1f}%,成交额{float(volume or 0)/1_000_000:.1f}M)"
def _attach_top_gainer_discovery(candidates, tickers, recently_screened):
"""为强势榜补发现入口;追高风险留给细筛/确认处理。"""
added = 0
for symbol, info in tickers.items():
if symbol in candidates:
if _is_top_gainer_candidate(symbol, info):
candidates[symbol]["top_gainer_24h"] = True
candidates[symbol]["top_gainer_chase_risk"] = symbol not in recently_screened
continue
if not _is_top_gainer_candidate(symbol, info):
continue
change = float(info.get("change_24h") or 0)
vol = float(info.get("volume_24h") or 0)
signal = _top_gainer_signal(symbol, change, vol)
if symbol not in recently_screened:
signal = f"{signal},追高风险待确认"
candidates[symbol] = {
"anomalies": [signal],
"anomaly_score": 2,
"price": info["price"],
"change_24h": change,
"volume_24h": vol,
"funding_rate": 0,
"is_meme": is_meme_coin(symbol),
"vp_data": None,
"bb_data": None,
"static_accumulation": None,
"h4_df": None,
"turnover_acceleration_1h": 0,
"turnover_acceleration_4h": 0,
"base_volume_24h": 0,
"quote_volume_24h": round(vol, 2),
"weighted_avg_price": info.get("price", 0),
"top_gainer_24h": True,
"top_gainer_chase_risk": symbol not in recently_screened,
"bypass_origin": "top_gainer_24h",
}
added += 1
return added
def _log_universe_exclusions(exclusions, max_logs=120):
"""把交易宇宙过滤结果写入链路日志,避免页面看不到第一道漏斗。"""
logged = 0
@ -742,12 +806,14 @@ def layer1_coarse_filter():
anomalies.append(f"资金费率极端偏低({fr*100:.3f}%)")
anomaly_score += 2
# 排除已涨太多 — 但24h内已被系统盯上的币豁免
# 已经进入24h强势榜的币不在粗筛静默丢弃先作为异动发现记录
# 后续由细筛/确认判断是否追高、是否等待二次结构。
burst_threshold = get_burst_threshold(symbol)
if change > burst_threshold * 1.5 and symbol not in recently_screened:
continue
is_unseen_top_gainer = change > burst_threshold * 1.5 and symbol not in recently_screened
if anomalies:
if is_unseen_top_gainer:
anomalies.append(_top_gainer_signal(symbol, change, vol) + ",追高风险待确认")
# === 冲高回落检查:量价齐飞后持续阴跌→拒绝 ===
if isinstance(vp_data, dict) and (vp_data.get("pullback") or {}).get("is_pullback"):
pb = vp_data["pullback"]
@ -777,6 +843,8 @@ def layer1_coarse_filter():
"base_volume_24h": round(base_volume, 2),
"quote_volume_24h": round(quote_volume, 2),
"weighted_avg_price": round(weighted_avg_price, 6) if weighted_avg_price else 0,
"top_gainer_24h": _is_top_gainer_candidate(symbol, info),
"top_gainer_chase_risk": is_unseen_top_gainer,
}
# ==== 第二遍扫描低成交量静K蓄力旁路 + 底部抬高 + 压缩放量 ====
@ -805,8 +873,7 @@ def layer1_coarse_filter():
change = info["change_24h"]
burst_threshold = get_burst_threshold(symbol)
if change > burst_threshold * 1.5 and symbol not in recently_screened:
continue
is_unseen_top_gainer = change > burst_threshold * 1.5 and symbol not in recently_screened
meme = is_meme_coin(symbol)
fr = funding_rates.get(symbol, 0)
@ -825,6 +892,8 @@ def layer1_coarse_filter():
anomalies = [
f"4H静K蓄力旁路({static_acc['static_count']}静K,量比{static_acc['vol_ratio']}x)"
]
if is_unseen_top_gainer:
anomalies.append(_top_gainer_signal(symbol, change, vol) + ",追高风险待确认")
anomaly_score = max(1, weights.get("静K蓄力", 2))
candidates[symbol] = {
@ -845,6 +914,8 @@ def layer1_coarse_filter():
"quote_volume_24h": 0,
"weighted_avg_price": info.get("price", 0),
"bypass_origin": True,
"top_gainer_24h": _is_top_gainer_candidate(symbol, info),
"top_gainer_chase_risk": is_unseen_top_gainer,
}
bypass_count += 1
added = True
@ -854,6 +925,8 @@ def layer1_coarse_filter():
hl_result = detect_higher_lows(h4_df, hl_cfg)
if hl_result["found"]:
anomalies = [f"4H {hl_result['signal']}"]
if is_unseen_top_gainer:
anomalies.append(_top_gainer_signal(symbol, change, vol) + ",追高风险待确认")
anomaly_score = hl_result["hl_score"]
candidates[symbol] = {
@ -875,6 +948,8 @@ def layer1_coarse_filter():
"quote_volume_24h": 0,
"weighted_avg_price": info.get("price", 0),
"bypass_origin": "higher_lows",
"top_gainer_24h": _is_top_gainer_candidate(symbol, info),
"top_gainer_chase_risk": is_unseen_top_gainer,
}
hl_count_total += 1
added = True
@ -884,6 +959,8 @@ def layer1_coarse_filter():
cs_result = detect_compression_surge(h4_df, cs_cfg)
if cs_result["found"]:
anomalies = [f"4H {cs_result['signal']}"]
if is_unseen_top_gainer:
anomalies.append(_top_gainer_signal(symbol, change, vol) + ",追高风险待确认")
anomaly_score = cs_result["score"]
candidates[symbol] = {
@ -905,10 +982,14 @@ def layer1_coarse_filter():
"quote_volume_24h": 0,
"weighted_avg_price": info.get("price", 0),
"bypass_origin": "compression_surge",
"top_gainer_24h": _is_top_gainer_candidate(symbol, info),
"top_gainer_chase_risk": is_unseen_top_gainer,
}
cs_count_total += 1
added = True
top_gainer_count = _attach_top_gainer_discovery(candidates, tickers, recently_screened)
# 第一道漏斗:把明确不可交易/太低成交额的资产写成独立阶段,研发侧可审计,
# 但不让它们进入后续机会链路。
low_turnover_threshold = min(v for v in [main_min_vol, bypass_min_vol, hl_min_vol, cs_min_vol] if v != float("inf"))
@ -963,7 +1044,7 @@ def layer1_coarse_filter():
print(f"舆情模块加载失败(非致命): {e}")
total_bypass = bypass_count + hl_count_total + cs_count_total
print(f"粗筛结果: {len(candidates)}个候选(宇宙过滤{len(universe_exclusions)}个,记录{universe_logged}个;含{total_bypass}个旁路: 静K{bypass_count}+底抬{hl_count_total}+压放{cs_count_total}")
print(f"粗筛结果: {len(candidates)}个候选(宇宙过滤{len(universe_exclusions)}个,记录{universe_logged}个;含{total_bypass}个旁路: 静K{bypass_count}+底抬{hl_count_total}+压放{cs_count_total};强势榜发现{top_gainer_count}")
for symbol, cand in candidates.items():
signals = cand.get("anomalies", [])
log_screening(
@ -996,6 +1077,7 @@ def layer1_coarse_filter():
layer1_coarse_filter.last_funnel_meta = {
"universe_gate_count": len(universe_exclusions),
"universe_gate_logged": universe_logged,
"top_gainer_discovery_count": top_gainer_count,
}
return candidates
@ -1194,6 +1276,11 @@ def layer2_fine_filter(candidates):
elif origin == "compression_surge" and not force_accumulate_reason:
force_accumulate_reason = "压缩放量旁路"
if cand.get("top_gainer_24h"):
signals.append(f"24h强势榜异动({cand.get('change_24h', 0):.1f}%)")
if cand.get("top_gainer_chase_risk"):
signals.append("追高风险:首次进入强势榜,等待二次结构确认")
quality = quality_filter_reasons(cand, int(score or 0), accumulate_threshold, signals)
if state in ("蓄力", "加速"):

View File

@ -316,3 +316,86 @@ def test_layer1_logs_coarse_candidate_details(monkeypatch):
assert "DOGE/USDT" in candidates
assert any(item["layer"] == "粗筛" and item["symbol"] == "DOGE/USDT" for item in logged)
def test_layer1_keeps_unseen_top_gainer_as_discovery_candidate(monkeypatch):
logged = []
monkeypatch.setattr(altcoin_screener, "fetch_all_tickers", lambda: {
"RONIN/USDT": {"volume_24h": 11_000_000, "change_24h": 43.0, "price": 1.2},
})
monkeypatch.setattr(altcoin_screener, "fetch_funding_rates", lambda: {})
monkeypatch.setattr(altcoin_screener, "get_dynamic_weights", _mock_weights)
monkeypatch.setattr(altcoin_screener, "is_meme_coin", lambda symbol: False)
monkeypatch.setattr(altcoin_screener, "get_burst_threshold", lambda symbol: 5)
monkeypatch.setattr(altcoin_screener, "fetch_klines", lambda symbol, timeframe, limit=200: None)
monkeypatch.setattr(
altcoin_screener,
"get_screener_section",
lambda name=None: {
"static_accumulation_bypass": {"min_volume_24h": 2_000_000, "min_vol_ratio": 1.2},
"higher_lows": {"enabled": False},
"compression_surge": {"enabled": False},
"sentiment": {"enabled": False},
}.get(name, {}),
)
monkeypatch.setattr(altcoin_screener.exchange, "fapiPublicGetTicker24hr", lambda: [])
monkeypatch.setattr(altcoin_screener, "log_screening", lambda **kwargs: logged.append(kwargs))
candidates = altcoin_screener.layer1_coarse_filter()
assert "RONIN/USDT" in candidates
cand = candidates["RONIN/USDT"]
assert cand["top_gainer_24h"] is True
assert cand["top_gainer_chase_risk"] is True
assert any("24h强势榜异动" in s for s in cand["anomalies"])
coarse = next(item for item in logged if item["layer"] == "粗筛" and item["symbol"] == "RONIN/USDT")
assert coarse["detail"]["candidate_stage"] == "discovery_candidate"
assert "cex_top_gainer" in coarse["detail"]["source_types"]
assert "cex_top_gainer_24h" in coarse["detail"]["signal_codes"]
def test_top_gainer_quality_rejected_as_chase_not_silent(monkeypatch):
logged = []
monkeypatch.setattr(altcoin_screener, "get_dynamic_weights", _mock_weights)
monkeypatch.setattr(altcoin_screener, "state_score_thresholds", lambda: (6, 9, 3))
monkeypatch.setattr(
altcoin_screener,
"get_screener_section",
lambda name=None: {
"static_accumulation_bypass": {
"min_score": 2,
"min_vol_ratio": 1.2,
"min_static_count": 3,
},
}.get(name, {}),
)
monkeypatch.setattr(altcoin_screener, "fetch_top_trader_ratio", lambda symbol: None)
monkeypatch.setattr(altcoin_screener, "log_screening", lambda **kwargs: logged.append(kwargs))
monkeypatch.setattr(altcoin_screener, "get_sector_for_coin", lambda symbol: [])
monkeypatch.setattr(altcoin_screener, "dynamic_leader_detection", lambda perf: {})
monkeypatch.setattr(altcoin_screener, "SECTOR_MEMBERS", {})
qualified, _, _ = altcoin_screener.layer2_fine_filter({
"RONIN/USDT": {
"anomaly_score": 2,
"price": 1.2,
"change_24h": 43.0,
"volume_24h": 11_000_000,
"funding_rate": 0.0,
"is_meme": False,
"vp_data": None,
"bb_data": None,
"static_accumulation": None,
"h4_df": None,
"top_gainer_24h": True,
"top_gainer_chase_risk": True,
"anomalies": ["24h强势榜异动(43.0%,成交额11.0M),追高风险待确认"],
}
})
assert "RONIN/USDT" not in qualified
reject = next(item for item in logged if item["layer"] == "细筛" and item["symbol"] == "RONIN/USDT")
assert reject["detail"]["candidate_stage"] == "rejected_candidate"
assert "high_chase_risk" in reject["detail"]["reject_reason_codes"]
assert "low_score" in reject["detail"]["reject_reason_codes"]