alphax/app/strategies/orchestrator.py
2026-05-27 07:02:37 +08:00

21 lines
871 B
Python

"""Minimal strategy orchestration helpers for first-stage multi-strategy rollout."""
from __future__ import annotations
from app.core.strategy_contract import StrategySignal
def arbitrate_strategy_signals(signals: list[StrategySignal]) -> list[StrategySignal]:
    """Deduplicate same-symbol same-direction signals by confidence.

    First stage deliberately avoids complex voting. Opposite directions are
    left to future long/short architecture; current system is long-only.

    Signals are grouped by a normalised ``(symbol, direction)`` key: the
    symbol is trimmed and upper-cased, the direction is trimmed and
    lower-cased, and a missing/blank direction is treated as ``"long"``.
    Within each group the signal with the strictly highest confidence is
    kept, so on a tie the earliest signal wins.

    Args:
        signals: Candidate signals; ``None`` or an empty list yields ``[]``.
            Each item must expose ``symbol``, ``direction`` and
            ``confidence`` attributes.

    Returns:
        One winning signal per ``(symbol, direction)`` key, ordered by the
        first appearance of each key in ``signals``. The returned objects
        are the original signal instances, not copies.

    Raises:
        ValueError, TypeError: If a truthy ``confidence`` cannot be
            converted with ``float()``.
    """
    import math  # function-scope import keeps this block self-contained

    # key -> (effective confidence, winning signal). Storing the parsed score
    # avoids re-converting the current winner's confidence on every comparison.
    best: dict[tuple[str, str], tuple[float, StrategySignal]] = {}
    for signal in signals or []:
        symbol = str(signal.symbol or "").strip().upper()
        direction = str(signal.direction or "").strip().lower() or "long"
        key = (symbol, direction)

        score = float(signal.confidence or 0)
        if math.isnan(score):
            # Every ordering comparison against NaN is False, so a NaN that
            # arrived first could never be displaced by a valid signal.
            # Treat it like a missing confidence instead.
            score = 0.0

        current = best.get(key)
        if current is None or score > current[0]:
            best[key] = (score, signal)
    return [winner for _, winner in best.values()]