281 lines
9.0 KiB
Python
281 lines
9.0 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import os
|
||
import time
|
||
import hmac
|
||
import base64
|
||
import json
|
||
import requests
|
||
import hashlib
|
||
from typing import Dict, Any, Optional, List
|
||
from datetime import datetime, timedelta
|
||
import pandas as pd
|
||
|
||
|
||
class OKXAPI:
|
||
"""OKX API客户端,用于获取加密货币数据"""
|
||
|
||
def __init__(self, api_key: str, api_secret: str, passphrase: str, test_mode: bool = True):
|
||
"""
|
||
初始化OKX API客户端
|
||
|
||
Args:
|
||
api_key: OKX API密钥
|
||
api_secret: OKX API密钥
|
||
passphrase: OKX API密码
|
||
test_mode: 是否使用测试模式
|
||
"""
|
||
self.api_key = api_key
|
||
self.api_secret = api_secret
|
||
self.passphrase = passphrase
|
||
self.test_mode = test_mode
|
||
|
||
# API基础URL
|
||
if test_mode:
|
||
self.base_url = "https://www.okx.com" # OKX生产环境
|
||
else:
|
||
self.base_url = "https://www.okx.com" # OKX生产环境
|
||
|
||
self.session = requests.Session()
|
||
|
||
def _get_timestamp(self) -> str:
|
||
"""获取ISO格式的时间戳"""
|
||
return datetime.utcnow().isoformat("T", "milliseconds") + "Z"
|
||
|
||
def _sign(self, timestamp: str, method: str, request_path: str, body: str = "") -> Dict[str, str]:
|
||
"""
|
||
生成OKX API签名
|
||
|
||
Args:
|
||
timestamp: 时间戳
|
||
method: HTTP方法,例如 GET, POST
|
||
request_path: 请求路径
|
||
body: 请求体,默认为空
|
||
|
||
Returns:
|
||
包含签名的请求头
|
||
"""
|
||
if not body:
|
||
body = ""
|
||
|
||
message = timestamp + method + request_path + body
|
||
mac = hmac.new(
|
||
bytes(self.api_secret, encoding="utf-8"),
|
||
bytes(message, encoding="utf-8"),
|
||
digestmod=hashlib.sha256
|
||
)
|
||
d = mac.digest()
|
||
signature = base64.b64encode(d).decode()
|
||
|
||
headers = {
|
||
"OK-ACCESS-KEY": self.api_key,
|
||
"OK-ACCESS-SIGN": signature,
|
||
"OK-ACCESS-TIMESTAMP": timestamp,
|
||
"OK-ACCESS-PASSPHRASE": self.passphrase
|
||
}
|
||
|
||
if self.test_mode:
|
||
headers["x-simulated-trading"] = "1"
|
||
|
||
return headers
|
||
|
||
def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
|
||
"""
|
||
处理API响应
|
||
|
||
Args:
|
||
response: 请求响应
|
||
|
||
Returns:
|
||
处理后的响应数据
|
||
"""
|
||
if not response.ok:
|
||
print(f"API请求失败: {response.status_code} {response.text}")
|
||
return {"error": f"API请求失败: {response.status_code} {response.text}"}
|
||
|
||
data = response.json()
|
||
|
||
if data.get("code") != "0":
|
||
print(f"API错误: {data.get('code')} {data.get('msg')}")
|
||
return {"error": f"API错误: {data.get('code')} {data.get('msg')}"}
|
||
|
||
return data
|
||
|
||
def _convert_interval(self, interval: str) -> str:
|
||
"""
|
||
将Binance格式的时间间隔转换为OKX格式
|
||
|
||
Args:
|
||
interval: Binance格式的时间间隔,例如 "1m", "1h", "1d"
|
||
|
||
Returns:
|
||
OKX格式的时间间隔
|
||
"""
|
||
interval_map = {
|
||
"1m": "1m", # 1分钟
|
||
"3m": "3m", # 3分钟
|
||
"5m": "5m", # 5分钟
|
||
"15m": "15m", # 15分钟
|
||
"30m": "30m", # 30分钟
|
||
"1h": "1H", # 1小时
|
||
"2h": "2H", # 2小时
|
||
"4h": "4H", # 4小时
|
||
"6h": "6H", # 6小时
|
||
"12h": "12H", # 12小时
|
||
"1d": "1D", # 1天
|
||
"1w": "1W", # 1周
|
||
"1M": "1M" # 1个月
|
||
}
|
||
|
||
return interval_map.get(interval, "1D") # 默认1天
|
||
|
||
def get_historical_klines(self, symbol: str, interval: str, start_str: Optional[str] = None,
|
||
limit: int = 100) -> pd.DataFrame:
|
||
"""
|
||
获取历史K线数据
|
||
|
||
Args:
|
||
symbol: 交易对符号,例如 "BTC-USDT"
|
||
interval: Binance格式的K线时间间隔,例如 "1m", "1h", "1d"
|
||
start_str: 开始时间,格式为 "YYYY-MM-DD",如果为None,则从当前时间开始往前推
|
||
limit: 获取的K线数量,默认100,最大100
|
||
|
||
Returns:
|
||
K线数据的DataFrame
|
||
"""
|
||
# 转换时间间隔格式
|
||
bar = self._convert_interval(interval)
|
||
|
||
# OKX的交易对格式转换
|
||
okx_symbol = symbol.replace("USDT", "-USDT")
|
||
|
||
# 构建请求路径
|
||
request_path = "/api/v5/market/candles"
|
||
|
||
# 限制查询数量
|
||
limit = min(limit, 100) # OKX最多返回100条记录
|
||
|
||
# 计算起始时间和结束时间
|
||
if start_str:
|
||
start_dt = datetime.strptime(start_str, "%Y-%m-%d")
|
||
# OKX需要Unix时间戳(以秒为单位)
|
||
after = int(start_dt.timestamp())
|
||
params = {"instId": okx_symbol, "bar": bar, "limit": str(limit), "after": str(after)}
|
||
else:
|
||
params = {"instId": okx_symbol, "bar": bar, "limit": str(limit)}
|
||
|
||
# 构建完整的URL
|
||
url = self.base_url + request_path
|
||
|
||
try:
|
||
# 生成请求头
|
||
timestamp = self._get_timestamp()
|
||
path_with_params = request_path + "?" + "&".join([f"{k}={v}" for k, v in params.items()])
|
||
headers = self._sign(timestamp, "GET", path_with_params)
|
||
|
||
# 发送请求
|
||
response = self.session.get(url, params=params, headers=headers)
|
||
|
||
# 处理响应
|
||
result = self._handle_response(response)
|
||
|
||
if "error" in result:
|
||
print(f"获取K线数据出错: {result['error']}")
|
||
return pd.DataFrame()
|
||
|
||
# 获取K线数据
|
||
data = result.get("data", [])
|
||
|
||
if not data:
|
||
print(f"未获取到{symbol}的K线数据")
|
||
return pd.DataFrame()
|
||
|
||
# 创建DataFrame
|
||
# OKX K线格式: [0]timestamp, [1]open, [2]high, [3]low, [4]close, [5]vol, [6]volCcy
|
||
columns = ["timestamp", "open", "high", "low", "close", "volume", "volumeCcy"]
|
||
df = pd.DataFrame(data, columns=columns)
|
||
|
||
# 转换列类型
|
||
# 时间戳是以毫秒为单位,需要转换为日期时间
|
||
df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms")
|
||
df["open"] = df["open"].astype(float)
|
||
df["high"] = df["high"].astype(float)
|
||
df["low"] = df["low"].astype(float)
|
||
df["close"] = df["close"].astype(float)
|
||
df["volume"] = df["volume"].astype(float)
|
||
|
||
# 设置索引
|
||
df.set_index("timestamp", inplace=True)
|
||
|
||
# 按照时间升序排序
|
||
df = df.sort_index()
|
||
|
||
# 选择需要的列
|
||
df = df[["open", "high", "low", "close", "volume"]]
|
||
|
||
return df
|
||
|
||
except Exception as e:
|
||
print(f"获取K线数据时出错: {e}")
|
||
return pd.DataFrame()
|
||
|
||
def get_exchange_info(self) -> Dict[str, Any]:
|
||
"""
|
||
获取交易所信息
|
||
|
||
Returns:
|
||
交易所信息
|
||
"""
|
||
url = self.base_url + "/api/v5/public/instruments"
|
||
params = {"instType": "SPOT"} # 只获取现货交易对
|
||
|
||
try:
|
||
# 发送请求
|
||
response = self.session.get(url, params=params)
|
||
|
||
# 处理响应
|
||
result = self._handle_response(response)
|
||
|
||
if "error" in result:
|
||
print(f"获取交易所信息出错: {result['error']}")
|
||
return {}
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
print(f"获取交易所信息时出错: {e}")
|
||
return {}
|
||
|
||
def get_ticker(self, symbol: str) -> Dict[str, Any]:
|
||
"""
|
||
获取最新行情
|
||
|
||
Args:
|
||
symbol: 交易对符号,例如 "BTC-USDT"
|
||
|
||
Returns:
|
||
最新行情
|
||
"""
|
||
# OKX的交易对格式转换
|
||
okx_symbol = symbol.replace("USDT", "-USDT")
|
||
|
||
url = self.base_url + "/api/v5/market/ticker"
|
||
params = {"instId": okx_symbol}
|
||
|
||
try:
|
||
# 发送请求
|
||
response = self.session.get(url, params=params)
|
||
|
||
# 处理响应
|
||
result = self._handle_response(response)
|
||
|
||
if "error" in result:
|
||
print(f"获取最新行情出错: {result['error']}")
|
||
return {}
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
print(f"获取最新行情时出错: {e}")
|
||
return {} |