""" Telegram 通知服务 - 通过 Bot API 发送交易信号到频道 """ import httpx from typing import Dict, Any, Optional from app.utils.logger import logger from app.config import get_settings class TelegramService: """Telegram 机器人通知服务""" def __init__(self, bot_token: str = "", channel_id: str = ""): """ 初始化 Telegram 服务 Args: bot_token: Telegram Bot Token (从 @BotFather 获取) channel_id: 频道 ID (如 @your_channel 或 -1001234567890) """ settings = get_settings() self.bot_token = bot_token or getattr(settings, 'telegram_bot_token', '') self.channel_id = channel_id or getattr(settings, 'telegram_channel_id', '') self.enabled = bool(self.bot_token and self.channel_id) if self.enabled: self.api_base = f"https://api.telegram.org/bot{self.bot_token}" logger.info(f"Telegram 通知服务初始化完成,频道: {self.channel_id}") else: self.api_base = "" logger.warning("Telegram Bot Token 或 Channel ID 未配置,通知功能已禁用") async def send_message(self, text: str, parse_mode: str = "HTML") -> bool: """ 发送文本消息 Args: text: 消息内容 (支持 HTML 或 Markdown) parse_mode: 解析模式 ("HTML" 或 "Markdown") Returns: 是否发送成功 """ if not self.enabled: logger.warning("Telegram 服务未启用,跳过发送") return False data = { "chat_id": self.channel_id, "text": text, "parse_mode": parse_mode, "disable_web_page_preview": True } return await self._send("sendMessage", data) async def send_trading_signal(self, signal: Dict[str, Any]) -> bool: """ 发送交易信号消息 Args: signal: 交易信号数据 Returns: 是否发送成功 """ if not self.enabled: logger.warning("Telegram 服务未启用,跳过发送") return False action = signal.get('action', 'hold') symbol = signal.get('symbol', 'UNKNOWN') price = signal.get('price', 0) trend = signal.get('trend', 'neutral') confidence = signal.get('confidence', 0) signal_type = signal.get('signal_type', 'swing') signal_grade = signal.get('signal_grade', 'D') indicators = signal.get('indicators', {}) llm_analysis = signal.get('llm_analysis', '') stop_loss = signal.get('stop_loss', 0) take_profit = signal.get('take_profit', 0) reasons = signal.get('reasons', []) # 信号类型 type_emoji = "📈" if signal_type == 'short_term' else "📊" type_text = "短线信号" if signal_type == 'short_term' else "波段信号" type_hint = "快进快出,建议轻仓" if signal_type == 'short_term' else "趋势跟踪,可适当持仓" # 动作 if action == 'buy': action_emoji = "🟢" action_text = "买入 / LONG" elif action == 'sell': action_emoji = "🔴" action_text = "卖出 / SHORT" else: action_emoji = "⚪" action_text = "观望 / WAIT" # 趋势 trend_map = { 'bullish': '📈 看涨', 'bearish': '📉 看跌', 'neutral': '↔️ 震荡' } trend_text = trend_map.get(trend, '未知') # 等级 grade_stars = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(signal_grade, '') # 构建消息 lines = [ f"{action_emoji} {action_text} - {symbol}", f"", f"━━━━━━━━━━━━━━━━━━━━", f"", f"{type_emoji} 信号类型: {type_text}", f"💡 操作建议: {type_hint}", f"", f"💰 当前价格: ${price:,.2f}", f"📊 趋势方向: {trend_text}", f"🎯 置信度: {confidence}%", f"⭐ 信号等级: {signal_grade} {grade_stars}", ] # 触发原因 if reasons: lines.extend([ f"", f"━━━━━━━━━━━━━━━━━━━━", f"", f"📋 触发原因:", ]) for reason in reasons[:5]: # 清理 emoji 避免重复 clean_reason = reason.replace("📊 ", "").replace("📈 ", "").replace("📉 ", "") lines.append(f" • {clean_reason}") # 技术指标 if indicators: rsi = indicators.get('rsi', 0) macd = indicators.get('macd', 0) macd_signal_val = indicators.get('macd_signal', 0) k = indicators.get('k', 0) d = indicators.get('d', 0) rsi_status = "超卖↑" if rsi < 30 else ("超买↓" if rsi > 70 else "中性") macd_status = "金叉" if macd > macd_signal_val else "死叉" lines.extend([ f"", f"━━━━━━━━━━━━━━━━━━━━", f"", f"📉 技术指标:", f" • RSI(14): {rsi:.1f} ({rsi_status})", f" • MACD: {macd_status}", ]) if k > 0: lines.append(f" • KDJ: K={k:.1f}, D={d:.1f}") # 止损止盈 if stop_loss > 0 or take_profit > 0: lines.extend([ f"", f"━━━━━━━━━━━━━━━━━━━━", f"", f"🛡️ 风险管理:", ]) if stop_loss > 0: sl_percent = ((stop_loss - price) / price) * 100 lines.append(f" • 止损: ${stop_loss:,.2f} ({sl_percent:+.1f}%)") if take_profit > 0: tp_percent = ((take_profit - price) / price) * 100 lines.append(f" • 止盈: ${take_profit:,.2f} ({tp_percent:+.1f}%)") # AI 分析 if llm_analysis: analysis_text = llm_analysis[:150] + "..." if len(llm_analysis) > 150 else llm_analysis lines.extend([ f"", f"━━━━━━━━━━━━━━━━━━━━", f"", f"🤖 AI 分析:", f"{analysis_text}", ]) # 免责声明 lines.extend([ f"", f"━━━━━━━━━━━━━━━━━━━━", f"", f"⚠️ 仅供参考,不构成投资建议", ]) message = "\n".join(lines) return await self.send_message(message, parse_mode="HTML") async def send_trend_change(self, symbol: str, old_trend: str, new_trend: str, price: float) -> bool: """ 发送趋势变化通知 Args: symbol: 交易对 old_trend: 旧趋势 new_trend: 新趋势 price: 当前价格 Returns: 是否发送成功 """ trend_emoji = { 'bullish': '📈', 'bearish': '📉', 'neutral': '↔️' } trend_text = { 'bullish': '看涨', 'bearish': '看跌', 'neutral': '震荡' } old_emoji = trend_emoji.get(old_trend, '❓') new_emoji = trend_emoji.get(new_trend, '❓') old_text = trend_text.get(old_trend, old_trend) new_text = trend_text.get(new_trend, new_trend) message = f"""🔄 趋势变化 - {symbol} ━━━━━━━━━━━━━━━━━━━━ 变化: {old_text} {old_emoji} → {new_text} {new_emoji} 💰 当前价格: ${price:,.2f} ━━━━━━━━━━━━━━━━━━━━ 请关注后续交易信号""" return await self.send_message(message, parse_mode="HTML") async def send_startup_notification(self, symbols: list) -> bool: """ 发送启动通知 Args: symbols: 监控的交易对列表 Returns: 是否发送成功 """ message = f"""🚀 加密货币智能体已启动 ━━━━━━━━━━━━━━━━━━━━ 📊 监控交易对: {', '.join(symbols)} ⏰ 运行模式: 每5分钟整点执行 ━━━━━━━━━━━━━━━━━━━━ 开始监控市场信号...""" return await self.send_message(message, parse_mode="HTML") async def _send(self, method: str, data: Dict[str, Any]) -> bool: """ 发送请求到 Telegram API Args: method: API 方法名 data: 请求数据 Returns: 是否发送成功 """ try: url = f"{self.api_base}/{method}" async with httpx.AsyncClient() as client: response = await client.post( url, json=data, timeout=10.0 ) result = response.json() if result.get('ok'): logger.info(f"Telegram 消息发送成功") return True else: logger.error(f"Telegram 消息发送失败: {result.get('description', 'Unknown error')}") return False except Exception as e: logger.error(f"Telegram 消息发送异常: {e}") return False # 全局实例(延迟初始化) _telegram_service: Optional[TelegramService] = None def get_telegram_service() -> TelegramService: """获取 Telegram 服务实例""" global _telegram_service if _telegram_service is None: _telegram_service = TelegramService() return _telegram_service