From 24859a6acf4da5d10df8a751f22302203fe622a1 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Thu, 21 May 2026 20:45:43 +0800 Subject: [PATCH] 1 --- app/db/onchain_db.py | 113 +++++++++++++++++++++++++++------ static/onchain.html | 22 +++---- tests/test_onchain_tracking.py | 40 ++++++++++++ 3 files changed, 146 insertions(+), 29 deletions(-) diff --git a/app/db/onchain_db.py b/app/db/onchain_db.py index 54c8f0c..1637d84 100644 --- a/app/db/onchain_db.py +++ b/app/db/onchain_db.py @@ -19,7 +19,7 @@ MIN_MAPPING_CONFIDENCE = 70 SIGNAL_LABELS = { "large_token_transfer": "链上大额转账", - "dex_volume_spike": "DEX 放量", + "dex_volume_spike": "链上成交放量", "liquidity_add": "流动性增加", "liquidity_remove_risk": "流动性撤出风险", "exchange_outflow": "交易所流出", @@ -44,6 +44,8 @@ RAW_EVENT_EXPLAINERS = { POSITIVE_SIGNALS = {"dex_volume_spike", "liquidity_add", "exchange_outflow", "whale_accumulation", "holder_growth", "smart_money_buying"} RISK_SIGNALS = {"liquidity_remove_risk", "exchange_inflow_risk", "holder_concentration_risk"} +STANDARD_SIGNAL_SOURCE = "nodereal" +LEGACY_ONCHAIN_SOURCES = {"dexscreener", "etherscan", "helius"} def _now(): @@ -87,6 +89,10 @@ def signal_direction(code): return "neutral" +def _is_legacy_onchain_source(source): + return str(source or "").lower().strip() in LEGACY_ONCHAIN_SOURCES + + def raw_event_type_label(event_type): return RAW_EVENT_TYPE_LABELS.get(str(event_type or ""), str(event_type or "链上原始事件")) @@ -426,28 +432,31 @@ def get_onchain_overview(hours=24): events = [dict(row) for row in event_rows] raw_events = [dict(row) for row in raw_rows] metrics = [dict(row) for row in metric_rows] - hot = sorted(metrics, key=lambda x: float(x.get("onchain_score") or 0), reverse=True)[:8] - risks = sorted(metrics, key=lambda x: float(x.get("risk_score") or 0), reverse=True)[:8] + standard_events = [e for e in events if str(e.get("source") or "").lower() == STANDARD_SIGNAL_SOURCE] + node_metrics = [m for m in metrics if not _is_legacy_onchain_source(m.get("source"))] + hot = sorted(node_metrics, key=lambda x: float(x.get("onchain_score") or 0), reverse=True)[:8] + risks = sorted(node_metrics, key=lambda x: float(x.get("risk_score") or 0), reverse=True)[:8] + mapped_feed = _mapped_raw_signal_items(raw_events, active, limit=8) total_netflow = sum(float(x.get("exchange_netflow_usd") or 0) for x in metrics) - dex_volume = sum(float(x.get("dex_volume_usd") or 0) for x in metrics) return { "hours": int(hours or 24), "updated_at": _now(), "kpi": { - "event_count": len(events), + "event_count": len(standard_events), "raw_event_count": len(raw_events), "raw_unmapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "unmapped"), "raw_mapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "mapped"), - "token_count": len({(m["symbol"], m["chain"], m.get("contract_address") or "") for m in metrics}), - "positive_events": sum(1 for e in events if e.get("direction") == "positive"), - "risk_events": sum(1 for e in events if e.get("direction") == "risk"), + "token_count": len({(m["symbol"], m["chain"], m.get("contract_address") or "") for m in node_metrics}) + or len({(e.get("mapped_symbol"), e.get("chain")) for e in raw_events if e.get("mapping_status") == "mapped" and e.get("mapped_symbol")}), + "positive_events": sum(1 for e in standard_events if e.get("direction") == "positive"), + "risk_events": sum(1 for e in standard_events if e.get("direction") == "risk"), "exchange_netflow_usd": round(total_netflow, 2), - "dex_volume_usd": round(dex_volume, 2), + "mapped_signal_count": len(mapped_feed), }, - "hot_tokens": [_format_metric_item(row, active) for row in hot], + "hot_tokens": ([_format_metric_item(row, active) for row in hot] or mapped_feed), "risk_tokens": [_format_metric_item(row, active) for row in risks], "raw_events": [_format_raw_event(row) for row in raw_latest], - "signals": _signal_counts(events), + "signals": _signal_counts(standard_events), "provider_status": get_onchain_provider_status(hours=hours), } @@ -461,8 +470,23 @@ def get_onchain_provider_status(hours=24): conn = get_conn() try: raw_total = conn.execute("SELECT COUNT(*) FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,)).fetchone()[0] - metric_total = conn.execute("SELECT COUNT(*) FROM onchain_token_metrics WHERE metric_time >= %s", (cutoff,)).fetchone()[0] - signal_total = conn.execute("SELECT COUNT(*) FROM onchain_events WHERE detected_at >= %s", (cutoff,)).fetchone()[0] + metric_total = conn.execute( + """ + SELECT COUNT(*) + FROM onchain_token_metrics + WHERE metric_time >= %s + AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius') + """, + (cutoff,), + ).fetchone()[0] + signal_total = conn.execute( + """ + SELECT COUNT(*) + FROM onchain_events + WHERE detected_at >= %s AND source=%s + """, + (cutoff, STANDARD_SIGNAL_SOURCE), + ).fetchone()[0] candidate_total = conn.execute( """ SELECT COUNT(*) @@ -602,12 +626,58 @@ def _format_metric_item(row, active=None): return item +def _mapped_raw_signal_items(raw_events, active=None, limit=8): + active = active or {} + grouped = {} + for event in raw_events: + if event.get("mapping_status") != "mapped" or not event.get("mapped_symbol"): + continue + if str(event.get("source") or "").lower() != STANDARD_SIGNAL_SOURCE: + continue + key = (event.get("mapped_symbol"), event.get("chain") or "") + current = grouped.setdefault( + key, + { + "symbol": event.get("mapped_symbol"), + "chain": event.get("chain") or "", + "contract_address": event.get("token_address") or "", + "source": STANDARD_SIGNAL_SOURCE, + "onchain_score": 0, + "risk_score": 0, + "event_count": 0, + "mapped_event_count": 0, + "latest_event_at": "", + "dex_volume_usd": 0, + "dex_volume_change_pct": 0, + "liquidity_usd": 0, + "liquidity_change_pct": 0, + }, + ) + importance = float(event.get("importance") or 0) + current["event_count"] += 1 + current["mapped_event_count"] += 1 + current["onchain_score"] = max(float(current.get("onchain_score") or 0), importance) + current["latest_event_at"] = max(str(current.get("latest_event_at") or ""), str(event.get("detected_at") or "")) + items = [] + for item in grouped.values(): + rec = active.get(item.get("symbol")) or {} + item["recommendation"] = { + "rec_id": rec.get("id") or 0, + "execution_status": rec.get("execution_status") or "", + "action_status": rec.get("action_status") or "", + "display_bucket": rec.get("display_bucket") or "", + "has_active": bool(rec), + } + items.append(item) + return sorted(items, key=lambda x: (float(x.get("onchain_score") or 0), str(x.get("latest_event_at") or "")), reverse=True)[: int(limit or 8)] + + def list_onchain_tokens(limit=30, offset=0, chain="", signal="", hours=24): init_onchain_tables() limit = max(1, min(int(limit or 30), 100)) offset = max(0, int(offset or 0)) cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() - clauses = [] + clauses = ["COALESCE(m.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')"] params = [] if chain: clauses.append("m.chain=%s") @@ -619,6 +689,7 @@ def list_onchain_tokens(limit=30, offset=0, chain="", signal="", hours=24): SELECT 1 FROM onchain_events e WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.detected_at >= %s AND e.signal_code=%s + AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius') ) """ ) @@ -640,9 +711,11 @@ def list_onchain_tokens(limit=30, offset=0, chain="", signal="", hours=24): f""" SELECT m.*, (SELECT COUNT(*) FROM onchain_events e - WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.detected_at >= %s) AS event_count, + WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.detected_at >= %s + AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')) AS event_count, (SELECT COUNT(*) FROM onchain_events e - WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.direction='risk' AND e.detected_at >= %s) AS risk_event_count + WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.direction='risk' AND e.detected_at >= %s + AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')) AS risk_event_count FROM ({_latest_metrics_subquery(hours)}) m WHERE {where} ORDER BY m.onchain_score DESC, m.risk_score DESC, m.metric_time DESC @@ -677,6 +750,7 @@ def get_onchain_token_detail(symbol, hours=72): """ SELECT * FROM onchain_events WHERE symbol=%s AND detected_at >= %s + AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius') ORDER BY detected_at DESC, id DESC LIMIT 100 """, @@ -686,6 +760,7 @@ def get_onchain_token_detail(symbol, hours=72): """ SELECT * FROM onchain_token_metrics WHERE symbol=%s AND metric_time >= %s + AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius') ORDER BY metric_time DESC, id DESC LIMIT 100 """, @@ -764,9 +839,11 @@ def list_onchain_raw_events(limit=50, offset=0, chain="", source="", event_type= params.append(mapping_status) if priority: if priority == "important": - clauses.append("event_type NOT IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top')") + clauses.append("importance >= %s") + params.append(70) elif priority == "low": - clauses.append("event_type IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top')") + clauses.append("importance < %s") + params.append(70) where = " AND ".join(clauses) conn = get_conn() total = conn.execute(f"SELECT COUNT(*) FROM onchain_raw_events WHERE {where}", tuple(params)).fetchone()[0] diff --git a/static/onchain.html b/static/onchain.html index 0a10b94..2d1778f 100644 --- a/static/onchain.html +++ b/static/onchain.html @@ -22,9 +22,9 @@