"""Direction-aware signal hygiene for long/short opportunities.""" from __future__ import annotations from copy import deepcopy from typing import Any from app.core.signal_taxonomy import signal_code from app.core.trade_direction import normalize_trade_side LONG_SUPPORT_CODES = { "box_breakout_pullback_1h", "box_breakout_pullback_4h", "breakout_pullback_d1", "breakout_pullback_w1", "ignition_1h_current", "ignition_4h_current", "ignition_d1_current", "dynamic_k_1h_bull", "dynamic_k_d1_bull", "breakout_15m_current", "pullback_15m_confirm", "top_trader_long", "sector_rotation", "rs_strong", "rs_independent_strength", "funding_negative_contrarian", "vcp_bull_breakout", "vcp_bull_forming", } SHORT_SUPPORT_CODES = { "breakdown_retest_1h_short", "retest_reject_15m_short", "market_risk_off_short", "vcp_bear_breakdown", "vcp_bear_forming", } RISK_OR_CONTEXT_CODES = { "volume_divergence_1h", "entry_quality_gate", "liquidity_remove_risk", "exchange_inflow_risk", "holder_concentration_risk", "funding_extreme", "trend_exhaustion", "false_breakout", "high_position_reject", "risk_reward_bad", "rs_weak", "oi_divergence_risk", "funding_positive_risk", "tf_alignment_single_penalty", "tf_alignment_conflict_penalty", "vp_path_blocked", "breakout_quality_low", } LONG_SUPPORT_TEXT = ( "大户偏多", "BTC回调中独立走强", "板块联动", "龙头", "箱体突破回踩", "底部箱体突破", "日线需求区反弹", "动K(阳)", "阳动K", "当前多头起爆", "VCP多头", "资金费率负值反向看多", "空头拥挤", ) def is_factor_supportive_for_side(code: str, side: object, score_delta: object = 0) -> bool: """Whether a positive factor can support the requested trade side.""" trade_side = normalize_trade_side(side) code = str(code or "").strip() try: delta = float(score_delta or 0) except Exception: delta = 0.0 if delta <= 0 or code in RISK_OR_CONTEXT_CODES: return True if trade_side == "short" and code in LONG_SUPPORT_CODES: return False if trade_side == "long" and code in SHORT_SUPPORT_CODES: return False return True def is_signal_supportive_for_side(signal: object, side: object) -> bool: """Whether a display signal belongs in the main evidence list.""" trade_side = normalize_trade_side(side) text = str(signal or "").strip() if not text: return False code = signal_code(text) if trade_side == "short": if code in LONG_SUPPORT_CODES: return False normalized = text.replace(" ", "") if any(token.replace(" ", "") in normalized for token in LONG_SUPPORT_TEXT): return False elif code in SHORT_SUPPORT_CODES: return False return True def sanitize_signals_for_side(signals: list[Any], side: object) -> tuple[list[Any], list[str]]: clean: list[Any] = [] conflicts: list[str] = [] seen_clean: set[str] = set() seen_conflict: set[str] = set() for signal in signals or []: text = str(signal or "").strip() if not text: continue if is_signal_supportive_for_side(text, side): if text not in seen_clean: clean.append(signal) seen_clean.add(text) elif text not in seen_conflict: conflicts.append(text) seen_conflict.add(text) return clean, conflicts def excluded_factor_delta(items: list[dict[str, Any]], side: object) -> float: total = 0.0 for item in items or []: code = str(item.get("factor_code") or "") delta = item.get("score_delta") or 0 if not is_factor_supportive_for_side(code, side, delta): try: total += float(delta or 0) except Exception: pass return round(total, 6) def sanitize_factor_breakdown_for_side(summary: dict[str, Any], side: object) -> tuple[dict[str, Any], list[dict[str, Any]]]: """Remove side-inconsistent positive factors from an existing summary.""" src = deepcopy(summary or {}) kept: list[dict[str, Any]] = [] removed: list[dict[str, Any]] = [] for item in src.get("items") or []: code = str(item.get("factor_code") or "") delta = item.get("score_delta") or 0 if is_factor_supportive_for_side(code, side, delta): kept.append(item) else: removed.append(item) groups: dict[str, dict[str, Any]] = {} for item in kept: group = item.get("factor_group") or "other" bucket = groups.setdefault(group, {"score_delta": 0.0, "items": 0}) try: bucket["score_delta"] = round(float(bucket["score_delta"]) + float(item.get("score_delta") or 0), 3) except Exception: pass bucket["items"] += 1 opportunity_groups = {"momentum", "participation", "structure", "positioning", "narrative"} src["items"] = kept src["groups"] = groups src["total_delta"] = round(sum(float(i.get("score_delta") or 0) for i in kept), 3) src["opportunity_score"] = round(sum(float(v.get("score_delta") or 0) for k, v in groups.items() if k in opportunity_groups), 3) src["entry_score"] = round(float(groups.get("entry_quality", {}).get("score_delta") or 0), 3) src["risk_score"] = round(abs(min(0.0, float(groups.get("risk", {}).get("score_delta") or 0))), 3) if removed: src["direction_filtered"] = { "side": normalize_trade_side(side), "removed_factor_codes": [str(x.get("factor_code") or "") for x in removed], } return src, removed