diff --git a/backend/app/__pycache__/config.cpython-313.pyc b/backend/app/__pycache__/config.cpython-313.pyc index de9c7742..57859c90 100644 Binary files a/backend/app/__pycache__/config.cpython-313.pyc and b/backend/app/__pycache__/config.cpython-313.pyc differ diff --git a/backend/app/api/__pycache__/recommendations.cpython-313.pyc b/backend/app/api/__pycache__/recommendations.cpython-313.pyc index fdbc92db..d545b149 100644 Binary files a/backend/app/api/__pycache__/recommendations.cpython-313.pyc and b/backend/app/api/__pycache__/recommendations.cpython-313.pyc differ diff --git a/backend/app/api/__pycache__/stocks.cpython-313.pyc b/backend/app/api/__pycache__/stocks.cpython-313.pyc index 79fcd35e..12c28f24 100644 Binary files a/backend/app/api/__pycache__/stocks.cpython-313.pyc and b/backend/app/api/__pycache__/stocks.cpython-313.pyc differ diff --git a/backend/app/api/recommendations.py b/backend/app/api/recommendations.py index d3ee8ea5..35d61030 100644 --- a/backend/app/api/recommendations.py +++ b/backend/app/api/recommendations.py @@ -70,6 +70,10 @@ async def get_latest(): "lifecycle_status": r.lifecycle_status, "data_freshness": r.data_freshness, "llm_score": r.llm_score, + "recall_tags": r.recall_tags, + "prefilter_decision": r.prefilter_decision, + "prefilter_reason": r.prefilter_reason, + "focus_points": r.focus_points, "strategy": r.strategy, "entry_signal_type": r.entry_signal_type, "scan_session": r.scan_session, diff --git a/backend/app/api/stocks.py b/backend/app/api/stocks.py index 54a15e28..e8d00083 100644 --- a/backend/app/api/stocks.py +++ b/backend/app/api/stocks.py @@ -129,6 +129,8 @@ async def get_stock_thesis(ts_code: str): decision_points = [ {"label": "操作计划", "value": r["action_plan"] or "观察"}, + {"label": "召回来源", "value": " / ".join(_safe_json_list(r.get("recall_tags"))) or "未归档"}, + {"label": "AI预筛", "value": r.get("prefilter_decision") or "未执行"}, {"label": "触发条件", "value": r["trigger_condition"] or "等待触发条件归档"}, {"label": "失效条件", "value": r["invalidation_condition"] or "等待失效条件归档"}, {"label": "建议仓位", "value": f"{r['suggested_position_pct']}%" if r["suggested_position_pct"] is not None else "未设置"}, @@ -175,6 +177,10 @@ async def get_stock_thesis(ts_code: str): "data_freshness": r["data_freshness"] or "", "llm_analysis": r["llm_analysis"] or "", "llm_score": r["llm_score"], + "recall_tags": _safe_json_list(r.get("recall_tags")), + "prefilter_decision": r.get("prefilter_decision") or "", + "prefilter_reason": r.get("prefilter_reason") or "", + "focus_points": _safe_json_list(r.get("focus_points")), "strategy": r["strategy"] or "trend_breakout", "entry_signal_type": r["entry_signal_type"] or "none", "entry_timing": r["entry_timing"] or "", @@ -466,7 +472,8 @@ async def diagnose_stock(ts_code: str, mode: str = Query("entry")): "SELECT score, supply_demand_score, price_action_score, " "technical_score, position_score, sector, signal " "FROM recommendations " - "WHERE ts_code = :code AND score >= 60 " + "WHERE ts_code = :code " + "AND (action_plan IN ('可操作', '重点关注') OR COALESCE(llm_score, 0) >= 6 OR score >= 56) " "ORDER BY created_at DESC LIMIT 1" ), {"code": ts_code}, diff --git a/backend/app/config.py b/backend/app/config.py index 69c1fe05..e730b439 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -30,6 +30,10 @@ class Settings(BaseSettings): # 筛选参数 top_sector_count: int = 5 # 关注板块数量 top_stock_count: int = 20 # 进入技术面筛选的个股数 + candidate_pool_limit: int = 120 # 多路召回后的候选池上限 + llm_prefilter_limit: int = 36 # LLM 初筛保留数量 + llm_prefilter_max_concurrent: int = 6 + llm_final_limit: int = 14 # LLM 深裁决池上限 min_turnover_rate: float = 2.0 # 最小换手率 % max_turnover_rate: float = 30.0 # 最大换手率 % min_circ_mv: float = 50.0 # 最小流通市值(亿) diff --git a/backend/app/data/__pycache__/models.cpython-313.pyc b/backend/app/data/__pycache__/models.cpython-313.pyc index 8adf4b9e..af209434 100644 Binary files a/backend/app/data/__pycache__/models.cpython-313.pyc and b/backend/app/data/__pycache__/models.cpython-313.pyc differ diff --git a/backend/app/data/__pycache__/tushare_client.cpython-313.pyc b/backend/app/data/__pycache__/tushare_client.cpython-313.pyc index 12bd2af2..6c2da9d6 100644 Binary files a/backend/app/data/__pycache__/tushare_client.cpython-313.pyc and b/backend/app/data/__pycache__/tushare_client.cpython-313.pyc differ diff --git a/backend/app/data/models.py b/backend/app/data/models.py index 30102021..7c7f29cc 100644 --- a/backend/app/data/models.py +++ b/backend/app/data/models.py @@ -152,6 +152,10 @@ class Recommendation(BaseModel): data_freshness: str = "" # 数据新鲜度说明 llm_analysis: str = "" # LLM 深度分析 llm_score: float | None = None # AI 评分 1-10 + recall_tags: list[str] = [] + prefilter_decision: str = "" + prefilter_reason: str = "" + focus_points: list[str] = [] scan_session: str = "" created_at: datetime | None = None diff --git a/backend/app/data/tushare_client.py b/backend/app/data/tushare_client.py index 36cb390f..3fa70d5c 100644 --- a/backend/app/data/tushare_client.py +++ b/backend/app/data/tushare_client.py @@ -26,8 +26,8 @@ class TushareClient: if not self._initialized: if not self.token: raise ValueError("Tushare token 未配置,请设置 ASTOCK_TUSHARE_TOKEN") - ts.set_token(self.token) - self.pro = ts.pro_api() + # 直接把 token 传给 pro_api,避免 tushare 尝试在用户主目录写 tk.csv + self.pro = ts.pro_api(self.token) self._initialized = True def _rate_limit(self): diff --git a/backend/app/db/__pycache__/database.cpython-313.pyc b/backend/app/db/__pycache__/database.cpython-313.pyc index d47e232c..7dc9d780 100644 Binary files a/backend/app/db/__pycache__/database.cpython-313.pyc and b/backend/app/db/__pycache__/database.cpython-313.pyc differ diff --git a/backend/app/db/__pycache__/tables.cpython-313.pyc b/backend/app/db/__pycache__/tables.cpython-313.pyc index c63e2b8b..b3a4609a 100644 Binary files a/backend/app/db/__pycache__/tables.cpython-313.pyc and b/backend/app/db/__pycache__/tables.cpython-313.pyc differ diff --git a/backend/app/db/database.py b/backend/app/db/database.py index 5af2fe78..ce5aed29 100644 --- a/backend/app/db/database.py +++ b/backend/app/db/database.py @@ -55,6 +55,10 @@ async def init_db(): "ALTER TABLE market_temperature ADD COLUMN broken_rate REAL", "ALTER TABLE recommendations ADD COLUMN entry_signal_type TEXT DEFAULT 'none'", "ALTER TABLE recommendations ADD COLUMN entry_timing TEXT DEFAULT ''", + "ALTER TABLE recommendations ADD COLUMN recall_tags TEXT DEFAULT '[]'", + "ALTER TABLE recommendations ADD COLUMN prefilter_decision TEXT DEFAULT ''", + "ALTER TABLE recommendations ADD COLUMN prefilter_reason TEXT DEFAULT ''", + "ALTER TABLE recommendations ADD COLUMN focus_points TEXT DEFAULT '[]'", "ALTER TABLE sector_heat ADD COLUMN stage TEXT", "ALTER TABLE sector_heat ADD COLUMN days_continuous INTEGER", "ALTER TABLE sector_heat ADD COLUMN member_count INTEGER", diff --git a/backend/app/db/tables.py b/backend/app/db/tables.py index f3f3aab8..18a3140c 100644 --- a/backend/app/db/tables.py +++ b/backend/app/db/tables.py @@ -40,6 +40,10 @@ recommendations_table = Table( Column("entry_signal_type", Text, default="none"), Column("entry_timing", Text, default=""), Column("llm_score", Float, default=None), + Column("recall_tags", Text, default="[]"), + Column("prefilter_decision", Text, default=""), + Column("prefilter_reason", Text, default=""), + Column("focus_points", Text, default="[]"), Column("scan_session", Text), Column("created_at", DateTime, server_default=func.now()), ) diff --git a/backend/app/engine/__pycache__/recommender.cpython-313.pyc b/backend/app/engine/__pycache__/recommender.cpython-313.pyc index 3c5a1ee2..e27dd600 100644 Binary files a/backend/app/engine/__pycache__/recommender.cpython-313.pyc and b/backend/app/engine/__pycache__/recommender.cpython-313.pyc differ diff --git a/backend/app/engine/__pycache__/screener.cpython-313.pyc b/backend/app/engine/__pycache__/screener.cpython-313.pyc index 501d9be6..19e80bcf 100644 Binary files a/backend/app/engine/__pycache__/screener.cpython-313.pyc and b/backend/app/engine/__pycache__/screener.cpython-313.pyc differ diff --git a/backend/app/engine/recommender.py b/backend/app/engine/recommender.py index 86c3b0f0..d8a276c5 100644 --- a/backend/app/engine/recommender.py +++ b/backend/app/engine/recommender.py @@ -365,7 +365,7 @@ async def get_performance_stats() -> dict: text( "SELECT r.ts_code, r.name, r.signal, r.entry_price, " " r.target_price, r.stop_loss, r.entry_signal_type, r.score, " - " r.action_plan, r.lifecycle_status, " + " r.action_plan, r.lifecycle_status, r.recall_tags, r.prefilter_decision, " " t.pct_from_entry, t.current_price, t.track_date, t.hit_target, t.hit_stop_loss, " " t.max_return_pct, t.max_drawdown_pct, t.days_since_recommendation, " " t.close_reason, t.review_note, r.created_at " @@ -388,6 +388,8 @@ async def get_performance_stats() -> dict: "entry_signal_type": r["entry_signal_type"], "action_plan": r["action_plan"], "lifecycle_status": r["lifecycle_status"], + "recall_tags": json.loads(r["recall_tags"]) if r["recall_tags"] else [], + "prefilter_decision": r["prefilter_decision"] or "", "score": r["score"], "entry_price": r["entry_price"], "target_price": r["target_price"], @@ -418,6 +420,8 @@ async def get_performance_stats() -> dict: "hit_target_count": hit_target_count, "hit_stop_count": hit_stop_count, "lifecycle_counts": lifecycle_counts, + "route_breakdown": _build_route_breakdown(details), + "prefilter_breakdown": _build_prefilter_breakdown(details), "details": details, } except Exception as e: @@ -428,7 +432,7 @@ async def get_performance_stats() -> dict: "total_recommendations": 0, "tracked": 0, "winning": 0, "win_rate": 0, "avg_return": 0, "avg_max_return": 0, "avg_max_drawdown": 0, "hit_target_count": 0, - "hit_stop_count": 0, "lifecycle_counts": {}, "details": [], + "hit_stop_count": 0, "lifecycle_counts": {}, "route_breakdown": [], "prefilter_breakdown": [], "details": [], } @@ -470,7 +474,7 @@ async def get_recommendation_history(days: int = 7) -> list[dict]: " ) lt ON t.id = lt.max_id" ") latest_t ON latest_t.recommendation_id = r.id " "WHERE r.created_at >= :start " - "AND r.score >= 60 " + "AND (r.action_plan IN ('可操作', '重点关注') OR COALESCE(r.llm_score, 0) >= 6 OR r.score >= 56) " "AND r.id IN (" " SELECT MAX(id) FROM recommendations " " WHERE created_at >= :start " @@ -526,6 +530,10 @@ async def get_recommendation_history(days: int = 7) -> list[dict]: "entry_signal_type": r.get("entry_signal_type") or "none", "llm_analysis": r.get("llm_analysis") or "", "llm_score": r.get("llm_score"), + "recall_tags": json.loads(r.get("recall_tags") or "[]"), + "prefilter_decision": r.get("prefilter_decision") or "", + "prefilter_reason": r.get("prefilter_reason") or "", + "focus_points": json.loads(r.get("focus_points") or "[]"), "tracking": { "current_price": r.get("latest_current_price"), "pct_from_entry": r.get("latest_pct_from_entry"), @@ -573,6 +581,50 @@ def _score_to_level_static(score: float) -> str: return "观望" +def _build_route_breakdown(details: list[dict]) -> list[dict]: + stats: dict[str, dict] = {} + for item in details: + for tag in item.get("recall_tags", []) or []: + bucket = stats.setdefault(tag, {"route": tag, "count": 0, "wins": 0, "avg_return_sum": 0.0}) + bucket["count"] += 1 + pct = float(item.get("pct_from_entry") or 0) + bucket["avg_return_sum"] += pct + if pct > 0: + bucket["wins"] += 1 + result = [] + for bucket in stats.values(): + count = bucket["count"] or 1 + result.append({ + "route": bucket["route"], + "count": bucket["count"], + "win_rate": round(bucket["wins"] / count * 100, 1), + "avg_return": round(bucket["avg_return_sum"] / count, 2), + }) + return sorted(result, key=lambda item: item["count"], reverse=True) + + +def _build_prefilter_breakdown(details: list[dict]) -> list[dict]: + stats: dict[str, dict] = {} + for item in details: + key = item.get("prefilter_decision") or "unknown" + bucket = stats.setdefault(key, {"decision": key, "count": 0, "wins": 0, "avg_return_sum": 0.0}) + bucket["count"] += 1 + pct = float(item.get("pct_from_entry") or 0) + bucket["avg_return_sum"] += pct + if pct > 0: + bucket["wins"] += 1 + result = [] + for bucket in stats.values(): + count = bucket["count"] or 1 + result.append({ + "decision": bucket["decision"], + "count": bucket["count"], + "win_rate": round(bucket["wins"] / count * 100, 1), + "avg_return": round(bucket["avg_return_sum"] / count, 2), + }) + return sorted(result, key=lambda item: item["count"], reverse=True) + + async def _save_to_db(result: dict): """将推荐结果保存到数据库""" try: @@ -631,7 +683,14 @@ async def _save_to_db(result: dict): # 保存推荐:先批量清除当日旧记录,再批量插入 today_str = datetime.now().strftime("%Y-%m-%d") now_dt = datetime.now() - qualified_recs = [rec for rec in result.get("recommendations", []) if rec.score >= 60] + qualified_recs = [ + rec for rec in result.get("recommendations", []) + if ( + rec.action_plan in {"可操作", "重点关注"} + or (rec.llm_score is not None and rec.llm_score >= 6) + or rec.score >= 56 + ) + ] if qualified_recs: # 批量删除当日同一 ts_code 的旧记录 codes = [rec.ts_code for rec in qualified_recs] @@ -676,6 +735,10 @@ async def _save_to_db(result: dict): "entry_signal_type": rec.entry_signal_type, "entry_timing": rec.entry_timing, "llm_score": rec.llm_score, + "recall_tags": json.dumps(rec.recall_tags, ensure_ascii=False), + "prefilter_decision": rec.prefilter_decision, + "prefilter_reason": rec.prefilter_reason, + "focus_points": json.dumps(rec.focus_points, ensure_ascii=False), "scan_session": rec.scan_session, "created_at": now_dt, } @@ -684,7 +747,10 @@ async def _save_to_db(result: dict): await db.execute(tables.recommendations_table.insert(), rec_values) await db.commit() - logger.info(f"已保存 {len(qualified_recs)} 条推荐到数据库(共 {len(result.get('recommendations', []))} 条,过滤掉 <60 分)") + logger.info( + f"已保存 {len(qualified_recs)} 条推荐到数据库" + f"(共 {len(result.get('recommendations', []))} 条,过滤掉低优先级候选)" + ) except Exception as e: logger.error(f"保存推荐到数据库失败: {e}") from app.db.error_logger import log_error @@ -721,11 +787,11 @@ async def _load_today_from_db() -> dict: temperature=m["temperature"], ) - # 加载推荐(取最近一个有数据的日期,按 ts_code 去重,只取 >= 60 分) + # 加载推荐(取最近一个有数据的日期,按 ts_code 去重,优先保留行动级别更高的结果) result = await db.execute( text("SELECT * FROM recommendations " "WHERE date(created_at) = (SELECT date(created_at) FROM recommendations ORDER BY created_at DESC LIMIT 1) " - "AND score >= 60 " + "AND (action_plan IN ('可操作', '重点关注') OR COALESCE(llm_score, 0) >= 6 OR score >= 56) " "AND id IN (SELECT MAX(id) FROM recommendations " " WHERE date(created_at) = (SELECT date(created_at) FROM recommendations ORDER BY created_at DESC LIMIT 1) " " GROUP BY ts_code) " @@ -766,6 +832,10 @@ async def _load_today_from_db() -> dict: strategy=r.get("strategy") or "trend_breakout", entry_signal_type=r.get("entry_signal_type") or "none", llm_score=r.get("llm_score"), + recall_tags=json.loads(r.get("recall_tags") or "[]"), + prefilter_decision=r.get("prefilter_decision") or "", + prefilter_reason=r.get("prefilter_reason") or "", + focus_points=json.loads(r.get("focus_points") or "[]"), scan_session=r["scan_session"] or "", )) diff --git a/backend/app/engine/screener.py b/backend/app/engine/screener.py index 46a56749..9dd64a36 100644 --- a/backend/app/engine/screener.py +++ b/backend/app/engine/screener.py @@ -91,21 +91,14 @@ async def run_screening(trade_date: str = None) -> dict: f"threshold={strategy_profile.buy_threshold} min_score={strategy_profile.min_score} ===" ) - # ── Step 2: 板块内选股 ── - logger.info("=== Step 2: 板块内选股 ===") - if intraday: - candidates = await intraday_filter_stocks(hot_sectors) - else: - candidates = await _select_from_hot_sectors(hot_sectors, trade_date, intraday) - - if not candidates: - logger.info("=== Step 2 无候选,回退到全市场扫描 ===") - candidates = await scan_trend_breakout( - trade_date=trade_date, - market_temp=market_temp, - hot_sectors=hot_sectors, - intraday=intraday, - ) + # ── Step 2: 多路召回构建候选池 ── + logger.info("=== Step 2: 多路召回候选池 ===") + candidates = await _build_candidate_pool( + hot_sectors=hot_sectors, + trade_date=trade_date, + intraday=intraday, + market_temp=market_temp, + ) if not candidates: logger.info("=== 筛选完成: 0 只股票 ===") @@ -129,14 +122,23 @@ async def run_screening(trade_date: str = None) -> dict: except Exception as e: logger.warning(f"注入实时价格失败,使用 Tushare 收盘价: {e}") - # ── Step 3: 供需 + 价格行为 + 趋势评分 ── - logger.info("=== Step 3: 深度分析 ===") + # ── Step 3: 规则边界 + LLM 两阶段裁决 ── + logger.info("=== Step 3: 规则边界 + LLM 两阶段裁决 ===") recommendations = await _build_recommendations( candidates, market_temp, hot_sectors, market_temp_score, intraday, strategy_profile, ) - # 过滤低质量推荐(低于60分不推荐) - recommendations = [r for r in recommendations if r.score >= strategy_profile.min_score] + if settings.deepseek_api_key: + recommendations = [ + r for r in recommendations + if ( + r.action_plan in {"可操作", "重点关注"} + or (r.llm_score is not None and r.llm_score >= 6) + or r.score >= max(strategy_profile.min_score - 4, 56) + ) + ] + else: + recommendations = [r for r in recommendations if r.score >= strategy_profile.min_score] logger.info(f"=== 筛选完成: {len(recommendations)} 只股票 ({scan_mode}) ===") for r in recommendations[:5]: @@ -158,25 +160,28 @@ async def _select_from_hot_sectors( trade_date: str, intraday: bool, ) -> list[dict]: - """Step 2: 从热门板块成分股中选出有资金流入的候选 + """热点板块轻召回。 - 流程: - 1. 收集所有热门板块的成分股代码 - 2. 用 get_daily_all + get_daily_basic 过滤市值/换手率 - 3. 用 get_moneyflow_batch 过滤主力净流入 > 0 - 4. 对候选做入场信号初筛(只需满足任一信号类型) + 这里只做基础清洗和活跃度排序,不再用“主力净流入必须为正”之类的硬门槛直接淘汰。 """ - from app.data.tushare_client import tushare_client from datetime import datetime, timedelta - import pandas as pd if not trade_date: trade_date = tushare_client.get_latest_trade_date() - # 收集热门板块成分股代码 sector_member_codes: set[str] = set() - sector_code_map: dict[str, str] = {} # ts_code -> sector_name - for s in hot_sectors: + sector_code_map: dict[str, str] = {} + sector_stage_map: dict[str, str] = {} + sector_rank_map: dict[str, int] = {} + leader_codes: set[str] = set() + + for idx, s in enumerate(hot_sectors): + sector_rank_map[s.sector_name] = idx + 1 + sector_stage_map[s.sector_name] = s.stage + for leader in s.leading_stocks or []: + leader_code = str(leader.get("ts_code", "")).strip() + if leader_code: + leader_codes.add(leader_code) try: members_df = tushare_client.get_ths_members(s.sector_code) if not members_df.empty and "con_code" in members_df.columns: @@ -188,47 +193,44 @@ async def _select_from_hot_sectors( logger.warning(f"获取板块 {s.sector_name} 成分股失败: {e}") if not sector_member_codes: - logger.info("Step 2: 无板块成分股数据") + logger.info("Step 2: 热点板块轻召回无成分股数据") return [] - logger.info(f"Step 2: 热门板块共 {len(sector_member_codes)} 只成分股") + logger.info(f"Step 2: 热点板块共 {len(sector_member_codes)} 只成分股") - # 过滤市值/换手率/ST/次新 stock_basic = tushare_client.get_stock_basic() exclude_codes = set() + name_map = {} + industry_map = {} if not stock_basic.empty: st_codes = set(stock_basic[stock_basic["name"].str.contains("ST", na=False)]["ts_code"]) exclude_codes.update(st_codes) cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d") new_codes = set(stock_basic[stock_basic["list_date"] > cutoff]["ts_code"]) exclude_codes.update(new_codes) - - # 行业映射 - industry_map = {} - if not stock_basic.empty: for _, row in stock_basic.iterrows(): + name_map[row["ts_code"]] = row["name"] industry_map[row["ts_code"]] = row.get("industry", "") - # 用 daily_basic 过滤 basic = tushare_client.get_daily_basic(trade_date) if basic.empty: logger.info("Step 2: daily_basic 无数据") return [] - basic["circ_mv"] = basic["circ_mv"] / 10000 # 万元 → 亿元 + basic = basic.copy() + basic["circ_mv"] = basic["circ_mv"] / 10000 filtered_basic = basic[ (basic["ts_code"].isin(sector_member_codes)) & (~basic["ts_code"].isin(exclude_codes)) & (basic["circ_mv"] >= settings.min_circ_mv) & (basic["circ_mv"] <= settings.max_circ_mv) & - (basic["turnover_rate"] >= settings.min_turnover_rate) & - (basic["turnover_rate"] <= settings.max_turnover_rate) + (basic["turnover_rate"] >= max(settings.min_turnover_rate * 0.5, 1.0)) & + (basic["turnover_rate"] <= settings.max_turnover_rate * 1.2) ].copy() - # 严格过滤为空时,放宽换手率条件重试 if filtered_basic.empty: - logger.info("Step 2 严格过滤无结果,放宽换手率重试") + logger.info("Step 2 热点板块轻召回严格过滤无结果,放宽换手率重试") filtered_basic = basic[ (basic["ts_code"].isin(sector_member_codes)) & (~basic["ts_code"].isin(exclude_codes)) & @@ -241,12 +243,9 @@ async def _select_from_hot_sectors( if filtered_basic.empty: return [] - # 资金流过滤:主力净流入 > 0 mf = tushare_client.get_moneyflow_batch(trade_date) - if mf.empty: - logger.info("Step 2: 资金流数据为空,跳过资金过滤") - candidate_codes = set(filtered_basic["ts_code"].tolist()) - else: + mf_lookup = {} + if not mf.empty: mf["main_net_inflow"] = ( (mf["buy_elg_amount"] - mf["sell_elg_amount"]) + (mf["buy_lg_amount"] - mf["sell_lg_amount"]) @@ -258,65 +257,177 @@ async def _select_from_hot_sectors( mf["buy_sm_amount"] + mf["sell_sm_amount"] ) mf["inflow_ratio"] = (mf["main_net_inflow"] / total.replace(0, float("nan")) * 100).fillna(0) - - mf_positive = mf[ - (mf["ts_code"].isin(set(filtered_basic["ts_code"]))) & - (mf["main_net_inflow"] > 0) - ].sort_values("main_net_inflow", ascending=False) - - candidate_codes = set(mf_positive["ts_code"].tolist()) - - # 构建资金流查找表 - mf_lookup = {} - for _, row in mf_positive.iterrows(): + for _, row in mf.iterrows(): mf_lookup[row["ts_code"]] = { "main_net_inflow": float(row["main_net_inflow"]), "inflow_ratio": float(row.get("inflow_ratio", 0)), } - - logger.info(f"Step 2 资金流过滤: → {len(candidate_codes)} 只主力净流入 > 0") - - if not candidate_codes: - return [] - - # 构建候选列表 - import numpy as np candidates = [] - for ts_code in candidate_codes: - name = "" - if not stock_basic.empty: - row = stock_basic[stock_basic["ts_code"] == ts_code] - if not row.empty: - name = row.iloc[0]["name"] - + for _, base_row in filtered_basic.iterrows(): + ts_code = base_row["ts_code"] + name = name_map.get(ts_code, ts_code) sector_name = sector_code_map.get(ts_code, industry_map.get(ts_code, "")) - b_row = filtered_basic[filtered_basic["ts_code"] == ts_code] - turnover_rate = float(b_row.iloc[0]["turnover_rate"]) if not b_row.empty else 0 - circ_mv = float(b_row.iloc[0]["circ_mv"]) if not b_row.empty else 0 - pe = float(b_row.iloc[0]["pe"]) if not b_row.empty and pd.notna(b_row.iloc[0].get("pe")) else None - pb = float(b_row.iloc[0]["pb"]) if not b_row.empty and pd.notna(b_row.iloc[0].get("pb")) else None - volume_ratio = float(b_row.iloc[0]["volume_ratio"]) if not b_row.empty and pd.notna(b_row.iloc[0].get("volume_ratio")) else None - - try: - mf_info = mf_lookup.get(ts_code, {}) - except NameError: - mf_info = {} + mf_info = mf_lookup.get(ts_code, {}) + turnover_rate = float(base_row["turnover_rate"]) if pd.notna(base_row.get("turnover_rate")) else 0 + circ_mv = float(base_row["circ_mv"]) if pd.notna(base_row.get("circ_mv")) else 0 + pe = float(base_row["pe"]) if pd.notna(base_row.get("pe")) else None + pb = float(base_row["pb"]) if pd.notna(base_row.get("pb")) else None + volume_ratio = float(base_row["volume_ratio"]) if pd.notna(base_row.get("volume_ratio")) else None + main_net_inflow = float(mf_info.get("main_net_inflow", 0)) + inflow_ratio = float(mf_info.get("inflow_ratio", 0)) + sector_rank = sector_rank_map.get(sector_name, 99) + recall_score = 30 + if sector_rank <= 2: + recall_score += 14 + elif sector_rank <= 5: + recall_score += 8 + if ts_code in leader_codes: + recall_score += 14 + if turnover_rate >= settings.min_turnover_rate: + recall_score += 8 + if volume_ratio and volume_ratio >= 1.2: + recall_score += 8 + if main_net_inflow > 0: + recall_score += 8 + elif main_net_inflow < 0: + recall_score -= 4 + recall_tags = ["hot_sector_core"] + if ts_code in leader_codes: + recall_tags.append("sector_leader") + if main_net_inflow > 0: + recall_tags.append("moneyflow_support") + if volume_ratio and volume_ratio >= 1.5: + recall_tags.append("volume_active") candidates.append({ "ts_code": ts_code, "name": name, "sector": sector_name, + "sector_stage": sector_stage_map.get(sector_name, "mid"), "turnover_rate": turnover_rate, "circ_mv": circ_mv, "pe": pe, "pb": pb, "volume_ratio": volume_ratio, - "main_net_inflow": mf_info.get("main_net_inflow", 0), - "inflow_ratio": mf_info.get("inflow_ratio", 0), + "main_net_inflow": main_net_inflow, + "inflow_ratio": inflow_ratio, + "recall_score": round(recall_score, 1), + "recall_tags": recall_tags, + "stock_role_hint": "板块领涨前排" if ts_code in leader_codes else "板块活跃成分", }) - logger.info(f"Step 2 候选: {len(candidates)} 只") - return candidates + candidates.sort(key=lambda item: ( + item.get("recall_score", 0), + item.get("main_net_inflow", 0), + item.get("turnover_rate", 0), + ), reverse=True) + top = candidates[: settings.candidate_pool_limit] + logger.info(f"Step 2 热点板块轻召回: {len(top)} 只") + return top + + +async def _build_candidate_pool( + hot_sectors: list[SectorInfo], + trade_date: str | None, + intraday: bool, + market_temp: MarketTemperature, +) -> list[dict]: + """多路召回候选池。 + + 目标是提高召回率,再交给 LLM 做资源分配与最终裁决。 + """ + merged: dict[str, dict] = {} + + sector_candidates = await _select_from_hot_sectors(hot_sectors, trade_date, intraday) + _merge_candidate_batch(merged, sector_candidates, route="sector_recall") + + try: + trend_candidates = await scan_trend_breakout( + trade_date=trade_date, + market_temp=market_temp, + hot_sectors=hot_sectors, + intraday=intraday, + ) + except Exception as e: + logger.warning(f"趋势扫描召回失败: {e}") + trend_candidates = [] + _merge_candidate_batch(merged, trend_candidates, route="trend_scan") + + if intraday: + try: + intraday_candidates = await intraday_filter_stocks(hot_sectors) + except Exception as e: + logger.warning(f"盘中异动召回失败: {e}") + intraday_candidates = [] + _merge_candidate_batch(merged, intraday_candidates, route="intraday_active") + + candidates = list(merged.values()) + candidates.sort(key=lambda item: ( + item.get("recall_score", 0), + item.get("main_net_inflow", 0), + item.get("turnover_rate", 0), + item.get("volume_ratio", 0) or 0, + ), reverse=True) + top = candidates[: settings.candidate_pool_limit] + logger.info( + f"Step 2 多路召回完成: sector={len(sector_candidates)} " + f"trend={len(trend_candidates)} " + f"{'intraday=' + str(len(intraday_candidates)) if intraday else ''} " + f"→ merged={len(top)}" + ) + return top + + +def _merge_candidate_batch(merged: dict[str, dict], items: list[dict], route: str) -> None: + for item in items or []: + ts_code = str(item.get("ts_code", "")).strip() + if not ts_code: + continue + + normalized = dict(item) + normalized.setdefault("ts_code", ts_code) + normalized.setdefault("name", ts_code) + normalized.setdefault("sector", item.get("sector", "")) + normalized.setdefault("sector_stage", item.get("sector_stage", "mid")) + normalized.setdefault("recall_tags", []) + normalized.setdefault("stock_role_hint", "待判断") + normalized["recall_tags"] = list({*normalized.get("recall_tags", []), route}) + normalized["recall_score"] = round( + float(normalized.get("recall_score", 0) or 0) + _route_recall_weight(route, normalized), + 1, + ) + + existing = merged.get(ts_code) + if not existing: + merged[ts_code] = normalized + continue + + existing["recall_tags"] = list({*existing.get("recall_tags", []), *normalized.get("recall_tags", [])}) + existing["recall_score"] = round( + min( + 100, + max(float(existing.get("recall_score", 0) or 0), float(normalized.get("recall_score", 0) or 0)) + + min(float(normalized.get("recall_score", 0) or 0) * 0.2, 10), + ), + 1, + ) + for key, value in normalized.items(): + if key in {"recall_tags", "recall_score"}: + continue + if existing.get(key) in (None, "", 0) and value not in (None, "", 0): + existing[key] = value + if len(existing.get("sector", "")) < len(normalized.get("sector", "")): + existing["sector"] = normalized.get("sector", existing.get("sector", "")) + + +def _route_recall_weight(route: str, item: dict) -> float: + if route == "sector_recall": + return 8 + if route == "trend_scan": + return min(float(item.get("entry_signal_score", 0) or 0) * 0.12, 12) + if route == "intraday_active": + return 12 + return 0 async def _build_recommendations( @@ -327,11 +438,7 @@ async def _build_recommendations( intraday: bool = False, strategy_profile=None, ) -> list[Recommendation]: - """Step 3: 对候选做供需 + 价格行为 + 趋势深度分析 - - 评分公式:供需关系 40% + 价格行为 35% + 趋势 25% - 板块和资金流已在前置过滤中处理。 - """ + """Step 3: 规则边界建模 + LLM 两阶段裁决。""" from app.data.tushare_client import tushare_client from app.analysis.technical import add_all_indicators from app.analysis.breakout_signals import ( @@ -342,6 +449,10 @@ async def _build_recommendations( ) from app.analysis.signals import generate_signals from app.analysis.capital_flow import _score_valuation + from app.llm.batch_screener import ( + analyze_candidates_individually, + prefilter_candidates_individually, + ) # 名称和行业映射 stock_basic = tushare_client.get_stock_basic() @@ -353,7 +464,7 @@ async def _build_recommendations( industry_map[row["ts_code"]] = row.get("industry", "") recommendations = [] - llm_candidates = [] # 收集候选摘要供 LLM 分析 + llm_candidates = [] total = len(candidates) signal_counts = {"breakout": 0, "breakout_confirm": 0, "pullback": 0, "launch": 0, "reversal": 0, "none": 0} score_weights = strategy_profile.score_weights if strategy_profile else { @@ -394,42 +505,31 @@ async def _build_recommendations( signal_type = entry_signal["signal_type"] if signal_type == EntrySignal.NONE: signal_counts["none"] += 1 - continue - if signal_priority and signal_type.value not in signal_priority[:4]: - signal_counts["none"] += 1 - continue - signal_counts[signal_type.value] += 1 + signal_name = "none" + else: + signal_name = signal_type.value + signal_counts[signal_name] += 1 # ── 三维度评分 ── - - # 1. 供需关系评分 (50%) — 短线核心 supply_demand_score = score_supply_demand(df) - - # 2. 价格行为评分 (40%) — 形态质量 price_action_score = _score_price_action(df, entry_signal) - - # 3. 趋势评分 (10%) — 短线趋势权重低,偏空直接过滤 trend_score = _score_trend(df) - # 趋势偏空门槛过滤:MA5 20: @@ -448,22 +548,20 @@ async def _build_recommendations( elif market_temp_score < 50: penalties.append(0.88) - # 取最大惩罚(1.0 = 无惩罚) if penalties: final_score *= min(penalties) - # 奖励可叠加(奖励之间互不矛盾) sector_limit_up = _get_sector_limit_up(sector, hot_sectors) - sector_member_count = _get_sector_member_count(sector, hot_sectors) if sector_limit_up >= 5: - final_score *= 1.20 # 板块5+涨停,情绪极强 + final_score *= 1.20 elif sector_limit_up >= 3: - final_score *= 1.10 # 板块3涨停,情绪较强 + final_score *= 1.10 if entry_signal.get("signal_score", 0) >= 80: final_score *= 1.10 - if signal_priority: + signal_matches_profile = bool(signal_priority and signal_name in signal_priority[:4]) + if signal_type != EntrySignal.NONE and signal_priority: priority_rank = signal_priority.index(signal_type.value) if priority_rank == 0: final_score *= 1.08 @@ -472,22 +570,21 @@ async def _build_recommendations( elif priority_rank >= 3: final_score *= 0.94 - # 估值评分(辅助参考,不参与主评分) pe = stock.get("pe") pb = stock.get("pb") valuation_score = _score_valuation(pe, pb) - # 确定信号和等级 level = _score_to_level(final_score) signal = "HOLD" position_score = tech_signal.position_score if tech_signal else 50 - if (signal_type != EntrySignal.NONE - and entry_signal.get("signal_score", 0) >= 50 - and position_score >= 30 - and final_score >= buy_threshold): + if ( + signal_type != EntrySignal.NONE + and entry_signal.get("signal_score", 0) >= 50 + and position_score >= 30 + and final_score >= buy_threshold + ): signal = "BUY" - # 价格参考 — 结构化止损止盈(基于市场结构而非固定百分比) entry_price = None target_price = None stop_loss = None @@ -496,21 +593,16 @@ async def _build_recommendations( st = signal_type.value details = entry_signal.get("details", {}) - # ── 入场价:统一为当前价(短线看盘即时进场) ── entry_price = round(current_close, 2) - # ── 止损价:基于市场结构 ── if st == "breakout": - # 突破型:止损在突破点(被突破的阻力位)下方1% resistance = details.get("resistance_price", 0) if resistance and resistance > 0: stop_loss = round(resistance * 0.99, 2) else: - # fallback: 近20日低点下方1% low_20 = float(df.tail(20)["low"].min()) stop_loss = round(low_20 * 0.99, 2) elif st == "pullback": - # 回踩型:止损在支撑均线下方1.5% support_ma = details.get("support_ma", "MA20") support_price = 0 if support_ma == "MA20" and not pd.isna(last.get("ma20")): @@ -522,71 +614,51 @@ async def _build_recommendations( else: stop_loss = round(current_close * 0.97, 2) elif st == "reversal": - # 反转型:止损在近5日最低点下方1% low_5 = float(df.tail(5)["low"].min()) stop_loss = round(low_5 * 0.99, 2) elif st == "launch": - # 启动型:止损在MA20下方2% if not pd.isna(last.get("ma20")) and last["ma20"] > 0: stop_loss = round(last["ma20"] * 0.98, 2) else: stop_loss = round(current_close * 0.97, 2) else: - # breakout_confirm / 其他:近20日低点下方1% low_20 = float(df.tail(20)["low"].min()) stop_loss = round(min(low_20 * 0.99, current_close * 0.97), 2) - # ── 止盈价:基于下一个阻力位 ── - # 近20日高点作为第一阻力 high_20 = float(df.tail(20)["high"].max()) - # 近60日高点作为第二阻力 high_60 = float(df.tail(60)["high"].max()) if len(df) >= 60 else high_20 if st == "breakout": - # 突破型:刚突破20日高点,目标看60日高点附近 if high_60 > current_close: target_price = round(min(high_60 * 0.98, entry_price * 1.08), 2) else: target_price = round(entry_price * 1.05, 2) elif st == "launch": - # 启动型:整理后启动,目标看整理区间上方+8% target_price = round(min(high_20 * 1.03, entry_price * 1.08), 2) elif st == "reversal": - # 反转型:从低位反转,目标看近20日高点 target_price = round(min(high_20 * 0.98, entry_price * 1.08), 2) elif st == "pullback": - # 回踩型:目标看前高附近 target_price = round(min(high_20 * 0.98, entry_price * 1.05), 2) else: - # breakout_confirm / 其他 target_price = round(min(high_20 * 0.98, entry_price * 1.05), 2) - # 保底:止损不超过入场价-8%(防止结构化止损太远) max_stop_pct = 0.08 if stop_loss < entry_price * (1 - max_stop_pct): stop_loss = round(entry_price * (1 - max_stop_pct), 2) - # 止损不低于入场价-2%(止损太近没有意义) min_stop_pct = 0.02 if stop_loss > entry_price * (1 - min_stop_pct): stop_loss = round(entry_price * (1 - min_stop_pct), 2) - - # 保底:止盈不低于入场价+3%(空间太小不值得做) min_target_pct = 0.03 if target_price < entry_price * (1 + min_target_pct): target_price = round(entry_price * (1 + min_target_pct), 2) - # 生成推荐理由 reasons = _generate_reasons(stock, entry_signal, tech_signal, df, intraday) - stock["entry_signal_type"] = signal_type.value + stock["entry_signal_type"] = signal_name risk_note = _generate_risk_note(market_temp, tech_signal, stock) - - # 量价模式 vol_pattern = analyze_volume_pattern(df) - - # 进场时机建议(盘中适用) - entry_timing = _generate_entry_timing(signal_type.value, intraday) + entry_timing = _generate_entry_timing(signal_name, intraday) trade_plan = _build_trade_plan( - signal_type=signal_type.value, + signal_type=signal_name, score=final_score, market_temp=market_temp, sector_stage=sector_stage, @@ -618,7 +690,7 @@ async def _build_recommendations( risk_note=risk_note, level=level, strategy=strategy_profile.strategy_id if strategy_profile else "trend_breakout", - entry_signal_type=signal_type.value, + entry_signal_type=signal_name, entry_timing=entry_timing, action_plan=trade_plan["action_plan"], trigger_condition=trade_plan["trigger_condition"], @@ -627,6 +699,10 @@ async def _build_recommendations( review_after_days=trade_plan["review_after_days"], lifecycle_status=trade_plan["lifecycle_status"], data_freshness=trade_plan["data_freshness"], + recall_tags=stock.get("recall_tags", []), + prefilter_decision="", + prefilter_reason="", + focus_points=[], ) recommendations.append(rec) @@ -643,9 +719,16 @@ async def _build_recommendations( f"主力净流入{stock.get('main_net_inflow', 0):.0f}万, " f"占比{stock.get('inflow_ratio', 0):.1f}%" ), + "recall_tags": stock.get("recall_tags", []), + "sector_stage": sector_stage, + "stock_role_hint": stock.get("stock_role_hint", "待判断"), + "entry_signal_type": signal_name, + "entry_signal_score": round(entry_signal.get("signal_score", 0), 1), + "signal_matches_profile": signal_matches_profile, + "risk_tags": _build_risk_tags(market_temp, tech_signal, sector_stage, trend_penalty), + "focus_points": _build_focus_points(stock, entry_signal, tech_signal, vol_pattern, sector_stage), } - # 盘中模式:补充分时量能分布数据 if intraday: try: from app.data.eastmoney_client import get_min_kline, analyze_intraday_volume_distribution @@ -670,9 +753,6 @@ async def _build_recommendations( logger.debug(f"深度分析 {ts_code} 失败: {e}") continue - # 让出控制权(同步函数中无法 await,跳过) - # idx % 10 == 0 的让步在 _select_from_hot_sectors 的上层 async 函数中处理 - logger.info( f"Step 3 入场信号分布: " f"突破={signal_counts['breakout']} 确认={signal_counts['breakout_confirm']} " @@ -681,24 +761,55 @@ async def _build_recommendations( f"(共分析{total}只)" ) - # ── LLM 逐股深度分析 ── + recommendations.sort(key=lambda rec: rec.score, reverse=True) + if settings.deepseek_api_key and llm_candidates: try: - from app.llm.batch_screener import analyze_candidates_individually - - # 只对量化评分 Top N 做LLM分析,减少API调用 - llm_candidates.sort(key=lambda c: c["quant_score"], reverse=True) - llm_top = llm_candidates[:settings.top_stock_count] - market_summary = ( f"市场温度: {market_temp.temperature}/100, " f"涨跌比: {market_temp.up_count}涨/{market_temp.down_count}跌, " f"涨停: {market_temp.limit_up_count}家" ) + + llm_candidates.sort(key=lambda c: c["quant_score"], reverse=True) + prefilter_pool = llm_candidates[: settings.llm_prefilter_limit] + prefilter_results = await prefilter_candidates_individually( + prefilter_pool, + market_summary, + max_concurrent=settings.llm_prefilter_max_concurrent, + ) + + prioritized = [] + for item in prefilter_pool: + pre = prefilter_results.get(item["ts_code"], {}) + item["prefilter_decision"] = pre.get("decision", "watch") + item["prefilter_confidence"] = pre.get("confidence", 5) + item["prefilter_reason"] = pre.get("reason", "") + item["prefilter_focus_points"] = pre.get("focus_points", []) + if item["prefilter_decision"] == "priority": + rank_bonus = 16 + elif item["prefilter_decision"] == "watch": + rank_bonus = 6 + else: + rank_bonus = -12 + item["deep_rank"] = round(item["quant_score"] + rank_bonus + item["prefilter_confidence"] * 1.5, 1) + if item["prefilter_decision"] != "ignore": + prioritized.append(item) + + if not prioritized: + prioritized = prefilter_pool[: min(8, len(prefilter_pool))] + + prioritized.sort(key=lambda c: c.get("deep_rank", c["quant_score"]), reverse=True) + llm_top = prioritized[: settings.llm_final_limit] llm_results = await analyze_candidates_individually(llm_top, market_summary) - # 综合规则边界 + LLM 最终裁决 for rec in recommendations: + pre_item = next((item for item in prefilter_pool if item["ts_code"] == rec.ts_code), None) + if pre_item: + rec.prefilter_decision = pre_item.get("prefilter_decision", "") + rec.prefilter_reason = pre_item.get("prefilter_reason", "") + rec.focus_points = pre_item.get("prefilter_focus_points", []) + llm_data = llm_results.get(rec.ts_code) if llm_data: rec.llm_analysis = llm_data.get("analysis", "") @@ -758,18 +869,22 @@ async def _build_recommendations( if llm_data.get("stop_loss"): rec.stop_loss = llm_data["stop_loss"] - # LLM 明确 skip 的标的,从推荐前列剔除 recommendations = [ rec for rec in recommendations - if not (rec.llm_score is not None and rec.llm_score <= 4 and rec.action_plan == "观察" and rec.score < strategy_profile.min_score) + if not ( + rec.llm_score is not None + and rec.llm_score <= 4 + and rec.action_plan == "观察" + and rec.score < max(strategy_profile.min_score - 6, 54) + ) ] recommendations.sort(key=lambda r: r.score, reverse=True) recommendations = recommendations[:settings.top_stock_count] - logger.info(f"LLM 逐股分析完成, 综合评分后保留 {len(recommendations)} 只") + logger.info(f"LLM 两阶段分析完成, 综合评分后保留 {len(recommendations)} 只") except Exception as e: - logger.error(f"LLM 逐股分析失败, 仅使用量化评分: {e}") + logger.error(f"LLM 两阶段分析失败, 仅使用规则边界: {e}") from app.db.error_logger import log_error - await log_error("screener", f"LLM 逐股分析失败, 仅使用量化评分: {e}", detail=traceback.format_exc()) + await log_error("screener", f"LLM 两阶段分析失败, 仅使用规则边界: {e}", detail=traceback.format_exc()) return recommendations @@ -1238,6 +1353,49 @@ def _generate_risk_note( return ";".join(notes) +def _build_risk_tags( + market: MarketTemperature, + tech: TechnicalSignal | None, + sector_stage: str, + trend_penalty: float, +) -> list[str]: + tags: list[str] = [] + if market.temperature < 45: + tags.append("market_weak") + if sector_stage in ("late", "end"): + tags.append(f"sector_{sector_stage}") + if trend_penalty < 0.9: + tags.append("trend_under_pressure") + if tech: + if tech.position_score < 35: + tags.append("position_high") + if tech.rally_pct_10d > 20: + tags.append("short_term_overheat") + return tags + + +def _build_focus_points( + stock: dict, + entry_signal: dict, + tech: TechnicalSignal | None, + vol_pattern: dict, + sector_stage: str, +) -> list[str]: + points: list[str] = [] + signal_type = entry_signal.get("signal_type") + if signal_type and getattr(signal_type, "value", "none") != "none": + points.append(f"确认{signal_type.value}信号是否延续") + if stock.get("main_net_inflow", 0) > 0: + points.append("观察主力流入是否继续放大") + if vol_pattern.get("volume_trend"): + points.append(f"量能状态: {vol_pattern['volume_trend']}") + if tech and tech.support_price: + points.append(f"关键支撑 {tech.support_price}") + if sector_stage in ("late", "end"): + points.append("板块已偏后段,注意是否还有前排承接") + return points[:4] + + def _summarize_for_llm(df, entry_signal: dict, tech_signal: TechnicalSignal | None) -> str: """生成 K 线分析结论供 LLM 判断(输出结论而非原始数据)""" import pandas as pd diff --git a/backend/app/llm/__pycache__/prompts.cpython-313.pyc b/backend/app/llm/__pycache__/prompts.cpython-313.pyc index 8d726eca..913e5a56 100644 Binary files a/backend/app/llm/__pycache__/prompts.cpython-313.pyc and b/backend/app/llm/__pycache__/prompts.cpython-313.pyc differ diff --git a/backend/app/llm/batch_screener.py b/backend/app/llm/batch_screener.py index e9363d64..b7aeac70 100644 --- a/backend/app/llm/batch_screener.py +++ b/backend/app/llm/batch_screener.py @@ -1,7 +1,7 @@ -"""LLM 逐股深度分析 +"""LLM 候选预筛 + 逐股深度分析 -量化筛选完成后,对每只候选股票单独调用 LLM 做深度分析, -让 AI 独立判断入场时机并给出具体买卖价格。 +先做轻量预筛,控制深度裁决成本; +再对重点股票单独调用 LLM 做深度分析。 """ import asyncio @@ -14,6 +14,62 @@ from app.config import settings logger = logging.getLogger(__name__) +async def prefilter_single_stock(candidate: dict, market_summary: str) -> dict: + """对单只候选股票做轻量 LLM 预筛。""" + from app.llm.prompts import STOCK_PREFILTER_PROMPT + from app.llm.client import get_client + + stock_text = f"""\ +股票: {candidate['name']}({candidate['ts_code']}) +板块: {candidate.get('sector', '未知')} +召回来源: {', '.join(candidate.get('recall_tags', []) or ['未标注'])} +规则参考分: {candidate.get('quant_score', 0)}/100 +位置安全: {candidate.get('position_score', 50)}/100 +当前价: {candidate.get('current_price', '未知')} +板块阶段: {candidate.get('sector_stage', '未知')} +个股角色线索: {candidate.get('stock_role_hint', '待判断')}""" + + if candidate.get("kline_summary"): + stock_text += f"\n\n## 技术结构摘要\n{candidate['kline_summary']}" + + if candidate.get("capital_flow_summary"): + stock_text += f"\n\n## 资金与活跃度摘要\n{candidate['capital_flow_summary']}" + + if candidate.get("intraday_volume"): + stock_text += f"\n\n## 分时量能摘要\n{candidate['intraday_volume']}" + + user_msg = f"{STOCK_PREFILTER_PROMPT}\n\n## 市场环境\n{market_summary}\n\n{stock_text}\n\n请输出 JSON。" + + try: + client = get_client() + response = await client.chat.completions.create( + model=settings.deepseek_model, + messages=[ + { + "role": "system", + "content": ( + "你是A股候选池预审官。" + "你只负责决定资源分配优先级,不直接下最终交易结论。" + "必须返回合法JSON。" + ), + }, + {"role": "user", "content": user_msg}, + ], + max_tokens=400, + temperature=0.2, + ) + content = response.choices[0].message.content.strip() + return _parse_prefilter_response(content) + except Exception as e: + logger.error(f"LLM 预筛 {candidate.get('ts_code')} 失败: {e}") + return { + "decision": "watch", + "confidence": 5, + "reason": "AI 预筛暂不可用,保留观察", + "focus_points": [], + } + + async def analyze_single_stock(candidate: dict, market_summary: str) -> dict: """对单只股票做 LLM 深度分析 @@ -95,6 +151,32 @@ async def analyze_single_stock(candidate: dict, market_summary: str) -> dict: } +def _parse_prefilter_response(text: str) -> dict: + data = _extract_json_object(text) + if not data: + return { + "decision": "watch", + "confidence": 5, + "reason": "预筛输出不可解析,默认保留观察", + "focus_points": [], + } + + decision = str(data.get("decision", "watch")).strip().lower() + if decision not in {"priority", "watch", "ignore"}: + decision = "watch" + + focus_points = data.get("focus_points") or [] + if not isinstance(focus_points, list): + focus_points = [] + + return { + "decision": decision, + "confidence": _clamp_int(data.get("confidence"), minimum=1, maximum=10, default=5), + "reason": str(data.get("reason", "")).strip() or "暂无说明", + "focus_points": [str(item).strip() for item in focus_points[:3] if str(item).strip()], + } + + def _parse_single_response(text: str) -> dict: """解析单只股票的 LLM 返回""" data = _extract_json_object(text) @@ -238,3 +320,38 @@ async def analyze_candidates_individually( logger.info(f"LLM 逐股分析完成: {len(results)}/{len(candidates)} 只") return results + + +async def prefilter_candidates_individually( + candidates: list[dict], market_summary: str, max_concurrent: int = 6 +) -> dict[str, dict]: + """对候选股票逐个做 LLM 预筛。""" + if not settings.deepseek_api_key or not candidates: + return {} + + results = {} + semaphore = asyncio.Semaphore(max_concurrent) + + async def _prefilter_with_semaphore(c: dict): + async with semaphore: + ts_code = c["ts_code"] + logger.info(f"LLM 预筛: {c.get('name', ts_code)}") + result = await prefilter_single_stock(c, market_summary) + logger.info( + f"LLM 预筛结果: {c.get('name', ts_code)} → " + f"decision={result['decision']} confidence={result['confidence']}" + ) + return ts_code, result + + tasks = [_prefilter_with_semaphore(c) for c in candidates] + completed = await asyncio.gather(*tasks, return_exceptions=True) + + for item in completed: + if isinstance(item, Exception): + logger.error(f"LLM 预筛任务异常: {item}") + continue + ts_code, result = item + results[ts_code] = result + + logger.info(f"LLM 预筛完成: {len(results)}/{len(candidates)} 只") + return results diff --git a/backend/app/llm/prompts.py b/backend/app/llm/prompts.py index 700934d4..b448e3c9 100644 --- a/backend/app/llm/prompts.py +++ b/backend/app/llm/prompts.py @@ -156,3 +156,31 @@ SINGLE_STOCK_ANALYSIS_PROMPT = """\ - position_pct 返回 0-35 的整数;如果不适合参与,就返回 0 - 没有把握时优先给 watch 或 skip - trigger_condition 和 invalidation_condition 必须可执行,不能写空话""" + + +STOCK_PREFILTER_PROMPT = """\ +你是A股候选池的预审官,目标是在不漏掉潜在机会的前提下,先把候选股票分成“优先深看 / 保留观察 / 可忽略”三类。 + +你的原则: +1. 这一步不是最终买卖结论,只做资源分配 +2. 不能因为某一个规则分数低就直接忽略,要看题材位置、角色、量价异常、时机感 +3. 可以容忍不标准的形态,但不能容忍明显失真、明显追高、明显没有交易边界的票 +4. 输出必须是 JSON,不要输出 Markdown + +字段格式: +{ + "decision": "priority | watch | ignore", + "confidence": 1-10, + "reason": "一句话说明为什么这样分层", + "focus_points": ["最多三条,说明后续该重点看什么"] +} + +分层标准: +- priority: 值得进入深度裁决池,今天存在较强观察价值或操作潜力 +- watch: 逻辑未坏,但暂时不应占用深度裁决名额 +- ignore: 当前信号弱、位置差、边界不清或交易价值很低 + +补充要求: +- confidence 必须是 1-10 整数 +- focus_points 最多三条,尽量具体 +- 如果拿不准,优先给 watch,不要滥给 ignore""" diff --git a/backend/astock.db b/backend/astock.db index ccf34c11..6c8e9bca 100644 Binary files a/backend/astock.db and b/backend/astock.db differ diff --git a/frontend/src/app/(auth)/recommendations/page.tsx b/frontend/src/app/(auth)/recommendations/page.tsx index af10af7d..e9c258d0 100644 --- a/frontend/src/app/(auth)/recommendations/page.tsx +++ b/frontend/src/app/(auth)/recommendations/page.tsx @@ -160,21 +160,26 @@ export default function RecommendationsPage() {
Recommendation Logic
-
+
+ + -
@@ -257,6 +262,30 @@ export default function RecommendationsPage() { ))}
+ {((performance.route_breakdown?.length ?? 0) > 0 || (performance.prefilter_breakdown?.length ?? 0) > 0) && ( +
+ {(performance.route_breakdown?.length ?? 0) > 0 ? ( + ({ + label: formatRouteLabel(item.route), + value: `${item.count}只`, + detail: `胜率 ${item.win_rate.toFixed(1)}% · 平均 ${item.avg_return > 0 ? "+" : ""}${item.avg_return.toFixed(2)}%`, + }))} + /> + ) : null} + {(performance.prefilter_breakdown?.length ?? 0) > 0 ? ( + ({ + label: formatPrefilterLabel(item.decision), + value: `${item.count}只`, + detail: `胜率 ${item.win_rate.toFixed(1)}% · 平均 ${item.avg_return > 0 ? "+" : ""}${item.avg_return.toFixed(2)}%`, + }))} + /> + ) : null} +
+ )} )} @@ -404,6 +433,54 @@ export default function RecommendationsPage() { ); } +function CompactInsightCard({ + title, + items, +}: { + title: string; + items: Array<{ label: string; value: string; detail: string }>; +}) { + return ( +
+
{title}
+
+ {items.map((item) => ( +
+
+
{item.label}
+
{item.detail}
+
+
{item.value}
+
+ ))} +
+
+ ); +} + +function formatRouteLabel(route: string): string { + const labels: Record = { + sector_recall: "主线召回", + trend_scan: "趋势召回", + intraday_active: "盘中异动", + hot_sector_core: "板块核心", + sector_leader: "前排线索", + moneyflow_support: "资金支撑", + volume_active: "量能活跃", + }; + return labels[route] ?? route; +} + +function formatPrefilterLabel(decision: string): string { + const labels: Record = { + priority: "AI优先深看", + watch: "AI保留观察", + ignore: "AI建议忽略", + unknown: "未记录", + }; + return labels[decision] ?? decision; +} + function FunnelWorkspace({ groups, activeKey, diff --git a/frontend/src/app/(auth)/stock/[code]/page.tsx b/frontend/src/app/(auth)/stock/[code]/page.tsx index 60f6fbf6..b5a3ad90 100644 --- a/frontend/src/app/(auth)/stock/[code]/page.tsx +++ b/frontend/src/app/(auth)/stock/[code]/page.tsx @@ -219,9 +219,18 @@ export default function StockDetailPage() {
- +
+ {(recommendation?.recall_tags?.length ?? 0) > 0 ? ( +
+ {(recommendation?.recall_tags ?? []).slice(0, 4).map((tag) => ( + + {formatRecallTag(tag)} + + ))} +
+ ) : null}
推荐 {formatDateTime(thesis?.data_freshness.recommendation_created_at)} 跟踪 {thesis?.data_freshness.tracking_date || "暂无"} @@ -303,17 +312,35 @@ function PlanCard({ ) : null}
+ {recommendation?.prefilter_reason ? : null} + {(recommendation?.focus_points?.length ?? 0) > 0 ? ( + + ) : null} - {recommendation ? : null} + {recommendation ? : null} + {recommendation ? : null} {trackingNote ? : null}
); } +function formatRecallTag(tag: string): string { + const labels: Record = { + sector_recall: "主线召回", + trend_scan: "趋势召回", + intraday_active: "盘中异动", + hot_sector_core: "板块核心", + sector_leader: "前排线索", + moneyflow_support: "资金支撑", + volume_active: "量能活跃", + }; + return labels[tag] ?? tag; +} + function EvidenceCard({ recommendation, quote, diff --git a/frontend/src/components/stock-card.tsx b/frontend/src/components/stock-card.tsx index fbdbcc4b..5256bb2f 100644 --- a/frontend/src/components/stock-card.tsx +++ b/frontend/src/components/stock-card.tsx @@ -6,6 +6,21 @@ import type { RecommendationData } from "@/lib/api"; export default function StockCard({ rec }: { rec: RecommendationData }) { const badge = getLevelBadge(rec.level); const aiConviction = rec.llm_score != null ? Math.round(rec.llm_score) : null; + const recallLabels: Record = { + sector_recall: "主线召回", + trend_scan: "趋势召回", + intraday_active: "盘中异动", + hot_sector_core: "板块核心", + sector_leader: "前排线索", + moneyflow_support: "资金支撑", + volume_active: "量能活跃", + }; + const prefilterLabel: Record = { + priority: "AI优先深看", + watch: "AI保留观察", + ignore: "AI建议忽略", + "": "待AI预筛", + }; // 入场信号标签 const signalTypeMap: Record = { @@ -40,9 +55,13 @@ export default function StockCard({ rec }: { rec: RecommendationData }) { "重点关注": "等待确认,不提前交易", "观察": "只记录,不主动出手", }; - const evidence = [rec.reasons?.[0], rec.entry_timing, rec.data_freshness] - .filter(Boolean) - .slice(0, 2) as string[]; + const evidence = [ + rec.prefilter_reason, + rec.focus_points?.[0], + rec.reasons?.[0], + rec.entry_timing, + rec.data_freshness, + ].filter(Boolean).slice(0, 3) as string[]; return (
@@ -131,7 +150,7 @@ export default function StockCard({ rec }: { rec: RecommendationData }) { {evidence.length > 0 && (
-
核心证据
+
AI 关注点
{evidence.map((item, index) => (
@@ -141,10 +160,28 @@ export default function StockCard({ rec }: { rec: RecommendationData }) { ))}
- 规则供需 {Math.round(rec.supply_demand_score ?? 0)} - 规则形态 {Math.round(rec.price_action_score ?? 0)} - 规则趋势 {Math.round(rec.technical_score ?? 0)} - 规则位置 {Math.round(rec.position_score ?? 50)} + {(rec.recall_tags ?? []).slice(0, 3).map((tag) => ( + + {recallLabels[tag] ?? tag} + + ))} + + {prefilterLabel[rec.prefilter_decision ?? ""] ?? "AI预筛"} + +
+
+ )} + + {(rec.focus_points?.length ?? 0) > 0 && ( +
+
深裁决前重点观察
+
+ {(rec.focus_points ?? []).slice(0, 3).map((item, index) => ( +
+ + {item} +
+ ))}
)} @@ -212,7 +249,7 @@ export default function StockCard({ rec }: { rec: RecommendationData }) {
- 详细推演在详情页归档 + 召回、预筛与推演链路已归档 {aiConviction != null && ( AI {aiConviction}/10 diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 2c88df1f..671ffb1d 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -138,6 +138,10 @@ export interface RecommendationData { entry_signal_type?: "breakout" | "pullback" | "launch" | "none"; llm_analysis?: string; llm_score?: number | null; + recall_tags?: string[]; + prefilter_decision?: "priority" | "watch" | "ignore" | ""; + prefilter_reason?: string; + focus_points?: string[]; scan_session: string; created_at: string | null; entry_timing?: string; @@ -231,6 +235,8 @@ export interface PerformanceStats { hit_target_count: number; hit_stop_count: number; lifecycle_counts: Record; + route_breakdown?: Array<{ route: string; count: number; win_rate: number; avg_return: number }>; + prefilter_breakdown?: Array<{ decision: string; count: number; win_rate: number; avg_return: number }>; details: TrackedRecommendation[]; } @@ -246,6 +252,8 @@ export interface TrackedRecommendation { status: string; action_plan?: string; lifecycle_status?: string; + recall_tags?: string[]; + prefilter_decision?: string; max_return_pct?: number; max_drawdown_pct?: number; days_since_recommendation?: number;