tradusai/analysis/volatility_monitor.py
2025-12-11 21:59:01 +08:00

333 lines
12 KiB
Python

"""
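Volatility Monitor - real-time monitoring of price volatility.
Triggers an analysis when volatility suddenly rises, so that sudden market moves are not missed.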
"""
import asyncio
import logging
import json
import urllib.request
import ssl
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, field
from collections import deque
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class PricePoint:
    """A single price observation."""

    timestamp: datetime  # when the observation was recorded
    price: float  # observed price
    volume: float = 0.0  # volume attached to the observation; defaults to 0.0
@dataclass
class VolatilityState:
    """Rolling price history and volatility figures for one symbol."""

    symbol: str
    # Bounded history: only the most recent 120 points are kept.
    prices: deque = field(default_factory=lambda: deque(maxlen=120))
    # Time of the last triggered spike; None until the first trigger.
    last_trigger_time: Optional[datetime] = None
    baseline_volatility: float = 0.0  # baseline volatility
    current_volatility: float = 0.0  # most recently computed volatility

    def add_price(self, price: float, volume: float = 0.0):
        """Append a price point stamped with the current local time."""
        self.prices.append(PricePoint(
            timestamp=datetime.now(),
            price=price,
            volume=volume,
        ))

    def get_prices_in_window(self, minutes: int) -> List[float]:
        """Return the prices recorded within the last ``minutes`` minutes, oldest first."""
        cutoff = datetime.now() - timedelta(minutes=minutes)
        return [point.price for point in self.prices if point.timestamp >= cutoff]
class VolatilityMonitor:
    """
    Poll prices for a set of symbols and report sudden volatility.

    Detection strategies, evaluated in this order (the first match wins):
    1. Rapid move: the high/low range of the last 1 minute reaches
       ``price_change_threshold`` percent.
    2. Sustained move: the high/low range of the last 5 minutes reaches
       twice ``price_change_threshold`` percent.
    3. Volatility spike: stdev/mean of the last 5 minutes (in percent)
       reaches ``volatility_multiplier`` times the symbol's baseline.
    """

    # Binance API endpoints
    BINANCE_PRICE_URL = "https://fapi.binance.com/fapi/v1/ticker/price"
    BINANCE_TICKER_URL = "https://fapi.binance.com/fapi/v1/ticker/24hr"

    # Baseline (percent per 5-minute window) used when no usable 24h data exists.
    DEFAULT_BASELINE_VOLATILITY = 0.1

    def __init__(
        self,
        symbols: List[str],
        on_volatility_spike: Optional[Callable[[str, Dict], Any]] = None,
        check_interval: int = 10,
        price_change_threshold: float = 1.0,
        volatility_multiplier: float = 2.5,
        cooldown_minutes: int = 5,
    ):
        """
        Args:
            symbols: Trading pairs to monitor.
            on_volatility_spike: Callback invoked as ``callback(symbol, info)``
                when a spike is detected; may be a plain function or a
                coroutine function.
            check_interval: Seconds between checks.
            price_change_threshold: Price-change threshold, in percent.
            volatility_multiplier: Multiple of the baseline volatility that
                counts as a spike.
            cooldown_minutes: Minutes to suppress further triggers for a
                symbol after it has triggered.
        """
        self.symbols = symbols
        self.on_volatility_spike = on_volatility_spike
        self.check_interval = check_interval
        self.price_change_threshold = price_change_threshold
        self.volatility_multiplier = volatility_multiplier
        self.cooldown_minutes = cooldown_minutes

        # Per-symbol rolling state
        self.states: Dict[str, VolatilityState] = {
            symbol: VolatilityState(symbol=symbol) for symbol in symbols
        }

        self.is_running = False
        self._ssl_context = ssl.create_default_context()

        logger.info(
            f"VolatilityMonitor 初始化: {len(symbols)} 个币种, "
            f"检查间隔 {check_interval}s, 价格阈值 {price_change_threshold}%, "
            f"波动率倍数 {volatility_multiplier}x, 冷却时间 {cooldown_minutes}min"
        )

    async def start(self):
        """Build the baseline, then check volatility until stopped or cancelled."""
        self.is_running = True
        logger.info("VolatilityMonitor 启动")

        try:
            # Seed the baseline and some initial prices before monitoring.
            await self._initialize_baseline()

            while self.is_running:
                try:
                    await self._check_volatility()
                    await asyncio.sleep(self.check_interval)
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    # Keep the monitor alive across transient failures.
                    logger.error(f"波动率检查错误: {e}", exc_info=True)
                    await asyncio.sleep(self.check_interval)
        finally:
            # Make get_status() truthful however the loop ended.
            self.is_running = False

        logger.info("VolatilityMonitor 已停止")

    def stop(self):
        """Ask the monitoring loop to exit after its current iteration."""
        self.is_running = False

    @classmethod
    def _baseline_from_change(cls, price_change_pct: float) -> float:
        """Turn an absolute 24h change (percent) into a 5-minute baseline.

        Falls back to ``DEFAULT_BASELINE_VOLATILITY`` when the result is not
        positive, because a zero baseline would disable the volatility-spike
        check entirely.
        """
        # NOTE(review): this scales the 24h move linearly down to 5 minutes;
        # volatility usually scales with sqrt(time), so the baseline may be
        # very small - confirm this is intended.
        baseline = price_change_pct / (24 * 60) * 5
        return baseline if baseline > 0 else cls.DEFAULT_BASELINE_VOLATILITY

    async def _initialize_baseline(self):
        """Set each symbol's baseline volatility and collect initial prices."""
        logger.info("正在建立波动率基准...")

        # Estimate the baseline from the 24h ticker statistics.
        for symbol in self.symbols:
            state = self.states[symbol]
            try:
                ticker = await self._fetch_24h_ticker(symbol)
                if ticker:
                    price_change_pct = abs(float(ticker.get('priceChangePercent', 0)))
                    state.baseline_volatility = self._baseline_from_change(price_change_pct)
                    logger.info(
                        f"[{symbol}] 24h变化: {price_change_pct:.2f}%, "
                        f"基准波动率: {state.baseline_volatility:.4f}%/5min"
                    )
                else:
                    # The fetch helper reports failures as None, not by raising.
                    state.baseline_volatility = self.DEFAULT_BASELINE_VOLATILITY
                    logger.warning(f"[{symbol}] 未获取到24h数据, 使用默认基准波动率")
            except Exception as e:
                logger.error(f"[{symbol}] 获取基准数据失败: {e}")
                state.baseline_volatility = self.DEFAULT_BASELINE_VOLATILITY

        # Collect six samples, five seconds apart, as initial history.
        for _ in range(6):
            prices = await self._fetch_prices()
            for symbol, price in prices.items():
                if symbol in self.states:
                    self.states[symbol].add_price(price)
            await asyncio.sleep(5)

        logger.info("波动率基准建立完成")

    async def _check_volatility(self):
        """Record the latest prices and fire the callback for detected spikes."""
        prices = await self._fetch_prices()

        for symbol, price in prices.items():
            if symbol not in self.states:
                continue

            state = self.states[symbol]
            state.add_price(price)

            # Skip detection while the symbol is still cooling down.
            if state.last_trigger_time:
                elapsed = (datetime.now() - state.last_trigger_time).total_seconds() / 60
                if elapsed < self.cooldown_minutes:
                    continue

            spike_info = self._detect_volatility_spike(state)
            if not spike_info:
                continue

            logger.warning(
                f"[{symbol}] 波动率突增! "
                f"类型: {spike_info['type']}, "
                f"变化: {spike_info['change_pct']:.2f}%, "
                f"当前价: ${price:,.2f}"
            )
            # The cooldown starts even if the callback fails below.
            state.last_trigger_time = datetime.now()

            if self.on_volatility_spike:
                try:
                    result = self.on_volatility_spike(symbol, spike_info)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"[{symbol}] 波动率回调执行失败: {e}", exc_info=True)

    @staticmethod
    def _range_move(
        window: List[float],
        current_price: float,
        min_points: int,
        threshold: float,
        move_type: str,
        timeframe: str,
    ) -> Optional[Dict]:
        """Return a move event if the window's high/low range reaches ``threshold`` percent."""
        if len(window) < min_points:
            return None
        low = min(window)
        high = max(window)
        if low <= 0:
            # A percentage range is undefined for a non-positive low.
            return None
        change_pct = (high - low) / low * 100
        if change_pct < threshold:
            return None
        return {
            'type': move_type,
            'timeframe': timeframe,
            'change_pct': change_pct,
            # Direction compares the latest price with the oldest in the window.
            'direction': "UP" if current_price >= window[0] else "DOWN",
            'current_price': current_price,
            'trigger_time': datetime.now().isoformat(),
        }

    def _detect_volatility_spike(self, state: "VolatilityState") -> Optional[Dict]:
        """
        Detect a volatility spike for one symbol.

        Returns:
            A dict describing the trigger, or None when fewer than 10 points
            are stored or nothing triggered.
        """
        if len(state.prices) < 10:
            return None

        current_price = state.prices[-1].price

        # 1. Short-term move (1 minute)
        event = self._range_move(
            state.get_prices_in_window(1), current_price,
            2, self.price_change_threshold, 'rapid_move', '1min',
        )
        if event:
            return event

        # 2. Medium-term move (5 minutes), with a doubled threshold
        prices_5min = state.get_prices_in_window(5)
        event = self._range_move(
            prices_5min, current_price,
            10, self.price_change_threshold * 2, 'sustained_move', '5min',
        )
        if event:
            return event

        # 3. Volatility relative to the baseline
        if len(prices_5min) >= 10 and state.baseline_volatility > 0:
            import statistics

            mean_price = statistics.mean(prices_5min)
            if mean_price <= 0:
                return None
            # Current volatility = standard deviation / mean * 100
            current_volatility = (statistics.stdev(prices_5min) / mean_price) * 100
            state.current_volatility = current_volatility

            if current_volatility >= state.baseline_volatility * self.volatility_multiplier:
                return {
                    'type': 'volatility_spike',
                    'timeframe': '5min',
                    'change_pct': current_volatility,
                    'baseline_pct': state.baseline_volatility,
                    'multiplier': current_volatility / state.baseline_volatility,
                    'current_price': current_price,
                    'trigger_time': datetime.now().isoformat(),
                }

        return None

    async def _fetch_prices(self) -> Dict[str, float]:
        """Fetch the current price of every symbol; failed symbols are omitted."""
        loop = asyncio.get_running_loop()
        # Run the blocking HTTP calls concurrently in the default executor.
        results = await asyncio.gather(
            *(
                loop.run_in_executor(None, self._fetch_price_sync, symbol)
                for symbol in self.symbols
            ),
            return_exceptions=True,
        )

        prices: Dict[str, float] = {}
        for symbol, result in zip(self.symbols, results):
            if isinstance(result, BaseException):
                logger.error(f"获取价格失败: {result}")
            elif result:
                prices[symbol] = result
        return prices

    def _fetch_price_sync(self, symbol: str) -> Optional[float]:
        """Blocking fetch of one symbol's price; returns None on any failure."""
        try:
            url = f"{self.BINANCE_PRICE_URL}?symbol={symbol}"
            req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
            with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response:
                data = json.loads(response.read().decode('utf-8'))
            return float(data['price'])
        except Exception as e:
            logger.debug(f"获取 {symbol} 价格失败: {e}")
            return None

    async def _fetch_24h_ticker(self, symbol: str) -> Optional[Dict]:
        """Fetch the 24h ticker statistics for a symbol; returns None on failure."""
        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(None, self._fetch_24h_ticker_sync, symbol)
        except Exception as e:
            logger.error(f"获取 {symbol} 24h数据失败: {e}")
            return None

    def _fetch_24h_ticker_sync(self, symbol: str) -> Optional[Dict]:
        """Blocking fetch of the 24h ticker statistics; returns None on any failure."""
        try:
            url = f"{self.BINANCE_TICKER_URL}?symbol={symbol}"
            req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
            with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response:
                return json.loads(response.read().decode('utf-8'))
        except Exception as e:
            logger.debug(f"获取 {symbol} 24h数据失败: {e}")
            return None

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the running flag and per-symbol state."""
        status: Dict[str, Any] = {
            'is_running': self.is_running,
            'symbols': {},
        }
        for symbol, state in self.states.items():
            last_trigger = state.last_trigger_time
            status['symbols'][symbol] = {
                'price_count': len(state.prices),
                'current_price': state.prices[-1].price if state.prices else None,
                'baseline_volatility': state.baseline_volatility,
                'current_volatility': state.current_volatility,
                'last_trigger': last_trigger.isoformat() if last_trigger else None,
            }
        return status