import os import sys import pandas as pd import numpy as np from typing import Dict, Any, List, Optional, Tuple import datetime import time import json from datetime import datetime, timedelta # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from api.binance_api import BinanceAPI from api.okx_api import OKXAPI from api.deepseek_api import DeepSeekAPI from models.data_processor import DataProcessor from utils.config_loader import ConfigLoader from utils.dingtalk_bot import DingTalkBot from utils.db_manager import get_db_manager from utils.discord_bot import DiscordBot class CryptoAgent: """加密货币智能体,用于分析市场数据并生成交易策略""" def __init__(self, config_path: str = None): """ 初始化加密货币智能体 Args: config_path: 配置文件路径,如果为None,则使用默认路径 """ # 加载配置 self.config_loader = ConfigLoader(config_path) # 获取各部分配置 self.binance_config = self.config_loader.get_binance_config() self.okx_config = self.config_loader.get_okx_config() self.deepseek_config = self.config_loader.get_deepseek_config() self.crypto_config = self.config_loader.get_crypto_config() self.data_config = self.config_loader.get_data_config() self.dingtalk_config = self.config_loader.get_dingtalk_config() self.discord_config = self.config_loader.get_discord_config() # 初始化API客户端 self.binance_api = BinanceAPI( api_key=self.binance_config['api_key'], api_secret=self.binance_config['api_secret'], test_mode=self.binance_config['test_mode'] ) # 初始化OKX API客户端 self.okx_api = OKXAPI( api_key=self.okx_config['api_key'], api_secret=self.okx_config['api_secret'], passphrase=self.okx_config['passphrase'], test_mode=self.okx_config['test_mode'] ) self.deepseek_api = DeepSeekAPI() # 初始化数据处理器 self.data_processor = DataProcessor(storage_path=self.data_config['storage_path']) # 初始化钉钉机器人(如果启用) self.dingtalk_bot = None if self.dingtalk_config.get('enabled', False): self.dingtalk_bot = DingTalkBot( webhook_url=self.dingtalk_config['webhook_url'], secret=self.dingtalk_config.get('secret') ) print("钉钉机器人已启用") # 初始化Discord机器人(如果启用) self.discord_bot = None if self.discord_config.get('enabled', False): self.discord_bot = DiscordBot( webhook_url=self.discord_config['crypto_webhook_url'] ) print("Discord机器人已启用") # 初始化数据库管理器 self.db_manager = get_db_manager() # 设置支持的加密货币 self.base_currencies = self.crypto_config['base_currencies'] self.quote_currency = self.crypto_config['quote_currency'] self.symbols = [f"{base}{self.quote_currency}" for base in self.base_currencies] # 设置时间间隔 self.time_interval = self.crypto_config['time_interval'] def fetch_historical_data(self, symbol: str, days: int = 30) -> pd.DataFrame: """ 获取历史数据 Args: symbol: 交易对符号,例如 'BTCUSDT' days: 要获取的天数 Returns: 历史数据DataFrame """ print(f"获取{symbol}的历史数据({days}天)...") # 计算开始时间 start_time = datetime.now() - timedelta(days=days) start_str = start_time.strftime("%Y-%m-%d") try: # 首先尝试从Binance获取K线数据 print(f"尝试从Binance获取{symbol}的K线数据...") data = self.binance_api.get_historical_klines( symbol=symbol, interval=self.time_interval, start_str=start_str ) if data.empty: # 如果从Binance获取失败,尝试从OKX获取 print(f"从Binance获取数据失败,尝试从OKX获取{symbol}的K线数据...") data = self.okx_api.get_historical_klines( symbol=symbol, interval=self.time_interval, start_str=start_str, limit=100 # OKX最多返回100条记录,这里可能需要多次请求以获取更多数据 ) except Exception as e: print(f"从Binance获取数据出错: {e},尝试从OKX获取...") try: # 尝试从OKX获取 data = self.okx_api.get_historical_klines( symbol=symbol, interval=self.time_interval, start_str=start_str, limit=100 ) except Exception as okx_e: print(f"从OKX获取数据也出错: {okx_e}") return pd.DataFrame() if data.empty: print(f"无法从任何交易所获取{symbol}的历史数据") return pd.DataFrame() # 保存原始数据 raw_data_path = self.data_processor.save_data(symbol, data, data_type='raw') print(f"原始数据已保存到: {raw_data_path}") return data def process_data(self, symbol: str, data: pd.DataFrame) -> pd.DataFrame: """ 处理数据 Args: symbol: 交易对符号,例如 'BTCUSDT' data: 原始数据 Returns: 处理后的数据 """ print(f"处理{symbol}的数据...") # 预处理数据 processed_data = self.data_processor.preprocess_market_data(symbol, data) # 保存处理后的数据 processed_data_path = self.data_processor.save_data(symbol, processed_data, data_type='processed') print(f"处理后的数据已保存到: {processed_data_path}") return processed_data def _format_market_data(self, market_data: Dict[str, Any]) -> str: """ 格式化市场数据为适合大模型的格式 Args: market_data: 市场数据 Returns: 格式化的数据字符串 """ # 这里可以根据实际情况调整格式化方式 return json.dumps(market_data, indent=2) def _build_market_analysis_prompt(self, formatted_data: str) -> str: """ 构建市场分析提示词 Args: formatted_data: 格式化的市场数据 Returns: 提示词 """ return f"""请对以下加密货币市场K线数据进行深入分析,并给出交易建议。请使用中文回复。 数据: {formatted_data} 请严格按照以下步骤分析: 1. 分析主要技术指标(包括RSI、MACD、布林带、均线等),判断当前市场趋势 2. 基于K线数据和斐波那契回调水平,确定关键支撑位和压力位 3. 根据技术分析给出清晰的操作建议 4. 评估当前操作建议的紧迫性 请以JSON格式回复,仅包含以下字段: - market_trend: 市场趋势 (牛市, 熊市, 震荡) - technical_analysis: 技术指标详细分析 (重点分析RSI、MACD、布林带、均线交叉等情况) - support_levels: 基于斐波那契回调的支撑位列表(标明各个支撑位对应的斐波那契水平,例如0.382、0.5、0.618等) - resistance_levels: 基于斐波那契回调的阻力位列表(同样标明对应水平) - volume_analysis: 交易量分析,重点关注量价关系 - recommendation: 操作建议 (买入、卖出或等待) - entry_points: 推荐入场点位 - exit_points: 推荐出场点位 - stop_loss: 建议止损位 - take_profit: 建议止盈位 - urgency_level: 操作紧迫性评级 (1-5,1为最低紧迫性,5为最高紧迫性) - urgency_reason: 紧迫性评级的原因说明 - summary: 分析总结(不超过50字) 请确保回复为有效的JSON格式,分析要精准专业。""" def analyze_symbol(self, symbol: str) -> Dict[str, Any]: """ 分析单个交易对 """ results = {} print(f"\n开始分析{symbol}...") # 获取并处理数据 raw_data = self.fetch_historical_data(symbol, days=self.crypto_config['historical_days']) if not raw_data.empty: processed_data = self.process_data(symbol, raw_data) # 准备市场数据 market_data = { "symbol": symbol, "current_price": float(processed_data['close'].iloc[-1]), # "price_change_24h": float(processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]), # "price_change_percentage_24h": float((processed_data['close'].iloc[-1] - processed_data['close'].iloc[-24]) / processed_data['close'].iloc[-24] * 100), "historical_prices": processed_data['close'].tail(100).tolist(), "volumes": processed_data['volume'].tail(100).tolist(), "technical_indicators": { "rsi": float(processed_data['RSI'].iloc[-1]), "macd": float(processed_data['MACD'].iloc[-1]), "macd_signal": float(processed_data['MACD_Signal'].iloc[-1]), "bollinger_upper": float(processed_data['Bollinger_Upper'].iloc[-1]), "bollinger_lower": float(processed_data['Bollinger_Lower'].iloc[-1]), "ma5": float(processed_data['MA5'].iloc[-1]), "ma10": float(processed_data['MA10'].iloc[-1]), "ma20": float(processed_data['MA20'].iloc[-1]), "ma50": float(processed_data['MA50'].iloc[-1]), "atr": float(processed_data['ATR'].iloc[-1]) }, "klines": processed_data[['open', 'high', 'low', 'close', 'volume']].tail(100).to_dict('records') } # 将市场数据格式化为适合大模型的格式 formatted_data = self._format_market_data(market_data) # 构建提示词 prompt = self._build_market_analysis_prompt(formatted_data) # 调用API获取分析和交易建议 response, usage = self.deepseek_api.call_model(prompt, task_type="交易分析", symbol=symbol) # 解析响应 analysis_result = self.deepseek_api.extract_json_from_response(response) # 添加token使用信息 if usage: analysis_result["_token_usage"] = usage # 整合结果 results[symbol] = { "analysis": analysis_result, "timestamp": datetime.now().isoformat() } # 保存分析结果到数据库 try: # 保存到数据库 saved = self.db_manager.save_analysis_result( agent='crypto', symbol=symbol, time_interval=self.time_interval, analysis_result=analysis_result ) if saved: print(f"{symbol}分析结果已保存到数据库") else: print(f"{symbol}分析结果保存到数据库失败") except Exception as e: print(f"保存{symbol}分析结果到数据库时出错: {e}") print(f"{symbol}分析完成") else: print(f"跳过{symbol}分析,无法获取数据") return results def analyze_all_symbols(self) -> Dict[str, Dict[str, Any]]: """ 分析所有支持的交易对 Returns: 所有交易对的分析结果 """ results = {} for symbol in self.symbols: print(f"\n开始分析{symbol}...") single_result = self.analyze_symbol(symbol) results[symbol] = single_result[symbol] return results def run_analysis_cycle(self, symbol: Optional[str] = None) -> Dict[str, Any]: """ 运行一个完整的分析周期 Returns: 分析结果 """ print(f"开始新的分析周期,时间:{datetime.now().isoformat()}") if symbol: results = self.analyze_symbol(symbol) else: results = self.analyze_all_symbols() # 保存分析结果 timestamp = datetime.now().strftime("%Y%m%d%H%M%S") results_dir = os.path.join(self.data_config['storage_path'], 'analysis_results') os.makedirs(results_dir, exist_ok=True) results_file = os.path.join(results_dir, f"analysis_results_{timestamp}.json") with open(results_file, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2, ensure_ascii=False) print(f"分析结果已保存到:{results_file}") # 把分析结果调用大模型转化成交易建议 message = self.convert_analysis_to_trading_suggestions(results) print(f"交易建议: {message}") if self.dingtalk_bot: self.dingtalk_bot.send_markdown(title="加密货币交易建议", text=message) if self.discord_bot: self.discord_bot.send_message(content=message) # 导出 DeepSeek API token 使用情况 self._export_token_usage() return results def convert_analysis_to_trading_suggestions(self, results: Dict[str, Any]) -> str: """ 把分析的JSON结果调用大模型转化成交易建议 """ prompt = f""" 请对以下加密货币市场分析的JSON结果进行归纳总结: 需要输出的内容包括: 标题:AI Agent 加密货币分析报告 1. 对目标交易对行情进行总结 2. 对所有交易对给出操作建议: 2.1 操作建议(做多、做空、观望) 2.2 操作点位 2.3 止损止盈点位 2.4 操作评级 2.5 建议原因 以下是每个交易对的分析结果: {results} 请以优美的Markdown格式输出,通过 emoji 标签来增加可读性。 """ system_prompt = """ 你是一个专业的加密货币分析高手,你擅长分析市场趋势、预测价格走向和提供交易建议,请始终使用中文回复。 """ response, usage = self.deepseek_api.call_model(prompt, system_prompt=system_prompt, task_type="交易建议") message = self.deepseek_api.extract_text_from_response(response) return message def _export_token_usage(self) -> None: """ 导出 DeepSeek API token 使用情况 """ try: # 每天导出一次详细的JSON数据 today = datetime.now().strftime("%Y%m%d") token_usage_dir = os.path.join(self.data_config['storage_path'], 'token_usage') os.makedirs(token_usage_dir, exist_ok=True) json_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.json") # 如果文件不存在,导出一次 if not os.path.exists(json_file): exported_file = self.deepseek_api.export_token_usage(json_file) if exported_file: print(f"DeepSeek API token 使用情况已导出到:{exported_file}") # 每次都导出CSV格式的统计数据 csv_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.csv") self.deepseek_api.export_token_usage(csv_file, "csv") # 输出当前使用情况统计 stats = self.deepseek_api.get_token_usage_stats() print(f"DeepSeek API token 使用统计:") print(f"- 总调用次数: {stats['total_calls']}") print(f"- 总token数: {stats['total_tokens']}") print(f"- 输入token: {stats['total_prompt_tokens']}") print(f"- 输出token: {stats['total_completion_tokens']}") print(f"- 平均每次调用: {stats['average_tokens_per_call']:.2f} tokens") except Exception as e: print(f"导出 token 使用情况时出错: {e}") def start_agent(self, symbol: Optional[str] = None) -> None: """ 启动智能体 Args: run_once: 是否只运行一次分析周期 """ print("启动加密货币分析智能体...") try: if symbol: self.run_analysis_cycle(symbol) else: self.run_analysis_cycle() # 导出最终的token使用情况 self._export_token_usage() except KeyboardInterrupt: print("\n智能体已停止") # 导出最终的token使用情况 self._export_token_usage() except Exception as e: print(f"智能体运行时出错: {e}") import traceback traceback.print_exc() # 发生错误时也尝试导出token使用情况 self._export_token_usage()