"""Catalyst event storage and theme score aggregation."""

from __future__ import annotations

import hashlib
import logging
from datetime import datetime, timedelta

from sqlalchemy import text

from app.catalyst.mapper import analyze_catalyst
from app.catalyst.models import CatalystAnalysis, CatalystInput, ThemeCatalystScore
from app.db import tables
from app.db.database import get_db
from app.news.models import NewsItem

logger = logging.getLogger(__name__)

# Substrings that mark a news item as worth analyzing. Kept at module level so
# the collection is built once instead of on every relevance check.
_MARKET_KEYWORDS = (
    "A股", "股市", "上市公司", "证券", "券商", "板块", "行业", "概念",
    "政策", "国务院", "发改委", "工信部", "央行", "证监会", "交易所",
    "业绩", "订单", "签约", "并购", "重组", "回购", "定增", "涨停",
    "资金", "主力", "北向", "算力", "人工智能", "半导体", "新能源",
    "机器人", "低空", "医药", "消费", "军工", "地产", "有色",
)


async def ingest_catalyst(item: CatalystInput, use_llm: bool = True) -> CatalystAnalysis:
    """Analyze a catalyst input, persist the analysis, and return it.

    Args:
        item: The catalyst input handed to ``analyze_catalyst``.
        use_llm: Forwarded unchanged to ``analyze_catalyst``.

    Returns:
        The analysis produced by ``analyze_catalyst``.
    """
    analysis = await analyze_catalyst(item, use_llm=use_llm)
    await save_catalyst(analysis)
    return analysis


async def ingest_catalyst_with_id(
    item: CatalystInput, use_llm: bool = True
) -> tuple[CatalystAnalysis, int]:
    """Analyze and persist a catalyst input, returning the stored row id too.

    Args:
        item: The catalyst input handed to ``analyze_catalyst``.
        use_llm: Forwarded unchanged to ``analyze_catalyst``.

    Returns:
        A ``(analysis, catalyst_id)`` pair, where ``catalyst_id`` is the value
        returned by ``save_catalyst``.
    """
    analysis = await analyze_catalyst(item, use_llm=use_llm)
    catalyst_id = await save_catalyst(analysis)
    return analysis, catalyst_id


async def save_catalyst(analysis: CatalystAnalysis) -> int:
    """Store a catalyst analysis and its theme links, returning the row id.

    When ``analysis.url`` is truthy and a ``catalysts`` row with the same
    ``source`` and ``url`` already exists, nothing is written and the id of
    the most recent matching row is returned instead.

    Args:
        analysis: The analysis to persist.

    Returns:
        The id of the existing or newly inserted ``catalysts`` row.
    """
    async with get_db() as db:
        if analysis.url:
            exists = await db.execute(
                text(
                    "SELECT id FROM catalysts WHERE source = :source AND url = :url "
                    "ORDER BY id DESC LIMIT 1"
                ),
                {"source": analysis.source, "url": analysis.url},
            )
            row = exists.fetchone()
            if row:
                # Already stored: reuse the row rather than duplicating it.
                return int(row._mapping["id"])

        result = await db.execute(
            tables.catalysts_table.insert().values(
                title=analysis.title,
                summary=analysis.summary,
                source=analysis.source,
                url=analysis.url,
                published_at=analysis.published_at,
                catalyst_type=analysis.catalyst_type,
                strength=analysis.strength,
                freshness=analysis.freshness,
                confidence=analysis.confidence,
                raw_text=analysis.raw_text,
                llm_reason=analysis.llm_reason,
                is_active=True,
            )
        )
        catalyst_id = int(result.inserted_primary_key[0])

        if analysis.themes:
            # One bulk insert for all theme links of this catalyst.
            await db.execute(
                tables.theme_catalysts_table.insert(),
                [
                    {
                        "catalyst_id": catalyst_id,
                        "theme_id": theme.theme_id,
                        "theme_name": theme.theme_name,
                        "relevance": theme.relevance,
                        "reason": theme.reason,
                    }
                    for theme in analysis.themes
                ],
            )

        await db.commit()
        return catalyst_id


async def ingest_news_items(items: list[NewsItem]) -> dict:
    """Save raw news items and analyze the newly inserted ones.

    Items whose dedup key already exists in ``news_items`` are counted as
    duplicates and skipped; the rest are inserted with status ``"pending"``.
    If anything was inserted, ``analyze_pending_news`` is then run with
    ``limit`` equal to the number of inserted rows.

    NOTE(review): ``analyze_pending_news`` picks the newest pending rows, which
    are not necessarily the rows inserted by this call.

    Args:
        items: News items to store.

    Returns:
        A dict with the counters ``fetched``, ``inserted``, ``duplicates``,
        ``analyzed`` and ``failed``.
    """
    inserted = 0
    duplicates = 0
    analyzed = 0
    failed = 0

    async with get_db() as db:
        for item in items:
            dedup_key = build_news_dedup_key(item)
            exists = await db.execute(
                text("SELECT id FROM news_items WHERE dedup_key = :dedup_key LIMIT 1"),
                {"dedup_key": dedup_key},
            )
            if exists.fetchone():
                duplicates += 1
                continue

            await db.execute(
                tables.news_items_table.insert().values(
                    title=item.title,
                    content=item.content,
                    summary=item.summary,
                    source=item.source,
                    url=item.url,
                    published_at=item.published_at,
                    dedup_key=dedup_key,
                    status="pending",
                )
            )
            inserted += 1
        await db.commit()

    if inserted:
        result = await analyze_pending_news(limit=inserted)
        analyzed += int(result.get("analyzed", 0))
        failed += int(result.get("failed", 0))

    return {
        "fetched": len(items),
        "inserted": inserted,
        "duplicates": duplicates,
        "analyzed": analyzed,
        "failed": failed,
    }


