tradingview-alert-dispatcher/app/server.py
2026-05-14 21:40:22 +08:00

588 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import html
import json
import mimetypes
import os
from http import HTTPStatus
from http.cookies import SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse
from app.auth import COOKIE_NAME, check_credentials, hash_password, is_valid_session, make_session_cookie
from app.config import Settings, get_settings
from app.db import Database, from_json, now_iso, to_json
from app.dispatcher import Dispatcher, ValidationError
class AppContext:
    """Container for the objects every request handler shares.

    Building one constructs a ``Database`` from the settings, calls its
    ``migrate`` step, and then builds a ``Dispatcher`` on top of that database.
    """

    def __init__(self, settings: Settings):
        database = Database(settings)
        # Migration runs before the dispatcher is created.
        database.migrate(settings)
        self.settings = settings
        self.db = database
        self.dispatcher = Dispatcher(database, settings)
def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any] | list[Any]) -> None:
    """Write *payload* to the client as a complete JSON response.

    The payload is serialized with ``ensure_ascii=False`` and encoded as UTF-8;
    ``Content-Length`` is the size of the encoded bytes.
    """
    encoded = json.dumps(payload, ensure_ascii=False).encode()
    header_fields = (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(encoded))),
    )
    handler.send_response(status)
    for field_name, field_value in header_fields:
        handler.send_header(field_name, field_value)
    handler.end_headers()
    handler.wfile.write(encoded)
def redirect(handler: BaseHTTPRequestHandler, location: str) -> None:
    """Reply with ``303 See Other`` and a ``Location`` header for *location*."""
    status = HTTPStatus.SEE_OTHER
    handler.send_response(status)
    handler.send_header("Location", location)
    handler.end_headers()
def read_body(handler: BaseHTTPRequestHandler) -> bytes:
    """Read the request body announced by the ``Content-Length`` header.

    A missing, empty, non-numeric, zero, or negative ``Content-Length`` is
    treated as "no body" and yields ``b""``.

    Returns:
        The raw body bytes (possibly empty).
    """
    declared = handler.headers.get("Content-Length", "0") or "0"
    try:
        length = int(declared)
    except ValueError:
        # A malformed header must not crash the handler thread.
        length = 0
    if length <= 0:
        # A negative size would make read() consume the stream until EOF,
        # which can block on a connection the client keeps open.
        return b""
    return handler.rfile.read(length)
def parse_form(handler: BaseHTTPRequestHandler) -> dict[str, str]:
    """Decode a URL-encoded form body into a flat ``name -> value`` mapping.

    When a field is submitted more than once, the last occurrence is kept.
    """
    decoded = read_body(handler).decode()
    fields = parse_qs(decoded)
    return {name: occurrences[-1] for name, occurrences in fields.items()}
def parse_form_multi(handler: BaseHTTPRequestHandler) -> dict[str, list[str]]:
    """Decode a URL-encoded form body, keeping every value of each field."""
    raw = read_body(handler)
    return parse_qs(raw.decode())
def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
    """Parse the request body as a JSON object.

    An empty body is treated as ``{}``.

    Raises:
        ValidationError: if the body is not decodable text, is not valid JSON,
            or is valid JSON whose top-level value is not an object.
    """
    try:
        value = json.loads(read_body(handler).decode() or "{}")
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Undecodable bytes are reported the same way as malformed JSON, so
        # callers that catch ValidationError see one error type for a bad body.
        raise ValidationError("Request body must be valid JSON") from exc
    if not isinstance(value, dict):
        raise ValidationError("Request body must be a JSON object")
    return value
def target_select_options(
    targets: list[dict[str, Any]],
    selected_ids: list[int] | None = None,
    placeholder: bool = False,
) -> str:
    """Render ``<option>`` tags for a webhook-target ``<select>``.

    Args:
        targets: Target records; each is read for ``id``, ``name`` and ``enabled``.
        selected_ids: Ids whose options are marked ``selected``.
        placeholder: When true, an empty-valued prompt option comes first.

    Returns:
        The concatenated option markup. Targets with a falsy ``enabled`` are
        rendered ``disabled`` with a suffix appended to the label.
    """
    chosen = selected_ids if selected_ids else []
    parts: list[str] = []
    if placeholder:
        parts.append('<option value="">请选择飞书 Webhook</option>')
    for entry in targets:
        active = bool(entry["enabled"])
        selected_attr = "selected" if entry["id"] in chosen else ""
        disabled_attr = "" if active else "disabled"
        label = html.escape(entry["name"])
        if not active:
            label += " (停用)"
        parts.append(f'<option value="{entry["id"]}" {selected_attr} {disabled_attr}>{label}</option>')
    return "".join(parts)
