276 lines
10 KiB
Python
276 lines
10 KiB
Python
"""Intraday trading frequency health read model."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from collections import Counter
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.config.system_config import paper_trading_config
|
|
from app.db.schema import get_conn
|
|
|
|
|
|
def _safe_int(value, default=0):
|
|
try:
|
|
return int(value or 0)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _safe_float(value, default=0.0):
|
|
try:
|
|
return float(value or 0)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _loads(value, fallback=None):
|
|
try:
|
|
if isinstance(value, (dict, list)):
|
|
return value
|
|
if isinstance(value, str) and value.strip():
|
|
return json.loads(value)
|
|
except Exception:
|
|
pass
|
|
return fallback if fallback is not None else {}
|
|
|
|
|
|
def _since(days):
|
|
return (datetime.now() - timedelta(days=max(1, min(_safe_int(days, 1), 30)))).isoformat()
|
|
|
|
|
|
def _date_expr(column):
|
|
return f"substr({column},1,10)"
|
|
|
|
|
|
def _counter_items(counter: Counter, limit=10):
|
|
return [{"reason": key, "count": count} for key, count in counter.most_common(limit)]
|
|
|
|
|
|
def _compact_detail(detail: dict) -> dict:
|
|
gate_detail = detail.get("gate_detail") if isinstance(detail.get("gate_detail"), dict) else {}
|
|
return {
|
|
"reason": detail.get("reason") or "",
|
|
"gate_reasons": detail.get("gate_reasons") if isinstance(detail.get("gate_reasons"), list) else [],
|
|
"target_price": gate_detail.get("target_price"),
|
|
"distance_to_entry_pct": gate_detail.get("distance_to_entry_pct"),
|
|
"max_distance_to_entry_pct": gate_detail.get("max_distance_to_entry_pct"),
|
|
"rr1": gate_detail.get("rr1"),
|
|
"min_rr": gate_detail.get("min_rr"),
|
|
"rec_score": gate_detail.get("rec_score"),
|
|
"min_rec_score": gate_detail.get("min_rec_score"),
|
|
"execution_status": detail.get("execution_status") or "",
|
|
"action_status": detail.get("action_status") or "",
|
|
}
|
|
|
|
|
|
def _extract_gate_reasons(rows):
|
|
counter = Counter()
|
|
samples = []
|
|
seen = set()
|
|
for row in rows:
|
|
detail = _loads(row.get("detail_json"), {})
|
|
dedupe_key = (
|
|
row.get("symbol") or "",
|
|
row.get("strategy_code") or detail.get("strategy_code") or "",
|
|
detail.get("recommendation_id") or detail.get("rec_id") or "",
|
|
detail.get("reason") or "",
|
|
tuple(detail.get("gate_reasons") if isinstance(detail.get("gate_reasons"), list) else []),
|
|
)
|
|
if dedupe_key in seen:
|
|
continue
|
|
seen.add(dedupe_key)
|
|
reason = str(detail.get("reason") or "").strip()
|
|
if reason:
|
|
counter[reason] += 1
|
|
gate_reasons = detail.get("gate_reasons")
|
|
if isinstance(gate_reasons, list):
|
|
for item in gate_reasons:
|
|
if item:
|
|
counter[str(item)] += 1
|
|
risk_detail = detail.get("risk_detail") if isinstance(detail.get("risk_detail"), dict) else {}
|
|
decision = str(risk_detail.get("decision") or "").strip()
|
|
if decision:
|
|
counter[decision] += 1
|
|
samples.append({
|
|
"time": row.get("event_time") or "",
|
|
"symbol": row.get("symbol") or "",
|
|
"strategy_code": row.get("strategy_code") or detail.get("strategy_code") or "",
|
|
"reason": reason or (gate_reasons[0] if isinstance(gate_reasons, list) and gate_reasons else decision),
|
|
"detail": _compact_detail(detail),
|
|
})
|
|
return _counter_items(counter), samples[:12]
|
|
|
|
|
|
def get_intraday_frequency_health(days: int = 1) -> dict:
|
|
"""Summarize whether the intraday paper-trading funnel is active enough."""
|
|
days = max(1, min(_safe_int(days, 1), 30))
|
|
since = _since(days)
|
|
cfg = paper_trading_config()
|
|
target_min = max(0, _safe_int(cfg.get("target_trades_per_day_min"), 3))
|
|
target_max = max(target_min, _safe_int(cfg.get("target_trades_per_day_max"), 5))
|
|
conn = get_conn()
|
|
try:
|
|
screening = conn.execute(
|
|
f"""
|
|
SELECT {_date_expr('scan_time')} AS d, layer, state, COUNT(*) AS c
|
|
FROM screening_log
|
|
WHERE scan_time >= %s
|
|
GROUP BY d, layer, state
|
|
ORDER BY d DESC
|
|
""",
|
|
(since,),
|
|
).fetchall()
|
|
recommendations = conn.execute(
|
|
f"""
|
|
SELECT {_date_expr('rec_time')} AS d, execution_status, strategy_code, direction, COUNT(*) AS c
|
|
FROM recommendation
|
|
WHERE rec_time >= %s
|
|
GROUP BY d, execution_status, strategy_code, direction
|
|
ORDER BY d DESC
|
|
""",
|
|
(since,),
|
|
).fetchall()
|
|
orders = conn.execute(
|
|
f"""
|
|
SELECT {_date_expr('created_at')} AS d, status, strategy_code, side, COUNT(*) AS c
|
|
FROM paper_orders
|
|
WHERE created_at >= %s
|
|
GROUP BY d, status, strategy_code, side
|
|
ORDER BY d DESC
|
|
""",
|
|
(since,),
|
|
).fetchall()
|
|
trades = conn.execute(
|
|
f"""
|
|
SELECT {_date_expr('opened_at')} AS d, status, strategy_code, side, COUNT(*) AS c
|
|
FROM paper_trades
|
|
WHERE opened_at >= %s
|
|
GROUP BY d, status, strategy_code, side
|
|
ORDER BY d DESC
|
|
""",
|
|
(since,),
|
|
).fetchall()
|
|
gate_rows = [dict(r) for r in conn.execute(
|
|
"""
|
|
SELECT event_time, symbol, strategy_code, detail_json
|
|
FROM paper_trade_events
|
|
WHERE event_time >= %s AND event_type IN ('paper_gate_reject','paper_order_cancel')
|
|
ORDER BY event_time DESC, id DESC
|
|
LIMIT 200
|
|
""",
|
|
(since,),
|
|
).fetchall()]
|
|
finally:
|
|
conn.close()
|
|
|
|
by_day = {}
|
|
for row in screening:
|
|
d = row["d"] or "unknown"
|
|
item = by_day.setdefault(d, _empty_day(d))
|
|
layer = str(row["layer"] or "")
|
|
state = str(row["state"] or "")
|
|
count = _safe_int(row["c"])
|
|
if layer == "粗筛":
|
|
item["rough_candidates"] += count
|
|
elif layer == "细筛":
|
|
item["quality_candidates"] += count
|
|
elif layer == "确认":
|
|
item["confirmed_candidates"] += count
|
|
elif layer == "universe_gate":
|
|
item["universe_filtered"] += count
|
|
item["screening_breakdown"].append({"layer": layer, "state": state, "count": count})
|
|
|
|
for row in recommendations:
|
|
d = row["d"] or "unknown"
|
|
item = by_day.setdefault(d, _empty_day(d))
|
|
status = str(row["execution_status"] or "")
|
|
count = _safe_int(row["c"])
|
|
item["recommendations"] += count
|
|
if status in {"buy_now", "wait_pullback"}:
|
|
item["actionable_opportunities"] += count
|
|
elif status == "observe":
|
|
item["observe_opportunities"] += count
|
|
item["recommendation_breakdown"].append(dict(row))
|
|
|
|
for row in orders:
|
|
d = row["d"] or "unknown"
|
|
item = by_day.setdefault(d, _empty_day(d))
|
|
status = str(row["status"] or "")
|
|
count = _safe_int(row["c"])
|
|
item["paper_orders"] += count
|
|
if status == "pending":
|
|
item["pending_orders"] += count
|
|
elif status == "filled":
|
|
item["filled_orders"] += count
|
|
elif status in {"canceled", "expired"}:
|
|
item["canceled_or_expired_orders"] += count
|
|
item["order_breakdown"].append(dict(row))
|
|
|
|
for row in trades:
|
|
d = row["d"] or "unknown"
|
|
item = by_day.setdefault(d, _empty_day(d))
|
|
count = _safe_int(row["c"])
|
|
item["paper_trades"] += count
|
|
item["trade_breakdown"].append(dict(row))
|
|
|
|
gate_reasons, gate_samples = _extract_gate_reasons(gate_rows)
|
|
days_list = sorted(by_day.values(), key=lambda x: x["date"], reverse=True)
|
|
for item in days_list:
|
|
item["paper_actions"] = item["paper_orders"] + item["paper_trades"]
|
|
converted = item["paper_trades"] + item["filled_orders"] + item["pending_orders"]
|
|
item["paper_converted_count"] = converted
|
|
item["trade_conversion_pct"] = round(min(converted, item["actionable_opportunities"]) / item["actionable_opportunities"] * 100, 1) if item["actionable_opportunities"] else 0
|
|
if item["paper_actions"] < target_min:
|
|
item["health_status"] = "low_frequency"
|
|
item["health_label"] = f"低于日内目标 {target_min}-{target_max} 单"
|
|
elif item["paper_actions"] > target_max:
|
|
item["health_status"] = "high_frequency"
|
|
item["health_label"] = f"高于日内目标 {target_min}-{target_max} 单"
|
|
else:
|
|
item["health_status"] = "healthy"
|
|
item["health_label"] = f"达到日内目标 {target_min}-{target_max} 单"
|
|
|
|
latest = days_list[0] if days_list else _empty_day(datetime.now().date().isoformat())
|
|
return {
|
|
"mode": cfg.get("trading_mode") or "intraday_trading",
|
|
"target_trades_per_day": {"min": target_min, "max": target_max},
|
|
"days": days,
|
|
"generated_at": datetime.now().isoformat(timespec="seconds"),
|
|
"latest": latest,
|
|
"daily": days_list,
|
|
"gate_reasons": gate_reasons,
|
|
"gate_samples": gate_samples,
|
|
"definition": "日内频率健康度只衡量机会到策略交易账本的转化,不代表收益质量。",
|
|
}
|
|
|
|
|
|
def _empty_day(day):
|
|
return {
|
|
"date": day,
|
|
"universe_filtered": 0,
|
|
"rough_candidates": 0,
|
|
"quality_candidates": 0,
|
|
"confirmed_candidates": 0,
|
|
"recommendations": 0,
|
|
"actionable_opportunities": 0,
|
|
"observe_opportunities": 0,
|
|
"paper_orders": 0,
|
|
"pending_orders": 0,
|
|
"filled_orders": 0,
|
|
"canceled_or_expired_orders": 0,
|
|
"paper_trades": 0,
|
|
"paper_actions": 0,
|
|
"paper_converted_count": 0,
|
|
"trade_conversion_pct": 0,
|
|
"health_status": "unknown",
|
|
"health_label": "暂无数据",
|
|
"screening_breakdown": [],
|
|
"recommendation_breakdown": [],
|
|
"order_breakdown": [],
|
|
"trade_breakdown": [],
|
|
}
|
|
|
|
|
|
__all__ = ["get_intraday_frequency_health"]
|