#!/usr/bin/env python3
"""
Manually run the news fetching and analysis pipeline.

Usage:
    # Fetch all news
    python scripts/manual_news_fetch.py

    # Fetch crypto news only
    python scripts/manual_news_fetch.py --category crypto

    # Fetch stock news only
    python scripts/manual_news_fetch.py --category stock

    # Skip LLM analysis (keyword filtering only)
    python scripts/manual_news_fetch.py --no-llm

    # Show verbose output
    python scripts/manual_news_fetch.py --verbose
"""
import asyncio
import argparse
import sys
import os
from datetime import datetime
from pathlib import Path

# Make the backend package importable.
script_dir = Path(__file__).parent
project_root = script_dir.parent
backend_dir = project_root / "backend"
sys.path.insert(0, str(backend_dir))

# Switch to the backend directory as the working directory (the app's
# relative paths — config, DB file, etc. — presumably assume it; verify).
os.chdir(backend_dir)

from app.utils.logger import logger
from app.news_agent.news_agent import get_news_agent
from app.news_agent.fetcher import NewsFetcher, NewsItem
from app.news_agent.filter import NewsDeduplicator, NewsFilter
from app.news_agent.analyzer import NewsAnalyzer, NewsAnalyzerSimple
from app.news_agent.news_db_service import get_news_db_service
from app.news_agent.notifier import get_news_notifier


