#!/usr/bin/env python3 """ 测试加密货币智能体 - 单次分析 """ import asyncio import sys import os # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) async def test_binance_service(): """测试 Binance 数据服务""" print("\n" + "=" * 50) print("测试 Binance 数据服务") print("=" * 50) from app.services.binance_service import binance_service # 测试获取 K 线数据 print("\n1. 获取 BTCUSDT 1小时 K线数据...") df = binance_service.get_klines('BTCUSDT', '1h', limit=10) if not df.empty: print(f" ✓ 获取成功,共 {len(df)} 条数据") print(f" 最新收盘价: ${df.iloc[-1]['close']:,.2f}") else: print(" ✗ 获取失败") return False # 测试多周期数据 print("\n2. 获取 BTCUSDT 多周期数据...") data = binance_service.get_multi_timeframe_data('BTCUSDT') for interval, df in data.items(): if not df.empty: print(f" ✓ {interval}: {len(df)} 条数据") else: print(f" ✗ {interval}: 无数据") # 测试技术指标 print("\n3. 检查技术指标...") df = data['1h'] indicators = ['ma5', 'ma20', 'rsi', 'macd', 'bb_upper', 'k', 'd', 'atr'] for ind in indicators: if ind in df.columns: value = df.iloc[-1][ind] print(f" ✓ {ind}: {value:.4f}" if value else f" - {ind}: N/A") return True async def test_signal_analyzer(): """测试信号分析器 - 使用新架构""" print("\n" + "=" * 50) print("测试市场信号分析器(新架构)") print("=" * 50) from app.services.binance_service import binance_service from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer analyzer = MarketSignalAnalyzer() # 获取数据 data = binance_service.get_multi_timeframe_data('BTCUSDT') # 测试 LLM 分析 print("\n1. LLM 市场分析...") signal = await analyzer.analyze('BTCUSDT', data, symbols=['BTCUSDT', 'ETHUSDT']) print(f" 市场状态: {signal.get('market_state')}") print(f" 趋势: {signal.get('trend')}") print(f" 信号数量: {len(signal.get('signals', []))}") return True async def test_feishu_service(): """测试飞书通知服务""" print("\n" + "=" * 50) print("测试飞书通知服务") print("=" * 50) from app.services.feishu_service import get_feishu_service feishu = get_feishu_service() if not feishu.enabled: print(" ⚠ 飞书服务未配置,跳过测试") return True # 发送测试消息 print("\n1. 发送测试文本消息...") result = await feishu.send_text("🧪 这是一条测试消息,来自加密货币智能体") print(f" {'✓ 发送成功' if result else '✗ 发送失败'}") return result async def test_full_analysis(): """测试完整分析流程""" print("\n" + "=" * 50) print("测试完整分析流程") print("=" * 50) from app.crypto_agent.crypto_agent import CryptoAgent agent = CryptoAgent() for symbol in ['BTCUSDT', 'ETHUSDT']: print(f"\n分析 {symbol}...") result = await agent.analyze_once(symbol) if 'error' in result: print(f" ✗ 错误: {result['error']}") else: print(f" 价格: ${result['price']:,.2f}") print(f" 趋势: {result['trend']}") print(f" 动作: {result['action']}") print(f" 置信度: {result['confidence']}%") if result.get('stop_loss'): print(f" 止损: ${result['stop_loss']:,.2f}") if result.get('take_profit'): print(f" 止盈: ${result['take_profit']:,.2f}") return True async def main(): """主测试函数""" print("\n" + "=" * 60) print("加密货币智能体测试") print("=" * 60) tests = [ ("Binance 数据服务", test_binance_service), ("信号分析器", test_signal_analyzer), ("飞书通知服务", test_feishu_service), ("完整分析流程", test_full_analysis), ] results = [] for name, test_func in tests: try: result = await test_func() results.append((name, result)) except Exception as e: print(f"\n✗ {name} 测试出错: {e}") results.append((name, False)) # 打印总结 print("\n" + "=" * 60) print("测试总结") print("=" * 60) for name, result in results: status = "✓ 通过" if result else "✗ 失败" print(f" {name}: {status}") all_passed = all(r for _, r in results) print("\n" + ("所有测试通过!" if all_passed else "部分测试失败")) return all_passed if __name__ == "__main__": success = asyncio.run(main()) sys.exit(0 if success else 1)