This commit is contained in:
aaron 2026-06-04 23:33:21 +08:00
parent 15900b9f53
commit 82b61bd808
20 changed files with 783 additions and 463 deletions

View File

@ -58,6 +58,11 @@ def build_parser():
repair_strategy.add_argument("--limit", type=int, default=500, help="最多扫描的 recommendation 数量")
repair_strategy.add_argument("--dry-run", action="store_true", help="只预览不写库")
reset_strategy = subparsers.add_parser("reset-strategy-samples", help="清理已下线策略的历史样本")
reset_strategy.add_argument("--scope", choices=["legacy-strategies"], required=True)
reset_strategy.add_argument("--confirm", required=True, help="必须传 CLEAN_LEGACY_STRATEGY_SAMPLES")
reset_strategy.add_argument("--no-backup", action="store_true", help="跳过 pg_dump 备份,仅用于测试")
return parser
@ -132,6 +137,12 @@ def main():
result = repair_strategy_direction_mismatches(limit=args.limit, dry_run=args.dry_run)
print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2))
return result
if args.command == "reset-strategy-samples":
from app.db.strategy_sample_cleanup import cleanup_legacy_strategy_samples
result = cleanup_legacy_strategy_samples(confirm=args.confirm, create_backup=not args.no_backup)
print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2))
return result
parser.error(f"unknown command: {args.command}")

View File

