tradusai/main.py
2025-12-02 22:54:03 +08:00

237 lines
7.4 KiB
Python

"""
Main application: Binance WebSocket to Redis Stream ingestion pipeline
"""
import asyncio
import logging
import signal
import sys
from typing import Any, Dict, List, Optional

from config import settings
from core import (
    BinanceWebSocketClient,
    RedisStreamWriter,
    MessageDeduplicator,
    BufferedMessageProcessor,
)
# Configure logging
logging.basicConfig(
level=getattr(logging, settings.LOG_LEVEL),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
],
)
logger = logging.getLogger(__name__)
class IngestionPipeline:
    """
    Main ingestion pipeline orchestrator.

    Coordinates:
    - WebSocket client (Binance market data)
    - Message deduplication
    - Buffering and rate limiting
    - Redis Stream writing
    - Health monitoring
    """

    def __init__(self):
        # Created lazily in start() so constructing the pipeline has no
        # network side effects; None means "not started yet".
        self.ws_client: Optional[BinanceWebSocketClient] = None
        self.redis_writer = RedisStreamWriter()
        self.deduplicator = MessageDeduplicator()
        self.buffer_processor = BufferedMessageProcessor()
        self.is_running = False
        # Background tasks spawned by start(); cancelled/awaited by stop().
        self.tasks: List[asyncio.Task] = []

    async def on_message(self, message: Dict[str, Any]) -> None:
        """
        Handle an incoming WebSocket message.

        Duplicates are discarded; everything else is enqueued on the
        bounded buffer for the background writer to flush to Redis.

        Args:
            message: Raw message dict from the WebSocket stream.
        """
        try:
            if self.deduplicator.is_duplicate(message):
                # 'E' is presumably the Binance event-time field — logged
                # purely for debug traceability.
                logger.debug("Duplicate message filtered: %s", message.get("E"))
                return
            # add_message() returns False when the message had to be
            # dropped because the buffer was full.
            if not await self.buffer_processor.add_message(message):
                logger.warning("Message dropped due to buffer overflow")
        except Exception as e:
            # Never let a handler error kill the WebSocket read loop.
            logger.error(f"Error in message handler: {e}", exc_info=True)

    async def process_messages(self) -> None:
        """Background task: drain the buffer in batches and write to Redis."""
        logger.info("Starting message processor...")
        while self.is_running:
            try:
                batch = await self.buffer_processor.get_batch(timeout=1.0)
                if not batch:
                    # Defensive pause in case get_batch() returns early on an
                    # empty buffer — prevents a busy spin.
                    await asyncio.sleep(0.1)
                    continue
                written = await self.redis_writer.write_batch(batch)
                if written > 0:
                    logger.debug("Wrote %d messages to Redis", written)
                # Surface back-pressure before the buffer starts dropping.
                if self.buffer_processor.is_buffer_critical():
                    logger.warning(
                        f"Buffer usage critical: "
                        f"{self.buffer_processor.get_buffer_usage():.1%}"
                    )
            except Exception as e:
                logger.error(f"Error processing messages: {e}", exc_info=True)
                # Back off briefly so a persistent failure doesn't spin hot.
                await asyncio.sleep(1)
        logger.info("Message processor stopped")

    async def monitor_health(self) -> None:
        """Background task: periodically log component health and stats."""
        logger.info("Starting health monitor...")
        while self.is_running:
            try:
                await asyncio.sleep(settings.HEALTH_CHECK_INTERVAL)
                # WebSocket health (False until the client exists).
                ws_healthy = self.ws_client.is_healthy() if self.ws_client else False
                redis_healthy = await self.redis_writer.health_check()
                dedup_stats = self.deduplicator.get_stats()
                buffer_stats = self.buffer_processor.get_stats()
                redis_stats = self.redis_writer.get_stats()
                # Fix: the original interpolated empty strings for both the
                # healthy and unhealthy branches, making the line useless.
                logger.info(
                    f"Health Check | "
                    f"WebSocket: {'OK' if ws_healthy else 'FAIL'} | "
                    f"Redis: {'OK' if redis_healthy else 'FAIL'} | "
                    f"Buffer: {buffer_stats['buffer_usage']} | "
                    f"Dedup: {dedup_stats['duplicate_rate']} | "
                    f"Written: {redis_stats['messages_written']}"
                )
                if not ws_healthy:
                    logger.error("WebSocket connection is unhealthy!")
                if not redis_healthy:
                    logger.error("Redis connection is unhealthy!")
            except Exception as e:
                logger.error(f"Error in health monitor: {e}", exc_info=True)
        logger.info("Health monitor stopped")

    async def start(self) -> None:
        """
        Start the ingestion pipeline and block until it stops.

        Connects to Redis, creates the WebSocket client, spawns the
        background tasks, and waits for them. Task cancellation (triggered
        by stop()) is treated as a normal shutdown, not an error.
        """
        logger.info("=" * 60)
        logger.info("Starting Binance Real-time Data Ingestion Pipeline")
        logger.info("=" * 60)
        logger.info(f"Symbol: {settings.SYMBOL.upper()}")
        logger.info(f"Kline Intervals: {', '.join(settings.kline_intervals_list)}")
        logger.info(f"Redis Host: {settings.REDIS_HOST}:{settings.REDIS_PORT}")
        logger.info("=" * 60)
        self.is_running = True
        try:
            logger.info("Connecting to Redis...")
            await self.redis_writer.connect()
            self.ws_client = BinanceWebSocketClient(
                symbol=settings.SYMBOL,
                on_message=self.on_message,
            )
            logger.info("Starting background tasks...")
            self.tasks = [
                asyncio.create_task(self.ws_client.start()),
                asyncio.create_task(self.process_messages()),
                asyncio.create_task(self.monitor_health()),
            ]
            await asyncio.gather(*self.tasks)
        except asyncio.CancelledError:
            # Raised by gather() when stop() cancels the tasks. The original
            # `except Exception` missed this (CancelledError is a
            # BaseException since Python 3.8), so a signal-driven shutdown
            # crashed out of asyncio.run().
            logger.info("Pipeline tasks cancelled; shutting down")
        except Exception as e:
            logger.error(f"Fatal error in pipeline: {e}", exc_info=True)
            await self.stop()

    async def stop(self) -> None:
        """
        Stop the ingestion pipeline gracefully.

        Idempotent: both the signal handler and start()'s error path may
        call this; subsequent calls after a completed shutdown are no-ops.
        """
        if not self.is_running and not self.tasks:
            # Already stopped (or never started) — nothing to tear down.
            return
        logger.info("Stopping ingestion pipeline...")
        self.is_running = False
        # Stop the WebSocket client first so no new messages arrive.
        if self.ws_client:
            await self.ws_client.stop()
        # Cancel background tasks, then wait for them to unwind;
        # return_exceptions=True swallows their CancelledErrors here.
        for task in self.tasks:
            if not task.done():
                task.cancel()
        if self.tasks:
            await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks = []
        await self.redis_writer.close()
        # Print final statistics
        logger.info("=" * 60)
        logger.info("Final Statistics:")
        logger.info(f"Deduplication: {self.deduplicator.get_stats()}")
        logger.info(f"Buffer: {self.buffer_processor.get_stats()}")
        logger.info(f"Redis: {self.redis_writer.get_stats()}")
        logger.info("=" * 60)
        logger.info("Pipeline stopped successfully")
