""" Signal Aggregator - Combine quantitative and LLM signals """ import logging from typing import Dict, Any, List, Optional from datetime import datetime logger = logging.getLogger(__name__) class SignalAggregator: """Aggregate and compare signals from multiple sources""" @staticmethod def aggregate_signals( quant_signal: Dict[str, Any], llm_signal: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Aggregate quantitative and LLM signals Args: quant_signal: Signal from QuantitativeSignalGenerator llm_signal: Optional signal from LLMDecisionMaker Returns: Aggregated signal with consensus analysis """ # If no LLM signal, return quant signal only if not llm_signal or not llm_signal.get('enabled', True): return { 'timestamp': datetime.now().isoformat(), 'final_signal': quant_signal['signal_type'], 'final_confidence': quant_signal.get('confidence', 0.5), 'quantitative_signal': quant_signal, 'llm_signal': None, 'consensus': 'QUANT_ONLY', 'agreement_score': 1.0, 'levels': quant_signal.get('levels', {}), 'risk_reward_ratio': quant_signal.get('risk_reward_ratio', 0), 'recommendation': SignalAggregator._generate_recommendation( quant_signal, None ), 'warnings': SignalAggregator._check_warnings(quant_signal, None, 1.0), } # Extract signals quant_type = quant_signal.get('signal_type', 'HOLD') llm_type = llm_signal.get('signal_type', 'HOLD') quant_confidence = quant_signal.get('confidence', 0.5) llm_confidence = llm_signal.get('confidence', 0.5) # Calculate agreement agreement = SignalAggregator._calculate_agreement( quant_type, llm_type, quant_confidence, llm_confidence ) # Determine final signal final_signal, final_confidence, consensus_type = SignalAggregator._determine_final_signal( quant_type, llm_type, quant_confidence, llm_confidence, agreement ) # Combine levels final_levels = SignalAggregator._combine_levels(quant_signal, llm_signal) # Build aggregated signal aggregated = { 'timestamp': datetime.now().isoformat(), 'final_signal': final_signal, 'final_confidence': round(final_confidence, 2), 'consensus': consensus_type, 'agreement_score': round(agreement, 2), 'quantitative_signal': { 'signal_type': quant_type, 'signal': quant_type, # Keep both for compatibility 'confidence': quant_confidence, 'composite_score': quant_signal.get('composite_score', 0), 'scores': quant_signal.get('scores', {}), }, 'llm_signal': { 'signal_type': llm_type, 'signal': llm_type, # Keep both for compatibility 'confidence': llm_confidence, 'reasoning': llm_signal.get('reasoning', ''), 'key_factors': llm_signal.get('key_factors', []), # Multi-timeframe analysis fields 'opportunities': llm_signal.get('opportunities', {}), 'recommendations_by_timeframe': llm_signal.get('recommendations_by_timeframe', {}), 'trade_type': llm_signal.get('trade_type', ''), 'risk_level': llm_signal.get('risk_level', 'MEDIUM'), }, 'levels': final_levels, 'risk_reward_ratio': SignalAggregator._calculate_rr_ratio(final_levels), 'recommendation': SignalAggregator._generate_recommendation( quant_signal, llm_signal ), 'warnings': SignalAggregator._check_warnings( quant_signal, llm_signal, agreement ), } logger.info( f"Aggregated signal: {final_signal} (confidence: {final_confidence:.2f}, " f"consensus: {consensus_type}, agreement: {agreement:.2f})" ) return aggregated @staticmethod def _calculate_agreement( quant_signal: str, llm_signal: str, quant_confidence: float, llm_confidence: float ) -> float: """ Calculate agreement score between signals (0-1) 1.0 = Perfect agreement 0.5 = Neutral (one HOLD) 0.0 = Complete disagreement """ # Signal direction agreement if quant_signal == llm_signal: direction_agreement = 1.0 elif quant_signal == 'HOLD' or llm_signal == 'HOLD': direction_agreement = 0.5 # Neutral else: direction_agreement = 0.0 # Opposite signals # Confidence alignment (higher when both are confident) avg_confidence = (quant_confidence + llm_confidence) / 2 # Combined agreement agreement = direction_agreement * avg_confidence return agreement @staticmethod def _determine_final_signal( quant_signal: str, llm_signal: str, quant_confidence: float, llm_confidence: float, agreement: float ) -> tuple: """ Determine final signal from two sources Returns: (final_signal, final_confidence, consensus_type) """ # Perfect agreement if quant_signal == llm_signal: if quant_signal != 'HOLD': return ( quant_signal, (quant_confidence + llm_confidence) / 2, 'STRONG_CONSENSUS' ) else: return ( 'HOLD', (quant_confidence + llm_confidence) / 2, 'CONSENSUS_HOLD' ) # One is HOLD if quant_signal == 'HOLD': return ( llm_signal, llm_confidence * 0.7, # Reduce confidence 'LLM_LEADING' ) elif llm_signal == 'HOLD': return ( quant_signal, quant_confidence * 0.7, 'QUANT_LEADING' ) # Conflicting signals (BUY vs SELL) else: # Use higher confidence signal if quant_confidence > llm_confidence: return ( quant_signal, quant_confidence * 0.5, # Significantly reduce confidence 'CONFLICT_QUANT_WINS' ) elif llm_confidence > quant_confidence: return ( llm_signal, llm_confidence * 0.5, 'CONFLICT_LLM_WINS' ) else: # Equal confidence - default to HOLD return ( 'HOLD', 0.3, 'CONFLICT_HOLD' ) @staticmethod def _combine_levels( quant_signal: Dict[str, Any], llm_signal: Dict[str, Any] ) -> Dict[str, Any]: """Combine price levels from both signals""" quant_levels = quant_signal.get('levels', {}) llm_levels = llm_signal.get('levels', {}) # Average the levels if both exist combined = {} for key in ['current_price', 'entry', 'stop_loss', 'take_profit_1', 'take_profit_2', 'take_profit_3']: quant_val = quant_levels.get(key, 0) llm_val = llm_levels.get(key, 0) if quant_val > 0 and llm_val > 0: # Average both combined[key] = round((quant_val + llm_val) / 2, 2) elif quant_val > 0: combined[key] = quant_val elif llm_val > 0: combined[key] = llm_val else: combined[key] = 0 # Add range if values differ significantly for key in ['entry', 'stop_loss', 'take_profit_1']: quant_val = quant_levels.get(key, 0) llm_val = llm_levels.get(key, 0) if quant_val > 0 and llm_val > 0: diff_pct = abs(quant_val - llm_val) / ((quant_val + llm_val) / 2) * 100 if diff_pct > 1: # More than 1% difference combined[f'{key}_range'] = { 'quant': quant_val, 'llm': llm_val, 'diff_pct': round(diff_pct, 2), } return combined @staticmethod def _calculate_rr_ratio(levels: Dict[str, Any]) -> float: """Calculate risk-reward ratio from levels""" entry = levels.get('entry', 0) stop_loss = levels.get('stop_loss', 0) take_profit = levels.get('take_profit_1', 0) if entry == 0 or stop_loss == 0 or take_profit == 0: return 0 risk = abs(entry - stop_loss) reward = abs(take_profit - entry) if risk == 0: return 0 return round(reward / risk, 2) @staticmethod def _generate_recommendation( quant_signal: Dict[str, Any], llm_signal: Optional[Dict[str, Any]] ) -> str: """Generate human-readable recommendation""" quant_type = quant_signal.get('signal_type', 'HOLD') if not llm_signal: quant_conf = quant_signal.get('confidence', 0) quant_reasoning = quant_signal.get('reasoning', '') if quant_type == 'BUY' and quant_conf > 0.7: return f"强烈建议做多: {quant_reasoning}" elif quant_type == 'BUY': return f"考虑做多: {quant_reasoning}" elif quant_type == 'SELL' and quant_conf > 0.7: return f"强烈建议做空: {quant_reasoning}" elif quant_type == 'SELL': return f"考虑做空: {quant_reasoning}" else: return f"观望: {quant_reasoning}" llm_type = llm_signal.get('signal_type', 'HOLD') if quant_type == llm_type and quant_type != 'HOLD': return f"量化和AI分析一致建议{quant_type}: 高置信度交易机会" elif quant_type == llm_type and quant_type == 'HOLD': return "量化和AI分析均建议观望,等待更好的机会" elif quant_type == 'HOLD': return f"AI建议{llm_type},但量化信号不强,建议谨慎" elif llm_type == 'HOLD': return f"量化建议{quant_type},但AI建议观望,建议谨慎" else: return f"⚠️ 信号冲突: 量化建议{quant_type},AI建议{llm_type},强烈建议观望" @staticmethod def _check_warnings( quant_signal: Dict[str, Any], llm_signal: Optional[Dict[str, Any]], agreement: float ) -> List[str]: """Check for warnings and risk factors""" warnings = [] # Low agreement warning if llm_signal and agreement < 0.3: warnings.append("⚠️ 量化和AI信号严重分歧,建议观望") # Low confidence warning quant_conf = quant_signal.get('confidence', 0) if quant_conf < 0.4: warnings.append("⚠️ 量化信号置信度较低") if llm_signal: llm_conf = llm_signal.get('confidence', 0) if llm_conf < 0.4: warnings.append("⚠️ AI信号置信度较低") # Risk-reward warning quant_levels = quant_signal.get('levels', {}) rr = quant_signal.get('risk_reward_ratio', 0) if rr > 0 and rr < 1.5: warnings.append(f"⚠️ 风险回报比偏低 ({rr}:1), 建议至少1.5:1") # Large level discrepancy if llm_signal: quant_levels = quant_signal.get('levels', {}) llm_levels = llm_signal.get('levels', {}) for key in ['entry', 'stop_loss']: quant_val = quant_levels.get(key, 0) llm_val = llm_levels.get(key, 0) if quant_val > 0 and llm_val > 0: diff_pct = abs(quant_val - llm_val) / ((quant_val + llm_val) / 2) * 100 if diff_pct > 2: warnings.append( f"⚠️ {key}建议差异较大: 量化${quant_val:.2f} vs AI${llm_val:.2f} ({diff_pct:.1f}%)" ) return warnings