This commit is contained in:
aaron 2025-12-09 23:35:46 +08:00
parent 020808f69f
commit 151c9cb6fd
3 changed files with 652 additions and 98 deletions

View File

@ -0,0 +1,332 @@
"""
Volatility Monitor - 实时监控价格波动率
当波动率突然升高时触发分析防止漏掉突发行情
"""
import asyncio
import logging
import json
import urllib.request
import ssl
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, field
from collections import deque
logger = logging.getLogger(__name__)
@dataclass
class PricePoint:
"""价格数据点"""
timestamp: datetime
price: float
volume: float = 0.0
@dataclass
class VolatilityState:
"""单个币种的波动率状态"""
symbol: str
prices: deque = field(default_factory=lambda: deque(maxlen=120)) # 保存最近120个数据点
last_trigger_time: datetime = None
baseline_volatility: float = 0.0 # 基准波动率
current_volatility: float = 0.0 # 当前波动率
def add_price(self, price: float, volume: float = 0.0):
"""添加价格点"""
self.prices.append(PricePoint(
timestamp=datetime.now(),
price=price,
volume=volume
))
def get_prices_in_window(self, minutes: int) -> List[float]:
"""获取指定时间窗口内的价格"""
cutoff = datetime.now() - timedelta(minutes=minutes)
return [p.price for p in self.prices if p.timestamp >= cutoff]
class VolatilityMonitor:
"""
波动率监控器
监控策略:
1. 价格变化率: 短期价格变化超过阈值
2. 波动率突增: 当前波动率 vs 基准波动率
3. 连续大幅波动: 连续多个周期出现大幅波动
"""
# Binance API
BINANCE_PRICE_URL = "https://fapi.binance.com/fapi/v1/ticker/price"
BINANCE_TICKER_URL = "https://fapi.binance.com/fapi/v1/ticker/24hr"
def __init__(
self,
symbols: List[str],
on_volatility_spike: Callable[[str, Dict], Any] = None,
check_interval: int = 5, # 检查间隔(秒)
price_change_threshold: float = 0.5, # 价格变化阈值 0.5%
volatility_multiplier: float = 2.0, # 波动率突增倍数
cooldown_minutes: int = 3, # 触发后冷却时间(分钟)
):
"""
Args:
symbols: 监控的交易对列表
on_volatility_spike: 波动率突增时的回调函数
check_interval: 检查间隔
price_change_threshold: 价格变化阈值百分比
volatility_multiplier: 波动率突增倍数阈值
cooldown_minutes: 触发后冷却时间分钟
"""
self.symbols = symbols
self.on_volatility_spike = on_volatility_spike
self.check_interval = check_interval
self.price_change_threshold = price_change_threshold
self.volatility_multiplier = volatility_multiplier
self.cooldown_minutes = cooldown_minutes
# 每个币种的状态
self.states: Dict[str, VolatilityState] = {
symbol: VolatilityState(symbol=symbol) for symbol in symbols
}
self.is_running = False
self._ssl_context = ssl.create_default_context()
logger.info(
f"VolatilityMonitor 初始化: {len(symbols)} 个币种, "
f"检查间隔 {check_interval}s, 价格阈值 {price_change_threshold}%, "
f"波动率倍数 {volatility_multiplier}x, 冷却时间 {cooldown_minutes}min"
)
async def start(self):
"""启动监控"""
self.is_running = True
logger.info("VolatilityMonitor 启动")
# 先获取初始价格建立基准
await self._initialize_baseline()
while self.is_running:
try:
await self._check_volatility()
await asyncio.sleep(self.check_interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"波动率检查错误: {e}", exc_info=True)
await asyncio.sleep(self.check_interval)
logger.info("VolatilityMonitor 已停止")
def stop(self):
"""停止监控"""
self.is_running = False
async def _initialize_baseline(self):
"""初始化基准波动率"""
logger.info("正在建立波动率基准...")
# 获取24小时统计数据来估算基准波动率
for symbol in self.symbols:
try:
ticker = await self._fetch_24h_ticker(symbol)
if ticker:
# 使用24小时价格变化百分比作为基准
price_change_pct = abs(float(ticker.get('priceChangePercent', 0)))
# 转换为每分钟的预期波动率
self.states[symbol].baseline_volatility = price_change_pct / (24 * 60) * 5 # 5分钟窗口
logger.info(
f"[{symbol}] 24h变化: {price_change_pct:.2f}%, "
f"基准波动率: {self.states[symbol].baseline_volatility:.4f}%/5min"
)
except Exception as e:
logger.error(f"[{symbol}] 获取基准数据失败: {e}")
self.states[symbol].baseline_volatility = 0.1 # 默认值
# 收集一些初始价格数据
for _ in range(6): # 收集30秒的数据
prices = await self._fetch_prices()
for symbol, price in prices.items():
if symbol in self.states:
self.states[symbol].add_price(price)
await asyncio.sleep(5)
logger.info("波动率基准建立完成")
async def _check_volatility(self):
"""检查所有币种的波动率"""
prices = await self._fetch_prices()
for symbol, price in prices.items():
if symbol not in self.states:
continue
state = self.states[symbol]
state.add_price(price)
# 检查是否在冷却期
if state.last_trigger_time:
elapsed = (datetime.now() - state.last_trigger_time).total_seconds() / 60
if elapsed < self.cooldown_minutes:
continue
# 检查波动率
spike_info = self._detect_volatility_spike(state)
if spike_info:
logger.warning(
f"[{symbol}] 波动率突增! "
f"类型: {spike_info['type']}, "
f"变化: {spike_info['change_pct']:.2f}%, "
f"当前价: ${price:,.2f}"
)
state.last_trigger_time = datetime.now()
# 触发回调
if self.on_volatility_spike:
try:
result = self.on_volatility_spike(symbol, spike_info)
if asyncio.iscoroutine(result):
await result
except Exception as e:
logger.error(f"[{symbol}] 波动率回调执行失败: {e}")
def _detect_volatility_spike(self, state: VolatilityState) -> Optional[Dict]:
"""
检测波动率突增
返回:
触发信息字典如果未触发返回 None
"""
if len(state.prices) < 10:
return None
prices = [p.price for p in state.prices]
current_price = prices[-1]
# 1. 检查短期价格变化 (1分钟)
prices_1min = state.get_prices_in_window(1)
if len(prices_1min) >= 2:
min_price = min(prices_1min)
max_price = max(prices_1min)
change_1min = (max_price - min_price) / min_price * 100
if change_1min >= self.price_change_threshold:
direction = "UP" if current_price >= prices_1min[0] else "DOWN"
return {
'type': 'rapid_move',
'timeframe': '1min',
'change_pct': change_1min,
'direction': direction,
'current_price': current_price,
'trigger_time': datetime.now().isoformat(),
}
# 2. 检查中期价格变化 (5分钟)
prices_5min = state.get_prices_in_window(5)
if len(prices_5min) >= 10:
min_price = min(prices_5min)
max_price = max(prices_5min)
change_5min = (max_price - min_price) / min_price * 100
# 5分钟窗口使用更高的阈值
if change_5min >= self.price_change_threshold * 2:
direction = "UP" if current_price >= prices_5min[0] else "DOWN"
return {
'type': 'sustained_move',
'timeframe': '5min',
'change_pct': change_5min,
'direction': direction,
'current_price': current_price,
'trigger_time': datetime.now().isoformat(),
}
# 3. 检查波动率突增
if len(prices_5min) >= 10 and state.baseline_volatility > 0:
# 计算当前波动率 (标准差 / 均值 * 100)
import statistics
mean_price = statistics.mean(prices_5min)
std_price = statistics.stdev(prices_5min)
current_volatility = (std_price / mean_price) * 100
state.current_volatility = current_volatility
if current_volatility >= state.baseline_volatility * self.volatility_multiplier:
return {
'type': 'volatility_spike',
'timeframe': '5min',
'change_pct': current_volatility,
'baseline_pct': state.baseline_volatility,
'multiplier': current_volatility / state.baseline_volatility,
'current_price': current_price,
'trigger_time': datetime.now().isoformat(),
}
return None
async def _fetch_prices(self) -> Dict[str, float]:
"""获取所有币种的当前价格"""
prices = {}
loop = asyncio.get_event_loop()
try:
for symbol in self.symbols:
price = await loop.run_in_executor(None, self._fetch_price_sync, symbol)
if price:
prices[symbol] = price
except Exception as e:
logger.error(f"获取价格失败: {e}")
return prices
def _fetch_price_sync(self, symbol: str) -> Optional[float]:
"""同步获取单个币种价格"""
try:
url = f"{self.BINANCE_PRICE_URL}?symbol={symbol}"
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response:
data = json.loads(response.read().decode('utf-8'))
return float(data['price'])
except Exception as e:
logger.debug(f"获取 {symbol} 价格失败: {e}")
return None
async def _fetch_24h_ticker(self, symbol: str) -> Optional[Dict]:
"""获取24小时统计数据"""
loop = asyncio.get_event_loop()
try:
return await loop.run_in_executor(None, self._fetch_24h_ticker_sync, symbol)
except Exception as e:
logger.error(f"获取 {symbol} 24h数据失败: {e}")
return None
def _fetch_24h_ticker_sync(self, symbol: str) -> Optional[Dict]:
"""同步获取24小时统计"""
try:
url = f"{self.BINANCE_TICKER_URL}?symbol={symbol}"
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=5, context=self._ssl_context) as response:
return json.loads(response.read().decode('utf-8'))
except Exception as e:
logger.debug(f"获取 {symbol} 24h数据失败: {e}")
return None
def get_status(self) -> Dict[str, Any]:
"""获取监控状态"""
status = {
'is_running': self.is_running,
'symbols': {},
}
for symbol, state in self.states.items():
prices = [p.price for p in state.prices]
status['symbols'][symbol] = {
'price_count': len(prices),
'current_price': prices[-1] if prices else None,
'baseline_volatility': state.baseline_volatility,
'current_volatility': state.current_volatility,
'last_trigger': state.last_trigger_time.isoformat() if state.last_trigger_time else None,
}
return status

