""" 实盘交易简单测试 - 使用 API 直接测试 测试流程: 1. 获取账户余额 2. 下一个小单 3. 查询订单 4. 平仓 """ import requests import json import time BASE_URL = "http://localhost:8000" def print_section(title): print("\n" + "=" * 50) print(f" {title}") print("=" * 50) def test_get_account(): """获取账户信息""" print_section("1. 获取账户信息") response = requests.get(f"{BASE_URL}/api/real-trading/account") data = response.json() if data.get('success'): account = data['account'] print(f"✅ 账户余额: ${account['current_balance']:.2f}") print(f" 可用: ${account['available']:.2f}") print(f" 已用: ${account['used_margin']:.2f}") print(f" 持仓价值: ${account['total_position_value']:.2f}") return account else: print(f"❌ 获取账户信息失败: {data.get('message')}") return None def test_get_positions(): """获取持仓""" print_section("2. 获取当前持仓") response = requests.get(f"{BASE_URL}/api/real-trading/positions") data = response.json() if data.get('success'): positions = data['positions'] print(f"✅ 持仓数量: {len(positions)}") for pos in positions: if pos.get('holding', 0) > 0: print(f" {pos['symbol']}: {pos['holding']} USDT") return positions else: print(f"❌ 获取持仓失败: {data.get('message')}") return [] def test_place_small_order(): """下一个小测试单""" print_section("3. 下测试单") # 使用模拟交易API测试(更安全) symbol = "BTCUSDT" # 先获取当前价格 ticker_response = requests.get(f"{BASE_URL}/api/bitget/ticker?symbol={symbol}") ticker_data = ticker_response.json() if not ticker_data.get('success'): print(f"❌ 获取价格失败") return None current_price = ticker_data['data'].get('last_price', 0) print(f" 当前价格: ${current_price:,.2f}") # 计算测试参数 stop_loss = current_price * 0.95 take_profit = current_price * 1.05 order_data = { "symbol": symbol, "action": "buy", "entry_type": "market", "entry_price": current_price, "stop_loss": stop_loss, "take_profit": take_profit, "confidence": 60, "signal_grade": "C", "position_size": "light" } print(f" 测试参数:") print(f" 止损: ${stop_loss:,.2f}") print(f" 止盈: ${take_profit:,.2f}") # 先用模拟交易测试 print(f"\n 📝 正在创建模拟交易订单...") response = requests.post(f"{BASE_URL}/api/paper-trading/order", json=order_data) data = response.json() if data.get('success'): order = data.get('order') print(f"✅ 模拟订单创建成功!") print(f" 订单ID: {order['order_id']}") print(f" 数量: ${order['quantity']:.2f}") return order else: print(f"❌ 下单失败: {data.get('message')}") return None def test_get_orders(): """获取订单列表""" print_section("4. 获取订单列表") response = requests.get(f"{BASE_URL}/api/paper-trading/orders?status=active&limit=10") data = response.json() if data.get('success'): orders = data['orders'] print(f"✅ 活跃订单数量: {len(orders)}") for order in orders: print(f" {order['symbol']} {order['side']} | " f"状态: {order['status']} | " f"数量: ${order['quantity']:.2f}") return orders else: print(f"❌ 获取订单失败: {data.get('message')}") return [] def test_close_order(order_id): """平仓""" print_section("5. 平仓测试") print(f" 正在平仓订单: {order_id}") response = requests.post(f"{BASE_URL}/api/paper-trading/close", json={ "order_id": order_id, "reason": "测试平仓" }) data = response.json() if data.get('success'): print(f"✅ 平仓成功!") print(f" 平仓价格: ${data.get('close_price', 0):,.2f}") print(f" 盈亏: ${data.get('pnl', 0):.2f}") return True else: print(f"❌ 平仓失败: {data.get('message')}") return False def main(): print("\n" + "🧪" * 25) print(" 实盘交易流程测试 (模拟模式)") print("🧪" * 25) print("\n此测试使用模拟交易,不会动用真实资金") input("\n按 Enter 开始测试...") # 测试1: 获取账户 test_get_account() # 测试2: 获取持仓 test_get_positions() # 测试3: 下单 order = test_place_small_order() if order: time.sleep(1) # 测试4: 查询订单 test_get_orders() time.sleep(1) # 测试5: 平仓 test_close_order(order['order_id']) print_section("测试完成") print("✅ 所有测试已完成") print(" 请检查前端页面确认订单状态") if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n\n⚠️ 测试被用户中断") except Exception as e: print(f"\n\n❌ 测试出错: {e}") import traceback traceback.print_exc()