#!/usr/bin/env bash set -euo pipefail # Rebuild the AlphaX volume DB from a database file placed in the project root. # # This is different from a raw copy: # - it creates a brand-new clean SQLite DB # - it recreates schema from the root DB # - it copies readable table rows into the clean DB # - it can salvage partial data from a DB that fails integrity_check # # Server usage: # bash scripts/rebuild_volume_db_from_root_db.sh # # Defaults: # source: ./altcoin_monitor.db # target: ./data/altcoin_monitor.db # # Overrides: # ROOT_DB=./backup.db bash scripts/rebuild_volume_db_from_root_db.sh # STRICT=1 bash scripts/rebuild_volume_db_from_root_db.sh # fail if any table/row cannot be copied # RESTART=0 bash scripts/rebuild_volume_db_from_root_db.sh ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" cd "$ROOT_DIR" COMPOSE_CMD="${COMPOSE_CMD:-docker compose}" ROOT_DB="${ROOT_DB:-altcoin_monitor.db}" TARGET_DB="${TARGET_DB:-data/altcoin_monitor.db}" BACKUP_DIR="${BACKUP_DIR:-data/backups}" RESTART="${RESTART:-1}" STRICT="${STRICT:-0}" WEB_SERVICE="${WEB_SERVICE:-alphax-web}" SCHEDULER_SERVICE="${SCHEDULER_SERVICE:-alphax-scheduler}" compose() { ${COMPOSE_CMD} "$@" } info() { echo "[rebuild-db] $*" } die() { echo "ERROR: $*" >&2 exit 1 } service_exists() { compose config --services 2>/dev/null | grep -qx "$1" } [ -f "$ROOT_DB" ] || die "root database not found: $ROOT_DIR/$ROOT_DB" [ -s "$ROOT_DB" ] || die "root database is empty: $ROOT_DIR/$ROOT_DB" command -v python3 >/dev/null 2>&1 || die "python3 is required on the server" mkdir -p "$(dirname "$TARGET_DB")" "$BACKUP_DIR" STAMP="$(date +%Y%m%d_%H%M%S)" TEMP_DB="${BACKUP_DIR}/altcoin_monitor.rebuilt_from_root.${STAMP}.tmp.db" REPORT_FILE="${BACKUP_DIR}/altcoin_monitor.rebuild_report.${STAMP}.json" info "source root db: $ROOT_DIR/$ROOT_DB" info "target volume db: $ROOT_DIR/$TARGET_DB" info "root db candidates:" ls -lh "$ROOT_DB"* 2>/dev/null || true if [ "$RESTART" = "1" ]; then info "stopping compose services before replacing SQLite DB" if service_exists "$SCHEDULER_SERVICE"; then compose stop "$SCHEDULER_SERVICE" >/dev/null 2>&1 || true fi if service_exists "$WEB_SERVICE"; then compose stop "$WEB_SERVICE" >/dev/null 2>&1 || true fi fi info "rebuilding a clean SQLite DB from readable source data" ROOT_DB="$ROOT_DB" TEMP_DB="$TEMP_DB" REPORT_FILE="$REPORT_FILE" STRICT="$STRICT" python3 - <<'PY' import json import os import sqlite3 import sys import traceback src_path = os.environ["ROOT_DB"] dst_path = os.environ["TEMP_DB"] report_file = os.environ["REPORT_FILE"] strict = os.environ.get("STRICT") == "1" for path in (dst_path, dst_path + "-wal", dst_path + "-shm"): try: os.remove(path) except FileNotFoundError: pass report = { "source": src_path, "target_temp": dst_path, "strict": strict, "source_integrity": None, "target_integrity": None, "tables": [], "schema_errors": [], "index_errors": [], "fatal": None, } def quote_ident(name: str) -> str: return '"' + str(name).replace('"', '""') + '"' def record_table(name, copied=0, skipped=0, status="ok", error=""): report["tables"].append({ "table": name, "copied_rows": int(copied), "skipped_rows": int(skipped), "status": status, "error": str(error)[:2000], }) def write_report(): with open(report_file, "w", encoding="utf-8") as f: json.dump(report, f, ensure_ascii=False, indent=2) try: src = sqlite3.connect(f"file:{src_path}?mode=ro", uri=True, timeout=30) src.row_factory = sqlite3.Row dst = sqlite3.connect(dst_path, timeout=30) dst.row_factory = sqlite3.Row dst.execute("PRAGMA foreign_keys=OFF") dst.execute("PRAGMA journal_mode=DELETE") dst.execute("PRAGMA synchronous=OFF") try: report["source_integrity"] = src.execute("PRAGMA integrity_check").fetchone()[0] except Exception as exc: report["source_integrity"] = f"failed: {exc}" try: objects = src.execute(""" SELECT type, name, tbl_name, sql FROM sqlite_master WHERE sql IS NOT NULL AND name NOT LIKE 'sqlite_autoindex%' ORDER BY CASE type WHEN 'table' THEN 0 WHEN 'index' THEN 1 WHEN 'trigger' THEN 2 ELSE 3 END, name """).fetchall() except Exception as exc: report["fatal"] = f"cannot read sqlite_master: {exc}" write_report() raise SystemExit(report["fatal"]) table_objects = [ dict(row) for row in objects if row["type"] == "table" and not str(row["name"]).startswith("sqlite_") ] late_objects = [ dict(row) for row in objects if row["type"] in ("index", "trigger", "view") and not str(row["name"]).startswith("sqlite_") ] for obj in table_objects: try: dst.execute(obj["sql"]) except Exception as exc: report["schema_errors"].append({"object": obj["name"], "error": str(exc)}) if strict: raise dst.commit() for obj in table_objects: table = obj["name"] copied = 0 skipped = 0 try: cols = [row["name"] for row in src.execute(f"PRAGMA table_info({quote_ident(table)})").fetchall()] if not cols: record_table(table, copied, skipped, "skipped", "no columns") continue col_sql = ", ".join(quote_ident(c) for c in cols) placeholders = ", ".join("?" for _ in cols) insert_sql = f"INSERT INTO {quote_ident(table)} ({col_sql}) VALUES ({placeholders})" rowid_available = False try: src.execute(f"SELECT rowid FROM {quote_ident(table)} LIMIT 1").fetchone() rowid_available = True except Exception: rowid_available = False if rowid_available: bounds = src.execute(f"SELECT min(rowid), max(rowid) FROM {quote_ident(table)}").fetchone() min_id, max_id = bounds[0], bounds[1] if min_id is None or max_id is None: record_table(table, 0, 0, "ok") continue chunk = 1000 current = int(min_id) max_id = int(max_id) while current <= max_id: end = min(current + chunk - 1, max_id) try: rows = src.execute( f"SELECT {col_sql} FROM {quote_ident(table)} WHERE rowid BETWEEN ? AND ? ORDER BY rowid", (current, end), ).fetchall() dst.executemany(insert_sql, [tuple(row[c] for c in cols) for row in rows]) copied += len(rows) except Exception as chunk_exc: for rowid in range(current, end + 1): try: row = src.execute( f"SELECT {col_sql} FROM {quote_ident(table)} WHERE rowid=?", (rowid,), ).fetchone() if row is None: continue dst.execute(insert_sql, tuple(row[c] for c in cols)) copied += 1 except Exception: skipped += 1 if strict and skipped: raise chunk_exc current = end + 1 else: # Fallback for WITHOUT ROWID tables. If a malformed page breaks the # full scan, the whole table may be skipped. rows = src.execute(f"SELECT {col_sql} FROM {quote_ident(table)}").fetchall() dst.executemany(insert_sql, [tuple(row[c] for c in cols) for row in rows]) copied += len(rows) dst.commit() status = "partial" if skipped else "ok" record_table(table, copied, skipped, status) except Exception as exc: dst.rollback() record_table(table, copied, skipped, "failed", exc) if strict: raise for obj in late_objects: try: dst.execute(obj["sql"]) except Exception as exc: report["index_errors"].append({"object": obj["name"], "type": obj["type"], "error": str(exc)}) if strict: raise dst.commit() try: report["target_integrity"] = dst.execute("PRAGMA integrity_check").fetchone()[0] except Exception as exc: report["target_integrity"] = f"failed: {exc}" dst.close() src.close() write_report() if report["target_integrity"] != "ok": raise SystemExit(f"rebuilt target integrity_check failed: {report['target_integrity']}") if strict: bad = [t for t in report["tables"] if t["status"] != "ok"] if bad or report["schema_errors"] or report["index_errors"]: raise SystemExit("strict rebuild failed; see report") print(json.dumps({ "source_integrity": report["source_integrity"], "target_integrity": report["target_integrity"], "tables": len(report["tables"]), "copied_rows": sum(t["copied_rows"] for t in report["tables"]), "skipped_rows": sum(t["skipped_rows"] for t in report["tables"]), "report": report_file, }, ensure_ascii=False)) except Exception as exc: report["fatal"] = f"{exc}\n{traceback.format_exc()}" write_report() raise PY if [ -e "$TARGET_DB" ]; then OLD_BACKUP="${BACKUP_DIR}/altcoin_monitor.volume_before_rebuild.${STAMP}.db" info "backing up old volume db to $OLD_BACKUP" cp -p "$TARGET_DB" "$OLD_BACKUP" fi for sidecar in "${TARGET_DB}-wal" "${TARGET_DB}-shm"; do if [ -e "$sidecar" ]; then sidecar_backup="${BACKUP_DIR}/$(basename "$sidecar").volume_before_rebuild.${STAMP}" info "moving old sidecar $sidecar to $sidecar_backup" mv "$sidecar" "$sidecar_backup" fi done info "installing rebuilt db into volume path" mv "$TEMP_DB" "$TARGET_DB" chmod 664 "$TARGET_DB" || true info "final integrity check" HOST_DB="$TARGET_DB" python3 - <<'PY' import os import sqlite3 db = os.environ["HOST_DB"] conn = sqlite3.connect(f"file:{db}?mode=ro", uri=True, timeout=30) result = conn.execute("PRAGMA integrity_check").fetchone()[0] tables = conn.execute("SELECT COUNT(*) FROM sqlite_master WHERE type='table'").fetchone()[0] conn.close() if result != "ok": raise SystemExit(f"integrity_check failed: {result}") print(f"integrity_check=ok tables={tables} db={db}") PY if [ "$RESTART" = "1" ]; then info "starting compose services" if service_exists "$WEB_SERVICE"; then compose up -d "$WEB_SERVICE" fi if service_exists "$SCHEDULER_SERVICE"; then compose up -d "$SCHEDULER_SERVICE" fi fi info "done" info "system database path: $ROOT_DIR/$TARGET_DB" info "rebuild report: $ROOT_DIR/$REPORT_FILE"