From b65fc75893e1c1c64815487565a244d5043297f5 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Fri, 29 May 2026 08:51:48 +0800 Subject: [PATCH] 1 --- AGENTS.md | 1 + app/db/universe_audit.py | 27 ++++- app/services/altcoin_screener.py | 175 +++++++++++++++++++++++++-- rules.yaml | 12 ++ tests/test_screener_optimizations.py | 42 +++++++ 5 files changed, 241 insertions(+), 16 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 898f59b..f235471 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -456,6 +456,7 @@ docker compose run --rm alphax-web python scripts/postgres/validate_import.py -- - `rec_state` 是发现层状态(如“爆发/加速”),`execution_status`/`trade_stage` 才是交易执行阶段(如 `buy_now`/`wait_pullback`/`observe`),不要把“发现爆发”直接解读成“现在可买”。 - 每轮粗筛会写 `screening_coverage_audit`,用于确认 `Binance USDT 总数 -> 可交易宇宙 -> K线成功 -> 粗筛候选 -> 细筛通过` 的覆盖漏斗;排查“为什么没有机会/是否漏选”时应先看这张表和 `/pipeline` 的覆盖率指标。 - `symbol_universe_cache` 只应把静态/半静态问题长期缓存,例如稳定币、封装币、异常交易对、非标准交易对;`low_turnover`、`stale_ticker` 等动态问题只能短 TTL,不能永久拉黑,否则会错过后续流动性改善的币。 +- 粗筛每轮允许拉全市场 24h ticker,但不能对全市场无差别拉 K 线。`rules.yaml` 的 `screener.kline_scan` 应优先表达规则型准入:交易宇宙缓存、成交额、活跃度、最近关注、强势榜、短周期活跃条件;不要默认用“最多扫描 N 个币”截断机会。`emergency_*_max_symbols` 只作为交易所限流事故时的临时保护,默认应为 0。 - 静K蓄力旁路已要求配置化共振(见 `rules.yaml` 的 `screener.static_accumulation_bypass.require_resonance`),避免单一静K样本淹没确认层;无追高风险的强势榜异动仍可作为发现入口。 - 粗筛发现层已加入 `screener.short_timeframe_ignition`:15m 用于捕捉 1H 成型前的短周期启动,5m 只在 15m 已启动或已有结构背景时启用;短周期信号只作为早期发现/共振,不应绕过确认层直接买入。 - 短周期信号会写入 `short_tf_signal_samples`,`/api/screening/short-tf-review` 和 `/pipeline` 的“短周期验证”会展示样本数、转推荐率、当前收益等证据。后续若要把 5m/15m 提升为更强交易触发,必须先基于这张表和历史暴涨样本验证,而不是固定写死。 diff --git a/app/db/universe_audit.py b/app/db/universe_audit.py index 3d553cc..7d266d0 100644 --- a/app/db/universe_audit.py +++ b/app/db/universe_audit.py @@ -48,12 +48,25 @@ def _json(data) -> str: def get_active_static_exclusions(symbols: Iterable[str]) -> dict[str, dict]: """Return cached long-lived exclusions for current Binance symbols.""" + return get_active_universe_exclusions(symbols, reason_types=("static",)) + + +def get_active_universe_exclusions(symbols: Iterable[str], *, reason_types: Iterable[str] = ("static", "dynamic", "transient")) -> dict[str, dict]: + """Return active cached universe exclusions. + + Static exclusions can be applied unconditionally. Dynamic/transient rows + should still be rechecked against the latest ticker evidence by callers. + """ symbol_list = [str(s or "").upper().strip() for s in symbols if str(s or "").strip()] if not symbol_list: return {} ensure_migrations_once() now = _iso() placeholders = ",".join(["%s"] * len(symbol_list)) + type_list = [str(x or "").strip() for x in reason_types if str(x or "").strip()] + if not type_list: + return {} + type_placeholders = ",".join(["%s"] * len(type_list)) conn = get_conn() rows = conn.execute( f""" @@ -61,14 +74,22 @@ def get_active_static_exclusions(symbols: Iterable[str]) -> dict[str, dict]: FROM symbol_universe_cache WHERE symbol IN ({placeholders}) AND decision='excluded' - AND reason_type IN ('static') + AND reason_type IN ({type_placeholders}) AND manual_override=0 AND (expires_at='' OR expires_at >= %s) """, - tuple(symbol_list) + (now,), + tuple(symbol_list) + tuple(type_list) + (now,), ).fetchall() conn.close() - return {row["symbol"]: dict(row) for row in rows} + result = {} + for row in rows: + item = dict(row) + try: + item["evidence"] = json.loads(item.get("evidence_json") or "{}") + except Exception: + item["evidence"] = {} + result[item["symbol"]] = item + return result def record_universe_decisions(items: Iterable[dict], *, source: str = "screener") -> int: diff --git a/app/services/altcoin_screener.py b/app/services/altcoin_screener.py index ffa8843..5226120 100644 --- a/app/services/altcoin_screener.py +++ b/app/services/altcoin_screener.py @@ -60,6 +60,7 @@ from app.core.opportunity_funnel import ( ) from app.core.signal_taxonomy import signal_codes as build_signal_codes from app.db.universe_audit import ( + get_active_universe_exclusions, get_active_static_exclusions, reason_type_for, record_screening_coverage, @@ -210,6 +211,80 @@ def _write_spot_exchange_info_cache(statuses): pass +def _kline_scan_config(): + cfg = get_screener_section("kline_scan") or {} + return { + "enabled": bool(cfg.get("enabled", True)), + "main_min_volume_usd": float(cfg.get("main_min_volume_usd", MIN_24H_VOLUME_USD) or 0), + "bypass_min_volume_usd": float(cfg.get("bypass_min_volume_usd", 2_000_000) or 0), + "short_tf_min_volume_usd": float(cfg.get("short_tf_min_volume_usd", 5_000_000) or 0), + "short_tf_min_abs_change_pct": float(cfg.get("short_tf_min_abs_change_pct", 1.0) or 0), + "short_tf_high_volume_usd": float(cfg.get("short_tf_high_volume_usd", 20_000_000) or 0), + "emergency_main_max_symbols": max(0, int(cfg.get("emergency_main_max_symbols", 0) or 0)), + "emergency_bypass_max_symbols": max(0, int(cfg.get("emergency_bypass_max_symbols", 0) or 0)), + "emergency_short_tf_max_symbols": max(0, int(cfg.get("emergency_short_tf_max_symbols", 0) or 0)), + "respect_dynamic_universe_cache": bool(cfg.get("respect_dynamic_universe_cache", True)), + } + + +def _is_cached_dynamic_exclusion(symbol: str, info: dict, cached: dict, fallback_min_volume: float = 0) -> bool: + item = (cached or {}).get(str(symbol or "").upper()) + if not item: + return False + reason_type = str(item.get("reason_type") or "").strip() + reason_code = str(item.get("reason_code") or "").strip() + if reason_type == "static": + return True + evidence = item.get("evidence") if isinstance(item.get("evidence"), dict) else {} + current_volume = float((info or {}).get("volume_24h") or 0) + if reason_code == "low_turnover": + min_volume = float(evidence.get("min_volume") or fallback_min_volume or 0) + return min_volume > 0 and current_volume < min_volume + if reason_code == "stale_ticker": + return True + return False + + +def _symbol_priority_score(symbol: str, info: dict, recently_screened: set) -> tuple: + volume = float((info or {}).get("volume_24h") or 0) + change = abs(float((info or {}).get("change_24h") or 0)) + top_gainer = _is_top_gainer_candidate(symbol, info) + return ( + 1 if symbol in recently_screened else 0, + 1 if top_gainer else 0, + min(change, 80), + volume, + symbol, + ) + + +def _rule_based_kline_scan_symbols(tickers: dict, *, recently_screened: set, min_volume: float = 0, emergency_max: int = 0) -> list[str]: + """Select K-line scan universe by rules first; emergency_max is off by default.""" + items = [] + for symbol, info in (tickers or {}).items(): + volume = float(info.get("volume_24h") or 0) + if min_volume and volume < min_volume: + continue + items.append(_symbol_priority_score(symbol, info, recently_screened)) + items.sort(reverse=True) + if emergency_max > 0: + items = items[:emergency_max] + return [symbol for *_, symbol in items] + + +def _should_scan_short_tf(symbol: str, info: dict, *, recently_screened: set, cfg: dict) -> bool: + volume = float((info or {}).get("volume_24h") or 0) + change = abs(float((info or {}).get("change_24h") or 0)) + if volume < float(cfg.get("short_tf_min_volume_usd") or 0): + return False + return ( + symbol in recently_screened + or _is_top_gainer_candidate(symbol, info) + or change >= float(cfg.get("short_tf_min_abs_change_pct") or 0) + or volume >= float(cfg.get("short_tf_high_volume_usd") or 0) + ) + + def _fetch_spot_exchange_statuses(): """Return Binance spot symbol activity status, cached to avoid repeated exchangeInfo calls.""" cached = _read_spot_exchange_info_cache() @@ -997,6 +1072,7 @@ def layer1_coarse_filter(): kline_attempt_symbols = set() h1_success_symbols = set() h4_success_symbols = set() + short_tf_scan_count = 0 # === 24h筛选历史豁免 (v1.6.9) === # 过去24h内在screening_log出现过的币,不受"涨太多"过滤限制 @@ -1011,18 +1087,68 @@ def layer1_coarse_filter(): recently_screened = {r["symbol"] for r in _recent} print(f" 24h已筛选币种: {len(recently_screened)} 只,豁免涨太多过滤") + scan_cfg = _kline_scan_config() + bypass_cfg = get_screener_section("static_accumulation_bypass") + hl_cfg = get_screener_section("higher_lows") + cs_cfg = get_screener_section("compression_surge") + bypass_min_vol = bypass_cfg.get("min_volume_24h", 2000000) + hl_min_vol = hl_cfg.get("min_volume_24h", 2000000) if hl_cfg.get("enabled", True) else float("inf") + cs_min_vol = cs_cfg.get("min_volume_24h", 2000000) if cs_cfg.get("enabled", True) else float("inf") + main_min_vol = max(0.0, float(scan_cfg.get("main_min_volume_usd") or min(MIN_24H_VOLUME_USD, MEME_MIN_24H_VOLUME_USD))) + low_turnover_threshold = min(v for v in [main_min_vol, bypass_min_vol, hl_min_vol, cs_min_vol] if v != float("inf")) + cached_runtime_exclusions = ( + get_active_universe_exclusions(tickers.keys(), reason_types=("dynamic", "transient")) + if scan_cfg.get("respect_dynamic_universe_cache", True) + else {} + ) + cached_runtime_skip_count = 0 + main_scan_symbols = set(_rule_based_kline_scan_symbols( + tickers, + recently_screened=recently_screened, + min_volume=main_min_vol, + emergency_max=scan_cfg["emergency_main_max_symbols"], + )) + bypass_scan_symbols = set(_rule_based_kline_scan_symbols( + tickers, + recently_screened=recently_screened, + min_volume=low_turnover_threshold, + emergency_max=scan_cfg["emergency_bypass_max_symbols"], + )) + print( + f" K线扫描规则: 主扫描{len(main_scan_symbols)}/{len(tickers)}," + f"旁路扫描{len(bypass_scan_symbols)}/{len(tickers)},动态缓存{len(cached_runtime_exclusions)}" + ) + try: - exchange.fapiPublicGetTicker24hr() + futures_24h = exchange.fapiPublicGetTicker24hr() except Exception: futures_24h_map = {} else: futures_24h_map = { item.get("symbol", "").replace("USDT", "/USDT"): item - for item in exchange.fapiPublicGetTicker24hr() + for item in futures_24h if item.get("symbol", "").endswith("USDT") } for symbol, info in tickers.items(): + if symbol not in main_scan_symbols: + continue + if _is_cached_dynamic_exclusion(symbol, info, cached_runtime_exclusions, low_turnover_threshold): + cached_runtime_skip_count += 1 + cached = cached_runtime_exclusions.get(symbol.upper()) or {} + universe_exclusions.append({ + "symbol": symbol, + "base": symbol.split("/")[0], + "price": info.get("price", 0) or 0, + "volume_24h": info.get("volume_24h", 0) or 0, + "change_24h": info.get("change_24h", 0) or 0, + "reason_code": cached.get("reason_code") or "cached_dynamic_exclusion", + "reason_label": cached.get("reason_label") or "动态宇宙过滤缓存", + "reason_type": cached.get("reason_type") or "dynamic", + "cache_hit": True, + }) + excluded_symbols.add(symbol) + continue base = symbol.split("/")[0] vol = info["volume_24h"] change = info["change_24h"] @@ -1105,7 +1231,12 @@ def layer1_coarse_filter(): anomaly_score += max(1, weights.get("静K蓄力", 2)) short_cfg = get_screener_section("short_timeframe_ignition") - if short_cfg.get("enabled", True) and vol >= float(short_cfg.get("min_volume_24h", 5_000_000) or 0): + emergency_short_tf_limit = int(scan_cfg.get("emergency_short_tf_max_symbols") or 0) + allow_short_tf_scan = _should_scan_short_tf(symbol, info, recently_screened=recently_screened, cfg=scan_cfg) + if emergency_short_tf_limit > 0 and short_tf_scan_count >= emergency_short_tf_limit and not _is_top_gainer_candidate(symbol, info): + allow_short_tf_scan = False + if short_cfg.get("enabled", True) and allow_short_tf_scan and vol >= float(short_cfg.get("min_volume_24h", 5_000_000) or 0): + short_tf_scan_count += 1 m15_result = {"found": False, "timeframe": "15m"} m5_result = {"found": False, "timeframe": "5m"} m15_df = fetch_klines(symbol, "15m", limit=120) @@ -1187,24 +1318,33 @@ def layer1_coarse_filter(): } # ==== 第二遍扫描:低成交量静K蓄力旁路 + 底部抬高 + 压缩放量 ==== - bypass_cfg = get_screener_section("static_accumulation_bypass") - bypass_min_vol = bypass_cfg.get("min_volume_24h", 2000000) bypass_min_vol_ratio = bypass_cfg.get("min_vol_ratio", 1.2) bypass_count = 0 hl_count_total = 0 cs_count_total = 0 - # 主门槛:第一遍扫描的最低成交量门槛 - main_min_vol = min(MIN_24H_VOLUME_USD, MEME_MIN_24H_VOLUME_USD) - - hl_cfg = get_screener_section("higher_lows") - cs_cfg = get_screener_section("compression_surge") - hl_min_vol = hl_cfg.get("min_volume_24h", 2000000) if hl_cfg.get("enabled", True) else float("inf") - cs_min_vol = cs_cfg.get("min_volume_24h", 2000000) if cs_cfg.get("enabled", True) else float("inf") - for symbol, info in tickers.items(): if symbol in candidates: continue + if symbol not in bypass_scan_symbols: + continue + if _is_cached_dynamic_exclusion(symbol, info, cached_runtime_exclusions, low_turnover_threshold): + cached_runtime_skip_count += 1 + if symbol not in excluded_symbols: + cached = cached_runtime_exclusions.get(symbol.upper()) or {} + universe_exclusions.append({ + "symbol": symbol, + "base": symbol.split("/")[0], + "price": info.get("price", 0) or 0, + "volume_24h": info.get("volume_24h", 0) or 0, + "change_24h": info.get("change_24h", 0) or 0, + "reason_code": cached.get("reason_code") or "cached_dynamic_exclusion", + "reason_label": cached.get("reason_label") or "动态宇宙过滤缓存", + "reason_type": cached.get("reason_type") or "dynamic", + "cache_hit": True, + }) + excluded_symbols.add(symbol) + continue vol = info["volume_24h"] if vol < bypass_min_vol and vol < hl_min_vol and vol < cs_min_vol: @@ -1438,6 +1578,15 @@ def layer1_coarse_filter(): "kline_h4_success_count": len(h4_success_symbols), "coarse_candidate_count": len(candidates), "top_gainer_discovery_count": top_gainer_count, + "main_kline_min_volume_usd": scan_cfg["main_min_volume_usd"], + "bypass_kline_min_volume_usd": low_turnover_threshold, + "emergency_main_kline_scan_budget": scan_cfg["emergency_main_max_symbols"], + "emergency_bypass_kline_scan_budget": scan_cfg["emergency_bypass_max_symbols"], + "main_kline_scan_pool_count": len(main_scan_symbols), + "bypass_kline_scan_pool_count": len(bypass_scan_symbols), + "short_tf_scan_count": short_tf_scan_count, + "emergency_short_tf_scan_budget": scan_cfg["emergency_short_tf_max_symbols"], + "cached_runtime_skip_count": cached_runtime_skip_count, }, } return candidates diff --git a/rules.yaml b/rules.yaml index ad9b0e1..ed8528e 100644 --- a/rules.yaml +++ b/rules.yaml @@ -52,6 +52,18 @@ pa_engine: max_breakout_age_bars: 1 dy_bear_max: 3 screener: + kline_scan: + enabled: true + main_min_volume_usd: 5000000 + bypass_min_volume_usd: 2000000 + short_tf_min_volume_usd: 5000000 + short_tf_min_abs_change_pct: 1.0 + short_tf_high_volume_usd: 20000000 + emergency_main_max_symbols: 0 + emergency_bypass_max_symbols: 0 + emergency_short_tf_max_symbols: 0 + respect_dynamic_universe_cache: true + note: 每轮仍拉全市场ticker,但K线扫描由成交额、活跃度、最近关注、强势榜和动态宇宙缓存决定;emergency_* 默认0表示不按数量截断,只在交易所限流异常时临时启用。 volume: min_usd: 10000000 meme_min_usd: 5000000 diff --git a/tests/test_screener_optimizations.py b/tests/test_screener_optimizations.py index db70746..0583e1a 100644 --- a/tests/test_screener_optimizations.py +++ b/tests/test_screener_optimizations.py @@ -147,6 +147,48 @@ def test_fetch_spot_24h_tickers_uses_cache_when_dns_fails(monkeypatch, tmp_path) assert tickers["AI/USDT"]["quoteVolume"] == 1234567 +def test_dynamic_low_turnover_cache_only_skips_while_volume_still_low(): + cached = { + "SLOW/USDT": { + "reason_type": "dynamic", + "reason_code": "low_turnover", + "evidence": {"min_volume": 2_000_000}, + } + } + + assert altcoin_screener._is_cached_dynamic_exclusion("SLOW/USDT", {"volume_24h": 900_000}, cached) + assert not altcoin_screener._is_cached_dynamic_exclusion("SLOW/USDT", {"volume_24h": 3_000_000}, cached) + + +def test_kline_scan_selection_is_rule_based_without_default_count_cap(monkeypatch): + monkeypatch.setattr(altcoin_screener, "get_burst_threshold", lambda symbol: 4) + monkeypatch.setattr(altcoin_screener, "is_meme_coin", lambda symbol: False) + tickers = { + "AAA/USDT": {"volume_24h": 20_000_000, "change_24h": 1}, + "HOT/USDT": {"volume_24h": 10_000_000, "change_24h": 8}, + "OLD/USDT": {"volume_24h": 6_000_000, "change_24h": 0.5}, + "LOW/USDT": {"volume_24h": 500_000, "change_24h": 99}, + } + + selected = altcoin_screener._rule_based_kline_scan_symbols( + tickers, + recently_screened={"OLD/USDT"}, + min_volume=5_000_000, + emergency_max=0, + ) + capped = altcoin_screener._rule_based_kline_scan_symbols( + tickers, + recently_screened={"OLD/USDT"}, + min_volume=5_000_000, + emergency_max=2, + ) + + assert "LOW/USDT" not in selected + assert {"AAA/USDT", "HOT/USDT", "OLD/USDT"} <= set(selected) + assert len(selected) == 3 + assert len(capped) == 2 + + def _mock_weights(): return { "量价齐飞": 5,