This commit is contained in:
aaron 2025-04-29 00:21:17 +08:00
parent 44f40477e1
commit 56503ca795
11 changed files with 65 additions and 590 deletions

View File

@ -34,7 +34,4 @@ RUN mkdir -p /app/cryptoai/data /app/logs
# EXPOSE 8000
# 设置容器启动命令
ENTRYPOINT ["python", "run.py"]
# 默认参数可在docker run时覆盖
CMD ["--agent", "crypto"]
ENTRYPOINT ["python", "run.py"]

View File

@ -296,53 +296,12 @@ class CryptoAgent:
except Exception as e:
print(f"保存{symbol}分析结果到数据库时出错: {e}")
# 如果钉钉机器人已启用,发送分析报告
if self.dingtalk_bot:
try:
print(f"发送{symbol}分析报告到钉钉...")
response = self.dingtalk_bot.send_analysis_report(symbol, results[symbol])
if response.get('errcode') == 0:
print(f"{symbol}分析报告发送成功")
else:
print(f"{symbol}分析报告发送失败: {response}")
except Exception as e:
print(f"发送{symbol}分析报告时出错: {e}")
print(f"{symbol}分析完成")
else:
print(f"跳过{symbol}分析,无法获取数据")
return results
def send_notifications(self, symbol: str, analysis_data: Dict[str, Any]) -> bool:
"""
发送分析结果通知
Args:
symbol: 交易对符号
analysis_data: 分析数据
Returns:
发送是否成功
"""
if not self.dingtalk_bot:
print(f"钉钉通知未启用,跳过发送 {symbol} 的分析结果")
return False
try:
# 使用已初始化的钉钉机器人实例发送完整分析报告
response = self.dingtalk_bot.send_analysis_report(symbol, analysis_data)
if response.get('errcode') == 0:
print(f"成功发送 {symbol} 分析结果到钉钉")
return True
else:
print(f"发送 {symbol} 分析结果到钉钉失败: {response}")
return False
except Exception as e:
print(f"发送钉钉通知时出错: {e}")
return False
def run_analysis_cycle(self) -> Dict[str, Any]:
"""
@ -366,12 +325,39 @@ class CryptoAgent:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"分析结果已保存到:{results_file}")
# 把分析结果调用大模型转化成交易建议
message = self.convert_analysis_to_trading_suggestions(results)
print(f"交易建议: {message}")
if self.dingtalk_bot:
self.dingtalk_bot.send_markdown(title="加密货币交易建议", text=message)
# 导出 DeepSeek API token 使用情况
self._export_token_usage()
return results
def convert_analysis_to_trading_suggestions(self, results: Dict[str, Any]) -> str:
"""
把分析的JSON结果调用大模型转化成交易建议
"""
prompt = f"""
请对以下加密货币市场分析的JSON结果进行深入分析 转化成包含分析时间技术分析支撑位压力位 建议买点卖点止损位止盈位仓位建议 增加适当的emoji便于阅读 简单明了
分析 JSON 结果:
{results}
"""
system_prompt = """
你是一个专业的加密货币分析助手你擅长分析市场趋势预测价格走向和提供交易建议请始终使用中文回复并确保输出格式规范的Markdown
"""
response, usage = self.deepseek_api.call_model(prompt, system_prompt=system_prompt, task_type="交易建议")
message = self.deepseek_api.extract_text_from_response(response)
return message
def _export_token_usage(self) -> None:
"""
导出 DeepSeek API token 使用情况

View File