class Handler(BaseHTTPRequestHandler):
context: AppContext
def log_message(self, format: str, *args: Any) -> None:
    """Print one access-log line to stdout.

    The line has the form ``<client> - - [<timestamp>] <message>`` where the
    message is ``format % args``.
    """
    client = self.address_string()
    stamp = self.log_date_time_string()
    message = format % args
    print(f"{client} - - [{stamp}] {message}")
def do_GET(self) -> None:
    """Route a GET request by URL path (the query string is ignored).

    ``/health``, ``/login`` and ``/static/*`` are served without a session.
    Every other path first goes through ``require_auth``; unknown paths get 404.
    """
    route = urlparse(self.path).path

    # Endpoints that are reachable without a session.
    if route == "/health":
        json_response(self, 200, {"ok": True})
        return
    if route == "/login":
        self.render_login()
        return
    if route.startswith("/static/"):
        self.serve_static(route)
        return

    if not self.require_auth():
        return

    html_pages = {
        "/": self.render_dashboard,
        "/dashboard": self.render_dashboard,
        "/targets": self.render_targets,
        "/rules": self.render_rules,
        "/logs": self.render_logs,
        "/test": self.render_test,
        "/account": self.render_account,
    }
    json_endpoints = {
        "/api/targets": self.list_targets,
        "/api/rules": self.list_rules,
        "/api/logs": self.list_logs,
    }
    if route in html_pages:
        html_pages[route]()
    elif route in json_endpoints:
        json_response(self, 200, json_endpoints[route]())
    else:
        self.send_error(404)
def do_POST(self) -> None:
    """Route a POST request by URL path.

    The TradingView webhook and the login form are handled before the session
    check. All other actions go through ``require_auth`` first; unknown paths
    get 404.
    """
    route = urlparse(self.path).path

    # Endpoints that are reachable without a session.
    if route == "/webhook/tradingview":
        self.handle_tradingview_webhook()
        return
    if route == "/login":
        self.handle_login()
        return

    if not self.require_auth():
        return

    actions = {
        "/targets/create": self.create_target,
        "/targets/update": self.update_target,
        "/targets/delete": self.delete_target,
        "/rules/create": self.create_rule,
        "/rules/update": self.update_rule,
        "/rules/delete": self.delete_rule,
        "/test/send": self.send_test,
        "/account/password": self.change_password,
        "/deliveries/retry": self.retry_deliveries,
        "/logout": self.logout,
    }
    action = actions.get(route)
    if action:
        action()
    else:
        self.send_error(404)
def require_auth(self) -> bool:
    """Check the request's ``Cookie`` header for a valid session.

    Returns:
        ``True`` when the session is valid. Otherwise a redirect to ``/login``
        has already been sent and ``False`` is returned.
    """
    cookie_header = self.headers.get("Cookie")
    if not is_valid_session(self.context.settings, cookie_header):
        redirect(self, "/login")
        return False
    return True
def layout(self, title: str, body: str) -> bytes:
    """Wrap *body* in the console page chrome and return the encoded page.

    *title* is HTML-escaped; *body* is inserted verbatim, so callers must pass
    markup that is already safe.
    """
    sections = (
        ("/dashboard", "概览"),
        ("/rules", "路由规则"),
        ("/targets", "飞书 Webhook"),
        ("/logs", "日志"),
        ("/test", "测试发送"),
        ("/account", "账号安全"),
    )
    links = []
    for target, caption in sections:
        links.append(f'<a href="{target}">{caption}</a>')
    items = "".join(links)
    safe_title = html.escape(title)
    page = f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{safe_title}</title>
<link rel="stylesheet" href="/static/styles.css">
<script src="/static/app.js" defer></script>
</head>
<body>
<aside class="sidebar">
<div class="brand">TV Dispatch</div>
<nav>{items}</nav>
<form method="post" action="/logout"><button class="ghost" type="submit">退出</button></form>
</aside>
<main class="shell">{body}</main>
</body>
</html>"""
    return page.encode()
def send_html(self, title: str, body: str) -> None:
    """Render *body* through ``layout`` and send it as a 200 HTML response."""
    page = self.layout(title, body)
    self.send_response(200)
    for field_name, field_value in (
        ("Content-Type", "text/html; charset=utf-8"),
        ("Content-Length", str(len(page))),
    ):
        self.send_header(field_name, field_value)
    self.end_headers()
    self.wfile.write(page)
def render_login(self) -> None:
    """Send the standalone login page.

    The page is built here rather than through ``layout``, so it carries no
    navigation sidebar.
    """
    page = """<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Login</title>
