From 75ad00770db47558b22df69ebeffbff180ac02ab Mon Sep 17 00:00:00 2001 From: aaron <> Date: Wed, 25 Feb 2026 19:59:20 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=96=B0=E9=97=BBagent?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/app/api/news.py | 179 +++++++++ backend/app/config.py | 1 + backend/app/main.py | 28 +- backend/app/models/news.py | 99 +++++ backend/app/news_agent/__init__.py | 38 ++ backend/app/news_agent/analyzer.py | 423 ++++++++++++++++++++++ backend/app/news_agent/fetcher.py | 271 ++++++++++++++ backend/app/news_agent/filter.py | 267 ++++++++++++++ backend/app/news_agent/news_agent.py | 350 ++++++++++++++++++ backend/app/news_agent/news_db_service.py | 406 +++++++++++++++++++++ backend/app/news_agent/notifier.py | 307 ++++++++++++++++ backend/app/news_agent/sources.py | 277 ++++++++++++++ backend/app/services/feishu_service.py | 13 +- backend/requirements.txt | 5 + scripts/manual_news_fetch.py | 357 ++++++++++++++++++ scripts/migrate_create_news_table.py | 132 +++++++ scripts/run_news_fetch.sh | 21 ++ scripts/schema_news.sql | 45 +++ scripts/test_news_notification.py | 118 ++++++ 19 files changed, 3334 insertions(+), 3 deletions(-) create mode 100644 backend/app/api/news.py create mode 100644 backend/app/models/news.py create mode 100644 backend/app/news_agent/__init__.py create mode 100644 backend/app/news_agent/analyzer.py create mode 100644 backend/app/news_agent/fetcher.py create mode 100644 backend/app/news_agent/filter.py create mode 100644 backend/app/news_agent/news_agent.py create mode 100644 backend/app/news_agent/news_db_service.py create mode 100644 backend/app/news_agent/notifier.py create mode 100644 backend/app/news_agent/sources.py create mode 100755 scripts/manual_news_fetch.py create mode 100644 scripts/migrate_create_news_table.py create mode 100755 scripts/run_news_fetch.sh create mode 100644 scripts/schema_news.sql create mode 100644 
scripts/test_news_notification.py diff --git a/backend/app/api/news.py b/backend/app/api/news.py new file mode 100644 index 0000000..fe636dc --- /dev/null +++ b/backend/app/api/news.py @@ -0,0 +1,179 @@ +""" +新闻 API - 提供新闻查询接口 +""" +from fastapi import APIRouter, HTTPException, Query +from typing import Dict, List, Optional, Any + +from app.news_agent.news_agent import get_news_agent +from app.news_agent.news_db_service import get_news_db_service +from app.utils.logger import logger + + +router = APIRouter(prefix="/api/news", tags=["新闻管理"]) + + +@router.get("/articles") +async def get_articles( + category: Optional[str] = Query(None, description="分类过滤 (crypto/stock)"), + limit: int = Query(50, ge=1, le=200, description="返回数量限制"), + hours: int = Query(24, ge=1, le=168, description="查询最近多少小时") +) -> Dict[str, Any]: + """ + 获取新闻文章列表 + + Args: + category: 分类过滤 + limit: 返回数量限制 + hours: 查询最近多少小时 + + Returns: + 文章列表 + """ + try: + db_service = get_news_db_service() + articles = db_service.get_latest_articles( + category=category, + limit=limit, + hours=hours + ) + + return { + 'success': True, + 'articles': articles, + 'count': len(articles) + } + except Exception as e: + logger.error(f"获取新闻文章失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/stats") +async def get_news_stats( + hours: int = Query(24, ge=1, le=168, description="统计最近多少小时") +) -> Dict[str, Any]: + """ + 获取新闻统计信息 + + Args: + hours: 统计最近多少小时 + + Returns: + 统计数据 + """ + try: + news_agent = get_news_agent() + agent_stats = news_agent.get_stats() + + db_service = get_news_db_service() + db_stats = db_service.get_stats(hours=hours) + + return { + 'success': True, + 'agent': agent_stats, + 'database': db_stats + } + except Exception as e: + logger.error(f"获取新闻统计失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/fetch") +async def manual_fetch( + category: Optional[str] = Query(None, description="分类过滤 (crypto/stock)") +) -> Dict[str, Any]: + """ + 
手动触发新闻抓取 + + Args: + category: 分类过滤 + + Returns: + 抓取结果 + """ + try: + news_agent = get_news_agent() + result = await news_agent.manual_fetch(category) + + return { + 'success': True, + **result + } + except Exception as e: + logger.error(f"手动抓取新闻失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/high-priority") +async def get_high_priority_articles( + limit: int = Query(20, ge=1, le=100, description="返回数量限制"), + min_priority: float = Query(40.0, description="最低优先级分数"), + hours: int = Query(24, ge=1, le=168, description="查询最近多少小时") +) -> Dict[str, Any]: + """ + 获取高优先级文章 + + Args: + limit: 返回数量限制 + min_priority: 最低优先级分数 + hours: 查询最近多少小时 + + Returns: + 高优先级文章列表 + """ + try: + db_service = get_news_db_service() + articles = db_service.get_high_priority_articles( + limit=limit, + min_priority=min_priority, + hours=hours + ) + + return { + 'success': True, + 'articles': [article.to_dict() for article in articles], + 'count': len(articles) + } + except Exception as e: + logger.error(f"获取高优先级文章失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/sources") +async def get_news_sources() -> Dict[str, Any]: + """ + 获取新闻源配置 + + Returns: + 新闻源列表 + """ + try: + from app.news_agent.sources import CRYPTO_NEWS_SOURCES, STOCK_NEWS_SOURCES + + # 只返回基本信息,隐藏敏感配置 + crypto_sources = [ + { + 'name': s['name'], + 'category': s['category'], + 'enabled': s['enabled'] + } + for s in CRYPTO_NEWS_SOURCES + ] + + stock_sources = [ + { + 'name': s['name'], + 'category': s['category'], + 'enabled': s['enabled'] + } + for s in STOCK_NEWS_SOURCES + ] + + return { + 'success': True, + 'crypto': crypto_sources, + 'stock': stock_sources, + 'total': len(crypto_sources) + len(stock_sources) + } + except Exception as e: + logger.error(f"获取新闻源失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/backend/app/config.py b/backend/app/config.py index 52eb698..9d1faaf 100644 --- a/backend/app/config.py +++ 
b/backend/app/config.py @@ -96,6 +96,7 @@ class Settings(BaseSettings): # 飞书机器人配置 feishu_crypto_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/8a1dcf69-6753-41e2-a393-edc4f7822db0" # 加密货币通知 feishu_stock_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/408ab727-0dcd-4c7a-bde7-4aad38cbf807" # 股票通知 + feishu_news_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/c7fd0db7-d295-451c-b943-130278a6cd9d" # 新闻智能体通知 feishu_enabled: bool = True # 是否启用飞书通知 # Telegram 机器人配置 diff --git a/backend/app/main.py b/backend/app/main.py index a5d7d2d..aaa1035 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,7 +9,7 @@ from fastapi.responses import FileResponse from contextlib import asynccontextmanager from app.config import get_settings from app.utils.logger import logger -from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals, system, real_trading +from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals, system, real_trading, news from app.utils.error_handler import setup_global_exception_handler, init_error_notifier from app.utils.system_status import get_system_monitor import os @@ -20,6 +20,7 @@ _price_monitor_task = None _report_task = None _stock_agent_task = None _crypto_agent_task = None +_news_agent_task = None async def price_monitor_loop(): @@ -374,7 +375,7 @@ async def periodic_report_loop(): @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" - global _price_monitor_task, _report_task, _stock_agent_task, _crypto_agent_task + global _price_monitor_task, _report_task, _stock_agent_task, _crypto_agent_task, _news_agent_task # 启动时执行 logger.info("应用启动") @@ -444,6 +445,16 @@ async def lifespan(app: FastAPI): else: logger.info("股票智能体未启动(未配置股票代码)") + # 启动新闻智能体 + try: + from app.news_agent.news_agent import get_news_agent + news_agent = get_news_agent() + _news_agent_task = asyncio.create_task(news_agent.start()) + 
logger.info("新闻智能体已启动") + except Exception as e: + logger.error(f"新闻智能体启动失败: {e}") + logger.error(f"提示: 请确保已安装 feedparser 和 beautifulsoup4 (pip install feedparser beautifulsoup4)") + # 显示系统状态摘要 await _print_system_status() @@ -484,6 +495,18 @@ async def lifespan(app: FastAPI): pass logger.info("美股智能体已停止") + # 停止新闻智能体 + if _news_agent_task: + try: + from app.news_agent.news_agent import get_news_agent + news_agent = get_news_agent() + await news_agent.stop() + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"停止新闻智能体失败: {e}") + logger.info("新闻智能体已停止") + logger.info("应用关闭") @@ -516,6 +539,7 @@ app.include_router(paper_trading.router, tags=["模拟交易"]) app.include_router(real_trading.router, tags=["实盘交易"]) app.include_router(stocks.router, prefix="/api/stocks", tags=["美股分析"]) app.include_router(signals.router, tags=["信号管理"]) +app.include_router(news.router, tags=["新闻管理"]) app.include_router(system.router, prefix="/api/system", tags=["系统状态"]) # 挂载静态文件 diff --git a/backend/app/models/news.py b/backend/app/models/news.py new file mode 100644 index 0000000..5fad1d1 --- /dev/null +++ b/backend/app/models/news.py @@ -0,0 +1,99 @@ +""" +新闻文章数据库模型 +""" +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, JSON, Boolean, Float +from sqlalchemy.orm import relationship + +from app.models.database import Base + + +class NewsArticle(Base): + """新闻文章表""" + __tablename__ = "news_articles" + + id = Column(Integer, primary_key=True, index=True) + + # 新闻基本信息 + title = Column(String(500), nullable=False) + content = Column(Text, nullable=True) # 完整内容或摘要 + content_hash = Column(String(64), nullable=False, index=True) # 内容哈希,用于去重 + url = Column(String(1000), nullable=False, unique=True) # 原文链接 + source = Column(String(100), nullable=False, index=True) # 来源网站 + author = Column(String(200), nullable=True) # 作者 + + # 新闻分类 + category = Column(String(50), nullable=False, index=True) # 'crypto', 'stock', 'forex', 'commodity' + 
tags = Column(JSON, nullable=True) # 标签列表 + + # 时间信息 + published_at = Column(DateTime, nullable=True, index=True) # 发布时间 + crawled_at = Column(DateTime, default=datetime.utcnow, index=True) # 爬取时间 + + # LLM 分析结果 + llm_analyzed = Column(Boolean, default=False, index=True) # 是否已分析 + market_impact = Column(String(20), nullable=True, index=True) # 'high', 'medium', 'low' + impact_type = Column(String(50), nullable=True) # 'bullish', 'bearish', 'neutral' + relevant_symbols = Column(JSON, nullable=True) # 相关的币种/股票代码 + + # LLM 分析详情 + sentiment = Column(String(20), nullable=True) # 'positive', 'negative', 'neutral' + summary = Column(Text, nullable=True) # LLM 生成的摘要 + key_points = Column(JSON, nullable=True) # 关键点列表 + trading_advice = Column(Text, nullable=True) # 交易建议 + + # 优先级队列 + priority = Column(Float, default=0.0, index=True) # 优先级分数 + priority_reason = Column(Text, nullable=True) # 优先级原因 + + # 通知状态 + notified = Column(Boolean, default=False, index=True) # 是否已发送通知 + notification_sent_at = Column(DateTime, nullable=True) + notification_channel = Column(String(50), nullable=True) # 'feishu', 'telegram', etc. 
+ + # 质量控制 + quality_score = Column(Float, nullable=True) # 质量分数 0-1 + duplicate_of = Column(Integer, nullable=True) # 如果是重复,指向原始文章ID + + # 状态 + is_active = Column(Boolean, default=True, index=True) # 是否有效 + + # 时间戳 + created_at = Column(DateTime, default=datetime.utcnow, index=True) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __repr__(self): + return f"" + + def to_dict(self): + """转换为字典""" + return { + 'id': self.id, + 'title': self.title, + 'content': self.content, + 'url': self.url, + 'source': self.source, + 'author': self.author, + 'category': self.category, + 'tags': self.tags, + 'published_at': self.published_at.isoformat() if self.published_at else None, + 'crawled_at': self.crawled_at.isoformat() if self.crawled_at else None, + 'llm_analyzed': self.llm_analyzed, + 'market_impact': self.market_impact, + 'impact_type': self.impact_type, + 'relevant_symbols': self.relevant_symbols, + 'sentiment': self.sentiment, + 'summary': self.summary, + 'key_points': self.key_points, + 'trading_advice': self.trading_advice, + 'priority': self.priority, + 'priority_reason': self.priority_reason, + 'notified': self.notified, + 'notification_sent_at': self.notification_sent_at.isoformat() if self.notification_sent_at else None, + 'notification_channel': self.notification_channel, + 'quality_score': self.quality_score, + 'duplicate_of': self.duplicate_of, + 'is_active': self.is_active, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'updated_at': self.updated_at.isoformat() if self.updated_at else None, + } diff --git a/backend/app/news_agent/__init__.py b/backend/app/news_agent/__init__.py new file mode 100644 index 0000000..579322c --- /dev/null +++ b/backend/app/news_agent/__init__.py @@ -0,0 +1,38 @@ +""" +新闻智能体模块 +""" +from app.news_agent.news_agent import NewsAgent, get_news_agent +from app.news_agent.fetcher import NewsFetcher, NewsItem +from app.news_agent.filter import NewsDeduplicator, 
NewsFilter +from app.news_agent.analyzer import NewsAnalyzer, NewsAnalyzerSimple +from app.news_agent.notifier import NewsNotifier, get_news_notifier +from app.news_agent.news_db_service import NewsDatabaseService, get_news_db_service +from app.news_agent.sources import ( + get_enabled_sources, + CRYPTO_NEWS_SOURCES, + STOCK_NEWS_SOURCES, + CRYPTO_KEYWORDS, + STOCK_KEYWORDS, + SYMBOL_MAPPINGS +) + +__all__ = [ + 'NewsAgent', + 'get_news_agent', + 'NewsFetcher', + 'NewsItem', + 'NewsDeduplicator', + 'NewsFilter', + 'NewsAnalyzer', + 'NewsAnalyzerSimple', + 'NewsNotifier', + 'get_news_notifier', + 'NewsDatabaseService', + 'get_news_db_service', + 'get_enabled_sources', + 'CRYPTO_NEWS_SOURCES', + 'STOCK_NEWS_SOURCES', + 'CRYPTO_KEYWORDS', + 'STOCK_KEYWORDS', + 'SYMBOL_MAPPINGS', +] diff --git a/backend/app/news_agent/analyzer.py b/backend/app/news_agent/analyzer.py new file mode 100644 index 0000000..ce74006 --- /dev/null +++ b/backend/app/news_agent/analyzer.py @@ -0,0 +1,423 @@ +""" +新闻 LLM 分析模块 +使用 LLM 分析新闻内容并生成交易建议 +""" +import json +from typing import Dict, Any, List, Optional +from datetime import datetime + +from app.utils.logger import logger +from app.news_agent.fetcher import NewsItem +from app.config import get_settings +from openai import OpenAI + + +class NewsAnalyzer: + """新闻 LLM 分析器 (DeepSeek)""" + + def __init__(self): + self.settings = get_settings() + self.client = None + + try: + # 使用 DeepSeek API + self.client = OpenAI( + api_key=self.settings.deepseek_api_key, + base_url="https://api.deepseek.com" + ) + except Exception as e: + logger.error(f"LLM 客户端初始化失败: {e}") + + # 批量分析配置 + self.batch_size = 10 # 每次最多分析 10 条新闻(只传标题,可以增加数量) + self.max_retries = 2 + + def _build_analysis_prompt(self, news_item: NewsItem) -> str: + """构建单条新闻的分析提示词""" + + prompt = f"""你是一名专业的金融新闻分析师。请分析以下新闻标题,并以 JSON 格式输出结果。 + +**新闻标题**: {news_item.title} + +**新闻来源**: {news_item.source} + +**新闻分类**: {news_item.category} + +请按以下 JSON 格式输出(不要包含其他内容): + +```json +{{ + "market_impact": 
"high/medium/low", + "impact_type": "bullish/bearish/neutral", + "sentiment": "positive/negative/neutral", + "summary": "简洁的新闻摘要(1句话,不超过50字)", + "key_points": ["关键点1", "关键点2", "关键点3"], + "trading_advice": "简洁的交易建议(1句话,不超过30字)", + "relevant_symbols": ["相关的币种或股票代码"], + "confidence": 85 +}} +``` + +**分析要求**: +1. market_impact: 对市场的潜在影响(high=重大影响, medium=中等影响, low=轻微影响) +2. impact_type: 对价格的影响方向(bullish=利好, bearish=利空, neutral=中性) +3. sentiment: 新闻情绪(positive=正面, negative=负面, neutral=中性) +4. summary: 根据标题推断并总结新闻核心内容 +5. key_points: 基于标题推断3-5个关键信息点 +6. trading_advice: 给出简明的交易建议 +7. relevant_symbols: 根据标题列出相关的交易代码(如 BTC, ETH, NVDA, TSLA 等) +8. confidence: 分析置信度(0-100) + +请只输出 JSON,不要包含其他解释。 +""" + + return prompt + + def _build_batch_analysis_prompt(self, news_items: List[NewsItem]) -> str: + """构建批量分析提示词""" + + news_text = "" + for i, item in enumerate(news_items, 1): + news_text += f""" +--- 新闻 {i} --- +标题: {item.title} +来源: {item.source} +分类: {item.category} +--- +""" + + prompt = f"""你是一名专业的金融新闻分析师。请分析以下 {len(news_items)} 条新闻标题,并以 JSON 数组格式输出结果。 + +{news_text} + +请按以下 JSON 格式输出(不要包含其他内容): + +```json +[ + {{ + "title": "新闻标题", + "market_impact": "high/medium/low", + "impact_type": "bullish/bearish/neutral", + "sentiment": "positive/negative/neutral", + "summary": "简洁的新闻摘要(1句话,不超过50字)", + "key_points": ["关键点1", "关键点2"], + "trading_advice": "简洁的交易建议(1句话,不超过30字)", + "relevant_symbols": ["相关代码"], + "confidence": 85 + }} +] +``` + +请只输出 JSON 数组,不要包含其他解释。 +""" + + return prompt + + def _parse_llm_response(self, response: str) -> Optional[Dict[str, Any]]: + """解析 LLM 响应""" + try: + # 尝试提取 JSON + response = response.strip() + + # 移除可能的 markdown 代码块标记 + if response.startswith("```json"): + response = response[7:] + if response.startswith("```"): + response = response[3:] + if response.endswith("```"): + response = response[:-3] + + response = response.strip() + + # 解析 JSON + return json.loads(response) + + except json.JSONDecodeError as e: + # 尝试修复截断的 JSON + 
logger.warning(f"JSON 解析失败,尝试修复: {e}") + try: + # 查找最后一个完整的对象 + response = response.strip() + + # 如果是数组,找到最后一个完整的对象 + if response.startswith('['): + # 找到每个完整对象的结束位置 + brace_count = 0 + last_complete = 0 + for i, char in enumerate(response): + if char == '{': + brace_count += 1 + elif char == '}': + brace_count -= 1 + if brace_count == 0: + last_complete = i + 1 + break + + if last_complete > 0: + # 提取完整的数组 + fixed = response[:last_complete] + if not fixed.endswith(']'): + fixed += ']' + if not fixed.endswith('}'): + fixed += '}' + return json.loads(fixed) + except: + pass + + logger.error(f"JSON 解析失败: {e}, 响应: {response[:500]}") + return None + + def _parse_llm_array_response(self, response: str) -> Optional[List[Dict[str, Any]]]: + """解析 LLM 数组响应""" + try: + # 尝试提取 JSON + response = response.strip() + + # 移除可能的 markdown 代码块标记 + if response.startswith("```json"): + response = response[7:] + if response.startswith("```"): + response = response[3:] + if response.endswith("```"): + response = response[:-3] + + response = response.strip() + + # 解析 JSON 数组 + result = json.loads(response) + if isinstance(result, list): + return result + elif isinstance(result, dict) and 'title' in result: + # 如果返回单个对象,包装成数组 + return [result] + return None + + except json.JSONDecodeError as e: + # 尝试修复截断的 JSON 数组 + logger.warning(f"JSON 数组解析失败,尝试修复: {e}") + try: + response = response.strip() + + if response.startswith('['): + # 找到每个完整对象 + objects = [] + brace_count = 0 + obj_start = -1 + + for i, char in enumerate(response): + if char == '{': + if obj_start == -1: + obj_start = i + brace_count += 1 + elif char == '}': + brace_count -= 1 + if brace_count == 0 and obj_start >= 0: + # 提取完整对象 + obj_str = response[obj_start:i + 1] + try: + obj = json.loads(obj_str) + if isinstance(obj, dict) and 'title' in obj: + objects.append(obj) + except: + pass + obj_start = -1 + + if objects: + return objects + except: + pass + + logger.error(f"JSON 数组解析失败: {e}, 响应: {response[:500]}") + return None + + 
def analyze_single(self, news_item: NewsItem) -> Optional[Dict[str, Any]]: + """ + 分析单条新闻 + + Args: + news_item: 新闻项 + + Returns: + 分析结果字典或 None + """ + if not self.client: + logger.warning("LLM 客户端未初始化") + return None + + try: + prompt = self._build_analysis_prompt(news_item) + + for attempt in range(self.max_retries): + try: + response = self.client.chat.completions.create( + model="deepseek-chat", + messages=[ + {"role": "system", "content": "你是一名专业的金融新闻分析师,擅长分析新闻标题对市场的影响。"}, + {"role": "user", "content": prompt} + ], + temperature=0.3, + max_tokens=1000 # 只传标题,减少输出token + ) + + result = self._parse_llm_response(response.choices[0].message.content) + + if result: + logger.info(f"新闻分析成功: {news_item.title[:50]}... -> {result.get('market_impact')}") + return result + + except Exception as e: + logger.warning(f"分析失败 (尝试 {attempt + 1}/{self.max_retries}): {e}") + + logger.error(f"新闻分析失败,已达最大重试次数: {news_item.title[:50]}") + return None + + except Exception as e: + logger.error(f"分析新闻时出错: {e}") + return None + + def analyze_batch(self, news_items: List[NewsItem]) -> List[Optional[Dict[str, Any]]]: + """ + 批量分析新闻 + + Args: + news_items: 新闻项列表 + + Returns: + 分析结果列表(与输入顺序一致) + """ + if not self.client: + logger.warning("LLM 客户端未初始化") + return [None] * len(news_items) + + results = [] + + # 分批处理 + for i in range(0, len(news_items), self.batch_size): + batch = news_items[i:i + self.batch_size] + + try: + prompt = self._build_batch_analysis_prompt(batch) + + response = self.client.chat.completions.create( + model="deepseek-chat", + messages=[ + {"role": "system", "content": "你是一名专业的金融新闻分析师,擅长分析新闻标题对市场的影响。"}, + {"role": "user", "content": prompt} + ], + temperature=0.3, + max_tokens=2000 # 批量分析需要更多 token + ) + + batch_results = self._parse_llm_array_response(response.choices[0].message.content) + + if batch_results: + # 按标题匹配结果 + title_to_result = {r.get('title'): r for r in batch_results if r and isinstance(r, dict)} + for item in batch: + result = 
title_to_result.get(item.title) + results.append(result) + if result: + logger.info(f"新闻分析成功: {item.title[:50]}... -> {result.get('market_impact')}") + else: + results.extend([None] * len(batch)) + + except Exception as e: + logger.error(f"批量分析失败: {e}") + results.extend([None] * len(batch)) + + return results + + def calculate_priority(self, analysis: Dict[str, Any], quality_score: float = 0.5) -> float: + """ + 根据分析结果计算优先级 + + Args: + analysis: LLM 分析结果 + quality_score: 质量分数 + + Returns: + 优先级分数 + """ + score = 0.0 + + # 市场影响 + impact_weights = {'high': 50, 'medium': 30, 'low': 10} + score += impact_weights.get(analysis.get('market_impact', 'low'), 10) + + # 方向性(利空利好比中性重要) + if analysis.get('impact_type') in ['bullish', 'bearish']: + score += 15 + + # 置信度 + score += (analysis.get('confidence', 50) / 100) * 10 + + # 质量分数 + score += quality_score * 20 + + # 是否有相关代码 + if analysis.get('relevant_symbols'): + score += 5 + + return score + + +class NewsAnalyzerSimple: + """简化版新闻分析器(仅关键词规则,不使用 LLM)""" + + def __init__(self): + pass + + def analyze_single(self, news_item: NewsItem) -> Dict[str, Any]: + """ + 基于规则分析新闻 + + Args: + news_item: 新闻项 + + Returns: + 分析结果字典 + """ + # 使用已有的影响评分 + impact_score = getattr(news_item, 'impact_score', 0.0) + + # 根据 impact_score 确定市场影响 + if impact_score >= 1.0: + market_impact = 'high' + elif impact_score >= 0.7: + market_impact = 'medium' + else: + market_impact = 'low' + + # 检查关键词确定方向 + text = f"{news_item.title} {news_item.content}".lower() + + bullish_keywords = ['上涨', '增长', '突破', '新高', 'bullish', 'surge', 'rally', 'gain', '批准', '合作'] + bearish_keywords = ['下跌', '暴跌', '崩盘', 'ban', 'bearish', 'crash', 'plunge', 'fall', '禁令', '风险'] + + bullish_count = sum(1 for k in bullish_keywords if k in text) + bearish_count = sum(1 for k in bearish_keywords if k in text) + + if bullish_count > bearish_count: + impact_type = 'bullish' + sentiment = 'positive' + elif bearish_count > bullish_count: + impact_type = 'bearish' + sentiment = 'negative' + 
else: + impact_type = 'neutral' + sentiment = 'neutral' + + # 获取相关代码 + relevant_symbols = list(set(getattr(news_item, 'relevant_symbols', []))) + + return { + 'market_impact': market_impact, + 'impact_type': impact_type, + 'sentiment': sentiment, + 'summary': news_item.title, + 'key_points': [news_item.title[:100]], + 'trading_advice': getattr(news_item, 'impact_reason', '关注市场动态'), + 'relevant_symbols': relevant_symbols, + 'confidence': 70, + 'analyzed_by': 'rules' + } diff --git a/backend/app/news_agent/fetcher.py b/backend/app/news_agent/fetcher.py new file mode 100644 index 0000000..2afa9d1 --- /dev/null +++ b/backend/app/news_agent/fetcher.py @@ -0,0 +1,271 @@ +""" +新闻获取模块 - 从 RSS 源获取新闻 +""" +import asyncio +import hashlib +import feedparser +import httpx +from datetime import datetime, timedelta +from typing import Dict, List, Any, Optional +from dataclasses import dataclass +from bs4 import BeautifulSoup + +from app.utils.logger import logger +from app.news_agent.sources import get_enabled_sources + + +@dataclass +class NewsItem: + """新闻项数据类""" + title: str + content: str + url: str + source: str + category: str + published_at: Optional[datetime] + crawled_at: datetime + content_hash: str + author: Optional[str] = None + tags: Optional[List[str]] = None + + def to_dict(self) -> Dict[str, Any]: + """转换为字典""" + return { + 'title': self.title, + 'content': self.content, + 'url': self.url, + 'source': self.source, + 'category': self.category, + 'published_at': self.published_at.isoformat() if self.published_at else None, + 'crawled_at': self.crawled_at.isoformat(), + 'content_hash': self.content_hash, + 'author': self.author, + 'tags': self.tags, + } + + +class NewsFetcher: + """新闻获取器""" + + def __init__(self): + self.sources = get_enabled_sources() + self.client = httpx.AsyncClient( + timeout=30.0, + headers={ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' + } + ) + + async def close(self): + """关闭 HTTP 客户端""" + await 
self.client.aclose() + + def _generate_content_hash(self, title: str, content: str) -> str: + """生成内容哈希用于去重""" + combined = f"{title}{content}" + return hashlib.sha256(combined.encode()).hexdigest() + + def _clean_html(self, html: str) -> str: + """清理 HTML,提取纯文本""" + if not html: + return "" + + soup = BeautifulSoup(html, 'html.parser') + + # 移除脚本和样式 + for script in soup(['script', 'style']): + script.decompose() + + # 获取文本 + text = soup.get_text() + + # 清理空白 + lines = (line.strip() for line in text.splitlines()) + chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) + text = ' '.join(chunk for chunk in chunks if chunk) + + return text[:5000] # 限制长度 + + def _parse_rss_date(self, date_str: str) -> Optional[datetime]: + """解析 RSS 日期""" + if not date_str: + return None + + try: + # feedparser 会解析日期 + parsed = feedparser.parse(date_str) + if hasattr(parsed, 'updated_parsed'): + return datetime(*parsed.updated_parsed[:6]) + except Exception as e: + logger.debug(f"日期解析失败: {date_str}, 错误: {e}") + + return None + + async def fetch_rss_feed(self, source: Dict[str, Any]) -> List[NewsItem]: + """ + 获取单个 RSS 源的新闻 + + Args: + source: 新闻源配置 + + Returns: + 新闻项列表 + """ + items = [] + + try: + logger.debug(f"正在获取 {source['name']} 的 RSS...") + + # 使用 feedparser 解析 RSS + feed = feedparser.parse(source['url']) + + if feed.bozo: # RSS 解析错误 + logger.warning(f"{source['name']} RSS 解析警告: {feed.bozo_exception}") + + # 解析每个条目 + for entry in feed.entries[:50]: # 每次最多取 50 条 + try: + # 提取标题 + title = entry.get('title', '') + + # 提取内容 + content = '' + if hasattr(entry, 'content'): + content = entry.content[0].value if entry.content else '' + elif hasattr(entry, 'summary'): + content = entry.summary + elif hasattr(entry, 'description'): + content = entry.description + + # 清理 HTML + content = self._clean_html(content) + + # 提取链接 + url = entry.get('link', '') + + # 提取作者 + author = entry.get('author', None) + + # 提取标签 + tags = [] + if hasattr(entry, 'tags'): + tags = [tag.term 
for tag in entry.tags] + + # 解析发布时间 + published_at = None + if hasattr(entry, 'published_parsed'): + published_at = datetime(*entry.published_parsed[:6]) + elif hasattr(entry, 'updated_parsed'): + published_at = datetime(*entry.updated_parsed[:6]) + + # 只处理最近 24 小时的新闻 + if published_at: + time_diff = datetime.utcnow() - published_at + if time_diff > timedelta(hours=24): + continue + + # 生成内容哈希 + content_hash = self._generate_content_hash(title, content) + + news_item = NewsItem( + title=title, + content=content, + url=url, + source=source['name'], + category=source['category'], + published_at=published_at, + crawled_at=datetime.utcnow(), + content_hash=content_hash, + author=author, + tags=tags if tags else None + ) + + items.append(news_item) + + except Exception as e: + logger.debug(f"解析新闻条目失败: {e}") + continue + + logger.info(f"从 {source['name']} 获取到 {len(items)} 条新闻") + + except Exception as e: + logger.error(f"获取 {source['name']} 失败: {e}") + + return items + + async def fetch_all_news(self, category: str = None) -> List[NewsItem]: + """ + 获取所有新闻源的新闻 + + Args: + category: 分类过滤 ('crypto', 'stock', None 表示全部) + + Returns: + 所有新闻项列表 + """ + sources = get_enabled_sources(category) + + if not sources: + logger.warning("没有启用的新闻源") + return [] + + logger.info(f"开始从 {len(sources)} 个新闻源获取新闻...") + + # 并发获取所有源 + tasks = [self.fetch_rss_feed(source) for source in sources] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 合并结果 + all_items = [] + for result in results: + if isinstance(result, Exception): + logger.error(f"获取新闻时出错: {result}") + continue + all_items.extend(result) + + logger.info(f"总共获取到 {len(all_items)} 条新闻") + + return all_items + + async def fetch_single_url(self, url: str, source: str = "manual") -> Optional[NewsItem]: + """ + 获取单个 URL 的新闻内容 + + Args: + url: 新闻 URL + source: 新闻来源名称 + + Returns: + 新闻项或 None + """ + try: + response = await self.client.get(url) + response.raise_for_status() + + # 使用 BeautifulSoup 解析 + soup = 
"""
News deduplication and filtering.

Provides a hash-based deduplicator with a time-boxed in-memory cache, and a
keyword/quality filter that scores news items before LLM analysis.
"""
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional, Set, Tuple
from difflib import SequenceMatcher

