#!/usr/bin/env python3
"""
Bitget vs Binance data comparison test script.

Checks performed:
1. Kline (candlestick) data comparison
2. Current price comparison
3. Funding rate comparison
4. Multi-timeframe data comparison
5. Technical indicator comparison
6. 24h ticker comparison
"""

import numbers
import os
import sys
import traceback
from pathlib import Path

# Make the backend package importable when the script is run directly.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "backend"))

import pandas as pd

from app.services.binance_service import binance_service
from app.services.bitget_service import bitget_service


def print_section(title: str):
    """Print a section title framed by two 80-character rules."""
    print("\n" + "=" * 80)
    print(f" {title}")
    print("=" * 80)


def _is_missing(value) -> bool:
    """Return True when *value* carries no data.

    ``None`` is always missing. A numeric zero is real data, so only
    non-numeric values fall back to a truthiness check (empty string,
    empty container, ...).
    """
    if value is None:
        return True
    if isinstance(value, numbers.Real):
        return False
    return not value


def _relative_diff_percent(reference, other) -> float:
    """Return ``|reference - other|`` as a percentage of ``|reference|``.

    The magnitude of the reference is used so that a negative reference
    cannot produce a negative percentage. When the reference is zero the
    relative difference is undefined: 0 is returned if both values are
    zero, infinity otherwise.
    """
    if reference == 0:
        return 0.0 if other == 0 else float("inf")
    return float(abs(reference - other) / abs(reference) * 100)


def print_comparison(label: str, binance_value, bitget_value, tolerance: float = 0.01):
    """Print both values side by side with a match / mismatch marker.

    Args:
        label: Name of the quantity being compared.
        binance_value: Value obtained from Binance (the reference).
        bitget_value: Value obtained from Bitget.
        tolerance: Largest relative difference, in percent, that still
            counts as a match. Only used when both values are numeric.

    Numeric pairs are compared by relative difference against the Binance
    value. Non-numeric pairs are only checked for presence, not equality.
    If either side is missing the comparison is reported as failed.
    """
    if _is_missing(binance_value) or _is_missing(bitget_value):
        print(f"❌ {label}")
        print(f" Binance: {binance_value}")
        print(f" Bitget: {bitget_value}")
        return

    # numbers.Real also accepts NumPy scalars coming out of DataFrames.
    both_numeric = isinstance(binance_value, numbers.Real) and isinstance(
        bitget_value, numbers.Real
    )
    if not both_numeric:
        print(f"✅ {label}")
        print(f" Binance: {binance_value}")
        print(f" Bitget: {bitget_value}")
        return

    diff_percent = _relative_diff_percent(binance_value, bitget_value)
    match = "✅" if diff_percent <= tolerance else "❌"
    print(f"{match} {label}")
    print(f" Binance: {binance_value}")
    print(f" Bitget: {bitget_value}")
    print(f" 差异: {diff_percent:.2f}%")


def test_kline_data(symbol: str = "BTCUSDT"):
    """Fetch 5m klines from both services and compare the latest candle."""
    print_section("1. K线数据对比")

    interval = '5m'
    limit = 100

    print(f"\n获取 {symbol} {interval} K线数据({limit}根)...")

    # Fetch the data from both exchanges.
    binance_df = binance_service.get_klines(symbol, interval, limit)
    bitget_df = bitget_service.get_klines(symbol, interval, limit)

    if binance_df.empty:
        print("❌ Binance 数据为空")
        return

    if bitget_df.empty:
        print("❌ Bitget 数据为空")
        return

    print(f"\n✅ Binance 获取 {len(binance_df)} 根 K线")
    print(f"✅ Bitget 获取 {len(bitget_df)} 根 K线")

    # Compare the most recent candle.
    print("\n最新K线对比:")
    b_latest = binance_df.iloc[-1]
    g_latest = bitget_df.iloc[-1]

    # (label, column, tolerance in percent)
    fields = (
        ("开盘价", 'open', 0.1),
        ("最高价", 'high', 0.1),
        ("最低价", 'low', 0.1),
        ("收盘价", 'close', 0.1),
        ("成交量", 'volume', 1.0),
    )
    for label, column, tolerance in fields:
        print_comparison(
            label,
            float(b_latest[column]),
            float(g_latest[column]),
            tolerance=tolerance,
        )

    # Show the candle open times so misaligned candles are easy to spot.
    print(f"\n时间对比:")
    print(f" Binance: {b_latest['open_time']}")
    print(f" Bitget: {g_latest['open_time']}")

    # Show the column layout of both frames.
    print("\n数据结构对比:")
    print(f" Binance 列: {list(binance_df.columns)}")
    print(f" Bitget 列: {list(bitget_df.columns)}")


