diff --git a/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc b/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc index 6900ef4..6f5b755 100644 Binary files a/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc and b/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc differ diff --git a/cryptoai/agents/crypto_agent.py b/cryptoai/agents/crypto_agent.py index 209918d..aab67de 100644 --- a/cryptoai/agents/crypto_agent.py +++ b/cryptoai/agents/crypto_agent.py @@ -2,7 +2,7 @@ import os import sys import pandas as pd import numpy as np -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List, Optional, Tuple import datetime import time import json @@ -76,9 +76,6 @@ class CryptoAgent: # 支持的策略 self.strategies = self.agent_config['strategies'] - - # 分析结果缓存 - self.analysis_cache = {} def fetch_historical_data(self, symbol: str, days: int = 30) -> pd.DataFrame: """ @@ -136,151 +133,56 @@ class CryptoAgent: return processed_data - def analyze_market(self, symbol: str, data: pd.DataFrame = None) -> Dict[str, Any]: + def _format_market_data(self, market_data: Dict[str, Any]) -> str: """ - 分析市场 + 格式化市场数据为适合大模型的格式 Args: - symbol: 交易对符号,例如 'BTCUSDT' - data: 要分析的数据,如果为None,则获取最新数据 + market_data: 市场数据 Returns: - 市场分析结果 + 格式化的数据字符串 """ - print(f"分析{symbol}的市场情况...") - - if data is None: - # 尝试加载最新的处理后数据 - latest_data_file = self.data_processor.get_latest_data_file(symbol, data_type='processed') - - if latest_data_file: - data = self.data_processor.load_data(latest_data_file) - else: - # 如果没有找到处理后的数据,则获取新数据 - raw_data = self.fetch_historical_data(symbol, days=self.data_config['historical_days']) - data = self.process_data(symbol, raw_data) - - if data.empty: - print(f"没有可用的{symbol}数据进行分析") - return {} - - # 准备市场数据 - market_data = { - "symbol": symbol, - "current_price": float(data['close'].iloc[-1]), - "price_change_24h": float(data['close'].iloc[-1] - data['close'].iloc[-24]), - "price_change_percentage_24h": float((data['close'].iloc[-1] - data['close'].iloc[-24]) / data['close'].iloc[-24] * 100), - "historical_prices": data['close'].tail(100).tolist(), - "volumes": data['volume'].tail(100).tolist(), - "technical_indicators": { - "rsi": float(data['RSI'].iloc[-1]), - "macd": float(data['MACD'].iloc[-1]), - "macd_signal": float(data['MACD_Signal'].iloc[-1]), - "bollinger_upper": float(data['Bollinger_Upper'].iloc[-1]), - "bollinger_lower": float(data['Bollinger_Lower'].iloc[-1]), - "ma5": float(data['MA5'].iloc[-1]), - "ma10": float(data['MA10'].iloc[-1]), - "ma20": float(data['MA20'].iloc[-1]), - "ma50": float(data['MA50'].iloc[-1]), - "atr": float(data['ATR'].iloc[-1]) - } - } - - # 获取当前市场订单簿 - order_book = self.binance_api.get_order_book(symbol, limit=10) - - if order_book: - market_data["order_book"] = { - "bids": order_book.get('bids', [])[:5], - "asks": order_book.get('asks', [])[:5] - } - - # 使用DeepSeek进行市场分析 - analysis_result = self.deepseek_api.analyze_market_data(market_data) - - # 缓存分析结果 - self.analysis_cache[symbol] = analysis_result - - return analysis_result + # 这里可以根据实际情况调整格式化方式 + return json.dumps(market_data, indent=2) - def predict_price(self, symbol: str) -> Dict[str, Any]: + def _build_market_analysis_prompt(self, formatted_data: str) -> str: """ - 预测价格 + 构建市场分析提示词 Args: - symbol: 交易对符号,例如 'BTCUSDT' + formatted_data: 格式化的市场数据 Returns: - 价格预测结果 + 提示词 """ - print(f"预测{symbol}的价格趋势...") - - # 尝试加载最新的处理后数据 - latest_data_file = self.data_processor.get_latest_data_file(symbol, data_type='processed') - - if not latest_data_file: - # 如果没有找到处理后的数据,则获取新数据 - raw_data = self.fetch_historical_data(symbol, days=self.data_config['historical_days']) - data = self.process_data(symbol, raw_data) - else: - data = self.data_processor.load_data(latest_data_file) - - if data.empty: - print(f"没有可用的{symbol}数据进行预测") - return {} - - # 准备历史数据 - historical_data = { - "symbol": symbol, - "current_price": float(data['close'].iloc[-1]), - "price_history": data[['open', 'high', 'low', 'close']].tail(90).to_dict(orient='records'), - "volume_history": data['volume'].tail(90).tolist(), - "technical_indicators": { - "rsi_history": data['RSI'].tail(30).tolist(), - "macd_history": data['MACD'].tail(30).tolist(), - "ma5_history": data['MA5'].tail(30).tolist(), - "ma20_history": data['MA20'].tail(30).tolist(), - "ma50_history": data['MA50'].tail(30).tolist() - } - } - - # 使用DeepSeek进行价格预测 - prediction_result = self.deepseek_api.predict_price_trend(symbol, historical_data) - - return prediction_result - - def generate_strategy(self, symbol: str, analysis_result: Dict[str, Any] = None) -> Dict[str, Any]: - """ - 生成交易策略 - - Args: - symbol: 交易对符号,例如 'BTCUSDT' - analysis_result: 分析结果,如果为None,则使用缓存或重新分析 - - Returns: - 交易策略 - """ - print(f"为{symbol}生成交易策略...") - - if analysis_result is None: - # 使用缓存的分析结果或重新分析 - if symbol in self.analysis_cache: - analysis_result = self.analysis_cache[symbol] - else: - analysis_result = self.analyze_market(symbol) - - if not analysis_result: - print(f"没有可用的{symbol}分析结果来生成策略") - return {} - - # 使用DeepSeek生成交易策略 - strategy = self.deepseek_api.generate_trading_strategy( - symbol=symbol, - analysis_result=analysis_result, - risk_level=self.risk_level - ) - - return strategy + return f"""请对以下加密货币市场K线数据进行深入分析,并给出交易建议。请使用中文回复。 + +数据: +{formatted_data} + +请严格按照以下步骤分析: +1. 分析主要技术指标(包括RSI、MACD、布林带、均线等),判断当前市场趋势 +2. 基于K线数据和斐波那契回调水平,确定关键支撑位和压力位 +3. 根据技术分析给出清晰的操作建议 +4. 评估当前操作建议的紧迫性 + +请以JSON格式回复,仅包含以下字段: +- market_trend: 市场趋势 (牛市, 熊市, 震荡) +- technical_analysis: 技术指标详细分析 (重点分析RSI、MACD、布林带、均线交叉等情况) +- support_levels: 基于斐波那契回调的支撑位列表(标明各个支撑位对应的斐波那契水平,例如0.382、0.5、0.618等) +- resistance_levels: 基于斐波那契回调的阻力位列表(同样标明对应水平) +- volume_analysis: 交易量分析,重点关注量价关系 +- recommendation: 操作建议 (买入、卖出或等待) +- entry_points: 推荐入场点位 +- exit_points: 推荐出场点位 +- stop_loss: 建议止损位 +- take_profit: 建议止盈位 +- urgency_level: 操作紧迫性评级 (1-5,1为最低紧迫性,5为最高紧迫性) +- urgency_reason: 紧迫性评级的原因说明 +- summary: 分析总结(不超过50字) + +请确保回复为有效的JSON格式,分析要精准专业。""" def analyze_all_symbols(self) -> Dict[str, Dict[str, Any]]: """ @@ -300,20 +202,48 @@ class CryptoAgent: if not raw_data.empty: processed_data = self.process_data(symbol, raw_data) - # 分析市场 - analysis_result = self.analyze_market(symbol, processed_data) + # 准备市场数据 + market_data = { + "symbol": symbol, + "current_price": float(processed_data['close'].iloc[-1]), + "price_change_24h": float(processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]), + "price_change_percentage_24h": float((processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]) / processed_data['close'].iloc[-24] * 100), + "historical_prices": processed_data['close'].tail(100).tolist(), + "volumes": processed_data['volume'].tail(100).tolist(), + "technical_indicators": { + "rsi": float(processed_data['RSI'].iloc[-1]), + "macd": float(processed_data['MACD'].iloc[-1]), + "macd_signal": float(processed_data['MACD_Signal'].iloc[-1]), + "bollinger_upper": float(processed_data['Bollinger_Upper'].iloc[-1]), + "bollinger_lower": float(processed_data['Bollinger_Lower'].iloc[-1]), + "ma5": float(processed_data['MA5'].iloc[-1]), + "ma10": float(processed_data['MA10'].iloc[-1]), + "ma20": float(processed_data['MA20'].iloc[-1]), + "ma50": float(processed_data['MA50'].iloc[-1]), + "atr": float(processed_data['ATR'].iloc[-1]) + }, + "klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(30).to_dict('records') + } - # 预测价格 - prediction_result = self.predict_price(symbol) + # 将市场数据格式化为适合大模型的格式 + formatted_data = self._format_market_data(market_data) - # 生成策略 - strategy = self.generate_strategy(symbol, analysis_result) + # 构建提示词 + prompt = self._build_market_analysis_prompt(formatted_data) + + # 调用API获取分析和交易建议 + response, usage = self.deepseek_api.call_model(prompt, task_type="交易分析", symbol=symbol) + + # 解析响应 + analysis_result = self.deepseek_api.extract_json_from_response(response) + + # 添加token使用信息 + if usage: + analysis_result["_token_usage"] = usage # 整合结果 results[symbol] = { "analysis": analysis_result, - "prediction": prediction_result, - "strategy": strategy, "timestamp": datetime.now().isoformat() } @@ -335,137 +265,35 @@ class CryptoAgent: return results - def execute_strategy(self, symbol: str, strategy: Dict[str, Any]) -> Dict[str, Any]: + def send_notifications(self, symbol: str, analysis_data: Dict[str, Any]) -> bool: """ - 执行交易策略 - - Args: - symbol: 交易对符号,例如 'BTCUSDT' - strategy: 交易策略 - - Returns: - 执行结果 - """ - if not strategy or 'position' not in strategy: - print(f"无法执行{symbol}的策略,策略数据不完整") - return {"status": "error", "message": "策略数据不完整"} - - print(f"执行{symbol}的交易策略...") - - # 获取当前价格 - current_price_info = self.binance_api.get_ticker(symbol) - - if not current_price_info: - print(f"无法获取{symbol}的当前价格") - return {"status": "error", "message": "无法获取当前价格"} - - current_price = float(current_price_info.get('price', 0)) - - # 根据策略决定操作 - position = strategy.get('position', '').upper() - - result = { - "symbol": symbol, - "strategy": strategy, - "current_price": current_price, - "timestamp": datetime.now().isoformat() - } - - # 简单策略执行逻辑 - if position == 'BUY' or '买' in position: - # 示例:下单买入 - # 在实际应用中,这里应该有更复杂的仓位管理和风险管理逻辑 - quantity = 0.01 # 示例数量,实际应用中应该基于资金和风险计算 - - order_result = self.binance_api.place_order( - symbol=symbol, - side='BUY', - type='MARKET', - quantity=quantity - ) - - result["action"] = "BUY" - result["order_result"] = order_result - - # 如果钉钉机器人已启用,发送交易通知 - if self.dingtalk_bot: - self.send_trade_notification(symbol, "买入", quantity, current_price, strategy) - - elif position == 'SELL' or '卖' in position: - # 示例:下单卖出 - quantity = 0.01 # 示例数量 - - order_result = self.binance_api.place_order( - symbol=symbol, - side='SELL', - type='MARKET', - quantity=quantity - ) - - result["action"] = "SELL" - result["order_result"] = order_result - - # 如果钉钉机器人已启用,发送交易通知 - if self.dingtalk_bot: - self.send_trade_notification(symbol, "卖出", quantity, current_price, strategy) - - else: - # 持有或其他策略 - result["action"] = "HOLD" - result["message"] = "根据策略保持当前仓位" - - print(f"执行{symbol}策略完成:{result['action']}") - return result - - def send_trade_notification(self, symbol: str, action: str, quantity: float, price: float, strategy: Dict[str, Any]) -> None: - """ - 发送交易通知到钉钉 + 发送分析结果通知 Args: symbol: 交易对符号 - action: 交易操作(买入或卖出) - quantity: 交易数量 - price: 交易价格 - strategy: 交易策略 + analysis_data: 分析数据 + + Returns: + 发送是否成功 """ if not self.dingtalk_bot: - return + print(f"钉钉通知未启用,跳过发送 {symbol} 的分析结果") + return False try: - # 设置图标 - if action == "买入": - icon = "🟢" + # 使用已初始化的钉钉机器人实例发送完整分析报告 + response = self.dingtalk_bot.send_analysis_report(symbol, analysis_data) + + if response.get('errcode') == 0: + print(f"成功发送 {symbol} 分析结果到钉钉") + return True else: - icon = "🔴" - - # 获取策略理由 - reasoning = strategy.get('reasoning', '无理由') - - # 构建通知内容 - title = f"{icon} {symbol} {action}交易执行通知" - text = f"""### {symbol} {action}交易已执行 - -**交易详情**: -- 操作: {icon} **{action}** -- 数量: {quantity} -- 价格: {price} -- 总额: {quantity * price} {self.quote_currency} - -**交易理由**: -{reasoning} - -*交易时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* - """ - - # 发送通知 - at_mobiles = self.dingtalk_config.get('at_mobiles', []) - at_all = self.dingtalk_config.get('at_all', False) - self.dingtalk_bot.send_markdown(title, text, at_mobiles, at_all) - - print(f"{symbol} {action}交易通知已发送") - + print(f"发送 {symbol} 分析结果到钉钉失败: {response}") + return False + except Exception as e: - print(f"发送交易通知时出错: {e}") + print(f"发送钉钉通知时出错: {e}") + return False def run_analysis_cycle(self) -> Dict[str, Any]: """ @@ -562,34 +390,4 @@ class CryptoAgent: import traceback traceback.print_exc() # 发生错误时也尝试导出token使用情况 - self._export_token_usage() - - def send_notifications(self, symbol: str, analysis_data: Dict[str, Any]) -> bool: - """ - 发送分析结果通知 - - Args: - symbol: 交易对符号 - analysis_data: 分析数据 - - Returns: - 发送是否成功 - """ - if not self.dingtalk_bot: - print(f"钉钉通知未启用,跳过发送 {symbol} 的分析结果") - return False - - try: - # 使用已初始化的钉钉机器人实例发送完整分析报告 - response = self.dingtalk_bot.send_analysis_report(symbol, analysis_data) - - if response.get('errcode') == 0: - print(f"成功发送 {symbol} 分析结果到钉钉") - return True - else: - print(f"发送 {symbol} 分析结果到钉钉失败: {response}") - return False - - except Exception as e: - print(f"发送钉钉通知时出错: {e}") - return False \ No newline at end of file + self._export_token_usage() \ No newline at end of file diff --git a/cryptoai/api/__pycache__/deepseek_api.cpython-313.pyc b/cryptoai/api/__pycache__/deepseek_api.cpython-313.pyc index dcf42c5..ea26e90 100644 Binary files a/cryptoai/api/__pycache__/deepseek_api.cpython-313.pyc and b/cryptoai/api/__pycache__/deepseek_api.cpython-313.pyc differ diff --git a/cryptoai/api/deepseek_api.py b/cryptoai/api/deepseek_api.py index ea5f75a..729211f 100644 --- a/cryptoai/api/deepseek_api.py +++ b/cryptoai/api/deepseek_api.py @@ -17,7 +17,7 @@ logging.basicConfig( ) class DeepSeekAPI: - """DeepSeek API交互类,用于进行市场分析和预测""" + """DeepSeek API交互类,用于进行大语言模型调用""" def __init__(self, api_key: str, model: str = "deepseek-moe-16b-chat"): """ @@ -46,89 +46,112 @@ class DeepSeekAPI: # 创建日志记录器 self.logger = logging.getLogger("DeepSeekAPI") - def analyze_market_data(self, market_data: Dict[str, Any]) -> Dict[str, Any]: + def call_model(self, prompt: str, system_prompt: str = None, task_type: str = "未知任务", symbol: str = "未知", temperature: float = 0.2, max_tokens: int = 2000) -> Tuple[Dict[str, Any], Dict[str, Any]]: """ - 分析市场数据 + 调用DeepSeek大语言模型 Args: - market_data: 包含市场数据的字典,例如价格、交易量等 + prompt: 用户提示词 + system_prompt: 系统提示词,如果为None则使用默认值 + task_type: 任务类型,用于记录 + symbol: 交易对符号,用于记录 + temperature: 采样温度,控制输出随机性 + max_tokens: 最大生成token数 Returns: - 分析结果 + (API响应, token使用信息) """ - # 将市场数据格式化为适合大模型的格式 - formatted_data = self._format_market_data(market_data) + if system_prompt is None: + system_prompt = "你是一个专业的加密货币分析助手,擅长分析市场趋势、预测价格走向和提供交易建议。请始终使用中文回复,并确保输出格式规范的JSON。" + + usage_info = {} - # 构建提示词 - prompt = self._build_market_analysis_prompt(formatted_data) + try: + endpoint = f"{self.base_url}/chat/completions" + + payload = { + "model": self.model, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt} + ], + "temperature": temperature, + "max_tokens": max_tokens + } + + start_time = time.time() + response = requests.post(endpoint, headers=self.headers, json=payload) + response.raise_for_status() + response_data = response.json() + end_time = time.time() + + # 记录token使用情况 + if 'usage' in response_data: + prompt_tokens = response_data['usage'].get('prompt_tokens', 0) + completion_tokens = response_data['usage'].get('completion_tokens', 0) + total_tokens = response_data['usage'].get('total_tokens', 0) + + usage_info = { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + "task_type": task_type, + "symbol": symbol, + "model": self.model, + "timestamp": datetime.datetime.now().isoformat(), + "duration_seconds": round(end_time - start_time, 2) + } + + # 更新总计 + self.token_usage["total_prompt_tokens"] += prompt_tokens + self.token_usage["total_completion_tokens"] += completion_tokens + self.token_usage["total_tokens"] += total_tokens + self.token_usage["calls"].append(usage_info) + + # 记录到日志 + self.logger.info( + f"DeepSeek API调用 - 任务: {task_type}, 符号: {symbol}, " + f"输入tokens: {prompt_tokens}, 输出tokens: {completion_tokens}, " + f"总tokens: {total_tokens}, 耗时: {round(end_time - start_time, 2)}秒" + ) + + return response_data, usage_info - # 调用API获取分析 - response, usage = self._call_api(prompt, task_type="市场分析", symbol=market_data.get("symbol", "未知")) - - # 解析响应 - result = self._parse_analysis_response(response) - - # 添加token使用信息 - if usage: - result["_token_usage"] = usage - - return result + except Exception as e: + error_msg = f"调用DeepSeek API时出错: {e}" + self.logger.error(error_msg) + return {}, usage_info - def predict_price_trend(self, symbol: str, historical_data: Dict[str, Any]) -> Dict[str, Any]: + def extract_json_from_response(self, response: Dict[str, Any]) -> Dict[str, Any]: """ - 预测价格趋势 + 从响应中提取JSON数据 Args: - symbol: 交易对符号,例如 'BTCUSDT' - historical_data: 历史数据 + response: API响应 Returns: - 预测结果 + 提取的JSON数据 """ - # 格式化历史数据 - formatted_data = self._format_historical_data(symbol, historical_data) - - # 构建提示词 - prompt = self._build_price_prediction_prompt(symbol, formatted_data) - - # 调用API获取预测 - response, usage = self._call_api(prompt, task_type="价格预测", symbol=symbol) - - # 解析响应 - result = self._parse_prediction_response(response) - - # 添加token使用信息 - if usage: - result["_token_usage"] = usage + try: + if 'choices' in response and len(response['choices']) > 0: + content = response['choices'][0]['message']['content'] + + # 尝试从响应中提取JSON + start_idx = content.find('{') + end_idx = content.rfind('}') + 1 + + if start_idx != -1 and end_idx != -1: + json_str = content[start_idx:end_idx] + return json.loads(json_str) + + return {"error": "无法从响应中提取JSON", "raw_content": content} - return result - - def generate_trading_strategy(self, symbol: str, analysis_result: Dict[str, Any], risk_level: str) -> Dict[str, Any]: - """ - 生成交易策略 + return {"error": "API响应格式不正确", "raw_response": response} - Args: - symbol: 交易对符号,例如 'BTCUSDT' - analysis_result: 分析结果 - risk_level: 风险等级,'low', 'medium', 'high' - - Returns: - 交易策略 - """ - # 构建提示词 - prompt = self._build_trading_strategy_prompt(symbol, analysis_result, risk_level) - - # 调用API获取策略 - response, usage = self._call_api(prompt, task_type="交易策略", symbol=symbol) - - # 解析响应 - result = self._parse_strategy_response(response) - - # 添加token使用信息 - if usage: - result["_token_usage"] = usage - - return result + except Exception as e: + error_msg = f"解析响应时出错: {e}" + self.logger.error(error_msg) + return {"error": str(e), "raw_response": response} def get_token_usage_stats(self) -> Dict[str, Any]: """ @@ -209,255 +232,4 @@ class DeepSeekAPI: except Exception as e: error_msg = f"导出Token使用数据时出错: {e}" self.logger.error(error_msg) - return "" - - def _call_api(self, prompt: str, task_type: str = "未知任务", symbol: str = "未知") -> Tuple[Dict[str, Any], Dict[str, Any]]: - """ - 调用DeepSeek API - - Args: - prompt: 提示词 - task_type: 任务类型 - symbol: 交易对符号 - - Returns: - (API响应, token使用信息) - """ - usage_info = {} - - try: - endpoint = f"{self.base_url}/chat/completions" - - payload = { - "model": self.model, - "messages": [ - {"role": "system", "content": "你是一个专业的加密货币分析助手,擅长分析市场趋势、预测价格走向和提供交易建议。请始终使用中文回复,并确保输出格式规范的JSON。"}, - {"role": "user", "content": prompt} - ], - "temperature": 0.2, # 低温度使输出更加确定性 - "max_tokens": 2000 - } - - start_time = time.time() - response = requests.post(endpoint, headers=self.headers, json=payload) - response.raise_for_status() - response_data = response.json() - end_time = time.time() - - # 记录token使用情况 - if 'usage' in response_data: - prompt_tokens = response_data['usage'].get('prompt_tokens', 0) - completion_tokens = response_data['usage'].get('completion_tokens', 0) - total_tokens = response_data['usage'].get('total_tokens', 0) - - usage_info = { - "prompt_tokens": prompt_tokens, - "completion_tokens": completion_tokens, - "total_tokens": total_tokens, - "task_type": task_type, - "symbol": symbol, - "model": self.model, - "timestamp": datetime.datetime.now().isoformat(), - "duration_seconds": round(end_time - start_time, 2) - } - - # 更新总计 - self.token_usage["total_prompt_tokens"] += prompt_tokens - self.token_usage["total_completion_tokens"] += completion_tokens - self.token_usage["total_tokens"] += total_tokens - self.token_usage["calls"].append(usage_info) - - # 记录到日志 - self.logger.info( - f"DeepSeek API调用 - 任务: {task_type}, 符号: {symbol}, " - f"输入tokens: {prompt_tokens}, 输出tokens: {completion_tokens}, " - f"总tokens: {total_tokens}, 耗时: {round(end_time - start_time, 2)}秒" - ) - - return response_data, usage_info - - except Exception as e: - error_msg = f"调用DeepSeek API时出错: {e}" - self.logger.error(error_msg) - return {}, usage_info - - def _format_market_data(self, market_data: Dict[str, Any]) -> str: - """ - 格式化市场数据为适合大模型的格式 - - Args: - market_data: 市场数据 - - Returns: - 格式化的数据字符串 - """ - # 这里可以根据实际情况调整格式化方式 - return json.dumps(market_data, indent=2) - - def _format_historical_data(self, symbol: str, historical_data: Dict[str, Any]) -> str: - """ - 格式化历史数据为适合大模型的格式 - - Args: - symbol: 交易对符号 - historical_data: 历史数据 - - Returns: - 格式化的数据字符串 - """ - # 可以根据实际情况调整格式化方式 - return json.dumps(historical_data, indent=2) - - def _build_market_analysis_prompt(self, formatted_data: str) -> str: - """ - 构建市场分析提示词 - - Args: - formatted_data: 格式化的市场数据 - - Returns: - 提示词 - """ - return f"""请分析以下加密货币市场数据,并提供详细的市场分析。请使用中文回复。 - -数据: -{formatted_data} - -请包括以下内容: -1. 市场总体趋势 -2. 主要支撑位和阻力位 -3. 交易量分析 -4. 市场情绪评估 -5. 关键技术指标解读(如RSI、MACD等) - -请以JSON格式回复,包含以下字段: -- market_trend: 市场趋势 (牛市, 熊市, 震荡) -- support_levels: 支撑位列表 -- resistance_levels: 阻力位列表 -- volume_analysis: 交易量分析 -- market_sentiment: 市场情绪 -- technical_indicators: 技术指标分析 -- summary: 总结 - -请确保回复为有效的JSON格式,并使用中文进行分析。""" - - def _build_price_prediction_prompt(self, symbol: str, formatted_data: str) -> str: - """ - 构建价格预测提示词 - - Args: - symbol: 交易对符号 - formatted_data: 格式化的历史数据 - - Returns: - 提示词 - """ - return f"""请基于以下{symbol}的历史数据,预测未来24小时、7天和30天的价格走势。请使用中文回复。 - -历史数据: -{formatted_data} - -请考虑市场趋势、技术指标、历史模式和当前市场情况,提供详细的预测分析。 - -请以JSON格式回复,包含以下字段: -- symbol: 交易对符号 -- current_price: 当前价格 -- prediction_24h: 24小时预测 (包含 price_range价格区间, trend趋势, confidence置信度) -- prediction_7d: 7天预测 (包含 price_range价格区间, trend趋势, confidence置信度) -- prediction_30d: 30天预测 (包含 price_range价格区间, trend趋势, confidence置信度) -- key_factors: 影响预测的关键因素 -- risk_assessment: 风险评估 - -请确保回复为有效的JSON格式,并使用中文进行分析。""" - - def _build_trading_strategy_prompt(self, symbol: str, analysis_result: Dict[str, Any], risk_level: str) -> str: - """ - 构建交易策略提示词 - - Args: - symbol: 交易对符号 - analysis_result: 分析结果 - risk_level: 风险等级 - - Returns: - 提示词 - """ - analysis_json = json.dumps(analysis_result, indent=2) - - return f"""请基于以下{symbol}的市场分析结果,生成一个风险等级为{risk_level}的交易策略。请使用中文回复。 - -分析结果: -{analysis_json} - -请考虑市场趋势、技术指标、风险等级和当前市场情况,提供详细的交易策略。 - -请以JSON格式回复,包含以下字段: -- symbol: 交易对符号 -- risk_level: 风险等级 (low低风险, medium中风险, high高风险) -- position: 建议仓位 (买入、卖出、持有) -- entry_points: 入场点列表 -- exit_points: 出场点列表 -- stop_loss: 止损位 -- take_profit: 止盈位 -- time_frame: 建议的交易时间框架 -- strategy_type: 策略类型 (例如:趋势跟踪、反转、突破等) -- reasoning: 策略推理过程 - -请确保回复为有效的JSON格式,并使用中文进行分析。""" - - def _parse_analysis_response(self, response: Dict[str, Any]) -> Dict[str, Any]: - """ - 解析分析响应 - - Args: - response: API响应 - - Returns: - 解析后的分析结果 - """ - try: - if 'choices' in response and len(response['choices']) > 0: - content = response['choices'][0]['message']['content'] - - # 尝试从响应中提取JSON - start_idx = content.find('{') - end_idx = content.rfind('}') + 1 - - if start_idx != -1 and end_idx != -1: - json_str = content[start_idx:end_idx] - return json.loads(json_str) - - return {"error": "无法从响应中提取JSON", "raw_content": content} - - return {"error": "API响应格式不正确", "raw_response": response} - - except Exception as e: - error_msg = f"解析分析响应时出错: {e}" - self.logger.error(error_msg) - return {"error": str(e), "raw_response": response} - - def _parse_prediction_response(self, response: Dict[str, Any]) -> Dict[str, Any]: - """ - 解析预测响应 - - Args: - response: API响应 - - Returns: - 解析后的预测结果 - """ - # 与_parse_analysis_response相同的实现 - return self._parse_analysis_response(response) - - def _parse_strategy_response(self, response: Dict[str, Any]) -> Dict[str, Any]: - """ - 解析策略响应 - - Args: - response: API响应 - - Returns: - 解析后的策略结果 - """ - # 与_parse_analysis_response相同的实现 - return self._parse_analysis_response(response) \ No newline at end of file + return "" \ No newline at end of file diff --git a/cryptoai/config/config.yaml b/cryptoai/config/config.yaml index 278a98b..722c953 100644 --- a/cryptoai/config/config.yaml +++ b/cryptoai/config/config.yaml @@ -12,11 +12,11 @@ deepseek: # 加密货币设置 crypto: base_currencies: - # - "BTC" + - "BTC" # - "ETH" # - "BNB" # - "SOL" - - "SUI" + # - "SUI" quote_currency: "USDT" time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d diff --git a/cryptoai/models/__pycache__/data_processor.cpython-313.pyc b/cryptoai/models/__pycache__/data_processor.cpython-313.pyc index af93817..6c33b70 100644 Binary files a/cryptoai/models/__pycache__/data_processor.cpython-313.pyc and b/cryptoai/models/__pycache__/data_processor.cpython-313.pyc differ diff --git a/cryptoai/models/data_processor.py b/cryptoai/models/data_processor.py index 6b6e08a..f22fbf1 100644 --- a/cryptoai/models/data_processor.py +++ b/cryptoai/models/data_processor.py @@ -45,8 +45,8 @@ class DataProcessor: data = data[columns_to_keep] # 检查并处理缺失值 - data.fillna(method='ffill', inplace=True) # 前向填充 - data.fillna(method='bfill', inplace=True) # 后向填充 + data.bfill(inplace=True) # 前向填充 + data.bfill(inplace=True) # 后向填充 # 添加技术指标 data = self.add_technical_indicators(data) @@ -107,7 +107,7 @@ class DataProcessor: df['ATR'] = true_range.rolling(14).mean() # 填充计算指标产生的NaN值 - df.fillna(method='bfill', inplace=True) + df.bfill(inplace=True) return df diff --git a/cryptoai/utils/dingtalk_bot.py b/cryptoai/utils/dingtalk_bot.py index 8265aff..4c7120d 100644 --- a/cryptoai/utils/dingtalk_bot.py +++ b/cryptoai/utils/dingtalk_bot.py @@ -170,58 +170,63 @@ class DingTalkBot: # 提取关键信息 market_trend = analysis_result.get('market_trend', '未知') + + # 支撑位和阻力位 support_levels = analysis_result.get('support_levels', []) if isinstance(support_levels, list): - support_levels_str = '、'.join([str(level) for level in support_levels]) + support_levels_str = '\n'.join([f"- {level}" for level in support_levels]) else: support_levels_str = str(support_levels) resistance_levels = analysis_result.get('resistance_levels', []) if isinstance(resistance_levels, list): - resistance_levels_str = '、'.join([str(level) for level in resistance_levels]) + resistance_levels_str = '\n'.join([f"- {level}" for level in resistance_levels]) else: resistance_levels_str = str(resistance_levels) - - # 格式化交易量分析和市场情绪 + + # 技术指标分析 + technical_analysis = analysis_result.get('technical_analysis', '未知') + if isinstance(technical_analysis, (dict, list)): + technical_analysis = self._format_complex_content(technical_analysis) + + # 交易量分析 volume_analysis = analysis_result.get('volume_analysis', '未知') if isinstance(volume_analysis, (dict, list)): volume_analysis = self._format_complex_content(volume_analysis) - market_sentiment = analysis_result.get('market_sentiment', '未知') - if isinstance(market_sentiment, (dict, list)): - market_sentiment = self._format_complex_content(market_sentiment) + # 操作建议 + recommendation = analysis_result.get('recommendation', '未知') + entry_points = analysis_result.get('entry_points', []) + if isinstance(entry_points, list): + entry_points_str = '、'.join([str(point) for point in entry_points]) + else: + entry_points_str = str(entry_points) + + exit_points = analysis_result.get('exit_points', []) + if isinstance(exit_points, list): + exit_points_str = '、'.join([str(point) for point in exit_points]) + else: + exit_points_str = str(exit_points) - # 处理总结字段 - 检查是否为字符串形式的JSON并尝试解析 + stop_loss = analysis_result.get('stop_loss', '未知') + take_profit = analysis_result.get('take_profit', '未知') + + # 紧迫性评级 + urgency_level = analysis_result.get('urgency_level', 0) + urgency_reason = analysis_result.get('urgency_reason', '未知') + + # 紧迫性等级图标 + if int(urgency_level) >= 4: + urgency_icon = "🔴" + elif int(urgency_level) >= 2: + urgency_icon = "🟡" + else: + urgency_icon = "🟢" + + # 总结 summary = analysis_result.get('summary', '无摘要') - if isinstance(summary, str) and (summary.startswith('{') or summary.startswith('{')): - try: - # 尝试解析JSON字符串 - summary_json = json.loads(summary) - formatted_summary = "" - - # 添加概述 - if 'overview' in summary_json: - formatted_summary += f"**总体概述**:{summary_json['overview']}\n\n" - - # 添加关键点 - if 'key_points' in summary_json and isinstance(summary_json['key_points'], list): - formatted_summary += "**关键点**:\n" - for point in summary_json['key_points']: - formatted_summary += f"- {point}\n" - formatted_summary += "\n" - - # 添加交易建议 - if 'trading_suggestion' in summary_json: - formatted_summary += f"**交易建议**:{summary_json['trading_suggestion']}" - - summary = formatted_summary - except Exception as e: - # 如果解析失败,保留原始内容 - print(f"解析summary JSON时出错: {e}") - elif isinstance(summary, (dict, list)): - summary = self._format_complex_content(summary) - # 根据市场趋势设置颜色标志 + # 根据市场趋势设置图标 if '牛' in str(market_trend) or 'bull' in str(market_trend).lower(): trend_icon = "🟢" elif '熊' in str(market_trend) or 'bear' in str(market_trend).lower(): @@ -234,18 +239,33 @@ class DingTalkBot: **市场趋势**: {market_trend} -**支撑位**: {support_levels_str} - -**阻力位**: {resistance_levels_str} +**技术指标分析**: +{technical_analysis} **交易量分析**: {volume_analysis} -**市场情绪**: -{market_sentiment} +**支撑位(斐波那契)**: +{support_levels_str} -**总结**: -{summary} +**阻力位(斐波那契)**: +{resistance_levels_str} + +**操作建议**: {recommendation} + +**入场点位**: {entry_points_str} + +**出场点位**: {exit_points_str} + +**止损位**: {stop_loss} + +**止盈位**: {take_profit} + +**操作紧迫性**: {urgency_icon} {urgency_level}/5 + +**紧迫性原因**: {urgency_reason} + +**总结**: {summary} *分析时间: {time.strftime('%Y-%m-%d %H:%M:%S')}* """ @@ -273,57 +293,47 @@ class DingTalkBot: # 提取关键信息 current_price = prediction_result.get('current_price', '未知') - prediction_24h = prediction_result.get('prediction_24h', {}) - prediction_7d = prediction_result.get('prediction_7d', {}) - prediction_30d = prediction_result.get('prediction_30d', {}) - # 处理关键影响因素 + # 24小时预测 + prediction_24h = prediction_result.get('prediction_24h', {}) + price_range_24h = prediction_24h.get('price_range', '未知') + trend_24h = prediction_24h.get('trend', '未知') + + # 7天预测 + prediction_7d = prediction_result.get('prediction_7d', {}) + price_range_7d = prediction_7d.get('price_range', '未知') + trend_7d = prediction_7d.get('trend', '未知') + + # 关键因素 key_factors = prediction_result.get('key_factors', []) if isinstance(key_factors, list): - key_factors_str = '\n'.join([f"- {factor}" for factor in key_factors]) + key_factors_text = '\n'.join([f"- {factor}" for factor in key_factors]) else: - key_factors_str = self._format_complex_content(key_factors) - - # 处理风险评估 - risk_assessment = prediction_result.get('risk_assessment', '未知') - if isinstance(risk_assessment, (dict, list)): - risk_assessment = self._format_complex_content(risk_assessment) + key_factors_text = str(key_factors) - # 格式化预测数据 - def format_prediction(pred_data): - if not pred_data: - return "无数据" - - price_range = pred_data.get('price_range', '未知') - trend = pred_data.get('trend', '未知') - confidence = pred_data.get('confidence', '未知') - - # 根据趋势设置图标 - if '上升' in str(trend) or '增长' in str(trend) or 'up' in str(trend).lower(): - trend_icon = "📈" - elif '下降' in str(trend) or '下跌' in str(trend) or 'down' in str(trend).lower(): - trend_icon = "📉" - else: - trend_icon = "📊" - - return f"{trend_icon} **{trend}** (价格区间: {price_range}, 置信度: {confidence})" + # 根据24小时趋势设置图标 + if '上涨' in str(trend_24h) or 'up' in str(trend_24h).lower(): + trend_icon = "🟢" + elif '下跌' in str(trend_24h) or 'down' in str(trend_24h).lower(): + trend_icon = "🔴" + else: + trend_icon = "🟡" # 构建Markdown文本 - markdown = f"""### {symbol} 价格预测 + markdown = f"""### {trend_icon} {symbol} 价格预测 **当前价格**: {current_price} -**24小时预测**: {format_prediction(prediction_24h)} +**24小时预测**: +- 价格区间: {price_range_24h} +- 趋势: {trend_24h} -**7天预测**: {format_prediction(prediction_7d)} - -**30天预测**: {format_prediction(prediction_30d)} +**7天预测**: +- 价格区间: {price_range_7d} +- 趋势: {trend_7d} **关键影响因素**: -{key_factors_str} - -**风险评估**: -{risk_assessment} +{key_factors_text} *预测时间: {time.strftime('%Y-%m-%d %H:%M:%S')}* """ @@ -350,7 +360,6 @@ class DingTalkBot: return f"### {symbol} 策略结果错误\n\n获取策略结果时出错: {strategy_result.get('error', '未知错误')}" # 提取关键信息 - risk_level = strategy_result.get('risk_level', '未知') position = strategy_result.get('position', '未知') entry_points = strategy_result.get('entry_points', []) @@ -364,11 +373,13 @@ class DingTalkBot: exit_points_str = '、'.join([str(point) for point in exit_points]) else: exit_points_str = str(exit_points) - + stop_loss = strategy_result.get('stop_loss', '未知') take_profit = strategy_result.get('take_profit', '未知') - time_frame = strategy_result.get('time_frame', '未知') - strategy_type = strategy_result.get('strategy_type', '未知') + + # 紧迫性评级 + urgency_level = strategy_result.get('urgency_level', 0) + reasoning = strategy_result.get('reasoning', '无理由') # 根据建议仓位设置图标 @@ -379,33 +390,29 @@ class DingTalkBot: else: position_icon = "⚪" - # 根据风险等级设置图标 - if 'low' in str(risk_level).lower() or '低' in str(risk_level): - risk_icon = "🟢" - elif 'medium' in str(risk_level).lower() or '中' in str(risk_level): - risk_icon = "🟡" + # 紧迫性等级图标 + if int(urgency_level) >= 4: + urgency_icon = "🔴" + elif int(urgency_level) >= 2: + urgency_icon = "🟡" else: - risk_icon = "🔴" + urgency_icon = "🟢" # 构建Markdown文本 markdown = f"""### {symbol} 交易策略 **建议操作**: {position_icon} {position} -**风险等级**: {risk_icon} {risk_level} +**入场点位**: {entry_points_str} -**策略类型**: {strategy_type} - -**时间框架**: {time_frame} - -**入场点**: {entry_points_str} - -**出场点**: {exit_points_str} +**出场点位**: {exit_points_str} **止损位**: {stop_loss} **止盈位**: {take_profit} +**操作紧迫性**: {urgency_icon} {urgency_level}/5 + **策略理由**: {reasoning} @@ -418,21 +425,157 @@ class DingTalkBot: traceback.print_exc() return f"### {symbol} 格式化策略结果出错\n\n{str(e)}" - def send_analysis_report(self, symbol: str, analysis_data: Dict[str, Any]) -> Dict[str, Any]: + def format_integrated_report(self, symbol: str, analysis_result: Dict[str, Any]) -> str: """ - 发送完整分析报告(分析+预测+策略) + 格式化整合后的分析报告 Args: symbol: 交易对符号 - analysis_data: 包含分析、预测和策略的数据 + analysis_result: 分析结果 + + Returns: + 格式化后的文本 + """ + try: + if not analysis_result or 'error' in analysis_result: + return f"### {symbol} 分析结果错误\n\n获取分析结果时出错: {analysis_result.get('error', '未知错误')}" + + # 提取关键信息 + market_trend = analysis_result.get('market_trend', '未知') + + # 支撑位和阻力位的格式化 + def format_fibonacci_levels(levels): + formatted_lines = [] + if isinstance(levels, list): + for level in levels: + if isinstance(level, dict) and 'level' in level and 'price' in level: + formatted_lines.append(f"- {level['level']} 位: {level['price']}") + else: + formatted_lines.append(f"- {level}") + elif isinstance(levels, dict): + for level, price in levels.items(): + if level.replace('.', '', 1).isdigit(): # 检查是否是数字形式的级别 + formatted_lines.append(f"- {level} 位: {price}") + else: + formatted_lines.append(f"- {level}: {price}") + else: + formatted_lines.append(str(levels)) + return '\n'.join(formatted_lines) + + support_levels = analysis_result.get('support_levels', []) + support_levels_str = format_fibonacci_levels(support_levels) + + resistance_levels = analysis_result.get('resistance_levels', []) + resistance_levels_str = format_fibonacci_levels(resistance_levels) + + # 技术指标分析 + technical_analysis = analysis_result.get('technical_analysis', '未知') + if isinstance(technical_analysis, (dict, list)): + technical_analysis = self._format_complex_content(technical_analysis) + + # 交易量分析 + volume_analysis = analysis_result.get('volume_analysis', '未知') + if isinstance(volume_analysis, (dict, list)): + volume_analysis = self._format_complex_content(volume_analysis) + + # 操作建议 + recommendation = analysis_result.get('recommendation', '未知') + entry_points = analysis_result.get('entry_points', []) + if isinstance(entry_points, list): + entry_points_str = '、'.join([str(point) for point in entry_points]) + else: + entry_points_str = str(entry_points) + + exit_points = analysis_result.get('exit_points', []) + if isinstance(exit_points, list): + exit_points_str = '、'.join([str(point) for point in exit_points]) + else: + exit_points_str = str(exit_points) + + stop_loss = analysis_result.get('stop_loss', '未知') + take_profit = analysis_result.get('take_profit', '未知') + + # 紧迫性评级 + urgency_level = analysis_result.get('urgency_level', 0) + urgency_reason = analysis_result.get('urgency_reason', '未知') + + # 紧迫性等级图标 + if int(urgency_level) >= 4: + urgency_icon = "🔴" + elif int(urgency_level) >= 2: + urgency_icon = "🟡" + else: + urgency_icon = "🟢" + + # 总结 + summary = analysis_result.get('summary', '无摘要') + + # 根据市场趋势设置图标 + if '牛' in str(market_trend) or 'bull' in str(market_trend).lower(): + trend_icon = "🟢" + elif '熊' in str(market_trend) or 'bear' in str(market_trend).lower(): + trend_icon = "🔴" + else: + trend_icon = "🟡" + + # 构建Markdown文本 + markdown = f"""### DeepSeek AI 加密货币分析报告 + +**交易对**: {symbol} + +**分析时间**: {time.strftime('%Y-%m-%d %H:%M:%S')} + +**市场趋势**: {trend_icon} {market_trend} + +**技术指标分析**: +{technical_analysis} + +**交易量分析**: +{volume_analysis} + +**支撑位(斐波那契)**: +{support_levels_str} + +**阻力位(斐波那契)**: +{resistance_levels_str} + +**操作建议**: {recommendation} + +**入场点位**: {entry_points_str} + +**出场点位**: {exit_points_str} + +**止损位**: {stop_loss} + +**止盈位**: {take_profit} + +**操作紧迫性**: {urgency_icon} {urgency_level}/5 + +**紧迫性原因**: {urgency_reason} + +**总结**: {summary} + +""" + return markdown + + except Exception as e: + print(f"格式化整合后的分析报告时出错: {e}") + traceback.print_exc() + return f"### {symbol} 格式化整合后的分析报告出错\n\n{str(e)}" + + def send_analysis_report(self, symbol: str, analysis_data: Dict[str, Any]) -> Dict[str, Any]: + """ + 发送分析报告(整合市场分析和交易建议) + + Args: + symbol: 交易对符号 + analysis_data: 包含分析数据 Returns: 接口返回结果 """ try: analysis_result = analysis_data.get('analysis', {}) - prediction_result = analysis_data.get('prediction', {}) - strategy_result = analysis_data.get('strategy', {}) # 获取市场趋势,用于设置标题图标 market_trend = '' @@ -447,29 +590,13 @@ class DingTalkBot: title_icon = "🟡" # 获取建议操作 - position = '未知' - if strategy_result and 'position' in strategy_result: - position = strategy_result['position'] + position = analysis_result.get('recommendation', '未知') # 构建标题 - title = f"{title_icon} {symbol} 加密货币分析报告 | 建议: {position}" + title = f"{title_icon} {symbol} 加密货币分析 | 建议: {position}" - # 构建完整报告内容 - markdown_text = f"# {symbol} 加密货币AI分析报告\n\n" - - # 添加分析结果 - markdown_text += "## 一、市场分析\n\n" - markdown_text += self.format_analysis_result(symbol, analysis_result) - markdown_text += "\n\n" - - # 添加预测结果 - markdown_text += "## 二、价格预测\n\n" - markdown_text += self.format_prediction_result(symbol, prediction_result) - markdown_text += "\n\n" - - # 添加策略结果 - markdown_text += "## 三、交易策略\n\n" - markdown_text += self.format_strategy_result(symbol, strategy_result) + # 格式化分析结果(整合了分析和交易建议) + markdown_text = self.format_integrated_report(symbol, analysis_result) # 发送Markdown消息 return self.send_markdown(title, markdown_text)