"""趋势突破统一筛选器 三阶段管道:全市场批量预筛 → 资金流过滤 → 逐股深度分析 评分公式:趋势&时机30% + 资金流25% + 供需20% + 板块共振15% + 市场温度10% 自动检测是否在交易时段: - 盘中模式:用前一日 Tushare 数据 + 腾讯实时行情混合筛选 - 盘后模式:用当日 Tushare 完整数据筛选 """ import logging from app.analysis.market_temp import calculate_market_temperature from app.analysis.sector_scanner import scan_hot_sectors from app.analysis.trend_scanner import scan_trend_breakout from app.analysis.signals import generate_signals from app.analysis.intraday import intraday_market_temperature, intraday_filter_stocks, intraday_sector_scan from app.data.models import MarketTemperature, SectorInfo, TechnicalSignal, Recommendation from app.config import settings, is_trading_hours, is_market_session logger = logging.getLogger(__name__) async def run_screening(trade_date: str = None) -> dict: """执行趋势突破筛选流程 返回: { "market_temp": MarketTemperature, "hot_sectors": [SectorInfo], "recommendations": [Recommendation], "scan_mode": "intraday" | "post_market", } """ intraday = is_market_session() scan_mode = "intraday" if intraday else "post_market" logger.info(f"=== 筛选模式: {'盘中实时' if intraday else '盘后'} ===") # ── 市场温度 ── logger.info("=== 市场温度计 ===") market_temp = calculate_market_temperature(trade_date) if intraday: market_temp = await intraday_market_temperature(market_temp) logger.info(f"盘中市场温度(实时调整): {market_temp.temperature}") else: logger.info(f"市场温度: {market_temp.temperature}") market_temp_score = market_temp.temperature # ── 板块热度(用于板块共振评分) ── logger.info("=== 板块热度扫描 ===") all_sectors = scan_hot_sectors(trade_date) hot_sectors = all_sectors[:settings.top_sector_count] # 盘中用实时行情更新板块涨幅和涨停数 if intraday: hot_sectors = await intraday_sector_scan(hot_sectors) # ── 趋势突破三阶段管道 ── logger.info("=== 趋势突破扫描 ===") candidates = await scan_trend_breakout( trade_date=trade_date, market_temp=market_temp, hot_sectors=hot_sectors, intraday=intraday, ) if not candidates: logger.info("=== 筛选完成: 0 只股票 ===") return { "market_temp": market_temp, "hot_sectors": hot_sectors, "recommendations": [], "scan_mode": scan_mode, } # ── 构建推荐列表 ── recommendations = _build_trend_recommendations( candidates, market_temp, hot_sectors, market_temp_score, intraday, ) # 过滤低质量推荐 recommendations = [r for r in recommendations if r.score >= 40] logger.info(f"=== 筛选完成: {len(recommendations)} 只股票 ({scan_mode}) ===") for r in recommendations[:5]: signal_map = {"breakout": "突破型", "pullback": "回踩型", "launch": "启动型"} signal_label = signal_map.get(r.entry_signal_type, r.entry_signal_type) logger.info(f" [{signal_label}] {r.name}({r.ts_code}) {r.level} 评分={r.score} 信号={r.signal}") return { "market_temp": market_temp, "hot_sectors": hot_sectors, "recommendations": recommendations, "scan_mode": scan_mode, } def _build_trend_recommendations( candidates: list[dict], market_temp: MarketTemperature, hot_sectors: list[SectorInfo], market_temp_score: float = 0, intraday: bool = False, ) -> list[Recommendation]: """从趋势突破扫描结果构建推荐列表 评分公式:趋势&时机30% + 资金流25% + 供需20% + 板块共振15% + 市场温度10% """ recommendations = [] for stock in candidates: ts_code = stock["ts_code"] name = stock["name"] sector = stock["sector"] entry_signal_type = stock.get("entry_signal_type", "none") entry_signal_score = stock.get("entry_signal_score", 0) tech_signal = stock.get("tech_signal") # 各维度得分 trend_timing_score = stock.get("trend_timing_score", 50) supply_demand_score = stock.get("supply_demand_score", 50) capital_score = stock.get("capital_score", 50) position_score = stock.get("position_score", 50) valuation_score = stock.get("valuation_score", 50) # 板块共振评分 sector_score = _score_sector_resonance(sector, hot_sectors) sector_stage = _get_sector_stage(sector, hot_sectors) # 综合评分(新权重) final_score = ( trend_timing_score * 0.30 + capital_score * 0.25 + supply_demand_score * 0.20 + sector_score * 0.15 + market_temp_score * 0.10 ) # 风险乘数 if tech_signal: if tech_signal.rally_pct_5d > 20: final_score *= 0.65 elif tech_signal.rally_pct_5d > 15: final_score *= 0.80 if sector_stage == "end": final_score *= 0.70 elif sector_stage == "late": final_score *= 0.88 if market_temp_score < 30: final_score *= 0.75 # 入场信号高置信度奖励 if entry_signal_score >= 80: final_score *= 1.10 # 确定信号和等级 level = _score_to_level(final_score) signal = "HOLD" if entry_signal_type != "none" and entry_signal_score >= 50 and position_score >= 30 and final_score >= 60: signal = "BUY" # 价格参考 entry_price = None target_price = None stop_loss = None if tech_signal: entry_price = tech_signal.support_price target_price = tech_signal.resist_price stop_loss = tech_signal.stop_loss_price # 根据入场信号类型调整参考价 details = stock.get("entry_signal_details", {}) if entry_signal_type == "breakout" and details.get("resist_level"): entry_price = details["resist_level"] target_price = round(entry_price * 1.05, 2) elif entry_signal_type == "pullback" and details.get("support_price"): entry_price = details["support_price"] target_price = round(entry_price * 1.05, 2) elif entry_signal_type == "launch" and details.get("resist_level"): entry_price = round(details["resist_level"] * 1.01, 2) target_price = round(details["resist_level"] * 1.08, 2) # 生成推荐理由 reasons = _generate_reasons(stock, tech_signal, market_temp, intraday) # 风险提示 risk_note = _generate_risk_note(market_temp, tech_signal, stock) rec = Recommendation( ts_code=ts_code, name=name, sector=sector, score=round(final_score, 1), market_temp_score=round(market_temp_score, 1), sector_score=round(sector_score, 1), capital_score=round(capital_score, 1), technical_score=round(stock.get("technical_score", 50), 1), position_score=round(position_score, 1), valuation_score=round(valuation_score, 1), signal=signal, entry_price=entry_price, target_price=target_price, stop_loss=stop_loss, reasons=reasons, risk_note=risk_note, level=level, strategy="trend_breakout", entry_signal_type=entry_signal_type, ) recommendations.append(rec) return recommendations def _score_sector_resonance(sector_name: str, hot_sectors: list[SectorInfo]) -> float: """板块共振评分 (0-100)""" for s in hot_sectors: if s.sector_name == sector_name: score = 40 # 在热门板块列表中 score += s.heat_score * 0.3 # 板块热度贡献 if s.stage == "early": score += 30 elif s.stage == "mid": score += 20 elif s.stage == "late": score += 5 return min(score, 100) return 10.0 # 不在热门板块 def _get_sector_score(sector_name: str, hot_sectors: list[SectorInfo]) -> float: """获取板块在热门板块中的得分""" for s in hot_sectors: if s.sector_name == sector_name: return s.heat_score return 30.0 def _get_sector_stage(sector_name: str, hot_sectors: list[SectorInfo]) -> str: """获取板块所处阶段""" for s in hot_sectors: if s.sector_name == sector_name: return s.stage return "mid" def _score_to_level(score: float) -> str: if score >= 80: return "强烈推荐" elif score >= 60: return "推荐" elif score >= 40: return "观望" else: return "回避" def _generate_reasons( stock: dict, tech: TechnicalSignal | None, market: MarketTemperature, intraday: bool = False, ) -> list[str]: """生成推荐理由""" reasons = [] entry_type = stock.get("entry_signal_type", "none") signal_map = {"breakout": "突破型", "pullback": "回踩型", "launch": "启动型"} entry_label = signal_map.get(entry_type, "") # 入场信号 if entry_label: details = stock.get("entry_signal_details", {}) if entry_type == "breakout": breakout_pct = details.get("breakout_pct", 0) vol_ratio = details.get("volume_ratio", 0) reasons.append(f"放量突破20日阻力位(涨幅{breakout_pct:.1f}%,量比{vol_ratio:.1f}倍)") elif entry_type == "pullback": support = details.get("support_ma", "") shrink = details.get("volume_shrink_ratio", 0) reasons.append(f"缩量回踩{support}支撑(量能收缩至{shrink:.0%})") elif entry_type == "launch": range_pct = details.get("price_range_pct", 0) shrink = details.get("volume_shrink_ratio", 0) reasons.append(f"高位缩量整理{range_pct:.1f}%后即将变盘(量缩至{shrink:.0%})") # 供需分析 vol_trend = stock.get("volume_trend", "") ds_ratio = stock.get("demand_supply_ratio", 1) if ds_ratio > 1.5: reasons.append(f"需求主导(上涨均量/下跌均量={ds_ratio:.1f})") elif vol_trend == "expanding": reasons.append("量能逐步放大,资金持续介入") # 资金流 main_net = stock.get("main_net_inflow", 0) if main_net > 5000: reasons.append(f"主力资金大幅流入{main_net:.0f}万元") elif main_net > 1000: reasons.append(f"主力资金持续流入{main_net:.0f}万元") # 板块 sector = stock.get("sector", "") if sector: reasons.append(f"所属板块【{sector}】") # 技术面 if tech: tech_reasons = [] if tech.ma_bullish: tech_reasons.append("均线多头排列") if tech.macd_golden: tech_reasons.append("MACD金叉") if tech.pullback_support: tech_reasons.append("缩量回踩支撑") if tech_reasons: reasons.append("技术面: " + "、".join(tech_reasons)) return reasons[:3] def _generate_risk_note( market: MarketTemperature, tech: TechnicalSignal | None, stock: dict, ) -> str: """生成风险提示""" notes = [] entry_type = stock.get("entry_signal_type", "") if entry_type == "breakout": notes.append("突破型需警惕假突破,关注量能是否持续") elif entry_type == "pullback": notes.append("回踩型可能继续下探支撑,注意止损纪律") elif entry_type == "launch": notes.append("启动型整理可能延长,注意时间成本") if market.temperature < 30: notes.append("市场情绪偏冷,系统性风险较高") elif market.temperature < 50: notes.append("市场情绪一般,注意仓位控制") if tech: if tech.position_score < 30: notes.append(f"近期涨幅较大(5日{tech.rally_pct_5d}%),追高风险") if tech.rally_pct_10d > 20: notes.append(f"10日累涨{tech.rally_pct_10d}%,警惕回调") if not notes: return "注意设好止损,控制仓位" return ";".join(notes)