diff --git a/app/core/opportunity_funnel.py b/app/core/opportunity_funnel.py index 099ba3e..9c6361b 100644 --- a/app/core/opportunity_funnel.py +++ b/app/core/opportunity_funnel.py @@ -105,6 +105,8 @@ def screening_stage_meta(layer: str, detail: Dict[str, Any] | None = None, state def discovery_source_types(candidate: Dict[str, Any]) -> List[str]: sources: List[str] = [] + if candidate.get("top_gainer_24h"): + sources.append("cex_top_gainer") if candidate.get("vp_data") or candidate.get("turnover_acceleration_1h") or candidate.get("turnover_acceleration_4h"): sources.append("cex") if candidate.get("static_accumulation") or candidate.get("higher_lows") or candidate.get("compression_surge"): diff --git a/app/core/signal_taxonomy.py b/app/core/signal_taxonomy.py index 03ab643..025deab 100644 --- a/app/core/signal_taxonomy.py +++ b/app/core/signal_taxonomy.py @@ -12,6 +12,7 @@ from typing import Any, Iterable SIGNAL_CODE_LABELS = { "vp_fly_1h_current": "1H当前量价齐飞", + "cex_top_gainer_24h": "CEX 24h强势榜异动", "vp_fly_1h_stale": "1H历史量价齐飞", "volume_divergence_1h": "1H量价背离", "static_accum_4h": "4H静K蓄力", @@ -49,6 +50,7 @@ SIGNAL_CODE_LABELS = { _PATTERNS = [ + ("cex_top_gainer_24h", ("24h强势榜",)), ("vp_fly_1h_stale", ("历史放量阳线", "历史量价齐飞", "量价齐飞已过期")), ("vp_fly_1h_current", ("量价齐飞", "量价齐飞K")), ("volume_divergence_1h", ("量价背离", "放量但无量价齐飞")), diff --git a/app/services/altcoin_screener.py b/app/services/altcoin_screener.py index b30087a..608106c 100644 --- a/app/services/altcoin_screener.py +++ b/app/services/altcoin_screener.py @@ -574,6 +574,13 @@ def _build_signal_recency(cand): """把粗筛/细筛命中的信号按 current/stale 标记,避免旧形态冒充当下机会。""" current = [] stale = [] + if cand.get("top_gainer_24h"): + current.append({ + "type": "cex_top_gainer", + "label": "当前24h强势榜异动", + "timeframe": "24h", + "change_24h": cand.get("change_24h", 0), + }) vp = cand.get("vp_data") or {} if vp.get("vp_fly_count", 0) > 0: current.append({"type": "volume_price", "label": "当前1H量价齐飞", "timeframe": "1h", "age_hours": vp.get("latest_vp_age_hours")}) @@ -591,6 +598,63 @@ def _build_signal_recency(cand): return {"status": status, "current": current, "stale": stale} +def _is_top_gainer_candidate(symbol, info, *, min_volume=None, threshold=None): + """把涨幅榜强势本身纳入发现层,避免已启动币被粗筛静默跳过。""" + volume = float((info or {}).get("volume_24h") or 0) + change = float((info or {}).get("change_24h") or 0) + if min_volume is None: + min_volume = MEME_MIN_24H_VOLUME_USD if is_meme_coin(symbol) else MIN_24H_VOLUME_USD + if threshold is None: + threshold = get_burst_threshold(symbol) * 1.5 + return volume >= float(min_volume or 0) and change >= float(threshold or 0) + + +def _top_gainer_signal(symbol, change, volume): + return f"24h强势榜异动({float(change or 0):.1f}%,成交额{float(volume or 0)/1_000_000:.1f}M)" + + +def _attach_top_gainer_discovery(candidates, tickers, recently_screened): + """为强势榜补发现入口;追高风险留给细筛/确认处理。""" + added = 0 + for symbol, info in tickers.items(): + if symbol in candidates: + if _is_top_gainer_candidate(symbol, info): + candidates[symbol]["top_gainer_24h"] = True + candidates[symbol]["top_gainer_chase_risk"] = symbol not in recently_screened + continue + if not _is_top_gainer_candidate(symbol, info): + continue + + change = float(info.get("change_24h") or 0) + vol = float(info.get("volume_24h") or 0) + signal = _top_gainer_signal(symbol, change, vol) + if symbol not in recently_screened: + signal = f"{signal},追高风险待确认" + candidates[symbol] = { + "anomalies": [signal], + "anomaly_score": 2, + "price": info["price"], + "change_24h": change, + "volume_24h": vol, + "funding_rate": 0, + "is_meme": is_meme_coin(symbol), + "vp_data": None, + "bb_data": None, + "static_accumulation": None, + "h4_df": None, + "turnover_acceleration_1h": 0, + "turnover_acceleration_4h": 0, + "base_volume_24h": 0, + "quote_volume_24h": round(vol, 2), + "weighted_avg_price": info.get("price", 0), + "top_gainer_24h": True, + "top_gainer_chase_risk": symbol not in recently_screened, + "bypass_origin": "top_gainer_24h", + } + added += 1 + return added + + def _log_universe_exclusions(exclusions, max_logs=120): """把交易宇宙过滤结果写入链路日志,避免页面看不到第一道漏斗。""" logged = 0 @@ -742,12 +806,14 @@ def layer1_coarse_filter(): anomalies.append(f"资金费率极端偏低({fr*100:.3f}%)") anomaly_score += 2 - # 排除已涨太多 — 但24h内已被系统盯上的币豁免 + # 已经进入24h强势榜的币不在粗筛静默丢弃;先作为异动发现记录, + # 后续由细筛/确认判断是否追高、是否等待二次结构。 burst_threshold = get_burst_threshold(symbol) - if change > burst_threshold * 1.5 and symbol not in recently_screened: - continue + is_unseen_top_gainer = change > burst_threshold * 1.5 and symbol not in recently_screened if anomalies: + if is_unseen_top_gainer: + anomalies.append(_top_gainer_signal(symbol, change, vol) + ",追高风险待确认") # === 冲高回落检查:量价齐飞后持续阴跌→拒绝 === if isinstance(vp_data, dict) and (vp_data.get("pullback") or {}).get("is_pullback"): pb = vp_data["pullback"] @@ -777,6 +843,8 @@ def layer1_coarse_filter(): "base_volume_24h": round(base_volume, 2), "quote_volume_24h": round(quote_volume, 2), "weighted_avg_price": round(weighted_avg_price, 6) if weighted_avg_price else 0, + "top_gainer_24h": _is_top_gainer_candidate(symbol, info), + "top_gainer_chase_risk": is_unseen_top_gainer, } # ==== 第二遍扫描:低成交量静K蓄力旁路 + 底部抬高 + 压缩放量 ==== @@ -805,8 +873,7 @@ def layer1_coarse_filter(): change = info["change_24h"] burst_threshold = get_burst_threshold(symbol) - if change > burst_threshold * 1.5 and symbol not in recently_screened: - continue + is_unseen_top_gainer = change > burst_threshold * 1.5 and symbol not in recently_screened meme = is_meme_coin(symbol) fr = funding_rates.get(symbol, 0) @@ -825,6 +892,8 @@ def layer1_coarse_filter(): anomalies = [ f"4H静K蓄力旁路({static_acc['static_count']}静K,量比{static_acc['vol_ratio']}x)" ] + if is_unseen_top_gainer: + anomalies.append(_top_gainer_signal(symbol, change, vol) + ",追高风险待确认") anomaly_score = max(1, weights.get("静K蓄力", 2)) candidates[symbol] = { @@ -845,6 +914,8 @@ def layer1_coarse_filter(): "quote_volume_24h": 0, "weighted_avg_price": info.get("price", 0), "bypass_origin": True, + "top_gainer_24h": _is_top_gainer_candidate(symbol, info), + "top_gainer_chase_risk": is_unseen_top_gainer, } bypass_count += 1 added = True @@ -854,6 +925,8 @@ def layer1_coarse_filter(): hl_result = detect_higher_lows(h4_df, hl_cfg) if hl_result["found"]: anomalies = [f"4H {hl_result['signal']}"] + if is_unseen_top_gainer: + anomalies.append(_top_gainer_signal(symbol, change, vol) + ",追高风险待确认") anomaly_score = hl_result["hl_score"] candidates[symbol] = { @@ -875,6 +948,8 @@ def layer1_coarse_filter(): "quote_volume_24h": 0, "weighted_avg_price": info.get("price", 0), "bypass_origin": "higher_lows", + "top_gainer_24h": _is_top_gainer_candidate(symbol, info), + "top_gainer_chase_risk": is_unseen_top_gainer, } hl_count_total += 1 added = True @@ -884,6 +959,8 @@ def layer1_coarse_filter(): cs_result = detect_compression_surge(h4_df, cs_cfg) if cs_result["found"]: anomalies = [f"4H {cs_result['signal']}"] + if is_unseen_top_gainer: + anomalies.append(_top_gainer_signal(symbol, change, vol) + ",追高风险待确认") anomaly_score = cs_result["score"] candidates[symbol] = { @@ -905,10 +982,14 @@ def layer1_coarse_filter(): "quote_volume_24h": 0, "weighted_avg_price": info.get("price", 0), "bypass_origin": "compression_surge", + "top_gainer_24h": _is_top_gainer_candidate(symbol, info), + "top_gainer_chase_risk": is_unseen_top_gainer, } cs_count_total += 1 added = True + top_gainer_count = _attach_top_gainer_discovery(candidates, tickers, recently_screened) + # 第一道漏斗:把明确不可交易/太低成交额的资产写成独立阶段,研发侧可审计, # 但不让它们进入后续机会链路。 low_turnover_threshold = min(v for v in [main_min_vol, bypass_min_vol, hl_min_vol, cs_min_vol] if v != float("inf")) @@ -963,7 +1044,7 @@ def layer1_coarse_filter(): print(f"舆情模块加载失败(非致命): {e}") total_bypass = bypass_count + hl_count_total + cs_count_total - print(f"粗筛结果: {len(candidates)}个候选(宇宙过滤{len(universe_exclusions)}个,记录{universe_logged}个;含{total_bypass}个旁路: 静K{bypass_count}+底抬{hl_count_total}+压放{cs_count_total})") + print(f"粗筛结果: {len(candidates)}个候选(宇宙过滤{len(universe_exclusions)}个,记录{universe_logged}个;含{total_bypass}个旁路: 静K{bypass_count}+底抬{hl_count_total}+压放{cs_count_total};强势榜发现{top_gainer_count}个)") for symbol, cand in candidates.items(): signals = cand.get("anomalies", []) log_screening( @@ -996,6 +1077,7 @@ def layer1_coarse_filter(): layer1_coarse_filter.last_funnel_meta = { "universe_gate_count": len(universe_exclusions), "universe_gate_logged": universe_logged, + "top_gainer_discovery_count": top_gainer_count, } return candidates @@ -1194,6 +1276,11 @@ def layer2_fine_filter(candidates): elif origin == "compression_surge" and not force_accumulate_reason: force_accumulate_reason = "压缩放量旁路" + if cand.get("top_gainer_24h"): + signals.append(f"24h强势榜异动({cand.get('change_24h', 0):.1f}%)") + if cand.get("top_gainer_chase_risk"): + signals.append("追高风险:首次进入强势榜,等待二次结构确认") + quality = quality_filter_reasons(cand, int(score or 0), accumulate_threshold, signals) if state in ("蓄力", "加速"): diff --git a/tests/test_screener_optimizations.py b/tests/test_screener_optimizations.py index eedce0c..49ef04a 100644 --- a/tests/test_screener_optimizations.py +++ b/tests/test_screener_optimizations.py @@ -316,3 +316,86 @@ def test_layer1_logs_coarse_candidate_details(monkeypatch): assert "DOGE/USDT" in candidates assert any(item["layer"] == "粗筛" and item["symbol"] == "DOGE/USDT" for item in logged) + + +def test_layer1_keeps_unseen_top_gainer_as_discovery_candidate(monkeypatch): + logged = [] + + monkeypatch.setattr(altcoin_screener, "fetch_all_tickers", lambda: { + "RONIN/USDT": {"volume_24h": 11_000_000, "change_24h": 43.0, "price": 1.2}, + }) + monkeypatch.setattr(altcoin_screener, "fetch_funding_rates", lambda: {}) + monkeypatch.setattr(altcoin_screener, "get_dynamic_weights", _mock_weights) + monkeypatch.setattr(altcoin_screener, "is_meme_coin", lambda symbol: False) + monkeypatch.setattr(altcoin_screener, "get_burst_threshold", lambda symbol: 5) + monkeypatch.setattr(altcoin_screener, "fetch_klines", lambda symbol, timeframe, limit=200: None) + monkeypatch.setattr( + altcoin_screener, + "get_screener_section", + lambda name=None: { + "static_accumulation_bypass": {"min_volume_24h": 2_000_000, "min_vol_ratio": 1.2}, + "higher_lows": {"enabled": False}, + "compression_surge": {"enabled": False}, + "sentiment": {"enabled": False}, + }.get(name, {}), + ) + monkeypatch.setattr(altcoin_screener.exchange, "fapiPublicGetTicker24hr", lambda: []) + monkeypatch.setattr(altcoin_screener, "log_screening", lambda **kwargs: logged.append(kwargs)) + + candidates = altcoin_screener.layer1_coarse_filter() + + assert "RONIN/USDT" in candidates + cand = candidates["RONIN/USDT"] + assert cand["top_gainer_24h"] is True + assert cand["top_gainer_chase_risk"] is True + assert any("24h强势榜异动" in s for s in cand["anomalies"]) + coarse = next(item for item in logged if item["layer"] == "粗筛" and item["symbol"] == "RONIN/USDT") + assert coarse["detail"]["candidate_stage"] == "discovery_candidate" + assert "cex_top_gainer" in coarse["detail"]["source_types"] + assert "cex_top_gainer_24h" in coarse["detail"]["signal_codes"] + + +def test_top_gainer_quality_rejected_as_chase_not_silent(monkeypatch): + logged = [] + monkeypatch.setattr(altcoin_screener, "get_dynamic_weights", _mock_weights) + monkeypatch.setattr(altcoin_screener, "state_score_thresholds", lambda: (6, 9, 3)) + monkeypatch.setattr( + altcoin_screener, + "get_screener_section", + lambda name=None: { + "static_accumulation_bypass": { + "min_score": 2, + "min_vol_ratio": 1.2, + "min_static_count": 3, + }, + }.get(name, {}), + ) + monkeypatch.setattr(altcoin_screener, "fetch_top_trader_ratio", lambda symbol: None) + monkeypatch.setattr(altcoin_screener, "log_screening", lambda **kwargs: logged.append(kwargs)) + monkeypatch.setattr(altcoin_screener, "get_sector_for_coin", lambda symbol: []) + monkeypatch.setattr(altcoin_screener, "dynamic_leader_detection", lambda perf: {}) + monkeypatch.setattr(altcoin_screener, "SECTOR_MEMBERS", {}) + + qualified, _, _ = altcoin_screener.layer2_fine_filter({ + "RONIN/USDT": { + "anomaly_score": 2, + "price": 1.2, + "change_24h": 43.0, + "volume_24h": 11_000_000, + "funding_rate": 0.0, + "is_meme": False, + "vp_data": None, + "bb_data": None, + "static_accumulation": None, + "h4_df": None, + "top_gainer_24h": True, + "top_gainer_chase_risk": True, + "anomalies": ["24h强势榜异动(43.0%,成交额11.0M),追高风险待确认"], + } + }) + + assert "RONIN/USDT" not in qualified + reject = next(item for item in logged if item["layer"] == "细筛" and item["symbol"] == "RONIN/USDT") + assert reject["detail"]["candidate_stage"] == "rejected_candidate" + assert "high_chase_risk" in reject["detail"]["reject_reason_codes"] + assert "low_score" in reject["detail"]["reject_reason_codes"]