"""个股资金流向分析 在热门板块成分股中,通过主力净流入、换手率、量比、市值 筛选被主力资金真正介入的个股。 """ import logging import pandas as pd from app.data.tushare_client import tushare_client from app.data import tencent_client from app.data.models import SectorInfo, CapitalFlow from app.config import settings from app.analysis.sector_alignment import build_hot_theme_membership logger = logging.getLogger(__name__) def _score_capital_flow( main_net_inflow: float, inflow_ratio: float, turnover_rate: float, volume_ratio: float | None, circ_mv: float | None, ) -> float: """资金维度评分,返回 0-100""" score = 0.0 # 主力净流入金额归一化评分 (35%) # 这里不做跨个股归一化,而是基于绝对值打分 if main_net_inflow > 10000: # >1亿 score += 35 elif main_net_inflow > 5000: # >5000万 score += 28 elif main_net_inflow > 2000: # >2000万 score += 22 elif main_net_inflow > 500: # >500万 score += 15 elif main_net_inflow > 0: score += 8 # 主力净流入占比 (20%) if inflow_ratio > 15: score += 20 elif inflow_ratio > 10: score += 16 elif inflow_ratio > 5: score += 12 elif inflow_ratio > 2: score += 8 elif inflow_ratio > 0: score += 4 # 换手率 (15%) - 3%-15% 最佳 if settings.min_turnover_rate <= turnover_rate <= settings.max_turnover_rate: # 5%-10% 最佳 if 5 <= turnover_rate <= 10: score += 15 else: score += 10 elif turnover_rate > 0: score += 3 # 量比 (15%) if volume_ratio is not None: if volume_ratio > 3.0: score += 15 elif volume_ratio > 2.0: score += 12 elif volume_ratio > 1.5: score += 9 elif volume_ratio > 1.0: score += 5 # 市值适配 (15%) if circ_mv is not None: mv_b = circ_mv # 已经是亿 if settings.min_circ_mv <= mv_b <= settings.max_circ_mv: if 100 <= mv_b <= 300: # 最佳区间 score += 15 else: score += 10 elif mv_b > 0: score += 3 return score def _score_valuation(pe: float | None, pb: float | None) -> float: """估值安全评分,返回 0-100 PE/PB 越低越安全(分数越高)。 过高估值会被大幅惩罚,避免追高买在估值泡沫中。 """ score = 0.0 # PE_TTM 评分 (60%) if pe is not None and pe > 0: if pe <= 15: score += 60 # 低估 elif pe <= 25: score += 50 # 合理偏低 elif pe <= 40: score += 35 # 合理 elif pe <= 60: score += 20 # 偏高 elif pe <= 100: score += 8 # 高估 else: score += 0 # 极度高估 elif pe is not None and pe < 0: score += 5 # 亏损股,低分 else: score += 30 # 无数据,给中间分 # PB 评分 (40%) if pb is not None and pb > 0: if pb <= 1.5: score += 40 # 破净或接近,极度低估 elif pb <= 3: score += 32 # 合理 elif pb <= 5: score += 22 # 偏高 elif pb <= 8: score += 12 # 高估 else: score += 4 # 极度高估 else: score += 20 # 无数据,给中间分 return score async def filter_stocks_by_capital( hot_sectors: list[SectorInfo], trade_date: str = None, ) -> list[dict]: """ 从热门板块中筛选资金流入个股。 返回: [{ts_code, name, sector, capital_score, turnover_rate, ...}] """ if not trade_date: trade_date = tushare_client.get_latest_trade_date() # 收集所有热门板块的成分股(去重) stock_sectors: dict[str, list[str]] = {} sector_member_codes, sector_code_map, _, _, _ = build_hot_theme_membership(hot_sectors) for code in sector_member_codes: if not code or "." not in str(code): continue stock_sectors.setdefault(code, []).append(sector_code_map.get(code, "")) if not stock_sectors: logger.warning("热门板块成分股为空") return [] logger.info(f"热门板块共 {len(stock_sectors)} 只成分股待筛选") # 获取股票基本信息(用于过滤 ST 和次新) stock_basic = tushare_client.get_stock_basic() st_codes = set() new_codes = set() if not stock_basic.empty: st_codes = set(stock_basic[stock_basic["name"].str.contains("ST", na=False)]["ts_code"]) # 上市不足 60 天 from datetime import datetime, timedelta cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d") new_codes = set(stock_basic[stock_basic["list_date"] > cutoff]["ts_code"]) # 获取当日全市场资金流向 mf_df = tushare_client.get_moneyflow_batch(trade_date) # 获取当日 daily_basic(换手率、市值等) basic_df = tushare_client.get_daily_basic(trade_date) results = [] for ts_code, sector_names in stock_sectors.items(): # 过滤 ST 和次新 if ts_code in st_codes or ts_code in new_codes: continue # 资金流向 main_net = 0.0 inflow_ratio = 0.0 if not mf_df.empty: row = mf_df[mf_df["ts_code"] == ts_code] if not row.empty: r = row.iloc[0] main_net = ( (r.get("buy_elg_amount", 0) or 0) - (r.get("sell_elg_amount", 0) or 0) + (r.get("buy_lg_amount", 0) or 0) - (r.get("sell_lg_amount", 0) or 0) ) total = ( (r.get("buy_elg_amount", 0) or 0) + (r.get("sell_elg_amount", 0) or 0) + (r.get("buy_lg_amount", 0) or 0) + (r.get("sell_lg_amount", 0) or 0) + (r.get("buy_md_amount", 0) or 0) + (r.get("sell_md_amount", 0) or 0) + (r.get("buy_sm_amount", 0) or 0) + (r.get("sell_sm_amount", 0) or 0) ) if total > 0: inflow_ratio = main_net / total * 100 # 硬性条件:主力净流入 > 0 if main_net <= 0: continue # 换手率和市值 turnover_rate = 0.0 volume_ratio = None circ_mv = None pe = None pb = None if not basic_df.empty: b_row = basic_df[basic_df["ts_code"] == ts_code] if not b_row.empty: b = b_row.iloc[0] turnover_rate = float(b.get("turnover_rate", 0) or 0) volume_ratio = float(b["volume_ratio"]) if pd.notna(b.get("volume_ratio")) else None circ_mv_raw = b.get("circ_mv", 0) or 0 circ_mv = float(circ_mv_raw) / 10000 # Tushare 单位是万元,转为亿 pe = float(b["pe_ttm"]) if pd.notna(b.get("pe_ttm")) else None pb = float(b["pb"]) if pd.notna(b.get("pb")) else None # 换手率硬性过滤 if turnover_rate < settings.min_turnover_rate or turnover_rate > settings.max_turnover_rate: continue # 市值硬性过滤 if circ_mv is not None and (circ_mv < settings.min_circ_mv or circ_mv > settings.max_circ_mv): continue capital_score = _score_capital_flow( main_net_inflow=main_net, inflow_ratio=inflow_ratio, turnover_rate=turnover_rate, volume_ratio=volume_ratio, circ_mv=circ_mv, ) valuation_score = _score_valuation(pe, pb) # 获取股票名称 name = ts_code if not stock_basic.empty: name_row = stock_basic[stock_basic["ts_code"] == ts_code] if not name_row.empty: name = name_row.iloc[0]["name"] results.append({ "ts_code": ts_code, "name": name, "sector": sector_names[0], # 取第一个板块 "sectors": sector_names, "main_net_inflow": round(main_net, 2), "inflow_ratio": round(inflow_ratio, 2), "turnover_rate": round(turnover_rate, 2), "volume_ratio": volume_ratio, "circ_mv": round(circ_mv, 2) if circ_mv else None, "pe": round(pe, 2) if pe else None, "pb": round(pb, 2) if pb else None, "capital_score": round(capital_score, 1), "valuation_score": round(valuation_score, 1), }) # 按资金评分排序 results.sort(key=lambda x: x["capital_score"], reverse=True) top = results[:settings.top_stock_count] for r in top: logger.info(f"资金筛选: {r['name']}({r['ts_code']}) 板块={r['sector']} " f"主力净流入={r['main_net_inflow']}万 评分={r['capital_score']}") return top