From 151c9cb6fd26ac2f6429e6178c174c7ddb7ab11a Mon Sep 17 00:00:00 2001 From: aaron <> Date: Tue, 9 Dec 2025 23:35:46 +0800 Subject: [PATCH] 1 --- analysis/volatility_monitor.py | 332 +++++++++++++++++++++++++++++++++ scheduler.py | 148 ++++++++++++--- trading/realtime_trader.py | 270 +++++++++++++++++++-------- 3 files changed, 652 insertions(+), 98 deletions(-) create mode 100644 analysis/volatility_monitor.py diff --git a/analysis/volatility_monitor.py b/analysis/volatility_monitor.py new file mode 100644 index 0000000..312f8f3 --- /dev/null +++ b/analysis/volatility_monitor.py @@ -0,0 +1,332 @@ +""" +Volatility Monitor - 实时监控价格波动率 + +当波动率突然升高时触发分析,防止漏掉突发行情 +""" +import asyncio +import logging +import json +import urllib.request +import ssl +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Callable, Any +from dataclasses import dataclass, field +from collections import deque + +logger = logging.getLogger(__name__) + + +@dataclass +class PricePoint: + """价格数据点""" + timestamp: datetime + price: float + volume: float = 0.0 + + +@dataclass +class VolatilityState: + """单个币种的波动率状态""" + symbol: str + prices: deque = field(default_factory=lambda: deque(maxlen=120)) # 保存最近120个数据点 + last_trigger_time: datetime = None + baseline_volatility: float = 0.0 # 基准波动率 + current_volatility: float = 0.0 # 当前波动率 + + def add_price(self, price: float, volume: float = 0.0): + """添加价格点""" + self.prices.append(PricePoint( + timestamp=datetime.now(), + price=price, + volume=volume + )) + + def get_prices_in_window(self, minutes: int) -> List[float]: + """获取指定时间窗口内的价格""" + cutoff = datetime.now() - timedelta(minutes=minutes) + return [p.price for p in self.prices if p.timestamp >= cutoff] + + +class VolatilityMonitor: + """ + 波动率监控器 + + 监控策略: + 1. 价格变化率: 短期价格变化超过阈值 + 2. 波动率突增: 当前波动率 vs 基准波动率 + 3. 连续大幅波动: 连续多个周期出现大幅波动 + """ + + # Binance API + BINANCE_PRICE_URL = "https://fapi.binance.com/fapi/v1/ticker/price" + BINANCE_TICKER_URL = "https://fapi.binance.com/fapi/v1/ticker/24hr" + + def __init__( + self, + symbols: List[str], + on_volatility_spike: Callable[[str, Dict], Any] = None, + check_interval: int = 5, # 检查间隔(秒) + price_change_threshold: float = 0.5, # 价格变化阈值 0.5% + volatility_multiplier: float = 2.0, # 波动率突增倍数 + cooldown_minutes: int = 3, # 触发后冷却时间(分钟) + ): + """ + Args: + symbols: 监控的交易对列表 + on_volatility_spike: 波动率突增时的回调函数 + check_interval: 检查间隔(秒) + price_change_threshold: 价格变化阈值(百分比) + volatility_multiplier: 波动率突增倍数阈值 + cooldown_minutes: 触发后冷却时间(分钟) + """ + self.symbols = symbols + self.on_volatility_spike = on_volatility_spike + self.check_interval = check_interval + self.price_change_threshold = price_change_threshold + self.volatility_multiplier = volatility_multiplier + self.cooldown_minutes = cooldown_minutes + + # 每个币种的状态 + self.states: Dict[str, VolatilityState] = { + symbol: VolatilityState(symbol=symbol) for symbol in symbols + } + + self.is_running = False + self._ssl_context = ssl.create_default_context() + + logger.info( + f"VolatilityMonitor 初始化: {len(symbols)} 个币种, " + f"检查间隔 {check_interval}s, 价格阈值 {price_change_threshold}%, " + f"波动率倍数 {volatility_multiplier}x, 冷却时间 {cooldown_minutes}min" + ) + + async def start(self): + """启动监控""" + self.is_running = True + logger.info("VolatilityMonitor 启动") + + # 先获取初始价格建立基准 + await self._initialize_baseline() + + while self.is_running: + try: + await self._check_volatility() + await asyncio.sleep(self.check_interval) + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"波动率检查错误: {e}", exc_info=True) + await asyncio.sleep(self.check_interval) + + logger.info("VolatilityMonitor 已停止") + + def stop(self): + """停止监控""" + self.is_running = False + + async def _initialize_baseline(self): + """初始化基准波动率""" + logger.info("正在建立波动率基准...") + + # 获取24小时统计数据来估算基准波动率 + for symbol in self.symbols: + try: + ticker = await self._fetch_24h_ticker(symbol) + if ticker: + # 使用24小时价格变化百分比作为基准 + price_change_pct = abs(float(ticker.get('priceChangePercent', 0))) + # 转换为每分钟的预期波动率 + self.states[symbol].baseline_volatility = price_change_pct / (24 * 60) * 5 # 5分钟窗口 + logger.info( + f"[{symbol}] 24h变化: {price_change_pct:.2f}%, " + f"基准波动率: {self.states[symbol].baseline_volatility:.4f}%/5min" + ) + except Exception as e: + logger.error(f"[{symbol}] 获取基准数据失败: {e}") + self.states[symbol].baseline_volatility = 0.1 # 默认值 + + # 收集一些初始价格数据 + for _ in range(6): # 收集30秒的数据 + prices = await self._fetch_prices() + for symbol, price in prices.items(): + if symbol in self.states: + self.states[symbol].add_price(price) + await asyncio.sleep(5) + + logger.info("波动率基准建立完成") + + async def _check_volatility(self): + """检查所有币种的波动率""" + prices = await self._fetch_prices() + + for symbol, price in prices.items(): + if symbol not in self.states: + continue + + state = self.states[symbol] + state.add_price(price) + + # 检查是否在冷却期 + if state.last_trigger_time: + elapsed = (datetime.now() - state.last_trigger_time).total_seconds() / 60 + if elapsed < self.cooldown_minutes: + continue + + # 检查波动率 + spike_info = self._detect_volatility_spike(state) + if spike_info: + logger.warning( + f"[{symbol}] 波动率突增! " + f"类型: {spike_info['type']}, " + f"变化: {spike_info['change_pct']:.2f}%, " + f"当前价: ${price:,.2f}" + ) + state.last_trigger_time = datetime.now() + + # 触发回调 + if self.on_volatility_spike: + try: + result = self.on_volatility_spike(symbol, spike_info) + if asyncio.iscoroutine(result): + await result + except Exception as e: + logger.error(f"[{symbol}] 波动率回调执行失败: {e}") + + def _detect_volatility_spike(self, state: VolatilityState) -> Optional[Dict]: + """ + 检测波动率突增 + + 返回: + 触发信息字典,如果未触发返回 None + """ + if len(state.prices) < 10: + return None + + prices = [p.price for p in state.prices] + current_price = prices[-1] + + # 1. 检查短期价格变化 (1分钟) + prices_1min = state.get_prices_in_window(1) + if len(prices_1min) >= 2: + min_price = min(prices_1min) + max_price = max(prices_1min) + change_1min = (max_price - min_price) / min_price * 100 + + if change_1min >= self.price_change_threshold: + direction = "UP" if current_price >= prices_1min[0] else "DOWN" + return { + 'type': 'rapid_move', + 'timeframe': '1min', + 'change_pct': change_1min, + 'direction': direction, + 'current_price': current_price, + 'trigger_time': datetime.now().isoformat(), + } + + # 2. 检查中期价格变化 (5分钟) + prices_5min = state.get_prices_in_window(5) + if len(prices_5min) >= 10: + min_price = min(prices_5min) + max_price = max(prices_5min) + change_5min = (max_price - min_price) / min_price * 100 + + # 5分钟窗口使用更高的阈值 + if change_5min >= self.price_change_threshold * 2: + direction = "UP" if current_price >= prices_5min[0] else "DOWN" + return { + 'type': 'sustained_move', + 'timeframe': '5min', + 'change_pct': change_5min, + 'direction': direction, + 'current_price': current_price, + 'trigger_time': datetime.now().isoformat(), + } + + # 3. 检查波动率突增 + if len(prices_5min) >= 10 and state.baseline_volatility > 0: + # 计算当前波动率 (标准差 / 均值 * 100) + import statistics + mean_price = statistics.mean(prices_5min) + std_price = statistics.stdev(prices_5min) + current_volatility = (std_price / mean_price) * 100 + + state.current_volatility = current_volatility + + if current_volatility >= state.baseline_volatility * self.volatility_multiplier: + return { + 'type': 'volatility_spike', + 'timeframe': '5min', + 'change_pct': current_volatility, + 'baseline_pct': state.baseline_volatility, + 'multiplier': current_volatility / state.baseline_volatility, + 'current_price': current_price, + 'trigger_time': datetime.now().isoformat(), + } + + return None + + async def _fetch_prices(self) -> Dict[str, float]: + """获取所有币种的当前价格""" + prices = {} + loop = asyncio.get_event_loop() + + try: + for symbol in self.symbols: + price = await loop.run_in_executor(None, self._fetch_price_sync, symbol) + if price: + prices[symbol] = price + except Exception as e: + logger.error(f"获取价格失败: {e}") + + return prices + + def _fetch_price_sync(self, symbol: str) -> Optional[float]: + """同步获取单个币种价格""" + try: + url = f"{self.BINANCE_PRICE_URL}?symbol={symbol}" + req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) + with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response: + data = json.loads(response.read().decode('utf-8')) + return float(data['price']) + except Exception as e: + logger.debug(f"获取 {symbol} 价格失败: {e}") + return None + + async def _fetch_24h_ticker(self, symbol: str) -> Optional[Dict]: + """获取24小时统计数据""" + loop = asyncio.get_event_loop() + try: + return await loop.run_in_executor(None, self._fetch_24h_ticker_sync, symbol) + except Exception as e: + logger.error(f"获取 {symbol} 24h数据失败: {e}") + return None + + def _fetch_24h_ticker_sync(self, symbol: str) -> Optional[Dict]: + """同步获取24小时统计""" + try: + url = f"{self.BINANCE_TICKER_URL}?symbol={symbol}" + req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) + with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response: + return json.loads(response.read().decode('utf-8')) + except Exception as e: + logger.debug(f"获取 {symbol} 24h数据失败: {e}") + return None + + def get_status(self) -> Dict[str, Any]: + """获取监控状态""" + status = { + 'is_running': self.is_running, + 'symbols': {}, + } + + for symbol, state in self.states.items(): + prices = [p.price for p in state.prices] + status['symbols'][symbol] = { + 'price_count': len(prices), + 'current_price': prices[-1] if prices else None, + 'baseline_volatility': state.baseline_volatility, + 'current_volatility': state.current_volatility, + 'last_trigger': state.last_trigger_time.isoformat() if state.last_trigger_time else None, + } + + return status diff --git a/scheduler.py b/scheduler.py index c3ab3f2..6bf8b46 100644 --- a/scheduler.py +++ b/scheduler.py @@ -3,6 +3,10 @@ Signal Generation Scheduler - 定时生成交易信号 每隔指定时间间隔自动运行量化分析和LLM决策 支持多币种: BTC/USDT, ETH/USDT 等 + +特性: +- 定时分析: 每隔指定时间自动分析 +- 波动率触发: 当价格波动率突增时自动触发分析 """ import asyncio import logging @@ -19,6 +23,7 @@ sys.path.insert(0, str(Path(__file__).parent)) from config.settings import settings from analysis.engine import MarketAnalysisEngine +from analysis.volatility_monitor import VolatilityMonitor from signals.quantitative import QuantitativeSignalGenerator from signals.llm_decision import LLMDecisionMaker from signals.aggregator import SignalAggregator @@ -33,16 +38,27 @@ logger = logging.getLogger(__name__) class SignalScheduler: - """定时信号生成调度器 - 支持多币种""" + """定时信号生成调度器 - 支持多币种 + 波动率触发""" - def __init__(self, interval_minutes: int = 5, symbols: List[str] = None): + def __init__( + self, + interval_minutes: int = 5, + symbols: List[str] = None, + enable_volatility_trigger: bool = True, + volatility_threshold: float = 0.5, # 波动率阈值 0.5% + volatility_cooldown: int = 3, # 波动率触发冷却时间(分钟) + ): """ Args: interval_minutes: 生成信号的时间间隔(分钟) symbols: 交易对列表,如 ['BTCUSDT', 'ETHUSDT'] + enable_volatility_trigger: 是否启用波动率触发 + volatility_threshold: 波动率触发阈值(百分比) + volatility_cooldown: 波动率触发后的冷却时间(分钟) """ self.interval_minutes = interval_minutes self.is_running = False + self.enable_volatility_trigger = enable_volatility_trigger # 支持多币种 self.symbols = symbols or settings.symbols_list @@ -66,7 +82,26 @@ class SignalScheduler: enabled=bool(dingtalk_webhook) ) - logger.info(f"Signal Scheduler 初始化完成 - 每{interval_minutes}分钟生成一次信号") + # 波动率监控器 + self.volatility_monitor = None + if enable_volatility_trigger: + self.volatility_monitor = VolatilityMonitor( + symbols=self.symbols, + on_volatility_spike=self._on_volatility_spike, + check_interval=5, # 每5秒检查一次 + price_change_threshold=volatility_threshold, + volatility_multiplier=2.0, + cooldown_minutes=volatility_cooldown, + ) + + # 用于追踪触发的分析任务 + self._pending_analysis: Dict[str, bool] = {} + self._analysis_lock = asyncio.Lock() + + logger.info( + f"Signal Scheduler 初始化完成 - 定时: 每{interval_minutes}分钟, " + f"波动率触发: {'启用' if enable_volatility_trigger else '禁用'}" + ) async def generate_signal_for_symbol(self, symbol: str) -> Dict: """为单个币种生成信号""" @@ -207,29 +242,84 @@ class SignalScheduler: except Exception as e: logger.error(f"[{symbol}] 钉钉通知发送异常: {e}", exc_info=True) + async def _on_volatility_spike(self, symbol: str, spike_info: Dict): + """波动率突增时的回调处理""" + logger.info( + f"🚨 [{symbol}] 波动率触发! " + f"类型: {spike_info['type']}, " + f"变化: {spike_info['change_pct']:.2f}%, " + f"方向: {spike_info.get('direction', 'N/A')}" + ) + + # 避免重复触发 + async with self._analysis_lock: + if self._pending_analysis.get(symbol): + logger.info(f"[{symbol}] 已有分析任务在执行,跳过") + return + self._pending_analysis[symbol] = True + + try: + # 为该币种生成信号 + logger.info(f"🔄 [{symbol}] 波动率触发分析开始...") + result = await self.generate_signal_for_symbol(symbol) + + if result: + # 保存独立信号文件(已在 generate_signal_for_symbol 中完成) + # 发送通知(标记为波动率触发) + aggregated = result.get('aggregated_signal', {}) + aggregated['trigger_type'] = 'volatility_spike' + aggregated['spike_info'] = spike_info + await self._send_notification(symbol, result) + + logger.info(f"✅ [{symbol}] 波动率触发分析完成") + else: + logger.warning(f"[{symbol}] 波动率触发分析失败") + + finally: + async with self._analysis_lock: + self._pending_analysis[symbol] = False + async def run(self): """启动调度器主循环""" self.is_running = True logger.info(f"Signal Scheduler 启动 - 每{self.interval_minutes}分钟生成信号") - # 立即生成一次 - await self.generate_signal_once() + # 启动波动率监控器(如果启用) + volatility_task = None + if self.volatility_monitor: + volatility_task = asyncio.create_task(self.volatility_monitor.start()) + logger.info("波动率监控器已启动") - # 定时循环 - while self.is_running: - try: - # 等待指定时间间隔 - await asyncio.sleep(self.interval_minutes * 60) + try: + # 立即生成一次 + await self.generate_signal_once() - # 生成信号 - await self.generate_signal_once() + # 定时循环 + while self.is_running: + try: + # 等待指定时间间隔 + await asyncio.sleep(self.interval_minutes * 60) - except asyncio.CancelledError: - logger.info("调度器收到取消信号") - break - except Exception as e: - logger.error(f"调度器错误: {e}", exc_info=True) - await asyncio.sleep(60) # 错误后等待1分钟再继续 + # 生成信号 + await self.generate_signal_once() + + except asyncio.CancelledError: + logger.info("调度器收到取消信号") + break + except Exception as e: + logger.error(f"调度器错误: {e}", exc_info=True) + await asyncio.sleep(60) # 错误后等待1分钟再继续 + + finally: + # 停止波动率监控器 + if self.volatility_monitor: + self.volatility_monitor.stop() + if volatility_task: + volatility_task.cancel() + try: + await volatility_task + except asyncio.CancelledError: + pass logger.info("Signal Scheduler 已停止") @@ -240,11 +330,27 @@ class SignalScheduler: async def main(): """主入口""" - # 从环境变量或默认值获取间隔 - import os + # 从环境变量获取配置 interval = int(os.getenv('SIGNAL_INTERVAL_MINUTES', '5')) + enable_volatility = os.getenv('ENABLE_VOLATILITY_TRIGGER', 'true').lower() == 'true' + volatility_threshold = float(os.getenv('VOLATILITY_THRESHOLD', '0.5')) + volatility_cooldown = int(os.getenv('VOLATILITY_COOLDOWN_MINUTES', '3')) - scheduler = SignalScheduler(interval_minutes=interval) + logger.info("=" * 60) + logger.info("Signal Scheduler 配置:") + logger.info(f" 定时间隔: {interval} 分钟") + logger.info(f" 波动率触发: {'启用' if enable_volatility else '禁用'}") + if enable_volatility: + logger.info(f" 波动率阈值: {volatility_threshold}%") + logger.info(f" 触发冷却: {volatility_cooldown} 分钟") + logger.info("=" * 60) + + scheduler = SignalScheduler( + interval_minutes=interval, + enable_volatility_trigger=enable_volatility, + volatility_threshold=volatility_threshold, + volatility_cooldown=volatility_cooldown, + ) # Setup signal handlers for graceful shutdown def signal_handler(sig, _frame): diff --git a/trading/realtime_trader.py b/trading/realtime_trader.py index 89c4121..dc70b56 100644 --- a/trading/realtime_trader.py +++ b/trading/realtime_trader.py @@ -2,6 +2,9 @@ Realtime Trading - 基于 WebSocket 实时数据的多周期交易 使用 Binance WebSocket 获取实时价格,结合信号进行多周期独立交易 +支持多币种: BTC/USDT, ETH/USDT 等 + +每个币种每个周期独立: - 短周期 (5m/15m/1h) - 中周期 (4h/1d) - 长周期 (1d/1w) @@ -12,21 +15,22 @@ import logging import signal from datetime import datetime from pathlib import Path -from typing import Dict, Any, Optional, Callable +from typing import Dict, Any, Optional, Callable, List import websockets from .paper_trading import MultiTimeframePaperTrader, TimeFrame, TIMEFRAME_CONFIG +from config.settings import settings logger = logging.getLogger(__name__) class RealtimeTrader: - """实时多周期交易器""" + """实时多周期交易器 - 支持多币种""" def __init__( self, - symbol: str = "btcusdt", + symbols: List[str] = None, initial_balance: float = 10000.0, signal_check_interval: int = 60, ): @@ -34,44 +38,56 @@ class RealtimeTrader: 初始化实时交易器 Args: - symbol: 交易对 (小写) - initial_balance: 初始资金 (分配给三个周期) + symbols: 交易对列表,如 ['BTCUSDT', 'ETHUSDT'] + initial_balance: 每个周期的初始资金 signal_check_interval: 信号检查间隔(秒) """ - self.symbol = symbol.lower() + # 支持多币种 + self.symbols = symbols or settings.symbols_list self.signal_check_interval = signal_check_interval - # WebSocket URL - self.ws_url = f"wss://fstream.binance.com/ws/{self.symbol}@aggTrade" + # WebSocket URLs - 多币种 + self.ws_streams = [f"{sym.lower()}@aggTrade" for sym in self.symbols] + self.ws_url = f"wss://fstream.binance.com/stream?streams={'/'.join(self.ws_streams)}" - # 多周期交易器 - self.trader = MultiTimeframePaperTrader(initial_balance=initial_balance) + # 多币种多周期交易器 + self.trader = MultiTimeframePaperTrader( + initial_balance=initial_balance, + symbols=self.symbols + ) - # 状态 - self.current_price = 0.0 + # 状态 - 多币种价格 + self.current_prices: Dict[str, float] = {sym: 0.0 for sym in self.symbols} self.last_signal_check = 0 self.is_running = False self.ws = None # 信号文件路径 - self.signal_file = Path(__file__).parent.parent / 'output' / 'latest_signal.json' + self.signal_dir = Path(__file__).parent.parent / 'output' + self.signal_file = self.signal_dir / 'latest_signal.json' # 向后兼容 # 回调函数 self.on_trade_callback: Optional[Callable] = None self.on_price_callback: Optional[Callable] = None + logger.info(f"RealtimeTrader 初始化: {len(self.symbols)} 个币种") + async def start(self): """启动实时交易""" self.is_running = True - logger.info(f"Starting multi-timeframe realtime trader for {self.symbol.upper()}") + logger.info(f"Starting multi-symbol multi-timeframe realtime trader") + logger.info(f"Symbols: {', '.join(self.symbols)}") logger.info(f"WebSocket URL: {self.ws_url}") logger.info(f"Signal check interval: {self.signal_check_interval}s") - for tf in TimeFrame: - config = TIMEFRAME_CONFIG[tf] - account = self.trader.accounts[tf] - equity = account.get_equity() - logger.info(f" [{config['name_en']}] Equity: ${equity:.2f}, Leverage: {account.leverage}x") + # 打印各币种各周期状态 + for symbol in self.symbols: + logger.info(f" [{symbol}]:") + for tf in TimeFrame: + config = TIMEFRAME_CONFIG[tf] + account = self.trader.accounts[symbol][tf] + equity = account.get_equity() + logger.info(f" [{config['name_en']}] Equity: ${equity:.2f}, Leverage: {account.leverage}x") while self.is_running: try: @@ -104,47 +120,116 @@ class RealtimeTrader: async def _process_tick(self, data: Dict[str, Any]): """处理每个 tick 数据""" - self.current_price = float(data.get('p', 0)) + # 多币种 WebSocket 格式: {"stream": "btcusdt@aggTrade", "data": {...}} + stream = data.get('stream', '') + tick_data = data.get('data', data) # 兼容单流和多流格式 - if self.current_price <= 0: + # 从 stream 名称解析币种 + symbol = None + for sym in self.symbols: + if sym.lower() in stream.lower(): + symbol = sym + break + + if not symbol: + # 尝试从数据中获取 + symbol = tick_data.get('s', '').upper() + if symbol not in self.symbols: + symbol = self.symbols[0] if self.symbols else None + + if not symbol: return + price = float(tick_data.get('p', 0)) + if price <= 0: + return + + # 更新该币种价格 + self.current_prices[symbol] = price + if self.on_price_callback: - self.on_price_callback(self.current_price) + self.on_price_callback(symbol, price) - # 检查各周期止盈止损 + # 检查该币种各周期止盈止损 for tf in TimeFrame: - account = self.trader.accounts[tf] + account = self.trader.accounts[symbol][tf] if account.position and account.position.side != 'FLAT': - close_result = self.trader._check_close_position(tf, self.current_price) + close_result = self.trader._check_close_position(symbol, tf, price) if close_result: - self._on_position_closed(tf, close_result) + self._on_position_closed(symbol, tf, close_result) - # 定期检查信号 + # 定期检查信号 (所有币种) now = asyncio.get_event_loop().time() if now - self.last_signal_check >= self.signal_check_interval: self.last_signal_check = now await self._check_and_execute_signal() async def _check_and_execute_signal(self): - """检查信号并执行交易""" - signal_data = self._load_latest_signal() + """检查信号并执行交易 - 所有币种""" + # 加载所有币种信号 + all_signals = self._load_all_signals() - if not signal_data: - return + for symbol in self.symbols: + signal_data = all_signals.get(symbol) + if not signal_data: + continue - results = self.trader.process_signal(signal_data, self.current_price) + price = self.current_prices.get(symbol, 0) + if price <= 0: + continue - # 处理各周期结果 - for tf_value, result in results.get('timeframes', {}).items(): - if result['action'] in ['OPEN', 'CLOSE', 'REVERSE']: - tf = TimeFrame(tf_value) - self._on_trade_executed(tf, result) + results = self.trader.process_signal(signal_data, price, symbol=symbol) + + # 处理各周期结果 + for tf_value, result in results.get('timeframes', {}).items(): + if result['action'] in ['OPEN', 'CLOSE', 'REVERSE', 'ADD']: + tf = TimeFrame(tf_value) + self._on_trade_executed(symbol, tf, result) self._print_status() + def _load_all_signals(self) -> Dict[str, Dict[str, Any]]: + """加载所有币种的最新信号""" + signals = {} + + # 尝试加载合并信号文件 + signals_file = self.signal_dir / 'latest_signals.json' + if signals_file.exists(): + try: + with open(signals_file, 'r') as f: + data = json.load(f) + if 'symbols' in data: + signals = data['symbols'] + except Exception as e: + logger.error(f"Error loading signals file: {e}") + + # 加载各币种独立信号文件 + for symbol in self.symbols: + if symbol in signals: + continue + + symbol_file = self.signal_dir / f'signal_{symbol.lower()}.json' + if symbol_file.exists(): + try: + with open(symbol_file, 'r') as f: + signals[symbol] = json.load(f) + except Exception as e: + logger.error(f"Error loading {symbol} signal: {e}") + + # 向后兼容: 使用旧格式的 latest_signal.json + if not signals and self.signal_file.exists(): + try: + with open(self.signal_file, 'r') as f: + data = json.load(f) + first_symbol = self.symbols[0] if self.symbols else 'BTCUSDT' + signals[first_symbol] = data + except Exception as e: + logger.error(f"Error loading legacy signal: {e}") + + return signals + def _load_latest_signal(self) -> Optional[Dict[str, Any]]: - """加载最新信号""" + """加载最新信号 (向后兼容)""" try: if not self.signal_file.exists(): return None @@ -155,26 +240,36 @@ class RealtimeTrader: logger.error(f"Error loading signal: {e}") return None - def _on_trade_executed(self, tf: TimeFrame, result: Dict[str, Any]): + def _on_trade_executed(self, symbol: str, tf: TimeFrame, result: Dict[str, Any]): """交易执行回调""" config = TIMEFRAME_CONFIG[tf] action = result['action'] details = result.get('details', {}) + unit = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol if action == 'OPEN': logger.info("=" * 60) - logger.info(f"🟢 [{config['name_en']}] OPEN {details['side']}") + logger.info(f"🟢 [{symbol}][{config['name_en']}] OPEN {details['side']}") logger.info(f" Entry: ${details['entry_price']:.2f}") - logger.info(f" Size: {details['size']:.6f} BTC") + logger.info(f" Size: {details['size']:.6f} {unit}") logger.info(f" Stop Loss: ${details['stop_loss']:.2f}") logger.info(f" Take Profit: ${details['take_profit']:.2f}") logger.info("=" * 60) + elif action == 'ADD': + logger.info("=" * 60) + logger.info(f"📈 [{symbol}][{config['name_en']}] ADD {details['side']} [L{details['pyramid_level']}/{details['max_levels']}]") + logger.info(f" Add Price: ${details['add_price']:.2f}") + logger.info(f" Add Size: {details['add_size']:.6f} {unit}") + logger.info(f" Total Size: {details['total_size']:.6f} {unit}") + logger.info(f" Avg Entry: ${details['avg_entry_price']:.2f}") + logger.info("=" * 60) + elif action == 'CLOSE': pnl = details.get('pnl', 0) pnl_icon = "🟢" if pnl > 0 else "🔴" logger.info("=" * 60) - logger.info(f"{pnl_icon} [{config['name_en']}] CLOSE {details['side']}") + logger.info(f"{pnl_icon} [{symbol}][{config['name_en']}] CLOSE {details['side']}") logger.info(f" Entry: ${details['entry_price']:.2f}") logger.info(f" Exit: ${details['exit_price']:.2f}") logger.info(f" PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)") @@ -184,7 +279,7 @@ class RealtimeTrader: elif action == 'REVERSE': logger.info("=" * 60) - logger.info(f"🔄 [{config['name_en']}] REVERSE POSITION") + logger.info(f"🔄 [{symbol}][{config['name_en']}] REVERSE POSITION") if 'close' in details: logger.info(f" Closed: PnL ${details['close'].get('pnl', 0):.2f}") if 'open' in details: @@ -192,9 +287,9 @@ class RealtimeTrader: logger.info("=" * 60) if self.on_trade_callback: - self.on_trade_callback({'timeframe': tf.value, 'action': action, 'details': details}) + self.on_trade_callback({'symbol': symbol, 'timeframe': tf.value, 'action': action, 'details': details}) - def _on_position_closed(self, tf: TimeFrame, close_result: Dict[str, Any]): + def _on_position_closed(self, symbol: str, tf: TimeFrame, close_result: Dict[str, Any]): """持仓被平仓回调(止盈止损)""" config = TIMEFRAME_CONFIG[tf] pnl = close_result.get('pnl', 0) @@ -203,7 +298,7 @@ class RealtimeTrader: reason_icon = "🎯" if reason == 'TAKE_PROFIT' else "🛑" logger.info("=" * 60) - logger.info(f"{reason_icon} [{config['name_en']}] {reason}") + logger.info(f"{reason_icon} [{symbol}][{config['name_en']}] {reason}") logger.info(f" {pnl_icon} PnL: ${pnl:.2f} ({close_result.get('pnl_pct', 0):.2f}%)") logger.info(f" Entry: ${close_result.get('entry_price', 0):.2f}") logger.info(f" Exit: ${close_result.get('exit_price', 0):.2f}") @@ -212,6 +307,7 @@ class RealtimeTrader: if self.on_trade_callback: self.on_trade_callback({ + 'symbol': symbol, 'timeframe': tf.value, 'action': 'CLOSE', 'details': close_result, @@ -219,39 +315,55 @@ class RealtimeTrader: def _print_status(self): """打印当前状态""" - status = self.trader.get_status(self.current_price) + status = self.trader.get_status(prices=self.current_prices) print("\n" + "=" * 80) - print(f"📊 MULTI-TIMEFRAME TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"📊 MULTI-SYMBOL MULTI-TIMEFRAME TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 80) - print(f"💵 Current Price: ${self.current_price:.2f}") - print(f"💰 Total Equity: ${status['total_equity']:.2f} (Initial: ${status['total_initial_balance']:.2f})") - print(f"📈 Total Return: {status['total_return']:.2f}%") + + # 打印各币种价格 + prices_str = " | ".join([f"{sym}: ${price:,.2f}" for sym, price in self.current_prices.items() if price > 0]) + print(f"💵 Prices: {prices_str}") + print(f"💰 Grand Total Equity: ${status['total_equity']:.2f} (Initial: ${status['total_initial_balance']:.2f})") + print(f"📈 Grand Total Return: {status['total_return']:.2f}%") print("-" * 80) - for tf_value, tf_status in status['timeframes'].items(): - name = tf_status['name_en'] - equity = tf_status['equity'] - return_pct = tf_status['return_pct'] - leverage = tf_status['leverage'] - stats = tf_status['stats'] + # 按币种打印 + for symbol in self.symbols: + symbol_status = status.get('symbols', {}).get(symbol, {}) + if not symbol_status: + continue - return_icon = "🟢" if return_pct > 0 else "🔴" if return_pct < 0 else "⚪" + unit = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol + sym_equity = symbol_status.get('total_equity', 0) + sym_return = symbol_status.get('total_return', 0) + return_icon = "🟢" if sym_return > 0 else "🔴" if sym_return < 0 else "⚪" - print(f"\n📊 {name} ({leverage}x)") - print(f" Equity: ${equity:.2f} | Return: {return_icon} {return_pct:+.2f}%") - print(f" Trades: {stats['total_trades']} | Win Rate: {stats['win_rate']:.1f}% | PnL: ${stats['total_pnl']:.2f}") + print(f"\n🪙 {symbol} - Equity: ${sym_equity:.2f} | Return: {return_icon} {sym_return:+.2f}%") - pos = tf_status.get('position') - if pos: - unrealized = pos.get('unrealized_pnl', 0) - unrealized_pct = pos.get('unrealized_pnl_pct', 0) - pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else "⚪" - print(f" Position: {pos['side']} @ ${pos['entry_price']:.2f}") - print(f" SL: ${pos['stop_loss']:.2f} | TP: ${pos['take_profit']:.2f}") - print(f" {pnl_icon} Unrealized: ${unrealized:.2f} ({unrealized_pct:+.2f}%)") - else: - print(f" Position: FLAT") + for tf_value, tf_status in symbol_status.get('timeframes', {}).items(): + name = tf_status['name_en'] + equity = tf_status['equity'] + return_pct = tf_status['return_pct'] + leverage = tf_status['leverage'] + stats = tf_status['stats'] + + return_icon = "🟢" if return_pct > 0 else "🔴" if return_pct < 0 else "⚪" + + print(f" 📊 {name} ({leverage}x)") + print(f" Equity: ${equity:.2f} | Return: {return_icon} {return_pct:+.2f}%") + print(f" Trades: {stats['total_trades']} | Win Rate: {stats['win_rate']:.1f}% | PnL: ${stats['total_pnl']:.2f}") + + pos = tf_status.get('position') + if pos: + unrealized = pos.get('unrealized_pnl', 0) + unrealized_pct = pos.get('unrealized_pnl_pct', 0) + pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else "⚪" + print(f" Position: {pos['side']} @ ${pos['entry_price']:.2f}") + print(f" SL: ${pos['stop_loss']:.2f} | TP: ${pos['take_profit']:.2f}") + print(f" {pnl_icon} Unrealized: ${unrealized:.2f} ({unrealized_pct:+.2f}%)") + else: + print(f" Position: FLAT") print("=" * 80 + "\n") @@ -262,12 +374,13 @@ class RealtimeTrader: def get_status(self) -> Dict[str, Any]: """获取状态""" - return self.trader.get_status(self.current_price) + return self.trader.get_status(prices=self.current_prices) async def main(): """主函数""" from dotenv import load_dotenv + import os load_dotenv(Path(__file__).parent.parent / '.env') @@ -277,7 +390,7 @@ async def main(): ) trader = RealtimeTrader( - symbol='btcusdt', + symbols=settings.symbols_list, initial_balance=10000.0, signal_check_interval=30, ) @@ -290,12 +403,15 @@ async def main(): signal.signal(signal.SIGTERM, signal_handler) print("\n" + "=" * 80) - print("🚀 MULTI-TIMEFRAME REALTIME TRADING") + print("🚀 MULTI-SYMBOL MULTI-TIMEFRAME REALTIME TRADING") print("=" * 80) + print(f"Symbols: {', '.join(settings.symbols_list)}") print("Timeframes:") - print(" 📈 Short-term (5m/15m/1h) - 5x leverage") - print(" 📊 Medium-term (4h/1d) - 3x leverage") - print(" 📉 Long-term (1d/1w) - 2x leverage") + print(" 📈 Short-term (5m/15m/1h) - 10x leverage") + print(" 📊 Medium-term (4h/1d) - 10x leverage") + print(" 📉 Long-term (1d/1w) - 10x leverage") + print(f"Initial Balance: $10,000 per timeframe per symbol") + print(f"Total Initial: ${10000 * 3 * len(settings.symbols_list):,}") print("=" * 80) print("Press Ctrl+C to stop") print("=" * 80 + "\n")