from app.utils.logger import logger
from app.news_agent.fetcher import NewsItem
from app.news_agent.sources import CRYPTO_KEYWORDS, STOCK_KEYWORDS, SYMBOL_MAPPINGS


class NewsDeduplicator:
    """Deduplicates news items by content hash.

    Keeps a set of recently-seen hashes that is wiped wholesale every
    ``expiry_hours`` hours (coarse expiry: the whole cache is cleared at
    once rather than per-entry).
    """

    def __init__(self):
        # Hashes seen within the current expiry window.
        self.recent_hashes: Set[str] = set()
        # When the cache should next be wiped; None means "not started yet".
        # FIX: was annotated as `datetime` although initialised to None.
        self.hash_expiry: Optional[datetime] = None
        self.expiry_hours = 24

    def _clean_hash_cache(self):
        """Wipe the hash cache once the expiry window has elapsed."""
        now = datetime.utcnow()
        if self.hash_expiry is None or now > self.hash_expiry:
            self.recent_hashes.clear()
            self.hash_expiry = now + timedelta(hours=self.expiry_hours)
            logger.debug("哈希缓存已清理")

    def check_duplicate(self, item: NewsItem) -> bool:
        """Return True if *item* was already seen in the current window.

        Side effect: unseen hashes are added to the cache, so a second call
        with the same item returns True.
        """
        self._clean_hash_cache()

        if item.content_hash in self.recent_hashes:
            return True

        self.recent_hashes.add(item.content_hash)
        return False

    def deduplicate_list(self, items: List[NewsItem]) -> List[NewsItem]:
        """Return *items* with hash-duplicates removed, preserving order."""
        seen_hashes: Set[str] = set()
        unique_items: List[NewsItem] = []

        for item in items:
            if item.content_hash not in seen_hashes:
                seen_hashes.add(item.content_hash)
                unique_items.append(item)

        removed = len(items) - len(unique_items)
        if removed > 0:
            logger.info(f"去重: 移除了 {removed} 条重复新闻")

        return unique_items

    def find_similar(self, item: NewsItem, existing_items: List[NewsItem], threshold: float = 0.85) -> List[NewsItem]:
        """Find existing items whose title is similar to *item*'s.

        Only items of the same category are compared; similarity is the
        ``difflib.SequenceMatcher`` ratio of the lower-cased titles.
        Results are ordered most-similar first.
        """
        similar: List[Tuple[NewsItem, float]] = []

        for existing in existing_items:
            # Cross-category matches are never considered duplicates.
            if existing.category != item.category:
                continue

            similarity = SequenceMatcher(None, item.title.lower(), existing.title.lower()).ratio()

            if similarity >= threshold:
                similar.append((existing, similarity))

        similar.sort(key=lambda x: x[1], reverse=True)
        return [s[0] for s in similar]


