"""Three-stage trend-breakout scan pipeline.

Phase 1: market-wide bulk pre-screen -- get_daily_all x 5 days + get_daily_basic -> ~300 candidates
Phase 2: bulk money-flow filter -- get_moneyflow_batch -> at most 100 candidates
Phase 3: per-stock deep analysis -- get_stock_daily(120d) + entry signal + supply/demand score
         -> at most settings.top_stock_count picks
"""
import asyncio
import logging
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

from app.config import settings
from app.data.tushare_client import tushare_client
from app.analysis.technical import add_all_indicators
from app.analysis.breakout_signals import (
    classify_entry_signal,
    score_supply_demand,
    analyze_volume_pattern,
    EntrySignal,
)

logger = logging.getLogger(__name__)

# Trading days used to build the Phase 1 mini time series (must be >= 2).
_TREND_WINDOW_DAYS = 5
# Phase 2 keeps at most this many candidates, ranked by main-force net inflow.
_PHASE2_MAX_CANDIDATES = 100
# Only candidates at positions < this limit (Phase 2 order) get a multi-day money-flow history.
_MONEYFLOW_HISTORY_LIMIT = 80
# Days of per-stock money flow requested for the capital-persistence score.
_MONEYFLOW_HISTORY_DAYS = 5
# How many trade dates _resolve_daily_basic_trade_date inspects before giving up.
_RESOLVE_LOOKBACK = 8

# (threshold, points) tiers for _score_capital; a value scores the first tier it strictly exceeds.
_MAIN_NET_TIERS = ((10000, 35), (5000, 28), (2000, 20), (500, 12), (0, 5))
_INFLOW_RATIO_TIERS = ((15, 15), (10, 12), (5, 8), (0, 4))
_VOLUME_RATIO_TIERS = ((2.0, 15), (1.5, 10), (1.0, 5))


async def scan_trend_breakout(
    trade_date: str | None = None,
    market_temp=None,
    hot_sectors: list | None = None,
    intraday: bool = False,
) -> list[dict]:
    """Run the unified three-stage trend-breakout scan.

    Args:
        trade_date: Target trade date; defaults to ``tushare_client.get_latest_trade_date()``.
            It is then walked back to the most recent date that has both daily_basic and
            daily data (see ``_resolve_daily_basic_trade_date``).
        market_temp, hot_sectors, intraday: Forwarded to ``_deep_analysis``; the scoring in
            this module does not currently read them.

    Returns:
        One dict per recommended stock with keys: ts_code, name, sector, entry_signal_type,
        entry_signal_score, entry_signal_details, trend_timing_score, supply_demand_score,
        capital_score, valuation_score, technical_score, position_score, main_net_inflow,
        inflow_ratio, turnover_rate, volume_ratio, circ_mv, pe, pb, volume_trend,
        demand_supply_ratio, tech_signal. Empty list when a phase leaves no candidates.
    """
    if not trade_date:
        trade_date = tushare_client.get_latest_trade_date()
    trade_date = _resolve_daily_basic_trade_date(trade_date)
    logger.info(f"=== 趋势突破扫描 (trade_date={trade_date}) ===")

    # Phase 1: bulk pre-screen
    candidates = _bulk_pre_filter(trade_date)
    if candidates.empty:
        logger.info("Phase 1 预筛无候选")
        return []
    logger.info(f"Phase 1 预筛: {len(candidates)} 只候选")

    # Phase 2: money-flow filter
    candidates = _bulk_capital_filter(candidates, trade_date)
    if candidates.empty:
        logger.info("Phase 2 资金过滤后无候选")
        return []
    logger.info(f"Phase 2 资金过滤: {len(candidates)} 只候选")

    # Phase 3: per-stock deep analysis
    results = await _deep_analysis(candidates, trade_date, market_temp, hot_sectors or [], intraday)
    logger.info(f"Phase 3 深度分析: {len(results)} 只推荐")
    return results


def _select_recent_dates(dates: list[str], trade_date: str) -> list[str]:
    """Return the last ``_TREND_WINDOW_DAYS`` trade dates up to and including ``trade_date``.

    Only dates <= trade_date are considered, so a later calendar day is never pulled into
    the window. If ``trade_date`` itself is not in ``dates``, it is appended after the
    most recent ``_TREND_WINDOW_DAYS - 1`` earlier dates. Any shorter history is kept
    as-is rather than discarded.
    """
    eligible = [d for d in dates if d <= trade_date]
    recent = eligible[-_TREND_WINDOW_DAYS:]
    if trade_date not in recent:
        recent = recent[-(_TREND_WINDOW_DAYS - 1):] + [trade_date]
    return recent


