alphax/app/db/universe_audit.py
2026-05-29 08:51:48 +08:00

230 lines
8.4 KiB
Python

"""Universe cache and screening coverage audit queries."""
from __future__ import annotations
import json
from datetime import datetime, timedelta
from typing import Iterable
from app.db.postgres_connection import ensure_migrations_once
from app.db.schema import get_conn
# Reason-code groups consulted by reason_type_for(). A code found in none of
# these sets is classified as "dynamic" (the same as DYNAMIC_REASON_CODES).
STATIC_REASON_CODES = {
    "excluded_base",
    "inactive_market",
    "invalid_pair",
    "non_ascii",
    "stablecoin",
    "wrapped",
}
TRANSIENT_REASON_CODES = {
    "stale_ticker",
}
DYNAMIC_REASON_CODES = {
    "low_turnover",
}
def _now() -> datetime:
return datetime.now()
def _iso(value: datetime | None = None) -> str:
return (value or _now()).isoformat(timespec="seconds")
def reason_type_for(code: str) -> str:
    """Classify a universe reason code as "static", "transient" or "dynamic".

    The code is stringified and stripped first; ``None``/empty input and any
    code not listed in the static or transient sets resolve to "dynamic".
    """
    normalized = str(code or "").strip()
    for codes, label in (
        (STATIC_REASON_CODES, "static"),
        (TRANSIENT_REASON_CODES, "transient"),
    ):
        if normalized in codes:
            return label
    # Codes in DYNAMIC_REASON_CODES and unknown codes share this outcome.
    return "dynamic"
def expires_at_for(reason_type: str, now: datetime | None = None) -> str:
    """Return the ISO-8601 (seconds) expiry timestamp for a cached decision.

    The TTL depends on *reason_type*:

    * "static": 90 days
    * "transient": 1 hour
    * anything else: 6 hours

    The TTL is counted from *now*, or from the current time when *now* is
    omitted.
    """
    start = now if now else _now()
    if reason_type == "static":
        ttl = timedelta(days=90)
    elif reason_type == "transient":
        ttl = timedelta(hours=1)
    else:
        ttl = timedelta(hours=6)
    return (start + ttl).isoformat(timespec="seconds")
def _json(data) -> str:
return json.dumps(data or {}, ensure_ascii=False)
def get_active_static_exclusions(symbols: Iterable[str]) -> dict[str, dict]:
    """Return active long-lived ("static") cached exclusions for current Binance symbols.

    A thin wrapper around ``get_active_universe_exclusions`` that restricts
    the lookup to the "static" reason type.
    """
    static_only = ("static",)
    return get_active_universe_exclusions(symbols, reason_types=static_only)
