This commit is contained in:
aaron 2026-05-18 20:50:16 +08:00
parent 027970de29
commit e86c528013
9 changed files with 433 additions and 43 deletions

View File

@ -16,4 +16,7 @@ RETENTION_DAYS=30
MAX_DELIVERY_ATTEMPTS=3
RETRY_BACKOFF_SECONDS=60
FEISHU_TIMEOUT_SECONDS=10
DISPATCH_INLINE=false
DELIVERY_BATCH_SIZE=100
DELIVERY_CONCURRENCY=5
WORKER_INTERVAL_SECONDS=15

View File

@ -19,6 +19,10 @@ class Settings:
retry_backoff_seconds: int = 60
feishu_timeout_seconds: int = 10
timezone: str = ""
dispatch_inline: bool = False
delivery_batch_size: int = 100
delivery_concurrency: int = 5
worker_interval_seconds: int = 15
def get_settings() -> Settings:
@ -35,4 +39,8 @@ def get_settings() -> Settings:
retry_backoff_seconds=int(os.getenv("RETRY_BACKOFF_SECONDS", "60")),
feishu_timeout_seconds=int(os.getenv("FEISHU_TIMEOUT_SECONDS", "10")),
timezone=os.getenv("APP_TIMEZONE") or os.getenv("TZ", ""),
dispatch_inline=os.getenv("DISPATCH_INLINE", "").lower() in {"1", "true", "yes", "on"},
delivery_batch_size=int(os.getenv("DELIVERY_BATCH_SIZE", "100")),
delivery_concurrency=int(os.getenv("DELIVERY_CONCURRENCY", "5")),
worker_interval_seconds=int(os.getenv("WORKER_INTERVAL_SECONDS", "15")),
)

View File

@ -91,6 +91,12 @@ class Database:
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS app_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_rules_match
ON routing_rules(enabled, timeframe, symbol, strategy, priority);

View File

