#!/usr/bin/env python3 """ 测试新闻通知格式 """ import asyncio import sys from pathlib import Path # 添加项目路径 script_dir = Path(__file__).parent project_root = script_dir.parent backend_dir = project_root / "backend" sys.path.insert(0, str(backend_dir)) import os os.chdir(backend_dir) from app.news_agent.notifier import get_news_notifier # 模拟高影响新闻数据 test_articles = [ { 'id': 1, 'title': 'Bitcoin ETFs See Record $500M Inflows as Institutions Pile In', 'source': 'CoinDesk', 'category': 'crypto', 'url': 'https://example.com/article1', 'market_impact': 'high', 'impact_type': 'bullish', 'sentiment': 'positive', 'summary': '比特币现货ETF昨日吸引5亿美元资金流入,创历史新高,显示机构投资者持续增持。', 'key_points': [ '贝莱德IBIT录得3亿美元流入', '富达FBTC流入1.5亿美元', '机构持仓占比超过60%' ], 'trading_advice': '建议持有BTC,关注回调后的买入机会', 'relevant_symbols': ['BTC', 'IBIT', 'FBTC'], 'priority': 85, 'created_at': '2026-02-25T12:00:00' }, { 'id': 2, 'title': 'SEC Delays Decision on Ethereum ETF Options Listings', 'source': 'Bloomberg', 'category': 'crypto', 'url': 'https://example.com/article2', 'market_impact': 'medium', 'impact_type': 'neutral', 'sentiment': 'neutral', 'summary': 'SEC再次推迟对以太坊ETF期权上市的决议,新的截止日期为4月底。', 'key_points': [ 'SEC引用需要额外审查时间', '这是第三次推迟', '市场反应温和' ], 'trading_advice': 'ETH持仓观望,等待ETF期权批准', 'relevant_symbols': ['ETH', 'ETHA'], 'priority': 55, 'created_at': '2026-02-25T11:30:00' }, { 'id': 3, 'title': 'NVIDIA Surpasses $4 Trillion Market Cap Amid AI Chip Demand', 'source': 'CNBC', 'category': 'stock', 'url': 'https://example.com/article3', 'market_impact': 'high', 'impact_type': 'bullish', 'sentiment': 'positive', 'summary': '英伟达市值突破4万亿美元大关,成为全球市值最高的公司,AI芯片需求持续爆发。', 'key_points': [ '股价上涨5%至每股1600美元', 'H100芯片供不应求', '数据中心收入同比增长300%' ], 'trading_advice': '建议继续持有NVDA,AI趋势未完', 'relevant_symbols': ['NVDA', 'AMD'], 'priority': 80, 'created_at': '2026-02-25T10:15:00' } ] async def main(): print("=" * 70) print("🧪 测试新闻通知格式") print("=" * 70) print() notifier = get_news_notifier() print("📊 测试数据:") for i, article in enumerate(test_articles, 1): print(f" {i}. [{article['market_impact']}] {article['title'][:50]}...") print(f" 摘要: {article['summary'][:50]}...") print(f" 建议: {article['trading_advice']}") print() print("📢 发送测试通知...") result = await notifier.notify_news_batch(test_articles) if result: print("✅ 通知发送成功!") else: print("❌ 通知发送失败") print() print("=" * 70) return 0 if __name__ == "__main__": exit_code = asyncio.run(main()) sys.exit(exit_code)