"""Global risk gate for paper trading entries."""

from __future__ import annotations

from app.core.market_regime import classify_market_regime
from app.core.sector_map import get_sector_for_coin
from app.services.market_overview import get_crypto_market_overview

# Lower-cased side labels that mean "short". Any label containing "空" is also
# treated as short; every other label (including an empty one) means "long".
_SHORT_SIDE_LABELS = frozenset({"short", "sell", "空", "空头", "做空", "空头启动"})


def _safe_float(value, default: float = 0.0) -> float:
    """Convert *value* to ``float``, returning *default* when that is impossible.

    ``None`` and the empty string map to *default*, as does any value that
    ``float()`` rejects.
    """
    try:
        if value is None or value == "":
            return default
        return float(value)
    except Exception:
        # Deliberately broad: this helper must never raise, whatever object a
        # config entry or database column hands it.
        return default


def _safe_int(value, default: int = 0) -> int:
    """Convert *value* to ``int``, returning *default* when that is impossible.

    ``None`` and the empty string map to *default* (mirroring ``_safe_float``).
    Numeric text with a decimal part such as ``"10.0"`` is accepted and
    truncated toward zero, the same way ``int(10.0)`` is.
    """
    try:
        if value is None or value == "":
            return default
        try:
            return int(value)
        except (TypeError, ValueError):
            # int() rejects strings like "10.0"; go through float() first.
            return int(float(value))
    except Exception:
        # Deliberately broad: same never-raise contract as _safe_float.
        return default


def _normalize_side(value) -> str:
    """Map a free-form side/direction label to ``"short"`` or ``"long"``."""
    text = str(value or "").strip().lower()
    if text in _SHORT_SIDE_LABELS or "空" in text:
        return "short"
    return "long"


def _side_from_rec(rec: dict | None = None) -> str:
    """Return the normalized side of a recommendation.

    Reads ``rec["side"]`` and falls back to ``rec["direction"]``; a missing or
    unrecognised label yields ``"long"``.
    """
    rec = rec or {}
    return _normalize_side(rec.get("side") or rec.get("direction"))


def _directional_market_gate(regime: dict, side: str, base_risk_level: str, base_multiplier: float) -> dict:
    """Translate broad market regime into side-aware entry risk.

    A risk-off market is a headwind for long entries, but it can be the
    intended environment for a dedicated short strategy. Conversely, an
    altcoin rotation is supportive for long opportunities and a squeeze risk
    for shorts.

    Args:
        regime: Regime dict; only its ``"regime"`` key is read here.
        side: ``"short"`` (case-insensitive) or anything else, which is
            treated as ``"long"``.
        base_risk_level: Risk level before the directional adjustment.
        base_multiplier: Position multiplier before the directional adjustment.

    Returns:
        A dict with the normalized ``side``, the ``market_bias``
        (``"favorable"``, ``"adverse"`` or ``"neutral"``), the effective risk
        level and position multiplier, the human-readable ``reasons`` and the
        raw inputs for reference.
    """
    side = "short" if str(side or "").lower() == "short" else "long"
    regime_code = str((regime or {}).get("regime") or "unknown").strip().lower()
    risk_level = str(base_risk_level or "medium").strip().lower()
    multiplier = max(0.0, _safe_float(base_multiplier, 1.0))
    bias = "neutral"
    reasons: list[str] = []

    if regime_code == "risk_off":
        if side == "short":
            # Risk-off is a tailwind for shorts: reset the level to medium and
            # never size below the full multiplier.
            bias = "favorable"
            risk_level = "medium"
            multiplier = max(1.0, multiplier)
            reasons.append("risk_off 对空头属于顺风环境,不作为空头挂单/开仓的拦截理由")
        else:
            bias = "adverse"
            reasons.append("risk_off 对多头属于逆风环境,需要降仓或提高门槛")
    elif regime_code in {"altcoin_rotation", "btc_main_uptrend", "meme_frenzy"}:
        if side == "short":
            bias = "adverse"
            if regime_code == "meme_frenzy":
                risk_level = "critical"
                multiplier = min(multiplier, 0.25)
                reasons.append("情绪过热期做空容易被轧空,只允许极高质量空头")
            else:
                # Raise to "high" but never downgrade an existing "critical".
                risk_level = "high" if risk_level not in {"critical"} else risk_level
                multiplier = min(multiplier, 0.5)
                reasons.append("上涨轮动/主流带动期做空属于逆势,需要提高空头质量门槛")
        else:
            bias = "favorable"
            multiplier = max(multiplier, 0.8)
            reasons.append("当前市场对多头相对顺风,不额外惩罚多头入场")
    elif regime_code == "sideways_chop":
        bias = "neutral"
        reasons.append("震荡期对多空都不是单边顺风,按原始风控轻仓精选")

    return {
        "side": side,
        "market_bias": bias,
        "effective_risk_level": risk_level,
        "effective_position_multiplier": multiplier,
        "reasons": reasons,
        "raw_risk_level": str(base_risk_level or "medium"),
        "raw_position_multiplier": base_multiplier,
    }


