diff --git a/README.md b/README.md index 1899142..24b8f36 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,17 @@ docker compose up --build Compose 会启动两个服务:`dispatcher` 负责 Web/API/管理台,`worker` 负责周期性处理失败重试。 +## Delivery Reliability + +系统采用“先入库、后发送”的至少一次投递模型: + +- TradingView webhook 命中规则后,会先写入 `alerts` 和 `deliveries`,再异步发送飞书。 +- 未发送的飞书会保留在 `deliveries`,状态为 `pending`、`processing`、`retry` 或 `failed`。 +- 每次实际发送尝试都会写入 `delivery_attempts`,包括 HTTP 状态、飞书返回体、错误原因和下次重试时间。 +- 发送失败会按 `RETRY_BACKOFF_SECONDS` 延迟重试,直到达到 `MAX_DELIVERY_ATTEMPTS`。 +- 飞书 HTTP 200 但响应体业务码非 0 时,也会按失败处理并进入重试。 +- 管理台「日志」页面可以查看未发送、重试中、失败的分发记录,并可手动立即重发。 + ## TradingView Payload ```json diff --git a/app/db.py b/app/db.py index 0c9b6ec..c484fc7 100644 --- a/app/db.py +++ b/app/db.py @@ -136,8 +136,26 @@ class Database: FOREIGN KEY(target_id) REFERENCES webhook_targets(id) ON DELETE SET NULL ); + CREATE TABLE IF NOT EXISTS delivery_attempts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + delivery_id INTEGER NOT NULL, + alert_id INTEGER NOT NULL, + attempt_no INTEGER NOT NULL, + status TEXT NOT NULL, + response_code INTEGER, + response_body TEXT, + error TEXT, + attempted_at TEXT NOT NULL, + next_attempt_at TEXT, + FOREIGN KEY(delivery_id) REFERENCES deliveries(id) ON DELETE CASCADE, + FOREIGN KEY(alert_id) REFERENCES alerts(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_deliveries_retry ON deliveries(status, next_attempt_at); + + CREATE INDEX IF NOT EXISTS idx_delivery_attempts_delivery + ON delivery_attempts(delivery_id, attempt_no); """ ) existing_columns = { diff --git a/app/dispatcher.py b/app/dispatcher.py index c8d1bb0..5d27de7 100644 --- a/app/dispatcher.py +++ b/app/dispatcher.py @@ -103,6 +103,30 @@ def build_feishu_message(alert: dict[str, Any], rule: dict[str, Any] | None = No } +def feishu_response_error(response_body: str | None) -> str | None: + if not response_body: + return None + try: + payload = json.loads(response_body) + except json.JSONDecodeError: + return None + if not isinstance(payload, dict): + return None + + code = payload.get("code", payload.get("StatusCode", payload.get("status_code"))) + if code in (None, 0, "0"): + return None + + message = ( + payload.get("msg") + or payload.get("message") + or payload.get("StatusMessage") + or payload.get("status_msg") + or "unknown error" + ) + return f"feishu_error: code={code}, message={message}" + + class Dispatcher: def __init__(self, db: Database, settings: Settings): self.db = db @@ -224,9 +248,11 @@ class Dispatcher: delivery_ids.append(int(delivery.lastrowid)) if rule and not delivery_ids: + status = "unmatched" + error = "Matched rule has no webhook targets." conn.execute( "UPDATE alerts SET status = ?, error = ? WHERE id = ?", - ("unmatched", "Matched rule has no webhook targets.", alert_id), + (status, error, alert_id), ) if self.settings.dispatch_inline: @@ -269,20 +295,21 @@ class Dispatcher: """, (now, limit), ).fetchall() - delivery_ids = [row["id"] for row in rows] - if delivery_ids: - placeholders = ",".join("?" for _ in delivery_ids) - conn.execute( - f""" + claimed_rows = [] + for row in rows: + cur = conn.execute( + """ UPDATE deliveries SET status = 'processing', updated_at = ? - WHERE id IN ({placeholders}) + WHERE id = ? AND status IN ('pending', 'retry') """, - (now, *delivery_ids), + (now, row["id"]), ) + if cur.rowcount == 1: + claimed_rows.append(row) - jobs = [(dict(row), from_json(row["payload"], {})) for row in rows] + jobs = [(dict(row), from_json(row["payload"], {})) for row in claimed_rows] if not jobs: return 0 worker_count = max(1, min(concurrency, len(jobs))) @@ -369,6 +396,10 @@ class Dispatcher: if response_code >= 400: status = "failed" error = f"Feishu webhook returned HTTP {response_code}" + else: + error = feishu_response_error(response_body) + if error: + status = "failed" except urllib.error.HTTPError as exc: response_code = exc.code response_body = exc.read(2048).decode(errors="replace") @@ -390,7 +421,28 @@ class Dispatcher: next_time = datetime.now(UTC) + timedelta(seconds=self.settings.retry_backoff_seconds * attempts) next_attempt_at = next_time.replace(microsecond=0).isoformat() + attempted_at = now_iso() with self.db.connect() as conn: + conn.execute( + """ + INSERT INTO delivery_attempts ( + delivery_id, alert_id, attempt_no, status, response_code, + response_body, error, attempted_at, next_attempt_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + delivery["id"], + delivery["alert_id"], + attempts, + status, + response_code, + response_body, + error, + attempted_at, + next_attempt_at, + ), + ) conn.execute( """ UPDATE deliveries @@ -402,11 +454,11 @@ class Dispatcher: status, attempts, next_attempt_at, - now_iso(), + attempted_at, response_code, response_body, error, - now_iso(), + attempted_at, delivery["id"], ), ) diff --git a/app/server.py b/app/server.py index 30354eb..7809530 100644 --- a/app/server.py +++ b/app/server.py @@ -605,7 +605,17 @@ class Handler(BaseHTTPRequestHandler): ).fetchall() else: deliveries = conn.execute( - f"SELECT * FROM deliveries {delivery_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?", + f""" + SELECT d.*, + ( + SELECT COUNT(*) + FROM delivery_attempts da + WHERE da.delivery_id = d.id + ) AS attempt_records + FROM deliveries d + {delivery_where_sql} + ORDER BY d.id DESC LIMIT ? OFFSET ? + """, (*delivery_params, page_size, offset), ).fetchall() return { @@ -619,7 +629,12 @@ class Handler(BaseHTTPRequestHandler): with self.context.db.connect() as conn: alerts = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 100").fetchall() deliveries = conn.execute("SELECT * FROM deliveries ORDER BY id DESC LIMIT 200").fetchall() - return {"alerts": [dict(row) for row in alerts], "deliveries": [dict(row) for row in deliveries]} + attempts = conn.execute("SELECT * FROM delivery_attempts ORDER BY id DESC LIMIT 200").fetchall() + return { + "alerts": [dict(row) for row in alerts], + "deliveries": [dict(row) for row in deliveries], + "delivery_attempts": [dict(row) for row in attempts], + } def render_dashboard(self) -> None: host = self.headers.get("Host", f"localhost:{self.context.settings.port}") @@ -927,18 +942,18 @@ class Handler(BaseHTTPRequestHandler):
查看
{html.escape(raw_payload)}
""" delivery_rows = "".join( - f"""{row['id']}{row['alert_id']}{html.escape(row['target_name'])}{html.escape(row['status'])}{row['attempts']}{html.escape(str(row['response_code'] or ''))}{format_display_time(self.context.settings, row['last_attempt_at'])}{html.escape(row['error'] or '')}{format_display_time(self.context.settings, row['next_attempt_at'])}
""" + f"""{row['id']}{row['alert_id']}{html.escape(row['target_name'])}{html.escape(row['status'])}{row['attempts']}{row.get('attempt_records', row['attempts'])}{html.escape(str(row['response_code'] or ''))}{format_display_time(self.context.settings, row['last_attempt_at'])}{html.escape(row['error'] or '')}{format_display_time(self.context.settings, row['next_attempt_at'])}
""" for row in logs["deliveries"] ) alert_empty = '暂无 Alert 日志' - delivery_empty = '暂无分发日志' + delivery_empty = '暂无分发日志' alert_active = " active" if active_tab == "alerts" else "" delivery_active = " active" if active_tab == "deliveries" else "" active_table = ( f"""{alert_rows or alert_empty}
ID品种周期策略状态错误时间原始 Alert
{render_pagination("/logs", "alerts", page, logs["alert_total"], LOG_PAGE_SIZE, filters)}""" if active_tab == "alerts" - else f"""{delivery_rows or delivery_empty}
IDAlert目标状态次数HTTP发送时间错误下次重试操作
+ else f"""{delivery_rows or delivery_empty}
IDAlert目标状态次数明细HTTP发送时间错误下次重试操作
{render_pagination("/logs", "deliveries", page, logs["delivery_total"], LOG_PAGE_SIZE, filters)}""" ) timezone_label = html.escape(display_timezone_label(self.context.settings)) diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py index c3d369d..7464685 100644 --- a/tests/test_dispatcher.py +++ b/tests/test_dispatcher.py @@ -3,6 +3,7 @@ from __future__ import annotations import os import tempfile import unittest +from unittest.mock import patch from app.config import Settings from app.db import Database, now_iso, to_json @@ -163,6 +164,69 @@ class DispatcherTest(unittest.TestCase): self.assertEqual(delivery["attempts"], 1) self.assertTrue(delivery["error"].startswith("network_error:") or delivery["error"].startswith("send_error:")) + def test_feishu_business_error_is_retried_even_with_http_200(self) -> None: + class Response: + def __enter__(self) -> "Response": + return self + + def __exit__(self, exc_type: object, exc: object, traceback: object) -> None: + return None + + def getcode(self) -> int: + return 200 + + def read(self, size: int = -1) -> bytes: + return b'{"code":9499,"msg":"bad sign"}' + + self.add_rule(self.add_target(url="https://open.feishu.cn/open-apis/bot/v2/hook/test")) + + with patch("urllib.request.urlopen", return_value=Response()): + result = self.dispatcher.receive_alert( + {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"} + ) + processed = self.dispatcher.process_due_deliveries(limit=10) + + with self.db.connect() as conn: + delivery = conn.execute("SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone() + attempt = conn.execute( + "SELECT * FROM delivery_attempts WHERE delivery_id = ?", + (delivery["id"],), + ).fetchone() + self.assertEqual(processed, 1) + self.assertEqual(delivery["status"], "retry") + self.assertEqual(delivery["attempts"], 1) + self.assertIn("feishu_error: code=9499", delivery["error"]) + self.assertEqual(attempt["attempt_no"], 1) + self.assertEqual(attempt["status"], "retry") + self.assertIn("feishu_error: code=9499", attempt["error"]) + + def test_retry_attempts_are_recorded_individually(self) -> None: + self.add_rule(self.add_target(url="https://open.feishu.cn/open-apis/bot/v2/hook/test")) + + with patch("urllib.request.urlopen", side_effect=TimeoutError("timed out")): + result = self.dispatcher.receive_alert( + {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"} + ) + self.dispatcher.process_due_deliveries(limit=10) + with self.db.connect() as conn: + delivery = conn.execute( + "SELECT * FROM deliveries WHERE alert_id = ?", + (result["alert_id"],), + ).fetchone() + self.dispatcher.retry_delivery_now(delivery["id"]) + + with self.db.connect() as conn: + delivery = conn.execute("SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone() + attempts = conn.execute( + "SELECT * FROM delivery_attempts WHERE delivery_id = ? ORDER BY attempt_no", + (delivery["id"],), + ).fetchall() + + self.assertEqual(delivery["status"], "failed") + self.assertEqual(delivery["attempts"], 2) + self.assertEqual([row["attempt_no"] for row in attempts], [1, 2]) + self.assertEqual([row["status"] for row in attempts], ["retry", "failed"]) + def test_rule_can_dispatch_to_multiple_targets(self) -> None: target_a = self.add_target("ops-a") target_b = self.add_target("ops-b") @@ -177,6 +241,20 @@ class DispatcherTest(unittest.TestCase): count = conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone()["c"] self.assertEqual(count, 2) + def test_rule_with_missing_targets_returns_unmatched(self) -> None: + self.add_rule_with_targets([999]) + + result = self.dispatcher.receive_alert( + {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"} + ) + + with self.db.connect() as conn: + alert = conn.execute("SELECT * FROM alerts WHERE id = ?", (result["alert_id"],)).fetchone() + self.assertEqual(result["status"], "unmatched") + self.assertEqual(result["delivery_ids"], []) + self.assertEqual(alert["status"], "unmatched") + self.assertEqual(alert["error"], "Matched rule has no webhook targets.") + def test_legacy_disabled_target_is_still_dispatchable(self) -> None: target_id = self.add_target() with self.db.connect() as conn: