""" Quantitative Signal Generator - Rule-based trading signals """ import logging from typing import Dict, Any, List, Optional from datetime import datetime from config.settings import settings logger = logging.getLogger(__name__) class QuantitativeSignalGenerator: """Generate trading signals based on quantitative analysis""" @staticmethod def generate_signal(analysis: Dict[str, Any]) -> Dict[str, Any]: """ Generate trading signal from market analysis Args: analysis: Complete market analysis from MarketAnalysisEngine Returns: Signal dict with direction, strength, entry/exit levels """ if 'error' in analysis: return QuantitativeSignalGenerator._no_signal("Insufficient data") # Extract components trend = analysis.get('trend_analysis', {}) momentum = analysis.get('momentum', {}) sr_levels = analysis.get('support_resistance', {}) breakout = analysis.get('breakout', {}) orderflow = analysis.get('orderflow', {}) current_price = analysis.get('current_price', 0) # Calculate individual signal scores trend_score = QuantitativeSignalGenerator._calculate_trend_score(trend) momentum_score = QuantitativeSignalGenerator._calculate_momentum_score(momentum) orderflow_score = QuantitativeSignalGenerator._calculate_orderflow_score(orderflow) breakout_score = QuantitativeSignalGenerator._calculate_breakout_score(breakout) # Composite signal score (-100 to +100) # Positive = bullish, Negative = bearish composite_score = ( trend_score * 0.35 + momentum_score * 0.25 + orderflow_score * 0.25 + breakout_score * 0.15 ) # Determine signal direction and strength signal_type = QuantitativeSignalGenerator._determine_signal_type(composite_score) signal_strength = abs(composite_score) / 100 # 0-1 scale # Calculate entry/exit levels entry_level = QuantitativeSignalGenerator._calculate_entry_level( current_price, signal_type, sr_levels ) stop_loss = QuantitativeSignalGenerator._calculate_stop_loss( current_price, signal_type, analysis.get('indicators', {}), sr_levels ) take_profit_levels = QuantitativeSignalGenerator._calculate_take_profit( current_price, signal_type, sr_levels, stop_loss ) # Calculate consensus score (0-1) - 多个指标的一致性程度 consensus_score = QuantitativeSignalGenerator._calculate_consensus_score( trend_score, momentum_score, orderflow_score, breakout_score ) # Build signal # Calculate profit percentage if signal_type == 'BUY' and entry_level > 0: profit_pct = (take_profit_levels[0] - entry_level) / entry_level * 100 elif signal_type == 'SELL' and entry_level > 0: profit_pct = (entry_level - take_profit_levels[0]) / entry_level * 100 else: profit_pct = 0 # Apply minimum profit filter - 盈利空间不足时建议观望 MIN_PROFIT_PCT = settings.MIN_PROFIT_PCT original_signal_type = signal_type filtered_reason = "" if signal_type != 'HOLD' and profit_pct < MIN_PROFIT_PCT: logger.info(f"量化信号被过滤: {signal_type} 盈利空间 {profit_pct:.2f}% < {MIN_PROFIT_PCT}%,改为 HOLD") filtered_reason = f" (原{original_signal_type}信号盈利空间不足{MIN_PROFIT_PCT}%,仅{profit_pct:.2f}%)" signal_type = 'HOLD' signal_strength = 0 signal = { 'timestamp': datetime.now().isoformat(), 'signal_type': signal_type, # 'BUY', 'SELL', 'HOLD' 'signal_strength': round(signal_strength, 2), 'composite_score': round(composite_score, 1), 'confidence': QuantitativeSignalGenerator._calculate_confidence( trend, momentum, orderflow ), 'consensus_score': round(consensus_score, 2), # 共识得分 (关键指标) 'profit_pct': round(profit_pct, 2), # 预期盈利空间百分比 'scores': { 'trend': round(trend_score, 1), 'momentum': round(momentum_score, 1), 'orderflow': round(orderflow_score, 1), 'breakout': round(breakout_score, 1), }, 'levels': { 'current_price': current_price, 'entry': entry_level, 'stop_loss': stop_loss, 'take_profit_1': take_profit_levels[0], 'take_profit_2': take_profit_levels[1], 'take_profit_3': take_profit_levels[2], }, 'risk_reward_ratio': QuantitativeSignalGenerator._calculate_rr_ratio( entry_level, stop_loss, take_profit_levels[0] ), 'reasoning': QuantitativeSignalGenerator._generate_reasoning( original_signal_type, trend, momentum, orderflow, breakout ) + filtered_reason, } logger.info( f"Generated signal: {signal_type} (strength: {signal_strength:.2f}, " f"composite: {composite_score:.1f})" ) return signal @staticmethod def _calculate_trend_score(trend: Dict[str, Any]) -> float: """Calculate trend score (-100 to +100)""" direction = trend.get('direction', '震荡') strength = trend.get('strength', 'weak') adx = trend.get('adx', 0) ema_alignment = trend.get('ema_alignment', 'neutral') score = 0 # Direction if direction == '上涨': score += 50 elif direction == '下跌': score -= 50 # Strength multiplier if strength == 'strong': score *= 1.5 elif strength == 'moderate': score *= 1.2 elif strength == 'weak': score *= 0.7 # ADX confirmation if adx > 25: score *= 1.2 elif adx < 15: score *= 0.6 # EMA alignment if ema_alignment == 'bullish' and score > 0: score *= 1.1 elif ema_alignment == 'bearish' and score < 0: score *= 1.1 return max(-100, min(100, score)) @staticmethod def _calculate_momentum_score(momentum: Dict[str, Any]) -> float: """Calculate momentum score (-100 to +100)""" rsi = momentum.get('rsi', 50) macd_signal = momentum.get('macd_signal', '') rsi_trend = momentum.get('rsi_trend', '中性') score = 0 # RSI score if rsi > 70: score -= 30 # Overbought - bearish elif rsi > 60: score += 20 # Strong but not overbought elif rsi > 50: score += 10 elif rsi > 40: score -= 10 elif rsi > 30: score -= 20 else: score += 30 # Oversold - bullish # MACD signal if '金叉' in macd_signal: if '扩大' in macd_signal: score += 40 else: score += 20 elif '死叉' in macd_signal: if '扩大' in macd_signal: score -= 40 else: score -= 20 # RSI trend if rsi_trend == '上升中': score += 15 elif rsi_trend == '下降中': score -= 15 return max(-100, min(100, score)) @staticmethod def _calculate_orderflow_score(orderflow: Optional[Dict[str, Any]]) -> float: """Calculate order flow score (-100 to +100)""" if not orderflow: return 0 imbalance = orderflow.get('imbalance', {}) strength = orderflow.get('strength', {}) large_orders = orderflow.get('large_orders', {}) score = 0 # Imbalance imbalance_pct = imbalance.get('imbalance_pct', 0) score += imbalance_pct # -100 to +100 # Strength confirmation strength_val = strength.get('strength_score', 0) score = (score + strength_val) / 2 # Average # Large order bias large_bid_count = large_orders.get('large_bids_count', 0) large_ask_count = large_orders.get('large_asks_count', 0) if large_bid_count > large_ask_count * 1.5: score += 15 elif large_ask_count > large_bid_count * 1.5: score -= 15 return max(-100, min(100, score)) @staticmethod def _calculate_breakout_score(breakout: Dict[str, Any]) -> float: """Calculate breakout score (-100 to +100)""" if not breakout.get('has_breakout'): return 0 breakout_type = breakout.get('type', '') if breakout_type == 'resistance_breakout': return 80 # Strong bullish elif breakout_type == 'support_breakdown': return -80 # Strong bearish # Approaching key level (not yet broken) if 'approaching' in breakout: approaching = breakout.get('approaching') if approaching == 'resistance': return 20 # Cautiously bullish elif approaching == 'support': return -20 # Cautiously bearish return 0 @staticmethod def _determine_signal_type(composite_score: float) -> str: """Determine signal type from composite score""" if composite_score > 40: return 'BUY' elif composite_score < -40: return 'SELL' else: return 'HOLD' @staticmethod def _calculate_confidence( trend: Dict[str, Any], momentum: Dict[str, Any], orderflow: Optional[Dict[str, Any]] ) -> float: """ Calculate signal confidence (0-1) High confidence when multiple indicators align """ alignments = 0 total_checks = 0 # Trend-momentum alignment trend_dir = trend.get('direction', '震荡') rsi = momentum.get('rsi', 50) total_checks += 1 if (trend_dir == '上涨' and 50 < rsi < 70) or \ (trend_dir == '下跌' and 30 < rsi < 50): alignments += 1 # MACD-trend alignment macd_signal = momentum.get('macd_signal', '') total_checks += 1 if (trend_dir == '上涨' and '金叉' in macd_signal) or \ (trend_dir == '下跌' and '死叉' in macd_signal): alignments += 1 # Order flow alignment (if available) if orderflow: imbalance = orderflow.get('imbalance', {}) pressure = imbalance.get('pressure', 'neutral') total_checks += 1 if (trend_dir == '上涨' and 'buy' in pressure) or \ (trend_dir == '下跌' and 'sell' in pressure): alignments += 1 # ADX strength confirmation adx = trend.get('adx', 0) total_checks += 1 if adx > 20: alignments += 1 confidence = alignments / total_checks if total_checks > 0 else 0.5 return round(confidence, 2) @staticmethod def _calculate_entry_level( current_price: float, signal_type: str, sr_levels: Dict[str, Any] ) -> float: """Calculate optimal entry level""" if signal_type == 'BUY': # Try to enter at support or current price support = sr_levels.get('nearest_support') if support and current_price - support < current_price * 0.005: # Within 0.5% return support return current_price elif signal_type == 'SELL': # Try to enter at resistance or current price resistance = sr_levels.get('nearest_resistance') if resistance and resistance - current_price < current_price * 0.005: return resistance return current_price else: # HOLD return current_price @staticmethod def _calculate_stop_loss( current_price: float, signal_type: str, indicators: Dict[str, Any], sr_levels: Dict[str, Any] ) -> float: """Calculate stop loss level using ATR and S/R""" atr = indicators.get('atr', current_price * 0.01) # Default 1% ATR if signal_type == 'BUY': # Stop loss below support or 1.5 * ATR support = sr_levels.get('nearest_support') atr_stop = current_price - (atr * 1.5) if support and support < current_price: # Use the lower of support-buffer or ATR stop support_stop = support * 0.998 # 0.2% below support return min(support_stop, atr_stop) return atr_stop elif signal_type == 'SELL': # Stop loss above resistance or 1.5 * ATR resistance = sr_levels.get('nearest_resistance') atr_stop = current_price + (atr * 1.5) if resistance and resistance > current_price: # Use the higher of resistance+buffer or ATR stop resistance_stop = resistance * 1.002 # 0.2% above resistance return max(resistance_stop, atr_stop) return atr_stop else: # HOLD return current_price @staticmethod def _calculate_take_profit( current_price: float, signal_type: str, sr_levels: Dict[str, Any], stop_loss: float ) -> List[float]: """ Calculate 3 take profit levels Returns: [TP1, TP2, TP3] """ risk = abs(current_price - stop_loss) if signal_type == 'BUY': # Use resistance levels or risk-reward ratios resistance = sr_levels.get('nearest_resistance') tp1 = resistance if resistance and resistance > current_price else current_price + (risk * 1.5) tp2 = current_price + (risk * 2.5) tp3 = current_price + (risk * 4.0) elif signal_type == 'SELL': # Use support levels or risk-reward ratios support = sr_levels.get('nearest_support') tp1 = support if support and support < current_price else current_price - (risk * 1.5) tp2 = current_price - (risk * 2.5) tp3 = current_price - (risk * 4.0) else: # HOLD return [current_price, current_price, current_price] return [round(tp, 2) for tp in [tp1, tp2, tp3]] @staticmethod def _calculate_rr_ratio(entry: float, stop_loss: float, take_profit: float) -> float: """Calculate risk-reward ratio""" risk = abs(entry - stop_loss) reward = abs(take_profit - entry) if risk == 0: return 0 return round(reward / risk, 2) @staticmethod def _generate_reasoning( signal_type: str, trend: Dict[str, Any], momentum: Dict[str, Any], orderflow: Optional[Dict[str, Any]], breakout: Dict[str, Any] ) -> str: """Generate human-readable reasoning for the signal""" reasons = [] # Trend trend_dir = trend.get('direction', '震荡') trend_strength = trend.get('strength', 'weak') reasons.append(f"趋势{trend_dir} ({trend_strength})") # Momentum rsi = momentum.get('rsi', 50) macd_signal = momentum.get('macd_signal', '') reasons.append(f"RSI={rsi:.0f}") if macd_signal: reasons.append(f"MACD {macd_signal}") # Order flow if orderflow: imbalance = orderflow.get('imbalance', {}) status = imbalance.get('status', '') if status: reasons.append(f"订单流: {status}") # Breakout if breakout.get('has_breakout'): breakout_type = breakout.get('type', '') if breakout_type == 'resistance_breakout': reasons.append("突破压力位") elif breakout_type == 'support_breakdown': reasons.append("跌破支撑位") return "; ".join(reasons) @staticmethod def _calculate_consensus_score( trend_score: float, momentum_score: float, orderflow_score: float, breakout_score: float ) -> float: """ 计算多个指标的共识得分 (0-1) 共识得分反映各个指标方向的一致性: - 1.0 = 所有指标完全一致 (都强烈看多或看空) - 0.5 = 指标方向混杂 - 0.0 = 指标完全矛盾 这是LLM Gate的关键指标!只有共识≥0.75时才考虑调用LLM """ # 将各个分数归一化到方向: +1 (看多), 0 (中性), -1 (看空) def normalize_direction(score: float, threshold: float = 10.0) -> float: """将分数转换为方向值""" if score > threshold: return min(score / 50, 1.0) # 最大1.0 elif score < -threshold: return max(score / 50, -1.0) # 最小-1.0 else: return 0.0 # 中性 # 获取各个指标的方向 trend_dir = normalize_direction(trend_score) momentum_dir = normalize_direction(momentum_score) orderflow_dir = normalize_direction(orderflow_score) breakout_dir = normalize_direction(breakout_score, threshold=5.0) # 突破阈值较低 # 计算方向的一致性 # 方法: 计算各方向与主导方向的相关性 # 计算加权平均方向 (趋势和订单流权重较高) weighted_avg_dir = ( trend_dir * 0.40 + momentum_dir * 0.25 + orderflow_dir * 0.25 + breakout_dir * 0.10 ) # 如果加权平均接近0,说明没有明确方向,共识度低 if abs(weighted_avg_dir) < 0.2: return 0.0 # 计算各指标与主导方向的一致性 alignments = [] # 趋势一致性 if abs(trend_dir) > 0.3: # 趋势有方向 alignment = 1.0 if (trend_dir * weighted_avg_dir) > 0 else 0.0 alignments.append(alignment * 0.40) # 趋势权重40% # 动量一致性 if abs(momentum_dir) > 0.3: alignment = 1.0 if (momentum_dir * weighted_avg_dir) > 0 else 0.0 alignments.append(alignment * 0.30) # 动量权重30% # 订单流一致性 if abs(orderflow_dir) > 0.3: alignment = 1.0 if (orderflow_dir * weighted_avg_dir) > 0 else 0.0 alignments.append(alignment * 0.25) # 订单流权重25% # 突破一致性 if abs(breakout_dir) > 0.3: alignment = 1.0 if (breakout_dir * weighted_avg_dir) > 0 else 0.0 alignments.append(alignment * 0.05) # 突破权重5% # 如果没有任何指标有明确方向 if not alignments: return 0.0 # 计算总共识度 consensus = sum(alignments) # 额外加分: 如果主导方向很强 (>0.6) if abs(weighted_avg_dir) > 0.6: consensus = min(consensus * 1.2, 1.0) return round(max(0.0, min(1.0, consensus)), 2) @staticmethod def _no_signal(reason: str) -> Dict[str, Any]: """Return a HOLD signal with reason""" return { 'timestamp': datetime.now().isoformat(), 'signal_type': 'HOLD', 'signal_strength': 0, 'composite_score': 0, 'confidence': 0, 'consensus_score': 0, # 添加共识得分 'reasoning': reason, }