alphax/app/core/volume_profile.py
2026-06-01 00:16:01 +08:00

336 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""简易 Volume Profile — 价格维度的成交量分布(HVN/LVN 识别)。
核心逻辑:
- 把过去 N 根 K 线的价格区间等分为若干 bin
- 统计每个 bin 内的累计成交量
- High Volume Node (HVN):成交量显著高于均值的价格区 → 阻力/支撑
- Low Volume Node (LVN):成交量显著低于均值的价格区 → 价格快速穿越区
使用场景(多空双向):
做多:
- 突破后上方是 LVN → TP 可以设远(价格会快速穿越)
- 突破后上方紧挨 HVN → TP 保守(价格容易卡住)
- 当前价格在 HVN 内 → 支撑较强
做空:
- 破位后下方是 LVN → TP 可以设远
- 破位后下方紧挨 HVN → TP 保守
- 当前价格在 HVN 下方 → 阻力较强
"""
from __future__ import annotations
from typing import Optional
import numpy as np
import pandas as pd
from app.config.config_loader import _get_section as _get_cfg_section
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _vp_config() -> dict:
    """Return the volume-profile settings with defaults applied.

    Reads the ``"volume_profile"`` section through ``_get_cfg_section``.
    Any exception raised while loading it, or a falsy section, results in
    an empty section so every key falls back to its default. Each value is
    coerced with ``int`` or ``float`` as listed below.

    Returns:
        Dict with the keys ``num_bins``, ``lookback_bars``,
        ``hvn_threshold_mult``, ``lvn_threshold_mult``, ``poc_weight``,
        ``tp_lvn_bonus`` and ``tp_hvn_penalty``.
    """
    # (key, caster, default) in the order the keys appear in the result.
    schema = (
        ("num_bins", int, 30),
        ("lookback_bars", int, 50),
        ("hvn_threshold_mult", float, 1.5),
        ("lvn_threshold_mult", float, 0.5),
        ("poc_weight", float, 2.0),
        ("tp_lvn_bonus", float, 1.5),
        ("tp_hvn_penalty", float, -1.0),
    )
    try:
        section = _get_cfg_section("volume_profile") or {}
    except Exception:
        # Best effort: a broken or missing config must not break scoring.
        section = {}
    return {key: caster(section.get(key, fallback)) for key, caster, fallback in schema}
# ---------------------------------------------------------------------------
# Core Volume Profile computation
# ---------------------------------------------------------------------------
def _unavailable_profile(range_high: float = 0, range_low: float = 0) -> dict:
    """Build the placeholder result returned when no profile can be computed."""
    return {
        "bins": [],
        "poc": 0,
        "value_area_high": 0,
        "value_area_low": 0,
        "hvn_zones": [],
        "lvn_zones": [],
        "range_high": range_high,
        "range_low": range_low,
        "available": False,
    }


def compute_volume_profile(df: pd.DataFrame, num_bins: int = 30, lookback: int = 50) -> dict:
    """Compute a price-axis volume profile from kline data.

    The price range of the last ``lookback`` bars is split into ``num_bins``
    equal-width bins. Each candle's volume is spread over the bins its
    low-high range overlaps, proportionally to the overlap. Bins are then
    tagged "hvn" / "lvn" / "normal" by comparing their volume with the mean
    bin volume scaled by the ``hvn_threshold_mult`` / ``lvn_threshold_mult``
    values from ``_vp_config()``.

    Args:
        df: OHLCV DataFrame; the "high", "low" and "volume" columns are read.
        num_bins: number of price bins (must be >= 1).
        lookback: number of most recent bars to analyze.

    Returns:
        {
            "bins": [{"price_low": f, "price_high": f, "price_mid": f, "volume": f, "type": str}],
            "poc": float,              # Point of Control (mid price of the highest-volume bin)
            "value_area_high": float,  # 70% volume area upper bound
            "value_area_low": float,   # 70% volume area lower bound
            "hvn_zones": [{"low": f, "high": f, "volume": f}],
            "lvn_zones": [{"low": f, "high": f, "volume": f}],
            "range_high": float,
            "range_low": float,
            "available": bool,
        }
        ``available`` is False (with empty lists and zeroed prices) when
        ``df`` is None or has fewer than 10 rows, when the price range is
        empty, non-positive or not finite, or when ``num_bins`` < 1.
    """
    if df is None or len(df) < 10:
        return _unavailable_profile()

    recent = df.tail(lookback)
    range_high = float(recent["high"].max())
    range_low = float(recent["low"].min())
    # An empty window or all-NaN prices yields a NaN range, which would slip
    # through the ordering checks below because NaN comparisons are False.
    if not (np.isfinite(range_high) and np.isfinite(range_low)):
        return _unavailable_profile()
    if num_bins < 1 or range_high <= range_low or range_low <= 0:
        return _unavailable_profile(range_high, range_low)

    # Equal-width price bins spanning the whole window range.
    bin_edges = np.linspace(range_low, range_high, num_bins + 1)
    lower_edges = bin_edges[:-1]
    upper_edges = bin_edges[1:]
    bin_volumes = np.zeros(num_bins)

    lows = recent["low"].to_numpy(dtype=float)
    highs = recent["high"].to_numpy(dtype=float)
    volumes = recent["volume"].to_numpy(dtype=float)
    # Skip candles that cannot be distributed: non-finite fields (a single
    # NaN volume would otherwise poison every bin it touches), zero or
    # negative range, and non-positive volume.
    # NOTE(review): zero-range candles (high == low) are dropped together
    # with their volume, as before — confirm this is intended.
    usable = (
        np.isfinite(lows)
        & np.isfinite(highs)
        & np.isfinite(volumes)
        & (highs > lows)
        & (volumes > 0)
    )
    for candle_low, candle_high, candle_vol in zip(lows[usable], highs[usable], volumes[usable]):
        # Length of the candle range that falls inside each bin (0 if none).
        overlap = np.minimum(candle_high, upper_edges) - np.maximum(candle_low, lower_edges)
        np.clip(overlap, 0.0, None, out=overlap)
        bin_volumes += candle_vol * (overlap / (candle_high - candle_low))

    total_vol = float(np.sum(bin_volumes))
    # With no volume at all fall back to 1.0 so the thresholds stay positive.
    avg_vol = float(np.mean(bin_volumes)) if total_vol > 0 else 1.0
    cfg = _vp_config()
    hvn_thresh = avg_vol * cfg["hvn_threshold_mult"]
    lvn_thresh = avg_vol * cfg["lvn_threshold_mult"]

    bins = []
    for idx in range(num_bins):
        vol = float(bin_volumes[idx])
        if vol >= hvn_thresh:
            bin_type = "hvn"
        elif vol <= lvn_thresh:
            bin_type = "lvn"
        else:
            bin_type = "normal"
        bins.append({
            "price_low": round(float(bin_edges[idx]), 6),
            "price_high": round(float(bin_edges[idx + 1]), 6),
            "price_mid": round(float((bin_edges[idx] + bin_edges[idx + 1]) / 2), 6),
            "volume": round(vol, 2),
            "type": bin_type,
        })

    # Point of Control: mid price of the bin with the highest volume.
    poc_idx = int(np.argmax(bin_volumes))
    poc = float((bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2)

    # Value area: grow outwards from the POC, each step taking the heavier
    # neighbouring bin (ties go down), until 70% of total volume is covered.
    target_vol = total_vol * 0.70
    accumulated = float(bin_volumes[poc_idx])
    va_low_idx = va_high_idx = poc_idx
    last_idx = num_bins - 1
    while accumulated < target_vol and (va_low_idx > 0 or va_high_idx < last_idx):
        below = float(bin_volumes[va_low_idx - 1]) if va_low_idx > 0 else 0.0
        above = float(bin_volumes[va_high_idx + 1]) if va_high_idx < last_idx else 0.0
        if va_low_idx > 0 and (below >= above or va_high_idx == last_idx):
            va_low_idx -= 1
            accumulated += below
        else:
            va_high_idx += 1
            accumulated += above
    value_area_low = float(bin_edges[va_low_idx])
    value_area_high = float(bin_edges[va_high_idx + 1])

    return {
        "bins": bins,
        "poc": round(poc, 6),
        "value_area_high": round(value_area_high, 6),
        "value_area_low": round(value_area_low, 6),
        # Adjacent bins of the same type are merged into zones.
        "hvn_zones": _merge_zones(bins, "hvn"),
        "lvn_zones": _merge_zones(bins, "lvn"),
        "range_high": round(range_high, 6),
        "range_low": round(range_low, 6),
        "available": True,
    }
def _merge_zones(bins: list[dict], zone_type: str) -> list[dict]:
    """Collapse runs of consecutive bins tagged ``zone_type`` into zones.

    Args:
        bins: bin dicts carrying "type", "price_low", "price_high" and
            "volume", in the order they should be scanned.
        zone_type: the bin "type" value to collect.

    Returns:
        One ``{"low", "high", "volume"}`` dict per run: ``low`` comes from the
        first bin of the run, ``high`` from the last, and ``volume`` is the
        sum over the run. Float values are rounded to 6 decimals.
    """
    def _finalize(zone: dict) -> dict:
        # Only floats are rounded; any other value type is passed through.
        return {
            key: (round(val, 6) if isinstance(val, float) else val)
            for key, val in zone.items()
        }

    merged = []
    open_zone = None
    for entry in bins:
        matches = entry["type"] == zone_type
        if matches and open_zone is None:
            # First bin of a new run.
            open_zone = {
                "low": entry["price_low"],
                "high": entry["price_high"],
                "volume": entry["volume"],
            }
        elif matches:
            # Run continues: extend the upper bound and add the volume.
            open_zone["high"] = entry["price_high"]
            open_zone["volume"] += entry["volume"]
        elif open_zone is not None:
            # A bin of another type closes the current run.
            merged.append(_finalize(open_zone))
            open_zone = None
    if open_zone is not None:
        merged.append(_finalize(open_zone))
    return merged
# ---------------------------------------------------------------------------
# TP optimization based on Volume Profile
# ---------------------------------------------------------------------------
def _nearest_zone_beyond(price: float, zones: list, direction: str) -> Optional[dict]:
    """Return the first zone lying entirely past ``price`` in ``direction``, or None."""
    if direction == "above":
        zone = next((z for z in zones if z["low"] > price), None)
        if zone is None:
            return None
        gap = zone["low"] - price
    else:
        # Scan from the highest zone down so the first hit is the closest one.
        zone = next((z for z in reversed(zones) if z["high"] < price), None)
        if zone is None:
            return None
        gap = price - zone["high"]
    return {
        "low": zone["low"],
        "high": zone["high"],
        "distance_pct": round(gap / price * 100, 2),
    }


def find_nearest_node(price: float, direction: str, vp_data: dict) -> dict:
    """Find the nearest HVN and LVN in the given direction from ``price``.

    Only zones lying entirely beyond ``price`` are considered; a zone that
    contains ``price`` is ignored. Zones are expected in ascending price
    order, as produced by ``compute_volume_profile``.

    Args:
        price: current price; must be a positive number.
        direction: "above" to look upwards; any other value looks downwards.
        vp_data: result from ``compute_volume_profile``.

    Returns:
        {
            "nearest_hvn": {"low": f, "high": f, "distance_pct": f} or None,
            "nearest_lvn": {"low": f, "high": f, "distance_pct": f} or None,
            "path_type": "clear" / "blocked" / "mixed" / "unknown",
            "tp_adjustment": str,
        }
        ``path_type`` is "clear" when an LVN comes first, "blocked" when an
        HVN comes first, "mixed" when neither applies, and "unknown" when the
        profile is unavailable or ``price`` is not a positive number.
    """
    unknown = {"nearest_hvn": None, "nearest_lvn": None, "path_type": "unknown", "tp_adjustment": ""}
    if not vp_data or not vp_data.get("available"):
        return unknown
    # Distances are percentages of ``price``: zero would divide by zero and a
    # negative or NaN price would give meaningless distances.
    if price is None or not price > 0:
        return unknown

    nearest_hvn = _nearest_zone_beyond(price, vp_data.get("hvn_zones") or [], direction)
    nearest_lvn = _nearest_zone_beyond(price, vp_data.get("lvn_zones") or [], direction)

    # Whichever node type is met first decides how far the TP can be placed.
    if nearest_lvn and (not nearest_hvn or nearest_lvn["distance_pct"] < nearest_hvn["distance_pct"]):
        path_type = "clear"
        tp_adjustment = "TP可设远(前方低量区,价格易快速穿越)"
    elif nearest_hvn and (not nearest_lvn or nearest_hvn["distance_pct"] < nearest_lvn["distance_pct"]):
        path_type = "blocked"
        tp_adjustment = f"TP应保守(前方{nearest_hvn['distance_pct']:.1f}%处有高量区阻力)"
    else:
        path_type = "mixed"
        tp_adjustment = ""

    return {
        "nearest_hvn": nearest_hvn,
        "nearest_lvn": nearest_lvn,
        "path_type": path_type,
        "tp_adjustment": tp_adjustment,
    }
# ---------------------------------------------------------------------------
# Factor scoring interface
# ---------------------------------------------------------------------------
def vp_factor_context(
    df: pd.DataFrame,
    current_price: float,
    trade_direction: str = "long",
) -> dict:
    """Build the Volume Profile context used by factor scoring.

    Computes the profile with the ``num_bins`` / ``lookback_bars`` settings
    from ``_vp_config()``, inspects the nodes ahead of the trade (above for
    "long", below for anything else) and turns the result into a score delta.

    Args:
        df: kline DataFrame passed on to ``compute_volume_profile``.
        current_price: current price.
        trade_direction: "long" looks above the price; any other value looks below.

    Returns:
        {
            "vp": volume profile data,
            "path_analysis": nearest node analysis ({} if the profile is unavailable),
            "score_delta": float,   # tp_lvn_bonus if clear, tp_hvn_penalty if blocked, else 0
            "signal": str,
            "poc": float,
            "in_value_area": bool,
        }
    """
    settings = _vp_config()
    profile = compute_volume_profile(df, settings["num_bins"], settings["lookback_bars"])
    if not profile.get("available"):
        return {
            "vp": profile,
            "path_analysis": {},
            "score_delta": 0,
            "signal": "",
            "poc": 0,
            "in_value_area": False,
        }

    # Obstacles that matter lie in the direction the trade wants to move.
    side = "below" if trade_direction != "long" else "above"
    path = find_nearest_node(current_price, side, profile)

    outcome = path["path_type"]
    if outcome == "clear":
        score_delta = settings["tp_lvn_bonus"]
        signal = f"VP路径清晰({path['tp_adjustment']})"
    elif outcome == "blocked":
        score_delta = settings["tp_hvn_penalty"]
        signal = f"VP路径受阻({path['tp_adjustment']})"
    else:
        score_delta = 0.0
        signal = ""

    # Whether the price currently sits inside the 70% value area.
    va_low = profile["value_area_low"]
    va_high = profile["value_area_high"]
    in_value_area = va_low <= current_price and current_price <= va_high

    return {
        "vp": profile,
        "path_analysis": path,
        "score_delta": round(score_delta, 2),
        "signal": signal,
        "poc": profile["poc"],
        "in_value_area": in_value_area,
    }