"""板块热度扫描 综合板块涨幅、资金净流入、涨停家数、持续性, 输出热门板块排名。 优化策略:先用板块资金流向批量数据预筛 Top 板块, 只对 Top 板块做逐个详细查询(ths_daily/ths_member), 避免遍历全部数百个板块导致大量 API 调用。 """ import logging import pandas as pd import numpy as np from datetime import datetime, timedelta from app.data.tushare_client import tushare_client from app.data.models import SectorInfo from app.config import settings logger = logging.getLogger(__name__) # 预筛阶段取 Top N 板块做详细查询 PRE_FILTER_COUNT = 30 def _normalize_score(values: list[float], reverse: bool = False) -> list[float]: """将数值列表归一化到 0-100""" if not values: return [] min_v, max_v = min(values), max(values) if max_v == min_v: return [50.0] * len(values) normalized = [(v - min_v) / (max_v - min_v) * 100 for v in values] if reverse: normalized = [100 - v for v in normalized] return normalized def scan_hot_sectors(trade_date: str = None) -> list[SectorInfo]: """扫描热门板块,返回按热度排名的板块列表""" if not trade_date: trade_date = tushare_client.get_latest_trade_date() # ── 第一步:用板块资金流向批量预筛(1 次 API 调用)── sector_mf = tushare_client.get_sector_moneyflow(trade_date) if sector_mf.empty: logger.warning("板块资金流向数据为空") return [] # 按净流入金额排序,取 Top N 做详细分析 sector_mf = sector_mf.sort_values("net_amount", ascending=False) top_mf = sector_mf.head(PRE_FILTER_COUNT) top_codes = set(top_mf["ts_code"].tolist()) logger.info(f"板块资金流向预筛: {len(sector_mf)} 个板块 -> Top {len(top_codes)} 进入详细分析") # 构建资金流向查找表 mf_lookup = {} for _, row in sector_mf.iterrows(): mf_lookup[row["ts_code"]] = float(row["net_amount"]) # 构建板块名称查找表 # moneyflow_ind_ths 返回的是行业板块(881XXX.TI),自带 industry 列 name_lookup = {} if "industry" in sector_mf.columns: for _, r in sector_mf.iterrows(): if pd.notna(r.get("industry")): name_lookup[r["ts_code"]] = str(r["industry"]) # 补充:从 ths_index type=I(行业板块)获取名称 index_list = tushare_client.get_ths_index_list("I") if not index_list.empty: for _, r in index_list.iterrows(): if r["ts_code"] not in name_lookup: name_lookup[r["ts_code"]] = r["name"] # ── 第二步:获取涨跌停列表(1 次 API 调用)── limit_df = tushare_client.get_limit_list(trade_date) limit_up_codes = set() if not limit_df.empty: up_df = limit_df[limit_df["limit"] == "U"] up_df = up_df[~up_df["name"].str.contains("ST", na=False)] limit_up_codes = set(up_df["ts_code"].tolist()) # ── 第三步:只对 Top 板块做逐个详细查询 ── sectors = [] for ts_code in top_codes: # 板块名称从 ths_index 查找表获取 sector_name = name_lookup.get(ts_code, ts_code) # 板块日线 - 获取近5日数据(1 次 API) ths_daily = tushare_client.get_ths_daily(ts_code, days=5) pct_change = 0.0 days_continuous = 0 if not ths_daily.empty: ths_daily = ths_daily.sort_values("trade_date") # 当日涨跌幅 today_data = ths_daily[ths_daily["trade_date"] == trade_date] if today_data.empty: today_data = ths_daily.tail(1) pct_change = float(today_data["pct_change"].iloc[0]) if not today_data.empty else 0 # 连续上涨天数 for _, d in ths_daily.iloc[::-1].iterrows(): if d["pct_change"] > 0: days_continuous += 1 else: break # 板块资金净流入(从预筛数据中直接取) capital_inflow = mf_lookup.get(ts_code, 0.0) # 板块内涨停家数(1 次 API) limit_up_count = 0 members = tushare_client.get_ths_members(ts_code) if not members.empty and "con_code" in members.columns: member_codes = set(members["con_code"].tolist()) limit_up_count = len(limit_up_codes & member_codes) sectors.append(SectorInfo( sector_code=ts_code, sector_name=sector_name, pct_change=round(pct_change, 2), capital_inflow=round(capital_inflow, 2), limit_up_count=limit_up_count, days_continuous=days_continuous, )) if not sectors: return [] # ── 板块阶段判定 ── for s in sectors: if s.days_continuous <= 2: s.stage = "early" # 启动期,安全 elif s.days_continuous == 3: s.stage = "mid" # 发展期,正常 elif s.days_continuous == 4: s.stage = "late" # 后期,谨慎 else: s.stage = "end" # 尾声,高风险 # ── 综合评分 ── pct_scores = _normalize_score([s.pct_change for s in sectors]) cap_scores = _normalize_score([s.capital_inflow for s in sectors]) lim_scores = _normalize_score([float(s.limit_up_count) for s in sectors]) con_scores = _normalize_score([float(s.days_continuous) for s in sectors]) for i, s in enumerate(sectors): heat = ( pct_scores[i] * 0.25 + cap_scores[i] * 0.30 + lim_scores[i] * 0.25 + con_scores[i] * 0.20 ) # 连续2日以上资金流入的板块加分 if s.days_continuous >= 2: heat += 5 # 首日大涨(昨日涨幅<=0,今日>3%)可视为新热点,加分 if s.days_continuous == 1 and s.pct_change > 3: heat += 3 # 板块阶段调整:早期加分,尾声减分(防追高) if s.stage == "early": heat += 8 # 启动期,介入安全,大幅加分 elif s.stage == "late": heat -= 5 # 后期,风险上升 elif s.stage == "end": heat -= 12 # 尾声,大幅减分 s.heat_score = round(max(0, min(heat, 100)), 1) sectors.sort(key=lambda x: x.heat_score, reverse=True) top = sectors[:settings.top_sector_count] for s in top: logger.info(f"热门板块: {s.sector_name} 涨幅{s.pct_change}% 资金{s.capital_inflow}万 " f"涨停{s.limit_up_count} 连续{s.days_continuous}天 阶段={s.stage} 热度{s.heat_score}") return sectors