""" Realtime Trading - 基于 WebSocket 实时数据的多周期交易 使用 Binance WebSocket 获取实时价格,结合信号进行多周期独立交易 支持多币种: BTC/USDT, ETH/USDT 等 每个币种每个周期独立: - 短周期 (5m/15m/1h) - 中周期 (4h/1d) - 长周期 (1d/1w) """ import asyncio import json import logging import signal from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional, Callable, List import websockets from .paper_trading import MultiTimeframePaperTrader, TimeFrame, TIMEFRAME_CONFIG from config.settings import settings logger = logging.getLogger(__name__) class RealtimeTrader: """实时多周期交易器 - 支持多币种""" def __init__( self, symbols: List[str] = None, initial_balance: float = 10000.0, signal_check_interval: int = 60, ): """ 初始化实时交易器 Args: symbols: 交易对列表,如 ['BTCUSDT', 'ETHUSDT'] initial_balance: 每个周期的初始资金 signal_check_interval: 信号检查间隔(秒) """ # 支持多币种 self.symbols = symbols or settings.symbols_list self.signal_check_interval = signal_check_interval # WebSocket URLs - 多币种 self.ws_streams = [f"{sym.lower()}@aggTrade" for sym in self.symbols] self.ws_url = f"wss://fstream.binance.com/stream?streams={'/'.join(self.ws_streams)}" # 多币种多周期交易器 self.trader = MultiTimeframePaperTrader( initial_balance=initial_balance, symbols=self.symbols ) # 状态 - 多币种价格 self.current_prices: Dict[str, float] = {sym: 0.0 for sym in self.symbols} self.last_signal_check = 0 self.is_running = False self.ws = None # 信号文件路径 self.signal_dir = Path(__file__).parent.parent / 'output' self.signal_file = self.signal_dir / 'latest_signal.json' # 向后兼容 # 回调函数 self.on_trade_callback: Optional[Callable] = None self.on_price_callback: Optional[Callable] = None logger.info(f"RealtimeTrader 初始化: {len(self.symbols)} 个币种") async def start(self): """启动实时交易""" self.is_running = True logger.info(f"Starting multi-symbol multi-timeframe realtime trader") logger.info(f"Symbols: {', '.join(self.symbols)}") logger.info(f"WebSocket URL: {self.ws_url}") logger.info(f"Signal check interval: {self.signal_check_interval}s") # 打印各币种各周期状态 for symbol in self.symbols: logger.info(f" [{symbol}]:") for tf in TimeFrame: config = TIMEFRAME_CONFIG[tf] account = self.trader.accounts[symbol][tf] equity = account.get_equity() logger.info(f" [{config['name_en']}] Equity: ${equity:.2f}, Leverage: {account.leverage}x") while self.is_running: try: await self._connect_and_trade() except Exception as e: logger.error(f"Connection error: {e}") if self.is_running: logger.info("Reconnecting in 5 seconds...") await asyncio.sleep(5) async def _connect_and_trade(self): """连接 WebSocket 并开始交易""" async with websockets.connect(self.ws_url) as ws: self.ws = ws logger.info("WebSocket connected") self._print_status() async for message in ws: if not self.is_running: break try: data = json.loads(message) await self._process_tick(data) except json.JSONDecodeError: continue except Exception as e: logger.error(f"Error processing tick: {e}") async def _process_tick(self, data: Dict[str, Any]): """处理每个 tick 数据""" # 多币种 WebSocket 格式: {"stream": "btcusdt@aggTrade", "data": {...}} stream = data.get('stream', '') tick_data = data.get('data', data) # 兼容单流和多流格式 # 从 stream 名称解析币种 symbol = None for sym in self.symbols: if sym.lower() in stream.lower(): symbol = sym break if not symbol: # 尝试从数据中获取 symbol = tick_data.get('s', '').upper() if symbol not in self.symbols: symbol = self.symbols[0] if self.symbols else None if not symbol: return price = float(tick_data.get('p', 0)) if price <= 0: return # 更新该币种价格 self.current_prices[symbol] = price if self.on_price_callback: self.on_price_callback(symbol, price) # 检查该币种各周期止盈止损 for tf in TimeFrame: account = self.trader.accounts[symbol][tf] if account.position and account.position.side != 'FLAT': close_result = self.trader._check_close_position(symbol, tf, price) if close_result: self._on_position_closed(symbol, tf, close_result) # 定期检查信号 (所有币种) now = asyncio.get_event_loop().time() if now - self.last_signal_check >= self.signal_check_interval: self.last_signal_check = now await self._check_and_execute_signal() async def _check_and_execute_signal(self): """检查信号并执行交易 - 所有币种""" # 加载所有币种信号 all_signals = self._load_all_signals() for symbol in self.symbols: signal_data = all_signals.get(symbol) if not signal_data: continue price = self.current_prices.get(symbol, 0) if price <= 0: continue results = self.trader.process_signal(signal_data, price, symbol=symbol) # 处理各周期结果 for tf_value, result in results.get('timeframes', {}).items(): if result['action'] in ['OPEN', 'CLOSE', 'REVERSE', 'ADD']: tf = TimeFrame(tf_value) self._on_trade_executed(symbol, tf, result) self._print_status() def _load_all_signals(self) -> Dict[str, Dict[str, Any]]: """加载所有币种的最新信号""" signals = {} # 尝试加载合并信号文件 signals_file = self.signal_dir / 'latest_signals.json' if signals_file.exists(): try: with open(signals_file, 'r') as f: data = json.load(f) if 'symbols' in data: signals = data['symbols'] except Exception as e: logger.error(f"Error loading signals file: {e}") # 加载各币种独立信号文件 for symbol in self.symbols: if symbol in signals: continue symbol_file = self.signal_dir / f'signal_{symbol.lower()}.json' if symbol_file.exists(): try: with open(symbol_file, 'r') as f: signals[symbol] = json.load(f) except Exception as e: logger.error(f"Error loading {symbol} signal: {e}") # 向后兼容: 使用旧格式的 latest_signal.json # 注意: 只分配给信号中指定的币种,或者根据价格推断币种 if self.signal_file.exists(): try: with open(self.signal_file, 'r') as f: data = json.load(f) # 尝试从信号中获取币种 signal_symbol = data.get('symbol') # 如果没有明确指定,尝试从价格推断 if not signal_symbol: signal_price = (data.get('aggregated_signal', {}) .get('levels', {}) .get('current_price', 0)) if signal_price: # BTC 价格通常 > $10,000,ETH 价格通常 < $10,000 if signal_price > 10000: signal_symbol = 'BTCUSDT' elif signal_price > 100: signal_symbol = 'ETHUSDT' # 只有当推断出的币种在我们的列表中,且还没有信号时才使用 if signal_symbol and signal_symbol in self.symbols and signal_symbol not in signals: signals[signal_symbol] = data logger.info(f"Loaded legacy signal for {signal_symbol}") except Exception as e: logger.error(f"Error loading legacy signal: {e}") return signals def _load_latest_signal(self) -> Optional[Dict[str, Any]]: """加载最新信号 (向后兼容)""" try: if not self.signal_file.exists(): return None with open(self.signal_file, 'r') as f: return json.load(f) except Exception as e: logger.error(f"Error loading signal: {e}") return None def _on_trade_executed(self, symbol: str, tf: TimeFrame, result: Dict[str, Any]): """交易执行回调""" config = TIMEFRAME_CONFIG[tf] action = result['action'] details = result.get('details', {}) unit = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol if action == 'OPEN': logger.info("=" * 60) logger.info(f"🟢 [{symbol}][{config['name_en']}] OPEN {details['side']}") logger.info(f" Entry: ${details['entry_price']:.2f}") logger.info(f" Size: {details['size']:.6f} {unit}") logger.info(f" Stop Loss: ${details['stop_loss']:.2f}") logger.info(f" Take Profit: ${details['take_profit']:.2f}") logger.info("=" * 60) elif action == 'ADD': logger.info("=" * 60) logger.info(f"📈 [{symbol}][{config['name_en']}] ADD {details['side']} [L{details['pyramid_level']}/{details['max_levels']}]") logger.info(f" Add Price: ${details['add_price']:.2f}") logger.info(f" Add Size: {details['add_size']:.6f} {unit}") logger.info(f" Total Size: {details['total_size']:.6f} {unit}") logger.info(f" Avg Entry: ${details['avg_entry_price']:.2f}") logger.info("=" * 60) elif action == 'CLOSE': pnl = details.get('pnl', 0) pnl_icon = "🟢" if pnl > 0 else "🔴" logger.info("=" * 60) logger.info(f"{pnl_icon} [{symbol}][{config['name_en']}] CLOSE {details['side']}") logger.info(f" Entry: ${details['entry_price']:.2f}") logger.info(f" Exit: ${details['exit_price']:.2f}") logger.info(f" PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)") logger.info(f" Reason: {details['reason']}") logger.info(f" New Equity: ${details.get('new_equity', 0):.2f}") logger.info("=" * 60) elif action == 'REVERSE': logger.info("=" * 60) logger.info(f"🔄 [{symbol}][{config['name_en']}] REVERSE POSITION") if 'close' in details: logger.info(f" Closed: PnL ${details['close'].get('pnl', 0):.2f}") if 'open' in details: logger.info(f" Opened: {details['open']['side']} @ ${details['open']['entry_price']:.2f}") logger.info("=" * 60) if self.on_trade_callback: self.on_trade_callback({'symbol': symbol, 'timeframe': tf.value, 'action': action, 'details': details}) def _on_position_closed(self, symbol: str, tf: TimeFrame, close_result: Dict[str, Any]): """持仓被平仓回调(止盈止损)""" config = TIMEFRAME_CONFIG[tf] pnl = close_result.get('pnl', 0) pnl_icon = "🟢" if pnl > 0 else "🔴" reason = close_result.get('reason', '') reason_icon = "🎯" if reason == 'TAKE_PROFIT' else "🛑" logger.info("=" * 60) logger.info(f"{reason_icon} [{symbol}][{config['name_en']}] {reason}") logger.info(f" {pnl_icon} PnL: ${pnl:.2f} ({close_result.get('pnl_pct', 0):.2f}%)") logger.info(f" Entry: ${close_result.get('entry_price', 0):.2f}") logger.info(f" Exit: ${close_result.get('exit_price', 0):.2f}") logger.info(f" New Equity: ${close_result.get('new_equity', 0):.2f}") logger.info("=" * 60) if self.on_trade_callback: self.on_trade_callback({ 'symbol': symbol, 'timeframe': tf.value, 'action': 'CLOSE', 'details': close_result, }) def _print_status(self): """打印当前状态""" status = self.trader.get_status(prices=self.current_prices) print("\n" + "=" * 80) print(f"📊 MULTI-SYMBOL MULTI-TIMEFRAME TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 80) # 打印各币种价格 prices_str = " | ".join([f"{sym}: ${price:,.2f}" for sym, price in self.current_prices.items() if price > 0]) print(f"💵 Prices: {prices_str}") print(f"💰 Grand Total Equity: ${status['total_equity']:.2f} (Initial: ${status['total_initial_balance']:.2f})") print(f"📈 Grand Total Return: {status['total_return']:.2f}%") print("-" * 80) # 按币种打印 for symbol in self.symbols: symbol_status = status.get('symbols', {}).get(symbol, {}) if not symbol_status: continue unit = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol sym_equity = symbol_status.get('total_equity', 0) sym_return = symbol_status.get('total_return', 0) return_icon = "🟢" if sym_return > 0 else "🔴" if sym_return < 0 else "⚪" print(f"\n🪙 {symbol} - Equity: ${sym_equity:.2f} | Return: {return_icon} {sym_return:+.2f}%") for tf_value, tf_status in symbol_status.get('timeframes', {}).items(): name = tf_status['name_en'] equity = tf_status['equity'] return_pct = tf_status['return_pct'] leverage = tf_status['leverage'] stats = tf_status['stats'] return_icon = "🟢" if return_pct > 0 else "🔴" if return_pct < 0 else "⚪" print(f" 📊 {name} ({leverage}x)") print(f" Equity: ${equity:.2f} | Return: {return_icon} {return_pct:+.2f}%") print(f" Trades: {stats['total_trades']} | Win Rate: {stats['win_rate']:.1f}% | PnL: ${stats['total_pnl']:.2f}") pos = tf_status.get('position') if pos: unrealized = pos.get('unrealized_pnl', 0) unrealized_pct = pos.get('unrealized_pnl_pct', 0) pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else "⚪" print(f" Position: {pos['side']} @ ${pos['entry_price']:.2f}") print(f" SL: ${pos['stop_loss']:.2f} | TP: ${pos['take_profit']:.2f}") print(f" {pnl_icon} Unrealized: ${unrealized:.2f} ({unrealized_pct:+.2f}%)") else: print(f" Position: FLAT") print("=" * 80 + "\n") def stop(self): """停止交易""" self.is_running = False logger.info("Stopping realtime trader...") def get_status(self) -> Dict[str, Any]: """获取状态""" return self.trader.get_status(prices=self.current_prices) async def main(): """主函数""" from dotenv import load_dotenv import os load_dotenv(Path(__file__).parent.parent / '.env') logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) trader = RealtimeTrader( symbols=settings.symbols_list, initial_balance=10000.0, signal_check_interval=30, ) def signal_handler(sig, frame): logger.info("Received shutdown signal") trader.stop() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) print("\n" + "=" * 80) print("🚀 MULTI-SYMBOL MULTI-TIMEFRAME REALTIME TRADING") print("=" * 80) print(f"Symbols: {', '.join(settings.symbols_list)}") print("Timeframes:") print(" 📈 Short-term (5m/15m/1h) - 10x leverage") print(" 📊 Medium-term (4h/1d) - 10x leverage") print(" 📉 Long-term (1d/1w) - 10x leverage") print(f"Initial Balance: $10,000 per timeframe per symbol") print(f"Total Initial: ${10000 * 3 * len(settings.symbols_list):,}") print("=" * 80) print("Press Ctrl+C to stop") print("=" * 80 + "\n") await trader.start() if __name__ == "__main__": asyncio.run(main())