commit d01c0d49cdd93b1e55008475a63c76138dbe5ff2 Author: aaron <> Date: Thu May 14 21:40:22 2026 +0800 first commit diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..40cc919 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,28 @@ +.git +.gitignore +.dockerignore + +__pycache__/ +*.py[cod] +.pytest_cache/ +.coverage +htmlcov/ + +.venv/ +venv/ +env/ + +data/ +*.log + +.env +.env.* +!.env.example + +.DS_Store +.idea/ +.vscode/ + +dist/ +build/ +*.egg-info/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c634d87 --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +.pytest_cache/ +.coverage +htmlcov/ + +# Virtual environments +.venv/ +venv/ +env/ + +# Local runtime data +data/*.db +data/*.sqlite +data/*.sqlite3 +*.log + +# Local configuration and secrets +.env +.env.* +!.env.example + +# OS and editor files +.DS_Store +.idea/ +.vscode/ + +# Docker / build artifacts +dist/ +build/ +*.egg-info/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..921aab4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.13-slim + +WORKDIR /app + +COPY app ./app +COPY requirements.txt . + +ENV APP_HOST=0.0.0.0 +ENV APP_PORT=8000 +ENV DATABASE_PATH=/data/dispatcher.db + +EXPOSE 8000 + +CMD ["python", "-m", "app"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..6957eee --- /dev/null +++ b/README.md @@ -0,0 +1,78 @@ +# TradingView Alert Dispatcher + +接收 TradingView webhook alert,按 `timeframe + symbol + strategy` 路由到飞书 webhook,并提供管理控制台。 + +## Run Locally + +```bash +python3 -m app +``` + +默认地址:`http://localhost:8000` + +默认登录: + +- 用户名:`admin` +- 密码:`change-me-now` + +首次启动会把 `ADMIN_PASSWORD` 写入数据库并保存为哈希。之后请在管理台的「账号安全」页面修改密码;修改后环境变量不会覆盖数据库中的新密码。 + +## Docker + +```bash +docker compose up --build +``` + +Compose 会启动两个服务:`dispatcher` 负责 Web/API/管理台,`worker` 负责周期性处理失败重试。 + +## TradingView Payload + +```json +{ + "timeframe": "5m", + "symbol": "BTCUSDT", + "strategy": "breakout", + "action": "buy", + "price": 68000 +} +``` + +发送到: + +```text +POST /webhook/tradingview +Content-Type: application/json +``` + +## Feishu Message Templates + +路由规则支持两种消息类型: + +- `Card`:默认,发送飞书 interactive card。 +- `Text`:发送普通文本消息。 + +标题和正文模板支持 `{{field}}` 占位符,字段来自 TradingView alert JSON。嵌套字段可以写成 `{{order.id}}`。 + +每条路由规则通过「发送到」下拉框选择一个飞书 Webhook。需要同一个信号发到多个群时,可以建多条匹配条件相同、目标不同的规则,并用优先级控制命中顺序;当前默认路由逻辑只发送最高优先级命中的规则。 + +示例正文模板: + +```text +**品种**: {{symbol}} +**周期**: {{timeframe}} +**策略**: {{strategy}} +**动作**: {{action}} +**价格**: {{price}} +``` + +## Environment + +- `ADMIN_USERNAME` +- `ADMIN_PASSWORD` +- `SESSION_SECRET` +- `DATABASE_PATH` +- `RETENTION_DAYS` +- `MAX_DELIVERY_ATTEMPTS` +- `RETRY_BACKOFF_SECONDS` +- `FEISHU_TIMEOUT_SECONDS` +- `WORKER_INTERVAL_SECONDS` diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..d78aa11 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1 @@ +"""TradingView alert dispatcher.""" diff --git a/app/__main__.py b/app/__main__.py new file mode 100644 index 0000000..221fbd6 --- /dev/null +++ b/app/__main__.py @@ -0,0 +1,4 @@ +from app.server import run + + +run() diff --git a/app/auth.py b/app/auth.py new file mode 100644 index 0000000..0a82c95 --- /dev/null +++ b/app/auth.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import base64 +import hashlib +import hmac +import os +import time +from http import cookies + +from app.config import Settings + + +COOKIE_NAME = "tv_dispatcher_session" +SESSION_TTL_SECONDS = 60 * 60 * 12 +PASSWORD_ITERATIONS = 200_000 + + +def _sign(secret: str, value: str) -> str: + return hmac.new(secret.encode(), value.encode(), hashlib.sha256).hexdigest() + + +def make_session_cookie(settings: Settings) -> str: + payload = f"{settings.admin_username}:{int(time.time())}" + encoded = base64.urlsafe_b64encode(payload.encode()).decode() + return f"{encoded}.{_sign(settings.session_secret, encoded)}" + + +def is_valid_session(settings: Settings, cookie_header: str | None) -> bool: + if not cookie_header: + return False + jar = cookies.SimpleCookie(cookie_header) + morsel = jar.get(COOKIE_NAME) + if not morsel: + return False + try: + encoded, signature = morsel.value.split(".", 1) + if not hmac.compare_digest(signature, _sign(settings.session_secret, encoded)): + return False + raw = base64.urlsafe_b64decode(encoded.encode()).decode() + username, issued = raw.rsplit(":", 1) + return username == settings.admin_username and time.time() - int(issued) <= SESSION_TTL_SECONDS + except Exception: + return False + + +def hash_password(password: str) -> str: + salt = os.urandom(16) + digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, PASSWORD_ITERATIONS) + return f"pbkdf2_sha256${PASSWORD_ITERATIONS}${base64.b64encode(salt).decode()}${base64.b64encode(digest).decode()}" + + +def verify_password(password: str, password_hash: str) -> bool: + try: + algorithm, iterations, salt, digest = password_hash.split("$", 3) + if algorithm != "pbkdf2_sha256": + return False + expected = hashlib.pbkdf2_hmac( + "sha256", + password.encode(), + base64.b64decode(salt.encode()), + int(iterations), + ) + return hmac.compare_digest(base64.b64encode(expected).decode(), digest) + except Exception: + return False + + +def check_credentials(settings: Settings, username: str, password: str, password_hash: str) -> bool: + return hmac.compare_digest(username, settings.admin_username) and verify_password(password, password_hash) diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..6f9681a --- /dev/null +++ b/app/config.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Settings: + app_name: str = "TradingView Alert Dispatcher" + host: str = "0.0.0.0" + port: int = 8000 + database_path: str = "data/dispatcher.db" + admin_username: str = "admin" + admin_password: str = "change-me-now" + session_secret: str = "change-this-session-secret" + retention_days: int = 30 + max_delivery_attempts: int = 3 + retry_backoff_seconds: int = 60 + feishu_timeout_seconds: int = 10 + + +def get_settings() -> Settings: + return Settings( + host=os.getenv("APP_HOST", "0.0.0.0"), + port=int(os.getenv("APP_PORT", "8000")), + database_path=os.getenv("DATABASE_PATH", "data/dispatcher.db"), + admin_username=os.getenv("ADMIN_USERNAME", "admin"), + admin_password=os.getenv("ADMIN_PASSWORD", "change-me-now"), + session_secret=os.getenv("SESSION_SECRET", "change-this-session-secret"), + retention_days=int(os.getenv("RETENTION_DAYS", "30")), + max_delivery_attempts=int(os.getenv("MAX_DELIVERY_ATTEMPTS", "3")), + retry_backoff_seconds=int(os.getenv("RETRY_BACKOFF_SECONDS", "60")), + feishu_timeout_seconds=int(os.getenv("FEISHU_TIMEOUT_SECONDS", "10")), + ) diff --git a/app/db.py b/app/db.py new file mode 100644 index 0000000..8a5a463 --- /dev/null +++ b/app/db.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +import json +import os +import sqlite3 +from contextlib import contextmanager +from datetime import datetime, timedelta, timezone +from typing import Any, Iterator + +from app.auth import hash_password +from app.config import Settings + + +UTC = timezone.utc + + +def now_iso() -> str: + return datetime.now(UTC).replace(microsecond=0).isoformat() + + +def to_json(value: Any) -> str: + return json.dumps(value, ensure_ascii=False, separators=(",", ":")) + + +def from_json(value: str | None, default: Any = None) -> Any: + if not value: + return default + return json.loads(value) + + +class Database: + def __init__(self, settings: Settings): + self.path = settings.database_path + os.makedirs(os.path.dirname(self.path) or ".", exist_ok=True) + + @contextmanager + def connect(self) -> Iterator[sqlite3.Connection]: + conn = sqlite3.connect(self.path) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA foreign_keys = ON") + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() + + def migrate(self, settings: Settings) -> None: + with self.connect() as conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS admin_settings ( + id INTEGER PRIMARY KEY CHECK (id = 1), + password_hash TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS webhook_targets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + webhook_url TEXT NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS routing_rules ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + timeframe TEXT NOT NULL, + symbol TEXT NOT NULL, + strategy TEXT NOT NULL, + priority INTEGER NOT NULL DEFAULT 100, + enabled INTEGER NOT NULL DEFAULT 1, + target_ids TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_rules_match + ON routing_rules(enabled, timeframe, symbol, strategy, priority); + + CREATE TABLE IF NOT EXISTS alerts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timeframe TEXT NOT NULL, + symbol TEXT NOT NULL, + strategy TEXT NOT NULL, + action TEXT, + price REAL, + payload TEXT NOT NULL, + matched_rule_id INTEGER, + status TEXT NOT NULL, + error TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY(matched_rule_id) REFERENCES routing_rules(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS deliveries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + alert_id INTEGER NOT NULL, + rule_id INTEGER, + target_id INTEGER, + target_name TEXT NOT NULL, + webhook_url TEXT NOT NULL, + status TEXT NOT NULL, + attempts INTEGER NOT NULL DEFAULT 0, + next_attempt_at TEXT, + last_attempt_at TEXT, + response_code INTEGER, + response_body TEXT, + error TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(alert_id) REFERENCES alerts(id) ON DELETE CASCADE, + FOREIGN KEY(rule_id) REFERENCES routing_rules(id) ON DELETE SET NULL, + FOREIGN KEY(target_id) REFERENCES webhook_targets(id) ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS idx_deliveries_retry + ON deliveries(status, next_attempt_at); + """ + ) + existing_columns = { + row["name"] for row in conn.execute("PRAGMA table_info(routing_rules)").fetchall() + } + if "message_type" not in existing_columns: + conn.execute("ALTER TABLE routing_rules ADD COLUMN message_type TEXT NOT NULL DEFAULT 'card'") + if "card_title_template" not in existing_columns: + conn.execute( + "ALTER TABLE routing_rules ADD COLUMN card_title_template TEXT NOT NULL DEFAULT 'TradingView {{symbol}} {{action}}'" + ) + if "card_body_template" not in existing_columns: + conn.execute( + "ALTER TABLE routing_rules ADD COLUMN card_body_template TEXT NOT NULL DEFAULT '{{symbol}} {{timeframe}} {{strategy}} {{action}} @ {{price}}'" + ) + admin = conn.execute("SELECT id FROM admin_settings WHERE id = 1").fetchone() + if not admin: + now = now_iso() + conn.execute( + "INSERT INTO admin_settings (id, password_hash, created_at, updated_at) VALUES (1, ?, ?, ?)", + (hash_password(settings.admin_password), now, now), + ) + + def cleanup_old_logs(self, retention_days: int) -> int: + cutoff = (datetime.now(UTC) - timedelta(days=retention_days)).replace(microsecond=0).isoformat() + with self.connect() as conn: + cur = conn.execute("DELETE FROM alerts WHERE created_at < ?", (cutoff,)) + return cur.rowcount diff --git a/app/dispatcher.py b/app/dispatcher.py new file mode 100644 index 0000000..8c9a048 --- /dev/null +++ b/app/dispatcher.py @@ -0,0 +1,296 @@ +from __future__ import annotations + +import json +import re +import urllib.error +import urllib.request +from datetime import datetime, timedelta, timezone +from typing import Any + +from app.config import Settings +from app.db import Database, from_json, now_iso, to_json + + +UTC = timezone.utc + + +REQUIRED_ALERT_FIELDS = ("timeframe", "symbol", "strategy") +TEMPLATE_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_.-]+)\s*}}|(? dict[str, Any]: + missing = [field for field in REQUIRED_ALERT_FIELDS if not str(payload.get(field, "")).strip()] + if missing: + raise ValidationError(f"Missing required fields: {', '.join(missing)}") + normalized = dict(payload) + normalized["timeframe"] = str(payload["timeframe"]).strip() + normalized["symbol"] = str(payload["symbol"]).strip().upper() + normalized["strategy"] = str(payload["strategy"]).strip() + if "price" in normalized and normalized["price"] not in (None, ""): + try: + normalized["price"] = float(normalized["price"]) + except (TypeError, ValueError) as exc: + raise ValidationError("price must be numeric") from exc + return normalized + + +def resolve_template_value(alert: dict[str, Any], field: str) -> str: + value: Any = alert + for part in field.split("."): + if isinstance(value, dict) and part in value: + value = value[part] + else: + return "" + if value is None: + return "" + if isinstance(value, (dict, list)): + return json.dumps(value, ensure_ascii=False) + return str(value) + + +def render_template(template: str, alert: dict[str, Any]) -> str: + return TEMPLATE_PATTERN.sub(lambda match: resolve_template_value(alert, match.group(1) or match.group(2)), template) + + +def default_body(alert: dict[str, Any]) -> str: + action = alert.get("action") or alert.get("signal") or "alert" + lines = [ + f"TradingView 信号: {alert['symbol']}", + f"周期: {alert['timeframe']}", + f"策略: {alert['strategy']}", + f"动作: {action}", + ] + if alert.get("price") is not None: + lines.append(f"价格: {alert['price']}") + if alert.get("time"): + lines.append(f"时间: {alert['time']}") + return "\n".join(lines) + + +def build_feishu_message(alert: dict[str, Any], rule: dict[str, Any] | None = None) -> dict[str, Any]: + rule = rule or {} + title_template = rule.get("card_title_template") or "TradingView {{symbol}} {{action}}" + body_template = rule.get("card_body_template") or default_body(alert) + title = render_template(title_template, alert).strip() or f"TradingView {alert['symbol']}" + body = render_template(body_template, alert).strip() or default_body(alert) + + if rule.get("message_type") == "text": + return {"msg_type": "text", "content": {"text": f"{title}\n{body}"}} + + return { + "msg_type": "interactive", + "card": { + "config": {"wide_screen_mode": True}, + "header": { + "template": "blue", + "title": {"tag": "plain_text", "content": title}, + }, + "elements": [ + {"tag": "div", "text": {"tag": "lark_md", "content": body}}, + { + "tag": "hr", + }, + { + "tag": "note", + "elements": [ + { + "tag": "plain_text", + "content": f"{alert['symbol']} · {alert['timeframe']} · {alert['strategy']}", + } + ], + }, + ], + }, + } + + +class Dispatcher: + def __init__(self, db: Database, settings: Settings): + self.db = db + self.settings = settings + + def receive_alert(self, payload: dict[str, Any]) -> dict[str, Any]: + alert = normalize_alert(payload) + created_at = now_iso() + with self.db.connect() as conn: + rule = conn.execute( + """ + SELECT * FROM routing_rules + WHERE enabled = 1 + AND timeframe = ? + AND upper(symbol) = ? + AND strategy = ? + ORDER BY priority ASC, id ASC + LIMIT 1 + """, + (alert["timeframe"], alert["symbol"], alert["strategy"]), + ).fetchone() + + status = "matched" if rule else "unmatched" + cur = conn.execute( + """ + INSERT INTO alerts ( + timeframe, symbol, strategy, action, price, payload, + matched_rule_id, status, error, created_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + alert["timeframe"], + alert["symbol"], + alert["strategy"], + alert.get("action") or alert.get("signal"), + alert.get("price"), + to_json(alert), + rule["id"] if rule else None, + status, + None if rule else "No enabled routing rule matched this alert.", + created_at, + ), + ) + alert_id = int(cur.lastrowid) + + delivery_ids: list[int] = [] + if rule: + target_ids = from_json(rule["target_ids"], []) + if target_ids: + placeholders = ",".join("?" for _ in target_ids) + targets = conn.execute( + f"SELECT * FROM webhook_targets WHERE enabled = 1 AND id IN ({placeholders})", + target_ids, + ).fetchall() + for target in targets: + delivery = conn.execute( + """ + INSERT INTO deliveries ( + alert_id, rule_id, target_id, target_name, webhook_url, + status, attempts, next_attempt_at, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?) + """, + ( + alert_id, + rule["id"], + target["id"], + target["name"], + target["webhook_url"], + created_at, + created_at, + created_at, + ), + ) + delivery_ids.append(int(delivery.lastrowid)) + + if rule and not delivery_ids: + conn.execute( + "UPDATE alerts SET status = ?, error = ? WHERE id = ?", + ("unmatched", "Matched rule has no enabled webhook targets.", alert_id), + ) + + self.process_due_deliveries() + return { + "alert_id": alert_id, + "status": status, + "matched_rule_id": rule["id"] if rule else None, + "delivery_ids": delivery_ids, + } + + def process_due_deliveries(self, limit: int = 25) -> int: + now = now_iso() + with self.db.connect() as conn: + rows = conn.execute( + """ + SELECT d.*, a.payload + , r.message_type, r.card_title_template, r.card_body_template + FROM deliveries d + JOIN alerts a ON a.id = d.alert_id + LEFT JOIN routing_rules r ON r.id = d.rule_id + WHERE d.status IN ('pending', 'retry') + AND (d.next_attempt_at IS NULL OR d.next_attempt_at <= ?) + ORDER BY d.created_at ASC + LIMIT ? + """, + (now, limit), + ).fetchall() + + processed = 0 + for row in rows: + delivery = dict(row) + payload = from_json(delivery["payload"], {}) + self._send_delivery(delivery, payload) + processed += 1 + return processed + + def _send_delivery(self, delivery: dict[str, Any], alert: dict[str, Any]) -> None: + attempts = int(delivery["attempts"]) + 1 + message = build_feishu_message(alert, delivery) + encoded = json.dumps(message, ensure_ascii=False).encode() + request = urllib.request.Request( + delivery["webhook_url"], + data=encoded, + headers={"Content-Type": "application/json"}, + method="POST", + ) + response_code: int | None = None + response_body: str | None = None + error: str | None = None + status = "sent" + + try: + with urllib.request.urlopen(request, timeout=self.settings.feishu_timeout_seconds) as response: + response_code = response.getcode() + response_body = response.read(2048).decode(errors="replace") + if response_code >= 400: + status = "failed" + error = f"Feishu webhook returned HTTP {response_code}" + except urllib.error.HTTPError as exc: + response_code = exc.code + response_body = exc.read(2048).decode(errors="replace") + status = "failed" + error = f"Feishu webhook returned HTTP {exc.code}" + except Exception as exc: + status = "failed" + error = str(exc) + + next_attempt_at = None + if status == "failed" and attempts < self.settings.max_delivery_attempts: + status = "retry" + next_time = datetime.now(UTC) + timedelta(seconds=self.settings.retry_backoff_seconds * attempts) + next_attempt_at = next_time.replace(microsecond=0).isoformat() + + with self.db.connect() as conn: + conn.execute( + """ + UPDATE deliveries + SET status = ?, attempts = ?, next_attempt_at = ?, last_attempt_at = ?, + response_code = ?, response_body = ?, error = ?, updated_at = ? + WHERE id = ? + """, + ( + status, + attempts, + next_attempt_at, + now_iso(), + response_code, + response_body, + error, + now_iso(), + delivery["id"], + ), + ) + failed_open = conn.execute( + """ + SELECT COUNT(*) AS count + FROM deliveries + WHERE alert_id = ? AND status IN ('pending', 'retry', 'failed') + """, + (delivery["alert_id"],), + ).fetchone()["count"] + conn.execute( + "UPDATE alerts SET status = ? WHERE id = ?", + ("delivered" if failed_open == 0 else "partial", delivery["alert_id"]), + ) diff --git a/app/server.py b/app/server.py new file mode 100644 index 0000000..81f9936 --- /dev/null +++ b/app/server.py @@ -0,0 +1,587 @@ +from __future__ import annotations + +import html +import json +import mimetypes +import os +from http import HTTPStatus +from http.cookies import SimpleCookie +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any +from urllib.parse import parse_qs, urlparse + +from app.auth import COOKIE_NAME, check_credentials, hash_password, is_valid_session, make_session_cookie +from app.config import Settings, get_settings +from app.db import Database, from_json, now_iso, to_json +from app.dispatcher import Dispatcher, ValidationError + + +class AppContext: + def __init__(self, settings: Settings): + self.settings = settings + self.db = Database(settings) + self.db.migrate(settings) + self.dispatcher = Dispatcher(self.db, settings) + + +def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any] | list[Any]) -> None: + body = json.dumps(payload, ensure_ascii=False).encode() + handler.send_response(status) + handler.send_header("Content-Type", "application/json; charset=utf-8") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +def redirect(handler: BaseHTTPRequestHandler, location: str) -> None: + handler.send_response(HTTPStatus.SEE_OTHER) + handler.send_header("Location", location) + handler.end_headers() + + +def read_body(handler: BaseHTTPRequestHandler) -> bytes: + length = int(handler.headers.get("Content-Length", "0") or "0") + return handler.rfile.read(length) + + +def parse_form(handler: BaseHTTPRequestHandler) -> dict[str, str]: + data = read_body(handler).decode() + return {key: values[-1] for key, values in parse_qs(data).items()} + + +def parse_form_multi(handler: BaseHTTPRequestHandler) -> dict[str, list[str]]: + return parse_qs(read_body(handler).decode()) + + +def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]: + try: + value = json.loads(read_body(handler).decode() or "{}") + except json.JSONDecodeError as exc: + raise ValidationError("Request body must be valid JSON") from exc + if not isinstance(value, dict): + raise ValidationError("Request body must be a JSON object") + return value + + +def target_select_options( + targets: list[dict[str, Any]], + selected_ids: list[int] | None = None, + placeholder: bool = False, +) -> str: + selected_ids = selected_ids or [] + options = [''] if placeholder else [] + for target in targets: + selected = "selected" if target["id"] in selected_ids else "" + disabled = "" if target["enabled"] else "disabled" + suffix = "" if target["enabled"] else " (停用)" + options.append( + f'' + ) + return "".join(options) + + +class Handler(BaseHTTPRequestHandler): + context: AppContext + + def log_message(self, format: str, *args: Any) -> None: + print("%s - - [%s] %s" % (self.address_string(), self.log_date_time_string(), format % args)) + + def do_GET(self) -> None: + parsed = urlparse(self.path) + if parsed.path == "/health": + json_response(self, 200, {"ok": True}) + return + if parsed.path == "/login": + self.render_login() + return + if parsed.path.startswith("/static/"): + self.serve_static(parsed.path) + return + if not self.require_auth(): + return + if parsed.path in ("/", "/dashboard"): + self.render_dashboard() + elif parsed.path == "/targets": + self.render_targets() + elif parsed.path == "/rules": + self.render_rules() + elif parsed.path == "/logs": + self.render_logs() + elif parsed.path == "/test": + self.render_test() + elif parsed.path == "/account": + self.render_account() + elif parsed.path == "/api/targets": + json_response(self, 200, self.list_targets()) + elif parsed.path == "/api/rules": + json_response(self, 200, self.list_rules()) + elif parsed.path == "/api/logs": + json_response(self, 200, self.list_logs()) + else: + self.send_error(404) + + def do_POST(self) -> None: + parsed = urlparse(self.path) + if parsed.path == "/webhook/tradingview": + self.handle_tradingview_webhook() + return + if parsed.path == "/login": + self.handle_login() + return + if not self.require_auth(): + return + routes = { + "/targets/create": self.create_target, + "/targets/update": self.update_target, + "/targets/delete": self.delete_target, + "/rules/create": self.create_rule, + "/rules/update": self.update_rule, + "/rules/delete": self.delete_rule, + "/test/send": self.send_test, + "/account/password": self.change_password, + "/deliveries/retry": self.retry_deliveries, + "/logout": self.logout, + } + handler = routes.get(parsed.path) + if not handler: + self.send_error(404) + return + handler() + + def require_auth(self) -> bool: + if is_valid_session(self.context.settings, self.headers.get("Cookie")): + return True + redirect(self, "/login") + return False + + def layout(self, title: str, body: str) -> bytes: + nav = [ + ("/dashboard", "概览"), + ("/rules", "路由规则"), + ("/targets", "飞书 Webhook"), + ("/logs", "日志"), + ("/test", "测试发送"), + ("/account", "账号安全"), + ] + items = "".join(f'{label}' for href, label in nav) + return f""" + + + + + {html.escape(title)} + + + + + +
{body}
+ +""".encode() + + def send_html(self, title: str, body: str) -> None: + content = self.layout(title, body) + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(content))) + self.end_headers() + self.wfile.write(content) + + def render_login(self) -> None: + content = """ + + + + + Login + + + +
+

