"""
Market structure analysis: trend, support/resistance, breakouts
"""
import logging
from typing import List, Dict, Any, Tuple, Optional

import pandas as pd
import numpy as np

from .config import config

logger = logging.getLogger(__name__)


class MarketStructureAnalyzer:
    """Analyze market structure, S/R levels, and trend.

    Every method is a ``@staticmethod`` working on a DataFrame of bars.
    The columns read are ``high``, ``low``, ``close``, ``adx``, ``rsi``,
    ``macd_hist`` and the two EMA columns named ``ema_<config.EMA_FAST>``
    and ``ema_<config.EMA_SLOW>``.  Tunables come from ``config``
    (``EMA_FAST``, ``EMA_SLOW``, ``SR_LOOKBACK``, ``SR_TOLERANCE``).
    """

    @staticmethod
    def _safe_value(row: pd.Series, key: str, default: Any) -> Any:
        """Return ``row[key]``, or ``default`` if the key is absent or NaN/None.

        Indicator columns are NaN during their warm-up period; NaN fails
        every ordered comparison, which would otherwise push threshold
        ladders into their final ``else`` branch and emit a bogus label.
        """
        value = row.get(key, default)
        if pd.isna(value):
            return default
        return value

    @staticmethod
    def identify_trend(df: pd.DataFrame) -> Dict[str, Any]:
        """
        Identify trend direction and strength

        Args:
            df: DataFrame holding the EMA, ``adx`` and ``rsi`` columns.

        Returns:
            Dict with trend info: ``direction``, ``strength``, ``phase``,
            ``adx`` (rounded to 1 decimal) and ``ema_alignment``.
            With fewer than 50 rows only
            ``{'direction': 'unknown', 'strength': 0}`` is returned.
        """
        if df.empty or len(df) < 50:
            return {'direction': 'unknown', 'strength': 0}

        latest = df.iloc[-1]

        # EMA comparison. The periods come from config, so the columns are
        # not necessarily the 20/50 EMAs. NaN is deliberately NOT defaulted
        # here: a NaN EMA compares False both ways and lands in the
        # ranging branch instead of being compared against 0.
        ema_fast = latest.get(f'ema_{config.EMA_FAST}', 0)
        ema_slow = latest.get(f'ema_{config.EMA_SLOW}', 0)
        adx = MarketStructureAnalyzer._safe_value(latest, 'adx', 0)
        rsi = MarketStructureAnalyzer._safe_value(latest, 'rsi', 50)

        # Determine direction (up / down / ranging)
        if ema_fast > ema_slow:
            direction = '上涨'
        elif ema_fast < ema_slow:
            direction = '下跌'
        else:
            direction = '震荡'

        # Strength uses the same ADX ladder for both trending directions;
        # a ranging market is always reported as weak.
        if direction == '震荡':
            strength = 'weak'
        elif adx > 25:
            strength = 'strong'
        elif adx > 20:
            strength = 'moderate'
        else:
            strength = 'weak'

        # Detect trend phase from RSI within the current direction
        if direction == '上涨':
            if rsi > 70:
                phase = '上涨中的强势回调'
            elif rsi > 55:
                phase = '上涨中'
            else:
                phase = '上涨后回调'
        elif direction == '下跌':
            if rsi < 30:
                phase = '下跌中的超卖反弹'
            elif rsi < 45:
                phase = '下跌中'
            else:
                phase = '下跌后反弹'
        else:
            phase = '震荡盘整'

        return {
            'direction': direction,
            'strength': strength,
            'phase': phase,
            'adx': round(adx, 1),
            # Equal (or NaN) EMAs are reported as 'bearish' here.
            'ema_alignment': 'bullish' if ema_fast > ema_slow else 'bearish',
        }

    @staticmethod
    def find_support_resistance(df: pd.DataFrame, current_price: float) -> Dict[str, Any]:
        """
        Find support and resistance levels

        Args:
            df: DataFrame with OHLCV data
            current_price: Current market price

        Returns:
            Dict with S/R levels: ``support`` and ``resistance`` (up to three
            levels each, nearest first, rounded to 2 decimals) plus
            ``nearest_support`` / ``nearest_resistance`` (``None`` when the
            corresponding list is empty).  With fewer than
            ``config.SR_LOOKBACK`` rows only the two empty lists are returned.
        """
        if df.empty or len(df) < config.SR_LOOKBACK:
            return {'support': [], 'resistance': []}

        lookback_df = df.tail(config.SR_LOOKBACK)

        # Find local highs (resistance) and lows (support)
        highs = MarketStructureAnalyzer._find_local_extrema(lookback_df['high'], 'high')
        lows = MarketStructureAnalyzer._find_local_extrema(lookback_df['low'], 'low')

        # Cluster similar levels
        support_levels = MarketStructureAnalyzer._cluster_levels(lows, current_price)
        resistance_levels = MarketStructureAnalyzer._cluster_levels(highs, current_price)

        # Filter to levels strictly within 5% below / above the current price
        support = [s for s in support_levels if current_price * 0.95 < s < current_price]
        resistance = [r for r in resistance_levels if current_price < r < current_price * 1.05]

        # Keep the three levels closest to the price on each side
        support = sorted(support, reverse=True)[:3]
        resistance = sorted(resistance)[:3]

        return {
            'support': [round(s, 2) for s in support],
            'resistance': [round(r, 2) for r in resistance],
            'nearest_support': round(support[0], 2) if support else None,
            'nearest_resistance': round(resistance[0], 2) if resistance else None,
        }

    @staticmethod
    def _find_local_extrema(series: pd.Series, kind: str) -> List[float]:
        """Find local highs or lows.

        A point qualifies when it is strictly greater (``kind == 'high'``)
        or strictly smaller (any other ``kind``) than the two values on each
        side of it.  The first and last two points are never candidates.

        Returns:
            The qualifying values in their original order.
        """
        values = series.to_numpy()
        if len(values) < 5:
            # Not enough points for a 2-left / 2-right neighbourhood.
            return []

        compare = np.greater if kind == 'high' else np.less
        center = values[2:-2]
        # Each slice lines one neighbour offset (-2, -1, +1, +2) up with
        # `center`, so the whole scan is four vectorised comparisons.
        is_extremum = (
            compare(center, values[:-4])
            & compare(center, values[1:-3])
            & compare(center, values[3:-1])
            & compare(center, values[4:])
        )
        return center[is_extremum].tolist()

    @staticmethod
    def _cluster_levels(levels: List[float], reference_price: float) -> List[float]:
        """Cluster similar price levels.

        Levels are sorted and a level joins the current cluster when it is
        within ``reference_price * config.SR_TOLERANCE`` of the cluster's
        most recently added member (not of the cluster mean).

        Returns:
            The mean of each cluster, in ascending order.
        """
        if not levels:
            return []

        tolerance = reference_price * config.SR_TOLERANCE
        ordered = sorted(levels)

        clusters = [[ordered[0]]]
        for level in ordered[1:]:
            if abs(level - clusters[-1][-1]) < tolerance:
                clusters[-1].append(level)
            else:
                clusters.append([level])

        return [float(np.mean(cluster)) for cluster in clusters]

    @staticmethod
    def detect_breakout(df: pd.DataFrame, sr_levels: Dict[str, Any]) -> Dict[str, Any]:
        """
        Detect if price has broken through S/R levels

        Args:
            df: DataFrame with a ``close`` column; the last row is the
                current bar.
            sr_levels: Dict providing ``nearest_resistance`` and
                ``nearest_support`` (either may be missing or ``None``).

        Returns:
            Dict with breakout info.  On a breakout: ``has_breakout`` True
            plus ``type``, ``level`` and ``confirmation``.  When the close is
            within 0.5% of a level: ``has_breakout`` False plus
            ``approaching``, ``level`` and ``distance_pct``.  Otherwise just
            ``{'has_breakout': False}`` (also for fewer than 5 rows).
        """
        if df.empty or len(df) < 5:
            return {'has_breakout': False}

        current_price = df.iloc[-1]['close']

        # Truthiness (not `is not None`) is intentional: it also skips a
        # level of 0, which would otherwise be used as a divisor below.
        resistance = sr_levels.get('nearest_resistance')
        support = sr_levels.get('nearest_support')

        # Check resistance breakout (last close above resistance)
        if resistance and current_price > resistance:
            return {
                'has_breakout': True,
                'type': 'resistance_breakout',
                'level': resistance,
                'confirmation': '价格突破压力位',
            }

        # Check support breakdown (last close below support)
        if support and current_price < support:
            return {
                'has_breakout': True,
                'type': 'support_breakdown',
                'level': support,
                'confirmation': '价格跌破支撑位',
            }

        # Check if approaching key level (within 0.5%)
        if resistance and abs(current_price - resistance) / resistance < 0.005:
            return {
                'has_breakout': False,
                'approaching': 'resistance',
                'level': resistance,
                'distance_pct': round((resistance - current_price) / current_price * 100, 2),
            }

        if support and abs(current_price - support) / support < 0.005:
            return {
                'has_breakout': False,
                'approaching': 'support',
                'level': support,
                'distance_pct': round((current_price - support) / current_price * 100, 2),
            }

        return {'has_breakout': False}

    @staticmethod
    def calculate_momentum(df: pd.DataFrame) -> Dict[str, Any]:
        """
        Calculate momentum indicators

        Args:
            df: DataFrame with ``rsi`` and ``macd_hist`` columns.

        Returns:
            Dict with momentum analysis: ``rsi`` (1 decimal), ``rsi_status``,
            ``rsi_trend``, ``macd_signal`` and ``macd_hist`` (4 decimals).
            An empty dict is returned for an empty DataFrame.
        """
        if df.empty:
            return {}

        latest = df.iloc[-1]
        prev = df.iloc[-2] if len(df) > 1 else latest

        # A NaN RSI is treated like a missing column (default 50) so that it
        # is not classified as oversold by falling through the ladder.
        rsi = MarketStructureAnalyzer._safe_value(latest, 'rsi', 50)
        # MACD values are used as-is: defaulting a NaN previous histogram
        # to 0 would fabricate a fresh crossover on the first valid bar.
        macd_hist = latest.get('macd_hist', 0)
        prev_macd_hist = prev.get('macd_hist', 0)

        # RSI status
        if rsi > 70:
            rsi_status = '超买'
        elif rsi > 60:
            rsi_status = '强势'
        elif rsi > 50:
            rsi_status = '中性偏强'
        elif rsi > 40:
            rsi_status = '中性偏弱'
        elif rsi > 30:
            rsi_status = '弱势'
        else:
            rsi_status = '超卖'

        # MACD signal: a sign change of the histogram is a fresh cross,
        # otherwise report whether the histogram is widening or narrowing.
        if macd_hist > 0 and prev_macd_hist <= 0:
            macd_signal = '金叉'
        elif macd_hist < 0 and prev_macd_hist >= 0:
            macd_signal = '死叉'
        elif macd_hist > 0:
            if macd_hist > prev_macd_hist:
                macd_signal = '金叉扩大'
            else:
                macd_signal = '金叉收窄'
        else:
            if abs(macd_hist) > abs(prev_macd_hist):
                macd_signal = '死叉扩大'
            else:
                macd_signal = '死叉收窄'

        # RSI trend: compare with the bar five positions from the end.
        # Reported as neutral when history is too short or that bar's RSI
        # is not available yet.
        rsi_trend = '中性'
        if len(df) >= 5:
            earlier_rsi = df.iloc[-5].get('rsi', 50)
            if not pd.isna(earlier_rsi):
                rsi_trend = '上升中' if rsi > earlier_rsi else '下降中'

        return {
            'rsi': round(rsi, 1),
            'rsi_status': rsi_status,
            'rsi_trend': rsi_trend,
            'macd_signal': macd_signal,
            'macd_hist': round(macd_hist, 4),
        }