"""Tests for on-chain token mappings, event queuing, the web API and the NodeReal fetcher.

Every test runs against a throwaway SQLite database created under pytest's
``tmp_path`` (see ``_temp_db``). NodeReal access is replaced by small in-test
fake client classes, installed by monkeypatching
``onchain_monitor._nodereal_client``.
"""

import json
import os
import sqlite3
import sys
from datetime import datetime

from fastapi.testclient import TestClient

# Make the project root importable when the tests are run from this directory.
PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)

from app.db import altcoin_db, onchain_db, scheduler_db  # noqa: E402
from app.services import onchain_monitor  # noqa: E402
from app.web import web_server  # noqa: E402

# 32-byte indexed "from" / "to" topics shared by every fake Transfer log below.
_FROM_TOPIC = "0x0000000000000000000000001111111111111111111111111111111111111111"
_TO_TOPIC = "0x0000000000000000000000002222222222222222222222222222222222222222"


def _temp_db(monkeypatch, tmp_path):
    """Point ``altcoin_db`` at a fresh database file under ``tmp_path`` and create its tables.

    Also replaces ``web_server.init_db`` with ``altcoin_db.init_db``.

    Returns:
        The path of the temporary database file.
    """
    db_path = tmp_path / "altcoin_monitor.db"
    monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
    monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db)
    altcoin_db.init_db()
    onchain_db.init_onchain_tables()
    return db_path


def _transfer_log(address, tx_hash, amount):
    """Build a fake Transfer log entry in the dict shape the fake clients return.

    Args:
        address: Token contract address placed in the ``address`` field.
        tx_hash: Value for the ``transactionHash`` field.
        amount: Integer transfer amount, hex-encoded into ``data``.
    """
    return {
        "address": address,
        "transactionHash": tx_hash,
        "data": hex(amount),
        "topics": [onchain_monitor.TRANSFER_TOPIC, _FROM_TOPIC, _TO_TOPIC],
    }


def test_mapping_requires_confidence_and_preserves_multi_chain(monkeypatch, tmp_path):
    """Only the mapping at or above ``min_confidence`` is returned; both chains can be stored."""
    _temp_db(monkeypatch, tmp_path)
    onchain_db.upsert_token_mapping("ABC", "ethereum", "0xaaa", source="manual", confidence=95)
    onchain_db.upsert_token_mapping("ABC", "bsc", "0xbbb", source="manual", confidence=55)

    usable = onchain_db.get_token_mappings("ABC", min_confidence=70)

    assert len(usable) == 1
    assert usable[0]["chain"] == "ethereum"
    assert usable[0]["contract_address"] == "0xaaa"


def test_auto_mapping_rejects_non_target_native_and_wrapped_tokens(monkeypatch, tmp_path):
    """The auto-mapping filter accepts STORJ and rejects AVAX, wrapped FIL and USDT."""
    _temp_db(monkeypatch, tmp_path)

    assert onchain_monitor._is_auto_mapping_symbol_allowed("STORJ", "Storj") is True
    assert onchain_monitor._is_auto_mapping_symbol_allowed("AVAX", "Avalanche") is False
    assert onchain_monitor._is_auto_mapping_symbol_allowed("FIL", "Wrapped Filecoin") is False
    assert onchain_monitor._is_auto_mapping_symbol_allowed("USDT", "Tether USD") is False


