832 lines
39 KiB
Python
832 lines
39 KiB
Python
from __future__ import annotations
|
||
|
||
import html
|
||
import json
|
||
import mimetypes
|
||
import os
|
||
import urllib.error
|
||
import urllib.request
|
||
from http import HTTPStatus
|
||
from http.cookies import SimpleCookie
|
||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||
from typing import Any
|
||
from urllib.parse import parse_qs, urlparse
|
||
|
||
from app.auth import COOKIE_NAME, check_credentials, hash_password, is_valid_session, make_session_cookie
|
||
from app.config import Settings, get_settings
|
||
from app.db import Database, from_json, now_iso, to_json
|
||
from app.dispatcher import Dispatcher, ValidationError, build_feishu_message, normalize_alert
|
||
|
||
|
||
class AppContext:
|
||
def __init__(self, settings: Settings):
|
||
self.settings = settings
|
||
self.db = Database(settings)
|
||
self.db.migrate(settings)
|
||
self.dispatcher = Dispatcher(self.db, settings)
|
||
|
||
|
||
def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any] | list[Any]) -> None:
|
||
body = json.dumps(payload, ensure_ascii=False).encode()
|
||
handler.send_response(status)
|
||
handler.send_header("Content-Type", "application/json; charset=utf-8")
|
||
handler.send_header("Content-Length", str(len(body)))
|
||
handler.end_headers()
|
||
handler.wfile.write(body)
|
||
|
||
|
||
def redirect(handler: BaseHTTPRequestHandler, location: str) -> None:
|
||
handler.send_response(HTTPStatus.SEE_OTHER)
|
||
handler.send_header("Location", location)
|
||
handler.end_headers()
|
||
|
||
|
||
def read_body(handler: BaseHTTPRequestHandler) -> bytes:
|
||
length = int(handler.headers.get("Content-Length", "0") or "0")
|
||
return handler.rfile.read(length)
|
||
|
||
|
||
def parse_form(handler: BaseHTTPRequestHandler) -> dict[str, str]:
|
||
data = read_body(handler).decode()
|
||
return {key: values[-1] for key, values in parse_qs(data).items()}
|
||
|
||
|
||
def parse_form_multi(handler: BaseHTTPRequestHandler) -> dict[str, list[str]]:
|
||
return parse_qs(read_body(handler).decode())
|
||
|
||
|
||
def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
|
||
try:
|
||
value = json.loads(read_body(handler).decode() or "{}")
|
||
except json.JSONDecodeError as exc:
|
||
raise ValidationError("Request body must be valid JSON") from exc
|
||
if not isinstance(value, dict):
|
||
raise ValidationError("Request body must be a JSON object")
|
||
return value
|
||
|
||
|
||
def target_checkbox_options(
|
||
targets: list[dict[str, Any]],
|
||
selected_ids: list[int] | None = None,
|
||
) -> str:
|
||
selected_ids = selected_ids or []
|
||
if not targets:
|
||
return '<p class="warning">还没有可用的飞书 Webhook,请先到飞书 Webhook 页面创建。</p>'
|
||
options = []
|
||
for target in targets:
|
||
checked = "checked" if target["id"] in selected_ids else ""
|
||
disabled = "" if target["enabled"] else "disabled"
|
||
suffix = "" if target["enabled"] else " (停用)"
|
||
options.append(
|
||
f'<label class="target-choice"><input type="checkbox" name="target_ids" value="{target["id"]}" {checked} {disabled}> <span>{html.escape(target["name"])}{suffix}</span></label>'
|
||
)
|
||
return "".join(options)
|
||
|
||
|
||
class Handler(BaseHTTPRequestHandler):
|
||
context: AppContext
|
||
|
||
def log_message(self, format: str, *args: Any) -> None:
|
||
print("%s - - [%s] %s" % (self.address_string(), self.log_date_time_string(), format % args))
|
||
|
||
def do_GET(self) -> None:
|
||
parsed = urlparse(self.path)
|
||
if parsed.path == "/health":
|
||
json_response(self, 200, {"ok": True})
|
||
return
|
||
if parsed.path == "/login":
|
||
self.render_login()
|
||
return
|
||
if parsed.path.startswith("/static/"):
|
||
self.serve_static(parsed.path)
|
||
return
|
||
if not self.require_auth():
|
||
return
|
||
if parsed.path in ("/", "/dashboard"):
|
||
self.render_dashboard()
|
||
elif parsed.path == "/targets":
|
||
self.render_targets()
|
||
elif parsed.path == "/targets/delete":
|
||
self.render_target_delete(parsed)
|
||
elif parsed.path == "/rules":
|
||
self.render_rules()
|
||
elif parsed.path == "/rules/new":
|
||
self.render_rule_new()
|
||
elif parsed.path == "/rules/edit":
|
||
self.render_rule_edit(parsed)
|
||
elif parsed.path == "/rules/delete":
|
||
self.render_rule_delete(parsed)
|
||
elif parsed.path == "/logs":
|
||
self.render_logs()
|
||
elif parsed.path == "/test":
|
||
self.render_test()
|
||
elif parsed.path == "/account":
|
||
self.render_account()
|
||
elif parsed.path == "/api/targets":
|
||
json_response(self, 200, self.list_targets())
|
||
elif parsed.path == "/api/rules":
|
||
json_response(self, 200, self.list_rules())
|
||
elif parsed.path == "/api/logs":
|
||
json_response(self, 200, self.list_logs())
|
||
else:
|
||
self.send_error(404)
|
||
|
||
def do_POST(self) -> None:
|
||
parsed = urlparse(self.path)
|
||
if parsed.path == "/webhook/tradingview":
|
||
self.handle_tradingview_webhook()
|
||
return
|
||
if parsed.path == "/login":
|
||
self.handle_login()
|
||
return
|
||
if not self.require_auth():
|
||
return
|
||
routes = {
|
||
"/targets/create": self.create_target,
|
||
"/targets/update": self.update_target,
|
||
"/targets/delete": self.delete_target,
|
||
"/targets/test": self.test_target,
|
||
"/rules/create": self.create_rule,
|
||
"/rules/update": self.update_rule,
|
||
"/rules/delete": self.delete_rule,
|
||
"/rules/preview": self.preview_rule,
|
||
"/test/send": self.send_test,
|
||
"/account/password": self.change_password,
|
||
"/deliveries/retry": self.retry_deliveries,
|
||
"/logout": self.logout,
|
||
}
|
||
handler = routes.get(parsed.path)
|
||
if not handler:
|
||
self.send_error(404)
|
||
return
|
||
handler()
|
||
|
||
def require_auth(self) -> bool:
|
||
if is_valid_session(self.context.settings, self.headers.get("Cookie")):
|
||
return True
|
||
redirect(self, "/login")
|
||
return False
|
||
|
||
def layout(self, title: str, body: str) -> bytes:
|
||
nav = [
|
||
("/dashboard", "概览"),
|
||
("/rules", "路由规则"),
|
||
("/targets", "飞书 Webhook"),
|
||
("/logs", "日志"),
|
||
("/test", "测试发送"),
|
||
("/account", "账号安全"),
|
||
]
|
||
items = "".join(f'<a href="{href}">{label}</a>' for href, label in nav)
|
||
return f"""<!doctype html>
|
||
<html lang="zh-CN">
|
||
<head>
|
||
<meta charset="utf-8">
|
||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||
<title>{html.escape(title)}</title>
|
||
<link rel="stylesheet" href="/static/styles.css">
|
||
<script src="/static/app.js" defer></script>
|
||
</head>
|
||
<body>
|
||
<aside class="sidebar">
|
||
<div class="brand">TV Dispatch</div>
|
||
<nav>{items}</nav>
|
||
<form method="post" action="/logout"><button class="ghost" type="submit">退出</button></form>
|
||
</aside>
|
||
<main class="shell">{body}</main>
|
||
</body>
|
||
</html>""".encode()
|
||
|
||
def send_html(self, title: str, body: str) -> None:
|
||
content = self.layout(title, body)
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.send_header("Content-Length", str(len(content)))
|
||
self.end_headers()
|
||
self.wfile.write(content)
|
||
|
||
def render_login(self) -> None:
|
||
content = """<!doctype html>
|
||
<html lang="zh-CN">
|
||
<head>
|
||
<meta charset="utf-8">
|
||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||
<title>Login</title>
|
||
<link rel="stylesheet" href="/static/styles.css">
|
||
</head>
|
||
<body class="login-page">
|
||
<form class="login-card" method="post" action="/login">
|
||
<h1>TV Dispatch</h1>
|
||
<p>TradingView alert routing console</p>
|
||
<label>用户名<input name="username" autocomplete="username" required></label>
|
||
<label>密码<input name="password" type="password" autocomplete="current-password" required></label>
|
||
<button type="submit">登录</button>
|
||
</form>
|
||
</body>
|
||
</html>""".encode()
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||
self.send_header("Content-Length", str(len(content)))
|
||
self.end_headers()
|
||
self.wfile.write(content)
|
||
|
||
def handle_login(self) -> None:
|
||
form = parse_form(self)
|
||
if not check_credentials(
|
||
self.context.settings,
|
||
form.get("username", ""),
|
||
form.get("password", ""),
|
||
self.get_admin_password_hash(),
|
||
):
|
||
redirect(self, "/login")
|
||
return
|
||
cookie = SimpleCookie()
|
||
cookie[COOKIE_NAME] = make_session_cookie(self.context.settings)
|
||
cookie[COOKIE_NAME]["path"] = "/"
|
||
cookie[COOKIE_NAME]["httponly"] = True
|
||
cookie[COOKIE_NAME]["samesite"] = "Lax"
|
||
self.send_response(HTTPStatus.SEE_OTHER)
|
||
self.send_header("Location", "/dashboard")
|
||
self.send_header("Set-Cookie", cookie.output(header="").strip())
|
||
self.end_headers()
|
||
|
||
def get_admin_password_hash(self) -> str:
|
||
with self.context.db.connect() as conn:
|
||
row = conn.execute("SELECT password_hash FROM admin_settings WHERE id = 1").fetchone()
|
||
return row["password_hash"]
|
||
|
||
def logout(self) -> None:
|
||
self.send_response(HTTPStatus.SEE_OTHER)
|
||
self.send_header("Location", "/login")
|
||
self.send_header("Set-Cookie", f"{COOKIE_NAME}=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax")
|
||
self.end_headers()
|
||
|
||
def serve_static(self, path: str) -> None:
|
||
local_path = os.path.join(os.path.dirname(__file__), "static", os.path.basename(path))
|
||
if not os.path.exists(local_path):
|
||
self.send_error(404)
|
||
return
|
||
with open(local_path, "rb") as file:
|
||
content = file.read()
|
||
self.send_response(200)
|
||
self.send_header("Content-Type", mimetypes.guess_type(local_path)[0] or "application/octet-stream")
|
||
self.send_header("Content-Length", str(len(content)))
|
||
self.end_headers()
|
||
self.wfile.write(content)
|
||
|
||
def handle_tradingview_webhook(self) -> None:
|
||
if self.context.settings.webhook_token:
|
||
query = parse_qs(urlparse(self.path).query)
|
||
token = self.headers.get("X-Webhook-Token") or query.get("token", [""])[-1]
|
||
if token != self.context.settings.webhook_token:
|
||
json_response(self, 401, {"error": "Invalid webhook token"})
|
||
return
|
||
try:
|
||
payload = parse_json_body(self)
|
||
result = self.context.dispatcher.receive_alert(payload)
|
||
json_response(self, 202, result)
|
||
except ValidationError as exc:
|
||
json_response(self, 400, {"error": str(exc)})
|
||
|
||
def list_targets(self) -> list[dict[str, Any]]:
|
||
with self.context.db.connect() as conn:
|
||
rows = conn.execute("SELECT * FROM webhook_targets ORDER BY id DESC").fetchall()
|
||
return [dict(row) for row in rows]
|
||
|
||
def list_rules(self) -> list[dict[str, Any]]:
|
||
with self.context.db.connect() as conn:
|
||
rows = conn.execute("SELECT * FROM routing_rules ORDER BY priority ASC, id DESC").fetchall()
|
||
rules = []
|
||
for row in rows:
|
||
item = dict(row)
|
||
item["target_ids"] = from_json(item["target_ids"], [])
|
||
rules.append(item)
|
||
return rules
|
||
|
||
def list_logs(self) -> dict[str, list[dict[str, Any]]]:
|
||
with self.context.db.connect() as conn:
|
||
alerts = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 100").fetchall()
|
||
deliveries = conn.execute("SELECT * FROM deliveries ORDER BY id DESC LIMIT 200").fetchall()
|
||
return {"alerts": [dict(row) for row in alerts], "deliveries": [dict(row) for row in deliveries]}
|
||
|
||
def render_dashboard(self) -> None:
|
||
host = self.headers.get("Host", f"localhost:{self.context.settings.port}")
|
||
scheme = self.headers.get("X-Forwarded-Proto", "http")
|
||
base_url = f"{scheme}://{host}"
|
||
webhook_url = f"{base_url}/webhook/tradingview"
|
||
token = self.context.settings.webhook_token
|
||
webhook_url_with_token = f"{webhook_url}?token={token}" if token else webhook_url
|
||
token_block = (
|
||
f"""<div class="copy-row"><span>Webhook Token</span><code>{html.escape(token)}</code><button type="button" data-copy="{html.escape(token)}">复制</button></div>
|
||
<div class="copy-row"><span>Header 方式</span><code>X-Webhook-Token: {html.escape(token)}</code><button type="button" data-copy="X-Webhook-Token: {html.escape(token)}">复制</button></div>"""
|
||
if token
|
||
else """<p class="warning">当前未设置 WEBHOOK_TOKEN,任何知道地址的人都可以提交 alert。生产环境建议设置。</p>"""
|
||
)
|
||
webhook_panel = f"""<section class="panel">
|
||
<h2>TradingView Webhook 配置</h2>
|
||
<div class="copy-row"><span>Webhook URL</span><code>{html.escape(webhook_url_with_token)}</code><button type="button" data-copy="{html.escape(webhook_url_with_token)}">复制</button></div>
|
||
<div class="copy-row"><span>纯 URL</span><code>{html.escape(webhook_url)}</code><button type="button" data-copy="{html.escape(webhook_url)}">复制</button></div>
|
||
{token_block}
|
||
</section>"""
|
||
with self.context.db.connect() as conn:
|
||
counts = {
|
||
"alerts": conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"],
|
||
"rules": conn.execute("SELECT COUNT(*) AS c FROM routing_rules").fetchone()["c"],
|
||
"targets": conn.execute("SELECT COUNT(*) AS c FROM webhook_targets").fetchone()["c"],
|
||
"pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status IN ('pending','retry')").fetchone()["c"],
|
||
}
|
||
recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall()
|
||
cards = "".join(f'<div class="metric"><span>{label}</span><strong>{value}</strong></div>' for label, value in [
|
||
("Alerts", counts["alerts"]),
|
||
("Rules", counts["rules"]),
|
||
("Targets", counts["targets"]),
|
||
("Pending", counts["pending"]),
|
||
])
|
||
rows = "".join(
|
||
f"<tr><td>{row['id']}</td><td>{html.escape(row['symbol'])}</td><td>{html.escape(row['timeframe'])}</td><td>{html.escape(row['strategy'])}</td><td><span class='status'>{html.escape(row['status'])}</span></td><td>{row['created_at']}</td></tr>"
|
||
for row in recent
|
||
)
|
||
self.send_html("概览", f"<header><h1>概览</h1><p>结构化 alert 分发、飞书转发和重试状态。</p></header>{webhook_panel}<section class='metrics'>{cards}</section><section><h2>最近 Alert</h2><table><thead><tr><th>ID</th><th>品种</th><th>周期</th><th>策略</th><th>状态</th><th>时间</th></tr></thead><tbody>{rows}</tbody></table></section>")
|
||
|
||
def render_targets(self) -> None:
|
||
targets = self.list_targets()
|
||
rows = "".join(
|
||
f"""<tr>
|
||
<td>{target['id']}<input form="target-update-{target['id']}" type="hidden" name="id" value="{target['id']}"></td>
|
||
<td><input form="target-update-{target['id']}" name="name" value="{html.escape(target['name'])}" required></td>
|
||
<td class="url"><input form="target-update-{target['id']}" name="webhook_url" value="{html.escape(target['webhook_url'])}" type="url" required></td>
|
||
<td><label class="check"><input form="target-update-{target['id']}" name="enabled" type="checkbox" {'checked' if target['enabled'] else ''}> 启用</label></td>
|
||
<td><form id="target-update-{target['id']}" class="inline" method="post" action="/targets/update"></form><button form="target-update-{target['id']}" type="submit">更新</button>
|
||
<form class="inline" method="post" action="/targets/test"><input type="hidden" name="id" value="{target['id']}"><button type="submit">测试</button></form>
|
||
<a class="button-link danger-link" href="/targets/delete?id={target['id']}">删除</a>
|
||
</td></tr>"""
|
||
for target in targets
|
||
)
|
||
form = """<form class="panel" method="post" action="/targets/create">
|
||
<h2>新增飞书 Webhook</h2>
|
||
<label>名称<input name="name" required></label>
|
||
<label>Webhook URL<input name="webhook_url" type="url" required></label>
|
||
<label class="check"><input name="enabled" type="checkbox" checked> 启用</label>
|
||
<button type="submit">保存目标</button>
|
||
</form>"""
|
||
notice = getattr(self, "_target_notice", "")
|
||
self.send_html("飞书 Webhook", f"<header><h1>飞书 Webhook</h1><p>维护所有可分发的飞书机器人地址。</p></header>{notice}{form}<table><thead><tr><th>ID</th><th>名称</th><th>URL</th><th>状态</th><th>操作</th></tr></thead><tbody>{rows}</tbody></table>")
|
||
|
||
def render_target_delete(self, parsed: Any) -> None:
|
||
target_id = parse_qs(parsed.query).get("id", [""])[-1]
|
||
with self.context.db.connect() as conn:
|
||
target = conn.execute("SELECT * FROM webhook_targets WHERE id = ?", (target_id,)).fetchone()
|
||
if not target:
|
||
self.send_error(404)
|
||
return
|
||
body = f"""<header><h1>删除飞书 Webhook</h1><p>请确认是否删除这个飞书目标。</p></header>
|
||
<section class="panel narrow">
|
||
<h2>{html.escape(target['name'])}</h2>
|
||
<p class="url">{html.escape(target['webhook_url'])}</p>
|
||
<form method="post" action="/targets/delete" class="actions">
|
||
<input type="hidden" name="id" value="{target['id']}">
|
||
<button class="danger" type="submit">确认删除</button>
|
||
<a class="button-link secondary" href="/targets">取消</a>
|
||
</form>
|
||
</section>"""
|
||
self.send_html("删除飞书 Webhook", body)
|
||
|
||
def render_rules(self) -> None:
|
||
targets = self.list_targets()
|
||
rules = self.list_rules()
|
||
target_names = {target["id"]: target["name"] for target in targets}
|
||
rows = ""
|
||
for rule in rules:
|
||
conditions = [
|
||
f"周期={html.escape(rule['timeframe'])}" if rule["timeframe"] else "",
|
||
f"品种={html.escape(rule['symbol'])}" if rule["symbol"] else "",
|
||
f"策略={html.escape(rule['strategy'])}" if rule["strategy"] else "",
|
||
]
|
||
target_name = "<br>".join(
|
||
html.escape(target_names.get(target_id, f"#{target_id}")) for target_id in rule["target_ids"]
|
||
) or "-"
|
||
rows += f"""<tr>
|
||
<td>{rule['id']}</td>
|
||
<td>{html.escape(rule['name'])}</td>
|
||
<td>{'<br>'.join(item for item in conditions if item) or '-'}</td>
|
||
<td>{rule['priority']}</td>
|
||
<td>{html.escape(target_name)}</td>
|
||
<td><span class="status">{'启用' if rule['enabled'] else '停用'}</span></td>
|
||
<td><a class="button-link" href="/rules/edit?id={rule['id']}">编辑</a><a class="button-link danger-link" href="/rules/delete?id={rule['id']}">删除</a></td>
|
||
</tr>"""
|
||
header = """<header class="page-header"><div><h1>路由规则</h1><p>周期、品种、策略至少填写一个;空字段表示不限。消息统一用飞书卡片发送。</p></div><a class="button-link" href="/rules/new">新增规则</a></header>"""
|
||
self.send_html("路由规则", f"{header}<table><thead><tr><th>ID</th><th>名称</th><th>匹配条件</th><th>优先级</th><th>发送到</th><th>状态</th><th>操作</th></tr></thead><tbody>{rows}</tbody></table>")
|
||
|
||
def render_rule_form(
|
||
self,
|
||
title: str,
|
||
action: str,
|
||
rule: dict[str, Any] | None = None,
|
||
preview_html: str = "",
|
||
sample_payload: str | None = None,
|
||
) -> None:
|
||
targets = self.list_targets()
|
||
rule = rule or {
|
||
"id": "",
|
||
"name": "",
|
||
"timeframe": "",
|
||
"symbol": "",
|
||
"strategy": "",
|
||
"priority": 100,
|
||
"card_title_template": "TradingView {{symbol}} {{action}}",
|
||
"card_body_template": "**品种**: {{symbol}}\n**周期**: {{timeframe}}\n**策略**: {{strategy}}\n**动作**: {{action}}\n**价格**: {{price}}",
|
||
"target_ids": [],
|
||
"enabled": 1,
|
||
}
|
||
selected_targets = target_checkbox_options(targets, rule.get("target_ids", []))
|
||
hidden_id = f'<input type="hidden" name="id" value="{rule["id"]}">' if rule.get("id") else ""
|
||
button_text = "保存修改" if rule.get("id") else "创建规则"
|
||
sample_payload = sample_payload or json.dumps(
|
||
{"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000},
|
||
ensure_ascii=False,
|
||
indent=2,
|
||
)
|
||
body = f"""<header><h1>{html.escape(title)}</h1><p>消息统一使用飞书卡片。周期、品种、策略至少填写一个,空字段表示不限。</p></header>
|
||
<form class="panel rule-form" method="post" action="{action}">
|
||
{hidden_id}
|
||
<div class="grid">
|
||
<label>规则名<input name="name" value="{html.escape(str(rule['name']))}" required></label>
|
||
<label>周期<input name="timeframe" value="{html.escape(str(rule['timeframe']))}" placeholder="5m,空=不限"></label>
|
||
<label>品种<input name="symbol" value="{html.escape(str(rule['symbol']))}" placeholder="BTCUSDT,空=不限"></label>
|
||
<label>策略<input name="strategy" value="{html.escape(str(rule['strategy']))}" placeholder="breakout,空=不限"></label>
|
||
<label>优先级<input name="priority" type="number" value="{rule['priority']}" required></label>
|
||
</div>
|
||
<label>卡片标题模板<input name="card_title_template" value="{html.escape(str(rule['card_title_template']))}" required></label>
|
||
<label>卡片正文模板<textarea name="card_body_template" rows="6">{html.escape(str(rule['card_body_template']))}</textarea></label>
|
||
<div class="field-target"><span class="field-label">发送到</span><div class="target-choices">{selected_targets}</div></div>
|
||
<label class="check"><input name="enabled" type="checkbox" {'checked' if rule.get('enabled') else ''}> 启用</label>
|
||
<h2>规则命中与卡片预览</h2>
|
||
<input type="hidden" name="source_action" value="{html.escape(action)}">
|
||
<label>样例 Alert JSON<textarea name="sample_payload" rows="9">{html.escape(sample_payload)}</textarea></label>
|
||
<div class="actions"><button type="submit">{button_text}</button><button type="submit" formaction="/rules/preview">预览命中和卡片</button><a class="button-link secondary" href="/rules">返回列表</a></div>
|
||
</form>{preview_html}"""
|
||
self.send_html(title, body)
|
||
|
||
def render_rule_new(self) -> None:
|
||
self.render_rule_form("新增路由规则", "/rules/create")
|
||
|
||
def render_rule_edit(self, parsed: Any) -> None:
|
||
query = parse_qs(parsed.query)
|
||
rule_id = query.get("id", [""])[-1]
|
||
with self.context.db.connect() as conn:
|
||
rule = conn.execute("SELECT * FROM routing_rules WHERE id = ?", (rule_id,)).fetchone()
|
||
if not rule:
|
||
self.send_error(404)
|
||
return
|
||
rule_dict = dict(rule)
|
||
rule_dict["target_ids"] = from_json(rule_dict["target_ids"], [])
|
||
self.render_rule_form("编辑路由规则", "/rules/update", rule_dict)
|
||
|
||
def render_rule_delete(self, parsed: Any) -> None:
|
||
rule_id = parse_qs(parsed.query).get("id", [""])[-1]
|
||
with self.context.db.connect() as conn:
|
||
rule = conn.execute("SELECT * FROM routing_rules WHERE id = ?", (rule_id,)).fetchone()
|
||
if not rule:
|
||
self.send_error(404)
|
||
return
|
||
body = f"""<header><h1>删除路由规则</h1><p>请确认是否删除这条规则。</p></header>
|
||
<section class="panel narrow">
|
||
<h2>{html.escape(rule['name'])}</h2>
|
||
<p>删除后不会再匹配对应 alert,已有日志不受影响。</p>
|
||
<form method="post" action="/rules/delete" class="actions">
|
||
<input type="hidden" name="id" value="{rule['id']}">
|
||
<button class="danger" type="submit">确认删除</button>
|
||
<a class="button-link secondary" href="/rules">取消</a>
|
||
</form>
|
||
</section>"""
|
||
self.send_html("删除路由规则", body)
|
||
|
||
def build_rule_from_form(self, form: dict[str, list[str]]) -> dict[str, Any]:
|
||
return {
|
||
"id": form.get("id", [""])[-1],
|
||
"name": form.get("name", [""])[-1].strip(),
|
||
"timeframe": form.get("timeframe", [""])[-1].strip(),
|
||
"symbol": form.get("symbol", [""])[-1].strip().upper(),
|
||
"strategy": form.get("strategy", [""])[-1].strip(),
|
||
"priority": int(form.get("priority", ["100"])[-1] or 100),
|
||
"card_title_template": form.get("card_title_template", ["TradingView {{symbol}} {{action}}"])[-1].strip(),
|
||
"card_body_template": form.get("card_body_template", [""])[-1].strip(),
|
||
"target_ids": [int(value) for value in form.get("target_ids", []) if value],
|
||
"enabled": 1 if form.get("enabled", [""])[-1] == "on" else 0,
|
||
}
|
||
|
||
def preview_rule(self) -> None:
|
||
form = parse_form_multi(self)
|
||
rule = self.build_rule_from_form(form)
|
||
sample_payload = form.get("sample_payload", ["{}"])[-1]
|
||
source_action = form.get("source_action", ["/rules/create"])[-1]
|
||
title = "编辑路由规则" if rule.get("id") else "新增路由规则"
|
||
try:
|
||
alert = normalize_alert(json.loads(sample_payload))
|
||
message = build_feishu_message(alert, rule)
|
||
matched = self.context.dispatcher.find_matching_rule(alert)
|
||
current_matches = self.rule_matches_alert(rule, alert)
|
||
preview_html = self.render_preview_result(rule, message, matched, current_matches)
|
||
except (json.JSONDecodeError, ValidationError, ValueError) as exc:
|
||
preview_html = f"""<section class="result-panel error"><h2>预览失败</h2><p>{html.escape(str(exc))}</p></section>"""
|
||
self.render_rule_form(title, source_action, rule, preview_html, sample_payload)
|
||
|
||
def rule_matches_alert(self, rule: dict[str, Any], alert: dict[str, Any]) -> bool:
|
||
if not any((rule.get("timeframe"), rule.get("symbol"), rule.get("strategy"))):
|
||
return False
|
||
if rule.get("timeframe") and rule["timeframe"] != alert.get("timeframe"):
|
||
return False
|
||
if rule.get("symbol") and rule["symbol"].upper() != alert.get("symbol"):
|
||
return False
|
||
if rule.get("strategy") and rule["strategy"] != alert.get("strategy"):
|
||
return False
|
||
return True
|
||
|
||
def render_preview_result(
|
||
self,
|
||
rule: dict[str, Any],
|
||
message: dict[str, Any],
|
||
matched: dict[str, Any] | None,
|
||
current_matches: bool,
|
||
) -> str:
|
||
title = message["card"]["header"]["title"]["content"]
|
||
content = message["card"]["elements"][0]["text"]["content"]
|
||
matched_text = f"当前已保存规则 #{matched['id']} {matched['name']}" if matched else "没有已保存规则会命中"
|
||
current_text = "当前表单会匹配样例 Alert" if current_matches else "当前表单不会匹配样例 Alert"
|
||
return f"""<section class="result-panel success">
|
||
<h2>预览结果</h2>
|
||
<div class="result-grid">
|
||
<div><span>当前表单</span><strong>{html.escape(current_text)}</strong></div>
|
||
<div><span>系统实际命中</span><strong>{html.escape(matched_text)}</strong></div>
|
||
<div><span>规则优先级</span><strong>{rule.get('priority')}</strong></div>
|
||
<div><span>消息类型</span><strong>飞书卡片</strong></div>
|
||
</div>
|
||
<div class="feishu-preview">
|
||
<div class="feishu-preview-header">{html.escape(title)}</div>
|
||
<pre>{html.escape(content)}</pre>
|
||
</div>
|
||
</section>"""
|
||
|
||
def render_logs(self) -> None:
|
||
logs = self.list_logs()
|
||
alert_rows = ""
|
||
for row in logs["alerts"]:
|
||
try:
|
||
raw_payload = json.dumps(json.loads(row["payload"]), ensure_ascii=False, indent=2)
|
||
except Exception:
|
||
raw_payload = row["payload"] or ""
|
||
alert_rows += f"""<tr>
|
||
<td>{row['id']}</td>
|
||
<td>{html.escape(row['symbol'])}</td>
|
||
<td>{html.escape(row['timeframe'])}</td>
|
||
<td>{html.escape(row['strategy'])}</td>
|
||
<td><span class='status'>{html.escape(row['status'])}</span></td>
|
||
<td>{html.escape(row['error'] or '')}</td>
|
||
<td>{row['created_at']}</td>
|
||
<td><details class="payload-details"><summary>查看</summary><pre>{html.escape(raw_payload)}</pre></details></td>
|
||
</tr>"""
|
||
delivery_rows = "".join(
|
||
f"<tr><td>{row['id']}</td><td>{row['alert_id']}</td><td>{html.escape(row['target_name'])}</td><td><span class='status'>{html.escape(row['status'])}</span></td><td>{row['attempts']}</td><td>{html.escape(str(row['response_code'] or ''))}</td><td>{html.escape(row['error'] or '')}</td><td>{html.escape(row['next_attempt_at'] or '')}</td></tr>"
|
||
for row in logs["deliveries"]
|
||
)
|
||
body = f"""<header><h1>日志</h1><p>最近 100 条 alert 和 200 条分发任务。</p></header>
|
||
<form method="post" action="/deliveries/retry"><button type="submit">处理到期重试</button></form>
|
||
<section><h2>Alert 日志</h2><table><thead><tr><th>ID</th><th>品种</th><th>周期</th><th>策略</th><th>状态</th><th>错误</th><th>时间</th><th>原始 Alert</th></tr></thead><tbody>{alert_rows}</tbody></table></section>
|
||
<section><h2>Delivery 日志</h2><table><thead><tr><th>ID</th><th>Alert</th><th>目标</th><th>状态</th><th>次数</th><th>HTTP</th><th>错误</th><th>下次重试</th></tr></thead><tbody>{delivery_rows}</tbody></table></section>"""
|
||
self.send_html("日志", body)
|
||
|
||
def render_test(self) -> None:
|
||
sample = html.escape(json.dumps({"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000}, indent=2))
|
||
result = getattr(self, "_test_result_html", "")
|
||
body = f"""<header><h1>测试发送</h1><p>提交一条模拟 TradingView alert,走完整匹配和飞书转发流程。</p></header>
|
||
<form class="panel" method="post" action="/test/send">
|
||
<label>Alert JSON<textarea name="payload" rows="12">{sample}</textarea></label>
|
||
<button type="submit">发送测试 Alert</button>
|
||
</form>"""
|
||
if result:
|
||
body += result
|
||
self.send_html("测试发送", body)
|
||
|
||
def render_account(self) -> None:
|
||
body = """<header><h1>账号安全</h1><p>修改当前管理员密码,修改成功后会退出登录。</p></header>
|
||
<form class="panel narrow" method="post" action="/account/password">
|
||
<h2>修改密码</h2>
|
||
<label>当前密码<input name="current_password" type="password" autocomplete="current-password" required></label>
|
||
<label>新密码<input name="new_password" type="password" autocomplete="new-password" minlength="8" required></label>
|
||
<label>确认新密码<input name="confirm_password" type="password" autocomplete="new-password" minlength="8" required></label>
|
||
<button type="submit">更新密码</button>
|
||
</form>"""
|
||
self.send_html("账号安全", body)
|
||
|
||
def create_target(self) -> None:
|
||
form = parse_form(self)
|
||
now = now_iso()
|
||
with self.context.db.connect() as conn:
|
||
conn.execute(
|
||
"INSERT INTO webhook_targets (name, webhook_url, enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
|
||
(form["name"].strip(), form["webhook_url"].strip(), 1 if form.get("enabled") == "on" else 0, now, now),
|
||
)
|
||
redirect(self, "/targets")
|
||
|
||
def update_target(self) -> None:
|
||
form = parse_form(self)
|
||
with self.context.db.connect() as conn:
|
||
conn.execute(
|
||
"UPDATE webhook_targets SET name = ?, webhook_url = ?, enabled = ?, updated_at = ? WHERE id = ?",
|
||
(form["name"].strip(), form["webhook_url"].strip(), 1 if form.get("enabled") == "on" else 0, now_iso(), form["id"]),
|
||
)
|
||
redirect(self, "/targets")
|
||
|
||
def delete_target(self) -> None:
|
||
form = parse_form(self)
|
||
with self.context.db.connect() as conn:
|
||
conn.execute("DELETE FROM webhook_targets WHERE id = ?", (form["id"],))
|
||
redirect(self, "/targets")
|
||
|
||
def test_target(self) -> None:
|
||
form = parse_form(self)
|
||
with self.context.db.connect() as conn:
|
||
target = conn.execute("SELECT * FROM webhook_targets WHERE id = ?", (form.get("id"),)).fetchone()
|
||
if not target:
|
||
self.send_error(404)
|
||
return
|
||
message = build_feishu_message(
|
||
{"symbol": "TEST", "timeframe": "5m", "strategy": "webhook-test", "action": "test"},
|
||
{
|
||
"card_title_template": "TV Dispatch 测试消息",
|
||
"card_body_template": "**目标**: {{symbol}}\n**动作**: {{action}}\n这是一条飞书 Webhook 连通性测试。",
|
||
},
|
||
)
|
||
try:
|
||
data = json.dumps(message, ensure_ascii=False).encode()
|
||
request = urllib.request.Request(
|
||
target["webhook_url"],
|
||
data=data,
|
||
headers={"Content-Type": "application/json"},
|
||
method="POST",
|
||
)
|
||
with urllib.request.urlopen(request, timeout=self.context.settings.feishu_timeout_seconds) as response:
|
||
status = response.getcode()
|
||
self.render_targets_with_notice(f"测试消息已发送到 {html.escape(target['name'])},HTTP {status}", success=True)
|
||
except urllib.error.HTTPError as exc:
|
||
self.render_targets_with_notice(f"测试发送失败:HTTP {exc.code}", success=False)
|
||
except Exception as exc:
|
||
self.render_targets_with_notice(f"测试发送失败:{html.escape(str(exc))}", success=False)
|
||
|
||
def render_targets_with_notice(self, message: str, success: bool) -> None:
|
||
self._target_notice = f"""<section class="result-panel {'success' if success else 'error'}"><h2>Webhook 测试</h2><p>{message}</p></section>"""
|
||
self.render_targets()
|
||
|
||
def create_rule(self) -> None:
|
||
form = parse_form_multi(self)
|
||
target_ids = [int(value) for value in form.get("target_ids", [])]
|
||
timeframe = form.get("timeframe", [""])[-1].strip()
|
||
symbol = form.get("symbol", [""])[-1].strip().upper()
|
||
strategy = form.get("strategy", [""])[-1].strip()
|
||
if not any((timeframe, symbol, strategy)):
|
||
self.send_error(400, "周期、品种、策略至少填写一个")
|
||
return
|
||
now = now_iso()
|
||
with self.context.db.connect() as conn:
|
||
conn.execute(
|
||
"""
|
||
INSERT INTO routing_rules (
|
||
name, timeframe, symbol, strategy, priority, message_type,
|
||
card_title_template, card_body_template, enabled, target_ids,
|
||
created_at, updated_at
|
||
)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
form.get("name", [""])[-1].strip(),
|
||
timeframe,
|
||
symbol,
|
||
strategy,
|
||
int(form.get("priority", ["100"])[-1]),
|
||
"card",
|
||
form.get("card_title_template", ["TradingView {{symbol}} {{action}}"])[-1].strip(),
|
||
form.get("card_body_template", [""])[-1].strip(),
|
||
1 if form.get("enabled", [""])[-1] == "on" else 0,
|
||
to_json(target_ids),
|
||
now,
|
||
now,
|
||
),
|
||
)
|
||
redirect(self, "/rules")
|
||
|
||
def delete_rule(self) -> None:
|
||
form = parse_form(self)
|
||
with self.context.db.connect() as conn:
|
||
conn.execute("DELETE FROM routing_rules WHERE id = ?", (form["id"],))
|
||
redirect(self, "/rules")
|
||
|
||
def update_rule(self) -> None:
|
||
form = parse_form_multi(self)
|
||
target_ids = [int(value) for value in form.get("target_ids", [])]
|
||
timeframe = form.get("timeframe", [""])[-1].strip()
|
||
symbol = form.get("symbol", [""])[-1].strip().upper()
|
||
strategy = form.get("strategy", [""])[-1].strip()
|
||
if not any((timeframe, symbol, strategy)):
|
||
self.send_error(400, "周期、品种、策略至少填写一个")
|
||
return
|
||
with self.context.db.connect() as conn:
|
||
conn.execute(
|
||
"""
|
||
UPDATE routing_rules
|
||
SET name = ?, timeframe = ?, symbol = ?, strategy = ?, priority = ?,
|
||
message_type = ?, card_title_template = ?, card_body_template = ?,
|
||
enabled = ?, target_ids = ?, updated_at = ?
|
||
WHERE id = ?
|
||
""",
|
||
(
|
||
form.get("name", [""])[-1].strip(),
|
||
timeframe,
|
||
symbol,
|
||
strategy,
|
||
int(form.get("priority", ["100"])[-1]),
|
||
"card",
|
||
form.get("card_title_template", ["TradingView {{symbol}} {{action}}"])[-1].strip(),
|
||
form.get("card_body_template", [""])[-1].strip(),
|
||
1 if form.get("enabled", [""])[-1] == "on" else 0,
|
||
to_json(target_ids),
|
||
now_iso(),
|
||
form.get("id", [""])[-1],
|
||
),
|
||
)
|
||
redirect(self, "/rules")
|
||
|
||
def send_test(self) -> None:
|
||
form = parse_form(self)
|
||
payload_text = form.get("payload", "{}")
|
||
try:
|
||
payload = json.loads(payload_text)
|
||
result = self.context.dispatcher.receive_alert(payload)
|
||
delivery_text = ", ".join(str(item) for item in result.get("delivery_ids", [])) or "-"
|
||
self._test_result_html = f"""<section class="result-panel success">
|
||
<h2>测试结果</h2>
|
||
<div class="result-grid">
|
||
<div><span>Alert ID</span><strong>{result.get("alert_id")}</strong></div>
|
||
<div><span>状态</span><strong>{html.escape(str(result.get("status")))}</strong></div>
|
||
<div><span>命中规则</span><strong>{html.escape(str(result.get("matched_rule_id") or "-"))}</strong></div>
|
||
<div><span>Delivery</span><strong>{html.escape(delivery_text)}</strong></div>
|
||
</div>
|
||
<details><summary>查看响应 JSON</summary><pre>{html.escape(json.dumps(result, ensure_ascii=False, indent=2))}</pre></details>
|
||
</section>"""
|
||
self.render_test()
|
||
except (json.JSONDecodeError, ValidationError) as exc:
|
||
self._test_result_html = f"""<section class="result-panel error">
|
||
<h2>测试失败</h2>
|
||
<p>{html.escape(str(exc))}</p>
|
||
</section>"""
|
||
self.render_test()
|
||
|
||
def change_password(self) -> None:
|
||
form = parse_form(self)
|
||
current_password = form.get("current_password", "")
|
||
new_password = form.get("new_password", "")
|
||
confirm_password = form.get("confirm_password", "")
|
||
if not check_credentials(
|
||
self.context.settings,
|
||
self.context.settings.admin_username,
|
||
current_password,
|
||
self.get_admin_password_hash(),
|
||
):
|
||
json_response(self, 400, {"error": "当前密码不正确"})
|
||
return
|
||
if len(new_password) < 8:
|
||
json_response(self, 400, {"error": "新密码至少需要 8 位"})
|
||
return
|
||
if new_password != confirm_password:
|
||
json_response(self, 400, {"error": "两次输入的新密码不一致"})
|
||
return
|
||
with self.context.db.connect() as conn:
|
||
conn.execute(
|
||
"UPDATE admin_settings SET password_hash = ?, updated_at = ? WHERE id = 1",
|
||
(hash_password(new_password), now_iso()),
|
||
)
|
||
self.logout()
|
||
|
||
def retry_deliveries(self) -> None:
|
||
self.context.dispatcher.process_due_deliveries(limit=100)
|
||
redirect(self, "/logs")
|
||
|
||
|
||
def make_handler(context: AppContext) -> type[Handler]:
|
||
class BoundHandler(Handler):
|
||
pass
|
||
|
||
BoundHandler.context = context
|
||
return BoundHandler
|
||
|
||
|
||
def run() -> None:
|
||
settings = get_settings()
|
||
context = AppContext(settings)
|
||
context.db.cleanup_old_logs(settings.retention_days)
|
||
server = ThreadingHTTPServer((settings.host, settings.port), make_handler(context))
|
||
print(f"Serving {settings.app_name} on http://{settings.host}:{settings.port}")
|
||
server.serve_forever()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
run()
|