from __future__ import annotations
import html
import json
import mimetypes
import os
import urllib.error
import urllib.request
from http import HTTPStatus
from http.cookies import SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse
from app.auth import COOKIE_NAME, check_credentials, hash_password, is_valid_session, make_session_cookie
from app.config import Settings, get_settings
from app.db import Database, from_json, now_iso, to_json
from app.dispatcher import Dispatcher, ValidationError, build_feishu_message, normalize_alert
class AppContext:
def __init__(self, settings: Settings):
self.settings = settings
self.db = Database(settings)
self.db.migrate(settings)
self.dispatcher = Dispatcher(self.db, settings)
def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any] | list[Any]) -> None:
body = json.dumps(payload, ensure_ascii=False).encode()
handler.send_response(status)
handler.send_header("Content-Type", "application/json; charset=utf-8")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
handler.wfile.write(body)
def redirect(handler: BaseHTTPRequestHandler, location: str) -> None:
handler.send_response(HTTPStatus.SEE_OTHER)
handler.send_header("Location", location)
handler.end_headers()
def read_body(handler: BaseHTTPRequestHandler) -> bytes:
length = int(handler.headers.get("Content-Length", "0") or "0")
return handler.rfile.read(length)
def parse_form(handler: BaseHTTPRequestHandler) -> dict[str, str]:
data = read_body(handler).decode()
return {key: values[-1] for key, values in parse_qs(data).items()}
def parse_form_multi(handler: BaseHTTPRequestHandler) -> dict[str, list[str]]:
return parse_qs(read_body(handler).decode())
def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
try:
value = json.loads(read_body(handler).decode() or "{}")
except json.JSONDecodeError as exc:
raise ValidationError("Request body must be valid JSON") from exc
if not isinstance(value, dict):
raise ValidationError("Request body must be a JSON object")
return value
def target_select_options(
targets: list[dict[str, Any]],
selected_ids: list[int] | None = None,
placeholder: bool = False,
) -> str:
selected_ids = selected_ids or []
options = [''] if placeholder else []
for target in targets:
selected = "selected" if target["id"] in selected_ids else ""
disabled = "" if target["enabled"] else "disabled"
suffix = "" if target["enabled"] else " (停用)"
options.append(
f''
)
return "".join(options)
class Handler(BaseHTTPRequestHandler):
context: AppContext
def log_message(self, format: str, *args: Any) -> None:
print("%s - - [%s] %s" % (self.address_string(), self.log_date_time_string(), format % args))
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/health":
json_response(self, 200, {"ok": True})
return
if parsed.path == "/login":
self.render_login()
return
if parsed.path.startswith("/static/"):
self.serve_static(parsed.path)
return
if not self.require_auth():
return
if parsed.path in ("/", "/dashboard"):
self.render_dashboard()
elif parsed.path == "/targets":
self.render_targets()
elif parsed.path == "/targets/delete":
self.render_target_delete(parsed)
elif parsed.path == "/rules":
self.render_rules()
elif parsed.path == "/rules/new":
self.render_rule_new()
elif parsed.path == "/rules/edit":
self.render_rule_edit(parsed)
elif parsed.path == "/rules/delete":
self.render_rule_delete(parsed)
elif parsed.path == "/logs":
self.render_logs()
elif parsed.path == "/test":
self.render_test()
elif parsed.path == "/account":
self.render_account()
elif parsed.path == "/api/targets":
json_response(self, 200, self.list_targets())
elif parsed.path == "/api/rules":
json_response(self, 200, self.list_rules())
elif parsed.path == "/api/logs":
json_response(self, 200, self.list_logs())
else:
self.send_error(404)
def do_POST(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/webhook/tradingview":
self.handle_tradingview_webhook()
return
if parsed.path == "/login":
self.handle_login()
return
if not self.require_auth():
return
routes = {
"/targets/create": self.create_target,
"/targets/update": self.update_target,
"/targets/delete": self.delete_target,
"/targets/test": self.test_target,
"/rules/create": self.create_rule,
"/rules/update": self.update_rule,
"/rules/delete": self.delete_rule,
"/rules/preview": self.preview_rule,
"/test/send": self.send_test,
"/account/password": self.change_password,
"/deliveries/retry": self.retry_deliveries,
"/logout": self.logout,
}
handler = routes.get(parsed.path)
if not handler:
self.send_error(404)
return
handler()
def require_auth(self) -> bool:
if is_valid_session(self.context.settings, self.headers.get("Cookie")):
return True
redirect(self, "/login")
return False
def layout(self, title: str, body: str) -> bytes:
nav = [
("/dashboard", "概览"),
("/rules", "路由规则"),
("/targets", "飞书 Webhook"),
("/logs", "日志"),
("/test", "测试发送"),
("/account", "账号安全"),
]
items = "".join(f'{label}' for href, label in nav)
return f"""
{html.escape(title)}{body}
""".encode()
def send_html(self, title: str, body: str) -> None:
content = self.layout(title, body)
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(content)))
self.end_headers()
self.wfile.write(content)
def render_login(self) -> None:
content = """
Login
""".encode()
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(content)))
self.end_headers()
self.wfile.write(content)
def handle_login(self) -> None:
form = parse_form(self)
if not check_credentials(
self.context.settings,
form.get("username", ""),
form.get("password", ""),
self.get_admin_password_hash(),
):
redirect(self, "/login")
return
cookie = SimpleCookie()
cookie[COOKIE_NAME] = make_session_cookie(self.context.settings)
cookie[COOKIE_NAME]["path"] = "/"
cookie[COOKIE_NAME]["httponly"] = True
cookie[COOKIE_NAME]["samesite"] = "Lax"
self.send_response(HTTPStatus.SEE_OTHER)
self.send_header("Location", "/dashboard")
self.send_header("Set-Cookie", cookie.output(header="").strip())
self.end_headers()
def get_admin_password_hash(self) -> str:
with self.context.db.connect() as conn:
row = conn.execute("SELECT password_hash FROM admin_settings WHERE id = 1").fetchone()
return row["password_hash"]
def logout(self) -> None:
self.send_response(HTTPStatus.SEE_OTHER)
self.send_header("Location", "/login")
self.send_header("Set-Cookie", f"{COOKIE_NAME}=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax")
self.end_headers()
def serve_static(self, path: str) -> None:
local_path = os.path.join(os.path.dirname(__file__), "static", os.path.basename(path))
if not os.path.exists(local_path):
self.send_error(404)
return
with open(local_path, "rb") as file:
content = file.read()
self.send_response(200)
self.send_header("Content-Type", mimetypes.guess_type(local_path)[0] or "application/octet-stream")
self.send_header("Content-Length", str(len(content)))
self.end_headers()
self.wfile.write(content)
def handle_tradingview_webhook(self) -> None:
if self.context.settings.webhook_token:
query = parse_qs(urlparse(self.path).query)
token = self.headers.get("X-Webhook-Token") or query.get("token", [""])[-1]
if token != self.context.settings.webhook_token:
json_response(self, 401, {"error": "Invalid webhook token"})
return
try:
payload = parse_json_body(self)
result = self.context.dispatcher.receive_alert(payload)
json_response(self, 202, result)
except ValidationError as exc:
json_response(self, 400, {"error": str(exc)})
def list_targets(self) -> list[dict[str, Any]]:
with self.context.db.connect() as conn:
rows = conn.execute("SELECT * FROM webhook_targets ORDER BY id DESC").fetchall()
return [dict(row) for row in rows]
def list_rules(self) -> list[dict[str, Any]]:
with self.context.db.connect() as conn:
rows = conn.execute("SELECT * FROM routing_rules ORDER BY priority ASC, id DESC").fetchall()
rules = []
for row in rows:
item = dict(row)
item["target_ids"] = from_json(item["target_ids"], [])
rules.append(item)
return rules
def list_logs(self) -> dict[str, list[dict[str, Any]]]:
with self.context.db.connect() as conn:
alerts = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 100").fetchall()
deliveries = conn.execute("SELECT * FROM deliveries ORDER BY id DESC LIMIT 200").fetchall()
return {"alerts": [dict(row) for row in alerts], "deliveries": [dict(row) for row in deliveries]}
def render_dashboard(self) -> None:
host = self.headers.get("Host", f"localhost:{self.context.settings.port}")
scheme = self.headers.get("X-Forwarded-Proto", "http")
base_url = f"{scheme}://{host}"
webhook_url = f"{base_url}/webhook/tradingview"
token = self.context.settings.webhook_token
webhook_url_with_token = f"{webhook_url}?token={token}" if token else webhook_url
token_block = (
f"""
Webhook Token{html.escape(token)}
Header 方式X-Webhook-Token: {html.escape(token)}
"""
if token
else """
当前未设置 WEBHOOK_TOKEN,任何知道地址的人都可以提交 alert。生产环境建议设置。
"""
)
webhook_panel = f"""
TradingView Webhook 配置
Webhook URL{html.escape(webhook_url_with_token)}
纯 URL{html.escape(webhook_url)}
{token_block}
"""
with self.context.db.connect() as conn:
counts = {
"alerts": conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"],
"rules": conn.execute("SELECT COUNT(*) AS c FROM routing_rules").fetchone()["c"],
"targets": conn.execute("SELECT COUNT(*) AS c FROM webhook_targets").fetchone()["c"],
"pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status IN ('pending','retry')").fetchone()["c"],
}
recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall()
cards = "".join(f'
{label}{value}
' for label, value in [
("Alerts", counts["alerts"]),
("Rules", counts["rules"]),
("Targets", counts["targets"]),
("Pending", counts["pending"]),
])
rows = "".join(
f"
"""
for target in targets
)
form = """"""
notice = getattr(self, "_target_notice", "")
self.send_html("飞书 Webhook", f"
飞书 Webhook
维护所有可分发的飞书机器人地址。
{notice}{form}
ID
名称
URL
状态
操作
{rows}
")
def render_target_delete(self, parsed: Any) -> None:
target_id = parse_qs(parsed.query).get("id", [""])[-1]
with self.context.db.connect() as conn:
target = conn.execute("SELECT * FROM webhook_targets WHERE id = ?", (target_id,)).fetchone()
if not target:
self.send_error(404)
return
body = f"""
删除飞书 Webhook
请确认是否删除这个飞书目标。
{html.escape(target['name'])}
{html.escape(target['webhook_url'])}
"""
self.send_html("删除飞书 Webhook", body)
def render_rules(self) -> None:
targets = self.list_targets()
rules = self.list_rules()
target_names = {target["id"]: target["name"] for target in targets}
rows = ""
for rule in rules:
conditions = [
f"周期={html.escape(rule['timeframe'])}" if rule["timeframe"] else "",
f"品种={html.escape(rule['symbol'])}" if rule["symbol"] else "",
f"策略={html.escape(rule['strategy'])}" if rule["strategy"] else "",
]
target_name = target_names.get(rule["target_ids"][0], "-") if rule["target_ids"] else "-"
rows += f"""
{rule['id']}
{html.escape(rule['name'])}
{' '.join(item for item in conditions if item) or '-'}