alphax/scripts/validate_state_machine.py
2026-05-16 14:52:10 +08:00

69 lines
2.7 KiB
Python

#!/usr/bin/env python3
"""AlphaX 状态机口径验收脚本。"""
import json, sys
from pathlib import Path
# Put the directory one level above this script's directory on sys.path
# (presumably so the `app` package resolves when the script is run directly).
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# This import must stay below the sys.path adjustment above.
from app.db.schema import get_conn, init_db

init_db()
conn = get_conn()
# Rule violations found by the checks further down; each entry is a dict
# with a 'rule' name and the offending 'rows'.
errors = []
def scalar(sql, params=()):
    """Run *sql* on the module-level connection and return a single value.

    Args:
        sql: SQL statement to execute.
        params: Bind parameters passed through to ``conn.execute``.

    Returns:
        The first column of the first result row, or ``None`` when the
        query produces no rows.
    """
    row = conn.execute(sql, params).fetchone()
    # fetchone() yields None for an empty result set; indexing it would
    # raise TypeError, so report "no value" explicitly instead.
    if row is None:
        return None
    return row[0]
# Run every consistency check and build the summary inside try/finally so the
# connection is closed even when a query or the JSON serialisation fails.
try:
    # 1. Unique current state: a symbol must not have more than one active
    #    row outside the 'history' bucket (a NULL bucket counts as 'watch_pool').
    dups = conn.execute("""
SELECT symbol, COUNT(*) c, STRING_AGG(id::text, ',') ids
FROM recommendation
WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'
GROUP BY symbol HAVING COUNT(*) > 1
""").fetchall()
    if dups:
        # Only the first 20 offending symbols are included in the report.
        errors.append({'rule': 'active_symbol_unique', 'rows': [dict(r) for r in dups[:20]]})

    # 2. The realtime and watch_pool buckets must not contain rows whose
    #    status or action_status marks them as invalidated or historical.
    bad_realtime = conn.execute("""
SELECT id,symbol,status,action_status,display_bucket,execution_status
FROM recommendation
WHERE display_bucket IN ('realtime','watch_pool')
AND (status IN ('expired','stopped_out','invalid','archived') OR action_status IN ('衰减','反转','止损','放弃','过期','归档'))
LIMIT 50
""").fetchall()
    if bad_realtime:
        errors.append({'rule': 'no_invalid_in_realtime_or_watch_pool', 'rows': [dict(r) for r in bad_realtime]})

    # 3. Rows still in the "wait for pullback" / "watch" action states must
    #    not be flagged as entry-triggered.
    bad_unexecuted = conn.execute("""
SELECT id,symbol,status,action_status,display_bucket,execution_status,entry_triggered
FROM recommendation
WHERE action_status IN ('等回踩','观察') AND COALESCE(entry_triggered,0) != 0
LIMIT 50
""").fetchall()
    if bad_unexecuted:
        errors.append({'rule': 'watch_wait_not_executed', 'rows': [dict(r) for r in bad_unexecuted]})

    # 4. "Buy now" rows must be in the realtime bucket with execution_status
    #    'buy_now'. IS DISTINCT FROM is used because a plain
    #    NOT (a = x AND b = y) evaluates to NULL when either column is NULL,
    #    which would silently exclude exactly the rows this rule should catch.
    bad_buy = conn.execute("""
SELECT id,symbol,status,action_status,display_bucket,execution_status
FROM recommendation
WHERE action_status='可即刻买入'
AND (display_bucket IS DISTINCT FROM 'realtime' OR execution_status IS DISTINCT FROM 'buy_now')
LIMIT 50
""").fetchall()
    if bad_buy:
        errors.append({'rule': 'buy_now_bucket_consistency', 'rows': [dict(r) for r in bad_buy]})

    # Per-bucket row counts plus every violation collected above.
    summary = {
        'recommendation_count': scalar('SELECT COUNT(*) FROM recommendation'),
        'realtime_count': scalar("SELECT COUNT(*) FROM recommendation WHERE display_bucket='realtime'"),
        'watch_pool_count': scalar("SELECT COUNT(*) FROM recommendation WHERE display_bucket='watch_pool'"),
        'position_count': scalar("SELECT COUNT(*) FROM recommendation WHERE display_bucket='position'"),
        'history_count': scalar("SELECT COUNT(*) FROM recommendation WHERE display_bucket='history'"),
        'errors': errors,
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
finally:
    conn.close()

# A non-zero exit status signals that at least one rule was violated.
if errors:
    sys.exit(1)