def get_active_universe_exclusions(
    symbols: Iterable[str],
    *,
    reason_types: Iterable[str] = ("static", "dynamic", "transient"),
) -> dict[str, dict]:
    """Return active cached universe exclusions, keyed by symbol.

    A cached row counts as active when all of the following hold:

    * its decision is ``'excluded'``;
    * its ``reason_type`` is one of *reason_types*;
    * it has not been manually overridden (``manual_override=0``);
    * its ``expires_at`` is empty or not yet in the past.

    Static exclusions can be applied unconditionally. Dynamic/transient rows
    should still be rechecked against the latest ticker evidence by callers.

    Args:
        symbols: Symbols to look up. Each is upper-cased and stripped;
            blank entries are dropped and duplicates removed.
        reason_types: Reason types to include. Blank entries are dropped and
            duplicates removed.

    Returns:
        Mapping of symbol to row dict. Each row gains an ``"evidence"`` key
        holding the decoded ``evidence_json`` (``{}`` when it is missing or
        not valid JSON). The mapping is empty when no usable symbols or
        reason types were given; in that case the database is not touched.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # IN (...) lists carry no redundant placeholders.
    symbol_list = list(dict.fromkeys(
        str(s or "").upper().strip() for s in symbols if str(s or "").strip()
    ))
    if not symbol_list:
        return {}
    type_list = list(dict.fromkeys(
        str(x or "").strip() for x in reason_types if str(x or "").strip()
    ))
    if not type_list:
        return {}
    ensure_migrations_once()
    now = _iso()
    placeholders = ",".join(["%s"] * len(symbol_list))
    type_placeholders = ",".join(["%s"] * len(type_list))
    conn = get_conn()
    try:
        # NOTE: expires_at is compared as a string; this assumes rows store it
        # in the same ISO-8601 format that _iso() produces.
        rows = conn.execute(
            f"""
            SELECT *
            FROM symbol_universe_cache
            WHERE symbol IN ({placeholders})
              AND decision='excluded'
              AND reason_type IN ({type_placeholders})
              AND manual_override=0
              AND (expires_at='' OR expires_at >= %s)
            """,
            tuple(symbol_list) + tuple(type_list) + (now,),
        ).fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    result = {}
    for row in rows:
        item = dict(row)
        try:
            item["evidence"] = json.loads(item.get("evidence_json") or "{}")
        except (TypeError, ValueError):
            # Corrupt or non-text evidence must not hide the exclusion itself.
            item["evidence"] = {}
        result[item["symbol"]] = item
    return result
# Upsert for one universe decision. New rows start with hit_count=1 and
# manual_override=0. On conflict, the existing row's first_seen_at, quote and
# manual_override are kept, and hit_count is incremented.
_UPSERT_UNIVERSE_DECISION_SQL = """
    INSERT INTO symbol_universe_cache (
        symbol, base, quote, decision, reason_code, reason_label, reason_type,
        source, evidence_json, first_seen_at, last_seen_at, expires_at, hit_count, manual_override
    )
    VALUES (%s, %s, 'USDT', 'excluded', %s, %s, %s, %s, %s, %s, %s, %s, 1, 0)
    ON CONFLICT(symbol) DO UPDATE SET
        base=EXCLUDED.base,
        decision=EXCLUDED.decision,
        reason_code=EXCLUDED.reason_code,
        reason_label=EXCLUDED.reason_label,
        reason_type=EXCLUDED.reason_type,
        source=EXCLUDED.source,
        evidence_json=EXCLUDED.evidence_json,
        last_seen_at=EXCLUDED.last_seen_at,
        expires_at=EXCLUDED.expires_at,
        hit_count=symbol_universe_cache.hit_count + 1