def _portfolio_snapshot(conn, account_equity: float, additional_notional: float) -> dict:
    """Summarise open exposure, pending orders and unrealized PnL.

    Args:
        conn: Database connection exposing ``execute``; rows of
            ``paper_trades`` are read by column name, so the connection is
            assumed to use a name-addressable row type — TODO confirm.
        account_equity: Account equity in USDT used as the drawdown base.
        additional_notional: Notional of the entry being evaluated; negative
            or unparsable values count as 0.

    Returns:
        A dict of rounded exposure figures, including the unrealized drawdown
        as a percentage of *account_equity* and the projected cumulative
        leverage against current equity (floored at 1.0 to avoid dividing by
        zero or a negative equity).
    """
    open_rows = conn.execute("SELECT notional_usdt, pnl_pct FROM paper_trades WHERE status='open'").fetchall()
    pending_notional = _safe_float(
        conn.execute("SELECT COALESCE(SUM(notional_usdt),0) FROM paper_orders WHERE status='pending'").fetchone()[0]
    )
    additional = max(0.0, _safe_float(additional_notional))

    open_notional = 0.0
    unrealized = 0.0
    for row in open_rows:
        notional = _safe_float(row["notional_usdt"])
        open_notional += notional
        # pnl_pct is a percentage of the position notional.
        unrealized += notional * _safe_float(row["pnl_pct"]) / 100

    projected_notional = open_notional + pending_notional + additional
    current_equity = account_equity + unrealized
    return {
        "open_count": len(open_rows),
        "open_notional_usdt": round(open_notional, 8),
        "pending_notional_usdt": round(pending_notional, 8),
        "additional_notional_usdt": round(additional, 8),
        "projected_notional_usdt": round(projected_notional, 8),
        "unrealized_pnl_usdt": round(unrealized, 8),
        "current_equity_usdt": round(current_equity, 8),
        # Only losses count as drawdown; a net unrealized gain reports 0.
        "unrealized_drawdown_pct": round(abs(min(0.0, unrealized)) / account_equity * 100, 6) if account_equity > 0 else 0.0,
        "projected_cumulative_leverage": round(projected_notional / max(1.0, current_equity), 6),
    }


def _sector_names(symbol: str, rec: dict | None = None) -> list[str]:
    """Return the sorted, de-duplicated sector names for a symbol.

    A comma-separated ``rec["sector"]`` takes precedence; when it is absent or
    empty the sectors come from ``get_sector_for_coin(symbol)``.
    """
    sectors = []
    rec = rec or {}
    raw = rec.get("sector") or ""
    if raw:
        sectors.extend([x.strip() for x in str(raw).split(",") if x.strip()])
    if not sectors:
        sectors.extend(get_sector_for_coin(symbol))
    return sorted({x for x in sectors if x})


