"""
Main application: Binance WebSocket to Redis Stream ingestion pipeline.

Wires together the WebSocket client, deduplicator, message buffer, and
Redis Stream writer, and runs them as cooperating asyncio tasks until a
shutdown signal arrives.
"""
import asyncio
import logging
import signal
import sys
from typing import Any, Dict, List, Optional

from config import settings
from core import (
    BinanceWebSocketClient,
    RedisStreamWriter,
    MessageDeduplicator,
    BufferedMessageProcessor,
)

# Configure logging
logging.basicConfig(
    level=getattr(logging, settings.LOG_LEVEL),
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
    ],
)

logger = logging.getLogger(__name__)


class IngestionPipeline:
    """
    Main ingestion pipeline orchestrator.

    Coordinates:
    - WebSocket client
    - Message deduplication
    - Buffering and rate limiting
    - Redis Stream writing
    - Health monitoring
    """

    def __init__(self):
        # FIX: was annotated `BinanceWebSocketClient` while holding None;
        # the client is only created in start().
        self.ws_client: Optional[BinanceWebSocketClient] = None
        self.redis_writer = RedisStreamWriter()
        self.deduplicator = MessageDeduplicator()
        self.buffer_processor = BufferedMessageProcessor()
        self.is_running = False
        # FIX: guard so stop() is idempotent — it can be triggered both by
        # a signal and by start()'s error path.
        self._stopped = False
        self.tasks: List[asyncio.Task] = []

    async def on_message(self, message: Dict[str, Any]) -> None:
        """
        Handle incoming WebSocket message.

        Args:
            message: Raw message from WebSocket ('E' is presumably the
                Binance event timestamp — used only for the debug log here).
        """
        try:
            # Check for duplicates
            if self.deduplicator.is_duplicate(message):
                logger.debug(f"Duplicate message filtered: {message.get('E')}")
                return

            # Add to buffer (with overflow protection)
            success = await self.buffer_processor.add_message(message)
            if not success:
                logger.warning("Message dropped due to buffer overflow")

        except Exception as e:
            logger.error(f"Error in message handler: {e}", exc_info=True)

    async def process_messages(self) -> None:
        """Background task to process buffered messages."""
        logger.info("Starting message processor...")

        while self.is_running:
            try:
                # Get batch of messages
                batch = await self.buffer_processor.get_batch(timeout=1.0)

                if not batch:
                    # Nothing buffered — back off briefly before polling again.
                    await asyncio.sleep(0.1)
                    continue

                # Write batch to Redis
                written = await self.redis_writer.write_batch(batch)

                if written > 0:
                    logger.debug(f"Wrote {written} messages to Redis")

                # Check buffer health
                if self.buffer_processor.is_buffer_critical():
                    logger.warning(
                        f"Buffer usage critical: "
                        f"{self.buffer_processor.get_buffer_usage():.1%}"
                    )

            except Exception as e:
                logger.error(f"Error processing messages: {e}", exc_info=True)
                # Avoid a hot error loop if Redis/the buffer keeps failing.
                await asyncio.sleep(1)

        logger.info("Message processor stopped")

    async def monitor_health(self) -> None:
        """Background task to monitor system health."""
        logger.info("Starting health monitor...")

        while self.is_running:
            try:
                await asyncio.sleep(settings.HEALTH_CHECK_INTERVAL)

                # Check WebSocket health
                ws_healthy = self.ws_client.is_healthy() if self.ws_client else False

                # Check Redis health
                redis_healthy = await self.redis_writer.health_check()

                # Get statistics
                dedup_stats = self.deduplicator.get_stats()
                buffer_stats = self.buffer_processor.get_stats()
                redis_stats = self.redis_writer.get_stats()

                # Log health status
                logger.info(
                    f"Health Check | "
                    f"WebSocket: {'✓' if ws_healthy else '✗'} | "
                    f"Redis: {'✓' if redis_healthy else '✗'} | "
                    f"Buffer: {buffer_stats['buffer_usage']} | "
                    f"Dedup: {dedup_stats['duplicate_rate']} | "
                    f"Written: {redis_stats['messages_written']}"
                )

                # Alert if unhealthy
                if not ws_healthy:
                    logger.error("WebSocket connection is unhealthy!")
                if not redis_healthy:
                    logger.error("Redis connection is unhealthy!")

            except Exception as e:
                logger.error(f"Error in health monitor: {e}", exc_info=True)

        logger.info("Health monitor stopped")

    async def start(self) -> None:
        """Start ingestion pipeline and run until stopped."""
        logger.info("=" * 60)
        logger.info("Starting Binance Real-time Data Ingestion Pipeline")
        logger.info("=" * 60)
        logger.info(f"Symbol: {settings.SYMBOL.upper()}")
        logger.info(f"Kline Intervals: {', '.join(settings.kline_intervals_list)}")
        logger.info(f"Redis Host: {settings.REDIS_HOST}:{settings.REDIS_PORT}")
        logger.info("=" * 60)

        self.is_running = True

        try:
            # Connect to Redis
            logger.info("Connecting to Redis...")
            await self.redis_writer.connect()

            # Initialize WebSocket client
            self.ws_client = BinanceWebSocketClient(
                symbol=settings.SYMBOL,
                on_message=self.on_message,
            )

            # Start background tasks
            logger.info("Starting background tasks...")
            self.tasks = [
                asyncio.create_task(self.ws_client.start()),
                asyncio.create_task(self.process_messages()),
                asyncio.create_task(self.monitor_health()),
            ]

            # Wait for all tasks
            await asyncio.gather(*self.tasks)

        except asyncio.CancelledError:
            # FIX: stop() cancels self.tasks, which makes this gather raise
            # CancelledError (a BaseException since 3.8). Previously it
            # escaped `except Exception`, leaked out of asyncio.run(), and
            # printed a traceback on every SIGINT/SIGTERM. It is the normal
            # graceful-shutdown path, so swallow it here.
            logger.info("Pipeline tasks cancelled, shutting down")

        except Exception as e:
            logger.error(f"Fatal error in pipeline: {e}", exc_info=True)
            await self.stop()

    async def stop(self) -> None:
        """Stop ingestion pipeline gracefully (safe to call more than once)."""
        # FIX: idempotence guard — prevents double-closing Redis and
        # duplicated final statistics when both the signal handler and
        # start()'s error path request shutdown.
        if self._stopped:
            return
        self._stopped = True

        logger.info("Stopping ingestion pipeline...")
        self.is_running = False

        # Stop WebSocket client
        if self.ws_client:
            await self.ws_client.stop()

        # Cancel background tasks
        for task in self.tasks:
            if not task.done():
                task.cancel()

        # Wait for tasks to complete; return_exceptions absorbs the
        # CancelledErrors raised by the cancellations above.
        if self.tasks:
            await asyncio.gather(*self.tasks, return_exceptions=True)

        # Close Redis connection
        await self.redis_writer.close()

        # Print final statistics
        logger.info("=" * 60)
        logger.info("Final Statistics:")
        logger.info(f"Deduplication: {self.deduplicator.get_stats()}")
        logger.info(f"Buffer: {self.buffer_processor.get_stats()}")
        logger.info(f"Redis: {self.redis_writer.get_stats()}")
        logger.info("=" * 60)
        logger.info("Pipeline stopped successfully")


async def main():
    """Main entry point: run the pipeline with graceful signal handling."""
    pipeline = IngestionPipeline()
    loop = asyncio.get_running_loop()

    def request_shutdown(sig: signal.Signals) -> None:
        logger.info(f"Received signal {sig}, shutting down...")
        # Schedule graceful stop on the running loop.
        asyncio.ensure_future(pipeline.stop())

    # FIX: register handlers on the event loop instead of signal.signal().
    # The old code called asyncio.create_task() from a synchronous signal
    # handler, which only works by accident when the loop happens to be
    # running in the main thread; loop.add_signal_handler is the supported
    # mechanism. Fall back to signal.signal where it is unimplemented
    # (e.g. Windows / non-main-thread loops).
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_shutdown, sig)
        except NotImplementedError:
            signal.signal(
                sig,
                lambda s, frame: loop.call_soon_threadsafe(request_shutdown, s),
            )

    # Start pipeline
    await pipeline.start()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)