alphax/app/db/onchain_db.py
2026-05-23 10:17:26 +08:00

1087 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""On-chain discovery storage and read models.
The on-chain layer is a research/discovery input. It stores normalized external
facts and can enqueue technical-check candidates, but it must not create or
mutate trading recommendations directly.
"""
import json
import os
from datetime import datetime, timedelta
from app.db.altcoin_db import get_conn
from app.db.postgres_connection import ensure_migrations_once
from app.config.system_config import onchain_config
# Default confidence threshold applied by get_token_mappings() and used to
# count "usable" mappings in get_onchain_provider_status().
MIN_MAPPING_CONFIDENCE = 70
# Display labels (UI strings) keyed by normalized signal code; see signal_label().
SIGNAL_LABELS = {
    "large_token_transfer": "链上大额转账",
    "dex_volume_spike": "链上成交放量",
    "liquidity_add": "流动性增加",
    "liquidity_remove_risk": "流动性撤出风险",
    "exchange_outflow": "交易所流出",
    "exchange_inflow_risk": "交易所流入风险",
    "whale_accumulation": "鲸鱼增持",
    "holder_growth": "持有人增长",
    "holder_concentration_risk": "持仓集中风险",
    "smart_money_buying": "聪明钱买入",
}
# Display labels keyed by raw event type; see raw_event_type_label().
RAW_EVENT_TYPE_LABELS = {
    "evm_transfer": "EVM 原始转账",
}
# Per raw event type: a plain-language summary ("plain"), why it matters
# ("meaning") and a priority tag; see raw_event_explainer().
RAW_EVENT_EXPLAINERS = {
    "evm_transfer": {
        "plain": "NodeReal 捕捉到 EVM 链上的 ERC-20 Transfer 原始日志。",
        "meaning": "这代表链上确实有资金转移,但没有完成币种映射前,不能直接进入策略候选。",
        "priority": "medium",
    },
}
# Signal codes that signal_direction() reports as "positive".
POSITIVE_SIGNALS = {"dex_volume_spike", "liquidity_add", "exchange_outflow", "whale_accumulation", "holder_growth", "smart_money_buying"}
# Signal codes that signal_direction() reports as "risk" (checked before the positive set).
RISK_SIGNALS = {"liquidity_remove_risk", "exchange_inflow_risk", "holder_concentration_risk"}
# Source name the read models below treat as the standard signal provider.
STANDARD_SIGNAL_SOURCE = "nodereal"
# Source names that _is_legacy_onchain_source() classifies as legacy.
LEGACY_ONCHAIN_SOURCES = {"dexscreener", "etherscan", "helius"}
def _now():
return datetime.now().isoformat()
def _dump(value):
return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str)
def _load(value, fallback=None):
try:
if isinstance(value, str) and value.strip():
return json.loads(value)
if value is not None:
return value
except Exception:
pass
return fallback
def _symbol_base(symbol):
return str(symbol or "").upper().replace("/USDT", "").replace("USDT", "").strip()
def normalize_symbol(symbol):
    """Return *symbol* in ``BASE/USDT`` form, or ``""`` when no base asset remains."""
    base = _symbol_base(symbol)
    if not base:
        return ""
    return f"{base}/USDT"
def signal_label(code):
    """Return the display label for a signal code.

    Unknown codes are echoed back as text; a falsy code yields the generic label.
    """
    key = str(code or "")
    fallback = str(code or "链上信号")
    return SIGNAL_LABELS[key] if key in SIGNAL_LABELS else fallback
def signal_direction(code):
    """Classify a signal code as ``"risk"``, ``"positive"`` or ``"neutral"``.

    The risk set is consulted before the positive set.
    """
    key = str(code or "")
    for direction, members in (("risk", RISK_SIGNALS), ("positive", POSITIVE_SIGNALS)):
        if key in members:
            return direction
    return "neutral"
def _is_legacy_onchain_source(source):
    """Return True when *source* (case/whitespace-insensitive) is a legacy source name."""
    normalized = str(source or "").lower().strip()
    return normalized in LEGACY_ONCHAIN_SOURCES
def raw_event_type_label(event_type):
    """Return the display label for a raw event type.

    Unknown types are echoed back as text; a falsy type yields the generic label.
    """
    key = str(event_type or "")
    fallback = str(event_type or "链上原始事件")
    return RAW_EVENT_TYPE_LABELS[key] if key in RAW_EVENT_TYPE_LABELS else fallback
def raw_event_explainer(event_type):
    """Return the explainer dict (``plain``, ``meaning``, ``priority``) for a raw event type.

    Known types return the shared entry from ``RAW_EVENT_EXPLAINERS``; any other
    type gets a freshly built generic explainer.
    """
    key = str(event_type or "")
    if key in RAW_EVENT_EXPLAINERS:
        return RAW_EVENT_EXPLAINERS[key]
    return {
        "plain": "链上或链上相关数据源捕捉到一条原始动态。",
        "meaning": "需要完成币种映射和质量验证后,才可能进入技术检查。",
        "priority": "medium",
    }
def init_onchain_tables():
    """Delegate to ``ensure_migrations_once()``.

    The DB-facing functions in this module call this before opening a connection.
    """
    ensure_migrations_once()
def upsert_token_mapping(symbol, chain, contract_address, source="", confidence=0, raw=None, is_active=True):
    """Insert or refresh one symbol -> contract mapping and return its row id.

    The symbol is normalized to ``BASE/USDT``, the chain is lower-cased and the
    contract address is stripped. On conflict the stored confidence is never
    lowered (``GREATEST``); source, active flag, raw payload and ``updated_at``
    are overwritten.

    Args:
        symbol: Trading symbol in any form accepted by ``normalize_symbol``.
        chain: Chain name.
        contract_address: Token contract address.
        source: Name of the data source providing the mapping.
        confidence: Mapping confidence, coerced to ``int``.
        raw: Arbitrary payload stored as JSON text.
        is_active: Stored as ``1``/``0``.

    Returns:
        The mapping row id, or ``0`` when symbol, chain or contract address is
        empty after normalization (or the row cannot be read back).
    """
    init_onchain_tables()
    now = _now()
    symbol = normalize_symbol(symbol)
    chain = str(chain or "").lower().strip()
    contract_address = str(contract_address or "").strip()
    if not symbol or not chain or not contract_address:
        return 0
    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT INTO onchain_token_map
            (symbol, chain, contract_address, source, confidence, is_active, raw_json, created_at, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT(symbol, chain, contract_address) DO UPDATE SET
            source=excluded.source,
            confidence=GREATEST(onchain_token_map.confidence, excluded.confidence),
            is_active=excluded.is_active,
            raw_json=excluded.raw_json,
            updated_at=excluded.updated_at
            """,
            (symbol, chain, contract_address, source or "", int(confidence or 0), 1 if is_active else 0, _dump(raw), now, now),
        )
        conn.commit()
        row = conn.execute(
            "SELECT id FROM onchain_token_map WHERE symbol=%s AND chain=%s AND contract_address=%s",
            (symbol, chain, contract_address),
        ).fetchone()
    finally:
        # Always release the connection, including when a statement fails.
        conn.close()
    return int(row["id"] if row else 0)
