'
)
def display_timezone(settings: Settings) -> timezone:
    """Return the time zone used when rendering timestamps.

    The zone named by ``settings.timezone`` is used when it can be loaded.
    If the setting is empty, or the name cannot be resolved, the host's
    local zone is used instead, and UTC as a last resort.

    Args:
        settings: Application settings; only ``settings.timezone`` is read.

    Returns:
        A ``tzinfo`` object: a ``ZoneInfo`` for a resolvable configured
        name, otherwise the host's local zone or ``timezone.utc``.
    """
    if settings.timezone:
        try:
            return ZoneInfo(settings.timezone)
        except (ZoneInfoNotFoundError, ValueError, OSError):
            # Besides "not found", ZoneInfo rejects malformed keys (absolute
            # or non-normalised paths) with ValueError, and some Python
            # versions raise an OSError subclass when the key names a
            # directory.  A bad setting must not break rendering, so every
            # such failure falls through to the default below.
            pass
    return datetime.now().astimezone().tzinfo or timezone.utc
def display_timezone_label(settings: Settings) -> str:
    """Return the text used to name the display time zone.

    The configured ``settings.timezone`` string is returned verbatim when it
    is set; otherwise the label is the string form of the zone chosen by
    ``display_timezone``.
    """
    configured = settings.timezone
    return configured if configured else str(display_timezone(settings))
def format_display_time(settings: Settings, value: str | None) -> str:
if not value:
return ""
try:
parsed = datetime.fromisoformat(value)
except ValueError:
return value
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(display_timezone(settings)).strftime("%Y-%m-%d %H:%M:%S")
def filter_value(query: dict[str, list[str]], key: str) -> str:
    """Return the last value supplied for ``key``, stripped of whitespace.

    Args:
        query: Mapping of parameter name to the list of values supplied.
        key: Parameter name to look up.

    Returns:
        The last value for ``key`` with surrounding whitespace removed, or
        ``""`` when the key is absent or its value list is empty.
    """
    values = query.get(key)
    if not values:
        # Covers both a missing key and a key mapped to an empty list, which
        # would otherwise raise IndexError on the [-1] lookup.
        return ""
    return values[-1].strip()
def status_select(name: str, selected: str, options: list[tuple[str, str]]) -> str:
    """Build the markup for a status chooser from ``(value, label)`` pairs.

    Args:
        name: Field name for the control.  It is not referenced by the
            literals visible here - presumably it is interpolated into the
            markup; TODO confirm.
        selected: Currently chosen value; the option whose value equals it
            is the one marked with `` selected``.
        options: ``(value, label)`` pairs, processed in order.

    Returns:
        The assembled markup as a string.
    """
    # NOTE(review): the first entry is an empty string here - presumably a
    # leading "no filter" option; confirm against the rendered page.
    items = ['']
    for value, label in options:
        # Only the option matching the current selection gets the marker.
        checked = " selected" if selected == value else ""
        items.append(f'')
    return f''
def log_filter_form(active_tab: str, filters: dict[str, str]) -> str:
    """Build the filter form for the log views.

    Args:
        active_tab: ``"alerts"`` selects the alert status options; any other
            value selects the delivery status options.
        filters: Current filter values; the keys ``symbol``, ``strategy``,
            ``status`` and ``target`` are read, each defaulting to ``""``.

    Returns:
        The form markup as a string.
    """
    # Free-text values are HTML-escaped up front.  ``status`` is left raw and
    # is only handed to status_select for comparison with the option values.
    # NOTE(review): the f-strings below carry no placeholders in this view,
    # so these locals look unused - presumably the markup that interpolates
    # them is not shown; confirm.
    symbol = html.escape(filters.get("symbol", ""))
    strategy = html.escape(filters.get("strategy", ""))
    status = filters.get("status", "")
    target = html.escape(filters.get("target", ""))
    if active_tab == "alerts":
        # Status values offered on the alerts tab.
        alert_status = status_select(
            "status",
            status,
            [
                ("queued", "已入队"),
                ("unmatched", "未命中"),
                ("partial", "部分完成"),
                ("delivered", "已送达"),
            ],
        )
        fields = f"""
"""
    else:
        # Status values offered on the deliveries tab.
        delivery_status = status_select(
            "status",
            status,
            [
                ("pending", "待发送"),
                ("processing", "发送中"),
                ("retry", "重试中"),
                ("failed", "失败"),
                ("sent", "已发送"),
            ],
        )
        fields = f"""
"""
    return f""""""
