From dba001e6247db4d2f4b0bd4c77fb32741747f3b5 Mon Sep 17 00:00:00 2001
From: aaron <>
Date: Fri, 6 Feb 2026 15:52:14 +0800
Subject: [PATCH] update
---
backend/app/config.py | 4 +
backend/app/crypto_agent/crypto_agent.py | 26 +-
backend/app/services/telegram_service.py | 307 +++++++++++++++++++++++
3 files changed, 328 insertions(+), 9 deletions(-)
create mode 100644 backend/app/services/telegram_service.py
diff --git a/backend/app/config.py b/backend/app/config.py
index 13c27b7..d1fb626 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -96,6 +96,10 @@ class Settings(BaseSettings):
# 飞书机器人配置
feishu_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/8a1dcf69-6753-41e2-a393-edc4f7822db0"
+ # Telegram 机器人配置
+ telegram_bot_token: str = "" # 从 @BotFather 获取
+ telegram_channel_id: str = "" # 频道 ID,如 @your_channel 或 -1001234567890
+
# 加密货币交易智能体配置
crypto_symbols: str = "BTCUSDT,ETHUSDT,BNBUSDT,SOLUSDT" # 监控的交易对,逗号分隔
crypto_analysis_interval: int = 60 # 分析间隔(秒)
diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py
index d8ec15e..8ce1b73 100644
--- a/backend/app/crypto_agent/crypto_agent.py
+++ b/backend/app/crypto_agent/crypto_agent.py
@@ -10,6 +10,7 @@ from app.utils.logger import logger
from app.config import get_settings
from app.services.binance_service import binance_service
from app.services.feishu_service import get_feishu_service
+from app.services.telegram_service import get_telegram_service
from app.crypto_agent.signal_analyzer import SignalAnalyzer
from app.crypto_agent.strategy import TrendFollowingStrategy
@@ -22,6 +23,7 @@ class CryptoAgent:
self.settings = get_settings()
self.binance = binance_service
self.feishu = get_feishu_service()
+ self.telegram = get_telegram_service()
self.analyzer = SignalAnalyzer()
self.strategy = TrendFollowingStrategy()
@@ -69,12 +71,13 @@ class CryptoAgent:
logger.info(f" LLM阈值: {self.llm_threshold * 100:.0f}%")
logger.info("=" * 60 + "\n")
- # 发送启动通知
+ # 发送启动通知(飞书 + Telegram)
await self.feishu.send_text(
f"🚀 加密货币智能体已启动\n"
f"监控交易对: {', '.join(self.symbols)}\n"
f"运行时间: 每5分钟整点 (:00, :05, :10, ...)"
)
+ await self.telegram.send_startup_notification(self.symbols)
while self.running:
try:
@@ -149,12 +152,15 @@ class CryptoAgent:
logger.info(f" 方向: {trend_icon} | 强度: {trend_strength} | 阶段: {phase_text}")
- # 3. 检查趋势变化
- last_direction = self.last_trends.get(symbol, {})
- if isinstance(last_direction, dict):
- last_direction = last_direction.get('direction', 'neutral')
- if last_direction and last_direction != trend_direction:
- await self._handle_trend_change(symbol, last_direction, trend_direction, data)
+ # 3. 检查趋势变化(只有之前有记录时才检查,避免启动时误报)
+ if symbol in self.last_trends:
+ last_trend = self.last_trends[symbol]
+ if isinstance(last_trend, dict):
+ last_direction = last_trend.get('direction', 'neutral')
+ else:
+ last_direction = last_trend
+ if last_direction != trend_direction:
+ await self._handle_trend_change(symbol, last_direction, trend_direction, data)
self.last_trends[symbol] = trend
@@ -246,16 +252,17 @@ class CryptoAgent:
else:
signal['llm_analysis'] = llm_result.get('summary', llm_result.get('raw', '')[:200])
- # 8. 发送飞书通知(置信度仍然足够高时)
+ # 8. 发送通知(飞书 + Telegram,置信度仍然足够高时)
if signal['confidence'] >= 50:
await self.feishu.send_trading_signal(signal)
+ await self.telegram.send_trading_signal(signal)
# 9. 更新状态
self.last_signals[symbol] = signal
self.signal_cooldown[symbol] = datetime.now()
action_text = '买入' if signal['action'] == 'buy' else '卖出'
- logger.info(f" ✅ 已发送 {action_text} 信号通知")
+ logger.info(f" ✅ 已发送 {action_text} 信号通知(飞书+Telegram)")
else:
logger.info(f" ⏸️ 置信度不足({signal['confidence']}%),不发送通知")
else:
@@ -329,6 +336,7 @@ class CryptoAgent:
"""处理趋势变化"""
price = float(data['1h'].iloc[-1]['close'])
await self.feishu.send_trend_change(symbol, old_trend, new_trend, price)
+ await self.telegram.send_trend_change(symbol, old_trend, new_trend, price)
logger.info(f"{symbol} 趋势变化: {old_trend} -> {new_trend}")
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
diff --git a/backend/app/services/telegram_service.py b/backend/app/services/telegram_service.py
new file mode 100644
index 0000000..e2c7ecb
--- /dev/null
+++ b/backend/app/services/telegram_service.py
@@ -0,0 +1,307 @@
+"""
+Telegram 通知服务 - 通过 Bot API 发送交易信号到频道
+"""
+import httpx
+from typing import Dict, Any, Optional
+from app.utils.logger import logger
+from app.config import get_settings
+
+
+class TelegramService:
+ """Telegram 机器人通知服务"""
+
+ def __init__(self, bot_token: str = "", channel_id: str = ""):
+ """
+ 初始化 Telegram 服务
+
+ Args:
+ bot_token: Telegram Bot Token (从 @BotFather 获取)
+ channel_id: 频道 ID (如 @your_channel 或 -1001234567890)
+ """
+ settings = get_settings()
+ self.bot_token = bot_token or getattr(settings, 'telegram_bot_token', '')
+ self.channel_id = channel_id or getattr(settings, 'telegram_channel_id', '')
+ self.enabled = bool(self.bot_token and self.channel_id)
+
+ if self.enabled:
+ self.api_base = f"https://api.telegram.org/bot{self.bot_token}"
+ logger.info(f"Telegram 通知服务初始化完成,频道: {self.channel_id}")
+ else:
+ self.api_base = ""
+ logger.warning("Telegram Bot Token 或 Channel ID 未配置,通知功能已禁用")
+
+ async def send_message(self, text: str, parse_mode: str = "HTML") -> bool:
+ """
+ 发送文本消息
+
+ Args:
+ text: 消息内容 (支持 HTML 或 Markdown)
+ parse_mode: 解析模式 ("HTML" 或 "Markdown")
+
+ Returns:
+ 是否发送成功
+ """
+ if not self.enabled:
+ logger.warning("Telegram 服务未启用,跳过发送")
+ return False
+
+ data = {
+ "chat_id": self.channel_id,
+ "text": text,
+ "parse_mode": parse_mode,
+ "disable_web_page_preview": True
+ }
+
+ return await self._send("sendMessage", data)
+
+ async def send_trading_signal(self, signal: Dict[str, Any]) -> bool:
+ """
+ 发送交易信号消息
+
+ Args:
+ signal: 交易信号数据
+
+ Returns:
+ 是否发送成功
+ """
+ if not self.enabled:
+ logger.warning("Telegram 服务未启用,跳过发送")
+ return False
+
+ action = signal.get('action', 'hold')
+ symbol = signal.get('symbol', 'UNKNOWN')
+ price = signal.get('price', 0)
+ trend = signal.get('trend', 'neutral')
+ confidence = signal.get('confidence', 0)
+ signal_type = signal.get('signal_type', 'swing')
+ signal_grade = signal.get('signal_grade', 'D')
+ indicators = signal.get('indicators', {})
+ llm_analysis = signal.get('llm_analysis', '')
+ stop_loss = signal.get('stop_loss', 0)
+ take_profit = signal.get('take_profit', 0)
+ reasons = signal.get('reasons', [])
+
+ # 信号类型
+ type_emoji = "📈" if signal_type == 'short_term' else "📊"
+ type_text = "短线信号" if signal_type == 'short_term' else "波段信号"
+ type_hint = "快进快出,建议轻仓" if signal_type == 'short_term' else "趋势跟踪,可适当持仓"
+
+ # 动作
+ if action == 'buy':
+ action_emoji = "🟢"
+ action_text = "买入 / LONG"
+ elif action == 'sell':
+ action_emoji = "🔴"
+ action_text = "卖出 / SHORT"
+ else:
+ action_emoji = "⚪"
+ action_text = "观望 / WAIT"
+
+ # 趋势
+ trend_map = {
+ 'bullish': '📈 看涨',
+ 'bearish': '📉 看跌',
+ 'neutral': '↔️ 震荡'
+ }
+ trend_text = trend_map.get(trend, '未知')
+
+ # 等级
+ grade_stars = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(signal_grade, '')
+
+ # 构建消息
+ lines = [
+ f"{action_emoji} {action_text} - {symbol}",
+ f"",
+ f"━━━━━━━━━━━━━━━━━━━━",
+ f"",
+ f"{type_emoji} 信号类型: {type_text}",
+ f"💡 操作建议: {type_hint}",
+ f"",
+ f"💰 当前价格: ${price:,.2f}",
+ f"📊 趋势方向: {trend_text}",
+ f"🎯 置信度: {confidence}%",
+ f"⭐ 信号等级: {signal_grade} {grade_stars}",
+ ]
+
+ # 触发原因
+ if reasons:
+ lines.extend([
+ f"",
+ f"━━━━━━━━━━━━━━━━━━━━",
+ f"",
+ f"📋 触发原因:",
+ ])
+ for reason in reasons[:5]:
+ # 清理 emoji 避免重复
+ clean_reason = reason.replace("📊 ", "").replace("📈 ", "").replace("📉 ", "")
+ lines.append(f" • {clean_reason}")
+
+ # 技术指标
+ if indicators:
+ rsi = indicators.get('rsi', 0)
+ macd = indicators.get('macd', 0)
+ macd_signal_val = indicators.get('macd_signal', 0)
+ k = indicators.get('k', 0)
+ d = indicators.get('d', 0)
+
+ rsi_status = "超卖↑" if rsi < 30 else ("超买↓" if rsi > 70 else "中性")
+ macd_status = "金叉" if macd > macd_signal_val else "死叉"
+
+ lines.extend([
+ f"",
+ f"━━━━━━━━━━━━━━━━━━━━",
+ f"",
+ f"📉 技术指标:",
+ f" • RSI(14): {rsi:.1f} ({rsi_status})",
+ f" • MACD: {macd_status}",
+ ])
+ if k > 0:
+ lines.append(f" • KDJ: K={k:.1f}, D={d:.1f}")
+
+ # 止损止盈
+ if stop_loss > 0 or take_profit > 0:
+ lines.extend([
+ f"",
+ f"━━━━━━━━━━━━━━━━━━━━",
+ f"",
+ f"🛡️ 风险管理:",
+ ])
+ if stop_loss > 0:
+ sl_percent = ((stop_loss - price) / price) * 100
+ lines.append(f" • 止损: ${stop_loss:,.2f} ({sl_percent:+.1f}%)")
+ if take_profit > 0:
+ tp_percent = ((take_profit - price) / price) * 100
+ lines.append(f" • 止盈: ${take_profit:,.2f} ({tp_percent:+.1f}%)")
+
+ # AI 分析
+ if llm_analysis:
+ analysis_text = llm_analysis[:150] + "..." if len(llm_analysis) > 150 else llm_analysis
+ lines.extend([
+ f"",
+ f"━━━━━━━━━━━━━━━━━━━━",
+ f"",
+ f"🤖 AI 分析:",
+ f"{analysis_text}",
+ ])
+
+ # 免责声明
+ lines.extend([
+ f"",
+ f"━━━━━━━━━━━━━━━━━━━━",
+ f"",
+ f"⚠️ 仅供参考,不构成投资建议",
+ ])
+
+ message = "\n".join(lines)
+ return await self.send_message(message, parse_mode="HTML")
+
+ async def send_trend_change(self, symbol: str, old_trend: str, new_trend: str, price: float) -> bool:
+ """
+ 发送趋势变化通知
+
+ Args:
+ symbol: 交易对
+ old_trend: 旧趋势
+ new_trend: 新趋势
+ price: 当前价格
+
+ Returns:
+ 是否发送成功
+ """
+ trend_emoji = {
+ 'bullish': '📈',
+ 'bearish': '📉',
+ 'neutral': '↔️'
+ }
+ trend_text = {
+ 'bullish': '看涨',
+ 'bearish': '看跌',
+ 'neutral': '震荡'
+ }
+
+ old_emoji = trend_emoji.get(old_trend, '❓')
+ new_emoji = trend_emoji.get(new_trend, '❓')
+ old_text = trend_text.get(old_trend, old_trend)
+ new_text = trend_text.get(new_trend, new_trend)
+
+ message = f"""🔄 趋势变化 - {symbol}
+
+━━━━━━━━━━━━━━━━━━━━
+
+变化: {old_text} {old_emoji} → {new_text} {new_emoji}
+
+💰 当前价格: ${price:,.2f}
+
+━━━━━━━━━━━━━━━━━━━━
+
+请关注后续交易信号"""
+
+ return await self.send_message(message, parse_mode="HTML")
+
+ async def send_startup_notification(self, symbols: list) -> bool:
+ """
+ 发送启动通知
+
+ Args:
+ symbols: 监控的交易对列表
+
+ Returns:
+ 是否发送成功
+ """
+ message = f"""🚀 加密货币智能体已启动
+
+━━━━━━━━━━━━━━━━━━━━
+
+📊 监控交易对: {', '.join(symbols)}
+⏰ 运行模式: 每5分钟整点执行
+
+━━━━━━━━━━━━━━━━━━━━
+
+开始监控市场信号..."""
+
+ return await self.send_message(message, parse_mode="HTML")
+
+ async def _send(self, method: str, data: Dict[str, Any]) -> bool:
+ """
+ 发送请求到 Telegram API
+
+ Args:
+ method: API 方法名
+ data: 请求数据
+
+ Returns:
+ 是否发送成功
+ """
+ try:
+ url = f"{self.api_base}/{method}"
+ async with httpx.AsyncClient() as client:
+ response = await client.post(
+ url,
+ json=data,
+ timeout=10.0
+ )
+
+ result = response.json()
+
+ if result.get('ok'):
+ logger.info(f"Telegram 消息发送成功")
+ return True
+ else:
+ logger.error(f"Telegram 消息发送失败: {result.get('description', 'Unknown error')}")
+ return False
+
+ except Exception as e:
+ logger.error(f"Telegram 消息发送异常: {e}")
+ return False
+
+
+# 全局实例(延迟初始化)
+_telegram_service: Optional[TelegramService] = None
+
+
+def get_telegram_service() -> TelegramService:
+ """获取 Telegram 服务实例"""
+ global _telegram_service
+ if _telegram_service is None:
+ _telegram_service = TelegramService()
+ return _telegram_service