import json import requests import time import hmac import hashlib import base64 import urllib.parse import traceback from typing import Dict, Any, List, Optional, Union class DingTalkBot: """钉钉机器人工具类,用于发送分析结果到钉钉群""" def __init__(self, webhook_url: str, secret: Optional[str] = None): """ 初始化钉钉机器人 Args: webhook_url: 钉钉机器人的webhook地址 secret: 钉钉机器人的签名密钥(如果启用了安全设置) """ self.webhook_url = webhook_url self.secret = secret def _sign(self) -> str: """ 生成钉钉机器人签名 Returns: 签名后的URL """ if not self.secret: return self.webhook_url timestamp = str(round(time.time() * 1000)) string_to_sign = f"{timestamp}\n{self.secret}" hmac_code = hmac.new( self.secret.encode(), string_to_sign.encode(), digestmod=hashlib.sha256 ).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return f"{self.webhook_url}×tamp={timestamp}&sign={sign}" def send_text(self, content: str, at_mobiles: List[str] = None, at_all: bool = False) -> Dict[str, Any]: """ 发送文本消息 Args: content: 消息内容 at_mobiles: 要@的手机号列表 at_all: 是否@所有人 Returns: 接口返回结果 """ data = { "msgtype": "text", "text": {"content": content}, "at": { "atMobiles": at_mobiles if at_mobiles else [], "isAtAll": at_all } } return self._post(data) def send_markdown(self, title: str, text: str, at_mobiles: List[str] = None, at_all: bool = False) -> Dict[str, Any]: """ 发送Markdown消息 Args: title: 消息标题 text: Markdown格式的消息内容 at_mobiles: 要@的手机号列表 at_all: 是否@所有人 Returns: 接口返回结果 """ data = { "msgtype": "markdown", "markdown": { "title": title, "text": text }, "at": { "atMobiles": at_mobiles if at_mobiles else [], "isAtAll": at_all } } return self._post(data) def _post(self, data: Dict[str, Any]) -> Dict[str, Any]: """ 发送消息到钉钉 Args: data: 消息数据 Returns: 接口返回结果 """ try: webhook = self._sign() headers = {'Content-Type': 'application/json; charset=utf-8'} response = requests.post(webhook, json=data, headers=headers) return response.json() except Exception as e: print(f"发送钉钉消息时出错: {e}") traceback.print_exc() return {"errcode": -1, "errmsg": str(e)} def _format_complex_content(self, content) -> str: """ 格式化复杂内容(JSON对象或列表)为易读的文本 Args: content: 需要格式化的内容 Returns: 格式化后的文本 """ if isinstance(content, dict): # 将字典转换为项目列表 formatted_text = "" for key, value in content.items(): key_display = key.replace('_', ' ').title() if isinstance(value, (dict, list)): formatted_text += f"- **{key_display}**: \n" # 递归处理嵌套结构 nested_text = self._format_complex_content(value) # 增加缩进 nested_text = '\n'.join([f" {line}" for line in nested_text.split('\n')]) formatted_text += f"{nested_text}\n" else: formatted_text += f"- **{key_display}**: {value}\n" return formatted_text elif isinstance(content, list): # 将列表转换为项目列表 formatted_text = "" for item in content: if isinstance(item, (dict, list)): nested_text = self._format_complex_content(item) formatted_text += f"- {nested_text}\n" else: formatted_text += f"- {item}\n" return formatted_text else: # 简单类型直接返回字符串 return str(content) def format_analysis_result(self, symbol: str, analysis_result: Dict[str, Any]) -> str: """ 格式化分析结果为Markdown格式 Args: symbol: 交易对符号 analysis_result: 分析结果 Returns: 格式化后的Markdown文本 """ try: if not analysis_result or 'error' in analysis_result: return f"### {symbol} 分析结果错误\n\n获取分析结果时出错: {analysis_result.get('error', '未知错误')}" # 提取关键信息 market_trend = analysis_result.get('market_trend', '未知') # 支撑位和阻力位 support_levels = analysis_result.get('support_levels', []) if isinstance(support_levels, list): support_levels_str = '\n'.join([f"- {level}" for level in support_levels]) else: support_levels_str = str(support_levels) resistance_levels = analysis_result.get('resistance_levels', []) if isinstance(resistance_levels, list): resistance_levels_str = '\n'.join([f"- {level}" for level in resistance_levels]) else: resistance_levels_str = str(resistance_levels) # 技术指标分析 technical_analysis = analysis_result.get('technical_analysis', '未知') if isinstance(technical_analysis, (dict, list)): technical_analysis = self._format_complex_content(technical_analysis) # 交易量分析 volume_analysis = analysis_result.get('volume_analysis', '未知') if isinstance(volume_analysis, (dict, list)): volume_analysis = self._format_complex_content(volume_analysis) # 操作建议 recommendation = analysis_result.get('recommendation', '未知') entry_points = analysis_result.get('entry_points', []) if isinstance(entry_points, list): entry_points_str = '、'.join([str(point) for point in entry_points]) else: entry_points_str = str(entry_points) exit_points = analysis_result.get('exit_points', []) if isinstance(exit_points, list): exit_points_str = '、'.join([str(point) for point in exit_points]) else: exit_points_str = str(exit_points) stop_loss = analysis_result.get('stop_loss', '未知') take_profit = analysis_result.get('take_profit', '未知') # 紧迫性评级 urgency_level = analysis_result.get('urgency_level', 0) urgency_reason = analysis_result.get('urgency_reason', '未知') # 紧迫性等级图标 if int(urgency_level) >= 4: urgency_icon = "🔴" elif int(urgency_level) >= 2: urgency_icon = "🟡" else: urgency_icon = "🟢" # 总结 summary = analysis_result.get('summary', '无摘要') # 根据市场趋势设置图标 if '牛' in str(market_trend) or 'bull' in str(market_trend).lower(): trend_icon = "🟢" elif '熊' in str(market_trend) or 'bear' in str(market_trend).lower(): trend_icon = "🔴" else: trend_icon = "🟡" # 构建Markdown文本 markdown = f"""### {trend_icon} {symbol} 市场分析 **市场趋势**: {market_trend} **技术指标分析**: {technical_analysis} **交易量分析**: {volume_analysis} **支撑位(斐波那契)**: {support_levels_str} **阻力位(斐波那契)**: {resistance_levels_str} **操作建议**: {recommendation} **入场点位**: {entry_points_str} **出场点位**: {exit_points_str} **止损位**: {stop_loss} **止盈位**: {take_profit} **操作紧迫性**: {urgency_icon} {urgency_level}/5 **紧迫性原因**: {urgency_reason} **总结**: {summary} *分析时间: {time.strftime('%Y-%m-%d %H:%M:%S')}* """ return markdown except Exception as e: print(f"格式化分析结果时出错: {e}") traceback.print_exc() return f"### {symbol} 格式化分析结果出错\n\n{str(e)}" def format_prediction_result(self, symbol: str, prediction_result: Dict[str, Any]) -> str: """ 格式化预测结果为Markdown格式 Args: symbol: 交易对符号 prediction_result: 预测结果 Returns: 格式化后的Markdown文本 """ try: if not prediction_result or 'error' in prediction_result: return f"### {symbol} 预测结果错误\n\n获取预测结果时出错: {prediction_result.get('error', '未知错误')}" # 提取关键信息 current_price = prediction_result.get('current_price', '未知') # 24小时预测 prediction_24h = prediction_result.get('prediction_24h', {}) price_range_24h = prediction_24h.get('price_range', '未知') trend_24h = prediction_24h.get('trend', '未知') # 7天预测 prediction_7d = prediction_result.get('prediction_7d', {}) price_range_7d = prediction_7d.get('price_range', '未知') trend_7d = prediction_7d.get('trend', '未知') # 关键因素 key_factors = prediction_result.get('key_factors', []) if isinstance(key_factors, list): key_factors_text = '\n'.join([f"- {factor}" for factor in key_factors]) else: key_factors_text = str(key_factors) # 根据24小时趋势设置图标 if '上涨' in str(trend_24h) or 'up' in str(trend_24h).lower(): trend_icon = "🟢" elif '下跌' in str(trend_24h) or 'down' in str(trend_24h).lower(): trend_icon = "🔴" else: trend_icon = "🟡" # 构建Markdown文本 markdown = f"""### {trend_icon} {symbol} 价格预测 **当前价格**: {current_price} **24小时预测**: - 价格区间: {price_range_24h} - 趋势: {trend_24h} **7天预测**: - 价格区间: {price_range_7d} - 趋势: {trend_7d} **关键影响因素**: {key_factors_text} *预测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}* """ return markdown except Exception as e: print(f"格式化预测结果时出错: {e}") traceback.print_exc() return f"### {symbol} 格式化预测结果出错\n\n{str(e)}" def format_strategy_result(self, symbol: str, strategy_result: Dict[str, Any]) -> str: """ 格式化策略结果为Markdown格式 Args: symbol: 交易对符号 strategy_result: 策略结果 Returns: 格式化后的Markdown文本 """ try: if not strategy_result or 'error' in strategy_result: return f"### {symbol} 策略结果错误\n\n获取策略结果时出错: {strategy_result.get('error', '未知错误')}" # 提取关键信息 position = strategy_result.get('position', '未知') entry_points = strategy_result.get('entry_points', []) if isinstance(entry_points, list): entry_points_str = '、'.join([str(point) for point in entry_points]) else: entry_points_str = str(entry_points) exit_points = strategy_result.get('exit_points', []) if isinstance(exit_points, list): exit_points_str = '、'.join([str(point) for point in exit_points]) else: exit_points_str = str(exit_points) stop_loss = strategy_result.get('stop_loss', '未知') take_profit = strategy_result.get('take_profit', '未知') # 紧迫性评级 urgency_level = strategy_result.get('urgency_level', 0) reasoning = strategy_result.get('reasoning', '无理由') # 根据建议仓位设置图标 if '买' in position or 'buy' in str(position).lower(): position_icon = "🟢" elif '卖' in position or 'sell' in str(position).lower(): position_icon = "🔴" else: position_icon = "⚪" # 紧迫性等级图标 if int(urgency_level) >= 4: urgency_icon = "🔴" elif int(urgency_level) >= 2: urgency_icon = "🟡" else: urgency_icon = "🟢" # 构建Markdown文本 markdown = f"""### {symbol} 交易策略 **建议操作**: {position_icon} {position} **入场点位**: {entry_points_str} **出场点位**: {exit_points_str} **止损位**: {stop_loss} **止盈位**: {take_profit} **操作紧迫性**: {urgency_icon} {urgency_level}/5 **策略理由**: {reasoning} *策略生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}* """ return markdown except Exception as e: print(f"格式化策略结果时出错: {e}") traceback.print_exc() return f"### {symbol} 格式化策略结果出错\n\n{str(e)}" def format_integrated_report(self, symbol: str, analysis_result: Dict[str, Any]) -> str: """ 格式化整合后的分析报告 Args: symbol: 交易对符号 analysis_result: 分析结果 Returns: 格式化后的文本 """ try: if not analysis_result or 'error' in analysis_result: return f"### {symbol} 分析结果错误\n\n获取分析结果时出错: {analysis_result.get('error', '未知错误')}" # 提取关键信息 market_trend = analysis_result.get('market_trend', '未知') # 支撑位和阻力位的格式化 def format_fibonacci_levels(levels): formatted_lines = [] if isinstance(levels, list): for level in levels: if isinstance(level, dict) and 'level' in level and 'price' in level: formatted_lines.append(f"- {level['level']} 位: {level['price']}") else: formatted_lines.append(f"- {level}") elif isinstance(levels, dict): for level, price in levels.items(): if level.replace('.', '', 1).isdigit(): # 检查是否是数字形式的级别 formatted_lines.append(f"- {level} 位: {price}") else: formatted_lines.append(f"- {level}: {price}") else: formatted_lines.append(str(levels)) return '\n'.join(formatted_lines) support_levels = analysis_result.get('support_levels', []) support_levels_str = format_fibonacci_levels(support_levels) resistance_levels = analysis_result.get('resistance_levels', []) resistance_levels_str = format_fibonacci_levels(resistance_levels) # 技术指标分析 technical_analysis = analysis_result.get('technical_analysis', '未知') if isinstance(technical_analysis, (dict, list)): technical_analysis = self._format_complex_content(technical_analysis) # 交易量分析 volume_analysis = analysis_result.get('volume_analysis', '未知') if isinstance(volume_analysis, (dict, list)): volume_analysis = self._format_complex_content(volume_analysis) # 操作建议 recommendation = analysis_result.get('recommendation', '未知') entry_points = analysis_result.get('entry_points', []) if isinstance(entry_points, list): entry_points_str = '、'.join([str(point) for point in entry_points]) else: entry_points_str = str(entry_points) exit_points = analysis_result.get('exit_points', []) if isinstance(exit_points, list): exit_points_str = '、'.join([str(point) for point in exit_points]) else: exit_points_str = str(exit_points) stop_loss = analysis_result.get('stop_loss', '未知') take_profit = analysis_result.get('take_profit', '未知') # 紧迫性评级 urgency_level = analysis_result.get('urgency_level', 0) urgency_reason = analysis_result.get('urgency_reason', '未知') # 紧迫性等级图标 if int(urgency_level) >= 4: urgency_icon = "🔴" elif int(urgency_level) >= 2: urgency_icon = "🟡" else: urgency_icon = "🟢" # 总结 summary = analysis_result.get('summary', '无摘要') # 根据市场趋势设置图标 if '牛' in str(market_trend) or 'bull' in str(market_trend).lower(): trend_icon = "🟢" elif '熊' in str(market_trend) or 'bear' in str(market_trend).lower(): trend_icon = "🔴" else: trend_icon = "🟡" # 构建Markdown文本 markdown = f"""### DeepSeek AI 加密货币分析报告 **交易对**: {symbol} **分析时间**: {time.strftime('%Y-%m-%d %H:%M:%S')} **市场趋势**: {trend_icon} {market_trend} **技术指标分析**: {technical_analysis} **交易量分析**: {volume_analysis} **支撑位(斐波那契)**: {support_levels_str} **阻力位(斐波那契)**: {resistance_levels_str} **操作建议**: {recommendation} **入场点位**: {entry_points_str} **出场点位**: {exit_points_str} **止损位**: {stop_loss} **止盈位**: {take_profit} **操作紧迫性**: {urgency_icon} {urgency_level}/5 **紧迫性原因**: {urgency_reason} **总结**: {summary} """ return markdown except Exception as e: print(f"格式化整合后的分析报告时出错: {e}") traceback.print_exc() return f"### {symbol} 格式化整合后的分析报告出错\n\n{str(e)}" def send_analysis_report(self, symbol: str, analysis_data: Dict[str, Any]) -> Dict[str, Any]: """ 发送分析报告(整合市场分析和交易建议) Args: symbol: 交易对符号 analysis_data: 包含分析数据 Returns: 接口返回结果 """ try: analysis_result = analysis_data.get('analysis', {}) # 获取市场趋势,用于设置标题图标 market_trend = '' if analysis_result and 'market_trend' in analysis_result: market_trend = analysis_result['market_trend'] if '牛' in str(market_trend) or 'bull' in str(market_trend).lower(): title_icon = "🟢" elif '熊' in str(market_trend) or 'bear' in str(market_trend).lower(): title_icon = "🔴" else: title_icon = "🟡" # 获取建议操作 position = analysis_result.get('recommendation', '未知') # 构建标题 title = f"{title_icon} {symbol} 加密货币分析 | 建议: {position}" # 格式化分析结果(整合了分析和交易建议) markdown_text = self.format_integrated_report(symbol, analysis_result) # 发送Markdown消息 return self.send_markdown(title, markdown_text) except Exception as e: print(f"发送分析报告时出错: {e}") traceback.print_exc() error_msg = f"### {symbol} 分析报告生成错误\n\n{str(e)}" return self.send_markdown(f"⚠️ {symbol} 分析报告错误", error_msg)