"""Feishu/Lark 告警发送""" from __future__ import annotations import hashlib import logging from datetime import datetime from zoneinfo import ZoneInfo import httpx from app.config import settings from app.data.cache import cache logger = logging.getLogger(__name__) def _build_signature( source: str, message: str, level: str, context: dict | None = None, ) -> str: context = context or {} basis = "|".join([ source, level, message.strip(), str(context.get("method", "")), str(context.get("path", "")), ]) return hashlib.sha1(basis.encode("utf-8")).hexdigest() def _truncate(text: str, limit: int) -> str: text = (text or "").strip() if len(text) <= limit: return text return f"{text[:limit]}..." async def send_feishu_alert( source: str, message: str, detail: str = "", level: str = "error", context: dict | None = None, ) -> bool: """发送 Feishu 告警,内置去重,失败不抛异常。""" if not settings.alert_enabled or not settings.feishu_webhook_url: return False signature = _build_signature(source, message, level, context) dedup_key = f"feishu_alert:{signature}" if cache.get(dedup_key): return False cache.set(dedup_key, True, settings.alert_dedup_ttl_seconds) now = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") context = context or {} detail = _truncate(detail, settings.alert_max_detail_chars) lines = [ f"[{settings.alert_app_name}] {level.upper()}", f"环境: {settings.alert_environment}", f"时间: {now}", f"来源: {source}", f"摘要: {message}", ] if context.get("method") or context.get("path"): lines.append( f"请求: {context.get('method', '')} {context.get('path', '')}".strip() ) if context.get("query"): lines.append(f"Query: {context['query']}") if detail: lines.append(f"详情: {detail}") payload = { "msg_type": "text", "content": { "text": "\n".join(lines), }, } try: async with httpx.AsyncClient(timeout=8, follow_redirects=True) as client: resp = await client.post(settings.feishu_webhook_url, json=payload) resp.raise_for_status() return True except Exception as e: logger.warning("Feishu 告警发送失败: %s", e) return False