""" Order flow analysis based on order book depth data """ import logging from typing import Dict, Any, List, Tuple, Optional import numpy as np from .config import config logger = logging.getLogger(__name__) class OrderFlowAnalyzer: """Analyze order flow and liquidity from order book data""" @staticmethod def analyze_orderbook_imbalance(depth_data: Dict[str, Any]) -> Dict[str, Any]: """ Calculate order book imbalance (buy vs sell pressure) Args: depth_data: Dict with 'bids' and 'asks' arrays Returns: Dict with imbalance metrics """ if not depth_data or 'bids' not in depth_data or 'asks' not in depth_data: return {'imbalance': 0, 'status': '未知'} bids = depth_data['bids'] asks = depth_data['asks'] if not bids or not asks: return {'imbalance': 0, 'status': '未知'} # Calculate total bid/ask volume total_bid_volume = sum(float(qty) for _, qty in bids) total_ask_volume = sum(float(qty) for _, qty in asks) # Calculate total bid/ask value (price * quantity) total_bid_value = sum(float(price) * float(qty) for price, qty in bids) total_ask_value = sum(float(price) * float(qty) for price, qty in asks) # Imbalance ratio: (bids - asks) / (bids + asks) # Positive = buy pressure, Negative = sell pressure volume_imbalance = (total_bid_volume - total_ask_volume) / (total_bid_volume + total_ask_volume) if (total_bid_volume + total_ask_volume) > 0 else 0 value_imbalance = (total_bid_value - total_ask_value) / (total_bid_value + total_ask_value) if (total_bid_value + total_ask_value) > 0 else 0 # Average the two imbalance measures imbalance = (volume_imbalance + value_imbalance) / 2 # Determine status if imbalance > config.ORDERBOOK_IMBALANCE_THRESHOLD: status = '强买方主导' pressure = 'buy' elif imbalance < -config.ORDERBOOK_IMBALANCE_THRESHOLD: status = '强卖方主导' pressure = 'sell' elif imbalance > 0.05: status = '买方偏强' pressure = 'buy_slight' elif imbalance < -0.05: status = '卖方偏强' pressure = 'sell_slight' else: status = '买卖平衡' pressure = 'neutral' return { 'imbalance': round(imbalance, 3), 'imbalance_pct': round(imbalance * 100, 1), 'status': status, 'pressure': pressure, 'total_bid_volume': round(total_bid_volume, 2), 'total_ask_volume': round(total_ask_volume, 2), 'total_bid_value': round(total_bid_value, 2), 'total_ask_value': round(total_ask_value, 2), } @staticmethod def analyze_liquidity_depth(depth_data: Dict[str, Any], current_price: float) -> Dict[str, Any]: """ Analyze liquidity at different price levels Args: depth_data: Dict with 'bids' and 'asks' arrays current_price: Current market price Returns: Dict with liquidity metrics """ if not depth_data or 'bids' not in depth_data or 'asks' not in depth_data: return {} bids = depth_data['bids'] asks = depth_data['asks'] if not bids or not asks: return {} # Calculate cumulative liquidity at different distances from mid price bid_liquidity_levels = OrderFlowAnalyzer._calculate_liquidity_at_levels( bids, current_price, side='bid' ) ask_liquidity_levels = OrderFlowAnalyzer._calculate_liquidity_at_levels( asks, current_price, side='ask' ) # Find bid and ask walls (largest orders) bid_wall = OrderFlowAnalyzer._find_largest_order(bids) ask_wall = OrderFlowAnalyzer._find_largest_order(asks) # Calculate spread best_bid = float(bids[0][0]) if bids else 0 best_ask = float(asks[0][0]) if asks else 0 spread = best_ask - best_bid spread_pct = (spread / current_price * 100) if current_price > 0 else 0 return { 'bid_liquidity': bid_liquidity_levels, 'ask_liquidity': ask_liquidity_levels, 'bid_wall': bid_wall, 'ask_wall': ask_wall, 'spread': round(spread, 2), 'spread_pct': round(spread_pct, 4), 'best_bid': round(best_bid, 2), 'best_ask': round(best_ask, 2), } @staticmethod def _calculate_liquidity_at_levels( orders: List[List[float]], current_price: float, side: str ) -> Dict[str, float]: """ Calculate cumulative liquidity at 0.1%, 0.5%, 1%, 2% price levels Args: orders: List of [price, quantity] pairs current_price: Current market price side: 'bid' or 'ask' Returns: Dict with liquidity at different levels """ levels = [0.001, 0.005, 0.01, 0.02] # 0.1%, 0.5%, 1%, 2% liquidity = {f'{level*100}%': 0 for level in levels} for price, qty in orders: price = float(price) qty = float(qty) # Calculate distance from current price if side == 'bid': distance = (current_price - price) / current_price else: # ask distance = (price - current_price) / current_price # Add to appropriate levels for level in levels: if distance <= level: liquidity[f'{level*100}%'] += qty # Round values return {k: round(v, 2) for k, v in liquidity.items()} @staticmethod def _find_largest_order(orders: List[List[float]]) -> Optional[Dict[str, Any]]: """ Find the largest order (potential wall) Returns: Dict with price, quantity, and value of largest order """ if not orders: return None largest = max(orders, key=lambda x: float(x[1])) price = float(largest[0]) qty = float(largest[1]) return { 'price': round(price, 2), 'quantity': round(qty, 2), 'value': round(price * qty, 2), } @staticmethod def detect_large_orders(depth_data: Dict[str, Any]) -> Dict[str, Any]: """ Detect large orders that could indicate institutional activity Returns: Dict with large order detection results """ if not depth_data or 'bids' not in depth_data or 'asks' not in depth_data: return {'has_large_orders': False} bids = depth_data['bids'] asks = depth_data['asks'] large_bids = [] large_asks = [] # Find orders exceeding the large order threshold for price, qty in bids: price = float(price) qty = float(qty) value = price * qty if value >= config.LARGE_ORDER_THRESHOLD_USD: large_bids.append({ 'price': round(price, 2), 'quantity': round(qty, 2), 'value': round(value, 2), }) for price, qty in asks: price = float(price) qty = float(qty) value = price * qty if value >= config.LARGE_ORDER_THRESHOLD_USD: large_asks.append({ 'price': round(price, 2), 'quantity': round(qty, 2), 'value': round(value, 2), }) has_large_orders = len(large_bids) > 0 or len(large_asks) > 0 # Determine dominant side if len(large_bids) > len(large_asks) * 1.5: dominant_side = '买方' elif len(large_asks) > len(large_bids) * 1.5: dominant_side = '卖方' else: dominant_side = '均衡' return { 'has_large_orders': has_large_orders, 'large_bids_count': len(large_bids), 'large_asks_count': len(large_asks), 'large_bids': large_bids[:3], # Top 3 largest bids 'large_asks': large_asks[:3], # Top 3 largest asks 'dominant_side': dominant_side, } @staticmethod def calculate_orderflow_strength( imbalance: Dict[str, Any], large_orders: Dict[str, Any], liquidity: Dict[str, Any] ) -> Dict[str, Any]: """ Calculate overall order flow strength and direction Args: imbalance: Orderbook imbalance metrics large_orders: Large order detection results liquidity: Liquidity depth metrics Returns: Dict with orderflow strength metrics """ # Get imbalance percentage imbalance_pct = imbalance.get('imbalance_pct', 0) pressure = imbalance.get('pressure', 'neutral') # Check for large order bias large_bid_count = large_orders.get('large_bids_count', 0) large_ask_count = large_orders.get('large_asks_count', 0) large_order_bias = large_bid_count - large_ask_count # Check spread (tight spread = healthy market) spread_pct = liquidity.get('spread_pct', 0) spread_status = '紧密' if spread_pct < 0.01 else '正常' if spread_pct < 0.05 else '宽松' # Calculate composite strength score (-100 to +100) # Positive = bullish, Negative = bearish strength_score = imbalance_pct + (large_order_bias * 5) strength_score = max(-100, min(100, strength_score)) # Clamp to [-100, 100] # Determine strength category if strength_score > 30: strength = '强烈看涨' elif strength_score > 15: strength = '看涨' elif strength_score > 5: strength = '偏涨' elif strength_score < -30: strength = '强烈看跌' elif strength_score < -15: strength = '看跌' elif strength_score < -5: strength = '偏跌' else: strength = '中性' return { 'strength_score': round(strength_score, 1), 'strength': strength, 'pressure': pressure, 'spread_status': spread_status, 'large_order_bias': large_order_bias, 'summary': f"{imbalance.get('status', '')} ({imbalance_pct:+.1f}%)", }