This commit is contained in:
aaron 2026-02-20 21:39:47 +08:00
parent eeac457323
commit 75592857a3
2 changed files with 240 additions and 5 deletions

View File

@ -3,6 +3,7 @@
""" """
import re import re
import html import html
import asyncio
import aiohttp import aiohttp
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
@ -17,6 +18,12 @@ class NewsService:
# 律动快讯 RSS # 律动快讯 RSS
BLOCKBEATS_RSS = "https://api.theblockbeats.news/v2/rss/newsflash" BLOCKBEATS_RSS = "https://api.theblockbeats.news/v2/rss/newsflash"
# Cointelegraph RSS - 英文加密货币新闻
COINTELEGRAPH_RSS = "https://cointelegraph.com/rss"
# CoinDesk RSS - 权威加密货币新闻
COINDESK_RSS = "https://www.coindesk.com/arc/outboundfeeds/rss/"
# Brave Search API # Brave Search API
BRAVE_SEARCH_API = "https://api.search.brave.com/res/v1/web/search" BRAVE_SEARCH_API = "https://api.search.brave.com/res/v1/web/search"
@ -30,7 +37,12 @@ class NewsService:
async def get_latest_news(self, limit: int = 20) -> List[Dict[str, Any]]: async def get_latest_news(self, limit: int = 20) -> List[Dict[str, Any]]:
""" """
获取最新新闻 获取最新加密货币新闻多源聚合
数据源:
1. 律动快讯 - 中文快讯
2. Cointelegraph - 英文新闻
3. CoinDesk - 英文深度分析
Args: Args:
limit: 获取数量 limit: 获取数量
@ -41,15 +53,41 @@ class NewsService:
# 检查缓存 # 检查缓存
if self._cache and self._cache_time: if self._cache and self._cache_time:
if datetime.now() - self._cache_time < self._cache_duration: if datetime.now() - self._cache_time < self._cache_duration:
return self._cache[:limit] return self._cache['crypto'][:limit] if isinstance(self._cache, dict) else self._cache[:limit]
try: try:
news = await self._fetch_blockbeats_news() # 并发获取所有源的新闻
self._cache = news news_tasks = [
self._fetch_blockbeats_news(),
self._fetch_cointelegraph_news(),
self._fetch_coindesk_news(),
]
results = await asyncio.gather(*news_tasks, return_exceptions=True)
# 合并新闻
all_news = []
for result in results:
if isinstance(result, list):
all_news.extend(result)
elif isinstance(result, Exception):
logger.warning(f"获取新闻失败: {result}")
# 按时间排序
all_news.sort(key=lambda x: x.get('time') or datetime.min, reverse=True)
# 更新缓存
self._cache = {'crypto': all_news, 'stock': self._cache.get('stock', {}) if isinstance(self._cache, dict) else {}}
self._cache_time = datetime.now() self._cache_time = datetime.now()
return news[:limit]
logger.info(f"获取到 {len(all_news)} 条加密货币新闻(律动+Cointelegraph+CoinDesk")
return all_news[:limit]
except Exception as e: except Exception as e:
logger.error(f"获取新闻失败: {e}") logger.error(f"获取新闻失败: {e}")
# 返回缓存
if isinstance(self._cache, dict):
return self._cache.get('crypto', [])[:limit]
return self._cache[:limit] if self._cache else [] return self._cache[:limit] if self._cache else []
async def _fetch_blockbeats_news(self) -> List[Dict[str, Any]]: async def _fetch_blockbeats_news(self) -> List[Dict[str, Any]]:
@ -123,6 +161,132 @@ class NewsService:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
return [] return []
async def _fetch_cointelegraph_news(self) -> List[Dict[str, Any]]:
    """Fetch Cointelegraph news (English) from its RSS feed.

    Returns:
        A list of news dicts (see ``_fetch_rss_feed``); empty on any failure.
    """
    return await self._fetch_rss_feed(self.COINTELEGRAPH_RSS, 'Cointelegraph')

async def _fetch_coindesk_news(self) -> List[Dict[str, Any]]:
    """Fetch CoinDesk news (English) from its RSS feed.

    Returns:
        A list of news dicts (see ``_fetch_rss_feed``); empty on any failure.
    """
    return await self._fetch_rss_feed(self.COINDESK_RSS, 'CoinDesk')

