"""
Data reader for fetching market data from Binance API

Pure API mode - no Redis dependency
"""
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

import pandas as pd
import requests

from .config import config

logger = logging.getLogger(__name__)
class MarketDataReader:
    """Read market data directly from the Binance USDT-M futures REST API.

    All fetch methods are best-effort: network or parse errors are logged and
    an empty/None result is returned rather than raised. Responses are cached
    in memory for a short TTL to stay within API rate limits.
    """

    def __init__(self, symbol: str = 'BTCUSDT'):
        """
        Initialize data reader

        Args:
            symbol: Trading pair (e.g., 'BTCUSDT')
        """
        self.symbol = symbol
        # Binance USDT-M futures REST endpoint.
        self.base_url = 'https://fapi.binance.com'

        # Client-side cache for rate limiting.
        # Maps cache key -> (payload, unix timestamp when stored).
        self._cache: Dict[str, Any] = {}
        self._cache_ttl = 60  # Cache TTL in seconds

    def _get_cached(self, key: str) -> Optional[Any]:
        """Return the cached payload for *key* if present and unexpired, else None."""
        entry = self._cache.get(key)
        if entry is not None:
            data, timestamp = entry
            if time.time() - timestamp < self._cache_ttl:
                return data
        return None

    def _set_cache(self, key: str, data: Any) -> None:
        """Store *data* under *key*, stamped with the current time."""
        self._cache[key] = (data, time.time())

    def fetch_klines(
        self, interval: str = '5m', limit: int = 200
    ) -> pd.DataFrame:
        """
        Fetch kline data from Binance API

        Args:
            interval: Kline interval (e.g., '5m', '15m', '1h', '4h')
            limit: Number of candles to fetch (max 1500)

        Returns:
            DataFrame with OHLCV data indexed by timestamp; empty on error
        """
        # Binance caps this endpoint at 1500 rows per request.
        effective_limit = min(limit, 1500)
        # BUGFIX: the cache key must include the requested size; the old key
        # (symbol + interval only) let a limit=1 request be served a cached
        # 200-row frame, or a limit=200 request be served a cached 1-row one.
        cache_key = f"kline:{self.symbol}:{interval}:{effective_limit}"
        cached = self._get_cached(cache_key)
        if cached is not None:
            logger.debug("Using cached data for %s", cache_key)
            return cached

        try:
            url = f'{self.base_url}/fapi/v1/klines'
            params = {
                'symbol': self.symbol,
                'interval': interval,
                'limit': effective_limit,
            }

            logger.info(
                "Fetching %d candles from Binance API (%s %s)...",
                effective_limit, self.symbol, interval,
            )
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            # Each kline is a positional array; indices per the Binance
            # futures API: 0=open time(ms), 1-4=OHLC, 5=volume,
            # 7=quote volume, 8=trade count.
            klines = [
                {
                    # NOTE(review): naive local-time timestamps — confirm
                    # that UTC-aware timestamps are not expected downstream.
                    'timestamp': datetime.fromtimestamp(item[0] / 1000),
                    'open': float(item[1]),
                    'high': float(item[2]),
                    'low': float(item[3]),
                    'close': float(item[4]),
                    'volume': float(item[5]),
                    'quote_volume': float(item[7]),
                    'trades': int(item[8]),
                    'is_closed': True,
                }
                for item in data
            ]

            df = pd.DataFrame(klines)
            if not df.empty:
                df.set_index('timestamp', inplace=True)
                df.sort_index(inplace=True)
            logger.info("Fetched %d candles from Binance API", len(df))

            # Cache the result (callers share this object; treat as read-only).
            self._set_cache(cache_key, df)
            return df

        except Exception as e:
            # Best-effort boundary: never propagate network/parse errors.
            logger.error("Error fetching from Binance API: %s", e)
            return pd.DataFrame()

    def read_kline_stream(
        self, stream_key: str, count: Optional[int] = None,
        use_api_fallback: bool = True
    ) -> pd.DataFrame:
        """
        Read kline data - compatibility method that extracts interval from stream key

        Args:
            stream_key: Stream key format (e.g., 'binance:raw:kline:5m')
            count: Number of candles to fetch (defaults to config.LOOKBACK_PERIODS)
            use_api_fallback: Ignored (always uses API)

        Returns:
            DataFrame with OHLCV data
        """
        if count is None:
            count = config.LOOKBACK_PERIODS

        # Extract interval from stream key (e.g., 'binance:raw:kline:5m' -> '5m').
        # str.split cannot raise; only a non-str (e.g. None) key would, so the
        # old blanket `except Exception` is narrowed to AttributeError.
        try:
            interval = stream_key.split(':')[-1]
        except AttributeError:
            interval = '5m'

        return self.fetch_klines(interval=interval, limit=count)

    def fetch_historical_klines_from_api(
        self, symbol: Optional[str] = None, interval: str = '5m', limit: int = 200
    ) -> pd.DataFrame:
        """
        Fetch historical kline data from Binance API
        Compatibility method for existing code

        Args:
            symbol: Trading pair (uses self.symbol if None)
            interval: Kline interval
            limit: Number of candles

        Returns:
            DataFrame with OHLCV data
        """
        if symbol and symbol != self.symbol:
            # Temporarily change symbol; restore it even if the fetch raises,
            # so an error cannot leave the reader pointed at the wrong pair.
            old_symbol = self.symbol
            self.symbol = symbol
            try:
                return self.fetch_klines(interval=interval, limit=limit)
            finally:
                self.symbol = old_symbol
        return self.fetch_klines(interval=interval, limit=limit)

    def fetch_depth(self, limit: int = 20) -> Optional[Dict[str, Any]]:
        """
        Fetch order book depth from Binance API

        Args:
            limit: Number of levels (5, 10, 20, 50, 100, 500, 1000)

        Returns:
            Dict with 'timestamp', 'bids' and 'asks' (lists of [price, qty]
            floats), or None on error
        """
        cache_key = f"depth:{self.symbol}:{limit}"
        cached = self._get_cached(cache_key)
        if cached is not None:
            return cached

        try:
            url = f'{self.base_url}/fapi/v1/depth'
            params = {
                'symbol': self.symbol,
                'limit': limit,
            }

            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            # Prices/quantities arrive as strings; convert once here.
            result = {
                'timestamp': datetime.now(),
                'bids': [[float(p), float(q)] for p, q in data['bids']],
                'asks': [[float(p), float(q)] for p, q in data['asks']],
            }

            self._set_cache(cache_key, result)
            return result

        except Exception as e:
            logger.error("Error fetching depth data: %s", e)
            return None

    def read_latest_depth(self) -> Optional[Dict[str, Any]]:
        """
        Read latest order book depth
        Compatibility method for existing code

        Returns:
            Dict with bids and asks, or None if no data
        """
        return self.fetch_depth(limit=20)

    def fetch_recent_trades(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Fetch recent trades from Binance API

        Args:
            limit: Number of trades to fetch (max 1000)

        Returns:
            List of trade dictionaries; empty list on error
        """
        cache_key = f"trades:{self.symbol}:{limit}"
        cached = self._get_cached(cache_key)
        if cached is not None:
            return cached

        try:
            url = f'{self.base_url}/fapi/v1/trades'
            params = {
                'symbol': self.symbol,
                # Binance caps this endpoint at 1000 trades per request.
                'limit': min(limit, 1000),
            }

            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            trades = [
                {
                    # NOTE(review): naive local-time timestamps — confirm
                    # UTC-aware timestamps are not expected downstream.
                    'timestamp': datetime.fromtimestamp(item['time'] / 1000),
                    'price': float(item['price']),
                    'quantity': float(item['qty']),
                    'is_buyer_maker': item['isBuyerMaker'],
                }
                for item in data
            ]

            self._set_cache(cache_key, trades)
            return trades

        except Exception as e:
            logger.error("Error fetching trade data: %s", e)
            return []

    def read_recent_trades(self, count: int = 100) -> List[Dict[str, Any]]:
        """
        Read recent trades
        Compatibility method for existing code

        Args:
            count: Number of recent trades to fetch

        Returns:
            List of trade dictionaries
        """
        return self.fetch_recent_trades(limit=count)

    def get_multi_timeframe_data(self) -> Dict[str, pd.DataFrame]:
        """
        Fetch data from multiple timeframes

        Returns:
            Dict mapping timeframe to DataFrame; timeframes whose fetch
            failed (empty frame) are omitted
        """
        # Candle counts chosen per timeframe (fewer for slower timeframes).
        timeframes = {
            '5m': 200,
            '15m': 200,
            '1h': 200,
            '4h': 200,
            '1d': 100,
            '1w': 65,
        }

        data = {}
        for tf, count in timeframes.items():
            df = self.fetch_klines(interval=tf, limit=count)
            if not df.empty:
                data[tf] = df

        return data

    def get_latest_price(self) -> Optional[float]:
        """Get latest price from the ticker endpoint.

        Falls back to the close of the most recent 1m kline if the ticker
        request fails; returns None if both fail.
        """
        try:
            url = f'{self.base_url}/fapi/v1/ticker/price'
            params = {'symbol': self.symbol}

            response = requests.get(url, params=params, timeout=5)
            response.raise_for_status()
            data = response.json()

            return float(data['price'])

        except Exception as e:
            logger.error("Error getting latest price: %s", e)
            # Fallback: get from kline
            df = self.fetch_klines(interval='1m', limit=1)
            if not df.empty:
                return float(df.iloc[-1]['close'])
            return None