diff --git a/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc b/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc index 57a923d..6900ef4 100644 Binary files a/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc and b/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc differ diff --git a/cryptoai/agents/crypto_agent.py b/cryptoai/agents/crypto_agent.py index f0394b4..209918d 100644 --- a/cryptoai/agents/crypto_agent.py +++ b/cryptoai/agents/crypto_agent.py @@ -490,8 +490,45 @@ class CryptoAgent: print(f"分析结果已保存到:{results_file}") + # 导出 DeepSeek API token 使用情况 + self._export_token_usage() + return results + def _export_token_usage(self) -> None: + """ + 导出 DeepSeek API token 使用情况 + """ + try: + # 每天导出一次详细的JSON数据 + today = datetime.now().strftime("%Y%m%d") + token_usage_dir = os.path.join(self.data_config['storage_path'], 'token_usage') + os.makedirs(token_usage_dir, exist_ok=True) + + json_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.json") + + # 如果文件不存在,导出一次 + if not os.path.exists(json_file): + exported_file = self.deepseek_api.export_token_usage(json_file) + if exported_file: + print(f"DeepSeek API token 使用情况已导出到:{exported_file}") + + # 每次都导出CSV格式的统计数据 + csv_file = os.path.join(token_usage_dir, f"deepseek_token_usage_{today}.csv") + self.deepseek_api.export_token_usage(csv_file, "csv") + + # 输出当前使用情况统计 + stats = self.deepseek_api.get_token_usage_stats() + print(f"DeepSeek API token 使用统计:") + print(f"- 总调用次数: {stats['total_calls']}") + print(f"- 总token数: {stats['total_tokens']}") + print(f"- 输入token: {stats['total_prompt_tokens']}") + print(f"- 输出token: {stats['total_completion_tokens']}") + print(f"- 平均每次调用: {stats['average_tokens_per_call']:.2f} tokens") + + except Exception as e: + print(f"导出 token 使用情况时出错: {e}") + def start_agent(self, run_once: bool = False) -> None: """ 启动智能体 @@ -504,6 +541,8 @@ class CryptoAgent: try: if run_once: self.run_analysis_cycle() + # 导出最终的token使用情况 + self._export_token_usage() else: while True: self.run_analysis_cycle() @@ -515,11 +554,15 @@ class CryptoAgent: except KeyboardInterrupt: print("\n智能体已停止") + # 导出最终的token使用情况 + self._export_token_usage() except Exception as e: print(f"智能体运行时出错: {e}") import traceback traceback.print_exc() + # 发生错误时也尝试导出token使用情况 + self._export_token_usage() def send_notifications(self, symbol: str, analysis_data: Dict[str, Any]) -> bool: """ diff --git a/cryptoai/api/__pycache__/deepseek_api.cpython-313.pyc b/cryptoai/api/__pycache__/deepseek_api.cpython-313.pyc index 0905f97..dcf42c5 100644 Binary files a/cryptoai/api/__pycache__/deepseek_api.cpython-313.pyc and b/cryptoai/api/__pycache__/deepseek_api.cpython-313.pyc differ diff --git a/cryptoai/api/deepseek_api.py b/cryptoai/api/deepseek_api.py index 06d66a2..ea5f75a 100644 --- a/cryptoai/api/deepseek_api.py +++ b/cryptoai/api/deepseek_api.py @@ -1,9 +1,20 @@ import os import json import requests -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List, Optional, Tuple import time +import logging +import datetime +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler("deepseek_token_usage.log"), + logging.StreamHandler() + ] +) class DeepSeekAPI: """DeepSeek API交互类,用于进行市场分析和预测""" @@ -23,6 +34,17 @@ class DeepSeekAPI: "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } + + # Token 使用统计 + self.token_usage = { + "total_prompt_tokens": 0, + "total_completion_tokens": 0, + "total_tokens": 0, + "calls": [] + } + + # 创建日志记录器 + self.logger = logging.getLogger("DeepSeekAPI") def analyze_market_data(self, market_data: Dict[str, Any]) -> Dict[str, Any]: """ @@ -41,10 +63,16 @@ class DeepSeekAPI: prompt = self._build_market_analysis_prompt(formatted_data) # 调用API获取分析 - response = self._call_api(prompt) + response, usage = self._call_api(prompt, task_type="市场分析", symbol=market_data.get("symbol", "未知")) # 解析响应 - return self._parse_analysis_response(response) + result = self._parse_analysis_response(response) + + # 添加token使用信息 + if usage: + result["_token_usage"] = usage + + return result def predict_price_trend(self, symbol: str, historical_data: Dict[str, Any]) -> Dict[str, Any]: """ @@ -64,10 +92,16 @@ class DeepSeekAPI: prompt = self._build_price_prediction_prompt(symbol, formatted_data) # 调用API获取预测 - response = self._call_api(prompt) + response, usage = self._call_api(prompt, task_type="价格预测", symbol=symbol) # 解析响应 - return self._parse_prediction_response(response) + result = self._parse_prediction_response(response) + + # 添加token使用信息 + if usage: + result["_token_usage"] = usage + + return result def generate_trading_strategy(self, symbol: str, analysis_result: Dict[str, Any], risk_level: str) -> Dict[str, Any]: """ @@ -85,21 +119,112 @@ class DeepSeekAPI: prompt = self._build_trading_strategy_prompt(symbol, analysis_result, risk_level) # 调用API获取策略 - response = self._call_api(prompt) + response, usage = self._call_api(prompt, task_type="交易策略", symbol=symbol) # 解析响应 - return self._parse_strategy_response(response) + result = self._parse_strategy_response(response) + + # 添加token使用信息 + if usage: + result["_token_usage"] = usage + + return result - def _call_api(self, prompt: str) -> Dict[str, Any]: + def get_token_usage_stats(self) -> Dict[str, Any]: + """ + 获取Token使用统计信息 + + Returns: + 包含使用统计的字典 + """ + return { + "total_prompt_tokens": self.token_usage["total_prompt_tokens"], + "total_completion_tokens": self.token_usage["total_completion_tokens"], + "total_tokens": self.token_usage["total_tokens"], + "total_calls": len(self.token_usage["calls"]), + "average_tokens_per_call": self.token_usage["total_tokens"] / len(self.token_usage["calls"]) if self.token_usage["calls"] else 0, + "detailed_calls": self.token_usage["calls"][-10:] # 仅返回最近10次调用详情 + } + + def export_token_usage(self, file_path: str = None, format: str = "json") -> str: + """ + 导出Token使用数据到文件 + + Args: + file_path: 文件路径,如果为None则自动生成 + format: 导出格式,支持'json'或'csv' + + Returns: + 导出文件的路径 + """ + if file_path is None: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + file_path = f"deepseek_token_usage_{timestamp}.{format}" + + try: + if format.lower() == "json": + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(self.token_usage, f, indent=2, ensure_ascii=False) + elif format.lower() == "csv": + import csv + + with open(file_path, 'w', newline='', encoding='utf-8') as f: + writer = csv.writer(f) + # 写入表头 + writer.writerow([ + "timestamp", "task_type", "symbol", "model", + "prompt_tokens", "completion_tokens", "total_tokens", + "duration_seconds" + ]) + + # 写入数据 + for call in self.token_usage["calls"]: + writer.writerow([ + call.get("timestamp", ""), + call.get("task_type", ""), + call.get("symbol", ""), + call.get("model", ""), + call.get("prompt_tokens", 0), + call.get("completion_tokens", 0), + call.get("total_tokens", 0), + call.get("duration_seconds", 0) + ]) + + # 写入总计 + writer.writerow([]) + writer.writerow([ + f"总计 (调用次数: {len(self.token_usage['calls'])})", + "", "", "", + self.token_usage["total_prompt_tokens"], + self.token_usage["total_completion_tokens"], + self.token_usage["total_tokens"], + "" + ]) + else: + raise ValueError(f"不支持的格式: {format},仅支持 'json' 或 'csv'") + + self.logger.info(f"Token使用数据已导出到: {file_path}") + return file_path + + except Exception as e: + error_msg = f"导出Token使用数据时出错: {e}" + self.logger.error(error_msg) + return "" + + def _call_api(self, prompt: str, task_type: str = "未知任务", symbol: str = "未知") -> Tuple[Dict[str, Any], Dict[str, Any]]: """ 调用DeepSeek API Args: prompt: 提示词 + task_type: 任务类型 + symbol: 交易对符号 Returns: - API响应 + (API响应, token使用信息) """ + usage_info = {} + try: endpoint = f"{self.base_url}/chat/completions" @@ -113,14 +238,48 @@ class DeepSeekAPI: "max_tokens": 2000 } + start_time = time.time() response = requests.post(endpoint, headers=self.headers, json=payload) response.raise_for_status() + response_data = response.json() + end_time = time.time() - return response.json() + # 记录token使用情况 + if 'usage' in response_data: + prompt_tokens = response_data['usage'].get('prompt_tokens', 0) + completion_tokens = response_data['usage'].get('completion_tokens', 0) + total_tokens = response_data['usage'].get('total_tokens', 0) + + usage_info = { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + "task_type": task_type, + "symbol": symbol, + "model": self.model, + "timestamp": datetime.datetime.now().isoformat(), + "duration_seconds": round(end_time - start_time, 2) + } + + # 更新总计 + self.token_usage["total_prompt_tokens"] += prompt_tokens + self.token_usage["total_completion_tokens"] += completion_tokens + self.token_usage["total_tokens"] += total_tokens + self.token_usage["calls"].append(usage_info) + + # 记录到日志 + self.logger.info( + f"DeepSeek API调用 - 任务: {task_type}, 符号: {symbol}, " + f"输入tokens: {prompt_tokens}, 输出tokens: {completion_tokens}, " + f"总tokens: {total_tokens}, 耗时: {round(end_time - start_time, 2)}秒" + ) + + return response_data, usage_info except Exception as e: - print(f"调用DeepSeek API时出错: {e}") - return {} + error_msg = f"调用DeepSeek API时出错: {e}" + self.logger.error(error_msg) + return {}, usage_info def _format_market_data(self, market_data: Dict[str, Any]) -> str: """ @@ -273,7 +432,8 @@ class DeepSeekAPI: return {"error": "API响应格式不正确", "raw_response": response} except Exception as e: - print(f"解析分析响应时出错: {e}") + error_msg = f"解析分析响应时出错: {e}" + self.logger.error(error_msg) return {"error": str(e), "raw_response": response} def _parse_prediction_response(self, response: Dict[str, Any]) -> Dict[str, Any]: diff --git a/cryptoai/config/config.yaml b/cryptoai/config/config.yaml index 24163a6..278a98b 100644 --- a/cryptoai/config/config.yaml +++ b/cryptoai/config/config.yaml @@ -13,17 +13,17 @@ deepseek: crypto: base_currencies: # - "BTC" - - "ETH" - - "BNB" - - "SOL" - # - "ADA" + # - "ETH" + # - "BNB" + # - "SOL" + - "SUI" quote_currency: "USDT" time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d # 数据设置 data: storage_path: "./cryptoai/data" - historical_days: 30 + historical_days: 180 update_interval: 60 # 数据更新间隔(分钟) # Agent设置 diff --git a/cryptoai/utils/token_usage.py b/cryptoai/utils/token_usage.py new file mode 100644 index 0000000..99dd061 --- /dev/null +++ b/cryptoai/utils/token_usage.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +DeepSeek API Token使用情况分析工具 +""" + +import os +import sys +import argparse +import json +from typing import Dict, Any +import pandas as pd +from datetime import datetime + +# 添加项目根目录到Python路径 +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from api.deepseek_api import DeepSeekAPI +from utils.config_loader import ConfigLoader + + +def get_deepseek_api() -> DeepSeekAPI: + """ + 获取已配置的DeepSeekAPI实例 + """ + config_loader = ConfigLoader() + deepseek_config = config_loader.get_deepseek_config() + + if not deepseek_config or 'api_key' not in deepseek_config: + print("错误: 未找到DeepSeek API配置或API密钥") + sys.exit(1) + + return DeepSeekAPI( + api_key=deepseek_config['api_key'], + model=deepseek_config.get('model', 'deepseek-moe-16b-chat') + ) + + +def show_token_usage_stats(): + """ + 显示Token使用统计信息 + """ + api = get_deepseek_api() + stats = api.get_token_usage_stats() + + print("\n===== DeepSeek API Token使用统计 =====") + print(f"总输入Tokens: {stats['total_prompt_tokens']:,}") + print(f"总输出Tokens: {stats['total_completion_tokens']:,}") + print(f"总Tokens: {stats['total_tokens']:,}") + print(f"API调用次数: {stats['total_calls']}") + print(f"平均每次调用Tokens: {stats['average_tokens_per_call']:.2f}") + + if stats['total_calls'] > 0: + print("\n最近调用记录:") + for call in stats['detailed_calls']: + print(f" - {call['timestamp']} | {call['symbol']} | {call['task_type']} | " + f"输入: {call['prompt_tokens']} | 输出: {call['completion_tokens']} | " + f"总计: {call['total_tokens']} | 耗时: {call['duration_seconds']}秒") + + print("\n注意: 这些统计数据仅反映当前程序运行期间的使用情况") + + +def export_token_usage(format_type: str = "json", output_path: str = None): + """ + 导出Token使用数据 + + Args: + format_type: 导出格式,'json'或'csv' + output_path: 输出文件路径 + """ + api = get_deepseek_api() + + if output_path is None: + # 如果未指定路径,则使用默认路径 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + output_path = f"deepseek_token_usage_{timestamp}.{format_type}" + + file_path = api.export_token_usage(output_path, format_type) + + if file_path: + print(f"\nToken使用数据已导出到: {file_path}") + else: + print("\n导出Token使用数据失败") + + +def analyze_token_usage(json_file: str = None): + """ + 分析Token使用数据并显示统计 + + Args: + json_file: JSON格式的Token使用数据文件 + """ + try: + # 加载数据 + if json_file and os.path.exists(json_file): + with open(json_file, 'r', encoding='utf-8') as f: + data = json.load(f) + else: + # 使用当前内存中的数据 + api = get_deepseek_api() + data = api.token_usage + + if not data or not data.get('calls'): + print("没有可用的Token使用数据进行分析") + return + + # 转换为DataFrame进行分析 + df = pd.DataFrame(data['calls']) + + # 按任务类型分析 + task_analysis = df.groupby('task_type').agg({ + 'prompt_tokens': ['sum', 'mean'], + 'completion_tokens': ['sum', 'mean'], + 'total_tokens': ['sum', 'mean', 'count'] + }) + + # 按符号(交易对)分析 + symbol_analysis = df.groupby('symbol').agg({ + 'total_tokens': ['sum', 'mean', 'count'] + }) + + # 计算总体统计 + total_tokens = data['total_tokens'] + total_calls = len(data['calls']) + + # 显示分析结果 + print("\n===== DeepSeek API Token使用分析 =====") + print(f"总调用次数: {total_calls}") + print(f"总Token使用: {total_tokens:,}") + print(f"平均每次调用Token: {total_tokens / total_calls if total_calls else 0:.2f}") + + print("\n--- 按任务类型分析 ---") + for task, stats in task_analysis.iterrows(): + calls = stats[('total_tokens', 'count')] + total = stats[('total_tokens', 'sum')] + avg = stats[('total_tokens', 'mean')] + print(f"{task}: {calls}次调用, 共{total:,}tokens, 平均{avg:.2f}tokens/次") + + print("\n--- 按交易对分析 ---") + for symbol, stats in symbol_analysis.iterrows(): + calls = stats[('total_tokens', 'count')] + total = stats[('total_tokens', 'sum')] + avg = stats[('total_tokens', 'mean')] + print(f"{symbol}: {calls}次调用, 共{total:,}tokens, 平均{avg:.2f}tokens/次") + + except Exception as e: + print(f"分析Token使用数据时出错: {e}") + + +def main(): + """主函数""" + parser = argparse.ArgumentParser(description="DeepSeek API Token使用情况分析工具") + + subparsers = parser.add_subparsers(dest="command", help="命令") + + # 显示统计信息 + show_parser = subparsers.add_parser("show", help="显示Token使用统计") + + # 导出使用数据 + export_parser = subparsers.add_parser("export", help="导出Token使用数据") + export_parser.add_argument( + "--format", "-f", + choices=["json", "csv"], + default="json", + help="导出格式 (默认: json)" + ) + export_parser.add_argument( + "--output", "-o", + help="输出文件路径" + ) + + # 分析使用数据 + analyze_parser = subparsers.add_parser("analyze", help="分析Token使用数据") + analyze_parser.add_argument( + "--input", "-i", + help="输入JSON文件路径,如果未指定则使用当前内存中的数据" + ) + + args = parser.parse_args() + + if args.command == "show": + show_token_usage_stats() + elif args.command == "export": + export_token_usage(args.format, args.output) + elif args.command == "analyze": + analyze_token_usage(args.input) + else: + # 默认显示统计信息 + show_token_usage_stats() + + +if __name__ == "__main__": + main() \ No newline at end of file