This commit is contained in:
aaron 2025-04-27 14:12:27 +08:00
parent f28d96ee97
commit 098ba1ad78
11 changed files with 613 additions and 17 deletions

56
.gitignore vendored Normal file
View File

@ -0,0 +1,56 @@
# Python缓存文件
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# 虚拟环境
venv/
ENV/
env/
.env
.venv
# IDE相关文件
.idea/
.vscode/
*.swp
*.swo
.DS_Store
# 日志和数据文件
*.log
*.csv
*.json
*.pkl
logs/
*.db
# 项目特定忽略
# 忽略数据目录,但保留目录结构
cryptoai/data/*
!cryptoai/data/.gitkeep
# 忽略配置文件(包含敏感信息)
cryptoai/config/config.yaml
# 但保留示例配置
!cryptoai/config/config.example.yaml
# 忽略生成的分析结果
cryptoai/data/analysis_results/

View File

@ -15,6 +15,7 @@ from api.binance_api import BinanceAPI
from api.deepseek_api import DeepSeekAPI
from models.data_processor import DataProcessor
from utils.config_loader import ConfigLoader
from utils.dingtalk_bot import DingTalkBot
class CryptoAgent:
@ -36,6 +37,7 @@ class CryptoAgent:
self.crypto_config = self.config_loader.get_crypto_config()
self.data_config = self.config_loader.get_data_config()
self.agent_config = self.config_loader.get_agent_config()
self.dingtalk_config = self.config_loader.get_dingtalk_config()
# 初始化API客户端
self.binance_api = BinanceAPI(
@ -52,6 +54,15 @@ class CryptoAgent:
# 初始化数据处理器
self.data_processor = DataProcessor(storage_path=self.data_config['storage_path'])
# 初始化钉钉机器人(如果启用)
self.dingtalk_bot = None
if self.dingtalk_config.get('enabled', False):
self.dingtalk_bot = DingTalkBot(
webhook_url=self.dingtalk_config['webhook_url'],
secret=self.dingtalk_config.get('secret')
)
print("钉钉机器人已启用")
# 设置支持的加密货币
self.base_currencies = self.crypto_config['base_currencies']
self.quote_currency = self.crypto_config['quote_currency']
@ -306,6 +317,18 @@ class CryptoAgent:
"timestamp": datetime.now().isoformat()
}
# 如果钉钉机器人已启用,发送分析报告
if self.dingtalk_bot:
try:
print(f"发送{symbol}分析报告到钉钉...")
response = self.dingtalk_bot.send_analysis_report(symbol, results[symbol])
if response.get('errcode') == 0:
print(f"{symbol}分析报告发送成功")
else:
print(f"{symbol}分析报告发送失败: {response}")
except Exception as e:
print(f"发送{symbol}分析报告时出错: {e}")
print(f"{symbol}分析完成")
else:
print(f"跳过{symbol}分析,无法获取数据")
@ -349,7 +372,7 @@ class CryptoAgent:
}
# 简单策略执行逻辑
if position == 'BUY':
if position == 'BUY' or '' in position:
# 示例:下单买入
# 在实际应用中,这里应该有更复杂的仓位管理和风险管理逻辑
quantity = 0.01 # 示例数量,实际应用中应该基于资金和风险计算
@ -364,7 +387,11 @@ class CryptoAgent:
result["action"] = "BUY"
result["order_result"] = order_result
elif position == 'SELL':
# 如果钉钉机器人已启用,发送交易通知
if self.dingtalk_bot:
self.send_trade_notification(symbol, "买入", quantity, current_price, strategy)
elif position == 'SELL' or '' in position:
# 示例:下单卖出
quantity = 0.01 # 示例数量
@ -378,6 +405,10 @@ class CryptoAgent:
result["action"] = "SELL"
result["order_result"] = order_result
# 如果钉钉机器人已启用,发送交易通知
if self.dingtalk_bot:
self.send_trade_notification(symbol, "卖出", quantity, current_price, strategy)
else:
# 持有或其他策略
result["action"] = "HOLD"
@ -386,6 +417,56 @@ class CryptoAgent:
print(f"执行{symbol}策略完成:{result['action']}")
return result
def send_trade_notification(self, symbol: str, action: str, quantity: float, price: float, strategy: Dict[str, Any]) -> None:
"""
发送交易通知到钉钉
Args:
symbol: 交易对符号
action: 交易操作买入或卖出
quantity: 交易数量
price: 交易价格
strategy: 交易策略
"""
if not self.dingtalk_bot:
return
try:
# 设置图标
if action == "买入":
icon = "🟢"
else:
icon = "🔴"
# 获取策略理由
reasoning = strategy.get('reasoning', '无理由')
# 构建通知内容
title = f"{icon} {symbol} {action}交易执行通知"
text = f"""### {symbol} {action}交易已执行
**交易详情**:
- 操作: {icon} **{action}**
- 数量: {quantity}
- 价格: {price}
- 总额: {quantity * price} {self.quote_currency}
**交易理由**:
{reasoning}
*交易时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
# 发送通知
at_mobiles = self.dingtalk_config.get('at_mobiles', [])
at_all = self.dingtalk_config.get('at_all', False)
self.dingtalk_bot.send_markdown(title, text, at_mobiles, at_all)
print(f"{symbol} {action}交易通知已发送")
except Exception as e:
print(f"发送交易通知时出错: {e}")
def run_analysis_cycle(self) -> Dict[str, Any]:
"""
运行一个完整的分析周期

View File

@ -106,7 +106,7 @@ class DeepSeekAPI:
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "你是一个专业的加密货币分析助手,擅长分析市场趋势、预测价格走向和提供交易建议。"},
{"role": "system", "content": "你是一个专业的加密货币分析助手,擅长分析市场趋势、预测价格走向和提供交易建议。请始终使用中文回复并确保输出格式规范的JSON。"},
{"role": "user", "content": prompt}
],
"temperature": 0.2, # 低温度使输出更加确定性
@ -159,7 +159,7 @@ class DeepSeekAPI:
Returns:
提示词
"""
return f"""请分析以下加密货币市场数据,并提供详细的市场分析。
return f"""请分析以下加密货币市场数据,并提供详细的市场分析。请使用中文回复。
数据:
{formatted_data}
@ -172,7 +172,7 @@ class DeepSeekAPI:
5. 关键技术指标解读如RSIMACD等
请以JSON格式回复包含以下字段:
- market_trend: 市场趋势 (bullish, bearish, neutral)
- market_trend: 市场趋势 (牛市, 熊市, 震荡)
- support_levels: 支撑位列表
- resistance_levels: 阻力位列表
- volume_analysis: 交易量分析
@ -180,7 +180,7 @@ class DeepSeekAPI:
- technical_indicators: 技术指标分析
- summary: 总结
请确保回复为有效的JSON格式"""
请确保回复为有效的JSON格式并使用中文进行分析"""
def _build_price_prediction_prompt(self, symbol: str, formatted_data: str) -> str:
"""
@ -193,7 +193,7 @@ class DeepSeekAPI:
Returns:
提示词
"""
return f"""请基于以下{symbol}的历史数据预测未来24小时、7天和30天的价格走势。
return f"""请基于以下{symbol}的历史数据预测未来24小时、7天和30天的价格走势。请使用中文回复。
历史数据:
{formatted_data}
@ -203,13 +203,13 @@ class DeepSeekAPI:
请以JSON格式回复包含以下字段:
- symbol: 交易对符号
- current_price: 当前价格
- prediction_24h: 24小时预测 (包含 price_range, trend, confidence)
- prediction_7d: 7天预测 (包含 price_range, trend, confidence)
- prediction_30d: 30天预测 (包含 price_range, trend, confidence)
- prediction_24h: 24小时预测 (包含 price_range价格区间, trend趋势, confidence置信度)
- prediction_7d: 7天预测 (包含 price_range价格区间, trend趋势, confidence置信度)
- prediction_30d: 30天预测 (包含 price_range价格区间, trend趋势, confidence置信度)
- key_factors: 影响预测的关键因素
- risk_assessment: 风险评估
请确保回复为有效的JSON格式"""
请确保回复为有效的JSON格式并使用中文进行分析"""
def _build_trading_strategy_prompt(self, symbol: str, analysis_result: Dict[str, Any], risk_level: str) -> str:
"""
@ -225,7 +225,7 @@ class DeepSeekAPI:
"""
analysis_json = json.dumps(analysis_result, indent=2)
return f"""请基于以下{symbol}的市场分析结果,生成一个风险等级为{risk_level}的交易策略。
return f"""请基于以下{symbol}的市场分析结果,生成一个风险等级为{risk_level}的交易策略。请使用中文回复。
分析结果:
{analysis_json}
@ -234,7 +234,7 @@ class DeepSeekAPI:
请以JSON格式回复包含以下字段:
- symbol: 交易对符号
- risk_level: 风险等级 ({risk_level})
- risk_level: 风险等级 (low低风险, medium中风险, high高风险)
- position: 建议仓位 (买入卖出持有)
- entry_points: 入场点列表
- exit_points: 出场点列表
@ -244,7 +244,7 @@ class DeepSeekAPI:
- strategy_type: 策略类型 (例如趋势跟踪反转突破等)
- reasoning: 策略推理过程
请确保回复为有效的JSON格式"""
请确保回复为有效的JSON格式并使用中文进行分析"""
def _parse_analysis_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
"""

View File

@ -0,0 +1,49 @@
# Binance API设置
binance:
api_key: "your_binance_api_key_here"
api_secret: "your_binance_api_secret_here"
test_mode: true # 设置为false将使用实盘交易
# DeepSeek AI设置
deepseek:
api_key: "your_deepseek_api_key_here"
model: "deepseek-chat" # 使用的模型
# 加密货币设置
crypto:
base_currencies:
- "BTC"
- "ETH"
- "BNB"
- "SOL"
- "ADA"
quote_currency: "USDT"
time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d
# 数据设置
data:
storage_path: "./cryptoai/data"
historical_days: 30
update_interval: 60 # 数据更新间隔(分钟)
# Agent设置
agent:
analysis_interval: 120 # 分析间隔(分钟)
strategies:
- "trend_following"
- "momentum"
- "sentiment"
risk_level: "medium" # 可选: low, medium, high
# 日志设置
logging:
level: "INFO" # 可选: DEBUG, INFO, WARNING, ERROR
file_path: "./cryptoai/logs"
# 钉钉机器人设置
dingtalk:
enabled: true # 是否启用钉钉机器人
webhook_url: "https://oapi.dingtalk.com/robot/send?access_token=your_dingtalk_token_here"
secret: "your_dingtalk_secret_here" # 如果没有设置安全设置,可以为空
at_mobiles: [] # 需要@的手机号列表
at_all: false # 是否@所有人

View File

@ -18,7 +18,7 @@ crypto:
# - "SOL"
# - "ADA"
quote_currency: "USDT"
time_interval: "1d" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d
time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d
# 数据设置
data:
@ -39,3 +39,11 @@ agent:
logging:
level: "INFO" # 可选: DEBUG, INFO, WARNING, ERROR
file_path: "./cryptoai/logs"
# 钉钉机器人设置
dingtalk:
enabled: true # 是否启用钉钉机器人
webhook_url: "https://oapi.dingtalk.com/robot/send?access_token=2278b723cd363bb6f85592c743b59b166e70b9e02a275bb5cedbc33b53a5cbdc"
secret: "your_secret" # 如果没有设置安全设置,可以为空
at_mobiles: [] # 需要@的手机号列表
at_all: false # 是否@所有人

0
cryptoai/data/.gitkeep Normal file
View File

View File

@ -74,3 +74,7 @@ class ConfigLoader:
def get_logging_config(self) -> Dict[str, Any]:
"""获取日志配置"""
return self.get_config('logging')
def get_dingtalk_config(self) -> Dict[str, Any]:
"""获取钉钉机器人配置"""
return self.get_config('dingtalk')

View File

@ -0,0 +1,398 @@
import json
import requests
import time
import hmac
import hashlib
import base64
import urllib.parse
import traceback
from typing import Dict, Any, List, Optional, Union
class DingTalkBot:
"""钉钉机器人工具类,用于发送分析结果到钉钉群"""
def __init__(self, webhook_url: str, secret: Optional[str] = None):
"""
初始化钉钉机器人
Args:
webhook_url: 钉钉机器人的webhook地址
secret: 钉钉机器人的签名密钥如果启用了安全设置
"""
self.webhook_url = webhook_url
self.secret = secret
def _sign(self) -> str:
"""
生成钉钉机器人签名
Returns:
签名后的URL
"""
if not self.secret:
return self.webhook_url
timestamp = str(round(time.time() * 1000))
string_to_sign = f"{timestamp}\n{self.secret}"
hmac_code = hmac.new(
self.secret.encode(),
string_to_sign.encode(),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
def send_text(self, content: str, at_mobiles: List[str] = None, at_all: bool = False) -> Dict[str, Any]:
"""
发送文本消息
Args:
content: 消息内容
at_mobiles: @的手机号列表
at_all: 是否@所有人
Returns:
接口返回结果
"""
data = {
"msgtype": "text",
"text": {"content": content},
"at": {
"atMobiles": at_mobiles if at_mobiles else [],
"isAtAll": at_all
}
}
return self._post(data)
def send_markdown(self, title: str, text: str, at_mobiles: List[str] = None, at_all: bool = False) -> Dict[str, Any]:
"""
发送Markdown消息
Args:
title: 消息标题
text: Markdown格式的消息内容
at_mobiles: @的手机号列表
at_all: 是否@所有人
Returns:
接口返回结果
"""
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": text
},
"at": {
"atMobiles": at_mobiles if at_mobiles else [],
"isAtAll": at_all
}
}
return self._post(data)
def _post(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
发送消息到钉钉
Args:
data: 消息数据
Returns:
接口返回结果
"""
try:
webhook = self._sign()
headers = {'Content-Type': 'application/json; charset=utf-8'}
response = requests.post(webhook, json=data, headers=headers)
return response.json()
except Exception as e:
print(f"发送钉钉消息时出错: {e}")
traceback.print_exc()
return {"errcode": -1, "errmsg": str(e)}
def format_analysis_result(self, symbol: str, analysis_result: Dict[str, Any]) -> str:
"""
格式化分析结果为Markdown格式
Args:
symbol: 交易对符号
analysis_result: 分析结果
Returns:
格式化后的Markdown文本
"""
try:
if not analysis_result or 'error' in analysis_result:
return f"### {symbol} 分析结果错误\n\n获取分析结果时出错: {analysis_result.get('error', '未知错误')}"
# 提取关键信息
market_trend = analysis_result.get('market_trend', '未知')
support_levels = analysis_result.get('support_levels', [])
if isinstance(support_levels, list):
support_levels_str = ''.join([str(level) for level in support_levels])
else:
support_levels_str = str(support_levels)
resistance_levels = analysis_result.get('resistance_levels', [])
if isinstance(resistance_levels, list):
resistance_levels_str = ''.join([str(level) for level in resistance_levels])
else:
resistance_levels_str = str(resistance_levels)
volume_analysis = analysis_result.get('volume_analysis', '未知')
market_sentiment = analysis_result.get('market_sentiment', '未知')
summary = analysis_result.get('summary', '无摘要')
# 根据市场趋势设置颜色标志
if '' in market_trend or 'bull' in str(market_trend).lower():
trend_icon = "🟢"
elif '' in market_trend or 'bear' in str(market_trend).lower():
trend_icon = "🔴"
else:
trend_icon = "🟡"
# 构建Markdown文本
markdown = f"""### {trend_icon} {symbol} 市场分析
**市场趋势**: {market_trend}
**支撑位**: {support_levels_str}
**阻力位**: {resistance_levels_str}
**交易量分析**: {volume_analysis}
**市场情绪**: {market_sentiment}
**总结**: {summary}
*分析时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化分析结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化分析结果出错\n\n{str(e)}"
def format_prediction_result(self, symbol: str, prediction_result: Dict[str, Any]) -> str:
"""
格式化预测结果为Markdown格式
Args:
symbol: 交易对符号
prediction_result: 预测结果
Returns:
格式化后的Markdown文本
"""
try:
if not prediction_result or 'error' in prediction_result:
return f"### {symbol} 预测结果错误\n\n获取预测结果时出错: {prediction_result.get('error', '未知错误')}"
# 提取关键信息
current_price = prediction_result.get('current_price', '未知')
prediction_24h = prediction_result.get('prediction_24h', {})
prediction_7d = prediction_result.get('prediction_7d', {})
prediction_30d = prediction_result.get('prediction_30d', {})
key_factors = prediction_result.get('key_factors', [])
if isinstance(key_factors, list):
key_factors_str = '\n'.join([f"- {factor}" for factor in key_factors])
else:
key_factors_str = str(key_factors)
risk_assessment = prediction_result.get('risk_assessment', '未知')
# 格式化预测数据
def format_prediction(pred_data):
if not pred_data:
return "无数据"
price_range = pred_data.get('price_range', '未知')
trend = pred_data.get('trend', '未知')
confidence = pred_data.get('confidence', '未知')
# 根据趋势设置图标
if '上升' in str(trend) or '增长' in str(trend) or 'up' in str(trend).lower():
trend_icon = "📈"
elif '下降' in str(trend) or '下跌' in str(trend) or 'down' in str(trend).lower():
trend_icon = "📉"
else:
trend_icon = "📊"
return f"{trend_icon} **{trend}** (价格区间: {price_range}, 置信度: {confidence})"
# 构建Markdown文本
markdown = f"""### {symbol} 价格预测
**当前价格**: {current_price}
**24小时预测**: {format_prediction(prediction_24h)}
**7天预测**: {format_prediction(prediction_7d)}
**30天预测**: {format_prediction(prediction_30d)}
**关键影响因素**:
{key_factors_str}
**风险评估**: {risk_assessment}
*预测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化预测结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化预测结果出错\n\n{str(e)}"
def format_strategy_result(self, symbol: str, strategy_result: Dict[str, Any]) -> str:
"""
格式化策略结果为Markdown格式
Args:
symbol: 交易对符号
strategy_result: 策略结果
Returns:
格式化后的Markdown文本
"""
try:
if not strategy_result or 'error' in strategy_result:
return f"### {symbol} 策略结果错误\n\n获取策略结果时出错: {strategy_result.get('error', '未知错误')}"
# 提取关键信息
risk_level = strategy_result.get('risk_level', '未知')
position = strategy_result.get('position', '未知')
entry_points = strategy_result.get('entry_points', [])
if isinstance(entry_points, list):
entry_points_str = ''.join([str(point) for point in entry_points])
else:
entry_points_str = str(entry_points)
exit_points = strategy_result.get('exit_points', [])
if isinstance(exit_points, list):
exit_points_str = ''.join([str(point) for point in exit_points])
else:
exit_points_str = str(exit_points)
stop_loss = strategy_result.get('stop_loss', '未知')
take_profit = strategy_result.get('take_profit', '未知')
time_frame = strategy_result.get('time_frame', '未知')
strategy_type = strategy_result.get('strategy_type', '未知')
reasoning = strategy_result.get('reasoning', '无理由')
# 根据建议仓位设置图标
if '' in position or 'buy' in str(position).lower():
position_icon = "🟢"
elif '' in position or 'sell' in str(position).lower():
position_icon = "🔴"
else:
position_icon = ""
# 根据风险等级设置图标
if 'low' in str(risk_level).lower() or '' in str(risk_level):
risk_icon = "🟢"
elif 'medium' in str(risk_level).lower() or '' in str(risk_level):
risk_icon = "🟡"
else:
risk_icon = "🔴"
# 构建Markdown文本
markdown = f"""### {symbol} 交易策略
**建议操作**: {position_icon} {position}
**风险等级**: {risk_icon} {risk_level}
**策略类型**: {strategy_type}
**时间框架**: {time_frame}
**入场点**: {entry_points_str}
**出场点**: {exit_points_str}
**止损位**: {stop_loss}
**止盈位**: {take_profit}
**策略理由**:
{reasoning}
*策略生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
return markdown
except Exception as e:
print(f"格式化策略结果时出错: {e}")
traceback.print_exc()
return f"### {symbol} 格式化策略结果出错\n\n{str(e)}"
def send_analysis_report(self, symbol: str, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
"""
发送完整分析报告分析+预测+策略
Args:
symbol: 交易对符号
analysis_data: 包含分析预测和策略的数据
Returns:
接口返回结果
"""
try:
analysis_result = analysis_data.get('analysis', {})
prediction_result = analysis_data.get('prediction', {})
strategy_result = analysis_data.get('strategy', {})
# 获取市场趋势,用于设置标题图标
market_trend = ''
if analysis_result and 'market_trend' in analysis_result:
market_trend = analysis_result['market_trend']
if '' in str(market_trend) or 'bull' in str(market_trend).lower():
title_icon = "🟢"
elif '' in str(market_trend) or 'bear' in str(market_trend).lower():
title_icon = "🔴"
else:
title_icon = "🟡"
# 获取建议操作
position = '未知'
if strategy_result and 'position' in strategy_result:
position = strategy_result['position']
# 构建标题
title = f"{title_icon} {symbol} 加密货币分析报告 | 建议: {position}"
# 构建完整报告内容
markdown_text = f"# {symbol} 加密货币AI分析报告\n\n"
# 添加分析结果
markdown_text += "## 一、市场分析\n\n"
markdown_text += self.format_analysis_result(symbol, analysis_result)
markdown_text += "\n\n"
# 添加预测结果
markdown_text += "## 二、价格预测\n\n"
markdown_text += self.format_prediction_result(symbol, prediction_result)
markdown_text += "\n\n"
# 添加策略结果
markdown_text += "## 三、交易策略\n\n"
markdown_text += self.format_strategy_result(symbol, strategy_result)
# 发送Markdown消息
return self.send_markdown(title, markdown_text)
except Exception as e:
print(f"发送分析报告时出错: {e}")
traceback.print_exc()
error_msg = f"### {symbol} 分析报告生成错误\n\n{str(e)}"
return self.send_markdown(f"⚠️ {symbol} 分析报告错误", error_msg)