1518 lines
59 KiB
Python
1518 lines
59 KiB
Python
"""On-chain signal collector and candidate bridge.
|
|
|
|
V1 deliberately treats on-chain data as a discovery/risk layer. It writes
|
|
normalized events/metrics and may request a technical check through event_news,
|
|
but it never creates recommendations or changes recommendation state directly.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from urllib.parse import urlencode
|
|
|
|
import requests
|
|
|
|
from app.config.system_config import onchain_config
|
|
from app.db import onchain_db
|
|
from app.db.altcoin_db import get_conn, init_db, log_cron_run
|
|
from app.db.tracking_queries import get_latest_price_cache
|
|
from app.db.onchain_db import (
|
|
MIN_MAPPING_CONFIDENCE,
|
|
POSITIVE_SIGNALS,
|
|
RISK_SIGNALS,
|
|
find_mapping_by_contract,
|
|
get_token_mappings,
|
|
init_onchain_tables,
|
|
insert_onchain_event,
|
|
insert_onchain_raw_event,
|
|
insert_token_metric,
|
|
normalize_symbol,
|
|
signal_direction,
|
|
signal_label,
|
|
)
|
|
from app.services.event_driven_screener import _event_hash as event_hash
|
|
from app.services.event_driven_screener import _tradable_symbol, init_event_tables
|
|
from app.services.nodereal_client import DEFAULT_CHAIN_ENDPOINTS, NodeRealClient, NodeRealConfig
|
|
|
|
|
|
DEFAULT_CHAINS = ("ethereum", "bsc")
|
|
ETHERSCAN_CHAIN_IDS = {
|
|
"ethereum": "1",
|
|
"bsc": "56",
|
|
"base": "8453",
|
|
"arbitrum": "42161",
|
|
}
|
|
SOLANA_AUTO_ALLOWLIST = {
|
|
"WIF", "BONK", "JUP", "RAY", "PYTH", "PENGU", "JTO", "MEW", "POPCAT", "PNUT",
|
|
"FARTCOIN", "RENDER", "HNT", "MOBILE", "ORCA", "KMNO", "DRIFT", "TNSR", "IO",
|
|
}
|
|
NON_TARGET_NATIVE_BASES = {
|
|
"AVAX", "FIL", "SUI", "APT", "DOT", "ADA", "XRP", "LTC", "BCH", "ATOM", "NEAR",
|
|
"SEI", "INJ", "TON", "ETC", "ICP", "HBAR", "ALGO", "VET", "TRX", "XLM", "KAS",
|
|
"TIA", "EGLD", "FLOW", "KAVA", "MINA", "IOTA", "XMR", "DASH", "ZEC",
|
|
}
|
|
BRIDGED_TOKEN_MARKERS = (
|
|
"wrapped", "wormhole", "portal", "bridged", "bridge", "axelar", "allbridge",
|
|
"binance-peg", "multichain", "layerzero", "lz", "wavax", "wfil",
|
|
)
|
|
DEX_CHAIN_ALIASES = {
|
|
"ethereum": "ethereum",
|
|
"eth": "ethereum",
|
|
"bsc": "bsc",
|
|
"bnb": "bsc",
|
|
"base": "base",
|
|
"arbitrum": "arbitrum",
|
|
"arb": "arbitrum",
|
|
"solana": "solana",
|
|
"sol": "solana",
|
|
}
|
|
|
|
DEXSCREENER_RAW_ENDPOINTS = (
|
|
("token_profile_latest", "https://api.dexscreener.com/token-profiles/latest/v1"),
|
|
("token_boost_latest", "https://api.dexscreener.com/token-boosts/latest/v1"),
|
|
("token_boost_top", "https://api.dexscreener.com/token-boosts/top/v1"),
|
|
)
|
|
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
|
|
ERC20_SYMBOL_SELECTOR = "0x95d89b41"
|
|
ERC20_NAME_SELECTOR = "0x06fdde03"
|
|
ERC20_DECIMALS_SELECTOR = "0x313ce567"
|
|
|
|
|
|
def _env_bool(name, default=False):
|
|
value = os.getenv(name)
|
|
if value is None:
|
|
return default
|
|
return str(value).strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
|
|
def _env_int(name, default):
|
|
try:
|
|
return int(os.getenv(name, str(default)) or default)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _env_float(name, default):
|
|
try:
|
|
return float(os.getenv(name, str(default)) or default)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def get_onchain_params():
|
|
"""Runtime provider config. Keep this out of rules.yaml."""
|
|
cfg = onchain_config(DEFAULT_CHAINS)
|
|
chains_raw = cfg.get("chains") or list(DEFAULT_CHAINS)
|
|
if isinstance(chains_raw, str):
|
|
chains = [x.strip().lower() for x in chains_raw.split(",") if x.strip()]
|
|
else:
|
|
chains = [str(x).strip().lower() for x in chains_raw if str(x).strip()]
|
|
etherscan_env = str(cfg.get("etherscan_api_key_env") or "ALPHAX_ETHERSCAN_API_KEY")
|
|
helius_env = str(cfg.get("helius_api_key_env") or "ALPHAX_HELIUS_API_KEY")
|
|
nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY")
|
|
token_mappings_env = str(cfg.get("token_mappings_env") or "ALPHAX_ONCHAIN_TOKEN_MAPPINGS")
|
|
etherscan_chains_raw = cfg.get("etherscan_chains") or ["ethereum"]
|
|
if isinstance(etherscan_chains_raw, str):
|
|
etherscan_chains = [x.strip().lower() for x in etherscan_chains_raw.split(",") if x.strip()]
|
|
else:
|
|
etherscan_chains = [str(x).strip().lower() for x in etherscan_chains_raw if str(x).strip()]
|
|
return {
|
|
"enabled": bool(cfg.get("enabled", False)),
|
|
"provider": str(cfg.get("provider") or "nodereal").strip().lower(),
|
|
"chains": chains or list(DEFAULT_CHAINS),
|
|
"timeout": int(cfg.get("timeout") or 15),
|
|
"nodereal_enabled": bool(cfg.get("nodereal_enabled", True)),
|
|
"nodereal_chains": _normalize_chain_list(cfg.get("nodereal_chains") or ("ethereum", "bsc")),
|
|
"nodereal_api_key": os.getenv(nodereal_env, "").strip(),
|
|
"nodereal_api_key_env": nodereal_env,
|
|
"token_mappings": _load_token_mappings(cfg.get("token_mappings"), os.getenv(token_mappings_env, "")),
|
|
"token_mappings_env": token_mappings_env,
|
|
"nodereal_log_block_lookback": int(cfg.get("nodereal_log_block_lookback") or 120),
|
|
"nodereal_max_logs_per_token": int(cfg.get("nodereal_max_logs_per_token") or 25),
|
|
"nodereal_raw_transfer_enabled": bool(cfg.get("nodereal_raw_transfer_enabled", True)),
|
|
"nodereal_raw_block_lookback": int(cfg.get("nodereal_raw_block_lookback") or 1),
|
|
"nodereal_raw_max_logs_per_chain": int(cfg.get("nodereal_raw_max_logs_per_chain") or 30),
|
|
"nodereal_auto_mapping_enabled": bool(cfg.get("nodereal_auto_mapping_enabled", True)),
|
|
"nodereal_auto_mapping_confidence": int(cfg.get("nodereal_auto_mapping_confidence") or 82),
|
|
"candidate_enabled": bool(cfg.get("candidate_enabled", True)),
|
|
"candidate_min_score": float(cfg.get("candidate_min_score") or 70),
|
|
"candidate_min_confidence": int(cfg.get("candidate_min_confidence") or 70),
|
|
"candidate_cooldown_hours": float(cfg.get("candidate_cooldown_hours") or 6),
|
|
"dexscreener_enabled": bool(cfg.get("dexscreener_enabled", True)),
|
|
"etherscan_enabled": bool(cfg.get("etherscan_enabled", True)),
|
|
"etherscan_chains": etherscan_chains or ["ethereum"],
|
|
"helius_enabled": bool(cfg.get("helius_enabled", True)),
|
|
"dex_volume_spike_pct": float(cfg.get("dex_volume_spike_pct") or 80),
|
|
"dex_min_liquidity_usd": float(cfg.get("dex_min_liquidity_usd") or 100000),
|
|
"dex_min_volume_24h_usd": float(cfg.get("dex_min_volume_24h_usd") or 100000),
|
|
"liquidity_add_pct": float(cfg.get("liquidity_add_pct") or 25),
|
|
"liquidity_remove_pct": float(cfg.get("liquidity_remove_pct") or -25),
|
|
"dex_hour_volume_share_pct": float(cfg.get("dex_hour_volume_share_pct") or 8),
|
|
"dex_min_hour_volume_usd": float(cfg.get("dex_min_hour_volume_usd") or 50000),
|
|
"whale_tx_usd": float(cfg.get("whale_tx_usd") or 250000),
|
|
"etherscan_base_url": str(cfg.get("etherscan_base_url") or "https://api.etherscan.io/v2/api").strip(),
|
|
"helius_base_url": str(cfg.get("helius_base_url") or "https://api.helius.xyz").strip().rstrip("/"),
|
|
"etherscan_api_key": os.getenv(etherscan_env, "").strip(),
|
|
"helius_api_key": os.getenv(helius_env, "").strip(),
|
|
}
|
|
|
|
|
|
def _normalize_chain_list(value):
|
|
if isinstance(value, str):
|
|
return [x.strip().lower() for x in value.split(",") if x.strip()]
|
|
return [str(x).strip().lower() for x in (value or []) if str(x).strip()]
|
|
|
|
|
|
def _load_token_mappings(config_value=None, env_value=""):
|
|
items = []
|
|
if isinstance(config_value, list):
|
|
items.extend(config_value)
|
|
if env_value:
|
|
try:
|
|
parsed = json.loads(env_value)
|
|
if isinstance(parsed, list):
|
|
items.extend(parsed)
|
|
except Exception:
|
|
for part in str(env_value or "").split(","):
|
|
bits = [x.strip() for x in part.split(":")]
|
|
if len(bits) >= 3:
|
|
items.append({"symbol": bits[0], "chain": bits[1], "contract_address": bits[2]})
|
|
normalized = []
|
|
seen = set()
|
|
for item in items:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
symbol = normalize_symbol(item.get("symbol"))
|
|
chain = str(item.get("chain") or "").lower().strip()
|
|
contract = str(item.get("contract_address") or item.get("address") or "").strip()
|
|
if not symbol or not chain or not contract:
|
|
continue
|
|
key = (symbol, chain, contract.lower())
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
normalized.append({
|
|
"symbol": symbol,
|
|
"chain": chain,
|
|
"contract_address": contract,
|
|
"source": item.get("source") or "nodereal_seed",
|
|
"confidence": int(item.get("confidence") or 95),
|
|
"raw": item.get("raw") or {},
|
|
})
|
|
return normalized
|
|
|
|
|
|
def seed_configured_token_mappings(cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
seeded = []
|
|
errors = []
|
|
for item in cfg.get("token_mappings") or []:
|
|
try:
|
|
mapping_id = onchain_db.upsert_token_mapping(
|
|
item["symbol"],
|
|
item["chain"],
|
|
item["contract_address"],
|
|
source=item.get("source") or "nodereal_seed",
|
|
confidence=item.get("confidence") or 95,
|
|
raw=item.get("raw") or {},
|
|
is_active=True,
|
|
)
|
|
if mapping_id:
|
|
seeded.append(item)
|
|
except Exception as exc:
|
|
errors.append(f"{item.get('symbol')}:seed_mapping:{str(exc)[:160]}")
|
|
return {"seeded": len(seeded), "items": seeded, "errors": errors}
|
|
|
|
|
|
def _now():
|
|
return datetime.now()
|
|
|
|
|
|
def _request_json(url, params=None, timeout=15):
|
|
resp = requests.get(url, params=params or {}, timeout=timeout, headers={"User-Agent": "AlphaX-Agent-Crypto/1.0"})
|
|
if resp.status_code >= 400:
|
|
raise RuntimeError(f"http_{resp.status_code}:{resp.text[:200]}")
|
|
return resp.json()
|
|
|
|
|
|
def _safe_float(value, default=0.0):
|
|
try:
|
|
return float(value or 0)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _safe_pct_change(new_value, old_value):
|
|
new_value = _safe_float(new_value)
|
|
old_value = _safe_float(old_value)
|
|
if old_value <= 0:
|
|
return 0.0
|
|
return (new_value - old_value) / old_value * 100
|
|
|
|
|
|
def _safe_int(value, default=0):
|
|
try:
|
|
return int(float(value or 0))
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _chain_alias(value):
|
|
key = str(value or "").lower()
|
|
return DEX_CHAIN_ALIASES.get(key, key)
|
|
|
|
|
|
def _chain_explorer_tx_url(chain, tx_hash):
|
|
tx_hash = str(tx_hash or "").strip()
|
|
if not tx_hash:
|
|
return ""
|
|
if chain == "ethereum":
|
|
return f"https://etherscan.io/tx/{tx_hash}"
|
|
if chain == "bsc":
|
|
return f"https://bscscan.com/tx/{tx_hash}"
|
|
if chain == "base":
|
|
return f"https://basescan.org/tx/{tx_hash}"
|
|
if chain == "arbitrum":
|
|
return f"https://arbiscan.io/tx/{tx_hash}"
|
|
if chain == "solana":
|
|
return f"https://solscan.io/tx/{tx_hash}"
|
|
return ""
|
|
|
|
|
|
def _latest_metric(symbol, chain, contract_address):
|
|
conn = get_conn()
|
|
row = conn.execute(
|
|
"""
|
|
SELECT * FROM onchain_token_metrics
|
|
WHERE symbol=%s AND chain=%s AND contract_address=%s AND "window"='1h'
|
|
ORDER BY metric_time DESC, id DESC LIMIT 1
|
|
""",
|
|
(symbol, chain, contract_address or ""),
|
|
).fetchone()
|
|
conn.close()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def _event_amount(item):
|
|
return _safe_float(item.get("amount"))
|
|
|
|
|
|
def _event_total_amount(item):
|
|
return _safe_float(item.get("totalAmount") or item.get("total_amount"))
|
|
|
|
|
|
def _raw_importance(event_type, item):
|
|
amount = _event_amount(item)
|
|
total = _event_total_amount(item)
|
|
if event_type == "token_boost_top":
|
|
return max(total, amount, 1)
|
|
if event_type == "token_boost_latest":
|
|
return max(amount, total * 0.5, 1)
|
|
return 1
|
|
|
|
|
|
def normalize_dexscreener_raw_event(item, event_type, cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
chain = _chain_alias(item.get("chainId"))
|
|
if chain not in set(cfg.get("chains") or DEFAULT_CHAINS):
|
|
return None
|
|
token_address = str(item.get("tokenAddress") or "").strip()
|
|
if not token_address:
|
|
return None
|
|
mapping = find_mapping_by_contract(chain, token_address)
|
|
links = item.get("links") or []
|
|
symbol_guess = ""
|
|
name = ""
|
|
if isinstance(links, list):
|
|
for link in links:
|
|
if not isinstance(link, dict):
|
|
continue
|
|
if not name and link.get("label"):
|
|
name = str(link.get("label") or "")
|
|
raw = {
|
|
"chainId": item.get("chainId"),
|
|
"tokenAddress": token_address,
|
|
"url": item.get("url") or "",
|
|
"description": item.get("description") or "",
|
|
"icon": item.get("icon") or "",
|
|
"header": item.get("header") or "",
|
|
"links": links,
|
|
"amount": item.get("amount"),
|
|
"totalAmount": item.get("totalAmount"),
|
|
}
|
|
title = "DEX Screener"
|
|
if event_type == "token_profile_latest":
|
|
title = "DEX 新币资料变更"
|
|
elif event_type == "token_boost_latest":
|
|
title = "DEX 付费曝光新增"
|
|
elif event_type == "token_boost_top":
|
|
title = "DEX 付费曝光榜"
|
|
return {
|
|
"source": "dexscreener",
|
|
"chain": chain,
|
|
"event_type": event_type,
|
|
"token_address": token_address,
|
|
"symbol_guess": symbol_guess,
|
|
"name": name,
|
|
"title": title,
|
|
"description": item.get("description") or "",
|
|
"url": item.get("url") or "",
|
|
"icon": item.get("icon") or "",
|
|
"amount": _event_amount(item),
|
|
"total_amount": _event_total_amount(item),
|
|
"importance": _raw_importance(event_type, item),
|
|
"mapped_symbol": mapping.get("symbol") if mapping else "",
|
|
"mapping_status": "mapped" if mapping else "unmapped",
|
|
"detected_at": _now().isoformat(timespec="seconds"),
|
|
"raw": raw,
|
|
}
|
|
|
|
|
|
def fetch_dexscreener_raw_events(limit=80):
|
|
cfg = get_onchain_params()
|
|
if not cfg.get("dexscreener_enabled", True):
|
|
return {"raw_events": [], "errors": ["dexscreener_disabled"]}
|
|
inserted = []
|
|
errors = []
|
|
per_source_limit = max(1, int(limit or 80))
|
|
for event_type, url in DEXSCREENER_RAW_ENDPOINTS:
|
|
try:
|
|
data = _request_json(url, timeout=cfg.get("timeout", 15))
|
|
items = data if isinstance(data, list) else data.get("items") or data.get("data") or []
|
|
for item in items[:per_source_limit]:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
event = normalize_dexscreener_raw_event(item, event_type, cfg=cfg)
|
|
if not event:
|
|
continue
|
|
if insert_onchain_raw_event(event):
|
|
inserted.append(event)
|
|
except Exception as exc:
|
|
errors.append(f"{event_type}:{str(exc)[:160]}")
|
|
return {"raw_events": inserted, "errors": errors}
|
|
|
|
|
|
def _discover_seed_symbols(limit=120):
|
|
conn = get_conn()
|
|
symbols = []
|
|
try:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT DISTINCT symbol
|
|
FROM recommendation
|
|
WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'
|
|
ORDER BY rec_time DESC
|
|
LIMIT %s
|
|
""",
|
|
(int(limit or 120),),
|
|
).fetchall()
|
|
symbols.extend([row["symbol"] for row in rows if row["symbol"]])
|
|
except Exception:
|
|
pass
|
|
try:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT DISTINCT symbol
|
|
FROM coin_state
|
|
WHERE state != '过期'
|
|
ORDER BY detected_at DESC
|
|
LIMIT %s
|
|
""",
|
|
(int(limit or 120),),
|
|
).fetchall()
|
|
symbols.extend([row["symbol"] for row in rows if row["symbol"]])
|
|
except Exception:
|
|
pass
|
|
conn.close()
|
|
seen = set()
|
|
ordered = []
|
|
for symbol in symbols:
|
|
norm = normalize_symbol(symbol)
|
|
if not norm or norm in seen or not _tradable_symbol(norm):
|
|
continue
|
|
seen.add(norm)
|
|
ordered.append(norm)
|
|
return ordered[: int(limit or 120)]
|
|
|
|
|
|
def _score_pair_candidate(pair, requested_symbol, chains):
|
|
base = (pair.get("baseToken") or {})
|
|
quote = (pair.get("quoteToken") or {})
|
|
base_symbol = str(base.get("symbol") or "").upper()
|
|
req_base = str(requested_symbol or "").split("/")[0].upper()
|
|
liquidity = _safe_float((pair.get("liquidity") or {}).get("usd"))
|
|
volume = _safe_float((pair.get("volume") or {}).get("h24"))
|
|
chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower())
|
|
score = 0
|
|
if base_symbol == req_base:
|
|
score += 50
|
|
if chain in set(chains or []):
|
|
score += 15
|
|
if quote.get("symbol") in ("USDT", "USDC", "USD", "FDUSD", "USDE", "DAI", "USDS"):
|
|
score += 10
|
|
if liquidity >= 100000:
|
|
score += 10
|
|
if volume >= 100000:
|
|
score += 10
|
|
if liquidity >= 500000:
|
|
score += 5
|
|
return score
|
|
|
|
|
|
def _pair_rejection_reason(pair, requested_symbol, chains):
|
|
base = pair.get("baseToken") or {}
|
|
quote = pair.get("quoteToken") or {}
|
|
req_base = str(requested_symbol or "").split("/")[0].upper()
|
|
base_symbol = str(base.get("symbol") or "").upper()
|
|
base_name = str(base.get("name") or "").lower()
|
|
pair_url = str(pair.get("url") or "").lower()
|
|
chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower())
|
|
|
|
if base_symbol != req_base:
|
|
return "symbol_mismatch"
|
|
if chain not in set(chains or []):
|
|
return "chain_not_supported"
|
|
if req_base in NON_TARGET_NATIVE_BASES:
|
|
return "native_chain_not_in_scope"
|
|
if chain == "solana" and req_base not in SOLANA_AUTO_ALLOWLIST:
|
|
return "solana_not_allowlisted"
|
|
text = " ".join([base_name, base_symbol.lower(), str(quote.get("symbol") or "").lower(), pair_url])
|
|
if any(marker in text for marker in BRIDGED_TOKEN_MARKERS):
|
|
return "bridged_or_wrapped_token"
|
|
return ""
|
|
|
|
|
|
def discover_token_mappings(limit=60):
|
|
cfg = get_onchain_params()
|
|
chains = set(cfg.get("chains") or DEFAULT_CHAINS)
|
|
seeds = _discover_seed_symbols(limit=limit)
|
|
if not seeds:
|
|
return {"inserted": 0, "candidates": [], "errors": ["no_seed_symbols"]}
|
|
inserted = []
|
|
errors = []
|
|
for symbol in seeds:
|
|
existing = get_token_mappings(symbol, min_confidence=1, active_only=False)
|
|
if existing:
|
|
continue
|
|
base = symbol.split("/")[0]
|
|
try:
|
|
data = _request_json("https://api.dexscreener.com/latest/dex/search", params={"q": base}, timeout=cfg.get("timeout", 15))
|
|
pairs = data.get("pairs") or []
|
|
pair_candidates = []
|
|
for pair in pairs:
|
|
chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower())
|
|
if chain not in chains:
|
|
continue
|
|
if _pair_rejection_reason(pair, symbol, chains):
|
|
continue
|
|
pair_candidates.append((pair, _score_pair_candidate(pair, symbol, chains)))
|
|
if not pair_candidates:
|
|
continue
|
|
pair_candidates.sort(key=lambda x: (x[1], _safe_float((x[0].get("liquidity") or {}).get("usd")), _safe_float((x[0].get("volume") or {}).get("h24"))), reverse=True)
|
|
best, score = pair_candidates[0]
|
|
if score < 55:
|
|
continue
|
|
base_token = best.get("baseToken") or {}
|
|
chain = DEX_CHAIN_ALIASES.get(str(best.get("chainId") or "").lower(), str(best.get("chainId") or "").lower())
|
|
contract = str(base_token.get("address") or "").strip()
|
|
if not contract:
|
|
continue
|
|
confidence = min(95, 60 + score)
|
|
mapping_id = onchain_db.upsert_token_mapping(
|
|
symbol=symbol,
|
|
chain=chain,
|
|
contract_address=contract,
|
|
source="dexscreener_search",
|
|
confidence=confidence,
|
|
raw={
|
|
"search_query": base,
|
|
"matched_pair": {
|
|
"pairAddress": best.get("pairAddress") or "",
|
|
"dexId": best.get("dexId") or "",
|
|
"url": best.get("url") or "",
|
|
"liquidity": best.get("liquidity") or {},
|
|
"volume": best.get("volume") or {},
|
|
"priceChange": best.get("priceChange") or {},
|
|
"baseToken": base_token,
|
|
"quoteToken": best.get("quoteToken") or {},
|
|
},
|
|
},
|
|
is_active=True,
|
|
)
|
|
if mapping_id:
|
|
inserted.append({"symbol": symbol, "chain": chain, "contract_address": contract, "confidence": confidence})
|
|
except Exception as exc:
|
|
errors.append(f"{symbol}:{str(exc)[:160]}")
|
|
return {"inserted": len(inserted), "candidates": inserted, "errors": errors}
|
|
|
|
|
|
def _score_metric(metric):
|
|
score = 0.0
|
|
risk = 0.0
|
|
vol_change = _safe_float(metric.get("dex_volume_change_pct"))
|
|
liq_change = _safe_float(metric.get("liquidity_change_pct"))
|
|
netflow = _safe_float(metric.get("exchange_netflow_usd"))
|
|
whale = _safe_float(metric.get("whale_accumulation_usd"))
|
|
smart = _safe_float(metric.get("smart_money_score"))
|
|
if vol_change > 0:
|
|
score += min(35, vol_change / 4)
|
|
if liq_change > 0:
|
|
score += min(20, liq_change / 3)
|
|
if netflow < 0:
|
|
score += min(20, abs(netflow) / 100000)
|
|
if whale > 0:
|
|
score += min(20, whale / 100000)
|
|
score += min(20, smart)
|
|
if liq_change < 0:
|
|
risk += min(40, abs(liq_change))
|
|
if netflow > 0:
|
|
risk += min(35, netflow / 100000)
|
|
metric["onchain_score"] = round(min(score, 100), 2)
|
|
metric["risk_score"] = round(min(risk, 100), 2)
|
|
return metric
|
|
|
|
|
|
def derive_dex_signals(metric, cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
signals = []
|
|
vol_change = _safe_float(metric.get("dex_volume_change_pct"))
|
|
liq_change = _safe_float(metric.get("liquidity_change_pct"))
|
|
volume_1h = _safe_float(metric.get("dex_volume_1h_usd"))
|
|
volume_24h = _safe_float(metric.get("dex_volume_usd"))
|
|
hour_share_pct = (volume_1h / volume_24h * 100) if volume_24h > 0 else 0
|
|
if vol_change >= cfg.get("dex_volume_spike_pct", 80):
|
|
signals.append("dex_volume_spike")
|
|
elif (
|
|
volume_1h >= cfg.get("dex_min_hour_volume_usd", 50000)
|
|
and hour_share_pct >= cfg.get("dex_hour_volume_share_pct", 8)
|
|
):
|
|
signals.append("dex_volume_spike")
|
|
if liq_change >= cfg.get("liquidity_add_pct", 25):
|
|
signals.append("liquidity_add")
|
|
if liq_change <= cfg.get("liquidity_remove_pct", -25):
|
|
signals.append("liquidity_remove_risk")
|
|
return signals
|
|
|
|
|
|
def _event_from_metric(metric, signal_code, source="dexscreener"):
|
|
direction = signal_direction(signal_code)
|
|
severity = "RISK" if direction == "risk" else "A" if _safe_float(metric.get("onchain_score")) >= 75 else "B"
|
|
return {
|
|
"chain": metric.get("chain"),
|
|
"symbol": metric.get("symbol"),
|
|
"contract_address": metric.get("contract_address") or "",
|
|
"event_type": "onchain_signal",
|
|
"signal_code": signal_code,
|
|
"signal_label": signal_label(signal_code),
|
|
"direction": direction,
|
|
"value_usd": metric.get("dex_volume_usd") or metric.get("whale_accumulation_usd") or abs(metric.get("exchange_netflow_usd") or 0),
|
|
"confidence": 75 if direction != "risk" else 80,
|
|
"severity": severity,
|
|
"detected_at": metric.get("metric_time") or _now().isoformat(),
|
|
"source": source,
|
|
"url": metric.get("url") or "",
|
|
"raw": metric,
|
|
}
|
|
|
|
|
|
def normalize_dexscreener_pair(pair, mapping, cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
symbol = normalize_symbol(mapping.get("symbol"))
|
|
chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or mapping.get("chain") or "").lower(), str(mapping.get("chain") or "").lower())
|
|
contract = mapping.get("contract_address") or (pair.get("baseToken") or {}).get("address") or ""
|
|
liquidity = _safe_float((pair.get("liquidity") or {}).get("usd"))
|
|
volume_map = pair.get("volume") or {}
|
|
volume = _safe_float(volume_map.get("h24"))
|
|
volume_1h = _safe_float(volume_map.get("h1"))
|
|
volume_5m = _safe_float(volume_map.get("m5"))
|
|
volume_6h = _safe_float(volume_map.get("h6"))
|
|
txns_map = pair.get("txns") or {}
|
|
txns_h1 = txns_map.get("h1") or {}
|
|
prev = _latest_metric(symbol, chain, contract)
|
|
prev_volume = _safe_float(prev.get("dex_volume_usd") if prev else 0)
|
|
prev_liquidity = _safe_float(prev.get("liquidity_usd") if prev else 0)
|
|
metric = {
|
|
"symbol": symbol,
|
|
"chain": chain,
|
|
"contract_address": contract,
|
|
"window": "1h",
|
|
"metric_time": _now().isoformat(timespec="seconds"),
|
|
"dex_volume_usd": volume,
|
|
"dex_volume_1h_usd": volume_1h,
|
|
"dex_volume_5m_usd": volume_5m,
|
|
"dex_volume_6h_usd": volume_6h,
|
|
"dex_volume_1h_share_pct": round(volume_1h / volume * 100, 2) if volume > 0 else 0,
|
|
"dex_volume_change_pct": _safe_pct_change(volume, prev_volume),
|
|
"liquidity_usd": liquidity,
|
|
"liquidity_change_pct": _safe_pct_change(liquidity, prev_liquidity),
|
|
"exchange_netflow_usd": 0,
|
|
"whale_accumulation_usd": 0,
|
|
"holder_delta": 0,
|
|
"smart_money_score": 0,
|
|
"source": "dexscreener",
|
|
"url": pair.get("url") or "",
|
|
"raw": {
|
|
"pair_address": pair.get("pairAddress") or "",
|
|
"dex_id": pair.get("dexId") or "",
|
|
"price_usd": pair.get("priceUsd") or "",
|
|
"fdv": pair.get("fdv") or 0,
|
|
"txns": pair.get("txns") or {},
|
|
"price_change": pair.get("priceChange") or {},
|
|
"volume": pair.get("volume") or {},
|
|
"liquidity": pair.get("liquidity") or {},
|
|
"derived": {
|
|
"dex_volume_1h_usd": volume_1h,
|
|
"dex_volume_5m_usd": volume_5m,
|
|
"dex_volume_6h_usd": volume_6h,
|
|
"dex_volume_1h_share_pct": round(volume_1h / volume * 100, 2) if volume > 0 else 0,
|
|
"h1_buys": int(txns_h1.get("buys") or 0),
|
|
"h1_sells": int(txns_h1.get("sells") or 0),
|
|
},
|
|
},
|
|
}
|
|
return _score_metric(metric)
|
|
|
|
|
|
def fetch_dexscreener_metrics(limit=60):
|
|
cfg = get_onchain_params()
|
|
if not cfg.get("dexscreener_enabled", True):
|
|
return {"metrics": [], "events": [], "errors": ["dexscreener_disabled"]}
|
|
mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE)
|
|
bootstrap = None
|
|
if not mappings:
|
|
bootstrap = discover_token_mappings(limit=limit)
|
|
mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE)
|
|
metrics = []
|
|
events = []
|
|
errors = []
|
|
if bootstrap:
|
|
errors.extend(bootstrap.get("errors") or [])
|
|
for mapping in mappings[: int(limit or 60)]:
|
|
symbol = normalize_symbol(mapping.get("symbol"))
|
|
if not symbol or not _tradable_symbol(symbol):
|
|
continue
|
|
try:
|
|
url = "https://api.dexscreener.com/latest/dex/tokens/" + str(mapping.get("contract_address") or "").strip()
|
|
data = _request_json(url, timeout=cfg.get("timeout", 15))
|
|
pairs = data.get("pairs") or []
|
|
wanted_chain = DEX_CHAIN_ALIASES.get(str(mapping.get("chain") or "").lower(), str(mapping.get("chain") or "").lower())
|
|
pairs = [p for p in pairs if DEX_CHAIN_ALIASES.get(str(p.get("chainId") or "").lower(), str(p.get("chainId") or "").lower()) == wanted_chain]
|
|
if not pairs:
|
|
continue
|
|
best = max(pairs, key=lambda p: _safe_float((p.get("liquidity") or {}).get("usd")))
|
|
metric = normalize_dexscreener_pair(best, mapping, cfg=cfg)
|
|
if metric.get("liquidity_usd", 0) < cfg.get("dex_min_liquidity_usd", 100000) and metric.get("dex_volume_usd", 0) < cfg.get("dex_min_volume_24h_usd", 100000):
|
|
insert_token_metric(metric)
|
|
metrics.append(metric)
|
|
continue
|
|
insert_token_metric(metric)
|
|
metrics.append(metric)
|
|
for code in derive_dex_signals(metric, cfg):
|
|
event = _event_from_metric(metric, code, source="dexscreener")
|
|
if insert_onchain_event(event):
|
|
events.append(event)
|
|
except Exception as exc:
|
|
errors.append(f"{symbol}:{str(exc)[:160]}")
|
|
return {"metrics": metrics, "events": events, "errors": errors}
|
|
|
|
|
|
def _event_from_etherscan_transfer(row, mapping, cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
decimals = _safe_int(row.get("tokenDecimal"), 18)
|
|
amount = _safe_float(row.get("value")) / (10 ** decimals if decimals >= 0 else 1)
|
|
price_usd = _latest_price_from_metric(mapping)
|
|
value_usd = amount * price_usd if price_usd > 0 else 0
|
|
threshold = _safe_float(cfg.get("whale_tx_usd"), 250000)
|
|
if value_usd > 0 and value_usd < threshold:
|
|
return None
|
|
if value_usd <= 0 and amount <= 0:
|
|
return None
|
|
tx_hash = str(row.get("hash") or "").strip()
|
|
chain = str(mapping.get("chain") or "").lower()
|
|
return {
|
|
"chain": chain,
|
|
"symbol": mapping.get("symbol"),
|
|
"contract_address": mapping.get("contract_address") or "",
|
|
"event_type": "token_transfer",
|
|
"signal_code": "whale_accumulation" if value_usd >= threshold else "large_token_transfer",
|
|
"signal_label": signal_label("whale_accumulation" if value_usd >= threshold else "large_token_transfer"),
|
|
"direction": "positive" if value_usd >= threshold else "neutral",
|
|
"value_usd": value_usd,
|
|
"amount": amount,
|
|
"tx_hash": tx_hash,
|
|
"wallet_address": row.get("to") or "",
|
|
"wallet_label": "EVM 接收地址",
|
|
"counterparty_label": "EVM 发送地址 " + _short_addr(row.get("from") or ""),
|
|
"confidence": 74 if value_usd >= threshold else 58,
|
|
"severity": "A" if value_usd >= threshold else "B",
|
|
"detected_at": _ts_to_iso(row.get("timeStamp")),
|
|
"source": "etherscan",
|
|
"url": _chain_explorer_tx_url(chain, tx_hash),
|
|
"raw": row,
|
|
}
|
|
|
|
|
|
def _latest_price_from_metric(mapping):
|
|
symbol = normalize_symbol(mapping.get("symbol"))
|
|
chain = str(mapping.get("chain") or "").lower()
|
|
contract = str(mapping.get("contract_address") or "")
|
|
conn = get_conn()
|
|
try:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT raw_json
|
|
FROM onchain_token_metrics
|
|
WHERE symbol=%s AND chain=%s AND contract_address=%s
|
|
ORDER BY metric_time DESC, id DESC
|
|
LIMIT 8
|
|
""",
|
|
(symbol, chain, contract),
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
for row in rows:
|
|
try:
|
|
raw = json.loads(row.get("raw_json") or "{}")
|
|
except Exception:
|
|
raw = {}
|
|
price = _safe_float(raw.get("price_usd"))
|
|
if price > 0:
|
|
return price
|
|
cache = get_latest_price_cache([symbol])
|
|
item = cache.get(symbol) or {}
|
|
return _safe_float(item.get("price"))
|
|
|
|
|
|
def _hex_to_int(value):
|
|
text = str(value or "").strip()
|
|
if not text:
|
|
return 0
|
|
try:
|
|
return int(text, 16) if text.startswith("0x") else int(text)
|
|
except Exception:
|
|
return 0
|
|
|
|
|
|
def _topic_to_address(topic):
|
|
topic = str(topic or "").lower()
|
|
if topic.startswith("0x") and len(topic) >= 42:
|
|
return "0x" + topic[-40:]
|
|
return ""
|
|
|
|
|
|
def _decode_abi_string(value):
|
|
text = str(value or "").strip()
|
|
if not text or text == "0x":
|
|
return ""
|
|
payload = text[2:] if text.startswith("0x") else text
|
|
try:
|
|
raw = bytes.fromhex(payload)
|
|
except Exception:
|
|
return ""
|
|
if not raw:
|
|
return ""
|
|
try:
|
|
if len(raw) >= 96:
|
|
offset = int.from_bytes(raw[:32], "big")
|
|
if 0 <= offset + 32 <= len(raw):
|
|
length = int.from_bytes(raw[offset:offset + 32], "big")
|
|
body = raw[offset + 32:offset + 32 + length]
|
|
return body.decode("utf-8", errors="ignore").strip("\x00 ").strip()
|
|
return raw.rstrip(b"\x00").decode("utf-8", errors="ignore").strip()
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def _clean_erc20_symbol(value):
|
|
symbol = str(value or "").upper().strip().replace("$", "")
|
|
symbol = "".join(ch for ch in symbol if ch.isalnum())
|
|
if symbol.endswith("USDT") and len(symbol) > 4:
|
|
symbol = symbol[:-4]
|
|
return symbol[:20]
|
|
|
|
|
|
def _is_auto_mapping_symbol_allowed(base, token_name=""):
|
|
base = _clean_erc20_symbol(base)
|
|
if not base:
|
|
return False
|
|
symbol = f"{base}/USDT"
|
|
if not _tradable_symbol(symbol):
|
|
return False
|
|
if base in NON_TARGET_NATIVE_BASES:
|
|
return False
|
|
text = f"{base} {token_name or ''}".lower()
|
|
if any(marker in text for marker in BRIDGED_TOKEN_MARKERS):
|
|
return False
|
|
return True
|
|
|
|
|
|
def _read_erc20_metadata(client, chain, contract):
|
|
metadata = {"symbol": "", "name": "", "decimals": 18}
|
|
try:
|
|
metadata["symbol"] = _decode_abi_string(client.eth_call(chain, contract, ERC20_SYMBOL_SELECTOR))
|
|
except Exception:
|
|
metadata["symbol"] = ""
|
|
try:
|
|
metadata["name"] = _decode_abi_string(client.eth_call(chain, contract, ERC20_NAME_SELECTOR))
|
|
except Exception:
|
|
metadata["name"] = ""
|
|
try:
|
|
decimals = _hex_to_int(client.eth_call(chain, contract, ERC20_DECIMALS_SELECTOR))
|
|
if 0 <= decimals <= 36:
|
|
metadata["decimals"] = decimals
|
|
except Exception:
|
|
pass
|
|
metadata["symbol"] = _clean_erc20_symbol(metadata.get("symbol"))
|
|
return metadata
|
|
|
|
|
|
def _auto_map_nodereal_contract(client, chain, contract, cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
if not cfg.get("nodereal_auto_mapping_enabled", True):
|
|
return None
|
|
existing = find_mapping_by_contract(chain, contract)
|
|
if existing:
|
|
return existing
|
|
metadata = _read_erc20_metadata(client, chain, contract)
|
|
base = metadata.get("symbol") or ""
|
|
if not _is_auto_mapping_symbol_allowed(base, metadata.get("name")):
|
|
return None
|
|
symbol = normalize_symbol(base)
|
|
confidence = max(1, min(95, int(cfg.get("nodereal_auto_mapping_confidence") or 82)))
|
|
mapping_id = onchain_db.upsert_token_mapping(
|
|
symbol=symbol,
|
|
chain=chain,
|
|
contract_address=contract,
|
|
source="nodereal_erc20_metadata",
|
|
confidence=confidence,
|
|
raw=metadata,
|
|
is_active=True,
|
|
)
|
|
if not mapping_id:
|
|
return None
|
|
return {
|
|
"id": mapping_id,
|
|
"symbol": symbol,
|
|
"chain": str(chain or "").lower(),
|
|
"contract_address": contract,
|
|
"source": "nodereal_erc20_metadata",
|
|
"confidence": confidence,
|
|
"raw_json": json.dumps(metadata, ensure_ascii=False),
|
|
}
|
|
|
|
|
|
def _nodereal_client(cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
return NodeRealClient(
|
|
NodeRealConfig(
|
|
api_key=cfg.get("nodereal_api_key") or "",
|
|
timeout=int(cfg.get("timeout") or 15),
|
|
endpoints=dict(DEFAULT_CHAIN_ENDPOINTS),
|
|
)
|
|
)
|
|
|
|
|
|
def _event_from_nodereal_transfer(log, mapping, cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
topics = log.get("topics") or []
|
|
if len(topics) < 3:
|
|
return None
|
|
amount_raw = _hex_to_int(log.get("data"))
|
|
mapping_raw = {}
|
|
try:
|
|
mapping_raw = json.loads(mapping.get("raw_json") or "{}")
|
|
except Exception:
|
|
mapping_raw = {}
|
|
decimals = _safe_int(mapping_raw.get("decimals") or mapping_raw.get("tokenDecimal") or 18, 18)
|
|
amount = amount_raw / (10 ** decimals if decimals >= 0 else 1)
|
|
price_usd = _latest_price_from_metric(mapping)
|
|
value_usd = amount * price_usd if price_usd > 0 else 0
|
|
threshold = _safe_float(cfg.get("whale_tx_usd"), 250000)
|
|
if value_usd <= 0 or value_usd < threshold:
|
|
return None
|
|
chain = str(mapping.get("chain") or "").lower()
|
|
tx_hash = str(log.get("transactionHash") or "").strip()
|
|
return {
|
|
"chain": chain,
|
|
"symbol": mapping.get("symbol"),
|
|
"contract_address": mapping.get("contract_address") or "",
|
|
"event_type": "token_transfer",
|
|
"signal_code": "whale_accumulation",
|
|
"signal_label": signal_label("whale_accumulation"),
|
|
"direction": "positive",
|
|
"value_usd": value_usd,
|
|
"amount": amount,
|
|
"tx_hash": tx_hash,
|
|
"wallet_address": _topic_to_address(topics[2]),
|
|
"wallet_label": "EVM 接收地址",
|
|
"counterparty_label": "EVM 发送地址 " + _short_addr(_topic_to_address(topics[1])),
|
|
"confidence": 76,
|
|
"severity": "A",
|
|
"detected_at": _now().isoformat(timespec="seconds"),
|
|
"source": "nodereal",
|
|
"url": _chain_explorer_tx_url(chain, tx_hash),
|
|
"raw": log,
|
|
}
|
|
|
|
|
|
def _raw_event_from_nodereal_transfer(log, chain):
|
|
topics = log.get("topics") or []
|
|
if len(topics) < 3:
|
|
return None
|
|
contract = str(log.get("address") or "").strip()
|
|
tx_hash = str(log.get("transactionHash") or "").strip()
|
|
amount_raw = _hex_to_int(log.get("data"))
|
|
if not contract or amount_raw <= 0:
|
|
return None
|
|
from_addr = _topic_to_address(topics[1])
|
|
to_addr = _topic_to_address(topics[2])
|
|
return {
|
|
"source": "nodereal",
|
|
"chain": str(chain or "").lower(),
|
|
"event_type": "evm_transfer",
|
|
"token_address": contract,
|
|
"symbol_guess": "",
|
|
"name": "",
|
|
"title": "NodeReal ERC-20 原始转账",
|
|
"description": f"合约 {_short_addr(contract)} · {_short_addr(from_addr)} -> {_short_addr(to_addr)}",
|
|
"url": _chain_explorer_tx_url(chain, tx_hash),
|
|
"amount": amount_raw,
|
|
"total_amount": 0,
|
|
"importance": min(100, max(1, len(str(amount_raw)) * 4)),
|
|
"mapped_symbol": "",
|
|
"mapping_status": "unmapped",
|
|
"detected_at": _now().isoformat(timespec="seconds"),
|
|
"raw": log,
|
|
}
|
|
|
|
|
|
def _apply_raw_event_mapping(raw_event, client=None, cfg=None):
|
|
item = dict(raw_event or {})
|
|
chain = str(item.get("chain") or "").lower()
|
|
contract = str(item.get("token_address") or "").strip()
|
|
if not chain or not contract:
|
|
return item
|
|
mapping = find_mapping_by_contract(chain, contract)
|
|
if not mapping and client:
|
|
mapping = _auto_map_nodereal_contract(client, chain, contract, cfg=cfg)
|
|
if mapping:
|
|
item["mapped_symbol"] = normalize_symbol(mapping.get("symbol"))
|
|
item["mapping_status"] = "mapped"
|
|
item["symbol_guess"] = item.get("symbol_guess") or item["mapped_symbol"].split("/")[0]
|
|
item["raw"] = {
|
|
**(item.get("raw") or {}),
|
|
"mapping": {
|
|
"symbol": item["mapped_symbol"],
|
|
"source": mapping.get("source") or "",
|
|
"confidence": mapping.get("confidence") or 0,
|
|
},
|
|
}
|
|
return item
|
|
|
|
|
|
def _metric_from_nodereal_holder_count(holder_count, mapping):
|
|
symbol = normalize_symbol(mapping.get("symbol"))
|
|
chain = str(mapping.get("chain") or "").lower()
|
|
contract = str(mapping.get("contract_address") or "")
|
|
prev = _latest_metric(symbol, chain, contract)
|
|
prev_count = 0
|
|
if prev:
|
|
try:
|
|
prev_raw = json.loads(prev.get("raw_json") or "{}")
|
|
prev_count = _safe_int(prev_raw.get("holder_count"))
|
|
except Exception:
|
|
prev_count = 0
|
|
holder_delta = holder_count - prev_count if prev_count > 0 else 0
|
|
metric = {
|
|
"symbol": symbol,
|
|
"chain": chain,
|
|
"contract_address": contract,
|
|
"window": "1h",
|
|
"metric_time": _now().isoformat(timespec="seconds"),
|
|
"holder_delta": holder_delta,
|
|
"smart_money_score": 0,
|
|
"source": "nodereal",
|
|
"raw": {
|
|
"holder_count": holder_count,
|
|
"previous_holder_count": prev_count,
|
|
},
|
|
}
|
|
if holder_delta > 0:
|
|
metric["onchain_score"] = min(30, holder_delta)
|
|
elif holder_delta < 0:
|
|
metric["risk_score"] = min(30, abs(holder_delta))
|
|
return metric
|
|
|
|
|
|
def _event_from_holder_metric(metric):
|
|
holder_delta = _safe_float(metric.get("holder_delta"))
|
|
if holder_delta <= 0:
|
|
return None
|
|
if holder_delta < 20:
|
|
return None
|
|
return _event_from_metric(metric, "holder_growth", source="nodereal")
|
|
|
|
|
|
def fetch_nodereal_events(limit=60):
|
|
cfg = get_onchain_params()
|
|
if not cfg.get("nodereal_enabled", True):
|
|
return {"metrics": [], "events": [], "errors": ["nodereal_disabled"]}
|
|
if not cfg.get("nodereal_api_key"):
|
|
return {"metrics": [], "events": [], "errors": ["nodereal_api_key_missing"]}
|
|
seed_result = seed_configured_token_mappings(cfg)
|
|
client = _nodereal_client(cfg)
|
|
raw_result = fetch_nodereal_raw_events(client=client, cfg=cfg, limit=limit)
|
|
enabled_chains = set(cfg.get("nodereal_chains") or DEFAULT_CHAINS)
|
|
all_mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE)
|
|
chain_mappings = [m for m in all_mappings if str(m.get("chain") or "").lower() in enabled_chains]
|
|
mappings = []
|
|
unsupported_chains = set()
|
|
for mapping in chain_mappings:
|
|
chain = str(mapping.get("chain") or "").lower()
|
|
if client.supports_chain(chain):
|
|
mappings.append(mapping)
|
|
else:
|
|
unsupported_chains.add(chain)
|
|
metrics = []
|
|
events = []
|
|
errors = list(seed_result.get("errors") or []) + list(raw_result.get("errors") or [])
|
|
diagnostics = {
|
|
"seeded_mappings": seed_result.get("seeded", 0),
|
|
"mapping_total": len(all_mappings),
|
|
"chain_mapping_total": len(chain_mappings),
|
|
"supported_mapping_total": len(mappings),
|
|
"enabled_chains": sorted(enabled_chains),
|
|
"unsupported_chains": sorted(unsupported_chains),
|
|
}
|
|
lookback = max(1, int(cfg.get("nodereal_log_block_lookback") or 120))
|
|
max_logs = max(1, int(cfg.get("nodereal_max_logs_per_token") or 25))
|
|
for mapping in mappings[: int(limit or 60)]:
|
|
chain = str(mapping.get("chain") or "").lower()
|
|
contract = str(mapping.get("contract_address") or "").strip()
|
|
if not contract:
|
|
continue
|
|
try:
|
|
holder_count = client.token_holder_count(chain, contract)
|
|
if holder_count:
|
|
metric = _metric_from_nodereal_holder_count(holder_count, mapping)
|
|
insert_token_metric(metric)
|
|
metrics.append(metric)
|
|
holder_event = _event_from_holder_metric(metric)
|
|
if holder_event and insert_onchain_event(holder_event):
|
|
events.append(holder_event)
|
|
except Exception as exc:
|
|
errors.append(f"{mapping.get('symbol')}:nodereal_holder:{str(exc)[:160]}")
|
|
try:
|
|
latest = client.block_number(chain)
|
|
if latest <= 0:
|
|
continue
|
|
logs = client.get_logs(
|
|
chain,
|
|
{
|
|
"address": contract,
|
|
"fromBlock": hex(max(0, latest - lookback)),
|
|
"toBlock": hex(latest),
|
|
"topics": [TRANSFER_TOPIC],
|
|
},
|
|
)
|
|
for log in logs[:max_logs]:
|
|
if not isinstance(log, dict):
|
|
continue
|
|
event = _event_from_nodereal_transfer(log, mapping, cfg=cfg)
|
|
if not event:
|
|
continue
|
|
if insert_onchain_event(event):
|
|
events.append(event)
|
|
except Exception as exc:
|
|
errors.append(f"{mapping.get('symbol')}:nodereal_logs:{str(exc)[:160]}")
|
|
if not all_mappings:
|
|
diagnostics["mapping_note"] = "no_strategy_mappings_raw_events_only"
|
|
elif not chain_mappings:
|
|
diagnostics["mapping_note"] = "no_enabled_chain_mappings_raw_events_only"
|
|
elif not mappings:
|
|
diagnostics["mapping_note"] = "no_supported_mappings_raw_events_only"
|
|
return {
|
|
"metrics": metrics,
|
|
"events": events,
|
|
"raw_events": raw_result.get("raw_events") or [],
|
|
"errors": errors,
|
|
"diagnostics": diagnostics,
|
|
}
|
|
|
|
|
|
def fetch_nodereal_raw_events(client=None, cfg=None, limit=60):
|
|
cfg = cfg or get_onchain_params()
|
|
if not cfg.get("nodereal_raw_transfer_enabled", True):
|
|
return {"raw_events": [], "errors": []}
|
|
client = client or _nodereal_client(cfg)
|
|
chains = [c for c in (cfg.get("nodereal_chains") or DEFAULT_CHAINS) if client.supports_chain(c)]
|
|
lookback = max(0, min(12, int(cfg.get("nodereal_raw_block_lookback") or 1)))
|
|
per_chain = max(1, min(int(cfg.get("nodereal_raw_max_logs_per_chain") or 30), int(limit or 60)))
|
|
inserted = []
|
|
errors = []
|
|
for chain in chains:
|
|
try:
|
|
latest = client.block_number(chain)
|
|
if latest <= 0:
|
|
continue
|
|
logs = client.get_logs(
|
|
chain,
|
|
{
|
|
"fromBlock": hex(max(0, latest - lookback)),
|
|
"toBlock": hex(latest),
|
|
"topics": [TRANSFER_TOPIC],
|
|
},
|
|
)
|
|
raw_items = []
|
|
for log in logs:
|
|
if not isinstance(log, dict):
|
|
continue
|
|
item = _raw_event_from_nodereal_transfer(log, chain)
|
|
if item:
|
|
raw_items.append(_apply_raw_event_mapping(item, client=client, cfg=cfg))
|
|
raw_items.sort(key=lambda item: item.get("amount") or 0, reverse=True)
|
|
for item in raw_items[:per_chain]:
|
|
if insert_onchain_raw_event(item):
|
|
inserted.append(item)
|
|
except Exception as exc:
|
|
errors.append(f"{chain}:nodereal_raw_logs:{str(exc)[:160]}")
|
|
return {"raw_events": inserted, "errors": errors}
|
|
|
|
|
|
def fetch_etherscan_events(limit=60):
|
|
cfg = get_onchain_params()
|
|
if not cfg.get("etherscan_enabled", True):
|
|
return {"events": [], "errors": ["etherscan_disabled"]}
|
|
api_key = cfg.get("etherscan_api_key") or ""
|
|
if not api_key:
|
|
return {"events": [], "errors": ["etherscan_api_key_missing"]}
|
|
allowed_chains = set(cfg.get("etherscan_chains") or ["ethereum"])
|
|
mappings = [
|
|
m for m in get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE)
|
|
if m.get("chain") in ETHERSCAN_CHAIN_IDS and m.get("chain") in allowed_chains
|
|
]
|
|
events = []
|
|
errors = []
|
|
for mapping in mappings[: int(limit or 60)]:
|
|
chain = str(mapping.get("chain") or "").lower()
|
|
contract = str(mapping.get("contract_address") or "").strip()
|
|
if not contract:
|
|
continue
|
|
params = {
|
|
"chainid": ETHERSCAN_CHAIN_IDS[chain],
|
|
"module": "account",
|
|
"action": "tokentx",
|
|
"contractaddress": contract,
|
|
"page": 1,
|
|
"offset": 25,
|
|
"sort": "desc",
|
|
"apikey": api_key,
|
|
}
|
|
try:
|
|
data = _request_json(cfg.get("etherscan_base_url"), params=params, timeout=cfg.get("timeout", 15))
|
|
status = str(data.get("status") or "")
|
|
message = str(data.get("message") or "")
|
|
rows = data.get("result") or []
|
|
if status == "0" and not isinstance(rows, list):
|
|
if "No transactions found" not in str(rows) and "No records" not in str(rows):
|
|
errors.append(f"{mapping.get('symbol')}:etherscan_{message}:{str(rows)[:100]}")
|
|
continue
|
|
if not isinstance(rows, list):
|
|
continue
|
|
for row in rows:
|
|
if not isinstance(row, dict):
|
|
continue
|
|
event = _event_from_etherscan_transfer(row, mapping, cfg=cfg)
|
|
if not event:
|
|
continue
|
|
if insert_onchain_event(event):
|
|
events.append(event)
|
|
except Exception as exc:
|
|
errors.append(f"{mapping.get('symbol')}:etherscan:{str(exc)[:160]}")
|
|
return {"events": events, "errors": errors}
|
|
|
|
|
|
def _event_from_helius_tx(tx, mapping, cfg=None):
|
|
cfg = cfg or get_onchain_params()
|
|
mint = str(mapping.get("contract_address") or "")
|
|
symbol = normalize_symbol(mapping.get("symbol"))
|
|
signature = str(tx.get("signature") or "")
|
|
token_transfers = tx.get("tokenTransfers") or []
|
|
native_transfers = tx.get("nativeTransfers") or []
|
|
matched = [t for t in token_transfers if str(t.get("mint") or "") == mint]
|
|
if not matched:
|
|
return None
|
|
amount = max(_safe_float(t.get("tokenAmount")) for t in matched)
|
|
price_usd = _latest_price_from_metric(mapping)
|
|
value_usd = amount * price_usd if price_usd > 0 else 0
|
|
sol_amount = max([_safe_float(t.get("amount")) / 1_000_000_000 for t in native_transfers] or [0])
|
|
threshold = _safe_float(cfg.get("whale_tx_usd"), 250000)
|
|
if value_usd > 0 and value_usd < threshold and sol_amount < 100:
|
|
return None
|
|
signal = "whale_accumulation" if value_usd >= threshold or sol_amount >= 100 else "large_token_transfer"
|
|
return {
|
|
"chain": "solana",
|
|
"symbol": symbol,
|
|
"contract_address": mint,
|
|
"event_type": "solana_token_activity",
|
|
"signal_code": signal,
|
|
"signal_label": signal_label(signal),
|
|
"direction": "positive" if signal == "whale_accumulation" else "neutral",
|
|
"value_usd": value_usd,
|
|
"amount": amount,
|
|
"tx_hash": signature,
|
|
"wallet_address": (matched[0].get("toUserAccount") or matched[0].get("userAccount") or ""),
|
|
"wallet_label": "Solana 接收地址",
|
|
"counterparty_label": "Solana 发送地址 " + _short_addr(matched[0].get("fromUserAccount") or ""),
|
|
"confidence": 74 if signal == "whale_accumulation" else 58,
|
|
"severity": "A" if signal == "whale_accumulation" else "B",
|
|
"detected_at": _ts_to_iso(tx.get("timestamp")),
|
|
"source": "helius",
|
|
"url": _chain_explorer_tx_url("solana", signature),
|
|
"raw": tx,
|
|
}
|
|
|
|
|
|
def fetch_helius_events(limit=60):
|
|
cfg = get_onchain_params()
|
|
if not cfg.get("helius_enabled", True):
|
|
return {"events": [], "errors": ["helius_disabled"]}
|
|
api_key = cfg.get("helius_api_key") or ""
|
|
if not api_key:
|
|
return {"events": [], "errors": ["helius_api_key_missing"]}
|
|
mappings = [m for m in get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) if m.get("chain") == "solana"]
|
|
events = []
|
|
errors = []
|
|
for mapping in mappings[: int(limit or 60)]:
|
|
mint = str(mapping.get("contract_address") or "").strip()
|
|
if not mint:
|
|
continue
|
|
query = urlencode({"api-key": api_key, "limit": 25})
|
|
url = f"{cfg.get('helius_base_url')}/v0/addresses/{mint}/transactions?{query}"
|
|
try:
|
|
data = _request_json(url, timeout=cfg.get("timeout", 15))
|
|
rows = data if isinstance(data, list) else data.get("transactions") or []
|
|
for tx in rows:
|
|
if not isinstance(tx, dict):
|
|
continue
|
|
event = _event_from_helius_tx(tx, mapping, cfg=cfg)
|
|
if not event:
|
|
continue
|
|
if insert_onchain_event(event):
|
|
events.append(event)
|
|
except Exception as exc:
|
|
errors.append(f"{mapping.get('symbol')}:helius:{str(exc)[:160]}")
|
|
return {"events": events, "errors": errors}
|
|
|
|
|
|
def _ts_to_iso(value):
|
|
try:
|
|
if value:
|
|
return datetime.fromtimestamp(float(value)).isoformat(timespec="seconds")
|
|
except Exception:
|
|
pass
|
|
return _now().isoformat(timespec="seconds")
|
|
|
|
|
|
def _short_addr(value):
|
|
value = str(value or "")
|
|
if len(value) <= 12:
|
|
return value
|
|
return value[:6] + "..." + value[-4:]
|
|
|
|
|
|
def ingest_normalized_events(events):
|
|
"""Test/integration helper for provider adapters."""
|
|
init_db()
|
|
init_onchain_tables()
|
|
inserted = []
|
|
for event in events or []:
|
|
eid = insert_onchain_event(event)
|
|
if eid:
|
|
item = dict(event)
|
|
item["id"] = eid
|
|
inserted.append(item)
|
|
queued = enqueue_onchain_candidates()
|
|
return {"inserted": len(inserted), "queued": queued.get("queued", 0), "events": inserted, "candidate_result": queued}
|
|
|
|
|
|
def _candidate_title(event):
|
|
label = event.get("signal_label") or signal_label(event.get("signal_code"))
|
|
value = _safe_float(event.get("value_usd"))
|
|
value_txt = f" · ${value:,.0f}" if value > 0 else ""
|
|
return f"链上异动 {event.get('symbol')}: {label}{value_txt}"
|
|
|
|
|
|
def enqueue_onchain_candidates(min_score=None, min_confidence=None, cooldown_hours=None, limit=20):
|
|
cfg = get_onchain_params()
|
|
if not cfg.get("candidate_enabled", True):
|
|
return {"queued": 0, "skipped": 0, "symbols": [], "reason": "candidate_disabled"}
|
|
min_score = cfg.get("candidate_min_score", 70) if min_score is None else min_score
|
|
min_confidence = cfg.get("candidate_min_confidence", 70) if min_confidence is None else min_confidence
|
|
cooldown_hours = cfg.get("candidate_cooldown_hours", 6) if cooldown_hours is None else cooldown_hours
|
|
init_onchain_tables()
|
|
init_event_tables()
|
|
cutoff = (_now() - timedelta(hours=24)).isoformat()
|
|
conn = get_conn()
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT e.*,
|
|
COALESCE((
|
|
SELECT m.onchain_score FROM onchain_token_metrics m
|
|
WHERE m.symbol=e.symbol AND m.chain=e.chain
|
|
ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1
|
|
), 0) AS latest_onchain_score,
|
|
COALESCE((
|
|
SELECT m.risk_score FROM onchain_token_metrics m
|
|
WHERE m.symbol=e.symbol AND m.chain=e.chain
|
|
ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1
|
|
), 0) AS latest_risk_score
|
|
FROM onchain_events e
|
|
WHERE e.status IN ('new', 'candidate_failed')
|
|
AND e.detected_at >= %s
|
|
AND e.direction='positive'
|
|
ORDER BY e.confidence DESC, e.value_usd DESC, e.detected_at::timestamp DESC
|
|
LIMIT %s
|
|
""",
|
|
(cutoff, int(limit or 20)),
|
|
).fetchall()
|
|
queued = []
|
|
skipped_ids = []
|
|
now = _now().isoformat(timespec="seconds")
|
|
cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat()
|
|
for row in rows:
|
|
event = dict(row)
|
|
symbol = normalize_symbol(event.get("symbol"))
|
|
if not symbol or not _tradable_symbol(symbol):
|
|
skipped_ids.append(event["id"])
|
|
continue
|
|
score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence")))
|
|
if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0):
|
|
continue
|
|
recent = conn.execute(
|
|
"""
|
|
SELECT id FROM event_news
|
|
WHERE source='onchain' AND symbol=%s AND detected_at >= %s
|
|
LIMIT 1
|
|
""",
|
|
(symbol, cooldown_cutoff),
|
|
).fetchone()
|
|
if recent:
|
|
skipped_ids.append(event["id"])
|
|
continue
|
|
title = _candidate_title(event)
|
|
h = event_hash("onchain", title, symbol)
|
|
try:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO event_news
|
|
(event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed)
|
|
VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0)
|
|
""",
|
|
(
|
|
h,
|
|
symbol,
|
|
title,
|
|
event.get("url") or "",
|
|
event.get("detected_at") or now,
|
|
now,
|
|
event.get("severity") or "A",
|
|
json.dumps(
|
|
{
|
|
"onchain_event_id": event.get("id"),
|
|
"chain": event.get("chain"),
|
|
"signal_code": event.get("signal_code"),
|
|
"signal_label": event.get("signal_label"),
|
|
"confidence": event.get("confidence"),
|
|
"value_usd": event.get("value_usd"),
|
|
"onchain_score": event.get("latest_onchain_score"),
|
|
"risk_score": event.get("latest_risk_score"),
|
|
},
|
|
ensure_ascii=False,
|
|
),
|
|
),
|
|
)
|
|
conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),))
|
|
queued.append(symbol)
|
|
except Exception:
|
|
skipped_ids.append(event["id"])
|
|
if skipped_ids:
|
|
conn.execute(
|
|
"UPDATE onchain_events SET status='candidate_skipped' WHERE id IN (" + ",".join(["%s"] * len(skipped_ids)) + ")",
|
|
tuple(skipped_ids),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
return {"queued": len(queued), "skipped": len(skipped_ids), "symbols": queued}
|
|
|
|
|
|
def run_once(limit=60):
|
|
started = _now()
|
|
init_db()
|
|
init_onchain_tables()
|
|
cfg = get_onchain_params()
|
|
output = {
|
|
"status": "disabled" if not cfg.get("enabled") else "processed",
|
|
"metrics_count": 0,
|
|
"events_count": 0,
|
|
"raw_events_count": 0,
|
|
"candidate_queued": 0,
|
|
"errors": [],
|
|
"check_time": _now().isoformat(),
|
|
}
|
|
if cfg.get("enabled"):
|
|
node = fetch_nodereal_events(limit=limit)
|
|
output["metrics_count"] += len(node.get("metrics") or [])
|
|
output["events_count"] += len(node.get("events") or [])
|
|
output["raw_events_count"] += len(node.get("raw_events") or [])
|
|
output["errors"].extend(node.get("errors") or [])
|
|
output["discovered_mappings"] = 0
|
|
if output.get("discovered_mappings"):
|
|
output["status"] = "bootstrapped"
|
|
node = fetch_nodereal_events(limit=limit)
|
|
output["metrics_count"] = len(node.get("metrics") or [])
|
|
output["events_count"] = len(node.get("events") or [])
|
|
output["errors"].extend(node.get("errors") or [])
|
|
queued = enqueue_onchain_candidates()
|
|
output["candidate_queued"] = queued.get("queued", 0)
|
|
output["candidate_symbols"] = queued.get("symbols", [])
|
|
if not output["metrics_count"] and not output["events_count"] and not output["raw_events_count"]:
|
|
output["status"] = "no_onchain_data"
|
|
log_cron_run(
|
|
job_name="链上",
|
|
script_name="onchain_monitor.py",
|
|
run_status="success" if not output["errors"] else "error",
|
|
result_status=output["status"],
|
|
started_at=started.isoformat(),
|
|
finished_at=_now().isoformat(),
|
|
duration_ms=int((_now() - started).total_seconds() * 1000),
|
|
summary={
|
|
"metrics_count": output["metrics_count"],
|
|
"events_count": output["events_count"],
|
|
"raw_events_count": output["raw_events_count"],
|
|
"candidate_queued": output["candidate_queued"],
|
|
"enabled": cfg.get("enabled"),
|
|
},
|
|
error_message="; ".join(output["errors"][:5]),
|
|
)
|
|
print(json.dumps(output, ensure_ascii=False, indent=2, default=str))
|
|
return output
|
|
|
|
|
|
__all__ = [
|
|
"POSITIVE_SIGNALS",
|
|
"RISK_SIGNALS",
|
|
"derive_dex_signals",
|
|
"enqueue_onchain_candidates",
|
|
"fetch_dexscreener_metrics",
|
|
"fetch_dexscreener_raw_events",
|
|
"fetch_etherscan_events",
|
|
"fetch_helius_events",
|
|
"fetch_nodereal_events",
|
|
"get_onchain_params",
|
|
"ingest_normalized_events",
|
|
"normalize_dexscreener_pair",
|
|
"run_once",
|
|
"seed_configured_token_mappings",
|
|
]
|