""" 加密货币交易智能体 - 主控制器 """ import asyncio from typing import Dict, Any, List, Optional from datetime import datetime, timedelta import pandas as pd from app.utils.logger import logger from app.config import get_settings from app.services.binance_service import binance_service from app.services.feishu_service import get_feishu_service from app.crypto_agent.signal_analyzer import SignalAnalyzer from app.crypto_agent.strategy import TrendFollowingStrategy class CryptoAgent: """加密货币交易信号智能体""" def __init__(self): """初始化智能体""" self.settings = get_settings() self.binance = binance_service self.feishu = get_feishu_service() self.analyzer = SignalAnalyzer() self.strategy = TrendFollowingStrategy() # 状态管理 self.last_signals: Dict[str, Dict[str, Any]] = {} # 上次信号 self.last_trends: Dict[str, str] = {} # 上次趋势 self.signal_cooldown: Dict[str, datetime] = {} # 信号冷却时间 # 配置 self.symbols = self.settings.crypto_symbols.split(',') self.analysis_interval = self.settings.crypto_analysis_interval self.llm_threshold = self.settings.crypto_llm_threshold # 运行状态 self.running = False logger.info(f"加密货币智能体初始化完成,监控交易对: {self.symbols}") def _get_seconds_until_next_5min(self) -> int: """计算距离下一个5分钟整点的秒数""" now = datetime.now() current_minute = now.minute current_second = now.second # 计算下一个5的倍数分钟 minutes_past = current_minute % 5 if minutes_past == 0 and current_second == 0: # 刚好在整点,立即执行 return 0 minutes_to_wait = 5 - minutes_past if minutes_past > 0 else 5 seconds_to_wait = minutes_to_wait * 60 - current_second return seconds_to_wait async def run(self): """主运行循环 - 在5的倍数分钟执行""" self.running = True logger.info("加密货币智能体开始运行...") # 发送启动通知 await self.feishu.send_text( f"🚀 加密货币智能体已启动\n" f"监控交易对: {', '.join(self.symbols)}\n" f"运行时间: 每5分钟整点 (:00, :05, :10, ...)" ) while self.running: try: # 等待到下一个5分钟整点 wait_seconds = self._get_seconds_until_next_5min() if wait_seconds > 0: next_run = datetime.now() + timedelta(seconds=wait_seconds) logger.info(f"等待 {wait_seconds} 秒,下次运行时间: {next_run.strftime('%H:%M:%S')}") await asyncio.sleep(wait_seconds) # 执行分析 run_time = datetime.now() logger.info(f"=== 开始分析 [{run_time.strftime('%H:%M:%S')}] ===") for symbol in self.symbols: await self.analyze_symbol(symbol) # 等待几秒确保不会在同一分钟内重复执行 await asyncio.sleep(2) except Exception as e: logger.error(f"分析循环出错: {e}") await asyncio.sleep(10) # 出错后等待10秒再继续 def stop(self): """停止运行""" self.running = False logger.info("加密货币智能体已停止") async def analyze_symbol(self, symbol: str): """ 分析单个交易对 Args: symbol: 交易对,如 'BTCUSDT' """ try: logger.info(f"开始分析 {symbol}...") # 1. 获取多周期数据 data = self.binance.get_multi_timeframe_data(symbol) if not self._validate_data(data): logger.warning(f"{symbol} 数据不完整,跳过分析") return # 2. 分析趋势(1H + 4H)- 返回详细趋势信息 trend = self.analyzer.analyze_trend(data['1h'], data['4h']) trend_direction = trend.get('direction', 'neutral') if isinstance(trend, dict) else trend # 3. 检查趋势变化 last_direction = self.last_trends.get(symbol, {}) if isinstance(last_direction, dict): last_direction = last_direction.get('direction', 'neutral') if last_direction and last_direction != trend_direction: await self._handle_trend_change(symbol, last_direction, trend_direction, data) self.last_trends[symbol] = trend # 4. 分析进场信号(15M 为主,5M 辅助) signal = self.analyzer.analyze_entry_signal(data['5m'], data['15m'], trend) signal['symbol'] = symbol signal['trend'] = trend_direction signal['trend_info'] = trend if isinstance(trend, dict) else {'direction': trend} signal['price'] = float(data['5m'].iloc[-1]['close']) signal['timestamp'] = datetime.now() # 5. 检查是否需要发送信号 if self._should_send_signal(symbol, signal): # 6. 计算止损止盈 atr = float(data['15m'].iloc[-1].get('atr', 0)) if atr > 0: sl_tp = self.analyzer.calculate_stop_loss_take_profit( signal['price'], signal['action'], atr ) signal.update(sl_tp) # 7. LLM 深度分析(置信度超过阈值时) if signal['confidence'] >= self.llm_threshold * 100: llm_result = await self.analyzer.llm_analyze(data, signal, symbol) # 处理 LLM 分析结果 if llm_result.get('parsed'): parsed = llm_result['parsed'] # 新格式使用 signal 而不是 recommendation recommendation = parsed.get('signal', parsed.get('recommendation', {})) # 如果 LLM 建议观望,降低置信度 if recommendation.get('action') == 'wait': signal['confidence'] = min(signal['confidence'], 40) signal['llm_analysis'] = llm_result.get('summary', 'LLM 建议观望') else: # 使用 LLM 的止损止盈建议 if recommendation.get('stop_loss'): signal['stop_loss'] = recommendation['stop_loss'] if recommendation.get('targets'): signal['take_profit'] = recommendation['targets'][0] elif recommendation.get('take_profit'): signal['take_profit'] = recommendation['take_profit'] signal['llm_analysis'] = llm_result.get('summary', '') else: signal['llm_analysis'] = llm_result.get('summary', llm_result.get('raw', '')[:200]) # 8. 发送飞书通知(置信度仍然足够高时) if signal['confidence'] >= 50: await self.feishu.send_trading_signal(signal) # 9. 更新状态 self.last_signals[symbol] = signal self.signal_cooldown[symbol] = datetime.now() logger.info(f"{symbol} 发送{signal['action']}信号,置信度: {signal['confidence']}%") except Exception as e: logger.error(f"分析 {symbol} 出错: {e}") def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool: """验证数据完整性""" required_intervals = ['5m', '15m', '1h', '4h'] for interval in required_intervals: if interval not in data or data[interval].empty: return False if len(data[interval]) < 20: # 至少需要20条数据 return False return True def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool: """ 判断是否应该发送信号 Args: symbol: 交易对 signal: 信号数据 Returns: 是否发送 """ # 如果是观望,不发送 if signal['action'] == 'hold': return False # 置信度太低,不发送 if signal['confidence'] < 50: return False # 检查冷却时间(同一交易对30分钟内不重复发送相同方向的信号) if symbol in self.signal_cooldown: cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=30) if datetime.now() < cooldown_end: # 检查是否是相同方向的信号 if symbol in self.last_signals: if self.last_signals[symbol]['action'] == signal['action']: logger.debug(f"{symbol} 信号冷却中,跳过") return False return True async def _handle_trend_change(self, symbol: str, old_trend: str, new_trend: str, data: Dict[str, pd.DataFrame]): """处理趋势变化""" price = float(data['1h'].iloc[-1]['close']) await self.feishu.send_trend_change(symbol, old_trend, new_trend, price) logger.info(f"{symbol} 趋势变化: {old_trend} -> {new_trend}") async def analyze_once(self, symbol: str) -> Dict[str, Any]: """ 单次分析(用于测试或手动触发) Args: symbol: 交易对 Returns: 分析结果 """ data = self.binance.get_multi_timeframe_data(symbol) if not self._validate_data(data): return {'error': '数据不完整'} trend = self.analyzer.analyze_trend(data['1h'], data['4h']) signal = self.analyzer.analyze_entry_signal(data['5m'], data['15m'], trend) signal['symbol'] = symbol signal['trend'] = trend signal['price'] = float(data['5m'].iloc[-1]['close']) # 计算止损止盈 atr = float(data['15m'].iloc[-1].get('atr', 0)) if atr > 0: sl_tp = self.analyzer.calculate_stop_loss_take_profit( signal['price'], signal['action'], atr ) signal.update(sl_tp) return signal def get_status(self) -> Dict[str, Any]: """获取智能体状态""" return { 'running': self.running, 'symbols': self.symbols, 'analysis_interval': self.analysis_interval, 'last_signals': { symbol: { 'action': sig.get('action'), 'confidence': sig.get('confidence'), 'timestamp': sig.get('timestamp').isoformat() if sig.get('timestamp') else None } for symbol, sig in self.last_signals.items() }, 'last_trends': self.last_trends } # 全局实例 crypto_agent = CryptoAgent()