diff --git a/.env.example b/.env.example index e8d8222..4f52b7c 100644 --- a/.env.example +++ b/.env.example @@ -16,4 +16,7 @@ RETENTION_DAYS=30 MAX_DELIVERY_ATTEMPTS=3 RETRY_BACKOFF_SECONDS=60 FEISHU_TIMEOUT_SECONDS=10 +DISPATCH_INLINE=false +DELIVERY_BATCH_SIZE=100 +DELIVERY_CONCURRENCY=5 WORKER_INTERVAL_SECONDS=15 diff --git a/app/config.py b/app/config.py index d8f685d..fb8907f 100644 --- a/app/config.py +++ b/app/config.py @@ -19,6 +19,10 @@ class Settings: retry_backoff_seconds: int = 60 feishu_timeout_seconds: int = 10 timezone: str = "" + dispatch_inline: bool = False + delivery_batch_size: int = 100 + delivery_concurrency: int = 5 + worker_interval_seconds: int = 15 def get_settings() -> Settings: @@ -35,4 +39,8 @@ def get_settings() -> Settings: retry_backoff_seconds=int(os.getenv("RETRY_BACKOFF_SECONDS", "60")), feishu_timeout_seconds=int(os.getenv("FEISHU_TIMEOUT_SECONDS", "10")), timezone=os.getenv("APP_TIMEZONE") or os.getenv("TZ", ""), + dispatch_inline=os.getenv("DISPATCH_INLINE", "").lower() in {"1", "true", "yes", "on"}, + delivery_batch_size=int(os.getenv("DELIVERY_BATCH_SIZE", "100")), + delivery_concurrency=int(os.getenv("DELIVERY_CONCURRENCY", "5")), + worker_interval_seconds=int(os.getenv("WORKER_INTERVAL_SECONDS", "15")), ) diff --git a/app/db.py b/app/db.py index fde24a3..0c9b6ec 100644 --- a/app/db.py +++ b/app/db.py @@ -91,6 +91,12 @@ class Database: updated_at TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS app_state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_rules_match ON routing_rules(enabled, timeframe, symbol, strategy, priority); diff --git a/app/dispatcher.py b/app/dispatcher.py index b8cd415..c8d1bb0 100644 --- a/app/dispatcher.py +++ b/app/dispatcher.py @@ -2,8 +2,10 @@ from __future__ import annotations import json import re +import socket import urllib.error import urllib.request +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta, timezone from typing import Any @@ -130,13 +132,43 @@ class Dispatcher: ).fetchone() return dict(row) if row else None + def explain_unmatched_rule(self, alert: dict[str, Any]) -> str: + normalized = normalize_alert(alert) + with self.db.connect() as conn: + candidates = conn.execute( + """ + SELECT id, name, timeframe, symbol, strategy, enabled, priority + FROM routing_rules + ORDER BY enabled DESC, priority ASC, id ASC + LIMIT 8 + """ + ).fetchall() + if not candidates: + return "No routing rules configured." + + details = [] + for row in candidates: + mismatches = [] + if not row["enabled"]: + mismatches.append("rule disabled") + if row["timeframe"] and row["timeframe"] != normalized["timeframe"]: + mismatches.append(f"timeframe {normalized['timeframe'] or '-'} != {row['timeframe']}") + if row["symbol"] and row["symbol"].upper() != normalized["symbol"]: + mismatches.append(f"symbol {normalized['symbol'] or '-'} != {row['symbol']}") + if row["strategy"] and row["strategy"] != normalized["strategy"]: + mismatches.append(f"strategy {normalized['strategy'] or '-'} != {row['strategy']}") + reason = "; ".join(mismatches) if mismatches else "matched fields but was not selected" + details.append(f"#{row['id']} {row['name']}: {reason}") + return "No enabled routing rule matched this alert. Closest rules: " + " | ".join(details) + def receive_alert(self, payload: dict[str, Any]) -> dict[str, Any]: alert = normalize_alert(payload) created_at = now_iso() with self.db.connect() as conn: rule = self.find_matching_rule(alert) - status = "matched" if rule else "unmatched" + status = "queued" if rule else "unmatched" + error = None if rule else self.explain_unmatched_rule(alert) cur = conn.execute( """ INSERT INTO alerts ( @@ -154,7 +186,7 @@ class Dispatcher: to_json(alert), rule["id"] if rule else None, status, - None if rule else "No enabled routing rule matched this alert.", + error, created_at, ), ) @@ -184,7 +216,7 @@ class Dispatcher: target["id"], target["name"], target["webhook_url"], - created_at, + None, created_at, created_at, ), @@ -197,7 +229,11 @@ class Dispatcher: ("unmatched", "Matched rule has no webhook targets.", alert_id), ) - self.process_due_deliveries() + if self.settings.dispatch_inline: + self.process_due_deliveries( + limit=self.settings.delivery_batch_size, + concurrency=self.settings.delivery_concurrency, + ) return { "alert_id": alert_id, "status": status, @@ -205,9 +241,21 @@ class Dispatcher: "delivery_ids": delivery_ids, } - def process_due_deliveries(self, limit: int = 25) -> int: + def process_due_deliveries(self, limit: int = 25, concurrency: int = 1) -> int: now = now_iso() + stale_cutoff = (datetime.now(UTC) - timedelta(seconds=max(300, self.settings.retry_backoff_seconds))).replace( + microsecond=0 + ).isoformat() with self.db.connect() as conn: + conn.execute( + """ + UPDATE deliveries + SET status = 'retry', next_attempt_at = ?, updated_at = ?, + error = COALESCE(error, 'worker_recovered: stale processing task reset') + WHERE status = 'processing' AND updated_at <= ? + """, + (now, now, stale_cutoff), + ) rows = conn.execute( """ SELECT d.*, a.payload, r.card_title_template, r.card_body_template @@ -220,15 +268,84 @@ class Dispatcher: LIMIT ? """, (now, limit), - ).fetchall() + ).fetchall() + delivery_ids = [row["id"] for row in rows] + if delivery_ids: + placeholders = ",".join("?" for _ in delivery_ids) + conn.execute( + f""" + UPDATE deliveries + SET status = 'processing', updated_at = ? + WHERE id IN ({placeholders}) + AND status IN ('pending', 'retry') + """, + (now, *delivery_ids), + ) - processed = 0 - for row in rows: - delivery = dict(row) - payload = from_json(delivery["payload"], {}) - self._send_delivery(delivery, payload) - processed += 1 - return processed + jobs = [(dict(row), from_json(row["payload"], {})) for row in rows] + if not jobs: + return 0 + worker_count = max(1, min(concurrency, len(jobs))) + if worker_count == 1: + for delivery, payload in jobs: + self._send_delivery(delivery, payload) + else: + with ThreadPoolExecutor(max_workers=worker_count) as executor: + futures = [executor.submit(self._send_delivery, delivery, payload) for delivery, payload in jobs] + for future in futures: + future.result() + return len(jobs) + + def process_delivery_by_id(self, delivery_id: int) -> bool: + now = now_iso() + with self.db.connect() as conn: + row = conn.execute( + """ + SELECT d.*, a.payload, r.card_title_template, r.card_body_template + FROM deliveries d + JOIN alerts a ON a.id = d.alert_id + LEFT JOIN routing_rules r ON r.id = d.rule_id + WHERE d.id = ? + """, + (delivery_id,), + ).fetchone() + if not row or row["status"] == "sent": + return False + conn.execute( + """ + UPDATE deliveries + SET status = 'processing', next_attempt_at = NULL, updated_at = ? + WHERE id = ? + """, + (now, delivery_id), + ) + delivery = dict(row) + payload = from_json(delivery["payload"], {}) + self._send_delivery(delivery, payload) + return True + + def retry_delivery_now(self, delivery_id: int) -> bool: + now = now_iso() + with self.db.connect() as conn: + row = conn.execute( + """ + SELECT status + FROM deliveries + WHERE id = ? + """, + (delivery_id,), + ).fetchone() + if not row or row["status"] == "sent": + return False + conn.execute( + """ + UPDATE deliveries + SET status = 'pending', next_attempt_at = ?, updated_at = ? + WHERE id = ? + """, + (now, now, delivery_id), + ) + return self.process_delivery_by_id(delivery_id) def _send_delivery(self, delivery: dict[str, Any], alert: dict[str, Any]) -> None: attempts = int(delivery["attempts"]) + 1 @@ -256,10 +373,16 @@ class Dispatcher: response_code = exc.code response_body = exc.read(2048).decode(errors="replace") status = "failed" - error = f"Feishu webhook returned HTTP {exc.code}" + error = f"http_error: Feishu webhook returned HTTP {exc.code}" + except (TimeoutError, socket.timeout) as exc: + status = "failed" + error = f"timeout: {exc}" + except urllib.error.URLError as exc: + status = "failed" + error = f"network_error: {exc.reason}" except Exception as exc: status = "failed" - error = str(exc) + error = f"send_error: {exc}" next_attempt_at = None if status == "failed" and attempts < self.settings.max_delivery_attempts: diff --git a/app/server.py b/app/server.py index 4cbcfdd..346e69a 100644 --- a/app/server.py +++ b/app/server.py @@ -123,18 +123,30 @@ def page_window(current: int, total_pages: int) -> list[int]: return list(range(start, end + 1)) -def render_pagination(base_path: str, active_tab: str, page: int, total: int, page_size: int) -> str: +def render_pagination( + base_path: str, + active_tab: str, + page: int, + total: int, + page_size: int, + filters: dict[str, str] | None = None, +) -> str: total_pages = max(1, (total + page_size - 1) // page_size) if total_pages <= 1: return f'' + filters = filters or {} def item(label: str, target_page: int, disabled: bool = False, current: bool = False) -> str: if disabled: return f'{html.escape(label)}' active_class = " current" if current else "" + query = {"tab": active_tab, "page": str(target_page), **filters} + query_string = "&".join( + f"{html.escape(key)}={html.escape(value)}" for key, value in query.items() if value + ) return ( f'{html.escape(label)}' + f'href="{base_path}?{query_string}">{html.escape(label)}' ) links = [ @@ -177,6 +189,61 @@ def format_display_time(settings: Settings, value: str | None) -> str: return parsed.astimezone(display_timezone(settings)).strftime("%Y-%m-%d %H:%M:%S") +def filter_value(query: dict[str, list[str]], key: str) -> str: + return query.get(key, [""])[-1].strip() + + +def status_select(name: str, selected: str, options: list[tuple[str, str]]) -> str: + items = [''] + for value, label in options: + checked = " selected" if selected == value else "" + items.append(f'') + return f'' + + +def log_filter_form(active_tab: str, filters: dict[str, str]) -> str: + symbol = html.escape(filters.get("symbol", "")) + strategy = html.escape(filters.get("strategy", "")) + status = filters.get("status", "") + target = html.escape(filters.get("target", "")) + if active_tab == "alerts": + alert_status = status_select( + "status", + status, + [ + ("queued", "已入队"), + ("unmatched", "未命中"), + ("partial", "部分完成"), + ("delivered", "已送达"), + ], + ) + fields = f""" + + +""" + else: + delivery_status = status_select( + "status", + status, + [ + ("pending", "待发送"), + ("processing", "发送中"), + ("retry", "重试中"), + ("failed", "失败"), + ("sent", "已发送"), + ], + ) + fields = f""" + +""" + return f"""
+ + +{fields} +
重置
+
""" + + class Handler(BaseHTTPRequestHandler): context: AppContext @@ -186,7 +253,7 @@ class Handler(BaseHTTPRequestHandler): def do_GET(self) -> None: parsed = urlparse(self.path) if parsed.path == "/health": - json_response(self, 200, {"ok": True}) + self.handle_health() return if parsed.path == "/login": self.render_login() @@ -257,6 +324,7 @@ class Handler(BaseHTTPRequestHandler): "/test/send": self.send_test, "/account/password": self.change_password, "/deliveries/retry": self.retry_deliveries, + "/deliveries/retry-one": self.retry_delivery_one, "/logout": self.logout, } handler = routes.get(parsed.path) @@ -378,6 +446,51 @@ class Handler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(content) + def handle_health(self) -> None: + now = datetime.now(timezone.utc) + try: + with self.context.db.connect() as conn: + conn.execute("SELECT 1").fetchone() + counts = { + "pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"], + "retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"], + "processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"], + "failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"], + } + worker_row = conn.execute( + "SELECT value FROM app_state WHERE key = 'worker.last_seen_at'" + ).fetchone() + except Exception as exc: + json_response(self, 503, {"ok": False, "database": "error", "error": str(exc)}) + return + + worker_last_seen = worker_row["value"] if worker_row else None + worker_fresh = False + if worker_last_seen: + try: + parsed = datetime.fromisoformat(worker_last_seen) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + max_age = max(60, self.context.settings.worker_interval_seconds * 3) + worker_fresh = (now - parsed.astimezone(timezone.utc)).total_seconds() <= max_age + except ValueError: + worker_fresh = False + json_response( + self, + 200, + { + "ok": True, + "database": "ok", + "queue": counts, + "worker": { + "last_seen_at": worker_last_seen, + "fresh": worker_fresh, + "interval_seconds": self.context.settings.worker_interval_seconds, + }, + "dispatch_inline": self.context.settings.dispatch_inline, + }, + ) + def handle_tradingview_webhook(self) -> None: if self.context.settings.webhook_token: query = parse_qs(urlparse(self.path).query) @@ -417,22 +530,53 @@ class Handler(BaseHTTPRequestHandler): active_tab: str = "alerts", page: int = 1, page_size: int = LOG_PAGE_SIZE, + filters: dict[str, str] | None = None, ) -> dict[str, Any]: + filters = filters or {} offset = (page - 1) * page_size with self.context.db.connect() as conn: - alert_total = conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"] - delivery_total = conn.execute("SELECT COUNT(*) AS c FROM deliveries").fetchone()["c"] + alert_where = [] + alert_params: list[Any] = [] + if filters.get("symbol"): + alert_where.append("symbol = ?") + alert_params.append(filters["symbol"].upper()) + if filters.get("strategy"): + alert_where.append("strategy = ?") + alert_params.append(filters["strategy"]) + if filters.get("status"): + alert_where.append("status = ?") + alert_params.append(filters["status"]) + alert_where_sql = f"WHERE {' AND '.join(alert_where)}" if alert_where else "" + + delivery_where = [] + delivery_params: list[Any] = [] + if filters.get("status"): + delivery_where.append("status = ?") + delivery_params.append(filters["status"]) + if filters.get("target"): + delivery_where.append("target_name LIKE ?") + delivery_params.append(f"%{filters['target']}%") + delivery_where_sql = f"WHERE {' AND '.join(delivery_where)}" if delivery_where else "" + + alert_total = conn.execute( + f"SELECT COUNT(*) AS c FROM alerts {alert_where_sql}", + alert_params, + ).fetchone()["c"] + delivery_total = conn.execute( + f"SELECT COUNT(*) AS c FROM deliveries {delivery_where_sql}", + delivery_params, + ).fetchone()["c"] alerts = [] deliveries = [] if active_tab == "alerts": alerts = conn.execute( - "SELECT * FROM alerts ORDER BY id DESC LIMIT ? OFFSET ?", - (page_size, offset), + f"SELECT * FROM alerts {alert_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?", + (*alert_params, page_size, offset), ).fetchall() else: deliveries = conn.execute( - "SELECT * FROM deliveries ORDER BY id DESC LIMIT ? OFFSET ?", - (page_size, offset), + f"SELECT * FROM deliveries {delivery_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?", + (*delivery_params, page_size, offset), ).fetchall() return { "alerts": [dict(row) for row in alerts], @@ -471,14 +615,20 @@ class Handler(BaseHTTPRequestHandler): "alerts": conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"], "rules": conn.execute("SELECT COUNT(*) AS c FROM routing_rules").fetchone()["c"], "targets": conn.execute("SELECT COUNT(*) AS c FROM webhook_targets").fetchone()["c"], - "pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status IN ('pending','retry')").fetchone()["c"], + "pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"], + "processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"], + "retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"], + "failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"], } recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall() cards = "".join(f'
{label}{value}
' for label, value in [ ("Alerts", counts["alerts"]), ("Rules", counts["rules"]), ("Targets", counts["targets"]), - ("Pending", counts["pending"]), + ("待发送", counts["pending"]), + ("发送中", counts["processing"]), + ("重试中", counts["retry"]), + ("失败", counts["failed"]), ]) rows = "".join( f"{row['id']}{html.escape(row['symbol'])}{html.escape(row['timeframe'])}{html.escape(row['strategy'])}{html.escape(row['status'])}{format_display_time(self.context.settings, row['created_at'])}" @@ -709,12 +859,18 @@ class Handler(BaseHTTPRequestHandler): if active_tab not in {"alerts", "deliveries"}: active_tab = "alerts" page = parse_positive_int(query.get("page", ["1"])[-1]) + filters = { + "symbol": filter_value(query, "symbol"), + "strategy": filter_value(query, "strategy"), + "status": filter_value(query, "status"), + "target": filter_value(query, "target"), + } total_key = "alert_total" if active_tab == "alerts" else "delivery_total" - logs = self.list_logs(active_tab=active_tab, page=page) + logs = self.list_logs(active_tab=active_tab, page=page, filters=filters) total_pages = max(1, (logs[total_key] + LOG_PAGE_SIZE - 1) // LOG_PAGE_SIZE) if page > total_pages: page = total_pages - logs = self.list_logs(active_tab=active_tab, page=page) + logs = self.list_logs(active_tab=active_tab, page=page, filters=filters) alert_rows = "" for row in logs["alerts"]: try: @@ -733,19 +889,19 @@ class Handler(BaseHTTPRequestHandler):
查看
{html.escape(raw_payload)}
""" delivery_rows = "".join( - f"{row['id']}{row['alert_id']}{html.escape(row['target_name'])}{html.escape(row['status'])}{row['attempts']}{html.escape(str(row['response_code'] or ''))}{format_display_time(self.context.settings, row['last_attempt_at'])}{html.escape(row['error'] or '')}{format_display_time(self.context.settings, row['next_attempt_at'])}" + f"""{row['id']}{row['alert_id']}{html.escape(row['target_name'])}{html.escape(row['status'])}{row['attempts']}{html.escape(str(row['response_code'] or ''))}{format_display_time(self.context.settings, row['last_attempt_at'])}{html.escape(row['error'] or '')}{format_display_time(self.context.settings, row['next_attempt_at'])}
""" for row in logs["deliveries"] ) alert_empty = '暂无 Alert 日志' - delivery_empty = '暂无分发日志' + delivery_empty = '暂无分发日志' alert_active = " active" if active_tab == "alerts" else "" delivery_active = " active" if active_tab == "deliveries" else "" active_table = ( f"""{alert_rows or alert_empty}
ID品种周期策略状态错误时间原始 Alert
-{render_pagination("/logs", "alerts", page, logs["alert_total"], LOG_PAGE_SIZE)}""" +{render_pagination("/logs", "alerts", page, logs["alert_total"], LOG_PAGE_SIZE, filters)}""" if active_tab == "alerts" - else f"""{delivery_rows or delivery_empty}
IDAlert目标状态次数HTTP发送时间错误下次重试
-{render_pagination("/logs", "deliveries", page, logs["delivery_total"], LOG_PAGE_SIZE)}""" + else f"""{delivery_rows or delivery_empty}
IDAlert目标状态次数HTTP发送时间错误下次重试操作
+{render_pagination("/logs", "deliveries", page, logs["delivery_total"], LOG_PAGE_SIZE, filters)}""" ) timezone_label = html.escape(display_timezone_label(self.context.settings)) body = f"""