def _bulk_pre_filter(trade_date: str) -> pd.DataFrame:
    """Phase 1: market-wide bulk pre-screen.

    Builds a mini time series from ``get_daily_all`` over the last few trade dates and keeps
    stocks whose short-term trend passes ``_screen_trends``. The survivors are then filtered
    by ``_filter_daily_basic``, which checks market cap and turnover and excludes ST and
    newly listed names.

    Returns an empty DataFrame when no calendar or daily data is available. With only one
    day of data, the trend screen is skipped and that day's rows go straight to the
    daily_basic filter.
    """
    dates = tushare_client.get_trade_dates()
    if not dates:
        logger.warning("Phase 1: 无法获取交易日历")
        return pd.DataFrame()

    recent_dates = _select_recent_dates(dates, trade_date)
    logger.info(f"Phase 1: 使用交易日 {recent_dates}")

    daily_frames = []
    for d in recent_dates:
        df = tushare_client.get_daily_all(d)
        if df.empty:
            logger.warning(f"Phase 1: {d} 无数据")
            continue
        daily_frames.append(df)
        logger.debug(f"Phase 1: {d} 获取到 {len(df)} 只股票数据")

    # A trend needs at least two days of data.
    if len(daily_frames) < 2:
        if daily_frames:
            logger.info("Phase 1: 仅有1天数据,跳过趋势筛选")
            return _filter_daily_basic(daily_frames[0], trade_date)
        logger.warning("Phase 1: 完全无数据")
        return pd.DataFrame()

    all_daily = pd.concat(daily_frames, ignore_index=True)
    logger.info(f"Phase 1: 拼接后总记录数 {len(all_daily)}")

    trend_data = _screen_trends(all_daily)
    if not trend_data:
        return pd.DataFrame()

    # Merge daily_basic (with a looser turnover floor) onto the trend survivors.
    return _filter_daily_basic(pd.DataFrame(trend_data), trade_date)


def _screen_trends(all_daily: pd.DataFrame) -> list[dict]:
    """Apply the per-stock short-term trend funnel to the concatenated daily rows.

    For each ts_code with at least two rows (sorted by trade_date), the stock is kept when:
      - its code prefix is 00/30/60;
      - the latest pct_chg is present and |pct_chg| <= 9.5;
      - the window return is in (-2%, 20%): a mild pullback is allowed, overheated names are not;
      - the latest close is >= 0.98 x the window's average close;
      - the latest close is within 8% of the window's highest high.

    Logs how many stocks survive each step. Returns one record per surviving stock.
    """
    stock_groups = all_daily.groupby("ts_code")

    total_stocks = len(stock_groups)
    count_has_data = 0
    count_main_board = 0
    count_no_limit = 0
    count_positive_return = 0
    count_above_avg = 0
    count_near_high = 0

    trend_data = []
    for ts_code, group in stock_groups:
        if len(group) < 2:
            continue
        count_has_data += 1

        group = group.sort_values("trade_date")
        latest = group.iloc[-1]

        # NOTE(review): labelled "main board" in the log, but prefix 30 is usually ChiNext
        # and prefix 68 (STAR) is excluded -- confirm this prefix set is intended.
        if ts_code[:2] not in ("00", "30", "60"):
            continue
        count_main_board += 1

        # Drop (approximate) limit-up/limit-down days. The 9.5% cut presumably targets the
        # 10% limit; for wider-limit boards it simply drops large movers.
        pct = latest["pct_chg"]
        if pd.isna(pct) or abs(pct) > 9.5:
            continue
        count_no_limit += 1

        first_close = group.iloc[0]["close"]
        last_close = latest["close"]
        if first_close <= 0:
            continue
        ret_5d = (last_close - first_close) / first_close * 100
        if ret_5d <= -2 or ret_5d >= 20:
            continue
        count_positive_return += 1

        avg_close = group["close"].mean()
        if avg_close <= 0:
            continue
        # Allow the close to sit slightly (2%) below the window average.
        if last_close < avg_close * 0.98:
            continue
        count_above_avg += 1

        # Loose proximity to the window high; hugging the high is not required.
        high_5d = group["high"].max()
        if high_5d <= 0:
            continue
        dist_from_high = (high_5d - last_close) / high_5d * 100
        if dist_from_high > 8:
            continue
        count_near_high += 1

        trend_data.append({
            "ts_code": ts_code,
            "close": last_close,
            "pct_chg_today": pct,
            "return_5d": round(ret_5d, 2),
            "dist_from_high_5d": round(dist_from_high, 2),
            "vol_today": latest["vol"],
        })

    logger.info(
        f"Phase 1 趋势筛选: "
        f"总{total_stocks} → 有数据{count_has_data} → 主板{count_main_board} → "
        f"非涨跌停{count_no_limit} → 5日涨幅>-2%{count_positive_return} → "
        f"近均线{count_above_avg} → 近高点{count_near_high}"
    )
    return trend_data


