alphax/tests/test_tracker_terminal_action_guard.py
2026-05-20 00:57:46 +08:00

115 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
# Prepend the parent of this file's directory to sys.path (only once).
# The `app.*` imports that follow rely on it -- presumably the `app`
# package lives in that parent directory.
PROJECT_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir)
)
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)
from app.core.opportunity_lifecycle import apply_entry_quality_gate
from app.db import altcoin_db
def test_terminal_recommendation_action_status_cannot_be_overwritten_by_entry_signal(monkeypatch, tmp_path):
    """A terminal take-profit status must survive a later entry-signal write.

    Scenario: create a recommendation in a throwaway database, force the row
    into the holding/position state, report a tracking price of 0.000685
    (just above the tp1 of 0.000684), then try to overwrite the action status
    with the "buy immediately" label. The stored row is expected to remain
    ``hit_tp1`` / take-profit 1 / ``completed``.

    Args:
        monkeypatch: pytest fixture; redirects ``altcoin_db.DB_PATH`` to the
            temporary database file.
        tmp_path: pytest fixture; per-test temporary directory.
    """
    db_path = tmp_path / "altcoin_monitor.db"
    monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
    altcoin_db.init_db()

    rec_id = altcoin_db.create_recommendation(
        symbol="NOT/USDT",
        rec_state="爆发",
        rec_score=10,
        entry_price=0.000619,
        stop_loss=0.000521,
        tp1=0.000684,
        tp2=0.000726,
        entry_plan={"entry_action": "持有", "entry_price": 0.000628},
    )

    # Adjacent literals keep the function's indentation out of the SQL text.
    # NOTE(review): whether the "%s" placeholder is accepted depends on the
    # connection returned by get_conn(), which is not visible here -- confirm.
    mark_holding_sql = (
        "\n"
        "UPDATE recommendation\n"
        "SET action_status='持有', execution_status='holding',\n"
        "display_bucket='position', lifecycle_state='holding', entry_triggered=1\n"
        "WHERE id=%s\n"
    )

    # Write the holding/position state straight into the row.
    conn = altcoin_db.get_conn()
    try:
        conn.execute(mark_holding_sql, (rec_id,))
        conn.commit()
    finally:
        # Always release the connection, even if the UPDATE or commit raises.
        conn.close()

    # 0.000685 is above tp1; the assertions below expect the row to end up
    # in the hit_tp1 / completed state after this call.
    altcoin_db.update_recommendation_tracking(rec_id, 0.000685)

    # Even if later dynamic-entry logic mistakenly passes "可即刻买入"
    # (buy immediately), the DB layer must keep the take-profit state.
    altcoin_db.update_recommendation_action_status(rec_id, "可即刻买入")

    rows = altcoin_db.get_all_recommendations(limit=10)
    rec = next(r for r in rows if r["id"] == rec_id)
    assert rec["status"] == "hit_tp1"
    assert rec["action_status"] == "止盈1"
    assert rec["execution_status"] == "completed"
def test_ws_tracker_does_not_emit_entry_signal_for_closed_recommendation():
    """The entry quality gate must not let a "buy immediately" action through.

    Calls ``apply_entry_quality_gate`` directly (no database row is created
    here) with an incoming "buy immediately" action and a plan whose own
    entry action is "wait for pullback". The returned action must be one of
    the two non-entry labels, and the returned plan must record the action
    that was blocked.
    """
    requested_action = "可即刻买入"
    gate_kwargs = {
        "action_status": requested_action,
        "entry_plan": {
            "entry_action": "等回踩",
            "entry_price": 0.000628,
            "stop_loss": 0.000521,
            "tp1": 0.000684,
            "rr1": 1.8,
        },
        "signals": ["15min 即刻入场信号"],
        "current_price": 0.000628,
    }

    # The third element of the result is not inspected by this test.
    action, plan, _reasons = apply_entry_quality_gate(**gate_kwargs)

    assert action in ("等回踩", "观察")
    # Implied by the membership check, kept to state the intent explicitly.
    assert action != requested_action
    assert plan["entry_quality_gate"]["blocked_action"] == requested_action
