astock-agent/backend/app/analysis/theme_mapper.py
2026-05-14 11:10:17 +08:00

195 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""市场主线主题归一层。
不同数据源的板块体系不一致:
- 东方财富行业/概念
- Tushare/同花顺行业/概念
- 股票基础行业
这里先用可维护的 alias 规则把它们归一成系统自己的 MarketTheme。
后续可以把 alias 挪到数据库或配置,并让 LLM 根据复盘结果辅助维护。
"""
from __future__ import annotations
import re
from app.data.models import SectorInfo
# Theme id -> source sector names (industry / concept labels from the various
# data sources) that should be folded into that theme. resolve_theme compares
# names only after passing both sides through _clean_name.
THEME_ALIASES: dict[str, list[str]] = {
    "power_energy": ["公用事业", "电力", "绿色电力", "风力发电", "水电", "火电", "核电", "电力行业"],
    "baijiu_consumer": ["白酒", "白酒Ⅱ", "酿酒", "酿酒概念", "食品饮料", "啤酒", "乳业"],
    "ship_military": ["航海装备", "航海装备Ⅱ", "船舶", "船舶制造", "国防军工", "军工"],
    "coal_energy": ["煤炭", "煤炭行业", "煤炭概念", "煤炭开采加工", "焦炭", "煤化工"],
    "ai_compute": ["人工智能", "算力", "服务器", "数据中心", "CPO", "液冷", "东数西算"],
    "semiconductor": ["半导体", "芯片", "集成电路", "先进封装", "存储芯片", "光刻机"],
    "robotics": ["机器人", "人形机器人", "减速器", "工业母机", "自动化设备", "专用机械"],
    "auto_chain": ["汽车", "汽车零部件", "汽车配件", "新能源车", "智能驾驶", "无人驾驶"],
    "battery_lithium": ["锂电池", "固态电池", "钠离子电池"],
    "nonferrous_metals": ["有色金属", "小金属", "贵金属", "黄金"],
    "media_games": ["传媒", "影视", "影视音像", "游戏", "短剧", "文化传媒"],
    "tourism_services": ["旅游", "酒店", "景点", "餐饮", "免税"],
    "chemical_materials": ["化工", "化学制品", "化工原料", "氟化工", "磷化工"],
    "medicine_health": ["医药", "医疗", "创新药", "中药", "医疗器械", "生物制品"],
}
# Theme id -> display name of the theme. resolve_theme indexes this mapping
# with every id found in THEME_ALIASES, so the two tables must share the same
# set of keys.
THEME_NAMES: dict[str, str] = {
    "power_energy": "电力能源",
    "baijiu_consumer": "白酒消费",
    "ship_military": "船舶军工",
    "coal_energy": "煤炭能源",
    "ai_compute": "AI算力",
    "semiconductor": "半导体",
    "robotics": "机器人装备",
    "auto_chain": "汽车产业链",
    "battery_lithium": "锂电池",
    "nonferrous_metals": "有色金属",
    "media_games": "传媒游戏",
    "tourism_services": "旅游服务",
    "chemical_materials": "化工材料",
    "medicine_health": "医药健康",
}
# Substrings that disqualify a sector name from being treated as a theme.
# is_excluded_theme_name tests each keyword with ``in``, so a keyword matches
# anywhere inside the name.
EXCLUDED_THEME_KEYWORDS = (
    "昨日",
    "连板",
    "首板",
    "炸板",
    "涨停",
    "跌停",
    "情绪",
    "微盘",
    # The three entries below each contain "昨日", so under substring matching
    # they are already covered by the first keyword; kept for explicitness.
    "昨日触板",
    "昨日涨停",
    "昨日首板",
)
def _unique_ordered(items: list[str]) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for item in items:
value = str(item or "").strip()
if not value or value in seen:
continue
seen.add(value)
result.append(value)
return result
def _clean_name(name: str) -> str:
cleaned = (name or "")
cleaned = cleaned.replace("行业", "").replace("板块", "").replace("概念", "")
cleaned = cleaned.replace("", "").replace("", "").replace("", "").replace("IV", "")
return re.sub(r"[\s_\-]+", "", cleaned)
def resolve_theme(name: str) -> tuple[str, str, list[str]]:
    """Map a raw sector name to ``(theme_id, theme_name, aliases)``.

    The name and every alias in ``THEME_ALIASES`` are compared after being
    passed through ``_clean_name``; the first theme (in table order) with an
    equal alias wins, and its display name and alias list are returned.

    When nothing matches, a fallback triple is returned instead:
    - id ``"raw_<cleaned name>"``, or ``"unknown"`` if the cleaned name is empty;
    - the original ``name`` as display name, or "未归一主题" if ``name`` is falsy;
    - ``[name]`` as aliases, or ``[]`` if ``name`` is falsy.
    """
    target = _clean_name(name)
    matched_id = next(
        (
            theme_id
            for theme_id, candidates in THEME_ALIASES.items()
            if any(_clean_name(candidate) == target for candidate in candidates)
        ),
        None,
    )
    if matched_id is not None:
        return matched_id, THEME_NAMES[matched_id], THEME_ALIASES[matched_id]

    fallback_id = "unknown" if not target else f"raw_{target}"
    if name:
        return fallback_id, name, [name]
    return fallback_id, "未归一主题", []
def apply_theme(sector: SectorInfo) -> SectorInfo:
    """Stamp theme fields onto ``sector`` in place and return the same object.

    ``theme_id`` and ``theme_name`` come from ``resolve_theme`` applied to the
    sector's current ``sector_name``. ``theme_aliases`` is set to a one-element
    list holding that ``sector_name`` (empty list when the name is falsy); the
    alias list returned by ``resolve_theme`` is not used.
    """
    resolved = resolve_theme(sector.sector_name)
    sector.theme_id, sector.theme_name = resolved[0], resolved[1]
    source_name = sector.sector_name
    sector.theme_aliases = [source_name] if source_name else []
    return sector
def is_excluded_theme_name(name: str) -> bool:
    """Return True when ``name`` contains any keyword from EXCLUDED_THEME_KEYWORDS.

    ``name`` is converted with ``str(name or "")`` and stripped first, so
    ``None`` and empty names are never excluded. Keywords match as substrings.
    """
    text = str(name or "").strip()
    for keyword in EXCLUDED_THEME_KEYWORDS:
        if keyword in text:
            return True
    return False
def is_valid_theme(sector: SectorInfo) -> bool:
    """Return True when ``sector`` carries a recognized, non-excluded theme.

    A sector is rejected when:
    - its ``theme_id`` is empty/``None``;
    - its ``theme_id`` is one of the fallback ids produced by ``resolve_theme``
      for names it could not map: ``"raw_<name>"`` or ``"unknown"``;
    - its ``sector_name`` contains an excluded keyword
      (see ``is_excluded_theme_name``).

    Args:
        sector: A sector whose theme fields have already been populated.

    Returns:
        ``True`` if the sector maps to a known theme and is not excluded.
    """
    theme_id = sector.theme_id
    if not theme_id:
        return False
    # Both fallback shapes mean "no alias matched". Previously only the
    # "raw_" prefix was rejected, so sectors whose name cleaned to the empty
    # string ("unknown") slipped through as if they were real themes.
    if theme_id == "unknown" or theme_id.startswith("raw_"):
        return False
    return not is_excluded_theme_name(sector.sector_name)
def _effective_pct(sector: SectorInfo) -> float:
    """Return the realtime percent change if set, otherwise the daily ``pct_change``."""
    if sector.realtime_pct_change is not None:
        return float(sector.realtime_pct_change)
    return float(sector.pct_change)


def merge_sectors_to_themes(sectors: list[SectorInfo], limit: int = 20) -> list[SectorInfo]:
    """Merge industry/concept sectors into a theme-level list.

    The result still uses ``SectorInfo`` so it stays compatible with the
    existing recommendation pipeline, but ``sector_name`` becomes the theme
    name, ``theme_aliases`` keeps the original sector names that were folded
    into the theme, and the leading stocks are merged and de-duplicated.

    Sectors whose name is excluded (``is_excluded_theme_name``) or that do not
    resolve to a valid theme (``is_valid_theme``) are skipped. The first sector
    seen for a theme becomes the accumulator for that theme and is mutated in
    place (the input objects are not copied); later sectors of the same theme
    are folded into it:

    - ``pct_change``, ``heat_score``, ``days_continuous``, ``turnover_avg``,
      ``main_force_ratio``: maximum;
    - ``capital_inflow``, ``limit_up_count``, ``member_count``,
      ``realtime_amount``, ``realtime_up_count``, ``realtime_down_count``: sum;
    - ``realtime_pct_change``: maximum of the effective percent changes if
      either side has a realtime value, otherwise ``None``;
    - leading stocks: union keyed by ``ts_code``, top 5 by ``pct_chg``.

    Args:
        sectors: Raw sectors from any data source.
        limit: Maximum number of themes to return.

    Returns:
        Themes sorted by effective percent change, then realtime amount, both
        descending, truncated to ``limit`` entries.
    """
    grouped: dict[str, SectorInfo] = {}
    for raw in sectors:
        # Capture the source name before any mutation: apply_theme returns the
        # very same object, and the first sector of a group gets its
        # sector_name overwritten with the theme name below. Reading
        # raw.sector_name after that point would record the theme name as an
        # alias instead of the original sector name.
        raw_name = raw.sector_name
        if is_excluded_theme_name(raw_name):
            continue
        sector = apply_theme(raw)
        if not is_valid_theme(sector):
            continue

        key = sector.theme_id or sector.sector_name
        pct = _effective_pct(sector)
        existing = grouped.get(key)
        if existing is None:
            # First sector of this theme: turn it into the theme-level record.
            sector.sector_name = sector.theme_name or sector.sector_name
            sector.board_type = "theme"
            sector.sector_code = sector.theme_id or sector.sector_code
            sector.theme_aliases = _unique_ordered([raw_name])
            grouped[key] = sector
            continue

        # --- numeric fields -------------------------------------------------
        existing_pct = _effective_pct(existing)
        has_realtime = (
            existing.realtime_pct_change is not None
            or sector.realtime_pct_change is not None
        )
        existing.realtime_pct_change = max(existing_pct, pct) if has_realtime else None
        existing.pct_change = max(existing.pct_change, sector.pct_change)
        existing.heat_score = max(existing.heat_score, sector.heat_score)
        existing.capital_inflow += sector.capital_inflow
        existing.limit_up_count += sector.limit_up_count
        existing.member_count += sector.member_count
        existing.days_continuous = max(existing.days_continuous, sector.days_continuous)
        existing.turnover_avg = max(existing.turnover_avg, sector.turnover_avg)
        existing.main_force_ratio = max(existing.main_force_ratio, sector.main_force_ratio)
        existing.realtime_amount = (existing.realtime_amount or 0) + (sector.realtime_amount or 0)
        existing.realtime_up_count = (existing.realtime_up_count or 0) + (sector.realtime_up_count or 0)
        existing.realtime_down_count = (existing.realtime_down_count or 0) + (sector.realtime_down_count or 0)

        # --- provenance / freshness ----------------------------------------
        existing.theme_aliases = _unique_ordered([*(existing.theme_aliases or []), raw_name])
        existing.is_realtime = existing.is_realtime or sector.is_realtime
        if existing.data_mode == "daily_snapshot" and sector.data_mode != "daily_snapshot":
            existing.data_mode = sector.data_mode
        if existing.data_status != "fresh" and sector.data_status == "fresh":
            # A fresh contributor upgrades the merged record.
            existing.data_status = "fresh"
            existing.source_detail = sector.source_detail
        elif existing.data_status == "fresh" and sector.data_status != "fresh":
            # Already fresh: a non-fresh contributor does not downgrade it.
            pass
        elif existing.data_status != sector.data_status:
            existing.data_status = "mixed"
            existing.source_detail = "mixed_status"
        if existing.source != sector.source:
            existing.source = "mixed"

        # --- leading stocks --------------------------------------------------
        # Keyed by ts_code so a stock listed by both sectors appears once; the
        # later sector's entry replaces the earlier one.
        leaders = {}
        for item in (existing.leading_stocks_realtime or existing.leading_stocks or []):
            if item.get("ts_code"):
                leaders[item["ts_code"]] = item
        for item in (sector.leading_stocks_realtime or sector.leading_stocks or []):
            if item.get("ts_code"):
                leaders[item["ts_code"]] = item
        merged_leaders = sorted(
            leaders.values(),
            key=lambda item: float(item.get("pct_chg", 0) or 0),
            reverse=True,
        )[:5]
        existing.leading_stocks_realtime = merged_leaders
        existing.leading_stocks = merged_leaders

        # --- stage -----------------------------------------------------------
        # An "end" stage is always replaced by the incoming stage; a "late"
        # stage is replaced only by "early" or "mid".
        if existing.stage == "end" or (existing.stage == "late" and sector.stage in ("early", "mid")):
            existing.stage = sector.stage

    result = list(grouped.values())
    result.sort(
        key=lambda item: (_effective_pct(item), float(item.realtime_amount or 0)),
        reverse=True,
    )
    return result[:limit]