class Handler(BaseHTTPRequestHandler):
context: AppContext
def log_message(self, format: str, *args: Any) -> None:
    """Print one access-log line to standard output.

    Replaces the base handler's ``log_message`` while keeping the
    ``client - - [timestamp] message`` layout.

    Args:
        format: %-style format string supplied by the base handler.
        *args: Values interpolated into ``format``.
    """
    client = self.address_string()
    timestamp = self.log_date_time_string()
    line = "%s - - [%s] %s" % (client, timestamp, format % args)
    print(line)
def do_GET(self) -> None:
    """Route a GET request to the matching page, asset or JSON endpoint.

    ``/health``, ``/login`` and ``/static/...`` are answered without a
    session.  Every other path must pass ``require_auth`` first; a path
    that matches no route gets a 404.
    """
    parsed = urlparse(self.path)
    path = parsed.path

    # Endpoints that are reachable without logging in.
    if path == "/health":
        self.handle_health()
        return
    if path == "/login":
        self.render_login()
        return
    if path.startswith("/static/"):
        self.serve_static(path)
        return

    if not self.require_auth():
        return

    # Pages that need nothing from the request beyond the path.
    plain_pages = {
        "/": self.render_dashboard,
        "/dashboard": self.render_dashboard,
        "/targets": self.render_targets,
        "/templates": self.render_templates,
        "/templates/new": self.render_template_new,
        "/rules": self.render_rules,
        "/rules/new": self.render_rule_new,
        "/test": self.render_test,
        "/account": self.render_account,
    }
    # Pages that also receive the parsed URL (for its query string).
    query_pages = {
        "/targets/delete": self.render_target_delete,
        "/templates/edit": self.render_template_edit,
        "/templates/delete": self.render_template_delete,
        "/rules/edit": self.render_rule_edit,
        "/rules/delete": self.render_rule_delete,
        "/logs": self.render_logs,
    }
    # JSON endpoints: the callable produces the payload for a 200 response.
    api_sources = {
        "/api/targets": self.list_targets,
        "/api/rules": self.list_rules,
        "/api/logs": self.list_recent_logs,
    }

    if path in plain_pages:
        plain_pages[path]()
    elif path in query_pages:
        query_pages[path](parsed)
    elif path in api_sources:
        json_response(self, 200, api_sources[path]())
    else:
        self.send_error(404)
def do_POST(self) -> None:
    """Route a POST request to the matching action handler.

    The TradingView webhook and the login form are handled without a
    session.  All other actions must pass ``require_auth`` and are looked
    up in a path-to-method table; unknown paths get a 404.
    """
    path = urlparse(self.path).path

    # Endpoints that are reachable without logging in.
    if path == "/webhook/tradingview":
        self.handle_tradingview_webhook()
        return
    if path == "/login":
        self.handle_login()
        return

    if not self.require_auth():
        return

    routes = {
        "/targets/create": self.create_target,
        "/targets/update": self.update_target,
        "/targets/delete": self.delete_target,
        "/targets/test": self.test_target,
        "/templates/create": self.create_template,
        "/templates/update": self.update_template,
        "/templates/delete": self.delete_template,
        "/rules/create": self.create_rule,
        "/rules/update": self.update_rule,
        "/rules/delete": self.delete_rule,
        "/test/send": self.send_test,
        "/account/password": self.change_password,
        "/deliveries/retry": self.retry_deliveries,
        "/deliveries/retry-one": self.retry_delivery_one,
        "/logout": self.logout,
    }
    action = routes.get(path)
    if action:
        action()
    else:
        self.send_error(404)