def test_current_price(symbol: str = "BTCUSDT"):
    """Compare the current price reported by both services."""
    print_section("2. 当前价格对比")

    print(f"\n获取 {symbol} 当前价格...")

    binance_price = binance_service.get_current_price(symbol)
    bitget_price = bitget_service.get_current_price(symbol)

    print_comparison("当前价格", binance_price, bitget_price, tolerance=0.05)


def test_funding_rate(symbol: str = "BTCUSDT"):
    """Compare funding rate, mark price, index price and sentiment."""
    print_section("3. 资金费率对比")

    print(f"\n获取 {symbol} 资金费率...")

    binance_fr = binance_service.get_funding_rate(symbol)
    bitget_fr = bitget_service.get_funding_rate(symbol)

    if not binance_fr:
        print("❌ Binance 资金费率数据为空")
        return

    if not bitget_fr:
        print("❌ Bitget 资金费率数据为空")
        return

    print("\n资金费率对比:")

    # Funding rates are tiny and may be zero or negative, so they are
    # compared by absolute difference rather than by percentage.
    fr_diff = abs(binance_fr['funding_rate'] - bitget_fr['funding_rate'])
    fr_match = "✅" if fr_diff < 0.0001 else "❌"
    print(f"{fr_match} 资金费率")
    print(f" Binance: {binance_fr['funding_rate']:.6f} ({binance_fr['funding_rate_percent']:.4f}%)")
    print(f" Bitget: {bitget_fr['funding_rate']:.6f} ({bitget_fr['funding_rate_percent']:.4f}%)")
    print(f" 差异: {fr_diff:.6f}")

    # Mark price.
    print_comparison("标记价格",
                     binance_fr.get('mark_price'),
                     bitget_fr.get('mark_price'),
                     tolerance=0.1)

    # Index price.
    print_comparison("指数价格",
                     binance_fr.get('index_price'),
                     bitget_fr.get('index_price'),
                     tolerance=0.1)

    # Market sentiment (informational only).
    print(f"\n市场情绪:")
    print(f" Binance: {binance_fr.get('sentiment', 'N/A')}")
    print(f" Bitget: {bitget_fr.get('sentiment', 'N/A')}")


def test_multi_timeframe(symbol: str = "BTCUSDT"):
    """Print row counts and latest close for each timeframe on both services."""
    print_section("4. 多周期数据对比")

    print(f"\n获取 {symbol} 多周期数据...")

    binance_data = binance_service.get_multi_timeframe_data(symbol)
    bitget_data = bitget_service.get_multi_timeframe_data(symbol)

    intervals = ['5m', '15m', '1h', '4h']

    print("\n各周期数据量:")
    for interval in intervals:
        b_count = len(binance_data.get(interval, []))
        g_count = len(bitget_data.get(interval, []))
        print(f" {interval}: Binance {b_count}根, Bitget {g_count}根")

    # Latest close per timeframe; skipped when either side has no rows.
    print("\n各周期最新价格:")
    for interval in intervals:
        b_df = binance_data.get(interval, pd.DataFrame())
        g_df = bitget_data.get(interval, pd.DataFrame())

        if not b_df.empty and not g_df.empty:
            b_price = float(b_df.iloc[-1]['close'])
            g_price = float(g_df.iloc[-1]['close'])
            print(f" {interval}: Binance ${b_price:,.2f}, Bitget ${g_price:,.2f}")


