From d227ab126c4bbccccf4a9513fa84306fc003be47 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Tue, 26 May 2026 23:33:06 +0800 Subject: [PATCH] 1 --- app/core/factor_scoring.py | 3 + app/core/signal_taxonomy.py | 2 + app/services/altcoin_confirm.py | 185 ++++++++++++++++++++++++- app/services/onchain_monitor.py | 116 ++++++++-------- tests/test_box_breakout_pullback_4h.py | 58 ++++++++ 5 files changed, 304 insertions(+), 60 deletions(-) create mode 100644 tests/test_box_breakout_pullback_4h.py diff --git a/app/core/factor_scoring.py b/app/core/factor_scoring.py index 69a7eed..ed3e5ac 100644 --- a/app/core/factor_scoring.py +++ b/app/core/factor_scoring.py @@ -22,6 +22,7 @@ DEFAULT_FACTOR_WEIGHTS = { "static_accum_4h": 5.0, "higher_lows_4h": 2.0, "compression_surge_4h": 2.0, + "box_breakout_pullback_4h": 8.0, "ignition_1h_current": 4.0, "ignition_4h_current": 3.0, "ignition_d1_current": 6.0, @@ -59,6 +60,7 @@ FACTOR_GROUPS = { "static_accum_4h": "structure", "higher_lows_4h": "structure", "compression_surge_4h": "structure", + "box_breakout_pullback_4h": "structure", "ignition_1h_current": "momentum", "ignition_4h_current": "momentum", "ignition_d1_current": "momentum", @@ -104,6 +106,7 @@ WEIGHT_ALIASES = { "volume_consecutive_1h": ("连续3x放量", "连续3x放量(≥3根)", "1H连续放量"), "volume_divergence_1h": ("量价背离", "1H量价背离"), "static_accum_4h": ("静K蓄力", "4H静K蓄力"), + "box_breakout_pullback_4h": ("4H箱体突破回踩", "4H底部箱体突破回踩"), "ignition_1h_current": ("静K动K转折", "静K→动K转折", "1H当前起爆点"), "ignition_4h_current": ("静K动K转折", "静K→动K转折", "4H当前起爆点"), "ignition_d1_current": ("静K动K转折", "静K→动K转折", "日线当前起爆点"), diff --git a/app/core/signal_taxonomy.py b/app/core/signal_taxonomy.py index 9434229..9421188 100644 --- a/app/core/signal_taxonomy.py +++ b/app/core/signal_taxonomy.py @@ -22,6 +22,7 @@ SIGNAL_CODE_LABELS = { "static_accum_4h": "4H静K蓄力", "higher_lows_4h": "4H底部抬高", "compression_surge_4h": "4H压缩放量", + "box_breakout_pullback_4h": "4H箱体突破回踩", "ignition_1h_current": "1H当前起爆点", "ignition_4h_current": "4H当前起爆点", "ignition_d1_current": "日线当前起爆点", @@ -63,6 +64,7 @@ _PATTERNS = [ ("short_tf_5m_ignition", ("5min极早期启动", "5m极早期启动", "5min 早期启动")), ("volume_divergence_1h", ("量价背离", "放量但无量价齐飞")), ("static_accum_4h", ("静K蓄力", "静K旁路")), + ("box_breakout_pullback_4h", ("4H", "箱体", "突破", "回踩")), ("higher_lows_4h", ("底部抬高",)), ("compression_surge_4h", ("压缩放量",)), ("ignition_stale", ("历史起爆点", "起爆点已过期", "旧起爆")), diff --git a/app/services/altcoin_confirm.py b/app/services/altcoin_confirm.py index 4930901..e99f200 100644 --- a/app/services/altcoin_confirm.py +++ b/app/services/altcoin_confirm.py @@ -182,7 +182,17 @@ def _is_candidate_fresh(cand, event_times, max_hours=6): -def _build_trigger_context(fresh_reason, fresh_events, vp_data=None, stale_vp_count=0, stale_1h_ignitions=None, stale_d1_ignitions=None, bp_daily=None, entry_action=""): +def _build_trigger_context( + fresh_reason, + fresh_events, + vp_data=None, + stale_vp_count=0, + stale_1h_ignitions=None, + stale_d1_ignitions=None, + bp_daily=None, + bp_4h=None, + entry_action="", +): """生成用户可审计的触发上下文:区分当前触发、历史背景、消息触发。""" fresh_events = fresh_events or [] stale_1h_ignitions = stale_1h_ignitions or [] @@ -206,6 +216,20 @@ def _build_trigger_context(fresh_reason, fresh_events, vp_data=None, stale_vp_co fresh_event_bucket.append({"type": "technical", "label": fresh_event_label, "source": "pa_engine", **e}) if (bp_daily or {}).get("detected"): stale.append({"type": "technical_background", "label": "日线底部突破回踩背景", "source": "daily_structure"}) + if (bp_4h or {}).get("detected"): + age = bp_4h.get("pullback_age_bars") + item = { + "type": "technical", + "label": "4H箱体突破回踩", + "source": "box_breakout_pullback_4h", + "entry_zone": bp_4h.get("entry_zone"), + "pullback_kind": bp_4h.get("pullback_kind"), + "age_bars": age, + } + if age is not None and int(age) <= 1: + current.append(item) + else: + stale.append({**item, "type": "technical_background"}) if fresh_reason == "stale_structure_background_only": status = "stale_background_only" label = "历史结构背景,缺少当前K线触发" @@ -846,6 +870,137 @@ def detect_breakout_pullback(df, timeframe="1d"): return result +def detect_box_breakout_pullback_4h(df, lookback=24, max_wait_bars=8): + """4H底部箱体突破回踩检测。 + + 模式:底部箱体横盘 -> 放量突破箱体上沿 -> 回踩箱体上沿/EMA不破。 + 这类形态比单根K线因子更像完整交易剧本,所以单独输出可复盘证据。 + """ + result = { + "detected": False, + "score": 0, + "signals": [], + "entry_zone": None, + "stop_level": None, + "quality": "", + "pullback_kind": "", + "pullback_age_bars": None, + } + if df is None or len(df) < int(lookback) + int(max_wait_bars) + 8: + return result + + work = df.copy() + for col in ("open", "high", "low", "close", "volume"): + work[col] = pd.to_numeric(work[col], errors="coerce") + work = work.dropna(subset=["open", "high", "low", "close", "volume"]).reset_index(drop=True) + if len(work) < int(lookback) + int(max_wait_bars) + 8: + return result + + work["ema5"] = work["close"].ewm(span=5, adjust=False).mean() + work["ema25"] = work["close"].ewm(span=25, adjust=False).mean() + + # 只回看最近一段,避免很久以前的箱体形态反复污染当前确认。 + start = max(int(lookback), len(work) - 36) + end = len(work) - 1 + best = None + for i in range(start, end): + base = work.iloc[i - int(lookback):i] + if len(base) < int(lookback): + continue + box_high = float(base["high"].quantile(0.92)) + box_low = float(base["low"].quantile(0.08)) + if box_high <= 0 or box_low <= 0: + continue + box_width_pct = (box_high - box_low) / box_low * 100 + if box_width_pct <= 3 or box_width_pct > 45: + continue + + row = work.iloc[i] + vol_median = float(base["volume"].median() or 0) + breakout_vol_ratio = float(row["volume"]) / vol_median if vol_median > 0 else 1.0 + broke_out = ( + float(row["close"]) > box_high * 1.006 + and float(row["close"]) > float(row["open"]) + and breakout_vol_ratio >= 1.15 + ) + if not broke_out: + continue + + for j in range(i + 1, min(len(work), i + int(max_wait_bars) + 1)): + pb = work.iloc[j] + low = float(pb["low"]) + close = float(pb["close"]) + ema5 = float(pb["ema5"]) + ema25 = float(pb["ema25"]) + touch_box = low <= box_high * 1.035 and close >= box_high * 0.985 + touch_ema5 = low <= ema5 * 1.012 and close >= ema5 * 0.985 + touch_ema25 = low <= ema25 * 1.012 and close >= ema25 * 0.985 + failed = close < box_high * 0.975 or low < box_low * 0.985 + if failed: + break + if not (touch_box or touch_ema5 or touch_ema25): + continue + + score = 5 + signals = [] + if box_width_pct <= 20: + score += 2 + elif box_width_pct <= 32: + score += 1 + if breakout_vol_ratio >= 2: + score += 2 + elif breakout_vol_ratio >= 1.5: + score += 1 + if touch_box and (touch_ema5 or touch_ema25): + score += 2 + elif touch_box or touch_ema5 or touch_ema25: + score += 1 + age_bars = len(work) - 1 - j + if age_bars <= 1: + score += 1 + + pullback_parts = [] + if touch_box: + pullback_parts.append("箱体上沿") + if touch_ema5: + pullback_parts.append("EMA5") + if touch_ema25: + pullback_parts.append("EMA25") + pullback_kind = "+".join(pullback_parts) or "回踩承接" + signals.append( + "4H箱体突破回踩({} ${:.6g}, 量{:.1f}x)".format( + pullback_kind, + box_high, + breakout_vol_ratio, + ) + ) + signals.append("4H底部箱体宽度{:.1f}%".format(box_width_pct)) + quality = "优质" if score >= 10 else "良好" if score >= 7 else "可观察" if score >= 5 else "弱" + candidate = { + "detected": True, + "score": score, + "signals": signals, + "entry_zone": round(box_high, 8), + "stop_level": round(min(float(pb["low"]), box_low) * 0.97, 8), + "quality": quality, + "pullback_kind": pullback_kind, + "pullback_age_bars": int(age_bars), + "box_high": round(box_high, 8), + "box_low": round(box_low, 8), + "box_width_pct": round(box_width_pct, 3), + "breakout_vol_ratio": round(breakout_vol_ratio, 3), + "breakout_index": int(i), + "pullback_index": int(j), + } + if best is None or (candidate["pullback_index"], candidate["score"]) > (best["pullback_index"], best["score"]): + best = candidate + break + + if best: + result.update(best) + return result + + def confirm_burst(symbol, cand): """对单个候选做爆发确认(v1.7.0:强共振旁路+量价齐飞双门控) cand: coin_state行数据,含leader_status/detail_json等 @@ -963,6 +1118,26 @@ def confirm_burst(symbol, cand): if high_q_supply: resistance = high_q_supply[0]["top"] + # ---- 4H箱体突破回踩:完整结构模型,优先作为“可执行形态”记录和复盘 ---- + bp_4h = {"detected": False} + try: + if h4_df is not None and len(h4_df) >= 40: + bp_4h = detect_box_breakout_pullback_4h(h4_df) + except Exception: + bp_4h = {"detected": False} + if bp_4h.get("detected"): + signals.extend(bp_4h.get("signals", [])) + score += factor_scorer.add_existing( + "box_breakout_pullback_4h", + bp_4h.get("score", 0), + evidence="4H底部箱体突破后回踩箱体上沿/均线", + value=bp_4h, + cap=10, + ) + t = _event_time_from_age(h4_df, bp_4h.get("pullback_age_bars")) + if t and int(bp_4h.get("pullback_age_bars") or 999) <= 1: + current_trigger_times.append(t) + # ---- v1.7.7: 日线 PA 全分析(供需区 + 起爆点 + 动K,高权重)---- # 日线是最大的时间框架,信号强度远高于小时级 pa_1d = {} @@ -1181,7 +1356,7 @@ def confirm_burst(symbol, cand): current_trigger_ok = bool(current_trigger_times) recent_candidate_ok = (fresh_reason == "fresh_candidate_state") if score >= structure_gate_score and entry_action in ("即刻买入", "可即刻买入") and (current_trigger_ok or recent_candidate_ok): - if fresh_reason != "stale_structure_background_only" and (stale_vp_count > 0 or stale_1h_ignitions or stale_d1_ignitions or bp_daily.get("detected")): + if fresh_reason != "stale_structure_background_only" and (stale_vp_count > 0 or stale_1h_ignitions or stale_d1_ignitions or bp_daily.get("detected") or bp_4h.get("detected")): signals.append(f"🟡 历史强背景+当前结构确认(score≥{structure_gate_score})") confirmed = True @@ -1227,6 +1402,9 @@ def confirm_burst(symbol, cand): # 日线筑底 if bp_daily.get("detected"): aux_count += 1 + # 4H箱体突破回踩:更接近交易级别的结构形态 + if bp_4h.get("detected") and int(bp_4h.get("pullback_age_bars") or 999) <= 3: + aux_count += 1 # 舆情共振:screener给了sentiment_bonus sentiment_bonus = cand_detail.get("sentiment_bonus") if sentiment_bonus is not None and sentiment_bonus > 0: @@ -1484,9 +1662,11 @@ def confirm_burst(symbol, cand): stale_1h_ignitions=stale_1h_ignitions if 'stale_1h_ignitions' in locals() else [], stale_d1_ignitions=stale_d1_ignitions if 'stale_d1_ignitions' in locals() else [], bp_daily=bp_daily if 'bp_daily' in locals() else {}, + bp_4h=bp_4h if 'bp_4h' in locals() else {}, entry_action=entry_action, ) market_context["trigger_context"] = trigger_context + market_context["box_breakout_pullback_4h"] = bp_4h if 'bp_4h' in locals() else {} market_context["factor_score_breakdown"] = factor_score_breakdown market_context["onchain_context"] = onchain_context market_context["market_regime"] = market_regime @@ -1532,6 +1712,7 @@ def confirm_burst(symbol, cand): "pa_1h": pa_1h, "pa_15min": pa_15min_result, "pa_1d": pa_1d, + "box_breakout_pullback_4h": bp_4h if 'bp_4h' in locals() else {}, "m30_aligned": m30_aligned, "entry_action": (entry_plan or {}).get("entry_action") or entry_action, "market_context": market_context, diff --git a/app/services/onchain_monitor.py b/app/services/onchain_monitor.py index 3fcffb4..5ac3437 100644 --- a/app/services/onchain_monitor.py +++ b/app/services/onchain_monitor.py @@ -921,67 +921,67 @@ def enqueue_onchain_candidates(min_score=None, min_confidence=None, cooldown_hou cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat() for row in rows: event = dict(row) - symbol = normalize_symbol(event.get("symbol")) - if not symbol or not _tradable_symbol(symbol): - skipped_ids.append(event["id"]) - continue - score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence"))) - if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0): - continue - recent = conn.execute( - """ - SELECT id FROM event_news - WHERE source='onchain' AND symbol=%s AND detected_at >= %s - LIMIT 1 - """, - (symbol, cooldown_cutoff), - ).fetchone() - if recent: - skipped_ids.append(event["id"]) - continue - title = _candidate_title(event) - h = event_hash("onchain", title, symbol) try: - # A single bad/duplicate candidate must not poison the whole - # PostgreSQL transaction; nested transaction becomes SAVEPOINT. - with conn.transaction(): - inserted = conn.execute( - """ - INSERT INTO event_news - (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) - VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0) - ON CONFLICT(event_hash) DO NOTHING - RETURNING id - """, - ( - h, - symbol, - title, - event.get("url") or "", - event.get("detected_at") or now, - now, - event.get("severity") or "A", - json.dumps( - { - "onchain_event_id": event.get("id"), - "chain": event.get("chain"), - "signal_code": event.get("signal_code"), - "signal_label": event.get("signal_label"), - "confidence": event.get("confidence"), - "value_usd": event.get("value_usd"), - "onchain_score": event.get("latest_onchain_score"), - "risk_score": event.get("latest_risk_score"), - }, - ensure_ascii=False, - ), + symbol = normalize_symbol(event.get("symbol")) + if not symbol or not _tradable_symbol(symbol): + skipped_ids.append(event["id"]) + continue + score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence"))) + if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0): + continue + recent = conn.execute( + """ + SELECT id FROM event_news + WHERE source='onchain' AND symbol=%s AND detected_at >= %s + LIMIT 1 + """, + (symbol, cooldown_cutoff), + ).fetchone() + if recent: + skipped_ids.append(event["id"]) + continue + title = _candidate_title(event) + h = event_hash("onchain", title, symbol) + inserted = conn.execute( + """ + INSERT INTO event_news + (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) + VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0) + ON CONFLICT(event_hash) DO NOTHING + RETURNING id + """, + ( + h, + symbol, + title, + event.get("url") or "", + event.get("detected_at") or now, + now, + event.get("severity") or "A", + json.dumps( + { + "onchain_event_id": event.get("id"), + "chain": event.get("chain"), + "signal_code": event.get("signal_code"), + "signal_label": event.get("signal_label"), + "confidence": event.get("confidence"), + "value_usd": event.get("value_usd"), + "onchain_score": event.get("latest_onchain_score"), + "risk_score": event.get("latest_risk_score"), + }, + ensure_ascii=False, ), - ).fetchone() - if inserted: - conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),)) - queued.append(symbol) - else: - skipped_ids.append(event["id"]) + ), + ).fetchone() + if inserted: + conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),)) + queued.append(symbol) + else: + skipped_ids.append(event["id"]) + conn.commit() except Exception as exc: + conn.rollback() + symbol = normalize_symbol(event.get("symbol")) errors.append(f"{symbol}:candidate_enqueue:{str(exc)[:160]}") skipped_ids.append(event["id"]) if skipped_ids: diff --git a/tests/test_box_breakout_pullback_4h.py b/tests/test_box_breakout_pullback_4h.py new file mode 100644 index 0000000..b5cff6b --- /dev/null +++ b/tests/test_box_breakout_pullback_4h.py @@ -0,0 +1,58 @@ +from datetime import datetime, timedelta + +import pandas as pd + +from app.core.signal_taxonomy import signal_code +from app.services.altcoin_confirm import detect_box_breakout_pullback_4h + + +def _bar(ts, open_, high, low, close, volume): + return { + "timestamp": ts, + "open": open_, + "high": high, + "low": low, + "close": close, + "volume": volume, + } + + +def _box_breakout_rows(fail=False): + start = datetime(2026, 5, 20) + rows = [] + for i in range(32): + base = 1.03 + (i % 5) * 0.006 + rows.append(_bar(start + timedelta(hours=4 * i), base, 1.095 + (i % 3) * 0.002, 0.985 - (i % 2) * 0.002, base + 0.004, 1000 + (i % 4) * 30)) + + rows.append(_bar(start + timedelta(hours=4 * 32), 1.085, 1.16, 1.08, 1.135, 2600)) + if fail: + rows.append(_bar(start + timedelta(hours=4 * 33), 1.13, 1.135, 1.00, 1.055, 1800)) + else: + rows.append(_bar(start + timedelta(hours=4 * 33), 1.13, 1.145, 1.092, 1.118, 1600)) + + for i in range(12): + close = (1.04 + i * 0.002) if fail else (1.12 + i * 0.008) + rows.append(_bar(start + timedelta(hours=4 * (34 + i)), close - 0.01, close + 0.018, close - 0.025, close, 1300 + i * 40)) + return rows + + +def test_detect_box_breakout_pullback_4h_finds_box_top_retest(): + df = pd.DataFrame(_box_breakout_rows()) + + result = detect_box_breakout_pullback_4h(df) + + assert result["detected"] is True + assert result["score"] >= 7 + assert result["quality"] in {"良好", "优质"} + assert result["entry_zone"] > 1.08 + assert result["stop_level"] < result["entry_zone"] + assert "箱体" in result["signals"][0] + assert signal_code(result["signals"][0]) == "box_breakout_pullback_4h" + + +def test_detect_box_breakout_pullback_4h_rejects_failed_retest(): + df = pd.DataFrame(_box_breakout_rows(fail=True)) + + result = detect_box_breakout_pullback_4h(df) + + assert result["detected"] is False