astock-agent/backend/app/analysis/market_temp.py
2026-06-10 08:36:25 +08:00

225 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""市场温度计
综合涨跌家数、涨跌停数、连板高度、炸板率、指数位置,
输出 0-100 的市场温度值。
"""
import logging
import pandas as pd
import numpy as np
from app.data import tencent_client
from app.data.market_breadth_client import get_market_breadth
from app.data.tushare_client import tushare_client
from app.data.models import MarketTemperature
logger = logging.getLogger(__name__)
def calculate_market_temperature(trade_date: str = None) -> MarketTemperature:
"""计算指定交易日的市场温度
盘中 Tushare 当日数据可能不完整limit_list_d 只有盘后数据),
因此涨停数据先尝试当日,若为空则用前一日数据作为基线。
"""
if not trade_date:
trade_date = tushare_client.get_latest_trade_date()
# 1. 涨跌家数
daily_df = tushare_client.get_daily_all(trade_date)
if daily_df.empty:
original_trade_date = trade_date
trade_dates = tushare_client.get_trade_dates()
fallback_dates = [d for d in trade_dates if d < trade_date][-5:]
for fallback_date in reversed(fallback_dates):
fallback_df = tushare_client.get_daily_all(fallback_date)
if not fallback_df.empty:
daily_df = fallback_df
trade_date = fallback_date
logger.warning(
"市场温度 %s 日线数据为空,回退到最近有效交易日 %s",
original_trade_date,
fallback_date,
)
break
if daily_df.empty:
logger.warning("市场温度 %s 无有效日线数据,返回占位温度", trade_date)
return MarketTemperature(trade_date=trade_date, temperature=50)
up_count = len(daily_df[daily_df["pct_chg"] > 0])
down_count = len(daily_df[daily_df["pct_chg"] < 0])
# 2. 涨跌停数据 — 先尝试当日,若为空则用前一日
limit_df = tushare_client.get_limit_list(trade_date)
if limit_df.empty:
# 当日无涨跌停数据(盘中),用前一日作为基线
prev_dates = tushare_client.get_trade_dates()
prev_dates = [d for d in prev_dates if d < trade_date]
if prev_dates:
prev_date = prev_dates[-1]
limit_df = tushare_client.get_limit_list(prev_date)
logger.info(f"当日涨跌停数据为空,使用前一日 {prev_date} 作为基线")
limit_up_count = 0
limit_down_count = 0
max_streak = 0
broken_rate = 0.0
if not limit_df.empty:
# limit: U=涨停, D=跌停
up_df = limit_df[limit_df["limit"] == "U"]
down_df = limit_df[limit_df["limit"] == "D"]
# 排除 ST名称含 ST
up_df = up_df[~up_df["name"].str.contains("ST", na=False)]
down_df = down_df[~down_df["name"].str.contains("ST", na=False)]
# 排除一字板open_times == 0 表示全天封板未开板,不算自然涨停...
# 实际上 open_times 是打开次数0 表示一直封住,属于强势
# 这里我们统计所有非 ST 涨停数作为情绪指标)
limit_up_count = len(up_df)
limit_down_count = len(down_df)
# 连板高度limit_times 字段
if "limit_times" in up_df.columns and not up_df.empty:
max_streak = int(up_df["limit_times"].max())
# 炸板率:曾经涨停但未封住的 = up_stat 中 "炸板" 的记录
# open_times > 0 的算曾经打开过涨停板,如果最终未涨停就是炸板
# 但 limit_list_d 只返回最终涨停的... 需要另一个思路
# 用全市场日线数据:最高价触及涨停但收盘未涨停
if not daily_df.empty:
# 涨停价 = pre_close * 1.1(普通股),容差 0.01
daily_df["limit_price"] = daily_df["pre_close"] * 1.1
touched_limit = daily_df[
(daily_df["high"] >= daily_df["limit_price"] - 0.01) &
(daily_df["pct_chg"] < 9.9) # 触及涨停但收盘未涨停
]
total_touched = len(touched_limit) + limit_up_count
if total_touched > 0:
broken_rate = len(touched_limit) / total_touched * 100
# 3. 上证指数位置
index_df = tushare_client.get_index_daily("000001.SH", days=30)
index_above_ma20 = False
if not index_df.empty:
index_df = index_df.sort_values("trade_date")
if len(index_df) >= 20:
ma20 = index_df["close"].tail(20).mean()
latest_close = index_df["close"].iloc[-1]
index_above_ma20 = latest_close > ma20
# ── 综合评分 ──
score = 0.0
# 涨跌家数比 (0-25分)
ratio = up_count / max(down_count, 1)
if ratio > 2.0:
score += 25
elif ratio > 1.5:
score += 20
elif ratio > 1.0:
score += 15
elif ratio > 0.7:
score += 8
else:
score += 0
# 涨停数 (0-25分)
if limit_up_count > 80:
score += 25
elif limit_up_count > 50:
score += 20
elif limit_up_count > 30:
score += 15
elif limit_up_count > 15:
score += 8
else:
score += 0
# 连板高度 (0-20分)
if max_streak >= 7:
score += 20
elif max_streak >= 5:
score += 15
elif max_streak >= 3:
score += 10
elif max_streak >= 2:
score += 5
# 炸板率 (0-15分) - 炸板率越低越好
if broken_rate < 20:
score += 15
elif broken_rate < 30:
score += 12
elif broken_rate < 50:
score += 6
else:
score += 0
# 指数位置 (0-15分)
if index_above_ma20:
score += 15
else:
score += 3
temperature = min(max(score, 0), 100)
result = MarketTemperature(
trade_date=trade_date,
up_count=up_count,
down_count=down_count,
limit_up_count=limit_up_count,
limit_down_count=limit_down_count,
max_streak=max_streak,
broken_rate=round(broken_rate, 1),
index_above_ma20=index_above_ma20,
temperature=round(temperature, 1),
source="tushare_daily",
data_status="fresh",
source_detail=f"daily={trade_date}",
limit_counts_reliable=not limit_df.empty,
)
logger.info(f"市场温度 {trade_date}: {temperature:.1f} (涨{up_count}/跌{down_count}, 涨停{limit_up_count}, 连板{max_streak})")
return result
async def build_realtime_market_temperature(
baseline: MarketTemperature | None = None,
) -> tuple[MarketTemperature, bool]:
"""基于统一市场广度客户端构建实时市场温度。"""
baseline = baseline or calculate_market_temperature()
breadth = await get_market_breadth()
if not breadth.reliable:
return baseline, False
index_data = await tencent_client.get_index_realtime()
sh_index = index_data.get("000001.SH")
pct = sh_index.get("pct_chg", 0) if sh_index else 0
ratio = breadth.up_count / max(breadth.down_count, 1)
# 实时口径里,涨跌停池可靠时用专门池;池接口失败时用全市场行情阈值估算。
# 估算口径权重略低,并在研究报告中标记为降级,避免悄悄污染判断。
temp_from_ratio = min(ratio / 3.0 * 40, 40)
temp_from_limit_up = (
min(breadth.limit_up_count / 3, 10)
if breadth.limit_counts_reliable
else min(breadth.limit_up_count / 5, 8)
)
temp_from_index = min(max(pct * 10 + 15, 0), 30)
baseline.trade_date = breadth.trade_date
baseline.up_count = breadth.up_count
baseline.down_count = breadth.down_count
baseline.limit_up_count = breadth.limit_up_count
baseline.limit_down_count = breadth.limit_down_count
baseline.temperature = round(min(max(temp_from_ratio + temp_from_limit_up + temp_from_index + 20, 0), 100), 1)
baseline.index_above_ma20 = pct > 0 if sh_index else baseline.index_above_ma20
baseline.source = breadth.source
baseline.source_detail = breadth.source
baseline.limit_counts_reliable = breadth.limit_counts_reliable
baseline.data_status = "fresh" if breadth.limit_counts_reliable else "estimated"
return baseline, True