class NewsFilter:
    """Keyword and quality filter for fetched news items."""

    def __init__(self):
        self.crypto_keywords = CRYPTO_KEYWORDS
        self.stock_keywords = STOCK_KEYWORDS
        self.symbol_mappings = SYMBOL_MAPPINGS

    def _extract_symbols(self, text: str, category: str) -> List[str]:
        """Extract ticker/coin symbols mentioned in *text*.

        NOTE(review): *category* is currently unused — the full mapping
        (crypto + stocks) is scanned regardless; confirm whether per-category
        filtering was intended.
        """
        text_lower = text.lower()
        found_symbols: List[str] = []

        for symbol, keywords in self.symbol_mappings.items():
            for keyword in keywords:
                if keyword.lower() in text_lower:
                    found_symbols.append(symbol)
                    break  # one match per symbol is enough

        return found_symbols

    def _check_keywords(self, text: str, category: str) -> Tuple[float, str]:
        """Score *text* by keyword impact.

        Returns (impact score, reason): 1.0 for a high-impact keyword,
        0.7 for a medium-impact one, 0.0 when nothing matches. Any
        non-'crypto' category falls back to the stock keyword set.
        """
        text_lower = text.lower()
        keywords_config = self.crypto_keywords if category == 'crypto' else self.stock_keywords

        for keyword in keywords_config['high_impact']:
            if keyword.lower() in text_lower:
                return 1.0, f"匹配高影响关键词: {keyword}"

        for keyword in keywords_config['medium_impact']:
            if keyword.lower() in text_lower:
                return 0.7, f"匹配中等影响关键词: {keyword}"

        return 0.0, "未匹配关键词"

    def _calculate_quality_score(self, item: NewsItem) -> float:
        """Heuristic quality score in [0, 1].

        Starts at 0.5 and adds small bonuses for longer content, a sane
        title length, a named author and the presence of tags.
        """
        score = 0.5  # base score

        if len(item.content) > 500:
            score += 0.1
        if len(item.content) > 1000:
            score += 0.1

        if 20 <= len(item.title) <= 150:
            score += 0.1

        if item.author:
            score += 0.1

        if item.tags and len(item.tags) > 0:
            score += 0.1

        return min(score, 1.0)

    def filter_news(self, items: List[NewsItem], min_quality: float = 0.3) -> List[NewsItem]:
        """Filter *items* by quality and keyword relevance.

        Items that pass get extra attributes attached in place:
        ``quality_score``, ``impact_score``, ``impact_reason`` and
        ``relevant_symbols``. Only items matching at least one keyword
        (impact_score > 0) survive.
        """
        filtered: List[NewsItem] = []
        low_quality_count = 0
        no_keywords_count = 0

        for item in items:
            quality_score = self._calculate_quality_score(item)

            if quality_score < min_quality:
                low_quality_count += 1
                continue

            # Only the title plus the first 500 chars are scanned for keywords.
            text_to_check = f"{item.title} {item.content[:500]}"
            impact_score, impact_reason = self._check_keywords(text_to_check, item.category)

            symbols = self._extract_symbols(text_to_check, item.category)

            # Attach scoring metadata for downstream consumers.
            item.quality_score = quality_score
            item.impact_score = impact_score
            item.impact_reason = impact_reason
            item.relevant_symbols = symbols

            if impact_score > 0:
                filtered.append(item)
            else:
                no_keywords_count += 1

        logger.info(f"过滤结果: {len(filtered)} 条通过, {low_quality_count} 条低质量, {no_keywords_count} 条无关键词")

        return filtered

    def get_priority_score(self, item: NewsItem) -> float:
        """Combined priority score: impact (x50) + quality (x20) + symbols (+10)
        + up-to-20-point freshness bonus that decays by 1 per hour of age."""
        score = 0.0

        score += getattr(item, 'impact_score', 0.0) * 50
        score += getattr(item, 'quality_score', 0.5) * 20

        if hasattr(item, 'relevant_symbols') and item.relevant_symbols:
            score += 10

        if item.published_at:
            published = item.published_at
            # FIX: feeds may yield tz-aware datetimes; subtracting one from
            # naive utcnow() raises TypeError. Normalize to naive UTC first.
            if published.tzinfo is not None:
                published = published.astimezone(timezone.utc).replace(tzinfo=None)
            hours_ago = (datetime.utcnow() - published).total_seconds() / 3600
            score += max(0, 20 - hours_ago)

        return score