def _resolve_daily_basic_trade_date(preferred: str) -> str:
    """Return the latest date <= ``preferred`` that has both daily_basic and daily data.

    Intraday, the current day's daily data may not be published yet. This walks back over
    at most ``_RESOLVE_LOOKBACK`` calendar trade dates (``preferred`` is added to the
    calendar if missing) and returns the first date where both ``get_daily_basic`` and
    ``get_daily_all`` are non-empty. Falls back to ``preferred`` when none qualifies.
    """
    dates = tushare_client.get_trade_dates() or []
    if preferred and preferred not in dates:
        dates = sorted([*dates, preferred])
    candidates = [d for d in dates if d <= preferred] if dates else [preferred]

    for date in reversed(candidates[-_RESOLVE_LOOKBACK:]):
        basic = tushare_client.get_daily_basic(date)
        if basic.empty:
            # No need to fetch the (large) daily frame when basic is already missing.
            continue
        daily = tushare_client.get_daily_all(date)
        if daily.empty:
            continue
        if date != preferred:
            logger.info("趋势扫描日频数据回退: %s -> %s", preferred, date)
        return date
    return preferred


def _excluded_codes() -> set:
    """Return ts_codes to exclude: names containing "ST", and stocks listed too recently.

    "Too recently" means list_date later than today minus ``settings.min_list_days``.
    NOTE(review): the cutoff is relative to the wall clock, not the scanned trade_date.
    """
    stock_basic = tushare_client.get_stock_basic()
    if stock_basic.empty:
        return set()
    is_st = stock_basic["name"].str.contains("ST", na=False)
    cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d")
    is_new = stock_basic["list_date"] > cutoff
    return set(stock_basic.loc[is_st | is_new, "ts_code"])


def _filter_daily_basic(candidates: pd.DataFrame, trade_date: str) -> pd.DataFrame:
    """Filter candidates by float market cap and turnover, excluding ST and newly listed stocks.

    Args:
        candidates: Either Phase 1 trend records or a raw ``get_daily_all`` frame; expected
            to carry a ``ts_code`` column.
        trade_date: Date whose daily_basic snapshot is used.

    Returns:
        - ``candidates`` unchanged when daily_basic is unavailable.
        - An empty frame when nothing passes the basic filter.
        - Otherwise the candidates restricted to passing codes. If they lack basic columns,
          turnover_rate / volume_ratio / circ_mv (converted to 100M CNY) / pe / pb are merged in.
    """
    basic = tushare_client.get_daily_basic(trade_date)
    if basic.empty:
        logger.warning("Phase 1: daily_basic 无数据")
        return candidates

    exclude_codes = _excluded_codes()

    # Copy before rescaling: if the client returns a cached frame, converting it in place
    # would divide circ_mv by 10000 again on every later call for the same date.
    basic = basic.copy()
    # Tushare circ_mv is in 10k CNY; convert to 100M CNY to match the settings thresholds.
    basic["circ_mv"] = basic["circ_mv"] / 10000

    # Phase 1 uses a looser turnover floor (at most 2%) to keep more candidates.
    min_tr = min(settings.min_turnover_rate, 2.0)
    basic_filtered = basic[
        (basic["circ_mv"] >= settings.min_circ_mv)
        & (basic["circ_mv"] <= settings.max_circ_mv)
        & (basic["turnover_rate"] >= min_tr)
        & (basic["turnover_rate"] <= settings.max_turnover_rate)
        & (~basic["ts_code"].isin(exclude_codes))
    ].copy()

    logger.info(
        f"Phase 1 daily_basic: 全市场{len(basic)}只 → "
        f"过滤后{len(basic_filtered)}只 "
        f"(circ_mv {settings.min_circ_mv}-{settings.max_circ_mv}亿, "
        f"turnover >={min_tr}%)"
    )

    if basic_filtered.empty:
        return pd.DataFrame()

    has_codes = "ts_code" in candidates.columns
    if "circ_mv" in candidates.columns:
        # Candidates already carry basic columns: keep them, but still restrict to codes that
        # passed the filter so the cap/turnover/ST checks are never bypassed.
        merged = (
            candidates[candidates["ts_code"].isin(basic_filtered["ts_code"])]
            if has_codes
            else candidates
        )
    else:
        basic_subset = basic_filtered[
            ["ts_code", "turnover_rate", "volume_ratio", "circ_mv", "pe", "pb"]
        ].drop_duplicates("ts_code")
        merged = (
            candidates.merge(basic_subset, on="ts_code", how="inner")
            if has_codes
            else basic_subset
        )

    logger.info(f"Phase 1 合并后: {len(merged)} 只候选")
    return merged