async def main():
    """
    Main entry point.

    Builds the pipeline, registers SIGINT/SIGTERM for graceful shutdown,
    and runs until the pipeline stops.
    """
    pipeline = IngestionPipeline()
    loop = asyncio.get_running_loop()
    # Keep references to scheduled shutdown tasks: asyncio only holds weak
    # references to tasks, so a bare create_task() result can be GC'd
    # before the shutdown completes.
    shutdown_tasks: list = []

    def _request_shutdown(sig: int) -> None:
        logger.info("Received signal %s, shutting down...", sig)
        shutdown_tasks.append(asyncio.ensure_future(pipeline.stop()))

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Preferred asyncio-native mechanism: the callback runs inside
            # the event loop, so scheduling a task from it is always safe.
            loop.add_signal_handler(sig, _request_shutdown, sig)
        except NotImplementedError:
            # add_signal_handler is unavailable on some platforms (e.g.
            # Windows event loops) — fall back to the signal module. The
            # original used this path unconditionally and called
            # asyncio.create_task() from a raw signal handler, which is
            # fragile and dropped the task reference.
            signal.signal(sig, lambda s, frame: _request_shutdown(s))

    # Run until the pipeline finishes (normally via a shutdown signal).
    await pipeline.start()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Interrupted by user")
except Exception as e:
logger.error(f"Fatal error: {e}", exc_info=True)
sys.exit(1)