def test_watch_pool_tracking_does_not_mark_stopped_out(monkeypatch, tmp_path):
    """A watch-only recommendation must not be closed by a price below stop loss.

    The recommendation is created with an "observe" entry action and a stop
    loss of 0.12042; a tracking price of 0.119 (below that stop) is then
    reported. The row is expected to stay active, in the observe state, with
    no entry triggered and a pending result.

    Args:
        monkeypatch: pytest fixture; redirects ``altcoin_db.DB_PATH`` to the
            temporary database file.
        tmp_path: pytest fixture; per-test temporary directory.
    """
    monkeypatch.setattr(altcoin_db, "DB_PATH", str(tmp_path / "altcoin_monitor.db"))
    altcoin_db.init_db()

    watch_plan = {
        "entry_action": "观察",
        "entry_price": 0.1338,
        "stop_loss": 0.12042,
        "tp1": 0.177279,
    }
    rec_id = altcoin_db.create_recommendation(
        symbol="STORJ/USDT",
        rec_state="爆发",
        rec_score=12,
        entry_price=0.1449,
        stop_loss=0.12042,
        tp1=0.177279,
        tp2=0.206264,
        entry_plan=watch_plan,
    )

    # Report a price under the stop loss.
    altcoin_db.update_recommendation_tracking(rec_id, 0.119)

    rec = next(
        row
        for row in altcoin_db.get_all_recommendations(limit=10)
        if row["id"] == rec_id
    )

    # Checked in this order; the first mismatch fails the test.
    expected_fields = (
        ("status", "active"),
        ("execution_status", "observe"),
        ("entry_triggered", 0),
        ("recommendation_result", "pending"),
    )
    for field, expected in expected_fields:
        assert rec[field] == expected
def test_watch_pool_tracking_does_not_mark_take_profit(monkeypatch, tmp_path):
    """A watch-only recommendation must not be closed by a price above tp2.

    The recommendation is created with an "observe" entry action, an entry
    price of 100.0 and take-profit levels 105.0 / 110.0; a tracking price of
    112.0 is then reported. The row is expected to stay active, in the
    observe state, with no entry triggered and a pending result, while
    ``pnl_pct`` and ``max_pnl_pct`` both read 12.

    Args:
        monkeypatch: pytest fixture; redirects ``altcoin_db.DB_PATH`` to the
            temporary database file.
        tmp_path: pytest fixture; per-test temporary directory.
    """
    import math  # local import: only needed for the float comparisons below

    db_path = tmp_path / "altcoin_monitor.db"
    monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
    altcoin_db.init_db()

    rec_id = altcoin_db.create_recommendation(
        symbol="WATCH/USDT",
        rec_state="爆发",
        rec_score=12,
        entry_price=100.0,
        stop_loss=90.0,
        tp1=105.0,
        tp2=110.0,
        entry_plan={"entry_action": "观察", "entry_price": 100.0, "stop_loss": 90.0, "tp1": 105.0, "tp2": 110.0},
    )

    # Report a price above both take-profit levels.
    altcoin_db.update_recommendation_tracking(rec_id, 112.0)

    rows = altcoin_db.get_all_recommendations(limit=10)
    rec = next(r for r in rows if r["id"] == rec_id)
    assert rec["status"] == "active"
    assert rec["execution_status"] == "observe"
    assert rec["entry_triggered"] == 0
    # The percentages are computed floats; compare with a tolerance so that
    # representation noise (e.g. 12.000000000000011) does not fail the test.
    assert math.isclose(rec["pnl_pct"], 12.0)
    assert math.isclose(rec["max_pnl_pct"], 12.0)
    assert rec["recommendation_result"] == "pending"