This commit is contained in:
aaron 2026-06-07 21:39:11 +08:00
parent 076a15da35
commit ffd6f73427
5 changed files with 151 additions and 7 deletions

View File

@ -111,6 +111,8 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组
- 同一个 `symbol` 可以同时存在多头机会和空头机会。推荐写入和列表去重必须按 `symbol + direction/side` 处理,不能只按币种去重,否则会互相覆盖。 - 同一个 `symbol` 可以同时存在多头机会和空头机会。推荐写入和列表去重必须按 `symbol + direction/side` 处理,不能只按币种去重,否则会互相覆盖。
- 策略级配置入口在 `app/core/strategy_registry.py``StrategyDefinition.entry_gate_config` 控制确认/跟踪/展示派生的买点质量闸门,`StrategyDefinition.paper_config` 控制该策略进入 paper trading 的入场、挂单和动态杠杆门槛。 - 策略级配置入口在 `app/core/strategy_registry.py``StrategyDefinition.entry_gate_config` 控制确认/跟踪/展示派生的买点质量闸门,`StrategyDefinition.paper_config` 控制该策略进入 paper trading 的入场、挂单和动态杠杆门槛。
- 新增策略时必须注册稳定 `strategy_code`,并明确自己的 `entry_gate_config` / `paper_config`。不要把新策略的特殊门槛写进全局 `paper_trading_config()``DEFAULT_ENTRY_GATE`,否则会影响其他策略的信号生成和成交样本。 - 新增策略时必须注册稳定 `strategy_code`,并明确自己的 `entry_gate_config` / `paper_config`。不要把新策略的特殊门槛写进全局 `paper_trading_config()``DEFAULT_ENTRY_GATE`,否则会影响其他策略的信号生成和成交样本。
- paper trading 的策略级门槛优先级高于全局默认:`entry_min_rec_score`、`entry_min_rr`、`order_min_rec_score`、`order_min_rr` 这类门槛应由策略自己的 `paper_config` 决定。`ALPHAX_PAPER_*` 环境变量只作为无策略来源推荐的默认值,或在明确设置 `ALPHAX_PAPER_FORCE_GLOBAL_GATE_OVERRIDES=1` 时作为紧急全局覆盖;不要默认用全局分数/RR 门槛压平所有策略。
- `paper_gate_reject` 事件必须去重/节流。相同 `recommendation_id + strategy_code + reason + gate_reasons` 在冷却窗口内只应记录一次;运行大屏和复盘读模型也应按这个稳定 key 聚合,避免同一弱信号每轮重复刷屏污染策略评价。
- `apply_entry_quality_gate()` 必须传入或从 `entry_plan.strategy_code` 派生策略身份;`paper_trader.py` 中开仓、挂单、挂单成交和挂单维护应通过策略级配置合并后的参数执行。 - `apply_entry_quality_gate()` 必须传入或从 `entry_plan.strategy_code` 派生策略身份;`paper_trader.py` 中开仓、挂单、挂单成交和挂单维护应通过策略级配置合并后的参数执行。
- `app/core/factor_scoring.py` 是确认层因子评分中心。新增确认加减分不要继续散落写死 `score += N`,应优先通过 `FactorScorer.delta(factor_code, base_delta, evidence=...)` 计算。 - `app/core/factor_scoring.py` 是确认层因子评分中心。新增确认加减分不要继续散落写死 `score += N`,应优先通过 `FactorScorer.delta(factor_code, base_delta, evidence=...)` 计算。
- 稳定因子代码来自 `app/core/signal_taxonomy.py`,例如 `vp_fly_1h_current`、`volume_consecutive_1h`、`ignition_d1_current`、`sector_rotation`、`sentiment_resonance`、`top_trader_long`、`risk_reward_bad`。 - 稳定因子代码来自 `app/core/signal_taxonomy.py`,例如 `vp_fly_1h_current`、`volume_consecutive_1h`、`ignition_d1_current`、`sector_rotation`、`sentiment_resonance`、`top_trader_long`、`risk_reward_bad`。

