"""
Crypto trading agent - main controller (LLM-driven edition).

Every five minutes the agent pulls multi-timeframe market data for each
configured symbol, asks the LLM analyzer for trading signals, pushes the best
signal to Feishu and Telegram, and (optionally) mirrors it as a paper-trading
order whose fills/closes are reported from real-time price updates.
"""
import asyncio
import math
import traceback
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta

import pandas as pd

from app.utils.logger import logger
from app.config import get_settings
from app.services.binance_service import binance_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
from app.services.paper_trading_service import get_paper_trading_service
from app.services.price_monitor_service import get_price_monitor_service
from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer


class CryptoAgent:
    """LLM-driven crypto trading signal agent."""

    # Minutes during which a same-direction signal for a symbol is suppressed.
    SIGNAL_COOLDOWN_MINUTES = 30
    # Minimum number of candles required per timeframe before analysing.
    MIN_CANDLES_PER_INTERVAL = 20
    # Timeframes that must be present in the market data.
    REQUIRED_INTERVALS = ('5m', '15m', '1h', '4h')

    def __init__(self):
        """Wire up services, paper trading (if enabled) and runtime state."""
        self.settings = get_settings()
        self.binance = binance_service
        self.feishu = get_feishu_service()
        self.telegram = get_telegram_service()
        self.llm_analyzer = LLMSignalAnalyzer()

        # Paper-trading services are only created when the feature is enabled.
        self.paper_trading_enabled = self.settings.paper_trading_enabled
        if self.paper_trading_enabled:
            self.paper_trading = get_paper_trading_service()
            self.price_monitor = get_price_monitor_service()
            self.price_monitor.add_price_callback(self._on_price_update)
        else:
            self.paper_trading = None
            self.price_monitor = None

        # State: last signal sent per symbol and when it was sent.
        self.last_signals: Dict[str, Dict[str, Any]] = {}
        self.signal_cooldown: Dict[str, datetime] = {}

        # Configuration: tolerate "BTCUSDT, ETHUSDT," style values by trimming
        # whitespace and dropping empty entries.
        self.symbols: List[str] = [
            item.strip()
            for item in self.settings.crypto_symbols.split(',')
            if item.strip()
        ]

        # Runtime flags; the loop is captured in run() so that price callbacks
        # can schedule notification coroutines onto it.
        self.running = False
        self._event_loop: Optional[asyncio.AbstractEventLoop] = None

        logger.info(f"加密货币智能体初始化完成(LLM 驱动),监控交易对: {self.symbols}")
        if self.paper_trading_enabled:
            logger.info("模拟交易已启用")

    # ------------------------------------------------------------------
    # Notification helpers
    # ------------------------------------------------------------------

    async def _safe_send(self, channel: str, send, *args) -> bool:
        """Call one notification sender, logging instead of raising on failure.

        Args:
            channel: Human-readable channel name used in the error log.
            send: Async callable of a notification service.
            *args: Positional arguments forwarded to ``send``.

        Returns:
            True when ``send`` completed without raising, False otherwise.
        """
        try:
            await send(*args)
            return True
        except asyncio.CancelledError:
            # Never swallow task cancellation (it is an Exception on 3.7).
            raise
        except Exception as e:
            logger.error(f"❌ {channel} 通知发送失败: {e}")
            return False

    async def _broadcast_text(self, message: str) -> bool:
        """Send a text message to Feishu and Telegram independently.

        A failure on one channel does not prevent delivery on the other.

        Returns:
            True if at least one channel accepted the message.
        """
        feishu_ok = await self._safe_send("飞书", self.feishu.send_text, message)
        telegram_ok = await self._safe_send("Telegram", self.telegram.send_message, message)
        return feishu_ok or telegram_ok

    @staticmethod
    def _log_notification_failure(future) -> None:
        """Done-callback that surfaces errors from scheduled notification coroutines.

        Without it, an exception raised inside a coroutine submitted through
        ``asyncio.run_coroutine_threadsafe`` stays buried in the unread future.
        """
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(f"❌ 通知任务执行失败: {error!r}")

    def _on_price_update(self, symbol: str, price: float):
        """Handle a real-time price tick (paper trading only).

        Asks the paper-trading service which orders were triggered by the new
        price and schedules the matching notification coroutine on the agent's
        event loop. NOTE(review): ``run_coroutine_threadsafe`` implies this
        callback is invoked from another thread - confirm against the price
        monitor implementation.
        """
        if not self.paper_trading:
            return

        triggered = self.paper_trading.check_price_triggers(symbol, price) or []
        for result in triggered:
            loop = self._event_loop
            if not (loop and loop.is_running()):
                logger.warning("无法发送通知: 事件循环不可用")
                continue

            # Pick the notifier that matches the event type.
            event_type = result.get('event_type', 'order_closed')
            if event_type == 'order_filled':
                coro = self._notify_order_filled(result)
            else:
                coro = self._notify_order_closed(result)

            future = asyncio.run_coroutine_threadsafe(coro, loop)
            future.add_done_callback(self._log_notification_failure)

    async def _notify_order_filled(self, result: Dict[str, Any]):
        """Notify that a pending (limit) order has been filled."""
        side_text = "做多" if result.get('side') == 'long' else "做空"
        grade = result.get('signal_grade', 'N/A')

        message = "\n".join([
            "✅ 挂单成交",
            f"交易对: {result.get('symbol')}",
            f"方向: {side_text}",
            f"等级: {grade}",
            f"挂单价: ${result.get('entry_price', 0):,.2f}",
            f"成交价: ${result.get('filled_price', 0):,.2f}",
            f"仓位: ${result.get('quantity', 0):,.0f}",
            f"止损: ${result.get('stop_loss', 0):,.2f}",
            f"止盈: ${result.get('take_profit', 0):,.2f}",
        ])

        if await self._broadcast_text(message):
            logger.info(f"已发送挂单成交通知: {result.get('order_id')}")

    async def _notify_pending_cancelled(self, result: Dict[str, Any]):
        """Notify that a pending order was cancelled by an opposite signal."""
        side_text = "做多" if result.get('side') == 'long' else "做空"
        new_side_text = "做多" if result.get('new_side') == 'long' else "做空"

        message = "\n".join([
            "⚠️ 挂单撤销",
            f"交易对: {result.get('symbol')}",
            f"原方向: {side_text}",
            f"挂单价: ${result.get('entry_price', 0):,.2f}",
            f"原因: 收到反向{new_side_text}信号,自动撤销",
        ])

        if await self._broadcast_text(message):
            logger.info(f"已发送挂单撤销通知: {result.get('order_id')}")

    async def _notify_order_closed(self, result: Dict[str, Any]):
        """Notify that an order was closed (take-profit, stop-loss or manual)."""
        status = result.get('status', '')
        is_win = result.get('is_win', False)

        # Anything other than TP/SL is reported as a manual close.
        emoji, status_text = {
            'closed_tp': ("🎯", "止盈平仓"),
            'closed_sl': ("🛑", "止损平仓"),
        }.get(status, ("📤", "手动平仓"))

        win_text = "盈利" if is_win else "亏损"
        side_text = "做多" if result.get('side') == 'long' else "做空"

        message = "\n".join([
            f"{emoji} 订单{status_text}",
            f"交易对: {result.get('symbol')}",
            f"方向: {side_text}",
            f"入场: ${result.get('entry_price', 0):,.2f}",
            f"出场: ${result.get('exit_price', 0):,.2f}",
            f"{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})",
            f"持仓时间: {result.get('hold_duration', 'N/A')}",
        ])

        if await self._broadcast_text(message):
            logger.info(f"已发送订单平仓通知: {result.get('order_id')}")

    # ------------------------------------------------------------------
    # Scheduling / lifecycle
    # ------------------------------------------------------------------

    def _get_seconds_until_next_5min(self) -> int:
        """Return whole seconds until the next wall-clock 5-minute boundary.

        Returns 0 only when the current time is exactly on a boundary
        (minute divisible by 5 and second == 0); sub-second precision is
        ignored.
        """
        now = datetime.now()
        elapsed_in_slot = (now.minute % 5) * 60 + now.second
        if elapsed_in_slot == 0:
            return 0
        return 5 * 60 - elapsed_in_slot

    async def run(self):
        """Main loop: analyse every configured symbol on each 5-minute boundary.

        Runs until ``stop()`` clears ``self.running``. Errors inside a cycle are
        logged and the loop resumes after a 10 second pause.
        """
        self.running = True
        # We are inside a coroutine, so the running loop is the one that price
        # callbacks must schedule their notifications on.
        self._event_loop = asyncio.get_running_loop()

        # Startup banner
        logger.info("\n" + "=" * 60)
        logger.info("🚀 加密货币交易信号智能体(LLM 驱动)")
        logger.info("=" * 60)
        logger.info(f" 监控交易对: {', '.join(self.symbols)}")
        logger.info(" 运行模式: 每5分钟整点执行")
        logger.info(" 分析引擎: LLM 自主分析")
        if self.paper_trading_enabled:
            logger.info(" 模拟交易: 已启用")
        logger.info("=" * 60 + "\n")

        # Start real-time price monitoring for paper trading.
        if self.paper_trading_enabled and self.price_monitor:
            for symbol in self.symbols:
                self.price_monitor.subscribe_symbol(symbol)
            logger.info(f"已启动 WebSocket 价格监控: {', '.join(self.symbols)}")

        # Startup notifications are best-effort: a chat outage must not keep
        # the agent from starting.
        await self._safe_send(
            "飞书",
            self.feishu.send_text,
            f"🚀 加密货币智能体已启动(LLM 驱动)\n"
            f"监控交易对: {', '.join(self.symbols)}\n"
            f"运行时间: 每5分钟整点",
        )
        await self._safe_send(
            "Telegram", self.telegram.send_startup_notification, self.symbols
        )

        while self.running:
            try:
                wait_seconds = self._get_seconds_until_next_5min()
                if wait_seconds > 0:
                    next_run = datetime.now() + timedelta(seconds=wait_seconds)
                    logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
                    await asyncio.sleep(wait_seconds)
                    # stop() may have been called while we were sleeping.
                    if not self.running:
                        break

                run_time = datetime.now()
                logger.info("\n" + "=" * 60)
                logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
                logger.info("=" * 60)

                for symbol in self.symbols:
                    await self.analyze_symbol(symbol)

                logger.info("\n" + "─" * 60)
                logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
                logger.info("─" * 60 + "\n")

                # Step past the boundary second so the same slot is not re-run.
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"❌ 分析循环出错: {e}")
                logger.error(traceback.format_exc())
                await asyncio.sleep(10)

    def stop(self):
        """Request the main loop to stop and shut down the price monitor."""
        self.running = False
        if self.price_monitor:
            self.price_monitor.stop()
        logger.info("加密货币智能体已停止")

    # ------------------------------------------------------------------
    # Analysis
    # ------------------------------------------------------------------

    async def analyze_symbol(self, symbol: str):
        """Analyse one trading pair with the LLM and act on the best signal.

        Steps: fetch multi-timeframe data, run the LLM analysis, log the
        outcome, send the best signal to the notification channels and, when
        paper trading is enabled, create a simulated order. All exceptions are
        caught and logged so one symbol cannot break the cycle.

        Args:
            symbol: Trading pair, e.g. 'BTCUSDT'.
        """
        try:
            logger.info(f"\n{'─' * 50}")
            logger.info(f"📊 {symbol} 分析开始")
            logger.info(f"{'─' * 50}")

            # 1. Fetch multi-timeframe data
            data = self.binance.get_multi_timeframe_data(symbol)
            if not self._validate_data(data):
                logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
                return

            current_price = float(data['5m'].iloc[-1]['close'])
            price_change_24h = self._calculate_price_change(data['1h'])
            logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")

            # 2. LLM analysis (includes news sentiment)
            logger.info("\n🤖 【LLM 分析中...】")
            result = await self.llm_analyzer.analyze(symbol, data, symbols=self.symbols)
            self._log_analysis_overview(result)

            # 3. Handle signals
            signals = result.get('signals', [])
            if not signals:
                logger.info("\n⏸️ 结论: 无交易信号,继续观望")
                return
            self._log_signals(signals)

            # 4. Pick the best signal and notify
            best_signal = self.llm_analyzer.get_best_signal(result)
            if not best_signal:
                return
            if not self._should_send_signal(symbol, best_signal):
                logger.info("\n⏸️ 信号冷却中或置信度不足,不发送通知")
                return

            if not await self._deliver_signal(symbol, best_signal):
                # Nothing reached any channel: leave the cooldown state alone so
                # the next cycle can try again.
                logger.error(f"❌ {symbol} 所有通知渠道发送失败,下轮重试")
                return
            logger.info(" ✅ 已发送信号通知")

            self.last_signals[symbol] = best_signal
            self.signal_cooldown[symbol] = datetime.now()

            # 5. Create the paper-trading order
            await self._create_paper_order(symbol, best_signal, current_price)

        except Exception as e:
            logger.error(f"❌ 分析 {symbol} 出错: {e}")
            logger.error(traceback.format_exc())

    def _log_analysis_overview(self, result: Dict[str, Any]) -> None:
        """Log the summary, news sentiment and key price levels of an analysis."""
        summary = result.get('analysis_summary', '无')
        logger.info(f" 市场状态: {summary}")

        news_sentiment = result.get('news_sentiment', '')
        news_impact = result.get('news_impact', '')
        if news_sentiment:
            sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}.get(news_sentiment, '')
            logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
        if news_impact:
            logger.info(f" 消息影响: {news_impact}")

        # `or` guards against explicit nulls in the LLM output.
        levels = result.get('key_levels') or {}
        supports = levels.get('support') or []
        resistances = levels.get('resistance') or []
        if supports or resistances:
            support_str = ', '.join(f"${s:,.2f}" for s in supports[:2])
            resistance_str = ', '.join(f"${r:,.2f}" for r in resistances[:2])
            logger.info(f" 支撑位: {support_str or '-'}")
            logger.info(f" 阻力位: {resistance_str or '-'}")

    def _log_signals(self, signals: List[Dict[str, Any]]) -> None:
        """Log every signal returned by the LLM in a readable form."""
        type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
        action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
        grade_icons = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}

        logger.info(f"\n🎯 【发现 {len(signals)} 个信号】")
        for sig in signals:
            signal_type = sig.get('type', 'unknown')
            type_text = type_map.get(signal_type, signal_type)
            action = sig.get('action', 'wait')
            action_text = action_map.get(action, action)
            grade = sig.get('grade', 'D')
            confidence = sig.get('confidence', 0)
            grade_icon = grade_icons.get(grade, '')
            # A null reason must not abort the whole analysis.
            reason = str(sig.get('reason') or '无')

            logger.info(f"\n [{type_text}] {action_text}")
            logger.info(f" 等级: {grade} {grade_icon} | 置信度: {confidence}%")
            logger.info(f" 入场: ${sig.get('entry_price', 0):,.2f} | "
                        f"止损: ${sig.get('stop_loss', 0):,.2f} | "
                        f"止盈: ${sig.get('take_profit', 0):,.2f}")
            logger.info(f" 原因: {reason[:100]}")
            if sig.get('risk_warning'):
                logger.info(f" 风险: {sig.get('risk_warning')}")

    async def _deliver_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
        """Send a signal as a Feishu card and a Telegram text message.

        Each channel is attempted independently.

        Returns:
            True if at least one channel accepted the notification.
        """
        logger.info("\n📤 【发送通知】")

        telegram_message = self.llm_analyzer.format_signal_message(signal, symbol)
        feishu_card = self.llm_analyzer.format_feishu_card(signal, symbol)

        feishu_ok = await self._safe_send(
            "飞书",
            self.feishu.send_card,
            feishu_card['title'],
            feishu_card['content'],
            feishu_card['color'],
        )
        telegram_ok = await self._safe_send(
            "Telegram", self.telegram.send_message, telegram_message
        )
        return feishu_ok or telegram_ok

    async def _create_paper_order(self, symbol: str, signal: Dict[str, Any],
                                  current_price: float) -> None:
        """Mirror a sent signal as a paper-trading order (grades A-C only)."""
        if not (self.paper_trading_enabled and self.paper_trading):
            return
        if signal.get('grade', 'D') == 'D':
            return

        paper_signal = self._convert_to_paper_signal(symbol, signal, current_price)
        order_result = self.paper_trading.create_order_from_signal(paper_signal) or {}

        # Report pending orders that the new signal caused to be cancelled.
        for cancelled in order_result.get('cancelled_orders', []):
            await self._notify_pending_cancelled(cancelled)

        order = order_result.get('order')
        if order:
            logger.info(f" 📝 已创建模拟订单: {order.order_id}")

    def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
                                 current_price: float) -> Dict[str, Any]:
        """Convert an LLM signal into the dict consumed by paper trading.

        Args:
            symbol: Trading pair the signal belongs to.
            signal: Signal dict produced by the LLM analyzer.
            current_price: Latest price, used when the signal has no entry price.

        Returns:
            A new dict with the paper-trading field names.
        """
        signal_type = signal.get('type', 'medium_term')
        type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}

        entry_type = signal.get('entry_type', 'market')
        entry_price = signal.get('entry_price')
        if entry_price is None:
            # Missing or null entry price: fall back to the current price.
            entry_price = current_price

        return {
            'symbol': symbol,
            'action': signal.get('action', 'hold'),
            'entry_type': entry_type,      # 'market' or 'limit'
            'entry_price': entry_price,    # entry (pending order) price
            'price': current_price,        # current market price
            'stop_loss': signal.get('stop_loss', 0),
            'take_profit': signal.get('take_profit', 0),
            'confidence': signal.get('confidence', 0),
            'signal_grade': signal.get('grade', 'D'),
            'signal_type': type_map.get(signal_type, 'swing'),
            'reasons': [signal.get('reason', '')],
            'timestamp': datetime.now()
        }

    def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
        """Return the ~24h price change as a signed percentage string.

        Args:
            h1_data: Hourly candles with a 'close' column, oldest first.

        Returns:
            e.g. "+1.23%" / "-0.45%", or "N/A" when fewer than 24 candles are
            available or the reference price is zero / not finite.
        """
        rows = len(h1_data)
        if rows < 24:
            return "N/A"

        # Closes are one hour apart, so the close 24h before the latest one is
        # 24 rows back (index -25). With exactly 24 rows use the oldest one.
        lookback = 25 if rows >= 25 else 24
        price_now = float(h1_data.iloc[-1]['close'])
        price_24h_ago = float(h1_data.iloc[-lookback]['close'])
        if price_24h_ago == 0:
            return "N/A"

        change = ((price_now - price_24h_ago) / price_24h_ago) * 100
        if not math.isfinite(change):
            return "N/A"
        return f"{change:+.2f}%"

    def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
        """Check that every required timeframe is present with enough candles.

        Returns:
            False when ``data`` is empty/None, or any required interval is
            missing, None, empty or shorter than MIN_CANDLES_PER_INTERVAL rows.
        """
        if not data:
            return False
        for interval in self.REQUIRED_INTERVALS:
            frame = data.get(interval)
            if frame is None or frame.empty:
                return False
            if len(frame) < self.MIN_CANDLES_PER_INTERVAL:
                return False
        return True

    def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
        """Decide whether a signal is worth notifying.

        A signal is rejected when its action is 'wait', when its confidence is
        below the configured threshold, or when a signal with the same action
        was already sent for this symbol within the cooldown window.
        """
        action = signal.get('action', 'wait')
        if action == 'wait':
            return False

        # The configured threshold is a fraction; confidence is a percentage.
        confidence = signal.get('confidence') or 0
        threshold = self.settings.crypto_llm_threshold * 100
        if confidence < threshold:
            return False

        # Cooldown: do not repeat a same-direction signal within the window.
        last_sent = self.signal_cooldown.get(symbol)
        if last_sent is not None:
            cooldown_end = last_sent + timedelta(minutes=self.SIGNAL_COOLDOWN_MINUTES)
            if datetime.now() < cooldown_end:
                previous = self.last_signals.get(symbol)
                if previous is not None and previous.get('action') == action:
                    logger.debug(f"{symbol} 信号冷却中,跳过")
                    return False

        return True

    async def analyze_once(self, symbol: str) -> Dict[str, Any]:
        """Run a single analysis (for tests or manual triggering).

        Returns:
            The raw LLM analysis result, or ``{'error': ...}`` when the market
            data is incomplete. No notifications are sent.
        """
        data = self.binance.get_multi_timeframe_data(symbol)
        if not self._validate_data(data):
            return {'error': '数据不完整'}

        return await self.llm_analyzer.analyze(symbol, data, symbols=self.symbols)

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the agent state (running flag, symbols, last signals)."""
        return {
            'running': self.running,
            'symbols': self.symbols,
            'mode': 'LLM 驱动',
            'last_signals': {
                symbol: {
                    'type': sig.get('type'),
                    'action': sig.get('action'),
                    'confidence': sig.get('confidence'),
                    'grade': sig.get('grade')
                }
                for symbol, sig in self.last_signals.items()
            }
        }


# Global instance
crypto_agent = CryptoAgent()