From a32be277fb01e650eb445e7ce1b59de7e1e49187 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Mon, 25 May 2026 08:53:21 +0800 Subject: [PATCH] 1 --- AGENTS.md | 13 +- app/core/opportunity_funnel.py | 2 + app/core/signal_taxonomy.py | 6 + app/db/analytics.py | 46 +++ app/db/data_export.py | 3 + .../0013_screening_universe_audit.sql | 51 +++ .../0014_short_tf_signal_samples.sql | 25 ++ app/db/short_tf_signals.py | 177 ++++++++++ app/db/universe_audit.py | 208 ++++++++++++ app/services/altcoin_screener.py | 319 +++++++++++++++++- app/web/routes_recommendations.py | 7 + rules.yaml | 19 ++ static/pipeline.html | 25 +- tests/test_screener_optimizations.py | 78 ++++- tests/test_universe_audit.py | 19 ++ 15 files changed, 975 insertions(+), 23 deletions(-) create mode 100644 app/db/migrations/0013_screening_universe_audit.sql create mode 100644 app/db/migrations/0014_short_tf_signal_samples.sql create mode 100644 app/db/short_tf_signals.py create mode 100644 app/db/universe_audit.py create mode 100644 tests/test_universe_audit.py diff --git a/AGENTS.md b/AGENTS.md index cc81788..5558b13 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -84,7 +84,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 1. `app/services/market_overview.py` 采集全市场快照,为行情环境、涨幅榜和市场温度提供数据。 2. `app/services/altcoin_screener.py` - 负责粗筛/细筛,基于 Binance 行情、量价/结构等规则找候选币。 + 负责粗筛/细筛,基于 Binance 行情、量价/结构等规则找候选币;同时写入交易宇宙缓存和覆盖率审计,方便排查“是否扫全、哪里漏掉”。 3. `app/services/altcoin_confirm.py` 负责确认,判断候选是否形成更可执行的机会,并生成入场计划、上下文和推送候选。 4. `app/services/event_driven_screener.py` @@ -177,6 +177,10 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - 旧 `coin_state` 兼容状态写入、active 状态读取、过期状态清理。 - `app/db/screening_queries.py` - 筛选日志写入、细筛历史读取、确认层候选读取。 +- `app/db/universe_audit.py` + - 交易宇宙缓存与筛选覆盖率审计。`symbol_universe_cache` 保存稳定币/封装币/异常交易对/低成交额等过滤结论;`screening_coverage_audit` 保存每轮 Binance USDT 总数、可交易宇宙、缓存命中、K 线成功率、粗筛/细筛数量等覆盖率快照。 +- `app/db/short_tf_signals.py` + - 5m/15m 短周期启动信号的证据采样与复盘读模型。短周期信号先进入 `short_tf_signal_samples`,通过转推荐率、后续收益等数据验证价值,不应拍脑袋直接变成交易动作。 - `app/db/recommendation_state.py` - 推荐状态派生、展示桶、发现层/交易层字段、entry_plan 解析、观察池分层。 - `app/db/recommendation_queries.py` @@ -220,6 +224,9 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - `recommendation` - `screening_log` +- `screening_coverage_audit` +- `symbol_universe_cache` +- `short_tf_signal_samples` - `price_tracking` - `latest_price_cache` - `review_log` @@ -428,7 +435,11 @@ docker compose run --rm alphax-web python scripts/postgres/validate_import.py -- - `price_tracking` 是跟踪流水,不应和 `latest_price_cache` 混为一谈。 - `price_tracker.py` 会为 active 观察池样本更新观察价/PnL,但未触发入场的 watch_pool/wait_pullback 不能触发止盈止损、不能进入 paper trading 收益账本。 - `rec_state` 是发现层状态(如“爆发/加速”),`execution_status`/`trade_stage` 才是交易执行阶段(如 `buy_now`/`wait_pullback`/`observe`),不要把“发现爆发”直接解读成“现在可买”。 +- 每轮粗筛会写 `screening_coverage_audit`,用于确认 `Binance USDT 总数 -> 可交易宇宙 -> K线成功 -> 粗筛候选 -> 细筛通过` 的覆盖漏斗;排查“为什么没有机会/是否漏选”时应先看这张表和 `/pipeline` 的覆盖率指标。 +- `symbol_universe_cache` 只应把静态/半静态问题长期缓存,例如稳定币、封装币、异常交易对、非标准交易对;`low_turnover`、`stale_ticker` 等动态问题只能短 TTL,不能永久拉黑,否则会错过后续流动性改善的币。 - 静K蓄力旁路已要求配置化共振(见 `rules.yaml` 的 `screener.static_accumulation_bypass.require_resonance`),避免单一静K样本淹没确认层;无追高风险的强势榜异动仍可作为发现入口。 +- 粗筛发现层已加入 `screener.short_timeframe_ignition`:15m 用于捕捉 1H 成型前的短周期启动,5m 只在 15m 已启动或已有结构背景时启用;短周期信号只作为早期发现/共振,不应绕过确认层直接买入。 +- 短周期信号会写入 `short_tf_signal_samples`,`/api/screening/short-tf-review` 和 `/pipeline` 的“短周期验证”会展示样本数、转推荐率、当前收益等证据。后续若要把 5m/15m 提升为更强交易触发,必须先基于这张表和历史暴涨样本验证,而不是固定写死。 - 确认评分不再应被理解为固定技术分;确认层通过 `FactorScorer` 读取复盘后的 `signal_performance.weight`,高胜率因子会升权,低胜率/负收益因子会降权或淘汰。 - 评分因子必须保留 `factor_score_breakdown`,否则复盘无法知道一次推荐具体由哪些因子贡献、哪些因子拖累。 - `paper_trader.py` 只应处理可执行推荐,不能把观察池样本当成已成交。 diff --git a/app/core/opportunity_funnel.py b/app/core/opportunity_funnel.py index 3dca5d5..a3fffcb 100644 --- a/app/core/opportunity_funnel.py +++ b/app/core/opportunity_funnel.py @@ -109,6 +109,8 @@ def discovery_source_types(candidate: Dict[str, Any]) -> List[str]: sources.append("cex_top_gainer") if candidate.get("vp_data") or candidate.get("turnover_acceleration_1h") or candidate.get("turnover_acceleration_4h"): sources.append("cex") + if candidate.get("short_tf_ignition"): + sources.append("short_timeframe") if candidate.get("static_accumulation") or candidate.get("higher_lows") or candidate.get("compression_surge"): sources.append("structure") if candidate.get("sentiment") or candidate.get("sentiment_bonus"): diff --git a/app/core/signal_taxonomy.py b/app/core/signal_taxonomy.py index 147b8d1..9434229 100644 --- a/app/core/signal_taxonomy.py +++ b/app/core/signal_taxonomy.py @@ -15,6 +15,9 @@ SIGNAL_CODE_LABELS = { "volume_consecutive_1h": "1H连续放量", "cex_top_gainer_24h": "CEX 24h强势榜异动", "vp_fly_1h_stale": "1H历史量价齐飞", + "short_tf_15m_ignition": "15min短周期启动", + "short_tf_5m_ignition": "5min极早期启动", + "short_tf_resonance": "短周期共振", "volume_divergence_1h": "1H量价背离", "static_accum_4h": "4H静K蓄力", "higher_lows_4h": "4H底部抬高", @@ -55,6 +58,9 @@ _PATTERNS = [ ("cex_top_gainer_24h", ("24h强势榜",)), ("vp_fly_1h_stale", ("历史放量阳线", "历史量价齐飞", "量价齐飞已过期")), ("vp_fly_1h_current", ("量价齐飞", "量价齐飞K")), + ("short_tf_resonance", ("短周期共振", "5m/15m共振")), + ("short_tf_15m_ignition", ("15min短周期启动", "15m短周期启动", "15min 早期启动")), + ("short_tf_5m_ignition", ("5min极早期启动", "5m极早期启动", "5min 早期启动")), ("volume_divergence_1h", ("量价背离", "放量但无量价齐飞")), ("static_accum_4h", ("静K蓄力", "静K旁路")), ("higher_lows_4h", ("底部抬高",)), diff --git a/app/db/analytics.py b/app/db/analytics.py index 09e9e7c..01f473a 100644 --- a/app/db/analytics.py +++ b/app/db/analytics.py @@ -39,6 +39,14 @@ def _loads_json(value, fallback): return fallback +def _coverage_item(row): + if not row: + return {} + item = dict(row) + item["detail_json"] = _loads_json(item.get("detail_json"), {}) + return item + + def _safe_int(value, default=0): try: return int(value or 0) @@ -1179,6 +1187,22 @@ def _select_pipeline_rows(conn, run): """, (run_finished, end_text), ).fetchall() + run_summary = _loads_json(run.get("summary_json"), {}) + coverage = None + coverage_id = _safe_int(run_summary.get("coverage_audit_id")) + if coverage_id: + coverage = conn.execute("SELECT * FROM screening_coverage_audit WHERE id=%s", (coverage_id,)).fetchone() + if not coverage: + coverage = conn.execute( + """ + SELECT * + FROM screening_coverage_audit + WHERE scan_started_at >= %s AND scan_started_at <= %s + ORDER BY scan_started_at ASC, id ASC + LIMIT 1 + """, + (run_started, run_finished), + ).fetchone() return { "window_start": start_text, "window_end": end_text, @@ -1187,6 +1211,7 @@ def _select_pipeline_rows(conn, run): "recommendation_rows": [_recommendation_item(row) for row in rec_rows], "review_rows": [_review_item(row) for row in reviews], "missed_rows": _dedupe_missed_rows(missed_rows), + "coverage": _coverage_item(coverage), } @@ -1225,6 +1250,7 @@ def _pipeline_summary_for_run(run, related): if not fine_qualified: fine_qualified = quality_pass_count + coverage = related.get("coverage") or {} recommendations = len(related["recommendation_rows"]) hit_rate = round(recommendations / fine_qualified * 100, 1) if fine_qualified else 0 issue_notes = [] @@ -1262,6 +1288,16 @@ def _pipeline_summary_for_run(run, related): "confirm_hits": confirm_hits, "recommendations": recommendations, "universe_gate_count": universe_gate_count, + "coverage_audit_id": coverage.get("id") or 0, + "raw_ticker_count": _safe_int(coverage.get("raw_ticker_count")), + "usdt_pair_count": _safe_int(coverage.get("usdt_pair_count")), + "tradable_universe_count": _safe_int(coverage.get("tradable_universe_count")), + "cached_exclusion_count": _safe_int(coverage.get("cached_exclusion_count")), + "kline_attempt_count": _safe_int(coverage.get("kline_attempt_count")), + "kline_h1_success_count": _safe_int(coverage.get("kline_h1_success_count")), + "kline_h4_success_count": _safe_int(coverage.get("kline_h4_success_count")), + "low_turnover_count": _safe_int(coverage.get("low_turnover_count")), + "stale_ticker_count": _safe_int(coverage.get("stale_ticker_count")), "discovery_count": discovery_count, "quality_pass_count": quality_pass_count, "quality_reject_count": quality_reject_count, @@ -1340,6 +1376,13 @@ def get_pipeline_runs(limit=30, hours=24, offset=0): kpi = { "hours": hours, "run_count": len(all_summaries), + "raw_ticker_count": sum(item.get("raw_ticker_count", 0) for item in all_summaries), + "usdt_pair_count": sum(item.get("usdt_pair_count", 0) for item in all_summaries), + "tradable_universe_count": sum(item.get("tradable_universe_count", 0) for item in all_summaries), + "cached_exclusion_count": sum(item.get("cached_exclusion_count", 0) for item in all_summaries), + "kline_attempt_count": sum(item.get("kline_attempt_count", 0) for item in all_summaries), + "kline_h1_success_count": sum(item.get("kline_h1_success_count", 0) for item in all_summaries), + "kline_h4_success_count": sum(item.get("kline_h4_success_count", 0) for item in all_summaries), "universe_gate_count": sum(item.get("universe_gate_count", 0) for item in all_summaries), "discovery_count": sum(item.get("discovery_count", 0) for item in all_summaries), "rough_candidates": sum(item["rough_candidates"] for item in all_summaries), @@ -1357,6 +1400,8 @@ def get_pipeline_runs(limit=30, hours=24, offset=0): } kpi["recommendation_rate"] = round(kpi["recommendations"] / kpi["fine_qualified"] * 100, 1) if kpi["fine_qualified"] else 0 kpi["performance_hit_rate"] = round(kpi["perf_success"] / (kpi["perf_success"] + kpi["perf_failed"]) * 100, 1) if (kpi["perf_success"] + kpi["perf_failed"]) else 0 + kpi["kline_h1_success_rate"] = round(kpi["kline_h1_success_count"] / kpi["kline_attempt_count"] * 100, 1) if kpi["kline_attempt_count"] else 0 + kpi["kline_h4_success_rate"] = round(kpi["kline_h4_success_count"] / kpi["kline_attempt_count"] * 100, 1) if kpi["kline_attempt_count"] else 0 total_pages = (total_count + limit - 1) // limit if total_count else 0 current_page = (offset // limit) + 1 if total_count else 0 return { @@ -1447,6 +1492,7 @@ def get_pipeline_run_detail(run_id): return { "summary": summary, + "coverage": related.get("coverage") or {}, "timeline": timeline, "stage_counts": stage_counts, "screening_items": screening_items, diff --git a/app/db/data_export.py b/app/db/data_export.py index d1c5d8f..c5602c6 100644 --- a/app/db/data_export.py +++ b/app/db/data_export.py @@ -14,6 +14,8 @@ from app.db.schema import get_conn RECENT_TABLES = { "recommendation": ("rec_time", 5000, "recommendations and lifecycle state"), "screening_log": ("scan_time", 10000, "screening funnel rows"), + "screening_coverage_audit": ("scan_started_at", 2000, "screening universe coverage audit"), + "short_tf_signal_samples": ("signal_time", 5000, "short-timeframe signal samples"), "coin_state": ("detected_at", 5000, "latest detected coin states"), "price_tracking": ("track_time", 10000, "recommendation tracking samples"), "paper_orders": ("created_at", 5000, "pending/filled/canceled order simulation"), @@ -37,6 +39,7 @@ SNAPSHOT_TABLES = { "strategy_runtime_config": (1000, "strategy runtime config snapshot"), "system_config": (1000, "system runtime config snapshot"), "scheduler_job_config": (200, "scheduler config snapshot"), + "symbol_universe_cache": (20000, "cached universe filter decisions"), } diff --git a/app/db/migrations/0013_screening_universe_audit.sql b/app/db/migrations/0013_screening_universe_audit.sql new file mode 100644 index 0000000..2afe4d9 --- /dev/null +++ b/app/db/migrations/0013_screening_universe_audit.sql @@ -0,0 +1,51 @@ +CREATE TABLE IF NOT EXISTS symbol_universe_cache ( + symbol TEXT PRIMARY KEY, + base TEXT NOT NULL, + quote TEXT NOT NULL DEFAULT 'USDT', + decision TEXT NOT NULL DEFAULT 'excluded', + reason_code TEXT NOT NULL DEFAULT '', + reason_label TEXT NOT NULL DEFAULT '', + reason_type TEXT NOT NULL DEFAULT 'dynamic', + source TEXT NOT NULL DEFAULT 'screener', + evidence_json TEXT DEFAULT '{}', + first_seen_at TEXT NOT NULL, + last_seen_at TEXT NOT NULL, + expires_at TEXT DEFAULT '', + hit_count INTEGER NOT NULL DEFAULT 0, + manual_override INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS idx_symbol_universe_cache_decision_expires + ON symbol_universe_cache(decision, expires_at); +CREATE INDEX IF NOT EXISTS idx_symbol_universe_cache_reason + ON symbol_universe_cache(reason_code, last_seen_at DESC); + +CREATE TABLE IF NOT EXISTS screening_coverage_audit ( + id BIGSERIAL PRIMARY KEY, + scan_started_at TEXT NOT NULL, + scan_finished_at TEXT NOT NULL, + source TEXT NOT NULL DEFAULT 'binance_spot_usdt_market', + status TEXT NOT NULL DEFAULT 'completed', + raw_ticker_count INTEGER NOT NULL DEFAULT 0, + usdt_pair_count INTEGER NOT NULL DEFAULT 0, + tradable_universe_count INTEGER NOT NULL DEFAULT 0, + cached_exclusion_count INTEGER NOT NULL DEFAULT 0, + universe_gate_count INTEGER NOT NULL DEFAULT 0, + static_exclusion_count INTEGER NOT NULL DEFAULT 0, + dynamic_exclusion_count INTEGER NOT NULL DEFAULT 0, + low_turnover_count INTEGER NOT NULL DEFAULT 0, + stale_ticker_count INTEGER NOT NULL DEFAULT 0, + kline_attempt_count INTEGER NOT NULL DEFAULT 0, + kline_h1_success_count INTEGER NOT NULL DEFAULT 0, + kline_h4_success_count INTEGER NOT NULL DEFAULT 0, + coarse_candidate_count INTEGER NOT NULL DEFAULT 0, + fine_qualified_count INTEGER NOT NULL DEFAULT 0, + quality_rejected_count INTEGER NOT NULL DEFAULT 0, + top_gainer_discovery_count INTEGER NOT NULL DEFAULT 0, + detail_json TEXT DEFAULT '{}' +); + +CREATE INDEX IF NOT EXISTS idx_screening_coverage_audit_started + ON screening_coverage_audit(scan_started_at DESC); +CREATE INDEX IF NOT EXISTS idx_screening_coverage_audit_status + ON screening_coverage_audit(status, scan_started_at DESC); diff --git a/app/db/migrations/0014_short_tf_signal_samples.sql b/app/db/migrations/0014_short_tf_signal_samples.sql new file mode 100644 index 0000000..aa54225 --- /dev/null +++ b/app/db/migrations/0014_short_tf_signal_samples.sql @@ -0,0 +1,25 @@ +CREATE TABLE IF NOT EXISTS short_tf_signal_samples ( + id BIGSERIAL PRIMARY KEY, + signal_time TEXT NOT NULL, + symbol TEXT NOT NULL, + timeframe TEXT NOT NULL, + signal_code TEXT NOT NULL, + signal_label TEXT NOT NULL, + entry_price DOUBLE PRECISION NOT NULL, + volume_24h DOUBLE PRECISION DEFAULT 0, + change_24h DOUBLE PRECISION DEFAULT 0, + gain_pct DOUBLE PRECISION DEFAULT 0, + vol_ratio DOUBLE PRECISION DEFAULT 0, + body_ratio DOUBLE PRECISION DEFAULT 0, + age_bars INTEGER DEFAULT 0, + resonance INTEGER NOT NULL DEFAULT 0, + context_json TEXT DEFAULT '{}', + review_status TEXT NOT NULL DEFAULT 'pending', + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_short_tf_samples_time ON short_tf_signal_samples(signal_time DESC); +CREATE INDEX IF NOT EXISTS idx_short_tf_samples_symbol_time ON short_tf_signal_samples(symbol, signal_time DESC); +CREATE INDEX IF NOT EXISTS idx_short_tf_samples_code_time ON short_tf_signal_samples(signal_code, signal_time DESC); +CREATE UNIQUE INDEX IF NOT EXISTS idx_short_tf_samples_dedupe + ON short_tf_signal_samples(symbol, timeframe, signal_code, signal_time); diff --git a/app/db/short_tf_signals.py b/app/db/short_tf_signals.py new file mode 100644 index 0000000..f098ccd --- /dev/null +++ b/app/db/short_tf_signals.py @@ -0,0 +1,177 @@ +"""Short-timeframe signal sampling and review queries.""" + +from __future__ import annotations + +import json +from datetime import datetime, timedelta + +from app.db.postgres_connection import ensure_migrations_once +from app.db.schema import get_conn + + +def _json(data) -> str: + return json.dumps(data or {}, ensure_ascii=False) + + +def _loads(value, fallback): + try: + if isinstance(value, str) and value.strip(): + return json.loads(value) + if value: + return value + except Exception: + pass + return fallback + + +def record_short_tf_samples(symbol: str, candidate: dict) -> int: + """Persist short-timeframe discovery samples for later evidence-based review.""" + short_tf = (candidate or {}).get("short_tf_ignition") or {} + signals = [dict(x or {}) for x in short_tf.get("signals", []) if (x or {}).get("found")] + if not signals: + return 0 + ensure_migrations_once() + now_dt = datetime.now() + # Scheduler may retry within the same minute; keep one sample per symbol/timeframe/minute. + signal_time = now_dt.replace(second=0, microsecond=0).isoformat(timespec="seconds") + created_at = now_dt.isoformat(timespec="seconds") + symbol = str(symbol or "").upper().strip() + conn = get_conn() + count = 0 + for item in signals: + tf = str(item.get("timeframe") or "").strip() + code = "short_tf_15m_ignition" if tf == "15m" else "short_tf_5m_ignition" if tf == "5m" else "short_tf_ignition" + trigger = item.get("trigger") or {} + context = { + "short_tf_ignition": short_tf, + "anomalies": candidate.get("anomalies") or [], + "signal_recency": candidate.get("signal_recency") or {}, + "top_gainer_24h": bool(candidate.get("top_gainer_24h")), + "static_accumulation": candidate.get("static_accumulation") or {}, + "higher_lows": candidate.get("higher_lows") or {}, + "compression_surge": candidate.get("compression_surge") or {}, + } + cur = conn.execute( + """ + INSERT INTO short_tf_signal_samples ( + signal_time, symbol, timeframe, signal_code, signal_label, + entry_price, volume_24h, change_24h, gain_pct, vol_ratio, + body_ratio, age_bars, resonance, context_json, review_status, created_at + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'pending', %s) + ON CONFLICT(symbol, timeframe, signal_code, signal_time) DO NOTHING + """, + ( + signal_time, + symbol, + tf, + code, + item.get("signal") or code, + float(candidate.get("price") or trigger.get("price") or 0), + float(candidate.get("volume_24h") or 0), + float(candidate.get("change_24h") or 0), + float(item.get("gain_pct") or 0), + float(trigger.get("vol_ratio") or 0), + float(trigger.get("body_ratio") or 0), + int(trigger.get("age_bars") or 0), + 1 if short_tf.get("resonance") else 0, + _json(context), + created_at, + ), + ) + if getattr(cur, "rowcount", 0) > 0: + count += 1 + conn.commit() + conn.close() + return count + + +def get_short_tf_signal_review(hours: int = 168, limit: int = 200) -> dict: + """Return performance-style read model for short-timeframe samples.""" + ensure_migrations_once() + try: + hours = max(1, min(int(hours or 168), 24 * 90)) + except Exception: + hours = 168 + try: + limit = max(1, min(int(limit or 200), 1000)) + except Exception: + limit = 200 + since = (datetime.now() - timedelta(hours=hours)).isoformat(timespec="seconds") + conn = get_conn() + rows = conn.execute( + """ + SELECT s.*, + lpc.price AS latest_price, + lpc.updated_at AS latest_price_at, + r.id AS recommendation_id, + r.execution_status, + r.action_status, + r.display_bucket, + r.rec_time + FROM short_tf_signal_samples s + LEFT JOIN latest_price_cache lpc ON lpc.symbol = s.symbol + LEFT JOIN LATERAL ( + SELECT id, execution_status, action_status, display_bucket, rec_time + FROM recommendation + WHERE symbol = s.symbol AND rec_time >= s.signal_time + ORDER BY rec_time ASC, id ASC + LIMIT 1 + ) r ON TRUE + WHERE s.signal_time >= %s + ORDER BY s.signal_time DESC, s.id DESC + LIMIT %s + """, + (since, limit), + ).fetchall() + conn.close() + items = [] + by_code: dict[str, dict] = {} + for row in rows: + item = dict(row) + item["context_json"] = _loads(item.get("context_json"), {}) + entry = float(item.get("entry_price") or 0) + latest = float(item.get("latest_price") or 0) + item["latest_return_pct"] = round((latest / entry - 1) * 100, 2) if entry > 0 and latest > 0 else 0 + item["converted_to_recommendation"] = bool(item.get("recommendation_id")) + items.append(item) + + code = item.get("signal_code") or "unknown" + bucket = by_code.setdefault(code, {"signal_code": code, "count": 0, "wins": 0, "losses": 0, "converted": 0, "total_return": 0.0}) + bucket["count"] += 1 + bucket["converted"] += 1 if item["converted_to_recommendation"] else 0 + bucket["total_return"] += item["latest_return_pct"] + if item["latest_return_pct"] >= 2: + bucket["wins"] += 1 + elif item["latest_return_pct"] <= -2: + bucket["losses"] += 1 + + summary = [] + total_return = 0.0 + converted_count = 0 + for bucket in by_code.values(): + count = bucket["count"] + total_return += bucket["total_return"] + converted_count += bucket["converted"] + summary.append({ + **bucket, + "signal_label": { + "short_tf_15m_ignition": "15m 短周期启动", + "short_tf_5m_ignition": "5m 极早期启动", + "short_tf_ignition": "短周期启动", + }.get(bucket["signal_code"], bucket["signal_code"]), + "avg_return_pct": round(bucket["total_return"] / count, 2) if count else 0, + "win_rate": round(bucket["wins"] / count * 100, 1) if count else 0, + "conversion_rate": round(bucket["converted"] / count * 100, 1) if count else 0, + }) + summary.sort(key=lambda x: (x["count"], x["avg_return_pct"]), reverse=True) + total_count = len(items) + return { + "hours": hours, + "total_samples": total_count, + "converted_count": converted_count, + "avg_return_pct": round(total_return / total_count, 2) if total_count else 0, + "summary": summary, + "items": items, + "note": "短周期信号只做发现证据采样,不直接触发交易动作;是否提权需看后续转推荐率和收益表现。", + } diff --git a/app/db/universe_audit.py b/app/db/universe_audit.py new file mode 100644 index 0000000..3d553cc --- /dev/null +++ b/app/db/universe_audit.py @@ -0,0 +1,208 @@ +"""Universe cache and screening coverage audit queries.""" + +from __future__ import annotations + +import json +from datetime import datetime, timedelta +from typing import Iterable + +from app.db.postgres_connection import ensure_migrations_once +from app.db.schema import get_conn + +STATIC_REASON_CODES = {"stablecoin", "wrapped", "excluded_base", "invalid_pair", "non_ascii", "inactive_market"} +TRANSIENT_REASON_CODES = {"stale_ticker"} +DYNAMIC_REASON_CODES = {"low_turnover"} + + +def _now() -> datetime: + return datetime.now() + + +def _iso(value: datetime | None = None) -> str: + return (value or _now()).isoformat(timespec="seconds") + + +def reason_type_for(code: str) -> str: + code = str(code or "").strip() + if code in STATIC_REASON_CODES: + return "static" + if code in TRANSIENT_REASON_CODES: + return "transient" + if code in DYNAMIC_REASON_CODES: + return "dynamic" + return "dynamic" + + +def expires_at_for(reason_type: str, now: datetime | None = None) -> str: + base = now or _now() + if reason_type == "static": + return (base + timedelta(days=90)).isoformat(timespec="seconds") + if reason_type == "transient": + return (base + timedelta(hours=1)).isoformat(timespec="seconds") + return (base + timedelta(hours=6)).isoformat(timespec="seconds") + + +def _json(data) -> str: + return json.dumps(data or {}, ensure_ascii=False) + + +def get_active_static_exclusions(symbols: Iterable[str]) -> dict[str, dict]: + """Return cached long-lived exclusions for current Binance symbols.""" + symbol_list = [str(s or "").upper().strip() for s in symbols if str(s or "").strip()] + if not symbol_list: + return {} + ensure_migrations_once() + now = _iso() + placeholders = ",".join(["%s"] * len(symbol_list)) + conn = get_conn() + rows = conn.execute( + f""" + SELECT * + FROM symbol_universe_cache + WHERE symbol IN ({placeholders}) + AND decision='excluded' + AND reason_type IN ('static') + AND manual_override=0 + AND (expires_at='' OR expires_at >= %s) + """, + tuple(symbol_list) + (now,), + ).fetchall() + conn.close() + return {row["symbol"]: dict(row) for row in rows} + + +def record_universe_decisions(items: Iterable[dict], *, source: str = "screener") -> int: + """Upsert universe filter decisions for later scans and audit.""" + rows = [dict(item or {}) for item in items if (item or {}).get("symbol")] + if not rows: + return 0 + ensure_migrations_once() + now_dt = _now() + now = _iso(now_dt) + conn = get_conn() + count = 0 + for item in rows: + symbol = str(item.get("symbol") or "").upper().strip() + base = str(item.get("base") or symbol.split("/")[0]).upper().strip() + reason_code = str(item.get("reason_code") or "").strip() + reason_label = str(item.get("reason_label") or reason_code or "宇宙过滤").strip() + rtype = str(item.get("reason_type") or reason_type_for(reason_code)).strip() + expires_at = str(item.get("expires_at") or expires_at_for(rtype, now_dt)).strip() + evidence = { + "price": item.get("price", 0), + "volume_24h": item.get("volume_24h", 0), + "change_24h": item.get("change_24h", 0), + "cache_hit": bool(item.get("cache_hit")), + "min_volume": item.get("min_volume", 0), + } + conn.execute( + """ + INSERT INTO symbol_universe_cache ( + symbol, base, quote, decision, reason_code, reason_label, reason_type, + source, evidence_json, first_seen_at, last_seen_at, expires_at, hit_count, manual_override + ) + VALUES (%s, %s, 'USDT', 'excluded', %s, %s, %s, %s, %s, %s, %s, %s, 1, 0) + ON CONFLICT(symbol) DO UPDATE SET + base=EXCLUDED.base, + decision=EXCLUDED.decision, + reason_code=EXCLUDED.reason_code, + reason_label=EXCLUDED.reason_label, + reason_type=EXCLUDED.reason_type, + source=EXCLUDED.source, + evidence_json=EXCLUDED.evidence_json, + last_seen_at=EXCLUDED.last_seen_at, + expires_at=EXCLUDED.expires_at, + hit_count=symbol_universe_cache.hit_count + 1 + """, + (symbol, base, reason_code, reason_label, rtype, source, _json(evidence), now, now, expires_at), + ) + count += 1 + conn.commit() + conn.close() + return count + + +def record_screening_coverage(metrics: dict) -> int: + """Persist one coverage snapshot for a screener run.""" + ensure_migrations_once() + data = dict(metrics or {}) + now = _iso() + started = str(data.get("scan_started_at") or now) + finished = str(data.get("scan_finished_at") or now) + detail = data.get("detail") or {} + conn = get_conn() + row = conn.execute( + """ + INSERT INTO screening_coverage_audit ( + scan_started_at, scan_finished_at, source, status, + raw_ticker_count, usdt_pair_count, tradable_universe_count, + cached_exclusion_count, universe_gate_count, static_exclusion_count, + dynamic_exclusion_count, low_turnover_count, stale_ticker_count, + kline_attempt_count, kline_h1_success_count, kline_h4_success_count, + coarse_candidate_count, fine_qualified_count, quality_rejected_count, + top_gainer_discovery_count, detail_json + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + RETURNING id + """, + ( + started, + finished, + str(data.get("source") or "binance_spot_usdt_market"), + str(data.get("status") or "completed"), + int(data.get("raw_ticker_count") or 0), + int(data.get("usdt_pair_count") or 0), + int(data.get("tradable_universe_count") or 0), + int(data.get("cached_exclusion_count") or 0), + int(data.get("universe_gate_count") or 0), + int(data.get("static_exclusion_count") or 0), + int(data.get("dynamic_exclusion_count") or 0), + int(data.get("low_turnover_count") or 0), + int(data.get("stale_ticker_count") or 0), + int(data.get("kline_attempt_count") or 0), + int(data.get("kline_h1_success_count") or 0), + int(data.get("kline_h4_success_count") or 0), + int(data.get("coarse_candidate_count") or 0), + int(data.get("fine_qualified_count") or 0), + int(data.get("quality_rejected_count") or 0), + int(data.get("top_gainer_discovery_count") or 0), + _json(detail), + ), + ).fetchone() + conn.commit() + conn.close() + return int(row["id"] if row else 0) + + +def list_screening_coverage(hours: int = 24, limit: int = 50) -> list[dict]: + ensure_migrations_once() + try: + hours = max(1, min(int(hours or 24), 24 * 30)) + except Exception: + hours = 24 + try: + limit = max(1, min(int(limit or 50), 200)) + except Exception: + limit = 50 + since = (_now() - timedelta(hours=hours)).isoformat(timespec="seconds") + conn = get_conn() + rows = conn.execute( + """ + SELECT * + FROM screening_coverage_audit + WHERE scan_started_at >= %s + ORDER BY scan_started_at DESC, id DESC + LIMIT %s + """, + (since, limit), + ).fetchall() + conn.close() + result = [] + for row in rows: + item = dict(row) + try: + item["detail_json"] = json.loads(item.get("detail_json") or "{}") + except Exception: + item["detail_json"] = {} + result.append(item) + return result diff --git a/app/services/altcoin_screener.py b/app/services/altcoin_screener.py index 7e40cd4..6e61ec7 100644 --- a/app/services/altcoin_screener.py +++ b/app/services/altcoin_screener.py @@ -59,10 +59,18 @@ from app.core.opportunity_funnel import ( universe_gate_reason, ) from app.core.signal_taxonomy import signal_codes as build_signal_codes +from app.db.universe_audit import ( + get_active_static_exclusions, + reason_type_for, + record_screening_coverage, + record_universe_decisions, +) +from app.db.short_tf_signals import record_short_tf_samples exchange = ccxt.binance({"enableRateLimit": True}) REPO_ROOT = Path(__file__).resolve().parents[2] BINANCE_SPOT_BASE_URL = os.getenv("ALPHAX_BINANCE_SPOT_BASE_URL", "https://api.binance.com").rstrip("/") +EXCHANGE_CACHE_DIR = REPO_ROOT / "data" / "exchange_cache" # ==================== 排除列表 ==================== STABLECOINS = { @@ -100,10 +108,13 @@ def _fetch_spot_24h_tickers(): if not isinstance(data, list): return {} tickers = {} + raw_count = len(data) + usdt_count = 0 for item in data: raw_symbol = str(item.get("symbol") or "") if not raw_symbol.endswith("USDT"): continue + usdt_count += 1 base = raw_symbol[:-4] if not base: continue @@ -121,38 +132,126 @@ def _fetch_spot_24h_tickers(): "low": float(item.get("lowPrice") or 0), "datetime": ticker_dt, } + _fetch_spot_24h_tickers.last_raw_count = raw_count + _fetch_spot_24h_tickers.last_usdt_count = usdt_count return tickers +def _spot_exchange_info_cache_path(): + return EXCHANGE_CACHE_DIR / "binance_spot_exchange_info.json" + + +def _read_spot_exchange_info_cache(max_age_seconds=86400): + path = _spot_exchange_info_cache_path() + try: + if not path.exists(): + return None + payload = json.loads(path.read_text(encoding="utf-8")) + fetched_at = float(payload.get("fetched_at") or 0) + if max_age_seconds and time.time() - fetched_at > max_age_seconds: + return None + return payload.get("statuses") or {} + except Exception: + return None + + +def _write_spot_exchange_info_cache(statuses): + try: + EXCHANGE_CACHE_DIR.mkdir(parents=True, exist_ok=True) + _spot_exchange_info_cache_path().write_text( + json.dumps({"fetched_at": time.time(), "statuses": statuses or {}}, ensure_ascii=False), + encoding="utf-8", + ) + except Exception: + pass + + +def _fetch_spot_exchange_statuses(): + """Return Binance spot symbol activity status, cached to avoid repeated exchangeInfo calls.""" + cached = _read_spot_exchange_info_cache() + if cached is not None: + return cached + try: + resp = requests.get(f"{BINANCE_SPOT_BASE_URL}/api/v3/exchangeInfo", timeout=15) + resp.raise_for_status() + data = resp.json() + statuses = {} + for item in data.get("symbols", []) if isinstance(data, dict) else []: + raw = str(item.get("symbol") or "").upper() + if not raw.endswith("USDT"): + continue + base = raw[:-4] + if not base: + continue + statuses[f"{base}/USDT"] = { + "status": str(item.get("status") or "").upper(), + "isSpotTradingAllowed": bool(item.get("isSpotTradingAllowed", True)), + } + if statuses: + _write_spot_exchange_info_cache(statuses) + return statuses + except Exception: + # If Binance is unavailable, a stale cache is still safer than hammering exchangeInfo. + return _read_spot_exchange_info_cache(max_age_seconds=0) or {} + + def fetch_all_tickers(): tickers = _fetch_spot_24h_tickers() usdt_pairs = {} universe_exclusions = [] + cached_exclusions = get_active_static_exclusions(tickers.keys()) + exchange_statuses = _fetch_spot_exchange_statuses() for symbol, info in tickers.items(): if "/USDT" in symbol: base = symbol.split("/")[0] vol_usd = info.get("quoteVolume", 0) or 0 + status_info = exchange_statuses.get(symbol.upper()) if exchange_statuses else None + if status_info and (status_info.get("status") != "TRADING" or not status_info.get("isSpotTradingAllowed", True)): + universe_exclusions.append({ + "symbol": symbol, + "base": base, + "price": info.get("last", 0) or 0, + "volume_24h": vol_usd, + "reason_code": "inactive_market", + "reason_label": "交易对已停用/不可现货交易", + "reason_type": "static", + "exchange_status": status_info.get("status") or "", + }) + continue + cached = cached_exclusions.get(symbol.upper()) + if cached: + universe_exclusions.append({ + "symbol": symbol, + "base": base, + "price": info.get("last", 0) or 0, + "volume_24h": vol_usd, + "reason_code": cached.get("reason_code") or "cached_exclusion", + "reason_label": cached.get("reason_label") or "缓存宇宙过滤", + "reason_type": cached.get("reason_type") or "static", + "cache_hit": True, + }) + continue ticker_dt = info.get("datetime") if ticker_dt: try: ticker_time = datetime.fromisoformat(str(ticker_dt).replace("Z", "+00:00")).replace(tzinfo=None) if datetime.utcnow() - ticker_time > timedelta(hours=36): - universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "stale_ticker", "reason_label": "行情数据过旧"}) + universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "stale_ticker", "reason_label": "行情数据过旧", "reason_type": "transient"}) continue except Exception: pass if base in STABLECOINS or base in WRAPPED or base in GOLD_METAL: reason = universe_gate_reason(base, vol_usd, 0, symbol=symbol) or {"reason_code": "excluded_base", "reason_label": "排除基础资产"} - universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, **reason}) + universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_type": reason_type_for(reason.get("reason_code")), **reason}) continue if base in EXCLUDED_BASES: - universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "invalid_pair", "reason_label": "交易对异常"}) + universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "invalid_pair", "reason_label": "交易对异常", "reason_type": "static"}) continue if base.endswith(EXCLUDED_BASE_SUFFIXES): - universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "invalid_pair", "reason_label": "交易对异常"}) + universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "invalid_pair", "reason_label": "交易对异常", "reason_type": "static"}) continue if not base.isascii(): - universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "non_ascii", "reason_label": "非标准交易对"}) + universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "non_ascii", "reason_label": "非标准交易对", "reason_type": "static"}) continue usdt_pairs[symbol] = { "price": info.get("last", 0), @@ -162,6 +261,14 @@ def fetch_all_tickers(): "low_24h": info.get("low", 0), } fetch_all_tickers.last_universe_exclusions = universe_exclusions + fetch_all_tickers.last_coverage_meta = { + "raw_ticker_count": int(getattr(_fetch_spot_24h_tickers, "last_raw_count", 0) or 0), + "usdt_pair_count": int(getattr(_fetch_spot_24h_tickers, "last_usdt_count", 0) or 0), + "tradable_universe_count": len(usdt_pairs), + "cached_exclusion_count": sum(1 for item in universe_exclusions if item.get("cache_hit")), + "pre_volume_universe_gate_count": len(universe_exclusions), + } + record_universe_decisions(universe_exclusions) return usdt_pairs @@ -600,6 +707,84 @@ def detect_compression_surge(df, cfg=None): } +def detect_short_timeframe_ignition(df, timeframe="15m", cfg=None): + """短周期早期启动探针。 + + 只用于发现层预警,避免山寨币在 1H 成型前已经走完第一段。 + 返回的信号需要后续确认层继续用 15m/30m/1H/4H/1D 决定是否可交易。 + """ + cfg = dict(cfg or {}) + if df is None or len(df) < 30 or not cfg: + return {"found": False, "timeframe": timeframe} + + recent_bars = int(cfg.get("recent_bars", 8) or 8) + max_age = int(cfg.get("max_trigger_age_bars", 2) or 2) + min_vol_ratio = float(cfg.get("min_vol_ratio", 2.5) or 2.5) + min_body_ratio = float(cfg.get("min_body_ratio", 0.45) or 0.45) + min_gain_pct = float(cfg.get("min_gain_pct", 0.8) or 0.8) + + recent = df.tail(max(recent_bars, max_age + 1)).copy() + if len(recent) < max_age + 1: + return {"found": False, "timeframe": timeframe} + + prior = df.iloc[:-len(recent)] if len(df) > len(recent) else df.iloc[:-max_age] + avg_vol = float(prior["volume"].tail(48).mean()) if len(prior) else float(df["volume"].tail(30).mean()) + if avg_vol <= 0: + return {"found": False, "timeframe": timeframe} + + first_close = float(recent["close"].iloc[0]) + last_close = float(recent["close"].iloc[-1]) + gain_pct = (last_close - first_close) / first_close * 100 if first_close > 0 else 0.0 + + trigger = None + for offset in range(0, min(max_age + 1, len(recent))): + row = recent.iloc[-1 - offset] + candle_range = float(row["high"] - row["low"]) + body = float(row["close"] - row["open"]) + body_ratio = abs(body) / (candle_range + 1e-9) + vol_ratio = float(row["volume"]) / avg_vol + close_pos = (float(row["close"]) - float(row["low"])) / (candle_range + 1e-9) + if body > 0 and body_ratio >= min_body_ratio and vol_ratio >= min_vol_ratio and close_pos >= 0.6: + trigger = { + "age_bars": offset, + "vol_ratio": round(vol_ratio, 2), + "body_ratio": round(body_ratio, 2), + "close_pos": round(close_pos, 2), + "price": round(float(row["close"]), 8), + } + break + + if not trigger or gain_pct < min_gain_pct: + return { + "found": False, + "timeframe": timeframe, + "gain_pct": round(gain_pct, 2), + "max_vol_ratio": round(max(float(v) / avg_vol for v in recent["volume"]), 2), + } + + bullish_dynamic = 0 + bearish_dynamic = 0 + for _, row in recent.iterrows(): + candle_range = float(row["high"] - row["low"]) + body = float(row["close"] - row["open"]) + body_ratio = abs(body) / (candle_range + 1e-9) + vol_ratio = float(row["volume"]) / avg_vol + if body > 0 and body_ratio >= min_body_ratio and vol_ratio >= min_vol_ratio * 0.7: + bullish_dynamic += 1 + elif body < 0 and body_ratio >= min_body_ratio and vol_ratio >= min_vol_ratio * 0.7: + bearish_dynamic += 1 + + return { + "found": True, + "timeframe": timeframe, + "gain_pct": round(gain_pct, 2), + "trigger": trigger, + "bullish_dynamic_count": bullish_dynamic, + "bearish_dynamic_count": bearish_dynamic, + "signal": f"{timeframe}短周期启动(量{trigger['vol_ratio']}x,涨{gain_pct:.1f}%)", + } + + def _build_signal_recency(cand): """把粗筛/细筛命中的信号按 current/stale 标记,避免旧形态冒充当下机会。""" current = [] @@ -622,6 +807,15 @@ def _build_signal_recency(cand): current.append({"type": "structure", "label": "当前4H底部抬高", "timeframe": "4h"}) if cand.get("compression_surge"): current.append({"type": "structure", "label": "当前4H压缩放量", "timeframe": "4h"}) + short_tf = cand.get("short_tf_ignition") or {} + for item in short_tf.get("signals", []): + if item.get("found"): + current.append({ + "type": "short_timeframe", + "label": item.get("signal") or f"{item.get('timeframe')}短周期启动", + "timeframe": item.get("timeframe"), + "gain_pct": item.get("gain_pct", 0), + }) if cand.get("sentiment") or cand.get("sentiment_bonus"): current.append({"type": "sentiment", "label": "舆情共振", "source": "sentiment_monitor"}) status = "current" if current else "stale_background_only" if stale else "unknown" @@ -665,6 +859,9 @@ def _static_bypass_resonance(cand, *, static_cfg, sector_signal_count=0, top_tra signals.append("higher_lows") if (cand.get("compression_surge") or {}).get("found"): signals.append("compression_surge") + short_tf = cand.get("short_tf_ignition") or {} + if short_tf.get("resonance") or len(short_tf.get("signals") or []) >= 1: + signals.append("short_timeframe") if cand.get("sentiment") or cand.get("sentiment_bonus"): signals.append("sentiment") return list(dict.fromkeys(signals)) @@ -747,11 +944,15 @@ def layer1_coarse_filter(): """粗筛 — 只检测量价行为+布林收窄,不计算任何滞后指标""" print("=== 第一层:粗筛(v11纯前瞻) ===") tickers = fetch_all_tickers() + coverage_meta = dict(getattr(fetch_all_tickers, "last_coverage_meta", {}) or {}) universe_exclusions = list(getattr(fetch_all_tickers, "last_universe_exclusions", []) or []) excluded_symbols = {item.get("symbol", "") for item in universe_exclusions} funding_rates = fetch_funding_rates() weights = get_dynamic_weights() candidates = {} + kline_attempt_symbols = set() + h1_success_symbols = set() + h4_success_symbols = set() # === 24h筛选历史豁免 (v1.6.9) === # 过去24h内在screening_log出现过的币,不受"涨太多"过滤限制 @@ -791,10 +992,16 @@ def layer1_coarse_filter(): vp_data = None bb_data = None static_accumulation = None + short_tf_ignition = None # 1H量价齐飞检测(核心) + kline_attempt_symbols.add(symbol) h1_df = fetch_klines(symbol, "1h", limit=72) h4_df = fetch_klines(symbol, "4h", limit=100) + if h1_df is not None and len(h1_df) > 0: + h1_success_symbols.add(symbol) + if h4_df is not None and len(h4_df) > 0: + h4_success_symbols.add(symbol) if h1_df is not None and len(h1_df) >= 20: vp_data = detect_volume_price_fly(h1_df) if vp_data: @@ -853,6 +1060,36 @@ def layer1_coarse_filter(): ) anomaly_score += max(1, weights.get("静K蓄力", 2)) + short_cfg = get_screener_section("short_timeframe_ignition") + if short_cfg.get("enabled", True) and vol >= float(short_cfg.get("min_volume_24h", 5_000_000) or 0): + m15_result = {"found": False, "timeframe": "15m"} + m5_result = {"found": False, "timeframe": "5m"} + m15_df = fetch_klines(symbol, "15m", limit=120) + if m15_df is not None and len(m15_df) >= 30: + m15_result = detect_short_timeframe_ignition(m15_df, "15m", short_cfg.get("m15", {})) + # 5m 噪音更大:只有 15m 已有启动,或 1H/4H已有结构背景时才启用。 + if m15_result.get("found") or anomalies: + m5_df = fetch_klines(symbol, "5m", limit=120) + if m5_df is not None and len(m5_df) >= 30: + m5_result = detect_short_timeframe_ignition(m5_df, "5m", short_cfg.get("m5", {})) + short_signals = [x for x in (m15_result, m5_result) if x.get("found")] + if short_signals: + short_tf_ignition = { + "signals": short_signals, + "m15": m15_result, + "m5": m5_result, + "resonance": bool(m15_result.get("found") and m5_result.get("found")), + } + if short_tf_ignition["resonance"]: + anomalies.append("5m/15m短周期共振启动") + anomaly_score += int(short_cfg.get("resonance_bonus", 1) or 1) + if m15_result.get("found"): + anomalies.append(m15_result.get("signal") or "15min短周期启动") + anomaly_score += int(short_cfg.get("score_m15", 2) or 2) + if m5_result.get("found"): + anomalies.append(m5_result.get("signal") or "5min极早期启动") + anomaly_score += int(short_cfg.get("score_m5", 1) or 1) + # 资金费率极端(保留) fr = funding_rates.get(symbol, 0) funding_cfg = funding_rate_params() @@ -894,6 +1131,7 @@ def layer1_coarse_filter(): "vp_data": vp_data, "bb_data": bb_data, "static_accumulation": static_accumulation, + "short_tf_ignition": short_tf_ignition, "h4_df": h4_df, "turnover_acceleration_1h": turnover_acc_1h, "turnover_acceleration_4h": turnover_acc_4h, @@ -936,7 +1174,10 @@ def layer1_coarse_filter(): fr = funding_rates.get(symbol, 0) # 拉取4H数据(只拉一次,多个检测复用) + kline_attempt_symbols.add(symbol) h4_df = fetch_klines(symbol, "4h", limit=100) + if h4_df is not None and len(h4_df) > 0: + h4_success_symbols.add(symbol) if h4_df is None or len(h4_df) < 20: continue @@ -1062,10 +1303,13 @@ def layer1_coarse_filter(): "price": info.get("price", 0) or 0, "volume_24h": info.get("volume_24h", 0) or 0, "change_24h": info.get("change_24h", 0) or 0, + "reason_type": "dynamic", + "min_volume": low_turnover_threshold, **gate, }) excluded_symbols.add(symbol) + record_universe_decisions(universe_exclusions) universe_logged = _log_universe_exclusions(universe_exclusions) if bypass_count or hl_count_total or cs_count_total: @@ -1104,6 +1348,8 @@ def layer1_coarse_filter(): print(f"粗筛结果: {len(candidates)}个候选(宇宙过滤{len(universe_exclusions)}个,记录{universe_logged}个;含{total_bypass}个旁路: 静K{bypass_count}+底抬{hl_count_total}+压放{cs_count_total};强势榜发现{top_gainer_count}个)") for symbol, cand in candidates.items(): signals = cand.get("anomalies", []) + if cand.get("short_tf_ignition"): + record_short_tf_samples(symbol, {**cand, "signal_recency": _build_signal_recency(cand)}) log_screening( layer="粗筛", symbol=symbol, @@ -1135,6 +1381,20 @@ def layer1_coarse_filter(): "universe_gate_count": len(universe_exclusions), "universe_gate_logged": universe_logged, "top_gainer_discovery_count": top_gainer_count, + "coverage": { + **coverage_meta, + "tradable_universe_count": len(tickers), + "universe_gate_count": len(universe_exclusions), + "static_exclusion_count": sum(1 for item in universe_exclusions if (item.get("reason_type") or reason_type_for(item.get("reason_code"))) == "static"), + "dynamic_exclusion_count": sum(1 for item in universe_exclusions if (item.get("reason_type") or reason_type_for(item.get("reason_code"))) == "dynamic"), + "low_turnover_count": sum(1 for item in universe_exclusions if item.get("reason_code") == "low_turnover"), + "stale_ticker_count": sum(1 for item in universe_exclusions if item.get("reason_code") == "stale_ticker"), + "kline_attempt_count": len(kline_attempt_symbols), + "kline_h1_success_count": len(h1_success_symbols), + "kline_h4_success_count": len(h4_success_symbols), + "coarse_candidate_count": len(candidates), + "top_gainer_discovery_count": top_gainer_count, + }, } return candidates @@ -1195,6 +1455,15 @@ def layer2_fine_filter(candidates): signals.append(f"1H 连续{vp_data['max_consecutive_3x']}根3x放量") score += 2 + short_tf = cand.get("short_tf_ignition") or {} + if short_tf.get("signals"): + if short_tf.get("resonance"): + signals.append("短周期共振:5m/15m同步启动") + score += 1 + for item in short_tf.get("signals", []): + if item.get("found"): + signals.append(item.get("signal") or f"{item.get('timeframe')}短周期启动") + # 继承布林数据(蓄力末期特征) bb_data = cand.get("bb_data") if bb_data and bb_data["tight_squeeze"]: @@ -1580,6 +1849,29 @@ def _emit_output(output, compact: bool = False): print(json.dumps(output, ensure_ascii=False, indent=2)) +def _record_coverage_snapshot(started_at, status, *, fine_qualified=0, quality_rejected=0): + funnel_meta = getattr(layer1_coarse_filter, "last_funnel_meta", {}) or {} + coverage = dict(funnel_meta.get("coverage") or {}) + if not coverage: + return 0 + coverage.update({ + "scan_started_at": started_at.isoformat(timespec="seconds"), + "scan_finished_at": datetime.now().isoformat(timespec="seconds"), + "status": status, + "fine_qualified_count": int(fine_qualified or 0), + "quality_rejected_count": int(quality_rejected or 0), + "detail": { + "universe_gate_logged": funnel_meta.get("universe_gate_logged", 0), + "top_gainer_discovery_count": funnel_meta.get("top_gainer_discovery_count", 0), + }, + }) + try: + return record_screening_coverage(coverage) + except Exception as exc: + print(f"覆盖率审计写入失败(非致命): {exc}") + return 0 + + def main(compact: bool = False): started_at = datetime.now() try: @@ -1591,10 +1883,12 @@ def main(compact: bool = False): funnel_meta = getattr(layer1_coarse_filter, "last_funnel_meta", {}) if not candidates: + coverage_id = _record_coverage_snapshot(started_at, "no_candidates") output = { "status": "no_candidates", "message": "粗筛无候选", "universe_gate_count": funnel_meta.get("universe_gate_count", 0), + "coverage_audit_id": coverage_id, "check_time": datetime.now().isoformat(), } _emit_output(output, compact=compact) @@ -1604,12 +1898,19 @@ def main(compact: bool = False): fine_meta = getattr(layer2_fine_filter, "last_funnel_meta", {}) if not qualified: + coverage_id = _record_coverage_snapshot( + started_at, + "no_qualified", + fine_qualified=0, + quality_rejected=fine_meta.get("quality_rejected_count", 0), + ) output = { "status": "no_qualified", "message": "细筛无合格候选", "candidates_count": len(candidates), "universe_gate_count": funnel_meta.get("universe_gate_count", 0), "quality_rejected_count": fine_meta.get("quality_rejected_count", 0), + "coverage_audit_id": coverage_id, "check_time": datetime.now().isoformat(), } _emit_output(output, compact=compact) @@ -1634,12 +1935,19 @@ def main(compact: bool = False): if hot_sectors: pass # 用户要求:板块联动不再推送飞书,仅保留DB记录 + coverage_id = _record_coverage_snapshot( + started_at, + "screened", + fine_qualified=len(qualified), + quality_rejected=fine_meta.get("quality_rejected_count", 0), + ) output = { "status": "screened", "total_candidates": len(candidates), "total_qualified": len(qualified), "universe_gate_count": funnel_meta.get("universe_gate_count", 0), "quality_rejected_count": fine_meta.get("quality_rejected_count", 0), + "coverage_audit_id": coverage_id, "alerts": alert_results, "all_qualified": qualified, "check_time": datetime.now().isoformat(), @@ -1669,6 +1977,7 @@ def main(compact: bool = False): "total_qualified": output.get("total_qualified", 0), "universe_gate_count": output.get("universe_gate_count", 0), "quality_rejected_count": output.get("quality_rejected_count", 0), + "coverage_audit_id": output.get("coverage_audit_id", 0), "alert_count": len(output.get("alerts", [])), } log_cron_run( diff --git a/app/web/routes_recommendations.py b/app/web/routes_recommendations.py index 4deaef5..348d5ce 100644 --- a/app/web/routes_recommendations.py +++ b/app/web/routes_recommendations.py @@ -14,6 +14,7 @@ from app.db.analytics import ( ) from app.db.llm_insights import get_llm_insight_by_id, list_llm_insights from app.db.recommendation_queries import get_active_recommendations, get_active_recommendations_deduped +from app.db.short_tf_signals import get_short_tf_signal_review from app.config.config_loader import get_signal_weights from app.web.shared import ( ObservationRequest, @@ -173,6 +174,12 @@ async def api_screening(hours: int = 24, limit: int = 100, altcoin_session: str return get_screening_history(hours, limit) +@router.get("/api/screening/short-tf-review") +async def api_short_tf_signal_review(hours: int = 168, limit: int = 200, altcoin_session: str = Cookie(default="")): + require_api_user_with_subscription(altcoin_session) + return get_short_tf_signal_review(hours=hours, limit=limit) + + @router.get("/api/review") async def api_review(altcoin_session: str = Cookie(default="")): require_api_user_with_subscription(altcoin_session) diff --git a/rules.yaml b/rules.yaml index 3ce5647..567f253 100644 --- a/rules.yaml +++ b/rules.yaml @@ -121,6 +121,25 @@ screener: min_score: 2 min_volume_24h: 2000000 note: 2026-05-08复盘发现29%爆发币在起爆前振幅<20%+突然放量>2x。紧凑型压缩后爆发模式。 + short_timeframe_ignition: + enabled: true + min_volume_24h: 5000000 + score_m15: 2 + score_m5: 1 + resonance_bonus: 1 + note: 5m/15m 只作为早期发现预警,不直接生成买入;最终交易动作仍由确认层 15m/30m/1H/4H/1D 共振决定。 + m15: + recent_bars: 8 + max_trigger_age_bars: 2 + min_vol_ratio: 2.5 + min_body_ratio: 0.45 + min_gain_pct: 0.8 + m5: + recent_bars: 12 + max_trigger_age_bars: 3 + min_vol_ratio: 3.5 + min_body_ratio: 0.5 + min_gain_pct: 0.6 sector_rotation: bonus_weight: 0 min_non_sector_signals_for_accelerate: 2 diff --git a/static/pipeline.html b/static/pipeline.html index d94952a..338ffa6 100644 --- a/static/pipeline.html +++ b/static/pipeline.html @@ -2,7 +2,7 @@ {% block title %}AlphaX Agent — 链路日志{% endblock %} {% block extra_head_css %} {% endblock %} {% block content %} @@ -15,6 +15,16 @@
样本 '+esc(x.count||0)+' · 转推荐 '+esc(x.conversion_rate||0)+'% · 当前均值 '+fmtPct(x.avg_return_pct||0)+' · 胜率 '+esc(x.win_rate||0)+'%
| 时间 | 阶段 | 币种 | 分数/收益 | 价格/涨幅 | 状态 | 信号与说明 |
|---|---|---|---|---|---|---|
| '+fmtTime(r.time)+' | '+label(r.stage,cls)+' | '+esc(r.symbol||'--')+' | '+esc(r.score===''?'--':r.score)+' | '+esc(r.price==null?'--':r.price)+' | '+esc(r.status||'--')+' | '+esc(desc||'--')+' |
| 时间 | 阶段 | 币种 | 分数/收益 | 价格/涨幅 | 状态 | 信号与说明 |
|---|---|---|---|---|---|---|
| '+fmtTime(r.time)+' | '+label(r.stage,cls)+' | '+esc(r.symbol||'--')+' | '+esc(r.score===''?'--':r.score)+' | '+esc(r.price==null?'--':r.price)+' | '+esc(r.status||'--')+' | '+esc(desc||'--')+' |