diff --git a/.env.example b/.env.example index e7a7145..bd43af8 100644 --- a/.env.example +++ b/.env.example @@ -92,6 +92,17 @@ CRYPTO_ANALYSIS_INTERVAL=60 # 触发 LLM 分析的置信度阈值(0-1) CRYPTO_LLM_THRESHOLD=0.70 +# 波动率过滤配置(节省 LLM 调用) +# ---------------------------------------------------------------------------- +# 是否启用波动率过滤(true/false) +CRYPTO_VOLATILITY_FILTER_ENABLED=true +# 1小时最小波动率(百分比),低于此值需要检查5分钟突发波动 +CRYPTO_MIN_VOLATILITY_PERCENT=0.5 +# 最小价格变动范围(百分比),低于此值需要检查5分钟突发波动 +CRYPTO_MIN_PRICE_RANGE_PERCENT=0.3 +# 5分钟突发波动阈值(百分比),超过此值即使1小时波动率低也会触发分析 +CRYPTO_5M_SURGE_THRESHOLD=1.0 + # ---------------------------------------------------------------------------- # 模拟交易配置 # ---------------------------------------------------------------------------- diff --git a/backend/app/config.py b/backend/app/config.py index fee1386..dc2c83b 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -107,6 +107,12 @@ class Settings(BaseSettings): crypto_analysis_interval: int = 60 # 分析间隔(秒) crypto_llm_threshold: float = 0.70 # 触发 LLM 分析的置信度阈值 + # 波动率过滤配置(节省 LLM 调用) + crypto_volatility_filter_enabled: bool = True # 是否启用波动率过滤 + crypto_min_volatility_percent: float = 0.5 # 最小波动率(百分比),低于此值跳过分析 + crypto_min_price_range_percent: float = 0.3 # 最小价格变动范围(百分比),低于此值跳过分析 + crypto_5m_surge_threshold: float = 1.0 # 5分钟突发波动阈值(百分比),超过此值即使1小时波动率低也会触发分析 + # Brave Search API 配置 brave_api_key: str = "" diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 48fc605..41da811 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -248,6 +248,93 @@ class CryptoAgent: self.running = False logger.info("加密货币智能体已停止") + def _check_volatility(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str, float]: + """ + 检查波动率,判断是否值得进行 LLM 分析(组合方案) + + 使用 1 小时 K 线判断趋势波动,5 分钟 K 线检测突发波动 + + Args: + symbol: 交易对 + data: 多周期K线数据 + + Returns: + (should_analyze, reason, volatility_percent) + should_analyze: 是否应该进行分析 + reason: 原因说明 + volatility_percent: 1小时波动率百分比 + """ + # 检查是否启用波动率过滤 + if not self.settings.crypto_volatility_filter_enabled: + return True, "波动率过滤未启用", 0 + + try: + # 1. 首先检查 1 小时趋势波动率 + df_1h = data.get('1h') + if df_1h is None or len(df_1h) < 20: + # 数据不足,保守起见允许分析 + return True, "1小时数据不足,允许分析", 0 + + # 获取最近20根K线 + recent_1h = df_1h.iloc[-20:] + + # 计算最高价和最低价 + high = recent_1h['high'].max() + low = recent_1h['low'].min() + current_price = float(recent_1h.iloc[-1]['close']) + + # 计算1小时波动率 + if low > 0: + volatility_1h_percent = ((high - low) / low) * 100 + else: + volatility_1h_percent = 0 + + # 计算价格变化范围(相对于当前价格) + price_range_high_percent = ((high - current_price) / current_price) * 100 if current_price > 0 else 0 + price_range_low_percent = ((current_price - low) / current_price) * 100 if current_price > 0 else 0 + + # 从配置读取阈值 + min_volatility = self.settings.crypto_min_volatility_percent + min_price_range = self.settings.crypto_min_price_range_percent + + # 如果1小时波动率足够大,直接允许分析 + if volatility_1h_percent >= min_volatility or price_range_high_percent >= min_price_range or price_range_low_percent >= min_price_range: + return True, f"1小时趋势活跃 (波动率 {volatility_1h_percent:.2f}%),值得分析", volatility_1h_percent + + # 2. 1小时波动率较低,检查5分钟突发波动 + df_5m = data.get('5m') + if df_5m is not None and len(df_5m) >= 3: + # 获取最近3根5分钟K线(15分钟内的变化) + recent_5m = df_5m.iloc[-3:] + + # 计算5分钟价格变化幅度 + price_start = float(recent_5m.iloc[0]['close']) + price_end = float(recent_5m.iloc[-1]['close']) + + if price_start > 0: + price_change_5m = abs(price_end - price_start) / price_start * 100 + else: + price_change_5m = 0 + + # 从配置读取5分钟突发波动阈值 + surge_threshold = self.settings.crypto_5m_surge_threshold + + logger.debug(f"{symbol} 5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}% (阈值: {surge_threshold}%)") + + # 如果5分钟突发波动超过阈值,仍然允许分析 + if price_change_5m >= surge_threshold: + direction = "上涨" if price_end > price_start else "下跌" + return True, f"5分钟突发{direction} ({price_change_5m:.2f}% > {surge_threshold}%),强制分析", volatility_1h_percent + + # 3. 波动率过低,跳过分析 + reason = f"波动率过低 (1小时: {volatility_1h_percent:.2f}% < {min_volatility}%, 5分钟无突发波动),跳过分析" + logger.info(f"⏸️ {symbol} {reason}") + return False, reason, volatility_1h_percent + + except Exception as e: + logger.warning(f"{symbol} 波动率检查失败: {e},允许分析") + return True, "波动率检查失败,允许分析", 0 + async def analyze_symbol(self, symbol: str): """ 分析单个交易对(LLM 驱动) @@ -272,6 +359,12 @@ class CryptoAgent: price_change_24h = self._calculate_price_change(data['1h']) logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})") + # 1.5. 波动率检查(节省 LLM 调用) + should_analyze, volatility_reason, volatility = self._check_volatility(symbol, data) + if not should_analyze: + logger.info(f"⏸️ {volatility_reason},跳过本次 LLM 分析") + return + # 获取当前持仓信息(供 LLM 仓位决策) position_info = self.paper_trading.get_position_info() diff --git a/scripts/test_volatility_filter.py b/scripts/test_volatility_filter.py new file mode 100644 index 0000000..c92fa7f --- /dev/null +++ b/scripts/test_volatility_filter.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +""" +测试组合波动率过滤功能 + +测试场景: +1. 低波动率(1小时和5分钟都低)- 应该跳过分析 +2. 高波动率(1小时高)- 应该允许分析 +3. 突发波动(1小时低,但5分钟高)- 应该允许分析 +""" +import sys +import pandas as pd +from pathlib import Path + +# 添加项目路径 +backend_dir = Path(__file__).parent.parent / "backend" +sys.path.insert(0, str(backend_dir)) + +from app.crypto_agent.crypto_agent import CryptoAgent + + +def create_test_data_1h_low_volatility(): + """ + 场景1: 低波动率 + 1小时K线:价格在 50000 - 50040 之间波动(波动率约0.08%) + 5分钟K线:价格稳定,无突发波动 + """ + import numpy as np + + # 创建1小时数据(20根K线)- 价格波动范围很小 + base_price = 50000 + timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h') + df_1h = pd.DataFrame({ + 'timestamp': timestamps_1h, + 'open': [base_price + i * 2 for i in range(20)], # 缓慢上涨 + 'high': [base_price + 20 + i * 2 for i in range(20)], # 最高价只高20 + 'low': [base_price - 20 + i * 2 for i in range(20)], # 最低价只低20 + 'close': [base_price + 10 + i * 2 for i in range(20)], + 'volume': [1000] * 20 + }) + + # 创建5分钟数据(3根K线)- 价格稳定 + timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min') + df_5m = pd.DataFrame({ + 'timestamp': timestamps_5m, + 'open': [base_price + 38, base_price + 39, base_price + 40], + 'high': [base_price + 42, base_price + 43, base_price + 44], + 'low': [base_price + 36, base_price + 37, base_price + 38], + 'close': [base_price + 39, base_price + 40, base_price + 41], + 'volume': [100] * 3 + }) + + return { + '1h': df_1h, + '5m': df_5m, + '15m': df_1h, + '4h': df_1h + } + + +def create_test_data_1h_high_volatility(): + """ + 场景2: 高波动率(1小时趋势活跃) + 1小时K线:价格在 50000 - 51000 之间波动(波动率约2%) + """ + import numpy as np + + # 创建1小时数据(20根K线) + timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h') + df_1h = pd.DataFrame({ + 'timestamp': timestamps_1h, + 'open': [50000 + i * 50 for i in range(20)], + 'high': [50200 + i * 50 for i in range(20)], + 'low': [49800 + i * 50 for i in range(20)], + 'close': [50100 + i * 50 for i in range(20)], + 'volume': [1000] * 20 + }) + + # 创建5分钟数据 + timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min') + df_5m = pd.DataFrame({ + 'timestamp': timestamps_5m, + 'open': [50980, 50985, 50990], + 'high': [51000, 51005, 51010], + 'low': [50975, 50980, 50985], + 'close': [50985, 50990, 50995], + 'volume': [100] * 3 + }) + + return { + '1h': df_1h, + '5m': df_5m, + '15m': df_1h, + '4h': df_1h + } + + +def create_test_data_5m_surge(): + """ + 场景3: 突发波动(1小时低,但5分钟高) + 1小时K线:价格在 50000 - 50040 之间波动(波动率约0.08%) + 5分钟K线:价格从 50000 突然涨到 50600(涨幅约1.2%) + """ + import numpy as np + + # 创建1小时数据(20根K线)- 低波动 + base_price = 50000 + timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h') + df_1h = pd.DataFrame({ + 'timestamp': timestamps_1h, + 'open': [base_price + i * 2 for i in range(20)], + 'high': [base_price + 20 + i * 2 for i in range(20)], + 'low': [base_price - 20 + i * 2 for i in range(20)], + 'close': [base_price + 10 + i * 2 for i in range(20)], + 'volume': [1000] * 20 + }) + + # 创建5分钟数据(3根K线)- 突发波动 + # 第一根K线从正常价格开始,然后突然暴涨 + timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min') + df_5m = pd.DataFrame({ + 'timestamp': timestamps_5m, + 'open': [base_price + 38, base_price + 300, base_price + 600], # 突然暴涨 + 'high': [base_price + 42, base_price + 350, base_price + 650], + 'low': [base_price + 36, base_price + 250, base_price + 550], + 'close': [base_price + 40, base_price + 600, base_price + 620], # 第一根收盘价接近开盘价,然后暴涨 + 'volume': [100, 200, 300] # 成交量也放大 + }) + + return { + '1h': df_1h, + '5m': df_5m, + '15m': df_1h, + '4h': df_1h + } + + +def test_volatility_filter(): + """测试波动率过滤功能""" + print("\n" + "=" * 60) + print("🧪 测试组合波动率过滤功能") + print("=" * 60) + + # 创建 CryptoAgent 实例 + agent = CryptoAgent() + + # 测试场景1: 低波动率 + print("\n📋 场景1: 低波动率(1小时和5分钟都低)") + print("-" * 60) + data1 = create_test_data_1h_low_volatility() + should_analyze1, reason1, vol1 = agent._check_volatility('BTCUSDT', data1) + print(f"结果: {'✅ 允许分析' if should_analyze1 else '⏸️ 跳过分析'}") + print(f"原因: {reason1}") + print(f"1小时波动率: {vol1:.2f}%") + print(f"预期: ⏸️ 跳过分析(波动率过低)") + print(f"测试: {'✅ 通过' if not should_analyze1 else '❌ 失败'}") + + # 测试场景2: 高波动率 + print("\n📋 场景2: 高波动率(1小时趋势活跃)") + print("-" * 60) + data2 = create_test_data_1h_high_volatility() + should_analyze2, reason2, vol2 = agent._check_volatility('BTCUSDT', data2) + print(f"结果: {'✅ 允许分析' if should_analyze2 else '⏸️ 跳过分析'}") + print(f"原因: {reason2}") + print(f"1小时波动率: {vol2:.2f}%") + print(f"预期: ✅ 允许分析(1小时趋势活跃)") + print(f"测试: {'✅ 通过' if should_analyze2 else '❌ 失败'}") + + # 测试场景3: 突发波动 + print("\n📋 场景3: 突发波动(1小时低,但5分钟高)") + print("-" * 60) + data3 = create_test_data_5m_surge() + + # 打印5分钟数据用于调试 + df_5m_3 = data3['5m'] + print(f"5分钟K线数据:") + for idx, row in df_5m_3.iterrows(): + print(f" 开: {row['open']:.2f}, 高: {row['high']:.2f}, 低: {row['low']:.2f}, 收: {row['close']:.2f}") + + # 手动计算价格变化 + price_start = float(df_5m_3.iloc[0]['close']) + price_end = float(df_5m_3.iloc[-1]['close']) + price_change_5m = abs(price_end - price_start) / price_start * 100 + print(f"5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}%") + + should_analyze3, reason3, vol3 = agent._check_volatility('BTCUSDT', data3) + print(f"结果: {'✅ 允许分析' if should_analyze3 else '⏸️ 跳过分析'}") + print(f"原因: {reason3}") + print(f"1小时波动率: {vol3:.2f}%") + print(f"预期: ✅ 允许分析(5分钟突发波动)") + print(f"测试: {'✅ 通过' if should_analyze3 else '❌ 失败'}") + + # 总结 + print("\n" + "=" * 60) + print("📊 测试总结") + print("=" * 60) + all_passed = ( + not should_analyze1 and # 场景1应该跳过 + should_analyze2 and # 场景2应该允许 + should_analyze3 # 场景3应该允许 + ) + if all_passed: + print("✅ 所有测试通过!") + print("\n组合波动率过滤功能正常工作:") + print(" • 低波动率时正确跳过分析(节省API调用)") + print(" • 高波动率时正确触发分析(捕捉趋势机会)") + print(" • 突发波动时正确触发分析(捕捉突发机会)") + else: + print("❌ 部分测试失败,请检查逻辑") + + return all_passed + + +if __name__ == "__main__": + success = test_volatility_filter() + sys.exit(0 if success else 1)