From 0977696a5c35b84abaca3e694eeed3b0de16ec66 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Sun, 24 May 2026 11:28:07 +0800 Subject: [PATCH] 1 --- .env.example | 12 + AGENTS.md | 7 +- app/config/system_config.py | 11 + app/db/onchain_db.py | 26 +- app/services/alchemy_client.py | 85 +++++++ app/services/onchain_monitor.py | 428 ++++++++++++++++++++++++-------- tests/test_onchain_tracking.py | 127 +++++++++- tests/test_runtime_config.py | 10 + 8 files changed, 590 insertions(+), 116 deletions(-) create mode 100644 app/services/alchemy_client.py diff --git a/.env.example b/.env.example index c9f3248..8f1c5f7 100644 --- a/.env.example +++ b/.env.example @@ -42,11 +42,15 @@ ALPHAX_LLM_REVIEW_ENABLED=1 # 链上追踪运行时配置。默认关闭;开启后采集结果只作为发现/风控辅助。 ALPHAX_ONCHAIN_ENABLED=0 ALPHAX_ONCHAIN_PROVIDER=nodereal +# 可选:切换到 Alchemy 可设为 alchemy;并行可设为 nodereal,alchemy。 ALPHAX_ONCHAIN_CHAINS=ethereum,bsc ALPHAX_ONCHAIN_TIMEOUT=15 ALPHAX_NODEREAL_ENABLED=1 ALPHAX_NODEREAL_CHAINS=ethereum,bsc ALPHAX_NODEREAL_API_KEY= +ALPHAX_ALCHEMY_ENABLED=0 +ALPHAX_ALCHEMY_CHAINS=ethereum,bsc +ALPHAX_ALCHEMY_API_KEY= # 可选:生产若 onchain_token_map 为空,可用 JSON 数组自举 NodeReal 合约映射。 # 示例:[{"symbol":"STORJ/USDT","chain":"ethereum","contract_address":"0x...","confidence":95}] ALPHAX_ONCHAIN_TOKEN_MAPPINGS= @@ -57,6 +61,14 @@ ALPHAX_NODEREAL_RAW_BLOCK_LOOKBACK=1 ALPHAX_NODEREAL_RAW_MAX_LOGS_PER_CHAIN=30 ALPHAX_NODEREAL_AUTO_MAPPING_ENABLED=1 ALPHAX_NODEREAL_AUTO_MAPPING_CONFIDENCE=82 +ALPHAX_ALCHEMY_LOG_BLOCK_LOOKBACK=9 +ALPHAX_ALCHEMY_MAX_LOGS_PER_TOKEN=25 +ALPHAX_ALCHEMY_RAW_TRANSFER_ENABLED=1 +ALPHAX_ALCHEMY_RAW_CHAINS=ethereum +ALPHAX_ALCHEMY_RAW_BLOCK_LOOKBACK=1 +ALPHAX_ALCHEMY_RAW_MAX_LOGS_PER_CHAIN=8 +ALPHAX_ALCHEMY_AUTO_MAPPING_ENABLED=1 +ALPHAX_ALCHEMY_AUTO_MAPPING_CONFIDENCE=82 ALPHAX_ONCHAIN_CANDIDATE_ENABLED=1 ALPHAX_ONCHAIN_CANDIDATE_MIN_SCORE=70 ALPHAX_ONCHAIN_CANDIDATE_MIN_CONFIDENCE=70 diff --git a/AGENTS.md b/AGENTS.md index 9b4231c..cc81788 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -115,10 +115,11 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 ### 4.1.2 链上数据源 -- 当前链上主数据源是 NodeReal,入口在 `app/services/nodereal_client.py` 和 `app/services/onchain_monitor.py`。 -- 默认只跑 `ALPHAX_ONCHAIN_PROVIDER=nodereal`,并通过 `ALPHAX_NODEREAL_API_KEY` 访问 EVM JSON-RPC / Enhanced API。 +- 当前链上主数据源支持 NodeReal 与 Alchemy,入口分别是 `app/services/nodereal_client.py`、`app/services/alchemy_client.py` 和 `app/services/onchain_monitor.py`。 +- 默认仍可跑 `ALPHAX_ONCHAIN_PROVIDER=nodereal`;如果 NodeReal 额度受限,可切到 `ALPHAX_ONCHAIN_PROVIDER=alchemy` 并设置 `ALPHAX_ALCHEMY_API_KEY`;并行模式可用 `ALPHAX_ONCHAIN_PROVIDER=nodereal,alchemy`。 +- NodeReal 通过 `ALPHAX_NODEREAL_API_KEY` 访问 EVM JSON-RPC / Enhanced API;Alchemy 通过 `ALPHAX_ALCHEMY_API_KEY` 访问 Ethereum / BSC 标准 EVM JSON-RPC。 - DEX Screener、Etherscan、Helius 已从运行时代码链路移除;当前只做 Ethereum / BSC 的 NodeReal 采集。 -- NodeReal 原始 Transfer 会先记录到 `onchain_raw_events`,再通过 ERC-20 `symbol/name/decimals` 自动尝试映射到交易所 `XXX/USDT`,人工 `ALPHAX_ONCHAIN_TOKEN_MAPPINGS` 只作为兜底。 +- NodeReal / Alchemy 原始 Transfer 会先记录到 `onchain_raw_events`,再通过 ERC-20 `symbol/name/decimals` 自动尝试映射到交易所 `XXX/USDT`,人工 `ALPHAX_ONCHAIN_TOKEN_MAPPINGS` 只作为兜底。 - 新增链上信号优先落到 `onchain_token_metrics` / `onchain_events`,不要直接创建推荐;高质量事件仍通过 `event_news` 进入技术检查。 ### 4.2 Web/API diff --git a/app/config/system_config.py b/app/config/system_config.py index 96e6919..053193e 100644 --- a/app/config/system_config.py +++ b/app/config/system_config.py @@ -78,6 +78,9 @@ def default_onchain_config(default_chains=("ethereum", "bsc")): "nodereal_enabled": _env_bool("ALPHAX_NODEREAL_ENABLED", True), "nodereal_chains": _env_list("ALPHAX_NODEREAL_CHAINS", ("ethereum", "bsc")), "nodereal_api_key_env": "ALPHAX_NODEREAL_API_KEY", + "alchemy_enabled": _env_bool("ALPHAX_ALCHEMY_ENABLED", False), + "alchemy_chains": _env_list("ALPHAX_ALCHEMY_CHAINS", ("ethereum", "bsc")), + "alchemy_api_key_env": "ALPHAX_ALCHEMY_API_KEY", "token_mappings_env": "ALPHAX_ONCHAIN_TOKEN_MAPPINGS", "token_mappings": [], "nodereal_log_block_lookback": _env_int("ALPHAX_NODEREAL_LOG_BLOCK_LOOKBACK", 120), @@ -87,6 +90,14 @@ def default_onchain_config(default_chains=("ethereum", "bsc")): "nodereal_raw_max_logs_per_chain": _env_int("ALPHAX_NODEREAL_RAW_MAX_LOGS_PER_CHAIN", 30), "nodereal_auto_mapping_enabled": _env_bool("ALPHAX_NODEREAL_AUTO_MAPPING_ENABLED", True), "nodereal_auto_mapping_confidence": _env_int("ALPHAX_NODEREAL_AUTO_MAPPING_CONFIDENCE", 82), + "alchemy_log_block_lookback": _env_int("ALPHAX_ALCHEMY_LOG_BLOCK_LOOKBACK", 9), + "alchemy_max_logs_per_token": _env_int("ALPHAX_ALCHEMY_MAX_LOGS_PER_TOKEN", 25), + "alchemy_raw_transfer_enabled": _env_bool("ALPHAX_ALCHEMY_RAW_TRANSFER_ENABLED", True), + "alchemy_raw_chains": _env_list("ALPHAX_ALCHEMY_RAW_CHAINS", ("ethereum",)), + "alchemy_raw_block_lookback": _env_int("ALPHAX_ALCHEMY_RAW_BLOCK_LOOKBACK", 1), + "alchemy_raw_max_logs_per_chain": _env_int("ALPHAX_ALCHEMY_RAW_MAX_LOGS_PER_CHAIN", 8), + "alchemy_auto_mapping_enabled": _env_bool("ALPHAX_ALCHEMY_AUTO_MAPPING_ENABLED", True), + "alchemy_auto_mapping_confidence": _env_int("ALPHAX_ALCHEMY_AUTO_MAPPING_CONFIDENCE", 82), "candidate_enabled": _env_bool("ALPHAX_ONCHAIN_CANDIDATE_ENABLED", True), "candidate_min_score": _env_float("ALPHAX_ONCHAIN_CANDIDATE_MIN_SCORE", 70), "candidate_min_confidence": _env_int("ALPHAX_ONCHAIN_CANDIDATE_MIN_CONFIDENCE", 70), diff --git a/app/db/onchain_db.py b/app/db/onchain_db.py index c7b5ab5..03d7a5e 100644 --- a/app/db/onchain_db.py +++ b/app/db/onchain_db.py @@ -467,6 +467,7 @@ def get_onchain_provider_status(hours=24): hours = int(hours or 24) cutoff = (datetime.now() - timedelta(hours=hours)).isoformat() nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY") + alchemy_env = str(cfg.get("alchemy_api_key_env") or "ALPHAX_ALCHEMY_API_KEY") conn = get_conn() try: raw_total = conn.execute("SELECT COUNT(*) FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,)).fetchone()[0] @@ -545,9 +546,15 @@ def get_onchain_provider_status(hours=24): summary = _load(last_onchain.get("summary_json") if last_onchain else "{}", {}) if last_onchain else {} last_error = last_onchain.get("error_message") if last_onchain else "" provider = str(cfg.get("provider") or "nodereal").strip().lower() - nodereal_enabled = bool(cfg.get("nodereal_enabled", True)) and provider == "nodereal" + requested = {p.strip() for p in provider.split(",") if p.strip()} + if requested & {"all", "multi", "both"}: + requested = {"nodereal", "alchemy"} + nodereal_enabled = bool(cfg.get("nodereal_enabled", True)) and "nodereal" in requested + alchemy_enabled = bool(cfg.get("alchemy_enabled", False)) and "alchemy" in requested nodereal_metrics = int(sum(row["count"] for row in metric_sources if row["source"] == "nodereal")) nodereal_signals = int(sum(row["count"] for row in signal_sources if row["source"] == "nodereal")) + alchemy_metrics = int(sum(row["count"] for row in metric_sources if row["source"] == "alchemy")) + alchemy_signals = int(sum(row["count"] for row in signal_sources if row["source"] == "alchemy")) providers = [ { "provider": "nodereal", @@ -566,6 +573,23 @@ def get_onchain_provider_status(hours=24): last_error if "nodereal" in str(last_error).lower() else "", ), }, + { + "provider": "alchemy", + "label": "Alchemy", + "enabled": alchemy_enabled, + "api_key_present": bool(os.getenv(alchemy_env, "").strip()), + "implemented": True, + "role": "EVM 备用/并行链上数据源:Transfer 日志、大额转账、ERC-20 自动映射", + "raw_events": int(raw_total or 0), + "metrics": alchemy_metrics, + "signals": alchemy_signals, + "status": _provider_status_label( + alchemy_enabled, + True, + int(raw_total or 0) + alchemy_metrics + alchemy_signals, + last_error if "alchemy" in str(last_error).lower() else "", + ), + }, ] return { "hours": hours, diff --git a/app/services/alchemy_client.py b/app/services/alchemy_client.py new file mode 100644 index 0000000..dd06213 --- /dev/null +++ b/app/services/alchemy_client.py @@ -0,0 +1,85 @@ +"""Small JSON-RPC client for Alchemy EVM endpoints.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import requests + + +DEFAULT_ALCHEMY_CHAIN_ENDPOINTS = { + "ethereum": "https://eth-mainnet.g.alchemy.com/v2/{api_key}", + "bsc": "https://bnb-mainnet.g.alchemy.com/v2/{api_key}", +} + + +@dataclass(frozen=True) +class AlchemyConfig: + api_key: str + timeout: int = 15 + endpoints: dict[str, str] | None = None + + +class AlchemyClient: + def __init__(self, config: AlchemyConfig): + self.config = config + self.endpoints = {**DEFAULT_ALCHEMY_CHAIN_ENDPOINTS, **(config.endpoints or {})} + + def supports_chain(self, chain: str) -> bool: + return bool(self._endpoint(chain)) + + def call(self, chain: str, method: str, params: list[Any] | None = None) -> Any: + endpoint = self._endpoint(chain) + if not endpoint: + raise ValueError(f"alchemy_chain_not_configured:{chain}") + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params or [], + } + resp = requests.post( + endpoint, + json=payload, + timeout=self.config.timeout, + headers={"Content-Type": "application/json", "User-Agent": "AlphaX-Agent-Crypto/1.0"}, + ) + if resp.status_code >= 400: + raise RuntimeError(f"alchemy_http_{resp.status_code}:{resp.text[:200]}") + data = resp.json() + if data.get("error"): + raise RuntimeError(f"alchemy_rpc_error:{data['error']}") + return data.get("result") + + def block_number(self, chain: str) -> int: + return _hex_to_int(self.call(chain, "eth_blockNumber", [])) + + def get_logs(self, chain: str, log_filter: dict[str, Any]) -> list[dict[str, Any]]: + result = self.call(chain, "eth_getLogs", [log_filter]) + return result if isinstance(result, list) else [] + + def eth_call(self, chain: str, to_address: str, data: str, block: str = "latest") -> str: + result = self.call(chain, "eth_call", [{"to": to_address, "data": data}, block]) + return str(result or "") + + def _endpoint(self, chain: str) -> str: + chain_key = str(chain or "").lower().strip() + template = self.endpoints.get(chain_key, "") + if not template or not self.config.api_key: + return "" + return template.format(api_key=self.config.api_key) + + +def _hex_to_int(value: Any) -> int: + if value is None: + return 0 + if isinstance(value, int): + return value + text = str(value).strip() + if not text: + return 0 + try: + return int(text, 16) if text.startswith("0x") else int(text) + except Exception: + return 0 diff --git a/app/services/onchain_monitor.py b/app/services/onchain_monitor.py index 49b6032..3fcffb4 100644 --- a/app/services/onchain_monitor.py +++ b/app/services/onchain_monitor.py @@ -29,6 +29,7 @@ from app.db.onchain_db import ( ) from app.services.event_driven_screener import _event_hash as event_hash from app.services.event_driven_screener import _tradable_symbol, init_event_tables +from app.services.alchemy_client import AlchemyClient, AlchemyConfig, DEFAULT_ALCHEMY_CHAIN_ENDPOINTS from app.services.nodereal_client import DEFAULT_CHAIN_ENDPOINTS, NodeRealClient, NodeRealConfig @@ -78,6 +79,7 @@ def get_onchain_params(): else: chains = [str(x).strip().lower() for x in chains_raw if str(x).strip()] nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY") + alchemy_env = str(cfg.get("alchemy_api_key_env") or "ALPHAX_ALCHEMY_API_KEY") token_mappings_env = str(cfg.get("token_mappings_env") or "ALPHAX_ONCHAIN_TOKEN_MAPPINGS") return { "enabled": bool(cfg.get("enabled", False)), @@ -88,6 +90,10 @@ def get_onchain_params(): "nodereal_chains": _normalize_chain_list(cfg.get("nodereal_chains") or ("ethereum", "bsc")), "nodereal_api_key": os.getenv(nodereal_env, "").strip(), "nodereal_api_key_env": nodereal_env, + "alchemy_enabled": bool(cfg.get("alchemy_enabled", False)), + "alchemy_chains": _normalize_chain_list(cfg.get("alchemy_chains") or ("ethereum", "bsc")), + "alchemy_api_key": os.getenv(alchemy_env, "").strip(), + "alchemy_api_key_env": alchemy_env, "token_mappings": _load_token_mappings(cfg.get("token_mappings"), os.getenv(token_mappings_env, "")), "token_mappings_env": token_mappings_env, "nodereal_log_block_lookback": int(cfg.get("nodereal_log_block_lookback") or 120), @@ -97,6 +103,14 @@ def get_onchain_params(): "nodereal_raw_max_logs_per_chain": int(cfg.get("nodereal_raw_max_logs_per_chain") or 30), "nodereal_auto_mapping_enabled": bool(cfg.get("nodereal_auto_mapping_enabled", True)), "nodereal_auto_mapping_confidence": int(cfg.get("nodereal_auto_mapping_confidence") or 82), + "alchemy_log_block_lookback": int(cfg.get("alchemy_log_block_lookback") or 120), + "alchemy_max_logs_per_token": int(cfg.get("alchemy_max_logs_per_token") or 25), + "alchemy_raw_transfer_enabled": bool(cfg.get("alchemy_raw_transfer_enabled", True)), + "alchemy_raw_chains": _normalize_chain_list(cfg.get("alchemy_raw_chains") or ("ethereum",)), + "alchemy_raw_block_lookback": int(cfg.get("alchemy_raw_block_lookback") or 1), + "alchemy_raw_max_logs_per_chain": int(cfg.get("alchemy_raw_max_logs_per_chain") or 30), + "alchemy_auto_mapping_enabled": bool(cfg.get("alchemy_auto_mapping_enabled", True)), + "alchemy_auto_mapping_confidence": int(cfg.get("alchemy_auto_mapping_confidence") or 82), "candidate_enabled": bool(cfg.get("candidate_enabled", True)), "candidate_min_score": float(cfg.get("candidate_min_score") or 70), "candidate_min_confidence": int(cfg.get("candidate_min_confidence") or 70), @@ -356,9 +370,10 @@ def _read_erc20_metadata(client, chain, contract): return metadata -def _auto_map_nodereal_contract(client, chain, contract, cfg=None): +def _auto_map_evm_contract(client, chain, contract, cfg=None, provider="nodereal"): cfg = cfg or get_onchain_params() - if not cfg.get("nodereal_auto_mapping_enabled", True): + provider = str(provider or "nodereal").lower() + if not cfg.get(f"{provider}_auto_mapping_enabled", True): return None existing = find_mapping_by_contract(chain, contract) if existing: @@ -368,12 +383,13 @@ def _auto_map_nodereal_contract(client, chain, contract, cfg=None): if not _is_auto_mapping_symbol_allowed(base, metadata.get("name")): return None symbol = normalize_symbol(base) - confidence = max(1, min(95, int(cfg.get("nodereal_auto_mapping_confidence") or 82))) + confidence = max(1, min(95, int(cfg.get(f"{provider}_auto_mapping_confidence") or 82))) + source = f"{provider}_erc20_metadata" mapping_id = onchain_db.upsert_token_mapping( symbol=symbol, chain=chain, contract_address=contract, - source="nodereal_erc20_metadata", + source=source, confidence=confidence, raw=metadata, is_active=True, @@ -385,12 +401,16 @@ def _auto_map_nodereal_contract(client, chain, contract, cfg=None): "symbol": symbol, "chain": str(chain or "").lower(), "contract_address": contract, - "source": "nodereal_erc20_metadata", + "source": source, "confidence": confidence, "raw_json": json.dumps(metadata, ensure_ascii=False), } +def _auto_map_nodereal_contract(client, chain, contract, cfg=None): + return _auto_map_evm_contract(client, chain, contract, cfg=cfg, provider="nodereal") + + def _nodereal_client(cfg=None): cfg = cfg or get_onchain_params() return NodeRealClient( @@ -402,8 +422,20 @@ def _nodereal_client(cfg=None): ) -def _event_from_nodereal_transfer(log, mapping, cfg=None): +def _alchemy_client(cfg=None): cfg = cfg or get_onchain_params() + return AlchemyClient( + AlchemyConfig( + api_key=cfg.get("alchemy_api_key") or "", + timeout=int(cfg.get("timeout") or 15), + endpoints=dict(DEFAULT_ALCHEMY_CHAIN_ENDPOINTS), + ) + ) + + +def _event_from_evm_transfer(log, mapping, cfg=None, source="nodereal"): + cfg = cfg or get_onchain_params() + source = str(source or "nodereal").lower() topics = log.get("topics") or [] if len(topics) < 3: return None @@ -439,13 +471,18 @@ def _event_from_nodereal_transfer(log, mapping, cfg=None): "confidence": 76, "severity": "A", "detected_at": _now().isoformat(timespec="seconds"), - "source": "nodereal", + "source": source, "url": _chain_explorer_tx_url(chain, tx_hash), "raw": log, } -def _raw_event_from_nodereal_transfer(log, chain): +def _event_from_nodereal_transfer(log, mapping, cfg=None): + return _event_from_evm_transfer(log, mapping, cfg=cfg, source="nodereal") + + +def _raw_event_from_evm_transfer(log, chain, source="nodereal"): + source = str(source or "nodereal").lower() topics = log.get("topics") or [] if len(topics) < 3: return None @@ -456,14 +493,15 @@ def _raw_event_from_nodereal_transfer(log, chain): return None from_addr = _topic_to_address(topics[1]) to_addr = _topic_to_address(topics[2]) + source_label = "Alchemy" if source == "alchemy" else "NodeReal" return { - "source": "nodereal", + "source": source, "chain": str(chain or "").lower(), "event_type": "evm_transfer", "token_address": contract, "symbol_guess": "", "name": "", - "title": "NodeReal ERC-20 原始转账", + "title": f"{source_label} ERC-20 原始转账", "description": f"合约 {_short_addr(contract)} · {_short_addr(from_addr)} -> {_short_addr(to_addr)}", "url": _chain_explorer_tx_url(chain, tx_hash), "amount": amount_raw, @@ -476,7 +514,11 @@ def _raw_event_from_nodereal_transfer(log, chain): } -def _apply_raw_event_mapping(raw_event, client=None, cfg=None): +def _raw_event_from_nodereal_transfer(log, chain): + return _raw_event_from_evm_transfer(log, chain, source="nodereal") + + +def _apply_raw_event_mapping(raw_event, client=None, cfg=None, provider=None): item = dict(raw_event or {}) chain = str(item.get("chain") or "").lower() contract = str(item.get("token_address") or "").strip() @@ -484,7 +526,7 @@ def _apply_raw_event_mapping(raw_event, client=None, cfg=None): return item mapping = find_mapping_by_contract(chain, contract) if not mapping and client: - mapping = _auto_map_nodereal_contract(client, chain, contract, cfg=cfg) + mapping = _auto_map_evm_contract(client, chain, contract, cfg=cfg, provider=provider or item.get("source") or "nodereal") if mapping: item["mapped_symbol"] = normalize_symbol(mapping.get("symbol")) item["mapping_status"] = "mapped" @@ -579,7 +621,7 @@ def fetch_nodereal_events(limit=60): for mapping in mappings[: int(limit or 60)]: chain = str(mapping.get("chain") or "").lower() contract = str(mapping.get("contract_address") or "").strip() - if not contract: + if not _is_evm_address(contract): continue try: holder_count = client.token_holder_count(chain, contract) @@ -630,6 +672,81 @@ def fetch_nodereal_events(limit=60): } +def fetch_alchemy_events(limit=60): + cfg = get_onchain_params() + if not cfg.get("alchemy_enabled", False): + return {"metrics": [], "events": [], "raw_events": [], "errors": ["alchemy_disabled"]} + if not cfg.get("alchemy_api_key"): + return {"metrics": [], "events": [], "raw_events": [], "errors": ["alchemy_api_key_missing"]} + seed_result = seed_configured_token_mappings(cfg) + client = _alchemy_client(cfg) + raw_result = fetch_alchemy_raw_events(client=client, cfg=cfg, limit=limit) + enabled_chains = set(cfg.get("alchemy_chains") or DEFAULT_CHAINS) + all_mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) + chain_mappings = [m for m in all_mappings if str(m.get("chain") or "").lower() in enabled_chains] + mappings = [] + unsupported_chains = set() + for mapping in chain_mappings: + chain = str(mapping.get("chain") or "").lower() + if client.supports_chain(chain): + mappings.append(mapping) + else: + unsupported_chains.add(chain) + events = [] + errors = list(seed_result.get("errors") or []) + list(raw_result.get("errors") or []) + diagnostics = { + "seeded_mappings": seed_result.get("seeded", 0), + "mapping_total": len(all_mappings), + "chain_mapping_total": len(chain_mappings), + "supported_mapping_total": len(mappings), + "enabled_chains": sorted(enabled_chains), + "unsupported_chains": sorted(unsupported_chains), + } + lookback = max(1, int(cfg.get("alchemy_log_block_lookback") or 120)) + max_logs = max(1, int(cfg.get("alchemy_max_logs_per_token") or 25)) + for mapping in mappings[: int(limit or 60)]: + chain = str(mapping.get("chain") or "").lower() + contract = str(mapping.get("contract_address") or "").strip() + if not _is_evm_address(contract): + continue + try: + latest = client.block_number(chain) + if latest <= 0: + continue + logs = client.get_logs( + chain, + { + "address": contract, + "fromBlock": hex(max(0, latest - lookback)), + "toBlock": hex(latest), + "topics": [TRANSFER_TOPIC], + }, + ) + for log in logs[:max_logs]: + if not isinstance(log, dict): + continue + event = _event_from_evm_transfer(log, mapping, cfg=cfg, source="alchemy") + if not event: + continue + if insert_onchain_event(event): + events.append(event) + except Exception as exc: + errors.append(f"{mapping.get('symbol')}:alchemy_logs:{str(exc)[:160]}") + if not all_mappings: + diagnostics["mapping_note"] = "no_strategy_mappings_raw_events_only" + elif not chain_mappings: + diagnostics["mapping_note"] = "no_enabled_chain_mappings_raw_events_only" + elif not mappings: + diagnostics["mapping_note"] = "no_supported_mappings_raw_events_only" + return { + "metrics": [], + "events": events, + "raw_events": raw_result.get("raw_events") or [], + "errors": errors, + "diagnostics": diagnostics, + } + + def fetch_nodereal_raw_events(client=None, cfg=None, limit=60): cfg = cfg or get_onchain_params() if not cfg.get("nodereal_raw_transfer_enabled", True): @@ -659,9 +776,10 @@ def fetch_nodereal_raw_events(client=None, cfg=None, limit=60): continue item = _raw_event_from_nodereal_transfer(log, chain) if item: - raw_items.append(_apply_raw_event_mapping(item, client=client, cfg=cfg)) + raw_items.append(item) raw_items.sort(key=lambda item: item.get("amount") or 0, reverse=True) for item in raw_items[:per_chain]: + item = _apply_raw_event_mapping(item, client=client, cfg=cfg, provider="nodereal") if insert_onchain_raw_event(item): inserted.append(item) except Exception as exc: @@ -669,6 +787,46 @@ def fetch_nodereal_raw_events(client=None, cfg=None, limit=60): return {"raw_events": inserted, "errors": errors} +def fetch_alchemy_raw_events(client=None, cfg=None, limit=60): + cfg = cfg or get_onchain_params() + if not cfg.get("alchemy_raw_transfer_enabled", True): + return {"raw_events": [], "errors": []} + client = client or _alchemy_client(cfg) + chains = [c for c in (cfg.get("alchemy_raw_chains") or ("ethereum",)) if client.supports_chain(c)] + lookback = max(0, min(12, int(cfg.get("alchemy_raw_block_lookback") or 1))) + per_chain = max(1, min(int(cfg.get("alchemy_raw_max_logs_per_chain") or 30), int(limit or 60))) + inserted = [] + errors = [] + for chain in chains: + try: + latest = client.block_number(chain) + if latest <= 0: + continue + logs = client.get_logs( + chain, + { + "fromBlock": hex(max(0, latest - lookback)), + "toBlock": hex(latest), + "topics": [TRANSFER_TOPIC], + }, + ) + raw_items = [] + for log in logs: + if not isinstance(log, dict): + continue + item = _raw_event_from_evm_transfer(log, chain, source="alchemy") + if item: + raw_items.append(item) + raw_items.sort(key=lambda item: item.get("amount") or 0, reverse=True) + for item in raw_items[:per_chain]: + item = _apply_raw_event_mapping(item, client=client, cfg=cfg, provider="alchemy") + if insert_onchain_raw_event(item): + inserted.append(item) + except Exception as exc: + errors.append(f"{chain}:alchemy_raw_logs:{str(exc)[:160]}") + return {"raw_events": inserted, "errors": errors} + + def _short_addr(value): value = str(value or "") if len(value) <= 12: @@ -676,6 +834,17 @@ def _short_addr(value): return value[:6] + "..." + value[-4:] +def _is_evm_address(value): + text = str(value or "").strip() + if len(text) != 42 or not text.startswith("0x"): + return False + try: + int(text[2:], 16) + return True + except Exception: + return False + + def ingest_normalized_events(events): """Test/integration helper for provider adapters.""" init_db() @@ -691,6 +860,19 @@ def ingest_normalized_events(events): return {"inserted": len(inserted), "queued": queued.get("queued", 0), "events": inserted, "candidate_result": queued} +def _enabled_onchain_providers(cfg): + raw = str(cfg.get("provider") or "nodereal").strip().lower() + requested = [p.strip() for p in raw.split(",") if p.strip()] + if any(p in {"all", "multi", "both"} for p in requested): + requested = ["nodereal", "alchemy"] + providers = [] + if "nodereal" in requested and cfg.get("nodereal_enabled", True): + providers.append("nodereal") + if "alchemy" in requested and cfg.get("alchemy_enabled", False): + providers.append("alchemy") + return providers or ["nodereal"] + + def _candidate_title(event): label = event.get("signal_label") or signal_label(event.get("signal_code")) value = _safe_float(event.get("value_usd")) @@ -709,96 +891,111 @@ def enqueue_onchain_candidates(min_score=None, min_confidence=None, cooldown_hou init_event_tables() cutoff = (_now() - timedelta(hours=24)).isoformat() conn = get_conn() - rows = conn.execute( - """ - SELECT e.*, - COALESCE(( - SELECT m.onchain_score FROM onchain_token_metrics m - WHERE m.symbol=e.symbol AND m.chain=e.chain - ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1 - ), 0) AS latest_onchain_score, - COALESCE(( - SELECT m.risk_score FROM onchain_token_metrics m - WHERE m.symbol=e.symbol AND m.chain=e.chain - ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1 - ), 0) AS latest_risk_score - FROM onchain_events e - WHERE e.status IN ('new', 'candidate_failed') - AND e.detected_at >= %s - AND e.direction='positive' - ORDER BY e.confidence DESC, e.value_usd DESC, e.detected_at::timestamp DESC - LIMIT %s - """, - (cutoff, int(limit or 20)), - ).fetchall() queued = [] skipped_ids = [] - now = _now().isoformat(timespec="seconds") - cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat() - for row in rows: - event = dict(row) - symbol = normalize_symbol(event.get("symbol")) - if not symbol or not _tradable_symbol(symbol): - skipped_ids.append(event["id"]) - continue - score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence"))) - if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0): - continue - recent = conn.execute( + errors = [] + try: + rows = conn.execute( """ - SELECT id FROM event_news - WHERE source='onchain' AND symbol=%s AND detected_at >= %s - LIMIT 1 + SELECT e.*, + COALESCE(( + SELECT m.onchain_score FROM onchain_token_metrics m + WHERE m.symbol=e.symbol AND m.chain=e.chain + ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1 + ), 0) AS latest_onchain_score, + COALESCE(( + SELECT m.risk_score FROM onchain_token_metrics m + WHERE m.symbol=e.symbol AND m.chain=e.chain + ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1 + ), 0) AS latest_risk_score + FROM onchain_events e + WHERE e.status IN ('new', 'candidate_failed') + AND e.detected_at >= %s + AND e.direction='positive' + ORDER BY e.confidence DESC, e.value_usd DESC, e.detected_at::timestamp DESC + LIMIT %s """, - (symbol, cooldown_cutoff), - ).fetchone() - if recent: - skipped_ids.append(event["id"]) - continue - title = _candidate_title(event) - h = event_hash("onchain", title, symbol) - try: - conn.execute( + (cutoff, int(limit or 20)), + ).fetchall() + now = _now().isoformat(timespec="seconds") + cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat() + for row in rows: + event = dict(row) + symbol = normalize_symbol(event.get("symbol")) + if not symbol or not _tradable_symbol(symbol): + skipped_ids.append(event["id"]) + continue + score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence"))) + if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0): + continue + recent = conn.execute( """ - INSERT INTO event_news - (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) - VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0) + SELECT id FROM event_news + WHERE source='onchain' AND symbol=%s AND detected_at >= %s + LIMIT 1 """, - ( - h, - symbol, - title, - event.get("url") or "", - event.get("detected_at") or now, - now, - event.get("severity") or "A", - json.dumps( - { - "onchain_event_id": event.get("id"), - "chain": event.get("chain"), - "signal_code": event.get("signal_code"), - "signal_label": event.get("signal_label"), - "confidence": event.get("confidence"), - "value_usd": event.get("value_usd"), - "onchain_score": event.get("latest_onchain_score"), - "risk_score": event.get("latest_risk_score"), - }, - ensure_ascii=False, - ), - ), + (symbol, cooldown_cutoff), + ).fetchone() + if recent: + skipped_ids.append(event["id"]) + continue + title = _candidate_title(event) + h = event_hash("onchain", title, symbol) + try: + # A single bad/duplicate candidate must not poison the whole + # PostgreSQL transaction; nested transaction becomes SAVEPOINT. + with conn.transaction(): + inserted = conn.execute( + """ + INSERT INTO event_news + (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) + VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0) + ON CONFLICT(event_hash) DO NOTHING + RETURNING id + """, + ( + h, + symbol, + title, + event.get("url") or "", + event.get("detected_at") or now, + now, + event.get("severity") or "A", + json.dumps( + { + "onchain_event_id": event.get("id"), + "chain": event.get("chain"), + "signal_code": event.get("signal_code"), + "signal_label": event.get("signal_label"), + "confidence": event.get("confidence"), + "value_usd": event.get("value_usd"), + "onchain_score": event.get("latest_onchain_score"), + "risk_score": event.get("latest_risk_score"), + }, + ensure_ascii=False, + ), + ), + ).fetchone() + if inserted: + conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),)) + queued.append(symbol) + else: + skipped_ids.append(event["id"]) + except Exception as exc: + errors.append(f"{symbol}:candidate_enqueue:{str(exc)[:160]}") + skipped_ids.append(event["id"]) + if skipped_ids: + conn.execute( + "UPDATE onchain_events SET status='candidate_skipped' WHERE id IN (" + ",".join(["%s"] * len(skipped_ids)) + ")", + tuple(skipped_ids), ) - conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),)) - queued.append(symbol) - except Exception: - skipped_ids.append(event["id"]) - if skipped_ids: - conn.execute( - "UPDATE onchain_events SET status='candidate_skipped' WHERE id IN (" + ",".join(["%s"] * len(skipped_ids)) + ")", - tuple(skipped_ids), - ) - conn.commit() - conn.close() - return {"queued": len(queued), "skipped": len(skipped_ids), "symbols": queued} + conn.commit() + return {"queued": len(queued), "skipped": len(skipped_ids), "symbols": queued, "errors": errors} + except Exception: + conn.rollback() + raise + finally: + conn.close() def run_once(limit=60): @@ -816,21 +1013,39 @@ def run_once(limit=60): "check_time": _now().isoformat(), } if cfg.get("enabled"): - node = fetch_nodereal_events(limit=limit) - output["metrics_count"] += len(node.get("metrics") or []) - output["events_count"] += len(node.get("events") or []) - output["raw_events_count"] += len(node.get("raw_events") or []) - output["errors"].extend(node.get("errors") or []) + provider_results = {} + for provider in _enabled_onchain_providers(cfg): + if provider == "alchemy": + node = fetch_alchemy_events(limit=limit) + else: + node = fetch_nodereal_events(limit=limit) + provider_results[provider] = { + "metrics_count": len(node.get("metrics") or []), + "events_count": len(node.get("events") or []), + "raw_events_count": len(node.get("raw_events") or []), + "diagnostics": node.get("diagnostics") or {}, + } + output["metrics_count"] += len(node.get("metrics") or []) + output["events_count"] += len(node.get("events") or []) + output["raw_events_count"] += len(node.get("raw_events") or []) + output["errors"].extend(node.get("errors") or []) + output["provider_results"] = provider_results output["discovered_mappings"] = 0 if output.get("discovered_mappings"): output["status"] = "bootstrapped" - node = fetch_nodereal_events(limit=limit) - output["metrics_count"] = len(node.get("metrics") or []) - output["events_count"] = len(node.get("events") or []) - output["errors"].extend(node.get("errors") or []) + output["metrics_count"] = 0 + output["events_count"] = 0 + output["raw_events_count"] = 0 + for provider in _enabled_onchain_providers(cfg): + node = fetch_alchemy_events(limit=limit) if provider == "alchemy" else fetch_nodereal_events(limit=limit) + output["metrics_count"] += len(node.get("metrics") or []) + output["events_count"] += len(node.get("events") or []) + output["raw_events_count"] += len(node.get("raw_events") or []) + output["errors"].extend(node.get("errors") or []) queued = enqueue_onchain_candidates() output["candidate_queued"] = queued.get("queued", 0) output["candidate_symbols"] = queued.get("symbols", []) + output["errors"].extend(queued.get("errors") or []) if not output["metrics_count"] and not output["events_count"] and not output["raw_events_count"]: output["status"] = "no_onchain_data" log_cron_run( @@ -858,6 +1073,7 @@ __all__ = [ "POSITIVE_SIGNALS", "RISK_SIGNALS", "enqueue_onchain_candidates", + "fetch_alchemy_events", "fetch_nodereal_events", "get_onchain_params", "ingest_normalized_events", diff --git a/tests/test_onchain_tracking.py b/tests/test_onchain_tracking.py index 5490a59..098f615 100644 --- a/tests/test_onchain_tracking.py +++ b/tests/test_onchain_tracking.py @@ -2,7 +2,7 @@ import json import os import sqlite3 import sys -from datetime import datetime +from datetime import datetime, timedelta from fastapi.testclient import TestClient @@ -93,6 +93,77 @@ def test_onchain_candidate_enqueues_event_news_not_recommendation(monkeypatch, t assert status == "candidate_queued" +def test_onchain_candidate_duplicate_event_hash_does_not_abort_transaction(monkeypatch, tmp_path): + db_path = _temp_db(monkeypatch, tmp_path) + old_time = (datetime.now() - timedelta(hours=12)).isoformat() + event_time = datetime.now().isoformat() + stale_event = { + "id": 9001, + "symbol": "DUP/USDT", + "signal_label": "DEX交易量放大", + "signal_code": "dex_volume_spike", + "value_usd": 500000, + } + duplicate_title = onchain_monitor._candidate_title(stale_event) + duplicate_hash = onchain_monitor.event_hash("onchain", duplicate_title, "DUP/USDT") + conn = altcoin_db.get_conn() + conn.execute( + """ + INSERT INTO event_news + (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) + VALUES (%s, 'onchain', %s, %s, '', %s, %s, 'A', 'onchain_candidate', '{}', 0) + """, + (duplicate_hash, "DUP/USDT", duplicate_title, old_time, old_time), + ) + conn.commit() + conn.close() + first_id = onchain_db.insert_onchain_event( + { + "chain": "ethereum", + "symbol": "DUP/USDT", + "contract_address": "0xdup", + "signal_code": "dex_volume_spike", + "signal_label": "DEX交易量放大", + "direction": "positive", + "value_usd": 500000, + "confidence": 90, + "severity": "A", + "detected_at": event_time, + "source": "test", + } + ) + second_id = onchain_db.insert_onchain_event( + { + "chain": "ethereum", + "symbol": "OK/USDT", + "contract_address": "0xok", + "signal_code": "dex_volume_spike", + "direction": "positive", + "value_usd": 600000, + "confidence": 91, + "severity": "A", + "detected_at": event_time, + "source": "test", + } + ) + + result = onchain_monitor.enqueue_onchain_candidates(min_score=1, min_confidence=1, cooldown_hours=6) + + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + statuses = { + row["id"]: row["status"] + for row in conn.execute("SELECT id, status FROM onchain_events WHERE id IN (?, ?)", (first_id, second_id)).fetchall() + } + queued_news = conn.execute("SELECT * FROM event_news WHERE source='onchain' AND symbol='OK/USDT'").fetchone() + conn.close() + assert result["queued"] == 1 + assert result["skipped"] == 1 + assert statuses[first_id] == "candidate_skipped" + assert statuses[second_id] == "candidate_queued" + assert queued_news is not None + + def test_negative_onchain_signal_is_risk_context_only(monkeypatch, tmp_path): db_path = _temp_db(monkeypatch, tmp_path) onchain_db.insert_onchain_event( @@ -271,12 +342,13 @@ def test_token_detail_includes_mapped_raw_events(monkeypatch, tmp_path): def test_nodereal_events_generate_metrics_and_normalized_event(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key") - onchain_db.upsert_token_mapping("ABC", "ethereum", "0xabc", source="manual", confidence=95) + contract = "0x0000000000000000000000000000000000000abc" + onchain_db.upsert_token_mapping("ABC", "ethereum", contract, source="manual", confidence=95) onchain_db.insert_token_metric( { "symbol": "ABC/USDT", "chain": "ethereum", - "contract_address": "0xabc", + "contract_address": contract, "window": "1h", "metric_time": datetime.now().isoformat(), "dex_volume_usd": 100000, @@ -292,7 +364,7 @@ def test_nodereal_events_generate_metrics_and_normalized_event(monkeypatch, tmp_ def token_holder_count(self, chain, contract): assert chain == "ethereum" - assert contract == "0xabc" + assert contract == "0x0000000000000000000000000000000000000abc" return 120 def block_number(self, chain): @@ -302,10 +374,10 @@ def test_nodereal_events_generate_metrics_and_normalized_event(monkeypatch, tmp_ def get_logs(self, chain, log_filter): if "address" not in log_filter: return [] - assert log_filter["address"] == "0xabc" + assert log_filter["address"] == "0x0000000000000000000000000000000000000abc" return [ { - "address": "0xabc", + "address": "0x0000000000000000000000000000000000000abc", "transactionHash": "0xtx", "data": hex(200000 * 10**18), "topics": [ @@ -402,6 +474,49 @@ def test_nodereal_records_raw_events_without_strategy_mappings(monkeypatch, tmp_ assert raw["items"][0]["event_type"] == "evm_transfer" +def test_alchemy_records_raw_events_without_strategy_mappings(monkeypatch, tmp_path): + monkeypatch.setenv("ALPHAX_ALCHEMY_API_KEY", "test-key") + monkeypatch.setenv("ALPHAX_ALCHEMY_ENABLED", "1") + monkeypatch.setenv("ALPHAX_ALCHEMY_CHAINS", "ethereum") + _temp_db(monkeypatch, tmp_path) + + class RawAlchemyClient: + def supports_chain(self, chain): + return chain == "ethereum" + + def block_number(self, chain): + return 1000 + + def get_logs(self, chain, log_filter): + if "address" in log_filter: + return [] + return [ + { + "address": "0xabc", + "transactionHash": "0xalchemyrawtx", + "data": hex(123456789), + "topics": [ + onchain_monitor.TRANSFER_TOPIC, + "0x0000000000000000000000001111111111111111111111111111111111111111", + "0x0000000000000000000000002222222222222222222222222222222222222222", + ], + } + ] + + monkeypatch.setattr(onchain_monitor, "_alchemy_client", lambda cfg=None: RawAlchemyClient()) + + result = onchain_monitor.fetch_alchemy_events(limit=10) + + assert result["errors"] == [] + assert result["events"] == [] + assert len(result["raw_events"]) == 1 + raw = onchain_db.list_onchain_raw_events(hours=50000) + assert raw["total"] == 1 + assert raw["items"][0]["source"] == "alchemy" + assert raw["items"][0]["mapping_status"] == "unmapped" + assert raw["items"][0]["event_type"] == "evm_transfer" + + def test_nodereal_auto_maps_raw_event_from_erc20_metadata(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key") diff --git a/tests/test_runtime_config.py b/tests/test_runtime_config.py index e2b61ea..a543e9c 100644 --- a/tests/test_runtime_config.py +++ b/tests/test_runtime_config.py @@ -217,13 +217,19 @@ def test_llm_system_config_overrides_env_defaults(monkeypatch): def test_onchain_system_config_overrides_env(monkeypatch): monkeypatch.setenv("ALPHAX_ONCHAIN_ENABLED", "0") monkeypatch.setenv("TEST_NODEREAL_KEY", "nodereal-secret") + monkeypatch.setenv("TEST_ALCHEMY_KEY", "alchemy-secret") set_config("system", "onchain", { "enabled": True, + "provider": "alchemy", "chains": ["ethereum", "bsc"], "timeout": 9, "candidate_min_score": 88, "nodereal_api_key_env": "TEST_NODEREAL_KEY", "nodereal_raw_max_logs_per_chain": 12, + "alchemy_enabled": True, + "alchemy_chains": ["ethereum"], + "alchemy_api_key_env": "TEST_ALCHEMY_KEY", + "alchemy_raw_max_logs_per_chain": 9, }) params = get_onchain_params() @@ -234,6 +240,10 @@ def test_onchain_system_config_overrides_env(monkeypatch): assert params["candidate_min_score"] == 88 assert params["nodereal_api_key"] == "nodereal-secret" assert params["nodereal_raw_max_logs_per_chain"] == 12 + assert params["provider"] == "alchemy" + assert params["alchemy_api_key"] == "alchemy-secret" + assert params["alchemy_chains"] == ["ethereum"] + assert params["alchemy_raw_max_logs_per_chain"] == 9 def test_paper_trading_system_config_controls_account_model(monkeypatch):