stock-ai-agent/backend/app/news_agent/notifier.py
2026-02-25 19:59:20 +08:00

308 lines
9.7 KiB
Python

"""
新闻通知模块 - 发送飞书卡片通知
"""
from typing import Dict, Any, List
from app.utils.logger import logger
from app.services.feishu_service import get_feishu_news_service
class NewsNotifier:
"""新闻通知器"""
def __init__(self):
self.feishu = get_feishu_news_service()
def _get_emoji_for_impact(self, impact: str) -> str:
"""根据影响级别获取表情符号"""
emoji_map = {
'high': '🔴',
'medium': '🟡',
'low': '🟢'
}
return emoji_map.get(impact, '📰')
def _get_emoji_for_impact_type(self, impact_type: str) -> str:
"""根据影响类型获取表情符号"""
emoji_map = {
'bullish': '📈',
'bearish': '📉',
'neutral': '➡️'
}
return emoji_map.get(impact_type, '📊')
def _get_color_for_impact(self, impact: str) -> str:
"""根据影响级别获取颜色"""
color_map = {
'high': 'red',
'medium': 'orange',
'low': 'blue'
}
return color_map.get(impact, 'grey')
async def notify_single_news(self, article: Dict[str, Any]) -> bool:
"""
发送单条新闻通知
Args:
article: 文章数据(包含分析结果)
Returns:
是否发送成功
"""
try:
impact = article.get('market_impact', 'low')
impact_type = article.get('impact_type', 'neutral')
title = article.get('title', '')
summary = article.get('summary', '')
source = article.get('source', '')
category = article.get('category', '')
url = article.get('url', '')
trading_advice = article.get('trading_advice', '')
relevant_symbols = article.get('relevant_symbols', [])
key_points = article.get('key_points', [])
# 标题
impact_emoji = self._get_emoji_for_impact(impact)
type_emoji = self._get_emoji_for_impact_type(impact_type)
category_text = '加密货币' if category == 'crypto' else '股票'
card_title = f"{impact_emoji} {type_emoji} 市场快讯 - {category_text}"
# 内容
content_parts = [
f"**来源**: {source}",
f"**标题**: {title}",
"",
f"**摘要**: {summary}",
]
# 关键点
if key_points:
content_parts.append("")
content_parts.append("**关键点**:")
for point in key_points[:3]:
content_parts.append(f"{point}")
# 交易建议
if trading_advice:
content_parts.append("")
content_parts.append(f"**交易建议**: {trading_advice}")
# 相关代码
if relevant_symbols:
symbols_text = " ".join(relevant_symbols)
content_parts.append("")
content_parts.append(f"**相关**: {symbols_text}")
# 链接
if url:
content_parts.append("")
content_parts.append(f"[查看原文]({url})")
# 影响
impact_map = {'high': '重大影响', 'medium': '中等影响', 'low': '轻微影响'}
content_parts.append("")
content_parts.append(f"**影响**: {impact_map.get(impact, '未知')}")
# 颜色
color = self._get_color_for_impact(impact)
# 发送
content = "\n".join(content_parts)
await self.feishu.send_card(card_title, content, color)
logger.info(f"新闻通知已发送: {title[:50]}...")
return True
except Exception as e:
logger.error(f"发送新闻通知失败: {e}")
return False
async def notify_news_batch(self, articles: List[Dict[str, Any]]) -> bool:
"""
发送批量新闻通知(详细模式)
Args:
articles: 文章列表
Returns:
是否发送成功
"""
try:
if not articles:
return False
# 只显示重大影响新闻
high_impact = [a for a in articles if a.get('market_impact') == 'high']
if not high_impact:
logger.info("没有重大影响新闻,跳过通知")
return False
title = f"🔴 重大市场新闻 ({len(high_impact)} 条)"
content_parts = []
# 获取时间(显示时分)
created_time = high_impact[0].get('created_at', '')
if created_time:
# 格式: 2026-02-25T12:30:45 -> 02-25 12:30
try:
dt = created_time[:16].replace('T', ' ')
content_parts.append(f"**时间**: {dt}")
except:
content_parts.append(f"**时间**: {created_time[:10]}")
# 只显示重大影响新闻
for i, article in enumerate(high_impact[:5]):
impact_type = article.get('impact_type', 'neutral')
emoji = self._get_emoji_for_impact_type(impact_type)
# 每条新闻之间空一行
if i > 0:
content_parts.append("")
# 构建单条新闻的所有内容
article_lines = []
# 标题
title_text = article.get('title', '')
article_lines.append(f"{emoji} **{title_text}**")
# 来源
source = article.get('source', '')
if source:
article_lines.append(f"📰 来源: {source}")
# 新闻内容(摘要)
summary = article.get('summary', '')
content = article.get('content', '')
if summary:
article_lines.append(f"📝 {summary[:100]}")
elif content:
article_lines.append(f"📝 {content[:100]}")
# 影响和建议
impact_desc = {
'bullish': '📈 利好',
'bearish': '📉 利空',
'neutral': '➡️ 中性'
}.get(impact_type, '➡️ 中性')
advice = article.get('trading_advice', '')
if advice:
article_lines.append(f"{impact_desc} | 💡 {advice}")
# 相关代码和链接
extra_info = []
symbols = article.get('relevant_symbols', [])
if symbols and isinstance(symbols, list):
extra_info.append(f"🔗 {' '.join(symbols[:4])}")
url = article.get('url', '')
if url:
extra_info.append(f"🔎 [查看原文]({url})")
if extra_info:
article_lines.append(" ".join(extra_info))
# 将这条新闻的所有内容合并为一行
content_parts.append(" | ".join(article_lines))
content = "\n".join(content_parts)
await self.feishu.send_card(title, content, "red")
logger.info(f"重大新闻通知已发送: {len(high_impact)}")
return True
except Exception as e:
logger.error(f"发送批量新闻通知失败: {e}")
import traceback
traceback.print_exc()
return False
async def notify_startup(self, config: Dict[str, Any]) -> bool:
"""
发送启动通知
Args:
config: 配置信息
Returns:
是否发送成功
"""
try:
crypto_sources = config.get('crypto_sources', 0)
stock_sources = config.get('stock_sources', 0)
interval = config.get('fetch_interval', 30)
title = "📰 新闻智能体已启动"
content_parts = [
f"🤖 **功能**: 实时新闻监控与分析",
f"",
f"📊 **监控来源**:",
f" • 加密货币: {crypto_sources}",
f" • 股票: {stock_sources}",
f"",
f"⏱️ **抓取频率**: 每 {interval}",
f"",
f"🎯 **分析能力**:",
f" • LLM 智能分析",
f" • 市场影响评估",
f" • 交易建议生成",
f"",
f"📢 **通知策略**: 仅推送高影响新闻"
]
content = "\n".join(content_parts)
await self.feishu.send_card(title, content, "green")
logger.info("新闻智能体启动通知已发送")
return True
except Exception as e:
logger.error(f"发送启动通知失败: {e}")
return False
async def notify_error(self, error_message: str) -> bool:
"""
发送错误通知
Args:
error_message: 错误信息
Returns:
是否发送成功
"""
try:
title = "⚠️ 新闻智能体异常"
content = f"""
**错误信息**: {error_message}
**建议操作**:
1. 检查网络连接
2. 查看日志文件
3. 必要时重启服务
"""
await self.feishu.send_card(title, content, "red")
return True
except Exception as e:
logger.error(f"发送错误通知失败: {e}")
return False
# 全局实例
_news_notifier = None
def get_news_notifier() -> NewsNotifier:
"""获取新闻通知器单例"""
global _news_notifier
if _news_notifier is None:
_news_notifier = NewsNotifier()
return _news_notifier