"""Market-data helpers built on the Binance spot/futures clients and CoinGecko."""
import os

import pandas as pd
import requests
import urllib3
from binance.spot import Spot
from binance.um_futures import UMFutures

# SECURITY NOTE(review): these credentials are committed in source control.
# They should be rotated and supplied only through the environment; the
# literals remain as a fallback solely so existing deployments keep working.
api_key = os.environ.get(
    "BINANCE_API_KEY",
    "HCpeel8g6fsTK2630b7BvGBcS09Z3qfXkLVcAY2JkpaiMm1J6DWRvoQZBQlElDJg",
)
api_secret = os.environ.get(
    "BINANCE_API_SECRET",
    "TySs6onlHOTrGzV8fMdDxLKTWWYnQ4rCHVAmjrcHby17acKflmo7xVTWVsbqtxe7",
)

client = Spot(api_key, api_secret)
future_client = UMFutures(api_key, api_secret)

# Seconds to wait for an HTTP response before giving up instead of hanging.
_REQUEST_TIMEOUT = 10


def _get_json(url, params=None):
    """GET *url* and return the decoded JSON body.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.Timeout: if no response arrives within ``_REQUEST_TIMEOUT``.
    """
    # NOTE(review): certificate verification is disabled (and its warning
    # silenced) exactly as the original code did; this exposes the request to
    # man-in-the-middle tampering and should be re-enabled if possible.
    urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
    response = requests.get(
        url, params=params, verify=False, timeout=_REQUEST_TIMEOUT
    )
    response.raise_for_status()
    return response.json()


def _get_top_coins_by_market_cap(top):
    """Return CoinGecko's market listing for the *top* coins by market cap.

    Args:
        top: number of coins to request (sent as ``per_page``).

    Returns:
        The decoded JSON payload of the ``/coins/markets`` endpoint.
    """
    coingecko_url = "https://api.coingecko.com/api/v3/coins/markets"
    params = {
        'vs_currency': 'usd',
        'order': 'market_cap_desc',
        'per_page': top,
        'page': 1,
    }
    return _get_json(coingecko_url, params=params)


def _get_binance_usdt_pairs():
    """Return the symbols of Binance spot pairs quoted in USDT with status TRADING."""
    url = "https://api.binance.com/api/v3/exchangeInfo"
    data = _get_json(url)
    return [
        symbol['symbol']
        for symbol in data['symbols']
        if symbol['quoteAsset'] == 'USDT' and symbol['status'] == 'TRADING'
    ]


def get_top_binance_usdt_pairs(top):
    """Return the USDT pairs on Binance for the *top* coins by market cap.

    Args:
        top: how many of the highest-market-cap coins to consider.

    Returns:
        A list of ``"<SYMBOL>USDT"`` strings, in market-cap order, restricted
        to pairs that are currently trading on Binance spot.
    """
    top_coins = _get_top_coins_by_market_cap(top)
    # A set makes each membership test O(1) instead of scanning the full list.
    usdt_pairs = set(_get_binance_usdt_pairs())

    candidates = (f"{coin['symbol'].upper()}USDT" for coin in top_coins)
    return [pair for pair in candidates if pair in usdt_pairs]


def get_symbols():
    """Return a Series of spot symbols that are TRADING and quoted in USDT."""
    data = client.exchange_info()["symbols"]
    columns = ['symbol', 'status', 'baseAsset', 'quoteAsset']
    df = pd.DataFrame(data, columns=columns)
    df = df[(df['status'] == 'TRADING') & (df['quoteAsset'] == 'USDT')]
    return df['symbol']


def get_future_symbols():
    """Return a Series of futures symbols that are TRADING perpetual contracts."""
    data = future_client.exchange_info()["symbols"]
    columns = ['symbol', 'status', 'contractType', 'baseAsset', 'quoteAsset']
    df = pd.DataFrame(data, columns=columns)
    df = df[(df['status'] == 'TRADING') & (df['contractType'] == 'PERPETUAL')]
    return df['symbol']


def get_klines(symbol, interval, future=False, limit=1000):
    """Fetch candlestick data for *symbol* and return it as a DataFrame.

    Args:
        symbol: trading pair, passed through to the client's ``klines`` call.
        interval: candle interval, passed through to the client's ``klines`` call.
        future: when truthy, query the futures client; otherwise the spot client.
        limit: maximum number of candles to request.

    Returns:
        A DataFrame with one row per candle. ``open``, ``high``, ``low``,
        ``close`` and ``volume`` are float64; ``timestamp`` and ``close_time``
        are timezone-aware datetimes in Asia/Shanghai.
    """
    source = future_client if future else client
    data = source.klines(symbol, interval, limit=limit)

    columns = [
        'timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time',
        'quote_asset_volume', 'number_of_trades',
        'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore',
    ]
    df = pd.DataFrame(data, columns=columns)

    # Cast the price/volume columns to floats in a single pass.
    float_columns = ['open', 'high', 'low', 'close', 'volume']
    df[float_columns] = df[float_columns].astype('float64')

    # Millisecond epoch -> UTC -> Asia/Shanghai, using the vectorized accessor
    # rather than converting one element at a time.
    for column in ('timestamp', 'close_time'):
        as_utc = pd.to_datetime(df[column], unit='ms', utc=True)
        df[column] = as_utc.dt.tz_convert('Asia/Shanghai')

    return df