"""
LLM Market Dataset Builder - Integrates all analysis modules
Generates structured, human-readable market data for LLM decision making
"""

import logging
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime

import pandas as pd

from .data_reader import MarketDataReader
from .indicators import TechnicalIndicators
from .trend_analyzer import TrendAnalyzer
from .fibonacci import FibonacciAnalyzer
from .orderflow_enhanced import EnhancedOrderFlowAnalyzer
from .momentum_analyzer import MomentumAnalyzer
from .config import config

logger = logging.getLogger(__name__)

# K-line limits per timeframe
KLINE_LIMITS = {
    '5m': 288,   # 1 day
    '15m': 96,   # 1 day
    '1h': 72,    # 3 days
    '4h': 42,    # 7 days
    '1d': 60,    # 60 days
    '1w': 52,    # 1 year
}


class LLMDatasetBuilder:
    """
    Builds comprehensive market dataset for LLM analysis

    Integrates:
    1. Multi-timeframe trend analysis (EMA alignment)
    2. Fibonacci support/resistance levels
    3. Order flow / bull-bear battle analysis
    4. Momentum and divergence analysis
    5. K-line data for pattern recognition
    """

    # Weight of the trend component in the bias score; multiplied by the
    # trend confidence before being added.
    _TREND_WEIGHTS = {'uptrend': 0.4, 'downtrend': -0.4}

    # Fixed contribution of each momentum alignment status to the bias score.
    _MOMENTUM_WEIGHTS = {
        'aligned_bullish': 0.3,
        'aligned_bearish': -0.3,
        'mixed_bullish': 0.15,
        'mixed_bearish': -0.15,
    }

    # Fixed contribution of the order-flow direction to the bias score.
    _ORDERFLOW_WEIGHTS = {'bulls': 0.3, 'bears': -0.3}

    # (threshold, label, Chinese label), checked from strongest to weakest.
    # A bullish tier matches when score > threshold, a bearish tier when
    # score < -threshold.
    _BULLISH_TIERS = (
        (0.4, 'strongly_bullish', '强烈看涨'),
        (0.2, 'bullish', '看涨'),
        (0.05, 'slightly_bullish', '略偏多'),
    )
    _BEARISH_TIERS = (
        (0.4, 'strongly_bearish', '强烈看跌'),
        (0.2, 'bearish', '看跌'),
        (0.05, 'slightly_bearish', '略偏空'),
    )

    # Timeframes whose presence is checked by _assess_data_quality.
    _QUALITY_TIMEFRAMES = ('5m', '15m', '1h', '4h', '1d')

    # Output key -> (horizon label, representative timeframe) used to build
    # the per-horizon recommendations.
    _RECOMMENDATION_HORIZONS = (
        ('short_term', 'short', '15m'),
        ('medium_term', 'medium', '4h'),
        ('long_term', 'long', '1d'),
    )

    # English analysis value -> Chinese display label.
    _CN_LABELS = {
        'uptrend': '上涨',
        'downtrend': '下跌',
        'sideways': '震荡',
        'strong': '强',
        'moderate': '中等',
        'weak': '弱',
        'very_strong': '非常强',
        'bullish': '看涨',
        'bearish': '看跌',
        'neutral': '中性',
    }

    def __init__(self, symbol: str = "BTCUSDT"):
        """
        Create a builder bound to a default trading symbol.

        Args:
            symbol: Default trading symbol; also used to construct the
                MarketDataReader.
        """
        self.symbol = symbol
        self.data_reader = MarketDataReader(symbol=symbol)

    def build_complete_dataset(self, symbol: Optional[str] = None) -> Dict[str, Any]:
        """
        Build complete market dataset for LLM consumption

        Args:
            symbol: Trading symbol (uses instance symbol if None)

        Returns:
            Comprehensive market analysis dict. When the 5m K-lines cannot be
            fetched, or any step raises, the placeholder from
            ``_empty_dataset`` is returned instead (errors are logged, not
            propagated).
        """
        symbol = symbol or self.symbol
        start_time = datetime.now()

        try:
            # 1. Fetch multi-timeframe K-line data
            mtf_data = self._fetch_multi_timeframe_data(symbol)

            # The 5m frame is mandatory: it supplies the current price.
            if not mtf_data or '5m' not in mtf_data:
                logger.error("Failed to fetch K-line data")
                return self._empty_dataset(symbol)

            # Get current price
            current_price = float(mtf_data['5m'].iloc[-1]['close'])

            # 2. Fetch order book data
            # NOTE(review): read_latest_depth() takes no symbol, so it
            # presumably reads the book for the reader's own symbol; confirm
            # this is correct when `symbol` overrides self.symbol.
            depth_data = self.data_reader.read_latest_depth()

            # 3. Build analysis sections
            # 3.1 Trend Analysis
            trend_analysis = TrendAnalyzer.analyze_multi_timeframe_trend(mtf_data)

            # 3.2 Support/Resistance (Fibonacci)
            sr_analysis = FibonacciAnalyzer.analyze_multi_timeframe_levels(mtf_data)

            # 3.3 Order Flow Analysis
            if depth_data:
                orderflow_analysis = EnhancedOrderFlowAnalyzer.analyze_full_orderflow(
                    depth_data, current_price
                )
            else:
                orderflow_analysis = EnhancedOrderFlowAnalyzer._empty_result()

            # 3.4 Momentum Analysis
            momentum_analysis = MomentumAnalyzer.analyze_multi_timeframe_momentum(mtf_data)

            # 3.5 K-line data for LLM
            kline_data = self._build_kline_data(mtf_data)

            # 4. Generate overall market assessment
            market_assessment = self._generate_market_assessment(
                trend_analysis, sr_analysis, orderflow_analysis, momentum_analysis
            )

            # 5. Generate trading recommendations
            recommendations = self._generate_recommendations(
                trend_analysis, sr_analysis, orderflow_analysis,
                momentum_analysis, current_price
            )

            # 6. Build final dataset
            processing_time = (datetime.now() - start_time).total_seconds()

            dataset = {
                'metadata': {
                    'symbol': symbol,
                    'timestamp': datetime.now().isoformat(),
                    'processing_time_sec': round(processing_time, 2),
                    'data_quality': self._assess_data_quality(mtf_data, depth_data),
                },
                'price': {
                    'current': round(current_price, 2),
                    # 24 x 1h candles and 12 x 5m candles respectively.
                    'change_24h': self._calculate_price_change(mtf_data.get('1h'), 24),
                    'change_1h': self._calculate_price_change(mtf_data.get('5m'), 12),
                },
                # Main analysis sections
                'trend': trend_analysis,
                'support_resistance': sr_analysis,
                'orderflow': orderflow_analysis,
                'momentum': momentum_analysis,
                # K-line data for pattern recognition
                'klines': kline_data,
                # Overall assessment
                'assessment': market_assessment,
                # Trading recommendations
                'recommendations': recommendations,
                # Concise summary for LLM
                'summary': self._generate_summary(
                    market_assessment, recommendations, current_price
                ),
            }

            logger.info(
                f"Built LLM dataset for {symbol}: "
                f"trend={trend_analysis.get('dominant_trend', {}).get('direction_cn', '?')}, "
                f"processing={processing_time:.2f}s"
            )

            return dataset

        except Exception as e:
            # Top-level boundary: callers always get a dataset-shaped dict.
            logger.error(f"Error building LLM dataset: {e}", exc_info=True)
            return self._empty_dataset(symbol)

    def _fetch_multi_timeframe_data(self, symbol: str) -> Dict[str, pd.DataFrame]:
        """
        Fetch K-line data for all timeframes

        Args:
            symbol: Trading symbol passed to the data reader.

        Returns:
            Mapping of timeframe to indicator-enriched DataFrame. Timeframes
            for which the reader returned nothing are omitted.
        """
        data = {}

        for tf, limit in KLINE_LIMITS.items():
            df = self.data_reader.fetch_historical_klines_from_api(
                symbol=symbol,
                interval=tf,
                limit=limit
            )

            # Skip timeframes with no data rather than failing the whole build.
            if df is None or df.empty:
                continue

            # Add technical indicators
            data[tf] = TechnicalIndicators.add_all_indicators(df)

        return data

    def _build_kline_data(self, mtf_data: Dict[str, pd.DataFrame]) -> Dict[str, List[Dict]]:
        """
        Build K-line data in compact format for LLM

        Args:
            mtf_data: Mapping of timeframe to K-line DataFrame with ``open``,
                ``high``, ``low``, ``close`` and ``volume`` columns.

        Returns:
            Mapping of timeframe to a list of candles with the short keys
            ``t``, ``o``, ``h``, ``l``, ``c``, ``v``. Empty frames are skipped.
        """
        kline_data = {}

        for tf, df in mtf_data.items():
            if df.empty:
                continue

            # Get appropriate number of candles
            window = df.tail(KLINE_LIMITS.get(tf, 50))

            # Column-wise iteration avoids building a Series per row.
            candles = zip(
                window.index,
                window['open'],
                window['high'],
                window['low'],
                window['close'],
                window['volume'],
            )

            kline_data[tf] = [
                {
                    't': self._format_candle_time(ts),
                    # float() normalises numpy scalars / numeric strings.
                    'o': round(float(o), 2),
                    'h': round(float(h), 2),
                    'l': round(float(l), 2),
                    'c': round(float(c), 2),
                    'v': round(float(v), 4),
                }
                for ts, o, h, l, c, v in candles
            ]

        return kline_data

    @staticmethod
    def _format_candle_time(label: Any) -> str:
        """Format an index label as 'YYYY-MM-DD HH:MM', or str() if it is not a datetime."""
        if hasattr(label, 'strftime'):
            return label.strftime('%Y-%m-%d %H:%M')
        return str(label)

    def _generate_market_assessment(
        self,
        trend: Dict[str, Any],
        sr: Dict[str, Any],
        orderflow: Dict[str, Any],
        momentum: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Generate overall market assessment from all analyses

        The bias score is the sum of a trend term (0.4 x trend confidence),
        a momentum term (0.3 aligned / 0.15 mixed) and an order-flow term
        (0.3), each signed by its direction.

        Args:
            trend: Trend analysis; ``dominant_trend`` is read.
            sr: Support/resistance analysis (currently not used here).
            orderflow: Order-flow analysis; ``assessment`` is read.
            momentum: Momentum analysis; ``alignment`` and
                ``divergence_confluence`` are read.

        Returns:
            Dict with the bias label, score, confidence, the contributing
            factors, any divergence warning and the per-component inputs.
        """
        # Trend bias
        dominant_trend = trend.get('dominant_trend', {})
        trend_direction = dominant_trend.get('direction', 'sideways')
        # NOTE(review): the -1..+1 score range assumes this confidence is on
        # a 0-1 scale; confirm against TrendAnalyzer.
        trend_confidence = dominant_trend.get('confidence', 0)

        # Momentum bias
        momentum_alignment = momentum.get('alignment', {})
        momentum_status = momentum_alignment.get('status', 'conflicting')

        # Orderflow bias
        orderflow_assessment = orderflow.get('assessment', {})
        orderflow_direction = orderflow_assessment.get('direction', 'neutral')

        # Collect signed contributions; unrecognised values contribute nothing.
        bias_factors = []

        # Trend contribution (40%)
        if trend_direction in self._TREND_WEIGHTS:
            bias_factors.append(
                ('trend', self._TREND_WEIGHTS[trend_direction] * trend_confidence)
            )

        # Momentum contribution (30%)
        if momentum_status in self._MOMENTUM_WEIGHTS:
            bias_factors.append(('momentum', self._MOMENTUM_WEIGHTS[momentum_status]))

        # Orderflow contribution (30%)
        if orderflow_direction in self._ORDERFLOW_WEIGHTS:
            bias_factors.append(('orderflow', self._ORDERFLOW_WEIGHTS[orderflow_direction]))

        # Calculate overall bias score (-1 to +1)
        bias_score = sum(contribution for _, contribution in bias_factors)

        # Determine overall bias
        overall_bias, overall_bias_cn = self._classify_bias(bias_score)

        # Check for divergences (warning signal)
        div_confluence = momentum.get('divergence_confluence', {})
        has_warning = div_confluence.get('has_confluence', False)
        warning_type = div_confluence.get('type', 'none')

        return {
            'overall_bias': overall_bias,
            'overall_bias_cn': overall_bias_cn,
            'bias_score': round(bias_score, 2),
            'confidence': round(abs(bias_score) * 100, 0),
            'bias_factors': bias_factors,
            'has_warning': has_warning,
            'warning': warning_type if has_warning else None,
            'warning_cn': div_confluence.get('type_cn') if has_warning else None,
            'components': {
                'trend': {
                    'direction': trend_direction,
                    'direction_cn': dominant_trend.get('direction_cn', '?'),
                    'confidence': trend_confidence,
                },
                'momentum': {
                    'status': momentum_status,
                    'status_cn': momentum_alignment.get('status_cn', '?'),
                },
                'orderflow': {
                    'direction': orderflow_direction,
                    'direction_cn': orderflow_assessment.get('direction_cn', '?'),
                },
            },
        }

    @classmethod
    def _classify_bias(cls, bias_score: float) -> Tuple[str, str]:
        """Map a bias score to its (label, Chinese label) tier; neutral within +/-0.05."""
        for threshold, label, label_cn in cls._BULLISH_TIERS:
            if bias_score > threshold:
                return label, label_cn

        for threshold, label, label_cn in cls._BEARISH_TIERS:
            if bias_score < -threshold:
                return label, label_cn

        return 'neutral', '中性观望'

    def _generate_recommendations(
        self,
        trend: Dict[str, Any],
        sr: Dict[str, Any],
        orderflow: Dict[str, Any],
        momentum: Dict[str, Any],
        current_price: float
    ) -> Dict[str, Any]:
        """
        Generate trading recommendations for different timeframes

        Short, medium and long horizons are driven by the 15m, 4h and 1d
        analyses respectively; all three share the same strongest
        support/resistance levels.

        Args:
            trend: Trend analysis; ``timeframes`` and ``trading_bias`` are read.
            sr: Support/resistance analysis; ``confluence`` is read.
            orderflow: Order-flow analysis, forwarded to each horizon.
            momentum: Momentum analysis; ``timeframes`` is read.
            current_price: Latest price.

        Returns:
            Dict with ``short_term``, ``medium_term``, ``long_term``,
            ``key_levels`` and ``overall``.
        """
        # Get key levels: the first entry of each confluence list, if any.
        confluence = sr.get('confluence') or {}
        supports = confluence.get('supports')
        resistances = confluence.get('resistances')
        strongest_support = supports[0] if supports else None
        strongest_resistance = resistances[0] if resistances else None

        # Get trend info
        trading_bias = trend.get('trading_bias', {})
        trend_by_tf = trend.get('timeframes', {})
        momentum_by_tf = momentum.get('timeframes', {})

        recommendations: Dict[str, Any] = {}
        for key, horizon, tf in self._RECOMMENDATION_HORIZONS:
            recommendations[key] = self._generate_timeframe_recommendation(
                horizon,
                trend_by_tf.get(tf, {}),
                momentum_by_tf.get(tf, {}),
                orderflow,
                current_price,
                strongest_support,
                strongest_resistance
            )

        # An empty level dict is treated the same as a missing one.
        support = strongest_support or {}
        resistance = strongest_resistance or {}

        recommendations['key_levels'] = {
            'strongest_support': support.get('level'),
            'strongest_resistance': resistance.get('level'),
            'support_distance_pct': support.get('distance_pct'),
            'resistance_distance_pct': resistance.get('distance_pct'),
        }
        recommendations['overall'] = trading_bias.get('recommendation', '方向不明,建议观望')

        return recommendations

    def _generate_timeframe_recommendation(
        self,
        timeframe_type: str,
        trend_info: Dict[str, Any],
        momentum_info: Dict[str, Any],
        orderflow: Dict[str, Any],
        current_price: float,
        support: Optional[Dict],
        resistance: Optional[Dict]
    ) -> Dict[str, Any]:
        """
        Generate recommendation for a specific timeframe type

        A long (short) is recommended when the trend is up (down) and
        momentum is not opposed to it; otherwise the action is to wait.

        Args:
            timeframe_type: Horizon label ('short', 'medium', 'long');
                currently not used in the decision.
            trend_info: Trend analysis for the representative timeframe.
            momentum_info: Momentum analysis for the representative timeframe.
            orderflow: Order-flow analysis; currently not used in the decision.
            current_price: Latest price.
            support: Strongest support level dict, or None.
            resistance: Strongest resistance level dict, or None.

        Returns:
            Dict with action, confidence, Chinese display labels, the entry
            zone and a reasoning sentence.
        """
        trend_direction = trend_info.get('direction', 'sideways')
        trend_strength = trend_info.get('strength', 'weak')
        momentum_assessment = momentum_info.get('assessment', {})
        momentum_direction = momentum_assessment.get('direction', 'neutral')

        # Determine action
        if trend_direction == 'uptrend' and momentum_direction in ('bullish', 'neutral'):
            action, action_cn = 'long', '做多'
        elif trend_direction == 'downtrend' and momentum_direction in ('bearish', 'neutral'):
            action, action_cn = 'short', '做空'
        else:
            action, action_cn = 'wait', '观望'

        # Confidence follows trend strength for directional calls only.
        if action == 'wait':
            confidence = 'low'
        elif trend_strength in ('strong', 'very_strong'):
            confidence = 'high'
        else:
            confidence = 'moderate'

        # Entry zone
        if action == 'long' and support:
            entry_zone = {
                'ideal': support.get('level'),
                'current': current_price,
                'description': f"理想入场: ${support.get('level', 0):,.0f}附近 (支撑区)"
            }
        elif action == 'short' and resistance:
            entry_zone = {
                'ideal': resistance.get('level'),
                'current': current_price,
                'description': f"理想入场: ${resistance.get('level', 0):,.0f}附近 (阻力区)"
            }
        else:
            entry_zone = {
                'ideal': current_price,
                'current': current_price,
                'description': "无明确入场区域"
            }

        confidence_cn = {'high': '高', 'moderate': '中等'}.get(confidence, '低')

        return {
            'action': action,
            'action_cn': action_cn,
            'confidence': confidence,
            'confidence_cn': confidence_cn,
            'trend': trend_info.get('direction_cn', '?'),
            'trend_strength': trend_info.get('strength_cn', '?'),
            'momentum': momentum_assessment.get('direction_cn', '?'),
            'entry_zone': entry_zone,
            'reasoning': self._generate_reasoning(
                trend_direction, trend_strength, momentum_direction, action
            ),
        }

    def _generate_reasoning(
        self,
        trend_direction: str,
        trend_strength: str,
        momentum_direction: str,
        action: str
    ) -> str:
        """
        Generate reasoning for recommendation

        Args:
            trend_direction: Trend direction value (e.g. 'uptrend').
            trend_strength: Trend strength value (e.g. 'strong').
            momentum_direction: Momentum direction value (e.g. 'bullish').
            action: Recommended action: 'long', 'short' or anything else
                (treated as wait).

        Returns:
            One-sentence Chinese explanation of the recommendation.
        """
        if action in ('long', 'short'):
            verdict = '适合做多' if action == 'long' else '适合做空'
            return (
                f"趋势{self._cn(trend_direction)}({self._cn(trend_strength)}), "
                f"动能{self._cn(momentum_direction)}, {verdict}"
            )

        if trend_direction == 'sideways':
            return "市场处于震荡盘整,方向不明,建议观望"

        return "趋势与动能存在分歧,等待信号确认"

    def _cn(self, value: str) -> str:
        """Convert English value to Chinese; unknown values are returned unchanged."""
        return self._CN_LABELS.get(value, value)

    def _generate_summary(
        self,
        assessment: Dict[str, Any],
        recommendations: Dict[str, Any],
        current_price: float
    ) -> str:
        """
        Generate concise summary for LLM

        Args:
            assessment: Output of ``_generate_market_assessment``.
            recommendations: Output of ``_generate_recommendations``.
            current_price: Latest price.

        Returns:
            Single-line summary with the parts joined by "; ".
        """
        # Current price
        parts = [f"当前价格: ${current_price:,.2f}"]

        # Overall bias
        bias_cn = assessment.get('overall_bias_cn', '?')
        confidence = assessment.get('confidence', 0)
        parts.append(f"市场偏向: {bias_cn} (置信度{confidence:.0f}%)")

        # Warning if any
        if assessment.get('has_warning'):
            # warning_cn may be None when the analyzer supplied no Chinese
            # label; fall back to the raw type rather than printing "None".
            warning_cn = (
                assessment.get('warning_cn')
                or assessment.get('warning')
                or '未知'
            )
            parts.append(f"警告: {warning_cn}")

        # Key levels
        key_levels = recommendations.get('key_levels', {})
        if key_levels.get('strongest_support'):
            parts.append(f"关键支撑: ${key_levels['strongest_support']:,.0f}")
        if key_levels.get('strongest_resistance'):
            parts.append(f"关键阻力: ${key_levels['strongest_resistance']:,.0f}")

        # Recommendation
        overall = recommendations.get('overall', '')
        if overall:
            parts.append(f"建议: {overall}")

        return "; ".join(parts)

    def _calculate_price_change(
        self,
        df: Optional[pd.DataFrame],
        periods: int
    ) -> Optional[Dict[str, float]]:
        """
        Calculate price change over N periods

        Compares the latest close with the close ``periods`` candles
        earlier, so ``periods + 1`` rows are required.

        Args:
            df: K-line DataFrame with a ``close`` column, or None.
            periods: Number of candles to look back; must be positive.

        Returns:
            Dict with ``absolute`` and ``percentage`` change (both rounded to
            2 decimals), or None when there is not enough data.
        """
        if df is None or periods <= 0 or len(df) <= periods:
            return None

        current = float(df.iloc[-1]['close'])
        # -(periods + 1): the last row is index -1, so N candles back is -1 - N.
        past = float(df.iloc[-(periods + 1)]['close'])

        change = current - past
        change_pct = (change / past) * 100 if past > 0 else 0

        return {
            'absolute': round(change, 2),
            'percentage': round(change_pct, 2),
        }

    def _assess_data_quality(
        self,
        mtf_data: Dict[str, pd.DataFrame],
        depth_data: Optional[Dict]
    ) -> Dict[str, Any]:
        """
        Assess quality of available data

        Args:
            mtf_data: Mapping of timeframe to K-line DataFrame.
            depth_data: Order book snapshot, or None/empty when unavailable.

        Returns:
            Dict with per-timeframe availability, an ``orderbook`` flag and
            an ``overall`` grade: 'poor' when more than two of the checked
            timeframes are missing, 'fair' when any timeframe or the order
            book is missing, otherwise 'good'.
        """
        # Same truthiness test as build_complete_dataset uses for depth data,
        # so an empty snapshot is reported as unavailable.
        has_orderbook = bool(depth_data)

        klines: Dict[str, Dict[str, Any]] = {}
        missing_tfs = []

        for tf in self._QUALITY_TIMEFRAMES:
            frame = mtf_data.get(tf)
            if frame is not None and not frame.empty:
                klines[tf] = {'available': True, 'count': len(frame)}
            else:
                klines[tf] = {'available': False}
                missing_tfs.append(tf)

        if len(missing_tfs) > 2:
            overall = 'poor'
        elif missing_tfs or not has_orderbook:
            overall = 'fair'
        else:
            overall = 'good'

        return {
            'klines': klines,
            'orderbook': has_orderbook,
            'overall': overall,
        }

    def _empty_dataset(self, symbol: Optional[str] = None) -> Dict[str, Any]:
        """
        Return empty dataset when data is unavailable

        Args:
            symbol: Symbol the failed request was for (uses instance symbol
                if None).

        Returns:
            Dataset-shaped dict with empty sections and an ``error`` entry in
            ``metadata``.
        """
        return {
            'metadata': {
                'symbol': symbol or self.symbol,
                'timestamp': datetime.now().isoformat(),
                'error': 'Failed to fetch data',
            },
            'price': {'current': 0},
            'trend': {},
            'support_resistance': {},
            'orderflow': {},
            'momentum': {},
            'klines': {},
            'assessment': {
                'overall_bias': 'unknown',
                'overall_bias_cn': '数据不可用',
            },
            'recommendations': {},
            'summary': '数据获取失败,无法进行分析',
        }