""" K线形态策略模块 实现"两阳线+阴线+阳线"形态识别策略 """ import pandas as pd import numpy as np from typing import Dict, List, Tuple, Optional, Any from datetime import datetime, timedelta from loguru import logger from ..data.data_fetcher import ADataFetcher from ..utils.notification import NotificationManager from ..database.database_manager import DatabaseManager class KLinePatternStrategy: """K线形态策略类""" def __init__(self, data_fetcher: ADataFetcher, notification_manager: NotificationManager, config: Dict[str, Any], db_manager: DatabaseManager = None): """ 初始化K线形态策略 Args: data_fetcher: 数据获取器 notification_manager: 通知管理器 config: 策略配置 db_manager: 数据库管理器 """ self.data_fetcher = data_fetcher self.notification_manager = notification_manager self.config = config self.db_manager = db_manager or DatabaseManager() # 策略参数 self.strategy_name = "K线形态策略" self.min_entity_ratio = config.get('min_entity_ratio', 0.55) # 前两根阳线实体部分最小比例 self.final_yang_min_ratio = config.get('final_yang_min_ratio', 0.40) # 最后阳线实体部分最小比例 self.max_turnover_ratio = config.get('max_turnover_ratio', 40.0) # 最后阳线最大换手率(%) self.timeframes = config.get('timeframes', ['daily', 'weekly']) # 支持的时间周期 # 回踩监控参数 self.pullback_tolerance = config.get('pullback_tolerance', 0.02) # 回踩容忍度(2%) self.monitor_days = config.get('monitor_days', 30) # 监控回踩的天数 # 存储已触发的信号,用于监控回踩 # 格式: {stock_code: {'signals': [signal_dict], 'last_check_date': date}} self.triggered_signals = {} # 确保策略在数据库中存在 self.strategy_id = self.db_manager.create_or_update_strategy( strategy_name=self.strategy_name, strategy_type="kline_pattern", description="两阳线+阴线+阳线突破形态识别策略", config=config ) logger.info(f"K线形态策略初始化完成 (策略ID: {self.strategy_id})") def calculate_kline_features(self, df: pd.DataFrame) -> pd.DataFrame: """ 计算K线特征指标 Args: df: K线数据DataFrame,包含 open, high, low, close 列 Returns: 添加了特征指标的DataFrame """ if df.empty or len(df) < 4: return df # 确保列名正确 required_cols = ['open', 'high', 'low', 'close'] if not all(col in df.columns for col in required_cols): logger.warning(f"K线数据缺少必要字段: {required_cols}") return df df = df.copy() # 计算涨跌情况 df['is_yang'] = df['close'] > df['open'] # 阳线 df['is_yin'] = df['close'] < df['open'] # 阴线 # 计算实体部分和振幅 df['entity'] = abs(df['close'] - df['open']) # 实体长度 df['amplitude'] = df['high'] - df['low'] # 振幅 # 计算实体占振幅的比例 df['entity_ratio'] = np.where(df['amplitude'] > 0, df['entity'] / df['amplitude'], 0) # 计算涨跌幅 df['change_pct'] = (df['close'] - df['open']) / df['open'] * 100 # 计算EMA20指标 df['ema20'] = df['close'].ewm(span=20, adjust=False).mean() # 判断是否在EMA20上方 df['above_ema20'] = df['close'] > df['ema20'] # 计算换手率(如果存在volume和float_share列) if 'volume' in df.columns and 'float_share' in df.columns: # 换手率 = 成交量 / 流通股本 * 100% df['turnover_ratio'] = np.where(df['float_share'] > 0, (df['volume'] / df['float_share']) * 100, 0) elif 'turnover_ratio' not in df.columns: # 如果数据中没有换手率,设为0(不进行此项约束) df['turnover_ratio'] = 0 return df def detect_pattern(self, df: pd.DataFrame) -> List[Dict[str, Any]]: """ 检测"两阳线+阴线+阳线"形态 Args: df: 包含特征指标的K线数据 Returns: 检测到的形态信号列表 """ signals = [] if df.empty or len(df) < 4: return signals # 从第4个数据点开始检测(需要4根K线) for i in range(3, len(df)): # 获取连续4根K线 k1, k2, k3, k4 = df.iloc[i-3:i+1].to_dict('records') # 检查形态:两阳线 + 阴线 + 阳线 pattern_match = ( k1['is_yang'] and k2['is_yang'] and # 前两根是阳线 k3['is_yin'] and # 第三根是阴线 k4['is_yang'] # 第四根是阳线 ) if not pattern_match: continue # 检查前两根阳线的实体比例 yang1_valid = k1['entity_ratio'] >= self.min_entity_ratio yang2_valid = k2['entity_ratio'] >= self.min_entity_ratio if not (yang1_valid and yang2_valid): continue # 检查最后一根阳线的收盘价是否高于阴线的最高价 breakout_valid = k4['close'] > k3['high'] if not breakout_valid: continue # 检查最后一根阳线的实体比例 final_yang_valid = k4['entity_ratio'] >= self.final_yang_min_ratio if not final_yang_valid: continue # 检查最后一根阳线是否在EMA20上方 ema20_valid = k4.get('above_ema20', False) if not ema20_valid: continue # 检查最后一根阳线的换手率 turnover_ratio = k4.get('turnover_ratio', 0) turnover_valid = turnover_ratio <= self.max_turnover_ratio if not turnover_valid: continue # 构建信号 signal = { 'index': i, 'date': df.iloc[i].get('trade_date', df.index[i]), 'pattern_type': '两阳+阴+阳突破', 'k1': k1, # 第一根阳线 'k2': k2, # 第二根阳线 'k3': k3, # 阴线 'k4': k4, # 突破阳线 'yang1_entity_ratio': k1['entity_ratio'], 'yang2_entity_ratio': k2['entity_ratio'], 'final_yang_entity_ratio': k4['entity_ratio'], 'breakout_price': k4['close'], 'yin_high': k3['high'], 'breakout_amount': k4['close'] - k3['high'], 'breakout_pct': (k4['close'] - k3['high']) / k3['high'] * 100 if k3['high'] > 0 else 0, 'ema20_price': k4.get('ema20', 0), 'above_ema20': k4.get('above_ema20', False), 'turnover_ratio': turnover_ratio } signals.append(signal) # 美化信号发现日志 logger.info("🎯" + "="*60) logger.info(f"📈 发现K线形态突破信号!") logger.info(f"📅 信号时间: {signal['date']}") logger.info(f"💰 突破价格: {signal['breakout_price']:.2f}元") logger.info(f"📊 实体比例: 阳线1({signal['yang1_entity_ratio']:.1%}) | 阳线2({signal['yang2_entity_ratio']:.1%}) | 最后阳线({signal['final_yang_entity_ratio']:.1%})") logger.info(f"💥 突破幅度: {signal['breakout_pct']:.2f}% (突破阴线最高价{signal['yin_high']:.2f}元)") logger.info(f"📈 EMA20: {signal['ema20_price']:.2f}元 ({'✅上方' if signal['above_ema20'] else '❌下方'})") logger.info(f"🔄 换手率: {signal['turnover_ratio']:.2f}% ({'✅合规' if signal['turnover_ratio'] <= self.max_turnover_ratio else '❌过高'})") logger.info("🎯" + "="*60) return signals def analyze_stock(self, stock_code: str, stock_name: str = None, days: int = 60, session_id: Optional[int] = None) -> Dict[str, List[Dict[str, Any]]]: """ 分析单只股票的K线形态 Args: stock_code: 股票代码 stock_name: 股票名称 days: 分析的天数 Returns: 各时间周期的信号字典 """ results = {} if stock_name is None: # 尝试获取股票中文名称 stock_name = self.data_fetcher.get_stock_name(stock_code) try: # 计算开始日期,针对不同周期调整时间范围 end_date = datetime.now().strftime('%Y-%m-%d') for timeframe in self.timeframes: # 针对1小时周期调整分析天数,避免数据量过大 if timeframe == '1h': # 1小时数据只分析最近7天 analysis_days = min(days, 7) else: analysis_days = days start_date = (datetime.now() - timedelta(days=analysis_days)).strftime('%Y-%m-%d') logger.info(f"🔍 分析股票: {stock_code}({stock_name}) | 周期: {timeframe}") # 获取历史数据 - 直接使用adata的原生周期支持 df = self.data_fetcher.get_historical_data(stock_code, start_date, end_date, timeframe) if df.empty: logger.warning(f"{stock_code} {timeframe} 数据为空") results[timeframe] = [] continue # 计算K线特征 df_with_features = self.calculate_kline_features(df) # 检测形态 signals = self.detect_pattern(df_with_features) # 处理信号 for signal in signals: signal['stock_code'] = stock_code signal['stock_name'] = stock_name signal['timeframe'] = timeframe # 将信号添加到监控列表 self.add_triggered_signal(signal) # 保存信号到数据库(如果提供了session_id) if session_id is not None: try: signal_id = self.db_manager.save_stock_signal( session_id=session_id, strategy_id=self.strategy_id, signal=signal ) signal['signal_id'] = signal_id logger.debug(f"信号已保存到数据库: {stock_code} (ID: {signal_id})") except Exception as e: logger.error(f"保存信号到数据库失败: {e}") results[timeframe] = signals # 美化信号统计日志 if signals: logger.info(f"✅ {stock_code}({stock_name}) {timeframe}周期: 发现 {len(signals)} 个信号") for i, signal in enumerate(signals, 1): logger.info(f" 📊 信号{i}: {signal['date']} | 价格: {signal['breakout_price']:.2f}元 | 实体: {signal['final_yang_entity_ratio']:.1%}") else: logger.debug(f"📭 {stock_code}({stock_name}) {timeframe}周期: 无信号") except Exception as e: logger.error(f"分析股票 {stock_code} 失败: {e}") for timeframe in self.timeframes: results[timeframe] = [] return results def check_pullback_signals(self, stock_code: str, current_data: pd.DataFrame) -> List[Dict[str, Any]]: """ 检查已触发信号的价格回踩情况 Args: stock_code: 股票代码 current_data: 当前K线数据 Returns: 回踩提醒信号列表 """ pullback_alerts = [] if stock_code not in self.triggered_signals: return pullback_alerts signals = self.triggered_signals[stock_code]['signals'] current_date = datetime.now().date() if current_data.empty: return pullback_alerts # 获取最新价格 latest_price = current_data.iloc[-1]['close'] latest_low = current_data.iloc[-1]['low'] latest_date = current_data.iloc[-1].get('trade_date', current_data.index[-1]) if isinstance(latest_date, str): latest_date = pd.to_datetime(latest_date).date() elif hasattr(latest_date, 'date'): latest_date = latest_date.date() for signal in signals: # 检查信号是否在监控期内 signal_date = signal['date'] if isinstance(signal_date, str): signal_date = pd.to_datetime(signal_date).date() elif hasattr(signal_date, 'date'): signal_date = signal_date.date() days_since_signal = (current_date - signal_date).days if days_since_signal > self.monitor_days: continue yin_high = signal['yin_high'] # 阴线最高点 breakout_price = signal['breakout_price'] # 突破时价格 # 检查是否发生回踩 # 条件1: 最低价接近或跌破阴线最高点 pullback_to_yin_high = latest_low <= (yin_high * (1 + self.pullback_tolerance)) # 条件2: 当前价格相比突破价格有明显回调 significant_pullback = latest_price < (breakout_price * 0.95) # 回调超过5% if pullback_to_yin_high and significant_pullback: # 检查是否已经发送过此类提醒(避免重复) alert_key = f"{stock_code}_{signal_date}_{latest_date}" if not hasattr(self, '_sent_pullback_alerts'): self._sent_pullback_alerts = set() if alert_key not in self._sent_pullback_alerts: pullback_alert = { 'stock_code': stock_code, 'stock_name': signal.get('stock_name', ''), 'signal_date': signal_date, 'current_date': latest_date, 'timeframe': signal.get('timeframe', 'daily'), 'yin_high': yin_high, 'breakout_price': breakout_price, 'current_price': latest_price, 'current_low': latest_low, 'pullback_pct': ((latest_price - breakout_price) / breakout_price) * 100, 'distance_to_yin_high': ((latest_low - yin_high) / yin_high) * 100, 'days_since_signal': days_since_signal, 'alert_type': 'pullback_to_yin_high' } pullback_alerts.append(pullback_alert) self._sent_pullback_alerts.add(alert_key) # 记录回踩提醒日志 logger.warning("⚠️" + "="*60) logger.warning(f"📉 价格回踩阴线最高点提醒!") logger.warning(f"📅 原信号时间: {signal_date} | 当前时间: {latest_date}") logger.warning(f"🏷️ 股票: {stock_code}({signal.get('stock_name', '')})") logger.warning(f"📊 周期: {signal.get('timeframe', 'daily')}") logger.warning(f"💰 阴线最高点: {yin_high:.2f}元") logger.warning(f"🚀 当时突破价: {breakout_price:.2f}元") logger.warning(f"💸 当前价格: {latest_price:.2f}元 | 最低: {latest_low:.2f}元") logger.warning(f"📉 回调幅度: {pullback_alert['pullback_pct']:.2f}%") logger.warning(f"📏 距阴线高点: {pullback_alert['distance_to_yin_high']:.2f}%") logger.warning(f"⏰ 信号后经过: {days_since_signal}天") logger.warning("⚠️" + "="*60) return pullback_alerts def add_triggered_signal(self, signal: Dict[str, Any]): """ 添加已触发的信号到监控列表 Args: signal: 信号字典 """ stock_code = signal.get('stock_code') if not stock_code: return if stock_code not in self.triggered_signals: self.triggered_signals[stock_code] = { 'signals': [], 'last_check_date': datetime.now().date() } # 添加信号到监控列表 self.triggered_signals[stock_code]['signals'].append(signal) # 只保留最近的信号(避免内存占用过多) max_signals_per_stock = 10 if len(self.triggered_signals[stock_code]['signals']) > max_signals_per_stock: # 按日期排序,保留最新的信号 self.triggered_signals[stock_code]['signals'].sort( key=lambda x: pd.to_datetime(x['date']) if isinstance(x['date'], str) else x['date'], reverse=True ) self.triggered_signals[stock_code]['signals'] = \ self.triggered_signals[stock_code]['signals'][:max_signals_per_stock] def monitor_pullback_for_triggered_signals(self) -> List[Dict[str, Any]]: """ 监控所有已触发信号的回踩情况 Returns: 所有回踩提醒信号列表 """ all_pullback_alerts = [] current_date = datetime.now().date() # 清理过期的信号 stocks_to_remove = [] for stock_code, signal_info in self.triggered_signals.items(): # 过滤掉过期的信号 valid_signals = [] for signal in signal_info['signals']: signal_date = signal['date'] if isinstance(signal_date, str): signal_date = pd.to_datetime(signal_date).date() elif hasattr(signal_date, 'date'): signal_date = signal_date.date() days_since_signal = (current_date - signal_date).days if days_since_signal <= self.monitor_days: valid_signals.append(signal) if valid_signals: self.triggered_signals[stock_code]['signals'] = valid_signals else: stocks_to_remove.append(stock_code) # 移除没有有效信号的股票 for stock_code in stocks_to_remove: del self.triggered_signals[stock_code] logger.info(f"🔍 当前监控中的股票数量: {len(self.triggered_signals)}") # 检查每只股票的回踩情况 for stock_code in self.triggered_signals.keys(): try: # 获取最近几天的数据 end_date = current_date.strftime('%Y-%m-%d') start_date = (current_date - timedelta(days=5)).strftime('%Y-%m-%d') current_data = self.data_fetcher.get_historical_data( stock_code, start_date, end_date, 'daily' ) if not current_data.empty: pullback_alerts = self.check_pullback_signals(stock_code, current_data) all_pullback_alerts.extend(pullback_alerts) except Exception as e: logger.error(f"监控股票 {stock_code} 回踩情况失败: {e}") # 发送回踩提醒通知 if all_pullback_alerts: try: success = self.notification_manager.send_pullback_alerts(all_pullback_alerts) if success: logger.info(f"📱 回踩提醒通知发送完成,共{len(all_pullback_alerts)}个提醒") else: logger.warning(f"📱 回踩提醒通知发送失败") except Exception as e: logger.error(f"发送回踩提醒通知失败: {e}") return all_pullback_alerts def _convert_to_weekly(self, daily_df: pd.DataFrame) -> pd.DataFrame: """ 将日线数据转换为周线数据 Args: daily_df: 日线数据 Returns: 周线数据 """ if daily_df.empty: return daily_df try: df = daily_df.copy() # 确保有trade_date列并设置为索引 if 'trade_date' in df.columns: df['trade_date'] = pd.to_datetime(df['trade_date']) df.set_index('trade_date', inplace=True) # 按周聚合 weekly_df = df.resample('W').agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' if 'volume' in df.columns else 'last' }).dropna() # 重置索引,保持trade_date列 weekly_df.reset_index(inplace=True) return weekly_df except Exception as e: logger.error(f"转换周线数据失败: {e}") return pd.DataFrame() def _convert_to_monthly(self, daily_df: pd.DataFrame) -> pd.DataFrame: """ 将日线数据转换为月线数据 Args: daily_df: 日线数据 Returns: 月线数据 """ if daily_df.empty: return daily_df try: df = daily_df.copy() # 确保有trade_date列并设置为索引 if 'trade_date' in df.columns: df['trade_date'] = pd.to_datetime(df['trade_date']) df.set_index('trade_date', inplace=True) # 按月聚合 monthly_df = df.resample('ME').agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' if 'volume' in df.columns else 'last' }).dropna() # 重置索引,保持trade_date列 monthly_df.reset_index(inplace=True) return monthly_df except Exception as e: logger.error(f"转换月线数据失败: {e}") return pd.DataFrame() def scan_market(self, stock_list: List[str] = None, max_stocks: int = 100, use_hot_stocks: bool = True, use_combined_sources: bool = True, use_all_a_shares: bool = False) -> Dict[str, Dict[str, List[Dict[str, Any]]]]: """ 扫描市场中的股票形态 Args: stock_list: 股票代码列表,如果为None则自动选择股票池 max_stocks: 最大扫描股票数量 use_hot_stocks: 是否使用热门股票数据,默认True use_combined_sources: 是否使用合并的双数据源(同花顺+东财),默认True use_all_a_shares: 是否使用所有A股股票(排除北交所和ST),优先级最高 Returns: 所有股票的分析结果 """ logger.info("🚀" + "="*70) logger.info("🌍 开始市场K线形态扫描") logger.info("🚀" + "="*70) # 创建扫描会话 scan_config = { 'max_stocks': max_stocks, 'use_hot_stocks': use_hot_stocks, 'use_combined_sources': use_combined_sources, 'use_all_a_shares': use_all_a_shares, 'timeframes': self.timeframes } session_id = self.db_manager.create_scan_session( strategy_id=self.strategy_id, scan_config=scan_config ) if stock_list is None: # 优先级1: 使用所有A股股票 if use_all_a_shares: try: logger.info("📊 获取所有A股股票数据(排除北交所和ST股票)...") filtered_stocks = self.data_fetcher.get_filtered_a_share_list() if not filtered_stocks.empty: # 如果max_stocks小于总股票数,随机采样 if max_stocks > 0 and max_stocks < len(filtered_stocks): # 按市值排序或随机选择,这里先随机选择 selected_stocks = filtered_stocks.sample(max_stocks) stock_list = selected_stocks['full_stock_code'].tolist() logger.info(f"📈 从{len(filtered_stocks)}只A股中随机选择{len(stock_list)}只进行分析") else: stock_list = filtered_stocks['full_stock_code'].tolist() logger.info(f"📈 分析全部{len(stock_list)}只A股股票") source_info = "全A股(排除北交所和ST)" else: logger.warning("获取A股列表失败,回退到热门股票") use_all_a_shares = False except Exception as e: logger.error(f"获取A股列表失败: {e},回退到热门股票") use_all_a_shares = False # 优先级2: 使用热门股票数据 if not use_all_a_shares and use_hot_stocks: try: if use_combined_sources: # 使用合并的双数据源 logger.info("获取合并热门股票数据(同花顺+东财)...") hot_stocks = self.data_fetcher.get_combined_hot_stocks( limit_per_source=max_stocks, final_limit=max_stocks ) source_info = "双数据源合并" else: # 仅使用同花顺数据 logger.info("获取同花顺热股TOP100数据...") hot_stocks = self.data_fetcher.get_hot_stocks_ths(limit=max_stocks) source_info = "同花顺热股" if not hot_stocks.empty and 'stock_code' in hot_stocks.columns: stock_list = hot_stocks['stock_code'].tolist() # 统计数据源分布 if 'source' in hot_stocks.columns: source_counts = hot_stocks['source'].value_counts().to_dict() source_detail = " | ".join([f"{k}: {v}只" for k, v in source_counts.items()]) logger.info(f"📊 数据源: {source_info} | 总计: {len(stock_list)}只股票") logger.info(f"📈 分布详情: {source_detail}") else: logger.info(f"📊 数据源: {source_info} | 总计: {len(stock_list)}只股票") else: logger.warning("热门股票数据为空,回退到全市场股票") use_hot_stocks = False except Exception as e: logger.error(f"获取热门股票失败: {e},回退到全市场股票") use_hot_stocks = False # 优先级3: 如果热股获取失败,使用全市场股票列表 if not use_all_a_shares and not use_hot_stocks: try: all_stocks = self.data_fetcher.get_stock_list() if not all_stocks.empty: # 随机选择一些股票进行扫描 stock_list = all_stocks['stock_code'].head(max_stocks).tolist() logger.info(f"使用全市场股票数据,共{len(stock_list)}只股票") else: logger.warning("未能获取股票列表") return {} except Exception as e: logger.error(f"获取股票列表失败: {e}") return {} results = {} total_signals = 0 for i, stock_code in enumerate(stock_list): # 获取股票名称 stock_name = self.data_fetcher.get_stock_name(stock_code) logger.info(f"⏳ 扫描进度: [{i+1:3d}/{len(stock_list):3d}] 🔍 {stock_code}({stock_name})") try: stock_results = self.analyze_stock(stock_code, session_id=session_id) # 统计信号数量 stock_signal_count = sum(len(signals) for signals in stock_results.values()) if stock_signal_count > 0: results[stock_code] = stock_results total_signals += stock_signal_count except Exception as e: logger.error(f"扫描股票 {stock_code} 失败: {e}") continue # 更新扫描会话统计 try: self.db_manager.update_scan_session_stats( session_id=session_id, total_scanned=len(stock_list), total_signals=total_signals ) logger.debug(f"扫描会话统计已更新: {session_id}") except Exception as e: logger.error(f"更新扫描会话统计失败: {e}") # 美化最终扫描结果 logger.info("🎉" + "="*70) logger.info(f"🌍 市场K线形态扫描完成!") logger.info(f"📊 扫描统计:") logger.info(f" 🔍 总扫描股票: {len(stock_list)} 只") logger.info(f" 🎯 发现信号: {total_signals} 个") logger.info(f" 📈 涉及股票: {len(results)} 只") logger.info(f" 💾 扫描会话ID: {session_id}") if results: logger.info(f"📋 信号详情:") signal_count = 0 for stock_code, stock_results in results.items(): stock_name = self.data_fetcher.get_stock_name(stock_code) for timeframe, signals in stock_results.items(): if signals: for signal in signals: signal_count += 1 logger.info(f" 🎯 #{signal_count}: {stock_code}({stock_name}) | {timeframe} | {signal['date']} | {signal['breakout_price']:.2f}元") logger.info("🎉" + "="*70) # 监控已触发信号的回踩情况 logger.info("🔍 开始监控已触发信号的回踩情况...") pullback_alerts = self.monitor_pullback_for_triggered_signals() if pullback_alerts: logger.info(f"⚠️ 发现 {len(pullback_alerts)} 个回踩提醒") # 发送汇总通知 if results: # 判断数据源类型 data_source = '全市场股票' if stock_list and len(stock_list) <= max_stocks: if use_hot_stocks: data_source = '合并热门股票' if use_combined_sources else '热门股票' scan_stats = { 'total_scanned': len(stock_list), 'data_source': data_source } try: success = self.notification_manager.send_strategy_summary(results, scan_stats) if success: logger.info("📱 策略信号汇总通知发送完成") else: logger.warning("📱 策略信号汇总通知发送失败") except Exception as e: logger.error(f"发送汇总通知失败: {e}") return results def get_strategy_summary(self) -> str: """获取策略说明""" return f""" K线形态策略 - 两阳线+阴线+阳线突破 策略逻辑: 1. 识别连续4根K线:阳线 + 阳线 + 阴线 + 阳线 2. 前两根阳线实体部分须占振幅的 {self.min_entity_ratio:.0%} 以上 3. 最后阳线实体部分须占振幅的 {self.final_yang_min_ratio:.0%} 以上 4. 最后阳线收盘价须高于阴线最高价(突破确认) 5. 最后阳线收盘价须在EMA20上方(趋势确认) 6. 最后阳线换手率不高于 {self.max_turnover_ratio:.1f}%(流动性约束) 7. 支持时间周期:{', '.join(self.timeframes)} 回踩监控功能: - 自动监控已触发信号后的价格走势 - 当价格回踩到阴线最高点附近时发送特殊提醒 - 回踩容忍度:{self.pullback_tolerance:.0%} - 监控期限:信号触发后 {self.monitor_days} 天 - 提醒条件:价格接近阴线最高点且相比突破价有明显回调 信号触发条件: - 形态完整匹配 - 实体比例达标 - 价格突破确认 - EMA20趋势确认 - 换手率约束达标 扫描范围: - 优先使用双数据源合并(同花顺热股+东财人气榜) - 自动去重,保留最优质股票 - 回退到全市场股票列表 通知方式: - 钉钉webhook汇总推送(10个信号一组分批发送) - 价格回踩特殊提醒(5个提醒一组分批发送) - 包含关键信息:代码、股票名称、K线时间、价格、周期等 - 系统日志详细记录 """ if __name__ == "__main__": # 测试代码 from ..data.data_fetcher import ADataFetcher from ..utils.notification import NotificationManager # 模拟配置 strategy_config = { 'min_entity_ratio': 0.55, 'timeframes': ['daily'] } notification_config = { 'dingtalk': { 'enabled': False, 'webhook_url': '' } } # 初始化组件 data_fetcher = ADataFetcher() notification_manager = NotificationManager(notification_config) strategy = KLinePatternStrategy(data_fetcher, notification_manager, strategy_config) print("K线形态策略初始化完成") print(strategy.get_strategy_summary())