async def _fetch_rss_feed(self, url: str, source: str, max_items: int = 20) -> List[Dict[str, Any]]:
    """Download an RSS feed and convert its ``<item>`` entries to news dicts.

    Shared implementation for the per-source fetchers, which previously
    duplicated this logic line for line.

    Args:
        url: Address of the RSS feed.
        source: Label stored under the ``'source'`` key and used in log messages.
        max_items: Maximum number of ``<item>`` elements to read from the feed.

    Returns:
        A list of dicts with the keys ``title``, ``description`` (capped at
        500 characters), ``time``, ``time_str``, ``link`` and ``source``.
        An empty list is returned on a non-200 response, a feed without a
        ``<channel>`` element, or any exception (which is logged, not raised).
    """
    try:
        # Pass an explicit ClientTimeout rather than a bare number, which
        # aiohttp only accepts for backward compatibility.
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=timeout) as response:
                if response.status != 200:
                    logger.error(f"获取 {source} 失败: HTTP {response.status}")
                    return []
                content = await response.text()

        # NOTE(review): xml.etree is not hardened against maliciously crafted
        # XML; the feed is remote content, so consider a hardened parser.
        root = ET.fromstring(content)
        channel = root.find('channel')
        if channel is None:
            return []

        news_list = []
        for item in channel.findall('item')[:max_items]:
            title_elem = item.find('title')
            if title_elem is None:
                continue
            desc_elem = item.find('description')
            pub_date_elem = item.find('pubDate')
            link_elem = item.find('link')

            title = self._clean_cdata(title_elem.text or '')

            # The description may carry HTML markup; strip it after removing CDATA.
            description = ''
            if desc_elem is not None and desc_elem.text:
                description = self._clean_html(self._clean_cdata(desc_elem.text))

            pub_time = None
            if pub_date_elem is not None and pub_date_elem.text:
                pub_time = self._parse_rss_date(self._clean_cdata(pub_date_elem.text))

            link = ''
            if link_elem is not None and link_elem.text:
                link = self._clean_cdata(link_elem.text)

            news_list.append({
                'title': title,
                'description': description[:500],
                'time': pub_time,
                'time_str': pub_time.strftime('%m-%d %H:%M') if pub_time else '',
                'link': link,
                'source': source,
            })

        logger.info(f"获取到 {len(news_list)} 条 {source} 新闻")
        return news_list
    except Exception as e:
        # Deliberate best-effort: one failing source must not break aggregation.
        logger.error(f"获取 {source} 失败: {e}")
        return []
def _clean_cdata(self, text: str) -> str: def _clean_cdata(self, text: str) -> str:
"""清理 CDATA 标记""" """清理 CDATA 标记"""
if not text: if not text:

71
scripts/test_crypto_news.py Executable file
View File

@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""
Test crypto news fetching (multi-source aggregation).
"""
import asyncio
import sys
import os
# Put the sibling `backend` directory first on sys.path so that the `app`
# package import below resolves regardless of the current working directory.
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
backend_dir = os.path.join(project_root, 'backend')
sys.path.insert(0, backend_dir)
from app.services.news_service import get_news_service
async def main():
    """Fetch aggregated crypto news and print a report to stdout.

    The report contains the number of items fetched, a per-source count,
    the first ten items, and the first five items as rendered by
    ``format_news_for_llm``.
    """
    # Imported locally so this function does not depend on edits to the
    # module-level import block.
    from collections import Counter

    banner = "=" * 60

    print(banner)
    print("📰 测试加密货币新闻获取(多源聚合)")
    print(banner)

    news_service = get_news_service()

    # Fetch the latest news.
    print("\n🔍 获取最新加密货币新闻...")
    news_list = await news_service.get_latest_news(limit=30)
    print(f"\n✅ 获取到 {len(news_list)} 条新闻\n")

    # Count items per source; Counter keeps first-seen order like the dict it replaces.
    sources = Counter(news.get('source', 'Unknown') for news in news_list)
    print("📊 新闻来源统计:")
    for source, count in sources.items():
        print(f" {source}: {count}")

    # Show the ten most recent items.
    print("\n" + banner)
    print("📰 最新 10 条新闻")
    print(banner)
    for i, news in enumerate(news_list[:10], 1):
        time_str = news.get('time_str', '')
        title = news.get('title', '')
        source = news.get('source', '')
        # `or ''` guards against an explicit None description.
        description = news.get('description') or ''
        desc = description[:100]
        print(f"\n{i}. [{time_str}] {source}")
        print(f" {title}")
        if desc:
            # Only mark the text as truncated when something was actually cut.
            suffix = "..." if len(description) > 100 else ""
            print(f" {desc}{suffix}")

    # Show how the news is formatted for the LLM.
    print("\n" + banner)
    print("🤖 格式化给 LLM 的新闻")
    print(banner)
    formatted_news = news_service.format_news_for_llm(news_list[:5], max_items=5)
    print(formatted_news)

    print("\n" + banner)
    print("✅ 测试完成")
    print(banner)
# Run the async entry point only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())