""" 飞书通知服务 - 通过 Webhook 发送交易信号通知 """ import json import httpx from typing import Dict, Any, Optional from app.utils.logger import logger from app.config import get_settings class FeishuService: """飞书机器人通知服务""" def __init__(self, webhook_url: str = ""): """ 初始化飞书服务 Args: webhook_url: 飞书机器人 Webhook URL """ settings = get_settings() self.webhook_url = webhook_url or getattr(settings, 'feishu_webhook_url', '') self.enabled = bool(self.webhook_url) if self.enabled: logger.info("飞书通知服务初始化完成") else: logger.warning("飞书 Webhook URL 未配置,通知功能已禁用") async def send_text(self, message: str) -> bool: """ 发送文本消息 Args: message: 消息内容 Returns: 是否发送成功 """ if not self.enabled: logger.warning("飞书服务未启用,跳过发送") return False data = { "msg_type": "text", "content": { "text": message } } return await self._send(data) async def send_card(self, title: str, content: str, color: str = "blue") -> bool: """ 发送卡片消息 Args: title: 卡片标题 content: 卡片内容(支持 Markdown) color: 标题颜色 (blue, green, red, orange, purple) Returns: 是否发送成功 """ if not self.enabled: logger.warning("飞书服务未启用,跳过发送") return False # 颜色映射 color_map = { "blue": "blue", "green": "green", "red": "red", "orange": "orange", "purple": "purple" } data = { "msg_type": "interactive", "card": { "header": { "title": { "tag": "plain_text", "content": title }, "template": color_map.get(color, "blue") }, "elements": [ { "tag": "markdown", "content": content } ] } } return await self._send(data) async def send_trading_signal(self, signal: Dict[str, Any]) -> bool: """ 发送交易信号卡片 Args: signal: 交易信号数据 - symbol: 交易对 - action: 'buy' | 'sell' - price: 当前价格 - trend: 趋势方向 - confidence: 信号强度 (0-100) - indicators: 技术指标数据 - llm_analysis: LLM 分析结果(可选) - stop_loss: 建议止损价 - take_profit: 建议止盈价 Returns: 是否发送成功 """ if not self.enabled: logger.warning("飞书服务未启用,跳过发送") return False action = signal.get('action', 'hold') symbol = signal.get('symbol', 'UNKNOWN') price = signal.get('price', 0) trend = signal.get('trend', 'neutral') confidence = signal.get('confidence', 0) indicators = signal.get('indicators', {}) llm_analysis = signal.get('llm_analysis', '') stop_loss = signal.get('stop_loss', 0) take_profit = signal.get('take_profit', 0) # 确定标题和颜色 if action == 'buy': title = f"🟢 买入信号 - {symbol}" color = "green" action_text = "做多" elif action == 'sell': title = f"🔴 卖出信号 - {symbol}" color = "red" action_text = "做空" else: title = f"⚪ 观望 - {symbol}" color = "blue" action_text = "观望" # 趋势文本 trend_text = { 'bullish': '看涨 📈', 'bearish': '看跌 📉', 'neutral': '震荡 ↔️' }.get(trend, '未知') # 构建内容 content_parts = [ f"**当前价格**: ${price:,.2f}", f"**趋势方向**: {trend_text}", f"**信号强度**: {confidence}%", "", "---", "", "**技术指标**:" ] # 添加技术指标 if indicators: rsi = indicators.get('rsi', 0) macd = indicators.get('macd', 0) macd_signal = indicators.get('macd_signal', 0) rsi_status = "超卖 ↑" if rsi < 30 else ("超买 ↓" if rsi > 70 else "中性") macd_status = "金叉" if macd > macd_signal else "死叉" content_parts.extend([ f"• RSI(14): {rsi:.1f} ({rsi_status})", f"• MACD: {macd_status}", ]) if 'k' in indicators: content_parts.append(f"• KDJ: K={indicators['k']:.1f}, D={indicators['d']:.1f}") # 添加 LLM 分析 if llm_analysis: content_parts.extend([ "", "---", "", "**AI 分析**:", llm_analysis[:200] + "..." if len(llm_analysis) > 200 else llm_analysis ]) # 添加止损止盈建议 if stop_loss > 0 or take_profit > 0: content_parts.extend([ "", "---", "", "**风险管理**:" ]) if stop_loss > 0: sl_percent = ((stop_loss - price) / price) * 100 content_parts.append(f"• 建议止损: ${stop_loss:,.2f} ({sl_percent:+.1f}%)") if take_profit > 0: tp_percent = ((take_profit - price) / price) * 100 content_parts.append(f"• 建议止盈: ${take_profit:,.2f} ({tp_percent:+.1f}%)") # 添加免责声明 content_parts.extend([ "", "---", "", "*⚠️ 仅供参考,不构成投资建议*" ]) content = "\n".join(content_parts) return await self.send_card(title, content, color) async def send_trend_change(self, symbol: str, old_trend: str, new_trend: str, price: float) -> bool: """ 发送趋势变化通知 Args: symbol: 交易对 old_trend: 旧趋势 new_trend: 新趋势 price: 当前价格 Returns: 是否发送成功 """ trend_emoji = { 'bullish': '📈', 'bearish': '📉', 'neutral': '↔️' } trend_text = { 'bullish': '看涨', 'bearish': '看跌', 'neutral': '震荡' } title = f"🔄 趋势变化 - {symbol}" content = f"""**{symbol}** 趋势发生变化 **变化**: {trend_text.get(old_trend, old_trend)} {trend_emoji.get(old_trend, '')} → {trend_text.get(new_trend, new_trend)} {trend_emoji.get(new_trend, '')} **当前价格**: ${price:,.2f} *请关注后续交易信号*""" return await self.send_card(title, content, "orange") async def _send(self, data: Dict[str, Any]) -> bool: """ 发送消息到飞书 Args: data: 消息数据 Returns: 是否发送成功 """ try: async with httpx.AsyncClient() as client: response = await client.post( self.webhook_url, json=data, headers={"Content-Type": "application/json"}, timeout=10.0 ) result = response.json() if result.get('code') == 0 or result.get('StatusCode') == 0: logger.info("飞书消息发送成功") return True else: logger.error(f"飞书消息发送失败: {result}") return False except Exception as e: logger.error(f"飞书消息发送异常: {e}") return False # 全局实例(延迟初始化) _feishu_service: Optional[FeishuService] = None def get_feishu_service() -> FeishuService: """获取飞书服务实例""" global _feishu_service if _feishu_service is None: _feishu_service = FeishuService() return _feishu_service