tradusai/trading/realtime_trader.py
2025-12-09 12:57:31 +08:00

307 lines
11 KiB
Python

"""
Realtime Trading - 基于 WebSocket 实时数据的多周期交易
使用 Binance WebSocket 获取实时价格,结合信号进行多周期独立交易
- 短周期 (5m/15m/1h)
- 中周期 (4h/1d)
- 长周期 (1d/1w)
"""
import asyncio
import json
import logging
import signal
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Callable
import websockets
from .paper_trading import MultiTimeframePaperTrader, TimeFrame, TIMEFRAME_CONFIG
logger = logging.getLogger(__name__)
class RealtimeTrader:
"""实时多周期交易器"""
def __init__(
self,
symbol: str = "btcusdt",
initial_balance: float = 10000.0,
signal_check_interval: int = 60,
):
"""
初始化实时交易器
Args:
symbol: 交易对 (小写)
initial_balance: 初始资金 (分配给三个周期)
signal_check_interval: 信号检查间隔(秒)
"""
self.symbol = symbol.lower()
self.signal_check_interval = signal_check_interval
# WebSocket URL
self.ws_url = f"wss://fstream.binance.com/ws/{self.symbol}@aggTrade"
# 多周期交易器
self.trader = MultiTimeframePaperTrader(initial_balance=initial_balance)
# 状态
self.current_price = 0.0
self.last_signal_check = 0
self.is_running = False
self.ws = None
# 信号文件路径
self.signal_file = Path(__file__).parent.parent / 'output' / 'latest_signal.json'
# 回调函数
self.on_trade_callback: Optional[Callable] = None
self.on_price_callback: Optional[Callable] = None
async def start(self):
"""启动实时交易"""
self.is_running = True
logger.info(f"Starting multi-timeframe realtime trader for {self.symbol.upper()}")
logger.info(f"WebSocket URL: {self.ws_url}")
logger.info(f"Signal check interval: {self.signal_check_interval}s")
for tf in TimeFrame:
config = TIMEFRAME_CONFIG[tf]
account = self.trader.accounts[tf]
logger.info(f" [{config['name_en']}] Balance: ${account.balance:.2f}, Leverage: {account.leverage}x")
while self.is_running:
try:
await self._connect_and_trade()
except Exception as e:
logger.error(f"Connection error: {e}")
if self.is_running:
logger.info("Reconnecting in 5 seconds...")
await asyncio.sleep(5)
async def _connect_and_trade(self):
"""连接 WebSocket 并开始交易"""
async with websockets.connect(self.ws_url) as ws:
self.ws = ws
logger.info("WebSocket connected")
self._print_status()
async for message in ws:
if not self.is_running:
break
try:
data = json.loads(message)
await self._process_tick(data)
except json.JSONDecodeError:
continue
except Exception as e:
logger.error(f"Error processing tick: {e}")
async def _process_tick(self, data: Dict[str, Any]):
"""处理每个 tick 数据"""
self.current_price = float(data.get('p', 0))
if self.current_price <= 0:
return
if self.on_price_callback:
self.on_price_callback(self.current_price)
# 检查各周期止盈止损
for tf in TimeFrame:
account = self.trader.accounts[tf]
if account.position and account.position.side != 'FLAT':
close_result = self.trader._check_close_position(tf, self.current_price)
if close_result:
self._on_position_closed(tf, close_result)
# 定期检查信号
now = asyncio.get_event_loop().time()
if now - self.last_signal_check >= self.signal_check_interval:
self.last_signal_check = now
await self._check_and_execute_signal()
async def _check_and_execute_signal(self):
"""检查信号并执行交易"""
signal_data = self._load_latest_signal()
if not signal_data:
return
results = self.trader.process_signal(signal_data, self.current_price)
# 处理各周期结果
for tf_value, result in results.get('timeframes', {}).items():
if result['action'] in ['OPEN', 'CLOSE', 'REVERSE']:
tf = TimeFrame(tf_value)
self._on_trade_executed(tf, result)
self._print_status()
def _load_latest_signal(self) -> Optional[Dict[str, Any]]:
"""加载最新信号"""
try:
if not self.signal_file.exists():
return None
with open(self.signal_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading signal: {e}")
return None
def _on_trade_executed(self, tf: TimeFrame, result: Dict[str, Any]):
"""交易执行回调"""
config = TIMEFRAME_CONFIG[tf]
action = result['action']
details = result.get('details', {})
if action == 'OPEN':
logger.info("=" * 60)
logger.info(f"🟢 [{config['name_en']}] OPEN {details['side']}")
logger.info(f" Entry: ${details['entry_price']:.2f}")
logger.info(f" Size: {details['size']:.6f} BTC")
logger.info(f" Stop Loss: ${details['stop_loss']:.2f}")
logger.info(f" Take Profit: ${details['take_profit']:.2f}")
logger.info("=" * 60)
elif action == 'CLOSE':
pnl = details.get('pnl', 0)
pnl_icon = "🟢" if pnl > 0 else "🔴"
logger.info("=" * 60)
logger.info(f"{pnl_icon} [{config['name_en']}] CLOSE {details['side']}")
logger.info(f" Entry: ${details['entry_price']:.2f}")
logger.info(f" Exit: ${details['exit_price']:.2f}")
logger.info(f" PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)")
logger.info(f" Reason: {details['reason']}")
logger.info(f" New Balance: ${details['new_balance']:.2f}")
logger.info("=" * 60)
elif action == 'REVERSE':
logger.info("=" * 60)
logger.info(f"🔄 [{config['name_en']}] REVERSE POSITION")
if 'close' in details:
logger.info(f" Closed: PnL ${details['close'].get('pnl', 0):.2f}")
if 'open' in details:
logger.info(f" Opened: {details['open']['side']} @ ${details['open']['entry_price']:.2f}")
logger.info("=" * 60)
if self.on_trade_callback:
self.on_trade_callback({'timeframe': tf.value, 'action': action, 'details': details})
def _on_position_closed(self, tf: TimeFrame, close_result: Dict[str, Any]):
"""持仓被平仓回调(止盈止损)"""
config = TIMEFRAME_CONFIG[tf]
pnl = close_result.get('pnl', 0)
pnl_icon = "🟢" if pnl > 0 else "🔴"
reason = close_result.get('reason', '')
reason_icon = "🎯" if reason == 'TAKE_PROFIT' else "🛑"
logger.info("=" * 60)
logger.info(f"{reason_icon} [{config['name_en']}] {reason}")
logger.info(f" {pnl_icon} PnL: ${pnl:.2f} ({close_result.get('pnl_pct', 0):.2f}%)")
logger.info(f" Entry: ${close_result.get('entry_price', 0):.2f}")
logger.info(f" Exit: ${close_result.get('exit_price', 0):.2f}")
logger.info(f" New Balance: ${close_result.get('new_balance', 0):.2f}")
logger.info("=" * 60)
if self.on_trade_callback:
self.on_trade_callback({
'timeframe': tf.value,
'action': 'CLOSE',
'details': close_result,
})
def _print_status(self):
"""打印当前状态"""
status = self.trader.get_status(self.current_price)
print("\n" + "=" * 80)
print(f"📊 MULTI-TIMEFRAME TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
print(f"💵 Current Price: ${self.current_price:.2f}")
print(f"💰 Total Balance: ${status['total_balance']:.2f} (Initial: ${status['total_initial_balance']:.2f})")
print(f"📈 Total Return: {status['total_return']:.2f}%")
print("-" * 80)
for tf_value, tf_status in status['timeframes'].items():
name = tf_status['name_en']
balance = tf_status['balance']
return_pct = tf_status['return_pct']
leverage = tf_status['leverage']
stats = tf_status['stats']
return_icon = "🟢" if return_pct > 0 else "🔴" if return_pct < 0 else ""
print(f"\n📊 {name} ({leverage}x)")
print(f" Balance: ${balance:.2f} | Return: {return_icon} {return_pct:+.2f}%")
print(f" Trades: {stats['total_trades']} | Win Rate: {stats['win_rate']:.1f}% | PnL: ${stats['total_pnl']:.2f}")
pos = tf_status.get('position')
if pos:
unrealized = pos.get('unrealized_pnl', 0)
unrealized_pct = pos.get('unrealized_pnl_pct', 0)
pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else ""
print(f" Position: {pos['side']} @ ${pos['entry_price']:.2f}")
print(f" SL: ${pos['stop_loss']:.2f} | TP: ${pos['take_profit']:.2f}")
print(f" {pnl_icon} Unrealized: ${unrealized:.2f} ({unrealized_pct:+.2f}%)")
else:
print(f" Position: FLAT")
print("=" * 80 + "\n")
def stop(self):
"""停止交易"""
self.is_running = False
logger.info("Stopping realtime trader...")
def get_status(self) -> Dict[str, Any]:
"""获取状态"""
return self.trader.get_status(self.current_price)
async def main():
"""主函数"""
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent.parent / '.env')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
trader = RealtimeTrader(
symbol='btcusdt',
initial_balance=10000.0,
signal_check_interval=30,
)
def signal_handler(sig, frame):
logger.info("Received shutdown signal")
trader.stop()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print("\n" + "=" * 80)
print("🚀 MULTI-TIMEFRAME REALTIME TRADING")
print("=" * 80)
print("Timeframes:")
print(" 📈 Short-term (5m/15m/1h) - 5x leverage")
print(" 📊 Medium-term (4h/1d) - 3x leverage")
print(" 📉 Long-term (1d/1w) - 2x leverage")
print("=" * 80)
print("Press Ctrl+C to stop")
print("=" * 80 + "\n")
await trader.start()
if __name__ == "__main__":
asyncio.run(main())