""" 测试 Bitget V2 API """ import sys sys.path.insert(0, '/Users/aaron/source_code/Stock_Agent') from backend.app.services.bitget_trading_api import get_bitget_trading_api from backend.app.utils.logger import logger def test_bitget_v2(): """测试 Bitget V2 API 各个功能""" print("\n" + "="*60) print("测试 Bitget V2 API") print("="*60) # 获取 API 实例 api = get_bitget_trading_api() if not api: print("❌ 无法初始化 Bitget API(请检查 .env 配置)") return False print(f"\n📡 API 端点: {api.base_url}") print(f"🔑 API Key: {api.api_key[:10]}...{api.api_key[-4:]}") # 测试 1: 连接测试和账户余额 print("\n" + "-"*60) print("测试 1: 账户余额查询") print("-"*60) try: balance = api.get_balance() if balance: usdt = balance.get('USDT', {}) print(f"✅ USDT 可用余额: {usdt.get('available', '0')}") print(f" 冻结: {usdt.get('frozen', '0')}") print(f" 锁定: {usdt.get('locked', '0')}") else: print("⚠️ 余额查询返回空数据") except Exception as e: print(f"❌ 余额查询失败: {e}") return False # 测试 2: 账户信息 print("\n" + "-"*60) print("测试 2: 账户信息查询") print("-"*60) try: account_info = api.get_account_info() if account_info: print(f"✅ 账户信息: {list(account_info.keys())}") else: print("⚠️ 账户信息查询返回空数据") except Exception as e: print(f"❌ 账户信息查询失败: {e}") # 测试 3: 查询持仓 print("\n" + "-"*60) print("测试 3: 持仓查询") print("-"*60) try: positions = api.get_position() if positions: print(f"✅ 当前持仓数: {len(positions)}") for pos in positions: symbol = pos.get('symbol', 'N/A') hold_side = pos.get('holdSide', 'N/A') total = pos.get('total', '0') leverage = pos.get('leverage', 'N/A') print(f" {symbol} {hold_side}: {total} 张 (杠杆 {leverage}x)") else: print("✅ 当前无持仓") except Exception as e: print(f"❌ 持仓查询失败: {e}") return False # 测试 4: 查询当前挂单 print("\n" + "-"*60) print("测试 4: 当前挂单查询") print("-"*60) try: open_orders = api.get_open_orders() if open_orders: print(f"✅ 当前挂单数: {len(open_orders)}") for order in open_orders[:5]: # 只显示前5个 symbol = order.get('symbol', 'N/A') side = order.get('side', 'N/A') size = order.get('size', '0') price = order.get('price', '市价') print(f" {symbol} {side} {size} @ {price}") if len(open_orders) > 5: print(f" ... 还有 {len(open_orders) - 5} 个挂单") else: print("✅ 当前无挂单") except Exception as e: print(f"❌ 挂单查询失败: {e}") return False # 测试 5: 查询历史订单 print("\n" + "-"*60) print("测试 5: 历史订单查询 (BTCUSDT)") print("-"*60) try: history_orders = api.get_history_orders('BTCUSDT', limit=10) if history_orders: print(f"✅ 最近 {len(history_orders)} 条历史订单") for order in history_orders[:3]: # 只显示前3个 order_id = order.get('orderId', 'N/A') side = order.get('side', 'N/A') order_type = order.get('orderType', 'N/A') state = order.get('state', 'N/A') print(f" 订单 {order_id}: {side} {order_type} - {state}") else: print("✅ 暂无历史订单") except Exception as e: print(f"❌ 历史订单查询失败: {e}") print("\n" + "="*60) print("✅ 所有核心功能测试完成!V2 API 工作正常") print("="*60 + "\n") return True if __name__ == "__main__": success = test_bitget_v2() sys.exit(0 if success else 1)