""" 交易决策器 - 基于市场信号和当前状态做出交易决策 职责: 1. 接收市场信号(不含仓位信息) 2. 接收当前持仓状态 3. 接收账户状态 4. 做出具体交易决策(开仓/平仓/加仓/减仓/观望) """ import json from typing import Dict, Any, Optional, List from datetime import datetime from app.utils.logger import logger from app.services.llm_service import llm_service class TradingDecisionMaker: """交易决策器 - 负责仓位管理和风险控制""" # 交易决策系统提示词 TRADING_DECISION_PROMPT = """你是一位专业的加密货币交易员。你的职责是**根据市场信号和当前仓位状态,做出交易决策**。 ## 你的职责 - 分析市场信号的质量 - 结合当前持仓评估风险 - 考虑账户整体状况 - 做出具体交易决策 ## 输入信息 你将收到: 1. 市场信号(方向、强度、理由) 2. 当前持仓列表 3. 账户状态(余额、已用保证金、杠杆等) ## 决策类型 ### 1. 开仓(OPEN) **时机**:无持仓或可以加仓时 **要求**: - **A级信号(confidence >= 80)**:可开 heavy/medium/light 仓位 - **B级信号(60 <= confidence < 80)**:只能开 medium/light 仓位 - **C级信号(40 <= confidence < 60)**:只能开 light 仓位 - **D级信号(confidence < 40)**:不开仓,返回 HOLD - 账户有足够的可用杠杆空间 - 风险可控(止损明确) ### 2. 平仓(CLOSE) **时机**: - 触发止损/止盈 - 信号反转 - 风险过大 ### 3. 加仓(ADD) **时机**: - 已有盈利持仓 - 同向新信号 - 趋势加强 **价格距离限制(重要)**: - 如果已有持仓/挂单的价格与新价格距离 < 1%,**不加仓也不开新仓** - 例如:现有 BTC 做多持仓 @ $95,000,新信号 @ $95,500(差距 0.52% < 1%),拒绝开仓 - 这是为了避免在同一价格区域重复建仓,导致风险过度集中 - **例外**:如果信号是 A 级(confidence >= 90)且趋势非常强劲,可以考虑放宽到 0.5% ### 4. 减仓(REDUCE) **时机**: - 部分止盈 - 降低风险敞口 - 不确定增加 ### 5. 观望(HOLD) **时机**: - 信号不明确 - 风险过大 - 可用杠杆空间不足 - 等待更好时机 ## 仓位管理规则 ### 全仓模式(联合保证金) - **最大杠杆 20 倍**:最大仓位金额 = 账户余额 × 20 - **当前杠杆**:当前杠杆 = 当前持仓价值 / 账户余额 - **可用杠杆空间百分比**:(最大仓位金额 - 当前持仓价值) / 最大仓位金额 × 100% ### 仓位大小选择(综合考虑信号质量和可用空间) 仓位大小由**信号等级**和**可用杠杆空间**共同决定: #### 1. 信号等级决定最大仓位上限 - **A级信号(80-100分)**:可选择 heavy/medium/light - **B级信号(60-79分)**:只能选择 medium/light - **C级信号(40-59分)**:只能选择 light - **D级信号(<40分)**:不开仓,返回 HOLD #### 2. 可用杠杆空间决定是否可开仓 - **可用空间 >= 10%**:可以开 heavy 仓位 - **可用空间 >= 5%**:可以开 medium 仓位 - **可用空间 >= 3%**:可以开 light 仓位 - **可用空间 < 3%**:不新开仓,返回 HOLD #### 3. 仓位大小与保证金金额 - **heavy**:使用保证金 = 账户余额 × 12% - **medium**:使用保证金 = 账户余额 × 6% - **light**:使用保证金 = 账户余额 × 3% #### 4. 选择逻辑示例 假设当前可用杠杆空间为 50%: - A级信号 → 可以选择 heavy(空间足够,信号质量高) - B级信号 → 只能选择 medium/light(信号质量中等) - C级信号 → 只能选择 light(信号质量一般,保守仓位) 假设当前可用杠杆空间为 4%: - A级信号 → 只能选择 medium/light(空间不足) - B级信号 → 只能选择 light(空间不足) - C级信号 → 不开仓(空间不足) **重要**:`quantity` 字段输出的是**保证金金额**,不是持仓价值。交易系统会使用杠杆自动计算实际持仓价值。 ### 计算示例 - 账户余额:$10,000 - 最大仓位金额:$10,000 × 20 = $200,000 - 当前持仓价值:$20,000(当前杠杆 2x) - 可用仓位金额:$200,000 - $20,000 = $180,000 - 可用杠杆空间:$180,000 / $200,000 = 90% - 计算公式:保证金金额 = 账户余额 × 使用比例 - heavy:保证金 $10,000 × 12% = $1,200 → 持仓价值 $1,200 × 20 = $24,000 - medium:保证金 $10,000 × 6% = $600 → 持仓价值 $600 × 20 = $12,000 - light:保证金 $10,000 × 3% = $300 → 持仓价值 $300 × 20 = $6,000 ### 风险控制 - 单笔最大亏损不超过账户 2% - 止损必须明确 - 避免过度交易 - 不追涨杀跌 ## 决策输出格式 请以 JSON 格式输出: ```json { "decision": "OPEN/CLOSE/ADD/REDUCE/HOLD", "symbol": "BTC/USDT", "side": "buy/sell", "action": "open_long/close_short/add_long/...", "position_size": "heavy/medium/light", "quantity": 1200, "confidence": 0-100, "reasoning": "简洁的决策理由(1句话,15字以内)", "risk_analysis": "核心风险点(1句话,15字以内)", "stop_loss": 65500, "take_profit": 67500, "notes": "其他说明" } ``` ## 重要说明 - **所有价格必须是纯数字**,不要加 $ 符号、逗号或其他格式 - `stop_loss`、`take_profit` 必须是数字类型 - **quantity 是保证金金额(USDT)**,交易系统会使用杠杆计算实际持仓价值 - **position_size** 和 **quantity** 必须匹配(heavy 对应最大保证金金额) - **入场方式由市场信号决定**,你只需要根据市场信号的 `entry_type` 来执行交易 ## 输出简洁性要求(重要!) - **reasoning(决策理由)**:用1句话说清楚决策原因,不超过15个字 - ❌ 错误示例:"由于市场信号显示强劲的上升趋势,且当前可用杠杆空间充足,因此决定开仓做多" - ✅ 正确示例:"A级信号,上升趋势明确,空间充足" - **risk_analysis(风险分析)**:用1句话指出核心风险,不超过15个字 - ❌ 错误示例:"需要注意市场波动性增加可能带来的潜在亏损风险,同时关注止损位的设置" - ✅ 正确示例:"关注波动风险,止损设好" ## 注意事项 1. **安全第一**:宁可错过机会,也不要冒过大风险 2. **遵守杠杆限制**:总杠杆永远不超过 20 倍 3. **理性决策**:不要被 FOMO 情绪左右 4. **灵活应变**:根据市场变化调整策略 5. **简洁输出**:决策理由和风险分析必须简明扼要 记住:你是交易执行者,不是市场分析师。市场分析已经完成了,你只需要根据分析结果和当前状态做出理性的交易决策! """ def __init__(self): pass async def make_decision(self, market_signal: Dict[str, Any], positions: List[Dict[str, Any]], account: Dict[str, Any], current_price: float = None) -> Dict[str, Any]: """ 做出交易决策 Args: market_signal: 市场信号(来自 MarketSignalAnalyzer) positions: 当前持仓列表 account: 账户状态 current_price: 当前价格(用于判断入场方式) Returns: 交易决策字典 """ try: # 1. 准备决策上下文 decision_context = self._prepare_decision_context( market_signal, positions, account, current_price ) # 2. 构建提示词 prompt = self._build_decision_prompt(decision_context) # 3. 调用 LLM 做决策 messages = [ {"role": "system", "content": self.TRADING_DECISION_PROMPT}, {"role": "user", "content": prompt} ] response = await llm_service.achat(messages) # 4. 解析结果 result = self._parse_decision_response(response, market_signal['symbol']) # 5. 验证决策安全性 result = self._validate_decision(result, positions, account) return result except Exception as e: logger.error(f"交易决策失败: {e}") import traceback logger.debug(traceback.format_exc()) return self._get_hold_decision(market_signal['symbol'], "决策系统异常") def _prepare_decision_context(self, market_signal: Dict[str, Any], positions: List[Dict[str, Any]], account: Dict[str, Any], current_price: float = None) -> Dict[str, Any]: """准备决策上下文""" context = { 'symbol': market_signal.get('symbol'), 'market_state': market_signal.get('market_state'), 'trend': market_signal.get('trend'), 'signals': market_signal.get('signals', []), 'key_levels': market_signal.get('key_levels', {}), 'positions': positions, 'account': account, 'current_price': current_price } # 计算账户状态 balance = float(account.get('current_balance', 0)) total_position_value = float(account.get('total_position_value', 0)) used_margin = float(account.get('used_margin', 0)) # 当前杠杆(全仓模式) max_leverage = 20 max_position_value = balance * max_leverage # 最大仓位金额 current_leverage = (total_position_value / balance) if balance > 0 else 0 available_position_value = max(0, max_position_value - total_position_value) # 剩余可用仓位金额 available_leverage_percent = (available_position_value / max_position_value * 100) if max_position_value > 0 else 0 # 可用杠杆空间百分比 context['leverage_info'] = { 'balance': balance, 'current_leverage': current_leverage, 'total_position_value': total_position_value, 'max_position_value': max_position_value, 'available_position_value': available_position_value, 'available_leverage_percent': available_leverage_percent, 'max_leverage': max_leverage } # 价格距离检查信息(用于 LLM 判断) context['price_distance_check'] = { 'enabled': True, 'min_distance_percent': 1.0, # 最小价格距离 1% 'exception_threshold': 90 # A 级信号且 confidence >= 90 时可放宽到 0.5% } return context def _build_decision_prompt(self, context: Dict[str, Any]) -> str: """构建决策提示词""" prompt_parts = [] # 市场信号 prompt_parts.append(f"## 市场信号") prompt_parts.append(f"交易对: {context['symbol']}") prompt_parts.append(f"市场状态: {context.get('market_state')}") prompt_parts.append(f"趋势: {context.get('trend')}") # 当前价格(如果有) current_price = context.get('current_price') if current_price: prompt_parts.append(f"当前价格: ${current_price:,.2f}") # 信号列表 signals = context.get('signals', []) if signals: prompt_parts.append(f"\n## 信号列表") for i, sig in enumerate(signals, 1): # timeframe 是 short_term/medium_term/long_term timeframe = sig.get('timeframe', 'N/A') action = sig.get('action', 'N/A') prompt_parts.append(f"{i}. {timeframe} | {action}") prompt_parts.append(f" 信心度: {sig.get('confidence', 0)}") # 添加入场价格信息 entry_zone = sig.get('entry_zone') if entry_zone: prompt_parts.append(f" 建议入场价: ${entry_zone:,.2f}") prompt_parts.append(f" 理由: {sig.get('reasoning', 'N/A')}") # 关键价位 key_levels = context.get('key_levels', {}) if key_levels: prompt_parts.append(f"\n## 关键价位") if key_levels.get('support'): # 提取数字并格式化 import re def extract_num(val): if isinstance(val, (int, float)): return float(val) if isinstance(val, str): match = re.search(r'[\d,]+\.?\d*', val.replace(',', '')) if match: return float(match.group()) return None supports = [extract_num(s) for s in key_levels['support'][:3]] supports_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None]) prompt_parts.append(f"支撑位: {supports_str}") if key_levels.get('resistance'): import re def extract_num(val): if isinstance(val, (int, float)): return float(val) if isinstance(val, str): match = re.search(r'[\d,]+\.?\d*', val.replace(',', '')) if match: return float(match.group()) return None resistances = [extract_num(r) for r in key_levels['resistance'][:3]] resistances_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None]) prompt_parts.append(f"阻力位: {resistances_str}") # 当前持仓 positions = context.get('positions', []) prompt_parts.append(f"\n## 当前持仓") if positions: for pos in positions: if pos.get('holding', 0) > 0: prompt_parts.append(f"- {pos.get('symbol')}: {pos.get('side')} {pos.get('holding')} USDT") prompt_parts.append(f" 开仓价: ${pos.get('entry_price')}") prompt_parts.append(f" 止损: ${pos.get('stop_loss')}") prompt_parts.append(f" 止盈: ${pos.get('take_profit')}") else: prompt_parts.append("无持仓") # 账户状态 account = context.get('account', {}) lev_info = context.get('leverage_info', {}) prompt_parts.append(f"\n## 账户状态") prompt_parts.append(f"余额: ${account.get('current_balance', 0):.2f}") prompt_parts.append(f"可用: ${account.get('available', 0):.2f}") prompt_parts.append(f"已用保证金: ${account.get('used_margin', 0):.2f}") prompt_parts.append(f"持仓价值: ${account.get('total_position_value', 0):.2f}") prompt_parts.append(f"\n## 杠杆信息") prompt_parts.append(f"当前杠杆: {lev_info.get('current_leverage', 0):.1f}x") prompt_parts.append(f"最大仓位金额: ${lev_info.get('max_position_value', 0):,.2f}") prompt_parts.append(f"可用仓位金额: ${lev_info.get('available_position_value', 0):,.2f}") prompt_parts.append(f"可用杠杆空间: {lev_info.get('available_leverage_percent', 0):.1f}%") prompt_parts.append(f"最大杠杆限制: {lev_info.get('max_leverage', 20)}x") # 价格距离检查规则 price_check = context.get('price_distance_check', {}) if price_check.get('enabled'): prompt_parts.append(f"\n## 价格距离限制") prompt_parts.append(f"⚠️ 重要:如果有相同方向的持仓/挂单,价格距离必须 >= {price_check.get('min_distance_percent', 1)}%") prompt_parts.append(f"- 低于此距离不开新仓,避免风险过度集中") prompt_parts.append(f"- A级信号(confidence >= {price_check.get('exception_threshold', 90)})可考虑放宽到 0.5%") prompt_parts.append(f"\n请根据以上信息,做出交易决策。") return "\n".join(prompt_parts) def _parse_decision_response(self, response: str, symbol: str) -> Dict[str, Any]: """解析决策响应""" try: import re # 尝试提取 JSON json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) if json_match: json_str = json_match.group(1) else: json_match = re.search(r'\{[\s\S]*\}', response) if json_match: json_str = json_match.group(0) else: raise ValueError("无法找到 JSON 响应") # 清理 JSON 字符串 json_str = self._clean_json_string(json_str) result = json.loads(json_str) # 清理价格字段 - 转换为 float result = self._clean_price_fields(result) # 添加元数据 result['symbol'] = symbol result['timestamp'] = datetime.now().isoformat() result['raw_response'] = response logger.info(f"✅ 交易决策完成: {symbol} | {result.get('decision', 'HOLD')}") return result except Exception as e: logger.warning(f"解析决策响应失败: {e}") logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符 return self._get_hold_decision(symbol, "解析失败,默认观望") def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: """清理价格字段,转换为 float""" def clean_price(price_value): if price_value is None: return None if isinstance(price_value, (int, float)): return float(price_value) if isinstance(price_value, str): # 移除 $ 符号和逗号 cleaned = price_value.replace('$', '').replace(',', '').strip() if cleaned: try: return float(cleaned) except ValueError: return None return None # 清理顶层价格字段 price_fields = ['stop_loss', 'take_profit', 'quantity'] for field in price_fields: if field in data: data[field] = clean_price(data[field]) return data def _clean_json_string(self, json_str: str) -> str: """清理 JSON 字符串,移除可能导致解析错误的内容""" import re # 移除单行注释 // ... json_str = re.sub(r'//.*?(?=\n|$)', '', json_str) # 移除多行注释 /* ... */ json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str) # 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}) json_str = re.sub(r',\s*([}\]])', r'\1', json_str) return json_str def _validate_decision(self, decision: Dict[str, Any], positions: List[Dict[str, Any]], account: Dict[str, Any]) -> Dict[str, Any]: """验证决策安全性""" # 检查杠杆限制 if decision.get('decision') in ['OPEN', 'ADD']: balance = float(account.get('current_balance', 0)) total_position_value = float(account.get('total_position_value', 0)) max_leverage = 20 max_position_value = balance * max_leverage # quantity 是保证金金额,需要乘以杠杆得到持仓价值 margin = float(decision.get('quantity', 0)) position_value = margin * max_leverage # 使用最大杠杆计算持仓价值 new_total_value = total_position_value + position_value if new_total_value > max_position_value: logger.warning(f"⚠️ 决策被拒绝: 超过最大仓位金额 (保证金 ${margin:.2f} → 持仓价值 ${position_value:.2f}, 总计 ${new_total_value:,.2f} > ${max_position_value:,.2f})") return self._get_hold_decision( decision['symbol'], f"超过最大仓位金额 (保证金 ${margin:.2f} → 持仓价值 ${position_value:.2f}, 总计 ${new_total_value:,.2f} > ${max_position_value:,.2f})" ) return decision def _get_hold_decision(self, symbol: str, reason: str = "") -> Dict[str, Any]: """返回观望决策""" return { 'decision': 'HOLD', 'symbol': symbol, 'action': 'hold', 'reasoning': f'观望: {reason}', 'timestamp': datetime.now().isoformat() }