def _bulk_capital_filter(candidates: pd.DataFrame, trade_date: str) -> pd.DataFrame:
    """Phase 2: bulk money-flow filter.

    Hard condition: main-force net inflow > 0, where main-force net inflow is
    (extra-large buy - sell) + (large buy - sell). Also computes inflow_ratio:
    main net inflow as a percentage of total traded amount across all order sizes
    (0 when the total is 0 or missing).

    Returns at most ``_PHASE2_MAX_CANDIDATES`` rows sorted by main_net_inflow (descending),
    with a fresh 0..n-1 index. If money-flow data is unavailable, ``candidates`` is
    returned unfiltered.
    """
    mf = tushare_client.get_moneyflow_batch(trade_date)
    if mf.empty:
        return candidates

    main_net = (
        (mf["buy_elg_amount"] - mf["sell_elg_amount"])
        + (mf["buy_lg_amount"] - mf["sell_lg_amount"])
    )
    total = (
        mf["buy_elg_amount"] + mf["sell_elg_amount"]
        + mf["buy_lg_amount"] + mf["sell_lg_amount"]
        + mf["buy_md_amount"] + mf["sell_md_amount"]
        + mf["buy_sm_amount"] + mf["sell_sm_amount"]
    )
    inflow_ratio = (main_net / total.replace(0, np.nan) * 100).fillna(0)
    # assign() returns a new frame, leaving the client's (possibly cached) frame untouched.
    mf = mf.assign(main_net_inflow=main_net, inflow_ratio=inflow_ratio)

    mf_positive = mf.loc[
        mf["main_net_inflow"] > 0, ["ts_code", "main_net_inflow", "inflow_ratio"]
    ]

    if "ts_code" in candidates.columns:
        merged = candidates.merge(mf_positive, on="ts_code", how="inner")
    else:
        merged = mf_positive

    if not merged.empty:
        merged = merged.sort_values("main_net_inflow", ascending=False).head(_PHASE2_MAX_CANDIDATES)

    return merged.reset_index(drop=True)


def _stock_name_maps() -> tuple[dict, dict]:
    """Return (ts_code -> name, ts_code -> industry) maps from ``get_stock_basic``.

    Industry defaults to "" for every code when the column is absent. Both maps are empty
    when stock_basic is empty.
    """
    stock_basic = tushare_client.get_stock_basic()
    if stock_basic.empty:
        return {}, {}
    codes = stock_basic["ts_code"]
    name_map = dict(zip(codes, stock_basic["name"]))
    if "industry" in stock_basic.columns:
        industry_map = dict(zip(codes, stock_basic["industry"]))
    else:
        industry_map = dict.fromkeys(codes, "")
    return name_map, industry_map