View File

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from app.db.schema import get_conn from app.db.schema import get_conn
@ -317,24 +318,44 @@ def _build_trading(conn, since: str) -> dict:
def _build_timeline(conn, base_events: list[dict], since: str) -> list[dict]: def _build_timeline(conn, base_events: list[dict], since: str) -> list[dict]:
timeline = list(base_events) timeline = list(base_events)
seen_trade_events = set()
for row in _fetchall( for row in _fetchall(
conn, conn,
""" """
SELECT event_time AS time, event_type, symbol, message SELECT event_time AS time, event_type, symbol, message,
recommendation_id, strategy_code, detail_json
FROM paper_trade_events FROM paper_trade_events
WHERE event_time >= %s WHERE event_time >= %s
ORDER BY event_time DESC, id DESC ORDER BY event_time DESC, id DESC
LIMIT 12 LIMIT 80
""", """,
(since,), (since,),
): ):
detail = {}
try:
raw_detail = row.get("detail_json")
detail = json.loads(raw_detail) if isinstance(raw_detail, str) and raw_detail.strip() else (raw_detail or {})
except Exception:
detail = {}
if row.get("event_type") == "paper_gate_reject":
reject_key = (
row.get("recommendation_id") or detail.get("recommendation_id") or detail.get("rec_id") or "",
row.get("strategy_code") or detail.get("strategy_code") or "",
detail.get("reason") or "",
tuple(detail.get("gate_reasons") if isinstance(detail.get("gate_reasons"), list) else []),
)
if reject_key in seen_trade_events:
continue
seen_trade_events.add(reject_key)
timeline.append({ timeline.append({
"time": _iso(row.get("time")), "time": _iso(row.get("time")),
"type": "trade", "type": "trade",
"title": f"{row.get('symbol') or '--'} · {row.get('event_type') or '交易事件'}", "title": f"{row.get('symbol') or '--'} · {row.get('event_type') or '交易事件'}",
"status": "ok", "status": "warn" if row.get("event_type") == "paper_gate_reject" else "ok",
"detail": _display_error_summary(row.get("message") or "", source="paper_trade"), "detail": _display_error_summary(row.get("message") or "", source="paper_trade"),
}) })
if len(seen_trade_events) >= 12:
break
for row in _fetchall( for row in _fetchall(
conn, conn,
""" """

View File

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import hashlib
import json import json
import os import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
@ -317,9 +318,15 @@ def _strategy_code_from_rec(rec: dict) -> str:
def _paper_cfg_for_rec(rec: dict, config: dict | None = None) -> dict: def _paper_cfg_for_rec(rec: dict, config: dict | None = None) -> dict:
cfg = dict(_paper_cfg(config) or {}) cfg = dict(_paper_cfg(config) or {})
cfg.update(strategy_paper_config(_strategy_code_from_rec(rec))) raw_strategy_code = _raw_strategy_code_from_rec(rec)
cfg.update(strategy_paper_config(normalize_strategy_code(raw_strategy_code)))
# Strategy configs tune the intraday defaults, but explicit environment # Strategy configs tune the intraday defaults, but explicit environment
# overrides remain the emergency/manual control surface for local and prod. # overrides remain the legacy/manual default only when a recommendation has
# no explicit strategy identity. Strategy-owned signals must keep their own
# paper gates so multi-strategy evaluation is not flattened by global envs.
allow_global_gate_override = not bool(raw_strategy_code) or str(
os.getenv("ALPHAX_PAPER_FORCE_GLOBAL_GATE_OVERRIDES") or ""
).strip().lower() in {"1", "true", "yes", "on"}
env_float_keys = { env_float_keys = {
"ALPHAX_PAPER_ENTRY_MIN_REC_SCORE": "entry_min_rec_score", "ALPHAX_PAPER_ENTRY_MIN_REC_SCORE": "entry_min_rec_score",
"ALPHAX_PAPER_ENTRY_MIN_RR": "entry_min_rr", "ALPHAX_PAPER_ENTRY_MIN_RR": "entry_min_rr",
@ -334,7 +341,13 @@ def _paper_cfg_for_rec(rec: dict, config: dict | None = None) -> dict:
"ALPHAX_PAPER_DYNAMIC_LEVERAGE_MIN": "dynamic_leverage_min", "ALPHAX_PAPER_DYNAMIC_LEVERAGE_MIN": "dynamic_leverage_min",
} }
for env_name, key in env_float_keys.items(): for env_name, key in env_float_keys.items():
if os.getenv(env_name) is not None: is_gate_threshold = key in {
"entry_min_rec_score",
"entry_min_rr",
"order_min_rec_score",
"order_min_rr",
}
if os.getenv(env_name) is not None and (allow_global_gate_override or not is_gate_threshold):
cfg[key] = _safe_float(os.getenv(env_name), cfg.get(key)) cfg[key] = _safe_float(os.getenv(env_name), cfg.get(key))
if os.getenv("ALPHAX_PAPER_ORDER_REQUIRE_CURRENT_TRIGGER") is not None: if os.getenv("ALPHAX_PAPER_ORDER_REQUIRE_CURRENT_TRIGGER") is not None:
cfg["order_require_current_trigger"] = str(os.getenv("ALPHAX_PAPER_ORDER_REQUIRE_CURRENT_TRIGGER") or "").strip().lower() in {"1", "true", "yes", "on"} cfg["order_require_current_trigger"] = str(os.getenv("ALPHAX_PAPER_ORDER_REQUIRE_CURRENT_TRIGGER") or "").strip().lower() in {"1", "true", "yes", "on"}
@ -572,6 +585,35 @@ def _same_gate_reject(a: dict, b: dict) -> bool:
) )
def _gate_reject_lock_id(rec_id: int, event_type: str, strategy_code: str, detail: dict) -> int:
gate_reasons = detail.get("gate_reasons") if isinstance(detail.get("gate_reasons"), list) else []
raw = json.dumps(
{
"rec_id": rec_id,
"event_type": event_type,
"strategy_code": strategy_code,
"reason": detail.get("reason") or "",
"gate_reasons": gate_reasons,
},
ensure_ascii=False,
sort_keys=True,
default=str,
)
digest = hashlib.sha256(raw.encode("utf-8")).digest()
return int.from_bytes(digest[:8], "big", signed=False) & 0x7FFFFFFFFFFFFFFF
def _lock_gate_reject_event(conn, rec_id: int, event_type: str, strategy_code: str, detail: dict) -> None:
if rec_id <= 0 or event_type != "paper_gate_reject":
return
try:
conn.execute("SELECT pg_advisory_xact_lock(%s)", (_gate_reject_lock_id(rec_id, event_type, strategy_code, detail),))
except Exception:
# Tests and future adapters should not fail merely because the current
# connection does not support PostgreSQL advisory locks.
return
def _recent_recommendation_event_exists(conn, rec_id: int, event_type: str, strategy_code: str, detail: dict, minutes: int = 30, event_time: str = "") -> bool: def _recent_recommendation_event_exists(conn, rec_id: int, event_type: str, strategy_code: str, detail: dict, minutes: int = 30, event_time: str = "") -> bool:
if rec_id <= 0 or event_type != "paper_gate_reject": if rec_id <= 0 or event_type != "paper_gate_reject":
return False return False
@ -591,7 +633,7 @@ def _recent_recommendation_event_exists(conn, rec_id: int, event_type: str, stra
AND COALESCE(strategy_code,'')=%s AND COALESCE(strategy_code,'')=%s
AND event_time::timestamp >= %s::timestamp AND event_time::timestamp >= %s::timestamp
ORDER BY event_time DESC ORDER BY event_time DESC
LIMIT 20 LIMIT 100
""", """,
(rec_id, event_type, strategy_code, cutoff), (rec_id, event_type, strategy_code, cutoff),
).fetchall() ).fetchall()
@ -617,6 +659,7 @@ def _record_recommendation_event(conn, rec: dict, event_type: str, message: str,
"action_status": rec.get("action_status") or plan.get("entry_action") or "", "action_status": rec.get("action_status") or plan.get("entry_action") or "",
"entry_plan": plan, "entry_plan": plan,
} }
_lock_gate_reject_event(conn, _safe_int(rec.get("id")), event_type, event_detail["strategy_code"], event_detail)
if _recent_recommendation_event_exists( if _recent_recommendation_event_exists(
conn, conn,
_safe_int(rec.get("id")), _safe_int(rec.get("id")),

View File

@ -1,3 +1,5 @@
import json
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from app.db import auth_db from app.db import auth_db
@ -80,6 +82,36 @@ def test_operations_dashboard_hides_retired_onchain_runtime_data(pg_conn):
assert all("onchain" not in str(x.get("title") or "").lower() for x in data["timeline"]) assert all("onchain" not in str(x.get("title") or "").lower() for x in data["timeline"])
def test_operations_dashboard_dedupes_repeated_gate_reject_timeline(pg_conn):
detail = {
"reason": "paper_order_gate_rejected",
"gate_reasons": ["rec_score_below_min"],
"strategy_code": "short_breakdown_retest_1h_v1",
}
for _ in range(3):
pg_conn.execute(
"""
INSERT INTO paper_trade_events (
trade_id, recommendation_id, symbol, event_type, event_time,
message, detail_json, strategy_code
)
VALUES (0, 9001, 'ONDO/USDT', 'paper_gate_reject', NOW(),
'日内策略挂单门禁拒绝', %s, 'short_breakdown_retest_1h_v1')
""",
(json.dumps(detail),),
)
pg_conn.commit()
data = get_operations_dashboard(hours=24)
gate_rows = [
x for x in data["timeline"]
if "ONDO/USDT" in str(x.get("title") or "") and "paper_gate_reject" in str(x.get("title") or "")
]
assert len(gate_rows) == 1
assert gate_rows[0]["status"] == "warn"
def test_operations_dashboard_sanitizes_external_provider_errors(): def test_operations_dashboard_sanitizes_external_provider_errors():
summary = _display_error_summary( summary = _display_error_summary(
"market:HTTPSConnectionPool(host='api.binance.com', port=443): " "market:HTTPSConnectionPool(host='api.binance.com', port=443): "

View File

@ -5,6 +5,8 @@ import pytest
from app.db import altcoin_db from app.db import altcoin_db
from app.db.paper_trading import ( from app.db.paper_trading import (
_gate_reject_lock_id,
_paper_order_gate,
delete_paper_order, delete_paper_order,
delete_paper_trade, delete_paper_trade,
get_paper_trading_performance, get_paper_trading_performance,
@ -338,6 +340,50 @@ def test_repeated_paper_gate_reject_is_deduped(monkeypatch):
assert events[0]["symbol"] == "DEDUP/USDT" assert events[0]["symbol"] == "DEDUP/USDT"
def test_strategy_order_gate_threshold_wins_over_global_env(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_ORDER_MIN_REC_SCORE", "25")
rec = {
"id": 0,
"symbol": "SHORTOK/USDT",
"strategy_code": SHORT_BREAKDOWN_RETEST_STRATEGY,
"rec_score": 18,
"execution_status": "wait_pullback",
"entry_plan": {
"strategy_code": SHORT_BREAKDOWN_RETEST_STRATEGY,
"entry_action": "等回踩",
"side": "short",
"entry_price": 100,
"stop_loss": 110,
"tp1": 85,
"risk_reward_ok": True,
"rr1": 1.5,
},
}
ok, reasons, detail = _paper_order_gate(rec, 100)
assert ok is True
assert "rec_score_below_min" not in reasons
assert detail["min_rec_score"] == 18
def test_gate_reject_lock_id_uses_stable_reject_identity():
base = {
"reason": "paper_order_gate_rejected",
"gate_reasons": ["rec_score_below_min"],
"action_status": "等回踩",
"execution_status": "wait_pullback",
}
changed_runtime = {**base, "action_status": "等待回踩", "execution_status": "observe"}
assert _gate_reject_lock_id(1, "paper_gate_reject", SHORT_BREAKDOWN_RETEST_STRATEGY, base) == _gate_reject_lock_id(
1,
"paper_gate_reject",
SHORT_BREAKDOWN_RETEST_STRATEGY,
changed_runtime,
)
def test_wait_pullback_requires_confirmed_risk_reward(monkeypatch): def test_wait_pullback_requires_confirmed_risk_reward(monkeypatch):
monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1") monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1")
altcoin_db.init_db() altcoin_db.init_db()