alphax/app/core/oi_funding.py
2026-06-01 00:16:01 +08:00

431 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""OI (Open Interest) 变化 + 资金费率反向信号因子。
核心逻辑:
1. OI Buildup持仓量蓄力
- 价格横盘 + OI 快速增加 = 有人在建仓,即将选方向
- 价格上涨 + OI 增加 = 新多头进场,趋势健康
- 价格上涨 + OI 减少 = 空头平仓驱动,动力不持续
2. 资金费率反向信号:
- 底部横盘 + 资金费率极度负值 = 空头拥挤,启动时空头平仓加速
- 高位 + 资金费率极度正值 = 多头拥挤,回调风险大
使用场景1-3天交易
- oi_buildup + 静K蓄力 → 高权重加分(蓄力确认)
- funding_negative_contrarian + RS强 → 看多信号
- funding_positive_risk + 高位 → 风险降级
"""
from __future__ import annotations
import time
from typing import Optional
import requests
from app.config.config_loader import _get_section as _get_cfg_section
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _oi_funding_config() -> dict:
"""Load config from rules.yaml -> oi_funding section."""
try:
cfg = _get_cfg_section("oi_funding") or {}
except Exception:
cfg = {}
return {
# OI buildup thresholds
"oi_buildup_min_change_pct": float(cfg.get("oi_buildup_min_change_pct", 8.0)),
"oi_buildup_price_flat_max_pct": float(cfg.get("oi_buildup_price_flat_max_pct", 3.0)),
"oi_healthy_trend_min_pct": float(cfg.get("oi_healthy_trend_min_pct", 5.0)),
"oi_divergence_min_drop_pct": float(cfg.get("oi_divergence_min_drop_pct", -5.0)),
# Funding thresholds
"funding_negative_threshold": float(cfg.get("funding_negative_threshold", -0.01)),
"funding_positive_threshold": float(cfg.get("funding_positive_threshold", 0.05)),
"funding_extreme_negative": float(cfg.get("funding_extreme_negative", -0.03)),
"funding_extreme_positive": float(cfg.get("funding_extreme_positive", 0.1)),
# Weights
"weight_oi_buildup": float(cfg.get("weight_oi_buildup", 3.0)),
"weight_oi_healthy": float(cfg.get("weight_oi_healthy", 1.5)),
"weight_oi_divergence_risk": float(cfg.get("weight_oi_divergence_risk", -2.0)),
"weight_funding_contrarian": float(cfg.get("weight_funding_contrarian", 3.5)),
"weight_funding_risk": float(cfg.get("weight_funding_risk", -3.0)),
# API
"api_timeout": float(cfg.get("api_timeout", 5.0)),
}
# ---------------------------------------------------------------------------
# Data fetching
# ---------------------------------------------------------------------------
BINANCE_FAPI_BASE = "https://fapi.binance.com"
# In-memory cache: {symbol: {"data": ..., "ts": ...}}
_oi_cache: dict = {}
_OI_CACHE_TTL = 180 # 3 minutes
def fetch_open_interest_history(symbol: str, period: str = "1h", limit: int = 25) -> Optional[list[dict]]:
"""Fetch OI history from Binance Futures.
Args:
symbol: e.g. "SOL/USDT"
period: "5m", "15m", "30m", "1h", "2h", "4h", "6h", "12h", "1d"
limit: number of data points
Returns:
List of {"timestamp": int, "sumOpenInterest": float, "sumOpenInterestValue": float}
"""
cfg = _oi_funding_config()
pair = symbol.replace("/", "")
# Check cache
cache_key = f"{pair}_{period}_{limit}"
now = time.time()
if cache_key in _oi_cache and now - _oi_cache[cache_key]["ts"] < _OI_CACHE_TTL:
return _oi_cache[cache_key]["data"]
try:
url = f"{BINANCE_FAPI_BASE}/futures/data/openInterestHist"
resp = requests.get(url, params={
"symbol": pair,
"period": period,
"limit": limit,
}, timeout=cfg["api_timeout"])
resp.raise_for_status()
data = resp.json()
if not isinstance(data, list):
return None
result = []
for item in data:
result.append({
"timestamp": int(item.get("timestamp", 0)),
"sumOpenInterest": float(item.get("sumOpenInterest", 0)),
"sumOpenInterestValue": float(item.get("sumOpenInterestValue", 0)),
})
_oi_cache[cache_key] = {"data": result, "ts": now}
return result
except Exception:
return None
def fetch_current_funding_rate(symbol: str) -> Optional[float]:
"""Fetch the latest funding rate for a symbol.
Args:
symbol: e.g. "SOL/USDT"
Returns:
Funding rate as float (e.g. 0.0001 = 0.01%)
"""
cfg = _oi_funding_config()
pair = symbol.replace("/", "")
try:
url = f"{BINANCE_FAPI_BASE}/fapi/v1/fundingRate"
resp = requests.get(url, params={
"symbol": pair,
"limit": 1,
}, timeout=cfg["api_timeout"])
resp.raise_for_status()
data = resp.json()
if data and isinstance(data, list):
return float(data[-1].get("fundingRate", 0))
return None
except Exception:
return None
def fetch_funding_rate_history(symbol: str, limit: int = 8) -> Optional[list[float]]:
"""Fetch recent funding rate history (last N periods, each 8h).
Returns list of rates from oldest to newest.
"""
cfg = _oi_funding_config()
pair = symbol.replace("/", "")
try:
url = f"{BINANCE_FAPI_BASE}/fapi/v1/fundingRate"
resp = requests.get(url, params={
"symbol": pair,
"limit": limit,
}, timeout=cfg["api_timeout"])
resp.raise_for_status()
data = resp.json()
if data and isinstance(data, list):
return [float(item.get("fundingRate", 0)) for item in data]
return None
except Exception:
return None
# ---------------------------------------------------------------------------
# OI Analysis
# ---------------------------------------------------------------------------
def analyze_oi_change(
symbol: str,
price_change_pct: float = 0.0,
oi_history: Optional[list[dict]] = None,
) -> dict:
"""Analyze OI change pattern relative to price movement.
Args:
symbol: e.g. "SOL/USDT"
price_change_pct: price change over the same period as OI lookback
oi_history: pre-fetched OI history (optional)
Returns:
{
"oi_change_pct": float,
"oi_pattern": str, # "buildup" / "healthy_trend" / "divergence" / "neutral"
"oi_signal": str, # human-readable label
"available": bool,
}
"""
cfg = _oi_funding_config()
if oi_history is None:
oi_history = fetch_open_interest_history(symbol, period="1h", limit=25)
if not oi_history or len(oi_history) < 5:
return {
"oi_change_pct": 0.0,
"oi_change_24h_pct": 0.0,
"oi_pattern": "unknown",
"oi_signal": "",
"available": False,
}
# Calculate OI change over full window
oi_start = oi_history[0]["sumOpenInterestValue"]
oi_end = oi_history[-1]["sumOpenInterestValue"]
if oi_start <= 0:
return {
"oi_change_pct": 0.0,
"oi_change_24h_pct": 0.0,
"oi_pattern": "unknown",
"oi_signal": "",
"available": False,
}
oi_change_pct = round((oi_end - oi_start) / oi_start * 100, 2)
# Also compute last 24h (last ~24 data points for 1h period)
oi_24h_start_idx = max(0, len(oi_history) - 24)
oi_24h_start = oi_history[oi_24h_start_idx]["sumOpenInterestValue"]
oi_change_24h_pct = round((oi_end - oi_24h_start) / oi_24h_start * 100, 2) if oi_24h_start > 0 else 0.0
# Classify pattern
price_flat = abs(price_change_pct) < cfg["oi_buildup_price_flat_max_pct"]
oi_rising = oi_change_pct >= cfg["oi_buildup_min_change_pct"]
oi_moderate_rise = oi_change_pct >= cfg["oi_healthy_trend_min_pct"]
oi_dropping = oi_change_pct <= cfg["oi_divergence_min_drop_pct"]
price_rising = price_change_pct > cfg["oi_buildup_price_flat_max_pct"]
if price_flat and oi_rising:
# Price sideways but OI building up → someone is positioning
pattern = "buildup"
signal = f"OI蓄力({oi_change_pct:+.1f}%价格横盘)"
elif price_rising and oi_moderate_rise:
# Price up + OI up → healthy trend with new longs
pattern = "healthy_trend"
signal = f"OI健康增长({oi_change_pct:+.1f}%)"
elif price_rising and oi_dropping:
# Price up but OI dropping → short squeeze driven, not sustainable
pattern = "divergence"
signal = f"OI背离风险(价涨{price_change_pct:+.1f}% OI{oi_change_pct:+.1f}%)"
else:
pattern = "neutral"
signal = ""
return {
"oi_change_pct": oi_change_pct,
"oi_change_24h_pct": oi_change_24h_pct,
"oi_pattern": pattern,
"oi_signal": signal,
"available": True,
}
# ---------------------------------------------------------------------------
# Funding Rate Analysis
# ---------------------------------------------------------------------------
def analyze_funding_signal(
symbol: str,
current_rate: Optional[float] = None,
rate_history: Optional[list[float]] = None,
price_position: str = "neutral",
) -> dict:
"""Analyze funding rate for contrarian or risk signals.
Args:
symbol: e.g. "SOL/USDT"
current_rate: pre-fetched current funding rate (optional)
rate_history: pre-fetched rate history (optional)
price_position: "low" / "high" / "neutral" — where price is relative
to recent range (caller determines this from PA)
Returns:
{
"funding_rate": float,
"funding_avg_recent": float,
"funding_pattern": str, # "contrarian_long" / "crowded_risk" / "neutral"
"funding_signal": str,
"consecutive_negative": int,
"available": bool,
}
"""
cfg = _oi_funding_config()
if current_rate is None:
current_rate = fetch_current_funding_rate(symbol)
if current_rate is None:
return {
"funding_rate": 0.0,
"funding_avg_recent": 0.0,
"funding_pattern": "unknown",
"funding_signal": "",
"consecutive_negative": 0,
"available": False,
}
if rate_history is None:
rate_history = fetch_funding_rate_history(symbol, limit=8)
# Calculate average and consecutive negative count
avg_rate = 0.0
consecutive_negative = 0
if rate_history:
avg_rate = sum(rate_history) / len(rate_history)
# Count consecutive negative from the end
for rate in reversed(rate_history):
if rate < 0:
consecutive_negative += 1
else:
break
# Classify pattern
rate_pct = current_rate * 100 # Convert to percentage for display
if (
current_rate <= cfg["funding_negative_threshold"]
and price_position in ("low", "neutral")
):
# Negative funding at bottom/neutral = shorts are crowded
if current_rate <= cfg["funding_extreme_negative"]:
pattern = "contrarian_long"
signal = f"资金费率极负({rate_pct:.3f}%)空头拥挤"
elif consecutive_negative >= 3:
pattern = "contrarian_long"
signal = f"资金费率持续为负(连续{consecutive_negative}期)空头积累"
else:
pattern = "mild_negative"
signal = f"资金费率偏负({rate_pct:.3f}%)"
elif (
current_rate >= cfg["funding_positive_threshold"]
and price_position in ("high", "neutral")
):
# High positive funding at top = longs are crowded
if current_rate >= cfg["funding_extreme_positive"]:
pattern = "crowded_risk"
signal = f"资金费率极高({rate_pct:.3f}%)多头拥挤风险"
else:
pattern = "elevated_risk"
signal = f"资金费率偏高({rate_pct:.3f}%)注意回调"
else:
pattern = "neutral"
signal = ""
return {
"funding_rate": round(current_rate, 6),
"funding_rate_pct": round(rate_pct, 4),
"funding_avg_recent": round(avg_rate, 6),
"funding_pattern": pattern,
"funding_signal": signal,
"consecutive_negative": consecutive_negative,
"available": True,
}
# ---------------------------------------------------------------------------
# Combined factor scoring interface
# ---------------------------------------------------------------------------
def oi_funding_factor_scores(
symbol: str,
price_change_pct: float = 0.0,
price_position: str = "neutral",
oi_history: Optional[list[dict]] = None,
current_funding: Optional[float] = None,
funding_history: Optional[list[float]] = None,
) -> dict:
"""Compute OI + funding factor scores for the scoring system.
Returns:
{
"oi": {analysis dict},
"funding": {analysis dict},
"factors": [
{"code": str, "score": float, "label": str},
...
]
}
"""
cfg = _oi_funding_config()
oi_result = analyze_oi_change(symbol, price_change_pct, oi_history)
funding_result = analyze_funding_signal(symbol, current_funding, funding_history, price_position)
factors = []
# OI factors
if oi_result.get("available"):
pattern = oi_result["oi_pattern"]
if pattern == "buildup":
factors.append({
"code": "oi_buildup",
"score": cfg["weight_oi_buildup"],
"label": oi_result["oi_signal"],
})
elif pattern == "healthy_trend":
factors.append({
"code": "oi_healthy_trend",
"score": cfg["weight_oi_healthy"],
"label": oi_result["oi_signal"],
})
elif pattern == "divergence":
factors.append({
"code": "oi_divergence_risk",
"score": cfg["weight_oi_divergence_risk"],
"label": oi_result["oi_signal"],
})
# Funding factors
if funding_result.get("available"):
pattern = funding_result["funding_pattern"]
if pattern == "contrarian_long":
factors.append({
"code": "funding_negative_contrarian",
"score": cfg["weight_funding_contrarian"],
"label": funding_result["funding_signal"],
})
elif pattern in ("crowded_risk", "elevated_risk"):
factors.append({
"code": "funding_positive_risk",
"score": cfg["weight_funding_risk"],
"label": funding_result["funding_signal"],
})
return {
"oi": oi_result,
"funding": funding_result,
"factors": factors,
}