This commit is contained in:
aaron 2026-04-22 22:54:33 +08:00
parent b1fcb6d29e
commit e3bad3ea29
4 changed files with 353 additions and 43 deletions

View File

@ -123,6 +123,14 @@ class Settings(BaseSettings):
crypto_min_volatility_percent: float = 0.5 # 最小波动率(百分比),低于此值跳过分析
crypto_min_price_range_percent: float = 0.3 # 最小价格变动范围(百分比),低于此值跳过分析
crypto_5m_surge_threshold: float = 1.0 # 5分钟突发波动阈值百分比超过此值即使1小时波动率低也会触发分析
crypto_intraday_llm_cooldown_minutes: int = 15 # 日内 LLM 分析冷却时间
crypto_trend_llm_cooldown_minutes: int = 60 # 趋势 LLM 分析冷却时间
crypto_force_llm_surge_threshold: float = 1.2 # 15分钟突发波动强制触发 LLM 的阈值
crypto_force_llm_trade_zone_pct: float = 0.25 # 接近关键交易区时强制触发 LLM 的距离阈值
crypto_event_analysis_enabled: bool = True # 是否启用实时行情事件触发分析
crypto_event_analysis_window_minutes: int = 5 # 实时行情异动检测窗口
crypto_event_analysis_price_change_percent: float = 0.8 # 检测窗口内涨跌超过该阈值触发日内分析
crypto_event_analysis_cooldown_minutes: int = 10 # 同一交易对事件触发分析冷却
# Brave Search API 配置
brave_api_key: str = ""

View File

