""" Volatility Monitor - 实时监控价格波动率 当波动率突然升高时触发分析,防止漏掉突发行情 """ import asyncio import logging import json import urllib.request import ssl from datetime import datetime, timedelta from typing import Dict, List, Optional, Callable, Any from dataclasses import dataclass, field from collections import deque logger = logging.getLogger(__name__) @dataclass class PricePoint: """价格数据点""" timestamp: datetime price: float volume: float = 0.0 @dataclass class VolatilityState: """单个币种的波动率状态""" symbol: str prices: deque = field(default_factory=lambda: deque(maxlen=120)) # 保存最近120个数据点 last_trigger_time: datetime = None baseline_volatility: float = 0.0 # 基准波动率 current_volatility: float = 0.0 # 当前波动率 def add_price(self, price: float, volume: float = 0.0): """添加价格点""" self.prices.append(PricePoint( timestamp=datetime.now(), price=price, volume=volume )) def get_prices_in_window(self, minutes: int) -> List[float]: """获取指定时间窗口内的价格""" cutoff = datetime.now() - timedelta(minutes=minutes) return [p.price for p in self.prices if p.timestamp >= cutoff] class VolatilityMonitor: """ 波动率监控器 监控策略: 1. 价格变化率: 短期价格变化超过阈值 2. 波动率突增: 当前波动率 vs 基准波动率 3. 连续大幅波动: 连续多个周期出现大幅波动 """ # Binance API BINANCE_PRICE_URL = "https://fapi.binance.com/fapi/v1/ticker/price" BINANCE_TICKER_URL = "https://fapi.binance.com/fapi/v1/ticker/24hr" def __init__( self, symbols: List[str], on_volatility_spike: Callable[[str, Dict], Any] = None, check_interval: int = 10, # 检查间隔(秒)- 改为10秒减少API调用 price_change_threshold: float = 1.0, # 价格变化阈值 1% (原0.5%太敏感) volatility_multiplier: float = 2.5, # 波动率突增倍数 (原2.0) cooldown_minutes: int = 5, # 触发后冷却时间 5分钟 (原3分钟) ): """ Args: symbols: 监控的交易对列表 on_volatility_spike: 波动率突增时的回调函数 check_interval: 检查间隔(秒) price_change_threshold: 价格变化阈值(百分比) volatility_multiplier: 波动率突增倍数阈值 cooldown_minutes: 触发后冷却时间(分钟) """ self.symbols = symbols self.on_volatility_spike = on_volatility_spike self.check_interval = check_interval self.price_change_threshold = price_change_threshold self.volatility_multiplier = volatility_multiplier self.cooldown_minutes = cooldown_minutes # 每个币种的状态 self.states: Dict[str, VolatilityState] = { symbol: VolatilityState(symbol=symbol) for symbol in symbols } self.is_running = False self._ssl_context = ssl.create_default_context() logger.info( f"VolatilityMonitor 初始化: {len(symbols)} 个币种, " f"检查间隔 {check_interval}s, 价格阈值 {price_change_threshold}%, " f"波动率倍数 {volatility_multiplier}x, 冷却时间 {cooldown_minutes}min" ) async def start(self): """启动监控""" self.is_running = True logger.info("VolatilityMonitor 启动") # 先获取初始价格建立基准 await self._initialize_baseline() while self.is_running: try: await self._check_volatility() await asyncio.sleep(self.check_interval) except asyncio.CancelledError: break except Exception as e: logger.error(f"波动率检查错误: {e}", exc_info=True) await asyncio.sleep(self.check_interval) logger.info("VolatilityMonitor 已停止") def stop(self): """停止监控""" self.is_running = False async def _initialize_baseline(self): """初始化基准波动率""" logger.info("正在建立波动率基准...") # 获取24小时统计数据来估算基准波动率 for symbol in self.symbols: try: ticker = await self._fetch_24h_ticker(symbol) if ticker: # 使用24小时价格变化百分比作为基准 price_change_pct = abs(float(ticker.get('priceChangePercent', 0))) # 转换为每分钟的预期波动率 self.states[symbol].baseline_volatility = price_change_pct / (24 * 60) * 5 # 5分钟窗口 logger.info( f"[{symbol}] 24h变化: {price_change_pct:.2f}%, " f"基准波动率: {self.states[symbol].baseline_volatility:.4f}%/5min" ) except Exception as e: logger.error(f"[{symbol}] 获取基准数据失败: {e}") self.states[symbol].baseline_volatility = 0.1 # 默认值 # 收集一些初始价格数据 for _ in range(6): # 收集30秒的数据 prices = await self._fetch_prices() for symbol, price in prices.items(): if symbol in self.states: self.states[symbol].add_price(price) await asyncio.sleep(5) logger.info("波动率基准建立完成") async def _check_volatility(self): """检查所有币种的波动率""" prices = await self._fetch_prices() for symbol, price in prices.items(): if symbol not in self.states: continue state = self.states[symbol] state.add_price(price) # 检查是否在冷却期 if state.last_trigger_time: elapsed = (datetime.now() - state.last_trigger_time).total_seconds() / 60 if elapsed < self.cooldown_minutes: continue # 检查波动率 spike_info = self._detect_volatility_spike(state) if spike_info: logger.warning( f"[{symbol}] 波动率突增! " f"类型: {spike_info['type']}, " f"变化: {spike_info['change_pct']:.2f}%, " f"当前价: ${price:,.2f}" ) state.last_trigger_time = datetime.now() # 触发回调 if self.on_volatility_spike: try: result = self.on_volatility_spike(symbol, spike_info) if asyncio.iscoroutine(result): await result except Exception as e: logger.error(f"[{symbol}] 波动率回调执行失败: {e}") def _detect_volatility_spike(self, state: VolatilityState) -> Optional[Dict]: """ 检测波动率突增 返回: 触发信息字典,如果未触发返回 None """ if len(state.prices) < 10: return None prices = [p.price for p in state.prices] current_price = prices[-1] # 1. 检查短期价格变化 (1分钟) prices_1min = state.get_prices_in_window(1) if len(prices_1min) >= 2: min_price = min(prices_1min) max_price = max(prices_1min) change_1min = (max_price - min_price) / min_price * 100 if change_1min >= self.price_change_threshold: direction = "UP" if current_price >= prices_1min[0] else "DOWN" return { 'type': 'rapid_move', 'timeframe': '1min', 'change_pct': change_1min, 'direction': direction, 'current_price': current_price, 'trigger_time': datetime.now().isoformat(), } # 2. 检查中期价格变化 (5分钟) prices_5min = state.get_prices_in_window(5) if len(prices_5min) >= 10: min_price = min(prices_5min) max_price = max(prices_5min) change_5min = (max_price - min_price) / min_price * 100 # 5分钟窗口使用更高的阈值 if change_5min >= self.price_change_threshold * 2: direction = "UP" if current_price >= prices_5min[0] else "DOWN" return { 'type': 'sustained_move', 'timeframe': '5min', 'change_pct': change_5min, 'direction': direction, 'current_price': current_price, 'trigger_time': datetime.now().isoformat(), } # 3. 检查波动率突增 if len(prices_5min) >= 10 and state.baseline_volatility > 0: # 计算当前波动率 (标准差 / 均值 * 100) import statistics mean_price = statistics.mean(prices_5min) std_price = statistics.stdev(prices_5min) current_volatility = (std_price / mean_price) * 100 state.current_volatility = current_volatility if current_volatility >= state.baseline_volatility * self.volatility_multiplier: return { 'type': 'volatility_spike', 'timeframe': '5min', 'change_pct': current_volatility, 'baseline_pct': state.baseline_volatility, 'multiplier': current_volatility / state.baseline_volatility, 'current_price': current_price, 'trigger_time': datetime.now().isoformat(), } return None async def _fetch_prices(self) -> Dict[str, float]: """获取所有币种的当前价格""" prices = {} loop = asyncio.get_event_loop() try: for symbol in self.symbols: price = await loop.run_in_executor(None, self._fetch_price_sync, symbol) if price: prices[symbol] = price except Exception as e: logger.error(f"获取价格失败: {e}") return prices def _fetch_price_sync(self, symbol: str) -> Optional[float]: """同步获取单个币种价格""" try: url = f"{self.BINANCE_PRICE_URL}?symbol={symbol}" req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response: data = json.loads(response.read().decode('utf-8')) return float(data['price']) except Exception as e: logger.debug(f"获取 {symbol} 价格失败: {e}") return None async def _fetch_24h_ticker(self, symbol: str) -> Optional[Dict]: """获取24小时统计数据""" loop = asyncio.get_event_loop() try: return await loop.run_in_executor(None, self._fetch_24h_ticker_sync, symbol) except Exception as e: logger.error(f"获取 {symbol} 24h数据失败: {e}") return None def _fetch_24h_ticker_sync(self, symbol: str) -> Optional[Dict]: """同步获取24小时统计""" try: url = f"{self.BINANCE_TICKER_URL}?symbol={symbol}" req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response: return json.loads(response.read().decode('utf-8')) except Exception as e: logger.debug(f"获取 {symbol} 24h数据失败: {e}") return None def get_status(self) -> Dict[str, Any]: """获取监控状态""" status = { 'is_running': self.is_running, 'symbols': {}, } for symbol, state in self.states.items(): prices = [p.price for p in state.prices] status['symbols'][symbol] = { 'price_count': len(prices), 'current_price': prices[-1] if prices else None, 'baseline_volatility': state.baseline_volatility, 'current_volatility': state.current_volatility, 'last_trigger': state.last_trigger_time.isoformat() if state.last_trigger_time else None, } return status