#!/usr/bin/env python # -*- coding: utf-8 -*- import os import sys import argparse from typing import Dict, Any # 添加项目根目录到Python路径 current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) sys.path.append(parent_dir) from cryptoai.agents.crypto_agent import CryptoAgent from cryptoai.utils.config_loader import ConfigLoader def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser(description='加密货币AI分析智能体') parser.add_argument('--config', type=str, default=None, help='配置文件路径,默认使用 cryptoai/config/config.yaml') parser.add_argument('--run-once', action='store_true', help='只运行一次分析周期,而不是持续运行') parser.add_argument('--symbol', type=str, default=None, help='只分析指定的交易对,例如 BTCUSDT') parser.add_argument('--days', type=int, default=None, help='获取历史数据的天数,默认使用配置中的值') parser.add_argument('--interval', type=str, default=None, help='K线时间间隔,例如 1h,默认使用配置中的值') parser.add_argument('--risk-level', type=str, default=None, choices=['low', 'medium', 'high'], help='风险等级,默认使用配置中的值') return parser.parse_args() def override_config_with_args(config_loader: ConfigLoader, args) -> None: """ 使用命令行参数覆盖配置 Args: config_loader: 配置加载器 args: 命令行参数 """ # 获取原始配置 crypto_config = config_loader.get_crypto_config() agent_config = config_loader.get_agent_config() data_config = config_loader.get_data_config() # 覆盖配置 if args.symbol: crypto_config['base_currencies'] = [args.symbol.replace(crypto_config['quote_currency'], '')] if args.days: data_config['historical_days'] = args.days if args.interval: crypto_config['time_interval'] = args.interval if args.risk_level: agent_config['risk_level'] = args.risk_level # 保存修改后的配置 # 注意:这只是修改了内存中的配置,没有写入文件 # 如果需要保存到文件,可以实现一个 save_config 方法 def analyze_single_symbol(agent: CryptoAgent, symbol: str, days: int = 30) -> Dict[str, Any]: """ 分析单个交易对 Args: agent: 加密货币智能体 symbol: 交易对符号 days: 历史数据天数 Returns: 分析结果 """ print(f"\n开始分析{symbol}...") # 获取并处理数据 raw_data = agent.fetch_historical_data(symbol, days=days) if raw_data.empty: print(f"无法获取{symbol}的数据") return {} processed_data = agent.process_data(symbol, raw_data) # 分析市场 analysis_result = agent.analyze_market(symbol, processed_data) # 预测价格 prediction_result = agent.predict_price(symbol) # 生成策略 strategy = agent.generate_strategy(symbol, analysis_result) # 整合结果 result = { "analysis": analysis_result, "prediction": prediction_result, "strategy": strategy } print(f"{symbol}分析完成") return result def main(): """主函数""" # 解析命令行参数 args = parse_arguments() try: # 加载配置 config_loader = ConfigLoader(args.config) # 使用命令行参数覆盖配置 override_config_with_args(config_loader, args) # 创建智能体 agent = CryptoAgent(config_path=args.config) # 如果指定了单个交易对 if args.symbol: result = analyze_single_symbol( agent=agent, symbol=args.symbol, days=args.days or agent.data_config['historical_days'] ) print("\n分析结果:") print(f"市场趋势: {result.get('analysis', {}).get('market_trend', 'unknown')}") print(f"24小时预测: {result.get('prediction', {}).get('prediction_24h', {})}") print(f"建议操作: {result.get('strategy', {}).get('position', 'unknown')}") else: # 启动智能体 agent.start_agent(run_once=args.run_once) except KeyboardInterrupt: print("\n程序已退出") except Exception as e: print(f"程序运行出错: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()