from typing import Any, Dict, Optional

import numpy as np
import pandas as pd

from app.utils.logger import logger


class FeatureEngine:
    """Convert raw K-line (candlestick) bars into high-value features for strategies and the LLM."""

    def _current_session_bars(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the rows of ``df`` that fall on the same calendar day as the last row.

        The session start is the last row's ``open_time`` truncated to midnight
        (``Timestamp.normalize``). When ``df`` has no ``open_time`` column the
        whole frame is returned unchanged.

        The whole ``open_time`` column is parsed with ``pd.to_datetime`` (not just
        the last value) so the comparison is between like types; comparing raw
        string timestamps against a ``Timestamp`` raises ``TypeError``.
        NOTE(review): numeric epochs are parsed with pandas' default unit
        (nanoseconds) — confirm the feed's ``open_time`` format.

        Raises:
            Whatever ``pd.to_datetime`` raises for unparseable values; every
            caller wraps this call in ``try``.
        """
        if 'open_time' not in df.columns:
            return df
        open_times = pd.to_datetime(df['open_time'])
        session_start = open_times.iloc[-1].normalize()
        return df[open_times >= session_start]

    def get_session_open(self, df: Optional[pd.DataFrame]) -> Optional[float]:
        """Return the open price of the current trading day.

        Args:
            df: Bars in chronological order with at least an ``open`` column.

        Returns:
            The ``open`` of the first bar of the current session (see
            ``_current_session_bars``). Without an ``open_time`` column, the
            ``open`` 24 bars back (or of the first bar if there are fewer).
            On failure or an empty session, the first bar's ``open``.
            None when ``df`` is None or empty.
        """
        if df is None or df.empty:
            return None
        try:
            if 'open_time' not in df.columns:
                # No timestamps: look 24 bars back (presumably one day of hourly bars — TODO confirm).
                return float(df.iloc[-24]['open']) if len(df) >= 24 else float(df.iloc[0]['open'])
            today_bars = self._current_session_bars(df)
            if not today_bars.empty:
                return float(today_bars.iloc[0]['open'])
        except Exception as e:
            logger.debug(f"获取交易日开盘价失败: {e}")
        # Best-effort fallback: the earliest bar available.
        return float(df.iloc[0]['open'])

    def calculate_session_vwap(self, df: Optional[pd.DataFrame]) -> Optional[float]:
        """Return the volume-weighted average price of the current trading day.

        Uses the typical price ``(high + low + close) / 3`` weighted by
        ``volume`` over the current session's bars.

        Returns:
            The VWAP, or None when ``df`` is None/empty, has no ``volume``
            column, the session is empty, total volume is not positive, or
            the computation fails.
        """
        if df is None or df.empty or 'volume' not in df.columns:
            return None
        try:
            session_df = self._current_session_bars(df)
            if session_df.empty:
                return None
            typical_price = (session_df['high'] + session_df['low'] + session_df['close']) / 3
            # Zero volumes become NaN; Series.sum skips NaN, so an all-zero
            # session sums to 0 and is rejected below.
            volume = session_df['volume'].replace(0, np.nan)
            total_volume = volume.sum()
            if pd.isna(total_volume) or total_volume <= 0:
                return None
            return float((typical_price * session_df['volume']).sum() / total_volume)
        except Exception as e:
            logger.debug(f"计算 VWAP 失败: {e}")
            return None

    def calculate_opening_range(self, df: Optional[pd.DataFrame], bars: int = 6) -> Optional[Dict[str, float]]:
        """Return the high/low of the first ``bars`` bars of the current session.

        The default of 6 bars was intended as "the first 30 minutes"
        (presumably 5-minute bars — TODO confirm). If the session has fewer
        than ``bars`` bars so far, the range of the bars available is used.

        Returns:
            ``{'high': ..., 'low': ...}``, or None when ``df`` is None/empty,
            shorter than ``bars`` rows, the session is empty, or the
            computation fails.
        """
        if df is None or df.empty or len(df) < bars:
            return None
        try:
            session_df = self._current_session_bars(df).iloc[:bars]
            if session_df.empty:
                return None
            return {
                'high': float(session_df['high'].max()),
                'low': float(session_df['low'].min()),
            }
        except Exception as e:
            logger.debug(f"计算开盘区间失败: {e}")
            return None

    def summarize_timeframe_features(self, df: Optional[pd.DataFrame], timeframe: str) -> Dict[str, Any]:
        """Summarize one timeframe's bars into a flat feature dict.

        Args:
            df: Bars for the timeframe; the last row is treated as the latest
                bar. Optional indicator columns read from it: ``ema5``,
                ``ema10``, ``ema20``, ``rsi``, ``atr``, ``adx``. Missing or
                NaN indicators leave their features at the default.
            timeframe: Label echoed back under the ``'timeframe'`` key.

        Returns:
            A dict with a fixed key set. When ``df`` is None, empty or shorter
            than 20 rows, every feature keeps its default and ``'available'``
            is False.
        """
        feature = {
            'timeframe': timeframe,
            'available': False,
            'close': None,
            'ema_alignment': 'neutral',
            'structure': 'unknown',
            'momentum_3': None,
            'momentum_12': None,
            'rsi': None,
            'atr_pct': None,
            'volume_ratio': None,
            'distance_to_ema20': None,
            'distance_to_recent_high': None,
            'distance_to_recent_low': None,
            'is_accelerating': False,
            'adx': None,
            'trend_strength_adx': 'unknown',
            'body_ratio': None,
            'close_position_in_bar': None,
            'upper_wick_ratio': None,
            'lower_wick_ratio': None,
            'range_expansion_ratio': None,
            'pressure_bias': 'neutral',
            'volume_price_state': 'neutral',
            'breakout_quality': 'none',
            'pullback_quality': 'neutral',
            'rejection_signal': 'none',
            'exhaustion_risk': 'low',
        }
        if df is None or df.empty or len(df) < 20:
            return feature

        latest = df.iloc[-1]
        close = float(latest['close'])
        ema5 = latest.get('ema5')
        ema10 = latest.get('ema10')
        ema20 = latest.get('ema20')
        rsi = latest.get('rsi')
        atr = latest.get('atr')

        structure = self.infer_price_structure(df)
        candle_context = self.analyze_candle_context(df)
        volume_price = self.analyze_volume_price(df, structure=structure, candle_context=candle_context)

        feature.update({
            'available': True,
            'close': close,
            'rsi': float(rsi) if pd.notna(rsi) else None,
            'atr_pct': float(atr / close * 100) if pd.notna(atr) and close > 0 else None,
            'distance_to_ema20': self.distance_percent(close, ema20),
            'structure': structure,
            'momentum_3': self.window_return(df, 3),
            'momentum_12': self.window_return(df, 12),
            'volume_ratio': self.calculate_volume_ratio(df),
            'is_accelerating': self.is_accelerating(df),
            **candle_context,
            **volume_price,
        })

        adx = latest.get('adx')
        if pd.notna(adx):
            feature['adx'] = float(adx)
            # ADX buckets: >=40 strong, >=25 moderate, >=20 weak, otherwise ranging.
            if adx >= 40:
                feature['trend_strength_adx'] = 'strong'
            elif adx >= 25:
                feature['trend_strength_adx'] = 'moderate'
            elif adx >= 20:
                feature['trend_strength_adx'] = 'weak'
            else:
                feature['trend_strength_adx'] = 'ranging'

        if pd.notna(ema5) and pd.notna(ema10) and pd.notna(ema20):
            if ema5 > ema10 > ema20:
                feature['ema_alignment'] = 'bull'
            elif ema5 < ema10 < ema20:
                feature['ema_alignment'] = 'bear'
            else:
                feature['ema_alignment'] = 'mixed'

        # Percent distance of the close from the extremes of the last 20 bars
        # (the latest bar included).
        recent_window = df.iloc[-20:]
        recent_high = float(recent_window['high'].max())
        recent_low = float(recent_window['low'].min())
        feature['distance_to_recent_high'] = self.distance_percent(close, recent_high)
        feature['distance_to_recent_low'] = self.distance_percent(close, recent_low)
        return feature

    def analyze_candle_context(self, df: pd.DataFrame, window: int = 20) -> Dict[str, Any]:
        """Describe the latest bar's body, wicks, close position and range expansion.

        All ratios except ``range_expansion_ratio`` are fractions of the
        latest bar's high-low range (floored at 1e-9 to avoid division by
        zero). ``range_expansion_ratio`` is that range divided by the mean
        range of the last ``window`` bars (the latest bar included); it stays
        None with a single bar or a non-positive mean range.

        Returns:
            Dict with ``body_ratio``, ``close_position_in_bar``,
            ``upper_wick_ratio``, ``lower_wick_ratio`` and
            ``range_expansion_ratio``; all None when ``df`` is None or empty.
        """
        context = {
            'body_ratio': None,
            'close_position_in_bar': None,
            'upper_wick_ratio': None,
            'lower_wick_ratio': None,
            'range_expansion_ratio': None,
        }
        if df is None or df.empty:
            return context

        latest = df.iloc[-1]
        open_price = self._resolve_open_price(df)
        close_price = float(latest['close'])
        high_price = float(latest['high'])
        low_price = float(latest['low'])
        bar_range = max(high_price - low_price, 1e-9)
        body_high = max(open_price, close_price)
        body_low = min(open_price, close_price)

        avg_range = None
        if len(df) > 1:
            recent_window = df.iloc[-window:]
            avg_range = float((recent_window['high'] - recent_window['low']).mean())

        context['body_ratio'] = abs(close_price - open_price) / bar_range
        context['close_position_in_bar'] = (close_price - low_price) / bar_range
        context['upper_wick_ratio'] = (high_price - body_high) / bar_range
        context['lower_wick_ratio'] = (body_low - low_price) / bar_range
        if avg_range and avg_range > 0:
            context['range_expansion_ratio'] = bar_range / avg_range
        return context

    def analyze_volume_price(
        self,
        df: pd.DataFrame,
        *,
        structure: str,
        candle_context: Optional[Dict[str, Any]] = None,
        volume_window: int = 20,
    ) -> Dict[str, Any]:
        """Compress the volume/price relationship of the latest bar into labelled states.

        Args:
            df: Bars in chronological order; fewer than 8 rows yields all defaults.
            structure: Output of ``infer_price_structure`` (``'HH/HL'``,
                ``'LH/LL'``, ...).
            candle_context: Output of ``analyze_candle_context``; computed
                here when omitted.
            volume_window: Baseline length for the volume ratio and for the
                prior high/low used to detect breakouts.

        Returns:
            Dict with ``pressure_bias``, ``volume_price_state``,
            ``breakout_quality``, ``pullback_quality``, ``rejection_signal``
            and ``exhaustion_risk``. ``pressure_bias`` and
            ``volume_price_state`` are derived from the other labels by
            priority: acceptance breakout, then rejection, then continuation,
            then pullback.
        """
        result = {
            'pressure_bias': 'neutral',
            'volume_price_state': 'neutral',
            'breakout_quality': 'none',
            'pullback_quality': 'neutral',
            'rejection_signal': 'none',
            'exhaustion_risk': 'low',
        }
        if df is None or len(df) < 8:
            return result

        candle_context = candle_context or self.analyze_candle_context(df)

        def _context_value(key: str, default: float) -> float:
            # Only None/NaN fall back to the default. `value or default` would
            # also replace a legitimate 0.0 (e.g. a close exactly at the low).
            value = candle_context.get(key)
            if value is None or pd.isna(value):
                return default
            return float(value)

        volume_ratio = self.calculate_volume_ratio(df, window=volume_window)
        recent_move = self.window_return(df, 3) or 0.0
        body_ratio = _context_value('body_ratio', 0.0)
        close_position = _context_value('close_position_in_bar', 0.5)
        upper_wick_ratio = _context_value('upper_wick_ratio', 0.0)
        lower_wick_ratio = _context_value('lower_wick_ratio', 0.0)
        range_expansion = _context_value('range_expansion_ratio', 1.0)

        close_price = float(df['close'].iloc[-1])
        # Prior extremes over up to max(volume_window, 8) bars, excluding the latest bar.
        prior_window = df.iloc[-min(max(volume_window, 8) + 1, len(df)):-1]
        prior_high = float(prior_window['high'].max()) if not prior_window.empty else close_price
        prior_low = float(prior_window['low'].min()) if not prior_window.empty else close_price
        breakout_up = close_price > prior_high
        breakout_down = close_price < prior_low

        # Breakout accepted only with above-average volume, a full body and a close near the extreme.
        if breakout_up:
            if volume_ratio >= 1.2 and body_ratio >= 0.55 and close_position >= 0.72:
                result['breakout_quality'] = 'acceptance_breakout_up'
            else:
                result['breakout_quality'] = 'weak_breakout_up'
        elif breakout_down:
            if volume_ratio >= 1.2 and body_ratio >= 0.55 and close_position <= 0.28:
                result['breakout_quality'] = 'acceptance_breakout_down'
            else:
                result['breakout_quality'] = 'weak_breakout_down'

        # Long wick against the close on elevated volume.
        if upper_wick_ratio >= 0.4 and close_position <= 0.45 and volume_ratio >= 1.15:
            result['rejection_signal'] = 'bearish_rejection'
        elif lower_wick_ratio >= 0.4 and close_position >= 0.55 and volume_ratio >= 1.15:
            result['rejection_signal'] = 'bullish_rejection'

        # Counter-trend 3-bar move: healthy if volume is below the baseline.
        if structure == 'HH/HL' and recent_move < 0:
            result['pullback_quality'] = 'healthy_pullback' if volume_ratio <= 0.95 else 'heavy_sell_pullback'
        elif structure == 'LH/LL' and recent_move > 0:
            result['pullback_quality'] = 'healthy_pullback' if volume_ratio <= 0.95 else 'heavy_buy_pullback'

        # Big move on very high volume with a small body, or high volume without range expansion.
        if abs(recent_move) >= 1.5 and volume_ratio >= 1.8 and body_ratio <= 0.35:
            if recent_move > 0:
                result['exhaustion_risk'] = 'upside_climax'
            elif recent_move < 0:
                result['exhaustion_risk'] = 'downside_climax'
        elif volume_ratio >= 1.6 and range_expansion <= 0.8:
            result['exhaustion_risk'] = 'high_volume_churn'

        if result['breakout_quality'] == 'acceptance_breakout_up':
            result['pressure_bias'] = 'bullish'
            result['volume_price_state'] = 'bullish_acceptance'
        elif result['breakout_quality'] == 'acceptance_breakout_down':
            result['pressure_bias'] = 'bearish'
            result['volume_price_state'] = 'bearish_acceptance'
        elif result['rejection_signal'] == 'bullish_rejection':
            result['pressure_bias'] = 'bullish'
            result['volume_price_state'] = 'bullish_rejection'
        elif result['rejection_signal'] == 'bearish_rejection':
            result['pressure_bias'] = 'bearish'
            result['volume_price_state'] = 'bearish_rejection'
        elif structure == 'HH/HL' and recent_move > 0 and volume_ratio >= 1.05 and close_position >= 0.6:
            result['pressure_bias'] = 'bullish'
            result['volume_price_state'] = 'bullish_continuation'
        elif structure == 'LH/LL' and recent_move < 0 and volume_ratio >= 1.05 and close_position <= 0.4:
            result['pressure_bias'] = 'bearish'
            result['volume_price_state'] = 'bearish_continuation'
        elif result['pullback_quality'] == 'healthy_pullback':
            result['volume_price_state'] = 'pullback_on_light_volume'
        elif result['pullback_quality'] in {'heavy_sell_pullback', 'heavy_buy_pullback'}:
            result['volume_price_state'] = 'counter_pressure_expanding'
        return result

    def infer_price_structure(self, df: pd.DataFrame, lookback: int = 20) -> str:
        """Classify the last ``lookback`` bars as HH/HL, LH/LL or range/mixed.

        Compares the high/low of the first ``half`` bars of the window with
        those of the last ``half`` bars, where ``half = max(lookback // 2, 5)``.

        Returns:
            ``"HH/HL"`` if both the high and the low rose, ``"LH/LL"`` if both
            fell, ``"range/mixed"`` otherwise, and ``"unknown"`` when there
            are fewer than ``lookback`` rows.
        """
        if df is None or len(df) < lookback:
            return "unknown"
        window = df.iloc[-lookback:]
        half = max(lookback // 2, 5)
        first = window.iloc[:half]
        second = window.iloc[-half:]
        prev_high = float(first['high'].max())
        prev_low = float(first['low'].min())
        recent_high = float(second['high'].max())
        recent_low = float(second['low'].min())
        if recent_high > prev_high and recent_low > prev_low:
            return "HH/HL"
        if recent_high < prev_high and recent_low < prev_low:
            return "LH/LL"
        return "range/mixed"

    def window_return(self, df: pd.DataFrame, bars: int) -> Optional[float]:
        """Return the percent change of ``close`` over the last ``bars`` bars.

        Returns None when there are not more than ``bars`` rows, when either
        close is NaN, or when the starting close is not positive.
        """
        if df is None or len(df) <= bars:
            return None
        start_price = float(df['close'].iloc[-bars - 1])
        end_price = float(df['close'].iloc[-1])
        # NaN would otherwise slip past `<= 0` and propagate; callers use `or 0.0`, which NaN defeats.
        if pd.isna(start_price) or pd.isna(end_price) or start_price <= 0:
            return None
        return (end_price - start_price) / start_price * 100

    def calculate_volume_ratio(self, df: pd.DataFrame, window: int = 20) -> float:
        """Return the last bar's volume divided by the mean volume of up to ``window`` prior bars.

        Returns 1.0 (neutral) when there is no prior bar, when the baseline is
        not positive, or when either volume figure is NaN.
        """
        if df is None or len(df) <= 1:
            return 1.0
        lookback = min(window, len(df) - 1)
        latest_volume = float(df['volume'].iloc[-1])
        baseline = float(df['volume'].iloc[-(lookback + 1):-1].mean())
        # NaN compares False against every threshold downstream; report neutral instead.
        if pd.isna(latest_volume) or pd.isna(baseline) or baseline <= 0:
            return 1.0
        return latest_volume / baseline

    def is_accelerating(self, df: pd.DataFrame, bars: int = 3, threshold: float = 0.3) -> bool:
        """Return True when the last ``bars`` closes all moved in the same direction.

        Additionally, at least ``bars - 1`` of those per-bar percent changes
        must be at least ``threshold`` percent in magnitude.
        """
        if df is None or len(df) < bars + 1:
            return False
        closes = df['close'].iloc[-(bars + 1):].values
        changes = [
            (closes[i] - closes[i - 1]) / closes[i - 1] * 100
            for i in range(1, len(closes))
            if closes[i - 1] > 0
        ]
        # A non-positive previous close drops a change; then there is not enough data.
        if len(changes) < bars:
            return False
        same_direction = all(change > 0 for change in changes) or all(change < 0 for change in changes)
        large_enough = sum(1 for change in changes if abs(change) >= threshold) >= bars - 1
        return same_direction and large_enough

    def distance_percent(self, value: Optional[float], reference: Optional[float]) -> Optional[float]:
        """Return ``(value - reference) / reference * 100``.

        Returns None if either input is None, or ``reference`` is NaN or zero.
        """
        if value is None or reference is None or pd.isna(reference) or reference == 0:
            return None
        return (float(value) - float(reference)) / float(reference) * 100

    def _resolve_open_price(self, df: pd.DataFrame) -> float:
        """Return the latest bar's open, falling back to the previous close, then the latest close."""
        latest = df.iloc[-1]
        open_value = latest.get('open')
        if pd.notna(open_value):
            return float(open_value)
        if len(df) >= 2:
            return float(df['close'].iloc[-2])
        return float(latest['close'])