def require_auth(self) -> bool:
    """Check the session cookie, redirecting to the login page if invalid.

    Returns:
        ``True`` when the request's ``Cookie`` header holds a valid session.
        Otherwise a redirect to ``/login`` has already been sent and the
        result is ``False``; the caller should stop handling the request.
    """
    cookie_header = self.headers.get("Cookie")
    if not is_valid_session(self.context.settings, cookie_header):
        redirect(self, "/login")
        return False
    return True
def layout(self, title: str, body: str) -> bytes:
    """Wrap page content in the shared page shell and return it encoded.

    Args:
        title: Page title; HTML-escaped before insertion.
        body: Page markup; inserted as-is, so callers must escape any
            untrusted values they put into it.

    Returns:
        The page as bytes (``str.encode()`` default encoding).
    """
    # Navigation entries as (href, label) pairs.
    nav = [
        ("/dashboard", "概览"),
        ("/rules", "路由规则"),
        ("/templates", "飞书模板"),
        ("/targets", "飞书 Webhook"),
        ("/logs", "日志"),
        ("/test", "测试发送"),
        ("/account", "账号安全"),
    ]
    # NOTE(review): as shown, only the label is emitted and ``items`` is not
    # referenced by the template below - presumably the link markup is not
    # visible in this view; confirm.
    items = "".join(f'{label}' for href, label in nav)
    return f"""
{html.escape(title)}{body}
""".encode()
def send_html(self, title: str, body: str) -> None:
    """Send a complete HTML page as a 200 response.

    Args:
        title: Page title, passed to ``layout``.
        body: Page markup, passed to ``layout``.
    """
    payload = self.layout(title, body)
    headers = (
        ("Content-Type", "text/html; charset=utf-8"),
        ("Content-Length", str(len(payload))),
    )
    self.send_response(200)
    for header_name, header_value in headers:
        self.send_header(header_name, header_value)
    self.end_headers()
    self.wfile.write(payload)
def render_login(self) -> None:
    """Send the login page as a 200 HTML response.

    The page is written directly rather than through ``send_html``, so it
    does not go through ``layout``.
    """
    # NOTE(review): only the text "Login" is visible in this literal -
    # presumably the form markup is not shown in this view; confirm.
    content = """
Login
""".encode()
    self.send_response(200)
    self.send_header("Content-Type", "text/html; charset=utf-8")
    # Content-Length is the encoded byte length, not the character count.
    self.send_header("Content-Length", str(len(content)))
    self.end_headers()
    self.wfile.write(content)
def handle_login(self) -> None:
    """Process the login form and start a session on success.

    Invalid credentials redirect back to ``/login``.  Valid credentials
    produce a 303 redirect to ``/dashboard`` carrying a session cookie
    scoped to ``/`` with the ``HttpOnly`` and ``SameSite=Lax`` attributes.
    """
    form = parse_form(self)
    username = form.get("username", "")
    password = form.get("password", "")
    stored_hash = self.get_admin_password_hash()

    accepted = check_credentials(self.context.settings, username, password, stored_hash)
    if not accepted:
        redirect(self, "/login")
        return

    jar = SimpleCookie()
    jar[COOKIE_NAME] = make_session_cookie(self.context.settings)
    session = jar[COOKIE_NAME]
    session["path"] = "/"
    session["httponly"] = True
    session["samesite"] = "Lax"

    self.send_response(HTTPStatus.SEE_OTHER)
    self.send_header("Location", "/dashboard")
    # header="" drops the "Set-Cookie:" prefix; send_header adds the name.
    self.send_header("Set-Cookie", jar.output(header="").strip())
    self.end_headers()
def get_admin_password_hash(self) -> str:
    """Return the stored password hash from the ``admin_settings`` row with id 1."""
    query = "SELECT password_hash FROM admin_settings WHERE id = 1"
    with self.context.db.connect() as conn:
        record = conn.execute(query).fetchone()
    return record["password_hash"]
