"""On-chain discovery storage and read models. The on-chain layer is a research/discovery input. It stores normalized external facts and can enqueue technical-check candidates, but it must not create or mutate trading recommendations directly. """ import json import os from datetime import datetime, timedelta from app.db.altcoin_db import get_conn from app.db.postgres_connection import ensure_migrations_once from app.config.system_config import onchain_config MIN_MAPPING_CONFIDENCE = 70 SIGNAL_LABELS = { "large_token_transfer": "链上大额转账", "dex_volume_spike": "链上成交放量", "liquidity_add": "流动性增加", "liquidity_remove_risk": "流动性撤出风险", "exchange_outflow": "交易所流出", "exchange_inflow_risk": "交易所流入风险", "whale_accumulation": "鲸鱼增持", "holder_growth": "持有人增长", "holder_concentration_risk": "持仓集中风险", "smart_money_buying": "聪明钱买入", } RAW_EVENT_TYPE_LABELS = { "evm_transfer": "EVM 原始转账", } RAW_EVENT_EXPLAINERS = { "evm_transfer": { "plain": "NodeReal 捕捉到 EVM 链上的 ERC-20 Transfer 原始日志。", "meaning": "这代表链上确实有资金转移,但没有完成币种映射前,不能直接进入策略候选。", "priority": "medium", }, } POSITIVE_SIGNALS = {"dex_volume_spike", "liquidity_add", "exchange_outflow", "whale_accumulation", "holder_growth", "smart_money_buying"} RISK_SIGNALS = {"liquidity_remove_risk", "exchange_inflow_risk", "holder_concentration_risk"} STANDARD_SIGNAL_SOURCE = "nodereal" LEGACY_ONCHAIN_SOURCES = {"dexscreener", "etherscan", "helius"} def _now(): return datetime.now().isoformat() def _dump(value): return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str) def _load(value, fallback=None): try: if isinstance(value, str) and value.strip(): return json.loads(value) if value is not None: return value except Exception: pass return fallback def _symbol_base(symbol): return str(symbol or "").upper().replace("/USDT", "").replace("USDT", "").strip() def normalize_symbol(symbol): base = _symbol_base(symbol) return f"{base}/USDT" if base else "" def signal_label(code): return SIGNAL_LABELS.get(str(code or ""), str(code or "链上信号")) def signal_direction(code): code = str(code or "") if code in RISK_SIGNALS: return "risk" if code in POSITIVE_SIGNALS: return "positive" return "neutral" def _is_legacy_onchain_source(source): return str(source or "").lower().strip() in LEGACY_ONCHAIN_SOURCES def raw_event_type_label(event_type): return RAW_EVENT_TYPE_LABELS.get(str(event_type or ""), str(event_type or "链上原始事件")) def raw_event_explainer(event_type): return RAW_EVENT_EXPLAINERS.get( str(event_type or ""), { "plain": "链上或链上相关数据源捕捉到一条原始动态。", "meaning": "需要完成币种映射和质量验证后,才可能进入技术检查。", "priority": "medium", }, ) def init_onchain_tables(): ensure_migrations_once() def upsert_token_mapping(symbol, chain, contract_address, source="", confidence=0, raw=None, is_active=True): init_onchain_tables() now = _now() symbol = normalize_symbol(symbol) chain = str(chain or "").lower().strip() contract_address = str(contract_address or "").strip() if not symbol or not chain or not contract_address: return 0 conn = get_conn() cur = conn.execute( """ INSERT INTO onchain_token_map (symbol, chain, contract_address, source, confidence, is_active, raw_json, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT(symbol, chain, contract_address) DO UPDATE SET source=excluded.source, confidence=GREATEST(onchain_token_map.confidence, excluded.confidence), is_active=excluded.is_active, raw_json=excluded.raw_json, updated_at=excluded.updated_at """, (symbol, chain, contract_address, source or "", int(confidence or 0), 1 if is_active else 0, _dump(raw), now, now), ) conn.commit() row = conn.execute( "SELECT id FROM onchain_token_map WHERE symbol=%s AND chain=%s AND contract_address=%s", (symbol, chain, contract_address), ).fetchone() conn.close() return int(row["id"] if row else 0) def get_token_mappings(symbol="", min_confidence=MIN_MAPPING_CONFIDENCE, active_only=True): init_onchain_tables() clauses = ["confidence >= %s"] params = [int(min_confidence or 0)] if symbol: clauses.append("symbol=%s") params.append(normalize_symbol(symbol)) if active_only: clauses.append("is_active=1") conn = get_conn() rows = conn.execute( f""" SELECT * FROM onchain_token_map WHERE {' AND '.join(clauses)} ORDER BY confidence DESC, updated_at DESC """, tuple(params), ).fetchall() conn.close() return [dict(row) for row in rows] def _event_hash(event): raw = "|".join( [ str(event.get("source") or ""), str(event.get("chain") or ""), str(event.get("symbol") or ""), str(event.get("signal_code") or event.get("event_type") or ""), str(event.get("tx_hash") or event.get("contract_address") or ""), str(event.get("detected_at") or ""), str(round(float(event.get("value_usd") or 0), 2)), ] ).lower() import hashlib return hashlib.sha256(raw.encode()).hexdigest()[:24] def _raw_event_hash(event): raw = "|".join( [ str(event.get("source") or ""), str(event.get("chain") or ""), str(event.get("event_type") or ""), str(event.get("token_address") or ""), str(event.get("url") or ""), str(round(float(event.get("amount") or 0), 4)), str(round(float(event.get("total_amount") or 0), 4)), ] ).lower() import hashlib return hashlib.sha256(raw.encode()).hexdigest()[:24] def insert_onchain_event(event): init_onchain_tables() item = dict(event or {}) item["symbol"] = normalize_symbol(item.get("symbol")) item["chain"] = str(item.get("chain") or "").lower().strip() item["signal_code"] = str(item.get("signal_code") or "").strip() item["event_type"] = str(item.get("event_type") or item["signal_code"] or "onchain_event") if not item["symbol"] or not item["chain"] or not item["signal_code"]: return 0 item["signal_label"] = item.get("signal_label") or signal_label(item["signal_code"]) item["direction"] = item.get("direction") or signal_direction(item["signal_code"]) item["detected_at"] = str(item.get("detected_at") or _now()) item["event_hash"] = item.get("event_hash") or _event_hash(item) conn = get_conn() try: cur = conn.execute( """ INSERT INTO onchain_events ( event_hash, chain, symbol, contract_address, event_type, signal_code, signal_label, direction, value_usd, amount, tx_hash, wallet_address, wallet_label, counterparty_label, confidence, severity, status, detected_at, source, url, raw_json ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT(event_hash) DO NOTHING RETURNING id """, ( item["event_hash"], item["chain"], item["symbol"], item.get("contract_address") or "", item["event_type"], item["signal_code"], item["signal_label"], item["direction"], float(item.get("value_usd") or 0), float(item.get("amount") or 0), item.get("tx_hash") or "", item.get("wallet_address") or "", item.get("wallet_label") or "", item.get("counterparty_label") or "", int(item.get("confidence") or 0), item.get("severity") or "B", item.get("status") or "new", item["detected_at"], item.get("source") or "", item.get("url") or "", _dump(item.get("raw") or item.get("raw_json") or {}), ), ) row = cur.fetchone() event_id = int(row["id"] if row else 0) conn.commit() finally: conn.close() return event_id def find_mapping_by_contract(chain, contract_address): init_onchain_tables() chain = str(chain or "").lower().strip() contract_address = str(contract_address or "").strip() if not chain or not contract_address: return None conn = get_conn() row = conn.execute( """ SELECT * FROM onchain_token_map WHERE chain=%s AND lower(contract_address)=lower(%s) AND is_active=1 ORDER BY confidence DESC, updated_at DESC LIMIT 1 """, (chain, contract_address), ).fetchone() conn.close() return dict(row) if row else None def insert_onchain_raw_event(event): init_onchain_tables() item = dict(event or {}) item["source"] = str(item.get("source") or "").strip() item["chain"] = str(item.get("chain") or "").lower().strip() item["event_type"] = str(item.get("event_type") or "onchain_raw_event").strip() item["token_address"] = str(item.get("token_address") or "").strip() if not item["source"] or not item["chain"] or not item["event_type"] or not item["token_address"]: return 0 item["detected_at"] = str(item.get("detected_at") or _now()) item["event_hash"] = item.get("event_hash") or _raw_event_hash(item) item["mapped_symbol"] = normalize_symbol(item.get("mapped_symbol")) if item.get("mapped_symbol") else "" item["mapping_status"] = str(item.get("mapping_status") or ("mapped" if item["mapped_symbol"] else "unmapped")) conn = get_conn() try: cur = conn.execute( """ INSERT INTO onchain_raw_events ( event_hash, source, chain, event_type, token_address, symbol_guess, name, title, description, url, icon, amount, total_amount, importance, mapped_symbol, mapping_status, detected_at, raw_json ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT(event_hash) DO NOTHING RETURNING id """, ( item["event_hash"], item["source"], item["chain"], item["event_type"], item["token_address"], item.get("symbol_guess") or "", item.get("name") or "", item.get("title") or raw_event_type_label(item["event_type"]), item.get("description") or "", item.get("url") or "", item.get("icon") or "", float(item.get("amount") or 0), float(item.get("total_amount") or 0), float(item.get("importance") or 0), item["mapped_symbol"], item["mapping_status"], item["detected_at"], _dump(item.get("raw") or item.get("raw_json") or {}), ), ) row = cur.fetchone() event_id = int(row["id"] if row else 0) conn.commit() finally: conn.close() return event_id def insert_token_metric(metric): init_onchain_tables() item = dict(metric or {}) item["symbol"] = normalize_symbol(item.get("symbol")) item["chain"] = str(item.get("chain") or "").lower().strip() item["window"] = str(item.get("window") or "1h").strip() item["metric_time"] = str(item.get("metric_time") or _now()) if not item["symbol"] or not item["chain"] or not item["window"]: return 0 conn = get_conn() cur = conn.execute( """ INSERT INTO onchain_token_metrics ( symbol, chain, contract_address, "window", metric_time, dex_volume_usd, dex_volume_change_pct, liquidity_usd, liquidity_change_pct, exchange_netflow_usd, whale_accumulation_usd, holder_delta, smart_money_score, onchain_score, risk_score, source, raw_json ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT(symbol, chain, contract_address, "window", metric_time) DO UPDATE SET dex_volume_usd=excluded.dex_volume_usd, dex_volume_change_pct=excluded.dex_volume_change_pct, liquidity_usd=excluded.liquidity_usd, liquidity_change_pct=excluded.liquidity_change_pct, exchange_netflow_usd=excluded.exchange_netflow_usd, whale_accumulation_usd=excluded.whale_accumulation_usd, holder_delta=excluded.holder_delta, smart_money_score=excluded.smart_money_score, onchain_score=excluded.onchain_score, risk_score=excluded.risk_score, source=excluded.source, raw_json=excluded.raw_json RETURNING id """, ( item["symbol"], item["chain"], item.get("contract_address") or "", item["window"], item["metric_time"], float(item.get("dex_volume_usd") or 0), float(item.get("dex_volume_change_pct") or 0), float(item.get("liquidity_usd") or 0), float(item.get("liquidity_change_pct") or 0), float(item.get("exchange_netflow_usd") or 0), float(item.get("whale_accumulation_usd") or 0), float(item.get("holder_delta") or 0), float(item.get("smart_money_score") or 0), float(item.get("onchain_score") or 0), float(item.get("risk_score") or 0), item.get("source") or "", _dump(item.get("raw") or item.get("raw_json") or {}), ), ) conn.commit() metric_id = int(cur.fetchone()["id"] or 0) conn.close() return metric_id def _latest_metrics_subquery(hours=24): return """ SELECT m.* FROM onchain_token_metrics m JOIN ( SELECT symbol, chain, contract_address, MAX(metric_time) AS max_time FROM onchain_token_metrics WHERE metric_time >= %s GROUP BY symbol, chain, contract_address ) latest ON latest.symbol=m.symbol AND latest.chain=m.chain AND latest.contract_address=m.contract_address AND latest.max_time=m.metric_time """ def get_onchain_overview(hours=24): init_onchain_tables() cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() conn = get_conn() event_rows = conn.execute("SELECT * FROM onchain_events WHERE detected_at >= %s", (cutoff,)).fetchall() raw_rows = conn.execute("SELECT * FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,)).fetchall() raw_latest = conn.execute( """ SELECT * FROM onchain_raw_events WHERE detected_at >= %s AND event_type NOT IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top') ORDER BY detected_at::timestamp DESC, importance DESC, id DESC LIMIT 12 """, (cutoff,), ).fetchall() metric_rows = conn.execute(_latest_metrics_subquery(hours), (cutoff,)).fetchall() rec_rows = conn.execute( "SELECT symbol, id, execution_status, action_status, display_bucket FROM recommendation WHERE status='active'" ).fetchall() conn.close() active = {row["symbol"]: dict(row) for row in rec_rows} events = [dict(row) for row in event_rows] raw_events = [dict(row) for row in raw_rows] metrics = [dict(row) for row in metric_rows] standard_events = [e for e in events if str(e.get("source") or "").lower() == STANDARD_SIGNAL_SOURCE] node_metrics = [m for m in metrics if not _is_legacy_onchain_source(m.get("source"))] hot = sorted(node_metrics, key=lambda x: float(x.get("onchain_score") or 0), reverse=True)[:8] risks = sorted(node_metrics, key=lambda x: float(x.get("risk_score") or 0), reverse=True)[:8] mapped_feed = _mapped_raw_signal_items(raw_events, active, limit=8) total_netflow = sum(float(x.get("exchange_netflow_usd") or 0) for x in metrics) return { "hours": int(hours or 24), "updated_at": _now(), "kpi": { "event_count": len(standard_events), "raw_event_count": len(raw_events), "raw_unmapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "unmapped"), "raw_mapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "mapped"), "token_count": len({(m["symbol"], m["chain"], m.get("contract_address") or "") for m in node_metrics}) or len({(e.get("mapped_symbol"), e.get("chain")) for e in raw_events if e.get("mapping_status") == "mapped" and e.get("mapped_symbol")}), "positive_events": sum(1 for e in standard_events if e.get("direction") == "positive"), "risk_events": sum(1 for e in standard_events if e.get("direction") == "risk"), "exchange_netflow_usd": round(total_netflow, 2), "mapped_signal_count": len(mapped_feed), }, "hot_tokens": ([_format_metric_item(row, active) for row in hot] or mapped_feed), "risk_tokens": [_format_metric_item(row, active) for row in risks], "raw_events": [_format_raw_event(row) for row in raw_latest], "signals": _signal_counts(standard_events), "provider_status": get_onchain_provider_status(hours=hours), } def get_onchain_provider_status(hours=24): init_onchain_tables() cfg = onchain_config() hours = int(hours or 24) cutoff = (datetime.now() - timedelta(hours=hours)).isoformat() nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY") conn = get_conn() try: raw_total = conn.execute("SELECT COUNT(*) FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,)).fetchone()[0] metric_total = conn.execute( """ SELECT COUNT(*) FROM onchain_token_metrics WHERE metric_time >= %s AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius') """, (cutoff,), ).fetchone()[0] signal_total = conn.execute( """ SELECT COUNT(*) FROM onchain_events WHERE detected_at >= %s AND source=%s """, (cutoff, STANDARD_SIGNAL_SOURCE), ).fetchone()[0] candidate_total = conn.execute( """ SELECT COUNT(*) FROM event_news WHERE source='onchain' AND detected_at >= %s """, (cutoff,), ).fetchone()[0] mapping_total = conn.execute("SELECT COUNT(*) FROM onchain_token_map WHERE is_active=1").fetchone()[0] mapping_usable = conn.execute( "SELECT COUNT(*) FROM onchain_token_map WHERE is_active=1 AND confidence >= %s", (MIN_MAPPING_CONFIDENCE,), ).fetchone()[0] raw_by_type = conn.execute( """ SELECT event_type, COUNT(*) AS count FROM onchain_raw_events WHERE detected_at >= %s GROUP BY event_type ORDER BY count DESC, event_type """, (cutoff,), ).fetchall() metric_sources = conn.execute( """ SELECT source, COUNT(*) AS count FROM onchain_token_metrics WHERE metric_time >= %s GROUP BY source ORDER BY count DESC, source """, (cutoff,), ).fetchall() signal_sources = conn.execute( """ SELECT source, COUNT(*) AS count FROM onchain_events WHERE detected_at >= %s GROUP BY source ORDER BY count DESC, source """, (cutoff,), ).fetchall() last_onchain = conn.execute( """ SELECT * FROM cron_run_log WHERE script_name='onchain_monitor.py' OR job_name IN ('链上','onchain') ORDER BY started_at DESC, id DESC LIMIT 1 """ ).fetchone() finally: conn.close() summary = _load(last_onchain.get("summary_json") if last_onchain else "{}", {}) if last_onchain else {} last_error = last_onchain.get("error_message") if last_onchain else "" provider = str(cfg.get("provider") or "nodereal").strip().lower() nodereal_enabled = bool(cfg.get("nodereal_enabled", True)) and provider == "nodereal" nodereal_metrics = int(sum(row["count"] for row in metric_sources if row["source"] == "nodereal")) nodereal_signals = int(sum(row["count"] for row in signal_sources if row["source"] == "nodereal")) providers = [ { "provider": "nodereal", "label": "NodeReal", "enabled": nodereal_enabled, "api_key_present": bool(os.getenv(nodereal_env, "").strip()), "implemented": True, "role": "EVM 主链上数据源:Transfer 日志、大额转账、holder 变化", "raw_events": int(raw_total or 0), "metrics": nodereal_metrics, "signals": nodereal_signals, "status": _provider_status_label( nodereal_enabled, True, int(raw_total or 0) + nodereal_metrics + nodereal_signals, last_error if "nodereal" in str(last_error).lower() else "", ), }, ] return { "hours": hours, "enabled": bool(cfg.get("enabled", False)), "last_run": dict(last_onchain) if last_onchain else None, "last_summary": summary, "last_error": last_error or "", "coverage": { "active_mappings": int(mapping_total or 0), "usable_mappings": int(mapping_usable or 0), "raw_events": int(raw_total or 0), "metrics": int(metric_total or 0), "signals": int(signal_total or 0), "queued_candidates": int(candidate_total or 0), }, "raw_event_types": [dict(row) for row in raw_by_type], "metric_sources": [dict(row) for row in metric_sources], "signal_sources": [dict(row) for row in signal_sources], "providers": providers, } def _provider_status_label(enabled, implemented, count, last_error=""): if not enabled: return "已关闭" if not implemented: return "未接入采集" if count: return "正常采集中" if last_error: return "最近采集失败" return "暂无数据" def _signal_counts(events): counts = {} for e in events: code = e.get("signal_code") or "" if not code: continue counts.setdefault(code, {"signal_code": code, "signal_label": signal_label(code), "count": 0}) counts[code]["count"] += 1 return sorted(counts.values(), key=lambda x: x["count"], reverse=True) def _format_metric_item(row, active=None): active = active or {} item = dict(row) item["raw"] = _load(item.pop("raw_json", "{}"), {}) rec = active.get(item.get("symbol")) or {} item["recommendation"] = { "rec_id": rec.get("id") or 0, "execution_status": rec.get("execution_status") or "", "action_status": rec.get("action_status") or "", "display_bucket": rec.get("display_bucket") or "", "has_active": bool(rec), } return item def _mapped_raw_signal_items(raw_events, active=None, limit=8): active = active or {} grouped = {} for event in raw_events: if event.get("mapping_status") != "mapped" or not event.get("mapped_symbol"): continue if str(event.get("source") or "").lower() != STANDARD_SIGNAL_SOURCE: continue key = (event.get("mapped_symbol"), event.get("chain") or "") current = grouped.setdefault( key, { "symbol": event.get("mapped_symbol"), "chain": event.get("chain") or "", "contract_address": event.get("token_address") or "", "source": STANDARD_SIGNAL_SOURCE, "onchain_score": 0, "risk_score": 0, "event_count": 0, "mapped_event_count": 0, "latest_event_at": "", "dex_volume_usd": 0, "dex_volume_change_pct": 0, "liquidity_usd": 0, "liquidity_change_pct": 0, }, ) importance = float(event.get("importance") or 0) current["event_count"] += 1 current["mapped_event_count"] += 1 current["onchain_score"] = max(float(current.get("onchain_score") or 0), importance) current["latest_event_at"] = max(str(current.get("latest_event_at") or ""), str(event.get("detected_at") or "")) items = [] for item in grouped.values(): rec = active.get(item.get("symbol")) or {} item["recommendation"] = { "rec_id": rec.get("id") or 0, "execution_status": rec.get("execution_status") or "", "action_status": rec.get("action_status") or "", "display_bucket": rec.get("display_bucket") or "", "has_active": bool(rec), } items.append(item) return sorted(items, key=lambda x: (float(x.get("onchain_score") or 0), str(x.get("latest_event_at") or "")), reverse=True)[: int(limit or 8)] def list_onchain_tokens(limit=30, offset=0, chain="", signal="", hours=24): init_onchain_tables() limit = max(1, min(int(limit or 30), 100)) offset = max(0, int(offset or 0)) cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() clauses = ["COALESCE(m.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')"] params = [] if chain: clauses.append("m.chain=%s") params.append(str(chain).lower()) if signal: clauses.append( """ EXISTS ( SELECT 1 FROM onchain_events e WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.detected_at >= %s AND e.signal_code=%s AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius') ) """ ) params.extend([cutoff, signal]) where = " AND ".join(clauses) if clauses else "1=1" conn = get_conn() total = conn.execute( f""" SELECT COUNT(*) FROM ( SELECT m.symbol, m.chain, m.contract_address FROM onchain_token_metrics m WHERE {where} GROUP BY m.symbol, m.chain, m.contract_address ) """, tuple(params), ).fetchone()[0] rows = conn.execute( f""" SELECT m.*, (SELECT COUNT(*) FROM onchain_events e WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.detected_at >= %s AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')) AS event_count, (SELECT COUNT(*) FROM onchain_events e WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.direction='risk' AND e.detected_at >= %s AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')) AS risk_event_count FROM ({_latest_metrics_subquery(hours)}) m WHERE {where} ORDER BY m.onchain_score DESC, m.risk_score DESC, m.metric_time DESC LIMIT %s OFFSET %s """, (cutoff, cutoff, cutoff, *params, limit, offset), ).fetchall() rec_rows = conn.execute( "SELECT symbol, id, execution_status, action_status, display_bucket FROM recommendation WHERE status='active'" ).fetchall() conn.close() active = {row["symbol"]: dict(row) for row in rec_rows} return { "items": [_format_metric_item(row, active) for row in rows], "total": int(total or 0), "limit": limit, "offset": offset, "has_more": offset + len(rows) < int(total or 0), } def get_onchain_token_detail(symbol, hours=72): init_onchain_tables() symbol = normalize_symbol(symbol) cutoff = (datetime.now() - timedelta(hours=int(hours or 72))).isoformat() conn = get_conn() mappings = conn.execute( "SELECT * FROM onchain_token_map WHERE symbol=%s ORDER BY confidence DESC, updated_at DESC", (symbol,), ).fetchall() events = conn.execute( """ SELECT * FROM onchain_events WHERE symbol=%s AND detected_at >= %s AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius') ORDER BY detected_at DESC, id DESC LIMIT 100 """, (symbol, cutoff), ).fetchall() metrics = conn.execute( """ SELECT * FROM onchain_token_metrics WHERE symbol=%s AND metric_time >= %s AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius') ORDER BY metric_time DESC, id DESC LIMIT 100 """, (symbol, cutoff), ).fetchall() rec = conn.execute( """ SELECT id, rec_time, action_status, execution_status, display_bucket, entry_price, current_price FROM recommendation WHERE symbol=%s AND status='active' ORDER BY id DESC LIMIT 1 """, (symbol,), ).fetchone() conn.close() return { "symbol": symbol, "hours": int(hours or 72), "mappings": [_with_raw(row) for row in mappings], "events": [_with_raw(row) for row in events], "metrics": [_with_raw(row) for row in metrics], "recommendation": dict(rec) if rec else None, } def list_onchain_events(limit=50, offset=0, chain="", signal="", status="", hours=24): init_onchain_tables() limit = max(1, min(int(limit or 50), 200)) offset = max(0, int(offset or 0)) cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() clauses = ["detected_at >= %s"] params = [cutoff] if chain: clauses.append("chain=%s") params.append(str(chain).lower()) if signal: clauses.append("signal_code=%s") params.append(signal) if status: clauses.append("status=%s") params.append(status) where = " AND ".join(clauses) conn = get_conn() total = conn.execute(f"SELECT COUNT(*) FROM onchain_events WHERE {where}", tuple(params)).fetchone()[0] rows = conn.execute( f""" SELECT * FROM onchain_events WHERE {where} ORDER BY detected_at::timestamp DESC, id DESC LIMIT %s OFFSET %s """, (*params, limit, offset), ).fetchall() conn.close() return {"items": [_with_raw(row) for row in rows], "total": int(total or 0), "limit": limit, "offset": offset, "has_more": offset + len(rows) < int(total or 0)} def list_onchain_raw_events(limit=50, offset=0, chain="", source="", event_type="", mapping_status="", priority="", hours=24): init_onchain_tables() limit = max(1, min(int(limit or 50), 200)) offset = max(0, int(offset or 0)) cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() clauses = ["detected_at >= %s"] params = [cutoff] if chain: clauses.append("chain=%s") params.append(str(chain).lower()) if source: clauses.append("source=%s") params.append(source) if event_type: clauses.append("event_type=%s") params.append(event_type) if mapping_status: clauses.append("mapping_status=%s") params.append(mapping_status) if priority: if priority == "important": clauses.append("importance >= %s") params.append(70) elif priority == "low": clauses.append("importance < %s") params.append(70) where = " AND ".join(clauses) conn = get_conn() total = conn.execute(f"SELECT COUNT(*) FROM onchain_raw_events WHERE {where}", tuple(params)).fetchone()[0] rows = conn.execute( f""" SELECT * FROM onchain_raw_events WHERE {where} ORDER BY detected_at::timestamp DESC, importance DESC, id DESC LIMIT %s OFFSET %s """, (*params, limit, offset), ).fetchall() conn.close() return { "items": [_format_raw_event(row) for row in rows], "total": int(total or 0), "limit": limit, "offset": offset, "has_more": offset + len(rows) < int(total or 0), } def update_event_status(event_ids, status): if not event_ids: return 0 init_onchain_tables() conn = get_conn() cur = conn.execute( "UPDATE onchain_events SET status=%s WHERE id IN (" + ",".join(["%s"] * len(event_ids)) + ")", (status, *[int(x) for x in event_ids]), ) conn.commit() conn.close() return int(cur.rowcount or 0) def _with_raw(row): item = dict(row) if "raw_json" in item: item["raw"] = _load(item.pop("raw_json"), {}) return item def _format_raw_event(row): item = _with_raw(row) item["event_label"] = raw_event_type_label(item.get("event_type")) explainer = raw_event_explainer(item.get("event_type")) item["plain_summary"] = explainer.get("plain") or "" item["why_matters"] = explainer.get("meaning") or "" item["priority"] = explainer.get("priority") or "medium" item["pipeline_note"] = ( "已映射,可进入后续链上信号分析。" if item.get("mapping_status") == "mapped" else "未完成币种映射,仅作为原始观察,不进入推荐。" ) item["token_short"] = _short_address(item.get("token_address")) return item def _short_address(value): value = str(value or "") if len(value) <= 14: return value return value[:6] + "..." + value[-4:]