""" 美股交易智能体 - 主控制器(LLM 驱动版) 只进行市场分析和通知,不执行模拟交易 """ import asyncio from typing import Dict, Any, List, Optional from datetime import datetime, timedelta import pandas as pd from app.utils.logger import logger from app.config import get_settings from app.services.yfinance_service import get_yfinance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service from app.services.signal_database_service import get_signal_db_service from app.services.fundamental_service import get_fundamental_service from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer # 股票名称映射表 STOCK_NAMES = { # 美股 - 科技龙头 'AAPL': '苹果', 'MSFT': '微软', 'GOOGL': '谷歌', 'META': 'Meta', 'AMZN': '亚马逊', 'NVDA': '英伟达', 'AMD': 'AMD', 'AVGO': '博通', 'ARM': 'ARM', 'PLTR': 'Palantir', 'SNOW': 'Snowflake', # 美股 - 生物医疗 'LLY': '礼来', 'NVO': '诺和诺德', 'VRTX': 'Vertex', # 美股 - 新能源/汽车 'TSLA': '特斯拉', 'ENPH': 'Enphase', # 美股 - 金融 'V': 'Visa', 'MA': 'Mastercard', # 美股 - 消费 'HD': 'Home Depot', 'COST': 'Costco', # 美股 - 其他 'RKLB': 'Relativity Space', 'HOOD': 'Robinhood', 'DXYZ': 'DEX', 'GLW': '康宁', 'UNTY': 'Unity', 'CRM': 'Salesforce', 'ADBE': 'Adobe', 'INTC': '英特尔', 'FSLR': 'First Solar', 'CRWD': 'CrowdStrike', 'SHOP': 'Shopify', 'NET': 'Cloudflare', 'COIN': 'Coinbase', 'MSTR': 'MicroStrategy', # 港股 '0700.HK': '腾讯', '9988.HK': '阿里巴巴', '1810.HK': '小米', '2015.HK': '理想汽车', '9866.HK': '蔚来', '9992.HK': '泡泡玛特', '9626.HK': '哔哩哔哩', '9880.HK': '优必选', } class StockAgent: """美股交易信号智能体(LLM 驱动,仅分析通知)""" def __init__(self): """初始化智能体""" self.settings = get_settings() self.yfinance = get_yfinance_service() self.feishu = get_feishu_service() self.telegram = get_telegram_service() self.llm_analyzer = LLMSignalAnalyzer(agent_type="stock") # 指定使用 stock 模型配置 self.signal_db = get_signal_db_service() # 信号数据库服务 self.fundamental = get_fundamental_service() # 基本面数据服务 # 状态管理 self.last_signals: Dict[str, Dict[str, Any]] = {} self.signal_cooldown: Dict[str, datetime] = {} # 配置 - 分别读取美股和港股 us_symbols = self.settings.stock_symbols_us.split(',') if self.settings.stock_symbols_us else [] hk_symbols = self.settings.stock_symbols_hk.split(',') if self.settings.stock_symbols_hk else [] self.symbols = us_symbols + hk_symbols # 运行状态 self.running = False self._event_loop = None self._task = None logger.info(f"股票智能体初始化完成 - 美股: {len(us_symbols)}只, 港股: {len(hk_symbols)}只, 总计: {len(self.symbols)}只") @staticmethod def get_stock_name(symbol: str) -> str: """获取股票中文名称""" return STOCK_NAMES.get(symbol, symbol) async def start(self): """启动智能体""" if self.running: logger.warning("美股智能体已在运行中") return self.running = True self._event_loop = asyncio.get_event_loop() logger.info("美股智能体已启动") # 启动分析任务 self._task = asyncio.create_task(self._analysis_loop()) async def stop(self): """停止智能体""" self.running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass logger.info("美股智能体已停止") async def _analysis_loop(self): """分析循环 - 根据交易时间分析对应市场的股票""" while self.running: try: # 计算距离下一个整点的时间 now = datetime.now() next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) wait_seconds = (next_hour - now).total_seconds() logger.info(f"等待到下一个整点: {next_hour.strftime('%H:%M')} (等待 {int(wait_seconds)} 秒)") # 等待到整点 await asyncio.sleep(wait_seconds) # 分类股票:美股和港股 us_stocks = [s for s in self.symbols if not s.endswith('.HK')] hk_stocks = [s for s in self.symbols if s.endswith('.HK')] # 检查各市场交易时间 us_market_open = self._is_market_hours('US') hk_market_open = self._is_market_hours('0700.HK') # 检查是否是盘后分析时间 us_after_hours = self._is_after_hours('US') hk_after_hours = self._is_after_hours('0700.HK') # 确定要分析的股票列表 stocks_to_analyze = [] analysis_type = "盘中" # 默认为盘中分析 # 盘后分析:优先级更高,用于日线级别分析 if us_after_hours or hk_after_hours: analysis_type = "盘后" if us_after_hours: stocks_to_analyze.extend(us_stocks) logger.info(f"美股盘后分析,分析 {len(us_stocks)} 只美股(日线级别)") if hk_after_hours: stocks_to_analyze.extend(hk_stocks) logger.info(f"港股盘后分析,分析 {len(hk_stocks)} 只港股(日线级别)") else: # 盘中分析 if us_market_open: stocks_to_analyze.extend(us_stocks) logger.info(f"美股交易时间,分析 {len(us_stocks)} 只美股") if hk_market_open: stocks_to_analyze.extend(hk_stocks) logger.info(f"港股交易时间,分析 {len(hk_stocks)} 只港股") # 如果没有需要分析的股票 if not stocks_to_analyze: logger.debug("没有需要分析的股票") continue # 分析股票并收集结果 logger.info(f"开始{analysis_type}分析 {len(stocks_to_analyze)} 只股票") analysis_results = [] for symbol in stocks_to_analyze: if not self.running: break result = await self.analyze_symbol(symbol, is_after_hours=(analysis_type == "盘后")) if result: analysis_results.append(result) # 生成并发送汇总报告 await self._send_summary_report(analysis_results, analysis_type) logger.info(f"本次{analysis_type}分析完成") except Exception as e: logger.error(f"分析循环出错: {e}") await asyncio.sleep(60) # 出错后等待 1 分钟再重试 def _is_market_hours(self, symbol: str = None) -> bool: """ 判断当前是否在交易时间 美股交易时间: 周一至周五 9:30-16:00 (EST) 北京时间: - 冬令时 (11月-3月): 22:30-05:00 (次日) - 夏令时 (3月-11月): 21:30-04:00 (次日) 港股交易时间: 周一至周五 北京时间: - 上午: 09:30-12:00 - 下午: 13:00-16:00 Args: symbol: 股票代码(用于判断是美股还是港股) Returns: 是否在交易时间 """ from datetime import datetime # 获取当前时间 now = datetime.now() # 检查是否为周末 if now.weekday() >= 5: # 5=周六, 6=周日 return False # 判断是港股还是美股 is_hk_stock = symbol and symbol.endswith('.HK') if symbol else False # 获取当前小时和分钟 hour = now.hour minute = now.minute current_time = hour * 100 + minute # 转换为数字,如 2130 表示 21:30 if is_hk_stock: # 港股交易时间: 09:30-12:00 或 13:00-16:00 return (930 <= current_time < 1200) or (1300 <= current_time < 1600) else: # 美股交易时间 # 判断夏令时/冬令时(简单判断:3-11月为夏令时) is_summer = 3 <= now.month <= 11 if is_summer: # 夏令时: 21:30-04:00 (次日) # 即 2130-2359 或 0000-0400 if current_time >= 2130 or current_time < 400: return True else: # 冬令时: 22:30-05:00 (次日) # 即 2230-2359 或 0000-0500 if current_time >= 2230 or current_time < 500: return True return False def _is_any_market_hours(self) -> bool: """判断当前是否在任一市场的交易时间(美股或港股)""" return self._is_market_hours('US') or self._is_market_hours('0700.HK') def _is_after_hours(self, symbol: str) -> bool: """ 判断当前是否是盘后分析时间(收盘后2小时内) 美股收盘时间: - 夏令时: 北京时间 04:00 收盘 - 冬令时: 北京时间 05:00 收盘 港股收盘时间: 北京时间 16:00 收盘 盘后分析时间: 收盘后 2 小时内 Args: symbol: 股票代码(用于判断是美股还是港股) Returns: 是否是盘后分析时间 """ from datetime import datetime # 获取当前时间 now = datetime.now() # 检查是否为周末 if now.weekday() >= 5: # 5=周六, 6=周日 return False # 判断是港股还是美股 is_hk_stock = symbol and symbol.endswith('.HK') if symbol else False # 获取当前小时和分钟 hour = now.hour minute = now.minute current_time = hour * 100 + minute # 转换为数字,如 1630 表示 16:30 if is_hk_stock: # 港股盘后: 16:00-18:00 (收盘后2小时) return 1600 <= current_time < 1800 else: # 美股盘后 # 判断夏令时/冬令时(简单判断:3-11月为夏令时) is_summer = 3 <= now.month <= 11 if is_summer: # 夏令时: 04:00-06:00 (收盘后2小时) return 400 <= current_time < 600 else: # 冬令时: 05:00-07:00 (收盘后2小时) return 500 <= current_time < 700 return False async def analyze_symbol(self, symbol: str, is_after_hours: bool = False) -> Optional[Dict[str, Any]]: """ 分析单个股票 Args: symbol: 股票代码 is_after_hours: 是否是盘后分析(盘后会更关注日线级别机会) Returns: 分析结果字典,包含股票信息和信号 """ result = { 'symbol': symbol, 'current_price': 0, 'signals': [], 'analysis_summary': '', 'notified': False } try: # 1. 获取多时间周期数据 data = self.yfinance.get_multi_timeframe_data(symbol) # 2. 验证数据完整性 if not self._validate_data(data): logger.warning(f"{symbol} 数据不完整,跳过本次分析") return result # 3. 获取当前价格 ticker = self.yfinance.get_ticker(symbol) if not ticker: logger.warning(f"无法获取 {symbol} 当前价格") return result current_price = ticker['lastPrice'] result['current_price'] = current_price # 获取股票中文名称 stock_name = STOCK_NAMES.get(symbol, '') symbol_display = f"{stock_name}({symbol})" if stock_name else symbol logger.info(f"\n{'='*60}") logger.info(f"📊 分析 {symbol_display} @ ${current_price:,.2f}") logger.info(f"{'='*60}") # 4. 获取基本面数据 logger.info(f"\n📈 【基本面分析】") fundamental_data = None fundamental_summary = "" try: fundamental_data = self.fundamental.get_fundamental_data(symbol) if fundamental_data: # 传递已获取的数据,避免重复调用 fundamental_summary = self.fundamental.get_fundamental_summary(symbol, fundamental_data) # 基本面评分已经在 fundamental_service 中输出 else: logger.warning(f" ⚠️ 无法获取基本面数据") except Exception as e: logger.warning(f" ⚠️ 获取基本面数据失败: {e}") # 5. LLM 分析 logger.info(f"\n🤖 【LLM 分析中...】") analysis = await self.llm_analyzer.analyze( symbol, data, symbols=self.symbols, position_info=None, # 美股不跟踪持仓 fundamental_data=fundamental_data, # 传递基本面数据 fundamental_summary=fundamental_summary # 传递基本面摘要 ) # 输出分析摘要 summary = analysis.get('analysis_summary', '无') result['analysis_summary'] = summary logger.info(f" 市场状态: {summary}") # 输出新闻情绪 news_sentiment = analysis.get('news_sentiment', '') news_impact = analysis.get('news_impact', '') if news_sentiment: sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}.get(news_sentiment, '') logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}") if news_impact: logger.info(f" 消息影响: {news_impact}") # 输出关键价位 levels = analysis.get('key_levels', {}) if levels.get('support') or levels.get('resistance'): support_str = ', '.join([f"${s:,.2f}" for s in levels.get('support', [])[:2]]) resistance_str = ', '.join([f"${r:,.2f}" for r in levels.get('resistance', [])[:2]]) logger.info(f" 支撑位: {support_str or '-'}") logger.info(f" 阻力位: {resistance_str or '-'}") # 5. 处理信号 signals = analysis.get('signals', []) result['signals'] = signals if not signals: logger.info(f"\n⏸️ 结论: 无交易信号,继续观望") return result # 输出所有信号 logger.info(f"\n🎯 【发现 {len(signals)} 个信号】") for sig in signals: signal_type = sig.get('type', 'unknown') type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'} type_text = type_map.get(signal_type, signal_type) action = sig.get('action', 'wait') action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'} action_text = action_map.get(action, action) grade = sig.get('grade', 'D') confidence = sig.get('confidence', 0) grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '') logger.info(f"\n {type_text} {action_text} [{grade}级{grade_icon}] {confidence}%") # 6. 过滤并通知最佳信号 best_signal = self._get_best_signal(signals) if not best_signal: logger.info(f"\n⏸️ 信号质量不高,不发送通知") return result # 检查置信度阈值 threshold = self.settings.stock_llm_threshold * 100 if best_signal.get('confidence', 0) < threshold: logger.info(f"\n⏸️ 置信度不足 ({best_signal.get('confidence', 0)}% < {threshold}%)") return result # 检查冷却时间 if not self._should_send_signal(symbol, best_signal): logger.info(f"\n⏸️ 信号冷却中,不发送通知") return result # 发送通知 await self._send_signal_notification(symbol, best_signal, current_price) result['notified'] = True result['best_signal'] = best_signal # 更新状态 self.last_signals[symbol] = best_signal self.signal_cooldown[symbol] = datetime.now() return result except Exception as e: logger.error(f"❌ 分析 {symbol} 出错: {e}") import traceback logger.error(traceback.format_exc()) return result def _get_best_signal(self, signals: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """获取最佳信号""" # 过滤掉 D 级信号 valid_signals = [s for s in signals if s.get('grade', 'D') != 'D'] if not valid_signals: return None # 按等级和置信度排序 grade_order = {'A': 0, 'B': 1, 'C': 2} valid_signals.sort(key=lambda x: ( grade_order.get(x.get('grade', 'C'), 3), -x.get('confidence', 0) )) return valid_signals[0] def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool: """判断是否应该发送信号""" action = signal.get('action', 'wait') if action == 'wait': return False # 检查冷却时间(60分钟内不重复发送相同方向的信号) if symbol in self.signal_cooldown: cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=60) if datetime.now() < cooldown_end: if symbol in self.last_signals: if self.last_signals[symbol].get('action') == action: logger.debug(f"{symbol} 信号冷却中,跳过") return False return True async def _send_signal_notification( self, symbol: str, signal: Dict[str, Any], current_price: float ): """发送信号通知""" try: # 使用正确的方法格式化信号 card = self.llm_analyzer.format_feishu_card(signal, symbol) title = card['title'] content = card['content'] # 根据信号方向选择颜色 color = "green" if signal.get('action') == 'buy' else "red" # 发送到飞书 await self.feishu.send_card(title, content, color) # 发送到 Telegram await self.telegram.send_message(self.llm_analyzer.format_signal_message(signal, symbol)) logger.info(f"✅ 信号通知已发送: {title}") # 保存信号到数据库 signal_to_save = signal.copy() signal_to_save['signal_type'] = 'stock' signal_to_save['symbol'] = symbol signal_to_save['current_price'] = current_price self.signal_db.add_signal(signal_to_save) except Exception as e: logger.error(f"发送通知失败: {e}") def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool: """验证数据完整性""" required_intervals = ['1d', '1h'] for interval in required_intervals: if interval not in data or data[interval].empty: return False if len(data[interval]) < 20: return False return True async def analyze_once(self, symbol: str) -> Dict[str, Any]: """单次分析(用于测试或手动触发)""" data = self.yfinance.get_multi_timeframe_data(symbol) if not self._validate_data(data): return {'error': '数据不完整'} # 获取基本面数据 fundamental_data = None fundamental_summary = "" try: fundamental_data = self.fundamental.get_fundamental_data(symbol) if fundamental_data: # 传递已获取的数据,避免重复调用 fundamental_summary = self.fundamental.get_fundamental_summary(symbol, fundamental_data) except Exception as e: logger.warning(f"获取基本面数据失败: {e}") result = await self.llm_analyzer.analyze( symbol, data, symbols=self.symbols, position_info=None, fundamental_data=fundamental_data, fundamental_summary=fundamental_summary ) return result def get_status(self) -> Dict[str, Any]: """获取智能体状态""" return { 'running': self.running, 'symbols': self.symbols, 'mode': 'LLM 驱动(仅分析通知)', 'last_signals': { symbol: { 'type': sig.get('type'), 'action': sig.get('action'), 'confidence': sig.get('confidence'), 'grade': sig.get('grade') } for symbol, sig in self.last_signals.items() } } async def _send_summary_report(self, results: List[Dict[str, Any]], analysis_type: str = "盘中"): """ 生成并发送分析汇总报告 Args: results: 所有股票的分析结果列表 analysis_type: 分析类型 ("盘中" 或 "盘后") """ try: now = datetime.now() total = len(results) with_signals = [r for r in results if r.get('signals')] notified = [r for r in results if r.get('notified')] # 区分美股和港股 us_results = [r for r in results if not r['symbol'].endswith('.HK')] hk_results = [r for r in results if r['symbol'].endswith('.HK')] us_with_signals = [r for r in us_results if r.get('signals')] hk_with_signals = [r for r in hk_results if r.get('signals')] # 统计信号 buy_signals = [] sell_signals = [] high_quality_signals = [] # A/B级信号 for r in with_signals: for sig in r.get('signals', []): sig['symbol'] = r['symbol'] sig['current_price'] = r.get('current_price', 0) sig['is_hk'] = r['symbol'].endswith('.HK') sig['stock_name'] = STOCK_NAMES.get(r['symbol'], '') if sig.get('action') == 'buy': buy_signals.append(sig) elif sig.get('action') == 'sell': sell_signals.append(sig) if sig.get('grade') in ['A', 'B']: high_quality_signals.append(sig) # 按置信度排序 high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True) # 构建汇总报告 analysis_tag = f"【{analysis_type}分析】" logger.info(f"\n{'='*80}") logger.info(f"📊 股票分析汇总报告 {analysis_tag}") logger.info(f"{'='*80}") logger.info(f"时间: {now.strftime('%Y-%m-%d %H:%M:%S')}") logger.info(f"分析总数: {total} 只 (美股: {len(us_results)}, 港股: {len(hk_results)})") logger.info(f"有信号: {len(with_signals)} 只 (美股: {len(us_with_signals)}, 港股: {len(hk_with_signals)})") logger.info(f"已通知: {len(notified)} 只") logger.info(f"") # 显示高等级信号 if high_quality_signals: logger.info(f"⭐ 高等级信号 (A/B级): {len(high_quality_signals)} 个") for sig in high_quality_signals[:10]: # 最多显示10个 symbol = sig['symbol'] stock_name = sig.get('stock_name', '') market_tag = '[港股]' if sig.get('is_hk') else '[美股]' action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' grade = sig.get('grade', 'D') confidence = sig.get('confidence', 0) price = sig.get('current_price', 0) entry = sig.get('entry_price', 0) # 构建带名称的股票显示 symbol_display = f"{stock_name}({symbol})" if stock_name else symbol logger.info(f" {market_tag} {symbol_display} {action} [{grade}级] {confidence}% @ ${price:,.2f}") if entry > 0: logger.info(f" 入场: ${entry:,.2f}") logger.info(f"") # 统计汇总 logger.info(f"📈 做多信号: {len(buy_signals)} 个") logger.info(f"📉 做空信号: {len(sell_signals)} 个") logger.info(f"{'='*80}\n") # 发送飞书汇总 await self._send_feishu_summary( now, total, with_signals, notified, buy_signals, sell_signals, high_quality_signals, len(us_results), len(hk_results), analysis_type ) except Exception as e: logger.error(f"生成汇总报告失败: {e}") import traceback logger.error(traceback.format_exc()) async def _send_feishu_summary( self, now: datetime, total: int, with_signals: List, notified: List, buy_signals: List, sell_signals: List, high_quality_signals: List, us_count: int = 0, hk_count: int = 0, analysis_type: str = "盘中" ): """发送飞书汇总报告""" try: # 构建内容 analysis_tag = f"【{analysis_type}分析】" content_parts = [ f"**📊 股票分析汇总报告 {analysis_tag}**", f"", f"⏰ 时间: {now.strftime('%Y-%m-%d %H:%M')}", f"", f"📊 **分析概况**", f"• 美股: {us_count} 只 | 港股: {hk_count} 只", f"• 发现信号: {len(with_signals)} 只", f"• 已发通知: {len(notified)} 只", f"", ] # 高等级信号 if high_quality_signals: content_parts.append(f"⭐ **高等级信号 (A/B级)**") for sig in high_quality_signals[:5]: symbol = sig['symbol'] stock_name = sig.get('stock_name', '') market_tag = '[港股]' if sig.get('is_hk') else '[美股]' action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' grade = sig.get('grade', 'D') confidence = sig.get('confidence', 0) # 构建带名称的股票显示 symbol_display = f"{stock_name}({symbol})" if stock_name else symbol content_parts.append(f"• {market_tag} {symbol_display} {action} {grade}级 {confidence}%") content_parts.append(f"") # 信号统计 content_parts.extend([ f"📈 做多信号: {len(buy_signals)} 个", f"📉 做空信号: {len(sell_signals)} 个", f"", f"*⚠️ 仅供参考,不构成投资建议*" ]) content = "\n".join(content_parts) # 发送飞书 - 标题包含分析类型 type_tag = "盘后" if analysis_type == "盘后" else "分析" title = f"📊 股票{type_tag}汇总 ({now.strftime('%H:%M')})" color = "blue" await self.feishu.send_card(title, content, color) logger.info("✅ 汇总报告已发送到飞书") except Exception as e: logger.error(f"发送飞书汇总失败: {e}") # 全局单例 _stock_agent: Optional[StockAgent] = None def get_stock_agent() -> StockAgent: """获取美股智能体单例""" global _stock_agent if _stock_agent is None: _stock_agent = StockAgent() return _stock_agent