tradusai/analysis/data_reader.py
2025-12-04 01:27:58 +08:00

304 lines
9.0 KiB
Python

"""
Data reader for fetching market data from Binance API
Pure API mode - no Redis dependency
"""
import logging
from typing import Optional, List, Dict, Any
from datetime import datetime
import pandas as pd
import requests
import time
from .config import config
logger = logging.getLogger(__name__)
class MarketDataReader:
    """Read market data directly from the Binance USDT-M futures REST API.

    Pure API mode — no Redis or other external storage. A small in-memory
    TTL cache limits request rate against the Binance endpoints.
    """

    def __init__(self, symbol: str = 'BTCUSDT'):
        """
        Initialize data reader

        Args:
            symbol: Trading pair (e.g., 'BTCUSDT')
        """
        self.symbol = symbol
        self.base_url = 'https://fapi.binance.com'
        # In-memory cache: key -> (data, insert_time). Used for rate limiting.
        self._cache = {}
        self._cache_ttl = 60  # Cache TTL in seconds

    def _get_cached(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None if missing or expired."""
        entry = self._cache.get(key)
        if entry is not None:
            data, inserted_at = entry
            if time.time() - inserted_at < self._cache_ttl:
                return data
        return None

    def _set_cache(self, key: str, data: Any) -> None:
        """Store *data* under *key*, stamped with the current time."""
        self._cache[key] = (data, time.time())

    def fetch_klines(
        self, interval: str = '5m', limit: int = 200
    ) -> pd.DataFrame:
        """
        Fetch kline (candlestick) data from the Binance API.

        Args:
            interval: Kline interval (e.g., '5m', '15m', '1h', '4h')
            limit: Number of candles to fetch (capped at Binance's max of 1500)

        Returns:
            DataFrame indexed by timestamp with OHLCV columns; empty DataFrame
            on any request/parse error (the error is logged, not raised).
        """
        # BUGFIX: include `limit` in the cache key. Previously the key was
        # only symbol+interval, so a cached result fetched with one limit
        # was silently returned for requests with a different limit.
        cache_key = f"kline:{self.symbol}:{interval}:{limit}"
        cached = self._get_cached(cache_key)
        if cached is not None:
            logger.debug(f"Using cached data for {cache_key}")
            return cached
        try:
            url = f'{self.base_url}/fapi/v1/klines'
            params = {
                'symbol': self.symbol,
                'interval': interval,
                'limit': min(limit, 1500),  # Binance hard cap is 1500
            }
            logger.info(f"Fetching {limit} candles from Binance API ({self.symbol} {interval})...")
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            # Each kline is a positional array; indices per Binance futures docs.
            klines = [
                {
                    # NOTE(review): fromtimestamp() yields naive local time —
                    # confirm downstream consumers expect local, not UTC.
                    'timestamp': datetime.fromtimestamp(item[0] / 1000),
                    'open': float(item[1]),
                    'high': float(item[2]),
                    'low': float(item[3]),
                    'close': float(item[4]),
                    'volume': float(item[5]),
                    'quote_volume': float(item[7]),
                    'trades': int(item[8]),
                    # NOTE(review): the most recent candle from the API may
                    # still be forming, but every row is marked closed here.
                    'is_closed': True,
                }
                for item in data
            ]
            df = pd.DataFrame(klines)
            if not df.empty:
                df.set_index('timestamp', inplace=True)
                df.sort_index(inplace=True)
            logger.info(f"Fetched {len(df)} candles from Binance API")
            # Cache the result (even if empty, matching prior behavior).
            self._set_cache(cache_key, df)
            return df
        except Exception as e:
            # Best-effort reader: log and return an empty frame so callers
            # can degrade gracefully instead of crashing.
            logger.error(f"Error fetching from Binance API: {e}")
            return pd.DataFrame()

    def read_kline_stream(
        self, stream_key: str, count: Optional[int] = None,
        use_api_fallback: bool = True
    ) -> pd.DataFrame:
        """
        Read kline data — compatibility shim that extracts the interval
        from a legacy Redis-style stream key.

        Args:
            stream_key: Stream key format (e.g., 'binance:raw:kline:5m')
            count: Number of candles to fetch (defaults to
                config.LOOKBACK_PERIODS when None)
            use_api_fallback: Ignored (always uses API)

        Returns:
            DataFrame with OHLCV data
        """
        if count is None:
            count = config.LOOKBACK_PERIODS
        # Last colon-separated segment is the interval
        # (e.g., 'binance:raw:kline:5m' -> '5m').
        try:
            interval = stream_key.split(':')[-1]
        except Exception:
            interval = '5m'  # defensive default for malformed keys
        return self.fetch_klines(interval=interval, limit=count)

    def fetch_historical_klines_from_api(
        self, symbol: Optional[str] = None, interval: str = '5m',
        limit: int = 200
    ) -> pd.DataFrame:
        """
        Fetch historical kline data from the Binance API.
        Compatibility method for existing code.

        Args:
            symbol: Trading pair (uses self.symbol if None)
            interval: Kline interval
            limit: Number of candles

        Returns:
            DataFrame with OHLCV data
        """
        if symbol and symbol != self.symbol:
            # Temporarily switch symbols; restore in a finally block so the
            # reader is never left pointing at the wrong pair if the fetch
            # raises (previously the restore could be skipped).
            old_symbol = self.symbol
            self.symbol = symbol
            try:
                return self.fetch_klines(interval=interval, limit=limit)
            finally:
                self.symbol = old_symbol
        return self.fetch_klines(interval=interval, limit=limit)

    def fetch_depth(self, limit: int = 20) -> Optional[Dict[str, Any]]:
        """
        Fetch order book depth from the Binance API.

        Args:
            limit: Number of levels (5, 10, 20, 50, 100, 500, 1000)

        Returns:
            Dict with 'timestamp', 'bids' and 'asks' ([price, qty] floats),
            or None on error (error is logged).
        """
        cache_key = f"depth:{self.symbol}:{limit}"
        cached = self._get_cached(cache_key)
        if cached is not None:
            return cached
        try:
            url = f'{self.base_url}/fapi/v1/depth'
            params = {
                'symbol': self.symbol,
                'limit': limit
            }
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            result = {
                # NOTE(review): local receive time, not exchange event time.
                'timestamp': datetime.now(),
                'bids': [[float(p), float(q)] for p, q in data['bids']],
                'asks': [[float(p), float(q)] for p, q in data['asks']],
            }
            self._set_cache(cache_key, result)
            return result
        except Exception as e:
            logger.error(f"Error fetching depth data: {e}")
            return None

    def read_latest_depth(self) -> Optional[Dict[str, Any]]:
        """
        Read latest order book depth (20 levels).
        Compatibility method for existing code.

        Returns:
            Dict with bids and asks, or None if no data
        """
        return self.fetch_depth(limit=20)

    def fetch_recent_trades(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Fetch recent trades from the Binance API.

        Args:
            limit: Number of trades to fetch (capped at Binance's max of 1000)

        Returns:
            List of trade dicts (timestamp, price, quantity, is_buyer_maker);
            empty list on error (error is logged).
        """
        cache_key = f"trades:{self.symbol}:{limit}"
        cached = self._get_cached(cache_key)
        if cached is not None:
            return cached
        try:
            url = f'{self.base_url}/fapi/v1/trades'
            params = {
                'symbol': self.symbol,
                'limit': min(limit, 1000)  # Binance hard cap is 1000
            }
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            trades = [
                {
                    # NOTE(review): naive local-time conversion, as in klines.
                    'timestamp': datetime.fromtimestamp(item['time'] / 1000),
                    'price': float(item['price']),
                    'quantity': float(item['qty']),
                    'is_buyer_maker': item['isBuyerMaker'],
                }
                for item in data
            ]
            self._set_cache(cache_key, trades)
            return trades
        except Exception as e:
            logger.error(f"Error fetching trade data: {e}")
            return []

    def read_recent_trades(self, count: int = 100) -> List[Dict[str, Any]]:
        """
        Read recent trades.
        Compatibility method for existing code.

        Args:
            count: Number of recent trades to fetch

        Returns:
            List of trade dictionaries
        """
        return self.fetch_recent_trades(limit=count)

    def get_multi_timeframe_data(self) -> Dict[str, pd.DataFrame]:
        """
        Fetch kline data for a fixed set of timeframes.

        Returns:
            Dict mapping timeframe string to DataFrame; timeframes whose
            fetch failed (empty frame) are omitted.
        """
        # Per-timeframe candle counts; longer timeframes need less history.
        timeframes = {
            '5m': 200,
            '15m': 200,
            '1h': 200,
            '4h': 200,
            '1d': 100,
            '1w': 65,
        }
        data = {}
        for tf, count in timeframes.items():
            df = self.fetch_klines(interval=tf, limit=count)
            if not df.empty:
                data[tf] = df
        return data

    def get_latest_price(self) -> Optional[float]:
        """Return the latest traded price from the ticker endpoint.

        Falls back to the close of the latest 1m kline if the ticker
        request fails; returns None if both fail.
        """
        try:
            url = f'{self.base_url}/fapi/v1/ticker/price'
            params = {'symbol': self.symbol}
            response = requests.get(url, params=params, timeout=5)
            response.raise_for_status()
            data = response.json()
            return float(data['price'])
        except Exception as e:
            logger.error(f"Error getting latest price: {e}")
            # Fallback: close of the most recent 1m candle.
            df = self.fetch_klines(interval='1m', limit=1)
            if not df.empty:
                return float(df.iloc[-1]['close'])
            return None