From b404721e169c3790a99fde72aca8dffbd488192f Mon Sep 17 00:00:00 2001 From: aaron <> Date: Thu, 4 Dec 2025 01:27:58 +0800 Subject: [PATCH] update --- .env.example | 41 ++-- Dockerfile | 10 +- analysis/config.py | 17 +- analysis/data_reader.py | 363 +++++++++++++++-------------- analysis/engine.py | 36 +-- analysis/llm_context.py | 15 +- config/settings.py | 62 +---- core/__init__.py | 12 - core/deduplicator.py | 176 -------------- core/rate_limiter.py | 209 ----------------- core/redis_writer.py | 247 -------------------- core/websocket_client.py | 209 ----------------- docker-compose.yml | 108 +-------- main.py | 236 ------------------- output/latest_signal.json | 219 +++++++++-------- redis.conf | 36 --- requirements.txt | 12 +- run_signal.sh | 18 +- scheduler.py | 70 ++---- scripts/generate_trading_signal.py | 180 ++++++-------- scripts/monitor.sh | 50 ---- scripts/run_analysis.py | 4 + scripts/test_redis_read.py | 95 -------- signals/llm_gate.py | 255 -------------------- start_system.sh | 17 +- stop_system.sh | 8 +- view_data.sh | 48 ---- 27 files changed, 479 insertions(+), 2274 deletions(-) delete mode 100644 core/__init__.py delete mode 100644 core/deduplicator.py delete mode 100644 core/rate_limiter.py delete mode 100644 core/redis_writer.py delete mode 100644 core/websocket_client.py delete mode 100644 main.py delete mode 100644 redis.conf delete mode 100755 scripts/monitor.sh delete mode 100755 scripts/test_redis_read.py delete mode 100644 signals/llm_gate.py delete mode 100755 view_data.sh diff --git a/.env.example b/.env.example index fe10ff1..3413475 100644 --- a/.env.example +++ b/.env.example @@ -1,42 +1,29 @@ -# Binance Configuration -BINANCE_WS_BASE_URL=wss://fstream.binance.com -SYMBOL=btcusdt -KLINE_INTERVALS=5m,15m,1h,4h # Multiple intervals (comma-separated) +# Symbol Configuration +SYMBOL=BTCUSDT -# Redis Configuration -REDIS_HOST=redis -REDIS_PORT=6379 -REDIS_DB=0 -REDIS_PASSWORD= +# Binance API (optional - defaults to https://fapi.binance.com) +BINANCE_API_BASE_URL=https://fapi.binance.com -# Performance Tuning -MAX_BUFFER_SIZE=1000 -RATE_LIMIT_MESSAGES_PER_SEC=1000 -DEDUP_CACHE_SIZE=10000 +# Kline intervals for multi-timeframe analysis +KLINE_INTERVALS=5m,15m,1h,4h # Logging LOG_LEVEL=INFO -# LLM Gate Configuration (极简门控 - 频率为主,量化初筛) -LLM_GATE_ENABLED=true # 启用 LLM 门控 - -# 数据要求 -LLM_MIN_CANDLES=100 # 最少K线数量 - -# 信号质量(极简 - 只检查综合得分) -LLM_MIN_COMPOSITE_SCORE=15.0 # 最小综合得分(过滤完全中性信号) - -# 频率限制(核心控制!) -LLM_MAX_CALLS_PER_DAY=12 # 每天最多调用次数 -LLM_MIN_INTERVAL_MINUTES=0 # 最小调用间隔(分钟) +# Signal Generation Interval +SIGNAL_INTERVAL_MINUTES=10 # 每15分钟生成一次信号 # LLM API Configuration (optional - for AI-powered trading signals) # Option 1: Use Deepseek (recommended for Chinese market analysis, low cost) -# OPENAI_API_KEY=sk-your-deepseek-key-here -# OPENAI_BASE_URL=https://api.deepseek.com +OPENAI_API_KEY=sk-9f6b56f08796435d988cf202e37f6ee3 +OPENAI_BASE_URL=https://api.deepseek.com # Option 2: Use OpenAI GPT # OPENAI_API_KEY=sk-your-openai-key-here # Option 3: Use Anthropic Claude # ANTHROPIC_API_KEY=sk-ant-your-key-here + +# DingTalk Notification (optional) +DINGTALK_WEBHOOK=https://oapi.dingtalk.com/robot/send?access_token=a4fa1c1a6a07a5ed07d79c701f79b44efb1e726da3b47b50495ebdc9190423ec +DINGTALK_SECRET=SECdc6dffe3b6838a5d8afde3486d5415b9a17d3ebc9cbf934438883acee1189e8d diff --git a/Dockerfile b/Dockerfile index 0e548e7..924deda 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,15 +25,15 @@ COPY --from=builder /usr/local/bin /usr/local/bin # Copy application code COPY config ./config -COPY core ./core COPY analysis ./analysis COPY signals ./signals -COPY scripts ./scripts COPY notifiers ./notifiers -COPY main.py . COPY scheduler.py . COPY .env.example .env +# Create output directory +RUN mkdir -p /app/output + # Create non-root user for security RUN useradd -m -u 1000 appuser && \ chown -R appuser:appuser /app @@ -44,5 +44,5 @@ USER appuser HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ CMD python -c "import sys; sys.exit(0)" -# Run application -CMD ["python", "-u", "main.py"] +# Run scheduler by default +CMD ["python", "-u", "scheduler.py"] diff --git a/analysis/config.py b/analysis/config.py index 3dc3c77..ecca3c8 100644 --- a/analysis/config.py +++ b/analysis/config.py @@ -7,20 +7,11 @@ from pydantic_settings import BaseSettings class AnalysisConfig(BaseSettings): """Analysis configuration""" - # Redis connection - REDIS_HOST: str = "localhost" - REDIS_PORT: int = 6379 - REDIS_DB: int = 0 + # Symbol configuration + SYMBOL: str = "BTCUSDT" - # Stream keys - KLINE_5M_KEY: str = "binance:raw:kline:5m" - KLINE_15M_KEY: str = "binance:raw:kline:15m" - KLINE_1H_KEY: str = "binance:raw:kline:1h" - KLINE_4H_KEY: str = "binance:raw:kline:4h" - KLINE_1D_KEY: str = "binance:raw:kline:1d" - KLINE_1W_KEY: str = "binance:raw:kline:1w" - DEPTH_KEY: str = "binance:raw:depth:20" - TRADE_KEY: str = "binance:raw:trade" + # Binance API + BINANCE_API_BASE_URL: str = "https://fapi.binance.com" # Analysis parameters LOOKBACK_PERIODS: int = 200 # Number of candles to analyze diff --git a/analysis/data_reader.py b/analysis/data_reader.py index 6a57d9a..f4254ed 100644 --- a/analysis/data_reader.py +++ b/analysis/data_reader.py @@ -1,12 +1,11 @@ """ -Data reader for fetching market data from Redis Streams +Data reader for fetching market data from Binance API +Pure API mode - no Redis dependency """ import logging from typing import Optional, List, Dict, Any -from datetime import datetime, timedelta +from datetime import datetime import pandas as pd -import redis -import orjson import requests import time @@ -17,47 +16,67 @@ logger = logging.getLogger(__name__) class MarketDataReader: - """Read and aggregate market data from Redis Streams""" + """Read market data directly from Binance API""" - def __init__(self): - self.redis_client = redis.Redis( - host=config.REDIS_HOST, - port=config.REDIS_PORT, - db=config.REDIS_DB, - decode_responses=False, - ) - - def fetch_historical_klines_from_api( - self, symbol: str = 'BTCUSDT', interval: str = '5m', limit: int = 200 - ) -> pd.DataFrame: + def __init__(self, symbol: str = 'BTCUSDT'): """ - Fetch historical kline data from Binance API + Initialize data reader Args: symbol: Trading pair (e.g., 'BTCUSDT') + """ + self.symbol = symbol + self.base_url = 'https://fapi.binance.com' + + # Cache for rate limiting + self._cache = {} + self._cache_ttl = 60 # Cache TTL in seconds + + def _get_cached(self, key: str) -> Optional[Any]: + """Get cached data if not expired""" + if key in self._cache: + data, timestamp = self._cache[key] + if time.time() - timestamp < self._cache_ttl: + return data + return None + + def _set_cache(self, key: str, data: Any) -> None: + """Set cache with current timestamp""" + self._cache[key] = (data, time.time()) + + def fetch_klines( + self, interval: str = '5m', limit: int = 200 + ) -> pd.DataFrame: + """ + Fetch kline data from Binance API + + Args: interval: Kline interval (e.g., '5m', '15m', '1h', '4h') limit: Number of candles to fetch (max 1500) Returns: - DataFrame with historical OHLCV data + DataFrame with OHLCV data """ - try: - # Binance API endpoint - url = 'https://fapi.binance.com/fapi/v1/klines' + cache_key = f"kline:{self.symbol}:{interval}" + cached = self._get_cached(cache_key) + if cached is not None: + logger.debug(f"Using cached data for {cache_key}") + return cached + try: + url = f'{self.base_url}/fapi/v1/klines' params = { - 'symbol': symbol, + 'symbol': self.symbol, 'interval': interval, - 'limit': min(limit, 1500) # API limit + 'limit': min(limit, 1500) } - logger.info(f"Fetching {limit} historical candles from Binance API ({symbol} {interval})...") + logger.info(f"Fetching {limit} candles from Binance API ({self.symbol} {interval})...") response = requests.get(url, params=params, timeout=10) response.raise_for_status() data = response.json() - # Parse API response klines = [] for item in data: klines.append({ @@ -69,15 +88,17 @@ class MarketDataReader: 'volume': float(item[5]), 'quote_volume': float(item[7]), 'trades': int(item[8]), - 'is_closed': True, # Historical data is always closed + 'is_closed': True, }) df = pd.DataFrame(klines) if not df.empty: df.set_index('timestamp', inplace=True) df.sort_index(inplace=True) - logger.info(f"✅ Fetched {len(df)} candles from Binance API") + logger.info(f"Fetched {len(df)} candles from Binance API") + # Cache the result + self._set_cache(cache_key, df) return df except Exception as e: @@ -88,144 +109,146 @@ class MarketDataReader: self, stream_key: str, count: int = None, use_api_fallback: bool = True ) -> pd.DataFrame: """ - Read kline data from Redis Stream and convert to DataFrame - Only includes completed candles (x: true). If insufficient data, - fetches historical data from Binance API. + Read kline data - compatibility method that extracts interval from stream key Args: - stream_key: Redis stream key (e.g., 'binance:raw:kline:5m') - count: Number of recent candles to fetch (default: LOOKBACK_PERIODS) - use_api_fallback: Whether to fetch from API if Redis data insufficient + stream_key: Stream key format (e.g., 'binance:raw:kline:5m') + count: Number of candles to fetch + use_api_fallback: Ignored (always uses API) Returns: - DataFrame with OHLCV data and indicators + DataFrame with OHLCV data """ if count is None: count = config.LOOKBACK_PERIODS - try: - # Read MORE messages from stream to account for duplicates - # Multiply by 10 to ensure we get enough unique candles after filtering - messages = self.redis_client.xrevrange(stream_key, count=count * 10) - - if not messages: - logger.warning(f"No data found in stream: {stream_key}") - # Fallback to API - if use_api_fallback: - return self._fetch_from_api_with_interval(stream_key, count) - return pd.DataFrame() - - # Parse messages - ONLY keep completed candles (x: true) - klines = [] - seen_timestamps = set() - - for msg_id, fields in reversed(messages): # Reverse to get chronological order - data = orjson.loads(fields[b'data']) - k = data.get('k', {}) - - # IMPORTANT: Only keep completed candles - if not k.get('x', False): - continue - - # Deduplicate by timestamp - timestamp = k['t'] - if timestamp in seen_timestamps: - continue - seen_timestamps.add(timestamp) - - klines.append({ - 'timestamp': datetime.fromtimestamp(k['t'] / 1000), - 'open': float(k['o']), - 'high': float(k['h']), - 'low': float(k['l']), - 'close': float(k['c']), - 'volume': float(k['v']), - 'quote_volume': float(k['q']), - 'trades': int(k['n']), - 'is_closed': k['x'], - }) - - # Stop if we have enough candles - if len(klines) >= count: - break - - # Create DataFrame - df = pd.DataFrame(klines) - - if df.empty: - logger.warning(f"No completed candles found in stream: {stream_key}") - # Fallback to API - if use_api_fallback: - return self._fetch_from_api_with_interval(stream_key, count) - return df - - df.set_index('timestamp', inplace=True) - df.sort_index(inplace=True) - - logger.info(f"Loaded {len(df)} completed candles from {stream_key}") - - # If still insufficient, supplement with API data - if len(df) < count and use_api_fallback: - logger.warning(f"Insufficient data: {len(df)}/{count} candles. Fetching from API...") - api_df = self._fetch_from_api_with_interval(stream_key, count) - - if not api_df.empty: - # Merge Redis and API data, preferring Redis for overlapping periods - combined = pd.concat([api_df, df]) - combined = combined[~combined.index.duplicated(keep='last')] - combined.sort_index(inplace=True) - logger.info(f"Combined data: {len(combined)} candles (Redis: {len(df)}, API: {len(api_df)})") - return combined - - return df - - except Exception as e: - logger.error(f"Error reading kline stream {stream_key}: {e}") - return pd.DataFrame() - - def _fetch_from_api_with_interval(self, stream_key: str, count: int) -> pd.DataFrame: - """Extract interval from stream key and fetch from API""" # Extract interval from stream key (e.g., 'binance:raw:kline:5m' -> '5m') try: interval = stream_key.split(':')[-1] - return self.fetch_historical_klines_from_api( - symbol='BTCUSDT', - interval=interval, - limit=count - ) + except Exception: + interval = '5m' + + return self.fetch_klines(interval=interval, limit=count) + + def fetch_historical_klines_from_api( + self, symbol: str = None, interval: str = '5m', limit: int = 200 + ) -> pd.DataFrame: + """ + Fetch historical kline data from Binance API + Compatibility method for existing code + + Args: + symbol: Trading pair (uses self.symbol if None) + interval: Kline interval + limit: Number of candles + + Returns: + DataFrame with OHLCV data + """ + if symbol and symbol != self.symbol: + # Temporarily change symbol + old_symbol = self.symbol + self.symbol = symbol + df = self.fetch_klines(interval=interval, limit=limit) + self.symbol = old_symbol + return df + return self.fetch_klines(interval=interval, limit=limit) + + def fetch_depth(self, limit: int = 20) -> Optional[Dict[str, Any]]: + """ + Fetch order book depth from Binance API + + Args: + limit: Number of levels (5, 10, 20, 50, 100, 500, 1000) + + Returns: + Dict with bids and asks, or None if error + """ + cache_key = f"depth:{self.symbol}:{limit}" + cached = self._get_cached(cache_key) + if cached is not None: + return cached + + try: + url = f'{self.base_url}/fapi/v1/depth' + params = { + 'symbol': self.symbol, + 'limit': limit + } + + response = requests.get(url, params=params, timeout=10) + response.raise_for_status() + data = response.json() + + result = { + 'timestamp': datetime.now(), + 'bids': [[float(p), float(q)] for p, q in data['bids']], + 'asks': [[float(p), float(q)] for p, q in data['asks']], + } + + self._set_cache(cache_key, result) + return result + except Exception as e: - logger.error(f"Error extracting interval from {stream_key}: {e}") - return pd.DataFrame() + logger.error(f"Error fetching depth data: {e}") + return None def read_latest_depth(self) -> Optional[Dict[str, Any]]: """ - Read latest order book depth data + Read latest order book depth + Compatibility method for existing code Returns: Dict with bids and asks, or None if no data """ + return self.fetch_depth(limit=20) + + def fetch_recent_trades(self, limit: int = 100) -> List[Dict[str, Any]]: + """ + Fetch recent trades from Binance API + + Args: + limit: Number of trades to fetch (max 1000) + + Returns: + List of trade dictionaries + """ + cache_key = f"trades:{self.symbol}:{limit}" + cached = self._get_cached(cache_key) + if cached is not None: + return cached + try: - messages = self.redis_client.xrevrange(config.DEPTH_KEY, count=1) - - if not messages: - return None - - msg_id, fields = messages[0] - data = orjson.loads(fields[b'data']) - - return { - 'timestamp': datetime.fromtimestamp(data['E'] / 1000), - 'bids': [[float(p), float(q)] for p, q in data['b']], - 'asks': [[float(p), float(q)] for p, q in data['a']], + url = f'{self.base_url}/fapi/v1/trades' + params = { + 'symbol': self.symbol, + 'limit': min(limit, 1000) } + response = requests.get(url, params=params, timeout=10) + response.raise_for_status() + data = response.json() + + trades = [] + for item in data: + trades.append({ + 'timestamp': datetime.fromtimestamp(item['time'] / 1000), + 'price': float(item['price']), + 'quantity': float(item['qty']), + 'is_buyer_maker': item['isBuyerMaker'], + }) + + self._set_cache(cache_key, trades) + return trades + except Exception as e: - logger.error(f"Error reading depth data: {e}") - return None + logger.error(f"Error fetching trade data: {e}") + return [] def read_recent_trades(self, count: int = 100) -> List[Dict[str, Any]]: """ - Read recent trade data + Read recent trades + Compatibility method for existing code Args: count: Number of recent trades to fetch @@ -233,28 +256,7 @@ class MarketDataReader: Returns: List of trade dictionaries """ - try: - messages = self.redis_client.xrevrange(config.TRADE_KEY, count=count) - - if not messages: - return [] - - trades = [] - for msg_id, fields in messages: - data = orjson.loads(fields[b'data']) - - trades.append({ - 'timestamp': datetime.fromtimestamp(data['T'] / 1000), - 'price': float(data['p']), - 'quantity': float(data['q']), - 'is_buyer_maker': data['m'], # True = sell, False = buy - }) - - return trades - - except Exception as e: - logger.error(f"Error reading trade data: {e}") - return [] + return self.fetch_recent_trades(limit=count) def get_multi_timeframe_data(self) -> Dict[str, pd.DataFrame]: """ @@ -263,32 +265,39 @@ class MarketDataReader: Returns: Dict mapping timeframe to DataFrame """ - # Different timeframes need different amount of data - # Shorter timeframes: 200 candles (for detailed analysis) - # Longer timeframes: fewer candles (100 for 1d, 60+ for 1w) timeframes = { - '5m': (config.KLINE_5M_KEY, 200), - '15m': (config.KLINE_15M_KEY, 200), - '1h': (config.KLINE_1H_KEY, 200), - '4h': (config.KLINE_4H_KEY, 200), - '1d': (config.KLINE_1D_KEY, 100), # 100 days ≈ 3+ months - '1w': (config.KLINE_1W_KEY, 65), # 65 weeks ≈ 15 months + '5m': 200, + '15m': 200, + '1h': 200, + '4h': 200, + '1d': 100, + '1w': 65, } data = {} - for tf, (key, count) in timeframes.items(): - df = self.read_kline_stream(key, count=count) + for tf, count in timeframes.items(): + df = self.fetch_klines(interval=tf, limit=count) if not df.empty: data[tf] = df return data def get_latest_price(self) -> Optional[float]: - """Get latest close price from 5m kline""" + """Get latest price from ticker""" try: - df = self.read_kline_stream(config.KLINE_5M_KEY, count=1) - if not df.empty: - return float(df.iloc[-1]['close']) + url = f'{self.base_url}/fapi/v1/ticker/price' + params = {'symbol': self.symbol} + + response = requests.get(url, params=params, timeout=5) + response.raise_for_status() + data = response.json() + + return float(data['price']) + except Exception as e: logger.error(f"Error getting latest price: {e}") - return None + # Fallback: get from kline + df = self.fetch_klines(interval='1m', limit=1) + if not df.empty: + return float(df.iloc[-1]['close']) + return None diff --git a/analysis/engine.py b/analysis/engine.py index 40972f8..f6f3c13 100644 --- a/analysis/engine.py +++ b/analysis/engine.py @@ -1,8 +1,9 @@ """ Main Market Analysis Engine - Orchestrates all analysis components +Pure API mode - no Redis dependency """ import logging -from typing import Dict, Any, Optional +from typing import Dict, Any import pandas as pd from .data_reader import MarketDataReader @@ -10,6 +11,7 @@ from .indicators import TechnicalIndicators from .market_structure import MarketStructureAnalyzer from .orderflow import OrderFlowAnalyzer from .llm_context import LLMContextBuilder +from .config import config logger = logging.getLogger(__name__) @@ -20,23 +22,33 @@ class MarketAnalysisEngine: Main analysis engine that orchestrates all market analysis components """ - def __init__(self): - self.data_reader = MarketDataReader() + def __init__(self, symbol: str = None): + """ + Initialize analysis engine + + Args: + symbol: Trading symbol (default from config) + """ + self.symbol = symbol or config.SYMBOL + self.data_reader = MarketDataReader(symbol=self.symbol) self.llm_builder = LLMContextBuilder() def analyze_current_market( - self, timeframe: str = '5m', symbol: str = 'BTCUSDT' + self, timeframe: str = '5m', symbol: str = None ) -> Dict[str, Any]: """ Perform complete market analysis for current state Args: timeframe: Primary timeframe for analysis (5m, 15m, 1h, 4h) - symbol: Trading symbol + symbol: Trading symbol (uses default if None) Returns: Complete analysis dictionary """ + if symbol is None: + symbol = self.symbol + try: logger.info(f"Starting market analysis for {symbol} on {timeframe}") @@ -170,11 +182,8 @@ class MarketAnalysisEngine: Returns: DataFrame with OHLCV and indicators """ - # Map timeframe to stream key - stream_key = f"binance:raw:kline:{timeframe}" - - # Fetch data - df = self.data_reader.read_kline_stream(stream_key) + # Fetch data directly from API + df = self.data_reader.fetch_klines(interval=timeframe, limit=config.LOOKBACK_PERIODS) if df.empty: return df @@ -186,7 +195,7 @@ class MarketAnalysisEngine: def check_data_availability(self) -> Dict[str, Any]: """ - Check what data is available in Redis + Check what data is available from API Returns: Dict with data availability status @@ -197,10 +206,9 @@ class MarketAnalysisEngine: 'trades': False, } - # Check kline streams + # Check kline data for each timeframe for tf in ['5m', '15m', '1h', '4h']: - stream_key = f"binance:raw:kline:{tf}" - df = self.data_reader.read_kline_stream(stream_key, count=1) + df = self.data_reader.fetch_klines(interval=tf, limit=1) status['klines'][tf] = { 'available': not df.empty, 'latest': df.index[-1].isoformat() if not df.empty else None, diff --git a/analysis/llm_context.py b/analysis/llm_context.py index 288474a..18db5de 100644 --- a/analysis/llm_context.py +++ b/analysis/llm_context.py @@ -97,20 +97,9 @@ class LLMContextBuilder: Returns: Dict mapping timeframe to DataFrame """ - from .config import config as analysis_config - - timeframes = { - '5m': (analysis_config.KLINE_5M_KEY, KLINE_LIMITS['5m']), - '15m': (analysis_config.KLINE_15M_KEY, KLINE_LIMITS['15m']), - '1h': (analysis_config.KLINE_1H_KEY, KLINE_LIMITS['1h']), - '4h': (analysis_config.KLINE_4H_KEY, KLINE_LIMITS['4h']), - '1d': (analysis_config.KLINE_1D_KEY, KLINE_LIMITS['1d']), - '1w': (analysis_config.KLINE_1W_KEY, KLINE_LIMITS['1w']), - } - data = {} - for tf, (key, count) in timeframes.items(): - df = self.data_reader.read_kline_stream(key, count=count) + for tf, count in KLINE_LIMITS.items(): + df = self.data_reader.fetch_klines(interval=tf, limit=count) if not df.empty: data[tf] = df diff --git a/config/settings.py b/config/settings.py index 371b91c..f3d4eda 100644 --- a/config/settings.py +++ b/config/settings.py @@ -1,5 +1,6 @@ """ -Configuration settings for Binance WebSocket data ingestion system +Configuration settings for Signal Generation System +Pure API mode - no Redis dependency """ from pydantic_settings import BaseSettings, SettingsConfigDict @@ -13,67 +14,26 @@ class Settings(BaseSettings): extra="ignore" # Ignore extra fields from environment ) - # Binance WebSocket Configuration - BINANCE_WS_BASE_URL: str = "wss://fstream.binance.com" - SYMBOL: str = "btcusdt" + # Symbol Configuration + SYMBOL: str = "BTCUSDT" - # Stream subscriptions - KLINE_INTERVALS: str = "5m,15m,1h,4h" # Multiple kline intervals (comma-separated) - DEPTH_LEVEL: int = 20 # Top 20 order book levels + # Binance API Configuration + BINANCE_API_BASE_URL: str = "https://fapi.binance.com" - # Redis Configuration - REDIS_HOST: str = "redis" - REDIS_PORT: int = 6379 - REDIS_DB: int = 0 - REDIS_PASSWORD: str = "" - - # Redis Stream Keys (prefix, actual keys are dynamic based on intervals) - REDIS_STREAM_KLINE_PREFIX: str = "binance:raw:kline" # Will be: binance:raw:kline:5m, etc. - REDIS_STREAM_DEPTH: str = "binance:raw:depth:20" - REDIS_STREAM_TRADE: str = "binance:raw:trade" + # Kline intervals for multi-timeframe analysis + KLINE_INTERVALS: str = "5m,15m,1h,4h" @property def kline_intervals_list(self) -> list: """Parse kline intervals from comma-separated string""" return [interval.strip() for interval in self.KLINE_INTERVALS.split(',')] - # Stream Configuration - REDIS_STREAM_MAXLEN: int = 10000 # Keep last 10k messages per stream - - # Reconnection Strategy - RECONNECT_INITIAL_DELAY: float = 1.0 # Initial delay in seconds - RECONNECT_MAX_DELAY: float = 60.0 # Max delay in seconds - RECONNECT_MULTIPLIER: float = 2.0 # Exponential backoff multiplier - MAX_RECONNECT_ATTEMPTS: int = 100 # -1 for unlimited - - # Memory Protection - MAX_BUFFER_SIZE: int = 1000 # Max messages in memory buffer - RATE_LIMIT_MESSAGES_PER_SEC: int = 1000 # Max messages processed per second - - # Message Deduplication - DEDUP_CACHE_SIZE: int = 10000 # Size of deduplication cache - DEDUP_TTL_SECONDS: int = 300 # TTL for dedup entries (5 minutes) - # Monitoring - HEALTH_CHECK_INTERVAL: int = 30 # Health check interval in seconds LOG_LEVEL: str = "INFO" - # LLM Gate Configuration (极简门控 - 频率为主,量化初筛) - LLM_GATE_ENABLED: bool = True # 启用 LLM 门控 - - # 数据要求 - LLM_MIN_CANDLES: int = 100 # 最少K线数量 - - # 信号质量(极简 - 只检查综合得分) - LLM_MIN_COMPOSITE_SCORE: float = 0.0 # Gate关闭 - 每次都调用LLM - - # 频率限制(核心控制!) - LLM_MAX_CALLS_PER_DAY: int = 12 # 每天最多调用次数 - LLM_MIN_INTERVAL_MINUTES: int = 15 # 最小调用间隔(分钟) - - # 盈利空间过滤(过滤低利润机会) - MIN_PROFIT_PCT: float = 1.0 # 最小盈利空间百分比,低于此值的机会不给操作建议 - PREFER_INTRADAY: bool = True # 优先日内短线交易建议 + # Profit filter + MIN_PROFIT_PCT: float = 1.0 + PREFER_INTRADAY: bool = True settings = Settings() diff --git a/core/__init__.py b/core/__init__.py deleted file mode 100644 index 1263527..0000000 --- a/core/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -from .websocket_client import BinanceWebSocketClient -from .redis_writer import RedisStreamWriter -from .deduplicator import MessageDeduplicator -from .rate_limiter import RateLimiter, BufferedMessageProcessor - -__all__ = [ - "BinanceWebSocketClient", - "RedisStreamWriter", - "MessageDeduplicator", - "RateLimiter", - "BufferedMessageProcessor", -] diff --git a/core/deduplicator.py b/core/deduplicator.py deleted file mode 100644 index 217ab54..0000000 --- a/core/deduplicator.py +++ /dev/null @@ -1,176 +0,0 @@ -""" -Message deduplication using event time (E field) and LRU cache -""" -import logging -import time -from collections import OrderedDict -from typing import Dict, Any, Optional - -from config import settings - - -logger = logging.getLogger(__name__) - - -class MessageDeduplicator: - """ - LRU-based message deduplicator with TTL support. - - Uses the 'E' field (event time) from Binance messages as unique identifier. - Automatically evicts old entries to prevent memory leaks. - """ - - def __init__( - self, - max_size: int = settings.DEDUP_CACHE_SIZE, - ttl_seconds: int = settings.DEDUP_TTL_SECONDS, - ): - """ - Initialize deduplicator - - Args: - max_size: Maximum number of entries to keep in cache - ttl_seconds: Time-to-live for cache entries in seconds - """ - self.max_size = max_size - self.ttl_seconds = ttl_seconds - - # OrderedDict for LRU cache: {message_key: timestamp} - self._cache: OrderedDict[str, float] = OrderedDict() - - # Statistics - self.stats = { - "total_checked": 0, - "duplicates_found": 0, - "cache_evictions": 0, - "ttl_evictions": 0, - } - - def _generate_key(self, message: Dict[str, Any]) -> Optional[str]: - """ - Generate unique key for message - - Uses combination of: - - Stream name (_stream field) - - Event time (E field) - - Symbol (s field) - - Args: - message: Message data - - Returns: - Unique key or None if key cannot be generated - """ - try: - # Get stream name - stream = message.get("_stream", "unknown") - - # Get event time (E field) - primary dedup identifier - event_time = message.get("E") - if not event_time: - # Fallback to T field for some message types - event_time = message.get("T") - - if not event_time: - logger.warning(f"No event time found in message: {message}") - return None - - # Get symbol (s field) - symbol = message.get("s", "") - - # Create composite key - key = f"{stream}:{symbol}:{event_time}" - return key - - except Exception as e: - logger.error(f"Error generating dedup key: {e}") - return None - - def _evict_expired(self) -> None: - """Remove expired entries based on TTL""" - if not self._cache: - return - - current_time = time.time() - expired_keys = [] - - # Find expired entries - for key, timestamp in self._cache.items(): - if current_time - timestamp > self.ttl_seconds: - expired_keys.append(key) - else: - # OrderedDict is sorted by insertion time - # Once we hit a non-expired entry, all following entries are also non-expired - break - - # Remove expired entries - for key in expired_keys: - del self._cache[key] - self.stats["ttl_evictions"] += 1 - - def _evict_lru(self) -> None: - """Remove least recently used entry""" - if self._cache: - self._cache.popitem(last=False) # FIFO: remove oldest - self.stats["cache_evictions"] += 1 - - def is_duplicate(self, message: Dict[str, Any]) -> bool: - """ - Check if message is a duplicate - - Args: - message: Message data to check - - Returns: - True if duplicate, False if new message - """ - self.stats["total_checked"] += 1 - - # Generate unique key - key = self._generate_key(message) - if not key: - # If we can't generate a key, assume it's not a duplicate - return False - - # Clean up expired entries periodically - if self.stats["total_checked"] % 100 == 0: - self._evict_expired() - - # Check if key exists in cache - current_time = time.time() - - if key in self._cache: - # Update timestamp (move to end for LRU) - del self._cache[key] - self._cache[key] = current_time - - self.stats["duplicates_found"] += 1 - return True - - # New message - add to cache - self._cache[key] = current_time - - # Enforce max size - if len(self._cache) > self.max_size: - self._evict_lru() - - return False - - def clear(self) -> None: - """Clear all cache entries""" - self._cache.clear() - logger.info("Deduplication cache cleared") - - def get_stats(self) -> Dict[str, Any]: - """Get deduplication statistics""" - duplicate_rate = ( - self.stats["duplicates_found"] / self.stats["total_checked"] - if self.stats["total_checked"] > 0 - else 0.0 - ) - - return { - **self.stats, - "cache_size": len(self._cache), - "duplicate_rate": f"{duplicate_rate:.2%}", - } diff --git a/core/rate_limiter.py b/core/rate_limiter.py deleted file mode 100644 index 6e05c01..0000000 --- a/core/rate_limiter.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -Rate limiter and buffer manager for memory leak protection -""" -import asyncio -import logging -import time -from typing import Dict, Any, List -from collections import deque - -from config import settings - - -logger = logging.getLogger(__name__) - - -class RateLimiter: - """ - Token bucket rate limiter for message processing. - - Prevents overwhelming downstream systems and protects against memory leaks. - """ - - def __init__(self, max_rate: int = settings.RATE_LIMIT_MESSAGES_PER_SEC): - """ - Initialize rate limiter - - Args: - max_rate: Maximum messages per second - """ - self.max_rate = max_rate - self.tokens = max_rate - self.last_update = time.time() - self.lock = asyncio.Lock() - - async def acquire(self) -> bool: - """ - Acquire token for processing a message - - Returns: - True if token acquired, False if rate limit exceeded - """ - async with self.lock: - now = time.time() - elapsed = now - self.last_update - - # Refill tokens based on elapsed time - self.tokens = min( - self.max_rate, - self.tokens + elapsed * self.max_rate - ) - self.last_update = now - - if self.tokens >= 1: - self.tokens -= 1 - return True - - return False - - async def wait(self) -> None: - """Wait until a token is available""" - while not await self.acquire(): - await asyncio.sleep(0.01) # 10ms sleep - - -class BufferedMessageProcessor: - """ - Buffered message processor with memory protection. - - Features: - - Bounded buffer to prevent memory exhaustion - - Batch processing for efficiency - - Overflow detection and alerts - - Backpressure handling - """ - - def __init__( - self, - max_buffer_size: int = settings.MAX_BUFFER_SIZE, - batch_size: int = 100, - batch_timeout: float = 1.0, - ): - """ - Initialize buffered processor - - Args: - max_buffer_size: Maximum messages in buffer - batch_size: Number of messages to batch before processing - batch_timeout: Max time to wait before processing partial batch (seconds) - """ - self.max_buffer_size = max_buffer_size - self.batch_size = batch_size - self.batch_timeout = batch_timeout - - # Bounded deque for FIFO buffer - self.buffer: deque = deque(maxlen=max_buffer_size) - self.lock = asyncio.Lock() - - # Statistics - self.stats = { - "messages_buffered": 0, - "messages_processed": 0, - "messages_dropped": 0, - "buffer_overflows": 0, - "current_buffer_size": 0, - "max_buffer_size_reached": 0, - } - - async def add_message(self, message: Dict[str, Any]) -> bool: - """ - Add message to buffer - - Args: - message: Message to buffer - - Returns: - True if added successfully, False if buffer is full (message dropped) - """ - async with self.lock: - current_size = len(self.buffer) - - # Check if buffer is full - if current_size >= self.max_buffer_size: - self.stats["messages_dropped"] += 1 - self.stats["buffer_overflows"] += 1 - - if self.stats["buffer_overflows"] % 100 == 1: - logger.warning( - f"Buffer overflow! Dropped message. " - f"Buffer size: {current_size}/{self.max_buffer_size}" - ) - return False - - # Add to buffer - self.buffer.append(message) - self.stats["messages_buffered"] += 1 - self.stats["current_buffer_size"] = len(self.buffer) - - # Track max buffer size - if current_size > self.stats["max_buffer_size_reached"]: - self.stats["max_buffer_size_reached"] = current_size - - return True - - async def get_batch(self, timeout: float = None) -> List[Dict[str, Any]]: - """ - Get batch of messages from buffer - - Args: - timeout: Max time to wait for batch (seconds) - - Returns: - List of messages (may be less than batch_size) - """ - timeout = timeout or self.batch_timeout - start_time = time.time() - batch = [] - - while len(batch) < self.batch_size: - async with self.lock: - if self.buffer: - batch.append(self.buffer.popleft()) - self.stats["current_buffer_size"] = len(self.buffer) - - # Check timeout - if time.time() - start_time >= timeout: - break - - # If buffer is empty and we have some messages, return them - if not self.buffer and batch: - break - - # Small sleep to avoid busy waiting - if not batch: - await asyncio.sleep(0.01) - - if batch: - self.stats["messages_processed"] += len(batch) - - return batch - - def get_buffer_usage(self) -> float: - """Get buffer usage percentage (0.0 to 1.0)""" - return len(self.buffer) / self.max_buffer_size if self.max_buffer_size > 0 else 0.0 - - def is_buffer_critical(self, threshold: float = 0.8) -> bool: - """Check if buffer usage is above critical threshold""" - return self.get_buffer_usage() > threshold - - def get_stats(self) -> Dict[str, Any]: - """Get processor statistics""" - buffer_usage = self.get_buffer_usage() - drop_rate = ( - self.stats["messages_dropped"] / self.stats["messages_buffered"] - if self.stats["messages_buffered"] > 0 - else 0.0 - ) - - return { - **self.stats, - "buffer_usage": f"{buffer_usage:.1%}", - "drop_rate": f"{drop_rate:.2%}", - } - - async def clear(self) -> None: - """Clear all buffered messages""" - async with self.lock: - self.buffer.clear() - self.stats["current_buffer_size"] = 0 - logger.info("Message buffer cleared") diff --git a/core/redis_writer.py b/core/redis_writer.py deleted file mode 100644 index 1ee6384..0000000 --- a/core/redis_writer.py +++ /dev/null @@ -1,247 +0,0 @@ -""" -Redis Stream writer with batch support and error handling -""" -import asyncio -import logging -from typing import Dict, Any, Optional -import orjson -import redis.asyncio as redis -from redis.exceptions import RedisError, ConnectionError as RedisConnectionError - -from config import settings - - -logger = logging.getLogger(__name__) - - -class RedisStreamWriter: - """ - Redis Stream writer for real-time market data. - - Features: - - Async Redis client with connection pooling - - Automatic stream trimming (MAXLEN) - - JSON serialization with orjson - - Connection retry logic - - Performance metrics - """ - - def __init__(self): - self.redis_client: Optional[redis.Redis] = None - self.is_connected = False - - # Statistics - self.stats = { - "messages_written": 0, - "kline_count": 0, - "depth_count": 0, - "trade_count": 0, - "errors": 0, - } - - async def connect(self) -> None: - """Establish Redis connection""" - try: - self.redis_client = redis.Redis( - host=settings.REDIS_HOST, - port=settings.REDIS_PORT, - db=settings.REDIS_DB, - password=settings.REDIS_PASSWORD if settings.REDIS_PASSWORD else None, - encoding="utf-8", - decode_responses=False, # We'll handle JSON encoding - socket_connect_timeout=5, - socket_keepalive=True, - health_check_interval=30, - ) - - # Test connection - await self.redis_client.ping() - self.is_connected = True - logger.info("✓ Redis connection established") - - except RedisConnectionError as e: - logger.error(f"Failed to connect to Redis: {e}") - raise - except Exception as e: - logger.error(f"Unexpected error connecting to Redis: {e}") - raise - - async def close(self) -> None: - """Close Redis connection""" - if self.redis_client: - await self.redis_client.close() - self.is_connected = False - logger.info("Redis connection closed") - - def _serialize_message(self, message: Dict[str, Any]) -> bytes: - """ - Serialize message to JSON bytes using orjson - - Args: - message: Message data - - Returns: - JSON bytes - """ - return orjson.dumps(message) - - def _determine_stream_key(self, message: Dict[str, Any]) -> Optional[str]: - """ - Determine which Redis Stream to write to based on message type - - Args: - message: Message data - - Returns: - Redis stream key or None if unknown type - """ - stream = message.get("_stream", "") - - # Kline stream - extract interval from stream name - if "kline" in stream or ("e" in message and message["e"] == "kline"): - # Extract interval from stream name (e.g., "btcusdt@kline_5m" -> "5m") - if "@kline_" in stream: - interval = stream.split("@kline_")[1] - return f"{settings.REDIS_STREAM_KLINE_PREFIX}:{interval}" - # Fallback: extract from message data - elif "k" in message and "i" in message["k"]: - interval = message["k"]["i"] - return f"{settings.REDIS_STREAM_KLINE_PREFIX}:{interval}" - - # Depth stream - if "depth" in stream or ("e" in message and message["e"] == "depthUpdate"): - return settings.REDIS_STREAM_DEPTH - - # Trade stream - if "trade" in stream or "aggTrade" in stream or ("e" in message and message["e"] in ["trade", "aggTrade"]): - return settings.REDIS_STREAM_TRADE - - logger.warning(f"Unknown message type, stream: {stream}, message: {message}") - return None - - async def write_message(self, message: Dict[str, Any]) -> bool: - """ - Write single message to appropriate Redis Stream - - Args: - message: Message data - - Returns: - True if successful, False otherwise - """ - if not self.is_connected or not self.redis_client: - logger.error("Redis client not connected") - return False - - try: - # Determine stream key - stream_key = self._determine_stream_key(message) - if not stream_key: - return False - - # Serialize message - message_json = self._serialize_message(message) - - # Write to Redis Stream with MAXLEN - await self.redis_client.xadd( - name=stream_key, - fields={"data": message_json}, - maxlen=settings.REDIS_STREAM_MAXLEN, - approximate=True, # Use ~ for better performance - ) - - # Update statistics - self.stats["messages_written"] += 1 - if "kline" in stream_key: - self.stats["kline_count"] += 1 - elif "depth" in stream_key: - self.stats["depth_count"] += 1 - elif "trade" in stream_key: - self.stats["trade_count"] += 1 - - return True - - except RedisError as e: - logger.error(f"Redis error writing message: {e}") - self.stats["errors"] += 1 - return False - - except Exception as e: - logger.error(f"Unexpected error writing message: {e}", exc_info=True) - self.stats["errors"] += 1 - return False - - async def write_batch(self, messages: list[Dict[str, Any]]) -> int: - """ - Write batch of messages using pipeline - - Args: - messages: List of messages - - Returns: - Number of successfully written messages - """ - if not self.is_connected or not self.redis_client: - logger.error("Redis client not connected") - return 0 - - if not messages: - return 0 - - try: - # Group messages by stream key - streams: Dict[str, list[bytes]] = {} - - for message in messages: - stream_key = self._determine_stream_key(message) - if not stream_key: - continue - - message_json = self._serialize_message(message) - - if stream_key not in streams: - streams[stream_key] = [] - streams[stream_key].append(message_json) - - # Write using pipeline - async with self.redis_client.pipeline(transaction=False) as pipe: - for stream_key, stream_messages in streams.items(): - for msg in stream_messages: - pipe.xadd( - name=stream_key, - fields={"data": msg}, - maxlen=settings.REDIS_STREAM_MAXLEN, - approximate=True, - ) - - await pipe.execute() - - # Update statistics - total_written = sum(len(msgs) for msgs in streams.values()) - self.stats["messages_written"] += total_written - - return total_written - - except RedisError as e: - logger.error(f"Redis error in batch write: {e}") - self.stats["errors"] += 1 - return 0 - - except Exception as e: - logger.error(f"Unexpected error in batch write: {e}", exc_info=True) - self.stats["errors"] += 1 - return 0 - - async def health_check(self) -> bool: - """Check Redis connection health""" - try: - if not self.redis_client: - return False - await self.redis_client.ping() - return True - except Exception: - return False - - def get_stats(self) -> Dict[str, Any]: - """Get writer statistics""" - return {**self.stats} diff --git a/core/websocket_client.py b/core/websocket_client.py deleted file mode 100644 index 383631c..0000000 --- a/core/websocket_client.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -Binance WebSocket Client with auto-reconnection and exponential backoff -""" -import asyncio -import logging -import json -import time -from typing import Callable, Optional, Dict, Any -from datetime import datetime -import websockets -from websockets.exceptions import ConnectionClosed, WebSocketException - -from config import settings - - -logger = logging.getLogger(__name__) - - -class BinanceWebSocketClient: - """ - Binance Futures WebSocket client with production-grade features: - - Auto-reconnection with exponential backoff - - Multi-stream subscription - - Heartbeat monitoring - - Graceful shutdown - """ - - def __init__( - self, - symbol: str, - on_message: Callable[[Dict[str, Any]], None], - on_error: Optional[Callable[[Exception], None]] = None, - ): - self.symbol = symbol.lower() - self.on_message = on_message - self.on_error = on_error - - self.ws: Optional[websockets.WebSocketClientProtocol] = None - self.is_running = False - self.reconnect_count = 0 - self.last_message_time = time.time() - - # Reconnection settings - self.reconnect_delay = settings.RECONNECT_INITIAL_DELAY - self.max_reconnect_delay = settings.RECONNECT_MAX_DELAY - self.reconnect_multiplier = settings.RECONNECT_MULTIPLIER - - # Build stream URL - self.ws_url = self._build_stream_url() - - def _build_stream_url(self) -> str: - """Build multi-stream WebSocket URL""" - streams = [] - - # Add multiple kline intervals - for interval in settings.kline_intervals_list: - streams.append(f"{self.symbol}@kline_{interval}") - - # Add depth and trade streams - streams.append(f"{self.symbol}@depth20@100ms") # Top 20 depth, 100ms updates - streams.append(f"{self.symbol}@aggTrade") # Aggregated trades - - stream_path = "/".join(streams) - url = f"{settings.BINANCE_WS_BASE_URL}/stream?streams={stream_path}" - logger.info(f"WebSocket URL: {url}") - logger.info(f"Subscribing to kline intervals: {', '.join(settings.kline_intervals_list)}") - return url - - async def connect(self) -> None: - """Establish WebSocket connection with retry logic""" - attempt = 0 - - while self.is_running: - try: - attempt += 1 - logger.info(f"Connecting to Binance WebSocket (attempt {attempt})...") - - async with websockets.connect( - self.ws_url, - ping_interval=20, # Send ping every 20s - ping_timeout=10, # Wait 10s for pong - close_timeout=10, - ) as websocket: - self.ws = websocket - self.reconnect_delay = settings.RECONNECT_INITIAL_DELAY - self.reconnect_count = 0 - - logger.info("✓ WebSocket connected successfully") - - # Message receiving loop - await self._receive_messages() - - except ConnectionClosed as e: - logger.warning(f"WebSocket connection closed: {e.code} - {e.reason}") - await self._handle_reconnect() - - except WebSocketException as e: - logger.error(f"WebSocket error: {e}") - if self.on_error: - self.on_error(e) - await self._handle_reconnect() - - except Exception as e: - logger.error(f"Unexpected error: {e}", exc_info=True) - if self.on_error: - self.on_error(e) - await self._handle_reconnect() - - finally: - self.ws = None - - logger.info("WebSocket client stopped") - - async def _receive_messages(self) -> None: - """Receive and process messages from WebSocket""" - if not self.ws: - return - - async for message in self.ws: - try: - self.last_message_time = time.time() - - # Parse JSON message - data = json.loads(message) - - # Handle combined stream format - if "stream" in data and "data" in data: - stream_name = data["stream"] - stream_data = data["data"] - - # Add metadata - stream_data["_stream"] = stream_name - stream_data["_received_at"] = datetime.utcnow().isoformat() - - # Process message - await self._process_message(stream_data) - else: - # Single stream format - data["_received_at"] = datetime.utcnow().isoformat() - await self._process_message(data) - - except json.JSONDecodeError as e: - logger.error(f"Failed to parse JSON: {e}, message: {message[:200]}") - except Exception as e: - logger.error(f"Error processing message: {e}", exc_info=True) - - async def _process_message(self, data: Dict[str, Any]) -> None: - """Process received message""" - try: - # Call user-defined message handler - if asyncio.iscoroutinefunction(self.on_message): - await self.on_message(data) - else: - self.on_message(data) - except Exception as e: - logger.error(f"Error in message handler: {e}", exc_info=True) - - async def _handle_reconnect(self) -> None: - """Handle reconnection with exponential backoff""" - if not self.is_running: - return - - self.reconnect_count += 1 - - # Check max attempts - if ( - settings.MAX_RECONNECT_ATTEMPTS > 0 - and self.reconnect_count > settings.MAX_RECONNECT_ATTEMPTS - ): - logger.error("Max reconnection attempts reached. Stopping client.") - self.is_running = False - return - - # Calculate delay with exponential backoff - delay = min( - self.reconnect_delay * (self.reconnect_multiplier ** (self.reconnect_count - 1)), - self.max_reconnect_delay, - ) - - logger.info(f"Reconnecting in {delay:.1f}s (attempt {self.reconnect_count})...") - await asyncio.sleep(delay) - - async def start(self) -> None: - """Start WebSocket client""" - if self.is_running: - logger.warning("Client is already running") - return - - self.is_running = True - logger.info("Starting WebSocket client...") - await self.connect() - - async def stop(self) -> None: - """Stop WebSocket client gracefully""" - logger.info("Stopping WebSocket client...") - self.is_running = False - - if self.ws: - await self.ws.close() - self.ws = None - - def is_healthy(self) -> bool: - """Check if client is healthy (receiving messages)""" - if not self.is_running or not self.ws: - return False - - # Check if we've received a message in the last 60 seconds - time_since_last_message = time.time() - self.last_message_time - return time_since_last_message < 60 diff --git a/docker-compose.yml b/docker-compose.yml index d3849c1..b0e988b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,79 +1,6 @@ version: '3.8' services: - # Redis - Message Stream Storage - redis: - image: redis:7.2-alpine - container_name: tradus-redis - ports: - - "6379:6379" - volumes: - - redis_data:/data - - ./redis.conf:/usr/local/etc/redis/redis.conf - command: redis-server /usr/local/etc/redis/redis.conf - healthcheck: - test: ["CMD", "redis-cli", "ping"] - interval: 10s - timeout: 3s - retries: 3 - start_period: 10s - networks: - - tradus-network - restart: unless-stopped - - # Binance WebSocket Ingestion Service - ingestion: - build: - context: . - dockerfile: Dockerfile - container_name: tradus-ingestion - env_file: .env - volumes: - - llm_gate_data:/app/data # LLM Gate 状态文件持久化 - environment: - # Binance Configuration - - BINANCE_WS_BASE_URL=wss://fstream.binance.com - - SYMBOL=btcusdt - - KLINE_INTERVAL=5m - - # Redis Configuration - - REDIS_HOST=redis - - REDIS_PORT=6379 - - REDIS_DB=0 - - REDIS_PASSWORD= - - # Stream Keys - - REDIS_STREAM_KLINE=binance:raw:kline:5m - - REDIS_STREAM_DEPTH=binance:raw:depth:20 - - REDIS_STREAM_TRADE=binance:raw:trade - - # Performance Tuning - - MAX_BUFFER_SIZE=1000 - - RATE_LIMIT_MESSAGES_PER_SEC=1000 - - DEDUP_CACHE_SIZE=10000 - - REDIS_STREAM_MAXLEN=10000 - - # Reconnection Strategy - - RECONNECT_INITIAL_DELAY=1.0 - - RECONNECT_MAX_DELAY=60.0 - - MAX_RECONNECT_ATTEMPTS=100 - - # Monitoring - - HEALTH_CHECK_INTERVAL=30 - - LOG_LEVEL=INFO - - depends_on: - redis: - condition: service_healthy - networks: - - tradus-network - restart: unless-stopped - logging: - driver: "json-file" - options: - max-size: "10m" - max-file: "3" - # Signal Generator Scheduler - 定时生成交易信号 scheduler: build: @@ -83,14 +10,10 @@ services: command: python -u scheduler.py env_file: .env volumes: - - llm_gate_data:/app/data # 共享 LLM Gate 状态 - ./output:/app/output # 输出信号文件 environment: - # Redis Configuration - - REDIS_HOST=redis - - REDIS_PORT=6379 - - REDIS_DB=0 - - REDIS_PASSWORD= + # Symbol Configuration + - SYMBOL=BTCUSDT # Signal generation interval - SIGNAL_INTERVAL_MINUTES=15 # 每15分钟生成一次信号 @@ -99,9 +22,6 @@ services: - LOG_LEVEL=INFO - depends_on: - redis: - condition: service_healthy networks: - tradus-network restart: unless-stopped @@ -110,30 +30,6 @@ services: options: max-size: "10m" max-file: "3" - profiles: - - scheduler # Only start with: docker-compose --profile scheduler up - - # Redis Commander - Optional Web UI for Redis - redis-commander: - image: rediscommander/redis-commander:latest - container_name: tradus-redis-ui - environment: - - REDIS_HOSTS=local:redis:6379 - ports: - - "8081:8081" - depends_on: - - redis - networks: - - tradus-network - restart: unless-stopped - profiles: - - debug # Only start with: docker-compose --profile debug up - -volumes: - redis_data: - driver: local - llm_gate_data: - driver: local networks: tradus-network: diff --git a/main.py b/main.py deleted file mode 100644 index 6f4ef83..0000000 --- a/main.py +++ /dev/null @@ -1,236 +0,0 @@ -""" -Main application: Binance WebSocket to Redis Stream ingestion pipeline -""" -import asyncio -import logging -import signal -import sys -from typing import Dict, Any - -from config import settings -from core import ( - BinanceWebSocketClient, - RedisStreamWriter, - MessageDeduplicator, - BufferedMessageProcessor, -) - - -# Configure logging -logging.basicConfig( - level=getattr(logging, settings.LOG_LEVEL), - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - handlers=[ - logging.StreamHandler(sys.stdout), - ], -) - -logger = logging.getLogger(__name__) - - -class IngestionPipeline: - """ - Main ingestion pipeline orchestrator. - - Coordinates: - - WebSocket client - - Message deduplication - - Buffering and rate limiting - - Redis Stream writing - - Health monitoring - """ - - def __init__(self): - self.ws_client: BinanceWebSocketClient = None - self.redis_writer = RedisStreamWriter() - self.deduplicator = MessageDeduplicator() - self.buffer_processor = BufferedMessageProcessor() - - self.is_running = False - self.tasks = [] - - async def on_message(self, message: Dict[str, Any]) -> None: - """ - Handle incoming WebSocket message - - Args: - message: Raw message from WebSocket - """ - try: - # Check for duplicates - if self.deduplicator.is_duplicate(message): - logger.debug(f"Duplicate message filtered: {message.get('E')}") - return - - # Add to buffer (with overflow protection) - success = await self.buffer_processor.add_message(message) - if not success: - logger.warning("Message dropped due to buffer overflow") - - except Exception as e: - logger.error(f"Error in message handler: {e}", exc_info=True) - - async def process_messages(self) -> None: - """Background task to process buffered messages""" - logger.info("Starting message processor...") - - while self.is_running: - try: - # Get batch of messages - batch = await self.buffer_processor.get_batch(timeout=1.0) - - if not batch: - await asyncio.sleep(0.1) - continue - - # Write batch to Redis - written = await self.redis_writer.write_batch(batch) - if written > 0: - logger.debug(f"Wrote {written} messages to Redis") - - # Check buffer health - if self.buffer_processor.is_buffer_critical(): - logger.warning( - f"Buffer usage critical: " - f"{self.buffer_processor.get_buffer_usage():.1%}" - ) - - except Exception as e: - logger.error(f"Error processing messages: {e}", exc_info=True) - await asyncio.sleep(1) - - logger.info("Message processor stopped") - - async def monitor_health(self) -> None: - """Background task to monitor system health""" - logger.info("Starting health monitor...") - - while self.is_running: - try: - await asyncio.sleep(settings.HEALTH_CHECK_INTERVAL) - - # Check WebSocket health - ws_healthy = self.ws_client.is_healthy() if self.ws_client else False - - # Check Redis health - redis_healthy = await self.redis_writer.health_check() - - # Get statistics - dedup_stats = self.deduplicator.get_stats() - buffer_stats = self.buffer_processor.get_stats() - redis_stats = self.redis_writer.get_stats() - - # Log health status - logger.info( - f"Health Check | " - f"WebSocket: {'✓' if ws_healthy else '✗'} | " - f"Redis: {'✓' if redis_healthy else '✗'} | " - f"Buffer: {buffer_stats['buffer_usage']} | " - f"Dedup: {dedup_stats['duplicate_rate']} | " - f"Written: {redis_stats['messages_written']}" - ) - - # Alert if unhealthy - if not ws_healthy: - logger.error("WebSocket connection is unhealthy!") - if not redis_healthy: - logger.error("Redis connection is unhealthy!") - - except Exception as e: - logger.error(f"Error in health monitor: {e}", exc_info=True) - - logger.info("Health monitor stopped") - - async def start(self) -> None: - """Start ingestion pipeline""" - logger.info("=" * 60) - logger.info("Starting Binance Real-time Data Ingestion Pipeline") - logger.info("=" * 60) - logger.info(f"Symbol: {settings.SYMBOL.upper()}") - logger.info(f"Kline Intervals: {', '.join(settings.kline_intervals_list)}") - logger.info(f"Redis Host: {settings.REDIS_HOST}:{settings.REDIS_PORT}") - logger.info("=" * 60) - - self.is_running = True - - try: - # Connect to Redis - logger.info("Connecting to Redis...") - await self.redis_writer.connect() - - # Initialize WebSocket client - self.ws_client = BinanceWebSocketClient( - symbol=settings.SYMBOL, - on_message=self.on_message, - ) - - # Start background tasks - logger.info("Starting background tasks...") - self.tasks = [ - asyncio.create_task(self.ws_client.start()), - asyncio.create_task(self.process_messages()), - asyncio.create_task(self.monitor_health()), - ] - - # Wait for all tasks - await asyncio.gather(*self.tasks) - - except Exception as e: - logger.error(f"Fatal error in pipeline: {e}", exc_info=True) - await self.stop() - - async def stop(self) -> None: - """Stop ingestion pipeline gracefully""" - logger.info("Stopping ingestion pipeline...") - self.is_running = False - - # Stop WebSocket client - if self.ws_client: - await self.ws_client.stop() - - # Cancel background tasks - for task in self.tasks: - if not task.done(): - task.cancel() - - # Wait for tasks to complete - if self.tasks: - await asyncio.gather(*self.tasks, return_exceptions=True) - - # Close Redis connection - await self.redis_writer.close() - - # Print final statistics - logger.info("=" * 60) - logger.info("Final Statistics:") - logger.info(f"Deduplication: {self.deduplicator.get_stats()}") - logger.info(f"Buffer: {self.buffer_processor.get_stats()}") - logger.info(f"Redis: {self.redis_writer.get_stats()}") - logger.info("=" * 60) - logger.info("Pipeline stopped successfully") - - -async def main(): - """Main entry point""" - pipeline = IngestionPipeline() - - # Setup signal handlers for graceful shutdown - def signal_handler(sig, frame): - logger.info(f"Received signal {sig}, shutting down...") - asyncio.create_task(pipeline.stop()) - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - # Start pipeline - await pipeline.start() - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - logger.info("Interrupted by user") - except Exception as e: - logger.error(f"Fatal error: {e}", exc_info=True) - sys.exit(1) diff --git a/output/latest_signal.json b/output/latest_signal.json index e8180fd..ebd5de0 100755 --- a/output/latest_signal.json +++ b/output/latest_signal.json @@ -1,34 +1,32 @@ { - "timestamp": "2025-12-03T16:25:01.783326", "aggregated_signal": { - "timestamp": "2025-12-03T16:25:01.781521", + "timestamp": "2025-12-04T01:26:44.257404", "final_signal": "HOLD", - "final_confidence": 0.58, + "final_confidence": 0.28, "consensus": "CONSENSUS_HOLD", - "agreement_score": 0.58, + "agreement_score": 0.28, "quantitative_signal": { "signal_type": "HOLD", "signal": "HOLD", - "confidence": 0.5, - "composite_score": -12.8, + "confidence": 0.0, + "composite_score": 33.2, "scores": { "trend": -23.1, - "momentum": -65, - "orderflow": 46.3, + "momentum": 65, + "orderflow": 100, "breakout": 0 } }, "llm_signal": { "signal_type": "HOLD", "signal": "HOLD", - "confidence": 0.67, - "reasoning": "多周期综合分析显示市场处于关键分歧点。短期(1h)与超短期(5m/15m)趋势不一致,中期(4h与1d)趋势完全相反,长期(1d与1w)趋势也存在矛盾。各周期未能形成共振,市场缺乏统一方向,呈现震荡格局。当前价格位于多个周期的关键位之间,方向选择有待确认。", + "confidence": 0.55, + "reasoning": "多周期综合分析显示市场处于关键抉择期。短期(5m-1h)陷入无序震荡,缺乏交易价值。中期(4h-1d)在$90,156-$93,932构建震荡平台,MACD有修复迹象,倾向于在支撑位附近寻找低吸机会。长期(1d-1w)仍处于自11月高点以来的大级别盘整中,周线上涨趋势未改但动能减弱。当前核心矛盾是中期震荡与长期趋势的共振点尚未出现,需等待更明确的突破信号。风险主要来自震荡区间内的假突破和低成交量下的价格异动。", "key_factors": [ - "多周期趋势严重分歧,方向不明", - "成交量普遍萎缩,市场动能不足", - "价格位于关键支撑与压力区间内震荡", - "大周期(日线、周线)MACD信号矛盾", - "市场等待突破以选择后续方向" + "4小时及日线级别宽幅震荡区间的突破方向", + "成交量能否在关键价位有效放大", + "日线MACD能否形成金叉确认反弹", + "周线RSI(40.7)能否回升至50中性区域以上" ], "opportunities": { "short_term_5m_15m_1h": { @@ -37,15 +35,15 @@ "entry_price": 0, "stop_loss": 0, "take_profit": 0, - "reasoning": "短期周期趋势分歧。5m和15m显示下跌趋势(量化评分-24.3),但1h显示上涨趋势(量化评分23.4)。动量指标(MACD死叉)与部分趋势信号矛盾,RSI处于中性区域,成交量萎缩,缺乏明确的共振入场信号。价格在1h支撑($91,637)和压力($92,273)之间震荡,方向不明。" + "reasoning": "当前价格$92,488.90位于短期震荡区间中部。5分钟和15分钟周期趋势为弱势下跌,但1小时周期为强势上涨,多周期信号矛盾。RSI和MACD指标均呈中性或弱信号,成交量缩量,缺乏明确的短期方向性突破动能。预期盈利空间不足1%,建议观望。" }, "medium_term_4h_1d": { - "exists": false, - "direction": null, - "entry_price": 0, - "stop_loss": 0, - "take_profit": 0, - "reasoning": "中期周期趋势严重分歧。4h周期显示上涨趋势(量化评分29.3,趋势强度moderate,RSI 60.2强势),但1d周期显示下跌趋势(量化评分-23.4,趋势强度strong)。MACD信号不一致(4h金叉收窄,1d金叉扩大),价格接近4h压力位($93,080)但未突破。成交量萎缩,市场缺乏明确的波段方向动能,建议观望等待趋势统一。" + "exists": true, + "direction": "LONG", + "entry_price": 91500.0, + "stop_loss": 90100.0, + "take_profit": 94500.0, + "reasoning": "4小时和日线图显示价格在$90,156-$93,932区间内宽幅震荡。当前价格接近区间中下部。日线MACD死叉收窄,有潜在底背离迹象。若价格能回踩并站稳$91,500(近期多次反弹的支撑位)上方,可视为中期做多机会,目标看向区间上沿$94,500附近,盈利空间约3.2%。" }, "long_term_1d_1w": { "exists": false, @@ -53,12 +51,12 @@ "entry_price": 0, "stop_loss": 0, "take_profit": 0, - "reasoning": "长期周期趋势存在分歧。1d周期显示下跌趋势(趋势强度strong),而1w周期显示上涨趋势(趋势强度moderate)。量化评分方向不一致(1d: -23.4, 1w: 11.8)。周线MACD仍为死叉(尽管收窄),日线MACD为金叉,信号矛盾。价格位于周线支撑($91,130)上方,但未形成明确的大周期共振趋势,缺乏长期布局的清晰入场点。" + "reasoning": "周线趋势虽为上涨,但RSI(40.7)偏弱,且价格仍处于11月以来的宽幅震荡区间($83,786 - $101,450)内。日线级别趋势不明,缺乏明确的长期趋势启动信号。当前价格位于区间中部,直接追涨或杀跌的风险回报比不佳,建议等待更明确的突破信号。" }, "ambush": { "exists": true, - "price_level": 91130.0, - "reasoning": "基于周线关键支撑位$91,130设置埋伏。若价格回调至此位置,并伴随1h或4h周期出现明确的反弹反转信号(如RSI超卖反弹、MACD金叉、放量),可考虑分批布局多单,博弈周线级别上涨趋势的延续。" + "price_level": 90100.0, + "reasoning": "基于4小时和日线K线数据,$90,100-$90,156区域是近期多次测试的关键支撑区间(12月1日、12月3日低点)。若价格因市场情绪再次回落至此区域并出现企稳迹象(如长下影线、成交量放大),是风险可控的埋伏做多点位,止损可设在$88,900下方。" }, "intraday": { "exists": false, @@ -66,84 +64,104 @@ "entry_price": 0, "stop_loss": 0, "take_profit": 0, - "reasoning": "短期周期趋势分歧。5m和15m显示下跌趋势(量化评分-24.3),但1h显示上涨趋势(量化评分23.4)。动量指标(MACD死叉)与部分趋势信号矛盾,RSI处于中性区域,成交量萎缩,缺乏明确的共振入场信号。价格在1h支撑($91,637)和压力($92,273)之间震荡,方向不明。" + "reasoning": "当前价格$92,488.90位于短期震荡区间中部。5分钟和15分钟周期趋势为弱势下跌,但1小时周期为强势上涨,多周期信号矛盾。RSI和MACD指标均呈中性或弱信号,成交量缩量,缺乏明确的短期方向性突破动能。预期盈利空间不足1%,建议观望。" }, "swing": { - "exists": false, - "direction": null, - "entry_price": 0, - "stop_loss": 0, - "take_profit": 0, - "reasoning": "中期周期趋势严重分歧。4h周期显示上涨趋势(量化评分29.3,趋势强度moderate,RSI 60.2强势),但1d周期显示下跌趋势(量化评分-23.4,趋势强度strong)。MACD信号不一致(4h金叉收窄,1d金叉扩大),价格接近4h压力位($93,080)但未突破。成交量萎缩,市场缺乏明确的波段方向动能,建议观望等待趋势统一。" + "exists": true, + "direction": "LONG", + "entry_price": 91500.0, + "stop_loss": 90100.0, + "take_profit": 94500.0, + "reasoning": "4小时和日线图显示价格在$90,156-$93,932区间内宽幅震荡。当前价格接近区间中下部。日线MACD死叉收窄,有潜在底背离迹象。若价格能回踩并站稳$91,500(近期多次反弹的支撑位)上方,可视为中期做多机会,目标看向区间上沿$94,500附近,盈利空间约3.2%。" } }, "recommendations_by_timeframe": { - "short_term": "短期(5m/15m/1h)操作建议:观望。当前5m/15m与1h趋势方向矛盾,市场处于震荡整理状态,缺乏清晰的日内交易机会。可关注价格对1h支撑$91,637和压力$92,273的突破情况,等待小周期形成共振后再考虑入场。", - "medium_term": "中期(4h/1d)操作建议:观望。4h看涨与1d看跌形成强烈分歧,市场方向不明。建议等待价格有效突破4h压力$93,080(确认中期转强)或跌破4h支撑$90,612(确认中期转弱),并结合成交量放大信号,再寻找波段交易机会。", - "long_term": "长期(1d/1w)操作建议:观望。日线与周线趋势不一致,长期趋势未明朗。可关注周线支撑$91,130的防守情况。若价格能站稳该支撑并推动日线趋势转涨,形成大周期共振,则可能开启长期上涨趋势;反之,若跌破,则长期趋势可能转弱。目前宜耐心等待更明确的趋势信号。" + "short_term": "短期(5m/15m/1h)建议观望。价格处于无趋势震荡中,技术指标矛盾,日内交易缺乏明确的、盈利空间≥1%的机会。避免在$92,000-$93,000区间内频繁操作。", + "medium_term": "中期(4h/1d)可关注回调做多机会。等待价格回落至$91,500附近企稳后分批布局,止损设于$90,100下方,目标看向$94,500。若直接向上突破$93,000并站稳,可轻仓追多,目标$94,500。", + "long_term": "长期(1d/1w)建议继续持有现有仓位或保持观望。需等待价格有效突破$94,000(确认短期强势)或跌破$89,000(确认转弱)来明确大方向。在方向明确前,不宜进行大规模长期仓位调整。" }, "trade_type": "MULTI_TIMEFRAME", "risk_level": "MEDIUM" }, "levels": { - "current_price": 92028.4, - "entry": 92028.4, - "stop_loss": 92028.4, - "take_profit_1": 92028.4, - "take_profit_2": 92028.4, - "take_profit_3": 92028.4 + "current_price": 92485.5, + "entry": 91991.05, + "stop_loss": 91291.05, + "take_profit_1": 93491.05, + "take_profit_2": 93491.05, + "take_profit_3": 93491.05, + "entry_range": { + "quant": 92482.1, + "llm": 91500.0, + "diff_pct": 1.07 + }, + "stop_loss_range": { + "quant": 92482.1, + "llm": 90100.0, + "diff_pct": 2.61 + }, + "take_profit_1_range": { + "quant": 92482.1, + "llm": 94500.0, + "diff_pct": 2.16 + } }, - "risk_reward_ratio": 0, + "risk_reward_ratio": 2.14, "recommendation": "量化和AI分析均建议观望,等待更好的机会", - "warnings": [] + "warnings": [ + "⚠️ 量化和AI信号严重分歧,建议观望", + "⚠️ 量化信号置信度较低", + "⚠️ stop_loss建议差异较大: 量化$92482.10 vs AI$90100.00 (2.6%)" + ] }, "market_analysis": { - "price": 92028.4, + "price": 92482.1, "trend": { "direction": "下跌", "strength": "weak", - "phase": "下跌中", - "adx": 12.9, + "phase": "下跌后反弹", + "adx": 9.8, "ema_alignment": "bearish" }, "momentum": { - "rsi": 44.0, - "rsi_status": "中性偏弱", - "rsi_trend": "下降中", - "macd_signal": "死叉扩大", - "macd_hist": -23.7636 + "rsi": 51.8, + "rsi_status": "中性偏强", + "rsi_trend": "上升中", + "macd_signal": "金叉扩大", + "macd_hist": 24.4447 } }, "quantitative_signal": { - "timestamp": "2025-12-03T16:24:23.426153", + "timestamp": "2025-12-04T01:26:02.011873", "signal_type": "HOLD", - "signal_strength": 0.13, - "composite_score": -12.8, - "confidence": 0.5, - "consensus_score": 0.7, + "signal_strength": 0.33, + "composite_score": 33.2, + "confidence": 0.0, + "consensus_score": 0.55, + "profit_pct": 0, "scores": { "trend": -23.1, - "momentum": -65, - "orderflow": 46.3, + "momentum": 65, + "orderflow": 100, "breakout": 0 }, "levels": { - "current_price": 92028.4, - "entry": 92028.4, - "stop_loss": 92028.4, - "take_profit_1": 92028.4, - "take_profit_2": 92028.4, - "take_profit_3": 92028.4 + "current_price": 92482.1, + "entry": 92482.1, + "stop_loss": 92482.1, + "take_profit_1": 92482.1, + "take_profit_2": 92482.1, + "take_profit_3": 92482.1 }, "risk_reward_ratio": 0, - "reasoning": "趋势下跌 (weak); RSI=44; MACD 死叉扩大; 订单流: 强买方主导" + "reasoning": "趋势下跌 (weak); RSI=52; MACD 金叉扩大; 订单流: 强买方主导" }, "llm_signal": { - "timestamp": "2025-12-03T16:25:01.781138", + "timestamp": "2025-12-04T01:26:44.257201", "signal_type": "HOLD", - "confidence": 0.67, + "confidence": 0.55, "trade_type": "MULTI_TIMEFRAME", - "reasoning": "多周期综合分析显示市场处于关键分歧点。短期(1h)与超短期(5m/15m)趋势不一致,中期(4h与1d)趋势完全相反,长期(1d与1w)趋势也存在矛盾。各周期未能形成共振,市场缺乏统一方向,呈现震荡格局。当前价格位于多个周期的关键位之间,方向选择有待确认。", + "reasoning": "多周期综合分析显示市场处于关键抉择期。短期(5m-1h)陷入无序震荡,缺乏交易价值。中期(4h-1d)在$90,156-$93,932构建震荡平台,MACD有修复迹象,倾向于在支撑位附近寻找低吸机会。长期(1d-1w)仍处于自11月高点以来的大级别盘整中,周线上涨趋势未改但动能减弱。当前核心矛盾是中期震荡与长期趋势的共振点尚未出现,需等待更明确的突破信号。风险主要来自震荡区间内的假突破和低成交量下的价格异动。", "opportunities": { "short_term_5m_15m_1h": { "exists": false, @@ -151,15 +169,15 @@ "entry_price": 0, "stop_loss": 0, "take_profit": 0, - "reasoning": "短期周期趋势分歧。5m和15m显示下跌趋势(量化评分-24.3),但1h显示上涨趋势(量化评分23.4)。动量指标(MACD死叉)与部分趋势信号矛盾,RSI处于中性区域,成交量萎缩,缺乏明确的共振入场信号。价格在1h支撑($91,637)和压力($92,273)之间震荡,方向不明。" + "reasoning": "当前价格$92,488.90位于短期震荡区间中部。5分钟和15分钟周期趋势为弱势下跌,但1小时周期为强势上涨,多周期信号矛盾。RSI和MACD指标均呈中性或弱信号,成交量缩量,缺乏明确的短期方向性突破动能。预期盈利空间不足1%,建议观望。" }, "medium_term_4h_1d": { - "exists": false, - "direction": null, - "entry_price": 0, - "stop_loss": 0, - "take_profit": 0, - "reasoning": "中期周期趋势严重分歧。4h周期显示上涨趋势(量化评分29.3,趋势强度moderate,RSI 60.2强势),但1d周期显示下跌趋势(量化评分-23.4,趋势强度strong)。MACD信号不一致(4h金叉收窄,1d金叉扩大),价格接近4h压力位($93,080)但未突破。成交量萎缩,市场缺乏明确的波段方向动能,建议观望等待趋势统一。" + "exists": true, + "direction": "LONG", + "entry_price": 91500.0, + "stop_loss": 90100.0, + "take_profit": 94500.0, + "reasoning": "4小时和日线图显示价格在$90,156-$93,932区间内宽幅震荡。当前价格接近区间中下部。日线MACD死叉收窄,有潜在底背离迹象。若价格能回踩并站稳$91,500(近期多次反弹的支撑位)上方,可视为中期做多机会,目标看向区间上沿$94,500附近,盈利空间约3.2%。" }, "long_term_1d_1w": { "exists": false, @@ -167,12 +185,12 @@ "entry_price": 0, "stop_loss": 0, "take_profit": 0, - "reasoning": "长期周期趋势存在分歧。1d周期显示下跌趋势(趋势强度strong),而1w周期显示上涨趋势(趋势强度moderate)。量化评分方向不一致(1d: -23.4, 1w: 11.8)。周线MACD仍为死叉(尽管收窄),日线MACD为金叉,信号矛盾。价格位于周线支撑($91,130)上方,但未形成明确的大周期共振趋势,缺乏长期布局的清晰入场点。" + "reasoning": "周线趋势虽为上涨,但RSI(40.7)偏弱,且价格仍处于11月以来的宽幅震荡区间($83,786 - $101,450)内。日线级别趋势不明,缺乏明确的长期趋势启动信号。当前价格位于区间中部,直接追涨或杀跌的风险回报比不佳,建议等待更明确的突破信号。" }, "ambush": { "exists": true, - "price_level": 91130.0, - "reasoning": "基于周线关键支撑位$91,130设置埋伏。若价格回调至此位置,并伴随1h或4h周期出现明确的反弹反转信号(如RSI超卖反弹、MACD金叉、放量),可考虑分批布局多单,博弈周线级别上涨趋势的延续。" + "price_level": 90100.0, + "reasoning": "基于4小时和日线K线数据,$90,100-$90,156区域是近期多次测试的关键支撑区间(12月1日、12月3日低点)。若价格因市场情绪再次回落至此区域并出现企稳迹象(如长下影线、成交量放大),是风险可控的埋伏做多点位,止损可设在$88,900下方。" }, "intraday": { "exists": false, @@ -180,39 +198,38 @@ "entry_price": 0, "stop_loss": 0, "take_profit": 0, - "reasoning": "短期周期趋势分歧。5m和15m显示下跌趋势(量化评分-24.3),但1h显示上涨趋势(量化评分23.4)。动量指标(MACD死叉)与部分趋势信号矛盾,RSI处于中性区域,成交量萎缩,缺乏明确的共振入场信号。价格在1h支撑($91,637)和压力($92,273)之间震荡,方向不明。" + "reasoning": "当前价格$92,488.90位于短期震荡区间中部。5分钟和15分钟周期趋势为弱势下跌,但1小时周期为强势上涨,多周期信号矛盾。RSI和MACD指标均呈中性或弱信号,成交量缩量,缺乏明确的短期方向性突破动能。预期盈利空间不足1%,建议观望。" }, "swing": { - "exists": false, - "direction": null, - "entry_price": 0, - "stop_loss": 0, - "take_profit": 0, - "reasoning": "中期周期趋势严重分歧。4h周期显示上涨趋势(量化评分29.3,趋势强度moderate,RSI 60.2强势),但1d周期显示下跌趋势(量化评分-23.4,趋势强度strong)。MACD信号不一致(4h金叉收窄,1d金叉扩大),价格接近4h压力位($93,080)但未突破。成交量萎缩,市场缺乏明确的波段方向动能,建议观望等待趋势统一。" + "exists": true, + "direction": "LONG", + "entry_price": 91500.0, + "stop_loss": 90100.0, + "take_profit": 94500.0, + "reasoning": "4小时和日线图显示价格在$90,156-$93,932区间内宽幅震荡。当前价格接近区间中下部。日线MACD死叉收窄,有潜在底背离迹象。若价格能回踩并站稳$91,500(近期多次反弹的支撑位)上方,可视为中期做多机会,目标看向区间上沿$94,500附近,盈利空间约3.2%。" } }, "recommendations_by_timeframe": { - "short_term": "短期(5m/15m/1h)操作建议:观望。当前5m/15m与1h趋势方向矛盾,市场处于震荡整理状态,缺乏清晰的日内交易机会。可关注价格对1h支撑$91,637和压力$92,273的突破情况,等待小周期形成共振后再考虑入场。", - "medium_term": "中期(4h/1d)操作建议:观望。4h看涨与1d看跌形成强烈分歧,市场方向不明。建议等待价格有效突破4h压力$93,080(确认中期转强)或跌破4h支撑$90,612(确认中期转弱),并结合成交量放大信号,再寻找波段交易机会。", - "long_term": "长期(1d/1w)操作建议:观望。日线与周线趋势不一致,长期趋势未明朗。可关注周线支撑$91,130的防守情况。若价格能站稳该支撑并推动日线趋势转涨,形成大周期共振,则可能开启长期上涨趋势;反之,若跌破,则长期趋势可能转弱。目前宜耐心等待更明确的趋势信号。" + "short_term": "短期(5m/15m/1h)建议观望。价格处于无趋势震荡中,技术指标矛盾,日内交易缺乏明确的、盈利空间≥1%的机会。避免在$92,000-$93,000区间内频繁操作。", + "medium_term": "中期(4h/1d)可关注回调做多机会。等待价格回落至$91,500附近企稳后分批布局,止损设于$90,100下方,目标看向$94,500。若直接向上突破$93,000并站稳,可轻仓追多,目标$94,500。", + "long_term": "长期(1d/1w)建议继续持有现有仓位或保持观望。需等待价格有效突破$94,000(确认短期强势)或跌破$89,000(确认转弱)来明确大方向。在方向明确前,不宜进行大规模长期仓位调整。" }, "levels": { - "current_price": 92028.4, - "entry": 92028.4, - "stop_loss": 0, - "take_profit_1": 0, - "take_profit_2": 0, - "take_profit_3": 0 + "current_price": 92488.9, + "entry": 91500.0, + "stop_loss": 90100.0, + "take_profit_1": 94500.0, + "take_profit_2": 94500.0, + "take_profit_3": 94500.0 }, "risk_level": "MEDIUM", "key_factors": [ - "多周期趋势严重分歧,方向不明", - "成交量普遍萎缩,市场动能不足", - "价格位于关键支撑与压力区间内震荡", - "大周期(日线、周线)MACD信号矛盾", - "市场等待突破以选择后续方向" + "4小时及日线级别宽幅震荡区间的突破方向", + "成交量能否在关键价位有效放大", + "日线MACD能否形成金叉确认反弹", + "周线RSI(40.7)能否回升至50中性区域以上" ], - "raw_response": "```json\n{\n \"signal\": \"HOLD\",\n \"confidence\": 0.67,\n \"opportunities\": {\n \"short_term_5m_15m_1h\": {\n \"exists\": false,\n \"timeframe_label\": \"短期 (5m/15m/1h)\",\n \"direction\": null,\n \"entry_price\": null,\n \"stop_loss\": null,\n \"take_profit\": null,\n \"reasoning\": \"短期周期趋势分歧。5m和15m显示下跌趋势(量化评分-24.3),但1h显示上涨趋势(量化评分23.4)。动量指标(MACD死叉)与部分趋势信号矛盾,RSI处于中性区域,成交量萎缩,缺乏明确的共振入场信号。价格在1h支撑($91,637)和压力($92,273)之间震荡,方向不明。\",\n \"持仓时间\": \"几分钟到几小时\"\n },\n \"medium_term_4h_1d\": {\n \"exists\": false,\n \"timeframe_label\": \"中期 (4h/1d)\",\n \"direction\": null,\n \"entry_price\": null,\n \"stop_loss\": null,\n \"take_profit\": null,\n \"reasoning\": \"中期周期趋势严重分歧。4h周期显示上涨趋势(量化评分29.3,趋势强度moderate,RSI 60.2强势),但1d周期显示下跌趋势(量化评分-23.4,趋势强度strong)。MACD信号不一致(4h金叉收窄,1d金叉扩大),价格接近4h压力位($93,080)但未突破。成交量萎缩,市场缺乏明确的波段方向动能,建议观望等待趋势统一。\",\n \"持仓时间\": \"数天到一周\"\n },\n \"long_term_1d_1w\": {\n \"exists\": false,\n \"timeframe_label\": \"长期 (1d/1w)\",\n \"direction\": null,\n \"entry_price\": null,\n \"stop_loss\": null,\n \"take_profit\": null,\n \"reasoning\": \"长期周期趋势存在分歧。1d周期显示下跌趋势(趋势强度strong),而1w周期显示上涨趋势(趋势强度moderate)。量化评分方向不一致(1d: -23.4, 1w: 11.8)。周线MACD仍为死叉(尽管收窄),日线MACD为金叉,信号矛盾。价格位于周线支撑($91,130)上方,但未形成明确的大周期共振趋势,缺乏长期布局的清晰入场点。\",\n \"持仓时间\": \"数周到数月\"\n },\n \"ambush\": {\n \"exists\": true,\n \"price_level\": 91130.0,\n \"reasoning\": \"基于周线关键支撑位$91,130设置埋伏。若价格回调至此位置,并伴随1h或4h周期出现明确的反弹反转信号(如RSI超卖反弹、MACD金叉、放量),可考虑分批布局多单,博弈周线级别上涨趋势的延续。\",\n \"timeframe\": \"基于周线(1w)的关键支撑位\"\n }\n },\n \"recommendations_by_timeframe\": {\n \"short_term\": \"短期(5m/15m/1h)操作建议:观望。当前5m/15m与1h趋势方向矛盾,市场处于震荡整理状态,缺乏清晰的日内交易机会。可关注价格对1h支撑$91,637和压力$92,273的突破情况,等待小周期形成共振后再考虑入场。\",\n \"medium_term\": \"中期(4h/1d)操作建议:观望。4h看涨与1d看跌形成强烈分歧,市场方向不明。建议等待价格有效突破4h压力$93,080(确认中期转强)或跌破4h支撑$90,612(确认中期转弱),并结合成交量放大信号,再寻找波段交易机会。\",\n \"long_term\": \"长期(1d/1w)操作建议:观望。日线与周线趋势不一致,长期趋势未明朗。可关注周线支撑$91,130的防守情况。若价格能站稳该支撑并推动日线趋势转涨,形成大周期共振,则可能开启长期上涨趋势;反之,若跌破,则长期趋势可能转弱。目前宜耐心等待更明确的趋势信号。\"\n },\n \"reasoning\": \"多周期综合分析显示市场处于关键分歧点。短期(1h)与超短期(5m/15m)趋势不一致,中期(4h与1d)趋势完全相反,长期(1d与1w)趋势也存在矛盾。各周期未能形成共振,市场缺乏统一方向,呈现震荡格局。当前价格位于多个周期的关键位之间,方向选择有待确认。\",\n \"risk_level\": \"MEDIUM\",\n \"key_factors\": [\"多周期趋势严重分歧,方向不明\", \"成交量普遍萎缩,市场动能不足\", \"价格位于关键支撑与压力区间内震荡\", \"大周期(日线、周线)MACD信号矛盾\", \"市场等待突破以选择后续方向\"]\n}\n```", - "risk_reward_ratio": 0 + "raw_response": "```json\n{\n \"signal\": \"HOLD\",\n \"confidence\": 0.55,\n \"key_levels\": {\n \"short_term\": {\n \"support\": [91637.0, 90906.0, 90643.0],\n \"resistance\": [92488.0, 92841.0, 93068.0]\n },\n \"medium_term\": {\n \"support\": [90156.0, 89225.0, 86970.0],\n \"resistance\": [93932.0, 94544.0, 95560.0]\n },\n \"long_term\": {\n \"support\": [80600.0, 83786.0, 86066.0],\n \"resistance\": [101450.0, 104670.0, 107473.0]\n }\n },\n \"opportunities\": {\n \"short_term_5m_15m_1h\": {\n \"exists\": false,\n \"direction\": null,\n \"entry_price\": null,\n \"stop_loss\": null,\n \"take_profit\": null,\n \"reasoning\": \"当前价格$92,488.90位于短期震荡区间中部。5分钟和15分钟周期趋势为弱势下跌,但1小时周期为强势上涨,多周期信号矛盾。RSI和MACD指标均呈中性或弱信号,成交量缩量,缺乏明确的短期方向性突破动能。预期盈利空间不足1%,建议观望。\"\n },\n \"medium_term_4h_1d\": {\n \"exists\": true,\n \"direction\": \"LONG\",\n \"entry_price\": 91500.0,\n \"stop_loss\": 90100.0,\n \"take_profit\": 94500.0,\n \"reasoning\": \"4小时和日线图显示价格在$90,156-$93,932区间内宽幅震荡。当前价格接近区间中下部。日线MACD死叉收窄,有潜在底背离迹象。若价格能回踩并站稳$91,500(近期多次反弹的支撑位)上方,可视为中期做多机会,目标看向区间上沿$94,500附近,盈利空间约3.2%。\"\n },\n \"long_term_1d_1w\": {\n \"exists\": false,\n \"direction\": null,\n \"entry_price\": null,\n \"stop_loss\": null,\n \"take_profit\": null,\n \"reasoning\": \"周线趋势虽为上涨,但RSI(40.7)偏弱,且价格仍处于11月以来的宽幅震荡区间($83,786 - $101,450)内。日线级别趋势不明,缺乏明确的长期趋势启动信号。当前价格位于区间中部,直接追涨或杀跌的风险回报比不佳,建议等待更明确的突破信号。\"\n },\n \"ambush\": {\n \"exists\": true,\n \"price_level\": 90100.0,\n \"reasoning\": \"基于4小时和日线K线数据,$90,100-$90,156区域是近期多次测试的关键支撑区间(12月1日、12月3日低点)。若价格因市场情绪再次回落至此区域并出现企稳迹象(如长下影线、成交量放大),是风险可控的埋伏做多点位,止损可设在$88,900下方。\"\n }\n },\n \"recommendations_by_timeframe\": {\n \"short_term\": \"短期(5m/15m/1h)建议观望。价格处于无趋势震荡中,技术指标矛盾,日内交易缺乏明确的、盈利空间≥1%的机会。避免在$92,000-$93,000区间内频繁操作。\",\n \"medium_term\": \"中期(4h/1d)可关注回调做多机会。等待价格回落至$91,500附近企稳后分批布局,止损设于$90,100下方,目标看向$94,500。若直接向上突破$93,000并站稳,可轻仓追多,目标$94,500。\",\n \"long_term\": \"长期(1d/1w)建议继续持有现有仓位或保持观望。需等待价格有效突破$94,000(确认短期强势)或跌破$89,000(确认转弱)来明确大方向。在方向明确前,不宜进行大规模长期仓位调整。\"\n },\n \"reasoning\": \"多周期综合分析显示市场处于关键抉择期。短期(5m-1h)陷入无序震荡,缺乏交易价值。中期(4h-1d)在$90,156-$93,932构建震荡平台,MACD有修复迹象,倾向于在支撑位附近寻找低吸机会。长期(1d-1w)仍处于自11月高点以来的大级别盘整中,周线上涨趋势未改但动能减弱。当前核心矛盾是中期震荡与长期趋势的共振点尚未出现,需等待更明确的突破信号。风险主要来自震荡区间内的假突破和低成交量下的价格异动。\",\n \"risk_level\": \"MEDIUM\",\n \"key_factors\": [\"4小时及日线级别宽幅震荡区间的突破方向\", \"成交量能否在关键价位有效放大\", \"日线MACD能否形成金叉确认反弹\", \"周线RSI(40.7)能否回升至50中性区域以上\"]\n}\n```", + "risk_reward_ratio": 2.14 } } \ No newline at end of file diff --git a/redis.conf b/redis.conf deleted file mode 100644 index 1b78dff..0000000 --- a/redis.conf +++ /dev/null @@ -1,36 +0,0 @@ -# Redis Configuration for Real-time Data Ingestion - -# Network -bind 0.0.0.0 -protected-mode no -port 6379 - -# Persistence -save 900 1 -save 300 10 -save 60 10000 -stop-writes-on-bgsave-error yes -rdbcompression yes -rdbchecksum yes -dbfilename dump.rdb -dir /data - -# Memory Management -maxmemory 512mb -maxmemory-policy allkeys-lru - -# Append Only File (AOF) - Disabled for performance -appendonly no - -# Logging -loglevel notice -logfile "" - -# Stream Configuration -stream-node-max-bytes 4096 -stream-node-max-entries 100 - -# Performance -tcp-backlog 511 -timeout 0 -tcp-keepalive 300 diff --git a/requirements.txt b/requirements.txt index 88551f9..8422077 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,3 @@ -# WebSocket client -websockets==12.0 - -# Redis async client (includes aioredis functionality) -redis[hiredis]==5.0.1 - # Configuration management (use pre-built wheels) pydantic>=2.0,<3.0 pydantic-settings>=2.0,<3.0 @@ -11,12 +5,12 @@ pydantic-settings>=2.0,<3.0 # Environment variables python-dotenv==1.0.0 -# Fast JSON serialization +# Fast JSON serialization (optional, for better performance) orjson==3.9.10 # Data analysis and technical indicators -pandas==2.1.4 -numpy==1.26.2 +pandas>=2.2.0 +numpy>=2.0.0 ta==0.11.0 # LLM clients (optional - only if you want LLM decision making) diff --git a/run_signal.sh b/run_signal.sh index 40c4790..f97b5f5 100755 --- a/run_signal.sh +++ b/run_signal.sh @@ -3,25 +3,25 @@ set -e # 遇到错误立即退出 -echo "🚀 开始生成交易信号..." +echo "开始生成交易信号..." echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" # 检查 scheduler 容器是否运行 if ! docker compose ps scheduler | grep -q "running"; then - echo "⚠️ scheduler 容器未运行,正在启动..." - docker compose --profile scheduler up -d - echo "✅ 等待服务就绪..." - sleep 5 + echo "scheduler 容器未运行,正在启动..." + docker compose up -d + echo "等待服务就绪..." + sleep 3 fi -# 运行信号生成(API 配置已在 docker-compose.yml 中) +# 运行信号生成 docker compose exec scheduler python /app/scripts/generate_trading_signal.py echo "" echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" -echo "✅ 信号生成完成!" +echo "信号生成完成!" echo "" -echo "📊 查看完整结果:" +echo "查看完整结果:" echo " cat output/latest_signal.json | python -m json.tool" echo "" -echo "📱 如果是 BUY/SELL 信号,已自动发送钉钉通知" +echo "如果是 BUY/SELL 信号,已自动发送钉钉通知" diff --git a/scheduler.py b/scheduler.py index 76c52e9..05fcad5 100644 --- a/scheduler.py +++ b/scheduler.py @@ -17,7 +17,6 @@ from config.settings import settings from analysis.engine import MarketAnalysisEngine from signals.quantitative import QuantitativeSignalGenerator from signals.llm_decision import LLMDecisionMaker -from signals.llm_gate import LLMGate from signals.aggregator import SignalAggregator from notifiers.dingtalk import DingTalkNotifier @@ -44,16 +43,6 @@ class SignalScheduler: self.engine = MarketAnalysisEngine() self.quant_generator = QuantitativeSignalGenerator() - # Initialize LLM gate - self.llm_gate = None - if settings.LLM_GATE_ENABLED: - self.llm_gate = LLMGate( - min_candles=settings.LLM_MIN_CANDLES, - min_composite_score=settings.LLM_MIN_COMPOSITE_SCORE, - max_calls_per_day=settings.LLM_MAX_CALLS_PER_DAY, - min_call_interval_minutes=settings.LLM_MIN_INTERVAL_MINUTES, - ) - # Initialize LLM decision maker self.llm_maker = LLMDecisionMaker(provider='openai') @@ -67,54 +56,42 @@ class SignalScheduler: enabled=bool(dingtalk_webhook) ) - logger.info(f"🤖 Signal Scheduler 初始化完成 - 每{interval_minutes}分钟生成一次信号") + logger.info(f"Signal Scheduler 初始化完成 - 每{interval_minutes}分钟生成一次信号") async def generate_signal_once(self) -> dict: """执行一次信号生成""" try: logger.info("=" * 80) - logger.info(f"📊 开始生成交易信号 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"开始生成交易信号 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logger.info("=" * 80) # Step 1: Market analysis analysis = self.engine.analyze_current_market(timeframe='5m') if 'error' in analysis: - logger.warning(f"⚠️ 市场分析失败: {analysis['error']}") + logger.warning(f"市场分析失败: {analysis['error']}") return None - logger.info(f"✅ 市场分析完成 - 价格: ${analysis['current_price']:,.2f}, 趋势: {analysis['trend_analysis'].get('direction')}") + logger.info(f"市场分析完成 - 价格: ${analysis['current_price']:,.2f}, 趋势: {analysis['trend_analysis'].get('direction')}") # Step 2: Quantitative signal quant_signal = self.quant_generator.generate_signal(analysis) - logger.info(f"📈 量化信号: {quant_signal['signal_type']} (得分: {quant_signal['composite_score']:.1f})") + logger.info(f"量化信号: {quant_signal['signal_type']} (得分: {quant_signal['composite_score']:.1f})") - # Step 3: Check LLM gate and generate LLM decision + # Step 3: LLM decision llm_signal = None - should_call_llm = True + llm_context = self.engine.get_llm_context(format='full') + llm_signal = self.llm_maker.generate_decision(llm_context, analysis) - if self.llm_gate: - should_call_llm, gate_reason = self.llm_gate.should_call_llm(quant_signal, analysis) - - if should_call_llm: - logger.info(f"✅ LLM Gate: PASSED - {gate_reason}") - else: - logger.info(f"🚫 LLM Gate: BLOCKED - {gate_reason}") - - # Call LLM if gate passed - if should_call_llm: - llm_context = self.engine.get_llm_context(format='full') - llm_signal = self.llm_maker.generate_decision(llm_context, analysis) - - if llm_signal.get('enabled', True): - logger.info(f"🤖 LLM信号: {llm_signal['signal_type']} (置信度: {llm_signal.get('confidence', 0):.2%})") - else: - logger.info("⚠️ LLM未启用 (无API key)") + if llm_signal.get('enabled', True): + logger.info(f"LLM信号: {llm_signal['signal_type']} (置信度: {llm_signal.get('confidence', 0):.2%})") + else: + logger.info("LLM未启用 (无API key)") # Step 4: Aggregate signals aggregated = SignalAggregator.aggregate_signals(quant_signal, llm_signal) - logger.info(f"🎯 最终信号: {aggregated['final_signal']} (置信度: {aggregated['final_confidence']:.2%})") + logger.info(f"最终信号: {aggregated['final_signal']} (置信度: {aggregated['final_confidence']:.2%})") # Step 5: Save to file output_file = Path(__file__).parent / 'output' / 'latest_signal.json' @@ -136,13 +113,10 @@ class SignalScheduler: with open(output_file, 'w') as f: json.dump(output_data, f, indent=2, ensure_ascii=False) - logger.info(f"💾 信号已保存到: {output_file}") + logger.info(f"信号已保存到: {output_file}") # Step 6: Send DingTalk notification try: - # 发送通知的条件: - # 1. BUY/SELL 明确信号 - # 2. HOLD信号但有日内交易机会 final_signal = aggregated.get('final_signal', 'HOLD') should_notify = False @@ -163,29 +137,29 @@ class SignalScheduler: notify_reason = f"HOLD信号,但存在短期{direction}机会" if should_notify: - logger.info(f"📱 发送钉钉通知 - {notify_reason}") + logger.info(f"发送钉钉通知 - {notify_reason}") sent = self.dingtalk.send_signal(aggregated) if sent: - logger.info(f"✅ 钉钉通知发送成功") + logger.info(f"钉钉通知发送成功") else: - logger.warning(f"⚠️ 钉钉通知发送失败或未配置") + logger.warning(f"钉钉通知发送失败或未配置") else: - logger.info(f"ℹ️ HOLD信号且无日内机会,跳过钉钉通知") + logger.info(f"HOLD信号且无日内机会,跳过钉钉通知") except Exception as e: - logger.error(f"❌ 钉钉通知发送异常: {e}", exc_info=True) + logger.error(f"钉钉通知发送异常: {e}", exc_info=True) logger.info("=" * 80) return aggregated except Exception as e: - logger.error(f"❌ 信号生成失败: {e}", exc_info=True) + logger.error(f"信号生成失败: {e}", exc_info=True) return None async def run(self): """启动调度器主循环""" self.is_running = True - logger.info(f"🚀 Signal Scheduler 启动 - 每{self.interval_minutes}分钟生成信号") + logger.info(f"Signal Scheduler 启动 - 每{self.interval_minutes}分钟生成信号") # 立即生成一次 await self.generate_signal_once() @@ -206,7 +180,7 @@ class SignalScheduler: logger.error(f"调度器错误: {e}", exc_info=True) await asyncio.sleep(60) # 错误后等待1分钟再继续 - logger.info("🛑 Signal Scheduler 已停止") + logger.info("Signal Scheduler 已停止") def stop(self): """停止调度器""" diff --git a/scripts/generate_trading_signal.py b/scripts/generate_trading_signal.py index 71382c5..73fe5d7 100755 --- a/scripts/generate_trading_signal.py +++ b/scripts/generate_trading_signal.py @@ -12,11 +12,13 @@ from pathlib import Path # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) -from config.settings import settings +# Load .env file +from dotenv import load_dotenv +load_dotenv(Path(__file__).parent.parent / '.env') + from analysis.engine import MarketAnalysisEngine from signals.quantitative import QuantitativeSignalGenerator from signals.llm_decision import LLMDecisionMaker -from signals.llm_gate import LLMGate from signals.aggregator import SignalAggregator from notifiers.dingtalk import DingTalkNotifier @@ -47,9 +49,9 @@ def print_signal(signal: dict, title: str): if 'trade_type' in signal: trade_type = signal['trade_type'] trade_type_display = { - 'INTRADAY': '📊 日内交易', - 'SWING': '📈 中长线交易', - 'NONE': '⏸️ 观望' + 'INTRADAY': 'Intraday', + 'SWING': 'Swing', + 'NONE': 'None' }.get(trade_type, trade_type) print(f"Trade Type: {trade_type_display}") @@ -83,39 +85,30 @@ def print_signal(signal: dict, title: str): # Intraday opportunity if opps.get('intraday', {}).get('exists'): intra = opps['intraday'] - print(f"\n📊 日内交易机会:") - print(f" 方向: {intra.get('direction', 'N/A')}") + print(f"\nIntraday Opportunity:") + print(f" Direction: {intra.get('direction', 'N/A')}") if intra.get('entry_price'): - print(f" 入场: ${intra['entry_price']:,.2f}") + print(f" Entry: ${intra['entry_price']:,.2f}") if intra.get('stop_loss'): - print(f" 止损: ${intra['stop_loss']:,.2f}") + print(f" Stop: ${intra['stop_loss']:,.2f}") if intra.get('take_profit'): - print(f" 止盈: ${intra['take_profit']:,.2f}") + print(f" Target: ${intra['take_profit']:,.2f}") if intra.get('reasoning'): - print(f" 说明: {intra['reasoning']}") + print(f" Reasoning: {intra['reasoning']}") # Swing opportunity if opps.get('swing', {}).get('exists'): swing = opps['swing'] - print(f"\n📈 中长线交易机会:") - print(f" 方向: {swing.get('direction', 'N/A')}") + print(f"\nSwing Opportunity:") + print(f" Direction: {swing.get('direction', 'N/A')}") if swing.get('entry_price'): - print(f" 入场: ${swing['entry_price']:,.2f}") + print(f" Entry: ${swing['entry_price']:,.2f}") if swing.get('stop_loss'): - print(f" 止损: ${swing['stop_loss']:,.2f}") + print(f" Stop: ${swing['stop_loss']:,.2f}") if swing.get('take_profit'): - print(f" 止盈: ${swing['take_profit']:,.2f}") + print(f" Target: ${swing['take_profit']:,.2f}") if swing.get('reasoning'): - print(f" 说明: {swing['reasoning']}") - - # Ambush opportunity - if opps.get('ambush', {}).get('exists'): - ambush = opps['ambush'] - print(f"\n📌 埋伏点位:") - if ambush.get('price_level'): - print(f" 埋伏价位: ${ambush['price_level']:,.2f}") - if ambush.get('reasoning'): - print(f" 说明: {ambush['reasoning']}") + print(f" Reasoning: {swing['reasoning']}") if 'reasoning' in signal: print(f"\nReasoning: {signal['reasoning']}") @@ -123,17 +116,17 @@ def print_signal(signal: dict, title: str): def print_aggregated_signal(aggregated: dict): """Print aggregated signal""" - print_section("📊 AGGREGATED TRADING SIGNAL") + print_section("AGGREGATED TRADING SIGNAL") - print(f"\n🎯 Final Signal: {aggregated['final_signal']}") - print(f"📈 Confidence: {aggregated['final_confidence']:.2%}") - print(f"🤝 Consensus: {aggregated['consensus']}") - print(f"✅ Agreement Score: {aggregated['agreement_score']:.2%}") + print(f"\nFinal Signal: {aggregated['final_signal']}") + print(f"Confidence: {aggregated['final_confidence']:.2%}") + print(f"Consensus: {aggregated['consensus']}") + print(f"Agreement Score: {aggregated['agreement_score']:.2%}") # Quantitative signal - print("\n" + "─" * 80) + print("\n" + "-" * 80) quant = aggregated['quantitative_signal'] - print(f"🔢 QUANTITATIVE SIGNAL: {quant.get('signal_type', quant.get('signal', 'HOLD'))} (confidence: {quant.get('confidence', 0):.2%})") + print(f"QUANTITATIVE SIGNAL: {quant.get('signal_type', quant.get('signal', 'HOLD'))} (confidence: {quant.get('confidence', 0):.2%})") print(f" Composite Score: {quant.get('composite_score', 0):.1f}") if 'scores' in quant: scores = quant['scores'] @@ -143,48 +136,39 @@ def print_aggregated_signal(aggregated: dict): f"Breakout: {scores.get('breakout', 0):>6.1f}") # LLM signal - print("\n" + "─" * 80) + print("\n" + "-" * 80) llm = aggregated.get('llm_signal') if llm and isinstance(llm, dict): - trade_type_icon = { - 'INTRADAY': '📊', - 'SWING': '📈', - 'AMBUSH': '📌', - 'NONE': '⏸️' - }.get(llm.get('trade_type', 'NONE'), '❓') trade_type_text = { - 'INTRADAY': '日内交易', - 'SWING': '中长线', - 'AMBUSH': '埋伏', - 'NONE': '观望' + 'INTRADAY': 'Intraday', + 'SWING': 'Swing', + 'AMBUSH': 'Ambush', + 'NONE': 'None' }.get(llm.get('trade_type', 'NONE'), llm.get('trade_type', 'N/A')) - print(f"🤖 LLM SIGNAL: {llm.get('signal_type', llm.get('signal', 'HOLD'))} (confidence: {llm.get('confidence', 0):.2%})") - print(f" Trade Type: {trade_type_icon} {trade_type_text}") + print(f"LLM SIGNAL: {llm.get('signal_type', llm.get('signal', 'HOLD'))} (confidence: {llm.get('confidence', 0):.2%})") + print(f" Trade Type: {trade_type_text}") # Display opportunities if available if 'opportunities' in llm: opps = llm['opportunities'] if opps.get('intraday', {}).get('exists'): intra = opps['intraday'] - print(f" 📊 日内: {intra.get('direction')} @ ${intra.get('entry_price', 0):,.0f}") + print(f" Intraday: {intra.get('direction')} @ ${intra.get('entry_price', 0):,.0f}") if opps.get('swing', {}).get('exists'): swing = opps['swing'] - print(f" 📈 中长线: {swing.get('direction')} @ ${swing.get('entry_price', 0):,.0f}") - if opps.get('ambush', {}).get('exists'): - ambush = opps['ambush'] - print(f" 📌 埋伏: ${ambush.get('price_level', 0):,.0f}") + print(f" Swing: {swing.get('direction')} @ ${swing.get('entry_price', 0):,.0f}") print(f" Reasoning: {llm.get('reasoning', 'N/A')[:200]}") if llm.get('key_factors'): print(f" Key Factors: {', '.join(llm['key_factors'][:3])}") else: - print("🤖 LLM SIGNAL: Not available (no API key configured)") + print("LLM SIGNAL: Not available (no API key configured)") # Final levels - print("\n" + "─" * 80) + print("\n" + "-" * 80) levels = aggregated['levels'] - print("💰 RECOMMENDED LEVELS:") + print("RECOMMENDED LEVELS:") print(f" Current Price: ${levels['current_price']:>10,.2f}") print(f" Entry: ${levels['entry']:>10,.2f}") print(f" Stop Loss: ${levels['stop_loss']:>10,.2f}") @@ -197,14 +181,14 @@ def print_aggregated_signal(aggregated: dict): print(f"\n Risk/Reward Ratio: 1:{rr:.2f}") # Recommendation - print("\n" + "─" * 80) - print(f"💡 RECOMMENDATION:") + print("\n" + "-" * 80) + print(f"RECOMMENDATION:") print(f" {aggregated['recommendation']}") # Warnings if aggregated.get('warnings'): - print("\n" + "─" * 80) - print("⚠️ WARNINGS:") + print("\n" + "-" * 80) + print("WARNINGS:") for warning in aggregated['warnings']: print(f" {warning}") @@ -218,7 +202,7 @@ def main(): help='Send notification to DingTalk') args = parser.parse_args() - print_section("🚀 TRADING SIGNAL GENERATOR", 80) + print_section("TRADING SIGNAL GENERATOR", 80) # Initialize components logger.info("Initializing analysis engine...") @@ -238,78 +222,44 @@ def main(): enabled=bool(dingtalk_webhook) ) - # Initialize LLM gate (极简门控 - 频率为主,量化初筛) - llm_gate = None - if settings.LLM_GATE_ENABLED: - logger.info("Initializing simplified LLM gate...") - llm_gate = LLMGate( - min_candles=settings.LLM_MIN_CANDLES, - min_composite_score=settings.LLM_MIN_COMPOSITE_SCORE, - max_calls_per_day=settings.LLM_MAX_CALLS_PER_DAY, - min_call_interval_minutes=settings.LLM_MIN_INTERVAL_MINUTES, - ) - - # Try to initialize LLM (will be disabled if no API key) + # Initialize LLM decision maker # Use 'openai' provider - supports OpenAI, Deepseek, and other OpenAI-compatible APIs - llm_maker = LLMDecisionMaker(provider='openai') # or 'claude' + llm_maker = LLMDecisionMaker(provider='openai') # Step 1: Perform market analysis - print_section("1️⃣ MARKET ANALYSIS") + print_section("1. MARKET ANALYSIS") analysis = engine.analyze_current_market(timeframe='5m') if 'error' in analysis: - print(f"❌ Error: {analysis['error']}") - print("\n💡 Tip: Wait for more data to accumulate (need at least 200 candles)") + print(f"Error: {analysis['error']}") + print("\nTip: Check your network connection to Binance API") return - print(f"✅ Analysis complete") + print(f"Analysis complete") print(f" Price: ${analysis['current_price']:,.2f}") print(f" Trend: {analysis['trend_analysis'].get('direction', 'unknown')}") print(f" RSI: {analysis['momentum'].get('rsi', 0):.1f}") print(f" MACD: {analysis['momentum'].get('macd_signal', 'unknown')}") # Step 2: Generate quantitative signal - print_section("2️⃣ QUANTITATIVE SIGNAL") + print_section("2. QUANTITATIVE SIGNAL") quant_signal = quant_generator.generate_signal(analysis) - print_signal(quant_signal, "📊 Quantitative Analysis") + print_signal(quant_signal, "Quantitative Analysis") - # Step 3: Check LLM gate and generate LLM decision - print_section("3️⃣ LLM DECISION") + # Step 3: Generate LLM decision + print_section("3. LLM DECISION") - llm_signal = None - should_call_llm = True - gate_reason = "LLM gate disabled" + llm_context = engine.get_llm_context(format='full') + llm_signal = llm_maker.generate_decision(llm_context, analysis) - # Check LLM gate prerequisites - if llm_gate: - should_call_llm, gate_reason = llm_gate.should_call_llm(quant_signal, analysis) - - if should_call_llm: - print(f"\n✅ LLM Gate: PASSED") - print(f" Reason: {gate_reason}") - else: - print(f"\n❌ LLM Gate: BLOCKED") - print(f" Reason: {gate_reason}") - print(f"\n💡 LLM will NOT be called. Using quantitative signal only.") - print(f" Quantitative score: {quant_signal.get('composite_score', 0):.1f}") - print(f" Quantitative confidence: {quant_signal.get('confidence', 0):.2%}") - - # Call LLM only if gate passed - if should_call_llm: - llm_context = engine.get_llm_context(format='full') - llm_signal = llm_maker.generate_decision(llm_context, analysis) - - if llm_signal.get('enabled', True): - print_signal(llm_signal, "🤖 LLM Analysis") - else: - print("\n🤖 LLM Analysis: Disabled (no API key)") - print(" Set ANTHROPIC_API_KEY or OPENAI_API_KEY to enable") + if llm_signal.get('enabled', True): + print_signal(llm_signal, "LLM Analysis") else: - # LLM blocked by gate, use None (aggregator will use quant-only) - print("\n🤖 LLM Analysis: Skipped (gate blocked)") + print("\nLLM Analysis: Disabled (no API key)") + print(" Set ANTHROPIC_API_KEY or OPENAI_API_KEY to enable") # Step 4: Aggregate signals - print_section("4️⃣ SIGNAL AGGREGATION") + print_section("4. SIGNAL AGGREGATION") aggregated = SignalAggregator.aggregate_signals(quant_signal, llm_signal) print_aggregated_signal(aggregated) @@ -332,18 +282,18 @@ def main(): with open(output_file, 'w') as f: json.dump(output_data, f, indent=2, ensure_ascii=False) - print(f"\n💾 Signal saved to: {output_file}") + print(f"\nSignal saved to: {output_file}") # Send DingTalk notification if enabled if dingtalk: - print(f"\n📱 Sending DingTalk notification...") + print(f"\nSending DingTalk notification...") success = dingtalk.send_signal(aggregated) if success: - print(f"✅ DingTalk notification sent successfully") + print(f"DingTalk notification sent successfully") else: - print(f"❌ Failed to send DingTalk notification") + print(f"Failed to send DingTalk notification") - print_section("✅ SIGNAL GENERATION COMPLETE", 80) + print_section("SIGNAL GENERATION COMPLETE", 80) if __name__ == "__main__": diff --git a/scripts/monitor.sh b/scripts/monitor.sh deleted file mode 100755 index a74b1de..0000000 --- a/scripts/monitor.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -# Real-time monitoring script for ingestion system - -echo "==================================================" -echo " Binance Data Ingestion - Real-time Monitor" -echo "==================================================" -echo "" - -# Check if Docker is running -if ! docker info > /dev/null 2>&1; then - echo "❌ Docker is not running" - exit 1 -fi - -# Detect Docker Compose command -if command -v docker-compose &> /dev/null; then - DOCKER_COMPOSE="docker-compose" -else - DOCKER_COMPOSE="docker compose" -fi - -echo "📊 Container Status:" -$DOCKER_COMPOSE ps -echo "" - -echo "🔍 Stream Statistics:" -docker exec tradus-redis redis-cli << EOF -KEYS binance:raw:* -XLEN binance:raw:kline:5m -XLEN binance:raw:depth:20 -XLEN binance:raw:trade -EOF -echo "" - -echo "💾 Redis Memory Usage:" -docker exec tradus-redis redis-cli INFO memory | grep used_memory_human -echo "" - -echo "📈 Latest K-line Data (last 3):" -docker exec tradus-redis redis-cli XREVRANGE binance:raw:kline:5m + - COUNT 3 -echo "" - -echo "📋 Application Logs (last 20 lines):" -$DOCKER_COMPOSE logs --tail=20 ingestion -echo "" - -echo "==================================================" -echo " Monitoring complete" -echo "==================================================" diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py index 823bc0d..df0b014 100755 --- a/scripts/run_analysis.py +++ b/scripts/run_analysis.py @@ -10,6 +10,10 @@ from pathlib import Path # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) +# Load .env file +from dotenv import load_dotenv +load_dotenv(Path(__file__).parent.parent / '.env') + from analysis.engine import MarketAnalysisEngine diff --git a/scripts/test_redis_read.py b/scripts/test_redis_read.py deleted file mode 100755 index c441541..0000000 --- a/scripts/test_redis_read.py +++ /dev/null @@ -1,95 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to read data from Redis Streams -""" -import redis -import orjson -import sys - - -def test_read_streams(): - """Read and display data from all Redis Streams""" - - # Connect to Redis - try: - r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=False) - r.ping() - print("✓ Connected to Redis\n") - except Exception as e: - print(f"✗ Failed to connect to Redis: {e}") - sys.exit(1) - - # Stream keys to read - streams = [ - 'binance:raw:kline:5m', - 'binance:raw:depth:20', - 'binance:raw:trade', - ] - - print("=" * 80) - print("Reading data from Redis Streams") - print("=" * 80) - - for stream_key in streams: - print(f"\n📊 Stream: {stream_key}") - print("-" * 80) - - try: - # Get stream length - length = r.xlen(stream_key) - print(f"Stream length: {length}") - - if length == 0: - print("No data available yet\n") - continue - - # Read last 3 messages - messages = r.xrevrange(stream_key, count=3) - - for i, (msg_id, fields) in enumerate(messages, 1): - print(f"\n[Message {i}]") - print(f"ID: {msg_id.decode()}") - - # Parse JSON data - data = orjson.loads(fields[b'data']) - - # Display based on stream type - if 'kline' in stream_key: - kline = data.get('k', {}) - print(f"Symbol: {data.get('s')}") - print(f"Open: {kline.get('o')}") - print(f"High: {kline.get('h')}") - print(f"Low: {kline.get('l')}") - print(f"Close: {kline.get('c')}") - print(f"Volume: {kline.get('v')}") - print(f"Closed: {kline.get('x')}") - - elif 'depth' in stream_key: - print(f"Symbol: {data.get('s')}") - print(f"Event time: {data.get('E')}") - print(f"First update ID: {data.get('U')}") - print(f"Last update ID: {data.get('u')}") - bids = data.get('b', [])[:3] - asks = data.get('a', [])[:3] - print(f"Top 3 bids: {bids}") - print(f"Top 3 asks: {asks}") - - elif 'trade' in stream_key: - print(f"Symbol: {data.get('s')}") - print(f"Price: {data.get('p')}") - print(f"Quantity: {data.get('q')}") - print(f"Time: {data.get('T')}") - print(f"Buyer is maker: {data.get('m')}") - - print(f"Received at: {data.get('_received_at')}") - - except Exception as e: - print(f"Error reading stream {stream_key}: {e}") - - print("\n" + "=" * 80) - print("✓ Test completed") - print("=" * 80) - - -if __name__ == "__main__": - test_read_streams() diff --git a/signals/llm_gate.py b/signals/llm_gate.py deleted file mode 100644 index 8bab211..0000000 --- a/signals/llm_gate.py +++ /dev/null @@ -1,255 +0,0 @@ -""" -LLM Gate - 极简门控系统,以频率控制为主 - -核心原则: -1. 频率限制 - 每天最多12次,间隔≥15分钟(核心控制!) -2. 数据基本可用 - 至少100根K线,基础指标完整 -3. 信号基本质量 - 综合得分≥15(只过滤完全中性的信号) -""" -import logging -import os -import json -from typing import Dict, Any, Tuple, Optional -from datetime import datetime, timedelta -from pathlib import Path - -logger = logging.getLogger(__name__) - - -class LLMGate: - """ - 极简 LLM 门控系统 - 以频率控制为主,量化初筛为辅 - - 设计原则: - - 频率限制是核心(防止过度调用) - - 量化分析做初步筛选(过滤完全中性信号) - - 尽可能让LLM有机会深度分析 - """ - - def __init__( - self, - # 数据要求 - min_candles: int = 100, # 最少K线数量 - - # 信号质量(极简 - 只检查综合得分) - min_composite_score: float = 15.0, # 最小综合得分(过滤完全中性信号) - - # 频率限制(核心控制!) - max_calls_per_day: int = 12, # 每天最多调用次数 - min_call_interval_minutes: int = 15, # 最小调用间隔(分钟) - - # 状态存储 - state_file: str = '/app/data/llm_gate_state.json', - ): - """ - 初始化极简 LLM Gate - - Args: - min_candles: 最少K线数量 - min_composite_score: 最小综合得分(唯一的质量检查) - max_calls_per_day: 每天最多调用次数 - min_call_interval_minutes: 最小调用间隔 - state_file: 状态文件路径 - """ - # 数据要求 - self.min_candles = min_candles - - # 信号质量(极简) - self.min_composite_score = min_composite_score - - # 频率限制 - self.max_calls_per_day = max_calls_per_day - self.min_call_interval_minutes = min_call_interval_minutes - - # 状态管理 - self.state_file = state_file - self.state = self._load_state() - - logger.info( - f"🚦 LLM Gate 初始化 (极简模式): " - f"每天最多{max_calls_per_day}次, " - f"间隔≥{min_call_interval_minutes}分钟, " - f"综合得分≥{min_composite_score} (唯一质量检查)" - ) - - def should_call_llm( - self, - quant_signal: Dict[str, Any], - analysis: Dict[str, Any] - ) -> Tuple[bool, str]: - """ - 判断是否应该调用 LLM(优化版:简化检查,主要靠频率限制) - - 检查顺序 (快速失败原则): - 1. 频率限制 (核心!) - 2. 数据基本可用性 - 3. 信号基本质量 (量化初筛) - - Returns: - (should_call, reason) - """ - - # Check 1: 频率限制 (核心控制!) - freq_check, freq_reason = self._check_frequency_limit() - if not freq_check: - logger.info(f"🚫 LLM Gate: 频率限制 - {freq_reason}") - return False, freq_reason - - # Check 2: 数据基本可用性(简化版) - data_check, data_reason = self._check_data_sufficiency(analysis) - if not data_check: - logger.info(f"🚫 LLM Gate: 数据不足 - {data_reason}") - return False, data_reason - - # Check 3: 信号基本质量(量化初筛,门槛很低) - quality_check, quality_reason = self._check_signal_quality(quant_signal, analysis) - if not quality_check: - logger.info(f"🚫 LLM Gate: 信号质量不足 - {quality_reason}") - return False, quality_reason - - # ✅ 所有检查通过 - 让 LLM 进行深度分析 - logger.info( - f"✅ LLM Gate: PASSED! " - f"{quality_reason}, " - f"今日已调用{self.state['today_calls']}/{self.max_calls_per_day}次" - ) - - # 记录本次调用 - self._record_call() - - return True, f"量化初筛通过: {quality_reason}" - - def _check_frequency_limit(self) -> Tuple[bool, str]: - """检查频率限制""" - - now = datetime.now() - today_str = now.strftime('%Y-%m-%d') - - # 重置每日计数 - if self.state.get('last_date') != today_str: - self.state['last_date'] = today_str - self.state['today_calls'] = 0 - self._save_state() - - # Check 1: 每日调用次数 - if self.state['today_calls'] >= self.max_calls_per_day: - return False, f"今日已调用{self.state['today_calls']}次,达到上限{self.max_calls_per_day}次" - - # Check 2: 调用间隔 - last_call_time = self.state.get('last_call_time') - if last_call_time: - last_call = datetime.fromisoformat(last_call_time) - elapsed = (now - last_call).total_seconds() / 60 # 转为分钟 - if elapsed < self.min_call_interval_minutes: - return False, f"距离上次调用仅{elapsed:.1f}分钟,需≥{self.min_call_interval_minutes}分钟" - - return True, "频率检查通过" - - def _check_data_sufficiency(self, analysis: Dict[str, Any]) -> Tuple[bool, str]: - """检查数据充足性 (提高到200根K线)""" - - metadata = analysis.get('metadata', {}) - candle_count = metadata.get('candle_count', 0) - - if candle_count < self.min_candles: - return False, f"K线数量不足: {candle_count}/{self.min_candles}根" - - # 确保所有必要的指标都已计算 - required_keys = ['trend_analysis', 'momentum', 'support_resistance', 'orderflow'] - for key in required_keys: - if key not in analysis: - return False, f"缺少关键指标: {key}" - - return True, f"数据充足: {candle_count}根K线" - - def _check_signal_quality( - self, - quant_signal: Dict[str, Any], - analysis: Dict[str, Any] - ) -> Tuple[bool, str]: - """ - 检查信号质量(极简版 - 只检查综合得分) - - 只要量化分析给出了明确信号(不是完全中性),就让LLM来深度分析 - """ - - # 唯一检查: 综合得分强度 (门槛非常低,只过滤完全中性的信号) - composite_score = abs(quant_signal.get('composite_score', 0)) - if composite_score < self.min_composite_score: - return False, f"综合得分不足: {composite_score:.1f} < {self.min_composite_score}" - - # ✅ 通过 - 其他所有检查都删除了 - signal_type = quant_signal.get('signal_type', 'HOLD') - return True, f"信号类型: {signal_type}, 综合得分: {composite_score:.1f}" - - def _load_state(self) -> Dict[str, Any]: - """加载状态文件""" - - # 确保目录存在 - state_path = Path(self.state_file) - state_path.parent.mkdir(parents=True, exist_ok=True) - - if state_path.exists(): - try: - with open(self.state_file, 'r') as f: - return json.load(f) - except Exception as e: - logger.warning(f"加载状态文件失败: {e}") - - # 默认状态 - return { - 'last_date': '', - 'today_calls': 0, - 'last_call_time': None, - 'total_calls': 0, - } - - def _save_state(self): - """保存状态文件""" - try: - with open(self.state_file, 'w') as f: - json.dump(self.state, f, indent=2) - except Exception as e: - logger.error(f"保存状态文件失败: {e}") - - def _record_call(self): - """记录本次调用""" - now = datetime.now() - self.state['today_calls'] += 1 - self.state['total_calls'] = self.state.get('total_calls', 0) + 1 - self.state['last_call_time'] = now.isoformat() - self._save_state() - - logger.info( - f"📝 记录LLM调用: 今日第{self.state['today_calls']}次, " - f"累计第{self.state['total_calls']}次" - ) - - def get_stats(self) -> Dict[str, Any]: - """获取统计信息""" - now = datetime.now() - today_str = now.strftime('%Y-%m-%d') - - # 重置每日计数 - if self.state.get('last_date') != today_str: - today_calls = 0 - else: - today_calls = self.state['today_calls'] - - # 计算距离上次调用的时间 - last_call_time = self.state.get('last_call_time') - if last_call_time: - last_call = datetime.fromisoformat(last_call_time) - minutes_since_last = (now - last_call).total_seconds() / 60 - else: - minutes_since_last = None - - return { - 'today_calls': today_calls, - 'max_calls_per_day': self.max_calls_per_day, - 'remaining_calls_today': max(0, self.max_calls_per_day - today_calls), - 'total_calls': self.state.get('total_calls', 0), - 'last_call_time': last_call_time, - 'minutes_since_last_call': minutes_since_last, - 'can_call_now': minutes_since_last is None or minutes_since_last >= self.min_call_interval_minutes, - } diff --git a/start_system.sh b/start_system.sh index 3dd40cb..d591bb2 100755 --- a/start_system.sh +++ b/start_system.sh @@ -1,15 +1,15 @@ #!/bin/bash -# 启动完整的交易系统(数据采集 + 定时信号生成) +# 启动信号生成服务 -echo "🚀 启动 Tradus AI 交易系统..." +echo "🚀 启动 Tradus AI 信号生成服务..." echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" -# 启动所有服务 -docker compose --profile scheduler up -d +# 启动服务 +docker compose up -d echo "" echo "⏳ 等待服务就绪..." -sleep 5 +sleep 3 # 检查服务状态 echo "" @@ -20,10 +20,9 @@ echo "" echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" echo "✅ 系统启动完成!" echo "" -echo "📝 组件说明:" -echo " • Redis: 数据存储" -echo " • Ingestion: Binance 实时数据采集" -echo " • Scheduler: 每5分钟自动生成交易信号" +echo "📝 说明:" +echo " • Scheduler: 每15分钟自动从 Binance API 获取数据并生成交易信号" +echo " • 无需 Redis 依赖,直接通过 API 获取实时数据" echo "" echo "📱 钉钉通知: BUY/SELL 信号会自动推送" echo "" diff --git a/stop_system.sh b/stop_system.sh index 13aad7c..c232a16 100755 --- a/stop_system.sh +++ b/stop_system.sh @@ -1,14 +1,14 @@ #!/bin/bash -# 停止交易系统 +# 停止信号生成服务 -echo "🛑 停止 Tradus AI 交易系统..." +echo "🛑 停止 Tradus AI 信号生成服务..." echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" -docker compose --profile scheduler down +docker compose down echo "" echo "✅ 系统已停止" echo "" echo "💡 提示:" echo " 重新启动: ./start_system.sh" -echo " 完全清理: docker compose --profile scheduler down -v" +echo " 完全清理: docker compose down -v" diff --git a/view_data.sh b/view_data.sh deleted file mode 100755 index 2029688..0000000 --- a/view_data.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/bin/bash -# 查看实时采集的数据 - -echo "════════════════════════════════════════════════════════════════" -echo " 📡 实时数据监控 " -echo "════════════════════════════════════════════════════════════════" -echo "" - -# 获取当前价格 -echo "📊 当前 BTC 价格:" -docker compose exec ingestion python3 -c " -import redis, orjson -r = redis.Redis(host='redis', port=6379, decode_responses=False) -messages = r.xrevrange('binance:raw:kline:5m', count=1) -if messages: - msg_id, fields = messages[0] - data = orjson.loads(fields[b'data']) - k = data['k'] - print(f\" \${float(k['c']):>12,.2f} (最新)\") - print(f\" \${float(k['h']):>12,.2f} (5分钟最高)\") - print(f\" \${float(k['l']):>12,.2f} (5分钟最低)\") - print(f\" 成交量: {float(k['v']):.2f} BTC\") -else: - print(' 数据加载中...') -" 2>/dev/null - -echo "" -echo "─────────────────────────────────────────────────────────────────" -echo "" - -# 数据流状态 -echo "📈 数据流状态:" -for stream in "binance:raw:kline:5m" "binance:raw:kline:15m" "binance:raw:kline:1h" "binance:raw:trade" "binance:raw:depth:20"; do - count=$(docker exec tradus-redis redis-cli XLEN $stream) - name=$(echo $stream | cut -d: -f3-) - printf " %-15s: %'6d 条消息\n" "$name" "$count" -done - -echo "" -echo "─────────────────────────────────────────────────────────────────" -echo "" - -# 服务状态 -echo "🚀 服务状态:" -docker compose ps | grep -E "(tradus-redis|tradus-ingestion)" | awk '{print " "$1": "$6}' - -echo "" -echo "════════════════════════════════════════════════════════════════"