1255 lines
46 KiB
Python
1255 lines
46 KiB
Python
"""Analytics-facing DB API grouped by read concerns."""
|
||
|
||
import json
|
||
from datetime import datetime, timedelta
|
||
|
||
from app.db.altcoin_db import (
|
||
_classify_recommendation_result,
|
||
_derive_execution_fields,
|
||
_is_actionable_execution_status,
|
||
_is_executed_trade,
|
||
)
|
||
from app.db.schema import get_conn
|
||
|
||
|
||
def get_screening_history(hours=24, limit=100):
    """Return fine-screen ('细筛') rows from ``screening_log`` for a recent window.

    Args:
        hours: Look-back window in hours, measured from ``datetime.now()``.
        limit: Maximum number of rows to return.

    Returns:
        A list of plain dicts (one per row), ordered by ``score`` and then
        ``scan_time``, both descending.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT * FROM screening_log
            WHERE layer='细筛' AND julianday(?) - julianday(scan_time) < ?
            ORDER BY score DESC, scan_time DESC LIMIT ?
            """,
            # julianday() differences are expressed in days, hence hours / 24.
            (datetime.now().isoformat(), hours / 24.0, limit),
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [dict(r) for r in rows]
|
||
|
||
|
||
def _loads_json(value, fallback):
|
||
try:
|
||
if isinstance(value, str) and value.strip():
|
||
return json.loads(value)
|
||
if value:
|
||
return value
|
||
except Exception:
|
||
pass
|
||
return fallback
|
||
|
||
|
||
def _safe_int(value, default=0):
|
||
try:
|
||
return int(value or 0)
|
||
except Exception:
|
||
return default
|
||
|
||
|
||
def _safe_float(value, default=0.0):
|
||
try:
|
||
return float(value or 0)
|
||
except Exception:
|
||
return default
|
||
|
||
|
||
# SQL predicate (unqualified column names) selecting recommendation rows that
# count as an executed trade: entry was triggered, the row sits in the
# 'position' bucket, its execution status is holding/completed, or its status
# is one of the take-profit / stop-loss outcomes.
EXECUTED_TRADE_WHERE = """(
    COALESCE(entry_triggered, 0) = 1
    OR COALESCE(display_bucket, '') = 'position'
    OR COALESCE(execution_status, '') IN ('holding', 'completed')
    OR status IN ('hit_tp1', 'hit_tp2', 'stopped_out')
)"""


# SQL predicate for a successful executed trade: a take-profit status, or a
# row that is not stopped/expired/invalid/archived whose max_pnl_pct is >= 5.
SUCCESS_CASE = f"""(
    ({EXECUTED_TRADE_WHERE})
    AND (
        status IN ('hit_tp1', 'hit_tp2')
        OR (
            status NOT IN ('stopped_out', 'expired', 'invalid', 'archived')
            AND COALESCE(max_pnl_pct, 0) >= 5
        )
    )
)"""


# SQL predicate for a failed executed trade: stopped out, pnl_pct <= -3, or
# max_drawdown_pct <= -5.
# NOTE(review): SUCCESS_CASE and FAILURE_CASE are not mutually exclusive
# (e.g. status='hit_tp1' with max_drawdown_pct <= -5 satisfies both), so a
# row can be counted by both predicates -- confirm this is intended.
FAILURE_CASE = f"""(
    ({EXECUTED_TRADE_WHERE})
    AND (status='stopped_out' OR COALESCE(pnl_pct, 0) <= -3 OR COALESCE(max_drawdown_pct, 0) <= -5)
)"""
|
||
|
||
|
||
def _executed_trade_where(alias=""):
|
||
prefix = f"{alias}." if alias else ""
|
||
return f"""(
|
||
COALESCE({prefix}entry_triggered, 0) = 1
|
||
OR COALESCE({prefix}display_bucket, '') = 'position'
|
||
OR COALESCE({prefix}execution_status, '') IN ('holding', 'completed')
|
||
OR {prefix}status IN ('hit_tp1', 'hit_tp2', 'stopped_out')
|
||
)"""
|
||
|
||
|
||
def _parse_dt(value):
|
||
if isinstance(value, datetime):
|
||
return value
|
||
if not value:
|
||
return None
|
||
try:
|
||
return datetime.fromisoformat(str(value).replace("Z", "+00:00")).replace(tzinfo=None)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _iso(value):
|
||
if isinstance(value, datetime):
|
||
return value.isoformat(timespec="seconds")
|
||
return str(value or "")
|
||
|
||
|
||
def get_observation_candidates(limit=50):
    """Return current coarse-screen observation candidates for the watch pool.

    Reads ``coin_state`` rows whose state is not '过期', ordered by score and
    detection time (both descending), and shapes each row like a
    recommendation item with observation-only placeholder values.

    Args:
        limit: Requested row count; clamped to 1..200, falling back to 50 when
            it cannot be converted to an integer.

    Returns:
        A dict with ``items`` (list of candidate dicts), ``summary`` and
        ``has_more`` (always ``False``).
    """
    try:
        limit = max(1, min(int(limit or 50), 200))
    except (TypeError, ValueError, OverflowError):
        limit = 50

    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT * FROM coin_state
            WHERE state != '过期'
            ORDER BY score DESC, detected_at DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()

    reason = "粗筛观察候选,等待确认层给出当前触发和完整入场计划"
    items = []
    for row in rows:
        r = dict(row)
        detail = _loads_json(r.get("detail_json"), {})
        if not isinstance(detail, dict):
            # Stored JSON may decode to a list/scalar/None; treat as no detail
            # instead of failing on ``detail.get``.
            detail = {}

        def context(key):
            """Return ``detail[key]`` when it is a dict, else an empty dict."""
            value = detail.get(key)
            return value if isinstance(value, dict) else {}

        signals = detail.get("signals")
        if not isinstance(signals, list):
            signals = []
        # Tolerate non-numeric stored values rather than aborting the listing.
        price = _safe_float(detail.get("price") or detail.get("current_price"))
        score = _safe_int(r.get("score"))
        items.append({
            "id": f"obs:{r.get('symbol')}",
            "symbol": r.get("symbol"),
            "rec_time": r.get("detected_at"),
            "rec_state": r.get("state"),
            "rec_score": score,
            "entry_price": price,
            "current_price": price,
            "stop_loss": 0,
            "tp1": 0,
            "tp2": 0,
            "sector": r.get("sector") or detail.get("sector") or "",
            "signals": signals,
            "status": "active",
            "action_status": "观察",
            "execution_status": "observe",
            "execution_label": "观察候选",
            "execution_reason": reason,
            "display_bucket": "watch_pool",
            "lifecycle_state": "watching",
            "entry_triggered": 0,
            "entry_plan": {
                "entry_action": "观察",
                "entry_method": reason,
                "entry_price": price,
                "current_price": price,
            },
            # Scores below 4 are labelled weak observations.
            "observe_tier": "weak" if score < 4 else "strong",
            "observe_reason": reason,
            "direction": detail.get("direction") or "多头启动",
            "market_context": context("market_context"),
            "derivatives_context": context("derivatives_context"),
            "sector_context": context("sector_context"),
            "recommendation_result": "pending",
            "recommendation_result_label": "观察候选",
            "source": "coin_state",
        })
    return {
        "items": items,
        "summary": {
            "total": len(items),
            "candidate_count": len(items),
            "source": "coin_state",
            "note": "初筛观察池,不计入推荐绩效",
        },
        "has_more": False,
    }
|
||
|
||
|
||
def get_all_recommendations(limit=50, decision_only=False, version="", offset=0, with_meta=False):
    """Return recommendation rows, newest first, with optional paging metadata.

    Args:
        limit: Page size; clamped to 1..500 (50 when not convertible to int).
        decision_only: When true, only executed trades (``EXECUTED_TRADE_WHERE``)
            are considered and only the highest-id row per symbol is returned,
            joined with ``latest_price_cache``. When false, all rows are paged.
        version: Optional ``strategy_version`` filter; blank means no filter.
        offset: Number of rows to skip; negative or invalid values become 0.
        with_meta: When true, return a dict with totals instead of a bare list.

    Returns:
        A list of item dicts when ``with_meta`` is false. Otherwise a dict with
        ``items``, ``total``, ``limit``, ``offset``, ``has_more``, ``summary``
        and ``version_counts`` (the last two are only populated in
        ``decision_only`` mode).
    """
    version = str(version or "").strip()
    try:
        limit = max(1, min(int(limit or 50), 500))
    except (TypeError, ValueError, OverflowError):
        limit = 50
    try:
        offset = max(0, int(offset or 0))
    except (TypeError, ValueError, OverflowError):
        offset = 0

    result_where = EXECUTED_TRADE_WHERE
    version_where = " AND strategy_version=?" if version else ""
    params = [version] if version else []

    total = None
    summary = None
    version_counts = []
    # Failure is tested first, so a row matching both predicates is valued at
    # its pnl_pct; successes use max_pnl_pct when it is non-zero.
    realized_pnl_case = (
        f"CASE WHEN {FAILURE_CASE} THEN COALESCE(pnl_pct,0) "
        f"WHEN {SUCCESS_CASE} THEN COALESCE(NULLIF(max_pnl_pct,0), pnl_pct, 0) "
        "ELSE 0 END"
    )

    conn = get_conn()
    try:
        if decision_only:
            if with_meta:
                # Total = number of distinct symbols with an executed trade.
                total = conn.execute(
                    """
                    SELECT COUNT(*) FROM (
                        SELECT symbol
                        FROM recommendation
                        WHERE """
                    + result_where
                    + version_where
                    + """
                        GROUP BY symbol
                    )
                    """,
                    tuple(params),
                ).fetchone()[0]

                summary_row = conn.execute(
                    """
                    SELECT
                        COUNT(*) AS total,
                        SUM(CASE WHEN """
                    + SUCCESS_CASE
                    + """ THEN 1 ELSE 0 END) AS success_count,
                        SUM(CASE WHEN """
                    + FAILURE_CASE
                    + """ THEN 1 ELSE 0 END) AS failure_count,
                        SUM("""
                    + realized_pnl_case
                    + """) AS total_pnl,
                        MAX("""
                    + realized_pnl_case
                    + """) AS best_pnl,
                        AVG(CASE WHEN """
                    + FAILURE_CASE
                    + """ THEN COALESCE(pnl_pct,0) END) AS avg_failure_pnl
                    FROM (
                        SELECT r.*
                        FROM recommendation r
                        JOIN (
                            SELECT symbol, MAX(id) AS max_id
                            FROM recommendation
                            WHERE """
                    + result_where
                    + version_where
                    + """
                            GROUP BY symbol
                        ) latest ON latest.max_id = r.id
                    )
                    """,
                    tuple(params),
                ).fetchone()
                summary = dict(summary_row) if summary_row else {}
                # Aggregates over an empty set come back as NULL; report 0.
                for key in ("total", "success_count", "failure_count", "total_pnl", "best_pnl", "avg_failure_pnl"):
                    if summary.get(key) is None:
                        summary[key] = 0

                # Version counts deliberately ignore the version filter so the
                # caller sees every available version.
                vc_rows = conn.execute(
                    """
                    SELECT COALESCE(r.strategy_version, '') AS version, COUNT(*) AS count
                    FROM recommendation r
                    JOIN (
                        SELECT symbol, MAX(id) AS max_id
                        FROM recommendation
                        WHERE """
                    + result_where
                    + """
                        GROUP BY symbol
                    ) latest ON latest.max_id = r.id
                    WHERE COALESCE(r.strategy_version, '') != ''
                    GROUP BY r.strategy_version
                    """
                ).fetchall()
                version_counts = [{"version": row["version"], "count": row["count"]} for row in vc_rows]

            rows = conn.execute(
                """
                SELECT r.*,
                       lpc.price AS latest_cache_price,
                       lpc.updated_at AS latest_cache_updated_at
                FROM recommendation r
                LEFT JOIN latest_price_cache lpc ON lpc.symbol = r.symbol
                JOIN (
                    SELECT symbol, MAX(id) AS max_id
                    FROM recommendation
                    WHERE """
                + result_where
                + version_where
                + """
                    GROUP BY symbol
                ) latest ON latest.max_id = r.id
                ORDER BY r.rec_time DESC LIMIT ? OFFSET ?
                """,
                tuple(params + [limit, offset]),
            ).fetchall()
        else:
            where = "WHERE strategy_version=?" if version else ""
            if with_meta:
                total = conn.execute("SELECT COUNT(*) FROM recommendation " + where, tuple(params)).fetchone()[0]
            rows = conn.execute(
                """
                SELECT * FROM recommendation
                """
                + where
                + """
                ORDER BY rec_time DESC LIMIT ? OFFSET ?
                """,
                tuple(params + [limit, offset]),
            ).fetchall()
    finally:
        # Close even when a query raises so the connection is not leaked.
        conn.close()

    result = []
    for row in rows:
        item = dict(row)
        rec_result, rec_result_label = _classify_recommendation_result(item)
        item["recommendation_result"] = rec_result
        item["recommendation_result_label"] = rec_result_label
        _derive_execution_fields(item)
        result.append(item)

    if not with_meta:
        return result
    return {
        "items": result,
        "total": int(total or 0),
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(result) < int(total or 0),
        "summary": summary or {},
        "version_counts": version_counts,
    }