@ -2,8 +2,10 @@ from __future__ import annotations
import json
import re
import socket
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from typing import Any
@ -130,13 +132,43 @@ class Dispatcher:
).fetchone()
return dict(row) if row else None
def explain_unmatched_rule(self, alert: dict[str, Any]) -> str:
normalized = normalize_alert(alert)
with self.db.connect() as conn:
candidates = conn.execute(
"""
SELECT id, name, timeframe, symbol, strategy, enabled, priority
FROM routing_rules
ORDER BY enabled DESC, priority ASC, id ASC
LIMIT 8
"""
).fetchall()
if not candidates:
return "No routing rules configured."
details = []
for row in candidates:
mismatches = []
if not row["enabled"]:
mismatches.append("rule disabled")
if row["timeframe"] and row["timeframe"] != normalized["timeframe"]:
mismatches.append(f"timeframe {normalized['timeframe'] or '-'} != {row['timeframe']}")
if row["symbol"] and row["symbol"].upper() != normalized["symbol"]:
mismatches.append(f"symbol {normalized['symbol'] or '-'} != {row['symbol']}")
if row["strategy"] and row["strategy"] != normalized["strategy"]:
mismatches.append(f"strategy {normalized['strategy'] or '-'} != {row['strategy']}")
reason = "; ".join(mismatches) if mismatches else "matched fields but was not selected"
details.append(f"#{row['id']} {row['name']}: {reason}")
return "No enabled routing rule matched this alert. Closest rules: " + " | ".join(details)
def receive_alert(self, payload: dict[str, Any]) -> dict[str, Any]:
alert = normalize_alert(payload)
created_at = now_iso()
with self.db.connect() as conn:
rule = self.find_matching_rule(alert)
status = "matched" if rule else "unmatched"
status = "queued" if rule else "unmatched"
error = None if rule else self.explain_unmatched_rule(alert)
cur = conn.execute(
"""
INSERT INTO alerts (
@ -154,7 +186,7 @@ class Dispatcher:
to_json(alert),
rule["id"] if rule else None,
status,
None if rule else "No enabled routing rule matched this alert.",
error,
created_at,
),
)
@ -184,7 +216,7 @@ class Dispatcher:
target["id"],
target["name"],
target["webhook_url"],
created_at,
None,
created_at,
created_at,
),
@ -197,7 +229,11 @@ class Dispatcher:
("unmatched", "Matched rule has no webhook targets.", alert_id),
)
self.process_due_deliveries()
if self.settings.dispatch_inline:
self.process_due_deliveries(
limit=self.settings.delivery_batch_size,
concurrency=self.settings.delivery_concurrency,
)
return {
"alert_id": alert_id,
"status": status,
@ -205,9 +241,21 @@ class Dispatcher:
"delivery_ids": delivery_ids,
}
def process_due_deliveries(self, limit: int = 25) -> int:
def process_due_deliveries(self, limit: int = 25, concurrency: int = 1) -> int:
now = now_iso()
stale_cutoff = (datetime.now(UTC) - timedelta(seconds=max(300, self.settings.retry_backoff_seconds))).replace(
microsecond=0
).isoformat()
with self.db.connect() as conn:
conn.execute(
"""
UPDATE deliveries
SET status = 'retry', next_attempt_at = ?, updated_at = ?,
error = COALESCE(error, 'worker_recovered: stale processing task reset')
WHERE status = 'processing' AND updated_at <= ?
""",
(now, now, stale_cutoff),
)
rows = conn.execute(
"""
SELECT d.*, a.payload, r.card_title_template, r.card_body_template
@ -220,15 +268,84 @@ class Dispatcher:
LIMIT ?
""",
(now, limit),
).fetchall()
).fetchall()
delivery_ids = [row["id"] for row in rows]
if delivery_ids:
placeholders = ",".join("?" for _ in delivery_ids)
conn.execute(
f"""
UPDATE deliveries
SET status = 'processing', updated_at = ?
WHERE id IN ({placeholders})
AND status IN ('pending', 'retry')
""",
(now, *delivery_ids),
)
processed = 0
for row in rows:
delivery = dict(row)
payload = from_json(delivery["payload"], {})
self._send_delivery(delivery, payload)
processed += 1
return processed
jobs = [(dict(row), from_json(row["payload"], {})) for row in rows]
if not jobs:
return 0
worker_count = max(1, min(concurrency, len(jobs)))
if worker_count == 1:
for delivery, payload in jobs:
self._send_delivery(delivery, payload)
else:
with ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = [executor.submit(self._send_delivery, delivery, payload) for delivery, payload in jobs]
for future in futures:
future.result()
return len(jobs)
def process_delivery_by_id(self, delivery_id: int) -> bool:
now = now_iso()
with self.db.connect() as conn:
row = conn.execute(
"""
SELECT d.*, a.payload, r.card_title_template, r.card_body_template
FROM deliveries d
JOIN alerts a ON a.id = d.alert_id
LEFT JOIN routing_rules r ON r.id = d.rule_id
WHERE d.id = ?
""",
(delivery_id,),
).fetchone()
if not row or row["status"] == "sent":
return False
conn.execute(
"""
UPDATE deliveries
SET status = 'processing', next_attempt_at = NULL, updated_at = ?
WHERE id = ?
""",
(now, delivery_id),
)
delivery = dict(row)
payload = from_json(delivery["payload"], {})
self._send_delivery(delivery, payload)
return True
def retry_delivery_now(self, delivery_id: int) -> bool:
now = now_iso()
with self.db.connect() as conn:
row = conn.execute(
"""
SELECT status
FROM deliveries
WHERE id = ?
""",
(delivery_id,),
).fetchone()
if not row or row["status"] == "sent":
return False
conn.execute(
"""
UPDATE deliveries
SET status = 'pending', next_attempt_at = ?, updated_at = ?
WHERE id = ?
""",
(now, now, delivery_id),
)
return self.process_delivery_by_id(delivery_id)
def _send_delivery(self, delivery: dict[str, Any], alert: dict[str, Any]) -> None:
attempts = int(delivery["attempts"]) + 1
@ -256,10 +373,16 @@ class Dispatcher:
response_code = exc.code
response_body = exc.read(2048).decode(errors="replace")
status = "failed"
error = f"Feishu webhook returned HTTP {exc.code}"
error = f"http_error: Feishu webhook returned HTTP {exc.code}"
except (TimeoutError, socket.timeout) as exc:
status = "failed"
error = f"timeout: {exc}"
except urllib.error.URLError as exc:
status = "failed"
error = f"network_error: {exc.reason}"
except Exception as exc:
status = "failed"
error = str(exc)
error = f"send_error: {exc}"
next_attempt_at = None
if status == "failed" and attempts < self.settings.max_delivery_attempts:

View File

@ -123,18 +123,30 @@ def page_window(current: int, total_pages: int) -> list[int]:
return list(range(start, end + 1))
def render_pagination(base_path: str, active_tab: str, page: int, total: int, page_size: int) -> str:
def render_pagination(
base_path: str,
active_tab: str,
page: int,
total: int,
page_size: int,
filters: dict[str, str] | None = None,
) -> str:
total_pages = max(1, (total + page_size - 1) // page_size)
if total_pages <= 1:
return f'<div class="pagination muted-note">共 {total} 条</div>'
filters = filters or {}
def item(label: str, target_page: int, disabled: bool = False, current: bool = False) -> str:
if disabled:
return f'<span class="page-link disabled">{html.escape(label)}</span>'
active_class = " current" if current else ""
query = {"tab": active_tab, "page": str(target_page), **filters}
query_string = "&".join(
f"{html.escape(key)}={html.escape(value)}" for key, value in query.items() if value
)
return (
f'<a class="page-link{active_class}" '
f'href="{base_path}?tab={active_tab}&page={target_page}">{html.escape(label)}</a>'
f'href="{base_path}?{query_string}">{html.escape(label)}</a>'
)
links = [
@ -177,6 +189,61 @@ def format_display_time(settings: Settings, value: str | None) -> str:
return parsed.astimezone(display_timezone(settings)).strftime("%Y-%m-%d %H:%M:%S")
def filter_value(query: dict[str, list[str]], key: str) -> str:
return query.get(key, [""])[-1].strip()
def status_select(name: str, selected: str, options: list[tuple[str, str]]) -> str:
items = ['<option value="">全部</option>']
for value, label in options:
checked = " selected" if selected == value else ""
items.append(f'<option value="{html.escape(value)}"{checked}>{html.escape(label)}</option>')
return f'<select name="{html.escape(name)}">{"".join(items)}</select>'
def log_filter_form(active_tab: str, filters: dict[str, str]) -> str:
symbol = html.escape(filters.get("symbol", ""))
strategy = html.escape(filters.get("strategy", ""))
status = filters.get("status", "")
target = html.escape(filters.get("target", ""))
if active_tab == "alerts":
alert_status = status_select(
"status",
status,
[
("queued", "已入队"),
("unmatched", "未命中"),
("partial", "部分完成"),
("delivered", "已送达"),
],
)
fields = f"""
<label>品种<input name="symbol" value="{symbol}" placeholder="GOLD"></label>
<label>策略<input name="strategy" value="{strategy}" placeholder="supply_demand"></label>
<label>状态{alert_status}</label>"""
else:
delivery_status = status_select(
"status",
status,
[
("pending", "待发送"),
("processing", "发送中"),
("retry", "重试中"),
("failed", "失败"),
("sent", "已发送"),
],
)
fields = f"""
<label>目标<input name="target" value="{target}" placeholder="Webhook 名称"></label>
<label>状态{delivery_status}</label>"""
return f"""<form class="panel log-filter" method="get" action="/logs">
<input type="hidden" name="tab" value="{html.escape(active_tab)}">
<input type="hidden" name="page" value="1">
{fields}
<div class="actions"><button type="submit">筛选</button><a class="button-link secondary" href="/logs?tab={html.escape(active_tab)}&page=1">重置</a></div>
</form>"""
class Handler(BaseHTTPRequestHandler):
context: AppContext
@ -186,7 +253,7 @@ class Handler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/health":
json_response(self, 200, {"ok": True})
self.handle_health()
return
if parsed.path == "/login":
self.render_login()
@ -257,6 +324,7 @@ class Handler(BaseHTTPRequestHandler):
"/test/send": self.send_test,
"/account/password": self.change_password,
"/deliveries/retry": self.retry_deliveries,
"/deliveries/retry-one": self.retry_delivery_one,
"/logout": self.logout,
}
handler = routes.get(parsed.path)
@ -378,6 +446,51 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(content)
def handle_health(self) -> None:
now = datetime.now(timezone.utc)
try:
with self.context.db.connect() as conn:
conn.execute("SELECT 1").fetchone()
counts = {
"pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"],
"retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"],
"processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"],
"failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"],
}
worker_row = conn.execute(
"SELECT value FROM app_state WHERE key = 'worker.last_seen_at'"
).fetchone()
except Exception as exc:
json_response(self, 503, {"ok": False, "database": "error", "error": str(exc)})
return
worker_last_seen = worker_row["value"] if worker_row else None
worker_fresh = False
if worker_last_seen:
try:
parsed = datetime.fromisoformat(worker_last_seen)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
max_age = max(60, self.context.settings.worker_interval_seconds * 3)
worker_fresh = (now - parsed.astimezone(timezone.utc)).total_seconds() <= max_age
except ValueError:
worker_fresh = False
json_response(
self,
200,
{
"ok": True,
"database": "ok",
"queue": counts,
"worker": {
"last_seen_at": worker_last_seen,
"fresh": worker_fresh,
"interval_seconds": self.context.settings.worker_interval_seconds,
},
"dispatch_inline": self.context.settings.dispatch_inline,
},
)
def handle_tradingview_webhook(self) -> None:
if self.context.settings.webhook_token:
query = parse_qs(urlparse(self.path).query)
@ -417,22 +530,53 @@ class Handler(BaseHTTPRequestHandler):
active_tab: str = "alerts",
page: int = 1,
page_size: int = LOG_PAGE_SIZE,
filters: dict[str, str] | None = None,
) -> dict[str, Any]:
filters = filters or {}
offset = (page - 1) * page_size
with self.context.db.connect() as conn:
alert_total = conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"]
delivery_total = conn.execute("SELECT COUNT(*) AS c FROM deliveries").fetchone()["c"]
alert_where = []
alert_params: list[Any] = []
if filters.get("symbol"):
alert_where.append("symbol = ?")
alert_params.append(filters["symbol"].upper())
if filters.get("strategy"):
alert_where.append("strategy = ?")
alert_params.append(filters["strategy"])
if filters.get("status"):
alert_where.append("status = ?")
alert_params.append(filters["status"])
alert_where_sql = f"WHERE {' AND '.join(alert_where)}" if alert_where else ""
delivery_where = []
delivery_params: list[Any] = []
if filters.get("status"):
delivery_where.append("status = ?")
delivery_params.append(filters["status"])
if filters.get("target"):
delivery_where.append("target_name LIKE ?")
delivery_params.append(f"%{filters['target']}%")
delivery_where_sql = f"WHERE {' AND '.join(delivery_where)}" if delivery_where else ""
alert_total = conn.execute(
f"SELECT COUNT(*) AS c FROM alerts {alert_where_sql}",
alert_params,
).fetchone()["c"]
delivery_total = conn.execute(
f"SELECT COUNT(*) AS c FROM deliveries {delivery_where_sql}",
delivery_params,
).fetchone()["c"]
alerts = []
deliveries = []
if active_tab == "alerts":
alerts = conn.execute(
"SELECT * FROM alerts ORDER BY id DESC LIMIT ? OFFSET ?",
(page_size, offset),
f"SELECT * FROM alerts {alert_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?",
(*alert_params, page_size, offset),
).fetchall()
else:
deliveries = conn.execute(
"SELECT * FROM deliveries ORDER BY id DESC LIMIT ? OFFSET ?",
(page_size, offset),
f"SELECT * FROM deliveries {delivery_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?",
(*delivery_params, page_size, offset),
).fetchall()
return {
"alerts": [dict(row) for row in alerts],
@ -471,14 +615,20 @@ class Handler(BaseHTTPRequestHandler):
"alerts": conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"],
"rules": conn.execute("SELECT COUNT(*) AS c FROM routing_rules").fetchone()["c"],
"targets": conn.execute("SELECT COUNT(*) AS c FROM webhook_targets").fetchone()["c"],
"pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status IN ('pending','retry')").fetchone()["c"],
"pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"],
"processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"],
"retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"],
"failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"],
}
recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall()
cards = "".join(f'<div class="metric"><span>{label}</span><strong>{value}</strong></div>' for label, value in [
("Alerts", counts["alerts"]),
("Rules", counts["rules"]),
("Targets", counts["targets"]),
("Pending", counts["pending"]),
("待发送", counts["pending"]),
("发送中", counts["processing"]),
("重试中", counts["retry"]),
("失败", counts["failed"]),
])
rows = "".join(
f"<tr><td>{row['id']}</td><td>{html.escape(row['symbol'])}</td><td>{html.escape(row['timeframe'])}</td><td>{html.escape(row['strategy'])}</td><td><span class='status'>{html.escape(row['status'])}</span></td><td>{format_display_time(self.context.settings, row['created_at'])}</td></tr>"
@ -709,12 +859,18 @@ class Handler(BaseHTTPRequestHandler):
if active_tab not in {"alerts", "deliveries"}:
active_tab = "alerts"
page = parse_positive_int(query.get("page", ["1"])[-1])
filters = {
"symbol": filter_value(query, "symbol"),
"strategy": filter_value(query, "strategy"),
"status": filter_value(query, "status"),
"target": filter_value(query, "target"),
}
total_key = "alert_total" if active_tab == "alerts" else "delivery_total"
logs = self.list_logs(active_tab=active_tab, page=page)
logs = self.list_logs(active_tab=active_tab, page=page, filters=filters)
total_pages = max(1, (logs[total_key] + LOG_PAGE_SIZE - 1) // LOG_PAGE_SIZE)
if page > total_pages:
page = total_pages
logs = self.list_logs(active_tab=active_tab, page=page)
logs = self.list_logs(active_tab=active_tab, page=page, filters=filters)
alert_rows = ""
for row in logs["alerts"]:
try:
@ -733,19 +889,19 @@ class Handler(BaseHTTPRequestHandler):
<td><details class="payload-details"><summary>查看</summary><pre>{html.escape(raw_payload)}</pre></details></td>
</tr>"""
delivery_rows = "".join(
f"<tr><td>{row['id']}</td><td>{row['alert_id']}</td><td>{html.escape(row['target_name'])}</td><td><span class='status'>{html.escape(row['status'])}</span></td><td>{row['attempts']}</td><td>{html.escape(str(row['response_code'] or ''))}</td><td>{format_display_time(self.context.settings, row['last_attempt_at'])}</td><td>{html.escape(row['error'] or '')}</td><td>{format_display_time(self.context.settings, row['next_attempt_at'])}</td></tr>"
f"""<tr><td>{row['id']}</td><td>{row['alert_id']}</td><td>{html.escape(row['target_name'])}</td><td><span class='status'>{html.escape(row['status'])}</span></td><td>{row['attempts']}</td><td>{html.escape(str(row['response_code'] or ''))}</td><td>{format_display_time(self.context.settings, row['last_attempt_at'])}</td><td>{html.escape(row['error'] or '')}</td><td>{format_display_time(self.context.settings, row['next_attempt_at'])}</td><td><form class="inline" method="post" action="/deliveries/retry-one"><input type="hidden" name="id" value="{row['id']}"><button class="small-button" type="submit" {'disabled' if row['status'] == 'sent' else ''}>立即重发</button></form></td></tr>"""
for row in logs["deliveries"]
)
alert_empty = '<tr><td colspan="8" class="empty-cell">暂无 Alert 日志</td></tr>'
delivery_empty = '<tr><td colspan="9" class="empty-cell">暂无分发日志</td></tr>'
delivery_empty = '<tr><td colspan="10" class="empty-cell">暂无分发日志</td></tr>'
alert_active = " active" if active_tab == "alerts" else ""
delivery_active = " active" if active_tab == "deliveries" else ""
active_table = (
f"""<table><thead><tr><th>ID</th><th>品种</th><th>周期</th><th>策略</th><th>状态</th><th>错误</th><th>时间</th><th>原始 Alert</th></tr></thead><tbody>{alert_rows or alert_empty}</tbody></table>
{render_pagination("/logs", "alerts", page, logs["alert_total"], LOG_PAGE_SIZE)}"""
{render_pagination("/logs", "alerts", page, logs["alert_total"], LOG_PAGE_SIZE, filters)}"""
if active_tab == "alerts"
else f"""<table><thead><tr><th>ID</th><th>Alert</th><th>目标</th><th>状态</th><th>次数</th><th>HTTP</th><th>发送时间</th><th>错误</th><th>下次重试</th></tr></thead><tbody>{delivery_rows or delivery_empty}</tbody></table>
{render_pagination("/logs", "deliveries", page, logs["delivery_total"], LOG_PAGE_SIZE)}"""
else f"""<table><thead><tr><th>ID</th><th>Alert</th><th>目标</th><th>状态</th><th>次数</th><th>HTTP</th><th>发送时间</th><th>错误</th><th>下次重试</th><th>操作</th></tr></thead><tbody>{delivery_rows or delivery_empty}</tbody></table>
{render_pagination("/logs", "deliveries", page, logs["delivery_total"], LOG_PAGE_SIZE, filters)}"""
)
timezone_label = html.escape(display_timezone_label(self.context.settings))
body = f"""<header class="page-header"><div><h1>日志</h1><p>按类型查看 Alert 和分发记录,每页 {LOG_PAGE_SIZE} 条。当前显示时区:{timezone_label}。</p></div>
@ -754,6 +910,7 @@ class Handler(BaseHTTPRequestHandler):
<a class="tab{alert_active}" href="/logs?tab=alerts&page=1">Alert 日志 <span>{logs["alert_total"]}</span></a>
<a class="tab{delivery_active}" href="/logs?tab=deliveries&page=1">分发日志 <span>{logs["delivery_total"]}</span></a>
</nav>
{log_filter_form(active_tab, filters)}
<section class="log-panel">{active_table}</section>"""
self.send_html("日志", body)
@ -982,6 +1139,7 @@ class Handler(BaseHTTPRequestHandler):
delivery_text = ", ".join(str(item) for item in result.get("delivery_ids", [])) or "-"
self._test_result_html = f"""<section class="result-panel success">
<h2>测试结果</h2>
<p>测试 alert 已入队worker 会异步发送飞书如果开启 DISPATCH_INLINE 才会在当前请求里立即发送</p>
<div class="result-grid">
<div><span>Alert ID</span><strong>{result.get("alert_id")}</strong></div>
<div><span>状态</span><strong>{html.escape(str(result.get("status")))}</strong></div>
@ -1025,9 +1183,22 @@ class Handler(BaseHTTPRequestHandler):
self.logout()
def retry_deliveries(self) -> None:
self.context.dispatcher.process_due_deliveries(limit=100)
self.context.dispatcher.process_due_deliveries(
limit=self.context.settings.delivery_batch_size,
concurrency=self.context.settings.delivery_concurrency,
)
redirect(self, "/logs")
def retry_delivery_one(self) -> None:
form = parse_form(self)
try:
delivery_id = int(form.get("id", ""))
except ValueError:
self.send_error(400, "Invalid delivery id")
return
self.context.dispatcher.retry_delivery_now(delivery_id)
redirect(self, "/logs?tab=deliveries&page=1")
def make_handler(context: AppContext) -> type[Handler]:
class BoundHandler(Handler):

View File

@ -154,7 +154,7 @@ code {
.metrics {
display: grid;
grid-template-columns: repeat(4, minmax(0, 1fr));
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 16px;
margin-bottom: 28px;
}
@ -404,6 +404,17 @@ button:hover {
background: var(--accent-strong);
}
button:disabled {
cursor: not-allowed;
background: #b8b0a1;
}
.small-button {
padding: 7px 10px;
font-size: 12px;
white-space: nowrap;
}
.ghost {
margin-top: auto;
width: 100%;
@ -558,6 +569,21 @@ pre {
margin-bottom: 12px;
}
.log-filter {
display: grid;
grid-template-columns: repeat(4, minmax(160px, 1fr));
align-items: end;
gap: 12px;
padding: 16px;
margin-bottom: 16px;
box-shadow: none;
}
.log-filter label,
.log-filter .actions {
margin-bottom: 0;
}
.pagination {
display: flex;
flex-wrap: wrap;
@ -696,4 +722,8 @@ th {
.template-picker {
grid-template-columns: 1fr;
}
.log-filter {
grid-template-columns: 1fr;
}
}

View File

@ -1,10 +1,9 @@
from __future__ import annotations
import os
import time
from app.config import get_settings
from app.db import Database
from app.db import Database, now_iso
from app.dispatcher import Dispatcher
@ -13,10 +12,26 @@ def run() -> None:
db = Database(settings)
db.migrate(settings)
dispatcher = Dispatcher(db, settings)
interval = int(os.getenv("WORKER_INTERVAL_SECONDS", "15"))
print(f"Retry worker running every {interval}s")
interval = settings.worker_interval_seconds
print(
"Delivery worker running every "
f"{interval}s, batch={settings.delivery_batch_size}, concurrency={settings.delivery_concurrency}"
)
while True:
processed = dispatcher.process_due_deliveries(limit=100)
with db.connect() as conn:
now = now_iso()
conn.execute(
"""
INSERT INTO app_state (key, value, updated_at)
VALUES ('worker.last_seen_at', ?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at
""",
(now, now),
)
processed = dispatcher.process_due_deliveries(
limit=settings.delivery_batch_size,
concurrency=settings.delivery_concurrency,
)
if processed:
print(f"processed {processed} due deliveries")
time.sleep(interval)

View File

@ -12,6 +12,10 @@ services:
RETENTION_DAYS: ${RETENTION_DAYS:-30}
MAX_DELIVERY_ATTEMPTS: ${MAX_DELIVERY_ATTEMPTS:-3}
RETRY_BACKOFF_SECONDS: ${RETRY_BACKOFF_SECONDS:-60}
FEISHU_TIMEOUT_SECONDS: ${FEISHU_TIMEOUT_SECONDS:-10}
DISPATCH_INLINE: ${DISPATCH_INLINE:-false}
DELIVERY_BATCH_SIZE: ${DELIVERY_BATCH_SIZE:-100}
DELIVERY_CONCURRENCY: ${DELIVERY_CONCURRENCY:-5}
volumes:
- dispatcher-data:/data
@ -27,6 +31,10 @@ services:
RETENTION_DAYS: ${RETENTION_DAYS:-30}
MAX_DELIVERY_ATTEMPTS: ${MAX_DELIVERY_ATTEMPTS:-3}
RETRY_BACKOFF_SECONDS: ${RETRY_BACKOFF_SECONDS:-60}
FEISHU_TIMEOUT_SECONDS: ${FEISHU_TIMEOUT_SECONDS:-10}
DISPATCH_INLINE: ${DISPATCH_INLINE:-false}
DELIVERY_BATCH_SIZE: ${DELIVERY_BATCH_SIZE:-100}
DELIVERY_CONCURRENCY: ${DELIVERY_CONCURRENCY:-5}
WORKER_INTERVAL_SECONDS: ${WORKER_INTERVAL_SECONDS:-15}
volumes:
- dispatcher-data:/data

View File

@ -78,6 +78,16 @@ class DispatcherTest(unittest.TestCase):
alert = conn.execute("SELECT * FROM alerts WHERE id = ?", (result["alert_id"],)).fetchone()
self.assertEqual(alert["status"], "unmatched")
def test_unmatched_alert_records_rule_mismatch_explanation(self) -> None:
target_id = self.add_target()
self.add_rule(target_id, timeframe="5M", symbol="GOLD", strategy="supply_demand")
result = self.dispatcher.receive_alert({"timeframe": "5M", "symbol": "GLOD", "strategy": "supply_demand"})
with self.db.connect() as conn:
alert = conn.execute("SELECT * FROM alerts WHERE id = ?", (result["alert_id"],)).fetchone()
self.assertIn("symbol GLOD != GOLD", alert["error"])
def test_highest_priority_rule_wins(self) -> None:
slow_target = self.add_target("slow")
fast_target = self.add_target("fast")
@ -100,7 +110,7 @@ class DispatcherTest(unittest.TestCase):
result = self.dispatcher.receive_alert({"symbol": "btcusdt", "action": "buy"})
self.assertEqual(result["status"], "matched")
self.assertEqual(result["status"], "queued")
self.assertEqual(result["matched_rule_id"], rule_id)
def test_more_specific_rule_wins_when_priority_ties(self) -> None:
@ -123,7 +133,7 @@ class DispatcherTest(unittest.TestCase):
self.assertIsNotNone(rule)
self.assertEqual(rule["id"], rule_id)
def test_failed_delivery_is_marked_for_retry(self) -> None:
def test_delivery_is_queued_until_worker_processes_it(self) -> None:
target_id = self.add_target()
self.add_rule(target_id)
@ -133,9 +143,25 @@ class DispatcherTest(unittest.TestCase):
with self.db.connect() as conn:
delivery = conn.execute("SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone()
self.assertEqual(result["status"], "queued")
self.assertEqual(delivery["status"], "pending")
self.assertEqual(delivery["attempts"], 0)
def test_failed_delivery_is_marked_for_retry_after_worker_processes_it(self) -> None:
target_id = self.add_target()
self.add_rule(target_id)
result = self.dispatcher.receive_alert(
{"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"}
)
processed = self.dispatcher.process_due_deliveries(limit=10)
with self.db.connect() as conn:
delivery = conn.execute("SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone()
self.assertEqual(processed, 1)
self.assertEqual(delivery["status"], "retry")
self.assertEqual(delivery["attempts"], 1)
self.assertIsNotNone(delivery["error"])
self.assertTrue(delivery["error"].startswith("network_error:") or delivery["error"].startswith("send_error:"))
def test_rule_can_dispatch_to_multiple_targets(self) -> None:
target_a = self.add_target("ops-a")