"""
News agent - main controller.

Fetches, deduplicates, filters, analyses and (for high-impact items)
notifies about market news on a fixed interval.
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta

from app.utils.logger import logger
from app.config import get_settings
from app.news_agent.sources import get_enabled_sources
from app.news_agent.fetcher import NewsFetcher, NewsItem
from app.news_agent.filter import NewsDeduplicator, NewsFilter
from app.news_agent.analyzer import NewsAnalyzer, NewsAnalyzerSimple
from app.news_agent.news_db_service import get_news_db_service
from app.news_agent.notifier import get_news_notifier


class NewsAgent:
    """Singleton orchestrator for the news pipeline."""

    _instance = None
    _initialized = False

    def __new__(cls, *args, **kwargs):
        """Singleton: always return the same instance."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialise pipeline components (runs once; later calls no-op)."""
        if NewsAgent._initialized:
            return

        NewsAgent._initialized = True
        self.settings = get_settings()

        # Pipeline components.
        self.fetcher = NewsFetcher()
        self.deduplicator = NewsDeduplicator()
        self.filter = NewsFilter()
        self.analyzer = NewsAnalyzer()  # LLM-based analyser
        self.simple_analyzer = NewsAnalyzerSimple()  # rule-based fallback
        self.db_service = get_news_db_service()
        self.notifier = get_news_notifier()

        # Tunables.
        self.fetch_interval = 300  # seconds between fetch cycles (5 min)
        self.min_priority = 40.0   # minimum priority for notification
        self.use_llm = True        # use LLM analysis (else keyword rules)
        self.batch_analysis = True # analyse articles in one batched LLM call

        # Counters exposed via get_stats().
        self.stats = {
            'total_fetched': 0,
            'total_saved': 0,
            'total_analyzed': 0,
            'total_notified': 0,
            'last_fetch_time': None,
            'last_notify_time': None
        }

        # Background-task state.
        self.running = False
        self._task: Optional[asyncio.Task] = None

        logger.info("新闻智能体初始化完成")

    async def start(self):
        """Start the background fetch loop and send a startup notification."""
        if self.running:
            logger.warning("新闻智能体已在运行")
            return

        self.running = True

        sources = get_enabled_sources()
        crypto_count = sum(1 for s in sources if s['category'] == 'crypto')
        stock_count = sum(1 for s in sources if s['category'] == 'stock')

        await self.notifier.notify_startup({
            'crypto_sources': crypto_count,
            'stock_sources': stock_count,
            'fetch_interval': self.fetch_interval
        })

        self._task = asyncio.create_task(self._run_loop())

        logger.info("新闻智能体已启动")

    async def stop(self):
        """Cancel the background loop and release fetcher resources."""
        if not self.running:
            return

        self.running = False

        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

        await self.fetcher.close()

        logger.info("新闻智能体已停止")

    async def _run_loop(self):
        """Main polling loop: one processing cycle, then sleep."""
        while self.running:
            try:
                await self._fetch_and_process_news()

            except Exception as e:
                logger.error(f"新闻处理循环出错: {e}")
                # FIX: a failure inside notify_error previously propagated out
                # of the except block and silently killed the background task.
                try:
                    await self.notifier.notify_error(str(e))
                except Exception as notify_err:
                    logger.error(f"发送错误通知失败: {notify_err}")

            await asyncio.sleep(self.fetch_interval)

    @staticmethod
    def _build_notification_payload(article, result: Dict[str, Any], priority: float) -> Dict[str, Any]:
        """Merge an article's dict form with its analysis result.

        Extracted helper: this dict was previously duplicated verbatim in
        three analysis branches.
        """
        payload = article.to_dict()
        payload.update({
            'llm_analyzed': True,
            'market_impact': result.get('market_impact'),
            'impact_type': result.get('impact_type'),
            'sentiment': result.get('sentiment'),
            'summary': result.get('summary'),
            'key_points': result.get('key_points'),
            'trading_advice': result.get('trading_advice'),
            'relevant_symbols': result.get('relevant_symbols'),
            'priority': priority,
        })
        return payload

    async def _fetch_and_process_news(self):
        """Run one full cycle: fetch, dedupe, filter, save, analyse, notify."""
        logger.info("=" * 60)
        logger.info("开始新闻处理周期")

        # 1. Fetch.
        items = await self.fetcher.fetch_all_news()
        self.stats['total_fetched'] += len(items)
        self.stats['last_fetch_time'] = datetime.utcnow().isoformat()

        if not items:
            logger.info("没有获取到新新闻")
            return

        logger.info(f"获取到 {len(items)} 条新闻")

        # 2. Deduplicate.
        items = self.deduplicator.deduplicate_list(items)
        logger.info(f"去重后剩余 {len(items)} 条")

        # 3. Filter.
        filtered_items = self.filter.filter_news(items)
        logger.info(f"过滤后剩余 {len(filtered_items)} 条")

        if not filtered_items:
            logger.info("没有符合条件的新闻")
            return

        # 4. Persist items not already in the database.
        saved_articles = []
        for item in filtered_items:
            if self.db_service.check_duplicate_by_hash(item.content_hash):
                continue

            article_data = {
                'title': item.title,
                'content': item.content,
                'url': item.url,
                'source': item.source,
                'author': item.author,
                'category': item.category,
                'tags': item.tags,
                'published_at': item.published_at,
                'crawled_at': item.crawled_at,
                'content_hash': item.content_hash,
                'quality_score': getattr(item, 'quality_score', 0.5),
            }

            article = self.db_service.save_article(article_data)
            if article:
                saved_articles.append((article, item))

        self.stats['total_saved'] += len(saved_articles)
        logger.info(f"保存了 {len(saved_articles)} 条新文章")

        if not saved_articles:
            return

        # 5. Analyse; collect only 'high' market-impact articles for notification.
        analyzed_count = 0
        high_priority_articles: List[Dict[str, Any]] = []

        if self.use_llm:
            if self.batch_analysis and len(saved_articles) > 1:
                # One batched LLM call for all new articles.
                items_to_analyze = [item for _, item in saved_articles]
                results = self.analyzer.analyze_batch(items_to_analyze)

                for (article, _), result in zip(saved_articles, results):
                    if result:
                        priority = self.analyzer.calculate_priority(
                            result,
                            getattr(article, 'quality_score', 0.5)
                        )
                        self.db_service.mark_as_analyzed(article.id, result, priority)

                        analyzed_count += 1
                        if result.get('market_impact') == 'high':
                            high_priority_articles.append(
                                self._build_notification_payload(article, result, priority)
                            )
            else:
                # One LLM call per article.
                for article, item in saved_articles:
                    result = self.analyzer.analyze_single(item)

                    if result:
                        priority = self.analyzer.calculate_priority(
                            result,
                            getattr(article, 'quality_score', 0.5)
                        )
                        self.db_service.mark_as_analyzed(article.id, result, priority)

                        analyzed_count += 1
                        if result.get('market_impact') == 'high':
                            high_priority_articles.append(
                                self._build_notification_payload(article, result, priority)
                            )
        else:
            # Rule-based fallback; its confidence doubles as priority.
            for article, item in saved_articles:
                result = self.simple_analyzer.analyze_single(item)
                priority = result.get('confidence', 50)

                self.db_service.mark_as_analyzed(article.id, result, priority)
                analyzed_count += 1

                if result.get('market_impact') == 'high':
                    high_priority_articles.append(
                        self._build_notification_payload(article, result, priority)
                    )

        self.stats['total_analyzed'] += analyzed_count
        logger.info(f"分析了 {analyzed_count} 条文章")

        # 6. Notify: 1-2 articles individually, otherwise one batched card
        # capped at 10 articles.
        if high_priority_articles:
            high_priority_articles.sort(
                key=lambda x: x.get('priority', 0),
                reverse=True
            )

            if len(high_priority_articles) <= 2:
                for article in high_priority_articles:
                    await self.notifier.notify_single_news(article)
                    self.db_service.mark_as_notified(article['id'])
                    self.stats['total_notified'] += 1
            else:
                await self.notifier.notify_news_batch(high_priority_articles[:10])
                for article in high_priority_articles[:10]:
                    self.db_service.mark_as_notified(article['id'])
                    self.stats['total_notified'] += 1

            self.stats['last_notify_time'] = datetime.utcnow().isoformat()

        logger.info("=" * 60)

    def get_stats(self) -> Dict[str, Any]:
        """Return runtime counters plus 24h database statistics."""
        stats = self.stats.copy()
        stats['running'] = self.running
        stats['fetch_interval'] = self.fetch_interval
        stats['use_llm'] = self.use_llm

        db_stats = self.db_service.get_stats(hours=24)
        stats['db_stats'] = db_stats

        return stats

    async def manual_fetch(self, category: Optional[str] = None) -> Dict[str, Any]:
        """Trigger one ad-hoc fetch (no analysis/notification).

        Args:
            category: optional category filter ('crypto' / 'stock')

        Returns:
            Fetch summary; includes up to 5 sample items when any were fetched.
        """
        logger.info(f"手动触发新闻抓取: category={category}")

        items = await self.fetcher.fetch_all_news(category)

        result = {
            'fetched': len(items),
            'timestamp': datetime.utcnow().isoformat()
        }

        if items:
            # Sample only; the full processing pipeline is not triggered here.
            result['items'] = [item.to_dict() for item in items[:5]]

        return result


