stock-ai-agent/scripts/manual_news_fetch.py
2026-02-25 19:59:20 +08:00

358 lines
13 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
手动执行新闻抓取和分析脚本
使用方法:
# 抓取所有新闻
python scripts/manual_news_fetch.py
# 只抓取加密货币新闻
python scripts/manual_news_fetch.py --category crypto
# 只抓取股票新闻
python scripts/manual_news_fetch.py --category stock
# 不使用 LLM 分析(仅关键词过滤)
python scripts/manual_news_fetch.py --no-llm
# 显示详细输出
python scripts/manual_news_fetch.py --verbose
"""
import asyncio
import argparse
import sys
import os
from pathlib import Path
# Make the backend package importable: add <repo>/backend to sys.path
# so the `app.*` imports below resolve when this script is run from anywhere.
script_dir = Path(__file__).parent
project_root = script_dir.parent
backend_dir = project_root / "backend"
sys.path.insert(0, str(backend_dir))
# Switch the working directory to backend/ so relative paths used by the
# app (config files, DB paths, logs) resolve the same way as in the server.
os.chdir(backend_dir)
from app.utils.logger import logger
from app.news_agent.news_agent import get_news_agent
from app.news_agent.fetcher import NewsFetcher, NewsItem
from app.news_agent.filter import NewsDeduplicator, NewsFilter
from app.news_agent.analyzer import NewsAnalyzer, NewsAnalyzerSimple
from app.news_agent.news_db_service import get_news_db_service
from app.news_agent.notifier import get_news_notifier
async def main():
    """Run the manual news fetch/analysis pipeline.

    Parses CLI flags, then either re-analyzes unanalyzed DB articles
    (--analyze-existing) or fetches fresh news, dedupes, keyword-filters,
    persists new articles, analyzes them (LLM or keyword rules), and
    optionally pushes high-priority items as Feishu notifications.

    Returns:
        0 on success (including "nothing to do"), 1 on any failure.
    """
    parser = argparse.ArgumentParser(description="手动执行新闻抓取和分析")
    parser.add_argument(
        "--category",
        choices=["crypto", "stock"],
        help="指定新闻分类(默认:全部)"
    )
    parser.add_argument(
        "--no-llm",
        action="store_true",
        help="不使用 LLM 分析,仅使用关键词过滤"
    )
    parser.add_argument(
        "--no-notify",
        action="store_true",
        help="不发送飞书通知"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="显示详细输出"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=50,
        help="每个源最多抓取数量默认50"
    )
    parser.add_argument(
        "--analyze-existing",
        action="store_true",
        help="分析数据库中未分析的文章(不抓取新新闻)"
    )
    args = parser.parse_args()

    # FIX: the original called __import__('datetime') inline; import normally.
    from datetime import datetime

    # FIX: the original printed `"" * 70` (an empty string) as the section
    # separator — presumably a Unicode rule character was stripped from the
    # source.  Use a plain ASCII dash rule so the separators actually render.
    rule = "-" * 70

    def _to_notify_dict(article, result, priority):
        # Merge the freshest analysis result into the article's dict so the
        # notifier sees up-to-date fields (shared by both analysis branches).
        article_dict = article.to_dict()
        article_dict.update({
            'llm_analyzed': True,
            'market_impact': result.get('market_impact'),
            'impact_type': result.get('impact_type'),
            'sentiment': result.get('sentiment'),
            'summary': result.get('summary'),
            'key_points': result.get('key_points'),
            'trading_advice': result.get('trading_advice'),
            'relevant_symbols': result.get('relevant_symbols'),
            'priority': priority,
        })
        return article_dict

    print("=" * 70)
    print("📰 新闻智能体 - 手动抓取模式")
    print("=" * 70)
    print(f"📅 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if args.category:
        print(f"📂 分类: {args.category}")
    else:
        print(f"📂 分类: 全部")
    print(f"🤖 LLM 分析: {'禁用' if args.no_llm else '启用'}")
    print(f"📢 通知: {'禁用' if args.no_notify else '启用'}")
    print("=" * 70)
    print()

    # Initialize pipeline components.
    fetcher = NewsFetcher()
    deduplicator = NewsDeduplicator()
    # FIX: renamed from `filter`, which shadowed the builtin.
    news_filter = NewsFilter()
    db_service = get_news_db_service()
    notifier = get_news_notifier()

    # Pick the analyzer: cheap keyword rules vs. LLM.
    if args.no_llm:
        analyzer = NewsAnalyzerSimple()
        print("使用规则分析器(关键词匹配)")
    else:
        analyzer = NewsAnalyzer()
        print("使用 LLM 分析器")

    try:
        if args.analyze_existing:
            # Re-analyze articles already stored in the DB; no fetching.
            print("\n" + rule)
            print("📂 分析数据库中未分析的文章")
            print(rule)
            unanalyzed = db_service.get_unanalyzed_articles(limit=args.limit, hours=48)
            print(f"✅ 找到 {len(unanalyzed)} 条未分析的文章")
            if not unanalyzed:
                print("没有需要分析的文章")
                return 0
            # Wrap each DB row in a transient NewsItem so the analyzers see
            # the same shape as freshly-fetched news.
            saved_articles = []
            for article in unanalyzed:
                item = NewsItem(
                    title=article.title,
                    content=article.content or "",
                    url=article.url,
                    source=article.source,
                    category=article.category,
                    published_at=article.published_at,
                    crawled_at=article.crawled_at,
                    content_hash=article.content_hash,
                    author=article.author,
                    tags=article.tags
                )
                # Extra attribute consumed by priority calculation below.
                item.quality_score = article.quality_score or 0.5
                saved_articles.append((article, item))
        else:
            # Step 1: fetch fresh news.
            print("\n" + rule)
            print("📡 第一步: 抓取新闻")
            print(rule)
            items = await fetcher.fetch_all_news(category=args.category)
            if not items:
                print("❌ 没有获取到任何新闻")
                return 0
            print(f"✅ 获取到 {len(items)} 条新闻")
            if args.verbose:
                for i, item in enumerate(items[:10], 1):
                    print(f" {i}. [{item.category}] {item.title[:60]}...")

            # Step 2: deduplicate.
            print("\n" + rule)
            print("🔍 第二步: 去重")
            print(rule)
            items = deduplicator.deduplicate_list(items)
            print(f"✅ 去重后剩余 {len(items)}")

            # Step 3: keyword filtering with a minimum quality threshold.
            print("\n" + rule)
            print("⚙️ 第三步: 关键词过滤")
            print(rule)
            filtered_items = news_filter.filter_news(items, min_quality=0.3)
            print(f"✅ 过滤后剩余 {len(filtered_items)}")
            if args.verbose and filtered_items:
                print("\n通过过滤的新闻:")
                for i, item in enumerate(filtered_items[:10], 1):
                    impact_score = getattr(item, 'impact_score', 0)
                    quality_score = getattr(item, 'quality_score', 0)
                    print(f" {i}. [{item.category}] 影响:{impact_score:.1f} 质量:{quality_score:.2f}")
                    print(f" {item.title[:60]}...")

            # Step 4: persist new articles (skip content-hash duplicates).
            print("\n" + rule)
            print("💾 第四步: 保存到数据库")
            print(rule)
            saved_articles = []
            for item in filtered_items:
                if db_service.check_duplicate_by_hash(item.content_hash):
                    if args.verbose:
                        print(f" ⊗ 已存在: {item.title[:50]}...")
                    continue
                article_data = {
                    'title': item.title,
                    'content': item.content,
                    'url': item.url,
                    'source': item.source,
                    'author': item.author,
                    'category': item.category,
                    'tags': item.tags,
                    'published_at': item.published_at,
                    'crawled_at': item.crawled_at,
                    'content_hash': item.content_hash,
                    'quality_score': getattr(item, 'quality_score', 0.5),
                }
                article = db_service.save_article(article_data)
                if article:
                    saved_articles.append((article, item))
                    if args.verbose:
                        print(f" ✓ 已保存: {item.title[:50]}...")

        if not args.analyze_existing:
            print(f"✅ 保存了 {len(saved_articles)} 条新文章")
        if not saved_articles:
            print("\n没有新文章需要处理")
            return 0

        # Step 5: AI analysis (batched).
        print("\n" + rule)
        print("🤖 第五步: AI 分析")
        print(rule)
        analyzed_count = 0
        high_priority_articles = []
        # Rule analysis is cheap, so use a larger batch there.
        batch_size = 10 if not args.no_llm else 20
        for i in range(0, len(saved_articles), batch_size):
            batch = saved_articles[i:i + batch_size]
            if args.no_llm:
                # Per-item rule-based analysis.
                for article, item in batch:
                    result = analyzer.analyze_single(item)
                    priority = result.get('confidence', 50)
                    db_service.mark_as_analyzed(article.id, result, priority)
                    analyzed_count += 1
                    print(f" ✓ [{result['market_impact']}] {article.title[:50]}...")
                    print(f" 情绪: {result['sentiment']} | 相关: {', '.join(result.get('relevant_symbols', [])[:3])}")
                    if priority >= 40:
                        high_priority_articles.append(
                            _to_notify_dict(article, result, priority)
                        )
            else:
                # LLM batch analysis; results align positionally with batch.
                items_to_analyze = [item for _, item in batch]
                results = analyzer.analyze_batch(items_to_analyze)
                for (article, _), result in zip(batch, results):
                    if result:
                        priority = analyzer.calculate_priority(
                            result,
                            getattr(article, 'quality_score', 0.5)
                        )
                        db_service.mark_as_analyzed(article.id, result, priority)
                        analyzed_count += 1
                        print(f" ✓ [{result['market_impact']}] {article.title[:50]}...")
                        print(f" {result.get('summary', 'N/A')[:60]}...")
                        if priority >= 40:
                            high_priority_articles.append(
                                _to_notify_dict(article, result, priority)
                            )
        print(f"\n✅ 分析了 {analyzed_count} 条文章")
        print(f"🔥 发现 {len(high_priority_articles)} 条高优先级新闻")

        # Step 6: notifications (highest priority first; batch if > 2).
        if not args.no_notify and high_priority_articles:
            print("\n" + rule)
            print("📢 第六步: 发送通知")
            print(rule)
            high_priority_articles.sort(
                key=lambda x: x.get('priority', 0),
                reverse=True
            )
            if len(high_priority_articles) <= 2:
                for article in high_priority_articles:
                    await notifier.notify_single_news(article)
                    db_service.mark_as_notified(article['id'])
                    print(f" ✓ 已发送: {article['title'][:50]}...")
            else:
                await notifier.notify_news_batch(high_priority_articles[:10])
                for article in high_priority_articles[:10]:
                    db_service.mark_as_notified(article['id'])
                print(f" ✓ 已发送批量通知 ({len(high_priority_articles[:10])} 条)")

        # Step 7: summary report over the last 24 hours.
        print("\n" + "=" * 70)
        print("📊 处理完成 - 统计报告")
        print("=" * 70)
        db_stats = db_service.get_stats(hours=24)
        print(f"📰 最近24小时:")
        print(f" • 总文章数: {db_stats['total_articles']}")
        print(f" • 已分析: {db_stats['analyzed']}")
        print(f" • 高影响: {db_stats['high_impact']}")
        print(f" • 已通知: {db_stats['notified']}")
        print("\n" + "=" * 70)
    except Exception as e:
        print(f"\n❌ 执行失败: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        # Always release the fetcher's network resources.
        await fetcher.close()
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(asyncio.run(main()))