|
||
|
||
|
||
def get_stats():
    """Return dashboard statistics for recommendations.

    Covers win rate, average PnL, live (executed-trade) returns, the
    success/failure overview, leaderboard, equity curve and lifecycle summary.

    Returns:
        A dict of counters, nested overview dicts and human-readable
        definition strings (see the literal at the end of the function).
    """
    conn = get_conn()
    try:
        # Only the counts are needed, so count in SQL instead of loading rows.
        total_count = conn.execute("SELECT COUNT(*) FROM recommendation").fetchone()[0]
        raw_active_count = conn.execute(
            "SELECT COUNT(*) FROM recommendation "
            "WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'"
        ).fetchone()[0]
        # Latest (highest id) active, non-history row per symbol.
        raw_active_dedup_rows = conn.execute("""
            SELECT r.*
            FROM recommendation r
            JOIN (
                SELECT symbol, MAX(id) AS max_id
                FROM recommendation
                WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'
                GROUP BY symbol
            ) latest ON latest.max_id = r.id
            ORDER BY r.rec_time DESC
        """).fetchall()

        now = datetime.now()

        def value_or(value, missing):
            """Return ``missing`` only for ``None`` so a real 0 still ranks as 0."""
            return missing if value is None else value

        def classify_recommendation(row):
            result, _ = _classify_recommendation_result(dict(row))
            return result

        def success_tier(row):
            max_pnl_pct = row["max_pnl_pct"] or 0
            if max_pnl_pct >= 20:
                return "big"
            if max_pnl_pct >= 10:
                return "medium"
            if max_pnl_pct >= 5:
                return "small"
            return "none"

        def lifecycle_stage(row):
            action_status = row["action_status"] or "持有"
            result = classify_recommendation(row)
            if result == "success":
                return "已验证成功"
            if result == "failed":
                return "已验证失败"
            if action_status in ("衰减", "反转"):
                return "进入衰减"
            if action_status in ("可即刻买入", "等回踩"):
                return "等待入场"
            return "持仓观察"

        def safe_hours_between(start_text, end_dt):
            """Hours from ``start_text`` (ISO text) to ``end_dt``; ``None`` if unparsable."""
            try:
                start_dt = datetime.fromisoformat(start_text)
                return round((end_dt - start_dt).total_seconds() / 3600, 1)
            except Exception:
                return None

        def compact_item(row):
            item = dict(row)
            rec_result, rec_result_label = _classify_recommendation_result(item)
            item["recommendation_result"] = rec_result
            item["recommendation_result_label"] = rec_result_label
            derived = _derive_execution_fields(item)
            hold_hours = safe_hours_between(row["rec_time"], now)
            last_track_delay = safe_hours_between(row["last_track_time"], now) if row["last_track_time"] else None
            return {
                "symbol": row["symbol"],
                "rec_time": row["rec_time"],
                "entry_price": row["entry_price"],
                "current_price": row["current_price"],
                "pnl_pct": row["pnl_pct"] or 0,
                "max_pnl_pct": row["max_pnl_pct"] or 0,
                "max_drawdown_pct": row["max_drawdown_pct"] or 0,
                "action_status": row["action_status"] or "持有",
                "initial_action": derived["initial_action"],
                "execution_status": derived["execution_status"],
                "execution_label": derived["execution_label"],
                "execution_reason": derived["execution_reason"],
                "recommendation_result": classify_recommendation(row),
                "success_tier": success_tier(row),
                "lifecycle_stage": lifecycle_stage(row),
                "hold_hours": hold_hours,
                "track_delay_hours": last_track_delay,
                "market_context": derived["market_context"],
                "derivatives_context": derived["derivatives_context"],
                "sector_context": derived["sector_context"],
            }

        # One pass over the deduplicated active rows: keep the actionable ones
        # and tally execution statuses (previously two identical derive loops).
        active_dedup_rows = []
        exec_buy_now = 0
        exec_wait = 0
        exec_observe = 0
        for row in raw_active_dedup_rows:
            item = dict(row)
            rec_result, rec_result_label = _classify_recommendation_result(item)
            item["recommendation_result"] = rec_result
            item["recommendation_result_label"] = rec_result_label
            derived = _derive_execution_fields(item)
            if _is_actionable_execution_status(derived.get("execution_status")):
                active_dedup_rows.append(row)
            es = derived.get("execution_status", "")
            if es == "buy_now":
                exec_buy_now += 1
            elif es == "wait_pullback":
                exec_wait += 1
            elif es == "observe":
                exec_observe += 1

        active_count = len(active_dedup_rows)
        success_count = 0
        failed_count = 0
        pending_count = 0
        closed_count = 0
        win_count = 0
        success_tier_counts = {"small": 0, "medium": 0, "big": 0}

        # Latest realized (take-profit / stop-loss) row per symbol.
        closed_dedup_rows = conn.execute("""
            SELECT r.*
            FROM recommendation r
            JOIN (
                SELECT symbol, MAX(id) AS max_id
                FROM recommendation
                WHERE status IN ('hit_tp1', 'hit_tp2', 'stopped_out')
                GROUP BY symbol
            ) latest ON latest.max_id = r.id
            ORDER BY r.rec_time DESC
        """).fetchall()

        for row in closed_dedup_rows:
            status = row["status"]
            if status in ("hit_tp1", "hit_tp2"):
                success_count += 1
                tier = success_tier(row)
                if tier in success_tier_counts:
                    success_tier_counts[tier] += 1
            elif status == "stopped_out":
                failed_count += 1
            else:
                # The query above only yields the three realized statuses, so
                # this branch is defensive.
                pending_count += 1

            if status in ("hit_tp1", "hit_tp2", "stopped_out", "expired"):
                closed_count += 1
                if (row["pnl_pct"] or 0) > 0:
                    win_count += 1

        realized_dedup = [r for r in closed_dedup_rows if r["status"] in ("hit_tp1", "hit_tp2", "stopped_out")]
        realized_count = len(realized_dedup)
        realized_pnl_sum = sum((r["pnl_pct"] or 0) for r in realized_dedup)

        # Live PnL only considers actionable rows that are executed trades.
        executed_active_dedup_rows = [r for r in active_dedup_rows if _is_executed_trade(dict(r))]
        executed_pnls = [(r["pnl_pct"] or 0) for r in executed_active_dedup_rows]
        held_count = len(executed_active_dedup_rows)
        held_pnl_avg = round(sum(executed_pnls) / held_count, 2) if held_count else 0
        held_win_count = sum(1 for pnl in executed_pnls if pnl > 0)
        held_win_rate = round(held_win_count / held_count * 100, 1) if held_count else 0
        active_pnl_sum = round(sum(executed_pnls), 2)
        active_avg_pnl = round(active_pnl_sum / held_count, 2) if held_count else 0
        active_max_pnl = round(max(executed_pnls, default=0), 2)
        active_min_pnl = round(min(executed_pnls, default=0), 2)
        executed_results = [classify_recommendation(r) for r in executed_active_dedup_rows]
        active_success_count = executed_results.count("success")
        active_failed_count = executed_results.count("failed")
        active_pending_count = executed_results.count("pending")

        # Rank with None-aware keys: `value or sentinel` treated a genuine 0
        # as missing, e.g. a 0% row could never be top loser and ranked below
        # negative rows for top gainer.
        if executed_active_dedup_rows:
            top_gainer = compact_item(max(executed_active_dedup_rows, key=lambda r: value_or(r["pnl_pct"], -9999)))
            top_loser = compact_item(min(executed_active_dedup_rows, key=lambda r: value_or(r["pnl_pct"], 9999)))
            biggest_explosion = compact_item(max(executed_active_dedup_rows, key=lambda r: value_or(r["max_pnl_pct"], -9999)))
            highest_risk = compact_item(min(executed_active_dedup_rows, key=lambda r: value_or(r["max_drawdown_pct"], 9999)))
        else:
            top_gainer = None
            top_loser = None
            biggest_explosion = None
            highest_risk = None

        lifecycle_items = [compact_item(r) for r in executed_active_dedup_rows]
        longest_holding = (
            max(lifecycle_items, key=lambda x: value_or(x.get("hold_hours"), -1)) if lifecycle_items else None
        )
        fastest_winner_candidates = [x for x in lifecycle_items if x.get("recommendation_result") == "success"]
        fastest_winner = (
            min(fastest_winner_candidates, key=lambda x: value_or(x.get("hold_hours"), 999999))
            if fastest_winner_candidates
            else None
        )
        decay_candidates = [x for x in lifecycle_items if x.get("lifecycle_stage") == "进入衰减"]
        decay_watch = decay_candidates[0] if decay_candidates else None

        def equity_points(bucket_expr, window_days):
            """Average tracked PnL of executed trades per time bucket within the window."""
            bucket_rows = conn.execute(
                """
                SELECT """
                + bucket_expr
                + """ AS bucket, AVG(pt.pnl_pct) AS avg_pnl, COUNT(*) AS sample_count
                FROM price_tracking pt
                JOIN recommendation r ON r.id = pt.rec_id
                WHERE julianday(?) - julianday(pt.track_time) <= ?
                AND """
                + _executed_trade_where("r")
                + """
                GROUP BY bucket
                ORDER BY bucket ASC
                """,
                (now.isoformat(), window_days),
            ).fetchall()
            return [
                {
                    "time": bucket_row["bucket"],
                    "avg_pnl": round(bucket_row["avg_pnl"] or 0, 2),
                    "sample_count": bucket_row["sample_count"] or 0,
                }
                for bucket_row in bucket_rows
            ]

        # Hourly buckets for the last day, daily buckets for the last week.
        points_24h = equity_points("substr(pt.track_time, 1, 13) || ':00:00'", 1.0)
        points_7d = equity_points("substr(pt.track_time, 1, 10)", 7.0)

        recommendation_success_rate = (
            round(success_count / (success_count + failed_count) * 100, 1) if (success_count + failed_count) else 0
        )
        avg_pnl_pct = round(realized_pnl_sum / realized_count, 2) if realized_count else 0

        actionable_contexts = []
        for row in active_dedup_rows:
            derived = _derive_execution_fields(dict(row))
            actionable_contexts.append({
                "market": derived.get("market_context") or {},
                "derivatives": derived.get("derivatives_context") or {},
                "sector": derived.get("sector_context") or {},
            })

        def avg_from_context(group_key, field):
            values = []
            for ctx in actionable_contexts:
                value = (ctx.get(group_key) or {}).get(field)
                if isinstance(value, (int, float)):
                    values.append(float(value))
            if not values:
                return 0
            avg = sum(values) / len(values)
            # Keep extra precision for very small averages (e.g. funding rates).
            if abs(avg) < 0.01:
                return round(avg, 3)
            return round(avg, 1)

        hot_sector_counter = {}
        for ctx in actionable_contexts:
            sector_ctx = ctx.get("sector") or {}
            for sector in sector_ctx.get("hot_sectors") or []:
                hot_sector_counter[sector] = hot_sector_counter.get(sector, 0) + 1

        market_context_overview = {
            "actionable_sample_count": len(actionable_contexts),
            "avg_turnover_acceleration_1h": avg_from_context("market", "turnover_acceleration_1h"),
            "avg_turnover_acceleration_4h": avg_from_context("market", "turnover_acceleration_4h"),
            "avg_volume_24h": avg_from_context("market", "volume_24h"),
            "avg_funding_rate": avg_from_context("derivatives", "funding_rate"),
            "avg_top_trader_long_pct": avg_from_context("derivatives", "top_trader_long_pct"),
            "avg_top_trader_long_short_ratio": avg_from_context("derivatives", "top_trader_long_short_ratio"),
            "hot_sector_count": len(hot_sector_counter),
            "top_hot_sectors": [
                {"sector": sector, "count": count}
                for sector, count in sorted(hot_sector_counter.items(), key=lambda item: (-item[1], item[0]))[:5]
            ],
        }
    finally:
        # Close even when a query or helper raises so the connection is not leaked.
        conn.close()

    return {
        "total_recommendations": total_count,
        "active_count": active_count,
        "raw_active_count": raw_active_count,
        "closed_count": closed_count,
        "win_count": win_count,
        "win_rate": round(win_count / closed_count * 100, 1) if closed_count else 0,
        "avg_pnl_pct": avg_pnl_pct,
        "success_count": success_count,
        "failed_count": failed_count,
        "pending_count": pending_count,
        "recommendation_success_rate": recommendation_success_rate,
        "active_pnl_sum": active_pnl_sum,
        "active_avg_pnl": active_avg_pnl,
        "active_max_pnl": active_max_pnl,
        "active_min_pnl": active_min_pnl,
        "active_success_count": active_success_count,
        "active_failed_count": active_failed_count,
        "active_pending_count": active_pending_count,
        "live_overview": {
            "actionable_count": active_count,
            "executed_trade_count": len(executed_active_dedup_rows),
            "executed_pnl_sum": active_pnl_sum,
            "executed_avg_pnl": active_avg_pnl,
            "actionable_pnl_sum": active_pnl_sum,
            "actionable_avg_pnl": active_avg_pnl,
            "buy_now_count": exec_buy_now,
            "wait_pullback_count": exec_wait,
            "observe_count": exec_observe,
            "held_count": held_count,
            "held_pnl_avg": held_pnl_avg,
            "held_win_rate": held_win_rate,
            "actionable_success_count": active_success_count,
            "actionable_failed_count": active_failed_count,
            "actionable_pending_count": active_pending_count,
            "raw_active_count": raw_active_count,
        },
        "history_overview": {
            "success_count": success_count,
            "failed_count": failed_count,
            "recommendation_success_rate": recommendation_success_rate,
            "avg_pnl_pct": avg_pnl_pct,
            "realized_count": realized_count,
        },
        "market_context_overview": market_context_overview,
        "success_tier_counts": success_tier_counts,
        "leaderboard": {
            "top_gainer": top_gainer,
            "top_loser": top_loser,
            "biggest_explosion": biggest_explosion,
            "highest_risk": highest_risk,
        },
        "equity_curve": {
            "last_24h": points_24h,
            "last_7d": points_7d,
        },
        "lifecycle_summary": {
            "longest_holding": longest_holding,
            "fastest_winner": fastest_winner,
            "decay_watch": decay_watch,
        },
        "result_definition": {
            "success": "仅统计实际命中止盈的推荐:status=hit_tp1 或 hit_tp2",
            "failed": "仅统计实际触发止损的推荐:status=stopped_out",
            "pending": "其余样本仅作为未兑现/观察中处理,不在顶部历史统计单独展示",
            "avg_pnl_pct": "历史均盈亏仅基于真实兑现样本计算:hit_tp1 / hit_tp2 / stopped_out",
            "live_pnl": "实时收益只统计已经执行/触发入场的交易;等回踩计划和观察信号不纳入收益"
        },
        "success_tier_definition": {
            "small": "小成功:最大涨幅 5%~10%",
            "medium": "中成功:最大涨幅 10%~20%",
            "big": "大成功:最大涨幅 >=20%"
        },
        "lifecycle_definition": {
            "hold_hours": "从推荐发出到当前的持续小时数",
            "track_delay_hours": "距离最近一次价格跟踪的延迟小时数",
            "lifecycle_stage": "等待入场 / 持仓观察 / 进入衰减 / 已验证成功 / 已验证失败"
        },
    }
