This commit is contained in:
aaron 2026-05-24 11:28:07 +08:00
parent 870b068a5a
commit 0977696a5c
8 changed files with 590 additions and 116 deletions

View File

@ -42,11 +42,15 @@ ALPHAX_LLM_REVIEW_ENABLED=1
# 链上追踪运行时配置。默认关闭;开启后采集结果只作为发现/风控辅助。
ALPHAX_ONCHAIN_ENABLED=0
ALPHAX_ONCHAIN_PROVIDER=nodereal
# 可选:切换到 Alchemy 可设为 alchemy并行可设为 nodereal,alchemy。
ALPHAX_ONCHAIN_CHAINS=ethereum,bsc
ALPHAX_ONCHAIN_TIMEOUT=15
ALPHAX_NODEREAL_ENABLED=1
ALPHAX_NODEREAL_CHAINS=ethereum,bsc
ALPHAX_NODEREAL_API_KEY=
ALPHAX_ALCHEMY_ENABLED=0
ALPHAX_ALCHEMY_CHAINS=ethereum,bsc
ALPHAX_ALCHEMY_API_KEY=
# 可选:生产若 onchain_token_map 为空,可用 JSON 数组自举 NodeReal 合约映射。
# 示例:[{"symbol":"STORJ/USDT","chain":"ethereum","contract_address":"0x...","confidence":95}]
ALPHAX_ONCHAIN_TOKEN_MAPPINGS=
@ -57,6 +61,14 @@ ALPHAX_NODEREAL_RAW_BLOCK_LOOKBACK=1
ALPHAX_NODEREAL_RAW_MAX_LOGS_PER_CHAIN=30
ALPHAX_NODEREAL_AUTO_MAPPING_ENABLED=1
ALPHAX_NODEREAL_AUTO_MAPPING_CONFIDENCE=82
ALPHAX_ALCHEMY_LOG_BLOCK_LOOKBACK=9
ALPHAX_ALCHEMY_MAX_LOGS_PER_TOKEN=25
ALPHAX_ALCHEMY_RAW_TRANSFER_ENABLED=1
ALPHAX_ALCHEMY_RAW_CHAINS=ethereum
ALPHAX_ALCHEMY_RAW_BLOCK_LOOKBACK=1
ALPHAX_ALCHEMY_RAW_MAX_LOGS_PER_CHAIN=8
ALPHAX_ALCHEMY_AUTO_MAPPING_ENABLED=1
ALPHAX_ALCHEMY_AUTO_MAPPING_CONFIDENCE=82
ALPHAX_ONCHAIN_CANDIDATE_ENABLED=1
ALPHAX_ONCHAIN_CANDIDATE_MIN_SCORE=70
ALPHAX_ONCHAIN_CANDIDATE_MIN_CONFIDENCE=70

View File

@ -115,10 +115,11 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组
### 4.1.2 链上数据源
- 当前链上主数据源是 NodeReal入口在 `app/services/nodereal_client.py``app/services/onchain_monitor.py`
- 默认只跑 `ALPHAX_ONCHAIN_PROVIDER=nodereal`,并通过 `ALPHAX_NODEREAL_API_KEY` 访问 EVM JSON-RPC / Enhanced API。
- 当前链上主数据源支持 NodeReal 与 Alchemy入口分别是 `app/services/nodereal_client.py`、`app/services/alchemy_client.py` 和 `app/services/onchain_monitor.py`
- 默认仍可跑 `ALPHAX_ONCHAIN_PROVIDER=nodereal`;如果 NodeReal 额度受限,可切到 `ALPHAX_ONCHAIN_PROVIDER=alchemy` 并设置 `ALPHAX_ALCHEMY_API_KEY`;并行模式可用 `ALPHAX_ONCHAIN_PROVIDER=nodereal,alchemy`
- NodeReal 通过 `ALPHAX_NODEREAL_API_KEY` 访问 EVM JSON-RPC / Enhanced APIAlchemy 通过 `ALPHAX_ALCHEMY_API_KEY` 访问 Ethereum / BSC 标准 EVM JSON-RPC。
- DEX Screener、Etherscan、Helius 已从运行时代码链路移除;当前只做 Ethereum / BSC 的 NodeReal 采集。
- NodeReal 原始 Transfer 会先记录到 `onchain_raw_events`,再通过 ERC-20 `symbol/name/decimals` 自动尝试映射到交易所 `XXX/USDT`,人工 `ALPHAX_ONCHAIN_TOKEN_MAPPINGS` 只作为兜底。
- NodeReal / Alchemy 原始 Transfer 会先记录到 `onchain_raw_events`,再通过 ERC-20 `symbol/name/decimals` 自动尝试映射到交易所 `XXX/USDT`,人工 `ALPHAX_ONCHAIN_TOKEN_MAPPINGS` 只作为兜底。
- 新增链上信号优先落到 `onchain_token_metrics` / `onchain_events`,不要直接创建推荐;高质量事件仍通过 `event_news` 进入技术检查。
### 4.2 Web/API

View File

