# alphax/app/db/short_tf_signals.py
# 2026-05-25 08:53:21 +08:00
#
# 178 lines
# 6.9 KiB
# Python

"""Short-timeframe signal sampling and review queries."""
from __future__ import annotations
import json
from datetime import datetime, timedelta
from app.db.postgres_connection import ensure_migrations_once
from app.db.schema import get_conn
def _json(data) -> str:
return json.dumps(data or {}, ensure_ascii=False)
def _loads(value, fallback):
try:
if isinstance(value, str) and value.strip():
return json.loads(value)
if value:
return value
except Exception:
pass
return fallback
def record_short_tf_samples(symbol: str, candidate: dict) -> int:
    """Persist short-timeframe discovery samples for later evidence-based review.

    One row is inserted into ``short_tf_signal_samples`` for every entry of
    ``candidate["short_tf_ignition"]["signals"]`` whose ``found`` flag is
    truthy. Rows that collide on (symbol, timeframe, signal_code, signal_time)
    are skipped by the ``ON CONFLICT ... DO NOTHING`` clause.

    Args:
        symbol: Trading symbol; stored upper-cased and stripped.
        candidate: Candidate dict produced by discovery. ``None``, a missing
            ``short_tf_ignition`` section, or a null ``signals`` list all
            result in nothing being written.

    Returns:
        The number of inserts for which the cursor reported ``rowcount > 0``.
    """
    candidate = candidate or {}
    short_tf = candidate.get("short_tf_ignition") or {}
    # ``signals`` may be present but null; treat that the same as missing.
    raw_signals = short_tf.get("signals") or []
    signals = [dict(x or {}) for x in raw_signals if (x or {}).get("found")]
    if not signals:
        return 0

    ensure_migrations_once()
    now_dt = datetime.now()
    # Scheduler may retry within the same minute; keep one sample per symbol/timeframe/minute.
    signal_time = now_dt.replace(second=0, microsecond=0).isoformat(timespec="seconds")
    created_at = now_dt.isoformat(timespec="seconds")
    symbol = str(symbol or "").upper().strip()

    # Everything below depends only on the candidate, not on the individual
    # signal, so it is computed (and serialized) once for all rows.
    context_json = _json(
        {
            "short_tf_ignition": short_tf,
            "anomalies": candidate.get("anomalies") or [],
            "signal_recency": candidate.get("signal_recency") or {},
            "top_gainer_24h": bool(candidate.get("top_gainer_24h")),
            "static_accumulation": candidate.get("static_accumulation") or {},
            "higher_lows": candidate.get("higher_lows") or {},
            "compression_surge": candidate.get("compression_surge") or {},
        }
    )
    candidate_price = candidate.get("price")
    volume_24h = float(candidate.get("volume_24h") or 0)
    change_24h = float(candidate.get("change_24h") or 0)
    resonance = 1 if short_tf.get("resonance") else 0
    codes_by_timeframe = {
        "15m": "short_tf_15m_ignition",
        "5m": "short_tf_5m_ignition",
    }

    conn = get_conn()
    try:
        count = 0
        for item in signals:
            tf = str(item.get("timeframe") or "").strip()
            code = codes_by_timeframe.get(tf, "short_tf_ignition")
            trigger = item.get("trigger") or {}
            cur = conn.execute(
                """
                INSERT INTO short_tf_signal_samples (
                    signal_time, symbol, timeframe, signal_code, signal_label,
                    entry_price, volume_24h, change_24h, gain_pct, vol_ratio,
                    body_ratio, age_bars, resonance, context_json, review_status, created_at
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'pending', %s)
                ON CONFLICT(symbol, timeframe, signal_code, signal_time) DO NOTHING
                """,
                (
                    signal_time,
                    symbol,
                    tf,
                    code,
                    item.get("signal") or code,
                    # Prefer the candidate-level price; fall back to the trigger bar.
                    float(candidate_price or trigger.get("price") or 0),
                    volume_24h,
                    change_24h,
                    float(item.get("gain_pct") or 0),
                    float(trigger.get("vol_ratio") or 0),
                    float(trigger.get("body_ratio") or 0),
                    int(trigger.get("age_bars") or 0),
                    resonance,
                    context_json,
                    created_at,
                ),
            )
            # Conflicting rows are silently skipped; only count real inserts.
            if getattr(cur, "rowcount", 0) > 0:
                count += 1
        conn.commit()
    finally:
        # Always release the connection, even when an insert or commit fails.
        conn.close()
    return count
def _bounded_int(value, default: int, upper: int) -> int:
    """Coerce *value* to an int clamped to ``[1, upper]``; use *default* if unusable."""
    try:
        return max(1, min(int(value or default), upper))
    except (TypeError, ValueError, OverflowError):
        return default


