"""Feishu push transport for paper trading only.""" from __future__ import annotations import os import requests from app.config.system_config import notification_config def _feishu_settings(): cfg = notification_config() or {} feishu = cfg.get("feishu") or {} try: timeout = int(feishu.get("timeout") or 10) except Exception: timeout = 10 webhook_env = str(feishu.get("webhook_env") or "ALTCOIN_FEISHU_WEBHOOK") return { "enabled": bool(cfg.get("enabled", True)) and bool(feishu.get("enabled", True)), "webhook_env": webhook_env, "webhook_url": os.getenv(webhook_env, "").strip(), "timeout": timeout, } def push_card(card_content): """Paper trading cards only. Non-paper-trading cards are rejected by construction so no other channel can continue using this transport by accident. """ if not isinstance(card_content, dict): return False, {"skipped": True, "reason": "invalid_card"} metadata = card_content.get("metadata") or {} if str(metadata.get("source") or "").strip().lower() != "paper_trading": return False, {"skipped": True, "reason": "paper_trading_only"} settings = _feishu_settings() if not settings["enabled"]: return False, "feishu notification disabled" if not settings["webhook_url"]: return False, f"{settings['webhook_env']} not configured" payload = {"msg_type": "interactive", "card": card_content} try: resp = requests.post(settings["webhook_url"], json=payload, timeout=settings["timeout"]) result = resp.json() ok = resp.status_code == 200 and result.get("StatusCode") == 0 return ok, result except Exception as exc: return False, str(exc) __all__ = ["push_card"]