"""Review-aware factor scoring.

This module is the bridge between daily review results and the next screening
run. Strategy code should score named factors here instead of hard-coding
every ``score += N`` forever.
"""

from __future__ import annotations

import logging
import math
from dataclasses import dataclass, field
from typing import Any

from app.config.config_loader import get_signal_weights
from app.core.signal_taxonomy import signal_label_for_code

logger = logging.getLogger(__name__)

# Canonical baseline weight per factor code. ``FactorScorer.delta`` divides a
# runtime weight by this baseline to obtain the multiplier applied to a score.
DEFAULT_FACTOR_WEIGHTS = {
    "vp_fly_1h_current": 5.0,
    "volume_consecutive_1h": 4.0,
    "cex_top_gainer_24h": 2.0,
    "volume_divergence_1h": 1.0,
    "static_accum_4h": 5.0,
    "higher_lows_4h": 2.0,
    "compression_surge_4h": 2.0,
    "ignition_1h_current": 4.0,
    "ignition_4h_current": 3.0,
    "ignition_d1_current": 6.0,
    "dynamic_k_1h_bull": 3.0,
    "dynamic_k_d1_bull": 5.0,
    "breakout_pullback_d1": 8.0,
    "breakout_pullback_w1": 8.0,
    "breakout_15m_current": 3.0,
    "pullback_15m_confirm": 2.0,
    "strong_resonance_bypass": 3.0,
    "entry_quality_gate": 2.0,
    "top_trader_long": 1.0,
    "sector_rotation": 2.0,
    "sentiment_resonance": 2.0,
    "dex_volume_spike": 2.0,
    "liquidity_add": 1.5,
    "exchange_outflow": 2.0,
    "whale_accumulation": 2.5,
    "smart_money_buying": 2.5,
    "liquidity_remove_risk": 2.5,
    "exchange_inflow_risk": 2.5,
    "holder_concentration_risk": 2.0,
    "funding_extreme": 2.0,
    "trend_exhaustion": 3.0,
    "false_breakout": 5.0,
    "high_position_reject": 5.0,
    "risk_reward_bad": 2.0,
}

# Alternative keys under which a factor's runtime weight may be stored. They
# are consulted after the factor code itself and its taxonomy label.
WEIGHT_ALIASES = {
    "vp_fly_1h_current": ("量价齐飞", "1H当前量价齐飞"),
    "volume_consecutive_1h": ("连续3x放量", "连续3x放量(≥3根)", "1H连续放量"),
    "volume_divergence_1h": ("量价背离", "1H量价背离"),
    "static_accum_4h": ("静K蓄力", "4H静K蓄力"),
    "ignition_1h_current": ("静K动K转折", "静K→动K转折", "1H当前起爆点"),
    "ignition_4h_current": ("静K动K转折", "静K→动K转折", "4H当前起爆点"),
    "ignition_d1_current": ("静K动K转折", "静K→动K转折", "日线当前起爆点"),
    "breakout_15m_current": ("15min当前突破",),
    "pullback_15m_confirm": ("15min回踩确认",),
    "top_trader_long": ("大户偏多",),
    "sector_rotation": ("板块联动",),
    "sentiment_resonance": ("舆情共振",),
    "dex_volume_spike": ("DEX 放量", "链上成交放量"),
    "liquidity_add": ("流动性增加",),
    "exchange_outflow": ("交易所流出",),
    "whale_accumulation": ("鲸鱼增持",),
    "smart_money_buying": ("聪明钱买入",),
    "liquidity_remove_risk": ("流动性撤出风险",),
    "exchange_inflow_risk": ("交易所流入风险",),
    "holder_concentration_risk": ("持仓集中风险",),
    "funding_extreme": ("资金费率极端",),
    "trend_exhaustion": ("趋势衰减",),
    "false_breakout": ("假突破",),
    "high_position_reject": ("高位拒绝",),
    "risk_reward_bad": ("盈亏比不合格",),
}


def _safe_float(value: Any, default: float = 0.0) -> float:
    """Coerce ``value`` to a finite float, returning ``default`` on failure.

    ``None``, the empty string, anything ``float()`` cannot convert, and
    non-finite results (NaN, +/-inf) all yield ``default`` unchanged.
    """
    try:
        if value is None or value == "":
            return default
        number = float(value)
    except (TypeError, ValueError, ArithmeticError):
        # ArithmeticError covers OverflowError raised for oversized integers.
        return default
    # NaN/inf would otherwise poison sums and comparisons further downstream.
    return number if math.isfinite(number) else default


def _clamp(value: float, low: float, high: float) -> float:
    """Return ``value`` limited to the closed interval ``[low, high]``."""
    return max(low, min(high, value))


