"""潜在启动股扫描器(Channel B) 从全市场中寻找底部蓄势、即将启动的股票。 不依赖板块热度,独立于 Channel A(强中选强)。 筛选逻辑: 1. 基本面预筛:PE/PB 合理、市值适中、非 ST 非次新 2. 底部形态:股价在 60 日低点附近,近期不再创新低 3. 缩量蓄势:成交量萎缩至地量水平 4. 技术早期信号:MACD 底部金叉、RSI 从超卖回升等 """ import logging import pandas as pd import numpy as np from datetime import datetime, timedelta from app.data.tushare_client import tushare_client from app.analysis.technical import add_all_indicators from app.analysis.capital_flow import _score_valuation from app.config import settings logger = logging.getLogger(__name__) def scan_potential_breakout( trade_date: str = None, exclude_codes: set[str] = None, ) -> list[dict]: """扫描全市场潜在启动股票 Args: trade_date: 交易日期,默认最新 exclude_codes: 需要排除的股票代码集合(Channel A 已推荐的) Returns: list[dict]: [{ts_code, name, sector, capital_score, ...}] """ if not trade_date: trade_date = tushare_client.get_latest_trade_date() exclude_codes = exclude_codes or set() # ── 第一步:基本面预筛 ── basic_df = tushare_client.get_daily_basic(trade_date) if basic_df.empty: logger.warning("Channel B: daily_basic 数据为空") return [] stock_basic = tushare_client.get_stock_basic() # 过滤 ST st_codes = set() if not stock_basic.empty: st_codes = set(stock_basic[stock_basic["name"].str.contains("ST", na=False)]["ts_code"]) # 过滤次新(上市不足 min_list_days 天) new_codes = set() if not stock_basic.empty: cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d") new_codes = set(stock_basic[stock_basic["list_date"] > cutoff]["ts_code"]) # 基本面硬性条件:仅沪深主板(00/60) + 创业板(30) # 自动排除:北证(xxxxx.BJ)、科创板(68xxxx.SH)、ST、次新 eligible = basic_df[ (basic_df["ts_code"].str.startswith(("00", "30", "60"))) & (~basic_df["ts_code"].isin(st_codes)) & (~basic_df["ts_code"].isin(new_codes)) & (~basic_df["ts_code"].isin(exclude_codes)) ].copy() # PE 筛选:0-60(排除亏损和泡沫) if "pe_ttm" in eligible.columns: eligible = eligible[ eligible["pe_ttm"].notna() & (eligible["pe_ttm"] > 0) & (eligible["pe_ttm"] <= 60) ] # PB 筛选:0-8 if "pb" in eligible.columns: eligible = eligible[ eligible["pb"].notna() & (eligible["pb"] > 0) & (eligible["pb"] <= 8) ] # 流通市值:30-500 亿 if "circ_mv" in eligible.columns: eligible["circ_mv_yi"] = eligible["circ_mv"] / 10000 # 万 → 亿 eligible = eligible[ (eligible["circ_mv_yi"] >= 30) & (eligible["circ_mv_yi"] <= 500) ] # 换手率 > 1%(有一定活跃度) if "turnover_rate" in eligible.columns: eligible = eligible[eligible["turnover_rate"].notna() & (eligible["turnover_rate"] > 1)] # 当日跌幅或微涨(底部区域:涨幅 < 3%,排除已启动的) if "pct_chg" in eligible.columns: eligible = eligible[ eligible["pct_chg"].notna() & (eligible["pct_chg"] < 3) ] if eligible.empty: logger.warning("Channel B: 基本面预筛后无股票") return [] # 按换手率排序取 Top N(活跃度高的优先,避免扫描过多) if "turnover_rate" in eligible.columns: eligible = eligible.sort_values("turnover_rate", ascending=False) max_candidates = 300 # 最多扫描 300 只,控制 K 线请求量 candidate_codes = eligible["ts_code"].head(max_candidates).tolist() logger.info(f"Channel B: 基本面预筛后 {len(eligible)} 只,取 Top {len(candidate_codes)} 只进行技术扫描") # ── 第二步:技术面扫描 ── results = [] processed = 0 for ts_code in candidate_codes: try: df = tushare_client.get_stock_daily(ts_code, days=120) if df.empty or len(df) < 30: continue df = df.sort_values("trade_date").reset_index(drop=True) df = add_all_indicators(df) # 底部形态检测 if not _check_bottom_reversal(df): continue # 缩量蓄势检测 volume_shrink = _check_volume_shrink(df) # 技术早期信号(至少满足一个) early_signal = _check_technical_early_signal(df) if not early_signal: continue # 计算评分 score = _score_potential(df, volume_shrink) # 估值评分 row_data = eligible[eligible["ts_code"] == ts_code] pe = None pb = None circ_mv = None turnover_rate = 0 if not row_data.empty: r = row_data.iloc[0] pe = float(r["pe_ttm"]) if pd.notna(r.get("pe_ttm")) else None pb = float(r["pb"]) if pd.notna(r.get("pb")) else None circ_mv = float(r.get("circ_mv_yi", 0)) or None turnover_rate = float(r.get("turnover_rate", 0) or 0) valuation_score = _score_valuation(pe, pb) # 股票名称 name = ts_code if not stock_basic.empty: name_row = stock_basic[stock_basic["ts_code"] == ts_code] if not name_row.empty: name = name_row.iloc[0]["name"] # 行业作为 sector sector = "" if not stock_basic.empty: ind_row = stock_basic[stock_basic["ts_code"] == ts_code] if not ind_row.empty: sector = str(ind_row.iloc[0].get("industry", "")) results.append({ "ts_code": ts_code, "name": name, "sector": sector, "sectors": [sector] if sector else [], "capital_score": round(score, 1), "valuation_score": round(valuation_score, 1), "main_net_inflow": 0, "inflow_ratio": 0, "turnover_rate": round(turnover_rate, 2), "volume_ratio": None, "circ_mv": circ_mv, "pe": round(pe, 2) if pe else None, "pb": round(pb, 2) if pb else None, }) processed += 1 except Exception as e: logger.debug(f"Channel B 扫描 {ts_code} 异常: {e}") continue # 按评分排序 results.sort(key=lambda x: x["capital_score"], reverse=True) top = results[:settings.top_stock_count] logger.info(f"Channel B: 扫描完成,{len(results)} 只底部股票,取 Top {len(top)}") for r in top: logger.info( f"Channel B: {r['name']}({r['ts_code']}) " f"评分={r['capital_score']} 估值={r['valuation_score']}" ) return top def _check_bottom_reversal(df: pd.DataFrame) -> bool: """底部反转形态检测 条件(满足任一即可): 1. 股价距 60 日低点 < 10%(深度底部) 2. 股价距 60 日低点 < 20% 且近 5 日止跌企稳 """ if len(df) < 20: return False lookback = min(60, len(df)) recent = df.tail(lookback) last = df.iloc[-1] low_60d = recent["low"].min() dist_from_low = (last["close"] - low_60d) / low_60d * 100 # 深度底部:距低点 < 10%,直接通过 if dist_from_low < 10: return True # 一般底部:距低点 < 20% 且近期止跌 if dist_from_low > 20: return False # 止跌检测:近 5 日最低价 vs 之前 5 日最低价 last_5 = df.tail(5) if len(df) >= 10: prev_5 = df.iloc[-10:-5] recent_min = last_5["low"].min() prev_min = prev_5["low"].min() return recent_min >= prev_min * 0.97 # 允许 3% 误差 return False def _check_volume_shrink(df: pd.DataFrame) -> bool: """缩量蓄势检测 条件:近 3 日均量 < vol_ma20 * 0.7 """ if len(df) < 20: return False last = df.iloc[-1] vol_ma20 = last.get("vol_ma10", last["vol"]) # 用 vol_ma10 近似 recent_3_vol = df["vol"].tail(3).mean() return recent_3_vol < vol_ma20 * 0.7 def _check_technical_early_signal(df: pd.DataFrame) -> bool: """技术早期信号检测(满足任一即可) 1. MACD 底部金叉(DIF 从负区上穿 DEA) 2. MACD 柱状线由负转正或缩短(底部动能转换) 3. RSI 从超卖区回升(5 日前 RSI < 40,当前 > 40) 4. 底部放量长阳(近 10 日内有涨幅 >3% 且量 > vol_ma5*1.3) 5. MA5 开始向上拐头(短期趋势转变) 6. 布林带下轨支撑(触及下轨后反弹) """ if len(df) < 10: return False last = df.iloc[-1] prev = df.iloc[-2] # 信号 1:MACD 底部金叉 if (prev["dif"] <= prev["dea"] and last["dif"] > last["dea"] and last["dif"] < 0): return True # 信号 2:MACD 柱状线缩短(绿柱变短,动能衰减) if len(df) >= 3: hist_1 = last["macd_hist"] hist_2 = prev["macd_hist"] hist_3 = df.iloc[-3]["macd_hist"] if hist_1 < 0 and hist_2 < 0 and hist_1 > hist_2 and hist_2 > hist_3: return True # 绿柱连续缩短 # 信号 3:RSI 从超卖区回升 if len(df) >= 6: rsi_5d_ago = df.iloc[-6].get("rsi14", 50) rsi_now = last.get("rsi14", 50) if rsi_5d_ago < 40 and rsi_now > 40: return True # 信号 4:底部放量长阳 recent_10 = df.tail(10) for _, row in recent_10.iterrows(): pct = row.get("pct_chg", 0) or 0 vol = row["vol"] vol_ma = row.get("vol_ma5", vol) if pct > 3 and vol > vol_ma * 1.3: return True # 信号 5:MA5 向上拐头(3 日前下降,现在上升) if len(df) >= 4: ma5_now = last["ma5"] ma5_1d = prev["ma5"] ma5_3d = df.iloc[-4]["ma5"] if ma5_1d < ma5_3d and ma5_now > ma5_1d: # 先降后升,拐头 return True # 信号 6:布林带下轨支撑后反弹 if "boll_lower" in df.columns: recent_5 = df.tail(5) touched = any(row["low"] <= row["boll_lower"] * 1.01 for _, row in recent_5.iterrows()) if touched and last["close"] > last["boll_lower"]: return True return False def _score_potential(df: pd.DataFrame, volume_shrink: bool) -> float: """潜在启动股评分(0-100) 评分维度: - 底部位置得分 (35%): 距 60 日低点越近越好 - 技术信号强度 (30%): MACD/RSI/量价共振 - 缩量蓄势 (20%): 成交量萎缩程度 - 均线形态 (15%): 均线收敛或即将突破 """ score = 0.0 last = df.iloc[-1] lookback = min(60, len(df)) low_60d = df["low"].tail(lookback).min() dist_from_low = (last["close"] - low_60d) / low_60d * 100 # 1) 底部位置得分 (35 分) if dist_from_low < 5: score += 35 # 几乎在最低点 elif dist_from_low < 8: score += 28 elif dist_from_low < 12: score += 20 elif dist_from_low <= 15: score += 12 elif dist_from_low <= 20: score += 6 # 距低点较远但仍属底部区域 # 2) 技术信号强度 (30 分) signal_count = 0 prev = df.iloc[-2] if len(df) >= 2 else last # MACD 金叉 if prev["dif"] <= prev["dea"] and last["dif"] > last["dea"] and last["dif"] < 0: signal_count += 1 # RSI 回升 if len(df) >= 6: rsi_5d_ago = df.iloc[-6].get("rsi14", 50) rsi_now = last.get("rsi14", 50) if rsi_5d_ago < 35 and rsi_now > 40: signal_count += 1 # 放量长阳 recent_10 = df.tail(10) for _, row in recent_10.iterrows(): pct = row.get("pct_chg", 0) or 0 vol = row["vol"] vol_ma = row.get("vol_ma5", vol) if pct > 3 and vol > vol_ma * 1.5: signal_count += 1 break if signal_count >= 3: score += 30 elif signal_count == 2: score += 22 elif signal_count == 1: score += 14 # 3) 缩量蓄势 (20 分) if volume_shrink: recent_3_vol = df["vol"].tail(3).mean() vol_ma10 = last.get("vol_ma10", df["vol"].mean()) shrink_ratio = recent_3_vol / vol_ma10 if vol_ma10 > 0 else 1 if shrink_ratio < 0.4: score += 20 # 极度缩量 elif shrink_ratio < 0.55: score += 15 elif shrink_ratio < 0.7: score += 10 else: score += 3 # 4) 均线形态 (15 分): 短期均线开始上翘 if len(df) >= 20: ma5_now = last["ma5"] ma5_3d = df.iloc[-3]["ma5"] if len(df) >= 3 else ma5_now ma10_now = last["ma10"] ma20_now = last["ma20"] # MA5 上穿 MA10 if ma5_now > ma10_now and df.iloc[-2]["ma5"] <= df.iloc[-2]["ma10"]: score += 15 # 金叉 # MA5 向上 elif ma5_now > ma5_3d: score += 10 # 均线收敛(MA5 接近 MA20) elif abs(ma5_now - ma20_now) / ma20_now < 0.02: score += 8 return score