diff --git a/backend/app/services/news_service.py b/backend/app/services/news_service.py
index ccc8a63..74e17be 100644
--- a/backend/app/services/news_service.py
+++ b/backend/app/services/news_service.py
@@ -3,6 +3,7 @@
 """
 import re
 import html
+import asyncio
 import aiohttp
 import xml.etree.ElementTree as ET
 from typing import List, Dict, Any, Optional
@@ -17,6 +18,12 @@ class NewsService:
     # 律动快讯 RSS
     BLOCKBEATS_RSS = "https://api.theblockbeats.news/v2/rss/newsflash"
 
+    # Cointelegraph RSS - English crypto news
+    COINTELEGRAPH_RSS = "https://cointelegraph.com/rss"
+
+    # CoinDesk RSS - English crypto news and analysis
+    COINDESK_RSS = "https://www.coindesk.com/arc/outboundfeeds/rss/"
+
     # Brave Search API
     BRAVE_SEARCH_API = "https://api.search.brave.com/res/v1/web/search"
 
@@ -30,7 +37,12 @@ class NewsService:
 
     async def get_latest_news(self, limit: int = 20) -> List[Dict[str, Any]]:
         """
-        获取最新新闻
+        Fetch the latest crypto news, aggregated from several sources.
+
+        Sources:
+        1. BlockBeats newsflash (Chinese)
+        2. Cointelegraph (English)
+        3. CoinDesk (English, in-depth analysis)
 
         Args:
             limit: 获取数量
@@ -41,15 +53,52 @@ class NewsService:
         # 检查缓存
         if self._cache and self._cache_time:
             if datetime.now() - self._cache_time < self._cache_duration:
-                return self._cache[:limit]
+                # The cache is either the legacy plain list or the {'crypto', 'stock'} dict
+                cached = self._cache.get('crypto') if isinstance(self._cache, dict) else self._cache
+                if cached:
+                    return cached[:limit]
 
         try:
-            news = await self._fetch_blockbeats_news()
-            self._cache = news
+            # Fetch all sources concurrently; a failing source must not sink the others
+            news_tasks = [
+                self._fetch_blockbeats_news(),
+                self._fetch_cointelegraph_news(),
+                self._fetch_coindesk_news(),
+            ]
+
+            results = await asyncio.gather(*news_tasks, return_exceptions=True)
+
+            # Merge the results that succeeded
+            all_news = []
+            for result in results:
+                if isinstance(result, list):
+                    all_news.extend(result)
+                elif isinstance(result, Exception):
+                    logger.warning(f"获取新闻失败: {result}")
+
+            if not all_news:
+                # Every source failed or came back empty: keep serving the previous
+                # cache instead of overwriting it with an empty list.
+                stale = self._cache.get('crypto') if isinstance(self._cache, dict) else self._cache
+                return (stale or [])[:limit]
+
+            # Newest first; items without a timestamp sort last.
+            # NOTE(review): assumes every source yields naive datetimes - mixing naive and
+            # aware values would raise TypeError here; confirm in _parse_rss_date.
+            all_news.sort(key=lambda x: x.get('time') or datetime.min, reverse=True)
+
+            # Update the cache, preserving any 'stock' entry already stored
+            self._cache = {'crypto': all_news, 'stock': self._cache.get('stock', {}) if isinstance(self._cache, dict) else {}}
             self._cache_time = datetime.now()
-            return news[:limit]
+
+            logger.info(f"获取到 {len(all_news)} 条加密货币新闻(律动+Cointelegraph+CoinDesk)")
+            return all_news[:limit]
+
         except Exception as e:
             logger.error(f"获取新闻失败: {e}")
+            # Fall back to whatever is cached
+            if isinstance(self._cache, dict):
+                return self._cache.get('crypto', [])[:limit]
             return self._cache[:limit] if self._cache else []
 
     async def _fetch_blockbeats_news(self) -> List[Dict[str, Any]]:
@@ -123,6 +172,77 @@ class NewsService:
             logger.debug(traceback.format_exc())
             return []
 
