stock-ai-agent/backend/app/crypto_agent/crypto_agent.py
2026-03-25 22:23:38 +08:00

2937 lines
131 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
加密货币交易智能体 - 主控制器LLM 驱动版)
"""
import asyncio
import math
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
from app.utils.logger import logger
from app.config import get_settings
from app.services.bitget_service import bitget_service
from app.services.feishu_service import get_feishu_service, get_feishu_paper_trading_service
from app.services.telegram_service import get_telegram_service
from app.services.dingtalk_service import get_dingtalk_service
from app.services.paper_trading_service import get_paper_trading_service
from app.services.signal_database_service import get_signal_db_service
from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer
from app.crypto_agent.trading_decision_maker import TradingDecisionMaker
from app.utils.system_status import get_system_monitor, AgentStatus
class CryptoAgent:
"""加密货币交易信号智能体LLM 驱动版)"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
"""单例模式 - 确保只有一个实例"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""初始化智能体"""
# 防止重复初始化
if CryptoAgent._initialized:
return
CryptoAgent._initialized = True
self.settings = get_settings()
self.exchange = bitget_service # 交易所服务
self.feishu = get_feishu_service() # 通用飞书服务crypto等
self.feishu_paper = get_feishu_paper_trading_service() # 模拟交易专用飞书服务
self.telegram = get_telegram_service()
self.dingtalk = get_dingtalk_service() # 添加钉钉服务
# 新架构:市场信号分析器 + 交易决策器
self.market_analyzer = MarketSignalAnalyzer()
self.decision_maker = None # 延迟初始化,需要 paper_trading 的杠杆配置
self.signal_db = get_signal_db_service() # 信号数据库服务
# 模拟交易服务(始终启用)
self.paper_trading = get_paper_trading_service()
# 初始化决策器(需要杠杆配置)
self.decision_maker = TradingDecisionMaker(
leverage=self.paper_trading.leverage,
max_total_leverage=self.paper_trading.max_total_leverage
)
# Hyperliquid 实盘服务(可选)
from app.services.hyperliquid_trading_service import get_hyperliquid_service
self.hyperliquid = get_hyperliquid_service()
if self.hyperliquid:
logger.info(f"🔥 Hyperliquid 实盘交易: 已启用")
else:
logger.info(f"📊 Hyperliquid 实盘交易: 未启用(仅模拟盘)")
# Bitget 实盘服务(可选)
from app.services.bitget_live_trading_service import get_bitget_live_service
self.bitget = get_bitget_live_service()
if self.bitget:
logger.info(f"🔥 Bitget 实盘交易: 已启用")
else:
logger.info(f"📊 Bitget 实盘交易: 未启用(仅模拟盘)")
# 状态管理
self.last_signals: Dict[str, Dict[str, Any]] = {}
self.signal_cooldown: Dict[str, datetime] = {}
# 挂单 TP/SL 追踪:挂单成交后自动补设止盈止损
# key=order_id, value={symbol, is_long, size/contracts, tp_price, sl_price}
self._hl_pending_tp_sl: Dict[str, Dict] = {}
self._bg_pending_tp_sl: Dict[str, Dict] = {}
# 配置
self.symbols = self.settings.crypto_symbols.split(',')
# 运行状态
self.running = False
self._event_loop = None
# 注册到系统监控
monitor = get_system_monitor()
self._monitor_info = monitor.register_agent(
agent_id="crypto_agent",
name="加密货币智能体",
agent_type="crypto"
)
monitor.update_config("crypto_agent", {
"symbols": self.symbols,
"auto_trading_enabled": True, # 模拟交易始终启用
"hyperliquid_enabled": self.hyperliquid is not None,
"bitget_enabled": self.bitget is not None,
"analysis_interval": "每5分钟整点"
})
logger.info(f"加密货币智能体初始化完成LLM 驱动),监控交易对: {self.symbols}")
logger.info(f"📊 模拟交易: 始终启用")
def _on_price_update(self, symbol: str, price: float):
"""处理实时价格更新(用于模拟交易)"""
if not self.paper_trading:
return
triggered = self.paper_trading.check_price_triggers(symbol, price)
for result in triggered:
if self._event_loop and self._event_loop.is_running():
# 根据事件类型选择不同的通知方法
event_type = result.get('event_type', 'order_closed')
if event_type == 'order_filled':
asyncio.run_coroutine_threadsafe(self._notify_order_filled(result), self._event_loop)
elif event_type == 'breakeven_triggered':
asyncio.run_coroutine_threadsafe(self._notify_breakeven_triggered(result), self._event_loop)
else:
asyncio.run_coroutine_threadsafe(self._notify_order_closed(result), self._event_loop)
else:
logger.warning(f"无法发送通知: 事件循环不可用")
async def _notify_order_filled(self, result: Dict[str, Any]):
"""发送挂单成交通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
grade = result.get('signal_grade', 'N/A')
title = f"✅ 挂单成交 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"⭐ **信号等级**: {grade}",
f"💰 **挂单价**: ${result.get('entry_price', 0):,.2f}",
f"🎯 **成交价**: ${result.get('filled_price', 0):,.2f}",
f"💵 **仓位**: ${result.get('quantity', 0):,.0f}",
]
if result.get('stop_loss'):
content_parts.append(f"🛑 **止损**: ${result.get('stop_loss', 0):,.2f}")
if result.get('take_profit'):
content_parts.append(f"🎯 **止盈**: ${result.get('take_profit', 0):,.2f}")
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "green")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送挂单成交通知: {result.get('order_id')}")
async def _notify_pending_cancelled(self, result: Dict[str, Any]):
"""发送挂单撤销通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
new_side_text = "做多" if result.get('new_side') == 'long' else "做空"
title = f"⚠️ 挂单撤销 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **原方向**: {side_text}",
f"💰 **挂单价**: ${result.get('entry_price', 0):,.2f}",
f"",
f"📝 **原因**: 收到反向{new_side_text}信号,自动撤销",
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送挂单撤销通知: {result.get('order_id')}")
async def _notify_breakeven_triggered(self, result: Dict[str, Any]):
"""发送移动止损触发通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = '🟢' if result.get('side') == 'long' else '🔴'
pnl_percent = result.get('current_pnl_percent', 0)
title = f"📈 移动止损已启动 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"",
f"💰 **开仓价**: ${result.get('filled_price', 0):,.2f}",
f"📈 **当前盈利**: {pnl_percent:+.2f}%",
f"🛑 **新止损价**: ${result.get('new_stop_loss', 0):,.2f}",
f"",
f"💰 锁定利润,让利润奔跑"
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "green")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送移动止损通知: {result.get('order_id')}")
async def _notify_order_closed(self, result: Dict[str, Any]):
"""发送订单平仓通知"""
status = result.get('status', '')
is_win = result.get('is_win', False)
if status == 'closed_tp':
emoji = "🎯"
status_text = "止盈平仓"
color = "green"
elif status == 'closed_sl':
emoji = "🛑"
status_text = "止损平仓"
color = "red"
elif status == 'closed_ts':
emoji = "📈"
status_text = "移动止盈"
color = "green"
elif status == 'closed_be':
emoji = "🔒"
status_text = "保本止损"
color = "orange"
else:
emoji = "📤"
status_text = "手动平仓"
color = "blue"
win_text = "盈利" if is_win else "亏损"
win_emoji = "" if is_win else ""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
title = f"{emoji} 订单{status_text}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"💰 **交易对**: {result.get('symbol')}",
f"📊 **入场**: ${result.get('entry_price', 0):,.2f}",
f"🎯 **出场**: ${result.get('exit_price', 0):,.2f}",
f"{win_emoji} **{win_text}**: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})",
f"⏱️ **持仓时间**: {result.get('hold_duration', 'N/A')}",
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, color)
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送订单平仓通知: {result.get('order_id')}")
async def _notify_expired_orders_cancelled(self, cancelled_orders: List[Dict[str, Any]]):
"""
发送超时订单取消通知
Args:
cancelled_orders: 被取消的订单列表
"""
if not cancelled_orders:
return
title = f"⏰ 已自动取消 {len(cancelled_orders)} 个超时挂单"
# 构建订单列表内容
order_lines = []
for order in cancelled_orders[:5]: # 最多显示5个
side_icon = "🟢" if order['side'] == 'long' else "🔴"
order_lines.append(
f"{side_icon} **{order['symbol']}** ({order['side']})\n"
f" 入场价: ${order['entry_price']:.2f} | "
f"已挂单: {order['age_hours']:.1f}小时"
)
if len(cancelled_orders) > 5:
order_lines.append(f"\n... 还有 {len(cancelled_orders) - 5} 个订单")
content_parts = [
f"⏰ **挂单超时自动取消**",
f"📊 **取消数量**: {len(cancelled_orders)}",
f"⚙️ **超时阈值**: {self.paper_trading.order_timeout_hours} 小时",
"",
"**取消的订单**:",
]
content_parts.extend(order_lines)
content_parts.append("\n💡 挂单超时自动取消,释放仓位供新信号使用")
content = "\n".join(content_parts)
# 发送通知
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送超时订单取消通知: {len(cancelled_orders)} 个订单")
def _get_seconds_until_next_5min(self) -> int:
"""计算距离下一个5分钟整点的秒数"""
now = datetime.now()
current_minute = now.minute
current_second = now.second
minutes_past = current_minute % 5
if minutes_past == 0 and current_second == 0:
return 0
minutes_to_wait = 5 - minutes_past if minutes_past > 0 else 5
seconds_to_wait = minutes_to_wait * 60 - current_second
return seconds_to_wait
async def run(self):
"""主运行循环"""
self.running = True
self._event_loop = asyncio.get_event_loop()
# 更新状态为启动中
monitor = get_system_monitor()
monitor.update_status("crypto_agent", AgentStatus.STARTING)
# 启动横幅
logger.info("\n" + "=" * 60)
logger.info("🚀 加密货币交易信号智能体LLM 驱动)")
logger.info("=" * 60)
logger.info(f" 监控交易对: {', '.join(self.symbols)}")
logger.info(f" 运行模式: 每5分钟整点执行")
logger.info(f" 分析引擎: LLM 自主分析")
logger.info(f" 交易模式: 自动交易已启用")
logger.info("=" * 60 + "\n")
# 更新状态为运行中
monitor.update_status("crypto_agent", AgentStatus.RUNNING)
# 注意:不再启动独立的价格监控
# 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查
logger.info(f"交易已启用(由后台统一监控)")
# 发送启动通知(卡片格式)
title = "🚀 加密货币智能体已启动"
# 构建卡片内容
content_parts = [
f"🤖 **驱动引擎**: LLM 自主分析",
f"📊 **监控交易对**: {len(self.symbols)}",
f" {', '.join(self.symbols)}",
f"⏰ **运行频率**: 每5分钟整点",
f"💰 **交易系统**: 已启用(后台统一监控)",
f"🎯 **分析维度**: 技术面 + 资金面 + 情绪面",
]
content = "\n".join(content_parts)
await self.feishu.send_card(title, content, "green")
await self.telegram.send_startup_notification(self.symbols)
while self.running:
try:
wait_seconds = self._get_seconds_until_next_5min()
if wait_seconds > 0:
next_run = datetime.now() + timedelta(seconds=wait_seconds)
logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
await asyncio.sleep(wait_seconds)
run_time = datetime.now()
logger.info("\n" + "=" * 60)
logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
logger.info("=" * 60)
# 检查并取消超时挂单(在分析开始前)
cancelled = self.paper_trading.check_and_cancel_expired_orders()
if cancelled:
logger.info(f"🔄 已自动取消 {len(cancelled)} 个超时挂单")
# 发送超时取消通知
await self._notify_expired_orders_cancelled(cancelled)
# 检查实盘挂单是否已成交,补设止盈止损
if self.hyperliquid:
await self._check_and_set_pending_tp_sl_hyperliquid()
if self.bitget:
await self._check_and_set_pending_tp_sl_bitget()
for symbol in self.symbols:
await self.analyze_symbol(symbol)
logger.info("\n" + "" * 60)
logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
logger.info("" * 60 + "\n")
await asyncio.sleep(2)
except Exception as e:
logger.error(f"❌ 分析循环出错: {e}")
import traceback
logger.error(traceback.format_exc())
await asyncio.sleep(10)
def stop(self):
"""停止运行"""
self.running = False
# 更新状态
monitor = get_system_monitor()
monitor.update_status("crypto_agent", AgentStatus.STOPPED)
logger.info("加密货币智能体已停止")
def _check_volatility(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str, float]:
"""
检查波动率,判断是否值得进行 LLM 分析(组合方案)
使用 1 小时 K 线判断趋势波动5 分钟 K 线检测突发波动
Args:
symbol: 交易对
data: 多周期K线数据
Returns:
(should_analyze, reason, volatility_percent)
should_analyze: 是否应该进行分析
reason: 原因说明
volatility_percent: 1小时波动率百分比
"""
# 检查是否启用波动率过滤
if not self.settings.crypto_volatility_filter_enabled:
return True, "波动率过滤未启用", 0
try:
# 1. 首先检查 1 小时趋势波动率
df_1h = data.get('1h')
if df_1h is None or len(df_1h) < 20:
# 数据不足,保守起见允许分析
return True, "1小时数据不足允许分析", 0
# 获取最近20根K线
recent_1h = df_1h.iloc[-20:]
# 计算最高价和最低价
high = recent_1h['high'].max()
low = recent_1h['low'].min()
current_price = float(recent_1h.iloc[-1]['close'])
# 计算1小时波动率
if low > 0:
volatility_1h_percent = ((high - low) / low) * 100
else:
volatility_1h_percent = 0
# 计算价格变化范围(相对于当前价格)
price_range_high_percent = ((high - current_price) / current_price) * 100 if current_price > 0 else 0
price_range_low_percent = ((current_price - low) / current_price) * 100 if current_price > 0 else 0
# 从配置读取阈值
min_volatility = self.settings.crypto_min_volatility_percent
min_price_range = self.settings.crypto_min_price_range_percent
# 如果1小时波动率足够大直接允许分析
if volatility_1h_percent >= min_volatility or price_range_high_percent >= min_price_range or price_range_low_percent >= min_price_range:
return True, f"1小时趋势活跃 (波动率 {volatility_1h_percent:.2f}%),值得分析", volatility_1h_percent
# 2. 1小时波动率较低检查5分钟突发波动
df_5m = data.get('5m')
if df_5m is not None and len(df_5m) >= 3:
# 获取最近3根5分钟K线15分钟内的变化
recent_5m = df_5m.iloc[-3:]
# 计算5分钟价格变化幅度
price_start = float(recent_5m.iloc[0]['close'])
price_end = float(recent_5m.iloc[-1]['close'])
if price_start > 0:
price_change_5m = abs(price_end - price_start) / price_start * 100
else:
price_change_5m = 0
# 从配置读取5分钟突发波动阈值
surge_threshold = self.settings.crypto_5m_surge_threshold
logger.debug(f"{symbol} 5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}% (阈值: {surge_threshold}%)")
# 如果5分钟突发波动超过阈值仍然允许分析
if price_change_5m >= surge_threshold:
direction = "上涨" if price_end > price_start else "下跌"
return True, f"5分钟突发{direction} ({price_change_5m:.2f}% > {surge_threshold}%),强制分析", volatility_1h_percent
# 3. 波动率过低,跳过分析
reason = f"波动率过低 (1小时: {volatility_1h_percent:.2f}% < {min_volatility}%, 5分钟无突发波动),跳过分析"
logger.info(f"⏸️ {symbol} {reason}")
return False, reason, volatility_1h_percent
except Exception as e:
logger.warning(f"{symbol} 波动率检查失败: {e},允许分析")
return True, "波动率检查失败,允许分析", 0
async def analyze_symbol(self, symbol: str):
"""
分析单个交易对(新架构:市场分析 + 交易决策分离)
新架构流程:
1. 市场信号分析器分析市场(不包含仓位信息)
2. 交易决策器根据信号+仓位+账户状态做决策
3. 执行交易决策
Args:
symbol: 交易对,如 'BTCUSDT'
"""
try:
# 更新活动时间
monitor = get_system_monitor()
monitor.update_activity("crypto_agent")
logger.info(f"\n{'' * 50}")
logger.info(f"📊 {symbol} 分析开始")
logger.info(f"{'' * 50}")
# 1. 获取多周期数据
data = self.exchange.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
return
# 当前价格
current_price = float(data['5m'].iloc[-1]['close'])
price_change_24h = self._calculate_price_change(data['1h'])
logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")
# 1.5. 波动率检查(节省 LLM 调用)
should_analyze, volatility_reason, volatility = self._check_volatility(symbol, data)
if not should_analyze:
logger.info(f"⏸️ {volatility_reason},跳过本次 LLM 分析")
return
# ============================================================
# 第一阶段:市场信号分析(不包含仓位信息)
# ============================================================
logger.info(f"\n🤖 【第一阶段:市场信号分析】")
# 获取上一轮的信号(用于上下文)
previous_signal = self.last_signals.get(symbol)
# 显示上一轮信号(如果有)
if previous_signal:
prev_time = previous_signal.get('timestamp', 'Unknown')
prev_trend = previous_signal.get('trend', 'Unknown')
prev_signals = previous_signal.get('signals', [])
logger.info(f"📋 上一轮分析时间: {prev_time}")
logger.info(f"📋 上一轮趋势: {prev_trend}")
if prev_signals:
for sig in prev_signals:
action = sig.get('action', 'N/A')
confidence = sig.get('confidence', 0)
timeframe = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(timeframe, timeframe)
logger.info(f"📋 上一轮信号: {type_text} | {action} | {confidence}%")
else:
logger.info(f"📋 上一轮信号: 无交易信号(观望)")
else:
logger.info(f"📋 上一轮信号: 无历史记录(首次分析)")
market_signal = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols,
previous_signal=previous_signal
)
# 输出市场分析结果
self._log_market_signal(market_signal)
# 存储最新信号(用于下一轮分析的上下文)
self.last_signals[symbol] = {
'timestamp': datetime.now().isoformat(),
'trend': market_signal.get('trend', 'unknown'),
'trend_strength': market_signal.get('trend_strength', 'unknown'),
'signals': market_signal.get('signals', []),
'key_levels': market_signal.get('key_levels', {}),
'current_price': current_price
}
# 过滤掉 wait 信号,只保留 buy/sell 信号
signals = market_signal.get('signals', [])
trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']]
if not trade_signals:
logger.info(f"\n⏸️ 结论: 无交易信号(仅有观望建议),继续观望")
return
# 检查是否有达到阈值的交易信号
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
valid_signals = [s for s in trade_signals if s.get('confidence', 0) >= threshold]
if not valid_signals:
logger.info(f"\n⏸️ 结论: 无交易信号达到置信度阈值 ({threshold}%),继续观望")
return
logger.info(f"\n✅ 发现 {len(valid_signals)} 个有效交易信号(达到 {threshold}% 阈值)")
# ============================================================
# 发送市场信号通知(独立于交易决策)
# ============================================================
await self._send_market_signal_notification(market_signal, current_price)
# ============================================================
# 第二阶段:交易决策(双轨独立)
# 模拟交易和 Hyperliquid 实盘分别进行独立决策
# ============================================================
logger.info(f"\n🤖 【第二阶段:交易决策】")
paper_decision = None
hyperliquid_decision = None
bitget_decision = None
# 2.1 模拟盘决策
if self.settings.paper_trading_enabled:
logger.info(f"\n📊 【模拟盘决策】")
paper_positions, paper_account, paper_pending = self._get_paper_trading_state()
paper_pending_for_symbol = [o for o in paper_pending if o.get('symbol') == symbol]
paper_decision = await self.decision_maker.make_decision(
market_signal, paper_positions, paper_account, current_price, paper_pending_for_symbol
)
logger.info(f" 模拟盘决策: {paper_decision.get('decision')} - {paper_decision.get('reasoning', '')}")
await self._send_trading_decision_notification(paper_decision, market_signal, current_price, prefix="[模拟盘]")
else:
logger.info(f"⏸️ 模拟盘交易未启用")
# 2.2 Hyperliquid 实盘决策(独立)
if self.hyperliquid:
logger.info(f"\n🔥 【Hyperliquid 决策】")
hl_positions, hl_account, hl_pending = self._get_hyperliquid_trading_state()
hl_pending_for_symbol = [o for o in hl_pending if o.get('symbol') == symbol]
hyperliquid_decision = await self.decision_maker.make_decision(
market_signal, hl_positions, hl_account, current_price, hl_pending_for_symbol
)
logger.info(f" Hyperliquid 决策: {hyperliquid_decision.get('decision')} - {hyperliquid_decision.get('reasoning', '')}")
await self._send_trading_decision_notification(hyperliquid_decision, market_signal, current_price, prefix="[Hyperliquid]")
else:
logger.info(f"⏸️ Hyperliquid 实盘交易未启用")
# 2.3 Bitget 实盘决策(独立)
if self.bitget:
logger.info(f"\n🔥 【Bitget 决策】")
bg_positions, bg_account, bg_pending = self._get_bitget_trading_state()
bg_pending_for_symbol = [o for o in bg_pending if o.get('symbol') == symbol]
bitget_decision = await self.decision_maker.make_decision(
market_signal, bg_positions, bg_account, current_price, bg_pending_for_symbol
)
logger.info(f" Bitget 决策: {bitget_decision.get('decision')} - {bitget_decision.get('reasoning', '')}")
await self._send_trading_decision_notification(bitget_decision, market_signal, current_price, prefix="[Bitget]")
else:
logger.info(f"⏸️ Bitget 实盘交易未启用")
# ============================================================
# 第三阶段:执行交易决策(双轨独立)
# ============================================================
await self._execute_decisions(paper_decision, hyperliquid_decision, bitget_decision, market_signal, current_price)
except Exception as e:
logger.error(f"❌ 分析 {symbol} 出错: {e}")
import traceback
logger.error(traceback.format_exc())
def _log_market_signal(self, signal: Dict[str, Any]):
"""输出市场信号分析结果"""
logger.info(f" 市场状态: {signal.get('market_state')}")
logger.info(f" 趋势: {signal.get('trend')}")
# 新闻情绪
news_sentiment = signal.get('news_sentiment', '')
if news_sentiment:
sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': ''}.get(news_sentiment, '')
logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
# 关键价位
import re
key_levels = signal.get('key_levels', {})
if key_levels.get('support') or key_levels.get('resistance'):
# 从字符串中提取数字(处理 "66065 (15m布林下轨)" 这种格式)
def extract_number(val):
if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
# 提取第一个数字
match = re.search(r'[\d,]+\.?\d*', val.replace(',', ''))
if match:
return float(match.group())
return None
supports = [extract_number(s) for s in key_levels.get('support', [])[:2]]
resistances = [extract_number(r) for r in key_levels.get('resistance', [])[:2]]
support_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None])
resistance_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None])
logger.info(f" 支撑位: {support_str or '-'}")
logger.info(f" 阻力位: {resistance_str or '-'}")
# 信号列表 - 区分交易信号和观望建议
signals = signal.get('signals', [])
trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']]
wait_signals = [s for s in signals if s.get('action') == 'wait']
if trade_signals:
logger.info(f"\n🎯 【发现 {len(trade_signals)} 个交易信号】")
for i, sig in enumerate(trade_signals, 1):
signal_type = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
action = sig.get('action', 'hold')
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
action_text = action_map.get(action, action)
confidence = sig.get('confidence', 0)
logger.info(f"\n [{i}] {type_text} | {action_text}")
logger.info(f" 信心度: {confidence}%")
logger.info(f" 入场: ${sig.get('entry_price', 'N/A')}")
logger.info(f" 止损: ${sig.get('stop_loss', 'N/A')}")
logger.info(f" 止盈: ${sig.get('take_profit', 'N/A')}")
logger.info(f" 理由: {sig.get('reasoning', 'N/A')}")
if wait_signals:
logger.info(f"\n📋 【{len(wait_signals)} 个观望建议(不触发交易)】")
for i, sig in enumerate(wait_signals, 1):
signal_type = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
confidence = sig.get('confidence', 0)
logger.info(f"\n [{i}] {type_text} | 观望")
logger.info(f" 信心度: {confidence}%")
logger.info(f" 理由: {sig.get('reasoning', 'N/A')}")
def _log_trading_decision(self, decision: Dict[str, Any]):
"""输出交易决策结果"""
decision_type = decision.get('decision', 'HOLD')
decision_map = {
'OPEN': '🟢 开仓',
'CLOSE': '🔴 平仓',
'ADD': ' 加仓',
'REDUCE': ' 减仓',
'HOLD': '⏸️ 观望'
}
logger.info(f" 决策: {decision_map.get(decision_type, decision_type)}")
logger.info(f" 动作: {decision.get('action', 'N/A')}")
logger.info(f" 仓位: {decision.get('position_size', 'N/A')}")
# quantity 是保证金,显示持仓价值 = 保证金 × 杠杆
quantity = decision.get('quantity', 0)
if isinstance(quantity, (int, float)) and quantity > 0:
leverage = self.paper_trading.leverage # 使用实际的杠杆配置
position_value = quantity * leverage
logger.info(f" 持仓价值: ${position_value:,.2f} (保证金 ${quantity:.2f})")
else:
logger.info(f" 数量: ${decision.get('quantity', 'N/A')}")
if decision.get('stop_loss'):
logger.info(f" 止损: ${decision.get('stop_loss')}")
if decision.get('take_profit'):
logger.info(f" 止盈: ${decision.get('take_profit')}")
logger.info(f" 理由: {decision.get('reasoning', 'N/A')}")
risk = decision.get('risk_analysis', '')
if risk:
logger.info(f" 风险: {risk}")
def _get_paper_trading_state(self) -> tuple:
"""
获取模拟盘交易状态(持仓和账户)
Returns:
(positions, account, pending_orders) - 持仓列表、账户状态、挂单列表
"""
# 模拟交易
active_orders = self.paper_trading.get_active_orders()
account = self.paper_trading.get_account_status()
# 分离持仓和挂单
position_list = []
pending_orders = []
for order in active_orders:
if order.get('status') == 'open' and order.get('filled_price'):
# 已成交的订单作为持仓
position_list.append({
'symbol': order.get('symbol'),
'side': order.get('side'),
'holding': order.get('quantity', 0),
'entry_price': order.get('filled_price') or order.get('entry_price'),
'stop_loss': order.get('stop_loss'),
'take_profit': order.get('take_profit')
})
elif order.get('status') == 'pending':
# 未成交的订单作为挂单
pending_orders.append({
'order_id': order.get('order_id'),
'symbol': order.get('symbol'),
'side': order.get('side'),
'entry_price': order.get('entry_price'),
'quantity': order.get('quantity', 0),
'entry_type': order.get('entry_type', 'market'),
'confidence': order.get('confidence', 0)
})
return position_list, account, pending_orders
def _get_hyperliquid_trading_state(self) -> tuple:
"""
获取 Hyperliquid 实盘交易状态(持仓和账户)
Returns:
(positions, account, pending_orders) - 持仓列表、账户状态、挂单列表
"""
try:
# 获取账户状态
hl_state = self.hyperliquid.get_account_state()
# 转换持仓格式
position_list = []
for pos in hl_state["positions"]:
position_data = pos.get("position", {})
coin = position_data.get("coin")
size = float(position_data.get("szi", 0))
if size != 0:
entry_price = float(position_data.get("entryPx", 0))
unrealized_pnl = float(position_data.get("unrealizedPnl", 0))
# 获取止盈止损价格(从挂单中查询)
tp_sl_prices = self.hyperliquid.get_tp_sl_prices(coin)
position_list.append({
'symbol': f"{coin}USDT", # BTC → BTCUSDT
'side': 'buy' if size > 0 else 'sell',
'holding': abs(size),
'entry_price': entry_price,
'unrealized_pnl': unrealized_pnl,
'stop_loss': tp_sl_prices.get('stop_loss'),
'take_profit': tp_sl_prices.get('take_profit')
})
# 转换账户格式(匹配模拟盘格式)
account = {
'current_balance': hl_state["account_value"],
'initial_balance': self.hyperliquid.initial_balance,
'used_margin': hl_state["total_margin_used"],
'available_balance': hl_state["available_balance"],
'total_position_value': sum(abs(float(p.get("position", {}).get("szi", 0)) *
float(p.get("position", {}).get("entryPx", 0)))
for p in hl_state["positions"]),
'max_total_leverage': self.hyperliquid.max_total_leverage
}
# 计算当前总杠杆
if account['current_balance'] > 0:
account['current_total_leverage'] = account['total_position_value'] / account['current_balance']
else:
account['current_total_leverage'] = 0
# 获取挂单(包括止盈止损)
all_orders = self.hyperliquid.get_open_orders()
pending_orders = []
for order in all_orders:
pending_orders.append({
'order_id': order.get('order_id'),
'symbol': f"{order['symbol']}USDT", # 转换格式
'side': 'buy' if order.get('side') == 'B' else 'sell',
'entry_price': order.get('price'),
'quantity': order.get('size'),
'entry_type': 'limit',
'is_reduce_only': order.get('is_reduce_only', False)
})
return position_list, account, pending_orders
except Exception as e:
logger.error(f"获取 Hyperliquid 状态失败: {e}")
return [], {}, []
async def _execute_decisions(self, paper_decision: Dict[str, Any],
hyperliquid_decision: Dict[str, Any],
bitget_decision: Dict[str, Any],
market_signal: Dict[str, Any], current_price: float):
"""执行交易决策(三轨独立)"""
# 选择最佳信号用于保存
best_signal = self._get_best_signal_from_market(market_signal)
# 保存信号到数据库(只保存一次)
if best_signal:
signal_to_save = best_signal.copy()
signal_to_save['signal_type'] = 'crypto'
signal_to_save['symbol'] = market_signal.get('symbol')
signal_to_save['current_price'] = current_price
self.signal_db.add_signal(signal_to_save)
# ============================================================
# 执行模拟盘决策
# ============================================================
if paper_decision:
await self._execute_paper_decisions(paper_decision, market_signal, current_price)
# ============================================================
# 执行 Hyperliquid 决策
# ============================================================
if hyperliquid_decision and self.hyperliquid:
await self._execute_hyperliquid_decisions(hyperliquid_decision, market_signal, current_price)
# ============================================================
# 执行 Bitget 决策
# ============================================================
if bitget_decision and self.bitget:
await self._execute_bitget_decisions(bitget_decision, market_signal, current_price)
async def _execute_paper_decisions(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float):
"""执行模拟盘决策"""
decision_type = decision.get('decision', 'HOLD') # 修复:使用 'decision' 字段而不是 'action'
if decision_type == 'HOLD':
reasoning = decision.get('reasoning', '观望')
logger.info(f"\n📊 交易决策: {reasoning}")
# HOLD决策的理由已在交易决策通知中说明无需单独通知
else:
logger.info(f"\n📊 【执行交易】")
if decision_type in ['OPEN', 'ADD']:
# 先执行交易
logger.info(f" 准备执行交易...")
result = await self._execute_paper_trade(decision, market_signal, current_price)
# 检查是否成功执行
order = result.get('order') if result else None
logger.info(f" 订单创建检查: order={'存在' if order else '不存在'}, result_key={'order' in (result or {})}")
if order:
# 验证订单对象的有效性
if hasattr(order, 'order_id') and order.order_id:
logger.info(f" 订单验证通过: {order.order_id}")
# 只有成功创建订单后才发送通知
await self._send_signal_notification(market_signal, decision, current_price)
else:
logger.error(f" ❌ 订单对象无效: 缺少order_id属性")
await self._notify_execution_failure(market_signal, decision, "订单对象无效缺少order_id", prefix="[模拟盘]")
else:
reason = result.get('message', '订单创建失败') if result else '订单创建失败'
logger.warning(f" ⚠️ 交易未执行: {reason}")
await self._notify_execution_failure(market_signal, decision, reason, prefix="[模拟盘]")
elif decision_type == 'CLOSE':
close_success = await self._execute_close(decision, current_price)
# CLOSE 操作也发送执行通知
if close_success:
await self._send_signal_notification(market_signal, decision, current_price)
else:
logger.warning(f" ⚠️ 平仓未成功执行,跳过通知")
elif decision_type == 'CANCEL_PENDING':
cancel_success = await self._execute_cancel_pending(decision)
# CANCEL_PENDING 操作也发送执行通知
if cancel_success:
await self._send_signal_notification(market_signal, decision, current_price)
else:
logger.warning(f" ⚠️ 取消挂单未成功执行,跳过通知")
elif decision_type == 'REDUCE':
reduce_success = await self._execute_reduce(decision)
# REDUCE 操作也发送执行通知
if reduce_success:
await self._send_signal_notification(market_signal, decision, current_price)
else:
logger.warning(f" ⚠️ 减仓未成功执行,跳过通知")
def _get_best_signal_from_market(self, market_signal: Dict[str, Any]) -> Dict[str, Any]:
"""从市场信号中获取最佳信号"""
signals = market_signal.get('signals', [])
if not signals:
return {}
# 按信心度排序,取最高的
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
return sorted_signals[0]
async def _send_market_signal_notification(self, market_signal: Dict[str, Any],
current_price: float):
"""发送市场信号通知(第一阶段)- 调用前已确保有有效信号"""
try:
# 获取配置的阈值
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
# 过滤达到阈值的信号(防御性检查)
signals = market_signal.get('signals', [])
valid_signals = [s for s in signals if s.get('confidence', 0) >= threshold]
if not valid_signals:
return
# 取最佳信号(按信心度排序)
best_signal = sorted(valid_signals, key=lambda x: x.get('confidence', 0), reverse=True)[0]
# 构建通知消息 - 完全匹配旧格式
symbol = market_signal.get('symbol')
market_state = market_signal.get('market_state')
trend = market_signal.get('trend')
# 注意:经过 market_signal_analyzer 解析后
# - 'action' 是 buy/sell/wait (交易方向)
# - 'timeframe' 是 short_term/medium_term/long_term (周期)
sig_action = best_signal.get('action', 'hold') # buy/sell/wait
timeframe = best_signal.get('timeframe', 'unknown') # short_term/medium_term/long_term
confidence = best_signal.get('confidence', 0)
entry_val = best_signal.get('entry_price', 'N/A')
sl_val = best_signal.get('stop_loss', 'N/A')
tp_val = best_signal.get('take_profit', 'N/A')
reasoning = best_signal.get('reasoning', '')
# 格式化价格用于显示(处理 float 和 N/A
def format_price(price_value):
if price_value is None or price_value == 'N/A':
return 'N/A'
if isinstance(price_value, float):
return f"{price_value:,.2f}"
return str(price_value)
entry = format_price(entry_val)
sl = format_price(sl_val)
tp = format_price(tp_val)
# 类型映射
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
timeframe_text = type_map.get(timeframe, timeframe)
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空', 'hold': ' 观望'}
action_text = action_map.get(sig_action, sig_action)
# 入场类型 - 从信号中获取
entry_type = best_signal.get('entry_type', 'market')
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 等级(基于信心度映射)- 与 market_signal_analyzer.py 保持一致
# A级(80-100): 量价配合 + 多指标共振 + 多周期确认
# B级(60-79): 量价配合 + 主要指标确认
# C级(40-59): 有机会但量价不够理想
# D级(<40): 量价背离或信号矛盾
if confidence >= 80:
grade = 'A'
grade_icon = '⭐⭐⭐'
elif confidence >= 60:
grade = 'B'
grade_icon = '⭐⭐'
elif confidence >= 40:
grade = 'C'
grade_icon = ''
else:
grade = 'D'
grade_icon = ''
# 仓位(基于信心度和杠杆空间)- 与新的等级阈值对齐
if confidence >= 80: # A级信号
position_size = 'heavy'
position_icon = '🔥'
position_text = '重仓'
elif confidence >= 60: # B级信号
position_size = 'medium'
position_icon = '📊'
position_text = '中仓'
else: # C级或D级信号
position_size = 'light'
position_icon = '🌱'
position_text = '轻仓'
# 计算止损止盈百分比(价格已经是 float
try:
# 使用当前价格作为入场价(如果 entry_price 是 N/A
entry_for_calc = entry_val if isinstance(entry_val, (int, float)) else current_price
if isinstance(sl_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0:
if sig_action == 'buy':
sl_percent = ((sl_val - entry_for_calc) / entry_for_calc * 100)
else:
sl_percent = ((entry_for_calc - sl_val) / entry_for_calc * 100)
sl_display = f"{sl_percent:+.1f}%"
else:
sl_display = "N/A"
if isinstance(tp_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0:
if sig_action == 'buy':
tp_percent = ((tp_val - entry_for_calc) / entry_for_calc * 100)
else:
tp_percent = ((entry_for_calc - tp_val) / entry_for_calc * 100)
tp_display = f"{tp_percent:+.1f}%"
else:
tp_display = "N/A"
except:
sl_display = "N/A"
tp_display = "N/A"
# 构建卡片标题和颜色 - 添加 [信号] 前缀区分
if sig_action == 'buy':
title = f"[信号] 🟢 {symbol} {timeframe_text}做多信号"
color = "green"
else:
title = f"[信号] 🔴 {symbol} {timeframe_text}做空信号"
color = "red"
# 构建卡片内容
content_parts = [
f"**{timeframe_text}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
f"{entry_type_icon} **入场**: {entry_type_text} | {position_icon} **仓位**: {position_text}",
f"",
]
# 入场价格显示
if entry_type == 'limit':
# 限价订单:显示建议挂单价格和当前价格
content_parts.append(f"📋 **挂单价格**: ${entry}")
content_parts.append(f"📍 **当前价格**: ${format_price(current_price)}")
else:
# 市价订单:显示当前价格作为入场价
content_parts.append(f"💰 **入场价**: ${entry}")
if sl != 'N/A':
content_parts.append(f"🛑 **止损价**: ${sl} ({sl_display})")
if tp != 'N/A':
content_parts.append(f"🎯 **止盈价**: ${tp} ({tp_display})")
content_parts.append(f"")
content_parts.append(f"📝 **分析理由**:")
# HTML转义reasoning避免特殊字符破坏HTML格式
import html
escaped_reasoning = html.escape(reasoning) if reasoning else reasoning
content_parts.append(f"{escaped_reasoning}")
content = "\n".join(content_parts)
# 根据配置发送通知 - [信号] 发送到 crypto webhook
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, color)
if self.settings.telegram_enabled:
# Telegram 使用文本格式
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
# 钉钉使用 ActionCard 格式
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送市场信号通知 (阈值: {threshold}%)")
except Exception as e:
logger.warning(f"发送市场信号通知失败: {e}")
import traceback
logger.debug(traceback.format_exc())
async def _send_trading_decision_notification(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float,
prefix: str = ""):
"""发送交易决策通知(第二阶段)"""
try:
decision_type = decision.get('decision', 'HOLD')
symbol = market_signal.get('symbol')
# 账户类型标识
account_type = f"{prefix} 📊 交易" if prefix else "📊 交易"
# 决策类型映射
decision_map = {
'OPEN': '开仓',
'CLOSE': '平仓',
'ADD': '加仓',
'REDUCE': '减仓',
'CANCEL_PENDING': '取消挂单',
'HOLD': '观望'
}
decision_text = decision_map.get(decision_type, decision_type)
# 根据决策类型设置颜色
color_map = {
'OPEN': 'green',
'ADD': 'green',
'CLOSE': 'orange',
'REDUCE': 'orange',
'CANCEL_PENDING': 'red',
'HOLD': 'gray'
}
color = color_map.get(decision_type, 'blue')
# 构建标题 - 添加 [决策] 前缀区分
title = f"[决策] {account_type} {symbol} 交易决策: {decision_text}"
# 获取最佳信号用于显示
best_signal = self._get_best_signal_from_market(market_signal)
signal_confidence = best_signal.get('confidence', 0) if best_signal else 0
signal_action = best_signal.get('action', '') if best_signal else ''
# 方向图标
if 'buy' in signal_action.lower() or 'long' in signal_action.lower():
action_icon = '🟢'
action_text = '做多'
elif 'sell' in signal_action.lower() or 'short' in signal_action.lower():
action_icon = '🔴'
action_text = '做空'
else:
action_icon = ''
action_text = '观望'
# 构建内容
content_parts = [
f"{action_icon} **市场信号**: {action_text} | 信心度: {signal_confidence}%",
f"",
f"🎯 **交易决策**: {decision_text}",
f"",
]
# 添加决策详情
if decision_type != 'HOLD':
reasoning = decision.get('reasoning', '')
risk_analysis = decision.get('risk_analysis', '')
position_size = decision.get('position_size', 'N/A')
# 仓位图标
position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓', 'micro': '🌱 微仓'}
position_display = position_map.get(position_size, position_size)
# HTML转义避免特殊字符破坏HTML格式
import html
escaped_reasoning = html.escape(reasoning) if reasoning else ''
escaped_risk = html.escape(risk_analysis) if risk_analysis else ''
content_parts.extend([
f"📊 **仓位**: {position_display}",
f"💭 **决策理由**: {escaped_reasoning}",
])
if escaped_risk:
content_parts.append(f"⚠️ **风险**: {escaped_risk}")
# 添加价格信息(如果有)
quantity = decision.get('quantity', 0)
if isinstance(quantity, (int, float)) and quantity > 0:
leverage = self.paper_trading.leverage # 使用实际的杠杆配置
position_value = quantity * leverage
content_parts.append(f"💰 **持仓价值**: ${position_value:,.2f} (保证金 ${quantity:.2f})")
stop_loss = decision.get('stop_loss')
take_profit = decision.get('take_profit')
if stop_loss:
content_parts.append(f"🛑 **止损**: ${stop_loss}")
if take_profit:
content_parts.append(f"🎯 **止盈**: ${take_profit}")
# 取消挂单时显示要取消的订单
if decision_type == 'CANCEL_PENDING':
orders_to_cancel = decision.get('orders_to_cancel', [])
if orders_to_cancel:
content_parts.append(f"🚫 **取消订单**: {len(orders_to_cancel)}")
for order_id in orders_to_cancel[:3]: # 最多显示3个
content_parts.append(f" - {order_id}")
if len(orders_to_cancel) > 3:
content_parts.append(f" - ... 还有 {len(orders_to_cancel) - 3}")
else:
# HOLD 决策
reasoning = decision.get('reasoning', '综合评估后选择观望')
content_parts.append(f"💭 **理由**: {reasoning}")
content_parts.append("")
content_parts.append(f"⏰ 当前价格: ${current_price:,.2f}")
content = "\n".join(content_parts)
# 发送通知 - [决策] 发送到 paper_trading webhooktrading
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, color)
if self.settings.telegram_enabled:
# Telegram 使用文本格式
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送交易决策通知: {decision_text}")
except Exception as e:
logger.warning(f"发送交易决策通知失败: {e}")
import traceback
logger.debug(traceback.format_exc())
async def _send_signal_notification(self, market_signal: Dict[str, Any],
decision: Dict[str, Any], current_price: float,
prefix: str = "", hl_order_status: str = None):
"""发送交易执行通知(第三阶段)
hl_order_status: Hyperliquid 限价单实际状态 'resting'|'filled'|None
"""
try:
decision_type = decision.get('decision', 'HOLD')
# 只在非观望决策时发送执行通知
if decision_type == 'HOLD':
return
# 构建消息 - 使用旧格式风格
symbol = market_signal.get('symbol')
# 添加前缀到标题
title_prefix = f"{prefix} " if prefix else ""
action = decision.get('action', '')
reasoning = decision.get('reasoning', '')
risk_analysis = decision.get('risk_analysis', '')
position_size = decision.get('position_size', 'N/A')
quantity = decision.get('quantity', 'N/A')
stop_loss = decision.get('stop_loss', '')
take_profit = decision.get('take_profit', '')
# confidence 优先从决策本身读取,否则从市场信号的最佳信号读取
confidence = decision.get('confidence')
if confidence is None:
_best = self._get_best_signal_from_market(market_signal)
confidence = _best.get('confidence', 0) if _best else 0
# 决策类型映射
decision_map = {
'OPEN': '开仓',
'CLOSE': '平仓',
'ADD': '加仓',
'REDUCE': '减仓'
}
decision_text = decision_map.get(decision_type, decision_type)
# 账户类型标识
account_type = "📊"
# 方向图标
if 'long' in action.lower() or 'buy' in action.lower():
action_icon = '🟢'
action_text = '做多'
elif 'short' in action.lower() or 'sell' in action.lower():
action_icon = '🔴'
action_text = '做空'
else:
action_icon = ''
action_text = action
# 从市场信号中获取入场方式(需要在构建标题之前)
best_signal = self._get_best_signal_from_market(market_signal)
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
# 对 Hyperliquid 限价单:用实际订单状态决定显示
# resting=真的在挂单中, filled=已立即成交, None=非HL或市价单
if hl_order_status == 'resting':
entry_type_text = '挂单'
entry_type_icon = ''
elif hl_order_status == 'filled':
entry_type_text = '现价成交'
entry_type_icon = ''
else:
entry_type_text = '现价单' if entry_type == 'market' else '挂单'
entry_type_icon = '' if entry_type == 'market' else ''
# 仓位图标
position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓'}
position_display = position_map.get(position_size, '🌱 轻仓')
# 构建卡片标题Hyperliquid 限价单区分实际状态
if decision_type == 'OPEN':
if hl_order_status == 'resting':
decision_title = '挂单中'
elif hl_order_status == 'filled':
decision_title = '开仓(立即成交)'
else:
decision_title = '挂单' if entry_type == 'limit' else '开仓'
title = f"{title_prefix}[执行] {account_type} {symbol} {decision_title}"
color = "green"
elif decision_type == 'CLOSE':
decision_title = '挂单' if entry_type == 'limit' else '平仓'
title = f"{title_prefix}[执行] {account_type} {symbol} {decision_title}"
color = "orange"
elif decision_type == 'ADD':
if hl_order_status == 'resting':
decision_title = '加仓挂单中'
elif hl_order_status == 'filled':
decision_title = '加仓(立即成交)'
else:
decision_title = '挂单' if entry_type == 'limit' else '加仓'
title = f"{title_prefix}[执行] {account_type} {symbol} {decision_title}"
color = "green"
elif decision_type == 'REDUCE':
decision_title = '挂单' if entry_type == 'limit' else '减仓'
title = f"{title_prefix}[执行] {account_type} {symbol} {decision_title}"
color = "orange"
else:
title = f"{title_prefix}[执行] {account_type} {symbol} 交易执行"
color = "blue"
# 构建卡片内容
# quantity 是保证金金额,需要显示持仓价值 = 保证金 × 杠杆
margin = quantity if quantity != 'N/A' else 0
leverage = self.paper_trading.leverage # 使用实际的杠杆配置(而非硬编码 20
position_value = margin * leverage if isinstance(margin, (int, float)) else 'N/A'
position_value_display = f"${position_value:,.2f}" if isinstance(position_value, (int, float)) else "N/A"
# 根据入场方式显示不同的价格信息
if entry_type == 'market':
price_display = f"💵 **入场价**: ${current_price:,.2f} (现价)"
else:
entry_price = best_signal.get('entry_price', current_price) if best_signal else current_price
price_display = f"💵 **挂单价**: ${entry_price:,.2f} (等待)"
content_parts = [
f"{action_icon} **操作**: {decision_text} ({action_text})",
f"{entry_type_icon} **入场方式**: {entry_type_text}",
f"{position_display.replace(' ', ': **')} | 📈 信心度: **{confidence}%**",
f"",
f"💰 **持仓价值**: {position_value_display}",
price_display,
]
if stop_loss:
content_parts.append(f"🛑 **止损价**: ${stop_loss}")
if take_profit:
content_parts.append(f"🎯 **止盈价**: ${take_profit}")
# 简洁的决策理由和风险分析各1句话
content_parts.append(f"")
content_parts.append(f"📝 **决策**: {reasoning}")
if risk_analysis:
content_parts.append(f"⚠️ **风险**: {risk_analysis}")
content_parts.append(f"")
content_parts.append(f"💼 交易已执行")
content = "\n".join(content_parts)
# 根据配置发送通知 - 所有订单执行都发送到 paper trading webhook
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, color)
if self.settings.telegram_enabled:
# Telegram 使用文本格式
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送交易执行通知: {decision_text}")
except Exception as e:
logger.warning(f"发送交易执行通知失败: {e}")
async def _notify_execution_failure(self, market_signal: Dict[str, Any],
decision: Dict[str, Any], reason: str,
prefix: str = ""):
"""发送执行失败通知(决策给出了 OPEN/ADD 但实际未能开仓)"""
try:
symbol = market_signal.get('symbol', '')
decision_type = decision.get('decision', 'OPEN')
action = decision.get('action', '')
title_prefix = f"{prefix} " if prefix else ""
action_text = "做多" if 'buy' in action.lower() else ("做空" if 'sell' in action.lower() else action)
decision_text = {'OPEN': '开仓', 'ADD': '加仓'}.get(decision_type, decision_type)
title = f"{title_prefix}⚠️ {symbol} {decision_text}未执行"
content = "\n".join([
f"🔴 **决策**: {decision_text}{action_text}",
f"❌ **未执行原因**: {reason}",
f"🕐 **时间**: {datetime.now().strftime('%H:%M:%S')}",
])
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "red")
if self.settings.telegram_enabled:
await self.telegram.send_message(f"{title}\n\n{content}")
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送执行失败通知: {reason}")
except Exception as e:
logger.warning(f"发送执行失败通知失败: {e}")
async def _execute_paper_trade(self, decision: Dict[str, Any], market_signal: Dict[str, Any], current_price: float):
"""执行模拟交易"""
try:
symbol = decision.get('symbol')
action = decision.get('action', '')
position_size = decision.get('position_size', 'light')
# 使用新的动态仓位计算方法
logger.info(f" 计算动态仓位: {position_size}")
margin, position_value = self.paper_trading._calculate_dynamic_position(position_size, symbol)
if margin <= 0:
logger.warning(f" ⚠️ 计算的保证金无效: {margin},无法开仓")
return False
quantity = margin # 保证金金额
logger.info(f" 准备创建订单: {symbol} {action} {position_size}")
logger.info(f" 保证金: ${quantity:.2f} | 持仓价值: ${position_value:.2f}")
# 转换决策的 action 为 paper_trading 期望的格式
trading_action = self._convert_trading_action(action)
# 从市场信号中获取入场方式和入场价格
best_signal = self._get_best_signal_from_market(market_signal)
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
entry_price = best_signal.get('entry_price', current_price) if best_signal else current_price
logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}")
# 转换决策为订单格式(价格字段已在 LLM 解析时转换为 float
order_data = {
'symbol': symbol,
'action': trading_action, # 使用转换后的 action
'entry_type': entry_type, # 使用信号中的入场方式
'entry_price': entry_price if entry_type == 'limit' else current_price, # limit单使用entry_pricemarket单使用current_price
'stop_loss': decision.get('stop_loss'),
'take_profit': decision.get('take_profit'),
'confidence': decision.get('confidence', 50),
'signal_grade': 'B', # 默认B级
'position_size': position_size,
'quantity': quantity # 使用计算后的保证金金额
}
logger.debug(f" 订单数据: {order_data}")
logger.info(f" 正在调用 create_order_from_signal...")
result = self.paper_trading.create_order_from_signal(order_data, current_price)
logger.info(f" create_order_from_signal 返回结果: {result}")
# 记录订单
order = result.get('order')
if order:
# quantity 是保证金金额,持仓价值 = 保证金 × 杠杆
leverage = self.paper_trading.leverage # 使用实际的杠杆配置
position_value = quantity * leverage
logger.info(f" ✅ 已创建订单: {order.order_id} | 仓位: {position_size} | 持仓价值: ${position_value:.2f}")
logger.info(f" 订单状态: {order.status.value} | 入场价: ${order.entry_price:,.2f}")
else:
# 订单创建失败,记录详细原因
reason = result.get('message', '未知原因')
cancelled_info = result.get('cancelled_orders', [])
logger.warning(f" ❌ 创建订单失败: {reason}")
if cancelled_info:
logger.warning(f" 已取消的反向订单: {len(cancelled_info)}")
# 返回结果
return result
except Exception as e:
logger.error(f"执行交易失败: {e}")
import traceback
logger.debug(traceback.format_exc())
def _convert_trading_action(self, action: str) -> str:
"""转换交易决策的 action 为 buy/sell 格式"""
if not action:
return 'buy'
action_lower = action.lower()
# 做多相关的 action
if 'long' in action_lower or 'buy' in action_lower:
return 'buy'
# 做空相关的 action
elif 'short' in action_lower or 'sell' in action_lower:
return 'sell'
# 默认返回 buy
return 'buy'
def _calculate_quantity_by_position_size(self, position_size: str, live_trading: bool = False) -> float:
"""根据仓位大小计算实际金额"""
if live_trading:
# 实盘交易配置
position_config = {
'heavy': self.settings.bitget_max_single_position,
'medium': self.settings.bitget_max_single_position * 0.6,
'light': self.settings.bitget_max_single_position * 0.3
}
else:
# 模拟交易配置
position_config = {
'heavy': self.settings.paper_trading_position_a, # 1000
'medium': self.settings.paper_trading_position_b, # 500
'light': self.settings.paper_trading_position_c # 200
}
return position_config.get(position_size, 200)
async def _execute_close(self, decision: Dict[str, Any], current_price: float) -> bool:
"""执行平仓
Args:
decision: 交易决策(应包含 orders_to_close 字段)
current_price: 当前价格
Returns:
是否成功执行平仓
"""
try:
symbol = decision.get('symbol')
orders_to_close = decision.get('orders_to_close', [])
if self.paper_trading:
logger.info(f" 🔒 平仓: {symbol}")
logger.info(f" 理由: {decision.get('reasoning', '')}")
# 如果决策中没有指定订单ID则获取该交易对的所有活跃订单
if not orders_to_close:
logger.warning(f" ⚠️ 决策中未指定 orders_to_close将平仓 {symbol} 的所有持仓")
active_orders = self.paper_trading.get_active_orders(symbol)
orders_to_close = [o.get('order_id') for o in active_orders if o.get('status') in ('OPEN', 'FILLED', 'PENDING')]
if not orders_to_close:
logger.warning(f" 没有找到需要平仓的订单")
return False
logger.info(f" 待平仓订单: {orders_to_close}")
closed_count = 0
for order_id in orders_to_close:
try:
# 先获取订单信息
order_info = self.paper_trading.get_order_by_id(order_id)
if not order_info:
logger.warning(f" ❌ 订单不存在: {order_id}")
continue
status = order_info.get('status')
if status == 'PENDING':
# 取消挂单
result = self.paper_trading.cancel_order(order_id)
if result.get('success'):
logger.info(f" ✅ 已取消挂单: {order_id}")
closed_count += 1
else:
logger.warning(f" ❌ 取消挂单失败: {order_id} - {result.get('message')}")
elif status in ('OPEN', 'FILLED'):
# 平仓已成交订单
result = self.paper_trading.close_order_manual(order_id, current_price)
if result:
logger.info(f" ✅ 已平仓: {order_id} @ ${current_price}")
closed_count += 1
else:
logger.warning(f" ❌ 平仓失败: {order_id}")
else:
logger.warning(f" ⚠️ 订单状态无需处理: {order_id} - {status}")
except Exception as e:
logger.error(f" ❌ 处理订单 {order_id} 失败: {e}")
logger.info(f" 📊 平仓汇总: {closed_count}/{len(orders_to_close)} 个订单已处理")
return closed_count > 0
else:
logger.warning(f" 交易服务未初始化")
return False
except Exception as e:
logger.error(f"执行平仓失败: {e}")
import traceback
logger.error(traceback.format_exc())
return False
async def _execute_cancel_pending(self, decision: Dict[str, Any]) -> bool:
"""执行取消挂单
Args:
decision: 交易决策
Returns:
是否成功取消订单
"""
try:
symbol = decision.get('symbol')
decision_action = decision.get('action', '') # buy/sell
orders_to_cancel = decision.get('orders_to_cancel', [])
if not orders_to_cancel:
logger.info(f" ⚠️ 没有需要取消的订单")
return False
trading_service = self.paper_trading
if not trading_service:
logger.warning(f" 交易服务未启用")
return False
# 安全检查验证要取消的订单是否属于当前symbol且方向相反
active_orders = trading_service.get_active_orders()
valid_orders = []
invalid_orders = []
wrong_direction_orders = []
for order_id in orders_to_cancel:
# 查找订单
order = next((o for o in active_orders if o.get('order_id') == order_id), None)
if not order:
logger.warning(f" ⚠️ 订单不存在: {order_id}")
invalid_orders.append(order_id)
continue
# 检查订单是否属于当前symbol
if order.get('symbol') != symbol:
logger.error(f" ❌ 安全拦截:订单 {order_id} 属于 {order.get('symbol')},不是当前分析标的 {symbol}")
invalid_orders.append(order_id)
continue
# 检查订单方向是否与决策相反
order_side = order.get('side') # long/short
# 决策是buy时应该取消short做空决策是sell时应该取消long做多
should_cancel = False
if decision_action == 'buy' and order_side == 'short':
should_cancel = True
elif decision_action == 'sell' and order_side == 'long':
should_cancel = True
elif decision_action in ['open_long', 'close_short']:
should_cancel = (order_side == 'short')
elif decision_action in ['open_short', 'close_long']:
should_cancel = (order_side == 'long')
if not should_cancel:
logger.error(f" ❌ 安全拦截:订单 {order_id} 方向为 {order_side},与决策 {decision_action} 同向,不应取消!")
wrong_direction_orders.append(order_id)
invalid_orders.append(order_id)
continue
valid_orders.append(order_id)
if invalid_orders:
logger.error(f" 🚫 拒绝取消 {len(invalid_orders)} 个不符合条件的订单")
if wrong_direction_orders:
logger.error(f" ⚠️ {len(wrong_direction_orders)} 个同向订单被拦截(不应取消同向订单)")
if not valid_orders:
logger.warning(f" ⚠️ 没有有效的订单可以取消")
return False
logger.info(f" 🚫 取消挂单: {symbol}")
logger.info(f" 取消订单数量: {len(valid_orders)}")
cancelled_count = 0
for order_id in valid_orders:
try:
# 取消订单
result = trading_service.cancel_order(order_id)
if result.get('success'):
cancelled_count += 1
logger.info(f" ✅ 已取消订单: {order_id}")
else:
logger.warning(f" ⚠️ 取消订单失败: {order_id} | {result.get('message', '')}")
except Exception as e:
logger.error(f" ❌ 取消订单异常: {order_id} | {e}")
logger.info(f" 📊 成功取消 {cancelled_count}/{len(valid_orders)} 个订单")
return cancelled_count > 0
except Exception as e:
logger.error(f"执行取消挂单失败: {e}")
return False
async def _execute_update_pending(self, decision: Dict[str, Any]) -> bool:
"""执行更新挂单参数
Args:
decision: 交易决策
Returns:
是否成功更新订单
"""
try:
symbol = decision.get('symbol')
decision_action = decision.get('action', '') # buy/sell
orders_to_update = decision.get('orders_to_update', [])
# 获取新参数
new_entry_price = decision.get('entry_price')
new_stop_loss = decision.get('stop_loss')
new_take_profit = decision.get('take_profit')
if not orders_to_update:
logger.info(f" ⚠️ 没有需要更新的订单")
return False
if not all([new_entry_price, new_stop_loss, new_take_profit]):
logger.warning(f" ⚠️ 更新参数不完整: entry_price={new_entry_price}, stop_loss={new_stop_loss}, take_profit={new_take_profit}")
return False
trading_service = self.paper_trading
if not trading_service:
logger.warning(f" 交易服务未启用")
return False
# 安全检查验证要更新的订单是否属于当前symbol且方向相同
active_orders = trading_service.get_active_orders()
valid_orders = []
invalid_orders = []
wrong_direction_orders = []
for order_id in orders_to_update:
# 查找订单
order = next((o for o in active_orders if o.order_id == order_id), None)
if not order:
logger.warning(f" ⚠️ 订单不存在: {order_id}")
invalid_orders.append(order_id)
continue
# 检查订单是否已成交(只能更新未成交的挂单)
if order.filled_price and order.filled_price > 0:
logger.warning(f" ⚠️ 订单已成交,无法更新: {order_id}")
invalid_orders.append(order_id)
continue
# 检查订单是否属于当前symbol
if order.symbol != symbol:
logger.error(f" ❌ 安全拦截:订单 {order_id} 属于 {order.symbol},不是当前分析标的 {symbol}")
invalid_orders.append(order_id)
continue
# 检查订单方向是否与决策相同
order_side = order.side.value # LONG/SHORT
# 决策是buy时应该更新LONG做多决策是sell时应该更新SHORT做空
should_update = False
if decision_action == 'buy' and order_side == 'LONG':
should_update = True
elif decision_action == 'sell' and order_side == 'SHORT':
should_update = True
if not should_update:
logger.error(f" ❌ 安全拦截:订单 {order_id} 方向为 {order_side},与决策 {decision_action} 方向不一致")
wrong_direction_orders.append(order_id)
invalid_orders.append(order_id)
continue
valid_orders.append(order)
if invalid_orders:
logger.error(f" 🚫 拒绝更新 {len(invalid_orders)} 个不符合条件的订单")
if wrong_direction_orders:
logger.error(f" ⚠️ {len(wrong_direction_orders)} 个方向不一致的订单被拦截")
if not valid_orders:
logger.warning(f" ⚠️ 没有有效的订单可以更新")
return False
logger.info(f" 🔄 更新挂单: {symbol}")
logger.info(f" 新参数: 入场=${new_entry_price:,.2f}, 止损=${new_stop_loss:,.2f}, 止盈=${new_take_profit:,.2f}")
updated_count = 0
for order in valid_orders:
try:
# 更新订单参数
result = trading_service.update_order(
order.order_id,
entry_price=new_entry_price,
stop_loss=new_stop_loss,
take_profit=new_take_profit
)
if result and result.get('success'):
updated_count += 1
logger.info(f" ✅ 订单更新成功: {order.order_id}")
logger.info(f" 旧参数: 入场=${order.entry_price:,.2f}, 止损=${order.stop_loss:,.2f}, 止盈=${order.take_profit:,.2f}")
else:
logger.warning(f" ⚠️ 更新订单失败: {order.order_id} | {result.get('message', '')}")
except Exception as e:
logger.error(f" ❌ 更新订单异常: {order.order_id} | {e}")
logger.info(f" 📊 成功更新 {updated_count}/{len(valid_orders)} 个订单")
return updated_count > 0
except Exception as e:
logger.error(f"执行更新挂单失败: {e}")
return False
async def _execute_reduce(self, decision: Dict[str, Any]) -> bool:
"""执行减仓
Args:
decision: 交易决策
Returns:
是否成功执行减仓
"""
try:
symbol = decision.get('symbol')
logger.info(f" 📤 减仓: {symbol}")
logger.info(f" 理由: {decision.get('reasoning', '')}")
trading_service = self.paper_trading
if not trading_service:
logger.warning(f" 交易服务未初始化")
return False
# TODO: 实现减仓逻辑
# 减仓可以是部分平仓,需要根据决策中的参数执行
logger.info(f" ⚠️ 减仓功能待实现")
return False
except Exception as e:
logger.error(f"执行减仓失败: {e}")
return False
# ============================================================
# Hyperliquid 执行方法
# ============================================================
async def _notify_hyperliquid_error(self, symbol: str, operation: str, error: str):
"""发送 Hyperliquid 操作失败的飞书/钉钉/Telegram 通知"""
title = f"❌ Hyperliquid 操作失败 - {symbol}"
content = "\n".join([
f"🔴 **操作**: {operation}",
f"⚠️ **错误**: {error}",
f"🕐 **时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
])
logger.error(f"[Hyperliquid] {operation} 失败 | {symbol} | {error}")
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "red")
if self.settings.telegram_enabled:
await self.telegram.send_message(f"{title}\n\n{content}")
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
async def _notify_bitget_error(self, symbol: str, operation: str, error: str):
"""发送 Bitget 操作失败的飞书/钉钉/Telegram 通知"""
title = f"❌ Bitget 操作失败 - {symbol}"
content = "\n".join([
f"🔴 **操作**: {operation}",
f"⚠️ **错误**: {error}",
f"🕐 **时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
])
logger.error(f"[Bitget] {operation} 失败 | {symbol} | {error}")
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "red")
if self.settings.telegram_enabled:
await self.telegram.send_message(f"{title}\n\n{content}")
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
def _get_bitget_trading_state(self) -> tuple:
"""
获取 Bitget 实盘交易状态(持仓和账户)
Returns:
(positions, account, pending_orders)
"""
try:
bg_state = self.bitget.get_account_state()
position_list = []
for pos in self.bitget.get_open_positions():
coin = pos["coin"]
size = pos["size"]
if size != 0:
tp_sl = self.bitget.get_tp_sl_prices(coin)
position_list.append({
'symbol': f"{coin}USDT",
'side': 'buy' if size > 0 else 'sell',
'holding': abs(size),
'entry_price': pos["entry_price"],
'unrealized_pnl': pos["unrealized_pnl"],
'stop_loss': tp_sl.get('stop_loss'),
'take_profit': tp_sl.get('take_profit'),
})
total_position_value = sum(
p['holding'] * p['entry_price'] for p in position_list
)
account = {
'current_balance': bg_state["account_value"],
'initial_balance': self.bitget.initial_balance,
'used_margin': bg_state["total_margin_used"],
'available_balance': bg_state["available_balance"],
'total_position_value': total_position_value,
'max_total_leverage': self.bitget.max_total_leverage,
}
if account['current_balance'] > 0:
account['current_total_leverage'] = total_position_value / account['current_balance']
else:
account['current_total_leverage'] = 0
all_orders = self.bitget.get_open_orders()
pending_orders = []
for order in all_orders:
pending_orders.append({
'order_id': order.get('order_id'),
'symbol': f"{order['symbol']}USDT",
'side': order.get('side', ''),
'entry_price': order.get('price'),
'quantity': order.get('size'),
'entry_type': 'limit',
'is_reduce_only': order.get('is_reduce_only', False),
})
return position_list, account, pending_orders
except Exception as e:
logger.error(f"获取 Bitget 状态失败: {e}")
return [], {}, []
async def _execute_bitget_decisions(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float):
"""执行 Bitget 决策"""
decision_type = decision.get('decision', 'HOLD')
symbol = decision.get('symbol', 'UNKNOWN')
if decision_type == 'HOLD':
reasoning = decision.get('reasoning', '观望')
logger.info(f" Bitget 决策: {reasoning}")
return
try:
if decision_type in ['OPEN', 'ADD']:
logger.info(f" 准备执行 Bitget 交易...")
result = await self._execute_bitget_trade(decision, market_signal, current_price)
if result.get('success'):
logger.info(f" ✅ Bitget 交易成功")
order_status = result.get('verified_order_status', 'filled')
await self._send_signal_notification(market_signal, decision, current_price,
prefix="[Bitget]",
hl_order_status=order_status)
if result.get('tp_sl_warning'):
await self._notify_bitget_error(symbol, "设置止盈止损", result['tp_sl_warning'])
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Bitget 交易失败: {error}")
await self._notify_bitget_error(symbol, decision_type, error)
elif decision_type == 'CLOSE':
logger.info(f" 准备 Bitget 平仓...")
result = await self._execute_bitget_close(decision, current_price)
if result.get('success'):
logger.info(f" ✅ Bitget 平仓成功")
await self._send_signal_notification(market_signal, decision, current_price, prefix="[Bitget]")
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Bitget 平仓失败: {error}")
await self._notify_bitget_error(symbol, "平仓", error)
elif decision_type == 'CANCEL_PENDING':
logger.info(f" 准备取消 Bitget 挂单...")
result = await self._execute_bitget_cancel(decision)
if result.get('success'):
logger.info(f" ✅ Bitget 取消成功")
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Bitget 取消失败: {error}")
await self._notify_bitget_error(symbol, "取消挂单", error)
except Exception as e:
logger.error(f" ❌ Bitget 执行异常: {e}")
await self._notify_bitget_error(symbol, decision_type, str(e))
async def _execute_bitget_trade(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行 Bitget 开仓/加仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
action = decision.get('action', '') # buy/sell
entry_type = decision.get('entry_type', 'market')
entry_price = decision.get('entry_price', current_price)
is_buy = (action == 'buy') # 修复:用 action 字段判断方向
# 如果是加仓,先取消旧的止盈止损单
if decision.get('decision') == 'ADD':
self.bitget.cancel_tp_sl_orders(symbol)
# 计算合约张数
contracts = self._calculate_bitget_position_size(decision, current_price)
if contracts < 1:
return {"success": False, "error": f"仓位计算结果 {contracts} 张,低于最小下单量 1 张"}
# 设置杠杆
leverage = min(decision.get('leverage', 5), 10)
self.bitget.update_leverage(symbol, leverage)
# 下单
if entry_type == 'market':
result = self.bitget.place_market_order(symbol, is_buy=is_buy, size=contracts)
else:
result = self.bitget.place_limit_order(symbol, is_buy=is_buy, size=contracts, price=entry_price)
if not result.get('success'):
return result
order_status = result.get('order_status', 'filled')
# 限价挂单中时验证订单是否真实存在
if entry_type == 'limit' and order_status == 'resting':
order_id = result.get('order_id', '')
open_orders = self.bitget.get_open_orders(symbol)
ids = [str(o.get('order_id', '')) for o in open_orders]
if order_id and order_id not in ids:
logger.warning(f"[Bitget] 挂单 {order_id} 未在挂单列表中,可能已被静默拒绝")
order_status = 'unknown'
result['verified_order_status'] = order_status
tp_price = decision.get('take_profit')
sl_price = decision.get('stop_loss')
if tp_price or sl_price:
if order_status != 'resting':
# 已成交:直接设置止盈止损
tp_sl_result = self.bitget.set_tp_sl(
symbol=symbol,
is_long=is_buy,
size=contracts,
tp_price=tp_price,
sl_price=sl_price,
)
if not tp_sl_result.get('success'):
result['tp_sl_warning'] = tp_sl_result.get('error', 'TP/SL 设置失败')
else:
# 挂单中:记录下来,等下次循环检测成交后补设
order_id = str(result.get('order_id', ''))
if order_id:
self._bg_pending_tp_sl[order_id] = {
'symbol': symbol,
'is_long': is_buy,
'contracts': contracts,
'tp_price': tp_price,
'sl_price': sl_price,
}
logger.info(f" 📌 [Bitget] 挂单 TP/SL 已记录 (oid={order_id}),等成交后补设")
return result
except Exception as e:
logger.error(f"Bitget 开仓失败: {e}")
return {"success": False, "error": str(e)}
async def _execute_bitget_close(self, decision: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行 Bitget 市价平仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
# 清理该 symbol 的挂单 TP/SL 追踪记录
self._bg_pending_tp_sl = {k: v for k, v in self._bg_pending_tp_sl.items() if v['symbol'] != symbol}
self.bitget.cancel_tp_sl_orders(symbol)
logger.info(f" 取消 Bitget 止盈止损订单")
position = self.bitget.get_position_for_symbol(symbol)
if not position:
return {"success": False, "error": "未找到持仓"}
size_in_coins = abs(position["size"])
is_long = position["size"] > 0
contracts = self.bitget.coins_to_contracts(symbol, size_in_coins)
if contracts < 1:
return {"success": False, "error": f"持仓过小,无法下单({size_in_coins} 币 = {contracts} 张)"}
result = self.bitget.place_market_order(
symbol=symbol,
is_buy=not is_long,
size=contracts,
reduce_only=True
)
return result
except Exception as e:
logger.error(f"Bitget 平仓失败: {e}")
return {"success": False, "error": str(e)}
async def _execute_bitget_cancel(self, decision: Dict[str, Any]) -> Dict[str, Any]:
"""执行 Bitget 取消挂单"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
# 清理该 symbol 的挂单 TP/SL 追踪记录
self._bg_pending_tp_sl = {k: v for k, v in self._bg_pending_tp_sl.items() if v['symbol'] != symbol}
result = self.bitget.cancel_all_orders(symbol)
return result
except Exception as e:
logger.error(f"Bitget 取消挂单失败: {e}")
return {"success": False, "error": str(e)}
def _calculate_bitget_position_size(self, decision: Dict[str, Any], current_price: float) -> int:
"""
计算 Bitget 仓位大小(整数合约张数)
Returns:
可开仓合约数整数张0 表示不可开仓
"""
try:
account_state = self.bitget.get_account_state()
current_balance = account_state["account_value"]
available_balance = account_state["available_balance"]
total_position_value = sum(
abs(p["size"]) * p["entry_price"]
for p in self.bitget.get_open_positions()
)
leverage = min(decision.get('leverage', 5), 10)
max_by_config = self.bitget.max_single_position
max_by_available = available_balance * leverage
max_by_total_leverage = (
current_balance * self.bitget.max_total_leverage - total_position_value
)
max_position_usd = min(max_by_config, max_by_available, max_by_total_leverage)
max_position_usd = min(max_position_usd, current_balance * 0.5)
if max_position_usd <= 0:
logger.warning(f"⚠️ Bitget 可用保证金不足,无法开仓 (balance={current_balance:.2f})")
return 0
symbol = decision.get('symbol', '').replace('USDT', '')
contract_size = self.bitget.get_contract_size(symbol)
if contract_size <= 0 or current_price <= 0:
return 0
# notional → coins → contracts向下取整
coin_amount = max_position_usd / current_price
contracts = math.floor(coin_amount / contract_size)
if contracts < 1:
logger.warning(
f"⚠️ Bitget 仓位计算 {coin_amount:.4f} 币 = {contracts} 张,低于最小 1 张"
)
return 0
logger.info(
f"💰 Bitget 仓位: 最大{max_position_usd:.0f}USD → {coin_amount:.4f}{symbol} "
f"{contracts}张 (合约面值={contract_size}) @ ${current_price:.2f}"
)
return contracts
except Exception as e:
logger.error(f"Bitget 计算仓位大小失败: {e}")
return 0
async def _execute_hyperliquid_decisions(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float):
"""执行 Hyperliquid 决策"""
decision_type = decision.get('decision', 'HOLD') # 修复:使用 'decision' 字段而不是 'action'
symbol = decision.get('symbol', 'UNKNOWN')
if decision_type == 'HOLD':
reasoning = decision.get('reasoning', '观望')
logger.info(f" Hyperliquid 决策: {reasoning}")
return
try:
if decision_type in ['OPEN', 'ADD']:
logger.info(f" 准备执行 Hyperliquid 交易...")
result = await self._execute_hyperliquid_trade(decision, market_signal, current_price)
if result.get('success'):
logger.info(f" ✅ Hyperliquid 交易成功")
# 根据实际订单状态决定通知文案resting=真挂单filled=已成交
order_status = result.get('verified_order_status', 'filled')
await self._send_signal_notification(market_signal, decision, current_price,
prefix="[Hyperliquid]",
hl_order_status=order_status)
# 止盈止损设置失败时单独告警
if result.get('tp_sl_warning'):
await self._notify_hyperliquid_error(symbol, "设置止盈止损", result['tp_sl_warning'])
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Hyperliquid 交易失败: {error}")
await self._notify_hyperliquid_error(symbol, decision_type, error)
elif decision_type == 'CLOSE':
logger.info(f" 准备 Hyperliquid 平仓...")
result = await self._execute_hyperliquid_close(decision, current_price)
if result.get('success'):
logger.info(f" ✅ Hyperliquid 平仓成功")
await self._send_signal_notification(market_signal, decision, current_price, prefix="[Hyperliquid]")
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Hyperliquid 平仓失败: {error}")
await self._notify_hyperliquid_error(symbol, "平仓", error)
elif decision_type == 'CANCEL_PENDING':
logger.info(f" 准备取消 Hyperliquid 挂单...")
result = await self._execute_hyperliquid_cancel(decision)
if result.get('success'):
logger.info(f" ✅ Hyperliquid 取消成功")
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Hyperliquid 取消失败: {error}")
await self._notify_hyperliquid_error(symbol, "取消挂单", error)
except Exception as e:
logger.error(f" ❌ Hyperliquid 执行异常: {e}")
await self._notify_hyperliquid_error(symbol, decision_type, str(e))
async def _execute_hyperliquid_trade(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行 Hyperliquid 开仓/加仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '') # BTCUSDT → BTC
action = decision.get('action', '') # buy/sell
entry_type = decision.get('entry_type', 'market')
entry_price = decision.get('entry_price', current_price)
is_buy = (action == 'buy') # 修复:用 action 字段判断方向
# 计算仓位大小(基于可用保证金和风控)
size = self._calculate_hyperliquid_position_size(decision, current_price)
# 检查保证金是否充足
if size <= 0:
return {"success": False, "error": "保证金不足,无法开仓"}
# 更新杠杆
leverage = min(decision.get('leverage', 10), 10)
self.hyperliquid.update_leverage(symbol, leverage)
# 如果是加仓,先取消旧的止盈止损
if decision.get('action') == 'ADD':
self.hyperliquid.cancel_tp_sl_orders(symbol)
logger.info(f" 取消旧的止盈止损订单")
# 执行交易
if entry_type == 'market':
result = self.hyperliquid.place_market_order(
symbol=symbol,
is_buy=is_buy,
size=size
)
else: # limit
result = self.hyperliquid.place_limit_order(
symbol=symbol,
is_buy=is_buy,
size=size,
price=entry_price
)
# 如果开仓成功,处理止盈止损 + 验证订单实际状态
if result.get('success'):
order_status = result.get('order_status', 'filled') # market单默认filled
# 限价单如果立即成交filled验证持仓是否存在
if entry_type == 'limit' and order_status == 'filled':
position = self.hyperliquid.get_position_for_symbol(symbol)
if position:
logger.info(f" ✅ 限价单立即成交,持仓确认: {symbol} size={position['size']}")
else:
logger.warning(f" ⚠️ 限价单显示 filled 但未查到持仓,可能已被平仓或数据延迟")
# 限价单如果仍在挂单中resting验证订单是否在挂单列表
elif entry_type == 'limit' and order_status == 'resting':
order_id = result.get('order_id')
open_orders = self.hyperliquid.get_open_orders(symbol)
if any(o.get('order_id') == order_id for o in open_orders):
logger.info(f" ✅ 限价单已挂出并确认可见: oid={order_id}")
else:
logger.warning(f" ⚠️ 限价单 oid={order_id} 未在挂单列表中查到(可能已成交或延迟)")
# 将实际状态写回 result供通知层使用
result['verified_order_status'] = order_status
tp_price = decision.get('take_profit')
sl_price = decision.get('stop_loss')
if tp_price or sl_price:
# 只有已成交的订单才设置止盈止损(挂单中的不设,等成交后再设)
if order_status != 'resting':
tp_sl_result = self.hyperliquid.set_tp_sl(
symbol=symbol,
is_long=is_buy,
size=size,
tp_price=tp_price,
sl_price=sl_price
)
if not tp_sl_result.get('success'):
logger.warning(f" ⚠️ 设置止盈止损失败: {tp_sl_result.get('error')}")
result['tp_sl_warning'] = tp_sl_result.get('error', '设置止盈止损失败')
else:
# 挂单中:记录下来,等下次循环检测成交后补设
order_id = str(result.get('order_id', ''))
if order_id:
self._hl_pending_tp_sl[order_id] = {
'symbol': symbol,
'is_long': is_buy,
'size': size,
'tp_price': tp_price,
'sl_price': sl_price,
}
logger.info(f" 📌 [Hyperliquid] 挂单 TP/SL 已记录 (oid={order_id}),等成交后补设")
return result
except Exception as e:
logger.error(f"Hyperliquid 交易执行失败: {e}")
return {"success": False, "error": str(e)}
async def _execute_hyperliquid_close(self, decision: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行 Hyperliquid 平仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
# 清理该 symbol 的挂单 TP/SL 追踪记录
self._hl_pending_tp_sl = {k: v for k, v in self._hl_pending_tp_sl.items() if v['symbol'] != symbol}
# 先取消所有止盈止损订单
self.hyperliquid.cancel_tp_sl_orders(symbol)
logger.info(f" 取消止盈止损订单")
# 获取当前持仓
position = self.hyperliquid.get_position_for_symbol(symbol)
if not position:
return {"success": False, "error": "未找到持仓"}
size = abs(position["size"])
is_long = position["size"] > 0
# 平仓(方向相反)
result = self.hyperliquid.place_market_order(
symbol=symbol,
is_buy=not is_long,
size=size,
reduce_only=True
)
return result
except Exception as e:
logger.error(f"Hyperliquid 平仓失败: {e}")
return {"success": False, "error": str(e)}
async def _execute_hyperliquid_cancel(self, decision: Dict[str, Any]) -> Dict[str, Any]:
"""执行 Hyperliquid 取消挂单"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
# 清理该 symbol 的挂单 TP/SL 追踪记录
self._hl_pending_tp_sl = {k: v for k, v in self._hl_pending_tp_sl.items() if v['symbol'] != symbol}
result = self.hyperliquid.cancel_all_orders(symbol)
return result
except Exception as e:
logger.error(f"Hyperliquid 取消挂单失败: {e}")
return {"success": False, "error": str(e)}
async def _check_and_set_pending_tp_sl_hyperliquid(self):
"""检查 Hyperliquid 挂单是否已成交,若成交则补设止盈止损"""
if not self._hl_pending_tp_sl:
return
try:
for order_id, info in list(self._hl_pending_tp_sl.items()):
symbol = info['symbol']
open_orders = self.hyperliquid.get_open_orders(symbol)
still_open = any(str(o.get('order_id')) == order_id for o in open_orders)
if not still_open:
# 订单已不在挂单列表 → 已成交,补设 TP/SL
tp_price = info.get('tp_price')
sl_price = info.get('sl_price')
logger.info(f"[Hyperliquid] 挂单 {order_id} ({symbol}) 已成交,补设 TP/SL...")
tp_sl_result = self.hyperliquid.set_tp_sl(
symbol=symbol,
is_long=info['is_long'],
size=info['size'],
tp_price=tp_price,
sl_price=sl_price,
)
if tp_sl_result.get('success'):
logger.info(f"[Hyperliquid] ✅ TP/SL 补设成功: {symbol} TP={tp_price} SL={sl_price}")
else:
logger.warning(f"[Hyperliquid] ⚠️ TP/SL 补设失败: {tp_sl_result.get('error')}")
del self._hl_pending_tp_sl[order_id]
except Exception as e:
logger.error(f"[Hyperliquid] 检查挂单 TP/SL 补设异常: {e}")
async def _check_and_set_pending_tp_sl_bitget(self):
"""检查 Bitget 挂单是否已成交,若成交则补设止盈止损"""
if not self._bg_pending_tp_sl:
return
try:
for order_id, info in list(self._bg_pending_tp_sl.items()):
symbol = info['symbol']
open_orders = self.bitget.get_open_orders(symbol)
still_open = any(str(o.get('order_id')) == order_id for o in open_orders)
if not still_open:
# 订单已不在挂单列表 → 已成交,补设 TP/SL
tp_price = info.get('tp_price')
sl_price = info.get('sl_price')
logger.info(f"[Bitget] 挂单 {order_id} ({symbol}) 已成交,补设 TP/SL...")
tp_sl_result = self.bitget.set_tp_sl(
symbol=symbol,
is_long=info['is_long'],
size=info['contracts'],
tp_price=tp_price,
sl_price=sl_price,
)
if tp_sl_result.get('success'):
logger.info(f"[Bitget] ✅ TP/SL 补设成功: {symbol} TP={tp_price} SL={sl_price}")
else:
logger.warning(f"[Bitget] ⚠️ TP/SL 补设失败: {tp_sl_result.get('error')}")
del self._bg_pending_tp_sl[order_id]
except Exception as e:
logger.error(f"[Bitget] 检查挂单 TP/SL 补设异常: {e}")
def _calculate_hyperliquid_position_size(self, decision: Dict[str, Any], current_price: float) -> float:
"""
计算 Hyperliquid 仓位大小(基于可用保证金和风控限制)
Args:
decision: 交易决策
current_price: 当前价格
Returns:
可开仓数量(币的数量,如 BTC = 0.01
"""
try:
# 获取账户状态
account_state = self.hyperliquid.get_account_state()
current_balance = account_state["account_value"]
used_margin = account_state["total_margin_used"]
available_balance = account_state["available_balance"]
# 获取当前所有持仓的总价值
total_position_value = 0
positions = self.hyperliquid.get_open_positions()
for pos in positions:
size = abs(pos["size"])
entry_price = pos["entry_price"]
total_position_value += size * entry_price
# 当前总杠杆
current_total_leverage = total_position_value / current_balance if current_balance > 0 else 0
# 获取杠杆配置
leverage = min(decision.get('leverage', 5), 10) # 最大 10x
# 计算最大可开仓金额(考虑多个限制)
max_by_config = self.hyperliquid.max_single_position # 配置的单笔限制
max_by_available = available_balance * leverage # 可用保证金 × 杠杆
max_by_total_leverage = (current_balance * self.hyperliquid.max_total_leverage - total_position_value) # 总杠杆限制
# 取最小值作为最大可开仓金额
max_position_usd = min(max_by_config, max_by_available, max_by_total_leverage)
# 风控检查:不能超过可用余额的 50%(保守策略)
max_position_usd = min(max_position_usd, current_balance * 0.5)
# 如果计算出的最大值 <= 0说明保证金不足
if max_position_usd <= 0:
logger.warning(f"⚠️ 可用保证金不足,无法开仓")
logger.warning(f" 账户价值: ${current_balance:.2f}")
logger.warning(f" 可用余额: ${available_balance:.2f}")
logger.warning(f" 总持仓价值: ${total_position_value:.2f}")
logger.warning(f" 当前总杠杆: {current_total_leverage:.2f}x")
return 0
# 根据当前价格计算数量
size = max_position_usd / current_price
# 按 Hyperliquid 要求的精度截断szDecimals
# ETH=3位, BTC=5位 等,必须截断而非四舍五入,避免超出允许仓位
sz_decimals = self.hyperliquid.get_sz_decimals(decision.get('symbol', '').replace('USDT', ''))
factor = 10 ** sz_decimals
size = math.floor(size * factor) / factor # 截断,不四舍五入
# 最小下单量检查
min_size = 1 / factor
if size < min_size:
logger.warning(f"⚠️ 计算仓位 {size} 低于最小下单量 {min_size},取消开仓")
return 0
logger.info(f"💰 仓位计算:")
logger.info(f" 账户价值: ${current_balance:.2f}")
logger.info(f" 可用余额: ${available_balance:.2f}")
logger.info(f" 总持仓价值: ${total_position_value:.2f}")
logger.info(f" 当前总杠杆: {current_total_leverage:.2f}x")
logger.info(f" 计划杠杆: {leverage}x")
logger.info(f" 最大可开仓金额: ${max_position_usd:.2f} (限制: min(配置${max_by_config:.0f}, 可用${max_by_available:.0f}, 杠杆${max_by_total_leverage:.0f}))")
logger.info(f" 计算数量: {size} (精度: {sz_decimals}位) @ ${current_price:.2f}")
return size
except Exception as e:
logger.error(f"计算仓位大小失败: {e}")
# 发生错误时返回 0不开仓
return 0
def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""转换 LLM 信号格式为模拟交易格式"""
signal_type = signal.get('type', 'medium_term')
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
# 获取入场类型和入场价
entry_type = signal.get('entry_type', 'market')
entry_price = signal.get('entry_price', current_price)
return {
'symbol': symbol,
'action': signal.get('action', 'hold'),
'entry_type': entry_type, # market 或 limit
'entry_price': entry_price, # 入场价(挂单价格)
'price': current_price, # 当前价格
'stop_loss': signal.get('stop_loss', 0),
'take_profit': signal.get('take_profit', 0),
'confidence': signal.get('confidence', 0),
'signal_grade': signal.get('grade', 'D'),
'signal_type': type_map.get(signal_type, 'swing'),
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
'reasons': [signal.get('reason', '')],
'timestamp': datetime.now()
}
def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
"""计算24小时价格变化"""
if len(h1_data) < 24:
return "N/A"
price_now = h1_data.iloc[-1]['close']
price_24h_ago = h1_data.iloc[-24]['close']
change = ((price_now - price_24h_ago) / price_24h_ago) * 100
if change >= 0:
return f"+{change:.2f}%"
return f"{change:.2f}%"
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
"""验证数据完整性"""
required_intervals = ['1m', '5m', '15m', '30m', '1h']
for interval in required_intervals:
if interval not in data or data[interval].empty:
return False
if len(data[interval]) < 20:
return False
return True
def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
"""判断是否应该发送信号"""
action = signal.get('action', 'wait')
if action == 'wait':
return False
confidence = signal.get('confidence', 0)
# 使用配置文件中的阈值
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
if confidence < threshold:
return False
# 检查冷却时间30分钟内不重复发送相同方向的信号
if symbol in self.signal_cooldown:
cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=30)
if datetime.now() < cooldown_end:
if symbol in self.last_signals:
if self.last_signals[symbol].get('action') == action:
logger.debug(f"{symbol} 信号冷却中,跳过")
return False
return True
async def _review_and_adjust_positions(
self,
symbol: str,
data: Dict[str, pd.DataFrame]
):
"""
回顾并调整现有持仓(基于市场分析 + 当前持仓状态)
每次分析后自动回顾该交易对的所有持仓,让决策器决定是否需要:
- 调整止损止盈
- 部分平仓
- 全部平仓
"""
try:
# 获取该交易对的所有活跃持仓(只看已成交的)
active_orders = self.paper_trading.get_active_orders()
positions = [
order for order in active_orders
if order.get('symbol') == symbol
and order.get('status') == 'open'
and order.get('filled_price') # 只处理已成交的订单
]
if not positions:
return # 没有持仓需要回顾
logger.info(f"\n🔄 【持仓回顾中...】共 {len(positions)} 个持仓")
# TODO: 实现持仓回顾功能
# 1. 获取当前市场信号
# 2. 将持仓信息传递给决策器
# 3. 根据决策调整持仓
logger.info(" 持仓回顾功能待实现")
except Exception as e:
logger.error(f"持仓回顾失败: {e}", exc_info=True)
def _convert_to_real_signal(self, symbol: str, signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""转换 LLM 信号格式为实盘交易格式"""
signal_type = signal.get('type', 'medium_term')
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
# 获取入场类型和入场价
entry_type = signal.get('entry_type', 'market')
entry_price = signal.get('entry_price', current_price)
# 映射 action: buy -> long, sell -> short
action = signal.get('action', 'hold')
side_map = {'buy': 'long', 'sell': 'short'}
return {
'symbol': symbol,
'side': side_map.get(action, 'long'),
'entry_type': entry_type,
'entry_price': entry_price,
'stop_loss': signal.get('stop_loss', 0),
'take_profit': signal.get('take_profit', 0),
'confidence': signal.get('confidence', 0),
'grade': signal.get('grade', 'D'),
'signal_type': type_map.get(signal_type, 'swing'),
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
'trend': signal.get('trend')
}
async def _notify_position_adjustment(
self,
symbol: str,
order_id: str,
decision: Dict[str, Any],
result: Dict[str, Any]
):
"""发送持仓调整通知"""
action = decision.get('action')
reason = decision.get('reason', '')
action_map = {
'ADJUST_SL_TP': '🔄 调整止损止盈',
'PARTIAL_CLOSE': '📤 部分平仓',
'FULL_CLOSE': '🚪 全部平仓'
}
action_text = action_map.get(action, action)
title = f"{action_text} - {symbol}"
content_parts = [
f"📊 **订单**: {order_id[:8]}",
f"📝 **原因**: {reason}",
]
if action == 'ADJUST_SL_TP':
changes = result.get('changes', [])
content_parts.append(f"🔄 **调整内容**: {', '.join(changes)}")
elif action in ['PARTIAL_CLOSE', 'FULL_CLOSE']:
pnl_info = result.get('pnl', {})
if pnl_info:
pnl = pnl_info.get('pnl', 0)
pnl_percent = pnl_info.get('pnl_percent', 0)
content_parts.append(f"💰 **实现盈亏**: ${pnl:+.2f} ({pnl_percent:+.1f}%)")
if action == 'PARTIAL_CLOSE':
close_percent = decision.get('close_percent', 0)
remaining = result.get('remaining_quantity', 0)
content_parts.append(f"📊 **平仓比例**: {close_percent:.0f}%")
content_parts.append(f"💵 **剩余仓位**: ${remaining:,.0f}")
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "blue")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
async def _notify_signal_not_executed(
self,
market_signal: Dict[str, Any],
decision: Dict[str, Any],
current_price: float,
reason: str = ""
):
"""发送有信号但未执行交易的通知"""
try:
symbol = market_signal.get('symbol')
account_type = "📊"
# 获取最佳信号
best_signal = self._get_best_signal_from_market(market_signal)
if not best_signal:
return
confidence = best_signal.get('confidence', 0)
entry_type = best_signal.get('entry_type', 'market')
entry_price = best_signal.get('entry_price', current_price)
# 决策信息
decision_type = decision.get('decision', 'HOLD')
decision_reasoning = decision.get('reasoning', '')
# 如果有外部传入的 reason订单创建失败的具体原因优先使用
if reason:
final_reason = reason
elif decision_reasoning:
final_reason = decision_reasoning
else:
final_reason = "未知原因"
# 方向图标
action = best_signal.get('action', 'wait')
if action == 'buy':
action_icon = '🟢'
action_text = '做多'
elif action == 'sell':
action_icon = '🔴'
action_text = '做空'
else:
action_icon = ''
action_text = '观望'
# 构建标题
title = f"{account_type} {symbol} 信号未执行"
# 构建内容
content_parts = [
f"{action_icon} **信号**: {action_text} | 📈 信心度: **{confidence}%**",
f"",
f"**入场方式**: {entry_type}",
f"**建议入场价**: ${entry_price:,.2f}" if isinstance(entry_price, (int, float)) else f"**建议入场价**: {entry_price}",
f"**当前价格**: ${current_price:,.2f}",
f"",
f"⚠️ **未执行原因**:",
f"{final_reason}",
]
content = "\n".join(content_parts)
# 发送通知
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送信号未执行通知: {decision_type} - {final_reason[:50]}")
except Exception as e:
logger.warning(f"发送信号未执行通知失败: {e}")
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
"""单次分析(用于测试或手动触发)"""
data = self.exchange.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
return {'error': '数据不完整'}
# 使用新架构:市场分析 + 交易决策
previous_signal = self.last_signals.get(symbol)
market_signal = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols,
previous_signal=previous_signal
)
positions, account = self._get_trading_state()
decision = await self.decision_maker.make_decision(
market_signal, positions, account
)
return {
'market_signal': market_signal,
'trading_decision': decision
}
def get_status(self) -> Dict[str, Any]:
"""获取智能体状态"""
return {
'running': self.running,
'symbols': self.symbols,
'mode': 'LLM 驱动',
'last_signals': {
symbol: {
'type': sig.get('type'),
'action': sig.get('action'),
'confidence': sig.get('confidence'),
'grade': sig.get('grade')
}
for symbol, sig in self.last_signals.items()
}
}
# 全局单例
_crypto_agent: Optional['CryptoAgent'] = None
def get_crypto_agent() -> 'CryptoAgent':
"""获取加密货币智能体单例"""
# 直接使用类单例,不使用全局变量(避免 reload 时重置)
return CryptoAgent()