alphax/app/db/data_export.py
2026-05-21 08:42:08 +08:00

126 lines
4.8 KiB
Python

"""Admin data export bundles for offline strategy analysis."""
from __future__ import annotations
import io
import json
import zipfile
from datetime import date, datetime, timedelta
from decimal import Decimal
from app.db.schema import get_conn
# Time-windowed tables included in every export bundle.
#
# Maps table name -> (timestamp column used for the cutoff filter and for
# newest-first ordering, maximum number of rows exported, human-readable
# description copied into the manifest). Insertion order is the order in
# which the tables are queried and written to the archive.
RECENT_TABLES: dict[str, tuple[str, int, str]] = {
    # Recommendations, screening and tracking.
    "recommendation": ("rec_time", 5_000, "recommendations and lifecycle state"),
    "screening_log": ("scan_time", 10_000, "screening funnel rows"),
    "coin_state": ("detected_at", 5_000, "latest detected coin states"),
    "price_tracking": ("track_time", 10_000, "recommendation tracking samples"),
    # Paper trading.
    "paper_orders": ("created_at", 5_000, "pending/filled/canceled order simulation"),
    "paper_trades": ("opened_at", 5_000, "trade ledger"),
    "paper_trade_events": ("event_time", 10_000, "trade lifecycle events"),
    # Operations and review.
    "cron_run_log": ("started_at", 2_000, "scheduler run logs"),
    "review_log": ("review_time", 2_000, "review records"),
    "missed_explosions": ("detected_at", 5_000, "missed explosion review samples"),
    # Strategy iteration.
    "strategy_iteration_log": ("created_at", 2_000, "strategy iteration history"),
    "strategy_rule_candidate": ("created_at", 5_000, "candidate strategy rules"),
    "strategy_failure_pattern": ("created_at", 5_000, "failure pattern records"),
    # Notifications, sentiment, news and on-chain signals.
    "push_log": ("pushed_at", 5_000, "notification/push decisions"),
    "sentiment_events": ("detected_at", 5_000, "sentiment events"),
    "llm_insights": ("created_at", 5_000, "LLM analysis cache"),
    "event_news": ("detected_at", 5_000, "news/event candidates"),
    "onchain_events": ("detected_at", 5_000, "normalized on-chain events"),
    "latest_price_cache": ("updated_at", 2_000, "latest price cache"),
}
# Configuration tables exported as a plain snapshot, with no time filter.
#
# Maps table name -> (maximum number of rows exported, human-readable
# description copied into the manifest).
SNAPSHOT_TABLES: dict[str, tuple[int, str]] = {
    "strategy_runtime_config": (1_000, "strategy runtime config snapshot"),
    "system_config": (1_000, "system runtime config snapshot"),
    "scheduler_job_config": (200, "scheduler config snapshot"),
}
def _json_default(value):
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
return str(value)
def _rows_to_dicts(rows) -> list[dict]:
return [dict(row) for row in rows]
def _fetch_recent(conn, table: str, time_col: str, cutoff: str, limit: int) -> list[dict]:
    """Return the newest rows of *table* whose *time_col* is at or after *cutoff*.

    Rows are ordered newest first and capped at *limit*. ``cutoff`` and
    ``limit`` are sent as bound query parameters, whereas ``table`` and
    ``time_col`` are interpolated directly into the SQL text, so they must
    only ever come from trusted, hard-coded names.
    """
    sql = (
        "\n"
        "SELECT *\n"
        f"FROM {table}\n"
        f"WHERE {time_col} >= %s\n"
        f"ORDER BY {time_col} DESC\n"
        "LIMIT %s\n"
    )
    cursor = conn.execute(sql, (cutoff, limit))
    return _rows_to_dicts(cursor.fetchall())
def _fetch_snapshot(conn, table: str, limit: int) -> list[dict]:
    """Return at most *limit* rows of *table*, with no filter or ordering.

    ``limit`` is sent as a bound query parameter; ``table`` is interpolated
    directly into the SQL text and must therefore be a trusted, hard-coded
    name. The query has no ORDER BY, so when the table holds more than
    *limit* rows the database decides which ones are returned.
    """
    sql = f"SELECT * FROM {table} LIMIT %s"
    fetched = conn.execute(sql, (limit,)).fetchall()
    return _rows_to_dicts(fetched)
