tradingview-alert-dispatcher/app/server.py
2026-05-26 21:10:06 +08:00

1285 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import html
import json
import mimetypes
import os
import threading
import urllib.error
import urllib.request
from datetime import datetime, timezone
from http import HTTPStatus
from http.cookies import SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from app.auth import COOKIE_NAME, check_credentials, hash_password, is_valid_session, make_session_cookie
from app.config import Settings, get_settings
from app.db import Database, from_json, now_iso, to_json
from app.dispatcher import Dispatcher, ValidationError, build_feishu_message, normalize_alert
LOG_PAGE_SIZE = 25
class AppContext:
def __init__(self, settings: Settings):
self.settings = settings
self.db = Database(settings)
self.db.migrate(settings)
self.dispatcher = Dispatcher(self.db, settings)
self.dispatch_wakeup_lock = threading.Lock()
self.dispatch_wakeup_running = False
def wake_dispatcher(self) -> None:
if self.settings.dispatch_inline or not self.settings.dispatch_wakeup_on_receive:
return
with self.dispatch_wakeup_lock:
if self.dispatch_wakeup_running:
return
self.dispatch_wakeup_running = True
thread = threading.Thread(target=self._run_dispatch_wakeup, name="dispatch-wakeup", daemon=True)
thread.start()
def _run_dispatch_wakeup(self) -> None:
try:
while True:
processed = self.dispatcher.process_due_deliveries(
limit=self.settings.delivery_batch_size,
concurrency=self.settings.delivery_concurrency,
)
if processed < self.settings.delivery_batch_size:
break
finally:
with self.dispatch_wakeup_lock:
self.dispatch_wakeup_running = False
def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any] | list[Any]) -> None:
body = json.dumps(payload, ensure_ascii=False).encode()
handler.send_response(status)
handler.send_header("Content-Type", "application/json; charset=utf-8")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
handler.wfile.write(body)
def redirect(handler: BaseHTTPRequestHandler, location: str) -> None:
handler.send_response(HTTPStatus.SEE_OTHER)
handler.send_header("Location", location)
handler.end_headers()
def read_body(handler: BaseHTTPRequestHandler) -> bytes:
length = int(handler.headers.get("Content-Length", "0") or "0")
return handler.rfile.read(length)
def parse_form(handler: BaseHTTPRequestHandler) -> dict[str, str]:
data = read_body(handler).decode()
return {key: values[-1] for key, values in parse_qs(data).items()}
def parse_form_multi(handler: BaseHTTPRequestHandler) -> dict[str, list[str]]:
return parse_qs(read_body(handler).decode())
def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
try:
value = json.loads(read_body(handler).decode() or "{}")
except json.JSONDecodeError as exc:
raise ValidationError("Request body must be valid JSON") from exc
if not isinstance(value, dict):
raise ValidationError("Request body must be a JSON object")
return value
def target_checkbox_options(
targets: list[dict[str, Any]],
selected_ids: list[int] | None = None,
) -> str:
selected_ids = selected_ids or []
if not targets:
return '<p class="warning">还没有可用的飞书 Webhook请先到飞书 Webhook 页面创建。</p>'
options = []
for target in targets:
checked = "checked" if target["id"] in selected_ids else ""
options.append(
f'<label class="target-choice"><input type="checkbox" name="target_ids" value="{target["id"]}" {checked}> <span>{html.escape(target["name"])}</span></label>'
)
return "".join(options)
def template_picker(templates: list[dict[str, Any]]) -> str:
if not templates:
return '<p class="muted-note">还没有飞书内容模板,可以直接填写标题和正文。</p>'
options = ['<option value="">不套用模板</option>']
for template in templates:
options.append(
"<option "
f'value="{template["id"]}" '
f'data-title="{html.escape(template["card_title_template"], quote=True)}" '
f'data-body="{html.escape(template["card_body_template"], quote=True)}">'
f'{html.escape(template["name"])}'
"</option>"
)
return (
'<div class="template-picker">'
'<label>套用飞书内容模板'
f'<select data-template-picker>{"".join(options)}</select>'
'</label>'
'<a class="button-link secondary compact" href="/templates/new">新增模板</a>'
'</div>'
)
def parse_positive_int(value: str | None, default: int = 1) -> int:
try:
parsed = int(value or "")
except ValueError:
return default
return parsed if parsed > 0 else default
def page_window(current: int, total_pages: int) -> list[int]:
start = max(1, current - 2)
end = min(total_pages, current + 2)
return list(range(start, end + 1))
def render_pagination(
base_path: str,
active_tab: str,
page: int,
total: int,
page_size: int,
filters: dict[str, str] | None = None,
) -> str:
total_pages = max(1, (total + page_size - 1) // page_size)
if total_pages <= 1:
return f'<div class="pagination muted-note">共 {total} 条</div>'
filters = filters or {}
def item(label: str, target_page: int, disabled: bool = False, current: bool = False) -> str:
if disabled:
return f'<span class="page-link disabled">{html.escape(label)}</span>'
active_class = " current" if current else ""
query = {"tab": active_tab, "page": str(target_page), **filters}
query_string = "&".join(
f"{html.escape(key)}={html.escape(value)}" for key, value in query.items() if value
)
return (
f'<a class="page-link{active_class}" '
f'href="{base_path}?{query_string}">{html.escape(label)}</a>'
)
links = [
item("上一页", page - 1, disabled=page <= 1),
*(item(str(number), number, current=number == page) for number in page_window(page, total_pages)),
item("下一页", page + 1, disabled=page >= total_pages),
]
return (
'<div class="pagination">'
f'<span class="page-total">共 {total} 条,第 {page} / {total_pages} 页</span>'
f'{"".join(links)}'
'</div>'
)
def display_timezone(settings: Settings) -> timezone:
if settings.timezone:
try:
return ZoneInfo(settings.timezone)
except ZoneInfoNotFoundError:
return datetime.now().astimezone().tzinfo or timezone.utc
return datetime.now().astimezone().tzinfo or timezone.utc
def display_timezone_label(settings: Settings) -> str:
if settings.timezone:
return settings.timezone
return str(display_timezone(settings))
def format_display_time(settings: Settings, value: str | None) -> str:
if not value:
return ""
try:
parsed = datetime.fromisoformat(value)
except ValueError:
return value
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(display_timezone(settings)).strftime("%Y-%m-%d %H:%M:%S")
def filter_value(query: dict[str, list[str]], key: str) -> str:
return query.get(key, [""])[-1].strip()
def status_select(name: str, selected: str, options: list[tuple[str, str]]) -> str:
items = ['<option value="">全部</option>']
for value, label in options:
checked = " selected" if selected == value else ""
items.append(f'<option value="{html.escape(value)}"{checked}>{html.escape(label)}</option>')
return f'<select name="{html.escape(name)}">{"".join(items)}</select>'
def log_filter_form(active_tab: str, filters: dict[str, str]) -> str:
symbol = html.escape(filters.get("symbol", ""))
strategy = html.escape(filters.get("strategy", ""))
status = filters.get("status", "")
target = html.escape(filters.get("target", ""))
if active_tab == "alerts":
alert_status = status_select(
"status",
status,
[
("queued", "已入队"),
("unmatched", "未命中"),
("partial", "部分完成"),
("delivered", "已送达"),
],
)
fields = f"""
<label>品种<input name="symbol" value="{symbol}" placeholder="GOLD"></label>
<label>策略<input name="strategy" value="{strategy}" placeholder="supply_demand"></label>
<label>状态{alert_status}</label>"""
else:
delivery_status = status_select(
"status",
status,
[
("pending", "待发送"),
("processing", "发送中"),
("retry", "重试中"),
("failed", "失败"),
("sent", "已发送"),
],
)
fields = f"""
<label>目标<input name="target" value="{target}" placeholder="Webhook 名称"></label>
<label>状态{delivery_status}</label>"""
return f"""<form class="panel log-filter" method="get" action="/logs">
<input type="hidden" name="tab" value="{html.escape(active_tab)}">
<input type="hidden" name="page" value="1">
{fields}
<div class="actions"><button type="submit">筛选</button><a class="button-link secondary" href="/logs?tab={html.escape(active_tab)}&page=1">重置</a></div>
</form>"""
class Handler(BaseHTTPRequestHandler):
context: AppContext
def log_message(self, format: str, *args: Any) -> None:
print("%s - - [%s] %s" % (self.address_string(), self.log_date_time_string(), format % args))
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/health":
self.handle_health()
return
if parsed.path == "/login":
self.render_login()
return
if parsed.path.startswith("/static/"):
self.serve_static(parsed.path)
return
if not self.require_auth():
return
if parsed.path in ("/", "/dashboard"):
self.render_dashboard()
elif parsed.path == "/targets":
self.render_targets()
elif parsed.path == "/targets/delete":
self.render_target_delete(parsed)
elif parsed.path == "/templates":
self.render_templates()
elif parsed.path == "/templates/new":
self.render_template_new()
elif parsed.path == "/templates/edit":
self.render_template_edit(parsed)
elif parsed.path == "/templates/delete":
self.render_template_delete(parsed)
elif parsed.path == "/rules":
self.render_rules()
elif parsed.path == "/rules/new":
self.render_rule_new()
elif parsed.path == "/rules/edit":
self.render_rule_edit(parsed)
elif parsed.path == "/rules/delete":
self.render_rule_delete(parsed)
elif parsed.path == "/logs":
self.render_logs(parsed)
elif parsed.path == "/test":
self.render_test()
elif parsed.path == "/account":
self.render_account()
elif parsed.path == "/api/targets":
json_response(self, 200, self.list_targets())
elif parsed.path == "/api/rules":
json_response(self, 200, self.list_rules())
elif parsed.path == "/api/logs":
json_response(self, 200, self.list_recent_logs())
else:
self.send_error(404)
def do_POST(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/webhook/tradingview":
self.handle_tradingview_webhook()
return
if parsed.path == "/login":
self.handle_login()
return
if not self.require_auth():
return
routes = {
"/targets/create": self.create_target,
"/targets/update": self.update_target,
"/targets/delete": self.delete_target,
"/targets/test": self.test_target,
"/templates/create": self.create_template,
"/templates/update": self.update_template,
"/templates/delete": self.delete_template,
"/rules/create": self.create_rule,
"/rules/update": self.update_rule,
"/rules/toggle": self.toggle_rule,
"/rules/delete": self.delete_rule,
"/test/send": self.send_test,
"/account/password": self.change_password,
"/deliveries/retry": self.retry_deliveries,
"/deliveries/retry-one": self.retry_delivery_one,
"/logout": self.logout,
}
handler = routes.get(parsed.path)
if not handler:
self.send_error(404)
return
handler()
def require_auth(self) -> bool:
if is_valid_session(self.context.settings, self.headers.get("Cookie")):
return True
redirect(self, "/login")
return False
def layout(self, title: str, body: str) -> bytes:
nav = [
("/dashboard", "概览"),
("/rules", "路由规则"),
("/templates", "飞书模板"),
("/targets", "飞书 Webhook"),
("/logs", "日志"),
("/test", "测试发送"),
("/account", "账号安全"),
]
items = "".join(f'<a href="{href}">{label}</a>' for href, label in nav)
return f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{html.escape(title)}</title>
<link rel="stylesheet" href="/static/styles.css">
<script src="/static/app.js" defer></script>
</head>
<body>
<aside class="sidebar">
<div class="brand">TV Dispatch</div>
<nav>{items}</nav>
<form method="post" action="/logout"><button class="ghost" type="submit">退出</button></form>
</aside>
<main class="shell">{body}</main>
</body>
</html>""".encode()
def send_html(self, title: str, body: str) -> None:
content = self.layout(title, body)
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(content)))
self.end_headers()
self.wfile.write(content)
def render_login(self) -> None:
content = """<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Login</title>
<link rel="stylesheet" href="/static/styles.css">
</head>
<body class="login-page">
<form class="login-card" method="post" action="/login">
<h1>TV Dispatch</h1>
<p>TradingView alert routing console</p>
<label>用户名<input name="username" autocomplete="username" required></label>
<label>密码<input name="password" type="password" autocomplete="current-password" required></label>
<button type="submit">登录</button>
</form>
</body>
</html>""".encode()
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(content)))
self.end_headers()
self.wfile.write(content)
def handle_login(self) -> None:
form = parse_form(self)
if not check_credentials(
self.context.settings,
form.get("username", ""),
form.get("password", ""),
self.get_admin_password_hash(),
):
redirect(self, "/login")
return
cookie = SimpleCookie()
cookie[COOKIE_NAME] = make_session_cookie(self.context.settings)
cookie[COOKIE_NAME]["path"] = "/"
cookie[COOKIE_NAME]["httponly"] = True
cookie[COOKIE_NAME]["samesite"] = "Lax"
self.send_response(HTTPStatus.SEE_OTHER)
self.send_header("Location", "/dashboard")
self.send_header("Set-Cookie", cookie.output(header="").strip())
self.end_headers()
def get_admin_password_hash(self) -> str:
with self.context.db.connect() as conn:
row = conn.execute("SELECT password_hash FROM admin_settings WHERE id = 1").fetchone()
return row["password_hash"]
def logout(self) -> None:
self.send_response(HTTPStatus.SEE_OTHER)
self.send_header("Location", "/login")
self.send_header("Set-Cookie", f"{COOKIE_NAME}=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax")
self.end_headers()
def serve_static(self, path: str) -> None:
local_path = os.path.join(os.path.dirname(__file__), "static", os.path.basename(path))
if not os.path.exists(local_path):
self.send_error(404)
return
with open(local_path, "rb") as file:
content = file.read()
self.send_response(200)
self.send_header("Content-Type", mimetypes.guess_type(local_path)[0] or "application/octet-stream")
self.send_header("Content-Length", str(len(content)))
self.end_headers()
self.wfile.write(content)
def handle_health(self) -> None:
now = datetime.now(timezone.utc)
try:
with self.context.db.connect() as conn:
conn.execute("SELECT 1").fetchone()
counts = {
"pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"],
"retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"],
"processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"],
"failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"],
}
worker_row = conn.execute(
"SELECT value FROM app_state WHERE key = 'worker.last_seen_at'"
).fetchone()
except Exception as exc:
json_response(self, 503, {"ok": False, "database": "error", "error": str(exc)})
return
worker_last_seen = worker_row["value"] if worker_row else None
worker_fresh = False
if worker_last_seen:
try:
parsed = datetime.fromisoformat(worker_last_seen)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
max_age = max(60, self.context.settings.worker_interval_seconds * 3)
worker_fresh = (now - parsed.astimezone(timezone.utc)).total_seconds() <= max_age
except ValueError:
worker_fresh = False
json_response(
self,
200,
{
"ok": True,
"database": "ok",
"queue": counts,
"worker": {
"last_seen_at": worker_last_seen,
"fresh": worker_fresh,
"interval_seconds": self.context.settings.worker_interval_seconds,
},
"dispatch_inline": self.context.settings.dispatch_inline,
"dispatch_wakeup_on_receive": self.context.settings.dispatch_wakeup_on_receive,
},
)
def handle_tradingview_webhook(self) -> None:
if self.context.settings.webhook_token:
query = parse_qs(urlparse(self.path).query)
token = self.headers.get("X-Webhook-Token") or query.get("token", [""])[-1]
if token != self.context.settings.webhook_token:
json_response(self, 401, {"error": "Invalid webhook token"})
return
try:
payload = parse_json_body(self)
result = self.context.dispatcher.receive_alert(payload)
if result.get("delivery_ids"):
self.context.wake_dispatcher()
json_response(self, 202, result)
except ValidationError as exc:
json_response(self, 400, {"error": str(exc)})
def list_targets(self) -> list[dict[str, Any]]:
with self.context.db.connect() as conn:
rows = conn.execute("SELECT * FROM webhook_targets ORDER BY id DESC").fetchall()
return [dict(row) for row in rows]
def list_rules(self) -> list[dict[str, Any]]:
with self.context.db.connect() as conn:
rows = conn.execute("SELECT * FROM routing_rules ORDER BY priority ASC, id DESC").fetchall()
rules = []
for row in rows:
item = dict(row)
item["target_ids"] = from_json(item["target_ids"], [])
rules.append(item)
return rules
def list_templates(self) -> list[dict[str, Any]]:
with self.context.db.connect() as conn:
rows = conn.execute("SELECT * FROM message_templates ORDER BY id DESC").fetchall()
return [dict(row) for row in rows]
def list_logs(
self,
active_tab: str = "alerts",
page: int = 1,
page_size: int = LOG_PAGE_SIZE,
filters: dict[str, str] | None = None,
) -> dict[str, Any]:
filters = filters or {}
offset = (page - 1) * page_size
with self.context.db.connect() as conn:
alert_where = []
alert_params: list[Any] = []
if filters.get("symbol"):
alert_where.append("symbol = ?")
alert_params.append(filters["symbol"].upper())
if filters.get("strategy"):
alert_where.append("strategy = ?")
alert_params.append(filters["strategy"])
if filters.get("status"):
alert_where.append("status = ?")
alert_params.append(filters["status"])
alert_where_sql = f"WHERE {' AND '.join(alert_where)}" if alert_where else ""
delivery_where = []
delivery_params: list[Any] = []
if filters.get("status"):
delivery_where.append("status = ?")
delivery_params.append(filters["status"])
if filters.get("target"):
delivery_where.append("target_name LIKE ?")
delivery_params.append(f"%{filters['target']}%")
delivery_where_sql = f"WHERE {' AND '.join(delivery_where)}" if delivery_where else ""
alert_total = conn.execute(
f"SELECT COUNT(*) AS c FROM alerts {alert_where_sql}",
alert_params,
).fetchone()["c"]
delivery_total = conn.execute(
f"SELECT COUNT(*) AS c FROM deliveries {delivery_where_sql}",
delivery_params,
).fetchone()["c"]
alerts = []
deliveries = []
if active_tab == "alerts":
alerts = conn.execute(
f"SELECT * FROM alerts {alert_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?",
(*alert_params, page_size, offset),
).fetchall()
else:
deliveries = conn.execute(
f"""
SELECT d.*,
(
SELECT COUNT(*)
FROM delivery_attempts da
WHERE da.delivery_id = d.id
) AS attempt_records
FROM deliveries d
{delivery_where_sql}
ORDER BY d.id DESC LIMIT ? OFFSET ?
""",
(*delivery_params, page_size, offset),
).fetchall()
return {
"alerts": [dict(row) for row in alerts],
"deliveries": [dict(row) for row in deliveries],
"alert_total": alert_total,
"delivery_total": delivery_total,
}
def list_recent_logs(self) -> dict[str, list[dict[str, Any]]]:
with self.context.db.connect() as conn:
alerts = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 100").fetchall()
deliveries = conn.execute("SELECT * FROM deliveries ORDER BY id DESC LIMIT 200").fetchall()
attempts = conn.execute("SELECT * FROM delivery_attempts ORDER BY id DESC LIMIT 200").fetchall()
return {
"alerts": [dict(row) for row in alerts],
"deliveries": [dict(row) for row in deliveries],
"delivery_attempts": [dict(row) for row in attempts],
}
def render_dashboard(self) -> None:
host = self.headers.get("Host", f"localhost:{self.context.settings.port}")
scheme = self.headers.get("X-Forwarded-Proto", "http")
base_url = f"{scheme}://{host}"
webhook_url = f"{base_url}/webhook/tradingview"
token = self.context.settings.webhook_token
webhook_url_with_token = f"{webhook_url}?token={token}" if token else webhook_url
token_block = (
f"""<div class="copy-row"><span>Webhook Token</span><code>{html.escape(token)}</code><button type="button" data-copy="{html.escape(token)}">复制</button></div>
<div class="copy-row"><span>Header 方式</span><code>X-Webhook-Token: {html.escape(token)}</code><button type="button" data-copy="X-Webhook-Token: {html.escape(token)}">复制</button></div>"""
if token
else """<p class="warning">当前未设置 WEBHOOK_TOKEN任何知道地址的人都可以提交 alert。生产环境建议设置。</p>"""
)
webhook_panel = f"""<section class="panel">
<h2>TradingView Webhook 配置</h2>
<div class="copy-row"><span>Webhook URL</span><code>{html.escape(webhook_url_with_token)}</code><button type="button" data-copy="{html.escape(webhook_url_with_token)}">复制</button></div>
<div class="copy-row"><span>纯 URL</span><code>{html.escape(webhook_url)}</code><button type="button" data-copy="{html.escape(webhook_url)}">复制</button></div>
{token_block}
</section>"""
with self.context.db.connect() as conn:
counts = {
"alerts": conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"],
"rules": conn.execute("SELECT COUNT(*) AS c FROM routing_rules").fetchone()["c"],
"targets": conn.execute("SELECT COUNT(*) AS c FROM webhook_targets").fetchone()["c"],
"pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"],
"processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"],
"retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"],
"failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"],
}
recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall()
cards = "".join(f'<div class="metric"><span>{label}</span><strong>{value}</strong></div>' for label, value in [
("Alerts", counts["alerts"]),
("Rules", counts["rules"]),
("Targets", counts["targets"]),
("待发送", counts["pending"]),
("发送中", counts["processing"]),
("重试中", counts["retry"]),
("失败", counts["failed"]),
])
rows = "".join(
f"<tr><td>{row['id']}</td><td>{html.escape(row['symbol'])}</td><td>{html.escape(row['timeframe'])}</td><td>{html.escape(row['strategy'])}</td><td><span class='status'>{html.escape(row['status'])}</span></td><td>{format_display_time(self.context.settings, row['created_at'])}</td></tr>"
for row in recent
)
self.send_html("概览", f"<header><h1>概览</h1><p>结构化 alert 分发、飞书转发和重试状态。</p></header>{webhook_panel}<section class='metrics'>{cards}</section><section><h2>最近 Alert</h2><table><thead><tr><th>ID</th><th>品种</th><th>周期</th><th>策略</th><th>状态</th><th>时间</th></tr></thead><tbody>{rows}</tbody></table></section>")
def render_targets(self) -> None:
targets = self.list_targets()
rows = "".join(
f"""<tr>
<td>{target['id']}<input form="target-update-{target['id']}" type="hidden" name="id" value="{target['id']}"></td>
<td><input form="target-update-{target['id']}" name="name" value="{html.escape(target['name'])}" required></td>
<td class="url"><input form="target-update-{target['id']}" name="webhook_url" value="{html.escape(target['webhook_url'])}" type="url" required></td>
<td><form id="target-update-{target['id']}" class="inline" method="post" action="/targets/update"></form><button form="target-update-{target['id']}" type="submit">更新</button>
<form class="inline" method="post" action="/targets/test"><input type="hidden" name="id" value="{target['id']}"><button type="submit">测试</button></form>
<a class="button-link danger-link" href="/targets/delete?id={target['id']}">删除</a>
</td></tr>"""
for target in targets
)
form = """<form class="panel" method="post" action="/targets/create">
<h2>新增飞书 Webhook</h2>
<label>名称<input name="name" required></label>
<label>Webhook URL<input name="webhook_url" type="url" required></label>
<button type="submit">保存目标</button>
</form>"""
notice = getattr(self, "_target_notice", "")
self.send_html("飞书 Webhook", f"<header><h1>飞书 Webhook</h1><p>维护所有可分发的飞书机器人地址。</p></header>{notice}{form}<table><thead><tr><th>ID</th><th>名称</th><th>URL</th><th>操作</th></tr></thead><tbody>{rows}</tbody></table>")
def render_target_delete(self, parsed: Any) -> None:
target_id = parse_qs(parsed.query).get("id", [""])[-1]
with self.context.db.connect() as conn:
target = conn.execute("SELECT * FROM webhook_targets WHERE id = ?", (target_id,)).fetchone()
if not target:
self.send_error(404)
return
body = f"""<header><h1>删除飞书 Webhook</h1><p>请确认是否删除这个飞书目标。</p></header>
<section class="panel narrow">
<h2>{html.escape(target['name'])}</h2>
<p class="url">{html.escape(target['webhook_url'])}</p>
<form method="post" action="/targets/delete" class="actions">
<input type="hidden" name="id" value="{target['id']}">
<button class="danger" type="submit">确认删除</button>
<a class="button-link secondary" href="/targets">取消</a>
</form>
</section>"""
self.send_html("删除飞书 Webhook", body)
def render_templates(self) -> None:
templates = self.list_templates()
rows = "".join(
f"""<tr>
<td>{template['id']}</td>
<td>{html.escape(template['name'])}</td>
<td>{html.escape(template['description'] or '-')}</td>
<td><span class="template-sample">{html.escape(template['card_title_template'])}</span></td>
<td><a class="button-link" href="/templates/edit?id={template['id']}">编辑</a><a class="button-link danger-link" href="/templates/delete?id={template['id']}">删除</a></td>
</tr>"""
for template in templates
)
header = """<header class="page-header"><div><h1>飞书内容模板</h1><p>把常用卡片标题和正文保存成模板,新建或编辑路由规则时可以一键套用。</p></div><a class="button-link" href="/templates/new">新增模板</a></header>"""
self.send_html("飞书内容模板", f"{header}<table><thead><tr><th>ID</th><th>名称</th><th>说明</th><th>标题模板</th><th>操作</th></tr></thead><tbody>{rows}</tbody></table>")
def render_template_form(
self,
title: str,
action: str,
template: dict[str, Any] | None = None,
) -> None:
template = template or {
"id": "",
"name": "",
"description": "",
"card_title_template": "{{title}}",
"card_body_template": "**{{symbol}} · {{timeframe}} · {{direction}}**\n\n> {{signal_type}}\n\n**价格区**\n- 试仓位:{{probe_entry}}\n- 优先位:{{priority_entry}}\n- 防守位:{{defense_price}}\n- 止损位:{{stop_loss}}\n- 当前价:{{current_price}}\n\n**说明**\n{{description}}\n\n**风险提示**\n{{risk_tip}}",
}
hidden_id = f'<input type="hidden" name="id" value="{template["id"]}">' if template.get("id") else ""
button_text = "保存修改" if template.get("id") else "创建模板"
body = f"""<header><h1>{html.escape(title)}</h1><p>模板支持 <code>{{{{field}}}}</code> 或 <code>{{field}}</code> 占位符,字段来自 TradingView JSON。</p></header>
<form class="panel template-form" method="post" action="{action}">
{hidden_id}
<label>模板名称<input name="name" value="{html.escape(str(template['name']))}" required></label>
<label>说明<input name="description" value="{html.escape(str(template['description']))}" placeholder="例如:供需系统阻力/支撑区域通用"></label>
<label>卡片标题模板<input name="card_title_template" value="{html.escape(str(template['card_title_template']))}" required></label>
<label>卡片正文模板<textarea name="card_body_template" rows="16" required>{html.escape(str(template['card_body_template']))}</textarea></label>
<div class="actions"><button type="submit">{button_text}</button><a class="button-link secondary" href="/templates">返回列表</a></div>
</form>"""
self.send_html(title, body)
def render_template_new(self) -> None:
self.render_template_form("新增飞书内容模板", "/templates/create")
def render_template_edit(self, parsed: Any) -> None:
template_id = parse_qs(parsed.query).get("id", [""])[-1]
with self.context.db.connect() as conn:
template = conn.execute("SELECT * FROM message_templates WHERE id = ?", (template_id,)).fetchone()
if not template:
self.send_error(404)
return
self.render_template_form("编辑飞书内容模板", "/templates/update", dict(template))
def render_template_delete(self, parsed: Any) -> None:
template_id = parse_qs(parsed.query).get("id", [""])[-1]
with self.context.db.connect() as conn:
template = conn.execute("SELECT * FROM message_templates WHERE id = ?", (template_id,)).fetchone()
if not template:
self.send_error(404)
return
body = f"""<header><h1>删除飞书内容模板</h1><p>请确认是否删除这个模板。</p></header>
<section class="panel narrow">
<h2>{html.escape(template['name'])}</h2>
<p>删除模板不会影响已经创建的路由规则,只是后续不能再套用它。</p>
<form method="post" action="/templates/delete" class="actions">
<input type="hidden" name="id" value="{template['id']}">
<button class="danger" type="submit">确认删除</button>
<a class="button-link secondary" href="/templates">取消</a>
</form>
</section>"""
self.send_html("删除飞书内容模板", body)
def render_rules(self) -> None:
targets = self.list_targets()
rules = self.list_rules()
target_names = {target["id"]: target["name"] for target in targets}
rows = ""
for rule in rules:
conditions = [
f"周期={html.escape(rule['timeframe'])}" if rule["timeframe"] else "",
f"品种={html.escape(rule['symbol'])}" if rule["symbol"] else "",
f"策略={html.escape(rule['strategy'])}" if rule["strategy"] else "",
]
target_badges = "".join(
f'<span class="tag">{html.escape(target_names.get(target_id, f"#{target_id}"))}</span>'
for target_id in rule["target_ids"]
) or "-"
rows += f"""<tr>
<td>{rule['id']}</td>
<td>{html.escape(rule['name'])}</td>
<td>{'<br>'.join(item for item in conditions if item) or '-'}</td>
<td>{rule['priority']}</td>
<td><div class="tag-list">{target_badges}</div></td>
<td>
<form class="inline" method="post" action="/rules/toggle">
<input type="hidden" name="id" value="{rule['id']}">
<label class="switch switch-compact" title="切换规则启用状态">
<input name="enabled" type="checkbox" {'checked' if rule['enabled'] else ''} onchange="this.form.submit()">
<span></span><strong>{'启用' if rule['enabled'] else '停用'}</strong>
</label>
</form>
</td>
<td><a class="button-link" href="/rules/edit?id={rule['id']}">编辑</a><a class="button-link danger-link" href="/rules/delete?id={rule['id']}">删除</a></td>
</tr>"""
header = """<header class="page-header"><div><h1>路由规则</h1><p>周期、品种、策略至少填写一个;空字段表示不限。消息统一用飞书卡片发送。</p></div><a class="button-link" href="/rules/new">新增规则</a></header>"""
self.send_html("路由规则", f"{header}<table><thead><tr><th>ID</th><th>名称</th><th>匹配条件</th><th>优先级</th><th>发送到</th><th>状态</th><th>操作</th></tr></thead><tbody>{rows}</tbody></table>")
def render_rule_form(
self,
title: str,
action: str,
rule: dict[str, Any] | None = None,
) -> None:
targets = self.list_targets()
templates = self.list_templates()
rule = rule or {
"id": "",
"name": "",
"timeframe": "",
"symbol": "",
"strategy": "",
"priority": 100,
"card_title_template": "TradingView {{symbol}} {{action}}",
"card_body_template": "**品种**: {{symbol}}\n**周期**: {{timeframe}}\n**策略**: {{strategy}}\n**动作**: {{action}}\n**价格**: {{price}}",
"target_ids": [],
"enabled": 1,
}
selected_targets = target_checkbox_options(targets, rule.get("target_ids", []))
picker = template_picker(templates)
hidden_id = f'<input type="hidden" name="id" value="{rule["id"]}">' if rule.get("id") else ""
button_text = "保存修改" if rule.get("id") else "创建规则"
body = f"""<header><h1>{html.escape(title)}</h1><p>消息统一使用飞书卡片。周期、品种、策略至少填写一个,空字段表示不限。</p></header>
<form class="panel rule-form" method="post" action="{action}">
{hidden_id}
<div class="grid">
<label>规则名<input name="name" value="{html.escape(str(rule['name']))}" required></label>
<label>周期<input name="timeframe" value="{html.escape(str(rule['timeframe']))}" placeholder="5m空=不限"></label>
<label>品种<input name="symbol" value="{html.escape(str(rule['symbol']))}" placeholder="BTCUSDT空=不限"></label>
<label>策略<input name="strategy" value="{html.escape(str(rule['strategy']))}" placeholder="breakout空=不限"></label>
<label>优先级<input name="priority" type="number" value="{rule['priority']}" required></label>
</div>
{picker}
<label>卡片标题模板<input data-title-template-input name="card_title_template" value="{html.escape(str(rule['card_title_template']))}" required></label>
<label>卡片正文模板<textarea data-body-template-input name="card_body_template" rows="10">{html.escape(str(rule['card_body_template']))}</textarea></label>
<div class="field-target"><span class="field-label">发送到</span><div class="target-choices">{selected_targets}</div></div>
<label class="switch"><input name="enabled" type="checkbox" {'checked' if rule.get('enabled') else ''}><span></span><strong>启用规则</strong></label>
<div class="actions"><button type="submit">{button_text}</button><a class="button-link secondary" href="/rules">返回列表</a></div>
</form>"""
self.send_html(title, body)
def render_rule_new(self) -> None:
self.render_rule_form("新增路由规则", "/rules/create")
def render_rule_edit(self, parsed: Any) -> None:
query = parse_qs(parsed.query)
rule_id = query.get("id", [""])[-1]
with self.context.db.connect() as conn:
rule = conn.execute("SELECT * FROM routing_rules WHERE id = ?", (rule_id,)).fetchone()
if not rule:
self.send_error(404)
return
rule_dict = dict(rule)
rule_dict["target_ids"] = from_json(rule_dict["target_ids"], [])
self.render_rule_form("编辑路由规则", "/rules/update", rule_dict)
def render_rule_delete(self, parsed: Any) -> None:
rule_id = parse_qs(parsed.query).get("id", [""])[-1]
with self.context.db.connect() as conn:
rule = conn.execute("SELECT * FROM routing_rules WHERE id = ?", (rule_id,)).fetchone()
if not rule:
self.send_error(404)
return
body = f"""<header><h1>删除路由规则</h1><p>请确认是否删除这条规则。</p></header>
<section class="panel narrow">
<h2>{html.escape(rule['name'])}</h2>
<p>删除后不会再匹配对应 alert已有日志不受影响。</p>
<form method="post" action="/rules/delete" class="actions">
<input type="hidden" name="id" value="{rule['id']}">
<button class="danger" type="submit">确认删除</button>
<a class="button-link secondary" href="/rules">取消</a>
</form>
</section>"""
self.send_html("删除路由规则", body)
def render_logs(self, parsed: Any) -> None:
query = parse_qs(parsed.query)
active_tab = query.get("tab", ["alerts"])[-1]
if active_tab not in {"alerts", "deliveries"}:
active_tab = "alerts"
page = parse_positive_int(query.get("page", ["1"])[-1])
filters = {
"symbol": filter_value(query, "symbol"),
"strategy": filter_value(query, "strategy"),
"status": filter_value(query, "status"),
"target": filter_value(query, "target"),
}
total_key = "alert_total" if active_tab == "alerts" else "delivery_total"
logs = self.list_logs(active_tab=active_tab, page=page, filters=filters)
total_pages = max(1, (logs[total_key] + LOG_PAGE_SIZE - 1) // LOG_PAGE_SIZE)
if page > total_pages:
page = total_pages
logs = self.list_logs(active_tab=active_tab, page=page, filters=filters)
alert_rows = ""
for row in logs["alerts"]:
try:
raw_payload = json.dumps(json.loads(row["payload"]), ensure_ascii=False, indent=2)
except Exception:
raw_payload = row["payload"] or ""
created_at = format_display_time(self.context.settings, row["created_at"])
alert_rows += f"""<tr>
<td>{row['id']}</td>
<td>{html.escape(row['symbol'])}</td>
<td>{html.escape(row['timeframe'])}</td>
<td>{html.escape(row['strategy'])}</td>
<td><span class='status'>{html.escape(row['status'])}</span></td>
<td>{html.escape(row['error'] or '')}</td>
<td>{created_at}</td>
<td><details class="payload-details"><summary>查看</summary><pre>{html.escape(raw_payload)}</pre></details></td>
</tr>"""
delivery_rows = "".join(
f"""<tr><td>{row['id']}</td><td>{row['alert_id']}</td><td>{html.escape(row['target_name'])}</td><td><span class='status'>{html.escape(row['status'])}</span></td><td>{row['attempts']}</td><td>{row.get('attempt_records', row['attempts'])}</td><td>{html.escape(str(row['response_code'] or ''))}</td><td>{format_display_time(self.context.settings, row['last_attempt_at'])}</td><td>{html.escape(row['error'] or '')}</td><td>{format_display_time(self.context.settings, row['next_attempt_at'])}</td><td><form class="inline" method="post" action="/deliveries/retry-one"><input type="hidden" name="id" value="{row['id']}"><button class="small-button" type="submit" {'disabled' if row['status'] == 'sent' else ''}>立即重发</button></form></td></tr>"""
for row in logs["deliveries"]
)
alert_empty = '<tr><td colspan="8" class="empty-cell">暂无 Alert 日志</td></tr>'
delivery_empty = '<tr><td colspan="11" class="empty-cell">暂无分发日志</td></tr>'
alert_active = " active" if active_tab == "alerts" else ""
delivery_active = " active" if active_tab == "deliveries" else ""
active_table = (
f"""<table><thead><tr><th>ID</th><th>品种</th><th>周期</th><th>策略</th><th>状态</th><th>错误</th><th>时间</th><th>原始 Alert</th></tr></thead><tbody>{alert_rows or alert_empty}</tbody></table>
{render_pagination("/logs", "alerts", page, logs["alert_total"], LOG_PAGE_SIZE, filters)}"""
if active_tab == "alerts"
else f"""<table><thead><tr><th>ID</th><th>Alert</th><th>目标</th><th>状态</th><th>次数</th><th>明细</th><th>HTTP</th><th>发送时间</th><th>错误</th><th>下次重试</th><th>操作</th></tr></thead><tbody>{delivery_rows or delivery_empty}</tbody></table>
{render_pagination("/logs", "deliveries", page, logs["delivery_total"], LOG_PAGE_SIZE, filters)}"""
)
timezone_label = html.escape(display_timezone_label(self.context.settings))
body = f"""<header class="page-header"><div><h1>日志</h1><p>按类型查看 Alert 和分发记录,每页 {LOG_PAGE_SIZE} 条。当前显示时区:{timezone_label}。</p></div>
<form method="post" action="/deliveries/retry"><button type="submit">处理到期重试</button></form></header>
<nav class="tabs" aria-label="日志类型">
<a class="tab{alert_active}" href="/logs?tab=alerts&page=1">Alert 日志 <span>{logs["alert_total"]}</span></a>
<a class="tab{delivery_active}" href="/logs?tab=deliveries&page=1">分发日志 <span>{logs["delivery_total"]}</span></a>
</nav>
{log_filter_form(active_tab, filters)}
<section class="log-panel">{active_table}</section>"""
self.send_html("日志", body)
def render_test(self) -> None:
sample = html.escape(json.dumps({"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000}, indent=2))
result = getattr(self, "_test_result_html", "")
body = f"""<header><h1>测试发送</h1><p>提交一条模拟 TradingView alert走完整匹配和飞书转发流程。</p></header>
<form class="panel" method="post" action="/test/send">
<label>Alert JSON<textarea name="payload" rows="12">{sample}</textarea></label>
<button type="submit">发送测试 Alert</button>
</form>"""
if result:
body += result
self.send_html("测试发送", body)
def render_account(self) -> None:
body = """<header><h1>账号安全</h1><p>修改当前管理员密码,修改成功后会退出登录。</p></header>
<form class="panel narrow" method="post" action="/account/password">
<h2>修改密码</h2>
<label>当前密码<input name="current_password" type="password" autocomplete="current-password" required></label>
<label>新密码<input name="new_password" type="password" autocomplete="new-password" minlength="8" required></label>
<label>确认新密码<input name="confirm_password" type="password" autocomplete="new-password" minlength="8" required></label>
<button type="submit">更新密码</button>
</form>"""
self.send_html("账号安全", body)
def create_target(self) -> None:
form = parse_form(self)
now = now_iso()
with self.context.db.connect() as conn:
conn.execute(
"INSERT INTO webhook_targets (name, webhook_url, enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
(form["name"].strip(), form["webhook_url"].strip(), 1, now, now),
)
redirect(self, "/targets")
def update_target(self) -> None:
form = parse_form(self)
with self.context.db.connect() as conn:
conn.execute(
"UPDATE webhook_targets SET name = ?, webhook_url = ?, enabled = ?, updated_at = ? WHERE id = ?",
(form["name"].strip(), form["webhook_url"].strip(), 1, now_iso(), form["id"]),
)
redirect(self, "/targets")
def delete_target(self) -> None:
form = parse_form(self)
with self.context.db.connect() as conn:
conn.execute("DELETE FROM webhook_targets WHERE id = ?", (form["id"],))
redirect(self, "/targets")
def test_target(self) -> None:
form = parse_form(self)
with self.context.db.connect() as conn:
target = conn.execute("SELECT * FROM webhook_targets WHERE id = ?", (form.get("id"),)).fetchone()
if not target:
self.send_error(404)
return
message = build_feishu_message(
{"symbol": "TEST", "timeframe": "5m", "strategy": "webhook-test", "action": "test"},
{
"card_title_template": "TV Dispatch 测试消息",
"card_body_template": "**目标**: {{symbol}}\n**动作**: {{action}}\n这是一条飞书 Webhook 连通性测试。",
},
)
try:
data = json.dumps(message, ensure_ascii=False).encode()
request = urllib.request.Request(
target["webhook_url"],
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=self.context.settings.feishu_timeout_seconds) as response:
status = response.getcode()
self.render_targets_with_notice(f"测试消息已发送到 {html.escape(target['name'])}HTTP {status}", success=True)
except urllib.error.HTTPError as exc:
self.render_targets_with_notice(f"测试发送失败HTTP {exc.code}", success=False)
except Exception as exc:
self.render_targets_with_notice(f"测试发送失败:{html.escape(str(exc))}", success=False)
def render_targets_with_notice(self, message: str, success: bool) -> None:
self._target_notice = f"""<section class="result-panel {'success' if success else 'error'}"><h2>Webhook 测试</h2><p>{message}</p></section>"""
self.render_targets()
def create_template(self) -> None:
form = parse_form(self)
now = now_iso()
with self.context.db.connect() as conn:
conn.execute(
"""
INSERT INTO message_templates (
name, description, card_title_template, card_body_template,
created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
form["name"].strip(),
form.get("description", "").strip(),
form["card_title_template"].strip(),
form["card_body_template"].strip(),
now,
now,
),
)
redirect(self, "/templates")
def update_template(self) -> None:
form = parse_form(self)
with self.context.db.connect() as conn:
conn.execute(
"""
UPDATE message_templates
SET name = ?, description = ?, card_title_template = ?,
card_body_template = ?, updated_at = ?
WHERE id = ?
""",
(
form["name"].strip(),
form.get("description", "").strip(),
form["card_title_template"].strip(),
form["card_body_template"].strip(),
now_iso(),
form["id"],
),
)
redirect(self, "/templates")
def delete_template(self) -> None:
form = parse_form(self)
with self.context.db.connect() as conn:
conn.execute("DELETE FROM message_templates WHERE id = ?", (form["id"],))
redirect(self, "/templates")
def create_rule(self) -> None:
form = parse_form_multi(self)
target_ids = [int(value) for value in form.get("target_ids", [])]
timeframe = form.get("timeframe", [""])[-1].strip()
symbol = form.get("symbol", [""])[-1].strip().upper()
strategy = form.get("strategy", [""])[-1].strip()
if not any((timeframe, symbol, strategy)):
self.send_error(400, "周期、品种、策略至少填写一个")
return
if not target_ids:
self.send_error(400, "至少选择一个飞书 Webhook")
return
now = now_iso()
with self.context.db.connect() as conn:
conn.execute(
"""
INSERT INTO routing_rules (
name, timeframe, symbol, strategy, priority, message_type,
card_title_template, card_body_template, enabled, target_ids,
created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
form.get("name", [""])[-1].strip(),
timeframe,
symbol,
strategy,
int(form.get("priority", ["100"])[-1]),
"card",
form.get("card_title_template", ["TradingView {{symbol}} {{action}}"])[-1].strip(),
form.get("card_body_template", [""])[-1].strip(),
1 if form.get("enabled", [""])[-1] == "on" else 0,
to_json(target_ids),
now,
now,
),
)
redirect(self, "/rules")
def delete_rule(self) -> None:
form = parse_form(self)
with self.context.db.connect() as conn:
conn.execute("DELETE FROM routing_rules WHERE id = ?", (form["id"],))
redirect(self, "/rules")
def toggle_rule(self) -> None:
form = parse_form(self)
enabled = 1 if form.get("enabled") == "on" else 0
with self.context.db.connect() as conn:
conn.execute(
"UPDATE routing_rules SET enabled = ?, updated_at = ? WHERE id = ?",
(enabled, now_iso(), form.get("id", "")),
)
redirect(self, "/rules")
def update_rule(self) -> None:
form = parse_form_multi(self)
target_ids = [int(value) for value in form.get("target_ids", [])]
timeframe = form.get("timeframe", [""])[-1].strip()
symbol = form.get("symbol", [""])[-1].strip().upper()
strategy = form.get("strategy", [""])[-1].strip()
if not any((timeframe, symbol, strategy)):
self.send_error(400, "周期、品种、策略至少填写一个")
return
if not target_ids:
self.send_error(400, "至少选择一个飞书 Webhook")
return
with self.context.db.connect() as conn:
conn.execute(
"""
UPDATE routing_rules
SET name = ?, timeframe = ?, symbol = ?, strategy = ?, priority = ?,
message_type = ?, card_title_template = ?, card_body_template = ?,
enabled = ?, target_ids = ?, updated_at = ?
WHERE id = ?
""",
(
form.get("name", [""])[-1].strip(),
timeframe,
symbol,
strategy,
int(form.get("priority", ["100"])[-1]),
"card",
form.get("card_title_template", ["TradingView {{symbol}} {{action}}"])[-1].strip(),
form.get("card_body_template", [""])[-1].strip(),
1 if form.get("enabled", [""])[-1] == "on" else 0,
to_json(target_ids),
now_iso(),
form.get("id", [""])[-1],
),
)
redirect(self, "/rules")
def send_test(self) -> None:
form = parse_form(self)
payload_text = form.get("payload", "{}")
try:
payload = json.loads(payload_text)
result = self.context.dispatcher.receive_alert(payload)
delivery_text = ", ".join(str(item) for item in result.get("delivery_ids", [])) or "-"
self._test_result_html = f"""<section class="result-panel success">
<h2>测试结果</h2>
<p>测试 alert 已入队worker 会异步发送飞书;如果开启 DISPATCH_INLINE 才会在当前请求里立即发送。</p>
<div class="result-grid">
<div><span>Alert ID</span><strong>{result.get("alert_id")}</strong></div>
<div><span>状态</span><strong>{html.escape(str(result.get("status")))}</strong></div>
<div><span>命中规则</span><strong>{html.escape(str(result.get("matched_rule_id") or "-"))}</strong></div>
<div><span>Delivery</span><strong>{html.escape(delivery_text)}</strong></div>
</div>
<details><summary>查看响应 JSON</summary><pre>{html.escape(json.dumps(result, ensure_ascii=False, indent=2))}</pre></details>
</section>"""
self.render_test()
except (json.JSONDecodeError, ValidationError) as exc:
self._test_result_html = f"""<section class="result-panel error">
<h2>测试失败</h2>
<p>{html.escape(str(exc))}</p>
</section>"""
self.render_test()
def change_password(self) -> None:
form = parse_form(self)
current_password = form.get("current_password", "")
new_password = form.get("new_password", "")
confirm_password = form.get("confirm_password", "")
if not check_credentials(
self.context.settings,
self.context.settings.admin_username,
current_password,
self.get_admin_password_hash(),
):
json_response(self, 400, {"error": "当前密码不正确"})
return
if len(new_password) < 8:
json_response(self, 400, {"error": "新密码至少需要 8 位"})
return
if new_password != confirm_password:
json_response(self, 400, {"error": "两次输入的新密码不一致"})
return
with self.context.db.connect() as conn:
conn.execute(
"UPDATE admin_settings SET password_hash = ?, updated_at = ? WHERE id = 1",
(hash_password(new_password), now_iso()),
)
self.logout()
def retry_deliveries(self) -> None:
self.context.dispatcher.process_due_deliveries(
limit=self.context.settings.delivery_batch_size,
concurrency=self.context.settings.delivery_concurrency,
)
redirect(self, "/logs")
def retry_delivery_one(self) -> None:
form = parse_form(self)
try:
delivery_id = int(form.get("id", ""))
except ValueError:
self.send_error(400, "Invalid delivery id")
return
self.context.dispatcher.retry_delivery_now(delivery_id)
redirect(self, "/logs?tab=deliveries&page=1")
def make_handler(context: AppContext) -> type[Handler]:
class BoundHandler(Handler):
pass
BoundHandler.context = context
return BoundHandler
def run() -> None:
settings = get_settings()
context = AppContext(settings)
context.db.cleanup_old_logs(settings.retention_days)
server = ThreadingHTTPServer((settings.host, settings.port), make_handler(context))
print(f"Serving {settings.app_name} on http://{settings.host}:{settings.port}")
server.serve_forever()
if __name__ == "__main__":
run()