async def main() -> int:
    """Run the pipeline: fetch → dedupe → filter → save → analyze → notify → report.

    Returns:
        0 on success, 1 on failure (also returned for early exits via
        implicit ``None``, which ``sys.exit`` treats as success).
    """
    parser = argparse.ArgumentParser(description="手动执行新闻抓取和分析")
    parser.add_argument(
        "--category",
        choices=["crypto", "stock"],
        help="指定新闻分类(默认:全部)"
    )
    parser.add_argument(
        "--no-llm",
        action="store_true",
        help="不使用 LLM 分析,仅使用关键词过滤"
    )
    parser.add_argument(
        "--no-notify",
        action="store_true",
        help="不发送飞书通知"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="显示详细输出"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=50,
        help="每个源最多抓取数量(默认:50)"
    )
    parser.add_argument(
        "--analyze-existing",
        action="store_true",
        help="分析数据库中未分析的文章(不抓取新新闻)"
    )
    args = parser.parse_args()

    # Banner
    print("=" * 70)
    print("📰 新闻智能体 - 手动抓取模式")
    print("=" * 70)
    print(f"📅 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if args.category:
        print(f"📂 分类: {args.category}")
    else:
        print("📂 分类: 全部")
    print(f"🤖 LLM 分析: {'禁用' if args.no_llm else '启用'}")
    print(f"📢 通知: {'禁用' if args.no_notify else '启用'}")
    print("=" * 70)
    print()

    # Initialize pipeline components.
    # NOTE: renamed from `filter` to avoid shadowing the builtin.
    fetcher = NewsFetcher()
    deduplicator = NewsDeduplicator()
    news_filter = NewsFilter()
    db_service = get_news_db_service()
    notifier = get_news_notifier()

    # Choose the analyzer: keyword rules vs. LLM.
    if args.no_llm:
        analyzer = NewsAnalyzerSimple()
        print("使用规则分析器(关键词匹配)")
    else:
        analyzer = NewsAnalyzer()
        print("使用 LLM 分析器")

    try:
        if args.analyze_existing:
            # Mode A: re-analyze articles already stored but not yet analyzed.
            print("\n" + "─" * 70)
            print("📂 分析数据库中未分析的文章")
            print("─" * 70)

            unanalyzed = db_service.get_unanalyzed_articles(limit=args.limit, hours=48)
            print(f"✅ 找到 {len(unanalyzed)} 条未分析的文章")

            if not unanalyzed:
                print("没有需要分析的文章")
                return 0

            # Wrap each DB row in a transient NewsItem so the analyzers can
            # consume it like a freshly fetched item.
            saved_articles = []
            for article in unanalyzed:
                item = NewsItem(
                    title=article.title,
                    content=article.content or "",
                    url=article.url,
                    source=article.source,
                    category=article.category,
                    published_at=article.published_at,
                    crawled_at=article.crawled_at,
                    content_hash=article.content_hash,
                    author=article.author,
                    tags=article.tags
                )
                # Extra attribute consumed later when persisting/prioritizing.
                item.quality_score = article.quality_score or 0.5
                saved_articles.append((article, item))
        else:
            # Mode B: full pipeline — fetch fresh news.
            # Step 1: fetch
            print("\n" + "─" * 70)
            print("📡 第一步: 抓取新闻")
            print("─" * 70)

            items = await fetcher.fetch_all_news(category=args.category)

            if not items:
                print("❌ 没有获取到任何新闻")
                return 0

            print(f"✅ 获取到 {len(items)} 条新闻")

            if args.verbose:
                for i, item in enumerate(items[:10], 1):
                    print(f" {i}. [{item.category}] {item.title[:60]}...")

            # Step 2: deduplicate
            print("\n" + "─" * 70)
            print("🔍 第二步: 去重")
            print("─" * 70)

            items = deduplicator.deduplicate_list(items)
            print(f"✅ 去重后剩余 {len(items)} 条")

            # Step 3: keyword filtering
            print("\n" + "─" * 70)
            print("⚙️ 第三步: 关键词过滤")
            print("─" * 70)

            filtered_items = news_filter.filter_news(items, min_quality=0.3)
            print(f"✅ 过滤后剩余 {len(filtered_items)} 条")

            if args.verbose and filtered_items:
                print("\n通过过滤的新闻:")
                for i, item in enumerate(filtered_items[:10], 1):
                    impact_score = getattr(item, 'impact_score', 0)
                    quality_score = getattr(item, 'quality_score', 0)
                    print(f" {i}. [{item.category}] 影响:{impact_score:.1f} 质量:{quality_score:.2f}")
                    print(f" {item.title[:60]}...")

            # Step 4: persist to the database
            print("\n" + "─" * 70)
            print("💾 第四步: 保存到数据库")
            print("─" * 70)

            saved_articles = []
            for item in filtered_items:
                # Skip items already stored (hash-based dedup against the DB).
                if db_service.check_duplicate_by_hash(item.content_hash):
                    if args.verbose:
                        print(f" ⊗ 已存在: {item.title[:50]}...")
                    continue

                article_data = {
                    'title': item.title,
                    'content': item.content,
                    'url': item.url,
                    'source': item.source,
                    'author': item.author,
                    'category': item.category,
                    'tags': item.tags,
                    'published_at': item.published_at,
                    'crawled_at': item.crawled_at,
                    'content_hash': item.content_hash,
                    'quality_score': getattr(item, 'quality_score', 0.5),
                }
                article = db_service.save_article(article_data)
                if article:
                    saved_articles.append((article, item))
                    if args.verbose:
                        print(f" ✓ 已保存: {item.title[:50]}...")

        # Shared from here on: `saved_articles` is a list of (db_article, item).
        if not args.analyze_existing:
            print(f"✅ 保存了 {len(saved_articles)} 条新文章")

        if not saved_articles:
            print("\n没有新文章需要处理")
            return 0

        # Step 5: AI analysis
        print("\n" + "─" * 70)
        print("🤖 第五步: AI 分析")
        print("─" * 70)

        analyzed_count = 0
        high_priority_articles = []

        # Smaller batches for the LLM path to bound prompt size.
        batch_size = 10 if not args.no_llm else 20
        for i in range(0, len(saved_articles), batch_size):
            batch = saved_articles[i:i + batch_size]

            if args.no_llm:
                # Rule-based analysis, one item at a time.
                for article, item in batch:
                    result = analyzer.analyze_single(item)
                    priority = result.get('confidence', 50)
                    db_service.mark_as_analyzed(article.id, result, priority)
                    analyzed_count += 1
                    print(f" ✓ [{result['market_impact']}] {article.title[:50]}...")
                    print(f" 情绪: {result['sentiment']} | 相关: {', '.join(result.get('relevant_symbols', [])[:3])}")

                    if priority >= 40:
                        # Build a dict that merges the stored article with
                        # the fresh analysis result for notification.
                        article_dict = article.to_dict()
                        article_dict.update({
                            'llm_analyzed': True,
                            'market_impact': result.get('market_impact'),
                            'impact_type': result.get('impact_type'),
                            'sentiment': result.get('sentiment'),
                            'summary': result.get('summary'),
                            'key_points': result.get('key_points'),
                            'trading_advice': result.get('trading_advice'),
                            'relevant_symbols': result.get('relevant_symbols'),
                            'priority': priority,
                        })
                        high_priority_articles.append(article_dict)
            else:
                # LLM batch analysis.
                items_to_analyze = [item for _, item in batch]
                results = analyzer.analyze_batch(items_to_analyze)

                for (article, _), result in zip(batch, results):
                    if result:
                        priority = analyzer.calculate_priority(
                            result,
                            getattr(article, 'quality_score', 0.5)
                        )
                        db_service.mark_as_analyzed(article.id, result, priority)
                        analyzed_count += 1
                        print(f" ✓ [{result['market_impact']}] {article.title[:50]}...")
                        print(f" {result.get('summary', 'N/A')[:60]}...")

                        if priority >= 40:
                            # Merge the latest analysis result into the
                            # article dict for notification.
                            article_dict = article.to_dict()
                            article_dict.update({
                                'llm_analyzed': True,
                                'market_impact': result.get('market_impact'),
                                'impact_type': result.get('impact_type'),
                                'sentiment': result.get('sentiment'),
                                'summary': result.get('summary'),
                                'key_points': result.get('key_points'),
                                'trading_advice': result.get('trading_advice'),
                                'relevant_symbols': result.get('relevant_symbols'),
                                'priority': priority,
                            })
                            high_priority_articles.append(article_dict)

        print(f"\n✅ 分析了 {analyzed_count} 条文章")
        print(f"🔥 发现 {len(high_priority_articles)} 条高优先级新闻")

        # Step 6: send notifications
        if not args.no_notify and high_priority_articles:
            print("\n" + "─" * 70)
            print("📢 第六步: 发送通知")
            print("─" * 70)

            # Highest priority first.
            high_priority_articles.sort(
                key=lambda x: x.get('priority', 0),
                reverse=True
            )

            # 1-2 articles: send individually; otherwise batch (capped at 10).
            if len(high_priority_articles) <= 2:
                for article in high_priority_articles:
                    await notifier.notify_single_news(article)
                    db_service.mark_as_notified(article['id'])
                    print(f" ✓ 已发送: {article['title'][:50]}...")
            else:
                await notifier.notify_news_batch(high_priority_articles[:10])
                for article in high_priority_articles[:10]:
                    db_service.mark_as_notified(article['id'])
                print(f" ✓ 已发送批量通知 ({len(high_priority_articles[:10])} 条)")

        # Step 7: summary report
        print("\n" + "=" * 70)
        print("📊 处理完成 - 统计报告")
        print("=" * 70)

        db_stats = db_service.get_stats(hours=24)
        print(f"📰 最近24小时:")
        print(f" • 总文章数: {db_stats['total_articles']}")
        print(f" • 已分析: {db_stats['analyzed']}")
        print(f" • 高影响: {db_stats['high_impact']}")
        print(f" • 已通知: {db_stats['notified']}")
        print("\n" + "=" * 70)

    except Exception as e:
        print(f"\n❌ 执行失败: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        # Always release fetcher resources (HTTP sessions, etc.).
        await fetcher.close()

    return 0


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)