@ -300,55 +300,12 @@ class GoldAgent:
"analysis": analysis_result,
"timestamp": datetime.now().isoformat()
}
# 如果钉钉机器人已启用,发送分析报告
if self.dingtalk_bot:
try:
print(f"发送{symbol}分析报告到钉钉...")
response = self.dingtalk_bot.send_analysis_report(symbol, results[symbol])
if response.get('errcode') == 0:
print(f"{symbol}分析报告发送成功")
else:
print(f"{symbol}分析报告发送失败: {response}")
except Exception as e:
print(f"发送{symbol}分析报告时出错: {e}")
print(f"{symbol}分析完成")
else:
print(f"跳过{symbol}分析,无法获取数据")
return results
def send_notifications(self, symbol: str, analysis_data: Dict[str, Any]) -> bool:
"""
发送分析结果通知
Args:
symbol: 交易对符号
analysis_data: 分析数据
Returns:
发送是否成功
"""
if not self.dingtalk_bot:
print(f"钉钉通知未启用,跳过发送 {symbol} 的分析结果")
return False
try:
# 使用已初始化的钉钉机器人实例发送完整分析报告
response = self.dingtalk_bot.send_analysis_report(symbol, analysis_data)
if response.get('errcode') == 0:
print(f"成功发送 {symbol} 分析结果到钉钉")
return True
else:
print(f"发送 {symbol} 分析结果到钉钉失败: {response}")
return False
except Exception as e:
print(f"发送钉钉通知时出错: {e}")
return False
def run_analysis_cycle(self) -> Dict[str, Any]:
"""
运行一个完整的分析周期

View File

@ -122,6 +122,28 @@ class DeepSeekAPI:
self.logger.error(error_msg)
return {}, usage_info
def extract_text_from_response(self, response: Dict[str, Any]) -> str:
"""
从响应中提取文本数据
"""
try:
if 'choices' in response and len(response['choices']) > 0:
content = response['choices'][0]['message']['content']
# 如果内容以```markdown开头则去掉```
if content.startswith('```markdown'):
content = content[len('```markdown'):]
# 如果内容以```结尾,则去掉```
if content.endswith('```'):
content = content[:-len('```')]
return content
else:
return {"error": "无法从响应中提取文本", "raw_content": response}
except Exception as e:
return {"error": str(e), "raw_content": response}
def extract_json_from_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
"""
从响应中提取JSON数据

View File

@ -21,16 +21,15 @@ alltick:
api_key: "ee66d8e2868fd988fffacec40d078df8-c-app"
symbols:
- "XAUUSD" # 黄金/美元
# - "XAGUSD" # 白银/美元
# 加密货币设置
crypto:
base_currencies:
- "BTC"
- "ETH"
- "SOL"
- "SUI"
- "XRP"
# - "SOL"
# - "SUI"
# - "XRP"
quote_currency: "USDT"
time_interval: "1h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d
historical_days: 30

View File

@ -5,7 +5,8 @@ import os
import sys
import argparse
from typing import Dict, Any
import schedule
import time
# 添加项目根目录到Python路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
@ -17,8 +18,15 @@ from cryptoai.utils.config_loader import ConfigLoader
def main():
try:
# 启动智能体
CryptoAgent().start_agent()
print("程序启动")
# 设置每个整点运行一次
schedule.every().hour.do(CryptoAgent().start_agent)
# 启动定时任务
while True:
schedule.run_pending()
time.sleep(1)
# CryptoAgent().start_agent()
except KeyboardInterrupt:
print("\n程序已退出")

View File