<link rel="stylesheet" href="/static/styles.css">
</head>
<body class="login-page">
<form class="login-card" method="post" action="/login">
<h1>TV Dispatch</h1>
<p>TradingView alert routing console</p>
<label>用户名<input name="username" autocomplete="username" required></label>
<label>密码<input name="password" type="password" autocomplete="current-password" required></label>
<button type="submit">登录</button>
</form>
</body>
</html>""".encode()
    self.send_response(200)
    for field_name, field_value in (
        ("Content-Type", "text/html; charset=utf-8"),
        ("Content-Length", str(len(page))),
    ):
        self.send_header(field_name, field_value)
    self.end_headers()
    self.wfile.write(page)
def handle_login(self) -> None:
    """Process the login form.

    On rejected credentials the client is redirected back to ``/login``. On
    success a session cookie (``Path=/``, ``HttpOnly``, ``SameSite=Lax``) is
    set and the client is redirected to ``/dashboard``.
    """
    fields = parse_form(self)
    accepted = check_credentials(
        self.context.settings,
        fields.get("username", ""),
        fields.get("password", ""),
        self.get_admin_password_hash(),
    )
    if not accepted:
        redirect(self, "/login")
        return

    jar = SimpleCookie()
    jar[COOKIE_NAME] = make_session_cookie(self.context.settings)
    session = jar[COOKIE_NAME]
    session["path"] = "/"
    session["httponly"] = True
    session["samesite"] = "Lax"

    self.send_response(HTTPStatus.SEE_OTHER)
    self.send_header("Location", "/dashboard")
    # output(header="") yields just the cookie value with its attributes.
    self.send_header("Set-Cookie", jar.output(header="").strip())
    self.end_headers()
def get_admin_password_hash(self) -> str:
    """Return ``password_hash`` from the ``admin_settings`` row with id 1."""
    query = "SELECT password_hash FROM admin_settings WHERE id = 1"
    with self.context.db.connect() as conn:
        record = conn.execute(query).fetchone()
    return record["password_hash"]
def logout(self) -> None:
    """Expire the session cookie and redirect the client to ``/login``."""
    # Max-Age=0 tells the browser to drop the cookie immediately.
    expired_cookie = f"{COOKIE_NAME}=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax"
    self.send_response(HTTPStatus.SEE_OTHER)
    self.send_header("Location", "/login")
    self.send_header("Set-Cookie", expired_cookie)
    self.end_headers()
def serve_static(self, path: str) -> None:
    """Serve one file from the ``static`` directory next to this module.

    Only the final component of *path* is used, so nested paths and ``..``
    segments cannot reach outside the static directory. Anything that is not a
    regular file there gets a 404.
    """
    local_path = os.path.join(os.path.dirname(__file__), "static", os.path.basename(path))
    # isfile() rather than exists(): a request for "/static/" has an empty
    # basename and resolves to the directory itself, which exists but cannot
    # be opened for reading.
    if not os.path.isfile(local_path):
        self.send_error(404)
        return
    with open(local_path, "rb") as file:
        content = file.read()
    self.send_response(200)
    self.send_header("Content-Type", mimetypes.guess_type(local_path)[0] or "application/octet-stream")
    self.send_header("Content-Length", str(len(content)))
    self.end_headers()
    self.wfile.write(content)
def handle_tradingview_webhook(self) -> None:
    """Accept a TradingView alert and hand it to the dispatcher.

    Responds 202 with the dispatcher's result, or 400 with an ``error`` message
    when parsing or dispatching raises ``ValidationError``.
    """
    try:
        alert = parse_json_body(self)
        outcome = self.context.dispatcher.receive_alert(alert)
    except ValidationError as exc:
        json_response(self, 400, {"error": str(exc)})
    else:
        json_response(self, 202, outcome)
def list_targets(self) -> list[dict[str, Any]]:
    """Return every ``webhook_targets`` row as a dict, newest id first."""
    query = "SELECT * FROM webhook_targets ORDER BY id DESC"
    with self.context.db.connect() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))
def list_rules(self) -> list[dict[str, Any]]:
    """Return every ``routing_rules`` row as a dict.

    Rows are ordered by ascending ``priority``, then descending ``id``. The
    stored ``target_ids`` value is decoded with ``from_json`` (default ``[]``).
    """
    query = "SELECT * FROM routing_rules ORDER BY priority ASC, id DESC"
    with self.context.db.connect() as conn:
        records = conn.execute(query).fetchall()
    decoded = [dict(record) for record in records]
    for entry in decoded:
        entry["target_ids"] = from_json(entry["target_ids"], [])
    return decoded
def list_logs(self) -> dict[str, list[dict[str, Any]]]:
    """Return the newest 100 alerts and newest 200 deliveries as dicts."""
    with self.context.db.connect() as conn:
        alert_records = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 100").fetchall()
        delivery_records = conn.execute("SELECT * FROM deliveries ORDER BY id DESC LIMIT 200").fetchall()
    return {
        "alerts": list(map(dict, alert_records)),
        "deliveries": list(map(dict, delivery_records)),
    }
def render_dashboard(self) -> None:
    """Render the overview page.

    Shows four counters (alerts, rules, targets, deliveries whose status is
    ``pending`` or ``retry``) followed by a table of the eight newest alerts.
    """
    count_queries = (
        ("alerts", "SELECT COUNT(*) AS c FROM alerts"),
        ("rules", "SELECT COUNT(*) AS c FROM routing_rules"),
        ("targets", "SELECT COUNT(*) AS c FROM webhook_targets"),
        ("pending", "SELECT COUNT(*) AS c FROM deliveries WHERE status IN ('pending','retry')"),
    )
    with self.context.db.connect() as conn:
        counts = {key: conn.execute(sql).fetchone()["c"] for key, sql in count_queries}
        recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall()

    metrics = (
        ("Alerts", counts["alerts"]),
        ("Rules", counts["rules"]),
        ("Targets", counts["targets"]),
        ("Pending", counts["pending"]),
    )
    cards = "".join(
        f'<div class="metric"><span>{label}</span><strong>{value}</strong></div>'
        for label, value in metrics
    )

    table_rows = []
    for alert in recent:
        cells = (
            f"<td>{alert['id']}</td>",
            f"<td>{html.escape(alert['symbol'])}</td>",
            f"<td>{html.escape(alert['timeframe'])}</td>",
            f"<td>{html.escape(alert['strategy'])}</td>",
            f"<td><span class='status'>{html.escape(alert['status'])}</span></td>",
            f"<td>{alert['created_at']}</td>",
        )
        table_rows.append("<tr>" + "".join(cells) + "</tr>")
    rows = "".join(table_rows)

    page = (
        "<header><h1>概览</h1><p>结构化 alert 分发、飞书转发和重试状态。</p></header>"
        f"<section class='metrics'>{cards}</section>"
        "<section><h2>最近 Alert</h2><table><thead><tr><th>ID</th><th>品种</th><th>周期</th>"
        "<th>策略</th><th>状态</th><th>时间</th></tr></thead>"
        f"<tbody>{rows}</tbody></table></section>"
    )
    self.send_html("概览", page)
def render_targets(self) -> None:
    """Render the webhook-target page: a create form plus one editable row per target.

    Each row's inputs attach to a per-target update form through the HTML
    ``form`` attribute, because a ``<form>`` cannot wrap table cells.
    """
    row_markup = []
    for target in self.list_targets():
        target_id = target["id"]
        form_id = f"target-update-{target_id}"
        checked = "checked" if target["enabled"] else ""
        row_markup.append(f"""<tr>