TV Dispatch

+

TradingView alert routing console

+ + + +
+ +""".encode() + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(content))) + self.end_headers() + self.wfile.write(content) + + def handle_login(self) -> None: + form = parse_form(self) + if not check_credentials( + self.context.settings, + form.get("username", ""), + form.get("password", ""), + self.get_admin_password_hash(), + ): + redirect(self, "/login") + return + cookie = SimpleCookie() + cookie[COOKIE_NAME] = make_session_cookie(self.context.settings) + cookie[COOKIE_NAME]["path"] = "/" + cookie[COOKIE_NAME]["httponly"] = True + cookie[COOKIE_NAME]["samesite"] = "Lax" + self.send_response(HTTPStatus.SEE_OTHER) + self.send_header("Location", "/dashboard") + self.send_header("Set-Cookie", cookie.output(header="").strip()) + self.end_headers() + + def get_admin_password_hash(self) -> str: + with self.context.db.connect() as conn: + row = conn.execute("SELECT password_hash FROM admin_settings WHERE id = 1").fetchone() + return row["password_hash"] + + def logout(self) -> None: + self.send_response(HTTPStatus.SEE_OTHER) + self.send_header("Location", "/login") + self.send_header("Set-Cookie", f"{COOKIE_NAME}=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax") + self.end_headers() + + def serve_static(self, path: str) -> None: + local_path = os.path.join(os.path.dirname(__file__), "static", os.path.basename(path)) + if not os.path.exists(local_path): + self.send_error(404) + return + with open(local_path, "rb") as file: + content = file.read() + self.send_response(200) + self.send_header("Content-Type", mimetypes.guess_type(local_path)[0] or "application/octet-stream") + self.send_header("Content-Length", str(len(content))) + self.end_headers() + self.wfile.write(content) + + def handle_tradingview_webhook(self) -> None: + try: + payload = parse_json_body(self) + result = self.context.dispatcher.receive_alert(payload) + json_response(self, 202, result) + except ValidationError as exc: + json_response(self, 400, {"error": str(exc)}) + + def list_targets(self) -> list[dict[str, Any]]: + with self.context.db.connect() as conn: + rows = conn.execute("SELECT * FROM webhook_targets ORDER BY id DESC").fetchall() + return [dict(row) for row in rows] + + def list_rules(self) -> list[dict[str, Any]]: + with self.context.db.connect() as conn: + rows = conn.execute("SELECT * FROM routing_rules ORDER BY priority ASC, id DESC").fetchall() + rules = [] + for row in rows: + item = dict(row) + item["target_ids"] = from_json(item["target_ids"], []) + rules.append(item) + return rules + + def list_logs(self) -> dict[str, list[dict[str, Any]]]: + with self.context.db.connect() as conn: + alerts = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 100").fetchall() + deliveries = conn.execute("SELECT * FROM deliveries ORDER BY id DESC LIMIT 200").fetchall() + return {"alerts": [dict(row) for row in alerts], "deliveries": [dict(row) for row in deliveries]} + + def render_dashboard(self) -> None: + with self.context.db.connect() as conn: + counts = { + "alerts": conn.execute("SELECT COUNT(*) AS c FROM alerts").fetchone()["c"], + "rules": conn.execute("SELECT COUNT(*) AS c FROM routing_rules").fetchone()["c"], + "targets": conn.execute("SELECT COUNT(*) AS c FROM webhook_targets").fetchone()["c"], + "pending": conn.execute("SELECT COUNT(*) AS c FROM deliveries WHERE status IN ('pending','retry')").fetchone()["c"], + } + recent = conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 8").fetchall() + cards = "".join(f'
{label}{value}
' for label, value in [ + ("Alerts", counts["alerts"]), + ("Rules", counts["rules"]), + ("Targets", counts["targets"]), + ("Pending", counts["pending"]), + ]) + rows = "".join( + f"{row['id']}{html.escape(row['symbol'])}{html.escape(row['timeframe'])}{html.escape(row['strategy'])}{html.escape(row['status'])}{row['created_at']}" + for row in recent + ) + self.send_html("概览", f"

概览

结构化 alert 分发、飞书转发和重试状态。

{cards}

最近 Alert

{rows}
ID品种周期策略状态时间
") + + def render_targets(self) -> None: + targets = self.list_targets() + rows = "".join( + f""" +{target['id']} + + + +
+
+""" + for target in targets + ) + form = """
+