def _concentration_snapshot(conn, rec: dict | None = None) -> dict:
    """Measure how concentrated open positions are relative to *rec*.

    Counts open ``paper_trades`` rows that share the recommendation's
    direction, and rows that share any of its sectors. Both the target side
    and each row's side are normalized with ``_normalize_side`` so that
    equivalent labels (for example ``"sell"`` and ``"short"``) compare equal.

    Args:
        conn: Database connection exposing ``execute``; rows must be
            convertible with ``dict(row)``.
        rec: Recommendation dict; ``symbol``, ``side``/``direction`` and
            ``sector`` are read. ``None`` is treated as an empty dict.

    Returns:
        A dict with the target symbol/side/sectors, the same-direction count
        and notional, and per-sector counts and notionals.
    """
    rec = rec or {}
    target_symbol = str(rec.get("symbol") or "").strip().upper()
    target_side = _side_from_rec(rec)
    target_sectors = _sector_names(target_symbol, rec)
    open_rows = [
        dict(r)
        for r in conn.execute("SELECT symbol, side, notional_usdt FROM paper_trades WHERE status='open'").fetchall()
    ]

    same_direction_count = 0
    same_direction_notional = 0.0
    sector_counts = {sector: 0 for sector in target_sectors}
    sector_notional = {sector: 0.0 for sector in target_sectors}

    for row in open_rows:
        notional = _safe_float(row.get("notional_usdt"))
        if _normalize_side(row.get("side")) == target_side:
            same_direction_count += 1
            same_direction_notional += notional
        if not target_sectors:
            # Nothing to match against; skip the per-row sector lookup.
            continue
        row_sectors = set(_sector_names(row.get("symbol") or ""))
        for sector in target_sectors:
            if sector in row_sectors:
                sector_counts[sector] += 1
                sector_notional[sector] += notional

    return {
        "target_symbol": target_symbol,
        "target_side": target_side,
        "target_sectors": target_sectors,
        "same_direction_count": same_direction_count,
        "same_direction_notional_usdt": round(same_direction_notional, 8),
        "same_sector_counts": sector_counts,
        "same_sector_notional_usdt": {k: round(v, 8) for k, v in sector_notional.items()},
    }


def _at_least_high(risk_level: str) -> str:
    """Raise *risk_level* to ``"high"`` without downgrading ``"critical"``."""
    return risk_level if risk_level == "critical" else "high"


