""" Realtime Trading - 基于 WebSocket 实时数据的多周期交易 使用 Binance WebSocket 获取实时价格,结合信号进行多周期独立交易 - 短周期 (5m/15m/1h) - 中周期 (4h/1d) - 长周期 (1d/1w) """ import asyncio import json import logging import signal from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional, Callable import websockets from .paper_trading import MultiTimeframePaperTrader, TimeFrame, TIMEFRAME_CONFIG logger = logging.getLogger(__name__) class RealtimeTrader: """实时多周期交易器""" def __init__( self, symbol: str = "btcusdt", initial_balance: float = 10000.0, signal_check_interval: int = 60, ): """ 初始化实时交易器 Args: symbol: 交易对 (小写) initial_balance: 初始资金 (分配给三个周期) signal_check_interval: 信号检查间隔(秒) """ self.symbol = symbol.lower() self.signal_check_interval = signal_check_interval # WebSocket URL self.ws_url = f"wss://fstream.binance.com/ws/{self.symbol}@aggTrade" # 多周期交易器 self.trader = MultiTimeframePaperTrader(initial_balance=initial_balance) # 状态 self.current_price = 0.0 self.last_signal_check = 0 self.is_running = False self.ws = None # 信号文件路径 self.signal_file = Path(__file__).parent.parent / 'output' / 'latest_signal.json' # 回调函数 self.on_trade_callback: Optional[Callable] = None self.on_price_callback: Optional[Callable] = None async def start(self): """启动实时交易""" self.is_running = True logger.info(f"Starting multi-timeframe realtime trader for {self.symbol.upper()}") logger.info(f"WebSocket URL: {self.ws_url}") logger.info(f"Signal check interval: {self.signal_check_interval}s") for tf in TimeFrame: config = TIMEFRAME_CONFIG[tf] account = self.trader.accounts[tf] equity = account.get_equity() logger.info(f" [{config['name_en']}] Equity: ${equity:.2f}, Leverage: {account.leverage}x") while self.is_running: try: await self._connect_and_trade() except Exception as e: logger.error(f"Connection error: {e}") if self.is_running: logger.info("Reconnecting in 5 seconds...") await asyncio.sleep(5) async def _connect_and_trade(self): """连接 WebSocket 并开始交易""" async with websockets.connect(self.ws_url) as ws: self.ws = ws logger.info("WebSocket connected") self._print_status() async for message in ws: if not self.is_running: break try: data = json.loads(message) await self._process_tick(data) except json.JSONDecodeError: continue except Exception as e: logger.error(f"Error processing tick: {e}") async def _process_tick(self, data: Dict[str, Any]): """处理每个 tick 数据""" self.current_price = float(data.get('p', 0)) if self.current_price <= 0: return if self.on_price_callback: self.on_price_callback(self.current_price) # 检查各周期止盈止损 for tf in TimeFrame: account = self.trader.accounts[tf] if account.position and account.position.side != 'FLAT': close_result = self.trader._check_close_position(tf, self.current_price) if close_result: self._on_position_closed(tf, close_result) # 定期检查信号 now = asyncio.get_event_loop().time() if now - self.last_signal_check >= self.signal_check_interval: self.last_signal_check = now await self._check_and_execute_signal() async def _check_and_execute_signal(self): """检查信号并执行交易""" signal_data = self._load_latest_signal() if not signal_data: return results = self.trader.process_signal(signal_data, self.current_price) # 处理各周期结果 for tf_value, result in results.get('timeframes', {}).items(): if result['action'] in ['OPEN', 'CLOSE', 'REVERSE']: tf = TimeFrame(tf_value) self._on_trade_executed(tf, result) self._print_status() def _load_latest_signal(self) -> Optional[Dict[str, Any]]: """加载最新信号""" try: if not self.signal_file.exists(): return None with open(self.signal_file, 'r') as f: return json.load(f) except Exception as e: logger.error(f"Error loading signal: {e}") return None def _on_trade_executed(self, tf: TimeFrame, result: Dict[str, Any]): """交易执行回调""" config = TIMEFRAME_CONFIG[tf] action = result['action'] details = result.get('details', {}) if action == 'OPEN': logger.info("=" * 60) logger.info(f"🟢 [{config['name_en']}] OPEN {details['side']}") logger.info(f" Entry: ${details['entry_price']:.2f}") logger.info(f" Size: {details['size']:.6f} BTC") logger.info(f" Stop Loss: ${details['stop_loss']:.2f}") logger.info(f" Take Profit: ${details['take_profit']:.2f}") logger.info("=" * 60) elif action == 'CLOSE': pnl = details.get('pnl', 0) pnl_icon = "🟢" if pnl > 0 else "🔴" logger.info("=" * 60) logger.info(f"{pnl_icon} [{config['name_en']}] CLOSE {details['side']}") logger.info(f" Entry: ${details['entry_price']:.2f}") logger.info(f" Exit: ${details['exit_price']:.2f}") logger.info(f" PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)") logger.info(f" Reason: {details['reason']}") logger.info(f" New Equity: ${details.get('new_equity', 0):.2f}") logger.info("=" * 60) elif action == 'REVERSE': logger.info("=" * 60) logger.info(f"🔄 [{config['name_en']}] REVERSE POSITION") if 'close' in details: logger.info(f" Closed: PnL ${details['close'].get('pnl', 0):.2f}") if 'open' in details: logger.info(f" Opened: {details['open']['side']} @ ${details['open']['entry_price']:.2f}") logger.info("=" * 60) if self.on_trade_callback: self.on_trade_callback({'timeframe': tf.value, 'action': action, 'details': details}) def _on_position_closed(self, tf: TimeFrame, close_result: Dict[str, Any]): """持仓被平仓回调(止盈止损)""" config = TIMEFRAME_CONFIG[tf] pnl = close_result.get('pnl', 0) pnl_icon = "🟢" if pnl > 0 else "🔴" reason = close_result.get('reason', '') reason_icon = "🎯" if reason == 'TAKE_PROFIT' else "🛑" logger.info("=" * 60) logger.info(f"{reason_icon} [{config['name_en']}] {reason}") logger.info(f" {pnl_icon} PnL: ${pnl:.2f} ({close_result.get('pnl_pct', 0):.2f}%)") logger.info(f" Entry: ${close_result.get('entry_price', 0):.2f}") logger.info(f" Exit: ${close_result.get('exit_price', 0):.2f}") logger.info(f" New Equity: ${close_result.get('new_equity', 0):.2f}") logger.info("=" * 60) if self.on_trade_callback: self.on_trade_callback({ 'timeframe': tf.value, 'action': 'CLOSE', 'details': close_result, }) def _print_status(self): """打印当前状态""" status = self.trader.get_status(self.current_price) print("\n" + "=" * 80) print(f"📊 MULTI-TIMEFRAME TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 80) print(f"💵 Current Price: ${self.current_price:.2f}") print(f"💰 Total Equity: ${status['total_equity']:.2f} (Initial: ${status['total_initial_balance']:.2f})") print(f"📈 Total Return: {status['total_return']:.2f}%") print("-" * 80) for tf_value, tf_status in status['timeframes'].items(): name = tf_status['name_en'] equity = tf_status['equity'] return_pct = tf_status['return_pct'] leverage = tf_status['leverage'] stats = tf_status['stats'] return_icon = "🟢" if return_pct > 0 else "🔴" if return_pct < 0 else "⚪" print(f"\n📊 {name} ({leverage}x)") print(f" Equity: ${equity:.2f} | Return: {return_icon} {return_pct:+.2f}%") print(f" Trades: {stats['total_trades']} | Win Rate: {stats['win_rate']:.1f}% | PnL: ${stats['total_pnl']:.2f}") pos = tf_status.get('position') if pos: unrealized = pos.get('unrealized_pnl', 0) unrealized_pct = pos.get('unrealized_pnl_pct', 0) pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else "⚪" print(f" Position: {pos['side']} @ ${pos['entry_price']:.2f}") print(f" SL: ${pos['stop_loss']:.2f} | TP: ${pos['take_profit']:.2f}") print(f" {pnl_icon} Unrealized: ${unrealized:.2f} ({unrealized_pct:+.2f}%)") else: print(f" Position: FLAT") print("=" * 80 + "\n") def stop(self): """停止交易""" self.is_running = False logger.info("Stopping realtime trader...") def get_status(self) -> Dict[str, Any]: """获取状态""" return self.trader.get_status(self.current_price) async def main(): """主函数""" from dotenv import load_dotenv load_dotenv(Path(__file__).parent.parent / '.env') logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) trader = RealtimeTrader( symbol='btcusdt', initial_balance=10000.0, signal_check_interval=30, ) def signal_handler(sig, frame): logger.info("Received shutdown signal") trader.stop() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) print("\n" + "=" * 80) print("🚀 MULTI-TIMEFRAME REALTIME TRADING") print("=" * 80) print("Timeframes:") print(" 📈 Short-term (5m/15m/1h) - 5x leverage") print(" 📊 Medium-term (4h/1d) - 3x leverage") print(" 📉 Long-term (1d/1w) - 2x leverage") print("=" * 80) print("Press Ctrl+C to stop") print("=" * 80 + "\n") await trader.start() if __name__ == "__main__": asyncio.run(main())