astock-agent/backend/app/api/sectors.py
2026-04-08 10:15:38 +08:00

98 lines
3.1 KiB
Python

"""板块分析 API"""
import logging
from fastapi import APIRouter
from app.config import is_trading_hours
from app.data.tushare_client import tushare_client
from app.data.tencent_client import get_realtime_quotes_batch
from app.engine.recommender import get_latest_sectors
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the sector endpoints; every route registered on it is served
# under the /api/sectors prefix and tagged "sectors".
router = APIRouter(prefix="/api/sectors", tags=["sectors"])
def _mark_without_realtime(sectors_data: list[dict], *, is_realtime: bool) -> list[dict]:
    """Set both realtime fields to None on every sector and stamp ``is_realtime``."""
    for sector in sectors_data:
        sector["realtime_pct_change"] = None
        sector["realtime_limit_up_count"] = None
        sector["is_realtime"] = is_realtime
    return sectors_data


async def _enrich_sectors_realtime(sectors_data: list[dict]) -> list[dict]:
    """Add realtime change and limit-up count to each sector during trading hours.

    The dicts in ``sectors_data`` are mutated in place and the same list is
    returned. Three keys are always written on every sector:
    ``realtime_pct_change``, ``realtime_limit_up_count`` and ``is_realtime``.

    Outcomes:
        * Outside trading hours: both realtime fields are None,
          ``is_realtime`` is False.
        * No member codes could be collected for any sector: both realtime
          fields are None, ``is_realtime`` is True.
        * The batch quote request raises: both realtime fields are None,
          ``is_realtime`` is False (fallback to the daily data already present).
        * Otherwise, per sector: ``realtime_pct_change`` is the mean ``pct_chg``
          of the members that have a quote, rounded to 2 decimals, and
          ``realtime_limit_up_count`` is the number of those members trading at
          or near their limit-up price. Sectors with no quoted members get
          None for both fields. ``is_realtime`` is True.

    Args:
        sectors_data: Sector dicts; each must contain a ``sector_code`` key.

    Returns:
        The same ``sectors_data`` list, enriched in place.
    """
    if not is_trading_hours():
        return _mark_without_realtime(sectors_data, is_realtime=False)

    # Collect the member stock codes of every sector.
    # NOTE(review): get_ths_members is called synchronously inside a coroutine;
    # if it performs network I/O it blocks the event loop -- confirm and
    # consider offloading it.
    sector_members: dict[str, list[str]] = {}
    for sector in sectors_data:
        code = sector["sector_code"]
        try:
            df = tushare_client.get_ths_members(code)
            members = df["con_code"].tolist() if not df.empty else []
        except Exception:
            # Best effort: one failing sector must not break the others, but
            # the failure is logged instead of being silently dropped.
            logger.warning("获取板块成分股失败: %s", code, exc_info=True)
            members = []
        sector_members[code] = members

    # A stock can belong to several sectors; request each code only once while
    # keeping first-seen order.
    all_codes = list(
        dict.fromkeys(c for members in sector_members.values() for c in members)
    )
    if not all_codes:
        return _mark_without_realtime(sectors_data, is_realtime=True)

    # Fetch realtime quotes for all member stocks in one batch.
    try:
        quotes = await get_realtime_quotes_batch(all_codes)
    except Exception:
        logger.warning("获取板块实时行情失败,回退到日级数据", exc_info=True)
        return _mark_without_realtime(sectors_data, is_realtime=False)

    # Compute the realtime metrics for each sector.
    for sector in sectors_data:
        members = sector_members.get(sector["sector_code"], [])
        member_quotes = [quotes[c] for c in members if c in quotes]
        if member_quotes:
            pct_changes = [q.pct_chg for q in member_quotes]
            sector["realtime_pct_change"] = round(
                sum(pct_changes) / len(pct_changes), 2
            )
            # A stock counts as limit-up when its price is within 0.5% of a
            # non-zero limit-up price.
            sector["realtime_limit_up_count"] = sum(
                1
                for q in member_quotes
                if q.limit_up and q.price >= q.limit_up * 0.995
            )
        else:
            sector["realtime_pct_change"] = None
            sector["realtime_limit_up_count"] = None
        sector["is_realtime"] = True
    return sectors_data
@router.get("/hot")
async def get_hot_sectors(limit: int = 10):
    """Return the first ``limit`` hot sectors, with realtime fields added.

    Args:
        limit: Maximum number of sectors to return, taken from the front of
            the list produced by ``get_latest_sectors``. Negative values are
            treated as 0.

    Returns:
        A list of dicts, one per sector, holding ``sector_code``,
        ``sector_name``, ``pct_change``, ``capital_inflow``,
        ``limit_up_count``, ``days_continuous``, ``heat_score`` and ``stage``,
        after being passed through ``_enrich_sectors_realtime``.
    """
    # A negative slice bound would drop sectors from the end of the list
    # instead of limiting how many are returned, so clamp it.
    limit = max(limit, 0)

    sectors = await get_latest_sectors()
    sectors_data = [
        {
            "sector_code": s.sector_code,
            "sector_name": s.sector_name,
            "pct_change": s.pct_change,
            "capital_inflow": s.capital_inflow,
            "limit_up_count": s.limit_up_count,
            "days_continuous": s.days_continuous,
            "heat_score": s.heat_score,
            "stage": s.stage,
        }
        for s in sectors[:limit]
    ]
    return await _enrich_sectors_realtime(sectors_data)