<td>{target_id}<input form="{form_id}" type="hidden" name="id" value="{target_id}"></td>
<td><input form="{form_id}" name="name" value="{html.escape(target['name'])}" required></td>
<td class="url"><input form="{form_id}" name="webhook_url" value="{html.escape(target['webhook_url'])}" type="url" required></td>
<td><label class="check"><input form="{form_id}" name="enabled" type="checkbox" {checked}> 启用</label></td>
<td><form id="{form_id}" class="inline" method="post" action="/targets/update"></form><button form="{form_id}" type="submit">更新</button>
<form class="inline" method="post" action="/targets/delete"><input type="hidden" name="id" value="{target_id}"><button class="danger" type="submit">删除</button></form>
</td></tr>""")
    rows = "".join(row_markup)
    form = """<form class="panel" method="post" action="/targets/create">
<h2>新增飞书 Webhook</h2>
<label>名称<input name="name" required></label>
<label>Webhook URL<input name="webhook_url" type="url" required></label>
<label class="check"><input name="enabled" type="checkbox" checked> 启用</label>
<button type="submit">保存目标</button>
</form>"""
    page = (
        "<header><h1>飞书 Webhook</h1><p>维护所有可分发的飞书机器人地址。</p></header>"
        f"{form}"
        "<table><thead><tr><th>ID</th><th>名称</th><th>URL</th><th>状态</th><th>操作</th></tr></thead>"
        f"<tbody>{rows}</tbody></table>"
    )
    self.send_html("飞书 Webhook", page)
def render_rules(self) -> None:
    """Render the routing-rule page: a create form plus one editable row per rule.

    Each row's inputs attach to a per-rule update form through the HTML
    ``form`` attribute. Doubled braces in the templates below are f-string
    escapes that emit literal ``{{field}}`` placeholders into the page.
    """
    targets = self.list_targets()
    rules = self.list_rules()
    message_types = (("card", "Card"), ("text", "Text"))

    row_markup = []
    for rule in rules:
        option_tags = []
        for value, label in message_types:
            marker = "selected" if rule["message_type"] == value else ""
            option_tags.append(f'<option value="{value}" {marker}>{label}</option>')
        message_type_options = "".join(option_tags)
        selected_targets = target_select_options(targets, rule["target_ids"], placeholder=True)
        rule_id = rule["id"]
        form_id = f"rule-update-{rule_id}"
        checked = "checked" if rule["enabled"] else ""
        row_markup.append(f"""<tr data-message-form>
