From 6a067fd39e036e5b8f7a14dad6115363e4e95af3 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Mon, 30 Mar 2026 00:53:36 +0800 Subject: [PATCH] update --- backend/app/api/paper_trading.py | 37 +- backend/app/crypto_agent/crypto_agent.py | 307 ++-- .../crypto_agent/market_signal_analyzer.py | 991 +++++++------ .../crypto_agent/trading_decision_maker.py | 1291 ----------------- backend/app/services/paper_trading_service.py | 10 + .../test_market_signal_analyzer_lane_rules.py | 177 +++ frontend/trading.html | 131 +- 7 files changed, 997 insertions(+), 1947 deletions(-) delete mode 100644 backend/app/crypto_agent/trading_decision_maker.py create mode 100644 backend/tests/test_market_signal_analyzer_lane_rules.py diff --git a/backend/app/api/paper_trading.py b/backend/app/api/paper_trading.py index 987bf09..cddc02c 100644 --- a/backend/app/api/paper_trading.py +++ b/backend/app/api/paper_trading.py @@ -110,7 +110,7 @@ async def get_order(order_id: str): @router.post("/orders/{order_id}/close") -async def close_order(order_id: str, request: CloseOrderRequest): +async def close_order(order_id: str, request: Optional[CloseOrderRequest] = None): """ 手动平仓 @@ -119,7 +119,17 @@ async def close_order(order_id: str, request: CloseOrderRequest): """ try: service = get_paper_trading_service() - result = service.close_order_manual(order_id, request.exit_price) + order = service.get_order_by_id(order_id) + if not order: + raise HTTPException(status_code=404, detail="订单不存在或已平仓") + + exit_price = request.exit_price if request and request.exit_price > 0 else 0 + if exit_price <= 0: + exit_price = service._get_current_price(order.get('symbol', '')) + if exit_price <= 0: + exit_price = order.get('filled_price') or order.get('entry_price') or 0 + + result = service.close_order_manual(order_id, float(exit_price)) if not result: raise HTTPException(status_code=404, detail="订单不存在或已平仓") @@ -136,6 +146,28 @@ async def close_order(order_id: str, request: CloseOrderRequest): raise HTTPException(status_code=500, detail=str(e)) +@router.post("/orders/{order_id}/cancel") +async def cancel_order(order_id: str): + """撤销挂单""" + try: + service = get_paper_trading_service() + result = service.cancel_order(order_id) + + if not result.get("success"): + raise HTTPException(status_code=400, detail=result.get("message", "撤单失败")) + + return { + "success": True, + "message": result.get("message", "撤单成功"), + "result": result + } + except HTTPException: + raise + except Exception as e: + logger.error(f"撤单失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.delete("/orders/{order_id}") async def delete_order( order_id: str, @@ -409,7 +441,6 @@ async def reset_paper_trading(): logger.error(f"重置交易数据失败: {e}") raise HTTPException(status_code=500, detail=str(e)) - @router.post("/recalculate-statistics") async def recalculate_statistics(): """ diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index be5c040..9fd03ef 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -16,7 +16,6 @@ from app.services.dingtalk_service import get_dingtalk_service from app.services.paper_trading_service import get_paper_trading_service from app.services.signal_database_service import get_signal_db_service from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer -from app.crypto_agent.trading_decision_maker import TradingDecisionMaker from app.utils.system_status import get_system_monitor, AgentStatus @@ -82,6 +81,18 @@ class CryptoAgent: 'long_term': 1.0, } + SIGNAL_MIN_STOP_LOSS_PCT = { + 'short_term': 0.6, + 'medium_term': 1.0, + 'long_term': 1.2, + } + + SIGNAL_MIN_TAKE_PROFIT_PCT = { + 'short_term': 1.0, + 'medium_term': 2.0, + 'long_term': 2.5, + } + def __new__(cls, *args, **kwargs): """单例模式 - 确保只有一个实例""" if cls._instance is None: @@ -102,21 +113,14 @@ class CryptoAgent: self.telegram = get_telegram_service() self.dingtalk = get_dingtalk_service() # 添加钉钉服务 - # 新架构:市场信号分析器 + 交易决策器 + # 信号层:只负责市场分析 self.market_analyzer = MarketSignalAnalyzer() - self.decision_maker = None # 延迟初始化,需要 paper_trading 的杠杆配置 self.signal_db = get_signal_db_service() # 信号数据库服务 # 模拟交易服务(始终启用) self.paper_trading = get_paper_trading_service() - # 初始化决策器(需要杠杆配置) - self.decision_maker = TradingDecisionMaker( - leverage=self.paper_trading.leverage, - max_total_leverage=self.paper_trading.max_total_leverage - ) - # Hyperliquid 实盘服务(可选) from app.services.hyperliquid_trading_service import get_hyperliquid_service self.hyperliquid = get_hyperliquid_service() @@ -612,12 +616,12 @@ class CryptoAgent: async def analyze_symbol(self, symbol: str): """ - 分析单个交易对(新架构:市场分析 + 交易决策分离) + 分析单个交易对(信号分析 + 平台执行规则) - 新架构流程: + 当前流程: 1. 市场信号分析器分析市场(不包含仓位信息) - 2. 交易决策器根据信号+仓位+账户状态做决策 - 3. 执行交易决策 + 2. 各平台按自身规则筛选并处理信号 + 3. 执行交易动作 Args: symbol: 交易对,如 'BTCUSDT' @@ -696,7 +700,7 @@ class CryptoAgent: ) # ============================================================ - # 发送市场信号通知(独立于交易决策) + # 发送市场信号通知 # ============================================================ await self._send_market_signal_notification(market_signal, current_price) @@ -725,10 +729,6 @@ class CryptoAgent: else: logger.info(" 无可执行信号") paper_decision = {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"} - # 不发送决策通知(因为是基于硬编码规则的执行,不是 LLM 决策) - # await self._send_trading_decision_notification( - # paper_decision, market_signal, current_price, prefix="[模拟盘]" - # ) else: paper_decision = {"action": "IGNORE", "reason": "未启用"} logger.info(f"⏸️ 模拟盘交易未启用") @@ -753,10 +753,6 @@ class CryptoAgent: else: logger.info(" 无可执行信号") hl_decision = {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"} - # 不发送决策通知(因为是基于硬编码规则的执行,不是 LLM 决策) - # await self._send_trading_decision_notification( - # hl_decision, market_signal, current_price, prefix="[Hyperliquid]" - # ) else: hl_decision = {"action": "IGNORE", "reason": "未启用"} logger.info(f"⏸️ Hyperliquid 实盘交易未启用") @@ -781,16 +777,12 @@ class CryptoAgent: else: logger.info(" 无可执行信号") bg_decision = {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"} - # 不发送决策通知(因为是基于硬编码规则的执行,不是 LLM 决策) - # await self._send_trading_decision_notification( - # bg_decision, market_signal, current_price, prefix="[Bitget]" - # ) else: bg_decision = {"action": "IGNORE", "reason": "未启用"} logger.info(f"⏸️ Bitget 实盘交易未启用") # ============================================================ - # 第三阶段:执行交易决策(各平台独立) + # 第三阶段:执行交易动作(各平台独立) # ============================================================ await self._execute_decisions(paper_decision, hl_decision, bg_decision, market_signal, current_price) @@ -870,41 +862,6 @@ class CryptoAgent: logger.info(f" 信心度: {confidence}%") logger.info(f" 理由: {sig.get('reasoning', 'N/A')}") - def _log_trading_decision(self, decision: Dict[str, Any]): - """输出交易决策结果""" - decision_type = decision.get('decision', 'HOLD') - decision_map = { - 'OPEN': '🟢 开仓', - 'CLOSE': '🔴 平仓', - 'ADD': '➕ 加仓', - 'REDUCE': '➖ 减仓', - 'HOLD': '⏸️ 观望' - } - - logger.info(f" 决策: {decision_map.get(decision_type, decision_type)}") - logger.info(f" 动作: {decision.get('action', 'N/A')}") - logger.info(f" 仓位: {decision.get('position_size', 'N/A')}") - - # quantity 是保证金,显示持仓价值 = 保证金 × 杠杆 - quantity = decision.get('quantity', 0) - if isinstance(quantity, (int, float)) and quantity > 0: - leverage = self.paper_trading.leverage # 使用实际的杠杆配置 - position_value = quantity * leverage - logger.info(f" 持仓价值: ${position_value:,.2f} (保证金 ${quantity:.2f})") - else: - logger.info(f" 数量: ${decision.get('quantity', 'N/A')}") - - if decision.get('stop_loss'): - logger.info(f" 止损: ${decision.get('stop_loss')}") - if decision.get('take_profit'): - logger.info(f" 止盈: ${decision.get('take_profit')}") - - logger.info(f" 理由: {decision.get('reasoning', 'N/A')}") - - risk = decision.get('risk_analysis', '') - if risk: - logger.info(f" 风险: {risk}") - def _get_paper_trading_state(self) -> tuple: """ 获取模拟盘交易状态(持仓和账户) @@ -1173,6 +1130,9 @@ class CryptoAgent: if decision_type == 'HOLD': reasoning = decision.get('reasoning', decision.get('reason', '观望')) logger.info(f"\n📊 交易决策: {reasoning}") + await self._notify_signal_not_executed( + market_signal, decision, current_price, reason=f"[模拟盘] {reasoning}", prefix="[模拟盘]" + ) return logger.info(f"\n📊 【执行交易】") @@ -1478,144 +1438,6 @@ class CryptoAgent: import traceback logger.debug(traceback.format_exc()) - async def _send_trading_decision_notification(self, decision: Dict[str, Any], - market_signal: Dict[str, Any], - current_price: float, - prefix: str = ""): - """发送交易决策通知(第二阶段)""" - try: - decision_type = decision.get('decision', 'HOLD') - symbol = market_signal.get('symbol') - - # 账户类型标识 - account_type = f"{prefix} 📊 交易" if prefix else "📊 交易" - - # 决策类型映射 - decision_map = { - 'OPEN': '开仓', - 'CLOSE': '平仓', - 'ADD': '加仓', - 'REDUCE': '减仓', - 'CANCEL_PENDING': '取消挂单', - 'HOLD': '观望' - } - - decision_text = decision_map.get(decision_type, decision_type) - - # 根据决策类型设置颜色 - color_map = { - 'OPEN': 'green', - 'ADD': 'green', - 'CLOSE': 'orange', - 'REDUCE': 'orange', - 'CANCEL_PENDING': 'red', - 'HOLD': 'gray' - } - color = color_map.get(decision_type, 'blue') - - # 构建标题 - 添加 [决策] 前缀区分 - title = f"[决策] {account_type} {symbol} 交易决策: {decision_text}" - - # 获取最佳信号用于显示 - best_signal = self._get_signal_for_decision(market_signal, decision) - signal_confidence = best_signal.get('confidence', 0) if best_signal else 0 - signal_action = best_signal.get('action', '') if best_signal else '' - signal_timeframe = best_signal.get('timeframe', best_signal.get('type', 'unknown')) if best_signal else 'unknown' - timeframe_map = {'short_term': '短线', 'medium_term': '趋势', 'long_term': '长线'} - timeframe_text = timeframe_map.get(signal_timeframe, signal_timeframe) - - # 方向图标 - if 'buy' in signal_action.lower() or 'long' in signal_action.lower(): - action_icon = '🟢' - action_text = '做多' - elif 'sell' in signal_action.lower() or 'short' in signal_action.lower(): - action_icon = '🔴' - action_text = '做空' - else: - action_icon = '➖' - action_text = '观望' - - # 构建内容 - content_parts = [ - f"{action_icon} **市场信号**: {action_text} | {timeframe_text} | 信心度: {signal_confidence}%", - f"", - f"🎯 **交易决策**: {decision_text}", - f"", - ] - - # 添加决策详情 - if decision_type != 'HOLD': - reasoning = decision.get('reasoning', '') - risk_analysis = decision.get('risk_analysis', '') - position_size = decision.get('position_size', 'N/A') - - # 仓位图标 - position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓', 'micro': '🌱 微仓'} - position_display = position_map.get(position_size, position_size) - - # HTML转义,避免特殊字符破坏HTML格式 - import html - escaped_reasoning = html.escape(reasoning) if reasoning else '' - escaped_risk = html.escape(risk_analysis) if risk_analysis else '' - - content_parts.extend([ - f"📊 **仓位**: {position_display}", - f"💭 **决策理由**: {escaped_reasoning}", - ]) - - if escaped_risk: - content_parts.append(f"⚠️ **风险**: {escaped_risk}") - - # 添加价格信息(如果有) - quantity = decision.get('quantity', 0) - if isinstance(quantity, (int, float)) and quantity > 0: - leverage = self.paper_trading.leverage # 使用实际的杠杆配置 - position_value = quantity * leverage - content_parts.append(f"💰 **持仓价值**: ${position_value:,.2f} (保证金 ${quantity:.2f})") - - stop_loss = decision.get('stop_loss') - take_profit = decision.get('take_profit') - if stop_loss: - content_parts.append(f"🛑 **止损**: ${stop_loss}") - if take_profit: - content_parts.append(f"🎯 **止盈**: ${take_profit}") - - # 取消挂单时显示要取消的订单 - if decision_type == 'CANCEL_PENDING': - orders_to_cancel = decision.get('orders_to_cancel', []) - if orders_to_cancel: - content_parts.append(f"🚫 **取消订单**: {len(orders_to_cancel)} 个") - for order_id in orders_to_cancel[:3]: # 最多显示3个 - content_parts.append(f" - {order_id}") - if len(orders_to_cancel) > 3: - content_parts.append(f" - ... 还有 {len(orders_to_cancel) - 3} 个") - else: - # HOLD 决策 - reasoning = decision.get('reasoning', '综合评估后选择观望') - content_parts.append(f"💭 **理由**: {reasoning}") - - content_parts.append("") - content_parts.append(f"⏰ 当前价格: ${current_price:,.2f}") - - content = "\n".join(content_parts) - - # 发送通知 - [决策] 发送到 paper_trading webhook(trading) - if self.settings.feishu_enabled: - await self.feishu_paper.send_card(title, content, color) - if self.settings.telegram_enabled: - # Telegram 使用文本格式 - message = f"{title}\n\n{content}" - await self.telegram.send_message(message) - if self.settings.dingtalk_enabled: - await self.dingtalk.send_action_card(title, content) - - logger.info(f" 📤 已发送交易决策通知: {decision_text}") - - except Exception as e: - logger.warning(f"发送交易决策通知失败: {e}") - import traceback - logger.debug(traceback.format_exc()) - async def _send_signal_notification(self, market_signal: Dict[str, Any], decision: Dict[str, Any], current_price: float, prefix: str = "", hl_order_status: str = None): @@ -2579,6 +2401,16 @@ class CryptoAgent: reward = entry - tp if risk > 0: + stop_distance_pct = risk / entry * 100 + take_distance_pct = reward / entry * 100 + min_stop_pct = self.SIGNAL_MIN_STOP_LOSS_PCT.get(signal_type, 0.6) + min_take_pct = self.SIGNAL_MIN_TAKE_PROFIT_PCT.get(signal_type, 1.0) + + if stop_distance_pct < min_stop_pct: + return False, f"{signal_type} 止损距离 {stop_distance_pct:.2f}% < {min_stop_pct:.1f}%,不执行" + if take_distance_pct < min_take_pct: + return False, f"{signal_type} 止盈距离 {take_distance_pct:.2f}% < {min_take_pct:.1f}%,不执行" + risk_reward_ratio = reward / risk if risk_reward_ratio < min_rr: return False, f"{signal_type} 盈亏比 {risk_reward_ratio:.2f} < {min_rr:.1f},不执行" @@ -2724,6 +2556,9 @@ class CryptoAgent: if decision_type == 'HOLD': reasoning = decision.get('reasoning', decision.get('reason', '观望')) logger.info(f" Bitget 决策: {reasoning}") + await self._notify_signal_not_executed( + market_signal, decision, current_price, reason=f"[Bitget] {reasoning}", prefix="[Bitget]" + ) return # 使用执行器 @@ -3063,6 +2898,9 @@ class CryptoAgent: if decision_type == 'HOLD': reasoning = decision.get('reasoning', decision.get('reason', '观望')) logger.info(f" Hyperliquid 决策: {reasoning}") + await self._notify_signal_not_executed( + market_signal, decision, current_price, reason=f"[Hyperliquid] {reasoning}", prefix="[Hyperliquid]" + ) return # 使用执行器 @@ -3557,12 +3395,14 @@ class CryptoAgent: market_signal: Dict[str, Any], decision: Dict[str, Any], current_price: float, - reason: str = "" + reason: str = "", + prefix: str = "" ): """发送有信号但未执行交易的通知""" try: symbol = market_signal.get('symbol') account_type = "📊" + title_prefix = f"{prefix} " if prefix else "" signal = self._get_signal_for_decision(market_signal, decision) if not signal: @@ -3600,7 +3440,7 @@ class CryptoAgent: action_text = '观望' # 构建标题 - title = f"{account_type} {symbol} 信号未执行" + title = f"{title_prefix}{account_type} {symbol} 信号未执行" # 构建内容 content_parts = [ @@ -3631,26 +3471,75 @@ class CryptoAgent: logger.warning(f"发送信号未执行通知失败: {e}") async def analyze_once(self, symbol: str) -> Dict[str, Any]: - """单次分析(用于测试或手动触发)""" + """单次分析并返回市场信号与平台执行预览""" data = self.exchange.get_multi_timeframe_data(symbol) if not self._validate_data(data): return {'error': '数据不完整'} - # 使用新架构:市场分析 + 交易决策 + current_price = float(data['5m'].iloc[-1]['close']) market_signal = await self.market_analyzer.analyze( symbol, data, symbols=self.symbols ) - positions, account = self._get_trading_state() - decision = await self.decision_maker.make_decision( - market_signal, positions, account - ) + signals = market_signal.get('signals', []) + threshold = self.settings.crypto_llm_threshold * 100 + valid_signals = [ + signal for signal in signals + if signal.get('action') in {'buy', 'sell'} and signal.get('confidence', 0) >= threshold + ] + + execution_preview: Dict[str, Any] = {} + + if self.settings.paper_trading_enabled: + paper_positions, paper_account, paper_pending = self._get_paper_trading_state() + paper_signal = self._select_signal_for_platform(valid_signals, 'PaperTrading') + execution_preview['PaperTrading'] = self._normalize_execution_decision( + self.execute_signal_with_rules( + self._build_execution_signal(symbol, paper_signal, current_price), + 'PaperTrading', + paper_account, + paper_positions, + paper_pending, + ), + paper_positions, + paper_pending, + ) if paper_signal else {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"} + + if self.hyperliquid: + hl_positions, hl_account, hl_pending = self._get_hyperliquid_trading_state() + hl_signal = self._select_signal_for_platform(valid_signals, 'Hyperliquid') + execution_preview['Hyperliquid'] = self._normalize_execution_decision( + self.execute_signal_with_rules( + self._build_execution_signal(symbol, hl_signal, current_price), + 'Hyperliquid', + hl_account, + hl_positions, + hl_pending, + ), + hl_positions, + hl_pending, + ) if hl_signal else {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"} + + if self.bitget: + bg_positions, bg_account, bg_pending = self._get_bitget_trading_state() + bg_signal = self._select_signal_for_platform(valid_signals, 'Bitget') + execution_preview['Bitget'] = self._normalize_execution_decision( + self.execute_signal_with_rules( + self._build_execution_signal(symbol, bg_signal, current_price), + 'Bitget', + bg_account, + bg_positions, + bg_pending, + ), + bg_positions, + bg_pending, + ) if bg_signal else {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"} return { 'market_signal': market_signal, - 'trading_decision': decision + 'execution_preview': execution_preview, } def get_status(self) -> Dict[str, Any]: diff --git a/backend/app/crypto_agent/market_signal_analyzer.py b/backend/app/crypto_agent/market_signal_analyzer.py index bbb439d..1f9b3d5 100644 --- a/backend/app/crypto_agent/market_signal_analyzer.py +++ b/backend/app/crypto_agent/market_signal_analyzer.py @@ -34,33 +34,52 @@ class MarketSignalAnalyzer: 'short_term': 60, 'medium_term': 65, } + LANE_MIN_RISK_REWARD = { + 'short_term': 1.5, + 'medium_term': 1.8, + } + LANE_MIN_STOP_LOSS_PCT = { + 'short_term': 0.6, + 'medium_term': 1.0, + } + LANE_MIN_TAKE_PROFIT_PCT = { + 'short_term': 1.0, + 'medium_term': 2.0, + } + FIB_MIN_PIVOT_SEPARATION_BARS = 4 + FIB_PIVOT_VOLUME_LOOKBACK = 20 INTRADAY_ANALYSIS_PROMPT = """你是一位专业的加密货币日内交易员,只负责生成 short_term 信号。 -你的任务是基于 5m / 15m / 30m、当日开盘、VWAP、开盘区间、关键位和衍生品拥挤度,判断未来 30 分钟到 4 小时内是否存在可执行 setup。 +你的任务是基于 5m / 15m / 30m、当日开盘、VWAP、开盘区间、关键位、Fib 回撤位和衍生品拥挤度,判断未来 30 分钟到 4 小时内是否存在可执行 setup。 执行原则: 1. 先判断日内 regime:trending / ranging / neutral。 2. 趋势日内只做顺势回调或突破后的回踩确认,不追涨杀跌。 3. 震荡日内只做区间边界附近的反转,不在区间中部开仓。 4. 技术指标只做辅助,优先看结构、关键位、波动率、量能、VWAP 偏离和距离。 -5. 没有清晰止损、止盈和盈亏比就不交易。 -6. 本次分析独立进行,不参考任何上一轮信号。 +5. 优先使用“优先支撑 / 优先阻力”和“可交易多头区 / 可交易空头区”,普通支撑阻力只作补充。 +6. 没有清晰止损、止盈和盈亏比就不交易。 +7. 本次分析独立进行,不参考任何上一轮信号。 信号要求: 1. 只允许输出 0 或 1 个 short_term 信号。 2. 盈亏比至少 1:1.5。 3. 如果价格处于加速延伸,优先返回空信号。 4. 如果价格位于区间中部、离关键位太远、止损过宽或方向证据冲突,必须返回空信号。 -5. 只有在 setup 足够清晰时才允许输出信号;宁可空仓,不要勉强给单。 -6. entry_type: +5. 做多时,entry 应尽量靠近优先支撑或多头共振区;做空时,entry 应尽量靠近优先阻力或空头共振区。 +6. 只有在 setup 足够清晰时才允许输出信号;宁可空仓,不要勉强给单。 +7. entry_type: - 价格已回到关键位并出现确认,可用 market - 仍需等待回踩/反抽,使用 limit -7. grade / confidence 约束: +8. grade / confidence 约束: - A: 80-100,结构、位置、量价、时机都对齐 - B: 70-79,条件较完整但仍有一项次优 - C: 60-69,只有轻仓试错级别 - 60 以下不要输出交易信号 +9. 止损止盈距离下限: + - short_term 止损距离至少 0.6% + - short_term 止盈距离至少 1.0% 输出 JSON,禁止输出解释性正文: ```json @@ -99,7 +118,7 @@ class MarketSignalAnalyzer: TREND_ANALYSIS_PROMPT = """你是一位专业的加密货币趋势交易员,只负责生成 medium_term 信号。 -你的任务是基于 1h / 4h、关键位、趋势阶段、反转检测、衍生品拥挤度和新闻催化,判断未来 4 小时到 3 天内是否存在趋势 setup。 +你的任务是基于 1h / 4h、关键位、Fib 回撤/扩展位、趋势阶段、反转检测、衍生品拥挤度和新闻催化,判断未来 4 小时到 3 天内是否存在趋势 setup。 执行原则: 1. 4h 决定大方向,1h 决定节奏与入场位置。 @@ -110,6 +129,7 @@ class MarketSignalAnalyzer: 4. 趋势晚期、资金费率过热或价格过度偏离关键均线时,要显著降低开仓积极性。 5. 没有清晰位置优势就不交易。 6. 本次分析独立进行,不参考任何上一轮信号。 +7. 优先使用“优先支撑 / 优先阻力”和“可交易多头区 / 可交易空头区”,普通关键位只作补充。 信号要求: 1. 只允许输出 0 或 1 个 medium_term 信号。 @@ -118,11 +138,15 @@ class MarketSignalAnalyzer: 4. 反转信号必须比延续信号更严格。 5. 如果趋势处于晚期且没有回踩确认,或反转证据不足,必须返回空信号。 6. 只有在位置优势和方向一致性都充分时才允许开仓。 -7. grade / confidence 约束: +7. 趋势延续单的 entry 应优先靠近优先支撑/阻力或对应共振区,不在远离关键位的位置追价。 +8. grade / confidence 约束: - A: 82-100,4h/1h 同向且位置优 - B: 72-81,趋势或反转证据较完整 - C: 65-71,仅限早期确认不足的轻仓趋势尝试 - 65 以下不要输出交易信号 +9. 止损止盈距离下限: + - medium_term 止损距离至少 1.0% + - medium_term 止盈距离至少 2.0% 输出 JSON,禁止输出解释性正文: ```json @@ -255,7 +279,8 @@ class MarketSignalAnalyzer: range_zone = self._detect_range_zone(data) reversal_detection = self._detect_trend_reversal(data) trend_stage = self._detect_trend_stage(data) - key_levels = self._derive_key_levels(data, range_zone) + fib_context = self._build_fibonacci_context(data, current_price) + key_levels = self._derive_key_levels(data, range_zone, fib_context, current_price) snapshot_parts = [ f"## 市场快照", @@ -301,6 +326,19 @@ class MarketSignalAnalyzer: f"- 支撑位: {self._format_levels(key_levels.get('support'))}", f"- 阻力位: {self._format_levels(key_levels.get('resistance'))}", ] + if key_levels.get('support_priority'): + levels_parts.append(f"- 优先支撑: {self._format_level_priority(key_levels['support_priority'])}") + if key_levels.get('resistance_priority'): + levels_parts.append(f"- 优先阻力: {self._format_level_priority(key_levels['resistance_priority'])}") + if key_levels.get('best_long_zone'): + levels_parts.append(f"- 可交易多头区: {self._format_trade_zone(key_levels['best_long_zone'])}") + if key_levels.get('best_short_zone'): + levels_parts.append(f"- 可交易空头区: {self._format_trade_zone(key_levels['best_short_zone'])}") + + if fib_context.get('intraday'): + intraday_parts.append(f"- 日内Fib: {fib_context['intraday']}") + if fib_context.get('trend'): + trend_parts.append(f"- 趋势Fib: {fib_context['trend']}") if range_zone.get('is_ranging'): intraday_parts.append( @@ -493,37 +531,507 @@ class MarketSignalAnalyzer: return "全部中性" return "存在分歧" - def _derive_key_levels(self, data: Dict[str, pd.DataFrame], range_zone: Dict[str, Any]) -> Dict[str, List[float]]: + def _derive_key_levels(self, data: Dict[str, pd.DataFrame], range_zone: Dict[str, Any], + fib_context: Optional[Dict[str, Any]] = None, + current_price: Optional[float] = None) -> Dict[str, Any]: """提炼高价值支撑阻力位""" - supports: List[float] = [] - resistances: List[float] = [] + support_candidates: List[Dict[str, Any]] = [] + resistance_candidates: List[Dict[str, Any]] = [] if range_zone.get('support_level'): - supports.append(float(range_zone['support_level'])) + support_candidates.append(self._make_level_candidate(float(range_zone['support_level']), 1.2, "区间下沿")) if range_zone.get('resistance_level'): - resistances.append(float(range_zone['resistance_level'])) + resistance_candidates.append(self._make_level_candidate(float(range_zone['resistance_level']), 1.2, "区间上沿")) - for timeframe, count in [('30m', 20), ('1h', 20), ('4h', 12)]: + for timeframe, count, tf_weight in [('30m', 20, 1.0), ('1h', 20, 1.15), ('4h', 12, 1.3)]: df = data.get(timeframe) if df is None or len(df) < count: continue window = df.iloc[-count:] - supports.append(float(window['low'].min())) - resistances.append(float(window['high'].max())) + support_candidates.append(self._make_level_candidate(float(window['low'].min()), tf_weight, f"{timeframe}低点")) + resistance_candidates.append(self._make_level_candidate(float(window['high'].max()), tf_weight, f"{timeframe}高点")) ema20 = df['ema20'].iloc[-1] if 'ema20' in df.columns else None if pd.notna(ema20): if float(ema20) < float(df['close'].iloc[-1]): - supports.append(float(ema20)) + support_candidates.append(self._make_level_candidate(float(ema20), tf_weight * 0.9, f"{timeframe}EMA20")) else: - resistances.append(float(ema20)) + resistance_candidates.append(self._make_level_candidate(float(ema20), tf_weight * 0.9, f"{timeframe}EMA20")) + + fib_levels = fib_context or {} + for detail in fib_levels.get('support_details', []): + support_candidates.append( + self._make_level_candidate( + float(detail['price']), + self._score_fib_level(detail), + f"Fib{detail['ratio']:.3f}" + ) + ) + for detail in fib_levels.get('resistance_details', []): + resistance_candidates.append( + self._make_level_candidate( + float(detail['price']), + self._score_fib_level(detail), + f"Fib{detail['ratio']:.3f}" + ) + ) + + ranked_supports = self._rank_level_candidates(support_candidates, current_price, reverse=True) + ranked_resistances = self._rank_level_candidates(resistance_candidates, current_price, reverse=False) + best_long_zone = self._build_trade_zone(ranked_supports, current_price, "buy") + best_short_zone = self._build_trade_zone(ranked_resistances, current_price, "sell") return { - 'support': self._dedupe_levels(supports, reverse=True), - 'resistance': self._dedupe_levels(resistances, reverse=False), + 'support': self._sort_level_prices([item['price'] for item in ranked_supports[:3]], reverse=True), + 'resistance': self._sort_level_prices([item['price'] for item in ranked_resistances[:3]], reverse=False), + 'support_priority': ranked_supports[:3], + 'resistance_priority': ranked_resistances[:3], + 'best_long_zone': best_long_zone, + 'best_short_zone': best_short_zone, } + def _build_fibonacci_context(self, data: Dict[str, pd.DataFrame], current_price: float) -> Dict[str, Any]: + """提炼日内与趋势的 Fib 关键位,只保留对当前价格最有意义的层级""" + contexts = { + 'intraday': '', + 'trend': '', + 'support_levels': [], + 'resistance_levels': [], + 'support_details': [], + 'resistance_details': [], + } + + fib_specs = [ + ('30m', 48, 'intraday', '日内'), + ('4h', 60, 'trend', '趋势'), + ] + + for timeframe, lookback, key, label in fib_specs: + fib_result = self._calculate_fibonacci_levels(data.get(timeframe), current_price, lookback) + if not fib_result: + continue + + summary = ( + f"{label}波段 {fib_result['swing_low']:.2f}->{fib_result['swing_high']:.2f} | " + f"方向={fib_result['direction']} | " + f"支撑={self._format_fib_levels(fib_result.get('support_details'))} | " + f"阻力={self._format_fib_levels(fib_result.get('resistance_details'))}" + ) + if fib_result.get('confluence'): + summary += f" | 共振={fib_result['confluence']}" + if fib_result.get('trade_zone'): + summary += f" | 可交易区={fib_result['trade_zone']}" + + contexts[key] = summary + contexts['support_levels'].extend(fib_result.get('support_levels', [])) + contexts['resistance_levels'].extend(fib_result.get('resistance_levels', [])) + contexts['support_details'].extend(fib_result.get('support_details', [])) + contexts['resistance_details'].extend(fib_result.get('resistance_details', [])) + + contexts['support_levels'] = self._dedupe_levels(contexts['support_levels'], reverse=True) + contexts['resistance_levels'] = self._dedupe_levels(contexts['resistance_levels'], reverse=False) + return contexts + + def _calculate_fibonacci_levels(self, df: Optional[pd.DataFrame], current_price: float, + lookback: int = 60) -> Optional[Dict[str, Any]]: + """基于最近确认波段计算 Fib 回撤和扩展位""" + if df is None or df.empty or len(df) < max(lookback // 2, 20): + return None + + window = df.iloc[-lookback:].copy() + swing = self._select_recent_fib_swing(window, current_price) + + if swing: + pivot_high = float(swing['high']) + pivot_low = float(swing['low']) + is_upswing = swing['direction'] == 'up' + else: + pivot_high = float(window['high'].max()) + pivot_low = float(window['low'].min()) + high_idx = window['high'].idxmax() + low_idx = window['low'].idxmin() + is_upswing = low_idx < high_idx + + span = pivot_high - pivot_low + + if span <= 0 or pivot_low <= 0: + return None + + direction = 'up' if is_upswing else 'down' + retracement_ratios = [0.382, 0.5, 0.618, 0.786] + extension_ratios = [1.272, 1.618] + + levels: List[Dict[str, Any]] = [] + if is_upswing: + for ratio in retracement_ratios: + price = pivot_high - span * ratio + levels.append({'kind': 'retracement', 'ratio': ratio, 'price': price}) + for ratio in extension_ratios: + price = pivot_low + span * ratio + levels.append({'kind': 'extension', 'ratio': ratio, 'price': price}) + else: + for ratio in retracement_ratios: + price = pivot_low + span * ratio + levels.append({'kind': 'retracement', 'ratio': ratio, 'price': price}) + for ratio in extension_ratios: + price = pivot_high - span * ratio + levels.append({'kind': 'extension', 'ratio': ratio, 'price': price}) + + support_candidates = sorted( + [level for level in levels if level['price'] < current_price], + key=lambda item: item['price'], + reverse=True + ) + resistance_candidates = sorted( + [level for level in levels if level['price'] > current_price], + key=lambda item: item['price'] + ) + + if not support_candidates and not resistance_candidates: + nearest = sorted(levels, key=lambda item: abs(item['price'] - current_price))[:2] + support_candidates = sorted( + [level for level in nearest if level['price'] <= current_price], + key=lambda item: item['price'], + reverse=True + ) + resistance_candidates = sorted( + [level for level in nearest if level['price'] > current_price], + key=lambda item: item['price'] + ) + + support_details = self._serialize_fib_levels(support_candidates[:2], current_price) + resistance_details = self._serialize_fib_levels(resistance_candidates[:2], current_price) + support_levels = [level['price'] for level in support_details] + resistance_levels = [level['price'] for level in resistance_details] + + confluence_parts = [] + ema20 = window['ema20'].iloc[-1] if 'ema20' in window.columns else None + if pd.notna(ema20): + nearest_fib = min(levels, key=lambda item: abs(item['price'] - float(ema20))) + distance = abs(nearest_fib['price'] - float(ema20)) / float(ema20) + if distance <= 0.004: + confluence_parts.append(f"Fib{nearest_fib['ratio']:.3f}+EMA20") + + recent_high = float(window['high'].iloc[-12:].max()) + recent_low = float(window['low'].iloc[-12:].min()) + for ref_price, ref_name in [(recent_high, '前高'), (recent_low, '前低')]: + nearest_fib = min(levels, key=lambda item: abs(item['price'] - ref_price)) + distance = abs(nearest_fib['price'] - ref_price) / ref_price if ref_price > 0 else 1 + if distance <= 0.004: + confluence_parts.append(f"Fib{nearest_fib['ratio']:.3f}+{ref_name}") + + return { + 'direction': direction, + 'swing_high': pivot_high, + 'swing_low': pivot_low, + 'support_levels': support_levels, + 'resistance_levels': resistance_levels, + 'support_details': support_details, + 'resistance_details': resistance_details, + 'confluence': " / ".join(dict.fromkeys(confluence_parts)), + 'trade_zone': self._describe_fib_trade_zone(direction, support_details, resistance_details), + } + + def _select_recent_fib_swing(self, window: pd.DataFrame, current_price: float) -> Optional[Dict[str, Any]]: + """从最近确认 pivot 中选择一个有效波段用于 Fib""" + pivots = self._find_confirmed_pivots(window) + if len(pivots) < 2: + return None + + atr_pct = self._estimate_window_atr_pct(window, current_price) + min_span_pct = max(0.01 if current_price >= 100 else 0.015, atr_pct * 1.8) + ema20 = window['ema20'].iloc[-1] if 'ema20' in window.columns else None + current_close = float(window['close'].iloc[-1]) + best_candidate = None + best_score = float('-inf') + + for end_idx in range(len(pivots) - 1, 0, -1): + end_pivot = pivots[end_idx] + for start_idx in range(end_idx - 1, -1, -1): + start_pivot = pivots[start_idx] + if start_pivot['type'] == end_pivot['type']: + continue + + low_price = min(start_pivot['price'], end_pivot['price']) + high_price = max(start_pivot['price'], end_pivot['price']) + if low_price <= 0: + continue + + separation_bars = end_pivot['pos'] - start_pivot['pos'] + if separation_bars < self.FIB_MIN_PIVOT_SEPARATION_BARS: + continue + + span_pct = (high_price - low_price) / low_price + if span_pct < min_span_pct: + continue + + direction = 'up' if start_pivot['type'] == 'low' else 'down' + volume_score = self._pivot_volume_score(window, start_pivot['pos']) + self._pivot_volume_score(window, end_pivot['pos']) + direction_score = 0.0 + if pd.notna(ema20): + if direction == 'up' and current_close >= float(ema20): + direction_score = 0.4 + elif direction == 'down' and current_close <= float(ema20): + direction_score = 0.4 + + score = ( + (end_pivot['pos'] / max(len(window), 1)) * 1.8 + + min(span_pct / max(min_span_pct, 1e-6), 3.0) + + min(separation_bars / 12, 1.2) + + volume_score + + direction_score + ) + + candidate = { + 'low': low_price, + 'high': high_price, + 'direction': direction, + 'start_pos': start_pivot['pos'], + 'end_pos': end_pivot['pos'], + 'score': score, + } + + if score > best_score: + best_candidate = candidate + best_score = score + + return best_candidate + + def _find_confirmed_pivots(self, window: pd.DataFrame, left_bars: int = 2, right_bars: int = 2) -> List[Dict[str, Any]]: + """寻找确认过的 pivot high / low""" + pivots: List[Dict[str, Any]] = [] + if window is None or len(window) < left_bars + right_bars + 1: + return pivots + + highs = window['high'].tolist() + lows = window['low'].tolist() + + for pos in range(left_bars, len(window) - right_bars): + high = float(highs[pos]) + low = float(lows[pos]) + + left_highs = highs[pos - left_bars:pos] + right_highs = highs[pos + 1:pos + 1 + right_bars] + left_lows = lows[pos - left_bars:pos] + right_lows = lows[pos + 1:pos + 1 + right_bars] + + if all(high > value for value in left_highs) and all(high >= value for value in right_highs): + pivots.append({'pos': pos, 'type': 'high', 'price': high}) + + if all(low < value for value in left_lows) and all(low <= value for value in right_lows): + pivots.append({'pos': pos, 'type': 'low', 'price': low}) + + pivots.sort(key=lambda item: item['pos']) + return self._compress_adjacent_pivots(pivots) + + def _compress_adjacent_pivots(self, pivots: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """压缩相邻同类型 pivot,保留更极端的那个""" + compressed: List[Dict[str, Any]] = [] + for pivot in pivots: + if not compressed or compressed[-1]['type'] != pivot['type']: + compressed.append(pivot) + continue + + previous = compressed[-1] + if pivot['type'] == 'high' and pivot['price'] >= previous['price']: + compressed[-1] = pivot + elif pivot['type'] == 'low' and pivot['price'] <= previous['price']: + compressed[-1] = pivot + return compressed + + def _estimate_window_atr_pct(self, window: pd.DataFrame, current_price: float) -> float: + """估算当前窗口 ATR 百分比,用于过滤过小波段""" + if 'atr' in window.columns: + atr_series = window['atr'].dropna() + if not atr_series.empty and current_price > 0: + return float(atr_series.iloc[-1]) / current_price + return 0.006 + + def _pivot_volume_score(self, window: pd.DataFrame, pos: int) -> float: + """估算 pivot 附近的成交量强度""" + if 'volume' not in window.columns or pos >= len(window): + return 0.0 + + baseline_start = max(0, pos - self.FIB_PIVOT_VOLUME_LOOKBACK) + baseline = float(window['volume'].iloc[baseline_start:pos + 1].mean()) if pos > baseline_start else 0.0 + if baseline <= 0: + return 0.0 + + pivot_volume = float(window['volume'].iloc[pos]) + ratio = pivot_volume / baseline + if ratio >= 1.8: + return 0.5 + if ratio >= 1.3: + return 0.25 + return 0.0 + + def _serialize_fib_levels(self, levels: List[Dict[str, Any]], current_price: float) -> List[Dict[str, Any]]: + """把 Fib 层级转成带距离说明的结构""" + serialized: List[Dict[str, Any]] = [] + for level in levels: + price = float(level['price']) + distance_pct = abs(price - current_price) / current_price * 100 if current_price > 0 else 0 + serialized.append({ + 'ratio': float(level['ratio']), + 'price': price, + 'distance_pct': distance_pct, + 'kind': level.get('kind', 'retracement'), + }) + return serialized + + def _format_fib_levels(self, levels: Optional[List[Dict[str, Any]]]) -> str: + if not levels: + return "N/A" + return ", ".join( + f"{level['price']:.2f}({self._fib_kind_label(level.get('kind'))}Fib{level['ratio']:.3f}, {level['distance_pct']:.1f}%)" + for level in levels[:2] + ) + + def _fib_kind_label(self, kind: Optional[str]) -> str: + if kind == 'extension': + return "扩展" + return "回撤" + + def _describe_fib_trade_zone(self, direction: str, + support_details: List[Dict[str, Any]], + resistance_details: List[Dict[str, Any]]) -> str: + if direction == 'up' and support_details: + return f"顺势回踩 {support_details[0]['price']:.2f} 附近" + if direction == 'down' and resistance_details: + return f"顺势反抽 {resistance_details[0]['price']:.2f} 附近" + + nearest = support_details[:1] + resistance_details[:1] + if nearest: + return f"{nearest[0]['price']:.2f} 附近观察反应" + return "" + + def _make_level_candidate(self, price: float, score: float, source: str) -> Dict[str, Any]: + return { + 'price': float(price), + 'score': float(score), + 'sources': [source], + } + + def _score_fib_level(self, detail: Dict[str, Any]) -> float: + ratio = round(float(detail.get('ratio', 0)), 3) + kind = detail.get('kind', 'retracement') + + ratio_score = { + 0.382: 0.95, + 0.5: 1.05, + 0.618: 1.25, + 0.786: 1.1, + 1.272: 0.9, + 1.618: 0.95, + }.get(ratio, 0.85) + + if kind == 'extension': + ratio_score *= 0.95 + + distance_pct = float(detail.get('distance_pct', 0)) + if distance_pct <= 0.5: + ratio_score += 0.25 + elif distance_pct <= 1.0: + ratio_score += 0.15 + + return ratio_score + + def _rank_level_candidates(self, candidates: List[Dict[str, Any]], + current_price: Optional[float], + reverse: bool) -> List[Dict[str, Any]]: + """把多来源关键位聚合成带优先级的层级""" + if not candidates: + return [] + + sorted_candidates = sorted(candidates, key=lambda item: item['price'], reverse=reverse) + clusters: List[Dict[str, Any]] = [] + tolerance = 0.0035 + + for candidate in sorted_candidates: + price = candidate['price'] + matched_cluster = None + for cluster in clusters: + base_price = cluster['price'] + if base_price <= 0: + continue + if abs(price - base_price) / base_price <= tolerance: + matched_cluster = cluster + break + + if matched_cluster is None: + clusters.append({ + 'price': price, + 'score': candidate['score'], + 'sources': list(candidate['sources']), + 'count': 1, + }) + continue + + total_score = matched_cluster['score'] + candidate['score'] + matched_cluster['price'] = ( + matched_cluster['price'] * matched_cluster['score'] + price * candidate['score'] + ) / total_score + matched_cluster['score'] = total_score + matched_cluster['count'] += 1 + matched_cluster['sources'].extend(candidate['sources']) + + ranked = [] + for cluster in clusters: + unique_sources = list(dict.fromkeys(cluster['sources'])) + confluence_bonus = 0.35 * max(0, len(unique_sources) - 1) + distance_pct = abs(cluster['price'] - current_price) / current_price * 100 if current_price and current_price > 0 else 0 + ranked.append({ + 'price': float(cluster['price']), + 'score': round(cluster['score'] + confluence_bonus, 2), + 'sources': unique_sources[:3], + 'distance_pct': distance_pct, + }) + + ranked.sort(key=lambda item: (-item['score'], item['distance_pct'])) + return ranked + + def _format_level_priority(self, levels: List[Dict[str, Any]]) -> str: + if not levels: + return "N/A" + return " | ".join( + f"{level['price']:.2f}(强度{level['score']:.2f}, {'+'.join(level['sources'])})" + for level in levels[:3] + ) + + def _build_trade_zone(self, levels: List[Dict[str, Any]], current_price: Optional[float], + action: str) -> Optional[Dict[str, Any]]: + if not levels: + return None + + level = levels[0] + price = float(level['price']) + reference = current_price if current_price and current_price > 0 else price + raw_distance = float(level.get('distance_pct', 0) or 0) + band_pct = max(0.25, min(0.8, raw_distance * 0.35 if raw_distance > 0 else 0.4)) + half_band = price * band_pct / 100 + + return { + 'action': action, + 'center': price, + 'low': price - half_band, + 'high': price + half_band, + 'distance_pct': abs(price - reference) / reference * 100 if reference > 0 else 0, + 'score': level.get('score', 0), + 'sources': level.get('sources', []), + } + + def _format_trade_zone(self, zone: Optional[Dict[str, Any]]) -> str: + if not zone: + return "N/A" + return ( + f"{zone['low']:.2f}-{zone['high']:.2f} " + f"(中心 {zone['center']:.2f}, 强度{zone['score']:.2f}, {'+'.join(zone.get('sources', []))})" + ) + + def _sort_level_prices(self, prices: List[float], reverse: bool) -> List[float]: + return sorted([float(price) for price in prices if isinstance(price, (int, float))], reverse=reverse) + def _dedupe_levels(self, levels: List[float], reverse: bool) -> List[float]: """对价位去重,避免同类水平位过密""" cleaned = sorted([float(level) for level in levels if level], reverse=reverse) @@ -670,246 +1178,6 @@ class MarketSignalAnalyzer: return "\n".join(lines) - def _analyze_trend_position(self, data: Dict[str, pd.DataFrame]) -> str: - """分析趋势位置和日内交易机会(使用 EMA)+ 市场状态判断(震荡/趋势)""" - try: - df_30m = data.get('30m') - df_15m = data.get('15m') - df_1h = data.get('1h') - - if df_30m is None or len(df_30m) < 50: - return "" - - latest_30m = df_30m.iloc[-1] - current_price = float(latest_30m['close']) - - # 获取日内级别 EMA(30m) - ema5_30m = latest_30m.get('ma5') # 实际是 ema5 - ema10_30m = latest_30m.get('ma10') # 实际是 ema10 - ema20_30m = latest_30m.get('ma20') # 实际是 ema20 - - if not all([ema5_30m, ema10_30m, ema20_30m]): - return "" - - # ========== 新增:市场状态判断(震荡 vs 趋势) ========== - market_state = "unknown" - market_state_reason = [] - - # 1h EMA 趋势判断 - if df_1h is not None and len(df_1h) >= 20: - latest_1h = df_1h.iloc[-1] - ema5_1h = latest_1h.get('ma5') - ema10_1h = latest_1h.get('ma10') - ema20_1h = latest_1h.get('ma20') - - if ema5_1h and ema10_1h and ema20_1h: - # 1h EMA 多头/空头排列 → 趋势市 - if ema5_1h > ema10_1h > ema20_1h: - market_state = "trending" - market_state_reason.append("1h EMA 多头排列") - elif ema5_1h < ema10_1h < ema20_1h: - market_state = "trending" - market_state_reason.append("1h EMA 空头排列") - else: - market_state = "ranging" - market_state_reason.append("1h EMA 纠缠") - - # 波动率判断(ATR 变化) - if df_30m is not None and len(df_30m) >= 24 and 'atr' in df_30m.columns: - recent_atr = df_30m['atr'].iloc[-6:].mean() # 最近3小时 - older_atr = df_30m['atr'].iloc[-12:-6].mean() # 之前3小时 - - if pd.notna(recent_atr) and pd.notna(older_atr) and older_atr > 0: - atr_change = (recent_atr - older_atr) / older_atr * 100 - - if atr_change > 20: - if market_state != "trending": - market_state = "trending" - market_state_reason.append(f"ATR 扩张 {atr_change:.0f}%") - elif atr_change < -20: - if market_state != "ranging": - market_state = "ranging" - market_state_reason.append(f"ATR 收缩 {abs(atr_change):.0f}%") - - # 价格动量判断(15m) - if df_15m is not None and len(df_15m) >= 20: - recent_high = df_15m['high'].iloc[-20:].max() - recent_low = df_15m['low'].iloc[-20:].min() - price_range = (recent_high - recent_low) / current_price * 100 - - if price_range < 2.5: # 15分钟内波动小于2.5% → 震荡 - if market_state != "trending": - market_state = "ranging" - market_state_reason.append(f"15m 波动 {price_range:.1f}% 较小") - elif price_range > 4: # 15分钟内波动大于4% → 趋势 - if market_state != "ranging": - market_state = "trending" - market_state_reason.append(f"15m 波动 {price_range:.1f}% 较大") - - # 判断日内趋势(30m EMA 为主) - if ema5_30m > ema10_30m > ema20_30m: - intraday_trend = "上升" - intraday_emoji = "📈" - elif ema5_30m < ema10_30m < ema20_30m: - intraday_trend = "下跌" - intraday_emoji = "📉" - else: - intraday_trend = "震荡" - intraday_emoji = "➖" - - # 构建市场状态分析 - analysis_parts = [] - - # 市场状态显示(新增) - if market_state == "trending": - state_emoji = "📊" - state_text = f"{state_emoji} **市场状态: 趋势市**" - analysis_parts.append(state_text) - analysis_parts.append(f" 判断依据: {', '.join(market_state_reason)}") - analysis_parts.append(f" 策略: 跟随趋势,等待回调/反弹到 EMA20 顺势入场") - analysis_parts.append(f" 目标: 3-5%,盈亏比 ≥ 1:1.5") - analysis_parts.append(f" 严禁: 逆势做超短线") - elif market_state == "ranging": - state_emoji = "🔄" - state_text = f"{state_emoji} **市场状态: 震荡市**" - analysis_parts.append(state_text) - analysis_parts.append(f" 判断依据: {', '.join(market_state_reason)}") - analysis_parts.append(f" 策略: 5分钟级别高抛低吸,支撑位多、压力位空") - analysis_parts.append(f" 目标: 1-2%,盈亏比 ≥ 1:1.5") - analysis_parts.append(f" 严禁: 追涨杀跌") - else: - analysis_parts.append(f"⚠️ 市场状态: 不明确,观望为主") - - analysis_parts.append(f"") - analysis_parts.append(f"日内趋势(30m EMA): {intraday_emoji} {intraday_trend}") - - analysis = analysis_parts - - # 检查15分钟级别入场时机 - if df_15m is not None and len(df_15m) >= 20: - latest_15m = df_15m.iloc[-1] - rsi_15m = latest_15m.get('rsi', 50) - ema5_15m = latest_15m.get('ma5') # 实际是 ema5 - ema20_15m = latest_15m.get('ma20') # 实际是 ema20 - - # 检查短期动能 - if len(df_15m) >= 5: - recent_closes = df_15m['close'].iloc[-5:].values - is_accelerating = all(recent_closes[i] > recent_closes[i-1] for i in range(1, 5)) - # 检查连续大阳线/阴线(快速移动) - recent_changes = [(recent_closes[i] - recent_closes[i-1]) / recent_closes[i-1] * 100 - for i in range(1, len(recent_closes))] - big_moves = sum(1 for change in recent_changes if abs(change) > 0.3) - is_rapid_moving = big_moves >= 3 - avg_move = sum(abs(c) for c in recent_changes) / len(recent_changes) if recent_changes else 0 - else: - is_accelerating = False - is_rapid_moving = False - avg_move = 0 - - # 计算价格偏离 - if ema5_15m and ema20_15m: - deviation_ema5_15m = abs(current_price - ema5_15m) / ema5_15m * 100 - distance_to_ema20 = abs(current_price - ema20_15m) / ema20_15m * 100 - else: - deviation_ema5_15m = 0 - distance_to_ema20 = 0 - - # 检查成交量 - df_5m = data.get('5m') - volume_ratio = 1 - if df_5m is not None and len(df_5m) >= 20: - vol_latest = df_5m['volume'].iloc[-1] - vol_ma20 = df_5m['volume'].iloc[-20:-1].mean() - volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1 - - # 检查5m连续K线走势 - if len(df_5m) >= 3: - recent_5m_closes = df_5m['close'].iloc[-3:].values - recent_5m_changes = [(recent_5m_closes[i] - recent_5m_closes[i-1]) / recent_5m_closes[i-1] * 100 - for i in range(1, len(recent_5m_closes))] - big_5m_moves = sum(1 for change in recent_5m_changes if abs(change) > 0.3) - is_5m_accelerating = big_5m_moves >= 2 - else: - is_5m_accelerating = False - - # 日内过度延伸检查(EMA 反应更快,阈值更严格) - is_overextended = ( - (rsi_15m > 70 and intraday_trend == "上升") or - (rsi_15m < 30 and intraday_trend == "下跌") or - deviation_ema5_15m > 3 - ) - - if intraday_trend == "上升": - # 价格加速检查 - 强制观望,防止追涨 - if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5: - analysis.append(f"⚠️ 15m: 价格正在快速上涨!连续{big_moves}根大阳线,平均涨幅{avg_move:.2f}%") - analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%") - analysis.append(f" → 🚨 **严禁追涨!强制 HOLD 观望**,等待回调后再考虑") - analysis.append(f" → 如果要入场,等待回调到 EMA20 支撑位用 limit 挂单") - analysis.append(f" → 追涨是持续止损的主要原因!") - elif is_overextended: - analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") - analysis.append(f" → 不要追多,等待回调") - elif is_accelerating and not is_overextended: - analysis.append(f"15m: 正在上涨中,建议等待回调") - analysis.append(f" → 等待回调到 EMA20 支撑位用 limit 挂单做多") - analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") - elif distance_to_ema20 < 1: - analysis.append(f"15m: 回调到 EMA20 支撑位附近") - analysis.append(f" → 支撑位做多反弹(EMA20: ${ema20_15m:.0f})") - analysis.append(f" → 用 limit 挂单入场,止损1%,目标2-3%,盈亏比 >= 1:1.5") - else: - analysis.append(f"15m: 上涨中,耐心等待回调机会") - analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") - analysis.append(f" → 不要追多,等待回调到支撑位") - - elif intraday_trend == "下跌": - # 价格加速检查 - 强制观望,防止杀跌 - if is_rapid_moving and volume_ratio > 1.5 and deviation_ema5_15m > 0.5: - analysis.append(f"⚠️ 15m: 价格正在快速下跌!连续{big_moves}根大阴线,平均跌幅{avg_move:.2f}%") - analysis.append(f" → 量比 {volume_ratio:.1f},偏离 EMA5 {deviation_ema5_15m:.1f}%") - analysis.append(f" → 🚨 **严禁杀跌!强制 HOLD 观望**,等待反弹后再考虑") - analysis.append(f" → 如果要入场,等待反弹到 EMA20 压力位用 limit 挂单") - analysis.append(f" → 杀跌是持续止损的主要原因!") - elif is_overextended: - analysis.append(f"⚠️ 15m 过度延伸: RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") - analysis.append(f" → 不要追空,等待反弹") - elif is_accelerating and not is_overextended: - analysis.append(f"15m: 正在下跌中,建议等待反弹") - analysis.append(f" → 等待反弹到 EMA20 压力位用 limit 挂单做空") - analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") - elif distance_to_ema20 < 1: - analysis.append(f"15m: 反弹到 EMA20 压力位附近") - analysis.append(f" → 压力位做空回调(EMA20: ${ema20_15m:.0f})") - analysis.append(f" → 用 limit 挂单入场,止损1%,目标2-3%,盈亏比 >= 1:1.5") - else: - analysis.append(f"15m: 下跌中,耐心等待反弹机会") - analysis.append(f" → RSI {rsi_15m:.0f},偏离 EMA5 {deviation_ema5_15m:.1f}%") - analysis.append(f" → 不要追空,等待反弹到压力位") - - else: - analysis.append(f"15m: 震荡,观望或双向轻仓") - analysis.append(f" → 支撑位多,压力位空,盈亏比 >= 1:1.5") - - # 日内交易要点 - analysis.append(f"\n💡 稳健交易要点:") - analysis.append(f"- **90%用limit挂单,10%用market**:耐心等待回调,不要追涨杀跌") - analysis.append(f"- **价格加速时强制HOLD**:连续大阳/阴线时观望,等回调/反弹") - analysis.append(f"- **RSI极端区强制HOLD**:>70(多)或 <30(空)时不入场") - analysis.append(f"- **偏离EMA5>1.5%强制HOLD**:价格过度延伸,等待回归") - analysis.append(f"- **盈亏比第一**: 必须 >= 1:1.5,否则不开仓") - analysis.append(f"- **快进快出**: 持仓不超过4小时") - analysis.append(f"- **止损设置**: 优先 1.5×ATR(30m),参考范围 0.8-2.5%") - analysis.append(f"- **目标盈利**: 2-3%") - analysis.append(f"- **宁可错过,不做错**: 追涨杀跌是持续止损的主要原因") - - return "\n".join(analysis) if analysis else "" - - except Exception as e: - logger.warning(f"趋势位置分析失败: {e}") - return "" - def _build_analysis_prompt(self, symbol: str, lane: str, market_context: Dict[str, str], news_context: str, @@ -919,12 +1187,14 @@ class MarketSignalAnalyzer: lane_scope = ( [ "只根据下面提供的日内结构化特征做判断,不要脑补未提供的数据。", - "重点阅读 5m/15m/30m、当日开盘、VWAP、开盘区间、区间状态、关键位和衍生品过热程度。", + "重点阅读 5m/15m/30m、当日开盘、VWAP、开盘区间、区间状态、关键位、Fib 回撤位和衍生品过热程度。", + "优先参考“优先支撑/优先阻力”和“可交易多头区/可交易空头区”,不要在远离关键位的位置给 entry。", ] if lane == "intraday" else [ "只根据下面提供的趋势结构化特征做判断,不要脑补未提供的数据。", - "重点阅读 1h/4h、一致性、趋势阶段、反转检测、关键位、新闻催化和衍生品拥挤度。", + "重点阅读 1h/4h、一致性、趋势阶段、反转检测、关键位、Fib 回撤/扩展位、新闻催化和衍生品拥挤度。", + "优先参考“优先支撑/优先阻力”和“可交易多头区/可交易空头区”,趋势单必须体现位置优势,不接受远离关键位追价。", ] ) @@ -1040,6 +1310,10 @@ class MarketSignalAnalyzer: signal['entry_type'] = 'market' if not self._is_signal_price_structure_valid(signal): continue + if not self._meets_min_price_distance(signal, lane_type): + continue + if not self._meets_min_risk_reward(signal, lane_type): + continue signal['grade'] = self._infer_signal_grade(signal['confidence'], lane_type) if not signal.get('reasoning'): signal['reasoning'] = '结构与关键位共振' @@ -1079,157 +1353,54 @@ class MarketSignalAnalyzer: return take_profit < entry_price < stop_loss return False - def _analyze_multi_timeframe_trend(self, data: Dict[str, Any]) -> str: - """ - 多级别趋势分析 - 检测小级别反转信号 + def _meets_min_risk_reward(self, signal: Dict[str, Any], lane_type: str) -> bool: + """验证信号是否达到最小盈亏比要求""" + entry_price = signal.get('entry_price') + stop_loss = signal.get('stop_loss') + take_profit = signal.get('take_profit') + action = signal.get('action') - 目的:识别小级别(15m/30m)已经反转,但大级别(1h/4h)还未反应的情况 - 这样可以提前捕捉反转信号,而不是等待均线系统确认 - """ - context_parts = ["\n## 🔄 多级别趋势分析(检测反转信号)"] + if not all(isinstance(price, (int, float)) and price > 0 for price in [entry_price, stop_loss, take_profit]): + return False - # 定义各级别 - timeframes = { - '5m': ('超短线', 5), - '15m': ('短线', 15), - '30m': ('日内', 30), - '1h': ('小时', 60), - '4h': ('趋势', 240) - } - - trend_status = {} # 存储各级别趋势状态 - - # 分析各级别趋势 - for tf, (tf_name, minutes) in timeframes.items(): - df = data.get(tf) - if df is None or len(df) < 10: - continue - - latest = df.iloc[-1] - prev = df.iloc[-2] - - # 1. 均线趋势判断 - ma5 = latest.get('ma5', 0) - ma10 = latest.get('ma10', 0) - ma20 = latest.get('ma20', 0) - - ma_trend = None - if ma5 and ma10 and ma20: - if ma5 > ma10 > ma20: - ma_trend = 'bull' - elif ma5 < ma10 < ma20: - ma_trend = 'bear' - else: - ma_trend = 'neutral' - - # 2. MACD 趋势判断 - macd_trend = None - if 'macd' in df.columns and 'macd_signal' in df.columns: - macd = df['macd'].iloc[-1] - signal = df['macd_signal'].iloc[-1] - hist = df.get('macd_hist', pd.Series([0])).iloc[-1] - - if macd > 0 and signal > 0: - macd_trend = 'bull' - elif macd < 0 and signal < 0: - macd_trend = 'bear' - else: - macd_trend = 'neutral' - - # 3. 价格动量(最近3根K线) - close_3 = df['close'].iloc[-3] - close_2 = df['close'].iloc[-2] - close_1 = df['close'].iloc[-1] - - price_momentum = 'up' if close_1 > close_3 else 'down' if close_1 < close_3 else 'flat' - - # 综合判断趋势 - if ma_trend == 'bull' and (macd_trend == 'bull' or price_momentum == 'up'): - trend = 'bull' - elif ma_trend == 'bear' and (macd_trend == 'bear' or price_momentum == 'down'): - trend = 'bear' - elif price_momentum == 'up' and macd_trend == 'bull': - trend = 'bull' - elif price_momentum == 'down' and macd_trend == 'bear': - trend = 'bear' - else: - trend = 'neutral' - - trend_status[tf] = { - 'name': tf_name, - 'trend': trend, - 'ma_trend': ma_trend, - 'macd_trend': macd_trend, - 'momentum': price_momentum, - 'price': float(latest['close']), - 'change_3': ((close_1 - close_3) / close_3 * 100) if close_3 > 0 else 0 - } - - # 生成多级别趋势报告 - if not trend_status: - context_parts.append("⚠️ 数据不足,无法进行多级别分析") - return "\n".join(context_parts) - - # 检测反转信号 - reversal_signals = [] - - # 1. 小级别反转但大级别未反转 - if ('15m' in trend_status and '1h' in trend_status and - trend_status['15m']['trend'] != trend_status['1h']['trend'] and - trend_status['15m']['trend'] != 'neutral'): - - small_tf = trend_status['15m'] - large_tf = trend_status['1h'] - - reversal_type = "🔄 反转信号" if large_tf['trend'] != 'neutral' else "⚡ 启动信号" - - reversal_signals.append( - f"{reversal_type}: 15分钟[{small_tf['trend']}] vs 1小时[{large_tf['trend']}]" - ) - reversal_signals.append( - f" 15分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}" - ) - - # 2. 30分钟反转但4小时未反转(更强的反转信号) - if ('30m' in trend_status and '4h' in trend_status and - trend_status['30m']['trend'] != trend_status['4h']['trend'] and - trend_status['30m']['trend'] != 'neutral'): - - small_tf = trend_status['30m'] - large_tf = trend_status['4h'] - - reversal_type = "🔄 强反转" if large_tf['trend'] != 'neutral' else "⚡ 趋势启动" - - reversal_signals.append( - f"{reversal_type}: 30分钟[{small_tf['trend']}] vs 4小时[{large_tf['trend']}]" - ) - reversal_signals.append( - f" 30分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}" - ) - - # 添加各级别趋势详情 - context_parts.append("\n各级别趋势状态:") - for tf in ['5m', '15m', '30m', '1h', '4h']: - if tf in trend_status: - status = trend_status[tf] - trend_icon = {'bull': '📈', 'bear': '📉', 'neutral': '➡️'}.get(status['trend'], '❓') - context_parts.append( - f" {tf} ({status['name']}): {trend_icon} {status['trend']} " - f"| 动量: {status['change_3']:+.2f}% | 价格: ${status['price']:.2f}" - ) - - # 添加反转信号 - if reversal_signals: - context_parts.append("\n⚠️ 检测到级别背离/反转信号:") - context_parts.extend(reversal_signals) - context_parts.append("\n💡 提示: 小级别已反转但大级别滞后,可考虑:") - context_parts.append(" - 反手操作(平掉旧仓位,开新方向仓位)") - context_parts.append(" - 顺势短线(跟随小级别趋势,快进快出)") - context_parts.append(" - 等待大级别确认(避免假突破)") + if action == 'buy': + risk = entry_price - stop_loss + reward = take_profit - entry_price + elif action == 'sell': + risk = stop_loss - entry_price + reward = entry_price - take_profit else: - context_parts.append("\n✅ 各级别趋势一致,无反转信号") + return False - return "\n".join(context_parts) + if risk <= 0 or reward <= 0: + return False + + rr = reward / risk + return rr >= self.LANE_MIN_RISK_REWARD.get(lane_type, 1.5) + + def _meets_min_price_distance(self, signal: Dict[str, Any], lane_type: str) -> bool: + """验证止损和止盈的绝对距离是否符合对应周期""" + entry_price = signal.get('entry_price') + stop_loss = signal.get('stop_loss') + take_profit = signal.get('take_profit') + action = signal.get('action') + + if not all(isinstance(price, (int, float)) and price > 0 for price in [entry_price, stop_loss, take_profit]): + return False + + if action == 'buy': + stop_distance_pct = (entry_price - stop_loss) / entry_price * 100 + take_distance_pct = (take_profit - entry_price) / entry_price * 100 + elif action == 'sell': + stop_distance_pct = (stop_loss - entry_price) / entry_price * 100 + take_distance_pct = (entry_price - take_profit) / entry_price * 100 + else: + return False + + min_stop_pct = self.LANE_MIN_STOP_LOSS_PCT.get(lane_type, 0.6) + min_take_pct = self.LANE_MIN_TAKE_PROFIT_PCT.get(lane_type, 1.0) + + return stop_distance_pct >= min_stop_pct and take_distance_pct >= min_take_pct def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]: """解析 LLM 响应""" diff --git a/backend/app/crypto_agent/trading_decision_maker.py b/backend/app/crypto_agent/trading_decision_maker.py deleted file mode 100644 index 624881a..0000000 --- a/backend/app/crypto_agent/trading_decision_maker.py +++ /dev/null @@ -1,1291 +0,0 @@ -""" -交易决策器 - 基于市场信号和当前状态做出交易决策 - -职责: -1. 接收市场信号(不含仓位信息) -2. 接收当前持仓状态 -3. 接收账户状态 -4. 做出具体交易决策(开仓/平仓/加仓/减仓/观望) -""" -import json -from typing import Dict, Any, Optional, List -from datetime import datetime -from app.utils.logger import logger -from app.services.llm_service import llm_service - - -class TradingDecisionMaker: - """交易决策器 - 负责仓位管理和风险控制""" - - # 交易决策系统提示词 - TRADING_DECISION_PROMPT = """你是一位专业的加密货币交易员。你的核心职责是**仓位管理和风险控制**,而不是盲目开仓。 - -## 🎯 核心理念 -**日内交易:快进快出 + 盈亏比第一 + 严控风险** - -## 🚨 反转信号处理(最高优先级!) - -### 系统会检测以下反转信号: -1. **RSI 背离**:价格创新高/低但 RSI 不创新高/低(权重2) -2. **MACD 柱状图缩短**:动能衰竭信号(权重1) -3. **MACD 金叉/死叉**:趋势反转信号(权重1) -4. **量价背离**:价格上涨但成交量下降(权重1) -5. **关键K线形态**:吞没、锤子线、十字星(权重1-2) -6. **多周期趋势不一致**:小周期反转但大周期未反应(权重1) - -### 🔴 当检测到反转信号时(必须遵守!): - -**1. 如果有同方向持仓 → 强制平仓** - - 检测到看跌反转 + 有做多持仓 → **CLOSE(立即平仓)** - - 检测到看涨反转 + 有做空持仓 → **CLOSE(立即平仓)** - - 不要等待反弹/回调,反转可能很快发生 - -**2. 严禁继续原方向开新仓** - - 检测到反转信号后 → **停止原方向任何新操作** - - 之前做多,现在检测到看跌反转 → **严禁开新多单** - - 之前做空,现在检测到看涨反转 → **严禁开新空单** - -**3. 可以考虑反手操作(仅当反转置信度 ≥ 70%)** - - FLIP_POSITION:平掉旧仓位 + 开立新方向仓位 - - 或者先平仓观望,等待反转确认后再入场 - -**4. 如果不确定 → 先平仓观望** - - 宁可错过机会,也不要被套在反向位置 - -## 🌅 趋势阶段处理(避免晚期被套) - -### 系统会判断趋势处于哪个阶段: -- **早期**:刚突破关键位,均线刚开始排列,动能开始释放 -- **中期**:均线排列稳定,价格沿趋势移动,量能健康 -- **晚期**:价格过度延伸,RSI极端区,量价背离 - -### 不同阶段的处理规则: - -**✅ 早期阶段(可积极入场):** -- 可以顺势轻仓入场 -- 设置止损后可持有更长时间 -- 目标可看更大空间(3-5%) - -**✅ 中期阶段(稳健持仓):** -- 等待回调/反弹入场 -- 顺势持仓,让利润奔跑 -- 不要被小波动洗出 - -**🔴 晚期阶段(强制谨慎!):** -- **严禁追涨/追空开新仓** -- **现有盈利持仓建议逐步止盈** -- **等待明确反转信号后再决策** -- 宁可错过最后一段利润,也不要被套 - -### ⚠️ 铁律:趋势晚期 + 检测到反转信号 = 立即平仓 -- 晚期阶段本身风险就大 -- 如果再检测到反转信号 → 必须立即平仓 -- 不要幻想"最后一段利润" - -### 🚨 盈亏比铁律(违反即拒绝) -**所有交易必须满足盈亏比 ≥ 1:1.2,回调入场≥ 1:1.5** - -``` -盈亏比 = (目标盈利 - 入场价) / (入场价 - 止损价) - -做多:盈亏比 = (止盈价 - 入场价) / (入场价 - 止损价) -做空:盈亏比 = (入场价 - 止盈价) / (止损价 - 入场价) - -示例(突破追涨): -- BTC 做多:入场 65000,止损 64500(-0.77%),止盈 65970(+1.5%) -- 盈亏比 = (65970 - 65000) / (65000 - 64500) = 970 / 500 ≈ 1.94 ✅ - -示例(回调做多): -- BTC 做多:入场 64800(回调),止损 64300(-0.77%),止盈 65800(+1.54%) -- 盈亏比 = (65800 - 64800) / (64800 - 64300) = 1000 / 500 = 2.0 ✅ - -如果盈亏比 < 1:1.2,绝对不要开仓! -``` - -### 日内交易参数 -| 参数 | 设定值 | -|------|--------| -| 止损幅度 | 优先 1.5×ATR(30m),参考范围 0.8-2% | -| 目标盈利 | 2-3%(日内快速获利,达到即走) | -| 盈亏比要求 | ≥ 1:1.2(回调入场≥1:1.5) | -| 单笔持仓时长 | 不超过4小时,达到目标立即平仓 | -| 仓位大小 | 轻仓为主(light/micro),禁止heavy | - -## 决策流程(必须按顺序执行) - -### 第一步:检查现有仓位和挂单(最重要!) -在考虑任何新操作之前,先分析当前状态: - -1. **是否有相同方向的持仓?** - - 如果有 → 考虑是继续持有、加仓、还是减仓 - - 如果没有 → 进入下一步检查 - -2. **是否有反向挂单需要取消?** - - 如果新信号是 buy → 检查是否有 sell 挂单需要取消 - - 如果新信号是 sell → 检查是否有 buy 挂单需要取消 - -3. **是否有同向挂单?** - - 挂单价格是否合理?是否需要调整? - - 是否距离新信号价格太近(< 2%)? - -### 第二步:根据现有状态决策 - -#### 情况A:有相同方向持仓 + 新信号同向 -**默认选择:HOLD(继续持有)** - -**只有在信号非常强烈时才考虑以下操作:** - -**1. 加仓(ADD)** - 必须同时满足: -- ✅ 新信号是 **A级**(confidence >= 90) -- ✅ 当前持仓盈利 >= 1.5%(但 < 2.5%,达到目标应该平仓) -- ✅ 新信号价格距离持仓价格 >= 1.5% -- ✅ 趋势在加强(突破加速,不是延续) -- ✅ 有足够的可用杠杆空间 -- ❌ **日内交易达到目标盈利(2-3%)后立即平仓,不考虑加仓** - -**2. 滚仓(CLOSE + 新开仓)** - 必须同时满足: -- ✅ 新信号是 **A级**(confidence >= 90) -- ✅ 新价格明显更优(距离当前价格 >= 3%) -- ✅ 可以显著改善风险收益比 -- ✅ 交易成本(手续费+滑点)可接受 - -**示例**: -``` -当前:BTC 做多持仓 @ $95,000(盈利+5%) -新信号:BTC 做多 @ $97,500(A级,90%置信度,趋势加速) - -分析: -- 价格距离 = (97500-95000)/95000 = 2.63% >= 1.5% -- 持仓盈利 = 5% >= 1.5%(但 < 2.5%,未达平仓线) -- A级信号,趋势在加速 -- 决策:ADD(加仓) -- 理由:A级信号,趋势加速,持仓盈利中,价格距离合适 -``` - -**示例2 - 滚仓**: -``` -当前:BTC 做多持仓 @ $95,000(浮亏-1%) -新信号:BTC 做多 @ $92,000(A级,92%置信度,强支撑位) - -分析: -- 价格距离 = (95000-92000)/95000 = 3.16% >= 3% -- 新价格在强支撑位,可改善入场成本 -- A级信号(92%置信度) -- 决策:CLOSE 当前持仓 + OPEN 新仓位 -- 理由:滚仓至更优价格,改善风险收益比 -``` - -**❌ 严禁**: -- 价格距离 < 2% 时加仓 -- 持仓亏损时加仓(摊平成本是坏习惯) -- 信号不是A级时加仓 -- 信号不是A级时滚仓 - -#### 情况B:有相同方向持仓 + 新信号反向 -**优先选择**: -1. **CLOSE(平仓)** - 如果趋势反转明确 -2. **REDUCE(减仓)** - 如果趋势不明但需降低风险 -3. **HOLD(观望)** - 如果反转信号不强 - -**特殊情况:检测到多级别反转信号(优先级最高!)** -- 如果信号中包含 **"🔄 强反转"** 或 **"⚡ 趋势启动"** 标记 -- 说明小级别已反转但大级别滞后,这是**提前布局的机会** -- **优先选择:FLIP_POSITION(反手操作)** - - 平掉当前持仓 - - 开立新方向仓位(如果信号质量足够) - - 理由:抓住小级别反转机会,避免等待大级别确认而错失最佳点位 - -#### 情况C:无持仓 + 有同向挂单 -**优先选择:OPEN(新增挂单)- 金字塔式布局** - -**核心原则:允许合理的多挂单策略** -- 同方向可以最多有3个挂单(金字塔布局) -- 新挂单价格与现有挂单价格差异 > 1% -- 不同价格位分散风险,提高成交概率 - -**允许新增挂单的条件**: -- ✅ 当前同向挂单数量 < 3个 -- ✅ 新信号入场价与所有现有挂单价格差异 > 1% -- ✅ 新信号置信度 >= 60(C级以上) -- ✅ 价格没有在快速加速移动 - -**示例1:新增第2个挂单** -``` -当前:BTC 做多挂单1 @ $94,000 -新信号:BTC 做多 @ $92,700(B级,75%置信度) - -分析: -- 价格差异:(94000-92700)/94000 = 1.38% > 1% -- 挂单数量:1个 < 3个 -- 决策:OPEN(新增挂单2 @ $92,700) -- 理由:金字塔布局,在不同价位布置挂单 -``` - -**示例2:新增第3个挂单** -``` -当前:BTC 做多挂单1 @ $94,000,挂单2 @ $92,700 -新信号:BTC 做多 @ $91,500(B级,70%置信度) - -分析: -- 新价格与挂单2差异:(92700-91500)/92700 = 1.29% > 1% -- 挂单数量:2个 < 3个 -- 决策:OPEN(新增挂单3 @ $91,500) -- 理由:金字塔布局,继续分散挂单位置 -``` - -**示例3:达到3个挂单上限** -``` -当前:BTC 做多挂单1 @ $94,000,挂单2 @ $92,700,挂单3 @ $91,500 -新信号:BTC 做多 @ $90,800(B级,70%置信度) - -分析: -- 已有3个挂单,达到上限 -- 决策:HOLD -- 理由:同向挂单已达3个上限,不再新增 -``` - -**示例4:价格差异太小** -``` -当前:BTC 做多挂单1 @ $94,000 -新信号:BTC 做多 @ $93,200(价格差异仅0.85%) - -分析: -- 价格差异:(94000-93200)/94000 = 0.85% < 1% -- 太接近现有挂单,没有意义 -- 决策:HOLD -- 理由:新价格与现有挂单太近,不新增 -``` - -**⚠️ 例外情况(保持 HOLD)**: -- 价格正在快速加速移动(5m 连续大阳/阴线) -- 新信号置信度 < 60(D级信号,质量太低) -- 新信号入场价距离当前价格 >= 2%(追涨杀跌风险) -- 新信号是 market 入场(改成 market 去追) -- 价格差异 < 1%(太接近现有挂单) - -**❌ 严禁**: -- 同向挂单数量 >= 3个时继续新增 -- 价格差异 < 1% 时新增挂单 -- 取消挂单后市价追涨/杀跌 - -#### 情况D:无持仓 + 有反向挂单 -**优先选择**: -1. **CANCEL_PENDING(取消反向挂单)** -2. 然后根据新信号决定是否开新仓 - -#### 情况E:完全无持仓无挂单 -**这时才考虑开新仓(OPEN)** - -### 第三步:开新仓的规则 -**金字塔式挂单策略**: -- 无持仓无挂单:可以开新仓 -- 无持仓 + 同向挂单 < 3个:可以新增挂单(价格差异 > 1%) -- 无持仓 + 同向挂单 >= 3个:不再新增 -- 有持仓 + 无同向挂单:可以考虑加仓 -- 有持仓 + 有同向挂单:优先持仓,挂单次之 - -**开新仓条件**: -- 信号质量足够高(confidence >= 60) -- 可用杠杆空间充足 -- 价格和止损合理 -- 不在价格加速移动中 - -## 🚨 铁律(违反即拒绝) - -### 1. 金字塔挂单规则(同方向) -- **最多3个挂单**:同一标的同一方向最多允许3个挂单 -- **价格差异 > 1%**:新挂单与现有挂单价格差异必须 > 1% -- **挂单优先**:优先使用 limit 挂单,慎用 market 市价 -- **不要重复**:价格差异 < 1% 时不要新增挂单 -- **有持仓时**:优先考虑持仓管理,挂单次之 -- **价格距离 < 2% 时不加仓**:与持仓价格太近时不加仓 - -### 2. 趋势与信号一致性 -| 当前趋势 | 信号方向 | 允许操作 | -|---------|---------|---------| -| `uptrend` (上升) | buy (做多) | ✅ 允许 | -| `uptrend` (上升) | sell (做空) | ❌ 禁止(除非有多重反转信号 + confidence >= 90) | -| `downtrend` (下降) | sell (做空) | ✅ 允许 | -| `downtrend` (下降) | buy (做多) | ❌ 禁止(除非有多重反转信号 + confidence >= 90) | -| `neutral` (震荡) | buy/sell | ✅ 允许但轻仓 | - -### 3. 取消挂单规则 -- **只能取消反向挂单**:buy信号取消sell挂单,sell信号取消buy挂单 -- **绝不取消同向挂单**:buy信号不应取消buy挂单 -- **绝不取消挂单去市价追涨**:这是持续止损的主要原因 -- **只能取消当前交易对的挂单**:不要取消其他交易对的订单 - -### 4. 价格加速检测规则(防止追涨杀跌) -**以下情况强制 HOLD,禁止任何操作**: -- ❌ 信号入场价距离当前价格 >= 2%(价格正在快速移动) -- ❌ 15m RSI > 70(多)或 < 30(空)(极端区间) -- ❌ 价格偏离 EMA5 > 1.5%(过度延伸) -- ❌ 5m 连续2根以上大阳/阴线(加速中) - -**价格加速时的操作原则**: -- ✅ 有持仓:继续持有,考虑止盈 -- ✅ 有挂单:等待成交,不要调整 -- ❌ 无持仓无挂单:强制 HOLD,不要追涨杀跌 - -## 仓位大小规则(日内交易保守策略) - -### 信号等级决定仓位上限 -- **A级(85-100分)**:medium/light/micro 可选 -- **B级(60-84分)**:只能 light/micro -- **C级(40-59分)**:只能 micro -- **D级(<40分)**:不开仓 - -### 趋势强度调整仓位 -| 趋势 | 顺势仓位 | 逆势仓位 | -|-----|---------|---------| -| strong | 100% | 禁止 | -| medium | 80% | 禁止 | -| weak | 60% | 禁止 | -| neutral | 50% | 30% | - -### 具体保证金金额(已降低风险) -- **medium**:账户余额 × 4%(原6%) -- **light**:账户余额 × 2%(原3%) -- **micro**:账户余额 × 1%(原1.5%) -- **heavy**:❌ 日内交易禁用 - -### ⚠️ 最小保证金要求(Bitget 合约限制) -**不同币种的最小保证金要求(10x 杠杆下)**: -- **BTC** (0.01 BTC/张): 最小保证金 $85 -- **ETH** (0.1 ETH/张): 最小保证金 $35 -- **SOL** (1 SOL/张): 最小保证金 $14 -- **其他币种**: 参考合约规格,确保保证金 × 杠杆 ≥ 1 张合约价值 - -**计算规则**: -``` -最小保证金 = (合约规格 × 当前价格) ÷ 杠杆倍数 -``` - -**必须确保**:计算的保证金金额 ≥ 该币种的最小保证金,否则无法开仓! - -### ⚠️ 日内交易特别规则 -- 达到目标盈利(2-3%)**立即平仓**,不贪婪 -- 持仓超过3小时**考虑强制平仓** -- 亏损达到止损**立即平仓**,不幻想 - -## 输出格式 -```json -{ - "decision": "OPEN/CLOSE/ADD/REDUCE/CANCEL_PENDING/HOLD", - "action": "buy/sell", - "entry_type": "market/limit", // market=现价入场, limit=挂单等待 - "quantity": 保证金金额(USDT), - "entry_price": 入场价格, - "stop_loss": 止损价格, - "take_profit": 止盈价格, - "orders_to_close": ["order_id_1"], // CLOSE/REDUCE 时指定要平仓的订单ID - "orders_to_cancel": ["order_id_1"], // CANCEL_PENDING 时指定要取消的订单ID - "reasoning": "决策理由(必须说明当前持仓/挂单状态以及为什么选择这个操作)", - "risk_analysis": "风险分析" -} -``` - -**重要提示**: -- 当 `decision` 为 `CLOSE` 时,**必须**在 `orders_to_close` 中指定要平仓的订单ID列表 -- 当 `decision` 为 `CANCEL_PENDING` 时,**必须**在 `orders_to_cancel` 中指定要取消的订单ID列表 -- 同方向允许最多3个挂单(金字塔布局),价格差异需 > 1% -- 如果需要平仓所有持仓,`orders_to_close` 应包含所有持仓订单的ID - -## 决策示例 - -### 示例1:有持仓 + 同向信号 - 普通情况(HOLD) -``` -当前状态:BTC 做多持仓 @ $95,000(盈利+3%) -新信号:BTC 做多 @ $96,500(confidence 75%,B级) - -分析: -- 价格距离 = (96500-95000)/95000 = 1.58% < 2% -- 信号是B级,不是A级 -- 决策:HOLD(继续持有) -- 理由:价格距离过近且信号不是A级,继续持有即可 -``` - -### 示例2:有持仓 + 同向信号 - A级信号加仓 -``` -当前状态:BTC 做多持仓 @ $95,000(盈利+5%) -新信号:BTC 做多 @ $98,000(confidence 90%,A级,趋势加速) - -分析: -- 价格距离 = (98000-95000)/95000 = 3.16% >= 2% -- 持仓盈利 = 5% >= 2% -- A级信号,趋势在加速 -- 决策:ADD(加仓) -- 理由:A级信号,趋势加速,持仓盈利中,价格距离合适 -``` - -### 示例3:有持仓 + 同向信号 - 滚仓 -``` -当前状态:BTC 做多持仓 @ $95,000(浮亏-1%) -新信号:BTC 做多 @ $92,000(confidence 95%,A级,强支撑位反弹) - -分析: -- 新价格在强支撑位,可显著改善入场成本 -- 价格距离 = (95000-92000)/95000 = 3.16% >= 3% -- A级信号(95%置信度) -- 决策:CLOSE 当前持仓 + OPEN 新仓位 @ $92,000 -- 理由:滚仓至更优价格,改善风险收益比 -``` - -### 示例4:有持仓 + 反向信号 -``` -当前状态:BTC 做多持仓 @ $95,000(亏损-1%),订单ID: ord_123 -新信号:BTC 做空 @ $94,500(confidence 85%,趋势反转) - -分析: -- 趋势已明确反转 -- 决策:CLOSE(平仓止损) -- orders_to_close: ["ord_123"] -- 理由:趋势反转,及时止损 -``` - -### 示例5:有挂单 + 同向信号 - 金字塔布局(OPEN) -``` -当前状态:BTC 做多挂单1 @ $94,500(未成交),订单ID: ord_456 -新信号:BTC 做多 @ $93,500(confidence 75%,B级,回调至支撑位) - -分析: -- 同向挂单数量:1个 < 3个 -- 价格差异:(94500-93500)/94500 = 1.06% > 1% -- 满足金字塔布局条件 -- 决策:OPEN(新增挂单2) -- 理由:金字塔布局,在不同价位布置挂单,提高成交概率 -``` - -### 示例6:有挂单 + 同向信号 - 价格差异太小(HOLD) -``` -当前状态:BTC 做多挂单1 @ $94,500,挂单2 @ $93,500 -新信号:BTC 做多 @ $93,000(confidence 70%,B级) - -分析: -- 同向挂单数量:2个 < 3个 -- 新价格与挂单2差异:(93500-93000)/93500 = 0.53% < 1% -- 太接近现有挂单 -- 决策:HOLD -- 理由:新价格与现有挂单太近,不新增 -``` - -### 示例7:有挂单 + 同向信号 - 达到上限(HOLD) -``` -当前状态:BTC 做多挂单1 @ $94,500,挂单2 @ $93,500,挂单3 @ $92,500 -新信号:BTC 做多 @ $91,500(confidence 70%,B级) - -分析: -- 同向挂单数量:3个(已达上限) -- 决策:HOLD -- 理由:同向挂单已达3个上限,不再新增 -``` - -### 示例8:有挂单 + 同向信号 - 价格加速(HOLD) -``` -当前状态:BTC 做多挂单 @ $94,000(未成交) -新信号:BTC 做多 @ $97,000(confidence 85%,B级,突破) - -分析: -- 价格快速移动($94,000 → $97,000,涨幅3.2%) -- 5m 连续2根大阳线,正在加速 -- 新信号入场价距离当前价格 >= 2% -- 决策:HOLD(不要追涨) -- 理由:价格正在加速,禁止追涨杀跌,等待回调 -``` - -### 示例9:完全无持仓无挂单 -``` -当前状态:无持仓,无挂单 -新信号:BTC 做多 @ $95,000(confidence 80%,uptrend) - -分析: -- 满足开新仓的所有条件 -- 决策:OPEN(开仓) -- 理由:首次入场,信号质量高,趋势向好 -``` - -## 杠杆和风险控制 -- **最大杠杆 20 倍**:最大仓位金额 = 账户余额 × 20 -- **当前杠杆**:当前杠杆 = 当前持仓价值 / 账户余额 -- **可用杠杆空间百分比**:(最大仓位金额 - 当前持仓价值) / 最大仓位金额 × 100% -- **可用杠杆空间 >= 3%** 才能开新仓 - -## 输出格式要求 -```json -{ - "decision": "OPEN/CLOSE/ADD/REDUCE/CANCEL_PENDING/HOLD", - "action": "buy/sell", - "quantity": 保证金金额(USDT), - "entry_price": 入场价格, - "stop_loss": 止损价格, - "take_profit": 止盈价格, - "orders_to_close": ["order_id_1"], // CLOSE/REDUCE 时指定要平仓的订单ID - "orders_to_cancel": ["order_id_1"], // CANCEL_PENDING 时指定要取消的订单ID - "reasoning": "决策理由(必须说明当前持仓/挂单状态以及为什么选择这个操作)", - "risk_analysis": "风险分析" -} -``` - -**重要提示**: -- 当 `decision` 为 `CLOSE` 时,**必须**在 `orders_to_close` 中指定要平仓的订单ID列表 -- 当 `decision` 为 `CANCEL_PENDING` 时,**必须**在 `orders_to_cancel` 中指定要取消的订单ID列表 - -## 入场价格选择策略 - -- 使用信号中的 `entry_price` 作为入场价格 -- 如果选择 `limit`(挂单)方式,等待价格达到 `entry_price` 时成交 - -## 重要原则 -1. **仓位管理优先**:先管理现有仓位,再考虑开新仓 -2. **避免重复开仓**:同一标的同一方向最多1个持仓 + 1个挂单 -3. **安全第一**:宁可错过机会,也不要冒过大风险 -4. **遵守杠杆限制**:总杠杆永远不超过 20 倍 -5. **理性决策**:不要被 FOMO 情绪左右 - -记住:你是仓位管理者,不是信号执行器。你的首要任务是管理好现有仓位! -""" - - def __init__(self, leverage: int = 20, max_total_leverage: float = 10): - """ - 初始化交易决策器 - - Args: - leverage: 单笔订单杠杆倍数 - max_total_leverage: 总杠杆上限(持仓+挂单,倍数) - """ - self.leverage = leverage - self.max_total_leverage = max_total_leverage - - async def make_decision(self, - market_signal: Dict[str, Any], - positions: List[Dict[str, Any]], - account: Dict[str, Any], - current_price: float = None, - pending_orders: List[Dict[str, Any]] = None) -> Dict[str, Any]: - """ - 做出交易决策 - - Args: - market_signal: 市场信号(来自 MarketSignalAnalyzer) - positions: 当前持仓列表 - account: 账户状态 - current_price: 当前价格(用于判断入场方式) - pending_orders: 未成交的挂单列表 - - Returns: - 交易决策字典 - """ - try: - # 1. 准备决策上下文 - decision_context = self._prepare_decision_context( - market_signal, positions, account, current_price, pending_orders or [] - ) - - # 2. 构建提示词 - prompt = self._build_decision_prompt(decision_context) - - # 3. 调用 LLM 做决策 - messages = [ - {"role": "system", "content": self.TRADING_DECISION_PROMPT}, - {"role": "user", "content": prompt} - ] - response = await llm_service.achat(messages) - - # 4. 解析结果 - result = self._parse_decision_response(response, market_signal['symbol']) - - # 5. 验证决策安全性 - result = self._validate_decision( - result, positions, account, - pending_orders=pending_orders or [], - market_signal=market_signal - ) - - return result - - except Exception as e: - logger.error(f"交易决策失败: {e}") - import traceback - logger.debug(traceback.format_exc()) - return self._get_hold_decision(market_signal['symbol'], "决策系统异常") - - def _prepare_decision_context(self, - market_signal: Dict[str, Any], - positions: List[Dict[str, Any]], - account: Dict[str, Any], - current_price: float = None, - pending_orders: List[Dict[str, Any]] = None) -> Dict[str, Any]: - """准备决策上下文""" - context = { - 'symbol': market_signal.get('symbol'), - 'market_state': market_signal.get('market_state'), - 'trend': market_signal.get('trend'), - 'trend_direction': market_signal.get('trend_direction'), # 新增:趋势方向 - 'trend_strength': market_signal.get('trend_strength'), # 新增:趋势强度 - 'signals': market_signal.get('signals', []), - 'key_levels': market_signal.get('key_levels', {}), - 'positions': positions, - 'pending_orders': pending_orders or [], # 新增:挂单列表 - 'account': account, - 'current_price': current_price - } - - # 计算账户状态 - balance = float(account.get('current_balance', 0)) - total_position_value = float(account.get('total_position_value', 0)) - used_margin = float(account.get('used_margin', 0)) - - # 当前杠杆(使用配置的杠杆值) - max_leverage = self.leverage - max_position_value = balance * max_leverage # 最大仓位金额 - current_leverage = (total_position_value / balance) if balance > 0 else 0 - available_position_value = max(0, max_position_value - total_position_value) # 剩余可用仓位金额 - available_leverage_percent = (available_position_value / max_position_value * 100) if max_position_value > 0 else 0 # 可用杠杆空间百分比 - - context['leverage_info'] = { - 'balance': balance, - 'current_leverage': current_leverage, - 'total_position_value': total_position_value, - 'max_position_value': max_position_value, - 'available_position_value': available_position_value, - 'available_leverage_percent': available_leverage_percent, - 'max_leverage': max_leverage - } - - # 价格距离检查信息(用于 LLM 判断) - context['price_distance_check'] = { - 'enabled': True, - 'min_distance_percent': 2.0, # 最小价格距离 2% - 'no_exception': True # 没有例外情况 - } - - return context - - def _build_decision_prompt(self, context: Dict[str, Any]) -> str: - """构建决策提示词""" - prompt_parts = [] - - # ============================================================ - # 第一步:仓位管理决策流程摘要(最优先!) - # ============================================================ - prompt_parts.append("="*60) - prompt_parts.append("## 🎯 仓位管理决策流程(按顺序执行)") - prompt_parts.append("="*60) - - positions = context.get('positions', []) - pending_orders = context.get('pending_orders', []) - signals = context.get('signals', []) - - # 分析当前状态 - has_positions = len([p for p in positions if p.get('symbol') == context['symbol']]) > 0 - has_pending = len([o for o in pending_orders if o.get('symbol') == context['symbol']]) > 0 - - # 检查是否有强烈信号 - strong_signals = [s for s in signals if s.get('confidence', 0) >= 90] - has_strong_signal = len(strong_signals) > 0 - - if has_positions: - prompt_parts.append("📊 当前状态:**有持仓**") - prompt_parts.append("") - prompt_parts.append("决策优先级:") - prompt_parts.append("1️⃣ 首先检查是否需要平仓/减仓(信号反向或趋势减弱)") - if has_strong_signal: - prompt_parts.append("2️⃣ 然后检查是否需要加仓/滚仓(**有A级信号**,价格距离 >= 1.5%)") - prompt_parts.append(" ⭐ A级信号(confidence >= 90)可考虑加仓或滚仓") - else: - prompt_parts.append("2️⃣ 然后检查是否需要加仓(价格距离 >= 1.5%,盈利 >= 1.5%)") - prompt_parts.append(" ⚠️ 当前信号不是A级,不建议加仓") - prompt_parts.append("3️⃣ ❌ 不要开新仓(已有持仓时优先管理现有仓位)") - prompt_parts.append("") - prompt_parts.append("⚠️ 严禁重复开仓!在有持仓时只选择 HOLD/ADD/CLOSE/REDUCE") - - elif has_pending: - prompt_parts.append("📝 当前状态:**有挂单,无持仓**") - prompt_parts.append("") - prompt_parts.append("决策优先级:") - prompt_parts.append("1️⃣ 首先检查是否需要取消挂单(信号反向或价格不优)") - if has_strong_signal: - prompt_parts.append("2️⃣ 然后检查是否需要调整挂单或现价入场(**有A级信号**)") - prompt_parts.append(" ⭐ A级信号(confidence >= 90)可考虑取消挂单现价入场") - else: - prompt_parts.append("2️⃣ 然后检查是否需要调整挂单价格") - prompt_parts.append(" ⚠️ 当前信号不是A级,不建议调整挂单") - prompt_parts.append("3️⃣ ❌ 不要开新仓(已有挂单时等待成交或调整)") - prompt_parts.append("") - prompt_parts.append("⚠️ 严禁重复挂单!在有挂单时只选择 HOLD/CANCEL_PENDING") - - else: - prompt_parts.append("✨ 当前状态:**完全无持仓无挂单**") - prompt_parts.append("") - prompt_parts.append("决策优先级:") - prompt_parts.append("1️⃣ 这时才考虑开新仓(OPEN)") - prompt_parts.append("2️⃣ 必须满足所有开仓条件(信号质量、杠杆空间、价格合理)") - if not has_strong_signal: - prompt_parts.append(" ⚠️ 当前信号不是A级,建议轻仓") - prompt_parts.append("") - prompt_parts.append("✅ 可以开新仓,但必须谨慎评估") - - prompt_parts.append("") - prompt_parts.append("="*60) - prompt_parts.append("") - - # 市场信号 - prompt_parts.append(f"## 市场信号") - prompt_parts.append(f"交易对: {context['symbol']}") - prompt_parts.append(f"市场状态: {context.get('market_state')}") - - # 趋势信息(新增) - trend_direction = context.get('trend_direction', 'neutral') - trend_strength = context.get('trend_strength', 'weak') - direction_text = {'uptrend': '📈 上升趋势', 'downtrend': '📉 下降趋势', 'neutral': '➖ 震荡'}.get(trend_direction, trend_direction) - strength_text = {'strong': '强势', 'medium': '中等', 'weak': '弱势'}.get(trend_strength, trend_strength) - prompt_parts.append(f"趋势: {direction_text} ({strength_text})") - - # 当前价格(如果有) - current_price = context.get('current_price') - if current_price: - prompt_parts.append(f"当前价格: ${current_price:,.2f}") - - # 信号列表 - signals = context.get('signals', []) - if signals: - prompt_parts.append(f"\n## 信号列表") - for i, sig in enumerate(signals, 1): - # timeframe 是 short_term/medium_term/long_term - timeframe = sig.get('timeframe', 'N/A') - action = sig.get('action', 'N/A') - prompt_parts.append(f"{i}. {timeframe} | {action}") - prompt_parts.append(f" 信心度: {sig.get('confidence', 0)}") - - # 添加入场价格信息 - entry_price = sig.get('entry_price') - if entry_price: - prompt_parts.append(f" 建议入场价: ${entry_price:,.2f}") - - prompt_parts.append(f" 理由: {sig.get('reasoning', 'N/A')}") - - # 趋势一致性检查(新增) - trend_direction = context.get('trend_direction', 'neutral') - trend_strength = context.get('trend_strength', 'weak') - prompt_parts.append(f"\n## 🚨 趋势一致性检查(第一优先级)") - - for i, sig in enumerate(signals, 1): - action = sig.get('action', 'hold') - is_aligned = (trend_direction == 'uptrend' and action == 'buy') or \ - (trend_direction == 'downtrend' and action == 'sell') or \ - (trend_direction == 'neutral') - - if is_aligned: - prompt_parts.append(f"✅ 信号#{i} ({action}) 与趋势 ({trend_direction}) 一致 → 可正常开仓") - else: - # 逆势信号 - if trend_strength == 'strong': - prompt_parts.append(f"❌ 信号#{i} ({action}) 与强趋势 ({trend_direction}) 相反 → **严禁逆势,返回 HOLD**") - elif trend_strength == 'medium': - confidence = sig.get('confidence', 0) - if confidence >= 90: - prompt_parts.append(f"⚠️ 信号#{i} ({action}) 与中等趋势相反,但 confidence={confidence}>=85 → 可谨慎 micro 仓位") - else: - prompt_parts.append(f"❌ 信号#{i} ({action}) 与中等趋势相反,confidence不足 → 返回 HOLD") - else: # weak or neutral - confidence = sig.get('confidence', 0) - if confidence >= 90: - prompt_parts.append(f"⚠️ 信号#{i} ({action}) 与弱趋势相反,但 confidence={confidence}>=85 → 可 micro 仓位") - else: - prompt_parts.append(f"❌ 信号#{i} ({action}) 与弱趋势相反,confidence不足 → 返回 HOLD") - - # 关键价位 - key_levels = context.get('key_levels', {}) - if key_levels: - prompt_parts.append(f"\n## 关键价位") - if key_levels.get('support'): - # 提取数字并格式化 - import re - def extract_num(val): - if isinstance(val, (int, float)): - return float(val) - if isinstance(val, str): - match = re.search(r'[\d,]+\.?\d*', val.replace(',', '')) - if match: - return float(match.group()) - return None - - supports = [extract_num(s) for s in key_levels['support'][:3]] - supports_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None]) - prompt_parts.append(f"支撑位: {supports_str}") - if key_levels.get('resistance'): - import re - def extract_num(val): - if isinstance(val, (int, float)): - return float(val) - if isinstance(val, str): - match = re.search(r'[\d,]+\.?\d*', val.replace(',', '')) - if match: - return float(match.group()) - return None - - resistances = [extract_num(r) for r in key_levels['resistance'][:3]] - resistances_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None]) - prompt_parts.append(f"阻力位: {resistances_str}") - - # 当前持仓 - positions = context.get('positions', []) - prompt_parts.append(f"\n## 当前持仓") - if positions: - for pos in positions: - if pos.get('holding', 0) > 0: - prompt_parts.append(f"- {pos.get('symbol')}: {pos.get('side')} {pos.get('holding')} USDT") - prompt_parts.append(f" 开仓价: ${pos.get('entry_price')}") - prompt_parts.append(f" 止损: ${pos.get('stop_loss')}") - prompt_parts.append(f" 止盈: ${pos.get('take_profit')}") - else: - prompt_parts.append("无持仓") - - # 当前挂单 - pending_orders = context.get('pending_orders', []) - prompt_parts.append(f"\n## 当前挂单(仅 {context['symbol']} 的挂单)") - if pending_orders: - prompt_parts.append(f"⚠️ 重要:以下挂单都属于当前交易对 {context['symbol']},取消订单时只能选择这些订单ID") - prompt_parts.append(f"⚠️ 取消规则:做空信号时只能取消做多(🟢long)挂单,做多信号时只能取消做空(🔴short)挂单") - - # 分类统计挂单方向 - long_orders = [o for o in pending_orders if o.get('side') == 'long'] - short_orders = [o for o in pending_orders if o.get('side') == 'short'] - - # 如果只有同向挂单,明确提示LLM - signals = context.get('signals', []) - if signals: - main_action = signals[0].get('action', 'hold') if signals else 'hold' - if main_action == 'sell' and not long_orders: - prompt_parts.append(f"📌 注意:当前只有做空挂单,与做空信号同向,无需取消!") - elif main_action == 'buy' and not short_orders: - prompt_parts.append(f"📌 注意:当前只有做多挂单,与做多信号同向,无需取消!") - - for order in pending_orders: - side_icon = "🟢" if order.get('side') == 'long' else "🔴" - entry_type = "现价单" if order.get('entry_type') == 'market' else "挂单" - side_text = "做多" if order.get('side') == 'long' else "做空" - prompt_parts.append(f"- {side_icon} {order.get('symbol')}: {side_text}({order.get('side')}) | {entry_type}") - prompt_parts.append(f" 挂单价: ${order.get('entry_price')} | 数量: {order.get('quantity')} USDT") - prompt_parts.append(f" 订单ID: {order.get('order_id')}") - else: - prompt_parts.append("无挂单") - - # 账户状态 - account = context.get('account', {}) - lev_info = context.get('leverage_info', {}) - prompt_parts.append(f"\n## 账户状态") - prompt_parts.append(f"余额: ${account.get('current_balance', 0):.2f}") - prompt_parts.append(f"可用: ${account.get('available', 0):.2f}") - prompt_parts.append(f"已用保证金: ${account.get('used_margin', 0):.2f}") - prompt_parts.append(f"持仓价值: ${account.get('total_position_value', 0):.2f}") - prompt_parts.append(f"\n## 杠杆信息") - prompt_parts.append(f"当前杠杆: {lev_info.get('current_leverage', 0):.1f}x") - prompt_parts.append(f"最大仓位金额: ${lev_info.get('max_position_value', 0):,.2f}") - prompt_parts.append(f"可用仓位金额: ${lev_info.get('available_position_value', 0):,.2f}") - prompt_parts.append(f"可用杠杆空间: {lev_info.get('available_leverage_percent', 0):.1f}%") - prompt_parts.append(f"最大杠杆限制: {lev_info.get('max_leverage', 20)}x") - - # 价格距离检查规则 - price_check = context.get('price_distance_check', {}) - if price_check.get('enabled'): - min_distance = price_check.get('min_distance_percent', 2) - prompt_parts.append(f"\n## 价格距离限制(必须遵守)") - prompt_parts.append(f"⚠️ 重要:如果有相同方向的持仓/挂单,价格距离必须 >= {min_distance}%") - prompt_parts.append(f"- 低于此距离不开新仓,避免风险过度集中") - prompt_parts.append(f"- 此规则**没有例外**,无论信号等级多高都必须遵守") - - # 价格加速检测规则(新增 - 防止追涨杀跌) - prompt_parts.append(f"\n## 🚨 价格加速检测(防止追涨杀跌)") - prompt_parts.append(f"**以下情况强制 HOLD,禁止任何新开仓操作**:") - prompt_parts.append(f"1. 信号入场价距离当前价格 >= 2% → 价格正在快速移动,不要追") - prompt_parts.append(f"2. 15m RSI > 70(多)或 < 30(空)→ 极端区间,不要追") - prompt_parts.append(f"3. 价格偏离 EMA5 > 1.5% → 过度延伸,不要追") - prompt_parts.append(f"4. 5m 连续2根以上大阳/阴线 → 加速中,不要追") - prompt_parts.append(f"") - prompt_parts.append(f"**记住:追涨杀跌是持续止损的主要原因!宁可错过,不做错!**") - - # 计算并显示当前价格距离 - current_price = context.get('current_price') - signals = context.get('signals', []) - positions = context.get('positions', []) - pending_orders = context.get('pending_orders', []) - - if signals and current_price: - for sig in signals: - sig_action = sig.get('action') - sig_entry = sig.get('entry_price') - - if sig_action and sig_entry: - try: - sig_entry = float(sig_entry) - prompt_parts.append(f"\n当前信号 {sig_action} @ ${sig_entry:,.2f} 的价格距离检查:") - - # 检查持仓 - for pos in positions: - if pos.get('symbol') == context['symbol']: - pos_side = pos.get('side') - if (sig_action == 'buy' and pos_side == 'long') or (sig_action == 'sell' and pos_side == 'short'): - pos_entry = float(pos.get('entry_price', 0)) - distance = abs(sig_entry - pos_entry) / pos_entry * 100 - status = "✅ 通过" if distance >= min_distance else f"❌ 拒绝 (距离 {distance:.2f}% < {min_distance}%)" - prompt_parts.append(f" - 持仓 {pos_side} @ ${pos_entry:,.2f}: 距离 {distance:.2f}% {status}") - - # 检查挂单 - for order in pending_orders: - if order.get('symbol') == context['symbol']: - order_side = order.get('side') - if (sig_action == 'buy' and order_side == 'long') or (sig_action == 'sell' and order_side == 'short'): - order_entry = float(order.get('entry_price', 0)) - distance = abs(sig_entry - order_entry) / order_entry * 100 - status = "✅ 通过" if distance >= min_distance else f"❌ 拒绝 (距离 {distance:.2f}% < {min_distance}%)" - prompt_parts.append(f" - 挂单 {order_side} @ ${order_entry:,.2f}: 距离 {distance:.2f}% {status}") - - except (ValueError, TypeError): - pass - - # 盈亏比检查规则(新增) - prompt_parts.append(f"\n## 盈亏比检查(日内交易铁律)") - prompt_parts.append(f"⚠️ 所有交易必须满足盈亏比 >= 1:1.2,回调入场>= 1:1.5") - prompt_parts.append(f"") - prompt_parts.append(f"盈亏比计算公式:") - prompt_parts.append(f" 做多盈亏比 = (止盈价 - 入场价) / (入场价 - 止损价)") - prompt_parts.append(f" 做空盈亏比 = (入场价 - 止盈价) / (止损价 - 入场价)") - prompt_parts.append(f"") - prompt_parts.append(f"⚠️ 如果盈亏比 < 1:1.2,**不要开仓(返回 HOLD)**") - - # 计算并显示盈亏比 - signals = context.get('signals', []) - if signals: - for sig in signals: - action = sig.get('action') - entry = sig.get('entry_price') - stop_loss = sig.get('stop_loss') - take_profit = sig.get('take_profit') - - if action and entry and stop_loss and take_profit: - try: - entry = float(entry) - stop_loss = float(stop_loss) - take_profit = float(take_profit) - - if action == 'buy': - risk_ratio = entry - stop_loss - reward_ratio = take_profit - entry - elif action == 'sell': - risk_ratio = stop_loss - entry - reward_ratio = entry - take_profit - else: - continue - - if risk_ratio > 0 and reward_ratio > 0: - rr_ratio = reward_ratio / risk_ratio - rr_percent = (risk_ratio / entry) * 100 - - if rr_ratio >= 1.5: - status = f"✅ 通过 (1:{rr_ratio:.1f})" - else: - status = f"❌ 拒绝 (1:{rr_ratio:.1f} < 1:1.2)" - - prompt_parts.append(f"\n信号 {action} @ ${entry:,.2f}:") - prompt_parts.append(f" - 止损: ${stop_loss:,.2f} (风险 {risk_ratio:.0f} / {rr_percent:.1f}%)") - prompt_parts.append(f" - 止盈: ${take_profit:,.2f} (盈利 {reward_ratio:.0f})") - prompt_parts.append(f" - 盈亏比: 1:{rr_ratio:.2f} {status}") - - except (ValueError, TypeError, ZeroDivisionError): - pass - - prompt_parts.append(f"\n请根据以上信息,做出交易决策。") - - return "\n".join(prompt_parts) - - def _parse_decision_response(self, response: str, symbol: str) -> Dict[str, Any]: - """解析决策响应""" - try: - import re - - # 尝试提取 JSON - json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) - if json_match: - json_str = json_match.group(1) - else: - json_match = re.search(r'\{[\s\S]*\}', response) - if json_match: - json_str = json_match.group(0) - else: - raise ValueError("无法找到 JSON 响应") - - # 清理 JSON 字符串 - json_str = self._clean_json_string(json_str) - - result = json.loads(json_str) - - # 清理价格字段 - 转换为 float - result = self._clean_price_fields(result) - - # 添加元数据 - result['symbol'] = symbol - result['timestamp'] = datetime.now().isoformat() - result['raw_response'] = response - - logger.info(f"✅ 交易决策完成: {symbol} | {result.get('decision', 'HOLD')}") - - return result - - except Exception as e: - logger.warning(f"解析决策响应失败: {e}") - logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符 - return self._get_hold_decision(symbol, "解析失败,默认观望") - - def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: - """清理价格字段,转换为 float""" - def clean_price(price_value): - if price_value is None: - return None - if isinstance(price_value, (int, float)): - return float(price_value) - if isinstance(price_value, str): - # 移除 $ 符号和逗号 - cleaned = price_value.replace('$', '').replace(',', '').strip() - if cleaned: - try: - return float(cleaned) - except ValueError: - return None - return None - - # 清理顶层价格字段 - price_fields = ['stop_loss', 'take_profit', 'quantity'] - for field in price_fields: - if field in data: - data[field] = clean_price(data[field]) - - # 验证止损止盈价格的合理性 - data = self._validate_price_fields(data) - - return data - - def _validate_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: - """验证止损止盈价格的合理性,拒绝明显错误的值""" - entry = data.get('entry_price') - stop_loss = data.get('stop_loss') - take_profit = data.get('take_profit') - action = data.get('decision', '') # OPEN/CLOSE/HOLD - - if not entry or entry <= 0: - return data - - # 判断是做多还是做空 - is_long = action == 'OPEN' and data.get('action') == 'buy' - is_short = action == 'OPEN' and data.get('action') == 'sell' - - # 检查止损价格是否合理(偏离入场价不超过 50%) - MAX_REASONABLE_DEVIATION = 0.50 # 50% - - if stop_loss is not None: - deviation = abs(stop_loss - entry) / entry - # 如果止损价格偏离入场价超过 50%,认为是错误的 - if deviation > MAX_REASONABLE_DEVIATION: - logger.warning(f"⚠️ 止损价格不合理: entry={entry}, stop_loss={stop_loss}, 偏离={deviation*100:.1f}%,已忽略") - data['stop_loss'] = None - else: - # 做多:止损应该低于入场价 - if is_long and stop_loss >= entry: - logger.warning(f"⚠️ 做多止损错误: entry={entry}, stop_loss={stop_loss} 应该 < entry,已忽略") - data['stop_loss'] = None - # 做空:止损应该高于入场价 - elif is_short and stop_loss <= entry: - logger.warning(f"⚠️ 做空止损错误: entry={entry}, stop_loss={stop_loss} 应该 > entry,已忽略") - data['stop_loss'] = None - - if take_profit is not None: - deviation = abs(take_profit - entry) / entry - # 如果止盈价格偏离入场价超过 50%,认为是错误的 - if deviation > MAX_REASONABLE_DEVIATION: - logger.warning(f"⚠️ 止盈价格不合理: entry={entry}, take_profit={take_profit}, 偏离={deviation*100:.1f}%,已忽略") - data['take_profit'] = None - else: - # 做多:止盈应该高于入场价 - if is_long and take_profit <= entry: - logger.warning(f"⚠️ 做多止盈错误: entry={entry}, take_profit={take_profit} 应该 > entry,已忽略") - data['take_profit'] = None - # 做空:止盈应该低于入场价 - elif is_short and take_profit >= entry: - logger.warning(f"⚠️ 做空止盈错误: entry={entry}, take_profit={take_profit} 应该 < entry,已忽略") - data['take_profit'] = None - - return data - - def _clean_json_string(self, json_str: str) -> str: - """清理 JSON 字符串,移除可能导致解析错误的内容""" - import re - # 移除单行注释 // ... - json_str = re.sub(r'//.*?(?=\n|$)', '', json_str) - # 移除多行注释 /* ... */ - json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str) - # 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}) - json_str = re.sub(r',\s*([}\]])', r'\1', json_str) - return json_str - - def _validate_decision(self, decision: Dict[str, Any], - positions: List[Dict[str, Any]], - account: Dict[str, Any], - pending_orders: List[Dict[str, Any]] = None, - market_signal: Dict[str, Any] = None) -> Dict[str, Any]: - """验证决策安全性""" - # 检查杠杆限制 - if decision.get('decision') in ['OPEN', 'ADD']: - balance = float(account.get('current_balance', 0)) - total_position_value = float(account.get('total_position_value', 0)) - max_leverage = self.leverage # 使用配置的杠杆值 - max_position_value = balance * max_leverage - - # quantity 是保证金金额,需要乘以杠杆得到持仓价值 - margin = float(decision.get('quantity', 0)) - position_value = margin * max_leverage # 使用最大杠杆计算持仓价值 - new_total_value = total_position_value + position_value - - if new_total_value > max_position_value: - logger.warning(f"⚠️ 决策被拒绝: 超过最大仓位金额 (保证金 ${margin:.2f} → 持仓价值 ${position_value:.2f}, 总计 ${new_total_value:,.2f} > ${max_position_value:,.2f})") - return self._get_hold_decision( - decision['symbol'], - f"超过最大仓位金额 (保证金 ${margin:.2f} → 持仓价值 ${position_value:.2f}, 总计 ${new_total_value:,.2f} > ${max_position_value:,.2f})" - ) - - # 最小保证金检查(Bitget 合约限制) - # 不同币种的最小保证金要求(10x 杠杆下) - MIN_MARGIN_REQUIREMENTS = { - 'BTC': 85, # 0.01 BTC/张 ≈ $850 - 'ETH': 35, # 0.1 ETH/张 ≈ $350 - 'SOL': 14, # 1 SOL/张 ≈ $140 - 'BNB': 7, # 0.1 BNB/张 ≈ $70 - 'XRP': 10, # 10 XRP/张 ≈ $100 - 'DOGE': 8, # 100 DOGE/张 ≈ $80 - 'ADA': 8, # 10 ADA/张 ≈ $80 (估计) - 'AVAX': 10, # 1 AVAX/张 ≈ $100 - 'LINK': 8, # 1 LINK/张 ≈ $80 - 'DOT': 5, # 1 DOT/张 ≈ $50 - 'MATIC': 8, # 10 MATIC/张 ≈ $80 - 'POL': 8, # 10 POL/张 ≈ $80 - } - - symbol = decision.get('symbol', '').replace('USDT', '').upper() - min_margin = MIN_MARGIN_REQUIREMENTS.get(symbol, 10) # 默认最小 $10 - - if margin > 0 and margin < min_margin: - logger.warning(f"⚠️ {symbol} 保证金不足: ${margin:.2f} < 最小要求 ${min_margin:.2f}") - logger.info(f" 自动调整保证金: ${margin:.2f} → ${min_margin:.2f}") - decision['quantity'] = min_margin - logger.info(f" ✅ 保证金已调整为最小值: ${min_margin:.2f}") - - # 盈亏比检查:所有交易必须满足盈亏比 >= 1:1.2 - action = decision.get('action', '') - entry_price = decision.get('entry_price') - stop_loss = decision.get('stop_loss') - take_profit = decision.get('take_profit') - - if action and entry_price and stop_loss and take_profit: - try: - entry_price = float(entry_price) - stop_loss = float(stop_loss) - take_profit = float(take_profit) - - # 计算盈亏比 - if action == 'buy': - risk = entry_price - stop_loss - reward = take_profit - entry_price - elif action == 'sell': - risk = stop_loss - entry_price - reward = entry_price - take_profit - else: - risk = 0 - reward = 0 - - # 验证价格方向正确性 - if action == 'buy': - if stop_loss >= entry_price: - logger.warning(f"⚠️ 决策被拒绝: 做多止损错误 (entry={entry_price}, stop_loss={stop_loss} 应该 < entry)") - return self._get_hold_decision(decision['symbol'], f"做多止损价格错误") - if take_profit <= entry_price: - logger.warning(f"⚠️ 决策被拒绝: 做多止盈错误 (entry={entry_price}, take_profit={take_profit} 应该 > entry)") - return self._get_hold_decision(decision['symbol'], f"做多止盈价格错误") - elif action == 'sell': - if stop_loss <= entry_price: - logger.warning(f"⚠️ 决策被拒绝: 做空止损错误 (entry={entry_price}, stop_loss={stop_loss} 应该 > entry)") - return self._get_hold_decision(decision['symbol'], f"做空止损价格错误") - if take_profit >= entry_price: - logger.warning(f"⚠️ 决策被拒绝: 做空止盈错误 (entry={entry_price}, take_profit={take_profit} 应该 < entry)") - return self._get_hold_decision(decision['symbol'], f"做空止盈价格错误") - - # 检查盈亏比 - if risk > 0 and reward > 0: - rr_ratio = reward / risk - min_rr_ratio = 1.2 # 最小盈亏比 1:1.2 - - if rr_ratio < min_rr_ratio: - logger.warning(f"⚠️ 决策被拒绝: 盈亏比不足 (1:{rr_ratio:.2f} < 1:{min_rr_ratio:.1f})") - logger.warning(f" entry={entry_price}, stop_loss={stop_loss}, take_profit={take_profit}") - logger.warning(f" 风险={risk:.0f}, 盈利={reward:.0f}, 盈亏比=1:{rr_ratio:.2f}") - return self._get_hold_decision( - decision['symbol'], - f"盈亏比不足 (1:{rr_ratio:.2f} < 1:{min_rr_ratio:.1f})" - ) - - except (ValueError, TypeError, ZeroDivisionError) as e: - logger.warning(f"盈亏比检查失败: {e}") - - # 价格距离检查:相同方向相同标的的挂单,价格距离 < 2% 时不加仓/开仓 - action = decision.get('action', '') - new_entry_price = decision.get('entry_price') - - if action and new_entry_price: - try: - new_entry_price = float(new_entry_price) - min_distance_percent = 2.0 - - # 检查持仓 - for pos in positions or []: - if pos.get('symbol') == decision.get('symbol'): - pos_side = pos.get('side', '') # 'long' or 'short' - pos_entry = float(pos.get('entry_price', 0)) - - # 相同方向的持仓 - if (action == 'buy' and pos_side == 'long') or (action == 'sell' and pos_side == 'short'): - distance_percent = abs(new_entry_price - pos_entry) / pos_entry * 100 - if distance_percent < min_distance_percent: - logger.warning(f"⚠️ 决策被拒绝: 价格距离过近 (新价格 ${new_entry_price:,.2f} vs 持仓 ${pos_entry:,.2f}, 距离 {distance_percent:.2f}% < {min_distance_percent}%)") - return self._get_hold_decision( - decision['symbol'], - f"价格距离持仓过近 (新价格 ${new_entry_price:,.2f} vs 持仓 ${pos_entry:,.2f}, 距离 {distance_percent:.2f}% < {min_distance_percent}%)" - ) - - # 检查挂单 - for order in pending_orders or []: - if order.get('symbol') == decision.get('symbol'): - order_side = order.get('side', '') - order_entry = float(order.get('entry_price', 0)) - - # 相同方向的挂单 - if (action == 'buy' and order_side == 'long') or (action == 'sell' and order_side == 'short'): - distance_percent = abs(new_entry_price - order_entry) / order_entry * 100 - if distance_percent < min_distance_percent: - logger.warning(f"⚠️ 决策被拒绝: 价格距离过近 (新价格 ${new_entry_price:,.2f} vs 挂单 ${order_entry:,.2f}, 距离 {distance_percent:.2f}% < {min_distance_percent}%)") - return self._get_hold_decision( - decision['symbol'], - f"价格距离挂单过近 (新价格 ${new_entry_price:,.2f} vs 挂单 ${order_entry:,.2f}, 距离 {distance_percent:.2f}% < {min_distance_percent}%)" - ) - - except (ValueError, TypeError) as e: - logger.warning(f"价格距离检查失败: {e}") - - return decision - - def _get_hold_decision(self, symbol: str, reason: str = "") -> Dict[str, Any]: - """返回观望决策""" - return { - 'decision': 'HOLD', - 'symbol': symbol, - 'action': 'hold', - 'reasoning': f'观望: {reason}', - 'timestamp': datetime.now().isoformat() - } diff --git a/backend/app/services/paper_trading_service.py b/backend/app/services/paper_trading_service.py index 0faee03..388f266 100644 --- a/backend/app/services/paper_trading_service.py +++ b/backend/app/services/paper_trading_service.py @@ -2211,7 +2211,17 @@ class PaperTradingService: def _get_current_price(self, symbol: str) -> float: """获取交易对当前价格""" try: + from app.services.price_monitor_service import get_price_monitor_service from app.services.bitget_service import bitget_service + + cached_price = get_price_monitor_service().get_latest_price(symbol) + if cached_price: + return float(cached_price) + + price = bitget_service.get_current_price(symbol) + if price: + return float(price) + ticker = bitget_service.get_ticker(symbol) if ticker and 'lastPrice' in ticker: return float(ticker['lastPrice']) diff --git a/backend/tests/test_market_signal_analyzer_lane_rules.py b/backend/tests/test_market_signal_analyzer_lane_rules.py new file mode 100644 index 0000000..b2c605d --- /dev/null +++ b/backend/tests/test_market_signal_analyzer_lane_rules.py @@ -0,0 +1,177 @@ +""" +MarketSignalAnalyzer 回归测试 + +覆盖重点: + - 关键位聚合后保留优先级,同时数值数组继续按价格顺序返回 + - lane 级别的最小盈亏比和止盈止损距离门槛 + - Fib 摘要与可交易区输出 +""" +import importlib.util +import sys +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pandas as pd + + +def load_market_signal_analyzer_class(): + analyzer_path = Path(__file__).resolve().parents[1] / "app" / "crypto_agent" / "market_signal_analyzer.py" + + if "app" not in sys.modules: + app_pkg = types.ModuleType("app") + app_pkg.__path__ = [str(analyzer_path.parents[2] / "app")] + sys.modules["app"] = app_pkg + + if "app.services" not in sys.modules: + services_pkg = types.ModuleType("app.services") + services_pkg.__path__ = [str(analyzer_path.parents[1] / "services")] + sys.modules["app.services"] = services_pkg + + if "app.crypto_agent" not in sys.modules: + crypto_pkg = types.ModuleType("app.crypto_agent") + crypto_pkg.__path__ = [str(analyzer_path.parent)] + sys.modules["app.crypto_agent"] = crypto_pkg + + if "app.utils" not in sys.modules: + utils_pkg = types.ModuleType("app.utils") + utils_pkg.__path__ = [str(analyzer_path.parents[1] / "utils")] + sys.modules["app.utils"] = utils_pkg + + logger_module = types.ModuleType("app.utils.logger") + logger_module.logger = MagicMock() + sys.modules["app.utils.logger"] = logger_module + + llm_module = types.ModuleType("app.services.llm_service") + llm_module.llm_service = MagicMock() + sys.modules["app.services.llm_service"] = llm_module + + news_module = types.ModuleType("app.services.news_service") + news_module.get_news_service = MagicMock(return_value=MagicMock()) + sys.modules["app.services.news_service"] = news_module + + bitget_module = types.ModuleType("app.services.bitget_service") + bitget_module.bitget_service = MagicMock() + sys.modules["app.services.bitget_service"] = bitget_module + + module_name = "app.crypto_agent.market_signal_analyzer_test" + spec = importlib.util.spec_from_file_location(module_name, analyzer_path) + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module.MarketSignalAnalyzer + + +def make_analyzer(): + return load_market_signal_analyzer_class()() + + +def make_frame(closes, lows, highs, ema20, atr=10.0): + return pd.DataFrame( + { + "close": closes, + "low": lows, + "high": highs, + "ema20": [ema20] * len(closes), + "atr": [atr] * len(closes), + "volume": [1000 + i * 10 for i in range(len(closes))], + } + ) + + +def test_derive_key_levels_keeps_numeric_arrays_price_sorted_and_builds_trade_zones(): + analyzer = make_analyzer() + data = { + "30m": make_frame( + closes=[100, 101, 102, 103, 104, 105, 106, 104, 103, 102, 101, 100, 99, 100, 101, 102, 103, 104, 105, 106], + lows=[98, 99, 100, 101, 102, 103, 104, 102, 101, 100, 99, 98, 97, 98, 99, 100, 101, 102, 103, 104], + highs=[101, 102, 103, 104, 105, 106, 107, 105, 104, 103, 102, 101, 100, 101, 102, 103, 104, 105, 106, 107], + ema20=102, + ), + "1h": make_frame( + closes=[110 + i for i in range(20)], + lows=[108 + i for i in range(20)], + highs=[111 + i for i in range(20)], + ema20=118, + ), + "4h": make_frame( + closes=[120 + i for i in range(12)], + lows=[118 + i for i in range(12)], + highs=[121 + i for i in range(12)], + ema20=126, + ), + } + fib_context = { + "support_details": [ + {"price": 124.2, "ratio": 0.618, "distance_pct": 0.4, "kind": "retracement"}, + {"price": 122.8, "ratio": 0.5, "distance_pct": 1.1, "kind": "retracement"}, + ], + "resistance_details": [ + {"price": 131.4, "ratio": 1.272, "distance_pct": 1.0, "kind": "extension"}, + {"price": 133.1, "ratio": 1.618, "distance_pct": 2.3, "kind": "extension"}, + ], + } + + levels = analyzer._derive_key_levels( + data=data, + range_zone={"support_level": 123.6, "resistance_level": 130.2}, + fib_context=fib_context, + current_price=128.0, + ) + + assert levels["support"] == sorted(levels["support"], reverse=True) + assert levels["resistance"] == sorted(levels["resistance"]) + assert levels["best_long_zone"]["center"] == levels["support_priority"][0]["price"] + assert levels["best_short_zone"]["center"] == levels["resistance_priority"][0]["price"] + assert levels["best_long_zone"]["low"] < levels["best_long_zone"]["high"] + assert any("Fib0.618" in item["sources"] for item in levels["support_priority"]) + + +def test_lane_specific_risk_reward_and_distance_thresholds(): + analyzer = make_analyzer() + signal = { + "action": "sell", + "entry_price": 100.0, + "stop_loss": 101.0, + "take_profit": 98.0, + } + + assert analyzer._meets_min_risk_reward(signal, "short_term") is True + assert analyzer._meets_min_risk_reward(signal, "medium_term") is True + assert analyzer._meets_min_price_distance(signal, "short_term") is True + assert analyzer._meets_min_price_distance(signal, "medium_term") is True + + tighter_signal = { + "action": "sell", + "entry_price": 100.0, + "stop_loss": 101.0, + "take_profit": 98.4, + } + + assert analyzer._meets_min_risk_reward(tighter_signal, "short_term") is True + assert analyzer._meets_min_risk_reward(tighter_signal, "medium_term") is False + assert analyzer._meets_min_price_distance(tighter_signal, "short_term") is True + assert analyzer._meets_min_price_distance(tighter_signal, "medium_term") is False + + +def test_fibonacci_context_marks_kind_and_trade_zone(): + analyzer = make_analyzer() + df = pd.DataFrame( + { + "close": [100, 102, 105, 108, 112, 116, 120, 118, 116, 114, 112, 110, 108, 106, 104, 102, 101, 100, 99, 98], + "high": [101, 103, 106, 109, 113, 117, 121, 119, 117, 115, 113, 111, 109, 107, 105, 103, 102, 101, 100, 99], + "low": [99, 101, 104, 107, 111, 115, 119, 117, 115, 113, 111, 109, 107, 105, 103, 101, 100, 99, 98, 97], + "ema20": [108] * 20, + "atr": [2.5] * 20, + "volume": [1000, 1100, 1300, 1500, 1700, 1900, 2100, 1800, 1600, 1500, 1400, 1300, 1200, 1100, 1050, 1000, 980, 960, 940, 920], + } + ) + + result = analyzer._calculate_fibonacci_levels(df, current_price=104.0, lookback=20) + + assert result is not None + assert result["trade_zone"] + assert any(level["kind"] in {"retracement", "extension"} for level in result["support_details"] + result["resistance_details"]) + + formatted = analyzer._format_fib_levels(result["support_details"] or result["resistance_details"]) + assert "回撤Fib" in formatted or "扩展Fib" in formatted diff --git a/frontend/trading.html b/frontend/trading.html index cdd7940..0102f6e 100644 --- a/frontend/trading.html +++ b/frontend/trading.html @@ -282,9 +282,6 @@
-
@@ -393,6 +390,8 @@ 数量 入场价 当前价 + 止损 + 止盈 杠杆 保证金 未实现盈亏 @@ -408,16 +407,18 @@ {{ order.side === 'long' ? '做多' : '做空' }} - {{ order.quantity ? order.quantity.toFixed(4) : '0.0000' }} - {{ order.entry_price ? '$' + order.entry_price.toFixed(2) : '$0.00' }} - {{ order.current_price ? '$' + order.current_price.toFixed(2) : '-' }} + {{ formatNumber(order.quantity, 4) }} + {{ formatCurrency(order.display_entry_price) }} + {{ formatCurrency(order.current_price) }} + {{ formatCurrency(order.stop_loss) }} + {{ formatCurrency(order.take_profit) }} {{ order.leverage || 0 }}x - {{ order.margin ? '$' + order.margin.toFixed(2) : '$0.00' }} + {{ formatCurrency(order.margin) }} - {{ order.unrealized_pnl >= 0 ? '+' : '' }}${{ order.unrealized_pnl ? order.unrealized_pnl.toFixed(2) : '0.00' }} + {{ formatSignedCurrency(order.unrealized_pnl) }} - {{ order.pnl_percent >= 0 ? '+' : '' }}{{ order.pnl_percent ? order.pnl_percent.toFixed(2) : '0.00' }}% + {{ formatSignedPercent(order.pnl_percent) }} @@ -659,7 +660,22 @@ }, computed: { openPositions() { - return this.orders.filter(o => o.status === 'open'); + return this.orders + .filter(order => order.status === 'open') + .map(order => { + const displayEntryPrice = this.getDisplayEntryPrice(order); + const currentPrice = this.resolveOrderCurrentPrice(order); + const pnlPercent = this.calculateOpenOrderPnlPercent(order, currentPrice, displayEntryPrice); + const unrealizedPnl = this.calculateOpenOrderPnlAmount(order, pnlPercent); + + return { + ...order, + display_entry_price: displayEntryPrice, + current_price: currentPrice || null, + pnl_percent: pnlPercent, + unrealized_pnl: unrealizedPnl + }; + }); }, pendingOrders() { return this.orders.filter(o => o.status === 'pending'); @@ -741,14 +757,8 @@ async fetchOrders() { try { const response = await axios.get('/api/trading/orders'); - console.log('API Response:', response.data); if (response.data.success) { this.orders = response.data.orders || []; - console.log('Orders loaded:', this.orders.length); - console.log('Orders data:', this.orders); - console.log('Open positions:', this.orders.filter(o => o.status === 'open')); - console.log('Pending orders:', this.orders.filter(o => o.status === 'pending')); - console.log('Order history:', this.orders.filter(o => o.status === 'closed')); } } catch (error) { console.error('获取订单失败:', error); @@ -759,7 +769,10 @@ if (!confirm('确定要平仓吗?')) return; try { - const response = await axios.post(`/api/trading/orders/${order.order_id}/close`); + const exitPrice = this.resolveOrderCurrentPrice(order) || this.getDisplayEntryPrice(order); + const response = await axios.post(`/api/trading/orders/${order.order_id}/close`, { + exit_price: exitPrice + }); if (response.data.success) { await this.refreshData(); alert('平仓成功'); @@ -806,23 +819,6 @@ } }, - async resetAccount() { - if (!confirm('确定要重置账户吗?这将清除所有持仓和订单!')) return; - - try { - const response = await axios.post('/api/trading/account/reset'); - if (response.data.success) { - await this.refreshData(); - alert('账户重置成功'); - } else { - alert('重置失败: ' + (response.data.message || '未知错误')); - } - } catch (error) { - console.error('重置账户失败:', error); - alert('重置失败: ' + (error.response?.data?.detail || error.message)); - } - }, - async sendReport() { this.sendingReport = true; try { @@ -851,6 +847,73 @@ }); }, + formatCurrency(value) { + const number = Number(value); + if (!Number.isFinite(number) || number <= 0) return '-'; + return `$${number.toFixed(2)}`; + }, + + formatSignedCurrency(value) { + const number = Number(value); + if (!Number.isFinite(number)) return '$0.00'; + return `${number >= 0 ? '+' : '-'}$${Math.abs(number).toFixed(2)}`; + }, + + formatSignedPercent(value) { + const number = Number(value); + if (!Number.isFinite(number)) return '0.00%'; + return `${number >= 0 ? '+' : '-'}${Math.abs(number).toFixed(2)}%`; + }, + + formatNumber(value, digits = 2) { + const number = Number(value); + if (!Number.isFinite(number)) return (0).toFixed(digits); + return number.toFixed(digits); + }, + + getDisplayEntryPrice(order) { + const price = Number(order.filled_price || order.entry_price || 0); + return Number.isFinite(price) ? price : 0; + }, + + resolveOrderCurrentPrice(order) { + const latest = Number(this.latestPrices?.[order.symbol]); + if (Number.isFinite(latest) && latest > 0) { + return latest; + } + + const current = Number(order.current_price); + if (Number.isFinite(current) && current > 0) { + return current; + } + + return 0; + }, + + calculateOpenOrderPnlPercent(order, currentPrice, entryPrice) { + if (!currentPrice || !entryPrice) { + return Number(order.pnl_percent || 0); + } + + if (order.side === 'long') { + return ((currentPrice - entryPrice) / entryPrice) * 100; + } + + if (order.side === 'short') { + return ((entryPrice - currentPrice) / entryPrice) * 100; + } + + return Number(order.pnl_percent || 0); + }, + + calculateOpenOrderPnlAmount(order, pnlPercent) { + const positionValue = Number(order.quantity || 0); + if (!Number.isFinite(positionValue) || positionValue <= 0) { + return Number(order.unrealized_pnl || 0); + } + return positionValue * pnlPercent / 100; + }, + getCloseReason(reason) { const map = { 'manual': '手动',