"""


def _universe_decision_params(item: dict, *, source: str, now_dt: datetime, now: str) -> tuple:
    """Normalize one decision dict into the parameter tuple for the upsert SQL."""
    symbol = str(item.get("symbol") or "").upper().strip()
    # Default base: the part before the first "/" (the whole symbol if none).
    base = str(item.get("base") or symbol.split("/")[0]).upper().strip()
    reason_code = str(item.get("reason_code") or "").strip()
    # The fallback label (Chinese for "universe filter") is stored verbatim.
    reason_label = str(item.get("reason_label") or reason_code or "宇宙过滤").strip()
    rtype = str(item.get("reason_type") or reason_type_for(reason_code)).strip()
    expires_at = str(item.get("expires_at") or expires_at_for(rtype, now_dt)).strip()
    evidence = {
        "price": item.get("price", 0),
        "volume_24h": item.get("volume_24h", 0),
        "change_24h": item.get("change_24h", 0),
        "cache_hit": bool(item.get("cache_hit")),
        "min_volume": item.get("min_volume", 0),
    }
    # first_seen_at and last_seen_at both receive `now`; on conflict only
    # last_seen_at is overwritten.
    return (symbol, base, reason_code, reason_label, rtype, source, _json(evidence), now, now, expires_at)


def record_universe_decisions(items: Iterable[dict], *, source: str = "screener") -> int:
    """Upsert universe filter decisions for later scans and audit.

    Every item is recorded as an ``'excluded'`` decision with quote
    ``'USDT'``. Missing ``reason_type`` and ``expires_at`` values are derived
    via ``reason_type_for`` and ``expires_at_for``. All rows are committed in
    a single transaction.

    Args:
        items: Decision dicts. Items that are ``None`` or whose ``symbol`` is
            missing or blank after stripping are skipped.
        source: Value stored in the ``source`` column.

    Returns:
        The number of rows upserted. When nothing is usable, the result is 0
        and the database is not touched.
    """
    rows = [dict(item or {}) for item in items]
    # Skip blank/whitespace-only symbols so they are never stored as ''.
    rows = [row for row in rows if str(row.get("symbol") or "").strip()]
    if not rows:
        return 0
    ensure_migrations_once()
    now_dt = _now()
    now = _iso(now_dt)
    # Normalize everything before opening a connection, so a bad item cannot
    # leave a connection open.
    params = [
        _universe_decision_params(row, source=source, now_dt=now_dt, now=now)
        for row in rows
    ]
    conn = get_conn()
    try:
        for row_params in params:
            conn.execute(_UPSERT_UNIVERSE_DECISION_SQL, row_params)
        conn.commit()
    finally:
        # Always release the connection. On failure it is closed uncommitted.
        conn.close()
    return len(params)
def record_screening_coverage(metrics: dict) -> int:
"""Persist one coverage snapshot for a screener run."""
ensure_migrations_once()
data = dict(metrics or {})
now = _iso()
started = str(data.get("scan_started_at") or now)
finished = str(data.get("scan_finished_at") or now)
detail = data.get("detail") or {}
conn = get_conn()
row = conn.execute(
"""
INSERT INTO screening_coverage_audit (
scan_started_at, scan_finished_at, source, status,
raw_ticker_count, usdt_pair_count, tradable_universe_count,
cached_exclusion_count, universe_gate_count, static_exclusion_count,
dynamic_exclusion_count, low_turnover_count, stale_ticker_count,
kline_attempt_count, kline_h1_success_count, kline_h4_success_count,
coarse_candidate_count, fine_qualified_count, quality_rejected_count,
top_gainer_discovery_count, detail_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
started,
finished,
str(data.get("source") or "binance_spot_usdt_market"),
str(data.get("status") or "completed"),
int(data.get("raw_ticker_count") or 0),
int(data.get("usdt_pair_count") or 0),
int(data.get("tradable_universe_count") or 0),
int(data.get("cached_exclusion_count") or 0),
int(data.get("universe_gate_count") or 0),
int(data.get("static_exclusion_count") or 0),
int(data.get("dynamic_exclusion_count") or 0),
int(data.get("low_turnover_count") or 0),
int(data.get("stale_ticker_count") or 0),
int(data.get("kline_attempt_count") or 0),
int(data.get("kline_h1_success_count") or 0),
int(data.get("kline_h4_success_count") or 0),
int(data.get("coarse_candidate_count") or 0),
int(data.get("fine_qualified_count") or 0),
int(data.get("quality_rejected_count") or 0),
int(data.get("top_gainer_discovery_count") or 0),
_json(detail),
),
).fetchone()
conn.commit()
conn.close()
return int(row["id"] if row else 0)
def _clamp_positive_int(value, *, default: int, upper: int) -> int:
    """Coerce *value* into the range [1, upper].

    Falsy input is replaced by *default* before clamping. Input that
    ``int()`` cannot convert returns *default* as-is.
    """
    try:
        return max(1, min(int(value or default), upper))
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers float("inf"). The original broad handler also
        # fell back to the default in that case.
        return default


def list_screening_coverage(hours: int = 24, limit: int = 50) -> list[dict]:
    """Return recent screening coverage snapshots, newest first.

    Args:
        hours: Look-back window on ``scan_started_at``, clamped to
            1..720 (30 days). Falsy or non-integer input falls back to 24.
        limit: Maximum number of rows, clamped to 1..200. Falsy or
            non-integer input falls back to 50.

    Returns:
        Row dicts ordered by ``scan_started_at`` then ``id``, both
        descending. ``detail_json`` is decoded into an object (``{}`` when
        it is missing or not valid JSON).
    """
    ensure_migrations_once()
    hours = _clamp_positive_int(hours, default=24, upper=24 * 30)
    limit = _clamp_positive_int(limit, default=50, upper=200)
    since = (_now() - timedelta(hours=hours)).isoformat(timespec="seconds")
    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT *
            FROM screening_coverage_audit
            WHERE scan_started_at >= %s
            ORDER BY scan_started_at DESC, id DESC
            LIMIT %s
            """,
            (since, limit),
        ).fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()
    result = []
    for row in rows:
        item = dict(row)
        try:
            item["detail_json"] = json.loads(item.get("detail_json") or "{}")
        except (TypeError, ValueError):
            item["detail_json"] = {}
        result.append(item)
    return result