alphax/app/strategies/orchestrator.py
2026-06-07 20:29:45 +08:00

69 lines
3.0 KiB
Python

"""Strategy orchestration helpers for the multi-strategy intraday rollout."""
from __future__ import annotations
from app.core.strategy_contract import StrategySignal
def _copy_with_status(signal: StrategySignal, status: str, reason: str) -> StrategySignal:
    """Return a new ``StrategySignal`` with *status* applied and *reason* recorded.

    The signal is serialised through ``signal.to_json_dict()`` and rebuilt
    field by field. Missing or falsy fields fall back to neutral defaults:
    empty string, empty dict, ``0.0`` for the numeric fields and ``"long"``
    for the direction.

    Args:
        signal: Signal to copy; only its ``to_json_dict()`` output is read.
        status: Status for the copy, also stored as
            ``decision_log["decision"]``.
        reason: Text appended to ``risk_plan["risk_reasons"]`` unless it is
            already listed there.

    Returns:
        A newly constructed ``StrategySignal`` whose ``risk_plan`` and
        ``decision_log`` are shallow copies of the serialised ones with the
        updated reasons attached.

    Raises:
        KeyError: If the serialised payload has no ``"strategy_code"`` entry.
    """
    payload = signal.to_json_dict()

    # Work on shallow copies so the serialised payload is never written to.
    risk = dict(payload.get("risk_plan") or {})
    reasons = list(risk.get("risk_reasons") or [])
    if reason not in reasons:
        reasons.append(reason)
    risk["risk_reasons"] = reasons

    log = dict(payload.get("decision_log") or {})
    log["decision"] = status
    # Give the decision log its own list: sharing one list object with
    # risk_plan would let an in-place edit of either silently change both.
    log["reasons"] = list(reasons)

    return StrategySignal(
        strategy_code=payload["strategy_code"],
        strategy_version=payload.get("strategy_version") or "",
        symbol=payload.get("symbol") or "",
        direction=payload.get("direction") or "long",
        status=status,
        confidence=float(payload.get("confidence") or 0),
        score=float(payload.get("score") or 0),
        run_id=payload.get("run_id") or "",
        trigger=payload.get("trigger") or {},
        factor_roles=payload.get("factor_roles") or {},
        entry_plan=payload.get("entry_plan") or {},
        risk_plan=risk,
        decision_log=log,
        created_at=payload.get("created_at") or "",
    )
def arbitrate_strategy_signals(signals: list[StrategySignal]) -> list[StrategySignal]:
    """Collapse duplicate signals and settle opposing directions per symbol.

    Processing happens in three passes:

    1. For every (upper-cased symbol, lower-cased direction) pair keep only
       the signal with the strictly highest confidence; on a tie the first
       one seen stays. A falsy direction counts as ``"long"`` and a falsy
       confidence as ``0``.
    2. Group the survivors by symbol, in first-seen order.
    3. A symbol with a single direction passes through untouched. When
       directions conflict, the signals are ranked by confidence
       (descending, stable). The leader is kept as-is only if it beats the
       runner-up by at least 20 and the runner-up's status is not
       ``"candidate"``; the remaining signals are then re-issued with status
       ``"observe"``. Otherwise every signal for that symbol is re-issued
       as ``"observe"``.

    Args:
        signals: Signals to arbitrate; ``None`` or an empty list yields ``[]``.

    Returns:
        The arbitrated signals, grouped by symbol.
    """

    def confidence_of(sig) -> float:
        return float(sig.confidence or 0)

    def symbol_of(sig) -> str:
        return str(sig.symbol or "").upper()

    def direction_of(sig) -> str:
        return str(sig.direction or "long").lower()

    # Pass 1: strongest signal per (symbol, direction) slot.
    best_per_slot: dict[tuple[str, str], StrategySignal] = {}
    for candidate in signals or []:
        slot = (symbol_of(candidate), direction_of(candidate))
        incumbent = best_per_slot.get(slot)
        if incumbent is not None and not (
            confidence_of(candidate) > confidence_of(incumbent)
        ):
            continue
        best_per_slot[slot] = candidate

    # Pass 2: bucket the survivors by symbol, preserving insertion order.
    grouped: dict[str, list[StrategySignal]] = {}
    for (symbol, _direction), survivor in best_per_slot.items():
        grouped.setdefault(symbol, []).append(survivor)

    # Pass 3: resolve long/short disagreements symbol by symbol.
    resolved: list[StrategySignal] = []
    for contenders in grouped.values():
        distinct_directions = {direction_of(entry) for entry in contenders}
        if len(distinct_directions) < 2:
            resolved.extend(contenders)
            continue

        ordered = sorted(contenders, key=confidence_of, reverse=True)
        top, second = ordered[0], ordered[1]
        gap = confidence_of(top) - confidence_of(second)
        second_is_candidate = str(second.status or "") == "candidate"

        if gap < 20 or second_is_candidate:
            # No clear winner: park every direction until one is confirmed.
            # (Reason text: "same-symbol long/short signals conflict,
            # awaiting direction confirmation".)
            resolved.extend(
                _copy_with_status(entry, "observe", "同币多空信号冲突,等待方向确认")
                for entry in ordered
            )
            continue

        # Clear winner: keep the leader, downgrade the rest.
        # (Reason text: "same-symbol long/short conflict, lower-confidence
        # direction downgraded to observe".)
        resolved.append(top)
        resolved.extend(
            _copy_with_status(entry, "observe", "同币多空冲突,低置信方向降级观察")
            for entry in ordered[1:]
        )
    return resolved