# astock-agent/backend/app/analysis/sector_realtime.py
# 2026-04-22 11:56:23 +08:00
#
# 94 lines
# 3.6 KiB
# Python

"""板块实时数据富化。
复用东方财富板块实时排名,为数据库中的板块快照补充今日实时涨幅、广度和成交额。
不触发扫描,只做轻量覆盖。
"""
import logging
from app.config import should_prefer_realtime_today
from app.data.eastmoney_client import get_sector_realtime_ranking
from app.data.models import SectorInfo
from app.data.tushare_client import tushare_client
logger = logging.getLogger(__name__)
# Generic trailing words that carry no identifying information in a sector
# name. They are removed in this order, at most once each.
_SECTOR_NAME_SUFFIXES = ("行业", "板块", "概念")


def _strip_sector_suffixes(name: str) -> str:
    """Return ``name`` trimmed of whitespace and of generic trailing words."""
    cleaned = name.strip()
    for suffix in _SECTOR_NAME_SUFFIXES:
        # removesuffix() drops the exact suffix only. str.rstrip("行业") would
        # treat the argument as a character set and turn "银行" into "银".
        cleaned = cleaned.removesuffix(suffix)
    return cleaned.strip()


def _match_sector_name(em_name: str, ts_name: str) -> bool:
    """Fuzzy-match an Eastmoney sector name against a Tushare sector name.

    Both names are normalised by trimming whitespace and removing the generic
    trailing words "行业", "板块" and "概念". Two names match when their
    normalised forms are equal, or when the shorter normalised form is a
    substring of the longer one.

    Args:
        em_name: Sector name coming from the Eastmoney side.
        ts_name: Sector name coming from the Tushare side.

    Returns:
        True if the two names are considered to denote the same sector.
    """
    em_clean = _strip_sector_suffixes(em_name)
    ts_clean = _strip_sector_suffixes(ts_name)

    if not em_clean or not ts_clean:
        # A name made only of generic words normalises to "". The empty string
        # is a substring of everything, so compare the raw names instead.
        return em_name.strip() == ts_name.strip()

    if em_clean == ts_clean:
        return True

    short, long = (
        (em_clean, ts_clean) if len(em_clean) <= len(ts_clean) else (ts_clean, em_clean)
    )
    return short in long
def _apply_empty_overlay(sector: SectorInfo) -> SectorInfo:
    """Clear all realtime fields on ``sector`` and tag it as a daily snapshot.

    The object is modified in place; the same object is returned so the call
    can be used inside an expression.

    Args:
        sector: The sector record to reset.

    Returns:
        The ``sector`` object that was passed in.
    """
    # Realtime metrics that hold no value when no overlay is applied.
    for field_name in (
        "realtime_pct_change",
        "realtime_limit_up_count",
        "realtime_amount",
        "realtime_turnover_rate",
        "realtime_up_count",
        "realtime_down_count",
    ):
        setattr(sector, field_name, None)

    # A fresh list per call, so sectors never share one list object.
    sector.leading_stocks_realtime = []
    sector.is_realtime = False
    sector.data_mode = "daily_snapshot"
    return sector
def _find_realtime_row(sector_name: str, em_name_map: dict, em_sectors: list):
    """Return the Eastmoney row for ``sector_name``, or None if nothing matches.

    An exact name hit is preferred; otherwise the first row accepted by
    ``_match_sector_name`` is used.
    """
    row = em_name_map.get(sector_name)
    if row:
        return row
    return next(
        (
            item
            for item in em_sectors
            if _match_sector_name(item["sector_name"], sector_name)
        ),
        None,
    )