|
||
|
||
|
||
def get_review_stats(conn_provider=None, iteration_logs_getter=None, iteration_summary_getter=None):
    """Return the review (post-mortem) statistics overview.

    Args:
        conn_provider: Optional zero-argument callable returning a DB
            connection; defaults to ``get_conn``.
        iteration_logs_getter: Optional callable invoked as ``getter(limit=30)``;
            defaults to ``get_strategy_iteration_logs``.
        iteration_summary_getter: Optional callable invoked as
            ``getter(days=30)``; defaults to ``get_strategy_iteration_summary``.

    Returns:
        A dict with ``reviews``, ``signal_performance``, ``missed_explosions``
        (latest 20), ``iteration_logs``, ``iteration_summary`` and
        ``strategy_revision_started_at`` (``""`` when the config meta cannot be
        loaded).
    """
    logs_getter = iteration_logs_getter
    summary_getter = iteration_summary_getter
    if not (logs_getter and summary_getter):
        # Import the defaults only when needed so fully injected callers do
        # not depend on the review_queries module being importable.
        from app.db.review_queries import get_strategy_iteration_logs, get_strategy_iteration_summary

        logs_getter = logs_getter or get_strategy_iteration_logs
        summary_getter = summary_getter or get_strategy_iteration_summary

    conn_factory = conn_provider or get_conn

    revision_started_at = ""
    try:
        from app.config.config_loader import get_meta

        meta = get_meta() or {}
        revision_started_at = (meta.get("strategy_revision_started_at") or "").strip()
    except Exception:
        # Best-effort: missing or broken config meta just leaves the field empty.
        revision_started_at = ""

    conn = conn_factory()
    try:
        reviews = conn.execute("SELECT * FROM review_log ORDER BY review_time DESC").fetchall()
        missed = conn.execute("SELECT * FROM missed_explosions ORDER BY detect_time DESC LIMIT 20").fetchall()
        signals = conn.execute("SELECT * FROM signal_performance ORDER BY hit_rate DESC").fetchall()
    finally:
        # Close even when a query raises so the connection is not leaked.
        conn.close()

    return {
        "reviews": [dict(r) for r in reviews],
        "signal_performance": [dict(s) for s in signals],
        "missed_explosions": [dict(m) for m in missed],
        "iteration_logs": logs_getter(limit=30),
        "iteration_summary": summary_getter(days=30),
        "strategy_revision_started_at": revision_started_at,
    }
