alphax/app/core/opportunity_funnel.py
2026-05-25 08:53:21 +08:00

223 lines
8.8 KiB
Python

"""Shared opportunity funnel helpers.
This module keeps the stage vocabulary and a few lightweight heuristics in one
place so screening, confirmation, analytics, and the pipeline log page can use
the same language.
"""
from __future__ import annotations
from typing import Any, Dict, Iterable, List, Sequence, Tuple
FUNNEL_STAGES = (
"universe_gate",
"discovery",
"quality_filter",
"trade_confirm",
"tracking",
"review",
)
FUNNEL_STAGE_LABELS = {
"universe_gate": "交易宇宙过滤",
"discovery": "异动发现",
"quality_filter": "异动质量验证",
"trade_confirm": "当前交易确认",
"tracking": "跟踪",
"review": "复盘",
}
SCREENING_LAYER_STAGE = {
"universe_gate": "universe_gate",
"粗筛": "discovery",
"细筛": "quality_filter",
"确认": "trade_confirm",
"舆情触发": "discovery",
"跟踪": "tracking",
"复盘": "review",
}
SCREENING_STAGE_LABELS = {
"universe_gate": "宇宙过滤",
"discovery_candidate": "异动发现",
"qualified_candidate": "质量通过",
"rejected_candidate": "质量淘汰",
"trade_confirm": "交易确认",
"tracking": "跟踪",
"review": "复盘",
}
UNIVERSE_REASON_LABELS = {
"stablecoin": "稳定币/锚定资产",
"wrapped": "包装资产",
"excluded_base": "排除基础资产",
"invalid_pair": "交易对异常",
"non_ascii": "非标准交易对",
"low_turnover": "24h成交额不足",
}
QUALITY_REASON_LABELS = {
"low_score": "评分不足",
"stale_signal": "旧信号复用",
"fake_pump_risk": "疑似拉高出货",
"high_chase_risk": "追高风险",
"bearish_flow_risk": "空头流风险",
"multi_source_resonance": "多源共振",
}
def stage_label(stage: str) -> str:
return FUNNEL_STAGE_LABELS.get(stage or "", stage or "")
def normalize_trade_action(action: Any) -> str:
text = str(action or "").strip()
if text in ("可即刻买入", "buy_now"):
return "buy_now"
if text in ("等回踩", "wait_pullback"):
return "wait_pullback"
if text in ("观察", "observe"):
return "observe"
return "invalid"
def screening_stage_meta(layer: str, detail: Dict[str, Any] | None = None, state: str = "", execution_status: str = "") -> Dict[str, str]:
detail = detail or {}
layer_stage = SCREENING_LAYER_STAGE.get(str(layer or "").strip(), "discovery")
candidate_stage = str(detail.get("candidate_stage") or "").strip()
if not candidate_stage:
if layer_stage == "universe_gate":
candidate_stage = "universe_gate"
elif layer_stage == "quality_filter":
candidate_stage = "qualified_candidate" if str(state or "").strip() in ("蓄力", "加速") else "rejected_candidate"
elif layer_stage == "trade_confirm":
candidate_stage = "trade_confirm"
else:
candidate_stage = "discovery_candidate"
return {
"funnel_stage": layer_stage,
"funnel_stage_label": stage_label(layer_stage),
"candidate_stage": candidate_stage,
"candidate_stage_label": SCREENING_STAGE_LABELS.get(candidate_stage, candidate_stage),
}
def discovery_source_types(candidate: Dict[str, Any]) -> List[str]:
sources: List[str] = []
if candidate.get("top_gainer_24h"):
sources.append("cex_top_gainer")
if candidate.get("vp_data") or candidate.get("turnover_acceleration_1h") or candidate.get("turnover_acceleration_4h"):
sources.append("cex")
if candidate.get("short_tf_ignition"):
sources.append("short_timeframe")
if candidate.get("static_accumulation") or candidate.get("higher_lows") or candidate.get("compression_surge"):
sources.append("structure")
if candidate.get("sentiment") or candidate.get("sentiment_bonus"):
sources.append("sentiment")
if candidate.get("funding_rate") not in (None, "", 0, 0.0):
sources.append("derivatives")
if candidate.get("bypass_origin") in ("higher_lows", "compression_surge"):
sources.append("structure")
return list(dict.fromkeys(sources)) or ["cex"]
def discovery_reason(candidate: Dict[str, Any]) -> str:
signals = candidate.get("anomalies") or candidate.get("signals") or []
if isinstance(signals, str):
signals = [signals]
texts = [str(s).strip() for s in signals if str(s).strip()]
if not texts:
return "异动发现"
return " / ".join(texts[:3])
def quality_filter_reasons(candidate: Dict[str, Any], score: int, threshold: int, signals: Sequence[Any] | None = None) -> Dict[str, List[str]]:
signals = list(signals or candidate.get("signals") or candidate.get("anomalies") or [])
text = " ".join(str(s) for s in signals)
codes: List[str] = []
has_top_gainer = bool(candidate.get("top_gainer_24h") or "24h强势榜" in text or "涨幅榜" in text)
chase_risk = bool(candidate.get("top_gainer_chase_risk") or any(keyword in text for keyword in ("追高", "站稳突破位+", "离突破位+")))
if score < threshold:
codes.append("low_score")
if "历史" in text or "过期" in text:
codes.append("stale_signal")
if any(keyword in text for keyword in ("量价背离", "冲高回落", "假突破", "多头出货")):
codes.append("fake_pump_risk")
if any(keyword in text for keyword in ("高位", "追高", "站稳突破位+", "离突破位+")):
codes.append("high_chase_risk")
if any(keyword in text for keyword in ("空头加速", "放量阴线", "量价背离", "资金费率极端")):
codes.append("bearish_flow_risk")
if any(keyword in text for keyword in ("板块联动", "舆情共振", "链上", "DEX", "鲸鱼", "聪明钱")):
codes.append("multi_source_resonance")
# 24h 强势榜异动属于“发现层”信号,不能因为低分或旧背景被直接打成纯拒绝。
# 只要没有明显追高/出货风险,就保留为可继续验证的候选。
if has_top_gainer and not chase_risk and "stale_signal" in codes and "multi_source_resonance" not in codes:
codes = [code for code in codes if code != "stale_signal"]
if has_top_gainer and not chase_risk and score < threshold and "low_score" in codes and "stale_signal" not in codes:
codes = [code for code in codes if code != "low_score"]
labels = [QUALITY_REASON_LABELS.get(code, code) for code in dict.fromkeys(codes)]
return {"codes": list(dict.fromkeys(codes)), "labels": labels}
def universe_gate_reason(base: str, quote_volume: float, min_volume: float, *, symbol: str = "") -> Dict[str, str]:
base = str(base or "").upper().strip()
symbol = str(symbol or "").strip()
if base in {"USDT", "USDC", "BUSD", "TUSD", "DAI", "FDUSD", "USDP", "PAX", "USD1", "USDE", "USDS", "RLUSD", "PYUSD", "XUSD", "USDUC", "FRAX", "LUSD", "GUSD", "SUSD", "USDD", "EURS", "EUR", "GBP"}:
return {"reason_code": "stablecoin", "reason_label": UNIVERSE_REASON_LABELS["stablecoin"]}
if base in {"WBTC", "WETH", "RENBTC"}:
return {"reason_code": "wrapped", "reason_label": UNIVERSE_REASON_LABELS["wrapped"]}
if base in {"XAUT", "PAXG"}:
return {"reason_code": "excluded_base", "reason_label": UNIVERSE_REASON_LABELS["excluded_base"]}
if not symbol or "/USDT" not in symbol:
return {"reason_code": "invalid_pair", "reason_label": UNIVERSE_REASON_LABELS["invalid_pair"]}
if not base.isascii():
return {"reason_code": "non_ascii", "reason_label": UNIVERSE_REASON_LABELS["non_ascii"]}
if float(quote_volume or 0) < float(min_volume or 0):
return {
"reason_code": "low_turnover",
"reason_label": f"{UNIVERSE_REASON_LABELS['low_turnover']}({float(quote_volume or 0):.0f}<{float(min_volume or 0):.0f})",
}
return {}
def build_screening_detail(
*,
layer: str,
state: str = "",
detail: Dict[str, Any] | None = None,
signals: Iterable[Any] | None = None,
candidate: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
detail = dict(detail or {})
meta = screening_stage_meta(layer, detail=detail, state=state)
detail.setdefault("funnel_stage", meta["funnel_stage"])
detail.setdefault("funnel_stage_label", meta["funnel_stage_label"])
detail.setdefault("candidate_stage", meta["candidate_stage"])
detail.setdefault("candidate_stage_label", meta["candidate_stage_label"])
if candidate:
detail.setdefault("source_types", discovery_source_types(candidate))
detail.setdefault("discovery_reason", discovery_reason(candidate))
if signals is not None:
detail.setdefault("signal_count", len(list(signals)))
return detail
__all__ = [
"FUNNEL_STAGES",
"FUNNEL_STAGE_LABELS",
"QUALITY_REASON_LABELS",
"SCREENING_STAGE_LABELS",
"build_screening_detail",
"discovery_reason",
"discovery_source_types",
"normalize_trade_action",
"quality_filter_reasons",
"screening_stage_meta",
"stage_label",
"universe_gate_reason",
]