tradusai/analysis/market_structure.py
2025-12-02 22:54:03 +08:00

291 lines
9.3 KiB
Python

"""
Market structure analysis: trend, support/resistance, breakouts
"""
import logging
from typing import List, Dict, Any, Tuple, Optional
import pandas as pd
import numpy as np
from .config import config
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MarketStructureAnalyzer:
    """Analyze market structure: trend, support/resistance levels, breakouts.

    The class is a pure namespace: every method is a ``@staticmethod`` and no
    state is kept between calls. Methods read columns by name from the
    DataFrame they are given (``ema_<n>``, ``adx``, ``rsi``, ``macd_hist``,
    ``high``, ``low``, ``close``); optional indicator columns fall back to a
    neutral default when they are absent.
    """

    @staticmethod
    def _indicator(row: pd.Series, key: str, default: Any) -> Any:
        """Return ``row[key]``, or ``default`` if the key is absent or NaN/None.

        Indicator columns are typically NaN during their warm-up period. NaN
        compares False against everything, so letting it through silently
        selects whichever branch happens to be the final ``else``.
        """
        value = row.get(key, default)
        if pd.isna(value):
            return default
        return value

    @staticmethod
    def identify_trend(df: pd.DataFrame) -> Dict[str, Any]:
        """Identify trend direction, strength and phase from the latest bar.

        Direction comes from comparing the fast and slow EMA columns
        (``ema_{config.EMA_FAST}`` / ``ema_{config.EMA_SLOW}``), strength from
        ``adx`` and phase from ``rsi``.

        Args:
            df: DataFrame whose last row carries the indicator columns.

        Returns:
            ``{'direction': 'unknown', 'strength': 0}`` when ``df`` has fewer
            than 50 rows. Otherwise a dict with keys ``direction``
            ('上涨' / '下跌' / '震荡'), ``strength`` ('strong' / 'moderate' /
            'weak'), ``phase``, ``adx`` (rounded to 1 decimal) and
            ``ema_alignment`` ('bullish' / 'bearish' / 'neutral').
        """
        if df.empty or len(df) < 50:
            return {'direction': 'unknown', 'strength': 0}

        latest = df.iloc[-1]
        # The EMAs are deliberately read raw: a NaN EMA compares False in both
        # directions below and therefore lands in the sideways branch.
        ema_fast = latest.get(f'ema_{config.EMA_FAST}', 0)
        ema_slow = latest.get(f'ema_{config.EMA_SLOW}', 0)
        adx = MarketStructureAnalyzer._indicator(latest, 'adx', 0)
        rsi = MarketStructureAnalyzer._indicator(latest, 'rsi', 50)

        if ema_fast > ema_slow:
            direction, ema_alignment = '上涨', 'bullish'
        elif ema_fast < ema_slow:
            direction, ema_alignment = '下跌', 'bearish'
        else:
            # Equal or unavailable EMAs: no directional claim either way.
            direction, ema_alignment = '震荡', 'neutral'

        # A sideways market is always reported as weak; otherwise ADX decides.
        if direction == '震荡':
            strength = 'weak'
        elif adx > 25:
            strength = 'strong'
        elif adx > 20:
            strength = 'moderate'
        else:
            strength = 'weak'

        if direction == '上涨':
            if rsi > 70:
                phase = '上涨中的强势回调'
            elif rsi > 55:
                phase = '上涨中'
            else:
                phase = '上涨后回调'
        elif direction == '下跌':
            if rsi < 30:
                phase = '下跌中的超卖反弹'
            elif rsi < 45:
                phase = '下跌中'
            else:
                phase = '下跌后反弹'
        else:
            phase = '震荡盘整'

        return {
            'direction': direction,
            'strength': strength,
            'phase': phase,
            'adx': round(adx, 1),
            'ema_alignment': ema_alignment,
        }

    @staticmethod
    def find_support_resistance(df: pd.DataFrame, current_price: float) -> Dict[str, Any]:
        """Find the nearest support and resistance levels around a price.

        Swing lows/highs of the last ``config.SR_LOOKBACK`` bars are clustered
        into levels; only levels within 5% of ``current_price`` are kept.

        Args:
            df: DataFrame with ``high`` and ``low`` columns.
            current_price: Price the levels are measured against.

        Returns:
            Dict with ``support`` (up to 3 levels below the price, closest
            first), ``resistance`` (up to 3 levels above the price, closest
            first), and ``nearest_support`` / ``nearest_resistance`` (the first
            entry of each list, or ``None``). All levels are rounded to
            2 decimals. With fewer than ``config.SR_LOOKBACK`` rows both lists
            are empty and both nearest values are ``None``.
        """
        if df.empty or len(df) < config.SR_LOOKBACK:
            return {
                'support': [],
                'resistance': [],
                'nearest_support': None,
                'nearest_resistance': None,
            }

        window = df.tail(config.SR_LOOKBACK)
        swing_highs = MarketStructureAnalyzer._find_local_extrema(window['high'], 'high')
        swing_lows = MarketStructureAnalyzer._find_local_extrema(window['low'], 'low')

        support_levels = MarketStructureAnalyzer._cluster_levels(swing_lows, current_price)
        resistance_levels = MarketStructureAnalyzer._cluster_levels(swing_highs, current_price)

        # Keep only levels strictly inside the ±5% band on the relevant side.
        below = [s for s in support_levels if current_price * 0.95 < s < current_price]
        above = [r for r in resistance_levels if current_price < r < current_price * 1.05]

        # Closest first: highest support, lowest resistance; at most 3 each.
        support = [round(s, 2) for s in sorted(below, reverse=True)[:3]]
        resistance = [round(r, 2) for r in sorted(above)[:3]]

        return {
            'support': support,
            'resistance': resistance,
            'nearest_support': support[0] if support else None,
            'nearest_resistance': resistance[0] if resistance else None,
        }

    @staticmethod
    def _find_local_extrema(series: pd.Series, kind: str) -> List[float]:
        """Return values that are strict local highs or lows of ``series``.

        A point qualifies when it is strictly greater (``kind == 'high'``) or
        strictly smaller (any other ``kind``) than both of its two neighbours
        on each side. The first and last two points can never qualify, and
        flat plateaus are not reported because the comparison is strict.
        """
        values = series.to_numpy(dtype=float, na_value=np.nan)
        if len(values) < 5:
            return []

        centre = values[2:-2]
        # Neighbours at offsets -1, -2, +1, +2, aligned with ``centre``.
        neighbours = (values[1:-3], values[:-4], values[3:-1], values[4:])

        mask = np.ones(len(centre), dtype=bool)
        for neighbour in neighbours:
            if kind == 'high':
                mask &= centre > neighbour
            else:
                mask &= centre < neighbour
        return centre[mask].tolist()

    @staticmethod
    def _cluster_levels(
        levels: List[float],
        reference_price: float,
        tolerance_ratio: Optional[float] = None,
    ) -> List[float]:
        """Merge nearby price levels and return the mean of each cluster.

        Levels are sorted ascending; a level joins the current cluster when it
        lies within ``reference_price * tolerance_ratio`` of the cluster's
        most recently added member, otherwise it starts a new cluster.

        Args:
            levels: Raw price levels, in any order.
            reference_price: Price the tolerance is scaled by.
            tolerance_ratio: Fraction of ``reference_price`` used as merge
                distance. Defaults to ``config.SR_TOLERANCE``.

        Returns:
            Cluster means in ascending order; empty when ``levels`` is empty.
        """
        if not levels:
            return []
        if tolerance_ratio is None:
            tolerance_ratio = config.SR_TOLERANCE
        tolerance = reference_price * tolerance_ratio

        ordered = sorted(levels)
        clusters = [[ordered[0]]]
        for level in ordered[1:]:
            if abs(level - clusters[-1][-1]) < tolerance:
                clusters[-1].append(level)
            else:
                clusters.append([level])
        return [float(np.mean(cluster)) for cluster in clusters]

    @staticmethod
    def detect_breakout(df: pd.DataFrame, sr_levels: Dict[str, Any]) -> Dict[str, Any]:
        """Detect whether the latest close broke, or is approaching, an S/R level.

        Checks run in this order and the first match wins: close above
        ``nearest_resistance``, close below ``nearest_support``, close within
        0.5% of resistance, close within 0.5% of support.

        Args:
            df: DataFrame with a ``close`` column; only the last row is read.
            sr_levels: Dict providing ``nearest_resistance`` and/or
                ``nearest_support``. Missing, ``None`` or zero levels are
                ignored.

        Returns:
            ``{'has_breakout': True, 'type', 'level', 'confirmation'}`` on a
            breakout or breakdown; ``{'has_breakout': False, 'approaching',
            'level', 'distance_pct'}`` when close to a level; otherwise
            ``{'has_breakout': False}`` (also when ``df`` has fewer than
            5 rows).
        """
        # NOTE(review): levels produced by find_support_resistance for the same
        # price always bracket it, so a breakout can only be seen when
        # sr_levels were computed against an earlier price - confirm callers.
        if df.empty or len(df) < 5:
            return {'has_breakout': False}

        current_price = float(df.iloc[-1]['close'])
        resistance = sr_levels.get('nearest_resistance')
        support = sr_levels.get('nearest_support')

        if resistance and current_price > resistance:
            return {
                'has_breakout': True,
                'type': 'resistance_breakout',
                'level': resistance,
                'confirmation': '价格突破压力位',
            }

        if support and current_price < support:
            return {
                'has_breakout': True,
                'type': 'support_breakdown',
                'level': support,
                'confirmation': '价格跌破支撑位',
            }

        # No break: report when the close sits within 0.5% of a level.
        if resistance and abs(current_price - resistance) / resistance < 0.005:
            return {
                'has_breakout': False,
                'approaching': 'resistance',
                'level': resistance,
                'distance_pct': round((resistance - current_price) / current_price * 100, 2),
            }

        if support and abs(current_price - support) / support < 0.005:
            return {
                'has_breakout': False,
                'approaching': 'support',
                'level': support,
                'distance_pct': round((current_price - support) / current_price * 100, 2),
            }

        return {'has_breakout': False}

    @staticmethod
    def calculate_momentum(df: pd.DataFrame) -> Dict[str, Any]:
        """Summarise RSI and MACD-histogram momentum from the latest bars.

        Missing or NaN indicator values are treated as neutral (RSI 50,
        histogram 0) instead of being classified as oversold / bearish.

        Args:
            df: DataFrame with optional ``rsi`` and ``macd_hist`` columns.

        Returns:
            Empty dict when ``df`` is empty. Otherwise a dict with ``rsi``
            (1 decimal), ``rsi_status``, ``rsi_trend`` ('上升中' / '下降中' /
            '中性'), ``macd_signal`` and ``macd_hist`` (4 decimals).
        """
        if df.empty:
            return {}

        read = MarketStructureAnalyzer._indicator
        latest = df.iloc[-1]
        prev = df.iloc[-2] if len(df) > 1 else latest

        rsi_now = read(latest, 'rsi', None)
        rsi = 50 if rsi_now is None else rsi_now
        macd_hist = read(latest, 'macd_hist', 0)
        prev_macd_hist = read(prev, 'macd_hist', 0)

        if rsi > 70:
            rsi_status = '超买'
        elif rsi > 60:
            rsi_status = '强势'
        elif rsi > 50:
            rsi_status = '中性偏强'
        elif rsi > 40:
            rsi_status = '中性偏弱'
        elif rsi > 30:
            rsi_status = '弱势'
        else:
            rsi_status = '超卖'

        if macd_hist > 0 and prev_macd_hist <= 0:
            macd_signal = '金叉'
        elif macd_hist < 0 and prev_macd_hist >= 0:
            macd_signal = '死叉'
        elif macd_hist > 0:
            macd_signal = '金叉扩大' if macd_hist > prev_macd_hist else '金叉收窄'
        elif macd_hist < 0:
            macd_signal = '死叉扩大' if abs(macd_hist) > abs(prev_macd_hist) else '死叉收窄'
        elif prev_macd_hist > 0:
            # Histogram shrank from positive to exactly zero.
            macd_signal = '金叉收窄'
        elif prev_macd_hist < 0:
            # Histogram recovered from negative to exactly zero.
            macd_signal = '死叉收窄'
        else:
            # Zero now and zero before (or no data at all): nothing to report.
            macd_signal = '中性'

        # RSI trend compares the latest value with the first bar of the last
        # 5-bar window (iloc[-5]); unknown or unchanged values are neutral.
        rsi_then = read(df.iloc[-5], 'rsi', None) if len(df) >= 5 else None
        if rsi_now is None or rsi_then is None or rsi_now == rsi_then:
            rsi_trend = '中性'
        elif rsi_now > rsi_then:
            rsi_trend = '上升中'
        else:
            rsi_trend = '下降中'

        return {
            'rsi': round(rsi, 1),
            'rsi_status': rsi_status,
            'rsi_trend': rsi_trend,
            'macd_signal': macd_signal,
            'macd_hist': round(macd_hist, 4),
        }