From 54e33e7fff37d1e7979c11b503d5f5df261a7d80 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Tue, 9 Dec 2025 12:57:31 +0800 Subject: [PATCH] d --- output/paper_trading_state.json | 18 - trading/paper_trading.py | 955 ++++++++++++------------------- trading/realtime_trader.py | 232 +++----- web/api.py | 169 ++++-- web/static/index.html | 982 ++++++++++++-------------------- 5 files changed, 937 insertions(+), 1419 deletions(-) delete mode 100644 output/paper_trading_state.json diff --git a/output/paper_trading_state.json b/output/paper_trading_state.json deleted file mode 100644 index ce27d86..0000000 --- a/output/paper_trading_state.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "balance": 10000.0, - "position": null, - "trades": [], - "stats": { - "total_trades": 0, - "winning_trades": 0, - "losing_trades": 0, - "total_pnl": 0.0, - "max_drawdown": 0.0, - "peak_balance": 10000.0, - "win_rate": 0.0, - "avg_win": 0.0, - "avg_loss": 0.0, - "profit_factor": 0.0 - }, - "last_updated": "2025-12-09T12:02:05.263984" -} \ No newline at end of file diff --git a/trading/paper_trading.py b/trading/paper_trading.py index 5759a49..80fbfeb 100644 --- a/trading/paper_trading.py +++ b/trading/paper_trading.py @@ -1,11 +1,16 @@ """ -Paper Trading Module - 模拟盘交易系统 +Paper Trading Module - 多周期独立仓位管理 -支持仓位管理: -- 分批建仓(信号重复时加仓) -- 金字塔加仓策略 -- 最大持仓限制 -- 动态止盈止损 +支持三个独立周期的模拟交易: +- 短周期 (5m/15m/1h): short_term_5m_15m_1h / intraday +- 中周期 (4h/1d): medium_term_4h_1d / swing +- 长周期 (1d/1w): long_term_1d_1w + +每个周期独立管理: +- 独立仓位 +- 独立止盈止损 +- 独立统计数据 +- 独立权益曲线 """ import json import logging @@ -18,32 +23,48 @@ from enum import Enum logger = logging.getLogger(__name__) -class PositionSide(Enum): - LONG = "LONG" - SHORT = "SHORT" - FLAT = "FLAT" +class TimeFrame(Enum): + """交易周期""" + SHORT = "short" # 短周期 5m/15m/1h + MEDIUM = "medium" # 中周期 4h/1d + LONG = "long" # 长周期 1d/1w -@dataclass -class PositionEntry: - """单次入场记录""" - price: float - size: float # BTC 数量 - time: str - signal_id: str # 信号标识 +TIMEFRAME_CONFIG = { + TimeFrame.SHORT: { + 'name': '短周期', + 'name_en': 'Short-term', + 'signal_keys': ['short_term_5m_15m_1h', 'intraday'], + 'leverage': 10, + 'initial_balance': 10000.0, # 独立初始资金 + }, + TimeFrame.MEDIUM: { + 'name': '中周期', + 'name_en': 'Medium-term', + 'signal_keys': ['medium_term_4h_1d', 'swing'], + 'leverage': 10, + 'initial_balance': 10000.0, # 独立初始资金 + }, + TimeFrame.LONG: { + 'name': '长周期', + 'name_en': 'Long-term', + 'signal_keys': ['long_term_1d_1w'], + 'leverage': 10, + 'initial_balance': 10000.0, # 独立初始资金 + }, +} @dataclass class Position: - """持仓信息 - 支持多次入场""" + """持仓信息""" side: str # LONG, SHORT, FLAT - entries: List[Dict] = field(default_factory=list) # 多次入场记录 - total_size: float = 0.0 # 总持仓量 - avg_entry_price: float = 0.0 # 平均入场价 + entry_price: float = 0.0 + size: float = 0.0 # BTC 数量 stop_loss: float = 0.0 take_profit: float = 0.0 created_at: str = "" - last_updated: str = "" + signal_reasoning: str = "" def to_dict(self) -> dict: return asdict(self) @@ -52,66 +73,12 @@ class Position: def from_dict(cls, data: dict) -> 'Position': return cls(**data) - def add_entry(self, price: float, size: float, signal_id: str): - """添加入场记录""" - entry = { - 'price': price, - 'size': size, - 'time': datetime.now().isoformat(), - 'signal_id': signal_id, - } - self.entries.append(entry) - - # 更新平均价和总量 - total_value = sum(e['price'] * e['size'] for e in self.entries) - self.total_size = sum(e['size'] for e in self.entries) - self.avg_entry_price = total_value / self.total_size if self.total_size > 0 else 0 - self.last_updated = datetime.now().isoformat() - - def reduce_position(self, reduce_size: float) -> float: - """减仓 - 返回减仓的平均成本""" - if reduce_size >= self.total_size: - # 全部平仓 - avg_cost = self.avg_entry_price - self.entries = [] - self.total_size = 0 - self.avg_entry_price = 0 - return avg_cost - - # 部分减仓 - FIFO 方式 - remaining = reduce_size - removed_value = 0 - removed_size = 0 - - while remaining > 0 and self.entries: - entry = self.entries[0] - if entry['size'] <= remaining: - removed_value += entry['price'] * entry['size'] - removed_size += entry['size'] - remaining -= entry['size'] - self.entries.pop(0) - else: - removed_value += entry['price'] * remaining - removed_size += remaining - entry['size'] -= remaining - remaining = 0 - - # 更新总量和平均价 - self.total_size = sum(e['size'] for e in self.entries) - if self.total_size > 0: - total_value = sum(e['price'] * e['size'] for e in self.entries) - self.avg_entry_price = total_value / self.total_size - else: - self.avg_entry_price = 0 - - self.last_updated = datetime.now().isoformat() - return removed_value / removed_size if removed_size > 0 else 0 - @dataclass class Trade: """交易记录""" id: str + timeframe: str side: str entry_price: float entry_time: str @@ -121,7 +88,6 @@ class Trade: pnl: float pnl_pct: float exit_reason: str - signal_source: str def to_dict(self) -> dict: return asdict(self) @@ -131,195 +97,23 @@ class Trade: return cls(**data) -class PositionManager: - """仓位管理器""" +@dataclass +class TimeFrameAccount: + """单个周期的账户""" + timeframe: str + balance: float + initial_balance: float + leverage: int + position: Optional[Position] = None + trades: List[Trade] = field(default_factory=list) + stats: Dict = field(default_factory=dict) + equity_curve: List[Dict] = field(default_factory=list) - def __init__( - self, - max_position_pct: float = 0.5, # 最大持仓比例 (50% 资金) - base_position_pct: float = 0.1, # 基础仓位比例 (10% 资金) - max_entries: int = 5, # 最多加仓次数 - pyramid_factor: float = 0.8, # 金字塔因子 (每次加仓量递减) - signal_cooldown: int = 300, # 同方向信号冷却时间(秒) - ): - self.max_position_pct = max_position_pct - self.base_position_pct = base_position_pct - self.max_entries = max_entries - self.pyramid_factor = pyramid_factor - self.signal_cooldown = signal_cooldown - - # 记录最近的信号 - self.last_signal_time: Dict[str, datetime] = {} - self.signal_count: Dict[str, int] = {} # 连续同方向信号计数 - - def calculate_entry_size( - self, - balance: float, - current_position: Optional[Position], - signal_direction: str, - current_price: float, - leverage: int - ) -> float: - """ - 计算本次入场的仓位大小 - - Returns: - BTC 数量,0 表示不开仓 - """ - # 检查是否在冷却期 - now = datetime.now() - last_time = self.last_signal_time.get(signal_direction) - if last_time and (now - last_time).total_seconds() < self.signal_cooldown: - logger.info(f"Signal cooldown: {signal_direction}, skip entry") - return 0 - - # 计算最大允许仓位价值 - max_position_value = balance * self.max_position_pct * leverage - - # 当前持仓价值 - current_position_value = 0 - num_entries = 0 - - if current_position and current_position.side != 'FLAT': - if current_position.side == signal_direction: - # 同方向,考虑加仓 - current_position_value = current_position.total_size * current_price - num_entries = len(current_position.entries) - - if num_entries >= self.max_entries: - logger.info(f"Max entries reached: {num_entries}") - return 0 - else: - # 反方向,不在此处理(应先平仓) - return 0 - - # 计算剩余可用仓位 - remaining_value = max_position_value - current_position_value - if remaining_value <= 0: - logger.info(f"Max position reached") - return 0 - - # 金字塔计算:每次加仓量递减 - base_value = balance * self.base_position_pct * leverage - entry_value = base_value * (self.pyramid_factor ** num_entries) - - # 取最小值 - entry_value = min(entry_value, remaining_value) - - # 转换为 BTC 数量 - entry_size = entry_value / current_price - - # 更新信号记录 - self.last_signal_time[signal_direction] = now - self.signal_count[signal_direction] = self.signal_count.get(signal_direction, 0) + 1 - - return entry_size - - def should_take_partial_profit( - self, - position: Position, - current_price: float, - profit_levels: List[float] = [0.01, 0.02, 0.03] # 1%, 2%, 3% - ) -> Optional[Dict]: - """ - 检查是否应该部分止盈 - - Returns: - {'size': 减仓量, 'reason': 原因} 或 None - """ - if not position or position.side == 'FLAT' or position.total_size == 0: - return None - - # 计算当前盈利 - if position.side == 'LONG': - profit_pct = (current_price - position.avg_entry_price) / position.avg_entry_price - else: - profit_pct = (position.avg_entry_price - current_price) / position.avg_entry_price - - # 根据入场次数决定止盈策略 - num_entries = len(position.entries) - - # 多次入场时更积极止盈 - for i, level in enumerate(profit_levels): - adjusted_level = level * (1 - 0.1 * (num_entries - 1)) # 入场越多,止盈越早 - if profit_pct >= adjusted_level: - # 止盈 1/3 仓位 - reduce_size = position.total_size / 3 - if reduce_size * current_price >= 10: # 最小 $10 - return { - 'size': reduce_size, - 'reason': f'PARTIAL_TP_{int(level*100)}PCT', - 'profit_pct': profit_pct, - } - - return None - - def reset_signal_count(self, direction: str): - """重置信号计数(平仓后调用)""" - self.signal_count[direction] = 0 - - -class PaperTrader: - """模拟盘交易器 - 支持仓位管理""" - - def __init__( - self, - initial_balance: float = 10000.0, - leverage: int = 5, - max_position_pct: float = 0.5, - base_position_pct: float = 0.1, - state_file: str = None - ): - self.initial_balance = initial_balance - self.leverage = leverage - - # 仓位管理器 - self.position_manager = PositionManager( - max_position_pct=max_position_pct, - base_position_pct=base_position_pct, - ) - - # 状态文件 - if state_file: - self.state_file = Path(state_file) - else: - self.state_file = Path(__file__).parent.parent / 'output' / 'paper_trading_state.json' - - # 加载或初始化状态 - self._load_state() - - logger.info(f"Paper Trader initialized: balance=${self.balance:.2f}, leverage={leverage}x") - - def _load_state(self): - """加载持久化状态""" - if self.state_file.exists(): - try: - with open(self.state_file, 'r') as f: - state = json.load(f) - - self.balance = state.get('balance', self.initial_balance) - self.position = Position.from_dict(state['position']) if state.get('position') else None - self.trades = [Trade.from_dict(t) for t in state.get('trades', [])] - self.stats = state.get('stats', self._init_stats()) - self.equity_curve = state.get('equity_curve', []) - - logger.info(f"Loaded state: balance=${self.balance:.2f}, trades={len(self.trades)}") - except Exception as e: - logger.error(f"Failed to load state: {e}") - self._init_state() - else: - self._init_state() - - def _init_state(self): - """初始化状态""" - self.balance = self.initial_balance - self.position: Optional[Position] = None - self.trades: List[Trade] = [] - self.stats = self._init_stats() - self.equity_curve = [] # 权益曲线 + def __post_init__(self): + if not self.stats: + self.stats = self._init_stats() def _init_stats(self) -> dict: - """初始化统计数据""" return { 'total_trades': 0, 'winning_trades': 0, @@ -331,24 +125,105 @@ class PaperTrader: 'avg_win': 0.0, 'avg_loss': 0.0, 'profit_factor': 0.0, - 'total_long_trades': 0, - 'total_short_trades': 0, - 'consecutive_wins': 0, - 'consecutive_losses': 0, - 'max_consecutive_wins': 0, - 'max_consecutive_losses': 0, } + def to_dict(self) -> dict: + return { + 'timeframe': self.timeframe, + 'balance': self.balance, + 'initial_balance': self.initial_balance, + 'leverage': self.leverage, + 'position': self.position.to_dict() if self.position else None, + 'trades': [t.to_dict() for t in self.trades[-100:]], + 'stats': self.stats, + 'equity_curve': self.equity_curve[-500:], + } + + @classmethod + def from_dict(cls, data: dict) -> 'TimeFrameAccount': + account = cls( + timeframe=data['timeframe'], + balance=data['balance'], + initial_balance=data['initial_balance'], + leverage=data['leverage'], + stats=data.get('stats', {}), + equity_curve=data.get('equity_curve', []), + ) + if data.get('position'): + account.position = Position.from_dict(data['position']) + account.trades = [Trade.from_dict(t) for t in data.get('trades', [])] + return account + + +class MultiTimeframePaperTrader: + """多周期模拟盘交易器""" + + def __init__( + self, + initial_balance: float = 10000.0, + state_file: str = None + ): + self.initial_balance = initial_balance + + # 状态文件 + if state_file: + self.state_file = Path(state_file) + else: + self.state_file = Path(__file__).parent.parent / 'output' / 'paper_trading_state.json' + + # 初始化三个周期账户 + self.accounts: Dict[TimeFrame, TimeFrameAccount] = {} + + # 加载或初始化状态 + self._load_state() + + logger.info(f"Multi-timeframe Paper Trader initialized: total_balance=${initial_balance:.2f}") + + def _load_state(self): + """加载持久化状态""" + if self.state_file.exists(): + try: + with open(self.state_file, 'r') as f: + state = json.load(f) + + # 加载各周期账户 + for tf in TimeFrame: + tf_data = state.get('accounts', {}).get(tf.value) + if tf_data: + self.accounts[tf] = TimeFrameAccount.from_dict(tf_data) + else: + self._init_account(tf) + + logger.info(f"Loaded state from {self.state_file}") + except Exception as e: + logger.error(f"Failed to load state: {e}") + self._init_all_accounts() + else: + self._init_all_accounts() + + def _init_all_accounts(self): + """初始化所有账户""" + for tf in TimeFrame: + self._init_account(tf) + + def _init_account(self, tf: TimeFrame): + """初始化单个周期账户""" + config = TIMEFRAME_CONFIG[tf] + # 每个周期独立初始资金 10000 USD,10倍杠杆,最大仓位价值 100000 USD + account_balance = config['initial_balance'] + self.accounts[tf] = TimeFrameAccount( + timeframe=tf.value, + balance=account_balance, + initial_balance=account_balance, + leverage=config['leverage'], + ) + def _save_state(self): """保存状态到文件""" self.state_file.parent.mkdir(parents=True, exist_ok=True) state = { - 'balance': self.balance, - 'position': self.position.to_dict() if self.position else None, - 'trades': [t.to_dict() for t in self.trades[-200:]], - 'stats': self.stats, - 'equity_curve': self.equity_curve[-1000:], + 'accounts': {tf.value: acc.to_dict() for tf, acc in self.accounts.items()}, 'last_updated': datetime.now().isoformat(), } @@ -356,95 +231,95 @@ class PaperTrader: json.dump(state, f, indent=2, ensure_ascii=False) def process_signal(self, signal: Dict[str, Any], current_price: float) -> Dict[str, Any]: - """处理交易信号""" - result = { + """处理交易信号 - 检查所有周期""" + results = { 'timestamp': datetime.now().isoformat(), 'current_price': current_price, + 'timeframes': {}, + } + + for tf in TimeFrame: + result = self._process_timeframe_signal(tf, signal, current_price) + results['timeframes'][tf.value] = result + + self._save_state() + return results + + def _process_timeframe_signal( + self, tf: TimeFrame, signal: Dict[str, Any], current_price: float + ) -> Dict[str, Any]: + """处理单个周期的信号""" + account = self.accounts[tf] + config = TIMEFRAME_CONFIG[tf] + + result = { 'action': 'NONE', 'details': None, } # 更新权益曲线 - self._update_equity_curve(current_price) + self._update_equity_curve(tf, current_price) # 1. 检查止盈止损 - if self.position and self.position.side != 'FLAT': - close_result = self._check_close_position(current_price) + if account.position and account.position.side != 'FLAT': + close_result = self._check_close_position(tf, current_price) if close_result: result['action'] = 'CLOSE' result['details'] = close_result - self._save_state() return result - # 2. 检查部分止盈 - partial_tp = self.position_manager.should_take_partial_profit( - self.position, current_price - ) - if partial_tp: - close_result = self._partial_close(current_price, partial_tp['size'], partial_tp['reason']) - result['action'] = 'PARTIAL_CLOSE' - result['details'] = close_result - self._save_state() - return result + # 2. 提取该周期的信号 + tf_signal = self._extract_timeframe_signal(signal, config['signal_keys']) - # 3. 提取短期信号 - short_term = self._extract_short_term_signal(signal) - - if not short_term or not short_term.get('exists'): + if not tf_signal or not tf_signal.get('exists'): result['action'] = 'NO_SIGNAL' - result['details'] = {'reason': '无有效短期信号'} return result - direction = short_term['direction'] + direction = tf_signal.get('direction') + if not direction: + result['action'] = 'NO_SIGNAL' + return result - # 4. 如果有反向持仓,先平仓 - if self.position and self.position.side != 'FLAT': - if (self.position.side == 'LONG' and direction == 'SHORT') or \ - (self.position.side == 'SHORT' and direction == 'LONG'): - close_result = self._close_position(current_price, 'SIGNAL_REVERSE') + signal_stop_loss = tf_signal.get('stop_loss', 0) + signal_take_profit = tf_signal.get('take_profit', 0) + + # 验证止盈止损 + if signal_stop_loss <= 0 or signal_take_profit <= 0: + result['action'] = 'NO_SIGNAL' + result['details'] = {'reason': '缺少有效止盈止损'} + return result + + # 3. 如果有反向持仓,先平仓 + if account.position and account.position.side != 'FLAT': + if (account.position.side == 'LONG' and direction == 'SHORT') or \ + (account.position.side == 'SHORT' and direction == 'LONG'): + close_result = self._close_position(tf, current_price, 'SIGNAL_REVERSE') result['action'] = 'REVERSE' result['details'] = {'close': close_result} # 开反向仓 - open_result = self._try_open_position( - direction, current_price, - short_term.get('stop_loss', 0), - short_term.get('take_profit', 0), - short_term.get('reasoning', '')[:100] + open_result = self._open_position( + tf, direction, current_price, + signal_stop_loss, signal_take_profit, + tf_signal.get('reasoning', '')[:100] ) if open_result: result['details']['open'] = open_result - - self._save_state() return result else: - # 同方向,尝试加仓 - add_result = self._try_add_position( - direction, current_price, - short_term.get('stop_loss', 0), - short_term.get('take_profit', 0), - short_term.get('reasoning', '')[:100] - ) - if add_result: - result['action'] = 'ADD' - result['details'] = add_result - self._save_state() - return result - else: - result['action'] = 'HOLD' - result['details'] = { - 'position': self.position.to_dict(), - 'unrealized_pnl': self._calc_unrealized_pnl(current_price), - 'reason': '已有持仓,加仓条件不满足' - } - return result + # 同方向,保持持仓 + result['action'] = 'HOLD' + result['details'] = { + 'position': account.position.to_dict(), + 'unrealized_pnl': self._calc_unrealized_pnl(tf, current_price), + } + return result - # 5. 无持仓,开新仓 - open_result = self._try_open_position( - direction, current_price, - short_term.get('stop_loss', 0), - short_term.get('take_profit', 0), - short_term.get('reasoning', '')[:100] + # 4. 无持仓,开新仓 + open_result = self._open_position( + tf, direction, current_price, + signal_stop_loss, signal_take_profit, + tf_signal.get('reasoning', '')[:100] ) if open_result: @@ -452,352 +327,272 @@ class PaperTrader: result['details'] = open_result else: result['action'] = 'WAIT' - result['details'] = {'reason': '仓位条件不满足'} - self._save_state() return result - def _extract_short_term_signal(self, signal: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """提取短期信号""" + def _extract_timeframe_signal( + self, signal: Dict[str, Any], signal_keys: List[str] + ) -> Optional[Dict[str, Any]]: + """提取特定周期的信号""" try: + # 从 llm_signal.opportunities 中提取 llm_signal = signal.get('llm_signal') or signal.get('aggregated_signal', {}).get('llm_signal') if llm_signal and isinstance(llm_signal, dict): opportunities = llm_signal.get('opportunities', {}) - short_term = opportunities.get('short_term_5m_15m_1h') or opportunities.get('intraday') - if short_term: - return short_term + for key in signal_keys: + if key in opportunities and opportunities[key]: + return opportunities[key] + # 备选路径 agg = signal.get('aggregated_signal', {}) if agg: llm = agg.get('llm_signal', {}) if llm: opps = llm.get('opportunities', {}) - short_term = opps.get('short_term_5m_15m_1h') or opps.get('intraday') - if short_term: - return short_term + for key in signal_keys: + if key in opps and opps[key]: + return opps[key] return None except Exception as e: - logger.error(f"Error extracting short term signal: {e}") + logger.error(f"Error extracting signal: {e}") return None - def _try_open_position( - self, direction: str, price: float, - stop_loss: float, take_profit: float, signal_source: str + def _open_position( + self, tf: TimeFrame, direction: str, price: float, + stop_loss: float, take_profit: float, reasoning: str ) -> Optional[Dict]: - """尝试开仓""" - # 计算仓位大小 - entry_size = self.position_manager.calculate_entry_size( - self.balance, self.position, direction, price, self.leverage - ) + """开仓""" + account = self.accounts[tf] + config = TIMEFRAME_CONFIG[tf] - if entry_size <= 0: + # 计算仓位大小: 全部余额 * 杠杆倍数 (10000 * 10 = 100000 USD 仓位价值) + position_value = account.balance * account.leverage + size = position_value / price + + if size <= 0: return None - # 创建持仓 - self.position = Position( + account.position = Position( side=direction, - stop_loss=stop_loss if stop_loss > 0 else self._calc_default_stop(direction, price), - take_profit=take_profit if take_profit > 0 else self._calc_default_tp(direction, price), + entry_price=price, + size=size, + stop_loss=stop_loss, + take_profit=take_profit, created_at=datetime.now().isoformat(), + signal_reasoning=reasoning, ) - signal_id = f"S{datetime.now().strftime('%H%M%S')}" - self.position.add_entry(price, entry_size, signal_id) - logger.info( - f"OPEN {direction}: price=${price:.2f}, size={entry_size:.6f} BTC, " - f"SL=${self.position.stop_loss:.2f}, TP=${self.position.take_profit:.2f}" + f"[{config['name']}] OPEN {direction}: price=${price:.2f}, " + f"size={size:.6f} BTC, SL=${stop_loss:.2f}, TP=${take_profit:.2f}" ) return { + 'timeframe': tf.value, 'side': direction, 'entry_price': price, - 'size': entry_size, - 'total_size': self.position.total_size, - 'stop_loss': self.position.stop_loss, - 'take_profit': self.position.take_profit, - 'num_entries': 1, + 'size': size, + 'stop_loss': stop_loss, + 'take_profit': take_profit, } - def _try_add_position( - self, direction: str, price: float, - stop_loss: float, take_profit: float, signal_source: str - ) -> Optional[Dict]: - """尝试加仓""" - if not self.position or self.position.side != direction: - return None - - entry_size = self.position_manager.calculate_entry_size( - self.balance, self.position, direction, price, self.leverage - ) - - if entry_size <= 0: - return None - - signal_id = f"S{datetime.now().strftime('%H%M%S')}" - old_avg = self.position.avg_entry_price - self.position.add_entry(price, entry_size, signal_id) - - # 可选:更新止盈止损 - if stop_loss > 0: - self.position.stop_loss = stop_loss - if take_profit > 0: - self.position.take_profit = take_profit - - logger.info( - f"ADD {direction}: price=${price:.2f}, size={entry_size:.6f} BTC, " - f"avg_entry=${old_avg:.2f}->${self.position.avg_entry_price:.2f}, " - f"total_size={self.position.total_size:.6f}" - ) - - return { - 'side': direction, - 'add_price': price, - 'add_size': entry_size, - 'total_size': self.position.total_size, - 'avg_entry_price': self.position.avg_entry_price, - 'num_entries': len(self.position.entries), - } - - def _calc_default_stop(self, side: str, price: float) -> float: - """计算默认止损 (0.5%)""" - if side == 'LONG': - return price * 0.995 - else: - return price * 1.005 - - def _calc_default_tp(self, side: str, price: float) -> float: - """计算默认止盈 (1.5%)""" - if side == 'LONG': - return price * 1.015 - else: - return price * 0.985 - - def _check_close_position(self, current_price: float) -> Optional[Dict[str, Any]]: + def _check_close_position(self, tf: TimeFrame, current_price: float) -> Optional[Dict]: """检查是否触发止盈止损""" - if not self.position or self.position.side == 'FLAT': + account = self.accounts[tf] + pos = account.position + + if not pos or pos.side == 'FLAT': return None - if self.position.side == 'LONG': - if current_price >= self.position.take_profit: - return self._close_position(current_price, 'TAKE_PROFIT') - elif current_price <= self.position.stop_loss: - return self._close_position(current_price, 'STOP_LOSS') - else: - if current_price <= self.position.take_profit: - return self._close_position(current_price, 'TAKE_PROFIT') - elif current_price >= self.position.stop_loss: - return self._close_position(current_price, 'STOP_LOSS') + if pos.side == 'LONG': + if current_price >= pos.take_profit: + return self._close_position(tf, current_price, 'TAKE_PROFIT') + elif current_price <= pos.stop_loss: + return self._close_position(tf, current_price, 'STOP_LOSS') + else: # SHORT + if current_price <= pos.take_profit: + return self._close_position(tf, current_price, 'TAKE_PROFIT') + elif current_price >= pos.stop_loss: + return self._close_position(tf, current_price, 'STOP_LOSS') return None - def _close_position(self, price: float, reason: str) -> Dict[str, Any]: - """全部平仓""" - if not self.position or self.position.side == 'FLAT': - return {'error': 'No position to close'} + def _close_position(self, tf: TimeFrame, price: float, reason: str) -> Dict: + """平仓""" + account = self.accounts[tf] + config = TIMEFRAME_CONFIG[tf] + pos = account.position - pnl, pnl_pct = self._calc_pnl(price) - self.balance += pnl + if not pos or pos.side == 'FLAT': + return {'error': 'No position'} + # 计算盈亏 + if pos.side == 'LONG': + pnl_pct = (price - pos.entry_price) / pos.entry_price * 100 * account.leverage + else: + pnl_pct = (pos.entry_price - price) / pos.entry_price * 100 * account.leverage + + position_value = pos.size * pos.entry_price + pnl = position_value * (pnl_pct / 100) + + account.balance += pnl + + # 记录交易 trade = Trade( - id=f"T{len(self.trades)+1:04d}", - side=self.position.side, - entry_price=self.position.avg_entry_price, - entry_time=self.position.created_at, + id=f"{tf.value[0].upper()}{len(account.trades)+1:04d}", + timeframe=tf.value, + side=pos.side, + entry_price=pos.entry_price, + entry_time=pos.created_at, exit_price=price, exit_time=datetime.now().isoformat(), - size=self.position.total_size, + size=pos.size, pnl=pnl, pnl_pct=pnl_pct, exit_reason=reason, - signal_source=f"{len(self.position.entries)} entries", ) - self.trades.append(trade) - self._update_stats(trade) + account.trades.append(trade) + self._update_stats(tf, trade) result = { - 'side': self.position.side, - 'entry_price': self.position.avg_entry_price, + 'timeframe': tf.value, + 'side': pos.side, + 'entry_price': pos.entry_price, 'exit_price': price, - 'size': self.position.total_size, - 'num_entries': len(self.position.entries), + 'size': pos.size, 'pnl': pnl, 'pnl_pct': pnl_pct, 'reason': reason, - 'new_balance': self.balance, + 'new_balance': account.balance, } logger.info( - f"CLOSE {self.position.side}: avg_entry=${self.position.avg_entry_price:.2f}, " + f"[{config['name']}] CLOSE {pos.side}: entry=${pos.entry_price:.2f}, " f"exit=${price:.2f}, PnL=${pnl:.2f} ({pnl_pct:.2f}%), reason={reason}" ) - # 重置 - self.position_manager.reset_signal_count(self.position.side) - self.position = None - + account.position = None return result - def _partial_close(self, price: float, size: float, reason: str) -> Dict[str, Any]: - """部分平仓""" - if not self.position or self.position.side == 'FLAT': - return {'error': 'No position'} + def _calc_unrealized_pnl(self, tf: TimeFrame, current_price: float) -> Dict[str, float]: + """计算未实现盈亏""" + account = self.accounts[tf] + pos = account.position - avg_cost = self.position.reduce_position(size) + if not pos or pos.side == 'FLAT': + return {'pnl': 0, 'pnl_pct': 0} - if self.position.side == 'LONG': - pnl_pct = (price - avg_cost) / avg_cost * 100 * self.leverage + if pos.side == 'LONG': + pnl_pct = (current_price - pos.entry_price) / pos.entry_price * 100 * account.leverage else: - pnl_pct = (avg_cost - price) / avg_cost * 100 * self.leverage + pnl_pct = (pos.entry_price - current_price) / pos.entry_price * 100 * account.leverage - pnl = size * avg_cost * (pnl_pct / 100) - self.balance += pnl - - trade = Trade( - id=f"T{len(self.trades)+1:04d}", - side=self.position.side, - entry_price=avg_cost, - entry_time=self.position.created_at, - exit_price=price, - exit_time=datetime.now().isoformat(), - size=size, - pnl=pnl, - pnl_pct=pnl_pct, - exit_reason=reason, - signal_source="partial", - ) - self.trades.append(trade) - self._update_stats(trade) - - logger.info( - f"PARTIAL CLOSE: size={size:.6f}, PnL=${pnl:.2f} ({pnl_pct:.2f}%), " - f"remaining={self.position.total_size:.6f}" - ) - - # 如果完全平仓 - if self.position.total_size <= 0: - self.position_manager.reset_signal_count(self.position.side) - self.position = None - - return { - 'side': self.position.side if self.position else 'FLAT', - 'closed_size': size, - 'exit_price': price, - 'pnl': pnl, - 'pnl_pct': pnl_pct, - 'reason': reason, - 'remaining_size': self.position.total_size if self.position else 0, - 'new_balance': self.balance, - } - - def _calc_pnl(self, current_price: float) -> tuple: - """计算盈亏""" - if not self.position: - return 0.0, 0.0 - - if self.position.side == 'LONG': - pnl_pct = (current_price - self.position.avg_entry_price) / self.position.avg_entry_price * 100 - else: - pnl_pct = (self.position.avg_entry_price - current_price) / self.position.avg_entry_price * 100 - - pnl_pct *= self.leverage - position_value = self.position.total_size * self.position.avg_entry_price + position_value = pos.size * pos.entry_price pnl = position_value * (pnl_pct / 100) - return pnl, pnl_pct - - def _calc_unrealized_pnl(self, current_price: float) -> Dict[str, float]: - """计算未实现盈亏""" - pnl, pnl_pct = self._calc_pnl(current_price) return {'pnl': pnl, 'pnl_pct': pnl_pct} - def _update_equity_curve(self, current_price: float): + def _update_equity_curve(self, tf: TimeFrame, current_price: float): """更新权益曲线""" - equity = self.balance - if self.position and self.position.total_size > 0: - unrealized = self._calc_unrealized_pnl(current_price) + account = self.accounts[tf] + equity = account.balance + + if account.position and account.position.side != 'FLAT': + unrealized = self._calc_unrealized_pnl(tf, current_price) equity += unrealized['pnl'] - self.equity_curve.append({ + account.equity_curve.append({ 'timestamp': datetime.now().isoformat(), 'equity': equity, - 'balance': self.balance, + 'balance': account.balance, 'price': current_price, }) - def _update_stats(self, trade: Trade): + def _update_stats(self, tf: TimeFrame, trade: Trade): """更新统计数据""" - self.stats['total_trades'] += 1 - self.stats['total_pnl'] += trade.pnl + account = self.accounts[tf] + stats = account.stats - if trade.side == 'LONG': - self.stats['total_long_trades'] += 1 - else: - self.stats['total_short_trades'] += 1 + stats['total_trades'] += 1 + stats['total_pnl'] += trade.pnl if trade.pnl > 0: - self.stats['winning_trades'] += 1 - self.stats['consecutive_wins'] += 1 - self.stats['consecutive_losses'] = 0 - if self.stats['consecutive_wins'] > self.stats['max_consecutive_wins']: - self.stats['max_consecutive_wins'] = self.stats['consecutive_wins'] + stats['winning_trades'] += 1 else: - self.stats['losing_trades'] += 1 - self.stats['consecutive_losses'] += 1 - self.stats['consecutive_wins'] = 0 - if self.stats['consecutive_losses'] > self.stats['max_consecutive_losses']: - self.stats['max_consecutive_losses'] = self.stats['consecutive_losses'] + stats['losing_trades'] += 1 - if self.stats['total_trades'] > 0: - self.stats['win_rate'] = self.stats['winning_trades'] / self.stats['total_trades'] * 100 + if stats['total_trades'] > 0: + stats['win_rate'] = stats['winning_trades'] / stats['total_trades'] * 100 - wins = [t for t in self.trades if t.pnl > 0] - losses = [t for t in self.trades if t.pnl <= 0] + wins = [t for t in account.trades if t.pnl > 0] + losses = [t for t in account.trades if t.pnl <= 0] if wins: - self.stats['avg_win'] = sum(t.pnl for t in wins) / len(wins) + stats['avg_win'] = sum(t.pnl for t in wins) / len(wins) if losses: - self.stats['avg_loss'] = sum(t.pnl for t in losses) / len(losses) + stats['avg_loss'] = sum(t.pnl for t in losses) / len(losses) - if self.stats['avg_loss'] != 0: - self.stats['profit_factor'] = abs(self.stats['avg_win'] / self.stats['avg_loss']) + if stats['avg_loss'] != 0: + stats['profit_factor'] = abs(stats['avg_win'] / stats['avg_loss']) - if self.balance > self.stats['peak_balance']: - self.stats['peak_balance'] = self.balance + if account.balance > stats['peak_balance']: + stats['peak_balance'] = account.balance - drawdown = (self.stats['peak_balance'] - self.balance) / self.stats['peak_balance'] * 100 - if drawdown > self.stats['max_drawdown']: - self.stats['max_drawdown'] = drawdown + drawdown = (stats['peak_balance'] - account.balance) / stats['peak_balance'] * 100 + if drawdown > stats['max_drawdown']: + stats['max_drawdown'] = drawdown def get_status(self, current_price: float = None) -> Dict[str, Any]: - """获取当前状态""" + """获取所有周期状态""" + total_balance = sum(acc.balance for acc in self.accounts.values()) + total_initial = sum(acc.initial_balance for acc in self.accounts.values()) + status = { 'timestamp': datetime.now().isoformat(), - 'balance': self.balance, - 'initial_balance': self.initial_balance, - 'total_return': (self.balance - self.initial_balance) / self.initial_balance * 100, - 'leverage': self.leverage, - 'position': None, - 'stats': self.stats, - 'recent_trades': [t.to_dict() for t in self.trades[-10:]], - 'equity_curve': self.equity_curve[-100:], + 'total_balance': total_balance, + 'total_initial_balance': total_initial, + 'total_return': (total_balance - total_initial) / total_initial * 100, + 'timeframes': {}, } - if self.position and self.position.total_size > 0: - pos_dict = self.position.to_dict() - if current_price: - unrealized = self._calc_unrealized_pnl(current_price) - pos_dict['current_price'] = current_price - pos_dict['unrealized_pnl'] = unrealized['pnl'] - pos_dict['unrealized_pnl_pct'] = unrealized['pnl_pct'] - status['position'] = pos_dict + for tf in TimeFrame: + account = self.accounts[tf] + config = TIMEFRAME_CONFIG[tf] + + tf_status = { + 'name': config['name'], + 'name_en': config['name_en'], + 'balance': account.balance, + 'initial_balance': account.initial_balance, + 'return_pct': (account.balance - account.initial_balance) / account.initial_balance * 100, + 'leverage': account.leverage, + 'position': None, + 'stats': account.stats, + 'recent_trades': [t.to_dict() for t in account.trades[-10:]], + 'equity_curve': account.equity_curve[-100:], + } + + if account.position and account.position.side != 'FLAT': + pos_dict = account.position.to_dict() + if current_price: + unrealized = self._calc_unrealized_pnl(tf, current_price) + pos_dict['current_price'] = current_price + pos_dict['unrealized_pnl'] = unrealized['pnl'] + pos_dict['unrealized_pnl_pct'] = unrealized['pnl_pct'] + tf_status['position'] = pos_dict + + status['timeframes'][tf.value] = tf_status return status def reset(self): - """重置模拟盘""" - self._init_state() + """重置所有账户""" + self._init_all_accounts() self._save_state() - logger.info("Paper trading account reset") + logger.info("All accounts reset") + + +# 兼容旧的 PaperTrader 接口 +PaperTrader = MultiTimeframePaperTrader diff --git a/trading/realtime_trader.py b/trading/realtime_trader.py index d39930d..3c53a57 100644 --- a/trading/realtime_trader.py +++ b/trading/realtime_trader.py @@ -1,46 +1,41 @@ """ -Realtime Paper Trading - 基于 WebSocket 实时数据的模拟盘 +Realtime Trading - 基于 WebSocket 实时数据的多周期交易 -使用 Binance WebSocket 获取实时价格,结合信号进行模拟交易 -支持仓位管理:金字塔加仓、最大持仓限制、部分止盈 +使用 Binance WebSocket 获取实时价格,结合信号进行多周期独立交易 +- 短周期 (5m/15m/1h) +- 中周期 (4h/1d) +- 长周期 (1d/1w) """ import asyncio import json import logging import signal -import sys from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional, Callable import websockets -from .paper_trading import PaperTrader +from .paper_trading import MultiTimeframePaperTrader, TimeFrame, TIMEFRAME_CONFIG logger = logging.getLogger(__name__) class RealtimeTrader: - """实时模拟盘交易器""" + """实时多周期交易器""" def __init__( self, symbol: str = "btcusdt", initial_balance: float = 10000.0, - leverage: int = 5, - max_position_pct: float = 0.5, - base_position_pct: float = 0.1, - signal_check_interval: int = 60, # 每60秒检查一次信号 + signal_check_interval: int = 60, ): """ 初始化实时交易器 Args: symbol: 交易对 (小写) - initial_balance: 初始资金 - leverage: 杠杆倍数 - max_position_pct: 最大持仓比例 (占资金百分比) - base_position_pct: 基础仓位比例 (每次入场) + initial_balance: 初始资金 (分配给三个周期) signal_check_interval: 信号检查间隔(秒) """ self.symbol = symbol.lower() @@ -49,13 +44,8 @@ class RealtimeTrader: # WebSocket URL self.ws_url = f"wss://fstream.binance.com/ws/{self.symbol}@aggTrade" - # 模拟盘 - 使用新的仓位管理参数 - self.trader = PaperTrader( - initial_balance=initial_balance, - leverage=leverage, - max_position_pct=max_position_pct, - base_position_pct=base_position_pct, - ) + # 多周期交易器 + self.trader = MultiTimeframePaperTrader(initial_balance=initial_balance) # 状态 self.current_price = 0.0 @@ -73,14 +63,15 @@ class RealtimeTrader: async def start(self): """启动实时交易""" self.is_running = True - logger.info(f"Starting realtime trader for {self.symbol.upper()}") + logger.info(f"Starting multi-timeframe realtime trader for {self.symbol.upper()}") logger.info(f"WebSocket URL: {self.ws_url}") - logger.info(f"Initial balance: ${self.trader.balance:.2f}") - logger.info(f"Leverage: {self.trader.leverage}x") - logger.info(f"Max position: {self.trader.position_manager.max_position_pct * 100}%") - logger.info(f"Base position: {self.trader.position_manager.base_position_pct * 100}%") logger.info(f"Signal check interval: {self.signal_check_interval}s") + for tf in TimeFrame: + config = TIMEFRAME_CONFIG[tf] + account = self.trader.accounts[tf] + logger.info(f" [{config['name_en']}] Balance: ${account.balance:.2f}, Leverage: {account.leverage}x") + while self.is_running: try: await self._connect_and_trade() @@ -96,7 +87,6 @@ class RealtimeTrader: self.ws = ws logger.info("WebSocket connected") - # 打印初始状态 self._print_status() async for message in ws: @@ -113,21 +103,21 @@ class RealtimeTrader: async def _process_tick(self, data: Dict[str, Any]): """处理每个 tick 数据""" - # 提取价格 self.current_price = float(data.get('p', 0)) if self.current_price <= 0: return - # 调用价格回调 if self.on_price_callback: self.on_price_callback(self.current_price) - # 检查止盈止损 - if self.trader.position: - close_result = self.trader._check_close_position(self.current_price) - if close_result: - self._on_position_closed(close_result) + # 检查各周期止盈止损 + for tf in TimeFrame: + account = self.trader.accounts[tf] + if account.position and account.position.side != 'FLAT': + close_result = self.trader._check_close_position(tf, self.current_price) + if close_result: + self._on_position_closed(tf, close_result) # 定期检查信号 now = asyncio.get_event_loop().time() @@ -137,16 +127,20 @@ class RealtimeTrader: async def _check_and_execute_signal(self): """检查信号并执行交易""" - signal = self._load_latest_signal() + signal_data = self._load_latest_signal() - if not signal: + if not signal_data: return - result = self.trader.process_signal(signal, self.current_price) + results = self.trader.process_signal(signal_data, self.current_price) - if result['action'] in ['OPEN', 'CLOSE', 'REVERSE', 'ADD', 'PARTIAL_CLOSE']: - self._on_trade_executed(result) - self._print_status() + # 处理各周期结果 + for tf_value, result in results.get('timeframes', {}).items(): + if result['action'] in ['OPEN', 'CLOSE', 'REVERSE']: + tf = TimeFrame(tf_value) + self._on_trade_executed(tf, result) + + self._print_status() def _load_latest_signal(self) -> Optional[Dict[str, Any]]: """加载最新信号""" @@ -160,90 +154,64 @@ class RealtimeTrader: logger.error(f"Error loading signal: {e}") return None - def _on_trade_executed(self, result: Dict[str, Any]): + def _on_trade_executed(self, tf: TimeFrame, result: Dict[str, Any]): """交易执行回调""" + config = TIMEFRAME_CONFIG[tf] action = result['action'] - details = result['details'] + details = result.get('details', {}) if action == 'OPEN': logger.info("=" * 60) - logger.info(f"🟢 OPEN {details['side']}") + logger.info(f"🟢 [{config['name_en']}] OPEN {details['side']}") logger.info(f" Entry: ${details['entry_price']:.2f}") logger.info(f" Size: {details['size']:.6f} BTC") - logger.info(f" Total Size: {details['total_size']:.6f} BTC") logger.info(f" Stop Loss: ${details['stop_loss']:.2f}") logger.info(f" Take Profit: ${details['take_profit']:.2f}") logger.info("=" * 60) - elif action == 'ADD': - logger.info("=" * 60) - logger.info(f"➕ ADD POSITION {details['side']}") - logger.info(f" Add Price: ${details['add_price']:.2f}") - logger.info(f" Add Size: {details['add_size']:.6f} BTC") - logger.info(f" Total Size: {details['total_size']:.6f} BTC") - logger.info(f" Avg Entry: ${details['avg_entry_price']:.2f}") - logger.info(f" Entries: {details['num_entries']}") - logger.info("=" * 60) - elif action == 'CLOSE': - pnl = details['pnl'] + pnl = details.get('pnl', 0) pnl_icon = "🟢" if pnl > 0 else "🔴" logger.info("=" * 60) - logger.info(f"{pnl_icon} CLOSE {details['side']}") + logger.info(f"{pnl_icon} [{config['name_en']}] CLOSE {details['side']}") logger.info(f" Entry: ${details['entry_price']:.2f}") logger.info(f" Exit: ${details['exit_price']:.2f}") - logger.info(f" Size: {details['size']:.6f} BTC") - logger.info(f" Entries: {details.get('num_entries', 1)}") logger.info(f" PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)") logger.info(f" Reason: {details['reason']}") logger.info(f" New Balance: ${details['new_balance']:.2f}") logger.info("=" * 60) - elif action == 'PARTIAL_CLOSE': - pnl = details['pnl'] - pnl_icon = "🟢" if pnl > 0 else "🔴" - logger.info("=" * 60) - logger.info(f"📉 PARTIAL CLOSE {details['side']}") - logger.info(f" Closed Size: {details['closed_size']:.6f} BTC") - logger.info(f" Exit: ${details['exit_price']:.2f}") - logger.info(f" {pnl_icon} PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)") - logger.info(f" Remaining: {details['remaining_size']:.6f} BTC") - logger.info(f" New Balance: ${details['new_balance']:.2f}") - logger.info("=" * 60) - elif action == 'REVERSE': logger.info("=" * 60) - logger.info("🔄 REVERSE POSITION") + logger.info(f"🔄 [{config['name_en']}] REVERSE POSITION") if 'close' in details: - logger.info(f" Closed: PnL ${details['close']['pnl']:.2f}") + logger.info(f" Closed: PnL ${details['close'].get('pnl', 0):.2f}") if 'open' in details: logger.info(f" Opened: {details['open']['side']} @ ${details['open']['entry_price']:.2f}") logger.info("=" * 60) - # 调用外部回调 if self.on_trade_callback: - self.on_trade_callback(result) + self.on_trade_callback({'timeframe': tf.value, 'action': action, 'details': details}) - def _on_position_closed(self, close_result: Dict[str, Any]): + def _on_position_closed(self, tf: TimeFrame, close_result: Dict[str, Any]): """持仓被平仓回调(止盈止损)""" - pnl = close_result['pnl'] + config = TIMEFRAME_CONFIG[tf] + pnl = close_result.get('pnl', 0) pnl_icon = "🟢" if pnl > 0 else "🔴" - reason_icon = "🎯" if close_result['reason'] == 'TAKE_PROFIT' else "🛑" + reason = close_result.get('reason', '') + reason_icon = "🎯" if reason == 'TAKE_PROFIT' else "🛑" logger.info("=" * 60) - logger.info(f"{reason_icon} {close_result['reason']}") - logger.info(f" {pnl_icon} PnL: ${pnl:.2f} ({close_result['pnl_pct']:.2f}%)") - logger.info(f" Entry: ${close_result['entry_price']:.2f}") - logger.info(f" Exit: ${close_result['exit_price']:.2f}") - logger.info(f" Size: {close_result['size']:.6f} BTC") - logger.info(f" Entries: {close_result.get('num_entries', 1)}") - logger.info(f" New Balance: ${close_result['new_balance']:.2f}") + logger.info(f"{reason_icon} [{config['name_en']}] {reason}") + logger.info(f" {pnl_icon} PnL: ${pnl:.2f} ({close_result.get('pnl_pct', 0):.2f}%)") + logger.info(f" Entry: ${close_result.get('entry_price', 0):.2f}") + logger.info(f" Exit: ${close_result.get('exit_price', 0):.2f}") + logger.info(f" New Balance: ${close_result.get('new_balance', 0):.2f}") logger.info("=" * 60) - self._print_status() - if self.on_trade_callback: self.on_trade_callback({ + 'timeframe': tf.value, 'action': 'CLOSE', 'details': close_result, }) @@ -252,44 +220,39 @@ class RealtimeTrader: """打印当前状态""" status = self.trader.get_status(self.current_price) - print("\n" + "=" * 70) - print(f"📊 PAPER TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print("=" * 70) - print(f"💰 Balance: ${status['balance']:.2f} (Initial: ${status['initial_balance']:.2f})") - print(f"📈 Total Return: {status['total_return']:.2f}%") + print("\n" + "=" * 80) + print(f"📊 MULTI-TIMEFRAME TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("=" * 80) print(f"💵 Current Price: ${self.current_price:.2f}") + print(f"💰 Total Balance: ${status['total_balance']:.2f} (Initial: ${status['total_initial_balance']:.2f})") + print(f"📈 Total Return: {status['total_return']:.2f}%") + print("-" * 80) - if status['position']: - pos = status['position'] - unrealized = pos.get('unrealized_pnl', 0) - unrealized_pct = pos.get('unrealized_pnl_pct', 0) - pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else "⚪" - print(f"\n📍 Position: {pos['side']} ({len(pos.get('entries', []))} entries)") - print(f" Avg Entry: ${pos['avg_entry_price']:.2f}") - print(f" Size: {pos['total_size']:.6f} BTC") - print(f" Stop Loss: ${pos['stop_loss']:.2f}") - print(f" Take Profit: ${pos['take_profit']:.2f}") - print(f" {pnl_icon} Unrealized PnL: ${unrealized:.2f} ({unrealized_pct:.2f}%)") - else: - print("\n📍 Position: FLAT (No position)") + for tf_value, tf_status in status['timeframes'].items(): + name = tf_status['name_en'] + balance = tf_status['balance'] + return_pct = tf_status['return_pct'] + leverage = tf_status['leverage'] + stats = tf_status['stats'] - stats = status['stats'] - print(f"\n📊 Statistics:") - print(f" Total Trades: {stats['total_trades']}") - print(f" Win Rate: {stats['win_rate']:.1f}%") - print(f" Total PnL: ${stats['total_pnl']:.2f}") - print(f" Profit Factor: {stats['profit_factor']:.2f}") - print(f" Max Drawdown: {stats['max_drawdown']:.2f}%") - print(f" Max Consecutive Wins: {stats.get('max_consecutive_wins', 0)}") - print(f" Max Consecutive Losses: {stats.get('max_consecutive_losses', 0)}") + return_icon = "🟢" if return_pct > 0 else "🔴" if return_pct < 0 else "⚪" - if status['recent_trades']: - print(f"\n📝 Recent Trades:") - for trade in status['recent_trades'][-5:]: - pnl_icon = "🟢" if trade['pnl'] > 0 else "🔴" - print(f" {pnl_icon} {trade['side']} | PnL: ${trade['pnl']:.2f} ({trade['pnl_pct']:.1f}%) | {trade['exit_reason']}") + print(f"\n📊 {name} ({leverage}x)") + print(f" Balance: ${balance:.2f} | Return: {return_icon} {return_pct:+.2f}%") + print(f" Trades: {stats['total_trades']} | Win Rate: {stats['win_rate']:.1f}% | PnL: ${stats['total_pnl']:.2f}") - print("=" * 70 + "\n") + pos = tf_status.get('position') + if pos: + unrealized = pos.get('unrealized_pnl', 0) + unrealized_pct = pos.get('unrealized_pnl_pct', 0) + pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else "⚪" + print(f" Position: {pos['side']} @ ${pos['entry_price']:.2f}") + print(f" SL: ${pos['stop_loss']:.2f} | TP: ${pos['take_profit']:.2f}") + print(f" {pnl_icon} Unrealized: ${unrealized:.2f} ({unrealized_pct:+.2f}%)") + else: + print(f" Position: FLAT") + + print("=" * 80 + "\n") def stop(self): """停止交易""" @@ -303,29 +266,21 @@ class RealtimeTrader: async def main(): """主函数""" - import os from dotenv import load_dotenv - # 加载环境变量 load_dotenv(Path(__file__).parent.parent / '.env') - # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) - # 创建交易器 trader = RealtimeTrader( symbol='btcusdt', initial_balance=10000.0, - leverage=5, - max_position_pct=0.5, # 最大持仓50%资金 - base_position_pct=0.1, # 每次入场10%资金 - signal_check_interval=30, # 每30秒检查一次信号 + signal_check_interval=30, ) - # 设置信号处理 def signal_handler(sig, frame): logger.info("Received shutdown signal") trader.stop() @@ -333,19 +288,16 @@ async def main(): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - # 启动 - print("\n" + "=" * 70) - print("🚀 REALTIME PAPER TRADING") - print("=" * 70) - print("Position Management:") - print(" - Max position: 50% of balance") - print(" - Base entry: 10% of balance") - print(" - Max entries: 5 (pyramid)") - print(" - Pyramid factor: 0.8x per entry") - print(" - Signal cooldown: 5 minutes") - print("=" * 70) + print("\n" + "=" * 80) + print("🚀 MULTI-TIMEFRAME REALTIME TRADING") + print("=" * 80) + print("Timeframes:") + print(" 📈 Short-term (5m/15m/1h) - 5x leverage") + print(" 📊 Medium-term (4h/1d) - 3x leverage") + print(" 📉 Long-term (1d/1w) - 2x leverage") + print("=" * 80) print("Press Ctrl+C to stop") - print("=" * 70 + "\n") + print("=" * 80 + "\n") await trader.start() diff --git a/web/api.py b/web/api.py index 801c9b8..0b8e7d4 100644 --- a/web/api.py +++ b/web/api.py @@ -1,24 +1,23 @@ """ -FastAPI Web Service - 模拟盘状态展示 API +FastAPI Web Service - 多周期交易状态展示 API """ import json import asyncio from datetime import datetime from pathlib import Path -from typing import Dict, Any, Optional, List +from typing import Dict, Any, List from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles -from fastapi.responses import HTMLResponse, FileResponse -from pydantic import BaseModel +from fastapi.responses import FileResponse # 状态文件路径 STATE_FILE = Path(__file__).parent.parent / 'output' / 'paper_trading_state.json' SIGNAL_FILE = Path(__file__).parent.parent / 'output' / 'latest_signal.json' -app = FastAPI(title="Paper Trading Dashboard", version="1.0.0") +app = FastAPI(title="Trading Dashboard", version="2.0.0") + -# WebSocket 连接管理 class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] @@ -28,7 +27,8 @@ class ConnectionManager: self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): - self.active_connections.remove(websocket) + if websocket in self.active_connections: + self.active_connections.remove(websocket) async def broadcast(self, message: dict): for connection in self.active_connections: @@ -37,6 +37,7 @@ class ConnectionManager: except: pass + manager = ConnectionManager() @@ -49,8 +50,23 @@ def load_trading_state() -> Dict[str, Any]: except Exception as e: print(f"Error loading state: {e}") + # 返回默认状态 return { - 'balance': 10000.0, + 'accounts': { + 'short': _default_account('short', 10000), + 'medium': _default_account('medium', 10000), + 'long': _default_account('long', 10000), + }, + 'last_updated': None, + } + + +def _default_account(timeframe: str, balance: float) -> Dict: + return { + 'timeframe': timeframe, + 'balance': balance, + 'initial_balance': balance, + 'leverage': 10, # 所有周期统一 10 倍杠杆 'position': None, 'trades': [], 'stats': { @@ -59,7 +75,7 @@ def load_trading_state() -> Dict[str, Any]: 'losing_trades': 0, 'total_pnl': 0.0, 'max_drawdown': 0.0, - 'peak_balance': 10000.0, + 'peak_balance': balance, 'win_rate': 0.0, }, 'equity_curve': [], @@ -83,49 +99,85 @@ async def root(): html_file = Path(__file__).parent / 'static' / 'index.html' if html_file.exists(): return FileResponse(html_file) - return HTMLResponse("
Static files not found
") + return {"error": "Static files not found"} @app.get("/api/status") async def get_status(): - """获取模拟盘状态""" + """获取多周期交易状态""" state = load_trading_state() - signal = load_latest_signal() + accounts = state.get('accounts', {}) - # 计算总收益率 - initial_balance = 10000.0 - total_return = (state.get('balance', initial_balance) - initial_balance) / initial_balance * 100 + # 计算总余额 + total_balance = sum(acc.get('balance', 0) for acc in accounts.values()) + total_initial = sum(acc.get('initial_balance', 0) for acc in accounts.values()) + total_return = (total_balance - total_initial) / total_initial * 100 if total_initial > 0 else 0 + + # 构建各周期状态 + timeframes = {} + for tf_key, acc in accounts.items(): + initial = acc.get('initial_balance', 0) + balance = acc.get('balance', 0) + return_pct = (balance - initial) / initial * 100 if initial > 0 else 0 + + timeframes[tf_key] = { + 'name': '短周期' if tf_key == 'short' else '中周期' if tf_key == 'medium' else '长周期', + 'name_en': 'Short-term' if tf_key == 'short' else 'Medium-term' if tf_key == 'medium' else 'Long-term', + 'balance': balance, + 'initial_balance': initial, + 'return_pct': return_pct, + 'leverage': acc.get('leverage', 1), + 'position': acc.get('position'), + 'stats': acc.get('stats', {}), + } return { 'timestamp': datetime.now().isoformat(), - 'balance': state.get('balance', initial_balance), - 'initial_balance': initial_balance, + 'total_balance': total_balance, + 'total_initial_balance': total_initial, 'total_return': total_return, - 'position': state.get('position'), - 'stats': state.get('stats', {}), + 'timeframes': timeframes, 'last_updated': state.get('last_updated'), } @app.get("/api/trades") -async def get_trades(limit: int = 50): +async def get_trades(timeframe: str = None, limit: int = 50): """获取交易记录""" state = load_trading_state() - trades = state.get('trades', []) + accounts = state.get('accounts', {}) + + all_trades = [] + for tf_key, acc in accounts.items(): + if timeframe and tf_key != timeframe: + continue + trades = acc.get('trades', []) + all_trades.extend(trades) + + # 按时间排序 + all_trades.sort(key=lambda x: x.get('exit_time', ''), reverse=True) + return { - 'total': len(trades), - 'trades': trades[-limit:] if limit > 0 else trades, + 'total': len(all_trades), + 'trades': all_trades[:limit] if limit > 0 else all_trades, } @app.get("/api/equity") -async def get_equity_curve(limit: int = 500): +async def get_equity_curve(timeframe: str = None, limit: int = 500): """获取权益曲线""" state = load_trading_state() - equity_curve = state.get('equity_curve', []) + accounts = state.get('accounts', {}) + + result = {} + for tf_key, acc in accounts.items(): + if timeframe and tf_key != timeframe: + continue + equity_curve = acc.get('equity_curve', []) + result[tf_key] = equity_curve[-limit:] if limit > 0 else equity_curve + return { - 'total': len(equity_curve), - 'data': equity_curve[-limit:] if limit > 0 else equity_curve, + 'data': result, } @@ -134,51 +186,51 @@ async def get_signal(): """获取最新信号""" signal = load_latest_signal() - # 提取关键信息 agg = signal.get('aggregated_signal', {}) llm = agg.get('llm_signal', {}) - quant = agg.get('quantitative_signal', {}) market = signal.get('market_analysis', {}) + # 提取各周期机会 + opportunities = llm.get('opportunities', {}) + return { 'timestamp': agg.get('timestamp'), 'final_signal': agg.get('final_signal'), 'final_confidence': agg.get('final_confidence'), - 'consensus': agg.get('consensus'), - 'current_price': agg.get('levels', {}).get('current_price'), - 'llm': { - 'signal': llm.get('signal_type'), - 'confidence': llm.get('confidence'), - 'reasoning': llm.get('reasoning'), - 'opportunities': llm.get('opportunities', {}), - 'recommendations': llm.get('recommendations_by_timeframe', {}), - }, - 'quantitative': { - 'signal': quant.get('signal_type'), - 'confidence': quant.get('confidence'), - 'composite_score': quant.get('composite_score'), - 'scores': quant.get('scores', {}), - }, - 'market': { - 'price': market.get('price'), - 'trend': market.get('trend', {}), - 'momentum': market.get('momentum', {}), + 'current_price': agg.get('levels', {}).get('current_price') or market.get('price'), + 'opportunities': { + 'short': opportunities.get('short_term_5m_15m_1h') or opportunities.get('intraday'), + 'medium': opportunities.get('medium_term_4h_1d') or opportunities.get('swing'), + 'long': opportunities.get('long_term_1d_1w'), }, + 'reasoning': llm.get('reasoning'), + 'recommendations': llm.get('recommendations_by_timeframe', {}), } -@app.get("/api/position") -async def get_position(): - """获取当前持仓详情""" +@app.get("/api/timeframe/{timeframe}") +async def get_timeframe_detail(timeframe: str): + """获取单个周期详情""" state = load_trading_state() - position = state.get('position') + accounts = state.get('accounts', {}) - if not position: - return {'has_position': False, 'position': None} + if timeframe not in accounts: + return {"error": f"Timeframe '{timeframe}' not found"} + + acc = accounts[timeframe] + initial = acc.get('initial_balance', 0) + balance = acc.get('balance', 0) return { - 'has_position': position.get('side') != 'FLAT' and position.get('total_size', 0) > 0, - 'position': position, + 'timeframe': timeframe, + 'balance': balance, + 'initial_balance': initial, + 'return_pct': (balance - initial) / initial * 100 if initial > 0 else 0, + 'leverage': acc.get('leverage', 1), + 'position': acc.get('position'), + 'stats': acc.get('stats', {}), + 'recent_trades': acc.get('trades', [])[-20:], + 'equity_curve': acc.get('equity_curve', [])[-200:], } @@ -202,9 +254,8 @@ async def websocket_endpoint(websocket: WebSocket): last_signal_mtime = SIGNAL_FILE.stat().st_mtime if SIGNAL_FILE.exists() else 0 while True: - await asyncio.sleep(1) # 每秒检查一次 + await asyncio.sleep(1) - # 检查状态文件更新 current_state_mtime = STATE_FILE.stat().st_mtime if STATE_FILE.exists() else 0 current_signal_mtime = SIGNAL_FILE.stat().st_mtime if SIGNAL_FILE.exists() else 0 @@ -239,4 +290,4 @@ if static_dir.exists(): if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8080) + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/web/static/index.html b/web/static/index.html index 0617229..8d51866 100644 --- a/web/static/index.html +++ b/web/static/index.html @@ -11,22 +11,9 @@ tailwind.config = { theme: { extend: { - fontFamily: { - sans: ['Inter', 'system-ui', 'sans-serif'], - }, + fontFamily: { sans: ['Inter', 'system-ui', 'sans-serif'] }, colors: { - primary: { - 50: '#f0f9ff', - 100: '#e0f2fe', - 200: '#bae6fd', - 300: '#7dd3fc', - 400: '#38bdf8', - 500: '#0ea5e9', - 600: '#0284c7', - 700: '#0369a1', - 800: '#075985', - 900: '#0c4a6e', - }, + primary: { 400: '#38bdf8', 500: '#0ea5e9', 600: '#0284c7' }, success: '#10b981', danger: '#ef4444', warning: '#f59e0b', @@ -36,544 +23,393 @@ } -BTC/USDT Perpetual
+BTC/USDT Perpetual • Multi-Timeframe • Powered by Quantitative Analysis & AI
No Position
-Waiting for signal...
-Loading Signal
-Fetching latest analysis...
+| ID | -Side | -Entry | -Exit | -Size | -PnL | -Reason | -Time | +
|---|---|---|---|---|---|---|---|
| TF | +Side | +Entry | +Exit | +PnL | +Reason | +Time | |
|
-
- No trades yet - |
- |||||||
| No trades yet | |||||||