stock-ai-agent/backend/tests/test_bitget_live_integration.py
2026-03-30 11:56:28 +08:00

520 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Bitget UTA V3 真实 API 集成测试
⚠️ 警告:此测试会使用真实 API 调用和真实订单!
- 使用最小下单量 (1 张 BTC = 0.01 BTC)
- 市价单会立即成交,产生实际盈亏
- 测试后自动清理所有订单和持仓
覆盖接口:
- [UTA V3] 查询余额 (privateUtaGetV3AccountAssets)
- [UTA V3] 设置杠杆 (privateUtaPostV3AccountSetLeverage)
- [UTA V3] 查询持仓 (fetch_positions + uta:True)
- [UTA V3] 市价开仓 (create_order + uta:True)
- [UTA V3] 限价挂单 (create_order + uta:True)
- [UTA V3] 撤单 (cancel_all_orders + uta:True)
- [UTA V3] 市价平仓 (create_market_order + uta:True + reduceOnly)
- [UTA V3] 止盈止损 (modify_sl_tp + uta:True)
- [Service] 完整交易流程: 账户状态 → 开仓 → 持仓验证 → 平仓
运行方式:
cd backend
python3 tests/test_bitget_live_integration.py
"""
import os
import sys
import time
import traceback
from datetime import datetime
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '..', '.env'))
from app.services.bitget_trading_api_sdk import BitgetTradingAPI
from app.services.bitget_live_trading_service import BitgetLiveTradingService
from app.config import get_settings
# ==================== 测试配置 ====================
TEST_SYMBOL = 'BTCUSDT'  # trading pair used by the tests
TEST_CONTRACTS = 1  # minimum order size (1 contract = 0.01 BTC)
TEST_LEVERAGE = 5  # leverage multiplier used by the tests
LIMIT_OFFSET_PCT = 0.05  # limit-order offset from the current price (5%, so the order should not fill)
class TestResult:
    """Collect pass/fail outcomes of the checks and print a final summary.

    Every call to :meth:`record` appends a ``(name, passed, detail)`` tuple to
    ``self.results`` and echoes the outcome to stdout immediately.
    """

    # The class name starts with "Test" but it is a helper, not a test case;
    # this keeps pytest from trying to collect it.
    __test__ = False

    def __init__(self):
        # List of (name, passed, detail) tuples in recording order.
        self.results = []

    def record(self, name: str, passed: bool, detail: str = ""):
        """Store one outcome and print it right away.

        Args:
            name: Label of the check.
            passed: Whether the check succeeded.
            detail: Optional extra text printed on its own line when non-empty.
        """
        self.results.append((name, passed, detail))
        status = "✅ PASS" if passed else "❌ FAIL"
        print(f" {status}: {name}")
        if detail:
            print(f" {detail}")

    def summary(self) -> bool:
        """Print every recorded outcome plus a pass count.

        Failed checks are printed together with their detail text.

        Returns:
            True when every recorded check passed (also True when nothing was
            recorded), otherwise False.
        """
        banner = '=' * 60
        print(f"\n{banner}")
        print("测试结果汇总")
        print(banner)
        passed = sum(1 for _, ok, _ in self.results if ok)
        total = len(self.results)
        for name, ok, detail in self.results:
            # Distinct markers so passes and failures can be told apart.
            status = "✅" if ok else "❌"
            line = f" {status} {name}"
            if not ok and detail:
                line += f" — {detail}"
            print(line)
        print(f"\n总计: {passed}/{total} 通过")
        print(banner)
        return passed == total
# ==================== 初始化 ====================
def create_api() -> BitgetTradingAPI:
    """Build a BitgetTradingAPI client from environment variables.

    Reads BITGET_API_KEY, BITGET_API_SECRET, BITGET_PASSPHRASE and
    BITGET_USE_TESTNET (defaults to 'true') from the environment.

    Raises:
        ValueError: If the API key or the API secret is missing or empty.
    """
    env = os.getenv
    key, secret = env('BITGET_API_KEY', ''), env('BITGET_API_SECRET', '')
    phrase = env('BITGET_PASSPHRASE', '')
    if not (key and secret):
        raise ValueError("请在 .env 中配置 BITGET_API_KEY 和 BITGET_API_SECRET")
    # Only the literal string "true" (case-insensitive) enables the testnet.
    testnet_flag = env('BITGET_USE_TESTNET', 'true')
    return BitgetTradingAPI(
        api_key=key,
        api_secret=secret,
        passphrase=phrase,
        use_testnet=testnet_flag.lower() == 'true',
    )
def create_service(api: BitgetTradingAPI) -> BitgetLiveTradingService:
    """Build a BitgetLiveTradingService around an existing API client.

    The instance is allocated with ``__new__`` so that ``__init__`` is not
    run; the attributes are then assigned by hand.
    """
    svc = BitgetLiveTradingService.__new__(BitgetLiveTradingService)
    svc.trading_api = api
    svc.settings = get_settings()
    # Fixed values assigned to the service for the test run.
    fixed_attrs = {
        'max_total_leverage': 10.0,
        'max_single_position': 100.0,
        'circuit_breaker_drawdown': 0.25,
        'initial_balance': None,
    }
    for attr_name, attr_value in fixed_attrs.items():
        setattr(svc, attr_name, attr_value)
    return svc
# ==================== SDK 层测试 ====================
def test_sdk_connection(api: BitgetTradingAPI, r: TestResult):
    """SDK: check connectivity by fetching the exchange server time.

    The fetched value is divided by 1000 before being converted to a
    datetime. Any exception is recorded as a failure.
    """
    label = "SDK 连接"
    try:
        millis = api.exchange.fetch_time()
        r.record(label, True, f"服务器时间: {datetime.fromtimestamp(millis / 1000)}")
    except Exception as exc:
        r.record(label, False, str(exc))
def test_sdk_load_markets(api: BitgetTradingAPI, r: TestResult):
    """SDK: load market metadata and verify that BTC/USDT:USDT is listed.

    On success the market's ``contractSize`` is included in the detail text.
    """
    label = "加载市场"
    try:
        btc_market = api.exchange.load_markets().get('BTC/USDT:USDT')
        if not btc_market:
            r.record(label, False, "BTC/USDT:USDT 不在市场列表中")
            return
        size = btc_market.get('contractSize', 'N/A')
        r.record(label, True, f"BTC contractSize={size}")
    except Exception as exc:
        r.record(label, False, str(exc))
def test_sdk_get_balance(api: BitgetTradingAPI, r: TestResult):
    """SDK: [UTA V3] query the balance (privateUtaGetV3AccountAssets).

    Passes when either the available amount or the equity of the USDT entry
    is greater than zero.
    """
    label = "UTA 查询余额"
    try:
        usdt_entry = api.get_balance().get('USDT', {})
        free_amount = usdt_entry.get('available', '0')
        total_equity = usdt_entry.get('equity', '0')
        funded = float(free_amount) > 0 or float(total_equity) > 0
        r.record(label, funded, f"可用={free_amount}, 权益={total_equity}")
    except Exception as exc:
        r.record(label, False, str(exc))
def test_sdk_set_leverage(api: BitgetTradingAPI, r: TestResult):
    """SDK: [UTA V3] set the leverage (privateUtaPostV3AccountSetLeverage).

    The value returned by ``api.set_leverage`` is recorded as the outcome.
    """
    label = "UTA 设置杠杆"
    try:
        outcome = api.set_leverage(TEST_SYMBOL, TEST_LEVERAGE)
        r.record(label, outcome, f"{TEST_SYMBOL}{TEST_LEVERAGE}x")
    except Exception as exc:
        r.record(label, False, str(exc))
def test_sdk_get_positions(api: BitgetTradingAPI, r: TestResult):
    """SDK: [UTA V3] query positions (fetch_positions + uta:True).

    The check passes as long as the query itself does not raise; the number
    of positions with a positive ``contracts`` value is reported as detail.
    """
    try:
        positions = api.get_position()
        # `contracts` may be present but None/empty; treat that as zero so a
        # sparse position entry cannot turn this "must not error" check into
        # a false failure.
        active = sum(1 for p in positions if float(p.get('contracts') or 0) > 0)
        r.record("UTA 查询持仓", True, f"当前活跃持仓: {active}")
    except Exception as e:
        r.record("UTA 查询持仓", False, str(e))
def test_sdk_limit_order_and_cancel(api: BitgetTradingAPI, r: TestResult):
    """SDK: [UTA V3] place a limit order, list open orders, then cancel.

    The buy order is priced LIMIT_OFFSET_PCT below the last traded price so
    that it should rest on the book instead of filling. Whatever happens, a
    best-effort ``cancel_all_orders`` runs in ``finally`` unless the order
    book was already verified to be empty.
    """
    needs_cleanup = False
    try:
        # 1. Fetch the current price.
        ticker = api.exchange.fetch_ticker('BTC/USDT:USDT')
        price = ticker['last']
        limit_price = round(price * (1 - LIMIT_OFFSET_PCT), 1)  # rest below market

        # 2. Place the limit order with the minimum size.
        order = api.place_order(
            symbol=TEST_SYMBOL,
            side='buy',
            order_type='limit',
            size=TEST_CONTRACTS,
            price=limit_price,
        )
        if not order:
            r.record("UTA 限价挂单", False, "下单返回空")
            return
        # From here on an order may exist on the exchange even if its id is
        # missing from the response, so cleanup must not depend on the id.
        needs_cleanup = True
        order_id = order.get('id')
        status = order.get('status', 'unknown')
        r.record("UTA 限价挂单", order_id is not None,
                 f"id={order_id}, status={status}, price=${limit_price:,.1f}")
        time.sleep(1)

        # 3. List open orders.
        open_orders = api.get_open_orders(TEST_SYMBOL)
        found = any(o.get('id') == order_id for o in open_orders)
        r.record("UTA 查询挂单", found or len(open_orders) > 0, f"挂单数={len(open_orders)}")

        # 4. Cancel.
        cancel_ok = api.cancel_all_orders(TEST_SYMBOL)
        r.record("UTA 撤单", cancel_ok, f"cancel_all_orders → {cancel_ok}")
        time.sleep(1)

        # 5. Verify nothing is left on the book.
        remaining = api.get_open_orders(TEST_SYMBOL)
        r.record("验证撤单完成", len(remaining) == 0, f"剩余挂单: {len(remaining)}")
        if len(remaining) == 0:
            needs_cleanup = False
    except Exception as e:
        r.record("UTA 限价单流程", False, f"{e}\n{traceback.format_exc()}")
    finally:
        # Best-effort cleanup: never raise, but do not hide a failure either,
        # because a leftover order is a real order.
        if needs_cleanup:
            try:
                api.cancel_all_orders(TEST_SYMBOL)
            except Exception as cleanup_err:
                print(f" ⚠️ 清理失败,请手动检查: {cleanup_err}")
def _has_available_position(positions) -> bool:
    """Return True if any position reports a positive ``info.available``.

    Missing, None or empty values are treated as zero.
    """
    return any(
        float((p.get('info') or {}).get('available') or 0) > 0
        for p in positions
    )


def test_sdk_market_order_flow(api: BitgetTradingAPI, r: TestResult):
    """SDK: [UTA V3] market open -> verify position -> TP/SL -> market close.

    Opens a long position with TEST_CONTRACTS at market, checks that a
    position is reported, sets take-profit/stop-loss 2% around the ticker
    price, cancels pending orders, closes the position at market and verifies
    that nothing is left. If the position is still considered open when the
    flow ends, a best-effort cleanup cancels orders and closes the position.
    """
    opened = False
    try:
        # 1. Fetch the current price.
        ticker = api.exchange.fetch_ticker('BTC/USDT:USDT')
        price = ticker['last']
        print(f"\n 当前 BTC: ${price:,.2f}")

        # 2. Open a long position at market.
        order = api.place_order(
            symbol=TEST_SYMBOL,
            side='buy',
            order_type='market',
            size=TEST_CONTRACTS,
        )
        if not order:
            r.record("UTA 市价开仓", False, "下单返回空")
            return
        # Fall back to the ticker price when the order carries no fill price.
        avg = order.get('average') or order.get('price') or price
        avg = float(avg) if avg else price
        r.record("UTA 市价开仓", True, f"id={order.get('id')}, avg=${avg:,.2f}")
        opened = True
        time.sleep(2)

        # 3. Verify that a position exists.
        r.record("验证持仓存在", _has_available_position(api.get_position(TEST_SYMBOL)))

        # 4. Set take-profit / stop-loss.
        tp_price = round(price * 1.02, 1)  # +2%
        sl_price = round(price * 0.98, 1)  # -2%
        try:
            tp_sl_ok = api.modify_sl_tp(TEST_SYMBOL, stop_loss=sl_price, take_profit=tp_price)
            r.record("UTA 止盈止损", tp_sl_ok, f"TP=${tp_price:,.1f}, SL=${sl_price:,.1f}")
        except Exception as e:
            r.record("UTA 止盈止损", False, str(e))
        time.sleep(1)

        # 5. Cancel the pending TP/SL orders (best effort, but report failures).
        try:
            api.cancel_all_orders(TEST_SYMBOL)
        except Exception as cancel_err:
            print(f" ⚠️ 撤销止盈止损挂单失败: {cancel_err}")
        time.sleep(1)

        # 6. Close the position at market.
        close_order = api.close_position(TEST_SYMBOL)
        if close_order:
            close_avg = float(close_order.get('average') or close_order.get('price') or 0)
            r.record("UTA 市价平仓", True, f"平仓价=${close_avg:,.2f}")
            opened = False
        else:
            r.record("UTA 市价平仓", False, "close_position 返回空")
        time.sleep(2)

        # 7. Verify that the position is gone.
        r.record("验证已平仓", not _has_available_position(api.get_position(TEST_SYMBOL)))
    except Exception as e:
        r.record("市价单流程异常", False, f"{e}\n{traceback.format_exc()}")
    finally:
        # Make sure nothing is left behind. The two steps are guarded
        # separately so that a failing cancel cannot prevent the close.
        if opened:
            try:
                api.cancel_all_orders(TEST_SYMBOL)
            except Exception as cancel_err:
                print(f" ⚠️ 清理失败,请手动检查: {cancel_err}")
            time.sleep(0.5)
            try:
                api.close_position(TEST_SYMBOL)
                print(" 🧹 已自动清理残留持仓")
            except Exception as cleanup_err:
                print(f" ⚠️ 清理失败,请手动检查: {cleanup_err}")
# ==================== Service 层测试 ====================
def test_service_account_state(service: BitgetLiveTradingService, r: TestResult):
    """Service: fetch the account state and require a positive account value.

    The detail text reports ``account_value``, ``available_balance`` and
    ``total_margin_used`` from the returned mapping.
    """
    label = "Service 账户状态"
    try:
        snapshot = service.get_account_state()
        equity = snapshot['account_value']
        free_balance = snapshot['available_balance']
        r.record(
            label,
            equity > 0,
            f"权益=${equity:,.2f}, 可用=${free_balance:,.2f}, 已用=${snapshot['total_margin_used']:,.2f}",
        )
    except Exception as exc:
        r.record(label, False, str(exc))
def test_service_market_order_flow(service: BitgetLiveTradingService, r: TestResult):
    """Service: full order flow.

    place_market_order -> get_open_positions -> check_risk_limits ->
    market_close_position -> get_open_positions. If the position is still
    considered open when the flow ends, a best-effort close is attempted and
    its real outcome is printed.
    """
    opened = False
    try:
        # 1. Open a BTC long at market.
        result = service.place_market_order('BTC', is_buy=True, size=TEST_CONTRACTS)
        if not result.get('success'):
            r.record("Service 市价开仓", False, result.get('error', '未知'))
            return
        r.record("Service 市价开仓", True, f"order_id={result.get('order_id')}")
        opened = True
        time.sleep(2)

        # 2. Verify the position.
        positions = service.get_open_positions()
        btc_pos = next((p for p in positions if p['coin'] == 'BTC'), None)
        r.record(
            "Service 持仓验证",
            btc_pos is not None,
            f"size={btc_pos['size']}, entry=${btc_pos['entry_price']:,.2f}" if btc_pos else "未找到 BTC 持仓",
        )

        # 3. Risk-limit check.
        risk = service.check_risk_limits()
        r.record("Service 风控检查", risk['allowed'], risk.get('reason', '通过'))
        time.sleep(1)

        # 4. Close at market.
        close_result = service.market_close_position('BTC')
        r.record(
            "Service 市价平仓",
            close_result.get('success', False),
            close_result.get('error', f"size={close_result.get('size', 'N/A')}"),
        )
        if close_result.get('success'):
            opened = False
        time.sleep(2)

        # 5. Verify the position is gone.
        positions_after = service.get_open_positions()
        btc_after = next((p for p in positions_after if p['coin'] == 'BTC'), None)
        r.record("Service 平仓验证", btc_after is None)
    except Exception as e:
        r.record("Service 下单流程异常", False, f"{e}\n{traceback.format_exc()}")
    finally:
        if opened:
            # Report the real outcome: the service signals failure through a
            # result dict, so "no exception" does not mean "closed".
            try:
                cleanup = service.market_close_position('BTC')
                if cleanup.get('success'):
                    print(" 🧹 已自动清理残留持仓")
                else:
                    print(f" ⚠️ 清理失败,请手动检查: {cleanup.get('error', '未知')}")
            except Exception as cleanup_err:
                print(f" ⚠️ 清理失败,请手动检查: {cleanup_err}")
def test_service_limit_order_flow(service: BitgetLiveTradingService, r: TestResult):
    """Service: limit-order flow (place_limit_order -> get_open_orders -> cancel_order).

    The buy order is priced LIMIT_OFFSET_PCT below the last traded price.
    Once an order has been placed, a best-effort ``cancel_all_orders`` runs
    in ``finally`` unless the open-order list was verified to be empty.
    """
    placed = False
    try:
        # Fetch the current price.
        ticker = service.trading_api.exchange.fetch_ticker('BTC/USDT:USDT')
        price = ticker['last']
        limit_price = round(price * (1 - LIMIT_OFFSET_PCT), 1)

        # 1. Place the limit buy order.
        result = service.place_limit_order('BTC', is_buy=True, size=TEST_CONTRACTS, price=limit_price)
        if not result.get('success'):
            r.record("Service 限价挂单", False, result.get('error', '未知'))
            return
        placed = True
        order_id = result.get('order_id')
        r.record("Service 限价挂单", True, f"id={order_id}, status={result.get('order_status')}")
        time.sleep(1)

        # 2. List open orders.
        orders = service.get_open_orders('BTC')
        r.record("Service 查询挂单", len(orders) > 0, f"挂单数={len(orders)}")

        # 3. Cancel by id (only possible when the response carried one).
        if order_id:
            cancel = service.cancel_order('BTC', order_id)
            r.record("Service 撤单", cancel.get('success', False))
        time.sleep(1)

        # 4. Verify the cancellation.
        remaining = service.get_open_orders('BTC')
        r.record("Service 验证撤单", len(remaining) == 0, f"剩余={len(remaining)}")
        if len(remaining) == 0:
            placed = False
    except Exception as e:
        r.record("Service 限价单流程", False, str(e))
    finally:
        # Covers the cases where no exception was raised but the order is
        # still live (missing order id, or cancel reported failure).
        if placed:
            try:
                service.cancel_all_orders('BTC')
            except Exception as cleanup_err:
                print(f" ⚠️ 清理失败,请手动检查: {cleanup_err}")
def test_service_leverage_sync(service: BitgetLiveTradingService, r: TestResult):
    """Service: sync the default leverage for BTCUSDT and ETHUSDT.

    The ``success`` entry of the returned mapping is recorded as the outcome.
    """
    label = "Service 杠杆同步"
    symbols = ['BTCUSDT', 'ETHUSDT']
    try:
        sync = service.sync_default_leverage(symbols, leverage=TEST_LEVERAGE)
        r.record(label, sync['success'], f"{TEST_LEVERAGE}x → {sync.get('results', {})}")
    except Exception as exc:
        r.record(label, False, str(exc))
# ==================== 主入口 ====================
def main():
    """Run all SDK-level and service-level checks, then exit.

    Prints a header, creates the API client and the service (exiting with
    status 1 if that fails), runs each check with a short pause in between,
    prints the summary and exits with status 0 when every check passed,
    otherwise 1.
    """
    banner = '=' * 60
    # Section divider; the original expression repeated an empty string and
    # therefore printed nothing.
    rule = '─' * 40

    print(f"\n{banner}")
    print(f" Bitget UTA V3 实盘接口集成测试")
    print(banner)
    print(f" 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f" 交易对: {TEST_SYMBOL}")
    print(f" 下单量: {TEST_CONTRACTS} 张 (0.01 BTC)")
    print(f" 杠杆: {TEST_LEVERAGE}x")
    print(banner)
    r = TestResult()

    # Initialisation.
    try:
        api = create_api()
        service = create_service(api)
        print(f" API Key: {api.api_key[:10]}...")
        print(f" UTA 模式: {api.use_unified_account}")
        print(f" 测试网: {api.use_testnet}")
    except Exception as e:
        print(f"\n❌ 初始化失败: {e}")
        traceback.print_exc()
        sys.exit(1)

    # ---- SDK-level checks: (function, pause in seconds afterwards) ----
    print(f"\n{rule}")
    print(" SDK 层 (BitgetTradingAPI)")
    print(rule)
    sdk_steps = (
        (test_sdk_connection, 0.3),
        (test_sdk_load_markets, 0.3),
        (test_sdk_get_balance, 0.3),
        (test_sdk_set_leverage, 0.3),
        (test_sdk_get_positions, 0.3),
        (test_sdk_limit_order_and_cancel, 0.5),
        (test_sdk_market_order_flow, 0.5),
    )
    for step, pause in sdk_steps:
        step(api, r)
        time.sleep(pause)

    # ---- Service-level checks ----
    print(f"\n{rule}")
    print(" Service 层 (BitgetLiveTradingService)")
    print(rule)
    service_steps = (
        (test_service_account_state, 0.3),
        (test_service_leverage_sync, 0.3),
        (test_service_limit_order_flow, 0.5),
        (test_service_market_order_flow, 0),
    )
    for step, pause in service_steps:
        step(service, r)
        if pause:
            time.sleep(pause)

    # ---- Summary ----
    all_passed = r.summary()
    sys.exit(0 if all_passed else 1)
if __name__ == '__main__':
    # Ask for explicit confirmation before any order can be placed.
    print("\n⚠️ 此测试会产生真实订单和手续费!")
    print(" 使用最小量 1 张 BTC (约 $600-1000 保证金)")
    answer = input("\n是否继续?(yes/no): ").strip().lower()
    if answer == 'yes':
        main()
    else:
        print("已取消")
        sys.exit(0)