# tradusai/signals/llm_decision.py
# 2025-12-11 22:51:51 +08:00 | 840 lines | 32 KiB | Python
#
# Viewer notice: this file contains Unicode characters that might be confused
# with other characters. If this is intentional, the warning can safely be
# ignored.
"""
LLM Decision Maker - Use Claude/GPT for trading decisions
"""
import logging
import json
import re
from typing import Dict, Any, Optional
from datetime import datetime
import os
from config.settings import settings
logger = logging.getLogger(__name__)


class LLMDecisionMaker:
    """Generate trading decisions using an LLM (Claude or an OpenAI-compatible API).

    The instance disables itself (``enabled`` is False) when no API key is
    available, when the provider is not supported, or when the client library
    cannot be imported/initialized. While disabled, ``generate_decision``
    returns a neutral HOLD response instead of calling any API.
    """

    # Model used for each backend.
    CLAUDE_MODEL = "claude-3-5-sonnet-20241022"
    OPENAI_MODEL = "gpt-4-turbo-preview"
    DEEPSEEK_MODEL = "deepseek-chat"

    # Sampling parameters sent with every completion request.
    MAX_TOKENS = 1500
    TEMPERATURE = 0.7

    # Default pyramid position split (percent per entry level) used when the
    # LLM omits a ``size_pct_N`` / ``ratio`` value.
    DEFAULT_SIZE_PCTS = (40, 30, 20, 10)

    def __init__(self, provider: str = 'claude', api_key: Optional[str] = None):
        """
        Initialize LLM decision maker

        Args:
            provider: 'claude' or 'openai'
            api_key: API key (or use environment variable)
        """
        self.provider = provider.lower()
        # Always define these so attribute access is safe on a disabled instance.
        self.client: Any = None
        self.model: Optional[str] = None
        self.api_key = api_key or self._get_api_key()
        if not self.api_key:
            logger.warning(f"No API key found for {provider}. LLM decisions will be disabled.")
            self.enabled = False
        else:
            self.enabled = True
            self._init_client()

    @staticmethod
    def _safe_float(value, default=0.0):
        """Convert ``value`` to float, returning ``default`` for None or unparsable input."""
        if value is None:
            return default
        try:
            return float(value)
        except (ValueError, TypeError):
            return default

    @staticmethod
    def _as_dict(value) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict.

        LLM output may contain ``null`` (or another type) where an object is
        expected; this keeps the ``.get`` chains below from raising.
        """
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _risk_reward_ratio(entry, stop_loss, take_profit):
        """Return reward/risk rounded to 2 decimals, or 0 when any level is missing."""
        if entry and stop_loss and take_profit and entry != stop_loss:
            risk = abs(entry - stop_loss)
            reward = abs(take_profit - entry)
            return round(reward / risk, 2) if risk > 0 else 0
        return 0

    def _get_api_key(self) -> Optional[str]:
        """Get API key from environment"""
        if self.provider == 'claude':
            return os.getenv('ANTHROPIC_API_KEY')
        elif self.provider == 'openai':
            return os.getenv('OPENAI_API_KEY')
        return None

    def _init_client(self):
        """Initialize the LLM client and model name; disable the instance on failure."""
        try:
            if self.provider == 'claude':
                import anthropic
                self.client = anthropic.Anthropic(api_key=self.api_key)
                self.model = self.CLAUDE_MODEL
            elif self.provider == 'openai':
                import openai
                # Support custom base URL (for Deepseek, etc.)
                base_url = os.getenv('OPENAI_BASE_URL')
                if base_url:
                    self.client = openai.OpenAI(
                        api_key=self.api_key,
                        base_url=base_url
                    )
                    # Use appropriate model for the endpoint
                    if 'deepseek' in base_url.lower():
                        self.model = self.DEEPSEEK_MODEL
                        logger.info("Using Deepseek API endpoint")
                    else:
                        self.model = self.OPENAI_MODEL
                else:
                    self.client = openai.OpenAI(api_key=self.api_key)
                    self.model = self.OPENAI_MODEL
            else:
                # Previously this case only got disabled by accident, through an
                # AttributeError on self.model caught by the handler below.
                logger.error(f"Unsupported LLM provider: {self.provider}")
                self.enabled = False
                return
            logger.info(f"Initialized {self.provider} client with model {self.model}")
        except ImportError as e:
            logger.error(f"Failed to import {self.provider} library: {e}")
            self.enabled = False
        except Exception as e:
            logger.error(f"Failed to initialize {self.provider} client: {e}")
            self.enabled = False

    def generate_decision(
        self,
        market_context: Dict[str, Any],
        analysis: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Generate trading decision using LLM

        Args:
            market_context: LLM context from LLMContextBuilder
            analysis: Optional full analysis for additional details

        Returns:
            Decision dict with signal, reasoning, and levels. Any exception
            raised while prompting, calling, or parsing is logged and turned
            into a HOLD error response rather than propagated.
        """
        if not self.enabled:
            return self._disabled_response()
        try:
            # Build prompt
            prompt = self._build_prompt(market_context, analysis)
            # Log the complete prompt being sent to LLM
            logger.info("=" * 80)
            logger.info(f"📤 完整的 LLM 提示词 (发送给 {self.provider}/{self.model}):")
            logger.info("=" * 80)
            logger.info(prompt)
            logger.info("=" * 80)
            # Call LLM
            response_text = self._call_llm(prompt)
            # Log the LLM response
            logger.info("=" * 80)
            logger.info("📥 LLM 原始响应:")
            logger.info("=" * 80)
            logger.info(response_text)
            logger.info("=" * 80)
            # Parse response
            decision = self._parse_response(response_text, market_context)
            logger.info(
                f"LLM decision: {decision['signal_type']} "
                f"(confidence: {decision.get('confidence', 0):.2f})"
            )
            return decision
        except Exception as e:
            logger.error(f"Error generating LLM decision: {e}", exc_info=True)
            logger.debug(f"Market context: {market_context}")
            return self._error_response(str(e))

    def _build_prompt(
        self,
        market_context: Dict[str, Any],
        analysis: Optional[Dict[str, Any]]
    ) -> str:
        """Build the trading decision prompt.

        Args:
            market_context: Reads ``current_price``, ``multi_timeframe`` (per
                timeframe indicator dicts) and ``kline_data`` (per timeframe
                lists of candles with keys t/o/h/l/c/v).
            analysis: Accepted for interface compatibility; not used here.

        Returns:
            The full prompt text: output schema, per-timeframe indicators,
            K-line tables, and analysis guidelines.
        """
        num = self._safe_float
        current_price = num(market_context.get('current_price'), 0)
        kline_data = market_context.get('kline_data', {})

        # Structured prompt: an automation-friendly JSON output contract.
        sections = [f"""你是一个专业的加密货币交易分析师。基于以下多时间周期的K线数据和技术指标,提供精确的交易信号。
## 当前价格: ${current_price:,.2f}
## 输出要求
请严格按照以下JSON Schema输出所有字段必须存在价格字段必须是数字(无机会时用0):
```json
{{
"signal": "BUY|SELL|HOLD",
"confidence": 0.65,
"risk_level": "LOW|MEDIUM|HIGH",
"market_bias": "BULLISH|BEARISH|NEUTRAL",
"trades": [
{{
"id": "short_001",
"timeframe": "short",
"status": "ACTIVE|INACTIVE",
"direction": "LONG|SHORT|NONE",
"entry": {{
"price_1": 90000.00,
"price_2": 89700.00,
"price_3": 89400.00,
"price_4": 89100.00
}},
"exit": {{
"stop_loss": 88500.00,
"take_profit_1": 91000.00,
"take_profit_2": 92000.00,
"take_profit_3": 93000.00
}},
"position": {{
"size_pct_1": 40,
"size_pct_2": 30,
"size_pct_3": 20,
"size_pct_4": 10
}},
"risk_reward": 2.5,
"expected_profit_pct": 1.5,
"reasoning": "简要说明"
}},
{{
"id": "medium_001",
"timeframe": "medium",
"status": "ACTIVE|INACTIVE",
"direction": "LONG|SHORT|NONE",
"entry": {{
"price_1": 89500.00,
"price_2": 89000.00,
"price_3": 88500.00,
"price_4": 88000.00
}},
"exit": {{
"stop_loss": 86000.00,
"take_profit_1": 93000.00,
"take_profit_2": 95000.00,
"take_profit_3": 98000.00
}},
"position": {{
"size_pct_1": 40,
"size_pct_2": 30,
"size_pct_3": 20,
"size_pct_4": 10
}},
"risk_reward": 2.0,
"expected_profit_pct": 3.5,
"reasoning": "简要说明"
}},
{{
"id": "long_001",
"timeframe": "long",
"status": "ACTIVE|INACTIVE",
"direction": "LONG|SHORT|NONE",
"entry": {{
"price_1": 88000.00,
"price_2": 86000.00,
"price_3": 84000.00,
"price_4": 82000.00
}},
"exit": {{
"stop_loss": 78000.00,
"take_profit_1": 95000.00,
"take_profit_2": 100000.00,
"take_profit_3": 110000.00
}},
"position": {{
"size_pct_1": 40,
"size_pct_2": 30,
"size_pct_3": 20,
"size_pct_4": 10
}},
"risk_reward": 2.0,
"expected_profit_pct": 8.0,
"reasoning": "简要说明"
}}
],
"key_levels": {{
"support": [89000.00, 88000.00, 86000.00],
"resistance": [92000.00, 94000.00, 96000.00]
}},
"analysis": {{
"trend": "UP|DOWN|SIDEWAYS",
"momentum": "STRONG|WEAK|NEUTRAL",
"volume": "HIGH|LOW|NORMAL",
"summary": "一句话市场总结"
}},
"key_factors": ["因素1", "因素2", "因素3"]
}}
```
## 字段说明
**trades数组** (必须包含3个元素分别对应short/medium/long):
- `timeframe`: "short"=5m/15m/1h, "medium"=4h/1d, "long"=1d/1w
- `status`: "ACTIVE"=有交易机会, "INACTIVE"=暂无机会
- `direction`: "LONG"=做多, "SHORT"=做空, "NONE"=无方向
- `entry.price_1~4`: 金字塔4级进场价做多时price_1最高逐渐降低做空时price_1最低逐渐升高
- `exit.stop_loss`: 统一止损价
- `exit.take_profit_1~3`: 3级止盈目标
- `position.size_pct_1~4`: 各级仓位百分比,总和=100
- `risk_reward`: 风险回报比
- `expected_profit_pct`: 预期盈利百分比
**盈利要求** (不满足时status=INACTIVE):
- short: expected_profit_pct >= 1.0%
- medium: expected_profit_pct >= 2.0%
- long: expected_profit_pct >= 5.0%
**价格间距建议**:
- short: 各级入场价间距 0.3-0.5%
- medium: 各级入场价间距 0.5-1.0%
- long: 各级入场价间距 1.0-2.0%
## 重要规则
1. **所有价格字段必须是数字**无机会时填0不要用null
2. **trades数组必须有3个元素**,顺序: short, medium, long
3. **status=INACTIVE时**direction设为"NONE"所有价格设为0
4. **只输出JSON**,不要有其他文字
"""]

        # Add multi-timeframe technical indicators
        if 'multi_timeframe' in market_context:
            mtf = self._as_dict(market_context['multi_timeframe'])
            sections.append("\n## 各周期技术指标\n\n")
            tf_order = [
                ('5m', '5分钟', '日内短线参考'),
                ('15m', '15分钟', '日内短线参考'),
                ('1h', '1小时', '短期趋势'),
                ('4h', '4小时', '中期趋势'),
                ('1d', '日线', '大趋势'),
                ('1w', '周线', '长期趋势')
            ]
            for tf_key, tf_name, tf_desc in tf_order:
                if tf_key not in mtf:
                    continue
                data = self._as_dict(mtf[tf_key])
                quant = self._as_dict(data.get('quantitative'))
                # Numeric fields go through _safe_float so that a missing or
                # None indicator prints its default instead of raising in the
                # format spec and aborting the whole decision.
                score = num(quant.get('composite_score'), 0)
                rsi = num(data.get('rsi'), 50)
                atr = num(data.get('atr'), 0)
                atr_pct = num(data.get('atr_pct'), 0)
                vol_ratio = num(data.get('volume_ratio'), 1)
                if vol_ratio > 1.2:
                    vol_status = "放量"
                elif vol_ratio < 0.8:
                    vol_status = "缩量"
                else:
                    vol_status = "正常"
                sections.append(f"### {tf_name} ({tf_desc})\n")
                sections.append(f"- 量化评分: {score:.1f} | 信号: {quant.get('signal_type', 'HOLD')}\n")
                sections.append(f"- 趋势: {data.get('trend_direction', '未知')} ({data.get('trend_strength', 'weak')})\n")
                sections.append(f"- RSI: {rsi:.1f} ({data.get('rsi_status', '中性')})\n")
                sections.append(f"- MACD: {data.get('macd_signal', '未知')}\n")
                sections.append(f"- ATR: ${atr:.2f} ({atr_pct:.2f}%)\n")
                sections.append(f"- 成交量: {vol_status} ({vol_ratio:.2f}x)\n\n")

        # Add K-line data
        if kline_data:
            sections.append("\n## K线数据 (请从中识别支撑位和压力位)\n\n")
            sections.append("格式: t=时间, o=开盘, h=最高, l=最低, c=收盘, v=成交量\n\n")
            tf_kline_order = [
                ('5m', '5分钟K线 (最近1天)'),
                ('15m', '15分钟K线 (最近1天)'),
                ('1h', '1小时K线 (最近3天)'),
                ('4h', '4小时K线 (最近3天)'),
                ('1d', '日线K线 (最近30天)'),
                ('1w', '周线K线 (最近60周)')
            ]
            for tf_key, tf_desc in tf_kline_order:
                if tf_key not in kline_data:
                    continue
                klines = kline_data[tf_key]
                if not klines:
                    continue
                sections.append(f"### {tf_desc}\n")
                sections.append("```\n")
                # Every candle supplied for the timeframe is emitted.
                for k in klines:
                    sections.append(
                        f"{k['t']} | o:{k['o']} h:{k['h']} l:{k['l']} c:{k['c']} v:{num(k['v'], 0):.0f}\n"
                    )
                sections.append("```\n\n")

        # Add analysis guidelines
        sections.append("""
## 分析指南
### 支撑压力位识别
1. **短期**: 近1天内明显高低点、整数关口
2. **中期**: 近几天重要高低点、趋势线
3. **长期**: 周线/月线级别高低点
### 止盈止损设置
- **short**: 止损0.3-0.5%, 止盈≥1%
- **medium**: 止损1-2%, 止盈≥2%
- **long**: 止损2-4%, 止盈≥5%
### 最终检查
1. 确保JSON格式正确无注释
2. 确保trades数组有3个元素
3. 确保所有价格是数字不是null
4. 确保INACTIVE时所有价格为0
""")
        return "".join(sections)

    def _call_llm(self, prompt: str) -> str:
        """Dispatch the prompt to the configured provider.

        Raises:
            ValueError: If the provider is neither 'claude' nor 'openai'.
        """
        if self.provider == 'claude':
            return self._call_claude(prompt)
        elif self.provider == 'openai':
            return self._call_openai(prompt)
        else:
            raise ValueError(f"Unsupported provider: {self.provider}")

    def _call_claude(self, prompt: str) -> str:
        """Call Claude API and return the text of the first content block."""
        try:
            response = self.client.messages.create(
                model=self.model,
                max_tokens=self.MAX_TOKENS,
                temperature=self.TEMPERATURE,
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            return response.content[0].text
        except Exception as e:
            # Log with provider context, then let generate_decision handle it.
            logger.error(f"Claude API error: {e}")
            raise

    def _call_openai(self, prompt: str) -> str:
        """Call an OpenAI-compatible chat completion API and return the message content."""
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a professional cryptocurrency trading analyst. Provide trading advice in JSON format."
                    },
                    {"role": "user", "content": prompt}
                ],
                max_tokens=self.MAX_TOKENS,
                temperature=self.TEMPERATURE,
            )
            return response.choices[0].message.content
        except Exception as e:
            # Log with provider context, then let generate_decision handle it.
            logger.error(f"OpenAI API error: {e}")
            raise

    def _parse_response(
        self,
        response_text: str,
        market_context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Parse the LLM response into a structured decision.

        Supports the new ``trades`` array format and the legacy
        ``opportunities`` object format; falls back to keyword parsing when no
        valid JSON object can be extracted.
        """
        # A completion may legitimately carry no text; treat that as empty.
        response_text = response_text or ''
        # Try to extract JSON from response (first '{' through last '}').
        json_match = re.search(r'\{[\s\S]*\}', response_text)
        if not json_match:
            logger.warning("No JSON found in LLM response, using fallback parsing")
            return self._fallback_parse(response_text, market_context)
        try:
            llm_decision = json.loads(json_match.group())
            logger.debug(f"Parsed LLM JSON: {llm_decision}")
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse JSON: {e}, using fallback")
            logger.debug(f"JSON match was: {json_match.group()[:500]}")
            return self._fallback_parse(response_text, market_context)

        # Detect the new-style trades array (must carry at least 3 entries).
        trades = llm_decision.get('trades', [])
        if trades and isinstance(trades, list) and len(trades) >= 3:
            decision = self._parse_new_format(llm_decision, market_context, self._safe_float)
        else:
            # Legacy format: opportunities object.
            decision = self._parse_old_format(llm_decision, market_context, self._safe_float)
        # The format parsers leave raw_response empty for the caller to fill in.
        decision['raw_response'] = response_text
        return decision

    def _trade_to_opportunity(self, trade, safe_float) -> Dict[str, Any]:
        """Convert one new-format trade into the legacy opportunity dict."""
        if not trade or trade.get('status') != 'ACTIVE':
            return {
                'exists': False,
                'direction': None,
                'entry_levels': [],
                'entry_price': 0,
                'stop_loss': 0,
                'take_profit': 0,
                'reasoning': trade.get('reasoning', '') if trade else ''
            }
        entry = self._as_dict(trade.get('entry'))
        exit_data = self._as_dict(trade.get('exit'))
        position = self._as_dict(trade.get('position'))
        # Build entry_levels, dropping levels the LLM zeroed out.
        entry_levels = []
        for i, default_pct in enumerate(self.DEFAULT_SIZE_PCTS, start=1):
            price = safe_float(entry.get(f'price_{i}'), 0)
            ratio = safe_float(position.get(f'size_pct_{i}'), default_pct) / 100
            if price > 0:
                entry_levels.append({
                    'price': price,
                    'ratio': ratio,
                    'level': i - 1,
                    'reasoning': ''
                })
        return {
            'exists': True,
            'direction': trade.get('direction', 'NONE'),
            'entry_levels': entry_levels,
            'entry_price': safe_float(entry.get('price_1'), 0),
            'stop_loss': safe_float(exit_data.get('stop_loss'), 0),
            'take_profit': safe_float(exit_data.get('take_profit_1'), 0),
            'take_profit_2': safe_float(exit_data.get('take_profit_2'), 0),
            'take_profit_3': safe_float(exit_data.get('take_profit_3'), 0),
            'risk_reward': safe_float(trade.get('risk_reward'), 0),
            'expected_profit_pct': safe_float(trade.get('expected_profit_pct'), 0),
            'reasoning': trade.get('reasoning', '')
        }

    def _parse_new_format(
        self,
        llm_decision: Dict[str, Any],
        market_context: Dict[str, Any],
        safe_float
    ) -> Dict[str, Any]:
        """Parse the new-style ``trades`` array format.

        Args:
            llm_decision: Decoded JSON object returned by the LLM.
            market_context: Only ``current_price`` is read.
            safe_float: Callable ``(value, default) -> float`` used for every
                numeric field.

        Returns:
            Decision dict carrying the raw ``trades`` plus the backward
            compatible ``opportunities`` and ``levels`` views.
        """
        current_price = market_context.get('current_price', 0)
        trades = llm_decision.get('trades') or []
        # Index trades by timeframe; non-object entries are ignored.
        trades_by_tf = {}
        for trade in trades:
            if not isinstance(trade, dict):
                continue
            tf = trade.get('timeframe', '')
            if tf in ('short', 'medium', 'long'):
                trades_by_tf[tf] = trade

        # Convert to the unified opportunities format (backward compatible).
        short_opp = self._trade_to_opportunity(trades_by_tf.get('short'), safe_float)
        medium_opp = self._trade_to_opportunity(trades_by_tf.get('medium'), safe_float)
        long_opp = self._trade_to_opportunity(trades_by_tf.get('long'), safe_float)

        def first_set(key):
            # First non-zero value in short -> medium -> long order. Applied to
            # every level so take_profit_2/3 fall back to the long-term trade
            # the same way entry/stop_loss/take_profit_1 do.
            return short_opp.get(key, 0) or medium_opp.get(key, 0) or long_opp.get(key, 0)

        analysis = self._as_dict(llm_decision.get('analysis'))
        key_levels = self._as_dict(llm_decision.get('key_levels'))

        decision = {
            'timestamp': datetime.now().isoformat(),
            'signal_type': str(llm_decision.get('signal') or 'HOLD').upper(),
            'confidence': safe_float(llm_decision.get('confidence'), 0.5),
            'trade_type': 'MULTI_TIMEFRAME',
            'market_bias': llm_decision.get('market_bias', 'NEUTRAL'),
            'risk_level': llm_decision.get('risk_level', 'MEDIUM'),
            # New-style trades array (original payload preserved)
            'trades': trades,
            # Backward-compatible opportunities view
            'opportunities': {
                'short_term_5m_15m_1h': short_opp,
                'medium_term_4h_1d': medium_opp,
                'long_term_1d_1w': long_opp,
                # Legacy aliases
                'intraday': short_opp,
                'swing': medium_opp if medium_opp.get('exists') else long_opp,
            },
            # Analysis data
            'analysis': {
                'trend': analysis.get('trend', 'SIDEWAYS'),
                'momentum': analysis.get('momentum', 'NEUTRAL'),
                'volume': analysis.get('volume', 'NORMAL'),
                'summary': analysis.get('summary', ''),
            },
            # Key price levels
            'key_levels': {
                'support': key_levels.get('support', []),
                'resistance': key_levels.get('resistance', []),
            },
            'key_factors': llm_decision.get('key_factors', []),
            'reasoning': analysis.get('summary', ''),
            # Primary price levels (backward compatible)
            'levels': {
                'current_price': current_price,
                'entry': first_set('entry_price'),
                'stop_loss': first_set('stop_loss'),
                'take_profit_1': first_set('take_profit'),
                'take_profit_2': first_set('take_profit_2'),
                'take_profit_3': first_set('take_profit_3'),
            },
            'raw_response': '',  # Filled in by _parse_response
        }
        levels = decision['levels']
        decision['risk_reward_ratio'] = self._risk_reward_ratio(
            levels['entry'], levels['stop_loss'], levels['take_profit_1']
        )
        return decision

    def _parse_old_format(
        self,
        llm_decision: Dict[str, Any],
        market_context: Dict[str, Any],
        safe_float
    ) -> Dict[str, Any]:
        """Parse the legacy ``opportunities`` object format (backward compatible).

        Opportunities whose take-profit distance is below the configured
        minimum profit percentage for their timeframe are marked as not
        existing before the decision is assembled.
        """
        current_price = market_context.get('current_price', 0)
        default_ratios = [pct / 100 for pct in self.DEFAULT_SIZE_PCTS]

        def calc_profit_pct(entry, take_profit, direction):
            if not entry or not take_profit or entry <= 0:
                return 0
            if direction == 'LONG':
                return (take_profit - entry) / entry * 100
            elif direction == 'SHORT':
                return (entry - take_profit) / entry * 100
            return 0

        def meets_profit_threshold(opp, min_profit_pct=1.0):
            if not opp.get('exists'):
                return False
            entry = safe_float(opp.get('entry_price'), 0)
            tp = safe_float(opp.get('take_profit'), 0)
            return calc_profit_pct(entry, tp, opp.get('direction')) >= min_profit_pct

        def normalize_entry_levels(opp: dict, direction: str) -> list:
            entry_levels = opp.get('entry_levels', [])
            if entry_levels and isinstance(entry_levels, list):
                # Use the levels the LLM supplied (at most four).
                normalized = []
                for i, level in enumerate(entry_levels[:4]):
                    if isinstance(level, dict):
                        normalized.append({
                            'price': safe_float(level.get('price'), 0),
                            'ratio': safe_float(level.get('ratio'), default_ratios[i]),
                            'level': i,
                        })
                return normalized
            # No levels supplied: synthesize a 4-step pyramid around the entry
            # price, stepping down for LONG and up otherwise.
            entry_price = safe_float(opp.get('entry_price'), current_price)
            levels = []
            for i, spacing in enumerate([0, 0.003, 0.006, 0.010]):
                if direction == 'LONG':
                    price = round(entry_price * (1 - spacing), 2)
                else:
                    price = round(entry_price * (1 + spacing), 2)
                levels.append({'price': price, 'ratio': default_ratios[i], 'level': i})
            return levels

        def get_first_entry(levels, fallback):
            return levels[0]['price'] if levels else fallback

        def build_opportunity(term, levels):
            return {
                'exists': term.get('exists', False),
                'direction': term.get('direction'),
                'entry_levels': levels,
                'entry_price': get_first_entry(levels, safe_float(term.get('entry_price'), 0)),
                'stop_loss': safe_float(term.get('stop_loss'), 0),
                'take_profit': safe_float(term.get('take_profit'), 0),
                'reasoning': term.get('reasoning', '')
            }

        # Parse opportunities; null / non-object values are treated as absent.
        opportunities = self._as_dict(llm_decision.get('opportunities'))
        short_term = self._as_dict(opportunities.get('short_term_5m_15m_1h'))
        medium_term = self._as_dict(opportunities.get('medium_term_4h_1d'))
        long_term = self._as_dict(opportunities.get('long_term_1d_1w'))
        if not short_term and not medium_term and not long_term:
            # Oldest key names.
            short_term = self._as_dict(opportunities.get('intraday'))
            medium_term = self._as_dict(opportunities.get('swing'))

        # Apply profit filters
        rejected = {'exists': False, 'reasoning': '盈利空间不足'}
        if short_term.get('exists') and not meets_profit_threshold(short_term, settings.MIN_PROFIT_PCT_SHORT):
            short_term = dict(rejected)
        if medium_term.get('exists') and not meets_profit_threshold(medium_term, settings.MIN_PROFIT_PCT_MEDIUM):
            medium_term = dict(rejected)
        if long_term.get('exists') and not meets_profit_threshold(long_term, settings.MIN_PROFIT_PCT_LONG):
            long_term = dict(rejected)

        # Normalize entry levels
        short_levels = normalize_entry_levels(short_term, short_term.get('direction', 'LONG')) if short_term.get('exists') else []
        medium_levels = normalize_entry_levels(medium_term, medium_term.get('direction', 'LONG')) if medium_term.get('exists') else []
        long_levels = normalize_entry_levels(long_term, long_term.get('direction', 'LONG')) if long_term.get('exists') else []

        # "swing" mirrors the medium-term opportunity when it exists, else the
        # long-term one (same rule as the new format). Taking every field from
        # one source avoids pairing, e.g., long-term levels with the
        # medium-term rejection reasoning.
        if medium_term.get('exists'):
            swing_term, swing_levels = medium_term, medium_levels
        else:
            swing_term, swing_levels = long_term, long_levels

        recommendations = self._as_dict(llm_decision.get('recommendations_by_timeframe'))
        decision = {
            'timestamp': datetime.now().isoformat(),
            'signal_type': str(llm_decision.get('signal') or 'HOLD').upper(),
            'confidence': safe_float(llm_decision.get('confidence'), 0.5),
            'trade_type': 'MULTI_TIMEFRAME',
            'reasoning': llm_decision.get('reasoning', ''),
            'opportunities': {
                'short_term_5m_15m_1h': build_opportunity(short_term, short_levels),
                'medium_term_4h_1d': build_opportunity(medium_term, medium_levels),
                'long_term_1d_1w': build_opportunity(long_term, long_levels),
                # Legacy aliases
                'intraday': build_opportunity(short_term, short_levels),
                'swing': build_opportunity(swing_term, swing_levels),
            },
            'recommendations_by_timeframe': {
                'short_term': recommendations.get('short_term', ''),
                'medium_term': recommendations.get('medium_term', ''),
                'long_term': recommendations.get('long_term', '')
            },
            # Primary levels (for backward compatibility)
            'levels': {
                'current_price': current_price,
                'entry': get_first_entry(short_levels, 0) or get_first_entry(medium_levels, 0) or get_first_entry(long_levels, 0),
                'stop_loss': safe_float(short_term.get('stop_loss'), 0) or safe_float(medium_term.get('stop_loss'), 0) or safe_float(long_term.get('stop_loss'), 0),
                'take_profit_1': safe_float(short_term.get('take_profit'), 0) or safe_float(medium_term.get('take_profit'), 0) or safe_float(long_term.get('take_profit'), 0),
                'take_profit_2': 0,
                'take_profit_3': 0,
            },
            'risk_level': llm_decision.get('risk_level', 'MEDIUM'),
            'key_factors': llm_decision.get('key_factors', []),
            'raw_response': '',  # Filled in by _parse_response
        }
        levels = decision['levels']
        decision['risk_reward_ratio'] = self._risk_reward_ratio(
            levels['entry'], levels['stop_loss'], levels['take_profit_1']
        )
        return decision

    def _fallback_parse(
        self,
        response_text: str,
        market_context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Fallback parsing when JSON extraction fails.

        Uses simple keyword matching. Text that mentions both buy-side and
        sell-side keywords (for example an echoed "BUY|SELL|HOLD" schema) is
        ambiguous and yields HOLD rather than a directional signal.
        """
        response_text = response_text or ''
        text_lower = response_text.lower()
        bullish = 'buy' in text_lower or '买入' in response_text or '做多' in response_text
        bearish = 'sell' in text_lower or '卖出' in response_text or '做空' in response_text
        if bullish and not bearish:
            signal_type = 'BUY'
            confidence = 0.6
        elif bearish and not bullish:
            signal_type = 'SELL'
            confidence = 0.6
        else:
            signal_type = 'HOLD'
            confidence = 0.5
        return {
            'timestamp': datetime.now().isoformat(),
            'signal_type': signal_type,
            'confidence': confidence,
            'reasoning': response_text[:500],  # First 500 chars
            'levels': {
                'current_price': market_context.get('current_price', 0),
                'entry': 0,
                'stop_loss': 0,
                'take_profit_1': 0,
                'take_profit_2': 0,
                'take_profit_3': 0,
            },
            'risk_level': 'MEDIUM',
            'time_horizon': 'MEDIUM',
            'key_factors': [],
            'raw_response': response_text,
            'warning': 'Fallback parsing used - levels not available',
        }

    def _disabled_response(self) -> Dict[str, Any]:
        """Return response when LLM is disabled"""
        return {
            'timestamp': datetime.now().isoformat(),
            'signal_type': 'HOLD',
            'confidence': 0,
            'reasoning': 'LLM decision maker is disabled (no API key)',
            'enabled': False,
        }

    def _error_response(self, error_msg: str) -> Dict[str, Any]:
        """Return error response"""
        return {
            'timestamp': datetime.now().isoformat(),
            'signal_type': 'HOLD',
            'confidence': 0,
            'reasoning': f'Error generating decision: {error_msg}',
            'error': error_msg,
        }