stock-ai-agent/backend/app/crypto_agent/signal_analyzer.py
2026-02-21 19:36:58 +08:00

1405 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
信号分析器 - 多周期技术分析和 LLM 深度分析
"""
import pandas as pd
from typing import Dict, Any, Optional, List
from app.utils.logger import logger
from app.services.llm_service import llm_service
class SignalAnalyzer:
"""交易信号分析器 - 波段交易优化版"""
# LLM 系统提示词 - 波段交易版
CRYPTO_ANALYST_PROMPT = """你是一位经验丰富的加密货币波段交易员,专注于捕捉 1-7 天的中等波段行情。
## 交易风格
- **波段交易**:持仓 1-7 天,不做超短线
- **顺势回调**:在趋势中寻找回调入场机会
- **风险控制**:单笔亏损不超过本金 2%
## 多周期分析框架
1. **4H 周期**:判断主趋势方向和强度
- 趋势明确:价格在 MA20 同侧运行 3 根以上 K 线
- 趋势强度:看 MACD 柱状图是否放大
2. **1H 周期**:确认趋势 + 寻找回调位置
- 上涨趋势中:等待回调到 MA20 或前低支撑
- 下跌趋势中:等待反弹到 MA20 或前高阻力
3. **15M 周期**:入场信号确认
- 做多RSI 从超卖回升 + MACD 金叉 + K 线企稳
- 做空RSI 从超买回落 + MACD 死叉 + K 线见顶
## 入场条件(波段做多)
1. 4H 趋势向上(价格 > MA20MACD > 0 或底背离)
2. 1H 回调到支撑位MA20 附近或前低)
3. 15M 出现止跌信号RSI < 40 回升,或 MACD 金叉)
4. 止损明确(前低下方),风险收益比 >= 1:2
## 入场条件(波段做空)
1. 4H 趋势向下(价格 < MA20MACD < 0 或顶背离)
2. 1H 反弹到阻力位MA20 附近或前高)
3. 15M 出现见顶信号RSI > 60 回落,或 MACD 死叉)
4. 止损明确(前高上方),风险收益比 >= 1:2
## 特殊情况处理
- **极度超卖RSI < 20**:不追空,等待反弹做多机会
- **极度超买RSI > 80**:不追多,等待回调做空机会
- **震荡市**:观望,等待突破方向
## 输出格式JSON
```json
{
"market_structure": {
"trend": "uptrend/downtrend/sideways",
"strength": "strong/moderate/weak",
"phase": "impulse/correction/reversal"
},
"key_levels": {
"resistance": [阻力位1, 阻力位2],
"support": [支撑位1, 支撑位2]
},
"signal": {
"quality": "A/B/C/D",
"action": "buy/sell/wait",
"confidence": 0-100,
"entry_zone": [入场区间下限, 入场区间上限],
"stop_loss": 止损价,
"targets": [目标1, 目标2],
"reason": "入场理由"
},
"risk_warning": "风险提示"
}
```
信号质量说明:
- A级趋势明确 + 回调到位 + 多重信号共振(置信度 80+
- B级趋势明确 + 信号较好(置信度 60-80
- C级有机会但需要更多确认置信度 40-60
- D级不建议交易置信度 < 40
重要:波段交易要有耐心,宁可错过也不要在不理想的位置入场。"""
def __init__(self):
"""初始化信号分析器"""
logger.info("信号分析器初始化完成")
# ==================== K线形态识别 ====================
def _detect_candlestick_patterns(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
识别 K 线形态
Args:
df: K线数据至少需要3根K线
Returns:
{
'bullish_patterns': [...], # 看涨形态
'bearish_patterns': [...], # 看跌形态
'pattern_weight': float # 形态权重
}
"""
if len(df) < 3:
return {'bullish_patterns': [], 'bearish_patterns': [], 'pattern_weight': 0}
bullish = []
bearish = []
weight = 0
# 获取最近3根K线
curr = df.iloc[-1]
prev = df.iloc[-2]
prev2 = df.iloc[-3]
curr_body = curr['close'] - curr['open']
curr_body_abs = abs(curr_body)
curr_range = curr['high'] - curr['low']
prev_body = prev['close'] - prev['open']
prev_body_abs = abs(prev_body)
# 避免除零
if curr_range == 0:
curr_range = 0.0001
# === 锤子线 / 倒锤子线 ===
upper_shadow = curr['high'] - max(curr['open'], curr['close'])
lower_shadow = min(curr['open'], curr['close']) - curr['low']
# 锤子线:下影线长,实体小,出现在下跌后
if lower_shadow > curr_body_abs * 2 and upper_shadow < curr_body_abs * 0.5:
if prev_body < 0: # 前一根是阴线
bullish.append("锤子线")
weight += 1.5
# 倒锤子线:上影线长,实体小,出现在下跌后
if upper_shadow > curr_body_abs * 2 and lower_shadow < curr_body_abs * 0.5:
if prev_body < 0:
bullish.append("倒锤子线")
weight += 1
# 上吊线:锤子线形态但出现在上涨后
if lower_shadow > curr_body_abs * 2 and upper_shadow < curr_body_abs * 0.5:
if prev_body > 0:
bearish.append("上吊线")
weight -= 1.5
# === 吞没形态 ===
# 看涨吞没:阳线实体完全包住前一根阴线
if curr_body > 0 and prev_body < 0:
if curr['open'] <= prev['close'] and curr['close'] >= prev['open']:
if curr_body_abs > prev_body_abs * 1.2:
bullish.append("看涨吞没")
weight += 2
# 看跌吞没:阴线实体完全包住前一根阳线
if curr_body < 0 and prev_body > 0:
if curr['open'] >= prev['close'] and curr['close'] <= prev['open']:
if curr_body_abs > prev_body_abs * 1.2:
bearish.append("看跌吞没")
weight -= 2
# === 十字星 ===
if curr_body_abs < curr_range * 0.1: # 实体很小
if upper_shadow > curr_range * 0.3 and lower_shadow > curr_range * 0.3:
# 十字星本身是中性的需要结合前一根K线判断
if prev_body > 0:
bearish.append("十字星(上涨后)")
weight -= 1
elif prev_body < 0:
bullish.append("十字星(下跌后)")
weight += 1
# === 早晨之星 / 黄昏之星 (3根K线形态) ===
prev2_body = prev2['close'] - prev2['open']
prev_range = prev['high'] - prev['low'] if prev['high'] != prev['low'] else 0.0001
# 早晨之星:大阴线 + 小实体(星) + 大阳线
if prev2_body < 0 and abs(prev2_body) > prev_range * 0.5: # 第一根大阴线
if abs(prev_body) < prev_range * 0.3: # 第二根小实体
if curr_body > 0 and curr_body_abs > curr_range * 0.5: # 第三根大阳线
if curr['close'] > (prev2['open'] + prev2['close']) / 2:
bullish.append("早晨之星")
weight += 2.5
# 黄昏之星:大阳线 + 小实体(星) + 大阴线
if prev2_body > 0 and prev2_body > prev_range * 0.5:
if abs(prev_body) < prev_range * 0.3:
if curr_body < 0 and curr_body_abs > curr_range * 0.5:
if curr['close'] < (prev2['open'] + prev2['close']) / 2:
bearish.append("黄昏之星")
weight -= 2.5
return {
'bullish_patterns': bullish,
'bearish_patterns': bearish,
'pattern_weight': weight
}
# ==================== 支撑阻力位计算 ====================
def _calculate_support_resistance(self, df: pd.DataFrame, current_price: float) -> Dict[str, Any]:
"""
计算支撑位和阻力位
Args:
df: K线数据建议使用1H或4H数据
current_price: 当前价格
Returns:
{
'supports': [支撑位1, 支撑位2],
'resistances': [阻力位1, 阻力位2],
'nearest_support': float,
'nearest_resistance': float,
'at_support': bool,
'at_resistance': bool
}
"""
if len(df) < 20:
return {
'supports': [], 'resistances': [],
'nearest_support': 0, 'nearest_resistance': 0,
'at_support': False, 'at_resistance': False
}
# 方法1使用近期高低点
highs = df['high'].tail(50).values
lows = df['low'].tail(50).values
# 找局部高点和低点
local_highs = []
local_lows = []
for i in range(2, len(highs) - 2):
# 局部高点
if highs[i] > highs[i-1] and highs[i] > highs[i-2] and \
highs[i] > highs[i+1] and highs[i] > highs[i+2]:
local_highs.append(highs[i])
# 局部低点
if lows[i] < lows[i-1] and lows[i] < lows[i-2] and \
lows[i] < lows[i+1] and lows[i] < lows[i+2]:
local_lows.append(lows[i])
# 方法2使用均线作为动态支撑阻力
ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns and pd.notna(df['ma20'].iloc[-1]) else 0
ma50 = df['ma50'].iloc[-1] if 'ma50' in df.columns and pd.notna(df['ma50'].iloc[-1]) else 0
# 方法3布林带
bb_upper = df['bb_upper'].iloc[-1] if 'bb_upper' in df.columns and pd.notna(df['bb_upper'].iloc[-1]) else 0
bb_lower = df['bb_lower'].iloc[-1] if 'bb_lower' in df.columns and pd.notna(df['bb_lower'].iloc[-1]) else 0
# 合并所有支撑位(低于当前价格)
all_supports = []
if local_lows:
all_supports.extend([l for l in local_lows if l < current_price])
if ma20 and ma20 < current_price:
all_supports.append(ma20)
if ma50 and ma50 < current_price:
all_supports.append(ma50)
if bb_lower and bb_lower < current_price:
all_supports.append(bb_lower)
# 合并所有阻力位(高于当前价格)
all_resistances = []
if local_highs:
all_resistances.extend([h for h in local_highs if h > current_price])
if ma20 and ma20 > current_price:
all_resistances.append(ma20)
if ma50 and ma50 > current_price:
all_resistances.append(ma50)
if bb_upper and bb_upper > current_price:
all_resistances.append(bb_upper)
# 排序并去重(合并相近的价位)
supports = sorted(set(all_supports), reverse=True)[:3] # 最近的3个支撑
resistances = sorted(set(all_resistances))[:3] # 最近的3个阻力
# 找最近的支撑和阻力
nearest_support = supports[0] if supports else 0
nearest_resistance = resistances[0] if resistances else 0
# 判断是否在支撑/阻力位附近1%范围内)
at_support = nearest_support > 0 and abs(current_price - nearest_support) / current_price < 0.01
at_resistance = nearest_resistance > 0 and abs(current_price - nearest_resistance) / current_price < 0.01
return {
'supports': supports,
'resistances': resistances,
'nearest_support': nearest_support,
'nearest_resistance': nearest_resistance,
'at_support': at_support,
'at_resistance': at_resistance
}
# ==================== 成交量分析 ====================
def _analyze_volume(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
分析成交量
Args:
df: K线数据需要包含 volume, volume_ma20, volume_ratio
Returns:
{
'volume_signal': 'high' | 'normal' | 'low',
'volume_trend': 'increasing' | 'decreasing' | 'stable',
'volume_confirms': bool, # 成交量是否确认价格走势
'volume_weight': float
}
"""
if len(df) < 5 or 'volume' not in df.columns:
return {
'volume_signal': 'normal',
'volume_trend': 'stable',
'volume_confirms': False,
'volume_weight': 0
}
latest = df.iloc[-1]
prev = df.iloc[-2]
# 量比判断
volume_ratio = latest.get('volume_ratio', 1)
if pd.isna(volume_ratio):
volume_ratio = 1
if volume_ratio > 2:
volume_signal = 'high'
elif volume_ratio < 0.5:
volume_signal = 'low'
else:
volume_signal = 'normal'
# 成交量趋势最近5根K线
recent_volumes = df['volume'].tail(5)
if len(recent_volumes) >= 5:
first_half = recent_volumes.iloc[:2].mean()
second_half = recent_volumes.iloc[-2:].mean()
if second_half > first_half * 1.3:
volume_trend = 'increasing'
elif second_half < first_half * 0.7:
volume_trend = 'decreasing'
else:
volume_trend = 'stable'
else:
volume_trend = 'stable'
# 判断成交量是否确认价格走势
price_up = latest['close'] > prev['close']
volume_up = latest['volume'] > prev['volume']
# 价涨量增 或 价跌量缩 = 确认
volume_confirms = (price_up and volume_up) or (not price_up and not volume_up)
# 计算权重
weight = 0
if volume_signal == 'high' and volume_confirms:
weight = 1.5
elif volume_signal == 'high' and not volume_confirms:
weight = -0.5 # 放量但不确认,可能是假突破
elif volume_signal == 'low':
weight = -0.5 # 缩量,信号可靠性降低
return {
'volume_signal': volume_signal,
'volume_trend': volume_trend,
'volume_confirms': volume_confirms,
'volume_weight': weight
}
# ==================== 短线信号分析 ====================
def _analyze_short_term_signal(self, m5_data: pd.DataFrame, m15_data: pd.DataFrame,
trend_direction: str) -> Dict[str, Any]:
"""
分析短线交易信号(超跌反弹/超涨回落)
短线信号特点:
- 不依赖大趋势方向,主要看短周期超买超卖
- 快进快出,持仓时间短
- 置信度上限较低,建议轻仓
Args:
m5_data: 5分钟K线数据
m15_data: 15分钟K线数据
trend_direction: 大趋势方向(用于顺势加分)
Returns:
{
'action': 'buy' | 'sell' | 'hold',
'confidence': 0-100,
'reasons': [...],
'signal_type': 'short_term'
}
"""
if len(m5_data) < 5 or len(m15_data) < 5:
return {'action': 'hold', 'confidence': 0, 'reasons': [], 'signal_type': 'short_term'}
m5_latest = m5_data.iloc[-1]
m5_prev = m5_data.iloc[-2]
m15_latest = m15_data.iloc[-1]
m15_prev = m15_data.iloc[-2]
buy_score = 0
sell_score = 0
buy_reasons = []
sell_reasons = []
# === 15M RSI 超卖/超买 ===
m15_rsi = m15_latest.get('rsi', 50)
if pd.notna(m15_rsi):
if m15_rsi < 25:
buy_reasons.append(f"15M RSI极度超卖({m15_rsi:.1f})")
buy_score += 3
elif m15_rsi < 35:
buy_reasons.append(f"15M RSI超卖({m15_rsi:.1f})")
buy_score += 2
elif m15_rsi > 75:
sell_reasons.append(f"15M RSI极度超买({m15_rsi:.1f})")
sell_score += 3
elif m15_rsi > 65:
sell_reasons.append(f"15M RSI超买({m15_rsi:.1f})")
sell_score += 2
# === 5M RSI 反转确认 ===
m5_rsi = m5_latest.get('rsi', 50)
m5_prev_rsi = m5_prev.get('rsi', 50)
if pd.notna(m5_rsi) and pd.notna(m5_prev_rsi):
# 超卖后回升
if m5_rsi < 40 and m5_rsi > m5_prev_rsi and m5_prev_rsi < 35:
buy_reasons.append(f"5M RSI反转回升({m5_prev_rsi:.1f}{m5_rsi:.1f})")
buy_score += 2
# 超买后回落
if m5_rsi > 60 and m5_rsi < m5_prev_rsi and m5_prev_rsi > 65:
sell_reasons.append(f"5M RSI反转回落({m5_prev_rsi:.1f}{m5_rsi:.1f})")
sell_score += 2
# === 布林带触轨 ===
if 'bb_lower' in m15_latest and pd.notna(m15_latest['bb_lower']):
if m15_latest['close'] <= m15_latest['bb_lower']:
buy_reasons.append("15M触及布林下轨")
buy_score += 2
elif m15_latest['close'] >= m15_latest['bb_upper']:
sell_reasons.append("15M触及布林上轨")
sell_score += 2
# === KDJ 超卖/超买 ===
m15_k = m15_latest.get('k', 50)
m15_d = m15_latest.get('d', 50)
if pd.notna(m15_k) and pd.notna(m15_d):
if m15_k < 20 and m15_d < 20:
buy_reasons.append(f"15M KDJ超卖区(K={m15_k:.1f})")
buy_score += 1.5
elif m15_k > 80 and m15_d > 80:
sell_reasons.append(f"15M KDJ超买区(K={m15_k:.1f})")
sell_score += 1.5
# === 5M K线反转形态 ===
if m5_latest['close'] > m5_latest['open'] and m5_prev['close'] < m5_prev['open']:
# 阴转阳
if m5_rsi < 40:
buy_reasons.append("5M阴转阳反转")
buy_score += 1.5
elif m5_latest['close'] < m5_latest['open'] and m5_prev['close'] > m5_prev['open']:
# 阳转阴
if m5_rsi > 60:
sell_reasons.append("5M阳转阴反转")
sell_score += 1.5
# === 5M MACD 金叉/死叉 ===
m5_macd = m5_latest.get('macd', 0)
m5_macd_signal = m5_latest.get('macd_signal', 0)
m5_prev_macd = m5_prev.get('macd', 0)
m5_prev_macd_signal = m5_prev.get('macd_signal', 0)
if pd.notna(m5_macd) and pd.notna(m5_prev_macd):
if m5_prev_macd <= m5_prev_macd_signal and m5_macd > m5_macd_signal:
buy_reasons.append("5M MACD金叉")
buy_score += 1.5
elif m5_prev_macd >= m5_prev_macd_signal and m5_macd < m5_macd_signal:
sell_reasons.append("5M MACD死叉")
sell_score += 1.5
# === 顺势加分 ===
if trend_direction == 'bullish' and buy_score > 0:
buy_score += 1
buy_reasons.append("顺大势做多")
elif trend_direction == 'bearish' and sell_score > 0:
sell_score += 1
sell_reasons.append("顺大势做空")
# === 决策 ===
action = 'hold'
confidence = 0
reasons = []
# 短线信号阈值较低,但置信度上限也低
if buy_score >= 4 and buy_score > sell_score:
action = 'buy'
confidence = min(35 + buy_score * 6, 65) # 短线最高65%
reasons = buy_reasons + ["📈 短线超跌反弹"]
elif sell_score >= 4 and sell_score > buy_score:
action = 'sell'
confidence = min(35 + sell_score * 6, 65)
reasons = sell_reasons + ["📉 短线超涨回落"]
return {
'action': action,
'confidence': confidence,
'reasons': reasons,
'signal_type': 'short_term',
'scores': {'buy': buy_score, 'sell': sell_score}
}
# ==================== 5M 精确入场 ====================
def _analyze_5m_entry(self, m5_data: pd.DataFrame, action: str) -> Dict[str, Any]:
"""
使用 5M 数据寻找精确入场点
Args:
m5_data: 5分钟K线数据
action: 'buy''sell'
Returns:
{
'entry_confirmed': bool,
'entry_reasons': [...],
'entry_weight': float
}
"""
if len(m5_data) < 5:
return {'entry_confirmed': False, 'entry_reasons': [], 'entry_weight': 0}
latest = m5_data.iloc[-1]
prev = m5_data.iloc[-2]
reasons = []
weight = 0
# 获取指标
rsi = latest.get('rsi', 50)
prev_rsi = prev.get('rsi', 50)
macd = latest.get('macd', 0)
macd_signal = latest.get('macd_signal', 0)
prev_macd = prev.get('macd', 0)
prev_macd_signal = prev.get('macd_signal', 0)
if action == 'buy':
# 5M RSI 从超卖回升
if pd.notna(rsi) and pd.notna(prev_rsi):
if rsi < 40 and rsi > prev_rsi:
reasons.append("5M RSI回升")
weight += 1
if rsi < 30:
reasons.append("5M RSI超卖")
weight += 0.5
# 5M MACD 金叉
if pd.notna(macd) and pd.notna(prev_macd):
if prev_macd <= prev_macd_signal and macd > macd_signal:
reasons.append("5M MACD金叉")
weight += 1.5
# 5M K线企稳阳线
if latest['close'] > latest['open']:
if prev['close'] < prev['open']: # 前一根是阴线
reasons.append("5M阳线反转")
weight += 1
elif action == 'sell':
# 5M RSI 从超买回落
if pd.notna(rsi) and pd.notna(prev_rsi):
if rsi > 60 and rsi < prev_rsi:
reasons.append("5M RSI回落")
weight += 1
if rsi > 70:
reasons.append("5M RSI超买")
weight += 0.5
# 5M MACD 死叉
if pd.notna(macd) and pd.notna(prev_macd):
if prev_macd >= prev_macd_signal and macd < macd_signal:
reasons.append("5M MACD死叉")
weight += 1.5
# 5M K线见顶阴线
if latest['close'] < latest['open']:
if prev['close'] > prev['open']:
reasons.append("5M阴线反转")
weight += 1
entry_confirmed = weight >= 2
return {
'entry_confirmed': entry_confirmed,
'entry_reasons': reasons,
'entry_weight': weight
}
def analyze_trend(self, h1_data: pd.DataFrame, h4_data: pd.DataFrame) -> Dict[str, Any]:
"""
分析趋势方向和强度(波段交易优化版)
Args:
h1_data: 1小时K线数据含技术指标
h4_data: 4小时K线数据含技术指标
Returns:
{
'direction': 'bullish' | 'bearish' | 'neutral',
'strength': 'strong' | 'moderate' | 'weak',
'phase': 'impulse' | 'correction' | 'reversal',
'h4_score': float,
'h1_score': float
}
"""
if h1_data.empty or h4_data.empty:
return {
'direction': 'neutral',
'strength': 'weak',
'phase': 'sideways',
'h4_score': 0,
'h1_score': 0
}
# 获取最新数据
h1_latest = h1_data.iloc[-1]
h4_latest = h4_data.iloc[-1]
# 计算各周期的趋势得分
h4_score, h4_details = self._calculate_trend_score(h4_latest)
h1_score, h1_details = self._calculate_trend_score(h1_latest)
# 判断趋势方向4H 为主)
if h4_score > 0.3:
direction = 'bullish'
elif h4_score < -0.3:
direction = 'bearish'
else:
direction = 'neutral'
# 判断趋势强度
strength = self._assess_trend_strength(h4_data, h1_data)
# 判断当前阶段(是主升/主跌还是回调)
phase = self._detect_market_phase(h4_data, h1_data, direction)
# 检查极端情况
h4_rsi = h4_latest.get('rsi', 50)
extreme_warning = ""
if pd.notna(h4_rsi):
if h4_rsi < 20:
extreme_warning = f" [RSI={h4_rsi:.1f} 极度超卖]"
phase = 'oversold'
elif h4_rsi > 80:
extreme_warning = f" [RSI={h4_rsi:.1f} 极度超买]"
phase = 'overbought'
logger.info(f"趋势分析: 方向={direction}, 强度={strength}, 阶段={phase} | "
f"4H={h4_score:.2f}{h4_details}, 1H={h1_score:.2f}{h1_details}{extreme_warning}")
return {
'direction': direction,
'strength': strength,
'phase': phase,
'h4_score': h4_score,
'h1_score': h1_score
}
def _assess_trend_strength(self, h4_data: pd.DataFrame, h1_data: pd.DataFrame) -> str:
"""评估趋势强度"""
if len(h4_data) < 5:
return 'weak'
h4_latest = h4_data.iloc[-1]
# 检查 MACD 柱状图是否放大
macd_hist = h4_data['macd_hist'].tail(5)
macd_expanding = False
if len(macd_hist) >= 3:
recent_abs = abs(macd_hist.iloc[-1])
prev_abs = abs(macd_hist.iloc[-3])
if recent_abs > prev_abs * 1.2:
macd_expanding = True
# 检查价格是否持续在 MA20 同侧
close_prices = h4_data['close'].tail(5)
ma20_values = h4_data['ma20'].tail(5)
consistent_side = True
if pd.notna(ma20_values.iloc[-1]):
above_ma = close_prices > ma20_values
consistent_side = above_ma.all() or (~above_ma).all()
# 检查 RSI 是否在趋势区间
rsi = h4_latest.get('rsi', 50)
rsi_trending = 40 < rsi < 60 # 中性区间表示趋势不强
if macd_expanding and consistent_side and not rsi_trending:
return 'strong'
elif consistent_side:
return 'moderate'
else:
return 'weak'
def _detect_market_phase(self, h4_data: pd.DataFrame, h1_data: pd.DataFrame,
direction: str) -> str:
"""检测市场阶段(主升/主跌 vs 回调)"""
if len(h1_data) < 10:
return 'unknown'
h1_latest = h1_data.iloc[-1]
h4_latest = h4_data.iloc[-1]
# 获取 1H 的短期趋势
h1_ma5 = h1_latest.get('ma5', 0)
h1_ma20 = h1_latest.get('ma20', 0)
h1_close = h1_latest.get('close', 0)
if not (pd.notna(h1_ma5) and pd.notna(h1_ma20)):
return 'unknown'
# 判断 1H 是否在回调
if direction == 'bullish':
# 上涨趋势中1H 价格回落到 MA20 附近 = 回调
if h1_close < h1_ma5 and h1_close > h1_ma20 * 0.98:
return 'correction' # 回调中,可能是入场机会
elif h1_close > h1_ma5:
return 'impulse' # 主升浪
elif direction == 'bearish':
# 下跌趋势中1H 价格反弹到 MA20 附近 = 反弹
if h1_close > h1_ma5 and h1_close < h1_ma20 * 1.02:
return 'correction' # 反弹中,可能是做空机会
elif h1_close < h1_ma5:
return 'impulse' # 主跌浪
return 'sideways'
def _calculate_trend_score(self, data: pd.Series) -> tuple:
"""
计算单周期趋势得分
Args:
data: 包含技术指标的数据行
Returns:
(得分, 详情字符串)
"""
score = 0.0
count = 0
details = []
# 价格与均线关系
if 'close' in data and 'ma20' in data and pd.notna(data['ma20']):
if data['close'] > data['ma20']:
score += 1
details.append("价格>MA20")
else:
score -= 1
details.append("价格<MA20")
count += 1
# MA5 与 MA20 关系
if 'ma5' in data and 'ma20' in data and pd.notna(data['ma5']) and pd.notna(data['ma20']):
if data['ma5'] > data['ma20']:
score += 1
details.append("MA5>MA20")
else:
score -= 1
details.append("MA5<MA20")
count += 1
# MACD
if 'macd' in data and 'macd_signal' in data and pd.notna(data['macd']):
if data['macd'] > data['macd_signal']:
score += 1
details.append("MACD多")
else:
score -= 1
details.append("MACD空")
count += 1
# RSI
if 'rsi' in data and pd.notna(data['rsi']):
if data['rsi'] > 50:
score += 0.5
else:
score -= 0.5
count += 0.5
final_score = score / count if count > 0 else 0
detail_str = f"({','.join(details)})" if details else ""
return final_score, detail_str
def analyze_entry_signal(self, m5_data: pd.DataFrame, m15_data: pd.DataFrame,
trend: Dict[str, Any], h1_data: pd.DataFrame = None) -> Dict[str, Any]:
"""
分析 15M 进场信号(波段交易优化版 - 增强版)
新增功能:
- 成交量确认
- K线形态识别
- 支撑阻力位判断
- 5M精确入场
Args:
m5_data: 5分钟K线数据用于精确入场
m15_data: 15分钟K线数据主要入场周期
trend: 趋势分析结果
h1_data: 1小时K线数据用于支撑阻力位计算可选
Returns:
{
'action': 'buy' | 'sell' | 'hold',
'confidence': 0-100,
'signal_grade': 'A' | 'B' | 'C' | 'D',
'reasons': [...],
'indicators': {...},
'patterns': {...},
'levels': {...},
'volume_analysis': {...}
}
"""
if m5_data.empty or m15_data.empty:
return {'action': 'hold', 'confidence': 0, 'signal_grade': 'D',
'reasons': ['数据不足'], 'indicators': {}}
# 兼容旧格式(如果 trend 是字符串)
if isinstance(trend, str):
trend_direction = trend
trend_phase = 'unknown'
trend_strength = 'moderate'
else:
trend_direction = trend.get('direction', 'neutral')
trend_phase = trend.get('phase', 'unknown')
trend_strength = trend.get('strength', 'moderate')
m15_latest = m15_data.iloc[-1]
current_price = float(m15_latest['close'])
# 收集信号
buy_signals = []
sell_signals = []
signal_weights = {'buy': 0, 'sell': 0}
# ==================== 1. 传统技术指标信号 ====================
# === RSI 信号 ===
if 'rsi' in m15_latest and pd.notna(m15_latest['rsi']):
rsi = m15_latest['rsi']
if rsi < 30:
buy_signals.append(f"RSI超卖({rsi:.1f})")
signal_weights['buy'] += 2
elif rsi < 40 and len(m15_data) >= 2:
prev_rsi = m15_data.iloc[-2].get('rsi', 50)
if pd.notna(prev_rsi) and rsi > prev_rsi:
buy_signals.append(f"RSI回升({prev_rsi:.1f}{rsi:.1f})")
signal_weights['buy'] += 1.5
elif rsi > 70:
sell_signals.append(f"RSI超买({rsi:.1f})")
signal_weights['sell'] += 2
elif rsi > 60 and len(m15_data) >= 2:
prev_rsi = m15_data.iloc[-2].get('rsi', 50)
if pd.notna(prev_rsi) and rsi < prev_rsi:
sell_signals.append(f"RSI回落({prev_rsi:.1f}{rsi:.1f})")
signal_weights['sell'] += 1.5
# === MACD 信号 ===
if len(m15_data) >= 2:
prev = m15_data.iloc[-2]
if 'macd' in m15_latest and 'macd_signal' in m15_latest:
if pd.notna(m15_latest['macd']) and pd.notna(prev['macd']):
if prev['macd'] <= prev['macd_signal'] and m15_latest['macd'] > m15_latest['macd_signal']:
buy_signals.append("MACD金叉")
signal_weights['buy'] += 2
elif prev['macd'] >= prev['macd_signal'] and m15_latest['macd'] < m15_latest['macd_signal']:
sell_signals.append("MACD死叉")
signal_weights['sell'] += 2
elif abs(m15_latest['macd_hist']) < abs(prev['macd_hist']) * 0.7:
if m15_latest['macd_hist'] > 0:
sell_signals.append("MACD动能减弱")
signal_weights['sell'] += 0.5
else:
buy_signals.append("MACD动能减弱")
signal_weights['buy'] += 0.5
# === 布林带信号 ===
if 'close' in m15_latest and 'bb_lower' in m15_latest and 'bb_upper' in m15_latest:
if pd.notna(m15_latest['bb_lower']) and pd.notna(m15_latest['bb_upper']):
bb_middle = m15_latest.get('bb_middle', (m15_latest['bb_upper'] + m15_latest['bb_lower']) / 2)
if m15_latest['close'] < m15_latest['bb_lower']:
buy_signals.append("触及布林下轨")
signal_weights['buy'] += 1.5
elif m15_latest['close'] > m15_latest['bb_upper']:
sell_signals.append("触及布林上轨")
signal_weights['sell'] += 1.5
elif len(m15_data) >= 2:
prev_close = m15_data.iloc[-2]['close']
if prev_close < bb_middle and m15_latest['close'] > bb_middle:
buy_signals.append("突破布林中轨")
signal_weights['buy'] += 1
elif prev_close > bb_middle and m15_latest['close'] < bb_middle:
sell_signals.append("跌破布林中轨")
signal_weights['sell'] += 1
# === KDJ 信号 ===
if 'k' in m15_latest and 'd' in m15_latest and len(m15_data) >= 2:
prev = m15_data.iloc[-2]
if pd.notna(m15_latest['k']) and pd.notna(prev['k']):
if prev['k'] <= prev['d'] and m15_latest['k'] > m15_latest['d']:
if m15_latest['k'] < 30:
buy_signals.append("KDJ低位金叉")
signal_weights['buy'] += 1.5
else:
buy_signals.append("KDJ金叉")
signal_weights['buy'] += 0.5
elif prev['k'] >= prev['d'] and m15_latest['k'] < m15_latest['d']:
if m15_latest['k'] > 70:
sell_signals.append("KDJ高位死叉")
signal_weights['sell'] += 1.5
else:
sell_signals.append("KDJ死叉")
signal_weights['sell'] += 0.5
# ==================== 2. K线形态识别 ====================
patterns = self._detect_candlestick_patterns(m15_data)
if patterns['bullish_patterns']:
buy_signals.extend(patterns['bullish_patterns'])
signal_weights['buy'] += patterns['pattern_weight']
if patterns['bearish_patterns']:
sell_signals.extend(patterns['bearish_patterns'])
signal_weights['sell'] += abs(patterns['pattern_weight'])
# ==================== 3. 成交量分析 ====================
volume_analysis = self._analyze_volume(m15_data)
if volume_analysis['volume_signal'] == 'high':
if volume_analysis['volume_confirms']:
# 放量确认
if signal_weights['buy'] > signal_weights['sell']:
buy_signals.append(f"放量确认(量比{m15_latest.get('volume_ratio', 1):.1f})")
signal_weights['buy'] += volume_analysis['volume_weight']
else:
sell_signals.append(f"放量确认(量比{m15_latest.get('volume_ratio', 1):.1f})")
signal_weights['sell'] += volume_analysis['volume_weight']
else:
# 放量不确认,可能是假信号
if signal_weights['buy'] > signal_weights['sell']:
buy_signals.append("放量但不确认(警惕)")
signal_weights['buy'] += volume_analysis['volume_weight'] # 负权重
else:
sell_signals.append("放量但不确认(警惕)")
signal_weights['sell'] += volume_analysis['volume_weight']
# ==================== 4. 支撑阻力位分析 ====================
levels = {}
if h1_data is not None and not h1_data.empty:
levels = self._calculate_support_resistance(h1_data, current_price)
if levels.get('at_support') and trend_direction == 'bullish':
buy_signals.append(f"触及支撑位({levels['nearest_support']:.2f})")
signal_weights['buy'] += 1.5
if levels.get('at_resistance') and trend_direction == 'bearish':
sell_signals.append(f"触及阻力位({levels['nearest_resistance']:.2f})")
signal_weights['sell'] += 1.5
# ==================== 5. 根据趋势和阶段决定动作(中长线信号)====================
action = 'hold'
confidence = 0
reasons = []
signal_grade = 'D'
signal_type = 'swing' # 默认波段信号
# 波段交易核心逻辑:在回调中寻找入场机会
if trend_direction == 'bullish':
if trend_phase == 'correction' and signal_weights['buy'] >= 3:
action = 'buy'
confidence = min(40 + signal_weights['buy'] * 10, 95)
reasons = buy_signals + [f"📊 波段信号: 上涨趋势回调({trend_strength})"]
signal_grade = 'A' if confidence >= 80 else ('B' if confidence >= 60 else 'C')
elif trend_phase == 'impulse' and signal_weights['buy'] >= 4:
action = 'buy'
confidence = min(30 + signal_weights['buy'] * 8, 80)
reasons = buy_signals + ["📊 波段信号: 主升浪追多"]
signal_grade = 'B' if confidence >= 60 else 'C'
elif trend_phase == 'oversold' and signal_weights['buy'] >= 3:
# 极度超卖时允许抄底,但降低置信度并提示风险
action = 'buy'
confidence = min(30 + signal_weights['buy'] * 8, 70) # 最高70%
reasons = buy_signals + ["📊 波段信号: 极度超卖抄底(高风险)", "建议轻仓试探"]
signal_grade = 'C' # 最高C级
elif trend_phase == 'overbought':
reasons = ['极度超买,不宜追多']
elif trend_direction == 'bearish':
if trend_phase == 'correction' and signal_weights['sell'] >= 3:
action = 'sell'
confidence = min(40 + signal_weights['sell'] * 10, 95)
reasons = sell_signals + [f"📊 波段信号: 下跌趋势反弹({trend_strength})"]
signal_grade = 'A' if confidence >= 80 else ('B' if confidence >= 60 else 'C')
elif trend_phase == 'impulse' and signal_weights['sell'] >= 4:
action = 'sell'
confidence = min(30 + signal_weights['sell'] * 8, 80)
reasons = sell_signals + ["📊 波段信号: 主跌浪追空"]
signal_grade = 'B' if confidence >= 60 else 'C'
elif trend_phase == 'overbought' and signal_weights['sell'] >= 3:
# 极度超买时允许做空,但降低置信度并提示风险
action = 'sell'
confidence = min(30 + signal_weights['sell'] * 8, 70) # 最高70%
reasons = sell_signals + ["📊 波段信号: 极度超买摸顶(高风险)", "建议轻仓试探"]
signal_grade = 'C' # 最高C级
elif trend_phase == 'oversold':
reasons = ['极度超卖,不宜追空']
# ==================== 5.5 短线信号检测(中长线无信号时)====================
# 如果中长线没有触发信号,检查短线超跌反弹/超涨回落机会
if action == 'hold':
short_term = self._analyze_short_term_signal(m5_data, m15_data, trend_direction)
if short_term['action'] != 'hold' and short_term['confidence'] >= 50:
action = short_term['action']
confidence = short_term['confidence']
reasons = short_term['reasons']
signal_type = 'short_term'
signal_grade = 'C' # 短线信号最高C级
# 如果还是没有信号
if action == 'hold' and trend_direction == 'neutral':
reasons = ['趋势不明确,观望']
# ==================== 6. 5M 精确入场确认 ====================
if action != 'hold' and not m5_data.empty:
entry_5m = self._analyze_5m_entry(m5_data, action)
if entry_5m['entry_confirmed']:
confidence = min(confidence + 10, 95)
reasons.extend(entry_5m['entry_reasons'])
if signal_grade == 'B':
signal_grade = 'A'
elif signal_grade == 'C':
signal_grade = 'B'
elif entry_5m['entry_weight'] < 1:
# 5M 没有确认,降低置信度
confidence = max(confidence - 10, 0)
reasons.append("5M未确认入场")
if not reasons:
reasons = ['信号不足,继续观望']
# 收集指标数据
indicators = {}
for col in ['rsi', 'macd', 'macd_signal', 'macd_hist', 'k', 'd', 'j', 'close', 'ma20', 'volume_ratio']:
if col in m15_latest and pd.notna(m15_latest[col]):
indicators[col] = float(m15_latest[col])
# 记录详细日志(简化版,详细日志在 crypto_agent 中输出)
if action != 'hold':
type_text = "短线" if signal_type == 'short_term' else "波段"
logger.debug(f"信号详情: [{type_text}] {action} {confidence}% {signal_grade} | 买权重={signal_weights['buy']:.1f} 卖权重={signal_weights['sell']:.1f}")
return {
'action': action,
'confidence': confidence,
'signal_grade': signal_grade,
'signal_type': signal_type, # 'swing' 波段 | 'short_term' 短线
'reasons': reasons,
'indicators': indicators,
'trend_info': {
'direction': trend_direction,
'phase': trend_phase,
'strength': trend_strength
},
'patterns': patterns,
'volume_analysis': volume_analysis,
'levels': levels,
'signal_weights': signal_weights
}
async def llm_analyze(self, data: Dict[str, pd.DataFrame], signal: Dict[str, Any],
symbol: str) -> Dict[str, Any]:
"""
使用 LLM 进行深度分析
Args:
data: 多周期K线数据
signal: 初步信号分析结果
symbol: 交易对
Returns:
LLM 分析结果(结构化数据)
"""
try:
# 构建分析提示
prompt = self._build_analysis_prompt(data, signal, symbol)
# 调用 LLM使用异步方法避免阻塞事件循环
response = await llm_service.achat([
{"role": "system", "content": self.CRYPTO_ANALYST_PROMPT},
{"role": "user", "content": prompt}
])
if response:
logger.info(f"LLM 分析完成: {symbol}")
# 解析 JSON 响应
return self._parse_llm_response(response)
else:
return {"error": "LLM 分析暂时不可用", "raw": ""}
except Exception as e:
logger.error(f"LLM 分析失败: {e}")
return {"error": str(e), "raw": ""}
def _parse_llm_response(self, response: str) -> Dict[str, Any]:
"""
解析 LLM 的 JSON 响应
Args:
response: LLM 原始响应
Returns:
解析后的结构化数据
"""
import json
import re
result = {
"raw": response,
"parsed": None,
"summary": ""
}
try:
# 尝试提取 JSON 块
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
if json_match:
json_str = json_match.group(1)
else:
# 尝试直接解析整个响应
json_str = response
# 解析 JSON
parsed = json.loads(json_str)
result["parsed"] = parsed
# 生成摘要
if parsed:
recommendation = parsed.get("recommendation", {})
action = recommendation.get("action", "wait")
confidence = recommendation.get("confidence", 0)
reason = recommendation.get("reason", "")
if action == "wait":
result["summary"] = f"建议观望。{parsed.get('risk_warning', '')}"
else:
action_text = "做多" if action == "buy" else "做空"
result["summary"] = f"建议{action_text},置信度{confidence}%。{reason}"
except json.JSONDecodeError:
# JSON 解析失败,提取关键信息
logger.warning("LLM 响应不是有效 JSON尝试提取关键信息")
result["summary"] = self._extract_summary_from_text(response)
return result
def _extract_summary_from_text(self, text: str) -> str:
"""从非 JSON 文本中提取摘要"""
# 简单提取前 200 字符作为摘要
text = text.strip()
if len(text) > 200:
return text[:200] + "..."
return text
def _build_analysis_prompt(self, data: Dict[str, pd.DataFrame], signal: Dict[str, Any],
symbol: str) -> str:
"""构建 LLM 分析提示 - 优化版"""
parts = [f"# {symbol} 技术分析数据\n"]
# 当前价格
current_price = float(data['5m'].iloc[-1]['close'])
parts.append(f"**当前价格**: ${current_price:,.2f}\n")
# 添加各周期指标摘要
for interval in ['4h', '1h', '15m']:
df = data.get(interval)
if df is None or df.empty:
continue
latest = df.iloc[-1]
parts.append(f"\n## {interval.upper()} 周期指标")
# 价格与均线关系
close = latest.get('close', 0)
ma20 = latest.get('ma20', 0)
ma50 = latest.get('ma50', 0)
if pd.notna(ma20):
position = "上方" if close > ma20 else "下方"
parts.append(f"- 价格在 MA20 {position} (MA20={ma20:.2f})")
if pd.notna(ma50):
parts.append(f"- MA50: {ma50:.2f}")
# RSI
rsi = latest.get('rsi', 0)
if pd.notna(rsi):
rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性")
parts.append(f"- RSI: {rsi:.1f} ({rsi_status})")
# MACD
macd = latest.get('macd', 0)
macd_signal = latest.get('macd_signal', 0)
if pd.notna(macd) and pd.notna(macd_signal):
macd_status = "多头" if macd > macd_signal else "空头"
parts.append(f"- MACD: {macd:.2f}, Signal: {macd_signal:.2f} ({macd_status})")
# 布林带
bb_upper = latest.get('bb_upper', 0)
bb_lower = latest.get('bb_lower', 0)
if pd.notna(bb_upper) and pd.notna(bb_lower):
parts.append(f"- 布林带: 上轨={bb_upper:.2f}, 下轨={bb_lower:.2f}")
# 添加最近 5 根 15M K 线(让 LLM 看形态)
parts.append("\n## 最近 5 根 15M K线")
parts.append("| 时间 | 开盘 | 最高 | 最低 | 收盘 | 涨跌 |")
parts.append("|------|------|------|------|------|------|")
df_15m = data.get('15m')
if df_15m is not None and len(df_15m) >= 5:
for i in range(-5, 0):
row = df_15m.iloc[i]
change = ((row['close'] - row['open']) / row['open']) * 100
change_str = f"+{change:.2f}%" if change >= 0 else f"{change:.2f}%"
time_str = row['open_time'].strftime('%H:%M') if pd.notna(row['open_time']) else 'N/A'
parts.append(f"| {time_str} | {row['open']:.2f} | {row['high']:.2f} | {row['low']:.2f} | {row['close']:.2f} | {change_str} |")
# 计算关键价位
parts.append("\n## 关键价位参考")
df_1h = data.get('1h')
if df_1h is not None and len(df_1h) >= 20:
recent_high = df_1h['high'].tail(20).max()
recent_low = df_1h['low'].tail(20).min()
parts.append(f"- 近期高点: ${recent_high:,.2f}")
parts.append(f"- 近期低点: ${recent_low:,.2f}")
# 初步信号分析结果
parts.append(f"\n## 规则引擎初步判断")
parts.append(f"- 趋势: {signal.get('trend', 'unknown')}")
parts.append(f"- 信号: {signal.get('action', 'hold')}")
parts.append(f"- 置信度: {signal.get('confidence', 0)}%")
parts.append(f"- 触发原因: {', '.join(signal.get('reasons', []))}")
parts.append("\n---")
parts.append("请基于以上数据进行分析,严格按照 JSON 格式输出你的判断。")
return "\n".join(parts)
def calculate_stop_loss_take_profit(self, price: float, action: str,
atr: float, df: pd.DataFrame = None) -> Dict[str, float]:
"""
计算止损止盈位置 - 结构化止损 + 移动止盈策略
策略:
- 止损:基于关键支撑/阻力位(前高前低、均线)
- 止盈:不设固定止盈,通过移动止损锁定利润
Args:
price: 当前价格
action: 'buy''sell'
atr: ATR 值
df: K线数据用于计算关键价位
Returns:
{'stop_loss': float, 'take_profit': float, 'method': str}
"""
if df is not None and len(df) >= 20:
# 使用结构化止损
result = self._calculate_structured_stops(price, action, atr, df)
if result['stop_loss'] > 0:
return result
# 回退到 ATR 止损
if action == 'buy':
stop_loss = price - atr * 1.5 # 降低到 1.5 倍 ATR
# 止盈设置得很远,主要靠移动止损
take_profit = price * 1.15 # 15% 作为一个"保险"止盈位
method = 'atr'
elif action == 'sell':
stop_loss = price + atr * 1.5
# 止盈设置得很远,主要靠移动止损
take_profit = price * 0.85 # 15% 作为一个"保险"止盈位
method = 'atr'
else:
return {'stop_loss': 0, 'take_profit': 0, 'method': 'none'}
return {
'stop_loss': round(stop_loss, 2),
'take_profit': round(take_profit, 2),
'method': method
}
def _calculate_structured_stops(self, price: float, action: str,
atr: float, df: pd.DataFrame) -> Dict[str, float]:
"""
基于关键价位计算结构化止损(无固定止盈,靠移动止损)
原则:
- 做多止损在前低/MA20下方
- 做空止损在前高/MA20上方
- 止损距离控制在 1.5-3% ATR 范围内
- 止盈设置得很远,主要靠移动止损锁定利润
"""
stop_loss = 0
take_profit = 0
# 获取关键价位
recent_data = df.tail(50)
if action == 'buy':
# === 做多止损 ===
# 1. 查找近期支撑位(前低)
recent_lows = recent_data['low'].values
# 找出低于当前价的低点
valid_lows = [low for low in recent_lows if low < price * 0.99]
if valid_lows:
nearest_low = max(valid_lows) # 最近的低点
# 止损放在前低下方 0.3%
sl_candidate = nearest_low * 0.997
else:
sl_candidate = 0
# 2. 检查 MA20 作为支撑
ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else 0
if pd.notna(ma20) and ma20 < price * 0.98:
sl_ma20 = ma20 * 0.995 # MA20 下方 0.5%
# 选择更保守的止损(更高的止损位)
if sl_candidate > 0:
sl_candidate = max(sl_candidate, sl_ma20)
else:
sl_candidate = sl_ma20
# 3. 验证止损距离合理性1.5%-3% ATR
if sl_candidate > 0:
sl_distance = (price - sl_candidate) / price * 100
atr_percent = atr / price * 100
# 如果止损距离太小(< 1%ATR使用 ATR 止损
if sl_distance < atr_percent * 1.0:
sl_candidate = price - atr * 1.2
# 如果止损距离过大(> 4%ATR限制在合理范围
elif sl_distance > atr_percent * 4:
sl_candidate = price - atr * 2.5
# 4. 回退到 ATR 止损
if sl_candidate <= 0:
sl_candidate = price - atr * 1.5
stop_loss = sl_candidate
# === 做多止盈(设置得很远,主要靠移动止损)===
take_profit = price * 1.15 # 15% 作为保险止盈位
elif action == 'sell':
# === 做空止损 ===
# 1. 查找近期阻力位(前高)
recent_highs = recent_data['high'].values
# 找出高于当前价的高点
valid_highs = [high for high in recent_highs if high > price * 1.01]
if valid_highs:
nearest_high = min(valid_highs) # 最近的高点
# 止损放在前高上方 0.3%
sl_candidate = nearest_high * 1.003
else:
sl_candidate = 0
# 2. 检查 MA20 作为阻力
ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else 0
if pd.notna(ma20) and ma20 > price * 1.02:
sl_ma20 = ma20 * 1.005 # MA20 上方 0.5%
# 选择更保守的止损(更低的止损位)
if sl_candidate > 0:
sl_candidate = min(sl_candidate, sl_ma20)
else:
sl_candidate = sl_ma20
# 3. 验证止损距离合理性
if sl_candidate > 0:
sl_distance = (sl_candidate - price) / price * 100
atr_percent = atr / price * 100
# 如果止损距离太小(< 1%ATR使用 ATR 止损
if sl_distance < atr_percent * 1.0:
sl_candidate = price + atr * 1.2
# 如果止损距离过大(> 4%ATR限制在合理范围
elif sl_distance > atr_percent * 4:
sl_candidate = price + atr * 2.5
# 4. 回退到 ATR 止损
if sl_candidate <= 0:
sl_candidate = price + atr * 1.5
stop_loss = sl_candidate
# === 做空止盈(设置得很远,主要靠移动止损)===
take_profit = price * 0.85 # 15% 作为保险止盈位
return {
'stop_loss': round(stop_loss, 2),
'take_profit': round(take_profit, 2),
'method': 'structured'
}