diff --git a/backend/.env b/backend/.env index 1ee376ed..dac3bc55 100644 --- a/backend/.env +++ b/backend/.env @@ -1,7 +1,7 @@ ASTOCK_TUSHARE_TOKEN=0ed6419a00d8923dc19c0b58fc92d94c9a0696949ab91a13aa58a0cc ASTOCK_DEBUG=true -ASTOCK_DEEPSEEK_API_KEY=sk-9f6b56f08796435d988cf202e37f6ee3 +ASTOCK_DEEPSEEK_API_KEY=sk-ee8eee63d5cf41eba14a328de49055ac ASTOCK_ALERT_ENABLED=true ASTOCK_FEISHU_WEBHOOK_URL=https://open.feishu.cn/open-apis/bot/v2/hook/6307668f-10aa-4fc1-8c1e-bad1b6b78d4d ASTOCK_ALERT_ENVIRONMENT=local diff --git a/backend/app/api/catalysts.py b/backend/app/api/catalysts.py index 9d7f51f6..33b6f773 100644 --- a/backend/app/api/catalysts.py +++ b/backend/app/api/catalysts.py @@ -3,8 +3,14 @@ from fastapi import APIRouter, Depends from app.catalyst.models import CatalystInput -from app.catalyst.service import build_theme_catalyst_scores, get_recent_catalysts, ingest_catalyst +from app.catalyst.service import ( + build_theme_catalyst_scores, + get_recent_catalysts, + get_recent_news_items, + ingest_catalyst, +) from app.core.deps import get_current_admin +from app.news.pipeline import refresh_news_catalysts router = APIRouter(prefix="/api/catalysts", tags=["catalysts"]) @@ -14,6 +20,11 @@ async def recent(limit: int = 30, hours: int = 72): return await get_recent_catalysts(limit=limit, hours=hours) +@router.get("/news") +async def news(limit: int = 50, hours: int = 24, status: str | None = None): + return await get_recent_news_items(limit=limit, hours=hours, status=status) + + @router.get("/theme-scores") async def theme_scores(hours: int = 72, limit: int = 20): scores = await build_theme_catalyst_scores(hours=hours, limit=limit) @@ -24,3 +35,8 @@ async def theme_scores(hours: int = 72, limit: int = 20): async def ingest(item: CatalystInput, _admin: dict = Depends(get_current_admin)): analysis = await ingest_catalyst(item, use_llm=True) return analysis.model_dump() + + +@router.post("/refresh-news") +async def refresh_news(_admin: dict = Depends(get_current_admin)): + return await refresh_news_catalysts() diff --git a/backend/app/api/stocks.py b/backend/app/api/stocks.py index 0eed9601..b5f688e6 100644 --- a/backend/app/api/stocks.py +++ b/backend/app/api/stocks.py @@ -10,6 +10,7 @@ from starlette.responses import StreamingResponse from app.data.tushare_client import tushare_client from app.data import tencent_client +from app.data.code_utils import normalize_ts_code from app.analysis.technical import add_all_indicators from app.analysis.signals import generate_signals from app.db.database import get_db @@ -38,6 +39,7 @@ async def get_stock_thesis(ts_code: str): """获取个股推荐推演归档(只读缓存,不触发扫描或 LLM)。""" from sqlalchemy import text + ts_code = normalize_ts_code(ts_code) async with get_db() as db: rec_result = await db.execute( text( @@ -139,7 +141,6 @@ async def get_stock_thesis(ts_code: str): decision_points = [ {"label": "操作计划", "value": r["action_plan"] or "观察"}, {"label": "召回来源", "value": " / ".join(_safe_json_list(r.get("recall_tags"))) or "未归档"}, - {"label": "AI预筛", "value": r.get("prefilter_decision") or "未执行"}, {"label": "触发条件", "value": r["trigger_condition"] or "等待触发条件归档"}, {"label": "失效条件", "value": r["invalidation_condition"] or "等待失效条件归档"}, {"label": "建议仓位", "value": f"{r['suggested_position_pct']}%" if r["suggested_position_pct"] is not None else "未设置"}, @@ -220,6 +221,7 @@ async def get_stock_thesis(ts_code: str): @router.get("/{ts_code}/quote") async def get_quote(ts_code: str): """获取个股实时行情""" + ts_code = normalize_ts_code(ts_code) quote = await tencent_client.get_realtime_quote(ts_code) if not quote: return {"error": "获取行情失败"} @@ -229,6 +231,7 @@ async def get_quote(ts_code: str): @router.get("/{ts_code}/kline") async def get_kline(ts_code: str, days: int = 120): """获取个股K线数据(含技术指标)""" + ts_code = normalize_ts_code(ts_code) df = tushare_client.get_stock_daily(ts_code, days=days) if df.empty: return [] @@ -247,6 +250,7 @@ async def get_kline(ts_code: str, days: int = 120): @router.get("/{ts_code}/signals") async def get_signals(ts_code: str): """获取个股技术面买卖信号""" + ts_code = normalize_ts_code(ts_code) signal = generate_signals(ts_code) return signal.model_dump() @@ -254,6 +258,7 @@ async def get_signals(ts_code: str): @router.get("/{ts_code}/capital_flow") async def get_capital_flow(ts_code: str, days: int = 10): """获取个股资金流向(含大/中/小单分拆)""" + ts_code = normalize_ts_code(ts_code) df = tushare_client.get_stock_moneyflow(ts_code, days=days) if df.empty: return [] @@ -287,6 +292,7 @@ async def get_capital_flow(ts_code: str, days: int = 10): @router.get("/{ts_code}/diagnose/history") async def get_diagnose_history(ts_code: str): """获取个股最近5次诊断历史""" + ts_code = normalize_ts_code(ts_code) try: from sqlalchemy import text async with get_db() as db: @@ -321,6 +327,7 @@ async def get_diagnose_history(ts_code: str): @router.post("/{ts_code}/diagnose") async def diagnose_stock(ts_code: str, mode: str = Query("entry")): """AI 诊断个股(SSE 流式返回)""" + ts_code = normalize_ts_code(ts_code) from app.config import settings if not settings.deepseek_api_key: return {"status": "error", "message": "未配置 LLM API Key"} @@ -484,7 +491,7 @@ async def diagnose_stock(ts_code: str, mode: str = Query("entry")): "capital_score, technical_score, position_score, sector, signal, entry_signal_type " "FROM recommendations " "WHERE ts_code = :code " - "AND (action_plan IN ('可操作', '重点关注') OR COALESCE(llm_score, 0) >= 6 OR score >= 56) " + "AND (action_plan IN ('可操作', '重点关注') OR score >= 56) " "ORDER BY created_at DESC LIMIT 1" ), {"code": ts_code}, diff --git a/backend/app/catalyst/service.py b/backend/app/catalyst/service.py index b1791f16..31e12bd8 100644 --- a/backend/app/catalyst/service.py +++ b/backend/app/catalyst/service.py @@ -2,6 +2,7 @@ from __future__ import annotations +import hashlib import logging from datetime import datetime, timedelta @@ -11,6 +12,7 @@ from app.catalyst.mapper import analyze_catalyst from app.catalyst.models import CatalystAnalysis, CatalystInput, ThemeCatalystScore from app.db import tables from app.db.database import get_db +from app.news.models import NewsItem logger = logging.getLogger(__name__) @@ -21,8 +23,26 @@ async def ingest_catalyst(item: CatalystInput, use_llm: bool = True) -> Catalyst return analysis +async def ingest_catalyst_with_id(item: CatalystInput, use_llm: bool = True) -> tuple[CatalystAnalysis, int]: + analysis = await analyze_catalyst(item, use_llm=use_llm) + catalyst_id = await save_catalyst(analysis) + return analysis, catalyst_id + + async def save_catalyst(analysis: CatalystAnalysis) -> int: async with get_db() as db: + if analysis.url: + exists = await db.execute( + text( + "SELECT id FROM catalysts WHERE source = :source AND url = :url " + "ORDER BY id DESC LIMIT 1" + ), + {"source": analysis.source, "url": analysis.url}, + ) + row = exists.fetchone() + if row: + return int(row._mapping["id"]) + result = await db.execute( tables.catalysts_table.insert().values( title=analysis.title, @@ -58,6 +78,146 @@ async def save_catalyst(analysis: CatalystAnalysis) -> int: return catalyst_id +async def ingest_news_items(items: list[NewsItem]) -> dict: + """保存原始新闻并分析新增项。""" + inserted = 0 + duplicates = 0 + analyzed = 0 + failed = 0 + + async with get_db() as db: + for item in items: + dedup_key = build_news_dedup_key(item) + exists = await db.execute( + text("SELECT id FROM news_items WHERE dedup_key = :dedup_key LIMIT 1"), + {"dedup_key": dedup_key}, + ) + if exists.fetchone(): + duplicates += 1 + continue + + await db.execute( + tables.news_items_table.insert().values( + title=item.title, + content=item.content, + summary=item.summary, + source=item.source, + url=item.url, + published_at=item.published_at, + dedup_key=dedup_key, + status="pending", + ) + ) + inserted += 1 + await db.commit() + + if inserted: + result = await analyze_pending_news(limit=inserted) + analyzed += int(result.get("analyzed", 0)) + failed += int(result.get("failed", 0)) + + return { + "fetched": len(items), + "inserted": inserted, + "duplicates": duplicates, + "analyzed": analyzed, + "failed": failed, + } + + +async def analyze_pending_news(limit: int = 50, use_llm: bool = True) -> dict: + rows = [] + async with get_db() as db: + result = await db.execute( + text( + "SELECT * FROM news_items " + "WHERE status = 'pending' " + "ORDER BY COALESCE(published_at, created_at) DESC, id DESC " + "LIMIT :limit" + ), + {"limit": limit}, + ) + rows = [dict(row._mapping) for row in result.fetchall()] + + analyzed = 0 + skipped = 0 + failed = 0 + + for row in rows: + title = row.get("title") or "" + content = row.get("content") or row.get("summary") or "" + try: + if not _looks_market_relevant(title, content): + await _mark_news_item(row["id"], status="skipped", error="") + skipped += 1 + continue + + _, catalyst_id = await ingest_catalyst_with_id( + CatalystInput( + title=title, + content=content, + source=row.get("source") or "news", + url=row.get("url") or "", + published_at=row.get("published_at"), + ), + use_llm=use_llm, + ) + await _mark_news_item( + row["id"], + status="analyzed", + catalyst_id=catalyst_id, + error="", + ) + analyzed += 1 + except Exception as e: + logger.warning("新闻催化分析失败 id=%s title=%s error=%s", row.get("id"), title, e) + await _mark_news_item(row["id"], status="failed", error=str(e)[:500]) + failed += 1 + + return {"analyzed": analyzed, "skipped": skipped, "failed": failed} + + +def build_news_dedup_key(item: NewsItem) -> str: + text = f"{item.source}|{item.url or item.title}" + normalized = "".join(ch.lower() for ch in text.strip() if ch.isalnum() or ch in ".:/_-|") + return hashlib.sha256(normalized.encode("utf-8")).hexdigest() + + +async def _mark_news_item( + news_id: int, + status: str, + catalyst_id: int | None = None, + error: str = "", +) -> None: + async with get_db() as db: + await db.execute( + text( + "UPDATE news_items SET status = :status, catalyst_id = :catalyst_id, " + "error = :error, analyzed_at = :analyzed_at WHERE id = :id" + ), + { + "status": status, + "catalyst_id": catalyst_id, + "error": error, + "analyzed_at": datetime.now(), + "id": news_id, + }, + ) + await db.commit() + + +def _looks_market_relevant(title: str, content: str) -> bool: + text_value = f"{title} {content}" + keywords = [ + "A股", "股市", "上市公司", "证券", "券商", "板块", "行业", "概念", + "政策", "国务院", "发改委", "工信部", "央行", "证监会", "交易所", + "业绩", "订单", "签约", "并购", "重组", "回购", "定增", "涨停", + "资金", "主力", "北向", "算力", "人工智能", "半导体", "新能源", + "机器人", "低空", "医药", "消费", "军工", "地产", "有色", + ] + return any(keyword in text_value for keyword in keywords) + + async def get_recent_catalysts(limit: int = 30, hours: int = 72) -> list[dict]: since = datetime.now() - timedelta(hours=hours) async with get_db() as db: @@ -78,6 +238,27 @@ async def get_recent_catalysts(limit: int = 30, hours: int = 72) -> list[dict]: return [dict(row) for row in rows] +async def get_recent_news_items(limit: int = 50, hours: int = 24, status: str | None = None) -> list[dict]: + since = datetime.now() - timedelta(hours=hours) + conditions = ["COALESCE(published_at, created_at) >= :since"] + params = {"since": since, "limit": limit} + if status: + conditions.append("status = :status") + params["status"] = status + + async with get_db() as db: + result = await db.execute( + text( + "SELECT id, title, source, url, published_at, status, catalyst_id, error, created_at, analyzed_at " + "FROM news_items WHERE " + + " AND ".join(conditions) + + " ORDER BY COALESCE(published_at, created_at) DESC, id DESC LIMIT :limit" + ), + params, + ) + return [dict(row._mapping) for row in result.fetchall()] + + async def build_theme_catalyst_scores(hours: int = 72, limit: int = 20) -> list[ThemeCatalystScore]: since = datetime.now() - timedelta(hours=hours) async with get_db() as db: diff --git a/backend/app/config.py b/backend/app/config.py index 27edf2c0..9a6985de 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -31,9 +31,6 @@ class Settings(BaseSettings): top_sector_count: int = 5 # 关注板块数量 top_stock_count: int = 6 # 最终推荐输出上限 candidate_pool_limit: int = 90 # 多路召回后的候选池上限 - llm_prefilter_limit: int = 24 # LLM 初筛保留数量 - llm_prefilter_max_concurrent: int = 6 - llm_final_limit: int = 10 # LLM 深裁决池上限 actionable_limit: int = 3 # 最多可操作标的 watch_limit: int = 5 # 最多重点关注标的 min_turnover_rate: float = 2.0 # 最小换手率 % @@ -54,6 +51,16 @@ class Settings(BaseSettings): pullback_max_shrink_ratio: float = 0.85 # 回踩型最大缩量比 consolidation_max_range_pct: float = 8.0 # 启动型最大整理振幅 % + # 新闻/政策催化采集 + news_collection_enabled: bool = True + news_tushare_sources: str = "sina,eastmoney,10jqka,wallstreetcn" + news_tushare_sources_per_run: int = 1 + news_rss_sources: str = "" # name|url,name|url + news_fetch_lookback_hours: int = 24 + news_fetch_limit_per_source: int = 30 + news_analyze_limit_per_run: int = 50 + news_min_title_length: int = 8 + # LLM (DeepSeek) deepseek_api_key: str = "" deepseek_base_url: str = "https://api.deepseek.com/v1" diff --git a/backend/app/data/code_utils.py b/backend/app/data/code_utils.py new file mode 100644 index 00000000..8654fe2a --- /dev/null +++ b/backend/app/data/code_utils.py @@ -0,0 +1,54 @@ +"""股票代码规范化工具。""" + +from __future__ import annotations + +import re + +SUPPORTED_MARKETS = {"SH", "SZ", "BJ"} + + +def normalize_ts_code(value: str | None) -> str: + """把常见 A 股代码输入规范成 Tushare 格式。 + + 支持: + - 603779 -> 603779.SH + - 300750 -> 300750.SZ + - 430047 -> 430047.BJ + - sh600519 / sz000001 / bj430047 + - 600519.sh -> 600519.SH + """ + text = str(value or "").strip().upper() + if not text: + return "" + + compact_match = re.fullmatch(r"(SH|SZ|BJ)(\d{6})", text) + if compact_match: + market, code = compact_match.groups() + return f"{code}.{market}" + + dotted_match = re.fullmatch(r"(\d{6})\.(SH|SZ|BJ)", text) + if dotted_match: + code, market = dotted_match.groups() + return f"{code}.{market}" + + if re.fullmatch(r"\d{6}", text): + if text.startswith(("6", "9")): + return f"{text}.SH" + if text.startswith(("0", "2", "3")): + return f"{text}.SZ" + if text.startswith(("4", "8")): + return f"{text}.BJ" + + return text + + +def split_ts_code(value: str | None) -> tuple[str, str]: + """返回 (code, market),无法识别时抛出带上下文的 ValueError。""" + normalized = normalize_ts_code(value) + if "." not in normalized: + raise ValueError(f"无效股票代码格式: {value!r}") + + code, market = normalized.split(".", 1) + if not re.fullmatch(r"\d{6}", code) or market not in SUPPORTED_MARKETS: + raise ValueError(f"无效股票代码格式: {value!r}") + return code, market diff --git a/backend/app/data/eastmoney_client.py b/backend/app/data/eastmoney_client.py index 7df4271b..9e9d30e9 100644 --- a/backend/app/data/eastmoney_client.py +++ b/backend/app/data/eastmoney_client.py @@ -17,6 +17,7 @@ from datetime import datetime from app.data.cache import cache from app.config import settings +from app.data.code_utils import split_ts_code from app.db.error_logger import log_error logger = logging.getLogger(__name__) @@ -57,8 +58,8 @@ def _mark_rows_status(rows: list[dict], status: str, detail: str) -> list[dict]: def _ts_code_to_eastmoney(ts_code: str) -> str: - """600519.SH -> 1.600519 (上海=1, 深圳=0)""" - code, market = ts_code.split(".") + """600519.SH -> 1.600519 (上海=1, 深圳/北交所=0)""" + code, market = split_ts_code(ts_code) prefix = "1" if market == "SH" else "0" return f"{prefix}.{code}" diff --git a/backend/app/data/sina_client.py b/backend/app/data/sina_client.py index 00b66824..b6a89db9 100644 --- a/backend/app/data/sina_client.py +++ b/backend/app/data/sina_client.py @@ -16,6 +16,7 @@ import httpx import pandas as pd from app.data.cache import cache +from app.data.code_utils import split_ts_code from app.data.models import StockQuote logger = logging.getLogger(__name__) @@ -29,7 +30,7 @@ HEADERS = { def _ts_code_to_sina(ts_code: str) -> str: - code, market = ts_code.split(".") + code, market = split_ts_code(ts_code) return f"{market.lower()}{code}" diff --git a/backend/app/data/tencent_client.py b/backend/app/data/tencent_client.py index 8b990d00..f28ad34a 100644 --- a/backend/app/data/tencent_client.py +++ b/backend/app/data/tencent_client.py @@ -8,6 +8,7 @@ import logging import httpx from app.data.cache import cache from app.config import settings +from app.data.code_utils import normalize_ts_code, split_ts_code from app.data.models import StockQuote from app.db.error_logger import log_error @@ -22,7 +23,7 @@ HEADERS = { def _ts_code_to_tencent(ts_code: str) -> str: """600519.SH -> sh600519""" - code, market = ts_code.split(".") + code, market = split_ts_code(ts_code) return f"{market.lower()}{code}" @@ -67,6 +68,7 @@ def _parse_tencent_response(raw: str) -> dict | None: async def get_realtime_quote(ts_code: str) -> StockQuote | None: """获取单只股票实时行情""" + ts_code = normalize_ts_code(ts_code) cache_key = f"rt_quote:{ts_code}" cached = cache.get(cache_key) if cached is not None: @@ -118,6 +120,7 @@ async def get_realtime_quotes_batch(ts_codes: list[str]) -> dict[str, StockQuote """批量获取实时行情(腾讯支持逗号拼接多只)""" results = {} uncached = [] + ts_codes = [normalize_ts_code(code) for code in ts_codes if normalize_ts_code(code)] for code in ts_codes: cached = cache.get(f"rt_quote:{code}") @@ -191,6 +194,7 @@ async def get_index_realtime(index_codes: list[str] = None) -> dict[str, dict]: """ if not index_codes: index_codes = ["000001.SH", "399001.SZ", "399006.SZ"] + index_codes = [normalize_ts_code(code) for code in index_codes if normalize_ts_code(code)] results = {} symbols = ",".join(_ts_code_to_tencent(c) for c in index_codes) diff --git a/backend/app/data/tushare_client.py b/backend/app/data/tushare_client.py index 178e07c1..f2218e56 100644 --- a/backend/app/data/tushare_client.py +++ b/backend/app/data/tushare_client.py @@ -265,5 +265,43 @@ class TushareClient: ) ) + # ── 新闻快讯 ── + + def get_news( + self, + source: str, + start_time: datetime, + end_time: datetime, + limit: int = 30, + ) -> pd.DataFrame: + """获取 Tushare 新闻快讯。 + + Tushare news 接口不同账号权限不一,失败时返回空 DataFrame, + 不阻断其他新闻源或推荐流程。该接口常见限频为 1 次/分钟或 + 2 次/小时,因此这里不复用通用重试逻辑,避免失败重试继续消耗配额。 + """ + cache_key = f"news:{source}:{start_time:%Y%m%d%H}:{end_time:%Y%m%d%H}:{limit}" + cached = cache.get(cache_key) + if cached is not None: + return cached + + try: + self._ensure_init() + self._rate_limit() + result = self.pro.news( + src=source, + start_date=start_time.strftime("%Y-%m-%d %H:%M:%S"), + end_date=end_time.strftime("%Y-%m-%d %H:%M:%S"), + limit=limit, + ) + if result is None or result.empty: + return pd.DataFrame() + cache.set(cache_key, result, 600) + return result + except Exception as e: + logger.warning("Tushare 新闻请求失败 source=%s: %s", source, e) + log_error_background("tushare_news", f"Tushare 新闻请求失败 source={source}: {e}") + return pd.DataFrame() + tushare_client = TushareClient() diff --git a/backend/app/db/database.py b/backend/app/db/database.py index 2527baec..5c00bcd2 100644 --- a/backend/app/db/database.py +++ b/backend/app/db/database.py @@ -98,6 +98,8 @@ async def init_db(): "ALTER TABLE strategy_configs ADD COLUMN effective_from DATETIME DEFAULT CURRENT_TIMESTAMP", "ALTER TABLE prompt_configs ADD COLUMN evidence_json TEXT DEFAULT '{}'", "ALTER TABLE strategy_config_changes ADD COLUMN prompt_key TEXT DEFAULT ''", + "ALTER TABLE news_items ADD COLUMN summary TEXT DEFAULT ''", + "ALTER TABLE news_items ADD COLUMN error TEXT DEFAULT ''", "ALTER TABLE catalysts ADD COLUMN llm_reason TEXT DEFAULT ''", ]: try: @@ -107,6 +109,16 @@ async def init_db(): except Exception: pass # 列已存在,忽略 + for index_sql in [ + "CREATE UNIQUE INDEX IF NOT EXISTS idx_news_items_dedup_key ON news_items(dedup_key)", + "CREATE INDEX IF NOT EXISTS idx_news_items_status_time ON news_items(status, published_at)", + "CREATE INDEX IF NOT EXISTS idx_catalysts_source_url ON catalysts(source, url)", + ]: + try: + await conn.execute(__import__("sqlalchemy").text(index_sql)) + except Exception: + pass + try: await conn.execute( __import__("sqlalchemy").text( diff --git a/backend/app/db/tables.py b/backend/app/db/tables.py index d28cbaca..1245dcd3 100644 --- a/backend/app/db/tables.py +++ b/backend/app/db/tables.py @@ -240,6 +240,23 @@ strategy_config_changes_table = Table( Column("applied_at", DateTime), ) +news_items_table = Table( + "news_items", metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("title", Text, nullable=False), + Column("content", Text, default=""), + Column("summary", Text, default=""), + Column("source", Text, default=""), + Column("url", Text, default=""), + Column("published_at", DateTime), + Column("dedup_key", Text, nullable=False, unique=True), + Column("status", Text, default="pending"), # pending / analyzed / skipped / failed + Column("catalyst_id", Integer), + Column("error", Text, default=""), + Column("created_at", DateTime, server_default=func.now()), + Column("analyzed_at", DateTime), +) + catalysts_table = Table( "catalysts", metadata, Column("id", Integer, primary_key=True, autoincrement=True), diff --git a/backend/app/engine/recommender.py b/backend/app/engine/recommender.py index f1968389..e695ee97 100644 --- a/backend/app/engine/recommender.py +++ b/backend/app/engine/recommender.py @@ -577,7 +577,7 @@ async def get_recommendation_history(days: int = 7) -> list[dict]: " ) lt ON t.id = lt.max_id" ") latest_t ON latest_t.recommendation_id = r.id " "WHERE r.created_at >= :start " - "AND (r.action_plan IN ('可操作', '重点关注') OR COALESCE(r.llm_score, 0) >= 6 OR r.score >= 56) " + "AND (r.action_plan IN ('可操作', '重点关注') OR r.score >= 56) " "AND (" " COALESCE(r.recall_tags, '[]') LIKE '%hot_theme_core%' " " OR COALESCE(r.recall_tags, '[]') LIKE '%theme_leader%' " @@ -810,7 +810,6 @@ async def _save_to_db(result: dict): rec for rec in result.get("recommendations", []) if ( rec.action_plan in {"可操作", "重点关注"} - or (rec.llm_score is not None and rec.llm_score >= 6) or rec.score >= 56 ) ] @@ -916,7 +915,7 @@ async def _load_today_from_db() -> dict: result = await db.execute( text("SELECT * FROM recommendations " "WHERE date(created_at) = (SELECT date(created_at) FROM recommendations ORDER BY created_at DESC LIMIT 1) " - "AND (action_plan IN ('可操作', '重点关注') OR COALESCE(llm_score, 0) >= 6 OR score >= 56) " + "AND (action_plan IN ('可操作', '重点关注') OR score >= 56) " "AND (" " COALESCE(recall_tags, '[]') LIKE '%hot_theme_core%' " " OR COALESCE(recall_tags, '[]') LIKE '%theme_leader%' " diff --git a/backend/app/engine/scheduler.py b/backend/app/engine/scheduler.py index 865f8fef..67bbc3dd 100644 --- a/backend/app/engine/scheduler.py +++ b/backend/app/engine/scheduler.py @@ -39,6 +39,26 @@ async def _run_scan(session_name: str): await log_error("scheduler", f"定时扫描失败 ({session_name}): {e}", detail=traceback.format_exc()) +async def _run_news_refresh(session_name: str = "scheduled"): + """后台采集新闻并更新主题催化分。""" + logger.info("=== 新闻催化刷新: %s ===", session_name) + try: + from app.news.pipeline import refresh_news_catalysts + + result = await refresh_news_catalysts() + await broadcast_update({ + "type": "news_catalysts_ready", + "session": session_name, + "inserted": result.get("inserted", 0), + "analyzed": result.get("analyzed", 0), + "timestamp": datetime.now().isoformat(), + }) + except Exception as e: + logger.error(f"新闻催化刷新失败 ({session_name}): {e}") + from app.db.error_logger import log_error + await log_error("scheduler", f"新闻催化刷新失败 ({session_name}): {e}", detail=traceback.format_exc()) + + async def _run_watchlist_analysis(): """收盘后自动分析所有用户自选股。""" logger.info("=== 开始自选股定时分析 ===") @@ -77,6 +97,22 @@ async def _run_strategy_iteration(): def setup_scheduler(): """配置所有定时任务(交易日时间)""" + news_schedule = [ + ("news_pre_market", 8, 50, "pre_market"), + ("news_morning", 10, 5, "morning"), + ("news_noon", 12, 45, "noon"), + ("news_afternoon", 13, 55, "afternoon"), + ("news_post_market", 15, 40, "post_market"), + ] + for job_id, hour, minute, session_name in news_schedule: + scheduler.add_job( + _run_news_refresh, + CronTrigger(hour=hour, minute=minute, day_of_week="mon-fri"), + args=[session_name], + id=job_id, + replace_existing=True, + ) + # 盘前准备 09:00 - 计算前一日市场温度和板块数据 scheduler.add_job( _run_scan, CronTrigger(hour=9, minute=0, day_of_week="mon-fri"), diff --git a/backend/app/engine/screener.py b/backend/app/engine/screener.py index 44aae676..ad5003d9 100644 --- a/backend/app/engine/screener.py +++ b/backend/app/engine/screener.py @@ -3,10 +3,10 @@ 三阶段管道: Step 1: 主线定位 — 把实时板块/快照板块归一成系统 MarketTheme Step 2: 主题内选股 — 从主线主题成分、领涨股和实时异动中召回候选 - Step 3: 深度分析 — 资金顺势 + 供需 + 价格行为 + 趋势 + LLM + Step 3: 规则定价 — 催化 + 主题资金 + 个股资金 + 情绪角色 + 入场节奏 -评分公式:资金顺势 + 供需关系 + 价格行为 + 趋势。 -资金流和主题地位进入主评分,RSI/MACD 只作为节奏与风险参考。 +评分公式:市场热点/新闻催化 + 主线资金 + 个股资金 + 情绪地位 + 时机。 +技术指标只作为入场节奏与风控参考,不替代热点与资金主线。 风险乘数:惩罚取最大而非叠加(防过度惩罚),奖励可叠加。 @@ -19,7 +19,6 @@ import asyncio import logging -import traceback import pandas as pd @@ -144,29 +143,16 @@ async def run_screening(trade_date: str = None) -> dict: except Exception as e: logger.warning(f"注入实时价格失败,使用 Tushare 收盘价: {e}") - # ── Step 3: 规则边界 + LLM 两阶段裁决 ── - logger.info("=== Step 3: 规则边界 + LLM 两阶段裁决 ===") + # ── Step 3: 规则评分与交易计划 ── + logger.info("=== Step 3: 规则评分与交易计划 ===") recommendations = await _build_recommendations( candidates, market_temp, hot_sectors, market_temp_score, intraday, strategy_profile, ) - if settings.deepseek_api_key: - recommendations = [ - r for r in recommendations - if ( - _is_main_theme_recommendation(r) - and ( - r.action_plan in {"可操作", "重点关注"} - or (r.llm_score is not None and r.llm_score >= 6) - or r.score >= max(strategy_profile.min_score - 4, 56) - ) - ) - ] - else: - recommendations = [ - r for r in recommendations - if _is_main_theme_recommendation(r) and r.score >= strategy_profile.min_score - ] + recommendations = [ + r for r in recommendations + if _is_main_theme_recommendation(r) and r.score >= strategy_profile.min_score + ] recommendations = _finalize_battle_plan( recommendations=recommendations, @@ -388,7 +374,7 @@ async def _build_candidate_pool( ) -> list[dict]: """多路召回候选池。 - 目标是提高召回率,再交给 LLM 做资源分配与最终裁决。 + 目标是提高主线、资金、形态多路召回率,最终由规则评分统一排序。 """ merged: dict[str, dict] = {} @@ -610,21 +596,16 @@ async def _build_recommendations( intraday: bool = False, strategy_profile=None, ) -> list[Recommendation]: - """Step 3: 规则边界建模 + LLM 两阶段裁决。""" + """Step 3: 规则边界建模、评分与交易计划生成。""" from app.data.tushare_client import tushare_client from app.analysis.technical import add_all_indicators from app.analysis.breakout_signals import ( classify_entry_signal, score_supply_demand, - analyze_volume_pattern, EntrySignal, ) from app.analysis.signals import generate_signals from app.analysis.capital_flow import _score_valuation - from app.llm.batch_screener import ( - analyze_candidates_individually, - prefilter_candidates_individually, - ) # 名称和行业映射 stock_basic = tushare_client.get_stock_basic() @@ -636,7 +617,6 @@ async def _build_recommendations( industry_map[row["ts_code"]] = row.get("industry", "") recommendations = [] - llm_candidates = [] total = len(candidates) signal_counts = {"breakout": 0, "breakout_confirm": 0, "pullback": 0, "launch": 0, "reversal": 0, "none": 0} score_weights = strategy_profile.score_weights if strategy_profile else { @@ -914,7 +894,6 @@ async def _build_recommendations( stock["entry_signal_type"] = effective_signal_name reasons = _generate_reasons(stock, entry_signal, tech_signal, df, intraday) risk_note = _generate_risk_note(market_temp, tech_signal, stock) - vol_pattern = analyze_volume_pattern(df) entry_timing = _generate_entry_timing(effective_signal_name, intraday) trade_plan = _build_trade_plan( signal_type=effective_signal_name, @@ -1001,55 +980,8 @@ async def _build_recommendations( ) recommendations.append(rec) - # 收集 LLM 分析所需的候选摘要(不含 signal_type,让 LLM 独立判断) - llm_candidate = { - "ts_code": ts_code, - "name": name, - "sector": sector, - "quant_score": round(final_score, 1), - "position_score": round(position_score, 1), - "current_price": stock.get("price") or float(df.iloc[-1]["close"]), - "kline_summary": _summarize_for_llm(df, entry_signal, tech_signal), - "capital_flow_summary": ( - f"主力净流入{stock.get('main_net_inflow', 0):.0f}万, " - f"占比{stock.get('inflow_ratio', 0):.1f}%" - ), - "recall_tags": stock.get("recall_tags", []), - "sector_stage": sector_stage, - "hot_theme_matched": bool(hot_theme_match), - "hot_theme_name": hot_theme_match.sector_name if hot_theme_match else "", - "hot_theme_aliases": hot_theme_match.theme_aliases if hot_theme_match else [], - "stock_role_hint": stock.get("stock_role_hint", "待判断"), - "entry_signal_type": effective_signal_name, - "entry_signal_score": round(entry_signal.get("signal_score", 0), 1), - "flow_momentum_score": round(flow_momentum_score, 1), - "signal_matches_profile": signal_matches_profile, - "risk_tags": risk_tags, - "focus_points": _build_focus_points(stock, entry_signal, tech_signal, vol_pattern, sector_stage), - } - - if intraday: - try: - from app.data.eastmoney_client import get_min_kline, analyze_intraday_volume_distribution - min_df = await get_min_kline(ts_code, period="5", count=48) - if not min_df.empty: - vol_dist = analyze_intraday_volume_distribution(min_df) - llm_candidate["intraday_volume"] = ( - f"上午量占比{vol_dist['morning_volume_ratio']}%, " - f"下午{vol_dist['afternoon_volume_ratio']}%, " - f"开盘30分{vol_dist['opening_strength']}%, " - f"尾盘30分{vol_dist['closing_strength']}%, " - f"趋势={vol_dist['volume_trend']}" - ) - if vol_dist["key_periods"]: - llm_candidate["intraday_volume"] += f", 放量时段: {'; '.join(vol_dist['key_periods'])}" - except Exception as e: - logger.debug(f"分时量能数据获取失败 {ts_code}: {e}") - - llm_candidates.append(llm_candidate) - except Exception as e: - logger.debug(f"深度分析 {ts_code} 失败: {e}") + logger.debug(f"规则分析 {ts_code} 失败: {e}") continue logger.info( @@ -1061,143 +993,6 @@ async def _build_recommendations( ) recommendations.sort(key=lambda rec: rec.score, reverse=True) - - if settings.deepseek_api_key and llm_candidates: - try: - market_summary = ( - f"市场温度: {market_temp.temperature}/100, " - f"涨跌比: {market_temp.up_count}涨/{market_temp.down_count}跌, " - f"涨停: {market_temp.limit_up_count}家; " - f"今日主线主题: " - + " / ".join( - f"{s.sector_name}[{' / '.join((s.theme_aliases or [])[:3])}]" - f"({(s.realtime_pct_change if s.realtime_pct_change is not None else s.pct_change):+.2f}%)" - for s in hot_sectors[:5] - ) - ) - - llm_candidates.sort(key=lambda c: c["quant_score"], reverse=True) - prefilter_pool = llm_candidates[: settings.llm_prefilter_limit] - prefilter_results = await prefilter_candidates_individually( - prefilter_pool, - market_summary, - max_concurrent=settings.llm_prefilter_max_concurrent, - ) - - prioritized = [] - for item in prefilter_pool: - pre = prefilter_results.get(item["ts_code"], {}) - item["prefilter_decision"] = pre.get("decision", "watch") - item["prefilter_confidence"] = pre.get("confidence", 5) - item["prefilter_reason"] = pre.get("reason", "") - item["prefilter_focus_points"] = pre.get("focus_points", []) - if item["prefilter_decision"] == "priority": - rank_bonus = 16 - elif item["prefilter_decision"] == "watch": - rank_bonus = 6 - else: - rank_bonus = -12 - item["deep_rank"] = round(item["quant_score"] + rank_bonus + item["prefilter_confidence"] * 1.5, 1) - if item["prefilter_decision"] != "ignore": - prioritized.append(item) - - if not prioritized: - prioritized = prefilter_pool[: min(8, len(prefilter_pool))] - - prioritized.sort(key=lambda c: c.get("deep_rank", c["quant_score"]), reverse=True) - llm_top = prioritized[: settings.llm_final_limit] - llm_results = await analyze_candidates_individually(llm_top, market_summary) - - for rec in recommendations: - pre_item = next((item for item in prefilter_pool if item["ts_code"] == rec.ts_code), None) - if pre_item: - rec.prefilter_decision = pre_item.get("prefilter_decision", "") - rec.prefilter_reason = pre_item.get("prefilter_reason", "") - rec.focus_points = pre_item.get("prefilter_focus_points", []) - - llm_data = llm_results.get(rec.ts_code) - if llm_data: - rec.llm_analysis = llm_data.get("analysis", "") - rec.llm_score = float(llm_data.get("conviction", 0) or 0) - - verdict = llm_data.get("verdict", "watch") - action_plan = llm_data.get("action_plan", "") - conviction = float(llm_data.get("conviction", 6) or 6) - ai_score = conviction * 10 - - if verdict == "execute": - rec.score = round(rec.score * 0.4 + ai_score * 0.6 + 4, 1) - elif verdict == "watch": - rec.score = round(rec.score * 0.5 + ai_score * 0.5 - 2, 1) - else: # skip - rec.score = round(rec.score * 0.45 + ai_score * 0.35 - 18, 1) - - if verdict == "skip": - rec.signal = "HOLD" - rec.action_plan = "观察" - rec.lifecycle_status = "candidate" - if not rec.risk_note: - rec.risk_note = llm_data.get("risk_flag", "") or rec.risk_note - else: - if action_plan in {"可操作", "重点关注", "观察"}: - rec.action_plan = action_plan - elif verdict == "execute": - rec.action_plan = "可操作" - else: - rec.action_plan = "重点关注" - - rec.signal = "BUY" if verdict == "execute" else "HOLD" - if rec.action_plan == "可操作": - rec.lifecycle_status = "actionable" - elif rec.action_plan == "重点关注": - rec.lifecycle_status = "candidate" - - if llm_data.get("timing"): - rec.entry_timing = llm_data["timing"] - - if llm_data.get("trigger_condition"): - rec.trigger_condition = llm_data["trigger_condition"] - if llm_data.get("invalidation_condition"): - rec.invalidation_condition = llm_data["invalidation_condition"] - if llm_data.get("position_pct") is not None: - rec.suggested_position_pct = float(llm_data["position_pct"] or 0) - if llm_data.get("risk_flag"): - rec.risk_note = llm_data["risk_flag"] - - rec.level = _score_to_level(rec.score) - _apply_llm_trace( - rec, - verdict=verdict, - action_plan=rec.action_plan, - conviction=conviction, - reason=llm_data.get("analysis", "") or llm_data.get("risk_flag", ""), - ) - - # 用 LLM 给出的价格替代结构化规则价格 - if llm_data.get("entry_price"): - rec.entry_price = llm_data["entry_price"] - if llm_data.get("target_price"): - rec.target_price = llm_data["target_price"] - if llm_data.get("stop_loss"): - rec.stop_loss = llm_data["stop_loss"] - - recommendations = [ - rec for rec in recommendations - if not ( - rec.llm_score is not None - and rec.llm_score <= 4 - and rec.action_plan == "观察" - and rec.score < max(strategy_profile.min_score - 6, 54) - ) - ] - recommendations.sort(key=lambda r: r.score, reverse=True) - recommendations = recommendations[:settings.top_stock_count] - logger.info(f"LLM 两阶段分析完成, 综合评分后保留 {len(recommendations)} 只") - except Exception as e: - logger.error(f"LLM 两阶段分析失败, 仅使用规则边界: {e}") - from app.db.error_logger import log_error - await log_error("screener", f"LLM 两阶段分析失败, 仅使用规则边界: {e}", detail=traceback.format_exc()) - return recommendations @@ -2224,186 +2019,3 @@ def _build_trace_evidence( evidence.append("符合今日策略偏好的入场类型") return evidence[:5] - -def _apply_llm_trace( - rec: Recommendation, - verdict: str, - action_plan: str, - conviction: float, - reason: str, -) -> None: - trace = dict(rec.decision_trace or {}) - trace["llm_adjustment"] = { - "verdict": verdict, - "action_plan": action_plan, - "conviction": round(conviction, 1), - "reason": str(reason or "")[:180], - } - trace["action_plan"] = action_plan - if verdict == "execute": - trace["headline"] = f"AI确认可执行: {trace.get('headline', rec.name)}" - elif verdict == "skip": - trace["headline"] = f"AI降级观察: {trace.get('headline', rec.name)}" - rec.decision_trace = trace - - -def _build_focus_points( - stock: dict, - entry_signal: dict, - tech: TechnicalSignal | None, - vol_pattern: dict, - sector_stage: str, -) -> list[str]: - points: list[str] = [] - signal_type = entry_signal.get("signal_type") - if signal_type and getattr(signal_type, "value", "none") != "none": - points.append(f"确认{signal_type.value}信号是否延续") - elif stock.get("entry_signal_type") == "flow_momentum": - points.append("确认主力流入和板块前排强度是否延续") - if stock.get("main_net_inflow", 0) > 0: - points.append("观察主力流入是否继续放大") - if vol_pattern.get("volume_trend"): - points.append(f"量能状态: {vol_pattern['volume_trend']}") - if tech and tech.support_price: - points.append(f"关键支撑 {tech.support_price}") - if sector_stage in ("late", "end"): - points.append("板块已偏后段,注意是否还有前排承接") - return points[:4] - - -def _summarize_for_llm(df, entry_signal: dict, tech_signal: TechnicalSignal | None) -> str: - """生成 K 线分析结论供 LLM 判断(输出结论而非原始数据)""" - import pandas as pd - - last = df.iloc[-1] - parts = [] - - # ── 趋势结论 ── - ma_fields = ["ma5", "ma10", "ma20", "ma60"] - ma_vals = {m: last.get(m) for m in ma_fields} - - trend_desc = "趋势不明" - all_ma_valid = all(ma_vals.get(m) is not None and not pd.isna(ma_vals[m]) for m in ma_fields) - if all_ma_valid: - if ma_vals["ma5"] > ma_vals["ma10"] > ma_vals["ma20"] > ma_vals["ma60"]: - trend_desc = "强势多头排列(MA5>MA10>MA20>MA60)" - elif ma_vals["ma5"] > ma_vals["ma10"] > ma_vals["ma20"]: - trend_desc = "中短期多头(MA5>MA10>MA20)" - elif ma_vals["ma5"] > ma_vals["ma20"]: - trend_desc = "偏多(MA5在MA20上方)" - elif ma_vals["ma5"] < ma_vals["ma10"] < ma_vals["ma20"]: - trend_desc = "空头排列,趋势偏弱" - else: - trend_desc = "均线交织,趋势震荡" - - # MA20 方向 - if len(df) >= 5 and not pd.isna(last.get("ma20")) and not pd.isna(df.iloc[-5].get("ma20")): - ma20_now = last["ma20"] - ma20_5d = df.iloc[-5]["ma20"] - if ma20_5d > 0: - ma20_pct = (ma20_now - ma20_5d) / ma20_5d * 100 - if ma20_pct > 2: - trend_desc += ",MA20快速上扬" - elif ma20_pct > 0: - trend_desc += ",MA20缓慢上行" - else: - trend_desc += ",MA20走平或下行" - parts.append(trend_desc) - - # ── 量价结论 ── - if len(df) >= 10: - recent = df.tail(10) - up_days = recent[recent["pct_chg"] > 0] - down_days = recent[recent["pct_chg"] <= 0] - vol_conclusion = "" - if len(up_days) > 0 and len(down_days) > 0: - avg_up_vol = up_days["vol"].mean() - avg_down_vol = down_days["vol"].mean() - if avg_down_vol > 0: - ratio = avg_up_vol / avg_down_vol - if ratio > 1.5: - vol_conclusion = f"量价健康(上涨均量/下跌均量={ratio:.1f},需求主导)" - elif ratio > 1.0: - vol_conclusion = f"量价尚可(量比={ratio:.1f},需求略强)" - else: - vol_conclusion = f"量价偏弱(量比={ratio:.1f},供给主导)" - if not vol_conclusion: - vol_conclusion = "量价数据不足" - - # 最近5日量能变化 - recent_5 = df.tail(5) - vol_ma5 = recent_5["vol"].mean() - vol_ma10 = df.tail(10)["vol"].mean() - if vol_ma10 > 0: - vol_ratio = vol_ma5 / vol_ma10 - if vol_ratio > 1.5: - vol_conclusion += ",近5日明显放量" - elif vol_ratio < 0.7: - vol_conclusion += ",近5日缩量" - parts.append(vol_conclusion) - - # ── MACD 结论(节奏参考) ── - dif = last.get("dif", 0) or 0 - dea = last.get("dea", 0) or 0 - macd_desc = "" - if len(df) >= 3: - prev_dif = df.iloc[-2].get("dif", 0) or 0 - prev_dea = df.iloc[-2].get("dea", 0) or 0 - if dif > dea and prev_dif <= prev_dea: - macd_desc = "MACD刚金叉" - elif dif > dea: - macd_desc = "MACD金叉运行中" - elif dif < dea and prev_dif >= prev_dea: - macd_desc = "MACD刚死叉" - elif dif < dea: - macd_desc = "MACD死叉运行中" - - if dif > 0: - macd_desc += ",零轴上方(偏多)" - else: - macd_desc += ",零轴下方(偏空)" - parts.append((macd_desc or "MACD数据不足") + ";仅作节奏参考") - - # ── RSI 结论(风险提示,不做买卖裁判) ── - rsi = last.get("rsi14", 50) - if not pd.isna(rsi): - if rsi > 80: - parts.append(f"RSI14={rsi:.0f},偏热,提示追高风险但不单独否决资金顺势") - elif rsi > 70: - parts.append(f"RSI14={rsi:.0f},偏高,注意追高风险") - elif rsi >= 40: - parts.append(f"RSI14={rsi:.0f},节奏中性") - else: - parts.append(f"RSI14={rsi:.0f},偏低,提示弱势或反弹弹性,不单独构成买点") - - # ── 价格位置结论 ── - if tech_signal: - pos_parts = [] - if tech_signal.rally_pct_5d > 15: - pos_parts.append(f"5日已涨{tech_signal.rally_pct_5d}%,追高风险大") - elif tech_signal.rally_pct_5d > 8: - pos_parts.append(f"5日涨{tech_signal.rally_pct_5d}%,短期有一定涨幅") - elif tech_signal.rally_pct_5d > 0: - pos_parts.append(f"5日涨{tech_signal.rally_pct_5d}%,涨幅温和") - else: - pos_parts.append(f"5日跌{abs(tech_signal.rally_pct_5d)}%,回调中") - - if tech_signal.distance_from_high >= 0: - pos_parts.append("处于60日新高附近") - elif tech_signal.distance_from_high > -5: - pos_parts.append(f"距60日高点{abs(tech_signal.distance_from_high):.1f}%") - else: - pos_parts.append(f"距60日高点{abs(tech_signal.distance_from_high):.1f}%,位置较低") - - parts.append("位置: " + ",".join(pos_parts)) - - # ── 近5日价格走势简述 ── - if len(df) >= 5: - recent_5 = df.tail(5) - closes = recent_5["close"].tolist() - first_c = closes[0] - last_c = closes[-1] - pct_5d = (last_c - first_c) / first_c * 100 - parts.append(f"当前价: {last_c:.2f},5日{'涨' if pct_5d >= 0 else '跌'}{abs(pct_5d):.1f}%") - - return "\n".join(parts) diff --git a/backend/app/llm/batch_screener.py b/backend/app/llm/batch_screener.py index 4f4ec546..ba5e8b4d 100644 --- a/backend/app/llm/batch_screener.py +++ b/backend/app/llm/batch_screener.py @@ -1,4 +1,9 @@ -"""LLM 候选预筛 + 逐股深度分析 +"""历史 LLM 候选预筛 + 逐股深度分析工具。 + +弃用说明: +生产推荐链路禁止调用本模块。A 股批量选股必须由热点/催化、资金流、 +情绪地位和规则化交易计划决定,LLM 只能做用户主动触发的解释、会诊、 +离线复盘或新闻催化归因。 先做轻量预筛,控制深度裁决成本; 再对重点股票单独调用 LLM 做深度分析。 diff --git a/backend/app/llm/prompts.py b/backend/app/llm/prompts.py index 8228ade5..58e03eed 100644 --- a/backend/app/llm/prompts.py +++ b/backend/app/llm/prompts.py @@ -42,15 +42,17 @@ CHAT_SYSTEM_PROMPT = """\ 3. 查询当前用户的自选股池与最新建议 4. 查询个股K线、技术面、资金流向数据 5. 搜索股票代码,并把结果放回当前交易语境中分析 -6. 对单只股票生成系统化会诊,输出结论、触发条件、失效条件、仓位边界和风险清单 +6. 在用户明确要求时,对单只股票生成系统化会诊,输出结论、触发条件、失效条件、仓位边界和风险清单 重要提醒: - 回答用户关于"今天市场怎么样"之类的问题时,必须调用 get_realtime_indices 获取实时指数数据 - 回答用户关于"今天该怎么做"、"当前推荐怎么看"、"自选股该怎么处理"这类问题时,优先调用 get_strategy_board、get_latest_recommendations、get_user_watchlist_snapshot -- 回答用户关于某只股票能不能看、是否该买、持仓怎么处理、为什么涨跌、是否要复盘时,必须先 search_stock(如果用户没给标准 ts_code),再调用 diagnose_stock;必要时补充 get_stock_capital_flow、get_stock_technical_signal +- 回答用户关于某只股票时,先 search_stock(如果用户没给标准 ts_code),再用 get_latest_recommendations、get_user_watchlist_snapshot、get_stock_capital_flow、get_stock_technical_signal 等工具读取现有证据 +- 只有当用户明确要求"诊断"、"会诊"、"深度分析"、"生成报告"或正在使用诊断页面时,才调用 diagnose_stock;不要在普通聊天、列表解释、推荐解读中自动触发个股会诊 - 盘中时段(9:30-15:00)必须使用实时数据,盘后时段使用当日收盘或最近一次系统生成的数据 - 不要脱离系统上下文泛泛而谈,必须先调用工具获取最新结果再回答 - 个股分析必须优先看资金流向、主线板块、量价关系、价格行为和位置边界;RSI/MACD/KDJ 只做最后的节奏与风控备注,不能因超买超卖本身直接否决或买入 +- LLM 只负责解释、归纳和生成用户主动请求的诊断文本,不参与批量选股、排序、行动计划、止盈止损或策略换挡 回答要求: 1. 使用工具获取最新数据后再回答,不要凭空编造数据 diff --git a/backend/app/llm/strategy_selector.py b/backend/app/llm/strategy_selector.py index ebbaf3f6..848fc18d 100644 --- a/backend/app/llm/strategy_selector.py +++ b/backend/app/llm/strategy_selector.py @@ -1,7 +1,8 @@ """动态策略选择器 在固定筛选引擎前增加一层“先选打法,再选股票”的策略决策。 -规则负责稳定分类,LLM 负责补充语义判断和操作建议。 +生产筛选只使用规则和策略配置,保证同一份行情输入得到稳定输出。 +LLM 只能用于离线复盘、配置建议或解释,不参与盘中策略换挡。 """ import json @@ -125,14 +126,7 @@ async def select_strategy_profile( from app.llm.strategy_config import load_active_strategy_profile profile = _select_rule_profile(market_temp, hot_sectors, intraday) - profile = await load_active_strategy_profile(profile) - - if settings.deepseek_api_key: - llm_profile = await _select_llm_profile(market_temp, hot_sectors, intraday, profile) - if llm_profile: - profile = await load_active_strategy_profile(llm_profile) - - return profile + return await load_active_strategy_profile(profile) def _select_rule_profile( diff --git a/backend/app/news/__init__.py b/backend/app/news/__init__.py new file mode 100644 index 00000000..97d2d0d1 --- /dev/null +++ b/backend/app/news/__init__.py @@ -0,0 +1,5 @@ +"""新闻源采集层。 + +该层只负责把外部新闻落到本地数据库。题材归因由 catalyst 层处理, +页面和普通 API 不应直接触发外部新闻抓取。 +""" diff --git a/backend/app/news/collector.py b/backend/app/news/collector.py new file mode 100644 index 00000000..1c2f80a9 --- /dev/null +++ b/backend/app/news/collector.py @@ -0,0 +1,189 @@ +"""多源新闻采集器。""" + +from __future__ import annotations + +import email.utils +import html +import logging +import re +import xml.etree.ElementTree as ET +from datetime import datetime, timedelta + +import httpx + +from app.config import settings +from app.data.tushare_client import tushare_client +from app.news.models import NewsItem + +logger = logging.getLogger(__name__) +_tushare_source_cursor = 0 + +RSS_HEADERS = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", +} + + +async def collect_news_sources( + lookback_hours: int | None = None, + limit_per_source: int | None = None, +) -> list[NewsItem]: + """从配置的数据源采集新闻。失败源隔离,不影响其他源。""" + lookback_hours = lookback_hours or settings.news_fetch_lookback_hours + limit_per_source = limit_per_source or settings.news_fetch_limit_per_source + items: list[NewsItem] = [] + + for source in _select_tushare_sources_for_run(): + try: + items.extend(await _collect_tushare_news(source, lookback_hours, limit_per_source)) + except Exception as e: + logger.warning("Tushare 新闻源采集失败 source=%s error=%s", source, e) + + rss_sources = _parse_rss_sources(settings.news_rss_sources) + if rss_sources: + async with httpx.AsyncClient(headers=RSS_HEADERS, timeout=10.0, follow_redirects=True) as client: + for name, url in rss_sources: + try: + items.extend(await _collect_rss(client, name, url, lookback_hours, limit_per_source)) + except Exception as e: + logger.warning("RSS 新闻源采集失败 source=%s url=%s error=%s", name, url, e) + + return _dedup_in_memory(items) + + +async def _collect_tushare_news(source: str, lookback_hours: int, limit: int) -> list[NewsItem]: + df = tushare_client.get_news( + source=source, + start_time=datetime.now() - timedelta(hours=lookback_hours), + end_time=datetime.now(), + limit=limit, + ) + if df.empty: + return [] + + items: list[NewsItem] = [] + for _, row in df.iterrows(): + title = _clean_text(row.get("title", "")) + if len(title) < settings.news_min_title_length: + continue + content = _clean_text(row.get("content", "")) + items.append(NewsItem( + title=title, + content=content, + summary=_clean_text(row.get("summary", "")), + source=f"tushare:{source}", + url=str(row.get("url", "") or ""), + published_at=_parse_datetime(row.get("datetime") or row.get("time") or row.get("publish_time")), + )) + return items[:limit] + + +async def _collect_rss( + client: httpx.AsyncClient, + source: str, + url: str, + lookback_hours: int, + limit: int, +) -> list[NewsItem]: + resp = await client.get(url) + resp.raise_for_status() + root = ET.fromstring(resp.content) + cutoff = datetime.now() - timedelta(hours=lookback_hours) + items: list[NewsItem] = [] + + for item in root.findall(".//item")[: limit * 2]: + title = _clean_text(_xml_text(item, "title")) + if len(title) < settings.news_min_title_length: + continue + published_at = _parse_datetime(_xml_text(item, "pubDate")) + if published_at and published_at < cutoff: + continue + summary = _clean_text(_xml_text(item, "description")) + items.append(NewsItem( + title=title, + content=summary, + summary=summary[:240], + source=f"rss:{source}", + url=_clean_text(_xml_text(item, "link")), + published_at=published_at, + )) + if len(items) >= limit: + break + + return items + + +def _split_csv(value: str) -> list[str]: + return [item.strip() for item in (value or "").split(",") if item.strip()] + + +def _parse_rss_sources(value: str) -> list[tuple[str, str]]: + result: list[tuple[str, str]] = [] + for chunk in _split_csv(value): + if "|" not in chunk: + continue + name, url = chunk.split("|", 1) + name = name.strip() + url = url.strip() + if name and url: + result.append((name, url)) + return result + + +def _select_tushare_sources_for_run() -> list[str]: + """Tushare news 免费/低权限账号通常限制 1 次/分钟,每轮只取少量源。""" + global _tushare_source_cursor + + sources = _split_csv(settings.news_tushare_sources) + if not sources: + return [] + + limit = max(1, min(int(settings.news_tushare_sources_per_run or 1), len(sources))) + selected: list[str] = [] + for offset in range(limit): + selected.append(sources[(_tushare_source_cursor + offset) % len(sources)]) + _tushare_source_cursor = (_tushare_source_cursor + limit) % len(sources) + return selected + + +def _xml_text(item: ET.Element, tag: str) -> str: + node = item.find(tag) + return node.text if node is not None and node.text else "" + + +def _clean_text(value) -> str: + text = html.unescape(str(value or "")) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"\s+", " ", text) + return text.strip() + + +def _parse_datetime(value) -> datetime | None: + if value is None: + return None + if isinstance(value, datetime): + return value.replace(tzinfo=None) + text = str(value).strip() + if not text: + return None + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y%m%d%H%M%S", "%Y%m%d"): + try: + return datetime.strptime(text[: len(fmt)], fmt) + except Exception: + pass + try: + parsed = email.utils.parsedate_to_datetime(text) + return parsed.replace(tzinfo=None) + except Exception: + return None + + +def _dedup_in_memory(items: list[NewsItem]) -> list[NewsItem]: + seen: set[str] = set() + result: list[NewsItem] = [] + for item in items: + key = re.sub(r"\W+", "", item.title.lower())[:80] + if not key or key in seen: + continue + seen.add(key) + result.append(item) + return result diff --git a/backend/app/news/models.py b/backend/app/news/models.py new file mode 100644 index 00000000..ded5d7fa --- /dev/null +++ b/backend/app/news/models.py @@ -0,0 +1,13 @@ +"""新闻采集领域模型。""" + +from datetime import datetime +from pydantic import BaseModel + + +class NewsItem(BaseModel): + title: str + content: str = "" + summary: str = "" + source: str = "" + url: str = "" + published_at: datetime | None = None diff --git a/backend/app/news/pipeline.py b/backend/app/news/pipeline.py new file mode 100644 index 00000000..ef1b3982 --- /dev/null +++ b/backend/app/news/pipeline.py @@ -0,0 +1,50 @@ +"""新闻采集与催化归因流水线。""" + +from __future__ import annotations + +import logging + +from app.config import settings +from app.catalyst.service import analyze_pending_news, ingest_news_items +from app.news.collector import collect_news_sources + +logger = logging.getLogger(__name__) + + +async def refresh_news_catalysts( + lookback_hours: int | None = None, + limit_per_source: int | None = None, + analyze_limit: int | None = None, + use_llm: bool = True, +) -> dict: + if not settings.news_collection_enabled: + return { + "enabled": False, + "fetched": 0, + "inserted": 0, + "duplicates": 0, + "analyzed": 0, + "skipped": 0, + "failed": 0, + } + + items = await collect_news_sources( + lookback_hours=lookback_hours or settings.news_fetch_lookback_hours, + limit_per_source=limit_per_source or settings.news_fetch_limit_per_source, + ) + ingest_result = await ingest_news_items(items) + remaining_result = await analyze_pending_news( + limit=analyze_limit or settings.news_analyze_limit_per_run, + use_llm=use_llm, + ) + result = { + "enabled": True, + "fetched": len(items), + "inserted": ingest_result.get("inserted", 0), + "duplicates": ingest_result.get("duplicates", 0), + "analyzed": ingest_result.get("analyzed", 0) + remaining_result.get("analyzed", 0), + "skipped": remaining_result.get("skipped", 0), + "failed": ingest_result.get("failed", 0) + remaining_result.get("failed", 0), + } + logger.info("新闻催化刷新完成: %s", result) + return result diff --git a/frontend/src/app/(auth)/dashboard/page.tsx b/frontend/src/app/(auth)/dashboard/page.tsx index 2b1e97a9..b383e523 100644 --- a/frontend/src/app/(auth)/dashboard/page.tsx +++ b/frontend/src/app/(auth)/dashboard/page.tsx @@ -606,7 +606,7 @@ function FocusStockCard({ rec }: { rec: RecommendationData }) {
- {rec.decision_trace?.headline ?? rec.trigger_condition ?? rec.entry_timing ?? rec.prefilter_reason ?? rec.reasons?.[0] ?? "等待新的触发条件。"} + {rec.decision_trace?.headline ?? rec.trigger_condition ?? rec.entry_timing ?? rec.reasons?.[0] ?? "等待新的触发条件。"}
{(rec.invalidation_condition || rec.risk_note) ? ( diff --git a/frontend/src/app/(auth)/sentiment/page.tsx b/frontend/src/app/(auth)/sentiment/page.tsx new file mode 100644 index 00000000..a9c1ea81 --- /dev/null +++ b/frontend/src/app/(auth)/sentiment/page.tsx @@ -0,0 +1,429 @@ +"use client"; + +import { useCallback, useEffect, useMemo, useState } from "react"; +import { ErrorBoundary } from "@/components/error-boundary"; +import { fetchAPI } from "@/lib/api"; +import type { CatalystEvent, CatalystNewsItem, ThemeCatalystScore } from "@/lib/api"; +import { useWebSocket } from "@/hooks/use-websocket"; + +type CatalystTone = "hot" | "warm" | "quiet"; + +export default function SentimentPage() { + const [news, setNews] = useState([]); + const [events, setEvents] = useState([]); + const [themes, setThemes] = useState([]); + const [loading, setLoading] = useState(true); + const [statusFilter, setStatusFilter] = useState("all"); + + const loadData = useCallback(async () => { + try { + const [newsData, eventData, themeData] = await Promise.all([ + fetchAPI("/api/catalysts/news?limit=80&hours=72").catch(() => []), + fetchAPI("/api/catalysts/recent?limit=60&hours=72").catch(() => []), + fetchAPI("/api/catalysts/theme-scores?limit=20&hours=72").catch(() => []), + ]); + setNews(newsData); + setEvents(eventData); + setThemes(themeData); + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + loadData(); + }, [loadData]); + + useWebSocket( + useCallback((message: { type: string }) => { + if (message.type === "news_catalysts_ready" || message.type === "sector_scan_ready" || message.type === "scan_complete") { + loadData(); + } + }, [loadData]) + ); + + const analyzedCount = news.filter((item) => item.status === "analyzed").length; + const pendingCount = news.filter((item) => item.status === "pending").length; + const failedCount = news.filter((item) => item.status === "failed").length; + const topTheme = themes[0]; + const headline = buildSentimentHeadline(themes, events); + + const filteredNews = useMemo(() => { + if (statusFilter === "all") return news; + return news.filter((item) => item.status === statusFilter); + }, [news, statusFilter]); + + const eventsById = useMemo(() => { + const map = new Map(); + events.forEach((event) => map.set(event.id, event)); + return map; + }, [events]); + + return ( + +
+
+

舆情雷达

+
+ +
+
+
+
+
市场情绪线索
+

{headline.title}

+

{headline.detail}

+
+
+ + + item.catalyst_score >= 70).length} tone="hot" /> + +
+
+ +
+ + + +
+
+ +
+
+

主题催化排行

+ 72小时 +
+
+ {themes.length ? themes.slice(0, 8).map((theme, index) => ( + + )) : ( + + )} +
+
+
+ +
+
+
+
+

舆情流

+

后台抓取的新闻、政策和公告线索。

+
+
+ {[ + { key: "all", label: "全部", count: news.length }, + { key: "analyzed", label: "已归因", count: analyzedCount }, + { key: "pending", label: "待处理", count: pendingCount }, + { key: "failed", label: "异常", count: failedCount }, + ].map((tab) => ( + + ))} +
+
+ +
+ {filteredNews.length ? filteredNews.map((item) => ( + + )) : ( +
+ {loading ? "加载舆情流..." : "暂无符合条件的舆情。"} +
+ )} +
+
+ + +
+
+
+ ); +} + +function buildSentimentHeadline(themes: ThemeCatalystScore[], events: CatalystEvent[]) { + const top = themes[0]; + const strongCount = themes.filter((item) => item.catalyst_score >= 70).length; + const policyCount = events.filter((item) => item.catalyst_type === "policy").length; + + if (!top) { + return { + title: "舆情等待后台归因", + detail: "新闻采集和催化分析会在后台任务完成后更新。", + action: "等待线索", + risk: "无数据不判断", + }; + } + + if (top.catalyst_score >= 75) { + return { + title: `${top.theme_name} 舆情升温`, + detail: `过去 72 小时 ${top.catalyst_count} 条催化线索,${strongCount} 个主题达到强催化阈值。${policyCount ? `其中政策类线索 ${policyCount} 条。` : ""}`, + action: "优先核对资金回流", + risk: "热度兑现", + }; + } + + return { + title: `${top.theme_name} 有线索但未形成强共振`, + detail: `当前催化分 ${top.catalyst_score.toFixed(0)},更适合观察是否被资金和板块前排确认。`, + action: "跟踪扩散", + risk: "证据不足", + }; +} + +function RadarMetric({ label, value, tone }: { label: string; value: number; tone: CatalystTone }) { + const toneClass = getToneClass(tone); + return ( +
+
{label}
+
{value}
+
+ ); +} + +function RadarDecision({ title, value, detail, tone }: { title: string; value: string; detail: string; tone: CatalystTone }) { + const toneClass = getToneClass(tone); + return ( +
+
{title}
+
{value}
+
{detail}
+
+ ); +} + +function ThemePulseRow({ theme, rank }: { theme: ThemeCatalystScore; rank: number }) { + const score = Math.max(0, Math.min(theme.catalyst_score, 100)); + const tone = score >= 70 ? "hot" : score >= 45 ? "warm" : "quiet"; + const toneClass = getToneClass(tone); + + return ( +
+
+
+
+ {rank} +
+
+
{theme.theme_name}
+
+ {theme.top_reasons?.[0] ?? "等待更多舆情证据"} +
+
+
+
{score.toFixed(0)}
+
+
+
+
+
+ ); +} + +function NewsRow({ item, event }: { item: CatalystNewsItem; event?: CatalystEvent }) { + const status = getStatusMeta(item.status); + const eventTone = event ? getEventTone(event) : "quiet"; + const toneClass = getToneClass(eventTone); + const title = event?.title || item.title; + const href = item.url || event?.url || ""; + + return ( +
+
+
+
+ {status.label} + {formatSource(item.source)} + {formatDateTime(item.published_at || item.created_at)} +
+ {href ? ( + + {title} + + ) : ( +

{title}

+ )} + {event?.summary || event?.llm_reason ? ( +

+ {event.llm_reason || event.summary} +

+ ) : item.error ? ( +

{item.error}

+ ) : null} +
+ +
+ + + +
+
+ {event?.themes ? ( +
+ 归因:{event.themes} +
+ ) : null} + {event ? ( +
+ {formatCatalystType(event.catalyst_type)} + 本地已归档 +
+ ) : null} +
+ ); +} + +function CatalystInsight({ event }: { event: CatalystEvent }) { + const tone = getEventTone(event); + const toneClass = getToneClass(tone); + return ( +
+
+
+
+ {formatCatalystType(event.catalyst_type)} + {formatDateTime(event.published_at || event.created_at)} +
+
{event.title}
+
+ {event.llm_reason || event.summary || "等待更多解释"} +
+
+
{Math.round(event.strength)}
+
+
+ ); +} + +function MiniScore({ label, value, tone }: { label: string; value?: number; tone: CatalystTone }) { + const toneClass = getToneClass(tone); + return ( +
+
{label}
+
+ {value == null ? "--" : Math.round(value)} +
+
+ ); +} + +function BoundaryLine({ text }: { text: string }) { + return ( +
+ + {text} +
+ ); +} + +function EmptyLine({ text }: { text: string }) { + return ( +
+ {text} +
+ ); +} + +function getToneClass(tone: CatalystTone) { + if (tone === "hot") { + return { + box: "border-red-500/15 bg-red-500/[0.08]", + text: "text-red-300", + bar: "bg-red-400", + }; + } + if (tone === "warm") { + return { + box: "border-amber-500/15 bg-amber-500/[0.08]", + text: "text-amber-300", + bar: "bg-amber-400", + }; + } + return { + box: "border-border-subtle bg-surface-2", + text: "text-text-secondary", + bar: "bg-text-muted", + }; +} + +function getEventTone(event: CatalystEvent): CatalystTone { + if (event.strength >= 70 || event.confidence >= 75) return "hot"; + if (event.strength >= 45 || event.freshness >= 55) return "warm"; + return "quiet"; +} + +function getStatusMeta(status: string) { + if (status === "analyzed") { + return { label: "已归因", className: "border-emerald-500/15 bg-emerald-500/10 text-emerald-300" }; + } + if (status === "pending") { + return { label: "待处理", className: "border-amber-500/15 bg-amber-500/10 text-amber-300" }; + } + if (status === "failed") { + return { label: "异常", className: "border-red-500/15 bg-red-500/10 text-red-300" }; + } + if (status === "skipped") { + return { label: "已忽略", className: "border-border-subtle bg-surface-2 text-text-muted" }; + } + return { label: status || "未知", className: "border-border-subtle bg-surface-2 text-text-muted" }; +} + +function formatCatalystType(type: string) { + const labels: Record = { + policy: "政策", + industry: "产业", + event: "事件", + earnings: "业绩", + announcement: "公告", + news: "新闻", + }; + return labels[type] ?? type; +} + +function formatSource(source: string) { + if (!source) return "未知来源"; + return source.replace("tushare:", "").replace("rss:", ""); +} + +function formatDateTime(value?: string | null) { + if (!value) return "暂无时间"; + const date = new Date(value); + if (Number.isNaN(date.getTime())) return value; + return date.toLocaleString("zh-CN", { + month: "2-digit", + day: "2-digit", + hour: "2-digit", + minute: "2-digit", + }); +} diff --git a/frontend/src/app/(auth)/stock/[code]/page.tsx b/frontend/src/app/(auth)/stock/[code]/page.tsx index d462b95a..3dbe2993 100644 --- a/frontend/src/app/(auth)/stock/[code]/page.tsx +++ b/frontend/src/app/(auth)/stock/[code]/page.tsx @@ -107,7 +107,7 @@ export default function StockDetailPage() { const recFromHistory = history .flatMap((group) => group.recommendations) - .find((rec) => rec.ts_code === code); + .find((rec) => sameStockCode(rec.ts_code, code)); const rec = thesisData?.recommendation ?? recFromHistory ?? null; if (rec) { @@ -122,11 +122,6 @@ export default function StockDetailPage() { setRecScore(null); } - setQuote(null); - setSignals(null); - setKline([]); - setCapitalFlow([]); - setEvidenceLoaded(false); } finally { if (!cancelled) setLoading(false); } @@ -138,35 +133,52 @@ export default function StockDetailPage() { }; }, [code]); - const loadEvidence = async () => { - if (!code || evidenceLoading || evidenceLoaded) return; - setEvidenceLoading(true); - try { - const [quoteData, signalsData, klineData, flowData] = await Promise.all([ - fetchAPI(`/api/stocks/${code}/quote`).catch(() => null), - fetchAPI(`/api/stocks/${code}/signals`).catch(() => null), - fetchAPI(`/api/stocks/${code}/kline?days=120`).catch(() => []), - fetchAPI(`/api/stocks/${code}/capital_flow?days=10`).catch(() => []), - ]); - setQuote(isValidQuote(quoteData) ? quoteData : null); - setSignals(signalsData); - setKline(Array.isArray(klineData) ? klineData : []); - setCapitalFlow(Array.isArray(flowData) ? flowData : []); - setEvidenceLoaded(true); - } finally { - setEvidenceLoading(false); + useEffect(() => { + if (!code) return; + + let cancelled = false; + + async function loadEvidence() { + setEvidenceLoading(true); + setEvidenceLoaded(false); + setQuote(null); + setSignals(null); + setKline([]); + setCapitalFlow([]); + + try { + const [quoteData, signalsData, klineData, flowData] = await Promise.all([ + fetchAPI(`/api/stocks/${code}/quote`).catch(() => null), + fetchAPI(`/api/stocks/${code}/signals`).catch(() => null), + fetchAPI(`/api/stocks/${code}/kline?days=120`).catch(() => []), + fetchAPI(`/api/stocks/${code}/capital_flow?days=10`).catch(() => []), + ]); + + if (cancelled) return; + setQuote(isValidQuote(quoteData) ? quoteData : null); + setSignals(signalsData); + setKline(Array.isArray(klineData) ? klineData : []); + setCapitalFlow(Array.isArray(flowData) ? flowData : []); + setEvidenceLoaded(true); + } finally { + if (!cancelled) setEvidenceLoading(false); + } } - }; + + loadEvidence(); + return () => { + cancelled = true; + }; + }, [code]); const recommendation = thesis?.recommendation; const latestTracking = thesis?.latest_tracking; const latestFlow = capitalFlow.length > 0 ? capitalFlow[capitalFlow.length - 1] : null; const pageName = recommendation?.name || thesis?.name || quote?.name || code; - const conviction = recommendation?.llm_score != null ? Math.round(recommendation.llm_score) : null; return ( -
+
-
-
-
+
+
- 个股作战卡 + 个股作战卡 {recommendation?.action_plan ? ( {recommendation.action_plan} @@ -197,33 +208,34 @@ export default function StockDetailPage() {

{pageName}

- {code} + {quote?.ts_code || thesis?.ts_code || code}
-

+

{buildHeroSummary(thesis, quote)}

-
- {(thesis?.decision_points ?? []).slice(0, 3).map((point) => ( - - ))} +
+ + +
-
- -
- {thesis ? "已读取最近推荐、跟踪和诊断记录。" : "加载中"} +
+
+ +
+
- - - - + + + +
{(recommendation?.recall_tags?.length ?? 0) > 0 ? ( -
+
{(recommendation?.recall_tags ?? []).slice(0, 4).map((tag) => ( {formatRecallTag(tag)} @@ -250,45 +262,24 @@ export default function StockDetailPage() {
加载个股推演中...
) : ( <> -
+
+ +
+ {kline.length > 0 ? : } + {capitalFlow.length > 0 ? : } +
+ {latestFlow ? : null} - +
+ +
+ + +
- -
- - - -
- -
-
- - -
-
- - {evidenceLoaded ? ( - <> -
- {kline.length > 0 ? : } - {capitalFlow.length > 0 ? : } -
- - {latestFlow ? ( - - ) : null} - - ) : null} )}
@@ -307,12 +298,11 @@ function PlanCard({
- {recommendation?.llm_score != null ? ( - 把握度 {Math.round(recommendation.llm_score)}/10 + {recommendation?.score != null ? ( + 规则分 {Math.round(recommendation.score)} ) : null}
- {recommendation?.prefilter_reason ? : null} {(recommendation?.focus_points?.length ?? 0) > 0 ? ( ) : null} @@ -328,6 +318,44 @@ function PlanCard({ ); } +function EvidenceStatus({ loading, loaded, hasQuote }: { loading: boolean; loaded: boolean; hasQuote: boolean }) { + const text = loading ? "同步中" : loaded && hasQuote ? "已同步" : loaded ? "暂无行情" : "等待同步"; + const color = loading + ? "border-amber-500/20 bg-amber-500/10 text-amber-400" + : loaded && hasQuote + ? "border-cyan-500/20 bg-cyan-500/10 text-cyan-400" + : "border-border-subtle bg-surface-2 text-text-muted"; + + return {text}; +} + +function QuotePriceLine({ quote, loading }: { quote: QuoteData | null; loading: boolean }) { + if (!quote) { + return ( +
+ {loading ? "正在读取最新行情证据..." : "暂无实时行情"} +
+ ); + } + + return ( +
+
+
现价
+
0 ? "text-red-400" : quote.pct_chg < 0 ? "text-emerald-400" : "text-text-primary"}`}> + {quote.price.toFixed(2)} +
+
+
+
涨跌幅
+
0 ? "text-red-400" : quote.pct_chg < 0 ? "text-emerald-400" : "text-text-muted"}`}> + {quote.pct_chg > 0 ? "+" : ""}{quote.pct_chg.toFixed(2)}% +
+
+
+ ); +} + function formatRecallTag(tag: string): string { const labels: Record = { sector_recall: "主线入选", @@ -341,23 +369,16 @@ function formatRecallTag(tag: string): string { return labels[tag] ?? tag; } -function formatPrefilterDecision(decision?: string | null): string { - const labels: Record = { - priority: "优先研究", - watch: "保留观察", - ignore: "暂不处理", - }; - return labels[decision ?? ""] ?? "暂无"; -} - function EvidenceCard({ recommendation, quote, signals, + loading, }: { recommendation: RecommendationData | null | undefined; quote: QuoteData | null; signals: StockSignals | null; + loading: boolean; }) { const reasons = recommendation?.reasons ?? []; const evidenceChips = [ @@ -369,12 +390,15 @@ function EvidenceCard({ signals?.pullback_support ? "回踩支撑" : null, ].filter(Boolean) as string[]; return ( -
- +
+
+ + {loading ? 行情同步中 : null} +
{evidenceChips.length ? (
{evidenceChips.slice(0, 6).map((item) => ( - + {item} ))} @@ -391,7 +415,7 @@ function EvidenceCard({ )}
{recommendation?.risk_note ? ( -
+
风险提示:{recommendation.risk_note}
) : null} @@ -447,10 +471,21 @@ function TrackingCard({ tracking }: { tracking: StockThesisResponse["latest_trac ); } -function QuoteSnapshot({ quote, evidenceLoaded }: { quote: QuoteData | null; evidenceLoaded: boolean }) { +function QuoteSnapshot({ + quote, + evidenceLoading, + evidenceLoaded, +}: { + quote: QuoteData | null; + evidenceLoading: boolean; + evidenceLoaded: boolean; +}) { return (
- +
+ + +
{quote ? ( <>
@@ -471,7 +506,7 @@ function QuoteSnapshot({ quote, evidenceLoaded }: { quote: QuoteData | null; evi ) : evidenceLoaded ? (
暂无行情证据
) : ( -
点击“加载行情证据”后查看
+
{evidenceLoading ? "正在读取行情证据..." : "等待行情同步"}
)}
); @@ -480,10 +515,12 @@ function QuoteSnapshot({ quote, evidenceLoaded }: { quote: QuoteData | null; evi function SignalSnapshot({ signals, recScore, + evidenceLoading, evidenceLoaded, }: { signals: StockSignals | null; recScore: RecScore | null; + evidenceLoading: boolean; evidenceLoaded: boolean; }) { return ( @@ -506,7 +543,7 @@ function SignalSnapshot({ ) : evidenceLoaded ? (
暂无技术信号
) : ( -
点击“加载行情证据”后查看
+
{evidenceLoading ? "正在计算技术信号..." : "等待信号同步"}
)}
); @@ -537,7 +574,7 @@ function CapitalFlowBreakdown({ flow }: { flow: FlowRecord }) { function ChartEmptyCard({ title, description }: { title: string; description: string }) { return ( -
+

{title}

{description}
@@ -546,8 +583,8 @@ function ChartEmptyCard({ title, description }: { title: string; description: st function DecisionPoint({ label, value }: { label: string; value: string }) { return ( -
-
{label}
+
+
{label}
{value}
); @@ -562,11 +599,18 @@ function PlanRow({ label, value }: { label: string; value: string }) { ); } -function MiniDataCell({ label, value }: { label: string; value: string | number }) { +function MiniDataCell({ label, value, tone }: { label: string; value: string | number; tone?: number | null }) { + const toneClass = tone == null + ? "text-text-secondary" + : tone > 0 + ? "text-red-400" + : tone < 0 + ? "text-emerald-400" + : "text-text-secondary"; return ( -
+
{label}
-
{value}
+
{value}
); } @@ -613,7 +657,7 @@ function FlowBar({ label, value, max }: { label: string; value: number; max: num } function SectionTitle({ title }: { title: string }) { - return
{title}
; + return
{title}
; } function getLifecycleLabel(status: string) { @@ -626,7 +670,7 @@ function getLifecycleLabel(status: string) { expired: "到期复盘", invalidated: "已失效", }; - return labels[status] ?? status; + return labels[status] ?? "观察"; } function getActionPlanClass(actionPlan: string) { @@ -667,6 +711,8 @@ function signalTypeLabel(signalType?: string) { launch: "启动", reversal: "反转", breakout_confirm: "突破确认", + flow_momentum: "资金推动", + none: "观察", }; return map[signalType || ""] ?? "观察"; } @@ -684,3 +730,11 @@ function positionComment(positionScore: number) { if (positionScore >= 50) return "位置中性"; return "位置偏高,防追高"; } + +function sameStockCode(left: string, right: string) { + return stripMarket(left) === stripMarket(right); +} + +function stripMarket(value: string) { + return value.replace(/\.(SH|SZ|BJ)$/i, "").replace(/^(SH|SZ|BJ)/i, ""); +} diff --git a/frontend/src/components/nav.tsx b/frontend/src/components/nav.tsx index bcee9f0c..56dc4784 100644 --- a/frontend/src/components/nav.tsx +++ b/frontend/src/components/nav.tsx @@ -34,6 +34,17 @@ function FireIcon() { ); } +function RadarIcon() { + return ( + + + + + + + ); +} + function StrategyIcon() { return ( @@ -117,6 +128,7 @@ export function SidebarNav() { } label="今日作战" /> } label="推荐池" /> } label="板块主线" /> + } label="舆情雷达" /> } label="自选股" /> } label="研究助手" /> } label="个股诊断" /> @@ -162,6 +174,9 @@ export function MobileBottomNav() { + + + diff --git a/frontend/src/components/stock-card.tsx b/frontend/src/components/stock-card.tsx index 1c43e86f..7c6dc311 100644 --- a/frontend/src/components/stock-card.tsx +++ b/frontend/src/components/stock-card.tsx @@ -4,11 +4,10 @@ import type { RecommendationData } from "@/lib/api"; export default function StockCard({ rec, compact = false }: { rec: RecommendationData; compact?: boolean }) { const action = getActionMeta(rec.action_plan); - const trigger = rec.trigger_condition ?? rec.entry_timing ?? rec.decision_trace?.headline ?? rec.prefilter_reason ?? rec.reasons?.[0] ?? "等待触发条件确认"; + const trigger = rec.trigger_condition ?? rec.entry_timing ?? rec.decision_trace?.headline ?? rec.reasons?.[0] ?? "等待触发条件确认"; const risk = rec.invalidation_condition ?? rec.risk_note ?? "暂无明确失效条件"; const thesis = rec.decision_trace?.headline ?? rec.reasons?.[0] ?? rec.focus_points?.[0] ?? "等待更多盘面证据"; - const conviction = rec.llm_score != null ? Math.round(rec.llm_score) : null; - const chips = buildChips(rec, conviction).slice(0, compact ? 3 : 5); + const chips = buildChips(rec).slice(0, compact ? 3 : 5); return ( = { sector_recall: "主线入选", trend_scan: "趋势入选", @@ -100,7 +99,7 @@ function buildChips(rec: RecommendationData, conviction: number | null) { return [ rec.entry_signal_type ? signalTypeLabel(rec.entry_signal_type) : null, rec.review_after_days ? `${rec.review_after_days}日复盘` : null, - conviction != null ? `把握 ${conviction}/10` : null, + rec.score != null ? `规则分 ${Math.round(rec.score)}` : null, ...(rec.recall_tags ?? []).map((tag) => recallLabels[tag] ?? tag), ].filter(Boolean) as string[]; } diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index d1d69984..9c36d3f1 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -242,6 +242,45 @@ export interface SectorData { catalyst_reasons?: string[]; } +export interface CatalystNewsItem { + id: number; + title: string; + source: string; + url: string; + published_at: string | null; + status: "pending" | "analyzed" | "skipped" | "failed" | string; + catalyst_id: number | null; + error: string; + created_at: string; + analyzed_at: string | null; +} + +export interface CatalystEvent { + id: number; + title: string; + summary: string; + source: string; + url: string; + published_at: string | null; + catalyst_type: "policy" | "industry" | "event" | "earnings" | "announcement" | "news" | string; + strength: number; + freshness: number; + confidence: number; + raw_text: string; + llm_reason: string; + created_at: string; + themes?: string; +} + +export interface ThemeCatalystScore { + theme_id: string; + theme_name: string; + catalyst_score: number; + catalyst_count: number; + top_reasons: string[]; + generated_by: string; +} + export interface LatestResult { market_temperature: MarketTemperatureData | null; recommendations: RecommendationData[];