<td>{rule_id}<input form="{form_id}" type="hidden" name="id" value="{rule_id}"></td>
<td><input form="{form_id}" name="name" value="{html.escape(rule['name'])}" required></td>
<td><input form="{form_id}" name="timeframe" value="{html.escape(rule['timeframe'])}" required></td>
<td><input form="{form_id}" name="symbol" value="{html.escape(rule['symbol'])}" required></td>
<td><input form="{form_id}" name="strategy" value="{html.escape(rule['strategy'])}" required></td>
<td><input form="{form_id}" name="priority" type="number" value="{rule['priority']}" required></td>
<td><select class="select-compact" form="{form_id}" name="message_type" data-message-type>{message_type_options}</select></td>
<td><textarea form="{form_id}" name="card_title_template" rows="2" data-title-template>{html.escape(rule['card_title_template'])}</textarea></td>
<td><textarea form="{form_id}" name="card_body_template" rows="4" data-body-template>{html.escape(rule['card_body_template'])}</textarea></td>
<td><select class="select-target" form="{form_id}" name="target_ids" required>{selected_targets}</select></td>
<td><label class="check"><input form="{form_id}" name="enabled" type="checkbox" {checked}> 启用</label></td>
<td><form id="{form_id}" class="inline" method="post" action="/rules/update"></form><button form="{form_id}" type="submit">更新</button><form class="inline" method="post" action="/rules/delete"><input type="hidden" name="id" value="{rule_id}"><button class="danger" type="submit">删除</button></form></td></tr>""")
    rows = "".join(row_markup)

    create_target_options = target_select_options(targets, placeholder=True)
    form = f"""<form class="panel" method="post" action="/rules/create" data-message-form>