async def analyze_pending_news(limit: int = 50, use_llm: bool = True) -> dict:
    """Run catalyst analysis over pending news items, newest first.

    Each pending row is marked ``"skipped"`` when it does not look market
    relevant, ``"analyzed"`` (with the resulting catalyst id) when analysis
    succeeds, or ``"failed"`` (with the error text truncated to 500
    characters) when analysis raises.

    Args:
        limit: Maximum number of pending rows to process.
        use_llm: Forwarded to ``ingest_catalyst_with_id``.

    Returns:
        A dict with the counters ``analyzed``, ``skipped`` and ``failed``.
    """
    rows: list[dict] = []
    async with get_db() as db:
        result = await db.execute(
            text(
                "SELECT * FROM news_items "
                "WHERE status = 'pending' "
                "ORDER BY COALESCE(published_at, created_at) DESC, id DESC "
                "LIMIT :limit"
            ),
            {"limit": limit},
        )
        rows = [dict(row._mapping) for row in result.fetchall()]

    analyzed = 0
    skipped = 0
    failed = 0

    for row in rows:
        title = row.get("title") or ""
        # Fall back to the summary when the item has no body text.
        content = row.get("content") or row.get("summary") or ""
        try:
            if not _looks_market_relevant(title, content):
                await _mark_news_item(row["id"], status="skipped", error="")
                skipped += 1
                continue

            _, catalyst_id = await ingest_catalyst_with_id(
                CatalystInput(
                    title=title,
                    content=content,
                    source=row.get("source") or "news",
                    url=row.get("url") or "",
                    published_at=row.get("published_at"),
                ),
                use_llm=use_llm,
            )
            await _mark_news_item(
                row["id"],
                status="analyzed",
                catalyst_id=catalyst_id,
                error="",
            )
            analyzed += 1
        except Exception as e:
            # Per-item boundary: one bad item must not stop the batch.
            logger.warning("新闻催化分析失败 id=%s title=%s error=%s", row.get("id"), title, e)
            await _mark_news_item(row["id"], status="failed", error=str(e)[:500])
            failed += 1

    return {"analyzed": analyzed, "skipped": skipped, "failed": failed}


def build_news_dedup_key(item: NewsItem) -> str:
    """Build the SHA-256 dedup key for a news item.

    The key is derived from ``item.source`` plus ``item.url`` (or
    ``item.title`` when the URL is falsy). The combined string is stripped,
    reduced to alphanumerics and the characters ``.:/_-|``, lower-cased, and
    hashed.

    Args:
        item: The news item to fingerprint.

    Returns:
        The hex digest of the normalized key.
    """
    # Named ``raw_key`` rather than ``text`` so the module-level
    # ``sqlalchemy.text`` import is not shadowed inside this function.
    raw_key = f"{item.source}|{item.url or item.title}"
    normalized = "".join(
        ch.lower() for ch in raw_key.strip() if ch.isalnum() or ch in ".:/_-|"
    )
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


async def _mark_news_item(
    news_id: int,
    status: str,
    catalyst_id: int | None = None,
    error: str = "",
) -> None:
    """Record the analysis outcome on a ``news_items`` row.

    ``status``, ``catalyst_id`` and ``error`` are all overwritten on every
    call, and ``analyzed_at`` is set to the current local time.

    Args:
        news_id: Id of the ``news_items`` row to update.
        status: New status value.
        catalyst_id: Linked catalyst id, or ``None`` to clear the link.
        error: Error text to store; empty string for none.
    """
    async with get_db() as db:
        await db.execute(
            text(
                "UPDATE news_items SET status = :status, catalyst_id = :catalyst_id, "
                "error = :error, analyzed_at = :analyzed_at WHERE id = :id"
            ),
            {
                "status": status,
                "catalyst_id": catalyst_id,
                "error": error,
                "analyzed_at": datetime.now(),
                "id": news_id,
            },
        )
        await db.commit()


def _looks_market_relevant(title: str, content: str) -> bool:
    """Return True when the title or content contains any market keyword.

    Args:
        title: News title.
        content: News body or summary text.

    Returns:
        ``True`` if any entry of ``_MARKET_KEYWORDS`` occurs as a substring of
        the title and content joined by a space.
    """
    text_value = f"{title} {content}"
    return any(keyword in text_value for keyword in _MARKET_KEYWORDS)