def test_onchain_candidate_enqueues_event_news_not_recommendation(monkeypatch, tmp_path):
    """A positive event is queued as ``event_news`` and creates no ``recommendation`` row."""
    db_path = _temp_db(monkeypatch, tmp_path)
    onchain_db.insert_token_metric(
        {
            "symbol": "ABC/USDT",
            "chain": "ethereum",
            "contract_address": "0xaaa",
            "window": "1h",
            "metric_time": datetime.now().isoformat(),
            "dex_volume_usd": 500000,
            "dex_volume_change_pct": 160,
            "liquidity_usd": 300000,
            "liquidity_change_pct": 35,
            "onchain_score": 82,
            "risk_score": 0,
            "source": "test",
        }
    )
    event_id = onchain_db.insert_onchain_event(
        {
            "chain": "ethereum",
            "symbol": "ABC/USDT",
            "contract_address": "0xaaa",
            "signal_code": "dex_volume_spike",
            "direction": "positive",
            "value_usd": 500000,
            "confidence": 88,
            "severity": "A",
            "detected_at": datetime.now().isoformat(),
            "source": "test",
        }
    )

    result = onchain_monitor.enqueue_onchain_candidates(
        min_score=70, min_confidence=70, cooldown_hours=6
    )

    assert event_id > 0
    assert result["queued"] == 1

    conn = sqlite3.connect(db_path)
    try:
        # try/finally so a failing query cannot leak the handle on the temp DB.
        conn.row_factory = sqlite3.Row
        news = conn.execute("SELECT * FROM event_news WHERE source='onchain'").fetchone()
        rec_count = conn.execute("SELECT COUNT(*) FROM recommendation").fetchone()[0]
        status = conn.execute(
            "SELECT status FROM onchain_events WHERE id=?", (event_id,)
        ).fetchone()[0]
    finally:
        conn.close()

    # Fail with a readable message instead of a TypeError on ``news[...]``.
    assert news is not None
    assert news["event_type"] == "onchain_candidate"
    assert json.loads(news["raw_json"])["signal_code"] == "dex_volume_spike"
    assert rec_count == 0
    assert status == "candidate_queued"


def test_negative_onchain_signal_is_risk_context_only(monkeypatch, tmp_path):
    """A ``risk`` direction event is never queued, even with the thresholds set to 1."""
    db_path = _temp_db(monkeypatch, tmp_path)
    onchain_db.insert_onchain_event(
        {
            "chain": "ethereum",
            "symbol": "RISK/USDT",
            "signal_code": "exchange_inflow_risk",
            "direction": "risk",
            "value_usd": 900000,
            "confidence": 92,
            "severity": "RISK",
            "detected_at": datetime.now().isoformat(),
            "source": "test",
        }
    )

    result = onchain_monitor.enqueue_onchain_candidates(min_score=1, min_confidence=1)

    conn = sqlite3.connect(db_path)
    try:
        news_count = conn.execute(
            "SELECT COUNT(*) FROM event_news WHERE source='onchain'"
        ).fetchone()[0]
    finally:
        conn.close()

    assert result["queued"] == 0
    assert news_count == 0


def test_onchain_api_and_page(monkeypatch, tmp_path):
    """The ``/onchain`` page and the overview, tokens and provider-status endpoints respond."""
    _temp_db(monkeypatch, tmp_path)
    onchain_db.insert_token_metric(
        {
            "symbol": "ABC/USDT",
            "chain": "base",
            "contract_address": "0xabc",
            "window": "1h",
            "metric_time": datetime.now().isoformat(),
            "dex_volume_usd": 123000,
            "dex_volume_change_pct": 90,
            "liquidity_usd": 456000,
            "liquidity_change_pct": 12,
            "onchain_score": 76,
            "risk_score": 8,
            "source": "test",
        }
    )
    client = TestClient(web_server.app)

    page = client.get("/onchain")
    assert page.status_code == 200
    assert "链上异动" in page.text

    overview = client.get("/api/onchain/overview")
    assert overview.status_code == 200
    assert overview.json()["kpi"]["token_count"] == 1
    assert overview.json()["provider_status"]["providers"][0]["provider"] == "nodereal"

    tokens = client.get("/api/onchain/tokens")
    assert tokens.status_code == 200
    assert tokens.json()["items"][0]["symbol"] == "ABC/USDT"

    provider_status = client.get("/api/onchain/provider-status")
    assert provider_status.status_code == 200
    assert provider_status.json()["coverage"]["metrics"] == 1


