alphax/app/core/global_risk.py
2026-05-23 12:13:31 +08:00

119 lines
4.9 KiB
Python

"""Global risk gate for paper trading entries."""
from __future__ import annotations
from app.core.market_regime import classify_market_regime
from app.services.market_overview import get_crypto_market_overview
def _safe_float(value, default: float = 0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except Exception:
return default
def _safe_int(value, default: int = 0) -> int:
try:
return int(value or 0)
except Exception:
return default
def _portfolio_snapshot(conn, account_equity: float, additional_notional: float) -> dict:
open_rows = conn.execute("SELECT notional_usdt, pnl_pct FROM paper_trades WHERE status='open'").fetchall()
pending_notional = _safe_float(conn.execute("SELECT COALESCE(SUM(notional_usdt),0) FROM paper_orders WHERE status='pending'").fetchone()[0])
open_notional = 0.0
unrealized = 0.0
for row in open_rows:
notional = _safe_float(row["notional_usdt"])
open_notional += notional
unrealized += notional * _safe_float(row["pnl_pct"]) / 100
projected_notional = open_notional + pending_notional + max(0.0, _safe_float(additional_notional))
current_equity = account_equity + unrealized
return {
"open_count": len(open_rows),
"open_notional_usdt": round(open_notional, 8),
"pending_notional_usdt": round(pending_notional, 8),
"additional_notional_usdt": round(max(0.0, _safe_float(additional_notional)), 8),
"projected_notional_usdt": round(projected_notional, 8),
"unrealized_pnl_usdt": round(unrealized, 8),
"current_equity_usdt": round(current_equity, 8),
"unrealized_drawdown_pct": round(abs(min(0.0, unrealized)) / account_equity * 100, 6) if account_equity > 0 else 0,
"projected_cumulative_leverage": round(projected_notional / max(1.0, current_equity), 6),
}
def evaluate_global_risk(
*,
conn,
config: dict,
rec: dict | None = None,
additional_notional: float = 0.0,
overview: dict | None = None,
) -> dict:
"""Evaluate whether the system should allow a new paper-trading entry."""
cfg = config if isinstance(config, dict) else {}
if not bool(cfg.get("global_risk_gate_enabled", True)):
return {
"enabled": False,
"allow_new_entries": True,
"risk_level": "disabled",
"reasons": ["全局风控门禁已关闭"],
}
if overview is None:
overview = get_crypto_market_overview(allow_live_fallback=False)
regime = classify_market_regime(overview)
account_equity = max(1.0, _safe_float(cfg.get("account_equity_usdt"), 20000.0))
portfolio = _portfolio_snapshot(conn, account_equity, additional_notional)
rec_score = _safe_float((rec or {}).get("rec_score") or (rec or {}).get("score"))
min_score_high = max(0.0, _safe_float(cfg.get("global_risk_high_min_rec_score"), 70.0))
max_drawdown_critical = max(0.0, _safe_float(cfg.get("global_risk_critical_drawdown_pct"), 6.0))
max_drawdown_high = max(0.0, _safe_float(cfg.get("global_risk_high_drawdown_pct"), 3.0))
reasons = list(regime.get("reasons") or [])
risk_level = str(regime.get("risk_level") or "medium")
allow = True
decision = "allow"
drawdown = _safe_float(portfolio.get("unrealized_drawdown_pct"))
if max_drawdown_critical > 0 and drawdown >= max_drawdown_critical:
risk_level = "critical"
reasons.append("账户浮亏已进入 critical 区间,暂停所有新开仓")
elif max_drawdown_high > 0 and drawdown >= max_drawdown_high and risk_level not in {"critical"}:
risk_level = "high"
reasons.append("账户浮亏偏高,只允许高质量机会")
if risk_level == "critical" and bool(cfg.get("global_risk_block_critical", True)):
allow = False
decision = "block_critical"
elif risk_level == "high" and rec_score < min_score_high:
allow = False
decision = "block_high_weak_score"
reasons.append(f"高风险环境下推荐分 {rec_score:.1f} 低于 {min_score_high:.1f}")
max_open_positions = max(0, _safe_int(cfg.get("global_risk_max_open_positions"), 0))
if allow and max_open_positions > 0 and int(portfolio.get("open_count") or 0) >= max_open_positions:
allow = False
decision = "block_max_open_positions"
risk_level = "high" if risk_level not in {"critical"} else risk_level
reasons.append(f"持仓数量已达到上限 {max_open_positions}")
return {
"enabled": True,
"allow_new_entries": allow,
"decision": decision,
"risk_level": risk_level,
"position_multiplier": regime.get("position_multiplier", 1.0),
"max_open_positions": max_open_positions,
"min_score_when_high_risk": min_score_high,
"reasons": reasons,
"market_regime": regime,
"portfolio": portfolio,
}
__all__ = ["evaluate_global_risk"]