@ -5,13 +5,19 @@ from __future__ import annotations
from dataclasses import dataclass, field
MAIN_COMPOSITE_STRATEGY = "main_composite_v1"
BOX_RETEST_1H_STRATEGY = "box_retest_1h_v1"
BOX_RETEST_4H_STRATEGY = "box_retest_4h_v1"
VOLUME_IGNITION_1H_STRATEGY = "volume_ignition_1h_v1"
COMPRESSION_BREAKOUT_4H_STRATEGY = "compression_breakout_4h_v1"
INTRADAY_MOMENTUM_15M_STRATEGY = "intraday_momentum_15m_v1"
BREAKDOWN_RETEST_SHORT_1H_STRATEGY = "breakdown_retest_short_1h_v1"
LONG_MOMENTUM_BREAKOUT_STRATEGY = "long_momentum_breakout_15m_1h_v1"
LONG_SECOND_WAVE_PULLBACK_STRATEGY = "long_second_wave_pullback_1h_v1"
SHORT_BREAKDOWN_RETEST_STRATEGY = "short_breakdown_retest_1h_v1"
# Compatibility aliases for old imports. These aliases intentionally map old
# names to the new active strategy pool so new data never emits retired codes.
MAIN_COMPOSITE_STRATEGY = LONG_MOMENTUM_BREAKOUT_STRATEGY
BOX_RETEST_1H_STRATEGY = LONG_SECOND_WAVE_PULLBACK_STRATEGY
BOX_RETEST_4H_STRATEGY = LONG_SECOND_WAVE_PULLBACK_STRATEGY
VOLUME_IGNITION_1H_STRATEGY = LONG_MOMENTUM_BREAKOUT_STRATEGY
COMPRESSION_BREAKOUT_4H_STRATEGY = LONG_SECOND_WAVE_PULLBACK_STRATEGY
INTRADAY_MOMENTUM_15M_STRATEGY = LONG_MOMENTUM_BREAKOUT_STRATEGY
BREAKDOWN_RETEST_SHORT_1H_STRATEGY = SHORT_BREAKDOWN_RETEST_STRATEGY
@dataclass(frozen=True)
@ -27,67 +33,17 @@ class StrategyDefinition:
STRATEGY_DEFINITIONS: dict[str, StrategyDefinition] = {
MAIN_COMPOSITE_STRATEGY: StrategyDefinition(
strategy_code=MAIN_COMPOSITE_STRATEGY,
strategy_name="综合确认策略",
description="迁移期兼容综合策略,承载现有综合筛选与确认逻辑;它与其他策略平等运行",
direction="both",
LONG_MOMENTUM_BREAKOUT_STRATEGY: StrategyDefinition(
strategy_code=LONG_MOMENTUM_BREAKOUT_STRATEGY,
strategy_name="多头动量启动",
description="15m当前突破叠加1H成交量/波动增强捕捉山寨币日内到1-3天启动段",
direction="long",
mode="paper_enabled",
),
BOX_RETEST_1H_STRATEGY: StrategyDefinition(
strategy_code=BOX_RETEST_1H_STRATEGY,
strategy_name="1H箱体突破回踩",
description="小时级底部箱体突破后回踩箱体上沿或EMA承接的早期结构策略。",
mode="paper_only",
entry_gate_config={
"min_entry_score_buy_now": 1,
"min_entry_score_wait_pullback": 0,
"min_rr_buy_now": 1.2,
"max_wait_pullback_deviation_pct": 20,
"breakout_distance_wait_pct": 25,
"gain_24h_wait_pct": 12,
},
paper_config={
"entry_min_rr": 1.5,
"order_min_rr": 1.5,
"order_min_distance_to_entry_pct": 0,
"order_require_current_trigger": False,
"dynamic_leverage_enabled": True,
"dynamic_leverage_min": 3,
},
),
BOX_RETEST_4H_STRATEGY: StrategyDefinition(
strategy_code=BOX_RETEST_4H_STRATEGY,
strategy_name="4H箱体突破回踩",
description="底部箱体突破后回踩箱体上沿或EMA承接的结构策略。",
mode="paper_only",
entry_gate_config={
"min_entry_score_buy_now": 1,
"min_entry_score_wait_pullback": 0,
"min_rr_buy_now": 1.2,
"max_wait_pullback_deviation_pct": 20,
"breakout_distance_wait_pct": 25,
"gain_24h_wait_pct": 12,
},
paper_config={
"entry_min_rr": 1.5,
"order_min_rr": 1.5,
"order_min_distance_to_entry_pct": 0,
"order_require_current_trigger": False,
"dynamic_leverage_enabled": True,
"dynamic_leverage_min": 3,
},
),
VOLUME_IGNITION_1H_STRATEGY: StrategyDefinition(
strategy_code=VOLUME_IGNITION_1H_STRATEGY,
strategy_name="1H放量突破启动",
description="1H量价齐飞或连续放量后的启动策略适合捕捉日内到3天的第一段加速。",
mode="paper_only",
entry_gate_config={
"min_entry_score_buy_now": 2,
"min_entry_score_wait_pullback": 1,
"min_rr_buy_now": 1.25,
"breakout_distance_wait_pct": 12,
"min_entry_score_buy_now": 3,
"min_entry_score_wait_pullback": 2,
"min_rr_buy_now": 1.5,
"breakout_distance_wait_pct": 8,
"gain_24h_wait_pct": 10,
},
paper_config={
@ -99,55 +55,35 @@ STRATEGY_DEFINITIONS: dict[str, StrategyDefinition] = {
"dynamic_leverage_min": 3,
},
),
COMPRESSION_BREAKOUT_4H_STRATEGY: StrategyDefinition(
strategy_code=COMPRESSION_BREAKOUT_4H_STRATEGY,
strategy_name="4H压缩蓄力突破",
description="4H静K蓄力、底部抬高或压缩放量后的突破策略偏向捕捉1周以内主升前段。",
mode="paper_only",
LONG_SECOND_WAVE_PULLBACK_STRATEGY: StrategyDefinition(
strategy_code=LONG_SECOND_WAVE_PULLBACK_STRATEGY,
strategy_name="多头二波回踩",
description="强势榜或放量币第一波后回踩EMA、箱体上沿或前高转支撑再次承接的短线策略。",
direction="long",
mode="paper_enabled",
entry_gate_config={
"min_entry_score_buy_now": 2,
"min_entry_score_wait_pullback": 0,
"min_rr_buy_now": 1.3,
"max_wait_pullback_deviation_pct": 18,
"breakout_distance_wait_pct": 18,
"gain_24h_wait_pct": 12,
"min_rr_buy_now": 1.5,
"max_wait_pullback_deviation_pct": 10,
"breakout_distance_wait_pct": 12,
"gain_24h_wait_pct": 18,
},
paper_config={
"entry_min_rr": 1.6,
"order_min_rr": 1.6,
"order_min_distance_to_entry_pct": 0,
"entry_min_rr": 1.5,
"order_min_rr": 1.5,
"order_min_distance_to_entry_pct": 1.5,
"order_require_current_trigger": False,
"dynamic_leverage_enabled": True,
"dynamic_leverage_min": 3,
},
),
INTRADAY_MOMENTUM_15M_STRATEGY: StrategyDefinition(
strategy_code=INTRADAY_MOMENTUM_15M_STRATEGY,
strategy_name="15m日内动量延续",
description="短周期放量突破与1H背景共振的日内动量策略只做当前触发不做纯观察追高。",
mode="paper_only",
entry_gate_config={
"min_entry_score_buy_now": 3,
"min_entry_score_wait_pullback": 2,
"min_rr_buy_now": 1.4,
"breakout_distance_wait_pct": 8,
"gain_24h_wait_pct": 8,
},
paper_config={
"entry_min_rr": 1.6,
"order_min_rr": 1.6,
"order_min_distance_to_entry_pct": 0,
"order_require_current_trigger": True,
"dynamic_leverage_enabled": True,
"dynamic_leverage_min": 3,
},
),
BREAKDOWN_RETEST_SHORT_1H_STRATEGY: StrategyDefinition(
strategy_code=BREAKDOWN_RETEST_SHORT_1H_STRATEGY,
strategy_name="1H破位反抽做空",
description="箱体或关键均线破位后反抽失败的空头策略;只用于独立空头样本,不与多头突破策略共享入场门槛。",
SHORT_BREAKDOWN_RETEST_STRATEGY: StrategyDefinition(
strategy_code=SHORT_BREAKDOWN_RETEST_STRATEGY,
strategy_name="空头破位反抽",
description="1H支撑或箱体下沿破位后反抽失败叠加15m弱确认和相对弱势的空头短线策略。",
direction="short",
mode="paper_only",
mode="paper_enabled",
entry_gate_config={
"direction": "short",
"min_entry_score_buy_now": 2,
@ -170,7 +106,18 @@ STRATEGY_DEFINITIONS: dict[str, StrategyDefinition] = {
def normalize_strategy_code(strategy_code: str | None) -> str:
code = str(strategy_code or "").strip()
return code or MAIN_COMPOSITE_STRATEGY
legacy_map = {
"main_composite_v1": LONG_MOMENTUM_BREAKOUT_STRATEGY,
"volume_ignition_1h_v1": LONG_MOMENTUM_BREAKOUT_STRATEGY,
"intraday_momentum_15m_v1": LONG_MOMENTUM_BREAKOUT_STRATEGY,
"box_retest_1h_v1": LONG_SECOND_WAVE_PULLBACK_STRATEGY,
"box_retest_4h_v1": LONG_SECOND_WAVE_PULLBACK_STRATEGY,
"compression_breakout_4h_v1": LONG_SECOND_WAVE_PULLBACK_STRATEGY,
"breakdown_retest_short_1h_v1": SHORT_BREAKDOWN_RETEST_STRATEGY,
}
if code in legacy_map:
return legacy_map[code]
return code or LONG_MOMENTUM_BREAKOUT_STRATEGY
def strategy_definition(strategy_code: str | None) -> StrategyDefinition:

View File

@ -0,0 +1,26 @@
INSERT INTO strategy_catalog (
strategy_code, strategy_name, strategy_version, status, mode, description, config_json, created_at, updated_at
) VALUES
('long_momentum_breakout_15m_1h_v1', '多头动量启动', '', 'active', 'paper_enabled', '15m当前突破叠加1H成交量/波动增强捕捉山寨币日内到1-3天启动段。', '{}', NOW()::TEXT, NOW()::TEXT),
('long_second_wave_pullback_1h_v1', '多头二波回踩', '', 'active', 'paper_enabled', '强势榜或放量币第一波后回踩EMA、箱体上沿或前高转支撑再次承接的短线策略。', '{}', NOW()::TEXT, NOW()::TEXT),
('short_breakdown_retest_1h_v1', '空头破位反抽', '', 'active', 'paper_enabled', '1H支撑或箱体下沿破位后反抽失败叠加15m弱确认和相对弱势的空头短线策略。', '{}', NOW()::TEXT, NOW()::TEXT)
ON CONFLICT(strategy_code) DO UPDATE SET
strategy_name=EXCLUDED.strategy_name,
status=EXCLUDED.status,
mode=EXCLUDED.mode,
description=EXCLUDED.description,
updated_at=NOW()::TEXT;
UPDATE strategy_catalog
SET status='retired',
mode='disabled',
updated_at=NOW()::TEXT
WHERE strategy_code IN (
'main_composite_v1',
'volume_ignition_1h_v1',
'intraday_momentum_15m_v1',
'box_retest_1h_v1',
'box_retest_4h_v1',
'compression_breakout_4h_v1',
'breakdown_retest_short_1h_v1'
);

View File

@ -27,11 +27,24 @@ from app.core.trade_math import (
stop_loss_distance_pct as side_stop_loss_distance_pct,
tighter_stop,
)
from app.core.strategy_registry import normalize_strategy_code, strategy_label, strategy_paper_config
from app.core.strategy_registry import (
LONG_MOMENTUM_BREAKOUT_STRATEGY,
LONG_SECOND_WAVE_PULLBACK_STRATEGY,
SHORT_BREAKDOWN_RETEST_STRATEGY,
normalize_strategy_code,
strategy_label,
strategy_paper_config,
)
from app.db.schema import get_conn
from app.db.system_logs import record_system_error
from app.integrations.feishu_push import push_card
ACTIVE_PAPER_STRATEGIES = {
LONG_MOMENTUM_BREAKOUT_STRATEGY,
LONG_SECOND_WAVE_PULLBACK_STRATEGY,
SHORT_BREAKDOWN_RETEST_STRATEGY,
}
def _now() -> str:
return datetime.now().isoformat()
@ -277,9 +290,29 @@ def _entry_plan(rec: dict) -> dict:
return _loads_json(rec.get("entry_plan_json"), {})
def _strategy_code_from_rec(rec: dict) -> str:
def _raw_strategy_code_from_rec(rec: dict) -> str:
plan = _entry_plan(rec)
return normalize_strategy_code(rec.get("strategy_code") or plan.get("strategy_code"))
raw_code = str(rec.get("strategy_code") or plan.get("strategy_code") or "").strip()
if raw_code:
return raw_code
rec_id = _safe_int(rec.get("id"))
if rec_id <= 0:
return ""
try:
with get_conn() as conn:
row = conn.execute("SELECT strategy_code FROM recommendation WHERE id=%s", (rec_id,)).fetchone()
return str((row or {}).get("strategy_code") or "").strip()
except Exception:
return ""
def _is_active_paper_strategy_rec(rec: dict) -> bool:
raw_code = _raw_strategy_code_from_rec(rec)
return raw_code in ACTIVE_PAPER_STRATEGIES
def _strategy_code_from_rec(rec: dict) -> str:
return normalize_strategy_code(_raw_strategy_code_from_rec(rec))
def _paper_cfg_for_rec(rec: dict, config: dict | None = None) -> dict:
@ -1529,6 +1562,13 @@ def sync_recommendation(rec: dict, current_price: float, event_time: str = "") -
current_price = _safe_float(current_price)
if rec_id <= 0 or not symbol or current_price <= 0:
return {"enabled": True, "skipped": True, "reason": "invalid_input"}
if not _is_active_paper_strategy_rec(rec):
return {
"enabled": True,
"skipped": True,
"reason": "legacy_strategy_disabled",
"strategy_code": _raw_strategy_code_from_rec(rec),
}
execution_status = str(rec.get("execution_status") or "").strip()
action_status = str(rec.get("action_status") or "").strip()
event_time = event_time or _now()
@ -1712,6 +1752,26 @@ def sync_pending_paper_orders(limit: int = 100, event_time: str = "", config: di
"derivatives_context_json": item.get("derivatives_context_json"),
"sector_context_json": item.get("sector_context_json"),
}
if not _is_active_paper_strategy_rec(rec):
conn.execute(
"""
UPDATE paper_orders
SET status='canceled',
cancel_reason='legacy_strategy_disabled',
canceled_at=%s,
updated_at=%s
WHERE id=%s AND status='pending'
""",
(event_time, event_time, order.get("id")),
)
results.append({
"skipped": True,
"reason": "legacy_strategy_disabled",
"paper_order_id": order.get("id"),
"symbol": order.get("symbol"),
"strategy_code": _raw_strategy_code_from_rec(rec),
})
continue
if current_price <= 0:
result = {
"skipped": True,

View File

@ -12,7 +12,11 @@ from app.core.opportunity_lifecycle import (
from app.core.signal_taxonomy import signal_codes as build_signal_codes, signal_labels as build_signal_labels
from app.core.strategy_contract import signal_to_recommendation_context
from app.core.trade_direction import direction_label, trade_side_from_payload
from app.core.strategy_registry import normalize_strategy_code
from app.core.strategy_registry import (
LONG_MOMENTUM_BREAKOUT_STRATEGY,
LONG_SECOND_WAVE_PULLBACK_STRATEGY,
normalize_strategy_code,
)
from app.db.recommendation_state import (
derive_minimal_state_fields,
entry_window_policy,
@ -80,6 +84,9 @@ def create_recommendation(
):
"""Create or merge the current recommendation record for one symbol."""
entry_plan = dict(entry_plan or {})
explicit_strategy_code = str(strategy_code or "").strip()
if not explicit_strategy_code and str(entry_plan.get("entry_action") or "").strip() in {"等回踩", "等待回踩"}:
strategy_code = LONG_SECOND_WAVE_PULLBACK_STRATEGY
raw_pct = round(rec_score * 100.0 / 30) if rec_score else 0
rec_score_pct = min(raw_pct, 100)
strategy_context = signal_to_recommendation_context(
@ -137,7 +144,7 @@ def create_recommendation(
UPDATE recommendation
SET rec_state=%s, rec_score=%s, sector=COALESCE(NULLIF(%s, ''), sector),
signals=%s, signal_codes_json=%s, signal_labels_json=%s, is_meme=%s, direction=%s, strategy_version=%s,
strategy_code=COALESCE(NULLIF(%s, ''), NULLIF(strategy_code, ''), 'main_composite_v1'),
strategy_code=COALESCE(NULLIF(%s, ''), NULLIF(strategy_code, ''), %s),
strategy_signal_id=CASE WHEN %s > 0 THEN %s ELSE COALESCE(strategy_signal_id, 0) END,
strategy_snapshot_json=CASE WHEN %s != '{}' THEN %s ELSE COALESCE(strategy_snapshot_json, '{}') END,
factor_roles_json=CASE WHEN %s != '{}' THEN %s ELSE COALESCE(factor_roles_json, '{}') END,
@ -170,6 +177,7 @@ def create_recommendation(
direction,
strategy_version,
strategy_code,
LONG_MOMENTUM_BREAKOUT_STRATEGY,
strategy_signal_id,
strategy_signal_id,
json.dumps(strategy_snapshot or {}, ensure_ascii=False),

View File

@ -0,0 +1,152 @@
"""One-time cleanup for retired strategy samples."""
from __future__ import annotations
import os
import subprocess
from datetime import datetime
from pathlib import Path
from app.db.postgres_connection import get_database_url
from app.db.schema import get_conn
LEGACY_STRATEGY_CODES = (
"main_composite_v1",
"volume_ignition_1h_v1",
"intraday_momentum_15m_v1",
"box_retest_1h_v1",
"box_retest_4h_v1",
"compression_breakout_4h_v1",
"breakdown_retest_short_1h_v1",
)
CONFIRM_TOKEN = "CLEAN_LEGACY_STRATEGY_SAMPLES"
def _repo_root() -> Path:
return Path(__file__).resolve().parents[2]
def backup_database() -> str:
backup_dir = _repo_root() / "data" / "backups"
backup_dir.mkdir(parents=True, exist_ok=True)
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
out_file = backup_dir / f"legacy_strategy_cleanup_before_{stamp}.dump"
subprocess.run(
["pg_dump", get_database_url(), "--format=custom", "--file", str(out_file)],
check=True,
)
return str(out_file)
def cleanup_legacy_strategy_samples(*, confirm: str, create_backup: bool = True) -> dict:
if confirm != CONFIRM_TOKEN:
raise ValueError(f"confirm must be {CONFIRM_TOKEN}")
backup_path = backup_database() if create_backup else ""
conn = get_conn()
deleted = {
"paper_trade_events": 0,
"paper_orders": 0,
"paper_trades": 0,
"recommendation": 0,
"strategy_signals": 0,
}
try:
trade_ids = [
int(row["id"])
for row in conn.execute(
"SELECT id FROM paper_trades WHERE strategy_code = ANY(%s)",
(list(LEGACY_STRATEGY_CODES),),
).fetchall()
]
recommendation_ids = [
int(row["id"])
for row in conn.execute(
"SELECT id FROM recommendation WHERE strategy_code = ANY(%s)",
(list(LEGACY_STRATEGY_CODES),),
).fetchall()
]
with conn.transaction():
if trade_ids:
deleted["paper_trade_events"] += int(
conn.execute(
"SELECT COUNT(*) FROM paper_trade_events WHERE trade_id = ANY(%s)",
(trade_ids,),
).fetchone()[0]
or 0
)
conn.execute("DELETE FROM paper_trade_events WHERE trade_id = ANY(%s)", (trade_ids,))
if recommendation_ids:
deleted["paper_trade_events"] += int(
conn.execute(
"SELECT COUNT(*) FROM paper_trade_events WHERE recommendation_id = ANY(%s)",
(recommendation_ids,),
).fetchone()[0]
or 0
)
conn.execute("DELETE FROM paper_trade_events WHERE recommendation_id = ANY(%s)", (recommendation_ids,))
deleted["paper_orders"] = int(
conn.execute(
"SELECT COUNT(*) FROM paper_orders WHERE recommendation_id = ANY(%s) OR strategy_code = ANY(%s)",
(recommendation_ids, list(LEGACY_STRATEGY_CODES)),
).fetchone()[0]
or 0
)
conn.execute(
"DELETE FROM paper_orders WHERE recommendation_id = ANY(%s) OR strategy_code = ANY(%s)",
(recommendation_ids, list(LEGACY_STRATEGY_CODES)),
)
else:
deleted["paper_orders"] = int(
conn.execute(
"SELECT COUNT(*) FROM paper_orders WHERE strategy_code = ANY(%s)",
(list(LEGACY_STRATEGY_CODES),),
).fetchone()[0]
or 0
)
conn.execute("DELETE FROM paper_orders WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),))
deleted["paper_trades"] = int(
conn.execute(
"SELECT COUNT(*) FROM paper_trades WHERE strategy_code = ANY(%s)",
(list(LEGACY_STRATEGY_CODES),),
).fetchone()[0]
or 0
)
conn.execute("DELETE FROM paper_trades WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),))
deleted["recommendation"] = int(
conn.execute(
"SELECT COUNT(*) FROM recommendation WHERE strategy_code = ANY(%s)",
(list(LEGACY_STRATEGY_CODES),),
).fetchone()[0]
or 0
)
conn.execute("DELETE FROM recommendation WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),))
deleted["strategy_signals"] = int(
conn.execute(
"SELECT COUNT(*) FROM strategy_signals WHERE strategy_code = ANY(%s)",
(list(LEGACY_STRATEGY_CODES),),
).fetchone()[0]
or 0
)
conn.execute("DELETE FROM strategy_signals WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),))
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
return {
"status": "completed",
"scope": "legacy-strategies",
"legacy_strategy_codes": list(LEGACY_STRATEGY_CODES),
"backup_path": backup_path,
"deleted": deleted,
"run_time": datetime.now().isoformat(),
}

View File

@ -42,10 +42,9 @@ from app.config.config_loader import (
)
from app.core.opportunity_lifecycle import apply_entry_quality_gate
from app.core.strategy_registry import (
BOX_RETEST_1H_STRATEGY,
BOX_RETEST_4H_STRATEGY,
BREAKDOWN_RETEST_SHORT_1H_STRATEGY,
MAIN_COMPOSITE_STRATEGY,
LONG_MOMENTUM_BREAKOUT_STRATEGY,
LONG_SECOND_WAVE_PULLBACK_STRATEGY,
is_strategy_allowed_for_side,
)
from app.core.trade_direction import direction_label, normalize_trade_side
@ -63,11 +62,9 @@ from app.db.onchain_db import get_onchain_factor_context
from app.db.strategy_signal_queries import insert_strategy_signal
from app.services.market_overview import get_crypto_market_overview
from app.strategies.altcoin_breakout import (
build_compression_breakout_4h_signal,
build_intraday_momentum_15m_signal,
build_volume_ignition_1h_signal,
build_long_momentum_breakout_signal,
build_long_second_wave_pullback_signal,
)
from app.strategies.box_retest_4h import build_box_retest_1h_signal, build_box_retest_4h_signal
from app.strategies.short_breakdown import build_breakdown_retest_short_1h_signal, detect_breakdown_retest_short_1h
from app.config.config_loader import _get_section as _get_cfg_section
from app.core.pa_engine import (
@ -118,32 +115,9 @@ def _strategy_context_for_recommendation(symbol: str, result: dict, entry_plan:
)
else:
signal_candidates.extend([
build_volume_ignition_1h_signal(symbol=symbol, result=result, entry_plan=entry_plan or {}),
build_compression_breakout_4h_signal(symbol=symbol, result=result, entry_plan=entry_plan or {}),
build_intraday_momentum_15m_signal(symbol=symbol, result=result, entry_plan=entry_plan or {}),
build_long_momentum_breakout_signal(symbol=symbol, result=result, entry_plan=entry_plan or {}),
build_long_second_wave_pullback_signal(symbol=symbol, result=result, entry_plan=entry_plan or {}),
])
if bp_1h.get("detected"):
signal_candidates.append(
build_box_retest_1h_signal(
symbol=symbol,
current_price=result.get("price") or 0,
detection=bp_1h,
entry_plan=entry_plan or {},
market_regime=market_regime,
decision_log=result.get("decision_log") or {},
)
)
if bp_4h.get("detected"):
signal_candidates.append(
build_box_retest_4h_signal(
symbol=symbol,
current_price=result.get("price") or 0,
detection=bp_4h,
entry_plan=entry_plan or {},
market_regime=market_regime,
decision_log=result.get("decision_log") or {},
)
)
saved_payloads = []
for signal in [item for item in signal_candidates if item]:
payload = signal.to_json_dict() if hasattr(signal, "to_json_dict") else dict(signal)
@ -1209,6 +1183,18 @@ def confirm_burst(symbol, cand):
except Exception:
cand_change_24h = 0.0
cand_signal_text = " ".join(str(x) for x in (json.loads(cand.get("signals", "[]")) if isinstance(cand.get("signals"), str) and cand.get("signals", "").strip().startswith("[") else [cand.get("signals", "")]))
cand_short_hint = (
cand_detail.get("short_breakdown_retest_1h")
or (cand_detail.get("market_context") or {}).get("short_breakdown_retest_1h")
or {}
)
if trade_side != "short" and (
bool(cand_short_hint.get("detected"))
or "破位反抽做空" in cand_signal_text
or "等待反抽失败" in cand_signal_text
or "breakdown_retest_1h_short" in cand_signal_text
):
trade_side = "short"
cand_is_top_gainer = bool(cand_detail.get("top_gainer_24h") or "24h强势榜" in cand_signal_text or cand_change_24h >= get_burst_threshold(symbol) * 1.5)
h1_df = fetch_klines(symbol, "1h", limit=100)
@ -2038,11 +2024,14 @@ def confirm_burst(symbol, cand):
entry_plan = short_entry_plan
gate_strategy_code = BREAKDOWN_RETEST_SHORT_1H_STRATEGY
else:
gate_strategy_code = (
BOX_RETEST_1H_STRATEGY if bp_1h.get("detected")
else BOX_RETEST_4H_STRATEGY if bp_4h.get("detected")
else MAIN_COMPOSITE_STRATEGY
signal_text = " ".join(str(x or "") for x in signals)
has_second_wave_context = (
"24h强势榜" in signal_text
or "回踩" in signal_text
or bp_1h.get("detected")
or bp_4h.get("detected")
)
gate_strategy_code = LONG_SECOND_WAVE_PULLBACK_STRATEGY if has_second_wave_context else LONG_MOMENTUM_BREAKOUT_STRATEGY
entry_plan.setdefault("strategy_code", gate_strategy_code)
# v1.7.5 买点质量闸门:确认强势 ≠ 允许现价追买。
@ -2288,10 +2277,13 @@ def _result_brief(item: dict) -> dict:
decision = item.get("decision_log") or ctx.get("decision_log") or {}
signal_text = " ".join(str(x) for x in (item.get("signals") or []))
inferred_strategy = ""
if "1H箱体突破回踩" in signal_text:
inferred_strategy = "box_retest_1h_v1"
elif "4H箱体突破回踩" in signal_text:
inferred_strategy = "box_retest_4h_v1"
side = normalize_trade_side(item.get("side") or ctx.get("side") or item.get("direction"))
if side == "short" or any(key in signal_text for key in ("破位反抽做空", "等待反抽失败", "breakdown_retest_1h_short")):
inferred_strategy = BREAKDOWN_RETEST_SHORT_1H_STRATEGY
elif any(key in signal_text for key in ("24h强势榜", "回踩", "箱体突破回踩")):
inferred_strategy = LONG_SECOND_WAVE_PULLBACK_STRATEGY
elif any(key in signal_text for key in ("15min", "短周期", "量价齐飞", "放量")):
inferred_strategy = LONG_MOMENTUM_BREAKOUT_STRATEGY
return {
"symbol": item.get("symbol"),
"confirmed": bool(item.get("confirmed")),

View File

@ -219,6 +219,12 @@ def _kline_scan_config():
"enabled": bool(cfg.get("enabled", True)),
"main_min_volume_usd": float(cfg.get("main_min_volume_usd", MIN_24H_VOLUME_USD) or 0),
"bypass_min_volume_usd": float(cfg.get("bypass_min_volume_usd", 2_000_000) or 0),
"discovery_min_volume_usd": float(cfg.get("discovery_min_volume_usd", cfg.get("bypass_min_volume_usd", 2_000_000)) or 0),
"tier_a_budget": max(0, int(cfg.get("tier_a_budget", 90) or 0)),
"tier_b_budget": max(0, int(cfg.get("tier_b_budget", 90) or 0)),
"bypass_extra_budget": max(0, int(cfg.get("bypass_extra_budget", 50) or 0)),
"tier_a_min_score": float(cfg.get("tier_a_min_score", 45) or 0),
"tier_b_min_score": float(cfg.get("tier_b_min_score", 20) or 0),
"short_tf_min_volume_usd": float(cfg.get("short_tf_min_volume_usd", 5_000_000) or 0),
"short_tf_min_abs_change_pct": float(cfg.get("short_tf_min_abs_change_pct", 1.0) or 0),
"short_tf_high_volume_usd": float(cfg.get("short_tf_high_volume_usd", 20_000_000) or 0),
@ -260,6 +266,123 @@ def _symbol_priority_score(symbol: str, info: dict, recently_screened: set) -> t
)
def _range_24h_pct(info: dict) -> float:
price = float((info or {}).get("price") or 0)
high = float((info or {}).get("high_24h") or 0)
low = float((info or {}).get("low_24h") or 0)
base = price or low
if base <= 0 or high <= 0 or low <= 0 or high < low:
return 0.0
return (high - low) / base * 100
def _discovery_priority_score(symbol: str, info: dict, recently_screened: set, cfg: dict) -> dict:
"""Cheap all-market score used only to decide which symbols deserve K-line calls."""
volume = float((info or {}).get("volume_24h") or 0)
change = float((info or {}).get("change_24h") or 0)
abs_change = abs(change)
range_pct = _range_24h_pct(info)
main_min = float(cfg.get("main_min_volume_usd") or 0)
high_volume = max(float(cfg.get("short_tf_high_volume_usd") or 0), main_min * 3)
top_gainer = _is_top_gainer_candidate(
symbol,
info,
min_volume=float(cfg.get("discovery_min_volume_usd") or 0),
)
score = 0.0
reasons = []
if top_gainer:
score += 40
reasons.append("24h强势榜")
if symbol in recently_screened:
score += 12
reasons.append("近期已关注")
if high_volume and volume >= high_volume:
score += 22
reasons.append("高成交额")
elif main_min and volume >= main_min * 2:
score += 14
reasons.append("成交额活跃")
elif main_min and volume >= main_min:
score += 8
reasons.append("成交额达标")
if abs_change >= 12:
score += 26
reasons.append("价格大幅波动")
elif abs_change >= 6:
score += 18
reasons.append("价格波动扩大")
elif abs_change >= 2:
score += 8
reasons.append("价格开始活跃")
if range_pct >= 18:
score += 18
reasons.append("24h振幅很大")
elif range_pct >= 8:
score += 10
reasons.append("24h振幅扩大")
tier_a_min = float(cfg.get("tier_a_min_score") or 45)
tier_b_min = float(cfg.get("tier_b_min_score") or 20)
tier = "A" if score >= tier_a_min else "B" if score >= tier_b_min else "C"
return {
"symbol": symbol,
"score": round(score, 2),
"tier": tier,
"reasons": reasons or ["轻量观察"],
"volume_24h": volume,
"change_24h": change,
"range_24h_pct": round(range_pct, 2),
"top_gainer_24h": top_gainer,
}
def _build_discovery_priority_queue(tickers: dict, *, recently_screened: set, cfg: dict) -> list[dict]:
min_volume = float(cfg.get("discovery_min_volume_usd") or 0)
queue = []
for symbol, info in (tickers or {}).items():
volume = float((info or {}).get("volume_24h") or 0)
if min_volume and volume < min_volume and not _is_top_gainer_candidate(symbol, info):
continue
queue.append(_discovery_priority_score(symbol, info, recently_screened, cfg))
queue.sort(
key=lambda item: (
item.get("tier") == "A",
item.get("score", 0),
item.get("top_gainer_24h", False),
item.get("volume_24h", 0),
),
reverse=True,
)
return queue
def _select_discovery_tiers(queue: list[dict], *, tier_a_budget: int, tier_b_budget: int, extra_tiers=(), extra_budget: int = 0) -> list[str]:
selected = []
selected_set = set()
def add(items, budget):
if budget <= 0:
return
count = 0
for item in items:
symbol = item.get("symbol")
if not symbol or symbol in selected_set:
continue
if count >= budget:
break
selected.append(symbol)
selected_set.add(symbol)
count += 1
add([item for item in queue if item.get("tier") == "A"], tier_a_budget)
add([item for item in queue if item.get("tier") == "B"], tier_b_budget)
if extra_tiers and extra_budget:
add([item for item in queue if item.get("tier") in set(extra_tiers)], extra_budget)
return selected
def _rule_based_kline_scan_symbols(tickers: dict, *, recently_screened: set, min_volume: float = 0, emergency_max: int = 0) -> list[str]:
"""Select K-line scan universe by rules first; emergency_max is off by default."""
items = []
@ -988,14 +1111,16 @@ def _static_bypass_resonance(cand, *, static_cfg, sector_signal_count=0, top_tra
return list(dict.fromkeys(signals))
def _attach_top_gainer_discovery(candidates, tickers, recently_screened):
def _attach_top_gainer_discovery(candidates, tickers, recently_screened, priority_context=None):
"""为强势榜补发现入口;追高风险留给细筛/确认处理。"""
priority_context = priority_context or {}
added = 0
for symbol, info in tickers.items():
if symbol in candidates:
if _is_top_gainer_candidate(symbol, info):
candidates[symbol]["top_gainer_24h"] = True
candidates[symbol]["top_gainer_chase_risk"] = symbol not in recently_screened
candidates[symbol].setdefault("discovery_priority", priority_context.get(symbol, {}))
continue
if not _is_top_gainer_candidate(symbol, info):
continue
@ -1025,6 +1150,7 @@ def _attach_top_gainer_discovery(candidates, tickers, recently_screened):
"top_gainer_24h": True,
"top_gainer_chase_risk": symbol not in recently_screened,
"bypass_origin": "top_gainer_24h",
"discovery_priority": priority_context.get(symbol, {}),
}
added += 1
return added
@ -1104,21 +1230,39 @@ def layer1_coarse_filter():
else {}
)
cached_runtime_skip_count = 0
main_scan_symbols = set(_rule_based_kline_scan_symbols(
discovery_queue = _build_discovery_priority_queue(
tickers,
recently_screened=recently_screened,
min_volume=main_min_vol,
emergency_max=scan_cfg["emergency_main_max_symbols"],
))
bypass_scan_symbols = set(_rule_based_kline_scan_symbols(
tickers,
recently_screened=recently_screened,
min_volume=low_turnover_threshold,
emergency_max=scan_cfg["emergency_bypass_max_symbols"],
))
cfg=scan_cfg,
)
priority_context = {item["symbol"]: item for item in discovery_queue}
main_selected = _select_discovery_tiers(
discovery_queue,
tier_a_budget=scan_cfg["tier_a_budget"],
tier_b_budget=scan_cfg["tier_b_budget"],
)
bypass_selected = _select_discovery_tiers(
discovery_queue,
tier_a_budget=scan_cfg["tier_a_budget"],
tier_b_budget=0,
extra_tiers=("B", "C"),
extra_budget=scan_cfg["bypass_extra_budget"],
)
if scan_cfg["emergency_main_max_symbols"] > 0:
main_selected = main_selected[:scan_cfg["emergency_main_max_symbols"]]
if scan_cfg["emergency_bypass_max_symbols"] > 0:
bypass_selected = bypass_selected[:scan_cfg["emergency_bypass_max_symbols"]]
main_scan_symbols = set(main_selected)
bypass_scan_symbols = set(bypass_selected)
tier_counts = {
"A": sum(1 for item in discovery_queue if item.get("tier") == "A"),
"B": sum(1 for item in discovery_queue if item.get("tier") == "B"),
"C": sum(1 for item in discovery_queue if item.get("tier") == "C"),
}
print(
f" K线扫描规则: 主扫描{len(main_scan_symbols)}/{len(tickers)}"
f"旁路扫描{len(bypass_scan_symbols)}/{len(tickers)},动态缓存{len(cached_runtime_exclusions)}"
f" K线扫描队列: A{tier_counts['A']} B{tier_counts['B']} C{tier_counts['C']}"
f"主扫描{len(main_scan_symbols)}/{len(tickers)},旁路扫描{len(bypass_scan_symbols)}/{len(tickers)}"
f"动态缓存{len(cached_runtime_exclusions)}"
)
try:
@ -1337,6 +1481,7 @@ def layer1_coarse_filter():
"weighted_avg_price": round(weighted_avg_price, 6) if weighted_avg_price else 0,
"top_gainer_24h": _is_top_gainer_candidate(symbol, info),
"top_gainer_chase_risk": is_unseen_top_gainer,
"discovery_priority": priority_context.get(symbol, {}),
}
# ==== 第二遍扫描低成交量静K蓄力旁路 + 底部抬高 + 压缩放量 ====
@ -1420,6 +1565,7 @@ def layer1_coarse_filter():
"bypass_origin": True,
"top_gainer_24h": _is_top_gainer_candidate(symbol, info),
"top_gainer_chase_risk": is_unseen_top_gainer,
"discovery_priority": priority_context.get(symbol, {}),
}
bypass_count += 1
added = True
@ -1454,6 +1600,7 @@ def layer1_coarse_filter():
"bypass_origin": "higher_lows",
"top_gainer_24h": _is_top_gainer_candidate(symbol, info),
"top_gainer_chase_risk": is_unseen_top_gainer,
"discovery_priority": priority_context.get(symbol, {}),
}
hl_count_total += 1
added = True
@ -1488,11 +1635,12 @@ def layer1_coarse_filter():
"bypass_origin": "compression_surge",
"top_gainer_24h": _is_top_gainer_candidate(symbol, info),
"top_gainer_chase_risk": is_unseen_top_gainer,
"discovery_priority": priority_context.get(symbol, {}),
}
cs_count_total += 1
added = True
top_gainer_count = _attach_top_gainer_discovery(candidates, tickers, recently_screened)
top_gainer_count = _attach_top_gainer_discovery(candidates, tickers, recently_screened, priority_context)
# 第一道漏斗:把明确不可交易/太低成交额的资产写成独立阶段,研发侧可审计,
# 但不让它们进入后续机会链路。
@ -1580,6 +1728,7 @@ def layer1_coarse_filter():
"bypass_origin": cand.get("bypass_origin", ""),
"source_types": discovery_source_types(cand),
"signal_codes": build_signal_codes(signals),
"discovery_priority": cand.get("discovery_priority") or {},
},
),
)
@ -1600,7 +1749,15 @@ def layer1_coarse_filter():
"kline_h4_success_count": len(h4_success_symbols),
"coarse_candidate_count": len(candidates),
"top_gainer_discovery_count": top_gainer_count,
"discovery_queue_count": len(discovery_queue),
"discovery_tier_a_count": tier_counts["A"],
"discovery_tier_b_count": tier_counts["B"],
"discovery_tier_c_count": tier_counts["C"],
"discovery_tier_a_budget": scan_cfg["tier_a_budget"],
"discovery_tier_b_budget": scan_cfg["tier_b_budget"],
"discovery_bypass_extra_budget": scan_cfg["bypass_extra_budget"],
"main_kline_min_volume_usd": scan_cfg["main_min_volume_usd"],
"discovery_min_volume_usd": scan_cfg["discovery_min_volume_usd"],
"bypass_kline_min_volume_usd": low_turnover_threshold,
"emergency_main_kline_scan_budget": scan_cfg["emergency_main_max_symbols"],
"emergency_bypass_kline_scan_budget": scan_cfg["emergency_bypass_max_symbols"],

View File

@ -10,9 +10,8 @@ from __future__ import annotations
from app.core.factor_roles import CONFIRMATION, ENTRY, PREREQUISITE, RISK, TRIGGER
from app.core.strategy_contract import StrategySignal, current_strategy_version
from app.core.strategy_registry import (
COMPRESSION_BREAKOUT_4H_STRATEGY,
INTRADAY_MOMENTUM_15M_STRATEGY,
VOLUME_IGNITION_1H_STRATEGY,
LONG_MOMENTUM_BREAKOUT_STRATEGY,
LONG_SECOND_WAVE_PULLBACK_STRATEGY,
)
@ -53,24 +52,25 @@ def _status_for_entry(result: dict, entry_plan: dict | None = None, *, require_c
return "observe", reasons
def build_volume_ignition_1h_signal(*, symbol: str, result: dict, entry_plan: dict | None = None) -> StrategySignal | None:
def build_long_momentum_breakout_signal(*, symbol: str, result: dict, entry_plan: dict | None = None) -> StrategySignal | None:
text = _signals_text(result)
has_vp = "量价齐飞" in text or ("连续" in text and "放量" in text)
has_breakout = "1H" in text and ("突破" in text or "起爆" in text)
if not (has_vp or has_breakout):
has_15m = any(key in text for key in ("15min", "15m", "短周期", "5m/15m"))
has_1h_participation = "量价齐飞" in text or ("连续" in text and "放量" in text) or ("1H" in text and ("突破" in text or "起爆" in text))
if not (has_15m and has_1h_participation):
return None
status, reasons = _status_for_entry(result, entry_plan, require_current_trigger=False, allow_wait=True)
status, reasons = _status_for_entry(result, entry_plan, require_current_trigger=True, allow_wait=True)
score = _safe_float(result.get("score"))
confidence = min(100.0, max(0.0, score * 7 + (12 if _has_current_trigger(result) else 0)))
confidence = min(100.0, max(0.0, score * 7 + 18))
trigger = {
"factor_code": "vp_fly_1h_current" if has_vp else "ignition_1h_current",
"factor_label": "1H放量突破启动",
"factor_code": "momentum_breakout_15m_1h",
"factor_label": "15m突破 + 1H放量/波动增强",
"has_current_trigger": _has_current_trigger(result),
"trigger_status": _trigger_context(result).get("trigger_status") or "",
"entry_action": (entry_plan or {}).get("entry_action") or "",
"opportunity_level": "intraday",
}
return StrategySignal(
strategy_code=VOLUME_IGNITION_1H_STRATEGY,
strategy_code=LONG_MOMENTUM_BREAKOUT_STRATEGY,
strategy_version=current_strategy_version(),
symbol=symbol,
direction="long",
@ -79,8 +79,9 @@ def build_volume_ignition_1h_signal(*, symbol: str, result: dict, entry_plan: di
score=score,
trigger=trigger,
factor_roles={
"vp_fly_1h_current": TRIGGER,
"momentum_breakout_15m_1h": TRIGGER,
"volume_consecutive_1h": CONFIRMATION,
"vp_fly_1h_current": CONFIRMATION,
"breakout_15m_current": ENTRY,
"pullback_15m_confirm": ENTRY,
"false_breakout": RISK,
@ -88,24 +89,25 @@ def build_volume_ignition_1h_signal(*, symbol: str, result: dict, entry_plan: di
},
entry_plan=entry_plan or {},
risk_plan={
"invalid_if": ["放量后不能延续", "15m假突破", "跌回启动K低点", "RR不足"],
"invalid_if": ["15m跌回突破K低点", "1H放量后不能延续", "快速冲高回落", "RR不足"],
"risk_reasons": reasons,
},
decision_log={"module": VOLUME_IGNITION_1H_STRATEGY, "decision": status, "reasons": reasons},
decision_log={"module": LONG_MOMENTUM_BREAKOUT_STRATEGY, "decision": status, "reasons": reasons},
)
def build_compression_breakout_4h_signal(*, symbol: str, result: dict, entry_plan: dict | None = None) -> StrategySignal | None:
def build_long_second_wave_pullback_signal(*, symbol: str, result: dict, entry_plan: dict | None = None) -> StrategySignal | None:
text = _signals_text(result)
has_compression = any(key in text for key in ("静K", "压缩", "布林收窄", "底部抬高"))
has_breakout_context = any(key in text for key in ("突破", "起爆", "量价齐飞", "回踩"))
if not (has_compression and has_breakout_context):
market_context = (result or {}).get("market_context") or {}
has_first_wave = bool((result or {}).get("top_gainer_24h") or market_context.get("top_gainer_24h")) or "24h强势榜" in text or "量价齐飞" in text or "放量" in text
has_pullback_context = any(key in text for key in ("回踩", "箱体", "EMA", "前高", "底部抬高", "静K"))
if not (has_first_wave and has_pullback_context):
return None
status, reasons = _status_for_entry(result, entry_plan, require_current_trigger=False, allow_wait=True)
score = _safe_float(result.get("score"))
confidence = min(100.0, max(0.0, score * 6 + (10 if "底部" in text else 0) + (8 if _has_current_trigger(result) else 0)))
confidence = min(100.0, max(0.0, score * 6 + (12 if "24h强势榜" in text else 0) + (8 if _has_current_trigger(result) else 0)))
return StrategySignal(
strategy_code=COMPRESSION_BREAKOUT_4H_STRATEGY,
strategy_code=LONG_SECOND_WAVE_PULLBACK_STRATEGY,
strategy_version=current_strategy_version(),
symbol=symbol,
direction="long",
@ -113,64 +115,37 @@ def build_compression_breakout_4h_signal(*, symbol: str, result: dict, entry_pla
confidence=confidence,
score=score,
trigger={
"factor_code": "compression_surge_4h",
"factor_label": "4H压缩蓄力突破",
"factor_code": "second_wave_pullback_1h",
"factor_label": "强势第一波后回踩承接",
"has_current_trigger": _has_current_trigger(result),
"trigger_status": _trigger_context(result).get("trigger_status") or "",
"opportunity_level": "swing_1_3d",
},
factor_roles={
"static_accum_4h": PREREQUISITE,
"higher_lows_4h": PREREQUISITE,
"compression_surge_4h": TRIGGER,
"ignition_4h_current": CONFIRMATION,
"cex_top_gainer": PREREQUISITE,
"vp_fly_1h_current": CONFIRMATION,
"box_breakout_pullback_1h": CONFIRMATION,
"box_breakout_pullback_4h": CONFIRMATION,
"second_wave_pullback_1h": TRIGGER,
"pullback_15m_confirm": ENTRY,
"false_breakout": RISK,
},
entry_plan=entry_plan or {},
risk_plan={
"invalid_if": ["突破后跌回压缩区间", "回踩放量跌破", "无量反抽失败", "市场风险升高"],
"invalid_if": ["跌回第一波启动区", "回踩放量跌破", "高位追涨无承接", "RR不足"],
"risk_reasons": reasons,
},
decision_log={"module": COMPRESSION_BREAKOUT_4H_STRATEGY, "decision": status, "reasons": reasons},
decision_log={"module": LONG_SECOND_WAVE_PULLBACK_STRATEGY, "decision": status, "reasons": reasons},
)
def build_volume_ignition_1h_signal(*, symbol: str, result: dict, entry_plan: dict | None = None) -> StrategySignal | None:
return None
def build_compression_breakout_4h_signal(*, symbol: str, result: dict, entry_plan: dict | None = None) -> StrategySignal | None:
return None
def build_intraday_momentum_15m_signal(*, symbol: str, result: dict, entry_plan: dict | None = None) -> StrategySignal | None:
text = _signals_text(result)
has_short_tf = any(key in text for key in ("15min", "15m", "短周期", "5m/15m"))
if not has_short_tf:
return None
status, reasons = _status_for_entry(result, entry_plan, require_current_trigger=True, allow_wait=False)
score = _safe_float(result.get("score"))
confidence = min(100.0, max(0.0, score * 6 + 18))
return StrategySignal(
strategy_code=INTRADAY_MOMENTUM_15M_STRATEGY,
strategy_version=current_strategy_version(),
symbol=symbol,
direction="long",
status=status,
confidence=confidence,
score=score,
trigger={
"factor_code": "short_tf_15m_ignition",
"factor_label": "15m日内动量延续",
"has_current_trigger": _has_current_trigger(result),
"trigger_status": _trigger_context(result).get("trigger_status") or "",
},
factor_roles={
"short_tf_5m_ignition": PREREQUISITE,
"short_tf_15m_ignition": TRIGGER,
"short_tf_resonance": CONFIRMATION,
"vp_fly_1h_current": CONFIRMATION,
"breakout_15m_current": ENTRY,
"false_breakout": RISK,
"trend_exhaustion": RISK,
},
entry_plan=entry_plan or {},
risk_plan={
"invalid_if": ["15m跌回突破K", "短周期量能衰减", "快速冲高回落", "RR不足"],
"risk_reasons": reasons,
},
decision_log={"module": INTRADAY_MOMENTUM_15M_STRATEGY, "decision": status, "reasons": reasons},
)
return None

View File

@ -1,171 +1,21 @@
"""4H box breakout retest strategy candidate.
"""Retired box-retest strategy builders.
The detector factor is not the strategy. This module wraps that factor with
market, freshness, entry-distance and risk semantics to produce a standard
StrategySignal.
Box/retest detection remains available as a factor inside the new short-term
strategy pool, but these legacy builders no longer emit standalone signals.
"""
from __future__ import annotations
from app.core.factor_roles import ENTRY, RISK, TRIGGER
from app.core.strategy_contract import StrategySignal, current_strategy_version
from app.core.strategy_registry import BOX_RETEST_1H_STRATEGY, BOX_RETEST_4H_STRATEGY
from app.core.strategy_contract import StrategySignal
def _safe_float(value, default=0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except Exception:
return default
def build_box_retest_4h_signal(**kwargs) -> StrategySignal | None:
return None
def _safe_int(value, default=999) -> int:
try:
if value is None or value == "":
return default
return int(value)
except Exception:
return default
def _build_box_retest_signal(
*,
symbol: str,
current_price: float,
detection: dict,
strategy_code: str,
timeframe_label: str,
factor_code: str,
factor_label: str,
max_fresh_age_bars: int,
max_chase_distance_pct: float,
entry_plan: dict | None = None,
market_regime: dict | None = None,
decision_log: dict | None = None,
) -> StrategySignal | None:
if not (detection or {}).get("detected"):
return None
entry_plan = entry_plan or {}
market_regime = market_regime or {}
risk_level = str(market_regime.get("risk_level") or "medium").lower()
entry_zone = _safe_float(detection.get("entry_zone"))
current_price = _safe_float(current_price)
distance_pct = (current_price / entry_zone - 1) * 100 if entry_zone > 0 and current_price > 0 else 0
age = _safe_int(detection.get("pullback_age_bars"))
quality = str(detection.get("quality") or "")
status = "candidate"
reasons = []
if risk_level == "critical":
status = "observe"
reasons.append("全局风险 critical仅观察")
if age > max_fresh_age_bars:
status = "observe"
reasons.append(f"回踩已过去 {age}{timeframe_label},时效偏旧")
if quality not in {"良好", "优质"}:
status = "observe"
reasons.append(f"形态质量 {quality or '未知'},不直接交易")
if distance_pct > max_chase_distance_pct:
status = "observe"
reasons.append(f"当前价离箱体上沿 {distance_pct:.1f}%,禁止追高")
score = _safe_float(detection.get("score"))
confidence = min(100.0, max(0.0, score * 8))
trigger = {
"factor_code": factor_code,
"factor_label": factor_label,
"box_high": detection.get("box_high"),
"box_low": detection.get("box_low"),
"entry_zone": detection.get("entry_zone"),
"stop_level": detection.get("stop_level"),
"pullback_kind": detection.get("pullback_kind"),
"pullback_age_bars": age,
"quality": quality,
"distance_to_entry_zone_pct": round(distance_pct, 4),
"market_regime": market_regime.get("regime") or "",
"risk_level": risk_level,
}
risk_plan = {
"invalid_if": ["跌回箱体", "放量冲高回落", "回踩过久", "全局风险升为critical"],
"stop_level": detection.get("stop_level"),
"risk_reasons": reasons,
}
return StrategySignal(
strategy_code=strategy_code,
strategy_version=current_strategy_version(),
symbol=symbol,
direction="long",
status=status,
confidence=confidence,
score=score,
trigger=trigger,
factor_roles={
factor_code: TRIGGER,
"pullback_15m_confirm": ENTRY,
"trend_exhaustion": RISK,
"false_breakout": RISK,
},
entry_plan=entry_plan,
risk_plan=risk_plan,
decision_log=decision_log or {
"module": "box_retest_4h_v1",
"decision": status,
"reasons": reasons,
},
)
def build_box_retest_4h_signal(
*,
symbol: str,
current_price: float,
detection: dict,
entry_plan: dict | None = None,
market_regime: dict | None = None,
decision_log: dict | None = None,
) -> StrategySignal | None:
return _build_box_retest_signal(
symbol=symbol,
current_price=current_price,
detection=detection,
strategy_code=BOX_RETEST_4H_STRATEGY,
timeframe_label="4H",
factor_code="box_breakout_pullback_4h",
factor_label="4H箱体突破回踩",
max_fresh_age_bars=4,
max_chase_distance_pct=8,
entry_plan=entry_plan,
market_regime=market_regime,
decision_log=decision_log,
)
def build_box_retest_1h_signal(
*,
symbol: str,
current_price: float,
detection: dict,
entry_plan: dict | None = None,
market_regime: dict | None = None,
decision_log: dict | None = None,
) -> StrategySignal | None:
return _build_box_retest_signal(
symbol=symbol,
current_price=current_price,
detection=detection,
strategy_code=BOX_RETEST_1H_STRATEGY,
timeframe_label="1H",
factor_code="box_breakout_pullback_1h",
factor_label="1H箱体突破回踩",
max_fresh_age_bars=6,
max_chase_distance_pct=6,
entry_plan=entry_plan,
market_regime=market_regime,
decision_log=decision_log,
)
def build_box_retest_1h_signal(**kwargs) -> StrategySignal | None:
return None
def build_box_retest_signal(**kwargs) -> StrategySignal | None:
"""Backward-compatible alias for the original 4H strategy builder."""
return build_box_retest_4h_signal(**kwargs)
return None

View File

@ -56,6 +56,12 @@ screener:
enabled: true
main_min_volume_usd: 5000000
bypass_min_volume_usd: 2000000
discovery_min_volume_usd: 2000000
tier_a_budget: 90
tier_b_budget: 90
bypass_extra_budget: 50
tier_a_min_score: 45
tier_b_min_score: 20
short_tf_min_volume_usd: 5000000
short_tf_min_abs_change_pct: 1.0
short_tf_high_volume_usd: 20000000

View File

@ -943,7 +943,7 @@ function renderRecCard(r) {
return '<span class="sig '+cls+'">'+displaySignalText(s)+'</span>';
}).join('');
var score = r.rec_score||0, st = scoreTier(score), ver = r.strategy_version||'';
var strategyLabel = r.strategy_name || ({main_composite_v1:'综合确认策略',box_retest_1h_v1:'1H箱体突破回踩',box_retest_4h_v1:'4H箱体突破回踩',breakdown_retest_short_1h_v1:'1H破位反抽做空',volume_ignition_1h_v1:'1H放量突破启动',compression_breakout_4h_v1:'4H压缩蓄力突破',intraday_momentum_15m_v1:'15m日内动量延续'}[r.strategy_code||''] || r.strategy_code || '');
var strategyLabel = r.strategy_name || ({long_momentum_breakout_15m_1h_v1:'多头动量启动',long_second_wave_pullback_1h_v1:'多头二波回踩',short_breakdown_retest_1h_v1:'空头破位反抽'}[r.strategy_code||''] || r.strategy_code || '');
var hasQualityGate = ep.entry_quality_gate && Array.isArray(ep.entry_quality_gate.reasons) && ep.entry_quality_gate.reasons.length;
var entryLabel = isWait ? (side === 'short' ? '反抽参考' : '回踩参考') : (hasQualityGate ? '失效参考' : '参考价位');
var entryRef = (isWait || hasQualityGate) ? (ep.entry_price || r.entry_price || 0) : (r.entry_price || ep.entry_price || 0);

View File

@ -228,7 +228,7 @@ function renderOrders(items){if(!items.length){$('orderRows').innerHTML='<tr><td
'<td><button class="row-action" type="button" onclick="deleteOrder('+Number(x.id)+',\''+esc(String(x.symbol||'')).replace(/'/g,'&#39;')+'\')">删除</button></td>'+
'</tr>'}).join('')}
function protectionCell(x){var trail=Number(x.trailing_stop||0);var trailHtml=trail>0?'<span class="trail-line">移动止盈 $'+fmt(trail,6)+'</span>':'<span class="trail-line off">移动止盈未启动</span>';return '<div class="riskline"><span>TP $'+fmt(x.tp1,6)+'</span><span>SL $'+fmt(x.stop_loss,6)+'</span>'+trailHtml+'</div>'}
function strategyName(x){return (x&&x.strategy_name)||({main_composite_v1:'综合确认策略',box_retest_1h_v1:'1H箱体突破回踩',box_retest_4h_v1:'4H箱体突破回踩',volume_ignition_1h_v1:'1H放量突破启动',compression_breakout_4h_v1:'4H压缩蓄力突破',intraday_momentum_15m_v1:'15m日内动量延续',breakdown_retest_short_1h_v1:'1H破位反抽做空'}[(x&&x.strategy_code)||'']||((x&&x.strategy_code)||'--'))}
function strategyName(x){return (x&&x.strategy_name)||({long_momentum_breakout_15m_1h_v1:'多头动量启动',long_second_wave_pullback_1h_v1:'多头二波回踩',short_breakdown_retest_1h_v1:'空头破位反抽'}[(x&&x.strategy_code)||'']||((x&&x.strategy_code)||'--'))}
async function loadOpenTrades(nextOffset){openOffset=Math.max(0,nextOffset||0);$('openRows').innerHTML='<tr><td colspan="15" class="loading">加载中...</td></tr>';try{var d=await api('/api/paper-trading/trades?limit='+LIMIT+'&offset='+openOffset+'&status=open'+tradeQuery());openTotal=d.total||0;renderTradeRows('openRows',d.items||[],'暂无持仓中的策略交易');renderOpenPager()}catch(e){$('openRows').innerHTML='<tr><td colspan="15" class="empty">'+esc(e.message)+'</td></tr>'}}
async function loadClosedTrades(){$('closedTradeRows').innerHTML='<tr><td colspan="15" class="loading">加载中...</td></tr>';try{var d=await api('/api/paper-trading/trades?limit=80&offset=0&status=closed'+tradeQuery());renderTradeRows('closedTradeRows',d.items||[],'暂无已完结持仓')}catch(e){$('closedTradeRows').innerHTML='<tr><td colspan="15" class="empty">'+esc(e.message)+'</td></tr>'}}
async function loadCanceledOrders(){$('canceledOrderRows').innerHTML='<tr><td colspan="11" class="loading">加载中...</td></tr>';try{var sets=await Promise.all(['canceled','expired','rejected'].map(function(s){return api('/api/paper-trading/orders?limit=50&offset=0&status='+s+tradeQuery())}));var items=[];sets.forEach(function(d){items=items.concat(d.items||[])});items.sort(function(a,b){return String(b.updated_at||b.created_at).localeCompare(String(a.updated_at||a.created_at))});renderCanceledOrders(items)}catch(e){$('canceledOrderRows').innerHTML='<tr><td colspan="11" class="empty">'+esc(e.message)+'</td></tr>'}}

View File

@ -2,14 +2,13 @@ from app.core.factor_roles import RISK, TRIGGER, factor_role, factor_roles_for_c
from app.core.signal_direction import sanitize_factor_breakdown_for_side, sanitize_signals_for_side
from app.core.strategy_contract import StrategySignal, default_main_composite_signal
from app.core.strategy_registry import (
BOX_RETEST_1H_STRATEGY,
BOX_RETEST_4H_STRATEGY,
COMPRESSION_BREAKOUT_4H_STRATEGY,
BREAKDOWN_RETEST_SHORT_1H_STRATEGY,
INTRADAY_MOMENTUM_15M_STRATEGY,
LONG_MOMENTUM_BREAKOUT_STRATEGY,
LONG_SECOND_WAVE_PULLBACK_STRATEGY,
MAIN_COMPOSITE_STRATEGY,
VOLUME_IGNITION_1H_STRATEGY,
SHORT_BREAKDOWN_RETEST_STRATEGY,
is_strategy_allowed_for_side,
registered_strategy_codes,
strategy_direction,
strategy_label,
)
@ -18,6 +17,8 @@ from app.db.paper_trading import _open_trade, _order_payload_from_rec
from app.db.strategy_signal_queries import insert_strategy_signal
from app.db.strategy_insights import evaluate_strategy_decision
from app.strategies.altcoin_breakout import (
build_long_momentum_breakout_signal,
build_long_second_wave_pullback_signal,
build_compression_breakout_4h_signal,
build_intraday_momentum_15m_signal,
build_volume_ignition_1h_signal,
@ -37,7 +38,7 @@ def test_factor_roles_never_promote_unknown_to_trigger():
}
def test_default_main_composite_strategy_signal_is_stable():
def test_active_strategy_pool_only_contains_short_term_three_strategies():
signal = default_main_composite_signal(
symbol="AAA/USDT",
score=70,
@ -45,23 +46,26 @@ def test_default_main_composite_strategy_signal_is_stable():
entry_plan={"entry_action": "观察"},
).to_json_dict()
assert signal["strategy_code"] == MAIN_COMPOSITE_STRATEGY
assert signal["strategy_name"] == "综合确认策略"
assert registered_strategy_codes() == [
LONG_MOMENTUM_BREAKOUT_STRATEGY,
LONG_SECOND_WAVE_PULLBACK_STRATEGY,
SHORT_BREAKDOWN_RETEST_STRATEGY,
]
assert signal["strategy_code"] == LONG_MOMENTUM_BREAKOUT_STRATEGY
assert signal["strategy_name"] == "多头动量启动"
assert signal["factor_roles"]["vp_fly_1h_current"] == "trigger"
assert strategy_label(BOX_RETEST_1H_STRATEGY) == "1H箱体突破回踩"
assert strategy_label(VOLUME_IGNITION_1H_STRATEGY) == "1H放量突破启动"
assert strategy_label(COMPRESSION_BREAKOUT_4H_STRATEGY) == "4H压缩蓄力突破"
assert strategy_label(INTRADAY_MOMENTUM_15M_STRATEGY) == "15m日内动量延续"
assert strategy_label(BREAKDOWN_RETEST_SHORT_1H_STRATEGY) == "1H破位反抽做空"
assert strategy_direction(VOLUME_IGNITION_1H_STRATEGY) == "long"
assert strategy_label(LONG_MOMENTUM_BREAKOUT_STRATEGY) == "多头动量启动"
assert strategy_label(LONG_SECOND_WAVE_PULLBACK_STRATEGY) == "多头二波回踩"
assert strategy_label(SHORT_BREAKDOWN_RETEST_STRATEGY) == "空头破位反抽"
assert strategy_direction(LONG_MOMENTUM_BREAKOUT_STRATEGY) == "long"
assert strategy_direction(BREAKDOWN_RETEST_SHORT_1H_STRATEGY) == "short"
assert is_strategy_allowed_for_side(MAIN_COMPOSITE_STRATEGY, "short") is True
assert is_strategy_allowed_for_side(VOLUME_IGNITION_1H_STRATEGY, "short") is False
assert is_strategy_allowed_for_side(MAIN_COMPOSITE_STRATEGY, "short") is False
assert is_strategy_allowed_for_side(LONG_MOMENTUM_BREAKOUT_STRATEGY, "short") is False
assert is_strategy_allowed_for_side(BREAKDOWN_RETEST_SHORT_1H_STRATEGY, "short") is True
def test_volume_ignition_strategy_builds_independent_signal():
signal = build_volume_ignition_1h_signal(
def test_long_momentum_breakout_strategy_builds_independent_signal():
signal = build_long_momentum_breakout_signal(
symbol="VOL/USDT",
result={
"score": 8,
@ -73,10 +77,28 @@ def test_volume_ignition_strategy_builds_independent_signal():
)
payload = signal.to_json_dict()
assert payload["strategy_code"] == VOLUME_IGNITION_1H_STRATEGY
assert payload["strategy_code"] == LONG_MOMENTUM_BREAKOUT_STRATEGY
assert payload["status"] == "candidate"
assert payload["trigger"]["factor_code"] == "vp_fly_1h_current"
assert payload["factor_roles"]["vp_fly_1h_current"] == "trigger"
assert payload["trigger"]["factor_code"] == "momentum_breakout_15m_1h"
assert payload["factor_roles"]["momentum_breakout_15m_1h"] == "trigger"
def test_legacy_strategy_builders_no_longer_emit_signals():
assert build_volume_ignition_1h_signal(
symbol="OLD/USDT",
result={"score": 9, "signals": ["1H量价齐飞", "15min即刻入场"], "trigger_context": {"current_triggers": ["15m"]}},
entry_plan={"entry_action": "可即刻买入"},
) is None
assert build_compression_breakout_4h_signal(
symbol="OLD/USDT",
result={"score": 9, "signals": ["4H静K压缩突破箱体上沿"]},
entry_plan={"entry_action": "等回踩"},
) is None
assert build_intraday_momentum_15m_signal(
symbol="OLD/USDT",
result={"score": 9, "signals": ["15min强突破"], "trigger_context": {"current_triggers": ["15m"]}},
entry_plan={"entry_action": "可即刻买入"},
) is None
def test_long_strategy_cannot_emit_short_signal():
@ -84,7 +106,7 @@ def test_long_strategy_cannot_emit_short_signal():
with pytest.raises(ValueError):
StrategySignal(
strategy_code=VOLUME_IGNITION_1H_STRATEGY,
strategy_code=LONG_MOMENTUM_BREAKOUT_STRATEGY,
symbol="BAD/USDT",
direction="short",
factor_roles={"vp_fly_1h_current": "trigger"},
@ -136,35 +158,35 @@ def test_short_factor_breakdown_removes_long_only_positive_factors():
}
def test_compression_breakout_strategy_requires_structure_and_breakout_context():
signal = build_compression_breakout_4h_signal(
def test_second_wave_strategy_requires_first_wave_and_pullback_context():
signal = build_long_second_wave_pullback_signal(
symbol="QUIET/USDT",
result={"score": 6, "signals": ["4H静K压缩突破箱体上沿"], "entry_plan": {"entry_action": "等回踩"}},
result={"score": 6, "signals": ["24h强势榜异动", "1H箱体突破回踩"], "entry_plan": {"entry_action": "等回踩"}},
entry_plan={"entry_action": "等回踩", "entry_price": 1.0},
)
payload = signal.to_json_dict()
assert payload["strategy_code"] == COMPRESSION_BREAKOUT_4H_STRATEGY
assert payload["strategy_code"] == LONG_SECOND_WAVE_PULLBACK_STRATEGY
assert payload["status"] == "candidate"
assert payload["factor_roles"]["compression_surge_4h"] == "trigger"
assert build_compression_breakout_4h_signal(
assert payload["factor_roles"]["second_wave_pullback_1h"] == "trigger"
assert build_long_second_wave_pullback_signal(
symbol="NOBOX/USDT",
result={"score": 6, "signals": ["1H量价齐飞"], "entry_plan": {"entry_action": "可即刻买入"}},
entry_plan={"entry_action": "可即刻买入"},
) is None
def test_intraday_momentum_strategy_requires_current_trigger():
stale = build_intraday_momentum_15m_signal(
def test_long_momentum_strategy_requires_current_trigger():
stale = build_long_momentum_breakout_signal(
symbol="FAST/USDT",
result={"score": 7, "signals": ["15m短周期启动"], "entry_plan": {"entry_action": "可即刻买入"}},
result={"score": 7, "signals": ["15m短周期启动", "1H量价齐飞"], "entry_plan": {"entry_action": "可即刻买入"}},
entry_plan={"entry_action": "可即刻买入"},
).to_json_dict()
fresh = build_intraday_momentum_15m_signal(
fresh = build_long_momentum_breakout_signal(
symbol="FAST/USDT",
result={
"score": 7,
"signals": ["15min强突破"],
"signals": ["15min强突破", "1H量价齐飞"],
"trigger_context": {"current_triggers": ["15m突破"], "trigger_status": "current"},
"entry_plan": {"entry_action": "可即刻买入"},
},
@ -263,12 +285,12 @@ def test_strategy_evaluation_recommends_promote_or_pause():
def test_strategy_signal_insert_and_recommendation_lineage(pg_conn):
signal = insert_strategy_signal(
StrategySignal(
strategy_code=BOX_RETEST_4H_STRATEGY,
strategy_code=LONG_SECOND_WAVE_PULLBACK_STRATEGY,
symbol="BOX/USDT",
score=10,
confidence=80,
trigger={"factor_code": "box_breakout_pullback_4h"},
factor_roles={"box_breakout_pullback_4h": "trigger"},
trigger={"factor_code": "second_wave_pullback_1h"},
factor_roles={"second_wave_pullback_1h": "trigger"},
entry_plan={"entry_action": "等回踩", "entry_price": 1.0},
)
)
@ -289,9 +311,9 @@ def test_strategy_signal_insert_and_recommendation_lineage(pg_conn):
)
row = pg_conn.execute("SELECT * FROM recommendation WHERE id=%s", (rec_id,)).fetchone()
assert row["strategy_code"] == BOX_RETEST_4H_STRATEGY
assert row["strategy_code"] == LONG_SECOND_WAVE_PULLBACK_STRATEGY
assert row["strategy_signal_id"] == signal["strategy_signal_id"]
assert "box_breakout_pullback_4h" in row["factor_roles_json"]
assert "second_wave_pullback_1h" in row["factor_roles_json"]
def test_box_retest_strategy_preserves_zero_age_as_fresh():
@ -308,11 +330,7 @@ def test_box_retest_strategy_preserves_zero_age_as_fresh():
},
market_regime={"regime": "altcoin_rotation", "risk_level": "medium"},
)
payload = signal.to_json_dict()
assert payload["status"] == "candidate"
assert payload["trigger"]["pullback_age_bars"] == 0
assert payload["risk_plan"]["risk_reasons"] == []
assert signal is None
def test_paper_order_and_trade_inherit_strategy_lineage(pg_conn):
@ -327,15 +345,15 @@ def test_paper_order_and_trade_inherit_strategy_lineage(pg_conn):
"execution_status": "buy_now",
"action_status": "可即刻买入",
"strategy_version": "v-test",
"strategy_code": BOX_RETEST_4H_STRATEGY,
"strategy_code": LONG_SECOND_WAVE_PULLBACK_STRATEGY,
"strategy_signal_id": 42,
"strategy_snapshot_json": '{"strategy_code":"box_retest_4h_v1"}',
"factor_roles_json": '{"box_breakout_pullback_4h":"trigger"}',
"strategy_snapshot_json": '{"strategy_code":"long_second_wave_pullback_1h_v1"}',
"factor_roles_json": '{"second_wave_pullback_1h":"trigger"}',
"entry_plan": {"entry_action": "可即刻买入", "entry_price": 1.0, "stop_loss": 0.94, "tp1": 1.12},
}
payload = _order_payload_from_rec(rec, 1.01, "2026-05-27T00:00:00", {"trade_notional_usdt": 100})
assert payload["strategy_code"] == BOX_RETEST_4H_STRATEGY
assert payload["strategy_code"] == LONG_SECOND_WAVE_PULLBACK_STRATEGY
assert payload["strategy_signal_id"] == 42
result = _open_trade(
@ -362,7 +380,7 @@ def test_paper_order_and_trade_inherit_strategy_lineage(pg_conn):
row = pg_conn.execute("SELECT * FROM paper_trades WHERE id=%s", (result["trade_id"],)).fetchone()
event = pg_conn.execute("SELECT * FROM paper_trade_events WHERE trade_id=%s", (result["trade_id"],)).fetchone()
assert row["strategy_code"] == BOX_RETEST_4H_STRATEGY
assert row["strategy_code"] == LONG_SECOND_WAVE_PULLBACK_STRATEGY
assert row["strategy_signal_id"] == 42
assert event["strategy_code"] == BOX_RETEST_4H_STRATEGY
assert strategy_label(row["strategy_code"]) == "4H箱体突破回踩"
assert event["strategy_code"] == LONG_SECOND_WAVE_PULLBACK_STRATEGY
assert strategy_label(row["strategy_code"]) == "多头二波回踩"

View File

@ -5,7 +5,7 @@ import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.core.opportunity_lifecycle import apply_entry_quality_gate
from app.core.strategy_registry import BOX_RETEST_1H_STRATEGY, MAIN_COMPOSITE_STRATEGY
from app.core.strategy_registry import LONG_MOMENTUM_BREAKOUT_STRATEGY, LONG_SECOND_WAVE_PULLBACK_STRATEGY
from app.services.price_tracker import reconcile_buy_signals_after_gate
@ -48,7 +48,7 @@ def test_buy_now_with_bad_rr_sets_real_pullback_price():
assert action == '等回踩'
assert plan['entry_price'] < 0.11455
assert round(plan['entry_price'], 6) == 0.113199
assert round(plan['entry_price'], 6) == 0.11251
assert plan['rr_target_entry'] == plan['entry_price']
assert any('现价不买' in r for r in reasons)
@ -88,29 +88,29 @@ def test_entry_gate_uses_strategy_specific_thresholds():
'risk_reward_ok': True,
'rr1': 2.4,
'entry_trigger_confirmed': True,
'score_components': {'opportunity_score': 12, 'entry_score': 1, 'risk_score': 1},
'score_components': {'opportunity_score': 12, 'entry_score': 2, 'risk_score': 1},
}
main_action, main_plan, _ = apply_entry_quality_gate(
momentum_action, momentum_plan, _ = apply_entry_quality_gate(
action_status='可即刻买入',
entry_plan=dict(base_plan),
signals=['当前15min即刻入场信号'],
current_price=1.0,
market_context={'change_24h': 2.0},
strategy_code=MAIN_COMPOSITE_STRATEGY,
strategy_code=LONG_MOMENTUM_BREAKOUT_STRATEGY,
)
box_action, box_plan, _ = apply_entry_quality_gate(
second_wave_action, second_wave_plan, _ = apply_entry_quality_gate(
action_status='可即刻买入',
entry_plan=dict(base_plan),
signals=['当前15min即刻入场信号'],
current_price=1.0,
market_context={'change_24h': 2.0},
strategy_code=BOX_RETEST_1H_STRATEGY,
strategy_code=LONG_SECOND_WAVE_PULLBACK_STRATEGY,
)
assert main_action != '可即刻买入'
assert main_plan['entry_quality_gate']['strategy_code'] == MAIN_COMPOSITE_STRATEGY
assert box_action == '可即刻买入'
assert box_plan['strategy_code'] == BOX_RETEST_1H_STRATEGY
assert momentum_action != '可即刻买入'
assert momentum_plan['entry_quality_gate']['strategy_code'] == LONG_MOMENTUM_BREAKOUT_STRATEGY
assert second_wave_action == '可即刻买入'
assert second_wave_plan['strategy_code'] == LONG_SECOND_WAVE_PULLBACK_STRATEGY
def test_buy_now_requires_current_trigger_and_no_bearish_flow_risk():

View File

@ -18,6 +18,10 @@ from app.db.paper_trading import (
sync_recommendation,
)
from app.db.recommendation_queries import get_opportunity_detail
from app.core.strategy_registry import (
LONG_MOMENTUM_BREAKOUT_STRATEGY,
SHORT_BREAKDOWN_RETEST_STRATEGY,
)
def _visible_card_text(card: dict) -> str:
@ -72,18 +76,18 @@ def buy_now_rec(monkeypatch):
rec_state="爆发",
rec_score=28,
entry_price=100,
stop_loss=95,
stop_loss=96,
tp1=106,
tp2=112,
signals=["当前15min即刻入场信号"],
entry_plan={
"entry_action": "可即刻买入",
"entry_price": 100,
"stop_loss": 95,
"stop_loss": 96,
"tp1": 106,
"tp2": 112,
"risk_reward_ok": True,
"rr1": 1.2,
"rr1": 1.5,
"entry_trigger_confirmed": True,
},
)
@ -992,7 +996,7 @@ def test_touched_order_uses_order_snapshot_side_for_global_risk(monkeypatch, pg_
) VALUES (
301, 'SNAPSHORT/USDT', '2026-05-16T10:00:00', '蓄力', 88, 105,
'active', 'wait_pullback', '等回踩', 'watch_pool', 0,
95, 115, 122, 'volume_ignition_1h_v1', %s
95, 115, 122, 'long_momentum_breakout_15m_1h_v1', %s
)
""",
(json.dumps({"side": "long", "entry_action": "等回踩", "entry_price": 105, "stop_loss": 95, "tp1": 115, "rr1": 2.0}, ensure_ascii=False),),
@ -1008,12 +1012,12 @@ def test_touched_order_uses_order_snapshot_side_for_global_risk(monkeypatch, pg_
) VALUES (
301, 'SNAPSHORT/USDT', 'short', 'limit', 'pending',
'wait_pullback', '等反抽', 105, 100,
5000, 110, 95, 90, 'v-test', 'breakdown_retest_short_1h_v1',
5000, 110, 95, 90, 'v-test', 'short_breakdown_retest_1h_v1',
0, %s, '{}', %s, '2026-05-16T10:00:00', '2026-05-16T10:00:00', '2026-05-17T10:00:00'
)
""",
(
json.dumps({"strategy_code": "breakdown_retest_short_1h_v1"}, ensure_ascii=False),
json.dumps({"strategy_code": "short_breakdown_retest_1h_v1"}, ensure_ascii=False),
json.dumps({"side": "short", "entry_action": "等反抽", "entry_price": 105, "stop_loss": 110, "tp1": 95, "rr1": 2.0}, ensure_ascii=False),
),
)
@ -1025,10 +1029,10 @@ def test_touched_order_uses_order_snapshot_side_for_global_risk(monkeypatch, pg_
assert result["filled_count"] == 1
assert captured
assert all(x.get("side") == "short" for x in captured)
assert all(x.get("strategy_code") == "breakdown_retest_short_1h_v1" for x in captured)
assert all(x.get("strategy_code") == "short_breakdown_retest_1h_v1" for x in captured)
trade = list_paper_trades()["items"][0]
assert trade["side"] == "short"
assert trade["strategy_code"] == "breakdown_retest_short_1h_v1"
assert trade["strategy_code"] == "short_breakdown_retest_1h_v1"
def test_wait_pullback_order_cancels_when_recommendation_invalid(monkeypatch):

View File

@ -31,7 +31,7 @@ class RecommendationStateMainlineTests(unittest.TestCase):
rec_score=27,
entry_price=0.06557,
stop_loss=0.061846,
tp1=0.071156,
tp1=0.072000,
tp2=0.074881,
sector='',
signals=json.dumps(['15min 回踩确认'], ensure_ascii=False),
@ -54,7 +54,7 @@ class RecommendationStateMainlineTests(unittest.TestCase):
'risk_reward_ok': True,
'rr1': 1.5,
'stop_loss': 0.061846,
'tp1': 0.071156,
'tp1': 0.072000,
'entry_trigger_confirmed': True,
}, ensure_ascii=False),
action_status='等回踩',

View File

@ -92,7 +92,7 @@ def test_review_center_strategy_counts_use_same_closed_trade_window(pg_conn):
old_close = now - timedelta(days=40)
old_open_recent_close = now - timedelta(days=20)
outside_open = now - timedelta(days=60)
strategy_code = "volume_ignition_1h_v1"
strategy_code = "long_momentum_breakout_15m_1h_v1"
pg_conn.execute(
"""

View File

@ -189,6 +189,52 @@ def test_kline_scan_selection_is_rule_based_without_default_count_cap(monkeypatc
assert len(capped) == 2
def test_discovery_priority_queue_promotes_top_gainers_before_kline_scan(monkeypatch):
monkeypatch.setattr(altcoin_screener, "get_burst_threshold", lambda symbol: 4)
monkeypatch.setattr(altcoin_screener, "is_meme_coin", lambda symbol: False)
cfg = {
"main_min_volume_usd": 5_000_000,
"discovery_min_volume_usd": 2_000_000,
"short_tf_high_volume_usd": 20_000_000,
"tier_a_min_score": 45,
"tier_b_min_score": 20,
}
tickers = {
"SLEEP/USDT": {"price": 1, "volume_24h": 30_000_000, "change_24h": 0.2, "high_24h": 1.01, "low_24h": 0.99},
"MOVE/USDT": {"price": 1, "volume_24h": 3_000_000, "change_24h": 12, "high_24h": 1.16, "low_24h": 0.96},
"LOW/USDT": {"price": 1, "volume_24h": 300_000, "change_24h": 50, "high_24h": 1.8, "low_24h": 0.9},
}
queue = altcoin_screener._build_discovery_priority_queue(tickers, recently_screened=set(), cfg=cfg)
by_symbol = {item["symbol"]: item for item in queue}
assert by_symbol["MOVE/USDT"]["tier"] == "A"
assert by_symbol["SLEEP/USDT"]["tier"] in {"B", "A"}
assert "LOW/USDT" not in by_symbol
def test_discovery_tier_selection_respects_a_b_budgets():
queue = [
{"symbol": "A1/USDT", "tier": "A", "score": 90},
{"symbol": "A2/USDT", "tier": "A", "score": 80},
{"symbol": "B1/USDT", "tier": "B", "score": 30},
{"symbol": "B2/USDT", "tier": "B", "score": 25},
{"symbol": "C1/USDT", "tier": "C", "score": 5},
]
selected = altcoin_screener._select_discovery_tiers(queue, tier_a_budget=1, tier_b_budget=1)
bypass = altcoin_screener._select_discovery_tiers(
queue,
tier_a_budget=1,
tier_b_budget=0,
extra_tiers=("B", "C"),
extra_budget=2,
)
assert selected == ["A1/USDT", "B1/USDT"]
assert bypass == ["A1/USDT", "B1/USDT", "B2/USDT"]
def _mock_weights():
return {
"量价齐飞": 5,

View File

@ -0,0 +1,68 @@
import json
from app.db.strategy_sample_cleanup import CONFIRM_TOKEN, cleanup_legacy_strategy_samples
def test_cleanup_legacy_strategy_samples_removes_only_retired_strategy_data(pg_conn):
pg_conn.execute(
"""
INSERT INTO recommendation (
id, symbol, rec_time, rec_state, rec_score, entry_price,
status, execution_status, action_status, display_bucket, entry_triggered,
strategy_code, entry_plan_json
) VALUES
(9001, 'OLD/USDT', '2026-06-04T00:00:00', '爆发', 50, 1,
'active', 'buy_now', '可即刻买入', 'realtime', 1, 'volume_ignition_1h_v1', '{}'),
(9002, 'NEW/USDT', '2026-06-04T00:00:00', '爆发', 50, 1,
'active', 'buy_now', '可即刻买入', 'realtime', 1, 'long_momentum_breakout_15m_1h_v1', '{}')
"""
)
pg_conn.execute(
"""
INSERT INTO strategy_signals (
id, strategy_code, symbol, direction, signal_status, created_at
) VALUES
(9101, 'volume_ignition_1h_v1', 'OLD/USDT', 'long', 'candidate', '2026-06-04T00:00:00'),
(9102, 'long_momentum_breakout_15m_1h_v1', 'NEW/USDT', 'long', 'candidate', '2026-06-04T00:00:00')
"""
)
pg_conn.execute(
"""
INSERT INTO paper_orders (
id, recommendation_id, symbol, side, order_type, status, target_price,
current_price_at_create, notional_usdt, strategy_code, created_at, updated_at, expires_at
) VALUES
(9201, 9001, 'OLD/USDT', 'long', 'limit', 'pending', 1, 1, 5000, 'volume_ignition_1h_v1', '2026-06-04T00:00:00', '2026-06-04T00:00:00', '2026-06-05T00:00:00'),
(9202, 9002, 'NEW/USDT', 'long', 'limit', 'pending', 1, 1, 5000, 'long_momentum_breakout_15m_1h_v1', '2026-06-04T00:00:00', '2026-06-04T00:00:00', '2026-06-05T00:00:00')
"""
)
pg_conn.execute(
"""
INSERT INTO paper_trades (
id, recommendation_id, symbol, side, status, opened_at, entry_price,
qty, notional_usdt, strategy_code, created_at, updated_at
) VALUES
(9301, 9001, 'OLD/USDT', 'long', 'open', '2026-06-04T00:00:00', 1, 1, 5000, 'volume_ignition_1h_v1', '2026-06-04T00:00:00', '2026-06-04T00:00:00'),
(9302, 9002, 'NEW/USDT', 'long', 'open', '2026-06-04T00:00:00', 1, 1, 5000, 'long_momentum_breakout_15m_1h_v1', '2026-06-04T00:00:00', '2026-06-04T00:00:00')
"""
)
pg_conn.execute(
"""
INSERT INTO paper_trade_events (
id, trade_id, recommendation_id, symbol, event_type, event_time, detail_json, strategy_code
) VALUES
(9401, 9301, 9001, 'OLD/USDT', 'open', '2026-06-04T00:00:00', '{}', 'volume_ignition_1h_v1'),
(9402, 9302, 9002, 'NEW/USDT', 'open', '2026-06-04T00:00:00', '{}', 'long_momentum_breakout_15m_1h_v1')
"""
)
pg_conn.commit()
result = cleanup_legacy_strategy_samples(confirm=CONFIRM_TOKEN, create_backup=False)
assert result["deleted"]["recommendation"] == 1
assert result["deleted"]["strategy_signals"] == 1
assert result["deleted"]["paper_orders"] == 1
assert result["deleted"]["paper_trades"] == 1
assert result["deleted"]["paper_trade_events"] >= 1
assert pg_conn.execute("SELECT COUNT(*) FROM recommendation WHERE symbol='OLD/USDT'").fetchone()[0] == 0
assert pg_conn.execute("SELECT COUNT(*) FROM recommendation WHERE symbol='NEW/USDT'").fetchone()[0] == 1