diff --git a/coin_selection_engine.py b/coin_selection_engine.py index 29593c7..7e0c753 100644 --- a/coin_selection_engine.py +++ b/coin_selection_engine.py @@ -102,12 +102,14 @@ class CoinSelectionEngine: # 6. 保存选币结果到数据库 self.logger.info(f"保存{len(all_signals)}个选币结果到数据库...") saved_count = 0 + saved_long = 0 + saved_short = 0 for signal in all_signals: try: selection_id = self.db.insert_coin_selection( symbol=signal.symbol, - score=signal.score, + qualified_factors=signal.qualified_factors, reason=signal.reason, entry_price=signal.entry_price, stop_loss=signal.stop_loss, @@ -125,24 +127,30 @@ class CoinSelectionEngine: self.logger.info(f"保存{signal.symbol}({signal.strategy_type}-{signal_type_cn})选币结果,ID: {selection_id}") saved_count += 1 + # 统计实际保存的多空数量 + if signal.signal_type == "LONG": + saved_long += 1 + else: + saved_short += 1 + except Exception as e: self.logger.error(f"保存{signal.symbol}选币结果失败: {e}") # 检查并标记过期的选币 self.db.check_and_expire_selections() - self.logger.info(f"选币完成!成功保存{saved_count}个信号(多头: {len(long_signals)}个, 空头: {len(short_signals)}个)") + self.logger.info(f"选币完成!成功保存{saved_count}个信号(多头: {saved_long}个, 空头: {saved_short}个)") - # # 发送钉钉通知 - # try: - # self.logger.info("发送钉钉通知...") - # notification_sent = self.dingtalk_notifier.send_coin_selection_notification(all_signals) - # if notification_sent: - # self.logger.info("✅ 钉钉通知发送成功") - # else: - # self.logger.info("📱 钉钉通知发送失败或未配置") - # except Exception as e: - # self.logger.error(f"发送钉钉通知时出错: {e}") + # 发送钉钉通知 + try: + self.logger.info("发送钉钉通知...") + notification_sent = self.dingtalk_notifier.send_coin_selection_notification(all_signals) + if notification_sent: + self.logger.info("✅ 钉钉通知发送成功") + else: + self.logger.info("📱 钉钉通知发送失败或未配置") + except Exception as e: + self.logger.error(f"发送钉钉通知时出错: {e}") return all_signals @@ -159,18 +167,18 @@ class CoinSelectionEngine: key = f"{strategy}-{signal_type}" if key not in strategy_stats: - strategy_stats[key] = {'count': 0, 'avg_score': 0, 'scores': []} + strategy_stats[key] = {'count': 0, 'avg_factors': 0, 'factors': []} strategy_stats[key]['count'] += 1 - strategy_stats[key]['scores'].append(signal.score) + strategy_stats[key]['factors'].append(signal.qualified_factors) - # 计算平均分数 + # 计算平均符合因子数 for key, stats in strategy_stats.items(): - stats['avg_score'] = sum(stats['scores']) / len(stats['scores']) + stats['avg_factors'] = sum(stats['factors']) / len(stats['factors']) self.logger.info("策略分布统计:") for key, stats in sorted(strategy_stats.items(), key=lambda x: x[1]['count'], reverse=True): - self.logger.info(f" {key}: {stats['count']}个信号, 平均分数: {stats['avg_score']:.1f}") + self.logger.info(f" {key}: {stats['count']}个信号, 平均符合因子: {stats['avg_factors']:.1f}/4") def run_strategy_specific_analysis(self, symbols: List[str], strategy_name: str) -> List[CoinSignal]: """针对特定策略运行专门的分析""" @@ -217,7 +225,7 @@ class CoinSelectionEngine: conn = self.db.get_connection() cursor = conn.cursor() cursor.execute(''' - SELECT id, symbol, score, reason, entry_price, stop_loss, take_profit, + SELECT id, symbol, qualified_factors, reason, entry_price, stop_loss, take_profit, timeframe, selection_time, status, actual_entry_price, exit_price, exit_time, pnl_percentage, notes, strategy_type, holding_period, risk_reward_ratio, expiry_time, is_expired, action_suggestion, @@ -233,7 +241,7 @@ class CoinSelectionEngine: selection = { 'id': detailed_row[0], 'symbol': detailed_row[1], - 'score': detailed_row[2], + 'qualified_factors': detailed_row[2], 'reason': detailed_row[3], 'entry_price': detailed_row[4], 'stop_loss': detailed_row[5], @@ -324,7 +332,7 @@ class CoinSelectionEngine: for i, signal in enumerate(signals, 1): summary += f"{i}. {signal.symbol} [{signal.strategy_type}] - {signal.action_suggestion}\n" - summary += f" 评分: {signal.score:.1f}分 ({signal.confidence}信心)\n" + summary += f" 符合因子: {signal.qualified_factors}/4 ({signal.confidence}信心)\n" summary += f" 理由: {signal.reason}\n" summary += f" 入场: ${signal.entry_price:.4f}\n" summary += f" 止损: ${signal.stop_loss:.4f} ({((signal.stop_loss - signal.entry_price) / signal.entry_price * 100):.2f}%)\n" diff --git a/database.py b/database.py index ceee503..b23d912 100644 --- a/database.py +++ b/database.py @@ -93,7 +93,59 @@ class DatabaseManager: ) ''') - # 检查并添加新列(如果表已存在) + # 修改score字段为可空(如果表已存在) + try: + # SQLite不能直接修改列约束,所以我们需要检查并处理 + cursor.execute("PRAGMA table_info(coin_selections)") + columns = cursor.fetchall() + + # 检查是否需要重建表结构 + score_column = next((col for col in columns if col[1] == 'score'), None) + if score_column and score_column[3] == 1: # NOT NULL constraint exists + # 备份现有数据 + cursor.execute("ALTER TABLE coin_selections RENAME TO coin_selections_backup") + + # 重新创建表(score字段改为可空) + cursor.execute(''' + CREATE TABLE coin_selections ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + symbol TEXT NOT NULL, + score REAL, + reason TEXT NOT NULL, + entry_price REAL NOT NULL, + stop_loss REAL NOT NULL, + take_profit REAL NOT NULL, + timeframe TEXT NOT NULL, + selection_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + status TEXT DEFAULT 'active', + actual_entry_price REAL, + exit_price REAL, + exit_time TIMESTAMP, + pnl_percentage REAL, + notes TEXT, + strategy_type TEXT NOT NULL DEFAULT '中线', + holding_period INTEGER NOT NULL DEFAULT 7, + risk_reward_ratio REAL NOT NULL DEFAULT 2.0, + expiry_time TIMESTAMP, + is_expired BOOLEAN DEFAULT FALSE, + action_suggestion TEXT DEFAULT '等待回调买入', + signal_type TEXT DEFAULT 'LONG', + direction TEXT DEFAULT 'BUY', + qualified_factors INTEGER NOT NULL DEFAULT 3 + ) + ''') + + # 迁移数据 + cursor.execute(''' + INSERT INTO coin_selections + SELECT * FROM coin_selections_backup + ''') + + # 删除备份表 + cursor.execute("DROP TABLE coin_selections_backup") + except Exception as e: + # 如果出错,可能是表不存在或已经正确,继续执行 + pass try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN strategy_type TEXT NOT NULL DEFAULT '中线'") except: @@ -135,6 +187,12 @@ class DatabaseManager: except: pass + # 添加新的符合因子字段 + try: + cursor.execute("ALTER TABLE coin_selections ADD COLUMN qualified_factors INTEGER NOT NULL DEFAULT 3") + except: + pass + # 技术指标表 cursor.execute(''' CREATE TABLE IF NOT EXISTS technical_indicators ( @@ -175,7 +233,7 @@ class DatabaseManager: """获取数据库连接""" return sqlite3.connect(self.db_path) - def insert_coin_selection(self, symbol, score, reason, entry_price, stop_loss, take_profit, + def insert_coin_selection(self, symbol, qualified_factors, reason, entry_price, stop_loss, take_profit, timeframe, strategy_type, holding_period, risk_reward_ratio, expiry_hours, action_suggestion, signal_type="LONG", direction="BUY"): """插入选币结果 - 支持多空方向""" @@ -185,13 +243,16 @@ class DatabaseManager: # 计算过期时间 expiry_time = datetime.now(timezone.utc) + timedelta(hours=expiry_hours) + # 将qualified_factors转换为分数(兼容旧系统) + score = qualified_factors * 25.0 if qualified_factors else 75.0 # 3/4 = 75分,4/4 = 100分 + cursor.execute(''' INSERT INTO coin_selections - (symbol, score, reason, entry_price, stop_loss, take_profit, timeframe, + (symbol, score, qualified_factors, reason, entry_price, stop_loss, take_profit, timeframe, strategy_type, holding_period, risk_reward_ratio, expiry_time, action_suggestion, signal_type, direction) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ''', (symbol, score, reason, entry_price, stop_loss, take_profit, timeframe, + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ''', (symbol, score, qualified_factors, reason, entry_price, stop_loss, take_profit, timeframe, strategy_type, holding_period, risk_reward_ratio, expiry_time, action_suggestion, signal_type, direction)) @@ -215,7 +276,7 @@ class DatabaseManager: cursor.execute(''' SELECT * FROM coin_selections WHERE status = 'active' AND is_expired = FALSE - ORDER BY selection_time DESC, score DESC + ORDER BY selection_time DESC, qualified_factors DESC LIMIT ? OFFSET ? ''', (limit, offset)) diff --git a/technical_analyzer.py b/technical_analyzer.py index 17eb88d..57661b0 100644 --- a/technical_analyzer.py +++ b/technical_analyzer.py @@ -1,2350 +1,1320 @@ import pandas as pd +import numpy as np import talib -from typing import Dict, List, Tuple, Optional +from typing import Dict, List, Tuple, Optional, NamedTuple import logging from dataclasses import dataclass +from datetime import datetime, timedelta # 配置日志输出到控制台 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ - logging.StreamHandler(), # 输出到控制台 - logging.FileHandler('technical_analysis.log') # 输出到文件 + logging.StreamHandler(), + logging.FileHandler('technical_analysis.log') ], - force=True # 强制重新配置日志 + force=True ) @dataclass class CoinSignal: symbol: str - score: float + qualified_factors: int # 替换score为符合因子数量 reason: str entry_price: float stop_loss: float take_profit: float timeframe: str confidence: str - strategy_type: str # 短线/中线/长线 - holding_period: int # 预期持仓周期(天) - risk_reward_ratio: float # 风险回报比 - expiry_hours: int # 选币信号有效期(小时) - action_suggestion: str # 操作建议 - signal_type: str = "LONG" # 新增:信号类型 LONG/SHORT - direction: str = "BUY" # 新增:操作方向 BUY/SELL - -@dataclass -class StrategyConfig: - name: str - primary_timeframe: str - confirm_timeframe: str - holding_period_days: int + strategy_type: str + holding_period: int + risk_reward_ratio: float expiry_hours: int - risk_reward_min: float - risk_reward_max: float - rsi_period: int - macd_fast: int - macd_slow: int - macd_signal: int + action_suggestion: str + signal_type: str = "LONG" + direction: str = "BUY" + +class PatternSignal(NamedTuple): + """技术形态信号""" + pattern_type: str + strength: float + direction: str # "BULLISH" or "BEARISH" + confidence: float + description: str + +class VolumeAnalysis(NamedTuple): + """量价分析结果""" + volume_trend: str # "INCREASING", "DECREASING", "STABLE" + price_volume_divergence: bool + volume_breakout: bool + volume_strength: float + description: str class TechnicalAnalyzer: def __init__(self): - self.min_score = 60 # 提高最低选币分数,注重质量 - self.max_selections = 15 # 适中的选币数量,注重质量而非数量 - - # 策略配置 - 重新设计时间周期 + self.logger = logging.getLogger(__name__) + # 策略配置 - 专注于短期突破机会 self.strategies = { - '超短线': StrategyConfig( - name='超短线', - primary_timeframe='5m', - confirm_timeframe='1m', - holding_period_days=1, - expiry_hours=24, - risk_reward_min=1.2, - risk_reward_max=1.8, - rsi_period=5, - macd_fast=3, - macd_slow=8, - macd_signal=2 - ), - '短线': StrategyConfig( - name='短线', - primary_timeframe='15m', - confirm_timeframe='5m', - holding_period_days=3, - expiry_hours=72, - risk_reward_min=1.5, - risk_reward_max=2.5, - rsi_period=7, - macd_fast=5, - macd_slow=13, - macd_signal=3 - ), - '中线': StrategyConfig( - name='中线', - primary_timeframe='1h', - confirm_timeframe='15m', - holding_period_days=7, - expiry_hours=168, # 7天 - risk_reward_min=2.0, - risk_reward_max=3.5, - rsi_period=14, - macd_fast=12, - macd_slow=26, - macd_signal=9 - ), - '波段': StrategyConfig( - name='波段', - primary_timeframe='4h', - confirm_timeframe='1h', - holding_period_days=15, - expiry_hours=360, # 15天 - risk_reward_min=2.5, - risk_reward_max=4.0, - rsi_period=14, - macd_fast=12, - macd_slow=26, - macd_signal=9 - ), - '长线': StrategyConfig( - name='长线', - primary_timeframe='1d', - confirm_timeframe='4h', - holding_period_days=30, - expiry_hours=720, # 30天 - risk_reward_min=3.0, - risk_reward_max=6.0, - rsi_period=21, - macd_fast=26, - macd_slow=50, - macd_signal=12 - ), - '趋势': StrategyConfig( - name='趋势', - primary_timeframe='1w', - confirm_timeframe='1d', - holding_period_days=60, - expiry_hours=1440, # 60天 - risk_reward_min=4.0, - risk_reward_max=8.0, - rsi_period=21, - macd_fast=26, - macd_slow=50, - macd_signal=12 - ) - } - def get_required_timeframes(self) -> Dict[str, List[str]]: - """获取每个策略需要的时间周期""" - return { - '超短线': ['1m', '5m', '15m'], # 需要更短周期确认 - '短线': ['5m', '15m', '1h'], # 短线需要分钟级别 - '中线': ['15m', '1h', '4h'], # 中线需要小时级别 - '波段': ['1h', '4h', '1d'], # 波段需要日内到日线 - '长线': ['4h', '1d', '3d'], # 长线需要日线级别 - '趋势': ['1d', '3d', '1w'] # 趋势需要周线级别 + "突破策略": { + "timeframes": ["15m", "1h", "4h"], + "holding_period": 1, # 1-3天 + "expiry_hours": 24, + "min_score": 75, + "risk_reward": 2.0 + }, + "量价策略": { + "timeframes": ["5m", "15m", "1h"], + "holding_period": 1, + "expiry_hours": 12, + "min_score": 70, + "risk_reward": 1.5 + }, + "形态策略": { + "timeframes": ["1h", "4h"], + "holding_period": 2, + "expiry_hours": 36, + "min_score": 80, + "risk_reward": 2.5 + } } - def get_optimal_timeframes_for_analysis(self, market_conditions: Dict = None) -> List[str]: - """根据市场条件动态选择最优时间周期组合""" - # 基础时间周期组合 - 包含所有策略需要的时间周期 - base_timeframes = ['1m', '5m', '15m', '1h', '4h', '1d', '3d', '1w'] - - # 如果有市场条件信息,可以进一步优化 - if market_conditions: - volatility = market_conditions.get('volatility', 'medium') - trend_strength = market_conditions.get('trend_strength', 'medium') - - if volatility == 'high': - # 高波动市场,增加短周期权重 - return ['1m', '5m', '15m', '1h', '4h', '1d'] - elif volatility == 'low': - # 低波动市场,增加长周期权重 - return ['1h', '4h', '1d', '3d', '1w'] - - return base_timeframes - - def determine_best_strategy(self, df: pd.DataFrame) -> str: - """根据币种特性确定最佳策略 - 增强版""" + def analyze_single_coin(self, symbol: str, timeframe_data: Dict[str, pd.DataFrame], + volume_24h_usd: float) -> List[CoinSignal]: + """分析单个币种 - 针对每个时间框架单独分析""" try: - if len(df) < 50: - logging.info("数据不足50条,使用默认中线策略") - return '中线' # 默认策略 + signals = [] - recent_data = df.tail(20) + # 基本过滤:交易量要求 + if volume_24h_usd < 5000000: # 500万USDT最低要求 + return signals - # 1. 计算波动率 - volatility = recent_data['close'].std() / recent_data['close'].mean() + # 针对每个时间框架单独进行分析 + target_timeframes = ["15m", "1h", "4h", "1d"] - # 2. 计算价格变化速度 - price_change_5d = (df.iloc[-1]['close'] - df.iloc[-5]['close']) / df.iloc[-5]['close'] - - # 3. 成交量特性 - volume_recent = recent_data['volume'].mean() - volume_avg = df['volume'].mean() - volume_ratio = volume_recent / volume_avg if volume_avg > 0 else 1 - - logging.info(f"策略选择指标: 波动率={volatility:.3f}, 5日价格变化={price_change_5d:.3f}, 成交量比率={volume_ratio:.2f}") - - # 添加详细的策略选择调试信息 - logging.info(f"策略选择条件判断:") - logging.info(f" 超短线: volatility({volatility:.3f}) > 0.20? {volatility > 0.20}, abs_change({abs(price_change_5d):.3f}) > 0.10? {abs(price_change_5d) > 0.10}") - logging.info(f" 短线: volatility({volatility:.3f}) > 0.10? {volatility > 0.10}, abs_change({abs(price_change_5d):.3f}) > 0.05? {abs(price_change_5d) > 0.05}") - logging.info(f" 中线: volatility({volatility:.3f}) > 0.06? {volatility > 0.06}, volume_ratio({volume_ratio:.2f}) > 1.1? {volume_ratio > 1.1}") - logging.info(f" 波段: 0.03 < volatility({volatility:.3f}) <= 0.08? {0.03 < volatility <= 0.08}") - - # 策略选择逻辑 - 修复条件,确保所有策略都能被选中 - if volatility > 0.20 and abs(price_change_5d) > 0.10: - # 极高波动,快速变化 → 超短线策略 - logging.info("选择超短线策略: 极高波动且快速变化") - return '超短线' - elif volatility > 0.08 and abs(price_change_5d) > 0.03: - # 高波动,较快变化 → 短线策略(进一步降低门槛) - logging.info("选择短线策略: 高波动且较快变化") - return '短线' - elif volatility > 0.06 and volume_ratio > 1.1: - # 中等波动,有资金关注 → 中线策略 - logging.info("选择中线策略: 中等波动且有资金关注") - return '中线' - elif volatility > 0.03 and volatility <= 0.08: - # 中低波动,适合波段操作 → 波段策略 - logging.info("选择波段策略: 中低波动适合波段操作") - return '波段' - elif volatility <= 0.04 and volume_ratio < 1.0: - # 低波动,适合长期持有 → 长线策略 - logging.info("选择长线策略: 低波动适合长期持有") - return '长线' - elif volatility <= 0.02: - # 极低波动,趋势跟踪 → 趋势策略 - logging.info("选择趋势策略: 极低波动适合趋势跟踪") - return '趋势' - else: - # 根据波动率范围选择策略 - if volatility <= 0.03: - logging.info("选择趋势策略: 基于波动率范围") - return '趋势' - elif volatility <= 0.06: - logging.info("选择长线策略: 基于波动率范围") - return '长线' - elif volatility <= 0.10: - logging.info("选择波段策略: 基于波动率范围") - return '波段' - else: - logging.info("选择中线策略: 默认选择") - return '中线' + for timeframe in target_timeframes: + if timeframe not in timeframe_data or len(timeframe_data[timeframe]) < 30: + continue + try: + # 为当前时间框架进行单独的因子分析 + factor_scores = self._analyze_single_timeframe_factors( + timeframe_data, timeframe, symbol + ) + + # 如果当前时间框架符合条件(6选5制度) + if factor_scores['qualified_factors'] >= 5: + signal = self._generate_signal_for_timeframe( + symbol, timeframe_data, factor_scores, volume_24h_usd, timeframe + ) + if signal: + signals.append(signal) + self.logger.info(f"{symbol} 在 {timeframe} 时间框架发现信号: {factor_scores['qualified_factors']}/6 因子") + + except Exception as e: + self.logger.error(f"分析{symbol}的{timeframe}时间框架时出错: {e}") + continue + + return signals + except Exception as e: - logging.warning(f"策略选择失败,使用默认中线策略: {e}") - return '中线' + self.logger.error(f"分析{symbol}时出错: {e}") + return [] - def calculate_technical_indicators(self, df: pd.DataFrame, strategy: str = '中线') -> pd.DataFrame: - """计算技术指标""" - if df.empty or len(df) < 50: - return df - - config = self.strategies[strategy] - - # 移动平均线 - df['ma20'] = talib.SMA(df['close'].values, timeperiod=20) - df['ma50'] = talib.SMA(df['close'].values, timeperiod=50) - df['ma200'] = talib.SMA(df['close'].values, timeperiod=200) - - # 根据策略使用不同参数的RSI - df['rsi'] = talib.RSI(df['close'].values, timeperiod=config.rsi_period) - - # 根据策略使用不同参数的MACD - df['macd'], df['macd_signal'], df['macd_hist'] = talib.MACD( - df['close'].values, - fastperiod=config.macd_fast, - slowperiod=config.macd_slow, - signalperiod=config.macd_signal - ) - - # 布林带 - df['bb_upper'], df['bb_middle'], df['bb_lower'] = talib.BBANDS( - df['close'].values, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0 - ) - - # 成交量移动平均 - df['volume_ma'] = talib.SMA(df['volume'].values, timeperiod=20) - - # ATR (平均真实波动范围) - df['atr'] = talib.ATR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14) - - return df - - def calculate_fibonacci_levels(self, df: pd.DataFrame, lookback=50) -> Dict[str, float]: - """计算斐波那契回调位""" - if len(df) < lookback: - return {} - - recent_data = df.tail(lookback) - high = recent_data['high'].max() - low = recent_data['low'].min() - diff = high - low - - return { - 'fib_0': high, - 'fib_236': high - 0.236 * diff, - 'fib_382': high - 0.382 * diff, - 'fib_500': high - 0.500 * diff, - 'fib_618': high - 0.618 * diff, - 'fib_786': high - 0.786 * diff, - 'fib_100': low + def _analyze_single_timeframe_factors(self, timeframe_data: Dict[str, pd.DataFrame], + target_timeframe: str, symbol: str) -> Dict: + """针对单个时间框架进行因子分析""" + factors = { + 'trend_alignment': False, # 趋势一致性 + 'technical_opportunity': False, # 技术指标机会 + 'momentum_confirmation': False, # 动量确认 + 'support_resistance': False, # 支撑阻力 + 'market_structure': False, # 市场结构 + 'liquidity_flow': False, # 流动性资金流 + 'signals': [], + 'direction': 'NEUTRAL', + 'primary_timeframe': target_timeframe, + 'qualified_factors': 0 } - - def find_support_resistance(self, df: pd.DataFrame, window=20) -> Tuple[float, float]: - """寻找支撑阻力位""" - if len(df) < window * 2: - return 0, 0 - - # 寻找局部高点和低点 - recent_data = df.tail(window) - - # 获取最近的支撑和阻力 - resistance = recent_data['high'].max() - support = recent_data['low'].min() - - return support, resistance - - def simple_long_analysis(self, df: pd.DataFrame) -> float: - """简化的多头分析 - 只看核心指标""" - if len(df) < 10: - return 0 - - score = 0 - current = df.iloc[-1] - recent_data = df.tail(5) - - # 1. 价格趋势 (30分) - 最简单直接 - price_changes = recent_data['close'].pct_change().dropna() - positive_days = (price_changes > 0).sum() - recent_return = (current['close'] - df.iloc[-5]['close']) / df.iloc[-5]['close'] - - if positive_days >= 3: # 5天中3天上涨 - score += 15 - if 0.02 <= recent_return <= 0.15: # 5日涨幅2%-15% - score += 15 - - # 2. 量能确认 (20分) - 简化量价分析 - if 'volume' in df.columns: - recent_volume = recent_data['volume'].mean() - prev_volume = df.iloc[-10:-5]['volume'].mean() if len(df) >= 10 else recent_volume - - if recent_volume > prev_volume * 1.2: # 量能放大 - score += 20 - - # 3. 相对位置 (20分) - 避免高位追高 - if len(df) >= 20: - high_20 = df.tail(20)['high'].max() - low_20 = df.tail(20)['low'].min() - position = (current['close'] - low_20) / (high_20 - low_20) if high_20 != low_20 else 0.5 - - if 0.2 <= position <= 0.7: # 中低位置 - score += 20 - elif position > 0.85: # 过高位置 - score -= 10 - - # 4. 动量指标 (10分) - 只看简单RSI - if 'rsi' in df.columns and not df['rsi'].isna().all(): - rsi = current.get('rsi', 50) - if 40 <= rsi <= 65: # RSI健康区间 - score += 10 - elif rsi > 75: # 过热 - score -= 5 - - return min(max(score, 0), 70) # 限制区间 - - def simple_short_analysis(self, df: pd.DataFrame) -> float: - """简化的空头分析 - 只看核心指标""" - if len(df) < 10: - return 0 - - score = 0 - current = df.iloc[-1] - recent_data = df.tail(5) - - # 1. 价格趋势 (30分) - 直接看下跌 - price_changes = recent_data['close'].pct_change().dropna() - negative_days = (price_changes < 0).sum() - recent_return = (current['close'] - df.iloc[-5]['close']) / df.iloc[-5]['close'] - - if negative_days >= 3: # 5天中3天下跌 - score += 15 - if -0.15 <= recent_return <= -0.02: # 5日跌幅2%-15% - score += 15 - - # 2. 量能确认 (20分) - 价跌量增 - if 'volume' in df.columns: - recent_volume = recent_data['volume'].mean() - prev_volume = df.iloc[-10:-5]['volume'].mean() if len(df) >= 10 else recent_volume - - if recent_volume > prev_volume * 1.2: # 量能放大 - score += 20 - - # 3. 相对位置 (20分) - 高位做空 - if len(df) >= 20: - high_20 = df.tail(20)['high'].max() - low_20 = df.tail(20)['low'].min() - position = (current['close'] - low_20) / (high_20 - low_20) if high_20 != low_20 else 0.5 - - if 0.6 <= position <= 0.95: # 高位区间 - score += 20 - elif position < 0.3: # 过低位置 - score -= 10 - - # 4. 动量指标 (10分) - 只看RSI - if 'rsi' in df.columns and not df['rsi'].isna().all(): - rsi = current.get('rsi', 50) - if 35 <= rsi <= 60: # RSI适中区间 - score += 10 - elif rsi < 25: # 过度超卖 - score -= 5 - - return min(max(score, 0), 70) # 限制区间 - - def _analyze_consecutive_moves(self, df: pd.DataFrame) -> float: - """分析连续上涨动量""" - if len(df) < 10: - return 0 - - score = 0 - recent_closes = df['close'].tail(10).values - - # 计算连续上涨天数 - consecutive_up = 0 - for i in range(1, len(recent_closes)): - if recent_closes[i] > recent_closes[i-1]: - consecutive_up += 1 - else: - break - - # 计算最近3日涨幅 - recent_gain = (recent_closes[-1] - recent_closes[-4]) / recent_closes[-4] if len(recent_closes) >= 4 else 0 - - # 连续上涨但涨幅适中得高分 - if consecutive_up >= 3 and 0.03 <= recent_gain <= 0.12: # 3%-12%涨幅 - score += 8 - elif consecutive_up >= 2 and 0.02 <= recent_gain <= 0.08: - score += 5 - elif consecutive_up >= 4: # 连续上涨但可能过热 - score += 3 - - # 价格加速上涨警告 - if recent_gain > 0.20: # 超过20%涨幅减分 - score -= 5 - - return max(score, 0) - - def _analyze_price_breakthrough(self, df: pd.DataFrame) -> float: - """分析价格突破情况""" - if len(df) < 50: - return 0 - - score = 0 - current_price = df.iloc[-1]['close'] - - # 1. 突破早期高点 (10分) - # 计算过去20-50日的高点 - historical_data = df.iloc[-50:-10] # 排除最近10天 - if len(historical_data) > 0: - resistance_levels = [] - # 寻找局部高点 - for i in range(5, len(historical_data)-5): - local_high = historical_data.iloc[i]['high'] - is_peak = True - for j in range(max(0, i-5), min(len(historical_data), i+6)): - if j != i and historical_data.iloc[j]['high'] > local_high: - is_peak = False - break - if is_peak: - resistance_levels.append(local_high) - - # 检查是否突破这些阻力位 - for resistance in resistance_levels: - if current_price > resistance * 1.01: # 突破且有一定幅度 - # 检查突破的时间和幅度 - breakthrough_ratio = (current_price - resistance) / resistance - if breakthrough_ratio <= 0.05: # 刚刚突破,没有过度追高 - score += 8 - elif breakthrough_ratio <= 0.10: - score += 5 - break - - # 2. 突破近期高点 (5分) - recent_high = df.tail(10)['high'].max() - if current_price > recent_high * 1.005: # 突破近期高点 - score += 5 - - return min(score, 15) - - def _analyze_wave_structure(self, df: pd.DataFrame) -> float: - """分析波动结构 - 寻找低点抬高模式""" - if len(df) < 30: - return 0 - - score = 0 - recent_data = df.tail(30) - - # 寻找近30天的低点 - lows = [] - highs = [] - - for i in range(3, len(recent_data)-3): - current_low = recent_data.iloc[i]['low'] - current_high = recent_data.iloc[i]['high'] - - # 判断是否为局部低点 - is_local_low = True - is_local_high = True - - for j in range(i-3, i+4): - if j != i: - if recent_data.iloc[j]['low'] < current_low: - is_local_low = False - if recent_data.iloc[j]['high'] > current_high: - is_local_high = False - - if is_local_low: - lows.append((i, current_low)) - if is_local_high: - highs.append((i, current_high)) - - # 分析低点抬高趋势 - if len(lows) >= 2: - # 检查最近两个低点是否抬高 - latest_lows = sorted(lows, key=lambda x: x[0])[-2:] - if latest_lows[1][1] > latest_lows[0][1]: # 低点抬高 - score += 5 - - # 分析高点突破趋势 - if len(highs) >= 2: - latest_highs = sorted(highs, key=lambda x: x[0])[-2:] - if latest_highs[1][1] > latest_highs[0][1]: # 高点突破 - score += 5 - - return min(score, 10) - - def analyze_volume_price_relationship(self, df: pd.DataFrame) -> float: - """分析量价关系 - 识别资金流入模式 (最高30分)""" - if len(df) < 20: - return 0 - - score = 0 - - # 1. 价涨量增模式分析 (15分) - price_volume_score = self._analyze_price_volume_trend(df) - score += price_volume_score - - # 2. 量能突破分析 (10分) - volume_breakthrough_score = self._analyze_volume_breakthrough(df) - score += volume_breakthrough_score - - # 3. 资金流入模式 (5分) - accumulation_score = self._analyze_accumulation_pattern(df) - score += accumulation_score - - return min(score, 30) - - def _analyze_price_volume_trend(self, df: pd.DataFrame) -> float: - """分析价涨量增模式""" - if len(df) < 15: - return 0 - - score = 0 - recent_data = df.tail(15) - - # 计算价格和成交量的相关性 - price_changes = recent_data['close'].pct_change().dropna() - volume_changes = recent_data['volume'].pct_change().dropna() - - if len(price_changes) >= 10 and len(volume_changes) >= 10: - # 只分析上涨日的量价关系 - up_days = price_changes > 0 - up_price_changes = price_changes[up_days] - up_volume_changes = volume_changes[up_days] - - if len(up_price_changes) >= 5: - # 计算上涨日的量价相关性 - correlation = up_price_changes.corr(up_volume_changes) - - if correlation > 0.5: # 强正相关 - score += 10 - elif correlation > 0.3: - score += 6 - elif correlation > 0.1: - score += 3 - - # 分析最近5天的量价配合 - recent_5 = df.tail(5) - up_days_recent = (recent_5['close'].diff() > 0).sum() - avg_volume_recent = recent_5['volume'].mean() - avg_volume_before = df.iloc[-20:-5]['volume'].mean() if len(df) >= 20 else avg_volume_recent - - volume_ratio = avg_volume_recent / avg_volume_before if avg_volume_before > 0 else 1 - - # 上涨日多且成交量放大 - if up_days_recent >= 3 and volume_ratio > 1.2: - score += 5 - - return min(score, 15) - - def _analyze_volume_breakthrough(self, df: pd.DataFrame) -> float: - """分析量能突破""" - if len(df) < 30: - return 0 - - score = 0 - current_volume = df.iloc[-1]['volume'] - avg_volume_30 = df.tail(30)['volume'].mean() - avg_volume_10 = df.tail(10)['volume'].mean() - - # 1. 近期量能突破 - volume_ratio_current = current_volume / avg_volume_30 if avg_volume_30 > 0 else 1 - volume_ratio_recent = avg_volume_10 / avg_volume_30 if avg_volume_30 > 0 else 1 - - if volume_ratio_current > 2.0: # 当日爆量 - # 检查是否伴随价格上涨 - price_change = (df.iloc[-1]['close'] - df.iloc[-2]['close']) / df.iloc[-2]['close'] - if price_change > 0.02: # 价涨量增 - score += 6 - elif price_change > 0: # 价涨但幅度不大 - score += 3 - - if 1.3 <= volume_ratio_recent <= 2.5: # 温和持续放量 - score += 4 - - return min(score, 10) - - def _analyze_accumulation_pattern(self, df: pd.DataFrame) -> float: - """分析资金积累模式""" - if len(df) < 20: - return 0 - - score = 0 - recent_data = df.tail(20) - - # 分析OBV(能量潮)趋势 - # 简化版OBV计算 - obv = [] - obv_value = 0 - - for i in range(len(recent_data)): - if i == 0: - obv.append(obv_value) - continue - - price_change = recent_data.iloc[i]['close'] - recent_data.iloc[i-1]['close'] - volume = recent_data.iloc[i]['volume'] - - if price_change > 0: - obv_value += volume - elif price_change < 0: - obv_value -= volume - - obv.append(obv_value) - - # 分析OBV趋势 - if len(obv) >= 10: - obv_recent = obv[-5:] # 最近5天 - obv_before = obv[-10:-5] # 之前5天 - - if sum(obv_recent) > sum(obv_before): # OBV上升 - score += 3 - - # 检查价格横盘但量能放大的情况(积累) - price_volatility = recent_data['close'].std() / recent_data['close'].mean() - volume_avg_recent = recent_data.tail(10)['volume'].mean() - volume_avg_before = recent_data.head(10)['volume'].mean() - - if (price_volatility < 0.08 and # 价格相对稳定 - volume_avg_recent > volume_avg_before * 1.2): # 但量能放大 - score += 2 - - return min(score, 5) - - def analyze_technical_patterns(self, df: pd.DataFrame) -> float: - """分析技术形态 - 综合K线形态和价格结构 (最高25分)""" - if len(df) < 20: - return 0 - - score = 0 - - # 1. K线形态识别 (15分) - candlestick_score = self._analyze_enhanced_candlestick_patterns(df) - score += candlestick_score - - # 2. 价格结构分析 (10分) - structure_score = self._analyze_price_structure(df) - score += structure_score - - return min(score, 25) - - def _analyze_enhanced_candlestick_patterns(self, df: pd.DataFrame) -> float: - """增强型K线形态分析""" - if len(df) < 10: - return 0 - - score = 0 - - # 使用talib的K线形态识别 - patterns = { - 'hammer': talib.CDLHAMMER(df['open'], df['high'], df['low'], df['close']), - 'doji': talib.CDLDOJI(df['open'], df['high'], df['low'], df['close']), - 'engulfing': talib.CDLENGULFING(df['open'], df['high'], df['low'], df['close']), - 'morning_star': talib.CDLMORNINGSTAR(df['open'], df['high'], df['low'], df['close']), - 'three_white_soldiers': talib.CDL3WHITESOLDIERS(df['open'], df['high'], df['low'], df['close']), - 'piercing': talib.CDLPIERCING(df['open'], df['high'], df['low'], df['close']), - 'harami_bullish': talib.CDLHARAMI(df['open'], df['high'], df['low'], df['close']) - } - - # 检查最近3天的K线形态 - recent_signals = 0 - for pattern_name, pattern_data in patterns.items(): - if len(pattern_data) > 3: - # 检查最近3天是否有信号 - for i in range(-3, 0): - if pattern_data.iloc[i] > 0: # 看涨信号 - recent_signals += 1 - if pattern_name in ['morning_star', 'three_white_soldiers', 'piercing']: - score += 4 # 强看涨形态 - elif pattern_name in ['hammer', 'engulfing']: - score += 3 # 中度看涨形态 - else: - score += 2 # 弱看涨形态 - break - - # 手动分析一些简单形态 - manual_score = self._analyze_manual_patterns(df.tail(10)) - score += manual_score - - return min(score, 15) - - def _analyze_manual_patterns(self, df: pd.DataFrame) -> float: - """手动分析一些简单形态""" - if len(df) < 5: - return 0 - - score = 0 - - # 分析最近5天的走势 - last_5 = df.tail(5) - - # 1. 阶段性低点上移模式 - lows = last_5['low'].tolist() - if len(lows) >= 3: - # 检查是否有低点逐步抬高的趋势 - ascending_lows = True - for i in range(1, len(lows)): - if lows[i] < lows[i-1] * 0.98: # 允许小幅波动 - ascending_lows = False - break - if ascending_lows: - score += 3 - - # 2. 突破前高点模式 - current_high = df.iloc[-1]['high'] - prev_highs = df.iloc[-5:-1]['high'].tolist() - if current_high > max(prev_highs) * 1.01: # 突破前高 - score += 2 - - return min(score, 5) - - def _analyze_price_structure(self, df: pd.DataFrame) -> float: - """分析价格结构""" - if len(df) < 30: - return 0 - - score = 0 - - # 1. 分析关键支撑位突破 (6分) - support_score = self._analyze_support_breakthrough(df) - score += support_score - - # 2. 分析价格区间突破 (4分) - range_score = self._analyze_range_breakthrough(df) - score += range_score - - return min(score, 10) - - def _analyze_support_breakthrough(self, df: pd.DataFrame) -> float: - """分析支撑位突破""" - if len(df) < 30: - return 0 - - score = 0 - current_price = df.iloc[-1]['close'] - - # 寻找近30天内的重要支撑位 - recent_30 = df.tail(30) - - # 计算支撑位(多次触及的低点) - support_levels = [] - for i in range(5, len(recent_30)-5): - low_price = recent_30.iloc[i]['low'] - # 检查这个低点是否被多次测试 - test_count = 0 - for j in range(len(recent_30)): - if abs(recent_30.iloc[j]['low'] - low_price) / low_price < 0.02: # 2%范围内 - test_count += 1 - - if test_count >= 2: # 至少被测试2次 - support_levels.append(low_price) - - # 检查是否突破了这些支撑位 - for support in support_levels: - if current_price > support * 1.03: # 突破支撑位3%以上 - score += 3 - break - - return min(score, 6) - - def _analyze_range_breakthrough(self, df: pd.DataFrame) -> float: - """分析区间突破""" - if len(df) < 20: - return 0 - - score = 0 - recent_20 = df.tail(20) - current_price = df.iloc[-1]['close'] - - # 计算近20天的价格区间 - price_high = recent_20['high'].max() - price_low = recent_20['low'].min() - price_range = price_high - price_low - - # 检查是否在盘整后突破 - if price_range > 0: - # 检查是否经历了盘整阶段 - middle_period = recent_20.iloc[5:15] # 中间一段 - consolidation_range = middle_period['high'].max() - middle_period['low'].min() - - # 如果中间阶段波动较小,说明有盘整 - if consolidation_range < price_range * 0.6: - # 检查是否突破了区间高点 - if current_price > price_high * 0.995: # 接近或突破高点 - score += 4 - - return min(score, 4) - - def analyze_price_behavior(self, df: pd.DataFrame) -> float: - """分析价格行为 - 核心指标 (最高35分)""" - if len(df) < 30: - return 0 - - score = 0 - recent_data = df.tail(20) - - # 1. 连续上涨动量分析 (12分) - consecutive_score = self._analyze_consecutive_moves(df) - score += consecutive_score - - # 2. 价格突破分析 (15分) - breakthrough_score = self._analyze_price_breakthrough(df) - score += breakthrough_score - - # 3. 波动结构分析 (8分) - wave_score = self._analyze_wave_structure(df) - score += wave_score - - return min(score, 35) - - def analyze_breakout_potential(self, df: pd.DataFrame) -> float: - """分析突破潜力 - 辅助指标 (最高20分)""" - if len(df) < 30: - return 0 - - score = 0 - current = df.iloc[-1] - recent_data = df.tail(20) - - # 1. 价格位置分析 - 寻找埋伏位置 (8分) - recent_high = recent_data['high'].max() - recent_low = recent_data['low'].min() - price_range = recent_high - recent_low - current_position = (current['close'] - recent_low) / price_range if price_range > 0 else 0 - - # 偏好在区间中下部的币种 - if 0.2 <= current_position <= 0.5: # 理想埋伏位置 - score += 6 - elif 0.5 < current_position <= 0.7: # 次优位置 - score += 3 - elif current_position > 0.8: # 已经在高位,减分 - score -= 5 - - # 2. 整理形态分析 (6分) - volatility = recent_data['close'].std() / recent_data['close'].mean() - if 0.05 <= volatility <= 0.12: # 适度整理,蓄势待发 - score += 5 - elif volatility < 0.03: # 过度整理,缺乏动力 - score += 1 - elif volatility > 0.20: # 波动过大,风险较高 - score -= 3 - - # 3. 量能配合 (6分) - volume_ratio = recent_data['volume'] / recent_data['volume'].rolling(10).mean() - moderate_volume_days = ((volume_ratio >= 1.1) & (volume_ratio <= 1.8)).sum() - explosive_volume_days = (volume_ratio > 2.5).sum() - - if moderate_volume_days >= 3: # 持续温和放量 - score += 4 - if explosive_volume_days <= 1: # 没有过度爆量 - score += 2 - - return max(min(score, 20), 0) - - def analyze_short_signals(self, df: pd.DataFrame, strategy: str = '中线') -> Optional[CoinSignal]: - """分析做空信号 - 识别下跌趋势币种""" - if df.empty or len(df) < 50: - return None try: - df = self.calculate_technical_indicators(df, strategy) - config = self.strategies[strategy] + # 使用目标时间框架的数据进行分析 + df = timeframe_data[target_timeframe] - total_score = 0 - reasons = [] + # 1. 趋势一致性判断(基于当前时间框架) + trend_result = self._check_single_timeframe_trend(df, target_timeframe) + factors['trend_alignment'] = trend_result['qualified'] + factors['signals'].extend(trend_result['signals']) + primary_direction = trend_result['direction'] - # === 做空信号评分体系 === + # 2. 技术指标机会判断 + technical_result = self._check_single_timeframe_technical(df, target_timeframe) + factors['technical_opportunity'] = technical_result['qualified'] + factors['signals'].extend(technical_result['signals']) - # 1. 空头价格行为分析 (35分) - bear_price_score = self.analyze_bearish_price_behavior(df) - if bear_price_score > 0: - total_score += bear_price_score - if bear_price_score >= 25: - reasons.append(f"强势下跌信号({bear_price_score}分)") - elif bear_price_score >= 15: - reasons.append(f"下跌信号({bear_price_score}分)") + # 3. 动量确认判断 + momentum_result = self._check_single_timeframe_momentum(df, target_timeframe) + factors['momentum_confirmation'] = momentum_result['qualified'] + factors['signals'].extend(momentum_result['signals']) - # 2. 空头量价关系 (30分) - bear_volume_score = self.analyze_bearish_volume_price(df) - if bear_volume_score > 0: - total_score += bear_volume_score - if bear_volume_score >= 20: - reasons.append(f"理想空头量价({bear_volume_score}分)") - elif bear_volume_score >= 10: - reasons.append(f"空头量价({bear_volume_score}分)") + # 4. 支撑阻力判断 + sr_result = self._check_single_timeframe_support_resistance(df, target_timeframe) + factors['support_resistance'] = sr_result['qualified'] + factors['signals'].extend(sr_result['signals']) - # 3. 空头技术形态 (25分) - bear_pattern_score = self.analyze_bearish_patterns(df) - if bear_pattern_score > 0: - total_score += bear_pattern_score - if bear_pattern_score >= 15: - reasons.append(f"强空头形态({bear_pattern_score}分)") - elif bear_pattern_score >= 8: - reasons.append(f"空头形态({bear_pattern_score}分)") + # 5. 市场结构判断 + market_result = self._check_single_timeframe_market_structure(df, target_timeframe) + factors['market_structure'] = market_result['qualified'] + factors['signals'].extend(market_result['signals']) - # 4. 下跌突破潜力 (20分) - breakdown_score = self.analyze_breakdown_potential(df) - if breakdown_score > 0: - total_score += breakdown_score - if breakdown_score >= 15: - reasons.append(f"高下跌潜力({breakdown_score}分)") - elif breakdown_score >= 8: - reasons.append(f"下跌潜力({breakdown_score}分)") + # 6. 流动性资金流判断 + liquidity_result = self._check_single_timeframe_liquidity(df, target_timeframe) + factors['liquidity_flow'] = liquidity_result['qualified'] + factors['signals'].extend(liquidity_result['signals']) - # 5. 均线空头系统 (10分) - bear_ma_score = self.analyze_bearish_moving_average(df) - if bear_ma_score > 0: - total_score += bear_ma_score - if bear_ma_score >= 6: - reasons.append(f"均线空头({int(bear_ma_score)}分)") + # 统计符合的因子数量 + qualified_factors = sum([ + factors['trend_alignment'], + factors['technical_opportunity'], + factors['momentum_confirmation'], + factors['support_resistance'], + factors['market_structure'], + factors['liquidity_flow'] + ]) + factors['qualified_factors'] = qualified_factors - # 过滤低分币种 - if total_score < 65: # 做空也需要足够的信号强度 + # 确定信号方向(基于多数投票) + if qualified_factors >= 5: + bullish_signals = [s for s in factors['signals'] if '(BULLISH)' in s] + bearish_signals = [s for s in factors['signals'] if '(BEARISH)' in s] + + if len(bullish_signals) > len(bearish_signals): + factors['direction'] = 'BULLISH' + elif len(bearish_signals) > len(bullish_signals): + factors['direction'] = 'BEARISH' + elif len(bullish_signals) == len(bearish_signals) and len(bullish_signals) > 0: + # 多空信号相等时,使用主趋势方向,如果主趋势也是中性则保持中性 + factors['direction'] = primary_direction if primary_direction != 'NEUTRAL' else 'NEUTRAL' + else: + # 没有明确信号时,保持中性 + factors['direction'] = 'NEUTRAL' + else: + factors['direction'] = 'NEUTRAL' + + # 记录分析结果 + status = "✅通过" if qualified_factors >= 5 else "❌未通过" + bullish_count = len([s for s in factors['signals'] if '(BULLISH)' in s]) + bearish_count = len([s for s in factors['signals'] if '(BEARISH)' in s]) + neutral_count = len([s for s in factors['signals'] if '(NEUTRAL)' in s]) + + self.logger.info(f"{symbol}[{target_timeframe}] 因子分析 - 趋势:{factors['trend_alignment']}, " + f"技术:{factors['technical_opportunity']}, " + f"动量:{factors['momentum_confirmation']}, " + f"支撑阻力:{factors['support_resistance']}, " + f"市场结构:{factors['market_structure']}, " + f"流动性:{factors['liquidity_flow']}, " + f"符合数:{qualified_factors}/6, 方向:{factors['direction']}, " + f"信号分布(多:{bullish_count}/空:{bearish_count}/中:{neutral_count}), {status}") + + except Exception as e: + self.logger.error(f"分析{symbol}[{target_timeframe}]因子时出错: {e}") + factors['qualified_factors'] = 2 + factors['direction'] = 'BULLISH' + factors['signals'] = [f'{target_timeframe}基础技术分析(BULLISH)'] + + return factors + + def _check_single_timeframe_trend(self, df: pd.DataFrame, timeframe: str) -> Dict: + """单时间框架趋势分析 - 使用MA5, MA50, MA200""" + result = {'qualified': False, 'signals': [], 'direction': 'NEUTRAL'} + + try: + if len(df) < 200: # 需要足够数据计算MA200 + return result + + close = df['close'].values + current_price = close[-1] + + # 计算三条关键均线 + ma5 = talib.SMA(close, timeperiod=5) + ma50 = talib.SMA(close, timeperiod=50) + ma200 = talib.SMA(close, timeperiod=200) + + # 检查均线是否有效 + if (ma5 is None or ma50 is None or ma200 is None or + len(ma5) == 0 or len(ma50) == 0 or len(ma200) == 0): + return result + + # 检查是否有NaN值 + if (np.isnan(ma5[-1]) or np.isnan(ma50[-1]) or np.isnan(ma200[-1])): + return result + + bullish_conditions = 0 + bearish_conditions = 0 + + # 条件1: 价格相对MA5位置 + if current_price > ma5[-1]: + bullish_conditions += 1 + result['signals'].append(f"{timeframe}价格在MA5上方(BULLISH)") + elif current_price < ma5[-1]: + bearish_conditions += 1 + result['signals'].append(f"{timeframe}价格在MA5下方(BEARISH)") + + # 条件2: 多头排列 MA5 > MA50 > MA200 + if ma5[-1] > ma50[-1] > ma200[-1]: + bullish_conditions += 2 # 多头排列给更高权重 + result['signals'].append(f"{timeframe}多头均线排列MA5>MA50>MA200(BULLISH)") + elif ma5[-1] < ma50[-1] < ma200[-1]: + bearish_conditions += 2 # 空头排列给更高权重 + result['signals'].append(f"{timeframe}空头均线排列MA5 ma50[-1]: + bullish_conditions += 1 + result['signals'].append(f"{timeframe}短期均线MA5在中期均线MA50上方(BULLISH)") + elif ma5[-1] < ma50[-1]: + bearish_conditions += 1 + result['signals'].append(f"{timeframe}短期均线MA5在中期均线MA50下方(BEARISH)") + + # 条件4: MA50相对MA200位置 - 中长期趋势 + if ma50[-1] > ma200[-1]: + bullish_conditions += 1 + result['signals'].append(f"{timeframe}中期均线MA50在长期均线MA200上方(BULLISH)") + elif ma50[-1] < ma200[-1]: + bearish_conditions += 1 + result['signals'].append(f"{timeframe}中期均线MA50在长期均线MA200下方(BEARISH)") + + # 条件5: MA5斜率 - 短期趋势方向 + if len(ma5) >= 5 and not np.isnan(ma5[-5]): + ma5_slope = (ma5[-1] - ma5[-5]) / ma5[-5] * 100 + if ma5_slope > 0.3: + bullish_conditions += 1 + result['signals'].append(f"{timeframe}MA5向上倾斜(BULLISH)") + elif ma5_slope < -0.3: + bearish_conditions += 1 + result['signals'].append(f"{timeframe}MA5向下倾斜(BEARISH)") + + # 条件6: MA50斜率 - 中期趋势方向 + if len(ma50) >= 10 and not np.isnan(ma50[-10]): + ma50_slope = (ma50[-1] - ma50[-10]) / ma50[-10] * 100 + if ma50_slope > 0.5: + bullish_conditions += 1 + result['signals'].append(f"{timeframe}MA50中期向上趋势(BULLISH)") + elif ma50_slope < -0.5: + bearish_conditions += 1 + result['signals'].append(f"{timeframe}MA50中期向下趋势(BEARISH)") + + # 判断是否符合趋势一致性标准 (至少3个条件符合) + if bullish_conditions >= 3: + result['qualified'] = True + result['direction'] = 'BULLISH' + elif bearish_conditions >= 3: + result['qualified'] = True + result['direction'] = 'BEARISH' + else: + # 如果多空条件接近,选择主导方向 + if bullish_conditions > bearish_conditions: + result['qualified'] = True + result['direction'] = 'BULLISH' + result['signals'].append(f"{timeframe}偏多头趋势(BULLISH)") + elif bearish_conditions > bullish_conditions: + result['qualified'] = True + result['direction'] = 'BEARISH' + result['signals'].append(f"{timeframe}偏空头趋势(BEARISH)") + + except Exception as e: + self.logger.error(f"{timeframe}趋势分析出错: {e}") + + return result + + def _check_single_timeframe_technical(self, df: pd.DataFrame, timeframe: str) -> Dict: + """单时间框架技术指标分析 - 至少需要2个技术机会""" + result = {'qualified': False, 'signals': []} + + try: + if len(df) < 14: + return result + + close = df['close'].values + current_price = close[-1] + opportunities_count = 0 + + # 1. RSI超卖超买机会 + rsi = talib.RSI(close, timeperiod=14) + if rsi is not None and len(rsi) > 0 and not np.isnan(rsi[-1]): + current_rsi = rsi[-1] + + if current_rsi < 40: + opportunities_count += 1 + result['signals'].append(f"{timeframe}RSI超卖反弹机会({current_rsi:.1f})(BULLISH)") + elif current_rsi > 60: + opportunities_count += 1 + result['signals'].append(f"{timeframe}RSI超买回调机会({current_rsi:.1f})(BEARISH)") + + # 2. 布林带机会 + bb_upper, bb_middle, bb_lower = talib.BBANDS(close, timeperiod=20) + if (bb_lower is not None and bb_upper is not None and + len(bb_lower) > 0 and len(bb_upper) > 0 and + not np.isnan(bb_lower[-1]) and not np.isnan(bb_upper[-1])): + + if current_price <= bb_lower[-1] * 1.01: + opportunities_count += 1 + result['signals'].append(f"{timeframe}布林下轨反弹机会(BULLISH)") + elif current_price >= bb_upper[-1] * 0.99: + opportunities_count += 1 + result['signals'].append(f"{timeframe}布林上轨突破机会(BULLISH)") + elif current_price > bb_upper[-1] * 1.02: + opportunities_count += 1 + result['signals'].append(f"{timeframe}布林上轨超买回调机会(BEARISH)") + + # 3. MACD信号 + macd, signal, _ = talib.MACD(close) + if (macd is not None and signal is not None and + len(macd) >= 3 and len(signal) >= 3 and + not np.isnan(macd[-1]) and not np.isnan(macd[-2]) and + not np.isnan(signal[-1]) and not np.isnan(signal[-2])): + + if macd[-1] > signal[-1] and macd[-2] <= signal[-2]: + opportunities_count += 1 + result['signals'].append(f"{timeframe}MACD金叉信号(BULLISH)") + elif macd[-1] < signal[-1] and macd[-2] >= signal[-2]: + opportunities_count += 1 + result['signals'].append(f"{timeframe}MACD死叉信号(BEARISH)") + + # 4. KDJ指标 + high = df['high'].values + low = df['low'].values + if len(high) >= 14 and len(low) >= 14: + k, d = talib.STOCH(high, low, close, fastk_period=14, slowk_period=3, slowd_period=3) + if (k is not None and d is not None and len(k) >= 3 and len(d) >= 3 and + not np.isnan(k[-1]) and not np.isnan(d[-1]) and + not np.isnan(k[-2]) and not np.isnan(d[-2])): + + current_k = k[-1] + current_d = d[-1] + + # KDJ超卖 + if current_k < 20 and current_d < 20: + opportunities_count += 1 + result['signals'].append(f"{timeframe}KDJ超卖反弹机会(BULLISH)") + # KDJ超买 + elif current_k > 80 and current_d > 80: + opportunities_count += 1 + result['signals'].append(f"{timeframe}KDJ超买回调机会(BEARISH)") + # KDJ金叉死叉 + elif k[-1] > d[-1] and k[-2] <= d[-2]: + opportunities_count += 1 + result['signals'].append(f"{timeframe}KDJ金叉信号(BULLISH)") + elif k[-1] < d[-1] and k[-2] >= d[-2]: + opportunities_count += 1 + result['signals'].append(f"{timeframe}KDJ死叉信号(BEARISH)") + + # 5. 威廉指标 %R + williams_r = talib.WILLR(high, low, close, timeperiod=14) + if williams_r is not None and len(williams_r) > 0 and not np.isnan(williams_r[-1]): + current_wr = williams_r[-1] + + if current_wr < -80: # 超卖 + opportunities_count += 1 + result['signals'].append(f"{timeframe}威廉指标超卖反弹机会({current_wr:.1f})(BULLISH)") + elif current_wr > -20: # 超买 + opportunities_count += 1 + result['signals'].append(f"{timeframe}威廉指标超买回调机会({current_wr:.1f})(BEARISH)") + + # 6. CCI商品通道指标 + cci = talib.CCI(high, low, close, timeperiod=14) + if cci is not None and len(cci) > 0 and not np.isnan(cci[-1]): + current_cci = cci[-1] + + if current_cci < -100: # 超卖 + opportunities_count += 1 + result['signals'].append(f"{timeframe}CCI超卖反弹机会({current_cci:.0f})(BULLISH)") + elif current_cci > 100: # 超买 + opportunities_count += 1 + result['signals'].append(f"{timeframe}CCI超买回调机会({current_cci:.0f})(BEARISH)") + + # 至少需要2个技术机会才算通过 + result['qualified'] = opportunities_count >= 2 + + except Exception as e: + self.logger.error(f"{timeframe}技术指标分析出错: {e}") + + return result + + def _check_single_timeframe_momentum(self, df: pd.DataFrame, timeframe: str) -> Dict: + """单时间框架动量分析""" + result = {'qualified': False, 'signals': []} + + try: + if len(df) < 10: + return result + + close = df['close'].values + momentum_signals = 0 + + # 1. 价格动量检查 + if len(close) >= 5: + momentum_5 = (close[-1] - close[-5]) / close[-5] * 100 + if abs(momentum_5) > 0.5: + momentum_signals += 1 + if momentum_5 > 0: + result['signals'].append(f"{timeframe}正向动量({momentum_5:.1f}%)(BULLISH)") + else: + result['signals'].append(f"{timeframe}负向动量({momentum_5:.1f}%)(BEARISH)") + + # 2. 成交量确认 - 区分上涨和下跌 + if 'volume' in df.columns and len(df) >= 5: + volumes = df['volume'].values + vol_avg = np.mean(volumes[-5:]) + current_vol = volumes[-1] + + if current_vol > vol_avg * 1.3: + momentum_signals += 1 + # 判断是放量上涨还是放量下跌 + if len(close) >= 2: + price_change = (close[-1] - close[-2]) / close[-2] * 100 + if price_change > 0: + result['signals'].append(f"{timeframe}放量上涨确认(BULLISH)") + else: + result['signals'].append(f"{timeframe}放量下跌确认(BEARISH)") + else: + result['signals'].append(f"{timeframe}成交量放大确认(NEUTRAL)") + + # 至少1个动量信号就符合(放宽条件) + result['qualified'] = momentum_signals >= 1 + + if not result['qualified'] and momentum_signals >= 0: + result['qualified'] = True + result['signals'].append(f"{timeframe}基础动量确认(BULLISH)") + + except Exception as e: + self.logger.error(f"{timeframe}动量分析出错: {e}") + + return result + + def _check_single_timeframe_market_structure(self, df: pd.DataFrame, timeframe: str) -> Dict: + """单时间框架市场结构分析""" + try: + signals = [] + qualified_conditions = 0 + + if len(df) < 20: + return {'qualified': False, 'signals': []} + + current_price = df['close'].iloc[-1] + + # 1. 成交量趋势分析 + volume_ma_5 = df['volume'].rolling(5).mean().iloc[-1] + volume_ma_20 = df['volume'].rolling(20).mean().iloc[-1] + volume_trend_strength = (volume_ma_5 / volume_ma_20) if volume_ma_20 > 0 else 1 + + if volume_trend_strength > 1.3: + signals.append(f"{timeframe}成交量趋势向上(BULLISH)") + qualified_conditions += 1 + elif volume_trend_strength < 0.7: + signals.append(f"{timeframe}成交量萎缩待变盘(NEUTRAL)") + qualified_conditions += 0.5 + + # 2. 价格结构分析 + highs = df['high'].tail(10).values + lows = df['low'].tail(10).values + + recent_highs = highs[-5:] + high_trend = np.polyfit(range(len(recent_highs)), recent_highs, 1)[0] + + recent_lows = lows[-5:] + low_trend = np.polyfit(range(len(recent_lows)), recent_lows, 1)[0] + + if high_trend > 0 and low_trend > 0: + signals.append(f"{timeframe}高低点同步抬升(BULLISH)") + qualified_conditions += 1 + elif high_trend < 0 and low_trend < 0: + signals.append(f"{timeframe}高低点同步下降(BEARISH)") + qualified_conditions += 1 + elif abs(high_trend) < current_price * 0.001 and abs(low_trend) < current_price * 0.001: + signals.append(f"{timeframe}横盘整理待突破(NEUTRAL)") + qualified_conditions += 0.5 + + # 3. 波动率分析 + try: + atr_values = talib.ATR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14) + if atr_values is not None and len(atr_values) > 0 and not np.isnan(atr_values[-1]): + atr_current = atr_values[-1] + atr_recent = atr_values[-20:] if len(atr_values) >= 20 else atr_values + atr_recent_clean = atr_recent[~np.isnan(atr_recent)] + if len(atr_recent_clean) > 0: + atr_ma = np.mean(atr_recent_clean) + else: + atr_ma = atr_current + else: + # 如果ATR计算失败,跳过波动率分析 + atr_current = None + atr_ma = None + except Exception: + atr_current = None + atr_ma = None + + if atr_current is not None and atr_ma is not None and atr_ma > 0: + volatility_ratio = atr_current / atr_ma + + if volatility_ratio < 0.7: + signals.append(f"{timeframe}波动率收缩突破在即(BULLISH)") + qualified_conditions += 1 + elif volatility_ratio > 1.5: + signals.append(f"{timeframe}波动率扩张趋势启动(BULLISH)") + qualified_conditions += 1 + + qualified = qualified_conditions >= 2.0 + + if not signals: + signals.append(f"{timeframe}市场结构分析无明显信号(NEUTRAL)") + + return { + 'qualified': qualified, + 'signals': signals[:3] + } + + except Exception as e: + self.logger.error(f"{timeframe}市场结构分析出错: {e}") + return {'qualified': False, 'signals': [f"{timeframe}市场结构分析异常(NEUTRAL)"]} + + def _check_single_timeframe_support_resistance(self, df: pd.DataFrame, timeframe: str) -> Dict: + """单时间框架支撑阻力分析""" + result = {'qualified': False, 'signals': []} + + try: + if len(df) < 20: + return result + + high_prices = df['high'].values + low_prices = df['low'].values + close_prices = df['close'].values + current_price = close_prices[-1] + + technical_opportunity = False + + # 1. 近期高低点机会 + recent_high_10 = max(high_prices[-10:]) + recent_low_10 = min(low_prices[-10:]) + recent_high_5 = max(high_prices[-5:]) + recent_low_5 = min(low_prices[-5:]) + + # 接近近期低点 - 反弹机会 + if current_price <= recent_low_5 * 1.02: + technical_opportunity = True + result['signals'].append(f"{timeframe}接近近期低点反弹机会(BULLISH)") + elif current_price <= recent_low_10 * 1.03: + technical_opportunity = True + result['signals'].append(f"{timeframe}接近10日低点反弹机会(BULLISH)") + + # 突破近期高点 - 突破机会 + elif current_price >= recent_high_5 * 0.99: + technical_opportunity = True + result['signals'].append(f"{timeframe}突破近期高点(BULLISH)") + elif current_price >= recent_high_10 * 0.98: + technical_opportunity = True + result['signals'].append(f"{timeframe}接近10日高点突破(BULLISH)") + + # 跌破近期低点 - 继续下跌机会 + elif current_price <= recent_low_5 * 0.98: + technical_opportunity = True + result['signals'].append(f"{timeframe}跌破近期低点继续下跌(BEARISH)") + elif current_price <= recent_low_10 * 0.97: + technical_opportunity = True + result['signals'].append(f"{timeframe}跌破10日低点继续下跌(BEARISH)") + + # 接近近期高点 - 回调机会 + elif current_price >= recent_high_5 * 0.97: + technical_opportunity = True + result['signals'].append(f"{timeframe}接近近期高点回调机会(BEARISH)") + elif current_price >= recent_high_10 * 0.96: + technical_opportunity = True + result['signals'].append(f"{timeframe}接近10日高点回调机会(BEARISH)") + + # 2. 移动平均线支撑阻力 + if len(close_prices) >= 20: + ma5 = talib.SMA(close_prices, timeperiod=5) + ma20 = talib.SMA(close_prices, timeperiod=20) + + # MA支撑确认 - 添加None检查 + if (ma5 is not None and len(ma5) > 0 and not np.isnan(ma5[-1]) and + current_price > ma5[-1] * 0.995): + technical_opportunity = True + result['signals'].append(f"{timeframe}MA5支撑确认(BULLISH)") + + # MA20关键位 - 添加None检查 + if (ma20 is not None and len(ma20) > 0 and not np.isnan(ma20[-1]) and + abs(current_price - ma20[-1]) / ma20[-1] < 0.01): # 在MA20附近1% + technical_opportunity = True + result['signals'].append(f"{timeframe}MA20关键位(BULLISH)") + + # 3. 价格位置分析 + price_range = recent_high_10 - recent_low_10 + if price_range > 0: + position = (current_price - recent_low_10) / price_range + + # 在区间下部 - 反弹机会 + if position < 0.3: + technical_opportunity = True + result['signals'].append(f"{timeframe}区间底部反弹机会(BULLISH)") + + # 在区间上部 - 突破机会 + elif position > 0.7: + technical_opportunity = True + result['signals'].append(f"{timeframe}区间顶部突破机会(BULLISH)") + + result['qualified'] = technical_opportunity + + except Exception as e: + self.logger.error(f"{timeframe}支撑阻力判断出错: {e}") + + return result + + def _check_single_timeframe_liquidity(self, df: pd.DataFrame, timeframe: str) -> Dict: + """单时间框架流动性分析""" + try: + signals = [] + qualified_conditions = 0 + + if len(df) < 20: + return {'qualified': False, 'signals': []} + + current_volume = df['volume'].iloc[-1] + current_price = df['close'].iloc[-1] + + # 1. 成交量异常检测 + volume_ma_10 = df['volume'].rolling(10).mean().iloc[-1] + volume_ma_20 = df['volume'].rolling(20).mean().iloc[-1] + + volume_surge_ratio = current_volume / volume_ma_20 if volume_ma_20 > 0 else 1 + + if volume_surge_ratio > 3: + signals.append(f"{timeframe}成交量激增{volume_surge_ratio:.1f}倍(BULLISH)") + qualified_conditions += 1 + elif volume_surge_ratio > 2: + signals.append(f"{timeframe}成交量显著放大{volume_surge_ratio:.1f}倍(BULLISH)") + qualified_conditions += 0.5 + elif volume_surge_ratio < 0.5: + signals.append(f"{timeframe}成交量严重萎缩观望(NEUTRAL)") + qualified_conditions += 0 + + # 2. 资金流向分析 + money_flow_positive = 0 + money_flow_negative = 0 + + for i in range(-5, 0): + typical_price = (df['high'].iloc[i] + df['low'].iloc[i] + df['close'].iloc[i]) / 3 + prev_typical_price = (df['high'].iloc[i-1] + df['low'].iloc[i-1] + df['close'].iloc[i-1]) / 3 + raw_money_flow = typical_price * df['volume'].iloc[i] + + if typical_price > prev_typical_price: + money_flow_positive += raw_money_flow + else: + money_flow_negative += raw_money_flow + + money_flow_index = money_flow_positive / (money_flow_positive + money_flow_negative) if (money_flow_positive + money_flow_negative) > 0 else 0.5 + + if money_flow_index > 0.65: + signals.append(f"{timeframe}资金持续净流入(BULLISH)") + qualified_conditions += 1 + elif money_flow_index < 0.35: + signals.append(f"{timeframe}资金持续净流出(BEARISH)") + qualified_conditions += 1 + + qualified = qualified_conditions >= 1.5 + + if not signals: + signals.append(f"{timeframe}流动性分析无明显信号(NEUTRAL)") + + return { + 'qualified': qualified, + 'signals': signals[:3] + } + + except Exception as e: + self.logger.error(f"{timeframe}流动性分析出错: {e}") + return {'qualified': False, 'signals': [f"{timeframe}流动性分析异常(NEUTRAL)"]} + + def _generate_signal_for_timeframe(self, symbol: str, timeframe_data: Dict[str, pd.DataFrame], + factor_scores: Dict, volume_24h_usd: float, target_timeframe: str) -> Optional[CoinSignal]: + """为指定时间框架生成交易信号""" + try: + df = timeframe_data[target_timeframe] + current_price = df['close'].iloc[-1] + + # 计算ATR,添加错误处理 + try: + atr_values = talib.ATR(df['high'].values, df['low'].values, df['close'].values, timeperiod=14) + if atr_values is not None and len(atr_values) > 0 and not np.isnan(atr_values[-1]): + atr = atr_values[-1] + else: + # 如果ATR计算失败,使用价格的简单波动率 + price_range = df['high'].iloc[-10:].max() - df['low'].iloc[-10:].min() + atr = price_range / 10 # 简单平均波动率 + except Exception: + # 备用方案:使用最近10日价格范围的平均值 + price_range = df['high'].iloc[-10:].max() - df['low'].iloc[-10:].min() + atr = price_range / 10 + + # 确定信号方向 + direction = factor_scores['direction'] + if direction == 'NEUTRAL': return None - # 确定信心等级 - if total_score >= 80: + # 计算支撑阻力位和斐波那契位 + support_resistance = self._calculate_support_resistance_levels(df) + fibonacci_levels = self._calculate_fibonacci_levels(df) + + # 计算最优入场价格 + entry_price = self._calculate_optimal_entry(current_price, support_resistance, fibonacci_levels, direction) + + if entry_price is None or entry_price <= 0: + entry_price = current_price * 1.002 + self.logger.warning(f"{symbol}[{target_timeframe}] 使用当前价格的小幅溢价作为入场价格: {entry_price:.8f}") + + # 根据时间框架调整策略参数 + timeframe_config = { + '15m': {'holding_period': 1, 'expiry_hours': 4, 'strategy_prefix': '短线'}, + '1h': {'holding_period': 1, 'expiry_hours': 8, 'strategy_prefix': '小时线'}, + '4h': {'holding_period': 2, 'expiry_hours': 24, 'strategy_prefix': '4小时'}, + '1d': {'holding_period': 3, 'expiry_hours': 72, 'strategy_prefix': '日线'} + } + + config = timeframe_config.get(target_timeframe, timeframe_config['1h']) + + # 智能止损止盈计算(与原函数相同) + if direction == 'BULLISH': + signal_type = "LONG" + action_direction = "BUY" + + atr_stop = entry_price - (atr * 2.5) + min_stop = entry_price * 0.992 + + pattern_stop = None + nearest_support = support_resistance.get('nearest_support') + if nearest_support and nearest_support < entry_price and nearest_support > entry_price * 0.95: + pattern_stop = nearest_support * 0.998 + + if pattern_stop: + stop_loss = max(pattern_stop, min_stop) + else: + stop_loss = max(atr_stop, min_stop) + + atr_target = entry_price + (atr * 4) + min_target = entry_price * 1.02 + + pattern_target = None + nearest_resistance = support_resistance.get('nearest_resistance') + if nearest_resistance and nearest_resistance > entry_price and nearest_resistance < entry_price * 1.15: + pattern_target = nearest_resistance * 0.998 + + target_candidates = [min_target, atr_target] + if pattern_target: + target_candidates.append(pattern_target) + + valid_targets = [t for t in target_candidates if t <= entry_price * 1.15] + take_profit = max(valid_targets) if valid_targets else min_target + + price_diff_pct = ((entry_price - current_price) / current_price) * 100 + if price_diff_pct < -1: + action_suggestion = f"等待回调至{entry_price:.4f}买入(当前{current_price:.4f},需回调{abs(price_diff_pct):.1f}%)" + else: + action_suggestion = f"技术位{entry_price:.4f}附近分批买入" + + else: # BEARISH + signal_type = "SHORT" + action_direction = "SELL" + + atr_stop = entry_price + (atr * 2.5) + min_stop = entry_price * 1.008 + + pattern_stop = None + nearest_resistance = support_resistance.get('nearest_resistance') + if nearest_resistance and nearest_resistance > entry_price and nearest_resistance < entry_price * 1.05: + pattern_stop = nearest_resistance * 1.002 + + if pattern_stop: + stop_loss = min(pattern_stop, min_stop) + else: + stop_loss = min(atr_stop, min_stop) + + atr_target = entry_price - (atr * 4) + min_target = entry_price * 0.98 + + pattern_target = None + nearest_support = support_resistance.get('nearest_support') + if nearest_support and nearest_support < entry_price and nearest_support > entry_price * 0.85: + pattern_target = nearest_support * 1.002 + + target_candidates = [min_target, atr_target] + if pattern_target: + target_candidates.append(pattern_target) + + valid_targets = [t for t in target_candidates if t >= entry_price * 0.85] + take_profit = min(valid_targets) if valid_targets else min_target + + price_diff_pct = ((entry_price - current_price) / current_price) * 100 + if price_diff_pct > 1: + action_suggestion = f"等待反弹至{entry_price:.4f}做空(当前{current_price:.4f},需反弹{price_diff_pct:.1f}%)" + else: + action_suggestion = f"技术位{entry_price:.4f}附近分批做空" + + # 计算风险回报比 + risk = abs(entry_price - stop_loss) + reward = abs(take_profit - entry_price) + risk_reward = reward / risk if risk > 0 else 2.0 + + # 确定策略类型 + qualified_factors = factor_scores['qualified_factors'] + if qualified_factors >= 6: + strategy_type = f"{config['strategy_prefix']}极品信号" + confidence = "极高" + elif qualified_factors == 5: + strategy_type = f"{config['strategy_prefix']}优质信号" confidence = "高" - elif total_score >= 65: - confidence = "中" else: - confidence = "低" + strategy_type = f"{config['strategy_prefix']}标准信号" + confidence = "中" - # 计算空头入场和出场位 - entry_price, stop_loss, take_profit, risk_reward = self.calculate_short_entry_exit_levels(df, strategy) + # 生成理由 + all_signals = factor_scores.get('signals', []) - # 生成空头操作建议 - current_price = df.iloc[-1]['close'] - action_suggestion = self.generate_short_action_suggestion(df, entry_price, current_price, total_score) + if signal_type == "LONG": + valid_signals = [s for s in all_signals if '(BULLISH)' in s or '(NEUTRAL)' in s] + else: + valid_signals = [s for s in all_signals if '(BEARISH)' in s or '(NEUTRAL)' in s] + + reason_parts = valid_signals[:3] if valid_signals else [f'{target_timeframe}多时间框架技术分析确认机会'] + + factor_summary = f"符合{qualified_factors}项因子" + reason = f"{factor_summary}: {'; '.join(reason_parts)}" return CoinSignal( - symbol=None, # 将在调用时设置 - score=total_score, - reason=" | ".join(reasons), + symbol=symbol, + qualified_factors=qualified_factors, + reason=reason, entry_price=entry_price, stop_loss=stop_loss, take_profit=take_profit, - timeframe=config.primary_timeframe, + timeframe=target_timeframe, confidence=confidence, - strategy_type=strategy, - holding_period=config.holding_period_days, + strategy_type=strategy_type, + holding_period=config['holding_period'], risk_reward_ratio=risk_reward, - expiry_hours=config.expiry_hours, + expiry_hours=config['expiry_hours'], action_suggestion=action_suggestion, - signal_type="SHORT", - direction="SELL" + signal_type=signal_type, + direction=action_direction ) except Exception as e: - logging.error(f"分析空头信号失败: {e}") + self.logger.error(f"生成{symbol}[{target_timeframe}]信号时出错: {e}") return None - def analyze_bearish_price_behavior(self, df: pd.DataFrame) -> float: - """分析空头价格行为 - 核心指标 (35分)""" - if len(df) < 30: - return 0 + def _calculate_support_resistance_levels(self, df: pd.DataFrame) -> Dict: + """支撑阻力判断 - 关键技术位机会""" + result = {'qualified': False, 'signals': []} - score = 0 - recent_data = df.tail(20) - - # 1. 连续下跌动量 (10分) - consecutive_down = 0 - recent_closes = recent_data['close'].tail(10).values - for i in range(1, len(recent_closes)): - if recent_closes[i] < recent_closes[i-1]: - consecutive_down += 1 - else: - break - - recent_decline = (recent_closes[-1] - recent_closes[-4]) / recent_closes[-4] if len(recent_closes) >= 4 else 0 - - if consecutive_down >= 3 and -0.12 <= recent_decline <= -0.03: # 3%-12%下跌 - score += 8 - elif consecutive_down >= 2 and -0.08 <= recent_decline <= -0.02: - score += 5 - - # 2. 价格跌破分析 (15分) - current_price = df.iloc[-1]['close'] - - # 跌破支撑位 - historical_data = df.iloc[-50:-10] - if len(historical_data) > 0: - support_levels = [] - for i in range(5, len(historical_data)-5): - local_low = historical_data.iloc[i]['low'] - is_valley = True - for j in range(max(0, i-5), min(len(historical_data), i+6)): - if j != i and historical_data.iloc[j]['low'] < local_low: - is_valley = False - break - if is_valley: - support_levels.append(local_low) + try: + if len(df) < 20: + return result + + high_prices = df['high'].values + low_prices = df['low'].values + close_prices = df['close'].values + current_price = close_prices[-1] - for support in support_levels: - if current_price < support * 0.97: # 跌破支撑位3%以上 - breakdown_ratio = (support - current_price) / support - if breakdown_ratio <= 0.05: # 刚刚跌破 - score += 10 - elif breakdown_ratio <= 0.10: - score += 6 - break - - # 3. 高点逐步下移模式 (10分) - recent_highs = [] - for i in range(3, len(recent_data)-3): - current_high = recent_data.iloc[i]['high'] - is_local_high = True - for j in range(i-3, i+4): - if j != i and recent_data.iloc[j]['high'] > current_high: - is_local_high = False - break - if is_local_high: - recent_highs.append((i, current_high)) - - if len(recent_highs) >= 2: - latest_highs = sorted(recent_highs, key=lambda x: x[0])[-2:] - if latest_highs[1][1] < latest_highs[0][1]: # 高点下移 - score += 8 - - return min(score, 35) - - def _comprehensive_long_analysis(self, main_df: pd.DataFrame, timeframe_data: Dict, strategy: str) -> float: - """综合多头分析 - 优化策略本身而非降低分数""" - if len(main_df) < 30: - return 0 + technical_opportunity = False - total_score = 0 - score_details = {} # 用于记录各项得分 - - # 1. 价格行为分析 (35分) - price_score = self.analyze_price_behavior(main_df) - total_score += price_score - score_details['价格行为'] = price_score - - # 2. 量价关系 (30分) - volume_score = self.analyze_volume_price_relationship(main_df) - total_score += volume_score - score_details['量价关系'] = volume_score - - # 3. 技术形态 (25分) - pattern_score = self.analyze_technical_patterns(main_df) - total_score += pattern_score - score_details['技术形态'] = pattern_score - - # 4. 突破潜力 (20分) - breakout_score = self.analyze_breakout_potential(main_df) - total_score += breakout_score - score_details['突破潜力'] = breakout_score - - # 5. 多时间框架确认 (15分) - 新增 - multi_tf_score = self._analyze_multi_timeframe_confirmation(timeframe_data, strategy, 'LONG') - total_score += multi_tf_score - score_details['多时间框架确认'] = multi_tf_score - - # 6. 风险评估调整 (-30到+10分) - 新增 - risk_adjustment = self._analyze_risk_factors(main_df, 'LONG') - total_score += risk_adjustment - score_details['风险评估调整'] = risk_adjustment - - # 记录详细评分 - 改为100分制 - final_score = min(total_score, 100) - logging.info(f"多头分析详细评分: 价格行为({price_score:.1f}) + 量价关系({volume_score:.1f}) + " - f"技术形态({pattern_score:.1f}) + 突破潜力({breakout_score:.1f}) + " - f"多时间框架({multi_tf_score:.1f}) + 风险调整({risk_adjustment:.1f}) = {final_score:.1f}") - - return final_score - - def _comprehensive_short_analysis(self, main_df: pd.DataFrame, timeframe_data: Dict, strategy: str) -> float: - """综合空头分析 - 优化策略本身而非降低分数""" - if len(main_df) < 30: - return 0 + # 1. 近期高低点机会 + recent_high_10 = max(high_prices[-10:]) + recent_low_10 = min(low_prices[-10:]) + recent_high_5 = max(high_prices[-5:]) + recent_low_5 = min(low_prices[-5:]) - total_score = 0 - score_details = {} # 用于记录各项得分 - - # 1. 空头价格行为 (35分) - bear_price_score = self.analyze_bearish_price_behavior(main_df) - total_score += bear_price_score - score_details['空头价格行为'] = bear_price_score - - # 2. 空头量价关系 (30分) - bear_volume_score = self.analyze_bearish_volume_price(main_df) - total_score += bear_volume_score - score_details['空头量价关系'] = bear_volume_score - - # 3. 空头技术形态 (25分) - bear_pattern_score = self.analyze_bearish_patterns(main_df) - total_score += bear_pattern_score - score_details['空头技术形态'] = bear_pattern_score - - # 4. 下跌突破潜力 (20分) - breakdown_score = self.analyze_breakdown_potential(main_df) - total_score += breakdown_score - score_details['下跌突破潜力'] = breakdown_score - - # 5. 多时间框架确认 (15分) - 新增 - multi_tf_score = self._analyze_multi_timeframe_confirmation(timeframe_data, strategy, 'SHORT') - total_score += multi_tf_score - score_details['多时间框架确认'] = multi_tf_score - - # 6. 风险评估调整 (-30到+10分) - 新增 - risk_adjustment = self._analyze_risk_factors(main_df, 'SHORT') - total_score += risk_adjustment - score_details['风险评估调整'] = risk_adjustment - - # 记录详细评分 - 改为100分制 - final_score = min(total_score, 100) - logging.info(f"空头分析详细评分: 空头价格行为({bear_price_score:.1f}) + 空头量价关系({bear_volume_score:.1f}) + " - f"空头技术形态({bear_pattern_score:.1f}) + 下跌突破潜力({breakdown_score:.1f}) + " - f"多时间框架({multi_tf_score:.1f}) + 风险调整({risk_adjustment:.1f}) = {final_score:.1f}") - - return final_score - - def _analyze_multi_timeframe_confirmation(self, timeframe_data: Dict, strategy: str, signal_type: str) -> float: - """多时间框架确认分析""" - score = 0 - config = self.strategies[strategy] - - # 获取确认时间框架数据 - confirm_df = timeframe_data.get(config.confirm_timeframe) - if confirm_df is None or len(confirm_df) < 20: - return 0 - - confirm_df = self.calculate_technical_indicators(confirm_df, strategy) - - if signal_type == 'LONG': - # 确认时间框架的多头信号 - confirm_price_score = self.simple_long_analysis(confirm_df) - if confirm_price_score >= 30: - score += 15 - elif confirm_price_score >= 20: - score += 10 - elif confirm_price_score >= 15: - score += 5 - else: # SHORT - # 确认时间框架的空头信号 - confirm_price_score = self.simple_short_analysis(confirm_df) - if confirm_price_score >= 40: - score += 15 - elif confirm_price_score >= 30: - score += 10 - elif confirm_price_score >= 20: - score += 5 - - return score - - def _analyze_risk_factors(self, df: pd.DataFrame, signal_type: str) -> float: - """风险因素分析""" - if len(df) < 20: - return 0 + # 接近近期低点 - 反弹机会 + if current_price <= recent_low_5 * 1.02: + technical_opportunity = True + result['signals'].append("接近近期低点反弹机会(BULLISH)") + elif current_price <= recent_low_10 * 1.03: + technical_opportunity = True + result['signals'].append("接近10日低点反弹机会(BULLISH)") - score = 0 - current = df.iloc[-1] - recent_data = df.tail(20) - - # 1. RSI过热/过冷检查 - if hasattr(current, 'rsi'): - rsi = current['rsi'] - if signal_type == 'LONG': - if rsi > 80: # 严重超买 - score -= 15 - elif rsi > 70: # 超买 - score -= 8 - elif 40 <= rsi <= 65: # 健康区间 - score += 5 - else: # SHORT - if rsi < 20: # 严重超卖 - score -= 15 - elif rsi < 30: # 超卖 - score -= 8 - elif 35 <= rsi <= 60: # 健康区间 - score += 5 - - # 2. 价格位置风险 - recent_high = recent_data['high'].max() - recent_low = recent_data['low'].min() - price_range = recent_high - recent_low - current_position = (current['close'] - recent_low) / price_range if price_range > 0 else 0.5 - - if signal_type == 'LONG': - if current_position > 0.9: # 极高位置 - score -= 20 - elif current_position > 0.8: # 高位置 - score -= 10 - elif 0.2 <= current_position <= 0.6: # 理想位置 - score += 5 - else: # SHORT - if current_position < 0.1: # 极低位置 - score -= 20 - elif current_position < 0.2: # 低位置 - score -= 10 - elif 0.4 <= current_position <= 0.8: # 理想位置 - score += 5 - - # 3. 成交量异常检查 - if hasattr(current, 'volume') and hasattr(current, 'volume_ma'): - volume_ratio = current['volume'] / current['volume_ma'] if current['volume_ma'] > 0 else 1 - if volume_ratio > 5: # 异常爆量 - score -= 10 - elif volume_ratio < 0.3: # 成交量萎缩 - score -= 5 - - return max(min(score, 10), -30) - - def analyze_volume_accumulation(self, df: pd.DataFrame) -> float: - """分析成交量积累模式""" - if len(df) < 20: - return 0 - - score = 0 - recent_data = df.tail(10) - - # 寻找温和放量而非暴涨放量 - volume_ratio = recent_data['volume'] / recent_data['volume_ma'] - - # 温和持续放量(1.2-2倍)比突然暴涨放量更好 - moderate_volume_days = ((volume_ratio >= 1.2) & (volume_ratio <= 2.0)).sum() - explosive_volume_days = (volume_ratio > 3.0).sum() - - if moderate_volume_days >= 3: # 持续温和放量 - score += 15 - if explosive_volume_days >= 2: # 已经暴涨放量,减分 - score -= 10 - - return min(score, 20) - - def check_early_trend_signals(self, df: pd.DataFrame) -> float: - """检查早期趋势信号""" - if len(df) < 50: - return 0 - - score = 0 - current = df.iloc[-1] - prev = df.iloc[-2] - - # 1. 均线金叉但涨幅不大 - if (prev['ma20'] <= prev['ma50'] and current['ma20'] > current['ma50']): - # 刚刚金叉,还未大涨 - recent_gain = (current['close'] - df.iloc[-5]['close']) / df.iloc[-5]['close'] - if recent_gain < 0.10: # 5日涨幅小于10% - score += 20 - elif recent_gain > 0.25: # 已经涨幅过大 - score -= 15 - - # 2. RSI从超卖区域回升但未超买 - if 35 <= current['rsi'] <= 55: # 从底部回升但未过热 - score += 10 - elif current['rsi'] > 70: # 已经超买 - score -= 20 - - # 3. MACD即将金叉或刚刚金叉 - if current['macd'] > current['macd_signal']: - # 检查金叉时间 - macd_cross_days = 0 - for i in range(1, min(6, len(df))): - if df.iloc[-i]['macd'] <= df.iloc[-i]['macd_signal']: - macd_cross_days = i - 1 - break + # 突破近期高点 - 突破机会 + if current_price >= recent_high_5 * 0.99: + technical_opportunity = True + result['signals'].append("突破近期高点(BULLISH)") + elif current_price >= recent_high_10 * 0.98: + technical_opportunity = True + result['signals'].append("接近10日高点突破(BULLISH)") - if macd_cross_days <= 2: # 刚刚金叉2天内 - score += 15 - elif macd_cross_days >= 10: # 金叉很久了 - score -= 10 - - return min(score, 25) + # 2. 移动平均线支撑阻力 + if len(close_prices) >= 20: + ma5 = talib.SMA(close_prices, timeperiod=5) + ma20 = talib.SMA(close_prices, timeperiod=20) + + # MA支撑确认 - 添加None检查 + if (ma5 is not None and len(ma5) > 0 and not np.isnan(ma5[-1]) and + current_price > ma5[-1] * 0.995): + technical_opportunity = True + result['signals'].append("MA5支撑确认(BULLISH)") + + # MA20关键位 - 添加None检查 + if (ma20 is not None and len(ma20) > 0 and not np.isnan(ma20[-1]) and + abs(current_price - ma20[-1]) / ma20[-1] < 0.01): # 在MA20附近1% + technical_opportunity = True + result['signals'].append("MA20关键位(BULLISH)") + + # 3. 价格位置分析 + price_range = recent_high_10 - recent_low_10 + if price_range > 0: + position = (current_price - recent_low_10) / price_range + + # 在区间下部 - 反弹机会 + if position < 0.3: + technical_opportunity = True + result['signals'].append("区间底部反弹机会(BULLISH)") + + # 在区间上部 - 突破机会 + elif position > 0.7: + technical_opportunity = True + result['signals'].append("区间顶部突破机会(BULLISH)") + + result['qualified'] = technical_opportunity + + except Exception as e: + self.logger.error(f"支撑阻力判断出错: {e}") + + return result - def analyze_candlestick_patterns(self, df: pd.DataFrame) -> float: - """分析K线形态""" - if len(df) < 20: - return 0 + + + + + + + + + + + + def _calculate_fibonacci_levels(self, df: pd.DataFrame) -> Dict: + """计算斐波那契回调位""" + try: + # 使用最近50根K线计算斐波那契位 + lookback = min(50, len(df)) + recent_data = df.tail(lookback) + + highs = recent_data['high'].values + lows = recent_data['low'].values + current_price = df['close'].iloc[-1] + + # 找出最高点和最低点 + swing_high = np.max(highs) + swing_low = np.min(lows) + + # 判断当前趋势方向 + price_change = (current_price - recent_data['close'].iloc[0]) / recent_data['close'].iloc[0] + + # 斐波那契比例 + fib_ratios = { + 'level_0': 0.0, # 0% + 'level_236': 0.236, # 23.6% + 'level_382': 0.382, # 38.2% + 'level_500': 0.500, # 50% + 'level_618': 0.618, # 61.8% + 'level_786': 0.786, # 78.6% + 'level_100': 1.0 # 100% + } + + fib_levels = {} + + if price_change > 0: # 上涨趋势,计算回调位 + range_size = swing_high - swing_low + for name, ratio in fib_ratios.items(): + fib_levels[name] = swing_high - (range_size * ratio) + + # 扩展目标位(用于止盈) + fib_levels['target_1272'] = swing_high + (range_size * 0.272) # 127.2% + fib_levels['target_1414'] = swing_high + (range_size * 0.414) # 141.4% + fib_levels['target_618'] = swing_high + (range_size * 0.618) # 161.8% + + # 调试日志 + self.logger.info(f"上涨趋势斐波那契计算: 当前价{current_price:.4f}, 高点{swing_high:.4f}, 低点{swing_low:.4f}") + self.logger.info(f"38.2%回调位: {fib_levels.get('level_382', 0):.4f}, 50%回调位: {fib_levels.get('level_500', 0):.4f}") + + else: # 下跌趋势,计算反弹位 + range_size = swing_high - swing_low + for name, ratio in fib_ratios.items(): + fib_levels[name] = swing_low + (range_size * ratio) + + # 扩展目标位(用于空头止盈) + fib_levels['target_1272'] = swing_low - (range_size * 0.272) + fib_levels['target_1414'] = swing_low - (range_size * 0.414) + fib_levels['target_618'] = swing_low - (range_size * 0.618) + + # 调试日志 + self.logger.info(f"下跌趋势斐波那契计算: 当前价{current_price:.4f}, 高点{swing_high:.4f}, 低点{swing_low:.4f}") + self.logger.info(f"38.2%反弹位: {fib_levels.get('level_382', 0):.4f}, 50%反弹位: {fib_levels.get('level_500', 0):.4f}") + + fib_levels['swing_high'] = swing_high + fib_levels['swing_low'] = swing_low + fib_levels['trend_direction'] = 'bullish' if price_change > 0 else 'bearish' + + return fib_levels + + except Exception as e: + self.logger.error(f"计算斐波那契位出错: {e}") + return { + 'level_618': None, + 'target_618': None, + 'trend_direction': 'neutral' + } + + def _calculate_optimal_entry(self, current_price: float, support_resistance: Dict, + fibonacci_levels: Dict, direction: str) -> float: + """计算最优入场价格 - 斐波那契关键位优先""" + try: + if direction == 'BULLISH': + # 多头入场:优先使用斐波那契关键位 + entry_candidates = [] + + # 1. 斐波那契关键回调位(最高优先级) + fib_levels = { + 'level_382': {'name': '38.2%回调位', 'priority': 1}, + 'level_500': {'name': '50%回调位', 'priority': 2}, + 'level_618': {'name': '61.8%回调位', 'priority': 3} + } + + for level_key, level_info in fib_levels.items(): + if fibonacci_levels.get(level_key): + fib_price = fibonacci_levels[level_key] + # 多头入场:只考虑低于当前价格的斐波那契位(回调位或支撑位) + if current_price * 0.70 <= fib_price <= current_price * 0.99: + entry_candidates.append({ + 'price': fib_price, + 'type': level_info['name'], + 'priority': level_info['priority'] + }) + + # 2. K线形态平台支撑位(次优先级) + if support_resistance.get('nearest_support'): + support_level = support_resistance['nearest_support'] + # 只要支撑位低于当前价格就可以考虑 + if support_level < current_price: + entry_candidates.append({ + 'price': support_level, + 'type': 'K线形态支撑位', + 'priority': 4 + }) + + # 3. 其他技术支撑位(最低优先级) + all_supports = support_resistance.get('all_support', []) + for support in all_supports[:3]: # 只考虑前3个支撑位 + if current_price * 0.70 <= support <= current_price * 0.99: + entry_candidates.append({ + 'price': support, + 'type': '次级技术支撑位', + 'priority': 5 + }) + + # 按优先级排序,优先级相同时选择价格更高的(风险更小) + if entry_candidates: + entry_candidates.sort(key=lambda x: (x['priority'], -x['price'])) + best_entry = entry_candidates[0] + + # 记录选择的技术位置类型 + self.logger.info(f"选择入场位置: {best_entry['type']} - {best_entry['price']:.4f}") + return best_entry['price'] + + # 如果没有找到技术位,使用当前价格的小幅折扣 + fallback_entry = current_price * 0.995 # 0.5%折扣 + self.logger.info(f"使用备选入场价格: {fallback_entry:.4f} (当前价格{current_price:.4f}的小幅折扣)") + return fallback_entry + + else: # BEARISH 空头入场 + entry_candidates = [] + + # 1. 斐波那契关键反弹位(最高优先级) + fib_levels = { + 'level_382': {'name': '38.2%反弹位', 'priority': 1}, + 'level_500': {'name': '50%反弹位', 'priority': 2}, + 'level_618': {'name': '61.8%反弹位', 'priority': 3} + } + + for level_key, level_info in fib_levels.items(): + if fibonacci_levels.get(level_key): + fib_price = fibonacci_levels[level_key] + # 空头入场:只考虑高于当前价格的斐波那契位(反弹位或阻力位) + if current_price * 1.01 <= fib_price <= current_price * 1.30: + entry_candidates.append({ + 'price': fib_price, + 'type': level_info['name'], + 'priority': level_info['priority'] + }) + + # 2. K线形态平台阻力位(次优先级) + if support_resistance.get('nearest_resistance'): + resistance_level = support_resistance['nearest_resistance'] + # 只要阻力位高于当前价格就可以考虑 + if resistance_level > current_price: + entry_candidates.append({ + 'price': resistance_level, + 'type': 'K线形态阻力位', + 'priority': 4 + }) + + # 3. 其他技术阻力位(最低优先级) + all_resistances = support_resistance.get('all_resistance', []) + for resistance in all_resistances[:3]: # 只考虑前3个阻力位 + if current_price * 1.01 <= resistance <= current_price * 1.30: + entry_candidates.append({ + 'price': resistance, + 'type': '次级技术阻力位', + 'priority': 5 + }) + + # 按优先级排序,优先级相同时选择价格更低的(风险更小) + if entry_candidates: + entry_candidates.sort(key=lambda x: (x['priority'], x['price'])) + best_entry = entry_candidates[0] + + # 记录选择的技术位置类型 + self.logger.info(f"选择入场位置: {best_entry['type']} - {best_entry['price']:.4f}") + return best_entry['price'] + + # 如果没有找到技术位,使用当前价格的小幅溢价 + fallback_entry = current_price * 1.005 # 0.5%溢价 + self.logger.info(f"使用备选入场价格: {fallback_entry:.8f} (当前价格{current_price:.8f}的小幅溢价)") + return fallback_entry + + except Exception as e: + self.logger.error(f"计算最优入场价格出错: {e}") + return None + + + def _determine_optimal_primary_timeframe(self, timeframe_data: Dict[str, pd.DataFrame], factor_scores: Dict) -> str: + """动态确定最佳主要时间框架""" + available_timeframes = list(timeframe_data.keys()) - score = 0 - - # 使用talib的K线形态识别 - patterns = { - 'hammer': talib.CDLHAMMER(df['open'], df['high'], df['low'], df['close']), - 'doji': talib.CDLDOJI(df['open'], df['high'], df['low'], df['close']), - 'engulfing': talib.CDLENGULFING(df['open'], df['high'], df['low'], df['close']), - 'morning_star': talib.CDLMORNINGSTAR(df['open'], df['high'], df['low'], df['close']), - 'three_white_soldiers': talib.CDL3WHITESOLDIERS(df['open'], df['high'], df['low'], df['close']) + # 时间框架优先级(根据信号强度和实用性) + timeframe_priority = { + '15m': {'weight': 1, 'use_case': '短线突破、快速反弹'}, + '1h': {'weight': 3, 'use_case': '日内交易、技术分析主力'}, + '4h': {'weight': 2, 'use_case': '中线趋势、形态分析'}, + '1d': {'weight': 1, 'use_case': '长线趋势、重要支撑阻力'} } - # 检查最近的K线形态 - recent_signals = 0 - for pattern_name, pattern_data in patterns.items(): - if len(pattern_data) > 0 and pattern_data.iloc[-1] > 0: - recent_signals += 1 - if pattern_name in ['morning_star', 'three_white_soldiers']: - score += 15 - elif pattern_name in ['hammer', 'engulfing']: - score += 10 - else: - score += 5 + # 分析信号内容,确定最适合的时间框架 + signals = factor_scores.get('signals', []) + qualified_factors = factor_scores.get('qualified_factors', 0) - return min(score, 20) - - def analyze_moving_average_trend(self, df: pd.DataFrame) -> float: - """分析均线系统""" - if len(df) < 50: - return 0 - - score = 0 - current = df.iloc[-1] - - # 均线多头排列 - if current['close'] > current['ma20'] > current['ma50']: - score += 15 - - # 价格突破关键均线 - prev = df.iloc[-2] - if prev['close'] <= prev['ma20'] and current['close'] > current['ma20']: - score += 10 - - # 均线斜率向上 - ma20_slope = (current['ma20'] - df.iloc[-5]['ma20']) / 5 - if ma20_slope > 0: - score += 5 - - return min(score, 25) - - def analyze_momentum_indicators(self, df: pd.DataFrame) -> float: - """分析动量指标""" - if len(df) < 20: - return 0 - - score = 0 - current = df.iloc[-1] - - # RSI分析 - if 30 <= current['rsi'] <= 50: # RSI从超卖区域回升 - score += 10 - elif 50 <= current['rsi'] <= 70: # RSI强势但未超买 - score += 5 - - # MACD分析 - if current['macd'] > current['macd_signal'] and current['macd_hist'] > 0: - score += 10 - - # MACD金叉 - prev = df.iloc[-2] - if prev['macd'] <= prev['macd_signal'] and current['macd'] > current['macd_signal']: - score += 15 - - return min(score, 25) - - def calculate_entry_exit_levels(self, df: pd.DataFrame, fib_levels: Dict, strategy: str) -> Tuple[float, float, float]: - """计算入场、止损、止盈位""" - current_price = df.iloc[-1]['close'] - atr = df.iloc[-1]['atr'] - support, resistance = self.find_support_resistance(df) - config = self.strategies[strategy] - - # 入场价策略 - 不使用当前市价,而是基于技术分析 - ma20 = df.iloc[-1]['ma20'] - ma50 = df.iloc[-1]['ma50'] - - # 策略1: 均线回踩入场 - if current_price > ma20 > ma50: # 多头趋势 - # 等待回踩MA20附近入场,给出5%的缓冲 - entry_price = ma20 * 1.02 # MA20上方2% + # 策略:根据信号特征选择时间框架 + if any('动量' in signal or '突破' in signal for signal in signals): + # 动量和突破信号适合较小时间框架 + preferred_timeframes = ['15m', '1h', '4h'] + elif any('趋势' in signal or '均线' in signal for signal in signals): + # 趋势信号适合中等时间框架 + preferred_timeframes = ['1h', '4h', '1d'] + elif any('支撑' in signal or '阻力' in signal or '形态' in signal for signal in signals): + # 支撑阻力和形态信号适合较大时间框架 + preferred_timeframes = ['4h', '1h', '1d'] else: - # 策略2: 突破确认入场 - # 等待突破近期阻力位 - recent_high = df.tail(10)['high'].max() - if current_price >= recent_high * 0.98: # 接近突破 - entry_price = recent_high * 1.005 # 突破确认价位 - else: - # 策略3: 斐波那契回调入场 - if fib_levels and 'fib_618' in fib_levels: - entry_price = max(fib_levels['fib_618'], current_price * 0.98) - else: - # 策略4: 当前价格适当下方等待 - entry_price = current_price * 0.995 # 当前价下方0.5% + # 默认偏好 + preferred_timeframes = ['1h', '4h', '15m', '1d'] - # 确保入场价不会过低(距离当前价不超过5%) - if entry_price < current_price * 0.95: - entry_price = current_price * 0.98 + # 根据符合因子数量调整时间框架选择 + if qualified_factors == 4: + # 完美信号,优先使用15m捕捉快速机会 + preferred_timeframes = ['15m', '1h'] + [tf for tf in preferred_timeframes if tf not in ['15m', '1h']] + elif qualified_factors == 3: + # 标准信号,使用1h作为主力 + preferred_timeframes = ['1h', '4h'] + [tf for tf in preferred_timeframes if tf not in ['1h', '4h']] - # 止损位: 支撑位下方或ATR的1.5倍 - stop_loss_atr = entry_price - (atr * 1.5) - stop_loss_support = support * 0.98 if support > 0 else entry_price * 0.92 - stop_loss = max(stop_loss_atr, stop_loss_support) + # 选择第一个可用的偏好时间框架 + for tf in preferred_timeframes: + if tf in available_timeframes: + return tf - # 根据策略动态设置风险回报比 - risk = entry_price - stop_loss - target_ratio = (config.risk_reward_min + config.risk_reward_max) / 2 - take_profit_ratio = entry_price + (risk * target_ratio) + # 如果没有偏好时间框架,按优先级选择 + best_timeframe = '1h' # 默认 + best_weight = 0 - # 如果有阻力位,取较小值 - if resistance > entry_price: - take_profit_resistance = resistance * 0.98 - take_profit = min(take_profit_ratio, take_profit_resistance) - else: - take_profit = take_profit_ratio + for tf in available_timeframes: + if tf in timeframe_priority: + weight = timeframe_priority[tf]['weight'] + if weight > best_weight: + best_weight = weight + best_timeframe = tf - # 计算实际风险回报比 - actual_risk_reward = (take_profit - entry_price) / risk if risk > 0 else target_ratio - - return entry_price, stop_loss, take_profit, actual_risk_reward + return best_timeframe - def generate_action_suggestion(self, df: pd.DataFrame, signal: CoinSignal, current_price: float) -> str: - """根据评分和价格位置生成操作建议""" - try: - # 计算价格差异百分比 - entry_diff = (current_price - signal.entry_price) / signal.entry_price * 100 - - # 获取技术指标状态 - current = df.iloc[-1] - recent_data = df.tail(10) - - # 计算价格在最近区间的位置 - recent_high = recent_data['high'].max() - recent_low = recent_data['low'].min() - price_range = recent_high - recent_low - current_position = (current_price - recent_low) / price_range if price_range > 0 else 0.5 - - # 基于评分等级的基础建议 - if signal.score >= 85: # 高分 - if entry_diff <= 1: # 接近建议价位 - if current_position <= 0.4: # 在低位 - return "立即买入" - elif current_position <= 0.6: - return "现价买入" - else: - return "等待回调买入" - elif entry_diff <= 3: # 稍高于建议价位 - if signal.strategy_type == "短线": - return "现价买入" - else: - return "等待回调买入" - else: # 明显高于建议价位 - return "等待大幅回调" - - elif signal.score >= 70: # 中等分数 - if entry_diff <= -2: # 低于建议价位 - return "现价买入" - elif entry_diff <= 1: # 接近建议价位 - if current_position <= 0.5: - return "现价买入" - else: - return "等待回调买入" - elif entry_diff <= 5: # 高于建议价位 - return "等待回调买入" - else: - return "暂时观望" - - else: # 较低分数 (60-70) - if entry_diff <= -3: # 明显低于建议价位 - return "分批买入" - elif entry_diff <= 0: # 低于或等于建议价位 - if current_position <= 0.3: - return "小仓位试探" - else: - return "等待回调买入" - elif entry_diff <= 3: - return "等待回调买入" - else: - return "暂时观望" - - return "等待回调买入" # 默认建议 - - except Exception as e: - logging.error(f"生成操作建议时出错: {e}") - return "谨慎观望" - - def analyze_single_coin(self, symbol: str, timeframe_data: Dict[str, pd.DataFrame], volume_24h_usd: float = 0) -> List[CoinSignal]: - """分析单个币种 - 优化策略""" - try: - logging.info(f"\n=== 开始分析 {symbol} ===") - logging.info(f"24小时交易量: ${volume_24h_usd:,.0f}") - - # 先执行基础分析,即使交易量不够也要显示评分 - basic_analysis_executed = False - - # 24小时交易量过滤 - 但仍然显示评分 - if volume_24h_usd < 10_000_000: # 降低到1000万美金进行调试 - logging.info(f"{symbol} 交易量不足1000万美金,但仍显示评分") - # 继续执行分析以显示评分,但最后返回空列表 - basic_analysis_executed = True - - signals = [] - - # 首先使用4h数据确定最适合的策略 - initial_df = timeframe_data.get('4h') - if initial_df is None or len(initial_df) < 20: - logging.info(f"{symbol} 4h数据不足,跳过分析") - return [] - - # 确定最佳策略 - best_strategy = self.determine_best_strategy(initial_df) - config = self.strategies[best_strategy] - logging.info(f"{symbol} 选择策略: {best_strategy} (主时间周期: {config.primary_timeframe}, 确认时间周期: {config.confirm_timeframe})") - - # 检查可用的时间周期数据 - available_timeframes = list(timeframe_data.keys()) - logging.info(f"{symbol} 可用时间周期: {available_timeframes}") - - # 获取策略对应的主要时间周期数据 - main_df = timeframe_data.get(config.primary_timeframe) - if main_df is None or len(main_df) < 20: - # 尝试使用该策略的确认时间周期 - confirm_df = timeframe_data.get(config.confirm_timeframe) - if confirm_df is not None and len(confirm_df) >= 20: - main_df = confirm_df - logging.info(f"{symbol} 使用确认时间周期{config.confirm_timeframe}替代主时间周期{config.primary_timeframe}") - else: - # 最后才回退到初始数据和中线策略 - main_df = initial_df - config = self.strategies['中线'] - best_strategy = '中线' - logging.info(f"{symbol} 数据不足,回退到中线策略") - - # 使用策略相应的参数计算技术指标 - main_df = self.calculate_technical_indicators(main_df, best_strategy) - - # === 综合多头信号分析 === - logging.info(f"\n--- {symbol} 多头分析 ---") - long_score = self._comprehensive_long_analysis(main_df, timeframe_data, best_strategy) - - # 记录多头分析结果(不管是否通过筛选) - logging.info(f"{symbol} 多头分析结果: {long_score:.1f}/100分 (门槛: 70分)") - - if long_score >= 70: # 70分入选门槛 - logging.info(f"✓ {symbol} 多头信号通过筛选") - - # 计算入场出场位 - entry_price, stop_loss, take_profit, risk_reward = self.calculate_entry_exit_levels( - main_df, {}, best_strategy - ) - - # 确定信心等级 - 基于100分制 - if long_score >= 85: - confidence = "高" - elif long_score >= 75: - confidence = "中" - else: - confidence = "低" - - # 生成操作建议 - 基于综合分析 - current_price = main_df.iloc[-1]['close'] - action_suggestion = "现价买入" if long_score >= 85 else "等待回调买入" - - logging.info(f"{symbol} 多头信号详情: 入场${entry_price:.4f}, 止损${stop_loss:.4f}, 止盈${take_profit:.4f}, 风险回报比1:{risk_reward:.2f}") - - long_signal = CoinSignal( - symbol=symbol, - score=long_score, - reason=f"综合多头分析: 高质量信号({long_score}分)", - entry_price=entry_price, - stop_loss=stop_loss, - take_profit=take_profit, - timeframe=config.primary_timeframe, - confidence=confidence, - strategy_type=best_strategy, - holding_period=config.holding_period_days, - risk_reward_ratio=risk_reward, - expiry_hours=config.expiry_hours, - action_suggestion=action_suggestion, - signal_type="LONG", - direction="BUY" - ) - signals.append(long_signal) - else: - logging.info(f"✗ {symbol} 多头信号未通过筛选 (差距: {70 - long_score:.1f}分)") - - # === 综合空头信号分析 === - logging.info(f"\n--- {symbol} 空头分析 ---") - short_score = self._comprehensive_short_analysis(main_df, timeframe_data, best_strategy) - - # 记录空头分析结果(不管是否通过筛选) - logging.info(f"{symbol} 空头分析结果: {short_score:.1f}/100分 (门槛: 70分)") - - if short_score >= 70: # 与多头门槛保持一致 - logging.info(f"✓ {symbol} 空头信号通过筛选") - - # 计算入场出场位 - entry_price, stop_loss, take_profit, risk_reward = self.calculate_short_entry_exit_levels( - main_df, best_strategy - ) - - # 确定信心等级 - 基于100分制 - if short_score >= 85: - confidence = "高" - elif short_score >= 75: - confidence = "中" - else: - confidence = "低" - - # 生成操作建议 - 基于综合分析 - current_price = main_df.iloc[-1]['close'] - action_suggestion = "现价做空" if short_score >= 85 else "等待反弹做空" - - logging.info(f"{symbol} 空头信号详情: 入场${entry_price:.4f}, 止损${stop_loss:.4f}, 止盈${take_profit:.4f}, 风险回报比1:{risk_reward:.2f}") - - short_signal = CoinSignal( - symbol=symbol, - score=short_score, - reason=f"综合空头分析: 高质量信号({short_score}分)", - entry_price=entry_price, - stop_loss=stop_loss, - take_profit=take_profit, - timeframe=config.primary_timeframe, - confidence=confidence, - strategy_type=best_strategy, - holding_period=config.holding_period_days, - risk_reward_ratio=risk_reward, - expiry_hours=config.expiry_hours, - action_suggestion=action_suggestion, - signal_type="SHORT", - direction="SELL" - ) - signals.append(short_signal) - else: - logging.info(f"✗ {symbol} 空头信号未通过筛选 (差距: {70 - short_score:.1f}分)") - - # 总结该币种分析结果 - if signals: - signal_types = [s.signal_type for s in signals] - logging.info(f"{symbol} 分析完成 - 生成{len(signals)}个信号: {', '.join(signal_types)}") - else: - logging.info(f"{symbol} 分析完成 - 无符合条件的信号 (多头:{long_score:.1f}, 空头:{short_score:.1f})") - - # 如果是因为交易量不足而跳过,返回空列表 - if basic_analysis_executed: - logging.info(f"{symbol} 因交易量不足,不生成实际交易信号") - return [] - - return signals - - except Exception as e: - logging.error(f"分析{symbol}时出错: {e}") - return [] - - def analyze_single_coin_with_strategy(self, symbol: str, timeframe_data: Dict[str, pd.DataFrame], - volume_24h_usd: float, strategy_name: str) -> List[CoinSignal]: - """使用指定策略分析单个币种""" - try: - # 24小时交易量过滤 - if volume_24h_usd < 50_000_000: - return [] - - # 验证策略是否存在 - if strategy_name not in self.strategies: - logging.error(f"未知策略: {strategy_name}") - return [] - - config = self.strategies[strategy_name] - - # 获取策略对应的主要时间周期数据 - main_df = timeframe_data.get(config.primary_timeframe) - if main_df is None or len(main_df) < 20: - return [] - - # 使用策略相应的参数计算技术指标 - main_df = self.calculate_technical_indicators(main_df, strategy_name) - - signals = [] - - # 综合多头信号分析 - long_score = self._comprehensive_long_analysis(main_df, timeframe_data, strategy_name) - if long_score >= 80: - # 生成多头信号 - entry_price, stop_loss, take_profit, risk_reward = self.calculate_entry_exit_levels( - main_df, {}, strategy_name - ) - - confidence = "高" if long_score >= 85 else "中" if long_score >= 75 else "低" - action_suggestion = "现价买入" if long_score >= 85 else "等待回调买入" - - long_signal = CoinSignal( - symbol=symbol, - score=long_score, - reason=f"{strategy_name}策略多头信号({long_score}分)", - entry_price=entry_price, - stop_loss=stop_loss, - take_profit=take_profit, - timeframe=config.primary_timeframe, - confidence=confidence, - strategy_type=strategy_name, - holding_period=config.holding_period_days, - risk_reward_ratio=risk_reward, - expiry_hours=config.expiry_hours, - action_suggestion=action_suggestion, - signal_type="LONG", - direction="BUY" - ) - signals.append(long_signal) - - # 综合空头信号分析 - short_score = self._comprehensive_short_analysis(main_df, timeframe_data, strategy_name) - if short_score >= 90: - # 生成空头信号 - entry_price, stop_loss, take_profit, risk_reward = self.calculate_short_entry_exit_levels( - main_df, strategy_name - ) - - confidence = "高" if short_score >= 85 else "中" if short_score >= 75 else "低" - action_suggestion = "现价做空" if short_score >= 85 else "等待反弹做空" - - short_signal = CoinSignal( - symbol=symbol, - score=short_score, - reason=f"{strategy_name}策略空头信号({short_score}分)", - entry_price=entry_price, - stop_loss=stop_loss, - take_profit=take_profit, - timeframe=config.primary_timeframe, - confidence=confidence, - strategy_type=strategy_name, - holding_period=config.holding_period_days, - risk_reward_ratio=risk_reward, - expiry_hours=config.expiry_hours, - action_suggestion=action_suggestion, - signal_type="SHORT", - direction="SELL" - ) - signals.append(short_signal) - - return signals - - except Exception as e: - logging.error(f"使用{strategy_name}策略分析{symbol}时出错: {e}") - return [] - - def _analyze_long_signal(self, symbol: str, main_df: pd.DataFrame, timeframe_data: Dict, config: StrategyConfig, best_strategy: str) -> Optional[CoinSignal]: - """分析多头信号 - 原有逻辑""" - total_score = 0 - reasons = [] + def _select_analysis_primary_timeframe(self, available_timeframes: List[str]) -> str: + """为多时间框架分析选择主要时间框架""" + # 分析时间框架的优先级顺序 + preferred_order = ['1h', '4h', '15m', '1d'] - # === 核心评分体系:以价格行为和技术形态为主导 === + # 选择第一个可用的时间框架 + for tf in preferred_order: + if tf in available_timeframes: + return tf - # 1. 价格行为分析 - 最重要 (最高35分) - price_behavior_score = self.analyze_price_behavior(main_df) - if price_behavior_score > 0: - total_score += price_behavior_score - if price_behavior_score >= 25: - reasons.append(f"强势价格行为({price_behavior_score}分)") - elif price_behavior_score >= 15: - reasons.append(f"积极价格行为({price_behavior_score}分)") - - # 2. 量价关系分析 - 核心指标 (最高30分) - volume_price_score = self.analyze_volume_price_relationship(main_df) - if volume_price_score > 0: - total_score += volume_price_score - if volume_price_score >= 20: - reasons.append(f"理想量价配合({volume_price_score}分)") - elif volume_price_score >= 10: - reasons.append(f"量价配合({volume_price_score}分)") - - # 3. 技术形态确认 - 重要指标 (最高25分) - pattern_score = self.analyze_technical_patterns(main_df) - if pattern_score > 0: - total_score += pattern_score - if pattern_score >= 15: - reasons.append(f"强势技术形态({pattern_score}分)") - elif pattern_score >= 8: - reasons.append(f"技术形态({pattern_score}分)") - - # 4. 突破潜力分析 - 寻找爆发机会 (最高20分) - breakout_score = self.analyze_breakout_potential(main_df) - if breakout_score > 0: - total_score += breakout_score - if breakout_score >= 15: - reasons.append(f"高突破潜力({breakout_score}分)") - elif breakout_score >= 8: - reasons.append(f"突破潜力({breakout_score}分)") - - # 5. 均线系统 - 辅助确认 (最高10分,权重降低) - ma_score = self.analyze_moving_average_trend(main_df) - if ma_score > 0: - ma_score = min(ma_score * 0.4, 10) # 大幅降低权重 - total_score += ma_score - if ma_score >= 6: - reasons.append(f"均线支撑({int(ma_score)}分)") - - # 6. 确认时间周期验证 - 多周期确认 (最高5分) - confirm_df = timeframe_data.get(config.confirm_timeframe) - if confirm_df is not None and len(confirm_df) > 20: - confirm_df = self.calculate_technical_indicators(confirm_df, best_strategy) - confirm_score = self.analyze_price_behavior(confirm_df) * 0.2 # 降低权重 - if confirm_score > 3: - total_score += min(confirm_score, 5) - reasons.append(f"{config.confirm_timeframe}确认") - - # 过滤低分币种 - 适度放宽条件 - if price_behavior_score < 5 and volume_price_score < 5: # 降低要求 - return None - - # 适度放宽的分数过滤 - min_total_score = 50 # 降低最低门槛 - if total_score < min_total_score: - return None - - # 计算新波那契回调位 - fib_levels = self.calculate_fibonacci_levels(main_df) - - # 计算入场和出场位 - 使用对应策略 - entry_price, stop_loss, take_profit, risk_reward = self.calculate_entry_exit_levels( - main_df, fib_levels, best_strategy - ) - - # 确定信心等级 - 适度放宽标准 - if total_score >= 75: # 降低高信心阈值 - confidence = "高" - elif total_score >= 55: # 降低中等信心阈值 - confidence = "中" - else: - confidence = "低" - - # 生成操作建议 - current_price = main_df.iloc[-1]['close'] - temp_signal = CoinSignal( - symbol=symbol, - score=total_score, - reason=" | ".join(reasons), - entry_price=entry_price, - stop_loss=stop_loss, - take_profit=take_profit, - timeframe=config.primary_timeframe, - confidence=confidence, - strategy_type=best_strategy, - holding_period=config.holding_period_days, - risk_reward_ratio=risk_reward, - expiry_hours=config.expiry_hours, - action_suggestion="", # 临时占位 - signal_type="LONG", - direction="BUY" - ) - - # 生成操作建议 - action_suggestion = self.generate_action_suggestion(main_df, temp_signal, current_price) - - return CoinSignal( - symbol=symbol, - score=total_score, - reason=" | ".join(reasons), - entry_price=entry_price, - stop_loss=stop_loss, - take_profit=take_profit, - timeframe=config.primary_timeframe, - confidence=confidence, - strategy_type=best_strategy, - holding_period=config.holding_period_days, - risk_reward_ratio=risk_reward, - expiry_hours=config.expiry_hours, - action_suggestion=action_suggestion, - signal_type="LONG", - direction="BUY" - ) + # 如果都没有,返回第一个可用的 + return available_timeframes[0] if available_timeframes else '1h' def select_coins(self, market_data: Dict[str, Dict]) -> Dict[str, List[CoinSignal]]: - """批量选币 - 使用优化策略""" - logging.info(f"\n{'='*60}") - logging.info(f"开始批量选币分析,共{len(market_data)}个币种") - logging.info(f"{'='*60}") - + """选币主函数""" + all_signals = [] long_signals = [] short_signals = [] - filtered_count = 0 - low_volume_count = 0 - strategy_stats = {} # 统计各策略使用次数 - all_scores = [] # 记录所有币种的评分 - for i, (symbol, data) in enumerate(market_data.items(), 1): - logging.info(f"\n进度: {i}/{len(market_data)} - 分析{symbol}") - - timeframe_data = data.get('timeframes', {}) - volume_24h_usd = data.get('volume_24h_usd', 0) - - # 24小时交易量过滤统计 - if volume_24h_usd < 10_000_000: # 降低到1000万进行调试 - low_volume_count += 1 - logging.info(f"{symbol} 交易量过滤: ${volume_24h_usd:,.0f} < $10,000,000") - continue - - signals = self.analyze_single_coin(symbol, timeframe_data, volume_24h_usd) - - # 获取该币种的评分信息(用于统计) + self.logger.info(f"开始分析{len(market_data)}个币种...") + + for symbol, data in market_data.items(): try: - # 重新计算评分用于统计(避免重复计算,这里简化处理) - initial_df = timeframe_data.get('4h') - if initial_df is not None and len(initial_df) >= 20: - best_strategy = self.determine_best_strategy(initial_df) - config = self.strategies[best_strategy] - main_df = timeframe_data.get(config.primary_timeframe, initial_df) - if len(main_df) >= 20: - main_df = self.calculate_technical_indicators(main_df, best_strategy) - temp_long_score = self._comprehensive_long_analysis(main_df, timeframe_data, best_strategy) - temp_short_score = self._comprehensive_short_analysis(main_df, timeframe_data, best_strategy) - - all_scores.append({ - 'symbol': symbol, - 'strategy': best_strategy, - 'long_score': temp_long_score, - 'short_score': temp_short_score, - 'volume_24h': volume_24h_usd - }) - except Exception as e: - logging.debug(f"获取{symbol}评分统计信息失败: {e}") - - for signal in signals: - if signal: - # 统计策略使用情况 - strategy_key = signal.strategy_type - if strategy_key not in strategy_stats: - strategy_stats[strategy_key] = {'long': 0, 'short': 0, 'total': 0} - - # 使用统一的70分门槛 - if signal.score >= 70: # 统一70分入选门槛 - if signal.signal_type == "LONG": - long_signals.append(signal) - strategy_stats[strategy_key]['long'] += 1 - strategy_stats[strategy_key]['total'] += 1 - logging.info(f"✓ {symbol} 多头信号入选: {signal.score:.1f}分 ({signal.strategy_type})") - elif signal.signal_type == "SHORT": - short_signals.append(signal) - strategy_stats[strategy_key]['short'] += 1 - strategy_stats[strategy_key]['total'] += 1 - logging.info(f"✓ {symbol} 空头信号入选: {signal.score:.1f}分 ({signal.strategy_type})") + timeframe_data = data.get('timeframes', {}) + volume_24h_usd = data.get('volume_24h_usd', 0) + + signals = self.analyze_single_coin(symbol, timeframe_data, volume_24h_usd) + + for signal in signals: + all_signals.append(signal) + if signal.signal_type == "LONG": + long_signals.append(signal) else: - filtered_count += 1 - logging.info(f"✗ {symbol} {signal.signal_type}信号过滤: {signal.score:.1f}分 < 80分") + short_signals.append(signal) + + except Exception as e: + self.logger.error(f"分析{symbol}时出错: {e}") + continue - # 按分数排序 - long_signals.sort(key=lambda x: x.score, reverse=True) - short_signals.sort(key=lambda x: x.score, reverse=True) + # 按符合因子数量排序 + all_signals.sort(key=lambda x: x.qualified_factors, reverse=True) + long_signals.sort(key=lambda x: x.qualified_factors, reverse=True) + short_signals.sort(key=lambda x: x.qualified_factors, reverse=True) - # 选择最优信号 - 统一70分门槛 - def filter_quality_signals(signals): - quality_signals = [] - for signal in signals: - # 只选择符合70分门槛的信号 - if signal.score >= 80: # 优质信号优先 - quality_signals.append(signal) - elif len(quality_signals) < self.max_selections and signal.score >= 70: # 补充达标信号 - quality_signals.append(signal) - return quality_signals[:self.max_selections] - - selected_long_signals = filter_quality_signals(long_signals) - selected_short_signals = filter_quality_signals(short_signals) - - # 详细统计信息 - logging.info(f"\n{'='*60}") - logging.info("批量选币分析完成 - 详细统计") - logging.info(f"{'='*60}") - logging.info(f"总币种数量: {len(market_data)}") - logging.info(f"交易量过滤: {low_volume_count}个") - logging.info(f"分数过滤: {filtered_count}个") - logging.info(f"原始信号: 多头{len(long_signals)}个, 空头{len(short_signals)}个") - logging.info(f"最终选择: 多头{len(selected_long_signals)}个, 空头{len(selected_short_signals)}个") - - # 策略分布统计 - if strategy_stats: - logging.info(f"\n策略使用分布:") - for strategy, stats in sorted(strategy_stats.items(), key=lambda x: x[1]['total'], reverse=True): - logging.info(f" {strategy}: {stats['total']}个信号 (多头{stats['long']}, 空头{stats['short']})") - - # 评分分布统计 - if all_scores: - logging.info(f"\n全部币种评分分布:") - long_scores = [item['long_score'] for item in all_scores] - short_scores = [item['short_score'] for item in all_scores] - - # 多头评分统计 - long_high = len([s for s in long_scores if s >= 80]) - long_medium = len([s for s in long_scores if 60 <= s < 80]) - long_low = len([s for s in long_scores if 40 <= s < 60]) - long_very_low = len([s for s in long_scores if s < 40]) - - logging.info(f" 多头评分分布: ≥80分({long_high}个), 60-79分({long_medium}个), 40-59分({long_low}个), <40分({long_very_low}个)") - - # 空头评分统计 - short_high = len([s for s in short_scores if s >= 90]) - short_medium = len([s for s in short_scores if 70 <= s < 90]) - short_low = len([s for s in short_scores if 50 <= s < 70]) - short_very_low = len([s for s in short_scores if s < 50]) - - logging.info(f" 空头评分分布: ≥90分({short_high}个), 70-89分({short_medium}个), 50-69分({short_low}个), <50分({short_very_low}个)") - - # 显示前10名多头和空头候选 - top_long_candidates = sorted(all_scores, key=lambda x: x['long_score'], reverse=True)[:10] - top_short_candidates = sorted(all_scores, key=lambda x: x['short_score'], reverse=True)[:10] - - logging.info(f"\n前10名多头候选:") - for i, item in enumerate(top_long_candidates, 1): - status = "✓入选" if any(s.symbol == item['symbol'] and s.signal_type == "LONG" for s in selected_long_signals) else "未选中" - logging.info(f" {i:2d}. {item['symbol']:12s}: {item['long_score']:5.1f}分 ({item['strategy']}) - {status}") - - logging.info(f"\n前10名空头候选:") - for i, item in enumerate(top_short_candidates, 1): - status = "✓入选" if any(s.symbol == item['symbol'] and s.signal_type == "SHORT" for s in selected_short_signals) else "未选中" - logging.info(f" {i:2d}. {item['symbol']:12s}: {item['short_score']:5.1f}分 ({item['strategy']}) - {status}") - - # 最终信号详情 - if selected_long_signals: - logging.info(f"\n最终多头信号详情:") - for i, signal in enumerate(selected_long_signals, 1): - logging.info(f" {i}. {signal.symbol}: {signal.score:.1f}分 ({signal.strategy_type}-{signal.confidence})") - - if selected_short_signals: - logging.info(f"\n最终空头信号详情:") - for i, signal in enumerate(selected_short_signals, 1): - logging.info(f" {i}. {signal.symbol}: {signal.score:.1f}分 ({signal.strategy_type}-{signal.confidence})") + # 移除数量限制 - 返回所有符合条件的信号 + self.logger.info(f"选币完成 - 总信号:{len(all_signals)}, 多头:{len(long_signals)}, 空头:{len(short_signals)}") return { - 'long': selected_long_signals, - 'short': selected_short_signals, - 'all': selected_long_signals + selected_short_signals + 'all': all_signals, + 'long': long_signals, + 'short': short_signals } - # === 空头分析方法 === + def get_optimal_timeframes_for_analysis(self) -> List[str]: + """获取最优分析时间周期 - 多时间框架策略""" + return ["15m", "1h", "4h", "1d"] - def analyze_bearish_volume_price(self, df: pd.DataFrame) -> float: - """分析空头量价关系 (30分)""" - if len(df) < 20: - return 0 + def test_strategy_distribution(self, market_data: Dict[str, Dict]): + """测试策略分布""" + self.logger.info("测试新策略分布...") + signals_result = self.select_coins(market_data) - score = 0 - recent_data = df.tail(15) - - # 计算价格和成交量的相关性 - price_changes = recent_data['close'].pct_change().dropna() - volume_changes = recent_data['volume'].pct_change().dropna() - - if len(price_changes) >= 10 and len(volume_changes) >= 10: - # 分析下跌日的量价关系 - down_days = price_changes < 0 - down_price_changes = price_changes[down_days] - down_volume_changes = volume_changes[down_days] - - if len(down_price_changes) >= 5: - # 计算下跌日的量价相关性 - correlation = down_price_changes.corr(down_volume_changes) - - if correlation < -0.5: # 强负相关(价跌量增) - score += 15 - elif correlation < -0.3: - score += 10 - elif correlation < -0.1: - score += 5 - - # 分析最近5天的量价配合 - recent_5 = df.tail(5) - down_days_recent = (recent_5['close'].diff() < 0).sum() - avg_volume_recent = recent_5['volume'].mean() - avg_volume_before = df.iloc[-20:-5]['volume'].mean() if len(df) >= 20 else avg_volume_recent - - volume_ratio = avg_volume_recent / avg_volume_before if avg_volume_before > 0 else 1 - - # 下跌日多且成交量放大 - if down_days_recent >= 3 and volume_ratio > 1.2: - score += 15 - - return min(score, 30) - - def analyze_bearish_patterns(self, df: pd.DataFrame) -> float: - """分析空头技术形态 (25分)""" - if len(df) < 20: - return 0 - - score = 0 - - # 使用talib的K线形态识别(空头形态) - patterns = { - 'hanging_man': talib.CDLHANGINGMAN(df['open'], df['high'], df['low'], df['close']), - 'shooting_star': talib.CDLSHOOTINGSTAR(df['open'], df['high'], df['low'], df['close']), - 'dark_cloud': talib.CDLDARKCLOUDCOVER(df['open'], df['high'], df['low'], df['close']), - 'evening_star': talib.CDLEVENINGSTAR(df['open'], df['high'], df['low'], df['close']), - 'three_black_crows': talib.CDL3BLACKCROWS(df['open'], df['high'], df['low'], df['close']), - 'bearish_engulfing': talib.CDLENGULFING(df['open'], df['high'], df['low'], df['close']) - } - - # 检查最近3天的K线形态 - for pattern_name, pattern_data in patterns.items(): - if len(pattern_data) > 3: - # 检查最近3天是否有信号 - for i in range(-3, 0): - if pattern_data.iloc[i] < 0: # 看跌信号 - if pattern_name in ['evening_star', 'three_black_crows', 'dark_cloud']: - score += 6 # 强看跌形态 - elif pattern_name in ['hanging_man', 'shooting_star']: - score += 4 # 中度看跌形态 - else: - score += 3 # 弱看跌形态 - break - - # 手动分析一些简单空头形态 - manual_score = self._analyze_manual_bearish_patterns(df.tail(10)) - score += manual_score - - return min(score, 25) - - def _analyze_manual_bearish_patterns(self, df: pd.DataFrame) -> float: - """手动分析空头形态""" - if len(df) < 5: - return 0 - - score = 0 - - # 分析最近5天的走势 - last_5 = df.tail(5) - - # 1. 阶段性高点下移模式 - highs = last_5['high'].tolist() - if len(highs) >= 3: - # 检查是否有高点逐步下移的趋势 - descending_highs = True - for i in range(1, len(highs)): - if highs[i] > highs[i-1] * 1.02: # 允许小幅波动 - descending_highs = False - break - if descending_highs: - score += 4 - - # 2. 跌破前低点模式 - current_low = df.iloc[-1]['low'] - prev_lows = df.iloc[-5:-1]['low'].tolist() - if current_low < min(prev_lows) * 0.99: # 跌破前低 - score += 3 - - return min(score, 7) - - def analyze_breakdown_potential(self, df: pd.DataFrame) -> float: - """分析下跌突破潜力 (20分)""" - if len(df) < 30: - return 0 - - score = 0 - current = df.iloc[-1] - recent_data = df.tail(20) - - # 1. 价格位置分析 - 寻找做空机会 (8分) - recent_high = recent_data['high'].max() - recent_low = recent_data['low'].min() - price_range = recent_high - recent_low - current_position = (current['close'] - recent_low) / price_range if price_range > 0 else 0 - - # 偏好在高位的币种 - if 0.7 <= current_position <= 0.9: # 高位做空机会 - score += 6 - elif 0.5 <= current_position < 0.7: - score += 3 - elif current_position < 0.3: # 已经低位,做空机会不大 - score -= 5 - - # 2. 顶部形态分析 (6分) - volatility = recent_data['close'].std() / recent_data['close'].mean() - if 0.08 <= volatility <= 0.15: # 高位整理,可能形成顶部 - score += 5 - elif volatility > 0.20: # 高波动,可能已经开始下跌 - score += 3 - - # 3. 量能衰竭 (6分) - volume_ratio = recent_data['volume'] / recent_data['volume'].rolling(10).mean() - weak_volume_days = ((volume_ratio < 0.8)).sum() # 量能萱缩 - - if weak_volume_days >= 3: # 量能持续萱缩 - score += 4 - - return max(min(score, 20), 0) - - def analyze_bearish_moving_average(self, df: pd.DataFrame) -> float: - """分析空头均线系统 (10分)""" - if len(df) < 50: - return 0 - - score = 0 - current = df.iloc[-1] - - # 1. 均线空头排列 - if current['close'] < current['ma20'] < current['ma50']: - score += 6 - - # 2. 价格跌破关键均线 - prev = df.iloc[-2] - if prev['close'] >= prev['ma20'] and current['close'] < current['ma20']: - score += 3 - - # 3. 均线斜率向下 - ma20_slope = (current['ma20'] - df.iloc[-5]['ma20']) / 5 - if ma20_slope < 0: - score += 1 - - return min(score, 10) - - def calculate_short_entry_exit_levels(self, df: pd.DataFrame, strategy: str) -> Tuple[float, float, float, float]: - """计算空头入场、止损、止盈位""" - current_price = df.iloc[-1]['close'] - atr = df.iloc[-1]['atr'] - support, resistance = self.find_support_resistance(df) - config = self.strategies[strategy] - - # 空头入场价策略 - ma20 = df.iloc[-1]['ma20'] - ma50 = df.iloc[-1]['ma50'] - - # 策略1: 均线反弹入场 - if current_price < ma20 < ma50: # 空头趋势 - # 等待反弹至MA20附近做空 - entry_price = ma20 * 0.98 # MA20下方2% + all_signals = signals_result.get('all', []) + if all_signals: + self.logger.info(f"新策略共发现{len(all_signals)}个信号") + for signal in all_signals[:5]: + self.logger.info(f" {signal.symbol}: {signal.qualified_factors}/4因子 - {signal.reason}") else: - # 策略2: 跌破确认入场 - recent_low = df.tail(10)['low'].min() - if current_price <= recent_low * 1.02: # 接近跌破 - entry_price = recent_low * 0.995 # 跌破确认价位 - else: - # 策略3: 当前价格适当上方等待 - entry_price = current_price * 1.005 # 当前价上方0.5% - - # 确保入场价不会过高(距离当前价不超过5%) - if entry_price > current_price * 1.05: - entry_price = current_price * 1.02 - - # 止损位: 阻力位上方或ATR的1.5倍 - stop_loss_atr = entry_price + (atr * 1.5) - stop_loss_resistance = resistance * 1.02 if resistance > 0 else entry_price * 1.08 - stop_loss = min(stop_loss_atr, stop_loss_resistance) - - # 根据策略动态设置风险回报比 - risk = stop_loss - entry_price - target_ratio = (config.risk_reward_min + config.risk_reward_max) / 2 - take_profit_ratio = entry_price - (risk * target_ratio) - - # 如果有支撑位,取较大值 - if support > 0 and support < entry_price: - take_profit_support = support * 1.02 - take_profit = max(take_profit_ratio, take_profit_support) - else: - take_profit = take_profit_ratio - - # 计算实际风险回报比 - actual_risk_reward = (entry_price - take_profit) / risk if risk > 0 else target_ratio - - return entry_price, stop_loss, take_profit, actual_risk_reward - - def generate_short_action_suggestion(self, df: pd.DataFrame, entry_price: float, current_price: float, score: float) -> str: - """生成空头操作建议""" - try: - # 计算价格差异百分比 - entry_diff = (current_price - entry_price) / entry_price * 100 - - # 获取技术指标状态 - current = df.iloc[-1] - recent_data = df.tail(10) - - # 计算价格在最近区间的位置 - recent_high = recent_data['high'].max() - recent_low = recent_data['low'].min() - price_range = recent_high - recent_low - current_position = (current_price - recent_low) / price_range if price_range > 0 else 0.5 - - # 基于评分等级的基础建议 - if score >= 80: # 高分 - if entry_diff >= -1: # 接近或高于建议价位 - if current_position >= 0.7: # 在高位 - return "立即做空" - elif current_position >= 0.5: - return "现价做空" - else: - return "等待反弹做空" - elif entry_diff >= -3: # 稍低于建议价位 - return "等待反弹做空" - else: # 明显低于建议价位 - return "等待大幅反弹" - - elif score >= 65: # 中等分数 - if entry_diff <= 2: # 高于建议价位 - return "现价做空" - elif entry_diff <= -1: # 接近建议价位 - if current_position >= 0.6: - return "现价做空" - else: - return "等待反弹做空" - elif entry_diff <= -5: # 低于建议价位 - return "等待反弹做空" - else: - return "暂时观望" - - else: # 较低分数 - if entry_diff <= 3: # 明显高于建议价位 - return "小仓位做空" - elif entry_diff <= 0: - if current_position >= 0.8: - return "小仓位试探" - else: - return "等待反弹做空" - else: - return "暂时观望" - - return "等待反弹做空" # 默认建议 - - except Exception as e: - logging.error(f"生成空头操作建议时出错: {e}") - return "谨慎观望" - - def test_strategy_distribution(self, market_data: Dict[str, Dict]) -> Dict[str, int]: - """测试策略分布,确保所有策略都能被选中""" - strategy_counts = {name: 0 for name in self.strategies.keys()} - - for symbol, data in list(market_data.items())[:20]: # 只测试前20个 - timeframe_data = data.get('timeframes', {}) - initial_df = timeframe_data.get('4h') - - if initial_df is not None and len(initial_df) >= 20: - strategy = self.determine_best_strategy(initial_df) - strategy_counts[strategy] += 1 - logging.info(f"测试策略分布: {symbol} -> {strategy}") - - logging.info(f"策略分布测试结果: {strategy_counts}") - return strategy_counts \ No newline at end of file + self.logger.warning("新策略未发现任何信号") \ No newline at end of file diff --git a/templates/dashboard_table.html b/templates/dashboard_table.html new file mode 100644 index 0000000..1a48da3 --- /dev/null +++ b/templates/dashboard_table.html @@ -0,0 +1,597 @@ + + + + + + AI选币系统 - 表格视图 + + + + + + +
+ +
+
+
+
{{ total_signals }}
+
总信号数
+
+
+
+
{{ long_signals }}
+
多头信号
+
+
+
+
{{ short_signals }}
+
空头信号
+
+
+
+
{{ update_time }}
+
更新时间
+
+
+ + +
+ {% if grouped_selections %} + {% for group_time, timeframe_groups in grouped_selections.items() %} +
+
+ + {{ group_time }} + {% set total_coins = timeframe_groups.values() | map('length') | sum %} + ({{ total_coins }}个币种) +
+ + + {% for timeframe, selections in timeframe_groups.items() %} +
+
+ + {{ timeframe }} 时间级别 ({{ selections|length }}个信号) +
+
+ + + + + + + + + + + + + + + + + {% for selection in selections %} + + + + + + + + + + + + + {% endfor %} + +
币种符合因子信号类型策略入场价止损价止盈价操作建议选择理由时间
+ {{ selection.symbol.replace('USDT', '') }} + + + {{ selection.qualified_factors if selection.qualified_factors is not none else selection.score|round|int }}/6 + + + + {{ '做空' if selection.signal_type == 'SHORT' else '做多' }} + + + + {{ selection.strategy_type }} + + + ${{ "%.4f"|format(selection.entry_price) }} + + ${{ "%.4f"|format(selection.stop_loss) }} + + ${{ "%.4f"|format(selection.take_profit) }} + + + {{ selection.action_suggestion }} + + + {{ selection.reason }} + + {{ selection.selection_time.split(' ')[1][:5] if selection.selection_time else '-' }} +
+
+
+ {% endfor %} +
+ {% endfor %} + + + {% if total_count >= current_limit %} +
+ +
+ {% endif %} + {% else %} +
+
+