def get_token_mappings(symbol="", min_confidence=MIN_MAPPING_CONFIDENCE, active_only=True):
    """Return token mappings as dicts, best confidence first.

    Args:
        symbol: Optional symbol filter; normalized with ``normalize_symbol``.
        min_confidence: Lower bound on ``confidence`` (coerced to ``int``).
        active_only: When true, only rows with ``is_active=1`` are returned.

    Returns:
        A list of row dicts ordered by confidence, then ``updated_at``, descending.
    """
    init_onchain_tables()
    clauses = ["confidence >= %s"]
    params = [int(min_confidence or 0)]
    if symbol:
        clauses.append("symbol=%s")
        params.append(normalize_symbol(symbol))
    if active_only:
        clauses.append("is_active=1")
    conn = get_conn()
    try:
        rows = conn.execute(
            f"""
            SELECT * FROM onchain_token_map
            WHERE {' AND '.join(clauses)}
            ORDER BY confidence DESC, updated_at DESC
            """,
            tuple(params),
        ).fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()
    return [dict(row) for row in rows]
def _event_hash(event):
raw = "|".join(
[
str(event.get("source") or ""),
str(event.get("chain") or ""),
str(event.get("symbol") or ""),
str(event.get("signal_code") or event.get("event_type") or ""),
str(event.get("tx_hash") or event.get("contract_address") or ""),
str(event.get("detected_at") or ""),
str(round(float(event.get("value_usd") or 0), 2)),
]
).lower()
import hashlib
return hashlib.sha256(raw.encode()).hexdigest()[:24]
def _raw_event_hash(event):
raw = "|".join(
[
str(event.get("source") or ""),
str(event.get("chain") or ""),
str(event.get("event_type") or ""),
str(event.get("token_address") or ""),
str(event.get("url") or ""),
str(round(float(event.get("amount") or 0), 4)),
str(round(float(event.get("total_amount") or 0), 4)),
]
).lower()
import hashlib
return hashlib.sha256(raw.encode()).hexdigest()[:24]
def insert_onchain_event(event):
    """Store one normalized on-chain event and return its new row id.

    Missing label, direction, detection time and event hash are derived from
    the other fields. Rows whose ``event_hash`` already exists are skipped.

    Returns:
        The inserted row id, or ``0`` when symbol, chain or signal code is
        missing, or when the event was a duplicate.
    """
    init_onchain_tables()
    record = dict(event or {})
    record["symbol"] = normalize_symbol(record.get("symbol"))
    record["chain"] = str(record.get("chain") or "").lower().strip()
    record["signal_code"] = str(record.get("signal_code") or "").strip()
    record["event_type"] = str(record.get("event_type") or record["signal_code"] or "onchain_event")
    if not (record["symbol"] and record["chain"] and record["signal_code"]):
        return 0
    record["signal_label"] = record.get("signal_label") or signal_label(record["signal_code"])
    record["direction"] = record.get("direction") or signal_direction(record["signal_code"])
    record["detected_at"] = str(record.get("detected_at") or _now())
    # The hash is computed last so it sees the normalized fields above.
    record["event_hash"] = record.get("event_hash") or _event_hash(record)

    def text(name):
        return record.get(name) or ""

    values = (
        record["event_hash"],
        record["chain"],
        record["symbol"],
        text("contract_address"),
        record["event_type"],
        record["signal_code"],
        record["signal_label"],
        record["direction"],
        float(record.get("value_usd") or 0),
        float(record.get("amount") or 0),
        text("tx_hash"),
        text("wallet_address"),
        text("wallet_label"),
        text("counterparty_label"),
        int(record.get("confidence") or 0),
        record.get("severity") or "B",
        record.get("status") or "new",
        record["detected_at"],
        text("source"),
        text("url"),
        _dump(record.get("raw") or record.get("raw_json") or {}),
    )
    statement = """
        INSERT INTO onchain_events (
        event_hash, chain, symbol, contract_address, event_type, signal_code, signal_label,
        direction, value_usd, amount, tx_hash, wallet_address, wallet_label,
        counterparty_label, confidence, severity, status, detected_at, source, url, raw_json
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT(event_hash) DO NOTHING
        RETURNING id
        """
    conn = get_conn()
    try:
        inserted = conn.execute(statement, values).fetchone()
        # No returned row means the ON CONFLICT branch skipped a duplicate.
        event_id = int(inserted["id"] if inserted else 0)
        conn.commit()
        return event_id
    finally:
        conn.close()
def find_mapping_by_contract(chain, contract_address):
    """Return the best active mapping for a contract on a chain, or ``None``.

    The contract address is matched case-insensitively; among several matches
    the highest confidence, then most recently updated, row wins.

    Returns:
        The mapping row as a dict, or ``None`` when either argument is empty or
        no active mapping exists.
    """
    init_onchain_tables()
    chain = str(chain or "").lower().strip()
    contract_address = str(contract_address or "").strip()
    if not chain or not contract_address:
        return None
    conn = get_conn()
    try:
        row = conn.execute(
            """
            SELECT *
            FROM onchain_token_map
            WHERE chain=%s AND lower(contract_address)=lower(%s) AND is_active=1
            ORDER BY confidence DESC, updated_at DESC
            LIMIT 1
            """,
            (chain, contract_address),
        ).fetchone()
    finally:
        # Release the connection even if the lookup raises.
        conn.close()
    return dict(row) if row else None
