alphax/scripts/validate_push_state_flow.py
2026-05-16 14:52:10 +08:00

99 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""AlphaX 推送与状态流转口径审计脚本。"""
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from app.db.schema import get_conn, init_db
init_db()
conn = get_conn()
errors = []
warnings = []
def rows(sql, params=()):
return [dict(r) for r in conn.execute(sql, params).fetchall()]
# 1. 仍在 active 的同一 symbol 只能有一条实时/观察主记录。
dups = rows("""
SELECT symbol, COUNT(*) c, STRING_AGG(id::text, ',') ids
FROM recommendation
WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'
GROUP BY symbol HAVING COUNT(*) > 1
""")
if dups:
errors.append({"rule": "active_symbol_unique", "rows": dups[:30]})
# 2. 推送必须可追溯到推荐主记录。历史旧数据可能 rec_id=0但新推送不应继续出现。
missing_rec_id = rows("""
SELECT id, symbol, push_type, action_status, pushed_at
FROM push_log
WHERE COALESCE(rec_id,0)=0
ORDER BY id DESC
LIMIT 20
""")
if missing_rec_id:
warnings.append({"rule": "push_log_missing_rec_id_legacy_or_bug", "rows": missing_rec_id})
# 3. 入场窗口 rec_time 不应被同一状态反复刷新:同一 rec_id/action 冷却期内重复推送要告警。
duplicate_push = rows("""
SELECT rec_id, symbol, push_type, action_status, COUNT(*) c, MIN(pushed_at) first_push, MAX(pushed_at) last_push
FROM push_log
WHERE COALESCE(rec_id,0) > 0
GROUP BY rec_id, symbol, push_type, action_status
HAVING COUNT(*) > 1
ORDER BY c DESC, last_push DESC
LIMIT 30
""")
if duplicate_push:
errors.append({"rule": "duplicate_same_rec_action_push", "rows": duplicate_push})
# 4. 可即刻买入必须有 rec_time/entry_price/current_price且展示桶一致。
bad_buy_now = rows("""
SELECT id, symbol, rec_time, entry_price, current_price, action_status, execution_status, display_bucket
FROM recommendation
WHERE action_status='可即刻买入'
AND NOT (execution_status='buy_now' AND display_bucket='realtime' AND COALESCE(entry_price,0)>0 AND COALESCE(current_price,0)>0 AND COALESCE(rec_time,'')!='')
LIMIT 30
""")
if bad_buy_now:
errors.append({"rule": "bad_buy_now_state", "rows": bad_buy_now})
# 5. 失效/止损/反转/衰减不能留在实时/观察池。
bad_invalid = rows("""
SELECT id, symbol, status, action_status, execution_status, display_bucket
FROM recommendation
WHERE (status IN ('expired','stopped_out','invalid','archived') OR action_status IN ('衰减','反转','止损','放弃','过期','归档'))
AND COALESCE(display_bucket,'') IN ('realtime','watch_pool')
LIMIT 30
""")
if bad_invalid:
errors.append({"rule": "invalid_still_visible", "rows": bad_invalid})
rec_counts = dict(conn.execute("""
SELECT
COUNT(*) AS recommendation_count,
SUM(CASE WHEN status='active' AND COALESCE(display_bucket,'watch_pool')!='history' THEN 1 ELSE 0 END) AS active_mainline_count,
SUM(CASE WHEN action_status='可即刻买入' THEN 1 ELSE 0 END) AS buy_now_count
FROM recommendation
""").fetchone())
push_counts = dict(conn.execute("""
SELECT
COUNT(*) AS push_count,
SUM(CASE WHEN COALESCE(rec_id,0)=0 THEN 1 ELSE 0 END) AS push_missing_rec_id_count
FROM push_log
""").fetchone())
summary = {
"errors": errors,
"warnings": warnings,
"counts": {**rec_counts, **push_counts},
}
print(json.dumps(summary, ensure_ascii=False, indent=2))
conn.close()
if errors:
raise SystemExit(1)