stock-ai-agent/backend/app/crypto_agent/crypto_agent.py
2026-03-28 22:51:49 +08:00

3895 lines
171 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
加密货币交易智能体 - 主控制器LLM 驱动版)
"""
import asyncio
import math
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
from app.utils.logger import logger
from app.config import get_settings
from app.services.bitget_service import bitget_service
from app.services.feishu_service import get_feishu_service, get_feishu_paper_trading_service
from app.services.telegram_service import get_telegram_service
from app.services.dingtalk_service import get_dingtalk_service
from app.services.paper_trading_service import get_paper_trading_service
from app.services.signal_database_service import get_signal_db_service
from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer
from app.crypto_agent.trading_decision_maker import TradingDecisionMaker
from app.utils.system_status import get_system_monitor, AgentStatus
class CryptoAgent:
"""加密货币交易信号智能体LLM 驱动版)"""
_instance = None
_initialized = False
# 平台交易规则配置
PLATFORM_RULES = {
'Bitget': {
'min_margin': {
'BTC': 85, # 0.01 BTC/张 ≈ $850, 10x 杠杆 → $85
'ETH': 35, # 0.1 ETH/张 ≈ $350, 10x 杠杆 → $35
'SOL': 14, # 1 SOL/张 ≈ $140, 10x 杠杆 → $14
'BNB': 7, # 0.1 BNB/张 ≈ $70, 10x 杠杆 → $7
'XRP': 10, # 10 XRP/张 ≈ $100, 10x 杠杆 → $10
'DOGE': 8, # 100 DOGE/张 ≈ $80, 10x 杠杆 → $8
'ADA': 8, # 10 ADA/张 ≈ $80 (估计)
'AVAX': 10, # 1 AVAX/张 ≈ $100
'LINK': 8, # 1 LINK/张 ≈ $80
'DOT': 5, # 1 DOT/张 ≈ $50
'MATIC': 8, # 10 MATIC/张 ≈ $80
'POL': 8, # 10 POL/张 ≈ $80
'LTC': 85, # 0.1 LTC/张 ≈ $85
'BCH': 35, # 0.1 BCH/张 ≈ $350
'FIL': 5, # 1 FIL/张 ≈ $50
'ATOM': 5, # 1 ATOM/张 ≈ $50
'UNI': 5, # 1 UNI/张 ≈ $50
},
'max_margin_pct': 0.25, # 单笔最大25%(支持超激进配置)
},
'PaperTrading': {
'min_margin': {}, # 无最小限制
'max_margin_pct': 0.25, # 单笔最大25%(与实盘一致)
},
'Hyperliquid': {
'min_margin': {
'BTC': 50, # Hyperliquid 最小约 $50
'ETH': 20,
'SOL': 10,
},
'max_margin_pct': 0.25, # 单笔最大25%
}
}
def __new__(cls, *args, **kwargs):
"""单例模式 - 确保只有一个实例"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""初始化智能体"""
# 防止重复初始化
if CryptoAgent._initialized:
return
CryptoAgent._initialized = True
self.settings = get_settings()
self.exchange = bitget_service # 交易所服务
self.feishu = get_feishu_service() # 通用飞书服务crypto等
self.feishu_paper = get_feishu_paper_trading_service() # 模拟交易专用飞书服务
self.telegram = get_telegram_service()
self.dingtalk = get_dingtalk_service() # 添加钉钉服务
# 新架构:市场信号分析器 + 交易决策器
self.market_analyzer = MarketSignalAnalyzer()
self.decision_maker = None # 延迟初始化,需要 paper_trading 的杠杆配置
self.signal_db = get_signal_db_service() # 信号数据库服务
# 模拟交易服务(始终启用)
self.paper_trading = get_paper_trading_service()
# 初始化决策器(需要杠杆配置)
self.decision_maker = TradingDecisionMaker(
leverage=self.paper_trading.leverage,
max_total_leverage=self.paper_trading.max_total_leverage
)
# Hyperliquid 实盘服务(可选)
from app.services.hyperliquid_trading_service import get_hyperliquid_service
self.hyperliquid = get_hyperliquid_service()
if self.hyperliquid:
logger.info(f"🔥 Hyperliquid 实盘交易: 已启用")
else:
logger.info(f"📊 Hyperliquid 实盘交易: 未启用(仅模拟盘)")
# Bitget 实盘服务(可选)
from app.services.bitget_live_trading_service import get_bitget_live_service
self.bitget = get_bitget_live_service()
if self.bitget:
logger.info(f"🔥 Bitget 实盘交易: 已启用")
else:
logger.info(f"📊 Bitget 实盘交易: 未启用(仅模拟盘)")
# 初始化平台执行器
from app.crypto_agent.executor import PaperTradingExecutor, BitgetExecutor, HyperliquidExecutor
self.executors = {}
# 模拟盘执行器
if self.settings.paper_trading_enabled:
self.executors['PaperTrading'] = PaperTradingExecutor()
logger.info(f" 📊 模拟盘执行器: 已初始化")
# Bitget 执行器
if self.bitget:
self.executors['Bitget'] = BitgetExecutor()
logger.info(f" 🔥 Bitget 执行器: 已初始化")
# Hyperliquid 执行器
if self.hyperliquid:
self.executors['Hyperliquid'] = HyperliquidExecutor()
logger.info(f" 🔥 Hyperliquid 执行器: 已初始化")
# 状态管理
self.last_signals: Dict[str, Dict[str, Any]] = {}
self.signal_cooldown: Dict[str, datetime] = {}
# 账户初始余额持久化(用于计算回撤)
self._initial_balances: Dict[str, float] = {}
self._load_initial_balances()
# 挂单 TP/SL 追踪:挂单成交后自动补设止盈止损
# key=order_id, value={symbol, is_long, size/contracts, tp_price, sl_price}
self._hl_pending_tp_sl: Dict[str, Dict] = {}
self._bg_pending_tp_sl: Dict[str, Dict] = {}
# 配置
self.symbols = self.settings.crypto_symbols.split(',')
# 运行状态
self.running = False
self._event_loop = None
# 注册到系统监控
monitor = get_system_monitor()
self._monitor_info = monitor.register_agent(
agent_id="crypto_agent",
name="加密货币智能体",
agent_type="crypto"
)
monitor.update_config("crypto_agent", {
"symbols": self.symbols,
"auto_trading_enabled": True, # 模拟交易始终启用
"hyperliquid_enabled": self.hyperliquid is not None,
"bitget_enabled": self.bitget is not None,
"analysis_interval": "每5分钟整点"
})
logger.info(f"加密货币智能体初始化完成LLM 驱动),监控交易对: {self.symbols}")
logger.info(f"📊 模拟交易: 始终启用")
def _on_price_update(self, symbol: str, price: float):
"""处理实时价格更新(用于模拟交易)"""
if not self.paper_trading:
return
triggered = self.paper_trading.check_price_triggers(symbol, price)
for result in triggered:
if self._event_loop and self._event_loop.is_running():
# 根据事件类型选择不同的通知方法
event_type = result.get('event_type', 'order_closed')
if event_type == 'order_filled':
asyncio.run_coroutine_threadsafe(self._notify_order_filled(result), self._event_loop)
elif event_type == 'breakeven_triggered':
asyncio.run_coroutine_threadsafe(self._notify_breakeven_triggered(result), self._event_loop)
else:
asyncio.run_coroutine_threadsafe(self._notify_order_closed(result), self._event_loop)
else:
logger.warning(f"无法发送通知: 事件循环不可用")
async def _notify_order_filled(self, result: Dict[str, Any]):
"""发送挂单成交通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
grade = result.get('signal_grade', 'N/A')
title = f"✅ 挂单成交 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"⭐ **信号等级**: {grade}",
f"💰 **挂单价**: ${result.get('entry_price', 0):,.2f}",
f"🎯 **成交价**: ${result.get('filled_price', 0):,.2f}",
f"💵 **仓位**: ${result.get('quantity', 0):,.0f}",
]
if result.get('stop_loss'):
content_parts.append(f"🛑 **止损**: ${result.get('stop_loss', 0):,.2f}")
if result.get('take_profit'):
content_parts.append(f"🎯 **止盈**: ${result.get('take_profit', 0):,.2f}")
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "green")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送挂单成交通知: {result.get('order_id')}")
async def _notify_pending_cancelled(self, result: Dict[str, Any]):
"""发送挂单撤销通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
new_side_text = "做多" if result.get('new_side') == 'long' else "做空"
title = f"⚠️ 挂单撤销 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **原方向**: {side_text}",
f"💰 **挂单价**: ${result.get('entry_price', 0):,.2f}",
f"",
f"📝 **原因**: 收到反向{new_side_text}信号,自动撤销",
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送挂单撤销通知: {result.get('order_id')}")
async def _notify_breakeven_triggered(self, result: Dict[str, Any]):
"""发送移动止损触发通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = '🟢' if result.get('side') == 'long' else '🔴'
pnl_percent = result.get('current_pnl_percent', 0)
title = f"📈 移动止损已启动 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"",
f"💰 **开仓价**: ${result.get('filled_price', 0):,.2f}",
f"📈 **当前盈利**: {pnl_percent:+.2f}%",
f"🛑 **新止损价**: ${result.get('new_stop_loss', 0):,.2f}",
f"",
f"💰 锁定利润,让利润奔跑"
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "green")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送移动止损通知: {result.get('order_id')}")
async def _notify_order_closed(self, result: Dict[str, Any]):
"""发送订单平仓通知"""
status = result.get('status', '')
is_win = result.get('is_win', False)
if status == 'closed_tp':
emoji = "🎯"
status_text = "止盈平仓"
color = "green"
elif status == 'closed_sl':
emoji = "🛑"
status_text = "止损平仓"
color = "red"
elif status == 'closed_ts':
emoji = "📈"
status_text = "移动止盈"
color = "green"
elif status == 'closed_be':
emoji = "🔒"
status_text = "保本止损"
color = "orange"
else:
emoji = "📤"
status_text = "手动平仓"
color = "blue"
win_text = "盈利" if is_win else "亏损"
win_emoji = "" if is_win else ""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
title = f"{emoji} 订单{status_text}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"💰 **交易对**: {result.get('symbol')}",
f"📊 **入场**: ${result.get('entry_price', 0):,.2f}",
f"🎯 **出场**: ${result.get('exit_price', 0):,.2f}",
f"{win_emoji} **{win_text}**: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})",
f"⏱️ **持仓时间**: {result.get('hold_duration', 'N/A')}",
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, color)
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送订单平仓通知: {result.get('order_id')}")
async def _notify_expired_orders_cancelled(self, cancelled_orders: List[Dict[str, Any]]):
"""
发送超时订单取消通知
Args:
cancelled_orders: 被取消的订单列表
"""
if not cancelled_orders:
return
title = f"⏰ 已自动取消 {len(cancelled_orders)} 个超时挂单"
# 构建订单列表内容
order_lines = []
for order in cancelled_orders[:5]: # 最多显示5个
side_icon = "🟢" if order['side'] == 'long' else "🔴"
order_lines.append(
f"{side_icon} **{order['symbol']}** ({order['side']})\n"
f" 入场价: ${order['entry_price']:.2f} | "
f"已挂单: {order['age_hours']:.1f}小时"
)
if len(cancelled_orders) > 5:
order_lines.append(f"\n... 还有 {len(cancelled_orders) - 5} 个订单")
content_parts = [
f"⏰ **挂单超时自动取消**",
f"📊 **取消数量**: {len(cancelled_orders)}",
f"⚙️ **超时阈值**: {self.paper_trading.order_timeout_hours} 小时",
"",
"**取消的订单**:",
]
content_parts.extend(order_lines)
content_parts.append("\n💡 挂单超时自动取消,释放仓位供新信号使用")
content = "\n".join(content_parts)
# 发送通知
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送超时订单取消通知: {len(cancelled_orders)} 个订单")
def _get_seconds_until_next_5min(self) -> int:
"""计算距离下一个5分钟整点的秒数"""
now = datetime.now()
current_minute = now.minute
current_second = now.second
minutes_past = current_minute % 5
if minutes_past == 0 and current_second == 0:
return 0
minutes_to_wait = 5 - minutes_past if minutes_past > 0 else 5
seconds_to_wait = minutes_to_wait * 60 - current_second
return seconds_to_wait
async def run(self):
"""主运行循环"""
self.running = True
self._event_loop = asyncio.get_event_loop()
# 更新状态为启动中
monitor = get_system_monitor()
monitor.update_status("crypto_agent", AgentStatus.STARTING)
# 启动横幅
logger.info("\n" + "=" * 60)
logger.info("🚀 加密货币交易信号智能体LLM 驱动)")
logger.info("=" * 60)
logger.info(f" 监控交易对: {', '.join(self.symbols)}")
logger.info(f" 运行模式: 每5分钟整点执行")
logger.info(f" 分析引擎: LLM 自主分析")
logger.info(f" 交易模式: 自动交易已启用")
logger.info("=" * 60 + "\n")
# 更新状态为运行中
monitor.update_status("crypto_agent", AgentStatus.RUNNING)
# 注意:不再启动独立的价格监控
# 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查
logger.info(f"交易已启用(由后台统一监控)")
# 发送启动通知(卡片格式)
title = "🚀 加密货币智能体已启动"
# 构建卡片内容
content_parts = [
f"🤖 **驱动引擎**: LLM 自主分析",
f"📊 **监控交易对**: {len(self.symbols)}",
f" {', '.join(self.symbols)}",
f"⏰ **运行频率**: 每5分钟整点",
f"💰 **交易系统**: 已启用(后台统一监控)",
f"🎯 **分析维度**: 技术面 + 资金面 + 情绪面",
]
content = "\n".join(content_parts)
await self.feishu.send_card(title, content, "green")
await self.telegram.send_startup_notification(self.symbols)
while self.running:
try:
wait_seconds = self._get_seconds_until_next_5min()
if wait_seconds > 0:
next_run = datetime.now() + timedelta(seconds=wait_seconds)
logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
await asyncio.sleep(wait_seconds)
run_time = datetime.now()
logger.info("\n" + "=" * 60)
logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
logger.info("=" * 60)
# 1. 首先检查账户级止损(所有平台)
should_stop, stop_reason = await self._check_account_level_stop_loss()
if should_stop:
logger.error(f"🚨 {stop_reason}")
# 触发账户级止损,停止所有交易
self.running = False
break
# 检查并取消超时挂单(在分析开始前)
cancelled = self.paper_trading.check_and_cancel_expired_orders()
if cancelled:
logger.info(f"🔄 已自动取消 {len(cancelled)} 个超时挂单")
# 发送超时取消通知
await self._notify_expired_orders_cancelled(cancelled)
# 使用执行器检查挂单超时(各平台)
await self._check_pending_order_timeouts()
# 使用执行器检查持仓管理(止盈/超时退出/移动止损)
await self._check_position_management_all_platforms()
# 检查实盘挂单是否已成交,补设止盈止损
if self.hyperliquid:
await self._check_and_set_pending_tp_sl_hyperliquid()
if self.bitget:
await self._check_and_set_pending_tp_sl_bitget()
for symbol in self.symbols:
await self.analyze_symbol(symbol)
logger.info("\n" + "" * 60)
logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
logger.info("" * 60 + "\n")
await asyncio.sleep(2)
except Exception as e:
logger.error(f"❌ 分析循环出错: {e}")
import traceback
logger.error(traceback.format_exc())
await asyncio.sleep(10)
def stop(self):
"""停止运行"""
self.running = False
# 更新状态
monitor = get_system_monitor()
monitor.update_status("crypto_agent", AgentStatus.STOPPED)
logger.info("加密货币智能体已停止")
def _check_volatility(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str, float]:
"""
检查波动率,判断是否值得进行 LLM 分析(组合方案)
使用 1 小时 K 线判断趋势波动5 分钟 K 线检测突发波动
Args:
symbol: 交易对
data: 多周期K线数据
Returns:
(should_analyze, reason, volatility_percent)
should_analyze: 是否应该进行分析
reason: 原因说明
volatility_percent: 1小时波动率百分比
"""
# 检查是否启用波动率过滤
if not self.settings.crypto_volatility_filter_enabled:
return True, "波动率过滤未启用", 0
try:
# 1. 首先检查 1 小时趋势波动率
df_1h = data.get('1h')
if df_1h is None or len(df_1h) < 20:
# 数据不足,保守起见允许分析
return True, "1小时数据不足允许分析", 0
# 获取最近20根K线
recent_1h = df_1h.iloc[-20:]
# 计算最高价和最低价
high = recent_1h['high'].max()
low = recent_1h['low'].min()
current_price = float(recent_1h.iloc[-1]['close'])
# 计算1小时波动率
if low > 0:
volatility_1h_percent = ((high - low) / low) * 100
else:
volatility_1h_percent = 0
# 计算价格变化范围(相对于当前价格)
price_range_high_percent = ((high - current_price) / current_price) * 100 if current_price > 0 else 0
price_range_low_percent = ((current_price - low) / current_price) * 100 if current_price > 0 else 0
# 从配置读取阈值
min_volatility = self.settings.crypto_min_volatility_percent
min_price_range = self.settings.crypto_min_price_range_percent
# 如果1小时波动率足够大直接允许分析
if volatility_1h_percent >= min_volatility or price_range_high_percent >= min_price_range or price_range_low_percent >= min_price_range:
return True, f"1小时趋势活跃 (波动率 {volatility_1h_percent:.2f}%),值得分析", volatility_1h_percent
# 2. 1小时波动率较低检查5分钟突发波动
df_5m = data.get('5m')
if df_5m is not None and len(df_5m) >= 3:
# 获取最近3根5分钟K线15分钟内的变化
recent_5m = df_5m.iloc[-3:]
# 计算5分钟价格变化幅度
price_start = float(recent_5m.iloc[0]['close'])
price_end = float(recent_5m.iloc[-1]['close'])
if price_start > 0:
price_change_5m = abs(price_end - price_start) / price_start * 100
else:
price_change_5m = 0
# 从配置读取5分钟突发波动阈值
surge_threshold = self.settings.crypto_5m_surge_threshold
logger.debug(f"{symbol} 5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}% (阈值: {surge_threshold}%)")
# 如果5分钟突发波动超过阈值仍然允许分析
if price_change_5m >= surge_threshold:
direction = "上涨" if price_end > price_start else "下跌"
return True, f"5分钟突发{direction} ({price_change_5m:.2f}% > {surge_threshold}%),强制分析", volatility_1h_percent
# 3. 波动率过低,跳过分析
reason = f"波动率过低 (1小时: {volatility_1h_percent:.2f}% < {min_volatility}%, 5分钟无突发波动),跳过分析"
logger.info(f"⏸️ {symbol} {reason}")
return False, reason, volatility_1h_percent
except Exception as e:
logger.warning(f"{symbol} 波动率检查失败: {e},允许分析")
return True, "波动率检查失败,允许分析", 0
async def analyze_symbol(self, symbol: str):
"""
分析单个交易对(新架构:市场分析 + 交易决策分离)
新架构流程:
1. 市场信号分析器分析市场(不包含仓位信息)
2. 交易决策器根据信号+仓位+账户状态做决策
3. 执行交易决策
Args:
symbol: 交易对,如 'BTCUSDT'
"""
try:
# 更新活动时间
monitor = get_system_monitor()
monitor.update_activity("crypto_agent")
logger.info(f"\n{'' * 50}")
logger.info(f"📊 {symbol} 分析开始")
logger.info(f"{'' * 50}")
# 1. 获取多周期数据
data = self.exchange.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
return
# 当前价格
current_price = float(data['5m'].iloc[-1]['close'])
price_change_24h = self._calculate_price_change(data['1h'])
logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")
# 1.5. 波动率检查(节省 LLM 调用)
should_analyze, volatility_reason, volatility = self._check_volatility(symbol, data)
if not should_analyze:
logger.info(f"⏸️ {volatility_reason},跳过本次 LLM 分析")
return
# ============================================================
# 第一阶段:市场信号分析(不包含仓位信息)
# ============================================================
logger.info(f"\n🤖 【第一阶段:市场信号分析】")
# 获取上一轮的信号(用于上下文)
previous_signal = self.last_signals.get(symbol)
# 显示上一轮信号(如果有)
if previous_signal:
prev_time = previous_signal.get('timestamp', 'Unknown')
prev_trend = previous_signal.get('trend', 'Unknown')
prev_signals = previous_signal.get('signals', [])
logger.info(f"📋 上一轮分析时间: {prev_time}")
logger.info(f"📋 上一轮趋势: {prev_trend}")
if prev_signals:
for sig in prev_signals:
action = sig.get('action', 'N/A')
confidence = sig.get('confidence', 0)
timeframe = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(timeframe, timeframe)
logger.info(f"📋 上一轮信号: {type_text} | {action} | {confidence}%")
else:
logger.info(f"📋 上一轮信号: 无交易信号(观望)")
else:
logger.info(f"📋 上一轮信号: 无历史记录(首次分析)")
market_signal = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols,
previous_signal=previous_signal
)
# 输出市场分析结果
self._log_market_signal(market_signal)
# 存储最新信号(用于下一轮分析的上下文)
self.last_signals[symbol] = {
'timestamp': datetime.now().isoformat(),
'trend': market_signal.get('trend', 'unknown'),
'trend_strength': market_signal.get('trend_strength', 'unknown'),
'signals': market_signal.get('signals', []),
'key_levels': market_signal.get('key_levels', {}),
'current_price': current_price
}
# 过滤掉 wait 信号,只保留 buy/sell 信号
signals = market_signal.get('signals', [])
trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']]
if not trade_signals:
logger.info(f"\n⏸️ 结论: 无交易信号(仅有观望建议),继续观望")
return
# 检查是否有达到阈值的交易信号
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
valid_signals = [s for s in trade_signals if s.get('confidence', 0) >= threshold]
if not valid_signals:
logger.info(f"\n⏸️ 结论: 无交易信号达到置信度阈值 ({threshold}%),继续观望")
return
logger.info(f"\n✅ 发现 {len(valid_signals)} 个有效交易信号(达到 {threshold}% 阈值)")
# ============================================================
# 发送市场信号通知(独立于交易决策)
# ============================================================
await self._send_market_signal_notification(market_signal, current_price)
# ============================================================
# 第二阶段:各平台独立处理交易信号(基于硬编码规则)
# ============================================================
logger.info(f"\n🤖 【第二阶段:各平台独立处理信号】")
# 使用第一个有效信号
main_signal = valid_signals[0]
signal_action = main_signal.get('action') # buy/sell
# 构建标准信号格式
trading_signal = {
'symbol': symbol,
'action': signal_action,
'confidence': main_signal.get('confidence', 50),
'entry_price': main_signal.get('entry_price', current_price),
'stop_loss': main_signal.get('stop_loss'),
'take_profit': main_signal.get('take_profit'),
'reasoning': main_signal.get('reasoning', ''),
}
logger.info(f" 信号: {signal_action} {symbol} @ ${trading_signal['entry_price']:.2f} (置信度 {trading_signal['confidence']}%)")
# 2.1 模拟盘处理
if self.settings.paper_trading_enabled:
logger.info(f"\n📊 【模拟盘】")
paper_positions, paper_account, paper_pending = self._get_paper_trading_state()
paper_decision = self.execute_signal_with_rules(
trading_signal, 'PaperTrading', paper_account, paper_positions, paper_pending
)
await self._send_trading_decision_notification(
paper_decision, market_signal, current_price, prefix="[模拟盘]"
)
else:
paper_decision = {"action": "IGNORE", "reason": "未启用"}
logger.info(f"⏸️ 模拟盘交易未启用")
# 2.2 Hyperliquid 实盘处理
if self.hyperliquid:
logger.info(f"\n🔥 【Hyperliquid】")
hl_positions, hl_account, hl_pending = self._get_hyperliquid_trading_state()
hl_decision = self.execute_signal_with_rules(
trading_signal, 'Hyperliquid', hl_account, hl_positions, hl_pending
)
await self._send_trading_decision_notification(
hl_decision, market_signal, current_price, prefix="[Hyperliquid]"
)
else:
hl_decision = {"action": "IGNORE", "reason": "未启用"}
logger.info(f"⏸️ Hyperliquid 实盘交易未启用")
# 2.3 Bitget 实盘处理
if self.bitget:
logger.info(f"\n🔥 【Bitget】")
bg_positions, bg_account, bg_pending = self._get_bitget_trading_state()
bg_decision = self.execute_signal_with_rules(
trading_signal, 'Bitget', bg_account, bg_positions, bg_pending
)
await self._send_trading_decision_notification(
bg_decision, market_signal, current_price, prefix="[Bitget]"
)
else:
bg_decision = {"action": "IGNORE", "reason": "未启用"}
logger.info(f"⏸️ Bitget 实盘交易未启用")
# ============================================================
# 第三阶段:执行交易决策(各平台独立)
# ============================================================
await self._execute_decisions(paper_decision, hl_decision, bg_decision, market_signal, current_price)
except Exception as e:
logger.error(f"❌ 分析 {symbol} 出错: {e}")
import traceback
logger.error(traceback.format_exc())
def _log_market_signal(self, signal: Dict[str, Any]):
"""输出市场信号分析结果"""
logger.info(f" 市场状态: {signal.get('market_state')}")
logger.info(f" 趋势: {signal.get('trend')}")
# 新闻情绪
news_sentiment = signal.get('news_sentiment', '')
if news_sentiment:
sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': ''}.get(news_sentiment, '')
logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
# 关键价位
import re
key_levels = signal.get('key_levels', {})
if key_levels.get('support') or key_levels.get('resistance'):
# 从字符串中提取数字(处理 "66065 (15m布林下轨)" 这种格式)
def extract_number(val):
if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
# 提取第一个数字
match = re.search(r'[\d,]+\.?\d*', val.replace(',', ''))
if match:
return float(match.group())
return None
supports = [extract_number(s) for s in key_levels.get('support', [])[:2]]
resistances = [extract_number(r) for r in key_levels.get('resistance', [])[:2]]
support_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None])
resistance_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None])
logger.info(f" 支撑位: {support_str or '-'}")
logger.info(f" 阻力位: {resistance_str or '-'}")
# 信号列表 - 区分交易信号和观望建议
signals = signal.get('signals', [])
trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']]
wait_signals = [s for s in signals if s.get('action') == 'wait']
if trade_signals:
logger.info(f"\n🎯 【发现 {len(trade_signals)} 个交易信号】")
for i, sig in enumerate(trade_signals, 1):
signal_type = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
action = sig.get('action', 'hold')
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
action_text = action_map.get(action, action)
confidence = sig.get('confidence', 0)
logger.info(f"\n [{i}] {type_text} | {action_text}")
logger.info(f" 信心度: {confidence}%")
logger.info(f" 入场: ${sig.get('entry_price', 'N/A')}")
logger.info(f" 止损: ${sig.get('stop_loss', 'N/A')}")
logger.info(f" 止盈: ${sig.get('take_profit', 'N/A')}")
logger.info(f" 理由: {sig.get('reasoning', 'N/A')}")
if wait_signals:
logger.info(f"\n📋 【{len(wait_signals)} 个观望建议(不触发交易)】")
for i, sig in enumerate(wait_signals, 1):
signal_type = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
confidence = sig.get('confidence', 0)
logger.info(f"\n [{i}] {type_text} | 观望")
logger.info(f" 信心度: {confidence}%")
logger.info(f" 理由: {sig.get('reasoning', 'N/A')}")
def _log_trading_decision(self, decision: Dict[str, Any]):
"""输出交易决策结果"""
decision_type = decision.get('decision', 'HOLD')
decision_map = {
'OPEN': '🟢 开仓',
'CLOSE': '🔴 平仓',
'ADD': ' 加仓',
'REDUCE': ' 减仓',
'HOLD': '⏸️ 观望'
}
logger.info(f" 决策: {decision_map.get(decision_type, decision_type)}")
logger.info(f" 动作: {decision.get('action', 'N/A')}")
logger.info(f" 仓位: {decision.get('position_size', 'N/A')}")
# quantity 是保证金,显示持仓价值 = 保证金 × 杠杆
quantity = decision.get('quantity', 0)
if isinstance(quantity, (int, float)) and quantity > 0:
leverage = self.paper_trading.leverage # 使用实际的杠杆配置
position_value = quantity * leverage
logger.info(f" 持仓价值: ${position_value:,.2f} (保证金 ${quantity:.2f})")
else:
logger.info(f" 数量: ${decision.get('quantity', 'N/A')}")
if decision.get('stop_loss'):
logger.info(f" 止损: ${decision.get('stop_loss')}")
if decision.get('take_profit'):
logger.info(f" 止盈: ${decision.get('take_profit')}")
logger.info(f" 理由: {decision.get('reasoning', 'N/A')}")
risk = decision.get('risk_analysis', '')
if risk:
logger.info(f" 风险: {risk}")
def _get_paper_trading_state(self) -> tuple:
"""
获取模拟盘交易状态(持仓和账户)
Returns:
(positions, account, pending_orders) - 持仓列表、账户状态、挂单列表
"""
# 模拟交易
active_orders = self.paper_trading.get_active_orders()
account = self.paper_trading.get_account_status()
# 分离持仓和挂单
position_list = []
pending_orders = []
for order in active_orders:
if order.get('status') == 'open' and order.get('filled_price'):
# 已成交的订单作为持仓
position_list.append({
'symbol': order.get('symbol'),
'side': order.get('side'),
'holding': order.get('quantity', 0),
'entry_price': order.get('filled_price') or order.get('entry_price'),
'stop_loss': order.get('stop_loss'),
'take_profit': order.get('take_profit')
})
elif order.get('status') == 'pending':
# 未成交的订单作为挂单
pending_orders.append({
'order_id': order.get('order_id'),
'symbol': order.get('symbol'),
'side': order.get('side'),
'entry_price': order.get('entry_price'),
'quantity': order.get('quantity', 0),
'entry_type': order.get('entry_type', 'market'),
'confidence': order.get('confidence', 0)
})
return position_list, account, pending_orders
def _get_hyperliquid_trading_state(self) -> tuple:
"""
获取 Hyperliquid 实盘交易状态(持仓和账户)
Returns:
(positions, account, pending_orders) - 持仓列表、账户状态、挂单列表
"""
try:
# 获取账户状态
hl_state = self.hyperliquid.get_account_state()
# 转换持仓格式
position_list = []
for pos in hl_state["positions"]:
position_data = pos.get("position", {})
coin = position_data.get("coin")
size = float(position_data.get("szi", 0))
if size != 0:
entry_price = float(position_data.get("entryPx", 0))
unrealized_pnl = float(position_data.get("unrealizedPnl", 0))
# 获取止盈止损价格(从挂单中查询)
tp_sl_prices = self.hyperliquid.get_tp_sl_prices(coin)
position_list.append({
'symbol': f"{coin}USDT", # BTC → BTCUSDT
'side': 'buy' if size > 0 else 'sell',
'holding': abs(size),
'entry_price': entry_price,
'unrealized_pnl': unrealized_pnl,
'stop_loss': tp_sl_prices.get('stop_loss'),
'take_profit': tp_sl_prices.get('take_profit')
})
# 转换账户格式(匹配模拟盘格式)
account = {
'current_balance': hl_state["account_value"],
'initial_balance': self.hyperliquid.initial_balance,
'used_margin': hl_state["total_margin_used"],
'available_balance': hl_state["available_balance"],
'available': hl_state["available_balance"], # 决策器期望的键名
'total_position_value': sum(abs(float(p.get("position", {}).get("szi", 0)) *
float(p.get("position", {}).get("entryPx", 0)))
for p in hl_state["positions"]),
'max_total_leverage': self.hyperliquid.max_total_leverage
}
# 计算当前总杠杆
if account['current_balance'] > 0:
account['current_total_leverage'] = account['total_position_value'] / account['current_balance']
else:
account['current_total_leverage'] = 0
# 获取挂单(包括止盈止损)
all_orders = self.hyperliquid.get_open_orders()
pending_orders = []
for order in all_orders:
pending_orders.append({
'order_id': order.get('order_id'),
'symbol': f"{order['symbol']}USDT", # 转换格式
'side': 'buy' if order.get('side') == 'B' else 'sell',
'entry_price': order.get('price'),
'quantity': order.get('size'),
'entry_type': 'limit',
'is_reduce_only': order.get('is_reduce_only', False)
})
return position_list, account, pending_orders
except Exception as e:
logger.error(f"获取 Hyperliquid 状态失败: {e}")
return [], {}, []
async def _execute_decisions(self, paper_decision: Dict[str, Any],
hyperliquid_decision: Dict[str, Any],
bitget_decision: Dict[str, Any],
market_signal: Dict[str, Any], current_price: float):
"""执行交易决策(三轨独立)"""
# 选择最佳信号用于保存
best_signal = self._get_best_signal_from_market(market_signal)
# 保存信号到数据库(只保存一次)
if best_signal:
signal_to_save = best_signal.copy()
signal_to_save['signal_type'] = 'crypto'
signal_to_save['symbol'] = market_signal.get('symbol')
signal_to_save['current_price'] = current_price
self.signal_db.add_signal(signal_to_save)
# ============================================================
# 执行模拟盘决策
# ============================================================
if paper_decision:
await self._execute_paper_decisions(paper_decision, market_signal, current_price)
# ============================================================
# 执行 Hyperliquid 决策
# ============================================================
if hyperliquid_decision and self.hyperliquid:
await self._execute_hyperliquid_decisions(hyperliquid_decision, market_signal, current_price)
# ============================================================
# 执行 Bitget 决策
# ============================================================
if bitget_decision and self.bitget:
await self._execute_bitget_decisions(bitget_decision, market_signal, current_price)
async def _execute_paper_decisions(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float):
"""执行模拟盘决策(使用执行器)"""
try:
decision_type = decision.get('decision', 'HOLD')
if decision_type == 'HOLD':
reasoning = decision.get('reasoning', decision.get('reason', '观望'))
logger.info(f"\n📊 交易决策: {reasoning}")
return
logger.info(f"\n📊 【执行交易】")
# 使用执行器
executor = self.executors.get('PaperTrading')
if not executor:
logger.error(f" ❌ 模拟盘执行器未初始化")
return
# 执行开仓/加仓
if decision_type in ['OPEN', 'ADD']:
result = await executor.execute_open(decision, current_price)
if result.get('success'):
order_id = result.get('order_id', 'unknown')
logger.info(f" ✅ 交易成功: 订单ID {order_id}")
await self._send_signal_notification(market_signal, decision, current_price)
# TP/SL 警告
if result.get('tp_sl_warning'):
logger.warning(f" ⚠️ 止盈止损设置失败: {result['tp_sl_warning']}")
else:
error = result.get('error', result.get('message', '未知错误'))
logger.error(f" ❌ 交易失败: {error}")
await self._notify_execution_failure(market_signal, decision, error, prefix="[模拟盘]")
# 执行平仓
elif decision_type == 'CLOSE':
result = await executor.execute_close(decision, current_price)
if result.get('success'):
logger.info(f" ✅ 平仓成功")
await self._send_signal_notification(market_signal, decision, current_price)
else:
error = result.get('error', '平仓失败')
logger.error(f" ❌ 平仓失败: {error}")
# 执行撤单
elif decision_type == 'CANCEL_PENDING':
orders_to_cancel = decision.get('orders_to_cancel', [])
success_count = 0
for order_info in orders_to_cancel:
order_id = order_info if isinstance(order_info, str) else order_info.get('order_id', '')
symbol = decision.get('symbol', '')
result = await executor.execute_cancel(order_id, symbol)
if result.get('success'):
success_count += 1
if success_count > 0:
logger.info(f" ✅ 成功取消 {success_count} 个挂单")
await self._send_signal_notification(market_signal, decision, current_price)
else:
logger.warning(f" ⚠️ 没有成功取消任何挂单")
except Exception as e:
logger.error(f" ❌ 模拟盘执行异常: {e}")
import traceback
logger.error(traceback.format_exc())
def _get_best_signal_from_market(self, market_signal: Dict[str, Any]) -> Dict[str, Any]:
"""从市场信号中获取最佳信号"""
signals = market_signal.get('signals', [])
if not signals:
return {}
# 按信心度排序,取最高的
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
return sorted_signals[0]
async def _send_market_signal_notification(self, market_signal: Dict[str, Any],
current_price: float):
"""发送市场信号通知(第一阶段)- 调用前已确保有有效信号"""
try:
# 获取配置的阈值
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
# 过滤达到阈值的信号(防御性检查)
signals = market_signal.get('signals', [])
valid_signals = [s for s in signals if s.get('confidence', 0) >= threshold]
if not valid_signals:
return
# 取最佳信号(按信心度排序)
best_signal = sorted(valid_signals, key=lambda x: x.get('confidence', 0), reverse=True)[0]
# 构建通知消息 - 完全匹配旧格式
symbol = market_signal.get('symbol')
market_state = market_signal.get('market_state')
trend = market_signal.get('trend')
# 注意:经过 market_signal_analyzer 解析后
# - 'action' 是 buy/sell/wait (交易方向)
# - 'timeframe' 是 short_term/medium_term/long_term (周期)
sig_action = best_signal.get('action', 'hold') # buy/sell/wait
timeframe = best_signal.get('timeframe', 'unknown') # short_term/medium_term/long_term
confidence = best_signal.get('confidence', 0)
entry_val = best_signal.get('entry_price', 'N/A')
sl_val = best_signal.get('stop_loss', 'N/A')
tp_val = best_signal.get('take_profit', 'N/A')
reasoning = best_signal.get('reasoning', '')
# 格式化价格用于显示(处理 float 和 N/A
def format_price(price_value):
if price_value is None or price_value == 'N/A':
return 'N/A'
if isinstance(price_value, float):
return f"{price_value:,.2f}"
return str(price_value)
entry = format_price(entry_val)
sl = format_price(sl_val)
tp = format_price(tp_val)
# 类型映射
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
timeframe_text = type_map.get(timeframe, timeframe)
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空', 'hold': ' 观望'}
action_text = action_map.get(sig_action, sig_action)
# 入场类型 - 从信号中获取
entry_type = best_signal.get('entry_type', 'market')
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 等级(基于信心度映射)- 与 market_signal_analyzer.py 保持一致
# A级(80-100): 量价配合 + 多指标共振 + 多周期确认
# B级(60-79): 量价配合 + 主要指标确认
# C级(40-59): 有机会但量价不够理想
# D级(<40): 量价背离或信号矛盾
if confidence >= 80:
grade = 'A'
grade_icon = '⭐⭐⭐'
elif confidence >= 60:
grade = 'B'
grade_icon = '⭐⭐'
elif confidence >= 40:
grade = 'C'
grade_icon = ''
else:
grade = 'D'
grade_icon = ''
# 仓位(基于信心度和杠杆空间)- 与新的等级阈值对齐
if confidence >= 80: # A级信号
position_size = 'heavy'
position_icon = '🔥'
position_text = '重仓'
elif confidence >= 60: # B级信号
position_size = 'medium'
position_icon = '📊'
position_text = '中仓'
else: # C级或D级信号
position_size = 'light'
position_icon = '🌱'
position_text = '轻仓'
# 计算止损止盈百分比(价格已经是 float
try:
# 使用当前价格作为入场价(如果 entry_price 是 N/A
entry_for_calc = entry_val if isinstance(entry_val, (int, float)) else current_price
if isinstance(sl_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0:
if sig_action == 'buy':
sl_percent = ((sl_val - entry_for_calc) / entry_for_calc * 100)
else:
sl_percent = ((entry_for_calc - sl_val) / entry_for_calc * 100)
sl_display = f"{sl_percent:+.1f}%"
else:
sl_display = "N/A"
if isinstance(tp_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0:
if sig_action == 'buy':
tp_percent = ((tp_val - entry_for_calc) / entry_for_calc * 100)
else:
tp_percent = ((entry_for_calc - tp_val) / entry_for_calc * 100)
tp_display = f"{tp_percent:+.1f}%"
else:
tp_display = "N/A"
except:
sl_display = "N/A"
tp_display = "N/A"
# 构建卡片标题和颜色 - 添加 [信号] 前缀区分
if sig_action == 'buy':
title = f"[信号] 🟢 {symbol} {timeframe_text}做多信号"
color = "green"
else:
title = f"[信号] 🔴 {symbol} {timeframe_text}做空信号"
color = "red"
# 构建卡片内容
content_parts = [
f"**{timeframe_text}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
f"{entry_type_icon} **入场**: {entry_type_text} | {position_icon} **仓位**: {position_text}",
f"",
]
# 入场价格显示
if entry_type == 'limit':
# 限价订单:显示建议挂单价格和当前价格
content_parts.append(f"📋 **挂单价格**: ${entry}")
content_parts.append(f"📍 **当前价格**: ${format_price(current_price)}")
else:
# 市价订单:显示当前价格作为入场价
content_parts.append(f"💰 **入场价**: ${entry}")
if sl != 'N/A':
content_parts.append(f"🛑 **止损价**: ${sl} ({sl_display})")
if tp != 'N/A':
content_parts.append(f"🎯 **止盈价**: ${tp} ({tp_display})")
content_parts.append(f"")
content_parts.append(f"📝 **分析理由**:")
# HTML转义reasoning避免特殊字符破坏HTML格式
import html
escaped_reasoning = html.escape(reasoning) if reasoning else reasoning
content_parts.append(f"{escaped_reasoning}")
content = "\n".join(content_parts)
# 根据配置发送通知 - [信号] 发送到 crypto webhook
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, color)
if self.settings.telegram_enabled:
# Telegram 使用文本格式
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
# 钉钉使用 ActionCard 格式
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送市场信号通知 (阈值: {threshold}%)")
except Exception as e:
logger.warning(f"发送市场信号通知失败: {e}")
import traceback
logger.debug(traceback.format_exc())
async def _send_trading_decision_notification(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float,
prefix: str = ""):
"""发送交易决策通知(第二阶段)"""
try:
decision_type = decision.get('decision', 'HOLD')
symbol = market_signal.get('symbol')
# 账户类型标识
account_type = f"{prefix} 📊 交易" if prefix else "📊 交易"
# 决策类型映射
decision_map = {
'OPEN': '开仓',
'CLOSE': '平仓',
'ADD': '加仓',
'REDUCE': '减仓',
'CANCEL_PENDING': '取消挂单',
'HOLD': '观望'
}
decision_text = decision_map.get(decision_type, decision_type)
# 根据决策类型设置颜色
color_map = {
'OPEN': 'green',
'ADD': 'green',
'CLOSE': 'orange',
'REDUCE': 'orange',
'CANCEL_PENDING': 'red',
'HOLD': 'gray'
}
color = color_map.get(decision_type, 'blue')
# 构建标题 - 添加 [决策] 前缀区分
title = f"[决策] {account_type} {symbol} 交易决策: {decision_text}"
# 获取最佳信号用于显示
best_signal = self._get_best_signal_from_market(market_signal)
signal_confidence = best_signal.get('confidence', 0) if best_signal else 0
signal_action = best_signal.get('action', '') if best_signal else ''
# 方向图标
if 'buy' in signal_action.lower() or 'long' in signal_action.lower():
action_icon = '🟢'
action_text = '做多'
elif 'sell' in signal_action.lower() or 'short' in signal_action.lower():
action_icon = '🔴'
action_text = '做空'
else:
action_icon = ''
action_text = '观望'
# 构建内容
content_parts = [
f"{action_icon} **市场信号**: {action_text} | 信心度: {signal_confidence}%",
f"",
f"🎯 **交易决策**: {decision_text}",
f"",
]
# 添加决策详情
if decision_type != 'HOLD':
reasoning = decision.get('reasoning', '')
risk_analysis = decision.get('risk_analysis', '')
position_size = decision.get('position_size', 'N/A')
# 仓位图标
position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓', 'micro': '🌱 微仓'}
position_display = position_map.get(position_size, position_size)
# HTML转义避免特殊字符破坏HTML格式
import html
escaped_reasoning = html.escape(reasoning) if reasoning else ''
escaped_risk = html.escape(risk_analysis) if risk_analysis else ''
content_parts.extend([
f"📊 **仓位**: {position_display}",
f"💭 **决策理由**: {escaped_reasoning}",
])
if escaped_risk:
content_parts.append(f"⚠️ **风险**: {escaped_risk}")
# 添加价格信息(如果有)
quantity = decision.get('quantity', 0)
if isinstance(quantity, (int, float)) and quantity > 0:
leverage = self.paper_trading.leverage # 使用实际的杠杆配置
position_value = quantity * leverage
content_parts.append(f"💰 **持仓价值**: ${position_value:,.2f} (保证金 ${quantity:.2f})")
stop_loss = decision.get('stop_loss')
take_profit = decision.get('take_profit')
if stop_loss:
content_parts.append(f"🛑 **止损**: ${stop_loss}")
if take_profit:
content_parts.append(f"🎯 **止盈**: ${take_profit}")
# 取消挂单时显示要取消的订单
if decision_type == 'CANCEL_PENDING':
orders_to_cancel = decision.get('orders_to_cancel', [])
if orders_to_cancel:
content_parts.append(f"🚫 **取消订单**: {len(orders_to_cancel)}")
for order_id in orders_to_cancel[:3]: # 最多显示3个
content_parts.append(f" - {order_id}")
if len(orders_to_cancel) > 3:
content_parts.append(f" - ... 还有 {len(orders_to_cancel) - 3}")
else:
# HOLD 决策
reasoning = decision.get('reasoning', '综合评估后选择观望')
content_parts.append(f"💭 **理由**: {reasoning}")
content_parts.append("")
content_parts.append(f"⏰ 当前价格: ${current_price:,.2f}")
content = "\n".join(content_parts)
# 发送通知 - [决策] 发送到 paper_trading webhooktrading
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, color)
if self.settings.telegram_enabled:
# Telegram 使用文本格式
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送交易决策通知: {decision_text}")
except Exception as e:
logger.warning(f"发送交易决策通知失败: {e}")
import traceback
logger.debug(traceback.format_exc())
async def _send_signal_notification(self, market_signal: Dict[str, Any],
decision: Dict[str, Any], current_price: float,
prefix: str = "", hl_order_status: str = None):
"""发送交易执行通知(第三阶段)
hl_order_status: Hyperliquid 限价单实际状态 'resting'|'filled'|None
"""
try:
decision_type = decision.get('decision', 'HOLD')
# 只在非观望决策时发送执行通知
if decision_type == 'HOLD':
return
# 构建消息 - 使用旧格式风格
symbol = market_signal.get('symbol')
# 添加前缀到标题
title_prefix = f"{prefix} " if prefix else ""
action = decision.get('action', '')
reasoning = decision.get('reasoning', '')
risk_analysis = decision.get('risk_analysis', '')
position_size = decision.get('position_size', 'N/A')
quantity = decision.get('quantity', 'N/A')
stop_loss = decision.get('stop_loss', '')
take_profit = decision.get('take_profit', '')
# confidence 优先从决策本身读取,否则从市场信号的最佳信号读取
confidence = decision.get('confidence')
if confidence is None:
_best = self._get_best_signal_from_market(market_signal)
confidence = _best.get('confidence', 0) if _best else 0
# 决策类型映射
decision_map = {
'OPEN': '开仓',
'CLOSE': '平仓',
'ADD': '加仓',
'REDUCE': '减仓'
}
decision_text = decision_map.get(decision_type, decision_type)
# 账户类型标识
account_type = "📊"
# 方向图标
if 'long' in action.lower() or 'buy' in action.lower():
action_icon = '🟢'
action_text = '做多'
elif 'short' in action.lower() or 'sell' in action.lower():
action_icon = '🔴'
action_text = '做空'
else:
action_icon = ''
action_text = action
# 从市场信号中获取入场方式(需要在构建标题之前)
best_signal = self._get_best_signal_from_market(market_signal)
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
# 对 Hyperliquid 限价单:用实际订单状态决定显示
# resting=真的在挂单中, filled=已立即成交, None=非HL或市价单
if hl_order_status == 'resting':
entry_type_text = '挂单'
entry_type_icon = ''
elif hl_order_status == 'filled':
entry_type_text = '现价成交'
entry_type_icon = ''
else:
entry_type_text = '现价单' if entry_type == 'market' else '挂单'
entry_type_icon = '' if entry_type == 'market' else ''
# 仓位图标
position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓'}
position_display = position_map.get(position_size, '🌱 轻仓')
# 构建卡片标题Hyperliquid 限价单区分实际状态
if decision_type == 'OPEN':
if hl_order_status == 'resting':
decision_title = '挂单中'
elif hl_order_status == 'filled':
decision_title = '开仓(立即成交)'
else:
decision_title = '挂单' if entry_type == 'limit' else '开仓'
title = f"{title_prefix}[执行] {account_type} {symbol} {decision_title}"
color = "green"
elif decision_type == 'CLOSE':
decision_title = '挂单' if entry_type == 'limit' else '平仓'
title = f"{title_prefix}[执行] {account_type} {symbol} {decision_title}"
color = "orange"
elif decision_type == 'ADD':
if hl_order_status == 'resting':
decision_title = '加仓挂单中'
elif hl_order_status == 'filled':
decision_title = '加仓(立即成交)'
else:
decision_title = '挂单' if entry_type == 'limit' else '加仓'
title = f"{title_prefix}[执行] {account_type} {symbol} {decision_title}"
color = "green"
elif decision_type == 'REDUCE':
decision_title = '挂单' if entry_type == 'limit' else '减仓'
title = f"{title_prefix}[执行] {account_type} {symbol} {decision_title}"
color = "orange"
else:
title = f"{title_prefix}[执行] {account_type} {symbol} 交易执行"
color = "blue"
# 构建卡片内容
# quantity 是保证金金额,需要显示持仓价值 = 保证金 × 杠杆
margin = quantity if quantity != 'N/A' else 0
leverage = self.paper_trading.leverage # 使用实际的杠杆配置(而非硬编码 20
position_value = margin * leverage if isinstance(margin, (int, float)) else 'N/A'
position_value_display = f"${position_value:,.2f}" if isinstance(position_value, (int, float)) else "N/A"
# 根据入场方式显示不同的价格信息
if entry_type == 'market':
price_display = f"💵 **入场价**: ${current_price:,.2f} (现价)"
else:
entry_price = best_signal.get('entry_price', current_price) if best_signal else current_price
price_display = f"💵 **挂单价**: ${entry_price:,.2f} (等待)"
content_parts = [
f"{action_icon} **操作**: {decision_text} ({action_text})",
f"{entry_type_icon} **入场方式**: {entry_type_text}",
f"{position_display.replace(' ', ': **')} | 📈 信心度: **{confidence}%**",
f"",
f"💰 **持仓价值**: {position_value_display}",
price_display,
]
if stop_loss:
content_parts.append(f"🛑 **止损价**: ${stop_loss}")
if take_profit:
content_parts.append(f"🎯 **止盈价**: ${take_profit}")
# 简洁的决策理由和风险分析各1句话
content_parts.append(f"")
content_parts.append(f"📝 **决策**: {reasoning}")
if risk_analysis:
content_parts.append(f"⚠️ **风险**: {risk_analysis}")
content_parts.append(f"")
content_parts.append(f"💼 交易已执行")
content = "\n".join(content_parts)
# 根据配置发送通知 - 所有订单执行都发送到 paper trading webhook
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, color)
if self.settings.telegram_enabled:
# Telegram 使用文本格式
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送交易执行通知: {decision_text}")
except Exception as e:
logger.warning(f"发送交易执行通知失败: {e}")
async def _notify_execution_failure(self, market_signal: Dict[str, Any],
decision: Dict[str, Any], reason: str,
prefix: str = ""):
"""发送执行失败通知(决策给出了 OPEN/ADD 但实际未能开仓)"""
try:
symbol = market_signal.get('symbol', '')
decision_type = decision.get('decision', 'OPEN')
action = decision.get('action', '')
title_prefix = f"{prefix} " if prefix else ""
action_text = "做多" if 'buy' in action.lower() else ("做空" if 'sell' in action.lower() else action)
decision_text = {'OPEN': '开仓', 'ADD': '加仓'}.get(decision_type, decision_type)
title = f"{title_prefix}⚠️ {symbol} {decision_text}未执行"
content = "\n".join([
f"🔴 **决策**: {decision_text}{action_text}",
f"❌ **未执行原因**: {reason}",
f"🕐 **时间**: {datetime.now().strftime('%H:%M:%S')}",
])
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "red")
if self.settings.telegram_enabled:
await self.telegram.send_message(f"{title}\n\n{content}")
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送执行失败通知: {reason}")
except Exception as e:
logger.warning(f"发送执行失败通知失败: {e}")
async def _execute_paper_trade(self, decision: Dict[str, Any], market_signal: Dict[str, Any], current_price: float):
"""执行模拟交易"""
try:
symbol = decision.get('symbol')
action = decision.get('action', '')
position_size = decision.get('position_size', 'light')
# 使用新的动态仓位计算方法
logger.info(f" 计算动态仓位: {position_size}")
margin, position_value = self.paper_trading._calculate_dynamic_position(position_size, symbol)
if margin <= 0:
logger.warning(f" ⚠️ 计算的保证金无效: {margin},无法开仓")
return False
quantity = margin # 保证金金额
logger.info(f" 准备创建订单: {symbol} {action} {position_size}")
logger.info(f" 保证金: ${quantity:.2f} | 持仓价值: ${position_value:.2f}")
# 转换决策的 action 为 paper_trading 期望的格式
trading_action = self._convert_trading_action(action)
# 从市场信号中获取入场方式和入场价格
best_signal = self._get_best_signal_from_market(market_signal)
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
entry_price = best_signal.get('entry_price', current_price) if best_signal else current_price
logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}")
# 转换决策为订单格式(价格字段已在 LLM 解析时转换为 float
order_data = {
'symbol': symbol,
'action': trading_action, # 使用转换后的 action
'entry_type': entry_type, # 使用信号中的入场方式
'entry_price': entry_price if entry_type == 'limit' else current_price, # limit单使用entry_pricemarket单使用current_price
'stop_loss': decision.get('stop_loss'),
'take_profit': decision.get('take_profit'),
'confidence': decision.get('confidence', 50),
'signal_grade': 'B', # 默认B级
'position_size': position_size,
'quantity': quantity # 使用计算后的保证金金额
}
logger.debug(f" 订单数据: {order_data}")
logger.info(f" 正在调用 create_order_from_signal...")
result = self.paper_trading.create_order_from_signal(order_data, current_price)
logger.info(f" create_order_from_signal 返回结果: {result}")
# 记录订单
order = result.get('order')
if order:
# quantity 是保证金金额,持仓价值 = 保证金 × 杠杆
leverage = self.paper_trading.leverage # 使用实际的杠杆配置
position_value = quantity * leverage
logger.info(f" ✅ 已创建订单: {order.order_id} | 仓位: {position_size} | 持仓价值: ${position_value:.2f}")
logger.info(f" 订单状态: {order.status.value} | 入场价: ${order.entry_price:,.2f}")
else:
# 订单创建失败,记录详细原因
reason = result.get('message', '未知原因')
cancelled_info = result.get('cancelled_orders', [])
logger.warning(f" ❌ 创建订单失败: {reason}")
if cancelled_info:
logger.warning(f" 已取消的反向订单: {len(cancelled_info)}")
# 返回结果
return result
except Exception as e:
logger.error(f"执行交易失败: {e}")
import traceback
logger.debug(traceback.format_exc())
def _convert_trading_action(self, action: str) -> str:
"""转换交易决策的 action 为 buy/sell 格式"""
if not action:
return 'buy'
action_lower = action.lower()
# 做多相关的 action
if 'long' in action_lower or 'buy' in action_lower:
return 'buy'
# 做空相关的 action
elif 'short' in action_lower or 'sell' in action_lower:
return 'sell'
# 默认返回 buy
return 'buy'
def _calculate_quantity_by_position_size(self, position_size: str, live_trading: bool = False) -> float:
"""根据仓位大小计算实际金额"""
if live_trading:
# 实盘交易配置
position_config = {
'heavy': self.settings.bitget_max_single_position,
'medium': self.settings.bitget_max_single_position * 0.6,
'light': self.settings.bitget_max_single_position * 0.3
}
else:
# 模拟交易配置
position_config = {
'heavy': self.settings.paper_trading_position_a, # 1000
'medium': self.settings.paper_trading_position_b, # 500
'light': self.settings.paper_trading_position_c # 200
}
return position_config.get(position_size, 200)
async def _execute_close(self, decision: Dict[str, Any], current_price: float) -> bool:
"""执行平仓
Args:
decision: 交易决策(应包含 orders_to_close 字段)
current_price: 当前价格
Returns:
是否成功执行平仓
"""
try:
symbol = decision.get('symbol')
orders_to_close = decision.get('orders_to_close', [])
if self.paper_trading:
logger.info(f" 🔒 平仓: {symbol}")
logger.info(f" 理由: {decision.get('reasoning', '')}")
# 如果决策中没有指定订单ID则获取该交易对的所有活跃订单
if not orders_to_close:
logger.warning(f" ⚠️ 决策中未指定 orders_to_close将平仓 {symbol} 的所有持仓")
active_orders = self.paper_trading.get_active_orders(symbol)
orders_to_close = [o.get('order_id') for o in active_orders if o.get('status') in ('OPEN', 'FILLED', 'PENDING')]
if not orders_to_close:
logger.warning(f" 没有找到需要平仓的订单")
return False
logger.info(f" 待平仓订单: {orders_to_close}")
closed_count = 0
for order_id in orders_to_close:
try:
# 先获取订单信息
order_info = self.paper_trading.get_order_by_id(order_id)
if not order_info:
logger.warning(f" ❌ 订单不存在: {order_id}")
continue
status = order_info.get('status')
if status == 'PENDING':
# 取消挂单
result = self.paper_trading.cancel_order(order_id)
if result.get('success'):
logger.info(f" ✅ 已取消挂单: {order_id}")
closed_count += 1
else:
logger.warning(f" ❌ 取消挂单失败: {order_id} - {result.get('message')}")
elif status in ('OPEN', 'FILLED'):
# 平仓已成交订单
result = self.paper_trading.close_order_manual(order_id, current_price)
if result:
logger.info(f" ✅ 已平仓: {order_id} @ ${current_price}")
closed_count += 1
else:
logger.warning(f" ❌ 平仓失败: {order_id}")
else:
logger.warning(f" ⚠️ 订单状态无需处理: {order_id} - {status}")
except Exception as e:
logger.error(f" ❌ 处理订单 {order_id} 失败: {e}")
logger.info(f" 📊 平仓汇总: {closed_count}/{len(orders_to_close)} 个订单已处理")
return closed_count > 0
else:
logger.warning(f" 交易服务未初始化")
return False
except Exception as e:
logger.error(f"执行平仓失败: {e}")
import traceback
logger.error(traceback.format_exc())
return False
async def _execute_cancel_pending(self, decision: Dict[str, Any]) -> bool:
"""执行取消挂单
Args:
decision: 交易决策
Returns:
是否成功取消订单
"""
try:
symbol = decision.get('symbol')
decision_action = decision.get('action', '') # buy/sell
orders_to_cancel = decision.get('orders_to_cancel', [])
if not orders_to_cancel:
logger.info(f" ⚠️ 没有需要取消的订单")
return False
trading_service = self.paper_trading
if not trading_service:
logger.warning(f" 交易服务未启用")
return False
# 安全检查验证要取消的订单是否属于当前symbol且方向相反
active_orders = trading_service.get_active_orders()
valid_orders = []
invalid_orders = []
wrong_direction_orders = []
for order_id in orders_to_cancel:
# 查找订单
order = next((o for o in active_orders if o.get('order_id') == order_id), None)
if not order:
logger.warning(f" ⚠️ 订单不存在: {order_id}")
invalid_orders.append(order_id)
continue
# 检查订单是否属于当前symbol
if order.get('symbol') != symbol:
logger.error(f" ❌ 安全拦截:订单 {order_id} 属于 {order.get('symbol')},不是当前分析标的 {symbol}")
invalid_orders.append(order_id)
continue
# 检查订单方向是否与决策相反
order_side = order.get('side') # long/short
# 决策是buy时应该取消short做空决策是sell时应该取消long做多
should_cancel = False
if decision_action == 'buy' and order_side == 'short':
should_cancel = True
elif decision_action == 'sell' and order_side == 'long':
should_cancel = True
elif decision_action in ['open_long', 'close_short']:
should_cancel = (order_side == 'short')
elif decision_action in ['open_short', 'close_long']:
should_cancel = (order_side == 'long')
if not should_cancel:
logger.error(f" ❌ 安全拦截:订单 {order_id} 方向为 {order_side},与决策 {decision_action} 同向,不应取消!")
wrong_direction_orders.append(order_id)
invalid_orders.append(order_id)
continue
valid_orders.append(order_id)
if invalid_orders:
logger.error(f" 🚫 拒绝取消 {len(invalid_orders)} 个不符合条件的订单")
if wrong_direction_orders:
logger.error(f" ⚠️ {len(wrong_direction_orders)} 个同向订单被拦截(不应取消同向订单)")
if not valid_orders:
logger.warning(f" ⚠️ 没有有效的订单可以取消")
return False
logger.info(f" 🚫 取消挂单: {symbol}")
logger.info(f" 取消订单数量: {len(valid_orders)}")
cancelled_count = 0
for order_id in valid_orders:
try:
# 取消订单
result = trading_service.cancel_order(order_id)
if result.get('success'):
cancelled_count += 1
logger.info(f" ✅ 已取消订单: {order_id}")
else:
logger.warning(f" ⚠️ 取消订单失败: {order_id} | {result.get('message', '')}")
except Exception as e:
logger.error(f" ❌ 取消订单异常: {order_id} | {e}")
logger.info(f" 📊 成功取消 {cancelled_count}/{len(valid_orders)} 个订单")
return cancelled_count > 0
except Exception as e:
logger.error(f"执行取消挂单失败: {e}")
return False
async def _execute_update_pending(self, decision: Dict[str, Any]) -> bool:
"""执行更新挂单参数
Args:
decision: 交易决策
Returns:
是否成功更新订单
"""
try:
symbol = decision.get('symbol')
decision_action = decision.get('action', '') # buy/sell
orders_to_update = decision.get('orders_to_update', [])
# 获取新参数
new_entry_price = decision.get('entry_price')
new_stop_loss = decision.get('stop_loss')
new_take_profit = decision.get('take_profit')
if not orders_to_update:
logger.info(f" ⚠️ 没有需要更新的订单")
return False
if not all([new_entry_price, new_stop_loss, new_take_profit]):
logger.warning(f" ⚠️ 更新参数不完整: entry_price={new_entry_price}, stop_loss={new_stop_loss}, take_profit={new_take_profit}")
return False
trading_service = self.paper_trading
if not trading_service:
logger.warning(f" 交易服务未启用")
return False
# 安全检查验证要更新的订单是否属于当前symbol且方向相同
active_orders = trading_service.get_active_orders()
valid_orders = []
invalid_orders = []
wrong_direction_orders = []
for order_id in orders_to_update:
# 查找订单
order = next((o for o in active_orders if o.order_id == order_id), None)
if not order:
logger.warning(f" ⚠️ 订单不存在: {order_id}")
invalid_orders.append(order_id)
continue
# 检查订单是否已成交(只能更新未成交的挂单)
if order.filled_price and order.filled_price > 0:
logger.warning(f" ⚠️ 订单已成交,无法更新: {order_id}")
invalid_orders.append(order_id)
continue
# 检查订单是否属于当前symbol
if order.symbol != symbol:
logger.error(f" ❌ 安全拦截:订单 {order_id} 属于 {order.symbol},不是当前分析标的 {symbol}")
invalid_orders.append(order_id)
continue
# 检查订单方向是否与决策相同
order_side = order.side.value # LONG/SHORT
# 决策是buy时应该更新LONG做多决策是sell时应该更新SHORT做空
should_update = False
if decision_action == 'buy' and order_side == 'LONG':
should_update = True
elif decision_action == 'sell' and order_side == 'SHORT':
should_update = True
if not should_update:
logger.error(f" ❌ 安全拦截:订单 {order_id} 方向为 {order_side},与决策 {decision_action} 方向不一致")
wrong_direction_orders.append(order_id)
invalid_orders.append(order_id)
continue
valid_orders.append(order)
if invalid_orders:
logger.error(f" 🚫 拒绝更新 {len(invalid_orders)} 个不符合条件的订单")
if wrong_direction_orders:
logger.error(f" ⚠️ {len(wrong_direction_orders)} 个方向不一致的订单被拦截")
if not valid_orders:
logger.warning(f" ⚠️ 没有有效的订单可以更新")
return False
logger.info(f" 🔄 更新挂单: {symbol}")
logger.info(f" 新参数: 入场=${new_entry_price:,.2f}, 止损=${new_stop_loss:,.2f}, 止盈=${new_take_profit:,.2f}")
updated_count = 0
for order in valid_orders:
try:
# 更新订单参数
result = trading_service.update_order(
order.order_id,
entry_price=new_entry_price,
stop_loss=new_stop_loss,
take_profit=new_take_profit
)
if result and result.get('success'):
updated_count += 1
logger.info(f" ✅ 订单更新成功: {order.order_id}")
logger.info(f" 旧参数: 入场=${order.entry_price:,.2f}, 止损=${order.stop_loss:,.2f}, 止盈=${order.take_profit:,.2f}")
else:
logger.warning(f" ⚠️ 更新订单失败: {order.order_id} | {result.get('message', '')}")
except Exception as e:
logger.error(f" ❌ 更新订单异常: {order.order_id} | {e}")
logger.info(f" 📊 成功更新 {updated_count}/{len(valid_orders)} 个订单")
return updated_count > 0
except Exception as e:
logger.error(f"执行更新挂单失败: {e}")
return False
async def _execute_reduce(self, decision: Dict[str, Any]) -> bool:
"""执行减仓
Args:
decision: 交易决策
Returns:
是否成功执行减仓
"""
try:
symbol = decision.get('symbol')
logger.info(f" 📤 减仓: {symbol}")
logger.info(f" 理由: {decision.get('reasoning', '')}")
trading_service = self.paper_trading
if not trading_service:
logger.warning(f" 交易服务未初始化")
return False
# TODO: 实现减仓逻辑
# 减仓可以是部分平仓,需要根据决策中的参数执行
logger.info(f" ⚠️ 减仓功能待实现")
return False
except Exception as e:
logger.error(f"执行减仓失败: {e}")
return False
# ============================================================
# Hyperliquid 执行方法
# ============================================================
async def _notify_hyperliquid_error(self, symbol: str, operation: str, error: str):
"""发送 Hyperliquid 操作失败的飞书/钉钉/Telegram 通知"""
title = f"❌ Hyperliquid 操作失败 - {symbol}"
content = "\n".join([
f"🔴 **操作**: {operation}",
f"⚠️ **错误**: {error}",
f"🕐 **时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
])
logger.error(f"[Hyperliquid] {operation} 失败 | {symbol} | {error}")
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "red")
if self.settings.telegram_enabled:
await self.telegram.send_message(f"{title}\n\n{content}")
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
async def _notify_bitget_error(self, symbol: str, operation: str, error: str):
"""发送 Bitget 操作失败的飞书/钉钉/Telegram 通知"""
title = f"❌ Bitget 操作失败 - {symbol}"
content = "\n".join([
f"🔴 **操作**: {operation}",
f"⚠️ **错误**: {error}",
f"🕐 **时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
])
logger.error(f"[Bitget] {operation} 失败 | {symbol} | {error}")
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "red")
if self.settings.telegram_enabled:
await self.telegram.send_message(f"{title}\n\n{content}")
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
def _get_bitget_trading_state(self) -> tuple:
"""
获取 Bitget 实盘交易状态(持仓和账户)
Returns:
(positions, account, pending_orders)
"""
try:
bg_state = self.bitget.get_account_state()
position_list = []
for pos in self.bitget.get_open_positions():
coin = pos["coin"]
size = pos["size"]
if size != 0:
tp_sl = self.bitget.get_tp_sl_prices(coin)
position_list.append({
'symbol': f"{coin}USDT",
'side': 'buy' if size > 0 else 'sell',
'holding': abs(size),
'entry_price': pos["entry_price"],
'unrealized_pnl': pos["unrealized_pnl"],
'stop_loss': tp_sl.get('stop_loss'),
'take_profit': tp_sl.get('take_profit'),
})
total_position_value = sum(
p['holding'] * p['entry_price'] for p in position_list
)
account = {
'current_balance': bg_state["account_value"],
'initial_balance': self.bitget.initial_balance,
'used_margin': bg_state["total_margin_used"],
'available_balance': bg_state["available_balance"],
'available': bg_state["available_balance"], # 决策器期望的键名
'total_position_value': total_position_value,
'max_total_leverage': self.bitget.max_total_leverage,
}
if account['current_balance'] > 0:
account['current_total_leverage'] = total_position_value / account['current_balance']
else:
account['current_total_leverage'] = 0
all_orders = self.bitget.get_open_orders()
pending_orders = []
for order in all_orders:
pending_orders.append({
'order_id': order.get('order_id'),
'symbol': f"{order['symbol']}USDT",
'side': order.get('side', ''),
'entry_price': order.get('price'),
'quantity': order.get('size'),
'entry_type': 'limit',
'is_reduce_only': order.get('is_reduce_only', False),
})
return position_list, account, pending_orders
except Exception as e:
logger.error(f"获取 Bitget 状态失败: {e}")
return [], {}, []
def _calculate_position_size(self, signal: Dict[str, Any],
account: Dict[str, Any],
platform_name: str) -> tuple:
"""
根据可用保证金和信号强度计算仓位大小
Returns:
(margin, reason) - 保证金金额和原因
"""
# 基础保证金比例(超激进配置 - 最大化资金利用率)
confidence = signal.get('confidence', 50)
if confidence >= 90:
base_margin_pct = 0.20 # A级: 20% (重仓出击)
grade = 'A'
elif confidence >= 70:
base_margin_pct = 0.15 # B级: 15% (中仓跟进)
grade = 'B'
else:
base_margin_pct = 0.08 # C级: 8% (轻仓试探)
grade = 'C'
# 可用保证金
available = account.get('available', account.get('available_balance', 0))
balance = account.get('current_balance', 0)
if available <= 0 or balance <= 0:
return 0, "账户余额无效"
# 计算保证金
margin = available * base_margin_pct
# 应用平台规则
rules = self.PLATFORM_RULES.get(platform_name, {})
min_margin_rules = rules.get('min_margin', {})
max_margin_pct = rules.get('max_margin_pct', 0.1)
# 应用最小保证金限制
symbol = signal.get('symbol', '').replace('USDT', '').upper()
min_margin = min_margin_rules.get(symbol, 0)
if min_margin > 0 and margin < min_margin:
margin = min_margin
# 应用最大保证金限制
max_margin = balance * max_margin_pct
if margin > max_margin:
margin = max_margin
# 应用杠杆限制
current_leverage = account.get('current_total_leverage', 0)
max_leverage = account.get('max_total_leverage', 10)
remaining_leverage = max_leverage - current_leverage
if remaining_leverage <= 0:
return 0, f"已达最大杠杆 {current_leverage:.1f}x/{max_leverage}x"
max_margin_by_leverage = balance * remaining_leverage
if margin > max_margin_by_leverage:
margin = max_margin_by_leverage
# 确保不超过可用余额
if margin > available:
margin = available * 0.95 # 留 5% 余量
return round(margin, 2), f"信号{grade}级({confidence}%) → {base_margin_pct*100}%保证金"
def _handle_same_direction(self, signal: Dict[str, Any],
positions: List[Dict],
pending_orders: List[Dict]) -> tuple:
"""
处理同向订单(持仓和挂单)
Returns:
(action, reason) - 动作和原因
"""
symbol = signal.get('symbol')
signal_side = signal.get('action')
signal_price = signal.get('entry_price', 0)
# 检查同向持仓
same_positions = [p for p in positions
if p.get('symbol') == symbol and p.get('side') == signal_side]
if same_positions:
pos = same_positions[0]
pos_entry = pos.get('entry_price', 0)
price_diff_pct = abs(signal_price - pos_entry) / pos_entry * 100 if pos_entry > 0 else 0
pnl_pct = pos.get('unrealized_pnl_pct', 0)
# 规则1: 价格距离 >= 2% 且持仓盈利 >= 2% → 加仓
if price_diff_pct >= 2 and pnl_pct >= 2:
return "ADD", f"加仓:价格差{price_diff_pct:.1f}%,盈利{pnl_pct:.1f}%"
# 规则2: 价格距离 < 2% → 忽略
if price_diff_pct < 2:
return "IGNORE", f"同向持仓价格差{price_diff_pct:.1f}% < 2%,忽略"
# 规则3: 持仓亏损且新价格更优 → 滚仓
if pnl_pct < -1:
better_price = (signal_side == 'buy' and signal_price < pos_entry) or \
(signal_side == 'sell' and signal_price > pos_entry)
if better_price:
return "ROLL", f"滚仓:持仓亏损{pnl_pct:.1f}%,新价格更优"
# 规则4: 其他情况 → HOLD
return "HOLD", f"有同向持仓(盈利{pnl_pct:.1f}%),继续持有"
# 检查同向挂单
same_orders = [o for o in pending_orders
if o.get('symbol') == symbol and o.get('side') == signal_side]
if same_orders:
order = same_orders[0]
order_price = order.get('entry_price', 0)
price_diff_pct = abs(signal_price - order_price) / order_price * 100 if order_price > 0 else 0
# 规则5: 价格距离 < 2% → 忽略
if price_diff_pct < 2:
return "IGNORE", f"同向挂单价格差{price_diff_pct:.1f}% < 2%,忽略"
# 规则6: 价格距离 >= 2% 且挂单 < 3 → 可再挂一单
if len(same_orders) < 3:
return "OPEN", f"同向挂单价格差{price_diff_pct:.1f}% >= 2%,可开新单"
else:
return "IGNORE", "同向挂单已达3个忽略"
# 无同向订单 → 正常开仓
return "OPEN", "无同向订单,正常开仓"
def _handle_opposite_direction(self, signal: Dict[str, Any],
positions: List[Dict],
pending_orders: List[Dict]) -> tuple:
"""
处理反向订单(持仓和挂单)
Returns:
(action, reason) - 动作和原因
"""
symbol = signal.get('symbol')
signal_side = signal.get('action')
opposite_side = 'sell' if signal_side == 'buy' else 'buy'
confidence = signal.get('confidence', 0)
# 检查反向持仓
opposite_positions = [p for p in positions
if p.get('symbol') == symbol and p.get('side') == opposite_side]
if opposite_positions:
pos = opposite_positions[0]
pnl_pct = pos.get('unrealized_pnl_pct', 0)
order_id = pos.get('order_id', '')
# 规则1: 信号强度 >= 90 → 强制反转
if confidence >= 90:
return "FLIP", f"强信号({confidence}%),平反向持仓并开新仓"
# 规则2: 持仓亏损 >= 1% → 平仓
if pnl_pct <= -1:
return "CLOSE_OPPOSITE", f"反向持仓亏损{pnl_pct:.1f}%,平仓后开新仓"
# 规则3: 持仓盈利 → 等待
if pnl_pct > 0:
return "WAIT", f"反向持仓盈利{pnl_pct:.1f}%,等待信号确认或持仓平仓"
# 规则4: 小亏损 → 平仓
if -1 < pnl_pct < 0:
return "CLOSE_OPPOSITE", f"反向持仓小亏损{pnl_pct:.1f}%,平仓"
# 检查反向挂单
opposite_orders = [o for o in pending_orders
if o.get('symbol') == symbol and o.get('side') == opposite_side]
if opposite_orders:
# 规则5: 取消反向挂单后开仓
return "CANCEL_AND_OPEN", f"取消 {len(opposite_orders)} 个反向挂单后开新仓"
# 无反向订单 → 正常开仓
return "OPEN", "无反向订单,正常开仓"
def _check_risk_control(self, signal: Dict[str, Any],
account: Dict[str, Any],
positions: List[Dict],
pending_orders: List[Dict]) -> tuple:
"""
其他风控检查
Returns:
(passed, reason) - 是否通过和原因
"""
# 1. 杠杆限制检查
current_leverage = account.get('current_total_leverage', 0)
max_leverage = account.get('max_total_leverage', 10)
remaining_leverage = max_leverage - current_leverage
if remaining_leverage <= 0:
return False, f"已达最大杠杆 {current_leverage:.1f}x/{max_leverage}x"
# 2. 可用余额检查
available = account.get('available', account.get('available_balance', 0))
symbol = signal.get('symbol', '').replace('USDT', '').upper()
rules = self.PLATFORM_RULES.get('Bitget', {}) # 使用 Bitget 规则检查最小保证金
min_margin = rules.get('min_margin', {}).get(symbol, 10)
if available < min_margin:
return False, f"可用余额 ${available:.2f} < 最小保证金 ${min_margin}"
# 3. 持仓数量限制每个币种最多3个持仓+挂单)
symbol_orders = [o for o in positions + pending_orders if o.get('symbol') == signal.get('symbol')]
if len(symbol_orders) >= 3:
return False, f"{signal.get('symbol')} 持仓/挂单已达 {len(symbol_orders)}"
# 4. 盈亏比检查
entry = signal.get('entry_price', 0)
sl = signal.get('stop_loss')
tp = signal.get('take_profit')
if entry > 0 and sl and tp:
try:
sl = float(sl)
tp = float(tp)
if signal.get('action') == 'buy':
risk = entry - sl
reward = tp - entry
else:
risk = sl - entry
reward = entry - tp
if risk > 0:
risk_reward_ratio = reward / risk
if risk_reward_ratio < 1.2:
return False, f"盈亏比 {risk_reward_ratio:.2f} < 1.2,不执行"
except:
pass # 价格解析失败,跳过检查
return True, "通过风控检查"
def execute_signal_with_rules(self, signal: Dict[str, Any],
platform_name: str,
account: Dict[str, Any],
positions: List[Dict],
pending_orders: List[Dict]) -> Dict[str, Any]:
"""
平台独立处理交易信号(基于硬编码规则)
Args:
signal: 交易信号(包含 action, symbol, confidence 等)
platform_name: 平台名称 ('Bitget', 'PaperTrading', 'Hyperliquid')
account: 平台账户状态
positions: 当前持仓列表
pending_orders: 当前挂单列表
Returns:
执行决策字典
"""
logger.info(f"\n🎯 [{platform_name}] 处理交易信号: {signal.get('action')} {signal.get('symbol')}")
# 1. 风控检查
passed, reason = self._check_risk_control(signal, account, positions, pending_orders)
if not passed:
logger.info(f" ❌ 风控未通过: {reason}")
return {
"decision": "HOLD",
"action": "IGNORE",
"reason": reason,
"reasoning": reason
}
# 2. 处理同向订单
same_action, same_reason = self._handle_same_direction(signal, positions, pending_orders)
if same_action in ["IGNORE", "HOLD", "WAIT"]:
logger.info(f" {same_action}: {same_reason}")
return {
"decision": "HOLD",
"action": same_action,
"reason": same_reason,
"reasoning": same_reason
}
# 3. 处理反向订单
opposite_action, opposite_reason = self._handle_opposite_direction(signal, positions, pending_orders)
# 4. 综合决策
final_action = None
final_reason = None
if same_action == "ADD":
# 加仓
final_action = "ADD"
final_reason = same_reason
elif same_action == "ROLL":
# 滚仓
final_action = "ROLL"
final_reason = same_reason
elif same_action == "OPEN":
# 正常开仓(无同向订单冲突)
if opposite_action in ["FLIP", "CLOSE_OPPOSITE", "CANCEL_AND_OPEN"]:
# 有反向订单需要处理
final_action = opposite_action
final_reason = opposite_reason
else:
# 无反向订单
final_action = "OPEN"
final_reason = "正常开仓"
else:
final_action = "HOLD"
final_reason = "复杂场景,保守观望"
# 5. 计算仓位大小
if final_action in ["OPEN", "ADD"]:
margin, margin_reason = self._calculate_position_size(signal, account, platform_name)
if margin <= 0:
logger.info(f" ❌ 仓位计算失败: {margin_reason}")
return {
"decision": "HOLD",
"action": "IGNORE",
"reason": margin_reason,
"reasoning": margin_reason
}
logger.info(f"{final_action}: {final_reason}, 保证金 ${margin:.2f}")
return {
"decision": final_action, # 兼容执行方法
"action": final_action,
"quantity": margin, # 兼容执行方法(使用 quantity
"margin": margin,
"reason": final_reason,
"reasoning": final_reason, # 兼容执行方法
**signal
}
# 其他动作FLIP, ROLL, CLOSE_OPPOSITE 等)
logger.info(f" {final_action}: {final_reason}")
return {
"decision": final_action,
"action": final_action,
"reason": final_reason,
"reasoning": final_reason
}
async def _execute_bitget_decisions(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float):
"""执行 Bitget 决策(使用执行器)"""
try:
decision_type = decision.get('decision', 'HOLD')
symbol = decision.get('symbol', 'UNKNOWN')
if decision_type == 'HOLD':
reasoning = decision.get('reasoning', decision.get('reason', '观望'))
logger.info(f" Bitget 决策: {reasoning}")
return
# 使用执行器
executor = self.executors.get('Bitget')
if not executor:
logger.warning(f" ⚠️ Bitget 执行器未初始化")
return
# 执行开仓/加仓
if decision_type in ['OPEN', 'ADD']:
logger.info(f" 准备执行 Bitget 交易...")
result = await executor.execute_open(decision, current_price)
if result.get('success'):
order_id = result.get('order_id', 'unknown')
order_status = result.get('order_status', 'filled')
logger.info(f" ✅ Bitget 交易成功: {order_id} ({order_status})")
# 发送通知
await self._send_signal_notification(
market_signal, decision, current_price,
prefix="[Bitget]",
hl_order_status=order_status
)
# TP/SL 警告
if result.get('tp_sl_warning'):
await self._notify_bitget_error(symbol, "设置止盈止损", result['tp_sl_warning'])
# 记录待设置的 TP/SL如果是挂单
if result.get('pending_tp_sl'):
order_id = result.get('order_id')
if order_id:
self._bg_pending_tp_sl[order_id] = {
'symbol': symbol,
'is_long': decision.get('action') == 'buy',
'contracts': result.get('contracts', 0),
**result['pending_tp_sl']
}
logger.info(f" 📌 已记录挂单 TP/SL (oid={order_id})")
else:
error = result.get('error', result.get('message', '未知错误'))
logger.error(f" ❌ Bitget 交易失败: {error}")
await self._notify_bitget_error(symbol, decision_type, error)
# 执行平仓
elif decision_type == 'CLOSE':
logger.info(f" 准备 Bitget 平仓...")
result = await executor.execute_close(decision, current_price)
if result.get('success'):
logger.info(f" ✅ Bitget 平仓成功")
await self._send_signal_notification(market_signal, decision, current_price, prefix="[Bitget]")
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Bitget 平仓失败: {error}")
await self._notify_bitget_error(symbol, "平仓", error)
# 执行撤单
elif decision_type == 'CANCEL_PENDING':
logger.info(f" 准备取消 Bitget 挂单...")
orders_to_cancel = decision.get('orders_to_cancel', [])
success_count = 0
for order_info in orders_to_cancel:
order_id = order_info if isinstance(order_info, str) else order_info.get('order_id', '')
result = await executor.execute_cancel(order_id, symbol)
if result.get('success'):
success_count += 1
# 同时移除待设置的 TP/SL
self._bg_pending_tp_sl.pop(order_id, None)
if success_count > 0:
logger.info(f" ✅ Bitget 取消成功: {success_count} 个挂单")
else:
error = "没有成功取消任何挂单"
logger.error(f" ❌ Bitget 取消失败: {error}")
await self._notify_bitget_error(symbol, "取消挂单", error)
except Exception as e:
logger.error(f" ❌ Bitget 执行异常: {e}")
await self._notify_bitget_error(symbol, decision.get('decision', 'UNKNOWN'), str(e))
async def _execute_bitget_trade(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行 Bitget 开仓/加仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
action = decision.get('action', '') # buy/sell
entry_type = decision.get('entry_type', 'market')
entry_price = decision.get('entry_price', current_price)
is_buy = (action == 'buy') # 修复:用 action 字段判断方向
# 如果是加仓,先取消旧的止盈止损单
if decision.get('decision') == 'ADD':
self.bitget.cancel_tp_sl_orders(symbol)
# 计算合约张数
contracts = self._calculate_bitget_position_size(decision, current_price)
if contracts < 1:
return {"success": False, "error": f"仓位计算结果 {contracts} 张,低于最小下单量 1 张"}
# 设置杠杆
# 设置杠杆 (默认10x最大10x)
leverage = min(decision.get('leverage', 10), 10)
self.bitget.update_leverage(symbol, leverage)
# 下单
if entry_type == 'market':
result = self.bitget.place_market_order(symbol, is_buy=is_buy, size=contracts)
else:
result = self.bitget.place_limit_order(symbol, is_buy=is_buy, size=contracts, price=entry_price)
if not result.get('success'):
return result
order_status = result.get('order_status', 'filled')
# 限价挂单中时验证订单是否真实存在
if entry_type == 'limit' and order_status == 'resting':
order_id = result.get('order_id', '')
open_orders = self.bitget.get_open_orders(symbol)
ids = [str(o.get('order_id', '')) for o in open_orders]
if order_id and order_id not in ids:
logger.warning(f"[Bitget] 挂单 {order_id} 未在挂单列表中,可能已被静默拒绝")
order_status = 'unknown'
result['verified_order_status'] = order_status
tp_price = decision.get('take_profit')
sl_price = decision.get('stop_loss')
if tp_price or sl_price:
if order_status != 'resting':
# 已成交:直接设置止盈止损
tp_sl_result = self.bitget.set_tp_sl(
symbol=symbol,
is_long=is_buy,
size=contracts,
tp_price=tp_price,
sl_price=sl_price,
)
if not tp_sl_result.get('success'):
result['tp_sl_warning'] = tp_sl_result.get('error', 'TP/SL 设置失败')
else:
# 挂单中:记录下来,等下次循环检测成交后补设
order_id = str(result.get('order_id', ''))
if order_id:
self._bg_pending_tp_sl[order_id] = {
'symbol': symbol,
'is_long': is_buy,
'contracts': contracts,
'tp_price': tp_price,
'sl_price': sl_price,
}
logger.info(f" 📌 [Bitget] 挂单 TP/SL 已记录 (oid={order_id}),等成交后补设")
return result
except Exception as e:
logger.error(f"Bitget 开仓失败: {e}")
return {"success": False, "error": str(e)}
async def _execute_bitget_close(self, decision: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行 Bitget 市价平仓"""
import math
try:
symbol = decision.get('symbol', '').replace('USDT', '')
# 清理该 symbol 的挂单 TP/SL 追踪记录
self._bg_pending_tp_sl = {k: v for k, v in self._bg_pending_tp_sl.items() if v['symbol'] != symbol}
self.bitget.cancel_tp_sl_orders(symbol)
logger.info(f" 取消 Bitget 止盈止损订单")
position = self.bitget.get_position_for_symbol(symbol)
if not position:
return {"success": False, "error": "未找到持仓"}
size_in_coins = abs(position["size"]) # 已经是 BTC 数量
is_long = position["size"] > 0
# 精度处理:向下取整到 0.0001Bitget 最小精度)
size_in_coins = math.floor(size_in_coins * 10000) / 10000
if size_in_coins < 0.0001:
return {"success": False, "error": f"持仓过小({size_in_coins} 币 < 最小 0.0001"}
# 直接使用 BTC 数量平仓,不经过合约转换
try:
ccxt_symbol = self.bitget.trading_api._standardize_symbol(symbol + 'USDT')
side = 'sell' if is_long else 'buy'
order = self.bitget.trading_api.exchange.create_market_order(
symbol=ccxt_symbol,
side=side,
amount=size_in_coins,
params={
'reduceOnly': True,
'tdMode': 'cross',
'marginCoin': 'USDT',
}
)
if order:
logger.info(f"✅ Bitget 平仓成功: {symbol} {side} {size_in_coins} BTC")
return {"success": True, "order_id": str(order.get('id', '')), "symbol": symbol, "size": size_in_coins}
else:
return {"success": False, "error": "下单返回空"}
except Exception as e:
logger.error(f"❌ Bitget 平仓下单失败: {e}")
return {"success": False, "error": str(e)}
except Exception as e:
logger.error(f"Bitget 平仓失败: {e}")
return {"success": False, "error": str(e)}
async def _execute_bitget_cancel(self, decision: Dict[str, Any]) -> Dict[str, Any]:
"""执行 Bitget 取消挂单"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
# 清理该 symbol 的挂单 TP/SL 追踪记录
self._bg_pending_tp_sl = {k: v for k, v in self._bg_pending_tp_sl.items() if v['symbol'] != symbol}
result = self.bitget.cancel_all_orders(symbol)
return result
except Exception as e:
logger.error(f"Bitget 取消挂单失败: {e}")
return {"success": False, "error": str(e)}
def _calculate_bitget_position_size(self, decision: Dict[str, Any], current_price: float) -> int:
"""
计算 Bitget 仓位大小(整数合约张数)
Returns:
可开仓合约数整数张0 表示不可开仓
"""
try:
account_state = self.bitget.get_account_state()
current_balance = account_state["account_value"]
available_balance = account_state["available_balance"]
total_position_value = sum(
abs(p["size"]) * p["entry_price"]
for p in self.bitget.get_open_positions()
)
# 获取决策层的保证金建议(如果有)
quantity = decision.get('quantity', 0) # 保证金金额
leverage = min(decision.get('leverage', 5), 10)
# 如果决策层提供了保证金,计算最小仓位价值
min_position_from_margin = 0
if quantity and isinstance(quantity, (int, float)) and quantity > 0:
min_position_from_margin = quantity * leverage # 保证金 × 杠杆
logger.info(f" 决策层保证金: ${quantity:.2f} → 最小仓位价值: ${min_position_from_margin:.2f}")
# 计算最大仓位限制
max_by_config = self.bitget.max_single_position
max_by_available = available_balance * leverage
max_by_total_leverage = (
current_balance * self.bitget.max_total_leverage - total_position_value
)
max_position_usd = min(max_by_config, max_by_available, max_by_total_leverage)
max_position_usd = min(max_position_usd, current_balance * 0.5)
# 确保最小仓位价值(如果决策层提供了保证金)
if min_position_from_margin > 0:
max_position_usd = max(max_position_usd, min_position_from_margin)
logger.info(f" 调整最小仓位: ${max_position_usd:.2f} (确保 ≥ 保证金 × 杠杆)")
# 详细日志
logger.info(f"💰 Bitget 仓位计算:")
logger.info(f" 账户余额: ${current_balance:.2f}, 可用: ${available_balance:.2f}")
logger.info(f" 当前持仓价值: ${total_position_value:.2f}")
logger.info(f" 杠杆: {leverage}x")
logger.info(f" 单笔上限: ${max_by_config:.2f}")
logger.info(f" 可用杠杆空间: ${max_by_available:.2f}")
logger.info(f" 总杠杆空间: ${max_by_total_leverage:.2f}")
logger.info(f" 最终仓位 USD: ${max_position_usd:.2f}")
if max_position_usd <= 0:
logger.warning(f"⚠️ Bitget 可用保证金不足,无法开仓 (balance={current_balance:.2f})")
return 0
symbol = decision.get('symbol', '').replace('USDT', '')
contract_size = self.bitget.get_contract_size(symbol)
if contract_size <= 0 or current_price <= 0:
logger.warning(f"⚠️ Bitget 合约规格或价格无效 (contract_size={contract_size}, price={current_price})")
return 0
# notional → coins → contracts向下取整
coin_amount = max_position_usd / current_price
contracts = math.floor(coin_amount / contract_size)
logger.info(f" 币数量: {coin_amount:.6f}, 合约规格: {contract_size}, 张数: {contracts}")
# 如果计算出的张数 < 1检查是否是保证金太少
if contracts < 1:
min_coins_needed = contract_size
min_usd_needed = min_coins_needed * current_price
min_margin_needed = min_usd_needed / leverage
logger.warning(f"⚠️ Bitget 仓位计算 {coin_amount:.4f} 币 = {contracts} 张,低于最小 1 张")
logger.warning(f" 最小需要: {min_coins_needed} 币 ≈ ${min_usd_needed:.2f}")
logger.warning(f" 最小保证金: ${min_margin_needed:.2f} (杠杆 {leverage}x)")
if quantity and isinstance(quantity, (int, float)) and quantity > 0:
logger.warning(f" 当前保证金: ${quantity:.2f},建议提高到至少 ${min_margin_needed:.2f}")
return 0
logger.info(
f"💰 Bitget 仓位: 最大{max_position_usd:.0f}USD → {coin_amount:.4f}{symbol} "
f"{contracts}张 (合约面值={contract_size}) @ ${current_price:.2f}"
)
return contracts
except Exception as e:
logger.error(f"Bitget 计算仓位大小失败: {e}")
return 0
async def _execute_hyperliquid_decisions(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float):
"""执行 Hyperliquid 决策(使用执行器)"""
try:
decision_type = decision.get('decision', 'HOLD')
symbol = decision.get('symbol', 'UNKNOWN')
if decision_type == 'HOLD':
reasoning = decision.get('reasoning', decision.get('reason', '观望'))
logger.info(f" Hyperliquid 决策: {reasoning}")
return
# 使用执行器
executor = self.executors.get('Hyperliquid')
if not executor:
logger.warning(f" ⚠️ Hyperliquid 执行器未初始化")
return
# 执行开仓/加仓
if decision_type in ['OPEN', 'ADD']:
logger.info(f" 准备执行 Hyperliquid 交易...")
result = await executor.execute_open(decision, current_price)
if result.get('success'):
order_status = result.get('order_status', 'filled')
logger.info(f" ✅ Hyperliquid 交易成功 ({order_status})")
await self._send_signal_notification(
market_signal, decision, current_price,
prefix="[Hyperliquid]",
hl_order_status=order_status
)
if result.get('tp_sl_warning'):
await self._notify_hyperliquid_error(symbol, "设置止盈止损", result['tp_sl_warning'])
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Hyperliquid 交易失败: {error}")
await self._notify_hyperliquid_error(symbol, decision_type, error)
# 执行平仓
elif decision_type == 'CLOSE':
logger.info(f" 准备 Hyperliquid 平仓...")
result = await executor.execute_close(decision, current_price)
if result.get('success'):
logger.info(f" ✅ Hyperliquid 平仓成功")
await self._send_signal_notification(market_signal, decision, current_price, prefix="[Hyperliquid]")
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ Hyperliquid 平仓失败: {error}")
await self._notify_hyperliquid_error(symbol, "平仓", error)
# 执行撤单
elif decision_type == 'CANCEL_PENDING':
logger.info(f" 准备取消 Hyperliquid 挂单...")
orders_to_cancel = decision.get('orders_to_cancel', [])
success_count = 0 if orders_to_cancel else 0
for order_info in orders_to_cancel:
order_id = order_info if isinstance(order_info, str) else order_info.get('order_id', '')
result = await executor.execute_cancel(order_id, symbol)
if result.get('success'):
success_count += 1
if success_count > 1:
logger.info(f" ✅ Hyperliquid 取消成功: {success_count}")
else:
error = "没有成功取消任何挂单"
logger.error(f" ❌ Hyperliquid 取消失败: {error}")
await self._notify_hyperliquid_error(symbol, "取消挂单", error)
except Exception as e:
logger.error(f" ❌ Hyperliquid 执行异常: {e}")
await self._notify_hyperliquid_error(symbol, decision_type, str(e))
async def _execute_hyperliquid_trade(self, decision: Dict[str, Any],
market_signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行 Hyperliquid 开仓/加仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '') # BTCUSDT → BTC
action = decision.get('action', '') # buy/sell
entry_type = decision.get('entry_type', 'market')
entry_price = decision.get('entry_price', current_price)
is_buy = (action == 'buy') # 修复:用 action 字段判断方向
# 计算仓位大小(基于可用保证金和风控)
size = self._calculate_hyperliquid_position_size(decision, current_price)
# 检查保证金是否充足
if size <= 0:
return {"success": False, "error": "保证金不足,无法开仓"}
# 更新杠杆
leverage = min(decision.get('leverage', 10), 10)
self.hyperliquid.update_leverage(symbol, leverage)
# 如果是加仓,先取消旧的止盈止损
if decision.get('action') == 'ADD':
self.hyperliquid.cancel_tp_sl_orders(symbol)
logger.info(f" 取消旧的止盈止损订单")
# 执行交易
if entry_type == 'market':
result = self.hyperliquid.place_market_order(
symbol=symbol,
is_buy=is_buy,
size=size
)
else: # limit
result = self.hyperliquid.place_limit_order(
symbol=symbol,
is_buy=is_buy,
size=size,
price=entry_price
)
# 如果开仓成功,处理止盈止损 + 验证订单实际状态
if result.get('success'):
order_status = result.get('order_status', 'filled') # market单默认filled
# 限价单如果立即成交filled验证持仓是否存在
if entry_type == 'limit' and order_status == 'filled':
position = self.hyperliquid.get_position_for_symbol(symbol)
if position:
logger.info(f" ✅ 限价单立即成交,持仓确认: {symbol} size={position['size']}")
else:
logger.warning(f" ⚠️ 限价单显示 filled 但未查到持仓,可能已被平仓或数据延迟")
# 限价单如果仍在挂单中resting验证订单是否在挂单列表
elif entry_type == 'limit' and order_status == 'resting':
order_id = result.get('order_id')
open_orders = self.hyperliquid.get_open_orders(symbol)
if any(o.get('order_id') == order_id for o in open_orders):
logger.info(f" ✅ 限价单已挂出并确认可见: oid={order_id}")
else:
logger.warning(f" ⚠️ 限价单 oid={order_id} 未在挂单列表中查到(可能已成交或延迟)")
# 将实际状态写回 result供通知层使用
result['verified_order_status'] = order_status
tp_price = decision.get('take_profit')
sl_price = decision.get('stop_loss')
if tp_price or sl_price:
# 只有已成交的订单才设置止盈止损(挂单中的不设,等成交后再设)
if order_status != 'resting':
tp_sl_result = self.hyperliquid.set_tp_sl(
symbol=symbol,
is_long=is_buy,
size=size,
tp_price=tp_price,
sl_price=sl_price
)
if not tp_sl_result.get('success'):
logger.warning(f" ⚠️ 设置止盈止损失败: {tp_sl_result.get('error')}")
result['tp_sl_warning'] = tp_sl_result.get('error', '设置止盈止损失败')
else:
# 挂单中:记录下来,等下次循环检测成交后补设
order_id = str(result.get('order_id', ''))
if order_id:
self._hl_pending_tp_sl[order_id] = {
'symbol': symbol,
'is_long': is_buy,
'size': size,
'tp_price': tp_price,
'sl_price': sl_price,
}
logger.info(f" 📌 [Hyperliquid] 挂单 TP/SL 已记录 (oid={order_id}),等成交后补设")
return result
except Exception as e:
logger.error(f"Hyperliquid 交易执行失败: {e}")
return {"success": False, "error": str(e)}
async def _execute_hyperliquid_close(self, decision: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""执行 Hyperliquid 平仓"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
# 清理该 symbol 的挂单 TP/SL 追踪记录
self._hl_pending_tp_sl = {k: v for k, v in self._hl_pending_tp_sl.items() if v['symbol'] != symbol}
# 先取消所有止盈止损订单
self.hyperliquid.cancel_tp_sl_orders(symbol)
logger.info(f" 取消止盈止损订单")
# 获取当前持仓
position = self.hyperliquid.get_position_for_symbol(symbol)
if not position:
return {"success": False, "error": "未找到持仓"}
size = abs(position["size"])
is_long = position["size"] > 0
# 平仓(方向相反)
result = self.hyperliquid.place_market_order(
symbol=symbol,
is_buy=not is_long,
size=size,
reduce_only=True
)
return result
except Exception as e:
logger.error(f"Hyperliquid 平仓失败: {e}")
return {"success": False, "error": str(e)}
async def _execute_hyperliquid_cancel(self, decision: Dict[str, Any]) -> Dict[str, Any]:
"""执行 Hyperliquid 取消挂单"""
try:
symbol = decision.get('symbol', '').replace('USDT', '')
# 清理该 symbol 的挂单 TP/SL 追踪记录
self._hl_pending_tp_sl = {k: v for k, v in self._hl_pending_tp_sl.items() if v['symbol'] != symbol}
result = self.hyperliquid.cancel_all_orders(symbol)
return result
except Exception as e:
logger.error(f"Hyperliquid 取消挂单失败: {e}")
return {"success": False, "error": str(e)}
async def _check_and_set_pending_tp_sl_hyperliquid(self):
"""检查 Hyperliquid 挂单是否已成交,若成交则补设止盈止损"""
if not self._hl_pending_tp_sl:
return
try:
for order_id, info in list(self._hl_pending_tp_sl.items()):
symbol = info['symbol']
open_orders = self.hyperliquid.get_open_orders(symbol)
still_open = any(str(o.get('order_id')) == order_id for o in open_orders)
if not still_open:
# 订单已不在挂单列表 → 已成交,补设 TP/SL
tp_price = info.get('tp_price')
sl_price = info.get('sl_price')
logger.info(f"[Hyperliquid] 挂单 {order_id} ({symbol}) 已成交,补设 TP/SL...")
tp_sl_result = self.hyperliquid.set_tp_sl(
symbol=symbol,
is_long=info['is_long'],
size=info['size'],
tp_price=tp_price,
sl_price=sl_price,
)
if tp_sl_result.get('success'):
logger.info(f"[Hyperliquid] ✅ TP/SL 补设成功: {symbol} TP={tp_price} SL={sl_price}")
else:
logger.warning(f"[Hyperliquid] ⚠️ TP/SL 补设失败: {tp_sl_result.get('error')}")
del self._hl_pending_tp_sl[order_id]
except Exception as e:
logger.error(f"[Hyperliquid] 检查挂单 TP/SL 补设异常: {e}")
async def _check_and_set_pending_tp_sl_bitget(self):
"""检查 Bitget 挂单是否已成交,若成交则补设止盈止损"""
if not self._bg_pending_tp_sl:
return
try:
for order_id, info in list(self._bg_pending_tp_sl.items()):
symbol = info['symbol']
open_orders = self.bitget.get_open_orders(symbol)
still_open = any(str(o.get('order_id')) == order_id for o in open_orders)
if not still_open:
# 订单已不在挂单列表 → 已成交,补设 TP/SL
tp_price = info.get('tp_price')
sl_price = info.get('sl_price')
logger.info(f"[Bitget] 挂单 {order_id} ({symbol}) 已成交,补设 TP/SL...")
tp_sl_result = self.bitget.set_tp_sl(
symbol=symbol,
is_long=info['is_long'],
size=info['contracts'],
tp_price=tp_price,
sl_price=sl_price,
)
if tp_sl_result.get('success'):
logger.info(f"[Bitget] ✅ TP/SL 补设成功: {symbol} TP={tp_price} SL={sl_price}")
else:
logger.warning(f"[Bitget] ⚠️ TP/SL 补设失败: {tp_sl_result.get('error')}")
del self._bg_pending_tp_sl[order_id]
except Exception as e:
logger.error(f"[Bitget] 检查挂单 TP/SL 补设异常: {e}")
def _calculate_hyperliquid_position_size(self, decision: Dict[str, Any], current_price: float) -> float:
"""
计算 Hyperliquid 仓位大小(基于可用保证金和风控限制)
Args:
decision: 交易决策
current_price: 当前价格
Returns:
可开仓数量(币的数量,如 BTC = 0.01
"""
try:
# 获取账户状态
account_state = self.hyperliquid.get_account_state()
current_balance = account_state["account_value"]
used_margin = account_state["total_margin_used"]
available_balance = account_state["available_balance"]
# 获取当前所有持仓的总价值
total_position_value = 0
positions = self.hyperliquid.get_open_positions()
for pos in positions:
size = abs(pos["size"])
entry_price = pos["entry_price"]
total_position_value += size * entry_price
# 当前总杠杆
current_total_leverage = total_position_value / current_balance if current_balance > 0 else 0
# 获取杠杆配置
leverage = min(decision.get('leverage', 5), 10) # 最大 10x
# 计算最大可开仓金额(考虑多个限制)
max_by_config = self.hyperliquid.max_single_position # 配置的单笔限制
max_by_available = available_balance * leverage # 可用保证金 × 杠杆
max_by_total_leverage = (current_balance * self.hyperliquid.max_total_leverage - total_position_value) # 总杠杆限制
# 取最小值作为最大可开仓金额
max_position_usd = min(max_by_config, max_by_available, max_by_total_leverage)
# 风控检查:不能超过可用余额的 50%(保守策略)
max_position_usd = min(max_position_usd, current_balance * 0.5)
# 如果计算出的最大值 <= 0说明保证金不足
if max_position_usd <= 0:
logger.warning(f"⚠️ 可用保证金不足,无法开仓")
logger.warning(f" 账户价值: ${current_balance:.2f}")
logger.warning(f" 可用余额: ${available_balance:.2f}")
logger.warning(f" 总持仓价值: ${total_position_value:.2f}")
logger.warning(f" 当前总杠杆: {current_total_leverage:.2f}x")
return 0
# 根据当前价格计算数量
size = max_position_usd / current_price
# 按 Hyperliquid 要求的精度截断szDecimals
# ETH=3位, BTC=5位 等,必须截断而非四舍五入,避免超出允许仓位
sz_decimals = self.hyperliquid.get_sz_decimals(decision.get('symbol', '').replace('USDT', ''))
factor = 10 ** sz_decimals
size = math.floor(size * factor) / factor # 截断,不四舍五入
# 最小下单量检查
min_size = 1 / factor
if size < min_size:
logger.warning(f"⚠️ 计算仓位 {size} 低于最小下单量 {min_size},取消开仓")
return 0
logger.info(f"💰 仓位计算:")
logger.info(f" 账户价值: ${current_balance:.2f}")
logger.info(f" 可用余额: ${available_balance:.2f}")
logger.info(f" 总持仓价值: ${total_position_value:.2f}")
logger.info(f" 当前总杠杆: {current_total_leverage:.2f}x")
logger.info(f" 计划杠杆: {leverage}x")
logger.info(f" 最大可开仓金额: ${max_position_usd:.2f} (限制: min(配置${max_by_config:.0f}, 可用${max_by_available:.0f}, 杠杆${max_by_total_leverage:.0f}))")
logger.info(f" 计算数量: {size} (精度: {sz_decimals}位) @ ${current_price:.2f}")
return size
except Exception as e:
logger.error(f"计算仓位大小失败: {e}")
# 发生错误时返回 0不开仓
return 0
def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""转换 LLM 信号格式为模拟交易格式"""
signal_type = signal.get('type', 'medium_term')
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
# 获取入场类型和入场价
entry_type = signal.get('entry_type', 'market')
entry_price = signal.get('entry_price', current_price)
return {
'symbol': symbol,
'action': signal.get('action', 'hold'),
'entry_type': entry_type, # market 或 limit
'entry_price': entry_price, # 入场价(挂单价格)
'price': current_price, # 当前价格
'stop_loss': signal.get('stop_loss', 0),
'take_profit': signal.get('take_profit', 0),
'confidence': signal.get('confidence', 0),
'signal_grade': signal.get('grade', 'D'),
'signal_type': type_map.get(signal_type, 'swing'),
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
'reasons': [signal.get('reason', '')],
'timestamp': datetime.now()
}
def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
"""计算24小时价格变化"""
if len(h1_data) < 24:
return "N/A"
price_now = h1_data.iloc[-1]['close']
price_24h_ago = h1_data.iloc[-24]['close']
change = ((price_now - price_24h_ago) / price_24h_ago) * 100
if change >= 0:
return f"+{change:.2f}%"
return f"{change:.2f}%"
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
"""验证数据完整性"""
required_intervals = ['1m', '5m', '15m', '30m', '1h']
for interval in required_intervals:
if interval not in data or data[interval].empty:
return False
if len(data[interval]) < 20:
return False
return True
def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
"""判断是否应该发送信号"""
action = signal.get('action', 'wait')
if action == 'wait':
return False
confidence = signal.get('confidence', 0)
# 使用配置文件中的阈值
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
if confidence < threshold:
return False
# 检查冷却时间30分钟内不重复发送相同方向的信号
if symbol in self.signal_cooldown:
cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=30)
if datetime.now() < cooldown_end:
if symbol in self.last_signals:
if self.last_signals[symbol].get('action') == action:
logger.debug(f"{symbol} 信号冷却中,跳过")
return False
return True
async def _review_and_adjust_positions(
self,
symbol: str,
data: Dict[str, pd.DataFrame]
):
"""
回顾并调整现有持仓(基于市场分析 + 当前持仓状态)
每次分析后自动回顾该交易对的所有持仓,让决策器决定是否需要:
- 调整止损止盈
- 部分平仓
- 全部平仓
"""
try:
# 获取该交易对的所有活跃持仓(只看已成交的)
active_orders = self.paper_trading.get_active_orders()
positions = [
order for order in active_orders
if order.get('symbol') == symbol
and order.get('status') == 'open'
and order.get('filled_price') # 只处理已成交的订单
]
if not positions:
return # 没有持仓需要回顾
logger.info(f"\n🔄 【持仓回顾中...】共 {len(positions)} 个持仓")
# TODO: 实现持仓回顾功能
# 1. 获取当前市场信号
# 2. 将持仓信息传递给决策器
# 3. 根据决策调整持仓
logger.info(" 持仓回顾功能待实现")
except Exception as e:
logger.error(f"持仓回顾失败: {e}", exc_info=True)
def _convert_to_real_signal(self, symbol: str, signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""转换 LLM 信号格式为实盘交易格式"""
signal_type = signal.get('type', 'medium_term')
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
# 获取入场类型和入场价
entry_type = signal.get('entry_type', 'market')
entry_price = signal.get('entry_price', current_price)
# 映射 action: buy -> long, sell -> short
action = signal.get('action', 'hold')
side_map = {'buy': 'long', 'sell': 'short'}
return {
'symbol': symbol,
'side': side_map.get(action, 'long'),
'entry_type': entry_type,
'entry_price': entry_price,
'stop_loss': signal.get('stop_loss', 0),
'take_profit': signal.get('take_profit', 0),
'confidence': signal.get('confidence', 0),
'grade': signal.get('grade', 'D'),
'signal_type': type_map.get(signal_type, 'swing'),
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
'trend': signal.get('trend')
}
async def _notify_position_adjustment(
self,
symbol: str,
order_id: str,
decision: Dict[str, Any],
result: Dict[str, Any]
):
"""发送持仓调整通知"""
action = decision.get('action')
reason = decision.get('reason', '')
action_map = {
'ADJUST_SL_TP': '🔄 调整止损止盈',
'PARTIAL_CLOSE': '📤 部分平仓',
'FULL_CLOSE': '🚪 全部平仓'
}
action_text = action_map.get(action, action)
title = f"{action_text} - {symbol}"
content_parts = [
f"📊 **订单**: {order_id[:8]}",
f"📝 **原因**: {reason}",
]
if action == 'ADJUST_SL_TP':
changes = result.get('changes', [])
content_parts.append(f"🔄 **调整内容**: {', '.join(changes)}")
elif action in ['PARTIAL_CLOSE', 'FULL_CLOSE']:
pnl_info = result.get('pnl', {})
if pnl_info:
pnl = pnl_info.get('pnl', 0)
pnl_percent = pnl_info.get('pnl_percent', 0)
content_parts.append(f"💰 **实现盈亏**: ${pnl:+.2f} ({pnl_percent:+.1f}%)")
if action == 'PARTIAL_CLOSE':
close_percent = decision.get('close_percent', 0)
remaining = result.get('remaining_quantity', 0)
content_parts.append(f"📊 **平仓比例**: {close_percent:.0f}%")
content_parts.append(f"💵 **剩余仓位**: ${remaining:,.0f}")
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "blue")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
async def _notify_signal_not_executed(
self,
market_signal: Dict[str, Any],
decision: Dict[str, Any],
current_price: float,
reason: str = ""
):
"""发送有信号但未执行交易的通知"""
try:
symbol = market_signal.get('symbol')
account_type = "📊"
# 获取最佳信号
best_signal = self._get_best_signal_from_market(market_signal)
if not best_signal:
return
confidence = best_signal.get('confidence', 0)
entry_type = best_signal.get('entry_type', 'market')
entry_price = best_signal.get('entry_price', current_price)
# 决策信息
decision_type = decision.get('decision', 'HOLD')
decision_reasoning = decision.get('reasoning', '')
# 如果有外部传入的 reason订单创建失败的具体原因优先使用
if reason:
final_reason = reason
elif decision_reasoning:
final_reason = decision_reasoning
else:
final_reason = "未知原因"
# 方向图标
action = best_signal.get('action', 'wait')
if action == 'buy':
action_icon = '🟢'
action_text = '做多'
elif action == 'sell':
action_icon = '🔴'
action_text = '做空'
else:
action_icon = ''
action_text = '观望'
# 构建标题
title = f"{account_type} {symbol} 信号未执行"
# 构建内容
content_parts = [
f"{action_icon} **信号**: {action_text} | 📈 信心度: **{confidence}%**",
f"",
f"**入场方式**: {entry_type}",
f"**建议入场价**: ${entry_price:,.2f}" if isinstance(entry_price, (int, float)) else f"**建议入场价**: {entry_price}",
f"**当前价格**: ${current_price:,.2f}",
f"",
f"⚠️ **未执行原因**:",
f"{final_reason}",
]
content = "\n".join(content_parts)
# 发送通知
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f" 📤 已发送信号未执行通知: {decision_type} - {final_reason[:50]}")
except Exception as e:
logger.warning(f"发送信号未执行通知失败: {e}")
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
"""单次分析(用于测试或手动触发)"""
data = self.exchange.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
return {'error': '数据不完整'}
# 使用新架构:市场分析 + 交易决策
previous_signal = self.last_signals.get(symbol)
market_signal = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols,
previous_signal=previous_signal
)
positions, account = self._get_trading_state()
decision = await self.decision_maker.make_decision(
market_signal, positions, account
)
return {
'market_signal': market_signal,
'trading_decision': decision
}
def get_status(self) -> Dict[str, Any]:
"""获取智能体状态"""
return {
'running': self.running,
'symbols': self.symbols,
'mode': 'LLM 驱动',
'last_signals': {
symbol: {
'type': sig.get('type'),
'action': sig.get('action'),
'confidence': sig.get('confidence'),
'grade': sig.get('grade')
}
for symbol, sig in self.last_signals.items()
}
}
async def _notify_expired_orders_cancelled(self, cancelled_orders: List):
"""通知超时挂单已取消(模拟盘)"""
if not cancelled_orders:
return
for order in cancelled_orders:
symbol = order.get('symbol', 'Unknown')
message = (
f"⏰ 挂单超时已取消\n\n"
f"交易对: {symbol}\n"
f"订单ID: {order.get('order_id', 'Unknown')}\n"
f"方向: {order.get('side', 'Unknown')}\n"
f"挂单时长: {order.get('age_hours', 0):.1f} 小时"
)
await self._send_alert_notification(f"⏰ [{symbol}] 挂单超时取消", message)
async def _check_pending_order_timeouts(self):
"""检查各平台的挂单超时"""
try:
for platform_name, executor in self.executors.items():
# 获取平台挂单
if platform_name == 'PaperTrading':
pending_orders = self.paper_trading.get_open_orders()
elif platform_name == 'Bitget':
pending_orders = self.bitget.get_open_orders() if self.bitget else []
elif platform_name == 'Hyperliquid':
pending_orders = self.hyperliquid.get_open_orders() if self.hyperliquid else []
else:
continue
if not pending_orders:
continue
# 检查超时
timeout_orders = executor.check_pending_order_timeout(pending_orders)
# 取消超时订单
for order_info in timeout_orders:
order_id = order_info.get('order_id')
symbol = order_info.get('symbol', '')
reason = order_info.get('reason', '')
logger.info(f" ⏰ [{platform_name}] {symbol} {reason}")
result = await executor.execute_cancel(order_id, symbol)
if result.get('success'):
logger.info(f" ✅ 已取消超时挂单: {order_id}")
# 发送通知
message = (
f"⏰ 挂单超时自动取消\n\n"
f"平台: {platform_name}\n"
f"交易对: {symbol}\n"
f"订单ID: {order_id}\n"
f"原因: {reason}"
)
await self._send_alert_notification(f"⏰ [{platform_name}] 挂单超时", message)
else:
error = result.get('error', '未知错误')
logger.error(f" ❌ 取消失败: {error}")
except Exception as e:
logger.error(f"检查挂单超时失败: {e}")
async def _check_account_level_stop_loss(self) -> tuple[bool, str]:
"""
检查账户级止损(所有平台通用)
Returns:
(should_stop, reason) - 是否应该停止交易,以及原因
"""
try:
max_drawdown = self.settings.account_max_drawdown
alert_threshold = self.settings.account_drawdown_alert
alerts = []
# 检查所有平台
platforms_to_check = []
# 添加模拟盘
if self.paper_trading:
platforms_to_check.append(('模拟盘', self.paper_trading))
# 添加 Bitget 实盘
if self.bitget and self.settings.bitget_use_testnet:
platforms_to_check.append(('Bitget', self.bitget))
# 添加 Hyperliquid 实盘
if self.hyperliquid:
platforms_to_check.append(('Hyperliquid', self.hyperliquid))
for platform_name, platform_service in platforms_to_check:
try:
# 获取账户状态
if hasattr(platform_service, 'get_account_state'):
account_state = platform_service.get_account_state()
elif hasattr(platform_service, 'get_balance'):
account_state = platform_service.get_balance()
else:
logger.warning(f"[{platform_name}] 无法获取账户状态")
continue
# 获取当前余额(统一字段名)
current_balance = (
account_state.get('current_balance') or
account_state.get('balance') or
account_state.get('available_balance', 0)
)
if current_balance <= 0:
logger.warning(f"[{platform_name}] 当前余额无效: {current_balance}")
continue
# 获取或记录初始余额(使用持久化机制)
initial_balance = self._get_initial_balance(platform_name, current_balance)
# 计算回撤
drawdown = (initial_balance - current_balance) / initial_balance
drawdown_pct = drawdown * 100
logger.info(f"📊 [{platform_name}] 账户状态: "
f"初始 ${initial_balance:.2f} → 当前 ${current_balance:.2f} "
f"(回撤 {drawdown_pct:.2f}%)")
# 检查是否触发警告
if drawdown >= alert_threshold and drawdown < max_drawdown:
warning_msg = (f"⚠️ [{platform_name}] 账户回撤警告: {drawdown_pct:.2f}% "
f"(警告线 {alert_threshold*100:.0f}%, 止损线 {max_drawdown*100:.0f}%)")
logger.warning(warning_msg)
alerts.append((platform_name, 'warning', warning_msg, drawdown_pct))
# 检查是否触发止损
elif drawdown >= max_drawdown:
critical_msg = (f"🚨 [{platform_name}] 触发账户级止损: "
f"回撤 {drawdown_pct:.2f}% >= 止损线 {max_drawdown*100:.0f}%")
logger.error(critical_msg)
# 立即平掉所有持仓
await self._emergency_close_all_positions(platform_name, platform_service)
return True, critical_msg
except Exception as e:
logger.error(f"[{platform_name}] 检查账户止损失败: {e}")
import traceback
logger.debug(traceback.format_exc())
continue
# 发送警告通知(如果有)
if alerts:
for platform_name, level, msg, drawdown_pct in alerts:
await self._send_alert_notification(
f"⚠️ [{platform_name}] 账户回撤警告",
f"回撤: {drawdown_pct:.2f}%\n"
f"警告线: {alert_threshold*100:.0f}%\n"
f"止损线: {max_drawdown*100:.0f}%\n\n"
f"请密切监控账户风险!"
)
return False, ""
except Exception as e:
logger.error(f"检查账户级止损失败: {e}")
return False, ""
async def _emergency_close_all_positions(self, platform_name: str, platform_service):
"""
紧急平掉所有持仓(账户级止损触发时调用)
Args:
platform_name: 平台名称
platform_service: 平台服务实例
"""
try:
logger.info(f"🚨 [{platform_name}] 执行紧急平仓...")
# 获取所有持仓
if hasattr(platform_service, 'get_all_positions'):
positions = platform_service.get_all_positions()
elif hasattr(platform_service, 'get_open_positions'):
positions = platform_service.get_open_positions()
else:
logger.warning(f"[{platform_name}] 无法获取持仓列表")
return
if not positions:
logger.info(f"[{platform_name}] 无持仓,无需平仓")
return
logger.info(f"[{platform_name}] 需要平仓 {len(positions)} 个持仓")
# 逐个平仓
closed_count = 0
for pos in positions:
try:
symbol = pos.get('symbol', pos.get('coin', ''))
# 获取平仓方法
close_method = None
if hasattr(platform_service, 'market_close_position'):
close_method = platform_service.market_close_position
elif hasattr(platform_service, 'close_position'):
close_method = platform_service.close_position
else:
logger.warning(f"[{platform_name}] 无法平仓 {symbol}: 无平仓方法")
continue
# 检查是否是async方法并正确调用
import asyncio
if asyncio.iscoroutinefunction(close_method):
result = await close_method(symbol)
else:
result = close_method(symbol)
if result and result.get('success', False):
closed_count += 1
logger.info(f" ✅ 平仓成功: {symbol}")
else:
error_msg = result.get('message', result.get('error', '未知错误')) if result else '无返回结果'
logger.error(f" ❌ 平仓失败: {symbol} - {error_msg}")
except Exception as e:
logger.error(f" ❌ 平仓异常: {symbol} - {e}")
# 发送紧急通知
await self._send_alert_notification(
f"🚨 [{platform_name}] 紧急平仓完成",
f"触发原因: 账户回撤超过 {self.settings.account_max_drawdown*100:.0f}%\n"
f"平仓数量: {closed_count}/{len(positions)}\n\n"
f"⚠️ 交易系统已停止,请人工检查账户!"
)
logger.info(f"🚨 [{platform_name}] 紧急平仓完成: {closed_count}/{len(positions)}")
except Exception as e:
logger.error(f"紧急平仓失败: {e}")
async def _check_position_management_all_platforms(self):
"""检查各平台的持仓管理(止盈/止损/移动止损)"""
try:
# 获取当前价格
current_prices = {}
for symbol in self.symbols:
try:
data = self.exchange.get_multi_timeframe_data(symbol)
current_prices[symbol] = float(data['5m'].iloc[-1]['close'])
except:
continue
for platform_name, executor in self.executors.items():
# 获取平台持仓
if platform_name == 'PaperTrading':
positions = self.paper_trading.get_open_positions()
elif platform_name == 'Bitget':
positions = self.bitget.get_open_positions() if self.bitget else []
elif platform_name == 'Hyperliquid':
positions = self.hyperliquid.get_open_positions() if self.hyperliquid else []
else:
continue
if not positions:
continue
# 检查持仓管理
actions = executor.check_position_management(positions, current_prices)
# 执行建议的操作
for action_info in actions:
symbol = action_info.get('symbol')
action = action_info.get('action')
reason = action_info.get('reason', '')
logger.info(f" 📊 [{platform_name}] {symbol} {reason}")
# 执行操作
if action == 'TAKE_PROFIT':
# 达到目标盈利,平仓
decision = {
'decision': 'CLOSE',
'symbol': symbol + 'USDT',
'reason': reason
}
result = await executor.execute_close(decision, current_prices.get(symbol, 0))
if result.get('success'):
logger.info(f" ✅ 自动止盈成功: {symbol}")
await self._send_alert_notification(
f"💰 [{platform_name}] 自动止盈",
f"交易对: {symbol}\n原因: {reason}"
)
elif action == 'TIME_EXIT':
# 持仓超时,平仓
decision = {
'decision': 'CLOSE',
'symbol': symbol + 'USDT',
'reason': reason
}
result = await executor.execute_close(decision, current_prices.get(symbol, 0))
if result.get('success'):
logger.info(f" ✅ 持仓超时平仓成功: {symbol}")
await self._send_alert_notification(
f"⏰ [{platform_name}] 持仓超时平仓",
f"交易对: {symbol}\n原因: {reason}"
)
elif action == 'MOVE_SL':
# 移动止损
new_sl = action_info.get('new_sl')
pnl_pct = action_info.get('pnl_pct', 0) # 从 action_info 获取盈亏百分比
if new_sl:
# 调用执行器的移动止损方法
move_result = await executor.move_stop_loss(
symbol=symbol,
new_stop_loss=new_sl
)
if move_result.get('success'):
logger.info(f" ✅ 移动止损成功: {symbol} → ${new_sl:.2f}")
await self._send_alert_notification(
f"🔒 [{platform_name}] 移动止损",
f"交易对: {symbol}\n新止损: ${new_sl:.2f}\n原因: {reason}"
)
# 发送飞书通知
await executor.send_execution_notification(
operation='POSITION_MANAGEMENT',
symbol=symbol,
result={'success': True, 'action': 'MOVE_SL', 'reason': reason},
details={'new_sl': new_sl, 'pnl_percent': pnl_pct}
)
else:
logger.warning(f" ⚠️ 移动止损失败: {move_result.get('message')}")
except Exception as e:
logger.error(f"检查持仓管理失败: {e}")
# ==================== 初始余额持久化 ====================
def _load_initial_balances(self):
"""从文件加载初始余额"""
try:
import json
from pathlib import Path
file_path = Path("data/initial_balances.json")
if file_path.exists():
with open(file_path, 'r') as f:
self._initial_balances = json.load(f)
logger.info(f"📂 已加载初始余额: {self._initial_balances}")
else:
logger.info(f"📂 初始余额文件不存在,将在首次运行时创建")
self._initial_balances = {}
except Exception as e:
logger.error(f"加载初始余额失败: {e}")
self._initial_balances = {}
def _save_initial_balances(self):
"""保存初始余额到文件"""
try:
import json
from pathlib import Path
# 确保目录存在
Path("data").mkdir(exist_ok=True)
file_path = Path("data/initial_balances.json")
with open(file_path, 'w') as f:
json.dump(self._initial_balances, f, indent=2)
logger.info(f"💾 已保存初始余额: {self._initial_balances}")
except Exception as e:
logger.error(f"保存初始余额失败: {e}")
def _get_initial_balance(self, platform_name: str, current_balance: float) -> float:
"""
获取或设置平台的初始余额
Args:
platform_name: 平台名称
current_balance: 当前余额
Returns:
初始余额
"""
if platform_name not in self._initial_balances:
# 第一次运行,记录当前余额作为初始余额
self._initial_balances[platform_name] = current_balance
self._save_initial_balances()
logger.info(f"✨ [{platform_name}] 记录初始余额: ${current_balance:.2f}")
return self._initial_balances[platform_name]
async def _send_alert_notification(self, title: str, message: str):
"""发送告警通知(飞书/钉钉/Telegram"""
try:
# 飞书
if self.feishu:
await self.feishu.send_message(f"{title}\n\n{message}")
# 钉钉
if self.dingtalk:
await self.dingtalk.send_text(f"{title}\n\n{message}")
# Telegram
if self.telegram:
await self.telegram.send_message(f"{title}\n\n{message}")
except Exception as e:
logger.error(f"发送告警通知失败: {e}")
# 全局单例
_crypto_agent: Optional['CryptoAgent'] = None
def get_crypto_agent() -> 'CryptoAgent':
"""获取加密货币智能体单例"""
# 直接使用类单例,不使用全局变量(避免 reload 时重置)
return CryptoAgent()