def test_technical_indicators(symbol: str = "BTCUSDT"):
    """Compute indicators on 1h klines from both services and compare them."""
    print_section("5. 技术指标计算对比")

    print(f"\n获取 {symbol} 1h K线并计算指标...")

    binance_df = binance_service.get_klines(symbol, '1h', 100)
    bitget_df = bitget_service.get_klines(symbol, '1h', 100)

    if binance_df.empty or bitget_df.empty:
        print("❌ K线数据为空")
        return

    # Compute the indicators with each service's own implementation.
    binance_df = binance_service.calculate_indicators(binance_df, '1h')
    bitget_df = bitget_service.calculate_indicators(bitget_df, '1h')

    # Compare the indicator values on the most recent row.
    print("\n最新技术指标对比:")
    b_latest = binance_df.iloc[-1]
    g_latest = bitget_df.iloc[-1]

    # (label, column, tolerance in percent)
    indicators = (
        ("RSI(14)", 'rsi', 5.0),
        ("MACD", 'macd', 10.0),
        ("布林带上轨", 'bb_upper', 0.5),
        ("布林带中轨", 'bb_middle', 0.5),
        ("布林带下轨", 'bb_lower', 0.5),
        ("MA5", 'ma5', 0.2),
        ("MA10", 'ma10', 0.2),
        ("MA20", 'ma20', 0.2),
    )
    for label, column, tolerance in indicators:
        print_comparison(label, b_latest[column], g_latest[column], tolerance=tolerance)


def test_ticker(symbol: str = "BTCUSDT"):
    """Compare Binance 24h stats against the Bitget ticker."""
    print_section("6. Ticker 数据对比")

    print(f"\n获取 {symbol} ticker 数据...")

    binance_stats = binance_service.get_24h_stats(symbol)
    bitget_ticker = bitget_service.get_ticker(symbol)

    if not binance_stats:
        print("❌ Binance ticker 数据为空")
        return

    if not bitget_ticker:
        print("❌ Bitget ticker 数据为空")
        return

    print("\n24h 统计对比:")
    # Bitget ticker fields are passed through float() before comparing.
    print_comparison("最新价",
                     binance_stats['price'],
                     float(bitget_ticker['lastPrice']),
                     tolerance=0.1)

    print_comparison("24h最高",
                     binance_stats['high'],
                     float(bitget_ticker['highPrice24h']),
                     tolerance=0.5)

    print_comparison("24h最低",
                     binance_stats['low'],
                     float(bitget_ticker['lowPrice24h']),
                     tolerance=0.5)

    print_comparison("24h成交量",
                     binance_stats['volume'],
                     float(bitget_ticker['volume24h']),
                     tolerance=5.0)


def main() -> int:
    """Run every comparison test and return the number that raised.

    Each test runs in isolation so that an exception in one of them does
    not prevent the remaining tests from running. The completion banner is
    printed only when no test raised.
    """
    print("\n" + "🚀" * 40)
    print("\nBitget vs Binance 数据对比测试")
    print(f"测试时间: {pd.Timestamp.now()}")

    # Trading pair under test.
    test_symbol = "BTCUSDT"

    tests = (
        test_kline_data,            # 1. Kline data
        test_current_price,         # 2. Current price
        test_funding_rate,          # 3. Funding rate
        test_multi_timeframe,       # 4. Multi-timeframe data
        test_technical_indicators,  # 5. Technical indicators
        test_ticker,                # 6. Ticker data
    )

    failures = 0
    for test in tests:
        try:
            test(test_symbol)
        except Exception as e:
            # Top-level boundary: report the error and keep going.
            failures += 1
            print(f"\n❌ 测试出错: {e}")
            traceback.print_exc()

    if not failures:
        print("\n" + "=" * 80)
        print(" ✅ 所有测试完成!")
        print("=" * 80 + "\n")

    return failures


if __name__ == "__main__":
    # Non-zero exit status when any test raised, so callers can detect it.
    sys.exit(1 if main() else 0)