tradusai/README.md
2025-12-02 22:54:03 +08:00

422 lines
11 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Binance Real-time Data Ingestion System
生产级的 Binance WebSocket 实时数据采集系统,用于加密货币日内交易辅助。
## 功能特性
### 核心功能
- **多流订阅**: 同时订阅 K线、订单簿深度、实时成交数据
- **自动重连**: 指数退避策略,网络中断自动恢复
- **消息去重**: 基于事件时间戳 (E字段) 的 LRU 缓存去重
- **内存保护**: 限流 + 有界缓冲区,防止内存泄漏
- **流式存储**: 数据写入 Redis Stream支持多消费者
### 生产级特性
- 异步 I/O (asyncio) 高性能处理
- 批量写入 Redis降低网络开销
- 健康检查和性能监控
- 优雅关闭和信号处理
- Docker 容器化部署
- 完整的日志和统计信息
---
## 系统架构
```
┌─────────────────────────────────────────────────────────────────┐
│ Binance WebSocket API │
│ wss://fstream.binance.com/stream │
└────────────────────────┬────────────────────────────────────────┘
│ Multi-stream subscription
│ (kline_5m, depth20, aggTrade)
┌─────────────────────────────────────────────────────────────────┐
│ WebSocket Client (Auto-reconnect) │
│ - Exponential backoff │
│ - Heartbeat monitoring │
│ - Connection pooling │
└────────────────────────┬────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Message Deduplicator │
│ - LRU cache (10,000 entries) │
│ - Event time (E field) based │
│ - TTL: 5 minutes │
└────────────────────────┬────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Buffered Message Processor │
│ - Max buffer: 1,000 messages │
│ - Rate limit: 1,000 msg/sec │
│ - Batch processing │
└────────────────────────┬────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Redis Stream Writer │
│ Stream keys: │
│ - binance:raw:kline:5m (K线数据) │
│ - binance:raw:depth:20 (订单簿深度) │
│ - binance:raw:trade (实时成交) │
│ │
│ MAXLEN: ~10,000 (auto-trim) │
└─────────────────────────────────────────────────────────────────┘
```
---
## 快速开始
### 前置要求
- Docker & Docker Compose
- 网络连接 (访问 Binance API)
### 1. 启动系统
```bash
# 克隆仓库
cd realtime-ingestion
# 复制环境变量配置
cp .env.example .env
# 启动所有服务 (Redis + 数据采集)
docker-compose up -d
# 查看日志
docker-compose logs -f ingestion
```
### 2. 验证数据采集
```bash
# 进入 Redis 容器
docker exec -it tradus-redis redis-cli
# 查看所有 Stream keys
KEYS binance:raw:*
# 查看 K线数据数量
XLEN binance:raw:kline:5m
# 读取最新的 10 条 K线数据
XREVRANGE binance:raw:kline:5m + - COUNT 10
# 实时监控新数据 (阻塞式读取)
XREAD BLOCK 0 STREAMS binance:raw:trade $
```
### 3. 使用 Web UI (可选)
```bash
# 启动 Redis Commander (Web 界面)
docker-compose --profile debug up -d redis-commander
# 访问: http://localhost:8081
```
---
## 配置说明
### 环境变量 (.env)
```bash
# Binance 配置
SYMBOL=btcusdt # 交易对
KLINE_INTERVAL=5m # K线周期
BINANCE_WS_BASE_URL=wss://fstream.binance.com
# Redis 配置
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_STREAM_MAXLEN=10000 # Stream 最大长度
# 性能调优
MAX_BUFFER_SIZE=1000 # 最大缓冲区大小
RATE_LIMIT_MESSAGES_PER_SEC=1000 # 每秒处理消息数上限
DEDUP_CACHE_SIZE=10000 # 去重缓存大小
# 重连策略
RECONNECT_INITIAL_DELAY=1.0 # 初始重连延迟 (秒)
RECONNECT_MAX_DELAY=60.0 # 最大重连延迟 (秒)
MAX_RECONNECT_ATTEMPTS=100 # 最大重连次数 (-1 = 无限)
# 监控
HEALTH_CHECK_INTERVAL=30 # 健康检查间隔 (秒)
LOG_LEVEL=INFO # 日志级别
```
---
## 数据格式
详见 [REDIS_DATA_EXAMPLES.md](./REDIS_DATA_EXAMPLES.md)
### Redis Stream Keys
| Stream Key | 数据类型 | 更新频率 | 说明 |
|------------|----------|----------|------|
| `binance:raw:kline:5m` | K线 | 每5分钟 | OHLCV 数据 |
| `binance:raw:depth:20` | 订单簿 | 100ms | 前20档买卖盘 |
| `binance:raw:trade` | 成交 | 实时 | 归集成交记录 |
### 数据示例
**K线数据:**
```json
{
"e": "kline",
"E": 1701234567890,
"s": "BTCUSDT",
"k": {
"o": "42350.50",
"h": "42400.00",
"l": "42340.10",
"c": "42385.20",
"v": "125.4563"
}
}
```
---
## 监控和运维
### 查看系统状态
```bash
# 查看容器状态
docker-compose ps
# 查看实时日志
docker-compose logs -f ingestion
# 查看 Redis 内存使用
docker exec tradus-redis redis-cli INFO memory
```
### 健康检查
系统每 30 秒输出健康状态:
```
Health Check | WebSocket: ✓ | Redis: ✓ | Buffer: 15.2% | Dedup: 2.34% | Written: 12345
```
### 性能指标
日志中会定期输出:
- **WebSocket 状态**: 连接是否健康
- **Redis 状态**: 写入是否正常
- **缓冲区使用率**: 内存压力指示
- **去重率**: 重复消息比例
- **已写入消息数**: 累计处理量
---
## 故障排查
### 1. WebSocket 连接失败
**症状**: 日志显示 "WebSocket connection closed"
**解决方案**:
```bash
# 检查网络连接
ping fstream.binance.com
# 检查防火墙规则
# 确保允许出站 HTTPS (443) 和 WebSocket 连接
# 重启服务
docker-compose restart ingestion
```
### 2. Redis 连接失败
**症状**: "Failed to connect to Redis"
**解决方案**:
```bash
# 检查 Redis 是否运行
docker-compose ps redis
# 测试 Redis 连接
docker exec tradus-redis redis-cli ping
# 重启 Redis
docker-compose restart redis
```
### 3. 缓冲区溢出
**症状**: "Buffer overflow! Dropped message"
**解决方案**:
```bash
# 增加缓冲区大小
# 编辑 .env:
MAX_BUFFER_SIZE=2000
# 或降低数据流量
# 只订阅必要的流 (修改 websocket_client.py)
# 重启服务
docker-compose restart ingestion
```
### 4. 高内存占用
**症状**: Redis 或应用内存使用过高
**解决方案**:
```bash
# 减少 Stream MAXLEN
REDIS_STREAM_MAXLEN=5000
# 减少去重缓存大小
DEDUP_CACHE_SIZE=5000
# 重启并清空数据
docker-compose down
docker volume rm realtime-ingestion_redis_data
docker-compose up -d
```
---
## 开发模式
### 本地开发 (不使用 Docker)
```bash
# 安装依赖
pip install -r requirements.txt
# 启动 Redis (使用 Docker)
docker run -d -p 6379:6379 redis:7.2-alpine
# 修改 .env
cp .env.example .env
# 设置: REDIS_HOST=localhost
# 运行应用
python main.py
```
### 运行测试
```bash
# 单元测试
pytest tests/
# 集成测试
pytest tests/integration/
# 覆盖率报告
pytest --cov=core --cov-report=html
```
---
## 生产部署建议
### 1. 高可用配置
- 使用 **Redis Sentinel****Redis Cluster** 实现高可用
- 部署多个采集实例 (消息去重会自动处理)
- 配置健康检查和自动重启
### 2. 监控告警
集成 Prometheus + Grafana:
```yaml
# docker-compose.yml 添加
prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
```
### 3. 日志收集
使用 ELK Stack 或 Loki:
```yaml
logging:
driver: "loki"
options:
loki-url: "http://loki:3100/loki/api/v1/push"
```
### 4. 安全加固
- 为 Redis 设置密码 (`.env` 中的 `REDIS_PASSWORD`)
- 使用专用网络隔离服务
- 限制容器资源使用 (`deploy.resources`)
---
## API 文档
### Python 消费端示例
```python
import redis
import orjson
# 创建 Redis 客户端
r = redis.Redis(host='localhost', port=6379, decode_responses=False)
# 使用 Consumer Group (推荐)
r.xgroup_create('binance:raw:kline:5m', 'my-processor', id='0', mkstream=True)
while True:
# 读取数据
messages = r.xreadgroup(
groupname='my-processor',
consumername='worker-1',
streams={'binance:raw:kline:5m': '>'},
count=10,
block=1000
)
for stream, stream_msgs in messages:
for msg_id, fields in stream_msgs:
# 解析 JSON
data = orjson.loads(fields[b'data'])
# 提取 K线数据
kline = data['k']
print(f"Price: {kline['c']}, Volume: {kline['v']}")
# 确认消息
r.xack('binance:raw:kline:5m', 'my-processor', msg_id)
```
---
## 许可证
MIT License
---
## 联系方式
如有问题或建议,请提交 Issue 或 Pull Request.
---
## 更新日志
### v1.0.0 (2023-11-29)
- 初始版本发布
- 支持 Binance 永续合约 WebSocket 数据采集
- 实现自动重连、消息去重、内存保护
- Docker 容器化部署