|
||
|
||
|
||
def get_cron_run_logs(limit=50, job_name=None):
    """Return cron run log rows, newest first.

    Args:
        limit: Maximum number of rows to return.
        job_name: Optional exact ``job_name`` filter; falsy means all jobs.

    Returns:
        A list of row dicts whose ``summary_json`` field is decoded from JSON
        (``{}`` when it is empty or cannot be decoded).
    """
    where_clause = ""
    params = []
    if job_name:
        where_clause = "WHERE job_name = ?"
        params.append(job_name)
    params.append(limit)
    sql = """
        SELECT * FROM cron_run_log
        {where_clause}
        ORDER BY started_at DESC, id DESC
        LIMIT ?
    """.format(where_clause=where_clause)

    conn = get_conn()
    try:
        rows = conn.execute(sql, tuple(params)).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()

    result = []
    for row in rows:
        item = dict(row)
        try:
            item["summary_json"] = json.loads(item.get("summary_json") or "{}")
        except (TypeError, ValueError, RecursionError):
            # Malformed stored JSON must not break the listing.
            item["summary_json"] = {}
        result.append(item)
    return result
|
||
|
||
|
||
def get_cron_run_summary(hours=24):
    """Return aggregate cron run statistics for the last ``hours`` hours.

    Any run whose ``run_status`` is not ``"success"`` is counted as an error.

    Args:
        hours: Look-back window in hours, measured from ``datetime.now()``.

    Returns:
        A dict with ``overall`` totals, ``job_stats`` (one entry per job name,
        sorted by name, with the newest run's status fields) and
        ``recent_logs`` (the 20 newest rows with decoded ``summary_json``).
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT * FROM cron_run_log
            WHERE julianday(?) - julianday(started_at) <= ?
            ORDER BY started_at DESC, id DESC
            """,
            # julianday() differences are expressed in days, hence hours / 24.
            (datetime.now().isoformat(), hours / 24.0),
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()

    logs = []
    job_stats = {}
    total_runs = 0
    success_runs = 0
    error_runs = 0
    total_duration = 0

    for row in rows:
        item = dict(row)
        try:
            item["summary_json"] = json.loads(item.get("summary_json") or "{}")
        except (TypeError, ValueError, RecursionError):
            # Malformed stored JSON must not break the summary.
            item["summary_json"] = {}
        logs.append(item)

        succeeded = item.get("run_status") == "success"
        duration = item.get("duration_ms") or 0

        total_runs += 1
        total_duration += duration
        if succeeded:
            success_runs += 1
        else:
            error_runs += 1

        job = item.get("job_name") or "unknown"
        stat = job_stats.setdefault(
            job,
            {
                "job_name": job,
                "runs": 0,
                "success_runs": 0,
                "error_runs": 0,
                # Holds the running sum until it is averaged below.
                "avg_duration_ms": 0,
                "last_status": "",
                "last_result_status": "",
                "last_started_at": "",
                "last_finished_at": "",
                "last_error_message": "",
            },
        )
        stat["runs"] += 1
        if succeeded:
            stat["success_runs"] += 1
        else:
            stat["error_runs"] += 1
        stat["avg_duration_ms"] += duration
        # Rows arrive newest first, so the first row seen per job is its latest run.
        if not stat["last_started_at"]:
            stat["last_status"] = item.get("run_status", "")
            stat["last_result_status"] = item.get("result_status", "")
            stat["last_started_at"] = item.get("started_at", "")
            stat["last_finished_at"] = item.get("finished_at", "")
            stat["last_error_message"] = item.get("error_message", "")

    for stat in job_stats.values():
        stat["success_rate"] = round(stat["success_runs"] / stat["runs"] * 100, 1) if stat["runs"] else 0
        stat["avg_duration_ms"] = round(stat["avg_duration_ms"] / stat["runs"]) if stat["runs"] else 0

    overall = {
        "hours": hours,
        "total_runs": total_runs,
        "success_runs": success_runs,
        "error_runs": error_runs,
        "success_rate": round(success_runs / total_runs * 100, 1) if total_runs else 0,
        "avg_duration_ms": round(total_duration / total_runs) if total_runs else 0,
    }
    return {
        "overall": overall,
        "job_stats": sorted(job_stats.values(), key=lambda x: x["job_name"]),
        "recent_logs": logs[:20],
    }
