# alphax/tests/test_onchain_factor_scoring.py
# 2026-05-23 12:13:31 +08:00
#
# 106 lines
# 3.1 KiB
# Python

from datetime import datetime
import pytest
from app.core.factor_scoring import FactorScorer
from app.db.onchain_db import get_onchain_factor_context, insert_onchain_event, insert_token_metric
from app.services.altcoin_confirm import _apply_onchain_factor_score
def test_onchain_factor_context_returns_recent_positive_and_risk_events():
    """Check the 24h on-chain factor context built for BEAM/USDT.

    Stores one token metric plus one positive and one risk event, all
    stamped with the current local time, then verifies that
    ``get_onchain_factor_context`` reports data, echoes the stored
    ``onchain_score``, counts one event per direction, and names
    ``whale_accumulation`` as the top positive signal.
    """
    symbol = "BEAM/USDT"
    stamp = datetime.now().isoformat()

    metric_row = {
        "symbol": symbol,
        "chain": "ethereum",
        "window": "1h",
        "metric_time": stamp,
        "onchain_score": 82,
        "risk_score": 12,
        "source": "nodereal",
    }
    insert_token_metric(metric_row)

    # (signal_code, direction, value_usd, confidence), inserted in this order.
    event_specs = (
        ("whale_accumulation", "positive", 1_200_000, 88),
        ("exchange_inflow_risk", "risk", 800_000, 80),
    )
    for signal_code, direction, value_usd, confidence in event_specs:
        insert_onchain_event(
            {
                "symbol": symbol,
                "chain": "ethereum",
                "signal_code": signal_code,
                "direction": direction,
                "value_usd": value_usd,
                "confidence": confidence,
                "detected_at": stamp,
                "source": "nodereal",
            }
        )

    context = get_onchain_factor_context(symbol, hours=24)

    # NOTE(review): the exact-count checks presumably rely on the store
    # holding no other BEAM/USDT events -- confirm the fixture setup.
    assert context["has_data"] is True
    assert context["metrics"]["onchain_score"] == pytest.approx(82)
    assert context["positive_event_count"] == 1
    assert context["risk_event_count"] == 1
    assert context["top_positive"]["signal_code"] == "whale_accumulation"
def test_onchain_factor_score_uses_review_weights():
    """Check that a zero weight for a signal yields a zero score delta.

    Stores a positive ``smart_money_buying`` event for SMART/USDT, scores
    it with a ``FactorScorer`` whose weight for that code is 0, and
    verifies that the factor is still recorded in the scorer summary but
    contributes a delta of 0.
    """
    symbol = "SMART/USDT"
    event = {
        "symbol": symbol,
        "chain": "bsc",
        "signal_code": "smart_money_buying",
        "direction": "positive",
        "value_usd": 500_000,
        "confidence": 76,
        "detected_at": datetime.now().isoformat(),
        "source": "nodereal",
    }
    insert_onchain_event(event)

    zero_weight_scorer = FactorScorer(weights={"smart_money_buying": 0})
    score_delta, fired_signals, context = _apply_onchain_factor_score(
        symbol, zero_weight_scorer
    )

    assert context["has_data"] is True
    assert fired_signals
    assert score_delta == 0
    # The summary is fetched separately for each check, as before.
    assert (
        zero_weight_scorer.summary()["items"][0]["factor_code"]
        == "smart_money_buying"
    )
    assert zero_weight_scorer.summary()["items"][0]["score_delta"] == 0
def test_onchain_risk_factor_is_recorded_as_negative_score():
    """Check that a risk-direction event lowers the score.

    Stores an ``exchange_inflow_risk`` event for RISKY/USDT, scores it
    with a ``FactorScorer`` built from an empty weights mapping, and
    verifies that both the returned delta and the first summary item are
    negative while the summary's ``risk_score`` is positive.
    """
    symbol = "RISKY/USDT"
    risk_event = {
        "symbol": symbol,
        "chain": "ethereum",
        "signal_code": "exchange_inflow_risk",
        "direction": "risk",
        "value_usd": 900_000,
        "confidence": 82,
        "detected_at": datetime.now().isoformat(),
        "source": "nodereal",
    }
    insert_onchain_event(risk_event)

    default_scorer = FactorScorer(weights={})
    score_delta, fired_signals, context = _apply_onchain_factor_score(
        symbol, default_scorer
    )
    report = default_scorer.summary()
    first_item = report["items"][0]

    assert context["has_data"] is True
    assert fired_signals
    assert score_delta < 0
    assert first_item["score_delta"] < 0
    assert report["risk_score"] > 0