新增飞书 Webhook

+ + + + +
""" + self.send_html("飞书 Webhook", f"

飞书 Webhook

维护所有可分发的飞书机器人地址。

{form}{rows}
ID名称URL状态操作
") + + def render_rules(self) -> None: + targets = self.list_targets() + rules = self.list_rules() + rows = "" + for rule in rules: + message_type_options = "".join( + f'' + for value, label in [("card", "Card"), ("text", "Text")] + ) + selected_targets = target_select_options(targets, rule["target_ids"], placeholder=True) + rows += f""" +{rule['id']} + + + + + + + + + + +
""" + create_target_options = target_select_options(targets, placeholder=True) + form = f"""
+

新增路由规则

+
+ + + + + +
+
+ + + + +
+ + +
""" + self.send_html("路由规则", f"

路由规则

每条规则选择一个飞书 Webhook。模板支持 TradingView JSON 字段,例如 {{{{symbol}}}}、{{{{timeframe}}}}、{{{{strategy}}}}、{{{{price}}}},嵌套字段可写 {{{{order.id}}}}。

{form}{rows}
ID名称周期品种策略优先级消息标题模板内容模板发送到状态操作
") + + def render_logs(self) -> None: + logs = self.list_logs() + alert_rows = "".join( + f"{row['id']}{html.escape(row['symbol'])}{html.escape(row['timeframe'])}{html.escape(row['strategy'])}{html.escape(row['status'])}{html.escape(row['error'] or '')}{row['created_at']}" + for row in logs["alerts"] + ) + delivery_rows = "".join( + f"{row['id']}{row['alert_id']}{html.escape(row['target_name'])}{html.escape(row['status'])}{row['attempts']}{html.escape(str(row['response_code'] or ''))}{html.escape(row['error'] or '')}{html.escape(row['next_attempt_at'] or '')}" + for row in logs["deliveries"] + ) + body = f"""