# Lazily-created module-level singleton.
_news_agent = None


def get_news_agent() -> NewsAgent:
    """Return the process-wide NewsAgent singleton, creating it on first use."""
    global _news_agent
    if _news_agent is None:
        _news_agent = NewsAgent()
    return _news_agent
"""
News database service.

SQLAlchemy-backed persistence for NewsArticle rows: save, dedupe checks,
analysis/notification bookkeeping, queries and cleanup. Each public method
opens and closes its own session.
"""
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from sqlalchemy import create_engine, and_, or_
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import IntegrityError

from app.models.news import NewsArticle
from app.models.database import Base
from app.config import get_settings
from app.utils.logger import logger


class NewsDatabaseService:
    """Database access layer for news articles."""

    def __init__(self):
        self.settings = get_settings()
        self.engine = None
        self.SessionLocal = None
        self._init_db()

    def _init_db(self):
        """Create the engine/session factory and ensure the table exists.

        Raises on failure so the service is never left half-initialised
        (SessionLocal would otherwise be None).
        """
        try:
            # Prefer an explicit database_url; fall back to a sqlite path.
            if hasattr(self.settings, 'database_url'):
                database_url = self.settings.database_url
            elif hasattr(self.settings, 'database_path'):
                database_url = f"sqlite:///{self.settings.database_path}"
            else:
                database_url = "sqlite:///./backend/stock_agent.db"

            # check_same_thread=False: sessions may be used from worker threads.
            self.engine = create_engine(
                database_url,
                connect_args={"check_same_thread": False},
                echo=False
            )

            self.SessionLocal = sessionmaker(
                autocommit=False,
                autoflush=False,
                bind=self.engine
            )

            # Create the table if it does not exist yet.
            NewsArticle.metadata.create_all(self.engine, checkfirst=True)

            logger.info("新闻数据库服务初始化完成")

        except Exception as e:
            logger.error(f"新闻数据库初始化失败: {e}")
            import traceback
            logger.error(traceback.format_exc())
            raise

    def get_session(self) -> Session:
        """Return a fresh session; caller is responsible for closing it."""
        return self.SessionLocal()

    def save_article(self, article_data: Dict[str, Any]) -> Optional[NewsArticle]:
        """Persist one article.

        Returns the saved (refreshed but detached) instance, or None when
        the URL already exists (IntegrityError) or the insert fails.
        """
        session = self.get_session()
        try:
            article = NewsArticle(**article_data)
            session.add(article)
            session.commit()
            session.refresh(article)

            logger.debug(f"文章保存成功: {article.title[:50]}...")
            return article

        except IntegrityError as e:
            # Unique constraint on URL: duplicate, not an error.
            session.rollback()
            logger.debug(f"文章已存在(URL 重复): {article_data.get('url', '')}")
            return None

        except Exception as e:
            session.rollback()
            logger.error(f"保存文章失败: {e}")
            return None

        finally:
            session.close()

    def check_duplicate_by_hash(self, content_hash: str, hours: int = 24) -> bool:
        """Return True if an article with this content hash exists within
        the last *hours* hours."""
        session = self.get_session()
        try:
            since = datetime.utcnow() - timedelta(hours=hours)

            count = session.query(NewsArticle).filter(
                and_(
                    NewsArticle.content_hash == content_hash,
                    NewsArticle.created_at >= since
                )
            ).count()

            return count > 0

        finally:
            session.close()

    def mark_as_analyzed(
        self,
        article_id: int,
        analysis: Dict[str, Any],
        priority: float
    ) -> bool:
        """Store the LLM analysis result on an article.

        Args:
            article_id: article primary key
            analysis: analyser output dict (market_impact, summary, ...)
            priority: computed priority score

        Returns:
            True on success, False if the article is missing or commit fails.
        """
        session = self.get_session()
        try:
            article = session.query(NewsArticle).filter(
                NewsArticle.id == article_id
            ).first()

            if not article:
                logger.warning(f"文章不存在: {article_id}")
                return False

            article.llm_analyzed = True
            article.market_impact = analysis.get('market_impact')
            article.impact_type = analysis.get('impact_type')
            article.sentiment = analysis.get('sentiment')
            article.summary = analysis.get('summary')
            article.key_points = analysis.get('key_points')
            article.trading_advice = analysis.get('trading_advice')
            article.relevant_symbols = analysis.get('relevant_symbols')
            # Analyser confidence (0-100) is re-scaled to a 0-1 quality score.
            article.quality_score = analysis.get('confidence', 70) / 100
            article.priority = priority

            session.commit()

            logger.debug(f"文章分析结果已保存: {article.title[:50]}...")
            return True

        except Exception as e:
            session.rollback()
            logger.error(f"保存分析结果失败: {e}")
            return False

        finally:
            session.close()

    def mark_as_notified(self, article_id: int, channel: str = 'feishu') -> bool:
        """Record that a notification was sent for this article."""
        session = self.get_session()
        try:
            article = session.query(NewsArticle).filter(
                NewsArticle.id == article_id
            ).first()

            if not article:
                return False

            article.notified = True
            article.notification_sent_at = datetime.utcnow()
            article.notification_channel = channel

            session.commit()
            return True

        except Exception as e:
            session.rollback()
            logger.error(f"标记通知状态失败: {e}")
            return False

        finally:
            session.close()

    def get_high_priority_articles(
        self,
        limit: int = 20,
        min_priority: float = 40.0,
        hours: int = 24
    ) -> List[NewsArticle]:
        """Return analysed, not-yet-notified articles at or above
        *min_priority* from the last *hours* hours, highest priority first.

        NOTE(review): instances are detached after session.close(); only
        already-loaded column attributes are safe to access.
        """
        session = self.get_session()
        try:
            since = datetime.utcnow() - timedelta(hours=hours)

            articles = session.query(NewsArticle).filter(
                and_(
                    NewsArticle.llm_analyzed == True,
                    NewsArticle.priority >= min_priority,
                    NewsArticle.created_at >= since,
                    NewsArticle.notified == False
                )
            ).order_by(NewsArticle.priority.desc()).limit(limit).all()

            return articles

        finally:
            session.close()

    def get_latest_articles(
        self,
        category: Optional[str] = None,
        limit: int = 50,
        hours: int = 24
    ) -> List[Dict[str, Any]]:
        """Return the newest articles (as plain dicts) from the last
        *hours* hours, optionally filtered by category."""
        session = self.get_session()
        try:
            since = datetime.utcnow() - timedelta(hours=hours)

            query = session.query(NewsArticle).filter(
                NewsArticle.created_at >= since
            )

            if category:
                query = query.filter(NewsArticle.category == category)

            articles = query.order_by(
                NewsArticle.created_at.desc()
            ).limit(limit).all()

            # Serialise inside the session so attribute access is safe.
            return [article.to_dict() for article in articles]

        finally:
            session.close()

    def get_stats(self, hours: int = 24) -> Dict[str, Any]:
        """Return article counts (total / analysed / high-impact / notified)
        for the last *hours* hours."""
        session = self.get_session()
        try:
            since = datetime.utcnow() - timedelta(hours=hours)

            total = session.query(NewsArticle).filter(
                NewsArticle.created_at >= since
            ).count()

            analyzed = session.query(NewsArticle).filter(
                and_(
                    NewsArticle.created_at >= since,
                    NewsArticle.llm_analyzed == True
                )
            ).count()

            high_impact = session.query(NewsArticle).filter(
                and_(
                    NewsArticle.created_at >= since,
                    NewsArticle.market_impact == 'high'
                )
            ).count()

            notified = session.query(NewsArticle).filter(
                and_(
                    NewsArticle.created_at >= since,
                    NewsArticle.notified == True
                )
            ).count()

            return {
                'total_articles': total,
                'analyzed': analyzed,
                'high_impact': high_impact,
                'notified': notified,
                'hours': hours
            }

        finally:
            session.close()

    def get_unanalyzed_articles(self, limit: int = 50, hours: int = 24) -> List[NewsArticle]:
        """Return articles from the last *hours* hours that have not been
        analysed yet, newest first (detached instances)."""
        session = self.get_session()
        try:
            since = datetime.utcnow() - timedelta(hours=hours)

            articles = session.query(NewsArticle).filter(
                and_(
                    NewsArticle.llm_analyzed == False,
                    NewsArticle.created_at >= since
                )
            ).order_by(NewsArticle.created_at.desc()).limit(limit).all()

            return articles

        finally:
            session.close()

    def clean_old_articles(self, days: int = 7) -> int:
        """Soft-delete articles older than *days* days (sets is_active=False).

        Returns:
            Number of rows updated (0 on failure).
        """
        session = self.get_session()
        try:
            before = datetime.utcnow() - timedelta(days=days)

            # FIX: bulk UPDATE with a '<' criterion needs
            # synchronize_session=False — the default session-sync strategy
            # cannot evaluate non-equality criteria in-memory and may raise.
            count = session.query(NewsArticle).filter(
                NewsArticle.created_at < before
            ).update(
                {'is_active': False},
                synchronize_session=False
            )

            session.commit()

            if count > 0:
                logger.info(f"清理了 {count} 条旧文章")

            return count

        except Exception as e:
            session.rollback()
            logger.error(f"清理旧文章失败: {e}")
            return 0

        finally:
            session.close()


# Lazily-created module-level singleton.
_news_db_service = None


def get_news_db_service() -> NewsDatabaseService:
    """Return the process-wide NewsDatabaseService singleton."""
    global _news_db_service
    if _news_db_service is None:
        _news_db_service = NewsDatabaseService()
    return _news_db_service
"""
News notification module - sends Feishu (Lark) card notifications.
"""
from typing import Dict, Any, List

from app.utils.logger import logger
from app.services.feishu_service import get_feishu_news_service


