alphax/app/core/trailing_stop.py
2026-05-31 14:10:44 +08:00

196 lines
8.2 KiB
Python

"""Reusable trailing-stop decision engine.
This module is intentionally side-effect free: it does not write the ledger,
send notifications, or call an exchange. Paper trading and live trading should
both consume the same decision object, then adapt it to their execution layer.
"""
from __future__ import annotations

import math
from dataclasses import dataclass, field
def _safe_float(value, default: float = 0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except Exception:
return default
def _cfg_bool(config: dict, key: str, default: bool) -> bool:
value = config.get(key, default)
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
return bool(value)
def _clamp(value: float, min_value: float, max_value: float) -> float:
low = min(min_value, max_value)
high = max(min_value, max_value)
return max(low, min(high, value))
@dataclass(frozen=True)
class TrailingStopDecision:
    """Immutable outcome of a single trailing-stop evaluation.

    The object only describes what should happen; it performs no I/O.
    """

    # Defaults to "hold"; evaluate_trailing_stop also emits "disabled",
    # "activate" and "move".
    action: str = "hold"
    # "trailing_activate" / "trailing_move" when the stop changes, else "".
    event_type: str = ""
    # Stop price carried by this decision (0.0 when none is reported).
    trailing_stop: float = 0.0
    # Stop price read from the position before this evaluation.
    previous_trailing_stop: float = 0.0
    # True when the stop was switched on by this evaluation.
    activated: bool = False
    # True when an already-active stop was raised by this evaluation.
    moved: bool = False
    # Parameters used for the evaluation (mode, activation %, distance %, ...).
    profile: dict = field(default_factory=dict)
    # Free-form extras such as the hold reason.
    detail: dict = field(default_factory=dict)

    def as_dict(self) -> dict:
        """Flatten the decision into one plain ``dict``.

        Stop prices are rounded to 12 decimals and reported as ``0`` when not
        positive. ``profile`` and ``detail`` entries are merged in after the
        core fields (``detail`` wins over ``profile``), but they can never
        replace a core field: a stray ``"action"`` or ``"trailing_stop"`` key
        in the extras must not change the decision that gets recorded.
        """
        flat = {
            "action": self.action,
            "event_type": self.event_type,
            "trailing_stop": round(self.trailing_stop, 12) if self.trailing_stop > 0 else 0,
            "previous_trailing_stop": (
                round(self.previous_trailing_stop, 12) if self.previous_trailing_stop > 0 else 0
            ),
            "activated": self.activated,
            "moved": self.moved,
        }
        extras = {**self.profile, **self.detail}
        for key, extra_value in extras.items():
            # setdefault keeps the authoritative core value on a key collision.
            flat.setdefault(key, extra_value)
        return flat
def normalize_trailing_config(config: dict | None = None) -> dict:
    """Return a trailing-stop config with short keys, defaults and floors applied.

    Each setting is looked up under its long ``trailing_*`` key first and
    under the short key otherwise, so a dict produced by this function can be
    passed back in. Anything that is not a ``dict`` is treated as empty.
    """
    source = config if isinstance(config, dict) else {}

    def number(long_key: str, short_key: str, fallback: float, floor: float) -> float:
        # The short key is consulted only when the long key is absent.
        raw = source.get(long_key, source.get(short_key))
        return max(floor, _safe_float(raw, fallback))

    # Tiers must be a list; try the long key, then the short key, else empty.
    tiers = source.get("trailing_tiers")
    if not isinstance(tiers, list):
        tiers = source.get("tiers")
        if not isinstance(tiers, list):
            tiers = []

    mode_raw = source.get("trailing_mode") or source.get("mode") or "volatility"

    return {
        "enabled": _cfg_bool(source, "trailing_stop_enabled", source.get("enabled", True)),
        "mode": str(mode_raw).strip().lower(),
        "activate_pnl_pct": number("trailing_activate_pnl_pct", "activate_pnl_pct", 3.0, 0.0),
        "min_lock_profit_pct": number("trailing_min_lock_profit_pct", "min_lock_profit_pct", 0.5, 0.0),
        "distance_pct": number("trailing_distance_pct", "distance_pct", 1.5, 0.1),
        "vol_min_activation_pct": number(
            "trailing_volatility_min_activation_pct", "vol_min_activation_pct", 2.5, 0.0
        ),
        "vol_max_activation_pct": number(
            "trailing_volatility_max_activation_pct", "vol_max_activation_pct", 8.0, 0.1
        ),
        "vol_activation_mult": number("trailing_volatility_activation_mult", "vol_activation_mult", 0.6, 0.0),
        "vol_min_distance_pct": number("trailing_volatility_min_distance_pct", "vol_min_distance_pct", 1.2, 0.1),
        "vol_max_distance_pct": number("trailing_volatility_max_distance_pct", "vol_max_distance_pct", 8.0, 0.1),
        "vol_distance_mult": number("trailing_volatility_distance_mult", "vol_distance_mult", 0.7, 0.0),
        "tiers": tiers,
    }
def trailing_distance_pct(pnl_pct: float, config: dict) -> tuple[float, str]:
    """Pick the trailing distance (in percent) and tier label for ``pnl_pct``.

    The base distance is ``config["distance_pct"]`` (1.5 when missing). Dict
    entries in ``config["tiers"]`` are checked from the highest
    ``min_pnl_pct`` downwards; the first tier whose threshold is met supplies
    the distance (at least 0.1, base distance if the tier omits it) and label.
    Without a matching tier the base distance and an empty label are returned.
    """
    base = _safe_float(config.get("distance_pct"), 1.5)

    candidates = [tier for tier in (config.get("tiers") or []) if isinstance(tier, dict)]
    candidates.sort(key=lambda tier: _safe_float(tier.get("min_pnl_pct")), reverse=True)

    matched = next(
        (tier for tier in candidates if pnl_pct >= _safe_float(tier.get("min_pnl_pct"))),
        None,
    )
    if matched is None:
        return base, ""

    tier_distance = max(0.1, _safe_float(matched.get("distance_pct"), base))
    tier_name = str(matched.get("label") or "")
    return tier_distance, tier_name
def observed_volatility_pct(position: dict, current_price: float) -> float:
    """Return the position's observed high-low range as a percent of entry price.

    The high is the largest of ``max_price``, ``current_price`` and the entry
    price; the low is the smallest of ``min_price``, ``current_price`` and the
    entry price. A missing or zero ``max_price``/``min_price`` falls back to
    the entry price. Returns 0.0 when the entry or current price is not
    positive. The result is rounded to 6 decimals.
    """
    entry_px = _safe_float(position.get("entry_price"))
    if entry_px <= 0 or current_price <= 0:
        return 0.0

    peak_seen = _safe_float(position.get("max_price")) or entry_px
    trough_seen = _safe_float(position.get("min_price")) or entry_px

    top = max(peak_seen, current_price, entry_px)
    bottom = min(trough_seen, current_price, entry_px)
    spread_pct = (top - bottom) / entry_px * 100
    return round(max(0.0, spread_pct), 6)
def trailing_profile(position: dict, current_price: float, pnl_pct: float, config: dict | None = None) -> dict:
    """Build the activation/distance parameters to use for this evaluation.

    In any mode other than ``"volatility"`` the configured activation
    threshold and the tier-selected distance are returned unchanged and the
    mode is reported as ``"fixed"``. In ``"volatility"`` mode both values are
    raised to a multiple of the observed volatility when that is larger, then
    clamped to the configured min/max bounds and rounded to 6 decimals; the
    unscaled base values are reported alongside them.
    """
    settings = normalize_trailing_config(config)
    activate_floor = _safe_float(settings.get("activate_pnl_pct"), 3.0)
    tier_distance, tier_name = trailing_distance_pct(pnl_pct, settings)
    swing_pct = observed_volatility_pct(position, current_price)

    mode_name = str(settings.get("mode") or "volatility").lower()
    if mode_name == "volatility":
        activation_mult = _safe_float(settings.get("vol_activation_mult"), 0.6)
        scaled_activate = _clamp(
            max(activate_floor, swing_pct * activation_mult),
            _safe_float(settings.get("vol_min_activation_pct"), 2.5),
            _safe_float(settings.get("vol_max_activation_pct"), 8.0),
        )

        distance_mult = _safe_float(settings.get("vol_distance_mult"), 0.7)
        scaled_distance = _clamp(
            max(tier_distance, swing_pct * distance_mult),
            _safe_float(settings.get("vol_min_distance_pct"), 1.2),
            _safe_float(settings.get("vol_max_distance_pct"), 8.0),
        )

        return {
            "trailing_mode": "volatility",
            "volatility_pct": swing_pct,
            "activate_pnl_pct": round(scaled_activate, 6),
            "distance_pct": round(scaled_distance, 6),
            "tier_label": tier_name or "波动率",
            "base_activate_pnl_pct": activate_floor,
            "base_distance_pct": tier_distance,
        }

    # Every non-volatility mode is reported as "fixed".
    return {
        "trailing_mode": "fixed",
        "volatility_pct": swing_pct,
        "activate_pnl_pct": activate_floor,
        "distance_pct": tier_distance,
        "tier_label": tier_name,
    }
def evaluate_trailing_stop(
    *,
    position: dict,
    current_price: float,
    pnl_pct: float,
    config: dict | None = None,
) -> TrailingStopDecision:
    """Decide whether the trailing stop should activate, move up, or stay put.

    Args:
        position: Position record; ``entry_price`` and ``trailing_stop`` are
            read here, and the record is passed on to ``trailing_profile``.
        current_price: Latest price of the instrument.
        pnl_pct: Current profit of the position in percent.
        config: Raw or already-normalized trailing-stop configuration.

    Returns:
        A ``TrailingStopDecision`` whose ``action`` is one of:

        * ``"disabled"`` - trailing stops are switched off in ``config``.
        * ``"hold"`` - nothing to do; ``detail["reason"]`` is
          ``"missing_price"``, ``"below_activation"`` or ``"unchanged"``.
          Every hold carries the currently active stop (if any) in
          ``trailing_stop`` so the decision never reports an active stop as 0.
        * ``"activate"`` - no stop was set and one is now established.
        * ``"move"`` - the existing stop is raised.

    The new stop is the highest of the existing stop, the profit-protection
    floor (entry price plus ``min_lock_profit_pct``) and the current price
    minus the trailing distance, so it only ever ratchets upward.
    NOTE(review): this arithmetic places the stop below the price, i.e. it
    looks written for long positions - confirm how shorts are handled.
    """
    cfg = normalize_trailing_config(config)

    current_trail = _safe_float(position.get("trailing_stop"))
    if not math.isfinite(current_trail):
        # A NaN/inf stored stop would poison max() below; treat it as unset.
        current_trail = 0.0
    # Stop value to echo back on hold decisions (0.0 when none is active).
    active_trail = current_trail if current_trail > 0 else 0.0

    if not cfg.get("enabled"):
        return TrailingStopDecision(
            action="disabled",
            previous_trailing_stop=current_trail,
            detail={"enabled": False},
        )

    entry_price = _safe_float(position.get("entry_price"))
    price = _safe_float(current_price)
    # NaN compares false against everything, so finiteness is checked
    # explicitly rather than relying on the <= 0 tests alone.
    prices_usable = (
        math.isfinite(entry_price) and math.isfinite(price) and entry_price > 0 and price > 0
    )
    if not prices_usable:
        return TrailingStopDecision(
            action="hold",
            trailing_stop=active_trail,
            previous_trailing_stop=current_trail,
            detail={"reason": "missing_price"},
        )

    # Unparsable PnL becomes NaN so it is caught by the finiteness gate below.
    pnl = _safe_float(pnl_pct, math.nan)
    profile = trailing_profile(position, price, pnl, cfg)
    activate_pnl_pct = _safe_float(profile.get("activate_pnl_pct"), cfg.get("activate_pnl_pct"))
    # A non-finite PnL cannot be shown to have reached the activation level,
    # so it is treated as below activation instead of slipping past the "<".
    if not math.isfinite(pnl) or pnl < activate_pnl_pct:
        return TrailingStopDecision(
            action="hold",
            trailing_stop=active_trail,
            previous_trailing_stop=current_trail,
            profile=profile,
            detail={"reason": "below_activation"},
        )

    distance_pct = _safe_float(profile.get("distance_pct"), cfg.get("distance_pct"))
    protection_floor = entry_price * (1 + _safe_float(cfg.get("min_lock_profit_pct")) / 100)
    candidate = price * (1 - distance_pct / 100)
    new_trail = round(max(current_trail, protection_floor, candidate), 12)

    activated = current_trail <= 0 and new_trail > 0
    # The small epsilon avoids reporting a move caused only by float noise.
    moved = current_trail > 0 and new_trail > current_trail + 1e-12
    if not activated and not moved:
        return TrailingStopDecision(
            action="hold",
            trailing_stop=current_trail,
            previous_trailing_stop=current_trail,
            profile=profile,
            detail={"reason": "unchanged"},
        )

    return TrailingStopDecision(
        action="activate" if activated else "move",
        event_type="trailing_activate" if activated else "trailing_move",
        trailing_stop=new_trail,
        previous_trailing_stop=current_trail,
        activated=activated,
        moved=moved,
        profile=profile,
        detail={"min_lock_profit_pct": cfg.get("min_lock_profit_pct")},
    )
# Names exported by `from ... import *`; the underscore-prefixed helpers
# (_safe_float, _cfg_bool, _clamp) are not listed.
__all__ = [
    "TrailingStopDecision",
    "evaluate_trailing_stop",
    "normalize_trailing_config",
    "observed_volatility_pct",
    "trailing_distance_pct",
    "trailing_profile",
]