stock-ai-agent/backend/app/crypto_agent/trading_decision_maker.py
2026-02-25 21:50:34 +08:00

491 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交易决策器 - 基于市场信号和当前状态做出交易决策
职责:
1. 接收市场信号(不含仓位信息)
2. 接收当前持仓状态
3. 接收账户状态
4. 做出具体交易决策(开仓/平仓/加仓/减仓/观望)
"""
import json
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.utils.logger import logger
from app.services.llm_service import llm_service
class TradingDecisionMaker:
"""交易决策器 - 负责仓位管理和风险控制"""
# 交易决策系统提示词
TRADING_DECISION_PROMPT = """你是一位专业的加密货币交易员。你的职责是**根据市场信号和当前仓位状态,做出交易决策**。
## 你的职责
- 分析市场信号的质量
- 结合当前持仓评估风险
- 考虑账户整体状况
- 做出具体交易决策
## 输入信息
你将收到:
1. 市场信号(方向、强度、理由)
2. 当前持仓列表
3. 账户状态(余额、已用保证金、杠杆等)
## 决策类型
### 1. 开仓OPEN
**时机**:无持仓或可以加仓时
**要求**
- **A级信号confidence >= 80**:可开 heavy/medium/light 仓位
- **B级信号60 <= confidence < 80**:只能开 medium/light 仓位
- **C级信号40 <= confidence < 60**:只能开 light 仓位
- **D级信号confidence < 40**:不开仓,返回 HOLD
- 账户有足够的可用杠杆空间
- 风险可控(止损明确)
### 2. 平仓CLOSE
**时机**
- 触发止损/止盈
- 信号反转
- 风险过大
### 3. 加仓ADD
**时机**
- 已有盈利持仓
- 同向新信号
- 趋势加强
**价格距离限制(重要)**
- 如果已有持仓/挂单的价格与新价格距离 < 1%**不加仓也不开新仓**
- 例如:现有 BTC 做多持仓 @ $95,000新信号 @ $95,500差距 0.52% < 1%),拒绝开仓
- 这是为了避免在同一价格区域重复建仓,导致风险过度集中
- **例外**:如果信号是 A 级confidence >= 90且趋势非常强劲可以考虑放宽到 0.5%
### 4. 减仓REDUCE
**时机**
- 部分止盈
- 降低风险敞口
- 不确定增加
### 5. 观望HOLD
**时机**
- 信号不明确
- 风险过大
- 可用杠杆空间不足
- 等待更好时机
## 仓位管理规则
### 全仓模式(联合保证金)
- **最大杠杆 20 倍**:最大仓位金额 = 账户余额 × 20
- **当前杠杆**:当前杠杆 = 当前持仓价值 / 账户余额
- **可用杠杆空间百分比**(最大仓位金额 - 当前持仓价值) / 最大仓位金额 × 100%
### 仓位大小选择(综合考虑信号质量和可用空间)
仓位大小由**信号等级**和**可用杠杆空间**共同决定:
#### 1. 信号等级决定最大仓位上限
- **A级信号80-100分**:可选择 heavy/medium/light
- **B级信号60-79分**:只能选择 medium/light
- **C级信号40-59分**:只能选择 light
- **D级信号<40分**:不开仓,返回 HOLD
#### 2. 可用杠杆空间决定是否可开仓
- **可用空间 >= 10%**:可以开 heavy 仓位
- **可用空间 >= 5%**:可以开 medium 仓位
- **可用空间 >= 3%**:可以开 light 仓位
- **可用空间 < 3%**:不新开仓,返回 HOLD
#### 3. 仓位大小与保证金金额
- **heavy**:使用保证金 = 账户余额 × 12%
- **medium**:使用保证金 = 账户余额 × 6%
- **light**:使用保证金 = 账户余额 × 3%
#### 4. 选择逻辑示例
假设当前可用杠杆空间为 50%
- A级信号 → 可以选择 heavy空间足够信号质量高
- B级信号 → 只能选择 medium/light信号质量中等
- C级信号 → 只能选择 light信号质量一般保守仓位
假设当前可用杠杆空间为 4%
- A级信号 → 只能选择 medium/light空间不足
- B级信号 → 只能选择 light空间不足
- C级信号 → 不开仓(空间不足)
**重要**`quantity` 字段输出的是**保证金金额**,不是持仓价值。交易系统会使用杠杆自动计算实际持仓价值。
### 计算示例
- 账户余额:$10,000
- 最大仓位金额:$10,000 × 20 = $200,000
- 当前持仓价值:$20,000当前杠杆 2x
- 可用仓位金额:$200,000 - $20,000 = $180,000
- 可用杠杆空间:$180,000 / $200,000 = 90%
- 计算公式:保证金金额 = 账户余额 × 使用比例
- heavy保证金 $10,000 × 12% = $1,200 → 持仓价值 $1,200 × 20 = $24,000
- medium保证金 $10,000 × 6% = $600 → 持仓价值 $600 × 20 = $12,000
- light保证金 $10,000 × 3% = $300 → 持仓价值 $300 × 20 = $6,000
### 风险控制
- 单笔最大亏损不超过账户 2%
- 止损必须明确
- 避免过度交易
- 不追涨杀跌
## 决策输出格式
请以 JSON 格式输出:
```json
{
"decision": "OPEN/CLOSE/ADD/REDUCE/HOLD",
"symbol": "BTC/USDT",
"side": "buy/sell",
"action": "open_long/close_short/add_long/...",
"position_size": "heavy/medium/light",
"quantity": 1200,
"confidence": 0-100,
"reasoning": "简洁的决策理由1句话15字以内",
"risk_analysis": "核心风险点1句话15字以内",
"stop_loss": 65500,
"take_profit": 67500,
"notes": "其他说明"
}
```
## 重要说明
- **所有价格必须是纯数字**,不要加 $ 符号、逗号或其他格式
- `stop_loss`、`take_profit` 必须是数字类型
- **quantity 是保证金金额USDT**,交易系统会使用杠杆计算实际持仓价值
- **position_size** 和 **quantity** 必须匹配heavy 对应最大保证金金额)
- **入场方式由市场信号决定**,你只需要根据市场信号的 `entry_type` 来执行交易
## 输出简洁性要求(重要!)
- **reasoning决策理由**用1句话说清楚决策原因不超过15个字
- ❌ 错误示例:"由于市场信号显示强劲的上升趋势,且当前可用杠杆空间充足,因此决定开仓做多"
- ✅ 正确示例:"A级信号上升趋势明确空间充足"
- **risk_analysis风险分析**用1句话指出核心风险不超过15个字
- ❌ 错误示例:"需要注意市场波动性增加可能带来的潜在亏损风险,同时关注止损位的设置"
- ✅ 正确示例:"关注波动风险,止损设好"
## 注意事项
1. **安全第一**:宁可错过机会,也不要冒过大风险
2. **遵守杠杆限制**:总杠杆永远不超过 20 倍
3. **理性决策**:不要被 FOMO 情绪左右
4. **灵活应变**:根据市场变化调整策略
5. **简洁输出**:决策理由和风险分析必须简明扼要
记住:你是交易执行者,不是市场分析师。市场分析已经完成了,你只需要根据分析结果和当前状态做出理性的交易决策!
"""
def __init__(self):
pass
async def make_decision(self,
market_signal: Dict[str, Any],
positions: List[Dict[str, Any]],
account: Dict[str, Any],
current_price: float = None) -> Dict[str, Any]:
"""
做出交易决策
Args:
market_signal: 市场信号(来自 MarketSignalAnalyzer
positions: 当前持仓列表
account: 账户状态
current_price: 当前价格(用于判断入场方式)
Returns:
交易决策字典
"""
try:
# 1. 准备决策上下文
decision_context = self._prepare_decision_context(
market_signal, positions, account, current_price
)
# 2. 构建提示词
prompt = self._build_decision_prompt(decision_context)
# 3. 调用 LLM 做决策
messages = [
{"role": "system", "content": self.TRADING_DECISION_PROMPT},
{"role": "user", "content": prompt}
]
response = await llm_service.achat(messages)
# 4. 解析结果
result = self._parse_decision_response(response, market_signal['symbol'])
# 5. 验证决策安全性
result = self._validate_decision(result, positions, account)
return result
except Exception as e:
logger.error(f"交易决策失败: {e}")
import traceback
logger.debug(traceback.format_exc())
return self._get_hold_decision(market_signal['symbol'], "决策系统异常")
def _prepare_decision_context(self,
market_signal: Dict[str, Any],
positions: List[Dict[str, Any]],
account: Dict[str, Any],
current_price: float = None) -> Dict[str, Any]:
"""准备决策上下文"""
context = {
'symbol': market_signal.get('symbol'),
'market_state': market_signal.get('market_state'),
'trend': market_signal.get('trend'),
'signals': market_signal.get('signals', []),
'key_levels': market_signal.get('key_levels', {}),
'positions': positions,
'account': account,
'current_price': current_price
}
# 计算账户状态
balance = float(account.get('current_balance', 0))
total_position_value = float(account.get('total_position_value', 0))
used_margin = float(account.get('used_margin', 0))
# 当前杠杆(全仓模式)
max_leverage = 20
max_position_value = balance * max_leverage # 最大仓位金额
current_leverage = (total_position_value / balance) if balance > 0 else 0
available_position_value = max(0, max_position_value - total_position_value) # 剩余可用仓位金额
available_leverage_percent = (available_position_value / max_position_value * 100) if max_position_value > 0 else 0 # 可用杠杆空间百分比
context['leverage_info'] = {
'balance': balance,
'current_leverage': current_leverage,
'total_position_value': total_position_value,
'max_position_value': max_position_value,
'available_position_value': available_position_value,
'available_leverage_percent': available_leverage_percent,
'max_leverage': max_leverage
}
# 价格距离检查信息(用于 LLM 判断)
context['price_distance_check'] = {
'enabled': True,
'min_distance_percent': 1.0, # 最小价格距离 1%
'exception_threshold': 90 # A 级信号且 confidence >= 90 时可放宽到 0.5%
}
return context
def _build_decision_prompt(self, context: Dict[str, Any]) -> str:
"""构建决策提示词"""
prompt_parts = []
# 市场信号
prompt_parts.append(f"## 市场信号")
prompt_parts.append(f"交易对: {context['symbol']}")
prompt_parts.append(f"市场状态: {context.get('market_state')}")
prompt_parts.append(f"趋势: {context.get('trend')}")
# 当前价格(如果有)
current_price = context.get('current_price')
if current_price:
prompt_parts.append(f"当前价格: ${current_price:,.2f}")
# 信号列表
signals = context.get('signals', [])
if signals:
prompt_parts.append(f"\n## 信号列表")
for i, sig in enumerate(signals, 1):
# timeframe 是 short_term/medium_term/long_term
timeframe = sig.get('timeframe', 'N/A')
action = sig.get('action', 'N/A')
prompt_parts.append(f"{i}. {timeframe} | {action}")
prompt_parts.append(f" 信心度: {sig.get('confidence', 0)}")
# 添加入场价格信息
entry_zone = sig.get('entry_zone')
if entry_zone:
prompt_parts.append(f" 建议入场价: ${entry_zone:,.2f}")
prompt_parts.append(f" 理由: {sig.get('reasoning', 'N/A')}")
# 关键价位
key_levels = context.get('key_levels', {})
if key_levels:
prompt_parts.append(f"\n## 关键价位")
if key_levels.get('support'):
# 提取数字并格式化
import re
def extract_num(val):
if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
match = re.search(r'[\d,]+\.?\d*', val.replace(',', ''))
if match:
return float(match.group())
return None
supports = [extract_num(s) for s in key_levels['support'][:3]]
supports_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None])
prompt_parts.append(f"支撑位: {supports_str}")
if key_levels.get('resistance'):
import re
def extract_num(val):
if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
match = re.search(r'[\d,]+\.?\d*', val.replace(',', ''))
if match:
return float(match.group())
return None
resistances = [extract_num(r) for r in key_levels['resistance'][:3]]
resistances_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None])
prompt_parts.append(f"阻力位: {resistances_str}")
# 当前持仓
positions = context.get('positions', [])
prompt_parts.append(f"\n## 当前持仓")
if positions:
for pos in positions:
if pos.get('holding', 0) > 0:
prompt_parts.append(f"- {pos.get('symbol')}: {pos.get('side')} {pos.get('holding')} USDT")
prompt_parts.append(f" 开仓价: ${pos.get('entry_price')}")
prompt_parts.append(f" 止损: ${pos.get('stop_loss')}")
prompt_parts.append(f" 止盈: ${pos.get('take_profit')}")
else:
prompt_parts.append("无持仓")
# 账户状态
account = context.get('account', {})
lev_info = context.get('leverage_info', {})
prompt_parts.append(f"\n## 账户状态")
prompt_parts.append(f"余额: ${account.get('current_balance', 0):.2f}")
prompt_parts.append(f"可用: ${account.get('available', 0):.2f}")
prompt_parts.append(f"已用保证金: ${account.get('used_margin', 0):.2f}")
prompt_parts.append(f"持仓价值: ${account.get('total_position_value', 0):.2f}")
prompt_parts.append(f"\n## 杠杆信息")
prompt_parts.append(f"当前杠杆: {lev_info.get('current_leverage', 0):.1f}x")
prompt_parts.append(f"最大仓位金额: ${lev_info.get('max_position_value', 0):,.2f}")
prompt_parts.append(f"可用仓位金额: ${lev_info.get('available_position_value', 0):,.2f}")
prompt_parts.append(f"可用杠杆空间: {lev_info.get('available_leverage_percent', 0):.1f}%")
prompt_parts.append(f"最大杠杆限制: {lev_info.get('max_leverage', 20)}x")
# 价格距离检查规则
price_check = context.get('price_distance_check', {})
if price_check.get('enabled'):
prompt_parts.append(f"\n## 价格距离限制")
prompt_parts.append(f"⚠️ 重要:如果有相同方向的持仓/挂单,价格距离必须 >= {price_check.get('min_distance_percent', 1)}%")
prompt_parts.append(f"- 低于此距离不开新仓,避免风险过度集中")
prompt_parts.append(f"- A级信号confidence >= {price_check.get('exception_threshold', 90)})可考虑放宽到 0.5%")
prompt_parts.append(f"\n请根据以上信息,做出交易决策。")
return "\n".join(prompt_parts)
def _parse_decision_response(self, response: str, symbol: str) -> Dict[str, Any]:
"""解析决策响应"""
try:
import re
# 尝试提取 JSON
json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
if json_match:
json_str = json_match.group(1)
else:
json_match = re.search(r'\{[\s\S]*\}', response)
if json_match:
json_str = json_match.group(0)
else:
raise ValueError("无法找到 JSON 响应")
# 清理 JSON 字符串
json_str = self._clean_json_string(json_str)
result = json.loads(json_str)
# 清理价格字段 - 转换为 float
result = self._clean_price_fields(result)
# 添加元数据
result['symbol'] = symbol
result['timestamp'] = datetime.now().isoformat()
result['raw_response'] = response
logger.info(f"✅ 交易决策完成: {symbol} | {result.get('decision', 'HOLD')}")
return result
except Exception as e:
logger.warning(f"解析决策响应失败: {e}")
logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符
return self._get_hold_decision(symbol, "解析失败,默认观望")
def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""清理价格字段,转换为 float"""
def clean_price(price_value):
if price_value is None:
return None
if isinstance(price_value, (int, float)):
return float(price_value)
if isinstance(price_value, str):
# 移除 $ 符号和逗号
cleaned = price_value.replace('$', '').replace(',', '').strip()
if cleaned:
try:
return float(cleaned)
except ValueError:
return None
return None
# 清理顶层价格字段
price_fields = ['stop_loss', 'take_profit', 'quantity']
for field in price_fields:
if field in data:
data[field] = clean_price(data[field])
return data
def _clean_json_string(self, json_str: str) -> str:
"""清理 JSON 字符串,移除可能导致解析错误的内容"""
import re
# 移除单行注释 // ...
json_str = re.sub(r'//.*?(?=\n|$)', '', json_str)
# 移除多行注释 /* ... */
json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str)
# 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
return json_str
def _validate_decision(self, decision: Dict[str, Any],
positions: List[Dict[str, Any]],
account: Dict[str, Any]) -> Dict[str, Any]:
"""验证决策安全性"""
# 检查杠杆限制
if decision.get('decision') in ['OPEN', 'ADD']:
balance = float(account.get('current_balance', 0))
total_position_value = float(account.get('total_position_value', 0))
max_leverage = 20
max_position_value = balance * max_leverage
# quantity 是保证金金额,需要乘以杠杆得到持仓价值
margin = float(decision.get('quantity', 0))
position_value = margin * max_leverage # 使用最大杠杆计算持仓价值
new_total_value = total_position_value + position_value
if new_total_value > max_position_value:
logger.warning(f"⚠️ 决策被拒绝: 超过最大仓位金额 (保证金 ${margin:.2f} → 持仓价值 ${position_value:.2f}, 总计 ${new_total_value:,.2f} > ${max_position_value:,.2f})")
return self._get_hold_decision(
decision['symbol'],
f"超过最大仓位金额 (保证金 ${margin:.2f} → 持仓价值 ${position_value:.2f}, 总计 ${new_total_value:,.2f} > ${max_position_value:,.2f})"
)
return decision
def _get_hold_decision(self, symbol: str, reason: str = "") -> Dict[str, Any]:
"""返回观望决策"""
return {
'decision': 'HOLD',
'symbol': symbol,
'action': 'hold',
'reasoning': f'观望: {reason}',
'timestamp': datetime.now().isoformat()
}