import os import sys import pandas as pd import numpy as np from typing import Dict, Any, List, Optional, Tuple import datetime import time import json from datetime import datetime, timedelta # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from api.binance_api import BinanceAPI from api.deepseek_api import DeepSeekAPI from models.data_processor import DataProcessor from utils.config_loader import ConfigLoader from utils.dingtalk_bot import DingTalkBot class CryptoAgent: """加密货币智能体,用于分析市场数据并生成交易策略""" def __init__(self, config_path: str = None): """ 初始化加密货币智能体 Args: config_path: 配置文件路径,如果为None,则使用默认路径 """ # 加载配置 self.config_loader = ConfigLoader(config_path) # 获取各部分配置 self.binance_config = self.config_loader.get_binance_config() self.deepseek_config = self.config_loader.get_deepseek_config() self.crypto_config = self.config_loader.get_crypto_config() self.data_config = self.config_loader.get_data_config() self.agent_config = self.config_loader.get_agent_config() self.dingtalk_config = self.config_loader.get_dingtalk_config() # 初始化API客户端 self.binance_api = BinanceAPI( api_key=self.binance_config['api_key'], api_secret=self.binance_config['api_secret'], test_mode=self.binance_config['test_mode'] ) self.deepseek_api = DeepSeekAPI( api_key=self.deepseek_config['api_key'], model=self.deepseek_config['model'] ) # 初始化数据处理器 self.data_processor = DataProcessor(storage_path=self.data_config['storage_path']) # 初始化钉钉机器人(如果启用) self.dingtalk_bot = None if self.dingtalk_config.get('enabled', False): self.dingtalk_bot = DingTalkBot( webhook_url=self.dingtalk_config['webhook_url'], secret=self.dingtalk_config.get('secret') ) print("钉钉机器人已启用") # 设置支持的加密货币 self.base_currencies = self.crypto_config['base_currencies'] self.quote_currency = self.crypto_config['quote_currency'] self.symbols = [f"{base}{self.quote_currency}" for base in self.base_currencies] # 设置时间间隔 self.time_interval = self.crypto_config['time_interval'] # 风险等级 self.risk_level = self.agent_config['risk_level'] # 支持的策略 self.strategies = self.agent_config['strategies'] def fetch_historical_data(self, symbol: str, days: int = 30) -> pd.DataFrame: """ 获取历史数据 Args: symbol: 交易对符号,例如 'BTCUSDT' days: 要获取的天数 Returns: 历史数据DataFrame """ print(f"获取{symbol}的历史数据({days}天)...") # 计算开始时间 start_time = datetime.now() - timedelta(days=days) start_str = start_time.strftime("%Y-%m-%d") # 获取K线数据 data = self.binance_api.get_historical_klines( symbol=symbol, interval=self.time_interval, start_str=start_str ) if data.empty: print(f"无法获取{symbol}的历史数据") return pd.DataFrame() # 保存原始数据 raw_data_path = self.data_processor.save_data(symbol, data, data_type='raw') print(f"原始数据已保存到: {raw_data_path}") return data def process_data(self, symbol: str, data: pd.DataFrame) -> pd.DataFrame: """ 处理数据 Args: symbol: 交易对符号,例如 'BTCUSDT' data: 原始数据 Returns: 处理后的数据 """ print(f"处理{symbol}的数据...") # 预处理数据 processed_data = self.data_processor.preprocess_market_data(symbol, data) # 保存处理后的数据 processed_data_path = self.data_processor.save_data(symbol, processed_data, data_type='processed') print(f"处理后的数据已保存到: {processed_data_path}") return processed_data def _format_market_data(self, market_data: Dict[str, Any]) -> str: """ 格式化市场数据为适合大模型的格式 Args: market_data: 市场数据 Returns: 格式化的数据字符串 """ # 这里可以根据实际情况调整格式化方式 return json.dumps(market_data, indent=2) def _build_market_analysis_prompt(self, formatted_data: str) -> str: """ 构建市场分析提示词 Args: formatted_data: 格式化的市场数据 Returns: 提示词 """ return f"""请对以下加密货币市场K线数据进行深入分析,并给出交易建议。请使用中文回复。 数据: {formatted_data} 请严格按照以下步骤分析: 1. 分析主要技术指标(包括RSI、MACD、布林带、均线等),判断当前市场趋势 2. 基于K线数据和斐波那契回调水平,确定关键支撑位和压力位 3. 根据技术分析给出清晰的操作建议 4. 评估当前操作建议的紧迫性 请以JSON格式回复,仅包含以下字段: - market_trend: 市场趋势 (牛市, 熊市, 震荡) - technical_analysis: 技术指标详细分析 (重点分析RSI、MACD、布林带、均线交叉等情况) - support_levels: 基于斐波那契回调的支撑位列表(标明各个支撑位对应的斐波那契水平,例如0.382、0.5、0.618等) - resistance_levels: 基于斐波那契回调的阻力位列表(同样标明对应水平) - volume_analysis: 交易量分析,重点关注量价关系 - recommendation: 操作建议 (买入、卖出或等待) - entry_points: 推荐入场点位 - exit_points: 推荐出场点位 - stop_loss: 建议止损位 - take_profit: 建议止盈位 - urgency_level: 操作紧迫性评级 (1-5,1为最低紧迫性,5为最高紧迫性) - urgency_reason: 紧迫性评级的原因说明 - summary: 分析总结(不超过50字) 请确保回复为有效的JSON格式,分析要精准专业。""" def analyze_all_symbols(self) -> Dict[str, Dict[str, Any]]: """ 分析所有支持的交易对 Returns: 所有交易对的分析结果 """ results = {} for symbol in self.symbols: print(f"\n开始分析{symbol}...") # 获取并处理数据 raw_data = self.fetch_historical_data(symbol, days=self.data_config['historical_days']) if not raw_data.empty: processed_data = self.process_data(symbol, raw_data) # 准备市场数据 market_data = { "symbol": symbol, "current_price": float(processed_data['close'].iloc[-1]), "price_change_24h": float(processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]), "price_change_percentage_24h": float((processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]) / processed_data['close'].iloc[-24] * 100), "historical_prices": processed_data['close'].tail(100).tolist(), "volumes": processed_data['volume'].tail(100).tolist(), "technical_indicators": { "rsi": float(processed_data['RSI'].iloc[-1]), "macd": float(processed_data['MACD'].iloc[-1]), "macd_signal": float(processed_data['MACD_Signal'].iloc[-1]), "bollinger_upper": float(processed_data['Bollinger_Upper'].iloc[-1]), "bollinger_lower": float(processed_data['Bollinger_Lower'].iloc[-1]), "ma5": float(processed_data['MA5'].iloc[-1]), "ma10": float(processed_data['MA10'].iloc[-1]), "ma20": float(processed_data['MA20'].iloc[-1]), "ma50": float(processed_data['MA50'].iloc[-1]), "atr": float(processed_data['ATR'].iloc[-1]) }, "klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(30).to_dict('records') } # 将市场数据格式化为适合大模型的格式 formatted_data = self._format_market_data(market_data) # 构建提示词 prompt = self._build_market_analysis_prompt(formatted_data) # 调用API获取分析和交易建议 response, usage = self.deepseek_api.call_model(prompt, task_type="交易分析", symbol=symbol) # 解析响应 analysis_result = self.deepseek_api.extract_json_from_response(response) # 添加token使用信息 if usage: analysis_result["_token_usage"] = usage # 整合结果 results[symbol] = { "analysis": analysis_result, "timestamp": datetime.now().isoformat() } # 如果钉钉机器人已启用,发送分析报告 if self.dingtalk_bot: try: print(f"发送{symbol}分析报告到钉钉...") response = self.dingtalk_bot.send_analysis_report(symbol, results[symbol]) if response.get('errcode') == 0: print(f"{symbol}分析报告发送成功") else: print(f"{symbol}分析报告发送失败: {response}") except Exception as e: print(f"发送{symbol}分析报告时出错: {e}") print(f"{symbol}分析完成") else: print(f"跳过{symbol}分析,无法获取数据") return results def send_notifications(self, symbol: str, analysis_data: Dict[str, Any]) -> bool: """ 发送分析结果通知 Args: symbol: 交易对符号 analysis_data: 分析数据 Returns: 发送是否成功 """ if not self.dingtalk_bot: print(f"钉钉通知未启用,跳过发送 {symbol} 的分析结果") return False try: # 使用已初始化的钉钉机器人实例发送完整分析报告 response = self.dingtalk_bot.send_analysis_report(symbol, analysis_data) if response.get('errcode') == 0: print(f"成功发送 {symbol} 分析结果到钉钉") return True else: print(f"发送 {symbol} 分析结果到钉钉失败: {response}") return False except Exception as e: print(f"发送钉钉通知时出错: {e}") return False def run_analysis_cycle(self) -> Dict[str, Any]: """ 运行一个完整的分析周期 Returns: 分析结果 """ print(f"开始新的分析周期,时间:{datetime.now().isoformat()}") results = self.analyze_all_symbols() # 保存分析结果 timestamp = datetime.now().strftime("%Y%m%d%H%M%S") results_dir = os.path.join(self.data_config['storage_path'], 'analysis_results') os.makedirs(results_dir, exist_ok=True) results_file = os.path.join(results_dir, f"analysis_results_{timestamp}.json") with open(results_file, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2, ensure_ascii=False) print(f"分析结果已保存到:{results_file}") # 导出 DeepSeek API token 使用情况 self._export_token_usage() return results def _export_token_usage(self) -> None: """ 导出 DeepSeek API token 使用情况 """ try: # 每天导出一次详细的JSON数据 today = datetime.now().strftime("%Y%m%d") token_usage_dir = os.path.join(self.data_config['storage_path'], 'token_usage') os.makedirs(token_usage_dir, exist_ok=True) json_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.json") # 如果文件不存在,导出一次 if not os.path.exists(json_file): exported_file = self.deepseek_api.export_token_usage(json_file) if exported_file: print(f"DeepSeek API token 使用情况已导出到:{exported_file}") # 每次都导出CSV格式的统计数据 csv_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.csv") self.deepseek_api.export_token_usage(csv_file, "csv") # 输出当前使用情况统计 stats = self.deepseek_api.get_token_usage_stats() print(f"DeepSeek API token 使用统计:") print(f"- 总调用次数: {stats['total_calls']}") print(f"- 总token数: {stats['total_tokens']}") print(f"- 输入token: {stats['total_prompt_tokens']}") print(f"- 输出token: {stats['total_completion_tokens']}") print(f"- 平均每次调用: {stats['average_tokens_per_call']:.2f} tokens") except Exception as e: print(f"导出 token 使用情况时出错: {e}") def start_agent(self, run_once: bool = False) -> None: """ 启动智能体 Args: run_once: 是否只运行一次分析周期 """ print("启动加密货币分析智能体...") try: if run_once: self.run_analysis_cycle() # 导出最终的token使用情况 self._export_token_usage() else: while True: self.run_analysis_cycle() # 等待下一个分析周期 analysis_interval = self.agent_config['analysis_interval'] print(f"等待{analysis_interval}分钟后进行下一次分析...") time.sleep(analysis_interval * 60) except KeyboardInterrupt: print("\n智能体已停止") # 导出最终的token使用情况 self._export_token_usage() except Exception as e: print(f"智能体运行时出错: {e}") import traceback traceback.print_exc() # 发生错误时也尝试导出token使用情况 self._export_token_usage()