def get_short_tf_signal_review(hours: int = 168, limit: int = 200) -> dict:
    """Return performance-style read model for short-timeframe samples.

    Args:
        hours: Look-back window; clamped to 1..2160 (90 days), 168 when the
            value is falsy or not convertible to int.
        limit: Maximum number of sample rows; clamped to 1..1000, 200 when the
            value is falsy or not convertible to int.

    Returns:
        A dict with the effective ``hours``, ``total_samples``,
        ``converted_count``, overall ``avg_return_pct``, a per-signal-code
        ``summary`` list (sorted by count, then average return, descending),
        the enriched sample ``items`` and a fixed ``note`` string.
    """
    ensure_migrations_once()
    hours = _bounded_int(hours, 168, 24 * 90)
    limit = _bounded_int(limit, 200, 1000)
    since = (datetime.now() - timedelta(hours=hours)).isoformat(timespec="seconds")

    conn = get_conn()
    try:
        # Each sample is joined with the cached latest price and with the
        # earliest recommendation for the same symbol at or after the signal.
        rows = conn.execute(
            """
            SELECT s.*,
                   lpc.price AS latest_price,
                   lpc.updated_at AS latest_price_at,
                   r.id AS recommendation_id,
                   r.execution_status,
                   r.action_status,
                   r.display_bucket,
                   r.rec_time
            FROM short_tf_signal_samples s
            LEFT JOIN latest_price_cache lpc ON lpc.symbol = s.symbol
            LEFT JOIN LATERAL (
                SELECT id, execution_status, action_status, display_bucket, rec_time
                FROM recommendation
                WHERE symbol = s.symbol AND rec_time >= s.signal_time
                ORDER BY rec_time ASC, id ASC
                LIMIT 1
            ) r ON TRUE
            WHERE s.signal_time >= %s
            ORDER BY s.signal_time DESC, s.id DESC
            LIMIT %s
            """,
            (since, limit),
        ).fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    items = []
    by_code: dict[str, dict] = {}
    for row in rows:
        item = dict(row)
        item["context_json"] = _loads(item.get("context_json"), {})
        entry = float(item.get("entry_price") or 0)
        latest = float(item.get("latest_price") or 0)
        # NOTE(review): samples without a usable entry or latest price get a
        # 0% return and still count toward the averages below — confirm that
        # this dilution is intended.
        if entry > 0 and latest > 0:
            item["latest_return_pct"] = round((latest / entry - 1) * 100, 2)
        else:
            item["latest_return_pct"] = 0
        item["converted_to_recommendation"] = bool(item.get("recommendation_id"))
        items.append(item)

        code = item.get("signal_code") or "unknown"
        bucket = by_code.setdefault(
            code,
            {
                "signal_code": code,
                "count": 0,
                "wins": 0,
                "losses": 0,
                "converted": 0,
                "total_return": 0.0,
            },
        )
        bucket["count"] += 1
        bucket["converted"] += 1 if item["converted_to_recommendation"] else 0
        bucket["total_return"] += item["latest_return_pct"]
        # A sample is a win at +2% or better and a loss at -2% or worse.
        if item["latest_return_pct"] >= 2:
            bucket["wins"] += 1
        elif item["latest_return_pct"] <= -2:
            bucket["losses"] += 1

    # Display labels per signal code; unknown codes are shown as-is.
    labels = {
        "short_tf_15m_ignition": "15m 短周期启动",
        "short_tf_5m_ignition": "5m 极早期启动",
        "short_tf_ignition": "短周期启动",
    }
    summary = []
    total_return = 0.0
    converted_count = 0
    for bucket in by_code.values():
        count = bucket["count"]
        total_return += bucket["total_return"]
        converted_count += bucket["converted"]
        summary.append({
            **bucket,
            "signal_label": labels.get(bucket["signal_code"], bucket["signal_code"]),
            "avg_return_pct": round(bucket["total_return"] / count, 2) if count else 0,
            "win_rate": round(bucket["wins"] / count * 100, 1) if count else 0,
            "conversion_rate": round(bucket["converted"] / count * 100, 1) if count else 0,
        })
    summary.sort(key=lambda x: (x["count"], x["avg_return_pct"]), reverse=True)

    total_count = len(items)
    return {
        "hours": hours,
        "total_samples": total_count,
        "converted_count": converted_count,
        "avg_return_pct": round(total_return / total_count, 2) if total_count else 0,
        "summary": summary,
        "items": items,
        "note": "短周期信号只做发现证据采样,不直接触发交易动作;是否提权需看后续转推荐率和收益表现。",
    }