def logout(self) -> None:
    """Expire the session cookie and redirect (303) to the login page."""
    # Max-Age=0 tells the browser to discard the cookie immediately.
    expired_cookie = f"{COOKIE_NAME}=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax"
    self.send_response(HTTPStatus.SEE_OTHER)
    for header_name, header_value in (
        ("Location", "/login"),
        ("Set-Cookie", expired_cookie),
    ):
        self.send_header(header_name, header_value)
    self.end_headers()
def serve_static(self, path: str) -> None:
    """Serve one file from the ``static`` directory next to this module.

    Only the final component of ``path`` is used, so a request cannot reach
    outside the static directory.  Anything that is not a regular file
    (missing, or a directory) is answered with a 404.

    Args:
        path: Request path, e.g. ``/static/app.css``.
    """
    static_dir = os.path.join(os.path.dirname(__file__), "static")
    local_path = os.path.join(static_dir, os.path.basename(path))

    # isfile rather than exists: "/static/" has an empty basename, which makes
    # local_path the static directory itself.  That passes an exists() check
    # and then fails inside open().
    if not os.path.isfile(local_path):
        self.send_error(404)
        return

    with open(local_path, "rb") as file:
        content = file.read()

    content_type = mimetypes.guess_type(local_path)[0] or "application/octet-stream"
    self.send_response(200)
    self.send_header("Content-Type", content_type)
    self.send_header("Content-Length", str(len(content)))
    self.end_headers()
    self.wfile.write(content)
def handle_health(self) -> None:
    """Report database, queue and worker status as JSON.

    Responds 503 with the error text when any database query fails.
    Otherwise responds 200 with the delivery counts per status, the
    worker's last-seen timestamp and whether it is fresh, plus the dispatch
    settings.  The worker counts as fresh when its last-seen time is at
    most ``max(60, 3 * worker_interval_seconds)`` seconds old; a timestamp
    without an offset is read as UTC.
    """
    now = datetime.now(timezone.utc)

    try:
        with self.context.db.connect() as conn:

            def count(sql: str) -> Any:
                return conn.execute(sql).fetchone()["c"]

            # Connectivity probe before the real queries.
            conn.execute("SELECT 1").fetchone()
            counts = {
                "pending": count("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'"),
                "retry": count("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'"),
                "processing": count("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'"),
                "failed": count("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'"),
            }
            worker_row = conn.execute(
                "SELECT value FROM app_state WHERE key = 'worker.last_seen_at'"
            ).fetchone()
    except Exception as exc:
        json_response(self, 503, {"ok": False, "database": "error", "error": str(exc)})
        return

    settings = self.context.settings
    worker_last_seen = worker_row["value"] if worker_row else None

    worker_fresh = False
    if worker_last_seen:
        try:
            seen_at = datetime.fromisoformat(worker_last_seen)
            if seen_at.tzinfo is None:
                seen_at = seen_at.replace(tzinfo=timezone.utc)
            max_age = max(60, settings.worker_interval_seconds * 3)
            age_seconds = (now - seen_at.astimezone(timezone.utc)).total_seconds()
            worker_fresh = age_seconds <= max_age
        except ValueError:
            # An unparseable heartbeat is treated as "not fresh".
            worker_fresh = False

    worker_status = {
        "last_seen_at": worker_last_seen,
        "fresh": worker_fresh,
        "interval_seconds": settings.worker_interval_seconds,
    }
    report = {
        "ok": True,
        "database": "ok",
        "queue": counts,
        "worker": worker_status,
        "dispatch_inline": settings.dispatch_inline,
        "dispatch_wakeup_on_receive": settings.dispatch_wakeup_on_receive,
    }
    json_response(self, 200, report)
