""" LLM 驱动的信号分析器 - 让 LLM 自主分析市场数据并给出交易信号 """ import json import re import pandas as pd from typing import Dict, Any, Optional, List from datetime import datetime from app.utils.logger import logger from app.services.llm_service import llm_service from app.services.news_service import get_news_service class LLMSignalAnalyzer: """LLM 驱动的交易信号分析器""" # 系统提示词 - 让 LLM 自主分析 SYSTEM_PROMPT = """你是一位专业的加密货币交易员和技术分析师。你的任务是综合分析**K线数据、量价关系、技术指标和新闻舆情**,给出交易信号。 ## 核心理念 加密货币市场波动大,每天都有交易机会。你的目标是: - **主动寻找机会**,而不是被动等待完美信号 - 短线交易重点关注:超跌反弹、超涨回落、关键位突破 - 中线交易重点关注:趋势回调、形态突破、多周期共振 ## 一、量价分析(最重要) 量价关系是判断趋势真假的核心: ### 1. 健康上涨信号 - **放量上涨**:价格上涨 + 成交量放大(量比>1.5)= 上涨有效,可追多 - **缩量回调**:上涨后回调 + 成交量萎缩(量比<0.7)= 回调健康,可低吸 ### 2. 健康下跌信号 - **放量下跌**:价格下跌 + 成交量放大 = 下跌有效,可追空 - **缩量反弹**:下跌后反弹 + 成交量萎缩 = 反弹无力,可做空 ### 3. 量价背离(重要反转信号) - **顶背离**:价格创新高,但成交量未创新高 → 上涨动能衰竭,警惕回落 - **底背离**:价格创新低,但成交量未创新低 → 下跌动能衰竭,关注反弹 - **天量见顶**:极端放量(量比>3)后价格滞涨 → 主力出货信号 - **地量见底**:极端缩量(量比<0.3)后价格企稳 → 抛压枯竭信号 ### 4. 突破确认 - **有效突破**:突破关键位 + 放量确认(量比>1.5)= 真突破 - **假突破**:突破关键位 + 缩量 = 假突破,可能回落 ## 二、K线形态分析 ### 反转形态 - **锤子线/倒锤子**:下跌趋势中出现,下影线长 = 底部信号 - **吞没形态**:大阳吞没前一根阴线 = 看涨;大阴吞没前一根阳线 = 看跌 - **十字星**:在高位/低位出现 = 变盘信号 - **早晨之星/黄昏之星**:三根K线组合的反转信号 ### 持续形态 - **三连阳/三连阴**:趋势延续信号 - **旗形整理**:趋势中的健康回调 ## 三、技术指标分析 ### RSI(相对强弱指标) - RSI < 30:超卖区,关注反弹机会 - RSI > 70:超买区,关注回落风险 - RSI 背离:价格与 RSI 走势相反 = 重要反转信号 ### MACD - 金叉(DIF 上穿 DEA):做多信号 - 死叉(DIF 下穿 DEA):做空信号 - 零轴上方金叉:强势做多 - 零轴下方死叉:强势做空 - MACD 柱状图背离:重要反转信号 ### 布林带 - 触及下轨 + 企稳:反弹做多 - 触及上轨 + 受阻:回落做空 - 布林带收口:即将变盘 - 布林带开口:趋势启动 ### 均线系统 - 多头排列(MA5>MA10>MA20):上涨趋势 - 空头排列(MA54%ATR) 2. **做空止损**: - 优先放在最近阻力位(前高)上方 0.3-0.5% - 如果有 MA20 阻力,可放在 MA20 上方 0.5% - 如果最近高点距离过近(<1%),则使用 ATR 1.2-1.5倍 - 避免止损距离过大(>4%ATR) ### 止盈设置(移动止盈策略) **不设固定止盈位,让利润奔跑!** 1. **take_profit 设置为保险价位**: - 做多:入场价 + 15%(作为极端情况的保险止盈) - 做空:入场价 - 15% - 这个价位只是"保险",正常情况下不会触及 2. **实际止盈靠移动止损**: - 系统会通过移动止损自动锁定利润 - 盈利 2% 后开始移动止损,锁定部分利润 - 盈利越多,止损跟随移动,确保吃到趋势 ### 风险收益比 - 虽然不设固定止盈,但仍要确保止损合理 - 理想情况下,潜在风险(止损距离)应控制在 2-3% 以内 ## 重要原则 1. **量价优先** - 任何信号都必须有量能配合才可靠 2. **积极但不冒进** - 有合理依据就给出信号,不要过于保守 3. 每种类型最多输出一个信号 4. 止损必须基于关键支撑/阻力位(前低前高、MA20),不要用固定百分比 5. 止盈设置为保险价位(做多+15%,做空-15%),实际靠移动止损锁定利润 6. reason 字段必须包含量价分析(如"放量突破+RSI=45,量比1.8确认有效") 7. entry_type 必须明确:信号已触发用 market,等待更好价位用 limit 8. **position_size 必须明确**:根据信号质量和持仓情况给出 heavy/medium/light""" def __init__(self, agent_type: str = "crypto"): """初始化分析器 Args: agent_type: 智能体类型,支持 'crypto', 'stock', 'smart' """ from app.config import get_settings self.news_service = get_news_service() settings = get_settings() # 根据智能体类型选择模型配置 model_config_map = { 'crypto': 'crypto_agent_model', 'stock': 'stock_agent_model', 'smart': 'smart_agent_model' } config_key = model_config_map.get(agent_type, 'crypto_agent_model') self.model_override = getattr(settings, config_key, None) self.agent_type = agent_type agent_name_map = { 'crypto': '加密货币', 'stock': '股票', # 改为通用的"股票",具体市场类型会在分析时根据符号判断 'smart': '智能助手' } agent_name = agent_name_map.get(agent_type, '未知') logger.info(f"LLM 信号分析器初始化完成({agent_name},模型: {self.model_override or '默认'})") def _get_market_type(self, symbol: str) -> str: """根据股票代码判断市场类型""" if symbol.endswith('.HK'): return '港股' else: return '美股' async def analyze(self, symbol: str, data: Dict[str, pd.DataFrame], symbols: List[str] = None, position_info: Dict[str, Any] = None) -> Dict[str, Any]: """ 使用 LLM 分析市场数据 Args: symbol: 交易对,如 'BTCUSDT' data: 多周期K线数据 {'5m': df, '15m': df, '1h': df, '4h': df} symbols: 所有监控的交易对(用于过滤相关新闻) position_info: 当前持仓信息,用于仓位管理决策 - account_balance: 账户余额 - total_position_value: 总持仓价值 - current_leverage: 当前杠杆倍数 - positions: 各交易对持仓列表 Returns: 分析结果 """ try: # 获取市场类型 market_type = self._get_market_type(symbol) if self.agent_type == 'stock' else '' # 获取新闻数据 news_text = await self._get_news_context(symbol, symbols or [symbol]) # 构建数据提示 data_prompt = self._build_data_prompt(symbol, data, news_text, position_info) # 调用 LLM response = llm_service.chat([ {"role": "system", "content": self.SYSTEM_PROMPT}, {"role": "user", "content": data_prompt} ], model_override=self.model_override) if not response: logger.warning(f"{symbol} LLM 分析无响应") return self._empty_result(symbol, "LLM 无响应") # 解析响应 result = self._parse_response(response) result['symbol'] = symbol result['timestamp'] = datetime.now().isoformat() # 记录日志 signals = result.get('signals', []) if signals: for sig in signals: logger.info(f"{symbol} [{market_type}][{sig['type']}] {sig['action']} " f"置信度:{sig['confidence']}% 等级:{sig['grade']} " f"原因:{sig['reason'][:50]}...") else: logger.info(f"{symbol} [{market_type}] 无交易信号 - {result.get('analysis_summary', '观望')}") return result except Exception as e: logger.error(f"{symbol} LLM 分析出错: {e}") import traceback logger.error(traceback.format_exc()) return self._empty_result(symbol, str(e)) async def _get_news_context(self, symbol: str, symbols: List[str]) -> str: """获取新闻上下文(暂时禁用)""" # 暂时禁用新闻获取,只做技术面分析 return "" def _format_position_info(self, symbol: str, position_info: Dict[str, Any]) -> str: """格式化持仓信息供 LLM 参考""" lines = [] # 账户概况 balance = position_info.get('account_balance', 0) total_value = position_info.get('total_position_value', 0) current_leverage = position_info.get('current_leverage', 0) max_leverage = 20 # 最大杠杆限制 lines.append(f"- 账户余额: ${balance:,.2f}") lines.append(f"- 总持仓价值: ${total_value:,.2f}") lines.append(f"- 当前杠杆: {current_leverage:.1f}x / {max_leverage}x") # 可用杠杆空间 available_leverage = max_leverage - current_leverage if available_leverage > 0: available_value = balance * available_leverage lines.append(f"- 可开仓空间: ${available_value:,.2f} ({available_leverage:.1f}x)") else: lines.append("- ⚠️ 已达最大杠杆,不建议加仓") # 当前交易对持仓 positions = position_info.get('positions', []) symbol_positions = [p for p in positions if p.get('symbol') == symbol] if symbol_positions: lines.append(f"\n**{symbol} 当前持仓**:") for pos in symbol_positions: side = "做多" if pos.get('side') == 'long' else "做空" entry = pos.get('entry_price', 0) pnl = pos.get('pnl_percent', 0) lines.append(f" - {side} @ ${entry:,.2f} | 盈亏: {pnl:+.2f}%") else: lines.append(f"\n**{symbol}**: 无持仓") # 其他交易对持仓概况 other_positions = [p for p in positions if p.get('symbol') != symbol and p.get('status') == 'open'] if other_positions: lines.append(f"\n**其他持仓**: {len(other_positions)} 个") return "\n".join(lines) def _build_data_prompt(self, symbol: str, data: Dict[str, pd.DataFrame], news_text: str = "", position_info: Dict[str, Any] = None) -> str: """构建数据提示词""" parts = [f"# {symbol} 市场数据分析\n"] parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") # 当前价格 current_price = 0 if '5m' in data and not data['5m'].empty: current_price = float(data['5m'].iloc[-1]['close']) parts.append(f"**当前价格**: ${current_price:,.2f}\n") # === 新增:账户和持仓信息 === if position_info: parts.append("\n## 账户与持仓状态") parts.append(self._format_position_info(symbol, position_info)) # === 新增:关键价位分析 === key_levels = self._calculate_key_levels(data) if key_levels: parts.append("\n## 关键价位") parts.append(key_levels) # === 新增:多周期共振分析 === resonance = self._analyze_multi_timeframe_resonance(data) if resonance: parts.append("\n## 多周期共振") parts.append(resonance) # === 新增:市场结构分析 === structure = self._analyze_market_structure(data) if structure: parts.append("\n## 市场结构") parts.append(structure) # === 新增:波动率分析 === volatility = self._analyze_volatility(data) if volatility: parts.append("\n## 波动率分析") parts.append(volatility) # 各周期数据 for interval in ['4h', '1h', '15m', '5m']: df = data.get(interval) if df is None or df.empty: continue parts.append(f"\n## {interval.upper()} 周期数据") # 最新指标(传入 df 以分析趋势变化) latest = df.iloc[-1] parts.append(self._format_indicators(latest, df)) # 最近 K 线数据 parts.append(self._format_recent_klines(df, interval)) # 添加新闻数据 if news_text and news_text != "暂无相关新闻": parts.append(f"\n{news_text}") parts.append("\n---") parts.append("请综合分析以上技术数据和新闻舆情,判断是否存在短线、中线或长线的交易机会。") parts.append("如果没有明确的交易机会,signals 数组返回空即可。") return "\n".join(parts) def _calculate_key_levels(self, data: Dict[str, pd.DataFrame]) -> str: """计算关键支撑阻力位""" lines = [] # 使用 4h 数据计算关键价位 df = data.get('4h') if df is None or len(df) < 20: return "" current_price = float(df.iloc[-1]['close']) # 1. 前高前低(最近 20 根 K 线) recent = df.iloc[-20:] recent_high = float(recent['high'].max()) recent_low = float(recent['low'].min()) # 2. 整数关口 round_levels = [] base = int(current_price / 1000) * 1000 for offset in [-2000, -1000, 0, 1000, 2000]: level = base + offset if level > 0: round_levels.append(level) # 3. 斐波那契回撤位(基于最近的高低点) fib_levels = [] price_range = recent_high - recent_low if price_range > 0: fib_ratios = [0, 0.236, 0.382, 0.5, 0.618, 0.786, 1] for ratio in fib_ratios: fib_price = recent_low + price_range * ratio fib_levels.append((ratio, fib_price)) # 构建输出 lines.append(f"- 近期高点: ${recent_high:,.2f}") lines.append(f"- 近期低点: ${recent_low:,.2f}") # 找出最近的支撑和阻力 supports = [] resistances = [] # 从斐波那契位找支撑阻力 for ratio, price in fib_levels: if price < current_price * 0.995: # 低于当前价 0.5% 以上 supports.append(price) elif price > current_price * 1.005: # 高于当前价 0.5% 以上 resistances.append(price) # 添加整数关口 for level in round_levels: if level < current_price * 0.995: supports.append(level) elif level > current_price * 1.005: resistances.append(level) # 排序并取最近的 supports = sorted(set(supports), reverse=True)[:3] resistances = sorted(set(resistances))[:3] if supports: support_str = ", ".join([f"${s:,.0f}" for s in supports]) lines.append(f"- 支撑位: {support_str}") if resistances: resistance_str = ", ".join([f"${r:,.0f}" for r in resistances]) lines.append(f"- 阻力位: {resistance_str}") # 当前价格位置 if recent_high > recent_low: position = (current_price - recent_low) / (recent_high - recent_low) * 100 if position > 80: pos_text = "接近高点,注意回调风险" elif position < 20: pos_text = "接近低点,关注反弹机会" else: pos_text = f"处于区间 {position:.0f}% 位置" lines.append(f"- 价格位置: {pos_text}") return "\n".join(lines) def _analyze_multi_timeframe_resonance(self, data: Dict[str, pd.DataFrame]) -> str: """分析多周期共振""" trends = {} for interval in ['4h', '1h', '15m', '5m']: df = data.get(interval) if df is None or len(df) < 10: continue # 判断趋势方向 ma5 = df['ma5'].iloc[-1] if 'ma5' in df.columns else None ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else None close = df['close'].iloc[-1] if pd.notna(ma5) and pd.notna(ma20): if close > ma5 > ma20: trends[interval] = 'bullish' elif close < ma5 < ma20: trends[interval] = 'bearish' else: trends[interval] = 'neutral' if len(trends) < 2: return "" lines = [] # 统计各方向数量 bullish_count = sum(1 for t in trends.values() if t == 'bullish') bearish_count = sum(1 for t in trends.values() if t == 'bearish') total = len(trends) # 各周期趋势 trend_map = {'bullish': '📈多', 'bearish': '📉空', 'neutral': '➡️震荡'} trend_str = " | ".join([f"{k}: {trend_map.get(v, v)}" for k, v in trends.items()]) lines.append(f"- 各周期趋势: {trend_str}") # 共振判断 if bullish_count == total: lines.append(f"- **强共振做多**: 所有周期均为多头排列") elif bearish_count == total: lines.append(f"- **强共振做空**: 所有周期均为空头排列") elif bullish_count >= total * 0.75: lines.append(f"- **偏多共振**: {bullish_count}/{total} 周期看多") elif bearish_count >= total * 0.75: lines.append(f"- **偏空共振**: {bearish_count}/{total} 周期看空") else: lines.append(f"- **无明显共振**: 多空分歧,建议观望") return "\n".join(lines) def _analyze_market_structure(self, data: Dict[str, pd.DataFrame]) -> str: """分析市场结构(趋势、高低点)""" df = data.get('1h') if df is None or len(df) < 24: return "" lines = [] recent = df.iloc[-24:] # 最近 24 根 1h K 线 # 找出局部高低点 highs = [] lows = [] for i in range(2, len(recent) - 2): # 局部高点:比前后两根都高 if (recent.iloc[i]['high'] > recent.iloc[i-1]['high'] and recent.iloc[i]['high'] > recent.iloc[i-2]['high'] and recent.iloc[i]['high'] > recent.iloc[i+1]['high'] and recent.iloc[i]['high'] > recent.iloc[i+2]['high']): highs.append((i, float(recent.iloc[i]['high']))) # 局部低点:比前后两根都低 if (recent.iloc[i]['low'] < recent.iloc[i-1]['low'] and recent.iloc[i]['low'] < recent.iloc[i-2]['low'] and recent.iloc[i]['low'] < recent.iloc[i+1]['low'] and recent.iloc[i]['low'] < recent.iloc[i+2]['low']): lows.append((i, float(recent.iloc[i]['low']))) # 判断趋势结构 if len(highs) >= 2 and len(lows) >= 2: # 检查高点是否越来越高 higher_highs = all(highs[i][1] < highs[i+1][1] for i in range(len(highs)-1)) lower_highs = all(highs[i][1] > highs[i+1][1] for i in range(len(highs)-1)) # 检查低点是否越来越高 higher_lows = all(lows[i][1] < lows[i+1][1] for i in range(len(lows)-1)) lower_lows = all(lows[i][1] > lows[i+1][1] for i in range(len(lows)-1)) if higher_highs and higher_lows: lines.append("- **上升趋势**: 更高的高点(HH) + 更高的低点(HL)") elif lower_highs and lower_lows: lines.append("- **下降趋势**: 更低的高点(LH) + 更低的低点(LL)") elif higher_lows and lower_highs: lines.append("- **收敛三角形**: 高点下移 + 低点上移,即将突破") elif lower_lows and higher_highs: lines.append("- **扩散形态**: 波动加大,方向不明") else: lines.append("- **震荡结构**: 无明显趋势") else: lines.append("- **结构不明**: 高低点不足,难以判断") # 计算趋势强度 if len(recent) >= 10: price_change = (float(recent.iloc[-1]['close']) - float(recent.iloc[0]['close'])) / float(recent.iloc[0]['close']) * 100 if abs(price_change) > 3: direction = "上涨" if price_change > 0 else "下跌" lines.append(f"- 24h 趋势: {direction} {abs(price_change):.1f}%") return "\n".join(lines) def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str: """分析波动率变化""" df = data.get('1h') if df is None or len(df) < 24 or 'atr' not in df.columns: return "" lines = [] # ATR 变化趋势 recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根 older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根 if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0: return "" atr_change = (recent_atr - older_atr) / older_atr * 100 current_atr = float(df['atr'].iloc[-1]) current_price = float(df['close'].iloc[-1]) atr_percent = current_atr / current_price * 100 lines.append(f"- 当前 ATR: ${current_atr:.2f} ({atr_percent:.2f}%)") if atr_change > 20: lines.append(f"- **波动率扩张**: ATR 上升 {atr_change:.0f}%,趋势可能启动") elif atr_change < -20: lines.append(f"- **波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将突破") else: lines.append(f"- 波动率稳定: ATR 变化 {atr_change:+.0f}%") # 布林带宽度 if 'bb_upper' in df.columns and 'bb_lower' in df.columns: bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100 bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100 if bb_width < bb_width_prev * 0.8: lines.append(f"- **布林带收口**: 宽度 {bb_width:.1f}%,变盘信号") elif bb_width > bb_width_prev * 1.2: lines.append(f"- **布林带开口**: 宽度 {bb_width:.1f}%,趋势延续") return "\n".join(lines) return "\n".join(parts) def _format_indicators(self, row: pd.Series, df: pd.DataFrame = None) -> str: """格式化指标数据(含趋势变化分析)""" lines = [] # 价格 close = row.get('close', 0) open_price = row.get('open', 0) high = row.get('high', 0) low = row.get('low', 0) change = ((close - open_price) / open_price * 100) if open_price else 0 lines.append(f"- K线: O={open_price:.2f} H={high:.2f} L={low:.2f} C={close:.2f} ({change:+.2f}%)") # 均线 ma5 = row.get('ma5', 0) ma10 = row.get('ma10', 0) ma20 = row.get('ma20', 0) ma50 = row.get('ma50', 0) if pd.notna(ma20): # 判断均线排列 if pd.notna(ma5) and pd.notna(ma10): if ma5 > ma10 > ma20: ma_trend = "多头排列" elif ma5 < ma10 < ma20: ma_trend = "空头排列" else: ma_trend = "交织" else: ma_trend = "" ma_str = f"- 均线: MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}" if pd.notna(ma50): ma_str += f", MA50={ma50:.2f}" if ma_trend: ma_str += f" ({ma_trend})" lines.append(ma_str) # RSI(含趋势分析) rsi = row.get('rsi', 0) if pd.notna(rsi): rsi_status = "超卖" if rsi < 30 else ("超买" if rsi > 70 else "中性") rsi_trend = self._analyze_indicator_trend(df, 'rsi', 6) if df is not None else "" rsi_line = f"- RSI: {rsi:.1f} ({rsi_status})" if rsi_trend: rsi_line += f" {rsi_trend}" lines.append(rsi_line) # MACD(含趋势分析) macd = row.get('macd', 0) macd_signal = row.get('macd_signal', 0) macd_hist = row.get('macd_hist', 0) if pd.notna(macd): macd_status = "多头" if macd > macd_signal else "空头" macd_trend = self._analyze_macd_trend(df) if df is not None else "" macd_line = f"- MACD: DIF={macd:.4f}, DEA={macd_signal:.4f}, 柱={macd_hist:.4f} ({macd_status})" if macd_trend: macd_line += f" {macd_trend}" lines.append(macd_line) # KDJ(含金叉死叉检测) k = row.get('k', 0) d = row.get('d', 0) j = row.get('j', 0) if pd.notna(k): kdj_signal = self._detect_kdj_cross(df) if df is not None else "" kdj_line = f"- KDJ: K={k:.1f}, D={d:.1f}, J={j:.1f}" if kdj_signal: kdj_line += f" {kdj_signal}" lines.append(kdj_line) # 布林带(含位置分析) bb_upper = row.get('bb_upper', 0) bb_middle = row.get('bb_middle', 0) bb_lower = row.get('bb_lower', 0) if pd.notna(bb_upper): # 判断价格在布林带中的位置 if close >= bb_upper: bb_pos = "触及上轨" elif close <= bb_lower: bb_pos = "触及下轨" elif close > bb_middle: bb_pos = "中轨上方" else: bb_pos = "中轨下方" lines.append(f"- 布林带: 上={bb_upper:.2f}, 中={bb_middle:.2f}, 下={bb_lower:.2f} ({bb_pos})") # ATR atr = row.get('atr', 0) if pd.notna(atr): lines.append(f"- ATR: {atr:.2f}") # 成交量 volume = row.get('volume', 0) volume_ratio = row.get('volume_ratio', 0) if pd.notna(volume_ratio): vol_status = "放量" if volume_ratio > 1.5 else ("缩量" if volume_ratio < 0.5 else "正常") lines.append(f"- 成交量: {volume:.2f}, 量比={volume_ratio:.2f} ({vol_status})") return "\n".join(lines) def _analyze_indicator_trend(self, df: pd.DataFrame, indicator: str, lookback: int = 6) -> str: """分析指标趋势变化""" if df is None or len(df) < lookback: return "" recent = df[indicator].iloc[-lookback:] if recent.isna().any(): return "" first_val = recent.iloc[0] last_val = recent.iloc[-1] change = last_val - first_val # RSI 特殊处理 if indicator == 'rsi': if first_val > 70 and last_val < 70: return "[从超买回落]" elif first_val < 30 and last_val > 30: return "[从超卖反弹]" elif change > 10: return "[快速上升]" elif change < -10: return "[快速下降]" return "" def _analyze_macd_trend(self, df: pd.DataFrame, lookback: int = 6) -> str: """分析 MACD 趋势""" if df is None or len(df) < lookback: return "" recent_hist = df['macd_hist'].iloc[-lookback:] recent_macd = df['macd'].iloc[-lookback:] recent_signal = df['macd_signal'].iloc[-lookback:] if recent_hist.isna().any(): return "" # 检测金叉死叉 for i in range(-3, 0): if i - 1 >= -len(recent_macd): prev_diff = recent_macd.iloc[i-1] - recent_signal.iloc[i-1] curr_diff = recent_macd.iloc[i] - recent_signal.iloc[i] if prev_diff < 0 and curr_diff > 0: return "[刚刚金叉]" elif prev_diff > 0 and curr_diff < 0: return "[刚刚死叉]" # 检测柱状图趋势 positive_count = sum(1 for x in recent_hist if x > 0) hist_trend = recent_hist.iloc[-1] - recent_hist.iloc[-3] if len(recent_hist) >= 3 else 0 if positive_count == lookback and hist_trend > 0: return "[红柱持续放大]" elif positive_count == lookback and hist_trend < 0: return "[红柱开始缩小]" elif positive_count == 0 and hist_trend < 0: return "[绿柱持续放大]" elif positive_count == 0 and hist_trend > 0: return "[绿柱开始缩小]" return "" def _detect_kdj_cross(self, df: pd.DataFrame, lookback: int = 3) -> str: """检测 KDJ 金叉死叉""" if df is None or len(df) < lookback: return "" recent_k = df['k'].iloc[-lookback:] recent_d = df['d'].iloc[-lookback:] if recent_k.isna().any() or recent_d.isna().any(): return "" # 检测最近的交叉 for i in range(-lookback + 1, 0): prev_diff = recent_k.iloc[i-1] - recent_d.iloc[i-1] curr_diff = recent_k.iloc[i] - recent_d.iloc[i] if prev_diff < 0 and curr_diff > 0: # 金叉位置判断 k_val = recent_k.iloc[i] if k_val < 20: return "[低位金叉,强买入信号]" elif k_val < 50: return "[中位金叉]" else: return "[高位金叉,谨慎]" elif prev_diff > 0 and curr_diff < 0: k_val = recent_k.iloc[i] if k_val > 80: return "[高位死叉,强卖出信号]" elif k_val > 50: return "[中位死叉]" else: return "[低位死叉,谨慎]" return "" def _format_recent_klines(self, df: pd.DataFrame, interval: str) -> str: """格式化最近 K 线(含量价分析)""" # 根据周期决定显示数量 # 4h: 12根=2天, 1h: 24根=1天, 15m: 16根=4小时, 5m: 12根=1小时 count = {'4h': 12, '1h': 24, '15m': 16, '5m': 12}.get(interval, 12) if len(df) < count: count = len(df) lines = [f"\n最近 {count} 根 K 线(含量价数据):"] lines.append("| 时间 | 开盘 | 最高 | 最低 | 收盘 | 涨跌 | 成交量 | 量比 | RSI |") lines.append("|------|------|------|------|------|------|--------|------|-----|") for i in range(-count, 0): row = df.iloc[i] change = ((row['close'] - row['open']) / row['open'] * 100) if row['open'] else 0 change_str = f"{change:+.2f}%" time_str = row['open_time'].strftime('%m-%d %H:%M') if pd.notna(row.get('open_time')) else 'N/A' rsi = row.get('rsi', 0) rsi_str = f"{rsi:.0f}" if pd.notna(rsi) else "-" # 成交量和量比 volume = row.get('volume', 0) volume_ratio = row.get('volume_ratio', 1.0) if pd.notna(volume) and volume > 0: # 格式化成交量(大数字用K/M表示) if volume >= 1000000: vol_str = f"{volume/1000000:.1f}M" elif volume >= 1000: vol_str = f"{volume/1000:.1f}K" else: vol_str = f"{volume:.0f}" else: vol_str = "-" vol_ratio_str = f"{volume_ratio:.2f}" if pd.notna(volume_ratio) else "-" lines.append(f"| {time_str} | {row['open']:.2f} | {row['high']:.2f} | " f"{row['low']:.2f} | {row['close']:.2f} | {change_str} | {vol_str} | {vol_ratio_str} | {rsi_str} |") # 添加量价分析提示 lines.append(self._analyze_volume_price(df, count)) return "\n".join(lines) def _analyze_volume_price(self, df: pd.DataFrame, count: int) -> str: """分析量价关系""" if len(df) < count: return "" recent = df.iloc[-count:] analysis = [] # 计算价格趋势 price_change = (recent.iloc[-1]['close'] - recent.iloc[0]['close']) / recent.iloc[0]['close'] * 100 # 计算成交量趋势 vol_first_half = recent.iloc[:count//2]['volume'].mean() if 'volume' in recent.columns else 0 vol_second_half = recent.iloc[count//2:]['volume'].mean() if 'volume' in recent.columns else 0 if vol_first_half > 0 and vol_second_half > 0: vol_change = (vol_second_half - vol_first_half) / vol_first_half * 100 # 量价分析 if price_change > 1: # 上涨 if vol_change > 20: analysis.append("📈 **量价分析**: 放量上涨,上涨有效") elif vol_change < -20: analysis.append("⚠️ **量价分析**: 缩量上涨,警惕回调") else: analysis.append("➡️ **量价分析**: 量能平稳上涨") elif price_change < -1: # 下跌 if vol_change > 20: analysis.append("📉 **量价分析**: 放量下跌,下跌有效") elif vol_change < -20: analysis.append("💡 **量价分析**: 缩量下跌,关注企稳") else: analysis.append("➡️ **量价分析**: 量能平稳下跌") else: # 横盘 if vol_change < -30: analysis.append("🔄 **量价分析**: 缩量整理,等待方向") else: analysis.append("🔄 **量价分析**: 横盘震荡") # 检测量价背离 if len(df) >= 10: recent_10 = df.iloc[-10:] # 检查是否有新高/新低 price_high_idx = recent_10['high'].idxmax() price_low_idx = recent_10['low'].idxmin() if 'volume' in recent_10.columns: # 顶背离检测 if price_high_idx == recent_10.index[-1]: # 最新K线创新高 prev_high_idx = recent_10['high'].iloc[:-1].idxmax() if recent_10.loc[price_high_idx, 'volume'] < recent_10.loc[prev_high_idx, 'volume'] * 0.8: analysis.append("🔴 **顶背离**: 价格新高但量能不足,警惕回落") # 底背离检测 if price_low_idx == recent_10.index[-1]: # 最新K线创新低 prev_low_idx = recent_10['low'].iloc[:-1].idxmin() if recent_10.loc[price_low_idx, 'volume'] < recent_10.loc[prev_low_idx, 'volume'] * 0.8: analysis.append("🟢 **底背离**: 价格新低但量能萎缩,关注反弹") return "\n" + "\n".join(analysis) if analysis else "" def _parse_response(self, response: str) -> Dict[str, Any]: """解析 LLM 响应""" result = { 'raw_response': response, 'analysis_summary': '', 'signals': [], 'key_levels': {'support': [], 'resistance': []} } try: # 尝试提取 JSON json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) if json_match: json_str = json_match.group(1) else: # 尝试直接解析 json_str = response parsed = json.loads(json_str) result['analysis_summary'] = parsed.get('analysis_summary', '') result['signals'] = parsed.get('signals', []) result['key_levels'] = parsed.get('key_levels', {'support': [], 'resistance': []}) # 验证和清理信号 valid_signals = [] for sig in result['signals']: if self._validate_signal(sig): valid_signals.append(sig) result['signals'] = valid_signals except json.JSONDecodeError: logger.warning("LLM 响应不是有效 JSON,尝试提取关键信息") result['analysis_summary'] = self._extract_summary(response) return result def _validate_signal(self, signal: Dict[str, Any]) -> bool: """验证信号是否有效""" required_fields = ['type', 'action', 'confidence', 'grade', 'reason'] for field in required_fields: if field not in signal: return False # 验证类型 if signal['type'] not in ['short_term', 'medium_term', 'long_term']: return False # 验证动作 if signal['action'] not in ['buy', 'sell', 'wait']: return False # wait 动作不算有效信号 if signal['action'] == 'wait': return False # 验证置信度(必须 >= 60 才算有效信号,即 B 级及以上) confidence = signal.get('confidence', 0) if not isinstance(confidence, (int, float)) or confidence < 60: return False # 验证入场类型(默认为 market) entry_type = signal.get('entry_type', 'market') if entry_type not in ['market', 'limit']: signal['entry_type'] = 'market' # 默认现价入场 # 验证仓位大小(默认根据等级设置) position_size = signal.get('position_size', '') if position_size not in ['heavy', 'medium', 'light']: # 根据信号等级设置默认仓位 grade = signal.get('grade', 'C') if grade == 'A': signal['position_size'] = 'medium' # A级默认中仓 elif grade == 'B': signal['position_size'] = 'light' # B级默认轻仓 else: signal['position_size'] = 'light' # C级默认轻仓 return True def _extract_summary(self, text: str) -> str: """从文本中提取摘要""" text = text.strip() if len(text) > 100: return text[:100] + "..." return text def _empty_result(self, symbol: str, reason: str = "") -> Dict[str, Any]: """返回空结果""" return { 'symbol': symbol, 'timestamp': datetime.now().isoformat(), 'analysis_summary': reason or '无法分析', 'signals': [], 'key_levels': {'support': [], 'resistance': []}, 'error': reason } def get_best_signal(self, result: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ 从分析结果中获取最佳信号 Args: result: analyze() 的返回结果 Returns: 最佳信号,如果没有则返回 None """ signals = result.get('signals', []) if not signals: return None # 按置信度排序 sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True) return sorted_signals[0] def format_signal_message(self, signal: Dict[str, Any], symbol: str) -> str: """ 格式化信号消息(用于 Telegram 通知) Args: signal: 信号数据 symbol: 交易对 Returns: 格式化的消息文本 """ # 获取股票名称 from app.stock_agent.stock_agent import STOCK_NAMES stock_name = STOCK_NAMES.get(symbol, '') type_map = { 'short_term': '短线', 'medium_term': '中线', 'long_term': '长线' } action_map = { 'buy': '做多', 'sell': '做空' } signal_type = type_map.get(signal['type'], signal['type']) action = action_map.get(signal['action'], signal['action']) grade = signal.get('grade', 'C') confidence = signal.get('confidence', 0) entry_type = signal.get('entry_type', 'market') # 等级图标 grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '') # 方向图标 action_icon = '🟢' if signal['action'] == 'buy' else '🔴' # 入场类型 entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待' entry_type_icon = '⚡' if entry_type == 'market' else '⏳' # 仓位大小 position_size = signal.get('position_size', 'light') position_map = {'heavy': '重仓', 'medium': '中仓', 'light': '轻仓'} position_icon = {'heavy': '🔥', 'medium': '📊', 'light': '🌱'}.get(position_size, '🌱') position_text = position_map.get(position_size, '轻仓') # 计算风险收益比 entry = signal.get('entry_price', 0) sl = signal.get('stop_loss', 0) tp = signal.get('take_profit', 0) sl_percent = ((sl - entry) / entry * 100) if entry else 0 tp_percent = ((tp - entry) / entry * 100) if entry else 0 # 构建标题(带股票名称) symbol_display = f"{stock_name}({symbol})" if stock_name else symbol message = f"""📊 {symbol_display} {signal_type}信号 {action_icon} **方向**: {action} {entry_type_icon} **入场**: {entry_type_text} {position_icon} **仓位**: {position_text} ⭐ **等级**: {grade} {grade_icon} 📈 **置信度**: {confidence}% 💰 **入场价**: ${entry:,.2f} 🛑 **止损价**: ${sl:,.2f} ({sl_percent:+.1f}%) 🎯 **止盈价**: ${tp:,.2f} ({tp_percent:+.1f}%) 📝 **分析理由**: {signal.get('reason', '无')} ⚠️ **风险提示**: {signal.get('risk_warning', '请注意风险控制')}""" return message def format_feishu_card(self, signal: Dict[str, Any], symbol: str) -> Dict[str, Any]: """ 格式化飞书卡片消息 Args: signal: 信号数据 symbol: 交易对 Returns: 包含 title, content, color 的字典 """ # 获取股票名称 from app.stock_agent.stock_agent import STOCK_NAMES stock_name = STOCK_NAMES.get(symbol, '') type_map = { 'short_term': '短线', 'medium_term': '中线', 'long_term': '长线' } action_map = { 'buy': '做多', 'sell': '做空' } signal_type = type_map.get(signal['type'], signal['type']) action = action_map.get(signal['action'], signal['action']) grade = signal.get('grade', 'C') confidence = signal.get('confidence', 0) entry_type = signal.get('entry_type', 'market') # 等级图标 grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '') # 入场类型 entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待' entry_type_icon = '⚡' if entry_type == 'market' else '⏳' # 仓位大小 position_size = signal.get('position_size', 'light') position_map = {'heavy': '重仓', 'medium': '中仓', 'light': '轻仓'} position_icon = {'heavy': '🔥', 'medium': '📊', 'light': '🌱'}.get(position_size, '🌱') position_text = position_map.get(position_size, '轻仓') # 标题和颜色 - 区分美股/港股 is_market_order = entry_type == 'market' market_badge = '【现价】' if is_market_order else '' # 识别市场类型(港股以 .HK 结尾) if symbol.endswith('.HK'): market_tag = '[港股] ' else: market_tag = '[美股] ' # 构建带名称的股票显示 symbol_display = f"{stock_name}({symbol})" if stock_name else symbol if signal['action'] == 'buy': title = f"🟢 {market_tag}{symbol_display} {signal_type}做多信号 {market_badge}" color = "green" else: title = f"🔴 {market_tag}{symbol_display} {signal_type}做空信号 {market_badge}" color = "red" # 计算风险收益比 entry = signal.get('entry_price', 0) sl = signal.get('stop_loss', 0) tp = signal.get('take_profit', 0) sl_percent = ((sl - entry) / entry * 100) if entry else 0 tp_percent = ((tp - entry) / entry * 100) if entry else 0 # 构建 Markdown 内容 - 现价时突出显示 if is_market_order: # 现价入场,重点突出价格 content_parts = [ f"**{signal_type}** | **{grade}**{grade_icon} | **{confidence}%** 置信度", f"{entry_type_icon} **入场方式**: {entry_type_text}", f"{position_icon} **建议仓位**: {position_text}", "", f"💰 **⭐ 现价入场 ⭐**", f"**>>> ${entry:,.2f} <<<**", "", f"🛑 **止损**: ${sl:,.2f} ({sl_percent:+.1f}%)", f"🎯 **止盈**: ${tp:,.2f} ({tp_percent:+.1f}%)", "", f"📝 **分析理由**:", f"{signal.get('reason', '无')}", "", f"⚠️ **风险提示**:", f"{signal.get('risk_warning', '请注意风险控制')}", ] else: # 挂单,正常显示 content_parts = [ f"**{signal_type}** | **{grade}**{grade_icon} | **{confidence}%** 置信度", f"{entry_type_icon} **入场方式**: {entry_type_text}", f"{position_icon} **建议仓位**: {position_text}", "", f"💰 **入场**: ${entry:,.2f}", f"🛑 **止损**: ${sl:,.2f} ({sl_percent:+.1f}%)", f"🎯 **止盈**: ${tp:,.2f} ({tp_percent:+.1f}%)", "", f"📝 **分析理由**:", f"{signal.get('reason', '无')}", "", f"⚠️ **风险提示**:", f"{signal.get('risk_warning', '请注意风险控制')}", ] return { 'title': title, 'content': '\n'.join(content_parts), 'color': color } # 持仓回顾分析的 System Prompt POSITION_REVIEW_PROMPT = """你是一个专业的加密货币交易风险管理专家。你的任务是回顾现有持仓,根据最新市场行情决定是否需要调整。 ## 你的职责 对于每个持仓,你需要分析: 1. 当前持仓状态(盈亏、持仓时间、风险敞口) 2. 最新市场行情(趋势、支撑阻力、技术指标) 3. 原有交易逻辑是否依然有效 4. 是否需要调整止损止盈 5. 是否需要平仓(部分或全部) ## 决策类型 ### 1. HOLD(保持) - 适用场景:行情符合预期,趋势延续 - 操作:不改变任何设置 ### 2. ADJUST_SL_TP(调整止损止盈) - 适用场景: - **盈利状态**:趋势强劲,可以收紧止损锁定更多利润 - **亏损状态**:支撑/阻力位变化,需要调整止损到更合理位置 - **目标接近**:原止盈目标接近,但趋势仍强,可上移止盈 - 操作:更新 stop_loss 和/或 take_profit ### 3. PARTIAL_CLOSE(部分平仓) - 适用场景: - 盈利较大,但不确定性增加 - 重要阻力位附近,锁定部分利润 - 趋势有转弱迹象 - 操作:平掉 close_percent 比例的仓位 ### 4. FULL_CLOSE(全部平仓) - 适用场景: - **止损型**:趋势明确反转,止损信号出现 - **止盈型**:目标达成,或出现更好的机会 - **风险型**:重大利空/利好的不确定性 - 操作:平掉全部仓位 ## 调整原则 ### 盈利状态(盈亏 > 0) 1. **收紧止损**:如果盈利 > 2%,可以将止损移至保本或盈利 1% 位置 2. **部分止盈**:如果盈利 > 5% 且接近重要阻力位,可平掉 30-50% 仓位 3. **继续持有**:如果趋势强劲,可以放宽止损让利润奔跑 ### 亏损状态(盈亏 < 0) 1. **提前止损**:如果出现明确的反转信号,不要等止损触发 2. **调整止损**:如果关键支撑/阻力位变化,更新止损位置 3. **继续持有**:如果只是正常波动,原交易逻辑未变,继续持有 ### 重要技术信号 1. **趋势反转**:多周期共振转反、跌破/突破关键 MA 2. **量价背离**:价格新高但成交量萎缩 3. **MACD 背离**:价格新高/新低但 MACD 未确认 4. **RSI 极端**:RSI > 75 或 < 25 后掉头 ## 输出格式 对于每个持仓,输出 JSON: ```json { "order_id": "订单ID", "action": "HOLD | ADJUST_SL_TP | PARTIAL_CLOSE | FULL_CLOSE", "new_sl": 新止损价格(仅 ADJUST_SL_TP 时), "new_tp": 新止盈价格(仅 ADJUST_SL_TP 时), "close_percent": 平仓比例 0-100(仅 PARTIAL_CLOSE 时), "reason": "调整原因(简明扼要,20字以内)" } ``` ## 重要原则 1. **主动管理**:不要被动等待止损触发,主动识别风险 2. **保护利润**:盈利状态下,优先考虑锁定利润 3. **果断止损**:亏损状态下,如果趋势反转,果断离场 4. **灵活调整**:根据最新行情,不局限于开仓时的判断 5. **考虑成本**:频繁调整会增加交易成本,只在有明确信号时调整 """ async def review_positions( self, symbol: str, positions: List[Dict[str, Any]], data: Dict[str, pd.DataFrame] ) -> List[Dict[str, Any]]: """ 回顾并分析现有持仓,给出调整建议 Args: symbol: 交易对 positions: 持仓列表,每个持仓包含: - order_id: 订单ID - side: 'long' or 'short' - entry_price: 开仓价格 - current_price: 当前价格 - stop_loss: 当前止损 - take_profit: 当前止盈 - quantity: 仓位数量 - pnl_percent: 盈亏百分比 - open_time: 开仓时间 data: 多周期K线数据 Returns: 调整建议列表 """ if not positions: return [] try: # 构建持仓分析提示 prompt = self._build_position_review_prompt(symbol, positions, data) # 调用 LLM response = llm_service.chat([ {"role": "system", "content": self.POSITION_REVIEW_PROMPT}, {"role": "user", "content": prompt} ], model_override=self.model_override) if not response: logger.warning(f"{symbol} 持仓回顾 LLM 分析无响应") return [] # 解析响应 return self._parse_position_review_response(response) except Exception as e: logger.error(f"持仓回顾分析失败: {e}", exc_info=True) return [] def _build_position_review_prompt( self, symbol: str, positions: List[Dict[str, Any]], data: Dict[str, pd.DataFrame] ) -> str: """构建持仓分析提示""" lines = [f"# {symbol} 持仓回顾分析", f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"] lines.append("\n## 当前持仓") for idx, pos in enumerate(positions, 1): side_text = "做多 📈" if pos['side'] == 'long' else "做空 📉" pnl_text = f"+{pos['pnl_percent']:.1f}%" if pos['pnl_percent'] >= 0 else f"{pos['pnl_percent']:.1f}%" pnl_emoji = "✅" if pos['pnl_percent'] >= 0 else "❌" lines.append(f"\n### 持仓 {idx}: {pos['order_id']}") lines.append(f"- 方向: {side_text}") lines.append(f"- 开仓价: ${pos['entry_price']:,.2f}") lines.append(f"- 当前价: ${pos['current_price']:,.2f}") lines.append(f"- 盈亏: {pnl_emoji} {pnl_text}") lines.append(f"- 止损: ${pos['stop_loss']:,.2f}") lines.append(f"- 止盈: ${pos['take_profit']:,.2f}") lines.append(f"- 仓位: ${pos['quantity']:,.0f}") # 计算持仓时间 if 'open_time' in pos: open_time = pos['open_time'] if isinstance(open_time, str): open_time = datetime.fromisoformat(open_time) duration = datetime.now() - open_time hours = duration.total_seconds() / 3600 lines.append(f"- 持仓时间: {hours:.1f} 小时") # 添加市场分析 lines.append("\n## 最新市场分析") # 使用 1h 和 4h 数据分析 for interval in ['4h', '1h']: df = data.get(interval) if df is None or len(df) < 20: continue latest = df.iloc[-1] prev = df.iloc[-2] lines.append(f"\n### {interval} 周期") lines.append(f"- 当前价格: ${latest['close']:,.2f}") lines.append(f"- 涨跌幅: {((latest['close'] - prev['close']) / prev['close'] * 100):+.2f}%") if 'ma5' in df.columns and pd.notna(latest['ma5']): lines.append(f"- MA5: ${latest['ma5']:,.2f}") if 'ma20' in df.columns and pd.notna(latest['ma20']): lines.append(f"- MA20: ${latest['ma20']:,.2f}") if 'rsi' in df.columns and pd.notna(latest['rsi']): rsi_val = latest['rsi'] rsi_status = "超买" if rsi_val > 70 else "超卖" if rsi_val < 30 else "正常" lines.append(f"- RSI: {rsi_val:.1f} ({rsi_status})") if 'macd' in df.columns and pd.notna(latest['macd']): macd_trend = "多头" if latest['macd'] > 0 else "空头" lines.append(f"- MACD: {latest['macd']:.4f} ({macd_trend})") # 添加趋势判断 lines.append("\n## 请给出调整建议") lines.append("对于每个持仓,请分析是否需要调整,并按 JSON 格式输出。") return "\n".join(lines) def _parse_position_review_response(self, response: str) -> List[Dict[str, Any]]: """解析持仓回顾响应""" try: # 尝试提取 JSON 数组 import json import re # 查找 JSON 数组 json_match = re.search(r'\[\s*\{.*?\}\s*\]', response, re.DOTALL) if json_match: json_str = json_match.group(0) decisions = json.loads(json_str) # 验证每个决策的格式 valid_decisions = [] for decision in decisions: if 'order_id' in decision and 'action' in decision: valid_decisions.append(decision) else: logger.warning(f"无效的决策格式: {decision}") return valid_decisions # 如果找不到 JSON 数组,尝试解析单个对象 json_match = re.search(r'\{[^{}]*"action"[^{}]*\}', response, re.DOTALL) if json_match: json_str = json_match.group(0) decision = json.loads(json_str) if 'order_id' in decision and 'action' in decision: return [decision] logger.warning(f"无法解析持仓回顾响应: {response[:200]}") return [] except json.JSONDecodeError as e: logger.error(f"解析持仓回顾 JSON 失败: {e}") return [] except Exception as e: logger.error(f"解析持仓回顾响应时出错: {e}") return []