"""Short-timeframe signal sampling and review queries.""" from __future__ import annotations import json from datetime import datetime, timedelta from app.db.postgres_connection import ensure_migrations_once from app.db.schema import get_conn def _json(data) -> str: return json.dumps(data or {}, ensure_ascii=False) def _loads(value, fallback): try: if isinstance(value, str) and value.strip(): return json.loads(value) if value: return value except Exception: pass return fallback def record_short_tf_samples(symbol: str, candidate: dict) -> int: """Persist short-timeframe discovery samples for later evidence-based review.""" short_tf = (candidate or {}).get("short_tf_ignition") or {} signals = [dict(x or {}) for x in short_tf.get("signals", []) if (x or {}).get("found")] if not signals: return 0 ensure_migrations_once() now_dt = datetime.now() # Scheduler may retry within the same minute; keep one sample per symbol/timeframe/minute. signal_time = now_dt.replace(second=0, microsecond=0).isoformat(timespec="seconds") created_at = now_dt.isoformat(timespec="seconds") symbol = str(symbol or "").upper().strip() conn = get_conn() count = 0 for item in signals: tf = str(item.get("timeframe") or "").strip() code = "short_tf_15m_ignition" if tf == "15m" else "short_tf_5m_ignition" if tf == "5m" else "short_tf_ignition" trigger = item.get("trigger") or {} context = { "short_tf_ignition": short_tf, "anomalies": candidate.get("anomalies") or [], "signal_recency": candidate.get("signal_recency") or {}, "top_gainer_24h": bool(candidate.get("top_gainer_24h")), "static_accumulation": candidate.get("static_accumulation") or {}, "higher_lows": candidate.get("higher_lows") or {}, "compression_surge": candidate.get("compression_surge") or {}, } cur = conn.execute( """ INSERT INTO short_tf_signal_samples ( signal_time, symbol, timeframe, signal_code, signal_label, entry_price, volume_24h, change_24h, gain_pct, vol_ratio, body_ratio, age_bars, resonance, context_json, review_status, created_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'pending', %s) ON CONFLICT(symbol, timeframe, signal_code, signal_time) DO NOTHING """, ( signal_time, symbol, tf, code, item.get("signal") or code, float(candidate.get("price") or trigger.get("price") or 0), float(candidate.get("volume_24h") or 0), float(candidate.get("change_24h") or 0), float(item.get("gain_pct") or 0), float(trigger.get("vol_ratio") or 0), float(trigger.get("body_ratio") or 0), int(trigger.get("age_bars") or 0), 1 if short_tf.get("resonance") else 0, _json(context), created_at, ), ) if getattr(cur, "rowcount", 0) > 0: count += 1 conn.commit() conn.close() return count def get_short_tf_signal_review(hours: int = 168, limit: int = 200) -> dict: """Return performance-style read model for short-timeframe samples.""" ensure_migrations_once() try: hours = max(1, min(int(hours or 168), 24 * 90)) except Exception: hours = 168 try: limit = max(1, min(int(limit or 200), 1000)) except Exception: limit = 200 since = (datetime.now() - timedelta(hours=hours)).isoformat(timespec="seconds") conn = get_conn() rows = conn.execute( """ SELECT s.*, lpc.price AS latest_price, lpc.updated_at AS latest_price_at, r.id AS recommendation_id, r.execution_status, r.action_status, r.display_bucket, r.rec_time FROM short_tf_signal_samples s LEFT JOIN latest_price_cache lpc ON lpc.symbol = s.symbol LEFT JOIN LATERAL ( SELECT id, execution_status, action_status, display_bucket, rec_time FROM recommendation WHERE symbol = s.symbol AND rec_time >= s.signal_time ORDER BY rec_time ASC, id ASC LIMIT 1 ) r ON TRUE WHERE s.signal_time >= %s ORDER BY s.signal_time DESC, s.id DESC LIMIT %s """, (since, limit), ).fetchall() conn.close() items = [] by_code: dict[str, dict] = {} for row in rows: item = dict(row) item["context_json"] = _loads(item.get("context_json"), {}) entry = float(item.get("entry_price") or 0) latest = float(item.get("latest_price") or 0) item["latest_return_pct"] = round((latest / entry - 1) * 100, 2) if entry > 0 and latest > 0 else 0 item["converted_to_recommendation"] = bool(item.get("recommendation_id")) items.append(item) code = item.get("signal_code") or "unknown" bucket = by_code.setdefault(code, {"signal_code": code, "count": 0, "wins": 0, "losses": 0, "converted": 0, "total_return": 0.0}) bucket["count"] += 1 bucket["converted"] += 1 if item["converted_to_recommendation"] else 0 bucket["total_return"] += item["latest_return_pct"] if item["latest_return_pct"] >= 2: bucket["wins"] += 1 elif item["latest_return_pct"] <= -2: bucket["losses"] += 1 summary = [] total_return = 0.0 converted_count = 0 for bucket in by_code.values(): count = bucket["count"] total_return += bucket["total_return"] converted_count += bucket["converted"] summary.append({ **bucket, "signal_label": { "short_tf_15m_ignition": "15m 短周期启动", "short_tf_5m_ignition": "5m 极早期启动", "short_tf_ignition": "短周期启动", }.get(bucket["signal_code"], bucket["signal_code"]), "avg_return_pct": round(bucket["total_return"] / count, 2) if count else 0, "win_rate": round(bucket["wins"] / count * 100, 1) if count else 0, "conversion_rate": round(bucket["converted"] / count * 100, 1) if count else 0, }) summary.sort(key=lambda x: (x["count"], x["avg_return_pct"]), reverse=True) total_count = len(items) return { "hours": hours, "total_samples": total_count, "converted_count": converted_count, "avg_return_pct": round(total_return / total_count, 2) if total_count else 0, "summary": summary, "items": items, "note": "短周期信号只做发现证据采样,不直接触发交易动作;是否提权需看后续转推荐率和收益表现。", }