"""On-chain signal collector and candidate bridge. V1 deliberately treats on-chain data as a discovery/risk layer. It writes normalized events/metrics and may request a technical check through event_news, but it never creates recommendations or changes recommendation state directly. """ import json import os from datetime import datetime, timedelta from app.config.system_config import onchain_config from app.db import onchain_db from app.db.altcoin_db import get_conn, init_db, log_cron_run from app.db.tracking_queries import get_latest_price_cache from app.db.onchain_db import ( MIN_MAPPING_CONFIDENCE, POSITIVE_SIGNALS, RISK_SIGNALS, find_mapping_by_contract, get_token_mappings, init_onchain_tables, insert_onchain_event, insert_onchain_raw_event, insert_token_metric, normalize_symbol, signal_direction, signal_label, ) from app.services.event_driven_screener import _event_hash as event_hash from app.services.event_driven_screener import _tradable_symbol, init_event_tables from app.services.alchemy_client import AlchemyClient, AlchemyConfig, DEFAULT_ALCHEMY_CHAIN_ENDPOINTS from app.services.nodereal_client import DEFAULT_CHAIN_ENDPOINTS, NodeRealClient, NodeRealConfig DEFAULT_CHAINS = ("ethereum", "bsc") NON_TARGET_NATIVE_BASES = { "AVAX", "FIL", "SUI", "APT", "DOT", "ADA", "XRP", "LTC", "BCH", "ATOM", "NEAR", "SEI", "INJ", "TON", "ETC", "ICP", "HBAR", "ALGO", "VET", "TRX", "XLM", "KAS", "TIA", "EGLD", "FLOW", "KAVA", "MINA", "IOTA", "XMR", "DASH", "ZEC", } BRIDGED_TOKEN_MARKERS = ( "wrapped", "wormhole", "portal", "bridged", "bridge", "axelar", "allbridge", "binance-peg", "multichain", "layerzero", "lz", "wavax", "wfil", ) TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" ERC20_SYMBOL_SELECTOR = "0x95d89b41" ERC20_NAME_SELECTOR = "0x06fdde03" ERC20_DECIMALS_SELECTOR = "0x313ce567" def _provider_error_summary(provider: str, chain: str = "", scope: str = "", symbol: str = "", exc: Exception | str = "") -> str: provider_label = {"alchemy": "Alchemy", "nodereal": "NodeReal"}.get(str(provider or "").lower(), provider or "链上数据源") chain_label = {"ethereum": "Ethereum", "bsc": "BSC"}.get(str(chain or "").lower(), chain or "") scope_label = { "logs": "映射代币日志", "raw_logs": "原始转账流", "metadata": "Token 资料", }.get(str(scope or ""), scope or "链上数据") text = str(exc or "") reason = "采集失败" lowered = text.lower() if "name resolution" in lowered or "nameresolution" in lowered or "temporary failure in name resolution" in lowered: reason = "DNS 解析异常" elif "ssl" in lowered or "connectionpool" in lowered or "max retries" in lowered or "eof" in lowered: reason = "连接异常" elif "timeout" in lowered or "timed out" in lowered: reason = "请求超时" elif "rate" in lowered or "429" in lowered: reason = "额度或限流" elif "403" in lowered or "401" in lowered or "api_key" in lowered: reason = "鉴权异常" prefix = f"{symbol}:" if symbol else "" chain_part = f"{chain_label} " if chain_label else "" return f"{prefix}{provider_label} {chain_part}{scope_label} {reason}" # --------------------------------------------------------------------------- # Known CEX hot/deposit wallet addresses (lowercase). # Sources: Etherscan/BscScan labeled addresses, Arkham, Nansen public tags. # Used to classify transfer direction: inflow (to CEX) vs outflow (from CEX). # --------------------------------------------------------------------------- _CEX_ADDRESSES: set[str] = { # Binance "0x28c6c06298d514db089934071355e5743bf21d60", "0x21a31ee1afc51d94c2efccaa2092ad1028285549", "0xdfd5293d8e347dfe59e90efd55b2956a1343963d", "0x56eddb7aa87536c09ccc2793473599fd21a8b17f", "0x9696f59e4d72e237be84ffd425dcad154bf96976", "0xf977814e90da44bfa03b6295a0616a897441acec", "0x8894e0a0c962cb723c1ef8a1b67f07aa277d42ad", "0xe2fc31f816a9b94326492132018c3aecc4a93ae1", "0x3c783c21a0383057d128bae431894a5c19f9cf06", "0xb38e8c17e38363af6ebdcb3dae12e0243582891d", "0x5a52e96bacdabb82fd05763e25335261b270efcb", "0x835678a611b28684005a5e2233695fb6cbbb00a4", # OKX "0x6cc5f688a315f3dc28a7781717a9a798a59fda7b", "0x236f9f97e0e62388479bf9e5ba4889e46b0273c3", "0xa7efae728d2936e78bda97dc267687568dd593f3", "0x98ec059dc3adfbdd63429454aeb0c990fba4a128", "0x6fb624b48d9299674022a23d92515e76ba880113", # Bybit "0xf89d7b9c864f589bbf53a82105107622b35eaa40", "0x1db92e2eebc8e0c075a02bea49a2935bcd2dfcf4", # Coinbase "0x71660c4005ba85c37ccec55d0c4493e66fe775d3", "0x503828976d22510aad0201ac7ec88293211d23da", "0xddfabcdc4d8ffc6d5beaf154f18b778f892a0740", "0x3cd751e6b0078be393132286c442345e68ff0aaa", "0xb5d85cbf7cb3ee0d56b3bb207d5fc4b82f43f511", "0xa9d1e08c7793af67e9d92fe308d5697fb81d3e43", # Kraken "0x2910543af39aba0cd09dbb2d50200b3e800a63d2", "0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0", # KuCoin "0xd6216fc19db775df9774a6e33526131da7d19a2c", "0xf16e9b0d03470827a95cdfd0cb8a8a3b46969b91", "0x738cf6903e6c4e699d1c2dd9ab8b67fcdb3121ea", # Gate.io "0x0d0707963952f2fba59dd06f2b425ace40b492fe", "0x1c4b70a3968436b9a0a9cf5205c787eb81bb558c", # Huobi / HTX "0xab5c66752a9e8167967685f1450532fb96d5d24f", "0x6748f50f686bfbca6fe8ad62b22228b87f31ff2b", "0xfdb16996831753d5331ff813c29a93c76834a0ad", "0x46340b20830761efd32832a74d7169b29feb9758", # Bitfinex "0x876eabf441b2ee5b5b0554fd502a8e0600950cfa", "0x742d35cc6634c0532925a3b844bc9e7595f2bd3e", # Crypto.com "0x6262998ced04146fa42253a5c0af90ca02dfd2a3", "0x46340b20830761efd32832a74d7169b29feb9758", # MEXC "0x3cc936b795a188f0e246cbb2d74c5bd190aecf18", # Upbit "0x5e032243d507c743b061ef27c9169ae92ed40ec0", } def is_cex_address(address: str) -> bool: """Check if an address belongs to a known centralized exchange.""" return str(address or "").lower().strip() in _CEX_ADDRESSES def classify_transfer_signal(from_addr: str, to_addr: str) -> tuple[str, str]: """Classify a transfer's signal code and direction based on CEX address labels. Returns (signal_code, direction): - to CEX → ("exchange_inflow_risk", "risk") — likely selling - from CEX → ("exchange_outflow", "positive") — likely accumulating - neither → ("whale_accumulation", "positive") — large wallet-to-wallet move """ to_is_cex = is_cex_address(to_addr) from_is_cex = is_cex_address(from_addr) if to_is_cex and not from_is_cex: return "exchange_inflow_risk", "risk" if from_is_cex and not to_is_cex: return "exchange_outflow", "positive" # Both CEX (internal transfer) or neither (wallet-to-wallet whale move) return "whale_accumulation", "positive" def _env_bool(name, default=False): value = os.getenv(name) if value is None: return default return str(value).strip().lower() in ("1", "true", "yes", "on") def _env_int(name, default): try: return int(os.getenv(name, str(default)) or default) except Exception: return default def _env_float(name, default): try: return float(os.getenv(name, str(default)) or default) except Exception: return default def get_onchain_params(): """Runtime provider config. Keep this out of rules.yaml.""" cfg = onchain_config(DEFAULT_CHAINS) chains_raw = cfg.get("chains") or list(DEFAULT_CHAINS) if isinstance(chains_raw, str): chains = [x.strip().lower() for x in chains_raw.split(",") if x.strip()] else: chains = [str(x).strip().lower() for x in chains_raw if str(x).strip()] nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY") alchemy_env = str(cfg.get("alchemy_api_key_env") or "ALPHAX_ALCHEMY_API_KEY") token_mappings_env = str(cfg.get("token_mappings_env") or "ALPHAX_ONCHAIN_TOKEN_MAPPINGS") return { "enabled": bool(cfg.get("enabled", False)), "provider": str(cfg.get("provider") or "nodereal").strip().lower(), "chains": chains or list(DEFAULT_CHAINS), "timeout": int(cfg.get("timeout") or 15), "nodereal_enabled": bool(cfg.get("nodereal_enabled", True)), "nodereal_chains": _normalize_chain_list(cfg.get("nodereal_chains") or ("ethereum", "bsc")), "nodereal_api_key": os.getenv(nodereal_env, "").strip(), "nodereal_api_key_env": nodereal_env, "alchemy_enabled": bool(cfg.get("alchemy_enabled", False)), "alchemy_chains": _normalize_chain_list(cfg.get("alchemy_chains") or ("ethereum", "bsc")), "alchemy_api_key": os.getenv(alchemy_env, "").strip(), "alchemy_api_key_env": alchemy_env, "token_mappings": _load_token_mappings(cfg.get("token_mappings"), os.getenv(token_mappings_env, "")), "token_mappings_env": token_mappings_env, "nodereal_log_block_lookback": int(cfg.get("nodereal_log_block_lookback") or 120), "nodereal_max_logs_per_token": int(cfg.get("nodereal_max_logs_per_token") or 25), "nodereal_raw_transfer_enabled": bool(cfg.get("nodereal_raw_transfer_enabled", True)), "nodereal_raw_block_lookback": int(cfg.get("nodereal_raw_block_lookback") or 1), "nodereal_raw_max_logs_per_chain": int(cfg.get("nodereal_raw_max_logs_per_chain") or 30), "nodereal_auto_mapping_enabled": bool(cfg.get("nodereal_auto_mapping_enabled", True)), "nodereal_auto_mapping_confidence": int(cfg.get("nodereal_auto_mapping_confidence") or 82), "alchemy_log_block_lookback": int(cfg.get("alchemy_log_block_lookback") or 120), "alchemy_max_logs_per_token": int(cfg.get("alchemy_max_logs_per_token") or 25), "alchemy_raw_transfer_enabled": bool(cfg.get("alchemy_raw_transfer_enabled", True)), "alchemy_raw_chains": _normalize_chain_list(cfg.get("alchemy_raw_chains") or ("ethereum",)), "alchemy_raw_block_lookback": int(cfg.get("alchemy_raw_block_lookback") or 1), "alchemy_raw_max_logs_per_chain": int(cfg.get("alchemy_raw_max_logs_per_chain") or 30), "alchemy_auto_mapping_enabled": bool(cfg.get("alchemy_auto_mapping_enabled", True)), "alchemy_auto_mapping_confidence": int(cfg.get("alchemy_auto_mapping_confidence") or 82), "candidate_enabled": bool(cfg.get("candidate_enabled", True)), "candidate_min_score": float(cfg.get("candidate_min_score") or 70), "candidate_min_confidence": int(cfg.get("candidate_min_confidence") or 70), "candidate_cooldown_hours": float(cfg.get("candidate_cooldown_hours") or 6), "whale_tx_usd": float(cfg.get("whale_tx_usd") or 250000), } def _normalize_chain_list(value): if isinstance(value, str): return [x.strip().lower() for x in value.split(",") if x.strip()] return [str(x).strip().lower() for x in (value or []) if str(x).strip()] def _load_token_mappings(config_value=None, env_value=""): items = [] if isinstance(config_value, list): items.extend(config_value) if env_value: try: parsed = json.loads(env_value) if isinstance(parsed, list): items.extend(parsed) except Exception: for part in str(env_value or "").split(","): bits = [x.strip() for x in part.split(":")] if len(bits) >= 3: items.append({"symbol": bits[0], "chain": bits[1], "contract_address": bits[2]}) normalized = [] seen = set() for item in items: if not isinstance(item, dict): continue symbol = normalize_symbol(item.get("symbol")) chain = str(item.get("chain") or "").lower().strip() contract = str(item.get("contract_address") or item.get("address") or "").strip() if not symbol or not chain or not contract: continue key = (symbol, chain, contract.lower()) if key in seen: continue seen.add(key) normalized.append({ "symbol": symbol, "chain": chain, "contract_address": contract, "source": item.get("source") or "nodereal_seed", "confidence": int(item.get("confidence") or 95), "raw": item.get("raw") or {}, }) return normalized def seed_configured_token_mappings(cfg=None): cfg = cfg or get_onchain_params() seeded = [] errors = [] for item in cfg.get("token_mappings") or []: try: mapping_id = onchain_db.upsert_token_mapping( item["symbol"], item["chain"], item["contract_address"], source=item.get("source") or "nodereal_seed", confidence=item.get("confidence") or 95, raw=item.get("raw") or {}, is_active=True, ) if mapping_id: seeded.append(item) except Exception as exc: errors.append(f"{item.get('symbol')}:seed_mapping:{str(exc)[:160]}") return {"seeded": len(seeded), "items": seeded, "errors": errors} def _now(): return datetime.now() def _safe_float(value, default=0.0): try: return float(value or 0) except Exception: return default def _safe_int(value, default=0): try: return int(float(value or 0)) except Exception: return default def _chain_explorer_tx_url(chain, tx_hash): tx_hash = str(tx_hash or "").strip() if not tx_hash: return "" if chain == "ethereum": return f"https://etherscan.io/tx/{tx_hash}" if chain == "bsc": return f"https://bscscan.com/tx/{tx_hash}" if chain == "base": return f"https://basescan.org/tx/{tx_hash}" if chain == "arbitrum": return f"https://arbiscan.io/tx/{tx_hash}" if chain == "solana": return f"https://solscan.io/tx/{tx_hash}" return "" def _latest_metric(symbol, chain, contract_address): conn = get_conn() row = conn.execute( """ SELECT * FROM onchain_token_metrics WHERE symbol=%s AND chain=%s AND contract_address=%s AND "window"='1h' ORDER BY metric_time DESC, id DESC LIMIT 1 """, (symbol, chain, contract_address or ""), ).fetchone() conn.close() return dict(row) if row else None def _event_from_metric(metric, signal_code, source="nodereal"): direction = signal_direction(signal_code) severity = "RISK" if direction == "risk" else "A" if _safe_float(metric.get("onchain_score")) >= 75 else "B" return { "chain": metric.get("chain"), "symbol": metric.get("symbol"), "contract_address": metric.get("contract_address") or "", "event_type": "onchain_signal", "signal_code": signal_code, "signal_label": signal_label(signal_code), "direction": direction, "value_usd": metric.get("dex_volume_usd") or metric.get("whale_accumulation_usd") or abs(metric.get("exchange_netflow_usd") or 0), "confidence": 75 if direction != "risk" else 80, "severity": severity, "detected_at": metric.get("metric_time") or _now().isoformat(), "source": source, "url": metric.get("url") or "", "raw": metric, } def _latest_price_from_metric(mapping): symbol = normalize_symbol(mapping.get("symbol")) chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "") conn = get_conn() try: rows = conn.execute( """ SELECT raw_json FROM onchain_token_metrics WHERE symbol=%s AND chain=%s AND contract_address=%s ORDER BY metric_time DESC, id DESC LIMIT 8 """, (symbol, chain, contract), ).fetchall() finally: conn.close() for row in rows: try: raw = json.loads(row.get("raw_json") or "{}") except Exception: raw = {} price = _safe_float(raw.get("price_usd")) if price > 0: return price cache = get_latest_price_cache([symbol]) item = cache.get(symbol) or {} return _safe_float(item.get("price")) def _hex_to_int(value): text = str(value or "").strip() if not text: return 0 try: return int(text, 16) if text.startswith("0x") else int(text) except Exception: return 0 def _topic_to_address(topic): topic = str(topic or "").lower() if topic.startswith("0x") and len(topic) >= 42: return "0x" + topic[-40:] return "" def _decode_abi_string(value): text = str(value or "").strip() if not text or text == "0x": return "" payload = text[2:] if text.startswith("0x") else text try: raw = bytes.fromhex(payload) except Exception: return "" if not raw: return "" try: if len(raw) >= 96: offset = int.from_bytes(raw[:32], "big") if 0 <= offset + 32 <= len(raw): length = int.from_bytes(raw[offset:offset + 32], "big") body = raw[offset + 32:offset + 32 + length] return body.decode("utf-8", errors="ignore").strip("\x00 ").strip() return raw.rstrip(b"\x00").decode("utf-8", errors="ignore").strip() except Exception: return "" def _clean_erc20_symbol(value): symbol = str(value or "").upper().strip().replace("$", "") symbol = "".join(ch for ch in symbol if ch.isalnum()) if symbol.endswith("USDT") and len(symbol) > 4: symbol = symbol[:-4] return symbol[:20] def _is_auto_mapping_symbol_allowed(base, token_name=""): base = _clean_erc20_symbol(base) if not base: return False symbol = f"{base}/USDT" if not _tradable_symbol(symbol): return False if base in NON_TARGET_NATIVE_BASES: return False text = f"{base} {token_name or ''}".lower() if any(marker in text for marker in BRIDGED_TOKEN_MARKERS): return False return True def _read_erc20_metadata(client, chain, contract): metadata = {"symbol": "", "name": "", "decimals": 18} try: metadata["symbol"] = _decode_abi_string(client.eth_call(chain, contract, ERC20_SYMBOL_SELECTOR)) except Exception: metadata["symbol"] = "" try: metadata["name"] = _decode_abi_string(client.eth_call(chain, contract, ERC20_NAME_SELECTOR)) except Exception: metadata["name"] = "" try: decimals = _hex_to_int(client.eth_call(chain, contract, ERC20_DECIMALS_SELECTOR)) if 0 <= decimals <= 36: metadata["decimals"] = decimals except Exception: pass metadata["symbol"] = _clean_erc20_symbol(metadata.get("symbol")) return metadata def _auto_map_evm_contract(client, chain, contract, cfg=None, provider="nodereal"): cfg = cfg or get_onchain_params() provider = str(provider or "nodereal").lower() if not cfg.get(f"{provider}_auto_mapping_enabled", True): return None existing = find_mapping_by_contract(chain, contract) if existing: return existing metadata = _read_erc20_metadata(client, chain, contract) base = metadata.get("symbol") or "" if not _is_auto_mapping_symbol_allowed(base, metadata.get("name")): return None symbol = normalize_symbol(base) confidence = max(1, min(95, int(cfg.get(f"{provider}_auto_mapping_confidence") or 82))) source = f"{provider}_erc20_metadata" mapping_id = onchain_db.upsert_token_mapping( symbol=symbol, chain=chain, contract_address=contract, source=source, confidence=confidence, raw=metadata, is_active=True, ) if not mapping_id: return None return { "id": mapping_id, "symbol": symbol, "chain": str(chain or "").lower(), "contract_address": contract, "source": source, "confidence": confidence, "raw_json": json.dumps(metadata, ensure_ascii=False), } def _auto_map_nodereal_contract(client, chain, contract, cfg=None): return _auto_map_evm_contract(client, chain, contract, cfg=cfg, provider="nodereal") def _nodereal_client(cfg=None): cfg = cfg or get_onchain_params() return NodeRealClient( NodeRealConfig( api_key=cfg.get("nodereal_api_key") or "", timeout=int(cfg.get("timeout") or 15), endpoints=dict(DEFAULT_CHAIN_ENDPOINTS), ) ) def _alchemy_client(cfg=None): cfg = cfg or get_onchain_params() return AlchemyClient( AlchemyConfig( api_key=cfg.get("alchemy_api_key") or "", timeout=int(cfg.get("timeout") or 15), endpoints=dict(DEFAULT_ALCHEMY_CHAIN_ENDPOINTS), ) ) def _event_from_evm_transfer(log, mapping, cfg=None, source="nodereal"): cfg = cfg or get_onchain_params() source = str(source or "nodereal").lower() topics = log.get("topics") or [] if len(topics) < 3: return None amount_raw = _hex_to_int(log.get("data")) mapping_raw = {} try: mapping_raw = json.loads(mapping.get("raw_json") or "{}") except Exception: mapping_raw = {} decimals = _safe_int(mapping_raw.get("decimals") or mapping_raw.get("tokenDecimal") or 18, 18) amount = amount_raw / (10 ** decimals if decimals >= 0 else 1) price_usd = _latest_price_from_metric(mapping) value_usd = amount * price_usd if price_usd > 0 else 0 threshold = _safe_float(cfg.get("whale_tx_usd"), 250000) if value_usd <= 0 or value_usd < threshold: return None chain = str(mapping.get("chain") or "").lower() tx_hash = str(log.get("transactionHash") or "").strip() from_addr = _topic_to_address(topics[1]) to_addr = _topic_to_address(topics[2]) sig_code, direction = classify_transfer_signal(from_addr, to_addr) severity = "RISK" if direction == "risk" else "A" confidence = 80 if direction == "risk" else 76 # Descriptive labels based on classification if sig_code == "exchange_inflow_risk": wallet_label = "CEX 充值地址" counterparty_label = "发送方 " + _short_addr(from_addr) elif sig_code == "exchange_outflow": wallet_label = "接收钱包 " + _short_addr(to_addr) counterparty_label = "CEX 提币地址" else: wallet_label = "EVM 接收地址" counterparty_label = "EVM 发送地址 " + _short_addr(from_addr) return { "chain": chain, "symbol": mapping.get("symbol"), "contract_address": mapping.get("contract_address") or "", "event_type": "token_transfer", "signal_code": sig_code, "signal_label": signal_label(sig_code), "direction": direction, "value_usd": value_usd, "amount": amount, "tx_hash": tx_hash, "wallet_address": to_addr, "wallet_label": wallet_label, "counterparty_label": counterparty_label, "confidence": confidence, "severity": severity, "detected_at": _now().isoformat(timespec="seconds"), "source": source, "url": _chain_explorer_tx_url(chain, tx_hash), "raw": log, } def _event_from_nodereal_transfer(log, mapping, cfg=None): return _event_from_evm_transfer(log, mapping, cfg=cfg, source="nodereal") def _raw_event_from_evm_transfer(log, chain, source="nodereal"): source = str(source or "nodereal").lower() topics = log.get("topics") or [] if len(topics) < 3: return None contract = str(log.get("address") or "").strip() tx_hash = str(log.get("transactionHash") or "").strip() amount_raw = _hex_to_int(log.get("data")) if not contract or amount_raw <= 0: return None from_addr = _topic_to_address(topics[1]) to_addr = _topic_to_address(topics[2]) source_label = "Alchemy" if source == "alchemy" else "NodeReal" return { "source": source, "chain": str(chain or "").lower(), "event_type": "evm_transfer", "token_address": contract, "symbol_guess": "", "name": "", "title": f"{source_label} ERC-20 原始转账", "description": f"合约 {_short_addr(contract)} · {_short_addr(from_addr)} -> {_short_addr(to_addr)}", "url": _chain_explorer_tx_url(chain, tx_hash), "amount": amount_raw, "total_amount": 0, "importance": min(100, max(1, len(str(amount_raw)) * 4)), "mapped_symbol": "", "mapping_status": "unmapped", "detected_at": _now().isoformat(timespec="seconds"), "raw": log, } def _raw_event_from_nodereal_transfer(log, chain): return _raw_event_from_evm_transfer(log, chain, source="nodereal") def _apply_raw_event_mapping(raw_event, client=None, cfg=None, provider=None): item = dict(raw_event or {}) chain = str(item.get("chain") or "").lower() contract = str(item.get("token_address") or "").strip() if not chain or not contract: return item mapping = find_mapping_by_contract(chain, contract) if not mapping and client: mapping = _auto_map_evm_contract(client, chain, contract, cfg=cfg, provider=provider or item.get("source") or "nodereal") if mapping: item["mapped_symbol"] = normalize_symbol(mapping.get("symbol")) item["mapping_status"] = "mapped" item["symbol_guess"] = item.get("symbol_guess") or item["mapped_symbol"].split("/")[0] item["raw"] = { **(item.get("raw") or {}), "mapping": { "symbol": item["mapped_symbol"], "source": mapping.get("source") or "", "confidence": mapping.get("confidence") or 0, }, } return item def _metric_from_nodereal_holder_count(holder_count, mapping): symbol = normalize_symbol(mapping.get("symbol")) chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "") prev = _latest_metric(symbol, chain, contract) prev_count = 0 if prev: try: prev_raw = json.loads(prev.get("raw_json") or "{}") prev_count = _safe_int(prev_raw.get("holder_count")) except Exception: prev_count = 0 holder_delta = holder_count - prev_count if prev_count > 0 else 0 metric = { "symbol": symbol, "chain": chain, "contract_address": contract, "window": "1h", "metric_time": _now().isoformat(timespec="seconds"), "holder_delta": holder_delta, "smart_money_score": 0, "source": "nodereal", "raw": { "holder_count": holder_count, "previous_holder_count": prev_count, }, } if holder_delta > 0: metric["onchain_score"] = min(30, holder_delta) elif holder_delta < 0: metric["risk_score"] = min(30, abs(holder_delta)) return metric def _event_from_holder_metric(metric): holder_delta = _safe_float(metric.get("holder_delta")) if holder_delta <= 0: return None if holder_delta < 20: return None return _event_from_metric(metric, "holder_growth", source="nodereal") def fetch_nodereal_events(limit=60): cfg = get_onchain_params() if not cfg.get("nodereal_enabled", True): return {"metrics": [], "events": [], "errors": ["nodereal_disabled"]} if not cfg.get("nodereal_api_key"): return {"metrics": [], "events": [], "errors": ["nodereal_api_key_missing"]} seed_result = seed_configured_token_mappings(cfg) client = _nodereal_client(cfg) raw_result = fetch_nodereal_raw_events(client=client, cfg=cfg, limit=limit) enabled_chains = set(cfg.get("nodereal_chains") or DEFAULT_CHAINS) all_mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) chain_mappings = [m for m in all_mappings if str(m.get("chain") or "").lower() in enabled_chains] mappings = [] unsupported_chains = set() for mapping in chain_mappings: chain = str(mapping.get("chain") or "").lower() if client.supports_chain(chain): mappings.append(mapping) else: unsupported_chains.add(chain) metrics = [] events = [] errors = list(seed_result.get("errors") or []) + list(raw_result.get("errors") or []) diagnostics = { "seeded_mappings": seed_result.get("seeded", 0), "mapping_total": len(all_mappings), "chain_mapping_total": len(chain_mappings), "supported_mapping_total": len(mappings), "enabled_chains": sorted(enabled_chains), "unsupported_chains": sorted(unsupported_chains), } lookback = max(1, int(cfg.get("nodereal_log_block_lookback") or 120)) max_logs = max(1, int(cfg.get("nodereal_max_logs_per_token") or 25)) for mapping in mappings[: int(limit or 60)]: chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "").strip() if not _is_evm_address(contract): continue try: holder_count = client.token_holder_count(chain, contract) if holder_count: metric = _metric_from_nodereal_holder_count(holder_count, mapping) insert_token_metric(metric) metrics.append(metric) holder_event = _event_from_holder_metric(metric) if holder_event and insert_onchain_event(holder_event): events.append(holder_event) except Exception as exc: errors.append(_provider_error_summary("nodereal", chain=chain, scope="metadata", symbol=mapping.get("symbol"), exc=exc)) try: latest = client.block_number(chain) if latest <= 0: continue logs = client.get_logs( chain, { "address": contract, "fromBlock": hex(max(0, latest - lookback)), "toBlock": hex(latest), "topics": [TRANSFER_TOPIC], }, ) for log in logs[:max_logs]: if not isinstance(log, dict): continue event = _event_from_nodereal_transfer(log, mapping, cfg=cfg) if not event: continue if insert_onchain_event(event): events.append(event) except Exception as exc: errors.append(_provider_error_summary("nodereal", chain=chain, scope="logs", symbol=mapping.get("symbol"), exc=exc)) if not all_mappings: diagnostics["mapping_note"] = "no_strategy_mappings_raw_events_only" elif not chain_mappings: diagnostics["mapping_note"] = "no_enabled_chain_mappings_raw_events_only" elif not mappings: diagnostics["mapping_note"] = "no_supported_mappings_raw_events_only" return { "metrics": metrics, "events": events, "raw_events": raw_result.get("raw_events") or [], "errors": errors, "diagnostics": diagnostics, } def fetch_alchemy_events(limit=60): cfg = get_onchain_params() if not cfg.get("alchemy_enabled", False): return {"metrics": [], "events": [], "raw_events": [], "errors": ["alchemy_disabled"]} if not cfg.get("alchemy_api_key"): return {"metrics": [], "events": [], "raw_events": [], "errors": ["alchemy_api_key_missing"]} seed_result = seed_configured_token_mappings(cfg) client = _alchemy_client(cfg) raw_result = fetch_alchemy_raw_events(client=client, cfg=cfg, limit=limit) enabled_chains = set(cfg.get("alchemy_chains") or DEFAULT_CHAINS) all_mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) chain_mappings = [m for m in all_mappings if str(m.get("chain") or "").lower() in enabled_chains] mappings = [] unsupported_chains = set() for mapping in chain_mappings: chain = str(mapping.get("chain") or "").lower() if client.supports_chain(chain): mappings.append(mapping) else: unsupported_chains.add(chain) events = [] errors = list(seed_result.get("errors") or []) + list(raw_result.get("errors") or []) diagnostics = { "seeded_mappings": seed_result.get("seeded", 0), "mapping_total": len(all_mappings), "chain_mapping_total": len(chain_mappings), "supported_mapping_total": len(mappings), "enabled_chains": sorted(enabled_chains), "unsupported_chains": sorted(unsupported_chains), } lookback = max(1, int(cfg.get("alchemy_log_block_lookback") or 120)) max_logs = max(1, int(cfg.get("alchemy_max_logs_per_token") or 25)) for mapping in mappings[: int(limit or 60)]: chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "").strip() if not _is_evm_address(contract): continue try: latest = client.block_number(chain) if latest <= 0: continue logs = client.get_logs( chain, { "address": contract, "fromBlock": hex(max(0, latest - lookback)), "toBlock": hex(latest), "topics": [TRANSFER_TOPIC], }, ) for log in logs[:max_logs]: if not isinstance(log, dict): continue event = _event_from_evm_transfer(log, mapping, cfg=cfg, source="alchemy") if not event: continue if insert_onchain_event(event): events.append(event) except Exception as exc: errors.append(_provider_error_summary("alchemy", chain=chain, scope="logs", symbol=mapping.get("symbol"), exc=exc)) if not all_mappings: diagnostics["mapping_note"] = "no_strategy_mappings_raw_events_only" elif not chain_mappings: diagnostics["mapping_note"] = "no_enabled_chain_mappings_raw_events_only" elif not mappings: diagnostics["mapping_note"] = "no_supported_mappings_raw_events_only" return { "metrics": [], "events": events, "raw_events": raw_result.get("raw_events") or [], "errors": errors, "diagnostics": diagnostics, } def fetch_nodereal_raw_events(client=None, cfg=None, limit=60): cfg = cfg or get_onchain_params() if not cfg.get("nodereal_raw_transfer_enabled", True): return {"raw_events": [], "errors": []} client = client or _nodereal_client(cfg) chains = [c for c in (cfg.get("nodereal_chains") or DEFAULT_CHAINS) if client.supports_chain(c)] lookback = max(0, min(12, int(cfg.get("nodereal_raw_block_lookback") or 1))) per_chain = max(1, min(int(cfg.get("nodereal_raw_max_logs_per_chain") or 30), int(limit or 60))) inserted = [] errors = [] for chain in chains: try: latest = client.block_number(chain) if latest <= 0: continue logs = client.get_logs( chain, { "fromBlock": hex(max(0, latest - lookback)), "toBlock": hex(latest), "topics": [TRANSFER_TOPIC], }, ) raw_items = [] for log in logs: if not isinstance(log, dict): continue item = _raw_event_from_nodereal_transfer(log, chain) if item: raw_items.append(item) raw_items.sort(key=lambda item: item.get("amount") or 0, reverse=True) for item in raw_items[:per_chain]: item = _apply_raw_event_mapping(item, client=client, cfg=cfg, provider="nodereal") if insert_onchain_raw_event(item): inserted.append(item) except Exception as exc: errors.append(_provider_error_summary("nodereal", chain=chain, scope="raw_logs", exc=exc)) return {"raw_events": inserted, "errors": errors} def fetch_alchemy_raw_events(client=None, cfg=None, limit=60): cfg = cfg or get_onchain_params() if not cfg.get("alchemy_raw_transfer_enabled", True): return {"raw_events": [], "errors": []} client = client or _alchemy_client(cfg) chains = [c for c in (cfg.get("alchemy_raw_chains") or ("ethereum",)) if client.supports_chain(c)] lookback = max(0, min(12, int(cfg.get("alchemy_raw_block_lookback") or 1))) per_chain = max(1, min(int(cfg.get("alchemy_raw_max_logs_per_chain") or 30), int(limit or 60))) inserted = [] errors = [] for chain in chains: try: latest = client.block_number(chain) if latest <= 0: continue logs = client.get_logs( chain, { "fromBlock": hex(max(0, latest - lookback)), "toBlock": hex(latest), "topics": [TRANSFER_TOPIC], }, ) raw_items = [] for log in logs: if not isinstance(log, dict): continue item = _raw_event_from_evm_transfer(log, chain, source="alchemy") if item: raw_items.append(item) raw_items.sort(key=lambda item: item.get("amount") or 0, reverse=True) for item in raw_items[:per_chain]: item = _apply_raw_event_mapping(item, client=client, cfg=cfg, provider="alchemy") if insert_onchain_raw_event(item): inserted.append(item) except Exception as exc: errors.append(_provider_error_summary("alchemy", chain=chain, scope="raw_logs", exc=exc)) return {"raw_events": inserted, "errors": errors} def _short_addr(value): value = str(value or "") if len(value) <= 12: return value return value[:6] + "..." + value[-4:] def _is_evm_address(value): text = str(value or "").strip() if len(text) != 42 or not text.startswith("0x"): return False try: int(text[2:], 16) return True except Exception: return False def ingest_normalized_events(events): """Test/integration helper for provider adapters.""" init_db() init_onchain_tables() inserted = [] for event in events or []: eid = insert_onchain_event(event) if eid: item = dict(event) item["id"] = eid inserted.append(item) queued = enqueue_onchain_candidates() return {"inserted": len(inserted), "queued": queued.get("queued", 0), "events": inserted, "candidate_result": queued} def _enabled_onchain_providers(cfg): raw = str(cfg.get("provider") or "nodereal").strip().lower() requested = [p.strip() for p in raw.split(",") if p.strip()] if any(p in {"all", "multi", "both"} for p in requested): requested = ["nodereal", "alchemy"] providers = [] if "nodereal" in requested and cfg.get("nodereal_enabled", True): providers.append("nodereal") if "alchemy" in requested and cfg.get("alchemy_enabled", False): providers.append("alchemy") return providers or ["nodereal"] def _candidate_title(event): label = event.get("signal_label") or signal_label(event.get("signal_code")) value = _safe_float(event.get("value_usd")) value_txt = f" · ${value:,.0f}" if value > 0 else "" return f"链上异动 {event.get('symbol')}: {label}{value_txt}" def enqueue_onchain_candidates(min_score=None, min_confidence=None, cooldown_hours=None, limit=20): cfg = get_onchain_params() if not cfg.get("candidate_enabled", True): return {"queued": 0, "skipped": 0, "symbols": [], "reason": "candidate_disabled"} min_score = cfg.get("candidate_min_score", 70) if min_score is None else min_score min_confidence = cfg.get("candidate_min_confidence", 70) if min_confidence is None else min_confidence cooldown_hours = cfg.get("candidate_cooldown_hours", 6) if cooldown_hours is None else cooldown_hours init_onchain_tables() init_event_tables() cutoff = (_now() - timedelta(hours=24)).isoformat() conn = get_conn() queued = [] skipped_ids = [] errors = [] try: rows = conn.execute( """ SELECT e.*, COALESCE(( SELECT m.onchain_score FROM onchain_token_metrics m WHERE m.symbol=e.symbol AND m.chain=e.chain ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1 ), 0) AS latest_onchain_score, COALESCE(( SELECT m.risk_score FROM onchain_token_metrics m WHERE m.symbol=e.symbol AND m.chain=e.chain ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1 ), 0) AS latest_risk_score FROM onchain_events e WHERE e.status IN ('new', 'candidate_failed') AND e.detected_at >= %s AND e.direction='positive' ORDER BY e.confidence DESC, e.value_usd DESC, e.detected_at::timestamp DESC LIMIT %s """, (cutoff, int(limit or 20)), ).fetchall() now = _now().isoformat(timespec="seconds") cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat() for row in rows: event = dict(row) try: symbol = normalize_symbol(event.get("symbol")) if not symbol or not _tradable_symbol(symbol): skipped_ids.append(event["id"]) continue score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence"))) if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0): continue recent = conn.execute( """ SELECT id FROM event_news WHERE source='onchain' AND symbol=%s AND detected_at >= %s LIMIT 1 """, (symbol, cooldown_cutoff), ).fetchone() if recent: skipped_ids.append(event["id"]) continue title = _candidate_title(event) h = event_hash("onchain", title, symbol) inserted = conn.execute( """ INSERT INTO event_news (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0) ON CONFLICT(event_hash) DO NOTHING RETURNING id """, ( h, symbol, title, event.get("url") or "", event.get("detected_at") or now, now, event.get("severity") or "A", json.dumps( { "onchain_event_id": event.get("id"), "chain": event.get("chain"), "signal_code": event.get("signal_code"), "signal_label": event.get("signal_label"), "confidence": event.get("confidence"), "value_usd": event.get("value_usd"), "onchain_score": event.get("latest_onchain_score"), "risk_score": event.get("latest_risk_score"), }, ensure_ascii=False, ), ), ).fetchone() if inserted: conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),)) queued.append(symbol) else: skipped_ids.append(event["id"]) conn.commit() except Exception as exc: conn.rollback() symbol = normalize_symbol(event.get("symbol")) errors.append(f"{symbol}:candidate_enqueue:{str(exc)[:160]}") skipped_ids.append(event["id"]) if skipped_ids: conn.execute( "UPDATE onchain_events SET status='candidate_skipped' WHERE id IN (" + ",".join(["%s"] * len(skipped_ids)) + ")", tuple(skipped_ids), ) conn.commit() return {"queued": len(queued), "skipped": len(skipped_ids), "symbols": queued, "errors": errors} except Exception: conn.rollback() raise finally: conn.close() def run_once(limit=60): started = _now() init_db() init_onchain_tables() cfg = get_onchain_params() output = { "status": "disabled" if not cfg.get("enabled") else "processed", "metrics_count": 0, "events_count": 0, "raw_events_count": 0, "candidate_queued": 0, "errors": [], "check_time": _now().isoformat(), } if cfg.get("enabled"): provider_results = {} for provider in _enabled_onchain_providers(cfg): if provider == "alchemy": node = fetch_alchemy_events(limit=limit) else: node = fetch_nodereal_events(limit=limit) provider_results[provider] = { "metrics_count": len(node.get("metrics") or []), "events_count": len(node.get("events") or []), "raw_events_count": len(node.get("raw_events") or []), "diagnostics": node.get("diagnostics") or {}, } output["metrics_count"] += len(node.get("metrics") or []) output["events_count"] += len(node.get("events") or []) output["raw_events_count"] += len(node.get("raw_events") or []) output["errors"].extend(node.get("errors") or []) output["provider_results"] = provider_results output["discovered_mappings"] = 0 if output.get("discovered_mappings"): output["status"] = "bootstrapped" output["metrics_count"] = 0 output["events_count"] = 0 output["raw_events_count"] = 0 for provider in _enabled_onchain_providers(cfg): node = fetch_alchemy_events(limit=limit) if provider == "alchemy" else fetch_nodereal_events(limit=limit) output["metrics_count"] += len(node.get("metrics") or []) output["events_count"] += len(node.get("events") or []) output["raw_events_count"] += len(node.get("raw_events") or []) output["errors"].extend(node.get("errors") or []) queued = enqueue_onchain_candidates() output["candidate_queued"] = queued.get("queued", 0) output["candidate_symbols"] = queued.get("symbols", []) output["errors"].extend(queued.get("errors") or []) if not output["metrics_count"] and not output["events_count"] and not output["raw_events_count"]: output["status"] = "no_onchain_data" log_cron_run( job_name="链上", script_name="onchain_monitor.py", run_status="success" if not output["errors"] else "error", result_status=output["status"], started_at=started.isoformat(), finished_at=_now().isoformat(), duration_ms=int((_now() - started).total_seconds() * 1000), summary={ "metrics_count": output["metrics_count"], "events_count": output["events_count"], "raw_events_count": output["raw_events_count"], "candidate_queued": output["candidate_queued"], "enabled": cfg.get("enabled"), }, error_message="; ".join(output["errors"][:5]), ) print(json.dumps(output, ensure_ascii=False, indent=2, default=str)) return output __all__ = [ "POSITIVE_SIGNALS", "RISK_SIGNALS", "enqueue_onchain_candidates", "fetch_alchemy_events", "fetch_nodereal_events", "get_onchain_params", "ingest_normalized_events", "run_once", "seed_configured_token_mappings", ]