""" Fibonacci Support/Resistance Calculator Calculates key price levels using Fibonacci retracement and extension """ import logging from typing import Dict, Any, List, Tuple, Optional import pandas as pd import numpy as np from .config import config logger = logging.getLogger(__name__) class FibonacciAnalyzer: """ Calculates support and resistance levels using: 1. Fibonacci retracement levels 2. Fibonacci extension levels 3. Pivot points (High/Low) 4. Price clustering zones """ # Standard Fibonacci ratios FIB_RETRACEMENT = [0.0, 0.236, 0.382, 0.5, 0.618, 0.786, 1.0] FIB_EXTENSION = [1.0, 1.272, 1.414, 1.618, 2.0, 2.618] # Key levels for trading KEY_FIB_LEVELS = [0.382, 0.5, 0.618] # Most important levels @staticmethod def calculate_fibonacci_levels( df: pd.DataFrame, lookback_periods: int = 100, timeframe: str = '1h' ) -> Dict[str, Any]: """ Calculate Fibonacci retracement and extension levels Args: df: DataFrame with OHLCV data lookback_periods: Number of periods to find swing high/low timeframe: Timeframe for context Returns: Dict with Fibonacci levels and analysis """ if df.empty or len(df) < lookback_periods: return FibonacciAnalyzer._empty_result(timeframe) lookback_df = df.tail(lookback_periods) current_price = float(df.iloc[-1]['close']) # Find significant swing high and low swing_high, swing_high_idx = FibonacciAnalyzer._find_swing_high(lookback_df) swing_low, swing_low_idx = FibonacciAnalyzer._find_swing_low(lookback_df) if swing_high is None or swing_low is None: return FibonacciAnalyzer._empty_result(timeframe) # Determine trend direction based on which came first is_uptrend = swing_low_idx < swing_high_idx # Calculate retracement levels if is_uptrend: # In uptrend, retracement levels from high to low retracement_levels = FibonacciAnalyzer._calculate_retracement( swing_high, swing_low, 'uptrend' ) extension_levels = FibonacciAnalyzer._calculate_extension( swing_low, swing_high, 'uptrend' ) else: # In downtrend, retracement levels from low to high retracement_levels = FibonacciAnalyzer._calculate_retracement( swing_low, swing_high, 'downtrend' ) extension_levels = FibonacciAnalyzer._calculate_extension( swing_high, swing_low, 'downtrend' ) # Identify key support/resistance from Fibonacci supports, resistances = FibonacciAnalyzer._identify_sr_from_fib( retracement_levels, extension_levels, current_price ) # Add pivot-based support/resistance pivot_levels = FibonacciAnalyzer._calculate_pivot_points(lookback_df) # Merge and cluster all levels all_supports = FibonacciAnalyzer._merge_and_cluster_levels( supports + pivot_levels.get('supports', []), current_price ) all_resistances = FibonacciAnalyzer._merge_and_cluster_levels( resistances + pivot_levels.get('resistances', []), current_price ) # Find nearest levels nearest_support = FibonacciAnalyzer._find_nearest_below(all_supports, current_price) nearest_resistance = FibonacciAnalyzer._find_nearest_above(all_resistances, current_price) # Calculate key zones key_zones = FibonacciAnalyzer._identify_key_zones( retracement_levels, current_price, is_uptrend ) return { 'timeframe': timeframe, 'current_price': round(current_price, 2), 'trend_context': 'uptrend' if is_uptrend else 'downtrend', 'swing_high': round(swing_high, 2), 'swing_low': round(swing_low, 2), 'swing_range': round(swing_high - swing_low, 2), 'swing_range_pct': round((swing_high - swing_low) / swing_low * 100, 2), # Fibonacci levels 'fibonacci': { 'retracement': { str(ratio): round(level, 2) for ratio, level in retracement_levels.items() }, 'extension': { str(ratio): 
    @staticmethod
    def analyze_multi_timeframe_levels(
        mtf_data: Dict[str, pd.DataFrame]
    ) -> Dict[str, Any]:
        """
        Analyze support/resistance across multiple timeframes

        Args:
            mtf_data: Dict mapping timeframe to DataFrame

        Returns:
            Consolidated support/resistance analysis
        """
        results = {}
        all_supports = []
        all_resistances = []
        current_price = None

        # Analyze each timeframe
        for tf, df in mtf_data.items():
            if df.empty:
                continue

            # Adjust lookback based on timeframe
            lookback = FibonacciAnalyzer._get_lookback_for_timeframe(tf)

            tf_result = FibonacciAnalyzer.calculate_fibonacci_levels(
                df, lookback_periods=lookback, timeframe=tf
            )
            results[tf] = tf_result

            # Take the price from the first timeframe that produced a result
            # (empty results report 0 and are skipped)
            if not current_price:
                current_price = tf_result.get('current_price', 0)

            # Collect levels with timeframe weight
            weight = FibonacciAnalyzer._get_timeframe_weight(tf)
            for s in tf_result.get('supports', []):
                all_supports.append((s, weight, tf))
            for r in tf_result.get('resistances', []):
                all_resistances.append((r, weight, tf))

        if not current_price:
            return {'error': 'No data available'}

        # Find confluence zones (levels appearing in multiple timeframes)
        support_confluence = FibonacciAnalyzer._find_confluence_zones(
            all_supports, current_price, direction='below'
        )
        resistance_confluence = FibonacciAnalyzer._find_confluence_zones(
            all_resistances, current_price, direction='above'
        )

        # Generate LLM-readable summary
        summary = FibonacciAnalyzer._generate_sr_summary(
            support_confluence, resistance_confluence, current_price
        )

        return {
            'timeframes': results,
            'confluence': {
                'supports': support_confluence[:5],
                'resistances': resistance_confluence[:5],
            },
            'strongest_support': support_confluence[0] if support_confluence else None,
            'strongest_resistance': resistance_confluence[0] if resistance_confluence else None,
            'current_price': round(current_price, 2),
            'summary': summary,
        }

    @staticmethod
    def _find_swing_high(df: pd.DataFrame) -> Tuple[Optional[float], Optional[int]]:
        """Find the most significant swing high in the data"""
        highs = df['high'].values

        if len(highs) < 5:
            return None, None

        # Find local maxima
        swing_highs = []
        for i in range(2, len(highs) - 2):
            if (highs[i] > highs[i-1] and highs[i] > highs[i-2] and
                    highs[i] > highs[i+1] and highs[i] > highs[i+2]):
                swing_highs.append((highs[i], i))

        if not swing_highs:
            # Fall back to absolute max
            max_idx = np.argmax(highs)
            return float(highs[max_idx]), int(max_idx)

        # Return the highest swing
        swing_highs.sort(key=lambda x: x[0], reverse=True)
        return float(swing_highs[0][0]), int(swing_highs[0][1])

    @staticmethod
    def _find_swing_low(df: pd.DataFrame) -> Tuple[Optional[float], Optional[int]]:
        """Find the most significant swing low in the data"""
        lows = df['low'].values

        if len(lows) < 5:
            return None, None

        # Find local minima
        swing_lows = []
        for i in range(2, len(lows) - 2):
            if (lows[i] < lows[i-1] and lows[i] < lows[i-2] and
                    lows[i] < lows[i+1] and lows[i] < lows[i+2]):
                swing_lows.append((lows[i], i))

        if not swing_lows:
            # Fall back to absolute min
            min_idx = np.argmin(lows)
            return float(lows[min_idx]), int(min_idx)

        # Return the lowest swing
        swing_lows.sort(key=lambda x: x[0])
        return float(swing_lows[0][0]), int(swing_lows[0][1])

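    # Note on the swing detectors above: a bar counts as a swing point only when its
    # high/low strictly exceeds the two bars on either side (a 5-bar window), so
    # flat tops/bottoms with equal extremes do not qualify; in that case the code
    # falls back to the absolute max/min of the lookback window.
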
    @staticmethod
    def _calculate_retracement(
        high: float,
        low: float,
        trend: str
    ) -> Dict[float, float]:
        """Calculate Fibonacci retracement levels"""
        diff = high - low
        levels = {}

        for ratio in FibonacciAnalyzer.FIB_RETRACEMENT:
            if trend == 'uptrend':
                # In uptrend, retracement down from high
                levels[ratio] = high - (diff * ratio)
            else:
                # In downtrend, retracement up from low
                levels[ratio] = low + (diff * ratio)

        return levels

    @staticmethod
    def _calculate_extension(
        start: float,
        end: float,
        trend: str
    ) -> Dict[float, float]:
        """Calculate Fibonacci extension levels"""
        diff = abs(end - start)
        levels = {}

        for ratio in FibonacciAnalyzer.FIB_EXTENSION:
            if trend == 'uptrend':
                # Extension above the high
                levels[ratio] = end + (diff * (ratio - 1))
            else:
                # Extension below the low
                levels[ratio] = end - (diff * (ratio - 1))

        return levels

    @staticmethod
    def _identify_sr_from_fib(
        retracement: Dict[float, float],
        extension: Dict[float, float],
        current_price: float
    ) -> Tuple[List[float], List[float]]:
        """Identify support and resistance from Fibonacci levels"""
        supports = []
        resistances = []

        # From retracement levels
        for ratio, level in retracement.items():
            if level < current_price:
                supports.append(level)
            elif level > current_price:
                resistances.append(level)

        # From extension levels
        for ratio, level in extension.items():
            if level < current_price:
                supports.append(level)
            elif level > current_price:
                resistances.append(level)

        # Sort
        supports = sorted(set(supports), reverse=True)
        resistances = sorted(set(resistances))

        return supports, resistances

    @staticmethod
    def _calculate_pivot_points(df: pd.DataFrame) -> Dict[str, Any]:
        """Calculate pivot points (Classic formula)"""
        if len(df) < 2:
            return {'pivot': None, 'supports': [], 'resistances': []}

        # Use previous period's data
        prev = df.iloc[-2]
        high = float(prev['high'])
        low = float(prev['low'])
        close = float(prev['close'])

        # Classic pivot point
        pivot = (high + low + close) / 3

        # Support levels
        s1 = 2 * pivot - high
        s2 = pivot - (high - low)
        s3 = low - 2 * (high - pivot)

        # Resistance levels
        r1 = 2 * pivot - low
        r2 = pivot + (high - low)
        r3 = high + 2 * (pivot - low)

        return {
            'pivot': round(pivot, 2),
            'supports': [round(s1, 2), round(s2, 2), round(s3, 2)],
            'resistances': [round(r1, 2), round(r2, 2), round(r3, 2)],
        }

    @staticmethod
    def _merge_and_cluster_levels(
        levels: List[float],
        reference: float,
        tolerance_pct: float = 0.005
    ) -> List[float]:
        """Merge similar levels within tolerance"""
        if not levels:
            return []

        tolerance = reference * tolerance_pct
        sorted_levels = sorted(levels)

        clustered = []
        current_cluster = [sorted_levels[0]]

        for level in sorted_levels[1:]:
            if abs(level - current_cluster[-1]) < tolerance:
                current_cluster.append(level)
            else:
                # Average the cluster
                clustered.append(np.mean(current_cluster))
                current_cluster = [level]

        # Add last cluster
        if current_cluster:
            clustered.append(np.mean(current_cluster))

        return clustered

    @staticmethod
    def _find_nearest_below(levels: List[float], price: float) -> Optional[float]:
        """Find nearest level below current price"""
        below = [l for l in levels if l < price]
        if below:
            return max(below)
        return None

    @staticmethod
    def _find_nearest_above(levels: List[float], price: float) -> Optional[float]:
        """Find nearest level above current price"""
        above = [l for l in levels if l > price]
        if above:
            return min(above)
        return None

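    # Worked pivot-point example for _calculate_pivot_points above (hypothetical
    # numbers): with the previous bar at high = 110, low = 90, close = 105, the
    # classic pivot is (110 + 90 + 105) / 3 ≈ 101.67, giving
    # S1 = 2 * 101.67 - 110 ≈ 93.33 and R1 = 2 * 101.67 - 90 ≈ 113.33, while
    # S2/R2 sit a full 20-point range below/above the pivot.
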
    @staticmethod
    def _calculate_distance(current: float, target: Optional[float]) -> Optional[Dict[str, Any]]:
        """Calculate distance to target level"""
        if target is None or current == 0:
            return None

        diff = target - current
        pct = (diff / current) * 100

        return {
            'absolute': round(abs(diff), 2),
            'percentage': round(pct, 2),
            'direction': 'above' if diff > 0 else 'below',
        }

    @staticmethod
    def _identify_key_zones(
        retracement: Dict[float, float],
        current_price: float,
        is_uptrend: bool
    ) -> List[Dict[str, Any]]:
        """Identify key trading zones from Fibonacci levels"""
        zones = []

        # Key Fibonacci levels with descriptions (Chinese labels for the LLM output)
        key_levels = {
            0.382: '浅回调区 (38.2%)',    # shallow retracement zone
            0.5: '中度回调区 (50%)',      # moderate retracement zone
            0.618: '黄金分割区 (61.8%)',  # golden-ratio zone
        }

        for ratio, description in key_levels.items():
            if ratio in retracement:
                level = retracement[ratio]
                distance_pct = ((level - current_price) / current_price) * 100

                zone = {
                    'ratio': ratio,
                    'level': round(level, 2),
                    'description': description,
                    'distance_pct': round(distance_pct, 2),
                    'type': 'support' if level < current_price else 'resistance',
                }

                # Importance rating
                if ratio == 0.618:
                    zone['importance'] = 'high'
                    zone['importance_cn'] = '重要'    # "important"
                elif ratio == 0.5:
                    zone['importance'] = 'medium'
                    zone['importance_cn'] = '中等'    # "medium"
                else:
                    zone['importance'] = 'low'
                    zone['importance_cn'] = '一般'    # "ordinary"

                zones.append(zone)

        return zones

    @staticmethod
    def _find_confluence_zones(
        levels_with_weight: List[Tuple[float, float, str]],
        current_price: float,
        direction: str = 'below'
    ) -> List[Dict[str, Any]]:
        """
        Find confluence zones where multiple timeframes have similar levels

        Args:
            levels_with_weight: List of (level, weight, timeframe)
            current_price: Current price
            direction: 'below' for supports, 'above' for resistances

        Returns:
            List of confluence zones sorted by strength
        """
        if not levels_with_weight:
            return []

        # Filter by direction
        if direction == 'below':
            filtered = [(l, w, tf) for l, w, tf in levels_with_weight if l < current_price]
        else:
            filtered = [(l, w, tf) for l, w, tf in levels_with_weight if l > current_price]

        if not filtered:
            return []

        # Cluster levels with tolerance
        tolerance = current_price * 0.005  # 0.5%
        sorted_levels = sorted(filtered, key=lambda x: x[0])

        clusters = []
        current_cluster = [sorted_levels[0]]

        for level_info in sorted_levels[1:]:
            if abs(level_info[0] - current_cluster[-1][0]) < tolerance:
                current_cluster.append(level_info)
            else:
                clusters.append(current_cluster)
                current_cluster = [level_info]

        if current_cluster:
            clusters.append(current_cluster)

        # Calculate confluence strength for each cluster
        confluence_zones = []
        for cluster in clusters:
            avg_level = np.mean([l[0] for l in cluster])
            total_weight = sum(l[1] for l in cluster)
            timeframes = list(set(l[2] for l in cluster))
            num_timeframes = len(timeframes)

            # Confluence score based on weight and number of timeframes
            confluence_score = total_weight * (1 + 0.2 * num_timeframes)

            distance_pct = ((avg_level - current_price) / current_price) * 100

            confluence_zones.append({
                'level': round(avg_level, 2),
                'confluence_score': round(confluence_score, 2),
                'num_timeframes': num_timeframes,
                'timeframes': timeframes,
                'distance_pct': round(distance_pct, 2),
                'strength': FibonacciAnalyzer._score_to_strength(confluence_score),
                'strength_cn': FibonacciAnalyzer._score_to_strength_cn(confluence_score),
            })

        # Sort by confluence score (higher = stronger)
        confluence_zones.sort(key=lambda x: x['confluence_score'], reverse=True)

        return confluence_zones

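    # Confluence-score example (hypothetical numbers): a support level seen on both
    # the 4h chart (weight 0.20) and the 1d chart (weight 0.25) within 0.5% of each
    # other scores (0.20 + 0.25) * (1 + 0.2 * 2) = 0.63, which _score_to_strength
    # below maps to 'strong'.
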
    @staticmethod
    def _score_to_strength(score: float) -> str:
        """Convert confluence score to strength label"""
        if score >= 0.8:
            return 'very_strong'
        elif score >= 0.5:
            return 'strong'
        elif score >= 0.3:
            return 'moderate'
        else:
            return 'weak'

    @staticmethod
    def _score_to_strength_cn(score: float) -> str:
        """Convert confluence score to Chinese strength label"""
        if score >= 0.8:
            return '非常强'   # very strong
        elif score >= 0.5:
            return '强'       # strong
        elif score >= 0.3:
            return '中等'     # moderate
        else:
            return '弱'       # weak

    @staticmethod
    def _get_lookback_for_timeframe(tf: str) -> int:
        """Get appropriate lookback periods for each timeframe"""
        lookbacks = {
            '5m': 200,
            '15m': 150,
            '1h': 100,
            '4h': 80,
            '1d': 60,
            '1w': 40,
        }
        return lookbacks.get(tf, 100)

    @staticmethod
    def _get_timeframe_weight(tf: str) -> float:
        """Get weight for each timeframe (longer = more weight)"""
        weights = {
            '5m': 0.05,
            '15m': 0.10,
            '1h': 0.15,
            '4h': 0.20,
            '1d': 0.25,
            '1w': 0.25,
        }
        return weights.get(tf, 0.1)

    @staticmethod
    def _generate_sr_summary(
        supports: List[Dict],
        resistances: List[Dict],
        current_price: float
    ) -> str:
        """Generate an LLM-readable (Chinese) summary of support/resistance"""
        parts = []

        # Strongest support: "Strongest support: $X (Y%, confirmed on N timeframes, strength)"
        if supports:
            s = supports[0]
            parts.append(
                f"最强支撑: ${s['level']:,.0f} ({s['distance_pct']:.1f}%, "
                f"{s['num_timeframes']}个周期确认, {s['strength_cn']})"
            )

        # Strongest resistance: "Strongest resistance: ..."
        if resistances:
            r = resistances[0]
            parts.append(
                f"最强压力: ${r['level']:,.0f} ({r['distance_pct']:.1f}%, "
                f"{r['num_timeframes']}个周期确认, {r['strength_cn']})"
            )

        # Price position
        if supports and resistances:
            s_dist = abs(supports[0]['distance_pct'])
            r_dist = abs(resistances[0]['distance_pct'])
            if s_dist < r_dist:
                parts.append("价格更接近支撑位")   # price is closer to support
            else:
                parts.append("价格更接近压力位")   # price is closer to resistance

        # Fallback: "No clear support/resistance levels"
        return "; ".join(parts) if parts else "无明确支撑压力位"

    @staticmethod
    def _empty_result(timeframe: str) -> Dict[str, Any]:
        """Return empty result when data is insufficient"""
        return {
            'timeframe': timeframe,
            'error': 'insufficient_data',
            'current_price': 0,
            'supports': [],
            'resistances': [],
            'nearest_support': None,
            'nearest_resistance': None,
            'fibonacci': {
                'retracement': {},
                'extension': {},
            },
            'key_zones': [],
        }

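
# --- Minimal usage sketch (not part of the original module) ---------------------
# The synthetic OHLCV frame below is invented purely for illustration; real callers
# would pass exchange candles. Because of the relative `.config` import above, this
# only runs when the module is executed inside its package
# (e.g. `python -m <your_package>.<this_module>`, names assumed).
if __name__ == '__main__':
    rng = np.random.default_rng(42)
    closes = 100.0 + np.cumsum(rng.normal(0.05, 0.5, 400))
    demo_df = pd.DataFrame({
        'open': closes,
        'high': closes + rng.uniform(0.1, 1.0, 400),
        'low': closes - rng.uniform(0.1, 1.0, 400),
        'close': closes,
        'volume': rng.uniform(1_000, 5_000, 400),
    })

    # Single-timeframe analysis
    single = FibonacciAnalyzer.calculate_fibonacci_levels(
        demo_df, lookback_periods=100, timeframe='1h'
    )
    print('nearest support:', single['nearest_support'])
    print('nearest resistance:', single['nearest_resistance'])

    # Multi-timeframe confluence (crude downsample just for the demo)
    mtf = FibonacciAnalyzer.analyze_multi_timeframe_levels({
        '1h': demo_df,
        '4h': demo_df.iloc[::4].reset_index(drop=True),
    })
    print(mtf['summary'])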