#!/usr/bin/env python3
"""Manually run the news fetching and analysis pipeline.

Usage:
    # Fetch all news
    python scripts/manual_news_fetch.py

    # Fetch crypto news only
    python scripts/manual_news_fetch.py --category crypto

    # Fetch stock news only
    python scripts/manual_news_fetch.py --category stock

    # Skip LLM analysis (keyword filtering only)
    python scripts/manual_news_fetch.py --no-llm

    # Show verbose output
    python scripts/manual_news_fetch.py --verbose
"""
import asyncio
import argparse
import sys
import os
from pathlib import Path

# Resolve the project layout relative to this script so it can be launched
# from any working directory: scripts/ -> project root -> backend/.
script_dir = Path(__file__).parent
project_root = script_dir.parent
backend_dir = project_root / "backend"

# Make the backend package importable before the app.* imports below.
sys.path.insert(0, str(backend_dir))

# Switch CWD to backend/ so relative resources (config, DB paths) resolve
# the same way they do for the deployed service.
os.chdir(backend_dir)

# These imports must come after the sys.path/os.chdir setup above.
from app.utils.logger import logger  # noqa: E402
from app.news_agent.news_agent import get_news_agent  # noqa: E402
from app.news_agent.fetcher import NewsFetcher, NewsItem  # noqa: E402
from app.news_agent.filter import NewsDeduplicator, NewsFilter  # noqa: E402
from app.news_agent.analyzer import NewsAnalyzer, NewsAnalyzerSimple  # noqa: E402
from app.news_agent.news_db_service import get_news_db_service  # noqa: E402
from app.news_agent.notifier import get_news_notifier  # noqa: E402
||
def _merge_analysis(article_dict: dict, result: dict, priority: int) -> dict:
    """Fold analysis fields into *article_dict* so the notifier sees the
    freshest results rather than the possibly-stale DB snapshot.

    Mutates and returns *article_dict*.
    """
    article_dict.update({
        'llm_analyzed': True,
        'market_impact': result.get('market_impact'),
        'impact_type': result.get('impact_type'),
        'sentiment': result.get('sentiment'),
        'summary': result.get('summary'),
        'key_points': result.get('key_points'),
        'trading_advice': result.get('trading_advice'),
        'relevant_symbols': result.get('relevant_symbols'),
        'priority': priority,
    })
    return article_dict


