From e86c52801383b08bb33bd8d85a9662bd8305ebe0 Mon Sep 17 00:00:00 2001
From: aaron <>
Date: Mon, 18 May 2026 20:50:16 +0800
Subject: [PATCH] 1
---
.env.example | 3 +
app/config.py | 8 ++
app/db.py | 6 ++
app/dispatcher.py | 153 +++++++++++++++++++++++++---
app/server.py | 209 +++++++++++++++++++++++++++++++++++----
app/static/styles.css | 32 +++++-
app/worker.py | 25 ++++-
docker-compose.yml | 8 ++
tests/test_dispatcher.py | 32 +++++-
9 files changed, 433 insertions(+), 43 deletions(-)
diff --git a/.env.example b/.env.example
index e8d8222..4f52b7c 100644
--- a/.env.example
+++ b/.env.example
@@ -16,4 +16,7 @@ RETENTION_DAYS=30
MAX_DELIVERY_ATTEMPTS=3
RETRY_BACKOFF_SECONDS=60
FEISHU_TIMEOUT_SECONDS=10
+DISPATCH_INLINE=false
+DELIVERY_BATCH_SIZE=100
+DELIVERY_CONCURRENCY=5
WORKER_INTERVAL_SECONDS=15
diff --git a/app/config.py b/app/config.py
index d8f685d..fb8907f 100644
--- a/app/config.py
+++ b/app/config.py
@@ -19,6 +19,10 @@ class Settings:
retry_backoff_seconds: int = 60
feishu_timeout_seconds: int = 10
timezone: str = ""
+ dispatch_inline: bool = False
+ delivery_batch_size: int = 100
+ delivery_concurrency: int = 5
+ worker_interval_seconds: int = 15
def get_settings() -> Settings:
@@ -35,4 +39,8 @@ def get_settings() -> Settings:
retry_backoff_seconds=int(os.getenv("RETRY_BACKOFF_SECONDS", "60")),
feishu_timeout_seconds=int(os.getenv("FEISHU_TIMEOUT_SECONDS", "10")),
timezone=os.getenv("APP_TIMEZONE") or os.getenv("TZ", ""),
+ dispatch_inline=os.getenv("DISPATCH_INLINE", "").lower() in {"1", "true", "yes", "on"},
+ delivery_batch_size=int(os.getenv("DELIVERY_BATCH_SIZE", "100")),
+ delivery_concurrency=int(os.getenv("DELIVERY_CONCURRENCY", "5")),
+ worker_interval_seconds=int(os.getenv("WORKER_INTERVAL_SECONDS", "15")),
)
diff --git a/app/db.py b/app/db.py
index fde24a3..0c9b6ec 100644
--- a/app/db.py
+++ b/app/db.py
@@ -91,6 +91,12 @@ class Database:
updated_at TEXT NOT NULL
);
+ CREATE TABLE IF NOT EXISTS app_state (
+ key TEXT PRIMARY KEY,
+ value TEXT NOT NULL,
+ updated_at TEXT NOT NULL
+ );
+
CREATE INDEX IF NOT EXISTS idx_rules_match
ON routing_rules(enabled, timeframe, symbol, strategy, priority);
diff --git a/app/dispatcher.py b/app/dispatcher.py
index b8cd415..c8d1bb0 100644
--- a/app/dispatcher.py
+++ b/app/dispatcher.py
@@ -2,8 +2,10 @@ from __future__ import annotations
import json
import re
+import socket
import urllib.error
import urllib.request
+from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from typing import Any
@@ -130,13 +132,43 @@ class Dispatcher:
).fetchone()
return dict(row) if row else None
+ def explain_unmatched_rule(self, alert: dict[str, Any]) -> str:
+ normalized = normalize_alert(alert)
+ with self.db.connect() as conn:
+ candidates = conn.execute(
+ """
+ SELECT id, name, timeframe, symbol, strategy, enabled, priority
+ FROM routing_rules
+ ORDER BY enabled DESC, priority ASC, id ASC
+ LIMIT 8
+ """
+ ).fetchall()
+ if not candidates:
+ return "No routing rules configured."
+
+ details = []
+ for row in candidates:
+ mismatches = []
+ if not row["enabled"]:
+ mismatches.append("rule disabled")
+ if row["timeframe"] and row["timeframe"] != normalized["timeframe"]:
+ mismatches.append(f"timeframe {normalized['timeframe'] or '-'} != {row['timeframe']}")
+ if row["symbol"] and row["symbol"].upper() != normalized["symbol"]:
+ mismatches.append(f"symbol {normalized['symbol'] or '-'} != {row['symbol']}")
+ if row["strategy"] and row["strategy"] != normalized["strategy"]:
+ mismatches.append(f"strategy {normalized['strategy'] or '-'} != {row['strategy']}")
+ reason = "; ".join(mismatches) if mismatches else "matched fields but was not selected"
+ details.append(f"#{row['id']} {row['name']}: {reason}")
+ return "No enabled routing rule matched this alert. Closest rules: " + " | ".join(details)
+
def receive_alert(self, payload: dict[str, Any]) -> dict[str, Any]:
alert = normalize_alert(payload)
created_at = now_iso()
with self.db.connect() as conn:
rule = self.find_matching_rule(alert)
- status = "matched" if rule else "unmatched"
+ status = "queued" if rule else "unmatched"
+ error = None if rule else self.explain_unmatched_rule(alert)
cur = conn.execute(
"""
INSERT INTO alerts (
@@ -154,7 +186,7 @@ class Dispatcher:
to_json(alert),
rule["id"] if rule else None,
status,
- None if rule else "No enabled routing rule matched this alert.",
+ error,
created_at,
),
)
@@ -184,7 +216,7 @@ class Dispatcher:
target["id"],
target["name"],
target["webhook_url"],
- created_at,
+ None,
created_at,
created_at,
),
@@ -197,7 +229,11 @@ class Dispatcher:
("unmatched", "Matched rule has no webhook targets.", alert_id),
)
- self.process_due_deliveries()
+ if self.settings.dispatch_inline:
+ self.process_due_deliveries(
+ limit=self.settings.delivery_batch_size,
+ concurrency=self.settings.delivery_concurrency,
+ )
return {
"alert_id": alert_id,
"status": status,
@@ -205,9 +241,21 @@ class Dispatcher:
"delivery_ids": delivery_ids,
}
- def process_due_deliveries(self, limit: int = 25) -> int:
+ def process_due_deliveries(self, limit: int = 25, concurrency: int = 1) -> int:
now = now_iso()
+ stale_cutoff = (datetime.now(UTC) - timedelta(seconds=max(300, self.settings.retry_backoff_seconds))).replace(
+ microsecond=0
+ ).isoformat()
with self.db.connect() as conn:
+ conn.execute(
+ """
+ UPDATE deliveries
+ SET status = 'retry', next_attempt_at = ?, updated_at = ?,
+ error = COALESCE(error, 'worker_recovered: stale processing task reset')
+ WHERE status = 'processing' AND updated_at <= ?
+ """,
+ (now, now, stale_cutoff),
+ )
rows = conn.execute(
"""
SELECT d.*, a.payload, r.card_title_template, r.card_body_template
@@ -220,15 +268,84 @@ class Dispatcher:
LIMIT ?
""",
(now, limit),
- ).fetchall()
+ ).fetchall()
+ delivery_ids = [row["id"] for row in rows]
+ if delivery_ids:
+ placeholders = ",".join("?" for _ in delivery_ids)
+ conn.execute(
+ f"""
+ UPDATE deliveries
+ SET status = 'processing', updated_at = ?
+ WHERE id IN ({placeholders})
+ AND status IN ('pending', 'retry')
+ """,
+ (now, *delivery_ids),
+ )
- processed = 0
- for row in rows:
- delivery = dict(row)
- payload = from_json(delivery["payload"], {})
- self._send_delivery(delivery, payload)
- processed += 1
- return processed
+ jobs = [(dict(row), from_json(row["payload"], {})) for row in rows]
+ if not jobs:
+ return 0
+ worker_count = max(1, min(concurrency, len(jobs)))
+ if worker_count == 1:
+ for delivery, payload in jobs:
+ self._send_delivery(delivery, payload)
+ else:
+ with ThreadPoolExecutor(max_workers=worker_count) as executor:
+ futures = [executor.submit(self._send_delivery, delivery, payload) for delivery, payload in jobs]
+ for future in futures:
+ future.result()
+ return len(jobs)
+
+ def process_delivery_by_id(self, delivery_id: int) -> bool:
+ now = now_iso()
+ with self.db.connect() as conn:
+ row = conn.execute(
+ """
+ SELECT d.*, a.payload, r.card_title_template, r.card_body_template
+ FROM deliveries d
+ JOIN alerts a ON a.id = d.alert_id
+ LEFT JOIN routing_rules r ON r.id = d.rule_id
+ WHERE d.id = ?
+ """,
+ (delivery_id,),
+ ).fetchone()
+ if not row or row["status"] == "sent":
+ return False
+ conn.execute(
+ """
+ UPDATE deliveries
+ SET status = 'processing', next_attempt_at = NULL, updated_at = ?
+ WHERE id = ?
+ """,
+ (now, delivery_id),
+ )
+ delivery = dict(row)
+ payload = from_json(delivery["payload"], {})
+ self._send_delivery(delivery, payload)
+ return True
+
+ def retry_delivery_now(self, delivery_id: int) -> bool:
+ now = now_iso()
+ with self.db.connect() as conn:
+ row = conn.execute(
+ """
+ SELECT status
+ FROM deliveries
+ WHERE id = ?
+ """,
+ (delivery_id,),
+ ).fetchone()
+ if not row or row["status"] == "sent":
+ return False
+ conn.execute(
+ """
+ UPDATE deliveries
+ SET status = 'pending', next_attempt_at = ?, updated_at = ?
+ WHERE id = ?
+ """,
+ (now, now, delivery_id),
+ )
+ return self.process_delivery_by_id(delivery_id)
def _send_delivery(self, delivery: dict[str, Any], alert: dict[str, Any]) -> None:
attempts = int(delivery["attempts"]) + 1
@@ -256,10 +373,16 @@ class Dispatcher:
response_code = exc.code
response_body = exc.read(2048).decode(errors="replace")
status = "failed"
- error = f"Feishu webhook returned HTTP {exc.code}"
+ error = f"http_error: Feishu webhook returned HTTP {exc.code}"
+ except (TimeoutError, socket.timeout) as exc:
+ status = "failed"
+ error = f"timeout: {exc}"
+ except urllib.error.URLError as exc:
+ status = "failed"
+ error = f"network_error: {exc.reason}"
except Exception as exc:
status = "failed"
- error = str(exc)
+ error = f"send_error: {exc}"
next_attempt_at = None
if status == "failed" and attempts < self.settings.max_delivery_attempts:
diff --git a/app/server.py b/app/server.py
index 4cbcfdd..346e69a 100644
--- a/app/server.py
+++ b/app/server.py
@@ -123,18 +123,30 @@ def page_window(current: int, total_pages: int) -> list[int]:
return list(range(start, end + 1))
-def render_pagination(base_path: str, active_tab: str, page: int, total: int, page_size: int) -> str:
+def render_pagination(
+ base_path: str,
+ active_tab: str,
+ page: int,
+ total: int,
+ page_size: int,
+ filters: dict[str, str] | None = None,
+) -> str:
total_pages = max(1, (total + page_size - 1) // page_size)
if total_pages <= 1:
return f'
'
+ filters = filters or {}
def item(label: str, target_page: int, disabled: bool = False, current: bool = False) -> str:
if disabled:
return f'{html.escape(label)}'
active_class = " current" if current else ""
+ query = {"tab": active_tab, "page": str(target_page), **filters}
+ query_string = "&".join(
+ f"{html.escape(key)}={html.escape(value)}" for key, value in query.items() if value
+ )
return (
f'{html.escape(label)}'
+ f'href="{base_path}?{query_string}">{html.escape(label)}'
)
links = [
@@ -177,6 +189,61 @@ def format_display_time(settings: Settings, value: str | None) -> str:
return parsed.astimezone(display_timezone(settings)).strftime("%Y-%m-%d %H:%M:%S")
+def filter_value(query: dict[str, list[str]], key: str) -> str:
+ return query.get(key, [""])[-1].strip()
+
+
+def status_select(name: str, selected: str, options: list[tuple[str, str]]) -> str:
+ items = ['']
+ for value, label in options:
+ checked = " selected" if selected == value else ""
+ items.append(f'')
+ return f''
+
+
+def log_filter_form(active_tab: str, filters: dict[str, str]) -> str:
+ symbol = html.escape(filters.get("symbol", ""))
+ strategy = html.escape(filters.get("strategy", ""))
+ status = filters.get("status", "")
+ target = html.escape(filters.get("target", ""))
+ if active_tab == "alerts":
+ alert_status = status_select(
+ "status",
+ status,
+ [
+ ("queued", "已入队"),
+ ("unmatched", "未命中"),
+ ("partial", "部分完成"),
+ ("delivered", "已送达"),
+ ],
+ )
+ fields = f"""
+
+
+"""
+ else:
+ delivery_status = status_select(
+ "status",
+ status,
+ [
+ ("pending", "待发送"),
+ ("processing", "发送中"),
+ ("retry", "重试中"),
+ ("failed", "失败"),
+ ("sent", "已发送"),
+ ],
+ )
+ fields = f"""
+
+"""
+ return f""""""
+
+
class Handler(BaseHTTPRequestHandler):
context: AppContext
@@ -186,7 +253,7 @@ class Handler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/health":
- json_response(self, 200, {"ok": True})
+ self.handle_health()
return
if parsed.path == "/login":
self.render_login()
@@ -257,6 +324,7 @@ class Handler(BaseHTTPRequestHandler):
"/test/send": self.send_test,
"/account/password": self.change_password,
"/deliveries/retry": self.retry_deliveries,
+ "/deliveries/retry-one": self.retry_delivery_one,
"/logout": self.logout,
}
handler = routes.get(parsed.path)
@@ -378,6 +446,51 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(content)
+ def handle_health(self) -> None:
+ now = datetime.now(timezone.utc)
+ try:
+ with self.context.db.connect() as conn:
+ conn.execute("SELECT 1").fetchone()
+ counts = {
+ "pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"],
+ "retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"],
+ "processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"],
+ "failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"],
+ }
+ worker_row = conn.execute(
+ "SELECT value FROM app_state WHERE key = 'worker.last_seen_at'"
+ ).fetchone()
+ except Exception as exc:
+ json_response(self, 503, {"ok": False, "database": "error", "error": str(exc)})
+ return
+
+ worker_last_seen = worker_row["value"] if worker_row else None
+ worker_fresh = False
+ if worker_last_seen:
+ try:
+ parsed = datetime.fromisoformat(worker_last_seen)
+ if parsed.tzinfo is None:
+ parsed = parsed.replace(tzinfo=timezone.utc)
+ max_age = max(60, self.context.settings.worker_interval_seconds * 3)
+ worker_fresh = (now - parsed.astimezone(timezone.utc)).total_seconds() <= max_age
+ except ValueError:
+ worker_fresh = False
+ json_response(
+ self,
+ 200,
+ {
+ "ok": True,
+ "database": "ok",
+ "queue": counts,
+ "worker": {
+ "last_seen_at": worker_last_seen,
+ "fresh": worker_fresh,
+ "interval_seconds": self.context.settings.worker_interval_seconds,
+ },
+ "dispatch_inline": self.context.settings.dispatch_inline,
+ },
+ )
+
def handle_tradingview_webhook(self) -> None:
if self.context.settings.webhook_token:
query = parse_qs(urlparse(self.path).query)
@@ -417,22 +530,53 @@ class Handler(BaseHTTPRequestHandler):
active_tab: str = "alerts",
page: int = 1,
page_size: int = LOG_PAGE_SIZE,
+ filters: dict[str, str] | None = None,
) -> dict[str, Any]:
+ filters = filters or {}
offset = (page - 1) * page_size
with self.context.db.connect() as conn:
- alert_total = conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"]
- delivery_total = conn.execute("SELECT COUNT(*) AS c FROM deliveries").fetchone()["c"]
+ alert_where = []
+ alert_params: list[Any] = []
+ if filters.get("symbol"):
+ alert_where.append("symbol = ?")
+ alert_params.append(filters["symbol"].upper())
+ if filters.get("strategy"):
+ alert_where.append("strategy = ?")
+ alert_params.append(filters["strategy"])
+ if filters.get("status"):
+ alert_where.append("status = ?")
+ alert_params.append(filters["status"])
+ alert_where_sql = f"WHERE {' AND '.join(alert_where)}" if alert_where else ""
+
+ delivery_where = []
+ delivery_params: list[Any] = []
+ if filters.get("status"):
+ delivery_where.append("status = ?")
+ delivery_params.append(filters["status"])
+ if filters.get("target"):
+ delivery_where.append("target_name LIKE ?")
+ delivery_params.append(f"%{filters['target']}%")
+ delivery_where_sql = f"WHERE {' AND '.join(delivery_where)}" if delivery_where else ""
+
+ alert_total = conn.execute(
+ f"SELECT COUNT(*) AS c FROM alerts {alert_where_sql}",
+ alert_params,
+ ).fetchone()["c"]
+ delivery_total = conn.execute(
+ f"SELECT COUNT(*) AS c FROM deliveries {delivery_where_sql}",
+ delivery_params,
+ ).fetchone()["c"]
alerts = []
deliveries = []
if active_tab == "alerts":
alerts = conn.execute(
- "SELECT * FROM alerts ORDER BY id DESC LIMIT ? OFFSET ?",
- (page_size, offset),
+ f"SELECT * FROM alerts {alert_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?",
+ (*alert_params, page_size, offset),
).fetchall()
else:
deliveries = conn.execute(
- "SELECT * FROM deliveries ORDER BY id DESC LIMIT ? OFFSET ?",
- (page_size, offset),
+ f"SELECT * FROM deliveries {delivery_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?",
+ (*delivery_params, page_size, offset),
).fetchall()
return {
"alerts": [dict(row) for row in alerts],
@@ -471,14 +615,20 @@ class Handler(BaseHTTPRequestHandler):
"alerts": conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"],
"rules": conn.execute("SELECT COUNT(*) AS c FROM routing_rules").fetchone()["c"],
"targets": conn.execute("SELECT COUNT(*) AS c FROM webhook_targets").fetchone()["c"],
- "pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status IN ('pending','retry')").fetchone()["c"],
+ "pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"],
+ "processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"],
+ "retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"],
+ "failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"],
}
recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall()
cards = "".join(f'{label}{value}
' for label, value in [
("Alerts", counts["alerts"]),
("Rules", counts["rules"]),
("Targets", counts["targets"]),
- ("Pending", counts["pending"]),
+ ("待发送", counts["pending"]),
+ ("发送中", counts["processing"]),
+ ("重试中", counts["retry"]),
+ ("失败", counts["failed"]),
])
rows = "".join(
f"| {row['id']} | {html.escape(row['symbol'])} | {html.escape(row['timeframe'])} | {html.escape(row['strategy'])} | {html.escape(row['status'])} | {format_display_time(self.context.settings, row['created_at'])} |
"
@@ -709,12 +859,18 @@ class Handler(BaseHTTPRequestHandler):
if active_tab not in {"alerts", "deliveries"}:
active_tab = "alerts"
page = parse_positive_int(query.get("page", ["1"])[-1])
+ filters = {
+ "symbol": filter_value(query, "symbol"),
+ "strategy": filter_value(query, "strategy"),
+ "status": filter_value(query, "status"),
+ "target": filter_value(query, "target"),
+ }
total_key = "alert_total" if active_tab == "alerts" else "delivery_total"
- logs = self.list_logs(active_tab=active_tab, page=page)
+ logs = self.list_logs(active_tab=active_tab, page=page, filters=filters)
total_pages = max(1, (logs[total_key] + LOG_PAGE_SIZE - 1) // LOG_PAGE_SIZE)
if page > total_pages:
page = total_pages
- logs = self.list_logs(active_tab=active_tab, page=page)
+ logs = self.list_logs(active_tab=active_tab, page=page, filters=filters)
alert_rows = ""
for row in logs["alerts"]:
try:
@@ -733,19 +889,19 @@ class Handler(BaseHTTPRequestHandler):
查看{html.escape(raw_payload)} |
"""
delivery_rows = "".join(
- f"| {row['id']} | {row['alert_id']} | {html.escape(row['target_name'])} | {html.escape(row['status'])} | {row['attempts']} | {html.escape(str(row['response_code'] or ''))} | {format_display_time(self.context.settings, row['last_attempt_at'])} | {html.escape(row['error'] or '')} | {format_display_time(self.context.settings, row['next_attempt_at'])} |
"
+ f"""| {row['id']} | {row['alert_id']} | {html.escape(row['target_name'])} | {html.escape(row['status'])} | {row['attempts']} | {html.escape(str(row['response_code'] or ''))} | {format_display_time(self.context.settings, row['last_attempt_at'])} | {html.escape(row['error'] or '')} | {format_display_time(self.context.settings, row['next_attempt_at'])} | |
"""
for row in logs["deliveries"]
)
alert_empty = '| 暂无 Alert 日志 |
'
- delivery_empty = '| 暂无分发日志 |
'
+ delivery_empty = '| 暂无分发日志 |
'
alert_active = " active" if active_tab == "alerts" else ""
delivery_active = " active" if active_tab == "deliveries" else ""
active_table = (
f"""| ID | 品种 | 周期 | 策略 | 状态 | 错误 | 时间 | 原始 Alert |
{alert_rows or alert_empty}
-{render_pagination("/logs", "alerts", page, logs["alert_total"], LOG_PAGE_SIZE)}"""
+{render_pagination("/logs", "alerts", page, logs["alert_total"], LOG_PAGE_SIZE, filters)}"""
if active_tab == "alerts"
- else f"""| ID | Alert | 目标 | 状态 | 次数 | HTTP | 发送时间 | 错误 | 下次重试 |
{delivery_rows or delivery_empty}
-{render_pagination("/logs", "deliveries", page, logs["delivery_total"], LOG_PAGE_SIZE)}"""
+ else f"""| ID | Alert | 目标 | 状态 | 次数 | HTTP | 发送时间 | 错误 | 下次重试 | 操作 |
{delivery_rows or delivery_empty}
+{render_pagination("/logs", "deliveries", page, logs["delivery_total"], LOG_PAGE_SIZE, filters)}"""
)
timezone_label = html.escape(display_timezone_label(self.context.settings))
body = f"""