""" 新闻通知模块 - 发送飞书卡片通知 """ from typing import Dict, Any, List from app.utils.logger import logger from app.services.feishu_service import get_feishu_news_service class NewsNotifier: """新闻通知器""" def __init__(self): self.feishu = get_feishu_news_service() def _get_emoji_for_impact(self, impact: str) -> str: """根据影响级别获取表情符号""" emoji_map = { 'high': '🔴', 'medium': '🟡', 'low': '🟢' } return emoji_map.get(impact, '📰') def _get_emoji_for_impact_type(self, impact_type: str) -> str: """根据影响类型获取表情符号""" emoji_map = { 'bullish': '📈', 'bearish': '📉', 'neutral': '➡️' } return emoji_map.get(impact_type, '📊') def _get_color_for_impact(self, impact: str) -> str: """根据影响级别获取颜色""" color_map = { 'high': 'red', 'medium': 'orange', 'low': 'blue' } return color_map.get(impact, 'grey') async def notify_single_news(self, article: Dict[str, Any]) -> bool: """ 发送单条新闻通知 Args: article: 文章数据(包含分析结果) Returns: 是否发送成功 """ try: impact = article.get('market_impact', 'low') impact_type = article.get('impact_type', 'neutral') title = article.get('title', '') summary = article.get('summary', '') source = article.get('source', '') category = article.get('category', '') url = article.get('url', '') trading_advice = article.get('trading_advice', '') relevant_symbols = article.get('relevant_symbols', []) key_points = article.get('key_points', []) # 标题 impact_emoji = self._get_emoji_for_impact(impact) type_emoji = self._get_emoji_for_impact_type(impact_type) category_text = '加密货币' if category == 'crypto' else '股票' card_title = f"{impact_emoji} {type_emoji} 市场快讯 - {category_text}" # 内容 content_parts = [ f"**来源**: {source}", f"**标题**: {title}", "", f"**摘要**: {summary}", ] # 关键点 if key_points: content_parts.append("") content_parts.append("**关键点**:") for point in key_points[:3]: content_parts.append(f"• {point}") # 交易建议 if trading_advice: content_parts.append("") content_parts.append(f"**交易建议**: {trading_advice}") # 相关代码 if relevant_symbols: symbols_text = " ".join(relevant_symbols) content_parts.append("") content_parts.append(f"**相关**: {symbols_text}") # 链接 if url: content_parts.append("") content_parts.append(f"[查看原文]({url})") # 影响 impact_map = {'high': '重大影响', 'medium': '中等影响', 'low': '轻微影响'} content_parts.append("") content_parts.append(f"**影响**: {impact_map.get(impact, '未知')}") # 颜色 color = self._get_color_for_impact(impact) # 发送 content = "\n".join(content_parts) await self.feishu.send_card(card_title, content, color) logger.info(f"新闻通知已发送: {title[:50]}...") return True except Exception as e: logger.error(f"发送新闻通知失败: {e}") return False async def notify_news_batch(self, articles: List[Dict[str, Any]]) -> bool: """ 发送批量新闻通知(详细模式) Args: articles: 文章列表 Returns: 是否发送成功 """ try: if not articles: return False # 只显示重大影响新闻 high_impact = [a for a in articles if a.get('market_impact') == 'high'] if not high_impact: logger.info("没有重大影响新闻,跳过通知") return False title = f"🔴 重大市场新闻 ({len(high_impact)} 条)" content_parts = [] # 获取时间(显示时分) created_time = high_impact[0].get('created_at', '') if created_time: # 格式: 2026-02-25T12:30:45 -> 02-25 12:30 try: dt = created_time[:16].replace('T', ' ') content_parts.append(f"**时间**: {dt}") except: content_parts.append(f"**时间**: {created_time[:10]}") # 只显示重大影响新闻 for i, article in enumerate(high_impact[:5]): impact_type = article.get('impact_type', 'neutral') emoji = self._get_emoji_for_impact_type(impact_type) # 每条新闻之间空一行 if i > 0: content_parts.append("") # 构建单条新闻的所有内容 article_lines = [] # 标题 title_text = article.get('title', '') article_lines.append(f"{emoji} **{title_text}**") # 来源 source = article.get('source', '') if source: article_lines.append(f"📰 来源: {source}") # 新闻内容(摘要) summary = article.get('summary', '') content = article.get('content', '') if summary: article_lines.append(f"📝 {summary[:100]}") elif content: article_lines.append(f"📝 {content[:100]}") # 影响和建议 impact_desc = { 'bullish': '📈 利好', 'bearish': '📉 利空', 'neutral': '➡️ 中性' }.get(impact_type, '➡️ 中性') advice = article.get('trading_advice', '') if advice: article_lines.append(f"{impact_desc} | 💡 {advice}") # 相关代码和链接 extra_info = [] symbols = article.get('relevant_symbols', []) if symbols and isinstance(symbols, list): extra_info.append(f"🔗 {' '.join(symbols[:4])}") url = article.get('url', '') if url: extra_info.append(f"🔎 [查看原文]({url})") if extra_info: article_lines.append(" ".join(extra_info)) # 将这条新闻的所有内容合并为一行 content_parts.append(" | ".join(article_lines)) content = "\n".join(content_parts) await self.feishu.send_card(title, content, "red") logger.info(f"重大新闻通知已发送: {len(high_impact)} 条") return True except Exception as e: logger.error(f"发送批量新闻通知失败: {e}") import traceback traceback.print_exc() return False async def notify_startup(self, config: Dict[str, Any]) -> bool: """ 发送启动通知 Args: config: 配置信息 Returns: 是否发送成功 """ try: crypto_sources = config.get('crypto_sources', 0) stock_sources = config.get('stock_sources', 0) interval = config.get('fetch_interval', 30) title = "📰 新闻智能体已启动" content_parts = [ f"🤖 **功能**: 实时新闻监控与分析", f"", f"📊 **监控来源**:", f" • 加密货币: {crypto_sources} 个", f" • 股票: {stock_sources} 个", f"", f"⏱️ **抓取频率**: 每 {interval} 秒", f"", f"🎯 **分析能力**:", f" • LLM 智能分析", f" • 市场影响评估", f" • 交易建议生成", f"", f"📢 **通知策略**: 仅推送高影响新闻" ] content = "\n".join(content_parts) await self.feishu.send_card(title, content, "green") logger.info("新闻智能体启动通知已发送") return True except Exception as e: logger.error(f"发送启动通知失败: {e}") return False async def notify_error(self, error_message: str) -> bool: """ 发送错误通知 Args: error_message: 错误信息 Returns: 是否发送成功 """ try: title = "⚠️ 新闻智能体异常" content = f""" **错误信息**: {error_message} **建议操作**: 1. 检查网络连接 2. 查看日志文件 3. 必要时重启服务 """ await self.feishu.send_card(title, content, "red") return True except Exception as e: logger.error(f"发送错误通知失败: {e}") return False # 全局实例 _news_notifier = None def get_news_notifier() -> NewsNotifier: """获取新闻通知器单例""" global _news_notifier if _news_notifier is None: _news_notifier = NewsNotifier() return _news_notifier