diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py
index 6124101..be5c040 100644
--- a/backend/app/crypto_agent/crypto_agent.py
+++ b/backend/app/crypto_agent/crypto_agent.py
@@ -64,6 +64,24 @@ class CryptoAgent:
}
}
+ PLATFORM_SIGNAL_PRIORITY = {
+ 'PaperTrading': ['short_term', 'medium_term'],
+ 'Hyperliquid': ['short_term', 'medium_term'],
+ 'Bitget': ['medium_term', 'short_term'],
+ }
+
+ SIGNAL_POSITION_SIZE_DEFAULTS = {
+ 'short_term': 'light',
+ 'medium_term': 'medium',
+ 'long_term': 'medium',
+ }
+
+ SIGNAL_MARGIN_MULTIPLIERS = {
+ 'short_term': 0.85,
+ 'medium_term': 1.0,
+ 'long_term': 1.0,
+ }
+
def __new__(cls, *args, **kwargs):
"""单例模式 - 确保只有一个实例"""
if cls._instance is None:
@@ -636,34 +654,9 @@ class CryptoAgent:
# ============================================================
logger.info(f"\n🤖 【第一阶段:市场信号分析】")
- # 获取上一轮的信号(用于上下文)
- previous_signal = self.last_signals.get(symbol)
-
- # 显示上一轮信号(如果有)
- if previous_signal:
- prev_time = previous_signal.get('timestamp', 'Unknown')
- prev_trend = previous_signal.get('trend', 'Unknown')
- prev_signals = previous_signal.get('signals', [])
- logger.info(f"📋 上一轮分析时间: {prev_time}")
- logger.info(f"📋 上一轮趋势: {prev_trend}")
-
- if prev_signals:
- for sig in prev_signals:
- action = sig.get('action', 'N/A')
- confidence = sig.get('confidence', 0)
- timeframe = sig.get('timeframe', 'unknown')
- type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
- type_text = type_map.get(timeframe, timeframe)
- logger.info(f"📋 上一轮信号: {type_text} | {action} | {confidence}%")
- else:
- logger.info(f"📋 上一轮信号: 无交易信号(观望)")
- else:
- logger.info(f"📋 上一轮信号: 无历史记录(首次分析)")
-
market_signal = await self.market_analyzer.analyze(
symbol, data,
- symbols=self.symbols,
- previous_signal=previous_signal
+ symbols=self.symbols
)
# 输出市场分析结果
@@ -696,6 +689,11 @@ class CryptoAgent:
return
logger.info(f"\n✅ 发现 {len(valid_signals)} 个有效交易信号(达到 {threshold}% 阈值)")
+ for signal in valid_signals:
+ logger.info(
+ f" - {signal.get('timeframe', signal.get('type', 'unknown'))} | "
+ f"{signal.get('action')} | {signal.get('confidence', 0)}%"
+ )
# ============================================================
# 发送市场信号通知(独立于交易决策)
@@ -707,34 +705,26 @@ class CryptoAgent:
# ============================================================
logger.info(f"\n🤖 【第二阶段:各平台独立处理信号】")
- # 使用第一个有效信号
- main_signal = valid_signals[0]
- signal_action = main_signal.get('action') # buy/sell
-
- # 构建标准信号格式
- trading_signal = {
- 'symbol': symbol,
- 'action': signal_action,
- 'confidence': main_signal.get('confidence', 50),
- 'grade': main_signal.get('grade', 'C'), # 添加信号等级
- 'entry_price': main_signal.get('entry_price', current_price),
- 'stop_loss': main_signal.get('stop_loss'),
- 'take_profit': main_signal.get('take_profit'),
- 'reasoning': main_signal.get('reasoning', ''),
- }
-
- logger.info(f" 信号: {signal_action} {symbol} @ ${trading_signal['entry_price']:.2f} (置信度 {trading_signal['confidence']}%, {trading_signal['grade']}级)")
-
# 2.1 模拟盘处理
if self.settings.paper_trading_enabled:
logger.info(f"\n📊 【模拟盘】")
paper_positions, paper_account, paper_pending = self._get_paper_trading_state()
- paper_decision = self.execute_signal_with_rules(
- trading_signal, 'PaperTrading', paper_account, paper_positions, paper_pending
- )
- paper_decision = self._normalize_execution_decision(
- paper_decision, paper_positions, paper_pending
- )
+ paper_signal = self._select_signal_for_platform(valid_signals, 'PaperTrading')
+ if paper_signal:
+ logger.info(
+ f" 采用信号: {paper_signal.get('timeframe', 'unknown')} | "
+ f"{paper_signal.get('action')} | {paper_signal.get('confidence', 0)}%"
+ )
+ trading_signal = self._build_execution_signal(symbol, paper_signal, current_price)
+ paper_decision = self.execute_signal_with_rules(
+ trading_signal, 'PaperTrading', paper_account, paper_positions, paper_pending
+ )
+ paper_decision = self._normalize_execution_decision(
+ paper_decision, paper_positions, paper_pending
+ )
+ else:
+ logger.info(" 无可执行信号")
+ paper_decision = {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"}
# 不发送决策通知(因为是基于硬编码规则的执行,不是 LLM 决策)
# await self._send_trading_decision_notification(
# paper_decision, market_signal, current_price, prefix="[模拟盘]"
@@ -747,12 +737,22 @@ class CryptoAgent:
if self.hyperliquid:
logger.info(f"\n🔥 【Hyperliquid】")
hl_positions, hl_account, hl_pending = self._get_hyperliquid_trading_state()
- hl_decision = self.execute_signal_with_rules(
- trading_signal, 'Hyperliquid', hl_account, hl_positions, hl_pending
- )
- hl_decision = self._normalize_execution_decision(
- hl_decision, hl_positions, hl_pending
- )
+ hl_signal = self._select_signal_for_platform(valid_signals, 'Hyperliquid')
+ if hl_signal:
+ logger.info(
+ f" 采用信号: {hl_signal.get('timeframe', 'unknown')} | "
+ f"{hl_signal.get('action')} | {hl_signal.get('confidence', 0)}%"
+ )
+ trading_signal = self._build_execution_signal(symbol, hl_signal, current_price)
+ hl_decision = self.execute_signal_with_rules(
+ trading_signal, 'Hyperliquid', hl_account, hl_positions, hl_pending
+ )
+ hl_decision = self._normalize_execution_decision(
+ hl_decision, hl_positions, hl_pending
+ )
+ else:
+ logger.info(" 无可执行信号")
+ hl_decision = {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"}
# 不发送决策通知(因为是基于硬编码规则的执行,不是 LLM 决策)
# await self._send_trading_decision_notification(
# hl_decision, market_signal, current_price, prefix="[Hyperliquid]"
@@ -765,12 +765,22 @@ class CryptoAgent:
if self.bitget:
logger.info(f"\n🔥 【Bitget】")
bg_positions, bg_account, bg_pending = self._get_bitget_trading_state()
- bg_decision = self.execute_signal_with_rules(
- trading_signal, 'Bitget', bg_account, bg_positions, bg_pending
- )
- bg_decision = self._normalize_execution_decision(
- bg_decision, bg_positions, bg_pending
- )
+ bg_signal = self._select_signal_for_platform(valid_signals, 'Bitget')
+ if bg_signal:
+ logger.info(
+ f" 采用信号: {bg_signal.get('timeframe', 'unknown')} | "
+ f"{bg_signal.get('action')} | {bg_signal.get('confidence', 0)}%"
+ )
+ trading_signal = self._build_execution_signal(symbol, bg_signal, current_price)
+ bg_decision = self.execute_signal_with_rules(
+ trading_signal, 'Bitget', bg_account, bg_positions, bg_pending
+ )
+ bg_decision = self._normalize_execution_decision(
+ bg_decision, bg_positions, bg_pending
+ )
+ else:
+ logger.info(" 无可执行信号")
+ bg_decision = {"decision": "HOLD", "action": "IGNORE", "reason": "无适配信号", "reasoning": "无适配信号"}
# 不发送决策通知(因为是基于硬编码规则的执行,不是 LLM 决策)
# await self._send_trading_decision_notification(
# bg_decision, market_signal, current_price, prefix="[Bitget]"
@@ -1118,14 +1128,19 @@ class CryptoAgent:
bitget_decision: Dict[str, Any],
market_signal: Dict[str, Any], current_price: float):
"""执行交易决策(三轨独立)"""
- # 选择最佳信号用于保存
- best_signal = self._get_best_signal_from_market(market_signal)
+ # 保存本轮所有达到阈值的可交易信号,避免分流后只落一条信号
+ threshold = self.settings.crypto_llm_threshold * 100
+ symbol = market_signal.get('symbol')
+ signals = market_signal.get('signals', [])
+ valid_signals = [
+ signal for signal in signals
+ if signal.get('action') in {'buy', 'sell'} and signal.get('confidence', 0) >= threshold
+ ]
- # 保存信号到数据库(只保存一次)
- if best_signal:
- signal_to_save = best_signal.copy()
+ for signal in valid_signals:
+ signal_to_save = signal.copy()
signal_to_save['signal_type'] = 'crypto'
- signal_to_save['symbol'] = market_signal.get('symbol')
+ signal_to_save['symbol'] = symbol
signal_to_save['current_price'] = current_price
self.signal_db.add_signal(signal_to_save)
@@ -1235,6 +1250,64 @@ class CryptoAgent:
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
return sorted_signals[0]
+ def _select_signal_for_platform(self, signals: List[Dict[str, Any]], platform_name: str) -> Optional[Dict[str, Any]]:
+ """根据平台偏好选择最适合执行的信号"""
+ if not signals:
+ return None
+
+ lane_priority = self.PLATFORM_SIGNAL_PRIORITY.get(platform_name, ['short_term', 'medium_term'])
+ by_lane: Dict[str, List[Dict[str, Any]]] = {}
+ for signal in signals:
+ lane = signal.get('timeframe') or signal.get('type') or 'unknown'
+ by_lane.setdefault(lane, []).append(signal)
+
+ for lane in lane_priority:
+ candidates = by_lane.get(lane, [])
+ if candidates:
+ return sorted(candidates, key=lambda item: item.get('confidence', 0), reverse=True)[0]
+
+ return sorted(signals, key=lambda item: item.get('confidence', 0), reverse=True)[0]
+
+ def _build_execution_signal(self, symbol: str, signal: Dict[str, Any], current_price: float) -> Dict[str, Any]:
+ """构建传给执行规则层的标准信号格式"""
+ signal_type = signal.get('timeframe') or signal.get('type') or 'medium_term'
+ position_size = signal.get('position_size') or self.SIGNAL_POSITION_SIZE_DEFAULTS.get(signal_type, 'light')
+
+ return {
+ 'symbol': symbol,
+ 'action': signal.get('action'),
+ 'confidence': signal.get('confidence', 50),
+ 'grade': signal.get('grade', 'C'),
+ 'entry_type': signal.get('entry_type', 'market'),
+ 'entry_price': signal.get('entry_price', current_price),
+ 'stop_loss': signal.get('stop_loss'),
+ 'take_profit': signal.get('take_profit'),
+ 'reasoning': signal.get('reasoning', ''),
+ 'timeframe': signal_type,
+ 'type': signal_type,
+ 'position_size': position_size,
+ }
+
+ def _get_signal_for_decision(self, market_signal: Dict[str, Any], decision: Dict[str, Any]) -> Dict[str, Any]:
+ """优先返回与当前执行决策匹配的信号,用于通知和展示"""
+ if not market_signal:
+ return {}
+
+ target_timeframe = decision.get('timeframe') or decision.get('type')
+ target_action = decision.get('signal_action') or decision.get('action')
+ signals = market_signal.get('signals', [])
+
+ if target_timeframe or target_action:
+ matched = [
+ signal for signal in signals
+ if (not target_timeframe or (signal.get('timeframe') or signal.get('type')) == target_timeframe)
+ and (not target_action or signal.get('action') == target_action)
+ ]
+ if matched:
+ return sorted(matched, key=lambda item: item.get('confidence', 0), reverse=True)[0]
+
+ return self._get_best_signal_from_market(market_signal)
+
async def _send_market_signal_notification(self, market_signal: Dict[str, Any],
current_price: float):
"""发送市场信号通知(第一阶段)- 调用前已确保有有效信号"""
@@ -1444,9 +1517,12 @@ class CryptoAgent:
title = f"[决策] {account_type} {symbol} 交易决策: {decision_text}"
# 获取最佳信号用于显示
- best_signal = self._get_best_signal_from_market(market_signal)
+ best_signal = self._get_signal_for_decision(market_signal, decision)
signal_confidence = best_signal.get('confidence', 0) if best_signal else 0
signal_action = best_signal.get('action', '') if best_signal else ''
+ signal_timeframe = best_signal.get('timeframe', best_signal.get('type', 'unknown')) if best_signal else 'unknown'
+ timeframe_map = {'short_term': '短线', 'medium_term': '趋势', 'long_term': '长线'}
+ timeframe_text = timeframe_map.get(signal_timeframe, signal_timeframe)
# 方向图标
if 'buy' in signal_action.lower() or 'long' in signal_action.lower():
@@ -1461,7 +1537,7 @@ class CryptoAgent:
# 构建内容
content_parts = [
- f"{action_icon} **市场信号**: {action_text} | 信心度: {signal_confidence}%",
+ f"{action_icon} **市场信号**: {action_text} | {timeframe_text} | 信心度: {signal_confidence}%",
f"",
f"🎯 **交易决策**: {decision_text}",
f"",
@@ -1568,7 +1644,7 @@ class CryptoAgent:
# confidence 优先从决策本身读取,否则从市场信号的最佳信号读取
confidence = decision.get('confidence')
if confidence is None:
- _best = self._get_best_signal_from_market(market_signal)
+ _best = self._get_signal_for_decision(market_signal, decision)
confidence = _best.get('confidence', 0) if _best else 0
# 决策类型映射
@@ -1595,8 +1671,11 @@ class CryptoAgent:
action_text = action
# 从市场信号中获取入场方式(需要在构建标题之前)
- best_signal = self._get_best_signal_from_market(market_signal)
+ best_signal = self._get_signal_for_decision(market_signal, decision)
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
+ signal_timeframe = best_signal.get('timeframe', best_signal.get('type', 'unknown')) if best_signal else 'unknown'
+ timeframe_map = {'short_term': '短线', 'medium_term': '趋势', 'long_term': '长线'}
+ timeframe_text = timeframe_map.get(signal_timeframe, signal_timeframe)
# 对 Hyperliquid 限价单:用实际订单状态决定显示
# resting=真的在挂单中, filled=已立即成交, None=非HL或市价单
@@ -1661,6 +1740,7 @@ class CryptoAgent:
content_parts = [
f"{action_icon} **操作**: {decision_text} ({action_text})",
+ f"🧭 **信号类型**: {timeframe_text}",
f"{entry_type_icon} **入场方式**: {entry_type_text}",
f"{position_display.replace(' ', ': **')} | 📈 信心度: **{confidence}%**",
f"",
@@ -1711,10 +1791,15 @@ class CryptoAgent:
action_text = "做多" if 'buy' in action.lower() else ("做空" if 'sell' in action.lower() else action)
decision_text = {'OPEN': '开仓', 'ADD': '加仓'}.get(decision_type, decision_type)
+ best_signal = self._get_signal_for_decision(market_signal, decision)
+ signal_timeframe = best_signal.get('timeframe', best_signal.get('type', 'unknown')) if best_signal else 'unknown'
+ timeframe_map = {'short_term': '短线', 'medium_term': '趋势', 'long_term': '长线'}
+ timeframe_text = timeframe_map.get(signal_timeframe, signal_timeframe)
title = f"{title_prefix}⚠️ {symbol} {decision_text}未执行"
content = "\n".join([
f"🔴 **决策**: {decision_text}({action_text})",
+ f"🧭 **信号类型**: {timeframe_text}",
f"❌ **未执行原因**: {reason}",
f"🕐 **时间**: {datetime.now().strftime('%H:%M:%S')}",
])
@@ -1753,10 +1838,10 @@ class CryptoAgent:
# 转换决策的 action 为 paper_trading 期望的格式
trading_action = self._convert_trading_action(action)
- # 从市场信号中获取入场方式和入场价格
- best_signal = self._get_best_signal_from_market(market_signal)
- entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
- entry_price = best_signal.get('entry_price', current_price) if best_signal else current_price
+ # 兼容旧入口,但信号选择仍按当前决策对应的 lane 匹配
+ matched_signal = self._get_signal_for_decision(market_signal, decision)
+ entry_type = matched_signal.get('entry_type', 'market') if matched_signal else 'market'
+ entry_price = matched_signal.get('entry_price', current_price) if matched_signal else current_price
logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}")
@@ -2267,6 +2352,8 @@ class CryptoAgent:
Returns:
(margin, reason) - 保证金金额和原因
"""
+ signal_type = signal.get('timeframe') or signal.get('type') or 'medium_term'
+
# 基础保证金比例(超激进配置 - 最大化资金利用率)
confidence = signal.get('confidence', 50)
if confidence >= 90:
@@ -2279,6 +2366,8 @@ class CryptoAgent:
base_margin_pct = 0.08 # C级: 8% (轻仓试探)
grade = 'C'
+ base_margin_pct *= self.SIGNAL_MARGIN_MULTIPLIERS.get(signal_type, 1.0)
+
# 可用保证金
available = account.get('available', account.get('available_balance', 0))
balance = account.get('current_balance', 0)
@@ -2321,7 +2410,7 @@ class CryptoAgent:
if margin > available:
margin = available * 0.95 # 留 5% 余量
- return round(margin, 2), f"信号{grade}级({confidence}%) → {base_margin_pct*100}%保证金"
+ return round(margin, 2), f"{signal_type} 信号{grade}级({confidence}%) → {base_margin_pct*100:.1f}%保证金"
def _handle_same_direction(self, signal: Dict[str, Any],
positions: List[Dict],
@@ -2437,6 +2526,7 @@ class CryptoAgent:
return "OPEN", "无反向订单,正常开仓"
def _check_risk_control(self, signal: Dict[str, Any],
+ platform_name: str,
account: Dict[str, Any],
positions: List[Dict],
pending_orders: List[Dict]) -> tuple:
@@ -2457,7 +2547,7 @@ class CryptoAgent:
# 2. 可用余额检查
available = account.get('available', account.get('available_balance', 0))
symbol = signal.get('symbol', '').replace('USDT', '').upper()
- rules = self.PLATFORM_RULES.get('Bitget', {}) # 使用 Bitget 规则检查最小保证金
+ rules = self.PLATFORM_RULES.get(platform_name, {})
min_margin = rules.get('min_margin', {}).get(symbol, 10)
if available < min_margin:
@@ -2473,6 +2563,9 @@ class CryptoAgent:
sl = signal.get('stop_loss')
tp = signal.get('take_profit')
+ signal_type = signal.get('timeframe') or signal.get('type') or 'medium_term'
+ min_rr = 1.5 if signal_type == 'short_term' else 1.8 if signal_type == 'medium_term' else 1.2
+
if entry > 0 and sl and tp:
try:
sl = float(sl)
@@ -2487,8 +2580,8 @@ class CryptoAgent:
if risk > 0:
risk_reward_ratio = reward / risk
- if risk_reward_ratio < 1.2:
- return False, f"盈亏比 {risk_reward_ratio:.2f} < 1.2,不执行"
+ if risk_reward_ratio < min_rr:
+ return False, f"{signal_type} 盈亏比 {risk_reward_ratio:.2f} < {min_rr:.1f},不执行"
except:
pass # 价格解析失败,跳过检查
@@ -2512,17 +2605,26 @@ class CryptoAgent:
Returns:
执行决策字典
"""
+ if not signal:
+ return {
+ "decision": "HOLD",
+ "action": "IGNORE",
+ "reason": "无适配信号",
+ "reasoning": "无适配信号"
+ }
+
logger.info(f"\n🎯 [{platform_name}] 处理交易信号: {signal.get('action')} {signal.get('symbol')}")
# 1. 风控检查
- passed, reason = self._check_risk_control(signal, account, positions, pending_orders)
+ passed, reason = self._check_risk_control(signal, platform_name, account, positions, pending_orders)
if not passed:
logger.info(f" ❌ 风控未通过: {reason}")
return {
"decision": "HOLD",
"action": "IGNORE",
"reason": reason,
- "reasoning": reason
+ "reasoning": reason,
+ **signal
}
# 2. 处理同向订单
@@ -2534,7 +2636,8 @@ class CryptoAgent:
"decision": "HOLD",
"action": same_action,
"reason": same_reason,
- "reasoning": same_reason
+ "reasoning": same_reason,
+ **signal
}
# 3. 处理反向订单
@@ -2604,6 +2707,9 @@ class CryptoAgent:
"take_profit": signal.get('take_profit'),
"confidence": signal.get('confidence', 0),
"grade": signal.get('grade', 'C'),
+ "timeframe": signal.get('timeframe'),
+ "type": signal.get('type'),
+ "position_size": signal.get('position_size', 'light'),
}
async def _execute_bitget_decisions(self, decision: Dict[str, Any],
@@ -3312,32 +3418,6 @@ class CryptoAgent:
# 发生错误时返回 0,不开仓
return 0
- def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
- current_price: float) -> Dict[str, Any]:
- """转换 LLM 信号格式为模拟交易格式"""
- signal_type = signal.get('type', 'medium_term')
- type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
-
- # 获取入场类型和入场价
- entry_type = signal.get('entry_type', 'market')
- entry_price = signal.get('entry_price', current_price)
-
- return {
- 'symbol': symbol,
- 'action': signal.get('action', 'hold'),
- 'entry_type': entry_type, # market 或 limit
- 'entry_price': entry_price, # 入场价(挂单价格)
- 'price': current_price, # 当前价格
- 'stop_loss': signal.get('stop_loss', 0),
- 'take_profit': signal.get('take_profit', 0),
- 'confidence': signal.get('confidence', 0),
- 'signal_grade': signal.get('grade', 'D'),
- 'signal_type': type_map.get(signal_type, 'swing'),
- 'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
- 'reasons': [signal.get('reason', '')],
- 'timestamp': datetime.now()
- }
-
def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
"""计算24小时价格变化"""
if len(h1_data) < 24:
@@ -3351,7 +3431,7 @@ class CryptoAgent:
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
"""验证数据完整性"""
- required_intervals = ['1m', '5m', '15m', '30m', '1h']
+ required_intervals = ['1m', '5m', '15m', '30m', '1h', '4h']
for interval in required_intervals:
if interval not in data or data[interval].empty:
return False
@@ -3420,34 +3500,6 @@ class CryptoAgent:
except Exception as e:
logger.error(f"持仓回顾失败: {e}", exc_info=True)
- def _convert_to_real_signal(self, symbol: str, signal: Dict[str, Any],
- current_price: float) -> Dict[str, Any]:
- """转换 LLM 信号格式为实盘交易格式"""
- signal_type = signal.get('type', 'medium_term')
- type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
-
- # 获取入场类型和入场价
- entry_type = signal.get('entry_type', 'market')
- entry_price = signal.get('entry_price', current_price)
-
- # 映射 action: buy -> long, sell -> short
- action = signal.get('action', 'hold')
- side_map = {'buy': 'long', 'sell': 'short'}
-
- return {
- 'symbol': symbol,
- 'side': side_map.get(action, 'long'),
- 'entry_type': entry_type,
- 'entry_price': entry_price,
- 'stop_loss': signal.get('stop_loss', 0),
- 'take_profit': signal.get('take_profit', 0),
- 'confidence': signal.get('confidence', 0),
- 'grade': signal.get('grade', 'D'),
- 'signal_type': type_map.get(signal_type, 'swing'),
- 'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
- 'trend': signal.get('trend')
- }
-
async def _notify_position_adjustment(
self,
symbol: str,
@@ -3512,14 +3564,16 @@ class CryptoAgent:
symbol = market_signal.get('symbol')
account_type = "📊"
- # 获取最佳信号
- best_signal = self._get_best_signal_from_market(market_signal)
- if not best_signal:
+ signal = self._get_signal_for_decision(market_signal, decision)
+ if not signal:
return
- confidence = best_signal.get('confidence', 0)
- entry_type = best_signal.get('entry_type', 'market')
- entry_price = best_signal.get('entry_price', current_price)
+ confidence = signal.get('confidence', 0)
+ entry_type = signal.get('entry_type', 'market')
+ entry_price = signal.get('entry_price', current_price)
+ signal_timeframe = signal.get('timeframe', signal.get('type', 'unknown'))
+ timeframe_map = {'short_term': '短线', 'medium_term': '趋势', 'long_term': '长线'}
+ timeframe_text = timeframe_map.get(signal_timeframe, signal_timeframe)
# 决策信息
decision_type = decision.get('decision', 'HOLD')
@@ -3534,7 +3588,7 @@ class CryptoAgent:
final_reason = "未知原因"
# 方向图标
- action = best_signal.get('action', 'wait')
+ action = signal.get('action', 'wait')
if action == 'buy':
action_icon = '🟢'
action_text = '做多'
@@ -3550,7 +3604,7 @@ class CryptoAgent:
# 构建内容
content_parts = [
- f"{action_icon} **信号**: {action_text} | 📈 信心度: **{confidence}%**",
+ f"{action_icon} **信号**: {action_text} | {timeframe_text} | 📈 信心度: **{confidence}%**",
f"",
f"**入场方式**: {entry_type}",
f"**建议入场价**: ${entry_price:,.2f}" if isinstance(entry_price, (int, float)) else f"**建议入场价**: {entry_price}",
@@ -3584,11 +3638,9 @@ class CryptoAgent:
return {'error': '数据不完整'}
# 使用新架构:市场分析 + 交易决策
- previous_signal = self.last_signals.get(symbol)
market_signal = await self.market_analyzer.analyze(
symbol, data,
- symbols=self.symbols,
- previous_signal=previous_signal
+ symbols=self.symbols
)
positions, account = self._get_trading_state()
diff --git a/backend/app/crypto_agent/executor/paper_trading_executor.py b/backend/app/crypto_agent/executor/paper_trading_executor.py
index edd0249..0c10102 100644
--- a/backend/app/crypto_agent/executor/paper_trading_executor.py
+++ b/backend/app/crypto_agent/executor/paper_trading_executor.py
@@ -46,6 +46,9 @@ class PaperTradingExecutor(BaseExecutor):
signal_grade = 'D'
# 构建信号字典
+ raw_signal_type = decision.get('timeframe') or decision.get('type', 'swing')
+ signal_type = 'short_term' if raw_signal_type == 'short_term' else 'swing'
+
signal = {
'symbol': symbol,
'action': action,
@@ -57,6 +60,9 @@ class PaperTradingExecutor(BaseExecutor):
'confidence': confidence,
'quantity': adjusted_margin,
'position_size': decision.get('position_size', 'light'),
+ 'signal_type': signal_type,
+ 'type': raw_signal_type,
+ 'reason': decision.get('reasoning', decision.get('reason', '')),
}
# 执行下单(统一调用方式)
diff --git a/backend/app/crypto_agent/market_signal_analyzer.py b/backend/app/crypto_agent/market_signal_analyzer.py
index 9263521..bbb439d 100644
--- a/backend/app/crypto_agent/market_signal_analyzer.py
+++ b/backend/app/crypto_agent/market_signal_analyzer.py
@@ -27,130 +27,136 @@ from app.services.bitget_service import bitget_service
class MarketSignalAnalyzer:
"""市场信号分析器 - 只关注市场,输出客观信号"""
- # 市场分析系统提示词(Al Brooks价格行为学 + 供需 + 量价)
- MARKET_ANALYSIS_PROMPT = """你是一位专业的加密货币日内交易员,采用 Al Brooks 价格行为学分析方法。
-核心原则:价格行为 + 供需区域 + 量价关系 决定入场,技术指标(RSI/ATR)仅作辅助参考。
-满足 2-3 个入场条件即可交易,不追求所有条件同时满足。
+ INTRADAY_ANALYSIS_TEMPERATURE = 0.15
+ TREND_ANALYSIS_TEMPERATURE = 0.10
+ ANALYSIS_MAX_TOKENS = 1200
+ LANE_MIN_CONFIDENCE = {
+ 'short_term': 60,
+ 'medium_term': 65,
+ }
-## 第一步:判断市场状态
+ INTRADAY_ANALYSIS_PROMPT = """你是一位专业的加密货币日内交易员,只负责生成 short_term 信号。
-**价格结构判断(优先于EMA)**:
-- 上升趋势:连续更高高点(HH)+ 更高低点(HL)
-- 下降趋势:连续更低高点(LH)+ 更低低点(LL)
-- 震荡区间:高点不再创新高,低点不再创新低,价格横向震荡
+你的任务是基于 5m / 15m / 30m、当日开盘、VWAP、开盘区间、关键位和衍生品拥挤度,判断未来 30 分钟到 4 小时内是否存在可执行 setup。
-**趋势强弱**:
-- 强趋势:大实体K线居多,回调幅度 < 前段波动的50%,回调缩量
-- 弱趋势 / 变盘预警:K线实体缩小、大影线增多、回调超过50%、多次假突破
+执行原则:
+1. 先判断日内 regime:trending / ranging / neutral。
+2. 趋势日内只做顺势回调或突破后的回踩确认,不追涨杀跌。
+3. 震荡日内只做区间边界附近的反转,不在区间中部开仓。
+4. 技术指标只做辅助,优先看结构、关键位、波动率、量能、VWAP 偏离和距离。
+5. 没有清晰止损、止盈和盈亏比就不交易。
+6. 本次分析独立进行,不参考任何上一轮信号。
-## 第二步:识别供需区域
-
-**供给区(压力/阻力)**:
-- 前期显著高点,价格曾在此快速下行的区域
-- 多次测试未能有效突破的价格区间
-- 当日开盘价、整数关口
-
-**需求区(支撑)**:
-- 前期显著低点,价格曾在此快速上行的区域
-- 多次测试未能有效跌破的价格区间
-- 当日开盘价、整数关口
-
-## 第三步:寻找入场机会
-
-### A. 趋势市:顺势回调入场
-
-满足以下 **2-3 项**即可入场,不需全部满足:
-
-1. **两段回调**(Al Brooks 核心):上升趋势中,回调分两波(A段下→小反弹→B段再下),B段低点出现看涨K线 → 做多;下降趋势反向同理
-2. **关键位反转**:价格回调至供需区/前期高低点/整数关口,出现明确反转K线(吞没、锤子、大阳/阴线)
-3. **量价确认**:回调缩量(量比<0.8),在支撑位出现放量反弹(量比>1.2)
-4. **强趋势K线**:出现大实体顺势K线(实体 > 影线总和),表明方向明确
-5. **二次入场**(更可靠):第一次突破/回调往往是陷阱;等待回测确认后的第二次方向运动再入场
-
-**entry_type 选择**:
-- 价格正在快速移动中 → limit 挂单等待回调
-- 回调已到位,出现反转K线 → market 立即入场
-
-### B. 震荡市:区间边界反向入场
-
-1. 价格接近区间上沿(供给区)+ 看跌K线形态 → 做空
-2. 价格接近区间下沿(需求区)+ 看涨K线形态 → 做多
-3. 需有量价配合(入场K线相对放量)
-
-### C. 量价过滤(关键!)
-
-**允许入场**:
-- ✅ 突破时量比 > 1.2(放量突破,真突破概率高)
-- ✅ 回调缩量(量比 < 0.8)后,在支撑位出现放量反转
-
-**禁止入场**:
-- ❌ 突破时量比 < 0.8(无量突破,假突破概率高)
-- ❌ 连续 3 根以上大实体同向K线(加速中,不追)
-- ❌ 盈亏比 < 1:1.2
-
-## 第四步:止损与目标
-
-**止损设置**(Al Brooks信号K线止损法):
-- 做多止损:信号K线(反转K线)低点下方 0.2-0.3% 缓冲
-- 做空止损:信号K线高点上方 0.2-0.3% 缓冲
-- 用 1.5×ATR(30m) 验证合理性,止损范围参考 0.8-2.5%
-
-**目标设置**:
-- 下一个供需区域(做多看上方压力,做空看下方支撑)
-- 最低盈亏比:1:1.2(强信号可要求 1:1.5)
-- 日内目标:2-3%
-
-## 资金费率快速判断
-
-| 情绪 | 做多操作 | 做空操作 |
-|------|---------|---------|
-| 极度贪婪(>+0.1%) | 降低置信度,不加仓 | 可适当提高优先级 |
-| 中性 | 正常操作 | 正常操作 |
-| 极度恐惧(<-0.1%) | 可适当提高优先级 | 降低置信度,不加仓 |
-
-## 历史信号参考
-
-若已有上一轮信号,只在以下情况输出新信号:
-- 趋势结构发生明确反转(HH/HL → LH/LL,或反向)
-- 新入场价与上一轮差距 ≥ 1.5%(出现新的供需位)
-- 距上一轮信号超过 2 小时
-- 价格已触及上一轮止损或止盈
-
-## 输出格式(严格遵守)
+信号要求:
+1. 只允许输出 0 或 1 个 short_term 信号。
+2. 盈亏比至少 1:1.5。
+3. 如果价格处于加速延伸,优先返回空信号。
+4. 如果价格位于区间中部、离关键位太远、止损过宽或方向证据冲突,必须返回空信号。
+5. 只有在 setup 足够清晰时才允许输出信号;宁可空仓,不要勉强给单。
+6. entry_type:
+ - 价格已回到关键位并出现确认,可用 market
+ - 仍需等待回踩/反抽,使用 limit
+7. grade / confidence 约束:
+ - A: 80-100,结构、位置、量价、时机都对齐
+ - B: 70-79,条件较完整但仍有一项次优
+ - C: 60-69,只有轻仓试错级别
+ - 60 以下不要输出交易信号
+输出 JSON,禁止输出解释性正文:
```json
{
- "market_state": "ranging/trending",
+ "market_state": "ranging/trending/neutral",
"trend_direction": "uptrend/downtrend/neutral",
"trend_strength": "strong/medium/weak",
- "analysis_summary": "市场状态简述,30字以内",
+ "analysis_summary": "20字以内,总结日内状态",
"key_levels": {
"support": [数字, 数字],
"resistance": [数字, 数字]
},
"signals": [
{
- "type": "short_term/medium_term",
+ "type": "short_term",
"action": "buy/sell",
"entry_type": "market/limit",
- "confidence": 0-100,
+ "confidence": 0,
"grade": "A/B/C",
- "entry_price": 数字,
- "stop_loss": 数字,
- "take_profit": 数字,
- "reasoning": "入场依据:供需位+K线形态+量价状态"
+ "entry_price": 0,
+ "stop_loss": 0,
+ "take_profit": 0,
+ "reasoning": "结构+关键位+量能+波动率"
}
]
}
```
-**输出规则**:
-- 无明确信号时:signals 为空数组 []
-- 盈亏比 < 1:1.2 不输出信号
-- 最多输出 2 个最强信号
-- entry_price / stop_loss / take_profit 必须是纯数字,不加 $ 或逗号
+额外约束:
+1. `analysis_summary` 控制在 20 个中文字符以内。
+2. `reasoning` 只写一条简洁证据链,不要写仓位建议。
+3. `entry_price` / `stop_loss` / `take_profit` 必须是纯数字。
+4. 做多必须满足 `stop_loss < entry_price < take_profit`;做空必须满足 `take_profit < entry_price < stop_loss`。
+5. 没有 setup 时必须返回 `signals: []`。
+"""
-你只负责分析市场信号,不负责仓位管理和风险控制。
+ TREND_ANALYSIS_PROMPT = """你是一位专业的加密货币趋势交易员,只负责生成 medium_term 信号。
+
+你的任务是基于 1h / 4h、关键位、趋势阶段、反转检测、衍生品拥挤度和新闻催化,判断未来 4 小时到 3 天内是否存在趋势 setup。
+
+执行原则:
+1. 4h 决定大方向,1h 决定节奏与入场位置。
+2. 只做两类交易:
+ - 趋势延续:4h 趋势明确,1h 回踩关键位后确认继续
+ - 趋势反转:4h 结构和 1h 动能同时改善,且反转证据充分
+3. 禁止仅凭 15m 噪音逆 4h 开仓。
+4. 趋势晚期、资金费率过热或价格过度偏离关键均线时,要显著降低开仓积极性。
+5. 没有清晰位置优势就不交易。
+6. 本次分析独立进行,不参考任何上一轮信号。
+
+信号要求:
+1. 只允许输出 0 或 1 个 medium_term 信号。
+2. 盈亏比至少 1:1.8。
+3. 如果 4h 与 1h 明显冲突,优先返回空信号。
+4. 反转信号必须比延续信号更严格。
+5. 如果趋势处于晚期且没有回踩确认,或反转证据不足,必须返回空信号。
+6. 只有在位置优势和方向一致性都充分时才允许开仓。
+7. grade / confidence 约束:
+ - A: 82-100,4h/1h 同向且位置优
+ - B: 72-81,趋势或反转证据较完整
+ - C: 65-71,仅限早期确认不足的轻仓趋势尝试
+ - 65 以下不要输出交易信号
+
+输出 JSON,禁止输出解释性正文:
+```json
+{
+ "market_state": "ranging/trending/neutral",
+ "trend_direction": "uptrend/downtrend/neutral",
+ "trend_strength": "strong/medium/weak",
+ "analysis_summary": "20字以内,总结趋势状态",
+ "key_levels": {
+ "support": [数字, 数字],
+ "resistance": [数字, 数字]
+ },
+ "signals": [
+ {
+ "type": "medium_term",
+ "action": "buy/sell",
+ "entry_type": "market/limit",
+ "confidence": 0,
+ "grade": "A/B/C",
+ "entry_price": 0,
+ "stop_loss": 0,
+ "take_profit": 0,
+ "reasoning": "4h方向+1h节奏+关键位+量价"
+ }
+ ]
+}
+```
+
+额外约束:
+1. `analysis_summary` 控制在 20 个中文字符以内。
+2. `reasoning` 只写一条简洁证据链,不要写仓位建议。
+3. `entry_price` / `stop_loss` / `take_profit` 必须是纯数字。
+4. 做多必须满足 `stop_loss < entry_price < take_profit`;做空必须满足 `take_profit < entry_price < stop_loss`。
+5. 没有 setup 时必须返回 `signals: []`。
"""
def __init__(self):
@@ -158,8 +164,7 @@ class MarketSignalAnalyzer:
self.exchange = bitget_service
async def analyze(self, symbol: str, data: Dict[str, Any],
- symbols: List[str] = None,
- previous_signal: Dict[str, Any] = None) -> Dict[str, Any]:
+ symbols: List[str] = None) -> Dict[str, Any]:
"""
分析市场并生成信号
@@ -167,7 +172,6 @@ class MarketSignalAnalyzer:
symbol: 交易对
data: 多周期K线数据
symbols: 所有监控的交易对(用于市场对比)
- previous_signal: 上一轮的分析信号(用于避免重复信号和提供上下文)
Returns:
市场信号字典
@@ -182,20 +186,48 @@ class MarketSignalAnalyzer:
# 3. 获取合约市场数据(资金费率、持仓量等)
futures_context = await self._get_futures_context(symbol)
- # 4. 构建 LLM 提示词
- prompt = self._build_analysis_prompt(symbol, market_context, news_context, previous_signal, futures_context)
+ # 4. 将日内和趋势拆成两次独立分析,避免一个 prompt 同时混做两件事
+ intraday_prompt = self._build_analysis_prompt(
+ symbol=symbol,
+ lane="intraday",
+ market_context=market_context,
+ news_context=news_context,
+ futures_context=futures_context
+ )
+ trend_prompt = self._build_analysis_prompt(
+ symbol=symbol,
+ lane="trend",
+ market_context=market_context,
+ news_context=news_context,
+ futures_context=futures_context
+ )
- # 5. 调用 LLM 分析
- messages = [
- {"role": "system", "content": self.MARKET_ANALYSIS_PROMPT},
- {"role": "user", "content": prompt}
+ intraday_messages = [
+ {"role": "system", "content": self.INTRADAY_ANALYSIS_PROMPT},
+ {"role": "user", "content": intraday_prompt}
+ ]
+ trend_messages = [
+ {"role": "system", "content": self.TREND_ANALYSIS_PROMPT},
+ {"role": "user", "content": trend_prompt}
]
- response = await llm_service.achat(messages)
- # 6. 解析结果
- result = self._parse_llm_response(response, symbol)
+ intraday_response, trend_response = await asyncio.gather(
+ llm_service.achat(
+ intraday_messages,
+ temperature=self.INTRADAY_ANALYSIS_TEMPERATURE,
+ max_tokens=self.ANALYSIS_MAX_TOKENS
+ ),
+ llm_service.achat(
+ trend_messages,
+ temperature=self.TREND_ANALYSIS_TEMPERATURE,
+ max_tokens=self.ANALYSIS_MAX_TOKENS
+ )
+ )
- return result
+ intraday_result = self._parse_llm_response(intraday_response or "", symbol)
+ trend_result = self._parse_llm_response(trend_response or "", symbol)
+
+ return self._merge_lane_results(symbol, intraday_result, trend_result)
except Exception as e:
logger.error(f"市场信号分析失败: {e}")
@@ -204,187 +236,371 @@ class MarketSignalAnalyzer:
return self._get_empty_signal(symbol)
def _prepare_market_context(self, symbol: str, data: Dict,
- symbols: List[str] = None) -> str:
+ symbols: List[str] = None) -> Dict[str, str]:
"""准备市场上下文信息"""
- context_parts = []
-
- # 当前价格和24h变化
current_price = float(data['5m'].iloc[-1]['close'])
price_change_24h = self._calculate_price_change_24h(data['1h'])
- context_parts.append(f"当前价格: ${current_price:,.2f} ({price_change_24h})")
+ day_open = self._get_session_open(data.get('1h'))
+ session_vwap = self._calculate_session_vwap(data.get('5m'))
+ opening_range = self._calculate_opening_range(data.get('5m'))
- # 当日开盘价(供需区域参考)
- df_1h = data.get('1h')
- if df_1h is not None and len(df_1h) > 0:
- # 取今天0点后的第一根1h K线作为当日开盘
- try:
- now = pd.Timestamp.now()
- today_start = now.normalize() # 今天0:00
- today_bars = df_1h[df_1h.index >= today_start] if hasattr(df_1h.index, 'normalize') else df_1h.iloc[-24:]
- day_open = float(today_bars.iloc[0]['open']) if len(today_bars) > 0 else float(df_1h.iloc[-24]['open'])
- except Exception:
- day_open = float(df_1h.iloc[-1]['open'])
- context_parts.append(f"当日开盘价: {day_open:.1f}(供需参考)")
+ feature_5m = self._summarize_timeframe_features(data.get('5m'), '5m')
+ feature_15m = self._summarize_timeframe_features(data.get('15m'), '15m')
+ feature_30m = self._summarize_timeframe_features(data.get('30m'), '30m')
+ feature_1h = self._summarize_timeframe_features(data.get('1h'), '1h')
+ feature_4h = self._summarize_timeframe_features(data.get('4h'), '4h')
- # 多周期数据(价格行为K线 + 技术指标摘要)
- # 主要时间周期显示多根K线,辅助周期显示摘要
- # 15m/30m加大根数以识别波段高低点和供需区
- PRICE_ACTION_BARS = {'5m': 10, '15m': 16, '30m': 12, '1h': 6}
+ intraday_alignment = self._describe_alignment([feature_5m, feature_15m, feature_30m])
+ trend_alignment = self._describe_alignment([feature_1h, feature_4h])
+ range_zone = self._detect_range_zone(data)
+ reversal_detection = self._detect_trend_reversal(data)
+ trend_stage = self._detect_trend_stage(data)
+ key_levels = self._derive_key_levels(data, range_zone)
- for tf_name, df in data.items():
- if df is None or len(df) == 0:
+ snapshot_parts = [
+ f"## 市场快照",
+ f"- 交易对: {symbol}",
+ f"- 当前价格: {current_price:.2f}",
+ f"- 24h涨跌: {price_change_24h}",
+ f"- 当日开盘: {day_open:.2f}" if day_open is not None else "- 当日开盘: N/A",
+ f"- 会话VWAP: {session_vwap:.2f}" if session_vwap is not None else "- 会话VWAP: N/A",
+ ]
+
+ if day_open:
+ snapshot_parts.append(f"- 相对日开盘偏离: {((current_price - day_open) / day_open) * 100:+.2f}%")
+ if session_vwap:
+ snapshot_parts.append(f"- 相对VWAP偏离: {((current_price - session_vwap) / session_vwap) * 100:+.2f}%")
+ if opening_range:
+ snapshot_parts.append(
+ f"- 开盘区间(前30分钟): 高 {opening_range['high']:.2f} / 低 {opening_range['low']:.2f}"
+ )
+
+ intraday_parts = [
+ "## 日内特征",
+ self._format_feature_line(feature_5m),
+ self._format_feature_line(feature_15m),
+ self._format_feature_line(feature_30m),
+ f"- 日内级别一致性: {intraday_alignment}",
+ ]
+
+ trend_parts = [
+ "## 趋势特征",
+ self._format_feature_line(feature_1h),
+ self._format_feature_line(feature_4h),
+ f"- 趋势级别一致性: {trend_alignment}",
+ ]
+
+ if trend_stage.get('stage') != 'unknown':
+ stage_map = {'early': '早期', 'middle': '中期', 'late': '晚期'}
+ trend_parts.append(
+ f"- 趋势阶段: {stage_map.get(trend_stage['stage'], trend_stage['stage'])} ({trend_stage['confidence']}%) | {trend_stage['analysis']}"
+ )
+
+ levels_parts = [
+ "## 关键位",
+ f"- 支撑位: {self._format_levels(key_levels.get('support'))}",
+ f"- 阻力位: {self._format_levels(key_levels.get('resistance'))}",
+ ]
+
+ if range_zone.get('is_ranging'):
+ intraday_parts.append(
+ f"- 区间判断: 是 ({range_zone['confidence']}%) | 宽度 {range_zone.get('range_width_pct', 0):.2f}% | {range_zone.get('analysis', '')}"
+ )
+ else:
+ intraday_parts.append("- 区间判断: 否")
+
+ if reversal_detection.get('is_reversing'):
+ reversal_type = "bullish" if reversal_detection.get('reversal_type') == 'bullish_reversal' else "bearish"
+ signal_desc = ", ".join(sig['desc'] for sig in reversal_detection.get('signals', [])[:3])
+ trend_parts.append(
+ f"- 反转检测: {reversal_type} ({reversal_detection['confidence']}%) | {signal_desc}"
+ )
+ else:
+ trend_parts.append("- 反转检测: 无显著反转信号")
+
+ return {
+ 'snapshot': "\n".join(snapshot_parts),
+ 'intraday': "\n".join(intraday_parts),
+ 'trend': "\n".join(trend_parts),
+ 'levels': "\n".join(levels_parts),
+ }
+
+ def _get_session_open(self, df: Optional[pd.DataFrame]) -> Optional[float]:
+ """获取当前交易日开盘价"""
+ if df is None or df.empty:
+ return None
+
+ try:
+ if 'open_time' not in df.columns:
+ return float(df.iloc[-24]['open']) if len(df) >= 24 else float(df.iloc[0]['open'])
+
+ latest_time = pd.to_datetime(df['open_time'].iloc[-1])
+ session_start = latest_time.normalize()
+ today_bars = df[df['open_time'] >= session_start]
+ if not today_bars.empty:
+ return float(today_bars.iloc[0]['open'])
+ except Exception as e:
+ logger.debug(f"获取交易日开盘价失败: {e}")
+
+ return float(df.iloc[0]['open']) if not df.empty else None
+
+ def _calculate_session_vwap(self, df: Optional[pd.DataFrame]) -> Optional[float]:
+ """计算当前交易日 VWAP"""
+ if df is None or df.empty or 'volume' not in df.columns:
+ return None
+
+ try:
+ session_df = df
+ if 'open_time' in df.columns:
+ latest_time = pd.to_datetime(df['open_time'].iloc[-1])
+ session_start = latest_time.normalize()
+ session_df = df[df['open_time'] >= session_start]
+
+ if session_df.empty:
+ return None
+
+ typical_price = (session_df['high'] + session_df['low'] + session_df['close']) / 3
+ volume = session_df['volume'].replace(0, np.nan)
+ total_volume = volume.sum()
+ if pd.isna(total_volume) or total_volume <= 0:
+ return None
+
+ return float((typical_price * session_df['volume']).sum() / total_volume)
+ except Exception as e:
+ logger.debug(f"计算 VWAP 失败: {e}")
+ return None
+
+ def _calculate_opening_range(self, df: Optional[pd.DataFrame], bars: int = 6) -> Optional[Dict[str, float]]:
+ """计算前 30 分钟开盘区间"""
+ if df is None or df.empty or len(df) < bars:
+ return None
+
+ try:
+ session_df = df
+ if 'open_time' in df.columns:
+ latest_time = pd.to_datetime(df['open_time'].iloc[-1])
+ session_start = latest_time.normalize()
+ session_df = df[df['open_time'] >= session_start]
+
+ session_df = session_df.iloc[:bars]
+ if session_df.empty:
+ return None
+
+ return {
+ 'high': float(session_df['high'].max()),
+ 'low': float(session_df['low'].min())
+ }
+ except Exception as e:
+ logger.debug(f"计算开盘区间失败: {e}")
+ return None
+
+ def _summarize_timeframe_features(self, df: Optional[pd.DataFrame], timeframe: str) -> Dict[str, Any]:
+ """将单个周期的 K 线转换为高价值特征摘要"""
+ feature = {
+ 'timeframe': timeframe,
+ 'available': False,
+ 'close': None,
+ 'ema_alignment': 'neutral',
+ 'structure': 'unknown',
+ 'momentum_3': None,
+ 'momentum_12': None,
+ 'rsi': None,
+ 'atr_pct': None,
+ 'volume_ratio': None,
+ 'distance_to_ema20': None,
+ 'distance_to_recent_high': None,
+ 'distance_to_recent_low': None,
+ 'is_accelerating': False,
+ }
+
+ if df is None or df.empty or len(df) < 20:
+ return feature
+
+ latest = df.iloc[-1]
+ close = float(latest['close'])
+ ema5 = latest.get('ema5')
+ ema10 = latest.get('ema10')
+ ema20 = latest.get('ema20')
+ rsi = latest.get('rsi')
+ atr = latest.get('atr')
+
+ feature.update({
+ 'available': True,
+ 'close': close,
+ 'rsi': float(rsi) if pd.notna(rsi) else None,
+ 'atr_pct': float(atr / close * 100) if pd.notna(atr) and close > 0 else None,
+ 'distance_to_ema20': self._distance_percent(close, ema20),
+ 'structure': self._infer_price_structure(df),
+ 'momentum_3': self._window_return(df, 3),
+ 'momentum_12': self._window_return(df, 12),
+ 'volume_ratio': self._calculate_volume_ratio(df),
+ 'is_accelerating': self._is_accelerating(df),
+ })
+
+ if pd.notna(ema5) and pd.notna(ema10) and pd.notna(ema20):
+ if ema5 > ema10 > ema20:
+ feature['ema_alignment'] = 'bull'
+ elif ema5 < ema10 < ema20:
+ feature['ema_alignment'] = 'bear'
+ else:
+ feature['ema_alignment'] = 'mixed'
+
+ recent_window = df.iloc[-20:]
+ recent_high = float(recent_window['high'].max())
+ recent_low = float(recent_window['low'].min())
+ feature['distance_to_recent_high'] = self._distance_percent(close, recent_high)
+ feature['distance_to_recent_low'] = self._distance_percent(close, recent_low)
+
+ return feature
+
+ def _format_feature_line(self, feature: Dict[str, Any]) -> str:
+ """格式化单周期特征摘要"""
+ if not feature.get('available'):
+ return f"- {feature.get('timeframe')}: 数据不足"
+
+ def fmt(value: Optional[float], digits: int = 2) -> str:
+ return "N/A" if value is None else f"{value:+.{digits}f}%"
+
+ return (
+ f"- {feature['timeframe']}: 结构={feature['structure']} | EMA={feature['ema_alignment']} | "
+ f"3bar={fmt(feature['momentum_3'])} | 12bar={fmt(feature['momentum_12'])} | "
+ f"RSI={feature['rsi']:.1f} | ATR={feature['atr_pct']:.2f}% | "
+ f"量比={feature['volume_ratio']:.2f} | "
+ f"距EMA20={fmt(feature['distance_to_ema20'])} | "
+ f"距20bar高点={fmt(feature['distance_to_recent_high'])} | "
+ f"距20bar低点={fmt(feature['distance_to_recent_low'])} | "
+ f"加速={'是' if feature['is_accelerating'] else '否'}"
+ )
+
+ def _describe_alignment(self, features: List[Dict[str, Any]]) -> str:
+ """描述多周期方向一致性"""
+ directions = []
+ for feature in features:
+ if not feature.get('available'):
+ continue
+ direction = feature.get('ema_alignment')
+ if direction == 'mixed':
+ direction = 'neutral'
+ directions.append(direction)
+
+ if not directions:
+ return "数据不足"
+ if all(direction == 'bull' for direction in directions):
+ return "多头一致"
+ if all(direction == 'bear' for direction in directions):
+ return "空头一致"
+ if all(direction == 'neutral' for direction in directions):
+ return "全部中性"
+ return "存在分歧"
+
+ def _derive_key_levels(self, data: Dict[str, pd.DataFrame], range_zone: Dict[str, Any]) -> Dict[str, List[float]]:
+ """提炼高价值支撑阻力位"""
+ supports: List[float] = []
+ resistances: List[float] = []
+
+ if range_zone.get('support_level'):
+ supports.append(float(range_zone['support_level']))
+ if range_zone.get('resistance_level'):
+ resistances.append(float(range_zone['resistance_level']))
+
+ for timeframe, count in [('30m', 20), ('1h', 20), ('4h', 12)]:
+ df = data.get(timeframe)
+ if df is None or len(df) < count:
continue
- context_parts.append(f"\n## {tf_name} 数据")
+ window = df.iloc[-count:]
+ supports.append(float(window['low'].min()))
+ resistances.append(float(window['high'].max()))
- n_bars = PRICE_ACTION_BARS.get(tf_name, 0)
- if n_bars > 0 and len(df) >= 2:
- # 预计算量比基准(用前20根K线均量,排除显示窗口以外的数据)
- vol_window = df['volume'].iloc[-(n_bars + 20):-n_bars] if len(df) > n_bars + 20 else df['volume'].iloc[:max(1, len(df) - n_bars)]
- vol_ma = vol_window.mean() if len(vol_window) > 0 else df['volume'].mean()
+ ema20 = df['ema20'].iloc[-1] if 'ema20' in df.columns else None
+ if pd.notna(ema20):
+ if float(ema20) < float(df['close'].iloc[-1]):
+ supports.append(float(ema20))
+ else:
+ resistances.append(float(ema20))
- # 显示最近 N 根K线(价格行为分析)
- bars = df.iloc[-n_bars:] if len(df) >= n_bars else df
- total = len(bars)
- bar_lines = []
- for i, (_, row) in enumerate(bars.iterrows()):
- offset = -(total - 1 - i) # 最新=0,往前为负
- o, h, l, c = float(row['open']), float(row['high']), float(row['low']), float(row['close'])
- vol = float(row.get('volume', 0))
- body = abs(c - o)
- upper_wick = h - max(o, c)
- lower_wick = min(o, c) - l
- direction = "阳" if c >= o else "阴"
- # 量比标注:放量/缩量/平量
- if vol_ma > 0:
- vr = vol / vol_ma
- if vr >= 1.5:
- vol_tag = f"量比:{vr:.1f}🔥"
- elif vr >= 1.2:
- vol_tag = f"量比:{vr:.1f}↑"
- elif vr <= 0.6:
- vol_tag = f"量比:{vr:.1f}↓↓"
- elif vr <= 0.8:
- vol_tag = f"量比:{vr:.1f}↓"
- else:
- vol_tag = f"量比:{vr:.1f}"
- else:
- vol_tag = ""
- label = " ← 当前" if offset == 0 else ""
- bar_lines.append(
- f" [{offset:+d}] {direction}线 开:{o:.1f} 高:{h:.1f} 低:{l:.1f} 收:{c:.1f} "
- f"实体:{body:.1f} 上影:{upper_wick:.1f} 下影:{lower_wick:.1f} {vol_tag}{label}"
- )
- context_parts.append("\n".join(bar_lines))
- else:
- # 辅助周期只显示最新一根
- latest = df.iloc[-1]
- context_parts.append(f"开: {latest['open']}, 高: {latest['high']}, 低: {latest['low']}, 收: {latest['close']}")
- context_parts.append(f"成交量: {latest.get('volume', 'N/A')}")
+ return {
+ 'support': self._dedupe_levels(supports, reverse=True),
+ 'resistance': self._dedupe_levels(resistances, reverse=False),
+ }
- # 技术指标摘要(RSI + ATR,去掉 MACD/BB 的逐周期输出)
- indicators = []
- if 'rsi' in df.columns and not pd.isna(df['rsi'].iloc[-1]):
- indicators.append(f"RSI: {df['rsi'].iloc[-1]:.1f}")
- if 'atr' in df.columns and not pd.isna(df['atr'].iloc[-1]):
- indicators.append(f"ATR: {df['atr'].iloc[-1]:.2f}")
- if 'ema20' in df.columns and not pd.isna(df['ema20'].iloc[-1]):
- indicators.append(f"EMA20: {df['ema20'].iloc[-1]:.1f}")
- if indicators:
- context_parts.append(" 指标: " + " | ".join(indicators))
+ def _dedupe_levels(self, levels: List[float], reverse: bool) -> List[float]:
+ """对价位去重,避免同类水平位过密"""
+ cleaned = sorted([float(level) for level in levels if level], reverse=reverse)
+ deduped: List[float] = []
+ for level in cleaned:
+ if not deduped:
+ deduped.append(level)
+ continue
+ if abs(level - deduped[-1]) / deduped[-1] > 0.003:
+ deduped.append(level)
+ if len(deduped) >= 3:
+ break
+ return deduped
- # 多级别趋势分析(检测小级别反转)
- context_parts.append(self._analyze_multi_timeframe_trend(data))
+ def _format_levels(self, levels: Optional[List[float]]) -> str:
+ if not levels:
+ return "N/A"
+ return ", ".join(f"{level:.2f}" for level in levels[:3])
- # 量比分析
- df_5m = data.get('5m')
- if df_5m is not None and len(df_5m) >= 20:
- vol_latest = df_5m['volume'].iloc[-1]
- vol_ma20 = df_5m['volume'].iloc[-20:-1].mean()
- volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1
- context_parts.append(f"\n## 量价分析")
- context_parts.append(f"最新成交量: {vol_latest:.0f}")
- context_parts.append(f"20周期均量: {vol_ma20:.0f}")
- context_parts.append(f"量比: {volume_ratio:.2f}")
+ def _infer_price_structure(self, df: pd.DataFrame, lookback: int = 20) -> str:
+ """根据分段高低点判断 HH/HL / LH/LL / 区间"""
+ if df is None or len(df) < lookback:
+ return "unknown"
- if volume_ratio > 1.5:
- context_parts.append("量价状态: 放量 📊")
- elif volume_ratio < 0.7:
- context_parts.append("量价状态: 缩量 📉")
- else:
- context_parts.append("量价状态: 平量 ➖")
+ window = df.iloc[-lookback:]
+ half = max(lookback // 2, 5)
+ first = window.iloc[:half]
+ second = window.iloc[-half:]
- # 波动率分析
- volatility_analysis = self._analyze_volatility(data)
- if volatility_analysis:
- context_parts.append(f"\n## 波动率分析")
- context_parts.append(volatility_analysis)
+ prev_high = float(first['high'].max())
+ prev_low = float(first['low'].min())
+ recent_high = float(second['high'].max())
+ recent_low = float(second['low'].min())
- # 趋势位置分析(新增:避免盲目追涨杀跌)
- trend_position_analysis = self._analyze_trend_position(data)
- if trend_position_analysis:
- context_parts.append(f"\n## 趋势位置分析")
- context_parts.append(trend_position_analysis)
+ if recent_high > prev_high and recent_low > prev_low:
+ return "HH/HL"
+ if recent_high < prev_high and recent_low < prev_low:
+ return "LH/LL"
+ return "range/mixed"
- # ========== 新增:震荡区间检测 ==========
- range_zone = self._detect_range_zone(data)
- if range_zone['is_ranging']:
- context_parts.append(f"\n## 🔔 震荡区间检测(重要!)")
- context_parts.append(f"**状态**: 震荡市(置信度: {range_zone['confidence']}%)")
- if range_zone['support_level'] and range_zone['resistance_level']:
- context_parts.append(f"**支撑位**: ${range_zone['support_level']:,.2f}")
- context_parts.append(f"**压力位**: ${range_zone['resistance_level']:,.2f}")
- context_parts.append(f"**区间宽度**: {range_zone['range_width_pct']:.2f}%")
- if range_zone['volume_profile_support']:
- context_parts.append(f"**成交量密集区支撑**: ${range_zone['volume_profile_support']:,.2f}")
- if range_zone['volume_profile_resistance']:
- context_parts.append(f"**成交量密集区压力**: ${range_zone['volume_profile_resistance']:,.2f}")
- context_parts.append(f"**分析**: {range_zone['analysis']}")
- context_parts.append(f"\n**震荡市交易策略**:")
- context_parts.append(f" → 下沿附近挂多单,上沿附近挂空单")
- context_parts.append(f" → 目标: 对岸边界,快进快出")
- context_parts.append(f" → 严禁追涨杀跌!")
+ def _window_return(self, df: pd.DataFrame, bars: int) -> Optional[float]:
+ if df is None or len(df) <= bars:
+ return None
+ start_price = float(df['close'].iloc[-bars - 1])
+ end_price = float(df['close'].iloc[-1])
+ if start_price <= 0:
+ return None
+ return (end_price - start_price) / start_price * 100
- # ========== 新增:趋势反转检测 ==========
- reversal_detection = self._detect_trend_reversal(data)
- if reversal_detection['is_reversing']:
- context_parts.append(f"\n## ⚠️ 趋势反转信号(非常重要!)")
- context_parts.append(f"**检测到反转信号**!置信度: {reversal_detection['confidence']}%")
- if reversal_detection['reversal_type'] == 'bullish_reversal':
- context_parts.append(f"**反转类型**: 看涨反转 📈")
- else:
- context_parts.append(f"**反转类型**: 看跌反转 📉")
- context_parts.append(f"\n**反转信号详情**:")
- for sig in reversal_detection['signals'][:5]: # 最多显示5个信号
- context_parts.append(f" - [{sig['type']}] {sig['desc']} (权重: {sig['weight']})")
- context_parts.append(f"\n**🚨 反转信号处理规则**:")
- context_parts.append(f" → 现有同向持仓建议平仓")
- context_parts.append(f" → 考虑反方向开仓(需等待确认)")
- context_parts.append(f" → 或者暂时观望,等待反转确认")
- context_parts.append(f" → 严禁继续原方向开新仓!")
+ def _calculate_volume_ratio(self, df: pd.DataFrame, window: int = 20) -> float:
+ if df is None or len(df) <= window:
+ return 1.0
+ latest_volume = float(df['volume'].iloc[-1])
+ baseline = float(df['volume'].iloc[-window:-1].mean())
+ if baseline <= 0:
+ return 1.0
+ return latest_volume / baseline
- # ========== 新增:趋势阶段检测 ==========
- trend_stage = self._detect_trend_stage(data)
- if trend_stage['stage'] != 'unknown':
- stage_emoji = {'early': '🌱', 'middle': '🔄', 'late': '🌅'}.get(trend_stage['stage'], '❓')
- stage_name = {'early': '早期', 'middle': '中期', 'late': '晚期'}.get(trend_stage['stage'], '未知')
- context_parts.append(f"\n## 趋势阶段分析")
- context_parts.append(f"**当前阶段**: {stage_emoji} {stage_name}(置信度: {trend_stage['confidence']}%)")
- context_parts.append(f"**分析**: {trend_stage['analysis']}")
+ def _is_accelerating(self, df: pd.DataFrame, bars: int = 3, threshold: float = 0.3) -> bool:
+ if df is None or len(df) < bars + 1:
+ return False
+ closes = df['close'].iloc[-(bars + 1):].values
+ changes = [
+ (closes[i] - closes[i - 1]) / closes[i - 1] * 100
+ for i in range(1, len(closes))
+ if closes[i - 1] > 0
+ ]
+ if len(changes) < bars:
+ return False
+ same_direction = all(change > 0 for change in changes) or all(change < 0 for change in changes)
+ large_enough = sum(1 for change in changes if abs(change) >= threshold) >= bars - 1
+ return same_direction and large_enough
- if trend_stage['stage'] == 'late':
- context_parts.append(f"\n**⚠️ 晚期阶段警告**:")
- context_parts.append(f" → 趋势可能即将反转或进入震荡")
- context_parts.append(f" → 严禁追涨/追空开新仓")
- context_parts.append(f" → 现有盈利持仓建议逐步止盈")
- context_parts.append(f" → 等待明确反转信号后再决策")
- elif trend_stage['stage'] == 'early':
- context_parts.append(f"\n**✅ 早期阶段机会**:")
- context_parts.append(f" → 趋势刚启动,可顺势轻仓入场")
- context_parts.append(f" → 设置止损后可持有更长时间")
- context_parts.append(f" → 目标可看更大空间")
-
- return "\n".join(context_parts)
+ def _distance_percent(self, value: Optional[float], reference: Optional[float]) -> Optional[float]:
+ if value is None or reference is None or pd.isna(reference) or reference == 0:
+ return None
+ return (float(value) - float(reference)) / float(reference) * 100
async def _get_news_context(self, symbol: str) -> str:
"""获取新闻舆情上下文"""
@@ -422,11 +638,38 @@ class MarketSignalAnalyzer:
)
if not market_data:
return ""
- return self.exchange.format_futures_data_for_llm(symbol, market_data)
+ return self._format_futures_context(symbol, market_data)
except Exception as e:
logger.warning(f"获取 {symbol} 合约数据失败: {e}")
return ""
+ def _format_futures_context(self, symbol: str, market_data: Dict[str, Any]) -> str:
+ """格式化高价值合约特征,避免大段说明性文本"""
+ funding = market_data.get('funding_rate', {})
+ oi = market_data.get('open_interest', {})
+ premium = market_data.get('premium_rate')
+
+ lines = [
+ f"## 衍生品特征",
+ f"- 交易对: {symbol}",
+ ]
+
+ if funding:
+ lines.append(
+ f"- 资金费率: {funding.get('funding_rate_percent', 0):+.4f}% | 情绪: {funding.get('sentiment', 'unknown')}"
+ )
+ lines.append(
+ f"- 标记价 vs 指数价: {market_data.get('mark_price', 0):.2f} vs {market_data.get('index_price', 0):.2f}"
+ )
+
+ if oi:
+ lines.append(f"- 持仓量: {oi.get('open_interest', 0):,.0f}")
+
+ if premium is not None:
+ lines.append(f"- 溢价率: {premium:+.2f}%")
+
+ return "\n".join(lines)
+
def _analyze_trend_position(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析趋势位置和日内交易机会(使用 EMA)+ 市场状态判断(震荡/趋势)"""
try:
@@ -667,84 +910,175 @@ class MarketSignalAnalyzer:
logger.warning(f"趋势位置分析失败: {e}")
return ""
- def _build_analysis_prompt(self, symbol: str, market_context: str,
+ def _build_analysis_prompt(self, symbol: str, lane: str,
+ market_context: Dict[str, str],
news_context: str,
- previous_signal: Dict[str, Any] = None,
futures_context: str = "") -> str:
"""构建分析提示词"""
- prompt_parts = [
- f"请分析 {symbol} 的市场情况:\n",
- market_context,
- "",
- news_context
+ lane_text = "日内交易分析" if lane == "intraday" else "趋势交易分析"
+ lane_scope = (
+ [
+ "只根据下面提供的日内结构化特征做判断,不要脑补未提供的数据。",
+ "重点阅读 5m/15m/30m、当日开盘、VWAP、开盘区间、区间状态、关键位和衍生品过热程度。",
+ ]
+ if lane == "intraday"
+ else [
+ "只根据下面提供的趋势结构化特征做判断,不要脑补未提供的数据。",
+ "重点阅读 1h/4h、一致性、趋势阶段、反转检测、关键位、新闻催化和衍生品拥挤度。",
+ ]
+ )
+
+ selected_sections = [
+ market_context.get('snapshot', ''),
+ market_context.get('intraday', '') if lane == "intraday" else market_context.get('trend', ''),
+ market_context.get('levels', ''),
]
- # 添加合约市场数据(资金费率等)
+ prompt_parts = [
+ f"请对 {symbol} 进行{lane_text}。",
+ *lane_scope,
+ ]
+
+ for section in selected_sections:
+ if section:
+ prompt_parts.append("")
+ prompt_parts.append(section)
+
+ if news_context and news_context not in {"无最新新闻", "新闻获取失败"}:
+ prompt_parts.append("")
+ prompt_parts.append(news_context)
+
if futures_context:
prompt_parts.append("")
prompt_parts.append(futures_context)
- # 添加历史信号上下文
- if previous_signal:
- prev_time = previous_signal.get('timestamp', 'Unknown')
- prev_trend = previous_signal.get('trend', 'Unknown')
- prev_signals = previous_signal.get('signals', [])
-
- prompt_parts.append("\n" + "="*60)
- prompt_parts.append("## 上一轮分析信号(必须参考!)")
- prompt_parts.append("="*60)
- prompt_parts.append(f"分析时间: {prev_time}")
- prompt_parts.append(f"趋势判断: {prev_trend}")
-
- if prev_signals:
- prompt_parts.append("\n之前给出的信号:")
- for i, sig in enumerate(prev_signals, 1):
- action = sig.get('action', 'N/A')
- confidence = sig.get('confidence', 0)
- timeframe = sig.get('timeframe', 'unknown')
- type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
- type_text = type_map.get(timeframe, timeframe)
-
- entry = sig.get('entry_price', 'N/A')
- sl = sig.get('stop_loss', 'N/A')
- tp = sig.get('take_profit', 'N/A')
- reasoning = sig.get('reasoning', 'N/A')
-
- prompt_parts.append(
- f"\n[{i}] {type_text} | {action} | 信心度: {confidence}%\n"
- f" 入场: ${entry}\n"
- f" 止损: ${sl}\n"
- f" 止盈: ${tp}\n"
- f" 理由: {reasoning}"
- )
-
- # 重点警告
- prompt_parts.append("\n" + "!"*60)
- prompt_parts.append("🚨 严禁重复信号!")
- prompt_parts.append("!"*60)
- prompt_parts.append("如果上一轮已经给出了相同方向的信号(做空/做多),")
- prompt_parts.append("且趋势没有发生明确反转,")
- prompt_parts.append("绝对不要重复给出相同方向的信号!")
- prompt_parts.append("")
- prompt_parts.append("只有在以下情况才输出新信号:")
- prompt_parts.append(" ✓ 趋势发生了明确的反转")
- prompt_parts.append(" ✓ 上一轮是观望,现在出现了新的明确机会")
- prompt_parts.append(" ✓ 价格已触及上一轮的止损/止盈价位")
- prompt_parts.append("")
- prompt_parts.append("以下情况不要输出信号:")
- prompt_parts.append(" ✗ 趋势延续,只是价格继续向同一方向移动")
- prompt_parts.append(" ✗ 仅仅因为均线排列仍然有效")
- prompt_parts.append(" ✗ 没有明显的市场变化")
- else:
- prompt_parts.append("\n上一轮没有给出交易信号(市场观望建议)")
- prompt_parts.append("\n你可以基于当前市场情况给出新的信号。")
-
- prompt_parts.append("\n" + "="*60)
-
- prompt_parts.append("\n请根据以上数据,给出你的市场判断和交易信号。")
+ prompt_parts.append("")
+ prompt_parts.append("输出要求:只返回 system prompt 定义的 JSON 对象。没有高质量 setup 就返回 signals: []。")
return "\n".join(prompt_parts)
+ def _merge_lane_results(self, symbol: str,
+ intraday_result: Dict[str, Any],
+ trend_result: Dict[str, Any]) -> Dict[str, Any]:
+ """合并日内与趋势两路 LLM 结果"""
+ result = self._get_empty_signal(symbol)
+ result['raw_response'] = {
+ 'intraday': intraday_result.get('raw_response', ''),
+ 'trend': trend_result.get('raw_response', '')
+ }
+
+ intraday_signals = self._normalize_lane_signals(intraday_result.get('signals', []), 'short_term')
+ trend_signals = self._normalize_lane_signals(trend_result.get('signals', []), 'medium_term')
+ merged_signals = sorted(
+ intraday_signals + trend_signals,
+ key=lambda signal: signal.get('confidence', 0),
+ reverse=True
+ )[:2]
+
+ result['signals'] = merged_signals
+ result['key_levels'] = {
+ 'support': self._dedupe_levels(
+ (intraday_result.get('key_levels', {}) or {}).get('support', []) +
+ (trend_result.get('key_levels', {}) or {}).get('support', []),
+ reverse=True
+ ),
+ 'resistance': self._dedupe_levels(
+ (intraday_result.get('key_levels', {}) or {}).get('resistance', []) +
+ (trend_result.get('key_levels', {}) or {}).get('resistance', []),
+ reverse=False
+ ),
+ }
+
+ trend_direction = trend_result.get('trend_direction')
+ if trend_direction in (None, 'neutral'):
+ trend_direction = intraday_result.get('trend_direction', 'neutral')
+ result['trend_direction'] = trend_direction or 'neutral'
+
+ trend_strength = trend_result.get('trend_strength')
+ if trend_strength in (None, 'weak') and result['trend_direction'] == 'neutral':
+ trend_strength = intraday_result.get('trend_strength', 'weak')
+ result['trend_strength'] = trend_strength or 'weak'
+
+ intraday_state = intraday_result.get('market_state')
+ trend_state = trend_result.get('market_state')
+ if trend_state == 'trending':
+ result['market_state'] = '趋势市'
+ elif intraday_state == 'ranging':
+ result['market_state'] = '震荡市'
+ elif intraday_state == 'trending':
+ result['market_state'] = '日内趋势'
+ else:
+ result['market_state'] = intraday_state or trend_state or '中性'
+
+ intraday_summary = intraday_result.get('analysis_summary', '无')
+ trend_summary = trend_result.get('analysis_summary', '无')
+ result['analysis_summary'] = f"日内:{intraday_summary} | 趋势:{trend_summary}"
+
+ if result['trend_direction'] == 'uptrend':
+ result['trend'] = 'up'
+ elif result['trend_direction'] == 'downtrend':
+ result['trend'] = 'down'
+ else:
+ result['trend'] = 'sideways'
+
+ result['timestamp'] = datetime.now().isoformat()
+ return result
+
+ def _normalize_lane_signals(self, signals: List[Dict[str, Any]], lane_type: str) -> List[Dict[str, Any]]:
+ """统一信号时间框架标识"""
+ normalized = []
+ for signal in sorted(signals, key=lambda item: item.get('confidence', 0), reverse=True):
+ if signal.get('action') not in ['buy', 'sell']:
+ continue
+ signal = dict(signal)
+ signal['confidence'] = max(0, min(float(signal.get('confidence', 0) or 0), 100))
+ min_confidence = self.LANE_MIN_CONFIDENCE.get(lane_type, 60)
+ if signal['confidence'] < min_confidence:
+ continue
+ signal['entry_type'] = signal.get('entry_type', 'market')
+ if signal['entry_type'] not in {'market', 'limit'}:
+ signal['entry_type'] = 'market'
+ if not self._is_signal_price_structure_valid(signal):
+ continue
+ signal['grade'] = self._infer_signal_grade(signal['confidence'], lane_type)
+ if not signal.get('reasoning'):
+ signal['reasoning'] = '结构与关键位共振'
+ signal['timeframe'] = lane_type
+ signal['type'] = lane_type
+ normalized.append(signal)
+ return normalized[:1]
+
+ def _infer_signal_grade(self, confidence: float, lane_type: str) -> str:
+ """根据 lane 规则统一 grade,避免模型随意给等级"""
+ if lane_type == 'medium_term':
+ if confidence >= 82:
+ return 'A'
+ if confidence >= 72:
+ return 'B'
+ return 'C'
+
+ if confidence >= 80:
+ return 'A'
+ if confidence >= 70:
+ return 'B'
+ return 'C'
+
+ def _is_signal_price_structure_valid(self, signal: Dict[str, Any]) -> bool:
+ """验证信号价格结构是否合法"""
+ entry_price = signal.get('entry_price')
+ stop_loss = signal.get('stop_loss')
+ take_profit = signal.get('take_profit')
+ action = signal.get('action')
+
+ if not all(isinstance(price, (int, float)) and price > 0 for price in [entry_price, stop_loss, take_profit]):
+ return False
+
+ if action == 'buy':
+ return stop_loss < entry_price < take_profit
+ if action == 'sell':
+ return take_profit < entry_price < stop_loss
+ return False
+
def _analyze_multi_timeframe_trend(self, data: Dict[str, Any]) -> str:
"""
多级别趋势分析 - 检测小级别反转信号
@@ -920,6 +1254,7 @@ class MarketSignalAnalyzer:
# 清理价格字段 - 转换为 float
result = self._clean_price_fields(result)
+ result = self._normalize_response_schema(result)
# 添加元数据
result['symbol'] = symbol
@@ -1029,6 +1364,37 @@ class MarketSignalAnalyzer:
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
return json_str
+ def _normalize_response_schema(self, result: Dict[str, Any]) -> Dict[str, Any]:
+ """归一化 LLM 输出结构,避免下游依赖脏数据"""
+ if not isinstance(result, dict):
+ return self._get_empty_signal("")
+
+ normalized = dict(result)
+ normalized['market_state'] = str(normalized.get('market_state', 'neutral') or 'neutral')
+ normalized['trend_direction'] = str(normalized.get('trend_direction', 'neutral') or 'neutral')
+ normalized['trend_strength'] = str(normalized.get('trend_strength', 'weak') or 'weak')
+ normalized['analysis_summary'] = self._truncate_summary(normalized.get('analysis_summary', ''))
+ normalized['key_levels'] = self._normalize_key_levels(normalized.get('key_levels'))
+ normalized['signals'] = normalized.get('signals') if isinstance(normalized.get('signals'), list) else []
+ return normalized
+
+ def _normalize_key_levels(self, key_levels: Any) -> Dict[str, List[float]]:
+ """归一化关键位结构"""
+ if not isinstance(key_levels, dict):
+ return {'support': [], 'resistance': []}
+
+ support = [float(level) for level in key_levels.get('support', []) if isinstance(level, (int, float))]
+ resistance = [float(level) for level in key_levels.get('resistance', []) if isinstance(level, (int, float))]
+
+ return {
+ 'support': self._dedupe_levels(support, reverse=True),
+ 'resistance': self._dedupe_levels(resistance, reverse=False),
+ }
+
+ def _truncate_summary(self, summary: Any, max_length: int = 20) -> str:
+ text = str(summary or '').strip()
+ return text[:max_length]
+
def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""清理价格字段,转换为 float"""
def clean_price(price_value):
@@ -1056,9 +1422,6 @@ class MarketSignalAnalyzer:
# 清理 signals 中的价格字段
if 'signals' in data:
- # 标记需要移除的信号索引
- signals_to_remove = []
-
for sig in data['signals']:
price_fields = ['entry_price', 'stop_loss', 'take_profit']
for field in price_fields:
diff --git a/backend/app/services/bitget_service.py b/backend/app/services/bitget_service.py
index 8f540b3..bb15f7f 100644
--- a/backend/app/services/bitget_service.py
+++ b/backend/app/services/bitget_service.py
@@ -19,6 +19,7 @@ class BitgetService:
'15m': '15m',
'30m': '30m',
'1h': '1H', # Bitget 大写
+ '4h': '4H', # Bitget 大写
}
# Bitget API 基础 URL
@@ -104,7 +105,7 @@ class BitgetService:
category: 产品类型,默认 USDT-FUTURES
Returns:
- 包含 1m, 5m, 15m, 30m, 1h 数据的字典
+ 包含 1m, 5m, 15m, 30m, 1h, 4h 数据的字典
"""
# 不同周期使用不同的数据量,平衡分析深度和性能
# 1m: 200根 = 3.3小时(超短线精确入场)
@@ -112,16 +113,18 @@ class BitgetService:
# 15m: 200根 = 2.1天(短线分析)
# 30m: 200根 = 4.2天(日内趋势)
# 1h: 300根 = 12.5天(日内主趋势)
+ # 4h: 180根 = 30天(趋势判断)
limits = {
'1m': 200,
'5m': 200,
'15m': 200,
'30m': 200,
- '1h': 300
+ '1h': 300,
+ '4h': 180
}
data = {}
- for interval in ['1m', '5m', '15m', '30m', '1h']:
+ for interval in ['1m', '5m', '15m', '30m', '1h', '4h']:
df = self.get_klines(symbol, interval, limit=limits.get(interval, 100),
category=category)
if not df.empty:
@@ -183,6 +186,7 @@ class BitgetService:
'15m': {'ma_short': 5, 'ma_mid': 10, 'ma_long': 20, 'ma_extra': 50},
'30m': {'ma_short': 5, 'ma_mid': 10, 'ma_long': 20, 'ma_extra': 50},
'1h': {'ma_short': 5, 'ma_mid': 10, 'ma_long': 20, 'ma_extra': 50},
+ '4h': {'ma_short': 5, 'ma_mid': 10, 'ma_long': 20, 'ma_extra': 50},
}
config = ma_config.get(interval, ma_config['1h'])
diff --git a/frontend/trading.html b/frontend/trading.html
index d647bb6..cdd7940 100644
--- a/frontend/trading.html
+++ b/frontend/trading.html
@@ -358,7 +358,7 @@
挂单中 ({{ pendingOrders.length }})