|
||
|
||
|
||
def _pipeline_window(run, next_run_start=None):
    """Compute the ``(window_start, window_end)`` time window attributed to a run.

    The window opens 10 minutes before the run started and closes 30 minutes
    after it finished. When the next run starts after this one finished, the
    end is capped at one second before that next start. A missing start falls
    back to the current time; a missing or earlier finish falls back to the
    start.
    """
    begin = _parse_dt(run.get("started_at")) or datetime.now()
    end = _parse_dt(run.get("finished_at")) or begin
    end = max(end, begin)

    lower = begin - timedelta(minutes=10)
    upper = end + timedelta(minutes=30)

    following = _parse_dt(next_run_start)
    if following and end < following:
        cap = following - timedelta(seconds=1)
        if cap < upper:
            upper = cap
    return lower, upper
|
||
|
||
|
||
def _cron_item(row):
    """Copy a cron log row into a dict with ``summary_json`` decoded (``{}`` fallback)."""
    record = dict(row)
    decoded_summary = _loads_json(record.get("summary_json"), {})
    record["summary_json"] = decoded_summary
    return record
|
||
|
||
|
||
def _screening_item(row):
    """Copy a screening log row into a dict with decoded JSON and stage labels.

    ``signals`` and ``detail_json`` are decoded, and ``stage_bucket`` /
    ``stage_label`` are derived from ``layer``: '细筛' is fine, '确认' is
    confirm, anything else is coarse.
    """
    record = dict(row)
    record["signals"] = _loads_json(record.get("signals"), [])
    record["detail_json"] = _loads_json(record.get("detail_json"), {})

    stages = {
        "细筛": ("fine", "细筛通过"),
        "确认": ("confirm", "确认记录"),
    }
    bucket, label = stages.get(record.get("layer"), ("coarse", "观察候选"))
    record["stage_bucket"] = bucket
    record["stage_label"] = label
    return record
|
||
|
||
|
||
def _recommendation_item(row):
    """Copy a recommendation row into a dict with decoded JSON and derived fields.

    Decodes the JSON columns, attaches the classification result and its
    label, then passes the dict through ``_derive_execution_fields``.
    """
    record = dict(row)
    # (target key, source column) pairs decoded as lists.
    list_fields = (
        ("signals", "signals"),
        ("signal_codes", "signal_codes_json"),
        ("signal_labels", "signal_labels_json"),
    )
    for target, source in list_fields:
        record[target] = _loads_json(record.get(source), [])
    record["entry_plan"] = _loads_json(record.get("entry_plan_json"), {})

    outcome, outcome_label = _classify_recommendation_result(record)
    record["recommendation_result"] = outcome
    record["recommendation_result_label"] = outcome_label
    _derive_execution_fields(record)
    return record
|
||
|
||
|
||
def _review_item(row):
    """Copy a review log row into a dict with its signal columns decoded as lists."""
    record = dict(row)
    for column in ("triggered_signals", "hit_signals", "miss_signals"):
        record[column] = _loads_json(record.get(column), [])
    return record
|
||
|
||
|
||
def _missed_item(row):
    """Copy a missed-explosion row into a dict with ``features_detected`` decoded."""
    record = dict(row)
    features = _loads_json(record.get("features_detected"), {})
    record["features_detected"] = features
    return record
|
||
|
||
|
||
def _performance_status(rec, reviews_by_rec):
|
||
status = (rec.get("status") or "").strip()
|
||
review_outcomes = [(r.get("outcome") or "").strip() for r in reviews_by_rec.get(rec.get("id"), [])]
|
||
if status in ("hit_tp1", "hit_tp2") or "爆发" in review_outcomes:
|
||
return "success"
|
||
if status == "stopped_out" or "失败" in review_outcomes:
|
||
return "failed"
|
||
return "pending"
|
||
|
||
|
||
def _select_pipeline_rows(conn, run):
    """Collect every log row that belongs to one 粗筛 (coarse screening) run.

    The run's window is obtained from ``_pipeline_window(run, next_started)``,
    where ``next_started`` is the ``started_at`` of the next 粗筛 run (or
    ``None`` when there is none).

    Args:
        conn: Open DB connection whose rows support lookup by column name.
        run: Mapping for the 粗筛 cron run; ``id``, ``started_at`` and
            ``finished_at`` are read.

    Returns:
        A dict with ``window_start`` / ``window_end`` (ISO text) and the
        converted ``cron_rows``, ``screening_rows``, ``recommendation_rows``,
        ``review_rows`` and ``missed_rows``.
    """
    started_raw = run.get("started_at")

    following_run = conn.execute(
        """
        SELECT started_at FROM cron_run_log
        WHERE job_name='粗筛' AND started_at > ?
        ORDER BY started_at ASC, id ASC
        LIMIT 1
        """,
        (started_raw,),
    ).fetchone()
    next_started = following_run["started_at"] if following_run else None
    window_start, window_end = _pipeline_window(run, next_started)

    # Fall back to the window start when the run's own timestamps are unusable.
    run_started = _iso(_parse_dt(started_raw) or window_start)
    run_finished = _iso(
        _parse_dt(run.get("finished_at")) or _parse_dt(started_raw) or window_start
    )
    start_text = _iso(window_start)
    end_text = _iso(window_end)
    # Downstream artefacts are looked up between the run's finish and the window end.
    after_run_range = (run_finished, end_text)

    cron_rows = conn.execute(
        """
        SELECT * FROM cron_run_log
        WHERE started_at >= ? AND started_at <= ?
          AND (
            job_name IN ('事件舆情', '跟踪', '复盘')
            OR (job_name='粗筛' AND id=?)
            OR (job_name='确认' AND started_at >= ?)
          )
        ORDER BY started_at ASC, id ASC
        """,
        (start_text, end_text, run.get("id"), run_finished),
    ).fetchall()

    screening_rows = conn.execute(
        """
        SELECT * FROM screening_log
        WHERE (
            layer IN ('粗筛', '细筛') AND scan_time >= ? AND scan_time <= ?
        ) OR (
            layer='确认' AND scan_time >= ? AND scan_time <= ?
        ) OR (
            layer='舆情触发' AND scan_time >= ? AND scan_time <= ?
        )
        ORDER BY scan_time ASC, score DESC, id ASC
        """,
        (run_started, run_finished) + after_run_range + (start_text, end_text),
    ).fetchall()

    rec_rows = conn.execute(
        """
        SELECT * FROM recommendation
        WHERE rec_time >= ? AND rec_time <= ?
        ORDER BY rec_time ASC, id ASC
        """,
        after_run_range,
    ).fetchall()

    # Reviews linked to this run's recommendations, whenever they were written.
    review_rows = []
    linked_ids = tuple(rec_row["id"] for rec_row in rec_rows)
    if linked_ids:
        marks = ",".join("?" * len(linked_ids))
        review_rows = conn.execute(
            f"""
            SELECT * FROM review_log
            WHERE rec_id IN ({marks})
            ORDER BY review_time ASC, id ASC
            """,
            linked_ids,
        ).fetchall()

    # Plus reviews written inside the window, skipping ones already collected.
    windowed_reviews = conn.execute(
        """
        SELECT * FROM review_log
        WHERE review_time >= ? AND review_time <= ?
        ORDER BY review_time ASC, id ASC
        """,
        after_run_range,
    ).fetchall()
    seen_ids = {review["id"] for review in review_rows}
    for review in windowed_reviews:
        review_id = review["id"]
        if review_id in seen_ids:
            continue
        seen_ids.add(review_id)
        review_rows.append(review)

    missed_rows = conn.execute(
        """
        SELECT * FROM missed_explosions
        WHERE detect_time >= ? AND detect_time <= ?
        ORDER BY detect_time ASC, id ASC
        """,
        after_run_range,
    ).fetchall()

    return {
        "window_start": start_text,
        "window_end": end_text,
        "cron_rows": list(map(_cron_item, cron_rows)),
        "screening_rows": list(map(_screening_item, screening_rows)),
        "recommendation_rows": list(map(_recommendation_item, rec_rows)),
        "review_rows": list(map(_review_item, review_rows)),
        "missed_rows": list(map(_missed_item, missed_rows)),
    }
|
||
|
||
|
||
def _pipeline_summary_for_run(run, related):
    """Build the one-row summary of a 粗筛 run and its downstream stages.

    Args:
        run: Mapping for the 粗筛 cron run (as produced by ``_cron_item``).
        related: Dict returned by ``_select_pipeline_rows`` for that run.

    Returns:
        A flat dict of run metadata, per-stage counters, ``hit_rate``
        (recommendations as a percentage of fine-qualified candidates) and
        at most three ``issue_notes``.
    """
    cron_rows = related["cron_rows"]
    screening_rows = related["screening_rows"]
    rec_rows = related["recommendation_rows"]
    review_rows = related["review_rows"]
    missed_total = len(related["missed_rows"])

    def cron_for(job_name):
        return [entry for entry in cron_rows if entry.get("job_name") == job_name]

    def summary_total(entries, key):
        return sum(
            _safe_int((entry.get("summary_json") or {}).get(key)) for entry in entries
        )

    def layer_total(layer):
        return sum(1 for entry in screening_rows if entry.get("layer") == layer)

    confirm_rows = cron_for("确认")
    event_rows = cron_for("事件舆情")
    track_rows = cron_for("跟踪")
    review_cron_rows = cron_for("复盘")

    confirm_processed = summary_total(confirm_rows, "processed_count")
    confirm_hits = summary_total(confirm_rows, "confirmed_count")

    reviews_by_rec = {}
    for review in review_rows:
        reviews_by_rec.setdefault(review.get("rec_id"), []).append(review)

    perf_counts = {"success": 0, "failed": 0, "pending": 0}
    for rec in rec_rows:
        perf_counts[_performance_status(rec, reviews_by_rec)] += 1

    run_summary = _loads_json(run.get("summary_json"), {})
    status = run.get("run_status") or "unknown"
    # Prefer the counts the job reported; otherwise count the screening rows.
    rough_candidates = _safe_int(run_summary.get("total_candidates")) or layer_total("粗筛")
    fine_qualified = _safe_int(run_summary.get("total_qualified")) or layer_total("细筛")

    recommendations = len(rec_rows)
    if fine_qualified:
        hit_rate = round(recommendations / fine_qualified * 100, 1)
    else:
        hit_rate = 0

    # (condition, note) pairs in display order; only the first three are kept.
    note_rules = (
        (status != "success", run.get("error_message") or "任务异常"),
        (rough_candidates and not fine_qualified, "粗筛后细筛为空"),
        (fine_qualified and not confirm_hits, "确认未命中"),
        (confirm_hits and not recommendations, "确认命中但未生成推荐"),
        (perf_counts["failed"], f"失败 {perf_counts['failed']}"),
        (missed_total, f"漏选 {missed_total}"),
    )
    issue_notes = [note for applies, note in note_rules if applies]

    return {
        "id": run.get("id"),
        "run_id": run.get("id"),
        "job_name": run.get("job_name"),
        "script_name": run.get("script_name"),
        "started_at": run.get("started_at"),
        "finished_at": run.get("finished_at"),
        "duration_ms": _safe_int(run.get("duration_ms")),
        "run_status": status,
        "result_status": run.get("result_status") or "",
        "error_message": run.get("error_message") or "",
        "window_start": related["window_start"],
        "window_end": related["window_end"],
        "rough_candidates": rough_candidates,
        "fine_qualified": fine_qualified,
        "confirm_processed": confirm_processed,
        "confirm_hits": confirm_hits,
        "recommendations": recommendations,
        "perf_success": perf_counts["success"],
        "perf_failed": perf_counts["failed"],
        "perf_pending": perf_counts["pending"],
        "missed_count": missed_total,
        "event_count": summary_total(event_rows, "processed_count"),
        "tracked_count": summary_total(track_rows, "tracked_count"),
        "review_count": len(review_rows),
        "review_run_count": len(review_cron_rows),
        "hit_rate": hit_rate,
        "issue_notes": issue_notes[:3],
    }