def _write_json(zf: zipfile.ZipFile, path: str, payload) -> None:
    """Serialize *payload* as indented JSON and store it in *zf* under *path*.

    Non-ASCII characters are written as-is rather than escaped, and values
    the JSON encoder cannot handle are converted by ``_json_default``.
    """
    # Serialize first so an encoding failure leaves no entry in the archive.
    text = json.dumps(
        payload,
        default=_json_default,
        indent=2,
        ensure_ascii=False,
    )
    zf.writestr(path, text)
def _rollback_quietly(conn) -> None:
    """Roll back *conn* if it offers ``rollback()``, ignoring any failure."""
    rollback = getattr(conn, "rollback", None)
    if not callable(rollback):
        return
    try:
        rollback()
    except Exception:
        # Best-effort cleanup only: the error that triggered the rollback is
        # already recorded in the manifest and must not be masked by this one.
        pass


def build_data_export_bundle(hours: int = 24) -> tuple[str, bytes, dict]:
    """Build an in-memory ZIP bundle of recent table rows and config snapshots.

    Each table in ``RECENT_TABLES`` is written to ``tables/<table>.json``
    with the rows at or after the cutoff (newest first, capped by the
    table's limit). Each table in ``SNAPSHOT_TABLES`` is written to
    ``snapshots/<table>.json``. A ``manifest.json`` describing every table
    is written last.

    A failure on one table is recorded in the manifest as
    ``{"error": ..., "description": ...}`` and the export continues with the
    next table; no archive entry is written for the failed table.

    Args:
        hours: Look-back window in hours. Falsy values (``None``, ``0``)
            fall back to 24; the value is then clamped to 1..2160 (90 days).

    Returns:
        A ``(filename, zip_bytes, manifest)`` tuple. ``filename`` encodes the
        generation time and the effective window, ``zip_bytes`` is the full
        archive, and ``manifest`` is the same dictionary stored in
        ``manifest.json``.

    Raises:
        ValueError, TypeError: If *hours* cannot be converted with ``int()``.

    Errors raised by ``get_conn()`` or while writing ``manifest.json`` are
    not caught and propagate to the caller.
    """
    hours = max(1, min(int(hours or 24), 24 * 90))
    # NOTE(review): this is naive local time and is sent to the database as an
    # ISO string -- confirm it matches the time zone the time columns use.
    generated_at = datetime.now()
    cutoff = (generated_at - timedelta(hours=hours)).isoformat()
    manifest = {
        "generated_at": generated_at.isoformat(),
        "window_hours": hours,
        "cutoff": cutoff,
        "format": "json_zip",
        "tables": {},
    }
    buffer = io.BytesIO()
    conn = get_conn()
    try:
        with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for table, (time_col, limit, description) in RECENT_TABLES.items():
                try:
                    rows = _fetch_recent(conn, table, time_col, cutoff, limit)
                    _write_json(zf, f"tables/{table}.json", rows)
                    manifest["tables"][table] = {
                        "mode": "recent",
                        "time_column": time_col,
                        "rows": len(rows),
                        "limit": limit,
                        "description": description,
                    }
                except Exception as exc:
                    manifest["tables"][table] = {"error": str(exc), "description": description}
                    # On transactional drivers (e.g. PostgreSQL) a failed
                    # statement leaves the transaction aborted, which would
                    # make every later table fail too. Reset it so one bad
                    # table does not poison the rest of the export.
                    _rollback_quietly(conn)
            for table, (limit, description) in SNAPSHOT_TABLES.items():
                try:
                    rows = _fetch_snapshot(conn, table, limit)
                    _write_json(zf, f"snapshots/{table}.json", rows)
                    manifest["tables"][table] = {
                        "mode": "snapshot",
                        "rows": len(rows),
                        "limit": limit,
                        "description": description,
                    }
                except Exception as exc:
                    manifest["tables"][table] = {"error": str(exc), "description": description}
                    # Same reasoning as above: keep later snapshots usable.
                    _rollback_quietly(conn)
            # Written last so it reflects the outcome of every table.
            _write_json(zf, "manifest.json", manifest)
    finally:
        conn.close()
    filename = f"alphax_export_{generated_at.strftime('%Y%m%d_%H%M%S')}_{hours}h.zip"
    return filename, buffer.getvalue(), manifest
# Public API: ``from ... import *`` exposes only the bundle builder.
__all__ = ["build_data_export_bundle"]