astock-agent/backend/app/engine/screener.py
2026-04-08 22:39:51 +08:00

350 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""双通道漏斗筛选器
Channel A强中选强市场温度 → 板块热度 → 资金筛选 → 技术信号
Channel B潜在启动全市场技术扫描 → 底部形态 → 估值筛选
自动检测是否在交易时段:
- 盘中模式:用前一日 Tushare 数据 + 腾讯实时行情混合筛选
- 盘后模式:用当日 Tushare 完整数据筛选
"""
import logging
from app.analysis.market_temp import calculate_market_temperature
from app.analysis.sector_scanner import scan_hot_sectors
from app.analysis.capital_flow import filter_stocks_by_capital
from app.analysis.potential_scanner import scan_potential_breakout
from app.analysis.signals import generate_signals
from app.analysis.intraday import intraday_market_temperature, intraday_filter_stocks
from app.data.models import MarketTemperature, SectorInfo, TechnicalSignal, Recommendation
from app.config import settings, is_trading_hours
logger = logging.getLogger(__name__)
async def run_screening(trade_date: str = None) -> dict:
"""执行双通道筛选流程
自动检测交易时段:
- 盘中 → 用前一日板块+实时行情筛选
- 盘后 → 用当日完整数据筛选
返回: {
"market_temp": MarketTemperature,
"hot_sectors": [SectorInfo],
"recommendations": [Recommendation],
"scan_mode": "intraday" | "post_market",
}
"""
intraday = is_trading_hours()
scan_mode = "intraday" if intraday else "post_market"
logger.info(f"=== 筛选模式: {'盘中实时' if intraday else '盘后'} ===")
# ── 市场温度(共享) ──
logger.info("=== 市场温度计 ===")
market_temp = calculate_market_temperature(trade_date)
if intraday:
market_temp = await intraday_market_temperature(market_temp)
logger.info(f"盘中市场温度(实时调整): {market_temp.temperature}")
else:
logger.info(f"市场温度: {market_temp.temperature}")
market_temp_score = market_temp.temperature
# ── 板块热度Channel A 需要) ──
logger.info("=== 板块热度扫描 ===")
all_sectors = scan_hot_sectors(trade_date)
hot_sectors = all_sectors[:settings.top_sector_count]
# ── Channel A强中选强 ──
recommendations_a = []
capital_filtered = []
if hot_sectors:
if intraday:
logger.info("=== Channel A盘中实时个股筛选 ===")
capital_filtered = await intraday_filter_stocks(hot_sectors)
else:
logger.info("=== Channel A个股资金筛选 ===")
capital_filtered = await filter_stocks_by_capital(hot_sectors, trade_date)
if capital_filtered:
recommendations_a = _build_recommendations(
capital_filtered, market_temp, hot_sectors,
market_temp_score=market_temp_score,
strategy="momentum", intraday=intraday,
)
logger.info(f"Channel A强中选强: {len(recommendations_a)}")
# ── Channel B潜在启动 ──
logger.info("=== Channel B潜在启动扫描 ===")
exclude_codes = {r.ts_code for r in recommendations_a}
potential_filtered = scan_potential_breakout(trade_date, exclude_codes)
recommendations_b = []
if potential_filtered:
recommendations_b = _build_recommendations(
potential_filtered, market_temp, hot_sectors,
market_temp_score=market_temp_score,
strategy="potential", intraday=intraday,
)
logger.info(f"Channel B潜在启动: {len(recommendations_b)}")
# 合并,按评分排序
all_recommendations = recommendations_a + recommendations_b
all_recommendations.sort(key=lambda x: x.score, reverse=True)
# 过滤掉低质量推荐
all_recommendations = [r for r in all_recommendations if r.score >= 40]
logger.info(f"=== 筛选完成: {len(all_recommendations)} 只股票 ({scan_mode}) ===")
for r in all_recommendations[:5]:
strategy_label = "强中选强" if r.strategy == "momentum" else "潜在启动"
logger.info(f" [{strategy_label}] {r.name}({r.ts_code}) {r.level} 评分={r.score} 信号={r.signal}")
return {
"market_temp": market_temp,
"hot_sectors": hot_sectors,
"capital_filtered": capital_filtered,
"recommendations": all_recommendations,
"scan_mode": scan_mode,
}
def _build_recommendations(
stocks: list[dict],
market_temp: MarketTemperature,
hot_sectors: list[SectorInfo],
market_temp_score: float = 0,
strategy: str = "momentum",
intraday: bool = False,
) -> list[Recommendation]:
"""从筛选结果构建推荐列表Channel A/B 共用)"""
recommendations = []
for stock in stocks:
ts_code = stock["ts_code"]
name = stock["name"]
sector = stock["sector"]
tech_signal = generate_signals(ts_code, name)
# 板块得分
if strategy == "momentum":
sector_score = _get_sector_score(sector, hot_sectors)
sector_stage = _get_sector_stage(sector, hot_sectors)
else:
# Channel B不在热门板块中给基础分
sector_score = 30.0
sector_stage = "mid"
# 估值安全得分
valuation_score = stock.get("valuation_score", 50)
# 位置安全得分
position_score = tech_signal.position_score
# 综合评分(根据策略调整权重)
if strategy == "momentum":
# 强中选强:板块和资金权重高
# 市场10% + 板块20% + 资金20% + 技术15% + 位置安全15% + 估值安全20%
final_score = (
market_temp_score * 0.10 +
sector_score * 0.20 +
stock["capital_score"] * 0.20 +
tech_signal.score * 0.15 +
position_score * 0.15 +
valuation_score * 0.20
)
else:
# 潜在启动:技术面和估值权重高
# 市场10% + 技术25% + 资金(potential_score)15% + 位置安全25% + 估值安全25%
final_score = (
market_temp_score * 0.10 +
tech_signal.score * 0.25 +
stock["capital_score"] * 0.15 +
position_score * 0.25 +
valuation_score * 0.25
)
# 板块尾声阶段额外惩罚(仅 Channel A
if strategy == "momentum":
if sector_stage == "end":
final_score *= 0.85
elif sector_stage == "late":
final_score *= 0.92
# 确定信号和等级
signal = "HOLD"
if strategy == "momentum":
if (tech_signal.score >= settings.buy_score_threshold
and tech_signal.signal_count >= settings.buy_min_signals
and position_score >= 30):
signal = "BUY"
else:
# Channel B技术面要求稍低但位置安全要求更高
if (tech_signal.score >= settings.buy_score_threshold * 0.7
and position_score >= 40):
signal = "BUY"
level = _score_to_level(final_score)
# 生成推荐理由
reasons = _generate_reasons(stock, tech_signal, market_temp, intraday, strategy)
# 风险提示
risk_note = _generate_risk_note(market_temp, tech_signal, intraday, strategy)
rec = Recommendation(
ts_code=ts_code,
name=name,
sector=sector,
score=round(final_score, 1),
market_temp_score=round(market_temp_score, 1),
sector_score=round(sector_score, 1),
capital_score=round(stock["capital_score"], 1),
technical_score=round(tech_signal.score, 1),
position_score=round(position_score, 1),
valuation_score=round(valuation_score, 1),
signal=signal,
entry_price=tech_signal.support_price,
target_price=tech_signal.resist_price,
stop_loss=tech_signal.stop_loss_price,
reasons=reasons,
risk_note=risk_note,
level=level,
strategy=strategy,
)
recommendations.append(rec)
return recommendations
def _get_sector_score(sector_name: str, hot_sectors: list[SectorInfo]) -> float:
"""获取板块在热门板块中的得分"""
for s in hot_sectors:
if s.sector_name == sector_name:
return s.heat_score
return 30.0
def _get_sector_stage(sector_name: str, hot_sectors: list[SectorInfo]) -> str:
"""获取板块所处阶段"""
for s in hot_sectors:
if s.sector_name == sector_name:
return s.stage
return "mid"
def _score_to_level(score: float) -> str:
if score >= 80:
return "强烈推荐"
elif score >= 60:
return "推荐"
elif score >= 40:
return "观望"
else:
return "回避"
def _generate_reasons(
stock: dict, tech: TechnicalSignal, market: MarketTemperature,
intraday: bool = False, strategy: str = "momentum",
) -> list[str]:
reasons = []
if strategy == "potential":
# Channel B 理由:侧重底部形态和估值
if tech.position_score >= 70:
reasons.append("位置处于低位,距高点回调充分")
if tech.macd_golden:
reasons.append("MACD底部金叉反转信号初现")
if tech.pullback_support:
reasons.append("缩量回踩支撑位,蓄势充分")
if tech.big_yang:
reasons.append("底部出现放量长阳,资金介入")
if stock.get("valuation_score", 0) >= 60:
reasons.append("估值安全,下行空间有限")
if not reasons:
reasons.append("技术面底部信号显现,关注启动时机")
elif intraday:
# Channel A 盘中理由
pct = stock.get("pct_chg", 0)
vr = stock.get("volume_ratio")
if vr and vr > 2:
reasons.append(f"盘中量比{vr:.1f}倍,资金活跃度高")
if pct > 3:
reasons.append(f"盘中涨幅{pct:.1f}%,走势强劲")
elif pct > 0:
reasons.append(f"盘中涨幅{pct:.1f}%,温和上攻")
reasons.append(f"所属板块【{stock['sector']}】为当前热门概念")
tech_reasons = []
if tech.ma_bullish:
tech_reasons.append("均线多头排列")
if tech.volume_breakout:
tech_reasons.append("放量突破")
if tech.macd_golden:
tech_reasons.append("MACD金叉")
if tech_reasons:
reasons.append("技术面: " + "".join(tech_reasons))
else:
# Channel A 盘后理由
inflow = stock.get("main_net_inflow", 0)
if inflow > 5000:
reasons.append(f"主力资金大幅流入{inflow:.0f}万元")
elif inflow > 1000:
reasons.append(f"主力资金持续流入{inflow:.0f}万元")
reasons.append(f"所属板块【{stock['sector']}】为当前热门概念")
tech_reasons = []
if tech.ma_bullish:
tech_reasons.append("均线多头排列")
if tech.volume_breakout:
tech_reasons.append("放量突破")
if tech.macd_golden:
tech_reasons.append("MACD金叉")
if tech.pullback_support:
tech_reasons.append("缩量回踩支撑")
if tech.big_yang:
tech_reasons.append("底部放量长阳")
if tech_reasons:
reasons.append("技术面: " + "".join(tech_reasons))
# 位置安全
if tech.position_score >= 70:
reasons.append("位置安全,距高点有空间")
elif tech.position_score < 30:
reasons.append("注意:短期涨幅较大,追高风险")
return reasons[:3]
def _generate_risk_note(
market: MarketTemperature, tech: TechnicalSignal,
intraday: bool = False, strategy: str = "momentum",
) -> str:
notes = []
if intraday:
notes.append("盘中数据参考,需结合尾盘确认")
if strategy == "potential":
notes.append("底部股票可能继续盘整,注意时间成本")
if market.temperature < 30:
notes.append("市场情绪偏冷,系统性风险较高")
elif market.temperature < 50:
notes.append("市场情绪一般,注意仓位控制")
if tech.score < 40:
notes.append("技术面支撑不足")
if tech.position_score < 30:
notes.append(f"近期涨幅较大(5日{tech.rally_pct_5d}%),追高风险")
if tech.rally_pct_10d > 20:
notes.append(f"10日累涨{tech.rally_pct_10d}%,警惕回调")
if not notes:
return "注意设好止损,控制仓位"
return "".join(notes)