"""简易 Volume Profile — 价格维度的成交量分布(HVN/LVN 识别)。 核心逻辑: - 把过去 N 根 K 线的价格区间等分为若干 bin - 统计每个 bin 内的累计成交量 - High Volume Node (HVN):成交量显著高于均值的价格区 → 阻力/支撑 - Low Volume Node (LVN):成交量显著低于均值的价格区 → 价格快速穿越区 使用场景(多空双向): 做多: - 突破后上方是 LVN → TP 可以设远(价格会快速穿越) - 突破后上方紧挨 HVN → TP 保守(价格容易卡住) - 当前价格在 HVN 内 → 支撑较强 做空: - 破位后下方是 LVN → TP 可以设远 - 破位后下方紧挨 HVN → TP 保守 - 当前价格在 HVN 下方 → 阻力较强 """ from __future__ import annotations from typing import Optional import numpy as np import pandas as pd from app.config.config_loader import _get_section as _get_cfg_section # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- def _vp_config() -> dict: try: cfg = _get_cfg_section("volume_profile") or {} except Exception: cfg = {} return { "num_bins": int(cfg.get("num_bins", 30)), "lookback_bars": int(cfg.get("lookback_bars", 50)), "hvn_threshold_mult": float(cfg.get("hvn_threshold_mult", 1.5)), "lvn_threshold_mult": float(cfg.get("lvn_threshold_mult", 0.5)), "poc_weight": float(cfg.get("poc_weight", 2.0)), "tp_lvn_bonus": float(cfg.get("tp_lvn_bonus", 1.5)), "tp_hvn_penalty": float(cfg.get("tp_hvn_penalty", -1.0)), } # --------------------------------------------------------------------------- # Core Volume Profile computation # --------------------------------------------------------------------------- def compute_volume_profile(df: pd.DataFrame, num_bins: int = 30, lookback: int = 50) -> dict: """Compute volume profile from kline data. Args: df: OHLCV DataFrame num_bins: number of price bins lookback: number of bars to analyze Returns: { "bins": [{"price_low": f, "price_high": f, "price_mid": f, "volume": f, "type": str}], "poc": float, # Point of Control (price with highest volume) "value_area_high": float, # 70% volume area upper bound "value_area_low": float, # 70% volume area lower bound "hvn_zones": [{"low": f, "high": f, "volume": f}], "lvn_zones": [{"low": f, "high": f, "volume": f}], "range_high": float, "range_low": float, "available": bool, } """ if df is None or len(df) < 10: return {"bins": [], "poc": 0, "value_area_high": 0, "value_area_low": 0, "hvn_zones": [], "lvn_zones": [], "range_high": 0, "range_low": 0, "available": False} recent = df.tail(lookback) range_high = float(recent["high"].max()) range_low = float(recent["low"].min()) if range_high <= range_low or range_low <= 0: return {"bins": [], "poc": 0, "value_area_high": 0, "value_area_low": 0, "hvn_zones": [], "lvn_zones": [], "range_high": range_high, "range_low": range_low, "available": False} # Create price bins bin_edges = np.linspace(range_low, range_high, num_bins + 1) bin_volumes = np.zeros(num_bins) # Distribute volume across bins for each candle # Each candle's volume is distributed proportionally across the bins it spans for _, row in recent.iterrows(): candle_low = float(row["low"]) candle_high = float(row["high"]) candle_vol = float(row["volume"]) if candle_high <= candle_low or candle_vol <= 0: continue for b in range(num_bins): bin_low = bin_edges[b] bin_high = bin_edges[b + 1] # Calculate overlap between candle range and bin overlap_low = max(candle_low, bin_low) overlap_high = min(candle_high, bin_high) if overlap_high > overlap_low: # Proportion of candle range that falls in this bin proportion = (overlap_high - overlap_low) / (candle_high - candle_low) bin_volumes[b] += candle_vol * proportion # Build bin list avg_vol = float(np.mean(bin_volumes)) if np.sum(bin_volumes) > 0 else 1.0 cfg = _vp_config() hvn_thresh = avg_vol * cfg["hvn_threshold_mult"] lvn_thresh = avg_vol * cfg["lvn_threshold_mult"] bins = [] for b in range(num_bins): vol = float(bin_volumes[b]) if vol >= hvn_thresh: bin_type = "hvn" elif vol <= lvn_thresh: bin_type = "lvn" else: bin_type = "normal" bins.append({ "price_low": round(float(bin_edges[b]), 6), "price_high": round(float(bin_edges[b + 1]), 6), "price_mid": round(float((bin_edges[b] + bin_edges[b + 1]) / 2), 6), "volume": round(vol, 2), "type": bin_type, }) # Point of Control (POC): bin with highest volume poc_idx = int(np.argmax(bin_volumes)) poc = float((bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2) # Value Area (70% of total volume centered on POC) total_vol = float(np.sum(bin_volumes)) target_vol = total_vol * 0.70 accumulated = float(bin_volumes[poc_idx]) va_low_idx = poc_idx va_high_idx = poc_idx while accumulated < target_vol and (va_low_idx > 0 or va_high_idx < num_bins - 1): expand_low = float(bin_volumes[va_low_idx - 1]) if va_low_idx > 0 else 0 expand_high = float(bin_volumes[va_high_idx + 1]) if va_high_idx < num_bins - 1 else 0 if expand_low >= expand_high and va_low_idx > 0: va_low_idx -= 1 accumulated += expand_low elif va_high_idx < num_bins - 1: va_high_idx += 1 accumulated += expand_high else: va_low_idx -= 1 accumulated += expand_low value_area_low = float(bin_edges[va_low_idx]) value_area_high = float(bin_edges[va_high_idx + 1]) # Collect HVN and LVN zones (merge adjacent bins of same type) hvn_zones = _merge_zones(bins, "hvn") lvn_zones = _merge_zones(bins, "lvn") return { "bins": bins, "poc": round(poc, 6), "value_area_high": round(value_area_high, 6), "value_area_low": round(value_area_low, 6), "hvn_zones": hvn_zones, "lvn_zones": lvn_zones, "range_high": round(range_high, 6), "range_low": round(range_low, 6), "available": True, } def _merge_zones(bins: list[dict], zone_type: str) -> list[dict]: """Merge adjacent bins of the same type into zones.""" zones = [] current = None for b in bins: if b["type"] == zone_type: if current is None: current = {"low": b["price_low"], "high": b["price_high"], "volume": b["volume"]} else: current["high"] = b["price_high"] current["volume"] += b["volume"] else: if current is not None: zones.append({k: round(v, 6) if isinstance(v, float) else v for k, v in current.items()}) current = None if current is not None: zones.append({k: round(v, 6) if isinstance(v, float) else v for k, v in current.items()}) return zones # --------------------------------------------------------------------------- # TP optimization based on Volume Profile # --------------------------------------------------------------------------- def find_nearest_node(price: float, direction: str, vp_data: dict) -> dict: """Find the nearest HVN or LVN in the given direction from current price. Args: price: current price direction: "above" or "below" vp_data: result from compute_volume_profile Returns: { "nearest_hvn": {"low": f, "high": f, "distance_pct": f} or None, "nearest_lvn": {"low": f, "high": f, "distance_pct": f} or None, "path_type": "clear" / "blocked" / "mixed", "tp_adjustment": str, } """ if not vp_data.get("available"): return {"nearest_hvn": None, "nearest_lvn": None, "path_type": "unknown", "tp_adjustment": ""} hvn_zones = vp_data.get("hvn_zones", []) lvn_zones = vp_data.get("lvn_zones", []) nearest_hvn = None nearest_lvn = None if direction == "above": # Find first HVN above price for zone in hvn_zones: if zone["low"] > price: dist = (zone["low"] - price) / price * 100 nearest_hvn = {"low": zone["low"], "high": zone["high"], "distance_pct": round(dist, 2)} break # Find first LVN above price for zone in lvn_zones: if zone["low"] > price: dist = (zone["low"] - price) / price * 100 nearest_lvn = {"low": zone["low"], "high": zone["high"], "distance_pct": round(dist, 2)} break else: # below # Find first HVN below price (search from high to low) for zone in reversed(hvn_zones): if zone["high"] < price: dist = (price - zone["high"]) / price * 100 nearest_hvn = {"low": zone["low"], "high": zone["high"], "distance_pct": round(dist, 2)} break # Find first LVN below price for zone in reversed(lvn_zones): if zone["high"] < price: dist = (price - zone["high"]) / price * 100 nearest_lvn = {"low": zone["low"], "high": zone["high"], "distance_pct": round(dist, 2)} break # Determine path type if nearest_lvn and (not nearest_hvn or nearest_lvn["distance_pct"] < nearest_hvn["distance_pct"]): path_type = "clear" tp_adjustment = "TP可设远(前方低量区,价格易快速穿越)" elif nearest_hvn and (not nearest_lvn or nearest_hvn["distance_pct"] < nearest_lvn["distance_pct"]): path_type = "blocked" tp_adjustment = f"TP应保守(前方{nearest_hvn['distance_pct']:.1f}%处有高量区阻力)" else: path_type = "mixed" tp_adjustment = "" return { "nearest_hvn": nearest_hvn, "nearest_lvn": nearest_lvn, "path_type": path_type, "tp_adjustment": tp_adjustment, } # --------------------------------------------------------------------------- # Factor scoring interface # --------------------------------------------------------------------------- def vp_factor_context( df: pd.DataFrame, current_price: float, trade_direction: str = "long", ) -> dict: """Compute Volume Profile context for factor scoring. Args: df: 4H kline DataFrame current_price: current price trade_direction: "long" or "short" Returns: { "vp": volume profile data, "path_analysis": nearest node analysis, "score_delta": float, "signal": str, "poc": float, "in_value_area": bool, } """ cfg = _vp_config() vp = compute_volume_profile(df, cfg["num_bins"], cfg["lookback_bars"]) if not vp.get("available"): return {"vp": vp, "path_analysis": {}, "score_delta": 0, "signal": "", "poc": 0, "in_value_area": False} # Determine which direction to look for obstacles look_dir = "above" if trade_direction == "long" else "below" path = find_nearest_node(current_price, look_dir, vp) # Score adjustment score_delta = 0.0 signal = "" if path["path_type"] == "clear": score_delta = cfg["tp_lvn_bonus"] signal = f"VP路径清晰({path['tp_adjustment']})" elif path["path_type"] == "blocked": score_delta = cfg["tp_hvn_penalty"] signal = f"VP路径受阻({path['tp_adjustment']})" # Check if price is in value area (support/resistance context) in_value_area = vp["value_area_low"] <= current_price <= vp["value_area_high"] return { "vp": vp, "path_analysis": path, "score_delta": round(score_delta, 2), "signal": signal, "poc": vp["poc"], "in_value_area": in_value_area, }