alphax/app/db/onchain_db.py
2026-05-17 19:39:11 +08:00

874 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""On-chain discovery storage and read models.
The on-chain layer is a research/discovery input. It stores normalized external
facts and can enqueue technical-check candidates, but it must not create or
mutate trading recommendations directly.
"""
import json
import os
from datetime import datetime, timedelta
from app.db.altcoin_db import get_conn
from app.db.postgres_connection import ensure_migrations_once
from app.config.system_config import onchain_config
# Default minimum confidence for a token mapping to be returned by
# get_token_mappings() and counted as "usable" in the provider status.
MIN_MAPPING_CONFIDENCE = 70

# User-facing (Chinese) display labels keyed by signal code.
SIGNAL_LABELS = {
    "large_token_transfer": "链上大额转账",
    "dex_volume_spike": "DEX 放量",
    "liquidity_add": "流动性增加",
    "liquidity_remove_risk": "流动性撤出风险",
    "exchange_outflow": "交易所流出",
    "exchange_inflow_risk": "交易所流入风险",
    "whale_accumulation": "鲸鱼增持",
    "holder_concentration_risk": "持仓集中风险",
    "smart_money_buying": "聪明钱买入",
}

# User-facing (Chinese) display labels keyed by raw event type.
RAW_EVENT_TYPE_LABELS = {
    "token_profile_latest": "DEX 新币资料变更",
    "token_boost_latest": "DEX 付费曝光新增",
    "token_boost_top": "DEX 付费曝光榜",
}

# Per raw event type: a plain-language summary ("plain"), why it matters
# ("meaning") and a display priority ("priority").
RAW_EVENT_EXPLAINERS = {
    "token_profile_latest": {
        "plain": "项目方或社区刚在 DEX Screener 更新了代币资料、图标或链接。",
        "meaning": "只代表曝光资料发生变化,信号较弱,通常不能单独说明有资金买入。",
        "priority": "low",
    },
    "token_boost_latest": {
        "plain": "有人为这个代币购买了 DEX Screener 曝光位。",
        "meaning": "代表短期推广热度上升,可能吸引散户注意,但也可能只是营销。",
        "priority": "medium",
    },
    "token_boost_top": {
        "plain": "这个代币出现在 DEX Screener 付费曝光榜前列。",
        "meaning": "代表平台内关注度较高,需要再看成交量、流动性和是否能映射交易对。",
        "priority": "medium",
    },
}

# Signal codes that signal_direction() classifies as "positive" / "risk";
# any code in neither set is reported as "neutral".
POSITIVE_SIGNALS = {"dex_volume_spike", "liquidity_add", "exchange_outflow", "whale_accumulation", "smart_money_buying"}
RISK_SIGNALS = {"liquidity_remove_risk", "exchange_inflow_risk", "holder_concentration_risk"}
def _now():
return datetime.now().isoformat()
def _dump(value):
return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str)
def _load(value, fallback=None):
try:
if isinstance(value, str) and value.strip():
return json.loads(value)
if value is not None:
return value
except Exception:
pass
return fallback
def _symbol_base(symbol):
return str(symbol or "").upper().replace("/USDT", "").replace("USDT", "").strip()
def normalize_symbol(symbol):
    """Return ``symbol`` in ``BASE/USDT`` form, or ``""`` when no base remains."""
    base = _symbol_base(symbol)
    if not base:
        return ""
    return f"{base}/USDT"
def signal_label(code):
    """Return the display label for a signal code.

    Unknown codes are echoed back as text; an empty code yields a generic label.
    """
    key = str(code or "")
    default = str(code) if code else "链上信号"
    return SIGNAL_LABELS.get(key, default)
def signal_direction(code):
    """Classify a signal code as ``"risk"``, ``"positive"`` or ``"neutral"``."""
    key = str(code or "")
    # Risk membership is checked first, matching the original precedence.
    for direction, members in (("risk", RISK_SIGNALS), ("positive", POSITIVE_SIGNALS)):
        if key in members:
            return direction
    return "neutral"
def raw_event_type_label(event_type):
    """Return the display label for a raw event type.

    Unknown types are echoed back as text; an empty type yields a generic label.
    """
    key = str(event_type or "")
    default = str(event_type) if event_type else "链上原始事件"
    return RAW_EVENT_TYPE_LABELS.get(key, default)
def raw_event_explainer(event_type):
    """Return the explainer dict (``plain``, ``meaning``, ``priority``) for a raw event type.

    Known types return the shared entry from ``RAW_EVENT_EXPLAINERS``; any
    other type gets a freshly built generic explainer with medium priority.
    """
    key = str(event_type or "")
    if key in RAW_EVENT_EXPLAINERS:
        return RAW_EVENT_EXPLAINERS[key]
    return {
        "plain": "链上或链上相关数据源捕捉到一条原始动态。",
        "meaning": "需要完成币种映射和质量验证后,才可能进入技术检查。",
        "priority": "medium",
    }
def init_onchain_tables():
    """Prepare the on-chain tables by delegating to ``ensure_migrations_once``.

    NOTE(review): presumably this applies pending schema migrations at most
    once per process -- confirm against ``app.db.postgres_connection``.
    """
    ensure_migrations_once()
def upsert_token_mapping(symbol, chain, contract_address, source="", confidence=0, raw=None, is_active=True):
    """Insert or update a symbol <-> contract mapping and return its row id.

    On conflict the stored confidence is never lowered (``GREATEST``), while
    source, active flag, raw payload and ``updated_at`` are overwritten.

    Args:
        symbol: Trading symbol; normalized to ``BASE/USDT``.
        chain: Chain name; lower-cased and stripped.
        contract_address: Token contract address; stripped, case preserved.
        source: Name of the data source that produced the mapping.
        confidence: Mapping confidence, coerced to ``int``.
        raw: Optional payload stored as JSON.
        is_active: Stored as ``1``/``0``.

    Returns:
        The mapping row id, or ``0`` when a required field is empty.
    """
    init_onchain_tables()
    now = _now()
    symbol = normalize_symbol(symbol)
    chain = str(chain or "").lower().strip()
    contract_address = str(contract_address or "").strip()
    if not (symbol and chain and contract_address):
        return 0
    conn = get_conn()
    try:
        conn.execute(
            """
            INSERT INTO onchain_token_map
            (symbol, chain, contract_address, source, confidence, is_active, raw_json, created_at, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT(symbol, chain, contract_address) DO UPDATE SET
            source=excluded.source,
            confidence=GREATEST(onchain_token_map.confidence, excluded.confidence),
            is_active=excluded.is_active,
            raw_json=excluded.raw_json,
            updated_at=excluded.updated_at
            """,
            (symbol, chain, contract_address, source or "", int(confidence or 0), 1 if is_active else 0, _dump(raw), now, now),
        )
        conn.commit()
        row = conn.execute(
            "SELECT id FROM onchain_token_map WHERE symbol=%s AND chain=%s AND contract_address=%s",
            (symbol, chain, contract_address),
        ).fetchone()
    finally:
        # Release the connection even when a statement raises.
        conn.close()
    return int(row["id"] if row else 0)
def get_token_mappings(symbol="", min_confidence=MIN_MAPPING_CONFIDENCE, active_only=True):
init_onchain_tables()
clauses = ["confidence >= %s"]
params = [int(min_confidence or 0)]
if symbol:
clauses.append("symbol=%s")
params.append(normalize_symbol(symbol))
if active_only:
clauses.append("is_active=1")
conn = get_conn()
rows = conn.execute(
f"""
SELECT * FROM onchain_token_map
WHERE {' AND '.join(clauses)}
ORDER BY confidence DESC, updated_at DESC
""",
tuple(params),
).fetchall()
conn.close()
return [dict(row) for row in rows]
def _event_hash(event):
raw = "|".join(
[
str(event.get("source") or ""),
str(event.get("chain") or ""),
str(event.get("symbol") or ""),
str(event.get("signal_code") or event.get("event_type") or ""),
str(event.get("tx_hash") or event.get("contract_address") or ""),
str(event.get("detected_at") or ""),
str(round(float(event.get("value_usd") or 0), 2)),
]
).lower()
import hashlib
return hashlib.sha256(raw.encode()).hexdigest()[:24]
def _raw_event_hash(event):
raw = "|".join(
[
str(event.get("source") or ""),
str(event.get("chain") or ""),
str(event.get("event_type") or ""),
str(event.get("token_address") or ""),
str(event.get("url") or ""),
str(round(float(event.get("amount") or 0), 4)),
str(round(float(event.get("total_amount") or 0), 4)),
]
).lower()
import hashlib
return hashlib.sha256(raw.encode()).hexdigest()[:24]
def insert_onchain_event(event):
    """Normalize and store one on-chain signal event.

    Missing label, direction, detection time and event hash are derived from
    the other fields. Rows whose ``event_hash`` already exists are skipped.

    Returns:
        The new row id, or ``0`` when symbol, chain or signal code is empty
        or the event was a duplicate.
    """
    init_onchain_tables()
    item = dict(event or {})
    item["symbol"] = normalize_symbol(item.get("symbol"))
    item["chain"] = str(item.get("chain") or "").lower().strip()
    item["signal_code"] = str(item.get("signal_code") or "").strip()
    item["event_type"] = str(item.get("event_type") or item["signal_code"] or "onchain_event")
    if not (item["symbol"] and item["chain"] and item["signal_code"]):
        return 0
    item["signal_label"] = item.get("signal_label") or signal_label(item["signal_code"])
    item["direction"] = item.get("direction") or signal_direction(item["signal_code"])
    item["detected_at"] = str(item.get("detected_at") or _now())
    item["event_hash"] = item.get("event_hash") or _event_hash(item)

    # Values in the same order as the column list of the INSERT below.
    values = (
        item["event_hash"],
        item["chain"],
        item["symbol"],
        item.get("contract_address") or "",
        item["event_type"],
        item["signal_code"],
        item["signal_label"],
        item["direction"],
        float(item.get("value_usd") or 0),
        float(item.get("amount") or 0),
        item.get("tx_hash") or "",
        item.get("wallet_address") or "",
        item.get("wallet_label") or "",
        item.get("counterparty_label") or "",
        int(item.get("confidence") or 0),
        item.get("severity") or "B",
        item.get("status") or "new",
        item["detected_at"],
        item.get("source") or "",
        item.get("url") or "",
        _dump(item.get("raw") or item.get("raw_json") or {}),
    )
    conn = get_conn()
    try:
        cursor = conn.execute(
            """
            INSERT INTO onchain_events (
            event_hash, chain, symbol, contract_address, event_type, signal_code, signal_label,
            direction, value_usd, amount, tx_hash, wallet_address, wallet_label,
            counterparty_label, confidence, severity, status, detected_at, source, url, raw_json
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT(event_hash) DO NOTHING
            RETURNING id
            """,
            values,
        )
        inserted = cursor.fetchone()
        # No row comes back when the conflict clause skipped the insert.
        event_id = int(inserted["id"] if inserted else 0)
        conn.commit()
    finally:
        conn.close()
    return event_id
def find_mapping_by_contract(chain, contract_address):
    """Return the best active mapping for a contract on a chain, or ``None``.

    The address is matched case-insensitively; when several mappings match,
    the one with the highest confidence and latest update wins.

    Args:
        chain: Chain name; lower-cased and stripped.
        contract_address: Token contract address; stripped.

    Returns:
        The mapping row as a dict, or ``None`` when either argument is empty
        or no active mapping exists.
    """
    init_onchain_tables()
    chain = str(chain or "").lower().strip()
    contract_address = str(contract_address or "").strip()
    if not (chain and contract_address):
        return None
    conn = get_conn()
    try:
        row = conn.execute(
            """
            SELECT *
            FROM onchain_token_map
            WHERE chain=%s AND lower(contract_address)=lower(%s) AND is_active=1
            ORDER BY confidence DESC, updated_at DESC
            LIMIT 1
            """,
            (chain, contract_address),
        ).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return dict(row) if row else None
def insert_onchain_raw_event(event):
    """Normalize and store one raw (pre-mapping) on-chain event.

    Missing detection time, event hash, title and mapping status are derived
    from the other fields. Rows whose ``event_hash`` already exists are
    skipped.

    Returns:
        The new row id, or ``0`` when source, chain, event type or token
        address is empty or the event was a duplicate.
    """
    init_onchain_tables()
    item = dict(event or {})
    item["source"] = str(item.get("source") or "").strip()
    item["chain"] = str(item.get("chain") or "").lower().strip()
    item["event_type"] = str(item.get("event_type") or "onchain_raw_event").strip()
    item["token_address"] = str(item.get("token_address") or "").strip()
    required = (item["source"], item["chain"], item["event_type"], item["token_address"])
    if not all(required):
        return 0
    item["detected_at"] = str(item.get("detected_at") or _now())
    item["event_hash"] = item.get("event_hash") or _raw_event_hash(item)
    if item.get("mapped_symbol"):
        item["mapped_symbol"] = normalize_symbol(item.get("mapped_symbol"))
    else:
        item["mapped_symbol"] = ""
    default_status = "mapped" if item["mapped_symbol"] else "unmapped"
    item["mapping_status"] = str(item.get("mapping_status") or default_status)

    # Values in the same order as the column list of the INSERT below.
    values = (
        item["event_hash"],
        item["source"],
        item["chain"],
        item["event_type"],
        item["token_address"],
        item.get("symbol_guess") or "",
        item.get("name") or "",
        item.get("title") or raw_event_type_label(item["event_type"]),
        item.get("description") or "",
        item.get("url") or "",
        item.get("icon") or "",
        float(item.get("amount") or 0),
        float(item.get("total_amount") or 0),
        float(item.get("importance") or 0),
        item["mapped_symbol"],
        item["mapping_status"],
        item["detected_at"],
        _dump(item.get("raw") or item.get("raw_json") or {}),
    )
    conn = get_conn()
    try:
        cursor = conn.execute(
            """
            INSERT INTO onchain_raw_events (
            event_hash, source, chain, event_type, token_address, symbol_guess, name,
            title, description, url, icon, amount, total_amount, importance,
            mapped_symbol, mapping_status, detected_at, raw_json
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT(event_hash) DO NOTHING
            RETURNING id
            """,
            values,
        )
        inserted = cursor.fetchone()
        # No row comes back when the conflict clause skipped the insert.
        event_id = int(inserted["id"] if inserted else 0)
        conn.commit()
    finally:
        conn.close()
    return event_id
def insert_token_metric(metric):
    """Insert or overwrite one token metric snapshot and return its row id.

    A snapshot is keyed by (symbol, chain, contract_address, window,
    metric_time); an existing row with the same key has all of its metric
    columns, source and raw payload replaced.

    Args:
        metric: Mapping with the snapshot fields. ``window`` defaults to
            ``"1h"`` and ``metric_time`` to the current time; numeric fields
            default to 0.

    Returns:
        The row id, or ``0`` when symbol, chain or window is empty.
    """
    init_onchain_tables()
    item = dict(metric or {})
    item["symbol"] = normalize_symbol(item.get("symbol"))
    item["chain"] = str(item.get("chain") or "").lower().strip()
    item["window"] = str(item.get("window") or "1h").strip()
    item["metric_time"] = str(item.get("metric_time") or _now())
    if not (item["symbol"] and item["chain"] and item["window"]):
        return 0
    conn = get_conn()
    try:
        cur = conn.execute(
            """
            INSERT INTO onchain_token_metrics (
            symbol, chain, contract_address, "window", metric_time,
            dex_volume_usd, dex_volume_change_pct, liquidity_usd, liquidity_change_pct,
            exchange_netflow_usd, whale_accumulation_usd, holder_delta, smart_money_score,
            onchain_score, risk_score, source, raw_json
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT(symbol, chain, contract_address, "window", metric_time) DO UPDATE SET
            dex_volume_usd=excluded.dex_volume_usd,
            dex_volume_change_pct=excluded.dex_volume_change_pct,
            liquidity_usd=excluded.liquidity_usd,
            liquidity_change_pct=excluded.liquidity_change_pct,
            exchange_netflow_usd=excluded.exchange_netflow_usd,
            whale_accumulation_usd=excluded.whale_accumulation_usd,
            holder_delta=excluded.holder_delta,
            smart_money_score=excluded.smart_money_score,
            onchain_score=excluded.onchain_score,
            risk_score=excluded.risk_score,
            source=excluded.source,
            raw_json=excluded.raw_json
            RETURNING id
            """,
            (
                item["symbol"],
                item["chain"],
                item.get("contract_address") or "",
                item["window"],
                item["metric_time"],
                float(item.get("dex_volume_usd") or 0),
                float(item.get("dex_volume_change_pct") or 0),
                float(item.get("liquidity_usd") or 0),
                float(item.get("liquidity_change_pct") or 0),
                float(item.get("exchange_netflow_usd") or 0),
                float(item.get("whale_accumulation_usd") or 0),
                float(item.get("holder_delta") or 0),
                float(item.get("smart_money_score") or 0),
                float(item.get("onchain_score") or 0),
                float(item.get("risk_score") or 0),
                item.get("source") or "",
                _dump(item.get("raw") or item.get("raw_json") or {}),
            ),
        )
        # Read the RETURNING row before committing, as the other insert
        # helpers do, and tolerate a missing row instead of raising.
        row = cur.fetchone()
        metric_id = int(row["id"] or 0) if row else 0
        conn.commit()
    finally:
        # Release the connection even when a statement raises.
        conn.close()
    return metric_id
def _latest_metrics_subquery(hours=24):
return """
SELECT m.*
FROM onchain_token_metrics m
JOIN (
SELECT symbol, chain, contract_address, MAX(metric_time) AS max_time
FROM onchain_token_metrics
WHERE metric_time >= %s
GROUP BY symbol, chain, contract_address
) latest ON latest.symbol=m.symbol
AND latest.chain=m.chain
AND latest.contract_address=m.contract_address
AND latest.max_time=m.metric_time
"""
def get_onchain_overview(hours=24):
    """Build the on-chain overview payload for the last ``hours`` hours.

    Reads signal events, raw events, the newest metric row per token and the
    active recommendations, then derives KPI counters, the top 8 tokens by
    on-chain score and by risk score, the 12 newest raw events that are not
    profile/boost exposure events, per-signal counts and the provider status.

    Args:
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with keys ``hours``, ``updated_at``, ``kpi``, ``hot_tokens``,
        ``risk_tokens``, ``raw_events``, ``signals`` and ``provider_status``.
    """
    init_onchain_tables()
    cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat()
    conn = get_conn()
    try:
        event_rows = conn.execute("SELECT * FROM onchain_events WHERE detected_at >= %s", (cutoff,)).fetchall()
        raw_rows = conn.execute("SELECT * FROM onchain_raw_events WHERE detected_at >= %s", (cutoff,)).fetchall()
        raw_latest = conn.execute(
            """
            SELECT * FROM onchain_raw_events
            WHERE detected_at >= %s
            AND event_type NOT IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top')
            ORDER BY detected_at::timestamp DESC, importance DESC, id DESC
            LIMIT 12
            """,
            (cutoff,),
        ).fetchall()
        metric_rows = conn.execute(_latest_metrics_subquery(hours), (cutoff,)).fetchall()
        rec_rows = conn.execute(
            "SELECT symbol, id, execution_status, action_status, display_bucket FROM recommendation WHERE status='active'"
        ).fetchall()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()
    active = {row["symbol"]: dict(row) for row in rec_rows}
    events = [dict(row) for row in event_rows]
    raw_events = [dict(row) for row in raw_rows]
    metrics = [dict(row) for row in metric_rows]
    hot = sorted(metrics, key=lambda x: float(x.get("onchain_score") or 0), reverse=True)[:8]
    risks = sorted(metrics, key=lambda x: float(x.get("risk_score") or 0), reverse=True)[:8]
    total_netflow = sum(float(x.get("exchange_netflow_usd") or 0) for x in metrics)
    dex_volume = sum(float(x.get("dex_volume_usd") or 0) for x in metrics)
    return {
        "hours": int(hours or 24),
        "updated_at": _now(),
        "kpi": {
            "event_count": len(events),
            "raw_event_count": len(raw_events),
            "raw_unmapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "unmapped"),
            "raw_mapped_count": sum(1 for e in raw_events if e.get("mapping_status") == "mapped"),
            "token_count": len({(m["symbol"], m["chain"], m.get("contract_address") or "") for m in metrics}),
            "positive_events": sum(1 for e in events if e.get("direction") == "positive"),
            "risk_events": sum(1 for e in events if e.get("direction") == "risk"),
            "exchange_netflow_usd": round(total_netflow, 2),
            "dex_volume_usd": round(dex_volume, 2),
        },
        "hot_tokens": [_format_metric_item(row, active) for row in hot],
        "risk_tokens": [_format_metric_item(row, active) for row in risks],
        "raw_events": [_format_raw_event(row) for row in raw_latest],
        "signals": _signal_counts(events),
        "provider_status": get_onchain_provider_status(hours=hours),
    }
def get_onchain_provider_status(hours=24):
    """Summarize collection coverage and per-provider health for the last ``hours`` hours.

    Counts raw events, metrics, signals and queued ``event_news`` candidates
    inside the window, counts active/usable token mappings, groups rows by
    event type and source, and reads the most recent on-chain cron run for
    its summary and error message.

    Args:
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with keys ``hours``, ``enabled``, ``last_run``,
        ``last_summary``, ``last_error``, ``coverage``, ``raw_event_types``,
        ``metric_sources``, ``signal_sources`` and ``providers``.
    """
    init_onchain_tables()
    cfg = onchain_config()
    hours = int(hours or 24)
    cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
    etherscan_env = str(cfg.get("etherscan_api_key_env") or "ALPHAX_ETHERSCAN_API_KEY")
    helius_env = str(cfg.get("helius_api_key_env") or "ALPHAX_HELIUS_API_KEY")
    etherscan_chains = cfg.get("etherscan_chains") or ["ethereum"]
    if isinstance(etherscan_chains, str):
        # A comma-separated config string is split into a clean list.
        etherscan_chains = [x.strip().lower() for x in etherscan_chains.split(",") if x.strip()]

    conn = get_conn()
    try:
        window = (cutoff,)
        raw_total = conn.execute("SELECT COUNT(*) FROM onchain_raw_events WHERE detected_at >= %s", window).fetchone()[0]
        metric_total = conn.execute("SELECT COUNT(*) FROM onchain_token_metrics WHERE metric_time >= %s", window).fetchone()[0]
        signal_total = conn.execute("SELECT COUNT(*) FROM onchain_events WHERE detected_at >= %s", window).fetchone()[0]
        candidate_total = conn.execute(
            """
            SELECT COUNT(*)
            FROM event_news
            WHERE source='onchain' AND detected_at >= %s
            """,
            window,
        ).fetchone()[0]
        mapping_total = conn.execute("SELECT COUNT(*) FROM onchain_token_map WHERE is_active=1").fetchone()[0]
        mapping_usable = conn.execute(
            "SELECT COUNT(*) FROM onchain_token_map WHERE is_active=1 AND confidence >= %s",
            (MIN_MAPPING_CONFIDENCE,),
        ).fetchone()[0]
        raw_by_type = conn.execute(
            """
            SELECT event_type, COUNT(*) AS count
            FROM onchain_raw_events
            WHERE detected_at >= %s
            GROUP BY event_type
            ORDER BY count DESC, event_type
            """,
            window,
        ).fetchall()
        metric_sources = conn.execute(
            """
            SELECT source, COUNT(*) AS count
            FROM onchain_token_metrics
            WHERE metric_time >= %s
            GROUP BY source
            ORDER BY count DESC, source
            """,
            window,
        ).fetchall()
        signal_sources = conn.execute(
            """
            SELECT source, COUNT(*) AS count
            FROM onchain_events
            WHERE detected_at >= %s
            GROUP BY source
            ORDER BY count DESC, source
            """,
            window,
        ).fetchall()
        last_onchain = conn.execute(
            """
            SELECT *
            FROM cron_run_log
            WHERE script_name='onchain_monitor.py' OR job_name IN ('链上','onchain')
            ORDER BY started_at DESC, id DESC
            LIMIT 1
            """
        ).fetchone()
    finally:
        conn.close()

    if last_onchain:
        summary = _load(last_onchain.get("summary_json"), {})
        last_error = last_onchain.get("error_message")
    else:
        summary = {}
        last_error = ""

    def _signals_from(provider):
        # Number of signal events in the window recorded for ``provider``.
        return int(sum(row["count"] for row in signal_sources if row["source"] == provider))

    def _error_mentioning(provider):
        # Attribute the last error to a provider only when its name appears in it.
        return last_error if provider in str(last_error).lower() else ""

    dex_enabled = bool(cfg.get("dexscreener_enabled", True))
    etherscan_enabled = bool(cfg.get("etherscan_enabled", True))
    helius_enabled = bool(cfg.get("helius_enabled", True))
    etherscan_signals = _signals_from("etherscan")
    helius_signals = _signals_from("helius")

    providers = [
        {
            "provider": "dexscreener",
            "label": "DEX Screener",
            "enabled": dex_enabled,
            "api_key_present": True,
            "implemented": True,
            "role": "低优先级曝光源Token 资料、付费推广、已映射合约的 DEX 成交量与流动性",
            "raw_events": int(raw_total or 0),
            "metrics": int(metric_total or 0),
            "signals": _signals_from("dexscreener"),
            "status": _provider_status_label(dex_enabled, True, raw_total + metric_total, last_error),
        },
        {
            "provider": "etherscan",
            "label": "Etherscan",
            "enabled": etherscan_enabled,
            "api_key_present": bool(os.getenv(etherscan_env, "").strip()),
            "implemented": True,
            "role": "EVM 已映射合约的 ERC20 大额转账,当前链: " + ", ".join(etherscan_chains or ["ethereum"]),
            "raw_events": 0,
            "metrics": 0,
            "signals": etherscan_signals,
            "status": _provider_status_label(etherscan_enabled, True, etherscan_signals, _error_mentioning("etherscan")),
        },
        {
            "provider": "helius",
            "label": "Helius",
            "enabled": helius_enabled,
            "api_key_present": bool(os.getenv(helius_env, "").strip()),
            "implemented": True,
            "role": "Solana 已映射 mint 的解析交易与大额 token 活动",
            "raw_events": 0,
            "metrics": 0,
            "signals": helius_signals,
            "status": _provider_status_label(helius_enabled, True, helius_signals, _error_mentioning("helius")),
        },
    ]
    coverage = {
        "active_mappings": int(mapping_total or 0),
        "usable_mappings": int(mapping_usable or 0),
        "raw_events": int(raw_total or 0),
        "metrics": int(metric_total or 0),
        "signals": int(signal_total or 0),
        "queued_candidates": int(candidate_total or 0),
    }
    return {
        "hours": hours,
        "enabled": bool(cfg.get("enabled", False)),
        "last_run": dict(last_onchain) if last_onchain else None,
        "last_summary": summary,
        "last_error": last_error or "",
        "coverage": coverage,
        "raw_event_types": [dict(row) for row in raw_by_type],
        "metric_sources": [dict(row) for row in metric_sources],
        "signal_sources": [dict(row) for row in signal_sources],
        "providers": providers,
    }
def _provider_status_label(enabled, implemented, count, last_error=""):
if not enabled:
return "已关闭"
if not implemented:
return "未接入采集"
if count:
return "正常采集中"
if last_error:
return "最近采集失败"
return "暂无数据"
def _signal_counts(events):
counts = {}
for e in events:
code = e.get("signal_code") or ""
if not code:
continue
counts.setdefault(code, {"signal_code": code, "signal_label": signal_label(code), "count": 0})
counts[code]["count"] += 1
return sorted(counts.values(), key=lambda x: x["count"], reverse=True)
def _format_metric_item(row, active=None):
    """Convert a metric row into an API item.

    ``raw_json`` is decoded into ``raw``, and a ``recommendation`` summary is
    attached from ``active``, a mapping of symbol to active recommendation.
    """
    lookup = active or {}
    item = dict(row)
    item["raw"] = _load(item.pop("raw_json", "{}"), {})
    rec = lookup.get(item.get("symbol")) or {}
    summary = {"rec_id": rec.get("id") or 0}
    for field in ("execution_status", "action_status", "display_bucket"):
        summary[field] = rec.get(field) or ""
    summary["has_active"] = bool(rec)
    item["recommendation"] = summary
    return item
def list_onchain_tokens(limit=30, offset=0, chain="", signal="", hours=24):
    """Return one page of tokens ranked by their newest on-chain metrics.

    Only tokens with a metric row inside the window are listed. Each item
    carries the newest metric row plus ``event_count`` and
    ``risk_event_count`` for the same window.

    Args:
        limit: Page size, clamped to 1..100; falsy values mean 30.
        offset: Number of rows to skip; negative values become 0.
        chain: Optional chain filter (lower-cased).
        signal: Optional signal code; keeps only tokens that have at least
            one event with that code inside the window.
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with ``items``, ``total``, ``limit``, ``offset`` and
        ``has_more``.
    """
    init_onchain_tables()
    limit = max(1, min(int(limit or 30), 100))
    offset = max(0, int(offset or 0))
    cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat()
    clauses = []
    params = []
    if chain:
        clauses.append("m.chain=%s")
        params.append(str(chain).lower())
    if signal:
        clauses.append(
            """
            EXISTS (
            SELECT 1 FROM onchain_events e
            WHERE e.symbol=m.symbol AND e.chain=m.chain
            AND e.detected_at >= %s AND e.signal_code=%s
            )
            """
        )
        params.extend([cutoff, signal])
    where = " AND ".join(clauses) if clauses else "1=1"
    latest = _latest_metrics_subquery(hours)
    conn = get_conn()
    try:
        # Count over the same "latest metrics in window" set that the page
        # query reads; counting the whole metrics table made ``total`` and
        # ``has_more`` include tokens that can never appear in ``items``.
        # The derived table is aliased because PostgreSQL before v16 rejects
        # an unaliased subquery in FROM.
        total = conn.execute(
            f"""
            SELECT COUNT(*)
            FROM ({latest}) m
            WHERE {where}
            """,
            (cutoff, *params),
        ).fetchone()[0]
        # Placeholder order: two event-count subqueries, the latest-metrics
        # cutoff, the filter parameters, then LIMIT/OFFSET.
        rows = conn.execute(
            f"""
            SELECT m.*,
            (SELECT COUNT(*) FROM onchain_events e
            WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.detected_at >= %s) AS event_count,
            (SELECT COUNT(*) FROM onchain_events e
            WHERE e.symbol=m.symbol AND e.chain=m.chain AND e.direction='risk' AND e.detected_at >= %s) AS risk_event_count
            FROM ({latest}) m
            WHERE {where}
            ORDER BY m.onchain_score DESC, m.risk_score DESC, m.metric_time DESC
            LIMIT %s OFFSET %s
            """,
            (cutoff, cutoff, cutoff, *params, limit, offset),
        ).fetchall()
        rec_rows = conn.execute(
            "SELECT symbol, id, execution_status, action_status, display_bucket FROM recommendation WHERE status='active'"
        ).fetchall()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()
    active = {row["symbol"]: dict(row) for row in rec_rows}
    return {
        "items": [_format_metric_item(row, active) for row in rows],
        "total": int(total or 0),
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(rows) < int(total or 0),
    }
def get_onchain_token_detail(symbol, hours=72):
    """Return mappings, recent events, recent metrics and the active recommendation for a symbol.

    Args:
        symbol: Trading symbol; normalized to ``BASE/USDT``.
        hours: Look-back window for events and metrics; falsy values mean 72.

    Returns:
        A dict with ``symbol``, ``hours``, ``mappings`` (all of them,
        regardless of window), ``events`` and ``metrics`` (newest first, at
        most 100 each) and ``recommendation`` (newest active one or ``None``).
    """
    init_onchain_tables()
    symbol = normalize_symbol(symbol)
    cutoff = (datetime.now() - timedelta(hours=int(hours or 72))).isoformat()
    conn = get_conn()
    try:
        mappings = conn.execute(
            "SELECT * FROM onchain_token_map WHERE symbol=%s ORDER BY confidence DESC, updated_at DESC",
            (symbol,),
        ).fetchall()
        events = conn.execute(
            """
            SELECT * FROM onchain_events
            WHERE symbol=%s AND detected_at >= %s
            ORDER BY detected_at DESC, id DESC
            LIMIT 100
            """,
            (symbol, cutoff),
        ).fetchall()
        metrics = conn.execute(
            """
            SELECT * FROM onchain_token_metrics
            WHERE symbol=%s AND metric_time >= %s
            ORDER BY metric_time DESC, id DESC
            LIMIT 100
            """,
            (symbol, cutoff),
        ).fetchall()
        rec = conn.execute(
            """
            SELECT id, rec_time, action_status, execution_status, display_bucket, entry_price, current_price
            FROM recommendation
            WHERE symbol=%s AND status='active'
            ORDER BY id DESC LIMIT 1
            """,
            (symbol,),
        ).fetchone()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()
    return {
        "symbol": symbol,
        "hours": int(hours or 72),
        "mappings": [_with_raw(row) for row in mappings],
        "events": [_with_raw(row) for row in events],
        "metrics": [_with_raw(row) for row in metrics],
        "recommendation": dict(rec) if rec else None,
    }
def list_onchain_events(limit=50, offset=0, chain="", signal="", status="", hours=24):
    """Return one page of signal events inside the window, newest first.

    Args:
        limit: Page size, clamped to 1..200; falsy values mean 50.
        offset: Number of rows to skip; negative values become 0.
        chain: Optional chain filter (lower-cased).
        signal: Optional signal code filter.
        status: Optional status filter.
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with ``items``, ``total``, ``limit``, ``offset`` and
        ``has_more``.
    """
    init_onchain_tables()
    limit = max(1, min(int(limit or 50), 200))
    offset = max(0, int(offset or 0))
    cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat()
    clauses = ["detected_at >= %s"]
    params = [cutoff]
    if chain:
        clauses.append("chain=%s")
        params.append(str(chain).lower())
    if signal:
        clauses.append("signal_code=%s")
        params.append(signal)
    if status:
        clauses.append("status=%s")
        params.append(status)
    where = " AND ".join(clauses)
    conn = get_conn()
    try:
        total = conn.execute(f"SELECT COUNT(*) FROM onchain_events WHERE {where}", tuple(params)).fetchone()[0]
        rows = conn.execute(
            f"""
            SELECT * FROM onchain_events
            WHERE {where}
            ORDER BY detected_at::timestamp DESC, id DESC
            LIMIT %s OFFSET %s
            """,
            (*params, limit, offset),
        ).fetchall()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()
    return {
        "items": [_with_raw(row) for row in rows],
        "total": int(total or 0),
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(rows) < int(total or 0),
    }
def list_onchain_raw_events(limit=50, offset=0, chain="", source="", event_type="", mapping_status="", priority="", hours=24):
    """Return one page of raw events inside the window, newest and most important first.

    Args:
        limit: Page size, clamped to 1..200; falsy values mean 50.
        offset: Number of rows to skip; negative values become 0.
        chain: Optional chain filter (lower-cased).
        source: Optional source filter.
        event_type: Optional event type filter.
        mapping_status: Optional mapping status filter.
        priority: ``"important"`` excludes the profile/boost exposure event
            types, ``"low"`` keeps only those; any other value is ignored.
        hours: Look-back window in hours; falsy values mean 24.

    Returns:
        A dict with ``items``, ``total``, ``limit``, ``offset`` and
        ``has_more``.
    """
    init_onchain_tables()
    limit = max(1, min(int(limit or 50), 200))
    offset = max(0, int(offset or 0))
    cutoff = (datetime.now() - timedelta(hours=int(hours or 24))).isoformat()
    clauses = ["detected_at >= %s"]
    params = [cutoff]
    if chain:
        clauses.append("chain=%s")
        params.append(str(chain).lower())
    if source:
        clauses.append("source=%s")
        params.append(source)
    if event_type:
        clauses.append("event_type=%s")
        params.append(event_type)
    if mapping_status:
        clauses.append("mapping_status=%s")
        params.append(mapping_status)
    if priority == "important":
        clauses.append("event_type NOT IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top')")
    elif priority == "low":
        clauses.append("event_type IN ('token_profile_latest', 'token_boost_latest', 'token_boost_top')")
    where = " AND ".join(clauses)
    conn = get_conn()
    try:
        total = conn.execute(f"SELECT COUNT(*) FROM onchain_raw_events WHERE {where}", tuple(params)).fetchone()[0]
        rows = conn.execute(
            f"""
            SELECT * FROM onchain_raw_events
            WHERE {where}
            ORDER BY detected_at::timestamp DESC, importance DESC, id DESC
            LIMIT %s OFFSET %s
            """,
            (*params, limit, offset),
        ).fetchall()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()
    return {
        "items": [_format_raw_event(row) for row in rows],
        "total": int(total or 0),
        "limit": limit,
        "offset": offset,
        "has_more": offset + len(rows) < int(total or 0),
    }
def update_event_status(event_ids, status):
    """Set ``status`` on the given on-chain events and return the number of rows updated.

    Args:
        event_ids: Iterable of event ids (any iterable, not only a list);
            each id is coerced to ``int``.
        status: New status value.

    Returns:
        The affected row count, or ``0`` when no ids were given.
    """
    # Materialize first so generators and other non-sized iterables work and
    # an exhausted/empty iterator is treated as "nothing to do".
    ids = [int(x) for x in (event_ids or [])]
    if not ids:
        return 0
    init_onchain_tables()
    placeholders = ",".join(["%s"] * len(ids))
    conn = get_conn()
    try:
        cur = conn.execute(
            "UPDATE onchain_events SET status=%s WHERE id IN (" + placeholders + ")",
            (status, *ids),
        )
        conn.commit()
        updated = int(cur.rowcount or 0)
    finally:
        # Release the connection even when the update raises.
        conn.close()
    return updated
def _with_raw(row):
item = dict(row)
if "raw_json" in item:
item["raw"] = _load(item.pop("raw_json"), {})
return item
def _format_raw_event(row):
    """Convert a raw-event row into an API item with display helpers.

    Adds the event label, plain-language summary, why-it-matters text,
    priority, a pipeline note based on the mapping status, and a shortened
    token address.
    """
    item = _with_raw(row)
    event_type = item.get("event_type")
    item["event_label"] = raw_event_type_label(event_type)
    explainer = raw_event_explainer(event_type)
    item["plain_summary"] = explainer.get("plain") or ""
    item["why_matters"] = explainer.get("meaning") or ""
    item["priority"] = explainer.get("priority") or "medium"
    if item.get("mapping_status") == "mapped":
        note = "已映射,可进入后续链上信号分析。"
    else:
        note = "未完成币种映射,仅作为原始观察,不进入推荐。"
    item["pipeline_note"] = note
    item["token_short"] = _short_address(item.get("token_address"))
    return item
def _short_address(value):
value = str(value or "")
if len(value) <= 14:
return value
return value[:6] + "..." + value[-4:]