@dataclass
class FactorScorer:
    """Apply review-adjusted factor weights and keep an audit trail.

    Attributes:
        weights: Runtime weights keyed by factor code, taxonomy label, or one
            of the ``WEIGHT_ALIASES`` entries. A value is either a number or a
            dict carrying the number under ``"weight"``.
        breakdown: One audit record per ``delta`` call, in call order.
    """

    weights: dict[str, Any] = field(default_factory=dict)
    breakdown: list[dict[str, Any]] = field(default_factory=list)

    @classmethod
    def from_runtime(cls) -> "FactorScorer":
        """Build a scorer from ``get_signal_weights()``.

        Loading is best-effort: if the loader raises, the failure is logged
        and the scorer starts with no runtime weights, so every factor scores
        at its base value.
        """
        try:
            weights = get_signal_weights() or {}
        except Exception:
            logger.warning(
                "Failed to load signal weights; falling back to base scores",
                exc_info=True,
            )
            weights = {}
        return cls(weights=weights)

    def _runtime_weight(self, code: str) -> float | None:
        """Return the non-negative runtime weight for ``code``, if one exists.

        Keys are tried in order: the code itself, its taxonomy label, then its
        aliases. An entry whose weight is missing or not a finite number is
        skipped rather than read as 0, so a malformed record cannot silently
        retire a factor. Returns ``None`` when no usable entry is found.
        """
        candidates = (
            code,
            signal_label_for_code(code),
            *WEIGHT_ALIASES.get(code, ()),
        )
        for key in candidates:
            if key not in self.weights:
                continue
            raw = self.weights[key]
            if isinstance(raw, dict):
                raw = raw.get("weight")
            # NaN serves as the "unusable" marker because 0.0 is a valid weight.
            parsed = _safe_float(raw, default=math.nan)
            if math.isnan(parsed):
                continue
            return max(0.0, parsed)
        return None

    def delta(
        self,
        code: str,
        base: float,
        *,
        evidence: str = "",
        value: Any = None,
    ) -> float:
        """Return the review-aware score delta for one factor and record it.

        ``base`` is the strategy designer's default score. When a runtime
        weight exists for the factor, it is divided by the factor's canonical
        baseline (``DEFAULT_FACTOR_WEIGHTS``, or ``abs(base)`` / 1.0 for
        unknown codes, floored at 0.1) and the ratio, clamped to
        ``[0.0, 1.8]``, multiplies ``base``. A runtime weight of 0 therefore
        removes the factor from scoring. Without a runtime weight ``base`` is
        used unchanged.

        Args:
            code: Factor code.
            base: Default score for this factor.
            evidence: Free-form text stored in the audit record.
            value: Arbitrary observed value stored in the audit record.

        Returns:
            The adjusted delta, rounded to 3 decimals.
        """
        base = _safe_float(base)
        runtime_weight = self._runtime_weight(code)
        baseline = max(
            0.1,
            _safe_float(DEFAULT_FACTOR_WEIGHTS.get(code), abs(base) or 1.0),
        )
        if runtime_weight is None:
            multiplier = 1.0
            source = "base"
        else:
            multiplier = _clamp(runtime_weight / baseline, 0.0, 1.8)
            source = "review_weight"
        adjusted = round(base * multiplier, 3)
        self.breakdown.append(
            {
                "factor_code": code,
                "factor_name": signal_label_for_code(code),
                "base_delta": base,
                "score_delta": adjusted,
                "runtime_weight": runtime_weight,
                "baseline_weight": baseline,
                "multiplier": round(multiplier, 4),
                "source": source,
                "evidence": evidence,
                "value": value,
            }
        )
        return adjusted

    def add_existing(
        self,
        code: str,
        observed_score: float,
        *,
        evidence: str = "",
        value: Any = None,
        cap: float | None = None,
    ) -> float:
        """Score an already-computed contribution through ``delta``.

        ``observed_score`` becomes the base. When ``cap`` is given the base is
        limited from above to ``cap``; an unusable ``cap`` leaves it unchanged.
        """
        base = _safe_float(observed_score)
        if cap is not None:
            base = min(base, _safe_float(cap, base))
        return self.delta(code, base, evidence=evidence, value=value)

    def summary(self) -> dict[str, Any]:
        """Return the total delta (rounded to 3 decimals) and audit records.

        ``items`` is the scorer's own ``breakdown`` list, not a copy.
        """
        total = round(
            sum(_safe_float(item.get("score_delta")) for item in self.breakdown),
            3,
        )
        return {"total_delta": total, "items": self.breakdown}