def insert_onchain_raw_event(event):
    """Store one raw (pre-mapping) on-chain event and return its new row id.

    Detection time, event hash and mapping status are derived when absent;
    the mapping status defaults to ``"mapped"`` only when a mapped symbol is
    supplied. Rows whose ``event_hash`` already exists are skipped.

    Returns:
        The inserted row id, or ``0`` when source, chain, event type or token
        address is missing, or when the event was a duplicate.
    """
    init_onchain_tables()
    record = dict(event or {})
    record["source"] = str(record.get("source") or "").strip()
    record["chain"] = str(record.get("chain") or "").lower().strip()
    record["event_type"] = str(record.get("event_type") or "onchain_raw_event").strip()
    record["token_address"] = str(record.get("token_address") or "").strip()
    required = (record["source"], record["chain"], record["event_type"], record["token_address"])
    if not all(required):
        return 0
    record["detected_at"] = str(record.get("detected_at") or _now())
    record["event_hash"] = record.get("event_hash") or _raw_event_hash(record)
    if record.get("mapped_symbol"):
        record["mapped_symbol"] = normalize_symbol(record.get("mapped_symbol"))
    else:
        record["mapped_symbol"] = ""
    default_status = "mapped" if record["mapped_symbol"] else "unmapped"
    record["mapping_status"] = str(record.get("mapping_status") or default_status)

    def text(name):
        return record.get(name) or ""

    def number(name):
        return float(record.get(name) or 0)

    values = (
        record["event_hash"],
        record["source"],
        record["chain"],
        record["event_type"],
        record["token_address"],
        text("symbol_guess"),
        text("name"),
        record.get("title") or raw_event_type_label(record["event_type"]),
        text("description"),
        text("url"),
        text("icon"),
        number("amount"),
        number("total_amount"),
        number("importance"),
        record["mapped_symbol"],
        record["mapping_status"],
        record["detected_at"],
        _dump(record.get("raw") or record.get("raw_json") or {}),
    )
    statement = """
        INSERT INTO onchain_raw_events (
        event_hash, source, chain, event_type, token_address, symbol_guess, name,
        title, description, url, icon, amount, total_amount, importance,
        mapped_symbol, mapping_status, detected_at, raw_json
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT(event_hash) DO NOTHING
        RETURNING id
        """
    conn = get_conn()
    try:
        inserted = conn.execute(statement, values).fetchone()
        # No returned row means the ON CONFLICT branch skipped a duplicate.
        event_id = int(inserted["id"] if inserted else 0)
        conn.commit()
        return event_id
    finally:
        conn.close()
def insert_token_metric(metric):
    """Insert or overwrite one token metric snapshot and return its row id.

    A snapshot is identified by (symbol, chain, contract_address, window,
    metric_time); a conflicting row has all metric columns, source and raw
    payload replaced.

    Args:
        metric: Mapping with the metric fields; numeric fields default to 0,
            ``window`` defaults to ``"1h"`` and ``metric_time`` to the current time.

    Returns:
        The row id, or ``0`` when symbol, chain or window is empty (or no row
        is returned by the statement).
    """
    init_onchain_tables()
    item = dict(metric or {})
    item["symbol"] = normalize_symbol(item.get("symbol"))
    item["chain"] = str(item.get("chain") or "").lower().strip()
    item["window"] = str(item.get("window") or "1h").strip()
    item["metric_time"] = str(item.get("metric_time") or _now())
    if not item["symbol"] or not item["chain"] or not item["window"]:
        return 0
    conn = get_conn()
    try:
        cur = conn.execute(
            """
            INSERT INTO onchain_token_metrics (
            symbol, chain, contract_address, "window", metric_time,
            dex_volume_usd, dex_volume_change_pct, liquidity_usd, liquidity_change_pct,
            exchange_netflow_usd, whale_accumulation_usd, holder_delta, smart_money_score,
            onchain_score, risk_score, source, raw_json
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT(symbol, chain, contract_address, "window", metric_time) DO UPDATE SET
            dex_volume_usd=excluded.dex_volume_usd,
            dex_volume_change_pct=excluded.dex_volume_change_pct,
            liquidity_usd=excluded.liquidity_usd,
            liquidity_change_pct=excluded.liquidity_change_pct,
            exchange_netflow_usd=excluded.exchange_netflow_usd,
            whale_accumulation_usd=excluded.whale_accumulation_usd,
            holder_delta=excluded.holder_delta,
            smart_money_score=excluded.smart_money_score,
            onchain_score=excluded.onchain_score,
            risk_score=excluded.risk_score,
            source=excluded.source,
            raw_json=excluded.raw_json
            RETURNING id
            """,
            (
                item["symbol"],
                item["chain"],
                item.get("contract_address") or "",
                item["window"],
                item["metric_time"],
                float(item.get("dex_volume_usd") or 0),
                float(item.get("dex_volume_change_pct") or 0),
                float(item.get("liquidity_usd") or 0),
                float(item.get("liquidity_change_pct") or 0),
                float(item.get("exchange_netflow_usd") or 0),
                float(item.get("whale_accumulation_usd") or 0),
                float(item.get("holder_delta") or 0),
                float(item.get("smart_money_score") or 0),
                float(item.get("onchain_score") or 0),
                float(item.get("risk_score") or 0),
                item.get("source") or "",
                _dump(item.get("raw") or item.get("raw_json") or {}),
            ),
        )
        # Read the RETURNING row before committing, like the other inserts here,
        # and tolerate a missing row instead of failing on None["id"].
        row = cur.fetchone()
        metric_id = int((row["id"] if row else 0) or 0)
        conn.commit()
    finally:
        # Release the connection even if the statement fails.
        conn.close()
    return metric_id
