333 lines
12 KiB
Python
333 lines
12 KiB
Python
"""
|
|
Volatility Monitor - 实时监控价格波动率
|
|
|
|
当波动率突然升高时触发分析,防止漏掉突发行情
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import json
|
|
import urllib.request
|
|
import ssl
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Callable, Any
|
|
from dataclasses import dataclass, field
|
|
from collections import deque
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class PricePoint:
    """A single price sample.

    Attributes:
        timestamp: When the sample was recorded.
        price: Price of the sample.
        volume: Volume associated with the sample; defaults to 0.0.
    """

    timestamp: datetime
    price: float
    volume: float = 0.0
|
|
|
|
|
|
@dataclass
class VolatilityState:
    """Rolling price history and volatility figures for a single symbol."""

    symbol: str
    # Bounded history: only the 120 most recent samples are kept.
    prices: deque = field(default_factory=lambda: deque(maxlen=120))
    # None by default, i.e. no trigger recorded yet.
    last_trigger_time: Optional[datetime] = None
    baseline_volatility: float = 0.0  # baseline volatility
    current_volatility: float = 0.0  # current volatility

    def add_price(self, price: float, volume: float = 0.0):
        """Append a sample stamped with the current local time.

        Args:
            price: Price to record.
            volume: Volume to record alongside the price.
        """
        self.prices.append(PricePoint(
            timestamp=datetime.now(),
            price=price,
            volume=volume,
        ))

    def get_prices_in_window(self, minutes: int) -> List[float]:
        """Return the prices recorded within the last ``minutes`` minutes.

        Args:
            minutes: Size of the look-back window, counted back from now.

        Returns:
            Prices whose timestamp is at or after the cutoff, in stored order.
        """
        cutoff = datetime.now() - timedelta(minutes=minutes)
        return [p.price for p in self.prices if p.timestamp >= cutoff]
|
|
|
|
|
|
class VolatilityMonitor:
    """Poll prices and fire a callback when volatility jumps.

    Each symbol is checked, in this order, for:
      1. Rapid move: the high/low range of the last minute reaches
         ``price_change_threshold``.
      2. Sustained move: the high/low range of the last five minutes reaches
         twice ``price_change_threshold``.
      3. Volatility spike: the 5-minute coefficient of variation
         (stdev / mean * 100) reaches ``baseline * volatility_multiplier``.
    """

    # Binance API
    BINANCE_PRICE_URL = "https://fapi.binance.com/fapi/v1/ticker/price"
    BINANCE_TICKER_URL = "https://fapi.binance.com/fapi/v1/ticker/24hr"

    # Baseline (percent per 5-minute window) used when no usable 24h data exists.
    DEFAULT_BASELINE_VOLATILITY = 0.1

    def __init__(
        self,
        symbols: List[str],
        on_volatility_spike: Optional[Callable[[str, Dict], Any]] = None,
        check_interval: int = 10,  # seconds between checks; 10s keeps API calls down
        price_change_threshold: float = 1.0,  # percent (0.5% was too sensitive)
        volatility_multiplier: float = 2.5,  # spike factor over baseline (was 2.0)
        cooldown_minutes: int = 5,  # minutes to wait after a trigger (was 3)
    ):
        """Create a monitor for ``symbols``.

        Args:
            symbols: Trading pairs to monitor.
            on_volatility_spike: Callback invoked as ``callback(symbol, info)``
                when a spike is detected; it may return an awaitable.
            check_interval: Seconds between checks.
            price_change_threshold: Price-change threshold, in percent.
            volatility_multiplier: Multiple of the baseline volatility that
                counts as a spike.
            cooldown_minutes: Minutes during which a symbol cannot trigger
                again after a trigger.
        """
        self.symbols = symbols
        self.on_volatility_spike = on_volatility_spike
        self.check_interval = check_interval
        self.price_change_threshold = price_change_threshold
        self.volatility_multiplier = volatility_multiplier
        self.cooldown_minutes = cooldown_minutes

        # Per-symbol state.
        self.states: Dict[str, "VolatilityState"] = {
            symbol: VolatilityState(symbol=symbol) for symbol in symbols
        }

        self.is_running = False
        self._ssl_context = ssl.create_default_context()

        logger.info(
            f"VolatilityMonitor 初始化: {len(symbols)} 个币种, "
            f"检查间隔 {check_interval}s, 价格阈值 {price_change_threshold}%, "
            f"波动率倍数 {volatility_multiplier}x, 冷却时间 {cooldown_minutes}min"
        )

    async def start(self):
        """Run the monitoring loop until ``stop()`` is called or the task is cancelled."""
        self.is_running = True
        logger.info("VolatilityMonitor 启动")

        try:
            # Establish a baseline from initial data before checking.
            await self._initialize_baseline()

            while self.is_running:
                try:
                    await self._check_volatility()
                    await asyncio.sleep(self.check_interval)
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    # Keep the monitor alive across transient failures.
                    logger.error(f"波动率检查错误: {e}", exc_info=True)
                    await asyncio.sleep(self.check_interval)
        finally:
            # However the loop ended, the status must not keep reporting "running".
            self.is_running = False

        logger.info("VolatilityMonitor 已停止")

    def stop(self):
        """Ask the monitoring loop to exit after its current iteration."""
        self.is_running = False

    @classmethod
    def _baseline_for_change(cls, price_change_pct: float) -> float:
        """Scale an absolute 24h change (percent) down to a 5-minute baseline.

        Falls back to ``DEFAULT_BASELINE_VOLATILITY`` when the result is not
        positive, because a zero baseline would disable the spike check.
        """
        baseline = price_change_pct / (24 * 60) * 5  # 5-minute window
        if not baseline > 0:  # also rejects NaN
            return cls.DEFAULT_BASELINE_VOLATILITY
        return baseline

    async def _initialize_baseline(self):
        """Set each symbol's baseline volatility and collect initial samples."""
        logger.info("正在建立波动率基准...")

        # Estimate the baseline from the 24h statistics.
        for symbol in self.symbols:
            state = self.states[symbol]
            try:
                ticker = await self._fetch_24h_ticker(symbol)
                if not ticker:
                    # The fetch helpers swallow their own errors and return
                    # None, so a missing ticker must be treated as a failure.
                    raise ValueError("empty 24h ticker response")
                # The 24h percentage change is the basis for the baseline.
                price_change_pct = abs(float(ticker.get('priceChangePercent', 0)))
            except Exception as e:
                logger.error(f"[{symbol}] 获取基准数据失败: {e}")
                state.baseline_volatility = self.DEFAULT_BASELINE_VOLATILITY  # default
                continue

            baseline = self._baseline_for_change(price_change_pct)
            state.baseline_volatility = baseline
            logger.info(
                f"[{symbol}] 24h变化: {price_change_pct:.2f}%, "
                f"基准波动率: {baseline:.4f}%/5min"
            )

        # Collect some initial samples: 6 polls, 5 seconds apart.
        for _ in range(6):
            prices = await self._fetch_prices()
            for symbol, price in prices.items():
                if symbol in self.states:
                    self.states[symbol].add_price(price)
            await asyncio.sleep(5)

        logger.info("波动率基准建立完成")

    async def _check_volatility(self):
        """Record the latest prices and run spike detection for every symbol."""
        import inspect

        prices = await self._fetch_prices()

        for symbol, price in prices.items():
            state = self.states.get(symbol)
            if state is None:
                continue

            state.add_price(price)

            # Still sample during the cooldown, but do not trigger again.
            if state.last_trigger_time is not None:
                elapsed = (datetime.now() - state.last_trigger_time).total_seconds() / 60
                if elapsed < self.cooldown_minutes:
                    continue

            spike_info = self._detect_volatility_spike(state)
            if not spike_info:
                continue

            logger.warning(
                f"[{symbol}] 波动率突增! "
                f"类型: {spike_info['type']}, "
                f"变化: {spike_info['change_pct']:.2f}%, "
                f"当前价: ${price:,.2f}"
            )
            state.last_trigger_time = datetime.now()

            if not self.on_volatility_spike:
                continue

            try:
                result = self.on_volatility_spike(symbol, spike_info)
                # Accept coroutines as well as Futures, Tasks and other awaitables.
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                logger.error(f"[{symbol}] 波动率回调执行失败: {e}", exc_info=True)

    @staticmethod
    def _range_pct(prices: List[float]) -> Optional[float]:
        """Return the high/low range as a percentage of the low.

        Returns None when the low is not positive, since the ratio is then
        undefined.
        """
        low = min(prices)
        if low <= 0:
            return None
        return (max(prices) - low) / low * 100

    def _detect_volatility_spike(self, state: "VolatilityState") -> Optional[Dict]:
        """Detect a volatility spike for one symbol.

        Args:
            state: The symbol's state; at least 10 samples are required.

        Returns:
            A dict describing the trigger (``type`` is ``rapid_move``,
            ``sustained_move`` or ``volatility_spike``), or None when nothing
            triggered.
        """
        import statistics

        if len(state.prices) < 10:
            return None

        current_price = state.prices[-1].price

        # 1. Short-term price change (1 minute).
        prices_1min = state.get_prices_in_window(1)
        if len(prices_1min) >= 2:
            change_1min = self._range_pct(prices_1min)
            if change_1min is not None and change_1min >= self.price_change_threshold:
                direction = "UP" if current_price >= prices_1min[0] else "DOWN"
                return {
                    'type': 'rapid_move',
                    'timeframe': '1min',
                    'change_pct': change_1min,
                    'direction': direction,
                    'current_price': current_price,
                    'trigger_time': datetime.now().isoformat(),
                }

        # Both remaining checks need at least 10 samples in the 5-minute window.
        prices_5min = state.get_prices_in_window(5)
        if len(prices_5min) < 10:
            return None

        # 2. Medium-term price change (5 minutes), with a doubled threshold.
        change_5min = self._range_pct(prices_5min)
        if change_5min is not None and change_5min >= self.price_change_threshold * 2:
            direction = "UP" if current_price >= prices_5min[0] else "DOWN"
            return {
                'type': 'sustained_move',
                'timeframe': '5min',
                'change_pct': change_5min,
                'direction': direction,
                'current_price': current_price,
                'trigger_time': datetime.now().isoformat(),
            }

        # 3. Volatility spike relative to the baseline.
        if state.baseline_volatility > 0:
            mean_price = statistics.mean(prices_5min)
            if mean_price > 0:
                # Current volatility = stdev / mean * 100.
                current_volatility = statistics.stdev(prices_5min) / mean_price * 100
                state.current_volatility = current_volatility

                if current_volatility >= state.baseline_volatility * self.volatility_multiplier:
                    return {
                        'type': 'volatility_spike',
                        'timeframe': '5min',
                        'change_pct': current_volatility,
                        'baseline_pct': state.baseline_volatility,
                        'multiplier': current_volatility / state.baseline_volatility,
                        'current_price': current_price,
                        'trigger_time': datetime.now().isoformat(),
                    }

        return None

    async def _fetch_prices(self) -> Dict[str, float]:
        """Fetch the current price of every monitored symbol.

        The blocking requests run concurrently in the default executor.

        Returns:
            Mapping of symbol to price; symbols whose fetch failed or returned
            a non-positive price are omitted.
        """
        loop = asyncio.get_running_loop()
        results = await asyncio.gather(
            *(
                loop.run_in_executor(None, self._fetch_price_sync, symbol)
                for symbol in self.symbols
            ),
            return_exceptions=True,
        )

        prices: Dict[str, float] = {}
        for symbol, result in zip(self.symbols, results):
            if isinstance(result, BaseException):
                logger.error(f"获取价格失败: {result}")
                continue
            if result is not None and result > 0:
                prices[symbol] = result

        return prices

    def _get_json(self, base_url: str, symbol: str) -> Any:
        """GET ``base_url`` with a ``symbol`` query parameter and decode the JSON body."""
        from urllib.parse import urlencode

        url = f"{base_url}?{urlencode({'symbol': symbol})}"
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response:
            return json.loads(response.read().decode('utf-8'))

    def _fetch_price_sync(self, symbol: str) -> Optional[float]:
        """Synchronously fetch one symbol's price; return None on any failure."""
        try:
            data = self._get_json(self.BINANCE_PRICE_URL, symbol)
            return float(data['price'])
        except Exception as e:
            # Best effort: a failed sample is skipped by the caller.
            logger.debug(f"获取 {symbol} 价格失败: {e}")
            return None

    async def _fetch_24h_ticker(self, symbol: str) -> Optional[Dict]:
        """Fetch the 24h statistics for ``symbol`` without blocking the event loop."""
        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(None, self._fetch_24h_ticker_sync, symbol)
        except Exception as e:
            logger.error(f"获取 {symbol} 24h数据失败: {e}")
            return None

    def _fetch_24h_ticker_sync(self, symbol: str) -> Optional[Dict]:
        """Synchronously fetch the 24h statistics; return None on any failure."""
        try:
            return self._get_json(self.BINANCE_TICKER_URL, symbol)
        except Exception as e:
            logger.debug(f"获取 {symbol} 24h数据失败: {e}")
            return None

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the monitor: running flag plus per-symbol figures."""
        status: Dict[str, Any] = {
            'is_running': self.is_running,
            'symbols': {},
        }

        for symbol, state in self.states.items():
            prices = [p.price for p in state.prices]
            status['symbols'][symbol] = {
                'price_count': len(prices),
                'current_price': prices[-1] if prices else None,
                'baseline_volatility': state.baseline_volatility,
                'current_volatility': state.current_volatility,
                'last_trigger': state.last_trigger_time.isoformat() if state.last_trigger_time else None,
            }

        return status
|