astock-agent/backend/app/engine/screener.py
2026-04-07 20:51:00 +08:00

282 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""四层漏斗筛选器
串联市场温度 → 板块热度 → 资金筛选 → 技术信号,
输出最终推荐列表。
自动检测是否在交易时段:
- 盘中模式:用前一日 Tushare 数据 + 腾讯实时行情混合筛选
- 盘后模式:用当日 Tushare 完整数据筛选
"""
import logging
from app.analysis.market_temp import calculate_market_temperature
from app.analysis.sector_scanner import scan_hot_sectors
from app.analysis.capital_flow import filter_stocks_by_capital
from app.analysis.signals import generate_signals
from app.analysis.intraday import intraday_market_temperature, intraday_filter_stocks
from app.data.models import MarketTemperature, SectorInfo, TechnicalSignal, Recommendation
from app.config import settings, is_trading_hours
logger = logging.getLogger(__name__)
async def run_screening(trade_date: str = None) -> dict:
"""执行完整的四层漏斗筛选流程
自动检测交易时段:
- 盘中 → 用前一日板块+实时行情筛选
- 盘后 → 用当日完整数据筛选
返回: {
"market_temp": MarketTemperature,
"hot_sectors": [SectorInfo],
"capital_filtered": [dict],
"recommendations": [Recommendation],
"scan_mode": "intraday" | "post_market",
}
"""
intraday = is_trading_hours()
scan_mode = "intraday" if intraday else "post_market"
logger.info(f"=== 筛选模式: {'盘中实时' if intraday else '盘后'} ===")
# ── 第一层:市场温度 ──
# 盘中和盘后都先计算基线温度(盘中用前一日数据)
logger.info("=== 第一层:市场温度计 ===")
market_temp = calculate_market_temperature(trade_date)
if intraday:
# 盘中叠加实时指数涨跌调整温度
market_temp = await intraday_market_temperature(market_temp)
logger.info(f"盘中市场温度(实时调整): {market_temp.temperature}")
else:
logger.info(f"市场温度: {market_temp.temperature}")
market_temp_score = market_temp.temperature
# ── 第二层:板块热度 ──
# 盘中用前一日的板块热度作为基线(哪些板块近期是热点)
logger.info("=== 第二层:板块热度扫描 ===")
all_sectors = scan_hot_sectors(trade_date)
hot_sectors = all_sectors[:settings.top_sector_count]
if not hot_sectors:
logger.warning("未找到热门板块")
return {
"market_temp": market_temp,
"hot_sectors": [],
"capital_filtered": [],
"recommendations": [],
"scan_mode": scan_mode,
}
# ── 第三层:资金/实时筛选 ──
if intraday:
logger.info("=== 第三层:盘中实时个股筛选 ===")
capital_filtered = await intraday_filter_stocks(hot_sectors)
else:
logger.info("=== 第三层:个股资金筛选 ===")
capital_filtered = await filter_stocks_by_capital(hot_sectors, trade_date)
if not capital_filtered:
logger.warning("筛选后无符合条件的个股")
return {
"market_temp": market_temp,
"hot_sectors": hot_sectors,
"capital_filtered": [],
"recommendations": [],
"scan_mode": scan_mode,
}
# ── 第四层:技术面信号 ──
logger.info("=== 第四层:技术面买卖信号 ===")
recommendations = []
for stock in capital_filtered:
ts_code = stock["ts_code"]
name = stock["name"]
sector = stock["sector"]
tech_signal = generate_signals(ts_code, name)
# 板块得分(含阶段调整)
sector_score = _get_sector_score(stock["sector"], hot_sectors)
sector_stage = _get_sector_stage(stock["sector"], hot_sectors)
# 估值安全得分
valuation_score = stock.get("valuation_score", 50)
# 位置安全得分
position_score = tech_signal.position_score
# 六维综合评分(降低动量权重,增加安全维度)
# 市场10% + 板块20% + 资金20% + 技术20% + 位置安全15% + 估值安全15%
final_score = (
market_temp_score * 0.10 +
sector_score * 0.20 +
stock["capital_score"] * 0.20 +
tech_signal.score * 0.20 +
position_score * 0.15 +
valuation_score * 0.15
)
# 板块尾声阶段额外惩罚(防止追板块尾部)
if sector_stage == "end":
final_score *= 0.85
elif sector_stage == "late":
final_score *= 0.92
# 确定信号和等级
signal = "HOLD"
if (tech_signal.score >= settings.buy_score_threshold
and tech_signal.signal_count >= settings.buy_min_signals
and position_score >= 30): # 位置太危险不给买入信号
signal = "BUY"
level = _score_to_level(final_score)
# 生成推荐理由
reasons = _generate_reasons(stock, tech_signal, market_temp, intraday)
# 风险提示
risk_note = _generate_risk_note(market_temp, tech_signal, intraday)
rec = Recommendation(
ts_code=ts_code,
name=name,
sector=sector,
score=round(final_score, 1),
market_temp_score=round(market_temp_score, 1),
sector_score=round(sector_score, 1),
capital_score=round(stock["capital_score"], 1),
technical_score=round(tech_signal.score, 1),
position_score=round(position_score, 1),
valuation_score=round(valuation_score, 1),
signal=signal,
entry_price=tech_signal.support_price,
target_price=tech_signal.resist_price,
stop_loss=tech_signal.stop_loss_price,
reasons=reasons,
risk_note=risk_note,
level=level,
)
recommendations.append(rec)
# 按综合评分排序
recommendations.sort(key=lambda x: x.score, reverse=True)
# 过滤掉低质量推荐(综合分 < 40 为"回避"级别,不展示)
recommendations = [r for r in recommendations if r.score >= 40]
logger.info(f"=== 筛选完成: {len(recommendations)} 只股票 ({scan_mode}) ===")
for r in recommendations[:5]:
logger.info(f" {r.name}({r.ts_code}) {r.level} 评分={r.score} 信号={r.signal}")
return {
"market_temp": market_temp,
"hot_sectors": hot_sectors,
"capital_filtered": capital_filtered,
"recommendations": recommendations,
"scan_mode": scan_mode,
}
def _get_sector_score(sector_name: str, hot_sectors: list[SectorInfo]) -> float:
"""获取板块在热门板块中的得分"""
for s in hot_sectors:
if s.sector_name == sector_name:
return s.heat_score
return 30.0 # 不在热门板块中,给一个基础分
def _get_sector_stage(sector_name: str, hot_sectors: list[SectorInfo]) -> str:
"""获取板块所处阶段"""
for s in hot_sectors:
if s.sector_name == sector_name:
return s.stage
return "mid"
def _score_to_level(score: float) -> str:
if score >= 80:
return "强烈推荐"
elif score >= 60:
return "推荐"
elif score >= 40:
return "观望"
else:
return "回避"
def _generate_reasons(
stock: dict, tech: TechnicalSignal, market: MarketTemperature,
intraday: bool = False,
) -> list[str]:
reasons = []
if intraday:
# 盘中理由:侧重实时行情指标
pct = stock.get("pct_chg", 0)
vr = stock.get("volume_ratio")
if vr and vr > 2:
reasons.append(f"盘中量比{vr:.1f}倍,资金活跃度高")
if pct > 3:
reasons.append(f"盘中涨幅{pct:.1f}%,走势强劲")
elif pct > 0:
reasons.append(f"盘中涨幅{pct:.1f}%,温和上攻")
else:
# 盘后理由:侧重资金流向
inflow = stock.get("main_net_inflow", 0)
if inflow > 5000:
reasons.append(f"主力资金大幅流入{inflow:.0f}万元")
elif inflow > 1000:
reasons.append(f"主力资金持续流入{inflow:.0f}万元")
# 板块
reasons.append(f"所属板块【{stock['sector']}】为当前热门概念")
# 技术面
tech_reasons = []
if tech.ma_bullish:
tech_reasons.append("均线多头排列")
if tech.volume_breakout:
tech_reasons.append("放量突破")
if tech.macd_golden:
tech_reasons.append("MACD金叉")
if tech.pullback_support:
tech_reasons.append("缩量回踩支撑")
if tech.big_yang:
tech_reasons.append("底部放量长阳")
if tech_reasons:
reasons.append("技术面: " + "".join(tech_reasons))
# 位置安全
if tech.position_score >= 70:
reasons.append("位置安全,距高点有空间")
elif tech.position_score < 30:
reasons.append("注意:短期涨幅较大,追高风险")
return reasons[:3] # 最多3条
def _generate_risk_note(
market: MarketTemperature, tech: TechnicalSignal,
intraday: bool = False,
) -> str:
notes = []
if intraday:
notes.append("盘中数据参考,需结合尾盘确认")
if market.temperature < 30:
notes.append("市场情绪偏冷,系统性风险较高")
elif market.temperature < 50:
notes.append("市场情绪一般,注意仓位控制")
if tech.score < 40:
notes.append("技术面支撑不足")
if tech.position_score < 30:
notes.append(f"近期涨幅较大(5日{tech.rally_pct_5d}%),追高风险")
if tech.rally_pct_10d > 20:
notes.append(f"10日累涨{tech.rally_pct_10d}%,警惕回调")
if not notes:
return "注意设好止损,控制仓位"
return "".join(notes)