stock-ai-agent/backend/app/services/news_service.py
2026-02-07 00:51:58 +08:00

273 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
新闻舆情服务 - 获取加密货币相关新闻
"""
import re
import html
import aiohttp
import xml.etree.ElementTree as ET
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from app.utils.logger import logger
class NewsService:
    """News & sentiment service — fetches crypto-related news from RSS feeds.

    Results are cached in-process for 5 minutes to avoid hammering the feed.
    """

    # BlockBeats newsflash RSS feed URL.
    BLOCKBEATS_RSS = "https://api.theblockbeats.news/v2/rss/newsflash"

    def __init__(self):
        """Initialize the news service with an empty 5-minute cache."""
        self._cache: List[Dict[str, Any]] = []          # last successfully fetched items
        self._cache_time: Optional[datetime] = None     # when the cache was filled
        self._cache_duration = timedelta(minutes=5)     # cache TTL: 5 minutes
        logger.info("新闻舆情服务初始化完成")

    async def get_latest_news(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Fetch the latest news, serving from the cache while it is fresh.

        Args:
            limit: maximum number of items to return

        Returns:
            A list of news dicts; on fetch failure the stale cache is
            returned (or an empty list if there is none).
        """
        # Serve from cache while it is still within the TTL.
        if self._cache and self._cache_time:
            if datetime.now() - self._cache_time < self._cache_duration:
                return self._cache[:limit]
        try:
            news = await self._fetch_blockbeats_news()
            self._cache = news
            self._cache_time = datetime.now()
            return news[:limit]
        except Exception as e:
            logger.error(f"获取新闻失败: {e}")
            # Best-effort: fall back to the stale cache rather than failing hard.
            return self._cache[:limit] if self._cache else []

    async def _fetch_blockbeats_news(self) -> List[Dict[str, Any]]:
        """Fetch and parse the BlockBeats newsflash RSS feed.

        Returns:
            A list of dicts with keys: title, description, time, time_str,
            link, source. Returns [] on any HTTP or parse error.
        """
        news_list: List[Dict[str, Any]] = []
        try:
            # aiohttp 3.x deprecates passing a bare number as `timeout=`;
            # use an explicit ClientTimeout (10s total).
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession() as session:
                async with session.get(self.BLOCKBEATS_RSS, timeout=timeout) as response:
                    if response.status != 200:
                        logger.error(f"获取律动快讯失败: HTTP {response.status}")
                        return []
                    content = await response.text()

            # Parse the RSS XML payload.
            root = ET.fromstring(content)
            channel = root.find('channel')
            if channel is None:
                return []

            for item in channel.findall('item'):
                title_elem = item.find('title')
                desc_elem = item.find('description')
                pub_date_elem = item.find('pubDate')
                link_elem = item.find('link')
                if title_elem is None:
                    continue

                # Title (strip CDATA wrapper).
                title = self._clean_cdata(title_elem.text or '')

                # Description (strip CDATA, then HTML tags/entities).
                description = ''
                if desc_elem is not None and desc_elem.text:
                    description = self._clean_html(self._clean_cdata(desc_elem.text))

                # Publication time (None if absent/unparseable).
                pub_time = None
                if pub_date_elem is not None and pub_date_elem.text:
                    pub_time = self._parse_rss_date(self._clean_cdata(pub_date_elem.text))

                # Link.
                link = ''
                if link_elem is not None and link_elem.text:
                    link = self._clean_cdata(link_elem.text)

                news_list.append({
                    'title': title,
                    'description': description[:500],  # cap description length
                    'time': pub_time,
                    'time_str': pub_time.strftime('%m-%d %H:%M') if pub_time else '',
                    'link': link,
                    'source': '律动BlockBeats'
                })

            logger.info(f"获取到 {len(news_list)} 条律动快讯")
            return news_list
        except Exception as e:
            logger.error(f"解析律动快讯失败: {e}")
            return []

    def _clean_cdata(self, text: str) -> str:
        """Strip an RSS ``<![CDATA[...]]>`` wrapper, returning the inner text."""
        if not text:
            return ''
        # Unwrap CDATA; DOTALL so multi-line payloads are handled.
        text = re.sub(r'<!\[CDATA\[(.*?)\]\]>', r'\1', text, flags=re.DOTALL)
        return text.strip()

    def _clean_html(self, text: str) -> str:
        """Remove HTML tags, decode entities and collapse whitespace."""
        if not text:
            return ''
        # Drop HTML tags.
        text = re.sub(r'<[^>]+>', '', text)
        # Decode HTML entities (&amp; -> & etc.).
        text = html.unescape(text)
        # Collapse runs of whitespace to a single space.
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _parse_rss_date(self, date_str: str) -> Optional[datetime]:
        """Parse an RSS pubDate string; return None if no format matches.

        NOTE: the ``%z`` format yields a timezone-aware datetime while the
        other two yield naive ones — callers must handle both.
        """
        if not date_str:
            return None
        # RSS date format example: "Sat, 07 Feb 2026 00:30:33 +0800"
        formats = [
            '%a, %d %b %Y %H:%M:%S %z',
            '%a, %d %b %Y %H:%M:%S',
            '%Y-%m-%d %H:%M:%S'
        ]
        for fmt in formats:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        return None

    def filter_relevant_news(self, news_list: List[Dict[str, Any]],
                             symbols: Optional[List[str]] = None,
                             hours: int = 4) -> List[Dict[str, Any]]:
        """
        Filter news down to recent, relevant items.

        Args:
            news_list: news items as produced by ``_fetch_blockbeats_news``
            symbols: trading pairs of interest (e.g. ['BTCUSDT', 'ETHUSDT'])
            hours: only keep news from the last N hours

        Returns:
            Filtered news list; matched items gain a ``related_symbol`` key
            (a symbol, or 'MARKET' for market-wide news).
        """
        if not news_list:
            return []

        # Cutoff for the time filter (local, naive).
        cutoff_time = datetime.now() - timedelta(hours=hours)
        filtered = []

        # Per-symbol keyword mapping.
        symbol_keywords = {
            'BTCUSDT': ['比特币', 'BTC', 'Bitcoin'],
            'ETHUSDT': ['以太坊', 'ETH', 'Ethereum'],
            'BNBUSDT': ['BNB', 'Binance'],
            'SOLUSDT': ['SOL', 'Solana'],
        }

        # Market-wide keywords (affect the whole market).
        market_keywords = [
            '市场', '行情', '反弹', '下跌', '暴跌', '暴涨', '清算',
            '资金费率', '多单', '空单', '杠杆', '爆仓',
            '美联储', 'Fed', '利率', '通胀',
            '监管', 'SEC', 'ETF',
            '鲸鱼', '巨鲸', '大户',
            '交易所', 'Binance', 'Coinbase'
        ]

        for news in news_list:
            # Time filter.
            if news.get('time'):
                news_time = news['time']
                if news_time.tzinfo:
                    # Convert aware timestamps into the local zone before
                    # stripping tzinfo, so the comparison against the naive
                    # local cutoff is consistent on any host timezone.
                    news_time = news_time.astimezone().replace(tzinfo=None)
                if news_time < cutoff_time:
                    continue

            title = news.get('title', '')
            desc = news.get('description', '')
            # Hoist the lowercasing out of the keyword loops.
            content_lower = (title + ' ' + desc).lower()

            # Does the item mention one of the watched symbols?
            is_relevant = False
            if symbols:
                for symbol in symbols:
                    keywords = symbol_keywords.get(symbol, [])
                    for kw in keywords:
                        if kw.lower() in content_lower:
                            is_relevant = True
                            news['related_symbol'] = symbol
                            break
                    if is_relevant:
                        break

            # Otherwise, does it mention a market-wide keyword?
            if not is_relevant:
                for kw in market_keywords:
                    if kw.lower() in content_lower:
                        is_relevant = True
                        news['related_symbol'] = 'MARKET'
                        break

            if is_relevant:
                filtered.append(news)

        return filtered

    def format_news_for_llm(self, news_list: List[Dict[str, Any]],
                            max_items: int = 10) -> str:
        """
        Format news into a markdown digest for LLM analysis.

        Args:
            news_list: news items
            max_items: maximum number of items to include

        Returns:
            Markdown-formatted text (a placeholder string if empty).
        """
        if not news_list:
            return "暂无相关新闻"

        lines = ["## 最新市场新闻\n"]
        for i, news in enumerate(news_list[:max_items], 1):
            time_str = news.get('time_str', '')
            title = news.get('title', '')
            desc = news.get('description', '')[:200]  # cap description length
            lines.append(f"### {i}. [{time_str}] {title}")
            if desc:
                lines.append(f"{desc}")
            lines.append("")
        return "\n".join(lines)
# Lazily-created module-level singleton.
_news_service: Optional[NewsService] = None


def get_news_service() -> NewsService:
    """Return the process-wide NewsService, constructing it on first use."""
    global _news_service
    if _news_service is None:
        _news_service = NewsService()
    return _news_service