""" Multi-Timeframe Trend Analyzer - EMA alignment based trend detection Provides structured trend data for LLM decision making """ import logging from typing import Dict, Any, List, Optional import pandas as pd import numpy as np from .indicators import TechnicalIndicators from .config import config logger = logging.getLogger(__name__) class TrendAnalyzer: """ Multi-timeframe trend analyzer using EMA alignment Analyzes trend direction, strength, and alignment across multiple timeframes """ # EMA periods for trend analysis EMA_PERIODS = [7, 20, 50, 100, 200] # Timeframe hierarchy (shorter to longer) TIMEFRAME_ORDER = ['5m', '15m', '1h', '4h', '1d', '1w'] @staticmethod def analyze_trend(df: pd.DataFrame, timeframe: str = '5m') -> Dict[str, Any]: """ Analyze trend for a single timeframe using EMA alignment Args: df: DataFrame with OHLCV data timeframe: Timeframe being analyzed Returns: Trend analysis dict for LLM consumption """ if df.empty or len(df) < 200: return TrendAnalyzer._empty_trend_result(timeframe) # Ensure indicators are calculated if 'ema_7' not in df.columns: df = TechnicalIndicators.add_all_indicators(df) latest = df.iloc[-1] current_price = float(latest['close']) # Get EMA values ema_values = {} for period in TrendAnalyzer.EMA_PERIODS: col_name = f'ema_{period}' if col_name in df.columns: ema_values[period] = float(latest[col_name]) else: # Calculate if missing ema_values[period] = float(df['close'].ewm(span=period, adjust=False).mean().iloc[-1]) # Determine EMA alignment alignment = TrendAnalyzer._analyze_ema_alignment(ema_values, current_price) # Calculate trend strength using ADX adx = float(latest.get('adx', 0)) trend_strength = TrendAnalyzer._calculate_trend_strength(adx, alignment) # Determine trend direction trend_direction = TrendAnalyzer._determine_trend_direction(alignment, ema_values) # Calculate trend momentum (slope of EMA20) ema_20_slope = TrendAnalyzer._calculate_ema_slope(df, 'ema_20', periods=5) # Detect trend phase trend_phase = TrendAnalyzer._detect_trend_phase( df, trend_direction, ema_values, current_price ) return { 'timeframe': timeframe, 'direction': trend_direction, # 'uptrend', 'downtrend', 'sideways' 'direction_cn': TrendAnalyzer._direction_to_chinese(trend_direction), 'strength': trend_strength, # 'strong', 'moderate', 'weak' 'strength_cn': TrendAnalyzer._strength_to_chinese(trend_strength), 'phase': trend_phase, # 'trending', 'pullback', 'reversal', 'consolidation' 'phase_cn': TrendAnalyzer._phase_to_chinese(trend_phase, trend_direction), 'ema_alignment': alignment, # 'bullish_aligned', 'bearish_aligned', 'mixed' 'ema_values': { 'ema_7': round(ema_values.get(7, 0), 2), 'ema_20': round(ema_values.get(20, 0), 2), 'ema_50': round(ema_values.get(50, 0), 2), 'ema_100': round(ema_values.get(100, 0), 2), 'ema_200': round(ema_values.get(200, 0), 2), }, 'price_vs_emas': { 'above_ema_7': current_price > ema_values.get(7, current_price), 'above_ema_20': current_price > ema_values.get(20, current_price), 'above_ema_50': current_price > ema_values.get(50, current_price), 'above_ema_100': current_price > ema_values.get(100, current_price), 'above_ema_200': current_price > ema_values.get(200, current_price), }, 'ema_20_slope': round(ema_20_slope, 4), 'adx': round(adx, 1), 'current_price': round(current_price, 2), } @staticmethod def analyze_multi_timeframe_trend( mtf_data: Dict[str, pd.DataFrame] ) -> Dict[str, Any]: """ Analyze trend across multiple timeframes and determine alignment Args: mtf_data: Dict mapping timeframe to DataFrame Returns: Multi-timeframe trend analysis for LLM """ results = {} trend_directions = {} # Analyze each timeframe for tf in TrendAnalyzer.TIMEFRAME_ORDER: if tf in mtf_data and not mtf_data[tf].empty: trend_info = TrendAnalyzer.analyze_trend(mtf_data[tf], tf) results[tf] = trend_info trend_directions[tf] = trend_info['direction'] # Calculate cross-timeframe alignment alignment_analysis = TrendAnalyzer._analyze_cross_timeframe_alignment( trend_directions ) # Determine dominant trend dominant_trend = TrendAnalyzer._determine_dominant_trend(trend_directions) # Generate LLM-readable summary summary = TrendAnalyzer._generate_trend_summary( results, alignment_analysis, dominant_trend ) return { 'timeframes': results, 'alignment': alignment_analysis, 'dominant_trend': dominant_trend, 'summary': summary, 'trading_bias': TrendAnalyzer._determine_trading_bias( alignment_analysis, dominant_trend ), } @staticmethod def _analyze_ema_alignment( ema_values: Dict[int, float], current_price: float ) -> str: """ Analyze EMA alignment pattern Perfect bullish: Price > EMA7 > EMA20 > EMA50 > EMA100 > EMA200 Perfect bearish: Price < EMA7 < EMA20 < EMA50 < EMA100 < EMA200 """ ema_7 = ema_values.get(7, current_price) ema_20 = ema_values.get(20, current_price) ema_50 = ema_values.get(50, current_price) ema_100 = ema_values.get(100, current_price) ema_200 = ema_values.get(200, current_price) # Check bullish alignment bullish_count = 0 if current_price > ema_7: bullish_count += 1 if ema_7 > ema_20: bullish_count += 1 if ema_20 > ema_50: bullish_count += 1 if ema_50 > ema_100: bullish_count += 1 if ema_100 > ema_200: bullish_count += 1 # Check bearish alignment bearish_count = 0 if current_price < ema_7: bearish_count += 1 if ema_7 < ema_20: bearish_count += 1 if ema_20 < ema_50: bearish_count += 1 if ema_50 < ema_100: bearish_count += 1 if ema_100 < ema_200: bearish_count += 1 if bullish_count >= 4: return 'bullish_aligned' elif bearish_count >= 4: return 'bearish_aligned' else: return 'mixed' @staticmethod def _calculate_trend_strength(adx: float, alignment: str) -> str: """Calculate trend strength based on ADX and alignment""" # ADX thresholds if adx > 40: base_strength = 'very_strong' elif adx > 25: base_strength = 'strong' elif adx > 20: base_strength = 'moderate' else: base_strength = 'weak' # Downgrade if alignment is mixed if alignment == 'mixed': if base_strength == 'very_strong': return 'strong' elif base_strength == 'strong': return 'moderate' else: return 'weak' return base_strength @staticmethod def _determine_trend_direction( alignment: str, ema_values: Dict[int, float] ) -> str: """Determine trend direction from EMA alignment""" if alignment == 'bullish_aligned': return 'uptrend' elif alignment == 'bearish_aligned': return 'downtrend' else: # Check shorter-term EMAs for mixed scenarios ema_20 = ema_values.get(20, 0) ema_50 = ema_values.get(50, 0) ema_100 = ema_values.get(100, 0) if ema_20 > ema_50 > ema_100: return 'uptrend' elif ema_20 < ema_50 < ema_100: return 'downtrend' else: return 'sideways' @staticmethod def _detect_trend_phase( df: pd.DataFrame, direction: str, ema_values: Dict[int, float], current_price: float ) -> str: """ Detect current trend phase Phases: - trending: Strong directional movement - pullback: Temporary counter-trend move within main trend - reversal: Potential trend change in progress - consolidation: Range-bound, no clear direction """ ema_20 = ema_values.get(20, current_price) ema_50 = ema_values.get(50, current_price) # Get recent price action if len(df) < 10: return 'unknown' recent_closes = df['close'].tail(10).values price_5_ago = recent_closes[-5] if direction == 'uptrend': # In uptrend, check if pulling back to EMA if current_price < ema_20 and current_price > ema_50: return 'pullback' elif current_price < ema_50: return 'reversal' elif current_price > price_5_ago * 1.01: return 'trending' else: return 'consolidation' elif direction == 'downtrend': # In downtrend, check if bouncing to EMA if current_price > ema_20 and current_price < ema_50: return 'pullback' elif current_price > ema_50: return 'reversal' elif current_price < price_5_ago * 0.99: return 'trending' else: return 'consolidation' else: return 'consolidation' @staticmethod def _calculate_ema_slope( df: pd.DataFrame, ema_col: str, periods: int = 5 ) -> float: """Calculate the slope of an EMA over N periods""" if ema_col not in df.columns or len(df) < periods: return 0.0 ema_values = df[ema_col].tail(periods).values current = ema_values[-1] previous = ema_values[0] if previous == 0: return 0.0 # Return percentage change per period return (current - previous) / previous / periods * 100 @staticmethod def _analyze_cross_timeframe_alignment( trend_directions: Dict[str, str] ) -> Dict[str, Any]: """Analyze alignment across timeframes""" uptrend_count = sum(1 for d in trend_directions.values() if d == 'uptrend') downtrend_count = sum(1 for d in trend_directions.values() if d == 'downtrend') sideways_count = sum(1 for d in trend_directions.values() if d == 'sideways') total = len(trend_directions) if total == 0: return { 'status': 'insufficient_data', 'status_cn': '数据不足', 'alignment_score': 0.0, } # Calculate alignment score (0 to 1) max_aligned = max(uptrend_count, downtrend_count) alignment_score = max_aligned / total if total > 0 else 0 # Determine status if uptrend_count >= total * 0.8: status = 'strongly_bullish' status_cn = '强烈看涨一致' elif uptrend_count >= total * 0.6: status = 'bullish' status_cn = '看涨一致' elif downtrend_count >= total * 0.8: status = 'strongly_bearish' status_cn = '强烈看跌一致' elif downtrend_count >= total * 0.6: status = 'bearish' status_cn = '看跌一致' elif sideways_count >= total * 0.5: status = 'ranging' status_cn = '震荡整理' else: status = 'mixed' status_cn = '方向分歧' return { 'status': status, 'status_cn': status_cn, 'alignment_score': round(alignment_score, 2), 'uptrend_count': uptrend_count, 'downtrend_count': downtrend_count, 'sideways_count': sideways_count, 'total_timeframes': total, } @staticmethod def _determine_dominant_trend(trend_directions: Dict[str, str]) -> Dict[str, Any]: """ Determine the dominant trend considering timeframe weight Longer timeframes have more weight """ # Weights for each timeframe (longer = more weight) weights = { '5m': 0.05, '15m': 0.10, '1h': 0.15, '4h': 0.20, '1d': 0.25, '1w': 0.25, } bullish_score = 0.0 bearish_score = 0.0 total_weight = 0.0 for tf, direction in trend_directions.items(): weight = weights.get(tf, 0.1) total_weight += weight if direction == 'uptrend': bullish_score += weight elif direction == 'downtrend': bearish_score += weight if total_weight == 0: return { 'direction': 'unknown', 'direction_cn': '未知', 'confidence': 0.0, } # Normalize scores bullish_pct = bullish_score / total_weight bearish_pct = bearish_score / total_weight if bullish_pct > 0.6: direction = 'uptrend' direction_cn = '上涨' confidence = bullish_pct elif bearish_pct > 0.6: direction = 'downtrend' direction_cn = '下跌' confidence = bearish_pct else: direction = 'sideways' direction_cn = '震荡' confidence = max(1.0 - bullish_pct - bearish_pct, 0.0) return { 'direction': direction, 'direction_cn': direction_cn, 'confidence': round(confidence, 2), 'bullish_score': round(bullish_pct, 2), 'bearish_score': round(bearish_pct, 2), } @staticmethod def _determine_trading_bias( alignment: Dict[str, Any], dominant: Dict[str, Any] ) -> Dict[str, Any]: """Determine trading bias based on trend analysis""" status = alignment.get('status', 'mixed') direction = dominant.get('direction', 'sideways') confidence = dominant.get('confidence', 0.0) if status in ['strongly_bullish', 'bullish'] and confidence > 0.6: bias = 'long' bias_cn = '偏多' strength = 'strong' if status == 'strongly_bullish' else 'moderate' elif status in ['strongly_bearish', 'bearish'] and confidence > 0.6: bias = 'short' bias_cn = '偏空' strength = 'strong' if status == 'strongly_bearish' else 'moderate' else: bias = 'neutral' bias_cn = '中性' strength = 'weak' return { 'bias': bias, 'bias_cn': bias_cn, 'strength': strength, 'strength_cn': TrendAnalyzer._strength_to_chinese(strength), 'recommendation': TrendAnalyzer._generate_bias_recommendation( bias, strength, direction ), } @staticmethod def _generate_trend_summary( results: Dict[str, Dict], alignment: Dict[str, Any], dominant: Dict[str, Any] ) -> str: """Generate human-readable trend summary for LLM""" parts = [] # Dominant trend direction_cn = dominant.get('direction_cn', '未知') confidence = dominant.get('confidence', 0) * 100 parts.append(f"主导趋势: {direction_cn} (置信度{confidence:.0f}%)") # Alignment status_cn = alignment.get('status_cn', '未知') parts.append(f"多周期一致性: {status_cn}") # Key timeframes key_tfs = ['1h', '4h', '1d'] tf_summary = [] for tf in key_tfs: if tf in results: tf_info = results[tf] tf_summary.append( f"{tf}={tf_info['direction_cn']}({tf_info['strength_cn']})" ) if tf_summary: parts.append(f"关键周期: {', '.join(tf_summary)}") return "; ".join(parts) @staticmethod def _generate_bias_recommendation( bias: str, strength: str, direction: str ) -> str: """Generate trading recommendation based on bias""" if bias == 'long': if strength == 'strong': return "强势看涨,可积极寻找做多机会,回调可加仓" else: return "偏多格局,可逢低做多,注意控制仓位" elif bias == 'short': if strength == 'strong': return "强势看跌,可积极寻找做空机会,反弹可加仓" else: return "偏空格局,可逢高做空,注意控制仓位" else: return "方向不明,建议观望或轻仓试探" @staticmethod def _direction_to_chinese(direction: str) -> str: """Convert direction to Chinese""" mapping = { 'uptrend': '上涨', 'downtrend': '下跌', 'sideways': '震荡', 'unknown': '未知', } return mapping.get(direction, '未知') @staticmethod def _strength_to_chinese(strength: str) -> str: """Convert strength to Chinese""" mapping = { 'very_strong': '非常强', 'strong': '强', 'moderate': '中等', 'weak': '弱', } return mapping.get(strength, '弱') @staticmethod def _phase_to_chinese(phase: str, direction: str) -> str: """Convert phase to Chinese with context""" if phase == 'trending': return '趋势进行中' if direction != 'sideways' else '震荡延续' elif phase == 'pullback': if direction == 'uptrend': return '上涨趋势回调' elif direction == 'downtrend': return '下跌趋势反弹' else: return '震荡回调' elif phase == 'reversal': if direction == 'uptrend': return '上涨趋势可能反转' elif direction == 'downtrend': return '下跌趋势可能反转' else: return '可能突破震荡' elif phase == 'consolidation': return '盘整蓄力' else: return '未知' @staticmethod def _empty_trend_result(timeframe: str) -> Dict[str, Any]: """Return empty trend result when data is insufficient""" return { 'timeframe': timeframe, 'direction': 'unknown', 'direction_cn': '数据不足', 'strength': 'weak', 'strength_cn': '弱', 'phase': 'unknown', 'phase_cn': '未知', 'ema_alignment': 'unknown', 'ema_values': {}, 'price_vs_emas': {}, 'ema_20_slope': 0.0, 'adx': 0.0, 'current_price': 0.0, }