<h2>新增路由规则</h2>
<div class="grid">
<label>规则名<input name="name" required></label>
<label>周期<input name="timeframe" placeholder="5m" required></label>
<label>品种<input name="symbol" placeholder="BTCUSDT" required></label>
<label>策略<input name="strategy" placeholder="breakout" required></label>
<label>优先级<input name="priority" type="number" value="100" required></label>
</div>
<div>
<label class="field-compact">消息类型<select class="select-compact" name="message_type" data-message-type><option value="card" selected>Card</option><option value="text">Text</option></select></label>
<label><span data-title-label>卡片标题模板</span><input name="card_title_template" value="TradingView {{{{symbol}}}} {{{{action}}}}" data-title-template required></label>
<label><span data-body-label>卡片正文模板</span><textarea name="card_body_template" rows="5" data-body-template>**品种**: {{{{symbol}}}}
**周期**: {{{{timeframe}}}}
**策略**: {{{{strategy}}}}
**动作**: {{{{action}}}}
**价格**: {{{{price}}}}</textarea></label>
<label class="field-target">发送到<select class="select-target" name="target_ids" required>{create_target_options}</select></label>
</div>
<label class="check"><input name="enabled" type="checkbox" checked> 启用</label>
<button type="submit">保存规则</button>
</form>"""
    page = f"<header><h1>路由规则</h1><p>每条规则选择一个飞书 Webhook。模板支持 TradingView JSON 字段,例如 {{{{symbol}}}}{{{{timeframe}}}}{{{{strategy}}}}{{{{price}}}},嵌套字段可写 {{{{order.id}}}}。</p></header>{form}<table><thead><tr><th>ID</th><th>名称</th><th>周期</th><th>品种</th><th>策略</th><th>优先级</th><th>消息</th><th>标题模板</th><th>内容模板</th><th>发送到</th><th>状态</th><th>操作</th></tr></thead><tbody>{rows}</tbody></table>"
    self.send_html("路由规则", page)
def render_logs(self) -> None:
    """Render the log page with the alert table, the delivery table and a retry button.

    Text columns are HTML-escaped; nullable columns (``error``,
    ``response_code``, ``next_attempt_at``) render as an empty cell when falsy.
    """
    logs = self.list_logs()

    alert_parts = []
    for entry in logs["alerts"]:
        alert_parts.append(
            f"<tr><td>{entry['id']}</td>"
            f"<td>{html.escape(entry['symbol'])}</td>"
            f"<td>{html.escape(entry['timeframe'])}</td>"
            f"<td>{html.escape(entry['strategy'])}</td>"
            f"<td><span class='status'>{html.escape(entry['status'])}</span></td>"
            f"<td>{html.escape(entry['error'] or '')}</td>"
            f"<td>{entry['created_at']}</td></tr>"
        )
    alert_rows = "".join(alert_parts)

    delivery_parts = []
    for entry in logs["deliveries"]:
        delivery_parts.append(
            f"<tr><td>{entry['id']}</td>"
            f"<td>{entry['alert_id']}</td>"
            f"<td>{html.escape(entry['target_name'])}</td>"
            f"<td><span class='status'>{html.escape(entry['status'])}</span></td>"
            f"<td>{entry['attempts']}</td>"
            f"<td>{html.escape(str(entry['response_code'] or ''))}</td>"
            f"<td>{html.escape(entry['error'] or '')}</td>"
            f"<td>{html.escape(entry['next_attempt_at'] or '')}</td></tr>"
        )
    delivery_rows = "".join(delivery_parts)

    page = f"""<header><h1>日志</h1><p>最近 100 条 alert 和 200 条分发任务。</p></header>
<form method="post" action="/deliveries/retry"><button type="submit">处理到期重试</button></form>
<section><h2>Alert 日志</h2><table><thead><tr><th>ID</th><th>品种</th><th>周期</th><th>策略</th><th>状态</th><th>错误</th><th>时间</th></tr></thead><tbody>{alert_rows}</tbody></table></section>
<section><h2>Delivery 日志</h2><table><thead><tr><th>ID</th><th>Alert</th><th>目标</th><th>状态</th><th>次数</th><th>HTTP</th><th>错误</th><th>下次重试</th></tr></thead><tbody>{delivery_rows}</tbody></table></section>"""
    self.send_html("日志", page)
def render_test(self) -> None:
    """Render the test-send page with a sample alert pre-filled.

    If this handler instance has a non-empty ``_test_result_html`` attribute,
    that markup is appended after the form.
    """
    example_alert = {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000}
    sample = html.escape(json.dumps(example_alert, indent=2))
    sections = [f"""<header><h1>测试发送</h1><p>提交一条模拟 TradingView alert走完整匹配和飞书转发流程。</p></header>
<form class="panel" method="post" action="/test/send">
<label>Alert JSON<textarea name="payload" rows="12">{sample}</textarea></label>
<button type="submit">发送测试 Alert</button>
</form>"""]
    outcome = getattr(self, "_test_result_html", "")
    if outcome:
        sections.append(outcome)
    self.send_html("测试发送", "".join(sections))
def render_account(self) -> None:
    """Render the account page containing the change-password form."""
    password_form = """<header><h1>账号安全</h1><p>修改当前管理员密码,修改成功后会退出登录。</p></header>