日志

最近 100 条 alert 和 200 条分发任务。

+
+

Alert 日志

{alert_rows}
ID品种周期策略状态错误时间
+

Delivery 日志

{delivery_rows}
IDAlert目标状态次数HTTP错误下次重试
""" + self.send_html("日志", body) + + def render_test(self) -> None: + sample = html.escape(json.dumps({"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000}, indent=2)) + result = getattr(self, "_test_result_html", "") + body = f"""

测试发送

提交一条模拟 TradingView alert,走完整匹配和飞书转发流程。

+
+ + +
""" + if result: + body += result + self.send_html("测试发送", body) + + def render_account(self) -> None: + body = """

账号安全

修改当前管理员密码,修改成功后会退出登录。

+
+

修改密码

+ + + + +
""" + self.send_html("账号安全", body) + + def create_target(self) -> None: + form = parse_form(self) + now = now_iso() + with self.context.db.connect() as conn: + conn.execute( + "INSERT INTO webhook_targets (name, webhook_url, enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", + (form["name"].strip(), form["webhook_url"].strip(), 1 if form.get("enabled") == "on" else 0, now, now), + ) + redirect(self, "/targets") + + def update_target(self) -> None: + form = parse_form(self) + with self.context.db.connect() as conn: + conn.execute( + "UPDATE webhook_targets SET name = ?, webhook_url = ?, enabled = ?, updated_at = ? WHERE id = ?", + (form["name"].strip(), form["webhook_url"].strip(), 1 if form.get("enabled") == "on" else 0, now_iso(), form["id"]), + ) + redirect(self, "/targets") + + def delete_target(self) -> None: + form = parse_form(self) + with self.context.db.connect() as conn: + conn.execute("DELETE FROM webhook_targets WHERE id = ?", (form["id"],)) + redirect(self, "/targets") + + def create_rule(self) -> None: + form = parse_form_multi(self) + target_ids = [int(value) for value in form.get("target_ids", [])] + now = now_iso() + with self.context.db.connect() as conn: + conn.execute( + """ + INSERT INTO routing_rules ( + name, timeframe, symbol, strategy, priority, message_type, + card_title_template, card_body_template, enabled, target_ids, + created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + form.get("name", [""])[-1].strip(), + form.get("timeframe", [""])[-1].strip(), + form.get("symbol", [""])[-1].strip().upper(), + form.get("strategy", [""])[-1].strip(), + int(form.get("priority", ["100"])[-1]), + form.get("message_type", ["card"])[-1], + form.get("card_title_template", ["TradingView {{symbol}} {{action}}"])[-1].strip(), + form.get("card_body_template", [""])[-1].strip(), + 1 if form.get("enabled", [""])[-1] == "on" else 0, + to_json(target_ids), + now, + now, + ), + ) + redirect(self, "/rules") + + def delete_rule(self) -> None: + form = parse_form(self) + with self.context.db.connect() as conn: + conn.execute("DELETE FROM routing_rules WHERE id = ?", (form["id"],)) + redirect(self, "/rules") + + def update_rule(self) -> None: + form = parse_form_multi(self) + target_ids = [int(value) for value in form.get("target_ids", [])] + with self.context.db.connect() as conn: + conn.execute( + """ + UPDATE routing_rules + SET name = ?, timeframe = ?, symbol = ?, strategy = ?, priority = ?, + message_type = ?, card_title_template = ?, card_body_template = ?, + enabled = ?, target_ids = ?, updated_at = ? + WHERE id = ? + """, + ( + form.get("name", [""])[-1].strip(), + form.get("timeframe", [""])[-1].strip(), + form.get("symbol", [""])[-1].strip().upper(), + form.get("strategy", [""])[-1].strip(), + int(form.get("priority", ["100"])[-1]), + form.get("message_type", ["card"])[-1], + form.get("card_title_template", ["TradingView {{symbol}} {{action}}"])[-1].strip(), + form.get("card_body_template", [""])[-1].strip(), + 1 if form.get("enabled", [""])[-1] == "on" else 0, + to_json(target_ids), + now_iso(), + form.get("id", [""])[-1], + ), + ) + redirect(self, "/rules") + + def send_test(self) -> None: + form = parse_form(self) + payload_text = form.get("payload", "{}") + try: + payload = json.loads(payload_text) + result = self.context.dispatcher.receive_alert(payload) + delivery_text = ", ".join(str(item) for item in result.get("delivery_ids", [])) or "-" + self._test_result_html = f"""
+

测试结果

+
+
Alert ID{result.get("alert_id")}
+
状态{html.escape(str(result.get("status")))}
+
命中规则{html.escape(str(result.get("matched_rule_id") or "-"))}
+
Delivery{html.escape(delivery_text)}
+
+
查看响应 JSON
{html.escape(json.dumps(result, ensure_ascii=False, indent=2))}
+
""" + self.render_test() + except (json.JSONDecodeError, ValidationError) as exc: + self._test_result_html = f"""
+

测试失败

+

{html.escape(str(exc))}

+
""" + self.render_test() + + def change_password(self) -> None: + form = parse_form(self) + current_password = form.get("current_password", "") + new_password = form.get("new_password", "") + confirm_password = form.get("confirm_password", "") + if not check_credentials( + self.context.settings, + self.context.settings.admin_username, + current_password, + self.get_admin_password_hash(), + ): + json_response(self, 400, {"error": "当前密码不正确"}) + return + if len(new_password) < 8: + json_response(self, 400, {"error": "新密码至少需要 8 位"}) + return + if new_password != confirm_password: + json_response(self, 400, {"error": "两次输入的新密码不一致"}) + return + with self.context.db.connect() as conn: + conn.execute( + "UPDATE admin_settings SET password_hash = ?, updated_at = ? WHERE id = 1", + (hash_password(new_password), now_iso()), + ) + self.logout() + + def retry_deliveries(self) -> None: + self.context.dispatcher.process_due_deliveries(limit=100) + redirect(self, "/logs") + + +def make_handler(context: AppContext) -> type[Handler]: + class BoundHandler(Handler): + pass + + BoundHandler.context = context + return BoundHandler + + +def run() -> None: + settings = get_settings() + context = AppContext(settings) + context.db.cleanup_old_logs(settings.retention_days) + server = ThreadingHTTPServer((settings.host, settings.port), make_handler(context)) + print(f"Serving {settings.app_name} on http://{settings.host}:{settings.port}") + server.serve_forever() + + +if __name__ == "__main__": + run() diff --git a/app/static/app.js b/app/static/app.js new file mode 100644 index 0000000..ca5b27b --- /dev/null +++ b/app/static/app.js @@ -0,0 +1,29 @@ +function updateMessageForm(scope) { + const typeSelect = scope.querySelector("[data-message-type]"); + if (!typeSelect) return; + const isText = typeSelect.value === "text"; + const titleLabel = scope.querySelector("[data-title-label]"); + const bodyLabel = scope.querySelector("[data-body-label]"); + const titleTemplate = scope.querySelector("[data-title-template]"); + const bodyTemplate = scope.querySelector("[data-body-template]"); + + scope.classList.toggle("text-message", isText); + if (titleLabel) titleLabel.textContent = isText ? "文本标题模板" : "卡片标题模板"; + if (bodyLabel) bodyLabel.textContent = isText ? "文本内容模板" : "卡片正文模板"; + if (titleTemplate) { + titleTemplate.placeholder = isText ? "例如:TradingView {{symbol}}" : "例如:TradingView {{symbol}} {{action}}"; + } + if (bodyTemplate) { + bodyTemplate.placeholder = isText ? "{{symbol}} {{timeframe}} {{strategy}} {{action}}" : "**品种**: {{symbol}}"; + } +} + +document.addEventListener("DOMContentLoaded", () => { + document.querySelectorAll("[data-message-form]").forEach((scope) => { + updateMessageForm(scope); + const typeSelect = scope.querySelector("[data-message-type]"); + if (typeSelect) { + typeSelect.addEventListener("change", () => updateMessageForm(scope)); + } + }); +}); diff --git a/app/static/styles.css b/app/static/styles.css new file mode 100644 index 0000000..499ee59 --- /dev/null +++ b/app/static/styles.css @@ -0,0 +1,385 @@ +:root { + --bg: #f5f3ec; + --ink: #1e2528; + --muted: #667071; + --line: #d9d4c8; + --panel: #fffdf8; + --accent: #0f766e; + --accent-strong: #0b534d; + --danger: #b42318; + --shadow: 0 18px 45px rgba(40, 34, 23, 0.12); +} + +* { + box-sizing: border-box; +} + +body { + margin: 0; + background: var(--bg); + color: var(--ink); + font-family: ui-serif, Georgia, "Times New Roman", serif; +} + +body::before { + content: ""; + position: fixed; + inset: 0; + pointer-events: none; + background-image: linear-gradient(rgba(30, 37, 40, 0.035) 1px, transparent 1px), + linear-gradient(90deg, rgba(30, 37, 40, 0.03) 1px, transparent 1px); + background-size: 28px 28px; +} + +.sidebar { + position: fixed; + inset: 0 auto 0 0; + width: 236px; + padding: 28px 20px; + background: #182326; + color: #f7f1e4; + display: flex; + flex-direction: column; + gap: 28px; +} + +.brand { + font-size: 26px; + font-weight: 800; + letter-spacing: 0; +} + +nav { + display: grid; + gap: 8px; +} + +nav a { + color: #dfe8e4; + text-decoration: none; + padding: 11px 12px; + border-radius: 6px; + font-family: ui-sans-serif, system-ui, sans-serif; +} + +nav a:hover { + background: rgba(255, 255, 255, 0.09); +} + +.shell { + position: relative; + margin-left: 236px; + min-height: 100vh; + padding: 36px; +} + +header { + margin-bottom: 24px; +} + +h1, +h2 { + margin: 0 0 8px; + line-height: 1.1; +} + +h1 { + font-size: 38px; +} + +h2 { + font-size: 22px; + margin-top: 26px; +} + +p { + color: var(--muted); + margin: 0; + font-family: ui-sans-serif, system-ui, sans-serif; +} + +.metrics { + display: grid; + grid-template-columns: repeat(4, minmax(0, 1fr)); + gap: 16px; + margin-bottom: 28px; +} + +.metric, +.panel, +table { + background: var(--panel); + border: 1px solid var(--line); + box-shadow: var(--shadow); +} + +.metric { + border-radius: 8px; + padding: 20px; +} + +.metric span { + display: block; + color: var(--muted); + font-family: ui-sans-serif, system-ui, sans-serif; + font-size: 13px; +} + +.metric strong { + display: block; + margin-top: 8px; + font-size: 36px; +} + +.panel { + border-radius: 8px; + padding: 22px; + margin-bottom: 24px; +} + +.grid { + display: grid; + grid-template-columns: repeat(7, minmax(0, 1fr)); + gap: 14px; +} + +label { + display: grid; + gap: 7px; + color: var(--muted); + font: 600 13px ui-sans-serif, system-ui, sans-serif; + margin-bottom: 14px; +} + +input, +select, +textarea { + width: 100%; + border: 1px solid var(--line); + border-radius: 6px; + padding: 11px 12px; + background: #fff; + color: var(--ink); + font: 15px ui-sans-serif, system-ui, sans-serif; +} + +textarea { + font-family: ui-monospace, SFMono-Regular, Menlo, monospace; +} + +td input, +td select, +td textarea { + min-width: 130px; +} + +td textarea { + min-width: 240px; +} + +.narrow { + max-width: 560px; +} + +.field-compact, +.field-target { + width: fit-content; + max-width: 100%; +} + +.select-compact { + width: 140px; +} + +.select-target { + width: clamp(220px, 34vw, 360px); +} + +td .select-compact { + min-width: 110px; +} + +td .select-target { + min-width: 180px; + max-width: 260px; +} + +.text-message [data-title-template] { + background: #f7f3e8; +} + +.check { + display: inline-flex; + align-items: center; + gap: 8px; + margin-right: 16px; +} + +.check input { + width: auto; +} + +.checks { + margin: 8px 0 12px; +} + +button { + border: 0; + border-radius: 6px; + padding: 11px 16px; + background: var(--accent); + color: white; + font-weight: 800; + cursor: pointer; +} + +button:hover { + background: var(--accent-strong); +} + +.ghost { + margin-top: auto; + width: 100%; + background: rgba(255, 255, 255, 0.1); +} + +.danger { + background: var(--danger); +} + +.result-panel { + border-radius: 8px; + border: 1px solid var(--line); + background: var(--panel); + box-shadow: var(--shadow); + padding: 22px; + margin-top: 22px; +} + +.result-panel.success { + border-left: 5px solid var(--accent); +} + +.result-panel.error { + border-left: 5px solid var(--danger); +} + +.result-grid { + display: grid; + grid-template-columns: repeat(4, minmax(0, 1fr)); + gap: 12px; + margin: 16px 0; +} + +.result-grid div { + border: 1px solid var(--line); + border-radius: 6px; + padding: 12px; + background: #fff; +} + +.result-grid span { + display: block; + color: var(--muted); + font: 12px ui-sans-serif, system-ui, sans-serif; +} + +.result-grid strong { + display: block; + margin-top: 5px; + font: 700 18px ui-sans-serif, system-ui, sans-serif; +} + +details { + font-family: ui-sans-serif, system-ui, sans-serif; +} + +pre { + overflow: auto; + border-radius: 6px; + padding: 14px; + background: #182326; + color: #f7f1e4; +} + +.inline { + display: inline; +} + +table { + width: 100%; + border-collapse: collapse; + border-radius: 8px; + overflow: hidden; + font-family: ui-sans-serif, system-ui, sans-serif; + margin-bottom: 28px; +} + +th, +td { + text-align: left; + padding: 12px 13px; + border-bottom: 1px solid var(--line); + vertical-align: top; + font-size: 14px; +} + +th { + background: #ece7da; + color: #3b4445; +} + +.url { + max-width: 460px; + word-break: break-all; +} + +.status { + display: inline-block; + border: 1px solid var(--line); + border-radius: 999px; + padding: 3px 9px; + background: #f7f3e8; + font-size: 12px; +} + +.login-page { + min-height: 100vh; + display: grid; + place-items: center; +} + +.login-card { + width: min(420px, calc(100vw - 32px)); + padding: 30px; + border-radius: 8px; + background: var(--panel); + border: 1px solid var(--line); + box-shadow: var(--shadow); +} + +.login-card h1 { + margin-bottom: 6px; +} + +.login-card p { + margin-bottom: 22px; +} + +@media (max-width: 920px) { + .sidebar { + position: static; + width: auto; + } + + .shell { + margin-left: 0; + padding: 22px; + } + + .metrics, + .grid, + .result-grid { + grid-template-columns: 1fr; + } +} diff --git a/app/worker.py b/app/worker.py new file mode 100644 index 0000000..4aef191 --- /dev/null +++ b/app/worker.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import os +import time + +from app.config import get_settings +from app.db import Database +from app.dispatcher import Dispatcher + + +def run() -> None: + settings = get_settings() + db = Database(settings) + db.migrate(settings) + dispatcher = Dispatcher(db, settings) + interval = int(os.getenv("WORKER_INTERVAL_SECONDS", "15")) + print(f"Retry worker running every {interval}s") + while True: + processed = dispatcher.process_due_deliveries(limit=100) + if processed: + print(f"processed {processed} due deliveries") + time.sleep(interval) + + +if __name__ == "__main__": + run() diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..08ea6d9 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,31 @@ +services: + dispatcher: + build: . + ports: + - "8000:8000" + environment: + ADMIN_USERNAME: admin + ADMIN_PASSWORD: change-me-now + SESSION_SECRET: replace-with-a-long-random-secret + RETENTION_DAYS: 30 + MAX_DELIVERY_ATTEMPTS: 3 + RETRY_BACKOFF_SECONDS: 60 + volumes: + - dispatcher-data:/data + + worker: + build: . + command: ["python", "-m", "app.worker"] + environment: + ADMIN_USERNAME: admin + ADMIN_PASSWORD: change-me-now + SESSION_SECRET: replace-with-a-long-random-secret + RETENTION_DAYS: 30 + MAX_DELIVERY_ATTEMPTS: 3 + RETRY_BACKOFF_SECONDS: 60 + WORKER_INTERVAL_SECONDS: 15 + volumes: + - dispatcher-data:/data + +volumes: + dispatcher-data: diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..edf444b --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +# The runtime intentionally uses only Python standard library modules. diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py new file mode 100644 index 0000000..d0c422b --- /dev/null +++ b/tests/test_dispatcher.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import os +import tempfile +import unittest + +from app.config import Settings +from app.db import Database, now_iso, to_json +from app.dispatcher import Dispatcher, ValidationError, build_feishu_message + + +class DispatcherTest(unittest.TestCase): + def setUp(self) -> None: + self.tmpdir = tempfile.TemporaryDirectory() + self.settings = Settings( + database_path=os.path.join(self.tmpdir.name, "test.db"), + max_delivery_attempts=2, + retry_backoff_seconds=1, + feishu_timeout_seconds=1, + ) + self.db = Database(self.settings) + self.db.migrate(self.settings) + self.dispatcher = Dispatcher(self.db, self.settings) + + def tearDown(self) -> None: + self.tmpdir.cleanup() + + def add_target(self, name: str = "ops", url: str = "http://127.0.0.1:9/hook") -> int: + now = now_iso() + with self.db.connect() as conn: + cur = conn.execute( + "INSERT INTO webhook_targets (name, webhook_url, enabled, created_at, updated_at) VALUES (?, ?, 1, ?, ?)", + (name, url, now, now), + ) + return int(cur.lastrowid) + + def add_rule(self, target_id: int, priority: int = 100, name: str = "rule") -> int: + now = now_iso() + with self.db.connect() as conn: + cur = conn.execute( + """ + INSERT INTO routing_rules + ( + name, timeframe, symbol, strategy, priority, message_type, + card_title_template, card_body_template, enabled, target_ids, + created_at, updated_at + ) + VALUES (?, '5m', 'BTCUSDT', 'breakout', ?, 'card', 'Signal {{symbol}}', 'Price {{price}}', 1, ?, ?, ?) + """, + (name, priority, to_json([target_id]), now, now), + ) + return int(cur.lastrowid) + + def test_missing_required_fields_are_rejected(self) -> None: + with self.assertRaises(ValidationError): + self.dispatcher.receive_alert({"symbol": "BTCUSDT"}) + + def test_unmatched_alert_is_stored(self) -> None: + result = self.dispatcher.receive_alert({"timeframe": "15m", "symbol": "ETHUSDT", "strategy": "trend"}) + self.assertEqual(result["status"], "unmatched") + with self.db.connect() as conn: + alert = conn.execute("SELECT * FROM alerts WHERE id = ?", (result["alert_id"],)).fetchone() + self.assertEqual(alert["status"], "unmatched") + + def test_highest_priority_rule_wins(self) -> None: + slow_target = self.add_target("slow") + fast_target = self.add_target("fast") + slow_rule = self.add_rule(slow_target, priority=100, name="slow") + fast_rule = self.add_rule(fast_target, priority=1, name="fast") + + result = self.dispatcher.receive_alert( + {"timeframe": "5m", "symbol": "btcusdt", "strategy": "breakout", "action": "buy"} + ) + + self.assertEqual(result["matched_rule_id"], fast_rule) + self.assertNotEqual(result["matched_rule_id"], slow_rule) + with self.db.connect() as conn: + delivery = conn.execute("SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone() + self.assertEqual(delivery["target_id"], fast_target) + + def test_failed_delivery_is_marked_for_retry(self) -> None: + target_id = self.add_target() + self.add_rule(target_id) + + result = self.dispatcher.receive_alert( + {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy"} + ) + + with self.db.connect() as conn: + delivery = conn.execute("SELECT * FROM deliveries WHERE alert_id = ?", (result["alert_id"],)).fetchone() + self.assertEqual(delivery["status"], "retry") + self.assertEqual(delivery["attempts"], 1) + self.assertIsNotNone(delivery["error"]) + + def test_card_template_uses_alert_fields(self) -> None: + message = build_feishu_message( + {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000}, + { + "message_type": "card", + "card_title_template": "{{symbol}} {{action}}", + "card_body_template": "**价格** {{price}}", + }, + ) + + self.assertEqual(message["msg_type"], "interactive") + self.assertEqual(message["card"]["header"]["title"]["content"], "BTCUSDT buy") + self.assertEqual(message["card"]["elements"][0]["text"]["content"], "**价格** 68000") + + def test_template_accepts_legacy_single_braces(self) -> None: + message = build_feishu_message( + {"timeframe": "5m", "symbol": "BTCUSDT", "strategy": "breakout", "action": "buy", "price": 68000}, + { + "message_type": "text", + "card_title_template": "TradingView {symbol} {action}", + "card_body_template": "价格 {price}", + }, + ) + + self.assertEqual(message["content"]["text"], "TradingView BTCUSDT buy\n价格 68000") + + +if __name__ == "__main__": + unittest.main()