From 7a7f7261a9acffb1f248dd495dcf05de66babfb1 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Thu, 28 May 2026 00:02:11 +0800 Subject: [PATCH] 1 --- .env.example | 4 +- AGENTS.md | 1 + app/cli.py | 7 +- app/config/system_config.py | 4 +- app/core/factor_roles.py | 1 + app/core/factor_scoring.py | 3 + app/core/global_risk.py | 17 +- app/core/market_regime.py | 4 +- app/core/signal_taxonomy.py | 2 + app/core/strategy_registry.py | 7 + .../0016_box_retest_1h_strategy.sql | 19 ++ app/db/paper_trading.py | 50 ++- app/db/scheduler_db.py | 2 +- app/db/screening_queries.py | 13 +- app/services/altcoin_confirm.py | 305 ++++++++++++++---- app/strategies/box_retest_4h.py | 90 +++++- rules.yaml | 4 + tests/test_box_breakout_pullback_4h.py | 23 +- tests/test_confirm_market_risk_gate.py | 10 +- tests/test_market_regime.py | 4 +- tests/test_multi_strategy_infra.py | 26 +- tests/test_paper_trading.py | 12 +- 22 files changed, 505 insertions(+), 103 deletions(-) create mode 100644 app/db/migrations/0016_box_retest_1h_strategy.sql diff --git a/.env.example b/.env.example index 8f1c5f7..a2293ed 100644 --- a/.env.example +++ b/.env.example @@ -87,8 +87,10 @@ ALPHAX_PAPER_PAUSE_AFTER_WEAK_ENTRIES=3 ALPHAX_PAPER_WEAK_ENTRY_WINDOW_HOURS=6 ALPHAX_PAPER_WEAK_ENTRY_MIN_MAX_PNL_PCT=1 ALPHAX_PAPER_GLOBAL_RISK_GATE_ENABLED=1 -ALPHAX_PAPER_GLOBAL_RISK_BLOCK_CRITICAL=1 +ALPHAX_PAPER_GLOBAL_RISK_BLOCK_CRITICAL=0 ALPHAX_PAPER_GLOBAL_RISK_HIGH_MIN_REC_SCORE=70 +ALPHAX_PAPER_GLOBAL_RISK_CRITICAL_MIN_REC_SCORE=80 +ALPHAX_PAPER_GLOBAL_RISK_MIN_POSITION_MULTIPLIER=0.2 ALPHAX_PAPER_GLOBAL_RISK_HIGH_DRAWDOWN_PCT=3 ALPHAX_PAPER_GLOBAL_RISK_CRITICAL_DRAWDOWN_PCT=6 ALPHAX_PAPER_GLOBAL_RISK_MAX_OPEN_POSITIONS=0 diff --git a/AGENTS.md b/AGENTS.md index 41f5af8..4bef6e6 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -176,6 +176,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - 市场环境识别中心,第一版基于市场快照、BTC/ETH 涨跌、山寨涨跌广度、强势/大跌数量和 funding 热度识别 `risk_off`、`btc_main_uptrend`、`altcoin_rotation`、`sideways_chop`、`meme_frenzy`、`unknown`。 - `app/core/global_risk.py` - paper trading 全局风控门禁。单币机会进入开仓或挂单成交前,需要先检查市场环境和账户风险;critical 禁止新开仓,high 只允许高质量机会。 +- 多策略基础设施当前内置 `main_composite_v1`、`box_retest_4h_v1`、`box_retest_1h_v1`。`box_breakout_pullback_4h` / `box_breakout_pullback_1h` 只是触发因子,只有和入场确认、风控、失效条件组成完整剧本后,才作为对应策略信号写入 `strategy_signals`。 - 确认层也会应用同一市场风控语义:`risk_level=critical` 且 `position_multiplier=0` 时,强势发现仍可记录为观察,但不能输出 `buy_now` 或新挂单动作;已有活跃可交易推荐会被降级为观察并写入 `market_risk_gate`。 ## 5. 数据与状态中心 diff --git a/app/cli.py b/app/cli.py index 08a1b6d..95c7772 100644 --- a/app/cli.py +++ b/app/cli.py @@ -14,7 +14,10 @@ def build_parser(): screener.add_argument("--compact", action="store_true", help="输出紧凑 JSON") confirm = subparsers.add_parser("confirm", help="运行确认流程") - confirm.add_argument("--compact", action="store_true", help="输出紧凑 JSON") + confirm.add_argument("--compact", action="store_true", help="输出紧凑摘要 JSON") + confirm.add_argument("--verbose", action="store_true", help="输出完整确认上下文,仅排查时使用") + confirm.add_argument("--limit", type=int, default=None, help="本轮最多确认的候选数量") + confirm.add_argument("--max-seconds", type=int, default=None, help="本轮最大运行秒数") tracker = subparsers.add_parser("tracker", help="运行价格跟踪") @@ -61,7 +64,7 @@ def main(): if args.command == "screener": return altcoin_screener.main(compact=args.compact) if args.command == "confirm": - return altcoin_confirm.main(compact=args.compact) + return altcoin_confirm.main(compact=args.compact, verbose=args.verbose, limit=args.limit, max_seconds=args.max_seconds) if args.command == "tracker": return price_tracker.main() if args.command == "paper-trader": diff --git a/app/config/system_config.py b/app/config/system_config.py index 5e48ef7..8daf3b8 100644 --- a/app/config/system_config.py +++ b/app/config/system_config.py @@ -181,7 +181,9 @@ def default_paper_trading_config(): "weak_entry_window_hours": _env_float("ALPHAX_PAPER_WEAK_ENTRY_WINDOW_HOURS", 6.0), "weak_entry_min_max_pnl_pct": _env_float("ALPHAX_PAPER_WEAK_ENTRY_MIN_MAX_PNL_PCT", 1.0), "global_risk_gate_enabled": _env_bool("ALPHAX_PAPER_GLOBAL_RISK_GATE_ENABLED", True), - "global_risk_block_critical": _env_bool("ALPHAX_PAPER_GLOBAL_RISK_BLOCK_CRITICAL", True), + "global_risk_block_critical": _env_bool("ALPHAX_PAPER_GLOBAL_RISK_BLOCK_CRITICAL", False), + "global_risk_critical_min_rec_score": _env_float("ALPHAX_PAPER_GLOBAL_RISK_CRITICAL_MIN_REC_SCORE", 80.0), + "global_risk_min_position_multiplier": _env_float("ALPHAX_PAPER_GLOBAL_RISK_MIN_POSITION_MULTIPLIER", 0.2), "global_risk_high_min_rec_score": _env_float("ALPHAX_PAPER_GLOBAL_RISK_HIGH_MIN_REC_SCORE", 70.0), "global_risk_high_drawdown_pct": _env_float("ALPHAX_PAPER_GLOBAL_RISK_HIGH_DRAWDOWN_PCT", 3.0), "global_risk_critical_drawdown_pct": _env_float("ALPHAX_PAPER_GLOBAL_RISK_CRITICAL_DRAWDOWN_PCT", 6.0), diff --git a/app/core/factor_roles.py b/app/core/factor_roles.py index 792bef5..4ad7648 100644 --- a/app/core/factor_roles.py +++ b/app/core/factor_roles.py @@ -30,6 +30,7 @@ VALID_FACTOR_ROLES = { DEFAULT_FACTOR_ROLES: dict[str, str] = { "box_breakout_pullback_4h": TRIGGER, + "box_breakout_pullback_1h": TRIGGER, "vp_fly_1h_current": TRIGGER, "volume_consecutive_1h": CONFIRMATION, "volume_divergence_1h": RISK, diff --git a/app/core/factor_scoring.py b/app/core/factor_scoring.py index ed3e5ac..baf5895 100644 --- a/app/core/factor_scoring.py +++ b/app/core/factor_scoring.py @@ -22,6 +22,7 @@ DEFAULT_FACTOR_WEIGHTS = { "static_accum_4h": 5.0, "higher_lows_4h": 2.0, "compression_surge_4h": 2.0, + "box_breakout_pullback_1h": 6.0, "box_breakout_pullback_4h": 8.0, "ignition_1h_current": 4.0, "ignition_4h_current": 3.0, @@ -60,6 +61,7 @@ FACTOR_GROUPS = { "static_accum_4h": "structure", "higher_lows_4h": "structure", "compression_surge_4h": "structure", + "box_breakout_pullback_1h": "structure", "box_breakout_pullback_4h": "structure", "ignition_1h_current": "momentum", "ignition_4h_current": "momentum", @@ -106,6 +108,7 @@ WEIGHT_ALIASES = { "volume_consecutive_1h": ("连续3x放量", "连续3x放量(≥3根)", "1H连续放量"), "volume_divergence_1h": ("量价背离", "1H量价背离"), "static_accum_4h": ("静K蓄力", "4H静K蓄力"), + "box_breakout_pullback_1h": ("1H箱体突破回踩", "1H底部箱体突破回踩"), "box_breakout_pullback_4h": ("4H箱体突破回踩", "4H底部箱体突破回踩"), "ignition_1h_current": ("静K动K转折", "静K→动K转折", "1H当前起爆点"), "ignition_4h_current": ("静K动K转折", "静K→动K转折", "4H当前起爆点"), diff --git a/app/core/global_risk.py b/app/core/global_risk.py index ac17bbc..dad68a6 100644 --- a/app/core/global_risk.py +++ b/app/core/global_risk.py @@ -115,10 +115,13 @@ def evaluate_global_risk( concentration = _concentration_snapshot(conn, rec) rec_score = _safe_float((rec or {}).get("rec_score") or (rec or {}).get("score")) min_score_high = max(0.0, _safe_float(cfg.get("global_risk_high_min_rec_score"), 70.0)) + min_score_critical = max(min_score_high, _safe_float(cfg.get("global_risk_critical_min_rec_score"), 80.0)) + min_position_multiplier = max(0.0, _safe_float(cfg.get("global_risk_min_position_multiplier"), 0.2)) max_drawdown_critical = max(0.0, _safe_float(cfg.get("global_risk_critical_drawdown_pct"), 6.0)) max_drawdown_high = max(0.0, _safe_float(cfg.get("global_risk_high_drawdown_pct"), 3.0)) reasons = list(regime.get("reasons") or []) risk_level = str(regime.get("risk_level") or "medium") + position_multiplier = max(min_position_multiplier, _safe_float(regime.get("position_multiplier"), 1.0)) allow = True decision = "allow" @@ -126,13 +129,22 @@ def evaluate_global_risk( if max_drawdown_critical > 0 and drawdown >= max_drawdown_critical: risk_level = "critical" reasons.append("账户浮亏已进入 critical 区间,暂停所有新开仓") + position_multiplier = 0.0 elif max_drawdown_high > 0 and drawdown >= max_drawdown_high and risk_level not in {"critical"}: risk_level = "high" reasons.append("账户浮亏偏高,只允许高质量机会") - if risk_level == "critical" and bool(cfg.get("global_risk_block_critical", True)): + if risk_level == "critical" and bool(cfg.get("global_risk_block_critical", False)): allow = False decision = "block_critical" + elif risk_level == "critical" and drawdown < max_drawdown_critical: + if rec_score < min_score_critical: + allow = False + decision = "block_critical_weak_score" + reasons.append(f"critical 市场环境下推荐分 {rec_score:.1f} 低于 {min_score_critical:.1f}") + else: + decision = "allow_reduced_size" + reasons.append(f"critical 市场环境不再一刀切,按 {position_multiplier:.0%} 仓位试运行") elif risk_level == "high" and rec_score < min_score_high: allow = False decision = "block_high_weak_score" @@ -169,9 +181,10 @@ def evaluate_global_risk( "allow_new_entries": allow, "decision": decision, "risk_level": risk_level, - "position_multiplier": regime.get("position_multiplier", 1.0), + "position_multiplier": position_multiplier, "max_open_positions": max_open_positions, "min_score_when_high_risk": min_score_high, + "min_score_when_critical_risk": min_score_critical, "reasons": reasons, "market_regime": regime, "portfolio": portfolio, diff --git a/app/core/market_regime.py b/app/core/market_regime.py index 796b6a5..0d0fdd2 100644 --- a/app/core/market_regime.py +++ b/app/core/market_regime.py @@ -63,8 +63,8 @@ def classify_market_regime(overview: dict | None) -> dict: "label": "风险释放期", "confidence": 0.88, "risk_level": "critical", - "position_multiplier": 0.0, - "reasons": ["主流币或山寨广度明显走弱,新开山寨仓位容易变成接飞刀"], + "position_multiplier": 0.25, + "reasons": ["主流币或山寨广度明显走弱,新开山寨仓位必须显著降仓并提高质量门槛"], "metrics": metrics, } diff --git a/app/core/signal_taxonomy.py b/app/core/signal_taxonomy.py index 9421188..d7974cd 100644 --- a/app/core/signal_taxonomy.py +++ b/app/core/signal_taxonomy.py @@ -22,6 +22,7 @@ SIGNAL_CODE_LABELS = { "static_accum_4h": "4H静K蓄力", "higher_lows_4h": "4H底部抬高", "compression_surge_4h": "4H压缩放量", + "box_breakout_pullback_1h": "1H箱体突破回踩", "box_breakout_pullback_4h": "4H箱体突破回踩", "ignition_1h_current": "1H当前起爆点", "ignition_4h_current": "4H当前起爆点", @@ -64,6 +65,7 @@ _PATTERNS = [ ("short_tf_5m_ignition", ("5min极早期启动", "5m极早期启动", "5min 早期启动")), ("volume_divergence_1h", ("量价背离", "放量但无量价齐飞")), ("static_accum_4h", ("静K蓄力", "静K旁路")), + ("box_breakout_pullback_1h", ("1H", "箱体", "突破", "回踩")), ("box_breakout_pullback_4h", ("4H", "箱体", "突破", "回踩")), ("higher_lows_4h", ("底部抬高",)), ("compression_surge_4h", ("压缩放量",)), diff --git a/app/core/strategy_registry.py b/app/core/strategy_registry.py index 96882e9..56bed74 100644 --- a/app/core/strategy_registry.py +++ b/app/core/strategy_registry.py @@ -6,6 +6,7 @@ from dataclasses import dataclass MAIN_COMPOSITE_STRATEGY = "main_composite_v1" +BOX_RETEST_1H_STRATEGY = "box_retest_1h_v1" BOX_RETEST_4H_STRATEGY = "box_retest_4h_v1" @@ -25,6 +26,12 @@ STRATEGY_DEFINITIONS: dict[str, StrategyDefinition] = { description="迁移期兼容主链路,承载现有综合筛选与确认逻辑。", mode="paper_enabled", ), + BOX_RETEST_1H_STRATEGY: StrategyDefinition( + strategy_code=BOX_RETEST_1H_STRATEGY, + strategy_name="1H箱体突破回踩", + description="小时级底部箱体突破后回踩箱体上沿或EMA承接的早期结构策略。", + mode="paper_only", + ), BOX_RETEST_4H_STRATEGY: StrategyDefinition( strategy_code=BOX_RETEST_4H_STRATEGY, strategy_name="4H箱体突破回踩", diff --git a/app/db/migrations/0016_box_retest_1h_strategy.sql b/app/db/migrations/0016_box_retest_1h_strategy.sql new file mode 100644 index 0000000..194b4b7 --- /dev/null +++ b/app/db/migrations/0016_box_retest_1h_strategy.sql @@ -0,0 +1,19 @@ +INSERT INTO strategy_catalog ( + strategy_code, strategy_name, strategy_version, status, mode, description, config_json, created_at, updated_at +) VALUES ( + 'box_retest_1h_v1', + '1H箱体突破回踩', + '', + 'active', + 'paper_only', + '小时级底部箱体突破后回踩箱体上沿或EMA承接的早期结构策略。', + '{}', + NOW()::TEXT, + NOW()::TEXT +) +ON CONFLICT(strategy_code) DO UPDATE SET + strategy_name=EXCLUDED.strategy_name, + status=EXCLUDED.status, + mode=EXCLUDED.mode, + description=EXCLUDED.description, + updated_at=NOW()::TEXT; diff --git a/app/db/paper_trading.py b/app/db/paper_trading.py index 488dbf3..48830a4 100644 --- a/app/db/paper_trading.py +++ b/app/db/paper_trading.py @@ -178,6 +178,17 @@ def _global_risk_entry_check(conn, rec: dict, additional_notional: float, config return bool(detail.get("allow_new_entries", True)), detail +def _market_risk_adjusted_notional(base_notional: float, risk_detail: dict | None, config: dict | None = None) -> float: + cfg = _paper_cfg(config) + detail = risk_detail if isinstance(risk_detail, dict) else {} + multiplier = _safe_float(detail.get("position_multiplier"), 1.0) + if multiplier <= 0: + multiplier = 1.0 + adjusted = _safe_float(base_notional) * multiplier + min_notional = max(1.0, _safe_float(cfg.get("min_trade_notional_usdt"), 1.0)) + return round(max(min_notional, adjusted), 8) + + def _trailing_config() -> dict: cfg = paper_trading_config() return { @@ -693,9 +704,6 @@ def _open_trade(conn, rec: dict, current_price: float, event_time: str, config: "leverage": leverage, }, } - pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, notional, event_time, cfg) - if not pause_ok: - return {"opened": False, "skipped": True, "reason": pause_reason, "risk_detail": pause_detail} global_ok, global_detail = _global_risk_entry_check(conn, rec, notional, cfg) if not global_ok: return { @@ -704,6 +712,19 @@ def _open_trade(conn, rec: dict, current_price: float, event_time: str, config: "reason": "global_risk_rejected", "risk_detail": global_detail, } + adjusted_notional = _market_risk_adjusted_notional(notional, global_detail, cfg) + if adjusted_notional != notional: + plan["market_position_sizing"] = { + "base_notional_usdt": notional, + "adjusted_notional_usdt": adjusted_notional, + "position_multiplier": global_detail.get("position_multiplier"), + "risk_level": global_detail.get("risk_level"), + "decision": global_detail.get("decision"), + } + notional = adjusted_notional + pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, notional, event_time, cfg) + if not pause_ok: + return {"opened": False, "skipped": True, "reason": pause_reason, "risk_detail": pause_detail} leverage_ok, leverage_detail = _cumulative_leverage_check(conn, notional, cfg, exclude_rec_id=rec_id) if not leverage_ok: return { @@ -1048,20 +1069,33 @@ def _fill_paper_order(conn, order: dict, rec: dict, current_price: float, event_ max_sl_risk = max(0.0, _safe_float(cfg.get("max_stop_loss_leverage_risk_pct"), 0)) if max_sl_risk > 0 and sl_risk > max_sl_risk: return _cancel_paper_order(conn, order, "stop_loss_leverage_risk_exceeded", event_time) - pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, default_notional_usdt(cfg), event_time, cfg) - if not pause_ok: - return _cancel_paper_order(conn, order, pause_reason, event_time) - global_ok, global_detail = _global_risk_entry_check(conn, rec, default_notional_usdt(cfg), cfg) + base_notional = _safe_float(order.get("notional_usdt"), default_notional_usdt(cfg)) + global_ok, global_detail = _global_risk_entry_check(conn, rec, base_notional, cfg) if not global_ok: result = _cancel_paper_order(conn, order, "global_risk_rejected", event_time) result["risk_detail"] = global_detail return result + adjusted_notional = _market_risk_adjusted_notional(base_notional, global_detail, cfg) + pause_ok, pause_reason, pause_detail = _portfolio_entry_pause_check(conn, adjusted_notional, event_time, cfg) + if not pause_ok: + return _cancel_paper_order(conn, order, pause_reason, event_time) trade_rec = dict(rec) plan = _entry_plan(trade_rec) plan.setdefault("entry_price", fill_price) + if adjusted_notional != base_notional: + plan["market_position_sizing"] = { + "base_notional_usdt": base_notional, + "adjusted_notional_usdt": adjusted_notional, + "position_multiplier": global_detail.get("position_multiplier"), + "risk_level": global_detail.get("risk_level"), + "decision": global_detail.get("decision"), + } trade_rec["entry_plan"] = plan trade_rec["entry_price"] = fill_price - result = _open_trade(conn, trade_rec, fill_price, event_time, config=config, push_open_card=False) + # Filled limit orders should keep the notional decided when the order was + # created; risk sizing is still applied once inside _open_trade. + fill_cfg = {**cfg, "trade_notional_usdt": base_notional} + result = _open_trade(conn, trade_rec, fill_price, event_time, config=fill_cfg, push_open_card=False) if result.get("opened"): order = {**order, "fill_price": fill_price} conn.execute( diff --git a/app/db/scheduler_db.py b/app/db/scheduler_db.py index e2d0d63..6c475f9 100644 --- a/app/db/scheduler_db.py +++ b/app/db/scheduler_db.py @@ -61,7 +61,7 @@ DEFAULT_JOBS = [ { "job_name": "confirm", "command": "confirm", - "args": [], + "args": ["--compact", "--limit", "8", "--max-seconds", "90"], "every_seconds": 600, "initial_delay": 40, "lock_group": "recommendation_write", diff --git a/app/db/screening_queries.py b/app/db/screening_queries.py index 56788d5..28740f8 100644 --- a/app/db/screening_queries.py +++ b/app/db/screening_queries.py @@ -38,26 +38,29 @@ def get_screening_history(hours=24, limit=100): return [dict(r) for r in rows] -def get_candidates_for_confirm(): +def get_candidates_for_confirm(limit=None): """Read candidates for confirm layer, preferring the latest screening window.""" try: _, _, accumulate_threshold = state_score_thresholds() except Exception: accumulate_threshold = 3 conn = get_conn() + limit = max(1, min(int(limit or 50), 100)) rows = conn.execute(""" SELECT * FROM coin_state WHERE state IN ('加速', '蓄力') AND score >= %s AND detected_at >= %s - ORDER BY detected_at DESC, score DESC - """, (accumulate_threshold, (datetime.now() - timedelta(minutes=45)).isoformat())).fetchall() + ORDER BY score DESC, detected_at DESC + LIMIT %s + """, (accumulate_threshold, (datetime.now() - timedelta(minutes=45)).isoformat(), limit)).fetchall() if not rows: rows = conn.execute(""" SELECT * FROM coin_state WHERE state IN ('加速', '蓄力') AND score >= 5 - ORDER BY detected_at DESC, score DESC - """).fetchall() + ORDER BY score DESC, detected_at DESC + LIMIT %s + """, (limit,)).fetchall() conn.close() return [dict(r) for r in rows] diff --git a/app/services/altcoin_confirm.py b/app/services/altcoin_confirm.py index ef7df71..3019161 100644 --- a/app/services/altcoin_confirm.py +++ b/app/services/altcoin_confirm.py @@ -53,7 +53,7 @@ from app.core.market_regime import classify_market_regime from app.db.onchain_db import get_onchain_factor_context from app.db.strategy_signal_queries import insert_strategy_signal from app.services.market_overview import get_crypto_market_overview -from app.strategies.box_retest_4h import build_box_retest_signal +from app.strategies.box_retest_4h import build_box_retest_1h_signal, build_box_retest_4h_signal from app.config.config_loader import _get_section as _get_cfg_section from app.core.pa_engine import ( classify_candles, calc_atr, find_supply_demand_zones, @@ -61,26 +61,67 @@ from app.core.pa_engine import ( analyze_entry_point, detect_trend_exhaustion, ) -exchange = ccxt.binance({"enableRateLimit": True}) +def _confirm_cfg_value(key, default): + try: + return _get_cfg_section("confirm").get(key, default) + except Exception: + return default + + +def _confirm_http_timeout() -> float: + return float(os.getenv("ALPHAX_CONFIRM_HTTP_TIMEOUT_SECONDS") or _confirm_cfg_value("http_timeout_seconds", 2.5) or 2.5) + + +def _confirm_kline_timeout_ms() -> int: + return int(float(os.getenv("ALPHAX_CONFIRM_KLINE_TIMEOUT_MS") or _confirm_cfg_value("kline_timeout_ms", 4500) or 4500)) + + +exchange = ccxt.binance({"enableRateLimit": True, "timeout": _confirm_kline_timeout_ms()}) REPO_ROOT = Path(__file__).resolve().parents[2] def _strategy_context_for_recommendation(symbol: str, result: dict, entry_plan: dict) -> dict: """Build and persist a standard strategy signal when an independent strategy matches.""" + bp_1h = result.get("box_breakout_pullback_1h") or (result.get("market_context") or {}).get("box_breakout_pullback_1h") or {} bp_4h = result.get("box_breakout_pullback_4h") or (result.get("market_context") or {}).get("box_breakout_pullback_4h") or {} - if not bp_4h.get("detected"): + if not bp_1h.get("detected") and not bp_4h.get("detected"): return {} - signal = build_box_retest_signal( - symbol=symbol, - current_price=result.get("price") or 0, - detection=bp_4h, - entry_plan=entry_plan or {}, - market_regime=result.get("market_regime") or (result.get("market_context") or {}).get("market_regime") or {}, - decision_log=result.get("decision_log") or {}, - ) - if not signal: + market_regime = result.get("market_regime") or (result.get("market_context") or {}).get("market_regime") or {} + signal_candidates = [] + if bp_1h.get("detected"): + signal_candidates.append( + build_box_retest_1h_signal( + symbol=symbol, + current_price=result.get("price") or 0, + detection=bp_1h, + entry_plan=entry_plan or {}, + market_regime=market_regime, + decision_log=result.get("decision_log") or {}, + ) + ) + if bp_4h.get("detected"): + signal_candidates.append( + build_box_retest_4h_signal( + symbol=symbol, + current_price=result.get("price") or 0, + detection=bp_4h, + entry_plan=entry_plan or {}, + market_regime=market_regime, + decision_log=result.get("decision_log") or {}, + ) + ) + saved_payloads = [] + for signal in [item for item in signal_candidates if item]: + saved_payloads.append(insert_strategy_signal(signal)) + if not saved_payloads: return {} - payload = insert_strategy_signal(signal) + def _rank(payload: dict) -> tuple: + status_rank = {"candidate": 3, "observe": 2, "risk": 1, "rejected": 0}.get(str(payload.get("signal_status") or payload.get("status") or ""), 0) + trigger = payload.get("trigger") or {} + age = _safe_age_bars(trigger.get("pullback_age_bars")) + return (status_rank, float(payload.get("confidence") or 0), -age, float(payload.get("score") or 0)) + + payload = sorted(saved_payloads, key=_rank, reverse=True)[0] return { "strategy_code": payload.get("strategy_code"), "strategy_signal_id": payload.get("strategy_signal_id") or payload.get("id") or 0, @@ -91,6 +132,7 @@ def _strategy_context_for_recommendation(symbol: str, result: dict, entry_plan: def fetch_klines(symbol, timeframe, limit=200): try: + exchange.timeout = _confirm_kline_timeout_ms() ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit) df = pd.DataFrame(ohlcv, columns=["timestamp", "open", "high", "low", "close", "volume"]) df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") @@ -177,6 +219,15 @@ def _event_time_from_age(df, age_bars: int): return None +def _safe_age_bars(value, default=999) -> int: + try: + if value is None or value == "": + return default + return int(value) + except Exception: + return default + + def _is_candidate_fresh(cand, event_times, max_hours=6): """候选新鲜度:当前触发或新近进入候选池,避免旧结构反复确认。""" now = datetime.now() @@ -216,6 +267,7 @@ def _build_trigger_context( stale_1h_ignitions=None, stale_d1_ignitions=None, bp_daily=None, + bp_1h=None, bp_4h=None, entry_action="", ): @@ -242,6 +294,20 @@ def _build_trigger_context( fresh_event_bucket.append({"type": "technical", "label": fresh_event_label, "source": "pa_engine", **e}) if (bp_daily or {}).get("detected"): stale.append({"type": "technical_background", "label": "日线底部突破回踩背景", "source": "daily_structure"}) + if (bp_1h or {}).get("detected"): + age = bp_1h.get("pullback_age_bars") + item = { + "type": "technical", + "label": "1H箱体突破回踩", + "source": "box_breakout_pullback_1h", + "entry_zone": bp_1h.get("entry_zone"), + "pullback_kind": bp_1h.get("pullback_kind"), + "age_bars": age, + } + if age is not None and int(age) <= 2: + current.append(item) + else: + stale.append({**item, "type": "technical_background"}) if (bp_4h or {}).get("detected"): age = bp_4h.get("pullback_age_bars") item = { @@ -292,11 +358,12 @@ def fetch_derivatives_context(symbol): """ futures_sym = _spot_to_futures(symbol) ctx = {} + timeout = _confirm_http_timeout() try: # 1. Funding Rate r = requests.get( f"https://fapi.binance.com/fapi/v1/premiumIndex?symbol={futures_sym}", - timeout=5, + timeout=timeout, ) if r.status_code == 200: ctx["funding_rate"] = float(r.json().get("lastFundingRate", 0) or 0) @@ -307,7 +374,7 @@ def fetch_derivatives_context(symbol): # 2. Open Interest r = requests.get( f"https://fapi.binance.com/fapi/v1/openInterest?symbol={futures_sym}", - timeout=5, + timeout=timeout, ) if r.status_code == 200: ctx["open_interest"] = float(r.json().get("openInterest", 0) or 0) @@ -319,7 +386,7 @@ def fetch_derivatives_context(symbol): r = requests.get( f"https://fapi.binance.com/futures/data/topLongShortAccountRatio" f"?symbol={futures_sym}&period=5m&limit=2", - timeout=5, + timeout=timeout, ) if r.status_code == 200: data = r.json() @@ -339,7 +406,7 @@ def fetch_derivatives_context(symbol): r = requests.get( f"https://fapi.binance.com/futures/data/openInterestHist" f"?symbol={futures_sym}&period=1d&limit=2", - timeout=5, + timeout=timeout, ) if r.status_code == 200: hist = r.json() @@ -575,26 +642,27 @@ def _decision_log(module: str, decision: str, *, score: float = 0.0, reasons=Non def _apply_market_risk_entry_gate(entry_plan: dict, signals: list, market_regime: dict) -> tuple[dict, str]: - """Keep high-risk market discoveries visible, but block executable entries.""" + """Annotate market risk as position sizing guidance instead of a hard blocker.""" plan = dict(entry_plan or {}) regime = market_regime if isinstance(market_regime, dict) else {} risk_level = str(regime.get("risk_level") or "").strip().lower() - position_multiplier = float(regime.get("position_multiplier") or 0) + position_multiplier = max(0.0, float(regime.get("position_multiplier") or 0)) current_action = str(plan.get("entry_action") or "").strip() - if risk_level != "critical" or position_multiplier > 0: + if risk_level != "critical": return plan, "" if current_action not in {"可即刻买入", "即刻买入", "等回踩"}: return plan, "" - reason = "全市场处于 critical 风险,暂停新开仓与新挂单,保留为观察机会" + if position_multiplier <= 0: + position_multiplier = 0.25 + reason = f"全市场处于 critical 风险,不再一刀切拦截,按 {position_multiplier:.0%} 仓位执行并禁止追高" plan["market_risk_gate"] = { - "blocked_action": current_action, - "final_action": "观察", + "blocked_action": "", + "final_action": current_action, "risk_level": risk_level, "position_multiplier": position_multiplier, "reasons": [reason], } - plan["entry_action"] = "观察" if not any("市场风控闸门" in str(sig) for sig in signals): signals.append(f"⚠️ 市场风控闸门: {reason}") return plan, reason @@ -896,8 +964,19 @@ def detect_breakout_pullback(df, timeframe="1d"): return result -def detect_box_breakout_pullback_4h(df, lookback=24, max_wait_bars=8): - """4H底部箱体突破回踩检测。 +def _detect_box_breakout_pullback( + df, + *, + timeframe_label="4H", + lookback=24, + max_wait_bars=8, + recent_window=36, + min_box_width_pct=3, + max_box_width_pct=45, + breakout_close_multiplier=1.006, + min_breakout_vol_ratio=1.15, +): + """底部箱体突破回踩检测。 模式:底部箱体横盘 -> 放量突破箱体上沿 -> 回踩箱体上沿/EMA不破。 这类形态比单根K线因子更像完整交易剧本,所以单独输出可复盘证据。 @@ -926,7 +1005,7 @@ def detect_box_breakout_pullback_4h(df, lookback=24, max_wait_bars=8): work["ema25"] = work["close"].ewm(span=25, adjust=False).mean() # 只回看最近一段,避免很久以前的箱体形态反复污染当前确认。 - start = max(int(lookback), len(work) - 36) + start = max(int(lookback), len(work) - int(recent_window)) end = len(work) - 1 best = None for i in range(start, end): @@ -938,16 +1017,16 @@ def detect_box_breakout_pullback_4h(df, lookback=24, max_wait_bars=8): if box_high <= 0 or box_low <= 0: continue box_width_pct = (box_high - box_low) / box_low * 100 - if box_width_pct <= 3 or box_width_pct > 45: + if box_width_pct <= float(min_box_width_pct) or box_width_pct > float(max_box_width_pct): continue row = work.iloc[i] vol_median = float(base["volume"].median() or 0) breakout_vol_ratio = float(row["volume"]) / vol_median if vol_median > 0 else 1.0 broke_out = ( - float(row["close"]) > box_high * 1.006 + float(row["close"]) > box_high * float(breakout_close_multiplier) and float(row["close"]) > float(row["open"]) - and breakout_vol_ratio >= 1.15 + and breakout_vol_ratio >= float(min_breakout_vol_ratio) ) if not broke_out: continue @@ -994,13 +1073,14 @@ def detect_box_breakout_pullback_4h(df, lookback=24, max_wait_bars=8): pullback_parts.append("EMA25") pullback_kind = "+".join(pullback_parts) or "回踩承接" signals.append( - "4H箱体突破回踩({} ${:.6g}, 量{:.1f}x)".format( + "{}箱体突破回踩({} ${:.6g}, 量{:.1f}x)".format( + timeframe_label, pullback_kind, box_high, breakout_vol_ratio, ) ) - signals.append("4H底部箱体宽度{:.1f}%".format(box_width_pct)) + signals.append("{}底部箱体宽度{:.1f}%".format(timeframe_label, box_width_pct)) quality = "优质" if score >= 10 else "良好" if score >= 7 else "可观察" if score >= 5 else "弱" candidate = { "detected": True, @@ -1027,6 +1107,40 @@ def detect_box_breakout_pullback_4h(df, lookback=24, max_wait_bars=8): return result +def detect_box_breakout_pullback_4h(df, lookback=24, max_wait_bars=8): + """4H底部箱体突破回踩检测。""" + return _detect_box_breakout_pullback( + df, + timeframe_label="4H", + lookback=lookback, + max_wait_bars=max_wait_bars, + recent_window=36, + min_box_width_pct=3, + max_box_width_pct=45, + breakout_close_multiplier=1.006, + min_breakout_vol_ratio=1.15, + ) + + +def detect_box_breakout_pullback_1h(df, lookback=36, max_wait_bars=10): + """1H底部箱体突破回踩检测。 + + 1H 用于捕捉 4H 成型前的更早入场结构,但只作为策略候选证据, + 仍需后续入场质量和全局风控过滤。 + """ + return _detect_box_breakout_pullback( + df, + timeframe_label="1H", + lookback=lookback, + max_wait_bars=max_wait_bars, + recent_window=72, + min_box_width_pct=2, + max_box_width_pct=32, + breakout_close_multiplier=1.004, + min_breakout_vol_ratio=1.2, + ) + + def confirm_burst(symbol, cand): """对单个候选做爆发确认(v1.7.0:强共振旁路+量价齐飞双门控) cand: coin_state行数据,含leader_status/detail_json等 @@ -1135,6 +1249,26 @@ def confirm_burst(symbol, cand): signals.append(f"1H放量({vol_ratio:.1f}x)但无量价齐飞(量价背离)") score += factor_scorer.delta("volume_divergence_1h", 1, evidence="1H放量但价格行为未确认", value=round(vol_ratio, 2)) + # ---- 1H箱体突破回踩:比4H更早的结构候选,仍需买点/风控过滤 ---- + bp_1h = {"detected": False} + try: + if h1_df is not None and len(h1_df) >= 60: + bp_1h = detect_box_breakout_pullback_1h(h1_df) + except Exception: + bp_1h = {"detected": False} + if bp_1h.get("detected"): + signals.extend(bp_1h.get("signals", [])) + score += factor_scorer.add_existing( + "box_breakout_pullback_1h", + bp_1h.get("score", 0), + evidence="1H底部箱体突破后回踩箱体上沿/均线", + value=bp_1h, + cap=8, + ) + t = _event_time_from_age(h1_df, bp_1h.get("pullback_age_bars")) + if t and _safe_age_bars(bp_1h.get("pullback_age_bars")) <= 2: + current_trigger_times.append(t) + # ---- PA引擎:4H级别(阻力/支撑) ---- pa_4h = full_pa_analysis(h4_df, "4h") if h4_df is not None and len(h4_df) >= 30 else {} @@ -1161,7 +1295,7 @@ def confirm_burst(symbol, cand): cap=10, ) t = _event_time_from_age(h4_df, bp_4h.get("pullback_age_bars")) - if t and int(bp_4h.get("pullback_age_bars") or 999) <= 1: + if t and _safe_age_bars(bp_4h.get("pullback_age_bars")) <= 1: current_trigger_times.append(t) # ---- v1.7.7: 日线 PA 全分析(供需区 + 起爆点 + 动K,高权重)---- @@ -1382,7 +1516,7 @@ def confirm_burst(symbol, cand): current_trigger_ok = bool(current_trigger_times) recent_candidate_ok = (fresh_reason == "fresh_candidate_state") if score >= structure_gate_score and entry_action in ("即刻买入", "可即刻买入") and (current_trigger_ok or recent_candidate_ok): - if fresh_reason != "stale_structure_background_only" and (stale_vp_count > 0 or stale_1h_ignitions or stale_d1_ignitions or bp_daily.get("detected") or bp_4h.get("detected")): + if fresh_reason != "stale_structure_background_only" and (stale_vp_count > 0 or stale_1h_ignitions or stale_d1_ignitions or bp_daily.get("detected") or bp_1h.get("detected") or bp_4h.get("detected")): signals.append(f"🟡 历史强背景+当前结构确认(score≥{structure_gate_score})") confirmed = True @@ -1429,7 +1563,10 @@ def confirm_burst(symbol, cand): if bp_daily.get("detected"): aux_count += 1 # 4H箱体突破回踩:更接近交易级别的结构形态 - if bp_4h.get("detected") and int(bp_4h.get("pullback_age_bars") or 999) <= 3: + if bp_4h.get("detected") and _safe_age_bars(bp_4h.get("pullback_age_bars")) <= 3: + aux_count += 1 + # 1H箱体突破回踩:更早的小时级结构,只在较新鲜时作为辅助确认 + if bp_1h.get("detected") and _safe_age_bars(bp_1h.get("pullback_age_bars")) <= 4: aux_count += 1 # 舆情共振:screener给了sentiment_bonus sentiment_bonus = cand_detail.get("sentiment_bonus") @@ -1648,12 +1785,18 @@ def confirm_burst(symbol, cand): if gated_action == "观察": score += factor_scorer.delta("entry_quality_gate", -2, evidence="买点质量闸门降为观察", value=gate_reasons[:3]) - # 周线突破回踩(需独立拉取) + regime_context = _current_market_regime_context() + market_regime = regime_context.get("market_regime") or {} + market_risk_level = str(market_regime.get("risk_level") or "").strip().lower() + + # 周线突破回踩是高成本兜底证据;全局禁止开仓时不再额外拉取,避免确认链路被低价值 API 拖慢。 bp_weekly = {"detected": False} try: - w1_df = fetch_klines(symbol, "1w", limit=52) - if w1_df is not None and len(w1_df) >= 30: - bp_weekly = detect_breakout_pullback(w1_df, "周线") + should_fetch_weekly = market_risk_level != "critical" and score >= max(confirm_min_score(), 8) + if should_fetch_weekly: + w1_df = fetch_klines(symbol, "1w", limit=52) + if w1_df is not None and len(w1_df) >= 30: + bp_weekly = detect_breakout_pullback(w1_df, "周线") except Exception: pass @@ -1663,10 +1806,11 @@ def confirm_burst(symbol, cand): # ---- 计算上下文数据 ---- market_context = compute_market_context(h1_df, price) - derivatives_context = fetch_derivatives_context(symbol) + derivatives_context = upstream_deriv if isinstance(upstream_deriv, dict) else {} + if market_risk_level != "critical" or not derivatives_context: + live_derivatives = fetch_derivatives_context(symbol) + derivatives_context = {**derivatives_context, **(live_derivatives or {})} sector_context = compute_sector_context(symbol, cand_detail) - regime_context = _current_market_regime_context() - market_regime = regime_context.get("market_regime") or {} if entry_plan: entry_plan, market_risk_gate_reason = _apply_market_risk_entry_gate(entry_plan, signals, market_regime) if market_risk_gate_reason: @@ -1688,10 +1832,12 @@ def confirm_burst(symbol, cand): stale_1h_ignitions=stale_1h_ignitions if 'stale_1h_ignitions' in locals() else [], stale_d1_ignitions=stale_d1_ignitions if 'stale_d1_ignitions' in locals() else [], bp_daily=bp_daily if 'bp_daily' in locals() else {}, + bp_1h=bp_1h if 'bp_1h' in locals() else {}, bp_4h=bp_4h if 'bp_4h' in locals() else {}, entry_action=entry_action, ) market_context["trigger_context"] = trigger_context + market_context["box_breakout_pullback_1h"] = bp_1h if 'bp_1h' in locals() else {} market_context["box_breakout_pullback_4h"] = bp_4h if 'bp_4h' in locals() else {} market_context["factor_score_breakdown"] = factor_score_breakdown market_context["onchain_context"] = onchain_context @@ -1738,6 +1884,7 @@ def confirm_burst(symbol, cand): "pa_1h": pa_1h, "pa_15min": pa_15min_result, "pa_1d": pa_1d, + "box_breakout_pullback_1h": bp_1h if 'bp_1h' in locals() else {}, "box_breakout_pullback_4h": bp_4h if 'bp_4h' in locals() else {}, "m30_aligned": m30_aligned, "entry_action": (entry_plan or {}).get("entry_action") or entry_action, @@ -1814,29 +1961,63 @@ def _should_publish_watch_candidate(cand, result): return not severe_risk and is_top_gainer and (has_current_trigger or score >= confirm_min_score()) -def _emit_output(output, compact: bool = False): - if compact: - print(json.dumps(output, ensure_ascii=False)) - else: - print(json.dumps(output, ensure_ascii=False, indent=2)) +def _result_brief(item: dict) -> dict: + ctx = item.get("market_context") or {} + trigger = item.get("trigger_context") or ctx.get("trigger_context") or {} + decision = item.get("decision_log") or ctx.get("decision_log") or {} + signal_text = " ".join(str(x) for x in (item.get("signals") or [])) + inferred_strategy = "" + if "1H箱体突破回踩" in signal_text: + inferred_strategy = "box_retest_1h_v1" + elif "4H箱体突破回踩" in signal_text: + inferred_strategy = "box_retest_4h_v1" + return { + "symbol": item.get("symbol"), + "confirmed": bool(item.get("confirmed")), + "score": item.get("score"), + "action": item.get("entry_action") or (item.get("entry_plan") or {}).get("entry_action") or "", + "strategy_code": item.get("strategy_code") or (item.get("strategy_snapshot") or {}).get("strategy_code") or inferred_strategy, + "rec_id": item.get("rec_id") or 0, + "published_watch": bool(item.get("published_watch")), + "trigger_status": trigger.get("trigger_status") or "", + "risk_flags": decision.get("risk_flags") or [], + "signals": list(item.get("signals") or [])[:6], + "state_update": item.get("state_update") or {}, + } -def main(compact: bool = False): +def _summarize_output(output: dict) -> dict: + return { + "status": output.get("status"), + "processed_count": output.get("processed_count", output.get("confirmed_count", 0) + output.get("unconfirmed_count", 0)), + "confirmed_count": output.get("confirmed_count", 0), + "unconfirmed_count": output.get("unconfirmed_count", 0), + "market_risk_downgraded_count": output.get("market_risk_downgraded_count", 0), + "stopped_reason": output.get("stopped_reason", ""), + "confirmed": [_result_brief(r) for r in output.get("confirmed", [])], + "unconfirmed": [_result_brief(r) for r in output.get("unconfirmed", [])], + "check_time": output.get("check_time"), + } + + +def _emit_output(output, compact: bool = False, verbose: bool = False): + payload = output if verbose else _summarize_output(output) + print(json.dumps(payload, ensure_ascii=False, separators=(",", ":") if compact else None, indent=None if compact else 2, default=str)) + + +def main(compact: bool = False, verbose: bool = False, limit: int | None = None, max_seconds: int | None = None): started_at = datetime.now() try: init_db() expire_old_states() regime_context = _current_market_regime_context() market_regime = regime_context.get("market_regime") or {} - if str(market_regime.get("risk_level") or "").strip().lower() == "critical" and float(market_regime.get("position_multiplier") or 0) <= 0: - downgrade_result = downgrade_active_entries_for_market_risk( - "全市场处于 critical 风险,暂停新开仓与新挂单,保留为观察机会", - event_time=datetime.now().isoformat(), - ) - else: - downgrade_result = {"updated_count": 0} + downgrade_result = {"updated_count": 0} - candidates = get_candidates_for_confirm() + confirm_cfg = _get_cfg_section("confirm") + candidate_limit = max(1, min(int(limit or confirm_cfg.get("max_candidates_per_run") or 8), 50)) + run_budget_seconds = max(5, int(max_seconds or confirm_cfg.get("max_run_seconds") or 90)) + candidates = get_candidates_for_confirm(limit=candidate_limit) if not candidates: output = { @@ -1845,11 +2026,16 @@ def main(compact: bool = False): "market_risk_downgraded_count": downgrade_result.get("updated_count", 0), "check_time": datetime.now().isoformat(), } - _emit_output(output, compact=compact) + _emit_output(output, compact=compact, verbose=verbose) return output results = [] + stopped_reason = "" for cand in candidates: + elapsed = (datetime.now() - started_at).total_seconds() + if elapsed >= run_budget_seconds: + stopped_reason = f"max_seconds_exceeded:{run_budget_seconds}" + break symbol = cand["symbol"] result = confirm_burst(symbol, cand) @@ -1990,12 +2176,15 @@ def main(compact: bool = False): "status": "confirmed" if confirmed else "unconfirmed", "confirmed_count": len(confirmed), "unconfirmed_count": len(unconfirmed), + "processed_count": len(results), + "candidate_limit": candidate_limit, "market_risk_downgraded_count": downgrade_result.get("updated_count", 0), + "stopped_reason": stopped_reason, "confirmed": confirmed, "unconfirmed": unconfirmed, "check_time": datetime.now().isoformat(), } - _emit_output(output, compact=compact) + _emit_output(output, compact=compact, verbose=verbose) return output except Exception as e: finished_at = datetime.now() diff --git a/app/strategies/box_retest_4h.py b/app/strategies/box_retest_4h.py index 013bcc6..109ad36 100644 --- a/app/strategies/box_retest_4h.py +++ b/app/strategies/box_retest_4h.py @@ -9,7 +9,7 @@ from __future__ import annotations from app.core.factor_roles import ENTRY, RISK, TRIGGER from app.core.strategy_contract import StrategySignal, current_strategy_version -from app.core.strategy_registry import BOX_RETEST_4H_STRATEGY +from app.core.strategy_registry import BOX_RETEST_1H_STRATEGY, BOX_RETEST_4H_STRATEGY def _safe_float(value, default=0.0) -> float: @@ -21,11 +21,26 @@ def _safe_float(value, default=0.0) -> float: return default -def build_box_retest_signal( +def _safe_int(value, default=999) -> int: + try: + if value is None or value == "": + return default + return int(value) + except Exception: + return default + + +def _build_box_retest_signal( *, symbol: str, current_price: float, detection: dict, + strategy_code: str, + timeframe_label: str, + factor_code: str, + factor_label: str, + max_fresh_age_bars: int, + max_chase_distance_pct: float, entry_plan: dict | None = None, market_regime: dict | None = None, decision_log: dict | None = None, @@ -38,28 +53,28 @@ def build_box_retest_signal( entry_zone = _safe_float(detection.get("entry_zone")) current_price = _safe_float(current_price) distance_pct = (current_price / entry_zone - 1) * 100 if entry_zone > 0 and current_price > 0 else 0 - age = int(detection.get("pullback_age_bars") or 999) + age = _safe_int(detection.get("pullback_age_bars")) quality = str(detection.get("quality") or "") status = "candidate" reasons = [] if risk_level == "critical": status = "observe" reasons.append("全局风险 critical,仅观察") - if age > 4: + if age > max_fresh_age_bars: status = "observe" - reasons.append(f"回踩已过去 {age} 根4H,时效偏旧") + reasons.append(f"回踩已过去 {age} 根{timeframe_label},时效偏旧") if quality not in {"良好", "优质"}: status = "observe" reasons.append(f"形态质量 {quality or '未知'},不直接交易") - if distance_pct > 8: + if distance_pct > max_chase_distance_pct: status = "observe" reasons.append(f"当前价离箱体上沿 {distance_pct:.1f}%,禁止追高") score = _safe_float(detection.get("score")) confidence = min(100.0, max(0.0, score * 8)) trigger = { - "factor_code": "box_breakout_pullback_4h", - "factor_label": "4H箱体突破回踩", + "factor_code": factor_code, + "factor_label": factor_label, "box_high": detection.get("box_high"), "box_low": detection.get("box_low"), "entry_zone": detection.get("entry_zone"), @@ -77,7 +92,7 @@ def build_box_retest_signal( "risk_reasons": reasons, } return StrategySignal( - strategy_code=BOX_RETEST_4H_STRATEGY, + strategy_code=strategy_code, strategy_version=current_strategy_version(), symbol=symbol, direction="long", @@ -86,7 +101,7 @@ def build_box_retest_signal( score=score, trigger=trigger, factor_roles={ - "box_breakout_pullback_4h": TRIGGER, + factor_code: TRIGGER, "pullback_15m_confirm": ENTRY, "trend_exhaustion": RISK, "false_breakout": RISK, @@ -99,3 +114,58 @@ def build_box_retest_signal( "reasons": reasons, }, ) + + +def build_box_retest_4h_signal( + *, + symbol: str, + current_price: float, + detection: dict, + entry_plan: dict | None = None, + market_regime: dict | None = None, + decision_log: dict | None = None, +) -> StrategySignal | None: + return _build_box_retest_signal( + symbol=symbol, + current_price=current_price, + detection=detection, + strategy_code=BOX_RETEST_4H_STRATEGY, + timeframe_label="4H", + factor_code="box_breakout_pullback_4h", + factor_label="4H箱体突破回踩", + max_fresh_age_bars=4, + max_chase_distance_pct=8, + entry_plan=entry_plan, + market_regime=market_regime, + decision_log=decision_log, + ) + + +def build_box_retest_1h_signal( + *, + symbol: str, + current_price: float, + detection: dict, + entry_plan: dict | None = None, + market_regime: dict | None = None, + decision_log: dict | None = None, +) -> StrategySignal | None: + return _build_box_retest_signal( + symbol=symbol, + current_price=current_price, + detection=detection, + strategy_code=BOX_RETEST_1H_STRATEGY, + timeframe_label="1H", + factor_code="box_breakout_pullback_1h", + factor_label="1H箱体突破回踩", + max_fresh_age_bars=6, + max_chase_distance_pct=6, + entry_plan=entry_plan, + market_regime=market_regime, + decision_log=decision_log, + ) + + +def build_box_retest_signal(**kwargs) -> StrategySignal | None: + """Backward-compatible alias for the original 4H strategy builder.""" + return build_box_retest_4h_signal(**kwargs) diff --git a/rules.yaml b/rules.yaml index 10b9949..ad9b0e1 100644 --- a/rules.yaml +++ b/rules.yaml @@ -146,6 +146,10 @@ screener: sector_only_max_state: 蓄力 confirm: min_score: 5 + max_candidates_per_run: 8 + max_run_seconds: 90 + http_timeout_seconds: 2.5 + kline_timeout_ms: 4500 state_cooldown_hours: 6 volume_breakout_ratio: 2.2 strong_resonance_bypass: diff --git a/tests/test_box_breakout_pullback_4h.py b/tests/test_box_breakout_pullback_4h.py index b5cff6b..5d62445 100644 --- a/tests/test_box_breakout_pullback_4h.py +++ b/tests/test_box_breakout_pullback_4h.py @@ -3,7 +3,7 @@ from datetime import datetime, timedelta import pandas as pd from app.core.signal_taxonomy import signal_code -from app.services.altcoin_confirm import detect_box_breakout_pullback_4h +from app.services.altcoin_confirm import detect_box_breakout_pullback_1h, detect_box_breakout_pullback_4h def _bar(ts, open_, high, low, close, volume): @@ -56,3 +56,24 @@ def test_detect_box_breakout_pullback_4h_rejects_failed_retest(): result = detect_box_breakout_pullback_4h(df) assert result["detected"] is False + + +def test_detect_box_breakout_pullback_1h_finds_early_retest(): + start = datetime(2026, 5, 20) + rows = [] + for i in range(44): + base = 1.0 + (i % 4) * 0.003 + rows.append(_bar(start + timedelta(hours=i), base, 1.035 + (i % 3) * 0.001, 0.985 - (i % 2) * 0.001, base + 0.002, 1000 + (i % 5) * 20)) + + rows.append(_bar(start + timedelta(hours=44), 1.032, 1.085, 1.03, 1.068, 2300)) + rows.append(_bar(start + timedelta(hours=45), 1.064, 1.073, 1.034, 1.055, 1700)) + for i in range(8): + close = 1.058 + i * 0.004 + rows.append(_bar(start + timedelta(hours=46 + i), close - 0.006, close + 0.012, close - 0.018, close, 1250 + i * 25)) + + result = detect_box_breakout_pullback_1h(pd.DataFrame(rows)) + + assert result["detected"] is True + assert result["score"] >= 7 + assert result["quality"] in {"良好", "优质"} + assert signal_code(result["signals"][0]) == "box_breakout_pullback_1h" diff --git a/tests/test_confirm_market_risk_gate.py b/tests/test_confirm_market_risk_gate.py index eb97414..e9809f2 100644 --- a/tests/test_confirm_market_risk_gate.py +++ b/tests/test_confirm_market_risk_gate.py @@ -1,7 +1,7 @@ from app.services.altcoin_confirm import _apply_market_risk_entry_gate -def test_market_risk_gate_blocks_executable_action_when_critical(): +def test_market_risk_gate_reduces_size_instead_of_blocking_when_critical(): signals = ["15min即刻入场信号"] plan, reason = _apply_market_risk_entry_gate( {"entry_action": "可即刻买入", "entry_price": 1.0}, @@ -9,9 +9,11 @@ def test_market_risk_gate_blocks_executable_action_when_critical(): {"risk_level": "critical", "position_multiplier": 0.0}, ) - assert plan["entry_action"] == "观察" - assert plan["market_risk_gate"]["blocked_action"] == "可即刻买入" - assert "暂停新开仓" in reason + assert plan["entry_action"] == "可即刻买入" + assert plan["market_risk_gate"]["blocked_action"] == "" + assert plan["market_risk_gate"]["final_action"] == "可即刻买入" + assert plan["market_risk_gate"]["position_multiplier"] == 0.25 + assert "仓位" in reason assert any("市场风控闸门" in sig for sig in signals) diff --git a/tests/test_market_regime.py b/tests/test_market_regime.py index b9487d8..ee0576b 100644 --- a/tests/test_market_regime.py +++ b/tests/test_market_regime.py @@ -19,7 +19,7 @@ def _overview(**overrides): return data -def test_market_regime_blocks_clear_risk_off(): +def test_market_regime_reduces_size_for_clear_risk_off(): result = classify_market_regime( _overview( benchmarks={"BTC/USDT": {"change_24h": -3.4}, "ETH/USDT": {"change_24h": -4.2}}, @@ -30,7 +30,7 @@ def test_market_regime_blocks_clear_risk_off(): assert result["regime"] == "risk_off" assert result["risk_level"] == "critical" - assert result["position_multiplier"] == 0 + assert result["position_multiplier"] == 0.25 def test_market_regime_detects_altcoin_rotation(): diff --git a/tests/test_multi_strategy_infra.py b/tests/test_multi_strategy_infra.py index 1058a92..209ab0c 100644 --- a/tests/test_multi_strategy_infra.py +++ b/tests/test_multi_strategy_infra.py @@ -1,13 +1,15 @@ from app.core.factor_roles import RISK, TRIGGER, factor_role, factor_roles_for_codes from app.core.strategy_contract import StrategySignal, default_main_composite_signal -from app.core.strategy_registry import BOX_RETEST_4H_STRATEGY, MAIN_COMPOSITE_STRATEGY, strategy_label +from app.core.strategy_registry import BOX_RETEST_1H_STRATEGY, BOX_RETEST_4H_STRATEGY, MAIN_COMPOSITE_STRATEGY, strategy_label from app.db.recommendation_commands import create_recommendation from app.db.strategy_signal_queries import insert_strategy_signal from app.db.paper_trading import _open_trade, _order_payload_from_rec +from app.strategies.box_retest_4h import build_box_retest_1h_signal def test_factor_roles_never_promote_unknown_to_trigger(): assert factor_role("box_breakout_pullback_4h") == TRIGGER + assert factor_role("box_breakout_pullback_1h") == TRIGGER assert factor_role("false_breakout") == RISK assert factor_role("new_unknown_factor") == "unknown" assert factor_roles_for_codes(["box_breakout_pullback_4h", "new_unknown_factor"]) == { @@ -27,6 +29,7 @@ def test_default_main_composite_strategy_signal_is_stable(): assert signal["strategy_code"] == MAIN_COMPOSITE_STRATEGY assert signal["strategy_name"] == "综合确认主链路" assert signal["factor_roles"]["vp_fly_1h_current"] == "trigger" + assert strategy_label(BOX_RETEST_1H_STRATEGY) == "1H箱体突破回踩" def test_strategy_signal_insert_and_recommendation_lineage(pg_conn): @@ -63,6 +66,27 @@ def test_strategy_signal_insert_and_recommendation_lineage(pg_conn): assert "box_breakout_pullback_4h" in row["factor_roles_json"] +def test_box_retest_strategy_preserves_zero_age_as_fresh(): + signal = build_box_retest_1h_signal( + symbol="FRESH/USDT", + current_price=1.01, + detection={ + "detected": True, + "score": 10, + "entry_zone": 1.0, + "stop_level": 0.94, + "quality": "优质", + "pullback_age_bars": 0, + }, + market_regime={"regime": "altcoin_rotation", "risk_level": "medium"}, + ) + payload = signal.to_json_dict() + + assert payload["status"] == "candidate" + assert payload["trigger"]["pullback_age_bars"] == 0 + assert payload["risk_plan"]["risk_reasons"] == [] + + def test_paper_order_and_trade_inherit_strategy_lineage(pg_conn): rec = { "id": 1, diff --git a/tests/test_paper_trading.py b/tests/test_paper_trading.py index 3f59943..9d3e6c1 100644 --- a/tests/test_paper_trading.py +++ b/tests/test_paper_trading.py @@ -527,9 +527,10 @@ def test_buy_now_rejects_when_cumulative_leverage_exceeded(monkeypatch): assert list_paper_trades(status="open")["total"] == 1 -def test_buy_now_rejects_when_global_market_risk_is_critical(monkeypatch): +def test_buy_now_uses_reduced_size_when_global_market_risk_is_critical(monkeypatch): monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1") monkeypatch.setenv("ALPHAX_PAPER_GLOBAL_RISK_GATE_ENABLED", "1") + monkeypatch.setenv("ALPHAX_PAPER_GLOBAL_RISK_BLOCK_CRITICAL", "0") monkeypatch.setenv("ALPHAX_PAPER_ENTRY_GATE_ENABLED", "0") monkeypatch.setenv("ALPHAX_PAPER_MAX_ACCOUNT_DRAWDOWN_PAUSE_PCT", "0") monkeypatch.setattr( @@ -559,10 +560,11 @@ def test_buy_now_rejects_when_global_market_risk_is_critical(monkeypatch): result = sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00") - assert result["reason"] == "global_risk_rejected" - assert result["risk_detail"]["risk_level"] == "critical" - assert result["risk_detail"]["market_regime"]["regime"] == "risk_off" - assert list_paper_trades()["total"] == 0 + assert result["opened"] is True + assert result["global_risk"]["risk_level"] == "critical" + assert result["global_risk"]["decision"] == "allow_reduced_size" + assert result["notional_usdt"] == pytest.approx(1250.0) + assert list_paper_trades()["total"] == 1 def test_open_event_records_market_regime_and_score_components(monkeypatch):