crypto.ai/cryptoai/utils/dingtalk_bot.py
2025-04-27 23:10:59 +08:00

608 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
import time
import hmac
import hashlib
import base64
import urllib.parse
import traceback
from typing import Dict, Any, List, Optional, Union
class DingTalkBot:
"""钉钉机器人工具类,用于发送分析结果到钉钉群"""
def __init__(self, webhook_url: str, secret: Optional[str] = None):
"""
初始化钉钉机器人
Args:
webhook_url: 钉钉机器人的webhook地址
secret: 钉钉机器人的签名密钥(如果启用了安全设置)
"""
self.webhook_url = webhook_url
self.secret = secret
def _sign(self) -> str:
"""
生成钉钉机器人签名
Returns:
签名后的URL
"""
if not self.secret:
return self.webhook_url
timestamp = str(round(time.time() * 1000))
string_to_sign = f"{timestamp}\n{self.secret}"
hmac_code = hmac.new(
self.secret.encode(),
string_to_sign.encode(),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
def send_text(self, content: str, at_mobiles: List[str] = None, at_all: bool = False) -> Dict[str, Any]:
"""
发送文本消息
Args:
content: 消息内容
at_mobiles: 要@的手机号列表
at_all: 是否@所有人
Returns:
接口返回结果
"""
data = {
"msgtype": "text",
"text": {"content": content},
"at": {
"atMobiles": at_mobiles if at_mobiles else [],
"isAtAll": at_all
}
}
return self._post(data)
def send_markdown(self, title: str, text: str, at_mobiles: List[str] = None, at_all: bool = False) -> Dict[str, Any]:
"""
发送Markdown消息
Args:
title: 消息标题
text: Markdown格式的消息内容
at_mobiles: 要@的手机号列表
at_all: 是否@所有人
Returns:
接口返回结果
"""
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": text
},
"at": {
"atMobiles": at_mobiles if at_mobiles else [],
"isAtAll": at_all
}
}
return self._post(data)
def _post(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
发送消息到钉钉
Args:
data: 消息数据
Returns:
接口返回结果
"""
try:
webhook = self._sign()
headers = {'Content-Type': 'application/json; charset=utf-8'}
response = requests.post(webhook, json=data, headers=headers)
return response.json()
except Exception as e:
print(f"发送钉钉消息时出错: {e}")
traceback.print_exc()
return {"errcode": -1, "errmsg": str(e)}
def _format_complex_content(self, content) -> str:
"""
格式化复杂内容JSON对象或列表为易读的文本
Args:
content: 需要格式化的内容
Returns:
格式化后的文本
"""
if isinstance(content, dict):
# 将字典转换为项目列表
formatted_text = ""
for key, value in content.items():
key_display = key.replace('_', ' ').title()
if isinstance(value, (dict, list)):
formatted_text += f"- **{key_display}**: \n"
# 递归处理嵌套结构
nested_text = self._format_complex_content(value)
# 增加缩进
nested_text = '\n'.join([f" {line}" for line in nested_text.split('\n')])
formatted_text += f"{nested_text}\n"
else:
formatted_text += f"- **{key_display}**: {value}\n"
return formatted_text
elif isinstance(content, list):
# 将列表转换为项目列表
formatted_text = ""
for item in content:
if isinstance(item, (dict, list)):
nested_text = self._format_complex_content(item)
formatted_text += f"- {nested_text}\n"
else:
formatted_text += f"- {item}\n"
return formatted_text
else:
# 简单类型直接返回字符串
return str(content)
def format_analysis_result(self, symbol: str, analysis_result: Dict[str, Any]) -> str:
"""
格式化分析结果为Markdown格式
Args:
symbol: 交易对符号
analysis_result: 分析结果
Returns:
格式化后的Markdown文本
"""
try:
if not analysis_result or 'error' in analysis_result:
return f"### {symbol} 分析结果错误\n\n获取分析结果时出错: {analysis_result.get('error', '未知错误')}"
# 提取关键信息
market_trend = analysis_result.get('market_trend', '未知')
# 支撑位和阻力位
support_levels = analysis_result.get('support_levels', [])
if isinstance(support_levels, list):
support_levels_str = '\n'.join([f"- {level}" for level in support_levels])
else:
support_levels_str = str(support_levels)
resistance_levels = analysis_result.get('resistance_levels', [])
if isinstance(resistance_levels, list):
resistance_levels_str = '\n'.join([f"- {level}" for level in resistance_levels])
else:
resistance_levels_str = str(resistance_levels)
# 技术指标分析
technical_analysis = analysis_result.get('technical_analysis', '未知')
if isinstance(technical_analysis, (dict, list)):
technical_analysis = self._format_complex_content(technical_analysis)
# 交易量分析
volume_analysis = analysis_result.get('volume_analysis', '未知')
if isinstance(volume_analysis, (dict, list)):
volume_analysis = self._format_complex_content(volume_analysis)
# 操作建议
recommendation = analysis_result.get('recommendation', '未知')
entry_points = analysis_result.get('entry_points', [])
if isinstance(entry_points, list):
entry_points_str = ''.join([str(point) for point in entry_points])
else:
entry_points_str = str(entry_points)
exit_points = analysis_result.get('exit_points', [])
if isinstance(exit_points, list):
exit_points_str = ''.join([str(point) for point in exit_points])
else:
exit_points_str = str(exit_points)
stop_loss = analysis_result.get('stop_loss', '未知')
take_profit = analysis_result.get('take_profit', '未知')
# 紧迫性评级
urgency_level = analysis_result.get('urgency_level', 0)
urgency_reason = analysis_result.get('urgency_reason', '未知')
# 紧迫性等级图标
if int(urgency_level) >= 4:
urgency_icon = "🔴"
elif int(urgency_level) >= 2:
urgency_icon = "🟡"
else:
urgency_icon = "🟢"
# 总结
summary = analysis_result.get('summary', '无摘要')
# 根据市场趋势设置图标
if '' in str(market_trend) or 'bull' in str(market_trend).lower():
trend_icon = "🟢"
elif '' in str(market_trend) or 'bear' in str(market_trend).lower():
trend_icon = "🔴"
else:
trend_icon = "🟡"
# 构建Markdown文本
markdown = f"""### {trend_icon} {symbol} 市场分析
**市场趋势**: {market_trend}
**技术指标分析**:
{technical_analysis}
**交易量分析**:
{volume_analysis}
**支撑位(斐波那契)**:
{support_levels_str}
**阻力位(斐波那契)**:
{resistance_levels_str}
**操作建议**: {recommendation}
**入场点位**: {entry_points_str}
**出场点位**: {exit_points_str}
**止损位**: {stop_loss}
**止盈位**: {take_profit}
**操作紧迫性**: {urgency_icon} {urgency_level}/5
**紧迫性原因**: {urgency_reason}
**总结**: {summary}
*分析时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化分析结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化分析结果出错\n\n{str(e)}"
def format_prediction_result(self, symbol: str, prediction_result: Dict[str, Any]) -> str:
"""
格式化预测结果为Markdown格式
Args:
symbol: 交易对符号
prediction_result: 预测结果
Returns:
格式化后的Markdown文本
"""
try:
if not prediction_result or 'error' in prediction_result:
return f"### {symbol} 预测结果错误\n\n获取预测结果时出错: {prediction_result.get('error', '未知错误')}"
# 提取关键信息
current_price = prediction_result.get('current_price', '未知')
# 24小时预测
prediction_24h = prediction_result.get('prediction_24h', {})
price_range_24h = prediction_24h.get('price_range', '未知')
trend_24h = prediction_24h.get('trend', '未知')
# 7天预测
prediction_7d = prediction_result.get('prediction_7d', {})
price_range_7d = prediction_7d.get('price_range', '未知')
trend_7d = prediction_7d.get('trend', '未知')
# 关键因素
key_factors = prediction_result.get('key_factors', [])
if isinstance(key_factors, list):
key_factors_text = '\n'.join([f"- {factor}" for factor in key_factors])
else:
key_factors_text = str(key_factors)
# 根据24小时趋势设置图标
if '上涨' in str(trend_24h) or 'up' in str(trend_24h).lower():
trend_icon = "🟢"
elif '下跌' in str(trend_24h) or 'down' in str(trend_24h).lower():
trend_icon = "🔴"
else:
trend_icon = "🟡"
# 构建Markdown文本
markdown = f"""### {trend_icon} {symbol} 价格预测
**当前价格**: {current_price}
**24小时预测**:
- 价格区间: {price_range_24h}
- 趋势: {trend_24h}
**7天预测**:
- 价格区间: {price_range_7d}
- 趋势: {trend_7d}
**关键影响因素**:
{key_factors_text}
*预测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化预测结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化预测结果出错\n\n{str(e)}"
def format_strategy_result(self, symbol: str, strategy_result: Dict[str, Any]) -> str:
"""
格式化策略结果为Markdown格式
Args:
symbol: 交易对符号
strategy_result: 策略结果
Returns:
格式化后的Markdown文本
"""
try:
if not strategy_result or 'error' in strategy_result:
return f"### {symbol} 策略结果错误\n\n获取策略结果时出错: {strategy_result.get('error', '未知错误')}"
# 提取关键信息
position = strategy_result.get('position', '未知')
entry_points = strategy_result.get('entry_points', [])
if isinstance(entry_points, list):
entry_points_str = ''.join([str(point) for point in entry_points])
else:
entry_points_str = str(entry_points)
exit_points = strategy_result.get('exit_points', [])
if isinstance(exit_points, list):
exit_points_str = ''.join([str(point) for point in exit_points])
else:
exit_points_str = str(exit_points)
stop_loss = strategy_result.get('stop_loss', '未知')
take_profit = strategy_result.get('take_profit', '未知')
# 紧迫性评级
urgency_level = strategy_result.get('urgency_level', 0)
reasoning = strategy_result.get('reasoning', '无理由')
# 根据建议仓位设置图标
if '' in position or 'buy' in str(position).lower():
position_icon = "🟢"
elif '' in position or 'sell' in str(position).lower():
position_icon = "🔴"
else:
position_icon = ""
# 紧迫性等级图标
if int(urgency_level) >= 4:
urgency_icon = "🔴"
elif int(urgency_level) >= 2:
urgency_icon = "🟡"
else:
urgency_icon = "🟢"
# 构建Markdown文本
markdown = f"""### {symbol} 交易策略
**建议操作**: {position_icon} {position}
**入场点位**: {entry_points_str}
**出场点位**: {exit_points_str}
**止损位**: {stop_loss}
**止盈位**: {take_profit}
**操作紧迫性**: {urgency_icon} {urgency_level}/5
**策略理由**:
{reasoning}
*策略生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化策略结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化策略结果出错\n\n{str(e)}"
def format_integrated_report(self, symbol: str, analysis_result: Dict[str, Any]) -> str:
"""
格式化整合后的分析报告
Args:
symbol: 交易对符号
analysis_result: 分析结果
Returns:
格式化后的文本
"""
try:
if not analysis_result or 'error' in analysis_result:
return f"### {symbol} 分析结果错误\n\n获取分析结果时出错: {analysis_result.get('error', '未知错误')}"
# 提取关键信息
market_trend = analysis_result.get('market_trend', '未知')
# 支撑位和阻力位的格式化
def format_fibonacci_levels(levels):
formatted_lines = []
if isinstance(levels, list):
for level in levels:
if isinstance(level, dict) and 'level' in level and 'price' in level:
formatted_lines.append(f"- {level['level']} 位: {level['price']}")
else:
formatted_lines.append(f"- {level}")
elif isinstance(levels, dict):
for level, price in levels.items():
if level.replace('.', '', 1).isdigit(): # 检查是否是数字形式的级别
formatted_lines.append(f"- {level} 位: {price}")
else:
formatted_lines.append(f"- {level}: {price}")
else:
formatted_lines.append(str(levels))
return '\n'.join(formatted_lines)
support_levels = analysis_result.get('support_levels', [])
support_levels_str = format_fibonacci_levels(support_levels)
resistance_levels = analysis_result.get('resistance_levels', [])
resistance_levels_str = format_fibonacci_levels(resistance_levels)
# 技术指标分析
technical_analysis = analysis_result.get('technical_analysis', '未知')
if isinstance(technical_analysis, (dict, list)):
technical_analysis = self._format_complex_content(technical_analysis)
# 交易量分析
volume_analysis = analysis_result.get('volume_analysis', '未知')
if isinstance(volume_analysis, (dict, list)):
volume_analysis = self._format_complex_content(volume_analysis)
# 操作建议
recommendation = analysis_result.get('recommendation', '未知')
entry_points = analysis_result.get('entry_points', [])
if isinstance(entry_points, list):
entry_points_str = ''.join([str(point) for point in entry_points])
else:
entry_points_str = str(entry_points)
exit_points = analysis_result.get('exit_points', [])
if isinstance(exit_points, list):
exit_points_str = ''.join([str(point) for point in exit_points])
else:
exit_points_str = str(exit_points)
stop_loss = analysis_result.get('stop_loss', '未知')
take_profit = analysis_result.get('take_profit', '未知')
# 紧迫性评级
urgency_level = analysis_result.get('urgency_level', 0)
urgency_reason = analysis_result.get('urgency_reason', '未知')
# 紧迫性等级图标
if int(urgency_level) >= 4:
urgency_icon = "🔴"
elif int(urgency_level) >= 2:
urgency_icon = "🟡"
else:
urgency_icon = "🟢"
# 总结
summary = analysis_result.get('summary', '无摘要')
# 根据市场趋势设置图标
if '' in str(market_trend) or 'bull' in str(market_trend).lower():
trend_icon = "🟢"
elif '' in str(market_trend) or 'bear' in str(market_trend).lower():
trend_icon = "🔴"
else:
trend_icon = "🟡"
# 构建Markdown文本
markdown = f"""### DeepSeek AI 加密货币分析报告
**交易对**: {symbol}
**分析时间**: {time.strftime('%Y-%m-%d %H:%M:%S')}
**市场趋势**: {trend_icon} {market_trend}
**技术指标分析**:
{technical_analysis}
**交易量分析**:
{volume_analysis}
**支撑位(斐波那契)**:
{support_levels_str}
**阻力位(斐波那契)**:
{resistance_levels_str}
**操作建议**: {recommendation}
**入场点位**: {entry_points_str}
**出场点位**: {exit_points_str}
**止损位**: {stop_loss}
**止盈位**: {take_profit}
**操作紧迫性**: {urgency_icon} {urgency_level}/5
**紧迫性原因**: {urgency_reason}
**总结**: {summary}
"""
return markdown
except Exception as e:
print(f"格式化整合后的分析报告时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化整合后的分析报告出错\n\n{str(e)}"
def send_analysis_report(self, symbol: str, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
"""
发送分析报告(整合市场分析和交易建议)
Args:
symbol: 交易对符号
analysis_data: 包含分析数据
Returns:
接口返回结果
"""
try:
analysis_result = analysis_data.get('analysis', {})
# 获取市场趋势,用于设置标题图标
market_trend = ''
if analysis_result and 'market_trend' in analysis_result:
market_trend = analysis_result['market_trend']
if '' in str(market_trend) or 'bull' in str(market_trend).lower():
title_icon = "🟢"
elif '' in str(market_trend) or 'bear' in str(market_trend).lower():
title_icon = "🔴"
else:
title_icon = "🟡"
# 获取建议操作
position = analysis_result.get('recommendation', '未知')
# 构建标题
title = f"{title_icon} {symbol} 加密货币分析 | 建议: {position}"
# 格式化分析结果(整合了分析和交易建议)
markdown_text = self.format_integrated_report(symbol, analysis_result)
# 发送Markdown消息
return self.send_markdown(title, markdown_text)
except Exception as e:
print(f"发送分析报告时出错: {e}")
traceback.print_exc()
error_msg = f"### {symbol} 分析报告生成错误\n\n{str(e)}"
return self.send_markdown(f"⚠️ {symbol} 分析报告错误", error_msg)