def test_raw_event_api_and_overview_counts(monkeypatch, tmp_path):
    """A mapped raw event shows up in the overview KPIs and the raw-events priority filters."""
    _temp_db(monkeypatch, tmp_path)
    onchain_db.upsert_token_mapping("ABC", "ethereum", "0xabc", source="manual", confidence=95)
    onchain_db.insert_onchain_raw_event(
        {
            "source": "nodereal",
            "chain": "ethereum",
            "event_type": "evm_transfer",
            "token_address": "0xabc",
            "title": "NodeReal ERC-20 原始转账",
            "amount": 10,
            "total_amount": 80,
            "importance": 80,
            "mapped_symbol": "ABC/USDT",
            "mapping_status": "mapped",
            "detected_at": datetime.now().isoformat(),
        }
    )
    client = TestClient(web_server.app)

    overview = client.get("/api/onchain/overview")
    events = client.get("/api/onchain/raw-events")
    important = client.get("/api/onchain/raw-events?priority=important")
    low = client.get("/api/onchain/raw-events?priority=low")

    assert overview.status_code == 200
    assert overview.json()["kpi"]["raw_event_count"] == 1
    assert overview.json()["kpi"]["raw_mapped_count"] == 1
    assert events.status_code == 200
    assert events.json()["items"][0]["mapped_symbol"] == "ABC/USDT"
    assert important.status_code == 200
    assert important.json()["total"] == 1
    assert low.status_code == 200
    assert low.json()["total"] == 0


def test_overview_ignores_legacy_signals_and_surfaces_mapped_raw_feed(monkeypatch, tmp_path):
    """The overview reports the mapped raw event and leaves the dexscreener event out."""
    _temp_db(monkeypatch, tmp_path)
    onchain_db.insert_onchain_event(
        {
            "chain": "ethereum",
            "symbol": "OLD/USDT",
            "signal_code": "dex_volume_spike",
            "direction": "positive",
            "value_usd": 1000000,
            "confidence": 80,
            "severity": "A",
            "detected_at": datetime.now().isoformat(),
            "source": "dexscreener",
        }
    )
    onchain_db.insert_onchain_raw_event(
        {
            "source": "nodereal",
            "chain": "bsc",
            "event_type": "evm_transfer",
            "token_address": "0xbeam",
            "title": "NodeReal ERC-20 原始转账",
            "amount": 200,
            "total_amount": 200,
            "importance": 82,
            "mapped_symbol": "BEAM/USDT",
            "mapping_status": "mapped",
            "detected_at": datetime.now().isoformat(),
        }
    )

    overview = onchain_db.get_onchain_overview(hours=24)

    assert overview["kpi"]["positive_events"] == 0
    assert overview["kpi"]["raw_mapped_count"] == 1
    assert overview["kpi"]["mapped_signal_count"] == 1
    assert overview["hot_tokens"][0]["symbol"] == "BEAM/USDT"
    assert overview["signals"] == []


def test_token_detail_includes_mapped_raw_events(monkeypatch, tmp_path):
    """Token detail lists the mapped raw event with its display amount and summary text."""
    _temp_db(monkeypatch, tmp_path)
    onchain_db.upsert_token_mapping(
        "BEAM/USDT",
        "bsc",
        "0xbeam",
        source="nodereal_erc20_metadata",
        confidence=90,
        raw={"symbol": "BEAM", "name": "Beam", "decimals": 18},
    )
    onchain_db.insert_onchain_raw_event(
        {
            "source": "nodereal",
            "chain": "bsc",
            "event_type": "evm_transfer",
            "token_address": "0xbeam",
            "title": "NodeReal ERC-20 原始转账",
            "amount": 300 * 10**18,
            "total_amount": 300 * 10**18,
            "importance": 78,
            "mapped_symbol": "BEAM/USDT",
            "mapping_status": "mapped",
            "detected_at": datetime.now().isoformat(),
        }
    )

    detail = onchain_db.get_onchain_token_detail("BEAM/USDT", hours=24)

    assert detail["events"] == []
    assert detail["raw_event_count"] == 1
    raw_event = detail["raw_events"][0]
    assert raw_event["mapped_symbol"] == "BEAM/USDT"
    assert raw_event["display_amount_label"] == "300 BEAM"
    assert "数量约 300 BEAM" in raw_event["human_summary"]
    assert raw_event["pipeline_note"] == "已映射,可进入后续链上信号分析。"


