"""On-chain signal collector and candidate bridge. V1 deliberately treats on-chain data as a discovery/risk layer. It writes normalized events/metrics and may request a technical check through event_news, but it never creates recommendations or changes recommendation state directly. """ import json import os from datetime import datetime, timedelta from urllib.parse import urlencode import requests from app.config.system_config import onchain_config from app.db import onchain_db from app.db.altcoin_db import get_conn, init_db, log_cron_run from app.db.tracking_queries import get_latest_price_cache from app.db.onchain_db import ( MIN_MAPPING_CONFIDENCE, POSITIVE_SIGNALS, RISK_SIGNALS, find_mapping_by_contract, get_token_mappings, init_onchain_tables, insert_onchain_event, insert_onchain_raw_event, insert_token_metric, normalize_symbol, signal_direction, signal_label, ) from app.services.event_driven_screener import _event_hash as event_hash from app.services.event_driven_screener import _tradable_symbol, init_event_tables from app.services.nodereal_client import DEFAULT_CHAIN_ENDPOINTS, NodeRealClient, NodeRealConfig DEFAULT_CHAINS = ("ethereum", "bsc") ETHERSCAN_CHAIN_IDS = { "ethereum": "1", "bsc": "56", "base": "8453", "arbitrum": "42161", } SOLANA_AUTO_ALLOWLIST = { "WIF", "BONK", "JUP", "RAY", "PYTH", "PENGU", "JTO", "MEW", "POPCAT", "PNUT", "FARTCOIN", "RENDER", "HNT", "MOBILE", "ORCA", "KMNO", "DRIFT", "TNSR", "IO", } NON_TARGET_NATIVE_BASES = { "AVAX", "FIL", "SUI", "APT", "DOT", "ADA", "XRP", "LTC", "BCH", "ATOM", "NEAR", "SEI", "INJ", "TON", "ETC", "ICP", "HBAR", "ALGO", "VET", "TRX", "XLM", "KAS", "TIA", "EGLD", "FLOW", "KAVA", "MINA", "IOTA", "XMR", "DASH", "ZEC", } BRIDGED_TOKEN_MARKERS = ( "wrapped", "wormhole", "portal", "bridged", "bridge", "axelar", "allbridge", "binance-peg", "multichain", "layerzero", "lz", "wavax", "wfil", ) DEX_CHAIN_ALIASES = { "ethereum": "ethereum", "eth": "ethereum", "bsc": "bsc", "bnb": "bsc", "base": "base", "arbitrum": "arbitrum", "arb": "arbitrum", "solana": "solana", "sol": "solana", } DEXSCREENER_RAW_ENDPOINTS = ( ("token_profile_latest", "https://api.dexscreener.com/token-profiles/latest/v1"), ("token_boost_latest", "https://api.dexscreener.com/token-boosts/latest/v1"), ("token_boost_top", "https://api.dexscreener.com/token-boosts/top/v1"), ) TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" def _env_bool(name, default=False): value = os.getenv(name) if value is None: return default return str(value).strip().lower() in ("1", "true", "yes", "on") def _env_int(name, default): try: return int(os.getenv(name, str(default)) or default) except Exception: return default def _env_float(name, default): try: return float(os.getenv(name, str(default)) or default) except Exception: return default def get_onchain_params(): """Runtime provider config. Keep this out of rules.yaml.""" cfg = onchain_config(DEFAULT_CHAINS) chains_raw = cfg.get("chains") or list(DEFAULT_CHAINS) if isinstance(chains_raw, str): chains = [x.strip().lower() for x in chains_raw.split(",") if x.strip()] else: chains = [str(x).strip().lower() for x in chains_raw if str(x).strip()] etherscan_env = str(cfg.get("etherscan_api_key_env") or "ALPHAX_ETHERSCAN_API_KEY") helius_env = str(cfg.get("helius_api_key_env") or "ALPHAX_HELIUS_API_KEY") nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY") token_mappings_env = str(cfg.get("token_mappings_env") or "ALPHAX_ONCHAIN_TOKEN_MAPPINGS") etherscan_chains_raw = cfg.get("etherscan_chains") or ["ethereum"] if isinstance(etherscan_chains_raw, str): etherscan_chains = [x.strip().lower() for x in etherscan_chains_raw.split(",") if x.strip()] else: etherscan_chains = [str(x).strip().lower() for x in etherscan_chains_raw if str(x).strip()] return { "enabled": bool(cfg.get("enabled", False)), "provider": str(cfg.get("provider") or "nodereal").strip().lower(), "chains": chains or list(DEFAULT_CHAINS), "timeout": int(cfg.get("timeout") or 15), "nodereal_enabled": bool(cfg.get("nodereal_enabled", True)), "nodereal_chains": _normalize_chain_list(cfg.get("nodereal_chains") or ("ethereum", "bsc")), "nodereal_api_key": os.getenv(nodereal_env, "").strip(), "nodereal_api_key_env": nodereal_env, "token_mappings": _load_token_mappings(cfg.get("token_mappings"), os.getenv(token_mappings_env, "")), "token_mappings_env": token_mappings_env, "nodereal_log_block_lookback": int(cfg.get("nodereal_log_block_lookback") or 120), "nodereal_max_logs_per_token": int(cfg.get("nodereal_max_logs_per_token") or 25), "nodereal_raw_transfer_enabled": bool(cfg.get("nodereal_raw_transfer_enabled", True)), "nodereal_raw_block_lookback": int(cfg.get("nodereal_raw_block_lookback") or 1), "nodereal_raw_max_logs_per_chain": int(cfg.get("nodereal_raw_max_logs_per_chain") or 30), "candidate_enabled": bool(cfg.get("candidate_enabled", True)), "candidate_min_score": float(cfg.get("candidate_min_score") or 70), "candidate_min_confidence": int(cfg.get("candidate_min_confidence") or 70), "candidate_cooldown_hours": float(cfg.get("candidate_cooldown_hours") or 6), "dexscreener_enabled": bool(cfg.get("dexscreener_enabled", True)), "etherscan_enabled": bool(cfg.get("etherscan_enabled", True)), "etherscan_chains": etherscan_chains or ["ethereum"], "helius_enabled": bool(cfg.get("helius_enabled", True)), "dex_volume_spike_pct": float(cfg.get("dex_volume_spike_pct") or 80), "dex_min_liquidity_usd": float(cfg.get("dex_min_liquidity_usd") or 100000), "dex_min_volume_24h_usd": float(cfg.get("dex_min_volume_24h_usd") or 100000), "liquidity_add_pct": float(cfg.get("liquidity_add_pct") or 25), "liquidity_remove_pct": float(cfg.get("liquidity_remove_pct") or -25), "dex_hour_volume_share_pct": float(cfg.get("dex_hour_volume_share_pct") or 8), "dex_min_hour_volume_usd": float(cfg.get("dex_min_hour_volume_usd") or 50000), "whale_tx_usd": float(cfg.get("whale_tx_usd") or 250000), "etherscan_base_url": str(cfg.get("etherscan_base_url") or "https://api.etherscan.io/v2/api").strip(), "helius_base_url": str(cfg.get("helius_base_url") or "https://api.helius.xyz").strip().rstrip("/"), "etherscan_api_key": os.getenv(etherscan_env, "").strip(), "helius_api_key": os.getenv(helius_env, "").strip(), } def _normalize_chain_list(value): if isinstance(value, str): return [x.strip().lower() for x in value.split(",") if x.strip()] return [str(x).strip().lower() for x in (value or []) if str(x).strip()] def _load_token_mappings(config_value=None, env_value=""): items = [] if isinstance(config_value, list): items.extend(config_value) if env_value: try: parsed = json.loads(env_value) if isinstance(parsed, list): items.extend(parsed) except Exception: for part in str(env_value or "").split(","): bits = [x.strip() for x in part.split(":")] if len(bits) >= 3: items.append({"symbol": bits[0], "chain": bits[1], "contract_address": bits[2]}) normalized = [] seen = set() for item in items: if not isinstance(item, dict): continue symbol = normalize_symbol(item.get("symbol")) chain = str(item.get("chain") or "").lower().strip() contract = str(item.get("contract_address") or item.get("address") or "").strip() if not symbol or not chain or not contract: continue key = (symbol, chain, contract.lower()) if key in seen: continue seen.add(key) normalized.append({ "symbol": symbol, "chain": chain, "contract_address": contract, "source": item.get("source") or "nodereal_seed", "confidence": int(item.get("confidence") or 95), "raw": item.get("raw") or {}, }) return normalized def seed_configured_token_mappings(cfg=None): cfg = cfg or get_onchain_params() seeded = [] errors = [] for item in cfg.get("token_mappings") or []: try: mapping_id = onchain_db.upsert_token_mapping( item["symbol"], item["chain"], item["contract_address"], source=item.get("source") or "nodereal_seed", confidence=item.get("confidence") or 95, raw=item.get("raw") or {}, is_active=True, ) if mapping_id: seeded.append(item) except Exception as exc: errors.append(f"{item.get('symbol')}:seed_mapping:{str(exc)[:160]}") return {"seeded": len(seeded), "items": seeded, "errors": errors} def _now(): return datetime.now() def _request_json(url, params=None, timeout=15): resp = requests.get(url, params=params or {}, timeout=timeout, headers={"User-Agent": "AlphaX-Agent-Crypto/1.0"}) if resp.status_code >= 400: raise RuntimeError(f"http_{resp.status_code}:{resp.text[:200]}") return resp.json() def _safe_float(value, default=0.0): try: return float(value or 0) except Exception: return default def _safe_pct_change(new_value, old_value): new_value = _safe_float(new_value) old_value = _safe_float(old_value) if old_value <= 0: return 0.0 return (new_value - old_value) / old_value * 100 def _safe_int(value, default=0): try: return int(float(value or 0)) except Exception: return default def _chain_alias(value): key = str(value or "").lower() return DEX_CHAIN_ALIASES.get(key, key) def _chain_explorer_tx_url(chain, tx_hash): tx_hash = str(tx_hash or "").strip() if not tx_hash: return "" if chain == "ethereum": return f"https://etherscan.io/tx/{tx_hash}" if chain == "bsc": return f"https://bscscan.com/tx/{tx_hash}" if chain == "base": return f"https://basescan.org/tx/{tx_hash}" if chain == "arbitrum": return f"https://arbiscan.io/tx/{tx_hash}" if chain == "solana": return f"https://solscan.io/tx/{tx_hash}" return "" def _latest_metric(symbol, chain, contract_address): conn = get_conn() row = conn.execute( """ SELECT * FROM onchain_token_metrics WHERE symbol=%s AND chain=%s AND contract_address=%s AND "window"='1h' ORDER BY metric_time DESC, id DESC LIMIT 1 """, (symbol, chain, contract_address or ""), ).fetchone() conn.close() return dict(row) if row else None def _event_amount(item): return _safe_float(item.get("amount")) def _event_total_amount(item): return _safe_float(item.get("totalAmount") or item.get("total_amount")) def _raw_importance(event_type, item): amount = _event_amount(item) total = _event_total_amount(item) if event_type == "token_boost_top": return max(total, amount, 1) if event_type == "token_boost_latest": return max(amount, total * 0.5, 1) return 1 def normalize_dexscreener_raw_event(item, event_type, cfg=None): cfg = cfg or get_onchain_params() chain = _chain_alias(item.get("chainId")) if chain not in set(cfg.get("chains") or DEFAULT_CHAINS): return None token_address = str(item.get("tokenAddress") or "").strip() if not token_address: return None mapping = find_mapping_by_contract(chain, token_address) links = item.get("links") or [] symbol_guess = "" name = "" if isinstance(links, list): for link in links: if not isinstance(link, dict): continue if not name and link.get("label"): name = str(link.get("label") or "") raw = { "chainId": item.get("chainId"), "tokenAddress": token_address, "url": item.get("url") or "", "description": item.get("description") or "", "icon": item.get("icon") or "", "header": item.get("header") or "", "links": links, "amount": item.get("amount"), "totalAmount": item.get("totalAmount"), } title = "DEX Screener" if event_type == "token_profile_latest": title = "DEX 新币资料变更" elif event_type == "token_boost_latest": title = "DEX 付费曝光新增" elif event_type == "token_boost_top": title = "DEX 付费曝光榜" return { "source": "dexscreener", "chain": chain, "event_type": event_type, "token_address": token_address, "symbol_guess": symbol_guess, "name": name, "title": title, "description": item.get("description") or "", "url": item.get("url") or "", "icon": item.get("icon") or "", "amount": _event_amount(item), "total_amount": _event_total_amount(item), "importance": _raw_importance(event_type, item), "mapped_symbol": mapping.get("symbol") if mapping else "", "mapping_status": "mapped" if mapping else "unmapped", "detected_at": _now().isoformat(timespec="seconds"), "raw": raw, } def fetch_dexscreener_raw_events(limit=80): cfg = get_onchain_params() if not cfg.get("dexscreener_enabled", True): return {"raw_events": [], "errors": ["dexscreener_disabled"]} inserted = [] errors = [] per_source_limit = max(1, int(limit or 80)) for event_type, url in DEXSCREENER_RAW_ENDPOINTS: try: data = _request_json(url, timeout=cfg.get("timeout", 15)) items = data if isinstance(data, list) else data.get("items") or data.get("data") or [] for item in items[:per_source_limit]: if not isinstance(item, dict): continue event = normalize_dexscreener_raw_event(item, event_type, cfg=cfg) if not event: continue if insert_onchain_raw_event(event): inserted.append(event) except Exception as exc: errors.append(f"{event_type}:{str(exc)[:160]}") return {"raw_events": inserted, "errors": errors} def _discover_seed_symbols(limit=120): conn = get_conn() symbols = [] try: rows = conn.execute( """ SELECT DISTINCT symbol FROM recommendation WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history' ORDER BY rec_time DESC LIMIT %s """, (int(limit or 120),), ).fetchall() symbols.extend([row["symbol"] for row in rows if row["symbol"]]) except Exception: pass try: rows = conn.execute( """ SELECT DISTINCT symbol FROM coin_state WHERE state != '过期' ORDER BY detected_at DESC LIMIT %s """, (int(limit or 120),), ).fetchall() symbols.extend([row["symbol"] for row in rows if row["symbol"]]) except Exception: pass conn.close() seen = set() ordered = [] for symbol in symbols: norm = normalize_symbol(symbol) if not norm or norm in seen or not _tradable_symbol(norm): continue seen.add(norm) ordered.append(norm) return ordered[: int(limit or 120)] def _score_pair_candidate(pair, requested_symbol, chains): base = (pair.get("baseToken") or {}) quote = (pair.get("quoteToken") or {}) base_symbol = str(base.get("symbol") or "").upper() req_base = str(requested_symbol or "").split("/")[0].upper() liquidity = _safe_float((pair.get("liquidity") or {}).get("usd")) volume = _safe_float((pair.get("volume") or {}).get("h24")) chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower()) score = 0 if base_symbol == req_base: score += 50 if chain in set(chains or []): score += 15 if quote.get("symbol") in ("USDT", "USDC", "USD", "FDUSD", "USDE", "DAI", "USDS"): score += 10 if liquidity >= 100000: score += 10 if volume >= 100000: score += 10 if liquidity >= 500000: score += 5 return score def _pair_rejection_reason(pair, requested_symbol, chains): base = pair.get("baseToken") or {} quote = pair.get("quoteToken") or {} req_base = str(requested_symbol or "").split("/")[0].upper() base_symbol = str(base.get("symbol") or "").upper() base_name = str(base.get("name") or "").lower() pair_url = str(pair.get("url") or "").lower() chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower()) if base_symbol != req_base: return "symbol_mismatch" if chain not in set(chains or []): return "chain_not_supported" if req_base in NON_TARGET_NATIVE_BASES: return "native_chain_not_in_scope" if chain == "solana" and req_base not in SOLANA_AUTO_ALLOWLIST: return "solana_not_allowlisted" text = " ".join([base_name, base_symbol.lower(), str(quote.get("symbol") or "").lower(), pair_url]) if any(marker in text for marker in BRIDGED_TOKEN_MARKERS): return "bridged_or_wrapped_token" return "" def discover_token_mappings(limit=60): cfg = get_onchain_params() chains = set(cfg.get("chains") or DEFAULT_CHAINS) seeds = _discover_seed_symbols(limit=limit) if not seeds: return {"inserted": 0, "candidates": [], "errors": ["no_seed_symbols"]} inserted = [] errors = [] for symbol in seeds: existing = get_token_mappings(symbol, min_confidence=1, active_only=False) if existing: continue base = symbol.split("/")[0] try: data = _request_json("https://api.dexscreener.com/latest/dex/search", params={"q": base}, timeout=cfg.get("timeout", 15)) pairs = data.get("pairs") or [] pair_candidates = [] for pair in pairs: chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower()) if chain not in chains: continue if _pair_rejection_reason(pair, symbol, chains): continue pair_candidates.append((pair, _score_pair_candidate(pair, symbol, chains))) if not pair_candidates: continue pair_candidates.sort(key=lambda x: (x[1], _safe_float((x[0].get("liquidity") or {}).get("usd")), _safe_float((x[0].get("volume") or {}).get("h24"))), reverse=True) best, score = pair_candidates[0] if score < 55: continue base_token = best.get("baseToken") or {} chain = DEX_CHAIN_ALIASES.get(str(best.get("chainId") or "").lower(), str(best.get("chainId") or "").lower()) contract = str(base_token.get("address") or "").strip() if not contract: continue confidence = min(95, 60 + score) mapping_id = onchain_db.upsert_token_mapping( symbol=symbol, chain=chain, contract_address=contract, source="dexscreener_search", confidence=confidence, raw={ "search_query": base, "matched_pair": { "pairAddress": best.get("pairAddress") or "", "dexId": best.get("dexId") or "", "url": best.get("url") or "", "liquidity": best.get("liquidity") or {}, "volume": best.get("volume") or {}, "priceChange": best.get("priceChange") or {}, "baseToken": base_token, "quoteToken": best.get("quoteToken") or {}, }, }, is_active=True, ) if mapping_id: inserted.append({"symbol": symbol, "chain": chain, "contract_address": contract, "confidence": confidence}) except Exception as exc: errors.append(f"{symbol}:{str(exc)[:160]}") return {"inserted": len(inserted), "candidates": inserted, "errors": errors} def _score_metric(metric): score = 0.0 risk = 0.0 vol_change = _safe_float(metric.get("dex_volume_change_pct")) liq_change = _safe_float(metric.get("liquidity_change_pct")) netflow = _safe_float(metric.get("exchange_netflow_usd")) whale = _safe_float(metric.get("whale_accumulation_usd")) smart = _safe_float(metric.get("smart_money_score")) if vol_change > 0: score += min(35, vol_change / 4) if liq_change > 0: score += min(20, liq_change / 3) if netflow < 0: score += min(20, abs(netflow) / 100000) if whale > 0: score += min(20, whale / 100000) score += min(20, smart) if liq_change < 0: risk += min(40, abs(liq_change)) if netflow > 0: risk += min(35, netflow / 100000) metric["onchain_score"] = round(min(score, 100), 2) metric["risk_score"] = round(min(risk, 100), 2) return metric def derive_dex_signals(metric, cfg=None): cfg = cfg or get_onchain_params() signals = [] vol_change = _safe_float(metric.get("dex_volume_change_pct")) liq_change = _safe_float(metric.get("liquidity_change_pct")) volume_1h = _safe_float(metric.get("dex_volume_1h_usd")) volume_24h = _safe_float(metric.get("dex_volume_usd")) hour_share_pct = (volume_1h / volume_24h * 100) if volume_24h > 0 else 0 if vol_change >= cfg.get("dex_volume_spike_pct", 80): signals.append("dex_volume_spike") elif ( volume_1h >= cfg.get("dex_min_hour_volume_usd", 50000) and hour_share_pct >= cfg.get("dex_hour_volume_share_pct", 8) ): signals.append("dex_volume_spike") if liq_change >= cfg.get("liquidity_add_pct", 25): signals.append("liquidity_add") if liq_change <= cfg.get("liquidity_remove_pct", -25): signals.append("liquidity_remove_risk") return signals def _event_from_metric(metric, signal_code, source="dexscreener"): direction = signal_direction(signal_code) severity = "RISK" if direction == "risk" else "A" if _safe_float(metric.get("onchain_score")) >= 75 else "B" return { "chain": metric.get("chain"), "symbol": metric.get("symbol"), "contract_address": metric.get("contract_address") or "", "event_type": "onchain_signal", "signal_code": signal_code, "signal_label": signal_label(signal_code), "direction": direction, "value_usd": metric.get("dex_volume_usd") or metric.get("whale_accumulation_usd") or abs(metric.get("exchange_netflow_usd") or 0), "confidence": 75 if direction != "risk" else 80, "severity": severity, "detected_at": metric.get("metric_time") or _now().isoformat(), "source": source, "url": metric.get("url") or "", "raw": metric, } def normalize_dexscreener_pair(pair, mapping, cfg=None): cfg = cfg or get_onchain_params() symbol = normalize_symbol(mapping.get("symbol")) chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or mapping.get("chain") or "").lower(), str(mapping.get("chain") or "").lower()) contract = mapping.get("contract_address") or (pair.get("baseToken") or {}).get("address") or "" liquidity = _safe_float((pair.get("liquidity") or {}).get("usd")) volume_map = pair.get("volume") or {} volume = _safe_float(volume_map.get("h24")) volume_1h = _safe_float(volume_map.get("h1")) volume_5m = _safe_float(volume_map.get("m5")) volume_6h = _safe_float(volume_map.get("h6")) txns_map = pair.get("txns") or {} txns_h1 = txns_map.get("h1") or {} prev = _latest_metric(symbol, chain, contract) prev_volume = _safe_float(prev.get("dex_volume_usd") if prev else 0) prev_liquidity = _safe_float(prev.get("liquidity_usd") if prev else 0) metric = { "symbol": symbol, "chain": chain, "contract_address": contract, "window": "1h", "metric_time": _now().isoformat(timespec="seconds"), "dex_volume_usd": volume, "dex_volume_1h_usd": volume_1h, "dex_volume_5m_usd": volume_5m, "dex_volume_6h_usd": volume_6h, "dex_volume_1h_share_pct": round(volume_1h / volume * 100, 2) if volume > 0 else 0, "dex_volume_change_pct": _safe_pct_change(volume, prev_volume), "liquidity_usd": liquidity, "liquidity_change_pct": _safe_pct_change(liquidity, prev_liquidity), "exchange_netflow_usd": 0, "whale_accumulation_usd": 0, "holder_delta": 0, "smart_money_score": 0, "source": "dexscreener", "url": pair.get("url") or "", "raw": { "pair_address": pair.get("pairAddress") or "", "dex_id": pair.get("dexId") or "", "price_usd": pair.get("priceUsd") or "", "fdv": pair.get("fdv") or 0, "txns": pair.get("txns") or {}, "price_change": pair.get("priceChange") or {}, "volume": pair.get("volume") or {}, "liquidity": pair.get("liquidity") or {}, "derived": { "dex_volume_1h_usd": volume_1h, "dex_volume_5m_usd": volume_5m, "dex_volume_6h_usd": volume_6h, "dex_volume_1h_share_pct": round(volume_1h / volume * 100, 2) if volume > 0 else 0, "h1_buys": int(txns_h1.get("buys") or 0), "h1_sells": int(txns_h1.get("sells") or 0), }, }, } return _score_metric(metric) def fetch_dexscreener_metrics(limit=60): cfg = get_onchain_params() if not cfg.get("dexscreener_enabled", True): return {"metrics": [], "events": [], "errors": ["dexscreener_disabled"]} mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) bootstrap = None if not mappings: bootstrap = discover_token_mappings(limit=limit) mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) metrics = [] events = [] errors = [] if bootstrap: errors.extend(bootstrap.get("errors") or []) for mapping in mappings[: int(limit or 60)]: symbol = normalize_symbol(mapping.get("symbol")) if not symbol or not _tradable_symbol(symbol): continue try: url = "https://api.dexscreener.com/latest/dex/tokens/" + str(mapping.get("contract_address") or "").strip() data = _request_json(url, timeout=cfg.get("timeout", 15)) pairs = data.get("pairs") or [] wanted_chain = DEX_CHAIN_ALIASES.get(str(mapping.get("chain") or "").lower(), str(mapping.get("chain") or "").lower()) pairs = [p for p in pairs if DEX_CHAIN_ALIASES.get(str(p.get("chainId") or "").lower(), str(p.get("chainId") or "").lower()) == wanted_chain] if not pairs: continue best = max(pairs, key=lambda p: _safe_float((p.get("liquidity") or {}).get("usd"))) metric = normalize_dexscreener_pair(best, mapping, cfg=cfg) if metric.get("liquidity_usd", 0) < cfg.get("dex_min_liquidity_usd", 100000) and metric.get("dex_volume_usd", 0) < cfg.get("dex_min_volume_24h_usd", 100000): insert_token_metric(metric) metrics.append(metric) continue insert_token_metric(metric) metrics.append(metric) for code in derive_dex_signals(metric, cfg): event = _event_from_metric(metric, code, source="dexscreener") if insert_onchain_event(event): events.append(event) except Exception as exc: errors.append(f"{symbol}:{str(exc)[:160]}") return {"metrics": metrics, "events": events, "errors": errors} def _event_from_etherscan_transfer(row, mapping, cfg=None): cfg = cfg or get_onchain_params() decimals = _safe_int(row.get("tokenDecimal"), 18) amount = _safe_float(row.get("value")) / (10 ** decimals if decimals >= 0 else 1) price_usd = _latest_price_from_metric(mapping) value_usd = amount * price_usd if price_usd > 0 else 0 threshold = _safe_float(cfg.get("whale_tx_usd"), 250000) if value_usd > 0 and value_usd < threshold: return None if value_usd <= 0 and amount <= 0: return None tx_hash = str(row.get("hash") or "").strip() chain = str(mapping.get("chain") or "").lower() return { "chain": chain, "symbol": mapping.get("symbol"), "contract_address": mapping.get("contract_address") or "", "event_type": "token_transfer", "signal_code": "whale_accumulation" if value_usd >= threshold else "large_token_transfer", "signal_label": signal_label("whale_accumulation" if value_usd >= threshold else "large_token_transfer"), "direction": "positive" if value_usd >= threshold else "neutral", "value_usd": value_usd, "amount": amount, "tx_hash": tx_hash, "wallet_address": row.get("to") or "", "wallet_label": "EVM 接收地址", "counterparty_label": "EVM 发送地址 " + _short_addr(row.get("from") or ""), "confidence": 74 if value_usd >= threshold else 58, "severity": "A" if value_usd >= threshold else "B", "detected_at": _ts_to_iso(row.get("timeStamp")), "source": "etherscan", "url": _chain_explorer_tx_url(chain, tx_hash), "raw": row, } def _latest_price_from_metric(mapping): symbol = normalize_symbol(mapping.get("symbol")) chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "") conn = get_conn() try: rows = conn.execute( """ SELECT raw_json FROM onchain_token_metrics WHERE symbol=%s AND chain=%s AND contract_address=%s ORDER BY metric_time DESC, id DESC LIMIT 8 """, (symbol, chain, contract), ).fetchall() finally: conn.close() for row in rows: try: raw = json.loads(row.get("raw_json") or "{}") except Exception: raw = {} price = _safe_float(raw.get("price_usd")) if price > 0: return price cache = get_latest_price_cache([symbol]) item = cache.get(symbol) or {} return _safe_float(item.get("price")) def _hex_to_int(value): text = str(value or "").strip() if not text: return 0 try: return int(text, 16) if text.startswith("0x") else int(text) except Exception: return 0 def _topic_to_address(topic): topic = str(topic or "").lower() if topic.startswith("0x") and len(topic) >= 42: return "0x" + topic[-40:] return "" def _nodereal_client(cfg=None): cfg = cfg or get_onchain_params() return NodeRealClient( NodeRealConfig( api_key=cfg.get("nodereal_api_key") or "", timeout=int(cfg.get("timeout") or 15), endpoints=dict(DEFAULT_CHAIN_ENDPOINTS), ) ) def _event_from_nodereal_transfer(log, mapping, cfg=None): cfg = cfg or get_onchain_params() topics = log.get("topics") or [] if len(topics) < 3: return None amount_raw = _hex_to_int(log.get("data")) mapping_raw = {} try: mapping_raw = json.loads(mapping.get("raw_json") or "{}") except Exception: mapping_raw = {} decimals = _safe_int(mapping_raw.get("decimals") or mapping_raw.get("tokenDecimal") or 18, 18) amount = amount_raw / (10 ** decimals if decimals >= 0 else 1) price_usd = _latest_price_from_metric(mapping) value_usd = amount * price_usd if price_usd > 0 else 0 threshold = _safe_float(cfg.get("whale_tx_usd"), 250000) if value_usd <= 0 or value_usd < threshold: return None chain = str(mapping.get("chain") or "").lower() tx_hash = str(log.get("transactionHash") or "").strip() return { "chain": chain, "symbol": mapping.get("symbol"), "contract_address": mapping.get("contract_address") or "", "event_type": "token_transfer", "signal_code": "whale_accumulation", "signal_label": signal_label("whale_accumulation"), "direction": "positive", "value_usd": value_usd, "amount": amount, "tx_hash": tx_hash, "wallet_address": _topic_to_address(topics[2]), "wallet_label": "EVM 接收地址", "counterparty_label": "EVM 发送地址 " + _short_addr(_topic_to_address(topics[1])), "confidence": 76, "severity": "A", "detected_at": _now().isoformat(timespec="seconds"), "source": "nodereal", "url": _chain_explorer_tx_url(chain, tx_hash), "raw": log, } def _raw_event_from_nodereal_transfer(log, chain): topics = log.get("topics") or [] if len(topics) < 3: return None contract = str(log.get("address") or "").strip() tx_hash = str(log.get("transactionHash") or "").strip() amount_raw = _hex_to_int(log.get("data")) if not contract or amount_raw <= 0: return None from_addr = _topic_to_address(topics[1]) to_addr = _topic_to_address(topics[2]) return { "source": "nodereal", "chain": str(chain or "").lower(), "event_type": "evm_transfer", "token_address": contract, "symbol_guess": "", "name": "", "title": "NodeReal ERC-20 原始转账", "description": f"合约 {_short_addr(contract)} · {_short_addr(from_addr)} -> {_short_addr(to_addr)}", "url": _chain_explorer_tx_url(chain, tx_hash), "amount": amount_raw, "total_amount": 0, "importance": min(100, max(1, len(str(amount_raw)) * 4)), "mapped_symbol": "", "mapping_status": "unmapped", "detected_at": _now().isoformat(timespec="seconds"), "raw": log, } def _metric_from_nodereal_holder_count(holder_count, mapping): symbol = normalize_symbol(mapping.get("symbol")) chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "") prev = _latest_metric(symbol, chain, contract) prev_count = 0 if prev: try: prev_raw = json.loads(prev.get("raw_json") or "{}") prev_count = _safe_int(prev_raw.get("holder_count")) except Exception: prev_count = 0 holder_delta = holder_count - prev_count if prev_count > 0 else 0 metric = { "symbol": symbol, "chain": chain, "contract_address": contract, "window": "1h", "metric_time": _now().isoformat(timespec="seconds"), "holder_delta": holder_delta, "smart_money_score": 0, "source": "nodereal", "raw": { "holder_count": holder_count, "previous_holder_count": prev_count, }, } if holder_delta > 0: metric["onchain_score"] = min(30, holder_delta) elif holder_delta < 0: metric["risk_score"] = min(30, abs(holder_delta)) return metric def _event_from_holder_metric(metric): holder_delta = _safe_float(metric.get("holder_delta")) if holder_delta <= 0: return None if holder_delta < 20: return None return _event_from_metric(metric, "holder_growth", source="nodereal") def fetch_nodereal_events(limit=60): cfg = get_onchain_params() if not cfg.get("nodereal_enabled", True): return {"metrics": [], "events": [], "errors": ["nodereal_disabled"]} if not cfg.get("nodereal_api_key"): return {"metrics": [], "events": [], "errors": ["nodereal_api_key_missing"]} seed_result = seed_configured_token_mappings(cfg) client = _nodereal_client(cfg) raw_result = fetch_nodereal_raw_events(client=client, cfg=cfg, limit=limit) enabled_chains = set(cfg.get("nodereal_chains") or DEFAULT_CHAINS) all_mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) chain_mappings = [m for m in all_mappings if str(m.get("chain") or "").lower() in enabled_chains] mappings = [] unsupported_chains = set() for mapping in chain_mappings: chain = str(mapping.get("chain") or "").lower() if client.supports_chain(chain): mappings.append(mapping) else: unsupported_chains.add(chain) metrics = [] events = [] errors = list(seed_result.get("errors") or []) + list(raw_result.get("errors") or []) diagnostics = { "seeded_mappings": seed_result.get("seeded", 0), "mapping_total": len(all_mappings), "chain_mapping_total": len(chain_mappings), "supported_mapping_total": len(mappings), "enabled_chains": sorted(enabled_chains), "unsupported_chains": sorted(unsupported_chains), } lookback = max(1, int(cfg.get("nodereal_log_block_lookback") or 120)) max_logs = max(1, int(cfg.get("nodereal_max_logs_per_token") or 25)) for mapping in mappings[: int(limit or 60)]: chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "").strip() if not contract: continue try: holder_count = client.token_holder_count(chain, contract) if holder_count: metric = _metric_from_nodereal_holder_count(holder_count, mapping) insert_token_metric(metric) metrics.append(metric) holder_event = _event_from_holder_metric(metric) if holder_event and insert_onchain_event(holder_event): events.append(holder_event) except Exception as exc: errors.append(f"{mapping.get('symbol')}:nodereal_holder:{str(exc)[:160]}") try: latest = client.block_number(chain) if latest <= 0: continue logs = client.get_logs( chain, { "address": contract, "fromBlock": hex(max(0, latest - lookback)), "toBlock": hex(latest), "topics": [TRANSFER_TOPIC], }, ) for log in logs[:max_logs]: if not isinstance(log, dict): continue event = _event_from_nodereal_transfer(log, mapping, cfg=cfg) if not event: continue if insert_onchain_event(event): events.append(event) except Exception as exc: errors.append(f"{mapping.get('symbol')}:nodereal_logs:{str(exc)[:160]}") if not all_mappings: diagnostics["mapping_note"] = "no_strategy_mappings_raw_events_only" elif not chain_mappings: diagnostics["mapping_note"] = "no_enabled_chain_mappings_raw_events_only" elif not mappings: diagnostics["mapping_note"] = "no_supported_mappings_raw_events_only" return { "metrics": metrics, "events": events, "raw_events": raw_result.get("raw_events") or [], "errors": errors, "diagnostics": diagnostics, } def fetch_nodereal_raw_events(client=None, cfg=None, limit=60): cfg = cfg or get_onchain_params() if not cfg.get("nodereal_raw_transfer_enabled", True): return {"raw_events": [], "errors": []} client = client or _nodereal_client(cfg) chains = [c for c in (cfg.get("nodereal_chains") or DEFAULT_CHAINS) if client.supports_chain(c)] lookback = max(0, min(12, int(cfg.get("nodereal_raw_block_lookback") or 1))) per_chain = max(1, min(int(cfg.get("nodereal_raw_max_logs_per_chain") or 30), int(limit or 60))) inserted = [] errors = [] for chain in chains: try: latest = client.block_number(chain) if latest <= 0: continue logs = client.get_logs( chain, { "fromBlock": hex(max(0, latest - lookback)), "toBlock": hex(latest), "topics": [TRANSFER_TOPIC], }, ) raw_items = [] for log in logs: if not isinstance(log, dict): continue item = _raw_event_from_nodereal_transfer(log, chain) if item: raw_items.append(item) raw_items.sort(key=lambda item: item.get("amount") or 0, reverse=True) for item in raw_items[:per_chain]: if insert_onchain_raw_event(item): inserted.append(item) except Exception as exc: errors.append(f"{chain}:nodereal_raw_logs:{str(exc)[:160]}") return {"raw_events": inserted, "errors": errors} def fetch_etherscan_events(limit=60): cfg = get_onchain_params() if not cfg.get("etherscan_enabled", True): return {"events": [], "errors": ["etherscan_disabled"]} api_key = cfg.get("etherscan_api_key") or "" if not api_key: return {"events": [], "errors": ["etherscan_api_key_missing"]} allowed_chains = set(cfg.get("etherscan_chains") or ["ethereum"]) mappings = [ m for m in get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) if m.get("chain") in ETHERSCAN_CHAIN_IDS and m.get("chain") in allowed_chains ] events = [] errors = [] for mapping in mappings[: int(limit or 60)]: chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "").strip() if not contract: continue params = { "chainid": ETHERSCAN_CHAIN_IDS[chain], "module": "account", "action": "tokentx", "contractaddress": contract, "page": 1, "offset": 25, "sort": "desc", "apikey": api_key, } try: data = _request_json(cfg.get("etherscan_base_url"), params=params, timeout=cfg.get("timeout", 15)) status = str(data.get("status") or "") message = str(data.get("message") or "") rows = data.get("result") or [] if status == "0" and not isinstance(rows, list): if "No transactions found" not in str(rows) and "No records" not in str(rows): errors.append(f"{mapping.get('symbol')}:etherscan_{message}:{str(rows)[:100]}") continue if not isinstance(rows, list): continue for row in rows: if not isinstance(row, dict): continue event = _event_from_etherscan_transfer(row, mapping, cfg=cfg) if not event: continue if insert_onchain_event(event): events.append(event) except Exception as exc: errors.append(f"{mapping.get('symbol')}:etherscan:{str(exc)[:160]}") return {"events": events, "errors": errors} def _event_from_helius_tx(tx, mapping, cfg=None): cfg = cfg or get_onchain_params() mint = str(mapping.get("contract_address") or "") symbol = normalize_symbol(mapping.get("symbol")) signature = str(tx.get("signature") or "") token_transfers = tx.get("tokenTransfers") or [] native_transfers = tx.get("nativeTransfers") or [] matched = [t for t in token_transfers if str(t.get("mint") or "") == mint] if not matched: return None amount = max(_safe_float(t.get("tokenAmount")) for t in matched) price_usd = _latest_price_from_metric(mapping) value_usd = amount * price_usd if price_usd > 0 else 0 sol_amount = max([_safe_float(t.get("amount")) / 1_000_000_000 for t in native_transfers] or [0]) threshold = _safe_float(cfg.get("whale_tx_usd"), 250000) if value_usd > 0 and value_usd < threshold and sol_amount < 100: return None signal = "whale_accumulation" if value_usd >= threshold or sol_amount >= 100 else "large_token_transfer" return { "chain": "solana", "symbol": symbol, "contract_address": mint, "event_type": "solana_token_activity", "signal_code": signal, "signal_label": signal_label(signal), "direction": "positive" if signal == "whale_accumulation" else "neutral", "value_usd": value_usd, "amount": amount, "tx_hash": signature, "wallet_address": (matched[0].get("toUserAccount") or matched[0].get("userAccount") or ""), "wallet_label": "Solana 接收地址", "counterparty_label": "Solana 发送地址 " + _short_addr(matched[0].get("fromUserAccount") or ""), "confidence": 74 if signal == "whale_accumulation" else 58, "severity": "A" if signal == "whale_accumulation" else "B", "detected_at": _ts_to_iso(tx.get("timestamp")), "source": "helius", "url": _chain_explorer_tx_url("solana", signature), "raw": tx, } def fetch_helius_events(limit=60): cfg = get_onchain_params() if not cfg.get("helius_enabled", True): return {"events": [], "errors": ["helius_disabled"]} api_key = cfg.get("helius_api_key") or "" if not api_key: return {"events": [], "errors": ["helius_api_key_missing"]} mappings = [m for m in get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) if m.get("chain") == "solana"] events = [] errors = [] for mapping in mappings[: int(limit or 60)]: mint = str(mapping.get("contract_address") or "").strip() if not mint: continue query = urlencode({"api-key": api_key, "limit": 25}) url = f"{cfg.get('helius_base_url')}/v0/addresses/{mint}/transactions?{query}" try: data = _request_json(url, timeout=cfg.get("timeout", 15)) rows = data if isinstance(data, list) else data.get("transactions") or [] for tx in rows: if not isinstance(tx, dict): continue event = _event_from_helius_tx(tx, mapping, cfg=cfg) if not event: continue if insert_onchain_event(event): events.append(event) except Exception as exc: errors.append(f"{mapping.get('symbol')}:helius:{str(exc)[:160]}") return {"events": events, "errors": errors} def _ts_to_iso(value): try: if value: return datetime.fromtimestamp(float(value)).isoformat(timespec="seconds") except Exception: pass return _now().isoformat(timespec="seconds") def _short_addr(value): value = str(value or "") if len(value) <= 12: return value return value[:6] + "..." + value[-4:] def ingest_normalized_events(events): """Test/integration helper for provider adapters.""" init_db() init_onchain_tables() inserted = [] for event in events or []: eid = insert_onchain_event(event) if eid: item = dict(event) item["id"] = eid inserted.append(item) queued = enqueue_onchain_candidates() return {"inserted": len(inserted), "queued": queued.get("queued", 0), "events": inserted, "candidate_result": queued} def _candidate_title(event): label = event.get("signal_label") or signal_label(event.get("signal_code")) value = _safe_float(event.get("value_usd")) value_txt = f" · ${value:,.0f}" if value > 0 else "" return f"链上异动 {event.get('symbol')}: {label}{value_txt}" def enqueue_onchain_candidates(min_score=None, min_confidence=None, cooldown_hours=None, limit=20): cfg = get_onchain_params() if not cfg.get("candidate_enabled", True): return {"queued": 0, "skipped": 0, "symbols": [], "reason": "candidate_disabled"} min_score = cfg.get("candidate_min_score", 70) if min_score is None else min_score min_confidence = cfg.get("candidate_min_confidence", 70) if min_confidence is None else min_confidence cooldown_hours = cfg.get("candidate_cooldown_hours", 6) if cooldown_hours is None else cooldown_hours init_onchain_tables() init_event_tables() cutoff = (_now() - timedelta(hours=24)).isoformat() conn = get_conn() rows = conn.execute( """ SELECT e.*, COALESCE(( SELECT m.onchain_score FROM onchain_token_metrics m WHERE m.symbol=e.symbol AND m.chain=e.chain ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1 ), 0) AS latest_onchain_score, COALESCE(( SELECT m.risk_score FROM onchain_token_metrics m WHERE m.symbol=e.symbol AND m.chain=e.chain ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1 ), 0) AS latest_risk_score FROM onchain_events e WHERE e.status IN ('new', 'candidate_failed') AND e.detected_at >= %s AND e.direction='positive' ORDER BY e.confidence DESC, e.value_usd DESC, e.detected_at::timestamp DESC LIMIT %s """, (cutoff, int(limit or 20)), ).fetchall() queued = [] skipped_ids = [] now = _now().isoformat(timespec="seconds") cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat() for row in rows: event = dict(row) symbol = normalize_symbol(event.get("symbol")) if not symbol or not _tradable_symbol(symbol): skipped_ids.append(event["id"]) continue score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence"))) if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0): continue recent = conn.execute( """ SELECT id FROM event_news WHERE source='onchain' AND symbol=%s AND detected_at >= %s LIMIT 1 """, (symbol, cooldown_cutoff), ).fetchone() if recent: skipped_ids.append(event["id"]) continue title = _candidate_title(event) h = event_hash("onchain", title, symbol) try: conn.execute( """ INSERT INTO event_news (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0) """, ( h, symbol, title, event.get("url") or "", event.get("detected_at") or now, now, event.get("severity") or "A", json.dumps( { "onchain_event_id": event.get("id"), "chain": event.get("chain"), "signal_code": event.get("signal_code"), "signal_label": event.get("signal_label"), "confidence": event.get("confidence"), "value_usd": event.get("value_usd"), "onchain_score": event.get("latest_onchain_score"), "risk_score": event.get("latest_risk_score"), }, ensure_ascii=False, ), ), ) conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),)) queued.append(symbol) except Exception: skipped_ids.append(event["id"]) if skipped_ids: conn.execute( "UPDATE onchain_events SET status='candidate_skipped' WHERE id IN (" + ",".join(["%s"] * len(skipped_ids)) + ")", tuple(skipped_ids), ) conn.commit() conn.close() return {"queued": len(queued), "skipped": len(skipped_ids), "symbols": queued} def run_once(limit=60): started = _now() init_db() init_onchain_tables() cfg = get_onchain_params() output = { "status": "disabled" if not cfg.get("enabled") else "processed", "metrics_count": 0, "events_count": 0, "raw_events_count": 0, "candidate_queued": 0, "errors": [], "check_time": _now().isoformat(), } if cfg.get("enabled"): node = fetch_nodereal_events(limit=limit) output["metrics_count"] += len(node.get("metrics") or []) output["events_count"] += len(node.get("events") or []) output["raw_events_count"] += len(node.get("raw_events") or []) output["errors"].extend(node.get("errors") or []) output["discovered_mappings"] = 0 if output.get("discovered_mappings"): output["status"] = "bootstrapped" node = fetch_nodereal_events(limit=limit) output["metrics_count"] = len(node.get("metrics") or []) output["events_count"] = len(node.get("events") or []) output["errors"].extend(node.get("errors") or []) queued = enqueue_onchain_candidates() output["candidate_queued"] = queued.get("queued", 0) output["candidate_symbols"] = queued.get("symbols", []) if not output["metrics_count"] and not output["events_count"] and not output["raw_events_count"]: output["status"] = "no_onchain_data" log_cron_run( job_name="链上", script_name="onchain_monitor.py", run_status="success" if not output["errors"] else "error", result_status=output["status"], started_at=started.isoformat(), finished_at=_now().isoformat(), duration_ms=int((_now() - started).total_seconds() * 1000), summary={ "metrics_count": output["metrics_count"], "events_count": output["events_count"], "raw_events_count": output["raw_events_count"], "candidate_queued": output["candidate_queued"], "enabled": cfg.get("enabled"), }, error_message="; ".join(output["errors"][:5]), ) print(json.dumps(output, ensure_ascii=False, indent=2, default=str)) return output __all__ = [ "POSITIVE_SIGNALS", "RISK_SIGNALS", "derive_dex_signals", "enqueue_onchain_candidates", "fetch_dexscreener_metrics", "fetch_dexscreener_raw_events", "fetch_etherscan_events", "fetch_helius_events", "fetch_nodereal_events", "get_onchain_params", "ingest_normalized_events", "normalize_dexscreener_pair", "run_once", "seed_configured_token_mappings", ]