async def get_recent_catalysts(limit: int = 30, hours: int = 72) -> list[dict]:
    """Return active catalysts from the last ``hours`` hours, newest first.

    Each row carries all ``catalysts`` columns plus a ``themes`` column: a
    comma-joined list of ``theme_name:relevance`` entries with the relevance
    rounded to zero decimals.

    Args:
        limit: Maximum number of rows to return.
        hours: Size of the look-back window, measured from the current local time.

    Returns:
        The matching rows as plain dicts.
    """
    since = datetime.now() - timedelta(hours=hours)
    async with get_db() as db:
        result = await db.execute(
            text(
                "SELECT c.*, "
                "GROUP_CONCAT(tc.theme_name || ':' || ROUND(tc.relevance, 0), ',') AS themes "
                "FROM catalysts c "
                "LEFT JOIN theme_catalysts tc ON tc.catalyst_id = c.id "
                "WHERE c.is_active = 1 AND COALESCE(c.published_at, c.created_at) >= :since "
                "GROUP BY c.id "
                "ORDER BY COALESCE(c.published_at, c.created_at) DESC, c.id DESC "
                "LIMIT :limit"
            ),
            {"since": since, "limit": limit},
        )
        rows = result.mappings().all()
    return [dict(row) for row in rows]


async def get_recent_news_items(
    limit: int = 50, hours: int = 24, status: str | None = None
) -> list[dict]:
    """Return news items from the last ``hours`` hours, newest first.

    Args:
        limit: Maximum number of rows to return.
        hours: Size of the look-back window, measured from the current local time.
        status: When truthy, only rows with this status are returned.

    Returns:
        The matching rows as plain dicts (without the content columns).
    """
    since = datetime.now() - timedelta(hours=hours)
    # Only fixed SQL fragments are concatenated; all values go through bind parameters.
    conditions = ["COALESCE(published_at, created_at) >= :since"]
    params = {"since": since, "limit": limit}
    if status:
        conditions.append("status = :status")
        params["status"] = status

    async with get_db() as db:
        result = await db.execute(
            text(
                "SELECT id, title, source, url, published_at, status, catalyst_id, error, created_at, analyzed_at "
                "FROM news_items WHERE "
                + " AND ".join(conditions)
                + " ORDER BY COALESCE(published_at, created_at) DESC, id DESC LIMIT :limit"
            ),
            params,
        )
        return [dict(row._mapping) for row in result.fetchall()]


async def build_theme_catalyst_scores(hours: int = 72, limit: int = 20) -> list[ThemeCatalystScore]:
    """Aggregate recent active catalysts into one score per theme.

    Each catalyst/theme link contributes
    ``strength * 0.45 + freshness * 0.25 + confidence * 0.15 + relevance * 0.15``.
    Themes are ranked by the sum of those contributions and cut to ``limit``.
    The reported ``catalyst_score`` is the per-catalyst average of that sum,
    capped at 100 and rounded to one decimal.

    Args:
        hours: Size of the look-back window, measured from the current local time.
        limit: Maximum number of themes to return.

    Returns:
        One ``ThemeCatalystScore`` per theme, each with up to three reasons.
    """
    since = datetime.now() - timedelta(hours=hours)
    async with get_db() as db:
        rows = (
            await db.execute(
                text(
                    "SELECT tc.theme_id, tc.theme_name, "
                    "COUNT(*) AS catalyst_count, "
                    "SUM((c.strength * 0.45 + c.freshness * 0.25 + c.confidence * 0.15 + tc.relevance * 0.15)) AS raw_score, "
                    "GROUP_CONCAT(SUBSTR(COALESCE(tc.reason, c.summary, c.title), 1, 60), ' | ') AS reasons "
                    "FROM theme_catalysts tc "
                    "JOIN catalysts c ON c.id = tc.catalyst_id "
                    "WHERE c.is_active = 1 AND COALESCE(c.published_at, c.created_at) >= :since "
                    "GROUP BY tc.theme_id, tc.theme_name "
                    "ORDER BY raw_score DESC "
                    "LIMIT :limit"
                ),
                {"since": since, "limit": limit},
            )
        ).mappings().all()

    scores = []
    for row in rows:
        raw = float(row.get("raw_score") or 0)
        count = int(row.get("catalyst_count") or 0)
        # Average per catalyst; max(count, 1) guards against division by zero.
        normalized = min(raw / max(count, 1), 100)
        # Split on the exact separator GROUP_CONCAT used, so a reason that
        # itself contains a bare "|" is kept in one piece.
        reasons = [
            item.strip()
            for item in str(row.get("reasons") or "").split(" | ")
            if item.strip()
        ][:3]
        scores.append(
            ThemeCatalystScore(
                theme_id=row["theme_id"],
                theme_name=row["theme_name"],
                catalyst_score=round(normalized, 1),
                catalyst_count=count,
                top_reasons=reasons,
                generated_by="catalyst_layer",
            )
        )
    return scores