crypto.ai/cryptoai/api/okx_api.py
2025-04-28 12:14:28 +08:00

281 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import hmac
import base64
import json
import requests
import hashlib
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
import pandas as pd
class OKXAPI:
"""OKX API客户端用于获取加密货币数据"""
def __init__(self, api_key: str, api_secret: str, passphrase: str, test_mode: bool = True):
"""
初始化OKX API客户端
Args:
api_key: OKX API密钥
api_secret: OKX API密钥
passphrase: OKX API密码
test_mode: 是否使用测试模式
"""
self.api_key = api_key
self.api_secret = api_secret
self.passphrase = passphrase
self.test_mode = test_mode
# API基础URL
if test_mode:
self.base_url = "https://www.okx.com" # OKX生产环境
else:
self.base_url = "https://www.okx.com" # OKX生产环境
self.session = requests.Session()
def _get_timestamp(self) -> str:
"""获取ISO格式的时间戳"""
return datetime.utcnow().isoformat("T", "milliseconds") + "Z"
def _sign(self, timestamp: str, method: str, request_path: str, body: str = "") -> Dict[str, str]:
"""
生成OKX API签名
Args:
timestamp: 时间戳
method: HTTP方法例如 GET, POST
request_path: 请求路径
body: 请求体,默认为空
Returns:
包含签名的请求头
"""
if not body:
body = ""
message = timestamp + method + request_path + body
mac = hmac.new(
bytes(self.api_secret, encoding="utf-8"),
bytes(message, encoding="utf-8"),
digestmod=hashlib.sha256
)
d = mac.digest()
signature = base64.b64encode(d).decode()
headers = {
"OK-ACCESS-KEY": self.api_key,
"OK-ACCESS-SIGN": signature,
"OK-ACCESS-TIMESTAMP": timestamp,
"OK-ACCESS-PASSPHRASE": self.passphrase
}
if self.test_mode:
headers["x-simulated-trading"] = "1"
return headers
def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""
处理API响应
Args:
response: 请求响应
Returns:
处理后的响应数据
"""
if not response.ok:
print(f"API请求失败: {response.status_code} {response.text}")
return {"error": f"API请求失败: {response.status_code} {response.text}"}
data = response.json()
if data.get("code") != "0":
print(f"API错误: {data.get('code')} {data.get('msg')}")
return {"error": f"API错误: {data.get('code')} {data.get('msg')}"}
return data
def _convert_interval(self, interval: str) -> str:
"""
将Binance格式的时间间隔转换为OKX格式
Args:
interval: Binance格式的时间间隔例如 "1m", "1h", "1d"
Returns:
OKX格式的时间间隔
"""
interval_map = {
"1m": "1m", # 1分钟
"3m": "3m", # 3分钟
"5m": "5m", # 5分钟
"15m": "15m", # 15分钟
"30m": "30m", # 30分钟
"1h": "1H", # 1小时
"2h": "2H", # 2小时
"4h": "4H", # 4小时
"6h": "6H", # 6小时
"12h": "12H", # 12小时
"1d": "1D", # 1天
"1w": "1W", # 1周
"1M": "1M" # 1个月
}
return interval_map.get(interval, "1D") # 默认1天
def get_historical_klines(self, symbol: str, interval: str, start_str: Optional[str] = None,
limit: int = 100) -> pd.DataFrame:
"""
获取历史K线数据
Args:
symbol: 交易对符号,例如 "BTC-USDT"
interval: Binance格式的K线时间间隔例如 "1m", "1h", "1d"
start_str: 开始时间,格式为 "YYYY-MM-DD"如果为None则从当前时间开始往前推
limit: 获取的K线数量默认100最大100
Returns:
K线数据的DataFrame
"""
# 转换时间间隔格式
bar = self._convert_interval(interval)
# OKX的交易对格式转换
okx_symbol = symbol.replace("USDT", "-USDT")
# 构建请求路径
request_path = "/api/v5/market/candles"
# 限制查询数量
limit = min(limit, 100) # OKX最多返回100条记录
# 计算起始时间和结束时间
if start_str:
start_dt = datetime.strptime(start_str, "%Y-%m-%d")
# OKX需要Unix时间戳以秒为单位
after = int(start_dt.timestamp())
params = {"instId": okx_symbol, "bar": bar, "limit": str(limit), "after": str(after)}
else:
params = {"instId": okx_symbol, "bar": bar, "limit": str(limit)}
# 构建完整的URL
url = self.base_url + request_path
try:
# 生成请求头
timestamp = self._get_timestamp()
path_with_params = request_path + "?" + "&".join([f"{k}={v}" for k, v in params.items()])
headers = self._sign(timestamp, "GET", path_with_params)
# 发送请求
response = self.session.get(url, params=params, headers=headers)
# 处理响应
result = self._handle_response(response)
if "error" in result:
print(f"获取K线数据出错: {result['error']}")
return pd.DataFrame()
# 获取K线数据
data = result.get("data", [])
if not data:
print(f"未获取到{symbol}的K线数据")
return pd.DataFrame()
# 创建DataFrame
# OKX K线格式: [0]timestamp, [1]open, [2]high, [3]low, [4]close, [5]vol, [6]volCcy
columns = ["timestamp", "open", "high", "low", "close", "volume", "volumeCcy"]
df = pd.DataFrame(data, columns=columns)
# 转换列类型
# 时间戳是以毫秒为单位,需要转换为日期时间
df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms")
df["open"] = df["open"].astype(float)
df["high"] = df["high"].astype(float)
df["low"] = df["low"].astype(float)
df["close"] = df["close"].astype(float)
df["volume"] = df["volume"].astype(float)
# 设置索引
df.set_index("timestamp", inplace=True)
# 按照时间升序排序
df = df.sort_index()
# 选择需要的列
df = df[["open", "high", "low", "close", "volume"]]
return df
except Exception as e:
print(f"获取K线数据时出错: {e}")
return pd.DataFrame()
def get_exchange_info(self) -> Dict[str, Any]:
"""
获取交易所信息
Returns:
交易所信息
"""
url = self.base_url + "/api/v5/public/instruments"
params = {"instType": "SPOT"} # 只获取现货交易对
try:
# 发送请求
response = self.session.get(url, params=params)
# 处理响应
result = self._handle_response(response)
if "error" in result:
print(f"获取交易所信息出错: {result['error']}")
return {}
return result
except Exception as e:
print(f"获取交易所信息时出错: {e}")
return {}
def get_ticker(self, symbol: str) -> Dict[str, Any]:
"""
获取最新行情
Args:
symbol: 交易对符号,例如 "BTC-USDT"
Returns:
最新行情
"""
# OKX的交易对格式转换
okx_symbol = symbol.replace("USDT", "-USDT")
url = self.base_url + "/api/v5/market/ticker"
params = {"instId": okx_symbol}
try:
# 发送请求
response = self.session.get(url, params=params)
# 处理响应
result = self._handle_response(response)
if "error" in result:
print(f"获取最新行情出错: {result['error']}")
return {}
return result
except Exception as e:
print(f"获取最新行情时出错: {e}")
return {}