""" Data reader for fetching market data from Binance API Pure API mode - no Redis dependency """ import logging from typing import Optional, List, Dict, Any from datetime import datetime import pandas as pd import requests import time from .config import config logger = logging.getLogger(__name__) class MarketDataReader: """Read market data directly from Binance API""" def __init__(self, symbol: str = 'BTCUSDT'): """ Initialize data reader Args: symbol: Trading pair (e.g., 'BTCUSDT') """ self.symbol = symbol self.base_url = 'https://fapi.binance.com' # Cache for rate limiting self._cache = {} self._cache_ttl = 60 # Cache TTL in seconds def _get_cached(self, key: str) -> Optional[Any]: """Get cached data if not expired""" if key in self._cache: data, timestamp = self._cache[key] if time.time() - timestamp < self._cache_ttl: return data return None def _set_cache(self, key: str, data: Any) -> None: """Set cache with current timestamp""" self._cache[key] = (data, time.time()) def fetch_klines( self, interval: str = '5m', limit: int = 200 ) -> pd.DataFrame: """ Fetch kline data from Binance API Args: interval: Kline interval (e.g., '5m', '15m', '1h', '4h') limit: Number of candles to fetch (max 1500) Returns: DataFrame with OHLCV data """ cache_key = f"kline:{self.symbol}:{interval}" cached = self._get_cached(cache_key) if cached is not None: logger.debug(f"Using cached data for {cache_key}") return cached try: url = f'{self.base_url}/fapi/v1/klines' params = { 'symbol': self.symbol, 'interval': interval, 'limit': min(limit, 1500) } logger.info(f"Fetching {limit} candles from Binance API ({self.symbol} {interval})...") response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() klines = [] for item in data: klines.append({ 'timestamp': datetime.fromtimestamp(item[0] / 1000), 'open': float(item[1]), 'high': float(item[2]), 'low': float(item[3]), 'close': float(item[4]), 'volume': float(item[5]), 'quote_volume': float(item[7]), 'trades': int(item[8]), 'is_closed': True, }) df = pd.DataFrame(klines) if not df.empty: df.set_index('timestamp', inplace=True) df.sort_index(inplace=True) logger.info(f"Fetched {len(df)} candles from Binance API") # Cache the result self._set_cache(cache_key, df) return df except Exception as e: logger.error(f"Error fetching from Binance API: {e}") return pd.DataFrame() def read_kline_stream( self, stream_key: str, count: int = None, use_api_fallback: bool = True ) -> pd.DataFrame: """ Read kline data - compatibility method that extracts interval from stream key Args: stream_key: Stream key format (e.g., 'binance:raw:kline:5m') count: Number of candles to fetch use_api_fallback: Ignored (always uses API) Returns: DataFrame with OHLCV data """ if count is None: count = config.LOOKBACK_PERIODS # Extract interval from stream key (e.g., 'binance:raw:kline:5m' -> '5m') try: interval = stream_key.split(':')[-1] except Exception: interval = '5m' return self.fetch_klines(interval=interval, limit=count) def fetch_historical_klines_from_api( self, symbol: str = None, interval: str = '5m', limit: int = 200 ) -> pd.DataFrame: """ Fetch historical kline data from Binance API Compatibility method for existing code Args: symbol: Trading pair (uses self.symbol if None) interval: Kline interval limit: Number of candles Returns: DataFrame with OHLCV data """ if symbol and symbol != self.symbol: # Temporarily change symbol old_symbol = self.symbol self.symbol = symbol df = self.fetch_klines(interval=interval, limit=limit) self.symbol = old_symbol return df return self.fetch_klines(interval=interval, limit=limit) def fetch_depth(self, limit: int = 20) -> Optional[Dict[str, Any]]: """ Fetch order book depth from Binance API Args: limit: Number of levels (5, 10, 20, 50, 100, 500, 1000) Returns: Dict with bids and asks, or None if error """ cache_key = f"depth:{self.symbol}:{limit}" cached = self._get_cached(cache_key) if cached is not None: return cached try: url = f'{self.base_url}/fapi/v1/depth' params = { 'symbol': self.symbol, 'limit': limit } response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() result = { 'timestamp': datetime.now(), 'bids': [[float(p), float(q)] for p, q in data['bids']], 'asks': [[float(p), float(q)] for p, q in data['asks']], } self._set_cache(cache_key, result) return result except Exception as e: logger.error(f"Error fetching depth data: {e}") return None def read_latest_depth(self) -> Optional[Dict[str, Any]]: """ Read latest order book depth Compatibility method for existing code Returns: Dict with bids and asks, or None if no data """ return self.fetch_depth(limit=20) def fetch_recent_trades(self, limit: int = 100) -> List[Dict[str, Any]]: """ Fetch recent trades from Binance API Args: limit: Number of trades to fetch (max 1000) Returns: List of trade dictionaries """ cache_key = f"trades:{self.symbol}:{limit}" cached = self._get_cached(cache_key) if cached is not None: return cached try: url = f'{self.base_url}/fapi/v1/trades' params = { 'symbol': self.symbol, 'limit': min(limit, 1000) } response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() trades = [] for item in data: trades.append({ 'timestamp': datetime.fromtimestamp(item['time'] / 1000), 'price': float(item['price']), 'quantity': float(item['qty']), 'is_buyer_maker': item['isBuyerMaker'], }) self._set_cache(cache_key, trades) return trades except Exception as e: logger.error(f"Error fetching trade data: {e}") return [] def read_recent_trades(self, count: int = 100) -> List[Dict[str, Any]]: """ Read recent trades Compatibility method for existing code Args: count: Number of recent trades to fetch Returns: List of trade dictionaries """ return self.fetch_recent_trades(limit=count) def get_multi_timeframe_data(self) -> Dict[str, pd.DataFrame]: """ Fetch data from multiple timeframes Returns: Dict mapping timeframe to DataFrame """ timeframes = { '5m': 200, '15m': 200, '1h': 200, '4h': 200, '1d': 100, '1w': 65, } data = {} for tf, count in timeframes.items(): df = self.fetch_klines(interval=tf, limit=count) if not df.empty: data[tf] = df return data def get_latest_price(self) -> Optional[float]: """Get latest price from ticker""" try: url = f'{self.base_url}/fapi/v1/ticker/price' params = {'symbol': self.symbol} response = requests.get(url, params=params, timeout=5) response.raise_for_status() data = response.json() return float(data['price']) except Exception as e: logger.error(f"Error getting latest price: {e}") # Fallback: get from kline df = self.fetch_klines(interval='1m', limit=1) if not df.empty: return float(df.iloc[-1]['close']) return None