diff --git a/backend/app/config.py b/backend/app/config.py index 49a48ae..71e41f9 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -123,6 +123,14 @@ class Settings(BaseSettings): crypto_min_volatility_percent: float = 0.5 # 最小波动率(百分比),低于此值跳过分析 crypto_min_price_range_percent: float = 0.3 # 最小价格变动范围(百分比),低于此值跳过分析 crypto_5m_surge_threshold: float = 1.0 # 5分钟突发波动阈值(百分比),超过此值即使1小时波动率低也会触发分析 + crypto_intraday_llm_cooldown_minutes: int = 15 # 日内 LLM 分析冷却时间 + crypto_trend_llm_cooldown_minutes: int = 60 # 趋势 LLM 分析冷却时间 + crypto_force_llm_surge_threshold: float = 1.2 # 15分钟突发波动强制触发 LLM 的阈值 + crypto_force_llm_trade_zone_pct: float = 0.25 # 接近关键交易区时强制触发 LLM 的距离阈值 + crypto_event_analysis_enabled: bool = True # 是否启用实时行情事件触发分析 + crypto_event_analysis_window_minutes: int = 5 # 实时行情异动检测窗口 + crypto_event_analysis_price_change_percent: float = 0.8 # 检测窗口内涨跌超过该阈值触发日内分析 + crypto_event_analysis_cooldown_minutes: int = 10 # 同一交易对事件触发分析冷却 # Brave Search API 配置 brave_api_key: str = "" diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index c48fa1f..c90212d 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -238,6 +238,10 @@ class CryptoAgent: "last_signal_symbol": None, "last_heartbeat_notified_at": None, } + self._lane_analysis_state: Dict[str, Dict[str, Any]] = {} + self._event_analysis_state: Dict[str, Dict[str, Any]] = {} + self._event_analysis_tasks: Dict[str, asyncio.Task] = {} + self._price_monitor_registered = False # 挂单 TP/SL 追踪:挂单成交后自动补设止盈止损 # key=order_id, value={symbol, is_long, size/contracts, tp_price, sl_price} @@ -269,7 +273,7 @@ class CryptoAgent: "auto_trading_enabled": True, # 模拟交易始终启用 "hyperliquid_enabled": self.hyperliquid is not None, "bitget_enabled": self.bitget is not None, - "analysis_interval": "每5分钟整点" + "analysis_interval": "每5分钟轻扫描,LLM分层冷却" }) logger.info(f"加密货币智能体初始化完成(LLM 驱动),监控交易对: {self.symbols}") @@ -303,6 +307,24 @@ class CryptoAgent: def _touch_analysis_heartbeat(self): self._analysis_monitor["last_heartbeat_at"] = datetime.now().isoformat() + def _get_lane_state(self, symbol: str) -> Dict[str, Any]: + return self._lane_analysis_state.setdefault(symbol, { + "last_intraday_at": None, + "last_trend_at": None, + "cached_intraday": None, + "cached_trend": None, + "last_force_reason": "", + }) + + def _get_event_analysis_state(self, symbol: str) -> Dict[str, Any]: + return self._event_analysis_state.setdefault(symbol, { + "window_start_at": None, + "window_start_price": None, + "last_triggered_at": None, + "last_trigger_reason": "", + "last_price": None, + }) + def _parse_iso_datetime(self, value: Optional[str]) -> Optional[datetime]: if not value: return None @@ -447,6 +469,169 @@ class CryptoAgent: "last_alert_at": last_alert_at, } + def _detect_force_llm_trigger(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str]: + try: + df_5m = data.get('5m') + if df_5m is not None and len(df_5m) >= 3: + recent_5m = df_5m.iloc[-3:] + price_start = float(recent_5m.iloc[0]['close']) + price_end = float(recent_5m.iloc[-1]['close']) + if price_start > 0: + change_pct = abs(price_end - price_start) / price_start * 100 + threshold = self.settings.crypto_force_llm_surge_threshold + if change_pct >= threshold: + direction = "上涨" if price_end > price_start else "下跌" + return True, f"15分钟突发{direction} {change_pct:.2f}% >= {threshold:.2f}%" + + df_1h = data.get('1h') + if df_1h is not None and len(df_1h) >= 20 and df_5m is not None and not df_5m.empty: + current_price = float(df_5m.iloc[-1]['close']) + recent_1h = df_1h.iloc[-20:] + high = float(recent_1h['high'].max()) + low = float(recent_1h['low'].min()) + zone_threshold = self.settings.crypto_force_llm_trade_zone_pct + if current_price > 0: + high_distance = abs(high - current_price) / current_price * 100 + low_distance = abs(current_price - low) / current_price * 100 + if min(high_distance, low_distance) <= zone_threshold: + zone = "阻力" if high_distance <= low_distance else "支撑" + return True, f"价格接近20小时{zone}位,距离 {min(high_distance, low_distance):.2f}% <= {zone_threshold:.2f}%" + except Exception as e: + logger.debug(f"{symbol} 强制 LLM 触发检测失败: {e}") + + return False, "" + + def _resolve_llm_lanes_for_symbol(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[List[str], Dict[str, Any], str]: + now = datetime.now() + state = self._get_lane_state(symbol) + force, force_reason = self._detect_force_llm_trigger(symbol, data) + lanes: List[str] = [] + + intraday_last = self._parse_iso_datetime(state.get("last_intraday_at")) + trend_last = self._parse_iso_datetime(state.get("last_trend_at")) + intraday_cooldown = timedelta(minutes=self.settings.crypto_intraday_llm_cooldown_minutes) + trend_cooldown = timedelta(minutes=self.settings.crypto_trend_llm_cooldown_minutes) + + if force or intraday_last is None or not state.get("cached_intraday") or now - intraday_last >= intraday_cooldown: + lanes.append("intraday") + if force or trend_last is None or not state.get("cached_trend") or now - trend_last >= trend_cooldown: + lanes.append("trend") + + cached_results = {} + if state.get("cached_intraday"): + cached_results["intraday"] = state["cached_intraday"] + if state.get("cached_trend"): + cached_results["trend"] = state["cached_trend"] + + if not lanes and not cached_results: + lanes = ["intraday", "trend"] + + state["last_force_reason"] = force_reason if force else "" + return lanes, cached_results, force_reason + + def _update_lane_analysis_state(self, symbol: str, market_signal: Dict[str, Any]): + state = self._get_lane_state(symbol) + now_iso = datetime.now().isoformat() + lane_results = market_signal.get("lane_results") or {} + fresh_lanes = set((market_signal.get("llm_lanes") or {}).get("fresh") or []) + + if "intraday" in fresh_lanes and lane_results.get("intraday"): + state["last_intraday_at"] = now_iso + state["cached_intraday"] = lane_results["intraday"] + if "trend" in fresh_lanes and lane_results.get("trend"): + state["last_trend_at"] = now_iso + state["cached_trend"] = lane_results["trend"] + + def _register_price_event_monitor(self): + if self._price_monitor_registered or not self.settings.crypto_event_analysis_enabled: + return + + try: + from app.services.price_monitor_service import get_price_monitor_service + + monitor = get_price_monitor_service() + for symbol in self.symbols: + monitor.subscribe_symbol(symbol) + monitor.add_price_callback(self._on_realtime_price_update) + self._price_monitor_registered = True + logger.info("✅ CryptoAgent 已接入实时行情事件触发分析") + except Exception as e: + logger.warning(f"实时行情事件触发分析接入失败: {e}") + + def _on_realtime_price_update(self, symbol: str, price: float): + if not self.running or not self.settings.crypto_event_analysis_enabled: + return + if symbol not in self.symbols: + return + if not price or price <= 0: + return + + now = datetime.now() + state = self._get_event_analysis_state(symbol) + state["last_price"] = price + + window_minutes = self.settings.crypto_event_analysis_window_minutes + window_start_at = self._parse_iso_datetime(state.get("window_start_at")) + window_start_price = state.get("window_start_price") + + if not window_start_at or not window_start_price or now - window_start_at >= timedelta(minutes=window_minutes): + state["window_start_at"] = now.isoformat() + state["window_start_price"] = price + return + + change_pct = abs(price - float(window_start_price)) / float(window_start_price) * 100 + threshold = self.settings.crypto_event_analysis_price_change_percent + if change_pct < threshold: + return + + last_triggered_at = self._parse_iso_datetime(state.get("last_triggered_at")) + cooldown = timedelta(minutes=self.settings.crypto_event_analysis_cooldown_minutes) + if last_triggered_at and now - last_triggered_at < cooldown: + return + + if symbol in self._event_analysis_tasks and not self._event_analysis_tasks[symbol].done(): + return + + direction = "上涨" if price > float(window_start_price) else "下跌" + reason = f"{window_minutes}分钟内{direction} {change_pct:.2f}% >= {threshold:.2f}%" + state["last_triggered_at"] = now.isoformat() + state["last_trigger_reason"] = reason + state["window_start_at"] = now.isoformat() + state["window_start_price"] = price + + if self._event_loop and self._event_loop.is_running(): + asyncio.run_coroutine_threadsafe( + self._run_event_triggered_analysis(symbol, reason), + self._event_loop, + ) + logger.info(f"⚡ 已排队实时行情事件分析: {symbol} | {reason}") + else: + logger.warning(f"实时行情事件分析跳过: 事件循环不可用 ({symbol})") + + async def _run_event_triggered_analysis(self, symbol: str, reason: str): + current_task = asyncio.current_task() + if current_task: + self._event_analysis_tasks[symbol] = current_task + + self._record_analysis_event( + "event_analysis_triggered", + symbol=symbol, + status="info", + detail=reason, + extra={"trigger_reason": reason}, + ) + try: + await self.analyze_symbol( + symbol, + trigger_source="realtime_event", + force_lanes=["intraday"], + trigger_reason=reason, + ) + finally: + task = self._event_analysis_tasks.get(symbol) + if task is current_task: + self._event_analysis_tasks.pop(symbol, None) + async def _maybe_alert_tp_sl_incomplete(self, platform: str, tracking_key: str, @@ -759,7 +944,7 @@ class CryptoAgent: logger.info("🚀 加密货币交易信号智能体(LLM 驱动)") logger.info("=" * 60) logger.info(f" 监控交易对: {', '.join(self.symbols)}") - logger.info(f" 运行模式: 每5分钟整点执行") + logger.info(f" 运行模式: 每5分钟轻扫描,LLM分层冷却") logger.info(f" 分析引擎: LLM 自主分析") logger.info(f" 交易模式: 自动交易已启用") logger.info("=" * 60 + "\n") @@ -770,6 +955,7 @@ class CryptoAgent: # 注意:不再启动独立的价格监控 # 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查 logger.info(f"交易已启用(由后台统一监控)") + self._register_price_event_monitor() # 发送启动通知(卡片格式) title = "🚀 加密货币智能体已启动" @@ -779,7 +965,8 @@ class CryptoAgent: f"🤖 **驱动引擎**: LLM 自主分析", f"📊 **监控交易对**: {len(self.symbols)} 个", f" {', '.join(self.symbols)}", - f"⏰ **运行频率**: 每5分钟整点", + f"⏰ **运行频率**: 每5分钟轻扫描", + f"🧊 **LLM 冷却**: 日内 {self.settings.crypto_intraday_llm_cooldown_minutes} 分钟 / 趋势 {self.settings.crypto_trend_llm_cooldown_minutes} 分钟", f"💰 **交易系统**: 已启用(后台统一监控)", f"🎯 **分析维度**: 技术面 + 资金面 + 情绪面", ] @@ -977,7 +1164,11 @@ class CryptoAgent: logger.warning(f"{symbol} 波动率检查失败: {e},允许分析") return True, "波动率检查失败,允许分析", 0 - async def analyze_symbol(self, symbol: str): + async def analyze_symbol(self, + symbol: str, + trigger_source: str = "schedule", + force_lanes: Optional[List[str]] = None, + trigger_reason: str = ""): """ 分析单个交易对(信号分析 + 平台执行规则) @@ -1005,7 +1196,7 @@ class CryptoAgent: ) logger.info(f"\n{'─' * 50}") - logger.info(f"📊 {symbol} 分析开始") + logger.info(f"📊 {symbol} 分析开始 ({trigger_source})") logger.info(f"{'─' * 50}") # 1. 获取多周期数据 @@ -1048,12 +1239,38 @@ class CryptoAgent: # ============================================================ # 第一阶段:市场信号分析(不包含仓位信息) # ============================================================ - logger.info(f"\n🤖 【第一阶段:市场信号分析】") + lanes_to_run, cached_lane_results, force_reason = self._resolve_llm_lanes_for_symbol(symbol, data) + if force_lanes: + merged_lanes = set(lanes_to_run) + merged_lanes.update(force_lanes) + lanes_to_run = sorted(merged_lanes) + force_reason = trigger_reason or force_reason or f"{trigger_source} 强制刷新 {', '.join(force_lanes)}" + logger.info(f"\n🤖 【第一阶段:市场信号分析】 lanes={lanes_to_run or ['cache_only']}") + if force_reason: + logger.info(f" ⚡ 强制触发 LLM: {force_reason}") + elif not lanes_to_run: + logger.info(" 🧊 LLM 冷却中,使用上一轮 lane 缓存结果") + + self._record_analysis_event( + "llm_lane_plan", + symbol=symbol, + status="info", + detail=force_reason or (f"本轮执行 lane: {', '.join(lanes_to_run)}" if lanes_to_run else "LLM 冷却中,使用缓存 lane 结果"), + extra={ + "lanes_to_run": lanes_to_run, + "cache_only": not bool(lanes_to_run), + "force_reason": force_reason, + "trigger_source": trigger_source, + }, + ) market_signal = await self.market_analyzer.analyze( symbol, data, - symbols=self.symbols + symbols=self.symbols, + lanes=lanes_to_run, + cached_lane_results=cached_lane_results, ) + self._update_lane_analysis_state(symbol, market_signal) # 输出市场分析结果 self._log_market_signal(market_signal) @@ -4793,6 +5010,19 @@ class CryptoAgent: 'platform_halts': self.get_platform_halt_status(), 'analysis_monitor': self._analysis_monitor, 'analysis_notifications': self._analysis_notification_state, + 'lane_analysis_state': self._lane_analysis_state, + 'event_analysis_state': self._event_analysis_state, + 'llm_schedule': { + 'scan_interval_minutes': 5, + 'intraday_cooldown_minutes': self.settings.crypto_intraday_llm_cooldown_minutes, + 'trend_cooldown_minutes': self.settings.crypto_trend_llm_cooldown_minutes, + 'force_surge_threshold': self.settings.crypto_force_llm_surge_threshold, + 'force_trade_zone_pct': self.settings.crypto_force_llm_trade_zone_pct, + 'event_analysis_enabled': self.settings.crypto_event_analysis_enabled, + 'event_analysis_window_minutes': self.settings.crypto_event_analysis_window_minutes, + 'event_analysis_price_change_percent': self.settings.crypto_event_analysis_price_change_percent, + 'event_analysis_cooldown_minutes': self.settings.crypto_event_analysis_cooldown_minutes, + }, 'last_signals': { symbol: { 'type': sig.get('type'), diff --git a/backend/app/crypto_agent/market_signal_analyzer.py b/backend/app/crypto_agent/market_signal_analyzer.py index e5bbf59..34e3cb2 100644 --- a/backend/app/crypto_agent/market_signal_analyzer.py +++ b/backend/app/crypto_agent/market_signal_analyzer.py @@ -197,7 +197,9 @@ class MarketSignalAnalyzer: self.exchange = bitget_service async def analyze(self, symbol: str, data: Dict[str, Any], - symbols: List[str] = None) -> Dict[str, Any]: + symbols: List[str] = None, + lanes: Optional[List[str]] = None, + cached_lane_results: Optional[Dict[str, Dict[str, Any]]] = None) -> Dict[str, Any]: """ 分析市场并生成信号 @@ -219,50 +221,74 @@ class MarketSignalAnalyzer: # 3. 获取合约市场数据(资金费率、持仓量等) futures_context, futures_market_data = await self._get_futures_context(symbol) - # 4. 将日内和趋势拆成两次独立分析,避免一个 prompt 同时混做两件事 - intraday_prompt = self._build_analysis_prompt( - symbol=symbol, - lane="intraday", - market_context=market_context, - news_context=news_context, - futures_context=futures_context, - futures_market_data=futures_market_data, - ) - trend_prompt = self._build_analysis_prompt( - symbol=symbol, - lane="trend", - market_context=market_context, - news_context=news_context, - futures_context=futures_context, - futures_market_data=futures_market_data, - ) + lanes_to_run = set(lanes or ["intraday", "trend"]) + cached_lane_results = cached_lane_results or {} + lane_tasks = {} - intraday_messages = [ - {"role": "system", "content": self.INTRADAY_ANALYSIS_PROMPT}, - {"role": "user", "content": intraday_prompt} - ] - trend_messages = [ - {"role": "system", "content": self.TREND_ANALYSIS_PROMPT}, - {"role": "user", "content": trend_prompt} - ] - - intraday_response, trend_response = await asyncio.gather( - llm_service.achat( - intraday_messages, + if "intraday" in lanes_to_run: + intraday_prompt = self._build_analysis_prompt( + symbol=symbol, + lane="intraday", + market_context=market_context, + news_context=news_context, + futures_context=futures_context, + futures_market_data=futures_market_data, + ) + lane_tasks["intraday"] = llm_service.achat( + [ + {"role": "system", "content": self.INTRADAY_ANALYSIS_PROMPT}, + {"role": "user", "content": intraday_prompt} + ], temperature=self.INTRADAY_ANALYSIS_TEMPERATURE, max_tokens=self.ANALYSIS_MAX_TOKENS - ), - llm_service.achat( - trend_messages, + ) + + if "trend" in lanes_to_run: + trend_prompt = self._build_analysis_prompt( + symbol=symbol, + lane="trend", + market_context=market_context, + news_context=news_context, + futures_context=futures_context, + futures_market_data=futures_market_data, + ) + lane_tasks["trend"] = llm_service.achat( + [ + {"role": "system", "content": self.TREND_ANALYSIS_PROMPT}, + {"role": "user", "content": trend_prompt} + ], temperature=self.TREND_ANALYSIS_TEMPERATURE, max_tokens=self.ANALYSIS_MAX_TOKENS ) - ) - intraday_result = self._parse_llm_response(intraday_response or "", symbol) - trend_result = self._parse_llm_response(trend_response or "", symbol) + lane_responses = {} + if lane_tasks: + responses = await asyncio.gather(*lane_tasks.values()) + lane_responses = dict(zip(lane_tasks.keys(), responses)) + + intraday_result = ( + self._parse_llm_response(lane_responses.get("intraday") or "", symbol) + if "intraday" in lane_responses + else dict(cached_lane_results.get("intraday") or self._get_empty_signal(symbol)) + ) + trend_result = ( + self._parse_llm_response(lane_responses.get("trend") or "", symbol) + if "trend" in lane_responses + else dict(cached_lane_results.get("trend") or self._get_empty_signal(symbol)) + ) + intraday_result['_lane_source'] = 'fresh' if "intraday" in lane_responses else 'cache' + trend_result['_lane_source'] = 'fresh' if "trend" in lane_responses else 'cache' result = self._merge_lane_results(symbol, intraday_result, trend_result) + result['llm_lanes'] = { + 'requested': sorted(lanes_to_run), + 'fresh': sorted(lane_responses.keys()), + 'cached': sorted(set(["intraday", "trend"]) - set(lane_responses.keys())), + } + result['lane_results'] = { + 'intraday': intraday_result, + 'trend': trend_result, + } # 携带量化 regime 数据到最终结果,供执行层使用 if market_context.get('range_metrics'): diff --git a/frontend/console.html b/frontend/console.html index 9778054..a77b735 100644 --- a/frontend/console.html +++ b/frontend/console.html @@ -1192,6 +1192,31 @@ min-width: 92px; } + .lane-state-list { + display: grid; + gap: 8px; + margin-top: 10px; + } + + .lane-state-item { + display: grid; + grid-template-columns: 76px 1fr; + gap: 10px; + padding-top: 8px; + border-top: 1px solid rgba(255,255,255,0.06); + font-size: 12px; + } + + .lane-state-symbol { + font-family: "IBM Plex Mono", monospace; + color: var(--text); + } + + .lane-state-detail { + color: var(--muted); + line-height: 1.5; + } + .analysis-log-item { padding: 12px 14px; border-radius: 14px; @@ -2264,6 +2289,9 @@ const summaryCard = document.getElementById('runtimeSummaryCard'); const monitor = analysisMonitor || {}; const notifications = cachedConsoleData?.crypto_agent?.analysis_notifications || {}; + const schedule = cachedConsoleData?.crypto_agent?.llm_schedule || {}; + const laneState = cachedConsoleData?.crypto_agent?.lane_analysis_state || {}; + const eventState = cachedConsoleData?.crypto_agent?.event_analysis_state || {}; const cycleStatus = monitor.last_cycle_status || 'idle'; const progressText = monitor.current_cycle_total ? `${monitor.current_cycle_index || 0}/${monitor.current_cycle_total} ${monitor.current_cycle_symbol || ''}` @@ -2291,6 +2319,20 @@ const heartbeatSentAt = notifications.last_heartbeat_notified_at; const lastSignalAt = notifications.last_signal_at; const lastSignalSymbol = notifications.last_signal_symbol || '-'; + const laneRows = Object.entries(laneState).slice(0, 4).map(([symbol, state]) => ` +
+
${symbol}
+
+ 日内 ${state.last_intraday_at ? relativeTime(state.last_intraday_at) : '-'} / + 趋势 ${state.last_trend_at ? relativeTime(state.last_trend_at) : '-'} + ${state.last_force_reason ? `
强触发: ${state.last_force_reason}` : ''} +
+
+ `).join(''); + const latestEventTrigger = Object.entries(eventState) + .map(([symbol, state]) => ({ symbol, ...state })) + .filter((state) => state.last_triggered_at) + .sort((a, b) => new Date(b.last_triggered_at) - new Date(a.last_triggered_at))[0]; summaryCard.innerHTML = `
运行摘要
${monitor.last_analysis_status || 'idle'} / ${monitor.last_analysis_symbol || '-'}
@@ -2298,7 +2340,11 @@
最近分析说明${monitor.last_analysis_detail || '-'}
最近信号${lastSignalAt ? `${lastSignalSymbol} / ${relativeTime(lastSignalAt)}` : '近 60 分钟无信号'}
上次心跳通知${heartbeatSentAt ? `${relativeTime(heartbeatSentAt)} / ${formatTime(heartbeatSentAt)}` : '尚未发送'}
+
LLM 冷却日内 ${schedule.intraday_cooldown_minutes || '-'}m / 趋势 ${schedule.trend_cooldown_minutes || '-'}m
+
事件触发${schedule.event_analysis_enabled ? `${schedule.event_analysis_window_minutes || '-'}m / ${formatPercent(schedule.event_analysis_price_change_percent || 0, 1)} / 冷却${schedule.event_analysis_cooldown_minutes || '-'}m` : '关闭'}
+
最近异动分析${latestEventTrigger ? `${latestEventTrigger.symbol} / ${relativeTime(latestEventTrigger.last_triggered_at)}` : '暂无'}
+
${laneRows || '
暂无 lane 状态,等待下一轮分析。
'}
`; if (!analysisEvents || analysisEvents.length === 0) {