class NewsNotifier:
    """Formats analysed news into Feishu cards and sends them."""

    def __init__(self):
        self.feishu = get_feishu_news_service()

    def _get_emoji_for_impact(self, impact: str) -> str:
        """Emoji for a market-impact level ('high'/'medium'/'low')."""
        emoji_map = {
            'high': '🔴',
            'medium': '🟡',
            'low': '🟢'
        }
        return emoji_map.get(impact, '📰')

    def _get_emoji_for_impact_type(self, impact_type: str) -> str:
        """Emoji for an impact direction ('bullish'/'bearish'/'neutral')."""
        emoji_map = {
            'bullish': '📈',
            'bearish': '📉',
            'neutral': '➡️'
        }
        return emoji_map.get(impact_type, '📊')

    def _get_color_for_impact(self, impact: str) -> str:
        """Card accent colour for a market-impact level."""
        color_map = {
            'high': 'red',
            'medium': 'orange',
            'low': 'blue'
        }
        return color_map.get(impact, 'grey')

    async def notify_single_news(self, article: Dict[str, Any]) -> bool:
        """Send one news card.

        Args:
            article: article dict enriched with analysis fields
                     (market_impact, summary, trading_advice, ...)

        Returns:
            True if the card was sent, False on any error.
        """
        try:
            impact = article.get('market_impact', 'low')
            impact_type = article.get('impact_type', 'neutral')
            title = article.get('title', '')
            summary = article.get('summary', '')
            source = article.get('source', '')
            category = article.get('category', '')
            url = article.get('url', '')
            trading_advice = article.get('trading_advice', '')
            relevant_symbols = article.get('relevant_symbols', [])
            key_points = article.get('key_points', [])

            # Card title: impact + direction emoji + category label.
            impact_emoji = self._get_emoji_for_impact(impact)
            type_emoji = self._get_emoji_for_impact_type(impact_type)
            category_text = '加密货币' if category == 'crypto' else '股票'

            card_title = f"{impact_emoji} {type_emoji} 市场快讯 - {category_text}"

            content_parts = [
                f"**来源**: {source}",
                f"**标题**: {title}",
                "",
                f"**摘要**: {summary}",
            ]

            # Up to three key points.
            if key_points:
                content_parts.append("")
                content_parts.append("**关键点**:")
                for point in key_points[:3]:
                    content_parts.append(f"• {point}")

            if trading_advice:
                content_parts.append("")
                content_parts.append(f"**交易建议**: {trading_advice}")

            if relevant_symbols:
                symbols_text = " ".join(relevant_symbols)
                content_parts.append("")
                content_parts.append(f"**相关**: {symbols_text}")

            if url:
                content_parts.append("")
                content_parts.append(f"[查看原文]({url})")

            impact_map = {'high': '重大影响', 'medium': '中等影响', 'low': '轻微影响'}
            content_parts.append("")
            content_parts.append(f"**影响**: {impact_map.get(impact, '未知')}")

            color = self._get_color_for_impact(impact)

            content = "\n".join(content_parts)
            await self.feishu.send_card(card_title, content, color)

            logger.info(f"新闻通知已发送: {title[:50]}...")
            return True

        except Exception as e:
            logger.error(f"发送新闻通知失败: {e}")
            return False

    async def notify_news_batch(self, articles: List[Dict[str, Any]]) -> bool:
        """Send one card summarising several articles.

        Only 'high' market-impact articles are included (max 5 shown);
        returns False when the input has none.
        """
        try:
            if not articles:
                return False

            high_impact = [a for a in articles if a.get('market_impact') == 'high']

            if not high_impact:
                logger.info("没有重大影响新闻,跳过通知")
                return False

            title = f"🔴 重大市场新闻 ({len(high_impact)} 条)"

            content_parts = []

            # Header timestamp from the first article, trimmed to minutes.
            created_time = high_impact[0].get('created_at', '')
            if created_time:
                # e.g. 2026-02-25T12:30:45 -> 2026-02-25 12:30
                try:
                    dt = created_time[:16].replace('T', ' ')
                    content_parts.append(f"**时间**: {dt}")
                except Exception:
                    # FIX: was a bare `except:` which also swallowed
                    # KeyboardInterrupt/SystemExit.
                    content_parts.append(f"**时间**: {created_time[:10]}")

            for i, article in enumerate(high_impact[:5]):
                impact_type = article.get('impact_type', 'neutral')
                emoji = self._get_emoji_for_impact_type(impact_type)

                # Blank line between entries.
                if i > 0:
                    content_parts.append("")

                article_lines = []

                title_text = article.get('title', '')
                article_lines.append(f"{emoji} **{title_text}**")

                source = article.get('source', '')
                if source:
                    article_lines.append(f"📰 来源: {source}")

                # Prefer the LLM summary; fall back to raw content.
                summary = article.get('summary', '')
                content = article.get('content', '')
                if summary:
                    article_lines.append(f"📝 {summary[:100]}")
                elif content:
                    article_lines.append(f"📝 {content[:100]}")

                impact_desc = {
                    'bullish': '📈 利好',
                    'bearish': '📉 利空',
                    'neutral': '➡️ 中性'
                }.get(impact_type, '➡️ 中性')

                advice = article.get('trading_advice', '')
                if advice:
                    article_lines.append(f"{impact_desc} | 💡 {advice}")

                extra_info = []
                symbols = article.get('relevant_symbols', [])
                if symbols and isinstance(symbols, list):
                    extra_info.append(f"🔗 {' '.join(symbols[:4])}")

                url = article.get('url', '')
                if url:
                    extra_info.append(f"🔎 [查看原文]({url})")

                if extra_info:
                    article_lines.append("  ".join(extra_info))

                # Collapse one article's lines into a single row.
                content_parts.append(" | ".join(article_lines))

            content = "\n".join(content_parts)
            await self.feishu.send_card(title, content, "red")

            logger.info(f"重大新闻通知已发送: {len(high_impact)} 条")
            return True

        except Exception as e:
            logger.error(f"发送批量新闻通知失败: {e}")
            import traceback
            traceback.print_exc()
            return False

    async def notify_startup(self, config: Dict[str, Any]) -> bool:
        """Send a startup card describing source counts and fetch interval."""
        try:
            crypto_sources = config.get('crypto_sources', 0)
            stock_sources = config.get('stock_sources', 0)
            interval = config.get('fetch_interval', 30)

            title = "📰 新闻智能体已启动"

            content_parts = [
                f"🤖 **功能**: 实时新闻监控与分析",
                f"",
                f"📊 **监控来源**:",
                f"  • 加密货币: {crypto_sources} 个",
                f"  • 股票: {stock_sources} 个",
                f"",
                f"⏱️ **抓取频率**: 每 {interval} 秒",
                f"",
                f"🎯 **分析能力**:",
                f"  • LLM 智能分析",
                f"  • 市场影响评估",
                f"  • 交易建议生成",
                f"",
                f"📢 **通知策略**: 仅推送高影响新闻"
            ]

            content = "\n".join(content_parts)
            await self.feishu.send_card(title, content, "green")

            logger.info("新闻智能体启动通知已发送")
            return True

        except Exception as e:
            logger.error(f"发送启动通知失败: {e}")
            return False

    async def notify_error(self, error_message: str) -> bool:
        """Send an error card with the message and recovery suggestions."""
        try:
            title = "⚠️ 新闻智能体异常"

            content = f"""
**错误信息**: {error_message}

**建议操作**:
1. 检查网络连接
2. 查看日志文件
3. 必要时重启服务
"""
            await self.feishu.send_card(title, content, "red")
            return True

        except Exception as e:
            logger.error(f"发送错误通知失败: {e}")
            return False


# Lazily-created module-level singleton.
_news_notifier = None


def get_news_notifier() -> NewsNotifier:
    """Return the process-wide NewsNotifier singleton."""
    global _news_notifier
    if _news_notifier is None:
        _news_notifier = NewsNotifier()
    return _news_notifier
"""
News source configuration.

RSS feeds for crypto and stock news, keyword lists for first-pass impact
filtering, and symbol keyword mappings for ticker extraction.
"""

# Crypto news RSS sources.
CRYPTO_NEWS_SOURCES = [
    {
        "name": "Cointelegraph",
        "url": "https://cointelegraph.com/rss",
        "category": "crypto",
        "language": "en",
        "priority": 1.0,  # relative weight
        "enabled": True
    },
    {
        "name": "CoinDesk",
        "url": "https://www.coindesk.com/arc/outboundfeeds/rss/",
        "category": "crypto",
        "language": "en",
        "priority": 1.0,
        "enabled": True
    },
    {
        "name": "Decrypt",
        "url": "https://decrypt.co/feed",
        "category": "crypto",
        "language": "en",
        "priority": 0.9,
        "enabled": True
    },
    {
        "name": "The Block",
        "url": "https://www.theblock.co/rss.xml",
        "category": "crypto",
        "language": "en",
        "priority": 0.9,
        "enabled": True
    },
    {
        "name": "律动 BlockBeats",
        "url": "https://www.theblockbeats.info/feed",
        "category": "crypto",
        "language": "zh",
        "priority": 1.0,
        "enabled": False  # broken RSS format, disabled for now
    },
    {
        "name": "巴比特",
        "url": "https://www.8btc.com/feed",
        "category": "crypto",
        "language": "zh",
        "priority": 0.8,
        "enabled": False  # unstable connection, disabled for now
    },
    {
        "name": "CoinGlass",
        "url": "https://coinglass.com/news/rss",
        "category": "crypto",
        "language": "en",
        "priority": 0.8,
        "enabled": False  # returns HTML instead of RSS, disabled for now
    },
    {
        "name": "CryptoSlate",
        "url": "https://cryptoslate.com/news/feed",
        "category": "crypto",
        "language": "en",
        "priority": 0.8,
        "enabled": False  # broken RSS format, disabled for now
    },
    {
        "name": "AMBCrypto",
        "url": "https://ambcrypto.com/feed",
        "category": "crypto",
        "language": "en",
        "priority": 0.7,
        "enabled": True
    },
    {
        "name": "Whale Alert",
        "url": "https://whale-alert.io/rss",
        "category": "crypto",
        "language": "en",
        "priority": 0.7,
        "enabled": False  # large-transfer alerts; enable selectively
    },
]

# Stock news RSS sources.
STOCK_NEWS_SOURCES = [
    {
        "name": "Reuters Business",
        "url": "https://www.reuters.com/finance/rss",
        "category": "stock",
        "language": "en",
        "priority": 1.0,
        "enabled": False  # returns HTML instead of RSS, disabled for now
    },
    {
        "name": "CNBC",
        "url": "https://www.cnbc.com/id/100003114/device/rss/rss.html",
        "category": "stock",
        "language": "en",
        "priority": 1.0,
        "enabled": True
    },
    {
        "name": "Bloomberg Markets",
        "url": "https://feeds.bloomberg.com/markets/news.rss",
        "category": "stock",
        "language": "en",
        "priority": 1.0,
        "enabled": True
    },
    {
        "name": "Yahoo Finance",
        "url": "https://finance.yahoo.com/news/rssindex",
        "category": "stock",
        "language": "en",
        "priority": 0.8,
        "enabled": True
    },
    {
        "name": "MarketWatch",
        "url": "https://www.marketwatch.com/rss/topstories",
        "category": "stock",
        "language": "en",
        "priority": 0.9,
        "enabled": True
    },
    {
        "name": "Seeking Alpha",
        "url": "https://seekingalpha.com/article/rss",
        "category": "stock",
        "language": "en",
        "priority": 0.9,
        "enabled": False  # broken RSS format, disabled for now
    },
    {
        "name": "华尔街见闻",
        "url": "https://wallstreetcn.com/rss",
        "category": "stock",
        "language": "zh",
        "priority": 0.9,
        "enabled": False  # broken RSS format, disabled for now
    },
    {
        "name": "雪球",
        "url": "https://xueqiu.com/statuses/hot_stock.xml",
        "category": "stock",
        "language": "zh",
        "priority": 0.8,
        "enabled": False  # requires authentication, disabled for now
    },
    {
        "name": "Investing.com",
        "url": "https://www.investing.com/rss/news.rss",
        "category": "stock",
        "language": "en",
        "priority": 0.8,
        "enabled": True
    },
    {
        "name": "Business Insider",
        "url": "https://markets.businessinsider.com/rss/news",
        "category": "stock",
        "language": "en",
        "priority": 0.7,
        "enabled": True
    },
]


def get_enabled_sources(category: str = None) -> list:
    """Return the enabled news sources.

    Args:
        category: category filter ('crypto', 'stock', or None for all)

    Returns:
        List of enabled source config dicts.
    """
    all_sources = CRYPTO_NEWS_SOURCES + STOCK_NEWS_SOURCES

    if category:
        return [s for s in all_sources if s['enabled'] and s['category'] == category]

    return [s for s in all_sources if s['enabled']]


# Keyword configuration for first-pass impact filtering.
CRYPTO_KEYWORDS = {
    'high_impact': [
        # Regulatory
        'SEC', 'regulation', 'ban', 'approve', 'ETF', 'legal',
        '监管', '批准', '合法化', '禁令',

        # Major events
        'hack', 'exploit', 'bankruptcy', 'acquisition', 'merger',
        'blackrock', 'grayscale', 'fidelity', '比特大陆', '币安',

        # Market moves (note: ' ATH ' is deliberately space-padded to avoid
        # matching inside longer words)
        'all-time high', ' ATH ', 'crash', 'surge', 'plunge',
        '历史新高', '暴跌', '暴涨', '突破',

        # Protocol / tech updates
        'upgrade', 'fork', 'airdrop', 'launch',
        '升级', '分叉', '空投', '上线',

        # Macro
        'fed', 'inflation', 'recession', 'interest rate',
        '美联储', '通胀', '加息', '降息',
    ],
    'medium_impact': [
        'partnership', 'integration', 'listing', 'delisting',
        '合作', '上线', '下架',
        'whale', 'wallet', 'exchange',
    ]
}

