diff --git a/config/config.yaml b/config/config.yaml index 3d01c79..7e1fd9f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -59,6 +59,7 @@ strategy: timeframes: ["1h", "daily", "weekly"] # 支持的时间周期 scan_stocks_count: 5000 # 扫描股票数量限制 analysis_days: 60 # 分析的历史天数 + follow_up_days: 30 # 后续强势验证天数(形态完成后N天内不回踩阴线高点且不跌破EMA20) # 回踩监控配置 pullback_tolerance: 0.02 # 回踩容忍度(2%),价格接近阴线最高点的阈值 diff --git a/scripts/init_container.sh b/scripts/init_container.sh index d5f48e5..701bfc2 100644 --- a/scripts/init_container.sh +++ b/scripts/init_container.sh @@ -74,6 +74,7 @@ strategy: timeframes: ["1h", "daily", "weekly"] # 支持的时间周期 scan_stocks_count: 100 # Docker环境默认扫描股票数量 analysis_days: 60 # 分析的历史天数 + follow_up_days: 30 # 后续强势验证天数(形态完成后N天内不回踩阴线高点且不跌破EMA20) # 回踩监控配置 pullback_tolerance: 0.02 # 回踩容忍度(2%),价格接近阴线最高点的阈值 diff --git a/src/strategy/kline_pattern_strategy.py b/src/strategy/kline_pattern_strategy.py index bd8204b..3ad8eb7 100644 --- a/src/strategy/kline_pattern_strategy.py +++ b/src/strategy/kline_pattern_strategy.py @@ -110,6 +110,48 @@ class KLinePatternStrategy: return df + def check_follow_up_strength(self, df: pd.DataFrame, pattern_index: int, yin_high: float, ema20_price: float) -> bool: + """ + 检查形态发生后的后续强势条件 + + Args: + df: K线数据DataFrame + pattern_index: 形态完成的索引位置(第4根阳线的位置) + yin_high: 阴线的最高价 + ema20_price: 形态完成时的EMA20价格 + + Returns: + bool: 是否满足后续强势条件 + """ + # 获取配置的后续验证天数,默认为3天 + follow_up_days = self.config.get('follow_up_days', 3) + + # 检查是否有足够的后续数据 + if pattern_index + follow_up_days >= len(df): + # 如果没有足够的后续数据,说明是最新的形态,暂时通过验证 + return True + + # 检查后续N天的K线 + for j in range(1, follow_up_days + 1): + if pattern_index + j >= len(df): + break + + next_kline = df.iloc[pattern_index + j] + + # 检查1: 最低价不能回踩阴线最高价 + if next_kline['low'] <= yin_high: + logger.debug(f"后续第{j}天最低价{next_kline['low']:.2f}回踩阴线最高价{yin_high:.2f},不符合强势条件") + return False + + # 检查2: 收盘价不能跌破EMA20 + next_ema20 = next_kline.get('ema20', ema20_price) + if next_kline['close'] <= next_ema20: + logger.debug(f"后续第{j}天收盘价{next_kline['close']:.2f}跌破EMA20价格{next_ema20:.2f},不符合强势条件") + return False + + logger.debug(f"后续{follow_up_days}天强势验证通过") + return True + def detect_pattern(self, df: pd.DataFrame) -> List[Dict[str, Any]]: """ 检测"两阳线+阴线+阳线"形态 @@ -172,6 +214,12 @@ class KLinePatternStrategy: if not turnover_valid: continue + # 检查后续强势条件(如果有后续K线数据的话) + follow_up_strength_valid = self.check_follow_up_strength(df, i, k3['high'], k4.get('ema20', 0)) + + if not follow_up_strength_valid: + continue + # 构建信号 signal = { 'index': i, @@ -190,7 +238,9 @@ class KLinePatternStrategy: 'breakout_pct': (k4['close'] - k3['high']) / k3['high'] * 100 if k3['high'] > 0 else 0, 'ema20_price': k4.get('ema20', 0), 'above_ema20': k4.get('above_ema20', False), - 'turnover_ratio': turnover_ratio + 'turnover_ratio': turnover_ratio, + 'follow_up_strength': follow_up_strength_valid, + 'follow_up_days': self.config.get('follow_up_days', 3) } signals.append(signal) @@ -204,6 +254,7 @@ class KLinePatternStrategy: logger.info(f"💥 突破幅度: {signal['breakout_pct']:.2f}% (突破阴线最高价{signal['yin_high']:.2f}元)") logger.info(f"📈 EMA20: {signal['ema20_price']:.2f}元 ({'✅上方' if signal['above_ema20'] else '❌下方'})") logger.info(f"🔄 换手率: {signal['turnover_ratio']:.2f}% ({'✅合规' if signal['turnover_ratio'] <= self.max_turnover_ratio else '❌过高'})") + logger.info(f"💪 后续强势: {'✅通过' if signal['follow_up_strength'] else '❌未通过'}({signal['follow_up_days']}天验证)") logger.info("🎯" + "="*60) return signals diff --git a/web/app.py b/web/app.py index 74be1fd..5cd7280 100644 --- a/web/app.py +++ b/web/app.py @@ -7,7 +7,7 @@ AI 智能选股大师 Web 展示界面 import sys from pathlib import Path from flask import Flask, render_template, jsonify, request -from datetime import datetime, date, timedelta +from datetime import datetime, date, timedelta, timezone import pandas as pd # 添加项目根目录到路径 @@ -57,20 +57,32 @@ def signals(): timeframe=timeframe if timeframe else None ) - # 按扫描日期分组,每组内按信号日期排序 + # 按扫描时间(精确到小时)分组,每组内按信号日期排序 signals_grouped = {} if not signals_df.empty: # 确保scan_time是datetime类型 signals_df['scan_time'] = pd.to_datetime(signals_df['scan_time']) - signals_df['scan_date'] = signals_df['scan_time'].dt.date - # 按扫描日期分组 - for scan_date, group in signals_df.groupby('scan_date'): + # 转换为东八区时间 + china_tz = timezone(timedelta(hours=8)) + + # 如果是naive datetime,假设是UTC时间 + if signals_df['scan_time'].dt.tz is None: + signals_df['scan_time'] = signals_df['scan_time'].dt.tz_localize('UTC') + + # 转换为东八区时间 + signals_df['scan_time_china'] = signals_df['scan_time'].dt.tz_convert(china_tz) + + # 创建小时级别的分组键(基于东八区时间) + signals_df['scan_hour'] = signals_df['scan_time_china'].dt.floor('h') + + # 按扫描小时分组 + for scan_hour, group in signals_df.groupby('scan_hour'): # 每组内按信号日期排序(降序) group_sorted = group.sort_values('signal_date', ascending=False) - signals_grouped[scan_date] = group_sorted + signals_grouped[scan_hour] = group_sorted - # 按扫描日期排序(最新的在前) + # 按扫描小时排序(最新的在前) signals_grouped = dict(sorted(signals_grouped.items(), key=lambda x: x[0], reverse=True)) # 分页处理 @@ -80,18 +92,27 @@ def signals(): # 将分组数据展平用于分页 flattened_signals = [] - for scan_date, group in signals_grouped.items(): + for scan_hour, group in signals_grouped.items(): flattened_signals.extend(group.to_dict('records')) paginated_signals = flattened_signals[start_idx:end_idx] - # 重新按扫描日期分组分页后的数据 + # 重新按扫描小时分组分页后的数据 paginated_grouped = {} for signal in paginated_signals: - scan_date = pd.to_datetime(signal['scan_time']).date() - if scan_date not in paginated_grouped: - paginated_grouped[scan_date] = [] - paginated_grouped[scan_date].append(signal) + # 转换为东八区时间进行分组 + china_tz = timezone(timedelta(hours=8)) + + scan_time = pd.to_datetime(signal['scan_time']) + # 如果是naive datetime,假设是UTC时间 + if scan_time.tz is None: + scan_time = scan_time.tz_localize('UTC') + # 转换为东八区时间并按小时分组 + scan_hour = scan_time.tz_convert(china_tz).floor('h') + + if scan_hour not in paginated_grouped: + paginated_grouped[scan_hour] = [] + paginated_grouped[scan_hour].append(signal) # 计算分页信息 total_pages = (total_records + per_page - 1) // per_page @@ -188,8 +209,7 @@ def datetime_format(value, format='%Y-%m-%d %H:%M'): if value is None: return '' - # 导入时区支持 - from datetime import timezone, timedelta + # 使用已导入的时区支持 if isinstance(value, str): try: @@ -244,4 +264,4 @@ if __name__ == '__main__': print("📊 访问地址: http://localhost:8080") print("=" * 60) - app.run(host='0.0.0.0', port=8080, debug=True) \ No newline at end of file + app.run(host='0.0.0.0', port=8081, debug=True) \ No newline at end of file diff --git a/web/templates/signals.html b/web/templates/signals.html index c9f19de..b667235 100644 --- a/web/templates/signals.html +++ b/web/templates/signals.html @@ -55,16 +55,16 @@