+    async def _fetch_rss_news(self, url: str, source: str, max_items: int = 20) -> List[Dict[str, Any]]:
+        """
+        Fetch an RSS 2.0 feed and convert its items into news dicts.
+
+        Args:
+            url: Feed URL to download
+            source: Label stored in each item's 'source' field and used in log messages
+            max_items: Maximum number of feed items to convert
+
+        Returns:
+            List of dicts with 'title', 'description', 'time', 'time_str',
+            'link' and 'source' keys; an empty list on any failure
+        """
+        try:
+            # ClientTimeout replaces the deprecated bare-number timeout argument
+            timeout = aiohttp.ClientTimeout(total=10)
+            async with aiohttp.ClientSession(timeout=timeout) as session:
+                async with session.get(url) as response:
+                    if response.status != 200:
+                        logger.error(f"获取 {source} 失败: HTTP {response.status}")
+                        return []
+                    content = await response.text()
+
+            # Parse only after the connection has been released.
+            # NOTE(review): the feed is third-party input; stdlib ElementTree may not
+            # guard against entity-expansion attacks on older Expat builds - confirm.
+            channel = ET.fromstring(content).find('channel')
+            if channel is None:
+                return []
+
+            news_list = []
+            for item in channel.findall('item')[:max_items]:
+                title_elem = item.find('title')
+                if title_elem is None:
+                    continue
+
+                # findtext() yields None or '' for a missing or empty element
+                description = item.findtext('description') or ''
+                if description:
+                    # Strip the CDATA wrapper first, then the HTML markup
+                    description = self._clean_html(self._clean_cdata(description))
+
+                pub_date = item.findtext('pubDate')
+                pub_time = self._parse_rss_date(self._clean_cdata(pub_date)) if pub_date else None
+
+                link = item.findtext('link')
+
+                news_list.append({
+                    'title': self._clean_cdata(title_elem.text or ''),
+                    'description': description[:500],
+                    'time': pub_time,
+                    'time_str': pub_time.strftime('%m-%d %H:%M') if pub_time else '',
+                    'link': self._clean_cdata(link) if link else '',
+                    'source': source,
+                })
+
+            logger.info(f"获取到 {len(news_list)} 条 {source} 新闻")
+            return news_list
+
+        except Exception as e:
+            logger.error(f"获取 {source} 失败: {e}")
+            return []
+
+    async def _fetch_cointelegraph_news(self) -> List[Dict[str, Any]]:
+        """Fetch Cointelegraph news (English)."""
+        return await self._fetch_rss_news(self.COINTELEGRAPH_RSS, 'Cointelegraph')
+
+    async def _fetch_coindesk_news(self) -> List[Dict[str, Any]]:
+        """Fetch CoinDesk news (English)."""
+        return await self._fetch_rss_news(self.COINDESK_RSS, 'CoinDesk')
+
     def _clean_cdata(self, text: str) -> str:
         """清理 CDATA 标记"""
         if not text:
diff --git a/scripts/test_crypto_news.py b/scripts/test_crypto_news.py
new file mode 100755
index 0000000..903804b
--- /dev/null
+++ b/scripts/test_crypto_news.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+"""
+Manual smoke test for crypto news fetching (multi-source aggregation).
+"""
+import asyncio
+import sys
+import os
+
+# Put the backend directory on sys.path so the `app` package can be imported
+script_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.dirname(script_dir)
+backend_dir = os.path.join(project_root, 'backend')
+sys.path.insert(0, backend_dir)
+
+from app.services.news_service import get_news_service
+
+
+async def main():
+    print("=" * 60)
+    print("📰 测试加密货币新闻获取(多源聚合)")
+    print("=" * 60)
+
+    news_service = get_news_service()
+
+    # Fetch the latest news
+    print("\n🔍 获取最新加密货币新闻...")
+    news_list = await news_service.get_latest_news(limit=30)
+
+    print(f"\n✅ 获取到 {len(news_list)} 条新闻\n")
+
+    # Count the items per source
+    sources = {}
+    for news in news_list:
+        source = news.get('source', 'Unknown')
+        sources[source] = sources.get(source, 0) + 1
+
+    print("📊 新闻来源统计:")
+    for source, count in sources.items():
+        print(f" {source}: {count} 条")
+
+    # Show the 10 most recent items
+    print("\n" + "=" * 60)
+    print("📰 最新 10 条新闻")
+    print("=" * 60)
+
+    for i, news in enumerate(news_list[:10], 1):
+        time_str = news.get('time_str', '')
+        title = news.get('title', '')
+        source = news.get('source', '')
+        desc = news.get('description', '')[:100]
+
+        print(f"\n{i}. [{time_str}] {source}")
+        print(f" {title}")
+        if desc:
+            print(f" {desc}...")
+
+    # Exercise the formatting used for the LLM prompt
+    print("\n" + "=" * 60)
+    print("🤖 格式化给 LLM 的新闻")
+    print("=" * 60)
+
+    formatted_news = news_service.format_news_for_llm(news_list[:5], max_items=5)
+    print(formatted_news)
+
+    print("\n" + "=" * 60)
+    print("✅ 测试完成")
+    print("=" * 60)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())