From ffd6f73427e46954f54d6fc324bf5772e9003f5d Mon Sep 17 00:00:00 2001 From: aaron <> Date: Sun, 7 Jun 2026 21:39:11 +0800 Subject: [PATCH] 1 --- AGENTS.md | 2 ++ app/db/operations_dashboard.py | 27 ++++++++++++++-- app/db/paper_trading.py | 51 +++++++++++++++++++++++++++--- tests/test_operations_dashboard.py | 32 +++++++++++++++++++ tests/test_paper_trading.py | 46 +++++++++++++++++++++++++++ 5 files changed, 151 insertions(+), 7 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 677cb83..2027e00 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -111,6 +111,8 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - 同一个 `symbol` 可以同时存在多头机会和空头机会。推荐写入和列表去重必须按 `symbol + direction/side` 处理,不能只按币种去重,否则会互相覆盖。 - 策略级配置入口在 `app/core/strategy_registry.py`,`StrategyDefinition.entry_gate_config` 控制确认/跟踪/展示派生的买点质量闸门,`StrategyDefinition.paper_config` 控制该策略进入 paper trading 的入场、挂单和动态杠杆门槛。 - 新增策略时必须注册稳定 `strategy_code`,并明确自己的 `entry_gate_config` / `paper_config`。不要把新策略的特殊门槛写进全局 `paper_trading_config()` 或 `DEFAULT_ENTRY_GATE`,否则会影响其他策略的信号生成和成交样本。 +- paper trading 的策略级门槛优先级高于全局默认:`entry_min_rec_score`、`entry_min_rr`、`order_min_rec_score`、`order_min_rr` 这类门槛应由策略自己的 `paper_config` 决定。`ALPHAX_PAPER_*` 环境变量只作为无策略来源推荐的默认值,或在明确设置 `ALPHAX_PAPER_FORCE_GLOBAL_GATE_OVERRIDES=1` 时作为紧急全局覆盖;不要默认用全局分数/RR 门槛压平所有策略。 +- `paper_gate_reject` 事件必须去重/节流。相同 `recommendation_id + strategy_code + reason + gate_reasons` 在冷却窗口内只应记录一次;运行大屏和复盘读模型也应按这个稳定 key 聚合,避免同一弱信号每轮重复刷屏污染策略评价。 - `apply_entry_quality_gate()` 必须传入或从 `entry_plan.strategy_code` 派生策略身份;`paper_trader.py` 中开仓、挂单、挂单成交和挂单维护应通过策略级配置合并后的参数执行。 - `app/core/factor_scoring.py` 是确认层因子评分中心。新增确认加减分不要继续散落写死 `score += N`,应优先通过 `FactorScorer.delta(factor_code, base_delta, evidence=...)` 计算。 - 稳定因子代码来自 `app/core/signal_taxonomy.py`,例如 `vp_fly_1h_current`、`volume_consecutive_1h`、`ignition_d1_current`、`sector_rotation`、`sentiment_resonance`、`top_trader_long`、`risk_reward_bad`。 diff --git a/app/db/operations_dashboard.py b/app/db/operations_dashboard.py index a441b87..7e8fc55 100644 --- a/app/db/operations_dashboard.py +++ b/app/db/operations_dashboard.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json from datetime import datetime, timedelta from app.db.schema import get_conn @@ -317,24 +318,44 @@ def _build_trading(conn, since: str) -> dict: def _build_timeline(conn, base_events: list[dict], since: str) -> list[dict]: timeline = list(base_events) + seen_trade_events = set() for row in _fetchall( conn, """ - SELECT event_time AS time, event_type, symbol, message + SELECT event_time AS time, event_type, symbol, message, + recommendation_id, strategy_code, detail_json FROM paper_trade_events WHERE event_time >= %s ORDER BY event_time DESC, id DESC - LIMIT 12 + LIMIT 80 """, (since,), ): + detail = {} + try: + raw_detail = row.get("detail_json") + detail = json.loads(raw_detail) if isinstance(raw_detail, str) and raw_detail.strip() else (raw_detail or {}) + except Exception: + detail = {} + if row.get("event_type") == "paper_gate_reject": + reject_key = ( + row.get("recommendation_id") or detail.get("recommendation_id") or detail.get("rec_id") or "", + row.get("strategy_code") or detail.get("strategy_code") or "", + detail.get("reason") or "", + tuple(detail.get("gate_reasons") if isinstance(detail.get("gate_reasons"), list) else []), + ) + if reject_key in seen_trade_events: + continue + seen_trade_events.add(reject_key) timeline.append({ "time": _iso(row.get("time")), "type": "trade", "title": f"{row.get('symbol') or '--'} · {row.get('event_type') or '交易事件'}", - "status": "ok", + "status": "warn" if row.get("event_type") == "paper_gate_reject" else "ok", "detail": _display_error_summary(row.get("message") or "", source="paper_trade"), }) + if len(seen_trade_events) >= 12: + break for row in _fetchall( conn, """ diff --git a/app/db/paper_trading.py b/app/db/paper_trading.py index 82f48fb..92ad33c 100644 --- a/app/db/paper_trading.py +++ b/app/db/paper_trading.py @@ -2,6 +2,7 @@ from __future__ import annotations +import hashlib import json import os from datetime import datetime, timedelta @@ -317,9 +318,15 @@ def _strategy_code_from_rec(rec: dict) -> str: def _paper_cfg_for_rec(rec: dict, config: dict | None = None) -> dict: cfg = dict(_paper_cfg(config) or {}) - cfg.update(strategy_paper_config(_strategy_code_from_rec(rec))) + raw_strategy_code = _raw_strategy_code_from_rec(rec) + cfg.update(strategy_paper_config(normalize_strategy_code(raw_strategy_code))) # Strategy configs tune the intraday defaults, but explicit environment - # overrides remain the emergency/manual control surface for local and prod. + # overrides remain the legacy/manual default only when a recommendation has + # no explicit strategy identity. Strategy-owned signals must keep their own + # paper gates so multi-strategy evaluation is not flattened by global envs. + allow_global_gate_override = not bool(raw_strategy_code) or str( + os.getenv("ALPHAX_PAPER_FORCE_GLOBAL_GATE_OVERRIDES") or "" + ).strip().lower() in {"1", "true", "yes", "on"} env_float_keys = { "ALPHAX_PAPER_ENTRY_MIN_REC_SCORE": "entry_min_rec_score", "ALPHAX_PAPER_ENTRY_MIN_RR": "entry_min_rr", @@ -334,7 +341,13 @@ def _paper_cfg_for_rec(rec: dict, config: dict | None = None) -> dict: "ALPHAX_PAPER_DYNAMIC_LEVERAGE_MIN": "dynamic_leverage_min", } for env_name, key in env_float_keys.items(): - if os.getenv(env_name) is not None: + is_gate_threshold = key in { + "entry_min_rec_score", + "entry_min_rr", + "order_min_rec_score", + "order_min_rr", + } + if os.getenv(env_name) is not None and (allow_global_gate_override or not is_gate_threshold): cfg[key] = _safe_float(os.getenv(env_name), cfg.get(key)) if os.getenv("ALPHAX_PAPER_ORDER_REQUIRE_CURRENT_TRIGGER") is not None: cfg["order_require_current_trigger"] = str(os.getenv("ALPHAX_PAPER_ORDER_REQUIRE_CURRENT_TRIGGER") or "").strip().lower() in {"1", "true", "yes", "on"} @@ -572,6 +585,35 @@ def _same_gate_reject(a: dict, b: dict) -> bool: ) +def _gate_reject_lock_id(rec_id: int, event_type: str, strategy_code: str, detail: dict) -> int: + gate_reasons = detail.get("gate_reasons") if isinstance(detail.get("gate_reasons"), list) else [] + raw = json.dumps( + { + "rec_id": rec_id, + "event_type": event_type, + "strategy_code": strategy_code, + "reason": detail.get("reason") or "", + "gate_reasons": gate_reasons, + }, + ensure_ascii=False, + sort_keys=True, + default=str, + ) + digest = hashlib.sha256(raw.encode("utf-8")).digest() + return int.from_bytes(digest[:8], "big", signed=False) & 0x7FFFFFFFFFFFFFFF + + +def _lock_gate_reject_event(conn, rec_id: int, event_type: str, strategy_code: str, detail: dict) -> None: + if rec_id <= 0 or event_type != "paper_gate_reject": + return + try: + conn.execute("SELECT pg_advisory_xact_lock(%s)", (_gate_reject_lock_id(rec_id, event_type, strategy_code, detail),)) + except Exception: + # Tests and future adapters should not fail merely because the current + # connection does not support PostgreSQL advisory locks. + return + + def _recent_recommendation_event_exists(conn, rec_id: int, event_type: str, strategy_code: str, detail: dict, minutes: int = 30, event_time: str = "") -> bool: if rec_id <= 0 or event_type != "paper_gate_reject": return False @@ -591,7 +633,7 @@ def _recent_recommendation_event_exists(conn, rec_id: int, event_type: str, stra AND COALESCE(strategy_code,'')=%s AND event_time::timestamp >= %s::timestamp ORDER BY event_time DESC - LIMIT 20 + LIMIT 100 """, (rec_id, event_type, strategy_code, cutoff), ).fetchall() @@ -617,6 +659,7 @@ def _record_recommendation_event(conn, rec: dict, event_type: str, message: str, "action_status": rec.get("action_status") or plan.get("entry_action") or "", "entry_plan": plan, } + _lock_gate_reject_event(conn, _safe_int(rec.get("id")), event_type, event_detail["strategy_code"], event_detail) if _recent_recommendation_event_exists( conn, _safe_int(rec.get("id")), diff --git a/tests/test_operations_dashboard.py b/tests/test_operations_dashboard.py index 1a64ba5..b25c1aa 100644 --- a/tests/test_operations_dashboard.py +++ b/tests/test_operations_dashboard.py @@ -1,3 +1,5 @@ +import json + from fastapi.testclient import TestClient from app.db import auth_db @@ -80,6 +82,36 @@ def test_operations_dashboard_hides_retired_onchain_runtime_data(pg_conn): assert all("onchain" not in str(x.get("title") or "").lower() for x in data["timeline"]) +def test_operations_dashboard_dedupes_repeated_gate_reject_timeline(pg_conn): + detail = { + "reason": "paper_order_gate_rejected", + "gate_reasons": ["rec_score_below_min"], + "strategy_code": "short_breakdown_retest_1h_v1", + } + for _ in range(3): + pg_conn.execute( + """ + INSERT INTO paper_trade_events ( + trade_id, recommendation_id, symbol, event_type, event_time, + message, detail_json, strategy_code + ) + VALUES (0, 9001, 'ONDO/USDT', 'paper_gate_reject', NOW(), + '日内策略挂单门禁拒绝', %s, 'short_breakdown_retest_1h_v1') + """, + (json.dumps(detail),), + ) + pg_conn.commit() + + data = get_operations_dashboard(hours=24) + gate_rows = [ + x for x in data["timeline"] + if "ONDO/USDT" in str(x.get("title") or "") and "paper_gate_reject" in str(x.get("title") or "") + ] + + assert len(gate_rows) == 1 + assert gate_rows[0]["status"] == "warn" + + def test_operations_dashboard_sanitizes_external_provider_errors(): summary = _display_error_summary( "market:HTTPSConnectionPool(host='api.binance.com', port=443): " diff --git a/tests/test_paper_trading.py b/tests/test_paper_trading.py index 5f9b077..9821ffc 100644 --- a/tests/test_paper_trading.py +++ b/tests/test_paper_trading.py @@ -5,6 +5,8 @@ import pytest from app.db import altcoin_db from app.db.paper_trading import ( + _gate_reject_lock_id, + _paper_order_gate, delete_paper_order, delete_paper_trade, get_paper_trading_performance, @@ -338,6 +340,50 @@ def test_repeated_paper_gate_reject_is_deduped(monkeypatch): assert events[0]["symbol"] == "DEDUP/USDT" +def test_strategy_order_gate_threshold_wins_over_global_env(monkeypatch): + monkeypatch.setenv("ALPHAX_PAPER_ORDER_MIN_REC_SCORE", "25") + rec = { + "id": 0, + "symbol": "SHORTOK/USDT", + "strategy_code": SHORT_BREAKDOWN_RETEST_STRATEGY, + "rec_score": 18, + "execution_status": "wait_pullback", + "entry_plan": { + "strategy_code": SHORT_BREAKDOWN_RETEST_STRATEGY, + "entry_action": "等回踩", + "side": "short", + "entry_price": 100, + "stop_loss": 110, + "tp1": 85, + "risk_reward_ok": True, + "rr1": 1.5, + }, + } + + ok, reasons, detail = _paper_order_gate(rec, 100) + + assert ok is True + assert "rec_score_below_min" not in reasons + assert detail["min_rec_score"] == 18 + + +def test_gate_reject_lock_id_uses_stable_reject_identity(): + base = { + "reason": "paper_order_gate_rejected", + "gate_reasons": ["rec_score_below_min"], + "action_status": "等回踩", + "execution_status": "wait_pullback", + } + changed_runtime = {**base, "action_status": "等待回踩", "execution_status": "observe"} + + assert _gate_reject_lock_id(1, "paper_gate_reject", SHORT_BREAKDOWN_RETEST_STRATEGY, base) == _gate_reject_lock_id( + 1, + "paper_gate_reject", + SHORT_BREAKDOWN_RETEST_STRATEGY, + changed_runtime, + ) + + def test_wait_pullback_requires_confirmed_risk_reward(monkeypatch): monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1") altcoin_db.init_db()