@ -78,6 +78,9 @@ def default_onchain_config(default_chains=("ethereum", "bsc")):
"nodereal_enabled": _env_bool("ALPHAX_NODEREAL_ENABLED", True),
"nodereal_chains": _env_list("ALPHAX_NODEREAL_CHAINS", ("ethereum", "bsc")),
"nodereal_api_key_env": "ALPHAX_NODEREAL_API_KEY",
"alchemy_enabled": _env_bool("ALPHAX_ALCHEMY_ENABLED", False),
"alchemy_chains": _env_list("ALPHAX_ALCHEMY_CHAINS", ("ethereum", "bsc")),
"alchemy_api_key_env": "ALPHAX_ALCHEMY_API_KEY",
"token_mappings_env": "ALPHAX_ONCHAIN_TOKEN_MAPPINGS",
"token_mappings": [],
"nodereal_log_block_lookback": _env_int("ALPHAX_NODEREAL_LOG_BLOCK_LOOKBACK", 120),
@ -87,6 +90,14 @@ def default_onchain_config(default_chains=("ethereum", "bsc")):
"nodereal_raw_max_logs_per_chain": _env_int("ALPHAX_NODEREAL_RAW_MAX_LOGS_PER_CHAIN", 30),
"nodereal_auto_mapping_enabled": _env_bool("ALPHAX_NODEREAL_AUTO_MAPPING_ENABLED", True),
"nodereal_auto_mapping_confidence": _env_int("ALPHAX_NODEREAL_AUTO_MAPPING_CONFIDENCE", 82),
"alchemy_log_block_lookback": _env_int("ALPHAX_ALCHEMY_LOG_BLOCK_LOOKBACK", 9),
"alchemy_max_logs_per_token": _env_int("ALPHAX_ALCHEMY_MAX_LOGS_PER_TOKEN", 25),
"alchemy_raw_transfer_enabled": _env_bool("ALPHAX_ALCHEMY_RAW_TRANSFER_ENABLED", True),
"alchemy_raw_chains": _env_list("ALPHAX_ALCHEMY_RAW_CHAINS", ("ethereum",)),
"alchemy_raw_block_lookback": _env_int("ALPHAX_ALCHEMY_RAW_BLOCK_LOOKBACK", 1),
"alchemy_raw_max_logs_per_chain": _env_int("ALPHAX_ALCHEMY_RAW_MAX_LOGS_PER_CHAIN", 8),
"alchemy_auto_mapping_enabled": _env_bool("ALPHAX_ALCHEMY_AUTO_MAPPING_ENABLED", True),
"alchemy_auto_mapping_confidence": _env_int("ALPHAX_ALCHEMY_AUTO_MAPPING_CONFIDENCE", 82),
"candidate_enabled": _env_bool("ALPHAX_ONCHAIN_CANDIDATE_ENABLED", True),
"candidate_min_score": _env_float("ALPHAX_ONCHAIN_CANDIDATE_MIN_SCORE", 70),
"candidate_min_confidence": _env_int("ALPHAX_ONCHAIN_CANDIDATE_MIN_CONFIDENCE", 70),

View File