@ -112,497 +112,4 @@ class DingTalkBot:
except Exception as e:
print(f"发送钉钉消息时出错: {e}")
traceback.print_exc()
return {"errcode": -1, "errmsg": str(e)}
def _format_complex_content(self, content) -> str:
"""
格式化复杂内容JSON对象或列表为易读的文本
Args:
content: 需要格式化的内容
Returns:
格式化后的文本
"""
if isinstance(content, dict):
# 将字典转换为项目列表
formatted_text = ""
for key, value in content.items():
key_display = key.replace('_', ' ').title()
if isinstance(value, (dict, list)):
formatted_text += f"- **{key_display}**: \n"
# 递归处理嵌套结构
nested_text = self._format_complex_content(value)
# 增加缩进
nested_text = '\n'.join([f" {line}" for line in nested_text.split('\n')])
formatted_text += f"{nested_text}\n"
else:
formatted_text += f"- **{key_display}**: {value}\n"
return formatted_text
elif isinstance(content, list):
# 将列表转换为项目列表
formatted_text = ""
for item in content:
if isinstance(item, (dict, list)):
nested_text = self._format_complex_content(item)
formatted_text += f"- {nested_text}\n"
else:
formatted_text += f"- {item}\n"
return formatted_text
else:
# 简单类型直接返回字符串
return str(content)
def format_analysis_result(self, symbol: str, analysis_result: Dict[str, Any]) -> str:
"""
格式化分析结果为Markdown格式
Args:
symbol: 交易对符号
analysis_result: 分析结果
Returns:
格式化后的Markdown文本
"""
try:
if not analysis_result or 'error' in analysis_result:
return f"### {symbol} 分析结果错误\n\n获取分析结果时出错: {analysis_result.get('error', '未知错误')}"
# 提取关键信息
market_trend = analysis_result.get('market_trend', '未知')
# 支撑位和阻力位
support_levels = analysis_result.get('support_levels', [])
if isinstance(support_levels, list):
support_levels_str = '\n'.join([f"- {level}" for level in support_levels])
else:
support_levels_str = str(support_levels)
resistance_levels = analysis_result.get('resistance_levels', [])
if isinstance(resistance_levels, list):
resistance_levels_str = '\n'.join([f"- {level}" for level in resistance_levels])
else:
resistance_levels_str = str(resistance_levels)
# 技术指标分析
technical_analysis = analysis_result.get('technical_analysis', '未知')
if isinstance(technical_analysis, (dict, list)):
technical_analysis = self._format_complex_content(technical_analysis)
# 交易量分析
volume_analysis = analysis_result.get('volume_analysis', '未知')
if isinstance(volume_analysis, (dict, list)):
volume_analysis = self._format_complex_content(volume_analysis)
# 操作建议
recommendation = analysis_result.get('recommendation', '未知')
entry_points = analysis_result.get('entry_points', [])
if isinstance(entry_points, list):
entry_points_str = ''.join([str(point) for point in entry_points])
else:
entry_points_str = str(entry_points)
exit_points = analysis_result.get('exit_points', [])
if isinstance(exit_points, list):
exit_points_str = ''.join([str(point) for point in exit_points])
else:
exit_points_str = str(exit_points)
stop_loss = analysis_result.get('stop_loss', '未知')
take_profit = analysis_result.get('take_profit', '未知')
# 紧迫性评级
urgency_level = analysis_result.get('urgency_level', 0)
urgency_reason = analysis_result.get('urgency_reason', '未知')
# 紧迫性等级图标
if int(urgency_level) >= 4:
urgency_icon = "🔴"
elif int(urgency_level) >= 2:
urgency_icon = "🟡"
else:
urgency_icon = "🟢"
# 总结
summary = analysis_result.get('summary', '无摘要')
# 根据市场趋势设置图标
if '' in str(market_trend) or 'bull' in str(market_trend).lower():
trend_icon = "🟢"
elif '' in str(market_trend) or 'bear' in str(market_trend).lower():
trend_icon = "🔴"
else:
trend_icon = "🟡"
# 构建Markdown文本
markdown = f"""### {trend_icon} {symbol} 市场分析
**市场趋势**: {market_trend}
**技术指标分析**:
{technical_analysis}
**交易量分析**:
{volume_analysis}
**支撑位(斐波那契)**:
{support_levels_str}
**阻力位(斐波那契)**:
{resistance_levels_str}
**操作建议**: {recommendation}
**入场点位**: {entry_points_str}
**出场点位**: {exit_points_str}
**止损位**: {stop_loss}
**止盈位**: {take_profit}
**操作紧迫性**: {urgency_icon} {urgency_level}/5
**紧迫性原因**: {urgency_reason}
**总结**: {summary}
*分析时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化分析结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化分析结果出错\n\n{str(e)}"
def format_prediction_result(self, symbol: str, prediction_result: Dict[str, Any]) -> str:
"""
格式化预测结果为Markdown格式
Args:
symbol: 交易对符号
prediction_result: 预测结果
Returns:
格式化后的Markdown文本
"""
try:
if not prediction_result or 'error' in prediction_result:
return f"### {symbol} 预测结果错误\n\n获取预测结果时出错: {prediction_result.get('error', '未知错误')}"
# 提取关键信息
current_price = prediction_result.get('current_price', '未知')
# 24小时预测
prediction_24h = prediction_result.get('prediction_24h', {})
price_range_24h = prediction_24h.get('price_range', '未知')
trend_24h = prediction_24h.get('trend', '未知')
# 7天预测
prediction_7d = prediction_result.get('prediction_7d', {})
price_range_7d = prediction_7d.get('price_range', '未知')
trend_7d = prediction_7d.get('trend', '未知')
# 关键因素
key_factors = prediction_result.get('key_factors', [])
if isinstance(key_factors, list):
key_factors_text = '\n'.join([f"- {factor}" for factor in key_factors])
else:
key_factors_text = str(key_factors)
# 根据24小时趋势设置图标
if '上涨' in str(trend_24h) or 'up' in str(trend_24h).lower():
trend_icon = "🟢"
elif '下跌' in str(trend_24h) or 'down' in str(trend_24h).lower():
trend_icon = "🔴"
else:
trend_icon = "🟡"
# 构建Markdown文本
markdown = f"""### {trend_icon} {symbol} 价格预测
**当前价格**: {current_price}
**24小时预测**:
- 价格区间: {price_range_24h}
- 趋势: {trend_24h}
**7天预测**:
- 价格区间: {price_range_7d}
- 趋势: {trend_7d}
**关键影响因素**:
{key_factors_text}
*预测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化预测结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化预测结果出错\n\n{str(e)}"
def format_strategy_result(self, symbol: str, strategy_result: Dict[str, Any]) -> str:
"""
格式化策略结果为Markdown格式
Args:
symbol: 交易对符号
strategy_result: 策略结果
Returns:
格式化后的Markdown文本
"""
try:
if not strategy_result or 'error' in strategy_result:
return f"### {symbol} 策略结果错误\n\n获取策略结果时出错: {strategy_result.get('error', '未知错误')}"
# 提取关键信息
position = strategy_result.get('position', '未知')
entry_points = strategy_result.get('entry_points', [])
if isinstance(entry_points, list):
entry_points_str = ''.join([str(point) for point in entry_points])
else:
entry_points_str = str(entry_points)
exit_points = strategy_result.get('exit_points', [])
if isinstance(exit_points, list):
exit_points_str = ''.join([str(point) for point in exit_points])
else:
exit_points_str = str(exit_points)
stop_loss = strategy_result.get('stop_loss', '未知')
take_profit = strategy_result.get('take_profit', '未知')
# 紧迫性评级
urgency_level = strategy_result.get('urgency_level', 0)
reasoning = strategy_result.get('reasoning', '无理由')
# 根据建议仓位设置图标
if '' in position or 'buy' in str(position).lower():
position_icon = "🟢"
elif '' in position or 'sell' in str(position).lower():
position_icon = "🔴"
else:
position_icon = ""
# 紧迫性等级图标
if int(urgency_level) >= 4:
urgency_icon = "🔴"
elif int(urgency_level) >= 2:
urgency_icon = "🟡"
else:
urgency_icon = "🟢"
# 构建Markdown文本
markdown = f"""### {symbol} 交易策略
**建议操作**: {position_icon} {position}
**入场点位**: {entry_points_str}
**出场点位**: {exit_points_str}
**止损位**: {stop_loss}
**止盈位**: {take_profit}
**操作紧迫性**: {urgency_icon} {urgency_level}/5
**策略理由**:
{reasoning}
*策略生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化策略结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化策略结果出错\n\n{str(e)}"
def format_integrated_report(self, symbol: str, analysis_result: Dict[str, Any]) -> str:
"""
格式化整合后的分析报告
Args:
symbol: 交易对符号
analysis_result: 分析结果
Returns:
格式化后的文本
"""
try:
if not analysis_result or 'error' in analysis_result:
return f"### {symbol} 分析结果错误\n\n获取分析结果时出错: {analysis_result.get('error', '未知错误')}"
# 提取关键信息
market_trend = analysis_result.get('market_trend', '未知')
# 支撑位和阻力位的格式化
def format_fibonacci_levels(levels):
formatted_lines = []
if isinstance(levels, list):
for level in levels:
if isinstance(level, dict) and 'level' in level and 'price' in level:
formatted_lines.append(f"- {level['level']} 位: {level['price']}")
else:
formatted_lines.append(f"- {level}")
elif isinstance(levels, dict):
for level, price in levels.items():
if level.replace('.', '', 1).isdigit(): # 检查是否是数字形式的级别
formatted_lines.append(f"- {level} 位: {price}")
else:
formatted_lines.append(f"- {level}: {price}")
else:
formatted_lines.append(str(levels))
return '\n'.join(formatted_lines)
support_levels = analysis_result.get('support_levels', [])
support_levels_str = format_fibonacci_levels(support_levels)
resistance_levels = analysis_result.get('resistance_levels', [])
resistance_levels_str = format_fibonacci_levels(resistance_levels)
# 技术指标分析
technical_analysis = analysis_result.get('technical_analysis', '未知')
if isinstance(technical_analysis, (dict, list)):
technical_analysis = self._format_complex_content(technical_analysis)
# 交易量分析
volume_analysis = analysis_result.get('volume_analysis', '未知')
if isinstance(volume_analysis, (dict, list)):
volume_analysis = self._format_complex_content(volume_analysis)
# 操作建议
recommendation = analysis_result.get('recommendation', '未知')
entry_points = analysis_result.get('entry_points', [])
if isinstance(entry_points, list):
entry_points_str = ''.join([str(point) for point in entry_points])
else:
entry_points_str = str(entry_points)
exit_points = analysis_result.get('exit_points', [])
if isinstance(exit_points, list):
exit_points_str = ''.join([str(point) for point in exit_points])
else:
exit_points_str = str(exit_points)
stop_loss = analysis_result.get('stop_loss', '未知')
take_profit = analysis_result.get('take_profit', '未知')
# 紧迫性评级
urgency_level = analysis_result.get('urgency_level', 0)
urgency_reason = analysis_result.get('urgency_reason', '未知')
# 紧迫性等级图标
if int(urgency_level) >= 4:
urgency_icon = "🔴"
elif int(urgency_level) >= 2:
urgency_icon = "🟡"
else:
urgency_icon = "🟢"
# 总结
summary = analysis_result.get('summary', '无摘要')
# 根据市场趋势设置图标
if '' in str(market_trend) or 'bull' in str(market_trend).lower():
trend_icon = "🟢"
elif '' in str(market_trend) or 'bear' in str(market_trend).lower():
trend_icon = "🔴"
else:
trend_icon = "🟡"
# 构建Markdown文本
markdown = f"""### DeepSeek AI 加密货币分析报告
**交易对**: {symbol}
**分析时间**: {time.strftime('%Y-%m-%d %H:%M:%S')}
**市场趋势**: {trend_icon} {market_trend}
**技术指标分析**:
{technical_analysis}
**交易量分析**:
{volume_analysis}
**支撑位(斐波那契)**:
{support_levels_str}
**阻力位(斐波那契)**:
{resistance_levels_str}
**操作建议**: {recommendation}
**入场点位**: {entry_points_str}
**出场点位**: {exit_points_str}
**止损位**: {stop_loss}
**止盈位**: {take_profit}
**操作紧迫性**: {urgency_icon} {urgency_level}/5
**紧迫性原因**: {urgency_reason}
**总结**: {summary}
"""
return markdown
except Exception as e:
print(f"格式化整合后的分析报告时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化整合后的分析报告出错\n\n{str(e)}"
def send_analysis_report(self, symbol: str, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
"""
发送分析报告整合市场分析和交易建议
Args:
symbol: 交易对符号
analysis_data: 包含分析数据
Returns:
接口返回结果
"""
try:
analysis_result = analysis_data.get('analysis', {})
# 获取市场趋势,用于设置标题图标
market_trend = ''
if analysis_result and 'market_trend' in analysis_result:
market_trend = analysis_result['market_trend']
if '' in str(market_trend) or 'bull' in str(market_trend).lower():
title_icon = "🟢"
elif '' in str(market_trend) or 'bear' in str(market_trend).lower():
title_icon = "🔴"
else:
title_icon = "🟡"
# 获取建议操作
position = analysis_result.get('recommendation', '未知')
# 构建标题
title = f"{title_icon} {symbol} 加密货币分析 | 建议: {position}"
# 格式化分析结果(整合了分析和交易建议)
markdown_text = self.format_integrated_report(symbol, analysis_result)
# 发送Markdown消息
return self.send_markdown(title, markdown_text)
except Exception as e:
print(f"发送分析报告时出错: {e}")
traceback.print_exc()
error_msg = f"### {symbol} 分析报告生成错误\n\n{str(e)}"
return self.send_markdown(f"⚠️ {symbol} 分析报告错误", error_msg)
return {"errcode": -1, "errmsg": str(e)}

View File

@ -14,7 +14,6 @@ services:
# 持久化数据和日志
- cryptoai_data:/app/cryptoai/data
- cryptoai_logs:/app/logs
command: ["--agent", "crypto"]
volumes:
cryptoai_data: