178 lines
6.9 KiB
Python
178 lines
6.9 KiB
Python
"""Short-timeframe signal sampling and review queries."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.db.postgres_connection import ensure_migrations_once
|
|
from app.db.schema import get_conn
|
|
|
|
|
|
def _json(data) -> str:
|
|
return json.dumps(data or {}, ensure_ascii=False)
|
|
|
|
|
|
def _loads(value, fallback):
|
|
try:
|
|
if isinstance(value, str) and value.strip():
|
|
return json.loads(value)
|
|
if value:
|
|
return value
|
|
except Exception:
|
|
pass
|
|
return fallback
|
|
|
|
|
|
def record_short_tf_samples(symbol: str, candidate: dict) -> int:
|
|
"""Persist short-timeframe discovery samples for later evidence-based review."""
|
|
short_tf = (candidate or {}).get("short_tf_ignition") or {}
|
|
signals = [dict(x or {}) for x in short_tf.get("signals", []) if (x or {}).get("found")]
|
|
if not signals:
|
|
return 0
|
|
ensure_migrations_once()
|
|
now_dt = datetime.now()
|
|
# Scheduler may retry within the same minute; keep one sample per symbol/timeframe/minute.
|
|
signal_time = now_dt.replace(second=0, microsecond=0).isoformat(timespec="seconds")
|
|
created_at = now_dt.isoformat(timespec="seconds")
|
|
symbol = str(symbol or "").upper().strip()
|
|
conn = get_conn()
|
|
count = 0
|
|
for item in signals:
|
|
tf = str(item.get("timeframe") or "").strip()
|
|
code = "short_tf_15m_ignition" if tf == "15m" else "short_tf_5m_ignition" if tf == "5m" else "short_tf_ignition"
|
|
trigger = item.get("trigger") or {}
|
|
context = {
|
|
"short_tf_ignition": short_tf,
|
|
"anomalies": candidate.get("anomalies") or [],
|
|
"signal_recency": candidate.get("signal_recency") or {},
|
|
"top_gainer_24h": bool(candidate.get("top_gainer_24h")),
|
|
"static_accumulation": candidate.get("static_accumulation") or {},
|
|
"higher_lows": candidate.get("higher_lows") or {},
|
|
"compression_surge": candidate.get("compression_surge") or {},
|
|
}
|
|
cur = conn.execute(
|
|
"""
|
|
INSERT INTO short_tf_signal_samples (
|
|
signal_time, symbol, timeframe, signal_code, signal_label,
|
|
entry_price, volume_24h, change_24h, gain_pct, vol_ratio,
|
|
body_ratio, age_bars, resonance, context_json, review_status, created_at
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'pending', %s)
|
|
ON CONFLICT(symbol, timeframe, signal_code, signal_time) DO NOTHING
|
|
""",
|
|
(
|
|
signal_time,
|
|
symbol,
|
|
tf,
|
|
code,
|
|
item.get("signal") or code,
|
|
float(candidate.get("price") or trigger.get("price") or 0),
|
|
float(candidate.get("volume_24h") or 0),
|
|
float(candidate.get("change_24h") or 0),
|
|
float(item.get("gain_pct") or 0),
|
|
float(trigger.get("vol_ratio") or 0),
|
|
float(trigger.get("body_ratio") or 0),
|
|
int(trigger.get("age_bars") or 0),
|
|
1 if short_tf.get("resonance") else 0,
|
|
_json(context),
|
|
created_at,
|
|
),
|
|
)
|
|
if getattr(cur, "rowcount", 0) > 0:
|
|
count += 1
|
|
conn.commit()
|
|
conn.close()
|
|
return count
|
|
|
|
|
|
def get_short_tf_signal_review(hours: int = 168, limit: int = 200) -> dict:
|
|
"""Return performance-style read model for short-timeframe samples."""
|
|
ensure_migrations_once()
|
|
try:
|
|
hours = max(1, min(int(hours or 168), 24 * 90))
|
|
except Exception:
|
|
hours = 168
|
|
try:
|
|
limit = max(1, min(int(limit or 200), 1000))
|
|
except Exception:
|
|
limit = 200
|
|
since = (datetime.now() - timedelta(hours=hours)).isoformat(timespec="seconds")
|
|
conn = get_conn()
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT s.*,
|
|
lpc.price AS latest_price,
|
|
lpc.updated_at AS latest_price_at,
|
|
r.id AS recommendation_id,
|
|
r.execution_status,
|
|
r.action_status,
|
|
r.display_bucket,
|
|
r.rec_time
|
|
FROM short_tf_signal_samples s
|
|
LEFT JOIN latest_price_cache lpc ON lpc.symbol = s.symbol
|
|
LEFT JOIN LATERAL (
|
|
SELECT id, execution_status, action_status, display_bucket, rec_time
|
|
FROM recommendation
|
|
WHERE symbol = s.symbol AND rec_time >= s.signal_time
|
|
ORDER BY rec_time ASC, id ASC
|
|
LIMIT 1
|
|
) r ON TRUE
|
|
WHERE s.signal_time >= %s
|
|
ORDER BY s.signal_time DESC, s.id DESC
|
|
LIMIT %s
|
|
""",
|
|
(since, limit),
|
|
).fetchall()
|
|
conn.close()
|
|
items = []
|
|
by_code: dict[str, dict] = {}
|
|
for row in rows:
|
|
item = dict(row)
|
|
item["context_json"] = _loads(item.get("context_json"), {})
|
|
entry = float(item.get("entry_price") or 0)
|
|
latest = float(item.get("latest_price") or 0)
|
|
item["latest_return_pct"] = round((latest / entry - 1) * 100, 2) if entry > 0 and latest > 0 else 0
|
|
item["converted_to_recommendation"] = bool(item.get("recommendation_id"))
|
|
items.append(item)
|
|
|
|
code = item.get("signal_code") or "unknown"
|
|
bucket = by_code.setdefault(code, {"signal_code": code, "count": 0, "wins": 0, "losses": 0, "converted": 0, "total_return": 0.0})
|
|
bucket["count"] += 1
|
|
bucket["converted"] += 1 if item["converted_to_recommendation"] else 0
|
|
bucket["total_return"] += item["latest_return_pct"]
|
|
if item["latest_return_pct"] >= 2:
|
|
bucket["wins"] += 1
|
|
elif item["latest_return_pct"] <= -2:
|
|
bucket["losses"] += 1
|
|
|
|
summary = []
|
|
total_return = 0.0
|
|
converted_count = 0
|
|
for bucket in by_code.values():
|
|
count = bucket["count"]
|
|
total_return += bucket["total_return"]
|
|
converted_count += bucket["converted"]
|
|
summary.append({
|
|
**bucket,
|
|
"signal_label": {
|
|
"short_tf_15m_ignition": "15m 短周期启动",
|
|
"short_tf_5m_ignition": "5m 极早期启动",
|
|
"short_tf_ignition": "短周期启动",
|
|
}.get(bucket["signal_code"], bucket["signal_code"]),
|
|
"avg_return_pct": round(bucket["total_return"] / count, 2) if count else 0,
|
|
"win_rate": round(bucket["wins"] / count * 100, 1) if count else 0,
|
|
"conversion_rate": round(bucket["converted"] / count * 100, 1) if count else 0,
|
|
})
|
|
summary.sort(key=lambda x: (x["count"], x["avg_return_pct"]), reverse=True)
|
|
total_count = len(items)
|
|
return {
|
|
"hours": hours,
|
|
"total_samples": total_count,
|
|
"converted_count": converted_count,
|
|
"avg_return_pct": round(total_return / total_count, 2) if total_count else 0,
|
|
"summary": summary,
|
|
"items": items,
|
|
"note": "短周期信号只做发现证据采样,不直接触发交易动作;是否提权需看后续转推荐率和收益表现。",
|
|
}
|