async def _deep_analysis(
    candidates: pd.DataFrame,
    trade_date: str,
    market_temp,
    hot_sectors: list,
    intraday: bool,
) -> list[dict]:
    """Phase 3: per-stock deep analysis.

    For each candidate, in the order given:
      1. Fetch 120 daily bars and add technical indicators.
      2. Classify the entry signal; stocks with no signal are skipped.
      3. Score trend, supply/demand, volume pattern, technical signal, capital flow and
         valuation.
    Stops once ``settings.top_stock_count`` results are collected. A failure on one stock
    is logged at debug level and that stock is skipped.

    ``trade_date``, ``market_temp``, ``hot_sectors`` and ``intraday`` are accepted for
    interface compatibility but are not read here.

    The per-stock calls are synchronous. Control is yielded to the event loop every
    10 rows, whichever branch the previous row took.
    """
    # Kept as function-local imports, as in the original module (presumably to avoid an
    # import cycle -- confirm before hoisting).
    from app.analysis.signals import generate_signals
    from app.analysis.capital_flow import _score_valuation

    name_map, industry_map = _stock_name_maps()

    results: list[dict] = []
    total = len(candidates)
    signal_counts = {
        "breakout": 0, "breakout_confirm": 0, "pullback": 0,
        "launch": 0, "reversal": 0, "none": 0,
    }
    has_code_col = "ts_code" in candidates.columns

    for position, (_, row) in enumerate(candidates.iterrows()):
        # Yield at the top of the loop so `continue`/exception paths cannot skip it.
        if position % 10 == 0:
            await asyncio.sleep(0)

        ts_code = row["ts_code"] if has_code_col else ""
        if not ts_code:
            continue

        name = name_map.get(ts_code, ts_code)
        sector = industry_map.get(ts_code, "")

        try:
            # 120 daily bars; too-short histories are skipped.
            df = tushare_client.get_stock_daily(ts_code, 120)
            if df.empty or len(df) < 30:
                continue

            df = add_all_indicators(df)

            entry_signal = classify_entry_signal(df)
            signal_type = entry_signal["signal_type"]
            if signal_type == EntrySignal.NONE:
                signal_counts["none"] += 1
                continue
            # .get() so an unlisted signal value is counted instead of vetoing the stock.
            signal_counts[signal_type.value] = signal_counts.get(signal_type.value, 0) + 1

            trend_score = _simple_trend_score(df)
            sd_score = score_supply_demand(df)
            vol_pattern = analyze_volume_pattern(df)
            # Technical signal, reusing the existing generate_signals.
            tech_signal = generate_signals(ts_code, name)

            # Money-flow history is fetched lazily, only for stocks that reach scoring. The
            # original position cap is kept so capital scores are unchanged.
            mf_history = (
                _fetch_stock_moneyflow(ts_code)
                if position < _MONEYFLOW_HISTORY_LIMIT
                else pd.DataFrame()
            )
            capital_score = _score_capital(row, mf_history)

            # Valuation is an auxiliary reference score.
            pe = row.get("pe")
            pb = row.get("pb")
            valuation_score = _score_valuation(pe, pb)

            results.append({
                "ts_code": ts_code,
                "name": name,
                "sector": sector,
                "entry_signal_type": entry_signal["signal_type"].value,
                "entry_signal_score": round(entry_signal["signal_score"], 1),
                "entry_signal_details": entry_signal.get("details", {}),
                "trend_timing_score": round(trend_score, 1),
                "supply_demand_score": round(sd_score, 1),
                "capital_score": round(capital_score, 1),
                "valuation_score": round(valuation_score, 1),
                "technical_score": round(tech_signal.score, 1),
                "position_score": round(tech_signal.position_score, 1),
                "main_net_inflow": row.get("main_net_inflow", 0),
                "inflow_ratio": round(row.get("inflow_ratio", 0), 2),
                "turnover_rate": row.get("turnover_rate"),
                "volume_ratio": row.get("volume_ratio"),
                "circ_mv": row.get("circ_mv"),
                "pe": pe,
                "pb": pb,
                "volume_trend": vol_pattern["volume_trend"],
                "demand_supply_ratio": vol_pattern["demand_supply_ratio"],
                # Full technical-signal object, kept for building recommendation reasons.
                "tech_signal": tech_signal,
            })

            if len(results) >= settings.top_stock_count:
                break

        except Exception as e:
            logger.debug(f"深度分析 {ts_code} 失败: {e}", exc_info=True)

    logger.info(
        f"Phase 3 入场信号分布: "
        f"突破={signal_counts['breakout']} 确认={signal_counts['breakout_confirm']} "
        f"回踩={signal_counts['pullback']} 启动={signal_counts['launch']} "
        f"反转={signal_counts['reversal']} 无信号={signal_counts['none']} "
        f"(共分析{total}只)"
    )
    return results


def _fetch_stock_moneyflow(ts_code: str) -> pd.DataFrame:
    """Fetch recent per-stock money flow, or an empty DataFrame if unavailable or on error.

    Best-effort: a failure is logged at debug level instead of being silently dropped.
    """
    try:
        df = tushare_client.get_stock_moneyflow(ts_code, _MONEYFLOW_HISTORY_DAYS)
        if not df.empty:
            return df
    except Exception as e:
        logger.debug("获取 %s 资金流失败: %s", ts_code, e)
    return pd.DataFrame()


