180 lines
5.7 KiB
Python
180 lines
5.7 KiB
Python
"""Direction-aware signal hygiene for long/short opportunities."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from copy import deepcopy
|
|
from typing import Any
|
|
|
|
from app.core.signal_taxonomy import signal_code
|
|
from app.core.trade_direction import normalize_trade_side
|
|
|
|
|
|
# Codes treated as long-side support. The helpers in this module reject them
# for the short side (factors only when their score delta is positive).
# Grouping below is purely by name prefix, for readability.
LONG_SUPPORT_CODES = {
    # breakout / pullback
    "box_breakout_pullback_1h", "box_breakout_pullback_4h",
    "breakout_pullback_d1", "breakout_pullback_w1",
    "breakout_15m_current", "pullback_15m_confirm",
    # ignition
    "ignition_1h_current", "ignition_4h_current", "ignition_d1_current",
    # dynamic K
    "dynamic_k_1h_bull", "dynamic_k_d1_bull",
    # VCP
    "vcp_bull_breakout", "vcp_bull_forming",
    # relative strength
    "rs_strong", "rs_independent_strength",
    # remaining codes
    "top_trader_long", "sector_rotation", "funding_negative_contrarian",
}
|
|
|
|
# Codes treated as short-side support. The helpers in this module reject them
# when the evidence is being assembled for a non-short (long) side.
SHORT_SUPPORT_CODES = {
    "breakdown_retest_1h_short", "retest_reject_15m_short",
    "market_risk_off_short",
    "vcp_bear_breakdown", "vcp_bear_forming",
}
|
|
|
|
# Codes that is_factor_supportive_for_side never filters: a factor whose code
# is in this set is reported as supportive for any side.
RISK_OR_CONTEXT_CODES = {
    # codes ending in "_risk"
    "liquidity_remove_risk", "exchange_inflow_risk",
    "holder_concentration_risk", "oi_divergence_risk",
    "funding_positive_risk",
    # codes ending in "_penalty"
    "tf_alignment_single_penalty", "tf_alignment_conflict_penalty",
    # remaining codes
    "volume_divergence_1h", "entry_quality_gate", "funding_extreme",
    "trend_exhaustion", "false_breakout", "high_position_reject",
    "risk_reward_bad", "rs_weak", "vp_path_blocked", "breakout_quality_low",
}
|
|
|
|
# Display-text fragments that mark a signal as long-side support. For the
# short side, is_signal_supportive_for_side rejects any signal whose text
# (with spaces removed) contains one of these fragments.
LONG_SUPPORT_TEXT = (
    "大户偏多", "BTC回调中独立走强", "板块联动", "龙头",
    "箱体突破回踩", "底部箱体突破", "日线需求区反弹",
    "动K(阳)", "阳动K", "当前多头起爆", "VCP多头",
    "资金费率负值反向看多", "空头拥挤",
)
|
|
|
|
|
|
def is_factor_supportive_for_side(code: str, side: object, score_delta: object = 0) -> bool:
    """Tell whether a scored factor may stay in the breakdown for ``side``.

    Only factors with a strictly positive ``score_delta`` can be rejected:
    non-positive deltas, unparseable deltas (treated as 0) and any code in
    ``RISK_OR_CONTEXT_CODES`` are always accepted.

    Args:
        code: Factor code; ``None``/empty is treated as ``""``.
        side: Raw trade side, normalized via ``normalize_trade_side``.
        score_delta: Score contribution of the factor; coerced with ``float``.

    Returns:
        ``False`` when a positive factor belongs to the opposite side's
        support set (long code on a short side, or short code on a long
        side); ``True`` otherwise.
    """
    direction = normalize_trade_side(side)
    factor = str(code or "").strip()

    try:
        magnitude = float(score_delta or 0)
    except Exception:
        # Malformed deltas count as "no contribution".
        magnitude = 0.0

    # Penalties / neutral entries are never filtered by direction.
    if magnitude <= 0:
        return True
    if factor in RISK_OR_CONTEXT_CODES:
        return True

    if direction == "short":
        return factor not in LONG_SUPPORT_CODES
    if direction == "long":
        return factor not in SHORT_SUPPORT_CODES
    return True
|
|
|
|
|
|
def is_signal_supportive_for_side(signal: object, side: object) -> bool:
    """Decide whether a display signal belongs in the main evidence list.

    Args:
        signal: Display signal; it is stringified and stripped before use.
        side: Raw trade side, normalized via ``normalize_trade_side``.

    Returns:
        ``False`` for blank signals. For the short side, ``False`` when the
        signal's code is in ``LONG_SUPPORT_CODES`` or its text (spaces
        removed) contains any ``LONG_SUPPORT_TEXT`` fragment. For any other
        side, ``False`` when the code is in ``SHORT_SUPPORT_CODES``.
        ``True`` in all remaining cases.
    """
    direction = normalize_trade_side(side)
    label = str(signal or "").strip()
    if not label:
        return False

    code = signal_code(label)

    # Every side other than "short" only screens out short-support codes.
    if direction != "short":
        return code not in SHORT_SUPPORT_CODES

    if code in LONG_SUPPORT_CODES:
        return False

    # Compare with spaces removed on both sides of the substring match.
    compact = label.replace(" ", "")
    for fragment in LONG_SUPPORT_TEXT:
        if fragment.replace(" ", "") in compact:
            return False
    return True
|
|
|
|
|
|
def sanitize_signals_for_side(signals: list[Any], side: object) -> tuple[list[Any], list[str]]:
    """Split signals into side-consistent ones and conflicting ones.

    Blank entries are skipped. Both output lists are de-duplicated on the
    stripped string form of each signal and keep first-seen order.

    Args:
        signals: Signals to classify; ``None`` is treated as empty.
        side: Trade side forwarded to ``is_signal_supportive_for_side``.

    Returns:
        ``(clean, conflicts)`` where ``clean`` holds the original signal
        objects judged supportive and ``conflicts`` holds the stripped text
        of the rejected ones.
    """
    supportive: list[Any] = []
    opposing: list[str] = []
    supportive_keys: set[str] = set()
    opposing_keys: set[str] = set()

    for raw in signals or []:
        key = str(raw or "").strip()
        if not key:
            continue

        if not is_signal_supportive_for_side(key, side):
            if key not in opposing_keys:
                opposing_keys.add(key)
                opposing.append(key)
            continue

        if key in supportive_keys:
            continue
        supportive_keys.add(key)
        # Keep the caller's original object, not the stripped text.
        supportive.append(raw)

    return supportive, opposing
|
|
|
|
|
|
def excluded_factor_delta(items: list[dict[str, Any]], side: object) -> float:
    """Sum the score deltas of factors that do not fit the given side.

    Args:
        items: Factor dicts carrying ``factor_code`` and ``score_delta``;
            ``None`` is treated as empty.
        side: Trade side forwarded to ``is_factor_supportive_for_side``.

    Returns:
        Total ``score_delta`` of every item rejected by
        ``is_factor_supportive_for_side``, rounded to 6 decimals.
    """
    excluded_sum = 0.0
    for entry in items or []:
        factor = str(entry.get("factor_code") or "")
        raw_delta = entry.get("score_delta") or 0
        if is_factor_supportive_for_side(factor, side, raw_delta):
            continue
        try:
            excluded_sum += float(raw_delta or 0)
        except Exception:
            # Best effort: an unparseable delta contributes nothing.
            pass
    return round(excluded_sum, 6)
|
|
|
|
|
|
def sanitize_factor_breakdown_for_side(summary: dict[str, Any], side: object) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Remove side-inconsistent positive factors from an existing summary.

    Works on a deep copy of ``summary``, so the caller's object is left
    untouched. Items rejected by ``is_factor_supportive_for_side`` are
    dropped and the aggregate fields are recomputed from what remains.

    Args:
        summary: Factor breakdown mapping. Only its ``"items"`` list is
            read; each item is accessed as a dict with ``factor_code``,
            ``score_delta`` and an optional ``factor_group`` (default
            ``"other"``). ``None`` or an empty mapping means "no items".
        side: Trade side, forwarded to ``is_factor_supportive_for_side`` and
            ``normalize_trade_side``.

    Returns:
        ``(sanitized_summary, removed_items)``. The sanitized summary has
        ``items``, ``groups``, ``total_delta``, ``opportunity_score``,
        ``entry_score`` and ``risk_score`` rewritten; ``direction_filtered``
        is added only when at least one item was removed.
    """

    def _as_float(value: object) -> float:
        """Coerce a score value to float; missing or malformed values count as 0."""
        try:
            return float(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    src = deepcopy(summary or {})
    kept: list[dict[str, Any]] = []
    removed: list[dict[str, Any]] = []
    for item in src.get("items") or []:
        code = str(item.get("factor_code") or "")
        delta = item.get("score_delta") or 0
        if is_factor_supportive_for_side(code, side, delta):
            kept.append(item)
        else:
            removed.append(item)

    # Per-group running totals over the kept items only.
    groups: dict[str, dict[str, Any]] = {}
    for item in kept:
        group = item.get("factor_group") or "other"
        bucket = groups.setdefault(group, {"score_delta": 0.0, "items": 0})
        bucket["score_delta"] = round(
            float(bucket["score_delta"]) + _as_float(item.get("score_delta")), 3
        )
        bucket["items"] += 1

    opportunity_groups = {"momentum", "participation", "structure", "positioning", "narrative", "onchain_flow"}
    src["items"] = kept
    src["groups"] = groups
    # Use the same tolerant coercion as the group totals: an item with a
    # malformed score_delta is kept by the side check (it counts as delta 0),
    # so it must not make the overall total raise.
    src["total_delta"] = round(sum(_as_float(i.get("score_delta")) for i in kept), 3)
    src["opportunity_score"] = round(
        sum(float(v.get("score_delta") or 0) for k, v in groups.items() if k in opportunity_groups),
        3,
    )
    src["entry_score"] = round(float(groups.get("entry_quality", {}).get("score_delta") or 0), 3)
    # Risk is reported as a non-negative magnitude of the (negative) risk group total.
    src["risk_score"] = round(abs(min(0.0, float(groups.get("risk", {}).get("score_delta") or 0))), 3)

    if removed:
        src["direction_filtered"] = {
            "side": normalize_trade_side(side),
            "removed_factor_codes": [str(x.get("factor_code") or "") for x in removed],
        }
    return src, removed
|