From a39688c731b4556ece98e5dc2dabc17213d61093 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Tue, 16 Dec 2025 23:26:19 +0800 Subject: [PATCH] 111 --- analysis/__init__.py | 13 + analysis/fibonacci.py | 643 ++++++++++++++++++++++++++ analysis/llm_dataset_builder.py | 592 ++++++++++++++++++++++++ analysis/momentum_analyzer.py | 796 ++++++++++++++++++++++++++++++++ analysis/orderflow_enhanced.py | 575 +++++++++++++++++++++++ analysis/trend_analyzer.py | 581 +++++++++++++++++++++++ signals/llm_decision.py | 262 ++++++++++- 7 files changed, 3461 insertions(+), 1 deletion(-) create mode 100644 analysis/fibonacci.py create mode 100644 analysis/llm_dataset_builder.py create mode 100644 analysis/momentum_analyzer.py create mode 100644 analysis/orderflow_enhanced.py create mode 100644 analysis/trend_analyzer.py diff --git a/analysis/__init__.py b/analysis/__init__.py index 9c21846..8045f4e 100644 --- a/analysis/__init__.py +++ b/analysis/__init__.py @@ -10,6 +10,13 @@ from .orderflow import OrderFlowAnalyzer from .llm_context import LLMContextBuilder from .engine import MarketAnalysisEngine +# New analysis modules +from .trend_analyzer import TrendAnalyzer +from .fibonacci import FibonacciAnalyzer +from .orderflow_enhanced import EnhancedOrderFlowAnalyzer +from .momentum_analyzer import MomentumAnalyzer +from .llm_dataset_builder import LLMDatasetBuilder + __all__ = [ 'config', 'MarketDataReader', @@ -18,4 +25,10 @@ __all__ = [ 'OrderFlowAnalyzer', 'LLMContextBuilder', 'MarketAnalysisEngine', + # New modules + 'TrendAnalyzer', + 'FibonacciAnalyzer', + 'EnhancedOrderFlowAnalyzer', + 'MomentumAnalyzer', + 'LLMDatasetBuilder', ] diff --git a/analysis/fibonacci.py b/analysis/fibonacci.py new file mode 100644 index 0000000..94c21ed --- /dev/null +++ b/analysis/fibonacci.py @@ -0,0 +1,643 @@ +""" +Fibonacci Support/Resistance Calculator +Calculates key price levels using Fibonacci retracement and extension +""" +import logging +from typing import Dict, Any, List, Tuple, Optional +import pandas as pd +import numpy as np + +from .config import config + + +logger = logging.getLogger(__name__) + + +class FibonacciAnalyzer: + """ + Calculates support and resistance levels using: + 1. Fibonacci retracement levels + 2. Fibonacci extension levels + 3. Pivot points (High/Low) + 4. Price clustering zones + """ + + # Standard Fibonacci ratios + FIB_RETRACEMENT = [0.0, 0.236, 0.382, 0.5, 0.618, 0.786, 1.0] + FIB_EXTENSION = [1.0, 1.272, 1.414, 1.618, 2.0, 2.618] + + # Key levels for trading + KEY_FIB_LEVELS = [0.382, 0.5, 0.618] # Most important levels + + @staticmethod + def calculate_fibonacci_levels( + df: pd.DataFrame, + lookback_periods: int = 100, + timeframe: str = '1h' + ) -> Dict[str, Any]: + """ + Calculate Fibonacci retracement and extension levels + + Args: + df: DataFrame with OHLCV data + lookback_periods: Number of periods to find swing high/low + timeframe: Timeframe for context + + Returns: + Dict with Fibonacci levels and analysis + """ + if df.empty or len(df) < lookback_periods: + return FibonacciAnalyzer._empty_result(timeframe) + + lookback_df = df.tail(lookback_periods) + current_price = float(df.iloc[-1]['close']) + + # Find significant swing high and low + swing_high, swing_high_idx = FibonacciAnalyzer._find_swing_high(lookback_df) + swing_low, swing_low_idx = FibonacciAnalyzer._find_swing_low(lookback_df) + + if swing_high is None or swing_low is None: + return FibonacciAnalyzer._empty_result(timeframe) + + # Determine trend direction based on which came first + is_uptrend = swing_low_idx < swing_high_idx + + # Calculate retracement levels + if is_uptrend: + # In uptrend, retracement levels from high to low + retracement_levels = FibonacciAnalyzer._calculate_retracement( + swing_high, swing_low, 'uptrend' + ) + extension_levels = FibonacciAnalyzer._calculate_extension( + swing_low, swing_high, 'uptrend' + ) + else: + # In downtrend, retracement levels from low to high + retracement_levels = FibonacciAnalyzer._calculate_retracement( + swing_low, swing_high, 'downtrend' + ) + extension_levels = FibonacciAnalyzer._calculate_extension( + swing_high, swing_low, 'downtrend' + ) + + # Identify key support/resistance from Fibonacci + supports, resistances = FibonacciAnalyzer._identify_sr_from_fib( + retracement_levels, extension_levels, current_price + ) + + # Add pivot-based support/resistance + pivot_levels = FibonacciAnalyzer._calculate_pivot_points(lookback_df) + + # Merge and cluster all levels + all_supports = FibonacciAnalyzer._merge_and_cluster_levels( + supports + pivot_levels.get('supports', []), current_price + ) + all_resistances = FibonacciAnalyzer._merge_and_cluster_levels( + resistances + pivot_levels.get('resistances', []), current_price + ) + + # Find nearest levels + nearest_support = FibonacciAnalyzer._find_nearest_below(all_supports, current_price) + nearest_resistance = FibonacciAnalyzer._find_nearest_above(all_resistances, current_price) + + # Calculate key zones + key_zones = FibonacciAnalyzer._identify_key_zones( + retracement_levels, current_price, is_uptrend + ) + + return { + 'timeframe': timeframe, + 'current_price': round(current_price, 2), + 'trend_context': 'uptrend' if is_uptrend else 'downtrend', + 'swing_high': round(swing_high, 2), + 'swing_low': round(swing_low, 2), + 'swing_range': round(swing_high - swing_low, 2), + 'swing_range_pct': round((swing_high - swing_low) / swing_low * 100, 2), + + # Fibonacci levels + 'fibonacci': { + 'retracement': { + str(ratio): round(level, 2) + for ratio, level in retracement_levels.items() + }, + 'extension': { + str(ratio): round(level, 2) + for ratio, level in extension_levels.items() + }, + }, + + # Support/Resistance + 'supports': [round(s, 2) for s in all_supports[:5]], # Top 5 + 'resistances': [round(r, 2) for r in all_resistances[:5]], # Top 5 + 'nearest_support': round(nearest_support, 2) if nearest_support else None, + 'nearest_resistance': round(nearest_resistance, 2) if nearest_resistance else None, + + # Distance to key levels + 'distance_to_support': FibonacciAnalyzer._calculate_distance( + current_price, nearest_support + ) if nearest_support else None, + 'distance_to_resistance': FibonacciAnalyzer._calculate_distance( + current_price, nearest_resistance + ) if nearest_resistance else None, + + # Key zones for trading + 'key_zones': key_zones, + + # Pivot points + 'pivot': pivot_levels.get('pivot'), + } + + @staticmethod + def analyze_multi_timeframe_levels( + mtf_data: Dict[str, pd.DataFrame] + ) -> Dict[str, Any]: + """ + Analyze support/resistance across multiple timeframes + + Args: + mtf_data: Dict mapping timeframe to DataFrame + + Returns: + Consolidated support/resistance analysis + """ + results = {} + all_supports = [] + all_resistances = [] + current_price = None + + # Analyze each timeframe + for tf, df in mtf_data.items(): + if df.empty: + continue + + # Adjust lookback based on timeframe + lookback = FibonacciAnalyzer._get_lookback_for_timeframe(tf) + tf_result = FibonacciAnalyzer.calculate_fibonacci_levels( + df, lookback_periods=lookback, timeframe=tf + ) + results[tf] = tf_result + + if current_price is None: + current_price = tf_result.get('current_price', 0) + + # Collect levels with timeframe weight + weight = FibonacciAnalyzer._get_timeframe_weight(tf) + for s in tf_result.get('supports', []): + all_supports.append((s, weight, tf)) + for r in tf_result.get('resistances', []): + all_resistances.append((r, weight, tf)) + + if current_price is None: + return {'error': 'No data available'} + + # Find confluence zones (levels appearing in multiple timeframes) + support_confluence = FibonacciAnalyzer._find_confluence_zones( + all_supports, current_price, direction='below' + ) + resistance_confluence = FibonacciAnalyzer._find_confluence_zones( + all_resistances, current_price, direction='above' + ) + + # Generate LLM-readable summary + summary = FibonacciAnalyzer._generate_sr_summary( + support_confluence, resistance_confluence, current_price + ) + + return { + 'timeframes': results, + 'confluence': { + 'supports': support_confluence[:5], + 'resistances': resistance_confluence[:5], + }, + 'strongest_support': support_confluence[0] if support_confluence else None, + 'strongest_resistance': resistance_confluence[0] if resistance_confluence else None, + 'current_price': round(current_price, 2), + 'summary': summary, + } + + @staticmethod + def _find_swing_high(df: pd.DataFrame) -> Tuple[Optional[float], Optional[int]]: + """Find the most significant swing high in the data""" + highs = df['high'].values + if len(highs) < 5: + return None, None + + # Find local maxima + swing_highs = [] + for i in range(2, len(highs) - 2): + if (highs[i] > highs[i-1] and highs[i] > highs[i-2] and + highs[i] > highs[i+1] and highs[i] > highs[i+2]): + swing_highs.append((highs[i], i)) + + if not swing_highs: + # Fall back to absolute max + max_idx = np.argmax(highs) + return float(highs[max_idx]), int(max_idx) + + # Return the highest swing + swing_highs.sort(key=lambda x: x[0], reverse=True) + return float(swing_highs[0][0]), int(swing_highs[0][1]) + + @staticmethod + def _find_swing_low(df: pd.DataFrame) -> Tuple[Optional[float], Optional[int]]: + """Find the most significant swing low in the data""" + lows = df['low'].values + if len(lows) < 5: + return None, None + + # Find local minima + swing_lows = [] + for i in range(2, len(lows) - 2): + if (lows[i] < lows[i-1] and lows[i] < lows[i-2] and + lows[i] < lows[i+1] and lows[i] < lows[i+2]): + swing_lows.append((lows[i], i)) + + if not swing_lows: + # Fall back to absolute min + min_idx = np.argmin(lows) + return float(lows[min_idx]), int(min_idx) + + # Return the lowest swing + swing_lows.sort(key=lambda x: x[0]) + return float(swing_lows[0][0]), int(swing_lows[0][1]) + + @staticmethod + def _calculate_retracement( + high: float, + low: float, + trend: str + ) -> Dict[float, float]: + """Calculate Fibonacci retracement levels""" + diff = high - low + levels = {} + + for ratio in FibonacciAnalyzer.FIB_RETRACEMENT: + if trend == 'uptrend': + # In uptrend, retracement down from high + levels[ratio] = high - (diff * ratio) + else: + # In downtrend, retracement up from low + levels[ratio] = low + (diff * ratio) + + return levels + + @staticmethod + def _calculate_extension( + start: float, + end: float, + trend: str + ) -> Dict[float, float]: + """Calculate Fibonacci extension levels""" + diff = abs(end - start) + levels = {} + + for ratio in FibonacciAnalyzer.FIB_EXTENSION: + if trend == 'uptrend': + # Extension above the high + levels[ratio] = end + (diff * (ratio - 1)) + else: + # Extension below the low + levels[ratio] = end - (diff * (ratio - 1)) + + return levels + + @staticmethod + def _identify_sr_from_fib( + retracement: Dict[float, float], + extension: Dict[float, float], + current_price: float + ) -> Tuple[List[float], List[float]]: + """Identify support and resistance from Fibonacci levels""" + supports = [] + resistances = [] + + # From retracement levels + for ratio, level in retracement.items(): + if level < current_price: + supports.append(level) + elif level > current_price: + resistances.append(level) + + # From extension levels + for ratio, level in extension.items(): + if level < current_price: + supports.append(level) + elif level > current_price: + resistances.append(level) + + # Sort + supports = sorted(set(supports), reverse=True) + resistances = sorted(set(resistances)) + + return supports, resistances + + @staticmethod + def _calculate_pivot_points(df: pd.DataFrame) -> Dict[str, Any]: + """Calculate pivot points (Classic formula)""" + if len(df) < 2: + return {'pivot': None, 'supports': [], 'resistances': []} + + # Use previous period's data + prev = df.iloc[-2] + high = float(prev['high']) + low = float(prev['low']) + close = float(prev['close']) + + # Classic pivot point + pivot = (high + low + close) / 3 + + # Support levels + s1 = 2 * pivot - high + s2 = pivot - (high - low) + s3 = low - 2 * (high - pivot) + + # Resistance levels + r1 = 2 * pivot - low + r2 = pivot + (high - low) + r3 = high + 2 * (pivot - low) + + return { + 'pivot': round(pivot, 2), + 'supports': [round(s1, 2), round(s2, 2), round(s3, 2)], + 'resistances': [round(r1, 2), round(r2, 2), round(r3, 2)], + } + + @staticmethod + def _merge_and_cluster_levels( + levels: List[float], + reference: float, + tolerance_pct: float = 0.005 + ) -> List[float]: + """Merge similar levels within tolerance""" + if not levels: + return [] + + tolerance = reference * tolerance_pct + sorted_levels = sorted(levels) + clustered = [] + + current_cluster = [sorted_levels[0]] + for level in sorted_levels[1:]: + if abs(level - current_cluster[-1]) < tolerance: + current_cluster.append(level) + else: + # Average the cluster + clustered.append(np.mean(current_cluster)) + current_cluster = [level] + + # Add last cluster + if current_cluster: + clustered.append(np.mean(current_cluster)) + + return clustered + + @staticmethod + def _find_nearest_below(levels: List[float], price: float) -> Optional[float]: + """Find nearest level below current price""" + below = [l for l in levels if l < price] + if below: + return max(below) + return None + + @staticmethod + def _find_nearest_above(levels: List[float], price: float) -> Optional[float]: + """Find nearest level above current price""" + above = [l for l in levels if l > price] + if above: + return min(above) + return None + + @staticmethod + def _calculate_distance(current: float, target: Optional[float]) -> Dict[str, float]: + """Calculate distance to target level""" + if target is None or current == 0: + return None + diff = target - current + pct = (diff / current) * 100 + return { + 'absolute': round(abs(diff), 2), + 'percentage': round(pct, 2), + 'direction': 'above' if diff > 0 else 'below', + } + + @staticmethod + def _identify_key_zones( + retracement: Dict[float, float], + current_price: float, + is_uptrend: bool + ) -> List[Dict[str, Any]]: + """Identify key trading zones from Fibonacci levels""" + zones = [] + + # Key Fibonacci levels with descriptions + key_levels = { + 0.382: '浅回调区 (38.2%)', + 0.5: '中度回调区 (50%)', + 0.618: '黄金分割区 (61.8%)', + } + + for ratio, description in key_levels.items(): + if ratio in retracement: + level = retracement[ratio] + distance_pct = ((level - current_price) / current_price) * 100 + + zone = { + 'ratio': ratio, + 'level': round(level, 2), + 'description': description, + 'distance_pct': round(distance_pct, 2), + 'type': 'support' if level < current_price else 'resistance', + } + + # Importance rating + if ratio == 0.618: + zone['importance'] = 'high' + zone['importance_cn'] = '重要' + elif ratio == 0.5: + zone['importance'] = 'medium' + zone['importance_cn'] = '中等' + else: + zone['importance'] = 'low' + zone['importance_cn'] = '一般' + + zones.append(zone) + + return zones + + @staticmethod + def _find_confluence_zones( + levels_with_weight: List[Tuple[float, float, str]], + current_price: float, + direction: str = 'below' + ) -> List[Dict[str, Any]]: + """ + Find confluence zones where multiple timeframes have similar levels + + Args: + levels_with_weight: List of (level, weight, timeframe) + current_price: Current price + direction: 'below' for supports, 'above' for resistances + + Returns: + List of confluence zones sorted by strength + """ + if not levels_with_weight: + return [] + + # Filter by direction + if direction == 'below': + filtered = [(l, w, tf) for l, w, tf in levels_with_weight if l < current_price] + else: + filtered = [(l, w, tf) for l, w, tf in levels_with_weight if l > current_price] + + if not filtered: + return [] + + # Cluster levels with tolerance + tolerance = current_price * 0.005 # 0.5% + sorted_levels = sorted(filtered, key=lambda x: x[0]) + + clusters = [] + current_cluster = [sorted_levels[0]] + + for level_info in sorted_levels[1:]: + if abs(level_info[0] - current_cluster[-1][0]) < tolerance: + current_cluster.append(level_info) + else: + clusters.append(current_cluster) + current_cluster = [level_info] + + if current_cluster: + clusters.append(current_cluster) + + # Calculate confluence strength for each cluster + confluence_zones = [] + for cluster in clusters: + avg_level = np.mean([l[0] for l in cluster]) + total_weight = sum(l[1] for l in cluster) + timeframes = list(set(l[2] for l in cluster)) + num_timeframes = len(timeframes) + + # Confluence score based on weight and number of timeframes + confluence_score = total_weight * (1 + 0.2 * num_timeframes) + + distance_pct = ((avg_level - current_price) / current_price) * 100 + + confluence_zones.append({ + 'level': round(avg_level, 2), + 'confluence_score': round(confluence_score, 2), + 'num_timeframes': num_timeframes, + 'timeframes': timeframes, + 'distance_pct': round(distance_pct, 2), + 'strength': FibonacciAnalyzer._score_to_strength(confluence_score), + 'strength_cn': FibonacciAnalyzer._score_to_strength_cn(confluence_score), + }) + + # Sort by confluence score (higher = stronger) + confluence_zones.sort(key=lambda x: x['confluence_score'], reverse=True) + + return confluence_zones + + @staticmethod + def _score_to_strength(score: float) -> str: + """Convert confluence score to strength label""" + if score >= 0.8: + return 'very_strong' + elif score >= 0.5: + return 'strong' + elif score >= 0.3: + return 'moderate' + else: + return 'weak' + + @staticmethod + def _score_to_strength_cn(score: float) -> str: + """Convert confluence score to Chinese strength label""" + if score >= 0.8: + return '非常强' + elif score >= 0.5: + return '强' + elif score >= 0.3: + return '中等' + else: + return '弱' + + @staticmethod + def _get_lookback_for_timeframe(tf: str) -> int: + """Get appropriate lookback periods for each timeframe""" + lookbacks = { + '5m': 200, + '15m': 150, + '1h': 100, + '4h': 80, + '1d': 60, + '1w': 40, + } + return lookbacks.get(tf, 100) + + @staticmethod + def _get_timeframe_weight(tf: str) -> float: + """Get weight for each timeframe (longer = more weight)""" + weights = { + '5m': 0.05, + '15m': 0.10, + '1h': 0.15, + '4h': 0.20, + '1d': 0.25, + '1w': 0.25, + } + return weights.get(tf, 0.1) + + @staticmethod + def _generate_sr_summary( + supports: List[Dict], + resistances: List[Dict], + current_price: float + ) -> str: + """Generate LLM-readable summary of support/resistance""" + parts = [] + + # Strongest support + if supports: + s = supports[0] + parts.append( + f"最强支撑: ${s['level']:,.0f} ({s['distance_pct']:.1f}%, " + f"{s['num_timeframes']}个周期确认, {s['strength_cn']})" + ) + + # Strongest resistance + if resistances: + r = resistances[0] + parts.append( + f"最强压力: ${r['level']:,.0f} ({r['distance_pct']:.1f}%, " + f"{r['num_timeframes']}个周期确认, {r['strength_cn']})" + ) + + # Price position + if supports and resistances: + s_dist = abs(supports[0]['distance_pct']) + r_dist = abs(resistances[0]['distance_pct']) + if s_dist < r_dist: + parts.append("价格更接近支撑位") + else: + parts.append("价格更接近压力位") + + return "; ".join(parts) if parts else "无明确支撑压力位" + + @staticmethod + def _empty_result(timeframe: str) -> Dict[str, Any]: + """Return empty result when data is insufficient""" + return { + 'timeframe': timeframe, + 'error': 'insufficient_data', + 'current_price': 0, + 'supports': [], + 'resistances': [], + 'nearest_support': None, + 'nearest_resistance': None, + 'fibonacci': { + 'retracement': {}, + 'extension': {}, + }, + 'key_zones': [], + } diff --git a/analysis/llm_dataset_builder.py b/analysis/llm_dataset_builder.py new file mode 100644 index 0000000..c43f048 --- /dev/null +++ b/analysis/llm_dataset_builder.py @@ -0,0 +1,592 @@ +""" +LLM Market Dataset Builder - Integrates all analysis modules +Generates structured, human-readable market data for LLM decision making +""" +import logging +from typing import Dict, Any, List, Optional +from datetime import datetime +import pandas as pd + +from .data_reader import MarketDataReader +from .indicators import TechnicalIndicators +from .trend_analyzer import TrendAnalyzer +from .fibonacci import FibonacciAnalyzer +from .orderflow_enhanced import EnhancedOrderFlowAnalyzer +from .momentum_analyzer import MomentumAnalyzer +from .config import config + + +logger = logging.getLogger(__name__) + + +# K-line limits per timeframe +KLINE_LIMITS = { + '5m': 288, # 1 day + '15m': 96, # 1 day + '1h': 72, # 3 days + '4h': 42, # 7 days + '1d': 60, # 60 days + '1w': 52, # 1 year +} + + +class LLMDatasetBuilder: + """ + Builds comprehensive market dataset for LLM analysis + + Integrates: + 1. Multi-timeframe trend analysis (EMA alignment) + 2. Fibonacci support/resistance levels + 3. Order flow / bull-bear battle analysis + 4. Momentum and divergence analysis + 5. K-line data for pattern recognition + """ + + def __init__(self, symbol: str = "BTCUSDT"): + self.symbol = symbol + self.data_reader = MarketDataReader(symbol=symbol) + + def build_complete_dataset(self, symbol: str = None) -> Dict[str, Any]: + """ + Build complete market dataset for LLM consumption + + Args: + symbol: Trading symbol (uses instance symbol if None) + + Returns: + Comprehensive market analysis dict + """ + symbol = symbol or self.symbol + start_time = datetime.now() + + try: + # 1. Fetch multi-timeframe K-line data + mtf_data = self._fetch_multi_timeframe_data(symbol) + + if not mtf_data or '5m' not in mtf_data: + logger.error("Failed to fetch K-line data") + return self._empty_dataset() + + # Get current price + current_price = float(mtf_data['5m'].iloc[-1]['close']) + + # 2. Fetch order book data + depth_data = self.data_reader.read_latest_depth() + + # 3. Build analysis sections + # 3.1 Trend Analysis + trend_analysis = TrendAnalyzer.analyze_multi_timeframe_trend(mtf_data) + + # 3.2 Support/Resistance (Fibonacci) + sr_analysis = FibonacciAnalyzer.analyze_multi_timeframe_levels(mtf_data) + + # 3.3 Order Flow Analysis + if depth_data: + orderflow_analysis = EnhancedOrderFlowAnalyzer.analyze_full_orderflow( + depth_data, current_price + ) + else: + orderflow_analysis = EnhancedOrderFlowAnalyzer._empty_result() + + # 3.4 Momentum Analysis + momentum_analysis = MomentumAnalyzer.analyze_multi_timeframe_momentum(mtf_data) + + # 3.5 K-line data for LLM + kline_data = self._build_kline_data(mtf_data) + + # 4. Generate overall market assessment + market_assessment = self._generate_market_assessment( + trend_analysis, + sr_analysis, + orderflow_analysis, + momentum_analysis + ) + + # 5. Generate trading recommendations + recommendations = self._generate_recommendations( + trend_analysis, + sr_analysis, + orderflow_analysis, + momentum_analysis, + current_price + ) + + # 6. Build final dataset + processing_time = (datetime.now() - start_time).total_seconds() + + dataset = { + 'metadata': { + 'symbol': symbol, + 'timestamp': datetime.now().isoformat(), + 'processing_time_sec': round(processing_time, 2), + 'data_quality': self._assess_data_quality(mtf_data, depth_data), + }, + + 'price': { + 'current': round(current_price, 2), + 'change_24h': self._calculate_price_change(mtf_data.get('1h'), 24), + 'change_1h': self._calculate_price_change(mtf_data.get('5m'), 12), + }, + + # Main analysis sections + 'trend': trend_analysis, + 'support_resistance': sr_analysis, + 'orderflow': orderflow_analysis, + 'momentum': momentum_analysis, + + # K-line data for pattern recognition + 'klines': kline_data, + + # Overall assessment + 'assessment': market_assessment, + + # Trading recommendations + 'recommendations': recommendations, + + # Concise summary for LLM + 'summary': self._generate_summary( + market_assessment, + recommendations, + current_price + ), + } + + logger.info( + f"Built LLM dataset for {symbol}: " + f"trend={trend_analysis.get('dominant_trend', {}).get('direction_cn', '?')}, " + f"processing={processing_time:.2f}s" + ) + + return dataset + + except Exception as e: + logger.error(f"Error building LLM dataset: {e}", exc_info=True) + return self._empty_dataset() + + def _fetch_multi_timeframe_data(self, symbol: str) -> Dict[str, pd.DataFrame]: + """Fetch K-line data for all timeframes""" + data = {} + + for tf, limit in KLINE_LIMITS.items(): + df = self.data_reader.fetch_historical_klines_from_api( + symbol=symbol, interval=tf, limit=limit + ) + if not df.empty: + # Add technical indicators + df = TechnicalIndicators.add_all_indicators(df) + data[tf] = df + + return data + + def _build_kline_data(self, mtf_data: Dict[str, pd.DataFrame]) -> Dict[str, List[Dict]]: + """Build K-line data in compact format for LLM""" + kline_data = {} + + for tf, df in mtf_data.items(): + if df.empty: + continue + + # Get appropriate number of candles + limit = KLINE_LIMITS.get(tf, 50) + df_limited = df.tail(limit) + + # Convert to compact format + klines = [] + for idx, row in df_limited.iterrows(): + kline = { + 't': idx.strftime('%Y-%m-%d %H:%M'), + 'o': round(row['open'], 2), + 'h': round(row['high'], 2), + 'l': round(row['low'], 2), + 'c': round(row['close'], 2), + 'v': round(row['volume'], 4), + } + klines.append(kline) + + kline_data[tf] = klines + + return kline_data + + def _generate_market_assessment( + self, + trend: Dict[str, Any], + sr: Dict[str, Any], + orderflow: Dict[str, Any], + momentum: Dict[str, Any] + ) -> Dict[str, Any]: + """Generate overall market assessment from all analyses""" + + # Trend bias + dominant_trend = trend.get('dominant_trend', {}) + trend_direction = dominant_trend.get('direction', 'sideways') + trend_confidence = dominant_trend.get('confidence', 0) + + # Momentum bias + momentum_alignment = momentum.get('alignment', {}) + momentum_status = momentum_alignment.get('status', 'conflicting') + + # Orderflow bias + orderflow_direction = orderflow.get('assessment', {}).get('direction', 'neutral') + + # Calculate overall bias score (-1 to +1) + bias_score = 0 + bias_factors = [] + + # Trend contribution (40%) + if trend_direction == 'uptrend': + bias_score += 0.4 * trend_confidence + bias_factors.append(('trend', 0.4 * trend_confidence)) + elif trend_direction == 'downtrend': + bias_score -= 0.4 * trend_confidence + bias_factors.append(('trend', -0.4 * trend_confidence)) + + # Momentum contribution (30%) + if momentum_status in ['aligned_bullish']: + bias_score += 0.3 + bias_factors.append(('momentum', 0.3)) + elif momentum_status in ['aligned_bearish']: + bias_score -= 0.3 + bias_factors.append(('momentum', -0.3)) + elif momentum_status == 'mixed_bullish': + bias_score += 0.15 + bias_factors.append(('momentum', 0.15)) + elif momentum_status == 'mixed_bearish': + bias_score -= 0.15 + bias_factors.append(('momentum', -0.15)) + + # Orderflow contribution (30%) + if orderflow_direction == 'bulls': + bias_score += 0.3 + bias_factors.append(('orderflow', 0.3)) + elif orderflow_direction == 'bears': + bias_score -= 0.3 + bias_factors.append(('orderflow', -0.3)) + + # Determine overall bias + if bias_score > 0.4: + overall_bias = 'strongly_bullish' + overall_bias_cn = '强烈看涨' + elif bias_score > 0.2: + overall_bias = 'bullish' + overall_bias_cn = '看涨' + elif bias_score > 0.05: + overall_bias = 'slightly_bullish' + overall_bias_cn = '略偏多' + elif bias_score < -0.4: + overall_bias = 'strongly_bearish' + overall_bias_cn = '强烈看跌' + elif bias_score < -0.2: + overall_bias = 'bearish' + overall_bias_cn = '看跌' + elif bias_score < -0.05: + overall_bias = 'slightly_bearish' + overall_bias_cn = '略偏空' + else: + overall_bias = 'neutral' + overall_bias_cn = '中性观望' + + # Check for divergences (warning signal) + div_confluence = momentum.get('divergence_confluence', {}) + has_warning = div_confluence.get('has_confluence', False) + warning_type = div_confluence.get('type', 'none') + + return { + 'overall_bias': overall_bias, + 'overall_bias_cn': overall_bias_cn, + 'bias_score': round(bias_score, 2), + 'confidence': round(abs(bias_score) * 100, 0), + 'bias_factors': bias_factors, + 'has_warning': has_warning, + 'warning': warning_type if has_warning else None, + 'warning_cn': div_confluence.get('type_cn') if has_warning else None, + 'components': { + 'trend': { + 'direction': trend_direction, + 'direction_cn': dominant_trend.get('direction_cn', '?'), + 'confidence': trend_confidence, + }, + 'momentum': { + 'status': momentum_status, + 'status_cn': momentum_alignment.get('status_cn', '?'), + }, + 'orderflow': { + 'direction': orderflow_direction, + 'direction_cn': orderflow.get('assessment', {}).get('direction_cn', '?'), + }, + }, + } + + def _generate_recommendations( + self, + trend: Dict[str, Any], + sr: Dict[str, Any], + orderflow: Dict[str, Any], + momentum: Dict[str, Any], + current_price: float + ) -> Dict[str, Any]: + """Generate trading recommendations for different timeframes""" + + # Get key levels + strongest_support = sr.get('confluence', {}).get('supports', [{}])[0] if sr.get('confluence', {}).get('supports') else None + strongest_resistance = sr.get('confluence', {}).get('resistances', [{}])[0] if sr.get('confluence', {}).get('resistances') else None + + # Get trend info + dominant = trend.get('dominant_trend', {}) + trading_bias = trend.get('trading_bias', {}) + + # Short-term (5m-1h) recommendation + short_term = self._generate_timeframe_recommendation( + 'short', + trend.get('timeframes', {}).get('15m', {}), + momentum.get('timeframes', {}).get('15m', {}), + orderflow, + current_price, + strongest_support, + strongest_resistance + ) + + # Medium-term (4h-1d) recommendation + medium_term = self._generate_timeframe_recommendation( + 'medium', + trend.get('timeframes', {}).get('4h', {}), + momentum.get('timeframes', {}).get('4h', {}), + orderflow, + current_price, + strongest_support, + strongest_resistance + ) + + # Long-term (1d-1w) recommendation + long_term = self._generate_timeframe_recommendation( + 'long', + trend.get('timeframes', {}).get('1d', {}), + momentum.get('timeframes', {}).get('1d', {}), + orderflow, + current_price, + strongest_support, + strongest_resistance + ) + + return { + 'short_term': short_term, + 'medium_term': medium_term, + 'long_term': long_term, + 'key_levels': { + 'strongest_support': strongest_support.get('level') if strongest_support else None, + 'strongest_resistance': strongest_resistance.get('level') if strongest_resistance else None, + 'support_distance_pct': strongest_support.get('distance_pct') if strongest_support else None, + 'resistance_distance_pct': strongest_resistance.get('distance_pct') if strongest_resistance else None, + }, + 'overall': trading_bias.get('recommendation', '方向不明,建议观望'), + } + + def _generate_timeframe_recommendation( + self, + timeframe_type: str, + trend_info: Dict[str, Any], + momentum_info: Dict[str, Any], + orderflow: Dict[str, Any], + current_price: float, + support: Optional[Dict], + resistance: Optional[Dict] + ) -> Dict[str, Any]: + """Generate recommendation for a specific timeframe type""" + + trend_direction = trend_info.get('direction', 'sideways') + trend_strength = trend_info.get('strength', 'weak') + momentum_direction = momentum_info.get('assessment', {}).get('direction', 'neutral') + + # Determine action + if trend_direction == 'uptrend' and momentum_direction in ['bullish', 'neutral']: + action = 'long' + action_cn = '做多' + if trend_strength in ['strong', 'very_strong']: + confidence = 'high' + else: + confidence = 'moderate' + elif trend_direction == 'downtrend' and momentum_direction in ['bearish', 'neutral']: + action = 'short' + action_cn = '做空' + if trend_strength in ['strong', 'very_strong']: + confidence = 'high' + else: + confidence = 'moderate' + else: + action = 'wait' + action_cn = '观望' + confidence = 'low' + + # Entry zone + if action == 'long' and support: + entry_zone = { + 'ideal': support.get('level'), + 'current': current_price, + 'description': f"理想入场: ${support.get('level', 0):,.0f}附近 (支撑区)" + } + elif action == 'short' and resistance: + entry_zone = { + 'ideal': resistance.get('level'), + 'current': current_price, + 'description': f"理想入场: ${resistance.get('level', 0):,.0f}附近 (阻力区)" + } + else: + entry_zone = { + 'ideal': current_price, + 'current': current_price, + 'description': "无明确入场区域" + } + + return { + 'action': action, + 'action_cn': action_cn, + 'confidence': confidence, + 'confidence_cn': '高' if confidence == 'high' else '中等' if confidence == 'moderate' else '低', + 'trend': trend_info.get('direction_cn', '?'), + 'trend_strength': trend_info.get('strength_cn', '?'), + 'momentum': momentum_info.get('assessment', {}).get('direction_cn', '?'), + 'entry_zone': entry_zone, + 'reasoning': self._generate_reasoning( + trend_direction, trend_strength, momentum_direction, action + ), + } + + def _generate_reasoning( + self, + trend_direction: str, + trend_strength: str, + momentum_direction: str, + action: str + ) -> str: + """Generate reasoning for recommendation""" + if action == 'long': + return f"趋势{self._cn(trend_direction)}({self._cn(trend_strength)}), 动能{self._cn(momentum_direction)}, 适合做多" + elif action == 'short': + return f"趋势{self._cn(trend_direction)}({self._cn(trend_strength)}), 动能{self._cn(momentum_direction)}, 适合做空" + else: + if trend_direction == 'sideways': + return "市场处于震荡盘整,方向不明,建议观望" + else: + return f"趋势与动能存在分歧,等待信号确认" + + def _cn(self, value: str) -> str: + """Convert English value to Chinese""" + mapping = { + 'uptrend': '上涨', + 'downtrend': '下跌', + 'sideways': '震荡', + 'strong': '强', + 'moderate': '中等', + 'weak': '弱', + 'very_strong': '非常强', + 'bullish': '看涨', + 'bearish': '看跌', + 'neutral': '中性', + } + return mapping.get(value, value) + + def _generate_summary( + self, + assessment: Dict[str, Any], + recommendations: Dict[str, Any], + current_price: float + ) -> str: + """Generate concise summary for LLM""" + parts = [] + + # Current price + parts.append(f"当前价格: ${current_price:,.2f}") + + # Overall bias + bias_cn = assessment.get('overall_bias_cn', '?') + confidence = assessment.get('confidence', 0) + parts.append(f"市场偏向: {bias_cn} (置信度{confidence:.0f}%)") + + # Warning if any + if assessment.get('has_warning'): + warning_cn = assessment.get('warning_cn', '') + parts.append(f"警告: {warning_cn}") + + # Key levels + key_levels = recommendations.get('key_levels', {}) + if key_levels.get('strongest_support'): + parts.append(f"关键支撑: ${key_levels['strongest_support']:,.0f}") + if key_levels.get('strongest_resistance'): + parts.append(f"关键阻力: ${key_levels['strongest_resistance']:,.0f}") + + # Recommendation + overall = recommendations.get('overall', '') + if overall: + parts.append(f"建议: {overall}") + + return "; ".join(parts) + + def _calculate_price_change( + self, + df: Optional[pd.DataFrame], + periods: int + ) -> Optional[Dict[str, float]]: + """Calculate price change over N periods""" + if df is None or len(df) < periods: + return None + + current = float(df.iloc[-1]['close']) + past = float(df.iloc[-periods]['close']) + change = current - past + change_pct = (change / past) * 100 if past > 0 else 0 + + return { + 'absolute': round(change, 2), + 'percentage': round(change_pct, 2), + } + + def _assess_data_quality( + self, + mtf_data: Dict[str, pd.DataFrame], + depth_data: Optional[Dict] + ) -> Dict[str, Any]: + """Assess quality of available data""" + quality = { + 'klines': {}, + 'orderbook': depth_data is not None, + 'overall': 'good', + } + + missing_tfs = [] + for tf in ['5m', '15m', '1h', '4h', '1d']: + if tf in mtf_data and not mtf_data[tf].empty: + quality['klines'][tf] = { + 'available': True, + 'count': len(mtf_data[tf]), + } + else: + quality['klines'][tf] = {'available': False} + missing_tfs.append(tf) + + if len(missing_tfs) > 2: + quality['overall'] = 'poor' + elif len(missing_tfs) > 0 or not depth_data: + quality['overall'] = 'fair' + + return quality + + def _empty_dataset(self) -> Dict[str, Any]: + """Return empty dataset when data is unavailable""" + return { + 'metadata': { + 'symbol': self.symbol, + 'timestamp': datetime.now().isoformat(), + 'error': 'Failed to fetch data', + }, + 'price': {'current': 0}, + 'trend': {}, + 'support_resistance': {}, + 'orderflow': {}, + 'momentum': {}, + 'klines': {}, + 'assessment': { + 'overall_bias': 'unknown', + 'overall_bias_cn': '数据不可用', + }, + 'recommendations': {}, + 'summary': '数据获取失败,无法进行分析', + } diff --git a/analysis/momentum_analyzer.py b/analysis/momentum_analyzer.py new file mode 100644 index 0000000..c42135f --- /dev/null +++ b/analysis/momentum_analyzer.py @@ -0,0 +1,796 @@ +""" +Technical Indicator Synthesis - Momentum and Volume-Price Analysis +Provides structured technical analysis for LLM decision making +""" +import logging +from typing import Dict, Any, List, Optional +import pandas as pd +import numpy as np + +from .indicators import TechnicalIndicators +from .config import config + + +logger = logging.getLogger(__name__) + + +class MomentumAnalyzer: + """ + Technical indicator synthesis and divergence detection + + Provides: + 1. Momentum analysis (RSI, MACD, Stochastic) + 2. Volume-Price relationship + 3. Divergence detection (bullish/bearish) + 4. Overbought/Oversold conditions + 5. LLM-ready structured output + """ + + @staticmethod + def analyze_momentum(df: pd.DataFrame, timeframe: str = '1h') -> Dict[str, Any]: + """ + Perform comprehensive momentum analysis + + Args: + df: DataFrame with OHLCV data + timeframe: Timeframe for context + + Returns: + Complete momentum analysis for LLM + """ + if df.empty or len(df) < 50: + return MomentumAnalyzer._empty_result(timeframe) + + # Ensure indicators are calculated + if 'rsi' not in df.columns: + df = TechnicalIndicators.add_all_indicators(df) + + latest = df.iloc[-1] + current_price = float(latest['close']) + + # 1. RSI Analysis + rsi_analysis = MomentumAnalyzer._analyze_rsi(df) + + # 2. MACD Analysis + macd_analysis = MomentumAnalyzer._analyze_macd(df) + + # 3. Stochastic Analysis + stoch_analysis = MomentumAnalyzer._analyze_stochastic(df) + + # 4. Volume-Price Analysis + volume_analysis = MomentumAnalyzer._analyze_volume_price(df) + + # 5. Divergence Detection + divergences = MomentumAnalyzer._detect_divergences(df) + + # 6. Overall Momentum Assessment + assessment = MomentumAnalyzer._assess_overall_momentum( + rsi_analysis, macd_analysis, stoch_analysis, volume_analysis, divergences + ) + + # 7. Generate Summary + summary = MomentumAnalyzer._generate_summary( + rsi_analysis, macd_analysis, volume_analysis, divergences, assessment + ) + + return { + 'timeframe': timeframe, + 'current_price': round(current_price, 2), + + # Individual indicators + 'rsi': rsi_analysis, + 'macd': macd_analysis, + 'stochastic': stoch_analysis, + + # Volume analysis + 'volume': volume_analysis, + + # Divergences + 'divergences': divergences, + + # Overall assessment + 'assessment': assessment, + + # LLM-ready summary + 'summary': summary, + } + + @staticmethod + def analyze_multi_timeframe_momentum( + mtf_data: Dict[str, pd.DataFrame] + ) -> Dict[str, Any]: + """ + Analyze momentum across multiple timeframes + + Args: + mtf_data: Dict mapping timeframe to DataFrame + + Returns: + Multi-timeframe momentum analysis + """ + results = {} + momentum_directions = {} + + for tf, df in mtf_data.items(): + if df.empty: + continue + + momentum_info = MomentumAnalyzer.analyze_momentum(df, tf) + results[tf] = momentum_info + momentum_directions[tf] = momentum_info['assessment'].get('direction', 'neutral') + + # Calculate cross-timeframe alignment + alignment = MomentumAnalyzer._calculate_mtf_alignment(momentum_directions) + + # Find divergence confluence + divergence_confluence = MomentumAnalyzer._find_divergence_confluence(results) + + return { + 'timeframes': results, + 'alignment': alignment, + 'divergence_confluence': divergence_confluence, + 'summary': MomentumAnalyzer._generate_mtf_summary( + results, alignment, divergence_confluence + ), + } + + @staticmethod + def _analyze_rsi(df: pd.DataFrame) -> Dict[str, Any]: + """Analyze RSI indicator""" + latest = df.iloc[-1] + rsi = float(latest.get('rsi', 50)) + + # RSI trend (last 5 periods) + if len(df) >= 5: + rsi_values = df['rsi'].tail(5).values + rsi_change = rsi_values[-1] - rsi_values[0] + if rsi_change > 5: + trend = 'rising' + trend_cn = '上升' + elif rsi_change < -5: + trend = 'falling' + trend_cn = '下降' + else: + trend = 'flat' + trend_cn = '平稳' + else: + trend = 'unknown' + trend_cn = '未知' + rsi_change = 0 + + # RSI zone + if rsi >= 80: + zone = 'extreme_overbought' + zone_cn = '极度超买' + signal = 'bearish' + elif rsi >= 70: + zone = 'overbought' + zone_cn = '超买' + signal = 'bearish_warning' + elif rsi >= 60: + zone = 'strong' + zone_cn = '强势' + signal = 'bullish' + elif rsi >= 40: + zone = 'neutral' + zone_cn = '中性' + signal = 'neutral' + elif rsi >= 30: + zone = 'weak' + zone_cn = '弱势' + signal = 'bearish' + elif rsi >= 20: + zone = 'oversold' + zone_cn = '超卖' + signal = 'bullish_warning' + else: + zone = 'extreme_oversold' + zone_cn = '极度超卖' + signal = 'bullish' + + return { + 'value': round(rsi, 1), + 'zone': zone, + 'zone_cn': zone_cn, + 'trend': trend, + 'trend_cn': trend_cn, + 'change': round(rsi_change, 1), + 'signal': signal, + } + + @staticmethod + def _analyze_macd(df: pd.DataFrame) -> Dict[str, Any]: + """Analyze MACD indicator""" + if len(df) < 2: + return {'signal': 'unknown'} + + latest = df.iloc[-1] + prev = df.iloc[-2] + + macd_line = float(latest.get('macd', 0)) + signal_line = float(latest.get('macd_signal', 0)) + histogram = float(latest.get('macd_hist', 0)) + prev_histogram = float(prev.get('macd_hist', 0)) + + # Cross detection + if histogram > 0 and prev_histogram <= 0: + cross = 'golden_cross' + cross_cn = '金叉' + elif histogram < 0 and prev_histogram >= 0: + cross = 'death_cross' + cross_cn = '死叉' + else: + cross = 'none' + cross_cn = '无交叉' + + # Histogram momentum + if histogram > 0: + if histogram > prev_histogram: + momentum = 'bullish_expanding' + momentum_cn = '多头动能增强' + else: + momentum = 'bullish_contracting' + momentum_cn = '多头动能减弱' + else: + if histogram < prev_histogram: + momentum = 'bearish_expanding' + momentum_cn = '空头动能增强' + else: + momentum = 'bearish_contracting' + momentum_cn = '空头动能减弱' + + # Signal + if cross == 'golden_cross': + signal = 'bullish' + elif cross == 'death_cross': + signal = 'bearish' + elif histogram > 0 and histogram > prev_histogram: + signal = 'bullish' + elif histogram < 0 and histogram < prev_histogram: + signal = 'bearish' + else: + signal = 'neutral' + + return { + 'macd_line': round(macd_line, 4), + 'signal_line': round(signal_line, 4), + 'histogram': round(histogram, 4), + 'cross': cross, + 'cross_cn': cross_cn, + 'momentum': momentum, + 'momentum_cn': momentum_cn, + 'signal': signal, + } + + @staticmethod + def _analyze_stochastic(df: pd.DataFrame) -> Dict[str, Any]: + """Analyze Stochastic oscillator""" + latest = df.iloc[-1] + + stoch_k = float(latest.get('stoch_k', 50)) + stoch_d = float(latest.get('stoch_d', 50)) + + # Zone + if stoch_k >= 80: + zone = 'overbought' + zone_cn = '超买' + elif stoch_k <= 20: + zone = 'oversold' + zone_cn = '超卖' + else: + zone = 'neutral' + zone_cn = '中性' + + # Cross + if len(df) >= 2: + prev = df.iloc[-2] + prev_k = float(prev.get('stoch_k', 50)) + prev_d = float(prev.get('stoch_d', 50)) + + if stoch_k > stoch_d and prev_k <= prev_d: + cross = 'bullish' + cross_cn = '金叉' + elif stoch_k < stoch_d and prev_k >= prev_d: + cross = 'bearish' + cross_cn = '死叉' + else: + cross = 'none' + cross_cn = '无' + else: + cross = 'unknown' + cross_cn = '未知' + + return { + 'k': round(stoch_k, 1), + 'd': round(stoch_d, 1), + 'zone': zone, + 'zone_cn': zone_cn, + 'cross': cross, + 'cross_cn': cross_cn, + } + + @staticmethod + def _analyze_volume_price(df: pd.DataFrame) -> Dict[str, Any]: + """Analyze volume-price relationship""" + if len(df) < 20: + return {'status': 'insufficient_data'} + + latest = df.iloc[-1] + current_volume = float(latest['volume']) + avg_volume = float(df['volume'].tail(20).mean()) + volume_ratio = current_volume / avg_volume if avg_volume > 0 else 1 + + # Price change + price_change = (float(latest['close']) - float(df.iloc[-2]['close'])) / float(df.iloc[-2]['close']) * 100 + + # OBV analysis + obv = float(latest.get('obv', 0)) + if len(df) >= 10: + obv_10_ago = float(df.iloc[-10].get('obv', 0)) + obv_trend = 'rising' if obv > obv_10_ago else 'falling' if obv < obv_10_ago else 'flat' + obv_trend_cn = '上升' if obv_trend == 'rising' else '下降' if obv_trend == 'falling' else '平稳' + else: + obv_trend = 'unknown' + obv_trend_cn = '未知' + + # Volume status + if volume_ratio > 2.0: + volume_status = 'extremely_high' + volume_status_cn = '异常放量' + elif volume_ratio > 1.5: + volume_status = 'high' + volume_status_cn = '明显放量' + elif volume_ratio > 1.1: + volume_status = 'above_average' + volume_status_cn = '温和放量' + elif volume_ratio < 0.5: + volume_status = 'very_low' + volume_status_cn = '显著缩量' + elif volume_ratio < 0.8: + volume_status = 'low' + volume_status_cn = '温和缩量' + else: + volume_status = 'normal' + volume_status_cn = '正常' + + # Volume-price confirmation + if price_change > 0.5 and volume_ratio > 1.2: + confirmation = 'bullish_confirmed' + confirmation_cn = '量价齐升(看涨确认)' + elif price_change < -0.5 and volume_ratio > 1.2: + confirmation = 'bearish_confirmed' + confirmation_cn = '放量下跌(看跌确认)' + elif price_change > 0.5 and volume_ratio < 0.8: + confirmation = 'bullish_weak' + confirmation_cn = '缩量上涨(动能不足)' + elif price_change < -0.5 and volume_ratio < 0.8: + confirmation = 'bearish_exhaustion' + confirmation_cn = '缩量下跌(卖压减弱)' + else: + confirmation = 'neutral' + confirmation_cn = '量价中性' + + return { + 'current_volume': round(current_volume, 2), + 'avg_volume': round(avg_volume, 2), + 'volume_ratio': round(volume_ratio, 2), + 'volume_status': volume_status, + 'volume_status_cn': volume_status_cn, + 'price_change_pct': round(price_change, 2), + 'obv_trend': obv_trend, + 'obv_trend_cn': obv_trend_cn, + 'confirmation': confirmation, + 'confirmation_cn': confirmation_cn, + } + + @staticmethod + def _detect_divergences(df: pd.DataFrame) -> Dict[str, Any]: + """Detect bullish and bearish divergences""" + if len(df) < 30: + return {'detected': False, 'divergences': []} + + divergences = [] + + # RSI divergence + rsi_divergence = MomentumAnalyzer._check_rsi_divergence(df) + if rsi_divergence: + divergences.append(rsi_divergence) + + # MACD divergence + macd_divergence = MomentumAnalyzer._check_macd_divergence(df) + if macd_divergence: + divergences.append(macd_divergence) + + # OBV divergence + obv_divergence = MomentumAnalyzer._check_obv_divergence(df) + if obv_divergence: + divergences.append(obv_divergence) + + # Determine overall divergence signal + bullish_count = sum(1 for d in divergences if d['type'] == 'bullish') + bearish_count = sum(1 for d in divergences if d['type'] == 'bearish') + + if bullish_count > bearish_count and bullish_count > 0: + overall = 'bullish' + overall_cn = '看涨背离' + elif bearish_count > bullish_count and bearish_count > 0: + overall = 'bearish' + overall_cn = '看跌背离' + else: + overall = 'none' + overall_cn = '无明显背离' + + return { + 'detected': len(divergences) > 0, + 'divergences': divergences, + 'bullish_count': bullish_count, + 'bearish_count': bearish_count, + 'overall': overall, + 'overall_cn': overall_cn, + 'strength': 'strong' if len(divergences) >= 2 else 'moderate' if len(divergences) == 1 else 'none', + } + + @staticmethod + def _check_rsi_divergence(df: pd.DataFrame, lookback: int = 20) -> Optional[Dict[str, Any]]: + """Check for RSI divergence""" + recent = df.tail(lookback) + prices = recent['close'].values + rsi = recent['rsi'].values + + # Find local extrema + price_lows_idx = MomentumAnalyzer._find_local_minima(prices) + price_highs_idx = MomentumAnalyzer._find_local_maxima(prices) + + # Bullish divergence: Price making lower lows, RSI making higher lows + if len(price_lows_idx) >= 2: + idx1, idx2 = price_lows_idx[-2], price_lows_idx[-1] + if prices[idx2] < prices[idx1] and rsi[idx2] > rsi[idx1]: + return { + 'type': 'bullish', + 'type_cn': '看涨背离', + 'indicator': 'RSI', + 'description': '价格创新低但RSI抬高,可能反转向上', + } + + # Bearish divergence: Price making higher highs, RSI making lower highs + if len(price_highs_idx) >= 2: + idx1, idx2 = price_highs_idx[-2], price_highs_idx[-1] + if prices[idx2] > prices[idx1] and rsi[idx2] < rsi[idx1]: + return { + 'type': 'bearish', + 'type_cn': '看跌背离', + 'indicator': 'RSI', + 'description': '价格创新高但RSI走低,可能反转向下', + } + + return None + + @staticmethod + def _check_macd_divergence(df: pd.DataFrame, lookback: int = 20) -> Optional[Dict[str, Any]]: + """Check for MACD histogram divergence""" + recent = df.tail(lookback) + prices = recent['close'].values + macd_hist = recent['macd_hist'].values + + price_lows_idx = MomentumAnalyzer._find_local_minima(prices) + price_highs_idx = MomentumAnalyzer._find_local_maxima(prices) + + # Bullish divergence + if len(price_lows_idx) >= 2: + idx1, idx2 = price_lows_idx[-2], price_lows_idx[-1] + if prices[idx2] < prices[idx1] and macd_hist[idx2] > macd_hist[idx1]: + return { + 'type': 'bullish', + 'type_cn': '看涨背离', + 'indicator': 'MACD', + 'description': '价格创新低但MACD柱状图抬高,动能减弱', + } + + # Bearish divergence + if len(price_highs_idx) >= 2: + idx1, idx2 = price_highs_idx[-2], price_highs_idx[-1] + if prices[idx2] > prices[idx1] and macd_hist[idx2] < macd_hist[idx1]: + return { + 'type': 'bearish', + 'type_cn': '看跌背离', + 'indicator': 'MACD', + 'description': '价格创新高但MACD柱状图走低,动能减弱', + } + + return None + + @staticmethod + def _check_obv_divergence(df: pd.DataFrame, lookback: int = 20) -> Optional[Dict[str, Any]]: + """Check for OBV divergence""" + recent = df.tail(lookback) + prices = recent['close'].values + obv = recent['obv'].values + + price_lows_idx = MomentumAnalyzer._find_local_minima(prices) + price_highs_idx = MomentumAnalyzer._find_local_maxima(prices) + + # Bullish divergence + if len(price_lows_idx) >= 2: + idx1, idx2 = price_lows_idx[-2], price_lows_idx[-1] + if prices[idx2] < prices[idx1] and obv[idx2] > obv[idx1]: + return { + 'type': 'bullish', + 'type_cn': '看涨背离', + 'indicator': 'OBV', + 'description': '价格创新低但OBV抬高,资金流入', + } + + # Bearish divergence + if len(price_highs_idx) >= 2: + idx1, idx2 = price_highs_idx[-2], price_highs_idx[-1] + if prices[idx2] > prices[idx1] and obv[idx2] < obv[idx1]: + return { + 'type': 'bearish', + 'type_cn': '看跌背离', + 'indicator': 'OBV', + 'description': '价格创新高但OBV走低,资金流出', + } + + return None + + @staticmethod + def _find_local_minima(arr: np.ndarray, window: int = 3) -> List[int]: + """Find local minima indices""" + minima = [] + for i in range(window, len(arr) - window): + if all(arr[i] <= arr[i-j] for j in range(1, window+1)) and \ + all(arr[i] <= arr[i+j] for j in range(1, window+1)): + minima.append(i) + return minima + + @staticmethod + def _find_local_maxima(arr: np.ndarray, window: int = 3) -> List[int]: + """Find local maxima indices""" + maxima = [] + for i in range(window, len(arr) - window): + if all(arr[i] >= arr[i-j] for j in range(1, window+1)) and \ + all(arr[i] >= arr[i+j] for j in range(1, window+1)): + maxima.append(i) + return maxima + + @staticmethod + def _assess_overall_momentum( + rsi: Dict[str, Any], + macd: Dict[str, Any], + stoch: Dict[str, Any], + volume: Dict[str, Any], + divergences: Dict[str, Any] + ) -> Dict[str, Any]: + """Assess overall momentum direction and strength""" + # Collect signals + signals = [] + + # RSI signal + rsi_signal = rsi.get('signal', 'neutral') + if rsi_signal in ['bullish', 'bullish_warning']: + signals.append(1) + elif rsi_signal in ['bearish', 'bearish_warning']: + signals.append(-1) + else: + signals.append(0) + + # MACD signal + macd_signal = macd.get('signal', 'neutral') + if macd_signal == 'bullish': + signals.append(1) + elif macd_signal == 'bearish': + signals.append(-1) + else: + signals.append(0) + + # Volume confirmation + confirmation = volume.get('confirmation', 'neutral') + if confirmation in ['bullish_confirmed']: + signals.append(1) + elif confirmation in ['bearish_confirmed']: + signals.append(-1) + else: + signals.append(0) + + # Divergence override + div_overall = divergences.get('overall', 'none') + divergence_weight = 0 + if div_overall == 'bullish': + divergence_weight = 1 + elif div_overall == 'bearish': + divergence_weight = -1 + + # Calculate overall direction + avg_signal = np.mean(signals) + combined = avg_signal * 0.7 + divergence_weight * 0.3 + + if combined > 0.3: + direction = 'bullish' + direction_cn = '看涨' + elif combined < -0.3: + direction = 'bearish' + direction_cn = '看跌' + else: + direction = 'neutral' + direction_cn = '中性' + + # Strength based on signal consistency + bullish_signals = sum(1 for s in signals if s > 0) + bearish_signals = sum(1 for s in signals if s < 0) + max_aligned = max(bullish_signals, bearish_signals) + + if max_aligned == len(signals): + strength = 'strong' + strength_cn = '强' + elif max_aligned >= len(signals) - 1: + strength = 'moderate' + strength_cn = '中等' + else: + strength = 'weak' + strength_cn = '弱' + + return { + 'direction': direction, + 'direction_cn': direction_cn, + 'strength': strength, + 'strength_cn': strength_cn, + 'combined_score': round(combined, 2), + 'signal_alignment': max_aligned / len(signals) if signals else 0, + 'has_divergence_warning': div_overall != 'none', + } + + @staticmethod + def _calculate_mtf_alignment(directions: Dict[str, str]) -> Dict[str, Any]: + """Calculate multi-timeframe momentum alignment""" + if not directions: + return {'status': 'no_data'} + + bullish = sum(1 for d in directions.values() if d == 'bullish') + bearish = sum(1 for d in directions.values() if d == 'bearish') + total = len(directions) + + if bullish >= total * 0.7: + status = 'aligned_bullish' + status_cn = '多周期看涨一致' + elif bearish >= total * 0.7: + status = 'aligned_bearish' + status_cn = '多周期看跌一致' + elif bullish > bearish: + status = 'mixed_bullish' + status_cn = '偏向看涨但有分歧' + elif bearish > bullish: + status = 'mixed_bearish' + status_cn = '偏向看跌但有分歧' + else: + status = 'conflicting' + status_cn = '多空分歧严重' + + return { + 'status': status, + 'status_cn': status_cn, + 'bullish_count': bullish, + 'bearish_count': bearish, + 'total': total, + 'alignment_score': max(bullish, bearish) / total if total > 0 else 0, + } + + @staticmethod + def _find_divergence_confluence(results: Dict[str, Dict]) -> Dict[str, Any]: + """Find divergences appearing across multiple timeframes""" + bullish_tfs = [] + bearish_tfs = [] + + for tf, data in results.items(): + div_info = data.get('divergences', {}) + if div_info.get('overall') == 'bullish': + bullish_tfs.append(tf) + elif div_info.get('overall') == 'bearish': + bearish_tfs.append(tf) + + has_confluence = len(bullish_tfs) >= 2 or len(bearish_tfs) >= 2 + + if len(bullish_tfs) >= 2: + confluence_type = 'bullish' + confluence_type_cn = '多周期看涨背离' + elif len(bearish_tfs) >= 2: + confluence_type = 'bearish' + confluence_type_cn = '多周期看跌背离' + else: + confluence_type = 'none' + confluence_type_cn = '无背离共振' + + return { + 'has_confluence': has_confluence, + 'type': confluence_type, + 'type_cn': confluence_type_cn, + 'bullish_timeframes': bullish_tfs, + 'bearish_timeframes': bearish_tfs, + } + + @staticmethod + def _generate_summary( + rsi: Dict[str, Any], + macd: Dict[str, Any], + volume: Dict[str, Any], + divergences: Dict[str, Any], + assessment: Dict[str, Any] + ) -> str: + """Generate LLM-readable summary""" + parts = [] + + # Overall assessment + direction_cn = assessment.get('direction_cn', '中性') + strength_cn = assessment.get('strength_cn', '弱') + parts.append(f"动能: {direction_cn}({strength_cn})") + + # RSI + rsi_value = rsi.get('value', 50) + rsi_zone_cn = rsi.get('zone_cn', '中性') + parts.append(f"RSI={rsi_value:.0f}({rsi_zone_cn})") + + # MACD + macd_momentum_cn = macd.get('momentum_cn', '') + if macd_momentum_cn: + parts.append(f"MACD: {macd_momentum_cn}") + + # Volume + confirmation_cn = volume.get('confirmation_cn', '') + if confirmation_cn: + parts.append(f"量价: {confirmation_cn}") + + # Divergence + if divergences.get('detected'): + overall_cn = divergences.get('overall_cn', '') + indicators = [d['indicator'] for d in divergences.get('divergences', [])] + parts.append(f"背离: {overall_cn}({','.join(indicators)})") + + return "; ".join(parts) + + @staticmethod + def _generate_mtf_summary( + results: Dict[str, Dict], + alignment: Dict[str, Any], + divergence_confluence: Dict[str, Any] + ) -> str: + """Generate multi-timeframe summary""" + parts = [] + + # Alignment + status_cn = alignment.get('status_cn', '未知') + parts.append(f"动能一致性: {status_cn}") + + # Key timeframe momentum + key_tfs = ['1h', '4h', '1d'] + tf_parts = [] + for tf in key_tfs: + if tf in results: + direction_cn = results[tf]['assessment'].get('direction_cn', '?') + tf_parts.append(f"{tf}={direction_cn}") + if tf_parts: + parts.append(f"关键周期: {', '.join(tf_parts)}") + + # Divergence confluence + if divergence_confluence.get('has_confluence'): + type_cn = divergence_confluence.get('type_cn', '') + parts.append(f"重要: {type_cn}") + + return "; ".join(parts) + + @staticmethod + def _empty_result(timeframe: str) -> Dict[str, Any]: + """Return empty result when data is insufficient""" + return { + 'timeframe': timeframe, + 'error': 'insufficient_data', + 'rsi': {'value': 50, 'zone_cn': '数据不足'}, + 'macd': {'signal': 'unknown'}, + 'stochastic': {}, + 'volume': {}, + 'divergences': {'detected': False}, + 'assessment': { + 'direction': 'unknown', + 'direction_cn': '数据不足', + 'strength': 'unknown', + }, + 'summary': '数据不足,无法分析', + } diff --git a/analysis/orderflow_enhanced.py b/analysis/orderflow_enhanced.py new file mode 100644 index 0000000..3c6f090 --- /dev/null +++ b/analysis/orderflow_enhanced.py @@ -0,0 +1,575 @@ +""" +Enhanced Order Flow Analyzer - Bull/Bear Battle Analysis +Provides structured order flow data for LLM decision making +""" +import logging +from typing import Dict, Any, List, Optional +import numpy as np + +from .config import config + + +logger = logging.getLogger(__name__) + + +class EnhancedOrderFlowAnalyzer: + """ + Enhanced order flow analyzer with bull/bear battle metrics + + Provides: + 1. Order book imbalance analysis + 2. Liquidity wall detection + 3. Bull/Bear battle intensity + 4. Absorption analysis + 5. LLM-ready structured output + """ + + # Price levels for liquidity analysis (% from current price) + LIQUIDITY_LEVELS = [0.001, 0.002, 0.005, 0.01, 0.02, 0.03] # 0.1% to 3% + + # Battle intensity thresholds + STRONG_IMBALANCE = 0.20 # 20% imbalance + MODERATE_IMBALANCE = 0.10 # 10% imbalance + + @staticmethod + def analyze_full_orderflow( + depth_data: Dict[str, Any], + current_price: float + ) -> Dict[str, Any]: + """ + Perform comprehensive order flow analysis + + Args: + depth_data: Order book data with 'bids' and 'asks' + current_price: Current market price + + Returns: + Complete order flow analysis for LLM + """ + if not depth_data or 'bids' not in depth_data or 'asks' not in depth_data: + return EnhancedOrderFlowAnalyzer._empty_result() + + bids = depth_data.get('bids', []) + asks = depth_data.get('asks', []) + + if not bids or not asks: + return EnhancedOrderFlowAnalyzer._empty_result() + + # 1. Basic imbalance + imbalance = EnhancedOrderFlowAnalyzer._calculate_imbalance(bids, asks) + + # 2. Liquidity distribution + liquidity = EnhancedOrderFlowAnalyzer._analyze_liquidity_distribution( + bids, asks, current_price + ) + + # 3. Wall detection + walls = EnhancedOrderFlowAnalyzer._detect_walls(bids, asks, current_price) + + # 4. Bull/Bear battle intensity + battle = EnhancedOrderFlowAnalyzer._analyze_battle_intensity( + bids, asks, current_price, imbalance, liquidity, walls + ) + + # 5. Absorption potential + absorption = EnhancedOrderFlowAnalyzer._analyze_absorption( + bids, asks, current_price + ) + + # 6. Generate summary + summary = EnhancedOrderFlowAnalyzer._generate_summary( + imbalance, battle, walls + ) + + return { + 'current_price': round(current_price, 2), + + # Order book imbalance + 'imbalance': imbalance, + + # Liquidity analysis + 'liquidity': liquidity, + + # Walls (support/resistance from order book) + 'walls': walls, + + # Bull/Bear battle + 'battle': battle, + + # Absorption analysis + 'absorption': absorption, + + # Overall assessment + 'assessment': { + 'direction': battle.get('dominant_side', 'neutral'), + 'direction_cn': EnhancedOrderFlowAnalyzer._direction_to_cn( + battle.get('dominant_side', 'neutral') + ), + 'intensity': battle.get('intensity', 'low'), + 'intensity_cn': battle.get('intensity_cn', '低'), + 'confidence': battle.get('confidence', 0.0), + }, + + # LLM-ready summary + 'summary': summary, + } + + @staticmethod + def _calculate_imbalance( + bids: List, + asks: List + ) -> Dict[str, Any]: + """Calculate order book imbalance metrics""" + # Volume-based + bid_volume = sum(float(qty) for _, qty in bids) + ask_volume = sum(float(qty) for _, qty in asks) + total_volume = bid_volume + ask_volume + + if total_volume == 0: + return {'ratio': 0, 'status': 'unknown'} + + volume_imbalance = (bid_volume - ask_volume) / total_volume + + # Value-based (weighted by price) + bid_value = sum(float(price) * float(qty) for price, qty in bids) + ask_value = sum(float(price) * float(qty) for price, qty in asks) + total_value = bid_value + ask_value + + if total_value > 0: + value_imbalance = (bid_value - ask_value) / total_value + else: + value_imbalance = 0 + + # Combined imbalance (weighted average) + combined = volume_imbalance * 0.4 + value_imbalance * 0.6 + + # Determine status + if combined > EnhancedOrderFlowAnalyzer.STRONG_IMBALANCE: + status = 'strong_buy' + status_cn = '强买盘' + elif combined > EnhancedOrderFlowAnalyzer.MODERATE_IMBALANCE: + status = 'moderate_buy' + status_cn = '偏买盘' + elif combined < -EnhancedOrderFlowAnalyzer.STRONG_IMBALANCE: + status = 'strong_sell' + status_cn = '强卖盘' + elif combined < -EnhancedOrderFlowAnalyzer.MODERATE_IMBALANCE: + status = 'moderate_sell' + status_cn = '偏卖盘' + else: + status = 'balanced' + status_cn = '平衡' + + return { + 'ratio': round(combined, 3), + 'ratio_pct': round(combined * 100, 1), + 'volume_imbalance': round(volume_imbalance, 3), + 'value_imbalance': round(value_imbalance, 3), + 'status': status, + 'status_cn': status_cn, + 'bid_volume': round(bid_volume, 4), + 'ask_volume': round(ask_volume, 4), + 'bid_value_usd': round(bid_value, 2), + 'ask_value_usd': round(ask_value, 2), + } + + @staticmethod + def _analyze_liquidity_distribution( + bids: List, + asks: List, + current_price: float + ) -> Dict[str, Any]: + """Analyze liquidity distribution at different price levels""" + bid_liquidity = {} + ask_liquidity = {} + + for level_pct in EnhancedOrderFlowAnalyzer.LIQUIDITY_LEVELS: + bid_threshold = current_price * (1 - level_pct) + ask_threshold = current_price * (1 + level_pct) + + bid_vol = sum( + float(qty) for price, qty in bids + if float(price) >= bid_threshold + ) + ask_vol = sum( + float(qty) for price, qty in asks + if float(price) <= ask_threshold + ) + + level_key = f"{level_pct * 100:.1f}%" + bid_liquidity[level_key] = round(bid_vol, 4) + ask_liquidity[level_key] = round(ask_vol, 4) + + # Calculate liquidity concentration + # Higher concentration near price = tighter market + near_bid = bid_liquidity.get('0.1%', 0) + far_bid = bid_liquidity.get('2.0%', 0) + near_ask = ask_liquidity.get('0.1%', 0) + far_ask = ask_liquidity.get('2.0%', 0) + + if far_bid > 0: + bid_concentration = near_bid / far_bid + else: + bid_concentration = 0 + + if far_ask > 0: + ask_concentration = near_ask / far_ask + else: + ask_concentration = 0 + + # Spread analysis + best_bid = float(bids[0][0]) if bids else 0 + best_ask = float(asks[0][0]) if asks else 0 + spread = best_ask - best_bid + spread_pct = (spread / current_price * 100) if current_price > 0 else 0 + + return { + 'bid_levels': bid_liquidity, + 'ask_levels': ask_liquidity, + 'bid_concentration': round(bid_concentration, 3), + 'ask_concentration': round(ask_concentration, 3), + 'spread': round(spread, 2), + 'spread_pct': round(spread_pct, 4), + 'spread_status': 'tight' if spread_pct < 0.01 else 'normal' if spread_pct < 0.05 else 'wide', + 'spread_status_cn': '紧密' if spread_pct < 0.01 else '正常' if spread_pct < 0.05 else '宽松', + 'best_bid': round(best_bid, 2), + 'best_ask': round(best_ask, 2), + } + + @staticmethod + def _detect_walls( + bids: List, + asks: List, + current_price: float + ) -> Dict[str, Any]: + """Detect significant liquidity walls in the order book""" + # Calculate average order size for reference + all_orders = bids + asks + if not all_orders: + return {'bid_walls': [], 'ask_walls': []} + + avg_size = np.mean([float(qty) for _, qty in all_orders]) + std_size = np.std([float(qty) for _, qty in all_orders]) + + # Wall threshold: orders > mean + 2*std + wall_threshold = avg_size + 2 * std_size + + bid_walls = [] + ask_walls = [] + + # Find bid walls + for price, qty in bids: + price = float(price) + qty = float(qty) + if qty >= wall_threshold: + distance_pct = (current_price - price) / current_price * 100 + value_usd = price * qty + bid_walls.append({ + 'price': round(price, 2), + 'quantity': round(qty, 4), + 'value_usd': round(value_usd, 2), + 'distance_pct': round(distance_pct, 2), + 'strength': EnhancedOrderFlowAnalyzer._wall_strength(qty, avg_size, std_size), + }) + + # Find ask walls + for price, qty in asks: + price = float(price) + qty = float(qty) + if qty >= wall_threshold: + distance_pct = (price - current_price) / current_price * 100 + value_usd = price * qty + ask_walls.append({ + 'price': round(price, 2), + 'quantity': round(qty, 4), + 'value_usd': round(value_usd, 2), + 'distance_pct': round(distance_pct, 2), + 'strength': EnhancedOrderFlowAnalyzer._wall_strength(qty, avg_size, std_size), + }) + + # Sort by distance and limit + bid_walls.sort(key=lambda x: x['distance_pct']) + ask_walls.sort(key=lambda x: x['distance_pct']) + + # Find nearest significant wall + nearest_support_wall = bid_walls[0] if bid_walls else None + nearest_resistance_wall = ask_walls[0] if ask_walls else None + + return { + 'bid_walls': bid_walls[:5], + 'ask_walls': ask_walls[:5], + 'nearest_support_wall': nearest_support_wall, + 'nearest_resistance_wall': nearest_resistance_wall, + 'total_bid_walls': len(bid_walls), + 'total_ask_walls': len(ask_walls), + 'wall_imbalance': len(bid_walls) - len(ask_walls), + } + + @staticmethod + def _wall_strength(qty: float, avg: float, std: float) -> str: + """Determine wall strength based on standard deviations""" + if std == 0: + return 'moderate' + + z_score = (qty - avg) / std + + if z_score > 4: + return 'massive' + elif z_score > 3: + return 'strong' + elif z_score > 2: + return 'moderate' + else: + return 'weak' + + @staticmethod + def _analyze_battle_intensity( + bids: List, + asks: List, + current_price: float, + imbalance: Dict[str, Any], + liquidity: Dict[str, Any], + walls: Dict[str, Any] + ) -> Dict[str, Any]: + """ + Analyze bull/bear battle intensity + + Combines multiple factors: + - Order book imbalance + - Liquidity distribution + - Wall presence + - Spread tightness + """ + # Component scores (-1 to +1, positive = bullish) + scores = [] + + # 1. Imbalance score + imbalance_ratio = imbalance.get('ratio', 0) + scores.append(('imbalance', imbalance_ratio)) + + # 2. Wall score + wall_imbalance = walls.get('wall_imbalance', 0) + wall_score = np.tanh(wall_imbalance / 5) # Normalize + scores.append(('walls', wall_score)) + + # 3. Liquidity concentration score + bid_conc = liquidity.get('bid_concentration', 0) + ask_conc = liquidity.get('ask_concentration', 0) + conc_diff = bid_conc - ask_conc + conc_score = np.tanh(conc_diff) + scores.append(('concentration', conc_score)) + + # Calculate weighted battle score + weights = {'imbalance': 0.5, 'walls': 0.3, 'concentration': 0.2} + battle_score = sum(score * weights[name] for name, score in scores) + + # Determine dominant side + if battle_score > 0.2: + dominant = 'bulls' + elif battle_score < -0.2: + dominant = 'bears' + else: + dominant = 'neutral' + + # Calculate battle intensity (how actively contested) + # Higher when both sides have significant presence + bid_volume = imbalance.get('bid_volume', 0) + ask_volume = imbalance.get('ask_volume', 0) + + if max(bid_volume, ask_volume) > 0: + volume_balance = min(bid_volume, ask_volume) / max(bid_volume, ask_volume) + else: + volume_balance = 0 + + # Intensity is high when volumes are balanced but pressure is clear + intensity_score = volume_balance * (1 + abs(battle_score)) + + if intensity_score > 0.8: + intensity = 'high' + intensity_cn = '激烈' + elif intensity_score > 0.5: + intensity = 'moderate' + intensity_cn = '中等' + else: + intensity = 'low' + intensity_cn = '平淡' + + # Confidence based on data quality and consistency + confidence = min(1.0, abs(battle_score) * 2) + + return { + 'battle_score': round(battle_score, 3), + 'dominant_side': dominant, + 'intensity': intensity, + 'intensity_cn': intensity_cn, + 'intensity_score': round(intensity_score, 3), + 'confidence': round(confidence, 2), + 'component_scores': { + 'imbalance': round(imbalance_ratio, 3), + 'walls': round(wall_score, 3), + 'concentration': round(conc_score, 3), + }, + 'interpretation': EnhancedOrderFlowAnalyzer._interpret_battle( + dominant, intensity, battle_score + ), + } + + @staticmethod + def _analyze_absorption( + bids: List, + asks: List, + current_price: float + ) -> Dict[str, Any]: + """ + Analyze absorption capacity + + Estimates how much buying/selling pressure can be absorbed + """ + # Calculate total bid/ask capacity within 1% + bid_capacity = sum( + float(price) * float(qty) for price, qty in bids + if float(price) >= current_price * 0.99 + ) + ask_capacity = sum( + float(price) * float(qty) for price, qty in asks + if float(price) <= current_price * 1.01 + ) + + # Estimate absorption capability + if bid_capacity >= 1000000: # $1M+ + bid_strength = 'very_high' + bid_strength_cn = '非常强' + elif bid_capacity >= 500000: # $500K+ + bid_strength = 'high' + bid_strength_cn = '强' + elif bid_capacity >= 100000: # $100K+ + bid_strength = 'moderate' + bid_strength_cn = '中等' + else: + bid_strength = 'low' + bid_strength_cn = '弱' + + if ask_capacity >= 1000000: + ask_strength = 'very_high' + ask_strength_cn = '非常强' + elif ask_capacity >= 500000: + ask_strength = 'high' + ask_strength_cn = '强' + elif ask_capacity >= 100000: + ask_strength = 'moderate' + ask_strength_cn = '中等' + else: + ask_strength = 'low' + ask_strength_cn = '弱' + + return { + 'bid_capacity_usd': round(bid_capacity, 2), + 'ask_capacity_usd': round(ask_capacity, 2), + 'bid_strength': bid_strength, + 'bid_strength_cn': bid_strength_cn, + 'ask_strength': ask_strength, + 'ask_strength_cn': ask_strength_cn, + 'easier_direction': 'up' if ask_capacity < bid_capacity else 'down', + 'easier_direction_cn': '向上' if ask_capacity < bid_capacity else '向下', + } + + @staticmethod + def _interpret_battle( + dominant: str, + intensity: str, + score: float + ) -> str: + """Generate interpretation of bull/bear battle""" + if dominant == 'bulls': + if intensity == 'high': + return "多头占据主导且战斗激烈,买盘积极进场" + elif intensity == 'moderate': + return "多头略占优势,但空头仍有抵抗" + else: + return "多头偏强,但整体交投清淡" + elif dominant == 'bears': + if intensity == 'high': + return "空头占据主导且战斗激烈,卖盘积极出货" + elif intensity == 'moderate': + return "空头略占优势,但多头仍在防守" + else: + return "空头偏强,但整体交投清淡" + else: + if intensity == 'high': + return "多空激烈交战,方向不明" + else: + return "多空势均力敌,市场观望情绪浓厚" + + @staticmethod + def _generate_summary( + imbalance: Dict[str, Any], + battle: Dict[str, Any], + walls: Dict[str, Any] + ) -> str: + """Generate LLM-readable summary""" + parts = [] + + # Imbalance + status_cn = imbalance.get('status_cn', '平衡') + ratio_pct = imbalance.get('ratio_pct', 0) + parts.append(f"订单簿{status_cn} ({ratio_pct:+.1f}%)") + + # Battle + interpretation = battle.get('interpretation', '') + if interpretation: + parts.append(interpretation) + + # Walls + nearest_support = walls.get('nearest_support_wall') + nearest_resistance = walls.get('nearest_resistance_wall') + if nearest_support: + parts.append( + f"最近支撑墙: ${nearest_support['price']:,.0f} " + f"({nearest_support['distance_pct']:.1f}%)" + ) + if nearest_resistance: + parts.append( + f"最近阻力墙: ${nearest_resistance['price']:,.0f} " + f"({nearest_resistance['distance_pct']:.1f}%)" + ) + + return "; ".join(parts) + + @staticmethod + def _direction_to_cn(direction: str) -> str: + """Convert direction to Chinese""" + mapping = { + 'bulls': '多头主导', + 'bears': '空头主导', + 'neutral': '多空平衡', + } + return mapping.get(direction, '未知') + + @staticmethod + def _empty_result() -> Dict[str, Any]: + """Return empty result when data is unavailable""" + return { + 'current_price': 0, + 'imbalance': { + 'ratio': 0, + 'status': 'unknown', + 'status_cn': '数据不可用', + }, + 'liquidity': {}, + 'walls': { + 'bid_walls': [], + 'ask_walls': [], + }, + 'battle': { + 'dominant_side': 'unknown', + 'intensity': 'unknown', + }, + 'absorption': {}, + 'assessment': { + 'direction': 'unknown', + 'direction_cn': '数据不可用', + 'intensity': 'unknown', + 'confidence': 0, + }, + 'summary': '订单流数据不可用', + } diff --git a/analysis/trend_analyzer.py b/analysis/trend_analyzer.py new file mode 100644 index 0000000..887d20e --- /dev/null +++ b/analysis/trend_analyzer.py @@ -0,0 +1,581 @@ +""" +Multi-Timeframe Trend Analyzer - EMA alignment based trend detection +Provides structured trend data for LLM decision making +""" +import logging +from typing import Dict, Any, List, Optional +import pandas as pd +import numpy as np + +from .indicators import TechnicalIndicators +from .config import config + + +logger = logging.getLogger(__name__) + + +class TrendAnalyzer: + """ + Multi-timeframe trend analyzer using EMA alignment + + Analyzes trend direction, strength, and alignment across multiple timeframes + """ + + # EMA periods for trend analysis + EMA_PERIODS = [7, 20, 50, 100, 200] + + # Timeframe hierarchy (shorter to longer) + TIMEFRAME_ORDER = ['5m', '15m', '1h', '4h', '1d', '1w'] + + @staticmethod + def analyze_trend(df: pd.DataFrame, timeframe: str = '5m') -> Dict[str, Any]: + """ + Analyze trend for a single timeframe using EMA alignment + + Args: + df: DataFrame with OHLCV data + timeframe: Timeframe being analyzed + + Returns: + Trend analysis dict for LLM consumption + """ + if df.empty or len(df) < 200: + return TrendAnalyzer._empty_trend_result(timeframe) + + # Ensure indicators are calculated + if 'ema_7' not in df.columns: + df = TechnicalIndicators.add_all_indicators(df) + + latest = df.iloc[-1] + current_price = float(latest['close']) + + # Get EMA values + ema_values = {} + for period in TrendAnalyzer.EMA_PERIODS: + col_name = f'ema_{period}' + if col_name in df.columns: + ema_values[period] = float(latest[col_name]) + else: + # Calculate if missing + ema_values[period] = float(df['close'].ewm(span=period, adjust=False).mean().iloc[-1]) + + # Determine EMA alignment + alignment = TrendAnalyzer._analyze_ema_alignment(ema_values, current_price) + + # Calculate trend strength using ADX + adx = float(latest.get('adx', 0)) + trend_strength = TrendAnalyzer._calculate_trend_strength(adx, alignment) + + # Determine trend direction + trend_direction = TrendAnalyzer._determine_trend_direction(alignment, ema_values) + + # Calculate trend momentum (slope of EMA20) + ema_20_slope = TrendAnalyzer._calculate_ema_slope(df, 'ema_20', periods=5) + + # Detect trend phase + trend_phase = TrendAnalyzer._detect_trend_phase( + df, trend_direction, ema_values, current_price + ) + + return { + 'timeframe': timeframe, + 'direction': trend_direction, # 'uptrend', 'downtrend', 'sideways' + 'direction_cn': TrendAnalyzer._direction_to_chinese(trend_direction), + 'strength': trend_strength, # 'strong', 'moderate', 'weak' + 'strength_cn': TrendAnalyzer._strength_to_chinese(trend_strength), + 'phase': trend_phase, # 'trending', 'pullback', 'reversal', 'consolidation' + 'phase_cn': TrendAnalyzer._phase_to_chinese(trend_phase, trend_direction), + 'ema_alignment': alignment, # 'bullish_aligned', 'bearish_aligned', 'mixed' + 'ema_values': { + 'ema_7': round(ema_values.get(7, 0), 2), + 'ema_20': round(ema_values.get(20, 0), 2), + 'ema_50': round(ema_values.get(50, 0), 2), + 'ema_100': round(ema_values.get(100, 0), 2), + 'ema_200': round(ema_values.get(200, 0), 2), + }, + 'price_vs_emas': { + 'above_ema_7': current_price > ema_values.get(7, current_price), + 'above_ema_20': current_price > ema_values.get(20, current_price), + 'above_ema_50': current_price > ema_values.get(50, current_price), + 'above_ema_100': current_price > ema_values.get(100, current_price), + 'above_ema_200': current_price > ema_values.get(200, current_price), + }, + 'ema_20_slope': round(ema_20_slope, 4), + 'adx': round(adx, 1), + 'current_price': round(current_price, 2), + } + + @staticmethod + def analyze_multi_timeframe_trend( + mtf_data: Dict[str, pd.DataFrame] + ) -> Dict[str, Any]: + """ + Analyze trend across multiple timeframes and determine alignment + + Args: + mtf_data: Dict mapping timeframe to DataFrame + + Returns: + Multi-timeframe trend analysis for LLM + """ + results = {} + trend_directions = {} + + # Analyze each timeframe + for tf in TrendAnalyzer.TIMEFRAME_ORDER: + if tf in mtf_data and not mtf_data[tf].empty: + trend_info = TrendAnalyzer.analyze_trend(mtf_data[tf], tf) + results[tf] = trend_info + trend_directions[tf] = trend_info['direction'] + + # Calculate cross-timeframe alignment + alignment_analysis = TrendAnalyzer._analyze_cross_timeframe_alignment( + trend_directions + ) + + # Determine dominant trend + dominant_trend = TrendAnalyzer._determine_dominant_trend(trend_directions) + + # Generate LLM-readable summary + summary = TrendAnalyzer._generate_trend_summary( + results, alignment_analysis, dominant_trend + ) + + return { + 'timeframes': results, + 'alignment': alignment_analysis, + 'dominant_trend': dominant_trend, + 'summary': summary, + 'trading_bias': TrendAnalyzer._determine_trading_bias( + alignment_analysis, dominant_trend + ), + } + + @staticmethod + def _analyze_ema_alignment( + ema_values: Dict[int, float], + current_price: float + ) -> str: + """ + Analyze EMA alignment pattern + + Perfect bullish: Price > EMA7 > EMA20 > EMA50 > EMA100 > EMA200 + Perfect bearish: Price < EMA7 < EMA20 < EMA50 < EMA100 < EMA200 + """ + ema_7 = ema_values.get(7, current_price) + ema_20 = ema_values.get(20, current_price) + ema_50 = ema_values.get(50, current_price) + ema_100 = ema_values.get(100, current_price) + ema_200 = ema_values.get(200, current_price) + + # Check bullish alignment + bullish_count = 0 + if current_price > ema_7: + bullish_count += 1 + if ema_7 > ema_20: + bullish_count += 1 + if ema_20 > ema_50: + bullish_count += 1 + if ema_50 > ema_100: + bullish_count += 1 + if ema_100 > ema_200: + bullish_count += 1 + + # Check bearish alignment + bearish_count = 0 + if current_price < ema_7: + bearish_count += 1 + if ema_7 < ema_20: + bearish_count += 1 + if ema_20 < ema_50: + bearish_count += 1 + if ema_50 < ema_100: + bearish_count += 1 + if ema_100 < ema_200: + bearish_count += 1 + + if bullish_count >= 4: + return 'bullish_aligned' + elif bearish_count >= 4: + return 'bearish_aligned' + else: + return 'mixed' + + @staticmethod + def _calculate_trend_strength(adx: float, alignment: str) -> str: + """Calculate trend strength based on ADX and alignment""" + # ADX thresholds + if adx > 40: + base_strength = 'very_strong' + elif adx > 25: + base_strength = 'strong' + elif adx > 20: + base_strength = 'moderate' + else: + base_strength = 'weak' + + # Downgrade if alignment is mixed + if alignment == 'mixed': + if base_strength == 'very_strong': + return 'strong' + elif base_strength == 'strong': + return 'moderate' + else: + return 'weak' + + return base_strength + + @staticmethod + def _determine_trend_direction( + alignment: str, + ema_values: Dict[int, float] + ) -> str: + """Determine trend direction from EMA alignment""" + if alignment == 'bullish_aligned': + return 'uptrend' + elif alignment == 'bearish_aligned': + return 'downtrend' + else: + # Check shorter-term EMAs for mixed scenarios + ema_20 = ema_values.get(20, 0) + ema_50 = ema_values.get(50, 0) + ema_100 = ema_values.get(100, 0) + + if ema_20 > ema_50 > ema_100: + return 'uptrend' + elif ema_20 < ema_50 < ema_100: + return 'downtrend' + else: + return 'sideways' + + @staticmethod + def _detect_trend_phase( + df: pd.DataFrame, + direction: str, + ema_values: Dict[int, float], + current_price: float + ) -> str: + """ + Detect current trend phase + + Phases: + - trending: Strong directional movement + - pullback: Temporary counter-trend move within main trend + - reversal: Potential trend change in progress + - consolidation: Range-bound, no clear direction + """ + ema_20 = ema_values.get(20, current_price) + ema_50 = ema_values.get(50, current_price) + + # Get recent price action + if len(df) < 10: + return 'unknown' + + recent_closes = df['close'].tail(10).values + price_5_ago = recent_closes[-5] + + if direction == 'uptrend': + # In uptrend, check if pulling back to EMA + if current_price < ema_20 and current_price > ema_50: + return 'pullback' + elif current_price < ema_50: + return 'reversal' + elif current_price > price_5_ago * 1.01: + return 'trending' + else: + return 'consolidation' + + elif direction == 'downtrend': + # In downtrend, check if bouncing to EMA + if current_price > ema_20 and current_price < ema_50: + return 'pullback' + elif current_price > ema_50: + return 'reversal' + elif current_price < price_5_ago * 0.99: + return 'trending' + else: + return 'consolidation' + else: + return 'consolidation' + + @staticmethod + def _calculate_ema_slope( + df: pd.DataFrame, + ema_col: str, + periods: int = 5 + ) -> float: + """Calculate the slope of an EMA over N periods""" + if ema_col not in df.columns or len(df) < periods: + return 0.0 + + ema_values = df[ema_col].tail(periods).values + current = ema_values[-1] + previous = ema_values[0] + + if previous == 0: + return 0.0 + + # Return percentage change per period + return (current - previous) / previous / periods * 100 + + @staticmethod + def _analyze_cross_timeframe_alignment( + trend_directions: Dict[str, str] + ) -> Dict[str, Any]: + """Analyze alignment across timeframes""" + uptrend_count = sum(1 for d in trend_directions.values() if d == 'uptrend') + downtrend_count = sum(1 for d in trend_directions.values() if d == 'downtrend') + sideways_count = sum(1 for d in trend_directions.values() if d == 'sideways') + total = len(trend_directions) + + if total == 0: + return { + 'status': 'insufficient_data', + 'status_cn': '数据不足', + 'alignment_score': 0.0, + } + + # Calculate alignment score (0 to 1) + max_aligned = max(uptrend_count, downtrend_count) + alignment_score = max_aligned / total if total > 0 else 0 + + # Determine status + if uptrend_count >= total * 0.8: + status = 'strongly_bullish' + status_cn = '强烈看涨一致' + elif uptrend_count >= total * 0.6: + status = 'bullish' + status_cn = '看涨一致' + elif downtrend_count >= total * 0.8: + status = 'strongly_bearish' + status_cn = '强烈看跌一致' + elif downtrend_count >= total * 0.6: + status = 'bearish' + status_cn = '看跌一致' + elif sideways_count >= total * 0.5: + status = 'ranging' + status_cn = '震荡整理' + else: + status = 'mixed' + status_cn = '方向分歧' + + return { + 'status': status, + 'status_cn': status_cn, + 'alignment_score': round(alignment_score, 2), + 'uptrend_count': uptrend_count, + 'downtrend_count': downtrend_count, + 'sideways_count': sideways_count, + 'total_timeframes': total, + } + + @staticmethod + def _determine_dominant_trend(trend_directions: Dict[str, str]) -> Dict[str, Any]: + """ + Determine the dominant trend considering timeframe weight + Longer timeframes have more weight + """ + # Weights for each timeframe (longer = more weight) + weights = { + '5m': 0.05, + '15m': 0.10, + '1h': 0.15, + '4h': 0.20, + '1d': 0.25, + '1w': 0.25, + } + + bullish_score = 0.0 + bearish_score = 0.0 + total_weight = 0.0 + + for tf, direction in trend_directions.items(): + weight = weights.get(tf, 0.1) + total_weight += weight + + if direction == 'uptrend': + bullish_score += weight + elif direction == 'downtrend': + bearish_score += weight + + if total_weight == 0: + return { + 'direction': 'unknown', + 'direction_cn': '未知', + 'confidence': 0.0, + } + + # Normalize scores + bullish_pct = bullish_score / total_weight + bearish_pct = bearish_score / total_weight + + if bullish_pct > 0.6: + direction = 'uptrend' + direction_cn = '上涨' + confidence = bullish_pct + elif bearish_pct > 0.6: + direction = 'downtrend' + direction_cn = '下跌' + confidence = bearish_pct + else: + direction = 'sideways' + direction_cn = '震荡' + confidence = max(1.0 - bullish_pct - bearish_pct, 0.0) + + return { + 'direction': direction, + 'direction_cn': direction_cn, + 'confidence': round(confidence, 2), + 'bullish_score': round(bullish_pct, 2), + 'bearish_score': round(bearish_pct, 2), + } + + @staticmethod + def _determine_trading_bias( + alignment: Dict[str, Any], + dominant: Dict[str, Any] + ) -> Dict[str, Any]: + """Determine trading bias based on trend analysis""" + status = alignment.get('status', 'mixed') + direction = dominant.get('direction', 'sideways') + confidence = dominant.get('confidence', 0.0) + + if status in ['strongly_bullish', 'bullish'] and confidence > 0.6: + bias = 'long' + bias_cn = '偏多' + strength = 'strong' if status == 'strongly_bullish' else 'moderate' + elif status in ['strongly_bearish', 'bearish'] and confidence > 0.6: + bias = 'short' + bias_cn = '偏空' + strength = 'strong' if status == 'strongly_bearish' else 'moderate' + else: + bias = 'neutral' + bias_cn = '中性' + strength = 'weak' + + return { + 'bias': bias, + 'bias_cn': bias_cn, + 'strength': strength, + 'strength_cn': TrendAnalyzer._strength_to_chinese(strength), + 'recommendation': TrendAnalyzer._generate_bias_recommendation( + bias, strength, direction + ), + } + + @staticmethod + def _generate_trend_summary( + results: Dict[str, Dict], + alignment: Dict[str, Any], + dominant: Dict[str, Any] + ) -> str: + """Generate human-readable trend summary for LLM""" + parts = [] + + # Dominant trend + direction_cn = dominant.get('direction_cn', '未知') + confidence = dominant.get('confidence', 0) * 100 + parts.append(f"主导趋势: {direction_cn} (置信度{confidence:.0f}%)") + + # Alignment + status_cn = alignment.get('status_cn', '未知') + parts.append(f"多周期一致性: {status_cn}") + + # Key timeframes + key_tfs = ['1h', '4h', '1d'] + tf_summary = [] + for tf in key_tfs: + if tf in results: + tf_info = results[tf] + tf_summary.append( + f"{tf}={tf_info['direction_cn']}({tf_info['strength_cn']})" + ) + if tf_summary: + parts.append(f"关键周期: {', '.join(tf_summary)}") + + return "; ".join(parts) + + @staticmethod + def _generate_bias_recommendation( + bias: str, + strength: str, + direction: str + ) -> str: + """Generate trading recommendation based on bias""" + if bias == 'long': + if strength == 'strong': + return "强势看涨,可积极寻找做多机会,回调可加仓" + else: + return "偏多格局,可逢低做多,注意控制仓位" + elif bias == 'short': + if strength == 'strong': + return "强势看跌,可积极寻找做空机会,反弹可加仓" + else: + return "偏空格局,可逢高做空,注意控制仓位" + else: + return "方向不明,建议观望或轻仓试探" + + @staticmethod + def _direction_to_chinese(direction: str) -> str: + """Convert direction to Chinese""" + mapping = { + 'uptrend': '上涨', + 'downtrend': '下跌', + 'sideways': '震荡', + 'unknown': '未知', + } + return mapping.get(direction, '未知') + + @staticmethod + def _strength_to_chinese(strength: str) -> str: + """Convert strength to Chinese""" + mapping = { + 'very_strong': '非常强', + 'strong': '强', + 'moderate': '中等', + 'weak': '弱', + } + return mapping.get(strength, '弱') + + @staticmethod + def _phase_to_chinese(phase: str, direction: str) -> str: + """Convert phase to Chinese with context""" + if phase == 'trending': + return '趋势进行中' if direction != 'sideways' else '震荡延续' + elif phase == 'pullback': + if direction == 'uptrend': + return '上涨趋势回调' + elif direction == 'downtrend': + return '下跌趋势反弹' + else: + return '震荡回调' + elif phase == 'reversal': + if direction == 'uptrend': + return '上涨趋势可能反转' + elif direction == 'downtrend': + return '下跌趋势可能反转' + else: + return '可能突破震荡' + elif phase == 'consolidation': + return '盘整蓄力' + else: + return '未知' + + @staticmethod + def _empty_trend_result(timeframe: str) -> Dict[str, Any]: + """Return empty trend result when data is insufficient""" + return { + 'timeframe': timeframe, + 'direction': 'unknown', + 'direction_cn': '数据不足', + 'strength': 'weak', + 'strength_cn': '弱', + 'phase': 'unknown', + 'phase_cn': '未知', + 'ema_alignment': 'unknown', + 'ema_values': {}, + 'price_vs_emas': {}, + 'ema_20_slope': 0.0, + 'adx': 0.0, + 'current_price': 0.0, + } diff --git a/signals/llm_decision.py b/signals/llm_decision.py index 6f84b08..0d91173 100644 --- a/signals/llm_decision.py +++ b/signals/llm_decision.py @@ -139,7 +139,11 @@ class LLMDecisionMaker: ) -> str: """Build trading decision prompt""" - # Extract context elements + # Check if using new dataset format + if 'assessment' in market_context and 'trend' in market_context: + return self._build_prompt_new_format(market_context) + + # Extract context elements (legacy format) market_state = market_context.get('market_state', {}) momentum = market_context.get('momentum', {}) signal_consensus = market_context.get('signal_consensus', 0.5) @@ -837,3 +841,259 @@ class LLMDecisionMaker: 'reasoning': f'Error generating decision: {error_msg}', 'error': error_msg, } + + def _build_prompt_new_format(self, dataset: Dict[str, Any]) -> str: + """ + Build prompt using new LLMDatasetBuilder format + + This prompt is structured around: + 1. Clear market assessment summary + 2. Multi-timeframe trend analysis + 3. Support/Resistance levels + 4. Order flow / bull-bear battle + 5. Momentum and divergence signals + 6. K-line data for pattern recognition + """ + price = dataset.get('price', {}) + current_price = price.get('current', 0) + assessment = dataset.get('assessment', {}) + trend = dataset.get('trend', {}) + sr = dataset.get('support_resistance', {}) + orderflow = dataset.get('orderflow', {}) + momentum = dataset.get('momentum', {}) + recommendations = dataset.get('recommendations', {}) + klines = dataset.get('klines', {}) + + prompt = f"""你是一个专业的加密货币交易分析师。基于以下详细的市场分析数据,提供精确的交易信号。 + +## 当前价格: ${current_price:,.2f} + +## 市场总体评估 + +{dataset.get('summary', '无数据')} + +- 市场偏向: {assessment.get('overall_bias_cn', '未知')} +- 置信度: {assessment.get('confidence', 0):.0f}% +""" + + # Warning if any + if assessment.get('has_warning'): + prompt += f"- **警告**: {assessment.get('warning_cn', '')}\n" + + # Trend Analysis + prompt += "\n## 趋势分析 (EMA均线排列)\n\n" + + dominant = trend.get('dominant_trend', {}) + alignment = trend.get('alignment', {}) + prompt += f"**主导趋势**: {dominant.get('direction_cn', '未知')} (置信度 {dominant.get('confidence', 0)*100:.0f}%)\n" + prompt += f"**多周期一致性**: {alignment.get('status_cn', '未知')} (一致性评分 {alignment.get('alignment_score', 0)*100:.0f}%)\n\n" + + # Per-timeframe trend + trend_tfs = trend.get('timeframes', {}) + if trend_tfs: + prompt += "| 周期 | 趋势 | 强度 | 阶段 | ADX |\n" + prompt += "|------|------|------|------|-----|\n" + for tf in ['5m', '15m', '1h', '4h', '1d', '1w']: + if tf in trend_tfs: + t = trend_tfs[tf] + prompt += f"| {tf} | {t.get('direction_cn', '?')} | {t.get('strength_cn', '?')} | {t.get('phase_cn', '?')} | {t.get('adx', 0):.1f} |\n" + prompt += "\n" + + # Support/Resistance + prompt += "\n## 支撑位/压力位 (斐波那契+多周期共振)\n\n" + + confluence = sr.get('confluence', {}) + supports = confluence.get('supports', []) + resistances = confluence.get('resistances', []) + + if supports: + prompt += "**支撑位** (按强度排序):\n" + for i, s in enumerate(supports[:3], 1): + prompt += f" {i}. ${s.get('level', 0):,.0f} ({s.get('strength_cn', '?')}, {s.get('num_timeframes', 0)}个周期确认, 距离 {s.get('distance_pct', 0):.1f}%)\n" + prompt += "\n" + + if resistances: + prompt += "**压力位** (按强度排序):\n" + for i, r in enumerate(resistances[:3], 1): + prompt += f" {i}. ${r.get('level', 0):,.0f} ({r.get('strength_cn', '?')}, {r.get('num_timeframes', 0)}个周期确认, 距离 {r.get('distance_pct', 0):.1f}%)\n" + prompt += "\n" + + # Order Flow + prompt += "\n## 订单流分析 (多空博弈)\n\n" + + of_assessment = orderflow.get('assessment', {}) + of_imbalance = orderflow.get('imbalance', {}) + of_battle = orderflow.get('battle', {}) + + prompt += f"**多空态势**: {of_assessment.get('direction_cn', '未知')}\n" + prompt += f"**博弈强度**: {of_assessment.get('intensity_cn', '未知')}\n" + prompt += f"**订单簿失衡**: {of_imbalance.get('status_cn', '未知')} ({of_imbalance.get('ratio_pct', 0):+.1f}%)\n" + + if of_battle.get('interpretation'): + prompt += f"**解读**: {of_battle.get('interpretation', '')}\n" + + # Walls + walls = orderflow.get('walls', {}) + if walls.get('nearest_support_wall'): + w = walls['nearest_support_wall'] + prompt += f"**最近支撑墙**: ${w.get('price', 0):,.0f} (距离 {w.get('distance_pct', 0):.2f}%, 强度 {w.get('strength', '?')})\n" + if walls.get('nearest_resistance_wall'): + w = walls['nearest_resistance_wall'] + prompt += f"**最近阻力墙**: ${w.get('price', 0):,.0f} (距离 {w.get('distance_pct', 0):.2f}%, 强度 {w.get('strength', '?')})\n" + + prompt += "\n" + + # Momentum Analysis + prompt += "\n## 动能分析 (RSI/MACD/量价背离)\n\n" + + mom_alignment = momentum.get('alignment', {}) + div_confluence = momentum.get('divergence_confluence', {}) + + prompt += f"**动能一致性**: {mom_alignment.get('status_cn', '未知')}\n" + + if div_confluence.get('has_confluence'): + prompt += f"**重要背离**: {div_confluence.get('type_cn', '')} (周期: {', '.join(div_confluence.get('bullish_timeframes', []) + div_confluence.get('bearish_timeframes', []))})\n" + + # Per-timeframe momentum + mom_tfs = momentum.get('timeframes', {}) + if mom_tfs: + prompt += "\n| 周期 | RSI | RSI状态 | MACD | 量价关系 |\n" + prompt += "|------|-----|---------|------|----------|\n" + for tf in ['15m', '1h', '4h', '1d']: + if tf in mom_tfs: + m = mom_tfs[tf] + rsi = m.get('rsi', {}) + macd = m.get('macd', {}) + vol = m.get('volume', {}) + prompt += f"| {tf} | {rsi.get('value', 50):.0f} | {rsi.get('zone_cn', '?')} | {macd.get('momentum_cn', '?')} | {vol.get('confirmation_cn', '?')} |\n" + prompt += "\n" + + # Recommendations summary + prompt += "\n## 量化分析建议\n\n" + + for tf_key, tf_label in [('short_term', '短期'), ('medium_term', '中期'), ('long_term', '长期')]: + rec = recommendations.get(tf_key, {}) + if rec: + prompt += f"**{tf_label}**: {rec.get('action_cn', '?')} ({rec.get('confidence_cn', '?')}置信度)\n" + prompt += f" - 趋势: {rec.get('trend', '?')} ({rec.get('trend_strength', '?')})\n" + prompt += f" - 动能: {rec.get('momentum', '?')}\n" + if rec.get('reasoning'): + prompt += f" - 理由: {rec.get('reasoning', '')}\n" + prompt += "\n" + + # K-line data (abbreviated for context) + if klines: + prompt += "\n## K线数据 (用于模式识别)\n\n" + prompt += "格式: t=时间, o=开盘, h=最高, l=最低, c=收盘, v=成交量\n\n" + + for tf_key, tf_desc in [('1h', '1小时'), ('4h', '4小时'), ('1d', '日线')]: + if tf_key in klines: + kline_list = klines[tf_key] + if kline_list: + # Show last 24 candles for context + display_klines = kline_list[-24:] + prompt += f"### {tf_desc} (最近{len(display_klines)}根)\n```\n" + for k in display_klines: + prompt += f"{k.get('t', '')} | o:{k.get('o', 0)} h:{k.get('h', 0)} l:{k.get('l', 0)} c:{k.get('c', 0)} v:{k.get('v', 0):.0f}\n" + prompt += "```\n\n" + + # Output requirements + prompt += """ +## 输出要求 + +请严格按照以下JSON Schema输出,所有字段必须存在: + +```json +{ + "signal": "BUY|SELL|HOLD", + "confidence": 0.65, + "risk_level": "LOW|MEDIUM|HIGH", + "market_bias": "BULLISH|BEARISH|NEUTRAL", + + "trades": [ + { + "id": "short_001", + "timeframe": "short", + "status": "ACTIVE|INACTIVE", + "direction": "LONG|SHORT|NONE", + "entry": { + "price_1": 90000.00, + "price_2": 89700.00, + "price_3": 89400.00, + "price_4": 89100.00 + }, + "exit": { + "stop_loss": 88500.00, + "take_profit_1": 91000.00, + "take_profit_2": 92000.00, + "take_profit_3": 93000.00 + }, + "position": { + "size_pct_1": 40, + "size_pct_2": 30, + "size_pct_3": 20, + "size_pct_4": 10 + }, + "risk_reward": 2.5, + "expected_profit_pct": 1.5, + "reasoning": "简要说明" + }, + { + "id": "medium_001", + "timeframe": "medium", + "status": "ACTIVE|INACTIVE", + "direction": "LONG|SHORT|NONE", + "entry": { ... }, + "exit": { ... }, + "position": { ... }, + "risk_reward": 2.0, + "expected_profit_pct": 3.5, + "reasoning": "简要说明" + }, + { + "id": "long_001", + "timeframe": "long", + "status": "ACTIVE|INACTIVE", + "direction": "LONG|SHORT|NONE", + "entry": { ... }, + "exit": { ... }, + "position": { ... }, + "risk_reward": 2.0, + "expected_profit_pct": 8.0, + "reasoning": "简要说明" + } + ], + + "key_levels": { + "support": [89000.00, 88000.00, 86000.00], + "resistance": [92000.00, 94000.00, 96000.00] + }, + + "analysis": { + "trend": "UP|DOWN|SIDEWAYS", + "momentum": "STRONG|WEAK|NEUTRAL", + "volume": "HIGH|LOW|NORMAL", + "summary": "一句话市场总结" + }, + + "key_factors": ["因素1", "因素2", "因素3"] +} +``` + +## 关键规则 + +1. **trades数组必须有3个元素**: short, medium, long +2. **盈利要求**: + - short: expected_profit_pct >= 1.0% 才设 ACTIVE + - medium: expected_profit_pct >= 2.0% 才设 ACTIVE + - long: expected_profit_pct >= 5.0% 才设 ACTIVE +3. **INACTIVE时**: direction="NONE", 所有价格=0 +4. **入场价间距建议**: + - short: 0.3-0.5% + - medium: 0.5-1.0% + - long: 1.0-2.0% +5. **止损止盈设置参考上方支撑压力位** +6. **只输出JSON**,不要有其他文字 +""" + + return prompt