STOCK_KEYWORDS = {
    'high_impact': [
        # Earnings
        'earnings', 'revenue', 'profit', 'loss', 'guidance',
        '财报', '营收', '利润', '业绩预告',

        # Major events
        'FDA', 'approval', 'recall', 'lawsuit', 'IPO',
        '批准', '召回', '诉讼', '上市',

        # M&A
        'merger', 'acquisition', 'spinoff', 'buyout',
        '并购', '收购', '重组',

        # Market moves
        'beat', 'miss', 'surge', 'plunge', 'rally',
        '超预期', '不及预期', '暴涨', '暴跌', '反弹',

        # Executive changes (FIX: removed a duplicate 'CEO' entry)
        'CEO', 'CFO', 'resign', 'appoint', 'executive',
        '辞职', '任命',
    ],
    'medium_impact': [
        'upgrade', 'downgrade', 'rating', 'target price',
        '评级', '目标价', '上调', '下调',
        'dividend', 'buyback', 'split',
        '分红', '回购', '拆股',
    ]
}

# Symbol -> matching keywords, used to tag articles with tickers/coins.
SYMBOL_MAPPINGS = {
    # Crypto
    'BTC': ['bitcoin', 'btc', '比特币'],
    'ETH': ['ethereum', 'eth', '以太坊'],
    'BNB': ['binance', 'bnb', '币安'],
    'SOL': ['solana', 'sol'],
    'XRP': ['ripple', 'xrp'],
    'ADA': ['cardano', 'ada'],
    'DOGE': ['dogecoin', 'doge', '狗狗币'],
    'AVAX': ['avalanche', 'avax'],
    'DOT': ['polkadot', 'dot'],
    'MATIC': ['polygon', 'matic'],

    # US equities (FIX: removed a duplicate 'meta' entry)
    'AAPL': ['apple', 'aapl', '苹果'],
    'NVDA': ['nvidia', 'nvda', '英伟达'],
    'MSFT': ['microsoft', 'msft', '微软'],
    'GOOGL': ['google', 'alphabet', 'googl', '谷歌'],
    'AMZN': ['amazon', 'amzn', '亚马逊'],
    'TSLA': ['tesla', 'tsla', '特斯拉'],
    'META': ['meta', 'facebook'],
    'BRK.B': ['berkshire', 'buffett', '伯克希尔'],
    'JPM': ['jpmorgan', 'jpm', '摩根大通'],
}
is None: + _feishu_news_service = FeishuService(service_type="news") + return _feishu_news_service diff --git a/backend/requirements.txt b/backend/requirements.txt index 61deb29..020dede 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -23,3 +23,8 @@ python-jose[cryptography]==3.3.0 python-binance>=1.0.19 httpx>=0.27.0 ccxt>=4.0.0 # 统一交易所API接口,支持Bitget等主流交易所 + +# 新闻智能体依赖 +feedparser>=6.0.10 +beautifulsoup4>=4.12.0 +lxml>=4.9.0 diff --git a/scripts/manual_news_fetch.py b/scripts/manual_news_fetch.py new file mode 100755 index 0000000..8dce768 --- /dev/null +++ b/scripts/manual_news_fetch.py @@ -0,0 +1,357 @@ +#!/usr/bin/env python3 +""" +手动执行新闻抓取和分析脚本 + +使用方法: + # 抓取所有新闻 + python scripts/manual_news_fetch.py + + # 只抓取加密货币新闻 + python scripts/manual_news_fetch.py --category crypto + + # 只抓取股票新闻 + python scripts/manual_news_fetch.py --category stock + + # 不使用 LLM 分析(仅关键词过滤) + python scripts/manual_news_fetch.py --no-llm + + # 显示详细输出 + python scripts/manual_news_fetch.py --verbose +""" +import asyncio +import argparse +import sys +import os +from pathlib import Path + +# 添加项目路径 +script_dir = Path(__file__).parent +project_root = script_dir.parent +backend_dir = project_root / "backend" + +sys.path.insert(0, str(backend_dir)) + +# 切换到 backend 目录作为工作目录 +os.chdir(backend_dir) + +from app.utils.logger import logger +from app.news_agent.news_agent import get_news_agent +from app.news_agent.fetcher import NewsFetcher, NewsItem +from app.news_agent.filter import NewsDeduplicator, NewsFilter +from app.news_agent.analyzer import NewsAnalyzer, NewsAnalyzerSimple +from app.news_agent.news_db_service import get_news_db_service +from app.news_agent.notifier import get_news_notifier + + +async def main(): + parser = argparse.ArgumentParser(description="手动执行新闻抓取和分析") + parser.add_argument( + "--category", + choices=["crypto", "stock"], + help="指定新闻分类(默认:全部)" + ) + parser.add_argument( + "--no-llm", + action="store_true", + help="不使用 LLM 分析,仅使用关键词过滤" + ) + 
parser.add_argument( + "--no-notify", + action="store_true", + help="不发送飞书通知" + ) + parser.add_argument( + "--verbose", + action="store_true", + help="显示详细输出" + ) + parser.add_argument( + "--limit", + type=int, + default=50, + help="每个源最多抓取数量(默认:50)" + ) + parser.add_argument( + "--analyze-existing", + action="store_true", + help="分析数据库中未分析的文章(不抓取新新闻)" + ) + + args = parser.parse_args() + + print("=" * 70) + print("📰 新闻智能体 - 手动抓取模式") + print("=" * 70) + print(f"📅 时间: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + if args.category: + print(f"📂 分类: {args.category}") + else: + print(f"📂 分类: 全部") + print(f"🤖 LLM 分析: {'禁用' if args.no_llm else '启用'}") + print(f"📢 通知: {'禁用' if args.no_notify else '启用'}") + print("=" * 70) + print() + + # 初始化组件 + fetcher = NewsFetcher() + deduplicator = NewsDeduplicator() + filter = NewsFilter() + db_service = get_news_db_service() + notifier = get_news_notifier() + + # 选择分析器 + if args.no_llm: + analyzer = NewsAnalyzerSimple() + print("使用规则分析器(关键词匹配)") + else: + analyzer = NewsAnalyzer() + print("使用 LLM 分析器") + + try: + # 如果是分析已有文章模式 + if args.analyze_existing: + print("\n" + "─" * 70) + print("📂 分析数据库中未分析的文章") + print("─" * 70) + + unanalyzed = db_service.get_unanalyzed_articles(limit=args.limit, hours=48) + print(f"✅ 找到 {len(unanalyzed)} 条未分析的文章") + + if not unanalyzed: + print("没有需要分析的文章") + return + + # 转换为 NewsItem 对象 + saved_articles = [] + for article in unanalyzed: + # 创建临时 NewsItem + item = NewsItem( + title=article.title, + content=article.content or "", + url=article.url, + source=article.source, + category=article.category, + published_at=article.published_at, + crawled_at=article.crawled_at, + content_hash=article.content_hash, + author=article.author, + tags=article.tags + ) + # 附加属性 + item.quality_score = article.quality_score or 0.5 + saved_articles.append((article, item)) + + else: + # 1. 
抓取新闻 + print("\n" + "─" * 70) + print("📡 第一步: 抓取新闻") + print("─" * 70) + + items = await fetcher.fetch_all_news(category=args.category) + + if not items: + print("❌ 没有获取到任何新闻") + return + + print(f"✅ 获取到 {len(items)} 条新闻") + + if args.verbose: + for i, item in enumerate(items[:10], 1): + print(f" {i}. [{item.category}] {item.title[:60]}...") + + # 2. 去重 + print("\n" + "─" * 70) + print("🔍 第二步: 去重") + print("─" * 70) + + items = deduplicator.deduplicate_list(items) + print(f"✅ 去重后剩余 {len(items)} 条") + + # 3. 过滤 + print("\n" + "─" * 70) + print("⚙️ 第三步: 关键词过滤") + print("─" * 70) + + filtered_items = filter.filter_news(items, min_quality=0.3) + print(f"✅ 过滤后剩余 {len(filtered_items)} 条") + + if args.verbose and filtered_items: + print("\n通过过滤的新闻:") + for i, item in enumerate(filtered_items[:10], 1): + impact_score = getattr(item, 'impact_score', 0) + quality_score = getattr(item, 'quality_score', 0) + print(f" {i}. [{item.category}] 影响:{impact_score:.1f} 质量:{quality_score:.2f}") + print(f" {item.title[:60]}...") + + # 4. 保存到数据库 + print("\n" + "─" * 70) + print("💾 第四步: 保存到数据库") + print("─" * 70) + + saved_articles = [] + for item in filtered_items: + # 检查是否已存在 + if db_service.check_duplicate_by_hash(item.content_hash): + if args.verbose: + print(f" ⊗ 已存在: {item.title[:50]}...") + continue + + # 保存 + article_data = { + 'title': item.title, + 'content': item.content, + 'url': item.url, + 'source': item.source, + 'author': item.author, + 'category': item.category, + 'tags': item.tags, + 'published_at': item.published_at, + 'crawled_at': item.crawled_at, + 'content_hash': item.content_hash, + 'quality_score': getattr(item, 'quality_score', 0.5), + } + + article = db_service.save_article(article_data) + if article: + saved_articles.append((article, item)) + if args.verbose: + print(f" ✓ 已保存: {item.title[:50]}...") + + if not args.analyze_existing: + print(f"✅ 保存了 {len(saved_articles)} 条新文章") + + if not saved_articles: + print("\n没有新文章需要处理") + return + + # 5. 
LLM 分析 + print("\n" + "─" * 70) + print("🤖 第五步: AI 分析") + print("─" * 70) + + analyzed_count = 0 + high_priority_articles = [] + + # 批量分析 + batch_size = 10 if not args.no_llm else 20 + for i in range(0, len(saved_articles), batch_size): + batch = saved_articles[i:i + batch_size] + + if args.no_llm: + # 规则分析 + for article, item in batch: + result = analyzer.analyze_single(item) + priority = result.get('confidence', 50) + + db_service.mark_as_analyzed(article.id, result, priority) + analyzed_count += 1 + + print(f" ✓ [{result['market_impact']}] {article.title[:50]}...") + print(f" 情绪: {result['sentiment']} | 相关: {', '.join(result.get('relevant_symbols', [])[:3])}") + + if priority >= 40: + # 构造包含分析结果的完整字典 + article_dict = article.to_dict() + article_dict.update({ + 'llm_analyzed': True, + 'market_impact': result.get('market_impact'), + 'impact_type': result.get('impact_type'), + 'sentiment': result.get('sentiment'), + 'summary': result.get('summary'), + 'key_points': result.get('key_points'), + 'trading_advice': result.get('trading_advice'), + 'relevant_symbols': result.get('relevant_symbols'), + 'priority': priority, + }) + high_priority_articles.append(article_dict) + + else: + # LLM 批量分析 + items_to_analyze = [item for _, item in batch] + results = analyzer.analyze_batch(items_to_analyze) + + for (article, _), result in zip(batch, results): + if result: + priority = analyzer.calculate_priority( + result, + getattr(article, 'quality_score', 0.5) + ) + db_service.mark_as_analyzed(article.id, result, priority) + + analyzed_count += 1 + print(f" ✓ [{result['market_impact']}] {article.title[:50]}...") + print(f" {result.get('summary', 'N/A')[:60]}...") + + if priority >= 40: + # 构造包含分析结果的完整字典 + article_dict = article.to_dict() + # 更新为最新的分析结果 + article_dict.update({ + 'llm_analyzed': True, + 'market_impact': result.get('market_impact'), + 'impact_type': result.get('impact_type'), + 'sentiment': result.get('sentiment'), + 'summary': result.get('summary'), + 'key_points': 
result.get('key_points'), + 'trading_advice': result.get('trading_advice'), + 'relevant_symbols': result.get('relevant_symbols'), + 'priority': priority, + }) + high_priority_articles.append(article_dict) + + print(f"\n✅ 分析了 {analyzed_count} 条文章") + print(f"🔥 发现 {len(high_priority_articles)} 条高优先级新闻") + + # 6. 发送通知 + if not args.no_notify and high_priority_articles: + print("\n" + "─" * 70) + print("📢 第六步: 发送通知") + print("─" * 70) + + # 按优先级排序 + high_priority_articles.sort( + key=lambda x: x.get('priority', 0), + reverse=True + ) + + # 如果只有1-2条,单独发送;否则批量发送 + if len(high_priority_articles) <= 2: + for article in high_priority_articles: + await notifier.notify_single_news(article) + db_service.mark_as_notified(article['id']) + print(f" ✓ 已发送: {article['title'][:50]}...") + else: + await notifier.notify_news_batch(high_priority_articles[:10]) + for article in high_priority_articles[:10]: + db_service.mark_as_notified(article['id']) + print(f" ✓ 已发送批量通知 ({len(high_priority_articles[:10])} 条)") + + # 7. 
统计报告 + print("\n" + "=" * 70) + print("📊 处理完成 - 统计报告") + print("=" * 70) + + db_stats = db_service.get_stats(hours=24) + print(f"📰 最近24小时:") + print(f" • 总文章数: {db_stats['total_articles']}") + print(f" • 已分析: {db_stats['analyzed']}") + print(f" • 高影响: {db_stats['high_impact']}") + print(f" • 已通知: {db_stats['notified']}") + + print("\n" + "=" * 70) + + except Exception as e: + print(f"\n❌ 执行失败: {e}") + import traceback + traceback.print_exc() + return 1 + + finally: + await fetcher.close() + + return 0 + + +if __name__ == "__main__": + exit_code = asyncio.run(main()) + sys.exit(exit_code) diff --git a/scripts/migrate_create_news_table.py b/scripts/migrate_create_news_table.py new file mode 100644 index 0000000..ee21ba4 --- /dev/null +++ b/scripts/migrate_create_news_table.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +数据库迁移脚本:创建 news_articles 表 + +使用方法: + python scripts/migrate_create_news_table.py + +或者在服务器上直接执行 SQL(参考 schema.sql) +""" +import sqlite3 +import os +from pathlib import Path + + +# SQL 建表语句 +CREATE_TABLE_SQL = """ +CREATE TABLE IF NOT EXISTS news_articles ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + title VARCHAR(500) NOT NULL, + content TEXT, + content_hash VARCHAR(64) NOT NULL, + url VARCHAR(1000) NOT NULL UNIQUE, + source VARCHAR(100) NOT NULL, + author VARCHAR(200), + category VARCHAR(50) NOT NULL, + tags JSON, + published_at DATETIME, + crawled_at DATETIME NOT NULL, + llm_analyzed BOOLEAN DEFAULT 0, + market_impact VARCHAR(20), + impact_type VARCHAR(50), + relevant_symbols JSON, + sentiment VARCHAR(20), + summary TEXT, + key_points JSON, + trading_advice TEXT, + priority REAL DEFAULT 0.0, + priority_reason TEXT, + notified BOOLEAN DEFAULT 0, + notification_sent_at DATETIME, + notification_channel VARCHAR(50), + quality_score REAL, + duplicate_of INTEGER, + is_active BOOLEAN DEFAULT 1, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL +); + +-- 创建索引 +CREATE INDEX IF NOT EXISTS idx_news_articles_content_hash ON 
news_articles(content_hash); +CREATE INDEX IF NOT EXISTS idx_news_articles_category ON news_articles(category); +CREATE INDEX IF NOT EXISTS idx_news_articles_published_at ON news_articles(published_at); +CREATE INDEX IF NOT EXISTS idx_news_articles_crawled_at ON news_articles(crawled_at); +CREATE INDEX IF NOT EXISTS idx_news_articles_llm_analyzed ON news_articles(llm_analyzed); +CREATE INDEX IF NOT EXISTS idx_news_articles_priority ON news_articles(priority); +CREATE INDEX IF NOT EXISTS idx_news_articles_notified ON news_articles(notified); +CREATE INDEX IF NOT EXISTS idx_news_articles_is_active ON news_articles(is_active); +CREATE INDEX IF NOT EXISTS idx_news_articles_created_at ON news_articles(created_at); +""" + + +def migrate_create_news_table(): + """创建 news_articles 表""" + + # 数据库路径 + db_path = Path(__file__).parent.parent / "backend" / "stock_agent.db" + + if not db_path.exists(): + print(f"❌ 数据库文件不存在: {db_path}") + return False + + try: + # 连接数据库 + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # 检查表是否已存在 + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='news_articles'") + if cursor.fetchone(): + print("✅ news_articles 表已存在,无需迁移") + conn.close() + return True + + # 创建表 + print(f"📝 正在创建 news_articles 表到 {db_path}...") + + # 执行 SQL + for sql_statement in CREATE_TABLE_SQL.split(';'): + sql_statement = sql_statement.strip() + if sql_statement: + cursor.execute(sql_statement) + + conn.commit() + + # 验证 + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='news_articles'") + if cursor.fetchone(): + print("✅ news_articles 表创建成功") + + # 显示索引 + cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name LIKE 'idx_news_articles%'") + indexes = cursor.fetchall() + print(f"✅ 创建了 {len(indexes)} 个索引") + + conn.close() + return True + else: + print("❌ 表创建失败") + conn.close() + return False + + except Exception as e: + print(f"❌ 迁移失败: {e}") + import traceback + traceback.print_exc() + 
return False + + +if __name__ == "__main__": + print("=" * 60) + print("数据库迁移:创建 news_articles 表") + print("=" * 60) + + success = migrate_create_news_table() + + if success: + print("\n✅ 迁移完成!") + print("\n请重启服务以使更改生效:") + print(" pm2 restart stock-agent") + else: + print("\n❌ 迁移失败!") + print("\n如果自动迁移失败,可以参考 scripts/schema_news.sql 手动执行 SQL") diff --git a/scripts/run_news_fetch.sh b/scripts/run_news_fetch.sh new file mode 100755 index 0000000..9d65b85 --- /dev/null +++ b/scripts/run_news_fetch.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# 新闻抓取启动脚本 + +# 获取脚本所在目录 +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +PROJECT_ROOT="$(dirname "$SCRIPT_DIR")" +BACKEND_DIR="$PROJECT_ROOT/backend" + +# 激活虚拟环境 +if [ -f "$BACKEND_DIR/venv/bin/activate" ]; then + source "$BACKEND_DIR/venv/bin/activate" +else + echo "错误: 虚拟环境不存在,请先创建: $BACKEND_DIR/venv" + exit 1 +fi + +# 切换到后端目录 +cd "$BACKEND_DIR" + +# 运行 Python 脚本 +python "$SCRIPT_DIR/manual_news_fetch.py" "$@" diff --git a/scripts/schema_news.sql b/scripts/schema_news.sql new file mode 100644 index 0000000..ca635bb --- /dev/null +++ b/scripts/schema_news.sql @@ -0,0 +1,45 @@ +-- 新闻文章表建表 SQL +-- 使用方法: sqlite3 backend/stock_agent.db < schema_news.sql + +CREATE TABLE IF NOT EXISTS news_articles ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + title VARCHAR(500) NOT NULL, + content TEXT, + content_hash VARCHAR(64) NOT NULL, + url VARCHAR(1000) NOT NULL UNIQUE, + source VARCHAR(100) NOT NULL, + author VARCHAR(200), + category VARCHAR(50) NOT NULL, + tags JSON, + published_at DATETIME, + crawled_at DATETIME NOT NULL, + llm_analyzed BOOLEAN DEFAULT 0, + market_impact VARCHAR(20), + impact_type VARCHAR(50), + relevant_symbols JSON, + sentiment VARCHAR(20), + summary TEXT, + key_points JSON, + trading_advice TEXT, + priority REAL DEFAULT 0.0, + priority_reason TEXT, + notified BOOLEAN DEFAULT 0, + notification_sent_at DATETIME, + notification_channel VARCHAR(50), + quality_score REAL, + duplicate_of INTEGER, + is_active BOOLEAN DEFAULT 
1, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL +); + +-- 创建索引 +CREATE INDEX IF NOT EXISTS idx_news_articles_content_hash ON news_articles(content_hash); +CREATE INDEX IF NOT EXISTS idx_news_articles_category ON news_articles(category); +CREATE INDEX IF NOT EXISTS idx_news_articles_published_at ON news_articles(published_at); +CREATE INDEX IF NOT EXISTS idx_news_articles_crawled_at ON news_articles(crawled_at); +CREATE INDEX IF NOT EXISTS idx_news_articles_llm_analyzed ON news_articles(llm_analyzed); +CREATE INDEX IF NOT EXISTS idx_news_articles_priority ON news_articles(priority); +CREATE INDEX IF NOT EXISTS idx_news_articles_notified ON news_articles(notified); +CREATE INDEX IF NOT EXISTS idx_news_articles_is_active ON news_articles(is_active); +CREATE INDEX IF NOT EXISTS idx_news_articles_created_at ON news_articles(created_at); diff --git a/scripts/test_news_notification.py b/scripts/test_news_notification.py new file mode 100644 index 0000000..a4f7a75 --- /dev/null +++ b/scripts/test_news_notification.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +""" +测试新闻通知格式 +""" +import asyncio +import sys +from pathlib import Path + +# 添加项目路径 +script_dir = Path(__file__).parent +project_root = script_dir.parent +backend_dir = project_root / "backend" + +sys.path.insert(0, str(backend_dir)) + +import os +os.chdir(backend_dir) + +from app.news_agent.notifier import get_news_notifier + + +# 模拟高影响新闻数据 +test_articles = [ + { + 'id': 1, + 'title': 'Bitcoin ETFs See Record $500M Inflows as Institutions Pile In', + 'source': 'CoinDesk', + 'category': 'crypto', + 'url': 'https://example.com/article1', + 'market_impact': 'high', + 'impact_type': 'bullish', + 'sentiment': 'positive', + 'summary': '比特币现货ETF昨日吸引5亿美元资金流入,创历史新高,显示机构投资者持续增持。', + 'key_points': [ + '贝莱德IBIT录得3亿美元流入', + '富达FBTC流入1.5亿美元', + '机构持仓占比超过60%' + ], + 'trading_advice': '建议持有BTC,关注回调后的买入机会', + 'relevant_symbols': ['BTC', 'IBIT', 'FBTC'], + 'priority': 85, + 'created_at': '2026-02-25T12:00:00' + }, + 
{ + 'id': 2, + 'title': 'SEC Delays Decision on Ethereum ETF Options Listings', + 'source': 'Bloomberg', + 'category': 'crypto', + 'url': 'https://example.com/article2', + 'market_impact': 'medium', + 'impact_type': 'neutral', + 'sentiment': 'neutral', + 'summary': 'SEC再次推迟对以太坊ETF期权上市的决议,新的截止日期为4月底。', + 'key_points': [ + 'SEC引用需要额外审查时间', + '这是第三次推迟', + '市场反应温和' + ], + 'trading_advice': 'ETH持仓观望,等待ETF期权批准', + 'relevant_symbols': ['ETH', 'ETHA'], + 'priority': 55, + 'created_at': '2026-02-25T11:30:00' + }, + { + 'id': 3, + 'title': 'NVIDIA Surpasses $4 Trillion Market Cap Amid AI Chip Demand', + 'source': 'CNBC', + 'category': 'stock', + 'url': 'https://example.com/article3', + 'market_impact': 'high', + 'impact_type': 'bullish', + 'sentiment': 'positive', + 'summary': '英伟达市值突破4万亿美元大关,成为全球市值最高的公司,AI芯片需求持续爆发。', + 'key_points': [ + '股价上涨5%至每股1600美元', + 'H100芯片供不应求', + '数据中心收入同比增长300%' + ], + 'trading_advice': '建议继续持有NVDA,AI趋势未完', + 'relevant_symbols': ['NVDA', 'AMD'], + 'priority': 80, + 'created_at': '2026-02-25T10:15:00' + } +] + + +async def main(): + print("=" * 70) + print("🧪 测试新闻通知格式") + print("=" * 70) + print() + + notifier = get_news_notifier() + + print("📊 测试数据:") + for i, article in enumerate(test_articles, 1): + print(f" {i}. [{article['market_impact']}] {article['title'][:50]}...") + print(f" 摘要: {article['summary'][:50]}...") + print(f" 建议: {article['trading_advice']}") + print() + + print("📢 发送测试通知...") + result = await notifier.notify_news_batch(test_articles) + + if result: + print("✅ 通知发送成功!") + else: + print("❌ 通知发送失败") + + print() + print("=" * 70) + + return 0 + + +if __name__ == "__main__": + exit_code = asyncio.run(main()) + sys.exit(exit_code)