优化!

This commit is contained in:
aaron 2026-04-04 23:12:22 +08:00
parent 36143fb0cc
commit de75b0419b
3 changed files with 284 additions and 36 deletions

View File

@ -509,6 +509,7 @@ class CryptoAgent:
await self._check_and_set_pending_tp_sl_hyperliquid()
if self.bitget:
await self._check_and_set_pending_tp_sl_bitget()
await self._check_bitget_missing_tp_sl() # 兜底:检查缺少的 TP/SL 并补救
for symbol in self.symbols:
await self.analyze_symbol(symbol)
@ -721,13 +722,13 @@ class CryptoAgent:
if self.settings.paper_trading_enabled:
logger.info(f"\n📊 【模拟盘】")
paper_positions, paper_account, paper_pending = self._get_paper_trading_state()
paper_signal = self._select_signal_for_platform(valid_signals, 'PaperTrading')
paper_signal = self._select_signal_for_platform(valid_signals, 'PaperTrading', market_state=market_signal.get('market_state', '中性'))
if paper_signal:
logger.info(
f" 采用信号: {paper_signal.get('timeframe', 'unknown')} | "
f"{paper_signal.get('action')} | {paper_signal.get('confidence', 0)}%"
)
trading_signal = self._build_execution_signal(symbol, paper_signal, current_price)
trading_signal = self._build_execution_signal(symbol, paper_signal, current_price, market_signal)
paper_decision = self.execute_signal_with_rules(
trading_signal, 'PaperTrading', paper_account, paper_positions, paper_pending
)
@ -745,13 +746,13 @@ class CryptoAgent:
if self.hyperliquid:
logger.info(f"\n🔥 【Hyperliquid】")
hl_positions, hl_account, hl_pending = self._get_hyperliquid_trading_state()
hl_signal = self._select_signal_for_platform(valid_signals, 'Hyperliquid')
hl_signal = self._select_signal_for_platform(valid_signals, 'Hyperliquid', market_state=market_signal.get('market_state', '中性'))
if hl_signal:
logger.info(
f" 采用信号: {hl_signal.get('timeframe', 'unknown')} | "
f"{hl_signal.get('action')} | {hl_signal.get('confidence', 0)}%"
)
trading_signal = self._build_execution_signal(symbol, hl_signal, current_price)
trading_signal = self._build_execution_signal(symbol, hl_signal, current_price, market_signal)
hl_decision = self.execute_signal_with_rules(
trading_signal, 'Hyperliquid', hl_account, hl_positions, hl_pending
)
@ -769,13 +770,13 @@ class CryptoAgent:
if self.bitget:
logger.info(f"\n🔥 【Bitget】")
bg_positions, bg_account, bg_pending = self._get_bitget_trading_state()
bg_signal = self._select_signal_for_platform(valid_signals, 'Bitget')
bg_signal = self._select_signal_for_platform(valid_signals, 'Bitget', market_state=market_signal.get('market_state', '中性'))
if bg_signal:
logger.info(
f" 采用信号: {bg_signal.get('timeframe', 'unknown')} | "
f"{bg_signal.get('action')} | {bg_signal.get('confidence', 0)}%"
)
trading_signal = self._build_execution_signal(symbol, bg_signal, current_price)
trading_signal = self._build_execution_signal(symbol, bg_signal, current_price, market_signal)
bg_decision = self.execute_signal_with_rules(
trading_signal, 'Bitget', bg_account, bg_positions, bg_pending
)
@ -1223,25 +1224,48 @@ class CryptoAgent:
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
return sorted_signals[0]
def _select_signal_for_platform(self, signals: List[Dict[str, Any]], platform_name: str) -> Optional[Dict[str, Any]]:
"""根据平台偏好选择最适合执行的信号"""
def _select_signal_for_platform(self, signals: List[Dict[str, Any]],
platform_name: str,
market_state: str = '中性') -> Optional[Dict[str, Any]]:
"""根据平台偏好和市场状态选择最适合执行的信号"""
if not signals:
return None
# 震荡市:趋势信号降权(信心 × 0.8),优先选择日内反转信号
adjusted_signals = []
for signal in signals:
s = dict(signal) # 不修改原信号
confidence = s.get('confidence', 50)
lane = s.get('timeframe') or s.get('type') or 'unknown'
if market_state == '震荡市' and lane == 'medium_term':
confidence = int(confidence * 0.8)
s['_regime_adjusted'] = True
s['_original_confidence'] = s.get('confidence', 50)
s['confidence'] = confidence
elif market_state in ('趋势市', '日内趋势') and lane == 'short_term':
pass # 趋势市不降权日内信号
adjusted_signals.append(s)
lane_priority = self.PLATFORM_SIGNAL_PRIORITY.get(platform_name, ['short_term', 'medium_term'])
by_lane: Dict[str, List[Dict[str, Any]]] = {}
for signal in signals:
for signal in adjusted_signals:
lane = signal.get('timeframe') or signal.get('type') or 'unknown'
by_lane.setdefault(lane, []).append(signal)
for lane in lane_priority:
candidates = by_lane.get(lane, [])
if candidates:
return sorted(candidates, key=lambda item: item.get('confidence', 0), reverse=True)[0]
best = sorted(candidates, key=lambda item: item.get('confidence', 0), reverse=True)[0]
if best.get('_regime_adjusted'):
logger.info(f" 📊 震荡市降权: {lane} 原始信心={best.get('_original_confidence')} → 调整后={best.get('confidence')}")
return best
return sorted(signals, key=lambda item: item.get('confidence', 0), reverse=True)[0]
return sorted(adjusted_signals, key=lambda item: item.get('confidence', 0), reverse=True)[0]
def _build_execution_signal(self, symbol: str, signal: Dict[str, Any], current_price: float) -> Dict[str, Any]:
def _build_execution_signal(self, symbol: str, signal: Dict[str, Any],
current_price: float,
market_signal: Dict[str, Any] = None) -> Dict[str, Any]:
"""构建传给执行规则层的标准信号格式"""
signal_type = signal.get('timeframe') or signal.get('type') or 'medium_term'
position_size = signal.get('position_size') or self.SIGNAL_POSITION_SIZE_DEFAULTS.get(signal_type, 'light')
@ -1259,6 +1283,9 @@ class CryptoAgent:
'timeframe': signal_type,
'type': signal_type,
'position_size': position_size,
'market_state': market_signal.get('market_state', '中性') if market_signal else '中性',
'regime': (market_signal.get('range_metrics') or {}).get('regime', '') if market_signal else '',
'funding_rate_data': market_signal.get('funding_rate_data') if market_signal else None,
}
def _get_signal_for_decision(self, market_signal: Dict[str, Any], decision: Dict[str, Any]) -> Dict[str, Any]:
@ -2254,6 +2281,25 @@ class CryptoAgent:
default_positions=self.SIGNAL_POSITION_SIZE_DEFAULTS,
)
# 市场状态仓位调整:震荡市降低仓位,避免来回被止损
regime = signal.get('regime', '')
REGIME_MARGIN_MULTIPLIERS = {
'ranging': 0.5, # 震荡市:仓位减半
'transitional': 0.7, # 过渡期:仓位七折
'weak_trend': 0.9, # 弱趋势:仓位九折
'strong_trend': 1.0, # 强趋势:满仓
}
regime_multiplier = REGIME_MARGIN_MULTIPLIERS.get(regime, 1.0)
if regime_multiplier < 1.0:
logger.info(f" 📊 市场状态调整: regime={regime}, 仓位系数={regime_multiplier}")
target_margin_pct *= regime_multiplier
# 连败降温:连败时降低仓位
streak_multiplier = signal.get('_streak_margin_multiplier', 1.0)
if streak_multiplier < 1.0:
logger.info(f" 📊 连败降温: 仓位系数={streak_multiplier}")
target_margin_pct *= streak_multiplier
margin, _, budget_reason = calculate_margin_and_position_value(
balance=balance,
available_margin=available,
@ -2386,6 +2432,53 @@ class CryptoAgent:
# 无反向订单 → 正常开仓
return "OPEN", "无反向订单,正常开仓"
def _check_losing_streak(self, platform_name: str, max_lookback: int = 5) -> Dict[str, Any]:
"""
检查近期交易连败情况
Returns:
{
'losing_streak': int,
'should_cool_down': bool,
'margin_multiplier': float,
'reason': str,
}
"""
recent_orders = []
if platform_name == 'PaperTrading' and self.paper_trading:
recent_orders = self.paper_trading.get_order_history(limit=max_lookback)
# Bitget/Hyperliquid 实盘暂不查询历史API 限制),用空列表
if not recent_orders:
return {'losing_streak': 0, 'should_cool_down': False, 'margin_multiplier': 1.0, 'reason': ''}
# 计算连败(从最近的交易往回数)
losing_streak = 0
for order in recent_orders:
pnl = order.get('pnl_amount', 0) or 0
if pnl < 0:
losing_streak += 1
else:
break # 遇到盈利就停止计数
if losing_streak >= 3:
return {
'losing_streak': losing_streak,
'should_cool_down': True,
'margin_multiplier': 0.3,
'reason': f"连败 {losing_streak}降温仓位×0.3"
}
elif losing_streak >= 2:
return {
'losing_streak': losing_streak,
'should_cool_down': True,
'margin_multiplier': 0.5,
'reason': f"连败 {losing_streak}降温仓位×0.5"
}
return {'losing_streak': losing_streak, 'should_cool_down': False, 'margin_multiplier': 1.0, 'reason': ''}
def _check_risk_control(self, signal: Dict[str, Any],
platform_name: str,
account: Dict[str, Any],
@ -2456,6 +2549,22 @@ class CryptoAgent:
except:
pass # 价格解析失败,跳过检查
# 5. 资金费率检查(极端资金费率时拒绝开仓)
funding_data = signal.get('funding_rate_data')
if funding_data:
fr_pct = funding_data.get('funding_rate_percent', 0) or 0
signal_action = signal.get('action', '')
if signal_action == 'buy' and fr_pct > 0.05:
return False, f"资金费率过热 {fr_pct:.4f}%(做多拥挤),拒绝做多"
elif signal_action == 'sell' and fr_pct < -0.05:
return False, f"资金费率过冷 {fr_pct:.4f}%(做空拥挤),拒绝做空"
# 6. 连败降温检查(不拒绝,但降低仓位)
streak_info = self._check_losing_streak(platform_name)
if streak_info.get('should_cool_down'):
signal['_streak_margin_multiplier'] = streak_info['margin_multiplier']
logger.warning(f"[{platform_name}] ⚠️ {streak_info['reason']}")
return True, "通过风控检查"
def execute_signal_with_rules(self, signal: Dict[str, Any],
@ -3212,6 +3321,77 @@ class CryptoAgent:
except Exception as e:
logger.error(f"[Bitget] 检查挂单 TP/SL 补设异常: {e}")
async def _check_bitget_missing_tp_sl(self):
"""定时检查 Bitget 持仓是否缺少止盈止损,缺少则从信号补救"""
if not self.bitget:
return
try:
positions = self.bitget.get_open_positions()
if not positions:
return
for pos in positions:
symbol = pos.get('symbol', '') # e.g. "BTCUSDT"
if not symbol:
continue
# 获取当前止盈止损
coin = symbol.replace('USDT', '')
tp_sl = self.bitget.get_tp_sl_prices(coin)
has_tp = tp_sl.get('take_profit') is not None
has_sl = tp_sl.get('stop_loss') is not None
if has_tp and has_sl:
continue # 都有,跳过
# 缺少 TP 或 SL从信号数据库查找最近信号补救
latest_signal = self.signal_db.get_latest_signal('crypto', symbol)
if not latest_signal:
missing = ('止盈' if not has_tp else '') + ('/' if not has_tp and not has_sl else '') + ('止损' if not has_sl else '')
logger.warning(f"[Bitget] ⚠️ {symbol} 缺少{missing},且无历史信号可补救")
continue
tp_price = latest_signal.get('take_profit')
sl_price = latest_signal.get('stop_loss')
if not tp_price and not sl_price:
logger.warning(f"[Bitget] ⚠️ {symbol} 缺少止盈止损,最近信号也无 TP/SL")
continue
# 只补救缺少的
set_tp = tp_price if not has_tp else None
set_sl = sl_price if not has_sl else None
missing_parts = []
if not has_tp:
missing_parts.append(f"TP={set_tp}")
if not has_sl:
missing_parts.append(f"SL={set_sl}")
missing_desc = ' & '.join(missing_parts)
logger.warning(f"[Bitget] 🔧 {symbol} 缺少 {missing_desc},从信号补救...")
# 用仓位实际大小设置
size = abs(pos.get('size', 0))
if size <= 0:
continue
tp_sl_result = self.bitget.set_tp_sl(
symbol=coin,
is_long=pos.get('size', 0) > 0,
size=size,
tp_price=set_tp,
sl_price=set_sl,
)
if tp_sl_result.get('success'):
logger.info(f"[Bitget] ✅ 补救成功: {symbol} {missing_desc}")
else:
logger.warning(f"[Bitget] ⚠️ 补救失败: {tp_sl_result.get('error')}")
except Exception as e:
logger.error(f"[Bitget] 止盈止损兜底检查异常: {e}")
def _calculate_hyperliquid_position_size(self, decision: Dict[str, Any], current_price: float) -> float:
"""
计算 Hyperliquid 仓位大小基于可用保证金和风控限制
@ -3858,12 +4038,19 @@ class CryptoAgent:
async def _check_position_management_all_platforms(self):
"""检查各平台的持仓管理(止盈/止损/移动止损)"""
try:
# 获取当前价格
# 获取当前价格和 ATR 波动率
current_prices = {}
volatility_data = {}
for symbol in self.symbols:
try:
data = self.exchange.get_multi_timeframe_data(symbol)
current_prices[symbol] = float(data['5m'].iloc[-1]['close'])
# 提取 1h ATR 占价格的百分比,用于动态移动止损
if '1h' in data and 'atr' in data['1h'].columns:
atr_value = data['1h']['atr'].iloc[-1]
price_1h = data['1h']['close'].iloc[-1]
if atr_value and price_1h > 0:
volatility_data[symbol] = float(atr_value) / float(price_1h)
except:
continue
@ -3881,8 +4068,8 @@ class CryptoAgent:
if not positions:
continue
# 检查持仓管理
actions = executor.check_position_management(positions, current_prices)
# 检查持仓管理(传递 ATR 波动率数据)
actions = executor.check_position_management(positions, current_prices, volatility_data)
# 执行建议的操作
for action_info in actions:

View File

@ -174,10 +174,16 @@ class BaseExecutor(ABC):
def check_position_management(self,
positions: List[Dict],
current_prices: Dict[str, float]) -> List[Dict[str, Any]]:
current_prices: Dict[str, float],
volatility_data: Optional[Dict[str, float]] = None) -> List[Dict[str, Any]]:
"""
持仓管理检查
Args:
positions: 持仓列表
current_prices: 当前价格 {symbol: price}
volatility_data: 波动率数据 {symbol: atr_pct}1h ATR / price
Returns:
建议的操作列表
"""
@ -230,23 +236,56 @@ class BaseExecutor(ABC):
'priority': 2
})
# 规则3: 移动止损
if pnl_pct >= 2:
# 规则3: 动态移动止损(基于 ATR 波动率)
current_sl = pos.get('stop_loss')
if side == 'buy' and current_sl and current_sl < entry_price:
if not current_sl:
continue
# 获取波动率ATR 占价格的百分比(如 0.015 = 1.5%
atr_pct = (volatility_data.get(symbol) if volatility_data else None) or 0.02
# 波动率倍数:低波动(震荡)给更多空间,高波动(趋势)收得更紧
atr_multiplier = max(1.5, 2.0 / atr_pct)
# 两级移动止损
# 第一级:盈利 >= 0.6 * ATR% * multiplier → SL 移到入场价 ± 1倍ATR给足震荡空间
# 第二级:盈利 >= 1.0 * ATR% * multiplier → SL 移到保本
step1_threshold = atr_pct * 100 * atr_multiplier * 0.6
step2_threshold = atr_pct * 100 * atr_multiplier * 1.0
if pnl_pct >= step2_threshold:
# 盈利充足,移到保本
if (side == 'buy' and current_sl < entry_price) or \
(side == 'sell' and current_sl > entry_price):
actions.append({
'symbol': symbol,
'action': 'MOVE_SL',
'new_sl': entry_price,
'reason': f"盈利 {pnl_pct:.1f}% >= 2%,移动止损到入场价",
'pnl_pct': pnl_pct,
'reason': f"盈利 {pnl_pct:.1f}% >= {step2_threshold:.1f}%,移动止损到保本",
'priority': 3
})
elif side == 'sell' and current_sl and current_sl > entry_price:
elif pnl_pct >= step1_threshold:
# 盈利初步,移到入场价 ± 1倍ATR保留震荡空间不急于保本
if side == 'buy' and current_sl < entry_price:
new_sl = entry_price * (1 - atr_pct)
if new_sl > current_sl:
actions.append({
'symbol': symbol,
'action': 'MOVE_SL',
'new_sl': entry_price,
'reason': f"盈利 {pnl_pct:.1f}% >= 2%,移动止损到入场价",
'new_sl': new_sl,
'pnl_pct': pnl_pct,
'reason': f"盈利 {pnl_pct:.1f}% >= {step1_threshold:.1f}%,移动止损到入场价-1ATR (${new_sl:.2f})",
'priority': 3
})
elif side == 'sell' and current_sl > entry_price:
new_sl = entry_price * (1 + atr_pct)
if new_sl < current_sl:
actions.append({
'symbol': symbol,
'action': 'MOVE_SL',
'new_sl': new_sl,
'pnl_pct': pnl_pct,
'reason': f"盈利 {pnl_pct:.1f}% >= {step1_threshold:.1f}%,移动止损到入场价+1ATR (${new_sl:.2f})",
'priority': 3
})

View File

@ -208,7 +208,7 @@ class MarketSignalAnalyzer:
news_context = await self._get_news_context(symbol)
# 3. 获取合约市场数据(资金费率、持仓量等)
futures_context = await self._get_futures_context(symbol)
futures_context, futures_market_data = await self._get_futures_context(symbol)
# 4. 将日内和趋势拆成两次独立分析,避免一个 prompt 同时混做两件事
intraday_prompt = self._build_analysis_prompt(
@ -251,7 +251,24 @@ class MarketSignalAnalyzer:
intraday_result = self._parse_llm_response(intraday_response or "", symbol)
trend_result = self._parse_llm_response(trend_response or "", symbol)
return self._merge_lane_results(symbol, intraday_result, trend_result)
result = self._merge_lane_results(symbol, intraday_result, trend_result)
# 携带量化 regime 数据到最终结果,供执行层使用
if market_context.get('range_metrics'):
result['range_metrics'] = market_context['range_metrics']
# 携带资金费率数据到最终结果,供执行层风控使用
if futures_market_data:
funding = futures_market_data.get('funding_rate') or {}
result['funding_rate_data'] = {
'funding_rate': funding.get('funding_rate'),
'funding_rate_percent': funding.get('funding_rate_percent', 0),
'sentiment': funding.get('sentiment'),
'sentiment_level': funding.get('sentiment_level'),
'open_interest': futures_market_data.get('open_interest'),
}
return result
except Exception as e:
logger.error(f"市场信号分析失败: {e}")
@ -385,6 +402,7 @@ class MarketSignalAnalyzer:
'trend': "\n".join(trend_parts),
'levels': "\n".join(levels_parts),
'range_warning': range_warning,
'range_metrics': range_metrics,
}
def _get_session_open(self, df: Optional[pd.DataFrame]) -> Optional[float]:
@ -1184,8 +1202,12 @@ class MarketSignalAnalyzer:
logger.warning(f"获取新闻失败: {e}")
return "新闻获取失败"
async def _get_futures_context(self, symbol: str) -> str:
"""获取合约市场数据(资金费率、持仓量、溢价率)"""
async def _get_futures_context(self, symbol: str) -> tuple:
"""获取合约市场数据(资金费率、持仓量、溢价率)
Returns:
(formatted_str, raw_market_data) - 格式化文本和原始数据
"""
try:
loop = asyncio.get_event_loop()
market_data = await loop.run_in_executor(
@ -1194,11 +1216,11 @@ class MarketSignalAnalyzer:
symbol
)
if not market_data:
return ""
return self._format_futures_context(symbol, market_data)
return "", None
return self._format_futures_context(symbol, market_data), market_data
except Exception as e:
logger.warning(f"获取 {symbol} 合约数据失败: {e}")
return ""
return "", None
def _format_futures_context(self, symbol: str, market_data: Dict[str, Any]) -> str:
"""格式化高价值合约特征,避免大段说明性文本"""