def handle_tradingview_webhook(self) -> None:
    """Accept a TradingView alert and hand it to the dispatcher.

    When ``settings.webhook_token`` is set, the request must carry the same
    token in the ``X-Webhook-Token`` header or, failing that, in the
    ``token`` query parameter; otherwise the response is 401.

    Responses:
        202 with the dispatcher's result when the alert is accepted.
        400 with the error text when a ``ValidationError`` is raised.
        401 when the token check fails.
    """
    # Imported here so the block does not depend on the module's import list.
    import hmac

    expected_token = self.context.settings.webhook_token
    if expected_token:
        query = parse_qs(urlparse(self.path).query)
        supplied_token = self.headers.get("X-Webhook-Token") or query.get("token", [""])[-1]
        # Constant-time comparison: a plain != can leak, through response
        # timing, how much of a guessed token is correct.  Comparing bytes
        # also avoids compare_digest's ASCII-only restriction on str.
        tokens_match = hmac.compare_digest(
            supplied_token.encode("utf-8"),
            expected_token.encode("utf-8"),
        )
        if not tokens_match:
            json_response(self, 401, {"error": "Invalid webhook token"})
            return

    try:
        payload = parse_json_body(self)
        result = self.context.dispatcher.receive_alert(payload)
        if result.get("delivery_ids"):
            # New deliveries were queued; nudge the dispatcher to pick them up.
            self.context.wake_dispatcher()
        json_response(self, 202, result)
    except ValidationError as exc:
        json_response(self, 400, {"error": str(exc)})
def list_targets(self) -> list[dict[str, Any]]:
    """Return every ``webhook_targets`` row as a dict, newest id first."""
    query = "SELECT * FROM webhook_targets ORDER BY id DESC"
    with self.context.db.connect() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))
def list_rules(self) -> list[dict[str, Any]]:
    """Return every routing rule as a dict, ordered by priority then newest id.

    The stored ``target_ids`` column is decoded with ``from_json`` (falling
    back to an empty list), so callers receive the decoded value.
    """

    def decode(record: Any) -> dict[str, Any]:
        rule = dict(record)
        rule["target_ids"] = from_json(rule["target_ids"], [])
        return rule

    query = "SELECT * FROM routing_rules ORDER BY priority ASC, id DESC"
    with self.context.db.connect() as conn:
        records = conn.execute(query).fetchall()
    return [decode(record) for record in records]
def list_templates(self) -> list[dict[str, Any]]:
    """Return every ``message_templates`` row as a dict, newest id first."""
    query = "SELECT * FROM message_templates ORDER BY id DESC"
    with self.context.db.connect() as conn:
        records = conn.execute(query).fetchall()
    return list(map(dict, records))
