"""Reusable trailing-stop decision engine. This module is intentionally side-effect free: it does not write the ledger, send notifications, or call an exchange. Paper trading and live trading should both consume the same decision object, then adapt it to their execution layer. """ from __future__ import annotations from dataclasses import dataclass, field from app.core.trade_math import normalize_side, tighter_stop def _safe_float(value, default: float = 0.0) -> float: try: if value is None or value == "": return default return float(value) except Exception: return default def _cfg_bool(config: dict, key: str, default: bool) -> bool: value = config.get(key, default) if isinstance(value, str): return value.strip().lower() in {"1", "true", "yes", "on"} return bool(value) def _clamp(value: float, min_value: float, max_value: float) -> float: low = min(min_value, max_value) high = max(min_value, max_value) return max(low, min(high, value)) @dataclass(frozen=True) class TrailingStopDecision: action: str = "hold" event_type: str = "" trailing_stop: float = 0.0 previous_trailing_stop: float = 0.0 activated: bool = False moved: bool = False profile: dict = field(default_factory=dict) detail: dict = field(default_factory=dict) def as_dict(self) -> dict: return { "action": self.action, "event_type": self.event_type, "trailing_stop": round(self.trailing_stop, 12) if self.trailing_stop > 0 else 0, "previous_trailing_stop": round(self.previous_trailing_stop, 12) if self.previous_trailing_stop > 0 else 0, "activated": self.activated, "moved": self.moved, **self.profile, **self.detail, } def normalize_trailing_config(config: dict | None = None) -> dict: cfg = config if isinstance(config, dict) else {} return { "enabled": _cfg_bool(cfg, "trailing_stop_enabled", cfg.get("enabled", True)), "mode": str(cfg.get("trailing_mode") or cfg.get("mode") or "volatility").strip().lower(), "activate_pnl_pct": max(0.0, _safe_float(cfg.get("trailing_activate_pnl_pct", cfg.get("activate_pnl_pct")), 3.0)), "min_lock_profit_pct": max(0.0, _safe_float(cfg.get("trailing_min_lock_profit_pct", cfg.get("min_lock_profit_pct")), 0.5)), "distance_pct": max(0.1, _safe_float(cfg.get("trailing_distance_pct", cfg.get("distance_pct")), 1.5)), "vol_min_activation_pct": max(0.0, _safe_float(cfg.get("trailing_volatility_min_activation_pct", cfg.get("vol_min_activation_pct")), 2.5)), "vol_max_activation_pct": max(0.1, _safe_float(cfg.get("trailing_volatility_max_activation_pct", cfg.get("vol_max_activation_pct")), 8.0)), "vol_activation_mult": max(0.0, _safe_float(cfg.get("trailing_volatility_activation_mult", cfg.get("vol_activation_mult")), 0.6)), "vol_min_distance_pct": max(0.1, _safe_float(cfg.get("trailing_volatility_min_distance_pct", cfg.get("vol_min_distance_pct")), 1.2)), "vol_max_distance_pct": max(0.1, _safe_float(cfg.get("trailing_volatility_max_distance_pct", cfg.get("vol_max_distance_pct")), 8.0)), "vol_distance_mult": max(0.0, _safe_float(cfg.get("trailing_volatility_distance_mult", cfg.get("vol_distance_mult")), 0.7)), "tiers": cfg.get("trailing_tiers") if isinstance(cfg.get("trailing_tiers"), list) else (cfg.get("tiers") if isinstance(cfg.get("tiers"), list) else []), } def trailing_distance_pct(pnl_pct: float, config: dict) -> tuple[float, str]: distance = _safe_float(config.get("distance_pct"), 1.5) label = "" tiers = config.get("tiers") or [] for tier in sorted((t for t in tiers if isinstance(t, dict)), key=lambda x: _safe_float(x.get("min_pnl_pct")), reverse=True): if pnl_pct >= _safe_float(tier.get("min_pnl_pct")): distance = max(0.1, _safe_float(tier.get("distance_pct"), distance)) label = str(tier.get("label") or "") break return distance, label def observed_volatility_pct(position: dict, current_price: float) -> float: entry = _safe_float(position.get("entry_price")) if entry <= 0 or current_price <= 0: return 0.0 high = max(_safe_float(position.get("max_price")) or entry, current_price, entry) low = min(_safe_float(position.get("min_price")) or entry, current_price, entry) return round(max(0.0, (high - low) / entry * 100), 6) def trailing_profile(position: dict, current_price: float, pnl_pct: float, config: dict | None = None) -> dict: cfg = normalize_trailing_config(config) base_activate = _safe_float(cfg.get("activate_pnl_pct"), 3.0) base_distance, tier_label = trailing_distance_pct(pnl_pct, cfg) volatility_pct = observed_volatility_pct(position, current_price) if str(cfg.get("mode") or "volatility").lower() != "volatility": return { "trailing_mode": "fixed", "volatility_pct": volatility_pct, "activate_pnl_pct": base_activate, "distance_pct": base_distance, "tier_label": tier_label, } dynamic_activate = max(base_activate, volatility_pct * _safe_float(cfg.get("vol_activation_mult"), 0.6)) dynamic_activate = _clamp( dynamic_activate, _safe_float(cfg.get("vol_min_activation_pct"), 2.5), _safe_float(cfg.get("vol_max_activation_pct"), 8.0), ) dynamic_distance = max(base_distance, volatility_pct * _safe_float(cfg.get("vol_distance_mult"), 0.7)) dynamic_distance = _clamp( dynamic_distance, _safe_float(cfg.get("vol_min_distance_pct"), 1.2), _safe_float(cfg.get("vol_max_distance_pct"), 8.0), ) return { "trailing_mode": "volatility", "volatility_pct": volatility_pct, "activate_pnl_pct": round(dynamic_activate, 6), "distance_pct": round(dynamic_distance, 6), "tier_label": tier_label or "波动率", "base_activate_pnl_pct": base_activate, "base_distance_pct": base_distance, } def evaluate_trailing_stop( *, position: dict, current_price: float, pnl_pct: float, config: dict | None = None, ) -> TrailingStopDecision: cfg = normalize_trailing_config(config) current_trail = _safe_float(position.get("trailing_stop")) if not cfg.get("enabled"): return TrailingStopDecision(action="disabled", previous_trailing_stop=current_trail, detail={"enabled": False}) entry_price = _safe_float(position.get("entry_price")) if entry_price <= 0 or current_price <= 0: return TrailingStopDecision(action="hold", previous_trailing_stop=current_trail, detail={"reason": "missing_price"}) profile = trailing_profile(position, current_price, pnl_pct, cfg) activate_pnl_pct = _safe_float(profile.get("activate_pnl_pct"), cfg.get("activate_pnl_pct")) if pnl_pct < activate_pnl_pct: return TrailingStopDecision( action="hold", previous_trailing_stop=current_trail, profile=profile, detail={"reason": "below_activation"}, ) side = normalize_side(position.get("side")) distance_pct = _safe_float(profile.get("distance_pct"), cfg.get("distance_pct")) if side == "short": protection_floor = entry_price * (1 - _safe_float(cfg.get("min_lock_profit_pct")) / 100) candidate = current_price * (1 + distance_pct / 100) new_trail = round(tighter_stop(side, current_trail, min(protection_floor, candidate)), 12) else: protection_floor = entry_price * (1 + _safe_float(cfg.get("min_lock_profit_pct")) / 100) candidate = current_price * (1 - distance_pct / 100) new_trail = round(tighter_stop(side, current_trail, max(protection_floor, candidate)), 12) activated = current_trail <= 0 and new_trail > 0 moved = current_trail > 0 and ( new_trail < current_trail - 1e-12 if side == "short" else new_trail > current_trail + 1e-12 ) if not activated and not moved: return TrailingStopDecision( action="hold", trailing_stop=current_trail, previous_trailing_stop=current_trail, profile=profile, detail={"reason": "unchanged"}, ) return TrailingStopDecision( action="activate" if activated else "move", event_type="trailing_activate" if activated else "trailing_move", trailing_stop=new_trail, previous_trailing_stop=current_trail, activated=activated, moved=moved, profile=profile, detail={"min_lock_profit_pct": cfg.get("min_lock_profit_pct")}, ) __all__ = [ "TrailingStopDecision", "evaluate_trailing_stop", "normalize_trailing_config", "observed_volatility_pct", "trailing_distance_pct", "trailing_profile", ]