alphax/app/core/vcp_detector.py
2026-06-01 00:16:01 +08:00

368 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""VCP (Volatility Contraction Pattern) 精细化检测 — 多空双向。
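
Long VCP:
- Price pulls back from a high and prints several swing lows, each pullback
  shallower than the previous one.
- At the same time volume shrinks step by step toward very low levels.
- Finally a high-volume bar breaks above the top of the whole contraction range.
- The stop goes at the low of the last contraction, which naturally gives an
  RR of 3:1 or better.

Short VCP (topping distribution):
- Price rallies from a low and prints several swing highs, each rally weaker
  than the previous one.
- Volume declines on the rallies (buying exhaustion).
- Finally a high-volume bearish bar breaks below the bottom of the contraction
  range.
- The stop goes at the high of the last rally.

Core parameters (defaults; overridable through the "vcp" config section):
- At least 2 contractions (3 is better).
- Each contraction is at most 75% of the previous one.
- Volume around the last contraction is at most 85% of the volume around the
  first one (this only affects the signal text, it is not a hard filter).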
"""
from __future__ import annotations
from typing import Optional
import numpy as np
import pandas as pd
from app.config.config_loader import _get_section as _get_cfg_section
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _vcp_config() -> dict:
    """Assemble the VCP detector settings, using built-in defaults as fallback.

    The ``"vcp"`` section is requested through ``_get_cfg_section``. When that
    call raises, or returns a falsy value, an empty mapping is used instead so
    that every key takes its default. Each value is coerced to ``int`` or
    ``float`` before being returned.

    Returns:
        dict: detection thresholds plus the three factor weights
        (``weight_vcp_bull``, ``weight_vcp_bear``, ``weight_vcp_forming``).
    """
    try:
        section = _get_cfg_section("vcp") or {}
    except Exception:
        # Any failure while loading the section falls back to pure defaults.
        section = {}

    # (key, coercion, default) — the tuple order is the key order of the result.
    schema = (
        ("min_contractions", int, 2),
        ("max_contractions", int, 5),
        ("contraction_decay_max", float, 0.75),
        ("volume_decay_max", float, 0.85),
        ("min_first_swing_pct", float, 5.0),
        ("breakout_vol_ratio_min", float, 2.0),
        ("lookback_bars", int, 60),
        ("swing_window", int, 3),
        # Factor weights
        ("weight_vcp_bull", float, 5.0),
        ("weight_vcp_bear", float, 5.0),
        ("weight_vcp_forming", float, 3.0),
    )
    return {key: coerce(section.get(key, default)) for key, coerce, default in schema}
# ---------------------------------------------------------------------------
# Swing detection (simplified for VCP — needs clear pivots)
# ---------------------------------------------------------------------------
def _find_swing_highs(df: pd.DataFrame, window: int = 3) -> list[dict]:
    """Return the swing highs of ``df["high"]``.

    A bar qualifies when its high is greater than or equal to the high of each
    of the ``window`` bars before it and each of the ``window`` bars after it.
    Because the comparison is non-strict, equal neighbouring highs can all
    qualify. The first and last ``window`` bars are never candidates.

    Args:
        df: frame with a ``high`` column.
        window: number of bars compared on each side.

    Returns:
        List of ``{"index": positional index, "price": high}`` in bar order.
    """
    series = df["high"].values.astype(float)
    offsets = range(1, window + 1)
    return [
        {"index": pos, "price": float(series[pos])}
        for pos in range(window, len(series) - window)
        if all(
            series[pos] >= series[pos - k] and series[pos] >= series[pos + k]
            for k in offsets
        )
    ]
def _find_swing_lows(df: pd.DataFrame, window: int = 3) -> list[dict]:
    """Return the swing lows of ``df["low"]``.

    A bar qualifies when its low is less than or equal to the low of each of
    the ``window`` bars before it and each of the ``window`` bars after it.
    Because the comparison is non-strict, equal neighbouring lows can all
    qualify. The first and last ``window`` bars are never candidates.

    Args:
        df: frame with a ``low`` column.
        window: number of bars compared on each side.

    Returns:
        List of ``{"index": positional index, "price": low}`` in bar order.
    """
    series = df["low"].values.astype(float)
    offsets = range(1, window + 1)
    return [
        {"index": pos, "price": float(series[pos])}
        for pos in range(window, len(series) - window)
        if all(
            series[pos] <= series[pos - k] and series[pos] <= series[pos + k]
            for k in offsets
        )
    ]
def _avg_volume_around(df: pd.DataFrame, index: int, window: int = 3) -> float:
    """Mean of ``df["volume"]`` over the bars surrounding ``index``.

    The slice covers ``index - window`` through ``index + window`` inclusive,
    clipped to the bounds of the frame.

    Args:
        df: frame with a ``volume`` column.
        index: positional index of the centre bar.
        window: number of bars taken on each side.

    Returns:
        The pandas mean of the selected volumes as a Python float.
    """
    first = index - window
    if first < 0:
        first = 0
    stop = index + window + 1
    total = len(df)
    if stop > total:
        stop = total
    neighbourhood = df["volume"].iloc[first:stop]
    return float(neighbourhood.mean())
# ---------------------------------------------------------------------------
# VCP Detection — Bullish (做多)
# ---------------------------------------------------------------------------
def detect_vcp_bull(df: pd.DataFrame) -> dict:
    """Detect a bullish VCP: successively shallower pullbacks below a pivot high.

    Only the last ``lookback_bars`` rows are analysed. The pivot is the highest
    swing high in that window; every swing low below it is a candidate
    pullback whose depth is measured as a percentage of the pivot. Starting
    from the first pullback at least ``min_first_swing_pct`` deep, a chain is
    built from later pullbacks that are each shallower than the last accepted
    one. The chain must have at least ``min_contractions`` members and every
    depth ratio between consecutive members must be at most
    ``contraction_decay_max``.

    Args:
        df: kline frame with ``high``, ``low``, ``close`` and ``volume``
            columns. ``None`` or fewer than ``lookback_bars`` rows yields the
            empty result.

    Returns:
        dict with keys:
            detected: True on a confirmed breakout, and also while the close
                sits strictly between the last low and the pivot high.
            forming: True once a valid contraction chain exists.
            contractions: number of pullbacks in the chain.
            depths_pct: depth of each pullback in percent of the pivot high.
            volume_ratios: volume around each later pullback divided by the
                volume around the first one.
            pivot_high: resistance level used as the breakout trigger.
            last_low: low of the last pullback (stop level).
            breakout: True when the last close is above the pivot high and the
                last bar's volume is at least ``breakout_vol_ratio_min`` times
                the average volume.
            rr_estimate: estimated reward/risk, 0 when risk is not positive.
            score: configured factor weight for the matched state.
            signal: human-readable description of the matched state.
    """
    cfg = _vcp_config()
    lookback = cfg["lookback_bars"]
    window = cfg["swing_window"]
    min_count = cfg["min_contractions"]

    result = {
        "detected": False,
        "forming": False,
        "contractions": 0,
        "depths_pct": [],
        "volume_ratios": [],
        "pivot_high": 0,
        "last_low": 0,
        "breakout": False,
        "rr_estimate": 0,
        "score": 0,
        "signal": "",
    }
    if df is None or len(df) < lookback:
        return result

    recent = df.tail(lookback).reset_index(drop=True)
    highs = _find_swing_highs(recent, window)
    lows = _find_swing_lows(recent, window)
    if not highs or len(lows) < min_count:
        return result

    # Resistance: the highest swing high inside the lookback window.
    pivot_high = max(h["price"] for h in highs)
    result["pivot_high"] = round(pivot_high, 6)

    # Every swing low below the pivot becomes a candidate pullback.
    pullbacks = []
    for low in lows:
        depth = (pivot_high - low["price"]) / pivot_high * 100 if pivot_high > 0 else 0
        if depth > 0:
            pullbacks.append({
                "depth_pct": depth,
                "price": low["price"],
                "index": low["index"],
                "volume": _avg_volume_around(recent, low["index"], window),
            })
    if len(pullbacks) < min_count:
        return result
    pullbacks.sort(key=lambda p: p["index"])

    # The chain starts at the first pullback that is deep enough.
    anchor = next(
        (p for p in pullbacks if p["depth_pct"] >= cfg["min_first_swing_pct"]),
        None,
    )
    if anchor is None:
        return result

    # Extend with later pullbacks that are shallower than the last accepted one.
    chain = [anchor]
    for cand in pullbacks:
        tail = chain[-1]
        if cand["index"] > tail["index"] and cand["depth_pct"] < tail["depth_pct"]:
            chain.append(cand)
    if len(chain) < min_count:
        return result

    # Depth decay is measured step to step; volume relative to the first pullback.
    base_vol = chain[0]["volume"]
    depth_ratios = [
        cur["depth_pct"] / prev["depth_pct"] for prev, cur in zip(chain, chain[1:])
    ]
    volume_ratios = [
        (leg["volume"] / base_vol if base_vol > 0 else 1) for leg in chain[1:]
    ]
    # Only the last volume ratio is checked, and it only affects the signal text.
    vol_contracting = bool(volume_ratios) and volume_ratios[-1] <= cfg["volume_decay_max"]
    if not all(r <= cfg["contraction_decay_max"] for r in depth_ratios):
        return result

    last_low = chain[-1]["price"]
    result.update(
        forming=True,
        contractions=len(chain),
        depths_pct=[round(leg["depth_pct"], 2) for leg in chain],
        volume_ratios=[round(v, 2) for v in volume_ratios],
        last_low=round(last_low, 6),
    )

    # Compare the last bar's volume with the 20-bar mean (whole window if shorter).
    close = float(recent["close"].iloc[-1])
    last_vol = float(recent["volume"].iloc[-1])
    if len(recent) >= 20:
        avg_vol = float(recent["volume"].rolling(20).mean().iloc[-1])
    else:
        avg_vol = float(recent["volume"].mean())
    vol_multiple = last_vol / avg_vol if avg_vol > 0 else 0

    # Depth of the first pullback in price units, used as the projected move.
    first_leg_move = chain[0]["depth_pct"] / 100 * pivot_high
    risk = close - last_low

    if close > pivot_high and vol_multiple >= cfg["breakout_vol_ratio_min"]:
        reward = first_leg_move
        rr = round(reward / risk, 2) if risk > 0 else 0
        result["breakout"] = True
        result["detected"] = True
        result["rr_estimate"] = rr
        result["score"] = cfg["weight_vcp_bull"]
        result["signal"] = f"VCP突破({len(chain)}次收缩, 量{vol_multiple:.1f}x, RR≈{rr})"
    elif last_low < close < pivot_high:
        # Not broken out yet, but still reported as a watch signal.
        reward = pivot_high - close + first_leg_move * 0.5
        rr = round(reward / risk, 2) if risk > 0 else 0
        result["detected"] = True
        result["rr_estimate"] = rr
        result["score"] = cfg["weight_vcp_forming"]
        result["signal"] = f"VCP蓄力中({len(chain)}次收缩{'量缩' if vol_contracting else ''}, 待突破${pivot_high:.4f})"
    return result
# ---------------------------------------------------------------------------
# VCP Detection — Bearish / Distribution (做空)
# ---------------------------------------------------------------------------
def detect_vcp_bear(df: pd.DataFrame) -> dict:
    """Detect a bearish VCP (distribution): successively weaker bounces above a pivot low.

    Only the last ``lookback_bars`` rows are analysed. The pivot is the lowest
    swing low in that window; every swing high above it is a candidate bounce
    whose height is measured as a percentage of the pivot. Starting from the
    first bounce at least ``min_first_swing_pct`` high, a chain is built from
    later bounces that are each smaller than the last accepted one. The chain
    must have at least ``min_contractions`` members and every height ratio
    between consecutive members must be at most ``contraction_decay_max``.

    Args:
        df: kline frame with ``high``, ``low``, ``close`` and ``volume``
            columns. ``None`` or fewer than ``lookback_bars`` rows yields the
            empty result.

    Returns:
        dict with keys:
            detected: True on a confirmed breakdown, and also while the close
                sits strictly between the pivot low and the last high.
            forming: True once a valid chain of bounces exists.
            contractions: number of bounces in the chain.
            depths_pct: height of each bounce in percent of the pivot low.
            volume_ratios: volume around each later bounce divided by the
                volume around the first one.
            pivot_low: support level used as the breakdown trigger.
            last_high: high of the last bounce (stop level).
            breakdown: True when the last close is below the pivot low and the
                last bar's volume is at least ``breakout_vol_ratio_min`` times
                the average volume.
            rr_estimate: estimated reward/risk, 0 when risk is not positive.
            score: configured factor weight for the matched state.
            signal: human-readable description of the matched state.
    """
    cfg = _vcp_config()
    lookback = cfg["lookback_bars"]
    window = cfg["swing_window"]
    min_count = cfg["min_contractions"]

    result = {
        "detected": False,
        "forming": False,
        "contractions": 0,
        "depths_pct": [],
        "volume_ratios": [],
        "pivot_low": 0,
        "last_high": 0,
        "breakdown": False,
        "rr_estimate": 0,
        "score": 0,
        "signal": "",
    }
    if df is None or len(df) < lookback:
        return result

    recent = df.tail(lookback).reset_index(drop=True)
    highs = _find_swing_highs(recent, window)
    lows = _find_swing_lows(recent, window)
    if not lows or len(highs) < min_count:
        return result

    # Support: the lowest swing low inside the lookback window.
    pivot_low = min(low["price"] for low in lows)
    result["pivot_low"] = round(pivot_low, 6)

    # Every swing high above the pivot becomes a candidate bounce.
    rallies = []
    for high in highs:
        height = (high["price"] - pivot_low) / pivot_low * 100 if pivot_low > 0 else 0
        if height > 0:
            rallies.append({
                "bounce_pct": height,
                "price": high["price"],
                "index": high["index"],
                "volume": _avg_volume_around(recent, high["index"], window),
            })
    if len(rallies) < min_count:
        return result
    rallies.sort(key=lambda r: r["index"])

    # The chain starts at the first bounce that is large enough.
    anchor = next(
        (r for r in rallies if r["bounce_pct"] >= cfg["min_first_swing_pct"]),
        None,
    )
    if anchor is None:
        return result

    # Extend with later bounces that are weaker than the last accepted one.
    chain = [anchor]
    for cand in rallies:
        tail = chain[-1]
        if cand["index"] > tail["index"] and cand["bounce_pct"] < tail["bounce_pct"]:
            chain.append(cand)
    if len(chain) < min_count:
        return result

    # Height decay is measured step to step; volume relative to the first bounce.
    base_vol = chain[0]["volume"]
    bounce_ratios = [
        cur["bounce_pct"] / prev["bounce_pct"] for prev, cur in zip(chain, chain[1:])
    ]
    volume_ratios = [
        (leg["volume"] / base_vol if base_vol > 0 else 1) for leg in chain[1:]
    ]
    # Only the last volume ratio is checked, and it only affects the signal text.
    vol_contracting = bool(volume_ratios) and volume_ratios[-1] <= cfg["volume_decay_max"]
    if not all(r <= cfg["contraction_decay_max"] for r in bounce_ratios):
        return result

    last_high = chain[-1]["price"]
    result.update(
        forming=True,
        contractions=len(chain),
        depths_pct=[round(leg["bounce_pct"], 2) for leg in chain],
        volume_ratios=[round(v, 2) for v in volume_ratios],
        last_high=round(last_high, 6),
    )

    # Compare the last bar's volume with the 20-bar mean (whole window if shorter).
    close = float(recent["close"].iloc[-1])
    last_vol = float(recent["volume"].iloc[-1])
    if len(recent) >= 20:
        avg_vol = float(recent["volume"].rolling(20).mean().iloc[-1])
    else:
        avg_vol = float(recent["volume"].mean())
    vol_multiple = last_vol / avg_vol if avg_vol > 0 else 0

    # Height of the first bounce in price units, used as the projected move.
    first_leg_move = chain[0]["bounce_pct"] / 100 * pivot_low
    risk = last_high - close

    if close < pivot_low and vol_multiple >= cfg["breakout_vol_ratio_min"]:
        reward = first_leg_move
        rr = round(reward / risk, 2) if risk > 0 else 0
        result["breakdown"] = True
        result["detected"] = True
        result["rr_estimate"] = rr
        result["score"] = cfg["weight_vcp_bear"]
        result["signal"] = f"顶部分配破位({len(chain)}次反弹递减, 量{vol_multiple:.1f}x, RR≈{rr})"
    elif pivot_low < close < last_high:
        # Not broken down yet, but still reported as a watch signal.
        reward = close - pivot_low + first_leg_move * 0.5
        rr = round(reward / risk, 2) if risk > 0 else 0
        result["detected"] = True
        result["rr_estimate"] = rr
        result["score"] = cfg["weight_vcp_forming"]
        result["signal"] = f"顶部分配蓄力({len(chain)}次反弹递减{'量缩' if vol_contracting else ''}, 待破位${pivot_low:.4f})"
    return result
# ---------------------------------------------------------------------------
# Unified interface
# ---------------------------------------------------------------------------
def detect_vcp(df: pd.DataFrame, direction: str = "both") -> dict:
    """Run VCP detection for one side or for both.

    Args:
        df: kline frame passed unchanged to the directional detectors.
        direction: ``"long"`` runs only the bullish detector, ``"short"`` only
            the bearish one. Any other value, including the default
            ``"both"``, runs both.

    Returns:
        The single detector result for ``"long"`` / ``"short"``; otherwise
        ``{"bull": <bullish result>, "bear": <bearish result>}``.
    """
    if direction == "long":
        return detect_vcp_bull(df)
    if direction == "short":
        return detect_vcp_bear(df)

    # Fallthrough: evaluate the bullish side first, then the bearish side.
    bull = detect_vcp_bull(df)
    bear = detect_vcp_bear(df)
    return {"bull": bull, "bear": bear}