"""Cron/task run logging queries.""" import json from datetime import datetime, timedelta from app.db.schema import get_conn def log_cron_run(job_name, script_name, run_status, result_status="", started_at="", finished_at="", duration_ms=0, summary=None, error_message=""): """Record one scheduled/manual job run summary.""" conn = get_conn() conn.execute(""" INSERT INTO cron_run_log ( job_name, script_name, run_status, result_status, started_at, finished_at, duration_ms, summary_json, error_message ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( job_name, script_name, run_status, result_status, started_at or datetime.now().isoformat(), finished_at or datetime.now().isoformat(), int(duration_ms or 0), json.dumps(summary or {}, ensure_ascii=False, default=str), (error_message or "")[:1000], )) conn.commit() conn.close() def get_cron_run_logs(limit=50, job_name=None): """Read cron run logs.""" conn = get_conn() sql = """ SELECT * FROM cron_run_log {where_clause} ORDER BY started_at DESC, id DESC LIMIT %s """ params = [] where_clause = "" if job_name: where_clause = "WHERE job_name = %s" params.append(job_name) params.append(limit) rows = conn.execute(sql.format(where_clause=where_clause), tuple(params)).fetchall() conn.close() result = [] for row in rows: item = dict(row) try: item["summary_json"] = json.loads(item.get("summary_json") or "{}") except Exception: item["summary_json"] = {} result.append(item) return result def get_cron_run_summary(hours=24): """Aggregate cron run stats for the recent window.""" rows = _recent_cron_rows(hours) logs = [] job_stats = {} total_runs = 0 success_runs = 0 error_runs = 0 total_duration = 0 for row in rows: item = dict(row) try: item["summary_json"] = json.loads(item.get("summary_json") or "{}") except Exception: item["summary_json"] = {} logs.append(item) total_runs += 1 total_duration += item.get("duration_ms") or 0 if item.get("run_status") == "success": success_runs += 1 else: error_runs += 1 job = item.get("job_name") or "unknown" stat = job_stats.setdefault(job, { "job_name": job, "runs": 0, "success_runs": 0, "error_runs": 0, "avg_duration_ms": 0, "last_status": "", "last_result_status": "", "last_started_at": "", "last_finished_at": "", "last_error_message": "", }) stat["runs"] += 1 if item.get("run_status") == "success": stat["success_runs"] += 1 else: stat["error_runs"] += 1 stat["avg_duration_ms"] += item.get("duration_ms") or 0 if not stat["last_started_at"]: stat["last_status"] = item.get("run_status", "") stat["last_result_status"] = item.get("result_status", "") stat["last_started_at"] = item.get("started_at", "") stat["last_finished_at"] = item.get("finished_at", "") stat["last_error_message"] = item.get("error_message", "") for stat in job_stats.values(): stat["success_rate"] = round(stat["success_runs"] / stat["runs"] * 100, 1) if stat["runs"] else 0 stat["avg_duration_ms"] = round(stat["avg_duration_ms"] / stat["runs"]) if stat["runs"] else 0 overall = { "hours": hours, "total_runs": total_runs, "success_runs": success_runs, "error_runs": error_runs, "success_rate": round(success_runs / total_runs * 100, 1) if total_runs else 0, "avg_duration_ms": round(total_duration / total_runs) if total_runs else 0, } return { "overall": overall, "job_stats": sorted(job_stats.values(), key=lambda x: x["job_name"]), "recent_logs": logs[:20], } def _recent_cron_rows(hours): conn = get_conn() rows = conn.execute(""" SELECT * FROM cron_run_log WHERE started_at >= %s ORDER BY started_at DESC, id DESC """, ((datetime.now() - timedelta(hours=float(hours or 24))).isoformat(),)).fetchall() conn.close() return rows