582 lines
19 KiB
Python
582 lines
19 KiB
Python
"""
|
|
Multi-Timeframe Trend Analyzer - EMA alignment based trend detection
|
|
Provides structured trend data for LLM decision making
|
|
"""
|
|
import logging
|
|
from typing import Dict, Any, List, Optional
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from .indicators import TechnicalIndicators
|
|
from .config import config
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TrendAnalyzer:
|
|
"""
|
|
Multi-timeframe trend analyzer using EMA alignment
|
|
|
|
Analyzes trend direction, strength, and alignment across multiple timeframes
|
|
"""
|
|
|
|
# EMA periods for trend analysis
|
|
EMA_PERIODS = [7, 20, 50, 100, 200]
|
|
|
|
# Timeframe hierarchy (shorter to longer)
|
|
TIMEFRAME_ORDER = ['5m', '15m', '1h', '4h', '1d', '1w']
|
|
|
|
@staticmethod
|
|
def analyze_trend(df: pd.DataFrame, timeframe: str = '5m') -> Dict[str, Any]:
|
|
"""
|
|
Analyze trend for a single timeframe using EMA alignment
|
|
|
|
Args:
|
|
df: DataFrame with OHLCV data
|
|
timeframe: Timeframe being analyzed
|
|
|
|
Returns:
|
|
Trend analysis dict for LLM consumption
|
|
"""
|
|
if df.empty or len(df) < 200:
|
|
return TrendAnalyzer._empty_trend_result(timeframe)
|
|
|
|
# Ensure indicators are calculated
|
|
if 'ema_7' not in df.columns:
|
|
df = TechnicalIndicators.add_all_indicators(df)
|
|
|
|
latest = df.iloc[-1]
|
|
current_price = float(latest['close'])
|
|
|
|
# Get EMA values
|
|
ema_values = {}
|
|
for period in TrendAnalyzer.EMA_PERIODS:
|
|
col_name = f'ema_{period}'
|
|
if col_name in df.columns:
|
|
ema_values[period] = float(latest[col_name])
|
|
else:
|
|
# Calculate if missing
|
|
ema_values[period] = float(df['close'].ewm(span=period, adjust=False).mean().iloc[-1])
|
|
|
|
# Determine EMA alignment
|
|
alignment = TrendAnalyzer._analyze_ema_alignment(ema_values, current_price)
|
|
|
|
# Calculate trend strength using ADX
|
|
adx = float(latest.get('adx', 0))
|
|
trend_strength = TrendAnalyzer._calculate_trend_strength(adx, alignment)
|
|
|
|
# Determine trend direction
|
|
trend_direction = TrendAnalyzer._determine_trend_direction(alignment, ema_values)
|
|
|
|
# Calculate trend momentum (slope of EMA20)
|
|
ema_20_slope = TrendAnalyzer._calculate_ema_slope(df, 'ema_20', periods=5)
|
|
|
|
# Detect trend phase
|
|
trend_phase = TrendAnalyzer._detect_trend_phase(
|
|
df, trend_direction, ema_values, current_price
|
|
)
|
|
|
|
return {
|
|
'timeframe': timeframe,
|
|
'direction': trend_direction, # 'uptrend', 'downtrend', 'sideways'
|
|
'direction_cn': TrendAnalyzer._direction_to_chinese(trend_direction),
|
|
'strength': trend_strength, # 'strong', 'moderate', 'weak'
|
|
'strength_cn': TrendAnalyzer._strength_to_chinese(trend_strength),
|
|
'phase': trend_phase, # 'trending', 'pullback', 'reversal', 'consolidation'
|
|
'phase_cn': TrendAnalyzer._phase_to_chinese(trend_phase, trend_direction),
|
|
'ema_alignment': alignment, # 'bullish_aligned', 'bearish_aligned', 'mixed'
|
|
'ema_values': {
|
|
'ema_7': round(ema_values.get(7, 0), 2),
|
|
'ema_20': round(ema_values.get(20, 0), 2),
|
|
'ema_50': round(ema_values.get(50, 0), 2),
|
|
'ema_100': round(ema_values.get(100, 0), 2),
|
|
'ema_200': round(ema_values.get(200, 0), 2),
|
|
},
|
|
'price_vs_emas': {
|
|
'above_ema_7': current_price > ema_values.get(7, current_price),
|
|
'above_ema_20': current_price > ema_values.get(20, current_price),
|
|
'above_ema_50': current_price > ema_values.get(50, current_price),
|
|
'above_ema_100': current_price > ema_values.get(100, current_price),
|
|
'above_ema_200': current_price > ema_values.get(200, current_price),
|
|
},
|
|
'ema_20_slope': round(ema_20_slope, 4),
|
|
'adx': round(adx, 1),
|
|
'current_price': round(current_price, 2),
|
|
}
|
|
|
|
@staticmethod
|
|
def analyze_multi_timeframe_trend(
|
|
mtf_data: Dict[str, pd.DataFrame]
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Analyze trend across multiple timeframes and determine alignment
|
|
|
|
Args:
|
|
mtf_data: Dict mapping timeframe to DataFrame
|
|
|
|
Returns:
|
|
Multi-timeframe trend analysis for LLM
|
|
"""
|
|
results = {}
|
|
trend_directions = {}
|
|
|
|
# Analyze each timeframe
|
|
for tf in TrendAnalyzer.TIMEFRAME_ORDER:
|
|
if tf in mtf_data and not mtf_data[tf].empty:
|
|
trend_info = TrendAnalyzer.analyze_trend(mtf_data[tf], tf)
|
|
results[tf] = trend_info
|
|
trend_directions[tf] = trend_info['direction']
|
|
|
|
# Calculate cross-timeframe alignment
|
|
alignment_analysis = TrendAnalyzer._analyze_cross_timeframe_alignment(
|
|
trend_directions
|
|
)
|
|
|
|
# Determine dominant trend
|
|
dominant_trend = TrendAnalyzer._determine_dominant_trend(trend_directions)
|
|
|
|
# Generate LLM-readable summary
|
|
summary = TrendAnalyzer._generate_trend_summary(
|
|
results, alignment_analysis, dominant_trend
|
|
)
|
|
|
|
return {
|
|
'timeframes': results,
|
|
'alignment': alignment_analysis,
|
|
'dominant_trend': dominant_trend,
|
|
'summary': summary,
|
|
'trading_bias': TrendAnalyzer._determine_trading_bias(
|
|
alignment_analysis, dominant_trend
|
|
),
|
|
}
|
|
|
|
@staticmethod
|
|
def _analyze_ema_alignment(
|
|
ema_values: Dict[int, float],
|
|
current_price: float
|
|
) -> str:
|
|
"""
|
|
Analyze EMA alignment pattern
|
|
|
|
Perfect bullish: Price > EMA7 > EMA20 > EMA50 > EMA100 > EMA200
|
|
Perfect bearish: Price < EMA7 < EMA20 < EMA50 < EMA100 < EMA200
|
|
"""
|
|
ema_7 = ema_values.get(7, current_price)
|
|
ema_20 = ema_values.get(20, current_price)
|
|
ema_50 = ema_values.get(50, current_price)
|
|
ema_100 = ema_values.get(100, current_price)
|
|
ema_200 = ema_values.get(200, current_price)
|
|
|
|
# Check bullish alignment
|
|
bullish_count = 0
|
|
if current_price > ema_7:
|
|
bullish_count += 1
|
|
if ema_7 > ema_20:
|
|
bullish_count += 1
|
|
if ema_20 > ema_50:
|
|
bullish_count += 1
|
|
if ema_50 > ema_100:
|
|
bullish_count += 1
|
|
if ema_100 > ema_200:
|
|
bullish_count += 1
|
|
|
|
# Check bearish alignment
|
|
bearish_count = 0
|
|
if current_price < ema_7:
|
|
bearish_count += 1
|
|
if ema_7 < ema_20:
|
|
bearish_count += 1
|
|
if ema_20 < ema_50:
|
|
bearish_count += 1
|
|
if ema_50 < ema_100:
|
|
bearish_count += 1
|
|
if ema_100 < ema_200:
|
|
bearish_count += 1
|
|
|
|
if bullish_count >= 4:
|
|
return 'bullish_aligned'
|
|
elif bearish_count >= 4:
|
|
return 'bearish_aligned'
|
|
else:
|
|
return 'mixed'
|
|
|
|
@staticmethod
|
|
def _calculate_trend_strength(adx: float, alignment: str) -> str:
|
|
"""Calculate trend strength based on ADX and alignment"""
|
|
# ADX thresholds
|
|
if adx > 40:
|
|
base_strength = 'very_strong'
|
|
elif adx > 25:
|
|
base_strength = 'strong'
|
|
elif adx > 20:
|
|
base_strength = 'moderate'
|
|
else:
|
|
base_strength = 'weak'
|
|
|
|
# Downgrade if alignment is mixed
|
|
if alignment == 'mixed':
|
|
if base_strength == 'very_strong':
|
|
return 'strong'
|
|
elif base_strength == 'strong':
|
|
return 'moderate'
|
|
else:
|
|
return 'weak'
|
|
|
|
return base_strength
|
|
|
|
@staticmethod
|
|
def _determine_trend_direction(
|
|
alignment: str,
|
|
ema_values: Dict[int, float]
|
|
) -> str:
|
|
"""Determine trend direction from EMA alignment"""
|
|
if alignment == 'bullish_aligned':
|
|
return 'uptrend'
|
|
elif alignment == 'bearish_aligned':
|
|
return 'downtrend'
|
|
else:
|
|
# Check shorter-term EMAs for mixed scenarios
|
|
ema_20 = ema_values.get(20, 0)
|
|
ema_50 = ema_values.get(50, 0)
|
|
ema_100 = ema_values.get(100, 0)
|
|
|
|
if ema_20 > ema_50 > ema_100:
|
|
return 'uptrend'
|
|
elif ema_20 < ema_50 < ema_100:
|
|
return 'downtrend'
|
|
else:
|
|
return 'sideways'
|
|
|
|
@staticmethod
|
|
def _detect_trend_phase(
|
|
df: pd.DataFrame,
|
|
direction: str,
|
|
ema_values: Dict[int, float],
|
|
current_price: float
|
|
) -> str:
|
|
"""
|
|
Detect current trend phase
|
|
|
|
Phases:
|
|
- trending: Strong directional movement
|
|
- pullback: Temporary counter-trend move within main trend
|
|
- reversal: Potential trend change in progress
|
|
- consolidation: Range-bound, no clear direction
|
|
"""
|
|
ema_20 = ema_values.get(20, current_price)
|
|
ema_50 = ema_values.get(50, current_price)
|
|
|
|
# Get recent price action
|
|
if len(df) < 10:
|
|
return 'unknown'
|
|
|
|
recent_closes = df['close'].tail(10).values
|
|
price_5_ago = recent_closes[-5]
|
|
|
|
if direction == 'uptrend':
|
|
# In uptrend, check if pulling back to EMA
|
|
if current_price < ema_20 and current_price > ema_50:
|
|
return 'pullback'
|
|
elif current_price < ema_50:
|
|
return 'reversal'
|
|
elif current_price > price_5_ago * 1.01:
|
|
return 'trending'
|
|
else:
|
|
return 'consolidation'
|
|
|
|
elif direction == 'downtrend':
|
|
# In downtrend, check if bouncing to EMA
|
|
if current_price > ema_20 and current_price < ema_50:
|
|
return 'pullback'
|
|
elif current_price > ema_50:
|
|
return 'reversal'
|
|
elif current_price < price_5_ago * 0.99:
|
|
return 'trending'
|
|
else:
|
|
return 'consolidation'
|
|
else:
|
|
return 'consolidation'
|
|
|
|
@staticmethod
|
|
def _calculate_ema_slope(
|
|
df: pd.DataFrame,
|
|
ema_col: str,
|
|
periods: int = 5
|
|
) -> float:
|
|
"""Calculate the slope of an EMA over N periods"""
|
|
if ema_col not in df.columns or len(df) < periods:
|
|
return 0.0
|
|
|
|
ema_values = df[ema_col].tail(periods).values
|
|
current = ema_values[-1]
|
|
previous = ema_values[0]
|
|
|
|
if previous == 0:
|
|
return 0.0
|
|
|
|
# Return percentage change per period
|
|
return (current - previous) / previous / periods * 100
|
|
|
|
@staticmethod
|
|
def _analyze_cross_timeframe_alignment(
|
|
trend_directions: Dict[str, str]
|
|
) -> Dict[str, Any]:
|
|
"""Analyze alignment across timeframes"""
|
|
uptrend_count = sum(1 for d in trend_directions.values() if d == 'uptrend')
|
|
downtrend_count = sum(1 for d in trend_directions.values() if d == 'downtrend')
|
|
sideways_count = sum(1 for d in trend_directions.values() if d == 'sideways')
|
|
total = len(trend_directions)
|
|
|
|
if total == 0:
|
|
return {
|
|
'status': 'insufficient_data',
|
|
'status_cn': '数据不足',
|
|
'alignment_score': 0.0,
|
|
}
|
|
|
|
# Calculate alignment score (0 to 1)
|
|
max_aligned = max(uptrend_count, downtrend_count)
|
|
alignment_score = max_aligned / total if total > 0 else 0
|
|
|
|
# Determine status
|
|
if uptrend_count >= total * 0.8:
|
|
status = 'strongly_bullish'
|
|
status_cn = '强烈看涨一致'
|
|
elif uptrend_count >= total * 0.6:
|
|
status = 'bullish'
|
|
status_cn = '看涨一致'
|
|
elif downtrend_count >= total * 0.8:
|
|
status = 'strongly_bearish'
|
|
status_cn = '强烈看跌一致'
|
|
elif downtrend_count >= total * 0.6:
|
|
status = 'bearish'
|
|
status_cn = '看跌一致'
|
|
elif sideways_count >= total * 0.5:
|
|
status = 'ranging'
|
|
status_cn = '震荡整理'
|
|
else:
|
|
status = 'mixed'
|
|
status_cn = '方向分歧'
|
|
|
|
return {
|
|
'status': status,
|
|
'status_cn': status_cn,
|
|
'alignment_score': round(alignment_score, 2),
|
|
'uptrend_count': uptrend_count,
|
|
'downtrend_count': downtrend_count,
|
|
'sideways_count': sideways_count,
|
|
'total_timeframes': total,
|
|
}
|
|
|
|
@staticmethod
|
|
def _determine_dominant_trend(trend_directions: Dict[str, str]) -> Dict[str, Any]:
|
|
"""
|
|
Determine the dominant trend considering timeframe weight
|
|
Longer timeframes have more weight
|
|
"""
|
|
# Weights for each timeframe (longer = more weight)
|
|
weights = {
|
|
'5m': 0.05,
|
|
'15m': 0.10,
|
|
'1h': 0.15,
|
|
'4h': 0.20,
|
|
'1d': 0.25,
|
|
'1w': 0.25,
|
|
}
|
|
|
|
bullish_score = 0.0
|
|
bearish_score = 0.0
|
|
total_weight = 0.0
|
|
|
|
for tf, direction in trend_directions.items():
|
|
weight = weights.get(tf, 0.1)
|
|
total_weight += weight
|
|
|
|
if direction == 'uptrend':
|
|
bullish_score += weight
|
|
elif direction == 'downtrend':
|
|
bearish_score += weight
|
|
|
|
if total_weight == 0:
|
|
return {
|
|
'direction': 'unknown',
|
|
'direction_cn': '未知',
|
|
'confidence': 0.0,
|
|
}
|
|
|
|
# Normalize scores
|
|
bullish_pct = bullish_score / total_weight
|
|
bearish_pct = bearish_score / total_weight
|
|
|
|
if bullish_pct > 0.6:
|
|
direction = 'uptrend'
|
|
direction_cn = '上涨'
|
|
confidence = bullish_pct
|
|
elif bearish_pct > 0.6:
|
|
direction = 'downtrend'
|
|
direction_cn = '下跌'
|
|
confidence = bearish_pct
|
|
else:
|
|
direction = 'sideways'
|
|
direction_cn = '震荡'
|
|
confidence = max(1.0 - bullish_pct - bearish_pct, 0.0)
|
|
|
|
return {
|
|
'direction': direction,
|
|
'direction_cn': direction_cn,
|
|
'confidence': round(confidence, 2),
|
|
'bullish_score': round(bullish_pct, 2),
|
|
'bearish_score': round(bearish_pct, 2),
|
|
}
|
|
|
|
@staticmethod
|
|
def _determine_trading_bias(
|
|
alignment: Dict[str, Any],
|
|
dominant: Dict[str, Any]
|
|
) -> Dict[str, Any]:
|
|
"""Determine trading bias based on trend analysis"""
|
|
status = alignment.get('status', 'mixed')
|
|
direction = dominant.get('direction', 'sideways')
|
|
confidence = dominant.get('confidence', 0.0)
|
|
|
|
if status in ['strongly_bullish', 'bullish'] and confidence > 0.6:
|
|
bias = 'long'
|
|
bias_cn = '偏多'
|
|
strength = 'strong' if status == 'strongly_bullish' else 'moderate'
|
|
elif status in ['strongly_bearish', 'bearish'] and confidence > 0.6:
|
|
bias = 'short'
|
|
bias_cn = '偏空'
|
|
strength = 'strong' if status == 'strongly_bearish' else 'moderate'
|
|
else:
|
|
bias = 'neutral'
|
|
bias_cn = '中性'
|
|
strength = 'weak'
|
|
|
|
return {
|
|
'bias': bias,
|
|
'bias_cn': bias_cn,
|
|
'strength': strength,
|
|
'strength_cn': TrendAnalyzer._strength_to_chinese(strength),
|
|
'recommendation': TrendAnalyzer._generate_bias_recommendation(
|
|
bias, strength, direction
|
|
),
|
|
}
|
|
|
|
@staticmethod
|
|
def _generate_trend_summary(
|
|
results: Dict[str, Dict],
|
|
alignment: Dict[str, Any],
|
|
dominant: Dict[str, Any]
|
|
) -> str:
|
|
"""Generate human-readable trend summary for LLM"""
|
|
parts = []
|
|
|
|
# Dominant trend
|
|
direction_cn = dominant.get('direction_cn', '未知')
|
|
confidence = dominant.get('confidence', 0) * 100
|
|
parts.append(f"主导趋势: {direction_cn} (置信度{confidence:.0f}%)")
|
|
|
|
# Alignment
|
|
status_cn = alignment.get('status_cn', '未知')
|
|
parts.append(f"多周期一致性: {status_cn}")
|
|
|
|
# Key timeframes
|
|
key_tfs = ['1h', '4h', '1d']
|
|
tf_summary = []
|
|
for tf in key_tfs:
|
|
if tf in results:
|
|
tf_info = results[tf]
|
|
tf_summary.append(
|
|
f"{tf}={tf_info['direction_cn']}({tf_info['strength_cn']})"
|
|
)
|
|
if tf_summary:
|
|
parts.append(f"关键周期: {', '.join(tf_summary)}")
|
|
|
|
return "; ".join(parts)
|
|
|
|
@staticmethod
|
|
def _generate_bias_recommendation(
|
|
bias: str,
|
|
strength: str,
|
|
direction: str
|
|
) -> str:
|
|
"""Generate trading recommendation based on bias"""
|
|
if bias == 'long':
|
|
if strength == 'strong':
|
|
return "强势看涨,可积极寻找做多机会,回调可加仓"
|
|
else:
|
|
return "偏多格局,可逢低做多,注意控制仓位"
|
|
elif bias == 'short':
|
|
if strength == 'strong':
|
|
return "强势看跌,可积极寻找做空机会,反弹可加仓"
|
|
else:
|
|
return "偏空格局,可逢高做空,注意控制仓位"
|
|
else:
|
|
return "方向不明,建议观望或轻仓试探"
|
|
|
|
@staticmethod
|
|
def _direction_to_chinese(direction: str) -> str:
|
|
"""Convert direction to Chinese"""
|
|
mapping = {
|
|
'uptrend': '上涨',
|
|
'downtrend': '下跌',
|
|
'sideways': '震荡',
|
|
'unknown': '未知',
|
|
}
|
|
return mapping.get(direction, '未知')
|
|
|
|
@staticmethod
|
|
def _strength_to_chinese(strength: str) -> str:
|
|
"""Convert strength to Chinese"""
|
|
mapping = {
|
|
'very_strong': '非常强',
|
|
'strong': '强',
|
|
'moderate': '中等',
|
|
'weak': '弱',
|
|
}
|
|
return mapping.get(strength, '弱')
|
|
|
|
@staticmethod
|
|
def _phase_to_chinese(phase: str, direction: str) -> str:
|
|
"""Convert phase to Chinese with context"""
|
|
if phase == 'trending':
|
|
return '趋势进行中' if direction != 'sideways' else '震荡延续'
|
|
elif phase == 'pullback':
|
|
if direction == 'uptrend':
|
|
return '上涨趋势回调'
|
|
elif direction == 'downtrend':
|
|
return '下跌趋势反弹'
|
|
else:
|
|
return '震荡回调'
|
|
elif phase == 'reversal':
|
|
if direction == 'uptrend':
|
|
return '上涨趋势可能反转'
|
|
elif direction == 'downtrend':
|
|
return '下跌趋势可能反转'
|
|
else:
|
|
return '可能突破震荡'
|
|
elif phase == 'consolidation':
|
|
return '盘整蓄力'
|
|
else:
|
|
return '未知'
|
|
|
|
@staticmethod
|
|
def _empty_trend_result(timeframe: str) -> Dict[str, Any]:
|
|
"""Return empty trend result when data is insufficient"""
|
|
return {
|
|
'timeframe': timeframe,
|
|
'direction': 'unknown',
|
|
'direction_cn': '数据不足',
|
|
'strength': 'weak',
|
|
'strength_cn': '弱',
|
|
'phase': 'unknown',
|
|
'phase_cn': '未知',
|
|
'ema_alignment': 'unknown',
|
|
'ema_values': {},
|
|
'price_vs_emas': {},
|
|
'ema_20_slope': 0.0,
|
|
'adx': 0.0,
|
|
'current_price': 0.0,
|
|
}
|