tradusai/trading/realtime_trader.py
2025-12-09 12:27:47 +08:00

355 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Realtime Paper Trading - 基于 WebSocket 实时数据的模拟盘
使用 Binance WebSocket 获取实时价格,结合信号进行模拟交易
支持仓位管理:金字塔加仓、最大持仓限制、部分止盈
"""
import asyncio
import json
import logging
import signal
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Callable
import websockets
from .paper_trading import PaperTrader
logger = logging.getLogger(__name__)
class RealtimeTrader:
    """Paper-trading driver fed by a Binance futures aggTrade WebSocket stream.

    Every received trade price is forwarded to a ``PaperTrader`` so it can
    check its stop-loss / take-profit levels, and at a fixed interval the
    latest signal JSON file is loaded and handed to the ``PaperTrader``.
    """

    # Actions reported by ``PaperTrader.process_signal`` that are treated as
    # an executed trade (anything else is ignored).
    _TRADE_ACTIONS = frozenset({'OPEN', 'CLOSE', 'REVERSE', 'ADD', 'PARTIAL_CLOSE'})

    # Pause between a dropped connection and the next connection attempt.
    _RECONNECT_DELAY_SECONDS = 5

    def __init__(
        self,
        symbol: str = "btcusdt",
        initial_balance: float = 10000.0,
        leverage: int = 5,
        max_position_pct: float = 0.5,
        base_position_pct: float = 0.1,
        signal_check_interval: int = 60,  # check the signal file every 60 seconds
    ):
        """Initialise the realtime trader.

        Args:
            symbol: Trading pair; it is lower-cased before being used in the
                stream URL.
            initial_balance: Starting balance passed to ``PaperTrader``.
            leverage: Leverage multiplier passed to ``PaperTrader``.
            max_position_pct: Maximum position size as a fraction of the
                balance, passed to ``PaperTrader``.
            base_position_pct: Size of each entry as a fraction of the
                balance, passed to ``PaperTrader``.
            signal_check_interval: Seconds between two reads of the signal
                file.
        """
        self.symbol = symbol.lower()
        self.signal_check_interval = signal_check_interval

        # WebSocket URL of the aggregated-trade stream for this symbol.
        self.ws_url = f"wss://fstream.binance.com/ws/{self.symbol}@aggTrade"

        # Paper-trading engine configured with the position-management parameters.
        self.trader = PaperTrader(
            initial_balance=initial_balance,
            leverage=leverage,
            max_position_pct=max_position_pct,
            base_position_pct=base_position_pct,
        )

        # Runtime state.
        self.current_price = 0.0    # last valid trade price seen on the stream
        self.last_signal_check = 0  # event-loop time of the last signal check
        self.is_running = False
        self.ws = None              # active connection, None while disconnected

        # Location of the JSON file holding the most recent signal.
        self.signal_file = Path(__file__).parent.parent / 'output' / 'latest_signal.json'

        # Optional hooks; each is called synchronously with a single argument.
        self.on_trade_callback: Optional[Callable] = None
        self.on_price_callback: Optional[Callable] = None

    async def start(self):
        """Run the trader until ``stop()`` is called, reconnecting as needed.

        Connection errors are logged, not raised. Whenever the connection
        ends while the trader is still running (error or clean close), the
        loop waits ``_RECONNECT_DELAY_SECONDS`` before connecting again so a
        server that keeps closing the stream cannot cause a busy loop.
        """
        self.is_running = True
        logger.info(f"Starting realtime trader for {self.symbol.upper()}")
        logger.info(f"WebSocket URL: {self.ws_url}")
        logger.info(f"Initial balance: ${self.trader.balance:.2f}")
        logger.info(f"Leverage: {self.trader.leverage}x")
        logger.info(f"Max position: {self.trader.position_manager.max_position_pct * 100}%")
        logger.info(f"Base position: {self.trader.position_manager.base_position_pct * 100}%")
        logger.info(f"Signal check interval: {self.signal_check_interval}s")

        while self.is_running:
            try:
                await self._connect_and_trade()
            except Exception as e:
                logger.error(f"Connection error: {e}")
            if self.is_running:
                logger.info(f"Reconnecting in {self._RECONNECT_DELAY_SECONDS} seconds...")
                await asyncio.sleep(self._RECONNECT_DELAY_SECONDS)

    async def _connect_and_trade(self):
        """Open the WebSocket and process messages until it closes or we stop.

        Messages that are not valid JSON are skipped. Any other error raised
        while handling one message is logged and the loop moves on to the
        next message, so a single bad tick does not drop the connection.
        """
        async with websockets.connect(self.ws_url) as ws:
            self.ws = ws
            try:
                logger.info("WebSocket connected")
                # Show the starting state once per (re)connection.
                self._print_status()

                async for message in ws:
                    if not self.is_running:
                        break
                    try:
                        data = json.loads(message)
                        await self._process_tick(data)
                    except json.JSONDecodeError:
                        continue
                    except Exception as e:
                        logger.exception(f"Error processing tick: {e}")
            finally:
                # Never leave a reference to a closed connection behind.
                self.ws = None

    @staticmethod
    def _extract_price(data: Any) -> Optional[float]:
        """Return the positive, finite price in field ``'p'``, or None."""
        if not isinstance(data, dict):
            return None
        try:
            price = float(data.get('p', 0))
        except (TypeError, ValueError):
            return None
        # The chained comparison is False for NaN, zero, negatives and inf.
        if 0 < price < float('inf'):
            return price
        return None

    def _invoke_callback(self, callback: Optional[Callable], payload: Any, label: str):
        """Call an optional user hook; log (do not propagate) its errors.

        A failing hook must not prevent the stop-loss / take-profit check or
        the remaining bookkeeping for the current tick.
        """
        if not callback:
            return
        try:
            callback(payload)
        except Exception:
            logger.exception(f"{label} callback raised an exception")

    async def _process_tick(self, data: Dict[str, Any]):
        """Handle one decoded stream message.

        Messages without a usable price are ignored and leave
        ``current_price`` untouched. Otherwise the price is stored, the price
        hook is called, an open position is checked for stop-loss /
        take-profit, and the signal file is checked when the interval has
        elapsed.
        """
        price = self._extract_price(data)
        if price is None:
            return
        self.current_price = price

        self._invoke_callback(self.on_price_callback, price, "Price")

        # Stop-loss / take-profit check for an open position.
        if self.trader.position:
            close_result = self.trader._check_close_position(price)
            if close_result:
                self._on_position_closed(close_result)

        # Periodic signal check, timed on the event loop's clock.
        now = asyncio.get_running_loop().time()
        if now - self.last_signal_check >= self.signal_check_interval:
            self.last_signal_check = now
            await self._check_and_execute_signal()

    async def _check_and_execute_signal(self):
        """Load the latest signal and pass it to the paper trader.

        The file is re-read and re-submitted on every check.
        NOTE(review): de-duplication / cooldown of repeated signals is
        presumably handled inside ``PaperTrader.process_signal`` — confirm.
        """
        latest_signal = self._load_latest_signal()
        if not latest_signal:
            return

        result = self.trader.process_signal(latest_signal, self.current_price)
        if result.get('action') in self._TRADE_ACTIONS:
            self._on_trade_executed(result)
            # NOTE(review): the original indentation was lost; the status
            # print is assumed to belong to this branch — confirm.
            self._print_status()

    def _load_latest_signal(self) -> Optional[Dict[str, Any]]:
        """Read the signal file and return its content as a dict.

        Returns:
            The decoded JSON object, or None when the file does not exist,
            cannot be read or decoded, or does not contain a JSON object.
        """
        try:
            with open(self.signal_file, 'r', encoding='utf-8') as f:
                payload = json.load(f)
        except FileNotFoundError:
            # No signal has been produced yet; this is not an error.
            return None
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error(f"Error loading signal: {e}")
            return None

        if not isinstance(payload, dict):
            logger.error(f"Error loading signal: expected a JSON object, got {type(payload).__name__}")
            return None
        return payload

    @staticmethod
    def _log_block(lines):
        """Log ``lines`` at INFO level framed by two separator lines."""
        logger.info("=" * 60)
        for line in lines:
            logger.info(line)
        logger.info("=" * 60)

    def _on_trade_executed(self, result: Dict[str, Any]):
        """Log a trade reported by ``process_signal`` and notify the trade hook.

        Args:
            result: Dict with an ``'action'`` string and a ``'details'`` dict
                whose keys depend on the action. Unknown actions are not
                logged but are still forwarded to the hook.
        """
        action = result['action']
        details = result['details']
        lines = None

        if action == 'OPEN':
            lines = [
                f"🟢 OPEN {details['side']}",
                f" Entry: ${details['entry_price']:.2f}",
                f" Size: {details['size']:.6f} BTC",
                f" Total Size: {details['total_size']:.6f} BTC",
                f" Stop Loss: ${details['stop_loss']:.2f}",
                f" Take Profit: ${details['take_profit']:.2f}",
            ]
        elif action == 'ADD':
            lines = [
                f" ADD POSITION {details['side']}",
                f" Add Price: ${details['add_price']:.2f}",
                f" Add Size: {details['add_size']:.6f} BTC",
                f" Total Size: {details['total_size']:.6f} BTC",
                f" Avg Entry: ${details['avg_entry_price']:.2f}",
                f" Entries: {details['num_entries']}",
            ]
        elif action == 'CLOSE':
            pnl = details['pnl']
            pnl_icon = "🟢" if pnl > 0 else "🔴"
            lines = [
                f"{pnl_icon} CLOSE {details['side']}",
                f" Entry: ${details['entry_price']:.2f}",
                f" Exit: ${details['exit_price']:.2f}",
                f" Size: {details['size']:.6f} BTC",
                f" Entries: {details.get('num_entries', 1)}",
                f" PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)",
                f" Reason: {details['reason']}",
                f" New Balance: ${details['new_balance']:.2f}",
            ]
        elif action == 'PARTIAL_CLOSE':
            pnl = details['pnl']
            pnl_icon = "🟢" if pnl > 0 else "🔴"
            lines = [
                f"📉 PARTIAL CLOSE {details['side']}",
                f" Closed Size: {details['closed_size']:.6f} BTC",
                f" Exit: ${details['exit_price']:.2f}",
                f" {pnl_icon} PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)",
                f" Remaining: {details['remaining_size']:.6f} BTC",
                f" New Balance: ${details['new_balance']:.2f}",
            ]
        elif action == 'REVERSE':
            lines = ["🔄 REVERSE POSITION"]
            if 'close' in details:
                lines.append(f" Closed: PnL ${details['close']['pnl']:.2f}")
            if 'open' in details:
                lines.append(
                    f" Opened: {details['open']['side']} @ ${details['open']['entry_price']:.2f}"
                )

        if lines is not None:
            self._log_block(lines)

        # Notify the external hook.
        self._invoke_callback(self.on_trade_callback, result, "Trade")

    def _on_position_closed(self, close_result: Dict[str, Any]):
        """Log a stop-loss / take-profit close, print status, notify the hook.

        The hook receives ``{'action': 'CLOSE', 'details': close_result}``.
        """
        pnl = close_result['pnl']
        pnl_icon = "🟢" if pnl > 0 else "🔴"
        reason_icon = "🎯" if close_result['reason'] == 'TAKE_PROFIT' else "🛑"

        self._log_block([
            f"{reason_icon} {close_result['reason']}",
            f" {pnl_icon} PnL: ${pnl:.2f} ({close_result['pnl_pct']:.2f}%)",
            f" Entry: ${close_result['entry_price']:.2f}",
            f" Exit: ${close_result['exit_price']:.2f}",
            f" Size: {close_result['size']:.6f} BTC",
            f" Entries: {close_result.get('num_entries', 1)}",
            f" New Balance: ${close_result['new_balance']:.2f}",
        ])
        self._print_status()

        self._invoke_callback(
            self.on_trade_callback,
            {
                'action': 'CLOSE',
                'details': close_result,
            },
            "Trade",
        )

    def _print_status(self):
        """Print balance, open position, statistics and recent trades to stdout."""
        status = self.trader.get_status(self.current_price)
        rule = "=" * 70

        out = [
            "\n" + rule,
            f"📊 PAPER TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            rule,
            f"💰 Balance: ${status['balance']:.2f} (Initial: ${status['initial_balance']:.2f})",
            f"📈 Total Return: {status['total_return']:.2f}%",
            f"💵 Current Price: ${self.current_price:.2f}",
        ]

        pos = status['position']
        if pos:
            unrealized = pos.get('unrealized_pnl', 0)
            unrealized_pct = pos.get('unrealized_pnl_pct', 0)
            pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else ""
            out.extend([
                f"\n📍 Position: {pos['side']} ({len(pos.get('entries', []))} entries)",
                f" Avg Entry: ${pos['avg_entry_price']:.2f}",
                f" Size: {pos['total_size']:.6f} BTC",
                f" Stop Loss: ${pos['stop_loss']:.2f}",
                f" Take Profit: ${pos['take_profit']:.2f}",
                f" {pnl_icon} Unrealized PnL: ${unrealized:.2f} ({unrealized_pct:.2f}%)",
            ])
        else:
            out.append("\n📍 Position: FLAT (No position)")

        stats = status['stats']
        out.extend([
            f"\n📊 Statistics:",
            f" Total Trades: {stats['total_trades']}",
            f" Win Rate: {stats['win_rate']:.1f}%",
            f" Total PnL: ${stats['total_pnl']:.2f}",
            f" Profit Factor: {stats['profit_factor']:.2f}",
            f" Max Drawdown: {stats['max_drawdown']:.2f}%",
            f" Max Consecutive Wins: {stats.get('max_consecutive_wins', 0)}",
            f" Max Consecutive Losses: {stats.get('max_consecutive_losses', 0)}",
        ])

        recent = status['recent_trades']
        if recent:
            out.append(f"\n📝 Recent Trades:")
            # Only the five most recent trades are shown.
            for trade in recent[-5:]:
                pnl_icon = "🟢" if trade['pnl'] > 0 else "🔴"
                out.append(
                    f" {pnl_icon} {trade['side']} | PnL: ${trade['pnl']:.2f} "
                    f"({trade['pnl_pct']:.1f}%) | {trade['exit_reason']}"
                )

        out.append(rule + "\n")
        print("\n".join(out))

    def stop(self):
        """Request shutdown.

        Only clears the running flag: the message loop exits when the next
        message arrives and the reconnect loop exits at its next check.
        """
        self.is_running = False
        logger.info("Stopping realtime trader...")

    def get_status(self) -> Dict[str, Any]:
        """Return the paper trader's status evaluated at the current price."""
        return self.trader.get_status(self.current_price)
