tradusai/analysis/orderflow.py
2025-12-02 22:54:03 +08:00

308 lines
10 KiB
Python

"""
Order flow analysis based on order book depth data
"""
import logging
from typing import Dict, Any, List, Tuple, Optional
import numpy as np
from .config import config
logger = logging.getLogger(__name__)
class OrderFlowAnalyzer:
    """Analyze order flow and liquidity from order book data.

    Every method is a ``@staticmethod`` working on an order book snapshot
    shaped like ``{'bids': [[price, qty, ...], ...], 'asks': [...]}``.
    Prices and quantities may be numbers or numeric strings; they are
    converted with ``float()``.
    """

    @staticmethod
    def _parse_levels(orders: List[List[Any]]) -> List[Tuple[float, float]]:
        """Convert raw order book rows into ``(price, quantity)`` float pairs.

        Only the first two fields of each row are read, so rows carrying
        extra fields are accepted instead of failing on tuple unpacking.
        """
        return [(float(row[0]), float(row[1])) for row in orders]

    @staticmethod
    def analyze_orderbook_imbalance(depth_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Calculate order book imbalance (buy vs sell pressure)

        Args:
            depth_data: Dict with 'bids' and 'asks' arrays

        Returns:
            Dict with imbalance metrics. When the book is missing or one side
            is empty, only ``{'imbalance': 0, 'status': '未知'}`` is returned.
        """
        if not depth_data or 'bids' not in depth_data or 'asks' not in depth_data:
            return {'imbalance': 0, 'status': '未知'}

        bids = depth_data['bids']
        asks = depth_data['asks']
        if not bids or not asks:
            return {'imbalance': 0, 'status': '未知'}

        bid_levels = OrderFlowAnalyzer._parse_levels(bids)
        ask_levels = OrderFlowAnalyzer._parse_levels(asks)

        # Total resting quantity per side
        total_bid_volume = sum(qty for _, qty in bid_levels)
        total_ask_volume = sum(qty for _, qty in ask_levels)

        # Total notional value (price * quantity) per side
        total_bid_value = sum(price * qty for price, qty in bid_levels)
        total_ask_value = sum(price * qty for price, qty in ask_levels)

        # Imbalance ratio: (bids - asks) / (bids + asks)
        # Positive = buy pressure, Negative = sell pressure
        volume_total = total_bid_volume + total_ask_volume
        value_total = total_bid_value + total_ask_value
        volume_imbalance = (
            (total_bid_volume - total_ask_volume) / volume_total
            if volume_total > 0 else 0
        )
        value_imbalance = (
            (total_bid_value - total_ask_value) / value_total
            if value_total > 0 else 0
        )

        # Average the two imbalance measures
        imbalance = (volume_imbalance + value_imbalance) / 2

        # Classify: the configured threshold marks "strong" dominance, a fixed
        # +/-0.05 band marks a slight lean, anything inside it is balanced.
        if imbalance > config.ORDERBOOK_IMBALANCE_THRESHOLD:
            status = '强买方主导'
            pressure = 'buy'
        elif imbalance < -config.ORDERBOOK_IMBALANCE_THRESHOLD:
            status = '强卖方主导'
            pressure = 'sell'
        elif imbalance > 0.05:
            status = '买方偏强'
            pressure = 'buy_slight'
        elif imbalance < -0.05:
            status = '卖方偏强'
            pressure = 'sell_slight'
        else:
            status = '买卖平衡'
            pressure = 'neutral'

        return {
            'imbalance': round(imbalance, 3),
            'imbalance_pct': round(imbalance * 100, 1),
            'status': status,
            'pressure': pressure,
            'total_bid_volume': round(total_bid_volume, 2),
            'total_ask_volume': round(total_ask_volume, 2),
            'total_bid_value': round(total_bid_value, 2),
            'total_ask_value': round(total_ask_value, 2),
        }

    @staticmethod
    def analyze_liquidity_depth(depth_data: Dict[str, Any], current_price: float) -> Dict[str, Any]:
        """
        Analyze liquidity at different price levels

        Args:
            depth_data: Dict with 'bids' and 'asks' arrays
            current_price: Current market price

        Returns:
            Dict with liquidity metrics, or an empty dict when the book is
            missing or one side is empty. The first row of each side is taken
            as the best bid / best ask.
        """
        if not depth_data or 'bids' not in depth_data or 'asks' not in depth_data:
            return {}

        bids = depth_data['bids']
        asks = depth_data['asks']
        if not bids or not asks:
            return {}

        # Cumulative liquidity at different distances from the current price
        bid_liquidity_levels = OrderFlowAnalyzer._calculate_liquidity_at_levels(
            bids, current_price, side='bid'
        )
        ask_liquidity_levels = OrderFlowAnalyzer._calculate_liquidity_at_levels(
            asks, current_price, side='ask'
        )

        # Find bid and ask walls (largest orders)
        bid_wall = OrderFlowAnalyzer._find_largest_order(bids)
        ask_wall = OrderFlowAnalyzer._find_largest_order(asks)

        # Both sides are known to be non-empty here, so no fallback is needed.
        best_bid = float(bids[0][0])
        best_ask = float(asks[0][0])
        spread = best_ask - best_bid
        spread_pct = (spread / current_price * 100) if current_price > 0 else 0

        return {
            'bid_liquidity': bid_liquidity_levels,
            'ask_liquidity': ask_liquidity_levels,
            'bid_wall': bid_wall,
            'ask_wall': ask_wall,
            'spread': round(spread, 2),
            'spread_pct': round(spread_pct, 4),
            'best_bid': round(best_bid, 2),
            'best_ask': round(best_ask, 2),
        }

    @staticmethod
    def _calculate_liquidity_at_levels(
        orders: List[List[float]],
        current_price: float,
        side: str
    ) -> Dict[str, float]:
        """
        Calculate cumulative liquidity at 0.1%, 0.5%, 1%, 2% price levels

        Args:
            orders: List of [price, quantity] pairs
            current_price: Current market price
            side: 'bid' or 'ask' (any value other than 'bid' is treated as ask)

        Returns:
            Dict mapping each level label to the summed quantity of orders no
            further than that relative distance from ``current_price``. All
            levels are 0 when ``current_price`` is not positive, because a
            relative distance cannot be computed.
        """
        levels = [0.001, 0.005, 0.01, 0.02]  # 0.1%, 0.5%, 1%, 2%
        # Labels are built once with the same expression as before so the
        # dictionary keys seen by callers do not change.
        labelled_levels = [(level, f'{level*100}%') for level in levels]
        liquidity = {label: 0 for _, label in labelled_levels}

        # Guard against division by zero (and meaningless negative prices).
        if current_price <= 0:
            return liquidity

        for price, qty in OrderFlowAnalyzer._parse_levels(orders):
            # Relative distance from the current price, positive when the
            # order sits on the expected side of the book.
            if side == 'bid':
                distance = (current_price - price) / current_price
            else:  # ask
                distance = (price - current_price) / current_price

            # Levels are cumulative: an order inside 0.1% also counts for
            # 0.5%, 1% and 2%.
            for level, label in labelled_levels:
                if distance <= level:
                    liquidity[label] += qty

        # Round values
        return {k: round(v, 2) for k, v in liquidity.items()}

    @staticmethod
    def _find_largest_order(orders: List[List[float]]) -> Optional[Dict[str, Any]]:
        """
        Find the largest order (potential wall)

        Args:
            orders: List of [price, quantity] pairs

        Returns:
            Dict with price, quantity, and value of the order with the
            greatest quantity, or None when ``orders`` is empty
        """
        if not orders:
            return None

        largest = max(orders, key=lambda x: float(x[1]))
        price = float(largest[0])
        qty = float(largest[1])
        return {
            'price': round(price, 2),
            'quantity': round(qty, 2),
            'value': round(price * qty, 2),
        }

    @staticmethod
    def _collect_large_orders(orders: List[List[Any]], threshold: float) -> List[Dict[str, Any]]:
        """Return orders whose notional value is >= ``threshold``, largest first."""
        matches = []
        for price, qty in OrderFlowAnalyzer._parse_levels(orders):
            value = price * qty
            if value >= threshold:
                matches.append({
                    'price': round(price, 2),
                    'quantity': round(qty, 2),
                    'value': round(value, 2),
                })
        # Sort by value so that slicing the result really yields the largest
        # orders rather than whichever happen to come first in the book.
        matches.sort(key=lambda order: order['value'], reverse=True)
        return matches

    @staticmethod
    def detect_large_orders(depth_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Detect large orders that could indicate institutional activity

        An order is "large" when price * quantity reaches
        ``config.LARGE_ORDER_THRESHOLD_USD``.

        Args:
            depth_data: Dict with 'bids' and 'asks' arrays

        Returns:
            Dict with large order detection results; only
            ``{'has_large_orders': False}`` when the book is missing
        """
        if not depth_data or 'bids' not in depth_data or 'asks' not in depth_data:
            return {'has_large_orders': False}

        threshold = config.LARGE_ORDER_THRESHOLD_USD
        # A side that is None is treated like an empty side.
        large_bids = OrderFlowAnalyzer._collect_large_orders(depth_data['bids'] or [], threshold)
        large_asks = OrderFlowAnalyzer._collect_large_orders(depth_data['asks'] or [], threshold)

        has_large_orders = len(large_bids) > 0 or len(large_asks) > 0

        # Determine dominant side: one side needs more than 1.5x the other's
        # count of large orders to be called dominant.
        if len(large_bids) > len(large_asks) * 1.5:
            dominant_side = '买方'
        elif len(large_asks) > len(large_bids) * 1.5:
            dominant_side = '卖方'
        else:
            dominant_side = '均衡'

        return {
            'has_large_orders': has_large_orders,
            'large_bids_count': len(large_bids),
            'large_asks_count': len(large_asks),
            'large_bids': large_bids[:3],  # Top 3 largest bids
            'large_asks': large_asks[:3],  # Top 3 largest asks
            'dominant_side': dominant_side,
        }

    @staticmethod
    def calculate_orderflow_strength(
        imbalance: Dict[str, Any],
        large_orders: Dict[str, Any],
        liquidity: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Calculate overall order flow strength and direction

        Args:
            imbalance: Orderbook imbalance metrics
            large_orders: Large order detection results
            liquidity: Liquidity depth metrics

        Returns:
            Dict with orderflow strength metrics. Missing input keys fall
            back to neutral defaults (0 / 'neutral' / '').
        """
        # Get imbalance percentage
        imbalance_pct = imbalance.get('imbalance_pct', 0)
        pressure = imbalance.get('pressure', 'neutral')

        # Check for large order bias (positive = more large bids than asks)
        large_bid_count = large_orders.get('large_bids_count', 0)
        large_ask_count = large_orders.get('large_asks_count', 0)
        large_order_bias = large_bid_count - large_ask_count

        # Check spread (tight spread = healthy market)
        spread_pct = liquidity.get('spread_pct', 0)
        if spread_pct < 0.01:
            spread_status = '紧密'
        elif spread_pct < 0.05:
            spread_status = '正常'
        else:
            spread_status = '宽松'

        # Calculate composite strength score (-100 to +100)
        # Positive = bullish, Negative = bearish
        # Each net large order shifts the score by 5 points.
        strength_score = imbalance_pct + (large_order_bias * 5)
        strength_score = max(-100, min(100, strength_score))  # Clamp to [-100, 100]

        # Determine strength category
        if strength_score > 30:
            strength = '强烈看涨'
        elif strength_score > 15:
            strength = '看涨'
        elif strength_score > 5:
            strength = '偏涨'
        elif strength_score < -30:
            strength = '强烈看跌'
        elif strength_score < -15:
            strength = '看跌'
        elif strength_score < -5:
            strength = '偏跌'
        else:
            strength = '中性'

        return {
            'strength_score': round(strength_score, 1),
            'strength': strength,
            'pressure': pressure,
            'spread_status': spread_status,
            'large_order_bias': large_order_bias,
            'summary': f"{imbalance.get('status', '')} ({imbalance_pct:+.1f}%)",
        }