from __future__ import annotations import json import re import urllib.error import urllib.request from datetime import datetime, timedelta, timezone from typing import Any from app.config import Settings from app.db import Database, from_json, now_iso, to_json UTC = timezone.utc TEMPLATE_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_.-]+)\s*}}|(? dict[str, Any]: normalized = dict(payload) normalized["timeframe"] = str(payload.get("timeframe", "")).strip() normalized["symbol"] = str(payload.get("symbol", "")).strip().upper() normalized["strategy"] = str(payload.get("strategy", "")).strip() if "price" in normalized and normalized["price"] not in (None, ""): try: normalized["price"] = float(normalized["price"]) except (TypeError, ValueError) as exc: raise ValidationError("price must be numeric") from exc return normalized def resolve_template_value(alert: dict[str, Any], field: str) -> str: value: Any = alert for part in field.split("."): if isinstance(value, dict) and part in value: value = value[part] else: return "" if value is None: return "" if isinstance(value, (dict, list)): return json.dumps(value, ensure_ascii=False) return str(value) def render_template(template: str, alert: dict[str, Any]) -> str: return TEMPLATE_PATTERN.sub(lambda match: resolve_template_value(alert, match.group(1) or match.group(2)), template) def default_body(alert: dict[str, Any]) -> str: action = alert.get("action") or alert.get("signal") or "alert" lines = [ f"TradingView 信号: {alert.get('symbol') or '-'}", f"周期: {alert.get('timeframe') or '-'}", f"策略: {alert.get('strategy') or '-'}", f"动作: {action}", ] if alert.get("price") is not None: lines.append(f"价格: {alert['price']}") if alert.get("time"): lines.append(f"时间: {alert['time']}") return "\n".join(lines) def build_feishu_message(alert: dict[str, Any], rule: dict[str, Any] | None = None) -> dict[str, Any]: rule = rule or {} title_template = rule.get("card_title_template") or "TradingView {{symbol}} {{action}}" body_template = rule.get("card_body_template") or default_body(alert) title = render_template(title_template, alert).strip() or f"TradingView {alert.get('symbol') or 'Alert'}" body = render_template(body_template, alert).strip() or default_body(alert) return { "msg_type": "interactive", "card": { "config": {"wide_screen_mode": True}, "header": { "template": "blue", "title": {"tag": "plain_text", "content": title}, }, "elements": [ {"tag": "div", "text": {"tag": "lark_md", "content": body}}, { "tag": "hr", }, { "tag": "note", "elements": [ { "tag": "plain_text", "content": f"{alert.get('symbol') or '-'} · {alert.get('timeframe') or '-'} · {alert.get('strategy') or '-'}", } ], }, ], }, } class Dispatcher: def __init__(self, db: Database, settings: Settings): self.db = db self.settings = settings def find_matching_rule(self, alert: dict[str, Any]) -> dict[str, Any] | None: normalized = normalize_alert(alert) with self.db.connect() as conn: row = conn.execute( """ SELECT * FROM routing_rules WHERE enabled = 1 AND (timeframe = '' OR timeframe = ?) AND (symbol = '' OR upper(symbol) = ?) AND (strategy = '' OR strategy = ?) AND (timeframe <> '' OR symbol <> '' OR strategy <> '') ORDER BY priority ASC, ( CASE WHEN timeframe <> '' THEN 1 ELSE 0 END + CASE WHEN symbol <> '' THEN 1 ELSE 0 END + CASE WHEN strategy <> '' THEN 1 ELSE 0 END ) DESC, id ASC LIMIT 1 """, (normalized["timeframe"], normalized["symbol"], normalized["strategy"]), ).fetchone() return dict(row) if row else None def receive_alert(self, payload: dict[str, Any]) -> dict[str, Any]: alert = normalize_alert(payload) created_at = now_iso() with self.db.connect() as conn: rule = self.find_matching_rule(alert) status = "matched" if rule else "unmatched" cur = conn.execute( """ INSERT INTO alerts ( timeframe, symbol, strategy, action, price, payload, matched_rule_id, status, error, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( alert["timeframe"], alert["symbol"], alert["strategy"], alert.get("action") or alert.get("signal"), alert.get("price"), to_json(alert), rule["id"] if rule else None, status, None if rule else "No enabled routing rule matched this alert.", created_at, ), ) alert_id = int(cur.lastrowid) delivery_ids: list[int] = [] if rule: target_ids = from_json(rule["target_ids"], []) if target_ids: placeholders = ",".join("?" for _ in target_ids) targets = conn.execute( f"SELECT * FROM webhook_targets WHERE id IN ({placeholders})", target_ids, ).fetchall() for target in targets: delivery = conn.execute( """ INSERT INTO deliveries ( alert_id, rule_id, target_id, target_name, webhook_url, status, attempts, next_attempt_at, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?) """, ( alert_id, rule["id"], target["id"], target["name"], target["webhook_url"], created_at, created_at, created_at, ), ) delivery_ids.append(int(delivery.lastrowid)) if rule and not delivery_ids: conn.execute( "UPDATE alerts SET status = ?, error = ? WHERE id = ?", ("unmatched", "Matched rule has no webhook targets.", alert_id), ) self.process_due_deliveries() return { "alert_id": alert_id, "status": status, "matched_rule_id": rule["id"] if rule else None, "delivery_ids": delivery_ids, } def process_due_deliveries(self, limit: int = 25) -> int: now = now_iso() with self.db.connect() as conn: rows = conn.execute( """ SELECT d.*, a.payload, r.card_title_template, r.card_body_template FROM deliveries d JOIN alerts a ON a.id = d.alert_id LEFT JOIN routing_rules r ON r.id = d.rule_id WHERE d.status IN ('pending', 'retry') AND (d.next_attempt_at IS NULL OR d.next_attempt_at <= ?) ORDER BY d.created_at ASC LIMIT ? """, (now, limit), ).fetchall() processed = 0 for row in rows: delivery = dict(row) payload = from_json(delivery["payload"], {}) self._send_delivery(delivery, payload) processed += 1 return processed def _send_delivery(self, delivery: dict[str, Any], alert: dict[str, Any]) -> None: attempts = int(delivery["attempts"]) + 1 message = build_feishu_message(alert, delivery) encoded = json.dumps(message, ensure_ascii=False).encode() request = urllib.request.Request( delivery["webhook_url"], data=encoded, headers={"Content-Type": "application/json"}, method="POST", ) response_code: int | None = None response_body: str | None = None error: str | None = None status = "sent" try: with urllib.request.urlopen(request, timeout=self.settings.feishu_timeout_seconds) as response: response_code = response.getcode() response_body = response.read(2048).decode(errors="replace") if response_code >= 400: status = "failed" error = f"Feishu webhook returned HTTP {response_code}" except urllib.error.HTTPError as exc: response_code = exc.code response_body = exc.read(2048).decode(errors="replace") status = "failed" error = f"Feishu webhook returned HTTP {exc.code}" except Exception as exc: status = "failed" error = str(exc) next_attempt_at = None if status == "failed" and attempts < self.settings.max_delivery_attempts: status = "retry" next_time = datetime.now(UTC) + timedelta(seconds=self.settings.retry_backoff_seconds * attempts) next_attempt_at = next_time.replace(microsecond=0).isoformat() with self.db.connect() as conn: conn.execute( """ UPDATE deliveries SET status = ?, attempts = ?, next_attempt_at = ?, last_attempt_at = ?, response_code = ?, response_body = ?, error = ?, updated_at = ? WHERE id = ? """, ( status, attempts, next_attempt_at, now_iso(), response_code, response_body, error, now_iso(), delivery["id"], ), ) failed_open = conn.execute( """ SELECT COUNT(*) AS count FROM deliveries WHERE alert_id = ? AND status IN ('pending', 'retry', 'failed') """, (delivery["alert_id"],), ).fetchone()["count"] conn.execute( "UPDATE alerts SET status = ? WHERE id = ?", ("delivered" if failed_open == 0 else "partial", delivery["alert_id"]), )