From d6bae2c8b6dcbb2fbd030dc06f8678ae301e680e Mon Sep 17 00:00:00 2001 From: aaron <> Date: Thu, 14 May 2026 11:10:17 +0800 Subject: [PATCH] update --- backend/app/analysis/sector_realtime.py | 63 +- backend/app/analysis/theme_mapper.py | 8 + backend/app/api/catalysts.py | 26 + backend/app/api/market.py | 80 +-- backend/app/api/recommendations.py | 18 +- backend/app/api/sectors.py | 36 +- backend/app/api/stocks.py | 10 + backend/app/catalyst/__init__.py | 5 + backend/app/catalyst/mapper.py | 262 ++++++++ backend/app/catalyst/models.py | 44 ++ backend/app/catalyst/service.py | 120 ++++ backend/app/data/eastmoney_client.py | 128 +++- backend/app/data/market_breadth_client.py | 15 + backend/app/data/models.py | 6 + backend/app/db/database.py | 2 + backend/app/db/tables.py | 30 + backend/app/engine/recommender.py | 52 ++ backend/app/engine/screener.py | 606 +++++++++++++++++- backend/app/llm/strategy_board.py | 8 - backend/app/llm/strategy_selector.py | 8 +- backend/app/main.py | 3 +- frontend/src/app/(auth)/chat/page.tsx | 14 +- frontend/src/app/(auth)/dashboard/page.tsx | 380 +++++------ frontend/src/app/(auth)/diagnose/page.tsx | 20 +- frontend/src/app/(auth)/layout.tsx | 4 +- .../src/app/(auth)/recommendations/page.tsx | 113 ++-- frontend/src/app/(auth)/sectors/page.tsx | 316 +++------ frontend/src/app/(auth)/stock/[code]/page.tsx | 49 +- frontend/src/app/(auth)/strategy/page.tsx | 13 +- frontend/src/app/(public)/login/page.tsx | 6 +- frontend/src/app/(public)/page.tsx | 10 +- frontend/src/app/layout.tsx | 6 +- frontend/src/components/market-temp.tsx | 3 +- frontend/src/components/nav.tsx | 8 +- frontend/src/components/sector-heatmap.tsx | 12 +- frontend/src/components/stock-card.tsx | 397 +++--------- frontend/src/lib/api.ts | 34 + 37 files changed, 1866 insertions(+), 1049 deletions(-) create mode 100644 backend/app/api/catalysts.py create mode 100644 backend/app/catalyst/__init__.py create mode 100644 backend/app/catalyst/mapper.py create mode 100644 backend/app/catalyst/models.py create mode 100644 backend/app/catalyst/service.py diff --git a/backend/app/analysis/sector_realtime.py b/backend/app/analysis/sector_realtime.py index d4a5bf29..2674cb01 100644 --- a/backend/app/analysis/sector_realtime.py +++ b/backend/app/analysis/sector_realtime.py @@ -5,14 +5,17 @@ """ import logging +import asyncio from app.config import should_prefer_realtime_today, today_trade_date +from app.data.cache import cache from app.data.eastmoney_client import get_sector_realtime_ranking from app.data.models import SectorInfo from app.data.tushare_client import tushare_client from app.analysis.theme_mapper import merge_sectors_to_themes logger = logging.getLogger(__name__) +_today_sector_board_tasks: dict[str, asyncio.Task] = {} def _match_sector_name(em_name: str, ts_name: str) -> bool: @@ -37,11 +40,14 @@ def _apply_empty_overlay(sector: SectorInfo) -> SectorInfo: sector.data_mode = "daily_snapshot" sector.source = sector.source or "snapshot" sector.board_type = sector.board_type or "snapshot" + sector.data_status = "snapshot" + sector.source_detail = "daily_snapshot" return sector def _sector_from_eastmoney(item: dict) -> SectorInfo: """把东方财富板块榜转换成今日展示用 SectorInfo。""" + source = item.get("source", "eastmoney") sector = SectorInfo( sector_code=item.get("sector_code", ""), sector_name=item.get("sector_name", ""), @@ -62,7 +68,9 @@ def _sector_from_eastmoney(item: dict) -> SectorInfo: realtime_down_count=int(item.get("down_count", 0) or 0), is_realtime=True, data_mode="realtime_today", - source=item.get("source", "eastmoney"), + source=source, + data_status=item.get("data_status", "fresh" if source == "eastmoney" else "fallback"), + source_detail=item.get("source_detail", "eastmoney_push2" if source == "eastmoney" else source), ) if item.get("leading_stock_name"): sector.leading_stocks_realtime = [{ @@ -77,16 +85,38 @@ def _sector_from_eastmoney(item: dict) -> SectorInfo: async def get_today_realtime_sector_board(limit: int = 20) -> list[SectorInfo]: """用实时行业榜 + 概念榜生成展示列表,作为 Tushare/定时扫描滞后的兜底。""" - industry_sectors = await get_sector_realtime_ranking(fs="m:90+t:2", page_size=max(limit, 20), notify=False) - concept_sectors = await get_sector_realtime_ranking(fs="m:90+t:3", page_size=max(limit, 20), notify=False) + cache_key = f"today_sector_board:{today_trade_date()}:{limit}" + cached = cache.get(cache_key) + if cached is not None: + return cached + + task = _today_sector_board_tasks.get(cache_key) + if task and not task.done(): + return await task + + task = asyncio.create_task(_load_today_realtime_sector_board(limit)) + _today_sector_board_tasks[cache_key] = task + try: + result = await task + cache.set(cache_key, result, ttl=60) + return result + finally: + if task.done(): + _today_sector_board_tasks.pop(cache_key, None) + + +async def _load_today_realtime_sector_board(limit: int) -> list[SectorInfo]: + industry_sectors, concept_sectors = await _load_eastmoney_sector_boards(limit=max(limit, 20)) em_sectors = industry_sectors + concept_sectors - if not em_sectors: + if not industry_sectors: try: from app.data.sina_client import get_sector_realtime_ranking_by_industry - em_sectors = await get_sector_realtime_ranking_by_industry(limit=max(limit, 20)) + sina_sectors = await get_sector_realtime_ranking_by_industry(limit=max(limit, 20)) + em_sectors.extend(sina_sectors) except Exception as e: logger.warning("新浪行业实时榜兜底失败: %s", e) - em_sectors = [] + if not em_sectors: + em_sectors = [] deduped = {} for item in em_sectors: @@ -100,6 +130,25 @@ async def get_today_realtime_sector_board(limit: int = 20) -> list[SectorInfo]: return sectors[:limit] +async def _load_eastmoney_sector_boards(limit: int) -> tuple[list[dict], list[dict]]: + """并行拉取行业/概念榜,允许部分成功。""" + import asyncio + + results = await asyncio.gather( + get_sector_realtime_ranking(fs="m:90+t:2", page_size=limit, notify=False), + get_sector_realtime_ranking(fs="m:90+t:3", page_size=limit, notify=False), + return_exceptions=True, + ) + boards: list[list[dict]] = [] + for label, result in zip(("行业", "概念"), results): + if isinstance(result, Exception): + logger.warning("东方财富%s实时榜失败: %s", label, result) + boards.append([]) + else: + boards.append(result) + return boards[0], boards[1] + + async def enrich_sectors_with_realtime(sectors: list[SectorInfo]) -> list[SectorInfo]: """按需为板块快照追加实时字段并重排。""" if not sectors: @@ -151,6 +200,8 @@ async def enrich_sectors_with_realtime(sectors: list[SectorInfo]) -> list[Sector sector.is_realtime = True sector.data_mode = "realtime_overlay" sector.source = em_data.get("source", "eastmoney") + sector.data_status = em_data.get("data_status", "fresh") + sector.source_detail = em_data.get("source_detail", sector.source) logger.info("板块实时覆盖: %s/%s 匹配成功", matched, len(sectors)) sectors.sort(key=lambda s: (s.realtime_pct_change if s.realtime_pct_change is not None else s.pct_change), reverse=True) diff --git a/backend/app/analysis/theme_mapper.py b/backend/app/analysis/theme_mapper.py index c56eaa87..1dd5ffbf 100644 --- a/backend/app/analysis/theme_mapper.py +++ b/backend/app/analysis/theme_mapper.py @@ -159,6 +159,14 @@ def merge_sectors_to_themes(sectors: list[SectorInfo], limit: int = 20) -> list[ existing.is_realtime = existing.is_realtime or sector.is_realtime if existing.data_mode == "daily_snapshot" and sector.data_mode != "daily_snapshot": existing.data_mode = sector.data_mode + if existing.data_status != "fresh" and sector.data_status == "fresh": + existing.data_status = "fresh" + existing.source_detail = sector.source_detail + elif existing.data_status == "fresh" and sector.data_status != "fresh": + pass + elif existing.data_status != sector.data_status: + existing.data_status = "mixed" + existing.source_detail = "mixed_status" if existing.source != sector.source: existing.source = "mixed" diff --git a/backend/app/api/catalysts.py b/backend/app/api/catalysts.py new file mode 100644 index 00000000..9d7f51f6 --- /dev/null +++ b/backend/app/api/catalysts.py @@ -0,0 +1,26 @@ +"""新闻/政策催化 API。""" + +from fastapi import APIRouter, Depends + +from app.catalyst.models import CatalystInput +from app.catalyst.service import build_theme_catalyst_scores, get_recent_catalysts, ingest_catalyst +from app.core.deps import get_current_admin + +router = APIRouter(prefix="/api/catalysts", tags=["catalysts"]) + + +@router.get("/recent") +async def recent(limit: int = 30, hours: int = 72): + return await get_recent_catalysts(limit=limit, hours=hours) + + +@router.get("/theme-scores") +async def theme_scores(hours: int = 72, limit: int = 20): + scores = await build_theme_catalyst_scores(hours=hours, limit=limit) + return [item.model_dump() for item in scores] + + +@router.post("/ingest") +async def ingest(item: CatalystInput, _admin: dict = Depends(get_current_admin)): + analysis = await ingest_catalyst(item, use_llm=True) + return analysis.model_dump() diff --git a/backend/app/api/market.py b/backend/app/api/market.py index 632eefa6..4590412a 100644 --- a/backend/app/api/market.py +++ b/backend/app/api/market.py @@ -4,13 +4,9 @@ from datetime import datetime from fastapi import APIRouter, Depends, HTTPException -from app.data.tushare_client import tushare_client -from app.data import tencent_client from app.data.cache import cache -from app.data.market_breadth_client import get_market_breadth -from app.analysis.market_temp import build_realtime_market_temperature, calculate_market_temperature from app.engine.recommender import get_latest_recommendations -from app.config import settings, is_trading_hours, is_market_session, should_prefer_realtime_today, today_trade_date +from app.config import settings, is_trading_hours, should_prefer_realtime_today, today_trade_date from app.core.deps import get_current_admin router = APIRouter(prefix="/api/market", tags=["market"]) @@ -18,17 +14,9 @@ router = APIRouter(prefix="/api/market", tags=["market"]) @router.get("/temperature") async def get_temperature(): - """获取市场温度。 - - 交易日 09:15 后优先做轻量实时计算,不触发完整扫描或 LLM。 - """ + """获取市场温度快照。页面访问只读数据库,不触发外部行情。""" result = await get_latest_recommendations() mt = result.get("market_temp") - realtime_used = False - if should_prefer_realtime_today(mt.trade_date if mt else None): - baseline = mt or calculate_market_temperature() - mt, realtime_used = await build_realtime_market_temperature(baseline) - breadth = await get_market_breadth() if realtime_used else None if mt: return { "trade_date": mt.trade_date, @@ -41,8 +29,8 @@ async def get_temperature(): "broken_rate": mt.broken_rate, "index_above_ma20": getattr(mt, "index_above_ma20", False), "is_trading": is_trading_hours(), - "data_mode": "realtime_today" if realtime_used else "daily_snapshot", - "limit_counts_reliable": breadth.limit_counts_reliable if breadth else False, + "data_mode": "daily_snapshot", + "limit_counts_reliable": False, } return { "trade_date": "", @@ -60,14 +48,12 @@ async def get_temperature(): @router.get("/overview") async def get_overview(): - """市场概况:上证、深证、创业板指数 + """市场概况快照。 - 盘中用腾讯实时行情,盘后用 Tushare 日线(有缓存)。 + 页面访问不拉腾讯/Tushare。当前库里还没有指数快照表,先返回空数组。 + 后续应由扫描任务把指数概览写入本地表后再展示。 """ - latest_trade_date = tushare_client.get_latest_trade_date() - if should_prefer_realtime_today(latest_trade_date): - return await _overview_realtime() - return _overview_daily() + return [] @router.get("/strategy-board") @@ -217,53 +203,3 @@ async def generate_strategy_iteration(limit: int = 50, _admin: dict = Depends(ge cache.delete(f"market:strategy_iteration:{limit}:rules") cache.delete("market:strategy_board:rules") return result - -async def _overview_realtime(): - """盘中:腾讯实时指数行情""" - index_data = await tencent_client.get_index_realtime() - result = [] - name_map = { - "000001.SH": "上证指数", - "399001.SZ": "深证成指", - "399006.SZ": "创业板指", - } - for code in ["000001.SH", "399001.SZ", "399006.SZ"]: - data = index_data.get(code) - if not data: - continue - result.append({ - "name": name_map.get(code, data.get("name", code)), - "code": code, - "close": round(data["price"], 2), - "pct_chg": round(data["pct_chg"], 2), - "volume": round(data["volume"], 2), - "realtime": True, - }) - return result - - -def _overview_daily(): - """盘后:Tushare 日线数据""" - indices = { - "上证指数": "000001.SH", - "深证成指": "399001.SZ", - "创业板指": "399006.SZ", - } - result = [] - for name, code in indices.items(): - df = tushare_client.get_index_daily(code, days=5) - if df.empty: - continue - df = df.sort_values("trade_date") - latest = df.iloc[-1] - prev = df.iloc[-2] if len(df) > 1 else latest - pct = (latest["close"] - prev["close"]) / prev["close"] * 100 - result.append({ - "name": name, - "code": code, - "close": round(float(latest["close"]), 2), - "pct_chg": round(pct, 2), - "volume": round(float(latest["vol"]), 2), - "realtime": False, - }) - return result diff --git a/backend/app/api/recommendations.py b/backend/app/api/recommendations.py index 351ae2ab..e3b90fb8 100644 --- a/backend/app/api/recommendations.py +++ b/backend/app/api/recommendations.py @@ -13,8 +13,7 @@ from app.engine.recommender import ( get_recommendation_history, get_performance_stats, ) -from app.config import is_trading_hours, should_prefer_realtime_today -from app.data.tushare_client import tushare_client +from app.config import is_trading_hours from app.core.deps import get_current_admin logger = logging.getLogger(__name__) @@ -29,13 +28,8 @@ async def get_latest(): anomalies = await get_latest_market_anomalies() mt = result.get("market_temp") - try: - from app.api.market import get_temperature - realtime_temp = await get_temperature() - except Exception: - realtime_temp = None return { - "market_temperature": realtime_temp or ({ + "market_temperature": ({ "trade_date": mt.trade_date if mt else "", "temperature": mt.temperature if mt else 0, "up_count": mt.up_count if mt else 0, @@ -81,6 +75,7 @@ async def get_latest(): "prefilter_decision": r.prefilter_decision, "prefilter_reason": r.prefilter_reason, "focus_points": r.focus_points, + "decision_trace": r.decision_trace, "strategy": r.strategy, "entry_signal_type": r.entry_signal_type, "scan_session": r.scan_session, @@ -163,13 +158,12 @@ async def update_tracking(_admin: dict = Depends(get_current_admin)): @router.get("/status") async def get_scan_status(): - """获取当前扫描状态信息""" - latest_trade_date = tushare_client.get_latest_trade_date() - prefer_realtime = should_prefer_realtime_today(latest_trade_date) + """获取当前扫描状态信息。只根据本地时间判断,不访问外部数据源。""" + prefer_realtime = is_trading_hours() return { "is_trading": is_trading_hours(), "scan_mode": "realtime_today" if prefer_realtime else "post_market", - "description": "今日实时分析优先" if prefer_realtime else "盘后分析(Tushare日级数据)", + "description": "交易时段,扫描任务会使用实时源" if prefer_realtime else "非交易时段,展示最近扫描结论", } diff --git a/backend/app/api/sectors.py b/backend/app/api/sectors.py index 344a2598..e1f4e4eb 100644 --- a/backend/app/api/sectors.py +++ b/backend/app/api/sectors.py @@ -2,8 +2,6 @@ from fastapi import APIRouter -from app.analysis.sector_realtime import enrich_sectors_with_realtime, get_today_realtime_sector_board -from app.config import should_prefer_realtime_today, today_trade_date from app.data.tushare_client import tushare_client from app.data.cache import cache from app.engine.recommender import get_latest_sectors @@ -13,17 +11,11 @@ router = APIRouter(prefix="/api/sectors", tags=["sectors"]) @router.get("/hot") async def get_hot_sectors(limit: int = 10): - """获取今日主线主题排名(盘中自动补充实时数据并统一归一)""" + """获取最新主线主题排名。 + + 页面访问只读数据库里的扫描结论,不在 GET 请求中拉取外部实时行情。 + """ sectors = await get_latest_sectors() - snapshot_trade_date = sectors[0].trade_date if sectors else "" - if should_prefer_realtime_today(snapshot_trade_date) or snapshot_trade_date != today_trade_date(): - realtime_sectors = await get_today_realtime_sector_board(limit=max(limit, 20)) - if realtime_sectors: - sectors = realtime_sectors - else: - sectors = await enrich_sectors_with_realtime(sectors) - else: - sectors = await enrich_sectors_with_realtime(sectors) trade_date = sectors[0].trade_date if sectors else "" sectors_data = [ @@ -57,18 +49,38 @@ async def get_hot_sectors(limit: int = 10): "is_realtime": s.is_realtime, "data_mode": s.data_mode, "source": s.source, + "data_status": s.data_status, + "source_detail": s.source_detail, + "catalyst_score": s.catalyst_score, + "catalyst_count": s.catalyst_count, + "catalyst_reasons": s.catalyst_reasons, } for s in sectors[:limit] ] realtime_enabled = any(s.get("is_realtime") for s in sectors_data) mode = sectors[0].data_mode if realtime_enabled and sectors else "daily_snapshot" + status = _derive_status(sectors_data) for s in sectors_data: s["data_mode"] = mode + s["data_status"] = status s["structure_trade_date"] = trade_date return sectors_data +def _derive_status(sectors: list[dict]) -> str: + statuses = {str(s.get("data_status") or "fresh") for s in sectors} + if not statuses: + return "snapshot" + if "fresh" in statuses: + return "fresh" if len(statuses) == 1 else "mixed" + if "stale" in statuses: + return "stale" + if "fallback" in statuses: + return "fallback" + return next(iter(statuses)) + + @router.get("/rotation") async def get_sector_rotation(days: int = 5): """获取近N日板块轮动数据(用于热力图)""" diff --git a/backend/app/api/stocks.py b/backend/app/api/stocks.py index 5cac0903..0eed9601 100644 --- a/backend/app/api/stocks.py +++ b/backend/app/api/stocks.py @@ -109,6 +109,15 @@ async def get_stock_thesis(ts_code: str): except Exception: return [] + def _safe_json_dict(value: str | None) -> dict: + if not value: + return {} + try: + parsed = json.loads(value) + return parsed if isinstance(parsed, dict) else {} + except Exception: + return {} + tracking_history = [] for row in tracking_rows: t = row._mapping @@ -181,6 +190,7 @@ async def get_stock_thesis(ts_code: str): "prefilter_decision": r.get("prefilter_decision") or "", "prefilter_reason": r.get("prefilter_reason") or "", "focus_points": _safe_json_list(r.get("focus_points")), + "decision_trace": _safe_json_dict(r.get("decision_trace")), "strategy": r["strategy"] or "trend_breakout", "entry_signal_type": r["entry_signal_type"] or "none", "entry_timing": r["entry_timing"] or "", diff --git a/backend/app/catalyst/__init__.py b/backend/app/catalyst/__init__.py new file mode 100644 index 00000000..169fb7b9 --- /dev/null +++ b/backend/app/catalyst/__init__.py @@ -0,0 +1,5 @@ +"""新闻/政策催化理解层。 + +LLM 只在这里做非结构化文本归因,不直接决定买卖。 +""" + diff --git a/backend/app/catalyst/mapper.py b/backend/app/catalyst/mapper.py new file mode 100644 index 00000000..e58b433d --- /dev/null +++ b/backend/app/catalyst/mapper.py @@ -0,0 +1,262 @@ +"""新闻/政策催化归因。 + +边界: +- LLM 只负责把文本映射到题材、提炼催化类型和解释。 +- 行情、资金、最终动作仍由规则引擎决定。 +""" + +from __future__ import annotations + +import json +import logging +import re +from datetime import datetime, timezone + +from app.analysis.theme_mapper import THEME_ALIASES, THEME_NAMES, resolve_theme +from app.catalyst.models import CatalystAnalysis, CatalystInput, CatalystTheme +from app.config import settings + +logger = logging.getLogger(__name__) + +CATALYST_TYPE_KEYWORDS = { + "policy": ["政策", "工信部", "发改委", "国务院", "证监会", "财政部", "规划", "指导意见", "补贴"], + "industry": ["订单", "需求", "涨价", "产能", "景气", "出口", "交付", "装机", "销量"], + "event": ["大会", "发布会", "展会", "会议", "试点", "招标", "中标", "事故"], + "earnings": ["业绩", "净利润", "营收", "预增", "扭亏", "年报", "季报"], + "announcement": ["公告", "重组", "并购", "定增", "回购", "签订合同"], +} + +STRENGTH_KEYWORDS = { + 18: ["重大", "重磅", "首次", "超预期", "全面", "国家级"], + 12: ["政策", "补贴", "涨价", "订单", "中标", "突破"], + 8: ["试点", "规划", "发布", "扩产", "合作"], +} + + +async def analyze_catalyst(item: CatalystInput, use_llm: bool = True) -> CatalystAnalysis: + """分析单条催化文本,LLM 不可用时使用规则归因。""" + rule_result = _analyze_by_rules(item) + if not use_llm or not settings.deepseek_api_key: + return rule_result + + llm_result = await _analyze_by_llm(item, rule_result) + return llm_result or rule_result + + +def _analyze_by_rules(item: CatalystInput) -> CatalystAnalysis: + text = f"{item.title}\n{item.content}".strip() + themes = _match_themes(text) + catalyst_type = _infer_catalyst_type(text) + strength = _score_strength(text, themes, catalyst_type) + freshness = _score_freshness(item.published_at) + confidence = 45 + min(len(themes) * 12, 35) + if catalyst_type in {"policy", "announcement"}: + confidence += 8 + + return CatalystAnalysis( + title=item.title, + summary=_summarize_text(item.content or item.title), + source=item.source, + url=item.url, + published_at=item.published_at, + catalyst_type=catalyst_type, + strength=min(strength, 100), + freshness=freshness, + confidence=min(confidence, 90), + themes=themes, + raw_text=text, + generated_by="rules", + ) + + +async def _analyze_by_llm(item: CatalystInput, baseline: CatalystAnalysis) -> CatalystAnalysis | None: + from app.llm.client import get_client + + client = get_client() + if not client: + return None + + aliases_text = "\n".join( + f"- {THEME_NAMES[theme_id]}: {', '.join(aliases[:8])}" + for theme_id, aliases in THEME_ALIASES.items() + ) + user_text = f"""\ +请把下面新闻/政策/公告归因到 A 股题材。只做语义归因,不给买卖建议。 + +## 可选系统题材 +{aliases_text} + +## 文本 +标题: {item.title} +来源: {item.source} +正文: {(item.content or '')[:1600]} + +请严格输出 JSON: +{{ + "summary": "一句话摘要", + "catalyst_type": "policy | industry | event | earnings | announcement | news", + "strength": 0-100, + "confidence": 0-100, + "themes": [ + {{"theme_name": "系统题材名或新题材名", "relevance": 0-100, "reason": "一句话"}} + ], + "reason": "为什么这么归因" +}}""" + + try: + response = await client.chat.completions.create( + model=settings.deepseek_model, + messages=[ + { + "role": "system", + "content": ( + "你是A股新闻催化归因器。" + "你只能做题材归因、催化类型和强度判断,不能输出买入卖出建议。" + "必须返回合法JSON。" + ), + }, + {"role": "user", "content": user_text}, + ], + max_tokens=700, + temperature=0.1, + ) + data = _extract_json(response.choices[0].message.content or "") + if not data: + return None + + themes = [] + for raw_theme in data.get("themes", [])[:5]: + theme_name = str(raw_theme.get("theme_name", "")).strip() + if not theme_name: + continue + theme_id, resolved_name, _ = resolve_theme(theme_name) + themes.append(CatalystTheme( + theme_id=theme_id, + theme_name=resolved_name, + relevance=_clamp_float(raw_theme.get("relevance"), 0, 100, 60), + reason=str(raw_theme.get("reason", "")).strip(), + )) + + if not themes: + themes = baseline.themes + + return CatalystAnalysis( + title=item.title, + summary=str(data.get("summary", "")).strip() or baseline.summary, + source=item.source, + url=item.url, + published_at=item.published_at, + catalyst_type=_normalize_type(data.get("catalyst_type")) or baseline.catalyst_type, + strength=_clamp_float(data.get("strength"), 0, 100, baseline.strength), + freshness=baseline.freshness, + confidence=_clamp_float(data.get("confidence"), 0, 100, baseline.confidence), + themes=themes, + raw_text=baseline.raw_text, + llm_reason=str(data.get("reason", "")).strip(), + generated_by="llm", + ) + except Exception as e: + logger.warning("LLM 催化归因失败: %s", e) + return None + + +def _match_themes(text: str) -> list[CatalystTheme]: + clean_text = _clean(text) + matched: list[CatalystTheme] = [] + for theme_id, aliases in THEME_ALIASES.items(): + hits = [] + for alias in aliases: + alias_clean = _clean(alias) + if alias_clean and alias_clean in clean_text: + hits.append(alias) + if not hits: + continue + relevance = min(55 + len(hits) * 12, 95) + matched.append(CatalystTheme( + theme_id=theme_id, + theme_name=THEME_NAMES[theme_id], + relevance=relevance, + reason=f"命中关键词: {'/'.join(hits[:3])}", + )) + matched.sort(key=lambda item: item.relevance, reverse=True) + return matched[:5] + + +def _infer_catalyst_type(text: str) -> str: + for catalyst_type, keywords in CATALYST_TYPE_KEYWORDS.items(): + if any(keyword in text for keyword in keywords): + return catalyst_type + return "news" + + +def _score_strength(text: str, themes: list[CatalystTheme], catalyst_type: str) -> float: + score = 35.0 + if themes: + score += min(max(theme.relevance for theme in themes) * 0.25, 25) + for bonus, keywords in STRENGTH_KEYWORDS.items(): + if any(keyword in text for keyword in keywords): + score += bonus + if catalyst_type == "policy": + score += 10 + elif catalyst_type == "announcement": + score += 6 + return round(min(score, 100), 1) + + +def _score_freshness(published_at: datetime | None) -> float: + if not published_at: + return 70 + dt = published_at + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + hours = max((datetime.now(timezone.utc) - dt.astimezone(timezone.utc)).total_seconds() / 3600, 0) + if hours <= 6: + return 100 + if hours <= 24: + return 90 + if hours <= 72: + return 70 + if hours <= 168: + return 45 + return 20 + + +def _summarize_text(text: str) -> str: + value = re.sub(r"\s+", " ", text or "").strip() + return value[:120] + + +def _clean(value: str) -> str: + return re.sub(r"[\s_\-()()【】\[\]、,。::]+", "", value or "") + + +def _extract_json(text: str) -> dict: + text = (text or "").strip() + if text.startswith("```"): + text = re.sub(r"^```(?:json)?", "", text).strip() + text = re.sub(r"```$", "", text).strip() + try: + return json.loads(text) + except Exception: + pass + start = text.find("{") + end = text.rfind("}") + if start >= 0 and end > start: + try: + return json.loads(text[start:end + 1]) + except Exception: + return {} + return {} + + +def _normalize_type(value) -> str: + text = str(value or "").strip().lower() + return text if text in {"policy", "industry", "event", "earnings", "announcement", "news"} else "" + + +def _clamp_float(value, minimum: float, maximum: float, default: float) -> float: + try: + num = float(value) + except (TypeError, ValueError): + return default + return max(minimum, min(maximum, num)) diff --git a/backend/app/catalyst/models.py b/backend/app/catalyst/models.py new file mode 100644 index 00000000..a8aa34fe --- /dev/null +++ b/backend/app/catalyst/models.py @@ -0,0 +1,44 @@ +"""催化事件领域模型。""" + +from datetime import datetime +from pydantic import BaseModel, Field + + +class CatalystTheme(BaseModel): + theme_id: str + theme_name: str + relevance: float = Field(default=0, ge=0, le=100) + reason: str = "" + + +class CatalystInput(BaseModel): + title: str + content: str = "" + source: str = "manual" + url: str = "" + published_at: datetime | None = None + + +class CatalystAnalysis(BaseModel): + title: str + summary: str = "" + source: str = "manual" + url: str = "" + published_at: datetime | None = None + catalyst_type: str = "news" + strength: float = Field(default=0, ge=0, le=100) + freshness: float = Field(default=0, ge=0, le=100) + confidence: float = Field(default=0, ge=0, le=100) + themes: list[CatalystTheme] = [] + raw_text: str = "" + llm_reason: str = "" + generated_by: str = "rules" + + +class ThemeCatalystScore(BaseModel): + theme_id: str + theme_name: str + catalyst_score: float = 0 + catalyst_count: int = 0 + top_reasons: list[str] = [] + generated_by: str = "rules" diff --git a/backend/app/catalyst/service.py b/backend/app/catalyst/service.py new file mode 100644 index 00000000..b1791f16 --- /dev/null +++ b/backend/app/catalyst/service.py @@ -0,0 +1,120 @@ +"""催化事件存储与主题分数聚合。""" + +from __future__ import annotations + +import logging +from datetime import datetime, timedelta + +from sqlalchemy import text + +from app.catalyst.mapper import analyze_catalyst +from app.catalyst.models import CatalystAnalysis, CatalystInput, ThemeCatalystScore +from app.db import tables +from app.db.database import get_db + +logger = logging.getLogger(__name__) + + +async def ingest_catalyst(item: CatalystInput, use_llm: bool = True) -> CatalystAnalysis: + analysis = await analyze_catalyst(item, use_llm=use_llm) + await save_catalyst(analysis) + return analysis + + +async def save_catalyst(analysis: CatalystAnalysis) -> int: + async with get_db() as db: + result = await db.execute( + tables.catalysts_table.insert().values( + title=analysis.title, + summary=analysis.summary, + source=analysis.source, + url=analysis.url, + published_at=analysis.published_at, + catalyst_type=analysis.catalyst_type, + strength=analysis.strength, + freshness=analysis.freshness, + confidence=analysis.confidence, + raw_text=analysis.raw_text, + llm_reason=analysis.llm_reason, + is_active=True, + ) + ) + catalyst_id = int(result.inserted_primary_key[0]) + if analysis.themes: + await db.execute( + tables.theme_catalysts_table.insert(), + [ + { + "catalyst_id": catalyst_id, + "theme_id": theme.theme_id, + "theme_name": theme.theme_name, + "relevance": theme.relevance, + "reason": theme.reason, + } + for theme in analysis.themes + ], + ) + await db.commit() + return catalyst_id + + +async def get_recent_catalysts(limit: int = 30, hours: int = 72) -> list[dict]: + since = datetime.now() - timedelta(hours=hours) + async with get_db() as db: + result = await db.execute( + text( + "SELECT c.*, " + "GROUP_CONCAT(tc.theme_name || ':' || ROUND(tc.relevance, 0), ',') AS themes " + "FROM catalysts c " + "LEFT JOIN theme_catalysts tc ON tc.catalyst_id = c.id " + "WHERE c.is_active = 1 AND COALESCE(c.published_at, c.created_at) >= :since " + "GROUP BY c.id " + "ORDER BY COALESCE(c.published_at, c.created_at) DESC, c.id DESC " + "LIMIT :limit" + ), + {"since": since, "limit": limit}, + ) + rows = result.mappings().all() + return [dict(row) for row in rows] + + +async def build_theme_catalyst_scores(hours: int = 72, limit: int = 20) -> list[ThemeCatalystScore]: + since = datetime.now() - timedelta(hours=hours) + async with get_db() as db: + rows = ( + await db.execute( + text( + "SELECT tc.theme_id, tc.theme_name, " + "COUNT(*) AS catalyst_count, " + "SUM((c.strength * 0.45 + c.freshness * 0.25 + c.confidence * 0.15 + tc.relevance * 0.15)) AS raw_score, " + "GROUP_CONCAT(SUBSTR(COALESCE(tc.reason, c.summary, c.title), 1, 60), ' | ') AS reasons " + "FROM theme_catalysts tc " + "JOIN catalysts c ON c.id = tc.catalyst_id " + "WHERE c.is_active = 1 AND COALESCE(c.published_at, c.created_at) >= :since " + "GROUP BY tc.theme_id, tc.theme_name " + "ORDER BY raw_score DESC " + "LIMIT :limit" + ), + {"since": since, "limit": limit}, + ) + ).mappings().all() + + scores = [] + for row in rows: + raw = float(row.get("raw_score") or 0) + count = int(row.get("catalyst_count") or 0) + normalized = min(raw / max(count, 1), 100) + reasons = [ + item.strip() + for item in str(row.get("reasons") or "").split("|") + if item.strip() + ][:3] + scores.append(ThemeCatalystScore( + theme_id=row["theme_id"], + theme_name=row["theme_name"], + catalyst_score=round(normalized, 1), + catalyst_count=count, + top_reasons=reasons, + generated_by="catalyst_layer", + )) + return scores diff --git a/backend/app/data/eastmoney_client.py b/backend/app/data/eastmoney_client.py index fafdd9e0..7df4271b 100644 --- a/backend/app/data/eastmoney_client.py +++ b/backend/app/data/eastmoney_client.py @@ -8,6 +8,9 @@ """ import logging +import random +import time +import asyncio import httpx import pandas as pd from datetime import datetime @@ -29,6 +32,11 @@ SECTOR_HEADERS = { "Referer": "https://data.eastmoney.com", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", } +SECTOR_RETRY_COUNT = 2 +SECTOR_RETRY_BASE_DELAY = 0.35 +SECTOR_CIRCUIT_BREAKER_SECONDS = 180 +_sector_failure_state: dict[str, dict] = {} +_ashare_realtime_tasks: dict[str, asyncio.Task] = {} def _describe_exception(exc: Exception) -> str: @@ -38,6 +46,16 @@ def _describe_exception(exc: Exception) -> str: return exc.__class__.__name__ +def _mark_rows_status(rows: list[dict], status: str, detail: str) -> list[dict]: + result = [] + for row in rows: + item = dict(row) + item["data_status"] = status + item["source_detail"] = detail + result.append(item) + return result + + def _ts_code_to_eastmoney(ts_code: str) -> str: """600519.SH -> 1.600519 (上海=1, 深圳=0)""" code, market = ts_code.split(".") @@ -84,6 +102,21 @@ async def get_sector_realtime_ranking( if cached is not None: return cached + stale_cache_key = f"{cache_key}:last_success" + circuit_key = f"{fs}:{sort_by}" + failure_state = _sector_failure_state.get(circuit_key, {}) + circuit_until = float(failure_state.get("circuit_until", 0) or 0) + if time.time() < circuit_until: + stale = cache.get(stale_cache_key) + if stale is not None: + logger.warning( + "东方财富板块实时排名熔断中,返回最近成功快照: fs=%s sort_by=%s", + fs, + sort_by, + ) + return _mark_rows_status(stale, status="stale", detail="eastmoney_circuit_open") + return [] + params = { "pn": "1", "pz": str(page_size), @@ -99,18 +132,38 @@ async def get_sector_realtime_ranking( "fields": "f2,f3,f4,f6,f8,f12,f14,f104,f105,f128,f140,f141", } - try: - board_type = "industry" if fs == "m:90+t:2" else "concept" if fs == "m:90+t:3" else "region" if fs == "m:90+t:1" else "unknown" - async with httpx.AsyncClient() as client: - resp = await client.get( - SECTOR_LIST_URL, - params=params, - headers=SECTOR_HEADERS, - timeout=10, - follow_redirects=True, + board_type = "industry" if fs == "m:90+t:2" else "concept" if fs == "m:90+t:3" else "region" if fs == "m:90+t:1" else "unknown" + last_error: Exception | None = None + data: dict = {} + for attempt in range(SECTOR_RETRY_COUNT + 1): + try: + async with httpx.AsyncClient() as client: + resp = await client.get( + SECTOR_LIST_URL, + params=params, + headers=SECTOR_HEADERS, + timeout=10, + follow_redirects=True, + ) + data = _parse_eastmoney_json(resp, "板块实时排名") + break + except Exception as e: + last_error = e + if attempt >= SECTOR_RETRY_COUNT: + data = {} + break + delay = SECTOR_RETRY_BASE_DELAY * (2 ** attempt) + random.uniform(0, 0.2) + logger.warning( + "东方财富板块实时排名获取失败,准备重试 %s/%s: %s", + attempt + 1, + SECTOR_RETRY_COUNT, + _describe_exception(e), ) - data = _parse_eastmoney_json(resp, "板块实时排名") + await asyncio.sleep(delay) + try: + if last_error and not data: + raise last_error items = data.get("data", {}).get("diff", []) if not items: logger.debug("东方财富板块实时排名无数据") @@ -134,21 +187,45 @@ async def get_sector_realtime_ranking( "leading_stock_name": item.get("f128", ""), "leading_stock_code": item.get("f140", ""), "leading_stock_pct": float(item.get("f141", 0) or 0), + "source": "eastmoney", + "data_status": "fresh", + "source_detail": "eastmoney_push2", }) # 缓存:盘中 60 秒,盘后 300 秒 ttl = 60 if _is_trading_hours() else 300 cache.set(cache_key, result, ttl) + cache.set(stale_cache_key, result, 60 * 60 * 6) + _sector_failure_state.pop(circuit_key, None) logger.info(f"东方财富板块实时排名: 获取 {len(result)} 个板块") return result except Exception as e: - logger.error(f"东方财富板块实时排名获取失败: {e}") - if notify: + failure_count = int(failure_state.get("failure_count", 0) or 0) + 1 + _sector_failure_state[circuit_key] = { + "failure_count": failure_count, + "circuit_until": time.time() + SECTOR_CIRCUIT_BREAKER_SECONDS, + "last_error": _describe_exception(e), + } + stale = cache.get(stale_cache_key) + if stale is not None: + logger.warning("东方财富板块实时排名获取失败,返回最近成功快照: %s", _describe_exception(e)) + if notify and failure_count >= 3: + await log_error( + "eastmoney", + f"东方财富板块实时排名连续失败,已使用最近成功快照: {_describe_exception(e)}", + detail=f"fs={fs}, sort_by={sort_by}, page_size={page_size}, failure_count={failure_count}", + level="warning", + notify=False, + ) + return _mark_rows_status(stale, status="stale", detail="eastmoney_last_success") + + logger.error(f"东方财富板块实时排名获取失败且无可用快照: {e}") + if notify and failure_count >= 2: await log_error( "eastmoney", f"东方财富板块实时排名获取失败: {e}", - detail=f"fs={fs}, sort_by={sort_by}, page_size={page_size}", + detail=f"fs={fs}, sort_by={sort_by}, page_size={page_size}, failure_count={failure_count}", ) return [] @@ -164,6 +241,31 @@ async def get_a_share_realtime_ranking( if cached is not None: return cached + task = _ashare_realtime_tasks.get(cache_key) + if task and not task.done(): + return await task + + task = asyncio.create_task(_load_a_share_realtime_ranking( + cache_key=cache_key, + sort_by=sort_by, + descending=descending, + page_size=page_size, + )) + _ashare_realtime_tasks[cache_key] = task + try: + return await task + finally: + if task.done(): + _ashare_realtime_tasks.pop(cache_key, None) + + +async def _load_a_share_realtime_ranking( + cache_key: str, + sort_by: str, + descending: bool, + page_size: int, +) -> list[dict]: + fs = "m:0+t:6,m:0+t:80,m:0+t:81+s:2048,m:1+t:2,m:1+t:23" fields = "f2,f3,f6,f8,f9,f12,f14,f20,f21,f23,f62" diff --git a/backend/app/data/market_breadth_client.py b/backend/app/data/market_breadth_client.py index e0d2247b..39158b7d 100644 --- a/backend/app/data/market_breadth_client.py +++ b/backend/app/data/market_breadth_client.py @@ -11,6 +11,7 @@ from __future__ import annotations import logging +import asyncio import httpx @@ -25,15 +26,29 @@ logger = logging.getLogger(__name__) ZT_POOL_URL = "https://push2ex.eastmoney.com/getTopicZTPool" DT_POOL_URL = "https://push2ex.eastmoney.com/getTopicDTPool" MIN_RELIABLE_SAMPLE_COUNT = 4500 +_market_breadth_task: asyncio.Task | None = None async def get_market_breadth() -> MarketBreadth: """获取市场广度快照。""" + global _market_breadth_task cache_key = f"market_breadth:{today_trade_date()}" cached = cache.get(cache_key) if cached is not None: return cached + if _market_breadth_task and not _market_breadth_task.done(): + return await _market_breadth_task + + _market_breadth_task = asyncio.create_task(_load_market_breadth(cache_key)) + try: + return await _market_breadth_task + finally: + if _market_breadth_task.done(): + _market_breadth_task = None + + +async def _load_market_breadth(cache_key: str) -> MarketBreadth: quotes = await get_a_share_realtime_ranking(page_size=6000) if quotes and len(quotes) >= MIN_RELIABLE_SAMPLE_COUNT: up_count = sum(1 for q in quotes if q.get("pct_chg", 0) > 0) diff --git a/backend/app/data/models.py b/backend/app/data/models.py index 54510898..b0ecf18d 100644 --- a/backend/app/data/models.py +++ b/backend/app/data/models.py @@ -85,6 +85,11 @@ class SectorInfo(BaseModel): is_realtime: bool = False data_mode: str = "daily_snapshot" source: str = "snapshot" + data_status: str = "fresh" # fresh / stale / fallback / snapshot + source_detail: str = "" + catalyst_score: float = 0 + catalyst_count: int = 0 + catalyst_reasons: list[str] = [] class MarketTemperature(BaseModel): @@ -175,6 +180,7 @@ class Recommendation(BaseModel): prefilter_decision: str = "" prefilter_reason: str = "" focus_points: list[str] = [] + decision_trace: dict = {} scan_session: str = "" created_at: datetime | None = None diff --git a/backend/app/db/database.py b/backend/app/db/database.py index a1b3ed54..2527baec 100644 --- a/backend/app/db/database.py +++ b/backend/app/db/database.py @@ -59,6 +59,7 @@ async def init_db(): "ALTER TABLE recommendations ADD COLUMN prefilter_decision TEXT DEFAULT ''", "ALTER TABLE recommendations ADD COLUMN prefilter_reason TEXT DEFAULT ''", "ALTER TABLE recommendations ADD COLUMN focus_points TEXT DEFAULT '[]'", + "ALTER TABLE recommendations ADD COLUMN decision_trace TEXT DEFAULT '{}'", "ALTER TABLE sector_heat ADD COLUMN stage TEXT", "ALTER TABLE sector_heat ADD COLUMN board_type TEXT DEFAULT 'theme'", "ALTER TABLE sector_heat ADD COLUMN theme_id TEXT DEFAULT ''", @@ -97,6 +98,7 @@ async def init_db(): "ALTER TABLE strategy_configs ADD COLUMN effective_from DATETIME DEFAULT CURRENT_TIMESTAMP", "ALTER TABLE prompt_configs ADD COLUMN evidence_json TEXT DEFAULT '{}'", "ALTER TABLE strategy_config_changes ADD COLUMN prompt_key TEXT DEFAULT ''", + "ALTER TABLE catalysts ADD COLUMN llm_reason TEXT DEFAULT ''", ]: try: await conn.execute( diff --git a/backend/app/db/tables.py b/backend/app/db/tables.py index f2086e5b..d28cbaca 100644 --- a/backend/app/db/tables.py +++ b/backend/app/db/tables.py @@ -44,6 +44,7 @@ recommendations_table = Table( Column("prefilter_decision", Text, default=""), Column("prefilter_reason", Text, default=""), Column("focus_points", Text, default="[]"), + Column("decision_trace", Text, default="{}"), Column("scan_session", Text), Column("created_at", DateTime, server_default=func.now()), ) @@ -238,3 +239,32 @@ strategy_config_changes_table = Table( Column("created_at", DateTime, server_default=func.now()), Column("applied_at", DateTime), ) + +catalysts_table = Table( + "catalysts", metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("title", Text, nullable=False), + Column("summary", Text, default=""), + Column("source", Text, default="manual"), + Column("url", Text, default=""), + Column("published_at", DateTime), + Column("catalyst_type", Text, default="news"), + Column("strength", Float, default=0), + Column("freshness", Float, default=0), + Column("confidence", Float, default=0), + Column("is_active", Boolean, default=True), + Column("raw_text", Text, default=""), + Column("llm_reason", Text, default=""), + Column("created_at", DateTime, server_default=func.now()), +) + +theme_catalysts_table = Table( + "theme_catalysts", metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("catalyst_id", Integer, nullable=False), + Column("theme_id", Text, nullable=False), + Column("theme_name", Text, nullable=False), + Column("relevance", Float, default=0), + Column("reason", Text, default=""), + Column("created_at", DateTime, server_default=func.now()), +) diff --git a/backend/app/engine/recommender.py b/backend/app/engine/recommender.py index 3bb55663..f1968389 100644 --- a/backend/app/engine/recommender.py +++ b/backend/app/engine/recommender.py @@ -36,6 +36,55 @@ def _has_valid_market_breadth(market_temp: MarketTemperature | None) -> bool: return (market_temp.up_count or 0) + (market_temp.down_count or 0) > 0 +def _safe_json_dict(value) -> dict: + if not value: + return {} + if isinstance(value, dict): + return value + try: + parsed = json.loads(value) + return parsed if isinstance(parsed, dict) else {} + except Exception: + return {} + + +def _safe_json_list_value(value) -> list: + if not value: + return [] + if isinstance(value, list): + return value + try: + parsed = json.loads(value) + return parsed if isinstance(parsed, list) else [] + except Exception: + return [] + + +def _build_legacy_decision_trace(row) -> dict: + r = row._mapping if hasattr(row, "_mapping") else row + action_plan = r.get("action_plan") or "观察" + score = float(r.get("score") or 0) + tags = _safe_json_list_value(r.get("recall_tags")) + reasons = _safe_json_list_value(r.get("reasons")) + return { + "version": 0, + "headline": f"{action_plan}: {r.get('sector') or '未归类'}候选,综合分{score:.0f}", + "action_plan": action_plan, + "final_score": round(score, 1), + "route_tags": tags, + "evidence": reasons[:3], + "score_breakdown": [ + {"key": "sector", "label": "主题热度", "score": round(float(r.get("sector_score") or 0), 1), "weight": 0}, + {"key": "capital", "label": "资金", "score": round(float(r.get("capital_score") or 0), 1), "weight": 0}, + {"key": "technical", "label": "技术", "score": round(float(r.get("technical_score") or 0), 1), "weight": 0}, + ], + "boosts": [], + "penalties": [], + "risk_tags": [], + "llm_adjustment": None, + } + + async def refresh_recommendations(trade_date: str = None, scan_session: str = "manual") -> dict: """刷新推荐列表(带扫描锁防止并发)""" global _scan_running @@ -594,6 +643,7 @@ async def get_recommendation_history(days: int = 7) -> list[dict]: "prefilter_decision": r.get("prefilter_decision") or "", "prefilter_reason": r.get("prefilter_reason") or "", "focus_points": json.loads(r.get("focus_points") or "[]"), + "decision_trace": _safe_json_dict(r.get("decision_trace")) or _build_legacy_decision_trace(r), "tracking": { "current_price": r.get("latest_current_price"), "pct_from_entry": r.get("latest_pct_from_entry"), @@ -812,6 +862,7 @@ async def _save_to_db(result: dict): "prefilter_decision": rec.prefilter_decision, "prefilter_reason": rec.prefilter_reason, "focus_points": json.dumps(rec.focus_points, ensure_ascii=False), + "decision_trace": json.dumps(rec.decision_trace, ensure_ascii=False), "scan_session": rec.scan_session, "created_at": now_dt, } @@ -916,6 +967,7 @@ async def _load_today_from_db() -> dict: prefilter_decision=r.get("prefilter_decision") or "", prefilter_reason=r.get("prefilter_reason") or "", focus_points=json.loads(r.get("focus_points") or "[]"), + decision_trace=_safe_json_dict(r.get("decision_trace")) or _build_legacy_decision_trace(r), scan_session=r["scan_session"] or "", )) diff --git a/backend/app/engine/screener.py b/backend/app/engine/screener.py index 1c5ebdd9..44aae676 100644 --- a/backend/app/engine/screener.py +++ b/backend/app/engine/screener.py @@ -40,6 +40,7 @@ from app.data.models import MarketTemperature, SectorInfo, TechnicalSignal, Reco from app.config import settings, should_prefer_realtime_today from app.data.tushare_client import tushare_client from app.llm.strategy_selector import StrategyProfile, select_strategy_profile +from app.catalyst.service import build_theme_catalyst_scores logger = logging.getLogger(__name__) @@ -96,6 +97,8 @@ async def run_screening(trade_date: str = None) -> dict: logger.info("无合格主线主题(需要资金/实时强度+非尾声),回退到全部主题") hot_sectors = all_themes[:settings.top_sector_count] + hot_sectors = await _apply_catalyst_scores(hot_sectors) + for s in hot_sectors: logger.info(f" 目标主题: {s.sector_name} 涨幅{s.pct_change}% 资金{s.capital_inflow:.0f}万 " f"涨停{s.limit_up_count} 阶段={s.stage}") @@ -187,6 +190,38 @@ async def run_screening(trade_date: str = None) -> dict: } +async def _apply_catalyst_scores(sectors: list[SectorInfo]) -> list[SectorInfo]: + if not sectors: + return sectors + try: + scores = await build_theme_catalyst_scores(hours=72, limit=50) + except Exception as e: + logger.warning("催化分数加载失败,跳过主题催化加权: %s", e) + return sectors + if not scores: + return sectors + score_map = {item.theme_id: item for item in scores} + name_map = {item.theme_name: item for item in scores} + for sector in sectors: + item = score_map.get(sector.theme_id) or name_map.get(sector.sector_name) + if not item: + continue + sector.catalyst_score = item.catalyst_score + sector.catalyst_count = item.catalyst_count + sector.catalyst_reasons = item.top_reasons + # 催化只增强主线优先级,不替代资金确认。 + sector.heat_score = round(min(sector.heat_score + item.catalyst_score * 0.18, 100), 1) + sectors.sort( + key=lambda s: ( + s.heat_score, + s.catalyst_score, + s.realtime_pct_change if s.realtime_pct_change is not None else s.pct_change, + ), + reverse=True, + ) + return sectors + + async def _select_from_hot_sectors( hot_sectors: list[SectorInfo], trade_date: str, @@ -605,10 +640,11 @@ async def _build_recommendations( total = len(candidates) signal_counts = {"breakout": 0, "breakout_confirm": 0, "pullback": 0, "launch": 0, "reversal": 0, "none": 0} score_weights = strategy_profile.score_weights if strategy_profile else { - "capital_momentum": 0.30, - "supply_demand": 0.30, - "price_action": 0.25, - "trend": 0.15, + "catalyst": 0.30, + "theme_money": 0.25, + "stock_money": 0.20, + "emotion_role": 0.15, + "timing": 0.10, } score_weights = _normalize_score_weights(score_weights) signal_priority = strategy_profile.entry_signal_priority if strategy_profile else [] @@ -649,12 +685,32 @@ async def _build_recommendations( signal_name = signal_type.value signal_counts[signal_name] += 1 - # ── 三维度评分 ── + # ── 五轴评分:催化、主题资金、个股资金、情绪角色、入场时机 ── supply_demand_score = score_supply_demand(df) price_action_score = _score_price_action(df, entry_signal) trend_score = _score_trend(df) capital_score = _score_capital_simple(stock) flow_momentum_score = _score_flow_momentum(stock, sector, hot_sectors) + sector_stage = _get_sector_stage(sector, hot_sectors) + hot_theme_match = find_hot_theme_match(sector, hot_sectors) + sector_limit_up = _get_sector_limit_up(sector, hot_sectors) + catalyst_score = _get_sector_catalyst_score(sector, hot_sectors) + catalyst_reasons = _get_sector_catalyst_reasons(sector, hot_sectors) + theme_money_score = _score_theme_money(sector, hot_sectors, hot_theme_match) + stock_money_score = _score_stock_money(stock, capital_score) + emotion_role_score = _score_emotion_role( + stock=stock, + sector_limit_up=sector_limit_up, + sector_stage=sector_stage, + hot_theme_match=hot_theme_match, + hot_sectors=hot_sectors, + ) + timing_score = _score_timing( + entry_signal_score=entry_signal.get("signal_score", 0), + price_action_score=price_action_score, + trend_score=trend_score, + position_score=50, + ) last = df.iloc[-1] trend_penalty = 1.0 @@ -663,15 +719,28 @@ async def _build_recommendations( if last["ma5"] < last["ma10"] < last["ma20"]: trend_penalty = 0.82 - final_score = ( - flow_momentum_score * score_weights["capital_momentum"] + - supply_demand_score * score_weights["supply_demand"] + - price_action_score * score_weights["price_action"] + - trend_score * score_weights["trend"] - ) + scoring_axes = { + "catalyst": catalyst_score, + "theme_money": theme_money_score, + "stock_money": stock_money_score, + "emotion_role": emotion_role_score, + "timing": timing_score, + } + final_score = sum(scoring_axes[key] * score_weights[key] for key in scoring_axes) final_score *= trend_penalty tech_signal = generate_signals(ts_code, name) + if tech_signal: + timing_score = _score_timing( + entry_signal_score=entry_signal.get("signal_score", 0), + price_action_score=price_action_score, + trend_score=trend_score, + position_score=tech_signal.position_score, + ) + scoring_axes["timing"] = timing_score + final_score = sum(scoring_axes[key] * score_weights[key] for key in scoring_axes) + final_score *= trend_penalty + penalties = [] if tech_signal: if tech_signal.rally_pct_5d > 20: @@ -679,9 +748,6 @@ async def _build_recommendations( elif tech_signal.rally_pct_5d > 15: penalties.append(0.80) - sector_stage = _get_sector_stage(sector, hot_sectors) - hot_theme_match = find_hot_theme_match(sector, hot_sectors) - if sector_stage == "end": penalties.append(0.70) elif sector_stage == "late": @@ -695,31 +761,59 @@ async def _build_recommendations( if penalties: final_score *= min(penalties) - sector_limit_up = _get_sector_limit_up(sector, hot_sectors) + boosts = [] if sector_limit_up >= 5: final_score *= 1.20 + boosts.append({"label": "板块涨停扩散", "value": "+20%", "reason": f"{sector_limit_up}家涨停"}) elif sector_limit_up >= 3: final_score *= 1.10 + boosts.append({"label": "板块涨停扩散", "value": "+10%", "reason": f"{sector_limit_up}家涨停"}) if entry_signal.get("signal_score", 0) >= 80: final_score *= 1.10 + boosts.append({"label": "入场形态强", "value": "+10%", "reason": f"信号分{entry_signal.get('signal_score', 0):.0f}"}) - final_score *= _flow_confirmation_multiplier(stock, hot_theme_match, market_temp) + if catalyst_score >= 70 and hot_theme_match: + final_score *= 1.06 + boosts.append({"label": "新闻催化确认", "value": "+6%", "reason": catalyst_reasons[0] if catalyst_reasons else f"催化分{catalyst_score:.0f}"}) + elif catalyst_score >= 45 and hot_theme_match: + final_score *= 1.03 + boosts.append({"label": "新闻催化加权", "value": "+3%", "reason": catalyst_reasons[0] if catalyst_reasons else f"催化分{catalyst_score:.0f}"}) + flow_multiplier = _flow_confirmation_multiplier(stock, hot_theme_match, market_temp) + final_score *= flow_multiplier + if flow_multiplier > 1: + boosts.append({"label": "资金主线共振", "value": f"+{round((flow_multiplier - 1) * 100)}%", "reason": "资金、量能与主线同向"}) + elif flow_multiplier < 1: + boosts.append({"label": "资金确认不足", "value": f"-{round((1 - flow_multiplier) * 100)}%", "reason": "资金或主线承接不足"}) + + theme_penalty = 1.0 if not hot_theme_match: - final_score *= 0.82 + theme_penalty = 0.82 + final_score *= theme_penalty elif hot_theme_match not in hot_sectors[:5]: - final_score *= 0.9 + theme_penalty = 0.9 + final_score *= theme_penalty signal_matches_profile = bool(signal_priority and signal_name in signal_priority[:4]) + profile_multiplier = 1.0 if signal_type != EntrySignal.NONE and signal_priority: priority_rank = signal_priority.index(signal_type.value) if priority_rank == 0: - final_score *= 1.08 + profile_multiplier = 1.08 + final_score *= profile_multiplier elif priority_rank == 1: - final_score *= 1.04 + profile_multiplier = 1.04 + final_score *= profile_multiplier elif priority_rank >= 3: - final_score *= 0.94 + profile_multiplier = 0.94 + final_score *= profile_multiplier + if profile_multiplier != 1.0: + boosts.append({ + "label": "策略匹配度", + "value": f"{'+' if profile_multiplier > 1 else '-'}{round(abs(profile_multiplier - 1) * 100)}%", + "reason": f"{signal_name} 与今日策略优先级匹配", + }) pe = stock.get("pe") pb = stock.get("pb") @@ -833,6 +927,41 @@ async def _build_recommendations( entry_timing=entry_timing, data_date=last_date, ) + risk_tags = _build_risk_tags(market_temp, tech_signal, sector_stage, trend_penalty) + penalty_notes = _build_penalty_notes( + penalties=penalties, + trend_penalty=trend_penalty, + theme_penalty=theme_penalty, + market_temp_score=market_temp_score, + sector_stage=sector_stage, + hot_theme_match=hot_theme_match, + ) + decision_trace = _build_decision_trace( + stock=stock, + score=final_score, + score_weights=score_weights, + scoring_axes=scoring_axes, + flow_momentum_score=flow_momentum_score, + supply_demand_score=supply_demand_score, + price_action_score=price_action_score, + trend_score=trend_score, + capital_score=capital_score, + position_score=position_score, + valuation_score=valuation_score, + entry_signal_type=effective_signal_name, + entry_signal_score=entry_signal.get("signal_score", 0), + signal_matches_profile=signal_matches_profile, + sector_stage=sector_stage, + sector_limit_up=sector_limit_up, + catalyst_score=catalyst_score, + catalyst_reasons=catalyst_reasons, + market_temp=market_temp, + trade_plan=trade_plan, + boosts=boosts, + penalties=penalty_notes, + risk_tags=risk_tags, + hot_theme_match=hot_theme_match, + ) rec = Recommendation( ts_code=ts_code, @@ -868,6 +997,7 @@ async def _build_recommendations( prefilter_decision="", prefilter_reason="", focus_points=[], + decision_trace=decision_trace, ) recommendations.append(rec) @@ -894,7 +1024,7 @@ async def _build_recommendations( "entry_signal_score": round(entry_signal.get("signal_score", 0), 1), "flow_momentum_score": round(flow_momentum_score, 1), "signal_matches_profile": signal_matches_profile, - "risk_tags": _build_risk_tags(market_temp, tech_signal, sector_stage, trend_penalty), + "risk_tags": risk_tags, "focus_points": _build_focus_points(stock, entry_signal, tech_signal, vol_pattern, sector_stage), } @@ -1035,6 +1165,13 @@ async def _build_recommendations( rec.risk_note = llm_data["risk_flag"] rec.level = _score_to_level(rec.score) + _apply_llm_trace( + rec, + verdict=verdict, + action_plan=rec.action_plan, + conviction=conviction, + reason=llm_data.get("analysis", "") or llm_data.get("risk_flag", ""), + ) # 用 LLM 给出的价格替代结构化规则价格 if llm_data.get("entry_price"): @@ -1068,21 +1205,188 @@ async def _build_recommendations( def _normalize_score_weights(weights: dict[str, float]) -> dict[str, float]: - """兼容旧策略权重,并保证资金顺势进入主评分。""" + """归一化五轴主评分权重,并兼容旧四项策略配置。""" defaults = { - "capital_momentum": 0.30, - "supply_demand": 0.30, - "price_action": 0.25, - "trend": 0.15, + "catalyst": 0.30, + "theme_money": 0.25, + "stock_money": 0.20, + "emotion_role": 0.15, + "timing": 0.10, } - merged = {**defaults, **(weights or {})} - keys = ["capital_momentum", "supply_demand", "price_action", "trend"] + raw = weights or {} + if any(key in raw for key in defaults): + merged = {**defaults, **raw} + else: + merged = { + "catalyst": defaults["catalyst"], + "theme_money": max(float(raw.get("capital_momentum", 0) or 0), 0), + "stock_money": max(float(raw.get("supply_demand", 0) or 0), 0), + "emotion_role": max(float(raw.get("trend", 0) or 0), 0), + "timing": max(float(raw.get("price_action", 0) or 0), 0), + } + keys = list(defaults.keys()) total = sum(max(float(merged.get(k, 0) or 0), 0) for k in keys) if total <= 0: return defaults return {k: max(float(merged.get(k, 0) or 0), 0) / total for k in keys} +def _score_theme_money( + sector_name: str, + hot_sectors: list[SectorInfo], + hot_theme_match: SectorInfo | None, +) -> float: + theme = hot_theme_match + if not theme: + theme = next((s for s in hot_sectors if s.sector_name == sector_name), None) + if not theme: + return 20.0 + + pct = theme.realtime_pct_change if theme.realtime_pct_change is not None else theme.pct_change + amount = theme.realtime_amount if theme.realtime_amount is not None else theme.capital_inflow + main_force_ratio = theme.main_force_ratio or 0 + up = theme.realtime_up_count + down = theme.realtime_down_count + + score = min(max(theme.heat_score, 0), 100) * 0.42 + if pct >= 4: + score += 18 + elif pct >= 2: + score += 14 + elif pct > 0: + score += 8 + elif pct < -1: + score -= 8 + + if amount > 500000: + score += 14 + elif amount > 200000: + score += 10 + elif amount > 0: + score += 6 + + if main_force_ratio >= 20: + score += 10 + elif main_force_ratio >= 10: + score += 6 + elif main_force_ratio < 0: + score -= 6 + + if up is not None and down is not None: + breadth = up - down + if breadth >= 20: + score += 10 + elif breadth >= 8: + score += 6 + elif breadth < -8: + score -= 8 + + return round(max(0, min(score, 100)), 1) + + +def _score_stock_money(stock: dict, capital_score: float) -> float: + main_net = float(stock.get("main_net_inflow", 0) or 0) + inflow_ratio = float(stock.get("inflow_ratio", 0) or 0) + turnover_rate = float(stock.get("turnover_rate", 0) or 0) + volume_ratio = stock.get("volume_ratio") + volume_ratio = float(volume_ratio) if volume_ratio not in (None, "") else 0.0 + + score = capital_score * 0.55 + if main_net > 15000: + score += 18 + elif main_net > 8000: + score += 14 + elif main_net > 3000: + score += 10 + elif main_net < -5000: + score -= 12 + + if inflow_ratio > 12: + score += 12 + elif inflow_ratio > 6: + score += 8 + elif inflow_ratio < -6: + score -= 8 + + if volume_ratio >= 2: + score += 10 + elif volume_ratio >= 1.2: + score += 6 + + if 3 <= turnover_rate <= 15: + score += 8 + elif turnover_rate > 0: + score += 3 + + return round(max(0, min(score, 100)), 1) + + +def _score_emotion_role( + stock: dict, + sector_limit_up: int, + sector_stage: str, + hot_theme_match: SectorInfo | None, + hot_sectors: list[SectorInfo], +) -> float: + tags = set(stock.get("recall_tags", []) or []) + recall_score = float(stock.get("recall_score", 0) or 0) + score = min(max(recall_score, 0), 100) * 0.45 + + if hot_theme_match: + try: + rank = hot_sectors.index(hot_theme_match) + 1 + except ValueError: + rank = 99 + if rank == 1: + score += 16 + elif rank <= 3: + score += 12 + elif rank <= 5: + score += 7 + else: + score -= 10 + + if "theme_leader" in tags: + score += 18 + elif "top_theme_member" in tags: + score += 10 + elif "hot_theme_core" in tags: + score += 6 + + if sector_limit_up >= 5: + score += 14 + elif sector_limit_up >= 3: + score += 10 + elif sector_limit_up >= 1: + score += 5 + + if sector_stage == "early": + score += 8 + elif sector_stage == "mid": + score += 5 + elif sector_stage == "late": + score -= 8 + elif sector_stage == "end": + score -= 20 + + return round(max(0, min(score, 100)), 1) + + +def _score_timing( + entry_signal_score: float, + price_action_score: float, + trend_score: float, + position_score: float, +) -> float: + score = ( + min(max(entry_signal_score, 0), 100) * 0.40 + + min(max(price_action_score, 0), 100) * 0.28 + + min(max(trend_score, 0), 100) * 0.17 + + min(max(position_score, 0), 100) * 0.15 + ) + return round(max(0, min(score, 100)), 1) + + def _score_flow_momentum(stock: dict, sector_name: str, hot_sectors: list[SectorInfo]) -> float: """资金顺势评分:个股资金在场 + 主线板块顺风 + 活跃度确认。""" main_net = float(stock.get("main_net_inflow", 0) or 0) @@ -1093,6 +1397,7 @@ def _score_flow_momentum(stock: dict, sector_name: str, hot_sectors: list[Sector recall_score = float(stock.get("recall_score", 0) or 0) sector_heat = _get_sector_heat(sector_name, hot_sectors) sector_limit_up = _get_sector_limit_up(sector_name, hot_sectors) + catalyst_score = _get_sector_catalyst_score(sector_name, hot_sectors) score = 0.0 @@ -1138,6 +1443,13 @@ def _score_flow_momentum(stock: dict, sector_name: str, hot_sectors: list[Sector elif sector_limit_up >= 1: score += 3 + if catalyst_score >= 80: + score += 8 + elif catalyst_score >= 60: + score += 6 + elif catalyst_score >= 40: + score += 3 + # 活跃度和召回强度占 35 分。 if volume_ratio >= 2.5: score += 12 @@ -1424,6 +1736,20 @@ def _get_sector_limit_up(sector_name: str, hot_sectors: list[SectorInfo]) -> int return 0 +def _get_sector_catalyst_score(sector_name: str, hot_sectors: list[SectorInfo]) -> float: + for s in hot_sectors: + if s.sector_name == sector_name: + return s.catalyst_score + return 0.0 + + +def _get_sector_catalyst_reasons(sector_name: str, hot_sectors: list[SectorInfo]) -> list[str]: + for s in hot_sectors: + if s.sector_name == sector_name: + return s.catalyst_reasons + return [] + + def _get_sector_member_count(sector_name: str, hot_sectors: list[SectorInfo]) -> int: """获取板块成分股数量""" for s in hot_sectors: @@ -1699,6 +2025,228 @@ def _build_risk_tags( return tags +def _build_penalty_notes( + penalties: list[float], + trend_penalty: float, + theme_penalty: float, + market_temp_score: float, + sector_stage: str, + hot_theme_match: SectorInfo | None, +) -> list[dict]: + notes: list[dict] = [] + if trend_penalty < 1: + notes.append({"label": "趋势压力", "value": f"-{round((1 - trend_penalty) * 100)}%", "reason": "短中期均线偏弱"}) + if sector_stage == "end": + notes.append({"label": "板块尾声", "value": "最高-30%", "reason": "主题阶段进入尾声"}) + elif sector_stage == "late": + notes.append({"label": "板块后段", "value": "最高-12%", "reason": "主题阶段偏后"}) + if market_temp_score < 30: + notes.append({"label": "市场温度偏冷", "value": "最高-25%", "reason": f"温度{market_temp_score:.0f}"}) + elif market_temp_score < 50: + notes.append({"label": "市场温度一般", "value": "最高-12%", "reason": f"温度{market_temp_score:.0f}"}) + if theme_penalty < 1: + label = "未匹配主线" if not hot_theme_match else "非前排主线" + notes.append({"label": label, "value": f"-{round((1 - theme_penalty) * 100)}%", "reason": "主题地位不足"}) + if not notes and penalties: + notes.append({"label": "风险折扣", "value": f"-{round((1 - min(penalties)) * 100)}%", "reason": "存在风险项"}) + return notes[:4] + + +def _build_decision_trace( + stock: dict, + score: float, + score_weights: dict[str, float], + scoring_axes: dict[str, float], + flow_momentum_score: float, + supply_demand_score: float, + price_action_score: float, + trend_score: float, + capital_score: float, + position_score: float, + valuation_score: float, + entry_signal_type: str, + entry_signal_score: float, + signal_matches_profile: bool, + sector_stage: str, + sector_limit_up: int, + catalyst_score: float, + catalyst_reasons: list[str], + market_temp: MarketTemperature, + trade_plan: dict, + boosts: list[dict], + penalties: list[dict], + risk_tags: list[str], + hot_theme_match: SectorInfo | None, +) -> dict: + tags = stock.get("recall_tags", []) or [] + headline = _build_decision_headline( + stock=stock, + action_plan=trade_plan.get("action_plan", "观察"), + entry_signal_type=entry_signal_type, + hot_theme_match=hot_theme_match, + score=score, + ) + score_breakdown = [ + { + "key": "catalyst", + "label": "新闻催化", + "score": round(scoring_axes.get("catalyst", 0), 1), + "weight": round(score_weights.get("catalyst", 0), 2), + }, + { + "key": "theme_money", + "label": "主题资金", + "score": round(scoring_axes.get("theme_money", 0), 1), + "weight": round(score_weights.get("theme_money", 0), 2), + }, + { + "key": "stock_money", + "label": "个股资金", + "score": round(scoring_axes.get("stock_money", 0), 1), + "weight": round(score_weights.get("stock_money", 0), 2), + }, + { + "key": "emotion_role", + "label": "情绪角色", + "score": round(scoring_axes.get("emotion_role", 0), 1), + "weight": round(score_weights.get("emotion_role", 0), 2), + }, + { + "key": "timing", + "label": "入场时机", + "score": round(scoring_axes.get("timing", 0), 1), + "weight": round(score_weights.get("timing", 0), 2), + }, + ] + evidence = _build_trace_evidence( + tags=tags, + main_net=float(stock.get("main_net_inflow", 0) or 0), + inflow_ratio=float(stock.get("inflow_ratio", 0) or 0), + sector_limit_up=sector_limit_up, + entry_signal_type=entry_signal_type, + entry_signal_score=entry_signal_score, + signal_matches_profile=signal_matches_profile, + hot_theme_match=hot_theme_match, + catalyst_score=catalyst_score, + catalyst_reasons=catalyst_reasons, + ) + return { + "version": 1, + "headline": headline, + "action_plan": trade_plan.get("action_plan", "观察"), + "final_score": round(score, 1), + "route_tags": tags, + "evidence": evidence, + "score_breakdown": score_breakdown, + "aux_scores": { + "flow_momentum": round(flow_momentum_score, 1), + "supply_demand": round(supply_demand_score, 1), + "price_action": round(price_action_score, 1), + "trend": round(trend_score, 1), + "capital": round(capital_score, 1), + "position": round(position_score, 1), + "valuation": round(valuation_score, 1), + }, + "context": { + "market_temperature": round(market_temp.temperature, 1), + "sector_stage": sector_stage, + "sector_limit_up": sector_limit_up, + "theme_matched": bool(hot_theme_match), + "theme_name": hot_theme_match.sector_name if hot_theme_match else "", + }, + "catalyst": { + "score": round(catalyst_score, 1), + "reasons": catalyst_reasons[:3], + }, + "boosts": boosts[:4], + "penalties": penalties[:4], + "risk_tags": risk_tags, + "llm_adjustment": None, + } + + +def _build_decision_headline( + stock: dict, + action_plan: str, + entry_signal_type: str, + hot_theme_match: SectorInfo | None, + score: float, +) -> str: + role = stock.get("stock_role_hint") or "候选标的" + theme = hot_theme_match.sector_name if hot_theme_match else stock.get("sector", "") + signal_label = { + "breakout": "突破", + "breakout_confirm": "突破确认", + "pullback": "回踩", + "launch": "启动", + "reversal": "反转", + "flow_momentum": "资金顺势", + "none": "观察", + }.get(entry_signal_type, entry_signal_type or "观察") + if action_plan == "可操作": + prefix = "可操作" + elif action_plan == "重点关注": + prefix = "重点关注" + else: + prefix = "观察" + theme_part = f"{theme}内" if theme else "" + return f"{prefix}: {theme_part}{role},{signal_label}线索,综合分{score:.0f}" + + +def _build_trace_evidence( + tags: list[str], + main_net: float, + inflow_ratio: float, + sector_limit_up: int, + entry_signal_type: str, + entry_signal_score: float, + signal_matches_profile: bool, + hot_theme_match: SectorInfo | None, + catalyst_score: float = 0, + catalyst_reasons: list[str] | None = None, +) -> list[str]: + evidence: list[str] = [] + if hot_theme_match: + evidence.append(f"匹配主线主题 {hot_theme_match.sector_name}") + if catalyst_score > 0: + reason = (catalyst_reasons or [""])[0] + suffix = f": {reason}" if reason else "" + evidence.append(f"新闻/政策催化分 {catalyst_score:.0f}{suffix}") + if tags: + evidence.append("召回来源: " + " / ".join(tags[:3])) + if main_net > 0: + evidence.append(f"主力净流入 {main_net:.0f} 万,占比 {inflow_ratio:.1f}%") + if sector_limit_up > 0: + evidence.append(f"板块涨停扩散 {sector_limit_up} 家") + if entry_signal_type and entry_signal_type != "none": + evidence.append(f"入场信号 {entry_signal_type},信号分 {entry_signal_score:.0f}") + if signal_matches_profile: + evidence.append("符合今日策略偏好的入场类型") + return evidence[:5] + + +def _apply_llm_trace( + rec: Recommendation, + verdict: str, + action_plan: str, + conviction: float, + reason: str, +) -> None: + trace = dict(rec.decision_trace or {}) + trace["llm_adjustment"] = { + "verdict": verdict, + "action_plan": action_plan, + "conviction": round(conviction, 1), + "reason": str(reason or "")[:180], + } + trace["action_plan"] = action_plan + if verdict == "execute": + trace["headline"] = f"AI确认可执行: {trace.get('headline', rec.name)}" + elif verdict == "skip": + trace["headline"] = f"AI降级观察: {trace.get('headline', rec.name)}" + rec.decision_trace = trace + + def _build_focus_points( stock: dict, entry_signal: dict, diff --git a/backend/app/llm/strategy_board.py b/backend/app/llm/strategy_board.py index 2349d9c6..7c775a70 100644 --- a/backend/app/llm/strategy_board.py +++ b/backend/app/llm/strategy_board.py @@ -6,8 +6,6 @@ import logging -from app.analysis.sector_realtime import enrich_sectors_with_realtime -from app.analysis.sector_realtime import get_today_realtime_sector_board from app.config import settings, should_prefer_realtime_today, today_trade_date from app.data.models import ( MarketTemperature, @@ -62,12 +60,6 @@ async def build_strategy_board(include_llm: bool = False) -> dict: market_temp = latest.get("market_temp") recommendations = latest.get("recommendations", []) sectors = await get_latest_sectors() - snapshot_trade_date = sectors[0].trade_date if sectors else "" - if should_prefer_realtime_today(snapshot_trade_date) or snapshot_trade_date != today_trade_date(): - realtime_sectors = await get_today_realtime_sector_board(limit=20) - sectors = realtime_sectors or await enrich_sectors_with_realtime(sectors) - else: - sectors = await enrich_sectors_with_realtime(sectors) performance = await get_performance_stats() from app.llm.strategy_iteration import build_strategy_iteration_report iteration_report = await build_strategy_iteration_report(limit=50, include_llm=include_llm) diff --git a/backend/app/llm/strategy_selector.py b/backend/app/llm/strategy_selector.py index 02158a6a..ebbaf3f6 100644 --- a/backend/app/llm/strategy_selector.py +++ b/backend/app/llm/strategy_selector.py @@ -50,7 +50,7 @@ def get_strategy_profile_by_id(strategy_id: str) -> StrategyProfile: name="主线突破", description="市场偏强,优先寻找主线板块内的突破和突破确认。", entry_signal_priority=["breakout", "breakout_confirm", "launch", "pullback", "reversal"], - score_weights={"capital_momentum": 0.30, "supply_demand": 0.30, "price_action": 0.25, "trend": 0.15}, + score_weights={"catalyst": 0.30, "theme_money": 0.25, "stock_money": 0.20, "emotion_role": 0.15, "timing": 0.10}, min_score=62, buy_threshold=66, max_position_pct=30, @@ -67,7 +67,7 @@ def get_strategy_profile_by_id(strategy_id: str) -> StrategyProfile: name="回踩轮动", description="市场震荡分化,优先做回踩支撑和板块轮动中的低吸确认。", entry_signal_priority=["pullback", "breakout_confirm", "launch", "breakout", "reversal"], - score_weights={"capital_momentum": 0.28, "supply_demand": 0.30, "price_action": 0.22, "trend": 0.20}, + score_weights={"catalyst": 0.25, "theme_money": 0.28, "stock_money": 0.20, "emotion_role": 0.12, "timing": 0.15}, min_score=60, buy_threshold=63, max_position_pct=20, @@ -84,7 +84,7 @@ def get_strategy_profile_by_id(strategy_id: str) -> StrategyProfile: name="启动试错", description="市场偏弱,适合少量观察启动型和反转型机会,不做强追涨。", entry_signal_priority=["launch", "reversal", "pullback", "breakout_confirm", "breakout"], - score_weights={"capital_momentum": 0.32, "supply_demand": 0.28, "price_action": 0.25, "trend": 0.15}, + score_weights={"catalyst": 0.28, "theme_money": 0.22, "stock_money": 0.20, "emotion_role": 0.12, "timing": 0.18}, min_score=58, buy_threshold=61, max_position_pct=10, @@ -101,7 +101,7 @@ def get_strategy_profile_by_id(strategy_id: str) -> StrategyProfile: name="防守观察", description="市场退潮,系统以观察池为主,不主动扩大出手。", entry_signal_priority=["pullback", "launch", "reversal", "breakout_confirm", "breakout"], - score_weights={"capital_momentum": 0.35, "supply_demand": 0.25, "price_action": 0.25, "trend": 0.15}, + score_weights={"catalyst": 0.22, "theme_money": 0.25, "stock_money": 0.18, "emotion_role": 0.15, "timing": 0.20}, min_score=56, buy_threshold=64, max_position_pct=5, diff --git a/backend/app/main.py b/backend/app/main.py index 524231be..325d77f9 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -11,7 +11,7 @@ from app.config import settings from app.db.error_logger import log_error from app.db.database import init_db from app.engine.scheduler import start_scheduler, stop_scheduler -from app.api import market, sectors, recommendations, stocks, watchlists, websocket, chat, auth, debug +from app.api import market, sectors, recommendations, stocks, watchlists, websocket, chat, auth, debug, catalysts def configure_logging() -> None: logging.basicConfig( @@ -144,6 +144,7 @@ app.include_router(watchlists.router) app.include_router(chat.router) app.include_router(auth.router) app.include_router(debug.router) +app.include_router(catalysts.router) # WebSocket app.websocket("/ws")(websocket.ws_endpoint) diff --git a/frontend/src/app/(auth)/chat/page.tsx b/frontend/src/app/(auth)/chat/page.tsx index 6ca5d941..ffb97af5 100644 --- a/frontend/src/app/(auth)/chat/page.tsx +++ b/frontend/src/app/(auth)/chat/page.tsx @@ -27,8 +27,8 @@ const CHAT_SCENES = [ description: "诊断 / 触发 / 失效", }, { - title: "系统", - description: "推荐池 / 自选股 / 校准", + title: "自选", + description: "推荐池 / 自选股 / 复盘", }, ]; @@ -108,9 +108,9 @@ export default function ChatPage() {