#!/usr/bin/env python # -*- coding: utf-8 -*- import os import json import requests import urllib.parse from typing import Dict, Any, Optional from datetime import datetime import pandas as pd class AllTickAPI: """AllTick API客户端,用于获取黄金等商品的K线数据""" def __init__(self, api_key: str): """ 初始化AllTick API客户端 Args: api_key: AllTick API密钥 """ self.api_key = api_key self.base_url = "https://quote.alltick.io/quote-b-api" self.stock_url = "https://quote.alltick.io/quote-stock-b-api" self.session = requests.Session() def get_stock_klines(self, symbol: str, interval: str, limit: int = 500) -> pd.DataFrame: """ 获取股票历史K线数据 """ return self._get_historical_klines(symbol, interval, limit, self.stock_url) def get_common_klines(self, symbol: str, interval: str, limit: int = 500) -> pd.DataFrame: """ 获取历史K线数据 """ return self._get_historical_klines(symbol, interval, limit, self.base_url) def _get_historical_klines(self, symbol: str, interval: str, limit: int = 500, base_url: str = None) -> pd.DataFrame: """ 获取历史K线数据 Args: symbol: 交易对符号,例如 "XAUUSD" interval: K线时间间隔,可选: 1m, 5m, 15m, 30m, 1h, 2h, 4h, 1d, 1w, 1M start_str: 开始时间,格式为 "YYYY-MM-DD" limit: 获取的K线数量,最大为1000 Returns: K线数据的DataFrame """ # 转换时间间隔格式 kline_type = self._convert_interval(interval) # 计算结束时间戳 end_timestamp = 0 # 0表示从当前时间开始查询 # 限制查询数量 query_kline_num = min(limit, 1000) # AllTick API每次最多查询1000根K线 # 构建查询参数 query_data = { "trace": datetime.now().strftime("%Y%m%d%H%M%S"), "data": { "code": symbol, "kline_type": kline_type, "kline_timestamp_end": end_timestamp, "query_kline_num": query_kline_num, "adjust_type": 0 # 0: 除权 } } # 对查询参数进行编码 encoded_query = urllib.parse.quote(json.dumps(query_data)) # 构建完整的URL url = f"{base_url}/kline?token={self.api_key}&query={encoded_query}" try: # 发送请求 response = self.session.get(url) response.raise_for_status() # 如果响应状态码不是200,则抛出异常 # 解析响应 result = response.json() # 检查返回状态 if result.get("ret") != 200: print(f"请求出错: {result.get('msg')}") return pd.DataFrame() # 获取K线数据 kline_data = result.get("data", {}).get("kline_list", []) if not kline_data: print(f"未获取到{symbol}的K线数据") return pd.DataFrame() # 转换为DataFrame df = pd.DataFrame(kline_data) # 转换列类型 df["timestamp"] = df["timestamp"].astype(int) df["open"] = df["open_price"].astype(float) df["high"] = df["high_price"].astype(float) df["low"] = df["low_price"].astype(float) df["close"] = df["close_price"].astype(float) df["volume"] = df["volume"].astype(float) # 设置索引 # df.set_index("timestamp", inplace=True) # 选择需要的列 df = df[["open", "high", "low", "close", "volume","timestamp"]] # 数据倒序 df = df.iloc[::-1] return df except Exception as e: print(f"获取K线数据时出错: {e}") return pd.DataFrame() def _convert_interval(self, interval: str) -> int: """ 将Binance格式的时间间隔转换为AllTick格式 Args: interval: Binance格式的时间间隔,例如 "1m", "1h", "1d" Returns: AllTick格式的K线类型 """ interval_map = { "1m": 1, # 1分钟K "5m": 2, # 5分钟K "15m": 3, # 15分钟K "30m": 4, # 30分钟K "1h": 5, # 小时K "2h": 6, # 2小时K "4h": 7, # 4小时K "1d": 8, # 日K "1w": 9, # 周K "1M": 10 # 月K } return interval_map.get(interval, 8) # 默认返回日K