"""Normalization layer for market main-line themes.

Sector taxonomies differ between data sources:

- Eastmoney industries / concepts
- Tushare / Tonghuashun (THS) industries / concepts
- the basic industry attached to each stock

For now a maintainable set of alias rules maps all of them onto the system's
own MarketTheme ids. Later the aliases can move to a database or config file,
with an LLM helping to maintain them based on post-market review results.
"""

from __future__ import annotations

import re

from app.data.models import SectorInfo

# theme_id -> raw sector names (as published by the data sources) that belong
# to the theme. Names are compared after `_clean_name` normalization.
THEME_ALIASES: dict[str, list[str]] = {
    "power_energy": ["公用事业", "电力", "绿色电力", "风力发电", "水电", "火电", "核电", "电力行业"],
    "baijiu_consumer": ["白酒", "白酒Ⅱ", "酿酒", "酿酒概念", "食品饮料", "啤酒", "乳业"],
    "ship_military": ["航海装备", "航海装备Ⅱ", "船舶", "船舶制造", "国防军工", "军工"],
    "coal_energy": ["煤炭", "煤炭行业", "煤炭概念", "煤炭开采加工", "焦炭", "煤化工"],
    "ai_compute": ["人工智能", "算力", "服务器", "数据中心", "CPO", "液冷", "东数西算"],
    "semiconductor": ["半导体", "芯片", "集成电路", "先进封装", "存储芯片", "光刻机"],
    "robotics": ["机器人", "人形机器人", "减速器", "工业母机", "自动化设备", "专用机械"],
    "auto_chain": ["汽车", "汽车零部件", "汽车配件", "新能源车", "智能驾驶", "无人驾驶"],
    "battery_lithium": ["锂电池", "固态电池", "钠离子电池"],
    "nonferrous_metals": ["有色金属", "小金属", "贵金属", "黄金"],
    "media_games": ["传媒", "影视", "影视音像", "游戏", "短剧", "文化传媒"],
    "tourism_services": ["旅游", "酒店", "景点", "餐饮", "免税"],
    "chemical_materials": ["化工", "化学制品", "化工原料", "氟化工", "磷化工"],
    "medicine_health": ["医药", "医疗", "创新药", "中药", "医疗器械", "生物制品"],
}

# theme_id -> display name of the theme.
THEME_NAMES: dict[str, str] = {
    "power_energy": "电力能源",
    "baijiu_consumer": "白酒消费",
    "ship_military": "船舶军工",
    "coal_energy": "煤炭能源",
    "ai_compute": "AI算力",
    "semiconductor": "半导体",
    "robotics": "机器人装备",
    "auto_chain": "汽车产业链",
    "battery_lithium": "锂电池",
    "nonferrous_metals": "有色金属",
    "media_games": "传媒游戏",
    "tourism_services": "旅游服务",
    "chemical_materials": "化工材料",
    "medicine_health": "医药健康",
}

# A sector whose name contains any of these substrings is never treated as a
# theme (see `is_excluded_theme_name`).
EXCLUDED_THEME_KEYWORDS = (
    "昨日",
    "连板",
    "首板",
    "炸板",
    "涨停",
    "跌停",
    "情绪",
    "微盘",
    "昨日触板",
    "昨日涨停",
    "昨日首板",
)

# Fallback theme ids produced by `resolve_theme` when no alias matches.
_RAW_THEME_PREFIX = "raw_"
_UNKNOWN_THEME_ID = "unknown"

# Substrings removed from a name before alias comparison, applied in this
# order. ASCII "IV" is kept for backward compatibility; the Unicode numeral
# "Ⅳ" is handled as well so all tier suffixes are treated alike.
_NOISE_TOKENS = ("行业", "板块", "概念", "Ⅰ", "Ⅱ", "Ⅲ", "IV", "Ⅳ")
_SEPARATOR_RE = re.compile(r"[\s_\-]+")

# Maximum number of leading stocks kept on a merged theme.
_MAX_LEADERS = 5


def _unique_ordered(items: list[str]) -> list[str]:
    """Return the stripped, non-empty items with duplicates removed.

    The first occurrence of each value wins and input order is preserved.
    Falsy items (``None``, ``""``) are dropped.
    """
    stripped = (str(item or "").strip() for item in items)
    # dict preserves insertion order, which gives an ordered de-duplication.
    return list(dict.fromkeys(value for value in stripped if value))


