""" 加密货币交易智能体 - 主控制器(LLM 驱动版) """ import asyncio from typing import Dict, Any, List, Optional from datetime import datetime, timedelta import pandas as pd from app.utils.logger import logger from app.config import get_settings from app.services.binance_service import binance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service from app.services.paper_trading_service import get_paper_trading_service from app.services.signal_database_service import get_signal_db_service from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer class CryptoAgent: """加密货币交易信号智能体(LLM 驱动版)""" _instance = None _initialized = False def __new__(cls, *args, **kwargs): """单例模式 - 确保只有一个实例""" if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): """初始化智能体""" # 防止重复初始化 if CryptoAgent._initialized: return CryptoAgent._initialized = True self.settings = get_settings() self.binance = binance_service self.feishu = get_feishu_service() self.telegram = get_telegram_service() self.llm_analyzer = LLMSignalAnalyzer() self.signal_db = get_signal_db_service() # 信号数据库服务 # 模拟交易服务 self.paper_trading_enabled = self.settings.paper_trading_enabled if self.paper_trading_enabled: self.paper_trading = get_paper_trading_service() else: self.paper_trading = None # 状态管理 self.last_signals: Dict[str, Dict[str, Any]] = {} self.signal_cooldown: Dict[str, datetime] = {} # 配置 self.symbols = self.settings.crypto_symbols.split(',') # 运行状态 self.running = False self._event_loop = None logger.info(f"加密货币智能体初始化完成(LLM 驱动),监控交易对: {self.symbols}") if self.paper_trading_enabled: logger.info(f"模拟交易已启用") def _on_price_update(self, symbol: str, price: float): """处理实时价格更新(用于模拟交易)""" if not self.paper_trading: return triggered = self.paper_trading.check_price_triggers(symbol, price) for result in triggered: if self._event_loop and self._event_loop.is_running(): # 根据事件类型选择不同的通知方法 event_type = result.get('event_type', 'order_closed') if event_type == 'order_filled': asyncio.run_coroutine_threadsafe(self._notify_order_filled(result), self._event_loop) elif event_type == 'breakeven_triggered': asyncio.run_coroutine_threadsafe(self._notify_breakeven_triggered(result), self._event_loop) else: asyncio.run_coroutine_threadsafe(self._notify_order_closed(result), self._event_loop) else: logger.warning(f"无法发送通知: 事件循环不可用") async def _notify_order_filled(self, result: Dict[str, Any]): """发送挂单成交通知""" side_text = "做多" if result.get('side') == 'long' else "做空" grade = result.get('signal_grade', 'N/A') message = f"""✅ 挂单成交 交易对: {result.get('symbol')} 方向: {side_text} 等级: {grade} 挂单价: ${result.get('entry_price', 0):,.2f} 成交价: ${result.get('filled_price', 0):,.2f} 仓位: ${result.get('quantity', 0):,.0f} 止损: ${result.get('stop_loss', 0):,.2f} 止盈: ${result.get('take_profit', 0):,.2f}""" await self.feishu.send_text(message) await self.telegram.send_message(message) logger.info(f"已发送挂单成交通知: {result.get('order_id')}") async def _notify_pending_cancelled(self, result: Dict[str, Any]): """发送挂单撤销通知""" side_text = "做多" if result.get('side') == 'long' else "做空" new_side_text = "做多" if result.get('new_side') == 'long' else "做空" message = f"""⚠️ 挂单撤销 交易对: {result.get('symbol')} 原方向: {side_text} 挂单价: ${result.get('entry_price', 0):,.2f} 原因: 收到反向{new_side_text}信号,自动撤销""" await self.feishu.send_text(message) await self.telegram.send_message(message) logger.info(f"已发送挂单撤销通知: {result.get('order_id')}") async def _notify_breakeven_triggered(self, result: Dict[str, Any]): """发送保本止损触发通知""" side_text = "做多" if result.get('side') == 'long' else "做空" message = f"""🛡️ 保本止损已启动 交易对: {result.get('symbol')} 方向: {side_text} 开仓价: ${result.get('filled_price', 0):,.2f} 当前盈利: {result.get('current_pnl_percent', 0):.2f}% 止损已移至: ${result.get('new_stop_loss', 0):,.2f} ✅ 本单已锁定保本,不会亏损""" await self.feishu.send_text(message) await self.telegram.send_message(message) logger.info(f"已发送保本止损通知: {result.get('order_id')}") async def _notify_order_closed(self, result: Dict[str, Any]): """发送订单平仓通知""" status = result.get('status', '') is_win = result.get('is_win', False) if status == 'closed_tp': emoji = "🎯" status_text = "止盈平仓" elif status == 'closed_sl': emoji = "🛑" status_text = "止损平仓" elif status == 'closed_be': emoji = "🔒" status_text = "保本止损" else: emoji = "📤" status_text = "手动平仓" win_text = "盈利" if is_win else "亏损" side_text = "做多" if result.get('side') == 'long' else "做空" message = f"""{emoji} 订单{status_text} 交易对: {result.get('symbol')} 方向: {side_text} 入场: ${result.get('entry_price', 0):,.2f} 出场: ${result.get('exit_price', 0):,.2f} {win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f}) 持仓时间: {result.get('hold_duration', 'N/A')}""" await self.feishu.send_text(message) await self.telegram.send_message(message) logger.info(f"已发送订单平仓通知: {result.get('order_id')}") def _get_seconds_until_next_5min(self) -> int: """计算距离下一个5分钟整点的秒数""" now = datetime.now() current_minute = now.minute current_second = now.second minutes_past = current_minute % 5 if minutes_past == 0 and current_second == 0: return 0 minutes_to_wait = 5 - minutes_past if minutes_past > 0 else 5 seconds_to_wait = minutes_to_wait * 60 - current_second return seconds_to_wait async def run(self): """主运行循环""" self.running = True self._event_loop = asyncio.get_event_loop() # 启动横幅 logger.info("\n" + "=" * 60) logger.info("🚀 加密货币交易信号智能体(LLM 驱动)") logger.info("=" * 60) logger.info(f" 监控交易对: {', '.join(self.symbols)}") logger.info(f" 运行模式: 每5分钟整点执行") logger.info(f" 分析引擎: LLM 自主分析") if self.paper_trading_enabled: logger.info(f" 模拟交易: 已启用") logger.info("=" * 60 + "\n") # 注意:不再启动独立的价格监控 # 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查 if self.paper_trading_enabled: logger.info(f"模拟交易已启用(由后台统一监控)") # 发送启动通知 await self.feishu.send_text( f"🚀 加密货币智能体已启动(LLM 驱动)\n" f"监控交易对: {', '.join(self.symbols)}\n" f"运行时间: 每5分钟整点" ) await self.telegram.send_startup_notification(self.symbols) while self.running: try: wait_seconds = self._get_seconds_until_next_5min() if wait_seconds > 0: next_run = datetime.now() + timedelta(seconds=wait_seconds) logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}") await asyncio.sleep(wait_seconds) run_time = datetime.now() logger.info("\n" + "=" * 60) logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]") logger.info("=" * 60) for symbol in self.symbols: await self.analyze_symbol(symbol) logger.info("\n" + "─" * 60) logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对") logger.info("─" * 60 + "\n") await asyncio.sleep(2) except Exception as e: logger.error(f"❌ 分析循环出错: {e}") import traceback logger.error(traceback.format_exc()) await asyncio.sleep(10) def stop(self): """停止运行""" self.running = False logger.info("加密货币智能体已停止") async def analyze_symbol(self, symbol: str): """ 分析单个交易对(LLM 驱动) Args: symbol: 交易对,如 'BTCUSDT' """ try: logger.info(f"\n{'─' * 50}") logger.info(f"📊 {symbol} 分析开始") logger.info(f"{'─' * 50}") # 1. 获取多周期数据 data = self.binance.get_multi_timeframe_data(symbol) if not self._validate_data(data): logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析") return # 当前价格 current_price = float(data['5m'].iloc[-1]['close']) price_change_24h = self._calculate_price_change(data['1h']) logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})") # 获取当前持仓信息(供 LLM 仓位决策) position_info = self.paper_trading.get_position_info() # 2. LLM 分析(包含新闻舆情和持仓信息) logger.info(f"\n🤖 【LLM 分析中...】") result = await self.llm_analyzer.analyze( symbol, data, symbols=self.symbols, position_info=position_info ) # 输出分析摘要 summary = result.get('analysis_summary', '无') logger.info(f" 市场状态: {summary}") # 输出新闻情绪 news_sentiment = result.get('news_sentiment', '') news_impact = result.get('news_impact', '') if news_sentiment: sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}.get(news_sentiment, '') logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}") if news_impact: logger.info(f" 消息影响: {news_impact}") # 输出关键价位 levels = result.get('key_levels', {}) if levels.get('support') or levels.get('resistance'): support_str = ', '.join([f"${s:,.2f}" for s in levels.get('support', [])[:2]]) resistance_str = ', '.join([f"${r:,.2f}" for r in levels.get('resistance', [])[:2]]) logger.info(f" 支撑位: {support_str or '-'}") logger.info(f" 阻力位: {resistance_str or '-'}") # 2.5. 回顾并调整现有持仓(LLM 主动管理) if self.paper_trading_enabled and self.paper_trading: await self._review_and_adjust_positions(symbol, data) # 3. 处理信号 signals = result.get('signals', []) if not signals: logger.info(f"\n⏸️ 结论: 无交易信号,继续观望") return # 输出所有信号 logger.info(f"\n🎯 【发现 {len(signals)} 个信号】") for sig in signals: signal_type = sig.get('type', 'unknown') type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'} type_text = type_map.get(signal_type, signal_type) action = sig.get('action', 'wait') action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'} action_text = action_map.get(action, action) grade = sig.get('grade', 'D') confidence = sig.get('confidence', 0) grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '') logger.info(f"\n [{type_text}] {action_text}") logger.info(f" 等级: {grade} {grade_icon} | 置信度: {confidence}%") logger.info(f" 入场: ${sig.get('entry_price', 0):,.2f} | " f"止损: ${sig.get('stop_loss', 0):,.2f} | " f"止盈: ${sig.get('take_profit', 0):,.2f}") logger.info(f" 原因: {sig.get('reason', '无')[:100]}") if sig.get('risk_warning'): logger.info(f" 风险: {sig.get('risk_warning')}") # 4. 选择最佳信号发送通知 best_signal = self.llm_analyzer.get_best_signal(result) if best_signal and self._should_send_signal(symbol, best_signal): logger.info(f"\n📤 【发送通知】") # 构建通知消息 telegram_message = self.llm_analyzer.format_signal_message(best_signal, symbol) feishu_card = self.llm_analyzer.format_feishu_card(best_signal, symbol) # 发送通知 - 飞书使用卡片格式,Telegram 使用文本格式 await self.feishu.send_card( feishu_card['title'], feishu_card['content'], feishu_card['color'] ) await self.telegram.send_message(telegram_message) logger.info(f" ✅ 已发送信号通知") # 保存信号到数据库 signal_to_save = best_signal.copy() signal_to_save['signal_type'] = 'crypto' signal_to_save['symbol'] = symbol signal_to_save['current_price'] = current_price self.signal_db.add_signal(signal_to_save) # 更新状态 self.last_signals[symbol] = best_signal self.signal_cooldown[symbol] = datetime.now() # 5. 创建模拟订单 if self.paper_trading_enabled and self.paper_trading: grade = best_signal.get('grade', 'D') position_size = best_signal.get('position_size', 'light') if grade != 'D': # 转换信号格式以兼容 paper_trading paper_signal = self._convert_to_paper_signal(symbol, best_signal, current_price) result = self.paper_trading.create_order_from_signal(paper_signal, current_price) # 发送被取消挂单的通知 cancelled_orders = result.get('cancelled_orders', []) for cancelled in cancelled_orders: await self._notify_pending_cancelled(cancelled) # 记录新订单 order = result.get('order') if order: logger.info(f" 📝 已创建模拟订单: {order.order_id} | 仓位: {position_size}") else: if best_signal: logger.info(f"\n⏸️ 信号冷却中或置信度不足,不发送通知") except Exception as e: logger.error(f"❌ 分析 {symbol} 出错: {e}") import traceback logger.error(traceback.format_exc()) def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any], current_price: float) -> Dict[str, Any]: """转换 LLM 信号格式为模拟交易格式""" signal_type = signal.get('type', 'medium_term') type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'} # 获取入场类型和入场价 entry_type = signal.get('entry_type', 'market') entry_price = signal.get('entry_price', current_price) return { 'symbol': symbol, 'action': signal.get('action', 'hold'), 'entry_type': entry_type, # market 或 limit 'entry_price': entry_price, # 入场价(挂单价格) 'price': current_price, # 当前价格 'stop_loss': signal.get('stop_loss', 0), 'take_profit': signal.get('take_profit', 0), 'confidence': signal.get('confidence', 0), 'signal_grade': signal.get('grade', 'D'), 'signal_type': type_map.get(signal_type, 'swing'), 'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小 'reasons': [signal.get('reason', '')], 'timestamp': datetime.now() } def _calculate_price_change(self, h1_data: pd.DataFrame) -> str: """计算24小时价格变化""" if len(h1_data) < 24: return "N/A" price_now = h1_data.iloc[-1]['close'] price_24h_ago = h1_data.iloc[-24]['close'] change = ((price_now - price_24h_ago) / price_24h_ago) * 100 if change >= 0: return f"+{change:.2f}%" return f"{change:.2f}%" def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool: """验证数据完整性""" required_intervals = ['5m', '15m', '1h', '4h'] for interval in required_intervals: if interval not in data or data[interval].empty: return False if len(data[interval]) < 20: return False return True def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool: """判断是否应该发送信号""" action = signal.get('action', 'wait') if action == 'wait': return False confidence = signal.get('confidence', 0) # 使用配置文件中的阈值 threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比 if confidence < threshold: return False # 检查冷却时间(30分钟内不重复发送相同方向的信号) if symbol in self.signal_cooldown: cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=30) if datetime.now() < cooldown_end: if symbol in self.last_signals: if self.last_signals[symbol].get('action') == action: logger.debug(f"{symbol} 信号冷却中,跳过") return False return True async def _review_and_adjust_positions( self, symbol: str, data: Dict[str, pd.DataFrame] ): """ 回顾并调整现有持仓(LLM 主动管理) 每次分析后自动回顾该交易对的所有持仓,让 LLM 决定是否需要: - 调整止损止盈 - 部分平仓 - 全部平仓 """ try: # 获取该交易对的所有活跃持仓(只看已成交的) active_orders = self.paper_trading.get_active_orders() positions = [ order for order in active_orders if order.get('symbol') == symbol and order.get('status') == 'open' and order.get('filled_price') # 只处理已成交的订单 ] if not positions: return # 没有持仓需要回顾 logger.info(f"\n🔄 【LLM 回顾持仓中...】共 {len(positions)} 个持仓") # 准备持仓数据 position_data = [] for order in positions: entry_price = order.get('filled_price') or order.get('entry_price', 0) current_price = self.binance.get_ticker(symbol).get('lastPrice', entry_price) if isinstance(current_price, str): current_price = float(current_price) # 计算盈亏百分比 side = order.get('side') if side == 'long': pnl_percent = ((current_price - entry_price) / entry_price) * 100 else: pnl_percent = ((entry_price - current_price) / entry_price) * 100 position_data.append({ 'order_id': order.get('order_id'), 'side': side, 'entry_price': entry_price, 'current_price': current_price, 'stop_loss': order.get('stop_loss', 0), 'take_profit': order.get('take_profit', 0), 'quantity': order.get('quantity', 0), 'pnl_percent': pnl_percent, 'open_time': order.get('open_time') }) # 调用 LLM 回顾分析 decisions = await self.llm_analyzer.review_positions(symbol, position_data, data) if not decisions: logger.info(" LLM 建议保持所有持仓不变") return # 执行 LLM 的调整建议 logger.info(f"\n📝 【LLM 调整建议】共 {len(decisions)} 个") for decision in decisions: order_id = decision.get('order_id') action = decision.get('action') reason = decision.get('reason', '') action_map = { 'HOLD': '保持', 'ADJUST_SL_TP': '调整止损止盈', 'PARTIAL_CLOSE': '部分平仓', 'FULL_CLOSE': '全部平仓' } action_text = action_map.get(action, action) logger.info(f" 订单 {order_id[:8]}: {action_text} - {reason}") # 执行调整 result = self.paper_trading.adjust_position_by_llm( order_id=order_id, action=action, new_sl=decision.get('new_sl'), new_tp=decision.get('new_tp'), close_percent=decision.get('close_percent') ) if result.get('success'): # 发送通知 await self._notify_position_adjustment(symbol, order_id, decision, result) # 如果是平仓操作,从活跃订单中移除 if action in ['PARTIAL_CLOSE', 'FULL_CLOSE']: closed_result = result.get('pnl', {}) if closed_result: pnl = closed_result.get('pnl', 0) pnl_percent = closed_result.get('pnl_percent', 0) logger.info(f" ✅ 已平仓: PnL ${pnl:+.2f} ({pnl_percent:+.1f}%)") else: logger.warning(f" ❌ 执行失败: {result.get('error')}") except Exception as e: logger.error(f"持仓回顾失败: {e}", exc_info=True) async def _notify_position_adjustment( self, symbol: str, order_id: str, decision: Dict[str, Any], result: Dict[str, Any] ): """发送持仓调整通知""" action = decision.get('action') reason = decision.get('reason', '') action_map = { 'ADJUST_SL_TP': '🔄 调整止损止盈', 'PARTIAL_CLOSE': '📤 部分平仓', 'FULL_CLOSE': '🚪 全部平仓' } action_text = action_map.get(action, action) message = f"""{action_text} 交易对: {symbol} 订单: {order_id[:8]} 原因: {reason}""" if action == 'ADJUST_SL_TP': changes = result.get('changes', []) message += f"\n调整内容: {', '.join(changes)}" elif action in ['PARTIAL_CLOSE', 'FULL_CLOSE']: pnl_info = result.get('pnl', {}) if pnl_info: pnl = pnl_info.get('pnl', 0) pnl_percent = pnl_info.get('pnl_percent', 0) message += f"\n实现盈亏: ${pnl:+.2f} ({pnl_percent:+.1f}%)" if action == 'PARTIAL_CLOSE': close_percent = decision.get('close_percent', 0) remaining = result.get('remaining_quantity', 0) message += f"\n平仓比例: {close_percent:.0f}%" message += f"\n剩余仓位: ${remaining:,.0f}" await self.feishu.send_text(message) await self.telegram.send_message(message) async def analyze_once(self, symbol: str) -> Dict[str, Any]: """单次分析(用于测试或手动触发)""" data = self.binance.get_multi_timeframe_data(symbol) if not self._validate_data(data): return {'error': '数据不完整'} # 获取持仓信息 position_info = self.paper_trading.get_position_info() result = await self.llm_analyzer.analyze( symbol, data, symbols=self.symbols, position_info=position_info ) return result def get_status(self) -> Dict[str, Any]: """获取智能体状态""" return { 'running': self.running, 'symbols': self.symbols, 'mode': 'LLM 驱动', 'last_signals': { symbol: { 'type': sig.get('type'), 'action': sig.get('action'), 'confidence': sig.get('confidence'), 'grade': sig.get('grade') } for symbol, sig in self.last_signals.items() } } # 全局单例 _crypto_agent: Optional['CryptoAgent'] = None def get_crypto_agent() -> 'CryptoAgent': """获取加密货币智能体单例""" # 直接使用类单例,不使用全局变量(避免 reload 时重置) return CryptoAgent()