69 lines
3.0 KiB
Python
69 lines
3.0 KiB
Python
"""Strategy orchestration helpers for the multi-strategy intraday rollout."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from app.core.strategy_contract import StrategySignal
|
|
|
|
|
|
def _copy_with_status(signal: StrategySignal, status: str, reason: str) -> StrategySignal:
    """Build a new ``StrategySignal`` from *signal* with its status overridden.

    The signal is serialised with ``to_json_dict()`` and a fresh
    ``StrategySignal`` is constructed from the resulting payload. *reason* is
    appended to ``risk_plan["risk_reasons"]`` unless it is already listed, and
    the decision log is rewritten so that ``decision`` equals *status* and
    ``reasons`` holds the resulting reason list.

    Args:
        signal: Source signal; only ``to_json_dict()`` is called on it.
        status: Status for the new signal, also stored as the log decision.
        reason: Human-readable explanation recorded in the risk reasons.

    Returns:
        A newly constructed ``StrategySignal``. Missing or falsy payload
        fields fall back to empty values (``"long"`` for ``direction``,
        ``0.0`` for ``confidence`` and ``score``).

    Raises:
        KeyError: If the payload has no ``"strategy_code"`` entry.
    """
    payload = signal.to_json_dict()
    # NOTE(review): presumably to_json_dict() returns a detached dict, making
    # the writes into ``payload`` below purely local - confirm before removing.
    payload["status"] = status

    # Work on shallow copies so adding the reason never modifies the dict/list
    # objects handed back by to_json_dict().
    risk = dict(payload.get("risk_plan") or {})
    reasons = list(risk.get("risk_reasons") or [])
    if reason not in reasons:
        reasons.append(reason)
    risk["risk_reasons"] = reasons
    payload["risk_plan"] = risk

    log = dict(payload.get("decision_log") or {})
    log["decision"] = status
    # Store an independent copy: sharing a single list object between
    # risk_plan and decision_log would let a later in-place change to one
    # silently show up in the other.
    log["reasons"] = list(reasons)
    payload["decision_log"] = log

    return StrategySignal(
        strategy_code=payload["strategy_code"],
        strategy_version=payload.get("strategy_version") or "",
        symbol=payload.get("symbol") or "",
        direction=payload.get("direction") or "long",
        status=status,
        confidence=float(payload.get("confidence") or 0),
        score=float(payload.get("score") or 0),
        run_id=payload.get("run_id") or "",
        trigger=payload.get("trigger") or {},
        factor_roles=payload.get("factor_roles") or {},
        entry_plan=payload.get("entry_plan") or {},
        risk_plan=risk,
        decision_log=log,
        created_at=payload.get("created_at") or "",
    )
|
|
|
|
|
|
def arbitrate_strategy_signals(signals: list[StrategySignal]) -> list[StrategySignal]:
    """Collapse duplicate signals and settle opposing directions per symbol.

    Processing happens in three passes:

    1. For every (upper-cased symbol, lower-cased direction) pair keep only
       the signal with the strictly highest confidence; on a tie the earliest
       signal is kept. A missing direction counts as ``"long"``.
    2. Group the survivors by symbol.
    3. A symbol with a single direction passes through untouched. When
       directions disagree, the most confident signal is kept as-is only if
       it leads the runner-up by at least 20 confidence points and the
       runner-up's status is not ``"candidate"``; every other signal is
       replaced by an ``"observe"`` copy. Otherwise all signals for the
       symbol are replaced by ``"observe"`` copies.

    Args:
        signals: Signals to arbitrate; ``None`` or an empty list yields ``[]``.

    Returns:
        The arbitrated signals, ordered by first appearance of each symbol.
    """

    def symbol_of(item: StrategySignal) -> str:
        return str(item.symbol or "").upper()

    def direction_of(item: StrategySignal) -> str:
        return str(item.direction or "long").lower()

    def confidence_of(item: StrategySignal) -> float:
        return float(item.confidence or 0)

    # Pass 1: best signal per (symbol, direction).
    best_per_side: dict[tuple[str, str], StrategySignal] = {}
    for candidate in signals or []:
        slot = (symbol_of(candidate), direction_of(candidate))
        incumbent = best_per_side.get(slot)
        # Only a strictly greater confidence displaces the incumbent.
        if incumbent is not None and not (
            confidence_of(candidate) > confidence_of(incumbent)
        ):
            continue
        best_per_side[slot] = candidate

    # Pass 2: bucket the survivors by symbol (first element of the slot key).
    grouped: dict[str, list[StrategySignal]] = {}
    for (symbol, _direction), survivor in best_per_side.items():
        if symbol not in grouped:
            grouped[symbol] = []
        grouped[symbol].append(survivor)

    # Pass 3: resolve long/short disagreement inside each symbol bucket.
    arbitrated: list[StrategySignal] = []
    for group in grouped.values():
        distinct_directions = {direction_of(member) for member in group}
        if len(distinct_directions) < 2:
            arbitrated.extend(group)
            continue

        ranked = sorted(group, key=confidence_of, reverse=True)
        leader, runner = ranked[0], ranked[1]
        gap = confidence_of(leader) - confidence_of(runner)
        clear_winner = gap >= 20 and str(runner.status or "") != "candidate"

        if clear_winner:
            # The leader keeps its original status; the rest are downgraded.
            arbitrated.append(leader)
            downgraded = ranked[1:]
            note = "同币多空冲突,低置信方向降级观察"
        else:
            # No decisive leader: downgrade every side of the conflict.
            downgraded = ranked
            note = "同币多空信号冲突,等待方向确认"

        arbitrated.extend(
            _copy_with_status(member, "observe", note) for member in downgraded
        )

    return arbitrated
|