from __future__ import annotations import json import re import socket import urllib.error import urllib.request from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta, timezone from typing import Any from app.config import Settings from app.db import Database, from_json, now_iso, to_json UTC = timezone.utc TEMPLATE_PATTERN = re.compile(r"{{\s*([a-zA-Z0-9_.-]+)\s*}}|(? dict[str, Any]: normalized = dict(payload) normalized["timeframe"] = str(payload.get("timeframe", "")).strip().lower() normalized["symbol"] = str(payload.get("symbol", "")).strip().upper() normalized["strategy"] = str(payload.get("strategy", "")).strip() if "price" in normalized and normalized["price"] not in (None, ""): try: normalized["price"] = float(normalized["price"]) except (TypeError, ValueError) as exc: raise ValidationError("price must be numeric") from exc return normalized def resolve_template_value(alert: dict[str, Any], field: str) -> str: value: Any = alert for part in field.split("."): if isinstance(value, dict) and part in value: value = value[part] else: return "" if value is None: return "" if isinstance(value, (dict, list)): return json.dumps(value, ensure_ascii=False) return str(value) def render_template(template: str, alert: dict[str, Any]) -> str: return TEMPLATE_PATTERN.sub(lambda match: resolve_template_value(alert, match.group(1) or match.group(2)), template) def default_body(alert: dict[str, Any]) -> str: action = alert.get("action") or alert.get("signal") or "alert" lines = [ f"TradingView 信号: {alert.get('symbol') or '-'}", f"周期: {alert.get('timeframe') or '-'}", f"策略: {alert.get('strategy') or '-'}", f"动作: {action}", ] if alert.get("price") is not None: lines.append(f"价格: {alert['price']}") if alert.get("time"): lines.append(f"时间: {alert['time']}") return "\n".join(lines) def build_feishu_message(alert: dict[str, Any], rule: dict[str, Any] | None = None) -> dict[str, Any]: rule = rule or {} title_template = rule.get("card_title_template") or "TradingView {{symbol}} {{action}}" body_template = rule.get("card_body_template") or default_body(alert) title = render_template(title_template, alert).strip() or f"TradingView {alert.get('symbol') or 'Alert'}" body = render_template(body_template, alert).strip() or default_body(alert) return { "msg_type": "interactive", "card": { "config": {"wide_screen_mode": True}, "header": { "template": "blue", "title": {"tag": "plain_text", "content": title}, }, "elements": [ {"tag": "div", "text": {"tag": "lark_md", "content": body}}, { "tag": "hr", }, { "tag": "note", "elements": [ { "tag": "plain_text", "content": f"{alert.get('symbol') or '-'} · {alert.get('timeframe') or '-'} · {alert.get('strategy') or '-'}", } ], }, ], }, } def feishu_response_error(response_body: str | None) -> str | None: if not response_body: return None try: payload = json.loads(response_body) except json.JSONDecodeError: return None if not isinstance(payload, dict): return None code = payload.get("code", payload.get("StatusCode", payload.get("status_code"))) if code in (None, 0, "0"): return None message = ( payload.get("msg") or payload.get("message") or payload.get("StatusMessage") or payload.get("status_msg") or "unknown error" ) return f"feishu_error: code={code}, message={message}" class Dispatcher: def __init__(self, db: Database, settings: Settings): self.db = db self.settings = settings def find_matching_rule(self, alert: dict[str, Any]) -> dict[str, Any] | None: normalized = normalize_alert(alert) with self.db.connect() as conn: row = conn.execute( """ SELECT * FROM routing_rules WHERE enabled = 1 AND (timeframe = '' OR lower(timeframe) = ?) AND (symbol = '' OR upper(symbol) = ?) AND (strategy = '' OR strategy = ?) AND (timeframe <> '' OR symbol <> '' OR strategy <> '') ORDER BY priority ASC, ( CASE WHEN timeframe <> '' THEN 1 ELSE 0 END + CASE WHEN symbol <> '' THEN 1 ELSE 0 END + CASE WHEN strategy <> '' THEN 1 ELSE 0 END ) DESC, id ASC LIMIT 1 """, (normalized["timeframe"], normalized["symbol"], normalized["strategy"]), ).fetchone() return dict(row) if row else None def explain_unmatched_rule(self, alert: dict[str, Any]) -> str: normalized = normalize_alert(alert) with self.db.connect() as conn: candidates = conn.execute( """ SELECT id, name, timeframe, symbol, strategy, enabled, priority FROM routing_rules ORDER BY enabled DESC, priority ASC, id ASC LIMIT 8 """ ).fetchall() if not candidates: return "No routing rules configured." details = [] for row in candidates: mismatches = [] if not row["enabled"]: mismatches.append("rule disabled") if row["timeframe"] and row["timeframe"].lower() != normalized["timeframe"]: mismatches.append(f"timeframe {normalized['timeframe'] or '-'} != {row['timeframe']}") if row["symbol"] and row["symbol"].upper() != normalized["symbol"]: mismatches.append(f"symbol {normalized['symbol'] or '-'} != {row['symbol']}") if row["strategy"] and row["strategy"] != normalized["strategy"]: mismatches.append(f"strategy {normalized['strategy'] or '-'} != {row['strategy']}") reason = "; ".join(mismatches) if mismatches else "matched fields but was not selected" details.append(f"#{row['id']} {row['name']}: {reason}") return "No enabled routing rule matched this alert. Closest rules: " + " | ".join(details) def receive_alert(self, payload: dict[str, Any]) -> dict[str, Any]: alert = normalize_alert(payload) created_at = now_iso() with self.db.connect() as conn: rule = self.find_matching_rule(alert) status = "queued" if rule else "unmatched" error = None if rule else self.explain_unmatched_rule(alert) cur = conn.execute( """ INSERT INTO alerts ( timeframe, symbol, strategy, action, price, payload, matched_rule_id, status, error, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( alert["timeframe"], alert["symbol"], alert["strategy"], alert.get("action") or alert.get("signal"), alert.get("price"), to_json(alert), rule["id"] if rule else None, status, error, created_at, ), ) alert_id = int(cur.lastrowid) delivery_ids: list[int] = [] if rule: target_ids = from_json(rule["target_ids"], []) if target_ids: placeholders = ",".join("?" for _ in target_ids) targets = conn.execute( f"SELECT * FROM webhook_targets WHERE id IN ({placeholders})", target_ids, ).fetchall() for target in targets: delivery = conn.execute( """ INSERT INTO deliveries ( alert_id, rule_id, target_id, target_name, webhook_url, status, attempts, next_attempt_at, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, 'pending', 0, ?, ?, ?) """, ( alert_id, rule["id"], target["id"], target["name"], target["webhook_url"], None, created_at, created_at, ), ) delivery_ids.append(int(delivery.lastrowid)) if rule and not delivery_ids: status = "unmatched" error = "Matched rule has no webhook targets." conn.execute( "UPDATE alerts SET status = ?, error = ? WHERE id = ?", (status, error, alert_id), ) if self.settings.dispatch_inline: self.process_due_deliveries( limit=self.settings.delivery_batch_size, concurrency=self.settings.delivery_concurrency, ) return { "alert_id": alert_id, "status": status, "matched_rule_id": rule["id"] if rule else None, "delivery_ids": delivery_ids, } def process_due_deliveries(self, limit: int = 25, concurrency: int = 1) -> int: now = now_iso() stale_cutoff = (datetime.now(UTC) - timedelta(seconds=max(300, self.settings.retry_backoff_seconds))).replace( microsecond=0 ).isoformat() with self.db.connect() as conn: conn.execute( """ UPDATE deliveries SET status = 'retry', next_attempt_at = ?, updated_at = ?, error = COALESCE(error, 'worker_recovered: stale processing task reset') WHERE status = 'processing' AND updated_at <= ? """, (now, now, stale_cutoff), ) rows = conn.execute( """ SELECT d.*, a.payload, r.card_title_template, r.card_body_template FROM deliveries d JOIN alerts a ON a.id = d.alert_id LEFT JOIN routing_rules r ON r.id = d.rule_id WHERE d.status IN ('pending', 'retry') AND (d.next_attempt_at IS NULL OR d.next_attempt_at <= ?) ORDER BY d.created_at ASC LIMIT ? """, (now, limit), ).fetchall() claimed_rows = [] for row in rows: cur = conn.execute( """ UPDATE deliveries SET status = 'processing', updated_at = ? WHERE id = ? AND status IN ('pending', 'retry') """, (now, row["id"]), ) if cur.rowcount == 1: claimed_rows.append(row) jobs = [(dict(row), from_json(row["payload"], {})) for row in claimed_rows] if not jobs: return 0 worker_count = max(1, min(concurrency, len(jobs))) if worker_count == 1: for delivery, payload in jobs: self._send_delivery(delivery, payload) else: with ThreadPoolExecutor(max_workers=worker_count) as executor: futures = [executor.submit(self._send_delivery, delivery, payload) for delivery, payload in jobs] for future in futures: future.result() return len(jobs) def process_delivery_by_id(self, delivery_id: int) -> bool: now = now_iso() with self.db.connect() as conn: row = conn.execute( """ SELECT d.*, a.payload, r.card_title_template, r.card_body_template FROM deliveries d JOIN alerts a ON a.id = d.alert_id LEFT JOIN routing_rules r ON r.id = d.rule_id WHERE d.id = ? """, (delivery_id,), ).fetchone() if not row or row["status"] == "sent": return False conn.execute( """ UPDATE deliveries SET status = 'processing', next_attempt_at = NULL, updated_at = ? WHERE id = ? """, (now, delivery_id), ) delivery = dict(row) payload = from_json(delivery["payload"], {}) self._send_delivery(delivery, payload) return True def retry_delivery_now(self, delivery_id: int) -> bool: now = now_iso() with self.db.connect() as conn: row = conn.execute( """ SELECT status FROM deliveries WHERE id = ? """, (delivery_id,), ).fetchone() if not row or row["status"] == "sent": return False conn.execute( """ UPDATE deliveries SET status = 'pending', next_attempt_at = ?, updated_at = ? WHERE id = ? """, (now, now, delivery_id), ) return self.process_delivery_by_id(delivery_id) def _send_delivery(self, delivery: dict[str, Any], alert: dict[str, Any]) -> None: attempts = int(delivery["attempts"]) + 1 message = build_feishu_message(alert, delivery) encoded = json.dumps(message, ensure_ascii=False).encode() request = urllib.request.Request( delivery["webhook_url"], data=encoded, headers={"Content-Type": "application/json"}, method="POST", ) response_code: int | None = None response_body: str | None = None error: str | None = None status = "sent" try: with urllib.request.urlopen(request, timeout=self.settings.feishu_timeout_seconds) as response: response_code = response.getcode() response_body = response.read(2048).decode(errors="replace") if response_code >= 400: status = "failed" error = f"Feishu webhook returned HTTP {response_code}" else: error = feishu_response_error(response_body) if error: status = "failed" except urllib.error.HTTPError as exc: response_code = exc.code response_body = exc.read(2048).decode(errors="replace") status = "failed" error = f"http_error: Feishu webhook returned HTTP {exc.code}" except (TimeoutError, socket.timeout) as exc: status = "failed" error = f"timeout: {exc}" except urllib.error.URLError as exc: status = "failed" error = f"network_error: {exc.reason}" except Exception as exc: status = "failed" error = f"send_error: {exc}" next_attempt_at = None if status == "failed" and attempts < self.settings.max_delivery_attempts: status = "retry" next_time = datetime.now(UTC) + timedelta(seconds=self.settings.retry_backoff_seconds * attempts) next_attempt_at = next_time.replace(microsecond=0).isoformat() attempted_at = now_iso() with self.db.connect() as conn: conn.execute( """ INSERT INTO delivery_attempts ( delivery_id, alert_id, attempt_no, status, response_code, response_body, error, attempted_at, next_attempt_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( delivery["id"], delivery["alert_id"], attempts, status, response_code, response_body, error, attempted_at, next_attempt_at, ), ) conn.execute( """ UPDATE deliveries SET status = ?, attempts = ?, next_attempt_at = ?, last_attempt_at = ?, response_code = ?, response_body = ?, error = ?, updated_at = ? WHERE id = ? """, ( status, attempts, next_attempt_at, attempted_at, response_code, response_body, error, attempted_at, delivery["id"], ), ) failed_open = conn.execute( """ SELECT COUNT(*) AS count FROM deliveries WHERE alert_id = ? AND status IN ('pending', 'retry', 'failed') """, (delivery["alert_id"],), ).fetchone()["count"] conn.execute( "UPDATE alerts SET status = ? WHERE id = ?", ("delivered" if failed_open == 0 else "partial", delivery["alert_id"]), )