alphax/app/integrations/feishu_push.py
2026-05-18 17:02:44 +08:00

66 lines
2.1 KiB
Python

"""Feishu push transport for paper trading only."""
from __future__ import annotations
import os
import requests
from app.config.system_config import notification_config
def _feishu_settings():
cfg = notification_config() or {}
feishu = cfg.get("feishu") or {}
try:
timeout = int(feishu.get("timeout") or 10)
except Exception:
timeout = 10
webhook_env = str(feishu.get("webhook_env") or "ALTCOIN_FEISHU_WEBHOOK")
return {
"enabled": bool(cfg.get("enabled", True)) and bool(feishu.get("enabled", True)),
"webhook_env": webhook_env,
"webhook_url": os.getenv(webhook_env, "").strip(),
"timeout": timeout,
}
def push_card(card_content):
"""Paper trading cards only.
Non-paper-trading cards are rejected by construction so no other channel can
continue using this transport by accident.
"""
if not isinstance(card_content, dict):
return False, {"skipped": True, "reason": "invalid_card"}
metadata = card_content.get("metadata") or {}
if str(metadata.get("source") or "").strip().lower() != "paper_trading":
return False, {"skipped": True, "reason": "paper_trading_only"}
settings = _feishu_settings()
if not settings["enabled"]:
return False, "feishu notification disabled"
if not settings["webhook_url"]:
return False, f"{settings['webhook_env']} not configured"
payload = {"msg_type": "interactive", "card": card_content}
try:
resp = requests.post(settings["webhook_url"], json=payload, timeout=settings["timeout"])
result = resp.json()
ok = resp.status_code == 200 and result.get("StatusCode") == 0
return ok, result
except Exception as exc:
return False, str(exc)
def push_altcoin_tp_sl_alert(*args, **kwargs):
"""Backward-compatible alias for legacy imports.
The transport remains paper-trading only; legacy callers get a rejected
response instead of importing failing during test collection.
"""
return False, {"skipped": True, "reason": "deprecated_alias"}
__all__ = ["push_card", "push_altcoin_tp_sl_alert"]