""" 市场信号分析器 - 纯市场分析,不包含任何仓位信息 职责: 1. 分析K线、量价、技术指标 2. 分析新闻舆情 3. 输出纯市场信号(buy/sell/hold + confidence + reasoning) 不负责: - 仓位管理 - 风险控制 - 具体下单决策 """ import json import re from typing import Dict, Any, Optional, List from datetime import datetime from app.utils.logger import logger from app.services.llm_service import llm_service from app.services.news_service import get_news_service class MarketSignalAnalyzer: """市场信号分析器 - 只关注市场,输出客观信号""" # 纯市场分析系统提示词(与旧版 CRYPTO_SYSTEM_PROMPT 保持一致,只移除仓位管理) MARKET_ANALYSIS_PROMPT = """你是一位专业的加密货币交易员和技术分析师。你的任务是综合分析**K线数据、量价关系、技术指标和新闻舆情**,给出交易信号。 ## 核心理念 加密货币市场波动大,每天都有交易机会。你的目标是: - **主动寻找机会**,而不是被动等待完美信号 - 短线交易重点关注:超跌反弹、超涨回落、关键位突破 - 中线交易重点关注:趋势回调、形态突破、多周期共振 ## 一、量价分析(最重要) 量价关系是判断趋势真假的核心: ### 1. 健康上涨信号 - **放量上涨**:价格上涨 + 成交量放大(量比>1.5)= 上涨有效,可追多 - **缩量回调**:上涨后回调 + 成交量萎缩(量比<0.7)= 回调健康,可低吸 ### 2. 健康下跌信号 - **放量下跌**:价格下跌 + 成交量放大 = 下跌有效,可追空 - **缩量反弹**:下跌后反弹 + 成交量萎缩 = 反弹无力,可做空 ### 3. 量价背离(重要反转信号) - **顶背离**:价格创新高,但成交量未创新高 → 上涨动能衰竭,警惕回落 - **底背离**:价格创新低,但成交量未创新低 → 下跌动能衰竭,关注反弹 - **天量见顶**:极端放量(量比>3)后价格滞涨 → 主力出货信号 - **地量见底**:极端缩量(量比<0.3)后价格企稳 → 抛压枯竭信号 ### 4. 突破确认 - **有效突破**:突破关键位 + 放量确认(量比>1.5)= 真突破 - **假突破**:突破关键位 + 缩量 = 假突破,可能回落 ## 二、K线形态分析 ### 反转形态 - **锤子线/倒锤子**:下跌趋势中出现,下影线长 = 底部信号 - **吞没形态**:大阳吞没前一根阴线 = 看涨;大阴吞没前一根阳线 = 看跌 - **十字星**:在高位/低位出现 = 变盘信号 - **早晨之星/黄昏之星**:三根K线组合的反转信号 ### 持续形态 - **三连阳/三连阴**:趋势延续信号 - **旗形整理**:趋势中的健康回调 ## 三、技术指标分析 ### RSI(相对强弱指标) **RSI 是最重要的超买超卖指标,请注意细节:** - **RSI < 30**:超卖区,关注反弹机会 - RSI 从 30 以下回升,交叉上穿 30:买入信号 - RSI 底背离(价格新低但 RSI 未创新低):强买入信号 - **RSI > 70**:超买区,关注回落风险 - RSI 从 70 以上回落,交叉下穿 70:卖出信号 - RSI 顶背离(价格新高但 RSI 未创新高):强卖出信号 - **RSI 40-60**:震荡区,观望为主 - **RSI 趋势**:RSI 自身的趋势变化比单一数值更重要 ### MACD - 金叉(DIF 上穿 DEA):做多信号 - 死叉(DIF 下穿 DEA):做空信号 - 零轴上方金叉:强势做多 - 零轴下方死叉:强势做空 - MACD 柱状图背离:重要反转信号 ### 布林带 - 触及下轨 + 企稳:反弹做多 - 触及上轨 + 受阻:回落做空 - 布林带收口:即将变盘 - 布林带开口:趋势启动 ### 均线系统(重要) **均线系统是趋势判断的核心,请仔细分析:** - **多头排列**(MA5 > MA10 > MA20 > MA50):强势上涨趋势,回调做多 - **空头排列**(MA5 < MA10 < MA20 < MA50):强势下跌趋势,反弹做空 - **价格与 MA 的关系**: - 价格站稳 MA5/MA10 上方:短线上涨 - 价格突破 MA20:中线转多 - 价格跌破 MA20:中线转空 - MA50 是中期趋势的分水岭 - **均线金叉死叉**: - MA5 上穿 MA10:短线买入信号 - MA5 下穿 MA10:短线卖出信号 - MA10 上穿 MA20:中线买入信号 - MA10 下穿 MA20:中线卖出信号 ## 四、新闻舆情分析 结合最新市场新闻判断: - **重大利好**:监管利好、机构入场、ETF 通过等 → 提高做多置信度 - **重大利空**:监管打压、交易所暴雷、黑客攻击等 → 提高做空置信度 - **市场情绪**:恐慌指数、社交媒体热度 - **大户动向**:鲸鱼转账、交易所流入流出 ## 五、多周期共振(关键分析框架) **多周期共振是提高信号质量的核心方法:** ### 周期层级关系 - **4h(趋势层)**:决定中期大方向 - **1h(主周期)**:主要交易周期 - **15m(入场层)**:寻找入场时机 - **5m(精确入场)**:确认最佳入场点 ### 共振判断标准 **强共振(A级信号)**: - 所有周期趋势同向(如 4h多 + 1h多 + 15m多) - 多周期 RSI 同时超买/超卖后出现背离 - 多周期 MA 同时金叉/死叉 **中等共振(B级信号)**: - 大周期(4h+1h)同向 - 主周期(1h)技术指标明确 **弱共振(C级信号)**: - 只有单一周期信号 - 多周期方向不一致 ### 实战策略 - **顺势交易**:4h 和 1h 同向时,在 15m/5m 寻找入场点 - **逆势谨慎**:只有 1h 信号但 4h 反向时,降低置信度 - **突破交易**:多周期同时突破关键位,信号最强 ## 六、入场方式 根据市场分析综合判断入场方式: - **market**:现价立即入场 - 信号强烈且明确(A级或高置信度B级) - 放量突破关键位,趋势明确 - 多周期共振,等待可能错过机会 - 市场波动大,等待可能价格变化太快 - **limit**:挂单等待入场 - 信号强度中等(B级或C级) - 当前价格距离理想入场位有一定距离 - 判断市场可能回调到更好位置 - 希望获得更优成交价格,愿意承担可能无法成交的风险 **重要**: - 必须同时输出 `entry_zone`(建议入场价)和 `entry_type`(入场方式) - 入场方式由你的市场分析判断,不是简单的价格距离计算 - 如果选择 `limit`,`entry_zone` 应该是你建议的挂单价格 ## 输出格式 请严格按照以下 JSON 格式输出: ```json { "analysis_summary": "简要描述当前市场状态(50字以内)", "volume_analysis": "量价分析结论(30字以内)", "news_sentiment": "positive/negative/neutral", "news_impact": "新闻对市场的影响分析(30字以内)", "signals": [ { "type": "short_term/medium_term/long_term", "action": "buy/sell", "entry_type": "market/limit", "confidence": 0-100, "grade": "A/B/C/D", "entry_zone": 66000, "stop_loss": 65500, "take_profit": 67500, "reasoning": "详细的入场理由(必须包含量价分析)", "key_factors": ["关键因素1", "关键因素2"] } ], "key_levels": { "support": [65000, 64500], "resistance": [67000, 67500] } } ``` ## 重要说明 - **所有价格必须是纯数字**,不要加 $ 符号、逗号或其他格式 - `entry_zone`、`stop_loss`、`take_profit` 必须是数字类型,不要是字符串 - `key_levels` 中的支撑位和阻力位也必须是数字数组 ## 信号等级与置信度 - **A级**(80-100):量价配合 + 多指标共振 + 多周期确认 - **B级**(60-79):量价配合 + 主要指标确认 - **C级**(40-59):有机会但量价不够理想 - **D级**(<40):量价背离或信号矛盾 ## 注意事项 1. **只在有明确的做多或做空机会时才输出信号**(action 为 buy 或 sell) 2. 如果市场不明朗,没有明确交易机会,**不要输出任何信号**(signals 为空数组 []) 3. 信号强度(confidence)要合理,不要随意给高分 4. 60-70分:一般信号,可轻仓试探 5. 75-85分:较强信号,可正常仓位 6. 90+分:强信号,但也要控制风险 7. 不要输出 action 为 "wait" 的信号,如果没有交易机会就不输出 记住:你只负责分析市场,输出客观的交易信号,不需要考虑仓位管理和风险控制! """ def __init__(self): self.news_service = get_news_service() async def analyze(self, symbol: str, data: Dict[str, Any], symbols: List[str] = None) -> Dict[str, Any]: """ 分析市场并生成信号 Args: symbol: 交易对 data: 多周期K线数据 symbols: 所有监控的交易对(用于市场对比) Returns: 市场信号字典 """ try: # 1. 准备市场数据 market_context = self._prepare_market_context(symbol, data, symbols) # 2. 获取新闻舆情 news_context = await self._get_news_context(symbol) # 3. 构建 LLM 提示词 prompt = self._build_analysis_prompt(symbol, market_context, news_context) # 4. 调用 LLM 分析 messages = [ {"role": "system", "content": self.MARKET_ANALYSIS_PROMPT}, {"role": "user", "content": prompt} ] response = await llm_service.achat(messages) # 5. 解析结果 result = self._parse_llm_response(response, symbol) return result except Exception as e: logger.error(f"市场信号分析失败: {e}") import traceback logger.debug(traceback.format_exc()) return self._get_empty_signal(symbol) def _prepare_market_context(self, symbol: str, data: Dict, symbols: List[str] = None) -> str: """准备市场上下文信息""" context_parts = [] # 当前价格和24h变化 current_price = float(data['5m'].iloc[-1]['close']) price_change_24h = self._calculate_price_change_24h(data['1h']) context_parts.append(f"当前价格: ${current_price:,.2f} ({price_change_24h})") # 多周期数据 for tf_name, df in data.items(): if df is None or len(df) == 0: continue latest = df.iloc[-1] context_parts.append(f"\n## {tf_name} 数据") context_parts.append(f"开: {latest['open']}, 高: {latest['high']}, 低: {latest['low']}, 收: {latest['close']}") context_parts.append(f"成交量: {latest.get('volume', 'N/A')}") # 技术指标 if 'rsi' in df.columns: rsi = df['rsi'].iloc[-1] context_parts.append(f"RSI: {rsi:.2f}") if 'macd' in df.columns: macd = df['macd'].iloc[-1] signal = df['macd_signal'].iloc[-1] context_parts.append(f"MACD: {macd:.4f}, 信号线: {signal:.4f}") if 'bb_upper' in df.columns: bb_upper = df['bb_upper'].iloc[-1] bb_lower = df['bb_lower'].iloc[-1] context_parts.append(f"布林带: 上轨 {bb_upper:.2f}, 下轨 {bb_lower:.2f}") # 均线系统 context_parts.append(f"\n## 均线系统") df_1h = data.get('1h') if df_1h is not None and len(df_1h) > 0: latest = df_1h.iloc[-1] context_parts.append(f"MA5: {latest.get('ma5', 'N/A')}") context_parts.append(f"MA10: {latest.get('ma10', 'N/A')}") context_parts.append(f"MA20: {latest.get('ma20', 'N/A')}") context_parts.append(f"MA50: {latest.get('ma50', 'N/A')}") # 判断均线排列 ma5 = latest.get('ma5', 0) ma10 = latest.get('ma10', 0) ma20 = latest.get('ma20', 0) ma50 = latest.get('ma50', 0) if all([ma5, ma10, ma20, ma50]): if ma5 > ma10 > ma20 > ma50: context_parts.append("均线排列: 多头排列 📈") elif ma5 < ma10 < ma20 < ma50: context_parts.append("均线排列: 空头排列 📉") else: context_parts.append("均线排列: 交织,方向不明") # 量比分析 df_5m = data.get('5m') if df_5m is not None and len(df_5m) >= 20: vol_latest = df_5m['volume'].iloc[-1] vol_ma20 = df_5m['volume'].iloc[-20:-1].mean() volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1 context_parts.append(f"\n## 量价分析") context_parts.append(f"最新成交量: {vol_latest:.0f}") context_parts.append(f"20周期均量: {vol_ma20:.0f}") context_parts.append(f"量比: {volume_ratio:.2f}") if volume_ratio > 1.5: context_parts.append("量价状态: 放量 📊") elif volume_ratio < 0.7: context_parts.append("量价状态: 缩量 📉") else: context_parts.append("量价状态: 平量 ➖") return "\n".join(context_parts) async def _get_news_context(self, symbol: str) -> str: """获取新闻舆情上下文""" try: news_result = await self.news_service.get_crypto_news(symbol) if not news_result or not news_result.get('articles'): return "无最新新闻" articles = news_result['articles'][:5] # 只取前5条 context_parts = ["\n## 最新新闻"] for article in articles: title = article.get('title', '') source = article.get('source', '') published_at = article.get('publishedAt', '') time_str = published_at.split('T')[1][:5] if 'T' in published_at else '' context_parts.append(f"- [{time_str}] {title} ({source})") return "\n".join(context_parts) except Exception as e: logger.warning(f"获取新闻失败: {e}") return "新闻获取失败" def _build_analysis_prompt(self, symbol: str, market_context: str, news_context: str) -> str: """构建分析提示词""" return f"""请分析 {symbol} 的市场情况: {market_context} {news_context} 请根据以上数据,给出你的市场判断和交易信号。 """ def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]: """解析 LLM 响应""" try: # 尝试提取 JSON json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) if json_match: json_str = json_match.group(1) else: json_match = re.search(r'\{[\s\S]*\}', response) if json_match: json_str = json_match.group(0) else: raise ValueError("无法找到 JSON 响应") # 清理 JSON 字符串(移除可能导致解析错误的注释等) json_str = self._clean_json_string(json_str) logger.debug(f"解析的 JSON 字符串: {json_str[:500]}...") # 打印前500字符用于调试 result = json.loads(json_str) # 清理价格字段 - 转换为 float result = self._clean_price_fields(result) # 添加元数据 result['symbol'] = symbol result['timestamp'] = datetime.now().isoformat() result['raw_response'] = response # 兼容处理:确保 signals 中的字段与旧格式一致 if 'signals' in result: for sig in result['signals']: # LLM 输出的 "type" 是 timeframe (short_term/medium_term/long_term) # 需要映射为 "timeframe",而 "action" 才是 buy/sell/wait if 'type' in sig: # 如果 type 是 short_term/medium_term/long_term,映射为 timeframe if sig['type'] in ['short_term', 'medium_term', 'long_term']: sig['timeframe'] = sig.pop('type') # 如果 type 是 buy/sell/wait,映射为 action elif sig['type'] in ['buy', 'sell', 'wait']: sig['action'] = sig.pop('type') # 确保 action 字段存在 if 'action' not in sig and 'timeframe' in sig: # 从 reasoning 或其他字段推断 action sig['action'] = 'wait' # 确保 grade 字段存在 if 'grade' not in sig: # 根据 confidence 推断 grade confidence = sig.get('confidence', 0) if confidence >= 80: sig['grade'] = 'A' elif confidence >= 60: sig['grade'] = 'B' elif confidence >= 40: sig['grade'] = 'C' else: sig['grade'] = 'D' # 从信号中推断 market_state 和 trend if 'signals' in result and result['signals']: # 找出置信度最高的信号 best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0)) action = best_signal.get('action', 'wait') confidence = best_signal.get('confidence', 0) # 推断市场状态 if confidence >= 70: if action == 'buy': result['market_state'] = '强势上涨' elif action == 'sell': result['market_state'] = '强势下跌' else: result['market_state'] = '震荡整理' else: result['market_state'] = '震荡整理' # 推断趋势 if action == 'buy': result['trend'] = 'up' elif action == 'sell': result['trend'] = 'down' else: result['trend'] = 'sideways' else: result['market_state'] = '无明确信号' result['trend'] = 'sideways' logger.info(f"✅ 市场信号分析完成: {symbol}") logger.debug(f"市场信号: {json.dumps(result, ensure_ascii=False, indent=2)}") return result except Exception as e: logger.warning(f"解析 LLM 响应失败: {e}") logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符 return self._get_empty_signal(symbol) def _clean_json_string(self, json_str: str) -> str: """清理 JSON 字符串,移除可能导致解析错误的内容""" # 移除单行注释 // ... json_str = re.sub(r'//.*?(?=\n|$)', '', json_str) # 移除多行注释 /* ... */ json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str) # 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}) json_str = re.sub(r',\s*([}\]])', r'\1', json_str) return json_str def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: """清理价格字段,转换为 float""" def clean_price(price_value): if price_value is None: return None if isinstance(price_value, (int, float)): return float(price_value) if isinstance(price_value, str): # 移除 $ 符号和逗号 cleaned = price_value.replace('$', '').replace(',', '').strip() if cleaned: try: return float(cleaned) except ValueError: return None return None # 清理 key_levels 中的支撑位和阻力位 if 'key_levels' in data and data['key_levels']: key_levels = data['key_levels'] if 'support' in key_levels: data['key_levels']['support'] = [clean_price(s) for s in key_levels['support']] if 'resistance' in key_levels: data['key_levels']['resistance'] = [clean_price(r) for r in key_levels['resistance']] # 清理 signals 中的价格字段 if 'signals' in data: for sig in data['signals']: price_fields = ['entry_zone', 'stop_loss', 'take_profit'] for field in price_fields: if field in sig: sig[field] = clean_price(sig[field]) return data def _calculate_price_change_24h(self, df) -> str: """计算24小时涨跌幅""" try: if df is None or len(df) < 24: return "N/A" current_price = float(df['close'].iloc[-1]) price_24h_ago = float(df['close'].iloc[-24]) change = ((current_price - price_24h_ago) / price_24h_ago) * 100 sign = "+" if change >= 0 else "" return f"{sign}{change:.2f}%" except Exception as e: logger.debug(f"计算24h涨跌失败: {e}") return "N/A" def _get_empty_signal(self, symbol: str) -> Dict[str, Any]: """返回空信号""" return { 'symbol': symbol, 'analysis_summary': 'unknown', 'volume_analysis': '分析失败', 'news_sentiment': 'neutral', 'news_impact': '无', 'market_state': '分析失败', 'trend': 'sideways', 'signals': [], 'key_levels': {}, 'timestamp': datetime.now().isoformat(), 'error': '信号分析失败' }