@ -467,6 +467,7 @@ def get_onchain_provider_status(hours=24):
hours = int(hours or 24)
cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY")
alchemy_env = str(cfg.get("alchemy_api_key_env") or "ALPHAX_ALCHEMY_API_KEY")
conn = get_conn()
try:
raw_total = conn.execute("SELECT COUNT(*) FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,)).fetchone()[0]
@ -545,9 +546,15 @@ def get_onchain_provider_status(hours=24):
summary = _load(last_onchain.get("summary_json") if last_onchain else "{}", {}) if last_onchain else {}
last_error = last_onchain.get("error_message") if last_onchain else ""
provider = str(cfg.get("provider") or "nodereal").strip().lower()
nodereal_enabled = bool(cfg.get("nodereal_enabled", True)) and provider == "nodereal"
requested = {p.strip() for p in provider.split(",") if p.strip()}
if requested & {"all", "multi", "both"}:
requested = {"nodereal", "alchemy"}
nodereal_enabled = bool(cfg.get("nodereal_enabled", True)) and "nodereal" in requested
alchemy_enabled = bool(cfg.get("alchemy_enabled", False)) and "alchemy" in requested
nodereal_metrics = int(sum(row["count"] for row in metric_sources if row["source"] == "nodereal"))
nodereal_signals = int(sum(row["count"] for row in signal_sources if row["source"] == "nodereal"))
alchemy_metrics = int(sum(row["count"] for row in metric_sources if row["source"] == "alchemy"))
alchemy_signals = int(sum(row["count"] for row in signal_sources if row["source"] == "alchemy"))
providers = [
{
"provider": "nodereal",
@ -566,6 +573,23 @@ def get_onchain_provider_status(hours=24):
last_error if "nodereal" in str(last_error).lower() else "",
),
},
{
"provider": "alchemy",
"label": "Alchemy",
"enabled": alchemy_enabled,
"api_key_present": bool(os.getenv(alchemy_env, "").strip()),
"implemented": True,
"role": "EVM 备用/并行链上数据源Transfer 日志、大额转账、ERC-20 自动映射",
"raw_events": int(raw_total or 0),
"metrics": alchemy_metrics,
"signals": alchemy_signals,
"status": _provider_status_label(
alchemy_enabled,
True,
int(raw_total or 0) + alchemy_metrics + alchemy_signals,
last_error if "alchemy" in str(last_error).lower() else "",
),
},
]
return {
"hours": hours,

View File

@ -0,0 +1,85 @@
"""Small JSON-RPC client for Alchemy EVM endpoints."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import requests
DEFAULT_ALCHEMY_CHAIN_ENDPOINTS = {
"ethereum": "https://eth-mainnet.g.alchemy.com/v2/{api_key}",
"bsc": "https://bnb-mainnet.g.alchemy.com/v2/{api_key}",
}
@dataclass(frozen=True)
class AlchemyConfig:
api_key: str
timeout: int = 15
endpoints: dict[str, str] | None = None
class AlchemyClient:
def __init__(self, config: AlchemyConfig):
self.config = config
self.endpoints = {**DEFAULT_ALCHEMY_CHAIN_ENDPOINTS, **(config.endpoints or {})}
def supports_chain(self, chain: str) -> bool:
return bool(self._endpoint(chain))
def call(self, chain: str, method: str, params: list[Any] | None = None) -> Any:
endpoint = self._endpoint(chain)
if not endpoint:
raise ValueError(f"alchemy_chain_not_configured:{chain}")
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params or [],
}
resp = requests.post(
endpoint,
json=payload,
timeout=self.config.timeout,
headers={"Content-Type": "application/json", "User-Agent": "AlphaX-Agent-Crypto/1.0"},
)
if resp.status_code >= 400:
raise RuntimeError(f"alchemy_http_{resp.status_code}:{resp.text[:200]}")
data = resp.json()
if data.get("error"):
raise RuntimeError(f"alchemy_rpc_error:{data['error']}")
return data.get("result")
def block_number(self, chain: str) -> int:
return _hex_to_int(self.call(chain, "eth_blockNumber", []))
def get_logs(self, chain: str, log_filter: dict[str, Any]) -> list[dict[str, Any]]:
result = self.call(chain, "eth_getLogs", [log_filter])
return result if isinstance(result, list) else []
def eth_call(self, chain: str, to_address: str, data: str, block: str = "latest") -> str:
result = self.call(chain, "eth_call", [{"to": to_address, "data": data}, block])
return str(result or "")
def _endpoint(self, chain: str) -> str:
chain_key = str(chain or "").lower().strip()
template = self.endpoints.get(chain_key, "")
if not template or not self.config.api_key:
return ""
return template.format(api_key=self.config.api_key)
def _hex_to_int(value: Any) -> int:
if value is None:
return 0
if isinstance(value, int):
return value
text = str(value).strip()
if not text:
return 0
try:
return int(text, 16) if text.startswith("0x") else int(text)
except Exception:
return 0

View File

@ -29,6 +29,7 @@ from app.db.onchain_db import (
)
from app.services.event_driven_screener import _event_hash as event_hash
from app.services.event_driven_screener import _tradable_symbol, init_event_tables
from app.services.alchemy_client import AlchemyClient, AlchemyConfig, DEFAULT_ALCHEMY_CHAIN_ENDPOINTS
from app.services.nodereal_client import DEFAULT_CHAIN_ENDPOINTS, NodeRealClient, NodeRealConfig
@ -78,6 +79,7 @@ def get_onchain_params():
else:
chains = [str(x).strip().lower() for x in chains_raw if str(x).strip()]
nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY")
alchemy_env = str(cfg.get("alchemy_api_key_env") or "ALPHAX_ALCHEMY_API_KEY")
token_mappings_env = str(cfg.get("token_mappings_env") or "ALPHAX_ONCHAIN_TOKEN_MAPPINGS")
return {
"enabled": bool(cfg.get("enabled", False)),
@ -88,6 +90,10 @@ def get_onchain_params():
"nodereal_chains": _normalize_chain_list(cfg.get("nodereal_chains") or ("ethereum", "bsc")),
"nodereal_api_key": os.getenv(nodereal_env, "").strip(),
"nodereal_api_key_env": nodereal_env,
"alchemy_enabled": bool(cfg.get("alchemy_enabled", False)),
"alchemy_chains": _normalize_chain_list(cfg.get("alchemy_chains") or ("ethereum", "bsc")),
"alchemy_api_key": os.getenv(alchemy_env, "").strip(),
"alchemy_api_key_env": alchemy_env,
"token_mappings": _load_token_mappings(cfg.get("token_mappings"), os.getenv(token_mappings_env, "")),
"token_mappings_env": token_mappings_env,
"nodereal_log_block_lookback": int(cfg.get("nodereal_log_block_lookback") or 120),
@ -97,6 +103,14 @@ def get_onchain_params():
"nodereal_raw_max_logs_per_chain": int(cfg.get("nodereal_raw_max_logs_per_chain") or 30),
"nodereal_auto_mapping_enabled": bool(cfg.get("nodereal_auto_mapping_enabled", True)),
"nodereal_auto_mapping_confidence": int(cfg.get("nodereal_auto_mapping_confidence") or 82),
"alchemy_log_block_lookback": int(cfg.get("alchemy_log_block_lookback") or 120),
"alchemy_max_logs_per_token": int(cfg.get("alchemy_max_logs_per_token") or 25),
"alchemy_raw_transfer_enabled": bool(cfg.get("alchemy_raw_transfer_enabled", True)),
"alchemy_raw_chains": _normalize_chain_list(cfg.get("alchemy_raw_chains") or ("ethereum",)),
"alchemy_raw_block_lookback": int(cfg.get("alchemy_raw_block_lookback") or 1),
"alchemy_raw_max_logs_per_chain": int(cfg.get("alchemy_raw_max_logs_per_chain") or 30),
"alchemy_auto_mapping_enabled": bool(cfg.get("alchemy_auto_mapping_enabled", True)),
"alchemy_auto_mapping_confidence": int(cfg.get("alchemy_auto_mapping_confidence") or 82),
"candidate_enabled": bool(cfg.get("candidate_enabled", True)),
"candidate_min_score": float(cfg.get("candidate_min_score") or 70),
"candidate_min_confidence": int(cfg.get("candidate_min_confidence") or 70),
@ -356,9 +370,10 @@ def _read_erc20_metadata(client, chain, contract):
return metadata
def _auto_map_nodereal_contract(client, chain, contract, cfg=None):
def _auto_map_evm_contract(client, chain, contract, cfg=None, provider="nodereal"):
cfg = cfg or get_onchain_params()
if not cfg.get("nodereal_auto_mapping_enabled", True):
provider = str(provider or "nodereal").lower()
if not cfg.get(f"{provider}_auto_mapping_enabled", True):
return None
existing = find_mapping_by_contract(chain, contract)
if existing:
@ -368,12 +383,13 @@ def _auto_map_nodereal_contract(client, chain, contract, cfg=None):
if not _is_auto_mapping_symbol_allowed(base, metadata.get("name")):
return None
symbol = normalize_symbol(base)
confidence = max(1, min(95, int(cfg.get("nodereal_auto_mapping_confidence") or 82)))
confidence = max(1, min(95, int(cfg.get(f"{provider}_auto_mapping_confidence") or 82)))
source = f"{provider}_erc20_metadata"
mapping_id = onchain_db.upsert_token_mapping(
symbol=symbol,
chain=chain,
contract_address=contract,
source="nodereal_erc20_metadata",
source=source,
confidence=confidence,
raw=metadata,
is_active=True,
@ -385,12 +401,16 @@ def _auto_map_nodereal_contract(client, chain, contract, cfg=None):
"symbol": symbol,
"chain": str(chain or "").lower(),
"contract_address": contract,
"source": "nodereal_erc20_metadata",
"source": source,
"confidence": confidence,
"raw_json": json.dumps(metadata, ensure_ascii=False),
}
def _auto_map_nodereal_contract(client, chain, contract, cfg=None):
return _auto_map_evm_contract(client, chain, contract, cfg=cfg, provider="nodereal")
def _nodereal_client(cfg=None):
cfg = cfg or get_onchain_params()
return NodeRealClient(
@ -402,8 +422,20 @@ def _nodereal_client(cfg=None):
)
def _event_from_nodereal_transfer(log, mapping, cfg=None):
def _alchemy_client(cfg=None):
cfg = cfg or get_onchain_params()
return AlchemyClient(
AlchemyConfig(
api_key=cfg.get("alchemy_api_key") or "",
timeout=int(cfg.get("timeout") or 15),
endpoints=dict(DEFAULT_ALCHEMY_CHAIN_ENDPOINTS),
)
)
def _event_from_evm_transfer(log, mapping, cfg=None, source="nodereal"):
cfg = cfg or get_onchain_params()
source = str(source or "nodereal").lower()
topics = log.get("topics") or []
if len(topics) < 3:
return None
@ -439,13 +471,18 @@ def _event_from_nodereal_transfer(log, mapping, cfg=None):
"confidence": 76,
"severity": "A",
"detected_at": _now().isoformat(timespec="seconds"),
"source": "nodereal",
"source": source,
"url": _chain_explorer_tx_url(chain, tx_hash),
"raw": log,
}
def _raw_event_from_nodereal_transfer(log, chain):
def _event_from_nodereal_transfer(log, mapping, cfg=None):
return _event_from_evm_transfer(log, mapping, cfg=cfg, source="nodereal")
def _raw_event_from_evm_transfer(log, chain, source="nodereal"):
source = str(source or "nodereal").lower()
topics = log.get("topics") or []
if len(topics) < 3:
return None
@ -456,14 +493,15 @@ def _raw_event_from_nodereal_transfer(log, chain):
return None
from_addr = _topic_to_address(topics[1])
to_addr = _topic_to_address(topics[2])
source_label = "Alchemy" if source == "alchemy" else "NodeReal"
return {
"source": "nodereal",
"source": source,
"chain": str(chain or "").lower(),
"event_type": "evm_transfer",
"token_address": contract,
"symbol_guess": "",
"name": "",
"title": "NodeReal ERC-20 原始转账",
"title": f"{source_label} ERC-20 原始转账",
"description": f"合约 {_short_addr(contract)} · {_short_addr(from_addr)} -> {_short_addr(to_addr)}",
"url": _chain_explorer_tx_url(chain, tx_hash),
"amount": amount_raw,
@ -476,7 +514,11 @@ def _raw_event_from_nodereal_transfer(log, chain):
}
def _apply_raw_event_mapping(raw_event, client=None, cfg=None):
def _raw_event_from_nodereal_transfer(log, chain):
return _raw_event_from_evm_transfer(log, chain, source="nodereal")
def _apply_raw_event_mapping(raw_event, client=None, cfg=None, provider=None):
item = dict(raw_event or {})
chain = str(item.get("chain") or "").lower()
contract = str(item.get("token_address") or "").strip()
@ -484,7 +526,7 @@ def _apply_raw_event_mapping(raw_event, client=None, cfg=None):
return item
mapping = find_mapping_by_contract(chain, contract)
if not mapping and client:
mapping = _auto_map_nodereal_contract(client, chain, contract, cfg=cfg)
mapping = _auto_map_evm_contract(client, chain, contract, cfg=cfg, provider=provider or item.get("source") or "nodereal")
if mapping:
item["mapped_symbol"] = normalize_symbol(mapping.get("symbol"))
item["mapping_status"] = "mapped"
@ -579,7 +621,7 @@ def fetch_nodereal_events(limit=60):
for mapping in mappings[: int(limit or 60)]:
chain = str(mapping.get("chain") or "").lower()
contract = str(mapping.get("contract_address") or "").strip()
if not contract:
if not _is_evm_address(contract):
continue
try:
holder_count = client.token_holder_count(chain, contract)
@ -630,6 +672,81 @@ def fetch_nodereal_events(limit=60):
}
def fetch_alchemy_events(limit=60):
cfg = get_onchain_params()
if not cfg.get("alchemy_enabled", False):
return {"metrics": [], "events": [], "raw_events": [], "errors": ["alchemy_disabled"]}
if not cfg.get("alchemy_api_key"):
return {"metrics": [], "events": [], "raw_events": [], "errors": ["alchemy_api_key_missing"]}
seed_result = seed_configured_token_mappings(cfg)
client = _alchemy_client(cfg)
raw_result = fetch_alchemy_raw_events(client=client, cfg=cfg, limit=limit)
enabled_chains = set(cfg.get("alchemy_chains") or DEFAULT_CHAINS)
all_mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE)
chain_mappings = [m for m in all_mappings if str(m.get("chain") or "").lower() in enabled_chains]
mappings = []
unsupported_chains = set()
for mapping in chain_mappings:
chain = str(mapping.get("chain") or "").lower()
if client.supports_chain(chain):
mappings.append(mapping)
else:
unsupported_chains.add(chain)
events = []
errors = list(seed_result.get("errors") or []) + list(raw_result.get("errors") or [])
diagnostics = {
"seeded_mappings": seed_result.get("seeded", 0),
"mapping_total": len(all_mappings),
"chain_mapping_total": len(chain_mappings),
"supported_mapping_total": len(mappings),
"enabled_chains": sorted(enabled_chains),
"unsupported_chains": sorted(unsupported_chains),
}
lookback = max(1, int(cfg.get("alchemy_log_block_lookback") or 120))
max_logs = max(1, int(cfg.get("alchemy_max_logs_per_token") or 25))
for mapping in mappings[: int(limit or 60)]:
chain = str(mapping.get("chain") or "").lower()
contract = str(mapping.get("contract_address") or "").strip()
if not _is_evm_address(contract):
continue
try:
latest = client.block_number(chain)
if latest <= 0:
continue
logs = client.get_logs(
chain,
{
"address": contract,
"fromBlock": hex(max(0, latest - lookback)),
"toBlock": hex(latest),
"topics": [TRANSFER_TOPIC],
},
)
for log in logs[:max_logs]:
if not isinstance(log, dict):
continue
event = _event_from_evm_transfer(log, mapping, cfg=cfg, source="alchemy")
if not event:
continue
if insert_onchain_event(event):
events.append(event)
except Exception as exc:
errors.append(f"{mapping.get('symbol')}:alchemy_logs:{str(exc)[:160]}")
if not all_mappings:
diagnostics["mapping_note"] = "no_strategy_mappings_raw_events_only"
elif not chain_mappings:
diagnostics["mapping_note"] = "no_enabled_chain_mappings_raw_events_only"
elif not mappings:
diagnostics["mapping_note"] = "no_supported_mappings_raw_events_only"
return {
"metrics": [],
"events": events,
"raw_events": raw_result.get("raw_events") or [],
"errors": errors,
"diagnostics": diagnostics,
}
def fetch_nodereal_raw_events(client=None, cfg=None, limit=60):
cfg = cfg or get_onchain_params()
if not cfg.get("nodereal_raw_transfer_enabled", True):
@ -659,9 +776,10 @@ def fetch_nodereal_raw_events(client=None, cfg=None, limit=60):
continue
item = _raw_event_from_nodereal_transfer(log, chain)
if item:
raw_items.append(_apply_raw_event_mapping(item, client=client, cfg=cfg))
raw_items.append(item)
raw_items.sort(key=lambda item: item.get("amount") or 0, reverse=True)
for item in raw_items[:per_chain]:
item = _apply_raw_event_mapping(item, client=client, cfg=cfg, provider="nodereal")
if insert_onchain_raw_event(item):
inserted.append(item)
except Exception as exc:
@ -669,6 +787,46 @@ def fetch_nodereal_raw_events(client=None, cfg=None, limit=60):
return {"raw_events": inserted, "errors": errors}
def fetch_alchemy_raw_events(client=None, cfg=None, limit=60):
cfg = cfg or get_onchain_params()
if not cfg.get("alchemy_raw_transfer_enabled", True):
return {"raw_events": [], "errors": []}
client = client or _alchemy_client(cfg)
chains = [c for c in (cfg.get("alchemy_raw_chains") or ("ethereum",)) if client.supports_chain(c)]
lookback = max(0, min(12, int(cfg.get("alchemy_raw_block_lookback") or 1)))
per_chain = max(1, min(int(cfg.get("alchemy_raw_max_logs_per_chain") or 30), int(limit or 60)))
inserted = []
errors = []
for chain in chains:
try:
latest = client.block_number(chain)
if latest <= 0:
continue
logs = client.get_logs(
chain,
{
"fromBlock": hex(max(0, latest - lookback)),
"toBlock": hex(latest),
"topics": [TRANSFER_TOPIC],
},
)
raw_items = []
for log in logs:
if not isinstance(log, dict):
continue
item = _raw_event_from_evm_transfer(log, chain, source="alchemy")
if item:
raw_items.append(item)
raw_items.sort(key=lambda item: item.get("amount") or 0, reverse=True)
for item in raw_items[:per_chain]:
item = _apply_raw_event_mapping(item, client=client, cfg=cfg, provider="alchemy")
if insert_onchain_raw_event(item):
inserted.append(item)
except Exception as exc:
errors.append(f"{chain}:alchemy_raw_logs:{str(exc)[:160]}")
return {"raw_events": inserted, "errors": errors}
def _short_addr(value):
value = str(value or "")
if len(value) <= 12:
@ -676,6 +834,17 @@ def _short_addr(value):
return value[:6] + "..." + value[-4:]
def _is_evm_address(value):
text = str(value or "").strip()
if len(text) != 42 or not text.startswith("0x"):
return False
try:
int(text[2:], 16)
return True
except Exception:
return False
def ingest_normalized_events(events):
"""Test/integration helper for provider adapters."""
init_db()
@ -691,6 +860,19 @@ def ingest_normalized_events(events):
return {"inserted": len(inserted), "queued": queued.get("queued", 0), "events": inserted, "candidate_result": queued}
def _enabled_onchain_providers(cfg):
raw = str(cfg.get("provider") or "nodereal").strip().lower()
requested = [p.strip() for p in raw.split(",") if p.strip()]
if any(p in {"all", "multi", "both"} for p in requested):
requested = ["nodereal", "alchemy"]
providers = []
if "nodereal" in requested and cfg.get("nodereal_enabled", True):
providers.append("nodereal")
if "alchemy" in requested and cfg.get("alchemy_enabled", False):
providers.append("alchemy")
return providers or ["nodereal"]
def _candidate_title(event):
label = event.get("signal_label") or signal_label(event.get("signal_code"))
value = _safe_float(event.get("value_usd"))
@ -709,96 +891,111 @@ def enqueue_onchain_candidates(min_score=None, min_confidence=None, cooldown_hou
init_event_tables()
cutoff = (_now() - timedelta(hours=24)).isoformat()
conn = get_conn()
rows = conn.execute(
"""
SELECT e.*,
COALESCE((
SELECT m.onchain_score FROM onchain_token_metrics m
WHERE m.symbol=e.symbol AND m.chain=e.chain
ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1
), 0) AS latest_onchain_score,
COALESCE((
SELECT m.risk_score FROM onchain_token_metrics m
WHERE m.symbol=e.symbol AND m.chain=e.chain
ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1
), 0) AS latest_risk_score
FROM onchain_events e
WHERE e.status IN ('new', 'candidate_failed')
AND e.detected_at >= %s
AND e.direction='positive'
ORDER BY e.confidence DESC, e.value_usd DESC, e.detected_at::timestamp DESC
LIMIT %s
""",
(cutoff, int(limit or 20)),
).fetchall()
queued = []
skipped_ids = []
now = _now().isoformat(timespec="seconds")
cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat()
for row in rows:
event = dict(row)
symbol = normalize_symbol(event.get("symbol"))
if not symbol or not _tradable_symbol(symbol):
skipped_ids.append(event["id"])
continue
score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence")))
if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0):
continue
recent = conn.execute(
errors = []
try:
rows = conn.execute(
"""
SELECT id FROM event_news
WHERE source='onchain' AND symbol=%s AND detected_at >= %s
LIMIT 1
SELECT e.*,
COALESCE((
SELECT m.onchain_score FROM onchain_token_metrics m
WHERE m.symbol=e.symbol AND m.chain=e.chain
ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1
), 0) AS latest_onchain_score,
COALESCE((
SELECT m.risk_score FROM onchain_token_metrics m
WHERE m.symbol=e.symbol AND m.chain=e.chain
ORDER BY m.metric_time::timestamp DESC, m.id DESC LIMIT 1
), 0) AS latest_risk_score
FROM onchain_events e
WHERE e.status IN ('new', 'candidate_failed')
AND e.detected_at >= %s
AND e.direction='positive'
ORDER BY e.confidence DESC, e.value_usd DESC, e.detected_at::timestamp DESC
LIMIT %s
""",
(symbol, cooldown_cutoff),
).fetchone()
if recent:
skipped_ids.append(event["id"])
continue
title = _candidate_title(event)
h = event_hash("onchain", title, symbol)
try:
conn.execute(
(cutoff, int(limit or 20)),
).fetchall()
now = _now().isoformat(timespec="seconds")
cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat()
for row in rows:
event = dict(row)
symbol = normalize_symbol(event.get("symbol"))
if not symbol or not _tradable_symbol(symbol):
skipped_ids.append(event["id"])
continue
score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence")))
if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0):
continue
recent = conn.execute(
"""
INSERT INTO event_news
(event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed)
VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0)
SELECT id FROM event_news
WHERE source='onchain' AND symbol=%s AND detected_at >= %s
LIMIT 1
""",
(
h,
symbol,
title,
event.get("url") or "",
event.get("detected_at") or now,
now,
event.get("severity") or "A",
json.dumps(
{
"onchain_event_id": event.get("id"),
"chain": event.get("chain"),
"signal_code": event.get("signal_code"),
"signal_label": event.get("signal_label"),
"confidence": event.get("confidence"),
"value_usd": event.get("value_usd"),
"onchain_score": event.get("latest_onchain_score"),
"risk_score": event.get("latest_risk_score"),
},
ensure_ascii=False,
),
),
(symbol, cooldown_cutoff),
).fetchone()
if recent:
skipped_ids.append(event["id"])
continue
title = _candidate_title(event)
h = event_hash("onchain", title, symbol)
try:
# A single bad/duplicate candidate must not poison the whole
# PostgreSQL transaction; nested transaction becomes SAVEPOINT.
with conn.transaction():
inserted = conn.execute(
"""
INSERT INTO event_news
(event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed)
VALUES (%s, 'onchain', %s, %s, %s, %s, %s, %s, 'onchain_candidate', %s, 0)
ON CONFLICT(event_hash) DO NOTHING
RETURNING id
""",
(
h,
symbol,
title,
event.get("url") or "",
event.get("detected_at") or now,
now,
event.get("severity") or "A",
json.dumps(
{
"onchain_event_id": event.get("id"),
"chain": event.get("chain"),
"signal_code": event.get("signal_code"),
"signal_label": event.get("signal_label"),
"confidence": event.get("confidence"),
"value_usd": event.get("value_usd"),
"onchain_score": event.get("latest_onchain_score"),
"risk_score": event.get("latest_risk_score"),
},
ensure_ascii=False,
),
),
).fetchone()
if inserted:
conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),))
queued.append(symbol)
else:
skipped_ids.append(event["id"])
except Exception as exc:
errors.append(f"{symbol}:candidate_enqueue:{str(exc)[:160]}")
skipped_ids.append(event["id"])
if skipped_ids:
conn.execute(
"UPDATE onchain_events SET status='candidate_skipped' WHERE id IN (" + ",".join(["%s"] * len(skipped_ids)) + ")",
tuple(skipped_ids),
)
conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=%s", (event.get("id"),))
queued.append(symbol)
except Exception:
skipped_ids.append(event["id"])
if skipped_ids:
conn.execute(
"UPDATE onchain_events SET status='candidate_skipped' WHERE id IN (" + ",".join(["%s"] * len(skipped_ids)) + ")",
tuple(skipped_ids),
)
conn.commit()
conn.close()
return {"queued": len(queued), "skipped": len(skipped_ids), "symbols": queued}
conn.commit()
return {"queued": len(queued), "skipped": len(skipped_ids), "symbols": queued, "errors": errors}
except Exception:
conn.rollback()
raise
finally:
conn.close()
def run_once(limit=60):
@ -816,21 +1013,39 @@ def run_once(limit=60):
"check_time": _now().isoformat(),
}
if cfg.get("enabled"):
node = fetch_nodereal_events(limit=limit)
output["metrics_count"] += len(node.get("metrics") or [])
output["events_count"] += len(node.get("events") or [])
output["raw_events_count"] += len(node.get("raw_events") or [])
output["errors"].extend(node.get("errors") or [])
provider_results = {}
for provider in _enabled_onchain_providers(cfg):
if provider == "alchemy":
node = fetch_alchemy_events(limit=limit)
else:
node = fetch_nodereal_events(limit=limit)
provider_results[provider] = {
"metrics_count": len(node.get("metrics") or []),
"events_count": len(node.get("events") or []),
"raw_events_count": len(node.get("raw_events") or []),
"diagnostics": node.get("diagnostics") or {},
}
output["metrics_count"] += len(node.get("metrics") or [])
output["events_count"] += len(node.get("events") or [])
output["raw_events_count"] += len(node.get("raw_events") or [])
output["errors"].extend(node.get("errors") or [])
output["provider_results"] = provider_results
output["discovered_mappings"] = 0
if output.get("discovered_mappings"):
output["status"] = "bootstrapped"
node = fetch_nodereal_events(limit=limit)
output["metrics_count"] = len(node.get("metrics") or [])
output["events_count"] = len(node.get("events") or [])
output["errors"].extend(node.get("errors") or [])
output["metrics_count"] = 0
output["events_count"] = 0
output["raw_events_count"] = 0
for provider in _enabled_onchain_providers(cfg):
node = fetch_alchemy_events(limit=limit) if provider == "alchemy" else fetch_nodereal_events(limit=limit)
output["metrics_count"] += len(node.get("metrics") or [])
output["events_count"] += len(node.get("events") or [])
output["raw_events_count"] += len(node.get("raw_events") or [])
output["errors"].extend(node.get("errors") or [])
queued = enqueue_onchain_candidates()
output["candidate_queued"] = queued.get("queued", 0)
output["candidate_symbols"] = queued.get("symbols", [])
output["errors"].extend(queued.get("errors") or [])
if not output["metrics_count"] and not output["events_count"] and not output["raw_events_count"]:
output["status"] = "no_onchain_data"
log_cron_run(
@ -858,6 +1073,7 @@ __all__ = [
"POSITIVE_SIGNALS",
"RISK_SIGNALS",
"enqueue_onchain_candidates",
"fetch_alchemy_events",
"fetch_nodereal_events",
"get_onchain_params",
"ingest_normalized_events",

View File

@ -2,7 +2,7 @@ import json
import os
import sqlite3
import sys
from datetime import datetime
from datetime import datetime, timedelta
from fastapi.testclient import TestClient
@ -93,6 +93,77 @@ def test_onchain_candidate_enqueues_event_news_not_recommendation(monkeypatch, t
assert status == "candidate_queued"
def test_onchain_candidate_duplicate_event_hash_does_not_abort_transaction(monkeypatch, tmp_path):
db_path = _temp_db(monkeypatch, tmp_path)
old_time = (datetime.now() - timedelta(hours=12)).isoformat()
event_time = datetime.now().isoformat()
stale_event = {
"id": 9001,
"symbol": "DUP/USDT",
"signal_label": "DEX交易量放大",
"signal_code": "dex_volume_spike",
"value_usd": 500000,
}
duplicate_title = onchain_monitor._candidate_title(stale_event)
duplicate_hash = onchain_monitor.event_hash("onchain", duplicate_title, "DUP/USDT")
conn = altcoin_db.get_conn()
conn.execute(
"""
INSERT INTO event_news
(event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed)
VALUES (%s, 'onchain', %s, %s, '', %s, %s, 'A', 'onchain_candidate', '{}', 0)
""",
(duplicate_hash, "DUP/USDT", duplicate_title, old_time, old_time),
)
conn.commit()
conn.close()
first_id = onchain_db.insert_onchain_event(
{
"chain": "ethereum",
"symbol": "DUP/USDT",
"contract_address": "0xdup",
"signal_code": "dex_volume_spike",
"signal_label": "DEX交易量放大",
"direction": "positive",
"value_usd": 500000,
"confidence": 90,
"severity": "A",
"detected_at": event_time,
"source": "test",
}
)
second_id = onchain_db.insert_onchain_event(
{
"chain": "ethereum",
"symbol": "OK/USDT",
"contract_address": "0xok",
"signal_code": "dex_volume_spike",
"direction": "positive",
"value_usd": 600000,
"confidence": 91,
"severity": "A",
"detected_at": event_time,
"source": "test",
}
)
result = onchain_monitor.enqueue_onchain_candidates(min_score=1, min_confidence=1, cooldown_hours=6)
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
statuses = {
row["id"]: row["status"]
for row in conn.execute("SELECT id, status FROM onchain_events WHERE id IN (?, ?)", (first_id, second_id)).fetchall()
}
queued_news = conn.execute("SELECT * FROM event_news WHERE source='onchain' AND symbol='OK/USDT'").fetchone()
conn.close()
assert result["queued"] == 1
assert result["skipped"] == 1
assert statuses[first_id] == "candidate_skipped"
assert statuses[second_id] == "candidate_queued"
assert queued_news is not None
def test_negative_onchain_signal_is_risk_context_only(monkeypatch, tmp_path):
db_path = _temp_db(monkeypatch, tmp_path)
onchain_db.insert_onchain_event(
@ -271,12 +342,13 @@ def test_token_detail_includes_mapped_raw_events(monkeypatch, tmp_path):
def test_nodereal_events_generate_metrics_and_normalized_event(monkeypatch, tmp_path):
_temp_db(monkeypatch, tmp_path)
monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key")
onchain_db.upsert_token_mapping("ABC", "ethereum", "0xabc", source="manual", confidence=95)
contract = "0x0000000000000000000000000000000000000abc"
onchain_db.upsert_token_mapping("ABC", "ethereum", contract, source="manual", confidence=95)
onchain_db.insert_token_metric(
{
"symbol": "ABC/USDT",
"chain": "ethereum",
"contract_address": "0xabc",
"contract_address": contract,
"window": "1h",
"metric_time": datetime.now().isoformat(),
"dex_volume_usd": 100000,
@ -292,7 +364,7 @@ def test_nodereal_events_generate_metrics_and_normalized_event(monkeypatch, tmp_
def token_holder_count(self, chain, contract):
assert chain == "ethereum"
assert contract == "0xabc"
assert contract == "0x0000000000000000000000000000000000000abc"
return 120
def block_number(self, chain):
@ -302,10 +374,10 @@ def test_nodereal_events_generate_metrics_and_normalized_event(monkeypatch, tmp_
def get_logs(self, chain, log_filter):
if "address" not in log_filter:
return []
assert log_filter["address"] == "0xabc"
assert log_filter["address"] == "0x0000000000000000000000000000000000000abc"
return [
{
"address": "0xabc",
"address": "0x0000000000000000000000000000000000000abc",
"transactionHash": "0xtx",
"data": hex(200000 * 10**18),
"topics": [
@ -402,6 +474,49 @@ def test_nodereal_records_raw_events_without_strategy_mappings(monkeypatch, tmp_
assert raw["items"][0]["event_type"] == "evm_transfer"
def test_alchemy_records_raw_events_without_strategy_mappings(monkeypatch, tmp_path):
monkeypatch.setenv("ALPHAX_ALCHEMY_API_KEY", "test-key")
monkeypatch.setenv("ALPHAX_ALCHEMY_ENABLED", "1")
monkeypatch.setenv("ALPHAX_ALCHEMY_CHAINS", "ethereum")
_temp_db(monkeypatch, tmp_path)
class RawAlchemyClient:
def supports_chain(self, chain):
return chain == "ethereum"
def block_number(self, chain):
return 1000
def get_logs(self, chain, log_filter):
if "address" in log_filter:
return []
return [
{
"address": "0xabc",
"transactionHash": "0xalchemyrawtx",
"data": hex(123456789),
"topics": [
onchain_monitor.TRANSFER_TOPIC,
"0x0000000000000000000000001111111111111111111111111111111111111111",
"0x0000000000000000000000002222222222222222222222222222222222222222",
],
}
]
monkeypatch.setattr(onchain_monitor, "_alchemy_client", lambda cfg=None: RawAlchemyClient())
result = onchain_monitor.fetch_alchemy_events(limit=10)
assert result["errors"] == []
assert result["events"] == []
assert len(result["raw_events"]) == 1
raw = onchain_db.list_onchain_raw_events(hours=50000)
assert raw["total"] == 1
assert raw["items"][0]["source"] == "alchemy"
assert raw["items"][0]["mapping_status"] == "unmapped"
assert raw["items"][0]["event_type"] == "evm_transfer"
def test_nodereal_auto_maps_raw_event_from_erc20_metadata(monkeypatch, tmp_path):
_temp_db(monkeypatch, tmp_path)
monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key")

View File

@ -217,13 +217,19 @@ def test_llm_system_config_overrides_env_defaults(monkeypatch):
def test_onchain_system_config_overrides_env(monkeypatch):
monkeypatch.setenv("ALPHAX_ONCHAIN_ENABLED", "0")
monkeypatch.setenv("TEST_NODEREAL_KEY", "nodereal-secret")
monkeypatch.setenv("TEST_ALCHEMY_KEY", "alchemy-secret")
set_config("system", "onchain", {
"enabled": True,
"provider": "alchemy",
"chains": ["ethereum", "bsc"],
"timeout": 9,
"candidate_min_score": 88,
"nodereal_api_key_env": "TEST_NODEREAL_KEY",
"nodereal_raw_max_logs_per_chain": 12,
"alchemy_enabled": True,
"alchemy_chains": ["ethereum"],
"alchemy_api_key_env": "TEST_ALCHEMY_KEY",
"alchemy_raw_max_logs_per_chain": 9,
})
params = get_onchain_params()
@ -234,6 +240,10 @@ def test_onchain_system_config_overrides_env(monkeypatch):
assert params["candidate_min_score"] == 88
assert params["nodereal_api_key"] == "nodereal-secret"
assert params["nodereal_raw_max_logs_per_chain"] == 12
assert params["provider"] == "alchemy"
assert params["alchemy_api_key"] == "alchemy-secret"
assert params["alchemy_chains"] == ["ethereum"]
assert params["alchemy_raw_max_logs_per_chain"] == 9
def test_paper_trading_system_config_controls_account_model(monkeypatch):