| analysis | ||
| config | ||
| notifiers | ||
| output | ||
| scripts | ||
| signals | ||
| .env.example | ||
| .gitignore | ||
| ARCHITECTURE.md | ||
| CLEANUP_SUMMARY.md | ||
| DINGTALK_SETUP.md | ||
| docker-compose.yml | ||
| Dockerfile | ||
| Makefile | ||
| NOTIFICATION_SUMMARY.md | ||
| preview_message.py | ||
| QUICK_START.md | ||
| README.md | ||
| requirements.txt | ||
| run_analysis.sh | ||
| run_llm.sh | ||
| run_signal_smart.sh | ||
| run_signal.sh | ||
| scheduler.py | ||
| SIGNAL_GENERATION_GUIDE.md | ||
| START_HERE.md | ||
| start_system.sh | ||
| stop_system.sh | ||
| USAGE.md | ||
| view_logs.sh | ||
| view_signal.sh | ||
Binance Real-time Data Ingestion System
生产级的 Binance WebSocket 实时数据采集系统,用于加密货币日内交易辅助。
功能特性
核心功能
- 多流订阅: 同时订阅 K线、订单簿深度、实时成交数据
- 自动重连: 指数退避策略,网络中断自动恢复
- 消息去重: 基于事件时间戳 (E字段) 的 LRU 缓存去重
- 内存保护: 限流 + 有界缓冲区,防止内存泄漏
- 流式存储: 数据写入 Redis Stream,支持多消费者
生产级特性
- 异步 I/O (asyncio) 高性能处理
- 批量写入 Redis,降低网络开销
- 健康检查和性能监控
- 优雅关闭和信号处理
- Docker 容器化部署
- 完整的日志和统计信息
系统架构
┌─────────────────────────────────────────────────────────────────┐
│ Binance WebSocket API │
│ wss://fstream.binance.com/stream │
└────────────────────────┬────────────────────────────────────────┘
│
│ Multi-stream subscription
│ (kline_5m, depth20, aggTrade)
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ WebSocket Client (Auto-reconnect) │
│ - Exponential backoff │
│ - Heartbeat monitoring │
│ - Connection pooling │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Message Deduplicator │
│ - LRU cache (10,000 entries) │
│ - Event time (E field) based │
│ - TTL: 5 minutes │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Buffered Message Processor │
│ - Max buffer: 1,000 messages │
│ - Rate limit: 1,000 msg/sec │
│ - Batch processing │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Redis Stream Writer │
│ Stream keys: │
│ - binance:raw:kline:5m (K线数据) │
│ - binance:raw:depth:20 (订单簿深度) │
│ - binance:raw:trade (实时成交) │
│ │
│ MAXLEN: ~10,000 (auto-trim) │
└─────────────────────────────────────────────────────────────────┘
快速开始
前置要求
- Docker & Docker Compose
- 网络连接 (访问 Binance API)
1. 启动系统
# 克隆仓库
cd realtime-ingestion
# 复制环境变量配置
cp .env.example .env
# 启动所有服务 (Redis + 数据采集)
docker-compose up -d
# 查看日志
docker-compose logs -f ingestion
2. 验证数据采集
# 进入 Redis 容器
docker exec -it tradus-redis redis-cli
# 查看所有 Stream keys
KEYS binance:raw:*
# 查看 K线数据数量
XLEN binance:raw:kline:5m
# 读取最新的 10 条 K线数据
XREVRANGE binance:raw:kline:5m + - COUNT 10
# 实时监控新数据 (阻塞式读取)
XREAD BLOCK 0 STREAMS binance:raw:trade $
3. 使用 Web UI (可选)
# 启动 Redis Commander (Web 界面)
docker-compose --profile debug up -d redis-commander
# 访问: http://localhost:8081
配置说明
环境变量 (.env)
# Binance 配置
SYMBOL=btcusdt # 交易对
KLINE_INTERVAL=5m # K线周期
BINANCE_WS_BASE_URL=wss://fstream.binance.com
# Redis 配置
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_STREAM_MAXLEN=10000 # Stream 最大长度
# 性能调优
MAX_BUFFER_SIZE=1000 # 最大缓冲区大小
RATE_LIMIT_MESSAGES_PER_SEC=1000 # 每秒处理消息数上限
DEDUP_CACHE_SIZE=10000 # 去重缓存大小
# 重连策略
RECONNECT_INITIAL_DELAY=1.0 # 初始重连延迟 (秒)
RECONNECT_MAX_DELAY=60.0 # 最大重连延迟 (秒)
MAX_RECONNECT_ATTEMPTS=100 # 最大重连次数 (-1 = 无限)
# 监控
HEALTH_CHECK_INTERVAL=30 # 健康检查间隔 (秒)
LOG_LEVEL=INFO # 日志级别
数据格式
Redis Stream Keys
| Stream Key | 数据类型 | 更新频率 | 说明 |
|---|---|---|---|
binance:raw:kline:5m |
K线 | 每5分钟 | OHLCV 数据 |
binance:raw:depth:20 |
订单簿 | 100ms | 前20档买卖盘 |
binance:raw:trade |
成交 | 实时 | 归集成交记录 |
数据示例
K线数据:
{
"e": "kline",
"E": 1701234567890,
"s": "BTCUSDT",
"k": {
"o": "42350.50",
"h": "42400.00",
"l": "42340.10",
"c": "42385.20",
"v": "125.4563"
}
}
监控和运维
查看系统状态
# 查看容器状态
docker-compose ps
# 查看实时日志
docker-compose logs -f ingestion
# 查看 Redis 内存使用
docker exec tradus-redis redis-cli INFO memory
健康检查
系统每 30 秒输出健康状态:
Health Check | WebSocket: ✓ | Redis: ✓ | Buffer: 15.2% | Dedup: 2.34% | Written: 12345
性能指标
日志中会定期输出:
- WebSocket 状态: 连接是否健康
- Redis 状态: 写入是否正常
- 缓冲区使用率: 内存压力指示
- 去重率: 重复消息比例
- 已写入消息数: 累计处理量
故障排查
1. WebSocket 连接失败
症状: 日志显示 "WebSocket connection closed"
解决方案:
# 检查网络连接
ping fstream.binance.com
# 检查防火墙规则
# 确保允许出站 HTTPS (443) 和 WebSocket 连接
# 重启服务
docker-compose restart ingestion
2. Redis 连接失败
症状: "Failed to connect to Redis"
解决方案:
# 检查 Redis 是否运行
docker-compose ps redis
# 测试 Redis 连接
docker exec tradus-redis redis-cli ping
# 重启 Redis
docker-compose restart redis
3. 缓冲区溢出
症状: "Buffer overflow! Dropped message"
解决方案:
# 增加缓冲区大小
# 编辑 .env:
MAX_BUFFER_SIZE=2000
# 或降低数据流量
# 只订阅必要的流 (修改 websocket_client.py)
# 重启服务
docker-compose restart ingestion
4. 高内存占用
症状: Redis 或应用内存使用过高
解决方案:
# 减少 Stream MAXLEN
REDIS_STREAM_MAXLEN=5000
# 减少去重缓存大小
DEDUP_CACHE_SIZE=5000
# 重启并清空数据
docker-compose down
docker volume rm realtime-ingestion_redis_data
docker-compose up -d
开发模式
本地开发 (不使用 Docker)
# 安装依赖
pip install -r requirements.txt
# 启动 Redis (使用 Docker)
docker run -d -p 6379:6379 redis:7.2-alpine
# 修改 .env
cp .env.example .env
# 设置: REDIS_HOST=localhost
# 运行应用
python main.py
运行测试
# 单元测试
pytest tests/
# 集成测试
pytest tests/integration/
# 覆盖率报告
pytest --cov=core --cov-report=html
生产部署建议
1. 高可用配置
- 使用 Redis Sentinel 或 Redis Cluster 实现高可用
- 部署多个采集实例 (消息去重会自动处理)
- 配置健康检查和自动重启
2. 监控告警
集成 Prometheus + Grafana:
# docker-compose.yml 添加
prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
3. 日志收集
使用 ELK Stack 或 Loki:
logging:
driver: "loki"
options:
loki-url: "http://loki:3100/loki/api/v1/push"
4. 安全加固
- 为 Redis 设置密码 (
.env中的REDIS_PASSWORD) - 使用专用网络隔离服务
- 限制容器资源使用 (
deploy.resources)
API 文档
Python 消费端示例
import redis
import orjson
# 创建 Redis 客户端
r = redis.Redis(host='localhost', port=6379, decode_responses=False)
# 使用 Consumer Group (推荐)
r.xgroup_create('binance:raw:kline:5m', 'my-processor', id='0', mkstream=True)
while True:
# 读取数据
messages = r.xreadgroup(
groupname='my-processor',
consumername='worker-1',
streams={'binance:raw:kline:5m': '>'},
count=10,
block=1000
)
for stream, stream_msgs in messages:
for msg_id, fields in stream_msgs:
# 解析 JSON
data = orjson.loads(fields[b'data'])
# 提取 K线数据
kline = data['k']
print(f"Price: {kline['c']}, Volume: {kline['v']}")
# 确认消息
r.xack('binance:raw:kline:5m', 'my-processor', msg_id)
许可证
MIT License
联系方式
如有问题或建议,请提交 Issue 或 Pull Request.
更新日志
v1.0.0 (2023-11-29)
- 初始版本发布
- 支持 Binance 永续合约 WebSocket 数据采集
- 实现自动重连、消息去重、内存保护
- Docker 容器化部署