diff --git a/.env.example b/.env.example index df9182e..e45c84a 100644 --- a/.env.example +++ b/.env.example @@ -32,6 +32,24 @@ ALPHAX_LLM_RECOMMENDATIONS_ENABLED=1 ALPHAX_LLM_SENTIMENT_ENABLED=1 ALPHAX_LLM_REVIEW_ENABLED=1 +# 链上追踪运行时配置。默认关闭;开启后采集结果只作为发现/风控辅助。 +ALPHAX_ONCHAIN_ENABLED=0 +ALPHAX_ONCHAIN_CHAINS=ethereum,bsc,base,arbitrum,solana +ALPHAX_ONCHAIN_TIMEOUT=15 +ALPHAX_ONCHAIN_CANDIDATE_ENABLED=1 +ALPHAX_ONCHAIN_CANDIDATE_MIN_SCORE=70 +ALPHAX_ONCHAIN_CANDIDATE_MIN_CONFIDENCE=70 +ALPHAX_ONCHAIN_CANDIDATE_COOLDOWN_HOURS=6 +ALPHAX_ONCHAIN_DEXSCREENER_ENABLED=1 +ALPHAX_ONCHAIN_DEX_VOLUME_SPIKE_PCT=80 +ALPHAX_ONCHAIN_DEX_MIN_LIQUIDITY_USD=100000 +ALPHAX_ONCHAIN_DEX_MIN_VOLUME_24H_USD=100000 +ALPHAX_ONCHAIN_LIQUIDITY_ADD_PCT=25 +ALPHAX_ONCHAIN_LIQUIDITY_REMOVE_PCT=-25 +ALPHAX_ONCHAIN_WHALE_TX_USD=250000 +ALPHAX_ETHERSCAN_API_KEY= +ALPHAX_HELIUS_API_KEY= + # 邮箱验证码 SMTP 配置。没有配置时,注册验证码只会生成,不会发邮件。 ASTOCK_SMTP_HOST= ASTOCK_SMTP_PORT=465 diff --git a/app/cli.py b/app/cli.py index e77c5c6..bae848a 100644 --- a/app/cli.py +++ b/app/cli.py @@ -2,7 +2,7 @@ import argparse -from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, price_tracker, review_engine, sentiment_monitor +from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, onchain_monitor, price_tracker, review_engine, sentiment_monitor def build_parser(): @@ -29,6 +29,9 @@ def build_parser(): sentiment.add_argument("--check", action="store_true", help="输出舆情异动") sentiment.add_argument("--scores", action="store_true", help="输出评分") + onchain = subparsers.add_parser("onchain", help="运行链上追踪任务") + onchain.add_argument("--limit", type=int, default=60, help="本轮最多处理的 token 映射数量") + llm = subparsers.add_parser("llm-insights", help="异步生成 LLM 缓存解释") llm.add_argument("--scope", choices=["recommendations", "sentiment", "sentiment-events", "review"], default="recommendations") llm.add_argument("--limit", type=int, default=30) @@ -74,6 +77,8 @@ def main(): } print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2)) return result + if args.command == "onchain": + return onchain_monitor.run_once(limit=args.limit) if args.command == "llm-insights": from app.services import llm_insights diff --git a/app/core/signal_taxonomy.py b/app/core/signal_taxonomy.py index 53cf262..03ab643 100644 --- a/app/core/signal_taxonomy.py +++ b/app/core/signal_taxonomy.py @@ -31,6 +31,14 @@ SIGNAL_CODE_LABELS = { "top_trader_long": "大户偏多", "sector_rotation": "板块联动", "sentiment_resonance": "舆情共振", + "dex_volume_spike": "DEX 放量", + "liquidity_add": "流动性增加", + "liquidity_remove_risk": "流动性撤出风险", + "exchange_outflow": "交易所流出", + "exchange_inflow_risk": "交易所流入风险", + "whale_accumulation": "鲸鱼增持", + "holder_concentration_risk": "持仓集中风险", + "smart_money_buying": "聪明钱买入", "funding_extreme": "资金费率极端", "trend_exhaustion": "趋势衰减", "false_breakout": "假突破", @@ -61,6 +69,14 @@ _PATTERNS = [ ("top_trader_long", ("大户偏多",)), ("sector_rotation", ("板块联动", "龙头")), ("sentiment_resonance", ("舆情共振",)), + ("dex_volume_spike", ("DEX", "放量")), + ("liquidity_add", ("流动性", "增加")), + ("liquidity_remove_risk", ("流动性", "撤出")), + ("exchange_outflow", ("交易所", "流出")), + ("exchange_inflow_risk", ("交易所", "流入")), + ("whale_accumulation", ("鲸鱼", "增持")), + ("holder_concentration_risk", ("持仓集中",)), + ("smart_money_buying", ("聪明钱", "买入")), ("funding_extreme", ("资金费率极端",)), ("trend_exhaustion", ("衰减", "反转", "阴动K")), ("false_breakout", ("假突破", "冲高回落")), diff --git a/app/db/onchain_db.py b/app/db/onchain_db.py new file mode 100644 index 0000000..41fa9c4 --- /dev/null +++ b/app/db/onchain_db.py @@ -0,0 +1,786 @@ +"""On-chain discovery storage and read models. + +The on-chain layer is a research/discovery input. It stores normalized external +facts and can enqueue technical-check candidates, but it must not create or +mutate trading recommendations directly. +""" + +import json +import sqlite3 +from datetime import datetime, timedelta + +from app.db.altcoin_db import get_conn + + +MIN_MAPPING_CONFIDENCE = 70 + + +SIGNAL_LABELS = { + "dex_volume_spike": "DEX 放量", + "liquidity_add": "流动性增加", + "liquidity_remove_risk": "流动性撤出风险", + "exchange_outflow": "交易所流出", + "exchange_inflow_risk": "交易所流入风险", + "whale_accumulation": "鲸鱼增持", + "holder_concentration_risk": "持仓集中风险", + "smart_money_buying": "聪明钱买入", +} + +RAW_EVENT_TYPE_LABELS = { + "token_profile_latest": "Token 资料更新", + "token_boost_latest": "DEX 热度 Boost", + "token_boost_top": "DEX Boost 榜", +} + +POSITIVE_SIGNALS = {"dex_volume_spike", "liquidity_add", "exchange_outflow", "whale_accumulation", "smart_money_buying"} +RISK_SIGNALS = {"liquidity_remove_risk", "exchange_inflow_risk", "holder_concentration_risk"} + + +def _now(): + return datetime.now().isoformat() + + +def _dump(value): + return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str) + + +def _load(value, fallback=None): + try: + if isinstance(value, str) and value.strip(): + return json.loads(value) + if value is not None: + return value + except Exception: + pass + return fallback + + +def _symbol_base(symbol): + return str(symbol or "").upper().replace("/USDT", "").replace("USDT", "").strip() + + +def normalize_symbol(symbol): + base = _symbol_base(symbol) + return f"{base}/USDT" if base else "" + + +def signal_label(code): + return SIGNAL_LABELS.get(str(code or ""), str(code or "链上信号")) + + +def signal_direction(code): + code = str(code or "") + if code in RISK_SIGNALS: + return "risk" + if code in POSITIVE_SIGNALS: + return "positive" + return "neutral" + + +def raw_event_type_label(event_type): + return RAW_EVENT_TYPE_LABELS.get(str(event_type or ""), str(event_type or "链上原始事件")) + + +def init_onchain_tables(): + conn = get_conn() + conn.execute( + """ + CREATE TABLE IF NOT EXISTS onchain_token_map ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + symbol TEXT NOT NULL, + chain TEXT NOT NULL, + contract_address TEXT NOT NULL, + source TEXT DEFAULT '', + confidence INTEGER DEFAULT 0, + is_active INTEGER DEFAULT 1, + raw_json TEXT DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE UNIQUE INDEX IF NOT EXISTS idx_onchain_token_map_unique + ON onchain_token_map(symbol, chain, contract_address) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_onchain_token_map_symbol ON onchain_token_map(symbol, confidence, is_active)") + + conn.execute( + """ + CREATE TABLE IF NOT EXISTS onchain_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_hash TEXT UNIQUE, + chain TEXT NOT NULL, + symbol TEXT NOT NULL, + contract_address TEXT DEFAULT '', + event_type TEXT NOT NULL, + signal_code TEXT NOT NULL, + signal_label TEXT DEFAULT '', + direction TEXT DEFAULT 'neutral', + value_usd REAL DEFAULT 0, + amount REAL DEFAULT 0, + tx_hash TEXT DEFAULT '', + wallet_address TEXT DEFAULT '', + wallet_label TEXT DEFAULT '', + counterparty_label TEXT DEFAULT '', + confidence INTEGER DEFAULT 0, + severity TEXT DEFAULT 'B', + status TEXT DEFAULT 'new', + detected_at TEXT NOT NULL, + source TEXT DEFAULT '', + url TEXT DEFAULT '', + raw_json TEXT DEFAULT '{}' + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_onchain_events_time ON onchain_events(detected_at, signal_code)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_onchain_events_symbol ON onchain_events(symbol, detected_at)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_onchain_events_chain ON onchain_events(chain, detected_at)") + + conn.execute( + """ + CREATE TABLE IF NOT EXISTS onchain_token_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + symbol TEXT NOT NULL, + chain TEXT NOT NULL, + contract_address TEXT DEFAULT '', + window TEXT NOT NULL, + metric_time TEXT NOT NULL, + dex_volume_usd REAL DEFAULT 0, + dex_volume_change_pct REAL DEFAULT 0, + liquidity_usd REAL DEFAULT 0, + liquidity_change_pct REAL DEFAULT 0, + exchange_netflow_usd REAL DEFAULT 0, + whale_accumulation_usd REAL DEFAULT 0, + holder_delta REAL DEFAULT 0, + smart_money_score REAL DEFAULT 0, + onchain_score REAL DEFAULT 0, + risk_score REAL DEFAULT 0, + source TEXT DEFAULT '', + raw_json TEXT DEFAULT '{}' + ) + """ + ) + conn.execute( + """ + CREATE UNIQUE INDEX IF NOT EXISTS idx_onchain_metrics_unique + ON onchain_token_metrics(symbol, chain, contract_address, window, metric_time) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_onchain_metrics_symbol ON onchain_token_metrics(symbol, metric_time)") + + conn.execute( + """ + CREATE TABLE IF NOT EXISTS onchain_raw_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_hash TEXT UNIQUE, + source TEXT NOT NULL, + chain TEXT NOT NULL, + event_type TEXT NOT NULL, + token_address TEXT DEFAULT '', + symbol_guess TEXT DEFAULT '', + name TEXT DEFAULT '', + title TEXT DEFAULT '', + description TEXT DEFAULT '', + url TEXT DEFAULT '', + icon TEXT DEFAULT '', + amount REAL DEFAULT 0, + total_amount REAL DEFAULT 0, + importance REAL DEFAULT 0, + mapped_symbol TEXT DEFAULT '', + mapping_status TEXT DEFAULT 'unmapped', + detected_at TEXT NOT NULL, + raw_json TEXT DEFAULT '{}' + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_onchain_raw_events_time ON onchain_raw_events(detected_at, importance)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_onchain_raw_events_chain ON onchain_raw_events(chain, detected_at)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_onchain_raw_events_mapping ON onchain_raw_events(mapping_status, detected_at)") + conn.commit() + conn.close() + + +def upsert_token_mapping(symbol, chain, contract_address, source="", confidence=0, raw=None, is_active=True): + init_onchain_tables() + now = _now() + symbol = normalize_symbol(symbol) + chain = str(chain or "").lower().strip() + contract_address = str(contract_address or "").strip() + if not symbol or not chain or not contract_address: + return 0 + conn = get_conn() + cur = conn.execute( + """ + INSERT INTO onchain_token_map + (symbol, chain, contract_address, source, confidence, is_active, raw_json, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(symbol, chain, contract_address) DO UPDATE SET + source=excluded.source, + confidence=MAX(onchain_token_map.confidence, excluded.confidence), + is_active=excluded.is_active, + raw_json=excluded.raw_json, + updated_at=excluded.updated_at + """, + (symbol, chain, contract_address, source or "", int(confidence or 0), 1 if is_active else 0, _dump(raw), now, now), + ) + conn.commit() + row = conn.execute( + "SELECT id FROM onchain_token_map WHERE symbol=? AND chain=? AND contract_address=?", + (symbol, chain, contract_address), + ).fetchone() + conn.close() + return int(row["id"] if row else cur.lastrowid or 0) + + +def get_token_mappings(symbol="", min_confidence=MIN_MAPPING_CONFIDENCE, active_only=True): + init_onchain_tables() + clauses = ["confidence >= ?"] + params = [int(min_confidence or 0)] + if symbol: + clauses.append("symbol=?") + params.append(normalize_symbol(symbol)) + if active_only: + clauses.append("is_active=1") + conn = get_conn() + rows = conn.execute( + f""" + SELECT * FROM onchain_token_map + WHERE {' AND '.join(clauses)} + ORDER BY confidence DESC, updated_at DESC + """, + tuple(params), + ).fetchall() + conn.close() + return [dict(row) for row in rows] + + +def _event_hash(event): + raw = "|".join( + [ + str(event.get("source") or ""), + str(event.get("chain") or ""), + str(event.get("symbol") or ""), + str(event.get("signal_code") or event.get("event_type") or ""), + str(event.get("tx_hash") or event.get("contract_address") or ""), + str(event.get("detected_at") or ""), + str(round(float(event.get("value_usd") or 0), 2)), + ] + ).lower() + import hashlib + + return hashlib.sha256(raw.encode()).hexdigest()[:24] + + +def _raw_event_hash(event): + raw = "|".join( + [ + str(event.get("source") or ""), + str(event.get("chain") or ""), + str(event.get("event_type") or ""), + str(event.get("token_address") or ""), + str(event.get("url") or ""), + str(round(float(event.get("amount") or 0), 4)), + str(round(float(event.get("total_amount") or 0), 4)), + ] + ).lower() + import hashlib + + return hashlib.sha256(raw.encode()).hexdigest()[:24] + + +def insert_onchain_event(event): + init_onchain_tables() + item = dict(event or {}) + item["symbol"] = normalize_symbol(item.get("symbol")) + item["chain"] = str(item.get("chain") or "").lower().strip() + item["signal_code"] = str(item.get("signal_code") or "").strip() + item["event_type"] = str(item.get("event_type") or item["signal_code"] or "onchain_event") + if not item["symbol"] or not item["chain"] or not item["signal_code"]: + return 0 + item["signal_label"] = item.get("signal_label") or signal_label(item["signal_code"]) + item["direction"] = item.get("direction") or signal_direction(item["signal_code"]) + item["detected_at"] = str(item.get("detected_at") or _now()) + item["event_hash"] = item.get("event_hash") or _event_hash(item) + conn = get_conn() + try: + cur = conn.execute( + """ + INSERT INTO onchain_events ( + event_hash, chain, symbol, contract_address, event_type, signal_code, signal_label, + direction, value_usd, amount, tx_hash, wallet_address, wallet_label, + counterparty_label, confidence, severity, status, detected_at, source, url, raw_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + item["event_hash"], + item["chain"], + item["symbol"], + item.get("contract_address") or "", + item["event_type"], + item["signal_code"], + item["signal_label"], + item["direction"], + float(item.get("value_usd") or 0), + float(item.get("amount") or 0), + item.get("tx_hash") or "", + item.get("wallet_address") or "", + item.get("wallet_label") or "", + item.get("counterparty_label") or "", + int(item.get("confidence") or 0), + item.get("severity") or "B", + item.get("status") or "new", + item["detected_at"], + item.get("source") or "", + item.get("url") or "", + _dump(item.get("raw") or item.get("raw_json") or {}), + ), + ) + conn.commit() + event_id = int(cur.lastrowid or 0) + except sqlite3.IntegrityError: + event_id = 0 + finally: + conn.close() + return event_id + + +def find_mapping_by_contract(chain, contract_address): + init_onchain_tables() + chain = str(chain or "").lower().strip() + contract_address = str(contract_address or "").strip() + if not chain or not contract_address: + return None + conn = get_conn() + row = conn.execute( + """ + SELECT * + FROM onchain_token_map + WHERE chain=? AND lower(contract_address)=lower(?) AND is_active=1 + ORDER BY confidence DESC, updated_at DESC + LIMIT 1 + """, + (chain, contract_address), + ).fetchone() + conn.close() + return dict(row) if row else None + + +def insert_onchain_raw_event(event): + init_onchain_tables() + item = dict(event or {}) + item["source"] = str(item.get("source") or "").strip() + item["chain"] = str(item.get("chain") or "").lower().strip() + item["event_type"] = str(item.get("event_type") or "onchain_raw_event").strip() + item["token_address"] = str(item.get("token_address") or "").strip() + if not item["source"] or not item["chain"] or not item["event_type"] or not item["token_address"]: + return 0 + item["detected_at"] = str(item.get("detected_at") or _now()) + item["event_hash"] = item.get("event_hash") or _raw_event_hash(item) + item["mapped_symbol"] = normalize_symbol(item.get("mapped_symbol")) if item.get("mapped_symbol") else "" + item["mapping_status"] = str(item.get("mapping_status") or ("mapped" if item["mapped_symbol"] else "unmapped")) + conn = get_conn() + try: + cur = conn.execute( + """ + INSERT INTO onchain_raw_events ( + event_hash, source, chain, event_type, token_address, symbol_guess, name, + title, description, url, icon, amount, total_amount, importance, + mapped_symbol, mapping_status, detected_at, raw_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + item["event_hash"], + item["source"], + item["chain"], + item["event_type"], + item["token_address"], + item.get("symbol_guess") or "", + item.get("name") or "", + item.get("title") or raw_event_type_label(item["event_type"]), + item.get("description") or "", + item.get("url") or "", + item.get("icon") or "", + float(item.get("amount") or 0), + float(item.get("total_amount") or 0), + float(item.get("importance") or 0), + item["mapped_symbol"], + item["mapping_status"], + item["detected_at"], + _dump(item.get("raw") or item.get("raw_json") or {}), + ), + ) + conn.commit() + event_id = int(cur.lastrowid or 0) + except sqlite3.IntegrityError: + event_id = 0 + finally: + conn.close() + return event_id + + +def insert_token_metric(metric): + init_onchain_tables() + item = dict(metric or {}) + item["symbol"] = normalize_symbol(item.get("symbol")) + item["chain"] = str(item.get("chain") or "").lower().strip() + item["window"] = str(item.get("window") or "1h").strip() + item["metric_time"] = str(item.get("metric_time") or _now()) + if not item["symbol"] or not item["chain"] or not item["window"]: + return 0 + conn = get_conn() + cur = conn.execute( + """ + INSERT INTO onchain_token_metrics ( + symbol, chain, contract_address, window, metric_time, + dex_volume_usd, dex_volume_change_pct, liquidity_usd, liquidity_change_pct, + exchange_netflow_usd, whale_accumulation_usd, holder_delta, smart_money_score, + onchain_score, risk_score, source, raw_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(symbol, chain, contract_address, window, metric_time) DO UPDATE SET + dex_volume_usd=excluded.dex_volume_usd, + dex_volume_change_pct=excluded.dex_volume_change_pct, + liquidity_usd=excluded.liquidity_usd, + liquidity_change_pct=excluded.liquidity_change_pct, + exchange_netflow_usd=excluded.exchange_netflow_usd, + whale_accumulation_usd=excluded.whale_accumulation_usd, + holder_delta=excluded.holder_delta, + smart_money_score=excluded.smart_money_score, + onchain_score=excluded.onchain_score, + risk_score=excluded.risk_score, + source=excluded.source, + raw_json=excluded.raw_json + """, + ( + item["symbol"], + item["chain"], + item.get("contract_address") or "", + item["window"], + item["metric_time"], + float(item.get("dex_volume_usd") or 0), + float(item.get("dex_volume_change_pct") or 0), + float(item.get("liquidity_usd") or 0), + float(item.get("liquidity_change_pct") or 0), + float(item.get("exchange_netflow_usd") or 0), + float(item.get("whale_accumulation_usd") or 0), + float(item.get("holder_delta") or 0), + float(item.get("smart_money_score") or 0), + float(item.get("onchain_score") or 0), + float(item.get("risk_score") or 0), + item.get("source") or "", + _dump(item.get("raw") or item.get("raw_json") or {}), + ), + ) + conn.commit() + conn.close() + return int(cur.lastrowid or 0) + + +def _latest_metrics_subquery(hours=24): + return """ + SELECT m.* + FROM onchain_token_metrics m + JOIN ( + SELECT symbol, chain, contract_address, MAX(metric_time) AS max_time + FROM onchain_token_metrics + WHERE metric_time >= ? + GROUP BY symbol, chain, contract_address + ) latest ON latest.symbol=m.symbol + AND latest.chain=m.chain + AND latest.contract_address=m.contract_address + AND latest.max_time=m.metric_time + """ + + +def get_onchain_overview(hours=24): + init_onchain_tables() + cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() + conn = get_conn() + event_rows = conn.execute("SELECT * FROM onchain_events WHERE detected_at >= ?", (cutoff,)).fetchall() + raw_rows = conn.execute("SELECT * FROM onchain_raw_events WHERE detected_at >= ?", (cutoff,)).fetchall() + raw_latest = conn.execute( + """ + SELECT * FROM onchain_raw_events + WHERE detected_at >= ? + ORDER BY datetime(detected_at) DESC, importance DESC, id DESC + LIMIT 12 + """, + (cutoff,), + ).fetchall() + metric_rows = conn.execute(_latest_metrics_subquery(hours), (cutoff,)).fetchall() + rec_rows = conn.execute( + "SELECT symbol, id, execution_status, action_status, display_bucket FROM recommendation WHERE status='active'" + ).fetchall() + conn.close() + active = {row["symbol"]: dict(row) for row in rec_rows} + events = [dict(row) for row in event_rows] + raw_events = [dict(row) for row in raw_rows] + metrics = [dict(row) for row in metric_rows] + hot = sorted(metrics, key=lambda x: float(x.get("onchain_score") or 0), reverse=True)[:8] + risks = sorted(metrics, key=lambda x: float(x.get("risk_score") or 0), reverse=True)[:8] + total_netflow = sum(float(x.get("exchange_netflow_usd") or 0) for x in metrics) + dex_volume = sum(float(x.get("dex_volume_usd") or 0) for x in metrics) + return { + "hours": int(hours or 24), + "updated_at": _now(), + "kpi": { + "event_count": len(events), + "raw_event_count": len(raw_events), + "raw_unmapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "unmapped"), + "raw_mapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "mapped"), + "token_count": len({(m["symbol"], m["chain"], m.get("contract_address") or "") for m in metrics}), + "positive_events": sum(1 for e in events if e.get("direction") == "positive"), + "risk_events": sum(1 for e in events if e.get("direction") == "risk"), + "exchange_netflow_usd": round(total_netflow, 2), + "dex_volume_usd": round(dex_volume, 2), + }, + "hot_tokens": [_format_metric_item(row, active) for row in hot], + "risk_tokens": [_format_metric_item(row, active) for row in risks], + "raw_events": [_format_raw_event(row) for row in raw_latest], + "signals": _signal_counts(events), + } + + +def _signal_counts(events): + counts = {} + for e in events: + code = e.get("signal_code") or "" + if not code: + continue + counts.setdefault(code, {"signal_code": code, "signal_label": signal_label(code), "count": 0}) + counts[code]["count"] += 1 + return sorted(counts.values(), key=lambda x: x["count"], reverse=True) + + +def _format_metric_item(row, active=None): + active = active or {} + item = dict(row) + item["raw"] = _load(item.pop("raw_json", "{}"), {}) + rec = active.get(item.get("symbol")) or {} + item["recommendation"] = { + "rec_id": rec.get("id") or 0, + "execution_status": rec.get("execution_status") or "", + "action_status": rec.get("action_status") or "", + "display_bucket": rec.get("display_bucket") or "", + "has_active": bool(rec), + } + return item + + +def list_onchain_tokens(limit=30, offset=0, chain="", signal="", hours=24): + init_onchain_tables() + limit = max(1, min(int(limit or 30), 100)) + offset = max(0, int(offset or 0)) + cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() + clauses = [] + params = [] + if chain: + clauses.append("m.chain=?") + params.append(str(chain).lower()) + if signal: + clauses.append( + """ + EXISTS ( + SELECT 1 FROM onchain_events e + WHERE e.symbol=m.symbol AND e.chain=m.chain + AND e.detected_at >= ? AND e.signal_code=? + ) + """ + ) + params.extend([cutoff, signal]) + where = " AND ".join(clauses) if clauses else "1=1" + conn = get_conn() + total = conn.execute( + f""" + SELECT COUNT(*) FROM ( + SELECT m.symbol, m.chain, m.contract_address + FROM onchain_token_metrics m + WHERE {where} + GROUP BY m.symbol, m.chain, m.contract_address + ) + """, + tuple(params), + ).fetchone()[0] + rows = conn.execute( + f""" + SELECT m.*, + (SELECT COUNT(*) FROM onchain_events e + WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.detected_at >= ?) AS event_count, + (SELECT COUNT(*) FROM onchain_events e + WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.direction='risk' AND e.detected_at >= ?) AS risk_event_count + FROM ({_latest_metrics_subquery(hours)}) m + WHERE {where} + ORDER BY m.onchain_score DESC, m.risk_score DESC, m.metric_time DESC + LIMIT ? OFFSET ? + """, + (cutoff, cutoff, cutoff, *params, limit, offset), + ).fetchall() + rec_rows = conn.execute( + "SELECT symbol, id, execution_status, action_status, display_bucket FROM recommendation WHERE status='active'" + ).fetchall() + conn.close() + active = {row["symbol"]: dict(row) for row in rec_rows} + return { + "items": [_format_metric_item(row, active) for row in rows], + "total": int(total or 0), + "limit": limit, + "offset": offset, + "has_more": offset + len(rows) < int(total or 0), + } + + +def get_onchain_token_detail(symbol, hours=72): + init_onchain_tables() + symbol = normalize_symbol(symbol) + cutoff = (datetime.now() - timedelta(hours=int(hours or 72))).isoformat() + conn = get_conn() + mappings = conn.execute( + "SELECT * FROM onchain_token_map WHERE symbol=? ORDER BY confidence DESC, updated_at DESC", + (symbol,), + ).fetchall() + events = conn.execute( + """ + SELECT * FROM onchain_events + WHERE symbol=? AND detected_at >= ? + ORDER BY detected_at DESC, id DESC + LIMIT 100 + """, + (symbol, cutoff), + ).fetchall() + metrics = conn.execute( + """ + SELECT * FROM onchain_token_metrics + WHERE symbol=? AND metric_time >= ? + ORDER BY metric_time DESC, id DESC + LIMIT 100 + """, + (symbol, cutoff), + ).fetchall() + rec = conn.execute( + """ + SELECT id, rec_time, action_status, execution_status, display_bucket, entry_price, current_price + FROM recommendation + WHERE symbol=? AND status='active' + ORDER BY id DESC LIMIT 1 + """, + (symbol,), + ).fetchone() + conn.close() + return { + "symbol": symbol, + "hours": int(hours or 72), + "mappings": [_with_raw(row) for row in mappings], + "events": [_with_raw(row) for row in events], + "metrics": [_with_raw(row) for row in metrics], + "recommendation": dict(rec) if rec else None, + } + + +def list_onchain_events(limit=50, offset=0, chain="", signal="", status="", hours=24): + init_onchain_tables() + limit = max(1, min(int(limit or 50), 200)) + offset = max(0, int(offset or 0)) + cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() + clauses = ["detected_at >= ?"] + params = [cutoff] + if chain: + clauses.append("chain=?") + params.append(str(chain).lower()) + if signal: + clauses.append("signal_code=?") + params.append(signal) + if status: + clauses.append("status=?") + params.append(status) + where = " AND ".join(clauses) + conn = get_conn() + total = conn.execute(f"SELECT COUNT(*) FROM onchain_events WHERE {where}", tuple(params)).fetchone()[0] + rows = conn.execute( + f""" + SELECT * FROM onchain_events + WHERE {where} + ORDER BY datetime(detected_at) DESC, id DESC + LIMIT ? OFFSET ? + """, + (*params, limit, offset), + ).fetchall() + conn.close() + return {"items": [_with_raw(row) for row in rows], "total": int(total or 0), "limit": limit, "offset": offset, "has_more": offset + len(rows) < int(total or 0)} + + +def list_onchain_raw_events(limit=50, offset=0, chain="", source="", event_type="", mapping_status="", hours=24): + init_onchain_tables() + limit = max(1, min(int(limit or 50), 200)) + offset = max(0, int(offset or 0)) + cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat() + clauses = ["detected_at >= ?"] + params = [cutoff] + if chain: + clauses.append("chain=?") + params.append(str(chain).lower()) + if source: + clauses.append("source=?") + params.append(source) + if event_type: + clauses.append("event_type=?") + params.append(event_type) + if mapping_status: + clauses.append("mapping_status=?") + params.append(mapping_status) + where = " AND ".join(clauses) + conn = get_conn() + total = conn.execute(f"SELECT COUNT(*) FROM onchain_raw_events WHERE {where}", tuple(params)).fetchone()[0] + rows = conn.execute( + f""" + SELECT * FROM onchain_raw_events + WHERE {where} + ORDER BY datetime(detected_at) DESC, importance DESC, id DESC + LIMIT ? OFFSET ? + """, + (*params, limit, offset), + ).fetchall() + conn.close() + return { + "items": [_format_raw_event(row) for row in rows], + "total": int(total or 0), + "limit": limit, + "offset": offset, + "has_more": offset + len(rows) < int(total or 0), + } + + +def update_event_status(event_ids, status): + if not event_ids: + return 0 + init_onchain_tables() + conn = get_conn() + cur = conn.execute( + "UPDATE onchain_events SET status=? WHERE id IN (" + ",".join(["?"] * len(event_ids)) + ")", + (status, *[int(x) for x in event_ids]), + ) + conn.commit() + conn.close() + return int(cur.rowcount or 0) + + +def _with_raw(row): + item = dict(row) + if "raw_json" in item: + item["raw"] = _load(item.pop("raw_json"), {}) + return item + + +def _format_raw_event(row): + item = _with_raw(row) + item["event_label"] = raw_event_type_label(item.get("event_type")) + item["token_short"] = _short_address(item.get("token_address")) + return item + + +def _short_address(value): + value = str(value or "") + if len(value) <= 14: + return value + return value[:6] + "..." + value[-4:] diff --git a/app/db/recommendation_queries.py b/app/db/recommendation_queries.py index 757d733..8c11037 100644 --- a/app/db/recommendation_queries.py +++ b/app/db/recommendation_queries.py @@ -106,7 +106,82 @@ def get_active_recommendations(actionable_only: bool = False): if actionable_only and not _is_actionable_execution_status(item.get("execution_status")): continue result.append(item) - return result + return _attach_onchain_context(result) + + +def _attach_onchain_context(items): + if not items: + return items + symbols = sorted({item.get("symbol") for item in items if item.get("symbol")}) + if not symbols: + return items + placeholders = ",".join(["?"] * len(symbols)) + try: + conn = get_conn() + rows = conn.execute( + f""" + SELECT m.* + FROM onchain_token_metrics m + JOIN ( + SELECT symbol, MAX(metric_time) AS max_time + FROM onchain_token_metrics + WHERE symbol IN ({placeholders}) + GROUP BY symbol + ) latest ON latest.symbol=m.symbol AND latest.max_time=m.metric_time + """, + tuple(symbols), + ).fetchall() + events = conn.execute( + f""" + SELECT * + FROM onchain_events + WHERE symbol IN ({placeholders}) + AND detected_at >= datetime('now', '-24 hours') + ORDER BY datetime(detected_at) DESC, id DESC + """, + tuple(symbols), + ).fetchall() + conn.close() + except Exception: + return items + metrics = {row["symbol"]: dict(row) for row in rows} + by_symbol = {} + for row in events: + by_symbol.setdefault(row["symbol"], []).append(dict(row)) + for item in items: + metric = metrics.get(item.get("symbol")) or {} + evs = by_symbol.get(item.get("symbol")) or [] + if not metric and not evs: + continue + risk_events = [e for e in evs if e.get("direction") == "risk"] + positive_events = [e for e in evs if e.get("direction") == "positive"] + if risk_events: + headline = risk_events[0].get("signal_label") or "链上风险升温" + elif positive_events: + headline = positive_events[0].get("signal_label") or "链上资金异动" + else: + headline = "链上异动" + item["onchain_context"] = { + "headline": headline, + "chain": metric.get("chain") or (evs[0].get("chain") if evs else ""), + "onchain_score": metric.get("onchain_score") or 0, + "risk_score": metric.get("risk_score") or 0, + "dex_volume_usd": metric.get("dex_volume_usd") or 0, + "liquidity_usd": metric.get("liquidity_usd") or 0, + "event_count_24h": len(evs), + "risk_event_count_24h": len(risk_events), + "top_events": [ + { + "signal_code": e.get("signal_code"), + "signal_label": e.get("signal_label"), + "direction": e.get("direction"), + "value_usd": e.get("value_usd") or 0, + "detected_at": e.get("detected_at"), + } + for e in evs[:3] + ], + } + return items def get_active_recommendations_deduped( @@ -209,8 +284,10 @@ def get_active_recommendations_deduped( summary["expired_filtered"] = summary.pop("expired", 0) if not with_meta: + _attach_onchain_context(all_items) return attach_recommendation_insights(all_items) page_items = all_items[offset : offset + limit] if limit else all_items[offset:] + _attach_onchain_context(page_items) attach_recommendation_insights(page_items) return { "items": page_items, diff --git a/app/db/scheduler_db.py b/app/db/scheduler_db.py index 49a7805..fe6726c 100644 --- a/app/db/scheduler_db.py +++ b/app/db/scheduler_db.py @@ -79,6 +79,16 @@ DEFAULT_JOBS = [ "description": "舆情采集", "sort_order": 50, }, + { + "job_name": "onchain", + "command": "onchain", + "args": [], + "every_seconds": 1800, + "initial_delay": 150, + "lock_group": "onchain_write", + "description": "链上异动追踪", + "sort_order": 55, + }, { "job_name": "llm-sentiment", "command": "llm-insights", @@ -497,6 +507,7 @@ def _display_job_name(job_name): "confirm": "确认", "screener": "粗筛", "sentiment": "舆情", + "onchain": "链上", "llm-sentiment": "AI舆情", "review": "复盘", }.get(job_name, job_name) diff --git a/app/db/schema.py b/app/db/schema.py index 9fbf60e..4941b2a 100644 --- a/app/db/schema.py +++ b/app/db/schema.py @@ -1,5 +1,11 @@ """Schema/init-oriented DB API.""" -from app.db.altcoin_db import get_conn, init_db +from app.db.altcoin_db import get_conn, init_db as _init_main_db +from app.db.onchain_db import init_onchain_tables + + +def init_db(): + _init_main_db() + init_onchain_tables() __all__ = ["get_conn", "init_db"] diff --git a/app/services/onchain_monitor.py b/app/services/onchain_monitor.py new file mode 100644 index 0000000..5f1d759 --- /dev/null +++ b/app/services/onchain_monitor.py @@ -0,0 +1,744 @@ +"""On-chain signal collector and candidate bridge. + +V1 deliberately treats on-chain data as a discovery/risk layer. It writes +normalized events/metrics and may request a technical check through event_news, +but it never creates recommendations or changes recommendation state directly. +""" + +import json +import os +from datetime import datetime, timedelta + +import requests + +from app.db import onchain_db +from app.db.altcoin_db import get_conn, init_db, log_cron_run +from app.db.onchain_db import ( + MIN_MAPPING_CONFIDENCE, + POSITIVE_SIGNALS, + RISK_SIGNALS, + find_mapping_by_contract, + get_token_mappings, + init_onchain_tables, + insert_onchain_event, + insert_onchain_raw_event, + insert_token_metric, + normalize_symbol, + signal_direction, + signal_label, +) +from app.services.event_driven_screener import _event_hash as event_hash +from app.services.event_driven_screener import _tradable_symbol, init_event_tables + + +DEFAULT_CHAINS = ("ethereum", "bsc", "base", "arbitrum", "solana") +SOLANA_AUTO_ALLOWLIST = { + "WIF", "BONK", "JUP", "RAY", "PYTH", "PENGU", "JTO", "MEW", "POPCAT", "PNUT", + "FARTCOIN", "RENDER", "HNT", "MOBILE", "ORCA", "KMNO", "DRIFT", "TNSR", "IO", +} +NON_TARGET_NATIVE_BASES = { + "AVAX", "FIL", "SUI", "APT", "DOT", "ADA", "XRP", "LTC", "BCH", "ATOM", "NEAR", + "SEI", "INJ", "TON", "ETC", "ICP", "HBAR", "ALGO", "VET", "TRX", "XLM", "KAS", + "TIA", "EGLD", "FLOW", "KAVA", "MINA", "IOTA", "XMR", "DASH", "ZEC", +} +BRIDGED_TOKEN_MARKERS = ( + "wrapped", "wormhole", "portal", "bridged", "bridge", "axelar", "allbridge", + "binance-peg", "multichain", "layerzero", "lz", "wavax", "wfil", +) +DEX_CHAIN_ALIASES = { + "ethereum": "ethereum", + "eth": "ethereum", + "bsc": "bsc", + "bnb": "bsc", + "base": "base", + "arbitrum": "arbitrum", + "arb": "arbitrum", + "solana": "solana", + "sol": "solana", +} + +DEXSCREENER_RAW_ENDPOINTS = ( + ("token_profile_latest", "https://api.dexscreener.com/token-profiles/latest/v1"), + ("token_boost_latest", "https://api.dexscreener.com/token-boosts/latest/v1"), + ("token_boost_top", "https://api.dexscreener.com/token-boosts/top/v1"), +) + + +def _env_bool(name, default=False): + value = os.getenv(name) + if value is None: + return default + return str(value).strip().lower() in ("1", "true", "yes", "on") + + +def _env_int(name, default): + try: + return int(os.getenv(name, str(default)) or default) + except Exception: + return default + + +def _env_float(name, default): + try: + return float(os.getenv(name, str(default)) or default) + except Exception: + return default + + +def get_onchain_params(): + """Runtime provider config. Keep this out of rules.yaml.""" + chains = [x.strip().lower() for x in os.getenv("ALPHAX_ONCHAIN_CHAINS", ",".join(DEFAULT_CHAINS)).split(",") if x.strip()] + return { + "enabled": _env_bool("ALPHAX_ONCHAIN_ENABLED", False), + "chains": chains or list(DEFAULT_CHAINS), + "timeout": _env_int("ALPHAX_ONCHAIN_TIMEOUT", 15), + "candidate_enabled": _env_bool("ALPHAX_ONCHAIN_CANDIDATE_ENABLED", True), + "candidate_min_score": _env_float("ALPHAX_ONCHAIN_CANDIDATE_MIN_SCORE", 70), + "candidate_min_confidence": _env_int("ALPHAX_ONCHAIN_CANDIDATE_MIN_CONFIDENCE", 70), + "candidate_cooldown_hours": _env_float("ALPHAX_ONCHAIN_CANDIDATE_COOLDOWN_HOURS", 6), + "dexscreener_enabled": _env_bool("ALPHAX_ONCHAIN_DEXSCREENER_ENABLED", True), + "dex_volume_spike_pct": _env_float("ALPHAX_ONCHAIN_DEX_VOLUME_SPIKE_PCT", 80), + "dex_min_liquidity_usd": _env_float("ALPHAX_ONCHAIN_DEX_MIN_LIQUIDITY_USD", 100000), + "dex_min_volume_24h_usd": _env_float("ALPHAX_ONCHAIN_DEX_MIN_VOLUME_24H_USD", 100000), + "liquidity_add_pct": _env_float("ALPHAX_ONCHAIN_LIQUIDITY_ADD_PCT", 25), + "liquidity_remove_pct": _env_float("ALPHAX_ONCHAIN_LIQUIDITY_REMOVE_PCT", -25), + "whale_tx_usd": _env_float("ALPHAX_ONCHAIN_WHALE_TX_USD", 250000), + "etherscan_api_key": os.getenv("ALPHAX_ETHERSCAN_API_KEY", "").strip(), + "helius_api_key": os.getenv("ALPHAX_HELIUS_API_KEY", "").strip(), + } + + +def _now(): + return datetime.now() + + +def _request_json(url, params=None, timeout=15): + resp = requests.get(url, params=params or {}, timeout=timeout, headers={"User-Agent": "AlphaX-Agent-Crypto/1.0"}) + if resp.status_code >= 400: + raise RuntimeError(f"http_{resp.status_code}:{resp.text[:200]}") + return resp.json() + + +def _safe_float(value, default=0.0): + try: + return float(value or 0) + except Exception: + return default + + +def _safe_pct_change(new_value, old_value): + new_value = _safe_float(new_value) + old_value = _safe_float(old_value) + if old_value <= 0: + return 0.0 + return (new_value - old_value) / old_value * 100 + + +def _chain_alias(value): + key = str(value or "").lower() + return DEX_CHAIN_ALIASES.get(key, key) + + +def _latest_metric(symbol, chain, contract_address): + conn = get_conn() + row = conn.execute( + """ + SELECT * FROM onchain_token_metrics + WHERE symbol=? AND chain=? AND contract_address=? AND window='1h' + ORDER BY metric_time DESC, id DESC LIMIT 1 + """, + (symbol, chain, contract_address or ""), + ).fetchone() + conn.close() + return dict(row) if row else None + + +def _event_amount(item): + return _safe_float(item.get("amount")) + + +def _event_total_amount(item): + return _safe_float(item.get("totalAmount") or item.get("total_amount")) + + +def _raw_importance(event_type, item): + amount = _event_amount(item) + total = _event_total_amount(item) + if event_type == "token_boost_top": + return max(total, amount, 1) + if event_type == "token_boost_latest": + return max(amount, total * 0.5, 1) + return 1 + + +def normalize_dexscreener_raw_event(item, event_type, cfg=None): + cfg = cfg or get_onchain_params() + chain = _chain_alias(item.get("chainId")) + if chain not in set(cfg.get("chains") or DEFAULT_CHAINS): + return None + token_address = str(item.get("tokenAddress") or "").strip() + if not token_address: + return None + mapping = find_mapping_by_contract(chain, token_address) + links = item.get("links") or [] + symbol_guess = "" + name = "" + if isinstance(links, list): + for link in links: + if not isinstance(link, dict): + continue + if not name and link.get("label"): + name = str(link.get("label") or "") + raw = { + "chainId": item.get("chainId"), + "tokenAddress": token_address, + "url": item.get("url") or "", + "description": item.get("description") or "", + "icon": item.get("icon") or "", + "header": item.get("header") or "", + "links": links, + "amount": item.get("amount"), + "totalAmount": item.get("totalAmount"), + } + title = "DEX Screener" + if event_type == "token_profile_latest": + title = "Token 资料更新" + elif event_type == "token_boost_latest": + title = "DEX Boost 新增" + elif event_type == "token_boost_top": + title = "DEX Boost 榜单" + return { + "source": "dexscreener", + "chain": chain, + "event_type": event_type, + "token_address": token_address, + "symbol_guess": symbol_guess, + "name": name, + "title": title, + "description": item.get("description") or "", + "url": item.get("url") or "", + "icon": item.get("icon") or "", + "amount": _event_amount(item), + "total_amount": _event_total_amount(item), + "importance": _raw_importance(event_type, item), + "mapped_symbol": mapping.get("symbol") if mapping else "", + "mapping_status": "mapped" if mapping else "unmapped", + "detected_at": _now().isoformat(timespec="seconds"), + "raw": raw, + } + + +def fetch_dexscreener_raw_events(limit=80): + cfg = get_onchain_params() + if not cfg.get("dexscreener_enabled", True): + return {"raw_events": [], "errors": ["dexscreener_disabled"]} + inserted = [] + errors = [] + per_source_limit = max(1, int(limit or 80)) + for event_type, url in DEXSCREENER_RAW_ENDPOINTS: + try: + data = _request_json(url, timeout=cfg.get("timeout", 15)) + items = data if isinstance(data, list) else data.get("items") or data.get("data") or [] + for item in items[:per_source_limit]: + if not isinstance(item, dict): + continue + event = normalize_dexscreener_raw_event(item, event_type, cfg=cfg) + if not event: + continue + if insert_onchain_raw_event(event): + inserted.append(event) + except Exception as exc: + errors.append(f"{event_type}:{str(exc)[:160]}") + return {"raw_events": inserted, "errors": errors} + + +def _discover_seed_symbols(limit=120): + conn = get_conn() + symbols = [] + try: + rows = conn.execute( + """ + SELECT DISTINCT symbol + FROM recommendation + WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history' + ORDER BY rec_time DESC + LIMIT ? + """, + (int(limit or 120),), + ).fetchall() + symbols.extend([row["symbol"] for row in rows if row["symbol"]]) + except Exception: + pass + try: + rows = conn.execute( + """ + SELECT DISTINCT symbol + FROM coin_state + WHERE state != '过期' + ORDER BY detected_at DESC + LIMIT ? + """, + (int(limit or 120),), + ).fetchall() + symbols.extend([row["symbol"] for row in rows if row["symbol"]]) + except Exception: + pass + conn.close() + seen = set() + ordered = [] + for symbol in symbols: + norm = normalize_symbol(symbol) + if not norm or norm in seen or not _tradable_symbol(norm): + continue + seen.add(norm) + ordered.append(norm) + return ordered[: int(limit or 120)] + + +def _score_pair_candidate(pair, requested_symbol, chains): + base = (pair.get("baseToken") or {}) + quote = (pair.get("quoteToken") or {}) + base_symbol = str(base.get("symbol") or "").upper() + req_base = str(requested_symbol or "").split("/")[0].upper() + liquidity = _safe_float((pair.get("liquidity") or {}).get("usd")) + volume = _safe_float((pair.get("volume") or {}).get("h24")) + chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower()) + score = 0 + if base_symbol == req_base: + score += 50 + if chain in set(chains or []): + score += 15 + if quote.get("symbol") in ("USDT", "USDC", "USD", "FDUSD", "USDE", "DAI", "USDS"): + score += 10 + if liquidity >= 100000: + score += 10 + if volume >= 100000: + score += 10 + if liquidity >= 500000: + score += 5 + return score + + +def _pair_rejection_reason(pair, requested_symbol, chains): + base = pair.get("baseToken") or {} + quote = pair.get("quoteToken") or {} + req_base = str(requested_symbol or "").split("/")[0].upper() + base_symbol = str(base.get("symbol") or "").upper() + base_name = str(base.get("name") or "").lower() + pair_url = str(pair.get("url") or "").lower() + chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower()) + + if base_symbol != req_base: + return "symbol_mismatch" + if chain not in set(chains or []): + return "chain_not_supported" + if req_base in NON_TARGET_NATIVE_BASES: + return "native_chain_not_in_scope" + if chain == "solana" and req_base not in SOLANA_AUTO_ALLOWLIST: + return "solana_not_allowlisted" + text = " ".join([base_name, base_symbol.lower(), str(quote.get("symbol") or "").lower(), pair_url]) + if any(marker in text for marker in BRIDGED_TOKEN_MARKERS): + return "bridged_or_wrapped_token" + return "" + + +def discover_token_mappings(limit=60): + cfg = get_onchain_params() + chains = set(cfg.get("chains") or DEFAULT_CHAINS) + seeds = _discover_seed_symbols(limit=limit) + if not seeds: + return {"inserted": 0, "candidates": [], "errors": ["no_seed_symbols"]} + inserted = [] + errors = [] + for symbol in seeds: + existing = get_token_mappings(symbol, min_confidence=1, active_only=False) + if existing: + continue + base = symbol.split("/")[0] + try: + data = _request_json("https://api.dexscreener.com/latest/dex/search", params={"q": base}, timeout=cfg.get("timeout", 15)) + pairs = data.get("pairs") or [] + pair_candidates = [] + for pair in pairs: + chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or "").lower(), str(pair.get("chainId") or "").lower()) + if chain not in chains: + continue + if _pair_rejection_reason(pair, symbol, chains): + continue + pair_candidates.append((pair, _score_pair_candidate(pair, symbol, chains))) + if not pair_candidates: + continue + pair_candidates.sort(key=lambda x: (x[1], _safe_float((x[0].get("liquidity") or {}).get("usd")), _safe_float((x[0].get("volume") or {}).get("h24"))), reverse=True) + best, score = pair_candidates[0] + if score < 55: + continue + base_token = best.get("baseToken") or {} + chain = DEX_CHAIN_ALIASES.get(str(best.get("chainId") or "").lower(), str(best.get("chainId") or "").lower()) + contract = str(base_token.get("address") or "").strip() + if not contract: + continue + confidence = min(95, 60 + score) + mapping_id = onchain_db.upsert_token_mapping( + symbol=symbol, + chain=chain, + contract_address=contract, + source="dexscreener_search", + confidence=confidence, + raw={ + "search_query": base, + "matched_pair": { + "pairAddress": best.get("pairAddress") or "", + "dexId": best.get("dexId") or "", + "url": best.get("url") or "", + "liquidity": best.get("liquidity") or {}, + "volume": best.get("volume") or {}, + "priceChange": best.get("priceChange") or {}, + "baseToken": base_token, + "quoteToken": best.get("quoteToken") or {}, + }, + }, + is_active=True, + ) + if mapping_id: + inserted.append({"symbol": symbol, "chain": chain, "contract_address": contract, "confidence": confidence}) + except Exception as exc: + errors.append(f"{symbol}:{str(exc)[:160]}") + return {"inserted": len(inserted), "candidates": inserted, "errors": errors} + + +def _score_metric(metric): + score = 0.0 + risk = 0.0 + vol_change = _safe_float(metric.get("dex_volume_change_pct")) + liq_change = _safe_float(metric.get("liquidity_change_pct")) + netflow = _safe_float(metric.get("exchange_netflow_usd")) + whale = _safe_float(metric.get("whale_accumulation_usd")) + smart = _safe_float(metric.get("smart_money_score")) + if vol_change > 0: + score += min(35, vol_change / 4) + if liq_change > 0: + score += min(20, liq_change / 3) + if netflow < 0: + score += min(20, abs(netflow) / 100000) + if whale > 0: + score += min(20, whale / 100000) + score += min(20, smart) + if liq_change < 0: + risk += min(40, abs(liq_change)) + if netflow > 0: + risk += min(35, netflow / 100000) + metric["onchain_score"] = round(min(score, 100), 2) + metric["risk_score"] = round(min(risk, 100), 2) + return metric + + +def derive_dex_signals(metric, cfg=None): + cfg = cfg or get_onchain_params() + signals = [] + vol_change = _safe_float(metric.get("dex_volume_change_pct")) + liq_change = _safe_float(metric.get("liquidity_change_pct")) + if vol_change >= cfg.get("dex_volume_spike_pct", 80): + signals.append("dex_volume_spike") + if liq_change >= cfg.get("liquidity_add_pct", 25): + signals.append("liquidity_add") + if liq_change <= cfg.get("liquidity_remove_pct", -25): + signals.append("liquidity_remove_risk") + return signals + + +def _event_from_metric(metric, signal_code, source="dexscreener"): + direction = signal_direction(signal_code) + severity = "RISK" if direction == "risk" else "A" if _safe_float(metric.get("onchain_score")) >= 75 else "B" + return { + "chain": metric.get("chain"), + "symbol": metric.get("symbol"), + "contract_address": metric.get("contract_address") or "", + "event_type": "onchain_signal", + "signal_code": signal_code, + "signal_label": signal_label(signal_code), + "direction": direction, + "value_usd": metric.get("dex_volume_usd") or metric.get("whale_accumulation_usd") or abs(metric.get("exchange_netflow_usd") or 0), + "confidence": 75 if direction != "risk" else 80, + "severity": severity, + "detected_at": metric.get("metric_time") or _now().isoformat(), + "source": source, + "url": metric.get("url") or "", + "raw": metric, + } + + +def normalize_dexscreener_pair(pair, mapping, cfg=None): + cfg = cfg or get_onchain_params() + symbol = normalize_symbol(mapping.get("symbol")) + chain = DEX_CHAIN_ALIASES.get(str(pair.get("chainId") or mapping.get("chain") or "").lower(), str(mapping.get("chain") or "").lower()) + contract = mapping.get("contract_address") or (pair.get("baseToken") or {}).get("address") or "" + liquidity = _safe_float((pair.get("liquidity") or {}).get("usd")) + volume = _safe_float((pair.get("volume") or {}).get("h24")) + prev = _latest_metric(symbol, chain, contract) + prev_volume = _safe_float(prev.get("dex_volume_usd") if prev else 0) + prev_liquidity = _safe_float(prev.get("liquidity_usd") if prev else 0) + metric = { + "symbol": symbol, + "chain": chain, + "contract_address": contract, + "window": "1h", + "metric_time": _now().isoformat(timespec="seconds"), + "dex_volume_usd": volume, + "dex_volume_change_pct": _safe_pct_change(volume, prev_volume), + "liquidity_usd": liquidity, + "liquidity_change_pct": _safe_pct_change(liquidity, prev_liquidity), + "exchange_netflow_usd": 0, + "whale_accumulation_usd": 0, + "holder_delta": 0, + "smart_money_score": 0, + "source": "dexscreener", + "url": pair.get("url") or "", + "raw": { + "pair_address": pair.get("pairAddress") or "", + "dex_id": pair.get("dexId") or "", + "price_usd": pair.get("priceUsd") or "", + "fdv": pair.get("fdv") or 0, + "txns": pair.get("txns") or {}, + "price_change": pair.get("priceChange") or {}, + "volume": pair.get("volume") or {}, + "liquidity": pair.get("liquidity") or {}, + }, + } + return _score_metric(metric) + + +def fetch_dexscreener_metrics(limit=60): + cfg = get_onchain_params() + if not cfg.get("dexscreener_enabled", True): + return {"metrics": [], "events": [], "errors": ["dexscreener_disabled"]} + mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) + bootstrap = None + if not mappings: + bootstrap = discover_token_mappings(limit=limit) + mappings = get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) + metrics = [] + events = [] + errors = [] + if bootstrap: + errors.extend(bootstrap.get("errors") or []) + for mapping in mappings[: int(limit or 60)]: + symbol = normalize_symbol(mapping.get("symbol")) + if not symbol or not _tradable_symbol(symbol): + continue + try: + url = "https://api.dexscreener.com/latest/dex/tokens/" + str(mapping.get("contract_address") or "").strip() + data = _request_json(url, timeout=cfg.get("timeout", 15)) + pairs = data.get("pairs") or [] + wanted_chain = DEX_CHAIN_ALIASES.get(str(mapping.get("chain") or "").lower(), str(mapping.get("chain") or "").lower()) + pairs = [p for p in pairs if DEX_CHAIN_ALIASES.get(str(p.get("chainId") or "").lower(), str(p.get("chainId") or "").lower()) == wanted_chain] + if not pairs: + continue + best = max(pairs, key=lambda p: _safe_float((p.get("liquidity") or {}).get("usd"))) + metric = normalize_dexscreener_pair(best, mapping, cfg=cfg) + if metric.get("liquidity_usd", 0) < cfg.get("dex_min_liquidity_usd", 100000) and metric.get("dex_volume_usd", 0) < cfg.get("dex_min_volume_24h_usd", 100000): + insert_token_metric(metric) + metrics.append(metric) + continue + insert_token_metric(metric) + metrics.append(metric) + for code in derive_dex_signals(metric, cfg): + event = _event_from_metric(metric, code, source="dexscreener") + if insert_onchain_event(event): + events.append(event) + except Exception as exc: + errors.append(f"{symbol}:{str(exc)[:160]}") + return {"metrics": metrics, "events": events, "errors": errors} + + +def ingest_normalized_events(events): + """Test/integration helper for provider adapters.""" + init_db() + init_onchain_tables() + inserted = [] + for event in events or []: + eid = insert_onchain_event(event) + if eid: + item = dict(event) + item["id"] = eid + inserted.append(item) + queued = enqueue_onchain_candidates() + return {"inserted": len(inserted), "queued": queued.get("queued", 0), "events": inserted, "candidate_result": queued} + + +def _candidate_title(event): + label = event.get("signal_label") or signal_label(event.get("signal_code")) + value = _safe_float(event.get("value_usd")) + value_txt = f" · ${value:,.0f}" if value > 0 else "" + return f"链上异动 {event.get('symbol')}: {label}{value_txt}" + + +def enqueue_onchain_candidates(min_score=None, min_confidence=None, cooldown_hours=None, limit=20): + cfg = get_onchain_params() + if not cfg.get("candidate_enabled", True): + return {"queued": 0, "skipped": 0, "symbols": [], "reason": "candidate_disabled"} + min_score = cfg.get("candidate_min_score", 70) if min_score is None else min_score + min_confidence = cfg.get("candidate_min_confidence", 70) if min_confidence is None else min_confidence + cooldown_hours = cfg.get("candidate_cooldown_hours", 6) if cooldown_hours is None else cooldown_hours + init_onchain_tables() + init_event_tables() + cutoff = (_now() - timedelta(hours=24)).isoformat() + conn = get_conn() + rows = conn.execute( + """ + SELECT e.*, + COALESCE(( + SELECT m.onchain_score FROM onchain_token_metrics m + WHERE m.symbol=e.symbol AND m.chain=e.chain + ORDER BY datetime(m.metric_time) DESC, m.id DESC LIMIT 1 + ), 0) AS latest_onchain_score, + COALESCE(( + SELECT m.risk_score FROM onchain_token_metrics m + WHERE m.symbol=e.symbol AND m.chain=e.chain + ORDER BY datetime(m.metric_time) DESC, m.id DESC LIMIT 1 + ), 0) AS latest_risk_score + FROM onchain_events e + WHERE e.status IN ('new', 'candidate_failed') + AND e.detected_at >= ? + AND e.direction='positive' + ORDER BY e.confidence DESC, e.value_usd DESC, datetime(e.detected_at) DESC + LIMIT ? + """, + (cutoff, int(limit or 20)), + ).fetchall() + queued = [] + skipped_ids = [] + now = _now().isoformat(timespec="seconds") + cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat() + for row in rows: + event = dict(row) + symbol = normalize_symbol(event.get("symbol")) + if not symbol or not _tradable_symbol(symbol): + skipped_ids.append(event["id"]) + continue + score = max(_safe_float(event.get("latest_onchain_score")), _safe_float(event.get("confidence"))) + if score < float(min_score or 0) or int(event.get("confidence") or 0) < int(min_confidence or 0): + continue + recent = conn.execute( + """ + SELECT id FROM event_news + WHERE source='onchain' AND symbol=? AND detected_at >= ? + LIMIT 1 + """, + (symbol, cooldown_cutoff), + ).fetchone() + if recent: + skipped_ids.append(event["id"]) + continue + title = _candidate_title(event) + h = event_hash("onchain", title, symbol) + try: + conn.execute( + """ + INSERT INTO event_news + (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) + VALUES (?, 'onchain', ?, ?, ?, ?, ?, ?, 'onchain_candidate', ?, 0) + """, + ( + h, + symbol, + title, + event.get("url") or "", + event.get("detected_at") or now, + now, + event.get("severity") or "A", + json.dumps( + { + "onchain_event_id": event.get("id"), + "chain": event.get("chain"), + "signal_code": event.get("signal_code"), + "signal_label": event.get("signal_label"), + "confidence": event.get("confidence"), + "value_usd": event.get("value_usd"), + "onchain_score": event.get("latest_onchain_score"), + "risk_score": event.get("latest_risk_score"), + }, + ensure_ascii=False, + ), + ), + ) + conn.execute("UPDATE onchain_events SET status='candidate_queued' WHERE id=?", (event.get("id"),)) + queued.append(symbol) + except Exception: + skipped_ids.append(event["id"]) + if skipped_ids: + conn.execute( + "UPDATE onchain_events SET status='candidate_skipped' WHERE id IN (" + ",".join(["?"] * len(skipped_ids)) + ")", + tuple(skipped_ids), + ) + conn.commit() + conn.close() + return {"queued": len(queued), "skipped": len(skipped_ids), "symbols": queued} + + +def run_once(limit=60): + started = _now() + init_db() + init_onchain_tables() + cfg = get_onchain_params() + output = { + "status": "disabled" if not cfg.get("enabled") else "processed", + "metrics_count": 0, + "events_count": 0, + "raw_events_count": 0, + "candidate_queued": 0, + "errors": [], + "check_time": _now().isoformat(), + } + if cfg.get("enabled"): + raw = fetch_dexscreener_raw_events(limit=limit) + output["raw_events_count"] = len(raw.get("raw_events") or []) + output["errors"].extend(raw.get("errors") or []) + dex = fetch_dexscreener_metrics(limit=limit) + output["metrics_count"] += len(dex.get("metrics") or []) + output["events_count"] += len(dex.get("events") or []) + output["errors"].extend(dex.get("errors") or []) + output["discovered_mappings"] = discover_token_mappings(limit=limit).get("inserted", 0) if not get_token_mappings(min_confidence=MIN_MAPPING_CONFIDENCE) else 0 + if output.get("discovered_mappings"): + output["status"] = "bootstrapped" + dex = fetch_dexscreener_metrics(limit=limit) + output["metrics_count"] = len(dex.get("metrics") or []) + output["events_count"] = len(dex.get("events") or []) + output["errors"].extend(dex.get("errors") or []) + queued = enqueue_onchain_candidates() + output["candidate_queued"] = queued.get("queued", 0) + output["candidate_symbols"] = queued.get("symbols", []) + if not output["metrics_count"] and not output["events_count"] and not output["raw_events_count"]: + output["status"] = "no_onchain_data" + log_cron_run( + job_name="链上", + script_name="onchain_monitor.py", + run_status="success" if not output["errors"] else "error", + result_status=output["status"], + started_at=started.isoformat(), + finished_at=_now().isoformat(), + duration_ms=int((_now() - started).total_seconds() * 1000), + summary={ + "metrics_count": output["metrics_count"], + "events_count": output["events_count"], + "raw_events_count": output["raw_events_count"], + "candidate_queued": output["candidate_queued"], + "enabled": cfg.get("enabled"), + }, + error_message="; ".join(output["errors"][:5]), + ) + print(json.dumps(output, ensure_ascii=False, indent=2, default=str)) + return output + + +__all__ = [ + "POSITIVE_SIGNALS", + "RISK_SIGNALS", + "derive_dex_signals", + "enqueue_onchain_candidates", + "fetch_dexscreener_metrics", + "fetch_dexscreener_raw_events", + "get_onchain_params", + "ingest_normalized_events", + "normalize_dexscreener_pair", + "run_once", +] diff --git a/app/web/routes_onchain.py b/app/web/routes_onchain.py new file mode 100644 index 0000000..3efcd60 --- /dev/null +++ b/app/web/routes_onchain.py @@ -0,0 +1,75 @@ +from fastapi import APIRouter, Cookie + +from app.db.onchain_db import ( + get_onchain_overview, + get_onchain_token_detail, + list_onchain_events, + list_onchain_raw_events, + list_onchain_tokens, +) +from app.web.shared import require_api_user_with_subscription + + +router = APIRouter() + + +@router.get("/api/onchain/overview") +async def api_onchain_overview(hours: int = 24, altcoin_session: str = Cookie(default="")): + require_api_user_with_subscription(altcoin_session) + return get_onchain_overview(hours=hours) + + +@router.get("/api/onchain/tokens") +async def api_onchain_tokens( + limit: int = 30, + offset: int = 0, + chain: str = "", + signal: str = "", + hours: int = 24, + altcoin_session: str = Cookie(default=""), +): + require_api_user_with_subscription(altcoin_session) + return list_onchain_tokens(limit=limit, offset=offset, chain=chain, signal=signal, hours=hours) + + +@router.get("/api/onchain/tokens/{symbol:path}") +async def api_onchain_token_detail(symbol: str, hours: int = 72, altcoin_session: str = Cookie(default="")): + require_api_user_with_subscription(altcoin_session) + return get_onchain_token_detail(symbol=symbol, hours=hours) + + +@router.get("/api/onchain/events") +async def api_onchain_events( + limit: int = 50, + offset: int = 0, + chain: str = "", + signal: str = "", + status: str = "", + hours: int = 24, + altcoin_session: str = Cookie(default=""), +): + require_api_user_with_subscription(altcoin_session) + return list_onchain_events(limit=limit, offset=offset, chain=chain, signal=signal, status=status, hours=hours) + + +@router.get("/api/onchain/raw-events") +async def api_onchain_raw_events( + limit: int = 50, + offset: int = 0, + chain: str = "", + source: str = "", + event_type: str = "", + mapping_status: str = "", + hours: int = 24, + altcoin_session: str = Cookie(default=""), +): + require_api_user_with_subscription(altcoin_session) + return list_onchain_raw_events( + limit=limit, + offset=offset, + chain=chain, + source=source, + event_type=event_type, + mapping_status=mapping_status, + hours=hours, + ) diff --git a/app/web/routes_pages.py b/app/web/routes_pages.py index ffe422f..04a7891 100644 --- a/app/web/routes_pages.py +++ b/app/web/routes_pages.py @@ -109,6 +109,13 @@ def build_router(templates, repo_root: Path, stock_report_template: str): return redirect return render_page("sentiment.html", request) + @router.get("/onchain", response_class=HTMLResponse) + async def onchain_page(request: Request): + user, redirect = require_page_user(request) + if redirect: + return redirect + return render_page("onchain.html", request) + @router.get("/iteration", response_class=HTMLResponse) async def iteration_page(request: Request): user, redirect = require_page_user(request) diff --git a/app/web/web_server.py b/app/web/web_server.py index d814faa..ba650e0 100644 --- a/app/web/web_server.py +++ b/app/web/web_server.py @@ -15,6 +15,7 @@ from app.db.recommendation_queries import get_active_recommendations, get_active from app.web.routes_admin import build_router as build_admin_router from app.web.routes_auth import router as auth_router from app.web.routes_content import build_router as build_content_router +from app.web.routes_onchain import router as onchain_router from app.web.routes_pages import build_router as build_pages_router from app.web.routes_recommendations import router as recommendations_router from app.web.routes_strategy import router as strategy_router @@ -41,6 +42,7 @@ templates = Jinja2Templates(directory=str(REPO_ROOT / "static")) app.include_router(auth_router) app.include_router(recommendations_router) app.include_router(strategy_router) +app.include_router(onchain_router) app.include_router(build_admin_router(templates)) app.include_router(build_content_router(REPO_ROOT)) app.include_router(build_pages_router(templates, REPO_ROOT, STOCK_REPORT_TEMPLATE)) diff --git a/docker-compose.yml b/docker-compose.yml index 19c81ee..4869a80 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,7 +40,7 @@ services: - .env environment: # 本地 Docker 副本需要真实跑链路,方便验证筛选/确认/跟踪/复盘结果。 - # 调度器仍然单进程串行执行,避免 SQLite 写锁。 + # 调度器以后台子进程方式并发执行,并通过业务锁组规避 SQLite 写冲突。 ALPHAX_SCHEDULER_DRY_RUN: "0" ALPHAX_DB_PATH: "/app/data/altcoin_monitor.db" command: ["scheduler"] diff --git a/rules.yaml b/rules.yaml index b339c41..27c6a77 100644 --- a/rules.yaml +++ b/rules.yaml @@ -407,11 +407,11 @@ event_driven: note: Solana meme主题扩散 meta: version: 1 - last_review: '2026-05-15T00:15:38.149520' - last_reverse_analysis: '2026-05-15T00:16:18.257946' - total_reviews: 38 + last_review: '2026-05-15T11:16:43.982118' + last_reverse_analysis: '2026-05-15T11:17:20.540656' + total_reviews: 44 total_rules_learned: 37 - iteration_count: 43 + iteration_count: 49 strategy_version: v1.7.11 strategy_revision_started_at: '2026-05-09T01:20:00' strategy_revision_note: 'v1.7.11: 触发时效治理,旧形态只作背景,消息触发显式标记' diff --git a/static/admin.html b/static/admin.html index 7dd9b12..115a6d8 100644 --- a/static/admin.html +++ b/static/admin.html @@ -4,6 +4,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/app.html b/static/app.html index cb25e53..b7ddbb4 100644 --- a/static/app.html +++ b/static/app.html @@ -167,6 +167,12 @@ .ai-insight .ai-text { color: var(--ink); font-size: 12px; line-height: 1.45; margin-top: 3px; word-break: break-word; } .ai-insight .ai-list { display: flex; flex-wrap: wrap; gap: 4px; } .ai-insight .ai-pill { display: inline-flex; padding: 4px 7px; border-radius: 999px; font-size: 11px; color: var(--slate); background: var(--canvas); border: 1px solid var(--hairline-soft); } +.onchain-brief { margin: 0 18px 8px; border: 1px solid rgba(66,98,255,.14); border-radius: var(--radius-lg); background: rgba(66,98,255,.045); padding: 9px 10px; display: grid; gap: 6px; } +.onchain-brief.risk { border-color: rgba(229,62,62,.18); background: var(--red-light); } +.onchain-head { display:flex; align-items:center; justify-content:space-between; gap:8px; color:var(--ink); font-size:12px; font-weight:900; } +.onchain-meta { color:var(--stone); font-size:11px; line-height:1.45; } +.onchain-score { color:var(--blue); font-weight:950; font-family:ui-monospace,SFMono-Regular,Menlo,monospace; } +.onchain-brief.risk .onchain-score { color:var(--red); } /* ===== K-LINE ===== */ .kline-wrap { padding: 0 8px 4px; } @@ -241,6 +247,7 @@ .stats-main { width: 100%; } .entry-plan { grid-template-columns: repeat(2, minmax(0, 1fr)); padding: 8px 14px; } .decision-strip { margin: 0 14px 8px; grid-template-columns: 86px minmax(0,1fr); } + .onchain-brief { margin: 0 14px 8px; } } @media(max-width:360px) { @@ -774,6 +781,15 @@ function renderRecCard(r) { ''+ ''; } + var onchainHtml = ''; + var oc = r.onchain_context || null; + if (oc && (oc.event_count_24h || oc.onchain_score || oc.risk_score)) { + var ocRisk = Number(oc.risk_event_count_24h || 0) > 0 || Number(oc.risk_score || 0) >= 60; + var ocTitle = cleanDisplayText(oc.headline || (ocRisk ? '链上风险升温' : '链上资金异动')); + var ocScore = ocRisk ? Number(oc.risk_score || 0).toFixed(0) : Number(oc.onchain_score || 0).toFixed(0); + var ocMeta = [oc.chain || '链上', '24h事件 '+(oc.event_count_24h || 0), oc.dex_volume_usd ? ('DEX量 $'+fmtCompactNumber(oc.dex_volume_usd)) : ''].filter(Boolean).join(' · '); + onchainHtml = '
'+ocTitle+''+ocScore+'
'+escHtml(ocMeta)+'
'; + } var entryPlanHtml = ''; if (isTradePlan) { entryPlanHtml = '
' + @@ -793,6 +809,7 @@ function renderRecCard(r) { return '
'+base.slice(0,2).toUpperCase()+'
'+base+'
'+actionBadge+''+score+''+st.label+'
'+ '
$'+priceFmt+''+changeHtml+'
'+ decisionHtml+ + onchainHtml+ aiInsightHtml+ '
'+ (isWeakObserve ? weakNoteHtml : entryPlanHtml)+ diff --git a/static/base.html b/static/base.html index 35026b5..e3a1de6 100644 --- a/static/base.html +++ b/static/base.html @@ -157,6 +157,7 @@ a { color: inherit; text-decoration: none; } + @@ -173,6 +174,7 @@ a { color: inherit; text-decoration: none; } {% block nav_links %} 看板 舆情 + 链上异动 订阅 推荐 diff --git a/static/cron.html b/static/cron.html index 871120a..c5d419e 100644 --- a/static/cron.html +++ b/static/cron.html @@ -3,6 +3,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/iteration.html b/static/iteration.html index 6eff4a1..12cb772 100644 --- a/static/iteration.html +++ b/static/iteration.html @@ -3,6 +3,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/llm_insights.html b/static/llm_insights.html index 5fe505d..0d2261f 100644 --- a/static/llm_insights.html +++ b/static/llm_insights.html @@ -3,6 +3,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/onchain.html b/static/onchain.html new file mode 100644 index 0000000..b738c4a --- /dev/null +++ b/static/onchain.html @@ -0,0 +1,88 @@ +{% extends "base.html" %} +{% block title %}AlphaX Agent | Crypto — 链上异动{% endblock %} +{% block nav_links %} +看板 +舆情 +链上异动 +订阅 +推荐 + + + + + + + +{% endblock %} +{% block extra_head_css %} + +{% endblock %} +{% block content %} +
+
+

