添加 okx 的支持

This commit is contained in:
aaron 2025-04-28 12:14:28 +08:00
parent 230f501f98
commit bc7b095f40
6 changed files with 357 additions and 7 deletions

View File

@ -12,6 +12,7 @@ from datetime import datetime, timedelta
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from api.binance_api import BinanceAPI
from api.okx_api import OKXAPI
from api.deepseek_api import DeepSeekAPI
from models.data_processor import DataProcessor
from utils.config_loader import ConfigLoader
@ -33,6 +34,7 @@ class CryptoAgent:
# 获取各部分配置
self.binance_config = self.config_loader.get_binance_config()
self.okx_config = self.config_loader.get_okx_config()
self.deepseek_config = self.config_loader.get_deepseek_config()
self.crypto_config = self.config_loader.get_crypto_config()
self.data_config = self.config_loader.get_data_config()
@ -46,6 +48,14 @@ class CryptoAgent:
test_mode=self.binance_config['test_mode']
)
# 初始化OKX API客户端
self.okx_api = OKXAPI(
api_key=self.okx_config['api_key'],
api_secret=self.okx_config['api_secret'],
passphrase=self.okx_config['passphrase'],
test_mode=self.okx_config['test_mode']
)
self.deepseek_api = DeepSeekAPI(
api_key=self.deepseek_config['api_key'],
model=self.deepseek_config['model']
@ -94,15 +104,41 @@ class CryptoAgent:
start_time = datetime.now() - timedelta(days=days)
start_str = start_time.strftime("%Y-%m-%d")
# 获取K线数据
data = self.binance_api.get_historical_klines(
symbol=symbol,
interval=self.time_interval,
start_str=start_str
)
try:
# 首先尝试从Binance获取K线数据
print(f"尝试从Binance获取{symbol}的K线数据...")
data = self.binance_api.get_historical_klines(
symbol=symbol,
interval=self.time_interval,
start_str=start_str
)
if data.empty:
# 如果从Binance获取失败尝试从OKX获取
print(f"从Binance获取数据失败尝试从OKX获取{symbol}的K线数据...")
data = self.okx_api.get_historical_klines(
symbol=symbol,
interval=self.time_interval,
start_str=start_str,
limit=100 # OKX最多返回100条记录这里可能需要多次请求以获取更多数据
)
except Exception as e:
print(f"从Binance获取数据出错: {e}尝试从OKX获取...")
try:
# 尝试从OKX获取
data = self.okx_api.get_historical_klines(
symbol=symbol,
interval=self.time_interval,
start_str=start_str,
limit=100
)
except Exception as okx_e:
print(f"从OKX获取数据也出错: {okx_e}")
return pd.DataFrame()
if data.empty:
print(f"无法获取{symbol}的历史数据")
print(f"无法从任何交易所获取{symbol}的历史数据")
return pd.DataFrame()
# 保存原始数据

281
cryptoai/api/okx_api.py Normal file
View File

@ -0,0 +1,281 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import hmac
import base64
import json
import requests
import hashlib
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
import pandas as pd
class OKXAPI:
"""OKX API客户端用于获取加密货币数据"""
def __init__(self, api_key: str, api_secret: str, passphrase: str, test_mode: bool = True):
"""
初始化OKX API客户端
Args:
api_key: OKX API密钥
api_secret: OKX API密钥
passphrase: OKX API密码
test_mode: 是否使用测试模式
"""
self.api_key = api_key
self.api_secret = api_secret
self.passphrase = passphrase
self.test_mode = test_mode
# API基础URL
if test_mode:
self.base_url = "https://www.okx.com" # OKX生产环境
else:
self.base_url = "https://www.okx.com" # OKX生产环境
self.session = requests.Session()
def _get_timestamp(self) -> str:
"""获取ISO格式的时间戳"""
return datetime.utcnow().isoformat("T", "milliseconds") + "Z"
def _sign(self, timestamp: str, method: str, request_path: str, body: str = "") -> Dict[str, str]:
"""
生成OKX API签名
Args:
timestamp: 时间戳
method: HTTP方法例如 GET, POST
request_path: 请求路径
body: 请求体默认为空
Returns:
包含签名的请求头
"""
if not body:
body = ""
message = timestamp + method + request_path + body
mac = hmac.new(
bytes(self.api_secret, encoding="utf-8"),
bytes(message, encoding="utf-8"),
digestmod=hashlib.sha256
)
d = mac.digest()
signature = base64.b64encode(d).decode()
headers = {
"OK-ACCESS-KEY": self.api_key,
"OK-ACCESS-SIGN": signature,
"OK-ACCESS-TIMESTAMP": timestamp,
"OK-ACCESS-PASSPHRASE": self.passphrase
}
if self.test_mode:
headers["x-simulated-trading"] = "1"
return headers
def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""
处理API响应
Args:
response: 请求响应
Returns:
处理后的响应数据
"""
if not response.ok:
print(f"API请求失败: {response.status_code} {response.text}")
return {"error": f"API请求失败: {response.status_code} {response.text}"}
data = response.json()
if data.get("code") != "0":
print(f"API错误: {data.get('code')} {data.get('msg')}")
return {"error": f"API错误: {data.get('code')} {data.get('msg')}"}
return data
def _convert_interval(self, interval: str) -> str:
"""
将Binance格式的时间间隔转换为OKX格式
Args:
interval: Binance格式的时间间隔例如 "1m", "1h", "1d"
Returns:
OKX格式的时间间隔
"""
interval_map = {
"1m": "1m", # 1分钟
"3m": "3m", # 3分钟
"5m": "5m", # 5分钟
"15m": "15m", # 15分钟
"30m": "30m", # 30分钟
"1h": "1H", # 1小时
"2h": "2H", # 2小时
"4h": "4H", # 4小时
"6h": "6H", # 6小时
"12h": "12H", # 12小时
"1d": "1D", # 1天
"1w": "1W", # 1周
"1M": "1M" # 1个月
}
return interval_map.get(interval, "1D") # 默认1天
def get_historical_klines(self, symbol: str, interval: str, start_str: Optional[str] = None,
limit: int = 100) -> pd.DataFrame:
"""
获取历史K线数据
Args:
symbol: 交易对符号例如 "BTC-USDT"
interval: Binance格式的K线时间间隔例如 "1m", "1h", "1d"
start_str: 开始时间格式为 "YYYY-MM-DD"如果为None则从当前时间开始往前推
limit: 获取的K线数量默认100最大100
Returns:
K线数据的DataFrame
"""
# 转换时间间隔格式
bar = self._convert_interval(interval)
# OKX的交易对格式转换
okx_symbol = symbol.replace("USDT", "-USDT")
# 构建请求路径
request_path = "/api/v5/market/candles"
# 限制查询数量
limit = min(limit, 100) # OKX最多返回100条记录
# 计算起始时间和结束时间
if start_str:
start_dt = datetime.strptime(start_str, "%Y-%m-%d")
# OKX需要Unix时间戳以秒为单位
after = int(start_dt.timestamp())
params = {"instId": okx_symbol, "bar": bar, "limit": str(limit), "after": str(after)}
else:
params = {"instId": okx_symbol, "bar": bar, "limit": str(limit)}
# 构建完整的URL
url = self.base_url + request_path
try:
# 生成请求头
timestamp = self._get_timestamp()
path_with_params = request_path + "?" + "&".join([f"{k}={v}" for k, v in params.items()])
headers = self._sign(timestamp, "GET", path_with_params)
# 发送请求
response = self.session.get(url, params=params, headers=headers)
# 处理响应
result = self._handle_response(response)
if "error" in result:
print(f"获取K线数据出错: {result['error']}")
return pd.DataFrame()
# 获取K线数据
data = result.get("data", [])
if not data:
print(f"未获取到{symbol}的K线数据")
return pd.DataFrame()
# 创建DataFrame
# OKX K线格式: [0]timestamp, [1]open, [2]high, [3]low, [4]close, [5]vol, [6]volCcy
columns = ["timestamp", "open", "high", "low", "close", "volume", "volumeCcy"]
df = pd.DataFrame(data, columns=columns)
# 转换列类型
# 时间戳是以毫秒为单位,需要转换为日期时间
df["timestamp"] = pd.to_datetime(df["timestamp"].astype(int), unit="ms")
df["open"] = df["open"].astype(float)
df["high"] = df["high"].astype(float)
df["low"] = df["low"].astype(float)
df["close"] = df["close"].astype(float)
df["volume"] = df["volume"].astype(float)
# 设置索引
df.set_index("timestamp", inplace=True)
# 按照时间升序排序
df = df.sort_index()
# 选择需要的列
df = df[["open", "high", "low", "close", "volume"]]
return df
except Exception as e:
print(f"获取K线数据时出错: {e}")
return pd.DataFrame()
def get_exchange_info(self) -> Dict[str, Any]:
"""
获取交易所信息
Returns:
交易所信息
"""
url = self.base_url + "/api/v5/public/instruments"
params = {"instType": "SPOT"} # 只获取现货交易对
try:
# 发送请求
response = self.session.get(url, params=params)
# 处理响应
result = self._handle_response(response)
if "error" in result:
print(f"获取交易所信息出错: {result['error']}")
return {}
return result
except Exception as e:
print(f"获取交易所信息时出错: {e}")
return {}
def get_ticker(self, symbol: str) -> Dict[str, Any]:
"""
获取最新行情
Args:
symbol: 交易对符号例如 "BTC-USDT"
Returns:
最新行情
"""
# OKX的交易对格式转换
okx_symbol = symbol.replace("USDT", "-USDT")
url = self.base_url + "/api/v5/market/ticker"
params = {"instId": okx_symbol}
try:
# 发送请求
response = self.session.get(url, params=params)
# 处理响应
result = self._handle_response(response)
if "error" in result:
print(f"获取最新行情出错: {result['error']}")
return {}
return result
except Exception as e:
print(f"获取最新行情时出错: {e}")
return {}

View File

@ -4,6 +4,13 @@ binance:
api_secret: "TySs6onlHOTrGzV8fMdDxLKTWWYnQ4rCHVAmjrcHby17acKflmo7xVTWVsbqtxe7"
test_mode: true # 设置为false将使用实盘交易
# OKX API设置
okx:
api_key: "your_okx_api_key_here"
api_secret: "your_okx_api_secret_here"
passphrase: "your_okx_passphrase_here"
test_mode: true # 设置为false将使用实盘交易
# DeepSeek AI设置
deepseek:
api_key: "sk-9f6b56f08796435d988cf202e37f6ee3"

View File

@ -46,6 +46,15 @@ def parse_arguments():
parser.add_argument('--alltick-key', type=str, default=None,
help='AllTick API密钥用于获取黄金等商品数据')
parser.add_argument('--okx-key', type=str, default=None,
help='OKX API密钥')
parser.add_argument('--okx-secret', type=str, default=None,
help='OKX API密钥')
parser.add_argument('--okx-passphrase', type=str, default=None,
help='OKX API密码')
return parser.parse_args()
@ -88,6 +97,16 @@ def override_config_with_args(config_loader: ConfigLoader, args) -> None:
alltick_config = config_loader.get_config('alltick')
alltick_config['api_key'] = args.alltick_key
# 设置OKX API相关配置
if args.okx_key or args.okx_secret or args.okx_passphrase:
okx_config = config_loader.get_config('okx')
if args.okx_key:
okx_config['api_key'] = args.okx_key
if args.okx_secret:
okx_config['api_secret'] = args.okx_secret
if args.okx_passphrase:
okx_config['passphrase'] = args.okx_passphrase
# 保存修改后的配置
# 注意:这只是修改了内存中的配置,没有写入文件
# 如果需要保存到文件,可以实现一个 save_config 方法

View File

@ -64,6 +64,10 @@ class ConfigLoader:
"""获取Binance配置"""
return self.get_config('binance')
def get_okx_config(self) -> Dict[str, Any]:
"""获取OKX配置"""
return self.get_config('okx')
def get_deepseek_config(self) -> Dict[str, Any]:
"""获取DeepSeek配置"""
return self.get_config('deepseek')

3
run.py
View File

@ -13,6 +13,9 @@ CryptoAI 启动脚本
python run.py --agent gold # 使用黄金分析智能体
python run.py --agent crypto # 使用加密货币分析智能体
python run.py --alltick-key KEY # 设置AllTick API密钥
python run.py --okx-key KEY # 设置OKX API密钥
python run.py --okx-secret SECRET # 设置OKX API密钥
python run.py --okx-passphrase PASS # 设置OKX API密码
"""
import sys