diff --git a/backend/app/crypto_agent/llm_signal_analyzer.py b/backend/app/crypto_agent/llm_signal_analyzer.py index d1e4b1e..c262af9 100644 --- a/backend/app/crypto_agent/llm_signal_analyzer.py +++ b/backend/app/crypto_agent/llm_signal_analyzer.py @@ -199,13 +199,35 @@ class LLMSignalAnalyzer: 7. entry_type 必须明确:信号已触发用 market,等待更好价位用 limit 8. **position_size 必须明确**:根据信号质量和持仓情况给出 heavy/medium/light""" - def __init__(self): - """初始化分析器""" + def __init__(self, agent_type: str = "crypto"): + """初始化分析器 + + Args: + agent_type: 智能体类型,支持 'crypto', 'stock', 'smart' + """ from app.config import get_settings self.news_service = get_news_service() settings = get_settings() - self.model_override = getattr(settings, 'crypto_agent_model', None) - logger.info(f"LLM 信号分析器初始化完成(含新闻舆情,模型: {self.model_override or '默认'})") + + # 根据智能体类型选择模型配置 + model_config_map = { + 'crypto': 'crypto_agent_model', + 'stock': 'stock_agent_model', + 'smart': 'smart_agent_model' + } + + config_key = model_config_map.get(agent_type, 'crypto_agent_model') + self.model_override = getattr(settings, config_key, None) + self.agent_type = agent_type + + agent_name_map = { + 'crypto': '加密货币', + 'stock': '美股', + 'smart': '智能助手' + } + agent_name = agent_name_map.get(agent_type, '未知') + + logger.info(f"LLM 信号分析器初始化完成({agent_name},模型: {self.model_override or '默认'})") async def analyze(self, symbol: str, data: Dict[str, pd.DataFrame], symbols: List[str] = None, diff --git a/backend/app/stock_agent/stock_agent.py b/backend/app/stock_agent/stock_agent.py index 8236cfd..c13afc8 100644 --- a/backend/app/stock_agent/stock_agent.py +++ b/backend/app/stock_agent/stock_agent.py @@ -25,7 +25,7 @@ class StockAgent: self.yfinance = get_yfinance_service() self.feishu = get_feishu_service() self.telegram = get_telegram_service() - self.llm_analyzer = LLMSignalAnalyzer() + self.llm_analyzer = LLMSignalAnalyzer(agent_type="stock") # 指定使用 stock 模型配置 self.signal_db = get_signal_db_service() # 信号数据库服务 # 状态管理 @@ -88,12 +88,19 @@ class StockAgent: # 继续等待下一个整点 continue - # 在交易时间内,分析所有股票 + # 在交易时间内,分析所有股票并收集结果 logger.info(f"开始分析 {len(self.symbols)} 只股票") + analysis_results = [] + for symbol in self.symbols: if not self.running: break - await self.analyze_symbol(symbol) + result = await self.analyze_symbol(symbol) + if result: + analysis_results.append(result) + + # 生成并发送汇总报告 + await self._send_summary_report(analysis_results) logger.info("本次分析完成") @@ -143,13 +150,24 @@ class StockAgent: return False - async def analyze_symbol(self, symbol: str): + async def analyze_symbol(self, symbol: str) -> Optional[Dict[str, Any]]: """ 分析单个股票 Args: symbol: 股票代码 + + Returns: + 分析结果字典,包含股票信息和信号 """ + result = { + 'symbol': symbol, + 'current_price': 0, + 'signals': [], + 'analysis_summary': '', + 'notified': False + } + try: # 1. 获取多时间周期数据 data = self.yfinance.get_multi_timeframe_data(symbol) @@ -157,14 +175,15 @@ class StockAgent: # 2. 验证数据完整性 if not self._validate_data(data): logger.warning(f"{symbol} 数据不完整,跳过本次分析") - return + return result # 3. 获取当前价格 ticker = self.yfinance.get_ticker(symbol) if not ticker: logger.warning(f"无法获取 {symbol} 当前价格") - return + return result current_price = ticker['lastPrice'] + result['current_price'] = current_price logger.info(f"\n{'='*60}") logger.info(f"📊 分析 {symbol} @ ${current_price:,.2f}") @@ -172,19 +191,20 @@ class StockAgent: # 4. LLM 分析 logger.info(f"\n🤖 【LLM 分析中...】") - result = await self.llm_analyzer.analyze( + analysis = await self.llm_analyzer.analyze( symbol, data, symbols=self.symbols, position_info=None # 美股不跟踪持仓 ) # 输出分析摘要 - summary = result.get('analysis_summary', '无') + summary = analysis.get('analysis_summary', '无') + result['analysis_summary'] = summary logger.info(f" 市场状态: {summary}") # 输出新闻情绪 - news_sentiment = result.get('news_sentiment', '') - news_impact = result.get('news_impact', '') + news_sentiment = analysis.get('news_sentiment', '') + news_impact = analysis.get('news_impact', '') if news_sentiment: sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}.get(news_sentiment, '') logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}") @@ -192,7 +212,7 @@ class StockAgent: logger.info(f" 消息影响: {news_impact}") # 输出关键价位 - levels = result.get('key_levels', {}) + levels = analysis.get('key_levels', {}) if levels.get('support') or levels.get('resistance'): support_str = ', '.join([f"${s:,.2f}" for s in levels.get('support', [])[:2]]) resistance_str = ', '.join([f"${r:,.2f}" for r in levels.get('resistance', [])[:2]]) @@ -200,11 +220,12 @@ class StockAgent: logger.info(f" 阻力位: {resistance_str or '-'}") # 5. 处理信号 - signals = result.get('signals', []) + signals = analysis.get('signals', []) + result['signals'] = signals if not signals: logger.info(f"\n⏸️ 结论: 无交易信号,继续观望") - return + return result # 输出所有信号 logger.info(f"\n🎯 【发现 {len(signals)} 个信号】") @@ -229,30 +250,35 @@ class StockAgent: if not best_signal: logger.info(f"\n⏸️ 信号质量不高,不发送通知") - return + return result # 检查置信度阈值 threshold = self.settings.stock_llm_threshold * 100 if best_signal.get('confidence', 0) < threshold: logger.info(f"\n⏸️ 置信度不足 ({best_signal.get('confidence', 0)}% < {threshold}%)") - return + return result # 检查冷却时间 if not self._should_send_signal(symbol, best_signal): logger.info(f"\n⏸️ 信号冷却中,不发送通知") - return + return result # 发送通知 await self._send_signal_notification(symbol, best_signal, current_price) + result['notified'] = True + result['best_signal'] = best_signal # 更新状态 self.last_signals[symbol] = best_signal self.signal_cooldown[symbol] = datetime.now() + return result + except Exception as e: logger.error(f"❌ 分析 {symbol} 出错: {e}") import traceback logger.error(traceback.format_exc()) + return result def _get_best_signal(self, signals: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """获取最佳信号""" @@ -301,8 +327,11 @@ class StockAgent: title = card['title'] content = card['content'] + # 根据信号方向选择颜色 + color = "green" if signal.get('action') == 'buy' else "red" + # 发送到飞书 - await self.feishu.send_markdown(title, content) + await self.feishu.send_card(title, content, color) # 发送到 Telegram await self.telegram.send_message(self.llm_analyzer.format_signal_message(signal, symbol)) @@ -360,6 +389,138 @@ class StockAgent: } } + async def _send_summary_report(self, results: List[Dict[str, Any]]): + """ + 生成并发送分析汇总报告 + + Args: + results: 所有股票的分析结果列表 + """ + try: + now = datetime.now() + total = len(results) + with_signals = [r for r in results if r.get('signals')] + notified = [r for r in results if r.get('notified')] + + # 统计信号 + buy_signals = [] + sell_signals = [] + high_quality_signals = [] # A/B级信号 + + for r in with_signals: + for sig in r.get('signals', []): + sig['symbol'] = r['symbol'] + sig['current_price'] = r.get('current_price', 0) + + if sig.get('action') == 'buy': + buy_signals.append(sig) + elif sig.get('action') == 'sell': + sell_signals.append(sig) + + if sig.get('grade') in ['A', 'B']: + high_quality_signals.append(sig) + + # 按置信度排序 + high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True) + + # 构建汇总报告 + logger.info(f"\n{'='*80}") + logger.info(f"📊 美股分析汇总报告") + logger.info(f"{'='*80}") + logger.info(f"时间: {now.strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"分析数量: {total} 只股票") + logger.info(f"有信号: {len(with_signals)} 只") + logger.info(f"已通知: {len(notified)} 只") + logger.info(f"") + + # 显示高等级信号 + if high_quality_signals: + logger.info(f"⭐ 高等级信号 (A/B级): {len(high_quality_signals)} 个") + for sig in high_quality_signals[:10]: # 最多显示10个 + symbol = sig['symbol'] + action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' + grade = sig.get('grade', 'D') + confidence = sig.get('confidence', 0) + price = sig.get('current_price', 0) + entry = sig.get('entry_price', 0) + + logger.info(f" {symbol} {action} [{grade}级] {confidence}% @ ${price:,.2f}") + if entry > 0: + logger.info(f" 入场: ${entry:,.2f}") + logger.info(f"") + + # 统计汇总 + logger.info(f"📈 做多信号: {len(buy_signals)} 个") + logger.info(f"📉 做空信号: {len(sell_signals)} 个") + logger.info(f"{'='*80}\n") + + # 发送飞书汇总 + await self._send_feishu_summary( + now, total, with_signals, notified, + buy_signals, sell_signals, high_quality_signals + ) + + except Exception as e: + logger.error(f"生成汇总报告失败: {e}") + import traceback + logger.error(traceback.format_exc()) + + async def _send_feishu_summary( + self, + now: datetime, + total: int, + with_signals: List, + notified: List, + buy_signals: List, + sell_signals: List, + high_quality_signals: List + ): + """发送飞书汇总报告""" + try: + # 构建内容 + content_parts = [ + f"**美股分析汇总报告**", + f"", + f"⏰ 时间: {now.strftime('%Y-%m-%d %H:%M')}", + f"", + f"📊 **分析概况**", + f"• 分析总数: {total} 只", + f"• 发现信号: {len(with_signals)} 只", + f"• 已发通知: {len(notified)} 只", + f"", + ] + + # 高等级信号 + if high_quality_signals: + content_parts.append(f"⭐ **高等级信号 (A/B级)**") + for sig in high_quality_signals[:5]: + symbol = sig['symbol'] + action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' + grade = sig.get('grade', 'D') + confidence = sig.get('confidence', 0) + content_parts.append(f"• {symbol} {action} {grade}级 {confidence}%") + content_parts.append(f"") + + # 信号统计 + content_parts.extend([ + f"📈 做多信号: {len(buy_signals)} 个", + f"📉 做空信号: {len(sell_signals)} 个", + f"", + f"*⚠️ 仅供参考,不构成投资建议*" + ]) + + content = "\n".join(content_parts) + + # 发送飞书 + title = f"📊 美股分析汇总 ({now.strftime('%H:%M')})" + color = "blue" + + await self.feishu.send_card(title, content, color) + logger.info("✅ 汇总报告已发送到飞书") + + except Exception as e: + logger.error(f"发送飞书汇总失败: {e}") + # 全局单例 _stock_agent: Optional[StockAgent] = None diff --git a/scripts/stock.sh b/scripts/stock.sh index 0fd31bc..04b0776 100755 --- a/scripts/stock.sh +++ b/scripts/stock.sh @@ -11,17 +11,29 @@ cd "$(dirname "$0")/.." || exit 1 if [ $# -eq 0 ]; then # 无参数,分析配置的所有股票 echo "📊 分析配置的所有股票(将发送通知)..." - # 使用更可靠的方式获取股票代码 - python3 -c " -import sys -sys.path.insert(0, 'backend') -from app.config import get_settings -settings = get_settings() -symbols = [s.strip() for s in settings.stock_symbols.split(',') if s.strip()] -print(' '.join(symbols)) -" 2>/dev/null | while read -r symbols; do - python3 scripts/test_stock.py $symbols - done + + # 直接从 .env 文件读取股票代码 + if [ -f .env ]; then + # 使用 grep 提取 STOCK_SYMBOLS 行,然后提取值 + STOCKS=$(grep "^STOCK_SYMBOLS=" .env | cut -d'=' -f2) + + if [ -z "$STOCKS" ]; then + echo "❌ 无法从 .env 文件读取股票列表" + exit 1 + fi + + echo "📋 股票列表: $STOCKS" + + # 使用 read array 来正确处理空格分隔的股票代码 + # 将逗号分隔转换为空格分隔 + STOCKS_SPACE=$(echo "$STOCKS" | tr ',' ' ') + + # 直接传递给 test_stock.py(不要用 while read 循环) + python3 scripts/test_stock.py $STOCKS_SPACE + else + echo "❌ .env 文件不存在" + exit 1 + fi else # 分析指定的股票 - 使用引号正确传递参数 echo "📊 分析股票: $*(将发送通知)" diff --git a/scripts/test_stock.py b/scripts/test_stock.py index e6181b5..8de92ca 100755 --- a/scripts/test_stock.py +++ b/scripts/test_stock.py @@ -20,6 +20,7 @@ from app.services.yfinance_service import get_yfinance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer +from app.config import get_settings from app.utils.logger import logger @@ -29,7 +30,21 @@ async def analyze(symbol: str, send_notification: bool = True): Args: symbol: 股票代码 send_notification: 是否发送通知(默认True) + + Returns: + 分析结果字典 """ + result = { + 'symbol': symbol, + 'price': 0, + 'signals': [], + 'notified': False + } + + # 获取配置 + settings = get_settings() + threshold = settings.stock_llm_threshold * 100 # 转换为百分比 + print(f"\n{'='*60}") print(f"📊 分析 {symbol}") print(f"{'='*60}") @@ -37,7 +52,7 @@ async def analyze(symbol: str, send_notification: bool = True): try: # 获取服务 yf_service = get_yfinance_service() - llm = LLMSignalAnalyzer() + llm = LLMSignalAnalyzer(agent_type="stock") # 指定使用 stock 模型配置 feishu = get_feishu_service() telegram = get_telegram_service() @@ -46,10 +61,11 @@ async def analyze(symbol: str, send_notification: bool = True): ticker = yf_service.get_ticker(symbol) if not ticker: print(f"❌ 无法获取 {symbol} 行情") - return + return result price = ticker['lastPrice'] change = ticker['priceChangePercent'] + result['price'] = price print(f"价格: ${price:,.2f} ({change:+.2f}%)") print(f"成交量: {ticker['volume']:,}") @@ -59,17 +75,18 @@ async def analyze(symbol: str, send_notification: bool = True): if not data: print(f"❌ 无法获取K线数据") - return + return result print(f"时间周期: {', '.join(data.keys())}") # LLM分析 print(f"\n🤖 LLM分析中...\n") - result = await llm.analyze(symbol, data, symbols=[symbol], position_info=None) + analysis = await llm.analyze(symbol, data, symbols=[symbol], position_info=None) # 输出结果 - summary = result.get('analysis_summary', '') - signals = result.get('signals', []) + summary = analysis.get('analysis_summary', '') + signals = analysis.get('signals', []) + result['signals'] = signals print(f"市场状态: {summary}") @@ -100,11 +117,11 @@ async def analyze(symbol: str, send_notification: bool = True): short_reason = reason[:80] + "..." if len(reason) > 80 else reason print(f" 理由: {short_reason}") - # 发送通知(仅发送置信度 >= 60% 的信号) + # 发送通知(仅发送置信度 >= 阈值的信号) if send_notification: best_signal = None for sig in signals: - if sig.get('confidence', 0) >= 60 and sig.get('grade', 'D') != 'D': + if sig.get('confidence', 0) >= threshold and sig.get('grade', 'D') != 'D': best_signal = sig break @@ -120,15 +137,211 @@ async def analyze(symbol: str, send_notification: bool = True): await feishu.send_card(title, content, color) await telegram.send_message(llm.format_signal_message(best_signal, symbol)) print(f"\n📬 通知已发送:{title}") + result['notified'] = True else: - print(f"\n⏸️ 置信度不足,不发送通知") + print(f"\n⏸️ 置信度不足,不发送通知(阈值: {threshold}%)") else: print(f"\n⏸️ 无交易信号") + return result + except Exception as e: print(f"❌ 错误: {e}") import traceback traceback.print_exc() + return result + + +def print_summary_report(results: list, send_notification: bool = True): + """打印汇总报告并发送通知 + + Args: + results: 分析结果列表 + send_notification: 是否发送通知(默认True) + """ + from app.config import get_settings + settings = get_settings() + threshold = settings.stock_llm_threshold * 100 # 获取阈值 + + total = len(results) + with_signals = [r for r in results if r.get('signals')] + notified = [r for r in results if r.get('notified')] + + # 统计信号 + buy_count = 0 + sell_count = 0 + high_quality_signals = [] # A/B级信号且达到阈值 + all_signals = [] # 所有信号 + + for r in with_signals: + for sig in r.get('signals', []): + sig['symbol'] = r['symbol'] + sig['current_price'] = r.get('price', 0) + all_signals.append(sig) + + if sig.get('action') == 'buy': + buy_count += 1 + elif sig.get('action') == 'sell': + sell_count += 1 + + # 只统计达到阈值的A/B级信号 + if sig.get('grade') in ['A', 'B'] and sig.get('confidence', 0) >= threshold: + high_quality_signals.append(sig) + + # 按置信度排序 + high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True) + all_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True) + + # 打印汇总 + print("\n" + "="*80) + print("📊 美股分析汇总报告") + print("="*80) + print(f"分析数量: {total} 只股票") + print(f"有信号: {len(with_signals)} 只") + print(f"已通知: {len(notified)} 只") + print(f"通知阈值: {threshold}%") + print("") + + # 显示高等级信号(达到阈值的) + if high_quality_signals: + print(f"⭐ 高等级信号达到阈值 (A/B级 >= {threshold}%): {len(high_quality_signals)} 个") + for sig in high_quality_signals[:10]: + symbol = sig['symbol'] + action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' + grade = sig.get('grade', 'D') + confidence = sig.get('confidence', 0) + price = sig.get('current_price', 0) + entry = sig.get('entry_price', 0) + + print(f" {symbol} {action} [{grade}级] {confidence}% @ ${price:,.2f}") + if entry > 0: + print(f" 入场: ${entry:,.2f}") + print("") + + # 显示未达到阈值但质量不错的信号 + below_threshold = [s for s in all_signals + if s.get('grade') in ['A', 'B'] and s.get('confidence', 0) < threshold] + if below_threshold: + print(f"⚠️ 以下信号未达到通知阈值 ({threshold}%):") + for sig in below_threshold[:10]: + symbol = sig['symbol'] + action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' + grade = sig.get('grade', 'D') + confidence = sig.get('confidence', 0) + print(f" {symbol} {action} {grade}级 {confidence}%") + print("") + + # 统计汇总 + print(f"📈 做多信号: {buy_count} 个") + print(f"📉 做空信号: {sell_count} 个") + print("="*80) + + # 发送汇总通知 + if send_notification: + asyncio.run(send_summary_notification( + results, total, with_signals, notified, + buy_count, sell_count, high_quality_signals, all_signals, threshold + )) + + +async def send_summary_notification( + results: list, + total: int, + with_signals: list, + notified: list, + buy_count: int, + sell_count: int, + high_quality_signals: list, + all_signals: list, + threshold: float +): + """发送汇总报告到飞书和Telegram + + Args: + results: 分析结果列表 + total: 总数 + with_signals: 有信号的股票列表 + notified: 已通知的股票列表 + buy_count: 做多信号数量 + sell_count: 做空信号数量 + high_quality_signals: 达到阈值的高等级信号列表 + all_signals: 所有信号列表 + threshold: 通知阈值 + """ + try: + from datetime import datetime + feishu = get_feishu_service() + telegram = get_telegram_service() + + now = datetime.now() + + # 构建飞书汇总内容 + content_parts = [ + f"**📊 美股分析汇总报告**", + f"", + f"⏰ 时间: {now.strftime('%Y-%m-%d %H:%M')}", + f"", + f"📊 **分析概况**", + f"• 分析总数: {total} 只", + f"• 发现信号: {len(with_signals)} 只", + f"• 已发通知: {len(notified)} 只", + f"• 通知阈值: {threshold:.0f}%", + f"", + ] + + # 高等级信号(达到阈值的) + if high_quality_signals: + content_parts.append(f"⭐ **高等级信号 (A/B级 ≥ {threshold:.0f}%)**") + for sig in high_quality_signals[:5]: + symbol = sig['symbol'] + action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' + grade = sig.get('grade', 'D') + confidence = sig.get('confidence', 0) + content_parts.append(f"• {symbol} {action} {grade}级 {confidence}%") + content_parts.append(f"") + + # 信号统计 + content_parts.extend([ + f"📈 做多信号: {buy_count} 个", + f"📉 做空信号: {sell_count} 个", + f"", + f"*⚠️ 仅供参考,不构成投资建议*" + ]) + + content = "\n".join(content_parts) + + # 发送飞书 + title = f"📊 美股分析汇总 ({now.strftime('%H:%M')})" + color = "blue" + + await feishu.send_card(title, content, color) + + # 发送 Telegram + telegram_msg = f"📊 *美股分析汇总*\n\n" + telegram_msg += f"时间: {now.strftime('%H:%M')}\n" + telegram_msg += f"分析: {total}只 | 信号: {len(with_signals)}只 | 通知: {len(notified)}只\n" + telegram_msg += f"阈值: {threshold:.0f}%\n\n" + + if high_quality_signals: + telegram_msg += f"⭐ *高等级信号 (≥{threshold:.0f}%)*\n" + for sig in high_quality_signals[:5]: + symbol = sig['symbol'] + action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' + grade = sig.get('grade', 'D') + confidence = sig.get('confidence', 0) + telegram_msg += f"{symbol} {action} {grade}级 {confidence}%\n" + telegram_msg += "\n" + + telegram_msg += f"做多: {buy_count} | 做空: {sell_count}" + + await telegram.send_message(telegram_msg) + + print(f"\n📬 汇总报告已发送到飞书和Telegram") + + except Exception as e: + print(f"❌ 发送汇总通知失败: {e}") + import traceback + traceback.print_exc() async def main(): @@ -167,8 +380,14 @@ async def main(): print(f"股票: {', '.join(symbols)}") print(f"时间: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + # 收集所有分析结果 + results = [] for symbol in symbols: - await analyze(symbol.upper()) + result = await analyze(symbol.upper(), send_notification=True) + results.append(result) + + # 生成汇总报告并发送通知 + print_summary_report(results, send_notification=True) print("\n" + "="*60) print("✅ 分析完成")