diff --git a/config/settings.py b/config/settings.py index 9608f89..2544654 100644 --- a/config/settings.py +++ b/config/settings.py @@ -2,6 +2,7 @@ Configuration settings for Signal Generation System Pure API mode - no Redis dependency """ +from typing import List from pydantic_settings import BaseSettings, SettingsConfigDict @@ -14,8 +15,14 @@ class Settings(BaseSettings): extra="ignore" # Ignore extra fields from environment ) - # Symbol Configuration - SYMBOL: str = "BTCUSDT" + # Symbol Configuration - 支持多币种 + SYMBOLS: str = "BTCUSDT,ETHUSDT" # 逗号分隔的交易对列表 + SYMBOL: str = "BTCUSDT" # 向后兼容,默认主币种 + + @property + def symbols_list(self) -> List[str]: + """解析币种列表""" + return [s.strip().upper() for s in self.SYMBOLS.split(',') if s.strip()] # Binance API Configuration BINANCE_API_BASE_URL: str = "https://fapi.binance.com" diff --git a/scheduler.py b/scheduler.py index 05fcad5..c3ab3f2 100644 --- a/scheduler.py +++ b/scheduler.py @@ -2,13 +2,17 @@ Signal Generation Scheduler - 定时生成交易信号 每隔指定时间间隔自动运行量化分析和LLM决策 +支持多币种: BTC/USDT, ETH/USDT 等 """ import asyncio import logging import signal import sys +import os +import json from datetime import datetime from pathlib import Path +from typing import Dict, Any, List # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent)) @@ -29,25 +33,31 @@ logger = logging.getLogger(__name__) class SignalScheduler: - """定时信号生成调度器""" + """定时信号生成调度器 - 支持多币种""" - def __init__(self, interval_minutes: int = 5): + def __init__(self, interval_minutes: int = 5, symbols: List[str] = None): """ Args: interval_minutes: 生成信号的时间间隔(分钟) + symbols: 交易对列表,如 ['BTCUSDT', 'ETHUSDT'] """ self.interval_minutes = interval_minutes self.is_running = False - # Initialize components - self.engine = MarketAnalysisEngine() - self.quant_generator = QuantitativeSignalGenerator() + # 支持多币种 + self.symbols = symbols or settings.symbols_list + logger.info(f"支持的交易对: {', '.join(self.symbols)}") - # Initialize LLM decision maker + # 为每个币种初始化分析引擎 + self.engines: Dict[str, MarketAnalysisEngine] = {} + for symbol in self.symbols: + self.engines[symbol] = MarketAnalysisEngine(symbol=symbol) + + # 共享组件 + self.quant_generator = QuantitativeSignalGenerator() self.llm_maker = LLMDecisionMaker(provider='openai') # Initialize DingTalk notifier - import os dingtalk_webhook = os.getenv('DINGTALK_WEBHOOK') dingtalk_secret = os.getenv('DINGTALK_SECRET') self.dingtalk = DingTalkNotifier( @@ -58,47 +68,52 @@ class SignalScheduler: logger.info(f"Signal Scheduler 初始化完成 - 每{interval_minutes}分钟生成一次信号") - async def generate_signal_once(self) -> dict: - """执行一次信号生成""" + async def generate_signal_for_symbol(self, symbol: str) -> Dict: + """为单个币种生成信号""" try: - logger.info("=" * 80) - logger.info(f"开始生成交易信号 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - logger.info("=" * 80) - - # Step 1: Market analysis - analysis = self.engine.analyze_current_market(timeframe='5m') - - if 'error' in analysis: - logger.warning(f"市场分析失败: {analysis['error']}") + engine = self.engines.get(symbol) + if not engine: + logger.error(f"未找到 {symbol} 的分析引擎") return None - logger.info(f"市场分析完成 - 价格: ${analysis['current_price']:,.2f}, 趋势: {analysis['trend_analysis'].get('direction')}") + logger.info(f"[{symbol}] 开始分析...") + + # Step 1: Market analysis + analysis = engine.analyze_current_market(timeframe='5m') + + if 'error' in analysis: + logger.warning(f"[{symbol}] 市场分析失败: {analysis['error']}") + return None + + logger.info(f"[{symbol}] 价格: ${analysis['current_price']:,.2f}, 趋势: {analysis['trend_analysis'].get('direction')}") # Step 2: Quantitative signal quant_signal = self.quant_generator.generate_signal(analysis) - logger.info(f"量化信号: {quant_signal['signal_type']} (得分: {quant_signal['composite_score']:.1f})") + logger.info(f"[{symbol}] 量化信号: {quant_signal['signal_type']} (得分: {quant_signal['composite_score']:.1f})") # Step 3: LLM decision - llm_signal = None - llm_context = self.engine.get_llm_context(format='full') + llm_context = engine.get_llm_context(format='full') llm_signal = self.llm_maker.generate_decision(llm_context, analysis) if llm_signal.get('enabled', True): - logger.info(f"LLM信号: {llm_signal['signal_type']} (置信度: {llm_signal.get('confidence', 0):.2%})") + logger.info(f"[{symbol}] LLM信号: {llm_signal['signal_type']} (置信度: {llm_signal.get('confidence', 0):.2%})") else: - logger.info("LLM未启用 (无API key)") + logger.info(f"[{symbol}] LLM未启用") # Step 4: Aggregate signals aggregated = SignalAggregator.aggregate_signals(quant_signal, llm_signal) + aggregated['symbol'] = symbol # 添加币种标识 - logger.info(f"最终信号: {aggregated['final_signal']} (置信度: {aggregated['final_confidence']:.2%})") + logger.info(f"[{symbol}] 最终信号: {aggregated['final_signal']} (置信度: {aggregated['final_confidence']:.2%})") - # Step 5: Save to file - output_file = Path(__file__).parent / 'output' / 'latest_signal.json' - output_file.parent.mkdir(exist_ok=True) + # Step 5: Save to file (每个币种独立文件) + output_dir = Path(__file__).parent / 'output' + output_dir.mkdir(exist_ok=True) - import json + # 保存独立信号文件 + symbol_file = output_dir / f'signal_{symbol.lower()}.json' output_data = { + 'symbol': symbol, 'timestamp': datetime.now().isoformat(), 'aggregated_signal': aggregated, 'market_analysis': { @@ -110,52 +125,88 @@ class SignalScheduler: 'llm_signal': llm_signal if llm_signal and llm_signal.get('enabled', True) else None, } - with open(output_file, 'w') as f: + with open(symbol_file, 'w') as f: json.dump(output_data, f, indent=2, ensure_ascii=False) - logger.info(f"信号已保存到: {output_file}") - - # Step 6: Send DingTalk notification - try: - final_signal = aggregated.get('final_signal', 'HOLD') - - should_notify = False - notify_reason = "" - - if final_signal in ['BUY', 'SELL']: - should_notify = True - notify_reason = f"明确{final_signal}信号" - elif final_signal == 'HOLD': - # 检查是否有日内机会 - llm_signal = aggregated.get('llm_signal') - if llm_signal and isinstance(llm_signal, dict): - opportunities = llm_signal.get('opportunities', {}) - short_term = opportunities.get('short_term_5m_15m_1h', {}) - if short_term.get('exists', False): - should_notify = True - direction = short_term.get('direction', 'N/A') - notify_reason = f"HOLD信号,但存在短期{direction}机会" - - if should_notify: - logger.info(f"发送钉钉通知 - {notify_reason}") - sent = self.dingtalk.send_signal(aggregated) - if sent: - logger.info(f"钉钉通知发送成功") - else: - logger.warning(f"钉钉通知发送失败或未配置") - else: - logger.info(f"HOLD信号且无日内机会,跳过钉钉通知") - except Exception as e: - logger.error(f"钉钉通知发送异常: {e}", exc_info=True) - - logger.info("=" * 80) - - return aggregated + return output_data except Exception as e: - logger.error(f"信号生成失败: {e}", exc_info=True) + logger.error(f"[{symbol}] 信号生成失败: {e}", exc_info=True) return None + async def generate_signal_once(self) -> Dict: + """为所有币种生成信号""" + logger.info("=" * 80) + logger.info(f"开始生成交易信号 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"交易对: {', '.join(self.symbols)}") + logger.info("=" * 80) + + all_signals = {} + + # 为每个币种生成信号 + for symbol in self.symbols: + result = await self.generate_signal_for_symbol(symbol) + if result: + all_signals[symbol] = result + + # 保存汇总信号文件 (latest_signal.json 保持向后兼容,使用第一个币种) + if all_signals: + # 合并所有信号到一个文件 + combined_file = Path(__file__).parent / 'output' / 'latest_signals.json' + with open(combined_file, 'w') as f: + json.dump({ + 'timestamp': datetime.now().isoformat(), + 'symbols': all_signals, + }, f, indent=2, ensure_ascii=False) + + # 向后兼容: latest_signal.json 使用第一个币种 + first_symbol = self.symbols[0] + if first_symbol in all_signals: + compat_file = Path(__file__).parent / 'output' / 'latest_signal.json' + with open(compat_file, 'w') as f: + json.dump(all_signals[first_symbol], f, indent=2, ensure_ascii=False) + + # Step 6: Send DingTalk notification for signals with opportunities + for symbol, signal_data in all_signals.items(): + await self._send_notification(symbol, signal_data) + + logger.info("=" * 80) + return all_signals + + async def _send_notification(self, symbol: str, signal_data: Dict): + """发送钉钉通知""" + try: + aggregated = signal_data.get('aggregated_signal', {}) + final_signal = aggregated.get('final_signal', 'HOLD') + + should_notify = False + notify_reason = "" + + if final_signal in ['BUY', 'SELL']: + should_notify = True + notify_reason = f"[{symbol}] 明确{final_signal}信号" + elif final_signal == 'HOLD': + llm_signal = aggregated.get('llm_signal') + if llm_signal and isinstance(llm_signal, dict): + opportunities = llm_signal.get('opportunities', {}) + short_term = opportunities.get('short_term_5m_15m_1h', {}) + if short_term.get('exists', False): + should_notify = True + direction = short_term.get('direction', 'N/A') + notify_reason = f"[{symbol}] 存在短期{direction}机会" + + if should_notify: + logger.info(f"发送钉钉通知 - {notify_reason}") + # 在信号中添加币种信息 + aggregated['symbol'] = symbol + sent = self.dingtalk.send_signal(aggregated) + if sent: + logger.info(f"[{symbol}] 钉钉通知发送成功") + else: + logger.warning(f"[{symbol}] 钉钉通知发送失败或未配置") + except Exception as e: + logger.error(f"[{symbol}] 钉钉通知发送异常: {e}", exc_info=True) + async def run(self): """启动调度器主循环""" self.is_running = True diff --git a/trading/paper_trading.py b/trading/paper_trading.py index 83af38a..9b83662 100644 --- a/trading/paper_trading.py +++ b/trading/paper_trading.py @@ -1,12 +1,12 @@ """ -Paper Trading Module - 多周期独立仓位管理 +Paper Trading Module - 多币种多周期独立仓位管理 -支持三个独立周期的模拟交易: +支持多币种 (BTC/USDT, ETH/USDT 等) 和三个独立周期的模拟交易: - 短周期 (5m/15m/1h): short_term_5m_15m_1h / intraday - 中周期 (4h/1d): medium_term_4h_1d / swing - 长周期 (1d/1w): long_term_1d_1w -每个周期独立管理: +每个币种的每个周期独立管理: - 独立仓位 - 独立止盈止损 - 独立统计数据 @@ -20,6 +20,8 @@ from pathlib import Path from dataclasses import dataclass, asdict, field from enum import Enum +from config.settings import settings + logger = logging.getLogger(__name__) @@ -162,18 +164,22 @@ class Trade: pnl: float pnl_pct: float exit_reason: str + symbol: str = "BTCUSDT" # 交易币种 def to_dict(self) -> dict: return asdict(self) @classmethod def from_dict(cls, data: dict) -> 'Trade': + # 兼容旧数据 + if 'symbol' not in data: + data['symbol'] = 'BTCUSDT' return cls(**data) @dataclass class TimeFrameAccount: - """单个周期的账户 + """单个币种单个周期的账户 资金结构: - initial_balance: 初始本金 @@ -187,6 +193,7 @@ class TimeFrameAccount: timeframe: str initial_balance: float leverage: int + symbol: str = "BTCUSDT" # 交易币种 realized_pnl: float = 0.0 # 已实现盈亏 position: Optional[Position] = None trades: List[Trade] = field(default_factory=list) @@ -228,6 +235,7 @@ class TimeFrameAccount: def to_dict(self) -> dict: return { 'timeframe': self.timeframe, + 'symbol': self.symbol, 'initial_balance': self.initial_balance, 'realized_pnl': self.realized_pnl, 'leverage': self.leverage, @@ -249,6 +257,7 @@ class TimeFrameAccount: timeframe=data['timeframe'], initial_balance=data['initial_balance'], leverage=data['leverage'], + symbol=data.get('symbol', 'BTCUSDT'), # 兼容旧数据 realized_pnl=realized_pnl, stats=data.get('stats', {}), equity_curve=data.get('equity_curve', []), @@ -260,28 +269,33 @@ class TimeFrameAccount: class MultiTimeframePaperTrader: - """多周期模拟盘交易器""" + """多币种多周期模拟盘交易器""" def __init__( self, initial_balance: float = 10000.0, - state_file: str = None + state_file: str = None, + symbols: List[str] = None ): self.initial_balance = initial_balance + # 支持的币种列表 + self.symbols = symbols or settings.symbols_list + logger.info(f"支持的交易对: {', '.join(self.symbols)}") + # 状态文件 if state_file: self.state_file = Path(state_file) else: self.state_file = Path(__file__).parent.parent / 'output' / 'paper_trading_state.json' - # 初始化三个周期账户 - self.accounts: Dict[TimeFrame, TimeFrameAccount] = {} + # 多币种多周期账户: {symbol: {TimeFrame: TimeFrameAccount}} + self.accounts: Dict[str, Dict[TimeFrame, TimeFrameAccount]] = {} # 加载或初始化状态 self._load_state() - logger.info(f"Multi-timeframe Paper Trader initialized: total_balance=${initial_balance:.2f}") + logger.info(f"Multi-symbol Multi-timeframe Paper Trader initialized: {len(self.symbols)} symbols") def _load_state(self): """加载持久化状态""" @@ -290,13 +304,33 @@ class MultiTimeframePaperTrader: with open(self.state_file, 'r') as f: state = json.load(f) - # 加载各周期账户 - for tf in TimeFrame: - tf_data = state.get('accounts', {}).get(tf.value) - if tf_data: - self.accounts[tf] = TimeFrameAccount.from_dict(tf_data) - else: - self._init_account(tf) + # 检查是否是新的多币种格式 + if 'symbols' in state: + # 新格式: {symbols: {BTCUSDT: {short: {...}, medium: {...}, long: {...}}, ...}} + for symbol in self.symbols: + symbol_data = state.get('symbols', {}).get(symbol, {}) + self.accounts[symbol] = {} + for tf in TimeFrame: + tf_data = symbol_data.get(tf.value) + if tf_data: + self.accounts[symbol][tf] = TimeFrameAccount.from_dict(tf_data) + else: + self._init_account(symbol, tf) + else: + # 旧格式: {accounts: {short: {...}, medium: {...}, long: {...}}} + # 将旧数据迁移到第一个币种 (BTCUSDT) + first_symbol = self.symbols[0] if self.symbols else 'BTCUSDT' + self.accounts[first_symbol] = {} + for tf in TimeFrame: + tf_data = state.get('accounts', {}).get(tf.value) + if tf_data: + tf_data['symbol'] = first_symbol # 添加 symbol 字段 + self.accounts[first_symbol][tf] = TimeFrameAccount.from_dict(tf_data) + else: + self._init_account(first_symbol, tf) + # 初始化其他币种 + for symbol in self.symbols[1:]: + self._init_symbol_accounts(symbol) logger.info(f"Loaded state from {self.state_file}") except Exception as e: @@ -306,18 +340,25 @@ class MultiTimeframePaperTrader: self._init_all_accounts() def _init_all_accounts(self): - """初始化所有账户""" - for tf in TimeFrame: - self._init_account(tf) + """初始化所有币种所有周期账户""" + for symbol in self.symbols: + self._init_symbol_accounts(symbol) - def _init_account(self, tf: TimeFrame): - """初始化单个周期账户""" + def _init_symbol_accounts(self, symbol: str): + """初始化单个币种的所有周期账户""" + self.accounts[symbol] = {} + for tf in TimeFrame: + self._init_account(symbol, tf) + + def _init_account(self, symbol: str, tf: TimeFrame): + """初始化单个币种单个周期账户""" config = TIMEFRAME_CONFIG[tf] - # 每个周期独立初始资金 10000 USD,10倍杠杆,最大仓位价值 100000 USD - self.accounts[tf] = TimeFrameAccount( + # 每个币种每个周期独立初始资金 10000 USD,10倍杠杆,最大仓位价值 100000 USD + self.accounts[symbol][tf] = TimeFrameAccount( timeframe=tf.value, initial_balance=config['initial_balance'], leverage=config['leverage'], + symbol=symbol, realized_pnl=0.0, ) @@ -325,34 +366,95 @@ class MultiTimeframePaperTrader: """保存状态到文件""" self.state_file.parent.mkdir(parents=True, exist_ok=True) + # 新格式: {symbols: {BTCUSDT: {short: {...}, ...}, ETHUSDT: {...}}, accounts: {...}} + symbols_data = {} + for symbol, tf_accounts in self.accounts.items(): + symbols_data[symbol] = { + tf.value: acc.to_dict() for tf, acc in tf_accounts.items() + } + + # 同时保留旧格式兼容 (使用第一个币种) + first_symbol = self.symbols[0] if self.symbols else 'BTCUSDT' + legacy_accounts = {} + if first_symbol in self.accounts: + legacy_accounts = { + tf.value: acc.to_dict() for tf, acc in self.accounts[first_symbol].items() + } + state = { - 'accounts': {tf.value: acc.to_dict() for tf, acc in self.accounts.items()}, + 'symbols': symbols_data, + 'accounts': legacy_accounts, # 向后兼容 'last_updated': datetime.now().isoformat(), } with open(self.state_file, 'w') as f: json.dump(state, f, indent=2, ensure_ascii=False) - def process_signal(self, signal: Dict[str, Any], current_price: float) -> Dict[str, Any]: - """处理交易信号 - 检查所有周期""" + def process_signal( + self, + signal: Dict[str, Any], + current_price: float, + symbol: str = None + ) -> Dict[str, Any]: + """处理单个币种的交易信号 - 检查所有周期 + + Args: + signal: 该币种的信号数据 + current_price: 该币种当前价格 + symbol: 交易对,如 'BTCUSDT'。若未指定则使用第一个币种 + """ + symbol = symbol or (self.symbols[0] if self.symbols else 'BTCUSDT') + + # 确保该币种的账户已初始化 + if symbol not in self.accounts: + self._init_symbol_accounts(symbol) + results = { 'timestamp': datetime.now().isoformat(), + 'symbol': symbol, 'current_price': current_price, 'timeframes': {}, } for tf in TimeFrame: - result = self._process_timeframe_signal(tf, signal, current_price) + result = self._process_timeframe_signal(symbol, tf, signal, current_price) results['timeframes'][tf.value] = result self._save_state() return results - def _process_timeframe_signal( - self, tf: TimeFrame, signal: Dict[str, Any], current_price: float + def process_all_signals( + self, + signals: Dict[str, Dict[str, Any]], + prices: Dict[str, float] ) -> Dict[str, Any]: - """处理单个周期的信号""" - account = self.accounts[tf] + """处理所有币种的信号 + + Args: + signals: {symbol: signal_data} 各币种的信号 + prices: {symbol: price} 各币种的当前价格 + """ + results = { + 'timestamp': datetime.now().isoformat(), + 'symbols': {}, + } + + for symbol in self.symbols: + if symbol in signals and symbol in prices: + result = self.process_signal( + signal=signals[symbol], + current_price=prices[symbol], + symbol=symbol + ) + results['symbols'][symbol] = result + + return results + + def _process_timeframe_signal( + self, symbol: str, tf: TimeFrame, signal: Dict[str, Any], current_price: float + ) -> Dict[str, Any]: + """处理单个币种单个周期的信号""" + account = self.accounts[symbol][tf] config = TIMEFRAME_CONFIG[tf] result = { @@ -361,11 +463,11 @@ class MultiTimeframePaperTrader: } # 更新权益曲线 - self._update_equity_curve(tf, current_price) + self._update_equity_curve(symbol, tf, current_price) # 1. 检查止盈止损 if account.position and account.position.side != 'FLAT': - close_result = self._check_close_position(tf, current_price) + close_result = self._check_close_position(symbol, tf, current_price) if close_result: result['action'] = 'CLOSE' result['details'] = close_result @@ -417,17 +519,17 @@ class MultiTimeframePaperTrader: # 反向信号:只平仓不开反向仓 if (account.position.side == 'LONG' and direction == 'SHORT') or \ (account.position.side == 'SHORT' and direction == 'LONG'): - close_result = self._close_position(tf, current_price, 'SIGNAL_REVERSE') + close_result = self._close_position(symbol, tf, current_price, 'SIGNAL_REVERSE') result['action'] = 'CLOSE' result['details'] = close_result logger.info( - f"[{config['name']}] 反向信号平仓,等待下一周期新信号" + f"[{symbol}][{config['name']}] 反向信号平仓,等待下一周期新信号" ) return result else: # 同方向信号:尝试金字塔加仓 add_result = self._add_position( - tf, current_price, + symbol, tf, current_price, signal_stop_loss, signal_take_profit, tf_signal.get('reasoning', '')[:100] ) @@ -439,14 +541,14 @@ class MultiTimeframePaperTrader: result['action'] = 'HOLD' result['details'] = { 'position': account.position.to_dict(), - 'unrealized_pnl': self._calc_unrealized_pnl(tf, current_price), + 'unrealized_pnl': self._calc_unrealized_pnl(symbol, tf, current_price), 'reason': '已达最大仓位层级', } return result # 4. 无持仓,开新仓(首仓) open_result = self._open_position( - tf, direction, current_price, + symbol, tf, direction, current_price, signal_stop_loss, signal_take_profit, tf_signal.get('reasoning', '')[:100] ) @@ -488,28 +590,28 @@ class MultiTimeframePaperTrader: logger.error(f"Error extracting signal: {e}") return None - def _get_max_position_value(self, tf: TimeFrame) -> float: + def _get_max_position_value(self, symbol: str, tf: TimeFrame) -> float: """获取最大仓位价值(本金 × 杠杆)""" - account = self.accounts[tf] + account = self.accounts[symbol][tf] return account.initial_balance * account.leverage - def _get_current_position_value(self, tf: TimeFrame, current_price: float) -> float: + def _get_current_position_value(self, symbol: str, tf: TimeFrame, current_price: float) -> float: """获取当前仓位价值""" - account = self.accounts[tf] + account = self.accounts[symbol][tf] if not account.position or account.position.side == 'FLAT': return 0.0 return account.position.size * current_price def _open_position( - self, tf: TimeFrame, direction: str, price: float, + self, symbol: str, tf: TimeFrame, direction: str, price: float, stop_loss: float, take_profit: float, reasoning: str ) -> Optional[Dict]: """开首仓(金字塔第一层)""" - account = self.accounts[tf] + account = self.accounts[symbol][tf] config = TIMEFRAME_CONFIG[tf] # 计算首仓仓位:最大仓位 × 首仓比例 - max_position_value = self._get_max_position_value(tf) + max_position_value = self._get_max_position_value(symbol, tf) first_level_ratio = PYRAMID_LEVELS[0] # 40% position_value = max_position_value * first_level_ratio margin = position_value / account.leverage @@ -518,7 +620,7 @@ class MultiTimeframePaperTrader: # 检查可用余额是否足够 available_balance = account.get_available_balance() if available_balance < margin: - logger.warning(f"[{config['name']}] 可用余额不足: ${available_balance:.2f} < ${margin:.2f}") + logger.warning(f"[{symbol}][{config['name']}] 可用余额不足: ${available_balance:.2f} < ${margin:.2f}") return None if size <= 0: @@ -542,13 +644,17 @@ class MultiTimeframePaperTrader: signal_reasoning=reasoning, ) + # 确定单位名称 + unit = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol + logger.info( - f"[{config['name']}] OPEN {direction} [L1/{len(PYRAMID_LEVELS)}]: price=${price:.2f}, " - f"size={size:.6f} BTC, margin=${margin:.2f}, value=${position_value:.2f}, " + f"[{symbol}][{config['name']}] OPEN {direction} [L1/{len(PYRAMID_LEVELS)}]: price=${price:.2f}, " + f"size={size:.6f} {unit}, margin=${margin:.2f}, value=${position_value:.2f}, " f"SL=${stop_loss:.2f}, TP=${take_profit:.2f}" ) return { + 'symbol': symbol, 'timeframe': tf.value, 'side': direction, 'entry_price': price, @@ -562,11 +668,11 @@ class MultiTimeframePaperTrader: } def _add_position( - self, tf: TimeFrame, price: float, + self, symbol: str, tf: TimeFrame, price: float, stop_loss: float, take_profit: float, reasoning: str ) -> Optional[Dict]: """金字塔加仓""" - account = self.accounts[tf] + account = self.accounts[symbol][tf] config = TIMEFRAME_CONFIG[tf] pos = account.position @@ -576,11 +682,11 @@ class MultiTimeframePaperTrader: # 检查是否已达最大层级 current_level = pos.pyramid_level if current_level >= len(PYRAMID_LEVELS): - logger.info(f"[{config['name']}] 已达最大仓位层级 {current_level}/{len(PYRAMID_LEVELS)}") + logger.info(f"[{symbol}][{config['name']}] 已达最大仓位层级 {current_level}/{len(PYRAMID_LEVELS)}") return None # 计算加仓仓位 - max_position_value = self._get_max_position_value(tf) + max_position_value = self._get_max_position_value(symbol, tf) level_ratio = PYRAMID_LEVELS[current_level] add_position_value = max_position_value * level_ratio add_margin = add_position_value / account.leverage @@ -590,7 +696,7 @@ class MultiTimeframePaperTrader: available_balance = account.get_available_balance() if available_balance < add_margin: logger.warning( - f"[{config['name']}] 加仓余额不足: ${available_balance:.2f} < ${add_margin:.2f}" + f"[{symbol}][{config['name']}] 加仓余额不足: ${available_balance:.2f} < ${add_margin:.2f}" ) return None @@ -608,15 +714,19 @@ class MultiTimeframePaperTrader: pos.stop_loss = stop_loss pos.take_profit = take_profit + # 确定单位名称 + unit = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol + new_level = pos.pyramid_level logger.info( - f"[{config['name']}] ADD {pos.side} [L{new_level}/{len(PYRAMID_LEVELS)}]: price=${price:.2f}, " - f"add_size={add_size:.6f} BTC, add_margin=${add_margin:.2f}, " - f"total_size={pos.size:.6f} BTC, total_margin=${pos.margin:.2f}, " + f"[{symbol}][{config['name']}] ADD {pos.side} [L{new_level}/{len(PYRAMID_LEVELS)}]: price=${price:.2f}, " + f"add_size={add_size:.6f} {unit}, add_margin=${add_margin:.2f}, " + f"total_size={pos.size:.6f} {unit}, total_margin=${pos.margin:.2f}, " f"avg_price=${pos.entry_price:.2f}" ) return { + 'symbol': symbol, 'timeframe': tf.value, 'side': pos.side, 'add_price': price, @@ -632,9 +742,9 @@ class MultiTimeframePaperTrader: 'take_profit': take_profit, } - def _check_close_position(self, tf: TimeFrame, current_price: float) -> Optional[Dict]: + def _check_close_position(self, symbol: str, tf: TimeFrame, current_price: float) -> Optional[Dict]: """检查是否触发止盈止损""" - account = self.accounts[tf] + account = self.accounts[symbol][tf] pos = account.position if not pos or pos.side == 'FLAT': @@ -642,20 +752,20 @@ class MultiTimeframePaperTrader: if pos.side == 'LONG': if current_price >= pos.take_profit: - return self._close_position(tf, current_price, 'TAKE_PROFIT') + return self._close_position(symbol, tf, current_price, 'TAKE_PROFIT') elif current_price <= pos.stop_loss: - return self._close_position(tf, current_price, 'STOP_LOSS') + return self._close_position(symbol, tf, current_price, 'STOP_LOSS') else: # SHORT if current_price <= pos.take_profit: - return self._close_position(tf, current_price, 'TAKE_PROFIT') + return self._close_position(symbol, tf, current_price, 'TAKE_PROFIT') elif current_price >= pos.stop_loss: - return self._close_position(tf, current_price, 'STOP_LOSS') + return self._close_position(symbol, tf, current_price, 'STOP_LOSS') return None - def _close_position(self, tf: TimeFrame, price: float, reason: str) -> Dict: + def _close_position(self, symbol: str, tf: TimeFrame, price: float, reason: str) -> Dict: """平仓""" - account = self.accounts[tf] + account = self.accounts[symbol][tf] config = TIMEFRAME_CONFIG[tf] pos = account.position @@ -678,7 +788,7 @@ class MultiTimeframePaperTrader: # 记录交易 trade = Trade( - id=f"{tf.value[0].upper()}{len(account.trades)+1:04d}", + id=f"{symbol[0]}{tf.value[0].upper()}{len(account.trades)+1:04d}", timeframe=tf.value, side=pos.side, entry_price=pos.entry_price, @@ -689,14 +799,16 @@ class MultiTimeframePaperTrader: pnl=pnl, pnl_pct=pnl_pct, exit_reason=reason, + symbol=symbol, ) account.trades.append(trade) - self._update_stats(tf, trade) + self._update_stats(symbol, tf, trade) # 计算新的账户权益 new_equity = account.get_equity() result = { + 'symbol': symbol, 'timeframe': tf.value, 'side': pos.side, 'entry_price': pos.entry_price, @@ -711,7 +823,7 @@ class MultiTimeframePaperTrader: } logger.info( - f"[{config['name']}] CLOSE {pos.side}: entry=${pos.entry_price:.2f}, " + f"[{symbol}][{config['name']}] CLOSE {pos.side}: entry=${pos.entry_price:.2f}, " f"exit=${price:.2f}, PnL=${pnl:.2f} ({pnl_pct:.2f}%), reason={reason}, " f"equity=${new_equity:.2f}" ) @@ -719,9 +831,9 @@ class MultiTimeframePaperTrader: account.position = None return result - def _calc_unrealized_pnl(self, tf: TimeFrame, current_price: float) -> Dict[str, float]: + def _calc_unrealized_pnl(self, symbol: str, tf: TimeFrame, current_price: float) -> Dict[str, float]: """计算未实现盈亏""" - account = self.accounts[tf] + account = self.accounts[symbol][tf] pos = account.position if not pos or pos.side == 'FLAT': @@ -738,10 +850,10 @@ class MultiTimeframePaperTrader: return {'pnl': pnl, 'pnl_pct': pnl_pct} - def _update_equity_curve(self, tf: TimeFrame, current_price: float): + def _update_equity_curve(self, symbol: str, tf: TimeFrame, current_price: float): """更新权益曲线""" - account = self.accounts[tf] - unrealized = self._calc_unrealized_pnl(tf, current_price) + account = self.accounts[symbol][tf] + unrealized = self._calc_unrealized_pnl(symbol, tf, current_price) equity = account.get_equity(unrealized['pnl']) account.equity_curve.append({ @@ -753,9 +865,9 @@ class MultiTimeframePaperTrader: 'price': current_price, }) - def _update_stats(self, tf: TimeFrame, trade: Trade): + def _update_stats(self, symbol: str, tf: TimeFrame, trade: Trade): """更新统计数据""" - account = self.accounts[tf] + account = self.accounts[symbol][tf] stats = account.stats stats['total_trades'] += 1 @@ -789,21 +901,43 @@ class MultiTimeframePaperTrader: if drawdown > stats['max_drawdown']: stats['max_drawdown'] = drawdown - def get_status(self, current_price: float = None) -> Dict[str, Any]: - """获取所有周期状态""" + def get_status( + self, + current_price: float = None, + symbol: str = None, + prices: Dict[str, float] = None + ) -> Dict[str, Any]: + """获取状态 + + Args: + current_price: 单币种价格(向后兼容) + symbol: 指定币种(若为空则返回所有) + prices: 多币种价格 {symbol: price} + """ + # 如果指定了单个币种 + if symbol: + return self._get_symbol_status(symbol, current_price or (prices.get(symbol) if prices else None)) + + # 返回所有币种汇总 + return self._get_all_status(prices or {self.symbols[0]: current_price} if current_price else {}) + + def _get_symbol_status(self, symbol: str, current_price: float = None) -> Dict[str, Any]: + """获取单个币种所有周期状态""" + if symbol not in self.accounts: + return {'error': f'Symbol {symbol} not found'} + total_equity = 0 total_initial = 0 total_realized_pnl = 0 total_unrealized_pnl = 0 - # 先计算各周期数据 timeframes_data = {} for tf in TimeFrame: - account = self.accounts[tf] + account = self.accounts[symbol][tf] config = TIMEFRAME_CONFIG[tf] # 计算未实现盈亏 - unrealized = self._calc_unrealized_pnl(tf, current_price) if current_price else {'pnl': 0, 'pnl_pct': 0} + unrealized = self._calc_unrealized_pnl(symbol, tf, current_price) if current_price else {'pnl': 0, 'pnl_pct': 0} equity = account.get_equity(unrealized['pnl']) total_initial += account.initial_balance @@ -811,12 +945,12 @@ class MultiTimeframePaperTrader: total_unrealized_pnl += unrealized['pnl'] total_equity += equity - # 收益率 = (权益 - 初始本金) / 初始本金 return_pct = (equity - account.initial_balance) / account.initial_balance * 100 if account.initial_balance > 0 else 0 tf_status = { 'name': config['name'], 'name_en': config['name_en'], + 'symbol': symbol, 'initial_balance': account.initial_balance, 'realized_pnl': account.realized_pnl, 'unrealized_pnl': unrealized['pnl'], @@ -841,11 +975,11 @@ class MultiTimeframePaperTrader: timeframes_data[tf.value] = tf_status - # 总收益率 total_return = (total_equity - total_initial) / total_initial * 100 if total_initial > 0 else 0 - status = { + return { 'timestamp': datetime.now().isoformat(), + 'symbol': symbol, 'total_initial_balance': total_initial, 'total_realized_pnl': total_realized_pnl, 'total_unrealized_pnl': total_unrealized_pnl, @@ -854,13 +988,67 @@ class MultiTimeframePaperTrader: 'timeframes': timeframes_data, } - return status + def _get_all_status(self, prices: Dict[str, float] = None) -> Dict[str, Any]: + """获取所有币种汇总状态""" + prices = prices or {} - def reset(self): - """重置所有账户""" - self._init_all_accounts() + grand_total_equity = 0 + grand_total_initial = 0 + grand_total_realized_pnl = 0 + grand_total_unrealized_pnl = 0 + + symbols_data = {} + for symbol in self.symbols: + if symbol not in self.accounts: + continue + + current_price = prices.get(symbol) + symbol_status = self._get_symbol_status(symbol, current_price) + + symbols_data[symbol] = symbol_status + + grand_total_initial += symbol_status.get('total_initial_balance', 0) + grand_total_realized_pnl += symbol_status.get('total_realized_pnl', 0) + grand_total_unrealized_pnl += symbol_status.get('total_unrealized_pnl', 0) + grand_total_equity += symbol_status.get('total_equity', 0) + + grand_total_return = (grand_total_equity - grand_total_initial) / grand_total_initial * 100 if grand_total_initial > 0 else 0 + + # 向后兼容:保留 timeframes 字段(使用第一个币种) + first_symbol = self.symbols[0] if self.symbols else None + legacy_timeframes = symbols_data.get(first_symbol, {}).get('timeframes', {}) if first_symbol else {} + + return { + 'timestamp': datetime.now().isoformat(), + 'symbols': symbols_data, + 'timeframes': legacy_timeframes, # 向后兼容 + 'grand_total_initial_balance': grand_total_initial, + 'grand_total_realized_pnl': grand_total_realized_pnl, + 'grand_total_unrealized_pnl': grand_total_unrealized_pnl, + 'grand_total_equity': grand_total_equity, + 'grand_total_return': grand_total_return, + # 向后兼容字段 + 'total_initial_balance': grand_total_initial, + 'total_realized_pnl': grand_total_realized_pnl, + 'total_unrealized_pnl': grand_total_unrealized_pnl, + 'total_equity': grand_total_equity, + 'total_return': grand_total_return, + } + + def reset(self, symbol: str = None): + """重置账户 + + Args: + symbol: 指定币种,若为空则重置所有 + """ + if symbol: + if symbol in self.accounts: + self._init_symbol_accounts(symbol) + logger.info(f"{symbol} accounts reset") + else: + self._init_all_accounts() + logger.info("All accounts reset") self._save_state() - logger.info("All accounts reset") # 兼容旧的 PaperTrader 接口 diff --git a/web/api.py b/web/api.py index e9e4dc1..67178e5 100644 --- a/web/api.py +++ b/web/api.py @@ -1,10 +1,11 @@ """ -FastAPI Web Service - 多周期交易状态展示 API +FastAPI Web Service - 多币种多周期交易状态展示 API """ import json import asyncio import urllib.request import ssl +import sys from datetime import datetime from pathlib import Path from typing import Dict, Any, List, Optional @@ -13,45 +14,74 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) +from config.settings import settings + # 状态文件路径 STATE_FILE = Path(__file__).parent.parent / 'output' / 'paper_trading_state.json' SIGNAL_FILE = Path(__file__).parent.parent / 'output' / 'latest_signal.json' +SIGNALS_FILE = Path(__file__).parent.parent / 'output' / 'latest_signals.json' -# Binance API -BINANCE_PRICE_URL = "https://fapi.binance.com/fapi/v1/ticker/price?symbol=BTCUSDT" +# 支持的币种列表 +SYMBOLS = settings.symbols_list + +# Binance API - 多币种价格 +BINANCE_PRICE_BASE_URL = "https://fapi.binance.com/fapi/v1/ticker/price" app = FastAPI(title="Trading Dashboard", version="2.0.0") -# 全局价格缓存 -_current_price: float = 0.0 +# 全局价格缓存 - 多币种 +_current_prices: Dict[str, float] = {} _price_update_time: datetime = None -async def fetch_binance_price() -> Optional[float]: - """从 Binance 获取实时价格(使用标准库)""" - global _current_price, _price_update_time +async def fetch_binance_prices() -> Dict[str, float]: + """从 Binance 获取所有币种实时价格""" + global _current_prices, _price_update_time try: - # 使用线程池执行同步请求,避免阻塞事件循环 loop = asyncio.get_event_loop() - price = await loop.run_in_executor(None, _fetch_price_sync) - if price: - _current_price = price + prices = await loop.run_in_executor(None, _fetch_prices_sync) + if prices: + _current_prices.update(prices) _price_update_time = datetime.now() - return _current_price + return _current_prices except Exception as e: - print(f"Error fetching Binance price: {type(e).__name__}: {e}") - return _current_price if _current_price > 0 else None + print(f"Error fetching Binance prices: {type(e).__name__}: {e}") + return _current_prices -def _fetch_price_sync() -> Optional[float]: - """同步获取价格""" +async def fetch_binance_price(symbol: str = 'BTCUSDT') -> Optional[float]: + """从 Binance 获取单个币种实时价格(向后兼容)""" + prices = await fetch_binance_prices() + return prices.get(symbol) + + +def _fetch_prices_sync() -> Dict[str, float]: + """同步获取所有币种价格""" + prices = {} try: - # 创建 SSL 上下文 ctx = ssl.create_default_context() - req = urllib.request.Request( - BINANCE_PRICE_URL, - headers={'User-Agent': 'Mozilla/5.0'} - ) + for symbol in SYMBOLS: + try: + url = f"{BINANCE_PRICE_BASE_URL}?symbol={symbol}" + req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) + with urllib.request.urlopen(req, timeout=5, context=ctx) as response: + data = json.loads(response.read().decode('utf-8')) + prices[symbol] = float(data['price']) + except Exception as e: + print(f"Fetch {symbol} price error: {type(e).__name__}: {e}") + except Exception as e: + print(f"Sync fetch error: {type(e).__name__}: {e}") + return prices + + +def _fetch_price_sync(symbol: str = 'BTCUSDT') -> Optional[float]: + """同步获取单个币种价格(向后兼容)""" + try: + ctx = ssl.create_default_context() + url = f"{BINANCE_PRICE_BASE_URL}?symbol={symbol}" + req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) with urllib.request.urlopen(req, timeout=5, context=ctx) as response: data = json.loads(response.read().decode('utf-8')) return float(data['price']) @@ -92,20 +122,26 @@ def load_trading_state() -> Dict[str, Any]: except Exception as e: print(f"Error loading state: {e}") - # 返回默认状态 + # 返回默认状态 - 多币种格式 + default_symbols = {} + for symbol in SYMBOLS: + default_symbols[symbol] = { + 'short': _default_account('short', 10000, symbol), + 'medium': _default_account('medium', 10000, symbol), + 'long': _default_account('long', 10000, symbol), + } + return { - 'accounts': { - 'short': _default_account('short', 10000), - 'medium': _default_account('medium', 10000), - 'long': _default_account('long', 10000), - }, + 'symbols': default_symbols, + 'accounts': default_symbols.get(SYMBOLS[0], {}) if SYMBOLS else {}, # 向后兼容 'last_updated': None, } -def _default_account(timeframe: str, initial_balance: float) -> Dict: +def _default_account(timeframe: str, initial_balance: float, symbol: str = 'BTCUSDT') -> Dict: return { 'timeframe': timeframe, + 'symbol': symbol, 'initial_balance': initial_balance, 'realized_pnl': 0.0, 'leverage': 10, # 所有周期统一 10 倍杠杆 @@ -124,12 +160,36 @@ def _default_account(timeframe: str, initial_balance: float) -> Dict: } -def load_latest_signal() -> Dict[str, Any]: - """加载最新信号""" +def load_latest_signal(symbol: str = None) -> Dict[str, Any]: + """加载最新信号 + + Args: + symbol: 指定币种,若为空则加载所有 + """ try: - if SIGNAL_FILE.exists(): - with open(SIGNAL_FILE, 'r') as f: - return json.load(f) + if symbol: + # 加载单个币种信号 + symbol_file = Path(__file__).parent.parent / 'output' / f'signal_{symbol.lower()}.json' + if symbol_file.exists(): + with open(symbol_file, 'r') as f: + return json.load(f) + # 降级到旧文件 + elif SIGNAL_FILE.exists(): + with open(SIGNAL_FILE, 'r') as f: + return json.load(f) + else: + # 加载所有币种信号 + if SIGNALS_FILE.exists(): + with open(SIGNALS_FILE, 'r') as f: + return json.load(f) + elif SIGNAL_FILE.exists(): + with open(SIGNAL_FILE, 'r') as f: + data = json.load(f) + # 转换为新格式 + return { + 'timestamp': data.get('timestamp'), + 'symbols': {SYMBOLS[0] if SYMBOLS else 'BTCUSDT': data} + } except Exception as e: print(f"Error loading signal: {e}") return {} @@ -145,29 +205,143 @@ async def root(): @app.get("/api/status") -async def get_status(): - """获取多周期交易状态""" +async def get_status(symbol: str = None): + """获取多币种多周期交易状态 + + Args: + symbol: 指定币种(可选),若为空则返回所有币种汇总 + """ state = load_trading_state() + + # 检查是否是新的多币种格式 + if 'symbols' in state: + return _get_multi_symbol_status(state, symbol) + else: + # 旧格式兼容 + return _get_legacy_status(state) + + +def _get_multi_symbol_status(state: Dict, symbol: str = None) -> Dict: + """处理多币种状态""" + symbols_data = state.get('symbols', {}) + + if symbol: + # 返回单个币种状态 + if symbol not in symbols_data: + return {"error": f"Symbol '{symbol}' not found"} + return _build_symbol_status(symbol, symbols_data[symbol], state.get('last_updated')) + + # 返回所有币种汇总 + grand_total_initial = 0 + grand_total_realized_pnl = 0 + grand_total_equity = 0 + + all_symbols_status = {} + for sym, accounts in symbols_data.items(): + sym_status = _build_symbol_status(sym, accounts, None) + all_symbols_status[sym] = sym_status + + grand_total_initial += sym_status.get('total_initial_balance', 0) + grand_total_realized_pnl += sym_status.get('total_realized_pnl', 0) + grand_total_equity += sym_status.get('total_equity', 0) + + grand_total_return = (grand_total_equity - grand_total_initial) / grand_total_initial * 100 if grand_total_initial > 0 else 0 + + # 向后兼容:保留 timeframes 字段 + first_symbol = SYMBOLS[0] if SYMBOLS else None + legacy_timeframes = all_symbols_status.get(first_symbol, {}).get('timeframes', {}) if first_symbol else {} + + return { + 'timestamp': datetime.now().isoformat(), + 'symbols': all_symbols_status, + 'supported_symbols': SYMBOLS, + 'timeframes': legacy_timeframes, # 向后兼容 + 'grand_total_initial_balance': grand_total_initial, + 'grand_total_realized_pnl': grand_total_realized_pnl, + 'grand_total_equity': grand_total_equity, + 'grand_total_return': grand_total_return, + # 向后兼容字段 + 'total_initial_balance': grand_total_initial, + 'total_realized_pnl': grand_total_realized_pnl, + 'total_equity': grand_total_equity, + 'total_return': grand_total_return, + 'last_updated': state.get('last_updated'), + } + + +def _build_symbol_status(symbol: str, accounts: Dict, last_updated: str = None) -> Dict: + """构建单个币种的状态""" + total_initial = 0 + total_realized_pnl = 0 + total_equity = 0 + + timeframes = {} + for tf_key, acc in accounts.items(): + initial = acc.get('initial_balance', 0) + realized_pnl = acc.get('realized_pnl', 0) + + if 'realized_pnl' not in acc and 'balance' in acc: + realized_pnl = acc['balance'] - initial + + equity = initial + realized_pnl + + position = acc.get('position') + used_margin = position.get('margin', 0) if position else 0 + available_balance = equity - used_margin + + total_initial += initial + total_realized_pnl += realized_pnl + total_equity += equity + + return_pct = (equity - initial) / initial * 100 if initial > 0 else 0 + + timeframes[tf_key] = { + 'name': '短周期' if tf_key == 'short' else '中周期' if tf_key == 'medium' else '长周期', + 'name_en': 'Short-term' if tf_key == 'short' else 'Medium-term' if tf_key == 'medium' else 'Long-term', + 'symbol': symbol, + 'initial_balance': initial, + 'realized_pnl': realized_pnl, + 'equity': equity, + 'available_balance': available_balance, + 'used_margin': used_margin, + 'return_pct': return_pct, + 'leverage': acc.get('leverage', 10), + 'position': position, + 'stats': acc.get('stats', {}), + } + + total_return = (total_equity - total_initial) / total_initial * 100 if total_initial > 0 else 0 + + return { + 'timestamp': datetime.now().isoformat(), + 'symbol': symbol, + 'total_initial_balance': total_initial, + 'total_realized_pnl': total_realized_pnl, + 'total_equity': total_equity, + 'total_return': total_return, + 'timeframes': timeframes, + 'last_updated': last_updated, + } + + +def _get_legacy_status(state: Dict) -> Dict: + """处理旧格式状态(向后兼容)""" accounts = state.get('accounts', {}) total_initial = 0 total_realized_pnl = 0 total_equity = 0 - # 构建各周期状态 timeframes = {} for tf_key, acc in accounts.items(): initial = acc.get('initial_balance', 0) realized_pnl = acc.get('realized_pnl', 0) - # 兼容旧数据格式 if 'realized_pnl' not in acc and 'balance' in acc: realized_pnl = acc['balance'] - initial - # 计算权益(不含未实现盈亏,因为 API 没有实时价格) equity = initial + realized_pnl - # 检查持仓的保证金 position = acc.get('position') used_margin = position.get('margin', 0) if position else 0 available_balance = equity - used_margin @@ -206,17 +380,41 @@ async def get_status(): @app.get("/api/trades") -async def get_trades(timeframe: str = None, limit: int = 50): - """获取交易记录""" +async def get_trades(symbol: str = None, timeframe: str = None, limit: int = 50): + """获取交易记录 + + Args: + symbol: 指定币种(可选) + timeframe: 指定周期(可选) + limit: 返回数量限制 + """ state = load_trading_state() - accounts = state.get('accounts', {}) all_trades = [] - for tf_key, acc in accounts.items(): - if timeframe and tf_key != timeframe: - continue - trades = acc.get('trades', []) - all_trades.extend(trades) + + # 检查是否是新的多币种格式 + if 'symbols' in state: + symbols_data = state.get('symbols', {}) + for sym, accounts in symbols_data.items(): + if symbol and sym != symbol: + continue + for tf_key, acc in accounts.items(): + if timeframe and tf_key != timeframe: + continue + trades = acc.get('trades', []) + # 确保每个交易都有 symbol 字段 + for trade in trades: + if 'symbol' not in trade: + trade['symbol'] = sym + all_trades.extend(trades) + else: + # 旧格式 + accounts = state.get('accounts', {}) + for tf_key, acc in accounts.items(): + if timeframe and tf_key != timeframe: + continue + trades = acc.get('trades', []) + all_trades.extend(trades) # 按时间排序 all_trades.sort(key=lambda x: x.get('exit_time', ''), reverse=True) @@ -228,17 +426,39 @@ async def get_trades(timeframe: str = None, limit: int = 50): @app.get("/api/equity") -async def get_equity_curve(timeframe: str = None, limit: int = 500): - """获取权益曲线""" +async def get_equity_curve(symbol: str = None, timeframe: str = None, limit: int = 500): + """获取权益曲线 + + Args: + symbol: 指定币种(可选) + timeframe: 指定周期(可选) + limit: 返回数量限制 + """ state = load_trading_state() - accounts = state.get('accounts', {}) result = {} - for tf_key, acc in accounts.items(): - if timeframe and tf_key != timeframe: - continue - equity_curve = acc.get('equity_curve', []) - result[tf_key] = equity_curve[-limit:] if limit > 0 else equity_curve + + if 'symbols' in state: + symbols_data = state.get('symbols', {}) + for sym, accounts in symbols_data.items(): + if symbol and sym != symbol: + continue + sym_result = {} + for tf_key, acc in accounts.items(): + if timeframe and tf_key != timeframe: + continue + equity_curve = acc.get('equity_curve', []) + sym_result[tf_key] = equity_curve[-limit:] if limit > 0 else equity_curve + if sym_result: + result[sym] = sym_result + else: + # 旧格式 + accounts = state.get('accounts', {}) + for tf_key, acc in accounts.items(): + if timeframe and tf_key != timeframe: + continue + equity_curve = acc.get('equity_curve', []) + result[tf_key] = equity_curve[-limit:] if limit > 0 else equity_curve return { 'data': result, @@ -246,19 +466,46 @@ async def get_equity_curve(timeframe: str = None, limit: int = 500): @app.get("/api/signal") -async def get_signal(): - """获取最新信号""" - signal = load_latest_signal() +async def get_signal(symbol: str = None): + """获取最新信号 + Args: + symbol: 指定币种(可选),若为空则返回所有 + """ + if symbol: + # 加载单个币种信号 + signal = load_latest_signal(symbol) + return _format_signal_response(signal, symbol) + else: + # 加载所有币种信号 + all_signals = load_latest_signal() + + if 'symbols' in all_signals: + # 新的多币种格式 + result = { + 'timestamp': all_signals.get('timestamp'), + 'symbols': {}, + 'supported_symbols': SYMBOLS, + } + for sym, sig_data in all_signals.get('symbols', {}).items(): + result['symbols'][sym] = _format_signal_response(sig_data, sym) + return result + else: + # 旧格式 + return _format_signal_response(all_signals, SYMBOLS[0] if SYMBOLS else 'BTCUSDT') + + +def _format_signal_response(signal: Dict, symbol: str) -> Dict: + """格式化信号响应""" agg = signal.get('aggregated_signal', {}) llm = agg.get('llm_signal', {}) market = signal.get('market_analysis', {}) - # 提取各周期机会 opportunities = llm.get('opportunities', {}) return { - 'timestamp': agg.get('timestamp'), + 'symbol': symbol, + 'timestamp': signal.get('timestamp') or agg.get('timestamp'), 'final_signal': agg.get('final_signal'), 'final_confidence': agg.get('final_confidence'), 'current_price': agg.get('levels', {}).get('current_price') or market.get('price'), @@ -273,24 +520,40 @@ async def get_signal(): @app.get("/api/timeframe/{timeframe}") -async def get_timeframe_detail(timeframe: str): - """获取单个周期详情""" +async def get_timeframe_detail(timeframe: str, symbol: str = None): + """获取单个周期详情 + + Args: + timeframe: 周期 (short, medium, long) + symbol: 指定币种(可选),默认第一个 + """ state = load_trading_state() - accounts = state.get('accounts', {}) + symbol = symbol or (SYMBOLS[0] if SYMBOLS else 'BTCUSDT') + + if 'symbols' in state: + symbols_data = state.get('symbols', {}) + if symbol not in symbols_data: + return {"error": f"Symbol '{symbol}' not found"} + accounts = symbols_data[symbol] + else: + accounts = state.get('accounts', {}) if timeframe not in accounts: return {"error": f"Timeframe '{timeframe}' not found"} acc = accounts[timeframe] initial = acc.get('initial_balance', 0) - balance = acc.get('balance', 0) + realized_pnl = acc.get('realized_pnl', 0) + equity = initial + realized_pnl return { + 'symbol': symbol, 'timeframe': timeframe, - 'balance': balance, + 'equity': equity, 'initial_balance': initial, - 'return_pct': (balance - initial) / initial * 100 if initial > 0 else 0, - 'leverage': acc.get('leverage', 1), + 'realized_pnl': realized_pnl, + 'return_pct': (equity - initial) / initial * 100 if initial > 0 else 0, + 'leverage': acc.get('leverage', 10), 'position': acc.get('position'), 'stats': acc.get('stats', {}), 'recent_trades': acc.get('trades', [])[-20:], @@ -298,14 +561,25 @@ async def get_timeframe_detail(timeframe: str): } +@app.get("/api/prices") +async def get_prices(): + """获取所有币种实时价格""" + prices = await fetch_binance_prices() + return { + 'timestamp': datetime.now().isoformat(), + 'prices': prices, + 'supported_symbols': SYMBOLS, + } + + @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): - """WebSocket 实时推送""" + """WebSocket 实时推送 - 支持多币种""" await manager.connect(websocket) try: - # 获取初始实时价格 - current_price = await fetch_binance_price() + # 获取所有币种初始实时价格 + current_prices = await fetch_binance_prices() # 发送初始状态 state = load_trading_state() @@ -314,32 +588,41 @@ async def websocket_endpoint(websocket: WebSocket): 'type': 'init', 'state': state, 'signal': signal, - 'current_price': current_price, + 'prices': current_prices, + 'current_price': current_prices.get(SYMBOLS[0]) if SYMBOLS else None, # 向后兼容 + 'supported_symbols': SYMBOLS, }) # 持续推送更新 last_state_mtime = STATE_FILE.stat().st_mtime if STATE_FILE.exists() else 0 last_signal_mtime = SIGNAL_FILE.stat().st_mtime if SIGNAL_FILE.exists() else 0 - last_price = current_price - price_update_counter = 0 + last_signals_mtime = SIGNALS_FILE.stat().st_mtime if SIGNALS_FILE.exists() else 0 + last_prices = current_prices.copy() while True: await asyncio.sleep(1) - price_update_counter += 1 - # 每秒获取实时价格并推送 - current_price = await fetch_binance_price() - if current_price and current_price != last_price: - last_price = current_price + # 每秒获取所有币种实时价格并推送 + current_prices = await fetch_binance_prices() + price_changed = False + for sym, price in current_prices.items(): + if price and price != last_prices.get(sym): + price_changed = True + break + + if price_changed: + last_prices = current_prices.copy() await websocket.send_json({ 'type': 'price_update', - 'current_price': current_price, + 'prices': current_prices, + 'current_price': current_prices.get(SYMBOLS[0]) if SYMBOLS else None, # 向后兼容 'timestamp': datetime.now().isoformat(), }) # 检查状态文件更新 current_state_mtime = STATE_FILE.stat().st_mtime if STATE_FILE.exists() else 0 current_signal_mtime = SIGNAL_FILE.stat().st_mtime if SIGNAL_FILE.exists() else 0 + current_signals_mtime = SIGNALS_FILE.stat().st_mtime if SIGNALS_FILE.exists() else 0 if current_state_mtime > last_state_mtime: last_state_mtime = current_state_mtime @@ -349,8 +632,12 @@ async def websocket_endpoint(websocket: WebSocket): 'state': state, }) - if current_signal_mtime > last_signal_mtime: + # 检查信号文件更新(新格式或旧格式) + signal_updated = (current_signal_mtime > last_signal_mtime or + current_signals_mtime > last_signals_mtime) + if signal_updated: last_signal_mtime = current_signal_mtime + last_signals_mtime = current_signals_mtime signal = load_latest_signal() await websocket.send_json({ 'type': 'signal_update', diff --git a/web/static/index.html b/web/static/index.html index ccb667b..d602f67 100644 --- a/web/static/index.html +++ b/web/static/index.html @@ -71,7 +71,7 @@

AI Quant Trading

-

BTC/USDT Perpetual • Multi-Timeframe • Powered by Quantitative Analysis & AI

+

Multi-Symbol • Multi-Timeframe • Powered by Quantitative Analysis & AI

@@ -82,20 +82,29 @@
- +
+ +
+ + +
Total Balance
-
$30,000.00
+
$60,000.00
Total Return
+0.00%
-
Price
+
BTC Price
$0.00
@@ -229,6 +238,7 @@ + @@ -239,7 +249,7 @@ - +
Symbol TF Side Entry
No trades yet
No trades yet
@@ -257,10 +267,14 @@