astock-agent/backend/app/engine/screener.py
2026-04-10 23:38:37 +08:00

352 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""趋势突破统一筛选器
三阶段管道:全市场批量预筛 → 资金流过滤 → 逐股深度分析
评分公式:趋势&时机30% + 资金流25% + 供需20% + 板块共振15% + 市场温度10%
自动检测是否在交易时段:
- 盘中模式:用前一日 Tushare 数据 + 腾讯实时行情混合筛选
- 盘后模式:用当日 Tushare 完整数据筛选
"""
import logging
from app.analysis.market_temp import calculate_market_temperature
from app.analysis.sector_scanner import scan_hot_sectors
from app.analysis.trend_scanner import scan_trend_breakout
from app.analysis.signals import generate_signals
from app.analysis.intraday import intraday_market_temperature, intraday_filter_stocks, intraday_sector_scan
from app.data.models import MarketTemperature, SectorInfo, TechnicalSignal, Recommendation
from app.config import settings, is_trading_hours, is_market_session
logger = logging.getLogger(__name__)
async def run_screening(trade_date: str = None) -> dict:
    """Run the trend-breakout screening pipeline end to end.

    The scan mode is taken from ``is_market_session()``. In intraday mode the
    market temperature and the hot-sector list are additionally passed
    through the intraday helpers; otherwise the values computed for
    ``trade_date`` are used unchanged.

    Args:
        trade_date: Forwarded as-is to the market-temperature, sector and
            trend scanners.

    Returns:
        A dict with the keys ``"market_temp"``, ``"hot_sectors"``,
        ``"recommendations"`` (only entries whose ``score`` is at least 40;
        empty when the trend scan yields no candidates) and ``"scan_mode"``
        (``"intraday"`` or ``"post_market"``).
    """
    intraday = is_market_session()
    scan_mode = "intraday" if intraday else "post_market"
    logger.info(f"=== 筛选模式: {'盘中实时' if intraday else '盘后'} ===")

    # Stage 0a: market temperature, refreshed by the intraday helper in-session.
    logger.info("=== 市场温度计 ===")
    market_temp = calculate_market_temperature(trade_date)
    if not intraday:
        logger.info(f"市场温度: {market_temp.temperature}")
    else:
        market_temp = await intraday_market_temperature(market_temp)
        logger.info(f"盘中市场温度(实时调整): {market_temp.temperature}")
    market_temp_score = market_temp.temperature

    # Stage 0b: sector heat; only the leading slice feeds the resonance score.
    logger.info("=== 板块热度扫描 ===")
    hot_sectors = scan_hot_sectors(trade_date)[:settings.top_sector_count]
    if intraday:
        hot_sectors = await intraday_sector_scan(hot_sectors)

    # Stage 1-3: the trend-breakout scanner itself.
    logger.info("=== 趋势突破扫描 ===")
    candidates = await scan_trend_breakout(
        trade_date=trade_date,
        market_temp=market_temp,
        hot_sectors=hot_sectors,
        intraday=intraday,
    )

    # One result dict serves both the empty and the populated outcome.
    result = {
        "market_temp": market_temp,
        "hot_sectors": hot_sectors,
        "recommendations": [],
        "scan_mode": scan_mode,
    }
    if not candidates:
        logger.info("=== 筛选完成: 0 只股票 ===")
        return result

    built = _build_trend_recommendations(
        candidates, market_temp, hot_sectors, market_temp_score, intraday,
    )
    # Drop low-quality recommendations.
    recommendations = [rec for rec in built if rec.score >= 40]
    logger.info(f"=== 筛选完成: {len(recommendations)} 只股票 ({scan_mode}) ===")

    # Log the first five entries with a readable entry-signal label.
    signal_map = {"breakout": "突破型", "pullback": "回踩型", "launch": "启动型"}
    for r in recommendations[:5]:
        signal_label = signal_map.get(r.entry_signal_type, r.entry_signal_type)
        logger.info(f" [{signal_label}] {r.name}({r.ts_code}) {r.level} 评分={r.score} 信号={r.signal}")

    result["recommendations"] = recommendations
    return result
def _build_trend_recommendations(
    candidates: list[dict],
    market_temp: MarketTemperature,
    hot_sectors: list[SectorInfo],
    market_temp_score: float = 0,
    intraday: bool = False,
) -> list[Recommendation]:
    """Turn trend-breakout scan results into ``Recommendation`` objects.

    Composite score = trend & timing 30% + capital flow 25% + supply/demand
    20% + sector resonance 15% + market temperature 10%, then scaled by the
    risk/bonus multipliers in ``_apply_score_multipliers``, capped at 100 and
    rounded to one decimal. Level and BUY/HOLD signal are derived from that
    same rounded value, so the stored score, the level and any downstream
    score threshold always agree.

    Args:
        candidates: One dict per stock. ``ts_code``, ``name`` and ``sector``
            are required; the per-dimension scores default to 50 when absent.
        market_temp: Passed through to the risk-note generator.
        hot_sectors: Sector list used for resonance score and sector stage.
        market_temp_score: Market temperature used as a score component.
        intraday: Passed through to the reason generator.

    Returns:
        One ``Recommendation`` per candidate, in input order.
    """
    recommendations = []
    for stock in candidates:
        ts_code = stock["ts_code"]
        name = stock["name"]
        sector = stock["sector"]
        entry_signal_type = stock.get("entry_signal_type", "none")
        entry_signal_score = stock.get("entry_signal_score", 0)
        tech_signal = stock.get("tech_signal")

        # Per-dimension scores; a missing dimension counts as neutral (50).
        trend_timing_score = stock.get("trend_timing_score", 50)
        supply_demand_score = stock.get("supply_demand_score", 50)
        capital_score = stock.get("capital_score", 50)
        position_score = stock.get("position_score", 50)
        valuation_score = stock.get("valuation_score", 50)

        sector_score = _score_sector_resonance(sector, hot_sectors)
        sector_stage = _get_sector_stage(sector, hot_sectors)

        weighted_score = (
            trend_timing_score * 0.30
            + capital_score * 0.25
            + supply_demand_score * 0.20
            + sector_score * 0.15
            + market_temp_score * 0.10
        )
        adjusted_score = _apply_score_multipliers(
            weighted_score, tech_signal, sector_stage,
            market_temp_score, entry_signal_score,
        )
        # The x1.10 bonus can overshoot; keep the result within the 100-point
        # cap the sector scorer also uses, and round once so every threshold
        # below sees the value that is actually stored.
        final_score = round(min(adjusted_score, 100.0), 1)

        level = _score_to_level(final_score)
        is_buy = (
            entry_signal_type != "none"
            and entry_signal_score >= 50
            and position_score >= 30
            and final_score >= 60
        )
        signal = "BUY" if is_buy else "HOLD"

        # A present-but-None details value is treated like a missing one.
        details = stock.get("entry_signal_details") or {}
        entry_price, target_price, stop_loss = _reference_prices(
            tech_signal, entry_signal_type, details,
        )

        reasons = _generate_reasons(stock, tech_signal, market_temp, intraday)
        risk_note = _generate_risk_note(market_temp, tech_signal, stock)

        recommendations.append(Recommendation(
            ts_code=ts_code,
            name=name,
            sector=sector,
            score=final_score,
            market_temp_score=round(market_temp_score, 1),
            sector_score=round(sector_score, 1),
            capital_score=round(capital_score, 1),
            technical_score=round(stock.get("technical_score", 50), 1),
            position_score=round(position_score, 1),
            valuation_score=round(valuation_score, 1),
            signal=signal,
            entry_price=entry_price,
            target_price=target_price,
            stop_loss=stop_loss,
            reasons=reasons,
            risk_note=risk_note,
            level=level,
            strategy="trend_breakout",
            entry_signal_type=entry_signal_type,
        ))
    return recommendations


def _apply_score_multipliers(
    score: float,
    tech_signal,
    sector_stage: str,
    market_temp_score: float,
    entry_signal_score: float,
) -> float:
    """Scale a weighted score by the risk penalties and the entry-signal bonus."""
    if tech_signal:
        # Penalise stocks that already rallied hard over the last five days.
        if tech_signal.rally_pct_5d > 20:
            score *= 0.65
        elif tech_signal.rally_pct_5d > 15:
            score *= 0.80
    # Penalise sectors in their "late" or "end" stage.
    if sector_stage == "end":
        score *= 0.70
    elif sector_stage == "late":
        score *= 0.88
    # Penalise a cold market.
    if market_temp_score < 30:
        score *= 0.75
    # Reward a high-confidence entry signal.
    if entry_signal_score >= 80:
        score *= 1.10
    return score


def _reference_prices(tech_signal, entry_signal_type: str, details: dict) -> tuple:
    """Return ``(entry_price, target_price, stop_loss)``; all ``None`` without a tech signal."""
    if not tech_signal:
        return None, None, None

    entry_price = tech_signal.support_price
    target_price = tech_signal.resist_price
    stop_loss = tech_signal.stop_loss_price

    # Override entry/target from the entry-signal details when they carry the
    # relevant level; the stop loss always comes from the technical signal.
    if entry_signal_type == "breakout" and details.get("resist_level"):
        entry_price = details["resist_level"]
        target_price = round(entry_price * 1.05, 2)
    elif entry_signal_type == "pullback" and details.get("support_price"):
        entry_price = details["support_price"]
        target_price = round(entry_price * 1.05, 2)
    elif entry_signal_type == "launch" and details.get("resist_level"):
        entry_price = round(details["resist_level"] * 1.01, 2)
        target_price = round(details["resist_level"] * 1.08, 2)
    return entry_price, target_price, stop_loss
def _score_sector_resonance(sector_name: str, hot_sectors: list[SectorInfo]) -> float:
    """Score how strongly a stock's sector resonates with the hot-sector list.

    A sector absent from ``hot_sectors`` scores 10. A listed sector scores
    40, plus 30% of its ``heat_score``, plus a bonus for its ``stage``
    (early 30, mid 20, late 5, anything else 0), capped at 100. Only the
    first entry with a matching name is considered.
    """
    stage_bonus = {"early": 30, "mid": 20, "late": 5}
    matched = next((s for s in hot_sectors if s.sector_name == sector_name), None)
    if matched is None:
        # Not among the hot sectors.
        return 10.0

    total = 40  # base score for being listed at all
    total += matched.heat_score * 0.3
    if matched.stage in stage_bonus:
        total += stage_bonus[matched.stage]
    return min(total, 100)
def _get_sector_score(sector_name: str, hot_sectors: list[SectorInfo]) -> float:
    """Return the ``heat_score`` of the first hot sector named ``sector_name``.

    Falls back to 30.0 when no entry in ``hot_sectors`` has that name.
    """
    matching_heat = (
        entry.heat_score
        for entry in hot_sectors
        if entry.sector_name == sector_name
    )
    return next(matching_heat, 30.0)
def _get_sector_stage(sector_name: str, hot_sectors: list[SectorInfo]) -> str:
    """Return the ``stage`` of the first hot sector named ``sector_name``.

    Sectors that are not in ``hot_sectors`` are reported as ``"mid"``.
    """
    stages = (
        entry.stage
        for entry in hot_sectors
        if entry.sector_name == sector_name
    )
    return next(stages, "mid")
def _score_to_level(score: float) -> str:
    """Map a composite score to its recommendation level label.

    Thresholds are inclusive lower bounds checked from highest to lowest:
    80, 60 and 40; anything below 40 gets the lowest level.
    """
    tiers = (
        (80, "强烈推荐"),
        (60, "推荐"),
        (40, "观望"),
    )
    for lower_bound, label in tiers:
        if score >= lower_bound:
            return label
    return "回避"
def _generate_reasons(
    stock: dict, tech: TechnicalSignal | None,
    market: MarketTemperature, intraday: bool = False,
) -> list[str]:
    """Build the human-readable reasons shown with a recommendation.

    Reasons are collected in priority order (entry signal, supply/demand,
    capital flow, sector, technicals) and only the first three are kept.

    Args:
        stock: Candidate dict; reads ``entry_signal_type``,
            ``entry_signal_details``, ``volume_trend``,
            ``demand_supply_ratio``, ``main_net_inflow`` and ``sector``.
        tech: Optional technical signal; reads ``ma_bullish``,
            ``macd_golden`` and ``pullback_support``.
        market: Currently unused; kept for interface compatibility.
        intraday: Currently unused; kept for interface compatibility.

    Returns:
        At most three reason strings.
    """
    reasons = []
    entry_type = stock.get("entry_signal_type", "none")
    # A present-but-None details value is treated like a missing one.
    details = stock.get("entry_signal_details") or {}

    # Entry signal
    if entry_type == "breakout":
        breakout_pct = details.get("breakout_pct", 0)
        vol_ratio = details.get("volume_ratio", 0)
        reasons.append(f"放量突破20日阻力位(涨幅{breakout_pct:.1f}%,量比{vol_ratio:.1f}倍)")
    elif entry_type == "pullback":
        support = details.get("support_ma", "")
        shrink = details.get("volume_shrink_ratio", 0)
        reasons.append(f"缩量回踩{support}支撑(量能收缩至{shrink:.0%})")
    elif entry_type == "launch":
        range_pct = details.get("price_range_pct", 0)
        shrink = details.get("volume_shrink_ratio", 0)
        reasons.append(f"高位缩量整理{range_pct:.1f}%后即将变盘(量缩至{shrink:.0%})")

    # Supply/demand: a dominant demand ratio takes precedence over the
    # weaker "volume expanding" observation.
    vol_trend = stock.get("volume_trend", "")
    ds_ratio = stock.get("demand_supply_ratio", 1)
    if ds_ratio > 1.5:
        reasons.append(f"需求主导(上涨均量/下跌均量={ds_ratio:.1f})")
    elif vol_trend == "expanding":
        reasons.append("量能逐步放大,资金持续介入")

    # Capital flow
    main_net = stock.get("main_net_inflow", 0)
    if main_net > 5000:
        reasons.append(f"主力资金大幅流入{main_net:.0f}万元")
    elif main_net > 1000:
        reasons.append(f"主力资金持续流入{main_net:.0f}万元")

    # Sector
    sector = stock.get("sector", "")
    if sector:
        reasons.append(f"所属板块【{sector}】")

    # Technicals: list every positive flag, separated so they stay readable.
    if tech:
        flag_labels = (
            (tech.ma_bullish, "均线多头排列"),
            (tech.macd_golden, "MACD金叉"),
            (tech.pullback_support, "缩量回踩支撑"),
        )
        tech_reasons = [label for flag, label in flag_labels if flag]
        if tech_reasons:
            reasons.append("技术面: " + "、".join(tech_reasons))

    return reasons[:3]
def _generate_risk_note(
    market: MarketTemperature,
    tech: TechnicalSignal | None,
    stock: dict,
) -> str:
    """Compose the risk warning attached to a recommendation.

    Collects up to one note per concern (entry-signal type, market
    temperature, low position score, 10-day rally) and joins them with a
    full-width semicolon so the individual sentences stay distinguishable.

    Args:
        market: Reads ``temperature``; below 30 and below 50 each trigger a
            different warning.
        tech: Optional technical signal; reads ``position_score``,
            ``rally_pct_5d`` and ``rally_pct_10d``.
        stock: Candidate dict; reads ``entry_signal_type``.

    Returns:
        The joined notes, or a generic stop-loss reminder when no specific
        note applies.
    """
    notes = []

    # Each entry-signal type has its own typical failure mode.
    entry_type = stock.get("entry_signal_type", "")
    if entry_type == "breakout":
        notes.append("突破型需警惕假突破,关注量能是否持续")
    elif entry_type == "pullback":
        notes.append("回踩型可能继续下探支撑,注意止损纪律")
    elif entry_type == "launch":
        notes.append("启动型整理可能延长,注意时间成本")

    if market.temperature < 30:
        notes.append("市场情绪偏冷,系统性风险较高")
    elif market.temperature < 50:
        notes.append("市场情绪一般,注意仓位控制")

    if tech:
        if tech.position_score < 30:
            notes.append(f"近期涨幅较大(5日{tech.rally_pct_5d}%),追高风险")
        if tech.rally_pct_10d > 20:
            notes.append(f"10日累涨{tech.rally_pct_10d}%,警惕回调")

    if not notes:
        return "注意设好止损,控制仓位"
    return ";".join(notes)