""" 市场信号分析器 - 纯市场分析,不包含任何仓位信息 职责: 1. 分析K线、量价、技术指标 2. 分析新闻舆情 3. 输出纯市场信号(buy/sell/hold + confidence + reasoning) 不负责: - 仓位管理 - 风险控制 - 具体下单决策 """ import json import re import asyncio import numpy as np import pandas as pd from typing import Dict, Any, Optional, List from datetime import datetime from app.utils.logger import logger from app.services.llm_service import llm_service from app.services.news_service import get_news_service from app.services.bitget_service import bitget_service class MarketSignalAnalyzer: """市场信号分析器 - 只关注市场,输出客观信号""" INTRADAY_ANALYSIS_TEMPERATURE = 0.15 TREND_ANALYSIS_TEMPERATURE = 0.10 ANALYSIS_MAX_TOKENS = 1200 LANE_MIN_CONFIDENCE = { 'short_term': 60, 'medium_term': 65, } INTRADAY_ANALYSIS_PROMPT = """你是一位专业的加密货币日内交易员,只负责生成 short_term 信号。 你的任务是基于 5m / 15m / 30m、当日开盘、VWAP、开盘区间、关键位和衍生品拥挤度,判断未来 30 分钟到 4 小时内是否存在可执行 setup。 执行原则: 1. 先判断日内 regime:trending / ranging / neutral。 2. 趋势日内只做顺势回调或突破后的回踩确认,不追涨杀跌。 3. 震荡日内只做区间边界附近的反转,不在区间中部开仓。 4. 技术指标只做辅助,优先看结构、关键位、波动率、量能、VWAP 偏离和距离。 5. 没有清晰止损、止盈和盈亏比就不交易。 6. 本次分析独立进行,不参考任何上一轮信号。 信号要求: 1. 只允许输出 0 或 1 个 short_term 信号。 2. 盈亏比至少 1:1.5。 3. 如果价格处于加速延伸,优先返回空信号。 4. 如果价格位于区间中部、离关键位太远、止损过宽或方向证据冲突,必须返回空信号。 5. 只有在 setup 足够清晰时才允许输出信号;宁可空仓,不要勉强给单。 6. entry_type: - 价格已回到关键位并出现确认,可用 market - 仍需等待回踩/反抽,使用 limit 7. grade / confidence 约束: - A: 80-100,结构、位置、量价、时机都对齐 - B: 70-79,条件较完整但仍有一项次优 - C: 60-69,只有轻仓试错级别 - 60 以下不要输出交易信号 输出 JSON,禁止输出解释性正文: ```json { "market_state": "ranging/trending/neutral", "trend_direction": "uptrend/downtrend/neutral", "trend_strength": "strong/medium/weak", "analysis_summary": "20字以内,总结日内状态", "key_levels": { "support": [数字, 数字], "resistance": [数字, 数字] }, "signals": [ { "type": "short_term", "action": "buy/sell", "entry_type": "market/limit", "confidence": 0, "grade": "A/B/C", "entry_price": 0, "stop_loss": 0, "take_profit": 0, "reasoning": "结构+关键位+量能+波动率" } ] } ``` 额外约束: 1. `analysis_summary` 控制在 20 个中文字符以内。 2. `reasoning` 只写一条简洁证据链,不要写仓位建议。 3. `entry_price` / `stop_loss` / `take_profit` 必须是纯数字。 4. 做多必须满足 `stop_loss < entry_price < take_profit`;做空必须满足 `take_profit < entry_price < stop_loss`。 5. 没有 setup 时必须返回 `signals: []`。 """ TREND_ANALYSIS_PROMPT = """你是一位专业的加密货币趋势交易员,只负责生成 medium_term 信号。 你的任务是基于 1h / 4h、关键位、趋势阶段、反转检测、衍生品拥挤度和新闻催化,判断未来 4 小时到 3 天内是否存在趋势 setup。 执行原则: 1. 4h 决定大方向,1h 决定节奏与入场位置。 2. 只做两类交易: - 趋势延续:4h 趋势明确,1h 回踩关键位后确认继续 - 趋势反转:4h 结构和 1h 动能同时改善,且反转证据充分 3. 禁止仅凭 15m 噪音逆 4h 开仓。 4. 趋势晚期、资金费率过热或价格过度偏离关键均线时,要显著降低开仓积极性。 5. 没有清晰位置优势就不交易。 6. 本次分析独立进行,不参考任何上一轮信号。 信号要求: 1. 只允许输出 0 或 1 个 medium_term 信号。 2. 盈亏比至少 1:1.8。 3. 如果 4h 与 1h 明显冲突,优先返回空信号。 4. 反转信号必须比延续信号更严格。 5. 如果趋势处于晚期且没有回踩确认,或反转证据不足,必须返回空信号。 6. 只有在位置优势和方向一致性都充分时才允许开仓。 7. grade / confidence 约束: - A: 82-100,4h/1h 同向且位置优 - B: 72-81,趋势或反转证据较完整 - C: 65-71,仅限早期确认不足的轻仓趋势尝试 - 65 以下不要输出交易信号 输出 JSON,禁止输出解释性正文: ```json { "market_state": "ranging/trending/neutral", "trend_direction": "uptrend/downtrend/neutral", "trend_strength": "strong/medium/weak", "analysis_summary": "20字以内,总结趋势状态", "key_levels": { "support": [数字, 数字], "resistance": [数字, 数字] }, "signals": [ { "type": "medium_term", "action": "buy/sell", "entry_type": "market/limit", "confidence": 0, "grade": "A/B/C", "entry_price": 0, "stop_loss": 0, "take_profit": 0, "reasoning": "4h方向+1h节奏+关键位+量价" } ] } ``` 额外约束: 1. `analysis_summary` 控制在 20 个中文字符以内。 2. `reasoning` 只写一条简洁证据链,不要写仓位建议。 3. `entry_price` / `stop_loss` / `take_profit` 必须是纯数字。 4. 做多必须满足 `stop_loss < entry_price < take_profit`;做空必须满足 `take_profit < entry_price < stop_loss`。 5. 没有 setup 时必须返回 `signals: []`。 """ def __init__(self): self.news_service = get_news_service() self.exchange = bitget_service async def analyze(self, symbol: str, data: Dict[str, Any], symbols: List[str] = None) -> Dict[str, Any]: """ 分析市场并生成信号 Args: symbol: 交易对 data: 多周期K线数据 symbols: 所有监控的交易对(用于市场对比) Returns: 市场信号字典 """ try: # 1. 准备市场数据 market_context = self._prepare_market_context(symbol, data, symbols) # 2. 获取新闻舆情 news_context = await self._get_news_context(symbol) # 3. 获取合约市场数据(资金费率、持仓量等) futures_context = await self._get_futures_context(symbol) # 4. 将日内和趋势拆成两次独立分析,避免一个 prompt 同时混做两件事 intraday_prompt = self._build_analysis_prompt( symbol=symbol, lane="intraday", market_context=market_context, news_context=news_context, futures_context=futures_context ) trend_prompt = self._build_analysis_prompt( symbol=symbol, lane="trend", market_context=market_context, news_context=news_context, futures_context=futures_context ) intraday_messages = [ {"role": "system", "content": self.INTRADAY_ANALYSIS_PROMPT}, {"role": "user", "content": intraday_prompt} ] trend_messages = [ {"role": "system", "content": self.TREND_ANALYSIS_PROMPT}, {"role": "user", "content": trend_prompt} ] intraday_response, trend_response = await asyncio.gather( llm_service.achat( intraday_messages, temperature=self.INTRADAY_ANALYSIS_TEMPERATURE, max_tokens=self.ANALYSIS_MAX_TOKENS ), llm_service.achat( trend_messages, temperature=self.TREND_ANALYSIS_TEMPERATURE, max_tokens=self.ANALYSIS_MAX_TOKENS ) ) intraday_result = self._parse_llm_response(intraday_response or "", symbol) trend_result = self._parse_llm_response(trend_response or "", symbol) return self._merge_lane_results(symbol, intraday_result, trend_result) except Exception as e: logger.error(f"市场信号分析失败: {e}") import traceback logger.debug(traceback.format_exc()) return self._get_empty_signal(symbol) def _prepare_market_context(self, symbol: str, data: Dict, symbols: List[str] = None) -> Dict[str, str]: """准备市场上下文信息""" current_price = float(data['5m'].iloc[-1]['close']) price_change_24h = self._calculate_price_change_24h(data['1h']) day_open = self._get_session_open(data.get('1h')) session_vwap = self._calculate_session_vwap(data.get('5m')) opening_range = self._calculate_opening_range(data.get('5m')) feature_5m = self._summarize_timeframe_features(data.get('5m'), '5m') feature_15m = self._summarize_timeframe_features(data.get('15m'), '15m') feature_30m = self._summarize_timeframe_features(data.get('30m'), '30m') feature_1h = self._summarize_timeframe_features(data.get('1h'), '1h') feature_4h = self._summarize_timeframe_features(data.get('4h'), '4h') intraday_alignment = self._describe_alignment([feature_5m, feature_15m, feature_30m]) trend_alignment = self._describe_alignment([feature_1h, feature_4h]) range_zone = self._detect_range_zone(data) reversal_detection = self._detect_trend_reversal(data) trend_stage = self._detect_trend_stage(data) key_levels = self._derive_key_levels(data, range_zone) snapshot_parts = [ f"## 市场快照", f"- 交易对: {symbol}", f"- 当前价格: {current_price:.2f}", f"- 24h涨跌: {price_change_24h}", f"- 当日开盘: {day_open:.2f}" if day_open is not None else "- 当日开盘: N/A", f"- 会话VWAP: {session_vwap:.2f}" if session_vwap is not None else "- 会话VWAP: N/A", ] if day_open: snapshot_parts.append(f"- 相对日开盘偏离: {((current_price - day_open) / day_open) * 100:+.2f}%") if session_vwap: snapshot_parts.append(f"- 相对VWAP偏离: {((current_price - session_vwap) / session_vwap) * 100:+.2f}%") if opening_range: snapshot_parts.append( f"- 开盘区间(前30分钟): 高 {opening_range['high']:.2f} / 低 {opening_range['low']:.2f}" ) intraday_parts = [ "## 日内特征", self._format_feature_line(feature_5m), self._format_feature_line(feature_15m), self._format_feature_line(feature_30m), f"- 日内级别一致性: {intraday_alignment}", ] trend_parts = [ "## 趋势特征", self._format_feature_line(feature_1h), self._format_feature_line(feature_4h), f"- 趋势级别一致性: {trend_alignment}", ] if trend_stage.get('stage') != 'unknown': stage_map = {'early': '早期', 'middle': '中期', 'late': '晚期'} trend_parts.append( f"- 趋势阶段: {stage_map.get(trend_stage['stage'], trend_stage['stage'])} ({trend_stage['confidence']}%) | {trend_stage['analysis']}" ) levels_parts = [ "## 关键位", f"- 支撑位: {self._format_levels(key_levels.get('support'))}", f"- 阻力位: {self._format_levels(key_levels.get('resistance'))}", ] if range_zone.get('is_ranging'): intraday_parts.append( f"- 区间判断: 是 ({range_zone['confidence']}%) | 宽度 {range_zone.get('range_width_pct', 0):.2f}% | {range_zone.get('analysis', '')}" ) else: intraday_parts.append("- 区间判断: 否") if reversal_detection.get('is_reversing'): reversal_type = "bullish" if reversal_detection.get('reversal_type') == 'bullish_reversal' else "bearish" signal_desc = ", ".join(sig['desc'] for sig in reversal_detection.get('signals', [])[:3]) trend_parts.append( f"- 反转检测: {reversal_type} ({reversal_detection['confidence']}%) | {signal_desc}" ) else: trend_parts.append("- 反转检测: 无显著反转信号") return { 'snapshot': "\n".join(snapshot_parts), 'intraday': "\n".join(intraday_parts), 'trend': "\n".join(trend_parts), 'levels': "\n".join(levels_parts), } def _get_session_open(self, df: Optional[pd.DataFrame]) -> Optional[float]: """获取当前交易日开盘价""" if df is None or df.empty: return None try: if 'open_time' not in df.columns: return float(df.iloc[-24]['open']) if len(df) >= 24 else float(df.iloc[0]['open']) latest_time = pd.to_datetime(df['open_time'].iloc[-1]) session_start = latest_time.normalize() today_bars = df[df['open_time'] >= session_start] if not today_bars.empty: return float(today_bars.iloc[0]['open']) except Exception as e: logger.debug(f"获取交易日开盘价失败: {e}") return float(df.iloc[0]['open']) if not df.empty else None def _calculate_session_vwap(self, df: Optional[pd.DataFrame]) -> Optional[float]: """计算当前交易日 VWAP""" if df is None or df.empty or 'volume' not in df.columns: return None try: session_df = df if 'open_time' in df.columns: latest_time = pd.to_datetime(df['open_time'].iloc[-1]) session_start = latest_time.normalize() session_df = df[df['open_time'] >= session_start] if session_df.empty: return None typical_price = (session_df['high'] + session_df['low'] + session_df['close']) / 3 volume = session_df['volume'].replace(0, np.nan) total_volume = volume.sum() if pd.isna(total_volume) or total_volume <= 0: return None return float((typical_price * session_df['volume']).sum() / total_volume) except Exception as e: logger.debug(f"计算 VWAP 失败: {e}") return None def _calculate_opening_range(self, df: Optional[pd.DataFrame], bars: int = 6) -> Optional[Dict[str, float]]: """计算前 30 分钟开盘区间""" if df is None or df.empty or len(df) < bars: return None try: session_df = df if 'open_time' in df.columns: latest_time = pd.to_datetime(df['open_time'].iloc[-1]) session_start = latest_time.normalize() session_df = df[df['open_time'] >= session_start] session_df = session_df.iloc[:bars] if session_df.empty: return None return { 'high': float(session_df['high'].max()), 'low': float(session_df['low'].min()) } except Exception as e: logger.debug(f"计算开盘区间失败: {e}") return None def _summarize_timeframe_features(self, df: Optional[pd.DataFrame], timeframe: str) -> Dict[str, Any]: """将单个周期的 K 线转换为高价值特征摘要""" feature = { 'timeframe': timeframe, 'available': False, 'close': None, 'ema_alignment': 'neutral', 'structure': 'unknown', 'momentum_3': None, 'momentum_12': None, 'rsi': None, 'atr_pct': None, 'volume_ratio': None, 'distance_to_ema20': None, 'distance_to_recent_high': None, 'distance_to_recent_low': None, 'is_accelerating': False, } if df is None or df.empty or len(df) < 20: return feature latest = df.iloc[-1] close = float(latest['close']) ema5 = latest.get('ema5') ema10 = latest.get('ema10') ema20 = latest.get('ema20') rsi = latest.get('rsi') atr = latest.get('atr') feature.update({ 'available': True, 'close': close, 'rsi': float(rsi) if pd.notna(rsi) else None, 'atr_pct': float(atr / close * 100) if pd.notna(atr) and close > 0 else None, 'distance_to_ema20': self._distance_percent(close, ema20), 'structure': self._infer_price_structure(df), 'momentum_3': self._window_return(df, 3), 'momentum_12': self._window_return(df, 12), 'volume_ratio': self._calculate_volume_ratio(df), 'is_accelerating': self._is_accelerating(df), }) if pd.notna(ema5) and pd.notna(ema10) and pd.notna(ema20): if ema5 > ema10 > ema20: feature['ema_alignment'] = 'bull' elif ema5 < ema10 < ema20: feature['ema_alignment'] = 'bear' else: feature['ema_alignment'] = 'mixed' recent_window = df.iloc[-20:] recent_high = float(recent_window['high'].max()) recent_low = float(recent_window['low'].min()) feature['distance_to_recent_high'] = self._distance_percent(close, recent_high) feature['distance_to_recent_low'] = self._distance_percent(close, recent_low) return feature def _format_feature_line(self, feature: Dict[str, Any]) -> str: """格式化单周期特征摘要""" if not feature.get('available'): return f"- {feature.get('timeframe')}: 数据不足" def fmt(value: Optional[float], digits: int = 2) -> str: return "N/A" if value is None else f"{value:+.{digits}f}%" return ( f"- {feature['timeframe']}: 结构={feature['structure']} | EMA={feature['ema_alignment']} | " f"3bar={fmt(feature['momentum_3'])} | 12bar={fmt(feature['momentum_12'])} | " f"RSI={feature['rsi']:.1f} | ATR={feature['atr_pct']:.2f}% | " f"量比={feature['volume_ratio']:.2f} | " f"距EMA20={fmt(feature['distance_to_ema20'])} | " f"距20bar高点={fmt(feature['distance_to_recent_high'])} | " f"距20bar低点={fmt(feature['distance_to_recent_low'])} | " f"加速={'是' if feature['is_accelerating'] else '否'}" ) def _describe_alignment(self, features: List[Dict[str, Any]]) -> str: """描述多周期方向一致性""" directions = [] for feature in features: if not feature.get('available'): continue direction = feature.get('ema_alignment') if direction == 'mixed': direction = 'neutral' directions.append(direction) if not directions: return "数据不足" if all(direction == 'bull' for direction in directions): return "多头一致" if all(direction == 'bear' for direction in directions): return "空头一致" if all(direction == 'neutral' for direction in directions): return "全部中性" return "存在分歧" def _derive_key_levels(self, data: Dict[str, pd.DataFrame], range_zone: Dict[str, Any]) -> Dict[str, List[float]]: """提炼高价值支撑阻力位""" supports: List[float] = [] resistances: List[float] = [] if range_zone.get('support_level'): supports.append(float(range_zone['support_level'])) if range_zone.get('resistance_level'): resistances.append(float(range_zone['resistance_level'])) for timeframe, count in [('30m', 20), ('1h', 20), ('4h', 12)]: df = data.get(timeframe) if df is None or len(df) < count: continue window = df.iloc[-count:] supports.append(float(window['low'].min())) resistances.append(float(window['high'].max())) ema20 = df['ema20'].iloc[-1] if 'ema20' in df.columns else None if pd.notna(ema20): if float(ema20) < float(df['close'].iloc[-1]): supports.append(float(ema20)) else: resistances.append(float(ema20)) return { 'support': self._dedupe_levels(supports, reverse=True), 'resistance': self._dedupe_levels(resistances, reverse=False), } def _dedupe_levels(self, levels: List[float], reverse: bool) -> List[float]: """对价位去重,避免同类水平位过密""" cleaned = sorted([float(level) for level in levels if level], reverse=reverse) deduped: List[float] = [] for level in cleaned: if not deduped: deduped.append(level) continue if abs(level - deduped[-1]) / deduped[-1] > 0.003: deduped.append(level) if len(deduped) >= 3: break return deduped def _format_levels(self, levels: Optional[List[float]]) -> str: if not levels: return "N/A" return ", ".join(f"{level:.2f}" for level in levels[:3]) def _infer_price_structure(self, df: pd.DataFrame, lookback: int = 20) -> str: """根据分段高低点判断 HH/HL / LH/LL / 区间""" if df is None or len(df) < lookback: return "unknown" window = df.iloc[-lookback:] half = max(lookback // 2, 5) first = window.iloc[:half] second = window.iloc[-half:] prev_high = float(first['high'].max()) prev_low = float(first['low'].min()) recent_high = float(second['high'].max()) recent_low = float(second['low'].min()) if recent_high > prev_high and recent_low > prev_low: return "HH/HL" if recent_high < prev_high and recent_low < prev_low: return "LH/LL" return "range/mixed" def _window_return(self, df: pd.DataFrame, bars: int) -> Optional[float]: if df is None or len(df) <= bars: return None start_price = float(df['close'].iloc[-bars - 1]) end_price = float(df['close'].iloc[-1]) if start_price <= 0: return None return (end_price - start_price) / start_price * 100 def _calculate_volume_ratio(self, df: pd.DataFrame, window: int = 20) -> float: if df is None or len(df) <= window: return 1.0 latest_volume = float(df['volume'].iloc[-1]) baseline = float(df['volume'].iloc[-window:-1].mean()) if baseline <= 0: return 1.0 return latest_volume / baseline def _is_accelerating(self, df: pd.DataFrame, bars: int = 3, threshold: float = 0.3) -> bool: if df is None or len(df) < bars + 1: return False closes = df['close'].iloc[-(bars + 1):].values changes = [ (closes[i] - closes[i - 1]) / closes[i - 1] * 100 for i in range(1, len(closes)) if closes[i - 1] > 0 ] if len(changes) < bars: return False same_direction = all(change > 0 for change in changes) or all(change < 0 for change in changes) large_enough = sum(1 for change in changes if abs(change) >= threshold) >= bars - 1 return same_direction and large_enough def _distance_percent(self, value: Optional[float], reference: Optional[float]) -> Optional[float]: if value is None or reference is None or pd.isna(reference) or reference == 0: return None return (float(value) - float(reference)) / float(reference) * 100 async def _get_news_context(self, symbol: str) -> str: """获取新闻舆情上下文""" try: news_result = await self.news_service.get_crypto_news(symbol) if not news_result or not news_result.get('articles'): return "无最新新闻" articles = news_result['articles'][:5] # 只取前5条 context_parts = ["\n## 最新新闻"] for article in articles: title = article.get('title', '') source = article.get('source', '') published_at = article.get('publishedAt', '') time_str = published_at.split('T')[1][:5] if 'T' in published_at else '' context_parts.append(f"- [{time_str}] {title} ({source})") return "\n".join(context_parts) except Exception as e: logger.warning(f"获取新闻失败: {e}") return "新闻获取失败" async def _get_futures_context(self, symbol: str) -> str: """获取合约市场数据(资金费率、持仓量、溢价率)""" try: loop = asyncio.get_event_loop() market_data = await loop.run_in_executor( None, self.exchange.get_futures_market_data, symbol ) if not market_data: return "" return self._format_futures_context(symbol, market_data) except Exception as e: logger.warning(f"获取 {symbol} 合约数据失败: {e}") return "" def _format_futures_context(self, symbol: str, market_data: Dict[str, Any]) -> str: """格式化高价值合约特征,避免大段说明性文本""" funding = market_data.get('funding_rate', {}) oi = market_data.get('open_interest', {}) premium = market_data.get('premium_rate') lines = [ f"## 衍生品特征", f"- 交易对: {symbol}", ] if funding: lines.append( f"- 资金费率: {funding.get('funding_rate_percent', 0):+.4f}% | 情绪: {funding.get('sentiment', 'unknown')}" ) lines.append( f"- 标记价 vs 指数价: {market_data.get('mark_price', 0):.2f} vs {market_data.get('index_price', 0):.2f}" ) if oi: lines.append(f"- 持仓量: {oi.get('open_interest', 0):,.0f}") if premium is not None: lines.append(f"- 溢价率: {premium:+.2f}%") return "\n".join(lines) def _analyze_trend_position(self, data: Dict[str, pd.DataFrame]) -> str: """分析趋势位置和日内交易机会(使用 EMA)+ 市场状态判断(震荡/趋势)""" try: df_30m = data.get('30m') df_15m = data.get('15m') df_1h = data.get('1h') if df_30m is None or len(df_30m) < 50: return "" latest_30m = df_30m.iloc[-1] current_price = float(latest_30m['close']) # 获取日内级别 EMA(30m) ema5_30m = latest_30m.get('ma5') # 实际是 ema5 ema10_30m = latest_30m.get('ma10') # 实际是 ema10 ema20_30m = latest_30m.get('ma20') # 实际是 ema20 if not all([ema5_30m, ema10_30m, ema20_30m]): return "" # ========== 新增:市场状态判断(震荡 vs 趋势) ========== market_state = "unknown" market_state_reason = [] # 1h EMA 趋势判断 if df_1h is not None and len(df_1h) >= 20: latest_1h = df_1h.iloc[-1] ema5_1h = latest_1h.get('ma5') ema10_1h = latest_1h.get('ma10') ema20_1h = latest_1h.get('ma20') if ema5_1h and ema10_1h and ema20_1h: # 1h EMA 多头/空头排列 → 趋势市 if ema5_1h > ema10_1h > ema20_1h: market_state = "trending" market_state_reason.append("1h EMA 多头排列") elif ema5_1h < ema10_1h < ema20_1h: market_state = "trending" market_state_reason.append("1h EMA 空头排列") else: market_state = "ranging" market_state_reason.append("1h EMA 纠缠") # 波动率判断(ATR 变化) if df_30m is not None and len(df_30m) >= 24 and 'atr' in df_30m.columns: recent_atr = df_30m['atr'].iloc[-6:].mean() # 最近3小时 older_atr = df_30m['atr'].iloc[-12:-6].mean() # 之前3小时 if pd.notna(recent_atr) and pd.notna(older_atr) and older_atr > 0: atr_change = (recent_atr - older_atr) / older_atr * 100 if atr_change > 20: if market_state != "trending": market_state = "trending" market_state_reason.append(f"ATR 扩张 {atr_change:.0f}%") elif atr_change < -20: if market_state != "ranging": market_state = "ranging" market_state_reason.append(f"ATR 收缩 {abs(atr_change):.0f}%") # 价格动量判断(15m) if df_15m is not None and len(df_15m) >= 20: recent_high = df_15m['high'].iloc[-20:].max() recent_low = df_15m['low'].iloc[-20:].min() price_range = (recent_high - recent_low) / current_price * 100 if price_range < 2.5: # 15分钟内波动小于2.5% → 震荡 if market_state != "trending": market_state = "ranging" market_state_reason.append(f"15m 波动 {price_range:.1f}% 较小") elif price_range > 4: # 15分钟内波动大于4% → 趋势 if market_state != "ranging": market_state = "trending" market_state_reason.append(f"15m 波动 {price_range:.1f}% 较大") # 判断日内趋势(30m EMA 为主) if ema5_30m > ema10_30m > ema20_30m: intraday_trend = "上升" intraday_emoji = "📈" elif ema5_30m < ema10_30m < ema20_30m: intraday_trend = "下跌" intraday_emoji = "📉" else: intraday_trend = "震荡" intraday_emoji = "➖" # 构建市场状态分析 analysis_parts = [] # 市场状态显示(新增) if market_state == "trending": state_emoji = "📊" state_text = f"{state_emoji} **市场状态: 趋势市**" analysis_parts.append(state_text) analysis_parts.append(f" 判断依据: {', '.join(market_state_reason)}") analysis_parts.append(f" 策略: 跟随趋势,等待回调/反弹到 EMA20 顺势入场") analysis_parts.append(f" 目标: 3-5%,盈亏比 ≥ 1:1.5") analysis_parts.append(f" 严禁: 逆势做超短线") elif market_state == "ranging": state_emoji = "🔄" state_text = f"{state_emoji} **市场状态: 震荡市**" analysis_parts.append(state_text) analysis_parts.append(f" 判断依据: {', '.join(market_state_reason)}") analysis_parts.append(f" 策略: 5分钟级别高抛低吸,支撑位多、压力位空") analysis_parts.append(f" 目标: 1-2%,盈亏比 ≥ 1:1.5") analysis_parts.append(f" 严禁: 追涨杀跌") else: analysis_parts.append(f"⚠️ 市场状态: 不明确,观望为主") analysis_parts.append(f"") analysis_parts.append(f"日内趋势(30m EMA): {intraday_emoji} {intraday_trend}") analysis = analysis_parts # 检查15分钟级别入场时机 if df_15m is not None and len(df_15m) >= 20: latest_15m = df_15m.iloc[-1] rsi_15m = latest_15m.get('rsi', 50) ema5_15m = latest_15m.get('ma5') # 实际是 ema5 ema20_15m = latest_15m.get('ma20') # 实际是 ema20 # 检查短期动能 if len(df_15m) >= 5: recent_closes = df_15m['close'].iloc[-5:].values is_accelerating = all(recent_closes[i] > recent_closes[i-1] for i in range(1, 5)) # 检查连续大阳线/阴线(快速移动) recent_changes = [(recent_closes[i] - recent_closes[i-1]) / recent_closes[i-1] * 100 for i in range(1, len(recent_closes))] big_moves = sum(1 for change in recent_changes if abs(change) > 0.3) is_rapid_moving = big_moves >= 3 avg_move = sum(abs(c) for c in recent_changes) / len(recent_changes) if recent_changes else 0 else: is_accelerating = False is_rapid_moving = False avg_move = 0 # 计算价格偏离 if ema5_15m and ema20_15m: deviation_ema5_15m = abs(current_price - ema5_15m) / ema5_15m * 100 distance_to_ema20 = abs(current_price - ema20_15m) / ema20_15m * 100 else: deviation_ema5_15m = 0 distance_to_ema20 = 0 # 检查成交量 df_5m = data.get('5m') volume_ratio = 1 if df_5m is not None and len(df_5m) >= 20: vol_latest = df_5m['volume'].iloc[-1] vol_ma20 = df_5m['volume'].iloc[-20:-1].mean() volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1 # 检查5m连续K线走势 if len(df_5m) >= 3: recent_5m_closes = df_5m['close'].iloc[-3:].values recent_5m_changes = [(recent_5m_closes[i] - recent_5m_closes[i-1]) / recent_5m_closes[i-1] * 100 for i in range(1, len(recent_5m_closes))] big_5m_moves = sum(1 for change in recent_5m_changes if abs(change) > 0.3) is_5m_accelerating = big_5m_moves >= 2 else: is_5m_accelerating = False # 日内过度延伸检查(EMA 反应更快,阈值更严格) is_overextended = ( (rsi_15m > 70 and intraday_trend == "上升") or (rsi_15m < 30 and intraday_trend == "下跌") or deviation_ema5_15m > 3 ) if intraday_trend == "上升": # 价格加速检查 - 强制观望,防止追涨 if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5: analysis.append(f"⚠️ 15m: 价格正在快速上涨!连续{big_moves}根大阳线,平均涨幅{avg_move:.2f}%") analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 🚨 **严禁追涨!强制 HOLD 观望**,等待回调后再考虑") analysis.append(f" → 如果要入场,等待回调到 EMA20 支撑位用 limit 挂单") analysis.append(f" → 追涨是持续止损的主要原因!") elif is_overextended: analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 不要追多,等待回调") elif is_accelerating and not is_overextended: analysis.append(f"15m: 正在上涨中,建议等待回调") analysis.append(f" → 等待回调到 EMA20 支撑位用 limit 挂单做多") analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") elif distance_to_ema20 < 1: analysis.append(f"15m: 回调到 EMA20 支撑位附近") analysis.append(f" → 支撑位做多反弹(EMA20: ${ema20_15m:.0f})") analysis.append(f" → 用 limit 挂单入场,止损1%,目标2-3%,盈亏比 >= 1:1.5") else: analysis.append(f"15m: 上涨中,耐心等待回调机会") analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 不要追多,等待回调到支撑位") elif intraday_trend == "下跌": # 价格加速检查 - 强制观望,防止杀跌 if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5: analysis.append(f"⚠️ 15m: 价格正在快速下跌!连续{big_moves}根大阴线,平均跌幅{avg_move:.2f}%") analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 🚨 **严禁杀跌!强制 HOLD 观望**,等待反弹后再考虑") analysis.append(f" → 如果要入场,等待反弹到 EMA20 压力位用 limit 挂单") analysis.append(f" → 杀跌是持续止损的主要原因!") elif is_overextended: analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 不要追空,等待反弹") elif is_accelerating and not is_overextended: analysis.append(f"15m: 正在下跌中,建议等待反弹") analysis.append(f" → 等待反弹到 EMA20 压力位用 limit 挂单做空") analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") elif distance_to_ema20 < 1: analysis.append(f"15m: 反弹到 EMA20 压力位附近") analysis.append(f" → 压力位做空回调(EMA20: ${ema20_15m:.0f})") analysis.append(f" → 用 limit 挂单入场,止损1%,目标2-3%,盈亏比 >= 1:1.5") else: analysis.append(f"15m: 下跌中,耐心等待反弹机会") analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") analysis.append(f" → 不要追空,等待反弹到压力位") else: analysis.append(f"15m: 震荡,观望或双向轻仓") analysis.append(f" → 支撑位多,压力位空,盈亏比 >= 1:1.5") # 日内交易要点 analysis.append(f"\n💡 稳健交易要点:") analysis.append(f"- **90%用limit挂单,10%用market**:耐心等待回调,不要追涨杀跌") analysis.append(f"- **价格加速时强制HOLD**:连续大阳/阴线时观望,等回调/反弹") analysis.append(f"- **RSI极端区强制HOLD**:>70(多)或 <30(空)时不入场") analysis.append(f"- **偏离EMA5>1.5%强制HOLD**:价格过度延伸,等待回归") analysis.append(f"- **盈亏比第一**: 必须 >= 1:1.5,否则不开仓") analysis.append(f"- **快进快出**: 持仓不超过4小时") analysis.append(f"- **止损设置**: 优先 1.5×ATR(30m),参考范围 0.8-2.5%") analysis.append(f"- **目标盈利**: 2-3%") analysis.append(f"- **宁可错过,不做错**: 追涨杀跌是持续止损的主要原因") return "\n".join(analysis) if analysis else "" except Exception as e: logger.warning(f"趋势位置分析失败: {e}") return "" def _build_analysis_prompt(self, symbol: str, lane: str, market_context: Dict[str, str], news_context: str, futures_context: str = "") -> str: """构建分析提示词""" lane_text = "日内交易分析" if lane == "intraday" else "趋势交易分析" lane_scope = ( [ "只根据下面提供的日内结构化特征做判断,不要脑补未提供的数据。", "重点阅读 5m/15m/30m、当日开盘、VWAP、开盘区间、区间状态、关键位和衍生品过热程度。", ] if lane == "intraday" else [ "只根据下面提供的趋势结构化特征做判断,不要脑补未提供的数据。", "重点阅读 1h/4h、一致性、趋势阶段、反转检测、关键位、新闻催化和衍生品拥挤度。", ] ) selected_sections = [ market_context.get('snapshot', ''), market_context.get('intraday', '') if lane == "intraday" else market_context.get('trend', ''), market_context.get('levels', ''), ] prompt_parts = [ f"请对 {symbol} 进行{lane_text}。", *lane_scope, ] for section in selected_sections: if section: prompt_parts.append("") prompt_parts.append(section) if news_context and news_context not in {"无最新新闻", "新闻获取失败"}: prompt_parts.append("") prompt_parts.append(news_context) if futures_context: prompt_parts.append("") prompt_parts.append(futures_context) prompt_parts.append("") prompt_parts.append("输出要求:只返回 system prompt 定义的 JSON 对象。没有高质量 setup 就返回 signals: []。") return "\n".join(prompt_parts) def _merge_lane_results(self, symbol: str, intraday_result: Dict[str, Any], trend_result: Dict[str, Any]) -> Dict[str, Any]: """合并日内与趋势两路 LLM 结果""" result = self._get_empty_signal(symbol) result['raw_response'] = { 'intraday': intraday_result.get('raw_response', ''), 'trend': trend_result.get('raw_response', '') } intraday_signals = self._normalize_lane_signals(intraday_result.get('signals', []), 'short_term') trend_signals = self._normalize_lane_signals(trend_result.get('signals', []), 'medium_term') merged_signals = sorted( intraday_signals + trend_signals, key=lambda signal: signal.get('confidence', 0), reverse=True )[:2] result['signals'] = merged_signals result['key_levels'] = { 'support': self._dedupe_levels( (intraday_result.get('key_levels', {}) or {}).get('support', []) + (trend_result.get('key_levels', {}) or {}).get('support', []), reverse=True ), 'resistance': self._dedupe_levels( (intraday_result.get('key_levels', {}) or {}).get('resistance', []) + (trend_result.get('key_levels', {}) or {}).get('resistance', []), reverse=False ), } trend_direction = trend_result.get('trend_direction') if trend_direction in (None, 'neutral'): trend_direction = intraday_result.get('trend_direction', 'neutral') result['trend_direction'] = trend_direction or 'neutral' trend_strength = trend_result.get('trend_strength') if trend_strength in (None, 'weak') and result['trend_direction'] == 'neutral': trend_strength = intraday_result.get('trend_strength', 'weak') result['trend_strength'] = trend_strength or 'weak' intraday_state = intraday_result.get('market_state') trend_state = trend_result.get('market_state') if trend_state == 'trending': result['market_state'] = '趋势市' elif intraday_state == 'ranging': result['market_state'] = '震荡市' elif intraday_state == 'trending': result['market_state'] = '日内趋势' else: result['market_state'] = intraday_state or trend_state or '中性' intraday_summary = intraday_result.get('analysis_summary', '无') trend_summary = trend_result.get('analysis_summary', '无') result['analysis_summary'] = f"日内:{intraday_summary} | 趋势:{trend_summary}" if result['trend_direction'] == 'uptrend': result['trend'] = 'up' elif result['trend_direction'] == 'downtrend': result['trend'] = 'down' else: result['trend'] = 'sideways' result['timestamp'] = datetime.now().isoformat() return result def _normalize_lane_signals(self, signals: List[Dict[str, Any]], lane_type: str) -> List[Dict[str, Any]]: """统一信号时间框架标识""" normalized = [] for signal in sorted(signals, key=lambda item: item.get('confidence', 0), reverse=True): if signal.get('action') not in ['buy', 'sell']: continue signal = dict(signal) signal['confidence'] = max(0, min(float(signal.get('confidence', 0) or 0), 100)) min_confidence = self.LANE_MIN_CONFIDENCE.get(lane_type, 60) if signal['confidence'] < min_confidence: continue signal['entry_type'] = signal.get('entry_type', 'market') if signal['entry_type'] not in {'market', 'limit'}: signal['entry_type'] = 'market' if not self._is_signal_price_structure_valid(signal): continue signal['grade'] = self._infer_signal_grade(signal['confidence'], lane_type) if not signal.get('reasoning'): signal['reasoning'] = '结构与关键位共振' signal['timeframe'] = lane_type signal['type'] = lane_type normalized.append(signal) return normalized[:1] def _infer_signal_grade(self, confidence: float, lane_type: str) -> str: """根据 lane 规则统一 grade,避免模型随意给等级""" if lane_type == 'medium_term': if confidence >= 82: return 'A' if confidence >= 72: return 'B' return 'C' if confidence >= 80: return 'A' if confidence >= 70: return 'B' return 'C' def _is_signal_price_structure_valid(self, signal: Dict[str, Any]) -> bool: """验证信号价格结构是否合法""" entry_price = signal.get('entry_price') stop_loss = signal.get('stop_loss') take_profit = signal.get('take_profit') action = signal.get('action') if not all(isinstance(price, (int, float)) and price > 0 for price in [entry_price, stop_loss, take_profit]): return False if action == 'buy': return stop_loss < entry_price < take_profit if action == 'sell': return take_profit < entry_price < stop_loss return False def _analyze_multi_timeframe_trend(self, data: Dict[str, Any]) -> str: """ 多级别趋势分析 - 检测小级别反转信号 目的:识别小级别(15m/30m)已经反转,但大级别(1h/4h)还未反应的情况 这样可以提前捕捉反转信号,而不是等待均线系统确认 """ context_parts = ["\n## 🔄 多级别趋势分析(检测反转信号)"] # 定义各级别 timeframes = { '5m': ('超短线', 5), '15m': ('短线', 15), '30m': ('日内', 30), '1h': ('小时', 60), '4h': ('趋势', 240) } trend_status = {} # 存储各级别趋势状态 # 分析各级别趋势 for tf, (tf_name, minutes) in timeframes.items(): df = data.get(tf) if df is None or len(df) < 10: continue latest = df.iloc[-1] prev = df.iloc[-2] # 1. 均线趋势判断 ma5 = latest.get('ma5', 0) ma10 = latest.get('ma10', 0) ma20 = latest.get('ma20', 0) ma_trend = None if ma5 and ma10 and ma20: if ma5 > ma10 > ma20: ma_trend = 'bull' elif ma5 < ma10 < ma20: ma_trend = 'bear' else: ma_trend = 'neutral' # 2. MACD 趋势判断 macd_trend = None if 'macd' in df.columns and 'macd_signal' in df.columns: macd = df['macd'].iloc[-1] signal = df['macd_signal'].iloc[-1] hist = df.get('macd_hist', pd.Series([0])).iloc[-1] if macd > 0 and signal > 0: macd_trend = 'bull' elif macd < 0 and signal < 0: macd_trend = 'bear' else: macd_trend = 'neutral' # 3. 价格动量(最近3根K线) close_3 = df['close'].iloc[-3] close_2 = df['close'].iloc[-2] close_1 = df['close'].iloc[-1] price_momentum = 'up' if close_1 > close_3 else 'down' if close_1 < close_3 else 'flat' # 综合判断趋势 if ma_trend == 'bull' and (macd_trend == 'bull' or price_momentum == 'up'): trend = 'bull' elif ma_trend == 'bear' and (macd_trend == 'bear' or price_momentum == 'down'): trend = 'bear' elif price_momentum == 'up' and macd_trend == 'bull': trend = 'bull' elif price_momentum == 'down' and macd_trend == 'bear': trend = 'bear' else: trend = 'neutral' trend_status[tf] = { 'name': tf_name, 'trend': trend, 'ma_trend': ma_trend, 'macd_trend': macd_trend, 'momentum': price_momentum, 'price': float(latest['close']), 'change_3': ((close_1 - close_3) / close_3 * 100) if close_3 > 0 else 0 } # 生成多级别趋势报告 if not trend_status: context_parts.append("⚠️ 数据不足,无法进行多级别分析") return "\n".join(context_parts) # 检测反转信号 reversal_signals = [] # 1. 小级别反转但大级别未反转 if ('15m' in trend_status and '1h' in trend_status and trend_status['15m']['trend'] != trend_status['1h']['trend'] and trend_status['15m']['trend'] != 'neutral'): small_tf = trend_status['15m'] large_tf = trend_status['1h'] reversal_type = "🔄 反转信号" if large_tf['trend'] != 'neutral' else "⚡ 启动信号" reversal_signals.append( f"{reversal_type}: 15分钟[{small_tf['trend']}] vs 1小时[{large_tf['trend']}]" ) reversal_signals.append( f" 15分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}" ) # 2. 30分钟反转但4小时未反转(更强的反转信号) if ('30m' in trend_status and '4h' in trend_status and trend_status['30m']['trend'] != trend_status['4h']['trend'] and trend_status['30m']['trend'] != 'neutral'): small_tf = trend_status['30m'] large_tf = trend_status['4h'] reversal_type = "🔄 强反转" if large_tf['trend'] != 'neutral' else "⚡ 趋势启动" reversal_signals.append( f"{reversal_type}: 30分钟[{small_tf['trend']}] vs 4小时[{large_tf['trend']}]" ) reversal_signals.append( f" 30分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}" ) # 添加各级别趋势详情 context_parts.append("\n各级别趋势状态:") for tf in ['5m', '15m', '30m', '1h', '4h']: if tf in trend_status: status = trend_status[tf] trend_icon = {'bull': '📈', 'bear': '📉', 'neutral': '➡️'}.get(status['trend'], '❓') context_parts.append( f" {tf} ({status['name']}): {trend_icon} {status['trend']} " f"| 动量: {status['change_3']:+.2f}% | 价格: ${status['price']:.2f}" ) # 添加反转信号 if reversal_signals: context_parts.append("\n⚠️ 检测到级别背离/反转信号:") context_parts.extend(reversal_signals) context_parts.append("\n💡 提示: 小级别已反转但大级别滞后,可考虑:") context_parts.append(" - 反手操作(平掉旧仓位,开新方向仓位)") context_parts.append(" - 顺势短线(跟随小级别趋势,快进快出)") context_parts.append(" - 等待大级别确认(避免假突破)") else: context_parts.append("\n✅ 各级别趋势一致,无反转信号") return "\n".join(context_parts) def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]: """解析 LLM 响应""" try: # 尝试提取 JSON json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) if json_match: json_str = json_match.group(1) else: json_match = re.search(r'\{[\s\S]*\}', response) if json_match: json_str = json_match.group(0) else: raise ValueError("无法找到 JSON 响应") # 清理 JSON 字符串(移除可能导致解析错误的注释等) json_str = self._clean_json_string(json_str) logger.debug(f"解析的 JSON 字符串: {json_str[:500]}...") # 打印前500字符用于调试 result = json.loads(json_str) # 清理价格字段 - 转换为 float result = self._clean_price_fields(result) result = self._normalize_response_schema(result) # 添加元数据 result['symbol'] = symbol result['timestamp'] = datetime.now().isoformat() result['raw_response'] = response # 兼容处理:确保 signals 中的字段与旧格式一致 if 'signals' in result: for sig in result['signals']: # LLM 输出的 "type" 是 timeframe (short_term/medium_term/long_term) # 需要映射为 "timeframe",而 "action" 才是 buy/sell/wait if 'type' in sig: # 如果 type 是 short_term/medium_term/long_term,映射为 timeframe if sig['type'] in ['short_term', 'medium_term', 'long_term']: sig['timeframe'] = sig.pop('type') # 如果 type 是 buy/sell/wait,映射为 action elif sig['type'] in ['buy', 'sell', 'wait']: sig['action'] = sig.pop('type') # 确保 action 字段存在 if 'action' not in sig and 'timeframe' in sig: # 从 reasoning 或其他字段推断 action sig['action'] = 'wait' # 确保 grade 字段存在 if 'grade' not in sig: # 根据 confidence 推断 grade confidence = sig.get('confidence', 0) if confidence >= 80: sig['grade'] = 'A' elif confidence >= 60: sig['grade'] = 'B' elif confidence >= 40: sig['grade'] = 'C' else: sig['grade'] = 'D' # 处理趋势字段 - 优先使用 LLM 返回的趋势字段,否则从信号推断 if 'trend_direction' not in result or 'trend_strength' not in result: # 从 signals 中推断趋势 if 'signals' in result and result['signals']: best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0)) action = best_signal.get('action', 'wait') confidence = best_signal.get('confidence', 0) # 推断趋势方向(如果 LLM 没有提供) if 'trend_direction' not in result: if action == 'buy': result['trend_direction'] = 'uptrend' elif action == 'sell': result['trend_direction'] = 'downtrend' else: result['trend_direction'] = 'neutral' # 推断趋势强度(如果 LLM 没有提供) if 'trend_strength' not in result: result['trend_strength'] = 'strong' if confidence >= 70 else 'medium' if confidence >= 50 else 'weak' # 从信号中推断 market_state(用于向后兼容) if 'signals' in result and result['signals']: best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0)) action = best_signal.get('action', 'wait') confidence = best_signal.get('confidence', 0) trend_direction = result.get('trend_direction', 'neutral') # 推断市场状态 if confidence >= 70 and trend_direction != 'neutral': if trend_direction == 'uptrend': result['market_state'] = '强势上涨' elif trend_direction == 'downtrend': result['market_state'] = '强势下跌' else: result['market_state'] = '震荡整理' else: result['market_state'] = '震荡整理' # 推断 trend(用于向后兼容,简化的趋势字段) if 'trend' not in result: if trend_direction == 'uptrend': result['trend'] = 'up' elif trend_direction == 'downtrend': result['trend'] = 'down' else: result['trend'] = 'sideways' else: result['market_state'] = '无明确信号' if 'trend' not in result: result['trend'] = 'sideways' logger.info(f"✅ 市场信号分析完成: {symbol}") logger.debug(f"市场信号: {json.dumps(result, ensure_ascii=False, indent=2)}") return result except Exception as e: logger.warning(f"解析 LLM 响应失败: {e}") logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符 return self._get_empty_signal(symbol) def _clean_json_string(self, json_str: str) -> str: """清理 JSON 字符串,移除可能导致解析错误的内容""" # 移除单行注释 // ... json_str = re.sub(r'//.*?(?=\n|$)', '', json_str) # 移除多行注释 /* ... */ json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str) # 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}) json_str = re.sub(r',\s*([}\]])', r'\1', json_str) return json_str def _normalize_response_schema(self, result: Dict[str, Any]) -> Dict[str, Any]: """归一化 LLM 输出结构,避免下游依赖脏数据""" if not isinstance(result, dict): return self._get_empty_signal("") normalized = dict(result) normalized['market_state'] = str(normalized.get('market_state', 'neutral') or 'neutral') normalized['trend_direction'] = str(normalized.get('trend_direction', 'neutral') or 'neutral') normalized['trend_strength'] = str(normalized.get('trend_strength', 'weak') or 'weak') normalized['analysis_summary'] = self._truncate_summary(normalized.get('analysis_summary', '')) normalized['key_levels'] = self._normalize_key_levels(normalized.get('key_levels')) normalized['signals'] = normalized.get('signals') if isinstance(normalized.get('signals'), list) else [] return normalized def _normalize_key_levels(self, key_levels: Any) -> Dict[str, List[float]]: """归一化关键位结构""" if not isinstance(key_levels, dict): return {'support': [], 'resistance': []} support = [float(level) for level in key_levels.get('support', []) if isinstance(level, (int, float))] resistance = [float(level) for level in key_levels.get('resistance', []) if isinstance(level, (int, float))] return { 'support': self._dedupe_levels(support, reverse=True), 'resistance': self._dedupe_levels(resistance, reverse=False), } def _truncate_summary(self, summary: Any, max_length: int = 20) -> str: text = str(summary or '').strip() return text[:max_length] def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: """清理价格字段,转换为 float""" def clean_price(price_value): if price_value is None: return None if isinstance(price_value, (int, float)): return float(price_value) if isinstance(price_value, str): # 移除 $ 符号和逗号 cleaned = price_value.replace('$', '').replace(',', '').strip() if cleaned: try: return float(cleaned) except ValueError: return None return None # 清理 key_levels 中的支撑位和阻力位 if 'key_levels' in data and data['key_levels']: key_levels = data['key_levels'] if 'support' in key_levels: data['key_levels']['support'] = [clean_price(s) for s in key_levels['support']] if 'resistance' in key_levels: data['key_levels']['resistance'] = [clean_price(r) for r in key_levels['resistance']] # 清理 signals 中的价格字段 if 'signals' in data: for sig in data['signals']: price_fields = ['entry_price', 'stop_loss', 'take_profit'] for field in price_fields: if field in sig: sig[field] = clean_price(sig[field]) # 验证止损止盈价格的合理性 entry_price = sig.get('entry_price') stop_loss = sig.get('stop_loss') take_profit = sig.get('take_profit') action = sig.get('action', '') if entry_price and entry_price > 0: MAX_REASONABLE_DEVIATION = 0.50 # 50% has_invalid_price = False # 检查止损 if stop_loss is not None: deviation = abs(stop_loss - entry_price) / entry_price if deviation > MAX_REASONABLE_DEVIATION: logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止损价格不合理: entry={entry_price}, stop_loss={stop_loss}, 偏离={deviation*100:.1f}%") has_invalid_price = True elif action == 'buy' and stop_loss >= entry_price: logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 < entry") has_invalid_price = True elif action == 'sell' and stop_loss <= entry_price: logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 > entry") has_invalid_price = True # 检查止盈 if take_profit is not None: deviation = abs(take_profit - entry_price) / entry_price if deviation > MAX_REASONABLE_DEVIATION: logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止盈价格不合理: entry={entry_price}, take_profit={take_profit}, 偏离={deviation*100:.1f}%") has_invalid_price = True elif action == 'buy' and take_profit <= entry_price: logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止盈错误: entry={entry_price}, take_profit={take_profit} 应该 > entry") has_invalid_price = True elif action == 'sell' and take_profit >= entry_price: logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止盈错误: entry={entry_price}, take_profit={take_profit} 应该 < entry") has_invalid_price = True # 如果价格不合理,降低等级为 D 或移除信号 if has_invalid_price: original_grade = sig.get('grade', 'C') sig['grade'] = 'D' sig['confidence'] = 0 # 添加错误说明 if 'reasoning' in sig: sig['reasoning'] = f"[价格异常] {sig['reasoning']}" logger.error(f"❌ [{data.get('symbol', '')}] 信号价格异常,等级从 {original_grade} 降为 D,止损止盈已清空") # 清空不合理的价格 sig['stop_loss'] = None sig['take_profit'] = None return data def _calculate_price_change_24h(self, df) -> str: """计算24小时涨跌幅""" try: if df is None or len(df) < 24: return "N/A" current_price = float(df['close'].iloc[-1]) price_24h_ago = float(df['close'].iloc[-24]) change = ((current_price - price_24h_ago) / price_24h_ago) * 100 sign = "+" if change >= 0 else "" return f"{sign}{change:.2f}%" except Exception as e: logger.debug(f"计算24h涨跌失败: {e}") return "N/A" def _get_empty_signal(self, symbol: str) -> Dict[str, Any]: """返回空信号""" return { 'symbol': symbol, 'trend_direction': 'neutral', 'trend_strength': 'weak', 'analysis_summary': 'unknown', 'volume_analysis': '分析失败', 'news_sentiment': 'neutral', 'news_impact': '无', 'market_state': '分析失败', 'trend': 'sideways', 'signals': [], 'key_levels': {}, 'timestamp': datetime.now().isoformat(), 'error': '信号分析失败' } def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str: """分析波动率变化(使用 30m 作为日内主周期)""" df = data.get('30m') if df is None or len(df) < 24 or 'atr' not in df.columns: return "" lines = [] # ATR 变化趋势 recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根(3小时) older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根 if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0: return "" atr_change = (recent_atr - older_atr) / older_atr * 100 current_atr = float(df['atr'].iloc[-1]) current_price = float(df['close'].iloc[-1]) atr_percent = current_atr / current_price * 100 lines.append(f"当前 ATR (30m): ${current_atr:.2f} ({atr_percent:.2f}%)") if atr_change > 20: lines.append(f"**波动率扩张**: ATR 上升 {atr_change:.0f}%,日内趋势可能启动") elif atr_change < -20: lines.append(f"**波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将变盘") else: lines.append(f"波动率稳定: ATR 变化 {atr_change:+.0f}%") # 布林带宽度 if 'bb_upper' in df.columns and 'bb_lower' in df.columns: bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100 bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100 if bb_width < bb_width_prev * 0.8: lines.append(f"**布林带收口**: 宽度 {bb_width:.1f}%,变盘信号") elif bb_width > bb_width_prev * 1.2: lines.append(f"**布林带开口**: 宽度 {bb_width:.1f}%,趋势延续") return "\n".join(lines) def _detect_range_zone(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]: """ 检测震荡区间 - 计算明确的支撑位和压力位 使用多种方法综合判断: 1. 价格通道(最近N根K线的最高/最低价) 2. 成交量密集区(Volume Profile) 3. 布林带 4. EMA支撑/压力 """ result = { 'is_ranging': False, 'support_level': None, 'resistance_level': None, 'range_width_pct': None, 'confidence': 0, 'volume_profile_support': None, 'volume_profile_resistance': None, 'analysis': '' } try: df_30m = data.get('30m') df_1h = data.get('1h') df_15m = data.get('15m') if df_30m is None or len(df_30m) < 48: # 需要至少48根K线(24小时) return result current_price = float(df_30m['close'].iloc[-1]) # ========== 1. 价格通道分析 ========== # 使用最近24-48根K线(12-24小时)计算价格通道 lookback_periods = [24, 36, 48] price_channels = [] for period in lookback_periods: if len(df_30m) >= period: period_data = df_30m.iloc[-period:] high = period_data['high'].max() low = period_data['low'].min() price_channels.append({'high': high, 'low': low, 'width': high - low}) # 选择波动最稳定的通道(宽度变化最小的) if price_channels: avg_width = sum(pc['width'] for pc in price_channels) / len(price_channels) selected_channel = min(price_channels, key=lambda pc: abs(pc['width'] - avg_width)) support = selected_channel['low'] resistance = selected_channel['high'] range_width = resistance - support range_width_pct = (range_width / current_price) * 100 # 震荡区间判断标准 # 1. 区间宽度 < 5%(震荡市) # 2. 价格在区间中位数附近 # 3. EMA 纠缠 is_narrow_range = range_width_pct < 5.0 price_in_middle = (current_price - support) / range_width > 0.3 and \ (current_price - support) / range_width < 0.7 # EMA 纠缠检查 ema5 = df_30m['ma5'].iloc[-1] if 'ma5' in df_30m.columns else None ema10 = df_30m['ma10'].iloc[-1] if 'ma10' in df_30m.columns else None ema20 = df_30m['ma20'].iloc[-1] if 'ma20' in df_30m.columns else None ema_entangled = False if all([ema5, ema10, ema20]): ema_spread = (max(ema5, ema10, ema20) - min(ema5, ema10, ema20)) / current_price * 100 ema_entangled = ema_spread < 1.0 # EMA 排列差距 < 1% # ========== 2. 成交量密集区分析 ========== volume_profile_support = None volume_profile_resistance = None if len(df_30m) >= 48: # 找出成交量最大的价格区间 df_30m_copy = df_30m.iloc[-48:].copy() df_30m_copy['avg_price'] = (df_30m_copy['high'] + df_30m_copy['low'] + df_30m_copy['close']) / 3 df_30m_copy['volume_weight'] = df_30m_copy['volume'] * df_30m_copy['avg_price'] # 按价格分层,找出高成交量区域 price_bins = pd.cut(df_30m_copy['avg_price'], bins=10) volume_by_price = df_30m_copy.groupby(price_bins, observed=True)['volume'].sum() if len(volume_by_price) > 0: # 高成交量区作为支撑/压力 max_vol_bin = volume_by_price.idxmax() if max_vol_bin is not None: vp_level = (max_vol_bin.left + max_vol_bin.right) / 2 if vp_level < current_price * 0.98: volume_profile_support = float(vp_level) elif vp_level > current_price * 1.02: volume_profile_resistance = float(vp_level) # ========== 3. 布林带支撑/压力 ========== bb_support = None bb_resistance = None if 'bb_lower' in df_30m.columns and 'bb_upper' in df_30m.columns: bb_support = float(df_30m['bb_lower'].iloc[-1]) bb_resistance = float(df_30m['bb_upper'].iloc[-1]) # ========== 4. 关键价格点综合 ========== # 综合多个指标得出最可靠的支撑/压力位 support_candidates = [] resistance_candidates = [] if support: support_candidates.append(support) if volume_profile_support: support_candidates.append(volume_profile_support) if bb_support: support_candidates.append(bb_support) if resistance: resistance_candidates.append(resistance) if volume_profile_resistance: resistance_candidates.append(volume_profile_resistance) if bb_resistance: resistance_candidates.append(bb_resistance) # 取中位数作为最终的支撑/压力位 final_support = np.median(support_candidates) if support_candidates else None final_resistance = np.median(resistance_candidates) if resistance_candidates else None # ========== 5. 计算置信度 ========== confidence = 0 reasons = [] if is_narrow_range: confidence += 30 reasons.append(f"区间窄({range_width_pct:.1f}%)") if price_in_middle: confidence += 20 reasons.append("价格在中部") if ema_entangled: confidence += 25 reasons.append("EMA纠缠") # 成交量分布检查 - 如果成交量在区间两端较小,说明是有效震荡 if len(df_30m) >= 24: recent_vol = df_30m['volume'].iloc[-12:].mean() older_vol = df_30m['volume'].iloc[-24:-12].mean() if abs(recent_vol - older_vol) / older_vol < 0.3: confidence += 15 reasons.append("成交量平稳") # 价格反弹次数 - 检查在支撑/压力位附近是否有多次反弹 if final_support and final_resistance: bounce_count = 0 for i in range(-24, 0): if i >= -len(df_30m): row = df_30m.iloc[i] # 检查是否在支撑位附近反弹 if abs(row['low'] - final_support) / final_support < 0.005 and row['close'] > row['open']: bounce_count += 1 # 检查是否在压力位附近回落 if abs(row['high'] - final_resistance) / final_resistance < 0.005 and row['close'] < row['open']: bounce_count += 1 if bounce_count >= 2: confidence += 10 reasons.append(f"边界反弹{bounce_count}次") result.update({ 'is_ranging': confidence >= 60, 'support_level': float(final_support) if final_support else None, 'resistance_level': float(final_resistance) if final_resistance else None, 'range_width_pct': range_width_pct, 'confidence': confidence, 'volume_profile_support': volume_profile_support, 'volume_profile_resistance': volume_profile_resistance, 'analysis': f"震荡判断: {confidence}% ({', '.join(reasons) if reasons else '无'})" }) except Exception as e: logger.warning(f"震荡区间检测失败: {e}") import traceback logger.debug(traceback.format_exc()) return result def _detect_trend_reversal(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]: """ 检测趋势反转信号 综合多个指标判断趋势是否可能反转: 1. RSI 背离(价格创新高但RSI不创新高 / 价格创新低但RSI不创新低) 2. MACD 柱状图缩短/背离 3. 量价背离(价格上涨但成交量下降) 4. 关键K线形态(吞没、锤子线、十字星等) 5. 多周期趋势不一致 """ result = { 'is_reversing': False, 'reversal_type': None, # 'bullish_reversal' or 'bearish_reversal' 'confidence': 0, 'signals': [], 'analysis': '' } try: df_15m = data.get('15m') df_30m = data.get('30m') df_1h = data.get('1h') if df_15m is None or len(df_15m) < 30: return result reversal_signals = [] bullish_signals = 0 bearish_signals = 0 # ========== 1. RSI 背离检测 ========== if 'rsi' in df_15m.columns and len(df_15m) >= 20: recent_5 = df_15m.iloc[-5:] prev_5 = df_15m.iloc[-15:-10] # 顶背离(看跌反转) recent_high = recent_5['high'].max() recent_rsi_at_high = recent_5.loc[recent_5['high'] == recent_high, 'rsi'].values[0] prev_high = prev_5['high'].max() prev_rsi_at_high = prev_5.loc[prev_5['high'] == prev_high, 'rsi'].values[0] if recent_high > prev_high and recent_rsi_at_high < prev_rsi_at_high: bearish_signals += 2 reversal_signals.append({ 'type': 'rsi_divergence', 'direction': 'bearish', 'weight': 2, 'desc': 'RSI顶背离:价格创新高但RSI不创新高' }) # 底背离(看涨反转) recent_low = recent_5['low'].min() recent_rsi_at_low = recent_5.loc[recent_5['low'] == recent_low, 'rsi'].values[0] prev_low = prev_5['low'].min() prev_rsi_at_low = prev_5.loc[prev_5['low'] == prev_low, 'rsi'].values[0] if recent_low < prev_low and recent_rsi_at_low > prev_rsi_at_low: bullish_signals += 2 reversal_signals.append({ 'type': 'rsi_divergence', 'direction': 'bullish', 'weight': 2, 'desc': 'RSI底背离:价格创新低但RSI不创新低' }) # ========== 2. MACD 柱状图分析 ========== if 'macd_hist' in df_15m.columns and len(df_15m) >= 10: hist_recent = df_15m['macd_hist'].iloc[-3:].values hist_prev = df_15m['macd_hist'].iloc[-6:-3].values # MACD 柱状图缩短 = 动能衰竭 if all(h > 0 for h in hist_prev): # 之前是正向 if hist_recent[2] < hist_recent[1] < hist_recent[0]: # 持续缩短 bearish_signals += 1 reversal_signals.append({ 'type': 'macd_histogram', 'direction': 'bearish', 'weight': 1, 'desc': 'MACD柱状图持续缩短:上涨动能衰竭' }) if all(h < 0 for h in hist_prev): # 之前是负向 if hist_recent[2] > hist_recent[1] > hist_recent[0]: # 持续收窄 bullish_signals += 1 reversal_signals.append({ 'type': 'macd_histogram', 'direction': 'bullish', 'weight': 1, 'desc': 'MACD柱状图持续收窄:下跌动能衰竭' }) # MACD 金叉/死叉 if len(df_15m) >= 2: macd_current = df_15m['macd'].iloc[-1] signal_current = df_15m['macd_signal'].iloc[-1] macd_prev = df_15m['macd'].iloc[-2] signal_prev = df_15m['macd_signal'].iloc[-2] # 金叉 if macd_prev <= signal_prev and macd_current > signal_current: bullish_signals += 1 reversal_signals.append({ 'type': 'macd_cross', 'direction': 'bullish', 'weight': 1, 'desc': 'MACD金叉' }) # 死叉 if macd_prev >= signal_prev and macd_current < signal_current: bearish_signals += 1 reversal_signals.append({ 'type': 'macd_cross', 'direction': 'bearish', 'weight': 1, 'desc': 'MACD死叉' }) # ========== 3. 量价背离检测 ========== if 'volume' in df_15m.columns and len(df_15m) >= 10: recent_price_change = (df_15m['close'].iloc[-1] - df_15m['close'].iloc[-5]) / df_15m['close'].iloc[-5] recent_volume = df_15m['volume'].iloc[-5:].mean() older_volume = df_15m['volume'].iloc[-10:-5].mean() # 价格上涨但成交量下降(量价背离) if recent_price_change > 0.01 and recent_volume < older_volume * 0.8: bearish_signals += 1 reversal_signals.append({ 'type': 'volume_divergence', 'direction': 'bearish', 'weight': 1, 'desc': '量价背离:价格上涨但成交量萎缩' }) # 价格下跌但成交量下降(可能见底) if recent_price_change < -0.01 and recent_volume < older_volume * 0.7: bullish_signals += 1 reversal_signals.append({ 'type': 'volume_divergence', 'direction': 'bullish', 'weight': 1, 'desc': '下跌缩量:抛压枯竭,可能见底' }) # ========== 4. 关键K线形态检测 ========== if len(df_15m) >= 3: latest = df_15m.iloc[-1] prev = df_15m.iloc[-2] # 吞没形态 open_latest, close_latest = latest['open'], latest['close'] open_prev, close_prev = prev['open'], prev['close'] # 阳包阴(看涨) if (close_latest > open_latest and # 当前是阳线 close_prev < open_prev and # 前一个是阴线 open_latest <= close_prev and # 开盘价低于前一个收盘价 close_latest >= open_prev): # 收盘价高于前一个开盘价 bullish_signals += 2 reversal_signals.append({ 'type': 'candlestick', 'direction': 'bullish', 'weight': 2, 'desc': '阳包阴吞没形态(强反转信号)' }) # 阴包阳(看跌) if (close_latest < open_latest and # 当前是阴线 close_prev > open_prev and # 前一个是阳线 open_latest >= close_prev and # 开盘价高于前一个收盘价 close_latest <= open_prev): # 收盘价低于前一个开盘价 bearish_signals += 2 reversal_signals.append({ 'type': 'candlestick', 'direction': 'bearish', 'weight': 2, 'desc': '阴包阳吞没形态(强反转信号)' }) # 锤子线/倒锤子 body_size = abs(close_latest - open_latest) upper_shadow = df_15m['high'].iloc[-1] - max(open_latest, close_latest) lower_shadow = min(open_latest, close_latest) - df_15m['low'].iloc[-1] # 锤子线(看涨) if lower_shadow >= body_size * 2 and upper_shadow < body_size * 0.5: bullish_signals += 1 reversal_signals.append({ 'type': 'candlestick', 'direction': 'bullish', 'weight': 1, 'desc': '锤子线(底部反转信号)' }) # 倒锤子(看跌) if upper_shadow >= body_size * 2 and lower_shadow < body_size * 0.5: bearish_signals += 1 reversal_signals.append({ 'type': 'candlestick', 'direction': 'bearish', 'weight': 1, 'desc': '倒锤子线(顶部反转信号)' }) # ========== 5. 多周期趋势不一致 ========== trend_15m = self._get_trend_direction(df_15m) trend_30m = self._get_trend_direction(df_30m) trend_1h = self._get_trend_direction(df_1h) # 小周期反转但大周期未反应 if trend_15m and trend_1h and trend_15m != trend_1h: if trend_15m == 'bull' and trend_1h == 'bear': bullish_signals += 1 reversal_signals.append({ 'type': 'timeframe_divergence', 'direction': 'bullish', 'weight': 1, 'desc': '15分钟转多但1小时仍看空(潜在反转)' }) elif trend_15m == 'bear' and trend_1h == 'bull': bearish_signals += 1 reversal_signals.append({ 'type': 'timeframe_divergence', 'direction': 'bearish', 'weight': 1, 'desc': '15分钟转空但1小时仍看多(潜在反转)' }) # ========== 计算反转信号强度 ========== total_signals = len(reversal_signals) if total_signals >= 3: # 至少3个反转信号才认为可能反转 if bullish_signals >= bearish_signals + 2: result['is_reversing'] = True result['reversal_type'] = 'bullish_reversal' result['confidence'] = min(90, bullish_signals * 15) result['signals'] = [s for s in reversal_signals if s['direction'] == 'bullish'] elif bearish_signals >= bullish_signals + 2: result['is_reversing'] = True result['reversal_type'] = 'bearish_reversal' result['confidence'] = min(90, bearish_signals * 15) result['signals'] = [s for s in reversal_signals if s['direction'] == 'bearish'] if reversal_signals: result['analysis'] = f"检测到 {len(reversal_signals)} 个反转信号" else: result['analysis'] = "无反转信号" except Exception as e: logger.warning(f"趋势反转检测失败: {e}") import traceback logger.debug(traceback.format_exc()) return result def _get_trend_direction(self, df: pd.DataFrame) -> str: """获取趋势方向:bull/bear/neutral""" if df is None or len(df) < 10: return 'neutral' try: # 使用EMA判断 ma5 = df['ma5'].iloc[-1] if 'ma5' in df.columns else None ma10 = df['ma10'].iloc[-1] if 'ma10' in df.columns else None ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else None if ma5 and ma10 and ma20: if ma5 > ma10 > ma20: return 'bull' elif ma5 < ma10 < ma20: return 'bear' # 使用MACD判断 if 'macd' in df.columns and 'macd_signal' in df.columns: macd = df['macd'].iloc[-1] signal = df['macd_signal'].iloc[-1] if macd > signal and macd > 0: return 'bull' elif macd < signal and macd < 0: return 'bear' except Exception as e: logger.debug(f"趋势方向判断失败: {e}") return 'neutral' def _detect_trend_stage(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]: """ 检测趋势阶段:早期/中期/晚期 判断标准: 1. 早期:刚突破关键位,均线刚开始排列,动能开始释放 2. 中期:均线排列稳定,价格沿趋势移动,量能健康 3. 晚期:价格过度延伸,RSI极端区,量价背离,多次假突破 """ result = { 'stage': 'unknown', # 'early', 'middle', 'late' 'confidence': 0, 'signals': [], 'analysis': '' } try: df_30m = data.get('30m') df_1h = data.get('1h') if df_30m is None or len(df_30m) < 30: return result current_price = float(df_30m['close'].iloc[-1]) stage_signals = [] early_score = 0 middle_score = 0 late_score = 0 # ========== 1. EMA 排列状态 ========== ema5 = df_30m['ma5'].iloc[-1] if 'ma5' in df_30m.columns else None ema10 = df_30m['ma10'].iloc[-1] if 'ma10' in df_30m.columns else None ema20 = df_30m['ma20'].iloc[-1] if 'ma20' in df_30m.columns else None ema50 = df_30m['ma50'].iloc[-1] if 'ma50' in df_30m.columns else None if all([ema5, ema10, ema20, ema50]): # 检查EMA排列是否形成 if ema5 > ema10 > ema20 > ema50: # 多头排列 # 检查排列刚刚形成(早期)还是已经稳定(中期/晚期) ema5_cross_ma20 = False if len(df_30m) >= 10: # 检查最近10根内是否发生过金叉 for i in range(-10, 0): if df_30m['ma5'].iloc[i] > df_30m['ma20'].iloc[i]: if i > -10 and df_30m['ma5'].iloc[i-1] <= df_30m['ma20'].iloc[i-1]: ema5_cross_ma20 = True break if ema5_cross_ma20: early_score += 30 stage_signals.append("EMA排列刚形成(早期)") else: # 检查EMA间距 ema_spread = (ema5 - ema20) / ema20 * 100 if ema_spread > 3: late_score += 20 stage_signals.append(f"EMA间距过大({ema_spread:.1f}%) - 可能过度延伸") else: middle_score += 20 stage_signals.append("EMA排列稳定(中期)") elif ema5 < ema10 < ema20 < ema50: # 空头排列 ema5_cross_ma20 = False if len(df_30m) >= 10: for i in range(-10, 0): if df_30m['ma5'].iloc[i] < df_30m['ma20'].iloc[i]: if i > -10 and df_30m['ma5'].iloc[i-1] >= df_30m['ma20'].iloc[i-1]: ema5_cross_ma20 = True break if ema5_cross_ma20: early_score += 30 stage_signals.append("EMA排列刚形成(早期)") else: ema_spread = (ema20 - ema5) / ema20 * 100 if ema_spread > 3: late_score += 20 stage_signals.append(f"EMA间距过大({ema_spread:.1f}%) - 可能过度延伸") else: middle_score += 20 stage_signals.append("EMA排列稳定(中期)") # ========== 2. RSI 状态 ========== if 'rsi' in df_30m.columns: rsi_current = df_30m['rsi'].iloc[-1] rsi_prev = df_30m['rsi'].iloc[-5:-1].values # RSI极端区 - 晚期信号 if rsi_current > 70: late_score += 25 stage_signals.append(f"RSI超买({rsi_current:.0f}) - 趋势晚期") elif rsi_current < 30: late_score += 25 stage_signals.append(f"RSI超卖({rsi_current:.0f}) - 趋势晚期") elif 50 <= rsi_current <= 65: middle_score += 15 stage_signals.append(f"RSI健康({rsi_current:.0f}) - 趋势中期") elif 40 <= rsi_current <= 60: early_score += 10 stage_signals.append(f"RSI中性({rsi_current:.0f}) - 可能早期") # RSI趋势检查 if len(rsi_prev) >= 3: rsi_trend = "up" if rsi_current > rsi_prev[-1] else "down" if rsi_current < rsi_prev[-1] else "flat" if rsi_trend == "flat": late_score += 10 stage_signals.append("RSI走平 - 动能衰竭") # ========== 3. 价格偏离度 ========== if ema20: deviation = abs(current_price - ema20) / ema20 * 100 if deviation > 5: late_score += 30 stage_signals.append(f"价格偏离EMA20 {deviation:.1f}% - 过度延伸") elif deviation > 3: late_score += 15 stage_signals.append(f"价格偏离EMA20 {deviation:.1f}% - 警戒区域") elif deviation < 1: if early_score < middle_score: # 只在不是明显早期时加分 middle_score += 10 stage_signals.append("价格贴近EMA20 - 趋势稳固") # ========== 4. 量价关系 ========== if 'volume' in df_30m.columns and len(df_30m) >= 10: recent_vol = df_30m['volume'].iloc[-5:].mean() older_vol = df_30m['volume'].iloc[-10:-5].mean() vol_change = (recent_vol - older_vol) / older_vol * 100 price_change_5 = (df_30m['close'].iloc[-1] - df_30m['close'].iloc[-5]) / df_30m['close'].iloc[-5] * 100 # 价格上涨但成交量下降(量价背离)- 晚期信号 if price_change_5 > 1 and vol_change < -20: late_score += 20 stage_signals.append(f"量价背离(涨{price_change_5:.1f}%量减{vol_change:.0f}%)- 可能见顶") elif price_change_5 < -1 and vol_change < -20: late_score += 20 stage_signals.append(f"量价背离(跌{price_change_5:.1f}%量减{vol_change:.0f}%)- 可能见底") elif price_change_5 > 1 and vol_change > 30: early_score += 15 stage_signals.append(f"放量上涨(涨{price_change_5:.1f}%量增{vol_change:.0f}%)- 可能早期") elif price_change_5 < -1 and vol_change > 30: early_score += 15 stage_signals.append(f"放量下跌(跌{price_change_5:.1f}%量增{vol_change:.0f}%)- 可能早期") # ========== 5. 波动率状态 ========== if 'atr' in df_30m.columns and len(df_30m) >= 20: recent_atr = df_30m['atr'].iloc[-5:].mean() older_atr = df_30m['atr'].iloc[-15:-5].mean() atr_change = (recent_atr - older_atr) / older_atr * 100 if older_atr > 0 else 0 if atr_change > 30: early_score += 10 stage_signals.append(f"ATR扩张({atr_change:.0f}%) - 趋势启动") elif atr_change < -30: late_score += 10 stage_signals.append(f"ATR收缩({atr_change:.0f}%) - 动能衰竭") # ========== 6. 连续同向K线数量 ========== if len(df_30m) >= 5: recent_closes = df_30m['close'].iloc[-5:].values consecutive_up = sum(1 for i in range(1, len(recent_closes)) if recent_closes[i] > recent_closes[i-1]) consecutive_down = sum(1 for i in range(1, len(recent_closes)) if recent_closes[i] < recent_closes[i-1]) if consecutive_up >= 4: late_score += 15 stage_signals.append(f"连续{consecutive_up}根阳线 - 可能过度") elif consecutive_down >= 4: late_score += 15 stage_signals.append(f"连续{consecutive_down}根阴线 - 可能过度") # ========== 综合判断趋势阶段 ========== scores = { 'early': early_score, 'middle': middle_score, 'late': late_score } max_score = max(scores.values()) if max_score < 20: result['stage'] = 'unknown' result['analysis'] = "趋势阶段不明确" elif max_score == late_score and late_score >= 40: result['stage'] = 'late' result['confidence'] = min(95, late_score) result['signals'] = stage_signals result['analysis'] = f"⚠️ 趋势晚期({late_score}分)- " + "; ".join(stage_signals[:3]) elif max_score == early_score and early_score >= 30: result['stage'] = 'early' result['confidence'] = min(90, early_score) result['signals'] = stage_signals result['analysis'] = f"趋势早期({early_score}分)- " + "; ".join(stage_signals[:3]) else: result['stage'] = 'middle' result['confidence'] = min(85, middle_score) result['signals'] = stage_signals result['analysis'] = f"趋势中期({middle_score}分)- " + "; ".join(stage_signals[:3]) except Exception as e: logger.warning(f"趋势阶段检测失败: {e}") import traceback logger.debug(traceback.format_exc()) return result