stock-ai-agent/scripts/test_stock.py
2026-02-20 10:02:00 +08:00

402 lines
14 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
美股分析脚本(修复版)
用法:
python3 scripts/test_stock.py AAPL
python3 scripts/test_stock.py AAPL TSLA NVDA
"""
import sys
import os
# 确保路径正确
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
backend_dir = os.path.join(project_root, 'backend')
sys.path.insert(0, backend_dir)
import asyncio
from app.services.yfinance_service import get_yfinance_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer
from app.config import get_settings
from app.utils.logger import logger
async def analyze(symbol: str, send_notification: bool = True):
"""分析单只股票
Args:
symbol: 股票代码
send_notification: 是否发送通知默认True
Returns:
分析结果字典
"""
result = {
'symbol': symbol,
'price': 0,
'signals': [],
'notified': False
}
# 获取配置
settings = get_settings()
threshold = settings.stock_llm_threshold * 100 # 转换为百分比
print(f"\n{'='*60}")
print(f"📊 分析 {symbol}")
print(f"{'='*60}")
try:
# 获取服务
yf_service = get_yfinance_service()
llm = LLMSignalAnalyzer(agent_type="stock") # 指定使用 stock 模型配置
feishu = get_feishu_service()
telegram = get_telegram_service()
# 获取行情
print(f"获取行情...")
ticker = yf_service.get_ticker(symbol)
if not ticker:
print(f"❌ 无法获取 {symbol} 行情")
return result
price = ticker['lastPrice']
change = ticker['priceChangePercent']
result['price'] = price
print(f"价格: ${price:,.2f} ({change:+.2f}%)")
print(f"成交量: {ticker['volume']:,}")
# 获取K线
print(f"获取K线数据...")
data = yf_service.get_multi_timeframe_data(symbol)
if not data:
print(f"❌ 无法获取K线数据")
return result
print(f"时间周期: {', '.join(data.keys())}")
# LLM分析
print(f"\n🤖 LLM分析中...\n")
analysis = await llm.analyze(symbol, data, symbols=[symbol], position_info=None)
# 输出结果
summary = analysis.get('analysis_summary', '')
signals = analysis.get('signals', [])
result['signals'] = signals
print(f"市场状态: {summary}")
if signals:
print(f"\n🎯 发现 {len(signals)} 个信号:")
for sig in signals:
action = sig.get('action', 'wait')
grade = sig.get('grade', 'D')
conf = sig.get('confidence', 0)
action_text = {'buy': '🟢 做多', 'sell': '🔴 做空'}.get(action, action)
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': ''}.get(grade, '')
print(f"\n{action_text} [{grade}{grade_icon}] {conf}%")
entry = sig.get('entry_price')
sl = sig.get('stop_loss')
tp = sig.get('take_profit')
if entry and sl and tp:
print(f" 入场: ${entry:,.2f}")
print(f" 止损: ${sl:,.2f}")
print(f" 止盈: ${tp:,.2f}")
reason = sig.get('reason', '')
if reason:
# 限制理由长度
short_reason = reason[:80] + "..." if len(reason) > 80 else reason
print(f" 理由: {short_reason}")
# 发送通知(仅发送置信度 >= 阈值的信号)
if send_notification:
best_signal = None
for sig in signals:
if sig.get('confidence', 0) >= threshold and sig.get('grade', 'D') != 'D':
best_signal = sig
break
if best_signal:
# 使用正确的方法格式化通知
card = llm.format_feishu_card(best_signal, symbol)
title = card['title']
content = card['content']
# 发送通知 - 使用 send_card 方法
# 根据信号方向选择颜色
color = "green" if best_signal.get('action') == 'buy' else "red"
await feishu.send_card(title, content, color)
await telegram.send_message(llm.format_signal_message(best_signal, symbol))
print(f"\n📬 通知已发送:{title}")
result['notified'] = True
else:
print(f"\n⏸️ 置信度不足,不发送通知(阈值: {threshold}%")
else:
print(f"\n⏸️ 无交易信号")
return result
except Exception as e:
print(f"❌ 错误: {e}")
import traceback
traceback.print_exc()
return result
def print_summary_report(results: list, send_notification: bool = True):
"""打印汇总报告并发送通知
Args:
results: 分析结果列表
send_notification: 是否发送通知默认True
"""
from app.config import get_settings
settings = get_settings()
threshold = settings.stock_llm_threshold * 100 # 获取阈值
total = len(results)
with_signals = [r for r in results if r.get('signals')]
notified = [r for r in results if r.get('notified')]
# 统计信号
buy_count = 0
sell_count = 0
high_quality_signals = [] # A/B级信号且达到阈值
all_signals = [] # 所有信号
for r in with_signals:
for sig in r.get('signals', []):
sig['symbol'] = r['symbol']
sig['current_price'] = r.get('price', 0)
all_signals.append(sig)
if sig.get('action') == 'buy':
buy_count += 1
elif sig.get('action') == 'sell':
sell_count += 1
# 只统计达到阈值的A/B级信号
if sig.get('grade') in ['A', 'B'] and sig.get('confidence', 0) >= threshold:
high_quality_signals.append(sig)
# 按置信度排序
high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True)
all_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True)
# 打印汇总
print("\n" + "="*80)
print("📊 美股分析汇总报告")
print("="*80)
print(f"分析数量: {total} 只股票")
print(f"有信号: {len(with_signals)}")
print(f"已通知: {len(notified)}")
print(f"通知阈值: {threshold}%")
print("")
# 显示高等级信号(达到阈值的)
if high_quality_signals:
print(f"⭐ 高等级信号达到阈值 (A/B级 >= {threshold}%): {len(high_quality_signals)}")
for sig in high_quality_signals[:10]:
symbol = sig['symbol']
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
price = sig.get('current_price', 0)
entry = sig.get('entry_price', 0)
print(f" {symbol} {action} [{grade}级] {confidence}% @ ${price:,.2f}")
if entry > 0:
print(f" 入场: ${entry:,.2f}")
print("")
# 显示未达到阈值但质量不错的信号
below_threshold = [s for s in all_signals
if s.get('grade') in ['A', 'B'] and s.get('confidence', 0) < threshold]
if below_threshold:
print(f"⚠️ 以下信号未达到通知阈值 ({threshold}%):")
for sig in below_threshold[:10]:
symbol = sig['symbol']
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
print(f" {symbol} {action} {grade}{confidence}%")
print("")
# 统计汇总
print(f"📈 做多信号: {buy_count}")
print(f"📉 做空信号: {sell_count}")
print("="*80)
# 发送汇总通知
if send_notification:
asyncio.run(send_summary_notification(
results, total, with_signals, notified,
buy_count, sell_count, high_quality_signals, all_signals, threshold
))
async def send_summary_notification(
results: list,
total: int,
with_signals: list,
notified: list,
buy_count: int,
sell_count: int,
high_quality_signals: list,
all_signals: list,
threshold: float
):
"""发送汇总报告到飞书和Telegram
Args:
results: 分析结果列表
total: 总数
with_signals: 有信号的股票列表
notified: 已通知的股票列表
buy_count: 做多信号数量
sell_count: 做空信号数量
high_quality_signals: 达到阈值的高等级信号列表
all_signals: 所有信号列表
threshold: 通知阈值
"""
try:
from datetime import datetime
feishu = get_feishu_service()
telegram = get_telegram_service()
now = datetime.now()
# 构建飞书汇总内容
content_parts = [
f"**📊 美股分析汇总报告**",
f"",
f"⏰ 时间: {now.strftime('%Y-%m-%d %H:%M')}",
f"",
f"📊 **分析概况**",
f"• 分析总数: {total}",
f"• 发现信号: {len(with_signals)}",
f"• 已发通知: {len(notified)}",
f"• 通知阈值: {threshold:.0f}%",
f"",
]
# 高等级信号(达到阈值的)
if high_quality_signals:
content_parts.append(f"⭐ **高等级信号 (A/B级 ≥ {threshold:.0f}%)**")
for sig in high_quality_signals[:5]:
symbol = sig['symbol']
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
content_parts.append(f"{symbol} {action} {grade}{confidence}%")
content_parts.append(f"")
# 信号统计
content_parts.extend([
f"📈 做多信号: {buy_count}",
f"📉 做空信号: {sell_count}",
f"",
f"*⚠️ 仅供参考,不构成投资建议*"
])
content = "\n".join(content_parts)
# 发送飞书
title = f"📊 美股分析汇总 ({now.strftime('%H:%M')})"
color = "blue"
await feishu.send_card(title, content, color)
# 发送 Telegram
telegram_msg = f"📊 *美股分析汇总*\n\n"
telegram_msg += f"时间: {now.strftime('%H:%M')}\n"
telegram_msg += f"分析: {total}只 | 信号: {len(with_signals)}只 | 通知: {len(notified)}\n"
telegram_msg += f"阈值: {threshold:.0f}%\n\n"
if high_quality_signals:
telegram_msg += f"⭐ *高等级信号 (≥{threshold:.0f}%)*\n"
for sig in high_quality_signals[:5]:
symbol = sig['symbol']
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
telegram_msg += f"{symbol} {action} {grade}{confidence}%\n"
telegram_msg += "\n"
telegram_msg += f"做多: {buy_count} | 做空: {sell_count}"
await telegram.send_message(telegram_msg)
print(f"\n📬 汇总报告已发送到飞书和Telegram")
except Exception as e:
print(f"❌ 发送汇总通知失败: {e}")
import traceback
traceback.print_exc()
async def main():
# 从命令行参数获取股票代码
if len(sys.argv) < 2:
print("用法: python3 scripts/test_stock.py AAPL [TSLA] [NVDA] ...")
print("\n示例:")
print(" python3 scripts/test_stock.py AAPL")
print(" python3 scripts/test_stock.py AAPL TSLA NVDA")
sys.exit(1)
# 过滤掉无效的参数(如环境变量路径、配置项等)
# 只接受有效的股票代码字母开头1-5个字符
raw_symbols = sys.argv[1:]
symbols = []
for arg in raw_symbols:
# 跳过包含路径分隔符、等号、或明显不是股票代码的参数
if '/' in arg or '=' in arg or ':' in arg or len(arg) > 10:
logger.debug(f"跳过无效参数: {arg}")
continue
# 只保留纯字母的参数(股票代码)
if arg.isalpha() and 1 <= len(arg) <= 5:
symbols.append(arg.upper())
else:
logger.debug(f"跳过非股票代码参数: {arg}")
if not symbols:
print("❌ 未找到有效的股票代码")
print(f"接收到的参数: {raw_symbols}")
print("\n用法: python3 scripts/test_stock.py AAPL [TSLA] [NVDA] ...")
sys.exit(1)
print("="*60)
print("🤖 美股分析脚本")
print("="*60)
print(f"股票: {', '.join(symbols)}")
print(f"时间: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 收集所有分析结果
results = []
for symbol in symbols:
result = await analyze(symbol.upper(), send_notification=True)
results.append(result)
# 生成汇总报告并发送通知
print_summary_report(results, send_notification=True)
print("\n" + "="*60)
print("✅ 分析完成")
print("="*60)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断")