""" Enhanced Order Flow Analyzer - Bull/Bear Battle Analysis Provides structured order flow data for LLM decision making """ import logging from typing import Dict, Any, List, Optional import numpy as np from .config import config logger = logging.getLogger(__name__) class EnhancedOrderFlowAnalyzer: """ Enhanced order flow analyzer with bull/bear battle metrics Provides: 1. Order book imbalance analysis 2. Liquidity wall detection 3. Bull/Bear battle intensity 4. Absorption analysis 5. LLM-ready structured output """ # Price levels for liquidity analysis (% from current price) LIQUIDITY_LEVELS = [0.001, 0.002, 0.005, 0.01, 0.02, 0.03] # 0.1% to 3% # Battle intensity thresholds STRONG_IMBALANCE = 0.20 # 20% imbalance MODERATE_IMBALANCE = 0.10 # 10% imbalance @staticmethod def analyze_full_orderflow( depth_data: Dict[str, Any], current_price: float ) -> Dict[str, Any]: """ Perform comprehensive order flow analysis Args: depth_data: Order book data with 'bids' and 'asks' current_price: Current market price Returns: Complete order flow analysis for LLM """ if not depth_data or 'bids' not in depth_data or 'asks' not in depth_data: return EnhancedOrderFlowAnalyzer._empty_result() bids = depth_data.get('bids', []) asks = depth_data.get('asks', []) if not bids or not asks: return EnhancedOrderFlowAnalyzer._empty_result() # 1. Basic imbalance imbalance = EnhancedOrderFlowAnalyzer._calculate_imbalance(bids, asks) # 2. Liquidity distribution liquidity = EnhancedOrderFlowAnalyzer._analyze_liquidity_distribution( bids, asks, current_price ) # 3. Wall detection walls = EnhancedOrderFlowAnalyzer._detect_walls(bids, asks, current_price) # 4. Bull/Bear battle intensity battle = EnhancedOrderFlowAnalyzer._analyze_battle_intensity( bids, asks, current_price, imbalance, liquidity, walls ) # 5. Absorption potential absorption = EnhancedOrderFlowAnalyzer._analyze_absorption( bids, asks, current_price ) # 6. Generate summary summary = EnhancedOrderFlowAnalyzer._generate_summary( imbalance, battle, walls ) return { 'current_price': round(current_price, 2), # Order book imbalance 'imbalance': imbalance, # Liquidity analysis 'liquidity': liquidity, # Walls (support/resistance from order book) 'walls': walls, # Bull/Bear battle 'battle': battle, # Absorption analysis 'absorption': absorption, # Overall assessment 'assessment': { 'direction': battle.get('dominant_side', 'neutral'), 'direction_cn': EnhancedOrderFlowAnalyzer._direction_to_cn( battle.get('dominant_side', 'neutral') ), 'intensity': battle.get('intensity', 'low'), 'intensity_cn': battle.get('intensity_cn', '低'), 'confidence': battle.get('confidence', 0.0), }, # LLM-ready summary 'summary': summary, } @staticmethod def _calculate_imbalance( bids: List, asks: List ) -> Dict[str, Any]: """Calculate order book imbalance metrics""" # Volume-based bid_volume = sum(float(qty) for _, qty in bids) ask_volume = sum(float(qty) for _, qty in asks) total_volume = bid_volume + ask_volume if total_volume == 0: return {'ratio': 0, 'status': 'unknown'} volume_imbalance = (bid_volume - ask_volume) / total_volume # Value-based (weighted by price) bid_value = sum(float(price) * float(qty) for price, qty in bids) ask_value = sum(float(price) * float(qty) for price, qty in asks) total_value = bid_value + ask_value if total_value > 0: value_imbalance = (bid_value - ask_value) / total_value else: value_imbalance = 0 # Combined imbalance (weighted average) combined = volume_imbalance * 0.4 + value_imbalance * 0.6 # Determine status if combined > EnhancedOrderFlowAnalyzer.STRONG_IMBALANCE: status = 'strong_buy' status_cn = '强买盘' elif combined > EnhancedOrderFlowAnalyzer.MODERATE_IMBALANCE: status = 'moderate_buy' status_cn = '偏买盘' elif combined < -EnhancedOrderFlowAnalyzer.STRONG_IMBALANCE: status = 'strong_sell' status_cn = '强卖盘' elif combined < -EnhancedOrderFlowAnalyzer.MODERATE_IMBALANCE: status = 'moderate_sell' status_cn = '偏卖盘' else: status = 'balanced' status_cn = '平衡' return { 'ratio': round(combined, 3), 'ratio_pct': round(combined * 100, 1), 'volume_imbalance': round(volume_imbalance, 3), 'value_imbalance': round(value_imbalance, 3), 'status': status, 'status_cn': status_cn, 'bid_volume': round(bid_volume, 4), 'ask_volume': round(ask_volume, 4), 'bid_value_usd': round(bid_value, 2), 'ask_value_usd': round(ask_value, 2), } @staticmethod def _analyze_liquidity_distribution( bids: List, asks: List, current_price: float ) -> Dict[str, Any]: """Analyze liquidity distribution at different price levels""" bid_liquidity = {} ask_liquidity = {} for level_pct in EnhancedOrderFlowAnalyzer.LIQUIDITY_LEVELS: bid_threshold = current_price * (1 - level_pct) ask_threshold = current_price * (1 + level_pct) bid_vol = sum( float(qty) for price, qty in bids if float(price) >= bid_threshold ) ask_vol = sum( float(qty) for price, qty in asks if float(price) <= ask_threshold ) level_key = f"{level_pct * 100:.1f}%" bid_liquidity[level_key] = round(bid_vol, 4) ask_liquidity[level_key] = round(ask_vol, 4) # Calculate liquidity concentration # Higher concentration near price = tighter market near_bid = bid_liquidity.get('0.1%', 0) far_bid = bid_liquidity.get('2.0%', 0) near_ask = ask_liquidity.get('0.1%', 0) far_ask = ask_liquidity.get('2.0%', 0) if far_bid > 0: bid_concentration = near_bid / far_bid else: bid_concentration = 0 if far_ask > 0: ask_concentration = near_ask / far_ask else: ask_concentration = 0 # Spread analysis best_bid = float(bids[0][0]) if bids else 0 best_ask = float(asks[0][0]) if asks else 0 spread = best_ask - best_bid spread_pct = (spread / current_price * 100) if current_price > 0 else 0 return { 'bid_levels': bid_liquidity, 'ask_levels': ask_liquidity, 'bid_concentration': round(bid_concentration, 3), 'ask_concentration': round(ask_concentration, 3), 'spread': round(spread, 2), 'spread_pct': round(spread_pct, 4), 'spread_status': 'tight' if spread_pct < 0.01 else 'normal' if spread_pct < 0.05 else 'wide', 'spread_status_cn': '紧密' if spread_pct < 0.01 else '正常' if spread_pct < 0.05 else '宽松', 'best_bid': round(best_bid, 2), 'best_ask': round(best_ask, 2), } @staticmethod def _detect_walls( bids: List, asks: List, current_price: float ) -> Dict[str, Any]: """Detect significant liquidity walls in the order book""" # Calculate average order size for reference all_orders = bids + asks if not all_orders: return {'bid_walls': [], 'ask_walls': []} avg_size = np.mean([float(qty) for _, qty in all_orders]) std_size = np.std([float(qty) for _, qty in all_orders]) # Wall threshold: orders > mean + 2*std wall_threshold = avg_size + 2 * std_size bid_walls = [] ask_walls = [] # Find bid walls for price, qty in bids: price = float(price) qty = float(qty) if qty >= wall_threshold: distance_pct = (current_price - price) / current_price * 100 value_usd = price * qty bid_walls.append({ 'price': round(price, 2), 'quantity': round(qty, 4), 'value_usd': round(value_usd, 2), 'distance_pct': round(distance_pct, 2), 'strength': EnhancedOrderFlowAnalyzer._wall_strength(qty, avg_size, std_size), }) # Find ask walls for price, qty in asks: price = float(price) qty = float(qty) if qty >= wall_threshold: distance_pct = (price - current_price) / current_price * 100 value_usd = price * qty ask_walls.append({ 'price': round(price, 2), 'quantity': round(qty, 4), 'value_usd': round(value_usd, 2), 'distance_pct': round(distance_pct, 2), 'strength': EnhancedOrderFlowAnalyzer._wall_strength(qty, avg_size, std_size), }) # Sort by distance and limit bid_walls.sort(key=lambda x: x['distance_pct']) ask_walls.sort(key=lambda x: x['distance_pct']) # Find nearest significant wall nearest_support_wall = bid_walls[0] if bid_walls else None nearest_resistance_wall = ask_walls[0] if ask_walls else None return { 'bid_walls': bid_walls[:5], 'ask_walls': ask_walls[:5], 'nearest_support_wall': nearest_support_wall, 'nearest_resistance_wall': nearest_resistance_wall, 'total_bid_walls': len(bid_walls), 'total_ask_walls': len(ask_walls), 'wall_imbalance': len(bid_walls) - len(ask_walls), } @staticmethod def _wall_strength(qty: float, avg: float, std: float) -> str: """Determine wall strength based on standard deviations""" if std == 0: return 'moderate' z_score = (qty - avg) / std if z_score > 4: return 'massive' elif z_score > 3: return 'strong' elif z_score > 2: return 'moderate' else: return 'weak' @staticmethod def _analyze_battle_intensity( bids: List, asks: List, current_price: float, imbalance: Dict[str, Any], liquidity: Dict[str, Any], walls: Dict[str, Any] ) -> Dict[str, Any]: """ Analyze bull/bear battle intensity Combines multiple factors: - Order book imbalance - Liquidity distribution - Wall presence - Spread tightness """ # Component scores (-1 to +1, positive = bullish) scores = [] # 1. Imbalance score imbalance_ratio = imbalance.get('ratio', 0) scores.append(('imbalance', imbalance_ratio)) # 2. Wall score wall_imbalance = walls.get('wall_imbalance', 0) wall_score = np.tanh(wall_imbalance / 5) # Normalize scores.append(('walls', wall_score)) # 3. Liquidity concentration score bid_conc = liquidity.get('bid_concentration', 0) ask_conc = liquidity.get('ask_concentration', 0) conc_diff = bid_conc - ask_conc conc_score = np.tanh(conc_diff) scores.append(('concentration', conc_score)) # Calculate weighted battle score weights = {'imbalance': 0.5, 'walls': 0.3, 'concentration': 0.2} battle_score = sum(score * weights[name] for name, score in scores) # Determine dominant side if battle_score > 0.2: dominant = 'bulls' elif battle_score < -0.2: dominant = 'bears' else: dominant = 'neutral' # Calculate battle intensity (how actively contested) # Higher when both sides have significant presence bid_volume = imbalance.get('bid_volume', 0) ask_volume = imbalance.get('ask_volume', 0) if max(bid_volume, ask_volume) > 0: volume_balance = min(bid_volume, ask_volume) / max(bid_volume, ask_volume) else: volume_balance = 0 # Intensity is high when volumes are balanced but pressure is clear intensity_score = volume_balance * (1 + abs(battle_score)) if intensity_score > 0.8: intensity = 'high' intensity_cn = '激烈' elif intensity_score > 0.5: intensity = 'moderate' intensity_cn = '中等' else: intensity = 'low' intensity_cn = '平淡' # Confidence based on data quality and consistency confidence = min(1.0, abs(battle_score) * 2) return { 'battle_score': round(battle_score, 3), 'dominant_side': dominant, 'intensity': intensity, 'intensity_cn': intensity_cn, 'intensity_score': round(intensity_score, 3), 'confidence': round(confidence, 2), 'component_scores': { 'imbalance': round(imbalance_ratio, 3), 'walls': round(wall_score, 3), 'concentration': round(conc_score, 3), }, 'interpretation': EnhancedOrderFlowAnalyzer._interpret_battle( dominant, intensity, battle_score ), } @staticmethod def _analyze_absorption( bids: List, asks: List, current_price: float ) -> Dict[str, Any]: """ Analyze absorption capacity Estimates how much buying/selling pressure can be absorbed """ # Calculate total bid/ask capacity within 1% bid_capacity = sum( float(price) * float(qty) for price, qty in bids if float(price) >= current_price * 0.99 ) ask_capacity = sum( float(price) * float(qty) for price, qty in asks if float(price) <= current_price * 1.01 ) # Estimate absorption capability if bid_capacity >= 1000000: # $1M+ bid_strength = 'very_high' bid_strength_cn = '非常强' elif bid_capacity >= 500000: # $500K+ bid_strength = 'high' bid_strength_cn = '强' elif bid_capacity >= 100000: # $100K+ bid_strength = 'moderate' bid_strength_cn = '中等' else: bid_strength = 'low' bid_strength_cn = '弱' if ask_capacity >= 1000000: ask_strength = 'very_high' ask_strength_cn = '非常强' elif ask_capacity >= 500000: ask_strength = 'high' ask_strength_cn = '强' elif ask_capacity >= 100000: ask_strength = 'moderate' ask_strength_cn = '中等' else: ask_strength = 'low' ask_strength_cn = '弱' return { 'bid_capacity_usd': round(bid_capacity, 2), 'ask_capacity_usd': round(ask_capacity, 2), 'bid_strength': bid_strength, 'bid_strength_cn': bid_strength_cn, 'ask_strength': ask_strength, 'ask_strength_cn': ask_strength_cn, 'easier_direction': 'up' if ask_capacity < bid_capacity else 'down', 'easier_direction_cn': '向上' if ask_capacity < bid_capacity else '向下', } @staticmethod def _interpret_battle( dominant: str, intensity: str, score: float ) -> str: """Generate interpretation of bull/bear battle""" if dominant == 'bulls': if intensity == 'high': return "多头占据主导且战斗激烈,买盘积极进场" elif intensity == 'moderate': return "多头略占优势,但空头仍有抵抗" else: return "多头偏强,但整体交投清淡" elif dominant == 'bears': if intensity == 'high': return "空头占据主导且战斗激烈,卖盘积极出货" elif intensity == 'moderate': return "空头略占优势,但多头仍在防守" else: return "空头偏强,但整体交投清淡" else: if intensity == 'high': return "多空激烈交战,方向不明" else: return "多空势均力敌,市场观望情绪浓厚" @staticmethod def _generate_summary( imbalance: Dict[str, Any], battle: Dict[str, Any], walls: Dict[str, Any] ) -> str: """Generate LLM-readable summary""" parts = [] # Imbalance status_cn = imbalance.get('status_cn', '平衡') ratio_pct = imbalance.get('ratio_pct', 0) parts.append(f"订单簿{status_cn} ({ratio_pct:+.1f}%)") # Battle interpretation = battle.get('interpretation', '') if interpretation: parts.append(interpretation) # Walls nearest_support = walls.get('nearest_support_wall') nearest_resistance = walls.get('nearest_resistance_wall') if nearest_support: parts.append( f"最近支撑墙: ${nearest_support['price']:,.0f} " f"({nearest_support['distance_pct']:.1f}%)" ) if nearest_resistance: parts.append( f"最近阻力墙: ${nearest_resistance['price']:,.0f} " f"({nearest_resistance['distance_pct']:.1f}%)" ) return "; ".join(parts) @staticmethod def _direction_to_cn(direction: str) -> str: """Convert direction to Chinese""" mapping = { 'bulls': '多头主导', 'bears': '空头主导', 'neutral': '多空平衡', } return mapping.get(direction, '未知') @staticmethod def _empty_result() -> Dict[str, Any]: """Return empty result when data is unavailable""" return { 'current_price': 0, 'imbalance': { 'ratio': 0, 'status': 'unknown', 'status_cn': '数据不可用', }, 'liquidity': {}, 'walls': { 'bid_walls': [], 'ask_walls': [], }, 'battle': { 'dominant_side': 'unknown', 'intensity': 'unknown', }, 'absorption': {}, 'assessment': { 'direction': 'unknown', 'direction_cn': '数据不可用', 'intensity': 'unknown', 'confidence': 0, }, 'summary': '订单流数据不可用', }