# tradingview-alert-dispatcher/tests/test_dispatcher.py
# 2026-05-15 22:04:05 +08:00
#
# 196 lines
# 7.9 KiB
# Python

from __future__ import annotations
import os
import tempfile
import unittest
from app.config import Settings
from app.db import Database, now_iso, to_json
from app.dispatcher import Dispatcher, ValidationError, build_feishu_message
class DispatcherTest(unittest.TestCase):
    """Tests for alert routing, delivery bookkeeping and Feishu message building.

    Every test runs against a fresh database file created inside a
    per-test temporary directory, so tests do not share state.
    """

    def setUp(self) -> None:
        """Create an isolated database and a dispatcher bound to it."""
        self.tmpdir = tempfile.TemporaryDirectory()
        # Register the cleanup right away: unittest skips tearDown() when
        # setUp() raises, but registered cleanups still run, so the directory
        # is removed even if building the settings/database below fails.
        self.addCleanup(self.tmpdir.cleanup)
        self.settings = Settings(
            database_path=os.path.join(self.tmpdir.name, "test.db"),
            max_delivery_attempts=2,
            retry_backoff_seconds=1,
            feishu_timeout_seconds=1,
        )
        self.db = Database(self.settings)
        self.db.migrate(self.settings)
        self.dispatcher = Dispatcher(self.db, self.settings)

    def add_target(self, name: str = "ops", url: str = "http://127.0.0.1:9/hook") -> int:
        """Insert an enabled webhook target and return its row id.

        Args:
            name: Target name stored in ``webhook_targets.name``.
            url: Webhook URL stored in ``webhook_targets.webhook_url``.

        Returns:
            The id of the inserted ``webhook_targets`` row.
        """
        now = now_iso()
        with self.db.connect() as conn:
            cur = conn.execute(
                "INSERT INTO webhook_targets (name, webhook_url, enabled, created_at, updated_at) VALUES (?, ?, 1, ?, ?)",
                (name, url, now, now),
            )
            # Read the id while the connection is still open.
            return int(cur.lastrowid)

    def add_rule(
        self,
        target_id: int,
        priority: int = 100,
        name: str = "rule",
        timeframe: str = "5m",
        symbol: str = "BTCUSDT",
        strategy: str = "breakout",
    ) -> int:
        """Insert a routing rule that points at a single target.

        Convenience wrapper around :meth:`add_rule_with_targets`.

        Returns:
            The id of the inserted ``routing_rules`` row.
        """
        return self.add_rule_with_targets(
            [target_id],
            priority=priority,
            name=name,
            timeframe=timeframe,
            symbol=symbol,
            strategy=strategy,
        )

    def add_rule_with_targets(
        self,
        target_ids: list[int],
        priority: int = 100,
        name: str = "rule",
        timeframe: str = "5m",
        symbol: str = "BTCUSDT",
        strategy: str = "breakout",
    ) -> int:
        """Insert an enabled card-type routing rule and return its row id.

        Args:
            target_ids: Ids of the webhook targets the rule dispatches to;
                stored in ``target_ids`` after serialisation with ``to_json``.
            priority: Value for the ``priority`` column.
            name: Rule name.
            timeframe: Timeframe the rule matches on.
            symbol: Symbol the rule matches on.
            strategy: Strategy the rule matches on.

        Returns:
            The id of the inserted ``routing_rules`` row.
        """
        now = now_iso()
        with self.db.connect() as conn:
            cur = conn.execute(
                """
                INSERT INTO routing_rules
                (
                    name, timeframe, symbol, strategy, priority, message_type,
                    card_title_template, card_body_template, enabled, target_ids,
                    created_at, updated_at
                )
                VALUES (?, ?, ?, ?, ?, 'card', 'Signal {{symbol}}', 'Price {{price}}', 1, ?, ?, ?)
                """,
                (name, timeframe, symbol, strategy, priority, to_json(target_ids), now, now),
            )
            # Read the id while the connection is still open.
            return int(cur.lastrowid)

    def test_unmatched_alert_is_stored(self) -> None:
        """An alert with no matching rule is persisted with status ``unmatched``."""
        result = self.dispatcher.receive_alert(
            {"timeframe": "15m", "symbol": "ETHUSDT", "strategy": "trend"}
        )
        self.assertEqual(result["status"], "unmatched")
        with self.db.connect() as conn:
            alert = conn.execute(
                "SELECT * FROM alerts WHERE id = ?", (result["alert_id"],)
            ).fetchone()
        self.assertEqual(alert["status"], "unmatched")

    def test_highest_priority_rule_wins(self) -> None:
        """With two matching rules, the one with the smaller priority number is used."""
        slow_target = self.add_target("slow")
        fast_target = self.add_target("fast")
        slow_rule = self.add_rule(slow_target, priority=100, name="slow")
        fast_rule = self.add_rule(fast_target, priority=1, name="fast")
        # The alert symbol is lower-case on purpose; the rules store "BTCUSDT".
        result = self.dispatcher.receive_alert(
            {"timeframe": "5m", "symbol": "btcusdt", "strategy": "breakout", "action": "buy"}
        )
        self.assertEqual(result["matched_rule_id"], fast_rule)
        self.assertNotEqual(result["matched_rule_id"], slow_rule)
        with self.db.connect() as conn:
            delivery = conn.execute(
                "SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)
            ).fetchone()
        self.assertEqual(delivery["target_id"], fast_target)

    def test_single_dimension_rule_matches(self) -> None:
        """A rule with only ``symbol`` set matches an alert that carries only a symbol."""
        target_id = self.add_target()
        rule_id = self.add_rule(target_id, timeframe="", symbol="BTCUSDT", strategy="")
        result = self.dispatcher.receive_alert({"symbol": "btcusdt", "action": "buy"})
        self.assertEqual(result["status"], "matched")
        self.assertEqual(result["matched_rule_id"], rule_id)

    def test_more_specific_rule_wins_when_priority_ties(self) -> None:
        """At equal priority, the rule constraining more fields is selected."""
        broad_target = self.add_target("broad")
        specific_target = self.add_target("specific")
        broad_rule = self.add_rule(
            broad_target, priority=10, name="broad", timeframe="", symbol="BTCUSDT", strategy=""
        )
        specific_rule = self.add_rule(specific_target, priority=10, name="specific")
        result = self.dispatcher.receive_alert(
            {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout"}
        )
        self.assertEqual(result["matched_rule_id"], specific_rule)
        self.assertNotEqual(result["matched_rule_id"], broad_rule)

    def test_find_matching_rule_preview(self) -> None:
        """``find_matching_rule`` returns the matching rule for a payload."""
        target_id = self.add_target()
        rule_id = self.add_rule(target_id, timeframe="", symbol="BTCUSDT", strategy="")
        rule = self.dispatcher.find_matching_rule({"symbol": "btcusdt"})
        self.assertIsNotNone(rule)
        self.assertEqual(rule["id"], rule_id)

    def test_failed_delivery_is_marked_for_retry(self) -> None:
        """A delivery that fails on its first attempt is recorded as ``retry``."""
        # NOTE(review): relies on the default target URL (127.0.0.1:9) being
        # unreachable so that the send fails - confirm in CI environments.
        target_id = self.add_target()
        self.add_rule(target_id)
        result = self.dispatcher.receive_alert(
            {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"}
        )
        with self.db.connect() as conn:
            delivery = conn.execute(
                "SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)
            ).fetchone()
        self.assertEqual(delivery["status"], "retry")
        self.assertEqual(delivery["attempts"], 1)
        self.assertIsNotNone(delivery["error"])

    def test_rule_can_dispatch_to_multiple_targets(self) -> None:
        """A rule listing two targets produces two delivery rows for one alert."""
        target_a = self.add_target("ops-a")
        target_b = self.add_target("ops-b")
        self.add_rule_with_targets([target_a, target_b])
        result = self.dispatcher.receive_alert(
            {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"}
        )
        self.assertEqual(len(result["delivery_ids"]), 2)
        with self.db.connect() as conn:
            count = conn.execute(
                "SELECT COUNT(*) AS c FROM deliveries WHERE alert_id = ?", (result["alert_id"],)
            ).fetchone()["c"]
        self.assertEqual(count, 2)

    def test_legacy_disabled_target_is_still_dispatchable(self) -> None:
        """A target whose ``enabled`` flag is 0 still receives a delivery."""
        target_id = self.add_target()
        with self.db.connect() as conn:
            conn.execute("UPDATE webhook_targets SET enabled = 0 WHERE id = ?", (target_id,))
        # The rule is added after the UPDATE's connection block has exited,
        # because add_rule opens its own connection.
        self.add_rule(target_id)
        result = self.dispatcher.receive_alert(
            {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"}
        )
        self.assertEqual(len(result["delivery_ids"]), 1)

    def test_card_template_uses_alert_fields(self) -> None:
        """``{{field}}`` placeholders in card templates are filled from the alert."""
        message = build_feishu_message(
            {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000},
            {
                "message_type": "card",
                "card_title_template": "{{symbol}} {{action}}",
                "card_body_template": "**价格** {{price}}",
            },
        )
        self.assertEqual(message["msg_type"], "interactive")
        self.assertEqual(message["card"]["header"]["title"]["content"], "BTCUSDT buy")
        self.assertEqual(message["card"]["elements"][0]["text"]["content"], "**价格** 68000")

    def test_template_accepts_legacy_single_braces_and_still_sends_card(self) -> None:
        """Single-brace placeholders are filled, and ``text`` rules still yield a card."""
        message = build_feishu_message(
            {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000},
            {
                "message_type": "text",
                "card_title_template": "TradingView {symbol} {action}",
                "card_body_template": "价格 {price}",
            },
        )
        # Even with message_type "text" the built message is an interactive card.
        self.assertEqual(message["msg_type"], "interactive")
        self.assertEqual(message["card"]["header"]["title"]["content"], "TradingView BTCUSDT buy")
        self.assertEqual(message["card"]["elements"][0]["text"]["content"], "价格 68000")
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()