async def main():
    """Configure logging, build a BTCUSDT paper trader and run it until stopped.

    Environment variables are loaded from the ``.env`` file two directories
    above this module. SIGINT and SIGTERM request a graceful stop through
    ``RealtimeTrader.stop()``.
    """
    from dotenv import load_dotenv

    # Load environment variables.
    load_dotenv(Path(__file__).parent.parent / '.env')

    # Configure logging.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    # Single source of truth for the values shown in the banner below.
    max_position_pct = 0.5   # at most 50% of the balance in one position
    base_position_pct = 0.1  # each entry uses 10% of the balance

    # Create the trader.
    trader = RealtimeTrader(
        symbol='btcusdt',
        initial_balance=10000.0,
        leverage=5,
        max_position_pct=max_position_pct,
        base_position_pct=base_position_pct,
        signal_check_interval=30,  # check the signal file every 30 seconds
    )

    # Translate termination signals into a stop request.
    def signal_handler(sig, frame):
        logger.info("Received shutdown signal")
        trader.stop()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Startup banner.
    # NOTE(review): the last three position-management lines are hard-coded
    # here; presumably they mirror PaperTrader defaults — confirm.
    rule = "=" * 70
    print("\n" + rule)
    print("🚀 REALTIME PAPER TRADING")
    print(rule)
    print("Position Management:")
    print(f" - Max position: {max_position_pct:.0%} of balance")
    print(f" - Base entry: {base_position_pct:.0%} of balance")
    print(" - Max entries: 5 (pyramid)")
    print(" - Pyramid factor: 0.8x per entry")
    print(" - Signal cooldown: 5 minutes")
    print(rule)
    print("Press Ctrl+C to stop")
    print(rule + "\n")

    await trader.start()


# Run the trader when this module is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())