astock-agent/backend/app/analysis/sector_scanner.py
2026-04-16 14:16:02 +08:00

278 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

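"""Sector heat scanner.

Combines each sector's price change, net capital inflow, limit-up count and
streak persistence, and outputs a ranked list of hot sectors with detailed
analysis.

Optimization: a single batched sector-moneyflow query pre-filters the top
sectors; only those get per-sector detail queries (ths_daily / ths_member),
which avoids iterating over several hundred sectors and issuing a large number
of API calls.

Enhanced analysis:
- Leading stocks: the top 3 constituents by price change in each sector
- Capital trend: 5-day net capital inflow trend of the sector (not yet
  computed here: only the trade date's net inflow is used)
- Price trend: 5-day price-change trend of the sector
- Main-force (large / extra-large order) capital ratio
"""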
import logging
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from app.data.tushare_client import tushare_client
from app.data.models import SectorInfo
from app.config import settings
logger = logging.getLogger(name=__name__)

# Pre-filter size: only the top-N sectors ranked by net capital inflow go on
# to the per-sector detail queries (sector daily bars and constituent lists).
PRE_FILTER_COUNT: int = 30
def _normalize_score(values: list[float], reverse: bool = False) -> list[float]:
"""将数值列表归一化到 0-100"""
if not values:
return []
min_v, max_v = min(values), max(values)
if max_v == min_v:
return [50.0] * len(values)
normalized = [(v - min_v) / (max_v - min_v) * 100 for v in values]
if reverse:
normalized = [100 - v for v in normalized]
return normalized
# The comment in the original code states that Tushare moneyflow_ind_ths reports
# amounts in 亿元 (1e8 CNY); this module works in 万元 (1e4 CNY), hence 1e4.
_YI_TO_WAN = 10000

# Order-size columns (特大单 / 大单 buy and sell) used for the main-force ratio.
_MAIN_FORCE_COLUMNS = ("buy_elg_amount", "sell_elg_amount", "buy_lg_amount", "sell_lg_amount")


def _amount_in_wan(row: pd.Series, column: str) -> float:
    """Return ``row[column]`` converted from 亿元 to 万元; 0.0 if the column is absent or NaN."""
    value = row.get(column)
    if value is None or pd.isna(value):
        return 0.0
    return float(value) * _YI_TO_WAN


def _build_flow_table(sector_mf: pd.DataFrame) -> dict[str, dict[str, float]]:
    """Map sector ts_code -> net inflow and main-force buy/sell amounts, all in 万元.

    ``net_amount`` is converted as-is (a NaN stays NaN); the order-size columns
    default to 0.0 when the upstream frame does not provide them.
    """
    table: dict[str, dict[str, float]] = {}
    for _, row in sector_mf.iterrows():
        entry = {"net_amount": float(row["net_amount"]) * _YI_TO_WAN}
        for col in _MAIN_FORCE_COLUMNS:
            entry[col] = _amount_in_wan(row, col)
        table[row["ts_code"]] = entry
    return table


def _build_sector_names(sector_mf: pd.DataFrame) -> dict[str, str]:
    """Map sector ts_code -> display name.

    Names from the moneyflow frame's ``industry`` column take priority; the THS
    index list of type "I" fills in any codes still missing.
    """
    names: dict[str, str] = {}
    if "industry" in sector_mf.columns:
        for _, row in sector_mf.iterrows():
            if pd.notna(row.get("industry")):
                names[row["ts_code"]] = str(row["industry"])
    index_list = tushare_client.get_ths_index_list("I")
    if not index_list.empty:
        for _, row in index_list.iterrows():
            names.setdefault(row["ts_code"], row["name"])
    return names


def _collect_limit_ups(limit_df: pd.DataFrame) -> dict[str, dict]:
    """Return ``{ts_code: {"name", "pct_chg", "limit_times"}}`` for non-ST limit-up stocks.

    Rows with ``limit == "U"`` whose name does not contain "ST" are kept. A
    missing/NaN ``pct_chg`` falls back to 10.0 and ``limit_times`` to 1, so a
    NaN ``limit_times`` no longer raises ``ValueError`` from ``int(nan)``.
    """
    info: dict[str, dict] = {}
    if limit_df.empty:
        return info
    up_df = limit_df[limit_df["limit"] == "U"]
    up_df = up_df[~up_df["name"].str.contains("ST", na=False)]
    for _, row in up_df.iterrows():
        pct = row.get("pct_chg")
        times = row.get("limit_times")
        info[row["ts_code"]] = {
            "name": row["name"],
            "pct_chg": float(pct) if pd.notna(pct) else 10.0,
            "limit_times": int(times) if pd.notna(times) else 1,
        }
    return info


def _summarize_sector_daily(ths_daily: pd.DataFrame, trade_date: str) -> tuple[float, int, list[float]]:
    """Derive ``(pct_change, days_continuous, pct_trend)`` from a sector's daily bars.

    "Today" is the bar dated ``trade_date``, or the most recent bar when that
    date is absent. ``days_continuous`` walks back from the most recent bar and
    stops at the first bar whose ``pct_change`` is not > 0 (NaN stops it too).
    ``pct_trend`` lists every bar's ``pct_change`` (rounded to 2 dp), oldest first.
    """
    if ths_daily.empty:
        return 0.0, 0, []
    ths_daily = ths_daily.sort_values("trade_date")
    today = ths_daily[ths_daily["trade_date"] == trade_date]
    if today.empty:
        today = ths_daily.tail(1)  # non-empty because ths_daily is non-empty
    pct_change = float(today["pct_change"].iloc[0])

    changes = ths_daily["pct_change"].tolist()
    pct_trend = [round(float(v), 2) for v in changes]
    days_continuous = 0
    for v in reversed(changes):
        if not v > 0:
            break
        days_continuous += 1
    return pct_change, days_continuous, pct_trend


def _main_force_ratio(flow: dict[str, float]) -> float:
    """Main-force ratio in percent, rounded to 0.1.

    ratio = (特大单净买 + 大单净买) / (特大单买卖总额 + 大单买卖总额) * 100,
    i.e. net extra-large + large order flow over their gross turnover. For
    non-negative amounts the result lies in [-100, 100]. Returns 0.0 when the
    gross amount is not positive (e.g. the frame has no order-size columns).
    """
    buy_elg = flow.get("buy_elg_amount", 0.0)
    sell_elg = flow.get("sell_elg_amount", 0.0)
    buy_lg = flow.get("buy_lg_amount", 0.0)
    sell_lg = flow.get("sell_lg_amount", 0.0)
    gross = buy_elg + sell_elg + buy_lg + sell_lg
    if not gross > 0:
        return 0.0
    net = (buy_elg - sell_elg) + (buy_lg - sell_lg)
    return round(net / gross * 100, 1)


def _analyze_members(
    members: pd.DataFrame,
    limit_ups: dict[str, dict],
    daily_all: pd.DataFrame,
    st_codes: set,
    stock_names: dict[str, str],
) -> tuple[int, int, float, list[dict]]:
    """Summarize a sector's constituents.

    Returns ``(member_count, limit_up_count, turnover_avg, leading_stocks)``.
    ``leading_stocks`` holds the top-3 non-ST constituents by ``pct_chg`` from
    ``daily_all``, followed by any other limit-up constituents in ts_code order
    (sorted so the output is deterministic across runs).
    """
    if members.empty or "con_code" not in members.columns:
        return 0, 0, 0.0, []

    member_codes = members["con_code"].tolist()
    member_set = set(member_codes)
    member_limit_ups = sorted(member_set.intersection(limit_ups))

    turnover_avg = 0.0
    leading: list[dict] = []
    if not daily_all.empty:
        sector_daily = daily_all[daily_all["ts_code"].isin(member_set)]
        if st_codes:
            sector_daily = sector_daily[~sector_daily["ts_code"].isin(st_codes)]
        sector_daily = sector_daily.sort_values("pct_chg", ascending=False)

        # Average turnover over the (non-ST) constituents present in daily_all.
        if "turnover_rate" in sector_daily.columns:
            turnover = sector_daily["turnover_rate"].dropna()
            if len(turnover) > 0:
                turnover_avg = round(float(turnover.mean()), 2)

        # Member-list names (con_name) take precedence over stock_basic names.
        con_names: dict[str, str] = {}
        if "con_name" in members.columns:
            con_names = {
                code: name
                for code, name in zip(members["con_code"], members["con_name"])
                if pd.notna(name)
            }

        for _, sr in sector_daily.head(3).iterrows():
            code = sr["ts_code"]
            leading.append({
                "ts_code": code,
                "name": con_names.get(code, stock_names.get(code, code)),
                "pct_chg": round(float(sr["pct_chg"]), 2),
                "amount": round(float(sr.get("amount", 0)), 0),
            })

    # Limit-up constituents not already among the top 3 are appended as well.
    seen = {item["ts_code"] for item in leading}
    for code in member_limit_ups:
        if code in seen:
            continue
        info = limit_ups[code]
        leading.append({
            "ts_code": code,
            "name": info.get("name", code),
            "pct_chg": info.get("pct_chg", 10),
            "amount": 0,
            "limit_times": info.get("limit_times", 1),
        })

    return len(member_codes), len(member_limit_ups), turnover_avg, leading


def _classify_stage(days_continuous: int, pct_trend: list[float]) -> "str | None":
    """Label a sector's rally stage ("early"/"mid"/"late"/"end").

    Combines the up-streak length with ``cumulative``, the (rounded) sum of the
    daily pct changes over the whole ``pct_trend`` window (not only the streak).
    Every non-negative streak length is covered for finite sums; None is only
    returned when the sum is NaN.
    """
    cumulative = round(sum(pct_trend), 2)
    if days_continuous <= 2 or (days_continuous <= 3 and cumulative < 5):
        return "early"
    if (days_continuous == 3 and cumulative >= 5) or (days_continuous == 4 and cumulative < 8):
        return "mid"
    if (days_continuous == 4 and cumulative >= 8) or (days_continuous == 5 and cumulative < 10):
        return "late"
    if (days_continuous >= 5 and cumulative >= 10) or days_continuous >= 6:
        return "end"
    return None


def scan_hot_sectors(trade_date: str = None) -> list[SectorInfo]:
    """Scan sectors and return them ranked by heat score, with per-sector detail.

    Pipeline:
      1. One batched sector-moneyflow query; sectors are sorted by
         ``net_amount`` and the first ``PRE_FILTER_COUNT`` go on to step 4.
      2. One limit-up list query (non-ST ``limit == "U"`` stocks).
      3. Market-wide daily bars and stock basics (leading stocks, ST filter,
         names).
      4. Per pre-filtered sector: 5 days of sector bars and its constituents.
    Each sector then gets a stage label and a 0-100 heat score.

    Args:
        trade_date: trade date string as understood by ``tushare_client``;
            defaults to ``tushare_client.get_latest_trade_date()``.

    Returns:
        All analyzed sectors (at most ``PRE_FILTER_COUNT``) sorted by
        ``heat_score`` descending, with ``capital_inflow`` in 万元. Only the first
        ``settings.top_sector_count`` are logged. An empty list when the
        moneyflow data is empty.
    """
    if not trade_date:
        trade_date = tushare_client.get_latest_trade_date()

    # ── Step 1: batched sector moneyflow as a pre-filter (1 API call) ──
    sector_mf = tushare_client.get_sector_moneyflow(trade_date)
    if sector_mf.empty:
        logger.warning("板块资金流向数据为空")
        return []

    sector_mf = sector_mf.sort_values("net_amount", ascending=False)
    # Keep ranking order (deduplicated) rather than a set, so the per-sector
    # query order and tie-breaking in the final sort are deterministic.
    top_codes = list(dict.fromkeys(sector_mf.head(PRE_FILTER_COUNT)["ts_code"].tolist()))
    logger.info(f"板块资金流向预筛: {len(sector_mf)} 个板块 -> Top {len(top_codes)} 进入详细分析")

    flows = _build_flow_table(sector_mf)
    name_lookup = _build_sector_names(sector_mf)

    # ── Step 2: limit-up list (1 API call) ──
    limit_ups = _collect_limit_ups(tushare_client.get_limit_list(trade_date))

    # ── Step 3: market-wide daily bars and stock basics ──
    daily_all = tushare_client.get_daily_all(trade_date)
    stock_basic = tushare_client.get_stock_basic()
    # Loop invariants, computed once instead of once per sector.
    st_codes: set = set()
    stock_names: dict = {}
    if not stock_basic.empty:
        is_st = stock_basic["name"].str.contains("ST", na=False)
        st_codes = set(stock_basic.loc[is_st, "ts_code"])
        stock_names = dict(zip(stock_basic["ts_code"], stock_basic["name"]))

    # ── Step 4: detailed queries for the pre-filtered sectors only ──
    sectors: list[SectorInfo] = []
    for ts_code in top_codes:
        pct_change, days_continuous, pct_trend = _summarize_sector_daily(
            tushare_client.get_ths_daily(ts_code, days=5), trade_date
        )
        flow = flows.get(ts_code, {})
        capital_inflow = flow.get("net_amount", 0.0)
        member_count, limit_up_count, turnover_avg, leading_stocks = _analyze_members(
            tushare_client.get_ths_members(ts_code), limit_ups, daily_all, st_codes, stock_names
        )
        sectors.append(SectorInfo(
            sector_code=ts_code,
            sector_name=name_lookup.get(ts_code, ts_code),
            pct_change=round(pct_change, 2),
            capital_inflow=round(capital_inflow, 2),
            limit_up_count=limit_up_count,
            days_continuous=days_continuous,
            member_count=member_count,
            leading_stocks=leading_stocks,
            pct_trend=pct_trend,
            turnover_avg=turnover_avg,
            main_force_ratio=_main_force_ratio(flow),
        ))

    if not sectors:
        return []

    # ── Stage labelling (streak length combined with window gain) ──
    for s in sectors:
        stage = _classify_stage(s.days_continuous, s.pct_trend)
        if stage is not None:
            s.stage = stage

    # ── Heat score: weighted blend of normalized factors, then adjustments ──
    pct_scores = _normalize_score([s.pct_change for s in sectors])
    cap_scores = _normalize_score([s.capital_inflow for s in sectors])
    lim_scores = _normalize_score([float(s.limit_up_count) for s in sectors])
    con_scores = _normalize_score([float(s.days_continuous) for s in sectors])
    for s, pct_sc, cap_sc, lim_sc, con_sc in zip(sectors, pct_scores, cap_scores, lim_scores, con_scores):
        heat = (
            pct_sc * 0.25 +
            cap_sc * 0.30 +
            lim_sc * 0.25 +
            con_sc * 0.20
        )
        if s.days_continuous >= 2:
            heat += 5
        if s.days_continuous == 1 and s.pct_change > 3:
            heat += 3
        # Favour fresh rallies, penalize ones that look exhausted.
        if s.stage == "early":
            heat += 8
        elif s.stage == "late":
            heat -= 5
        elif s.stage == "end":
            heat -= 12
        s.heat_score = round(max(0, min(heat, 100)), 1)

    sectors.sort(key=lambda x: x.heat_score, reverse=True)

    for s in sectors[:settings.top_sector_count]:
        leaders = ", ".join(f'{item["name"]}({item["pct_chg"]}%)' for item in s.leading_stocks[:3])
        # capital_inflow is in 万元; show 亿 for large magnitudes (inflow or outflow).
        if abs(s.capital_inflow) >= 10000:
            inflow_display = f"{s.capital_inflow / 10000:.1f}亿"
        else:
            inflow_display = f"{s.capital_inflow:.0f}万"
        logger.info(f"热门板块: {s.sector_name} 涨幅{s.pct_change}% 资金{inflow_display} "
                    f"涨停{s.limit_up_count} 连续{s.days_continuous}天 阶段={s.stage} 热度{s.heat_score} "
                    f"领涨=[{leaders}]")
    return sectors