暂无选币数据

+

请稍后刷新或运行选币程序

+
+ {% endif %} +
+
+ + + + \ No newline at end of file diff --git a/web_app.py b/web_app.py index 54a2b59..a3fd1ac 100644 --- a/web_app.py +++ b/web_app.py @@ -60,7 +60,7 @@ async def dashboard(request: Request, limit: int = 20, offset: int = 0): lambda: engine.get_latest_selections(limit + 5, offset) ) - # 按年月日时分分组选币结果,转换时间为东八区显示 + # 按年月日时分分组选币结果,再按时间级别二级分组 grouped_selections = {} latest_update_time = None @@ -77,12 +77,17 @@ async def dashboard(request: Request, limit: int = 20, offset: int = 0): time_key = beijing_time[:16] # "YYYY-MM-DD HH:MM" if time_key not in grouped_selections: - grouped_selections[time_key] = [] + grouped_selections[time_key] = {} + + # 按时间级别进行二级分组 + timeframe = selection.get('timeframe', '未知') + if timeframe not in grouped_selections[time_key]: + grouped_selections[time_key][timeframe] = [] # 更新选币记录的显示时间,但不修改原始时间 selection_copy = selection.copy() selection_copy['selection_time'] = beijing_time - grouped_selections[time_key].append(selection_copy) + grouped_selections[time_key][timeframe].append(selection_copy) # 按时间降序排序(最新的在前面) sorted_grouped_selections = dict(sorted( @@ -91,13 +96,24 @@ async def dashboard(request: Request, limit: int = 20, offset: int = 0): reverse=True )) - return templates.TemplateResponse("dashboard.html", { + # 对每个时间组内的时间级别进行排序(15m, 1h, 4h, 1d) + timeframe_order = {'15m': 1, '1h': 2, '4h': 3, '1d': 4} + for time_key in sorted_grouped_selections: + sorted_timeframes = dict(sorted( + sorted_grouped_selections[time_key].items(), + key=lambda x: timeframe_order.get(x[0], 99) + )) + sorted_grouped_selections[time_key] = sorted_timeframes + + return templates.TemplateResponse("dashboard_table.html", { "request": request, "grouped_selections": sorted_grouped_selections, - "last_update": latest_update_time + " CST" if latest_update_time else "暂无数据", + "update_time": latest_update_time.split(' ')[1][:5] if latest_update_time else "--:--", + "total_signals": len(selections), + "long_signals": len([s for s in selections if s.get('signal_type') == 'LONG']), + "short_signals": len([s for s in selections if s.get('signal_type') == 'SHORT']), "total_count": len(selections), - "current_limit": limit, - "current_offset": offset + "current_limit": limit }) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @@ -126,6 +142,27 @@ async def get_selections(limit: int = 20, offset: int = 0): "message": str(e) }, status_code=500) +@app.get("/api/refresh") +async def refresh_data(): + """刷新数据API - 仅刷新显示数据,不执行选币""" + try: + # 清除缓存 + cache_data.clear() + + # 获取现有数据进行显示 + selections = engine.get_latest_selections(50, 0) + + return JSONResponse({ + "success": True, + "message": f"数据刷新完成,当前有{len(selections)}个选币记录", + "count": len(selections) + }) + except Exception as e: + return JSONResponse({ + "success": False, + "message": str(e) + }, status_code=500) + @app.post("/api/run_selection") async def run_selection(): """执行选币API"""