def _clean_name(name: str) -> str:
    """Normalize a sector name for alias comparison.

    Removes the tokens in ``_NOISE_TOKENS`` and then every run of whitespace,
    underscores and hyphens. ``None`` / empty input yields ``""``.
    """
    cleaned = name or ""
    for token in _NOISE_TOKENS:
        cleaned = cleaned.replace(token, "")
    return _SEPARATOR_RE.sub("", cleaned)


def resolve_theme(name: str) -> tuple[str, str, list[str]]:
    """Map a raw sector name to ``(theme_id, theme_name, aliases)``.

    The cleaned name is compared for equality against every cleaned alias in
    ``THEME_ALIASES``; the first theme (in dict order) with a match wins and
    its alias list is returned as a copy.

    When nothing matches, the fallback is ``("raw_<cleaned name>", name,
    [name])``, or ``("unknown", "未归一主题", [])`` when ``name`` is empty.
    If ``name`` is non-empty but cleans to nothing, the id is ``"unknown"``
    while the name and alias list still carry the original ``name``.
    """
    clean = _clean_name(name)
    for theme_id, aliases in THEME_ALIASES.items():
        if any(clean == _clean_name(alias) for alias in aliases):
            # Copy so callers cannot mutate the shared module-level table.
            return theme_id, THEME_NAMES[theme_id], list(aliases)

    fallback_id = f"{_RAW_THEME_PREFIX}{clean}" if clean else _UNKNOWN_THEME_ID
    return fallback_id, name or "未归一主题", [name] if name else []


def apply_theme(sector: SectorInfo) -> SectorInfo:
    """Resolve the theme of ``sector`` and store it on the object.

    Sets ``theme_id`` and ``theme_name`` from `resolve_theme`, and sets
    ``theme_aliases`` to a one-element list holding the sector's own name
    (empty list when the name is empty). The sector is modified in place and
    the same object is returned.
    """
    theme_id, theme_name, _ = resolve_theme(sector.sector_name)
    sector.theme_id = theme_id
    sector.theme_name = theme_name
    sector.theme_aliases = [sector.sector_name] if sector.sector_name else []
    return sector


def is_excluded_theme_name(name: str) -> bool:
    """Return True when ``name`` contains any ``EXCLUDED_THEME_KEYWORDS`` entry."""
    value = str(name or "").strip()
    return any(keyword in value for keyword in EXCLUDED_THEME_KEYWORDS)


def is_valid_theme(sector: SectorInfo) -> bool:
    """Return True when ``sector`` carries a normalized, non-excluded theme.

    A sector is rejected when it has no ``theme_id``, when the id is one of
    the `resolve_theme` fallbacks (``"unknown"`` or ``"raw_*"``), or when its
    name matches an excluded keyword.
    """
    theme_id = sector.theme_id
    if not theme_id:
        return False
    # Both fallback ids mean "no alias matched", so neither is a real theme.
    if theme_id == _UNKNOWN_THEME_ID or theme_id.startswith(_RAW_THEME_PREFIX):
        return False
    return not is_excluded_theme_name(sector.sector_name)


def _effective_pct(sector: SectorInfo) -> float:
    """Return the realtime change of ``sector`` if set, else its ``pct_change``."""
    value = sector.realtime_pct_change
    if value is None:
        value = sector.pct_change
    return float(value)


def _merge_leaders(existing: SectorInfo, sector: SectorInfo) -> list:
    """Combine the leading stocks of two sectors into one ranked list.

    For each sector the realtime list is preferred over the plain one. Items
    are de-duplicated by their ``"ts_code"`` entry (later items replace
    earlier ones, items without a code are dropped), sorted by ``"pct_chg"``
    descending and cut to ``_MAX_LEADERS`` entries.
    """
    leaders = {}
    for source in (existing, sector):
        for item in source.leading_stocks_realtime or source.leading_stocks or []:
            code = item.get("ts_code")
            if code:
                leaders[code] = item
    ranked = sorted(
        leaders.values(),
        key=lambda item: float(item.get("pct_chg", 0) or 0),
        reverse=True,
    )
    return ranked[:_MAX_LEADERS]


