644 lines
21 KiB
Python
644 lines
21 KiB
Python
"""
|
|
Fibonacci Support/Resistance Calculator
|
|
Calculates key price levels using Fibonacci retracement and extension
|
|
"""
|
|
import logging
|
|
from typing import Dict, Any, List, Tuple, Optional
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from .config import config
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FibonacciAnalyzer:
    """
    Calculates support and resistance levels using:

    1. Fibonacci retracement levels
    2. Fibonacci extension levels
    3. Pivot points (High/Low)
    4. Price clustering zones

    Every method is a ``@staticmethod``; the class is used as a namespace and
    holds no instance state.
    """

    # Standard Fibonacci ratios
    FIB_RETRACEMENT = [0.0, 0.236, 0.382, 0.5, 0.618, 0.786, 1.0]
    FIB_EXTENSION = [1.0, 1.272, 1.414, 1.618, 2.0, 2.618]

    # Key levels for trading
    KEY_FIB_LEVELS = [0.382, 0.5, 0.618]  # Most important levels

    # (minimum score, English label, Chinese label), strongest band first.
    # Scores below the last floor are labelled 'weak' / '弱'.
    _STRENGTH_BANDS = (
        (0.8, 'very_strong', '非常强'),
        (0.5, 'strong', '强'),
        (0.3, 'moderate', '中等'),
    )

    @staticmethod
    def calculate_fibonacci_levels(
        df: pd.DataFrame,
        lookback_periods: int = 100,
        timeframe: str = '1h'
    ) -> Dict[str, Any]:
        """
        Calculate Fibonacci retracement and extension levels for one timeframe.

        Args:
            df: DataFrame with at least 'high', 'low' and 'close' columns.
            lookback_periods: Number of trailing rows searched for the swing
                high/low. Fewer rows than this yields the empty result.
            timeframe: Label copied into the result.

        Returns:
            Dict with the swing points, Fibonacci levels, clustered
            supports/resistances (nearest to the current price first, at most
            five each), nearest levels with distances, key zones and the
            pivot. When the data is insufficient the dict from
            ``_empty_result`` (which carries an 'error' key) is returned.
        """
        if df.empty or len(df) < lookback_periods:
            return FibonacciAnalyzer._empty_result(timeframe)

        lookback_df = df.tail(lookback_periods)
        current_price = float(df.iloc[-1]['close'])

        # Find significant swing high and low
        swing_high, swing_high_idx = FibonacciAnalyzer._find_swing_high(lookback_df)
        swing_low, swing_low_idx = FibonacciAnalyzer._find_swing_low(lookback_df)
        if swing_high is None or swing_low is None:
            return FibonacciAnalyzer._empty_result(timeframe)

        # The move is an uptrend when the low was printed before the high.
        is_uptrend = swing_low_idx < swing_high_idx
        trend = 'uptrend' if is_uptrend else 'downtrend'

        # Retracement always takes (high, low); the trend label alone decides
        # whether levels are measured down from the high or up from the low.
        retracement_levels = FibonacciAnalyzer._calculate_retracement(
            swing_high, swing_low, trend
        )
        # Extension takes (start, end) of the move and projects beyond `end`.
        if is_uptrend:
            extension_levels = FibonacciAnalyzer._calculate_extension(
                swing_low, swing_high, trend
            )
        else:
            extension_levels = FibonacciAnalyzer._calculate_extension(
                swing_high, swing_low, trend
            )

        # Identify key support/resistance from Fibonacci
        supports, resistances = FibonacciAnalyzer._identify_sr_from_fib(
            retracement_levels, extension_levels, current_price
        )

        # Pivot levels are computed without reference to the current price,
        # so keep only those on the matching side before clustering;
        # otherwise a cluster average could straddle the price.
        pivot_levels = FibonacciAnalyzer._calculate_pivot_points(lookback_df)
        pivot_supports = [
            lvl for lvl in pivot_levels.get('supports', []) if lvl < current_price
        ]
        pivot_resistances = [
            lvl for lvl in pivot_levels.get('resistances', []) if lvl > current_price
        ]

        # Merge, cluster and order nearest-first so the [:5] slices below keep
        # the levels closest to the current price.
        all_supports = sorted(
            FibonacciAnalyzer._merge_and_cluster_levels(
                supports + pivot_supports, current_price
            ),
            reverse=True,
        )
        all_resistances = sorted(
            FibonacciAnalyzer._merge_and_cluster_levels(
                resistances + pivot_resistances, current_price
            )
        )

        nearest_support = FibonacciAnalyzer._find_nearest_below(all_supports, current_price)
        nearest_resistance = FibonacciAnalyzer._find_nearest_above(all_resistances, current_price)

        key_zones = FibonacciAnalyzer._identify_key_zones(
            retracement_levels, current_price, is_uptrend
        )

        swing_range = swing_high - swing_low
        return {
            'timeframe': timeframe,
            'current_price': round(current_price, 2),
            'trend_context': trend,
            'swing_high': round(swing_high, 2),
            'swing_low': round(swing_low, 2),
            'swing_range': round(swing_range, 2),
            # Undefined (None) when the swing low is 0.
            'swing_range_pct': (
                round(swing_range / swing_low * 100, 2) if swing_low else None
            ),

            # Fibonacci levels
            'fibonacci': {
                'retracement': {
                    str(ratio): round(level, 2)
                    for ratio, level in retracement_levels.items()
                },
                'extension': {
                    str(ratio): round(level, 2)
                    for ratio, level in extension_levels.items()
                },
            },

            # Support/Resistance (five nearest of each)
            'supports': [round(s, 2) for s in all_supports[:5]],
            'resistances': [round(r, 2) for r in all_resistances[:5]],
            'nearest_support': (
                round(nearest_support, 2) if nearest_support is not None else None
            ),
            'nearest_resistance': (
                round(nearest_resistance, 2) if nearest_resistance is not None else None
            ),

            # Distance to key levels
            'distance_to_support': (
                FibonacciAnalyzer._calculate_distance(current_price, nearest_support)
                if nearest_support is not None else None
            ),
            'distance_to_resistance': (
                FibonacciAnalyzer._calculate_distance(current_price, nearest_resistance)
                if nearest_resistance is not None else None
            ),

            # Key zones for trading
            'key_zones': key_zones,

            # Pivot points
            'pivot': pivot_levels.get('pivot'),
        }

    @staticmethod
    def analyze_multi_timeframe_levels(
        mtf_data: Dict[str, pd.DataFrame]
    ) -> Dict[str, Any]:
        """
        Analyze support/resistance across multiple timeframes.

        Args:
            mtf_data: Dict mapping timeframe label to its DataFrame.

        Returns:
            Dict with the per-timeframe results, the confluence zones (top
            five supports and resistances), the strongest zone on each side,
            the current price and a text summary. Returns
            ``{'error': 'No data available'}`` when no timeframe produced a
            usable result.
        """
        results: Dict[str, Any] = {}
        all_supports: List[Tuple[float, float, str]] = []
        all_resistances: List[Tuple[float, float, str]] = []
        current_price: Optional[float] = None

        for tf, df in mtf_data.items():
            if df is None or df.empty:
                continue

            # Adjust lookback based on timeframe
            lookback = FibonacciAnalyzer._get_lookback_for_timeframe(tf)
            tf_result = FibonacciAnalyzer.calculate_fibonacci_levels(
                df, lookback_periods=lookback, timeframe=tf
            )
            results[tf] = tf_result

            # An 'insufficient_data' result carries a placeholder price of 0
            # and no levels; it must not be used as the reference price.
            if 'error' in tf_result:
                continue

            if current_price is None:
                current_price = tf_result['current_price']

            # Collect levels with timeframe weight
            weight = FibonacciAnalyzer._get_timeframe_weight(tf)
            all_supports.extend((s, weight, tf) for s in tf_result['supports'])
            all_resistances.extend((r, weight, tf) for r in tf_result['resistances'])

        if current_price is None:
            return {'error': 'No data available'}

        # Find confluence zones (levels appearing in multiple timeframes)
        support_confluence = FibonacciAnalyzer._find_confluence_zones(
            all_supports, current_price, direction='below'
        )
        resistance_confluence = FibonacciAnalyzer._find_confluence_zones(
            all_resistances, current_price, direction='above'
        )

        # Generate LLM-readable summary
        summary = FibonacciAnalyzer._generate_sr_summary(
            support_confluence, resistance_confluence, current_price
        )

        return {
            'timeframes': results,
            'confluence': {
                'supports': support_confluence[:5],
                'resistances': resistance_confluence[:5],
            },
            'strongest_support': support_confluence[0] if support_confluence else None,
            'strongest_resistance': resistance_confluence[0] if resistance_confluence else None,
            'current_price': round(current_price, 2),
            'summary': summary,
        }

    @staticmethod
    def _find_swing_high(df: pd.DataFrame) -> Tuple[Optional[float], Optional[int]]:
        """
        Find the highest swing high in the 'high' column.

        A swing high is a bar whose high is strictly above the two bars on
        each side. If none exists, the absolute maximum is used instead.

        Returns:
            (price, positional index within df), or (None, None) when df has
            fewer than five rows.
        """
        highs = df['high'].values
        if len(highs) < 5:
            return None, None

        candidates = [
            (highs[i], i)
            for i in range(2, len(highs) - 2)
            if all(highs[i] > highs[j] for j in (i - 2, i - 1, i + 1, i + 2))
        ]
        if not candidates:
            # Fall back to absolute max
            peak_idx = np.argmax(highs)
            return float(highs[peak_idx]), int(peak_idx)

        # max() returns the earliest candidate among equal highs.
        price, idx = max(candidates, key=lambda c: c[0])
        return float(price), int(idx)

    @staticmethod
    def _find_swing_low(df: pd.DataFrame) -> Tuple[Optional[float], Optional[int]]:
        """
        Find the lowest swing low in the 'low' column.

        A swing low is a bar whose low is strictly below the two bars on each
        side. If none exists, the absolute minimum is used instead.

        Returns:
            (price, positional index within df), or (None, None) when df has
            fewer than five rows.
        """
        lows = df['low'].values
        if len(lows) < 5:
            return None, None

        candidates = [
            (lows[i], i)
            for i in range(2, len(lows) - 2)
            if all(lows[i] < lows[j] for j in (i - 2, i - 1, i + 1, i + 2))
        ]
        if not candidates:
            # Fall back to absolute min
            trough_idx = np.argmin(lows)
            return float(lows[trough_idx]), int(trough_idx)

        # min() returns the earliest candidate among equal lows.
        price, idx = min(candidates, key=lambda c: c[0])
        return float(price), int(idx)

    @staticmethod
    def _calculate_retracement(
        high: float,
        low: float,
        trend: str
    ) -> Dict[float, float]:
        """
        Calculate Fibonacci retracement levels between a swing high and low.

        Args:
            high: Swing high price.
            low: Swing low price.
            trend: 'uptrend' measures levels down from the high; any other
                value measures them up from the low.

        Returns:
            Dict mapping each ratio in FIB_RETRACEMENT to its price level.
        """
        # Tolerate swapped bounds so the range is never negative.
        high, low = max(high, low), min(high, low)
        span = high - low

        if trend == 'uptrend':
            # In uptrend, retracement down from high
            return {
                ratio: high - span * ratio
                for ratio in FibonacciAnalyzer.FIB_RETRACEMENT
            }
        # In downtrend, retracement up from low
        return {
            ratio: low + span * ratio
            for ratio in FibonacciAnalyzer.FIB_RETRACEMENT
        }

    @staticmethod
    def _calculate_extension(
        start: float,
        end: float,
        trend: str
    ) -> Dict[float, float]:
        """
        Calculate Fibonacci extension levels projected beyond `end`.

        Args:
            start: Price where the move began.
            end: Price where the move ended.
            trend: 'uptrend' projects above `end`; any other value projects
                below it.

        Returns:
            Dict mapping each ratio in FIB_EXTENSION to its price level
            (ratio 1.0 maps to `end` itself).
        """
        span = abs(end - start)
        sign = 1 if trend == 'uptrend' else -1
        return {
            ratio: end + sign * (span * (ratio - 1))
            for ratio in FibonacciAnalyzer.FIB_EXTENSION
        }

    @staticmethod
    def _identify_sr_from_fib(
        retracement: Dict[float, float],
        extension: Dict[float, float],
        current_price: float
    ) -> Tuple[List[float], List[float]]:
        """
        Split Fibonacci levels into supports and resistances.

        Levels strictly below the price are supports, strictly above are
        resistances; a level equal to the price is dropped.

        Returns:
            (supports sorted descending, resistances sorted ascending), both
            de-duplicated.
        """
        levels = list(retracement.values()) + list(extension.values())
        supports = sorted({lvl for lvl in levels if lvl < current_price}, reverse=True)
        resistances = sorted({lvl for lvl in levels if lvl > current_price})
        return supports, resistances

    @staticmethod
    def _calculate_pivot_points(df: pd.DataFrame) -> Dict[str, Any]:
        """
        Calculate pivot points (Classic formula) from the second-to-last row.

        Returns:
            Dict with 'pivot', 'supports' [S1, S2, S3] and 'resistances'
            [R1, R2, R3], all rounded to two decimals. With fewer than two
            rows the pivot is None and both lists are empty.
        """
        if len(df) < 2:
            return {'pivot': None, 'supports': [], 'resistances': []}

        # Use previous period's data
        prev = df.iloc[-2]
        high = float(prev['high'])
        low = float(prev['low'])
        close = float(prev['close'])

        # Classic pivot point
        pivot = (high + low + close) / 3
        bar_range = high - low

        supports = [
            2 * pivot - high,            # S1
            pivot - bar_range,           # S2
            low - 2 * (high - pivot),    # S3
        ]
        resistances = [
            2 * pivot - low,             # R1
            pivot + bar_range,           # R2
            high + 2 * (pivot - low),    # R3
        ]

        return {
            'pivot': round(pivot, 2),
            'supports': [round(s, 2) for s in supports],
            'resistances': [round(r, 2) for r in resistances],
        }

    @staticmethod
    def _merge_and_cluster_levels(
        levels: List[float],
        reference: float,
        tolerance_pct: float = 0.005
    ) -> List[float]:
        """
        Merge levels that lie close together into their average.

        Levels are sorted ascending and chained: a level joins the current
        cluster when it is within ``reference * tolerance_pct`` of the
        previous level in that cluster.

        Returns:
            Cluster averages in ascending order (empty list for no input).
        """
        if not levels:
            return []

        tolerance = reference * tolerance_pct
        ordered = sorted(levels)

        clusters = [[ordered[0]]]
        for level in ordered[1:]:
            if abs(level - clusters[-1][-1]) < tolerance:
                clusters[-1].append(level)
            else:
                clusters.append([level])

        # float() keeps numpy scalars out of the returned list.
        return [float(np.mean(cluster)) for cluster in clusters]

    @staticmethod
    def _find_nearest_below(levels: List[float], price: float) -> Optional[float]:
        """Return the highest level strictly below `price`, or None."""
        return max((lvl for lvl in levels if lvl < price), default=None)

    @staticmethod
    def _find_nearest_above(levels: List[float], price: float) -> Optional[float]:
        """Return the lowest level strictly above `price`, or None."""
        return min((lvl for lvl in levels if lvl > price), default=None)

    @staticmethod
    def _calculate_distance(
        current: float,
        target: Optional[float]
    ) -> Optional[Dict[str, Any]]:
        """
        Calculate the distance from `current` to `target`.

        Returns:
            Dict with 'absolute' (unsigned), 'percentage' (signed, relative
            to `current`) and 'direction' ('above' or 'below'), or None when
            `target` is None or `current` is 0.
        """
        if target is None or current == 0:
            return None

        diff = target - current
        return {
            'absolute': round(abs(diff), 2),
            'percentage': round((diff / current) * 100, 2),
            'direction': 'above' if diff > 0 else 'below',
        }

    @staticmethod
    def _identify_key_zones(
        retracement: Dict[float, float],
        current_price: float,
        is_uptrend: bool
    ) -> List[Dict[str, Any]]:
        """
        Describe the 38.2%, 50% and 61.8% retracement levels as trading zones.

        Args:
            retracement: Ratio -> price mapping from `_calculate_retracement`.
            current_price: Price the zones are measured against.
            is_uptrend: Accepted for interface compatibility; not used.

        Returns:
            One dict per key ratio present in `retracement`, with its level,
            description, signed distance in percent, type
            ('support'/'resistance') and importance rating.
        """
        # ratio -> (description, importance, importance_cn)
        key_levels = {
            0.382: ('浅回调区 (38.2%)', 'low', '一般'),
            0.5: ('中度回调区 (50%)', 'medium', '中等'),
            0.618: ('黄金分割区 (61.8%)', 'high', '重要'),
        }

        zones = []
        for ratio, (description, importance, importance_cn) in key_levels.items():
            if ratio not in retracement:
                continue

            level = retracement[ratio]
            distance_pct = ((level - current_price) / current_price) * 100
            zones.append({
                'ratio': ratio,
                'level': round(level, 2),
                'description': description,
                'distance_pct': round(distance_pct, 2),
                'type': 'support' if level < current_price else 'resistance',
                'importance': importance,
                'importance_cn': importance_cn,
            })

        return zones

    @staticmethod
    def _find_confluence_zones(
        levels_with_weight: List[Tuple[float, float, str]],
        current_price: float,
        direction: str = 'below'
    ) -> List[Dict[str, Any]]:
        """
        Find confluence zones where multiple timeframes have similar levels.

        Args:
            levels_with_weight: List of (level, weight, timeframe).
            current_price: Current price.
            direction: 'below' keeps levels under the price (supports); any
                other value keeps levels above it (resistances).

        Returns:
            List of zone dicts sorted by confluence score, strongest first;
            zones with equal scores are ordered nearest to the price first.
        """
        if not levels_with_weight:
            return []

        # Filter by direction
        if direction == 'below':
            filtered = [item for item in levels_with_weight if item[0] < current_price]
        else:
            filtered = [item for item in levels_with_weight if item[0] > current_price]
        if not filtered:
            return []

        # Cluster levels with tolerance
        tolerance = current_price * 0.005  # 0.5%
        ordered = sorted(filtered, key=lambda item: item[0])

        clusters = [[ordered[0]]]
        for item in ordered[1:]:
            if abs(item[0] - clusters[-1][-1][0]) < tolerance:
                clusters[-1].append(item)
            else:
                clusters.append([item])

        # Calculate confluence strength for each cluster
        confluence_zones = []
        for cluster in clusters:
            avg_level = np.mean([item[0] for item in cluster])
            total_weight = sum(item[1] for item in cluster)
            # Sorted so the output does not depend on set iteration order.
            timeframes = sorted({item[2] for item in cluster})
            num_timeframes = len(timeframes)

            # Confluence score based on weight and number of timeframes
            confluence_score = total_weight * (1 + 0.2 * num_timeframes)
            distance_pct = ((avg_level - current_price) / current_price) * 100

            confluence_zones.append({
                'level': round(float(avg_level), 2),
                'confluence_score': round(confluence_score, 2),
                'num_timeframes': num_timeframes,
                'timeframes': timeframes,
                'distance_pct': round(float(distance_pct), 2),
                'strength': FibonacciAnalyzer._score_to_strength(confluence_score),
                'strength_cn': FibonacciAnalyzer._score_to_strength_cn(confluence_score),
            })

        # Higher score first; ties go to the zone nearest the price.
        confluence_zones.sort(
            key=lambda zone: (-zone['confluence_score'], abs(zone['distance_pct']))
        )
        return confluence_zones

    @staticmethod
    def _score_to_strength(score: float) -> str:
        """Convert confluence score to strength label."""
        for floor, label, _ in FibonacciAnalyzer._STRENGTH_BANDS:
            if score >= floor:
                return label
        return 'weak'

    @staticmethod
    def _score_to_strength_cn(score: float) -> str:
        """Convert confluence score to Chinese strength label."""
        for floor, _, label_cn in FibonacciAnalyzer._STRENGTH_BANDS:
            if score >= floor:
                return label_cn
        return '弱'

    @staticmethod
    def _get_lookback_for_timeframe(tf: str) -> int:
        """Get the lookback period count for a timeframe (default 100)."""
        lookbacks = {
            '5m': 200,
            '15m': 150,
            '1h': 100,
            '4h': 80,
            '1d': 60,
            '1w': 40,
        }
        return lookbacks.get(tf, 100)

    @staticmethod
    def _get_timeframe_weight(tf: str) -> float:
        """Get the weight for a timeframe (longer = more weight, default 0.1)."""
        weights = {
            '5m': 0.05,
            '15m': 0.10,
            '1h': 0.15,
            '4h': 0.20,
            '1d': 0.25,
            '1w': 0.25,
        }
        return weights.get(tf, 0.1)

    @staticmethod
    def _generate_sr_summary(
        supports: List[Dict],
        resistances: List[Dict],
        current_price: float
    ) -> str:
        """
        Generate an LLM-readable (Chinese) summary of support/resistance.

        Only the first zone of each list is described. `current_price` is
        accepted for interface compatibility; not used.

        Returns:
            The parts joined by "; ", or a fixed fallback sentence when both
            lists are empty.
        """
        def describe(label: str, zone: Dict) -> str:
            return (
                f"{label}: ${zone['level']:,.0f} ({zone['distance_pct']:.1f}%, "
                f"{zone['num_timeframes']}个周期确认, {zone['strength_cn']})"
            )

        parts = []

        # Strongest support
        if supports:
            parts.append(describe("最强支撑", supports[0]))

        # Strongest resistance
        if resistances:
            parts.append(describe("最强压力", resistances[0]))

        # Price position
        if supports and resistances:
            s_dist = abs(supports[0]['distance_pct'])
            r_dist = abs(resistances[0]['distance_pct'])
            parts.append("价格更接近支撑位" if s_dist < r_dist else "价格更接近压力位")

        return "; ".join(parts) if parts else "无明确支撑压力位"

    @staticmethod
    def _empty_result(timeframe: str) -> Dict[str, Any]:
        """Return the placeholder result used when data is insufficient."""
        return {
            'timeframe': timeframe,
            'error': 'insufficient_data',
            'current_price': 0,
            'supports': [],
            'resistances': [],
            'nearest_support': None,
            'nearest_resistance': None,
            'fibonacci': {
                'retracement': {},
                'extension': {},
            },
            'key_zones': [],
        }