def _realtime_overlay_fields(em_data: dict) -> dict:
    """Convert one Eastmoney row into the realtime attribute values to set.

    Missing or falsy numeric entries become 0. All values are computed before
    anything is assigned, so a bad row never leaves a half-updated sector.

    Raises:
        TypeError, ValueError: If a numeric entry cannot be converted.
    """
    leaders = []
    if em_data.get("leading_stock_name"):
        leaders = [{
            "ts_code": em_data.get("leading_stock_code", ""),
            "name": em_data.get("leading_stock_name", ""),
            "pct_chg": float(em_data.get("leading_stock_pct", 0) or 0),
            "amount": 0,
        }]
    return {
        "realtime_pct_change": float(em_data.get("pct_change", 0) or 0),
        # No limit-up count is read from this row, so it stays unset.
        "realtime_limit_up_count": None,
        # Scaled down by 10,000 — presumably yuan -> 万元; TODO confirm the
        # unit against the Eastmoney client.
        "realtime_amount": round(float(em_data.get("amount", 0) or 0) / 10000, 2),
        "realtime_turnover_rate": float(em_data.get("turnover_rate", 0) or 0),
        "realtime_up_count": int(em_data.get("up_count", 0) or 0),
        "realtime_down_count": int(em_data.get("down_count", 0) or 0),
        "leading_stocks_realtime": leaders,
        "is_realtime": True,
        "data_mode": "realtime_overlay",
    }


def _ranking_pct(sector) -> float:
    """Sort key: realtime change if present, else the snapshot change."""
    if sector.realtime_pct_change is not None:
        return sector.realtime_pct_change
    if sector.pct_change is not None:
        return sector.pct_change
    # Neither value available: sort last instead of raising TypeError when
    # None is compared with a float.
    return float("-inf")


async def enrich_sectors_with_realtime(sectors: list[SectorInfo]) -> list[SectorInfo]:
    """Overlay realtime fields onto sector snapshots and re-rank them.

    The overlay is skipped, and every sector is reset to the daily-snapshot
    state, when any of the following holds:

    * ``should_prefer_realtime_today`` rejects the trade date,
    * fetching the Eastmoney ranking raises,
    * the ranking comes back empty.

    Otherwise each sector is matched to an Eastmoney row by exact name, then
    by fuzzy name. Matched sectors receive the realtime values. Unmatched
    sectors, and sectors whose row holds unparseable numbers, are reset to the
    daily-snapshot state.

    Args:
        sectors: Sector snapshots to enrich. The objects are mutated in place.

    Returns:
        An empty input is returned unchanged. On the skip paths a new list
        holding the same (reset) objects is returned. On the overlay path the
        input list itself is returned, sorted in place by descending
        percentage change (realtime when available, snapshot otherwise).
    """
    if not sectors:
        return sectors

    latest_trade_date = sectors[0].trade_date or tushare_client.get_latest_trade_date()
    if not should_prefer_realtime_today(latest_trade_date):
        return [_apply_empty_overlay(sector) for sector in sectors]

    try:
        em_sectors = await get_sector_realtime_ranking()
    except Exception:
        # Best-effort enrichment: never fail the caller, but keep the
        # traceback so the cause of the fallback is visible in the logs.
        logger.warning("东方财富板块实时数据获取失败,回退到日级快照", exc_info=True)
        return [_apply_empty_overlay(sector) for sector in sectors]

    if not em_sectors:
        return [_apply_empty_overlay(sector) for sector in sectors]

    em_name_map = {item["sector_name"]: item for item in em_sectors}
    matched = 0
    for sector in sectors:
        em_data = _find_realtime_row(sector.sector_name, em_name_map, em_sectors)
        if not em_data:
            _apply_empty_overlay(sector)
            continue

        try:
            overlay = _realtime_overlay_fields(em_data)
        except (TypeError, ValueError):
            # One malformed row must not abort the whole enrichment; this
            # sector alone falls back to its daily snapshot.
            logger.warning(
                "板块实时数据格式异常,回退到日级快照: %s",
                sector.sector_name,
                exc_info=True,
            )
            _apply_empty_overlay(sector)
            continue

        for field_name, value in overlay.items():
            setattr(sector, field_name, value)
        matched += 1

    logger.info("板块实时覆盖: %s/%s 匹配成功", matched, len(sectors))
    sectors.sort(key=_ranking_pct, reverse=True)
    return sectors