def list_logs(
    self,
    active_tab: str = "alerts",
    page: int = 1,
    page_size: int = LOG_PAGE_SIZE,
    filters: dict[str, str] | None = None,
) -> dict[str, Any]:
    """Return one page of log rows plus filtered totals for both log tables.

    Args:
        active_tab: ``"alerts"`` fetches alert rows; any other value fetches
            delivery rows.  The other list is returned empty.
        page: 1-based page number.
        page_size: Number of rows per page.
        filters: Optional filter values.  Alerts honour ``symbol``
            (upper-cased, exact match), ``strategy`` and ``status``;
            deliveries honour ``status`` and ``target`` (substring match on
            ``target_name``).

    Returns:
        A dict with ``alerts``, ``deliveries``, ``alert_total`` and
        ``delivery_total``.  Both totals are always computed.
    """
    criteria = filters or {}
    skip = (page - 1) * page_size

    def where(clauses: list[str]) -> str:
        return f"WHERE {' AND '.join(clauses)}" if clauses else ""

    symbol = criteria.get("symbol")
    strategy = criteria.get("strategy")
    status = criteria.get("status")
    target = criteria.get("target")

    # (clause, parameter) pairs; pairs whose filter was not supplied are dropped.
    alert_terms = [
        ("symbol = ?", symbol.upper() if symbol else None),
        ("strategy = ?", strategy),
        ("status = ?", status),
    ]
    delivery_terms = [
        ("status = ?", status),
        ("target_name LIKE ?", f"%{target}%" if target else None),
    ]
    alert_terms = [term for term in alert_terms if term[1]]
    delivery_terms = [term for term in delivery_terms if term[1]]

    alert_where_sql = where([clause for clause, _ in alert_terms])
    alert_params: list[Any] = [param for _, param in alert_terms]
    delivery_where_sql = where([clause for clause, _ in delivery_terms])
    delivery_params: list[Any] = [param for _, param in delivery_terms]

    alert_rows: list[Any] = []
    delivery_rows: list[Any] = []
    with self.context.db.connect() as conn:
        alert_total = conn.execute(
            f"SELECT COUNT(*) AS c FROM alerts {alert_where_sql}",
            alert_params,
        ).fetchone()["c"]
        delivery_total = conn.execute(
            f"SELECT COUNT(*) AS c FROM deliveries {delivery_where_sql}",
            delivery_params,
        ).fetchone()["c"]

        # Only the active tab's rows are fetched.
        if active_tab == "alerts":
            alert_rows = conn.execute(
                f"SELECT * FROM alerts {alert_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?",
                (*alert_params, page_size, skip),
            ).fetchall()
        else:
            delivery_rows = conn.execute(
                f"SELECT * FROM deliveries {delivery_where_sql} ORDER BY id DESC LIMIT ? OFFSET ?",
                (*delivery_params, page_size, skip),
            ).fetchall()

    return {
        "alerts": [dict(record) for record in alert_rows],
        "deliveries": [dict(record) for record in delivery_rows],
        "alert_total": alert_total,
        "delivery_total": delivery_total,
    }
def list_recent_logs(self) -> dict[str, list[dict[str, Any]]]:
    """Return the newest 100 alerts and newest 200 deliveries as dicts."""
    with self.context.db.connect() as conn:
        alert_rows = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 100").fetchall()
        delivery_rows = conn.execute("SELECT * FROM deliveries ORDER BY id DESC LIMIT 200").fetchall()
    return {
        "alerts": list(map(dict, alert_rows)),
        "deliveries": list(map(dict, delivery_rows)),
    }
