astock-agent/backend/app/analysis/market_temp.py
2026-04-23 17:36:07 +08:00

181 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""市场温度计
综合涨跌家数、涨跌停数、连板高度、炸板率、指数位置,
输出 0-100 的市场温度值。
"""
import logging
import pandas as pd
import numpy as np
from app.data.tushare_client import tushare_client
from app.data.models import MarketTemperature
logger = logging.getLogger(__name__)
def calculate_market_temperature(trade_date: str = None) -> MarketTemperature:
"""计算指定交易日的市场温度
盘中 Tushare 当日数据可能不完整limit_list_d 只有盘后数据),
因此涨停数据先尝试当日,若为空则用前一日数据作为基线。
"""
if not trade_date:
trade_date = tushare_client.get_latest_trade_date()
# 1. 涨跌家数
daily_df = tushare_client.get_daily_all(trade_date)
if daily_df.empty:
original_trade_date = trade_date
trade_dates = tushare_client.get_trade_dates()
fallback_dates = [d for d in trade_dates if d < trade_date][-5:]
for fallback_date in reversed(fallback_dates):
fallback_df = tushare_client.get_daily_all(fallback_date)
if not fallback_df.empty:
daily_df = fallback_df
trade_date = fallback_date
logger.warning(
"市场温度 %s 日线数据为空,回退到最近有效交易日 %s",
original_trade_date,
fallback_date,
)
break
if daily_df.empty:
logger.warning("市场温度 %s 无有效日线数据,返回占位温度", trade_date)
return MarketTemperature(trade_date=trade_date, temperature=50)
up_count = len(daily_df[daily_df["pct_chg"] > 0])
down_count = len(daily_df[daily_df["pct_chg"] < 0])
# 2. 涨跌停数据 — 先尝试当日,若为空则用前一日
limit_df = tushare_client.get_limit_list(trade_date)
if limit_df.empty:
# 当日无涨跌停数据(盘中),用前一日作为基线
prev_dates = tushare_client.get_trade_dates()
prev_dates = [d for d in prev_dates if d < trade_date]
if prev_dates:
prev_date = prev_dates[-1]
limit_df = tushare_client.get_limit_list(prev_date)
logger.info(f"当日涨跌停数据为空,使用前一日 {prev_date} 作为基线")
limit_up_count = 0
limit_down_count = 0
max_streak = 0
broken_rate = 0.0
if not limit_df.empty:
# limit: U=涨停, D=跌停
up_df = limit_df[limit_df["limit"] == "U"]
down_df = limit_df[limit_df["limit"] == "D"]
# 排除 ST名称含 ST
up_df = up_df[~up_df["name"].str.contains("ST", na=False)]
down_df = down_df[~down_df["name"].str.contains("ST", na=False)]
# 排除一字板open_times == 0 表示全天封板未开板,不算自然涨停...
# 实际上 open_times 是打开次数0 表示一直封住,属于强势
# 这里我们统计所有非 ST 涨停数作为情绪指标)
limit_up_count = len(up_df)
limit_down_count = len(down_df)
# 连板高度limit_times 字段
if "limit_times" in up_df.columns and not up_df.empty:
max_streak = int(up_df["limit_times"].max())
# 炸板率:曾经涨停但未封住的 = up_stat 中 "炸板" 的记录
# open_times > 0 的算曾经打开过涨停板,如果最终未涨停就是炸板
# 但 limit_list_d 只返回最终涨停的... 需要另一个思路
# 用全市场日线数据:最高价触及涨停但收盘未涨停
if not daily_df.empty:
# 涨停价 = pre_close * 1.1(普通股),容差 0.01
daily_df["limit_price"] = daily_df["pre_close"] * 1.1
touched_limit = daily_df[
(daily_df["high"] >= daily_df["limit_price"] - 0.01) &
(daily_df["pct_chg"] < 9.9) # 触及涨停但收盘未涨停
]
total_touched = len(touched_limit) + limit_up_count
if total_touched > 0:
broken_rate = len(touched_limit) / total_touched * 100
# 3. 上证指数位置
index_df = tushare_client.get_index_daily("000001.SH", days=30)
index_above_ma20 = False
if not index_df.empty:
index_df = index_df.sort_values("trade_date")
if len(index_df) >= 20:
ma20 = index_df["close"].tail(20).mean()
latest_close = index_df["close"].iloc[-1]
index_above_ma20 = latest_close > ma20
# ── 综合评分 ──
score = 0.0
# 涨跌家数比 (0-25分)
ratio = up_count / max(down_count, 1)
if ratio > 2.0:
score += 25
elif ratio > 1.5:
score += 20
elif ratio > 1.0:
score += 15
elif ratio > 0.7:
score += 8
else:
score += 0
# 涨停数 (0-25分)
if limit_up_count > 80:
score += 25
elif limit_up_count > 50:
score += 20
elif limit_up_count > 30:
score += 15
elif limit_up_count > 15:
score += 8
else:
score += 0
# 连板高度 (0-20分)
if max_streak >= 7:
score += 20
elif max_streak >= 5:
score += 15
elif max_streak >= 3:
score += 10
elif max_streak >= 2:
score += 5
# 炸板率 (0-15分) - 炸板率越低越好
if broken_rate < 20:
score += 15
elif broken_rate < 30:
score += 12
elif broken_rate < 50:
score += 6
else:
score += 0
# 指数位置 (0-15分)
if index_above_ma20:
score += 15
else:
score += 3
temperature = min(max(score, 0), 100)
result = MarketTemperature(
trade_date=trade_date,
up_count=up_count,
down_count=down_count,
limit_up_count=limit_up_count,
limit_down_count=limit_down_count,
max_streak=max_streak,
broken_rate=round(broken_rate, 1),
index_above_ma20=index_above_ma20,
temperature=round(temperature, 1),
)
logger.info(f"市场温度 {trade_date}: {temperature:.1f} (涨{up_count}/跌{down_count}, 涨停{limit_up_count}, 连板{max_streak})")
return result