@ -238,6 +238,10 @@ class CryptoAgent:
"last_signal_symbol": None,
"last_heartbeat_notified_at": None,
}
self._lane_analysis_state: Dict[str, Dict[str, Any]] = {}
self._event_analysis_state: Dict[str, Dict[str, Any]] = {}
self._event_analysis_tasks: Dict[str, asyncio.Task] = {}
self._price_monitor_registered = False
# 挂单 TP/SL 追踪:挂单成交后自动补设止盈止损
# key=order_id, value={symbol, is_long, size/contracts, tp_price, sl_price}
@ -269,7 +273,7 @@ class CryptoAgent:
"auto_trading_enabled": True, # 模拟交易始终启用
"hyperliquid_enabled": self.hyperliquid is not None,
"bitget_enabled": self.bitget is not None,
"analysis_interval": "每5分钟整点"
"analysis_interval": "每5分钟轻扫描LLM分层冷却"
})
logger.info(f"加密货币智能体初始化完成LLM 驱动),监控交易对: {self.symbols}")
@ -303,6 +307,24 @@ class CryptoAgent:
def _touch_analysis_heartbeat(self):
self._analysis_monitor["last_heartbeat_at"] = datetime.now().isoformat()
def _get_lane_state(self, symbol: str) -> Dict[str, Any]:
return self._lane_analysis_state.setdefault(symbol, {
"last_intraday_at": None,
"last_trend_at": None,
"cached_intraday": None,
"cached_trend": None,
"last_force_reason": "",
})
def _get_event_analysis_state(self, symbol: str) -> Dict[str, Any]:
return self._event_analysis_state.setdefault(symbol, {
"window_start_at": None,
"window_start_price": None,
"last_triggered_at": None,
"last_trigger_reason": "",
"last_price": None,
})
def _parse_iso_datetime(self, value: Optional[str]) -> Optional[datetime]:
if not value:
return None
@ -447,6 +469,169 @@ class CryptoAgent:
"last_alert_at": last_alert_at,
}
def _detect_force_llm_trigger(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str]:
try:
df_5m = data.get('5m')
if df_5m is not None and len(df_5m) >= 3:
recent_5m = df_5m.iloc[-3:]
price_start = float(recent_5m.iloc[0]['close'])
price_end = float(recent_5m.iloc[-1]['close'])
if price_start > 0:
change_pct = abs(price_end - price_start) / price_start * 100
threshold = self.settings.crypto_force_llm_surge_threshold
if change_pct >= threshold:
direction = "上涨" if price_end > price_start else "下跌"
return True, f"15分钟突发{direction} {change_pct:.2f}% >= {threshold:.2f}%"
df_1h = data.get('1h')
if df_1h is not None and len(df_1h) >= 20 and df_5m is not None and not df_5m.empty:
current_price = float(df_5m.iloc[-1]['close'])
recent_1h = df_1h.iloc[-20:]
high = float(recent_1h['high'].max())
low = float(recent_1h['low'].min())
zone_threshold = self.settings.crypto_force_llm_trade_zone_pct
if current_price > 0:
high_distance = abs(high - current_price) / current_price * 100
low_distance = abs(current_price - low) / current_price * 100
if min(high_distance, low_distance) <= zone_threshold:
zone = "阻力" if high_distance <= low_distance else "支撑"
return True, f"价格接近20小时{zone}位,距离 {min(high_distance, low_distance):.2f}% <= {zone_threshold:.2f}%"
except Exception as e:
logger.debug(f"{symbol} 强制 LLM 触发检测失败: {e}")
return False, ""
def _resolve_llm_lanes_for_symbol(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[List[str], Dict[str, Any], str]:
now = datetime.now()
state = self._get_lane_state(symbol)
force, force_reason = self._detect_force_llm_trigger(symbol, data)
lanes: List[str] = []
intraday_last = self._parse_iso_datetime(state.get("last_intraday_at"))
trend_last = self._parse_iso_datetime(state.get("last_trend_at"))
intraday_cooldown = timedelta(minutes=self.settings.crypto_intraday_llm_cooldown_minutes)
trend_cooldown = timedelta(minutes=self.settings.crypto_trend_llm_cooldown_minutes)
if force or intraday_last is None or not state.get("cached_intraday") or now - intraday_last >= intraday_cooldown:
lanes.append("intraday")
if force or trend_last is None or not state.get("cached_trend") or now - trend_last >= trend_cooldown:
lanes.append("trend")
cached_results = {}
if state.get("cached_intraday"):
cached_results["intraday"] = state["cached_intraday"]
if state.get("cached_trend"):
cached_results["trend"] = state["cached_trend"]
if not lanes and not cached_results:
lanes = ["intraday", "trend"]
state["last_force_reason"] = force_reason if force else ""
return lanes, cached_results, force_reason
def _update_lane_analysis_state(self, symbol: str, market_signal: Dict[str, Any]):
state = self._get_lane_state(symbol)
now_iso = datetime.now().isoformat()
lane_results = market_signal.get("lane_results") or {}
fresh_lanes = set((market_signal.get("llm_lanes") or {}).get("fresh") or [])
if "intraday" in fresh_lanes and lane_results.get("intraday"):
state["last_intraday_at"] = now_iso
state["cached_intraday"] = lane_results["intraday"]
if "trend" in fresh_lanes and lane_results.get("trend"):
state["last_trend_at"] = now_iso
state["cached_trend"] = lane_results["trend"]
def _register_price_event_monitor(self):
if self._price_monitor_registered or not self.settings.crypto_event_analysis_enabled:
return
try:
from app.services.price_monitor_service import get_price_monitor_service
monitor = get_price_monitor_service()
for symbol in self.symbols:
monitor.subscribe_symbol(symbol)
monitor.add_price_callback(self._on_realtime_price_update)
self._price_monitor_registered = True
logger.info("✅ CryptoAgent 已接入实时行情事件触发分析")
except Exception as e:
logger.warning(f"实时行情事件触发分析接入失败: {e}")
def _on_realtime_price_update(self, symbol: str, price: float):
if not self.running or not self.settings.crypto_event_analysis_enabled:
return
if symbol not in self.symbols:
return
if not price or price <= 0:
return
now = datetime.now()
state = self._get_event_analysis_state(symbol)
state["last_price"] = price
window_minutes = self.settings.crypto_event_analysis_window_minutes
window_start_at = self._parse_iso_datetime(state.get("window_start_at"))
window_start_price = state.get("window_start_price")
if not window_start_at or not window_start_price or now - window_start_at >= timedelta(minutes=window_minutes):
state["window_start_at"] = now.isoformat()
state["window_start_price"] = price
return
change_pct = abs(price - float(window_start_price)) / float(window_start_price) * 100
threshold = self.settings.crypto_event_analysis_price_change_percent
if change_pct < threshold:
return
last_triggered_at = self._parse_iso_datetime(state.get("last_triggered_at"))
cooldown = timedelta(minutes=self.settings.crypto_event_analysis_cooldown_minutes)
if last_triggered_at and now - last_triggered_at < cooldown:
return
if symbol in self._event_analysis_tasks and not self._event_analysis_tasks[symbol].done():
return
direction = "上涨" if price > float(window_start_price) else "下跌"
reason = f"{window_minutes}分钟内{direction} {change_pct:.2f}% >= {threshold:.2f}%"
state["last_triggered_at"] = now.isoformat()
state["last_trigger_reason"] = reason
state["window_start_at"] = now.isoformat()
state["window_start_price"] = price
if self._event_loop and self._event_loop.is_running():
asyncio.run_coroutine_threadsafe(
self._run_event_triggered_analysis(symbol, reason),
self._event_loop,
)
logger.info(f"⚡ 已排队实时行情事件分析: {symbol} | {reason}")
else:
logger.warning(f"实时行情事件分析跳过: 事件循环不可用 ({symbol})")
async def _run_event_triggered_analysis(self, symbol: str, reason: str):
current_task = asyncio.current_task()
if current_task:
self._event_analysis_tasks[symbol] = current_task
self._record_analysis_event(
"event_analysis_triggered",
symbol=symbol,
status="info",
detail=reason,
extra={"trigger_reason": reason},
)
try:
await self.analyze_symbol(
symbol,
trigger_source="realtime_event",
force_lanes=["intraday"],
trigger_reason=reason,
)
finally:
task = self._event_analysis_tasks.get(symbol)
if task is current_task:
self._event_analysis_tasks.pop(symbol, None)
async def _maybe_alert_tp_sl_incomplete(self,
platform: str,
tracking_key: str,
@ -759,7 +944,7 @@ class CryptoAgent:
logger.info("🚀 加密货币交易信号智能体LLM 驱动)")
logger.info("=" * 60)
logger.info(f" 监控交易对: {', '.join(self.symbols)}")
logger.info(f" 运行模式: 每5分钟整点执行")
logger.info(f" 运行模式: 每5分钟轻扫描LLM分层冷却")
logger.info(f" 分析引擎: LLM 自主分析")
logger.info(f" 交易模式: 自动交易已启用")
logger.info("=" * 60 + "\n")
@ -770,6 +955,7 @@ class CryptoAgent:
# 注意:不再启动独立的价格监控
# 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查
logger.info(f"交易已启用(由后台统一监控)")
self._register_price_event_monitor()
# 发送启动通知(卡片格式)
title = "🚀 加密货币智能体已启动"
@ -779,7 +965,8 @@ class CryptoAgent:
f"🤖 **驱动引擎**: LLM 自主分析",
f"📊 **监控交易对**: {len(self.symbols)}",
f" {', '.join(self.symbols)}",
f"⏰ **运行频率**: 每5分钟整点",
f"⏰ **运行频率**: 每5分钟轻扫描",
f"🧊 **LLM 冷却**: 日内 {self.settings.crypto_intraday_llm_cooldown_minutes} 分钟 / 趋势 {self.settings.crypto_trend_llm_cooldown_minutes} 分钟",
f"💰 **交易系统**: 已启用(后台统一监控)",
f"🎯 **分析维度**: 技术面 + 资金面 + 情绪面",
]
@ -977,7 +1164,11 @@ class CryptoAgent:
logger.warning(f"{symbol} 波动率检查失败: {e},允许分析")
return True, "波动率检查失败,允许分析", 0
async def analyze_symbol(self, symbol: str):
async def analyze_symbol(self,
symbol: str,
trigger_source: str = "schedule",
force_lanes: Optional[List[str]] = None,
trigger_reason: str = ""):
"""
分析单个交易对信号分析 + 平台执行规则
@ -1005,7 +1196,7 @@ class CryptoAgent:
)
logger.info(f"\n{'' * 50}")
logger.info(f"📊 {symbol} 分析开始")
logger.info(f"📊 {symbol} 分析开始 ({trigger_source})")
logger.info(f"{'' * 50}")
# 1. 获取多周期数据
@ -1048,12 +1239,38 @@ class CryptoAgent:
# ============================================================
# 第一阶段:市场信号分析(不包含仓位信息)
# ============================================================
logger.info(f"\n🤖 【第一阶段:市场信号分析】")
lanes_to_run, cached_lane_results, force_reason = self._resolve_llm_lanes_for_symbol(symbol, data)
if force_lanes:
merged_lanes = set(lanes_to_run)
merged_lanes.update(force_lanes)
lanes_to_run = sorted(merged_lanes)
force_reason = trigger_reason or force_reason or f"{trigger_source} 强制刷新 {', '.join(force_lanes)}"
logger.info(f"\n🤖 【第一阶段:市场信号分析】 lanes={lanes_to_run or ['cache_only']}")
if force_reason:
logger.info(f" ⚡ 强制触发 LLM: {force_reason}")
elif not lanes_to_run:
logger.info(" 🧊 LLM 冷却中,使用上一轮 lane 缓存结果")
self._record_analysis_event(
"llm_lane_plan",
symbol=symbol,
status="info",
detail=force_reason or (f"本轮执行 lane: {', '.join(lanes_to_run)}" if lanes_to_run else "LLM 冷却中,使用缓存 lane 结果"),
extra={
"lanes_to_run": lanes_to_run,
"cache_only": not bool(lanes_to_run),
"force_reason": force_reason,
"trigger_source": trigger_source,
},
)
market_signal = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols
symbols=self.symbols,
lanes=lanes_to_run,
cached_lane_results=cached_lane_results,
)
self._update_lane_analysis_state(symbol, market_signal)
# 输出市场分析结果
self._log_market_signal(market_signal)
@ -4793,6 +5010,19 @@ class CryptoAgent:
'platform_halts': self.get_platform_halt_status(),
'analysis_monitor': self._analysis_monitor,
'analysis_notifications': self._analysis_notification_state,
'lane_analysis_state': self._lane_analysis_state,
'event_analysis_state': self._event_analysis_state,
'llm_schedule': {
'scan_interval_minutes': 5,
'intraday_cooldown_minutes': self.settings.crypto_intraday_llm_cooldown_minutes,
'trend_cooldown_minutes': self.settings.crypto_trend_llm_cooldown_minutes,
'force_surge_threshold': self.settings.crypto_force_llm_surge_threshold,
'force_trade_zone_pct': self.settings.crypto_force_llm_trade_zone_pct,
'event_analysis_enabled': self.settings.crypto_event_analysis_enabled,
'event_analysis_window_minutes': self.settings.crypto_event_analysis_window_minutes,
'event_analysis_price_change_percent': self.settings.crypto_event_analysis_price_change_percent,
'event_analysis_cooldown_minutes': self.settings.crypto_event_analysis_cooldown_minutes,
},
'last_signals': {
symbol: {
'type': sig.get('type'),

View File

@ -197,7 +197,9 @@ class MarketSignalAnalyzer:
self.exchange = bitget_service
async def analyze(self, symbol: str, data: Dict[str, Any],
symbols: List[str] = None) -> Dict[str, Any]:
symbols: List[str] = None,
lanes: Optional[List[str]] = None,
cached_lane_results: Optional[Dict[str, Dict[str, Any]]] = None) -> Dict[str, Any]:
"""
分析市场并生成信号
@ -219,50 +221,74 @@ class MarketSignalAnalyzer:
# 3. 获取合约市场数据(资金费率、持仓量等)
futures_context, futures_market_data = await self._get_futures_context(symbol)
# 4. 将日内和趋势拆成两次独立分析,避免一个 prompt 同时混做两件事
intraday_prompt = self._build_analysis_prompt(
symbol=symbol,
lane="intraday",
market_context=market_context,
news_context=news_context,
futures_context=futures_context,
futures_market_data=futures_market_data,
)
trend_prompt = self._build_analysis_prompt(
symbol=symbol,
lane="trend",
market_context=market_context,
news_context=news_context,
futures_context=futures_context,
futures_market_data=futures_market_data,
)
lanes_to_run = set(lanes or ["intraday", "trend"])
cached_lane_results = cached_lane_results or {}
lane_tasks = {}
intraday_messages = [
{"role": "system", "content": self.INTRADAY_ANALYSIS_PROMPT},
{"role": "user", "content": intraday_prompt}
]
trend_messages = [
{"role": "system", "content": self.TREND_ANALYSIS_PROMPT},
{"role": "user", "content": trend_prompt}
]
intraday_response, trend_response = await asyncio.gather(
llm_service.achat(
intraday_messages,
if "intraday" in lanes_to_run:
intraday_prompt = self._build_analysis_prompt(
symbol=symbol,
lane="intraday",
market_context=market_context,
news_context=news_context,
futures_context=futures_context,
futures_market_data=futures_market_data,
)
lane_tasks["intraday"] = llm_service.achat(
[
{"role": "system", "content": self.INTRADAY_ANALYSIS_PROMPT},
{"role": "user", "content": intraday_prompt}
],
temperature=self.INTRADAY_ANALYSIS_TEMPERATURE,
max_tokens=self.ANALYSIS_MAX_TOKENS
),
llm_service.achat(
trend_messages,
)
if "trend" in lanes_to_run:
trend_prompt = self._build_analysis_prompt(
symbol=symbol,
lane="trend",
market_context=market_context,
news_context=news_context,
futures_context=futures_context,
futures_market_data=futures_market_data,
)
lane_tasks["trend"] = llm_service.achat(
[
{"role": "system", "content": self.TREND_ANALYSIS_PROMPT},
{"role": "user", "content": trend_prompt}
],
temperature=self.TREND_ANALYSIS_TEMPERATURE,
max_tokens=self.ANALYSIS_MAX_TOKENS
)
)
intraday_result = self._parse_llm_response(intraday_response or "", symbol)
trend_result = self._parse_llm_response(trend_response or "", symbol)
lane_responses = {}
if lane_tasks:
responses = await asyncio.gather(*lane_tasks.values())
lane_responses = dict(zip(lane_tasks.keys(), responses))
intraday_result = (
self._parse_llm_response(lane_responses.get("intraday") or "", symbol)
if "intraday" in lane_responses
else dict(cached_lane_results.get("intraday") or self._get_empty_signal(symbol))
)
trend_result = (
self._parse_llm_response(lane_responses.get("trend") or "", symbol)
if "trend" in lane_responses
else dict(cached_lane_results.get("trend") or self._get_empty_signal(symbol))
)
intraday_result['_lane_source'] = 'fresh' if "intraday" in lane_responses else 'cache'
trend_result['_lane_source'] = 'fresh' if "trend" in lane_responses else 'cache'
result = self._merge_lane_results(symbol, intraday_result, trend_result)
result['llm_lanes'] = {
'requested': sorted(lanes_to_run),
'fresh': sorted(lane_responses.keys()),
'cached': sorted(set(["intraday", "trend"]) - set(lane_responses.keys())),
}
result['lane_results'] = {
'intraday': intraday_result,
'trend': trend_result,
}
# 携带量化 regime 数据到最终结果,供执行层使用
if market_context.get('range_metrics'):

View File

@ -1192,6 +1192,31 @@
min-width: 92px;
}
.lane-state-list {
display: grid;
gap: 8px;
margin-top: 10px;
}
.lane-state-item {
display: grid;
grid-template-columns: 76px 1fr;
gap: 10px;
padding-top: 8px;
border-top: 1px solid rgba(255,255,255,0.06);
font-size: 12px;
}
.lane-state-symbol {
font-family: "IBM Plex Mono", monospace;
color: var(--text);
}
.lane-state-detail {
color: var(--muted);
line-height: 1.5;
}
.analysis-log-item {
padding: 12px 14px;
border-radius: 14px;
@ -2264,6 +2289,9 @@
const summaryCard = document.getElementById('runtimeSummaryCard');
const monitor = analysisMonitor || {};
const notifications = cachedConsoleData?.crypto_agent?.analysis_notifications || {};
const schedule = cachedConsoleData?.crypto_agent?.llm_schedule || {};
const laneState = cachedConsoleData?.crypto_agent?.lane_analysis_state || {};
const eventState = cachedConsoleData?.crypto_agent?.event_analysis_state || {};
const cycleStatus = monitor.last_cycle_status || 'idle';
const progressText = monitor.current_cycle_total
? `${monitor.current_cycle_index || 0}/${monitor.current_cycle_total} ${monitor.current_cycle_symbol || ''}`
@ -2291,6 +2319,20 @@
const heartbeatSentAt = notifications.last_heartbeat_notified_at;
const lastSignalAt = notifications.last_signal_at;
const lastSignalSymbol = notifications.last_signal_symbol || '-';
const laneRows = Object.entries(laneState).slice(0, 4).map(([symbol, state]) => `
<div class="lane-state-item">
<div class="lane-state-symbol">${symbol}</div>
<div class="lane-state-detail">
日内 ${state.last_intraday_at ? relativeTime(state.last_intraday_at) : '-'} /
趋势 ${state.last_trend_at ? relativeTime(state.last_trend_at) : '-'}
${state.last_force_reason ? `<br>强触发: ${state.last_force_reason}` : ''}
</div>
</div>
`).join('');
const latestEventTrigger = Object.entries(eventState)
.map(([symbol, state]) => ({ symbol, ...state }))
.filter((state) => state.last_triggered_at)
.sort((a, b) => new Date(b.last_triggered_at) - new Date(a.last_triggered_at))[0];
summaryCard.innerHTML = `
<div class="runtime-summary-title">运行摘要</div>
<div class="runtime-summary-main">${monitor.last_analysis_status || 'idle'} / ${monitor.last_analysis_symbol || '-'}</div>
@ -2298,7 +2340,11 @@
<div class="runtime-summary-row"><span>最近分析说明</span><strong>${monitor.last_analysis_detail || '-'}</strong></div>
<div class="runtime-summary-row"><span>最近信号</span><strong>${lastSignalAt ? `${lastSignalSymbol} / ${relativeTime(lastSignalAt)}` : '近 60 分钟无信号'}</strong></div>
<div class="runtime-summary-row"><span>上次心跳通知</span><strong>${heartbeatSentAt ? `${relativeTime(heartbeatSentAt)} / ${formatTime(heartbeatSentAt)}` : '尚未发送'}</strong></div>
<div class="runtime-summary-row"><span>LLM 冷却</span><strong>日内 ${schedule.intraday_cooldown_minutes || '-'}m / 趋势 ${schedule.trend_cooldown_minutes || '-'}m</strong></div>
<div class="runtime-summary-row"><span>事件触发</span><strong>${schedule.event_analysis_enabled ? `${schedule.event_analysis_window_minutes || '-'}m / ${formatPercent(schedule.event_analysis_price_change_percent || 0, 1)} / 冷却${schedule.event_analysis_cooldown_minutes || '-'}m` : '关闭'}</strong></div>
<div class="runtime-summary-row"><span>最近异动分析</span><strong>${latestEventTrigger ? `${latestEventTrigger.symbol} / ${relativeTime(latestEventTrigger.last_triggered_at)}` : '暂无'}</strong></div>
</div>
<div class="lane-state-list">${laneRows || '<div class="analysis-log-detail">暂无 lane 状态,等待下一轮分析。</div>'}</div>
`;
if (!analysisEvents || analysisEvents.length === 0) {