astock-agent/backend/app/engine/screener.py
2026-04-14 22:13:28 +08:00

826 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""趋势突破统一筛选器(自上而下方案)
三阶段管道:
Step 1: 板块定位 — 找到有资金流入的热门板块 (3-5个)
Step 2: 板块内选股 — 在热门板块成分股中筛出有资金流入的候选 (30-50只)
Step 3: 深度分析 — 供需 + 价格行为 + 趋势 (10-15只推荐)
评分公式:供需关系 40% + 价格行为 35% + 趋势 25%
板块和资金流作为前置过滤条件,不参与评分。
数据源:
- 盘中模式:Tushare 日线 + 腾讯实时行情混合
- 盘后模式:Tushare 当日完整数据
"""
import logging
import pandas as pd
from app.analysis.market_temp import calculate_market_temperature
from app.analysis.sector_scanner import scan_hot_sectors
from app.analysis.trend_scanner import scan_trend_breakout
from app.analysis.signals import generate_signals
from app.analysis.intraday import intraday_market_temperature, intraday_filter_stocks, intraday_sector_scan
from app.data.models import MarketTemperature, SectorInfo, TechnicalSignal, Recommendation
from app.config import settings, is_trading_hours, is_market_session
logger = logging.getLogger(__name__)
async def run_screening(trade_date: str | None = None) -> dict:
    """Run the three-stage trend-breakout screening pipeline.

    Stages: compute the market temperature, pick target sectors, select
    candidates inside those sectors (falling back to a whole-market scan when
    none are found), then score the candidates.

    Args:
        trade_date: Trade date forwarded unchanged to the data/analysis
            helpers; ``None`` lets those helpers pick their own default.

    Returns:
        A dict with the keys ``"market_temp"`` (MarketTemperature),
        ``"hot_sectors"`` (list of SectorInfo), ``"recommendations"`` (list of
        Recommendation with score >= 40, sorted by score, best first) and
        ``"scan_mode"`` (``"intraday"`` or ``"post_market"``).
    """
    intraday = is_market_session()
    scan_mode = "intraday" if intraday else "post_market"
    logger.info(f"=== 筛选模式: {'盘中实时' if intraday else '盘后'} ===")

    # -- Market temperature --
    logger.info("=== 市场温度计 ===")
    market_temp = calculate_market_temperature(trade_date)
    if intraday:
        market_temp = await intraday_market_temperature(market_temp)
        logger.info(f"盘中市场温度(实时调整): {market_temp.temperature}")
    else:
        logger.info(f"市场温度: {market_temp.temperature}")
    market_temp_score = market_temp.temperature

    # -- Step 1: locate target sectors --
    logger.info("=== Step 1: 板块定位 ===")
    all_sectors = scan_hot_sectors(trade_date)
    # Pre-filter: keep only sectors with capital inflow that are not in their end stage.
    hot_sectors = [
        s for s in all_sectors
        if s.capital_inflow > 0 and s.stage != "end"
    ][:settings.top_sector_count]
    if not hot_sectors:
        logger.info("无合格热门板块(需要资金流入+非末期),回退到全部板块")
        hot_sectors = all_sectors[:settings.top_sector_count]
    for s in hot_sectors:
        logger.info(f" 目标板块: {s.sector_name} 涨幅{s.pct_change}% 资金{s.capital_inflow:.0f}"
                    f"涨停{s.limit_up_count} 阶段={s.stage}")
    # During the session, refresh the sectors through the intraday scanner.
    if intraday:
        hot_sectors = await intraday_sector_scan(hot_sectors)

    # -- Step 2: pick stocks inside the target sectors --
    logger.info("=== Step 2: 板块内选股 ===")
    candidates = await _select_from_hot_sectors(hot_sectors, trade_date, intraday)
    if not candidates:
        logger.info("=== Step 2 无候选,回退到全市场扫描 ===")
        candidates = await scan_trend_breakout(
            trade_date=trade_date,
            market_temp=market_temp,
            hot_sectors=hot_sectors,
            intraday=intraday,
        )
    if not candidates:
        logger.info("=== 筛选完成: 0 只股票 ===")
        return {
            "market_temp": market_temp,
            "hot_sectors": hot_sectors,
            "recommendations": [],
            "scan_mode": scan_mode,
        }

    # -- Step 3: supply/demand + price action + trend scoring --
    logger.info("=== Step 3: 深度分析 ===")
    recommendations = _build_recommendations(
        candidates, market_temp, hot_sectors, market_temp_score, intraday,
    )
    # Drop low-quality picks, then rank so that the head of the list (and the
    # "top 5" logged below) really are the best-scoring recommendations.
    recommendations = [r for r in recommendations if r.score >= 40]
    recommendations.sort(key=lambda r: r.score, reverse=True)

    logger.info(f"=== 筛选完成: {len(recommendations)} 只股票 ({scan_mode}) ===")
    signal_map = {"breakout": "突破型", "breakout_confirm": "确认型", "pullback": "回踩型", "launch": "启动型", "reversal": "反转型"}
    for r in recommendations[:5]:
        signal_label = signal_map.get(r.entry_signal_type, r.entry_signal_type)
        logger.info(f" [{signal_label}] {r.name}({r.ts_code}) {r.level} 评分={r.score} 信号={r.signal}")
    return {
        "market_temp": market_temp,
        "hot_sectors": hot_sectors,
        "recommendations": recommendations,
        "scan_mode": scan_mode,
    }
async def _select_from_hot_sectors(
    hot_sectors: list[SectorInfo],
    trade_date: str,
    intraday: bool,
) -> list[dict]:
    """Step 2: pick candidates with main-force inflow from the target sectors.

    Flow:
      1. Collect the constituent codes of every target sector.
      2. Drop ST names and recently listed stocks, then filter by circulating
         market value and turnover rate using ``get_daily_basic``.
      3. Keep only stocks whose main-force net inflow is positive
         (``get_moneyflow_batch``); the filter is skipped when no money-flow
         data is available.

    Args:
        hot_sectors: Target sectors; ``sector_code`` and ``sector_name`` are read.
        trade_date: Trade date; a falsy value is replaced by the client's
            latest trade date.
        intraday: Accepted for interface compatibility; not used here.

    Returns:
        Candidate dicts with the keys ``ts_code``, ``name``, ``sector``,
        ``turnover_rate``, ``circ_mv``, ``pe``, ``pb``, ``volume_ratio``,
        ``main_net_inflow`` and ``inflow_ratio``. The list is ordered by
        main-force net inflow (largest first) so that any downstream cut-off
        is deterministic; an empty list means no candidates.
    """
    from datetime import datetime, timedelta

    import pandas as pd

    from app.data.tushare_client import tushare_client

    def _optional_float(value):
        """Convert to float, mapping missing/NaN values to None."""
        return float(value) if pd.notna(value) else None

    if not trade_date:
        trade_date = tushare_client.get_latest_trade_date()

    # Collect the constituents of every target sector.
    sector_member_codes: set[str] = set()
    sector_code_map: dict[str, str] = {}  # ts_code -> sector_name
    for s in hot_sectors:
        try:
            members_df = tushare_client.get_ths_members(s.sector_code)
            if not members_df.empty and "con_code" in members_df.columns:
                codes = members_df["con_code"].tolist()
                sector_member_codes.update(codes)
                for c in codes:
                    sector_code_map[c] = s.sector_name
        except Exception as e:
            # Best effort: one failing sector must not abort the whole scan.
            logger.warning(f"获取板块 {s.sector_name} 成分股失败: {e}")
    if not sector_member_codes:
        logger.info("Step 2: 无板块成分股数据")
        return []
    logger.info(f"Step 2: 热门板块共 {len(sector_member_codes)} 只成分股")

    # Exclusion list (ST / recently listed) plus name and industry lookups,
    # built once instead of scanning the frame for every candidate.
    stock_basic = tushare_client.get_stock_basic()
    exclude_codes: set[str] = set()
    name_map: dict = {}
    industry_map: dict = {}
    if not stock_basic.empty:
        st_codes = set(stock_basic[stock_basic["name"].str.contains("ST", na=False)]["ts_code"])
        exclude_codes.update(st_codes)
        cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d")
        new_codes = set(stock_basic[stock_basic["list_date"] > cutoff]["ts_code"])
        exclude_codes.update(new_codes)
        name_map = dict(zip(stock_basic["ts_code"], stock_basic["name"]))
        if "industry" in stock_basic.columns:
            industry_map = dict(zip(stock_basic["ts_code"], stock_basic["industry"]))

    # Market-value / turnover filter on daily_basic.
    basic = tushare_client.get_daily_basic(trade_date)
    if basic.empty:
        logger.info("Step 2: daily_basic 无数据")
        return []
    # Work on a copy: rescaling in place would corrupt the frame if the client
    # hands out a cached object (it would be divided again on the next call).
    basic = basic.copy()
    basic["circ_mv"] = basic["circ_mv"] / 10000  # 万元 -> 亿元
    filtered_basic = basic[
        (basic["ts_code"].isin(sector_member_codes)) &
        (~basic["ts_code"].isin(exclude_codes)) &
        (basic["circ_mv"] >= settings.min_circ_mv) &
        (basic["circ_mv"] <= settings.max_circ_mv) &
        (basic["turnover_rate"] >= settings.min_turnover_rate) &
        (basic["turnover_rate"] <= settings.max_turnover_rate)
    ].copy()
    logger.info(f"Step 2 基本面过滤: {len(sector_member_codes)} 只 → {len(filtered_basic)}")
    if filtered_basic.empty:
        return []
    basic_lookup = (
        filtered_basic.drop_duplicates("ts_code").set_index("ts_code").to_dict("index")
    )

    # Money-flow filter: main-force net inflow > 0.
    mf_lookup: dict[str, dict[str, float]] = {}
    mf = tushare_client.get_moneyflow_batch(trade_date)
    if mf.empty:
        logger.info("Step 2: 资金流数据为空,跳过资金过滤")
        ordered_codes = list(basic_lookup)
    else:
        mf = mf.copy()  # same reasoning as above: never mutate the client's frame
        mf["main_net_inflow"] = (
            (mf["buy_elg_amount"] - mf["sell_elg_amount"]) +
            (mf["buy_lg_amount"] - mf["sell_lg_amount"])
        )
        total = (
            mf["buy_elg_amount"] + mf["sell_elg_amount"] +
            mf["buy_lg_amount"] + mf["sell_lg_amount"] +
            mf["buy_md_amount"] + mf["sell_md_amount"] +
            mf["buy_sm_amount"] + mf["sell_sm_amount"]
        )
        # A zero total becomes NaN before the division, then the ratio falls back to 0.
        mf["inflow_ratio"] = (mf["main_net_inflow"] / total.replace(0, float("nan")) * 100).fillna(0)
        mf_positive = (
            mf[mf["ts_code"].isin(set(basic_lookup)) & (mf["main_net_inflow"] > 0)]
            .sort_values("main_net_inflow", ascending=False)
            .drop_duplicates("ts_code")
        )
        mf_lookup = {
            code: {"main_net_inflow": float(net), "inflow_ratio": float(ratio)}
            for code, net, ratio in zip(
                mf_positive["ts_code"],
                mf_positive["main_net_inflow"],
                mf_positive["inflow_ratio"],
            )
        }
        # dict preserves insertion order, i.e. largest net inflow first.
        ordered_codes = list(mf_lookup)
        logger.info(f"Step 2 资金流过滤: → {len(ordered_codes)} 只主力净流入 > 0")
    if not ordered_codes:
        return []

    # Assemble the candidate records.
    candidates = []
    for ts_code in ordered_codes:
        b_row = basic_lookup.get(ts_code, {})
        mf_info = mf_lookup.get(ts_code, {})
        candidates.append({
            "ts_code": ts_code,
            "name": name_map.get(ts_code, ""),
            "sector": sector_code_map.get(ts_code, industry_map.get(ts_code, "")),
            "turnover_rate": float(b_row["turnover_rate"]) if b_row else 0,
            "circ_mv": float(b_row["circ_mv"]) if b_row else 0,
            "pe": _optional_float(b_row.get("pe")),
            "pb": _optional_float(b_row.get("pb")),
            "volume_ratio": _optional_float(b_row.get("volume_ratio")),
            "main_net_inflow": mf_info.get("main_net_inflow", 0),
            "inflow_ratio": mf_info.get("inflow_ratio", 0),
        })
    logger.info(f"Step 2 候选: {len(candidates)}")
    return candidates
def _apply_risk_multipliers(score, tech_signal, sector_stage, market_temp_score, signal_score):
    """Scale a raw score by the rally, sector-stage, market-temperature and signal-confidence multipliers."""
    # Short-term rally penalty.
    if tech_signal:
        if tech_signal.rally_pct_5d > 20:
            score *= 0.65
        elif tech_signal.rally_pct_5d > 15:
            score *= 0.80
    # Late / end stage sector penalty.
    if sector_stage == "end":
        score *= 0.70
    elif sector_stage == "late":
        score *= 0.88
    # Market-temperature risk control.
    if market_temp_score < 30:
        score *= 0.75
    elif market_temp_score < 50:
        score *= 0.88
    # Bonus for a high-confidence entry signal.
    if signal_score >= 80:
        score *= 1.10
    return score


def _reference_prices(signal_kind, details, tech_signal):
    """Return (entry_price, target_price, stop_loss) for a signal kind; unknown values are None."""
    entry_price = target_price = stop_loss = None
    if tech_signal:
        entry_price = tech_signal.support_price
        target_price = tech_signal.resist_price
        stop_loss = tech_signal.stop_loss_price
    # Levels reported by the entry signal take precedence over the generic ones.
    if signal_kind in ("breakout", "breakout_confirm") and details.get("resist_level"):
        entry_price = details["resist_level"]
        target_price = round(entry_price * 1.05, 2)
    elif signal_kind == "pullback" and details.get("support_price"):
        entry_price = details["support_price"]
        target_price = round(entry_price * 1.05, 2)
    elif signal_kind == "launch" and details.get("resist_level"):
        entry_price = round(details["resist_level"] * 1.01, 2)
        target_price = round(details["resist_level"] * 1.08, 2)
    elif signal_kind == "reversal" and tech_signal and tech_signal.support_price:
        entry_price = tech_signal.support_price
        target_price = round(entry_price * 1.08, 2)
    return entry_price, target_price, stop_loss


def _build_recommendations(
    candidates: list[dict],
    market_temp: MarketTemperature,
    hot_sectors: list[SectorInfo],
    market_temp_score: float = 0,
    intraday: bool = False,
) -> list[Recommendation]:
    """Step 3: score each candidate on supply/demand, price action and trend.

    Score formula: supply/demand 40% + price action 35% + trend 25%, then
    scaled by risk multipliers. Candidates without an entry signal, with fewer
    than 30 daily bars, or whose analysis raises are skipped. Processing stops
    once ``settings.top_stock_count`` recommendations have been produced.

    Args:
        candidates: Candidate dicts; ``ts_code`` is required, ``name``,
            ``sector``, ``pe``, ``pb`` and the money-flow keys are optional.
        market_temp: Market temperature object, forwarded to the risk note.
        hot_sectors: Target sectors, used for sector stage and heat lookups.
        market_temp_score: Numeric market temperature used for risk control.
        intraday: Forwarded to the reason generator.

    Returns:
        Recommendations in candidate order (not sorted by score).
    """
    from app.data.tushare_client import tushare_client
    from app.analysis.technical import add_all_indicators
    from app.analysis.breakout_signals import (
        classify_entry_signal,
        score_supply_demand,
        EntrySignal,
    )
    from app.analysis.signals import generate_signals
    from app.analysis.capital_flow import _score_valuation

    # Name / industry fallbacks for candidates that do not carry them.
    stock_basic = tushare_client.get_stock_basic()
    name_map: dict = {}
    industry_map: dict = {}
    if not stock_basic.empty:
        name_map = dict(zip(stock_basic["ts_code"], stock_basic["name"]))
        if "industry" in stock_basic.columns:
            industry_map = dict(zip(stock_basic["ts_code"], stock_basic["industry"]))

    recommendations = []
    total = len(candidates)
    signal_counts = {"breakout": 0, "breakout_confirm": 0, "pullback": 0, "launch": 0, "reversal": 0, "none": 0}
    for stock in candidates:
        ts_code = stock.get("ts_code", "")
        if not ts_code:
            continue
        name = stock.get("name") or name_map.get(ts_code, ts_code)
        sector = stock.get("sector") or industry_map.get(ts_code, "")
        try:
            # 120 daily bars; too little history cannot be analysed.
            df = tushare_client.get_stock_daily(ts_code, 120)
            if df.empty or len(df) < 30:
                continue
            df = add_all_indicators(df)

            # -- Entry-signal classification --
            entry_signal = classify_entry_signal(df)
            signal_type = entry_signal["signal_type"]
            if signal_type == EntrySignal.NONE:
                signal_counts["none"] += 1
                continue
            signal_kind = signal_type.value
            signal_counts[signal_kind] += 1
            signal_score = entry_signal.get("signal_score", 0)

            # -- Three-dimension score --
            supply_demand_score = score_supply_demand(df)              # 40%
            price_action_score = _score_price_action(df, entry_signal)  # 35%
            trend_score = _score_trend(df)                              # 25%
            final_score = (
                supply_demand_score * 0.40 +
                price_action_score * 0.35 +
                trend_score * 0.25
            )

            # -- Risk multipliers --
            tech_signal = generate_signals(ts_code, name)
            final_score = _apply_risk_multipliers(
                final_score,
                tech_signal,
                _get_sector_stage(sector, hot_sectors),
                market_temp_score,
                signal_score,
            )

            # Valuation is reported for reference only; it is not part of the score.
            valuation_score = _score_valuation(stock.get("pe"), stock.get("pb"))

            # -- Signal and level --
            level = _score_to_level(final_score)
            position_score = tech_signal.position_score if tech_signal else 50
            signal = "HOLD"
            if signal_score >= 50 and position_score >= 30 and final_score >= 60:
                signal = "BUY"

            entry_price, target_price, stop_loss = _reference_prices(
                signal_kind, entry_signal.get("details", {}), tech_signal,
            )

            reasons = _generate_reasons(stock, entry_signal, tech_signal, df, intraday)
            # The risk note looks up "entry_signal_type" on the stock dict, which
            # the candidate itself does not necessarily carry, so pass it along.
            risk_note = _generate_risk_note(
                market_temp, tech_signal, {**stock, "entry_signal_type": signal_kind},
            )

            recommendations.append(Recommendation(
                ts_code=ts_code,
                name=name,
                sector=sector,
                score=round(final_score, 1),
                market_temp_score=round(market_temp_score, 1),
                sector_score=round(_get_sector_heat(sector, hot_sectors), 1),
                capital_score=round(_score_capital_simple(stock), 1),
                technical_score=round(trend_score, 1),
                position_score=round(position_score, 1),
                valuation_score=round(valuation_score, 1),
                signal=signal,
                entry_price=entry_price,
                target_price=target_price,
                stop_loss=stop_loss,
                reasons=reasons,
                risk_note=risk_note,
                level=level,
                strategy="trend_breakout",
                entry_signal_type=signal_kind,
            ))
            if len(recommendations) >= settings.top_stock_count:
                break
        except Exception as e:
            # Best effort: a single failing stock must not abort the whole scan.
            logger.debug(f"深度分析 {ts_code} 失败: {e}")
            continue
    # NOTE: this is a synchronous function, so it cannot yield control to the
    # event loop between candidates.
    logger.info(
        f"Step 3 入场信号分布: "
        f"突破={signal_counts['breakout']} 确认={signal_counts['breakout_confirm']} "
        f"回踩={signal_counts['pullback']} 启动={signal_counts['launch']} "
        f"反转={signal_counts['reversal']} 无信号={signal_counts['none']} "
        f"(共分析{total}只)"
    )
    return recommendations
# ── 价格行为评分 ──
def _score_price_action(df, entry_signal: dict) -> float:
    """Price-action score in the range 0-100 for the latest bar.

    Looks only at the candle shape and the volume/price agreement; moving
    averages and trend are scored elsewhere.

    Components:
      - Candle strength (35): body/range ratio, close position within the
        range, and a lower-wick bonus for pullback/reversal signals.
      - Volume/price agreement (35): volume versus its 5-day average combined
        with the price direction; a flat 10 when no usable average exists.
      - Entry-pattern quality (30): completeness of the pattern reported in
        ``entry_signal["details"]`` for the given signal type.

    Args:
        df: Daily bars; the last row is scored. Reads ``open``, ``high``,
            ``low``, ``close``, ``vol`` and, when present, ``vol_ma5`` and
            ``pct_chg``.
        entry_signal: Dict with ``signal_type`` (an object exposing
            ``.value``, or a falsy value) and an optional ``details`` dict.

    Returns:
        The score, capped at 100.
    """
    import pandas as pd

    def _first_tier(value, tiers):
        """Return the points of the first tier whose floor `value` exceeds, else 0."""
        for floor, pts in tiers:
            if value > floor:
                return pts
        return 0

    bar = df.iloc[-1]
    info = entry_signal.get("details", {})
    sig = entry_signal.get("signal_type")
    kind = sig.value if sig else None
    points = 0

    # -- Candle strength (35) --
    span = bar["high"] - bar["low"]
    if span > 0:
        # Body share of the full range: a large body means a clear direction.
        body_ratio = abs(bar["close"] - bar["open"]) / span
        points += _first_tier(body_ratio, ((0.7, 20), (0.4, 12), (0.2, 6)))
        # Close position: the nearer to the high, the better.
        close_position = (bar["close"] - bar["low"]) / span
        points += _first_tier(close_position, ((0.8, 10), (0.6, 6), (0.4, 3)))
        # Lower wick, measured from the bottom of the candle body.
        body_bottom = bar["open"] if bar["close"] > bar["open"] else bar["close"]
        lower_wick = body_bottom - bar["low"]
        if lower_wick > 0:
            if kind in ("pullback", "reversal") and lower_wick / span > 0.2:
                points += 5

    # -- Volume/price agreement (35) --
    has_vol_ma = "vol_ma5" in df.columns
    if has_vol_ma and not pd.isna(bar["vol_ma5"]) and bar["vol_ma5"] > 0:
        ratio = bar["vol"] / bar["vol_ma5"]
        if "pct_chg" in df.columns:
            rising = bar["pct_chg"] > 0
        else:
            rising = bar["close"] > bar["open"]
        if rising:
            # Rising on expanding volume.
            points += _first_tier(ratio, ((2.0, 35), (1.5, 25), (1.2, 18), (1.0, 10)))
        elif ratio < 0.7:
            points += 25  # falling on clearly shrinking volume (benign)
        elif ratio < 0.9:
            points += 15
    else:
        points += 10

    # -- Entry-pattern quality (30) --
    if kind == "breakout":
        pct = info.get("breakout_pct", 0)
        vr = info.get("volume_ratio", 1)
        if pct > 2 and vr > 2:
            quality = 30
        elif pct > 1 and vr > 1.5:
            quality = 20
        elif pct > 0:
            quality = 12
        else:
            quality = 6
    elif kind == "breakout_confirm":
        vr = info.get("volume_ratio", 1)
        pct = info.get("confirm_pct", 0)
        if vr > 2 and pct > 2:
            quality = 30
        elif vr > 1.5 and pct > 1:
            quality = 22
        elif vr > 1.0:
            quality = 14
        else:
            quality = 8
    elif kind == "pullback":
        support_ma = info.get("support_ma", "")
        shrink = info.get("volume_shrink_ratio", 1)
        if support_ma == "MA20" and shrink < 0.6:
            quality = 30
        elif support_ma == "MA20":
            quality = 22
        elif support_ma == "MA10" and shrink < 0.6:
            quality = 18
        else:
            quality = 10
    elif kind == "launch":
        range_pct = info.get("price_range_pct", 10)
        if range_pct < 3:
            quality = 30
        elif range_pct < 5:
            quality = 20
        else:
            quality = 10
    elif kind == "reversal":
        pct = info.get("reversal_pct", 0)
        vr = info.get("volume_ratio", 1)
        if pct > 5 and vr > 2.5:
            quality = 30
        elif pct > 3 and vr > 2:
            quality = 22
        elif pct > 3:
            quality = 14
        else:
            quality = 8
    else:
        quality = 10
    points += quality

    return min(points, 100)
# ── 趋势评分 ──
def _score_trend(df) -> float:
    """Trend score in the range 0-100 based on the latest bars.

    Components:
      - Moving-average alignment (40): MA5 > MA10 > MA20 > MA60 on the last
        row, with partial credit for weaker alignments.
      - Higher-high / higher-low structure (35): the last 10 bars compared
        with the 10 bars before them.
      - MA20 direction (25): percentage change of MA20 between ``df.iloc[-5]``
        and the last row.

    Args:
        df: Daily bars. Reads ``high`` and ``low`` and, when present, the
            ``ma5`` / ``ma10`` / ``ma20`` / ``ma60`` columns.

    Returns:
        The score, capped at 100.
    """
    import pandas as pd

    latest = df.iloc[-1]
    columns = df.columns
    points = 0

    # -- Moving-average alignment (40) --
    all_mas = ("ma5", "ma10", "ma20", "ma60")
    if all(c in columns for c in all_mas) and all(not pd.isna(latest[c]) for c in all_mas):
        if latest["ma5"] > latest["ma10"] > latest["ma20"] > latest["ma60"]:
            points += 40  # perfect bullish alignment
        elif latest["ma5"] > latest["ma10"] > latest["ma20"]:
            points += 28
        elif latest["ma5"] > latest["ma20"]:
            points += 15
    elif "ma5" in columns and "ma20" in columns:
        # Fallback when the full MA set is unavailable.
        if not (pd.isna(latest["ma5"]) or pd.isna(latest["ma20"])) and latest["ma5"] > latest["ma20"]:
            points += 15

    # -- Higher-high / higher-low structure (35) --
    if len(df) >= 20:
        window = df.tail(20)
        older, newer = window.iloc[:10], window.iloc[10:]
        higher_high = newer["high"].max() > older["high"].max()
        higher_low = newer["low"].min() > older["low"].min()
        if higher_high and higher_low:
            points += 35  # both highs and lows are rising
        elif higher_high:
            points += 20
        elif higher_low:
            points += 12

    # -- MA20 direction (25) --
    if "ma20" in columns and len(df) >= 5:
        ma20_now = latest["ma20"]
        ma20_before = df.iloc[-5]["ma20"]
        if not (pd.isna(ma20_now) or pd.isna(ma20_before)) and ma20_before > 0:
            change_pct = (ma20_now - ma20_before) / ma20_before * 100
            for floor, pts in ((2, 25), (1, 18), (0, 10)):
                if change_pct > floor:
                    points += pts
                    break

    return min(points, 100)
# ── 辅助函数 ──
def _get_sector_stage(sector_name: str, hot_sectors: list[SectorInfo]) -> str:
    """Return the stage of the first sector named `sector_name`, or "mid" when none matches."""
    return next(
        (sec.stage for sec in hot_sectors if sec.sector_name == sector_name),
        "mid",
    )
def _get_sector_heat(sector_name: str, hot_sectors: list[SectorInfo]) -> float:
    """Return the heat score of the first sector named `sector_name`, or 30.0 when none matches."""
    return next(
        (sec.heat_score for sec in hot_sectors if sec.sector_name == sector_name),
        30.0,
    )
def _score_capital_simple(stock: dict) -> float:
    """Score money flow (0-100) from the values already on the candidate dict.

    No extra API call is made. Up to 60 points come from ``main_net_inflow``
    and up to 40 from ``inflow_ratio``; missing or ``None`` values count as 0.
    """
    net_inflow = stock.get("main_net_inflow", 0) or 0
    ratio = stock.get("inflow_ratio", 0) or 0

    def _first_tier(value, tiers):
        """Return the points of the first tier whose floor `value` exceeds, else 0."""
        for floor, pts in tiers:
            if value > floor:
                return pts
        return 0

    inflow_points = _first_tier(net_inflow, ((10000, 60), (5000, 45), (2000, 30), (0, 15)))
    ratio_points = _first_tier(ratio, ((15, 40), (10, 30), (5, 20), (0, 10)))
    return min(inflow_points + ratio_points, 100)
def _score_to_level(score: float) -> str:
    """Map a numeric score to its recommendation level label."""
    # Thresholds are checked from the highest down; the first one reached wins.
    levels = ((80, "强烈推荐"), (60, "推荐"), (40, "观望"))
    for floor, label in levels:
        if score >= floor:
            return label
    return "回避"
def _generate_reasons(
stock: dict, entry_signal: dict, tech: TechnicalSignal | None,
df, intraday: bool = False,
) -> list[str]:
"""生成推荐理由"""
import pandas as pd
from app.analysis.breakout_signals import EntrySignal
reasons = []
signal_type = entry_signal.get("signal_type")
details = entry_signal.get("details", {})
signal_map = {EntrySignal.BREAKOUT: "突破型", EntrySignal.BREAKOUT_CONFIRM: "确认型",
EntrySignal.PULLBACK: "回踩型", EntrySignal.LAUNCH: "启动型",
EntrySignal.REVERSAL: "反转型"}
entry_label = signal_map.get(signal_type, "")
# 入场信号
if entry_label and signal_type:
st = signal_type.value
if st == "breakout":
breakout_pct = details.get("breakout_pct", 0)
vol_ratio = details.get("volume_ratio", 0)
reasons.append(f"放量突破20日阻力位涨幅{breakout_pct:.1f}%,量比{vol_ratio:.1f}倍)")
elif st == "breakout_confirm":
vol_ratio = details.get("volume_ratio", 0)
confirm_pct = details.get("confirm_pct", 0)
reasons.append(f"突破后放量确认(确认日涨{confirm_pct:.1f}%,量比{vol_ratio:.1f}倍)")
elif st == "pullback":
support = details.get("support_ma", "")
shrink = details.get("volume_shrink_ratio", 0)
reasons.append(f"缩量回踩{support}支撑(量能收缩至{shrink:.0%}")
elif st == "launch":
range_pct = details.get("price_range_pct", 0)
reasons.append(f"缩量横盘整理{range_pct:.1f}%后首日放量启动")
elif st == "reversal":
reversal_pct = details.get("reversal_pct", 0)
vol_ratio = details.get("volume_ratio", 0)
reasons.append(f"连续下跌后放量长阳反转(涨{reversal_pct:.1f}%,量比{vol_ratio:.1f}倍)")
# 供需分析
if len(df) >= 10:
recent = df.tail(10)
up_days = recent[recent["pct_chg"] > 0]
down_days = recent[recent["pct_chg"] <= 0]
if len(up_days) > 0 and len(down_days) > 0:
avg_up_vol = up_days["vol"].mean()
avg_down_vol = down_days["vol"].mean()
if avg_down_vol > 0:
ds_ratio = avg_up_vol / avg_down_vol
if ds_ratio > 1.5:
reasons.append(f"需求主导(上涨均量/下跌均量={ds_ratio:.1f}")
# 资金流
main_net = stock.get("main_net_inflow", 0)
if main_net > 5000:
reasons.append(f"主力资金大幅流入{main_net:.0f}万元")
elif main_net > 1000:
reasons.append(f"主力资金持续流入{main_net:.0f}万元")
# 板块
sector = stock.get("sector", "")
if sector:
reasons.append(f"所属热门板块【{sector}")
return reasons[:3]
def _generate_risk_note(
market: MarketTemperature,
tech: TechnicalSignal | None,
stock: dict,
) -> str:
"""生成风险提示"""
notes = []
entry_type = stock.get("entry_signal_type", "")
if entry_type == "breakout":
notes.append("突破型需警惕假突破,关注量能是否持续")
elif entry_type == "breakout_confirm":
notes.append("确认型需观察后续量能是否跟上,防止冲高回落")
elif entry_type == "pullback":
notes.append("回踩型可能继续下探支撑,注意止损纪律")
elif entry_type == "launch":
notes.append("启动型整理可能延长,注意时间成本")
elif entry_type == "reversal":
notes.append("反转型可能二次探底,确认底部后再加仓")
if market.temperature < 30:
notes.append("市场情绪偏冷,系统性风险较高")
elif market.temperature < 50:
notes.append("市场情绪一般,注意仓位控制")
if tech:
if tech.position_score < 30:
notes.append(f"近期涨幅较大(5日{tech.rally_pct_5d}%),追高风险")
if tech.rally_pct_10d > 20:
notes.append(f"10日累涨{tech.rally_pct_10d}%,警惕回调")
if not notes:
return "注意设好止损,控制仓位"
return "".join(notes)