#!/usr/bin/env python # -*- coding: utf-8 -*- import os import time import hmac import base64 import json import requests import hashlib from typing import Dict, Any, Optional, List from datetime import datetime, timedelta import pandas as pd class OKXAPI: """OKX API客户端,用于获取加密货币数据""" def __init__(self, api_key: str, api_secret: str, passphrase: str, test_mode: bool = True): """ 初始化OKX API客户端 Args: api_key: OKX API密钥 api_secret: OKX API密钥 passphrase: OKX API密码 test_mode: 是否使用测试模式 """ self.api_key = api_key self.api_secret = api_secret self.passphrase = passphrase self.test_mode = test_mode # API基础URL if test_mode: self.base_url = "https://www.okx.com" # OKX生产环境 else: self.base_url = "https://www.okx.com" # OKX生产环境 self.session = requests.Session() def _get_timestamp(self) -> str: """获取ISO格式的时间戳""" return datetime.utcnow().isoformat("T", "milliseconds") + "Z" def _sign(self, timestamp: str, method: str, request_path: str, body: str = "") -> Dict[str, str]: """ 生成OKX API签名 Args: timestamp: 时间戳 method: HTTP方法,例如 GET, POST request_path: 请求路径 body: 请求体,默认为空 Returns: 包含签名的请求头 """ if not body: body = "" message = timestamp + method + request_path + body mac = hmac.new( bytes(self.api_secret, encoding="utf-8"), bytes(message, encoding="utf-8"), digestmod=hashlib.sha256 ) d = mac.digest() signature = base64.b64encode(d).decode() headers = { "OK-ACCESS-KEY": self.api_key, "OK-ACCESS-SIGN": signature, "OK-ACCESS-TIMESTAMP": timestamp, "OK-ACCESS-PASSPHRASE": self.passphrase } if self.test_mode: headers["x-simulated-trading"] = "1" return headers def _handle_response(self, response: requests.Response) -> Dict[str, Any]: """ 处理API响应 Args: response: 请求响应 Returns: 处理后的响应数据 """ if not response.ok: print(f"API请求失败: {response.status_code} {response.text}") return {"error": f"API请求失败: {response.status_code} {response.text}"} data = response.json() if data.get("code") != "0": print(f"API错误: {data.get('code')} {data.get('msg')}") return {"error": f"API错误: {data.get('code')} {data.get('msg')}"} return data def _convert_interval(self, interval: str) -> str: """ 将Binance格式的时间间隔转换为OKX格式 Args: interval: Binance格式的时间间隔,例如 "1m", "1h", "1d" Returns: OKX格式的时间间隔 """ interval_map = { "1m": "1m", # 1分钟 "3m": "3m", # 3分钟 "5m": "5m", # 5分钟 "15m": "15m", # 15分钟 "30m": "30m", # 30分钟 "1h": "1H", # 1小时 "2h": "2H", # 2小时 "4h": "4H", # 4小时 "6h": "6H", # 6小时 "12h": "12H", # 12小时 "1d": "1D", # 1天 "1w": "1W", # 1周 "1M": "1M" # 1个月 } return interval_map.get(interval, "1D") # 默认1天 def get_historical_klines(self, symbol: str, interval: str, start_str: Optional[str] = None, limit: int = 100) -> pd.DataFrame: """ 获取历史K线数据 Args: symbol: 交易对符号,例如 "BTC-USDT" interval: Binance格式的K线时间间隔,例如 "1m", "1h", "1d" start_str: 开始时间,格式为 "YYYY-MM-DD",如果为None,则从当前时间开始往前推 limit: 获取的K线数量,默认100,最大100 Returns: K线数据的DataFrame """ # 转换时间间隔格式 bar = self._convert_interval(interval) # OKX的交易对格式转换 okx_symbol = symbol.replace("USDT", "-USDT") # 构建请求路径 request_path = "/api/v5/market/candles" # 限制查询数量 limit = min(limit, 100) # OKX最多返回100条记录 # 计算起始时间和结束时间 if start_str: start_dt = datetime.strptime(start_str, "%Y-%m-%d") # OKX需要Unix时间戳(以秒为单位) after = int(start_dt.timestamp()) params = {"instId": okx_symbol, "bar": bar, "limit": str(limit), "after": str(after)} else: params = {"instId": okx_symbol, "bar": bar, "limit": str(limit)} # 构建完整的URL url = self.base_url + request_path try: # 生成请求头 timestamp = self._get_timestamp() path_with_params = request_path + "?" + "&".join([f"{k}={v}" for k, v in params.items()]) headers = self._sign(timestamp, "GET", path_with_params) # 发送请求 response = self.session.get(url, params=params, headers=headers) # 处理响应 result = self._handle_response(response) if "error" in result: print(f"获取K线数据出错: {result['error']}") return pd.DataFrame() # 获取K线数据 data = result.get("data", []) if not data: print(f"未获取到{symbol}的K线数据") return pd.DataFrame() # 创建DataFrame # OKX K线格式: [0]timestamp, [1]open, [2]high, [3]low, [4]close, [5]vol, [6]volCcy columns = ["timestamp", "open", "high", "low", "close", "volume", "volumeCcy"] df = pd.DataFrame(data, columns=columns) # 转换列类型 # 时间戳是以毫秒为单位,需要转换为日期时间 df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms") df["open"] = df["open"].astype(float) df["high"] = df["high"].astype(float) df["low"] = df["low"].astype(float) df["close"] = df["close"].astype(float) df["volume"] = df["volume"].astype(float) # 设置索引 df.set_index("timestamp", inplace=True) # 按照时间升序排序 df = df.sort_index() # 选择需要的列 df = df[["open", "high", "low", "close", "volume"]] return df except Exception as e: print(f"获取K线数据时出错: {e}") return pd.DataFrame() def get_exchange_info(self) -> Dict[str, Any]: """ 获取交易所信息 Returns: 交易所信息 """ url = self.base_url + "/api/v5/public/instruments" params = {"instType": "SPOT"} # 只获取现货交易对 try: # 发送请求 response = self.session.get(url, params=params) # 处理响应 result = self._handle_response(response) if "error" in result: print(f"获取交易所信息出错: {result['error']}") return {} return result except Exception as e: print(f"获取交易所信息时出错: {e}") return {} def get_ticker(self, symbol: str) -> Dict[str, Any]: """ 获取最新行情 Args: symbol: 交易对符号,例如 "BTC-USDT" Returns: 最新行情 """ # OKX的交易对格式转换 okx_symbol = symbol.replace("USDT", "-USDT") url = self.base_url + "/api/v5/market/ticker" params = {"instId": okx_symbol} try: # 发送请求 response = self.session.get(url, params=params) # 处理响应 result = self._handle_response(response) if "error" in result: print(f"获取最新行情出错: {result['error']}") return {} return result except Exception as e: print(f"获取最新行情时出错: {e}") return {}