import json import os import sqlite3 import sys from datetime import datetime from fastapi.testclient import TestClient PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if PROJECT_DIR not in sys.path: sys.path.insert(0, PROJECT_DIR) from app.db import altcoin_db, onchain_db, scheduler_db from app.services import onchain_monitor from app.web import web_server def _temp_db(monkeypatch, tmp_path): db_path = tmp_path / "altcoin_monitor.db" monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db) altcoin_db.init_db() onchain_db.init_onchain_tables() return db_path def test_mapping_requires_confidence_and_preserves_multi_chain(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) onchain_db.upsert_token_mapping("ABC", "ethereum", "0xaaa", source="manual", confidence=95) onchain_db.upsert_token_mapping("ABC", "bsc", "0xbbb", source="manual", confidence=55) usable = onchain_db.get_token_mappings("ABC", min_confidence=70) assert len(usable) == 1 assert usable[0]["chain"] == "ethereum" assert usable[0]["contract_address"] == "0xaaa" def test_dex_signal_codes_from_metric(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) cfg = { "dex_volume_spike_pct": 80, "dex_min_hour_volume_usd": 50000, "dex_hour_volume_share_pct": 8, "liquidity_add_pct": 25, "liquidity_remove_pct": -25, } assert onchain_monitor.derive_dex_signals({"dex_volume_change_pct": 120, "liquidity_change_pct": 40}, cfg) == [ "dex_volume_spike", "liquidity_add", ] assert onchain_monitor.derive_dex_signals({"dex_volume_change_pct": 10, "liquidity_change_pct": -35}, cfg) == [ "liquidity_remove_risk" ] assert onchain_monitor.derive_dex_signals( {"dex_volume_usd": 600000, "dex_volume_1h_usd": 80000, "dex_volume_change_pct": 0, "liquidity_change_pct": 0}, cfg, ) == ["dex_volume_spike"] def test_auto_mapping_rejects_non_target_native_and_wrapped_tokens(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) monkeypatch.setenv("ALPHAX_ONCHAIN_CHAINS", "ethereum,solana") cfg = onchain_monitor.get_onchain_params() chains = set(cfg.get("chains") or []) avax_pair = { "chainId": "solana", "baseToken": {"symbol": "AVAX", "name": "Avalanche"}, "quoteToken": {"symbol": "USDC"}, "liquidity": {"usd": 500000}, "volume": {"h24": 1000000}, "url": "https://example.com", } wrapped_pair = { "chainId": "ethereum", "baseToken": {"symbol": "FIL", "name": "Wrapped Filecoin"}, "quoteToken": {"symbol": "USDT"}, "liquidity": {"usd": 500000}, "volume": {"h24": 1000000}, "url": "https://example.com/wrapped-filecoin", } assert onchain_monitor._pair_rejection_reason(avax_pair, "AVAX/USDT", chains) == "native_chain_not_in_scope" assert onchain_monitor._pair_rejection_reason(wrapped_pair, "FIL/USDT", chains) == "native_chain_not_in_scope" assert onchain_monitor._pair_rejection_reason( { "chainId": "solana", "baseToken": {"symbol": "UNKNOWN", "name": "Unknown"}, "quoteToken": {"symbol": "USDC"}, "liquidity": {"usd": 500000}, "volume": {"h24": 1000000}, "url": "https://example.com", }, "UNKNOWN/USDT", chains, ) == "solana_not_allowlisted" def test_onchain_candidate_enqueues_event_news_not_recommendation(monkeypatch, tmp_path): db_path = _temp_db(monkeypatch, tmp_path) onchain_db.insert_token_metric( { "symbol": "ABC/USDT", "chain": "ethereum", "contract_address": "0xaaa", "window": "1h", "metric_time": datetime.now().isoformat(), "dex_volume_usd": 500000, "dex_volume_change_pct": 160, "liquidity_usd": 300000, "liquidity_change_pct": 35, "onchain_score": 82, "risk_score": 0, "source": "test", } ) event_id = onchain_db.insert_onchain_event( { "chain": "ethereum", "symbol": "ABC/USDT", "contract_address": "0xaaa", "signal_code": "dex_volume_spike", "direction": "positive", "value_usd": 500000, "confidence": 88, "severity": "A", "detected_at": datetime.now().isoformat(), "source": "test", } ) result = onchain_monitor.enqueue_onchain_candidates(min_score=70, min_confidence=70, cooldown_hours=6) assert event_id > 0 assert result["queued"] == 1 conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row news = conn.execute("SELECT * FROM event_news WHERE source='onchain'").fetchone() rec_count = conn.execute("SELECT COUNT(*) FROM recommendation").fetchone()[0] status = conn.execute("SELECT status FROM onchain_events WHERE id=?", (event_id,)).fetchone()[0] conn.close() assert news["event_type"] == "onchain_candidate" assert json.loads(news["raw_json"])["signal_code"] == "dex_volume_spike" assert rec_count == 0 assert status == "candidate_queued" def test_negative_onchain_signal_is_risk_context_only(monkeypatch, tmp_path): db_path = _temp_db(monkeypatch, tmp_path) onchain_db.insert_onchain_event( { "chain": "ethereum", "symbol": "RISK/USDT", "signal_code": "exchange_inflow_risk", "direction": "risk", "value_usd": 900000, "confidence": 92, "severity": "RISK", "detected_at": datetime.now().isoformat(), "source": "test", } ) result = onchain_monitor.enqueue_onchain_candidates(min_score=1, min_confidence=1) conn = sqlite3.connect(db_path) news_count = conn.execute("SELECT COUNT(*) FROM event_news WHERE source='onchain'").fetchone()[0] conn.close() assert result["queued"] == 0 assert news_count == 0 def test_onchain_api_and_page(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) onchain_db.insert_token_metric( { "symbol": "ABC/USDT", "chain": "base", "contract_address": "0xabc", "window": "1h", "metric_time": datetime.now().isoformat(), "dex_volume_usd": 123000, "dex_volume_change_pct": 90, "liquidity_usd": 456000, "liquidity_change_pct": 12, "onchain_score": 76, "risk_score": 8, "source": "test", } ) client = TestClient(web_server.app) page = client.get("/onchain") assert page.status_code == 200 assert "链上异动" in page.text overview = client.get("/api/onchain/overview") assert overview.status_code == 200 assert overview.json()["kpi"]["token_count"] == 1 assert overview.json()["provider_status"]["providers"][0]["provider"] == "nodereal" tokens = client.get("/api/onchain/tokens") assert tokens.status_code == 200 assert tokens.json()["items"][0]["symbol"] == "ABC/USDT" provider_status = client.get("/api/onchain/provider-status") assert provider_status.status_code == 200 assert provider_status.json()["coverage"]["metrics"] == 1 def test_raw_dexscreener_events_store_without_mapping(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) monkeypatch.setenv("ALPHAX_ONCHAIN_ENABLED", "1") monkeypatch.setenv("ALPHAX_ONCHAIN_CHAINS", "ethereum,solana") monkeypatch.setenv("ALPHAX_ONCHAIN_DEXSCREENER_ENABLED", "1") def fake_request(url, params=None, timeout=15): if "token-profiles" in url: return [ { "chainId": "ethereum", "tokenAddress": "0xraw", "url": "https://dexscreener.com/ethereum/0xraw", "description": "Unmapped token started trending", "icon": "https://example.com/icon.png", } ] if "token-boosts/latest" in url: return [ { "chainId": "solana", "tokenAddress": "So111", "url": "https://dexscreener.com/solana/So111", "amount": 25, "totalAmount": 100, } ] if "token-boosts/top" in url: return [] return {"pairs": []} monkeypatch.setattr(onchain_monitor, "_request_json", fake_request) result = onchain_monitor.fetch_dexscreener_raw_events(limit=10) assert len(result["raw_events"]) == 2 raw = onchain_db.list_onchain_raw_events(hours=24) assert raw["total"] == 2 assert raw["items"][0]["mapping_status"] == "unmapped" assert raw["items"][0]["event_label"] def test_raw_event_api_and_overview_counts(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) onchain_db.upsert_token_mapping("ABC", "base", "0xabc", source="manual", confidence=95) onchain_db.insert_onchain_raw_event( { "source": "dexscreener", "chain": "base", "event_type": "token_boost_top", "token_address": "0xabc", "title": "DEX Boost 榜单", "amount": 10, "total_amount": 80, "importance": 80, "mapped_symbol": "ABC/USDT", "mapping_status": "mapped", "detected_at": datetime.now().isoformat(), } ) client = TestClient(web_server.app) overview = client.get("/api/onchain/overview") events = client.get("/api/onchain/raw-events") important = client.get("/api/onchain/raw-events?priority=important") low = client.get("/api/onchain/raw-events?priority=low") assert overview.status_code == 200 assert overview.json()["kpi"]["raw_event_count"] == 1 assert overview.json()["kpi"]["raw_mapped_count"] == 1 assert events.status_code == 200 assert events.json()["items"][0]["mapped_symbol"] == "ABC/USDT" assert important.status_code == 200 assert important.json()["total"] == 0 assert low.status_code == 200 assert low.json()["total"] == 1 def test_nodereal_events_generate_metrics_and_normalized_event(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) monkeypatch.setenv("ALPHAX_NODEREAL_API_KEY", "test-key") onchain_db.upsert_token_mapping("ABC", "ethereum", "0xabc", source="manual", confidence=95) onchain_db.insert_token_metric( { "symbol": "ABC/USDT", "chain": "ethereum", "contract_address": "0xabc", "window": "1h", "metric_time": datetime.now().isoformat(), "dex_volume_usd": 100000, "liquidity_usd": 100000, "source": "test", "raw": {"price_usd": "2"}, } ) class FakeNodeRealClient: def supports_chain(self, chain): return chain == "ethereum" def token_holder_count(self, chain, contract): assert chain == "ethereum" assert contract == "0xabc" return 120 def block_number(self, chain): assert chain == "ethereum" return 1000 def get_logs(self, chain, log_filter): assert log_filter["address"] == "0xabc" return [ { "transactionHash": "0xtx", "data": hex(200000 * 10**18), "topics": [ onchain_monitor.TRANSFER_TOPIC, "0x0000000000000000000000001111111111111111111111111111111111111111", "0x0000000000000000000000002222222222222222222222222222222222222222", ], } ] monkeypatch.setattr(onchain_monitor, "_nodereal_client", lambda cfg=None: FakeNodeRealClient()) result = onchain_monitor.fetch_nodereal_events(limit=10) assert result["errors"] == [] assert len(result["events"]) == 1 assert len(result["metrics"]) == 1 events = onchain_db.list_onchain_events(hours=50000) assert events["total"] == 1 assert events["items"][0]["source"] == "nodereal" assert events["items"][0]["signal_code"] == "whale_accumulation" def test_legacy_helius_is_disabled_by_default(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) monkeypatch.setenv("ALPHAX_HELIUS_API_KEY", "test-key") onchain_db.upsert_token_mapping("SOLX", "solana", "Mint111", source="manual", confidence=95) result = onchain_monitor.fetch_helius_events(limit=10) assert result["events"] == [] assert result["errors"] == ["helius_disabled"] def test_scheduler_seeds_onchain_job(monkeypatch, tmp_path): _temp_db(monkeypatch, tmp_path) sched_path = tmp_path / "scheduler_state.db" monkeypatch.setattr(scheduler_db, "SCHEDULER_DB_PATH", str(sched_path)) scheduler_db.init_scheduler_tables() jobs = {item["job_name"]: item for item in scheduler_db.get_job_configs()} assert jobs["onchain"]["command"] == "onchain" assert jobs["onchain"]["lock_group"] == "onchain_write"