View File

@ -3,6 +3,10 @@ Signal Generation Scheduler - 定时生成交易信号
每隔指定时间间隔自动运行量化分析和LLM决策
支持多币种: BTC/USDT, ETH/USDT
特性:
- 定时分析: 每隔指定时间自动分析
- 波动率触发: 当价格波动率突增时自动触发分析
"""
import asyncio
import logging
@ -19,6 +23,7 @@ sys.path.insert(0, str(Path(__file__).parent))
from config.settings import settings
from analysis.engine import MarketAnalysisEngine
from analysis.volatility_monitor import VolatilityMonitor
from signals.quantitative import QuantitativeSignalGenerator
from signals.llm_decision import LLMDecisionMaker
from signals.aggregator import SignalAggregator
@ -33,16 +38,27 @@ logger = logging.getLogger(__name__)
class SignalScheduler:
"""定时信号生成调度器 - 支持多币种"""
"""定时信号生成调度器 - 支持多币种 + 波动率触发"""
def __init__(self, interval_minutes: int = 5, symbols: List[str] = None):
def __init__(
self,
interval_minutes: int = 5,
symbols: List[str] = None,
enable_volatility_trigger: bool = True,
volatility_threshold: float = 0.5, # 波动率阈值 0.5%
volatility_cooldown: int = 3, # 波动率触发冷却时间(分钟)
):
"""
Args:
interval_minutes: 生成信号的时间间隔分钟
symbols: 交易对列表 ['BTCUSDT', 'ETHUSDT']
enable_volatility_trigger: 是否启用波动率触发
volatility_threshold: 波动率触发阈值百分比
volatility_cooldown: 波动率触发后的冷却时间分钟
"""
self.interval_minutes = interval_minutes
self.is_running = False
self.enable_volatility_trigger = enable_volatility_trigger
# 支持多币种
self.symbols = symbols or settings.symbols_list
@ -66,7 +82,26 @@ class SignalScheduler:
enabled=bool(dingtalk_webhook)
)
logger.info(f"Signal Scheduler 初始化完成 - 每{interval_minutes}分钟生成一次信号")
# 波动率监控器
self.volatility_monitor = None
if enable_volatility_trigger:
self.volatility_monitor = VolatilityMonitor(
symbols=self.symbols,
on_volatility_spike=self._on_volatility_spike,
check_interval=5, # 每5秒检查一次
price_change_threshold=volatility_threshold,
volatility_multiplier=2.0,
cooldown_minutes=volatility_cooldown,
)
# 用于追踪触发的分析任务
self._pending_analysis: Dict[str, bool] = {}
self._analysis_lock = asyncio.Lock()
logger.info(
f"Signal Scheduler 初始化完成 - 定时: 每{interval_minutes}分钟, "
f"波动率触发: {'启用' if enable_volatility_trigger else '禁用'}"
)
async def generate_signal_for_symbol(self, symbol: str) -> Dict:
"""为单个币种生成信号"""
@ -207,29 +242,84 @@ class SignalScheduler:
except Exception as e:
logger.error(f"[{symbol}] 钉钉通知发送异常: {e}", exc_info=True)
async def _on_volatility_spike(self, symbol: str, spike_info: Dict):
"""波动率突增时的回调处理"""
logger.info(
f"🚨 [{symbol}] 波动率触发! "
f"类型: {spike_info['type']}, "
f"变化: {spike_info['change_pct']:.2f}%, "
f"方向: {spike_info.get('direction', 'N/A')}"
)
# 避免重复触发
async with self._analysis_lock:
if self._pending_analysis.get(symbol):
logger.info(f"[{symbol}] 已有分析任务在执行,跳过")
return
self._pending_analysis[symbol] = True
try:
# 为该币种生成信号
logger.info(f"🔄 [{symbol}] 波动率触发分析开始...")
result = await self.generate_signal_for_symbol(symbol)
if result:
# 保存独立信号文件(已在 generate_signal_for_symbol 中完成)
# 发送通知(标记为波动率触发)
aggregated = result.get('aggregated_signal', {})
aggregated['trigger_type'] = 'volatility_spike'
aggregated['spike_info'] = spike_info
await self._send_notification(symbol, result)
logger.info(f"✅ [{symbol}] 波动率触发分析完成")
else:
logger.warning(f"[{symbol}] 波动率触发分析失败")
finally:
async with self._analysis_lock:
self._pending_analysis[symbol] = False
async def run(self):
"""启动调度器主循环"""
self.is_running = True
logger.info(f"Signal Scheduler 启动 - 每{self.interval_minutes}分钟生成信号")
# 立即生成一次
await self.generate_signal_once()
# 启动波动率监控器(如果启用)
volatility_task = None
if self.volatility_monitor:
volatility_task = asyncio.create_task(self.volatility_monitor.start())
logger.info("波动率监控器已启动")
# 定时循环
while self.is_running:
try:
# 等待指定时间间隔
await asyncio.sleep(self.interval_minutes * 60)
try:
# 立即生成一次
await self.generate_signal_once()
# 生成信号
await self.generate_signal_once()
# 定时循环
while self.is_running:
try:
# 等待指定时间间隔
await asyncio.sleep(self.interval_minutes * 60)
except asyncio.CancelledError:
logger.info("调度器收到取消信号")
break
except Exception as e:
logger.error(f"调度器错误: {e}", exc_info=True)
await asyncio.sleep(60) # 错误后等待1分钟再继续
# 生成信号
await self.generate_signal_once()
except asyncio.CancelledError:
logger.info("调度器收到取消信号")
break
except Exception as e:
logger.error(f"调度器错误: {e}", exc_info=True)
await asyncio.sleep(60) # 错误后等待1分钟再继续
finally:
# 停止波动率监控器
if self.volatility_monitor:
self.volatility_monitor.stop()
if volatility_task:
volatility_task.cancel()
try:
await volatility_task
except asyncio.CancelledError:
pass
logger.info("Signal Scheduler 已停止")
@ -240,11 +330,27 @@ class SignalScheduler:
async def main():
"""主入口"""
# 从环境变量或默认值获取间隔
import os
# 从环境变量获取配置
interval = int(os.getenv('SIGNAL_INTERVAL_MINUTES', '5'))
enable_volatility = os.getenv('ENABLE_VOLATILITY_TRIGGER', 'true').lower() == 'true'
volatility_threshold = float(os.getenv('VOLATILITY_THRESHOLD', '0.5'))
volatility_cooldown = int(os.getenv('VOLATILITY_COOLDOWN_MINUTES', '3'))
scheduler = SignalScheduler(interval_minutes=interval)
logger.info("=" * 60)
logger.info("Signal Scheduler 配置:")
logger.info(f" 定时间隔: {interval} 分钟")
logger.info(f" 波动率触发: {'启用' if enable_volatility else '禁用'}")
if enable_volatility:
logger.info(f" 波动率阈值: {volatility_threshold}%")
logger.info(f" 触发冷却: {volatility_cooldown} 分钟")
logger.info("=" * 60)
scheduler = SignalScheduler(
interval_minutes=interval,
enable_volatility_trigger=enable_volatility,
volatility_threshold=volatility_threshold,
volatility_cooldown=volatility_cooldown,
)
# Setup signal handlers for graceful shutdown
def signal_handler(sig, _frame):

View File

@ -2,6 +2,9 @@
Realtime Trading - 基于 WebSocket 实时数据的多周期交易
使用 Binance WebSocket 获取实时价格结合信号进行多周期独立交易
支持多币种: BTC/USDT, ETH/USDT
每个币种每个周期独立:
- 短周期 (5m/15m/1h)
- 中周期 (4h/1d)
- 长周期 (1d/1w)
@ -12,21 +15,22 @@ import logging
import signal
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Callable
from typing import Dict, Any, Optional, Callable, List
import websockets
from .paper_trading import MultiTimeframePaperTrader, TimeFrame, TIMEFRAME_CONFIG
from config.settings import settings
logger = logging.getLogger(__name__)
class RealtimeTrader:
"""实时多周期交易器"""
"""实时多周期交易器 - 支持多币种"""
def __init__(
self,
symbol: str = "btcusdt",
symbols: List[str] = None,
initial_balance: float = 10000.0,
signal_check_interval: int = 60,
):
@ -34,44 +38,56 @@ class RealtimeTrader:
初始化实时交易器
Args:
symbol: 交易对 (小写)
initial_balance: 初始资金 (分配给三个周期)
symbols: 交易对列表 ['BTCUSDT', 'ETHUSDT']
initial_balance: 每个周期的初始资金
signal_check_interval: 信号检查间隔()
"""
self.symbol = symbol.lower()
# 支持多币种
self.symbols = symbols or settings.symbols_list
self.signal_check_interval = signal_check_interval
# WebSocket URL
self.ws_url = f"wss://fstream.binance.com/ws/{self.symbol}@aggTrade"
# WebSocket URLs - 多币种
self.ws_streams = [f"{sym.lower()}@aggTrade" for sym in self.symbols]
self.ws_url = f"wss://fstream.binance.com/stream?streams={'/'.join(self.ws_streams)}"
# 多周期交易器
self.trader = MultiTimeframePaperTrader(initial_balance=initial_balance)
# 多币种多周期交易器
self.trader = MultiTimeframePaperTrader(
initial_balance=initial_balance,
symbols=self.symbols
)
# 状态
self.current_price = 0.0
# 状态 - 多币种价格
self.current_prices: Dict[str, float] = {sym: 0.0 for sym in self.symbols}
self.last_signal_check = 0
self.is_running = False
self.ws = None
# 信号文件路径
self.signal_file = Path(__file__).parent.parent / 'output' / 'latest_signal.json'
self.signal_dir = Path(__file__).parent.parent / 'output'
self.signal_file = self.signal_dir / 'latest_signal.json' # 向后兼容
# 回调函数
self.on_trade_callback: Optional[Callable] = None
self.on_price_callback: Optional[Callable] = None
logger.info(f"RealtimeTrader 初始化: {len(self.symbols)} 个币种")
async def start(self):
"""启动实时交易"""
self.is_running = True
logger.info(f"Starting multi-timeframe realtime trader for {self.symbol.upper()}")
logger.info(f"Starting multi-symbol multi-timeframe realtime trader")
logger.info(f"Symbols: {', '.join(self.symbols)}")
logger.info(f"WebSocket URL: {self.ws_url}")
logger.info(f"Signal check interval: {self.signal_check_interval}s")
for tf in TimeFrame:
config = TIMEFRAME_CONFIG[tf]
account = self.trader.accounts[tf]
equity = account.get_equity()
logger.info(f" [{config['name_en']}] Equity: ${equity:.2f}, Leverage: {account.leverage}x")
# 打印各币种各周期状态
for symbol in self.symbols:
logger.info(f" [{symbol}]:")
for tf in TimeFrame:
config = TIMEFRAME_CONFIG[tf]
account = self.trader.accounts[symbol][tf]
equity = account.get_equity()
logger.info(f" [{config['name_en']}] Equity: ${equity:.2f}, Leverage: {account.leverage}x")
while self.is_running:
try:
@ -104,47 +120,116 @@ class RealtimeTrader:
async def _process_tick(self, data: Dict[str, Any]):
"""处理每个 tick 数据"""
self.current_price = float(data.get('p', 0))
# 多币种 WebSocket 格式: {"stream": "btcusdt@aggTrade", "data": {...}}
stream = data.get('stream', '')
tick_data = data.get('data', data) # 兼容单流和多流格式
if self.current_price <= 0:
# 从 stream 名称解析币种
symbol = None
for sym in self.symbols:
if sym.lower() in stream.lower():
symbol = sym
break
if not symbol:
# 尝试从数据中获取
symbol = tick_data.get('s', '').upper()
if symbol not in self.symbols:
symbol = self.symbols[0] if self.symbols else None
if not symbol:
return
price = float(tick_data.get('p', 0))
if price <= 0:
return
# 更新该币种价格
self.current_prices[symbol] = price
if self.on_price_callback:
self.on_price_callback(self.current_price)
self.on_price_callback(symbol, price)
# 检查各周期止盈止损
# 检查该币种各周期止盈止损
for tf in TimeFrame:
account = self.trader.accounts[tf]
account = self.trader.accounts[symbol][tf]
if account.position and account.position.side != 'FLAT':
close_result = self.trader._check_close_position(tf, self.current_price)
close_result = self.trader._check_close_position(symbol, tf, price)
if close_result:
self._on_position_closed(tf, close_result)
self._on_position_closed(symbol, tf, close_result)
# 定期检查信号
# 定期检查信号 (所有币种)
now = asyncio.get_event_loop().time()
if now - self.last_signal_check >= self.signal_check_interval:
self.last_signal_check = now
await self._check_and_execute_signal()
async def _check_and_execute_signal(self):
"""检查信号并执行交易"""
signal_data = self._load_latest_signal()
"""检查信号并执行交易 - 所有币种"""
# 加载所有币种信号
all_signals = self._load_all_signals()
if not signal_data:
return
for symbol in self.symbols:
signal_data = all_signals.get(symbol)
if not signal_data:
continue
results = self.trader.process_signal(signal_data, self.current_price)
price = self.current_prices.get(symbol, 0)
if price <= 0:
continue
# 处理各周期结果
for tf_value, result in results.get('timeframes', {}).items():
if result['action'] in ['OPEN', 'CLOSE', 'REVERSE']:
tf = TimeFrame(tf_value)
self._on_trade_executed(tf, result)
results = self.trader.process_signal(signal_data, price, symbol=symbol)
# 处理各周期结果
for tf_value, result in results.get('timeframes', {}).items():
if result['action'] in ['OPEN', 'CLOSE', 'REVERSE', 'ADD']:
tf = TimeFrame(tf_value)
self._on_trade_executed(symbol, tf, result)
self._print_status()
def _load_all_signals(self) -> Dict[str, Dict[str, Any]]:
"""加载所有币种的最新信号"""
signals = {}
# 尝试加载合并信号文件
signals_file = self.signal_dir / 'latest_signals.json'
if signals_file.exists():
try:
with open(signals_file, 'r') as f:
data = json.load(f)
if 'symbols' in data:
signals = data['symbols']
except Exception as e:
logger.error(f"Error loading signals file: {e}")
# 加载各币种独立信号文件
for symbol in self.symbols:
if symbol in signals:
continue
symbol_file = self.signal_dir / f'signal_{symbol.lower()}.json'
if symbol_file.exists():
try:
with open(symbol_file, 'r') as f:
signals[symbol] = json.load(f)
except Exception as e:
logger.error(f"Error loading {symbol} signal: {e}")
# 向后兼容: 使用旧格式的 latest_signal.json
if not signals and self.signal_file.exists():
try:
with open(self.signal_file, 'r') as f:
data = json.load(f)
first_symbol = self.symbols[0] if self.symbols else 'BTCUSDT'
signals[first_symbol] = data
except Exception as e:
logger.error(f"Error loading legacy signal: {e}")
return signals
def _load_latest_signal(self) -> Optional[Dict[str, Any]]:
"""加载最新信号"""
"""加载最新信号 (向后兼容)"""
try:
if not self.signal_file.exists():
return None
@ -155,26 +240,36 @@ class RealtimeTrader:
logger.error(f"Error loading signal: {e}")
return None
def _on_trade_executed(self, tf: TimeFrame, result: Dict[str, Any]):
def _on_trade_executed(self, symbol: str, tf: TimeFrame, result: Dict[str, Any]):
"""交易执行回调"""
config = TIMEFRAME_CONFIG[tf]
action = result['action']
details = result.get('details', {})
unit = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol
if action == 'OPEN':
logger.info("=" * 60)
logger.info(f"🟢 [{config['name_en']}] OPEN {details['side']}")
logger.info(f"🟢 [{symbol}][{config['name_en']}] OPEN {details['side']}")
logger.info(f" Entry: ${details['entry_price']:.2f}")
logger.info(f" Size: {details['size']:.6f} BTC")
logger.info(f" Size: {details['size']:.6f} {unit}")
logger.info(f" Stop Loss: ${details['stop_loss']:.2f}")
logger.info(f" Take Profit: ${details['take_profit']:.2f}")
logger.info("=" * 60)
elif action == 'ADD':
logger.info("=" * 60)
logger.info(f"📈 [{symbol}][{config['name_en']}] ADD {details['side']} [L{details['pyramid_level']}/{details['max_levels']}]")
logger.info(f" Add Price: ${details['add_price']:.2f}")
logger.info(f" Add Size: {details['add_size']:.6f} {unit}")
logger.info(f" Total Size: {details['total_size']:.6f} {unit}")
logger.info(f" Avg Entry: ${details['avg_entry_price']:.2f}")
logger.info("=" * 60)
elif action == 'CLOSE':
pnl = details.get('pnl', 0)
pnl_icon = "🟢" if pnl > 0 else "🔴"
logger.info("=" * 60)
logger.info(f"{pnl_icon} [{config['name_en']}] CLOSE {details['side']}")
logger.info(f"{pnl_icon} [{symbol}][{config['name_en']}] CLOSE {details['side']}")
logger.info(f" Entry: ${details['entry_price']:.2f}")
logger.info(f" Exit: ${details['exit_price']:.2f}")
logger.info(f" PnL: ${pnl:.2f} ({details['pnl_pct']:.2f}%)")
@ -184,7 +279,7 @@ class RealtimeTrader:
elif action == 'REVERSE':
logger.info("=" * 60)
logger.info(f"🔄 [{config['name_en']}] REVERSE POSITION")
logger.info(f"🔄 [{symbol}][{config['name_en']}] REVERSE POSITION")
if 'close' in details:
logger.info(f" Closed: PnL ${details['close'].get('pnl', 0):.2f}")
if 'open' in details:
@ -192,9 +287,9 @@ class RealtimeTrader:
logger.info("=" * 60)
if self.on_trade_callback:
self.on_trade_callback({'timeframe': tf.value, 'action': action, 'details': details})
self.on_trade_callback({'symbol': symbol, 'timeframe': tf.value, 'action': action, 'details': details})
def _on_position_closed(self, tf: TimeFrame, close_result: Dict[str, Any]):
def _on_position_closed(self, symbol: str, tf: TimeFrame, close_result: Dict[str, Any]):
"""持仓被平仓回调(止盈止损)"""
config = TIMEFRAME_CONFIG[tf]
pnl = close_result.get('pnl', 0)
@ -203,7 +298,7 @@ class RealtimeTrader:
reason_icon = "🎯" if reason == 'TAKE_PROFIT' else "🛑"
logger.info("=" * 60)
logger.info(f"{reason_icon} [{config['name_en']}] {reason}")
logger.info(f"{reason_icon} [{symbol}][{config['name_en']}] {reason}")
logger.info(f" {pnl_icon} PnL: ${pnl:.2f} ({close_result.get('pnl_pct', 0):.2f}%)")
logger.info(f" Entry: ${close_result.get('entry_price', 0):.2f}")
logger.info(f" Exit: ${close_result.get('exit_price', 0):.2f}")
@ -212,6 +307,7 @@ class RealtimeTrader:
if self.on_trade_callback:
self.on_trade_callback({
'symbol': symbol,
'timeframe': tf.value,
'action': 'CLOSE',
'details': close_result,
@ -219,39 +315,55 @@ class RealtimeTrader:
def _print_status(self):
"""打印当前状态"""
status = self.trader.get_status(self.current_price)
status = self.trader.get_status(prices=self.current_prices)
print("\n" + "=" * 80)
print(f"📊 MULTI-TIMEFRAME TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"📊 MULTI-SYMBOL MULTI-TIMEFRAME TRADING STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)
print(f"💵 Current Price: ${self.current_price:.2f}")
print(f"💰 Total Equity: ${status['total_equity']:.2f} (Initial: ${status['total_initial_balance']:.2f})")
print(f"📈 Total Return: {status['total_return']:.2f}%")
# 打印各币种价格
prices_str = " | ".join([f"{sym}: ${price:,.2f}" for sym, price in self.current_prices.items() if price > 0])
print(f"💵 Prices: {prices_str}")
print(f"💰 Grand Total Equity: ${status['total_equity']:.2f} (Initial: ${status['total_initial_balance']:.2f})")
print(f"📈 Grand Total Return: {status['total_return']:.2f}%")
print("-" * 80)
for tf_value, tf_status in status['timeframes'].items():
name = tf_status['name_en']
equity = tf_status['equity']
return_pct = tf_status['return_pct']
leverage = tf_status['leverage']
stats = tf_status['stats']
# 按币种打印
for symbol in self.symbols:
symbol_status = status.get('symbols', {}).get(symbol, {})
if not symbol_status:
continue
return_icon = "🟢" if return_pct > 0 else "🔴" if return_pct < 0 else ""
unit = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol
sym_equity = symbol_status.get('total_equity', 0)
sym_return = symbol_status.get('total_return', 0)
return_icon = "🟢" if sym_return > 0 else "🔴" if sym_return < 0 else ""
print(f"\n📊 {name} ({leverage}x)")
print(f" Equity: ${equity:.2f} | Return: {return_icon} {return_pct:+.2f}%")
print(f" Trades: {stats['total_trades']} | Win Rate: {stats['win_rate']:.1f}% | PnL: ${stats['total_pnl']:.2f}")
print(f"\n🪙 {symbol} - Equity: ${sym_equity:.2f} | Return: {return_icon} {sym_return:+.2f}%")
pos = tf_status.get('position')
if pos:
unrealized = pos.get('unrealized_pnl', 0)
unrealized_pct = pos.get('unrealized_pnl_pct', 0)
pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else ""
print(f" Position: {pos['side']} @ ${pos['entry_price']:.2f}")
print(f" SL: ${pos['stop_loss']:.2f} | TP: ${pos['take_profit']:.2f}")
print(f" {pnl_icon} Unrealized: ${unrealized:.2f} ({unrealized_pct:+.2f}%)")
else:
print(f" Position: FLAT")
for tf_value, tf_status in symbol_status.get('timeframes', {}).items():
name = tf_status['name_en']
equity = tf_status['equity']
return_pct = tf_status['return_pct']
leverage = tf_status['leverage']
stats = tf_status['stats']
return_icon = "🟢" if return_pct > 0 else "🔴" if return_pct < 0 else ""
print(f" 📊 {name} ({leverage}x)")
print(f" Equity: ${equity:.2f} | Return: {return_icon} {return_pct:+.2f}%")
print(f" Trades: {stats['total_trades']} | Win Rate: {stats['win_rate']:.1f}% | PnL: ${stats['total_pnl']:.2f}")
pos = tf_status.get('position')
if pos:
unrealized = pos.get('unrealized_pnl', 0)
unrealized_pct = pos.get('unrealized_pnl_pct', 0)
pnl_icon = "🟢" if unrealized > 0 else "🔴" if unrealized < 0 else ""
print(f" Position: {pos['side']} @ ${pos['entry_price']:.2f}")
print(f" SL: ${pos['stop_loss']:.2f} | TP: ${pos['take_profit']:.2f}")
print(f" {pnl_icon} Unrealized: ${unrealized:.2f} ({unrealized_pct:+.2f}%)")
else:
print(f" Position: FLAT")
print("=" * 80 + "\n")
@ -262,12 +374,13 @@ class RealtimeTrader:
def get_status(self) -> Dict[str, Any]:
"""获取状态"""
return self.trader.get_status(self.current_price)
return self.trader.get_status(prices=self.current_prices)
async def main():
"""主函数"""
from dotenv import load_dotenv
import os
load_dotenv(Path(__file__).parent.parent / '.env')
@ -277,7 +390,7 @@ async def main():
)
trader = RealtimeTrader(
symbol='btcusdt',
symbols=settings.symbols_list,
initial_balance=10000.0,
signal_check_interval=30,
)
@ -290,12 +403,15 @@ async def main():
signal.signal(signal.SIGTERM, signal_handler)
print("\n" + "=" * 80)
print("🚀 MULTI-TIMEFRAME REALTIME TRADING")
print("🚀 MULTI-SYMBOL MULTI-TIMEFRAME REALTIME TRADING")
print("=" * 80)
print(f"Symbols: {', '.join(settings.symbols_list)}")
print("Timeframes:")
print(" 📈 Short-term (5m/15m/1h) - 5x leverage")
print(" 📊 Medium-term (4h/1d) - 3x leverage")
print(" 📉 Long-term (1d/1w) - 2x leverage")
print(" 📈 Short-term (5m/15m/1h) - 10x leverage")
print(" 📊 Medium-term (4h/1d) - 10x leverage")
print(" 📉 Long-term (1d/1w) - 10x leverage")
print(f"Initial Balance: $10,000 per timeframe per symbol")
print(f"Total Initial: ${10000 * 3 * len(settings.symbols_list):,}")
print("=" * 80)
print("Press Ctrl+C to stop")
print("=" * 80 + "\n")