#!/usr/bin/env python3 """AlphaX 推送与状态流转口径审计脚本。""" import json import os import sqlite3 from pathlib import Path ROOT = Path(__file__).resolve().parents[1] DB = Path(os.getenv("ALPHAX_DB_PATH", str(ROOT / "data" / "altcoin_monitor.db"))) conn = sqlite3.connect(DB) conn.row_factory = sqlite3.Row errors = [] warnings = [] def rows(sql, params=()): return [dict(r) for r in conn.execute(sql, params).fetchall()] # 1. 仍在 active 的同一 symbol 只能有一条实时/观察主记录。 dups = rows(""" SELECT symbol, COUNT(*) c, GROUP_CONCAT(id) ids FROM recommendation WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history' GROUP BY symbol HAVING COUNT(*) > 1 """) if dups: errors.append({"rule": "active_symbol_unique", "rows": dups[:30]}) # 2. 推送必须可追溯到推荐主记录。历史旧数据可能 rec_id=0,但新推送不应继续出现。 missing_rec_id = rows(""" SELECT id, symbol, push_type, action_status, pushed_at FROM push_log WHERE COALESCE(rec_id,0)=0 ORDER BY id DESC LIMIT 20 """) if missing_rec_id: warnings.append({"rule": "push_log_missing_rec_id_legacy_or_bug", "rows": missing_rec_id}) # 3. 入场窗口 rec_time 不应被同一状态反复刷新:同一 rec_id/action 冷却期内重复推送要告警。 duplicate_push = rows(""" SELECT rec_id, symbol, push_type, action_status, COUNT(*) c, MIN(pushed_at) first_push, MAX(pushed_at) last_push FROM push_log WHERE COALESCE(rec_id,0) > 0 GROUP BY rec_id, push_type, action_status HAVING COUNT(*) > 1 ORDER BY c DESC, last_push DESC LIMIT 30 """) if duplicate_push: errors.append({"rule": "duplicate_same_rec_action_push", "rows": duplicate_push}) # 4. 可即刻买入必须有 rec_time/entry_price/current_price,且展示桶一致。 bad_buy_now = rows(""" SELECT id, symbol, rec_time, entry_price, current_price, action_status, execution_status, display_bucket FROM recommendation WHERE action_status='可即刻买入' AND NOT (execution_status='buy_now' AND display_bucket='realtime' AND COALESCE(entry_price,0)>0 AND COALESCE(current_price,0)>0 AND COALESCE(rec_time,'')!='') LIMIT 30 """) if bad_buy_now: errors.append({"rule": "bad_buy_now_state", "rows": bad_buy_now}) # 5. 失效/止损/反转/衰减不能留在实时/观察池。 bad_invalid = rows(""" SELECT id, symbol, status, action_status, execution_status, display_bucket FROM recommendation WHERE (status IN ('expired','stopped_out','invalid','archived') OR action_status IN ('衰减','反转','止损','放弃','过期','归档')) AND COALESCE(display_bucket,'') IN ('realtime','watch_pool') LIMIT 30 """) if bad_invalid: errors.append({"rule": "invalid_still_visible", "rows": bad_invalid}) rec_counts = dict(conn.execute(""" SELECT COUNT(*) AS recommendation_count, SUM(CASE WHEN status='active' AND COALESCE(display_bucket,'watch_pool')!='history' THEN 1 ELSE 0 END) AS active_mainline_count, SUM(CASE WHEN action_status='可即刻买入' THEN 1 ELSE 0 END) AS buy_now_count FROM recommendation """).fetchone()) push_counts = dict(conn.execute(""" SELECT COUNT(*) AS push_count, SUM(CASE WHEN COALESCE(rec_id,0)=0 THEN 1 ELSE 0 END) AS push_missing_rec_id_count FROM push_log """).fetchone()) summary = { "errors": errors, "warnings": warnings, "counts": {**rec_counts, **push_counts}, } print(json.dumps(summary, ensure_ascii=False, indent=2)) conn.close() if errors: raise SystemExit(1)