<form class="panel narrow" method="post" action="/account/password">
<h2>修改密码</h2>
<label>当前密码<input name="current_password" type="password" autocomplete="current-password" required></label>
<label>新密码<input name="new_password" type="password" autocomplete="new-password" minlength="8" required></label>
<label>确认新密码<input name="confirm_password" type="password" autocomplete="new-password" minlength="8" required></label>
<button type="submit">更新密码</button>
</form>"""
    self.send_html("账号安全", password_form)
def create_target(self) -> None:
    """Insert a webhook target from the submitted form, then redirect to ``/targets``.

    ``name`` and ``webhook_url`` are stripped of surrounding whitespace. The
    target is enabled only when the ``enabled`` field equals ``"on"``.
    """
    fields = parse_form(self)
    timestamp = now_iso()
    insert_sql = "INSERT INTO webhook_targets (name, webhook_url, enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?)"
    with self.context.db.connect() as conn:
        params = (
            fields["name"].strip(),
            fields["webhook_url"].strip(),
            1 if fields.get("enabled") == "on" else 0,
            timestamp,
            timestamp,
        )
        conn.execute(insert_sql, params)
    redirect(self, "/targets")
def update_target(self) -> None:
    """Update the webhook target named by the form's ``id``, then redirect to ``/targets``.

    ``name`` and ``webhook_url`` are stripped of surrounding whitespace. The
    target is enabled only when the ``enabled`` field equals ``"on"``.
    """
    fields = parse_form(self)
    update_sql = "UPDATE webhook_targets SET name = ?, webhook_url = ?, enabled = ?, updated_at = ? WHERE id = ?"
    with self.context.db.connect() as conn:
        params = (
            fields["name"].strip(),
            fields["webhook_url"].strip(),
            1 if fields.get("enabled") == "on" else 0,
            now_iso(),
            fields["id"],
        )
        conn.execute(update_sql, params)
    redirect(self, "/targets")
def delete_target(self) -> None:
    """Delete the webhook target named by the form's ``id``, then redirect to ``/targets``."""
    fields = parse_form(self)
    delete_sql = "DELETE FROM webhook_targets WHERE id = ?"
    with self.context.db.connect() as conn:
        conn.execute(delete_sql, (fields["id"],))
    redirect(self, "/targets")