def render_dashboard(self) -> None:
host = self.headers.get("Host", f"localhost:{self.context.settings.port}")
scheme = self.headers.get("X-Forwarded-Proto", "http")
base_url = f"{scheme}://{host}"
webhook_url = f"{base_url}/webhook/tradingview"
token = self.context.settings.webhook_token
webhook_url_with_token = f"{webhook_url}?token={token}" if token else webhook_url
token_block = (
f"""
Webhook Token{html.escape(token)}
Header 方式X-Webhook-Token: {html.escape(token)}
"""
if token
else """
当前未设置 WEBHOOK_TOKEN,任何知道地址的人都可以提交 alert。生产环境建议设置。
"""
)
webhook_panel = f"""
TradingView Webhook 配置
Webhook URL{html.escape(webhook_url_with_token)}
纯 URL{html.escape(webhook_url)}
{token_block}
"""
with self.context.db.connect() as conn:
counts = {
"alerts": conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"],
"rules": conn.execute("SELECT COUNT(*) AS c FROM routing_rules").fetchone()["c"],
"targets": conn.execute("SELECT COUNT(*) AS c FROM webhook_targets").fetchone()["c"],
"pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'pending'").fetchone()["c"],
"processing": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'processing'").fetchone()["c"],
"retry": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'retry'").fetchone()["c"],
"failed": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status = 'failed'").fetchone()["c"],
}
recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall()
cards = "".join(f'
{label}{value}
' for label, value in [
("Alerts", counts["alerts"]),
("Rules", counts["rules"]),
("Targets", counts["targets"]),
("待发送", counts["pending"]),
("发送中", counts["processing"]),
("重试中", counts["retry"]),
("失败", counts["failed"]),
])
rows = "".join(
f"
"""
for target in targets
)
form = """"""
notice = getattr(self, "_target_notice", "")
self.send_html("飞书 Webhook", f"
飞书 Webhook
维护所有可分发的飞书机器人地址。
{notice}{form}
ID
名称
URL
操作
{rows}
")
def render_target_delete(self, parsed: Any) -> None:
    """Render a page for the webhook target named by the ``id`` query parameter.

    Presumably this is the delete-confirmation page (it is routed from
    ``/targets/delete``); the page markup is not visible here, so confirm.

    Args:
        parsed: Parsed request URL; only its ``query`` attribute is read.

    Responds 404 when no ``webhook_targets`` row has the requested id.
    """
    # The last ``id`` value wins when the parameter is repeated; a missing
    # parameter becomes "" and simply matches no row.
    target_id = parse_qs(parsed.query).get("id", [""])[-1]
    with self.context.db.connect() as conn:
        target = conn.execute("SELECT * FROM webhook_targets WHERE id = ?", (target_id,)).fetchone()
    if not target:
        self.send_error(404)
        return
    body = f"""
"""
    # NOTE(review): ``title`` is not assigned anywhere in this method as
    # shown - the lines around the template look truncated; confirm against
    # the full file before relying on this call.
    self.send_html(title, body)
def render_template_new(self) -> None:
    """Render the template form in its "create" configuration.

    Delegates to ``render_template_form`` with the page title and the
    ``/templates/create`` route, and no existing template data.
    """
    page_title = "新增飞书内容模板"
    create_route = "/templates/create"
    self.render_template_form(page_title, create_route)
def render_template_edit(self, parsed: Any) -> None:
    """Render the template form pre-filled with an existing template.

    Args:
        parsed: Parsed request URL; the template is chosen by the last
            ``id`` value in its query string.

    Responds 404 when no ``message_templates`` row has that id.
    """
    params = parse_qs(parsed.query)
    template_id = params.get("id", [""])[-1]

    lookup = "SELECT * FROM message_templates WHERE id = ?"
    with self.context.db.connect() as conn:
        record = conn.execute(lookup, (template_id,)).fetchone()

    if record:
        self.render_template_form("编辑飞书内容模板", "/templates/update", dict(record))
    else:
        self.send_error(404)
def render_template_delete(self, parsed: Any) -> None:
    """Render the confirmation page for deleting a message template.

    Args:
        parsed: Parsed request URL; the template is chosen by the last
            ``id`` value in its query string.

    Responds 404 when no ``message_templates`` row has that id.  Otherwise
    the page shows the template's name (HTML-escaped) together with the
    confirmation text.
    """
    template_id = parse_qs(parsed.query).get("id", [""])[-1]
    with self.context.db.connect() as conn:
        template = conn.execute("SELECT * FROM message_templates WHERE id = ?", (template_id,)).fetchone()
    if not template:
        self.send_error(404)
        return
    # NOTE(review): only text is visible in this template - presumably the
    # surrounding markup and the confirm form are not shown in this view;
    # confirm.
    body = f"""
删除飞书内容模板
请确认是否删除这个模板。
{html.escape(template['name'])}
删除模板不会影响已经创建的路由规则,只是后续不能再套用它。
"""
    self.send_html("删除飞书内容模板", body)
def render_rules(self) -> None:
targets = self.list_targets()
rules = self.list_rules()
target_names = {target["id"]: target["name"] for target in targets}
rows = ""
for rule in rules:
conditions = [
f"周期={html.escape(rule['timeframe'])}" if rule["timeframe"] else "",
f"品种={html.escape(rule['symbol'])}" if rule["symbol"] else "",
f"策略={html.escape(rule['strategy'])}" if rule["strategy"] else "",
]
target_badges = "".join(
f'{html.escape(target_names.get(target_id, f"#{target_id}"))}'
for target_id in rule["target_ids"]
) or "-"
rows += f"""
{rule['id']}
{html.escape(rule['name'])}
{' '.join(item for item in conditions if item) or '-'}