alphax/app/db/analytics.py
2026-05-14 01:20:47 +08:00

762 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Analytics-facing DB API grouped by read concerns."""
import json
from datetime import datetime
from app.db.altcoin_db import (
_classify_recommendation_result,
_derive_execution_fields,
_is_actionable_execution_status,
_is_executed_trade,
)
from app.db.schema import get_conn
def get_screening_history(hours=24, limit=100):
"""获取最近 N 小时的筛选记录。"""
conn = get_conn()
rows = conn.execute(
"""
SELECT * FROM screening_log
WHERE layer='细筛' AND julianday(?) - julianday(scan_time) < ?
ORDER BY score DESC, scan_time DESC LIMIT ?
""",
(datetime.now().isoformat(), hours / 24.0, limit),
).fetchall()
conn.close()
return [dict(r) for r in rows]
def _loads_json(value, fallback):
try:
if isinstance(value, str) and value.strip():
return json.loads(value)
if value:
return value
except Exception:
pass
return fallback
def get_observation_candidates(limit=50):
"""Return current coarse-screen observation candidates for the watch pool."""
conn = get_conn()
try:
limit = max(1, min(int(limit or 50), 200))
except Exception:
limit = 50
rows = conn.execute(
"""
SELECT * FROM coin_state
WHERE state != '过期'
ORDER BY score DESC, detected_at DESC
LIMIT ?
""",
(limit,),
).fetchall()
conn.close()
items = []
for row in rows:
r = dict(row)
detail = _loads_json(r.get("detail_json"), {})
signals = detail.get("signals")
if not isinstance(signals, list):
signals = []
price = float(detail.get("price") or detail.get("current_price") or 0)
market_context = detail.get("market_context") if isinstance(detail.get("market_context"), dict) else {}
derivatives_context = detail.get("derivatives_context") if isinstance(detail.get("derivatives_context"), dict) else {}
sector_context = detail.get("sector_context") if isinstance(detail.get("sector_context"), dict) else {}
observe_tier = "weak" if int(r.get("score") or 0) < 4 else "strong"
reason = "粗筛观察候选,等待确认层给出当前触发和完整入场计划"
items.append({
"id": f"obs:{r.get('symbol')}",
"symbol": r.get("symbol"),
"rec_time": r.get("detected_at"),
"rec_state": r.get("state"),
"rec_score": int(r.get("score") or 0),
"entry_price": price,
"current_price": price,
"stop_loss": 0,
"tp1": 0,
"tp2": 0,
"sector": r.get("sector") or detail.get("sector") or "",
"signals": signals,
"status": "active",
"action_status": "观察",
"execution_status": "observe",
"execution_label": "观察候选",
"execution_reason": reason,
"display_bucket": "watch_pool",
"lifecycle_state": "watching",
"entry_triggered": 0,
"entry_plan": {
"entry_action": "观察",
"entry_method": reason,
"entry_price": price,
"current_price": price,
},
"observe_tier": observe_tier,
"observe_reason": reason,
"direction": detail.get("direction") or "多头启动",
"market_context": market_context,
"derivatives_context": derivatives_context,
"sector_context": sector_context,
"recommendation_result": "pending",
"recommendation_result_label": "观察候选",
"source": "coin_state",
})
return {
"items": items,
"summary": {
"total": len(items),
"candidate_count": len(items),
"source": "coin_state",
"note": "初筛观察池,不计入推荐绩效",
},
"has_more": False,
}
def get_all_recommendations(limit=50, decision_only=False, version="", offset=0, with_meta=False):
"""获取推荐列表。"""
conn = get_conn()
version = str(version or "").strip()
try:
limit = max(1, min(int(limit or 50), 500))
except Exception:
limit = 50
try:
offset = max(0, int(offset or 0))
except Exception:
offset = 0
result_where = """(status IN ('hit_tp1', 'hit_tp2', 'stopped_out')
OR (COALESCE(max_pnl_pct, 0) >= 5)
OR (COALESCE(pnl_pct, 0) <= -3 OR COALESCE(max_drawdown_pct, 0) <= -5))"""
version_where = " AND strategy_version=?" if version else ""
params = [version] if version else []
total = None
summary = None
version_counts = []
if decision_only:
if with_meta:
total = conn.execute(
"""
SELECT COUNT(*) FROM (
SELECT symbol
FROM recommendation
WHERE """
+ result_where
+ version_where
+ """
GROUP BY symbol
)
""",
tuple(params),
).fetchone()[0]
summary_row = conn.execute(
"""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status IN ('hit_tp1','hit_tp2') OR COALESCE(max_pnl_pct,0) >= 5 THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN status='stopped_out' OR COALESCE(pnl_pct,0) <= -3 OR COALESCE(max_drawdown_pct,0) <= -5 THEN 1 ELSE 0 END) AS failure_count,
SUM(CASE WHEN status IN ('hit_tp1','hit_tp2') OR COALESCE(max_pnl_pct,0) >= 5 THEN COALESCE(NULLIF(max_pnl_pct,0), pnl_pct, 0)
WHEN status='stopped_out' OR COALESCE(pnl_pct,0) <= -3 OR COALESCE(max_drawdown_pct,0) <= -5 THEN COALESCE(pnl_pct,0)
ELSE 0 END) AS total_pnl,
MAX(CASE WHEN status IN ('hit_tp1','hit_tp2') OR COALESCE(max_pnl_pct,0) >= 5 THEN COALESCE(NULLIF(max_pnl_pct,0), pnl_pct, 0)
WHEN status='stopped_out' OR COALESCE(pnl_pct,0) <= -3 OR COALESCE(max_drawdown_pct,0) <= -5 THEN COALESCE(pnl_pct,0)
ELSE 0 END) AS best_pnl,
AVG(CASE WHEN status='stopped_out' OR COALESCE(pnl_pct,0) <= -3 OR COALESCE(max_drawdown_pct,0) <= -5 THEN COALESCE(pnl_pct,0) END) AS avg_failure_pnl
FROM (
SELECT r.*
FROM recommendation r
JOIN (
SELECT symbol, MAX(id) AS max_id
FROM recommendation
WHERE """
+ result_where
+ version_where
+ """
GROUP BY symbol
) latest ON latest.max_id = r.id
)
""",
tuple(params),
).fetchone()
summary = dict(summary_row) if summary_row else {}
vc_rows = conn.execute(
"""
SELECT COALESCE(r.strategy_version, '') AS version, COUNT(*) AS count
FROM recommendation r
JOIN (
SELECT symbol, MAX(id) AS max_id
FROM recommendation
WHERE """
+ result_where
+ """
GROUP BY symbol
) latest ON latest.max_id = r.id
WHERE COALESCE(r.strategy_version, '') != ''
GROUP BY r.strategy_version
"""
).fetchall()
version_counts = [{"version": row["version"], "count": row["count"]} for row in vc_rows]
rows = conn.execute(
"""
SELECT r.*,
lpc.price AS latest_cache_price,
lpc.updated_at AS latest_cache_updated_at
FROM recommendation r
LEFT JOIN latest_price_cache lpc ON lpc.symbol = r.symbol
JOIN (
SELECT symbol, MAX(id) AS max_id
FROM recommendation
WHERE """
+ result_where
+ version_where
+ """
GROUP BY symbol
) latest ON latest.max_id = r.id
ORDER BY r.rec_time DESC LIMIT ? OFFSET ?
""",
tuple(params + [limit, offset]),
).fetchall()
else:
where = "WHERE strategy_version=?" if version else ""
if with_meta:
total = conn.execute("SELECT COUNT(*) FROM recommendation " + where, tuple(params)).fetchone()[0]
rows = conn.execute(
"""
SELECT * FROM recommendation
"""
+ where
+ """
ORDER BY rec_time DESC LIMIT ? OFFSET ?
""",
tuple(params + [limit, offset]),
).fetchall()
conn.close()
result = []
for row in rows:
item = dict(row)
rec_result, rec_result_label = _classify_recommendation_result(item)
item["recommendation_result"] = rec_result
item["recommendation_result_label"] = rec_result_label
_derive_execution_fields(item)
result.append(item)
if not with_meta:
return result
return {
"items": result,
"total": int(total or 0),
"limit": limit,
"offset": offset,
"has_more": offset + len(result) < int(total or 0),
"summary": summary or {},
"version_counts": version_counts,
}
def get_stats():
"""获取统计数据:胜率、平均盈亏、实时收益、推荐成败概览、排行榜、净值曲线与生命周期"""
conn = get_conn()
all_rows = conn.execute("SELECT * FROM recommendation ORDER BY rec_time DESC").fetchall()
raw_active_rows = conn.execute("SELECT * FROM recommendation WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history' ORDER BY rec_time DESC").fetchall()
raw_active_dedup_rows = conn.execute("""
SELECT r.*
FROM recommendation r
JOIN (
SELECT symbol, MAX(id) AS max_id
FROM recommendation
WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'
GROUP BY symbol
) latest ON latest.max_id = r.id
ORDER BY r.rec_time DESC
""").fetchall()
total_count = len(all_rows)
raw_active_count = len(raw_active_rows)
now = datetime.now()
def classify_recommendation(row):
result, _ = _classify_recommendation_result(dict(row))
return result
def success_tier(row):
max_pnl_pct = row["max_pnl_pct"] or 0
if max_pnl_pct >= 20:
return "big"
if max_pnl_pct >= 10:
return "medium"
if max_pnl_pct >= 5:
return "small"
return "none"
def lifecycle_stage(row):
action_status = row["action_status"] or "持有"
result = classify_recommendation(row)
if result == "success":
return "已验证成功"
if result == "failed":
return "已验证失败"
if action_status in ("衰减", "反转"):
return "进入衰减"
if action_status in ("可即刻买入", "等回踩"):
return "等待入场"
return "持仓观察"
def safe_hours_between(start_text, end_dt):
try:
start_dt = datetime.fromisoformat(start_text)
return round((end_dt - start_dt).total_seconds() / 3600, 1)
except Exception:
return None
def compact_item(row):
item = dict(row)
rec_result, rec_result_label = _classify_recommendation_result(item)
item["recommendation_result"] = rec_result
item["recommendation_result_label"] = rec_result_label
derived = _derive_execution_fields(item)
hold_hours = safe_hours_between(row["rec_time"], now)
last_track_delay = safe_hours_between(row["last_track_time"], now) if row["last_track_time"] else None
return {
"symbol": row["symbol"],
"rec_time": row["rec_time"],
"entry_price": row["entry_price"],
"current_price": row["current_price"],
"pnl_pct": row["pnl_pct"] or 0,
"max_pnl_pct": row["max_pnl_pct"] or 0,
"max_drawdown_pct": row["max_drawdown_pct"] or 0,
"action_status": row["action_status"] or "持有",
"initial_action": derived["initial_action"],
"execution_status": derived["execution_status"],
"execution_label": derived["execution_label"],
"execution_reason": derived["execution_reason"],
"recommendation_result": classify_recommendation(row),
"success_tier": success_tier(row),
"lifecycle_stage": lifecycle_stage(row),
"hold_hours": hold_hours,
"track_delay_hours": last_track_delay,
"market_context": derived["market_context"],
"derivatives_context": derived["derivatives_context"],
"sector_context": derived["sector_context"],
}
active_dedup_rows = []
for row in raw_active_dedup_rows:
item = dict(row)
rec_result, rec_result_label = _classify_recommendation_result(item)
item["recommendation_result"] = rec_result
item["recommendation_result_label"] = rec_result_label
derived = _derive_execution_fields(item)
if _is_actionable_execution_status(derived.get("execution_status")):
active_dedup_rows.append(row)
active_count = len(active_dedup_rows)
success_count = 0
failed_count = 0
pending_count = 0
closed_count = 0
win_count = 0
realized_count = 0
realized_pnl_sum = 0
success_tier_counts = {"small": 0, "medium": 0, "big": 0}
closed_dedup_rows = conn.execute("""
SELECT r.*
FROM recommendation r
JOIN (
SELECT symbol, MAX(id) AS max_id
FROM recommendation
WHERE status IN ('hit_tp1', 'hit_tp2', 'stopped_out')
GROUP BY symbol
) latest ON latest.max_id = r.id
ORDER BY r.rec_time DESC
""").fetchall()
for row in closed_dedup_rows:
status = row["status"]
if status in ("hit_tp1", "hit_tp2"):
success_count += 1
tier = success_tier(row)
if tier in success_tier_counts:
success_tier_counts[tier] += 1
elif status == "stopped_out":
failed_count += 1
else:
pending_count += 1
if status in ("hit_tp1", "hit_tp2", "stopped_out", "expired"):
closed_count += 1
if (row["pnl_pct"] or 0) > 0:
win_count += 1
realized_dedup = [r for r in closed_dedup_rows if r["status"] in ("hit_tp1", "hit_tp2", "stopped_out")]
realized_count = len(realized_dedup)
realized_pnl_sum = sum((r["pnl_pct"] or 0) for r in realized_dedup)
exec_buy_now = 0
exec_wait = 0
exec_observe = 0
for row in raw_active_dedup_rows:
item = dict(row)
rec_result, rec_result_label = _classify_recommendation_result(item)
item["recommendation_result"] = rec_result
item["recommendation_result_label"] = rec_result_label
derived = _derive_execution_fields(item)
es = derived.get("execution_status", "")
if es == "buy_now":
exec_buy_now += 1
elif es == "wait_pullback":
exec_wait += 1
elif es == "observe":
exec_observe += 1
executed_active_dedup_rows = [r for r in active_dedup_rows if _is_executed_trade(dict(r))]
held_rows = executed_active_dedup_rows
held_count = len(held_rows)
held_pnl_avg = round(sum((r["pnl_pct"] or 0) for r in held_rows) / held_count, 2) if held_count else 0
held_win_count = sum(1 for r in held_rows if (r["pnl_pct"] or 0) > 0)
held_win_rate = round(held_win_count / held_count * 100, 1) if held_count else 0
active_pnl_sum = round(sum((r["pnl_pct"] or 0) for r in executed_active_dedup_rows), 2)
active_avg_pnl = round(active_pnl_sum / len(executed_active_dedup_rows), 2) if executed_active_dedup_rows else 0
active_max_pnl = round(max([(r["pnl_pct"] or 0) for r in executed_active_dedup_rows], default=0), 2)
active_min_pnl = round(min([(r["pnl_pct"] or 0) for r in executed_active_dedup_rows], default=0), 2)
active_success_count = sum(1 for r in executed_active_dedup_rows if classify_recommendation(r) == "success")
active_failed_count = sum(1 for r in executed_active_dedup_rows if classify_recommendation(r) == "failed")
active_pending_count = sum(1 for r in executed_active_dedup_rows if classify_recommendation(r) == "pending")
top_gainer = compact_item(max(executed_active_dedup_rows, key=lambda r: r["pnl_pct"] or -9999)) if executed_active_dedup_rows else None
top_loser = compact_item(min(executed_active_dedup_rows, key=lambda r: r["pnl_pct"] or 9999)) if executed_active_dedup_rows else None
biggest_explosion = compact_item(max(executed_active_dedup_rows, key=lambda r: r["max_pnl_pct"] or -9999)) if executed_active_dedup_rows else None
highest_risk = compact_item(min(executed_active_dedup_rows, key=lambda r: r["max_drawdown_pct"] or 9999)) if executed_active_dedup_rows else None
lifecycle_items = [compact_item(r) for r in executed_active_dedup_rows]
longest_holding = max(lifecycle_items, key=lambda x: x.get("hold_hours") or -1) if lifecycle_items else None
fastest_winner_candidates = [x for x in lifecycle_items if x.get("recommendation_result") == "success"]
fastest_winner = min(fastest_winner_candidates, key=lambda x: x.get("hold_hours") or 999999) if fastest_winner_candidates else None
decay_candidates = [x for x in lifecycle_items if x.get("lifecycle_stage") == "进入衰减"]
decay_watch = decay_candidates[0] if decay_candidates else None
points_24h = []
rows_24h = conn.execute("""
SELECT substr(track_time, 1, 13) || ':00:00' AS bucket, AVG(pnl_pct) AS avg_pnl, COUNT(*) AS sample_count
FROM price_tracking
WHERE julianday(?) - julianday(track_time) <= 1.0
GROUP BY bucket
ORDER BY bucket ASC
""", (now.isoformat(),)).fetchall()
for row in rows_24h:
points_24h.append({
"time": row["bucket"],
"avg_pnl": round(row["avg_pnl"] or 0, 2),
"sample_count": row["sample_count"] or 0,
})
points_7d = []
rows_7d = conn.execute("""
SELECT substr(track_time, 1, 10) AS bucket, AVG(pnl_pct) AS avg_pnl, COUNT(*) AS sample_count
FROM price_tracking
WHERE julianday(?) - julianday(track_time) <= 7.0
GROUP BY bucket
ORDER BY bucket ASC
""", (now.isoformat(),)).fetchall()
for row in rows_7d:
points_7d.append({
"time": row["bucket"],
"avg_pnl": round(row["avg_pnl"] or 0, 2),
"sample_count": row["sample_count"] or 0,
})
recommendation_success_rate = round(success_count / (success_count + failed_count) * 100, 1) if (success_count + failed_count) else 0
avg_pnl_pct = round(realized_pnl_sum / realized_count, 2) if realized_count else 0
actionable_contexts = []
for row in active_dedup_rows:
derived = _derive_execution_fields(dict(row))
actionable_contexts.append({
"market": derived.get("market_context") or {},
"derivatives": derived.get("derivatives_context") or {},
"sector": derived.get("sector_context") or {},
})
def avg_from_context(group_key, field):
values = []
for ctx in actionable_contexts:
value = (ctx.get(group_key) or {}).get(field)
if isinstance(value, (int, float)):
values.append(float(value))
if not values:
return 0
avg = sum(values) / len(values)
if abs(avg) < 0.01:
return round(avg, 3)
return round(avg, 1)
hot_sector_counter = {}
for ctx in actionable_contexts:
sector_ctx = ctx.get("sector") or {}
for sector in sector_ctx.get("hot_sectors") or []:
hot_sector_counter[sector] = hot_sector_counter.get(sector, 0) + 1
market_context_overview = {
"actionable_sample_count": len(actionable_contexts),
"avg_turnover_acceleration_1h": avg_from_context("market", "turnover_acceleration_1h"),
"avg_turnover_acceleration_4h": avg_from_context("market", "turnover_acceleration_4h"),
"avg_volume_24h": avg_from_context("market", "volume_24h"),
"avg_funding_rate": avg_from_context("derivatives", "funding_rate"),
"avg_top_trader_long_pct": avg_from_context("derivatives", "top_trader_long_pct"),
"avg_top_trader_long_short_ratio": avg_from_context("derivatives", "top_trader_long_short_ratio"),
"hot_sector_count": len(hot_sector_counter),
"top_hot_sectors": [
{"sector": sector, "count": count}
for sector, count in sorted(hot_sector_counter.items(), key=lambda item: (-item[1], item[0]))[:5]
],
}
conn.close()
return {
"total_recommendations": total_count,
"active_count": active_count,
"raw_active_count": raw_active_count,
"closed_count": closed_count,
"win_count": win_count,
"win_rate": round(win_count / closed_count * 100, 1) if closed_count else 0,
"avg_pnl_pct": avg_pnl_pct,
"success_count": success_count,
"failed_count": failed_count,
"pending_count": pending_count,
"recommendation_success_rate": recommendation_success_rate,
"active_pnl_sum": active_pnl_sum,
"active_avg_pnl": active_avg_pnl,
"active_max_pnl": active_max_pnl,
"active_min_pnl": active_min_pnl,
"active_success_count": active_success_count,
"active_failed_count": active_failed_count,
"active_pending_count": active_pending_count,
"live_overview": {
"actionable_count": active_count,
"executed_trade_count": len(executed_active_dedup_rows),
"executed_pnl_sum": active_pnl_sum,
"executed_avg_pnl": active_avg_pnl,
"actionable_pnl_sum": active_pnl_sum,
"actionable_avg_pnl": active_avg_pnl,
"buy_now_count": exec_buy_now,
"wait_pullback_count": exec_wait,
"observe_count": exec_observe,
"held_count": held_count,
"held_pnl_avg": held_pnl_avg,
"held_win_rate": held_win_rate,
"actionable_success_count": active_success_count,
"actionable_failed_count": active_failed_count,
"actionable_pending_count": active_pending_count,
"raw_active_count": raw_active_count,
},
"history_overview": {
"success_count": success_count,
"failed_count": failed_count,
"recommendation_success_rate": recommendation_success_rate,
"avg_pnl_pct": avg_pnl_pct,
"realized_count": realized_count,
},
"market_context_overview": market_context_overview,
"success_tier_counts": success_tier_counts,
"leaderboard": {
"top_gainer": top_gainer,
"top_loser": top_loser,
"biggest_explosion": biggest_explosion,
"highest_risk": highest_risk,
},
"equity_curve": {
"last_24h": points_24h,
"last_7d": points_7d,
},
"lifecycle_summary": {
"longest_holding": longest_holding,
"fastest_winner": fastest_winner,
"decay_watch": decay_watch,
},
"result_definition": {
"success": "仅统计实际命中止盈的推荐status=hit_tp1 或 hit_tp2",
"failed": "仅统计实际触发止损的推荐status=stopped_out",
"pending": "其余样本仅作为未兑现/观察中处理,不在顶部历史统计单独展示",
"avg_pnl_pct": "历史均盈亏仅基于真实兑现样本计算hit_tp1 / hit_tp2 / stopped_out",
"live_pnl": "实时收益只统计已经执行/触发入场的交易;等回踩计划和观察信号不纳入收益"
},
"success_tier_definition": {
"small": "小成功:最大涨幅 5%~10%",
"medium": "中成功:最大涨幅 10%~20%",
"big": "大成功:最大涨幅 >=20%"
},
"lifecycle_definition": {
"hold_hours": "从推荐发出到当前的持续小时数",
"track_delay_hours": "距离最近一次价格跟踪的延迟小时数",
"lifecycle_stage": "等待入场 / 持仓观察 / 进入衰减 / 已验证成功 / 已验证失败"
},
}
def get_review_stats(conn_provider=None, iteration_logs_getter=None, iteration_summary_getter=None):
"""获取复盘统计概览。"""
from app.db.review_queries import get_strategy_iteration_logs, get_strategy_iteration_summary
conn_factory = conn_provider or get_conn
logs_getter = iteration_logs_getter or get_strategy_iteration_logs
summary_getter = iteration_summary_getter or get_strategy_iteration_summary
conn = conn_factory()
revision_started_at = ""
try:
from app.config.config_loader import get_meta
meta = get_meta() or {}
revision_started_at = (meta.get("strategy_revision_started_at") or "").strip()
except Exception:
revision_started_at = ""
reviews = conn.execute("SELECT * FROM review_log ORDER BY review_time DESC").fetchall()
missed = conn.execute("SELECT * FROM missed_explosions ORDER BY detect_time DESC LIMIT 20").fetchall()
signals = conn.execute("SELECT * FROM signal_performance ORDER BY hit_rate DESC").fetchall()
conn.close()
return {
"reviews": [dict(r) for r in reviews],
"signal_performance": [dict(s) for s in signals],
"missed_explosions": [dict(m) for m in missed],
"iteration_logs": logs_getter(limit=30),
"iteration_summary": summary_getter(days=30),
"strategy_revision_started_at": revision_started_at,
}
def get_cron_run_logs(limit=50, job_name=None):
"""获取 cron 运行日志列表。"""
conn = get_conn()
sql = """
SELECT * FROM cron_run_log
{where_clause}
ORDER BY started_at DESC, id DESC
LIMIT ?
"""
params = []
where_clause = ""
if job_name:
where_clause = "WHERE job_name = ?"
params.append(job_name)
params.append(limit)
rows = conn.execute(sql.format(where_clause=where_clause), tuple(params)).fetchall()
conn.close()
result = []
for row in rows:
item = dict(row)
try:
item["summary_json"] = json.loads(item.get("summary_json") or "{}")
except Exception:
item["summary_json"] = {}
result.append(item)
return result
def get_cron_run_summary(hours=24):
"""获取 cron 运行汇总统计。"""
conn = get_conn()
rows = conn.execute(
"""
SELECT * FROM cron_run_log
WHERE julianday(?) - julianday(started_at) <= ?
ORDER BY started_at DESC, id DESC
""",
(datetime.now().isoformat(), hours / 24.0),
).fetchall()
conn.close()
logs = []
job_stats = {}
total_runs = 0
success_runs = 0
error_runs = 0
total_duration = 0
for row in rows:
item = dict(row)
try:
item["summary_json"] = json.loads(item.get("summary_json") or "{}")
except Exception:
item["summary_json"] = {}
logs.append(item)
total_runs += 1
total_duration += item.get("duration_ms") or 0
if item.get("run_status") == "success":
success_runs += 1
else:
error_runs += 1
job = item.get("job_name") or "unknown"
stat = job_stats.setdefault(
job,
{
"job_name": job,
"runs": 0,
"success_runs": 0,
"error_runs": 0,
"avg_duration_ms": 0,
"last_status": "",
"last_result_status": "",
"last_started_at": "",
"last_finished_at": "",
"last_error_message": "",
},
)
stat["runs"] += 1
if item.get("run_status") == "success":
stat["success_runs"] += 1
else:
stat["error_runs"] += 1
stat["avg_duration_ms"] += item.get("duration_ms") or 0
if not stat["last_started_at"]:
stat["last_status"] = item.get("run_status", "")
stat["last_result_status"] = item.get("result_status", "")
stat["last_started_at"] = item.get("started_at", "")
stat["last_finished_at"] = item.get("finished_at", "")
stat["last_error_message"] = item.get("error_message", "")
for stat in job_stats.values():
stat["success_rate"] = round(stat["success_runs"] / stat["runs"] * 100, 1) if stat["runs"] else 0
stat["avg_duration_ms"] = round(stat["avg_duration_ms"] / stat["runs"]) if stat["runs"] else 0
overall = {
"hours": hours,
"total_runs": total_runs,
"success_runs": success_runs,
"error_runs": error_runs,
"success_rate": round(success_runs / total_runs * 100, 1) if total_runs else 0,
"avg_duration_ms": round(total_duration / total_runs) if total_runs else 0,
}
return {
"overall": overall,
"job_stats": sorted(job_stats.values(), key=lambda x: x["job_name"]),
"recent_logs": logs[:20],
}
__all__ = [
"get_all_recommendations",
"get_observation_candidates",
"get_cron_run_logs",
"get_cron_run_summary",
"get_review_stats",
"get_screening_history",
"get_stats",
]