def evaluate_global_risk(
    *,
    conn,
    config: dict,
    rec: dict | None = None,
    additional_notional: float = 0.0,
    overview: dict | None = None,
) -> dict:
    """Evaluate whether the system should allow a new paper-trading entry.

    The decision combines the market regime (adjusted for the entry's side),
    the account's unrealized drawdown, the recommendation score and the
    open-position / concentration limits from *config*.

    Args:
        conn: Database connection used to read ``paper_trades`` and
            ``paper_orders``.
        config: Settings dict; a non-dict is treated as empty. Keys read are
            ``global_risk_gate_enabled``, ``account_equity_usdt`` and the
            ``global_risk_*`` thresholds and limits. A limit of 0 disables the
            corresponding check.
        rec: Recommendation being evaluated (``side``/``direction``,
            ``rec_score``/``score``, ``symbol``, ``sector``).
        additional_notional: Notional in USDT of the entry being evaluated.
        overview: Pre-fetched market overview; fetched without live fallback
            when ``None``.

    Returns:
        A dict whose ``allow_new_entries`` flag is the verdict. When the gate
        is enabled it also carries ``decision`` (``"allow"``,
        ``"allow_reduced_size"`` or a ``"block_*"`` code, including
        ``"block_critical_drawdown"``), the final ``risk_level``,
        ``position_multiplier``, the ``reasons`` list and the regime,
        portfolio and concentration snapshots.
    """
    cfg = config if isinstance(config, dict) else {}
    if not bool(cfg.get("global_risk_gate_enabled", True)):
        return {
            "enabled": False,
            "allow_new_entries": True,
            "risk_level": "disabled",
            "reasons": ["全局风控门禁已关闭"],
        }

    if overview is None:
        overview = get_crypto_market_overview(allow_live_fallback=False)
    regime = classify_market_regime(overview)

    account_equity = max(1.0, _safe_float(cfg.get("account_equity_usdt"), 20000.0))
    portfolio = _portfolio_snapshot(conn, account_equity, additional_notional)
    concentration = _concentration_snapshot(conn, rec)
    side = _side_from_rec(rec)
    rec_score = _safe_float((rec or {}).get("rec_score") or (rec or {}).get("score"))

    min_score_high = max(0.0, _safe_float(cfg.get("global_risk_high_min_rec_score"), 70.0))
    # The critical threshold can never be laxer than the high-risk one.
    min_score_critical = max(min_score_high, _safe_float(cfg.get("global_risk_critical_min_rec_score"), 80.0))
    min_position_multiplier = max(0.0, _safe_float(cfg.get("global_risk_min_position_multiplier"), 0.2))
    max_drawdown_critical = max(0.0, _safe_float(cfg.get("global_risk_critical_drawdown_pct"), 6.0))
    max_drawdown_high = max(0.0, _safe_float(cfg.get("global_risk_high_drawdown_pct"), 3.0))

    reasons = list(regime.get("reasons") or [])
    risk_level = str(regime.get("risk_level") or "medium")
    position_multiplier = max(min_position_multiplier, _safe_float(regime.get("position_multiplier"), 1.0))

    # Re-interpret the regime for the side of this entry.
    directional_gate = _directional_market_gate(regime, side, risk_level, position_multiplier)
    risk_level = str(directional_gate.get("effective_risk_level") or risk_level)
    position_multiplier = max(
        min_position_multiplier,
        _safe_float(directional_gate.get("effective_position_multiplier"), position_multiplier),
    )
    reasons.extend(directional_gate.get("reasons") or [])

    allow = True
    decision = "allow"

    drawdown = _safe_float(portfolio.get("unrealized_drawdown_pct"))
    # Tracked explicitly so the branches below can tell a drawdown-driven
    # "critical" from a market-driven one, even when the drawdown check is
    # disabled (threshold 0).
    drawdown_critical = max_drawdown_critical > 0 and drawdown >= max_drawdown_critical
    if drawdown_critical:
        risk_level = "critical"
        reasons.append("账户浮亏已进入 critical 区间,暂停所有新开仓")
        position_multiplier = 0.0
    elif max_drawdown_high > 0 and drawdown >= max_drawdown_high and risk_level != "critical":
        risk_level = "high"
        reasons.append("账户浮亏偏高,只允许高质量机会")

    if risk_level == "critical" and bool(cfg.get("global_risk_block_critical", False)):
        allow = False
        decision = "block_critical"
    elif drawdown_critical:
        # The reason above announces a pause on all new entries and the
        # multiplier is already 0, so the verdict must be a block as well.
        allow = False
        decision = "block_critical_drawdown"
    elif risk_level == "critical":
        # Market-driven critical: only strong recommendations pass, at the
        # reduced size.
        if rec_score < min_score_critical:
            allow = False
            decision = "block_critical_weak_score"
            reasons.append(f"critical 市场环境下推荐分 {rec_score:.1f} 低于 {min_score_critical:.1f}")
        else:
            decision = "allow_reduced_size"
            reasons.append(f"critical 市场环境不再一刀切,按 {position_multiplier:.0%} 仓位试运行")
    elif risk_level == "high" and rec_score < min_score_high:
        allow = False
        decision = "block_high_weak_score"
        reasons.append(f"高风险环境下推荐分 {rec_score:.1f} 低于 {min_score_high:.1f}")

    # Hard limits; each one is only evaluated while the entry is still allowed,
    # so the first failing check determines the decision code.
    max_open_positions = max(0, _safe_int(cfg.get("global_risk_max_open_positions"), 0))
    if allow and max_open_positions > 0 and int(portfolio.get("open_count") or 0) >= max_open_positions:
        allow = False
        decision = "block_max_open_positions"
        risk_level = _at_least_high(risk_level)
        reasons.append(f"持仓数量已达到上限 {max_open_positions}")

    max_same_direction = max(0, _safe_int(cfg.get("global_risk_max_same_direction_positions"), 0))
    projected_same_direction = _safe_int(concentration.get("same_direction_count")) + (1 if rec else 0)
    if allow and max_same_direction > 0 and projected_same_direction > max_same_direction:
        allow = False
        decision = "block_same_direction_concentration"
        risk_level = _at_least_high(risk_level)
        reasons.append(f"同方向持仓将达到 {projected_same_direction} 个,超过上限 {max_same_direction}")

    max_same_sector = max(0, _safe_int(cfg.get("global_risk_max_same_sector_positions"), 0))
    if allow and max_same_sector > 0:
        for sector, count in (concentration.get("same_sector_counts") or {}).items():
            projected = _safe_int(count) + 1
            if projected > max_same_sector:
                allow = False
                decision = "block_same_sector_concentration"
                risk_level = _at_least_high(risk_level)
                reasons.append(f"{sector} 板块持仓将达到 {projected} 个,超过上限 {max_same_sector}")
                break

    return {
        "enabled": True,
        "allow_new_entries": allow,
        "decision": decision,
        "risk_level": risk_level,
        "side": side,
        "directional_market_bias": directional_gate,
        "position_multiplier": position_multiplier,
        "max_open_positions": max_open_positions,
        "min_score_when_high_risk": min_score_high,
        "min_score_when_critical_risk": min_score_critical,
        "reasons": reasons,
        "market_regime": regime,
        "portfolio": portfolio,
        "concentration": concentration,
    }


__all__ = ["evaluate_global_risk"]