async def main() -> int:
    """CLI entry point: fetch (or reload), dedupe, filter, analyze and
    optionally notify about news articles.

    Returns a process exit code: 0 on success (including "nothing to do"),
    1 when the pipeline raises.
    """
    # Local import: the module-level import block must stay untouched
    # because it is ordered around a sys.path/os.chdir setup.
    from datetime import datetime

    parser = argparse.ArgumentParser(description="手动执行新闻抓取和分析")
    parser.add_argument(
        "--category",
        choices=["crypto", "stock"],
        help="指定新闻分类(默认:全部)"
    )
    parser.add_argument(
        "--no-llm",
        action="store_true",
        help="不使用 LLM 分析,仅使用关键词过滤"
    )
    parser.add_argument(
        "--no-notify",
        action="store_true",
        help="不发送飞书通知"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="显示详细输出"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=50,
        help="每个源最多抓取数量(默认:50)"
    )
    parser.add_argument(
        "--analyze-existing",
        action="store_true",
        help="分析数据库中未分析的文章(不抓取新新闻)"
    )

    args = parser.parse_args()

    # Banner with the effective run configuration.
    print("=" * 70)
    print("📰 新闻智能体 - 手动抓取模式")
    print("=" * 70)
    print(f"📅 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if args.category:
        print(f"📂 分类: {args.category}")
    else:
        print("📂 分类: 全部")
    print(f"🤖 LLM 分析: {'禁用' if args.no_llm else '启用'}")
    print(f"📢 通知: {'禁用' if args.no_notify else '启用'}")
    print("=" * 70)
    print()

    # Pipeline components. `news_filter` is deliberately not named `filter`
    # to avoid shadowing the builtin.
    fetcher = NewsFetcher()
    deduplicator = NewsDeduplicator()
    news_filter = NewsFilter()
    db_service = get_news_db_service()
    notifier = get_news_notifier()

    # Choose the analyzer: cheap keyword rules vs. LLM.
    if args.no_llm:
        analyzer = NewsAnalyzerSimple()
        print("使用规则分析器(关键词匹配)")
    else:
        analyzer = NewsAnalyzer()
        print("使用 LLM 分析器")

    try:
        if args.analyze_existing:
            # Re-analysis mode: pull already-stored but unanalyzed articles
            # from the DB instead of fetching anything new.
            print("\n" + "─" * 70)
            print("📂 分析数据库中未分析的文章")
            print("─" * 70)

            unanalyzed = db_service.get_unanalyzed_articles(limit=args.limit, hours=48)
            print(f"✅ 找到 {len(unanalyzed)} 条未分析的文章")

            if not unanalyzed:
                print("没有需要分析的文章")
                return 0

            # Wrap each DB record in a transient NewsItem so the analyzers
            # can consume it through their normal interface.
            saved_articles = []
            for article in unanalyzed:
                item = NewsItem(
                    title=article.title,
                    content=article.content or "",
                    url=article.url,
                    source=article.source,
                    category=article.category,
                    published_at=article.published_at,
                    crawled_at=article.crawled_at,
                    content_hash=article.content_hash,
                    author=article.author,
                    tags=article.tags
                )
                # Extra attribute consumed by priority scoring.
                item.quality_score = article.quality_score or 0.5
                saved_articles.append((article, item))

        else:
            # Step 1: fetch raw news from all configured sources.
            print("\n" + "─" * 70)
            print("📡 第一步: 抓取新闻")
            print("─" * 70)

            items = await fetcher.fetch_all_news(category=args.category)

            if not items:
                print("❌ 没有获取到任何新闻")
                return 0

            print(f"✅ 获取到 {len(items)} 条新闻")

            if args.verbose:
                for i, item in enumerate(items[:10], 1):
                    print(f"  {i}. [{item.category}] {item.title[:60]}...")

            # Step 2: drop duplicates within this fetch.
            print("\n" + "─" * 70)
            print("🔍 第二步: 去重")
            print("─" * 70)

            items = deduplicator.deduplicate_list(items)
            print(f"✅ 去重后剩余 {len(items)} 条")

            # Step 3: keyword/quality filtering.
            print("\n" + "─" * 70)
            print("⚙️ 第三步: 关键词过滤")
            print("─" * 70)

            filtered_items = news_filter.filter_news(items, min_quality=0.3)
            print(f"✅ 过滤后剩余 {len(filtered_items)} 条")

            if args.verbose and filtered_items:
                print("\n通过过滤的新闻:")
                for i, item in enumerate(filtered_items[:10], 1):
                    impact_score = getattr(item, 'impact_score', 0)
                    quality_score = getattr(item, 'quality_score', 0)
                    print(f"  {i}. [{item.category}] 影响:{impact_score:.1f} 质量:{quality_score:.2f}")
                    print(f"     {item.title[:60]}...")

            # Step 4: persist new articles, skipping hash duplicates
            # already present in the DB.
            print("\n" + "─" * 70)
            print("💾 第四步: 保存到数据库")
            print("─" * 70)

            saved_articles = []
            for item in filtered_items:
                if db_service.check_duplicate_by_hash(item.content_hash):
                    if args.verbose:
                        print(f"  ⊗ 已存在: {item.title[:50]}...")
                    continue

                article_data = {
                    'title': item.title,
                    'content': item.content,
                    'url': item.url,
                    'source': item.source,
                    'author': item.author,
                    'category': item.category,
                    'tags': item.tags,
                    'published_at': item.published_at,
                    'crawled_at': item.crawled_at,
                    'content_hash': item.content_hash,
                    'quality_score': getattr(item, 'quality_score', 0.5),
                }

                article = db_service.save_article(article_data)
                if article:
                    saved_articles.append((article, item))
                    if args.verbose:
                        print(f"  ✓ 已保存: {item.title[:50]}...")

            print(f"✅ 保存了 {len(saved_articles)} 条新文章")

        if not saved_articles:
            print("\n没有新文章需要处理")
            return 0

        # Step 5: AI / rule analysis of every collected article.
        print("\n" + "─" * 70)
        print("🤖 第五步: AI 分析")
        print("─" * 70)

        analyzed_count = 0
        high_priority_articles = []

        # Smaller batches for the (slower) LLM path.
        batch_size = 10 if not args.no_llm else 20
        for i in range(0, len(saved_articles), batch_size):
            batch = saved_articles[i:i + batch_size]

            if args.no_llm:
                # Rule-based path: analyze one item at a time.
                for article, item in batch:
                    result = analyzer.analyze_single(item)
                    priority = result.get('confidence', 50)

                    db_service.mark_as_analyzed(article.id, result, priority)
                    analyzed_count += 1

                    print(f"  ✓ [{result['market_impact']}] {article.title[:50]}...")
                    print(f"    情绪: {result['sentiment']} | 相关: {', '.join(result.get('relevant_symbols', [])[:3])}")

                    if priority >= 40:
                        high_priority_articles.append(
                            _merge_analysis(article.to_dict(), result, priority)
                        )

            else:
                # LLM path: analyze the whole batch in one call.
                items_to_analyze = [item for _, item in batch]
                results = analyzer.analyze_batch(items_to_analyze)

                for (article, _), result in zip(batch, results):
                    if result:
                        priority = analyzer.calculate_priority(
                            result,
                            getattr(article, 'quality_score', 0.5)
                        )
                        db_service.mark_as_analyzed(article.id, result, priority)

                        analyzed_count += 1
                        print(f"  ✓ [{result['market_impact']}] {article.title[:50]}...")
                        print(f"    {result.get('summary', 'N/A')[:60]}...")

                        if priority >= 40:
                            high_priority_articles.append(
                                _merge_analysis(article.to_dict(), result, priority)
                            )

        print(f"\n✅ 分析了 {analyzed_count} 条文章")
        print(f"🔥 发现 {len(high_priority_articles)} 条高优先级新闻")

        # Step 6: push notifications for high-priority findings.
        if not args.no_notify and high_priority_articles:
            print("\n" + "─" * 70)
            print("📢 第六步: 发送通知")
            print("─" * 70)

            # Highest priority first.
            high_priority_articles.sort(
                key=lambda x: x.get('priority', 0),
                reverse=True
            )

            # 1-2 articles go out individually; more get one batch message.
            if len(high_priority_articles) <= 2:
                for article in high_priority_articles:
                    await notifier.notify_single_news(article)
                    db_service.mark_as_notified(article['id'])
                    print(f"  ✓ 已发送: {article['title'][:50]}...")
            else:
                await notifier.notify_news_batch(high_priority_articles[:10])
                for article in high_priority_articles[:10]:
                    db_service.mark_as_notified(article['id'])
                print(f"  ✓ 已发送批量通知 ({len(high_priority_articles[:10])} 条)")

        # Step 7: summary statistics for the last 24 hours.
        print("\n" + "=" * 70)
        print("📊 处理完成 - 统计报告")
        print("=" * 70)

        db_stats = db_service.get_stats(hours=24)
        print(f"📰 最近24小时:")
        print(f"  • 总文章数: {db_stats['total_articles']}")
        print(f"  • 已分析: {db_stats['analyzed']}")
        print(f"  • 高影响: {db_stats['high_impact']}")
        print(f"  • 已通知: {db_stats['notified']}")

        print("\n" + "=" * 70)

    except Exception as e:
        print(f"\n❌ 执行失败: {e}")
        import traceback
        traceback.print_exc()
        return 1

    finally:
        # Always release the fetcher's network resources.
        await fetcher.close()

    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Drive the async entry point and hand its status straight to the
    # shell (a None return from early exits maps to success, like 0).
    sys.exit(asyncio.run(main()))
|