291 lines
9.3 KiB
Python
291 lines
9.3 KiB
Python
"""
|
|
Market structure analysis: trend, support/resistance, breakouts
|
|
"""
|
|
import logging
|
|
from typing import List, Dict, Any, Tuple, Optional
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from .config import config
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MarketStructureAnalyzer:
|
|
"""Analyze market structure, S/R levels, and trend"""
|
|
|
|
@staticmethod
|
|
def identify_trend(df: pd.DataFrame) -> Dict[str, Any]:
|
|
"""
|
|
Identify trend direction and strength
|
|
|
|
Returns:
|
|
Dict with trend info
|
|
"""
|
|
if df.empty or len(df) < 50:
|
|
return {'direction': 'unknown', 'strength': 0}
|
|
|
|
latest = df.iloc[-1]
|
|
|
|
# EMA comparison
|
|
ema_20 = latest.get(f'ema_{config.EMA_FAST}', 0)
|
|
ema_50 = latest.get(f'ema_{config.EMA_SLOW}', 0)
|
|
adx = latest.get('adx', 0)
|
|
|
|
# Determine direction
|
|
if ema_20 > ema_50:
|
|
direction = '上涨'
|
|
if adx > 25:
|
|
strength = 'strong'
|
|
elif adx > 20:
|
|
strength = 'moderate'
|
|
else:
|
|
strength = 'weak'
|
|
elif ema_20 < ema_50:
|
|
direction = '下跌'
|
|
if adx > 25:
|
|
strength = 'strong'
|
|
elif adx > 20:
|
|
strength = 'moderate'
|
|
else:
|
|
strength = 'weak'
|
|
else:
|
|
direction = '震荡'
|
|
strength = 'weak'
|
|
|
|
# Detect trend phase
|
|
rsi = latest.get('rsi', 50)
|
|
if direction == '上涨':
|
|
if rsi > 70:
|
|
phase = '上涨中的强势回调'
|
|
elif rsi > 55:
|
|
phase = '上涨中'
|
|
else:
|
|
phase = '上涨后回调'
|
|
elif direction == '下跌':
|
|
if rsi < 30:
|
|
phase = '下跌中的超卖反弹'
|
|
elif rsi < 45:
|
|
phase = '下跌中'
|
|
else:
|
|
phase = '下跌后反弹'
|
|
else:
|
|
phase = '震荡盘整'
|
|
|
|
return {
|
|
'direction': direction,
|
|
'strength': strength,
|
|
'phase': phase,
|
|
'adx': round(adx, 1),
|
|
'ema_alignment': 'bullish' if ema_20 > ema_50 else 'bearish',
|
|
}
|
|
|
|
@staticmethod
|
|
def find_support_resistance(df: pd.DataFrame, current_price: float) -> Dict[str, Any]:
|
|
"""
|
|
Find support and resistance levels
|
|
|
|
Args:
|
|
df: DataFrame with OHLCV data
|
|
current_price: Current market price
|
|
|
|
Returns:
|
|
Dict with S/R levels
|
|
"""
|
|
if df.empty or len(df) < config.SR_LOOKBACK:
|
|
return {'support': [], 'resistance': []}
|
|
|
|
lookback_df = df.tail(config.SR_LOOKBACK)
|
|
|
|
# Find local highs (resistance) and lows (support)
|
|
highs = MarketStructureAnalyzer._find_local_extrema(lookback_df['high'], 'high')
|
|
lows = MarketStructureAnalyzer._find_local_extrema(lookback_df['low'], 'low')
|
|
|
|
# Cluster similar levels
|
|
support_levels = MarketStructureAnalyzer._cluster_levels(lows, current_price)
|
|
resistance_levels = MarketStructureAnalyzer._cluster_levels(highs, current_price)
|
|
|
|
# Filter to levels near current price (±5%)
|
|
support = [s for s in support_levels if s < current_price and s > current_price * 0.95]
|
|
resistance = [r for r in resistance_levels if r > current_price and r < current_price * 1.05]
|
|
|
|
# Sort and get closest
|
|
support = sorted(support, reverse=True)[:3] # Top 3 support levels
|
|
resistance = sorted(resistance)[:3] # Top 3 resistance levels
|
|
|
|
return {
|
|
'support': [round(s, 2) for s in support],
|
|
'resistance': [round(r, 2) for r in resistance],
|
|
'nearest_support': round(support[0], 2) if support else None,
|
|
'nearest_resistance': round(resistance[0], 2) if resistance else None,
|
|
}
|
|
|
|
@staticmethod
|
|
def _find_local_extrema(series: pd.Series, kind: str) -> List[float]:
|
|
"""Find local highs or lows"""
|
|
extrema = []
|
|
|
|
for i in range(2, len(series) - 2):
|
|
if kind == 'high':
|
|
# Local high
|
|
if (series.iloc[i] > series.iloc[i-1] and
|
|
series.iloc[i] > series.iloc[i-2] and
|
|
series.iloc[i] > series.iloc[i+1] and
|
|
series.iloc[i] > series.iloc[i+2]):
|
|
extrema.append(series.iloc[i])
|
|
else:
|
|
# Local low
|
|
if (series.iloc[i] < series.iloc[i-1] and
|
|
series.iloc[i] < series.iloc[i-2] and
|
|
series.iloc[i] < series.iloc[i+1] and
|
|
series.iloc[i] < series.iloc[i+2]):
|
|
extrema.append(series.iloc[i])
|
|
|
|
return extrema
|
|
|
|
@staticmethod
|
|
def _cluster_levels(levels: List[float], reference_price: float) -> List[float]:
|
|
"""Cluster similar price levels"""
|
|
if not levels:
|
|
return []
|
|
|
|
tolerance = reference_price * config.SR_TOLERANCE
|
|
clustered = []
|
|
sorted_levels = sorted(levels)
|
|
|
|
current_cluster = [sorted_levels[0]]
|
|
|
|
for level in sorted_levels[1:]:
|
|
if abs(level - current_cluster[-1]) < tolerance:
|
|
current_cluster.append(level)
|
|
else:
|
|
# Average the cluster
|
|
clustered.append(np.mean(current_cluster))
|
|
current_cluster = [level]
|
|
|
|
# Add last cluster
|
|
if current_cluster:
|
|
clustered.append(np.mean(current_cluster))
|
|
|
|
return clustered
|
|
|
|
@staticmethod
|
|
def detect_breakout(df: pd.DataFrame, sr_levels: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Detect if price has broken through S/R levels
|
|
|
|
Returns:
|
|
Dict with breakout info
|
|
"""
|
|
if df.empty or len(df) < 5:
|
|
return {'has_breakout': False}
|
|
|
|
latest = df.iloc[-1]
|
|
current_price = latest['close']
|
|
recent_high = df.tail(20)['high'].max()
|
|
recent_low = df.tail(20)['low'].min()
|
|
|
|
# Check resistance breakout
|
|
resistance = sr_levels.get('nearest_resistance')
|
|
if resistance and current_price > resistance:
|
|
# Confirm breakout (price closed above resistance)
|
|
return {
|
|
'has_breakout': True,
|
|
'type': 'resistance_breakout',
|
|
'level': resistance,
|
|
'confirmation': '价格突破压力位' if latest['close'] > resistance else '未确认',
|
|
}
|
|
|
|
# Check support breakdown
|
|
support = sr_levels.get('nearest_support')
|
|
if support and current_price < support:
|
|
return {
|
|
'has_breakout': True,
|
|
'type': 'support_breakdown',
|
|
'level': support,
|
|
'confirmation': '价格跌破支撑位' if latest['close'] < support else '未确认',
|
|
}
|
|
|
|
# Check if approaching key level
|
|
if resistance and abs(current_price - resistance) / resistance < 0.005: # Within 0.5%
|
|
return {
|
|
'has_breakout': False,
|
|
'approaching': 'resistance',
|
|
'level': resistance,
|
|
'distance_pct': round((resistance - current_price) / current_price * 100, 2),
|
|
}
|
|
|
|
if support and abs(current_price - support) / support < 0.005:
|
|
return {
|
|
'has_breakout': False,
|
|
'approaching': 'support',
|
|
'level': support,
|
|
'distance_pct': round((current_price - support) / current_price * 100, 2),
|
|
}
|
|
|
|
return {'has_breakout': False}
|
|
|
|
@staticmethod
|
|
def calculate_momentum(df: pd.DataFrame) -> Dict[str, Any]:
|
|
"""
|
|
Calculate momentum indicators
|
|
|
|
Returns:
|
|
Dict with momentum analysis
|
|
"""
|
|
if df.empty:
|
|
return {}
|
|
|
|
latest = df.iloc[-1]
|
|
prev = df.iloc[-2] if len(df) > 1 else latest
|
|
|
|
rsi = latest.get('rsi', 50)
|
|
macd_hist = latest.get('macd_hist', 0)
|
|
prev_macd_hist = prev.get('macd_hist', 0)
|
|
|
|
# RSI status
|
|
if rsi > 70:
|
|
rsi_status = '超买'
|
|
elif rsi > 60:
|
|
rsi_status = '强势'
|
|
elif rsi > 50:
|
|
rsi_status = '中性偏强'
|
|
elif rsi > 40:
|
|
rsi_status = '中性偏弱'
|
|
elif rsi > 30:
|
|
rsi_status = '弱势'
|
|
else:
|
|
rsi_status = '超卖'
|
|
|
|
# MACD signal
|
|
if macd_hist > 0 and prev_macd_hist <= 0:
|
|
macd_signal = '金叉'
|
|
elif macd_hist < 0 and prev_macd_hist >= 0:
|
|
macd_signal = '死叉'
|
|
elif macd_hist > 0:
|
|
if macd_hist > prev_macd_hist:
|
|
macd_signal = '金叉扩大'
|
|
else:
|
|
macd_signal = '金叉收窄'
|
|
else:
|
|
if abs(macd_hist) > abs(prev_macd_hist):
|
|
macd_signal = '死叉扩大'
|
|
else:
|
|
macd_signal = '死叉收窄'
|
|
|
|
# RSI trend
|
|
if len(df) >= 5:
|
|
rsi_5_ago = df.iloc[-5].get('rsi', 50)
|
|
rsi_trend = '上升中' if rsi > rsi_5_ago else '下降中'
|
|
else:
|
|
rsi_trend = '中性'
|
|
|
|
return {
|
|
'rsi': round(rsi, 1),
|
|
'rsi_status': rsi_status,
|
|
'rsi_trend': rsi_trend,
|
|
'macd_signal': macd_signal,
|
|
'macd_hist': round(macd_hist, 4),
|
|
}
|