This commit is contained in:
aaron 2026-05-26 23:33:06 +08:00
parent cca36615c1
commit d227ab126c
5 changed files with 304 additions and 60 deletions

View File

@ -22,6 +22,7 @@ DEFAULT_FACTOR_WEIGHTS = {
"static_accum_4h": 5.0,
"higher_lows_4h": 2.0,
"compression_surge_4h": 2.0,
"box_breakout_pullback_4h": 8.0,
"ignition_1h_current": 4.0,
"ignition_4h_current": 3.0,
"ignition_d1_current": 6.0,
@ -59,6 +60,7 @@ FACTOR_GROUPS = {
"static_accum_4h": "structure",
"higher_lows_4h": "structure",
"compression_surge_4h": "structure",
"box_breakout_pullback_4h": "structure",
"ignition_1h_current": "momentum",
"ignition_4h_current": "momentum",
"ignition_d1_current": "momentum",
@ -104,6 +106,7 @@ WEIGHT_ALIASES = {
"volume_consecutive_1h": ("连续3x放量", "连续3x放量(≥3根)", "1H连续放量"),
"volume_divergence_1h": ("量价背离", "1H量价背离"),
"static_accum_4h": ("静K蓄力", "4H静K蓄力"),
"box_breakout_pullback_4h": ("4H箱体突破回踩", "4H底部箱体突破回踩"),
"ignition_1h_current": ("静K动K转折", "静K→动K转折", "1H当前起爆点"),
"ignition_4h_current": ("静K动K转折", "静K→动K转折", "4H当前起爆点"),
"ignition_d1_current": ("静K动K转折", "静K→动K转折", "日线当前起爆点"),

View File

@ -22,6 +22,7 @@ SIGNAL_CODE_LABELS = {
"static_accum_4h": "4H静K蓄力",
"higher_lows_4h": "4H底部抬高",
"compression_surge_4h": "4H压缩放量",
"box_breakout_pullback_4h": "4H箱体突破回踩",
"ignition_1h_current": "1H当前起爆点",
"ignition_4h_current": "4H当前起爆点",
"ignition_d1_current": "日线当前起爆点",
@ -63,6 +64,7 @@ _PATTERNS = [
("short_tf_5m_ignition", ("5min极早期启动", "5m极早期启动", "5min 早期启动")),
("volume_divergence_1h", ("量价背离", "放量但无量价齐飞")),
("static_accum_4h", ("静K蓄力", "静K旁路")),
("box_breakout_pullback_4h", ("4H", "箱体", "突破", "回踩")),
("higher_lows_4h", ("底部抬高",)),
("compression_surge_4h", ("压缩放量",)),
("ignition_stale", ("历史起爆点", "起爆点已过期", "旧起爆")),

View File

@ -182,7 +182,17 @@ def _is_candidate_fresh(cand, event_times, max_hours=6):
def _build_trigger_context(fresh_reason, fresh_events, vp_data=None, stale_vp_count=0, stale_1h_ignitions=None, stale_d1_ignitions=None, bp_daily=None, entry_action=""):
def _build_trigger_context(
fresh_reason,
fresh_events,
vp_data=None,
stale_vp_count=0,
stale_1h_ignitions=None,
stale_d1_ignitions=None,
bp_daily=None,
bp_4h=None,
entry_action="",
):
"""生成用户可审计的触发上下文:区分当前触发、历史背景、消息触发。"""
fresh_events = fresh_events or []
stale_1h_ignitions = stale_1h_ignitions or []
@ -206,6 +216,20 @@ def _build_trigger_context(fresh_reason, fresh_events, vp_data=None, stale_vp_co
fresh_event_bucket.append({"type": "technical", "label": fresh_event_label, "source": "pa_engine", **e})
if (bp_daily or {}).get("detected"):
stale.append({"type": "technical_background", "label": "日线底部突破回踩背景", "source": "daily_structure"})
if (bp_4h or {}).get("detected"):
age = bp_4h.get("pullback_age_bars")
item = {
"type": "technical",
"label": "4H箱体突破回踩",
"source": "box_breakout_pullback_4h",
"entry_zone": bp_4h.get("entry_zone"),
"pullback_kind": bp_4h.get("pullback_kind"),
"age_bars": age,
}
if age is not None and int(age) <= 1:
current.append(item)
else:
stale.append({**item, "type": "technical_background"})
if fresh_reason == "stale_structure_background_only":
status = "stale_background_only"
label = "历史结构背景缺少当前K线触发"
@ -846,6 +870,137 @@ def detect_breakout_pullback(df, timeframe="1d"):
return result
def detect_box_breakout_pullback_4h(df, lookback=24, max_wait_bars=8):
"""4H底部箱体突破回踩检测。
模式底部箱体横盘 -> 放量突破箱体上沿 -> 回踩箱体上沿/EMA不破
这类形态比单根K线因子更像完整交易剧本所以单独输出可复盘证据
"""
result = {
"detected": False,
"score": 0,
"signals": [],
"entry_zone": None,
"stop_level": None,
"quality": "",
"pullback_kind": "",
"pullback_age_bars": None,
}
if df is None or len(df) < int(lookback) + int(max_wait_bars) + 8:
return result
work = df.copy()
for col in ("open", "high", "low", "close", "volume"):
work[col] = pd.to_numeric(work[col], errors="coerce")
work = work.dropna(subset=["open", "high", "low", "close", "volume"]).reset_index(drop=True)
if len(work) < int(lookback) + int(max_wait_bars) + 8:
return result
work["ema5"] = work["close"].ewm(span=5, adjust=False).mean()
work["ema25"] = work["close"].ewm(span=25, adjust=False).mean()
# 只回看最近一段,避免很久以前的箱体形态反复污染当前确认。
start = max(int(lookback), len(work) - 36)
end = len(work) - 1
best = None
for i in range(start, end):
base = work.iloc[i - int(lookback):i]
if len(base) < int(lookback):
continue
box_high = float(base["high"].quantile(0.92))
box_low = float(base["low"].quantile(0.08))
if box_high <= 0 or box_low <= 0:
continue
box_width_pct = (box_high - box_low) / box_low * 100
if box_width_pct <= 3 or box_width_pct > 45:
continue
row = work.iloc[i]
vol_median = float(base["volume"].median() or 0)
breakout_vol_ratio = float(row["volume"]) / vol_median if vol_median > 0 else 1.0
broke_out = (
float(row["close"]) > box_high * 1.006
and float(row["close"]) > float(row["open"])
and breakout_vol_ratio >= 1.15
)
if not broke_out:
continue
for j in range(i + 1, min(len(work), i + int(max_wait_bars) + 1)):
pb = work.iloc[j]
low = float(pb["low"])
close = float(pb["close"])
ema5 = float(pb["ema5"])
ema25 = float(pb["ema25"])
touch_box = low <= box_high * 1.035 and close >= box_high * 0.985
touch_ema5 = low <= ema5 * 1.012 and close >= ema5 * 0.985
touch_ema25 = low <= ema25 * 1.012 and close >= ema25 * 0.985
failed = close < box_high * 0.975 or low < box_low * 0.985
if failed:
break
if not (touch_box or touch_ema5 or touch_ema25):
continue
score = 5
signals = []
if box_width_pct <= 20:
score += 2
elif box_width_pct <= 32:
score += 1
if breakout_vol_ratio >= 2:
score += 2
elif breakout_vol_ratio >= 1.5:
score += 1
if touch_box and (touch_ema5 or touch_ema25):
score += 2
elif touch_box or touch_ema5 or touch_ema25:
score += 1
age_bars = len(work) - 1 - j
if age_bars <= 1:
score += 1
pullback_parts = []
if touch_box:
pullback_parts.append("箱体上沿")
if touch_ema5:
pullback_parts.append("EMA5")
if touch_ema25:
pullback_parts.append("EMA25")
pullback_kind = "+".join(pullback_parts) or "回踩承接"
signals.append(
"4H箱体突破回踩({} ${:.6g}, 量{:.1f}x)".format(
pullback_kind,
box_high,
breakout_vol_ratio,
)
)
signals.append("4H底部箱体宽度{:.1f}%".format(box_width_pct))
quality = "优质" if score >= 10 else "良好" if score >= 7 else "可观察" if score >= 5 else ""
candidate = {
"detected": True,
"score": score,
"signals": signals,
"entry_zone": round(box_high, 8),
"stop_level": round(min(float(pb["low"]), box_low) * 0.97, 8),
"quality": quality,
"pullback_kind": pullback_kind,
"pullback_age_bars": int(age_bars),
"box_high": round(box_high, 8),
"box_low": round(box_low, 8),
"box_width_pct": round(box_width_pct, 3),
"breakout_vol_ratio": round(breakout_vol_ratio, 3),
"breakout_index": int(i),
"pullback_index": int(j),
}
if best is None or (candidate["pullback_index"], candidate["score"]) > (best["pullback_index"], best["score"]):
best = candidate
break
if best:
result.update(best)
return result
def confirm_burst(symbol, cand):
"""对单个候选做爆发确认v1.7.0:强共振旁路+量价齐飞双门控)
cand: coin_state行数据含leader_status/detail_json等
@ -963,6 +1118,26 @@ def confirm_burst(symbol, cand):
if high_q_supply:
resistance = high_q_supply[0]["top"]
# ---- 4H箱体突破回踩完整结构模型优先作为“可执行形态”记录和复盘 ----
bp_4h = {"detected": False}
try:
if h4_df is not None and len(h4_df) >= 40:
bp_4h = detect_box_breakout_pullback_4h(h4_df)
except Exception:
bp_4h = {"detected": False}
if bp_4h.get("detected"):
signals.extend(bp_4h.get("signals", []))
score += factor_scorer.add_existing(
"box_breakout_pullback_4h",
bp_4h.get("score", 0),
evidence="4H底部箱体突破后回踩箱体上沿/均线",
value=bp_4h,
cap=10,
)
t = _event_time_from_age(h4_df, bp_4h.get("pullback_age_bars"))
if t and int(bp_4h.get("pullback_age_bars") or 999) <= 1:
current_trigger_times.append(t)
# ---- v1.7.7: 日线 PA 全分析(供需区 + 起爆点 + 动K高权重----
# 日线是最大的时间框架,信号强度远高于小时级
pa_1d = {}
@ -1181,7 +1356,7 @@ def confirm_burst(symbol, cand):
current_trigger_ok = bool(current_trigger_times)
recent_candidate_ok = (fresh_reason == "fresh_candidate_state")
if score >= structure_gate_score and entry_action in ("即刻买入", "可即刻买入") and (current_trigger_ok or recent_candidate_ok):
if fresh_reason != "stale_structure_background_only" and (stale_vp_count > 0 or stale_1h_ignitions or stale_d1_ignitions or bp_daily.get("detected")):
if fresh_reason != "stale_structure_background_only" and (stale_vp_count > 0 or stale_1h_ignitions or stale_d1_ignitions or bp_daily.get("detected") or bp_4h.get("detected")):
signals.append(f"🟡 历史强背景+当前结构确认(score≥{structure_gate_score})")
confirmed = True
@ -1227,6 +1402,9 @@ def confirm_burst(symbol, cand):
# 日线筑底
if bp_daily.get("detected"):
aux_count += 1
# 4H箱体突破回踩更接近交易级别的结构形态
if bp_4h.get("detected") and int(bp_4h.get("pullback_age_bars") or 999) <= 3:
aux_count += 1
# 舆情共振screener给了sentiment_bonus
sentiment_bonus = cand_detail.get("sentiment_bonus")
if sentiment_bonus is not None and sentiment_bonus > 0:
@ -1484,9 +1662,11 @@ def confirm_burst(symbol, cand):
stale_1h_ignitions=stale_1h_ignitions if 'stale_1h_ignitions' in locals() else [],
stale_d1_ignitions=stale_d1_ignitions if 'stale_d1_ignitions' in locals() else [],
bp_daily=bp_daily if 'bp_daily' in locals() else {},
bp_4h=bp_4h if 'bp_4h' in locals() else {},
entry_action=entry_action,
)
market_context["trigger_context"] = trigger_context
market_context["box_breakout_pullback_4h"] = bp_4h if 'bp_4h' in locals() else {}
market_context["factor_score_breakdown"] = factor_score_breakdown
market_context["onchain_context"] = onchain_context
market_context["market_regime"] = market_regime
@ -1532,6 +1712,7 @@ def confirm_burst(symbol, cand):
"pa_1h": pa_1h,
"pa_15min": pa_15min_result,
"pa_1d": pa_1d,
"box_breakout_pullback_4h": bp_4h if 'bp_4h' in locals() else {},
"m30_aligned": m30_aligned,
"entry_action": (entry_plan or {}).get("entry_action") or entry_action,
"market_context": market_context,

View File

@ -921,67 +921,67 @@ def enqueue_onchain_candidates(min_score=None, min_confidence=None, cooldown_hou
cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat()
for row in rows:
event = dict(row)
symbol = normalize_symbol(event.get("symbol"))
if not symbol or not _tradable_symbol(symbol):
skipped_ids.append(event["id"])
continue
score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence")))
if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0):
continue
recent = conn.execute(
"""
SELECT id FROM event_news
WHERE source='onchain' AND symbol=%s AND detected_at >= %s
LIMIT 1
""",
(symbol, cooldown_cutoff),
).fetchone()
if recent:
skipped_ids.append(event["id"])
continue
title = _candidate_title(event)
h = event_hash("onchain", title, symbol)
try:
# A single bad/duplicate candidate must not poison the whole
# PostgreSQL transaction; nested transaction becomes SAVEPOINT.
with conn.transaction():
inserted = conn.execute(
"""
INSERT INTO event_news
(event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed)
VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0)
ON CONFLICT(event_hash) DO NOTHING
RETURNING id
""",
(
h,
symbol,
title,
event.get("url") or "",
event.get("detected_at") or now,
now,
event.get("severity") or "A",
json.dumps(
{
"onchain_event_id": event.get("id"),
"chain": event.get("chain"),
"signal_code": event.get("signal_code"),
"signal_label": event.get("signal_label"),
"confidence": event.get("confidence"),
"value_usd": event.get("value_usd"),
"onchain_score": event.get("latest_onchain_score"),
"risk_score": event.get("latest_risk_score"),
},
ensure_ascii=False,
),
symbol = normalize_symbol(event.get("symbol"))
if not symbol or not _tradable_symbol(symbol):
skipped_ids.append(event["id"])
continue
score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence")))
if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0):
continue
recent = conn.execute(
"""
SELECT id FROM event_news
WHERE source='onchain' AND symbol=%s AND detected_at >= %s
LIMIT 1
""",
(symbol, cooldown_cutoff),
).fetchone()
if recent:
skipped_ids.append(event["id"])
continue
title = _candidate_title(event)
h = event_hash("onchain", title, symbol)
inserted = conn.execute(
"""
INSERT INTO event_news
(event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed)
VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0)
ON CONFLICT(event_hash) DO NOTHING
RETURNING id
""",
(
h,
symbol,
title,
event.get("url") or "",
event.get("detected_at") or now,
now,
event.get("severity") or "A",
json.dumps(
{
"onchain_event_id": event.get("id"),
"chain": event.get("chain"),
"signal_code": event.get("signal_code"),
"signal_label": event.get("signal_label"),
"confidence": event.get("confidence"),
"value_usd": event.get("value_usd"),
"onchain_score": event.get("latest_onchain_score"),
"risk_score": event.get("latest_risk_score"),
},
ensure_ascii=False,
),
).fetchone()
if inserted:
conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),))
queued.append(symbol)
else:
skipped_ids.append(event["id"])
),
).fetchone()
if inserted:
conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),))
queued.append(symbol)
else:
skipped_ids.append(event["id"])
conn.commit()
except Exception as exc:
conn.rollback()
symbol = normalize_symbol(event.get("symbol"))
errors.append(f"{symbol}:candidate_enqueue:{str(exc)[:160]}")
skipped_ids.append(event["id"])
if skipped_ids:

View File

@ -0,0 +1,58 @@
from datetime import datetime, timedelta
import pandas as pd
from app.core.signal_taxonomy import signal_code
from app.services.altcoin_confirm import detect_box_breakout_pullback_4h
def _bar(ts, open_, high, low, close, volume):
return {
"timestamp": ts,
"open": open_,
"high": high,
"low": low,
"close": close,
"volume": volume,
}
def _box_breakout_rows(fail=False):
start = datetime(2026, 5, 20)
rows = []
for i in range(32):
base = 1.03 + (i % 5) * 0.006
rows.append(_bar(start + timedelta(hours=4 * i), base, 1.095 + (i % 3) * 0.002, 0.985 - (i % 2) * 0.002, base + 0.004, 1000 + (i % 4) * 30))
rows.append(_bar(start + timedelta(hours=4 * 32), 1.085, 1.16, 1.08, 1.135, 2600))
if fail:
rows.append(_bar(start + timedelta(hours=4 * 33), 1.13, 1.135, 1.00, 1.055, 1800))
else:
rows.append(_bar(start + timedelta(hours=4 * 33), 1.13, 1.145, 1.092, 1.118, 1600))
for i in range(12):
close = (1.04 + i * 0.002) if fail else (1.12 + i * 0.008)
rows.append(_bar(start + timedelta(hours=4 * (34 + i)), close - 0.01, close + 0.018, close - 0.025, close, 1300 + i * 40))
return rows
def test_detect_box_breakout_pullback_4h_finds_box_top_retest():
df = pd.DataFrame(_box_breakout_rows())
result = detect_box_breakout_pullback_4h(df)
assert result["detected"] is True
assert result["score"] >= 7
assert result["quality"] in {"良好", "优质"}
assert result["entry_zone"] > 1.08
assert result["stop_level"] < result["entry_zone"]
assert "箱体" in result["signals"][0]
assert signal_code(result["signals"][0]) == "box_breakout_pullback_4h"
def test_detect_box_breakout_pullback_4h_rejects_failed_retest():
df = pd.DataFrame(_box_breakout_rows(fail=True))
result = detect_box_breakout_pullback_4h(df)
assert result["detected"] is False