diff --git a/.env.example b/.env.example
index 7365e2f..fd38934 100644
--- a/.env.example
+++ b/.env.example
@@ -41,20 +41,28 @@ ALPHAX_LLM_REVIEW_ENABLED=1
# 链上追踪运行时配置。默认关闭;开启后采集结果只作为发现/风控辅助。
ALPHAX_ONCHAIN_ENABLED=0
-ALPHAX_ONCHAIN_CHAINS=ethereum,bsc,base,arbitrum,solana
+ALPHAX_ONCHAIN_PROVIDER=nodereal
+ALPHAX_ONCHAIN_CHAINS=ethereum,bsc
ALPHAX_ONCHAIN_TIMEOUT=15
+ALPHAX_NODEREAL_ENABLED=1
+ALPHAX_NODEREAL_CHAINS=ethereum,bsc
+ALPHAX_NODEREAL_API_KEY=
+ALPHAX_NODEREAL_LOG_BLOCK_LOOKBACK=120
+ALPHAX_NODEREAL_MAX_LOGS_PER_TOKEN=25
ALPHAX_ONCHAIN_CANDIDATE_ENABLED=1
ALPHAX_ONCHAIN_CANDIDATE_MIN_SCORE=70
ALPHAX_ONCHAIN_CANDIDATE_MIN_CONFIDENCE=70
ALPHAX_ONCHAIN_CANDIDATE_COOLDOWN_HOURS=6
-ALPHAX_ONCHAIN_DEXSCREENER_ENABLED=1
+ALPHAX_ONCHAIN_DEXSCREENER_ENABLED=0
ALPHAX_ONCHAIN_DEX_VOLUME_SPIKE_PCT=80
ALPHAX_ONCHAIN_DEX_MIN_LIQUIDITY_USD=100000
ALPHAX_ONCHAIN_DEX_MIN_VOLUME_24H_USD=100000
ALPHAX_ONCHAIN_LIQUIDITY_ADD_PCT=25
ALPHAX_ONCHAIN_LIQUIDITY_REMOVE_PCT=-25
ALPHAX_ONCHAIN_WHALE_TX_USD=250000
+ALPHAX_ETHERSCAN_ENABLED=0
ALPHAX_ETHERSCAN_API_KEY=
+ALPHAX_HELIUS_ENABLED=0
ALPHAX_HELIUS_API_KEY=
# 邮箱验证码 SMTP 配置。没有配置时,注册验证码只会生成,不会发邮件。
diff --git a/AGENTS.md b/AGENTS.md
index d3e8103..559ee4e 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -98,6 +98,13 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组
8. `app/services/review_engine.py`
负责复盘与策略自迭代,包括信号绩效、漏选复盘、规则候选、版本演进。
+### 4.1.1 链上数据源
+
+- 当前链上主数据源是 NodeReal,入口在 `app/services/nodereal_client.py` 和 `app/services/onchain_monitor.py`。
+- 默认只跑 `ALPHAX_ONCHAIN_PROVIDER=nodereal`,并通过 `ALPHAX_NODEREAL_API_KEY` 访问 EVM JSON-RPC / Enhanced API。
+- DEX Screener、Etherscan、Helius 已从默认链路关闭,只保留历史兼容函数和旧数据展示,不应再作为新增链上逻辑的主入口。
+- 新增链上信号优先落到 `onchain_token_metrics` / `onchain_events`,不要直接创建推荐;高质量事件仍通过 `event_news` 进入技术检查。
+
### 4.2 Web/API
`app/web/web_server.py` 只应负责 FastAPI 应用装配、模板装配、中间件、全局异常处理和 router include。新增业务 API 优先放到对应 route 模块:
diff --git a/README_DOCKER.md b/README_DOCKER.md
index 542ebbc..a4fde51 100644
--- a/README_DOCKER.md
+++ b/README_DOCKER.md
@@ -7,6 +7,7 @@
- Web 默认暴露到宿主机 `8191`,容器内端口 `8190`。
- 运行时数据库是 PostgreSQL,compose 内置 `postgres:16` 服务。
- `DATABASE_URL` 是应用唯一运行时数据库连接入口。
+- 链上主数据源是 NodeReal;`.env` 中配置 `ALPHAX_NODEREAL_API_KEY` 后,`python -m app.cli onchain` 才会产出 NodeReal 链上事件。
- 调度器以并发子进程运行,并通过业务锁组避免主推荐写入冲突。
- `.dockerignore` 排除了 `data/`、真实 `.env` 和所有 DB 文件,避免把数据库/密钥打进镜像。
diff --git a/app/config/system_config.py b/app/config/system_config.py
index c8ae0c0..fc601eb 100644
--- a/app/config/system_config.py
+++ b/app/config/system_config.py
@@ -68,16 +68,22 @@ def default_llm_config():
}
-def default_onchain_config(default_chains=("ethereum", "bsc", "base", "arbitrum", "solana")):
+def default_onchain_config(default_chains=("ethereum", "bsc")):
return {
"enabled": _env_bool("ALPHAX_ONCHAIN_ENABLED", False),
"chains": _env_list("ALPHAX_ONCHAIN_CHAINS", default_chains),
"timeout": _env_int("ALPHAX_ONCHAIN_TIMEOUT", 15),
+ "provider": _env_str("ALPHAX_ONCHAIN_PROVIDER", "nodereal"),
+ "nodereal_enabled": _env_bool("ALPHAX_NODEREAL_ENABLED", True),
+ "nodereal_chains": _env_list("ALPHAX_NODEREAL_CHAINS", ("ethereum", "bsc")),
+ "nodereal_api_key_env": "ALPHAX_NODEREAL_API_KEY",
+ "nodereal_log_block_lookback": _env_int("ALPHAX_NODEREAL_LOG_BLOCK_LOOKBACK", 120),
+ "nodereal_max_logs_per_token": _env_int("ALPHAX_NODEREAL_MAX_LOGS_PER_TOKEN", 25),
"candidate_enabled": _env_bool("ALPHAX_ONCHAIN_CANDIDATE_ENABLED", True),
"candidate_min_score": _env_float("ALPHAX_ONCHAIN_CANDIDATE_MIN_SCORE", 70),
"candidate_min_confidence": _env_int("ALPHAX_ONCHAIN_CANDIDATE_MIN_CONFIDENCE", 70),
"candidate_cooldown_hours": _env_float("ALPHAX_ONCHAIN_CANDIDATE_COOLDOWN_HOURS", 6),
- "dexscreener_enabled": _env_bool("ALPHAX_ONCHAIN_DEXSCREENER_ENABLED", True),
+ "dexscreener_enabled": _env_bool("ALPHAX_ONCHAIN_DEXSCREENER_ENABLED", False),
"dex_volume_spike_pct": _env_float("ALPHAX_ONCHAIN_DEX_VOLUME_SPIKE_PCT", 80),
"dex_min_liquidity_usd": _env_float("ALPHAX_ONCHAIN_DEX_MIN_LIQUIDITY_USD", 100000),
"dex_min_volume_24h_usd": _env_float("ALPHAX_ONCHAIN_DEX_MIN_VOLUME_24H_USD", 100000),
@@ -86,9 +92,9 @@ def default_onchain_config(default_chains=("ethereum", "bsc", "base", "arbitrum"
"liquidity_add_pct": _env_float("ALPHAX_ONCHAIN_LIQUIDITY_ADD_PCT", 25),
"liquidity_remove_pct": _env_float("ALPHAX_ONCHAIN_LIQUIDITY_REMOVE_PCT", -25),
"whale_tx_usd": _env_float("ALPHAX_ONCHAIN_WHALE_TX_USD", 250000),
- "etherscan_enabled": _env_bool("ALPHAX_ETHERSCAN_ENABLED", True),
+ "etherscan_enabled": _env_bool("ALPHAX_ETHERSCAN_ENABLED", False),
"etherscan_chains": _env_list("ALPHAX_ETHERSCAN_CHAINS", ("ethereum",)),
- "helius_enabled": _env_bool("ALPHAX_HELIUS_ENABLED", True),
+ "helius_enabled": _env_bool("ALPHAX_HELIUS_ENABLED", False),
"etherscan_base_url": _env_str("ALPHAX_ETHERSCAN_BASE_URL", "https://api.etherscan.io/v2/api"),
"helius_base_url": _env_str("ALPHAX_HELIUS_BASE_URL", "https://api.helius.xyz"),
"etherscan_api_key_env": "ALPHAX_ETHERSCAN_API_KEY",
@@ -334,7 +340,7 @@ def llm_config():
return cfg or default_llm_config()
-def onchain_config(default_chains=("ethereum", "bsc", "base", "arbitrum", "solana")):
+def onchain_config(default_chains=("ethereum", "bsc")):
cfg = get_onchain_config(default=None)
if cfg is None:
_seed_one("onchain", default_onchain_config(default_chains), "On-chain provider and signal thresholds; API keys remain in env")
diff --git a/app/db/onchain_db.py b/app/db/onchain_db.py
index fce8460..eb90f67 100644
--- a/app/db/onchain_db.py
+++ b/app/db/onchain_db.py
@@ -25,6 +25,7 @@ SIGNAL_LABELS = {
"exchange_outflow": "交易所流出",
"exchange_inflow_risk": "交易所流入风险",
"whale_accumulation": "鲸鱼增持",
+ "holder_growth": "持有人增长",
"holder_concentration_risk": "持仓集中风险",
"smart_money_buying": "聪明钱买入",
}
@@ -53,7 +54,7 @@ RAW_EVENT_EXPLAINERS = {
},
}
-POSITIVE_SIGNALS = {"dex_volume_spike", "liquidity_add", "exchange_outflow", "whale_accumulation", "smart_money_buying"}
+POSITIVE_SIGNALS = {"dex_volume_spike", "liquidity_add", "exchange_outflow", "whale_accumulation", "holder_growth", "smart_money_buying"}
RISK_SIGNALS = {"liquidity_remove_risk", "exchange_inflow_risk", "holder_concentration_risk"}
@@ -468,6 +469,7 @@ def get_onchain_provider_status(hours=24):
cfg = onchain_config()
hours = int(hours or 24)
cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
+ nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY")
etherscan_env = str(cfg.get("etherscan_api_key_env") or "ALPHAX_ETHERSCAN_API_KEY")
helius_env = str(cfg.get("helius_api_key_env") or "ALPHAX_HELIUS_API_KEY")
etherscan_chains = cfg.get("etherscan_chains") or ["ethereum"]
@@ -535,23 +537,45 @@ def get_onchain_provider_status(hours=24):
summary = _load(last_onchain.get("summary_json") if last_onchain else "{}", {}) if last_onchain else {}
last_error = last_onchain.get("error_message") if last_onchain else ""
+ provider = str(cfg.get("provider") or "nodereal").strip().lower()
+ dexscreener_enabled = bool(cfg.get("dexscreener_enabled", False)) and provider != "nodereal"
+ etherscan_enabled = bool(cfg.get("etherscan_enabled", False)) and provider != "nodereal"
+ helius_enabled = bool(cfg.get("helius_enabled", False)) and provider != "nodereal"
providers = [
+ {
+ "provider": "nodereal",
+ "label": "NodeReal",
+ "enabled": bool(cfg.get("nodereal_enabled", True)) and provider == "nodereal",
+ "api_key_present": bool(os.getenv(nodereal_env, "").strip()),
+ "implemented": True,
+ "role": "EVM 主链上数据源:Transfer 日志、大额转账、holder 变化",
+ "raw_events": 0,
+ "metrics": int(sum(row["count"] for row in metric_sources if row["source"] == "nodereal")),
+ "signals": int(sum(row["count"] for row in signal_sources if row["source"] == "nodereal")),
+ "status": _provider_status_label(
+ bool(cfg.get("nodereal_enabled", True)),
+ True,
+ int(sum(row["count"] for row in metric_sources if row["source"] == "nodereal"))
+ + int(sum(row["count"] for row in signal_sources if row["source"] == "nodereal")),
+ last_error if "nodereal" in str(last_error).lower() else "",
+ ),
+ },
{
"provider": "dexscreener",
"label": "DEX Screener",
- "enabled": bool(cfg.get("dexscreener_enabled", True)),
+ "enabled": dexscreener_enabled,
"api_key_present": True,
"implemented": True,
"role": "低优先级曝光源:Token 资料、付费推广、已映射合约的 DEX 成交量与流动性",
"raw_events": int(raw_total or 0),
"metrics": int(metric_total or 0),
"signals": int(sum(row["count"] for row in signal_sources if row["source"] == "dexscreener")),
- "status": _provider_status_label(bool(cfg.get("dexscreener_enabled", True)), True, raw_total + metric_total, last_error),
+ "status": _provider_status_label(dexscreener_enabled, True, raw_total + metric_total, last_error),
},
{
"provider": "etherscan",
"label": "Etherscan",
- "enabled": bool(cfg.get("etherscan_enabled", True)),
+ "enabled": etherscan_enabled,
"api_key_present": bool(os.getenv(etherscan_env, "").strip()),
"implemented": True,
"role": "EVM 已映射合约的 ERC20 大额转账,当前链: " + ", ".join(etherscan_chains or ["ethereum"]),
@@ -559,7 +583,7 @@ def get_onchain_provider_status(hours=24):
"metrics": 0,
"signals": int(sum(row["count"] for row in signal_sources if row["source"] == "etherscan")),
"status": _provider_status_label(
- bool(cfg.get("etherscan_enabled", True)),
+ etherscan_enabled,
True,
int(sum(row["count"] for row in signal_sources if row["source"] == "etherscan")),
last_error if "etherscan" in str(last_error).lower() else "",
@@ -568,7 +592,7 @@ def get_onchain_provider_status(hours=24):
{
"provider": "helius",
"label": "Helius",
- "enabled": bool(cfg.get("helius_enabled", True)),
+ "enabled": helius_enabled,
"api_key_present": bool(os.getenv(helius_env, "").strip()),
"implemented": True,
"role": "Solana 已映射 mint 的解析交易与大额 token 活动",
@@ -576,7 +600,7 @@ def get_onchain_provider_status(hours=24):
"metrics": 0,
"signals": int(sum(row["count"] for row in signal_sources if row["source"] == "helius")),
"status": _provider_status_label(
- bool(cfg.get("helius_enabled", True)),
+ helius_enabled,
True,
int(sum(row["count"] for row in signal_sources if row["source"] == "helius")),
last_error if "helius" in str(last_error).lower() else "",
diff --git a/app/db/paper_trading.py b/app/db/paper_trading.py
index 3d63ac7..d0fefa2 100644
--- a/app/db/paper_trading.py
+++ b/app/db/paper_trading.py
@@ -8,6 +8,7 @@ from datetime import datetime, timedelta
from app.config.system_config import paper_trading_config
from app.db.schema import get_conn
+from app.db.system_logs import record_system_error
from app.integrations.feishu_push import push_card
@@ -267,7 +268,7 @@ def _push_paper_card(event_type: str, symbol: str, title: str, template: str, fi
elements.append(_card_note(note))
if event_time:
elements.append(_card_note(f"时间: {event_time}"))
- push_card({
+ ok, result = push_card({
"metadata": {"source": "paper_trading", "event_type": event_type, "symbol": symbol},
"config": {"wide_screen_mode": True},
"header": {
@@ -276,8 +277,26 @@ def _push_paper_card(event_type: str, symbol: str, title: str, template: str, fi
},
"elements": elements,
})
- except Exception:
- pass
+ if not ok:
+ record_system_error(
+ source="paper_trading",
+ level="warning",
+ error_type="FeishuPushFailed",
+ message=f"Feishu push failed for {event_type} {symbol}: {str(result)[:500]}",
+ status_code=0,
+ context={"event_type": event_type, "symbol": symbol, "push_result": result},
+ fingerprint=f"paper_trading_feishu_push_failed:{event_type}:{symbol}",
+ )
+ except Exception as exc:
+ record_system_error(
+ source="paper_trading",
+ level="warning",
+ error_type=exc.__class__.__name__,
+ message=f"Feishu push exception for {event_type} {symbol}: {str(exc)[:500]}",
+ status_code=0,
+ context={"event_type": event_type, "symbol": symbol},
+ fingerprint=f"paper_trading_feishu_push_exception:{event_type}:{symbol}",
+ )
def _push_custom_paper_card(card: dict) -> tuple[bool, object]:
@@ -306,16 +325,18 @@ def _push_event_card(event_type: str, trade: dict, result: dict, event_time: str
)
return
if event_type == "close":
+ exit_reason = str(result.get("exit_reason") or "--")
+ title_prefix = "移动止盈成交平仓" if exit_reason == "trailing_stop" else "交易平仓"
_push_paper_card(
event_type,
symbol,
- f"交易平仓 - {short_symbol}",
+ f"{title_prefix} - {short_symbol}",
"red" if _safe_float(result.get("pnl_usdt")) < 0 else "green",
[
("退出价", _fmt_price(result.get("exit_price"))),
("收益率", _fmt_pct(result.get("pnl_pct"))),
("收益额", f"{_safe_float(result.get('pnl_usdt')):.2f} USDT"),
- ("原因", result.get("exit_reason") or "--"),
+ ("原因", exit_reason),
],
"收益以交易账本记录为准。",
event_time,
diff --git a/app/services/nodereal_client.py b/app/services/nodereal_client.py
new file mode 100644
index 0000000..5bb8ae4
--- /dev/null
+++ b/app/services/nodereal_client.py
@@ -0,0 +1,90 @@
+"""Small JSON-RPC client for NodeReal MegaNode.
+
+The on-chain monitor only needs a narrow subset of NodeReal right now:
+standard EVM logs plus a few enhanced token APIs. Keeping this adapter small
+prevents provider-specific details from leaking into strategy code.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any
+
+import requests
+
+
+DEFAULT_CHAIN_ENDPOINTS = {
+ "ethereum": "https://eth-mainnet.nodereal.io/v1/{api_key}",
+ "bsc": "https://bsc-mainnet.nodereal.io/v1/{api_key}",
+}
+
+
+@dataclass(frozen=True)
+class NodeRealConfig:
+ api_key: str
+ timeout: int = 15
+ endpoints: dict[str, str] | None = None
+
+
+class NodeRealClient:
+ def __init__(self, config: NodeRealConfig):
+ self.config = config
+ self.endpoints = {**DEFAULT_CHAIN_ENDPOINTS, **(config.endpoints or {})}
+
+ def supports_chain(self, chain: str) -> bool:
+ return bool(self._endpoint(chain))
+
+ def call(self, chain: str, method: str, params: list[Any] | None = None) -> Any:
+ endpoint = self._endpoint(chain)
+ if not endpoint:
+ raise ValueError(f"nodereal_chain_not_configured:{chain}")
+ payload = {
+ "jsonrpc": "2.0",
+ "id": 1,
+ "method": method,
+ "params": params or [],
+ }
+ resp = requests.post(
+ endpoint,
+ json=payload,
+ timeout=self.config.timeout,
+ headers={"Content-Type": "application/json", "User-Agent": "AlphaX-Agent-Crypto/1.0"},
+ )
+ if resp.status_code >= 400:
+ raise RuntimeError(f"nodereal_http_{resp.status_code}:{resp.text[:200]}")
+ data = resp.json()
+ if data.get("error"):
+ raise RuntimeError(f"nodereal_rpc_error:{data['error']}")
+ return data.get("result")
+
+ def block_number(self, chain: str) -> int:
+ return _hex_to_int(self.call(chain, "eth_blockNumber", []))
+
+ def get_logs(self, chain: str, log_filter: dict[str, Any]) -> list[dict[str, Any]]:
+ result = self.call(chain, "eth_getLogs", [log_filter])
+ return result if isinstance(result, list) else []
+
+ def token_holder_count(self, chain: str, contract_address: str) -> int:
+ return _hex_to_int(self.call(chain, "nr_getTokenHolderCount", [contract_address]))
+
+ def _endpoint(self, chain: str) -> str:
+ chain_key = str(chain or "").lower().strip()
+ template = self.endpoints.get(chain_key, "")
+ if not template or not self.config.api_key:
+ return ""
+ return template.format(api_key=self.config.api_key)
+
+
+def _hex_to_int(value: Any) -> int:
+ if value is None:
+ return 0
+ if isinstance(value, int):
+ return value
+ text = str(value).strip()
+ if not text:
+ return 0
+ try:
+ return int(text, 16) if text.startswith("0x") else int(text)
+ except Exception:
+ return 0
+
diff --git a/app/services/onchain_monitor.py b/app/services/onchain_monitor.py
index 2dff3e8..8d87c07 100644
--- a/app/services/onchain_monitor.py
+++ b/app/services/onchain_monitor.py
@@ -15,6 +15,7 @@ import requests
from app.config.system_config import onchain_config
from app.db import onchain_db
from app.db.altcoin_db import get_conn, init_db, log_cron_run
+from app.db.tracking_queries import get_latest_price_cache
from app.db.onchain_db import (
MIN_MAPPING_CONFIDENCE,
POSITIVE_SIGNALS,
@@ -31,9 +32,10 @@ from app.db.onchain_db import (
)
from app.services.event_driven_screener import _event_hash as event_hash
from app.services.event_driven_screener import _tradable_symbol, init_event_tables
+from app.services.nodereal_client import DEFAULT_CHAIN_ENDPOINTS, NodeRealClient, NodeRealConfig
-DEFAULT_CHAINS = ("ethereum", "bsc", "base", "arbitrum", "solana")
+DEFAULT_CHAINS = ("ethereum", "bsc")
ETHERSCAN_CHAIN_IDS = {
"ethereum": "1",
"bsc": "56",
@@ -70,6 +72,7 @@ DEXSCREENER_RAW_ENDPOINTS = (
("token_boost_latest", "https://api.dexscreener.com/token-boosts/latest/v1"),
("token_boost_top", "https://api.dexscreener.com/token-boosts/top/v1"),
)
+TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
def _env_bool(name, default=False):
@@ -103,6 +106,7 @@ def get_onchain_params():
chains = [str(x).strip().lower() for x in chains_raw if str(x).strip()]
etherscan_env = str(cfg.get("etherscan_api_key_env") or "ALPHAX_ETHERSCAN_API_KEY")
helius_env = str(cfg.get("helius_api_key_env") or "ALPHAX_HELIUS_API_KEY")
+ nodereal_env = str(cfg.get("nodereal_api_key_env") or "ALPHAX_NODEREAL_API_KEY")
etherscan_chains_raw = cfg.get("etherscan_chains") or ["ethereum"]
if isinstance(etherscan_chains_raw, str):
etherscan_chains = [x.strip().lower() for x in etherscan_chains_raw.split(",") if x.strip()]
@@ -110,8 +114,15 @@ def get_onchain_params():
etherscan_chains = [str(x).strip().lower() for x in etherscan_chains_raw if str(x).strip()]
return {
"enabled": bool(cfg.get("enabled", False)),
+ "provider": str(cfg.get("provider") or "nodereal").strip().lower(),
"chains": chains or list(DEFAULT_CHAINS),
"timeout": int(cfg.get("timeout") or 15),
+ "nodereal_enabled": bool(cfg.get("nodereal_enabled", True)),
+ "nodereal_chains": _normalize_chain_list(cfg.get("nodereal_chains") or ("ethereum", "bsc")),
+ "nodereal_api_key": os.getenv(nodereal_env, "").strip(),
+ "nodereal_api_key_env": nodereal_env,
+ "nodereal_log_block_lookback": int(cfg.get("nodereal_log_block_lookback") or 120),
+ "nodereal_max_logs_per_token": int(cfg.get("nodereal_max_logs_per_token") or 25),
"candidate_enabled": bool(cfg.get("candidate_enabled", True)),
"candidate_min_score": float(cfg.get("candidate_min_score") or 70),
"candidate_min_confidence": int(cfg.get("candidate_min_confidence") or 70),
@@ -135,6 +146,12 @@ def get_onchain_params():
}
+def _normalize_chain_list(value):
+ if isinstance(value, str):
+ return [x.strip().lower() for x in value.split(",") if x.strip()]
+ return [str(x).strip().lower() for x in (value or []) if str(x).strip()]
+
+
def _now():
return datetime.now()
@@ -664,17 +681,209 @@ def _event_from_etherscan_transfer(row, mapping, cfg=None):
def _latest_price_from_metric(mapping):
- latest = _latest_metric(
- normalize_symbol(mapping.get("symbol")),
- str(mapping.get("chain") or "").lower(),
- str(mapping.get("contract_address") or ""),
- )
- raw = {}
+ symbol = normalize_symbol(mapping.get("symbol"))
+ chain = str(mapping.get("chain") or "").lower()
+ contract = str(mapping.get("contract_address") or "")
+ conn = get_conn()
try:
- raw = json.loads(latest.get("raw_json") or "{}") if latest else {}
+ rows = conn.execute(
+ """
+ SELECT raw_json
+ FROM onchain_token_metrics
+ WHERE symbol=%s AND chain=%s AND contract_address=%s
+ ORDER BY metric_time DESC, id DESC
+ LIMIT 8
+ """,
+ (symbol, chain, contract),
+ ).fetchall()
+ finally:
+ conn.close()
+ for row in rows:
+ try:
+ raw = json.loads(row.get("raw_json") or "{}")
+ except Exception:
+ raw = {}
+ price = _safe_float(raw.get("price_usd"))
+ if price > 0:
+ return price
+ cache = get_latest_price_cache([symbol])
+ item = cache.get(symbol) or {}
+ return _safe_float(item.get("price"))
+
+
+def _hex_to_int(value):
+ text = str(value or "").strip()
+ if not text:
+ return 0
+ try:
+ return int(text, 16) if text.startswith("0x") else int(text)
except Exception:
- raw = {}
- return _safe_float(raw.get("price_usd"))
+ return 0
+
+
+def _topic_to_address(topic):
+ topic = str(topic or "").lower()
+ if topic.startswith("0x") and len(topic) >= 42:
+ return "0x" + topic[-40:]
+ return ""
+
+
+def _nodereal_client(cfg=None):
+ cfg = cfg or get_onchain_params()
+ return NodeRealClient(
+ NodeRealConfig(
+ api_key=cfg.get("nodereal_api_key") or "",
+ timeout=int(cfg.get("timeout") or 15),
+ endpoints=dict(DEFAULT_CHAIN_ENDPOINTS),
+ )
+ )
+
+
+def _event_from_nodereal_transfer(log, mapping, cfg=None):
+ cfg = cfg or get_onchain_params()
+ topics = log.get("topics") or []
+ if len(topics) < 3:
+ return None
+ amount_raw = _hex_to_int(log.get("data"))
+ mapping_raw = {}
+ try:
+ mapping_raw = json.loads(mapping.get("raw_json") or "{}")
+ except Exception:
+ mapping_raw = {}
+ decimals = _safe_int(mapping_raw.get("decimals") or mapping_raw.get("tokenDecimal") or 18, 18)
+ amount = amount_raw / (10 ** decimals if decimals >= 0 else 1)
+ price_usd = _latest_price_from_metric(mapping)
+ value_usd = amount * price_usd if price_usd > 0 else 0
+ threshold = _safe_float(cfg.get("whale_tx_usd"), 250000)
+ if value_usd <= 0 or value_usd < threshold:
+ return None
+ chain = str(mapping.get("chain") or "").lower()
+ tx_hash = str(log.get("transactionHash") or "").strip()
+ return {
+ "chain": chain,
+ "symbol": mapping.get("symbol"),
+ "contract_address": mapping.get("contract_address") or "",
+ "event_type": "token_transfer",
+ "signal_code": "whale_accumulation",
+ "signal_label": signal_label("whale_accumulation"),
+ "direction": "positive",
+ "value_usd": value_usd,
+ "amount": amount,
+ "tx_hash": tx_hash,
+ "wallet_address": _topic_to_address(topics[2]),
+ "wallet_label": "EVM 接收地址",
+ "counterparty_label": "EVM 发送地址 " + _short_addr(_topic_to_address(topics[1])),
+ "confidence": 76,
+ "severity": "A",
+ "detected_at": _now().isoformat(timespec="seconds"),
+ "source": "nodereal",
+ "url": _chain_explorer_tx_url(chain, tx_hash),
+ "raw": log,
+ }
+
+
+def _metric_from_nodereal_holder_count(holder_count, mapping):
+ symbol = normalize_symbol(mapping.get("symbol"))
+ chain = str(mapping.get("chain") or "").lower()
+ contract = str(mapping.get("contract_address") or "")
+ prev = _latest_metric(symbol, chain, contract)
+ prev_count = 0
+ if prev:
+ try:
+ prev_raw = json.loads(prev.get("raw_json") or "{}")
+ prev_count = _safe_int(prev_raw.get("holder_count"))
+ except Exception:
+ prev_count = 0
+ holder_delta = holder_count - prev_count if prev_count > 0 else 0
+ metric = {
+ "symbol": symbol,
+ "chain": chain,
+ "contract_address": contract,
+ "window": "1h",
+ "metric_time": _now().isoformat(timespec="seconds"),
+ "holder_delta": holder_delta,
+ "smart_money_score": 0,
+ "source": "nodereal",
+ "raw": {
+ "holder_count": holder_count,
+ "previous_holder_count": prev_count,
+ },
+ }
+ if holder_delta > 0:
+ metric["onchain_score"] = min(30, holder_delta)
+ elif holder_delta < 0:
+ metric["risk_score"] = min(30, abs(holder_delta))
+ return metric
+
+
+def _event_from_holder_metric(metric):
+ holder_delta = _safe_float(metric.get("holder_delta"))
+ if holder_delta <= 0:
+ return None
+ if holder_delta < 20:
+ return None
+ return _event_from_metric(metric, "holder_growth", source="nodereal")
+
+
+def fetch_nodereal_events(limit=60):
+ cfg = get_onchain_params()
+ if not cfg.get("nodereal_enabled", True):
+ return {"metrics": [], "events": [], "errors": ["nodereal_disabled"]}
+ if not cfg.get("nodereal_api_key"):
+ return {"metrics": [], "events": [], "errors": ["nodereal_api_key_missing"]}
+ client = _nodereal_client(cfg)
+ enabled_chains = set(cfg.get("nodereal_chains") or DEFAULT_CHAINS)
+ mappings = [
+ m for m in get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE)
+ if str(m.get("chain") or "").lower() in enabled_chains and client.supports_chain(str(m.get("chain") or "").lower())
+ ]
+ metrics = []
+ events = []
+ errors = []
+ lookback = max(1, int(cfg.get("nodereal_log_block_lookback") or 120))
+ max_logs = max(1, int(cfg.get("nodereal_max_logs_per_token") or 25))
+ for mapping in mappings[: int(limit or 60)]:
+ chain = str(mapping.get("chain") or "").lower()
+ contract = str(mapping.get("contract_address") or "").strip()
+ if not contract:
+ continue
+ try:
+ holder_count = client.token_holder_count(chain, contract)
+ if holder_count:
+ metric = _metric_from_nodereal_holder_count(holder_count, mapping)
+ insert_token_metric(metric)
+ metrics.append(metric)
+ holder_event = _event_from_holder_metric(metric)
+ if holder_event and insert_onchain_event(holder_event):
+ events.append(holder_event)
+ except Exception as exc:
+ errors.append(f"{mapping.get('symbol')}:nodereal_holder:{str(exc)[:160]}")
+ try:
+ latest = client.block_number(chain)
+ if latest <= 0:
+ continue
+ logs = client.get_logs(
+ chain,
+ {
+ "address": contract,
+ "fromBlock": hex(max(0, latest - lookback)),
+ "toBlock": hex(latest),
+ "topics": [TRANSFER_TOPIC],
+ },
+ )
+ for log in logs[:max_logs]:
+ if not isinstance(log, dict):
+ continue
+ event = _event_from_nodereal_transfer(log, mapping, cfg=cfg)
+ if not event:
+ continue
+ if insert_onchain_event(event):
+ events.append(event)
+ except Exception as exc:
+ errors.append(f"{mapping.get('symbol')}:nodereal_logs:{str(exc)[:160]}")
+ if not mappings:
+ errors.append("nodereal_no_supported_mappings")
+ return {"metrics": metrics, "events": events, "errors": errors}
def fetch_etherscan_events(limit=60):
@@ -959,32 +1168,17 @@ def run_once(limit=60):
"check_time": _now().isoformat(),
}
if cfg.get("enabled"):
- raw = fetch_dexscreener_raw_events(limit=limit)
- output["raw_events_count"] = len(raw.get("raw_events") or [])
- output["errors"].extend(raw.get("errors") or [])
- dex = fetch_dexscreener_metrics(limit=limit)
- output["metrics_count"] += len(dex.get("metrics") or [])
- output["events_count"] += len(dex.get("events") or [])
- output["errors"].extend(dex.get("errors") or [])
- eth = fetch_etherscan_events(limit=limit)
- output["events_count"] += len(eth.get("events") or [])
- output["errors"].extend(eth.get("errors") or [])
- hel = fetch_helius_events(limit=limit)
- output["events_count"] += len(hel.get("events") or [])
- output["errors"].extend(hel.get("errors") or [])
- output["discovered_mappings"] = discover_token_mappings(limit=limit).get("inserted", 0) if not get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) else 0
+ node = fetch_nodereal_events(limit=limit)
+ output["metrics_count"] += len(node.get("metrics") or [])
+ output["events_count"] += len(node.get("events") or [])
+ output["errors"].extend(node.get("errors") or [])
+ output["discovered_mappings"] = 0
if output.get("discovered_mappings"):
output["status"] = "bootstrapped"
- dex = fetch_dexscreener_metrics(limit=limit)
- output["metrics_count"] = len(dex.get("metrics") or [])
- output["events_count"] = len(dex.get("events") or [])
- output["errors"].extend(dex.get("errors") or [])
- eth = fetch_etherscan_events(limit=limit)
- output["events_count"] += len(eth.get("events") or [])
- output["errors"].extend(eth.get("errors") or [])
- hel = fetch_helius_events(limit=limit)
- output["events_count"] += len(hel.get("events") or [])
- output["errors"].extend(hel.get("errors") or [])
+ node = fetch_nodereal_events(limit=limit)
+ output["metrics_count"] = len(node.get("metrics") or [])
+ output["events_count"] = len(node.get("events") or [])
+ output["errors"].extend(node.get("errors") or [])
queued = enqueue_onchain_candidates()
output["candidate_queued"] = queued.get("queued", 0)
output["candidate_symbols"] = queued.get("symbols", [])
@@ -1020,6 +1214,7 @@ __all__ = [
"fetch_dexscreener_raw_events",
"fetch_etherscan_events",
"fetch_helius_events",
+ "fetch_nodereal_events",
"get_onchain_params",
"ingest_normalized_events",
"normalize_dexscreener_pair",
diff --git a/app/web/routes_admin.py b/app/web/routes_admin.py
index bed770a..d397ff1 100644
--- a/app/web/routes_admin.py
+++ b/app/web/routes_admin.py
@@ -4,6 +4,7 @@ from fastapi.responses import HTMLResponse, Response
from app.config.system_config import seed_runtime_system_defaults
from app.db import auth_db
from app.db import chat_assistant_db
+from app.db.analytics import get_cron_run_logs, get_cron_run_summary, get_pipeline_runs
from app.db.data_export import build_data_export_bundle
from app.db.scheduler_db import (
enqueue_manual_trigger,
@@ -86,6 +87,21 @@ def build_router(templates):
raise HTTPException(status_code=404, detail="日志不存在")
return item
+ @router.get("/api/admin/cron-runs")
+ async def api_admin_cron_runs(limit: int = 80, job_name: str = "", altcoin_session: str = Cookie(default="")):
+ require_admin(altcoin_session)
+ return {"items": get_cron_run_logs(limit=limit, job_name=job_name or None)}
+
+ @router.get("/api/admin/cron-runs/summary")
+ async def api_admin_cron_run_summary(hours: int = 24, altcoin_session: str = Cookie(default="")):
+ require_admin(altcoin_session)
+ return get_cron_run_summary(hours=hours)
+
+ @router.get("/api/admin/pipeline-runs")
+ async def api_admin_pipeline_runs(limit: int = 30, hours: int = 24, offset: int = 0, altcoin_session: str = Cookie(default="")):
+ require_admin(altcoin_session)
+ return get_pipeline_runs(limit=limit, hours=hours, offset=offset)
+
@router.get("/api/admin/chat-logs/overview")
async def api_admin_chat_logs_overview(hours: int = 24, altcoin_session: str = Cookie(default="")):
require_admin(altcoin_session)
diff --git a/app/web/routes_pages.py b/app/web/routes_pages.py
index 1012869..8a0dc77 100644
--- a/app/web/routes_pages.py
+++ b/app/web/routes_pages.py
@@ -96,6 +96,17 @@ def build_router(templates, repo_root: Path, stock_report_template: str):
return HTMLResponse(content=f"
需要管理员权限
{exc.detail}
返回看板", status_code=exc.status_code)
return render_page("system_logs.html", request, active_nav="system_logs")
+ @router.get("/logs", response_class=HTMLResponse)
+ async def logs_page(request: Request):
+ user, redirect = require_page_user(request)
+ if redirect:
+ return redirect
+ try:
+ require_admin(request.cookies.get("altcoin_session", ""))
+ except HTTPException as exc:
+ return HTMLResponse(content=f"需要管理员权限
{exc.detail}
返回看板", status_code=exc.status_code)
+ return render_page("logs.html", request, active_nav="logs")
+
@router.get("/data-export", response_class=HTMLResponse)
async def data_export_page(request: Request):
user, redirect = require_page_user(request)
diff --git a/static/base.html b/static/base.html
index 483c4b6..ed35f6f 100644
--- a/static/base.html
+++ b/static/base.html
@@ -185,15 +185,13 @@ a { color: inherit; text-decoration: none; }
-
+
-
-