tradingview-alert-dispatcher/app/dispatcher.py
2026-05-26 21:10:06 +08:00

477 lines
18 KiB
Python

from __future__ import annotations
import json
import re
import socket
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from typing import Any
from app.config import Settings
from app.db import Database, from_json, now_iso, to_json
UTC = timezone.utc
TEMPLATE_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_.-]+)\s*}}|(?<!{){\s*([a-zA-Z0-9_.-]+)\s*}(?!})")
class ValidationError(ValueError):
pass
def normalize_alert(payload: dict[str, Any]) -> dict[str, Any]:
normalized = dict(payload)
normalized["timeframe"] = str(payload.get("timeframe", "")).strip()
normalized["symbol"] = str(payload.get("symbol", "")).strip().upper()
normalized["strategy"] = str(payload.get("strategy", "")).strip()
if "price" in normalized and normalized["price"] not in (None, ""):
try:
normalized["price"] = float(normalized["price"])
except (TypeError, ValueError) as exc:
raise ValidationError("price must be numeric") from exc
return normalized
def resolve_template_value(alert: dict[str, Any], field: str) -> str:
value: Any = alert
for part in field.split("."):
if isinstance(value, dict) and part in value:
value = value[part]
else:
return ""
if value is None:
return ""
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False)
return str(value)
def render_template(template: str, alert: dict[str, Any]) -> str:
return TEMPLATE_PATTERN.sub(lambda match: resolve_template_value(alert, match.group(1) or match.group(2)), template)
def default_body(alert: dict[str, Any]) -> str:
action = alert.get("action") or alert.get("signal") or "alert"
lines = [
f"TradingView 信号: {alert.get('symbol') or '-'}",
f"周期: {alert.get('timeframe') or '-'}",
f"策略: {alert.get('strategy') or '-'}",
f"动作: {action}",
]
if alert.get("price") is not None:
lines.append(f"价格: {alert['price']}")
if alert.get("time"):
lines.append(f"时间: {alert['time']}")
return "\n".join(lines)
def build_feishu_message(alert: dict[str, Any], rule: dict[str, Any] | None = None) -> dict[str, Any]:
rule = rule or {}
title_template = rule.get("card_title_template") or "TradingView {{symbol}} {{action}}"
body_template = rule.get("card_body_template") or default_body(alert)
title = render_template(title_template, alert).strip() or f"TradingView {alert.get('symbol') or 'Alert'}"
body = render_template(body_template, alert).strip() or default_body(alert)
return {
"msg_type": "interactive",
"card": {
"config": {"wide_screen_mode": True},
"header": {
"template": "blue",
"title": {"tag": "plain_text", "content": title},
},
"elements": [
{"tag": "div", "text": {"tag": "lark_md", "content": body}},
{
"tag": "hr",
},
{
"tag": "note",
"elements": [
{
"tag": "plain_text",
"content": f"{alert.get('symbol') or '-'} · {alert.get('timeframe') or '-'} · {alert.get('strategy') or '-'}",
}
],
},
],
},
}
def feishu_response_error(response_body: str | None) -> str | None:
if not response_body:
return None
try:
payload = json.loads(response_body)
except json.JSONDecodeError:
return None
if not isinstance(payload, dict):
return None
code = payload.get("code", payload.get("StatusCode", payload.get("status_code")))
if code in (None, 0, "0"):
return None
message = (
payload.get("msg")
or payload.get("message")
or payload.get("StatusMessage")
or payload.get("status_msg")
or "unknown error"
)
return f"feishu_error: code={code}, message={message}"
class Dispatcher:
def __init__(self, db: Database, settings: Settings):
self.db = db
self.settings = settings
def find_matching_rule(self, alert: dict[str, Any]) -> dict[str, Any] | None:
normalized = normalize_alert(alert)
with self.db.connect() as conn:
row = conn.execute(
"""
SELECT * FROM routing_rules
WHERE enabled = 1
AND (timeframe = '' OR timeframe = ?)
AND (symbol = '' OR upper(symbol) = ?)
AND (strategy = '' OR strategy = ?)
AND (timeframe <> '' OR symbol <> '' OR strategy <> '')
ORDER BY priority ASC,
(
CASE WHEN timeframe <> '' THEN 1 ELSE 0 END +
CASE WHEN symbol <> '' THEN 1 ELSE 0 END +
CASE WHEN strategy <> '' THEN 1 ELSE 0 END
) DESC,
id ASC
LIMIT 1
""",
(normalized["timeframe"], normalized["symbol"], normalized["strategy"]),
).fetchone()
return dict(row) if row else None
def explain_unmatched_rule(self, alert: dict[str, Any]) -> str:
normalized = normalize_alert(alert)
with self.db.connect() as conn:
candidates = conn.execute(
"""
SELECT id, name, timeframe, symbol, strategy, enabled, priority
FROM routing_rules
ORDER BY enabled DESC, priority ASC, id ASC
LIMIT 8
"""
).fetchall()
if not candidates:
return "No routing rules configured."
details = []
for row in candidates:
mismatches = []
if not row["enabled"]:
mismatches.append("rule disabled")
if row["timeframe"] and row["timeframe"] != normalized["timeframe"]:
mismatches.append(f"timeframe {normalized['timeframe'] or '-'} != {row['timeframe']}")
if row["symbol"] and row["symbol"].upper() != normalized["symbol"]:
mismatches.append(f"symbol {normalized['symbol'] or '-'} != {row['symbol']}")
if row["strategy"] and row["strategy"] != normalized["strategy"]:
mismatches.append(f"strategy {normalized['strategy'] or '-'} != {row['strategy']}")
reason = "; ".join(mismatches) if mismatches else "matched fields but was not selected"
details.append(f"#{row['id']} {row['name']}: {reason}")
return "No enabled routing rule matched this alert. Closest rules: " + " | ".join(details)
def receive_alert(self, payload: dict[str, Any]) -> dict[str, Any]:
alert = normalize_alert(payload)
created_at = now_iso()
with self.db.connect() as conn:
rule = self.find_matching_rule(alert)
status = "queued" if rule else "unmatched"
error = None if rule else self.explain_unmatched_rule(alert)
cur = conn.execute(
"""
INSERT INTO alerts (
timeframe, symbol, strategy, action, price, payload,
matched_rule_id, status, error, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
alert["timeframe"],
alert["symbol"],
alert["strategy"],
alert.get("action") or alert.get("signal"),
alert.get("price"),
to_json(alert),
rule["id"] if rule else None,
status,
error,
created_at,
),
)
alert_id = int(cur.lastrowid)
delivery_ids: list[int] = []
if rule:
target_ids = from_json(rule["target_ids"], [])
if target_ids:
placeholders = ",".join("?" for _ in target_ids)
targets = conn.execute(
f"SELECT * FROM webhook_targets WHERE id IN ({placeholders})",
target_ids,
).fetchall()
for target in targets:
delivery = conn.execute(
"""
INSERT INTO deliveries (
alert_id, rule_id, target_id, target_name, webhook_url,
status, attempts, next_attempt_at, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?)
""",
(
alert_id,
rule["id"],
target["id"],
target["name"],
target["webhook_url"],
None,
created_at,
created_at,
),
)
delivery_ids.append(int(delivery.lastrowid))
if rule and not delivery_ids:
status = "unmatched"
error = "Matched rule has no webhook targets."
conn.execute(
"UPDATE alerts SET status = ?, error = ? WHERE id = ?",
(status, error, alert_id),
)
if self.settings.dispatch_inline:
self.process_due_deliveries(
limit=self.settings.delivery_batch_size,
concurrency=self.settings.delivery_concurrency,
)
return {
"alert_id": alert_id,
"status": status,
"matched_rule_id": rule["id"] if rule else None,
"delivery_ids": delivery_ids,
}
def process_due_deliveries(self, limit: int = 25, concurrency: int = 1) -> int:
now = now_iso()
stale_cutoff = (datetime.now(UTC) - timedelta(seconds=max(300, self.settings.retry_backoff_seconds))).replace(
microsecond=0
).isoformat()
with self.db.connect() as conn:
conn.execute(
"""
UPDATE deliveries
SET status = 'retry', next_attempt_at = ?, updated_at = ?,
error = COALESCE(error, 'worker_recovered: stale processing task reset')
WHERE status = 'processing' AND updated_at <= ?
""",
(now, now, stale_cutoff),
)
rows = conn.execute(
"""
SELECT d.*, a.payload, r.card_title_template, r.card_body_template
FROM deliveries d
JOIN alerts a ON a.id = d.alert_id
LEFT JOIN routing_rules r ON r.id = d.rule_id
WHERE d.status IN ('pending', 'retry')
AND (d.next_attempt_at IS NULL OR d.next_attempt_at <= ?)
ORDER BY d.created_at ASC
LIMIT ?
""",
(now, limit),
).fetchall()
claimed_rows = []
for row in rows:
cur = conn.execute(
"""
UPDATE deliveries
SET status = 'processing', updated_at = ?
WHERE id = ?
AND status IN ('pending', 'retry')
""",
(now, row["id"]),
)
if cur.rowcount == 1:
claimed_rows.append(row)
jobs = [(dict(row), from_json(row["payload"], {})) for row in claimed_rows]
if not jobs:
return 0
worker_count = max(1, min(concurrency, len(jobs)))
if worker_count == 1:
for delivery, payload in jobs:
self._send_delivery(delivery, payload)
else:
with ThreadPoolExecutor(max_workers=worker_count) as executor:
futures = [executor.submit(self._send_delivery, delivery, payload) for delivery, payload in jobs]
for future in futures:
future.result()
return len(jobs)
def process_delivery_by_id(self, delivery_id: int) -> bool:
now = now_iso()
with self.db.connect() as conn:
row = conn.execute(
"""
SELECT d.*, a.payload, r.card_title_template, r.card_body_template
FROM deliveries d
JOIN alerts a ON a.id = d.alert_id
LEFT JOIN routing_rules r ON r.id = d.rule_id
WHERE d.id = ?
""",
(delivery_id,),
).fetchone()
if not row or row["status"] == "sent":
return False
conn.execute(
"""
UPDATE deliveries
SET status = 'processing', next_attempt_at = NULL, updated_at = ?
WHERE id = ?
""",
(now, delivery_id),
)
delivery = dict(row)
payload = from_json(delivery["payload"], {})
self._send_delivery(delivery, payload)
return True
def retry_delivery_now(self, delivery_id: int) -> bool:
now = now_iso()
with self.db.connect() as conn:
row = conn.execute(
"""
SELECT status
FROM deliveries
WHERE id = ?
""",
(delivery_id,),
).fetchone()
if not row or row["status"] == "sent":
return False
conn.execute(
"""
UPDATE deliveries
SET status = 'pending', next_attempt_at = ?, updated_at = ?
WHERE id = ?
""",
(now, now, delivery_id),
)
return self.process_delivery_by_id(delivery_id)
def _send_delivery(self, delivery: dict[str, Any], alert: dict[str, Any]) -> None:
attempts = int(delivery["attempts"]) + 1
message = build_feishu_message(alert, delivery)
encoded = json.dumps(message, ensure_ascii=False).encode()
request = urllib.request.Request(
delivery["webhook_url"],
data=encoded,
headers={"Content-Type": "application/json"},
method="POST",
)
response_code: int | None = None
response_body: str | None = None
error: str | None = None
status = "sent"
try:
with urllib.request.urlopen(request, timeout=self.settings.feishu_timeout_seconds) as response:
response_code = response.getcode()
response_body = response.read(2048).decode(errors="replace")
if response_code >= 400:
status = "failed"
error = f"Feishu webhook returned HTTP {response_code}"
else:
error = feishu_response_error(response_body)
if error:
status = "failed"
except urllib.error.HTTPError as exc:
response_code = exc.code
response_body = exc.read(2048).decode(errors="replace")
status = "failed"
error = f"http_error: Feishu webhook returned HTTP {exc.code}"
except (TimeoutError, socket.timeout) as exc:
status = "failed"
error = f"timeout: {exc}"
except urllib.error.URLError as exc:
status = "failed"
error = f"network_error: {exc.reason}"
except Exception as exc:
status = "failed"
error = f"send_error: {exc}"
next_attempt_at = None
if status == "failed" and attempts < self.settings.max_delivery_attempts:
status = "retry"
next_time = datetime.now(UTC) + timedelta(seconds=self.settings.retry_backoff_seconds * attempts)
next_attempt_at = next_time.replace(microsecond=0).isoformat()
attempted_at = now_iso()
with self.db.connect() as conn:
conn.execute(
"""
INSERT INTO delivery_attempts (
delivery_id, alert_id, attempt_no, status, response_code,
response_body, error, attempted_at, next_attempt_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
delivery["id"],
delivery["alert_id"],
attempts,
status,
response_code,
response_body,
error,
attempted_at,
next_attempt_at,
),
)
conn.execute(
"""
UPDATE deliveries
SET status = ?, attempts = ?, next_attempt_at = ?, last_attempt_at = ?,
response_code = ?, response_body = ?, error = ?, updated_at = ?
WHERE id = ?
""",
(
status,
attempts,
next_attempt_at,
attempted_at,
response_code,
response_body,
error,
attempted_at,
delivery["id"],
),
)
failed_open = conn.execute(
"""
SELECT COUNT(*) AS count
FROM deliveries
WHERE alert_id = ? AND status IN ('pending', 'retry', 'failed')
""",
(delivery["alert_id"],),
).fetchone()["count"]
conn.execute(
"UPDATE alerts SET status = ? WHERE id = ?",
("delivered" if failed_open == 0 else "partial", delivery["alert_id"]),
)