|
||
|
||
|
||
def _clamped_int(value, default, minimum, maximum=None):
    """Coerce *value* to an int clamped to ``[minimum, maximum]``.

    Falsy values (``None``, ``0``, ``""``) select *default* before clamping.
    When the value cannot be converted at all, *default* is returned as-is.
    """
    try:
        number = int(value or default)
    except (TypeError, ValueError, OverflowError):
        return default
    number = max(minimum, number)
    return number if maximum is None else min(number, maximum)


def get_pipeline_runs(limit=30, hours=24, offset=0):
    """Aggregate recommendation-pipeline logs per 粗筛 (coarse screening) run.

    Args:
        limit: Page size, clamped to 1..100 (default 30).
        hours: Look-back window in hours, clamped to 1..720 (default 24).
        offset: Number of runs to skip, newest first (minimum 0).

    Returns:
        A dict with ``kpi`` (totals over every run in the look-back window),
        ``runs`` (summaries for the requested page only) and ``pagination``.
    """
    limit = _clamped_int(limit, 30, 1, 100)
    offset = _clamped_int(offset, 0, 0)
    hours = _clamped_int(hours, 24, 1, 24 * 30)

    conn = get_conn()
    try:
        # One query with one timestamp, so the total, the page and the KPIs
        # all describe exactly the same set of runs.
        run_rows = conn.execute(
            """
            SELECT * FROM cron_run_log
            WHERE job_name = '粗筛'
              AND julianday(?) - julianday(started_at) <= ?
            ORDER BY started_at DESC, id DESC
            """,
            (datetime.now().isoformat(), hours / 24.0),
        ).fetchall()

        # Summarise each run once; the page is a slice of this list.
        all_summaries = []
        for row in run_rows:
            run = _cron_item(row)
            related = _select_pipeline_rows(conn, run)
            all_summaries.append(_pipeline_summary_for_run(run, related))
    finally:
        # Release the connection even when a query or a helper raises.
        conn.close()

    total_count = len(all_summaries)
    runs = all_summaries[offset:offset + limit]

    summed_fields = (
        "rough_candidates",
        "fine_qualified",
        "confirm_processed",
        "confirm_hits",
        "recommendations",
        "perf_success",
        "perf_failed",
        "perf_pending",
        "missed_count",
    )
    kpi = {"hours": hours, "run_count": total_count}
    for field in summed_fields:
        kpi[field] = sum(item[field] for item in all_summaries)

    decided = kpi["perf_success"] + kpi["perf_failed"]
    kpi["recommendation_rate"] = (
        round(kpi["recommendations"] / kpi["fine_qualified"] * 100, 1)
        if kpi["fine_qualified"]
        else 0
    )
    kpi["performance_hit_rate"] = (
        round(kpi["perf_success"] / decided * 100, 1) if decided else 0
    )

    total_pages = (total_count + limit - 1) // limit if total_count else 0
    current_page = (offset // limit) + 1 if total_count else 0
    return {
        "kpi": kpi,
        "runs": runs,
        "pagination": {
            "hours": hours,
            "limit": limit,
            "offset": offset,
            "total_count": total_count,
            "total_pages": total_pages,
            "page": current_page,
            "has_more": offset + limit < total_count,
        },
    }
|
||
|
||
|
||
def get_pipeline_run_detail(run_id):
    """Return the full pipeline breakdown for one 粗筛 (coarse screening) run.

    Args:
        run_id: ``cron_run_log.id`` of the 粗筛 run.

    Returns:
        ``None`` when no 粗筛 run has that id; otherwise a dict with
        ``summary``, ``timeline``, ``stage_counts``, ``screening_items``,
        ``recommendations``, ``reviews`` and ``missed_explosions``.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM cron_run_log WHERE id=? AND job_name='粗筛'", (run_id,)
        ).fetchone()
        if not row:
            return None
        run = _cron_item(row)
        related = _select_pipeline_rows(conn, run)
    finally:
        # Release the connection on every path, including helper failures.
        conn.close()

    summary = _pipeline_summary_for_run(run, related)

    reviews_by_rec = {}
    for review in related["review_rows"]:
        reviews_by_rec.setdefault(review.get("rec_id"), []).append(review)

    # Recommendation dicts are annotated in place with status, reviews and label.
    label_by_status = {"success": "复盘命中", "failed": "复盘失败"}
    recommendations = []
    for rec in related["recommendation_rows"]:
        rec["performance_status"] = _performance_status(rec, reviews_by_rec)
        rec["reviews"] = reviews_by_rec.get(rec.get("id"), [])
        rec["stage_label"] = label_by_status.get(rec["performance_status"], "交易推荐")
        recommendations.append(rec)

    timeline = [
        {
            "stage": cron.get("job_name") or "任务",
            "started_at": cron.get("started_at"),
            "finished_at": cron.get("finished_at"),
            "duration_ms": _safe_int(cron.get("duration_ms")),
            "run_status": cron.get("run_status") or "",
            "result_status": cron.get("result_status") or "",
            "summary": cron.get("summary_json") or {},
            "error_message": cron.get("error_message") or "",
        }
        for cron in related["cron_rows"]
    ]

    screening_items = related["screening_rows"]
    stage_counts = {
        "observation": sum(1 for item in screening_items if item.get("layer") == "粗筛"),
        "fine": sum(1 for item in screening_items if item.get("layer") == "细筛"),
        # Processed-but-not-confirmed candidates; never negative.
        "confirm_rejected": max(0, summary["confirm_processed"] - summary["confirm_hits"]),
        "recommendation": len(recommendations),
        "review_success": summary["perf_success"],
        "review_failed": summary["perf_failed"],
        "missed": summary["missed_count"],
    }

    return {
        "summary": summary,
        "timeline": timeline,
        "stage_counts": stage_counts,
        "screening_items": screening_items,
        "recommendations": recommendations,
        "reviews": related["review_rows"],
        "missed_explosions": related["missed_rows"],
    }
|
||
|
||
|
||
# Public names exported by ``from <this module> import *``; the
# underscore-prefixed helpers in this file are intentionally not listed.
__all__ = [
    "get_all_recommendations",
    "get_observation_candidates",
    "get_cron_run_logs",
    "get_cron_run_summary",
    "get_pipeline_run_detail",
    "get_pipeline_runs",
    "get_review_stats",
    "get_screening_history",
    "get_stats",
]
|