from __future__ import annotations import os import tempfile import unittest from app.config import Settings from app.db import Database, now_iso, to_json from app.dispatcher import Dispatcher, ValidationError, build_feishu_message class DispatcherTest(unittest.TestCase): def setUp(self) -> None: self.tmpdir = tempfile.TemporaryDirectory() self.settings = Settings( database_path=os.path.join(self.tmpdir.name, "test.db"), max_delivery_attempts=2, retry_backoff_seconds=1, feishu_timeout_seconds=1, ) self.db = Database(self.settings) self.db.migrate(self.settings) self.dispatcher = Dispatcher(self.db, self.settings) def tearDown(self) -> None: self.tmpdir.cleanup() def add_target(self, name: str = "ops", url: str = "http://127.0.0.1:9/hook") -> int: now = now_iso() with self.db.connect() as conn: cur = conn.execute( "INSERT INTO webhook_targets (name, webhook_url, enabled, created_at, updated_at) VALUES (?, ?, 1, ?, ?)", (name, url, now, now), ) return int(cur.lastrowid) def add_rule( self, target_id: int, priority: int = 100, name: str = "rule", timeframe: str = "5m", symbol: str = "BTCUSDT", strategy: str = "breakout", ) -> int: return self.add_rule_with_targets([target_id], priority, name, timeframe, symbol, strategy) def add_rule_with_targets( self, target_ids: list[int], priority: int = 100, name: str = "rule", timeframe: str = "5m", symbol: str = "BTCUSDT", strategy: str = "breakout", ) -> int: now = now_iso() with self.db.connect() as conn: cur = conn.execute( """ INSERT INTO routing_rules ( name, timeframe, symbol, strategy, priority, message_type, card_title_template, card_body_template, enabled, target_ids, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, 'card', 'Signal {{symbol}}', 'Price {{price}}', 1, ?, ?, ?) """, (name, timeframe, symbol, strategy, priority, to_json(target_ids), now, now), ) return int(cur.lastrowid) def test_unmatched_alert_is_stored(self) -> None: result = self.dispatcher.receive_alert({"timeframe": "15m", "symbol": "ETHUSDT", "strategy": "trend"}) self.assertEqual(result["status"], "unmatched") with self.db.connect() as conn: alert = conn.execute("SELECT * FROM alerts WHERE id = ?", (result["alert_id"],)).fetchone() self.assertEqual(alert["status"], "unmatched") def test_highest_priority_rule_wins(self) -> None: slow_target = self.add_target("slow") fast_target = self.add_target("fast") slow_rule = self.add_rule(slow_target, priority=100, name="slow") fast_rule = self.add_rule(fast_target, priority=1, name="fast") result = self.dispatcher.receive_alert( {"timeframe": "5m", "symbol": "btcusdt", "strategy": "breakout", "action": "buy"} ) self.assertEqual(result["matched_rule_id"], fast_rule) self.assertNotEqual(result["matched_rule_id"], slow_rule) with self.db.connect() as conn: delivery = conn.execute("SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone() self.assertEqual(delivery["target_id"], fast_target) def test_single_dimension_rule_matches(self) -> None: target_id = self.add_target() rule_id = self.add_rule(target_id, timeframe="", symbol="BTCUSDT", strategy="") result = self.dispatcher.receive_alert({"symbol": "btcusdt", "action": "buy"}) self.assertEqual(result["status"], "matched") self.assertEqual(result["matched_rule_id"], rule_id) def test_more_specific_rule_wins_when_priority_ties(self) -> None: broad_target = self.add_target("broad") specific_target = self.add_target("specific") broad_rule = self.add_rule(broad_target, priority=10, name="broad", timeframe="", symbol="BTCUSDT", strategy="") specific_rule = self.add_rule(specific_target, priority=10, name="specific") result = self.dispatcher.receive_alert({"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout"}) self.assertEqual(result["matched_rule_id"], specific_rule) self.assertNotEqual(result["matched_rule_id"], broad_rule) def test_find_matching_rule_preview(self) -> None: target_id = self.add_target() rule_id = self.add_rule(target_id, timeframe="", symbol="BTCUSDT", strategy="") rule = self.dispatcher.find_matching_rule({"symbol": "btcusdt"}) self.assertIsNotNone(rule) self.assertEqual(rule["id"], rule_id) def test_failed_delivery_is_marked_for_retry(self) -> None: target_id = self.add_target() self.add_rule(target_id) result = self.dispatcher.receive_alert( {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"} ) with self.db.connect() as conn: delivery = conn.execute("SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone() self.assertEqual(delivery["status"], "retry") self.assertEqual(delivery["attempts"], 1) self.assertIsNotNone(delivery["error"]) def test_rule_can_dispatch_to_multiple_targets(self) -> None: target_a = self.add_target("ops-a") target_b = self.add_target("ops-b") self.add_rule_with_targets([target_a, target_b]) result = self.dispatcher.receive_alert( {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"} ) self.assertEqual(len(result["delivery_ids"]), 2) with self.db.connect() as conn: count = conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone()["c"] self.assertEqual(count, 2) def test_card_template_uses_alert_fields(self) -> None: message = build_feishu_message( {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000}, { "message_type": "card", "card_title_template": "{{symbol}} {{action}}", "card_body_template": "**价格** {{price}}", }, ) self.assertEqual(message["msg_type"], "interactive") self.assertEqual(message["card"]["header"]["title"]["content"], "BTCUSDT buy") self.assertEqual(message["card"]["elements"][0]["text"]["content"], "**价格** 68000") def test_template_accepts_legacy_single_braces_and_still_sends_card(self) -> None: message = build_feishu_message( {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000}, { "message_type": "text", "card_title_template": "TradingView {symbol} {action}", "card_body_template": "价格 {price}", }, ) self.assertEqual(message["msg_type"], "interactive") self.assertEqual(message["card"]["header"]["title"]["content"], "TradingView BTCUSDT buy") self.assertEqual(message["card"]["elements"][0]["text"]["content"], "价格 68000") if __name__ == "__main__": unittest.main()