From dba6e569c6d1e3b5ddaece605c2781fe8ddf91dd Mon Sep 17 00:00:00 2001 From: aaron <> Date: Mon, 27 Apr 2026 11:47:27 +0800 Subject: [PATCH] 1 --- backend/app/crypto_agent/crypto_agent.py | 242 +++++++++++++++++- .../crypto_agent/market_signal_analyzer.py | 12 +- backend/app/crypto_agent/setup_policy.py | 14 +- frontend/console.html | 58 ++++- 4 files changed, 316 insertions(+), 10 deletions(-) diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 0328037..c2e482f 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -3,7 +3,7 @@ """ import asyncio import math -from collections import deque +from collections import deque, defaultdict from typing import Dict, Any, List, Optional from datetime import datetime, timedelta import pandas as pd @@ -291,6 +291,28 @@ class CryptoAgent: "last_signal_symbol": None, "last_heartbeat_notified_at": None, } + self._analysis_funnel_stats: Dict[str, Any] = { + "total_triggers": 0, + "scheduled_triggers": 0, + "event_triggers": 0, + "manual_triggers": 0, + "data_invalid_skips": 0, + "volatility_skips": 0, + "llm_lane_calls": { + "intraday": 0, + "trend": 0, + }, + "llm_analyses": 0, + "cache_only_runs": 0, + "pre_regime_trade_signals": 0, + "post_regime_trade_signals": 0, + "regime_filtered_out": 0, + "no_trade_signal_runs": 0, + "threshold_filtered_runs": 0, + "valid_signal_runs": 0, + "valid_signals_total": 0, + "last_updated_at": None, + } self._lane_analysis_state: Dict[str, Dict[str, Any]] = {} self._event_analysis_state: Dict[str, Dict[str, Any]] = {} self._event_analysis_tasks: Dict[str, asyncio.Task] = {} @@ -800,6 +822,171 @@ class CryptoAgent: def get_recent_analysis_events(self, limit: int = 40) -> List[Dict[str, Any]]: return list(self._analysis_events)[:limit] + def _bump_analysis_stat(self, key: str, amount: int = 1): + self._analysis_funnel_stats[key] = int(self._analysis_funnel_stats.get(key, 0) or 0) + amount + self._analysis_funnel_stats["last_updated_at"] = datetime.now().isoformat() + + def _bump_lane_call(self, lane: str, amount: int = 1): + lane_calls = self._analysis_funnel_stats.setdefault("llm_lane_calls", {}) + lane_calls[lane] = int(lane_calls.get(lane, 0) or 0) + amount + self._analysis_funnel_stats["last_updated_at"] = datetime.now().isoformat() + + def _summarize_recent_analysis_funnel(self, hours: int = 24) -> Dict[str, Any]: + cutoff = datetime.now() - timedelta(hours=hours) + events = [] + for event in self._analysis_events: + timestamp = self._parse_iso_datetime(event.get("timestamp")) + if timestamp and timestamp >= cutoff: + events.append(event) + + summary: Dict[str, Any] = { + "window_hours": hours, + "total_events": len(events), + "triggered_symbols": 0, + "llm_runs": 0, + "cache_only_runs": 0, + "data_invalid_skips": 0, + "volatility_skips": 0, + "no_trade_signal_runs": 0, + "threshold_filtered_runs": 0, + "valid_signal_runs": 0, + "valid_signals_total": 0, + "symbols": {}, + "lane_calls": { + "intraday": 0, + "trend": 0, + }, + "lane_signal_counts": { + "short_term_pre": 0, + "medium_term_pre": 0, + "short_term_post": 0, + "medium_term_post": 0, + }, + "blocked_reason_counts": {}, + } + + symbol_stats: Dict[str, Dict[str, Any]] = defaultdict(lambda: { + "triggers": 0, + "llm_runs": 0, + "cache_only_runs": 0, + "data_invalid_skips": 0, + "volatility_skips": 0, + "no_trade_signal_runs": 0, + "threshold_filtered_runs": 0, + "valid_signal_runs": 0, + "valid_signals_total": 0, + "last_status": None, + "last_detail": None, + "last_event_at": None, + "lane_calls": { + "intraday": 0, + "trend": 0, + }, + "lane_signal_counts": { + "short_term_pre": 0, + "medium_term_pre": 0, + "short_term_post": 0, + "medium_term_post": 0, + }, + "blocked_reason_counts": {}, + }) + + blocked_reason_counts: Dict[str, int] = defaultdict(int) + + for event in reversed(events): + event_type = event.get("event_type") + symbol = event.get("symbol") or "" + stats = symbol_stats[symbol] if symbol else None + + if event_type == "symbol_analysis_started": + summary["triggered_symbols"] += 1 + if stats is not None: + stats["triggers"] += 1 + + if event_type == "llm_lane_plan": + cache_only = bool(event.get("cache_only")) + lanes_to_run = event.get("lanes_to_run") or [] + if cache_only: + summary["cache_only_runs"] += 1 + if stats is not None: + stats["cache_only_runs"] += 1 + else: + summary["llm_runs"] += 1 + if stats is not None: + stats["llm_runs"] += 1 + for lane in lanes_to_run: + if lane in summary["lane_calls"]: + summary["lane_calls"][lane] += 1 + if stats is not None and lane in stats["lane_calls"]: + stats["lane_calls"][lane] += 1 + + if event_type == "symbol_analysis_skipped": + detail = str(event.get("detail") or "") + if "数据不完整" in detail: + summary["data_invalid_skips"] += 1 + if stats is not None: + stats["data_invalid_skips"] += 1 + if "波动率过低" in detail: + summary["volatility_skips"] += 1 + if stats is not None: + stats["volatility_skips"] += 1 + + if event_type == "symbol_analysis_completed": + trade_signals = int(event.get("trade_signals", 0) or 0) + valid_signals = int(event.get("valid_signals", 0) or 0) + detail = str(event.get("detail") or "") + pre_lane_counts = event.get("pre_regime_lane_signal_counts") or {} + post_lane_counts = event.get("post_regime_lane_signal_counts") or {} + summary["lane_signal_counts"]["short_term_pre"] += int(pre_lane_counts.get("short_term", 0) or 0) + summary["lane_signal_counts"]["medium_term_pre"] += int(pre_lane_counts.get("medium_term", 0) or 0) + summary["lane_signal_counts"]["short_term_post"] += int(post_lane_counts.get("short_term", 0) or 0) + summary["lane_signal_counts"]["medium_term_post"] += int(post_lane_counts.get("medium_term", 0) or 0) + if stats is not None: + stats["lane_signal_counts"]["short_term_pre"] += int(pre_lane_counts.get("short_term", 0) or 0) + stats["lane_signal_counts"]["medium_term_pre"] += int(pre_lane_counts.get("medium_term", 0) or 0) + stats["lane_signal_counts"]["short_term_post"] += int(post_lane_counts.get("short_term", 0) or 0) + stats["lane_signal_counts"]["medium_term_post"] += int(post_lane_counts.get("medium_term", 0) or 0) + event_reason_counts = event.get("blocked_reason_counts") or {} + for reason_key, count in event_reason_counts.items(): + blocked_reason_counts[str(reason_key)] += int(count or 0) + if stats is not None: + symbol_reason_counts = stats.setdefault("blocked_reason_counts", {}) + symbol_reason_counts[str(reason_key)] = int(symbol_reason_counts.get(str(reason_key), 0) or 0) + int(count or 0) + if trade_signals == 0: + summary["no_trade_signal_runs"] += 1 + if stats is not None: + stats["no_trade_signal_runs"] += 1 + elif valid_signals == 0: + summary["threshold_filtered_runs"] += 1 + if stats is not None: + stats["threshold_filtered_runs"] += 1 + else: + summary["valid_signal_runs"] += 1 + summary["valid_signals_total"] += valid_signals + if stats is not None: + stats["valid_signal_runs"] += 1 + stats["valid_signals_total"] += valid_signals + + if stats is not None: + stats["last_status"] = event.get("status") + stats["last_detail"] = detail + stats["last_event_at"] = event.get("timestamp") + + summary["symbols"] = dict(sorted( + ( + (symbol, payload) + for symbol, payload in symbol_stats.items() + if symbol + ), + key=lambda item: ( + -(item[1]["valid_signal_runs"] or 0), + -(item[1]["triggers"] or 0), + item[0], + ) + )) + summary["blocked_reason_counts"] = dict(sorted(blocked_reason_counts.items(), key=lambda item: (-item[1], item[0]))) + return summary + def _on_price_update(self, symbol: str, price: float): """处理实时价格更新(用于模拟交易)""" if not self.paper_trading: @@ -1262,6 +1449,14 @@ class CryptoAgent: symbol: 交易对,如 'BTCUSDT' """ try: + self._bump_analysis_stat("total_triggers") + if trigger_source == "schedule": + self._bump_analysis_stat("scheduled_triggers") + elif trigger_source == "event": + self._bump_analysis_stat("event_triggers") + else: + self._bump_analysis_stat("manual_triggers") + # 更新活动时间 monitor = get_system_monitor() monitor.update_activity("crypto_agent") @@ -1284,6 +1479,7 @@ class CryptoAgent: data = self.exchange.get_multi_timeframe_data(symbol) if not self._validate_data(data): + self._bump_analysis_stat("data_invalid_skips") self._analysis_monitor["last_analysis_completed_at"] = datetime.now().isoformat() self._analysis_monitor["last_analysis_status"] = "warning" self._analysis_monitor["last_analysis_detail"] = "数据不完整,跳过分析" @@ -1323,6 +1519,7 @@ class CryptoAgent: # 1.5. 波动率检查(节省 LLM 调用) should_analyze, volatility_reason, volatility = self._check_volatility(symbol, data) if not should_analyze: + self._bump_analysis_stat("volatility_skips") self._analysis_monitor["last_analysis_completed_at"] = datetime.now().isoformat() self._analysis_monitor["last_analysis_status"] = "skipped" self._analysis_monitor["last_analysis_detail"] = volatility_reason @@ -1351,6 +1548,13 @@ class CryptoAgent: elif not lanes_to_run: logger.info(" 🧊 LLM 冷却中,使用上一轮 lane 缓存结果") + if lanes_to_run: + self._bump_analysis_stat("llm_analyses") + for lane in lanes_to_run: + self._bump_lane_call(lane) + else: + self._bump_analysis_stat("cache_only_runs") + self._record_analysis_event( "llm_lane_plan", symbol=symbol, @@ -1390,8 +1594,15 @@ class CryptoAgent: # 过滤掉 wait 信号,只保留 buy/sell 信号 signals = market_signal.get('signals', []) trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']] + pre_regime_trade_signals = int(market_signal.get('pre_regime_trade_signal_count', len(trade_signals)) or 0) + self._bump_analysis_stat("pre_regime_trade_signals", pre_regime_trade_signals) + self._bump_analysis_stat("post_regime_trade_signals", len(trade_signals)) + filtered_out = max(0, pre_regime_trade_signals - len(trade_signals)) + if filtered_out: + self._bump_analysis_stat("regime_filtered_out", filtered_out) if not trade_signals: + self._bump_analysis_stat("no_trade_signal_runs") self._analysis_monitor["last_analysis_completed_at"] = datetime.now().isoformat() self._analysis_monitor["last_analysis_status"] = "completed" self._analysis_monitor["last_analysis_detail"] = "完成分析,无交易信号" @@ -1400,7 +1611,13 @@ class CryptoAgent: symbol=symbol, status="success", detail="完成分析,无交易信号", - extra={"trade_signals": 0, "valid_signals": 0}, + extra={ + "trade_signals": 0, + "valid_signals": 0, + "blocked_reason_counts": market_signal.get("blocked_reason_counts") or {}, + "pre_regime_lane_signal_counts": market_signal.get("pre_regime_lane_signal_counts") or {}, + "post_regime_lane_signal_counts": market_signal.get("post_regime_lane_signal_counts") or {}, + }, ) blocked_reasons = market_signal.get('blocked_reasons') or [] if blocked_reasons: @@ -1414,6 +1631,7 @@ class CryptoAgent: valid_signals = [s for s in trade_signals if s.get('confidence', 0) >= threshold] if not valid_signals: + self._bump_analysis_stat("threshold_filtered_runs") self._analysis_monitor["last_analysis_completed_at"] = datetime.now().isoformat() self._analysis_monitor["last_analysis_status"] = "completed" self._analysis_monitor["last_analysis_detail"] = f"完成分析,但无信号达到阈值 {threshold}%" @@ -1422,11 +1640,19 @@ class CryptoAgent: symbol=symbol, status="success", detail=f"完成分析,但无信号达到阈值 {threshold}%", - extra={"trade_signals": len(trade_signals), "valid_signals": 0}, + extra={ + "trade_signals": len(trade_signals), + "valid_signals": 0, + "blocked_reason_counts": market_signal.get("blocked_reason_counts") or {}, + "pre_regime_lane_signal_counts": market_signal.get("pre_regime_lane_signal_counts") or {}, + "post_regime_lane_signal_counts": market_signal.get("post_regime_lane_signal_counts") or {}, + }, ) logger.info(f"\n⏸️ 结论: 无交易信号达到置信度阈值 ({threshold}%),继续观望") return + self._bump_analysis_stat("valid_signal_runs") + self._bump_analysis_stat("valid_signals_total", len(valid_signals)) logger.info(f"\n✅ 发现 {len(valid_signals)} 个有效交易信号(达到 {threshold}% 阈值)") for signal in valid_signals: logger.info( @@ -1535,7 +1761,13 @@ class CryptoAgent: symbol=symbol, status="success", detail=f"完成分析,产生 {len(valid_signals)} 个有效信号", - extra={"trade_signals": len(trade_signals), "valid_signals": len(valid_signals)}, + extra={ + "trade_signals": len(trade_signals), + "valid_signals": len(valid_signals), + "blocked_reason_counts": market_signal.get("blocked_reason_counts") or {}, + "pre_regime_lane_signal_counts": market_signal.get("pre_regime_lane_signal_counts") or {}, + "post_regime_lane_signal_counts": market_signal.get("post_regime_lane_signal_counts") or {}, + }, ) except Exception as e: @@ -4388,6 +4620,8 @@ class CryptoAgent: 'target_execution_controls': self.get_target_execution_status(), 'analysis_monitor': self._analysis_monitor, 'analysis_notifications': self._analysis_notification_state, + 'analysis_funnel_stats': self._analysis_funnel_stats, + 'analysis_funnel_24h': self._summarize_recent_analysis_funnel(hours=24), 'lane_analysis_state': self._lane_analysis_state, 'event_analysis_state': self._event_analysis_state, 'execution_guardian': self.execution_guardian.get_status(), diff --git a/backend/app/crypto_agent/market_signal_analyzer.py b/backend/app/crypto_agent/market_signal_analyzer.py index ec666e7..5a5a29e 100644 --- a/backend/app/crypto_agent/market_signal_analyzer.py +++ b/backend/app/crypto_agent/market_signal_analyzer.py @@ -1574,6 +1574,11 @@ class MarketSignalAnalyzer: )[:2] result['signals'] = merged_signals + result['pre_regime_trade_signal_count'] = len(merged_signals) + result['pre_regime_lane_signal_counts'] = { + 'short_term': len(intraday_signals), + 'medium_term': len(trend_signals), + } result['key_levels'] = { 'support': self._dedupe_levels( (intraday_result.get('key_levels', {}) or {}).get('support', []) + @@ -1665,12 +1670,17 @@ class MarketSignalAnalyzer: } ) - filtered_signals, blocked_reasons = self.setup_policy.filter_signals(enriched_signals, regime_profile) + filtered_signals, blocked_reasons, blocked_reason_counts = self.setup_policy.filter_signals(enriched_signals, regime_profile) normalized = dict(market_signal) normalized["signals"] = filtered_signals normalized["regime_profile"] = regime_profile normalized["blocked_reasons"] = blocked_reasons[:6] + normalized["blocked_reason_counts"] = blocked_reason_counts + normalized["post_regime_lane_signal_counts"] = { + 'short_term': len([signal for signal in filtered_signals if (signal.get("timeframe") or signal.get("type")) == "short_term"]), + 'medium_term': len([signal for signal in filtered_signals if (signal.get("timeframe") or signal.get("type")) == "medium_term"]), + } if not filtered_signals and blocked_reasons: normalized["analysis_summary"] = self._truncate_summary(regime_profile.get("summary") or "当前状态不交易") diff --git a/backend/app/crypto_agent/setup_policy.py b/backend/app/crypto_agent/setup_policy.py index 67f7114..a6a1011 100644 --- a/backend/app/crypto_agent/setup_policy.py +++ b/backend/app/crypto_agent/setup_policy.py @@ -1,6 +1,7 @@ """ 市场状态到交易行为的硬约束策略 """ +from collections import defaultdict from typing import Any, Dict, List, Tuple @@ -15,15 +16,18 @@ class SetupPolicy: self, signals: List[Dict[str, Any]], regime_profile: Dict[str, Any], - ) -> Tuple[List[Dict[str, Any]], List[str]]: + ) -> Tuple[List[Dict[str, Any]], List[str], Dict[str, int]]: allowed_lanes = set(regime_profile.get("allowed_lanes") or []) allowed_setups = set(regime_profile.get("allowed_setups") or []) tradability = regime_profile.get("tradability", "avoid") reasons: List[str] = [] + reason_counts: Dict[str, int] = defaultdict(int) if tradability == "avoid" or not allowed_lanes or not allowed_setups: - reasons.extend(regime_profile.get("no_trade_reasons") or ["当前市场状态不允许交易"]) - return [], reasons + base_reasons = regime_profile.get("no_trade_reasons") or ["当前市场状态不允许交易"] + reasons.extend(base_reasons) + reason_counts["tradability_avoid"] += max(1, len(signals or [])) + return [], reasons, dict(reason_counts) kept: List[Dict[str, Any]] = [] for signal in signals or []: @@ -34,10 +38,12 @@ class SetupPolicy: if lane not in allowed_lanes: reasons.append(f"{lane} 不在允许交易周期内") + reason_counts[f"lane_blocked:{lane}"] += 1 continue if setup_type not in allowed_setups: reasons.append(f"{setup_type} 不在允许 setup 内") + reason_counts[f"setup_blocked:{setup_type}"] += 1 continue kept.append({ @@ -47,7 +53,7 @@ class SetupPolicy: "entry_basis": entry_basis, }) - return kept, reasons + return kept, reasons, dict(reason_counts) def _infer_setup_type(self, signal: Dict[str, Any]) -> str: lane = signal.get("timeframe") or signal.get("type") or "medium_term" diff --git a/frontend/console.html b/frontend/console.html index 8c91261..db8e62c 100644 --- a/frontend/console.html +++ b/frontend/console.html @@ -3164,6 +3164,7 @@ const attentionItems = management.attention_items || []; const haltedCount = countHalted(cryptoAgent.platform_halts || {}); const latestSignals = data.signals?.latest || []; + const funnel = cryptoAgent.analysis_funnel_stats || {}; const heartbeatHeadline = monitor.last_heartbeat_at ? relativeTime(monitor.last_heartbeat_at) : '无心跳'; const heartbeatTone = toneClassForHealth(cryptoAgent.running ? (monitor.last_cycle_status || monitor.last_analysis_status) : 'stopped'); const exposureNotional = positions.reduce((sum, item) => { @@ -3184,7 +3185,7 @@