From 567b5b726852ea3e847db69a1a17e969da1d7edc Mon Sep 17 00:00:00 2001 From: aaron <> Date: Sun, 17 May 2026 19:39:11 +0800 Subject: [PATCH] update --- app/config/system_config.py | 7 + app/db/analytics.py | 19 +- app/db/onchain_db.py | 209 +++++++++++++++++++++- app/services/market_overview.py | 196 +++++++++++++++++++++ app/services/onchain_monitor.py | 283 +++++++++++++++++++++++++++++- app/web/routes_market.py | 18 +- app/web/routes_onchain.py | 9 + static/market.html | 145 ++++----------- static/onchain.html | 29 +-- tests/test_market_overview_api.py | 70 ++++++++ tests/test_onchain_tracking.py | 111 ++++++++++++ 11 files changed, 959 insertions(+), 137 deletions(-) create mode 100644 app/services/market_overview.py create mode 100644 tests/test_market_overview_api.py diff --git a/app/config/system_config.py b/app/config/system_config.py index 2af298e..5953a5f 100644 --- a/app/config/system_config.py +++ b/app/config/system_config.py @@ -79,9 +79,16 @@ def default_onchain_config(default_chains=("ethereum", "bsc", "base", "arbitrum" "dex_volume_spike_pct": _env_float("ALPHAX_ONCHAIN_DEX_VOLUME_SPIKE_PCT", 80), "dex_min_liquidity_usd": _env_float("ALPHAX_ONCHAIN_DEX_MIN_LIQUIDITY_USD", 100000), "dex_min_volume_24h_usd": _env_float("ALPHAX_ONCHAIN_DEX_MIN_VOLUME_24H_USD", 100000), + "dex_min_hour_volume_usd": _env_float("ALPHAX_ONCHAIN_DEX_MIN_HOUR_VOLUME_USD", 50000), + "dex_hour_volume_share_pct": _env_float("ALPHAX_ONCHAIN_DEX_HOUR_VOLUME_SHARE_PCT", 8), "liquidity_add_pct": _env_float("ALPHAX_ONCHAIN_LIQUIDITY_ADD_PCT", 25), "liquidity_remove_pct": _env_float("ALPHAX_ONCHAIN_LIQUIDITY_REMOVE_PCT", -25), "whale_tx_usd": _env_float("ALPHAX_ONCHAIN_WHALE_TX_USD", 250000), + "etherscan_enabled": _env_bool("ALPHAX_ETHERSCAN_ENABLED", True), + "etherscan_chains": _env_list("ALPHAX_ETHERSCAN_CHAINS", ("ethereum",)), + "helius_enabled": _env_bool("ALPHAX_HELIUS_ENABLED", True), + "etherscan_base_url": _env_str("ALPHAX_ETHERSCAN_BASE_URL", "https://api.etherscan.io/v2/api"), + "helius_base_url": _env_str("ALPHAX_HELIUS_BASE_URL", "https://api.helius.xyz"), "etherscan_api_key_env": "ALPHAX_ETHERSCAN_API_KEY", "helius_api_key_env": "ALPHAX_HELIUS_API_KEY", } diff --git a/app/db/analytics.py b/app/db/analytics.py index 57a002d..5b70a87 100644 --- a/app/db/analytics.py +++ b/app/db/analytics.py @@ -671,15 +671,26 @@ def get_stats(): "sector": derived.get("sector_context") or {}, }) - def avg_from_context(group_key, field): + def values_from_context(group_key, field, include_zero=True): values = [] for ctx in actionable_contexts: - value = (ctx.get(group_key) or {}).get(field) + group = ctx.get(group_key) or {} + if field not in group or group.get(field) in ("", None): + continue + value = group.get(field) if isinstance(value, (int, float)): - values.append(float(value)) + numeric = float(value) + if include_zero or numeric != 0: + values.append(numeric) + return values + + def avg_from_context(group_key, field, include_zero=True): + values = values_from_context(group_key, field, include_zero=include_zero) if not values: return 0 avg = sum(values) / len(values) + if field == "funding_rate": + return round(avg, 6) if abs(avg) < 0.01: return round(avg, 3) return round(avg, 1) @@ -696,7 +707,9 @@ def get_stats(): "avg_turnover_acceleration_4h": avg_from_context("market", "turnover_acceleration_4h"), "avg_volume_24h": avg_from_context("market", "volume_24h"), "avg_funding_rate": avg_from_context("derivatives", "funding_rate"), + "funding_rate_sample_count": len(values_from_context("derivatives", "funding_rate")), "avg_top_trader_long_pct": avg_from_context("derivatives", "top_trader_long_pct"), + "top_trader_sample_count": len(values_from_context("derivatives", "top_trader_long_pct")), "avg_top_trader_long_short_ratio": avg_from_context("derivatives", "top_trader_long_short_ratio"), "hot_sector_count": len(hot_sector_counter), "top_hot_sectors": [ diff --git a/app/db/onchain_db.py b/app/db/onchain_db.py index 1367798..fce8460 100644 --- a/app/db/onchain_db.py +++ b/app/db/onchain_db.py @@ -6,16 +6,19 @@ mutate trading recommendations directly. """ import json +import os from datetime import datetime, timedelta from app.db.altcoin_db import get_conn from app.db.postgres_connection import ensure_migrations_once +from app.config.system_config import onchain_config MIN_MAPPING_CONFIDENCE = 70 SIGNAL_LABELS = { + "large_token_transfer": "链上大额转账", "dex_volume_spike": "DEX 放量", "liquidity_add": "流动性增加", "liquidity_remove_risk": "流动性撤出风险", @@ -27,9 +30,27 @@ SIGNAL_LABELS = { } RAW_EVENT_TYPE_LABELS = { - "token_profile_latest": "Token 资料更新", - "token_boost_latest": "DEX 热度 Boost", - "token_boost_top": "DEX Boost 榜", + "token_profile_latest": "DEX 新币资料变更", + "token_boost_latest": "DEX 付费曝光新增", + "token_boost_top": "DEX 付费曝光榜", +} + +RAW_EVENT_EXPLAINERS = { + "token_profile_latest": { + "plain": "项目方或社区刚在 DEX Screener 更新了代币资料、图标或链接。", + "meaning": "只代表曝光资料发生变化,信号较弱,通常不能单独说明有资金买入。", + "priority": "low", + }, + "token_boost_latest": { + "plain": "有人为这个代币购买了 DEX Screener 曝光位。", + "meaning": "代表短期推广热度上升,可能吸引散户注意,但也可能只是营销。", + "priority": "medium", + }, + "token_boost_top": { + "plain": "这个代币出现在 DEX Screener 付费曝光榜前列。", + "meaning": "代表平台内关注度较高,需要再看成交量、流动性和是否能映射交易对。", + "priority": "medium", + }, } POSITIVE_SIGNALS = {"dex_volume_spike", "liquidity_add", "exchange_outflow", "whale_accumulation", "smart_money_buying"} @@ -81,6 +102,17 @@ def raw_event_type_label(event_type): return RAW_EVENT_TYPE_LABELS.get(str(event_type or ""), str(event_type or "链上原始事件")) +def raw_event_explainer(event_type): + return RAW_EVENT_EXPLAINERS.get( + str(event_type or ""), + { + "plain": "链上或链上相关数据源捕捉到一条原始动态。", + "meaning": "需要完成币种映射和质量验证后,才可能进入技术检查。", + "priority": "medium", + }, + ) + + def init_onchain_tables(): ensure_migrations_once() @@ -390,6 +422,7 @@ def get_onchain_overview(hours=24): """ SELECT * FROM onchain_raw_events WHERE detected_at >= %s + AND event_type NOT IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top') ORDER BY detected_at::timestamp DESC, importance DESC, id DESC LIMIT 12 """, @@ -426,9 +459,163 @@ def get_onchain_overview(hours=24): "risk_tokens": [_format_metric_item(row, active) for row in risks], "raw_events": [_format_raw_event(row) for row in raw_latest], "signals": _signal_counts(events), + "provider_status": get_onchain_provider_status(hours=hours), } +def get_onchain_provider_status(hours=24): + init_onchain_tables() + cfg = onchain_config() + hours = int(hours or 24) + cutoff = (datetime.now() - timedelta(hours=hours)).isoformat() + etherscan_env = str(cfg.get("etherscan_api_key_env") or "ALPHAX_ETHERSCAN_API_KEY") + helius_env = str(cfg.get("helius_api_key_env") or "ALPHAX_HELIUS_API_KEY") + etherscan_chains = cfg.get("etherscan_chains") or ["ethereum"] + if isinstance(etherscan_chains, str): + etherscan_chains = [x.strip().lower() for x in etherscan_chains.split(",") if x.strip()] + conn = get_conn() + try: + raw_total = conn.execute("SELECT COUNT(*) FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,)).fetchone()[0] + metric_total = conn.execute("SELECT COUNT(*) FROM onchain_token_metrics WHERE metric_time >= %s", (cutoff,)).fetchone()[0] + signal_total = conn.execute("SELECT COUNT(*) FROM onchain_events WHERE detected_at >= %s", (cutoff,)).fetchone()[0] + candidate_total = conn.execute( + """ + SELECT COUNT(*) + FROM event_news + WHERE source='onchain' AND detected_at >= %s + """, + (cutoff,), + ).fetchone()[0] + mapping_total = conn.execute("SELECT COUNT(*) FROM onchain_token_map WHERE is_active=1").fetchone()[0] + mapping_usable = conn.execute( + "SELECT COUNT(*) FROM onchain_token_map WHERE is_active=1 AND confidence >= %s", + (MIN_MAPPING_CONFIDENCE,), + ).fetchone()[0] + raw_by_type = conn.execute( + """ + SELECT event_type, COUNT(*) AS count + FROM onchain_raw_events + WHERE detected_at >= %s + GROUP BY event_type + ORDER BY count DESC, event_type + """, + (cutoff,), + ).fetchall() + metric_sources = conn.execute( + """ + SELECT source, COUNT(*) AS count + FROM onchain_token_metrics + WHERE metric_time >= %s + GROUP BY source + ORDER BY count DESC, source + """, + (cutoff,), + ).fetchall() + signal_sources = conn.execute( + """ + SELECT source, COUNT(*) AS count + FROM onchain_events + WHERE detected_at >= %s + GROUP BY source + ORDER BY count DESC, source + """, + (cutoff,), + ).fetchall() + last_onchain = conn.execute( + """ + SELECT * + FROM cron_run_log + WHERE script_name='onchain_monitor.py' OR job_name IN ('链上','onchain') + ORDER BY started_at DESC, id DESC + LIMIT 1 + """ + ).fetchone() + finally: + conn.close() + + summary = _load(last_onchain.get("summary_json") if last_onchain else "{}", {}) if last_onchain else {} + last_error = last_onchain.get("error_message") if last_onchain else "" + providers = [ + { + "provider": "dexscreener", + "label": "DEX Screener", + "enabled": bool(cfg.get("dexscreener_enabled", True)), + "api_key_present": True, + "implemented": True, + "role": "低优先级曝光源:Token 资料、付费推广、已映射合约的 DEX 成交量与流动性", + "raw_events": int(raw_total or 0), + "metrics": int(metric_total or 0), + "signals": int(sum(row["count"] for row in signal_sources if row["source"] == "dexscreener")), + "status": _provider_status_label(bool(cfg.get("dexscreener_enabled", True)), True, raw_total + metric_total, last_error), + }, + { + "provider": "etherscan", + "label": "Etherscan", + "enabled": bool(cfg.get("etherscan_enabled", True)), + "api_key_present": bool(os.getenv(etherscan_env, "").strip()), + "implemented": True, + "role": "EVM 已映射合约的 ERC20 大额转账,当前链: " + ", ".join(etherscan_chains or ["ethereum"]), + "raw_events": 0, + "metrics": 0, + "signals": int(sum(row["count"] for row in signal_sources if row["source"] == "etherscan")), + "status": _provider_status_label( + bool(cfg.get("etherscan_enabled", True)), + True, + int(sum(row["count"] for row in signal_sources if row["source"] == "etherscan")), + last_error if "etherscan" in str(last_error).lower() else "", + ), + }, + { + "provider": "helius", + "label": "Helius", + "enabled": bool(cfg.get("helius_enabled", True)), + "api_key_present": bool(os.getenv(helius_env, "").strip()), + "implemented": True, + "role": "Solana 已映射 mint 的解析交易与大额 token 活动", + "raw_events": 0, + "metrics": 0, + "signals": int(sum(row["count"] for row in signal_sources if row["source"] == "helius")), + "status": _provider_status_label( + bool(cfg.get("helius_enabled", True)), + True, + int(sum(row["count"] for row in signal_sources if row["source"] == "helius")), + last_error if "helius" in str(last_error).lower() else "", + ), + }, + ] + return { + "hours": hours, + "enabled": bool(cfg.get("enabled", False)), + "last_run": dict(last_onchain) if last_onchain else None, + "last_summary": summary, + "last_error": last_error or "", + "coverage": { + "active_mappings": int(mapping_total or 0), + "usable_mappings": int(mapping_usable or 0), + "raw_events": int(raw_total or 0), + "metrics": int(metric_total or 0), + "signals": int(signal_total or 0), + "queued_candidates": int(candidate_total or 0), + }, + "raw_event_types": [dict(row) for row in raw_by_type], + "metric_sources": [dict(row) for row in metric_sources], + "signal_sources": [dict(row) for row in signal_sources], + "providers": providers, + } + + +def _provider_status_label(enabled, implemented, count, last_error=""): + if not enabled: + return "已关闭" + if not implemented: + return "未接入采集" + if count: + return "正常采集中" + if last_error: + return "最近采集失败" + return "暂无数据" + + def _signal_counts(events): counts = {} for e in events: @@ -596,7 +783,7 @@ def list_onchain_events(limit=50, offset=0, chain="", signal="", status="", hour return {"items": [_with_raw(row) for row in rows], "total": int(total or 0), "limit": limit, "offset": offset, "has_more": offset + len(rows) < int(total or 0)} -def list_onchain_raw_events(limit=50, offset=0, chain="", source="", event_type="", mapping_status="", hours=24): +def list_onchain_raw_events(limit=50, offset=0, chain="", source="", event_type="", mapping_status="", priority="", hours=24): init_onchain_tables() limit = max(1, min(int(limit or 50), 200)) offset = max(0, int(offset or 0)) @@ -615,6 +802,11 @@ def list_onchain_raw_events(limit=50, offset=0, chain="", source="", event_type= if mapping_status: clauses.append("mapping_status=%s") params.append(mapping_status) + if priority: + if priority == "important": + clauses.append("event_type NOT IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top')") + elif priority == "low": + clauses.append("event_type IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top')") where = " AND ".join(clauses) conn = get_conn() total = conn.execute(f"SELECT COUNT(*) FROM onchain_raw_events WHERE {where}", tuple(params)).fetchone()[0] @@ -661,6 +853,15 @@ def _with_raw(row): def _format_raw_event(row): item = _with_raw(row) item["event_label"] = raw_event_type_label(item.get("event_type")) + explainer = raw_event_explainer(item.get("event_type")) + item["plain_summary"] = explainer.get("plain") or "" + item["why_matters"] = explainer.get("meaning") or "" + item["priority"] = explainer.get("priority") or "medium" + item["pipeline_note"] = ( + "已映射,可进入后续链上信号分析。" + if item.get("mapping_status") == "mapped" + else "未完成币种映射,仅作为原始观察,不进入推荐。" + ) item["token_short"] = _short_address(item.get("token_address")) return item diff --git a/app/services/market_overview.py b/app/services/market_overview.py new file mode 100644 index 0000000..9e4524d --- /dev/null +++ b/app/services/market_overview.py @@ -0,0 +1,196 @@ +"""Crypto market-wide overview providers. + +This module is intentionally separate from recommendation analytics. The market +overview page should describe the broad crypto environment, not the current +recommendation sample. +""" + +from __future__ import annotations + +from datetime import datetime + +import requests + +from app.services import altcoin_screener + + +def _safe_float(value, default=0.0): + try: + return float(value or 0) + except Exception: + return default + + +def _percentile(values, pct): + values = sorted([float(v) for v in values if isinstance(v, (int, float))]) + if not values: + return 0.0 + if len(values) == 1: + return round(values[0], 2) + k = (len(values) - 1) * float(pct) + lower = int(k) + upper = min(lower + 1, len(values) - 1) + weight = k - lower + return round(values[lower] * (1 - weight) + values[upper] * weight, 2) + + +def _market_state(avg_change, advance_decline_ratio, hot_count, crash_count, benchmarks=None): + benchmarks = benchmarks or {} + btc_change = _safe_float((benchmarks.get("BTC/USDT") or {}).get("change_24h")) + eth_change = _safe_float((benchmarks.get("ETH/USDT") or {}).get("change_24h")) + majors_weak = btc_change <= -2 or eth_change <= -2.5 + majors_strong = btc_change >= 1 and eth_change >= 1 + if crash_count >= 20 or advance_decline_ratio < 0.65 or (majors_weak and avg_change < 0): + return { + "label": "全市场偏弱", + "tone": "risk_off", + "summary": "主流币或山寨广度走弱,山寨机会更容易变成反弹噪音,优先控制仓位和追高风险。", + } + if avg_change >= 1.0 and advance_decline_ratio >= 1.2 and hot_count >= 20 and not majors_weak: + return { + "label": "全市场偏强", + "tone": "risk_on", + "summary": "上涨覆盖面和强势币数量都不错,可以更积极寻找放量后的确认机会。", + } + if majors_strong and hot_count >= 10 and advance_decline_ratio >= 0.9: + return { + "label": "主流带动轮动", + "tone": "selective", + "summary": "BTC/ETH 提供方向支撑,但山寨仍是结构性轮动,适合等待量价和入场窗口共振。", + } + return { + "label": "结构性行情", + "tone": "neutral", + "summary": "市场不是单边环境,机会更依赖板块轮动和单币确认,适合精选而不是广撒网。", + } + + +def _benchmark_overview(): + try: + tickers = altcoin_screener.exchange.fetch_tickers(["BTC/USDT", "ETH/USDT"]) + except Exception: + try: + tickers = altcoin_screener.exchange.fetch_tickers() + except Exception: + tickers = {} + result = {} + for symbol in ("BTC/USDT", "ETH/USDT"): + info = tickers.get(symbol) or {} + result[symbol] = { + "symbol": symbol, + "price": _safe_float(info.get("last")), + "change_24h": _safe_float(info.get("percentage")), + "volume_24h": _safe_float(info.get("quoteVolume")), + } + return result + + +def _funding_snapshot(): + try: + data = altcoin_screener.exchange.fapiPublicGetPremiumIndex() + except Exception: + try: + resp = requests.get("https://fapi.binance.com/fapi/v1/premiumIndex", timeout=8) + data = resp.json() if resp.status_code == 200 else [] + except Exception: + data = [] + if isinstance(data, dict): + data = [data] + result = {} + for item in data or []: + raw_symbol = str(item.get("symbol") or "") + if not raw_symbol.endswith("USDT"): + continue + rate = item.get("lastFundingRate") + try: + rate = float(rate) + except Exception: + continue + result[raw_symbol.replace("USDT", "/USDT")] = rate + return result + + +def _funding_overview(universe_symbols=None): + rates = _funding_snapshot() + if not rates: + rates = altcoin_screener.fetch_funding_rates() + universe = set(universe_symbols or []) + values = [ + float(v) + for symbol, v in (rates or {}).items() + if isinstance(v, (int, float)) and (not universe or symbol in universe) + ] + if not values: + return { + "sample_count": 0, + "avg_funding_rate": 0, + "positive_count": 0, + "negative_count": 0, + "extreme_positive_count": 0, + "extreme_negative_count": 0, + } + return { + "sample_count": len(values), + "avg_funding_rate": round(sum(values) / len(values), 6), + "positive_count": sum(1 for v in values if v > 0), + "negative_count": sum(1 for v in values if v < 0), + "extreme_positive_count": sum(1 for v in values if v >= 0.001), + "extreme_negative_count": sum(1 for v in values if v <= -0.001), + } + + +def get_crypto_market_overview(): + pairs = altcoin_screener.fetch_all_tickers() + benchmarks = _benchmark_overview() + items = [] + for symbol, info in (pairs or {}).items(): + volume = _safe_float(info.get("volume_24h")) + change = _safe_float(info.get("change_24h")) + price = _safe_float(info.get("price")) + if volume <= 0 or price <= 0: + continue + items.append({ + "symbol": symbol, + "price": price, + "change_24h": change, + "volume_24h": volume, + }) + changes = [x["change_24h"] for x in items] + volumes = [x["volume_24h"] for x in items] + up = sum(1 for x in changes if x > 0) + down = sum(1 for x in changes if x < 0) + hot = [x for x in items if x["change_24h"] >= 5] + crash = [x for x in items if x["change_24h"] <= -5] + adv_dec = round(up / down, 2) if down else float(up) + avg_change = round(sum(changes) / len(changes), 2) if changes else 0 + state = _market_state(avg_change, adv_dec, len(hot), len(crash), benchmarks=benchmarks) + top_gainers = sorted(items, key=lambda x: x["change_24h"], reverse=True)[:10] + top_losers = sorted(items, key=lambda x: x["change_24h"])[:10] + top_volume = sorted(items, key=lambda x: x["volume_24h"], reverse=True)[:10] + overview = { + "updated_at": datetime.now().isoformat(timespec="seconds"), + "source": "binance_spot_usdt_market", + "universe": "Binance spot USDT crypto market, with BTC/ETH as benchmarks and altcoin breadth excluding stables/wrapped/metals/BNB", + "benchmarks": benchmarks, + "sample_count": len(items), + "up_count": up, + "down_count": down, + "flat_count": max(0, len(items) - up - down), + "advance_decline_ratio": adv_dec, + "avg_change_24h": avg_change, + "median_change_24h": _percentile(changes, 0.5), + "p75_change_24h": _percentile(changes, 0.75), + "p25_change_24h": _percentile(changes, 0.25), + "hot_count_5pct": len(hot), + "crash_count_5pct": len(crash), + "total_quote_volume_24h": round(sum(volumes), 2), + "state": state, + "top_gainers": top_gainers, + "top_losers": top_losers, + "top_volume": top_volume, + "funding": _funding_overview({x["symbol"] for x in items}), + } + return overview + + +__all__ = ["get_crypto_market_overview"] diff --git a/app/services/onchain_monitor.py b/app/services/onchain_monitor.py index 75fcea8..2dff3e8 100644 --- a/app/services/onchain_monitor.py +++ b/app/services/onchain_monitor.py @@ -8,6 +8,7 @@ but it never creates recommendations or changes recommendation state directly. import json import os from datetime import datetime, timedelta +from urllib.parse import urlencode import requests @@ -33,6 +34,12 @@ from app.services.event_driven_screener import _tradable_symbol, init_event_tabl DEFAULT_CHAINS = ("ethereum", "bsc", "base", "arbitrum", "solana") +ETHERSCAN_CHAIN_IDS = { + "ethereum": "1", + "bsc": "56", + "base": "8453", + "arbitrum": "42161", +} SOLANA_AUTO_ALLOWLIST = { "WIF", "BONK", "JUP", "RAY", "PYTH", "PENGU", "JTO", "MEW", "POPCAT", "PNUT", "FARTCOIN", "RENDER", "HNT", "MOBILE", "ORCA", "KMNO", "DRIFT", "TNSR", "IO", @@ -96,6 +103,11 @@ def get_onchain_params(): chains = [str(x).strip().lower() for x in chains_raw if str(x).strip()] etherscan_env = str(cfg.get("etherscan_api_key_env") or "ALPHAX_ETHERSCAN_API_KEY") helius_env = str(cfg.get("helius_api_key_env") or "ALPHAX_HELIUS_API_KEY") + etherscan_chains_raw = cfg.get("etherscan_chains") or ["ethereum"] + if isinstance(etherscan_chains_raw, str): + etherscan_chains = [x.strip().lower() for x in etherscan_chains_raw.split(",") if x.strip()] + else: + etherscan_chains = [str(x).strip().lower() for x in etherscan_chains_raw if str(x).strip()] return { "enabled": bool(cfg.get("enabled", False)), "chains": chains or list(DEFAULT_CHAINS), @@ -105,12 +117,19 @@ def get_onchain_params(): "candidate_min_confidence": int(cfg.get("candidate_min_confidence") or 70), "candidate_cooldown_hours": float(cfg.get("candidate_cooldown_hours") or 6), "dexscreener_enabled": bool(cfg.get("dexscreener_enabled", True)), + "etherscan_enabled": bool(cfg.get("etherscan_enabled", True)), + "etherscan_chains": etherscan_chains or ["ethereum"], + "helius_enabled": bool(cfg.get("helius_enabled", True)), "dex_volume_spike_pct": float(cfg.get("dex_volume_spike_pct") or 80), "dex_min_liquidity_usd": float(cfg.get("dex_min_liquidity_usd") or 100000), "dex_min_volume_24h_usd": float(cfg.get("dex_min_volume_24h_usd") or 100000), "liquidity_add_pct": float(cfg.get("liquidity_add_pct") or 25), "liquidity_remove_pct": float(cfg.get("liquidity_remove_pct") or -25), + "dex_hour_volume_share_pct": float(cfg.get("dex_hour_volume_share_pct") or 8), + "dex_min_hour_volume_usd": float(cfg.get("dex_min_hour_volume_usd") or 50000), "whale_tx_usd": float(cfg.get("whale_tx_usd") or 250000), + "etherscan_base_url": str(cfg.get("etherscan_base_url") or "https://api.etherscan.io/v2/api").strip(), + "helius_base_url": str(cfg.get("helius_base_url") or "https://api.helius.xyz").strip().rstrip("/"), "etherscan_api_key": os.getenv(etherscan_env, "").strip(), "helius_api_key": os.getenv(helius_env, "").strip(), } @@ -142,11 +161,35 @@ def _safe_pct_change(new_value, old_value): return (new_value - old_value) / old_value * 100 +def _safe_int(value, default=0): + try: + return int(float(value or 0)) + except Exception: + return default + + def _chain_alias(value): key = str(value or "").lower() return DEX_CHAIN_ALIASES.get(key, key) +def _chain_explorer_tx_url(chain, tx_hash): + tx_hash = str(tx_hash or "").strip() + if not tx_hash: + return "" + if chain == "ethereum": + return f"https://etherscan.io/tx/{tx_hash}" + if chain == "bsc": + return f"https://bscscan.com/tx/{tx_hash}" + if chain == "base": + return f"https://basescan.org/tx/{tx_hash}" + if chain == "arbitrum": + return f"https://arbiscan.io/tx/{tx_hash}" + if chain == "solana": + return f"https://solscan.io/tx/{tx_hash}" + return "" + + def _latest_metric(symbol, chain, contract_address): conn = get_conn() row = conn.execute( @@ -210,11 +253,11 @@ def normalize_dexscreener_raw_event(item, event_type, cfg=None): } title = "DEX Screener" if event_type == "token_profile_latest": - title = "Token 资料更新" + title = "DEX 新币资料变更" elif event_type == "token_boost_latest": - title = "DEX Boost 新增" + title = "DEX 付费曝光新增" elif event_type == "token_boost_top": - title = "DEX Boost 榜单" + title = "DEX 付费曝光榜" return { "source": "dexscreener", "chain": chain, @@ -445,8 +488,16 @@ def derive_dex_signals(metric, cfg=None): signals = [] vol_change = _safe_float(metric.get("dex_volume_change_pct")) liq_change = _safe_float(metric.get("liquidity_change_pct")) + volume_1h = _safe_float(metric.get("dex_volume_1h_usd")) + volume_24h = _safe_float(metric.get("dex_volume_usd")) + hour_share_pct = (volume_1h / volume_24h * 100) if volume_24h > 0 else 0 if vol_change >= cfg.get("dex_volume_spike_pct", 80): signals.append("dex_volume_spike") + elif ( + volume_1h >= cfg.get("dex_min_hour_volume_usd", 50000) + and hour_share_pct >= cfg.get("dex_hour_volume_share_pct", 8) + ): + signals.append("dex_volume_spike") if liq_change >= cfg.get("liquidity_add_pct", 25): signals.append("liquidity_add") if liq_change <= cfg.get("liquidity_remove_pct", -25): @@ -481,7 +532,13 @@ def normalize_dexscreener_pair(pair, mapping, cfg=None): chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or mapping.get("chain") or "").lower(), str(mapping.get("chain") or "").lower()) contract = mapping.get("contract_address") or (pair.get("baseToken") or {}).get("address") or "" liquidity = _safe_float((pair.get("liquidity") or {}).get("usd")) - volume = _safe_float((pair.get("volume") or {}).get("h24")) + volume_map = pair.get("volume") or {} + volume = _safe_float(volume_map.get("h24")) + volume_1h = _safe_float(volume_map.get("h1")) + volume_5m = _safe_float(volume_map.get("m5")) + volume_6h = _safe_float(volume_map.get("h6")) + txns_map = pair.get("txns") or {} + txns_h1 = txns_map.get("h1") or {} prev = _latest_metric(symbol, chain, contract) prev_volume = _safe_float(prev.get("dex_volume_usd") if prev else 0) prev_liquidity = _safe_float(prev.get("liquidity_usd") if prev else 0) @@ -492,6 +549,10 @@ def normalize_dexscreener_pair(pair, mapping, cfg=None): "window": "1h", "metric_time": _now().isoformat(timespec="seconds"), "dex_volume_usd": volume, + "dex_volume_1h_usd": volume_1h, + "dex_volume_5m_usd": volume_5m, + "dex_volume_6h_usd": volume_6h, + "dex_volume_1h_share_pct": round(volume_1h / volume * 100, 2) if volume > 0 else 0, "dex_volume_change_pct": _safe_pct_change(volume, prev_volume), "liquidity_usd": liquidity, "liquidity_change_pct": _safe_pct_change(liquidity, prev_liquidity), @@ -510,6 +571,14 @@ def normalize_dexscreener_pair(pair, mapping, cfg=None): "price_change": pair.get("priceChange") or {}, "volume": pair.get("volume") or {}, "liquidity": pair.get("liquidity") or {}, + "derived": { + "dex_volume_1h_usd": volume_1h, + "dex_volume_5m_usd": volume_5m, + "dex_volume_6h_usd": volume_6h, + "dex_volume_1h_share_pct": round(volume_1h / volume * 100, 2) if volume > 0 else 0, + "h1_buys": int(txns_h1.get("buys") or 0), + "h1_sells": int(txns_h1.get("sells") or 0), + }, }, } return _score_metric(metric) @@ -558,6 +627,198 @@ def fetch_dexscreener_metrics(limit=60): return {"metrics": metrics, "events": events, "errors": errors} +def _event_from_etherscan_transfer(row, mapping, cfg=None): + cfg = cfg or get_onchain_params() + decimals = _safe_int(row.get("tokenDecimal"), 18) + amount = _safe_float(row.get("value")) / (10 ** decimals if decimals >= 0 else 1) + price_usd = _latest_price_from_metric(mapping) + value_usd = amount * price_usd if price_usd > 0 else 0 + threshold = _safe_float(cfg.get("whale_tx_usd"), 250000) + if value_usd > 0 and value_usd < threshold: + return None + if value_usd <= 0 and amount <= 0: + return None + tx_hash = str(row.get("hash") or "").strip() + chain = str(mapping.get("chain") or "").lower() + return { + "chain": chain, + "symbol": mapping.get("symbol"), + "contract_address": mapping.get("contract_address") or "", + "event_type": "token_transfer", + "signal_code": "whale_accumulation" if value_usd >= threshold else "large_token_transfer", + "signal_label": signal_label("whale_accumulation" if value_usd >= threshold else "large_token_transfer"), + "direction": "positive" if value_usd >= threshold else "neutral", + "value_usd": value_usd, + "amount": amount, + "tx_hash": tx_hash, + "wallet_address": row.get("to") or "", + "wallet_label": "EVM 接收地址", + "counterparty_label": "EVM 发送地址 " + _short_addr(row.get("from") or ""), + "confidence": 74 if value_usd >= threshold else 58, + "severity": "A" if value_usd >= threshold else "B", + "detected_at": _ts_to_iso(row.get("timeStamp")), + "source": "etherscan", + "url": _chain_explorer_tx_url(chain, tx_hash), + "raw": row, + } + + +def _latest_price_from_metric(mapping): + latest = _latest_metric( + normalize_symbol(mapping.get("symbol")), + str(mapping.get("chain") or "").lower(), + str(mapping.get("contract_address") or ""), + ) + raw = {} + try: + raw = json.loads(latest.get("raw_json") or "{}") if latest else {} + except Exception: + raw = {} + return _safe_float(raw.get("price_usd")) + + +def fetch_etherscan_events(limit=60): + cfg = get_onchain_params() + if not cfg.get("etherscan_enabled", True): + return {"events": [], "errors": ["etherscan_disabled"]} + api_key = cfg.get("etherscan_api_key") or "" + if not api_key: + return {"events": [], "errors": ["etherscan_api_key_missing"]} + allowed_chains = set(cfg.get("etherscan_chains") or ["ethereum"]) + mappings = [ + m for m in get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) + if m.get("chain") in ETHERSCAN_CHAIN_IDS and m.get("chain") in allowed_chains + ] + events = [] + errors = [] + for mapping in mappings[: int(limit or 60)]: + chain = str(mapping.get("chain") or "").lower() + contract = str(mapping.get("contract_address") or "").strip() + if not contract: + continue + params = { + "chainid": ETHERSCAN_CHAIN_IDS[chain], + "module": "account", + "action": "tokentx", + "contractaddress": contract, + "page": 1, + "offset": 25, + "sort": "desc", + "apikey": api_key, + } + try: + data = _request_json(cfg.get("etherscan_base_url"), params=params, timeout=cfg.get("timeout", 15)) + status = str(data.get("status") or "") + message = str(data.get("message") or "") + rows = data.get("result") or [] + if status == "0" and not isinstance(rows, list): + if "No transactions found" not in str(rows) and "No records" not in str(rows): + errors.append(f"{mapping.get('symbol')}:etherscan_{message}:{str(rows)[:100]}") + continue + if not isinstance(rows, list): + continue + for row in rows: + if not isinstance(row, dict): + continue + event = _event_from_etherscan_transfer(row, mapping, cfg=cfg) + if not event: + continue + if insert_onchain_event(event): + events.append(event) + except Exception as exc: + errors.append(f"{mapping.get('symbol')}:etherscan:{str(exc)[:160]}") + return {"events": events, "errors": errors} + + +def _event_from_helius_tx(tx, mapping, cfg=None): + cfg = cfg or get_onchain_params() + mint = str(mapping.get("contract_address") or "") + symbol = normalize_symbol(mapping.get("symbol")) + signature = str(tx.get("signature") or "") + token_transfers = tx.get("tokenTransfers") or [] + native_transfers = tx.get("nativeTransfers") or [] + matched = [t for t in token_transfers if str(t.get("mint") or "") == mint] + if not matched: + return None + amount = max(_safe_float(t.get("tokenAmount")) for t in matched) + price_usd = _latest_price_from_metric(mapping) + value_usd = amount * price_usd if price_usd > 0 else 0 + sol_amount = max([_safe_float(t.get("amount")) / 1_000_000_000 for t in native_transfers] or [0]) + threshold = _safe_float(cfg.get("whale_tx_usd"), 250000) + if value_usd > 0 and value_usd < threshold and sol_amount < 100: + return None + signal = "whale_accumulation" if value_usd >= threshold or sol_amount >= 100 else "large_token_transfer" + return { + "chain": "solana", + "symbol": symbol, + "contract_address": mint, + "event_type": "solana_token_activity", + "signal_code": signal, + "signal_label": signal_label(signal), + "direction": "positive" if signal == "whale_accumulation" else "neutral", + "value_usd": value_usd, + "amount": amount, + "tx_hash": signature, + "wallet_address": (matched[0].get("toUserAccount") or matched[0].get("userAccount") or ""), + "wallet_label": "Solana 接收地址", + "counterparty_label": "Solana 发送地址 " + _short_addr(matched[0].get("fromUserAccount") or ""), + "confidence": 74 if signal == "whale_accumulation" else 58, + "severity": "A" if signal == "whale_accumulation" else "B", + "detected_at": _ts_to_iso(tx.get("timestamp")), + "source": "helius", + "url": _chain_explorer_tx_url("solana", signature), + "raw": tx, + } + + +def fetch_helius_events(limit=60): + cfg = get_onchain_params() + if not cfg.get("helius_enabled", True): + return {"events": [], "errors": ["helius_disabled"]} + api_key = cfg.get("helius_api_key") or "" + if not api_key: + return {"events": [], "errors": ["helius_api_key_missing"]} + mappings = [m for m in get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) if m.get("chain") == "solana"] + events = [] + errors = [] + for mapping in mappings[: int(limit or 60)]: + mint = str(mapping.get("contract_address") or "").strip() + if not mint: + continue + query = urlencode({"api-key": api_key, "limit": 25}) + url = f"{cfg.get('helius_base_url')}/v0/addresses/{mint}/transactions?{query}" + try: + data = _request_json(url, timeout=cfg.get("timeout", 15)) + rows = data if isinstance(data, list) else data.get("transactions") or [] + for tx in rows: + if not isinstance(tx, dict): + continue + event = _event_from_helius_tx(tx, mapping, cfg=cfg) + if not event: + continue + if insert_onchain_event(event): + events.append(event) + except Exception as exc: + errors.append(f"{mapping.get('symbol')}:helius:{str(exc)[:160]}") + return {"events": events, "errors": errors} + + +def _ts_to_iso(value): + try: + if value: + return datetime.fromtimestamp(float(value)).isoformat(timespec="seconds") + except Exception: + pass + return _now().isoformat(timespec="seconds") + + +def _short_addr(value): + value = str(value or "") + if len(value) <= 12: + return value + return value[:6] + "..." + value[-4:] + + def ingest_normalized_events(events): """Test/integration helper for provider adapters.""" init_db() @@ -705,6 +966,12 @@ def run_once(limit=60): output["metrics_count"] += len(dex.get("metrics") or []) output["events_count"] += len(dex.get("events") or []) output["errors"].extend(dex.get("errors") or []) + eth = fetch_etherscan_events(limit=limit) + output["events_count"] += len(eth.get("events") or []) + output["errors"].extend(eth.get("errors") or []) + hel = fetch_helius_events(limit=limit) + output["events_count"] += len(hel.get("events") or []) + output["errors"].extend(hel.get("errors") or []) output["discovered_mappings"] = discover_token_mappings(limit=limit).get("inserted", 0) if not get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) else 0 if output.get("discovered_mappings"): output["status"] = "bootstrapped" @@ -712,6 +979,12 @@ def run_once(limit=60): output["metrics_count"] = len(dex.get("metrics") or []) output["events_count"] = len(dex.get("events") or []) output["errors"].extend(dex.get("errors") or []) + eth = fetch_etherscan_events(limit=limit) + output["events_count"] += len(eth.get("events") or []) + output["errors"].extend(eth.get("errors") or []) + hel = fetch_helius_events(limit=limit) + output["events_count"] += len(hel.get("events") or []) + output["errors"].extend(hel.get("errors") or []) queued = enqueue_onchain_candidates() output["candidate_queued"] = queued.get("queued", 0) output["candidate_symbols"] = queued.get("symbols", []) @@ -745,6 +1018,8 @@ __all__ = [ "enqueue_onchain_candidates", "fetch_dexscreener_metrics", "fetch_dexscreener_raw_events", + "fetch_etherscan_events", + "fetch_helius_events", "get_onchain_params", "ingest_normalized_events", "normalize_dexscreener_pair", diff --git a/app/web/routes_market.py b/app/web/routes_market.py index 5608c99..9e507aa 100644 --- a/app/web/routes_market.py +++ b/app/web/routes_market.py @@ -1,7 +1,7 @@ from fastapi import APIRouter, Cookie -from app.db.analytics import get_stats from app.db.onchain_db import get_onchain_overview +from app.services.market_overview import get_crypto_market_overview from app.web.shared import require_api_user_with_subscription @@ -11,7 +11,12 @@ router = APIRouter() @router.get("/api/market/overview") async def api_market_overview(hours: int = 24, altcoin_session: str = Cookie(default="")): require_api_user_with_subscription(altcoin_session) - stats = get_stats() + crypto_market = {} + market_error = "" + try: + crypto_market = get_crypto_market_overview() + except Exception as exc: + market_error = str(exc)[:500] onchain = get_onchain_overview(hours=hours) newsfeed = {} ai_analysis = {} @@ -22,9 +27,9 @@ async def api_market_overview(hours: int = 24, altcoin_session: str = Cookie(def except Exception: newsfeed = {} try: - from app.db.llm_insights import get_latest_insight_by_type + from app.services.llm_insights import get_latest_sentiment_batch_analysis - latest_sentiment = get_latest_insight_by_type("sentiment_batch_analysis") + latest_sentiment = get_latest_sentiment_batch_analysis() if latest_sentiment: ai_analysis = { "status": latest_sentiment.get("status"), @@ -38,9 +43,10 @@ async def api_market_overview(hours: int = 24, altcoin_session: str = Cookie(def ai_analysis = {} return { "hours": int(hours or 24), - "updated_at": stats.get("market_context_overview", {}).get("updated_at") if isinstance(stats, dict) else None, + "updated_at": crypto_market.get("updated_at"), "market": { - "stats": stats, + "crypto_market": crypto_market, + "market_error": market_error, "newsfeed": newsfeed, "onchain": onchain, "ai_analysis": ai_analysis, diff --git a/app/web/routes_onchain.py b/app/web/routes_onchain.py index 3efcd60..5ae288c 100644 --- a/app/web/routes_onchain.py +++ b/app/web/routes_onchain.py @@ -2,6 +2,7 @@ from fastapi import APIRouter, Cookie from app.db.onchain_db import ( get_onchain_overview, + get_onchain_provider_status, get_onchain_token_detail, list_onchain_events, list_onchain_raw_events, @@ -19,6 +20,12 @@ async def api_onchain_overview(hours: int = 24, altcoin_session: str = Cookie(de return get_onchain_overview(hours=hours) +@router.get("/api/onchain/provider-status") +async def api_onchain_provider_status(hours: int = 24, altcoin_session: str = Cookie(default="")): + require_api_user_with_subscription(altcoin_session) + return get_onchain_provider_status(hours=hours) + + @router.get("/api/onchain/tokens") async def api_onchain_tokens( limit: int = 30, @@ -60,6 +67,7 @@ async def api_onchain_raw_events( source: str = "", event_type: str = "", mapping_status: str = "", + priority: str = "", hours: int = 24, altcoin_session: str = Cookie(default=""), ): @@ -71,5 +79,6 @@ async def api_onchain_raw_events( source=source, event_type=event_type, mapping_status=mapping_status, + priority=priority, hours=hours, ) diff --git a/static/market.html b/static/market.html index 7becb21..562e400 100644 --- a/static/market.html +++ b/static/market.html @@ -2,7 +2,7 @@ {% block title %}AlphaX Agent | Crypto — 市场总览{% endblock %} {% block extra_head_css %} {% endblock %} {% block content %} @@ -10,7 +10,7 @@
这里不是单币推荐页,而是帮你快速判断当前环境:场内有没有动能,链上有没有真异动,AI 舆情有没有明确倾向。
+基于整个加密市场判断今天的大环境:BTC/ETH 方向、山寨市场广度、成交额、资金费率,再结合链上和 AI 舆情作为辅助证据。