def test_nodereal_events_generate_metrics_and_normalized_event(monkeypatch, tmp_path):
    """One transfer on a mapped token yields one metric and one ``whale_accumulation`` event."""
    _temp_db(monkeypatch, tmp_path)
    monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key")
    onchain_db.upsert_token_mapping("ABC", "ethereum", "0xabc", source="manual", confidence=95)
    onchain_db.insert_token_metric(
        {
            "symbol": "ABC/USDT",
            "chain": "ethereum",
            "contract_address": "0xabc",
            "window": "1h",
            "metric_time": datetime.now().isoformat(),
            "dex_volume_usd": 100000,
            "liquidity_usd": 100000,
            "source": "test",
            "raw": {"price_usd": "2"},
        }
    )

    class FakeNodeRealClient:
        """Fake client that serves a single transfer for the mapped ``0xabc`` contract."""

        def supports_chain(self, chain):
            return chain == "ethereum"

        def token_holder_count(self, chain, contract):
            assert chain == "ethereum"
            assert contract == "0xabc"
            return 120

        def block_number(self, chain):
            assert chain == "ethereum"
            return 1000

        def get_logs(self, chain, log_filter):
            # Filters without an address get nothing; only the per-contract query has data.
            if "address" not in log_filter:
                return []
            assert log_filter["address"] == "0xabc"
            return [_transfer_log("0xabc", "0xtx", 200000 * 10**18)]

    monkeypatch.setattr(onchain_monitor, "_nodereal_client", lambda cfg=None: FakeNodeRealClient())

    result = onchain_monitor.fetch_nodereal_events(limit=10)

    assert result["errors"] == []
    assert len(result["events"]) == 1
    assert len(result["metrics"]) == 1
    events = onchain_db.list_onchain_events(hours=50000)
    assert events["total"] == 1
    assert events["items"][0]["source"] == "nodereal"
    assert events["items"][0]["signal_code"] == "whale_accumulation"


def test_nodereal_no_supported_mapping_error_has_diagnostics(monkeypatch, tmp_path):
    """With only a solana mapping, the fetch reports diagnostics and no errors."""
    _temp_db(monkeypatch, tmp_path)
    monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key")
    monkeypatch.setenv("ALPHAX_NODEREAL_CHAINS", "ethereum,bsc")
    onchain_db.upsert_token_mapping("SOLX", "solana", "Mint111", source="manual", confidence=95)

    class EmptyNodeRealClient:
        """Fake client for ethereum and bsc that never returns any logs."""

        def supports_chain(self, chain):
            return chain in {"ethereum", "bsc"}

        def block_number(self, chain):
            return 100

        def get_logs(self, chain, log_filter):
            return []

    monkeypatch.setattr(onchain_monitor, "_nodereal_client", lambda cfg=None: EmptyNodeRealClient())

    result = onchain_monitor.fetch_nodereal_events(limit=10)

    assert result["metrics"] == []
    assert result["events"] == []
    diagnostics = result["diagnostics"]
    assert diagnostics["mapping_total"] == 1
    assert diagnostics["chain_mapping_total"] == 0
    assert diagnostics["mapping_note"] == "no_enabled_chain_mappings_raw_events_only"
    assert result["errors"] == []


def test_nodereal_records_raw_events_without_strategy_mappings(monkeypatch, tmp_path):
    """Without any mappings, a transfer is stored as an unmapped raw event only."""
    _temp_db(monkeypatch, tmp_path)
    monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key")
    monkeypatch.setenv("ALPHAX_NODEREAL_CHAINS", "ethereum")

    class RawNodeRealClient:
        """Fake client that answers only log queries that carry no address filter."""

        def supports_chain(self, chain):
            return chain == "ethereum"

        def block_number(self, chain):
            return 1000

        def token_holder_count(self, chain, contract):
            return 0

        def get_logs(self, chain, log_filter):
            if "address" in log_filter:
                return []
            return [_transfer_log("0xabc", "0xrawtx", 987654321)]

    monkeypatch.setattr(onchain_monitor, "_nodereal_client", lambda cfg=None: RawNodeRealClient())

    result = onchain_monitor.fetch_nodereal_events(limit=10)

    assert result["errors"] == []
    assert result["events"] == []
    assert len(result["raw_events"]) == 1
    assert result["diagnostics"]["mapping_note"] == "no_strategy_mappings_raw_events_only"
    raw = onchain_db.list_onchain_raw_events(hours=50000)
    assert raw["total"] == 1
    assert raw["items"][0]["source"] == "nodereal"
    assert raw["items"][0]["mapping_status"] == "unmapped"
    assert raw["items"][0]["event_type"] == "evm_transfer"


def test_nodereal_auto_maps_raw_event_from_erc20_metadata(monkeypatch, tmp_path):
    """A raw transfer is mapped to ``STORJ/USDT`` using metadata served by ``eth_call``."""
    _temp_db(monkeypatch, tmp_path)
    monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key")
    monkeypatch.setenv("ALPHAX_NODEREAL_CHAINS", "ethereum")

    def abi_string(value):
        """Hex-encode ``value`` as three parts: the word 32, the byte length, the padded bytes."""
        body = value.encode()
        # Right-pad the payload with zero bytes to a multiple of 32 bytes.
        padded = body + (b"\x00" * ((32 - len(body) % 32) % 32))
        return (
            "0x"
            + (32).to_bytes(32, "big").hex()
            + len(body).to_bytes(32, "big").hex()
            + padded.hex()
        )

    class MetadataNodeRealClient:
        """Fake client serving one raw transfer plus symbol/name/decimals via ``eth_call``."""

        def supports_chain(self, chain):
            return chain == "ethereum"

        def block_number(self, chain):
            return 1000

        def token_holder_count(self, chain, contract):
            return 0

        def get_logs(self, chain, log_filter):
            if "address" in log_filter:
                return []
            return [_transfer_log("0xstorj", "0xrawtx", 1000 * 10**8)]

        def eth_call(self, chain, to_address, data, block="latest"):
            if data == onchain_monitor.ERC20_SYMBOL_SELECTOR:
                return abi_string("STORJ")
            if data == onchain_monitor.ERC20_NAME_SELECTOR:
                return abi_string("Storj")
            if data == onchain_monitor.ERC20_DECIMALS_SELECTOR:
                return hex(8)
            return "0x"

    monkeypatch.setattr(
        onchain_monitor, "_nodereal_client", lambda cfg=None: MetadataNodeRealClient()
    )

    result = onchain_monitor.fetch_nodereal_events(limit=10)

    assert result["errors"] == []
    assert len(result["raw_events"]) == 1
    raw = onchain_db.list_onchain_raw_events(hours=50000)
    assert raw["items"][0]["mapping_status"] == "mapped"
    assert raw["items"][0]["mapped_symbol"] == "STORJ/USDT"
    mappings = onchain_db.get_token_mappings("STORJ/USDT")
    assert len(mappings) == 1
    assert mappings[0]["source"] == "nodereal_erc20_metadata"


def test_nodereal_seeds_configured_token_mappings_from_env(monkeypatch, tmp_path):
    """A mapping given in ``ALPHAX_ONCHAIN_TOKEN_MAPPINGS`` is stored during the fetch."""
    _temp_db(monkeypatch, tmp_path)
    monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key")
    monkeypatch.setenv(
        "ALPHAX_ONCHAIN_TOKEN_MAPPINGS",
        '[{"symbol":"ENVX/USDT","chain":"ethereum","contract_address":"0xabc","confidence":96}]',
    )

    class EmptyNodeRealClient:
        """Fake ethereum client that never returns any logs."""

        def supports_chain(self, chain):
            return chain == "ethereum"

        def token_holder_count(self, chain, contract):
            return 0

        def block_number(self, chain):
            return 100

        def get_logs(self, chain, log_filter):
            # Empty for every filter, with or without an address.
            return []

    monkeypatch.setattr(onchain_monitor, "_nodereal_client", lambda cfg=None: EmptyNodeRealClient())

    result = onchain_monitor.fetch_nodereal_events(limit=10)

    assert result["errors"] == []
    assert result["diagnostics"]["seeded_mappings"] == 1
    mappings = onchain_db.get_token_mappings("ENVX/USDT")
    assert len(mappings) == 1
    assert mappings[0]["contract_address"] == "0xabc"


def test_scheduler_seeds_onchain_job(monkeypatch, tmp_path):
    """Initialising the scheduler tables yields an ``onchain`` job with its lock group."""
    _temp_db(monkeypatch, tmp_path)
    sched_path = tmp_path / "scheduler_state.db"
    monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path))
    scheduler_db.init_scheduler_tables()

    jobs = {item["job_name"]: item for item in scheduler_db.get_job_configs()}

    assert jobs["onchain"]["command"] == "onchain"
    assert jobs["onchain"]["lock_group"] == "onchain_write"