import json
import os
import sys
from datetime import datetime, timedelta
from unittest.mock import patch
import pandas as pd
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.services import event_driven_screener as ed
def test_symbol_extraction_filters_usdt_suffix_and_pollution():
title = "Binance Futures Will Launch ABCUSDT, XYZUSDT and USD1USDT USDⓈ-Margined Perpetual Contracts"
symbols = ed._symbol_from_title(title)
assert "ABC/USDT" in symbols
assert "XYZ/USDT" in symbols
assert "ABCUSDT/USDT" not in symbols
assert "USD1/USDT" not in symbols
def test_recent_time_window_rejects_old_news():
assert ed._is_recent(datetime.now() - timedelta(hours=2), 3) is True
assert ed._is_recent(datetime.now() - timedelta(hours=8), 3) is False
assert ed._is_recent(None, 3) is False
def test_classify_major_listing_as_s_level_and_negative_as_risk():
level, event_type = ed.classify_event("Binance Futures Will Launch ABCUSDT USDⓈ-Margined Perpetual Contracts")
assert level == "S"
assert event_type == "major_listing_or_contract"
level, event_type = ed.classify_event("Binance Will Delist ABC on 2026-05-07")
assert level == "RISK"
assert event_type == "risk_negative"
def test_store_events_deduplicates_by_hash(tmp_path):
# 使用真实DB表,但同一事件重复插入只保留一次
event = {
"source": "binance_listing",
"symbol": "ABC/USDT",
"title": "Binance Futures Will Launch ABCUSDT USDⓈ-Margined Perpetual Contracts",
"url": "https://example.com",
"published_at": datetime.now(),
"importance": "S",
"event_type": "major_listing_or_contract",
"raw": {"id": 1},
}
first = ed.store_events([event])
second = ed.store_events([event])
assert len(first) in (0, 1) # 若本地DB已有同事件,允许0
assert second == []
def test_quick_technical_check_rejects_old_overheated_gain():
event = {
"source": "binance_listing",
"symbol": "ABC/USDT",
"title": "Binance Futures Will Launch ABCUSDT USDⓈ-Margined Perpetual Contracts",
"importance": "S",
}
with patch.object(ed, "_ticker_info", return_value={"price": 1.0, "change_24h": 35.0, "volume_24h": 10000000}):
result = ed.quick_technical_check(event)
assert result["decision"] == "risk"
assert "不追高" in result["reason"]
def test_theme_expansion_spreads_ton_news_to_ecosystem_symbols():
event = {
"source": "coingecko_trending",
"symbol": "TON/USDT",
"title": "Telegram becomes the main driver of the TON ecosystem and cuts TON fees",
"url": "https://example.com/ton",
"published_at": datetime.now(),
"importance": "B",
"event_type": "market_heat",
"raw": {},
}
expanded = ed.expand_theme_events([event])
by_symbol = {e["symbol"]: e for e in expanded}
assert "TON/USDT" in by_symbol
assert "NOT/USDT" in by_symbol
assert "DOGS/USDT" in by_symbol
assert by_symbol["DOGS/USDT"]["importance"] == "A"
assert by_symbol["DOGS/USDT"]["event_type"] == "theme_expansion"
assert "主题扩散:ton_ecosystem" in by_symbol["DOGS/USDT"]["title"]
def test_theme_expansion_respects_max_expanded_symbols(monkeypatch):
event = {
"source": "coingecko_trending",
"symbol": "TON/USDT",
"title": "Telegram becomes the main driver of the TON ecosystem and cuts TON fees",
"url": "https://example.com/ton",
"published_at": datetime.now(),
"importance": "B",
"event_type": "market_heat",
"raw": {},
}
original_cfg = ed._cfg()
limited_cfg = json.loads(json.dumps(original_cfg))
limited_cfg["theme_expansion"]["max_expanded_symbols"] = 2
monkeypatch.setattr(ed, "_cfg", lambda: limited_cfg)
expanded = ed.expand_theme_events([event])
assert len(expanded) <= 3 # 原始事件 + 最多 2 个扩散事件
def test_wublock_atom_feed_events_are_parsed_and_symbolized(monkeypatch):
xml = """
SUI ecosystem project announces major upgrade and airdrop
2026-05-16T10:00:00Z
"""
class Resp:
status_code = 200
content = xml.encode("utf-8")
monkeypatch.setattr(ed, "_now", lambda: datetime(2026, 5, 16, 18, 0, 0))
with patch.object(ed.requests, "get", return_value=Resp()):
events = ed.fetch_rss_events("wublock123", {
"enabled": True,
"type": "rss",
"url": "https://www.wublock123.com/feed",
"weight": "A",
"symbol_aliases": {"sui": "SUI"},
})
assert len(events) == 1
assert events[0]["source"] == "wublock123"
assert events[0]["symbol"] == "SUI/USDT"
assert events[0]["importance"] == "A"
assert events[0]["url"] == "https://www.wublock123.com/example"
def test_panews_rss_feed_events_are_parsed_and_symbolized(monkeypatch):
xml = """
-
Solana 生态 Meme 项目成交量大幅上升,SUI 同步活跃
https://www.panewslab.com/example
Sat, 16 May 2026 10:00:00 GMT
"""
class Resp:
status_code = 200
content = xml.encode("utf-8")
monkeypatch.setattr(ed, "_now", lambda: datetime(2026, 5, 16, 18, 0, 0))
with patch.object(ed.requests, "get", return_value=Resp()):
events = ed.fetch_rss_events("panewslab", {
"enabled": True,
"type": "rss",
"url": "https://www.panewslab.com/rss.xml?lang=zh&type=NORMAL%2CNEWS",
"weight": "A",
"symbol_aliases": {"solana": "SOL", "sui": "SUI"},
})
assert {e["symbol"] for e in events} == {"SOL/USDT", "SUI/USDT"}
assert {e["source"] for e in events} == {"panewslab"}
assert all(e["importance"] == "A" for e in events)
def _fake_ohlcv(rows=60):
return pd.DataFrame({
"timestamp": pd.date_range("2026-05-01", periods=rows, freq="h"),
"open": [1.0] * rows,
"high": [1.02] * rows,
"low": [0.98] * rows,
"close": [1.0] * rows,
"volume": [1000.0] * rows,
})
def test_theme_static_accumulation_bonus_can_upgrade_to_recommend():
event = {
"source": "coingecko_trending",
"symbol": "DOGS/USDT",
"title": "[主题扩散:ton_ecosystem] Telegram becomes the main driver of the TON ecosystem",
"importance": "A",
"event_type": "theme_expansion",
}
with patch.object(ed, "_ticker_info", return_value={"price": 0.001, "change_24h": 5.0, "volume_24h": 10000000}), \
patch.object(ed, "fetch_klines", return_value=_fake_ohlcv()), \
patch.object(ed, "detect_volume_price_fly", return_value=None), \
patch.object(ed, "detect_static_accumulation", return_value={"static_count": 23, "vol_ratio": 1.4}), \
patch.object(ed, "full_pa_analysis", return_value={"ignition_points": []}), \
patch.object(ed, "fetch_derivatives_context", return_value={"funding_rate": 0, "top_trader_long_pct": 56}), \
patch.object(ed, "calc_atr", return_value=0.00005):
result = ed.quick_technical_check(event)
assert result["score"] >= 6
assert result["decision"] == "recommend"
assert any("生态主题+强静K蓄力升权" in s for s in result["signals"])
def test_run_once_caps_processed_events(monkeypatch):
events = [
{
"source": "unit",
"symbol": f"T{i}/USDT",
"title": f"event {i}",
"url": "",
"published_at": datetime.now(),
"importance": "A",
"event_type": "news",
"event_hash": f"h{i}",
"raw": {},
}
for i in range(6)
]
processed = []
monkeypatch.setattr(ed, "init_db", lambda: None)
monkeypatch.setattr(ed, "init_event_tables", lambda: None)
monkeypatch.setattr(ed, "collect_events", lambda: events)
monkeypatch.setattr(ed, "store_events", lambda collected: collected)
monkeypatch.setattr(ed, "process_event", lambda event: processed.append(event["symbol"]) or {
"event": event,
"result": {"decision": "ignore", "score": 0, "reason": ""},
"rec_id": 0,
"pushed": False,
})
monkeypatch.setattr(ed, "log_cron_run", lambda **kwargs: None)
result = ed.run_once(limit=2, max_seconds=30)
assert result["processed_count"] == 2
assert result["skipped_due_to_limit"] == 4
assert processed == ["T0/USDT", "T1/USDT"]