链上异动

跟踪 DEX 放量、流动性变化、资金流和鲸鱼行为。链上信号只负责发现线索,最终仍交给技术确认。

+
+ + +
+
+
链上异动不是买入指令。高质量正向信号会进入技术检查;交易所流入、流动性撤出、持仓集中等负向信号只作为风险上下文。
+
+
实时链上流
原始事件先展示,映射后再进入分析链路
+
+
+ + +
+
--
+
+
加载中...
+
+
加载中...
+
+
链上热度
按 onchain score
加载中...
+
风险异动
流入/撤池/集中度
加载中...
+
+
+
+
+ + +
+
币种链上分风险分DEX 成交成交变化流动性变化主链路
加载中...
+
--
+
+
+
单币详情
选择币种
+
从列表中选择一个币,查看链上事件与主链路关系。
+
+
+
+{% endblock %} +{% block extra_script %} + +{% endblock %} diff --git a/static/pipeline.html b/static/pipeline.html index c6291bb..dec791e 100644 --- a/static/pipeline.html +++ b/static/pipeline.html @@ -3,6 +3,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/referral.html b/static/referral.html index 3178f65..78a5443 100644 --- a/static/referral.html +++ b/static/referral.html @@ -4,6 +4,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/sentiment.html b/static/sentiment.html index a2c758a..1dc4a1c 100644 --- a/static/sentiment.html +++ b/static/sentiment.html @@ -3,6 +3,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/strategy.html b/static/strategy.html index 5bd026e..3cd8f52 100644 --- a/static/strategy.html +++ b/static/strategy.html @@ -3,6 +3,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/subscription.html b/static/subscription.html index 29b15f4..983d157 100644 --- a/static/subscription.html +++ b/static/subscription.html @@ -3,6 +3,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/static/watchlist.html b/static/watchlist.html index 89034cb..130e50f 100644 --- a/static/watchlist.html +++ b/static/watchlist.html @@ -3,6 +3,7 @@ {% block nav_links %} 看板 舆情 +链上异动 订阅 推荐 diff --git a/tests/test_onchain_tracking.py b/tests/test_onchain_tracking.py new file mode 100644 index 0000000..d0522bb --- /dev/null +++ b/tests/test_onchain_tracking.py @@ -0,0 +1,281 @@ +import json +import os +import sqlite3 +import sys +from datetime import datetime + +from fastapi.testclient import TestClient + +PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if PROJECT_DIR not in sys.path: + sys.path.insert(0, PROJECT_DIR) + +from app.db import altcoin_db, onchain_db, scheduler_db +from app.services import onchain_monitor +from app.web import web_server + + +def _temp_db(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db) + altcoin_db.init_db() + onchain_db.init_onchain_tables() + return db_path + + +def test_mapping_requires_confidence_and_preserves_multi_chain(monkeypatch, tmp_path): + _temp_db(monkeypatch, tmp_path) + onchain_db.upsert_token_mapping("ABC", "ethereum", "0xaaa", source="manual", confidence=95) + onchain_db.upsert_token_mapping("ABC", "bsc", "0xbbb", source="manual", confidence=55) + + usable = onchain_db.get_token_mappings("ABC", min_confidence=70) + + assert len(usable) == 1 + assert usable[0]["chain"] == "ethereum" + assert usable[0]["contract_address"] == "0xaaa" + + +def test_dex_signal_codes_from_metric(monkeypatch, tmp_path): + _temp_db(monkeypatch, tmp_path) + cfg = { + "dex_volume_spike_pct": 80, + "liquidity_add_pct": 25, + "liquidity_remove_pct": -25, + } + + assert onchain_monitor.derive_dex_signals({"dex_volume_change_pct": 120, "liquidity_change_pct": 40}, cfg) == [ + "dex_volume_spike", + "liquidity_add", + ] + assert onchain_monitor.derive_dex_signals({"dex_volume_change_pct": 10, "liquidity_change_pct": -35}, cfg) == [ + "liquidity_remove_risk" + ] + + +def test_auto_mapping_rejects_non_target_native_and_wrapped_tokens(monkeypatch, tmp_path): + _temp_db(monkeypatch, tmp_path) + cfg = onchain_monitor.get_onchain_params() + chains = set(cfg.get("chains") or []) + + avax_pair = { + "chainId": "solana", + "baseToken": {"symbol": "AVAX", "name": "Avalanche"}, + "quoteToken": {"symbol": "USDC"}, + "liquidity": {"usd": 500000}, + "volume": {"h24": 1000000}, + "url": "https://example.com", + } + wrapped_pair = { + "chainId": "ethereum", + "baseToken": {"symbol": "FIL", "name": "Wrapped Filecoin"}, + "quoteToken": {"symbol": "USDT"}, + "liquidity": {"usd": 500000}, + "volume": {"h24": 1000000}, + "url": "https://example.com/wrapped-filecoin", + } + + assert onchain_monitor._pair_rejection_reason(avax_pair, "AVAX/USDT", chains) == "native_chain_not_in_scope" + assert onchain_monitor._pair_rejection_reason(wrapped_pair, "FIL/USDT", chains) == "native_chain_not_in_scope" + assert onchain_monitor._pair_rejection_reason( + { + "chainId": "solana", + "baseToken": {"symbol": "UNKNOWN", "name": "Unknown"}, + "quoteToken": {"symbol": "USDC"}, + "liquidity": {"usd": 500000}, + "volume": {"h24": 1000000}, + "url": "https://example.com", + }, + "UNKNOWN/USDT", + chains, + ) == "solana_not_allowlisted" + + +def test_onchain_candidate_enqueues_event_news_not_recommendation(monkeypatch, tmp_path): + db_path = _temp_db(monkeypatch, tmp_path) + onchain_db.insert_token_metric( + { + "symbol": "ABC/USDT", + "chain": "ethereum", + "contract_address": "0xaaa", + "window": "1h", + "metric_time": datetime.now().isoformat(), + "dex_volume_usd": 500000, + "dex_volume_change_pct": 160, + "liquidity_usd": 300000, + "liquidity_change_pct": 35, + "onchain_score": 82, + "risk_score": 0, + "source": "test", + } + ) + event_id = onchain_db.insert_onchain_event( + { + "chain": "ethereum", + "symbol": "ABC/USDT", + "contract_address": "0xaaa", + "signal_code": "dex_volume_spike", + "direction": "positive", + "value_usd": 500000, + "confidence": 88, + "severity": "A", + "detected_at": datetime.now().isoformat(), + "source": "test", + } + ) + + result = onchain_monitor.enqueue_onchain_candidates(min_score=70, min_confidence=70, cooldown_hours=6) + + assert event_id > 0 + assert result["queued"] == 1 + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + news = conn.execute("SELECT * FROM event_news WHERE source='onchain'").fetchone() + rec_count = conn.execute("SELECT COUNT(*) FROM recommendation").fetchone()[0] + status = conn.execute("SELECT status FROM onchain_events WHERE id=?", (event_id,)).fetchone()[0] + conn.close() + assert news["event_type"] == "onchain_candidate" + assert json.loads(news["raw_json"])["signal_code"] == "dex_volume_spike" + assert rec_count == 0 + assert status == "candidate_queued" + + +def test_negative_onchain_signal_is_risk_context_only(monkeypatch, tmp_path): + db_path = _temp_db(monkeypatch, tmp_path) + onchain_db.insert_onchain_event( + { + "chain": "ethereum", + "symbol": "RISK/USDT", + "signal_code": "exchange_inflow_risk", + "direction": "risk", + "value_usd": 900000, + "confidence": 92, + "severity": "RISK", + "detected_at": datetime.now().isoformat(), + "source": "test", + } + ) + + result = onchain_monitor.enqueue_onchain_candidates(min_score=1, min_confidence=1) + + conn = sqlite3.connect(db_path) + news_count = conn.execute("SELECT COUNT(*) FROM event_news WHERE source='onchain'").fetchone()[0] + conn.close() + assert result["queued"] == 0 + assert news_count == 0 + + +def test_onchain_api_and_page(monkeypatch, tmp_path): + _temp_db(monkeypatch, tmp_path) + onchain_db.insert_token_metric( + { + "symbol": "ABC/USDT", + "chain": "base", + "contract_address": "0xabc", + "window": "1h", + "metric_time": datetime.now().isoformat(), + "dex_volume_usd": 123000, + "dex_volume_change_pct": 90, + "liquidity_usd": 456000, + "liquidity_change_pct": 12, + "onchain_score": 76, + "risk_score": 8, + "source": "test", + } + ) + client = TestClient(web_server.app) + + page = client.get("/onchain") + assert page.status_code == 200 + assert "链上异动" in page.text + + overview = client.get("/api/onchain/overview") + assert overview.status_code == 200 + assert overview.json()["kpi"]["token_count"] == 1 + + tokens = client.get("/api/onchain/tokens") + assert tokens.status_code == 200 + assert tokens.json()["items"][0]["symbol"] == "ABC/USDT" + + +def test_raw_dexscreener_events_store_without_mapping(monkeypatch, tmp_path): + _temp_db(monkeypatch, tmp_path) + monkeypatch.setenv("ALPHAX_ONCHAIN_ENABLED", "1") + monkeypatch.setenv("ALPHAX_ONCHAIN_CHAINS", "ethereum,solana") + + def fake_request(url, params=None, timeout=15): + if "token-profiles" in url: + return [ + { + "chainId": "ethereum", + "tokenAddress": "0xraw", + "url": "https://dexscreener.com/ethereum/0xraw", + "description": "Unmapped token started trending", + "icon": "https://example.com/icon.png", + } + ] + if "token-boosts/latest" in url: + return [ + { + "chainId": "solana", + "tokenAddress": "So111", + "url": "https://dexscreener.com/solana/So111", + "amount": 25, + "totalAmount": 100, + } + ] + if "token-boosts/top" in url: + return [] + return {"pairs": []} + + monkeypatch.setattr(onchain_monitor, "_request_json", fake_request) + + result = onchain_monitor.fetch_dexscreener_raw_events(limit=10) + + assert len(result["raw_events"]) == 2 + raw = onchain_db.list_onchain_raw_events(hours=24) + assert raw["total"] == 2 + assert raw["items"][0]["mapping_status"] == "unmapped" + assert raw["items"][0]["event_label"] + + +def test_raw_event_api_and_overview_counts(monkeypatch, tmp_path): + _temp_db(monkeypatch, tmp_path) + onchain_db.upsert_token_mapping("ABC", "base", "0xabc", source="manual", confidence=95) + onchain_db.insert_onchain_raw_event( + { + "source": "dexscreener", + "chain": "base", + "event_type": "token_boost_top", + "token_address": "0xabc", + "title": "DEX Boost 榜单", + "amount": 10, + "total_amount": 80, + "importance": 80, + "mapped_symbol": "ABC/USDT", + "mapping_status": "mapped", + "detected_at": datetime.now().isoformat(), + } + ) + client = TestClient(web_server.app) + + overview = client.get("/api/onchain/overview") + events = client.get("/api/onchain/raw-events") + + assert overview.status_code == 200 + assert overview.json()["kpi"]["raw_event_count"] == 1 + assert overview.json()["kpi"]["raw_mapped_count"] == 1 + assert events.status_code == 200 + assert events.json()["items"][0]["mapped_symbol"] == "ABC/USDT" + + +def test_scheduler_seeds_onchain_job(monkeypatch, tmp_path): + _temp_db(monkeypatch, tmp_path) + sched_path = tmp_path / "scheduler_state.db" + monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path)) + scheduler_db.init_scheduler_tables() + + jobs = {item["job_name"]: item for item in scheduler_db.get_job_configs()} + + assert jobs["onchain"]["command"] == "onchain" + assert jobs["onchain"]["lock_group"] == "onchain_write" diff --git a/tests/test_scheduler_control.py b/tests/test_scheduler_control.py index cc7ce2e..fa19bca 100644 --- a/tests/test_scheduler_control.py +++ b/tests/test_scheduler_control.py @@ -17,7 +17,9 @@ import docker.scheduler as scheduler def test_scheduler_tables_seed_defaults(monkeypatch, tmp_path): db_path = tmp_path / "altcoin_monitor.db" + sched_path = tmp_path / "scheduler_state.db" monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path)) altcoin_db.init_db() scheduler_db.init_scheduler_tables() @@ -25,11 +27,14 @@ def test_scheduler_tables_seed_defaults(monkeypatch, tmp_path): assert jobs["event"]["lock_group"] == "recommendation_write" assert jobs["confirm"]["lock_group"] == "recommendation_write" assert jobs["tracker"]["every_seconds"] == 180 + assert jobs["onchain"]["lock_group"] == "onchain_write" def test_scheduler_control_api_and_page(monkeypatch, tmp_path): db_path = tmp_path / "altcoin_monitor.db" + sched_path = tmp_path / "scheduler_state.db" monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path)) monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db) altcoin_db.init_db() scheduler_db.init_scheduler_tables() @@ -94,7 +99,9 @@ class _FakeProc: def test_scheduler_starts_different_lock_groups_concurrently(monkeypatch, tmp_path): db_path = tmp_path / "altcoin_monitor.db" + sched_path = tmp_path / "scheduler_state.db" monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path)) altcoin_db.init_db() scheduler_db.init_scheduler_tables() monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc) @@ -113,7 +120,9 @@ def test_scheduler_starts_different_lock_groups_concurrently(monkeypatch, tmp_pa def test_scheduler_blocks_shared_lock_and_prevents_reentry(monkeypatch, tmp_path): db_path = tmp_path / "altcoin_monitor.db" + sched_path = tmp_path / "scheduler_state.db" monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path)) altcoin_db.init_db() scheduler_db.init_scheduler_tables() monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc) @@ -138,7 +147,9 @@ def test_scheduler_blocks_shared_lock_and_prevents_reentry(monkeypatch, tmp_path def test_disabled_job_does_not_auto_run_but_manual_force_can_start(monkeypatch, tmp_path): db_path = tmp_path / "altcoin_monitor.db" + sched_path = tmp_path / "scheduler_state.db" monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path)) altcoin_db.init_db() scheduler_db.init_scheduler_tables() monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc)