stock-ai-agent/backend/app/stock_agent/stock_agent.py
2026-02-24 21:23:36 +08:00

843 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
美股交易智能体 - 主控制器(新架构版)
只进行市场分析和通知,不执行模拟交易
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
from app.utils.logger import logger
from app.config import get_settings
from app.services.yfinance_service import get_yfinance_service
from app.services.feishu_service import get_feishu_stock_service
from app.services.telegram_service import get_telegram_service
from app.services.signal_database_service import get_signal_db_service
from app.services.fundamental_service import get_fundamental_service
from app.services.news_service import get_news_service
from app.stock_agent.market_signal_analyzer import StockMarketSignalAnalyzer
from app.utils.system_status import get_system_monitor, AgentStatus
# 股票名称映射表
STOCK_NAMES = {
# 美股 - 科技龙头
'AAPL': '苹果',
'MSFT': '微软',
'GOOGL': '谷歌',
'META': 'Meta',
'AMZN': '亚马逊',
'NVDA': '英伟达',
'AMD': 'AMD',
'AVGO': '博通',
'ARM': 'ARM',
'PLTR': 'Palantir',
'SNOW': 'Snowflake',
# 美股 - 生物医疗
'LLY': '礼来',
'NVO': '诺和诺德',
'VRTX': 'Vertex',
# 美股 - 新能源/汽车
'TSLA': '特斯拉',
'ENPH': 'Enphase',
# 美股 - 金融
'V': 'Visa',
'MA': 'Mastercard',
# 美股 - 消费
'HD': 'Home Depot',
'COST': 'Costco',
# 美股 - 其他
'RKLB': 'Relativity Space',
'HOOD': 'Robinhood',
'DXYZ': 'DEX',
'GLW': '康宁',
'UNTY': 'Unity',
'CRM': 'Salesforce',
'ADBE': 'Adobe',
'INTC': '英特尔',
'FSLR': 'First Solar',
'CRWD': 'CrowdStrike',
'SHOP': 'Shopify',
'NET': 'Cloudflare',
'COIN': 'Coinbase',
'MSTR': 'MicroStrategy',
# 港股
'0700.HK': '腾讯',
'9988.HK': '阿里巴巴',
'1810.HK': '小米',
'2015.HK': '理想汽车',
'9866.HK': '蔚来',
'9992.HK': '泡泡玛特',
'9626.HK': '哔哩哔哩',
'9880.HK': '优必选',
}
class StockAgent:
"""美股交易信号智能体LLM 驱动,仅分析通知)"""
def __init__(self):
"""初始化智能体"""
self.settings = get_settings()
self.yfinance = get_yfinance_service()
self.feishu = get_feishu_stock_service()
self.telegram = get_telegram_service()
self.market_analyzer = StockMarketSignalAnalyzer() # 使用新的市场信号分析器
self.signal_db = get_signal_db_service() # 信号数据库服务
self.fundamental = get_fundamental_service() # 基本面数据服务
self.news = get_news_service() # 新闻服务
# 状态管理
self.last_signals: Dict[str, Dict[str, Any]] = {}
self.signal_cooldown: Dict[str, datetime] = {}
# 配置 - 分别读取美股和港股
us_symbols = self.settings.stock_symbols_us.split(',') if self.settings.stock_symbols_us else []
hk_symbols = self.settings.stock_symbols_hk.split(',') if self.settings.stock_symbols_hk else []
self.symbols = us_symbols + hk_symbols
# 运行状态
self.running = False
self._event_loop = None
self._task = None
# 注册到系统监控
monitor = get_system_monitor()
self._monitor_info = monitor.register_agent(
agent_id="stock_agent",
name="股票智能体",
agent_type="stock"
)
# 分类美股和港股数量
us_count = len([s for s in self.symbols if not s.endswith('.HK')])
hk_count = len([s for s in self.symbols if s.endswith('.HK')])
monitor.update_config("stock_agent", {
"us_symbols": us_symbols,
"hk_symbols": hk_symbols,
"total_symbols": len(self.symbols),
"us_count": us_count,
"hk_count": hk_count,
"analysis_interval": f"{self.settings.stock_analysis_interval}"
})
logger.info(f"股票智能体初始化完成 - 美股: {us_count}只, 港股: {hk_count}只, 总计: {len(self.symbols)}")
@staticmethod
def get_stock_name(symbol: str) -> str:
"""获取股票中文名称"""
return STOCK_NAMES.get(symbol, symbol)
async def start(self):
"""启动智能体"""
if self.running:
logger.warning("美股智能体已在运行中")
return
self.running = True
self._event_loop = asyncio.get_event_loop()
# 更新状态为启动中
monitor = get_system_monitor()
monitor.update_status("stock_agent", AgentStatus.STARTING)
logger.info("美股智能体已启动")
# 启动分析任务
self._task = asyncio.create_task(self._analysis_loop())
# 更新状态为运行中
monitor.update_status("stock_agent", AgentStatus.RUNNING)
# 发送启动通知(卡片格式)
us_stocks = [s for s in self.symbols if not s.endswith('.HK')]
hk_stocks = [s for s in self.symbols if s.endswith('.HK')]
title = "📈 股票智能体已启动"
content_parts = [
f"🤖 **驱动引擎**: LLM 三维分析",
f"📊 **监控股票**: {len(self.symbols)}",
]
if us_stocks:
content_parts.append(f" 🇺🇸 美股 ({len(us_stocks)}): {', '.join(us_stocks[:3])}{'...' if len(us_stocks) > 3 else ''}")
if hk_stocks:
content_parts.append(f" 🇭🇰 港股 ({len(hk_stocks)}): {', '.join(hk_stocks[:3])}{'...' if len(hk_stocks) > 3 else ''}")
content_parts.extend([
f"⏰ **运行频率**: 每小时整点",
f"🎯 **分析维度**: 技术面(40%) + 基本面(35%) + 新闻(25%)",
f"📢 **当前模式**: 仅市场分析",
])
content = "\n".join(content_parts)
await self.feishu.send_card(title, content, "green")
async def stop(self):
"""停止智能体"""
self.running = False
# 更新状态为已停止
monitor = get_system_monitor()
monitor.update_status("stock_agent", AgentStatus.STOPPED)
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("美股智能体已停止")
async def _analysis_loop(self):
"""分析循环 - 根据交易时间分析对应市场的股票"""
while self.running:
try:
# 计算距离下一个整点的时间
now = datetime.now()
next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
wait_seconds = (next_hour - now).total_seconds()
logger.info(f"等待到下一个整点: {next_hour.strftime('%H:%M')} (等待 {int(wait_seconds)} 秒)")
# 等待到整点
await asyncio.sleep(wait_seconds)
# 分类股票:美股和港股
us_stocks = [s for s in self.symbols if not s.endswith('.HK')]
hk_stocks = [s for s in self.symbols if s.endswith('.HK')]
# 检查各市场交易时间
us_market_open = self._is_market_hours('US')
hk_market_open = self._is_market_hours('0700.HK')
# 检查是否是盘后分析时间
us_after_hours = self._is_after_hours('US')
hk_after_hours = self._is_after_hours('0700.HK')
# 确定要分析的股票列表
stocks_to_analyze = []
analysis_type = "盘中" # 默认为盘中分析
# 盘后分析:优先级更高,用于日线级别分析
if us_after_hours or hk_after_hours:
analysis_type = "盘后"
if us_after_hours:
stocks_to_analyze.extend(us_stocks)
logger.info(f"美股盘后分析,分析 {len(us_stocks)} 只美股(日线级别)")
if hk_after_hours:
stocks_to_analyze.extend(hk_stocks)
logger.info(f"港股盘后分析,分析 {len(hk_stocks)} 只港股(日线级别)")
else:
# 盘中分析
if us_market_open:
stocks_to_analyze.extend(us_stocks)
logger.info(f"美股交易时间,分析 {len(us_stocks)} 只美股")
if hk_market_open:
stocks_to_analyze.extend(hk_stocks)
logger.info(f"港股交易时间,分析 {len(hk_stocks)} 只港股")
# 如果没有需要分析的股票
if not stocks_to_analyze:
logger.debug("没有需要分析的股票")
continue
# 分析股票并收集结果
logger.info(f"开始{analysis_type}分析 {len(stocks_to_analyze)} 只股票")
analysis_results = []
for symbol in stocks_to_analyze:
if not self.running:
break
result = await self.analyze_symbol(symbol, is_after_hours=(analysis_type == "盘后"))
if result:
analysis_results.append(result)
# 生成并发送汇总报告
await self._send_summary_report(analysis_results, analysis_type)
logger.info(f"本次{analysis_type}分析完成")
except Exception as e:
logger.error(f"分析循环出错: {e}")
await asyncio.sleep(60) # 出错后等待 1 分钟再重试
def _is_market_hours(self, symbol: str = None) -> bool:
"""
判断当前是否在交易时间
美股交易时间: 周一至周五 9:30-16:00 (EST)
北京时间:
- 冬令时 (11月-3月): 22:30-05:00 (次日)
- 夏令时 (3月-11月): 21:30-04:00 (次日)
港股交易时间: 周一至周五
北京时间:
- 上午: 09:30-12:00
- 下午: 13:00-16:00
Args:
symbol: 股票代码(用于判断是美股还是港股)
Returns:
是否在交易时间
"""
from datetime import datetime
# 获取当前时间
now = datetime.now()
# 检查是否为周末
if now.weekday() >= 5: # 5=周六, 6=周日
return False
# 判断是港股还是美股
is_hk_stock = symbol and symbol.endswith('.HK') if symbol else False
# 获取当前小时和分钟
hour = now.hour
minute = now.minute
current_time = hour * 100 + minute # 转换为数字,如 2130 表示 21:30
if is_hk_stock:
# 港股交易时间: 09:30-12:00 或 13:00-16:00
return (930 <= current_time < 1200) or (1300 <= current_time < 1600)
else:
# 美股交易时间
# 判断夏令时/冬令时简单判断3-11月为夏令时
is_summer = 3 <= now.month <= 11
if is_summer:
# 夏令时: 21:30-04:00 (次日)
# 即 2130-2359 或 0000-0400
if current_time >= 2130 or current_time < 400:
return True
else:
# 冬令时: 22:30-05:00 (次日)
# 即 2230-2359 或 0000-0500
if current_time >= 2230 or current_time < 500:
return True
return False
def _is_any_market_hours(self) -> bool:
"""判断当前是否在任一市场的交易时间(美股或港股)"""
return self._is_market_hours('US') or self._is_market_hours('0700.HK')
def _is_after_hours(self, symbol: str) -> bool:
"""
判断当前是否是盘后分析时间收盘后2小时内
美股收盘时间:
- 夏令时: 北京时间 04:00 收盘
- 冬令时: 北京时间 05:00 收盘
港股收盘时间: 北京时间 16:00 收盘
盘后分析时间: 收盘后 2 小时内
Args:
symbol: 股票代码(用于判断是美股还是港股)
Returns:
是否是盘后分析时间
"""
from datetime import datetime
# 获取当前时间
now = datetime.now()
# 检查是否为周末
if now.weekday() >= 5: # 5=周六, 6=周日
return False
# 判断是港股还是美股
is_hk_stock = symbol and symbol.endswith('.HK') if symbol else False
# 获取当前小时和分钟
hour = now.hour
minute = now.minute
current_time = hour * 100 + minute # 转换为数字,如 1630 表示 16:30
if is_hk_stock:
# 港股盘后: 16:00-18:00 (收盘后2小时)
return 1600 <= current_time < 1800
else:
# 美股盘后
# 判断夏令时/冬令时简单判断3-11月为夏令时
is_summer = 3 <= now.month <= 11
if is_summer:
# 夏令时: 04:00-06:00 (收盘后2小时)
return 400 <= current_time < 600
else:
# 冬令时: 05:00-07:00 (收盘后2小时)
return 500 <= current_time < 700
return False
async def analyze_symbol(self, symbol: str, is_after_hours: bool = False) -> Optional[Dict[str, Any]]:
"""
分析单个股票
Args:
symbol: 股票代码
is_after_hours: 是否是盘后分析(盘后会更关注日线级别机会)
Returns:
分析结果字典,包含股票信息和信号
"""
# 更新活动时间
monitor = get_system_monitor()
monitor.update_activity("stock_agent")
result = {
'symbol': symbol,
'current_price': 0,
'signals': [],
'analysis_summary': '',
'notified': False
}
try:
# 1. 获取多时间周期数据
data = self.yfinance.get_multi_timeframe_data(symbol)
# 2. 验证数据完整性
if not self._validate_data(data):
logger.warning(f"{symbol} 数据不完整,跳过本次分析")
return result
# 3. 获取当前价格
ticker = self.yfinance.get_ticker(symbol)
if not ticker:
logger.warning(f"无法获取 {symbol} 当前价格")
return result
current_price = ticker['lastPrice']
result['current_price'] = current_price
# 获取股票中文名称
stock_name = STOCK_NAMES.get(symbol, '')
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
logger.info(f"\n{'='*60}")
logger.info(f"📊 分析 {symbol_display} @ ${current_price:,.2f}")
logger.info(f"{'='*60}")
# 4. 获取基本面数据
logger.info(f"\n📈 【基本面分析】")
fundamental_data = None
fundamental_summary = ""
try:
fundamental_data = self.fundamental.get_fundamental_data(symbol)
if fundamental_data:
# 传递已获取的数据,避免重复调用
fundamental_summary = self.fundamental.get_fundamental_summary(symbol, fundamental_data)
# 基本面评分已经在 fundamental_service 中输出
else:
logger.warning(f" ⚠️ 无法获取基本面数据")
except Exception as e:
logger.warning(f" ⚠️ 获取基本面数据失败: {e}")
# 5. 获取新闻数据
logger.info(f"\n📰 【新闻分析】")
news_data = None
try:
stock_name = STOCK_NAMES.get(symbol, '')
news_data = await self.news.search_stock_news(symbol, stock_name, max_results=5)
if news_data:
logger.info(f" 获取到 {len(news_data)} 条相关新闻")
else:
logger.info(f" 暂无相关新闻")
except Exception as e:
logger.warning(f" ⚠️ 获取新闻数据失败: {e}")
# 6. 市场信号分析(使用新架构 - 技术面 + 基本面 + 新闻)
logger.info(f"\n🤖 【市场信号分析中...】")
market_signal = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols,
fundamental_data=fundamental_data,
news_data=news_data
)
# 输出分析摘要
summary = market_signal.get('analysis_summary', '')
result['analysis_summary'] = summary
logger.info(f" 市场状态: {summary}")
# 输出新闻情绪(如果有)
# 注:新的分析器不包含新闻分析,可以跳过或从其他地方获取
# 输出关键价位
levels = market_signal.get('key_levels', {})
if levels.get('support') or levels.get('resistance'):
support_str = ', '.join([f"${s:,.2f}" for s in levels.get('support', [])[:2]])
resistance_str = ', '.join([f"${r:,.2f}" for r in levels.get('resistance', [])[:2]])
logger.info(f" 支撑位: {support_str or '-'}")
logger.info(f" 阻力位: {resistance_str or '-'}")
# 5. 处理信号
signals = market_signal.get('signals', [])
result['signals'] = signals
if not signals:
logger.info(f"\n⏸️ 结论: 无交易信号,继续观望")
return result
# 输出所有信号
logger.info(f"\n🎯 【发现 {len(signals)} 个信号】")
for sig in signals:
signal_type = sig.get('type', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
action = sig.get('action', 'wait')
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
action_text = action_map.get(action, action)
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
logger.info(f"\n {type_text} {action_text} [{grade}{grade_icon}] {confidence}%")
# 6. 过滤并通知最佳信号
best_signal = self._get_best_signal(signals)
if not best_signal:
logger.info(f"\n⏸️ 信号质量不高,不发送通知")
return result
# 检查置信度阈值
threshold = self.settings.stock_llm_threshold * 100
if best_signal.get('confidence', 0) < threshold:
logger.info(f"\n⏸️ 置信度不足 ({best_signal.get('confidence', 0)}% < {threshold}%)")
return result
# 检查冷却时间
if not self._should_send_signal(symbol, best_signal):
logger.info(f"\n⏸️ 信号冷却中,不发送通知")
return result
# 发送通知
await self._send_signal_notification(symbol, best_signal, current_price)
result['notified'] = True
result['best_signal'] = best_signal
# 更新状态
self.last_signals[symbol] = best_signal
self.signal_cooldown[symbol] = datetime.now()
return result
except Exception as e:
logger.error(f"❌ 分析 {symbol} 出错: {e}")
import traceback
logger.error(traceback.format_exc())
return result
def _get_best_signal(self, signals: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""获取最佳信号"""
# 过滤掉 D 级信号
valid_signals = [s for s in signals if s.get('grade', 'D') != 'D']
if not valid_signals:
return None
# 按等级和置信度排序
grade_order = {'A': 0, 'B': 1, 'C': 2}
valid_signals.sort(key=lambda x: (
grade_order.get(x.get('grade', 'C'), 3),
-x.get('confidence', 0)
))
return valid_signals[0]
def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
"""判断是否应该发送信号"""
action = signal.get('action', 'wait')
if action == 'wait':
return False
# 检查冷却时间60分钟内不重复发送相同方向的信号
if symbol in self.signal_cooldown:
cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=60)
if datetime.now() < cooldown_end:
if symbol in self.last_signals:
if self.last_signals[symbol].get('action') == action:
logger.debug(f"{symbol} 信号冷却中,跳过")
return False
return True
async def _send_signal_notification(
self,
symbol: str,
signal: Dict[str, Any],
current_price: float
):
"""发送信号通知"""
try:
from app.utils.signal_formatter import get_signal_formatter
formatter = get_signal_formatter()
# 使用格式化工具格式化信号
card = formatter.format_feishu_card(signal, symbol, agent_type='stock')
title = card['title']
content = card['content']
# 根据信号方向选择颜色
color = "green" if signal.get('action') == 'buy' else "red"
# 发送到飞书
await self.feishu.send_card(title, content, color)
# 发送到 Telegram
await self.telegram.send_message(formatter.format_signal_message(signal, symbol, agent_type='stock'))
logger.info(f"✅ 信号通知已发送: {title}")
# 保存信号到数据库
signal_to_save = signal.copy()
signal_to_save['signal_type'] = 'stock'
signal_to_save['symbol'] = symbol
signal_to_save['current_price'] = current_price
self.signal_db.add_signal(signal_to_save)
except Exception as e:
logger.error(f"发送通知失败: {e}")
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
"""验证数据完整性"""
required_intervals = ['1d', '1h']
for interval in required_intervals:
if interval not in data or data[interval].empty:
return False
if len(data[interval]) < 20:
return False
return True
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
"""单次分析(用于测试或手动触发)"""
data = self.yfinance.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
return {'error': '数据不完整'}
# 获取基本面数据
fundamental_data = None
fundamental_summary = ""
try:
fundamental_data = self.fundamental.get_fundamental_data(symbol)
if fundamental_data:
# 传递已获取的数据,避免重复调用
fundamental_summary = self.fundamental.get_fundamental_summary(symbol, fundamental_data)
except Exception as e:
logger.warning(f"获取基本面数据失败: {e}")
result = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols
)
return result
def get_status(self) -> Dict[str, Any]:
"""获取智能体状态"""
return {
'running': self.running,
'symbols': self.symbols,
'mode': 'LLM 驱动(仅分析通知)',
'last_signals': {
symbol: {
'type': sig.get('type'),
'action': sig.get('action'),
'confidence': sig.get('confidence'),
'grade': sig.get('grade')
}
for symbol, sig in self.last_signals.items()
}
}
async def _send_summary_report(self, results: List[Dict[str, Any]], analysis_type: str = "盘中"):
"""
生成并发送分析汇总报告
Args:
results: 所有股票的分析结果列表
analysis_type: 分析类型 ("盘中""盘后")
"""
try:
now = datetime.now()
total = len(results)
with_signals = [r for r in results if r.get('signals')]
notified = [r for r in results if r.get('notified')]
# 区分美股和港股
us_results = [r for r in results if not r['symbol'].endswith('.HK')]
hk_results = [r for r in results if r['symbol'].endswith('.HK')]
us_with_signals = [r for r in us_results if r.get('signals')]
hk_with_signals = [r for r in hk_results if r.get('signals')]
# 统计信号
buy_signals = []
sell_signals = []
high_quality_signals = [] # A/B级信号
for r in with_signals:
for sig in r.get('signals', []):
sig['symbol'] = r['symbol']
sig['current_price'] = r.get('current_price', 0)
sig['is_hk'] = r['symbol'].endswith('.HK')
sig['stock_name'] = STOCK_NAMES.get(r['symbol'], '')
if sig.get('action') == 'buy':
buy_signals.append(sig)
elif sig.get('action') == 'sell':
sell_signals.append(sig)
if sig.get('grade') in ['A', 'B']:
high_quality_signals.append(sig)
# 按置信度排序
high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True)
# 构建汇总报告
analysis_tag = f"{analysis_type}分析】"
logger.info(f"\n{'='*80}")
logger.info(f"📊 股票分析汇总报告 {analysis_tag}")
logger.info(f"{'='*80}")
logger.info(f"时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"分析总数: {total} 只 (美股: {len(us_results)}, 港股: {len(hk_results)})")
logger.info(f"有信号: {len(with_signals)} 只 (美股: {len(us_with_signals)}, 港股: {len(hk_with_signals)})")
logger.info(f"已通知: {len(notified)}")
logger.info(f"")
# 显示高等级信号
if high_quality_signals:
logger.info(f"⭐ 高等级信号 (A/B级): {len(high_quality_signals)}")
for sig in high_quality_signals[:10]: # 最多显示10个
symbol = sig['symbol']
stock_name = sig.get('stock_name', '')
market_tag = '[港股]' if sig.get('is_hk') else '[美股]'
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
price = sig.get('current_price', 0)
entry = sig.get('entry_price', 0)
# 构建带名称的股票显示
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
logger.info(f" {market_tag} {symbol_display} {action} [{grade}级] {confidence}% @ ${price:,.2f}")
if entry > 0:
logger.info(f" 入场: ${entry:,.2f}")
logger.info(f"")
# 统计汇总
logger.info(f"📈 做多信号: {len(buy_signals)}")
logger.info(f"📉 做空信号: {len(sell_signals)}")
logger.info(f"{'='*80}\n")
# 发送飞书汇总
await self._send_feishu_summary(
now, total, with_signals, notified,
buy_signals, sell_signals, high_quality_signals,
len(us_results), len(hk_results),
analysis_type
)
except Exception as e:
logger.error(f"生成汇总报告失败: {e}")
import traceback
logger.error(traceback.format_exc())
async def _send_feishu_summary(
self,
now: datetime,
total: int,
with_signals: List,
notified: List,
buy_signals: List,
sell_signals: List,
high_quality_signals: List,
us_count: int = 0,
hk_count: int = 0,
analysis_type: str = "盘中"
):
"""发送飞书汇总报告"""
try:
# 构建内容
analysis_tag = f"{analysis_type}分析】"
content_parts = [
f"**📊 股票分析汇总报告 {analysis_tag}**",
f"",
f"⏰ 时间: {now.strftime('%Y-%m-%d %H:%M')}",
f"",
f"📊 **分析概况**",
f"• 美股: {us_count} 只 | 港股: {hk_count}",
f"• 发现信号: {len(with_signals)}",
f"• 已发通知: {len(notified)}",
f"",
]
# 高等级信号
if high_quality_signals:
content_parts.append(f"⭐ **高等级信号 (A/B级)**")
for sig in high_quality_signals[:5]:
symbol = sig['symbol']
stock_name = sig.get('stock_name', '')
market_tag = '[港股]' if sig.get('is_hk') else '[美股]'
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
# 构建带名称的股票显示
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
content_parts.append(f"{market_tag} {symbol_display} {action} {grade}{confidence}%")
content_parts.append(f"")
# 信号统计
content_parts.extend([
f"📈 做多信号: {len(buy_signals)}",
f"📉 做空信号: {len(sell_signals)}",
f"",
f"*⚠️ 仅供参考,不构成投资建议*"
])
content = "\n".join(content_parts)
# 发送飞书 - 标题包含分析类型
type_tag = "盘后" if analysis_type == "盘后" else "分析"
title = f"📊 股票{type_tag}汇总 ({now.strftime('%H:%M')})"
color = "blue"
await self.feishu.send_card(title, content, color)
logger.info("✅ 汇总报告已发送到飞书")
except Exception as e:
logger.error(f"发送飞书汇总失败: {e}")
# 全局单例
_stock_agent: Optional[StockAgent] = None
def get_stock_agent() -> StockAgent:
"""获取美股智能体单例"""
global _stock_agent
if _stock_agent is None:
_stock_agent = StockAgent()
return _stock_agent