"""Feishu push transport for paper trading only.""" from __future__ import annotations import os from copy import deepcopy import requests from app.config.system_config import notification_config def _feishu_settings(): cfg = notification_config() or {} feishu = cfg.get("feishu") or {} try: timeout = int(feishu.get("timeout") or 10) except Exception: timeout = 10 webhook_env = str(feishu.get("webhook_env") or "ALTCOIN_FEISHU_WEBHOOK") return { "enabled": bool(cfg.get("enabled", True)) and bool(feishu.get("enabled", True)), "webhook_env": webhook_env, "webhook_url": os.getenv(webhook_env, "").strip(), "timeout": timeout, } def _runtime_env() -> str: return str(os.getenv("ALPHAX_ENV") or "dev").strip().lower() or "dev" def _env_label(env: str) -> str: labels = { "dev": "DEV", "development": "DEV", "local": "DEV", "test": "TEST", "testing": "TEST", "stage": "STAGING", "staging": "STAGING", } return labels.get(env, env.upper()) def _mark_card_environment(card_content): env = _runtime_env() card = deepcopy(card_content) metadata = dict(card.get("metadata") or {}) metadata["alphax_env"] = env card["metadata"] = metadata if env in ("prod", "production"): return card label = _env_label(env) header = card.setdefault("header", {}) title = header.setdefault("title", {"tag": "plain_text", "content": ""}) if isinstance(title, dict): content = str(title.get("content") or "") prefix = f"[{label}] " if not content.startswith(prefix): title["content"] = prefix + content return card def push_card(card_content): """Paper trading cards only. Non-paper-trading cards are rejected by construction so no other channel can continue using this transport by accident. """ if not isinstance(card_content, dict): return False, {"skipped": True, "reason": "invalid_card"} metadata = card_content.get("metadata") or {} if str(metadata.get("source") or "").strip().lower() != "paper_trading": return False, {"skipped": True, "reason": "paper_trading_only"} card_content = _mark_card_environment(card_content) settings = _feishu_settings() if not settings["enabled"]: return False, "feishu notification disabled" if not settings["webhook_url"]: return False, f"{settings['webhook_env']} not configured" payload = {"msg_type": "interactive", "card": card_content} try: resp = requests.post(settings["webhook_url"], json=payload, timeout=settings["timeout"]) result = resp.json() ok = resp.status_code == 200 and result.get("StatusCode") == 0 return ok, result except Exception as exc: return False, str(exc) __all__ = ["push_card"]