"""市场主题与个股/成分映射辅助。 解决两个问题: 1. 实时板块榜(东方财富行业/概念)与 Tushare 板块代码体系不一致。 2. 个股行业名、概念指数名与系统内部 MarketTheme 存在口径差异。 """ from __future__ import annotations import logging from app.data.models import SectorInfo from app.data.tushare_client import tushare_client from app.analysis.theme_mapper import resolve_theme logger = logging.getLogger(__name__) def normalize_sector_name(name: str) -> str: return ( (name or "") .replace("申万", "") .replace("同花顺", "") .replace("行业", "") .replace("板块", "") .replace("概念", "") .strip() ) def normalize_ts_code(code: str) -> str: raw = str(code or "").strip() if not raw: return "" if "." in raw: return raw if len(raw) >= 6 and raw[:6].isdigit(): symbol = raw[:6] return f"{symbol}.SH" if symbol.startswith("6") else f"{symbol}.SZ" return raw def sector_name_matches(left: str, right: str) -> bool: a = normalize_sector_name(left) b = normalize_sector_name(right) if not a or not b: return False if a == b: return True short, long = (a, b) if len(a) <= len(b) else (b, a) return short in long def sector_name_strict_match(left: str, right: str) -> bool: a = normalize_sector_name(left) b = normalize_sector_name(right) if not a or not b: return False return a == b def _theme_match_names(theme: SectorInfo) -> list[str]: return [ theme.sector_name, theme.theme_name, *(theme.theme_aliases or []), ] def find_hot_theme_match(name: str, hot_themes: list[SectorInfo]) -> SectorInfo | None: """把任意行业/概念/主题名称匹配到今日系统主题。""" if not name: return None resolved_id, resolved_name, resolved_aliases = resolve_theme(name) for theme in hot_themes: if resolved_id and theme.theme_id and resolved_id == theme.theme_id: return theme names = _theme_match_names(theme) if any(sector_name_strict_match(name, candidate) for candidate in names): return theme if any(sector_name_strict_match(resolved_name, candidate) for candidate in names): return theme if any( sector_name_strict_match(alias, candidate) for alias in resolved_aliases for candidate in names ): return theme return None def build_hot_theme_membership( hot_themes: list[SectorInfo], ) -> tuple[set[str], dict[str, str], dict[str, str], dict[str, int], set[str]]: """为今日主线主题构造成分股映射。 返回: - sector_member_codes: 所有成分股代码 - sector_code_map: ts_code -> 主题名 - sector_stage_map: 主题名 -> 阶段 - sector_rank_map: 主题名 -> 排名 - leader_codes: 领涨股代码 """ sector_member_codes: set[str] = set() sector_code_map: dict[str, str] = {} sector_stage_map: dict[str, str] = {} sector_rank_map: dict[str, int] = {} leader_codes: set[str] = set() stock_basic = tushare_client.get_stock_basic() industry_buckets: dict[str, list[str]] = {} if not stock_basic.empty and "industry" in stock_basic.columns: for _, row in stock_basic.iterrows(): code = str(row.get("ts_code") or "") industry = str(row.get("industry") or "") if not code or not industry: continue industry_buckets.setdefault(industry, []).append(code) concept_index = tushare_client.get_ths_index_list("N") industry_index = tushare_client.get_ths_index_list("I") concept_map = { str(row.get("name") or ""): str(row.get("ts_code") or "") for _, row in concept_index.iterrows() if row.get("name") and row.get("ts_code") } if not concept_index.empty else {} industry_index_map = { str(row.get("name") or ""): str(row.get("ts_code") or "") for _, row in industry_index.iterrows() if row.get("name") and row.get("ts_code") } if not industry_index.empty else {} concept_theme_ids = { "ai_compute", "robotics", "battery_lithium", "media_games", } for idx, theme in enumerate(hot_themes): theme_name = theme.theme_name or theme.sector_name theme_aliases = _theme_match_names(theme) sector_rank_map[theme_name] = idx + 1 sector_stage_map[theme_name] = theme.stage for leader in (theme.leading_stocks_realtime or theme.leading_stocks or []): leader_code = normalize_ts_code(str(leader.get("ts_code", "")).strip()) if leader_code: leader_codes.add(leader_code) sector_member_codes.add(leader_code) sector_code_map.setdefault(leader_code, theme_name) resolved_codes: set[str] = set() strict_industry_codes: set[str] = set() strict_concept_codes: set[str] = set() industry_codes: set[str] = set() concept_codes: set[str] = set() for industry_name, codes in industry_buckets.items(): if any(sector_name_strict_match(alias, industry_name) for alias in theme_aliases): strict_industry_codes.update(codes) industry_codes.update(codes) elif any(sector_name_matches(alias, industry_name) for alias in theme_aliases): resolved_codes.update(codes) industry_codes.update(codes) for index_name, index_code in industry_index_map.items(): if not any(sector_name_matches(alias, index_name) for alias in theme_aliases): continue members_df = tushare_client.get_ths_members(index_code) if not members_df.empty and "con_code" in members_df.columns: codes = {str(code) for code in members_df["con_code"].tolist() if code} industry_codes.update(codes) if any(sector_name_strict_match(alias, index_name) for alias in theme_aliases): strict_industry_codes.update(codes) else: resolved_codes.update(codes) for index_name, index_code in concept_map.items(): if not any(sector_name_matches(alias, index_name) for alias in theme_aliases): continue members_df = tushare_client.get_ths_members(index_code) if not members_df.empty and "con_code" in members_df.columns: codes = {str(code) for code in members_df["con_code"].tolist() if code} concept_codes.update(codes) if any(sector_name_strict_match(alias, index_name) for alias in theme_aliases): strict_concept_codes.update(codes) else: resolved_codes.update(codes) if industry_codes: final_codes = strict_industry_codes or industry_codes elif theme.theme_id in concept_theme_ids: final_codes = strict_concept_codes or concept_codes or resolved_codes else: final_codes = strict_industry_codes or strict_concept_codes or resolved_codes if not final_codes: logger.debug("主线主题未解析到成分股: %s aliases=%s", theme_name, theme_aliases) continue sector_member_codes.update(final_codes) for code in final_codes: sector_code_map.setdefault(code, theme_name) return sector_member_codes, sector_code_map, sector_stage_map, sector_rank_map, leader_codes