astock-agent/backend/app/engine/screener.py
2026-04-14 20:51:38 +08:00

756 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""趋势突破统一筛选器(自上而下方案)
三阶段管道:
Step 1: 板块定位 — 找到有资金流入的热门板块 (3-5个)
Step 2: 板块内选股 — 在热门板块成分股中筛出有资金流入的候选 (30-50只)
Step 3: 深度分析 — 供需 + 价格行为 + 趋势 (10-15只推荐)
评分公式:供需关系 40% + 价格行为 35% + 趋势 25%
板块和资金流作为前置过滤条件,不参与评分。
数据源:
- 盘中模式:Tushare 日线 + 腾讯实时行情混合
- 盘后模式:Tushare 当日完整数据
"""
import logging
from app.analysis.market_temp import calculate_market_temperature
from app.analysis.sector_scanner import scan_hot_sectors
from app.analysis.trend_scanner import scan_trend_breakout
from app.analysis.signals import generate_signals
from app.analysis.intraday import intraday_market_temperature, intraday_filter_stocks, intraday_sector_scan
from app.data.models import MarketTemperature, SectorInfo, TechnicalSignal, Recommendation
from app.config import settings, is_trading_hours, is_market_session
logger = logging.getLogger(__name__)
async def run_screening(trade_date: str | None = None) -> dict:
    """Run the top-down trend-breakout screening pipeline.

    Order of work: market temperature, Step 1 hot-sector selection, Step 2
    candidate selection inside those sectors (falling back to a whole-market
    scan when that yields nothing), Step 3 scoring of the candidates.

    Args:
        trade_date: Trade date forwarded unchanged to the analysis helpers.
            ``None`` is forwarded as-is.

    Returns:
        A dict with the keys ``market_temp`` (MarketTemperature),
        ``hot_sectors`` (list of SectorInfo), ``recommendations`` (list of
        Recommendation with ``score >= 40``, highest score first) and
        ``scan_mode`` (``"intraday"`` or ``"post_market"``).
    """
    intraday = is_market_session()
    scan_mode = "intraday" if intraday else "post_market"
    logger.info(f"=== 筛选模式: {'盘中实时' if intraday else '盘后'} ===")

    # -- Market temperature --
    logger.info("=== 市场温度计 ===")
    market_temp = calculate_market_temperature(trade_date)
    if intraday:
        # During the session the daily-based reading is adjusted with live data.
        market_temp = await intraday_market_temperature(market_temp)
        logger.info(f"盘中市场温度(实时调整): {market_temp.temperature}")
    else:
        logger.info(f"市场温度: {market_temp.temperature}")
    market_temp_score = market_temp.temperature

    # -- Step 1: locate hot sectors --
    logger.info("=== Step 1: 板块定位 ===")
    all_sectors = scan_hot_sectors(trade_date)
    # Pre-filter: keep only sectors with capital inflow that are not in the
    # "end" stage, then cap the list at the configured size.
    hot_sectors = [
        s for s in all_sectors
        if s.capital_inflow > 0 and s.stage not in ("end",)
    ][:settings.top_sector_count]
    if not hot_sectors:
        logger.info("无合格热门板块(需要资金流入+非末期),回退到全部板块")
        hot_sectors = all_sectors[:settings.top_sector_count]
    for s in hot_sectors:
        logger.info(f" 目标板块: {s.sector_name} 涨幅{s.pct_change}% 资金{s.capital_inflow:.0f}"
                    f"涨停{s.limit_up_count} 阶段={s.stage}")
    # During the session, refresh the sector figures with live quotes.
    if intraday:
        hot_sectors = await intraday_sector_scan(hot_sectors)

    # -- Step 2: pick stocks inside the hot sectors --
    logger.info("=== Step 2: 板块内选股 ===")
    candidates = await _select_from_hot_sectors(hot_sectors, trade_date, intraday)
    if not candidates:
        logger.info("=== Step 2 无候选,回退到全市场扫描 ===")
        candidates = await scan_trend_breakout(
            trade_date=trade_date,
            market_temp=market_temp,
            hot_sectors=hot_sectors,
            intraday=intraday,
        )
    if not candidates:
        logger.info("=== 筛选完成: 0 只股票 ===")
        return {
            "market_temp": market_temp,
            "hot_sectors": hot_sectors,
            "recommendations": [],
            "scan_mode": scan_mode,
        }

    # -- Step 3: supply/demand + price action + trend scoring --
    logger.info("=== Step 3: 深度分析 ===")
    recommendations = _build_recommendations(
        candidates, market_temp, hot_sectors, market_temp_score, intraday,
    )
    # Drop low-quality recommendations.
    recommendations = [r for r in recommendations if r.score >= 40]
    # Rank best-first so that both the returned list and the "top 5" log
    # below reflect score rather than candidate iteration order.
    recommendations.sort(key=lambda r: r.score, reverse=True)

    logger.info(f"=== 筛选完成: {len(recommendations)} 只股票 ({scan_mode}) ===")
    signal_map = {"breakout": "突破型", "pullback": "回踩型", "launch": "启动型"}
    for r in recommendations[:5]:
        signal_label = signal_map.get(r.entry_signal_type, r.entry_signal_type)
        logger.info(f" [{signal_label}] {r.name}({r.ts_code}) {r.level} 评分={r.score} 信号={r.signal}")
    return {
        "market_temp": market_temp,
        "hot_sectors": hot_sectors,
        "recommendations": recommendations,
        "scan_mode": scan_mode,
    }
async def _select_from_hot_sectors(
    hot_sectors: list[SectorInfo],
    trade_date: str,
    intraday: bool,
) -> list[dict]:
    """Step 2: pick candidates with positive main-force inflow from hot sectors.

    Flow:
        1. Collect the constituent codes of every hot sector.
        2. Exclude ST names and recently listed stocks, then filter by
           circulating market value and turnover rate using ``daily_basic``.
        3. Keep stocks whose main-force net inflow (extra-large + large
           orders, buy minus sell) is positive. When no money-flow data is
           available this filter is skipped.

    Args:
        hot_sectors: Sectors whose constituents are screened.
        trade_date: Trade date to query; a falsy value falls back to the
            client's latest trade date.
        intraday: Accepted for interface compatibility; not used here.

    Returns:
        A list of candidate dicts with the keys ``ts_code``, ``name``,
        ``sector``, ``turnover_rate``, ``circ_mv``, ``pe``, ``pb``,
        ``volume_ratio``, ``main_net_inflow`` and ``inflow_ratio``. When
        money-flow data is available the list is ordered by main-force net
        inflow, largest first; otherwise it follows ``daily_basic`` row order.
    """
    from app.data.tushare_client import tushare_client
    from datetime import datetime, timedelta
    import pandas as pd

    def _opt_float(value):
        """Return ``float(value)``, or None when the value is missing/NaN."""
        return float(value) if pd.notna(value) else None

    if not trade_date:
        trade_date = tushare_client.get_latest_trade_date()

    # Collect constituent codes of the hot sectors.
    sector_member_codes: set[str] = set()
    sector_code_map: dict[str, str] = {}  # ts_code -> sector_name
    for s in hot_sectors:
        try:
            members_df = tushare_client.get_ths_members(s.sector_code)
            if not members_df.empty and "con_code" in members_df.columns:
                codes = members_df["con_code"].tolist()
                sector_member_codes.update(codes)
                for c in codes:
                    sector_code_map[c] = s.sector_name
        except Exception as e:
            # Best effort: one failing sector must not abort the whole step.
            logger.warning(f"获取板块 {s.sector_name} 成分股失败: {e}")
    if not sector_member_codes:
        logger.info("Step 2: 无板块成分股数据")
        return []
    logger.info(f"Step 2: 热门板块共 {len(sector_member_codes)} 只成分股")

    # Exclusions (ST / recently listed) plus name and industry lookups.
    stock_basic = tushare_client.get_stock_basic()
    exclude_codes: set[str] = set()
    name_map: dict = {}
    industry_map: dict = {}
    if not stock_basic.empty:
        is_st = stock_basic["name"].str.contains("ST", na=False)
        exclude_codes.update(stock_basic.loc[is_st, "ts_code"])
        cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d")
        exclude_codes.update(stock_basic.loc[stock_basic["list_date"] > cutoff, "ts_code"])
        # One pass to build the lookups instead of scanning the frame per candidate.
        name_map = dict(zip(stock_basic["ts_code"], stock_basic["name"]))
        if "industry" in stock_basic.columns:
            industry_map = dict(zip(stock_basic["ts_code"], stock_basic["industry"]))

    # Filter on daily_basic.
    basic = tushare_client.get_daily_basic(trade_date)
    if basic.empty:
        logger.info("Step 2: daily_basic 无数据")
        return []
    # Rescale on a new frame: mutating the frame handed out by the client
    # would rescale it again on a later call if the client reuses the object.
    # Unit conversion per the original author's note: 10k CNY -> 100M CNY.
    basic = basic.assign(circ_mv=basic["circ_mv"] / 10000)
    filtered_basic = basic[
        (basic["ts_code"].isin(sector_member_codes)) &
        (~basic["ts_code"].isin(exclude_codes)) &
        (basic["circ_mv"] >= settings.min_circ_mv) &
        (basic["circ_mv"] <= settings.max_circ_mv) &
        (basic["turnover_rate"] >= settings.min_turnover_rate) &
        (basic["turnover_rate"] <= settings.max_turnover_rate)
    ].copy()
    logger.info(f"Step 2 基本面过滤: {len(sector_member_codes)} 只 → {len(filtered_basic)}")
    if filtered_basic.empty:
        return []

    # First daily_basic record per code, for O(1) access below.
    basic_by_code: dict = {}
    for record in filtered_basic.to_dict("records"):
        basic_by_code.setdefault(record["ts_code"], record)

    # Money-flow filter: main-force net inflow > 0.
    mf_lookup: dict = {}
    mf = tushare_client.get_moneyflow_batch(trade_date)
    if mf.empty:
        logger.info("Step 2: 资金流数据为空,跳过资金过滤")
        candidate_codes = list(basic_by_code)
    else:
        main_net_inflow = (
            (mf["buy_elg_amount"] - mf["sell_elg_amount"]) +
            (mf["buy_lg_amount"] - mf["sell_lg_amount"])
        )
        total = (
            mf["buy_elg_amount"] + mf["sell_elg_amount"] +
            mf["buy_lg_amount"] + mf["sell_lg_amount"] +
            mf["buy_md_amount"] + mf["sell_md_amount"] +
            mf["buy_sm_amount"] + mf["sell_sm_amount"]
        )
        # A zero turnover total becomes NaN first so the division cannot
        # produce inf; such rows end up with a ratio of 0.
        inflow_ratio = (main_net_inflow / total.replace(0, float("nan")) * 100).fillna(0)
        mf = mf.assign(main_net_inflow=main_net_inflow, inflow_ratio=inflow_ratio)
        mf_positive = mf[
            (mf["ts_code"].isin(set(basic_by_code))) &
            (mf["main_net_inflow"] > 0)
        ].sort_values("main_net_inflow", ascending=False)
        # Keep the inflow ranking (a set would discard it); dict.fromkeys
        # removes duplicates while preserving order.
        candidate_codes = list(dict.fromkeys(mf_positive["ts_code"].tolist()))
        for code, net, ratio in zip(
            mf_positive["ts_code"],
            mf_positive["main_net_inflow"],
            mf_positive["inflow_ratio"],
        ):
            mf_lookup[code] = {
                "main_net_inflow": float(net),
                "inflow_ratio": float(ratio),
            }
        logger.info(f"Step 2 资金流过滤: → {len(candidate_codes)} 只主力净流入 > 0")
    if not candidate_codes:
        return []

    # Build the candidate list.
    candidates = []
    for ts_code in candidate_codes:
        record = basic_by_code[ts_code]
        mf_info = mf_lookup.get(ts_code, {})
        candidates.append({
            "ts_code": ts_code,
            "name": name_map.get(ts_code, ""),
            # Prefer the hot-sector name; fall back to the industry label.
            "sector": sector_code_map.get(ts_code, industry_map.get(ts_code, "")),
            "turnover_rate": float(record["turnover_rate"]),
            "circ_mv": float(record["circ_mv"]),
            "pe": _opt_float(record.get("pe")),
            "pb": _opt_float(record.get("pb")),
            "volume_ratio": _opt_float(record.get("volume_ratio")),
            "main_net_inflow": mf_info.get("main_net_inflow", 0),
            "inflow_ratio": mf_info.get("inflow_ratio", 0),
        })
    logger.info(f"Step 2 候选: {len(candidates)}")
    return candidates
def _build_recommendations(
    candidates: list[dict],
    market_temp: MarketTemperature,
    hot_sectors: list[SectorInfo],
    market_temp_score: float = 0,
    intraday: bool = False,
) -> list[Recommendation]:
    """Step 3: score candidates on supply/demand, price action and trend.

    Base score = supply/demand 40% + price action 35% + trend 25%. The base
    score is then scaled by risk multipliers (recent 5-day rally, sector
    stage, market temperature) and by a 1.10 bonus for entry signals scoring
    at least 80, and finally capped at 100.

    Candidates are processed in the given order; candidates without an entry
    signal or with fewer than 30 daily bars are skipped, and processing stops
    once ``settings.top_stock_count`` recommendations have been built.

    Args:
        candidates: Candidate dicts; ``ts_code`` is required, while ``name``,
            ``sector``, ``pe``, ``pb`` and the money-flow fields are optional.
        market_temp: Market temperature object, used for the risk note.
        hot_sectors: Hot sectors, used for the sector stage and heat lookups.
        market_temp_score: Numeric market temperature used for the risk
            multiplier.
        intraday: Forwarded to the reason generator.

    Returns:
        The recommendations built, in candidate order.
    """
    from app.data.tushare_client import tushare_client
    from app.analysis.technical import add_all_indicators
    from app.analysis.breakout_signals import (
        classify_entry_signal,
        score_supply_demand,
        EntrySignal,
    )
    from app.analysis.signals import generate_signals
    from app.analysis.capital_flow import _score_valuation

    # Name and industry lookups, used when a candidate lacks those fields.
    stock_basic = tushare_client.get_stock_basic()
    name_map: dict = {}
    industry_map: dict = {}
    if not stock_basic.empty:
        name_map = dict(zip(stock_basic["ts_code"], stock_basic["name"]))
        if "industry" in stock_basic.columns:
            industry_map = dict(zip(stock_basic["ts_code"], stock_basic["industry"]))

    recommendations = []
    total = len(candidates)
    signal_counts = {"breakout": 0, "pullback": 0, "launch": 0, "none": 0}
    for stock in candidates:
        ts_code = stock.get("ts_code", "")
        if not ts_code:
            continue
        name = stock.get("name") or name_map.get(ts_code, ts_code)
        sector = stock.get("sector") or industry_map.get(ts_code, "")
        try:
            # Fetch 120 daily bars; at least 30 are required.
            df = tushare_client.get_stock_daily(ts_code, 120)
            if df.empty or len(df) < 30:
                continue
            df = add_all_indicators(df)

            # -- Entry-signal classification --
            entry_signal = classify_entry_signal(df)
            signal_type = entry_signal["signal_type"]
            if signal_type == EntrySignal.NONE:
                signal_counts["none"] += 1
                continue
            signal_counts[signal_type.value] += 1

            # -- Three-dimension score --
            supply_demand_score = score_supply_demand(df)             # 40%
            price_action_score = _score_price_action(df, entry_signal)  # 35%
            trend_score = _score_trend(df)                            # 25%
            final_score = (
                supply_demand_score * 0.40 +
                price_action_score * 0.35 +
                trend_score * 0.25
            )

            # Risk multiplier: penalise a steep recent rally.
            tech_signal = generate_signals(ts_code, name)
            if tech_signal:
                if tech_signal.rally_pct_5d > 20:
                    final_score *= 0.65
                elif tech_signal.rally_pct_5d > 15:
                    final_score *= 0.80
            # Late/end-stage sector penalty (stage comes from hot_sectors).
            sector_stage = _get_sector_stage(sector, hot_sectors)
            if sector_stage == "end":
                final_score *= 0.70
            elif sector_stage == "late":
                final_score *= 0.88
            # Market-temperature risk control.
            if market_temp_score < 30:
                final_score *= 0.75
            elif market_temp_score < 50:
                final_score *= 0.88
            # Bonus for high-confidence entry signals.
            if entry_signal.get("signal_score", 0) >= 80:
                final_score *= 1.10
            # The bonus can push a near-perfect score past the 0-100 scale.
            final_score = min(final_score, 100.0)

            # Valuation score (auxiliary; not part of the main score).
            valuation_score = _score_valuation(stock.get("pe"), stock.get("pb"))

            # Level and trading signal.
            level = _score_to_level(final_score)
            signal = "HOLD"
            position_score = tech_signal.position_score if tech_signal else 50
            if (entry_signal.get("signal_score", 0) >= 50
                    and position_score >= 30
                    and final_score >= 60):
                signal = "BUY"

            # Reference prices: start from the technical signal, then let the
            # entry-signal details override entry/target where available.
            entry_price = None
            target_price = None
            stop_loss = None
            if tech_signal:
                entry_price = tech_signal.support_price
                target_price = tech_signal.resist_price
                stop_loss = tech_signal.stop_loss_price
            details = entry_signal.get("details", {})
            st = signal_type.value
            if st == "breakout" and details.get("resist_level"):
                entry_price = details["resist_level"]
                target_price = round(entry_price * 1.05, 2)
            elif st == "pullback" and details.get("support_price"):
                entry_price = details["support_price"]
                target_price = round(entry_price * 1.05, 2)
            elif st == "launch" and details.get("resist_level"):
                entry_price = round(details["resist_level"] * 1.01, 2)
                target_price = round(details["resist_level"] * 1.08, 2)

            reasons = _generate_reasons(stock, entry_signal, tech_signal, df, intraday)
            # The risk note reads "entry_signal_type" from the stock dict, but
            # candidates do not carry that key, so supply it on a copy.
            risk_note = _generate_risk_note(
                market_temp, tech_signal, {**stock, "entry_signal_type": st},
            )

            rec = Recommendation(
                ts_code=ts_code,
                name=name,
                sector=sector,
                score=round(final_score, 1),
                market_temp_score=round(market_temp_score, 1),
                sector_score=round(_get_sector_heat(sector, hot_sectors), 1),
                capital_score=round(_score_capital_simple(stock), 1),
                technical_score=round(trend_score, 1),
                position_score=round(position_score, 1),
                valuation_score=round(valuation_score, 1),
                signal=signal,
                entry_price=entry_price,
                target_price=target_price,
                stop_loss=stop_loss,
                reasons=reasons,
                risk_note=risk_note,
                level=level,
                strategy="trend_breakout",
                entry_signal_type=st,
            )
            recommendations.append(rec)
            if len(recommendations) >= settings.top_stock_count:
                break
        except Exception as e:
            # Best effort per stock, but make failures visible: at debug level
            # a systematic error would silently empty the result list.
            logger.warning(f"深度分析 {ts_code} 失败: {e}")
            continue

    logger.info(
        f"Step 3 入场信号分布: "
        f"突破={signal_counts['breakout']} 回踩={signal_counts['pullback']} "
        f"启动={signal_counts['launch']} 无信号={signal_counts['none']} "
        f"(共分析{total}只)"
    )
    return recommendations
# -- Price-action score --
def _score_price_action(df, entry_signal: dict) -> float:
    """Price-action score, capped at 100.

    Components:
        - Entry-signal quality (40): ``signal_score`` weighted by 0.40.
        - Candle strength of the last bar (30): body-to-range ratio (up to
          20) plus where the close sits inside the range (up to 10).
        - Support/resistance quality (30): graded per signal type from the
          signal ``details``; 10 points when there is no signal type.
    """
    latest = df.iloc[-1]
    info = entry_signal.get("details", {})
    sig = entry_signal.get("signal_type")
    kind = sig.value if sig else None

    # Entry-signal quality (40)
    total = 0
    total += entry_signal.get("signal_score", 0) * 0.40

    # Candle strength (30); skipped for a zero-range bar.
    span = latest["high"] - latest["low"]
    if span > 0:
        # Share of the full range taken by the candle body.
        body_share = abs(latest["close"] - latest["open"]) / span
        if body_share > 0.7:
            total += 20  # large body, clear direction
        elif body_share > 0.4:
            total += 12
        elif body_share > 0.2:
            total += 6
        # Close position inside the range (closer to the high is better).
        close_pos = (latest["close"] - latest["low"]) / span
        if close_pos > 0.8:
            total += 10  # closed in the top 20%
        elif close_pos > 0.6:
            total += 6
        elif close_pos > 0.4:
            total += 3

    # Support/resistance quality (30)
    if kind == "breakout":
        # Strength of the break above resistance.
        pct = info.get("breakout_pct", 0)
        vol = info.get("volume_ratio", 1)
        if pct > 2 and vol > 2:
            total += 30  # strong breakout
        elif pct > 1 and vol > 1.5:
            total += 20
        elif pct > 0:
            total += 10
    elif kind == "pullback":
        # Which moving average held, and how much volume dried up.
        ma_name = info.get("support_ma", "")
        deep_shrink = info.get("volume_shrink_ratio", 1) < 0.6
        if ma_name == "MA20":
            total += 30 if deep_shrink else 22
        elif ma_name == "MA10" and deep_shrink:
            total += 18
        else:
            total += 10
    elif kind == "launch":
        # How tight and quiet the consolidation was.
        width = info.get("price_range_pct", 10)
        shrink = info.get("volume_shrink_ratio", 1)
        if width < 3 and shrink < 0.4:
            total += 30  # very narrow range on very low volume
        elif width < 5 and shrink < 0.6:
            total += 20
        else:
            total += 10
    else:
        total += 10
    return min(total, 100)
# -- Trend score --
def _score_trend(df) -> float:
    """Trend score, capped at 100.

    Components:
        - Moving-average alignment (40): MA5 > MA10 > MA20 > MA60 on the
          last bar, with partial credit for weaker alignments.
        - Higher-high / higher-low structure (35): second half of the last
          20 bars compared with the first half.
        - MA20 direction (25): change of MA20 between ``iloc[-5]`` and the
          last bar.
    """
    import pandas as pd

    total = 0
    latest = df.iloc[-1]
    columns = df.columns

    # Moving-average alignment (40)
    ma_names = ("ma5", "ma10", "ma20", "ma60")
    all_present = all(name in columns for name in ma_names)
    if all_present and all(pd.notna(latest[name]) for name in ma_names):
        m5, m10, m20, m60 = (latest[name] for name in ma_names)
        if m5 > m10 > m20 > m60:
            total += 40  # full bullish alignment
        elif m5 > m10 > m20:
            total += 28
        elif m5 > m20:
            total += 15
    elif "ma5" in columns and "ma20" in columns:
        # Fallback when the four-MA set is incomplete or contains NaN.
        m5, m20 = latest["ma5"], latest["ma20"]
        if pd.notna(m5) and pd.notna(m20) and m5 > m20:
            total += 15

    # Higher-high / higher-low structure (35)
    if len(df) >= 20:
        window = df.tail(20)
        early, late = window.iloc[:10], window.iloc[10:]
        higher_high = late["high"].max() > early["high"].max()
        higher_low = late["low"].min() > early["low"].min()
        if higher_high and higher_low:
            total += 35  # both highs and lows are rising
        elif higher_high:
            total += 20  # only the highs are rising
        elif higher_low:
            total += 12  # only the lows are rising

    # MA20 direction (25)
    if "ma20" in columns and len(df) >= 5:
        ma20 = df["ma20"]
        current, earlier = ma20.iloc[-1], ma20.iloc[-5]
        if not (pd.isna(current) or pd.isna(earlier)) and earlier > 0:
            slope_pct = (current - earlier) / earlier * 100
            if slope_pct > 2:
                total += 25
            elif slope_pct > 1:
                total += 18
            elif slope_pct > 0:
                total += 10
    return min(total, 100)
# -- Helpers --
def _get_sector_stage(sector_name: str, hot_sectors: list[SectorInfo]) -> str:
    """Return the stage of the first hot sector named *sector_name*, else "mid"."""
    stages = (sec.stage for sec in hot_sectors if sec.sector_name == sector_name)
    return next(stages, "mid")
def _get_sector_heat(sector_name: str, hot_sectors: list[SectorInfo]) -> float:
    """Return the heat score of the first hot sector named *sector_name*, else 30.0."""
    heats = (sec.heat_score for sec in hot_sectors if sec.sector_name == sector_name)
    return next(heats, 30.0)
def _score_capital_simple(stock: dict) -> float:
    """Simple capital-flow score from data already on *stock* (no extra API call).

    Adds tiered points for ``main_net_inflow`` (up to 60) and for
    ``inflow_ratio`` (up to 40); missing or ``None`` values count as 0.
    """
    net_amount = stock.get("main_net_inflow", 0) or 0
    ratio = stock.get("inflow_ratio", 0) or 0

    # (exclusive lower bound, points), checked from the highest tier down.
    amount_tiers = ((10000, 60), (5000, 45), (2000, 30), (0, 15))
    ratio_tiers = ((15, 40), (10, 30), (5, 20), (0, 10))

    amount_points = next((pts for floor, pts in amount_tiers if net_amount > floor), 0)
    ratio_points = next((pts for floor, pts in ratio_tiers if ratio > floor), 0)
    return min(amount_points + ratio_points, 100)
def _score_to_level(score: float) -> str:
    """Map a score to its recommendation level label (thresholds 80 / 60 / 40)."""
    tiers = ((80, "强烈推荐"), (60, "推荐"), (40, "观望"))
    for floor, label in tiers:
        if score >= floor:
            return label
    return "回避"
def _generate_reasons(
    stock: dict, entry_signal: dict, tech: TechnicalSignal | None,
    df, intraday: bool = False,
) -> list[str]:
    """Build up to three human-readable recommendation reasons.

    Reasons are appended in this order and the first three are returned:
    entry signal, demand dominance (average up-day volume vs. down-day
    volume over the last 10 bars), main-force inflow, sector membership.

    Args:
        stock: Candidate dict; ``main_net_inflow`` and ``sector`` are read.
        entry_signal: Dict with ``signal_type`` (an enum-like object exposing
            ``.value``, or a plain string) and ``details``.
        tech: Accepted for interface compatibility; not used here.
        df: Daily bars; ``pct_chg`` and ``vol`` are read when both exist.
        intraday: Accepted for interface compatibility; not used here.

    Returns:
        A list of at most three reason strings.
    """
    reasons = []
    details = entry_signal.get("details", {})
    signal_type = entry_signal.get("signal_type")
    # Compare on the plain value so this function does not depend on the
    # EntrySignal enum, which is not imported at module scope.
    st = getattr(signal_type, "value", signal_type)

    # Entry signal
    if st == "breakout":
        breakout_pct = details.get("breakout_pct", 0)
        vol_ratio = details.get("volume_ratio", 0)
        reasons.append(f"放量突破20日阻力位(涨幅{breakout_pct:.1f}%,量比{vol_ratio:.1f}倍)")
    elif st == "pullback":
        support = details.get("support_ma", "")
        shrink = details.get("volume_shrink_ratio", 0)
        reasons.append(f"缩量回踩{support}支撑(量能收缩至{shrink:.0%})")
    elif st == "launch":
        range_pct = details.get("price_range_pct", 0)
        shrink = details.get("volume_shrink_ratio", 0)
        reasons.append(f"高位缩量整理{range_pct:.1f}%后即将变盘(量缩至{shrink:.0%})")

    # Supply/demand: average volume on up days vs. down days.
    if len(df) >= 10 and "pct_chg" in df.columns and "vol" in df.columns:
        recent = df.tail(10)
        up_days = recent[recent["pct_chg"] > 0]
        down_days = recent[recent["pct_chg"] <= 0]
        if len(up_days) > 0 and len(down_days) > 0:
            avg_up_vol = up_days["vol"].mean()
            avg_down_vol = down_days["vol"].mean()
            if avg_down_vol > 0:
                ds_ratio = avg_up_vol / avg_down_vol
                if ds_ratio > 1.5:
                    reasons.append(f"需求主导(上涨均量/下跌均量={ds_ratio:.1f})")

    # Capital flow; a None value is treated as 0, as in _score_capital_simple.
    main_net = stock.get("main_net_inflow", 0) or 0
    if main_net > 5000:
        reasons.append(f"主力资金大幅流入{main_net:.0f}万元")
    elif main_net > 1000:
        reasons.append(f"主力资金持续流入{main_net:.0f}万元")

    # Sector
    sector = stock.get("sector", "")
    if sector:
        reasons.append(f"所属热门板块【{sector}】")
    return reasons[:3]
def _generate_risk_note(
    market: MarketTemperature,
    tech: TechnicalSignal | None,
    stock: dict,
) -> str:
    """Build the risk note for one recommendation.

    Notes are collected from the entry-signal type (``stock`` key
    ``entry_signal_type``), the market temperature and, when *tech* is
    given, its ``position_score`` / ``rally_pct_5d`` / ``rally_pct_10d``.

    Returns:
        The collected notes joined with ";", or a generic stop-loss reminder
        when no specific note applies.
    """
    notes = []

    entry_type = stock.get("entry_signal_type", "")
    if entry_type == "breakout":
        notes.append("突破型需警惕假突破,关注量能是否持续")
    elif entry_type == "pullback":
        notes.append("回踩型可能继续下探支撑,注意止损纪律")
    elif entry_type == "launch":
        notes.append("启动型整理可能延长,注意时间成本")

    if market.temperature < 30:
        notes.append("市场情绪偏冷,系统性风险较高")
    elif market.temperature < 50:
        notes.append("市场情绪一般,注意仓位控制")

    if tech:
        if tech.position_score < 30:
            notes.append(f"近期涨幅较大(5日{tech.rally_pct_5d}%),追高风险")
        if tech.rally_pct_10d > 20:
            notes.append(f"10日累涨{tech.rally_pct_10d}%,警惕回调")

    if not notes:
        return "注意设好止损,控制仓位"
    # Separate the notes explicitly; joined with an empty string they ran
    # together into one unreadable sentence.
    return ";".join(notes)