def create_rule(self) -> None:
    """Insert a routing rule from the submitted form, then redirect to ``/rules``.

    Text fields are stripped, ``symbol`` is upper-cased, ``priority`` defaults
    to 100, and every submitted ``target_ids`` value is converted to ``int``
    and stored via ``to_json``.
    """
    form = parse_form_multi(self)

    def last(field: str, default: str = "") -> str:
        # The last submitted value of a field, or the default if it is absent.
        return form.get(field, [default])[-1]

    target_ids = list(map(int, form.get("target_ids", [])))
    now = now_iso()
    with self.context.db.connect() as conn:
        conn.execute(
            """
INSERT INTO routing_rules (
name, timeframe, symbol, strategy, priority, message_type,
card_title_template, card_body_template, enabled, target_ids,
created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                last("name").strip(),
                last("timeframe").strip(),
                last("symbol").strip().upper(),
                last("strategy").strip(),
                int(last("priority", "100")),
                last("message_type", "card"),
                last("card_title_template", "TradingView {{symbol}} {{action}}").strip(),
                last("card_body_template").strip(),
                1 if last("enabled") == "on" else 0,
                to_json(target_ids),
                now,
                now,
            ),
        )
    redirect(self, "/rules")
def delete_rule(self) -> None:
    """Delete the routing rule named by the form's ``id``, then redirect to ``/rules``."""
    fields = parse_form(self)
    delete_sql = "DELETE FROM routing_rules WHERE id = ?"
    with self.context.db.connect() as conn:
        conn.execute(delete_sql, (fields["id"],))
    redirect(self, "/rules")
def update_rule(self) -> None:
    """Update the routing rule named by the form's ``id``, then redirect to ``/rules``.

    Field handling mirrors rule creation: text fields are stripped, ``symbol``
    is upper-cased, ``priority`` defaults to 100, and ``target_ids`` values are
    converted to ``int`` and stored via ``to_json``.
    """
    form = parse_form_multi(self)

    def last(field: str, default: str = "") -> str:
        # The last submitted value of a field, or the default if it is absent.
        return form.get(field, [default])[-1]

    target_ids = list(map(int, form.get("target_ids", [])))
    with self.context.db.connect() as conn:
        conn.execute(
            """
UPDATE routing_rules
SET name = ?, timeframe = ?, symbol = ?, strategy = ?, priority = ?,
message_type = ?, card_title_template = ?, card_body_template = ?,
enabled = ?, target_ids = ?, updated_at = ?
WHERE id = ?
""",
            (
                last("name").strip(),
                last("timeframe").strip(),
                last("symbol").strip().upper(),
                last("strategy").strip(),
                int(last("priority", "100")),
                last("message_type", "card"),
                last("card_title_template", "TradingView {{symbol}} {{action}}").strip(),
                last("card_body_template").strip(),
                1 if last("enabled") == "on" else 0,
                to_json(target_ids),
                now_iso(),
                last("id"),
            ),
        )
    redirect(self, "/rules")
def send_test(self) -> None:
    """Run a pasted alert through the dispatcher and re-render the test page.

    The ``payload`` form field is parsed as JSON and must be a JSON object,
    the same requirement ``parse_json_body`` applies to webhook requests. The
    outcome is stored on ``self._test_result_html``, which ``render_test``
    appends to the page: a success panel with the dispatcher's result, or an
    error panel when the payload is invalid JSON, is not an object, or the
    dispatcher raises ``ValidationError``.
    """
    form = parse_form(self)
    payload_text = form.get("payload", "{}")
    try:
        payload = json.loads(payload_text)
        if not isinstance(payload, dict):
            # Reject lists/scalars here instead of handing them to the dispatcher.
            raise ValidationError("Request body must be a JSON object")
        result = self.context.dispatcher.receive_alert(payload)
    except (json.JSONDecodeError, ValidationError) as exc:
        self._test_result_html = f"""<section class="result-panel error">
<h2>测试失败</h2>
<p>{html.escape(str(exc))}</p>
</section>"""
    else:
        delivery_text = ", ".join(str(item) for item in result.get("delivery_ids", [])) or "-"
        self._test_result_html = f"""<section class="result-panel success">
<h2>测试结果</h2>
<div class="result-grid">
<div><span>Alert ID</span><strong>{result.get("alert_id")}</strong></div>
<div><span>状态</span><strong>{html.escape(str(result.get("status")))}</strong></div>
<div><span>命中规则</span><strong>{html.escape(str(result.get("matched_rule_id") or "-"))}</strong></div>
<div><span>Delivery</span><strong>{html.escape(delivery_text)}</strong></div>
</div>
<details><summary>查看响应 JSON</summary><pre>{html.escape(json.dumps(result, ensure_ascii=False, indent=2))}</pre></details>
</section>"""
    # Both outcomes re-render the same page with the panel appended.
    self.render_test()
def change_password(self) -> None:
    """Change the admin password, then log the current session out.

    Checks run in order and the first failure is returned as a 400 JSON error:
    the current password must verify, the new password must be at least 8
    characters, and the confirmation must match.
    """
    fields = parse_form(self)
    current_password = fields.get("current_password", "")
    new_password = fields.get("new_password", "")
    confirm_password = fields.get("confirm_password", "")

    problem = None
    if not check_credentials(
        self.context.settings,
        self.context.settings.admin_username,
        current_password,
        self.get_admin_password_hash(),
    ):
        problem = "当前密码不正确"
    elif len(new_password) < 8:
        problem = "新密码至少需要 8 位"
    elif new_password != confirm_password:
        problem = "两次输入的新密码不一致"
    if problem is not None:
        json_response(self, 400, {"error": problem})
        return

    with self.context.db.connect() as conn:
        conn.execute(
            "UPDATE admin_settings SET password_hash = ?, updated_at = ? WHERE id = 1",
            (hash_password(new_password), now_iso()),
        )
    self.logout()
def retry_deliveries(self) -> None:
    """Ask the dispatcher to process up to 100 due deliveries, then redirect to ``/logs``."""
    batch_limit = 100
    self.context.dispatcher.process_due_deliveries(limit=batch_limit)
    redirect(self, "/logs")
def make_handler(context: AppContext) -> type[Handler]:
    """Return a ``Handler`` subclass whose ``context`` attribute is *context*.

    The server is given a handler class, not an instance, so the shared context
    is attached as a class attribute on a fresh subclass.
    """

    class BoundHandler(Handler):
        pass

    bound = BoundHandler
    bound.context = context
    return bound
def run() -> None:
    """Load settings, build the app context, and serve HTTP until interrupted.

    Before listening, ``cleanup_old_logs`` is called on the database with the
    configured ``retention_days``.
    """
    settings = get_settings()
    context = AppContext(settings)
    context.db.cleanup_old_logs(settings.retention_days)
    address = (settings.host, settings.port)
    handler_class = make_handler(context)
    server = ThreadingHTTPServer(address, handler_class)
    print(f"Serving {settings.app_name} on http://{settings.host}:{settings.port}")
    server.serve_forever()
# Start the server only when this module is executed as a script.
if __name__ == "__main__":
    run()