def _get_multi_day_moneyflow(candidates: pd.DataFrame, trade_date: str) -> dict[str, pd.DataFrame]:
    """Fetch recent money-flow history for the first candidates, used for persistence scoring.

    Queries ``get_stock_moneyflow`` one stock at a time for the first
    ``_MONEYFLOW_HISTORY_LIMIT`` candidates. Only non-empty results are included.
    ``trade_date`` is accepted for compatibility but not used.
    """
    if "ts_code" not in candidates.columns:
        return {}
    result: dict[str, pd.DataFrame] = {}
    for ts_code in candidates["ts_code"].values[:_MONEYFLOW_HISTORY_LIMIT]:
        history = _fetch_stock_moneyflow(ts_code)
        if not history.empty:
            result[ts_code] = history
    return result


def _simple_trend_score(df: pd.DataFrame) -> float:
    """Lightweight trend score in [0, 100], used by the Phase 3 deep analysis.

    - Moving-average alignment (needs ma5/ma10/ma20/ma60 all present and non-NaN on the
      last bar):
        - ma5 > ma10 > ma20 > ma60 -> 50
        - ma5 > ma10 > ma20 -> 35
        - ma5 > ma20 -> 20
    - 20-bar structure (second 10 bars vs first 10 bars):
        - higher high -> +30
        - higher low -> +20
    """
    score = 0
    last = df.iloc[-1]

    ma_cols = [c for c in ("ma5", "ma10", "ma20", "ma60") if c in df.columns]
    if len(ma_cols) == 4 and not any(pd.isna(last[c]) for c in ma_cols):
        if last["ma5"] > last["ma10"] > last["ma20"] > last["ma60"]:
            score += 50
        elif last["ma5"] > last["ma10"] > last["ma20"]:
            score += 35
        elif last["ma5"] > last["ma20"]:
            score += 20

    if len(df) >= 20:
        recent = df.tail(20)
        if recent["high"].iloc[10:].max() > recent["high"].iloc[:10].max():
            score += 30
        if recent["low"].iloc[10:].min() > recent["low"].iloc[:10].min():
            score += 20

    return min(score, 100)


def _tier_points(value, tiers) -> int:
    """Return the points of the first (threshold, points) tier that ``value`` strictly exceeds, else 0.

    NaN exceeds nothing, so it scores 0.
    """
    for threshold, points in tiers:
        if value > threshold:
            return points
    return 0


def _row_main_net(r) -> float:
    """Main-force net inflow of one money-flow row; missing fields count as 0."""
    return (
        r.get("buy_elg_amount", 0) - r.get("sell_elg_amount", 0)
        + r.get("buy_lg_amount", 0) - r.get("sell_lg_amount", 0)
    )


def _score_capital(stock_row: dict | pd.Series, mf_history: pd.DataFrame) -> float:
    """Capital-flow score in [0, 100].

    Components:
    - Same-day main-force net inflow size (35). Amounts are presumably in 10k CNY per
      Tushare moneyflow; confirm.
    - Persistence (35): share of history rows with positive main-force net inflow.
        - >= 3 rows and >= 70% positive -> 35
        - >= 50% positive -> 25
        - at least one positive -> 12
        - History empty or a single row -> flat 15: the stock is assumed to have passed the
          Phase 2 positive-inflow filter.
    - Inflow ratio (15).
    - Volume ratio (15): skipped when volume_ratio is missing/None/0.
    """
    score = 0

    main_net = stock_row.get("main_net_inflow", 0) or 0
    inflow_ratio = stock_row.get("inflow_ratio", 0) or 0
    volume_ratio = stock_row.get("volume_ratio")

    score += _tier_points(main_net, _MAIN_NET_TIERS)

    if not mf_history.empty and len(mf_history) >= 2:
        total_days = len(mf_history)
        positive_days = sum(1 for _, r in mf_history.iterrows() if _row_main_net(r) > 0)
        if total_days >= 3 and positive_days >= total_days * 0.7:
            score += 35
        elif positive_days >= total_days * 0.5:
            score += 25
        elif positive_days >= 1:
            score += 12
    else:
        score += 15

    score += _tier_points(inflow_ratio, _INFLOW_RATIO_TIERS)

    if volume_ratio:
        score += _tier_points(volume_ratio, _VOLUME_RATIO_TIERS)

    return min(score, 100)