tradusai/signals/llm_decision.py
2025-12-11 16:11:35 +08:00

760 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
LLM Decision Maker - Use Claude/GPT for trading decisions
"""
import logging
import json
import re
from typing import Dict, Any, Optional
from datetime import datetime
import os
from config.settings import settings
logger = logging.getLogger(__name__)
class LLMDecisionMaker:
    """Generate trading decisions using an LLM (Claude or an OpenAI-compatible API).

    The maker builds a multi-timeframe prompt from a market context dict,
    sends it to the configured provider, and parses the JSON answer into a
    structured decision dict. When no API key is available, or the client
    cannot be initialised, the instance is disabled and every call to
    ``generate_decision`` returns a neutral HOLD response.
    """

    # Sampling parameters shared by both providers; override on a subclass
    # or instance to tune them.
    MAX_TOKENS = 1500
    TEMPERATURE = 0.7

    # Default position-size ratios for the (at most) four pyramid entries.
    _DEFAULT_RATIOS = (0.4, 0.3, 0.2, 0.1)
    # Default distance of each generated entry from the first one
    # (0%, 0.3%, 0.6%, 1%), used when the LLM gives no entry_levels.
    _DEFAULT_SPACINGS = (0, 0.003, 0.006, 0.010)

    def __init__(self, provider: str = 'claude', api_key: Optional[str] = None):
        """
        Initialize LLM decision maker

        Args:
            provider: 'claude' or 'openai' (case-insensitive)
            api_key: API key; when omitted it is read from the environment
        """
        self.provider = provider.lower()
        # Always define these so a disabled instance still has the attributes.
        self.client = None
        self.model = None
        self.api_key = api_key or self._get_api_key()
        if not self.api_key:
            logger.warning(f"No API key found for {provider}. LLM decisions will be disabled.")
            self.enabled = False
        else:
            self.enabled = True
            self._init_client()

    def _get_api_key(self) -> Optional[str]:
        """Return the provider's API key from the environment, or None."""
        if self.provider == 'claude':
            return os.getenv('ANTHROPIC_API_KEY')
        elif self.provider == 'openai':
            return os.getenv('OPENAI_API_KEY')
        return None

    def _init_client(self):
        """Create the provider client and pick the model name.

        Any failure (missing library, unsupported provider, client error) is
        logged and disables the instance instead of propagating.
        """
        try:
            if self.provider == 'claude':
                import anthropic
                self.client = anthropic.Anthropic(api_key=self.api_key)
                self.model = "claude-3-5-sonnet-20241022"
            elif self.provider == 'openai':
                import openai
                # Support custom base URL (for Deepseek, etc.)
                base_url = os.getenv('OPENAI_BASE_URL')
                if base_url:
                    self.client = openai.OpenAI(
                        api_key=self.api_key,
                        base_url=base_url
                    )
                    # Use appropriate model for the endpoint
                    if 'deepseek' in base_url.lower():
                        self.model = "deepseek-chat"
                        logger.info("Using Deepseek API endpoint")
                    else:
                        self.model = "gpt-4-turbo-preview"
                else:
                    self.client = openai.OpenAI(api_key=self.api_key)
                    self.model = "gpt-4-turbo-preview"
            else:
                # Fail explicitly rather than leaving an enabled instance
                # with no client; handled by the generic handler below.
                raise ValueError(f"Unsupported provider: {self.provider}")
            logger.info(f"Initialized {self.provider} client with model {self.model}")
        except ImportError as e:
            logger.error(f"Failed to import {self.provider} library: {e}")
            self.enabled = False
        except Exception as e:
            logger.error(f"Failed to initialize {self.provider} client: {e}")
            self.enabled = False

    def generate_decision(
        self,
        market_context: Dict[str, Any],
        analysis: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Generate trading decision using LLM

        Args:
            market_context: LLM context from LLMContextBuilder
            analysis: Optional full analysis for additional details

        Returns:
            Decision dict with signal, reasoning, and levels. When the maker
            is disabled or any step fails, a HOLD response with confidence 0
            is returned instead of raising.
        """
        if not self.enabled:
            return self._disabled_response()
        try:
            prompt = self._build_prompt(market_context, analysis)
            # Log the complete prompt being sent to the LLM
            logger.info("=" * 80)
            logger.info(f"📤 完整的 LLM 提示词 (发送给 {self.provider}):")
            logger.info("=" * 80)
            logger.info(prompt)
            logger.info("=" * 80)
            response_text = self._call_llm(prompt)
            # Log the raw LLM response
            logger.info("=" * 80)
            logger.info("📥 LLM 原始响应:")
            logger.info("=" * 80)
            logger.info(response_text)
            logger.info("=" * 80)
            decision = self._parse_response(response_text, market_context)
            logger.info(
                f"LLM decision: {decision['signal_type']} "
                f"(confidence: {decision.get('confidence', 0):.2f})"
            )
            return decision
        except Exception as e:
            logger.error(f"Error generating LLM decision: {e}", exc_info=True)
            logger.debug(f"Market context: {market_context}")
            return self._error_response(str(e))

    def _build_prompt(
        self,
        market_context: Dict[str, Any],
        analysis: Optional[Dict[str, Any]]
    ) -> str:
        """Build the trading decision prompt.

        Args:
            market_context: Reads 'current_price', 'multi_timeframe'
                (per-timeframe indicator dicts) and 'kline_data'
                (per-timeframe lists of candles with t/o/h/l/c/v keys).
            analysis: Accepted for interface compatibility; currently unused.

        Returns:
            The full prompt text.
        """
        # Missing or null numbers fall back to defaults so that a partially
        # filled context cannot break the format specifiers below.
        num = self._safe_float
        current_price = num(market_context.get('current_price'), 0.0)
        kline_data = market_context.get('kline_data', {})

        parts = [f"""你是一个专业的加密货币交易分析师。基于以下多时间周期的K线数据和技术指标,提供分层次的交易建议。
**重要**: 你需要自己从K线数据中识别支撑位和压力位,不要依赖预先计算的值。
## 当前价格
${current_price:,.2f}
## 你的分析任务
1. **分析K线数据** - 识别各周期的支撑位、压力位、趋势结构
2. **结合技术指标** - RSI、MACD、成交量等确认信号
3. **给出交易建议** - 分短期/中期/长期三个级别
## 请提供以下内容 (使用JSON格式):
{{
"signal": "BUY" | "SELL" | "HOLD",
"confidence": 0.0-1.0,
// 你识别的关键价位
"key_levels": {{
"short_term": {{
"support": [支撑位数组],
"resistance": [压力位数组]
}},
"medium_term": {{
"support": [支撑位数组],
"resistance": [压力位数组]
}},
"long_term": {{
"support": [支撑位数组],
"resistance": [压力位数组]
}}
}},
// 分时间级别的交易机会分析 - 支持金字塔加仓的多级进场
"opportunities": {{
"short_term_5m_15m_1h": {{
"exists": true/false,
"direction": "LONG" | "SHORT" | null,
"entry_levels": [
{{"price": 首次进场价格, "ratio": 0.4, "reasoning": "首仓理由"}},
{{"price": 第二次进场价格, "ratio": 0.3, "reasoning": "加仓1理由"}},
{{"price": 第三次进场价格, "ratio": 0.2, "reasoning": "加仓2理由"}},
{{"price": 第四次进场价格, "ratio": 0.1, "reasoning": "加仓3理由"}}
],
"stop_loss": 止损价格数值或null,
"take_profit": 止盈价格数值或null,
"reasoning": "短期日内机会说明"
}},
"medium_term_4h_1d": {{
"exists": true/false,
"direction": "LONG" | "SHORT" | null,
"entry_levels": [
{{"price": 首次进场价格, "ratio": 0.4, "reasoning": "首仓理由"}},
{{"price": 第二次进场价格, "ratio": 0.3, "reasoning": "加仓1理由"}},
{{"price": 第三次进场价格, "ratio": 0.2, "reasoning": "加仓2理由"}},
{{"price": 第四次进场价格, "ratio": 0.1, "reasoning": "加仓3理由"}}
],
"stop_loss": 止损价格数值或null,
"take_profit": 止盈价格数值或null,
"reasoning": "中期波段机会说明"
}},
"long_term_1d_1w": {{
"exists": true/false,
"direction": "LONG" | "SHORT" | null,
"entry_levels": [
{{"price": 首次进场价格, "ratio": 0.4, "reasoning": "首仓理由"}},
{{"price": 第二次进场价格, "ratio": 0.3, "reasoning": "加仓1理由"}},
{{"price": 第三次进场价格, "ratio": 0.2, "reasoning": "加仓2理由"}},
{{"price": 第四次进场价格, "ratio": 0.1, "reasoning": "加仓3理由"}}
],
"stop_loss": 止损价格数值或null,
"take_profit": 止盈价格数值或null,
"reasoning": "长期趋势机会说明"
}},
"ambush": {{
"exists": true/false,
"price_level": 埋伏价格数值或null,
"reasoning": "埋伏点位说明"
}}
}},
// 分级别操作建议(必填)
"recommendations_by_timeframe": {{
"short_term": "短期(5m/15m/1h)操作建议",
"medium_term": "中期(4h/1d)操作建议",
"long_term": "长期(1d/1w)操作建议"
}},
"reasoning": "多周期综合分析",
"risk_level": "LOW" | "MEDIUM" | "HIGH",
"key_factors": ["影响因素1", "影响因素2", ...]
}}
**重要原则**:
1. **优先日内短线** - 重点关注 short_term_5m_15m_1h 的日内交易机会
2. **不同周期盈利要求不同** - 短期≥1%中期≥2%长期≥5%,不满足则 exists=false
3. **自行识别支撑压力位** - 从K线数据中找出重要的高低点作为支撑压力位
4. **响应必须是有效的JSON格式** - 不要包含注释
5. **金字塔加仓策略** - entry_levels 必须包含4个价位:
- 做多: 首仓价格最高,后续价位逐渐降低 (越跌越买)
- 做空: 首仓价格最低,后续价位逐渐升高 (越涨越卖)
- ratio总和=1.0 (0.4+0.3+0.2+0.1)
- 各级价位间距建议: 短期0.3-0.5%中期0.5-1%长期1-2%
"""]

        # Add multi-timeframe technical indicators
        if 'multi_timeframe' in market_context:
            mtf = self._as_dict(market_context['multi_timeframe'])
            parts.append("\n## 各周期技术指标\n\n")
            tf_order = [
                ('5m', '5分钟', '日内短线参考'),
                ('15m', '15分钟', '日内短线参考'),
                ('1h', '1小时', '短期趋势'),
                ('4h', '4小时', '中期趋势'),
                ('1d', '日线', '大趋势'),
                ('1w', '周线', '长期趋势')
            ]
            for tf_key, tf_name, tf_desc in tf_order:
                if tf_key not in mtf:
                    continue
                data = self._as_dict(mtf[tf_key])
                quant = self._as_dict(data.get('quantitative'))
                vol_ratio = num(data.get('volume_ratio'), 1.0)
                vol_status = "放量" if vol_ratio > 1.2 else "缩量" if vol_ratio < 0.8 else "正常"
                parts.append(f"### {tf_name} ({tf_desc})\n")
                parts.append(
                    f"- 量化评分: {num(quant.get('composite_score'), 0.0):.1f}"
                    f" | 信号: {quant.get('signal_type', 'HOLD')}\n"
                )
                parts.append(f"- 趋势: {data.get('trend_direction', '未知')} ({data.get('trend_strength', 'weak')})\n")
                parts.append(f"- RSI: {num(data.get('rsi'), 50.0):.1f} ({data.get('rsi_status', '中性')})\n")
                parts.append(f"- MACD: {data.get('macd_signal', '未知')}\n")
                parts.append(
                    f"- ATR: ${num(data.get('atr'), 0.0):.2f}"
                    f" ({num(data.get('atr_pct'), 0.0):.2f}%)\n"
                )
                parts.append(f"- 成交量: {vol_status} ({vol_ratio:.2f}x)\n\n")

        # Add K-line data
        if kline_data:
            parts.append("\n## K线数据 (请从中识别支撑位和压力位)\n\n")
            parts.append("格式: t=时间, o=开盘, h=最高, l=最低, c=收盘, v=成交量\n\n")
            tf_kline_order = [
                ('5m', '5分钟K线 (最近1天)'),
                ('15m', '15分钟K线 (最近1天)'),
                ('1h', '1小时K线 (最近3天)'),
                ('4h', '4小时K线 (最近3天)'),
                ('1d', '日线K线 (最近30天)'),
                ('1w', '周线K线 (最近60周)')
            ]
            for tf_key, tf_desc in tf_kline_order:
                if tf_key not in kline_data:
                    continue
                klines = kline_data[tf_key]
                if not klines:
                    continue
                parts.append(f"### {tf_desc}\n")
                parts.append("```\n")
                # Every supplied candle is shown; any trimming of long
                # timeframes is expected to happen upstream.
                for k in klines:
                    parts.append(
                        f"{k['t']} | o:{k['o']} h:{k['h']} l:{k['l']} c:{k['c']} v:{k['v']:.0f}\n"
                    )
                parts.append("```\n\n")

        # Add analysis guidelines
        parts.append("""
## 支撑压力位识别方法
1. **短期支撑压力 (5m/15m/1h)**
- 近1天内的明显高低点
- 多次触及但未突破的价格
- 整数关口 (如 91000, 92000)
2. **中期支撑压力 (4h/1d)**
- 近几天的重要高低点
- 趋势线位置
- 前期成交密集区
3. **长期支撑压力 (1d/1w)**
- 周线/月线级别的高低点
- 历史重要价格区间
- 大周期趋势线
## 止盈止损设置(不同周期要求不同)
- 短期 (5m/15m/1h): 止损 0.3%-0.5%, 止盈 ≥1%
- 中期 (4h/1d): 止损 1%-2%, 止盈 ≥2%
- 长期 (1d/1w): 止损 2%-4%, 止盈 ≥5%
重要:各周期的盈利空间必须满足最低要求才给出建议:
- 短期机会: (take_profit - entry) / entry ≥ 1%
- 中期机会: (take_profit - entry) / entry ≥ 2%
- 长期机会: (take_profit - entry) / entry ≥ 5%
""")
        return "".join(parts)

    def _call_llm(self, prompt: str) -> str:
        """Dispatch the prompt to the configured provider.

        Raises:
            ValueError: If the provider is neither 'claude' nor 'openai'.
        """
        if self.provider == 'claude':
            return self._call_claude(prompt)
        elif self.provider == 'openai':
            return self._call_openai(prompt)
        else:
            raise ValueError(f"Unsupported provider: {self.provider}")

    def _call_claude(self, prompt: str) -> str:
        """Call the Claude Messages API and return the first text block.

        Raises:
            ValueError: If the response carries no content.
            Exception: Any client error is logged and re-raised.
        """
        try:
            response = self.client.messages.create(
                model=self.model,
                max_tokens=self.MAX_TOKENS,
                temperature=self.TEMPERATURE,
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            if not response.content:
                raise ValueError("Claude returned an empty response")
            return response.content[0].text
        except Exception as e:
            logger.error(f"Claude API error: {e}")
            raise

    def _call_openai(self, prompt: str) -> str:
        """Call the OpenAI-compatible chat completions API and return the text.

        Raises:
            ValueError: If the response has no choices or no message text.
            Exception: Any client error is logged and re-raised.
        """
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a professional cryptocurrency trading analyst. Provide trading advice in JSON format."
                    },
                    {"role": "user", "content": prompt}
                ],
                max_tokens=self.MAX_TOKENS,
                temperature=self.TEMPERATURE,
            )
            content = response.choices[0].message.content if response.choices else None
            if content is None:
                # Without this the parser would fail later with an opaque
                # TypeError from re.search.
                raise ValueError("OpenAI returned an empty response")
            return content
        except Exception as e:
            logger.error(f"OpenAI API error: {e}")
            raise

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _safe_float(value, default=0.0):
        """Convert value to float, returning default for None or invalid input."""
        if value is None:
            return default
        try:
            return float(value)
        except (ValueError, TypeError):
            return default

    @staticmethod
    def _as_dict(value) -> Dict[str, Any]:
        """Return value if it is a dict, else an empty dict (LLMs may emit null)."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _profit_pct(entry, take_profit, direction):
        """Return the profit percentage of a trade; 0 when it cannot be computed."""
        if not entry or not take_profit or entry <= 0:
            return 0
        if direction == 'LONG':
            return (take_profit - entry) / entry * 100
        elif direction == 'SHORT':
            return (entry - take_profit) / entry * 100
        return 0

    @classmethod
    def _entry_price_of(cls, opp: dict) -> float:
        """Resolve an opportunity's first entry price.

        The legacy 'entry_price' key wins when it is positive; otherwise the
        price of the first 'entry_levels' item (the format the prompt asks
        for) is used. Returns 0.0 when neither is available.
        """
        price = cls._safe_float(opp.get('entry_price'), 0.0)
        if price > 0:
            return price
        levels = opp.get('entry_levels')
        if isinstance(levels, list) and levels:
            first = levels[0]
            if isinstance(first, dict):
                return cls._safe_float(first.get('price'), 0.0)
            if isinstance(first, (int, float)):
                return cls._safe_float(first, 0.0)
        return 0.0

    @classmethod
    def _meets_profit_threshold(cls, opp: dict, min_profit_pct=1.0) -> bool:
        """Return True if an existing opportunity offers at least min_profit_pct."""
        if not opp.get('exists'):
            return False
        profit_pct = cls._profit_pct(
            cls._entry_price_of(opp),
            cls._safe_float(opp.get('take_profit'), 0),
            opp.get('direction'),
        )
        return profit_pct >= min_profit_pct

    @classmethod
    def _filter_by_profit(cls, opp: dict, min_profit_pct, label: str) -> dict:
        """Replace an opportunity with a 'does not exist' stub if profit is too low.

        Args:
            opp: Opportunity dict from the LLM.
            min_profit_pct: Minimum required profit percentage.
            label: Timeframe label used in the log line.

        Returns:
            opp unchanged when it does not exist or passes the threshold,
            otherwise a stub dict with exists=False and an explanation.
        """
        if not opp.get('exists') or cls._meets_profit_threshold(opp, min_profit_pct):
            return opp
        profit_pct = cls._profit_pct(
            cls._entry_price_of(opp),
            cls._safe_float(opp.get('take_profit'), 0),
            opp.get('direction'),
        )
        logger.info(f"{label}机会被过滤: 盈利空间 {profit_pct:.2f}% < {min_profit_pct}%")
        return {
            'exists': False,
            'reasoning': f'盈利空间不足{min_profit_pct}% (仅{profit_pct:.2f}%),建议观望',
        }

    @classmethod
    def _normalize_entry_levels(cls, opp: dict, direction: str, current_price: float) -> list:
        """Normalize entry levels, handling both the new and the old format.

        New format: 'entry_levels' is a list of dicts or bare numbers; at
        most four are kept and missing ratios get the default for their
        position. Old format: four levels are generated from 'entry_price'
        (or current_price) with default spacing, stepping down for LONG and
        up for any other direction.
        """
        entry_levels = opp.get('entry_levels', [])
        if entry_levels and isinstance(entry_levels, list):
            normalized = []
            for i, level in enumerate(entry_levels[:len(cls._DEFAULT_RATIOS)]):
                if isinstance(level, dict):
                    normalized.append({
                        'price': cls._safe_float(level.get('price'), 0),
                        'ratio': cls._safe_float(level.get('ratio'), cls._DEFAULT_RATIOS[i]),
                        'reasoning': level.get('reasoning', ''),
                        'level': i,
                    })
                elif isinstance(level, (int, float)):
                    normalized.append({
                        'price': cls._safe_float(level, 0),
                        'ratio': cls._DEFAULT_RATIOS[i],
                        'reasoning': '',
                        'level': i,
                    })
            return normalized

        # Fallback: convert old single entry_price format to entry_levels
        entry_price = cls._safe_float(opp.get('entry_price'), 0)
        if entry_price <= 0:
            entry_price = current_price
        # LONG averages down (first entry highest); anything else averages up.
        sign = -1 if direction == 'LONG' else 1
        return [
            {
                'price': round(entry_price * (1 + sign * spacing), 2),
                'ratio': cls._DEFAULT_RATIOS[i],
                'reasoning': f'Level {i+1}' if i > 0 else 'Initial entry',
                'level': i,
            }
            for i, spacing in enumerate(cls._DEFAULT_SPACINGS)
        ]

    @staticmethod
    def _first_entry(levels: list, fallback: float) -> float:
        """Return the price of the first entry level, or fallback if there is none."""
        if levels:
            return levels[0].get('price', fallback)
        return fallback

    @classmethod
    def _opportunity_view(cls, opp: dict, levels: list) -> Dict[str, Any]:
        """Build the output dict for one opportunity with its normalized levels."""
        return {
            'exists': opp.get('exists', False),
            'direction': opp.get('direction'),
            'entry_levels': levels,  # array of entry levels for pyramiding
            # Single entry price kept for backward compatibility
            'entry_price': cls._first_entry(levels, cls._safe_float(opp.get('entry_price'), 0)),
            'stop_loss': cls._safe_float(opp.get('stop_loss'), 0),
            'take_profit': cls._safe_float(opp.get('take_profit'), 0),
            'reasoning': opp.get('reasoning', ''),
        }

    def _parse_response(
        self,
        response_text: str,
        market_context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Parse the LLM response into a structured decision.

        The first '{' through the last '}' of the text is decoded as JSON;
        if that fails, keyword-based fallback parsing is used. Opportunities
        below the per-timeframe minimum profit from settings are marked as
        not existing.

        Args:
            response_text: Raw text returned by the LLM.
            market_context: Context dict; 'current_price' is read from it.

        Returns:
            Decision dict with signal, per-timeframe opportunities, primary
            levels and risk-reward ratio.
        """
        # Try to extract JSON from response
        json_match = re.search(r'\{[\s\S]*\}', response_text)
        if not json_match:
            logger.warning("No JSON found in LLM response, using fallback parsing")
            return self._fallback_parse(response_text, market_context)
        try:
            llm_decision = json.loads(json_match.group())
            logger.debug(f"Parsed LLM JSON: {llm_decision}")
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse JSON: {e}, using fallback")
            logger.debug(f"JSON match was: {json_match.group()[:500]}")
            return self._fallback_parse(response_text, market_context)

        safe_float = self._safe_float
        as_dict = self._as_dict
        current_price = market_context.get('current_price', 0)

        # Parse opportunities structure (support both old and new format).
        # Null or non-dict values from the LLM are treated as missing.
        opportunities = as_dict(llm_decision.get('opportunities'))
        short_term = as_dict(opportunities.get('short_term_5m_15m_1h'))
        medium_term = as_dict(opportunities.get('medium_term_4h_1d'))
        long_term = as_dict(opportunities.get('long_term_1d_1w'))
        ambush = as_dict(opportunities.get('ambush'))

        # Fallback to old format for backward compatibility
        if not short_term and not medium_term and not long_term:
            short_term = as_dict(opportunities.get('intraday'))
            medium_term = as_dict(opportunities.get('swing'))
            long_term = {}

        # Apply the minimum profit filter; each timeframe has its own
        # threshold configured in settings.
        short_term = self._filter_by_profit(short_term, settings.MIN_PROFIT_PCT_SHORT, '短期')
        medium_term = self._filter_by_profit(medium_term, settings.MIN_PROFIT_PCT_MEDIUM, '中期')
        long_term = self._filter_by_profit(long_term, settings.MIN_PROFIT_PCT_LONG, '长期')

        # Determine primary levels (priority: short > medium > long)
        entry = current_price
        stop_loss = 0
        take_profit = 0
        for opp in (short_term, medium_term, long_term):
            if opp.get('exists'):
                entry = self._entry_price_of(opp) or current_price
                stop_loss = safe_float(opp.get('stop_loss'), 0)
                take_profit = safe_float(opp.get('take_profit'), 0)
                break

        recommendations = as_dict(llm_decision.get('recommendations_by_timeframe'))

        def levels_for(opp: dict) -> list:
            # Only existing opportunities get entry levels.
            if not opp.get('exists'):
                return []
            return self._normalize_entry_levels(opp, opp.get('direction', 'LONG'), current_price)

        short_term_levels = levels_for(short_term)
        medium_term_levels = levels_for(medium_term)
        long_term_levels = levels_for(long_term)
        swing_levels = medium_term_levels if medium_term.get('exists') else long_term_levels

        decision = {
            'timestamp': datetime.now().isoformat(),
            # str()/or guard against a null or non-string signal
            'signal_type': str(llm_decision.get('signal') or 'HOLD').upper(),
            'confidence': safe_float(llm_decision.get('confidence'), 0.5),
            'trade_type': 'MULTI_TIMEFRAME',  # New format uses multiple timeframes
            'reasoning': llm_decision.get('reasoning', ''),
            # New opportunities breakdown (multi-timeframe) with entry_levels
            'opportunities': {
                'short_term_5m_15m_1h': self._opportunity_view(short_term, short_term_levels),
                'medium_term_4h_1d': self._opportunity_view(medium_term, medium_term_levels),
                'long_term_1d_1w': self._opportunity_view(long_term, long_term_levels),
                'ambush': {
                    'exists': ambush.get('exists', False),
                    'price_level': safe_float(ambush.get('price_level'), 0),
                    'reasoning': ambush.get('reasoning', '')
                },
                # Keep old format for backward compatibility
                'intraday': self._opportunity_view(short_term, short_term_levels),
                'swing': {
                    'exists': medium_term.get('exists', False) or long_term.get('exists', False),
                    'direction': medium_term.get('direction') or long_term.get('direction'),
                    'entry_levels': swing_levels,
                    'entry_price': self._first_entry(
                        swing_levels,
                        safe_float(medium_term.get('entry_price') or long_term.get('entry_price'), 0),
                    ),
                    'stop_loss': safe_float(medium_term.get('stop_loss') or long_term.get('stop_loss'), 0),
                    'take_profit': safe_float(medium_term.get('take_profit') or long_term.get('take_profit'), 0),
                    'reasoning': medium_term.get('reasoning', '') or long_term.get('reasoning', '')
                },
            },
            # Recommendations by timeframe
            'recommendations_by_timeframe': {
                'short_term': recommendations.get('short_term', ''),
                'medium_term': recommendations.get('medium_term', ''),
                'long_term': recommendations.get('long_term', '')
            },
            # Primary levels (for backward compatibility)
            'levels': {
                'current_price': current_price,
                'entry': entry,
                'stop_loss': stop_loss,
                'take_profit_1': take_profit,
                'take_profit_2': take_profit,
                'take_profit_3': take_profit,
            },
            'risk_level': llm_decision.get('risk_level', 'MEDIUM'),
            'key_factors': llm_decision.get('key_factors', []),
            'raw_response': response_text,
        }

        # Calculate risk-reward ratio from the primary levels
        if entry and stop_loss and take_profit and entry != stop_loss:
            risk = abs(entry - stop_loss)
            reward = abs(take_profit - entry)
            decision['risk_reward_ratio'] = round(reward / risk, 2) if risk > 0 else 0
        else:
            decision['risk_reward_ratio'] = 0
        return decision

    def _fallback_parse(
        self,
        response_text: str,
        market_context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Fallback parsing when JSON extraction fails.

        Derives the signal from simple keyword matching (buy keywords are
        checked before sell keywords); price levels are not available and
        are reported as 0.
        """
        text_lower = response_text.lower()
        if 'buy' in text_lower or '买入' in response_text or '做多' in response_text:
            signal_type = 'BUY'
            confidence = 0.6
        elif 'sell' in text_lower or '卖出' in response_text or '做空' in response_text:
            signal_type = 'SELL'
            confidence = 0.6
        else:
            signal_type = 'HOLD'
            confidence = 0.5
        return {
            'timestamp': datetime.now().isoformat(),
            'signal_type': signal_type,
            'confidence': confidence,
            'reasoning': response_text[:500],  # First 500 chars
            'levels': {
                'current_price': market_context.get('current_price', 0),
                'entry': 0,
                'stop_loss': 0,
                'take_profit_1': 0,
                'take_profit_2': 0,
                'take_profit_3': 0,
            },
            'risk_level': 'MEDIUM',
            'time_horizon': 'MEDIUM',
            'key_factors': [],
            'raw_response': response_text,
            'warning': 'Fallback parsing used - levels not available',
        }

    def _disabled_response(self) -> Dict[str, Any]:
        """Return the neutral HOLD response used when the LLM is disabled."""
        return {
            'timestamp': datetime.now().isoformat(),
            'signal_type': 'HOLD',
            'confidence': 0,
            'reasoning': 'LLM decision maker is disabled (no API key)',
            'enabled': False,
        }

    def _error_response(self, error_msg: str) -> Dict[str, Any]:
        """Return the neutral HOLD response used when decision generation fails."""
        return {
            'timestamp': datetime.now().isoformat(),
            'signal_type': 'HOLD',
            'confidence': 0,
            'reasoning': f'Error generating decision: {error_msg}',
            'error': error_msg,
        }