def _merge_into(existing: SectorInfo, sector: SectorInfo, raw_name: str) -> None:
    """Fold ``sector`` into ``existing``, the representative of its theme.

    ``raw_name`` is the original name of ``sector`` and is appended to the
    theme's alias list. ``existing`` is modified in place.
    """
    # Read both effective values before any field of `existing` is changed.
    existing_pct = _effective_pct(existing)
    sector_pct = _effective_pct(sector)
    has_realtime = (
        existing.realtime_pct_change is not None
        or sector.realtime_pct_change is not None
    )
    existing.realtime_pct_change = max(existing_pct, sector_pct) if has_realtime else None

    # Strength-like metrics take the maximum of the group ...
    existing.pct_change = max(existing.pct_change, sector.pct_change)
    existing.heat_score = max(existing.heat_score, sector.heat_score)
    existing.days_continuous = max(existing.days_continuous, sector.days_continuous)
    existing.turnover_avg = max(existing.turnover_avg, sector.turnover_avg)
    existing.main_force_ratio = max(existing.main_force_ratio, sector.main_force_ratio)

    # ... while volume-like metrics are summed.
    existing.capital_inflow += sector.capital_inflow
    existing.limit_up_count += sector.limit_up_count
    existing.member_count += sector.member_count
    existing.realtime_amount = (existing.realtime_amount or 0) + (sector.realtime_amount or 0)
    existing.realtime_up_count = (existing.realtime_up_count or 0) + (sector.realtime_up_count or 0)
    existing.realtime_down_count = (existing.realtime_down_count or 0) + (
        sector.realtime_down_count or 0
    )

    existing.theme_aliases = _unique_ordered([*(existing.theme_aliases or []), raw_name])
    existing.is_realtime = existing.is_realtime or sector.is_realtime

    # Any mode other than the daily snapshot replaces the daily snapshot.
    if existing.data_mode == "daily_snapshot" and sector.data_mode != "daily_snapshot":
        existing.data_mode = sector.data_mode

    # "fresh" wins over every other status; two different non-fresh statuses
    # collapse to "mixed".
    if sector.data_status == "fresh":
        if existing.data_status != "fresh":
            existing.data_status = "fresh"
            existing.source_detail = sector.source_detail
    elif existing.data_status != "fresh" and existing.data_status != sector.data_status:
        existing.data_status = "mixed"
        existing.source_detail = "mixed_status"

    if existing.source != sector.source:
        existing.source = "mixed"

    merged_leaders = _merge_leaders(existing, sector)
    existing.leading_stocks_realtime = merged_leaders
    existing.leading_stocks = merged_leaders

    # Replace an "end" stage with whatever the other sector reports, and a
    # "late" stage with "early" / "mid".
    if existing.stage == "end" or (
        existing.stage == "late" and sector.stage in ("early", "mid")
    ):
        existing.stage = sector.stage


def merge_sectors_to_themes(sectors: list[SectorInfo], limit: int = 20) -> list[SectorInfo]:
    """Merge industry / concept sectors into a theme-level list.

    The result still consists of ``SectorInfo`` objects so the existing
    recommendation pipeline keeps working, but on each returned object:

    - ``sector_name`` becomes the theme display name, ``sector_code`` the
      theme id and ``board_type`` is ``"theme"``;
    - ``theme_aliases`` lists the original names of all merged sectors;
    - numeric fields are aggregated and leading stocks are merged and
      de-duplicated (see `_merge_into`).

    Sectors with an excluded name or without a normalized theme are skipped.

    Args:
        sectors: Raw sectors. NOTE: the objects are modified in place; the
            first sector of every theme becomes that theme's representative.
        limit: Maximum number of themes returned.

    Returns:
        Themes sorted by effective percentage change (realtime when present),
        then by realtime amount, both descending, cut to ``limit`` entries.
    """
    grouped: dict[str, SectorInfo] = {}
    for raw in sectors:
        # Remember the source name now: `apply_theme` returns the very same
        # object, and the representative's `sector_name` is overwritten with
        # the theme name below.
        raw_name = raw.sector_name
        if is_excluded_theme_name(raw_name):
            continue
        sector = apply_theme(raw)
        if not is_valid_theme(sector):
            continue

        key = sector.theme_id or raw_name
        existing = grouped.get(key)
        if existing is not None:
            _merge_into(existing, sector, raw_name)
            continue

        # First sector of this theme: promote it to the theme representative.
        sector.sector_name = sector.theme_name or raw_name
        sector.board_type = "theme"
        sector.sector_code = sector.theme_id or sector.sector_code
        sector.theme_aliases = _unique_ordered([raw_name])
        grouped[key] = sector

    result = sorted(
        grouped.values(),
        key=lambda item: (_effective_pct(item), float(item.realtime_amount or 0)),
        reverse=True,
    )
    return result[:limit]