import pandas as pd import numpy as np import talib from typing import Dict, List, Tuple, Optional, NamedTuple import logging from dataclasses import dataclass from datetime import datetime, timedelta # 配置日志输出到控制台 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('technical_analysis.log') ], force=True ) @dataclass class CoinSignal: symbol: str qualified_factors: int # 替换score为符合因子数量 reason: str entry_price: float stop_loss: float take_profit: float timeframe: str confidence: str strategy_type: str holding_period: int risk_reward_ratio: float expiry_hours: int action_suggestion: str signal_type: str = "LONG" direction: str = "BUY" class PatternSignal(NamedTuple): """技术形态信号""" pattern_type: str strength: float direction: str # "BULLISH" or "BEARISH" confidence: float description: str class VolumeAnalysis(NamedTuple): """量价分析结果""" volume_trend: str # "INCREASING", "DECREASING", "STABLE" price_volume_divergence: bool volume_breakout: bool volume_strength: float description: str class TechnicalAnalyzer: def __init__(self): self.logger = logging.getLogger(__name__) # 策略配置 - 专注于短期突破机会 self.strategies = { "突破策略": { "timeframes": ["15m", "1h", "4h"], "holding_period": 1, # 1-3天 "expiry_hours": 24, "min_score": 75, "risk_reward": 2.0 }, "量价策略": { "timeframes": ["5m", "15m", "1h"], "holding_period": 1, "expiry_hours": 12, "min_score": 70, "risk_reward": 1.5 }, "形态策略": { "timeframes": ["1h", "4h"], "holding_period": 2, "expiry_hours": 36, "min_score": 80, "risk_reward": 2.5 } } def analyze_single_coin(self, symbol: str, timeframe_data: Dict[str, pd.DataFrame], volume_24h_usd: float) -> List[CoinSignal]: """分析单个币种 - 针对每个时间框架单独分析""" try: signals = [] # 基本过滤:交易量要求 if volume_24h_usd < 5000000: # 500万USDT最低要求 return signals # 针对每个时间框架单独进行分析 target_timeframes = ["15m", "1h", "4h", "1d"] for timeframe in target_timeframes: if timeframe not in timeframe_data or len(timeframe_data[timeframe]) < 30: continue try: # 为当前时间框架进行单独的因子分析 factor_scores = self._analyze_single_timeframe_factors( timeframe_data, timeframe, symbol ) # 如果当前时间框架符合条件(6选5制度) if factor_scores['qualified_factors'] >= 5: signal = self._generate_signal_for_timeframe( symbol, timeframe_data, factor_scores, volume_24h_usd, timeframe ) if signal: signals.append(signal) self.logger.info(f"{symbol} 在 {timeframe} 时间框架发现信号: {factor_scores['qualified_factors']}/6 因子") except Exception as e: self.logger.error(f"分析{symbol}的{timeframe}时间框架时出错: {e}") continue return signals except Exception as e: self.logger.error(f"分析{symbol}时出错: {e}") return [] def _analyze_single_timeframe_factors(self, timeframe_data: Dict[str, pd.DataFrame], target_timeframe: str, symbol: str) -> Dict: """针对单个时间框架进行因子分析""" factors = { 'trend_alignment': False, # 趋势一致性 'technical_opportunity': False, # 技术指标机会 'momentum_confirmation': False, # 动量确认 'support_resistance': False, # 支撑阻力 'market_structure': False, # 市场结构 'liquidity_flow': False, # 流动性资金流 'signals': [], 'direction': 'NEUTRAL', 'primary_timeframe': target_timeframe, 'qualified_factors': 0 } try: # 使用目标时间框架的数据进行分析 df = timeframe_data[target_timeframe] # 1. 趋势一致性判断(基于当前时间框架) trend_result = self._check_single_timeframe_trend(df, target_timeframe) factors['trend_alignment'] = trend_result['qualified'] factors['signals'].extend(trend_result['signals']) primary_direction = trend_result['direction'] # 2. 技术指标机会判断 technical_result = self._check_single_timeframe_technical(df, target_timeframe) factors['technical_opportunity'] = technical_result['qualified'] factors['signals'].extend(technical_result['signals']) # 3. 动量确认判断 momentum_result = self._check_single_timeframe_momentum(df, target_timeframe) factors['momentum_confirmation'] = momentum_result['qualified'] factors['signals'].extend(momentum_result['signals']) # 4. 支撑阻力判断 sr_result = self._check_single_timeframe_support_resistance(df, target_timeframe) factors['support_resistance'] = sr_result['qualified'] factors['signals'].extend(sr_result['signals']) # 5. 市场结构判断 market_result = self._check_single_timeframe_market_structure(df, target_timeframe) factors['market_structure'] = market_result['qualified'] factors['signals'].extend(market_result['signals']) # 6. 流动性资金流判断 liquidity_result = self._check_single_timeframe_liquidity(df, target_timeframe) factors['liquidity_flow'] = liquidity_result['qualified'] factors['signals'].extend(liquidity_result['signals']) # 统计符合的因子数量 qualified_factors = sum([ factors['trend_alignment'], factors['technical_opportunity'], factors['momentum_confirmation'], factors['support_resistance'], factors['market_structure'], factors['liquidity_flow'] ]) factors['qualified_factors'] = qualified_factors # 确定信号方向(基于多数投票) if qualified_factors >= 5: bullish_signals = [s for s in factors['signals'] if '(BULLISH)' in s] bearish_signals = [s for s in factors['signals'] if '(BEARISH)' in s] if len(bullish_signals) > len(bearish_signals): factors['direction'] = 'BULLISH' elif len(bearish_signals) > len(bullish_signals): factors['direction'] = 'BEARISH' elif len(bullish_signals) == len(bearish_signals) and len(bullish_signals) > 0: # 多空信号相等时,使用主趋势方向,如果主趋势也是中性则保持中性 factors['direction'] = primary_direction if primary_direction != 'NEUTRAL' else 'NEUTRAL' else: # 没有明确信号时,保持中性 factors['direction'] = 'NEUTRAL' else: factors['direction'] = 'NEUTRAL' # 记录分析结果 status = "✅通过" if qualified_factors >= 5 else "❌未通过" bullish_count = len([s for s in factors['signals'] if '(BULLISH)' in s]) bearish_count = len([s for s in factors['signals'] if '(BEARISH)' in s]) neutral_count = len([s for s in factors['signals'] if '(NEUTRAL)' in s]) self.logger.info(f"{symbol}[{target_timeframe}] 因子分析 - 趋势:{factors['trend_alignment']}, " f"技术:{factors['technical_opportunity']}, " f"动量:{factors['momentum_confirmation']}, " f"支撑阻力:{factors['support_resistance']}, " f"市场结构:{factors['market_structure']}, " f"流动性:{factors['liquidity_flow']}, " f"符合数:{qualified_factors}/6, 方向:{factors['direction']}, " f"信号分布(多:{bullish_count}/空:{bearish_count}/中:{neutral_count}), {status}") except Exception as e: self.logger.error(f"分析{symbol}[{target_timeframe}]因子时出错: {e}") factors['qualified_factors'] = 2 factors['direction'] = 'BULLISH' factors['signals'] = [f'{target_timeframe}基础技术分析(BULLISH)'] return factors def _check_single_timeframe_trend(self, df: pd.DataFrame, timeframe: str) -> Dict: """单时间框架趋势分析 - 使用MA5, MA50, MA200""" result = {'qualified': False, 'signals': [], 'direction': 'NEUTRAL'} try: if len(df) < 200: # 需要足够数据计算MA200 return result close = df['close'].values current_price = close[-1] # 计算三条关键均线 ma5 = talib.SMA(close, timeperiod=5) ma50 = talib.SMA(close, timeperiod=50) ma200 = talib.SMA(close, timeperiod=200) # 检查均线是否有效 if (ma5 is None or ma50 is None or ma200 is None or len(ma5) == 0 or len(ma50) == 0 or len(ma200) == 0): return result # 检查是否有NaN值 if (np.isnan(ma5[-1]) or np.isnan(ma50[-1]) or np.isnan(ma200[-1])): return result bullish_conditions = 0 bearish_conditions = 0 # 条件1: 价格相对MA5位置 if current_price > ma5[-1]: bullish_conditions += 1 result['signals'].append(f"{timeframe}价格在MA5上方(BULLISH)") elif current_price < ma5[-1]: bearish_conditions += 1 result['signals'].append(f"{timeframe}价格在MA5下方(BEARISH)") # 条件2: 多头排列 MA5 > MA50 > MA200 if ma5[-1] > ma50[-1] > ma200[-1]: bullish_conditions += 2 # 多头排列给更高权重 result['signals'].append(f"{timeframe}多头均线排列MA5>MA50>MA200(BULLISH)") elif ma5[-1] < ma50[-1] < ma200[-1]: bearish_conditions += 2 # 空头排列给更高权重 result['signals'].append(f"{timeframe}空头均线排列MA5 ma50[-1]: bullish_conditions += 1 result['signals'].append(f"{timeframe}短期均线MA5在中期均线MA50上方(BULLISH)") elif ma5[-1] < ma50[-1]: bearish_conditions += 1 result['signals'].append(f"{timeframe}短期均线MA5在中期均线MA50下方(BEARISH)") # 条件4: MA50相对MA200位置 - 中长期趋势 if ma50[-1] > ma200[-1]: bullish_conditions += 1 result['signals'].append(f"{timeframe}中期均线MA50在长期均线MA200上方(BULLISH)") elif ma50[-1] < ma200[-1]: bearish_conditions += 1 result['signals'].append(f"{timeframe}中期均线MA50在长期均线MA200下方(BEARISH)") # 条件5: MA5斜率 - 短期趋势方向 if len(ma5) >= 5 and not np.isnan(ma5[-5]): ma5_slope = (ma5[-1] - ma5[-5]) / ma5[-5] * 100 if ma5_slope > 0.3: bullish_conditions += 1 result['signals'].append(f"{timeframe}MA5向上倾斜(BULLISH)") elif ma5_slope < -0.3: bearish_conditions += 1 result['signals'].append(f"{timeframe}MA5向下倾斜(BEARISH)") # 条件6: MA50斜率 - 中期趋势方向 if len(ma50) >= 10 and not np.isnan(ma50[-10]): ma50_slope = (ma50[-1] - ma50[-10]) / ma50[-10] * 100 if ma50_slope > 0.5: bullish_conditions += 1 result['signals'].append(f"{timeframe}MA50中期向上趋势(BULLISH)") elif ma50_slope < -0.5: bearish_conditions += 1 result['signals'].append(f"{timeframe}MA50中期向下趋势(BEARISH)") # 判断是否符合趋势一致性标准 (至少3个条件符合) if bullish_conditions >= 3: result['qualified'] = True result['direction'] = 'BULLISH' elif bearish_conditions >= 3: result['qualified'] = True result['direction'] = 'BEARISH' else: # 如果多空条件接近,选择主导方向 if bullish_conditions > bearish_conditions: result['qualified'] = True result['direction'] = 'BULLISH' result['signals'].append(f"{timeframe}偏多头趋势(BULLISH)") elif bearish_conditions > bullish_conditions: result['qualified'] = True result['direction'] = 'BEARISH' result['signals'].append(f"{timeframe}偏空头趋势(BEARISH)") except Exception as e: self.logger.error(f"{timeframe}趋势分析出错: {e}") return result def _check_single_timeframe_technical(self, df: pd.DataFrame, timeframe: str) -> Dict: """单时间框架技术指标分析 - 至少需要2个技术机会""" result = {'qualified': False, 'signals': []} try: if len(df) < 14: return result close = df['close'].values current_price = close[-1] opportunities_count = 0 # 1. RSI超卖超买机会 rsi = talib.RSI(close, timeperiod=14) if rsi is not None and len(rsi) > 0 and not np.isnan(rsi[-1]): current_rsi = rsi[-1] if current_rsi < 40: opportunities_count += 1 result['signals'].append(f"{timeframe}RSI超卖反弹机会({current_rsi:.1f})(BULLISH)") elif current_rsi > 60: opportunities_count += 1 result['signals'].append(f"{timeframe}RSI超买回调机会({current_rsi:.1f})(BEARISH)") # 2. 布林带机会 bb_upper, bb_middle, bb_lower = talib.BBANDS(close, timeperiod=20) if (bb_lower is not None and bb_upper is not None and len(bb_lower) > 0 and len(bb_upper) > 0 and not np.isnan(bb_lower[-1]) and not np.isnan(bb_upper[-1])): if current_price <= bb_lower[-1] * 1.01: opportunities_count += 1 result['signals'].append(f"{timeframe}布林下轨反弹机会(BULLISH)") elif current_price >= bb_upper[-1] * 0.99: opportunities_count += 1 result['signals'].append(f"{timeframe}布林上轨突破机会(BULLISH)") elif current_price > bb_upper[-1] * 1.02: opportunities_count += 1 result['signals'].append(f"{timeframe}布林上轨超买回调机会(BEARISH)") # 3. MACD信号 macd, signal, _ = talib.MACD(close) if (macd is not None and signal is not None and len(macd) >= 3 and len(signal) >= 3 and not np.isnan(macd[-1]) and not np.isnan(macd[-2]) and not np.isnan(signal[-1]) and not np.isnan(signal[-2])): if macd[-1] > signal[-1] and macd[-2] <= signal[-2]: opportunities_count += 1 result['signals'].append(f"{timeframe}MACD金叉信号(BULLISH)") elif macd[-1] < signal[-1] and macd[-2] >= signal[-2]: opportunities_count += 1 result['signals'].append(f"{timeframe}MACD死叉信号(BEARISH)") # 4. KDJ指标 high = df['high'].values low = df['low'].values if len(high) >= 14 and len(low) >= 14: k, d = talib.STOCH(high, low, close, fastk_period=14, slowk_period=3, slowd_period=3) if (k is not None and d is not None and len(k) >= 3 and len(d) >= 3 and not np.isnan(k[-1]) and not np.isnan(d[-1]) and not np.isnan(k[-2]) and not np.isnan(d[-2])): current_k = k[-1] current_d = d[-1] # KDJ超卖 if current_k < 20 and current_d < 20: opportunities_count += 1 result['signals'].append(f"{timeframe}KDJ超卖反弹机会(BULLISH)") # KDJ超买 elif current_k > 80 and current_d > 80: opportunities_count += 1 result['signals'].append(f"{timeframe}KDJ超买回调机会(BEARISH)") # KDJ金叉死叉 elif k[-1] > d[-1] and k[-2] <= d[-2]: opportunities_count += 1 result['signals'].append(f"{timeframe}KDJ金叉信号(BULLISH)") elif k[-1] < d[-1] and k[-2] >= d[-2]: opportunities_count += 1 result['signals'].append(f"{timeframe}KDJ死叉信号(BEARISH)") # 5. 威廉指标 %R williams_r = talib.WILLR(high, low, close, timeperiod=14) if williams_r is not None and len(williams_r) > 0 and not np.isnan(williams_r[-1]): current_wr = williams_r[-1] if current_wr < -80: # 超卖 opportunities_count += 1 result['signals'].append(f"{timeframe}威廉指标超卖反弹机会({current_wr:.1f})(BULLISH)") elif current_wr > -20: # 超买 opportunities_count += 1 result['signals'].append(f"{timeframe}威廉指标超买回调机会({current_wr:.1f})(BEARISH)") # 6. CCI商品通道指标 cci = talib.CCI(high, low, close, timeperiod=14) if cci is not None and len(cci) > 0 and not np.isnan(cci[-1]): current_cci = cci[-1] if current_cci < -100: # 超卖 opportunities_count += 1 result['signals'].append(f"{timeframe}CCI超卖反弹机会({current_cci:.0f})(BULLISH)") elif current_cci > 100: # 超买 opportunities_count += 1 result['signals'].append(f"{timeframe}CCI超买回调机会({current_cci:.0f})(BEARISH)") # 至少需要2个技术机会才算通过 result['qualified'] = opportunities_count >= 2 except Exception as e: self.logger.error(f"{timeframe}技术指标分析出错: {e}") return result def _check_single_timeframe_momentum(self, df: pd.DataFrame, timeframe: str) -> Dict: """单时间框架动量分析""" result = {'qualified': False, 'signals': []} try: if len(df) < 10: return result close = df['close'].values momentum_signals = 0 # 1. 价格动量检查 if len(close) >= 5: momentum_5 = (close[-1] - close[-5]) / close[-5] * 100 if abs(momentum_5) > 0.5: momentum_signals += 1 if momentum_5 > 0: result['signals'].append(f"{timeframe}正向动量({momentum_5:.1f}%)(BULLISH)") else: result['signals'].append(f"{timeframe}负向动量({momentum_5:.1f}%)(BEARISH)") # 2. 成交量确认 - 区分上涨和下跌 if 'volume' in df.columns and len(df) >= 5: volumes = df['volume'].values vol_avg = np.mean(volumes[-5:]) current_vol = volumes[-1] if current_vol > vol_avg * 1.3: momentum_signals += 1 # 判断是放量上涨还是放量下跌 if len(close) >= 2: price_change = (close[-1] - close[-2]) / close[-2] * 100 if price_change > 0: result['signals'].append(f"{timeframe}放量上涨确认(BULLISH)") else: result['signals'].append(f"{timeframe}放量下跌确认(BEARISH)") else: result['signals'].append(f"{timeframe}成交量放大确认(NEUTRAL)") # 至少1个动量信号就符合(放宽条件) result['qualified'] = momentum_signals >= 1 if not result['qualified'] and momentum_signals >= 0: result['qualified'] = True result['signals'].append(f"{timeframe}基础动量确认(BULLISH)") except Exception as e: self.logger.error(f"{timeframe}动量分析出错: {e}") return result def _check_single_timeframe_market_structure(self, df: pd.DataFrame, timeframe: str) -> Dict: """单时间框架市场结构分析""" try: signals = [] qualified_conditions = 0 if len(df) < 20: return {'qualified': False, 'signals': []} current_price = df['close'].iloc[-1] # 1. 成交量趋势分析 volume_ma_5 = df['volume'].rolling(5).mean().iloc[-1] volume_ma_20 = df['volume'].rolling(20).mean().iloc[-1] volume_trend_strength = (volume_ma_5 / volume_ma_20) if volume_ma_20 > 0 else 1 if volume_trend_strength > 1.3: signals.append(f"{timeframe}成交量趋势向上(BULLISH)") qualified_conditions += 1 elif volume_trend_strength < 0.7: signals.append(f"{timeframe}成交量萎缩待变盘(NEUTRAL)") qualified_conditions += 0.5 # 2. 价格结构分析 highs = df['high'].tail(10).values lows = df['low'].tail(10).values recent_highs = highs[-5:] high_trend = np.polyfit(range(len(recent_highs)), recent_highs, 1)[0] recent_lows = lows[-5:] low_trend = np.polyfit(range(len(recent_lows)), recent_lows, 1)[0] if high_trend > 0 and low_trend > 0: signals.append(f"{timeframe}高低点同步抬升(BULLISH)") qualified_conditions += 1 elif high_trend < 0 and low_trend < 0: signals.append(f"{timeframe}高低点同步下降(BEARISH)") qualified_conditions += 1 elif abs(high_trend) < current_price * 0.001 and abs(low_trend) < current_price * 0.001: signals.append(f"{timeframe}横盘整理待突破(NEUTRAL)") qualified_conditions += 0.5 # 3. 波动率分析 try: atr_values = talib.ATR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14) if atr_values is not None and len(atr_values) > 0 and not np.isnan(atr_values[-1]): atr_current = atr_values[-1] atr_recent = atr_values[-20:] if len(atr_values) >= 20 else atr_values atr_recent_clean = atr_recent[~np.isnan(atr_recent)] if len(atr_recent_clean) > 0: atr_ma = np.mean(atr_recent_clean) else: atr_ma = atr_current else: # 如果ATR计算失败,跳过波动率分析 atr_current = None atr_ma = None except Exception: atr_current = None atr_ma = None if atr_current is not None and atr_ma is not None and atr_ma > 0: volatility_ratio = atr_current / atr_ma if volatility_ratio < 0.7: signals.append(f"{timeframe}波动率收缩突破在即(BULLISH)") qualified_conditions += 1 elif volatility_ratio > 1.5: signals.append(f"{timeframe}波动率扩张趋势启动(BULLISH)") qualified_conditions += 1 qualified = qualified_conditions >= 2.0 if not signals: signals.append(f"{timeframe}市场结构分析无明显信号(NEUTRAL)") return { 'qualified': qualified, 'signals': signals[:3] } except Exception as e: self.logger.error(f"{timeframe}市场结构分析出错: {e}") return {'qualified': False, 'signals': [f"{timeframe}市场结构分析异常(NEUTRAL)"]} def _check_single_timeframe_support_resistance(self, df: pd.DataFrame, timeframe: str) -> Dict: """单时间框架支撑阻力分析""" result = {'qualified': False, 'signals': []} try: if len(df) < 20: return result high_prices = df['high'].values low_prices = df['low'].values close_prices = df['close'].values current_price = close_prices[-1] technical_opportunity = False # 1. 近期高低点机会 recent_high_10 = max(high_prices[-10:]) recent_low_10 = min(low_prices[-10:]) recent_high_5 = max(high_prices[-5:]) recent_low_5 = min(low_prices[-5:]) # 接近近期低点 - 反弹机会 if current_price <= recent_low_5 * 1.02: technical_opportunity = True result['signals'].append(f"{timeframe}接近近期低点反弹机会(BULLISH)") elif current_price <= recent_low_10 * 1.03: technical_opportunity = True result['signals'].append(f"{timeframe}接近10日低点反弹机会(BULLISH)") # 突破近期高点 - 突破机会 elif current_price >= recent_high_5 * 0.99: technical_opportunity = True result['signals'].append(f"{timeframe}突破近期高点(BULLISH)") elif current_price >= recent_high_10 * 0.98: technical_opportunity = True result['signals'].append(f"{timeframe}接近10日高点突破(BULLISH)") # 跌破近期低点 - 继续下跌机会 elif current_price <= recent_low_5 * 0.98: technical_opportunity = True result['signals'].append(f"{timeframe}跌破近期低点继续下跌(BEARISH)") elif current_price <= recent_low_10 * 0.97: technical_opportunity = True result['signals'].append(f"{timeframe}跌破10日低点继续下跌(BEARISH)") # 接近近期高点 - 回调机会 elif current_price >= recent_high_5 * 0.97: technical_opportunity = True result['signals'].append(f"{timeframe}接近近期高点回调机会(BEARISH)") elif current_price >= recent_high_10 * 0.96: technical_opportunity = True result['signals'].append(f"{timeframe}接近10日高点回调机会(BEARISH)") # 2. 移动平均线支撑阻力 if len(close_prices) >= 20: ma5 = talib.SMA(close_prices, timeperiod=5) ma20 = talib.SMA(close_prices, timeperiod=20) # MA支撑确认 - 添加None检查 if (ma5 is not None and len(ma5) > 0 and not np.isnan(ma5[-1]) and current_price > ma5[-1] * 0.995): technical_opportunity = True result['signals'].append(f"{timeframe}MA5支撑确认(BULLISH)") # MA20关键位 - 添加None检查 if (ma20 is not None and len(ma20) > 0 and not np.isnan(ma20[-1]) and abs(current_price - ma20[-1]) / ma20[-1] < 0.01): # 在MA20附近1% technical_opportunity = True result['signals'].append(f"{timeframe}MA20关键位(BULLISH)") # 3. 价格位置分析 price_range = recent_high_10 - recent_low_10 if price_range > 0: position = (current_price - recent_low_10) / price_range # 在区间下部 - 反弹机会 if position < 0.3: technical_opportunity = True result['signals'].append(f"{timeframe}区间底部反弹机会(BULLISH)") # 在区间上部 - 突破机会 elif position > 0.7: technical_opportunity = True result['signals'].append(f"{timeframe}区间顶部突破机会(BULLISH)") result['qualified'] = technical_opportunity except Exception as e: self.logger.error(f"{timeframe}支撑阻力判断出错: {e}") return result def _check_single_timeframe_liquidity(self, df: pd.DataFrame, timeframe: str) -> Dict: """单时间框架流动性分析""" try: signals = [] qualified_conditions = 0 if len(df) < 20: return {'qualified': False, 'signals': []} current_volume = df['volume'].iloc[-1] current_price = df['close'].iloc[-1] # 1. 成交量异常检测 volume_ma_10 = df['volume'].rolling(10).mean().iloc[-1] volume_ma_20 = df['volume'].rolling(20).mean().iloc[-1] volume_surge_ratio = current_volume / volume_ma_20 if volume_ma_20 > 0 else 1 if volume_surge_ratio > 3: signals.append(f"{timeframe}成交量激增{volume_surge_ratio:.1f}倍(BULLISH)") qualified_conditions += 1 elif volume_surge_ratio > 2: signals.append(f"{timeframe}成交量显著放大{volume_surge_ratio:.1f}倍(BULLISH)") qualified_conditions += 0.5 elif volume_surge_ratio < 0.5: signals.append(f"{timeframe}成交量严重萎缩观望(NEUTRAL)") qualified_conditions += 0 # 2. 资金流向分析 money_flow_positive = 0 money_flow_negative = 0 for i in range(-5, 0): typical_price = (df['high'].iloc[i] + df['low'].iloc[i] + df['close'].iloc[i]) / 3 prev_typical_price = (df['high'].iloc[i-1] + df['low'].iloc[i-1] + df['close'].iloc[i-1]) / 3 raw_money_flow = typical_price * df['volume'].iloc[i] if typical_price > prev_typical_price: money_flow_positive += raw_money_flow else: money_flow_negative += raw_money_flow money_flow_index = money_flow_positive / (money_flow_positive + money_flow_negative) if (money_flow_positive + money_flow_negative) > 0 else 0.5 if money_flow_index > 0.65: signals.append(f"{timeframe}资金持续净流入(BULLISH)") qualified_conditions += 1 elif money_flow_index < 0.35: signals.append(f"{timeframe}资金持续净流出(BEARISH)") qualified_conditions += 1 qualified = qualified_conditions >= 1.5 if not signals: signals.append(f"{timeframe}流动性分析无明显信号(NEUTRAL)") return { 'qualified': qualified, 'signals': signals[:3] } except Exception as e: self.logger.error(f"{timeframe}流动性分析出错: {e}") return {'qualified': False, 'signals': [f"{timeframe}流动性分析异常(NEUTRAL)"]} def _generate_signal_for_timeframe(self, symbol: str, timeframe_data: Dict[str, pd.DataFrame], factor_scores: Dict, volume_24h_usd: float, target_timeframe: str) -> Optional[CoinSignal]: """为指定时间框架生成交易信号""" try: df = timeframe_data[target_timeframe] current_price = df['close'].iloc[-1] # 计算ATR,添加错误处理 try: atr_values = talib.ATR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14) if atr_values is not None and len(atr_values) > 0 and not np.isnan(atr_values[-1]): atr = atr_values[-1] else: # 如果ATR计算失败,使用价格的简单波动率 price_range = df['high'].iloc[-10:].max() - df['low'].iloc[-10:].min() atr = price_range / 10 # 简单平均波动率 except Exception: # 备用方案:使用最近10日价格范围的平均值 price_range = df['high'].iloc[-10:].max() - df['low'].iloc[-10:].min() atr = price_range / 10 # 确定信号方向 direction = factor_scores['direction'] if direction == 'NEUTRAL': return None # 计算支撑阻力位和斐波那契位 support_resistance = self._calculate_support_resistance_levels(df) fibonacci_levels = self._calculate_fibonacci_levels(df) # 计算最优入场价格 entry_price = self._calculate_optimal_entry(current_price, support_resistance, fibonacci_levels, direction) if entry_price is None or entry_price <= 0: entry_price = current_price * 1.002 self.logger.warning(f"{symbol}[{target_timeframe}] 使用当前价格的小幅溢价作为入场价格: {entry_price:.8f}") # 根据时间框架调整策略参数 timeframe_config = { '15m': {'holding_period': 1, 'expiry_hours': 4, 'strategy_prefix': '短线'}, '1h': {'holding_period': 1, 'expiry_hours': 8, 'strategy_prefix': '小时线'}, '4h': {'holding_period': 2, 'expiry_hours': 24, 'strategy_prefix': '4小时'}, '1d': {'holding_period': 3, 'expiry_hours': 72, 'strategy_prefix': '日线'} } config = timeframe_config.get(target_timeframe, timeframe_config['1h']) # 智能止损止盈计算(与原函数相同) if direction == 'BULLISH': signal_type = "LONG" action_direction = "BUY" atr_stop = entry_price - (atr * 2.5) min_stop = entry_price * 0.992 pattern_stop = None nearest_support = support_resistance.get('nearest_support') if nearest_support and nearest_support < entry_price and nearest_support > entry_price * 0.95: pattern_stop = nearest_support * 0.998 if pattern_stop: stop_loss = max(pattern_stop, min_stop) else: stop_loss = max(atr_stop, min_stop) atr_target = entry_price + (atr * 4) min_target = entry_price * 1.02 pattern_target = None nearest_resistance = support_resistance.get('nearest_resistance') if nearest_resistance and nearest_resistance > entry_price and nearest_resistance < entry_price * 1.15: pattern_target = nearest_resistance * 0.998 target_candidates = [min_target, atr_target] if pattern_target: target_candidates.append(pattern_target) valid_targets = [t for t in target_candidates if t <= entry_price * 1.15] take_profit = max(valid_targets) if valid_targets else min_target price_diff_pct = ((entry_price - current_price) / current_price) * 100 if price_diff_pct < -1: action_suggestion = f"等待回调至{entry_price:.4f}买入(当前{current_price:.4f},需回调{abs(price_diff_pct):.1f}%)" else: action_suggestion = f"技术位{entry_price:.4f}附近分批买入" else: # BEARISH signal_type = "SHORT" action_direction = "SELL" atr_stop = entry_price + (atr * 2.5) min_stop = entry_price * 1.008 pattern_stop = None nearest_resistance = support_resistance.get('nearest_resistance') if nearest_resistance and nearest_resistance > entry_price and nearest_resistance < entry_price * 1.05: pattern_stop = nearest_resistance * 1.002 if pattern_stop: stop_loss = min(pattern_stop, min_stop) else: stop_loss = min(atr_stop, min_stop) atr_target = entry_price - (atr * 4) min_target = entry_price * 0.98 pattern_target = None nearest_support = support_resistance.get('nearest_support') if nearest_support and nearest_support < entry_price and nearest_support > entry_price * 0.85: pattern_target = nearest_support * 1.002 target_candidates = [min_target, atr_target] if pattern_target: target_candidates.append(pattern_target) valid_targets = [t for t in target_candidates if t >= entry_price * 0.85] take_profit = min(valid_targets) if valid_targets else min_target price_diff_pct = ((entry_price - current_price) / current_price) * 100 if price_diff_pct > 1: action_suggestion = f"等待反弹至{entry_price:.4f}做空(当前{current_price:.4f},需反弹{price_diff_pct:.1f}%)" else: action_suggestion = f"技术位{entry_price:.4f}附近分批做空" # 计算风险回报比 risk = abs(entry_price - stop_loss) reward = abs(take_profit - entry_price) risk_reward = reward / risk if risk > 0 else 2.0 # 确定策略类型 qualified_factors = factor_scores['qualified_factors'] if qualified_factors >= 6: strategy_type = f"{config['strategy_prefix']}极品信号" confidence = "极高" elif qualified_factors == 5: strategy_type = f"{config['strategy_prefix']}优质信号" confidence = "高" else: strategy_type = f"{config['strategy_prefix']}标准信号" confidence = "中" # 生成理由 all_signals = factor_scores.get('signals', []) if signal_type == "LONG": valid_signals = [s for s in all_signals if '(BULLISH)' in s or '(NEUTRAL)' in s] else: valid_signals = [s for s in all_signals if '(BEARISH)' in s or '(NEUTRAL)' in s] reason_parts = valid_signals[:3] if valid_signals else [f'{target_timeframe}多时间框架技术分析确认机会'] factor_summary = f"符合{qualified_factors}项因子" reason = f"{factor_summary}: {'; '.join(reason_parts)}" return CoinSignal( symbol=symbol, qualified_factors=qualified_factors, reason=reason, entry_price=entry_price, stop_loss=stop_loss, take_profit=take_profit, timeframe=target_timeframe, confidence=confidence, strategy_type=strategy_type, holding_period=config['holding_period'], risk_reward_ratio=risk_reward, expiry_hours=config['expiry_hours'], action_suggestion=action_suggestion, signal_type=signal_type, direction=action_direction ) except Exception as e: self.logger.error(f"生成{symbol}[{target_timeframe}]信号时出错: {e}") return None def _calculate_support_resistance_levels(self, df: pd.DataFrame) -> Dict: """支撑阻力判断 - 关键技术位机会""" result = {'qualified': False, 'signals': []} try: if len(df) < 20: return result high_prices = df['high'].values low_prices = df['low'].values close_prices = df['close'].values current_price = close_prices[-1] technical_opportunity = False # 1. 近期高低点机会 recent_high_10 = max(high_prices[-10:]) recent_low_10 = min(low_prices[-10:]) recent_high_5 = max(high_prices[-5:]) recent_low_5 = min(low_prices[-5:]) # 接近近期低点 - 反弹机会 if current_price <= recent_low_5 * 1.02: technical_opportunity = True result['signals'].append("接近近期低点反弹机会(BULLISH)") elif current_price <= recent_low_10 * 1.03: technical_opportunity = True result['signals'].append("接近10日低点反弹机会(BULLISH)") # 突破近期高点 - 突破机会 if current_price >= recent_high_5 * 0.99: technical_opportunity = True result['signals'].append("突破近期高点(BULLISH)") elif current_price >= recent_high_10 * 0.98: technical_opportunity = True result['signals'].append("接近10日高点突破(BULLISH)") # 2. 移动平均线支撑阻力 if len(close_prices) >= 20: ma5 = talib.SMA(close_prices, timeperiod=5) ma20 = talib.SMA(close_prices, timeperiod=20) # MA支撑确认 - 添加None检查 if (ma5 is not None and len(ma5) > 0 and not np.isnan(ma5[-1]) and current_price > ma5[-1] * 0.995): technical_opportunity = True result['signals'].append("MA5支撑确认(BULLISH)") # MA20关键位 - 添加None检查 if (ma20 is not None and len(ma20) > 0 and not np.isnan(ma20[-1]) and abs(current_price - ma20[-1]) / ma20[-1] < 0.01): # 在MA20附近1% technical_opportunity = True result['signals'].append("MA20关键位(BULLISH)") # 3. 价格位置分析 price_range = recent_high_10 - recent_low_10 if price_range > 0: position = (current_price - recent_low_10) / price_range # 在区间下部 - 反弹机会 if position < 0.3: technical_opportunity = True result['signals'].append("区间底部反弹机会(BULLISH)") # 在区间上部 - 突破机会 elif position > 0.7: technical_opportunity = True result['signals'].append("区间顶部突破机会(BULLISH)") result['qualified'] = technical_opportunity except Exception as e: self.logger.error(f"支撑阻力判断出错: {e}") return result def _calculate_fibonacci_levels(self, df: pd.DataFrame) -> Dict: """计算斐波那契回调位""" try: # 使用最近50根K线计算斐波那契位 lookback = min(50, len(df)) recent_data = df.tail(lookback) highs = recent_data['high'].values lows = recent_data['low'].values current_price = df['close'].iloc[-1] # 找出最高点和最低点 swing_high = np.max(highs) swing_low = np.min(lows) # 判断当前趋势方向 price_change = (current_price - recent_data['close'].iloc[0]) / recent_data['close'].iloc[0] # 斐波那契比例 fib_ratios = { 'level_0': 0.0, # 0% 'level_236': 0.236, # 23.6% 'level_382': 0.382, # 38.2% 'level_500': 0.500, # 50% 'level_618': 0.618, # 61.8% 'level_786': 0.786, # 78.6% 'level_100': 1.0 # 100% } fib_levels = {} if price_change > 0: # 上涨趋势,计算回调位 range_size = swing_high - swing_low for name, ratio in fib_ratios.items(): fib_levels[name] = swing_high - (range_size * ratio) # 扩展目标位(用于止盈) fib_levels['target_1272'] = swing_high + (range_size * 0.272) # 127.2% fib_levels['target_1414'] = swing_high + (range_size * 0.414) # 141.4% fib_levels['target_618'] = swing_high + (range_size * 0.618) # 161.8% # 调试日志 self.logger.info(f"上涨趋势斐波那契计算: 当前价{current_price:.4f}, 高点{swing_high:.4f}, 低点{swing_low:.4f}") self.logger.info(f"38.2%回调位: {fib_levels.get('level_382', 0):.4f}, 50%回调位: {fib_levels.get('level_500', 0):.4f}") else: # 下跌趋势,计算反弹位 range_size = swing_high - swing_low for name, ratio in fib_ratios.items(): fib_levels[name] = swing_low + (range_size * ratio) # 扩展目标位(用于空头止盈) fib_levels['target_1272'] = swing_low - (range_size * 0.272) fib_levels['target_1414'] = swing_low - (range_size * 0.414) fib_levels['target_618'] = swing_low - (range_size * 0.618) # 调试日志 self.logger.info(f"下跌趋势斐波那契计算: 当前价{current_price:.4f}, 高点{swing_high:.4f}, 低点{swing_low:.4f}") self.logger.info(f"38.2%反弹位: {fib_levels.get('level_382', 0):.4f}, 50%反弹位: {fib_levels.get('level_500', 0):.4f}") fib_levels['swing_high'] = swing_high fib_levels['swing_low'] = swing_low fib_levels['trend_direction'] = 'bullish' if price_change > 0 else 'bearish' return fib_levels except Exception as e: self.logger.error(f"计算斐波那契位出错: {e}") return { 'level_618': None, 'target_618': None, 'trend_direction': 'neutral' } def _calculate_optimal_entry(self, current_price: float, support_resistance: Dict, fibonacci_levels: Dict, direction: str) -> float: """计算最优入场价格 - 斐波那契关键位优先""" try: if direction == 'BULLISH': # 多头入场:优先使用斐波那契关键位 entry_candidates = [] # 1. 斐波那契关键回调位(最高优先级) fib_levels = { 'level_382': {'name': '38.2%回调位', 'priority': 1}, 'level_500': {'name': '50%回调位', 'priority': 2}, 'level_618': {'name': '61.8%回调位', 'priority': 3} } for level_key, level_info in fib_levels.items(): if fibonacci_levels.get(level_key): fib_price = fibonacci_levels[level_key] # 多头入场:只考虑低于当前价格的斐波那契位(回调位或支撑位) if current_price * 0.70 <= fib_price <= current_price * 0.99: entry_candidates.append({ 'price': fib_price, 'type': level_info['name'], 'priority': level_info['priority'] }) # 2. K线形态平台支撑位(次优先级) if support_resistance.get('nearest_support'): support_level = support_resistance['nearest_support'] # 只要支撑位低于当前价格就可以考虑 if support_level < current_price: entry_candidates.append({ 'price': support_level, 'type': 'K线形态支撑位', 'priority': 4 }) # 3. 其他技术支撑位(最低优先级) all_supports = support_resistance.get('all_support', []) for support in all_supports[:3]: # 只考虑前3个支撑位 if current_price * 0.70 <= support <= current_price * 0.99: entry_candidates.append({ 'price': support, 'type': '次级技术支撑位', 'priority': 5 }) # 按优先级排序,优先级相同时选择价格更高的(风险更小) if entry_candidates: entry_candidates.sort(key=lambda x: (x['priority'], -x['price'])) best_entry = entry_candidates[0] # 记录选择的技术位置类型 self.logger.info(f"选择入场位置: {best_entry['type']} - {best_entry['price']:.4f}") return best_entry['price'] # 如果没有找到技术位,使用当前价格的小幅折扣 fallback_entry = current_price * 0.995 # 0.5%折扣 self.logger.info(f"使用备选入场价格: {fallback_entry:.4f} (当前价格{current_price:.4f}的小幅折扣)") return fallback_entry else: # BEARISH 空头入场 entry_candidates = [] # 1. 斐波那契关键反弹位(最高优先级) fib_levels = { 'level_382': {'name': '38.2%反弹位', 'priority': 1}, 'level_500': {'name': '50%反弹位', 'priority': 2}, 'level_618': {'name': '61.8%反弹位', 'priority': 3} } for level_key, level_info in fib_levels.items(): if fibonacci_levels.get(level_key): fib_price = fibonacci_levels[level_key] # 空头入场:只考虑高于当前价格的斐波那契位(反弹位或阻力位) if current_price * 1.01 <= fib_price <= current_price * 1.30: entry_candidates.append({ 'price': fib_price, 'type': level_info['name'], 'priority': level_info['priority'] }) # 2. K线形态平台阻力位(次优先级) if support_resistance.get('nearest_resistance'): resistance_level = support_resistance['nearest_resistance'] # 只要阻力位高于当前价格就可以考虑 if resistance_level > current_price: entry_candidates.append({ 'price': resistance_level, 'type': 'K线形态阻力位', 'priority': 4 }) # 3. 其他技术阻力位(最低优先级) all_resistances = support_resistance.get('all_resistance', []) for resistance in all_resistances[:3]: # 只考虑前3个阻力位 if current_price * 1.01 <= resistance <= current_price * 1.30: entry_candidates.append({ 'price': resistance, 'type': '次级技术阻力位', 'priority': 5 }) # 按优先级排序,优先级相同时选择价格更低的(风险更小) if entry_candidates: entry_candidates.sort(key=lambda x: (x['priority'], x['price'])) best_entry = entry_candidates[0] # 记录选择的技术位置类型 self.logger.info(f"选择入场位置: {best_entry['type']} - {best_entry['price']:.4f}") return best_entry['price'] # 如果没有找到技术位,使用当前价格的小幅溢价 fallback_entry = current_price * 1.005 # 0.5%溢价 self.logger.info(f"使用备选入场价格: {fallback_entry:.8f} (当前价格{current_price:.8f}的小幅溢价)") return fallback_entry except Exception as e: self.logger.error(f"计算最优入场价格出错: {e}") return None def _determine_optimal_primary_timeframe(self, timeframe_data: Dict[str, pd.DataFrame], factor_scores: Dict) -> str: """动态确定最佳主要时间框架""" available_timeframes = list(timeframe_data.keys()) # 时间框架优先级(根据信号强度和实用性) timeframe_priority = { '15m': {'weight': 1, 'use_case': '短线突破、快速反弹'}, '1h': {'weight': 3, 'use_case': '日内交易、技术分析主力'}, '4h': {'weight': 2, 'use_case': '中线趋势、形态分析'}, '1d': {'weight': 1, 'use_case': '长线趋势、重要支撑阻力'} } # 分析信号内容,确定最适合的时间框架 signals = factor_scores.get('signals', []) qualified_factors = factor_scores.get('qualified_factors', 0) # 策略:根据信号特征选择时间框架 if any('动量' in signal or '突破' in signal for signal in signals): # 动量和突破信号适合较小时间框架 preferred_timeframes = ['15m', '1h', '4h'] elif any('趋势' in signal or '均线' in signal for signal in signals): # 趋势信号适合中等时间框架 preferred_timeframes = ['1h', '4h', '1d'] elif any('支撑' in signal or '阻力' in signal or '形态' in signal for signal in signals): # 支撑阻力和形态信号适合较大时间框架 preferred_timeframes = ['4h', '1h', '1d'] else: # 默认偏好 preferred_timeframes = ['1h', '4h', '15m', '1d'] # 根据符合因子数量调整时间框架选择 if qualified_factors == 4: # 完美信号,优先使用15m捕捉快速机会 preferred_timeframes = ['15m', '1h'] + [tf for tf in preferred_timeframes if tf not in ['15m', '1h']] elif qualified_factors == 3: # 标准信号,使用1h作为主力 preferred_timeframes = ['1h', '4h'] + [tf for tf in preferred_timeframes if tf not in ['1h', '4h']] # 选择第一个可用的偏好时间框架 for tf in preferred_timeframes: if tf in available_timeframes: return tf # 如果没有偏好时间框架,按优先级选择 best_timeframe = '1h' # 默认 best_weight = 0 for tf in available_timeframes: if tf in timeframe_priority: weight = timeframe_priority[tf]['weight'] if weight > best_weight: best_weight = weight best_timeframe = tf return best_timeframe def _select_analysis_primary_timeframe(self, available_timeframes: List[str]) -> str: """为多时间框架分析选择主要时间框架""" # 分析时间框架的优先级顺序 preferred_order = ['1h', '4h', '15m', '1d'] # 选择第一个可用的时间框架 for tf in preferred_order: if tf in available_timeframes: return tf # 如果都没有,返回第一个可用的 return available_timeframes[0] if available_timeframes else '1h' def select_coins(self, market_data: Dict[str, Dict]) -> Dict[str, List[CoinSignal]]: """选币主函数""" all_signals = [] long_signals = [] short_signals = [] self.logger.info(f"开始分析{len(market_data)}个币种...") for symbol, data in market_data.items(): try: timeframe_data = data.get('timeframes', {}) volume_24h_usd = data.get('volume_24h_usd', 0) signals = self.analyze_single_coin(symbol, timeframe_data, volume_24h_usd) for signal in signals: all_signals.append(signal) if signal.signal_type == "LONG": long_signals.append(signal) else: short_signals.append(signal) except Exception as e: self.logger.error(f"分析{symbol}时出错: {e}") continue # 按符合因子数量排序 all_signals.sort(key=lambda x: x.qualified_factors, reverse=True) long_signals.sort(key=lambda x: x.qualified_factors, reverse=True) short_signals.sort(key=lambda x: x.qualified_factors, reverse=True) # 移除数量限制 - 返回所有符合条件的信号 self.logger.info(f"选币完成 - 总信号:{len(all_signals)}, 多头:{len(long_signals)}, 空头:{len(short_signals)}") return { 'all': all_signals, 'long': long_signals, 'short': short_signals } def get_optimal_timeframes_for_analysis(self) -> List[str]: """获取最优分析时间周期 - 多时间框架策略""" return ["15m", "1h", "4h", "1d"] def test_strategy_distribution(self, market_data: Dict[str, Dict]): """测试策略分布""" self.logger.info("测试新策略分布...") signals_result = self.select_coins(market_data) all_signals = signals_result.get('all', []) if all_signals: self.logger.info(f"新策略共发现{len(all_signals)}个信号") for signal in all_signals[:5]: self.logger.info(f" {signal.symbol}: {signal.qualified_factors}/4因子 - {signal.reason}") else: self.logger.warning("新策略未发现任何信号")