def _latest_metrics_subquery(hours=24):
return """
SELECT m.*
FROM onchain_token_metrics m
JOIN (
SELECT symbol, chain, contract_address, MAX(metric_time) AS max_time
FROM onchain_token_metrics
WHERE metric_time >= %s
GROUP BY symbol, chain, contract_address
) latest ON latest.symbol=m.symbol
AND latest.chain=m.chain
AND latest.contract_address=m.contract_address
AND latest.max_time=m.metric_time
"""
def get_onchain_overview(hours=24):
    """Build the on-chain overview read model for the last *hours* hours.

    Reads events, raw events, the latest metric rows and active
    recommendations, then derives KPI counters, hot/risk token lists, the
    latest raw events, per-signal counts and the provider status.

    Args:
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with ``hours``, ``updated_at``, ``kpi``, ``hot_tokens``,
        ``risk_tokens``, ``raw_events``, ``signals`` and ``provider_status``.
    """
    init_onchain_tables()
    cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat()
    conn = get_conn()
    try:
        event_rows = conn.execute("SELECT * FROM onchain_events WHERE detected_at >= %s", (cutoff,)).fetchall()
        raw_rows = conn.execute("SELECT * FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,)).fetchall()
        raw_latest = conn.execute(
            """
            SELECT * FROM onchain_raw_events
            WHERE detected_at >= %s
            AND event_type NOT IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top')
            ORDER BY detected_at::timestamp DESC, importance DESC, id DESC
            LIMIT 12
            """,
            (cutoff,),
        ).fetchall()
        metric_rows = conn.execute(_latest_metrics_subquery(hours), (cutoff,)).fetchall()
        rec_rows = conn.execute(
            "SELECT symbol, id, execution_status, action_status, display_bucket FROM recommendation WHERE status='active'"
        ).fetchall()
    finally:
        # Release the connection even when one of the reads fails.
        conn.close()
    active = {row["symbol"]: dict(row) for row in rec_rows}
    events = [dict(row) for row in event_rows]
    raw_events = [dict(row) for row in raw_rows]
    metrics = [dict(row) for row in metric_rows]
    # Event KPIs only count the standard provider; metric lists drop legacy sources.
    standard_events = [e for e in events if str(e.get("source") or "").lower() == STANDARD_SIGNAL_SOURCE]
    node_metrics = [m for m in metrics if not _is_legacy_onchain_source(m.get("source"))]
    hot = sorted(node_metrics, key=lambda x: float(x.get("onchain_score") or 0), reverse=True)[:8]
    risks = sorted(node_metrics, key=lambda x: float(x.get("risk_score") or 0), reverse=True)[:8]
    mapped_feed = _mapped_raw_signal_items(raw_events, active, limit=8)
    total_netflow = sum(float(x.get("exchange_netflow_usd") or 0) for x in metrics)
    return {
        "hours": int(hours or 24),
        "updated_at": _now(),
        "kpi": {
            "event_count": len(standard_events),
            "raw_event_count": len(raw_events),
            "raw_unmapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "unmapped"),
            "raw_mapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "mapped"),
            # Fall back to distinct mapped raw tokens when no metric rows exist.
            "token_count": len({(m["symbol"], m["chain"], m.get("contract_address") or "") for m in node_metrics})
            or len({(e.get("mapped_symbol"), e.get("chain")) for e in raw_events if e.get("mapping_status") == "mapped" and e.get("mapped_symbol")}),
            "positive_events": sum(1 for e in standard_events if e.get("direction") == "positive"),
            "risk_events": sum(1 for e in standard_events if e.get("direction") == "risk"),
            "exchange_netflow_usd": round(total_netflow, 2),
            "mapped_signal_count": len(mapped_feed),
        },
        # With no metric rows, the mapped raw feed stands in for the hot list.
        "hot_tokens": ([_format_metric_item(row, active) for row in hot] or mapped_feed),
        "risk_tokens": [_format_metric_item(row, active) for row in risks],
        "raw_events": _format_raw_events(raw_latest),
        "signals": _signal_counts(standard_events),
        "provider_status": get_onchain_provider_status(hours=hours),
    }
def get_onchain_provider_status(hours=24):
    """Summarize collection coverage and provider health for the last *hours* hours.

    Counts raw events, non-legacy metrics, standard-source signals, queued
    ``event_news`` candidates and token mappings, breaks raw events / metrics /
    signals down by type or source, and attaches the most recent on-chain
    cron run with its summary and error message.

    Returns:
        A dict with ``hours``, ``enabled``, ``last_run``, ``last_summary``,
        ``last_error``, ``coverage``, ``raw_event_types``, ``metric_sources``,
        ``signal_sources`` and ``providers``.
    """
    init_onchain_tables()
    cfg = onchain_config()
    hours = int(hours or 24)
    cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
    nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY")
    conn = get_conn()

    def scalar(sql, *args):
        # First column of the first row of a COUNT(*) style query.
        return conn.execute(sql, *args).fetchone()[0]

    def since_cutoff(sql):
        # All rows of a query whose only parameter is the cutoff.
        return conn.execute(sql, (cutoff,)).fetchall()

    try:
        raw_total = scalar("SELECT COUNT(*) FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,))
        metric_total = scalar(
            """
            SELECT COUNT(*)
            FROM onchain_token_metrics
            WHERE metric_time >= %s
            AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius')
            """,
            (cutoff,),
        )
        signal_total = scalar(
            """
            SELECT COUNT(*)
            FROM onchain_events
            WHERE detected_at >= %s AND source=%s
            """,
            (cutoff, STANDARD_SIGNAL_SOURCE),
        )
        candidate_total = scalar(
            """
            SELECT COUNT(*)
            FROM event_news
            WHERE source='onchain' AND detected_at >= %s
            """,
            (cutoff,),
        )
        mapping_total = scalar("SELECT COUNT(*) FROM onchain_token_map WHERE is_active=1")
        mapping_usable = scalar(
            "SELECT COUNT(*) FROM onchain_token_map WHERE is_active=1 AND confidence >= %s",
            (MIN_MAPPING_CONFIDENCE,),
        )
        raw_by_type = since_cutoff(
            """
            SELECT event_type, COUNT(*) AS count
            FROM onchain_raw_events
            WHERE detected_at >= %s
            GROUP BY event_type
            ORDER BY count DESC, event_type
            """
        )
        metric_sources = since_cutoff(
            """
            SELECT source, COUNT(*) AS count
            FROM onchain_token_metrics
            WHERE metric_time >= %s
            GROUP BY source
            ORDER BY count DESC, source
            """
        )
        signal_sources = since_cutoff(
            """
            SELECT source, COUNT(*) AS count
            FROM onchain_events
            WHERE detected_at >= %s
            GROUP BY source
            ORDER BY count DESC, source
            """
        )
        last_onchain = conn.execute(
            """
            SELECT *
            FROM cron_run_log
            WHERE script_name='onchain_monitor.py' OR job_name IN ('链上','onchain')
            ORDER BY started_at DESC, id DESC
            LIMIT 1
            """
        ).fetchone()
    finally:
        conn.close()

    if last_onchain:
        summary = _load(last_onchain.get("summary_json"), {})
        last_error = last_onchain.get("error_message")
        last_run = dict(last_onchain)
    else:
        summary, last_error, last_run = {}, "", None

    provider = str(cfg.get("provider") or "nodereal").strip().lower()
    nodereal_enabled = bool(cfg.get("nodereal_enabled", True)) and provider == "nodereal"
    nodereal_metrics = int(sum(row["count"] for row in metric_sources if row["source"] == "nodereal"))
    nodereal_signals = int(sum(row["count"] for row in signal_sources if row["source"] == "nodereal"))
    raw_count = int(raw_total or 0)
    # Only attribute the last error to this provider when the message names it.
    provider_error = last_error if "nodereal" in str(last_error).lower() else ""
    nodereal_entry = {
        "provider": "nodereal",
        "label": "NodeReal",
        "enabled": nodereal_enabled,
        "api_key_present": bool(os.getenv(nodereal_env, "").strip()),
        "implemented": True,
        "role": "EVM 主链上数据源Transfer 日志、大额转账、holder 变化",
        "raw_events": raw_count,
        "metrics": nodereal_metrics,
        "signals": nodereal_signals,
        "status": _provider_status_label(
            nodereal_enabled,
            True,
            raw_count + nodereal_metrics + nodereal_signals,
            provider_error,
        ),
    }
    coverage = {
        "active_mappings": int(mapping_total or 0),
        "usable_mappings": int(mapping_usable or 0),
        "raw_events": raw_count,
        "metrics": int(metric_total or 0),
        "signals": int(signal_total or 0),
        "queued_candidates": int(candidate_total or 0),
    }
    return {
        "hours": hours,
        "enabled": bool(cfg.get("enabled", False)),
        "last_run": last_run,
        "last_summary": summary,
        "last_error": last_error or "",
        "coverage": coverage,
        "raw_event_types": [dict(row) for row in raw_by_type],
        "metric_sources": [dict(row) for row in metric_sources],
        "signal_sources": [dict(row) for row in signal_sources],
        "providers": [nodereal_entry],
    }
def _provider_status_label(enabled, implemented, count, last_error=""):
if not enabled:
return "已关闭"
if not implemented:
return "未接入采集"
if count:
return "正常采集中"
if last_error:
return "最近采集失败"
return "暂无数据"
def _signal_counts(events):
    """Count events per signal code, most frequent first.

    Events without a signal code are ignored. Each entry carries
    ``signal_code``, ``signal_label`` and ``count``; ties keep first-seen order.
    """
    tally = {}
    for event in events:
        code = event.get("signal_code") or ""
        if code:
            entry = tally.get(code)
            if entry is None:
                entry = {"signal_code": code, "signal_label": signal_label(code), "count": 0}
                tally[code] = entry
            entry["count"] += 1
    return sorted(tally.values(), key=lambda entry: entry["count"], reverse=True)
def _format_metric_item(row, active=None):
    """Convert a metric row to a dict with decoded ``raw`` and recommendation info.

    ``raw_json`` is replaced by its decoded form under ``raw``; ``recommendation``
    describes the active recommendation for the row's symbol, if *active* has one.
    """
    lookup = active or {}
    formatted = dict(row)
    formatted["raw"] = _load(formatted.pop("raw_json", "{}"), {})
    match = lookup.get(formatted.get("symbol")) or {}
    recommendation = {"rec_id": match.get("id") or 0}
    for field in ("execution_status", "action_status", "display_bucket"):
        recommendation[field] = match.get(field) or ""
    recommendation["has_active"] = bool(match)
    formatted["recommendation"] = recommendation
    return formatted
def _mapped_raw_signal_items(raw_events, active=None, limit=8):
    """Aggregate mapped standard-source raw events into per-token feed items.

    Events are grouped by (mapped symbol, chain). Each group counts its events,
    keeps the highest ``importance`` as ``onchain_score`` and the greatest
    ``detected_at`` string, and takes the contract address from its first event.
    Items are returned by score, then latest time, descending, cut to *limit*.
    """
    lookup = active or {}
    by_token = {}
    for event in raw_events:
        symbol = event.get("mapped_symbol")
        is_mapped = event.get("mapping_status") == "mapped" and bool(symbol)
        is_standard = str(event.get("source") or "").lower() == STANDARD_SIGNAL_SOURCE
        if not (is_mapped and is_standard):
            continue
        chain = event.get("chain") or ""
        token_key = (symbol, chain)
        if token_key not in by_token:
            by_token[token_key] = {
                "symbol": symbol,
                "chain": chain,
                "contract_address": event.get("token_address") or "",
                "source": STANDARD_SIGNAL_SOURCE,
                "onchain_score": 0,
                "risk_score": 0,
                "event_count": 0,
                "mapped_event_count": 0,
                "latest_event_at": "",
                "dex_volume_usd": 0,
                "dex_volume_change_pct": 0,
                "liquidity_usd": 0,
                "liquidity_change_pct": 0,
            }
        bucket = by_token[token_key]
        bucket["event_count"] += 1
        bucket["mapped_event_count"] += 1
        score_so_far = float(bucket.get("onchain_score") or 0)
        bucket["onchain_score"] = max(score_so_far, float(event.get("importance") or 0))
        seen_at = str(event.get("detected_at") or "")
        bucket["latest_event_at"] = max(str(bucket.get("latest_event_at") or ""), seen_at)

    feed = []
    for bucket in by_token.values():
        match = lookup.get(bucket.get("symbol")) or {}
        bucket["recommendation"] = {
            "rec_id": match.get("id") or 0,
            "execution_status": match.get("execution_status") or "",
            "action_status": match.get("action_status") or "",
            "display_bucket": match.get("display_bucket") or "",
            "has_active": bool(match),
        }
        feed.append(bucket)

    def rank(entry):
        return float(entry.get("onchain_score") or 0), str(entry.get("latest_event_at") or "")

    ranked = sorted(feed, key=rank, reverse=True)
    return ranked[: int(limit or 8)]
def list_onchain_tokens(limit=30, offset=0, chain="", signal="", hours=24):
    """Return one page of tokens ranked by their latest on-chain metrics.

    Args:
        limit: Page size, clamped to 1..100 (falsy means 30).
        offset: Rows to skip, clamped to >= 0.
        chain: Optional chain filter (lower-cased).
        signal: Optional signal code; keeps only tokens with a matching
            non-legacy event inside the window.
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with ``items``, ``total``, ``limit``, ``offset`` and ``has_more``.
    """
    init_onchain_tables()
    limit = max(1, min(int(limit or 30), 100))
    offset = max(0, int(offset or 0))
    cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat()
    clauses = ["COALESCE(m.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')"]
    params = []
    if chain:
        clauses.append("m.chain=%s")
        params.append(str(chain).lower())
    if signal:
        clauses.append(
            """
            EXISTS (
            SELECT 1 FROM onchain_events e
            WHERE e.symbol=m.symbol AND e.chain=m.chain
            AND e.detected_at >= %s AND e.signal_code=%s
            AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')
            )
            """
        )
        params.extend([cutoff, signal])
    where = " AND ".join(clauses)
    latest = _latest_metrics_subquery(hours)
    conn = get_conn()
    try:
        # Count over the same windowed "latest metrics" source that the page is
        # read from, so total/has_more describe the rows actually being paged.
        # The derived table carries an alias, which PostgreSQL requires before v16.
        total = conn.execute(
            f"""
            SELECT COUNT(*)
            FROM ({latest}) m
            WHERE {where}
            """,
            (cutoff, *params),
        ).fetchone()[0]
        # Placeholder order: two event-count subqueries, the latest-metrics
        # cutoff, the filter parameters, then LIMIT/OFFSET.
        rows = conn.execute(
            f"""
            SELECT m.*,
            (SELECT COUNT(*) FROM onchain_events e
            WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.detected_at >= %s
            AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')) AS event_count,
            (SELECT COUNT(*) FROM onchain_events e
            WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.direction='risk' AND e.detected_at >= %s
            AND COALESCE(e.source, '') NOT IN ('dexscreener', 'etherscan', 'helius')) AS risk_event_count
            FROM ({latest}) m
            WHERE {where}
            ORDER BY m.onchain_score DESC, m.risk_score DESC, m.metric_time DESC
            LIMIT %s OFFSET %s
            """,
            (cutoff, cutoff, cutoff, *params, limit, offset),
        ).fetchall()
        rec_rows = conn.execute(
            "SELECT symbol, id, execution_status, action_status, display_bucket FROM recommendation WHERE status='active'"
        ).fetchall()
    finally:
        # Release the connection even when a query fails.
        conn.close()
    active = {row["symbol"]: dict(row) for row in rec_rows}
    return {
        "items": [_format_metric_item(row, active) for row in rows],
        "total": int(total or 0),
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(rows) < int(total or 0),
    }
def get_onchain_token_detail(symbol, hours=72):
    """Return the on-chain detail view for one symbol over the last *hours* hours.

    Collects all mappings for the symbol, up to 100 each of non-legacy events,
    non-legacy metrics and mapped standard-source raw events inside the window,
    and the newest active recommendation.

    Args:
        symbol: Symbol in any form accepted by ``normalize_symbol``.
        hours: Look-back window in hours; falsy values mean 72.

    Returns:
        A dict with ``symbol``, ``hours``, ``mappings``, ``events``,
        ``raw_events``, ``raw_event_count``, ``metrics`` and ``recommendation``
        (``None`` when there is no active recommendation).
    """
    init_onchain_tables()
    symbol = normalize_symbol(symbol)
    cutoff = (datetime.now() - timedelta(hours=int(hours or 72))).isoformat()
    conn = get_conn()
    try:
        mappings = conn.execute(
            "SELECT * FROM onchain_token_map WHERE symbol=%s ORDER BY confidence DESC, updated_at DESC",
            (symbol,),
        ).fetchall()
        events = conn.execute(
            """
            SELECT * FROM onchain_events
            WHERE symbol=%s AND detected_at >= %s
            AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius')
            ORDER BY detected_at DESC, id DESC
            LIMIT 100
            """,
            (symbol, cutoff),
        ).fetchall()
        metrics = conn.execute(
            """
            SELECT * FROM onchain_token_metrics
            WHERE symbol=%s AND metric_time >= %s
            AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius')
            ORDER BY metric_time DESC, id DESC
            LIMIT 100
            """,
            (symbol, cutoff),
        ).fetchall()
        raw_events = conn.execute(
            """
            SELECT * FROM onchain_raw_events
            WHERE mapped_symbol=%s AND detected_at >= %s
            AND mapping_status='mapped'
            AND source=%s
            ORDER BY detected_at::timestamp DESC, importance DESC, id DESC
            LIMIT 100
            """,
            (symbol, cutoff, STANDARD_SIGNAL_SOURCE),
        ).fetchall()
        rec = conn.execute(
            """
            SELECT id, rec_time, action_status, execution_status, display_bucket, entry_price, current_price
            FROM recommendation
            WHERE symbol=%s AND status='active'
            ORDER BY id DESC LIMIT 1
            """,
            (symbol,),
        ).fetchone()
    finally:
        # Release the connection even when one of the reads fails.
        conn.close()
    return {
        "symbol": symbol,
        "hours": int(hours or 72),
        "mappings": [_with_raw(row) for row in mappings],
        "events": [_with_raw(row) for row in events],
        "raw_events": _format_raw_events(raw_events),
        "raw_event_count": len(raw_events),
        "metrics": [_with_raw(row) for row in metrics],
        "recommendation": dict(rec) if rec else None,
    }
def get_onchain_factor_context(symbol, hours=24):
    """Return compact on-chain factor evidence for strategy scoring.

    This read model intentionally does not create recommendations. It only
    exposes the newest non-legacy metric row and up to 20 recent non-legacy
    events for the symbol to the technical confirmation layer.

    Args:
        symbol: Symbol in any form accepted by ``normalize_symbol``.
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with ``symbol``, ``hours``, ``has_data``, ``metrics``,
        ``positive_events``, ``risk_events``, the three event counters and
        ``top_positive`` / ``top_risk``. The same keys are present when the
        symbol is empty, with ``has_data`` false and empty collections.
    """
    init_onchain_tables()
    symbol = normalize_symbol(symbol)
    window_hours = int(hours or 24)

    def build(metric_item, events):
        # Single place that shapes the result so every path returns the same keys.
        positive = [e for e in events if e.get("direction") == "positive"]
        risks = [e for e in events if e.get("direction") == "risk"]
        return {
            "symbol": symbol,
            "hours": window_hours,
            "has_data": bool(metric_item or events),
            "metrics": metric_item,
            "positive_events": positive,
            "risk_events": risks,
            "event_count": len(events),
            "positive_event_count": len(positive),
            "risk_event_count": len(risks),
            "top_positive": positive[0] if positive else None,
            "top_risk": risks[0] if risks else None,
        }

    if not symbol:
        # Nothing to look up; skip the database but keep the full result shape.
        return build({}, [])
    cutoff = (datetime.now() - timedelta(hours=window_hours)).isoformat()
    conn = get_conn()
    try:
        metric = conn.execute(
            """
            SELECT *
            FROM onchain_token_metrics
            WHERE symbol=%s AND metric_time >= %s
            AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius')
            ORDER BY metric_time DESC, id DESC
            LIMIT 1
            """,
            (symbol, cutoff),
        ).fetchone()
        rows = conn.execute(
            """
            SELECT *
            FROM onchain_events
            WHERE symbol=%s AND detected_at >= %s
            AND COALESCE(source, '') NOT IN ('dexscreener', 'etherscan', 'helius')
            ORDER BY detected_at DESC, confidence DESC, value_usd DESC, id DESC
            LIMIT 20
            """,
            (symbol, cutoff),
        ).fetchall()
    finally:
        conn.close()
    return build(dict(metric) if metric else {}, [dict(row) for row in rows])
def list_onchain_events(limit=50, offset=0, chain="", signal="", status="", hours=24):
    """Return one page of normalized on-chain events, newest first.

    Args:
        limit: Page size, clamped to 1..200 (falsy means 50).
        offset: Rows to skip, clamped to >= 0.
        chain: Optional chain filter (lower-cased).
        signal: Optional exact ``signal_code`` filter.
        status: Optional exact ``status`` filter.
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with ``items``, ``total``, ``limit``, ``offset`` and ``has_more``.
    """
    init_onchain_tables()
    limit = max(1, min(int(limit or 50), 200))
    offset = max(0, int(offset or 0))
    cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat()
    clauses = ["detected_at >= %s"]
    params = [cutoff]
    if chain:
        clauses.append("chain=%s")
        params.append(str(chain).lower())
    if signal:
        clauses.append("signal_code=%s")
        params.append(signal)
    if status:
        clauses.append("status=%s")
        params.append(status)
    where = " AND ".join(clauses)
    conn = get_conn()
    try:
        total = conn.execute(f"SELECT COUNT(*) FROM onchain_events WHERE {where}", tuple(params)).fetchone()[0]
        rows = conn.execute(
            f"""
            SELECT * FROM onchain_events
            WHERE {where}
            ORDER BY detected_at::timestamp DESC, id DESC
            LIMIT %s OFFSET %s
            """,
            (*params, limit, offset),
        ).fetchall()
    finally:
        # Release the connection even when a query fails.
        conn.close()
    return {
        "items": [_with_raw(row) for row in rows],
        "total": int(total or 0),
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(rows) < int(total or 0),
    }
def list_onchain_raw_events(limit=50, offset=0, chain="", source="", event_type="", mapping_status="", priority="", hours=24):
    """Return one page of raw on-chain events, newest and most important first.

    Args:
        limit: Page size, clamped to 1..200 (falsy means 50).
        offset: Rows to skip, clamped to >= 0.
        chain: Optional chain filter (lower-cased).
        source: Optional exact ``source`` filter.
        event_type: Optional exact ``event_type`` filter.
        mapping_status: Optional exact ``mapping_status`` filter.
        priority: ``"important"`` keeps importance >= 70, ``"low"`` keeps
            importance < 70; any other value applies no importance filter.
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with ``items``, ``total``, ``limit``, ``offset`` and ``has_more``.
    """
    init_onchain_tables()
    limit = max(1, min(int(limit or 50), 200))
    offset = max(0, int(offset or 0))
    cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat()
    clauses = ["detected_at >= %s"]
    params = [cutoff]
    if chain:
        clauses.append("chain=%s")
        params.append(str(chain).lower())
    if source:
        clauses.append("source=%s")
        params.append(source)
    if event_type:
        clauses.append("event_type=%s")
        params.append(event_type)
    if mapping_status:
        clauses.append("mapping_status=%s")
        params.append(mapping_status)
    if priority == "important":
        clauses.append("importance >= %s")
        params.append(70)
    elif priority == "low":
        clauses.append("importance < %s")
        params.append(70)
    where = " AND ".join(clauses)
    conn = get_conn()
    try:
        total = conn.execute(f"SELECT COUNT(*) FROM onchain_raw_events WHERE {where}", tuple(params)).fetchone()[0]
        rows = conn.execute(
            f"""
            SELECT * FROM onchain_raw_events
            WHERE {where}
            ORDER BY detected_at::timestamp DESC, importance DESC, id DESC
            LIMIT %s OFFSET %s
            """,
            (*params, limit, offset),
        ).fetchall()
    finally:
        # Release the connection even when a query fails.
        conn.close()
    return {
        "items": _format_raw_events(rows),
        "total": int(total or 0),
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(rows) < int(total or 0),
    }
def update_event_status(event_ids, status):
    """Set ``status`` on the given on-chain events and return the affected row count.

    Args:
        event_ids: Iterable of event ids (anything ``int()`` accepts).
        status: New status value.

    Returns:
        Number of rows updated; ``0`` when no ids are given.

    Raises:
        ValueError, TypeError: If an id cannot be converted to ``int``.
    """
    # Materialize and validate ids before touching the database, so a bad id
    # cannot leave an open connection behind and any iterable is accepted.
    ids = [int(x) for x in (event_ids or [])]
    if not ids:
        return 0
    init_onchain_tables()
    placeholders = ",".join(["%s"] * len(ids))
    conn = get_conn()
    try:
        cur = conn.execute(
            "UPDATE onchain_events SET status=%s WHERE id IN (" + placeholders + ")",
            (status, *ids),
        )
        conn.commit()
        updated = int(cur.rowcount or 0)
    finally:
        conn.close()
    return updated
def _with_raw(row):
    """Return *row* as a dict with ``raw_json`` replaced by its decoded form under ``raw``."""
    record = dict(row)
    if "raw_json" not in record:
        return record
    record["raw"] = _load(record.pop("raw_json"), {})
    return record
def _format_raw_events(rows):
    """Format raw event rows for display, looking up token metadata once for the batch."""
    materialized = list(rows or [])
    token_meta = _raw_event_token_metadata(materialized)
    formatted = []
    for row in materialized:
        meta = token_meta.get(_raw_event_token_key(row), {})
        formatted.append(_format_raw_event(row, meta))
    return formatted
def _raw_event_token_key(row):
item = dict(row)
return (str(item.get("chain") or "").lower(), str(item.get("token_address") or "").lower())
def _raw_event_token_metadata(rows):
    """Look up active token mappings for every distinct token referenced by *rows*.

    Returns:
        A dict keyed by lower-cased ``(chain, contract_address)`` holding
        ``symbol``, ``token_symbol``, ``name`` and ``decimals``. When several
        mappings match one token, the first row in confidence / recency order wins.
    """
    wanted = set()
    for row in rows or []:
        chain, address = _raw_event_token_key(row)
        if chain and address:
            wanted.add((chain, address))
    if not wanted:
        return {}
    ordered = sorted(wanted)
    predicate = " OR ".join("(chain=%s AND lower(contract_address)=lower(%s))" for _ in ordered)
    params = tuple(part for pair in ordered for part in pair)
    conn = get_conn()
    try:
        found = conn.execute(
            f"""
            SELECT chain, contract_address, symbol, raw_json
            FROM onchain_token_map
            WHERE is_active=1 AND ({predicate})
            ORDER BY confidence DESC, updated_at DESC
            """,
            params,
        ).fetchall()
    finally:
        conn.close()
    result = {}
    for mapping in found:
        token_key = (str(mapping["chain"] or "").lower(), str(mapping["contract_address"] or "").lower())
        if token_key not in result:
            # Rows arrive best-first, so only the first hit per token is kept.
            payload = _load(mapping["raw_json"], {}) or {}
            result[token_key] = {
                "symbol": normalize_symbol(mapping["symbol"]),
                "token_symbol": payload.get("symbol") or _symbol_base(mapping["symbol"]),
                "name": payload.get("name") or "",
                "decimals": int(payload.get("decimals") or 0),
            }
    return result
def _format_raw_event(row, token_meta=None):
    """Decorate one raw event row with labels, explanations and display fields.

    Adds the event label, plain-language summary, significance and priority,
    the human-readable transfer fields, a pipeline note that depends on the
    mapping status, and a shortened token address.
    """
    formatted = _with_raw(row)
    meta = token_meta or {}
    event_type = formatted.get("event_type")
    formatted["event_label"] = raw_event_type_label(event_type)
    explainer = raw_event_explainer(event_type)
    formatted["plain_summary"] = explainer.get("plain") or ""
    formatted["why_matters"] = explainer.get("meaning") or ""
    formatted["priority"] = explainer.get("priority") or "medium"
    formatted.update(_humanize_raw_transfer(formatted, meta))
    if formatted.get("mapping_status") == "mapped":
        formatted["pipeline_note"] = "已映射,可进入后续链上信号分析。"
    else:
        formatted["pipeline_note"] = "未完成币种映射,仅作为原始观察,不进入推荐。"
    formatted["token_short"] = _short_address(formatted.get("token_address"))
    return formatted
def _humanize_raw_transfer(item, token_meta):
    """Derive display fields describing a raw transfer event.

    The amount is scaled down by the token's decimals only when decimals are
    known and the amount is at least one whole token in base units. Sender and
    receiver come from ``raw["topics"][1]`` and ``[2]`` when present.

    Returns:
        A dict with ``display_amount``, ``display_amount_label``,
        ``from_address``, ``to_address``, ``from_short``, ``to_short`` and
        ``human_summary``.
    """
    token_symbol = (
        token_meta.get("token_symbol")
        or item.get("symbol_guess")
        or _symbol_base(item.get("mapped_symbol"))
        or "Token"
    )
    decimals = int(token_meta.get("decimals") or 0)
    amount = float(item.get("total_amount") or item.get("amount") or 0)
    if decimals > 0 and amount >= 10**decimals:
        display_amount = amount / (10**decimals)
    else:
        display_amount = amount

    raw = item.get("raw") or {}
    topics = raw.get("topics") if isinstance(raw, dict) else []
    topic_count = len(topics) if isinstance(topics, list) else 0
    from_addr = _topic_address(topics[1]) if topic_count > 1 else ""
    to_addr = _topic_address(topics[2]) if topic_count > 2 else ""

    mapped = item.get("mapped_symbol") or normalize_symbol(token_symbol)
    if display_amount:
        amount_label = f"{_compact_number(display_amount)} {token_symbol}"
    else:
        amount_label = f"未知数量 {token_symbol}"

    if from_addr and to_addr:
        route = f"{_short_address(from_addr)} 转至 {_short_address(to_addr)}"
    elif to_addr:
        route = f"转入 {_short_address(to_addr)}"
    else:
        route = ""

    summary = f"{mapped} 出现一笔 ERC-20 转账,数量约 {amount_label}"
    if route:
        # NOTE(review): the route is appended with no separator — confirm this is intended.
        summary += f"{route}"
    return {
        "display_amount": round(display_amount, 8) if display_amount else 0,
        "display_amount_label": amount_label,
        "from_address": from_addr,
        "to_address": to_addr,
        "from_short": _short_address(from_addr),
        "to_short": _short_address(to_addr),
        "human_summary": summary,
    }
def _topic_address(topic):
topic = str(topic or "")
if topic.startswith("0x") and len(topic) >= 42:
return "0x" + topic[-40:]
return ""
def _compact_number(value):
value = float(value or 0)
abs_value = abs(value)
if abs_value >= 1_000_000_000:
return f"{value / 1_000_000_000:.2f}B"
if abs_value >= 1_000_000:
return f"{value / 1_000_000:.2f}M"
if abs_value >= 1_000:
return f"{value / 1_000:.2f}K"
if abs_value >= 1:
return f"{value:.2f}".rstrip("0").rstrip(".")
if abs_value > 0:
return f"{value:.6f}".rstrip("0").rstrip(".")
return "0"
def _short_address(value):
value = str(value or "")
if len(value) <= 14:
return value
return value[:6] + "..." + value[-4:]