"""One-time cleanup for retired strategy samples.""" from __future__ import annotations import os import subprocess from datetime import datetime from pathlib import Path from app.db.postgres_connection import get_database_url from app.db.schema import get_conn LEGACY_STRATEGY_CODES = ( "main_composite_v1", "volume_ignition_1h_v1", "intraday_momentum_15m_v1", "box_retest_1h_v1", "box_retest_4h_v1", "compression_breakout_4h_v1", "breakdown_retest_short_1h_v1", ) CONFIRM_TOKEN = "CLEAN_LEGACY_STRATEGY_SAMPLES" def _repo_root() -> Path: return Path(__file__).resolve().parents[2] def backup_database() -> str: backup_dir = _repo_root() / "data" / "backups" backup_dir.mkdir(parents=True, exist_ok=True) stamp = datetime.now().strftime("%Y%m%d_%H%M%S") out_file = backup_dir / f"legacy_strategy_cleanup_before_{stamp}.dump" subprocess.run( ["pg_dump", get_database_url(), "--format=custom", "--file", str(out_file)], check=True, ) return str(out_file) def cleanup_legacy_strategy_samples(*, confirm: str, create_backup: bool = True) -> dict: if confirm != CONFIRM_TOKEN: raise ValueError(f"confirm must be {CONFIRM_TOKEN}") backup_path = backup_database() if create_backup else "" conn = get_conn() deleted = { "paper_trade_events": 0, "paper_orders": 0, "paper_trades": 0, "recommendation": 0, "strategy_signals": 0, } try: trade_ids = [ int(row["id"]) for row in conn.execute( "SELECT id FROM paper_trades WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),), ).fetchall() ] recommendation_ids = [ int(row["id"]) for row in conn.execute( "SELECT id FROM recommendation WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),), ).fetchall() ] with conn.transaction(): if trade_ids: deleted["paper_trade_events"] += int( conn.execute( "SELECT COUNT(*) FROM paper_trade_events WHERE trade_id = ANY(%s)", (trade_ids,), ).fetchone()[0] or 0 ) conn.execute("DELETE FROM paper_trade_events WHERE trade_id = ANY(%s)", (trade_ids,)) if recommendation_ids: deleted["paper_trade_events"] += int( conn.execute( "SELECT COUNT(*) FROM paper_trade_events WHERE recommendation_id = ANY(%s)", (recommendation_ids,), ).fetchone()[0] or 0 ) conn.execute("DELETE FROM paper_trade_events WHERE recommendation_id = ANY(%s)", (recommendation_ids,)) deleted["paper_orders"] = int( conn.execute( "SELECT COUNT(*) FROM paper_orders WHERE recommendation_id = ANY(%s) OR strategy_code = ANY(%s)", (recommendation_ids, list(LEGACY_STRATEGY_CODES)), ).fetchone()[0] or 0 ) conn.execute( "DELETE FROM paper_orders WHERE recommendation_id = ANY(%s) OR strategy_code = ANY(%s)", (recommendation_ids, list(LEGACY_STRATEGY_CODES)), ) else: deleted["paper_orders"] = int( conn.execute( "SELECT COUNT(*) FROM paper_orders WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),), ).fetchone()[0] or 0 ) conn.execute("DELETE FROM paper_orders WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),)) deleted["paper_trades"] = int( conn.execute( "SELECT COUNT(*) FROM paper_trades WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),), ).fetchone()[0] or 0 ) conn.execute("DELETE FROM paper_trades WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),)) deleted["recommendation"] = int( conn.execute( "SELECT COUNT(*) FROM recommendation WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),), ).fetchone()[0] or 0 ) conn.execute("DELETE FROM recommendation WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),)) deleted["strategy_signals"] = int( conn.execute( "SELECT COUNT(*) FROM strategy_signals WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),), ).fetchone()[0] or 0 ) conn.execute("DELETE FROM strategy_signals WHERE strategy_code = ANY(%s)", (list(LEGACY_STRATEGY_CODES),)) conn.commit() except Exception: conn.rollback() raise finally: conn.close() return { "status": "completed", "scope": "legacy-strategies", "legacy_strategy_codes": list(LEGACY_STRATEGY_CODES), "backup_path": backup_path, "deleted": deleted, "run_time": datetime.now().isoformat(), }