1610 lines
69 KiB
Python
1610 lines
69 KiB
Python
"""
|
||
加密货币交易智能体 - 主控制器(LLM 驱动版)
|
||
"""
|
||
import asyncio
|
||
from typing import Dict, Any, List, Optional
|
||
from datetime import datetime, timedelta
|
||
import pandas as pd
|
||
|
||
from app.utils.logger import logger
|
||
from app.config import get_settings
|
||
from app.services.bitget_service import bitget_service
|
||
from app.services.feishu_service import get_feishu_service
|
||
from app.services.telegram_service import get_telegram_service
|
||
from app.services.paper_trading_service import get_paper_trading_service
|
||
from app.services.signal_database_service import get_signal_db_service
|
||
from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer
|
||
from app.crypto_agent.trading_decision_maker import TradingDecisionMaker
|
||
from app.utils.system_status import get_system_monitor, AgentStatus
|
||
|
||
|
||
class CryptoAgent:
|
||
"""加密货币交易信号智能体(LLM 驱动版)"""
|
||
|
||
_instance = None
|
||
_initialized = False
|
||
|
||
def __new__(cls, *args, **kwargs):
|
||
"""单例模式 - 确保只有一个实例"""
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
"""初始化智能体"""
|
||
# 防止重复初始化
|
||
if CryptoAgent._initialized:
|
||
return
|
||
|
||
CryptoAgent._initialized = True
|
||
self.settings = get_settings()
|
||
self.exchange = bitget_service # 交易所服务
|
||
self.feishu = get_feishu_service()
|
||
self.telegram = get_telegram_service()
|
||
|
||
# 新架构:市场信号分析器 + 交易决策器
|
||
self.market_analyzer = MarketSignalAnalyzer()
|
||
self.decision_maker = TradingDecisionMaker()
|
||
|
||
self.signal_db = get_signal_db_service() # 信号数据库服务
|
||
|
||
# 模拟交易服务(始终启用)
|
||
self.paper_trading = get_paper_trading_service()
|
||
|
||
# 实盘交易服务(如果配置了 API)
|
||
self.real_trading = None
|
||
try:
|
||
from app.services.real_trading_service import get_real_trading_service
|
||
self.real_trading = get_real_trading_service()
|
||
except Exception as e:
|
||
logger.warning(f"实盘交易服务初始化失败: {e}")
|
||
|
||
# 状态管理
|
||
self.last_signals: Dict[str, Dict[str, Any]] = {}
|
||
self.signal_cooldown: Dict[str, datetime] = {}
|
||
|
||
# 配置
|
||
self.symbols = self.settings.crypto_symbols.split(',')
|
||
|
||
# 运行状态
|
||
self.running = False
|
||
self._event_loop = None
|
||
|
||
# 注册到系统监控
|
||
monitor = get_system_monitor()
|
||
self._monitor_info = monitor.register_agent(
|
||
agent_id="crypto_agent",
|
||
name="加密货币智能体",
|
||
agent_type="crypto"
|
||
)
|
||
monitor.update_config("crypto_agent", {
|
||
"symbols": self.symbols,
|
||
"auto_trading_enabled": True, # 模拟交易始终启用
|
||
"analysis_interval": "每5分钟整点"
|
||
})
|
||
|
||
logger.info(f"加密货币智能体初始化完成(LLM 驱动),监控交易对: {self.symbols}")
|
||
logger.info(f"📊 交易: 始终启用")
|
||
|
||
if self.real_trading:
|
||
auto_status = "启用" if self.real_trading.get_auto_trading_status() else "禁用"
|
||
logger.info(f"实盘交易: 已配置 (自动交易: {auto_status})")
|
||
else:
|
||
logger.info(f"实盘交易: 未配置")
|
||
|
||
def _on_price_update(self, symbol: str, price: float):
|
||
"""处理实时价格更新(用于模拟交易)"""
|
||
if not self.paper_trading:
|
||
return
|
||
|
||
triggered = self.paper_trading.check_price_triggers(symbol, price)
|
||
|
||
for result in triggered:
|
||
if self._event_loop and self._event_loop.is_running():
|
||
# 根据事件类型选择不同的通知方法
|
||
event_type = result.get('event_type', 'order_closed')
|
||
if event_type == 'order_filled':
|
||
asyncio.run_coroutine_threadsafe(self._notify_order_filled(result), self._event_loop)
|
||
elif event_type == 'breakeven_triggered':
|
||
asyncio.run_coroutine_threadsafe(self._notify_breakeven_triggered(result), self._event_loop)
|
||
else:
|
||
asyncio.run_coroutine_threadsafe(self._notify_order_closed(result), self._event_loop)
|
||
else:
|
||
logger.warning(f"无法发送通知: 事件循环不可用")
|
||
|
||
async def _notify_order_filled(self, result: Dict[str, Any]):
|
||
"""发送挂单成交通知"""
|
||
side_text = "做多" if result.get('side') == 'long' else "做空"
|
||
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
|
||
grade = result.get('signal_grade', 'N/A')
|
||
|
||
title = f"✅ 挂单成交 - {result.get('symbol')}"
|
||
|
||
content_parts = [
|
||
f"{side_icon} **方向**: {side_text}",
|
||
f"⭐ **信号等级**: {grade}",
|
||
f"💰 **挂单价**: ${result.get('entry_price', 0):,.2f}",
|
||
f"🎯 **成交价**: ${result.get('filled_price', 0):,.2f}",
|
||
f"💵 **仓位**: ${result.get('quantity', 0):,.0f}",
|
||
]
|
||
|
||
if result.get('stop_loss'):
|
||
content_parts.append(f"🛑 **止损**: ${result.get('stop_loss', 0):,.2f}")
|
||
if result.get('take_profit'):
|
||
content_parts.append(f"🎯 **止盈**: ${result.get('take_profit', 0):,.2f}")
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, "green")
|
||
if self.settings.telegram_enabled:
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送挂单成交通知: {result.get('order_id')}")
|
||
|
||
async def _notify_pending_cancelled(self, result: Dict[str, Any]):
|
||
"""发送挂单撤销通知"""
|
||
side_text = "做多" if result.get('side') == 'long' else "做空"
|
||
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
|
||
new_side_text = "做多" if result.get('new_side') == 'long' else "做空"
|
||
|
||
title = f"⚠️ 挂单撤销 - {result.get('symbol')}"
|
||
|
||
content_parts = [
|
||
f"{side_icon} **原方向**: {side_text}",
|
||
f"💰 **挂单价**: ${result.get('entry_price', 0):,.2f}",
|
||
f"",
|
||
f"📝 **原因**: 收到反向{new_side_text}信号,自动撤销",
|
||
]
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, "orange")
|
||
if self.settings.telegram_enabled:
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送挂单撤销通知: {result.get('order_id')}")
|
||
|
||
async def _notify_breakeven_triggered(self, result: Dict[str, Any]):
|
||
"""发送移动止损触发通知"""
|
||
side_text = "做多" if result.get('side') == 'long' else "做空"
|
||
side_icon = '🟢' if result.get('side') == 'long' else '🔴'
|
||
pnl_percent = result.get('current_pnl_percent', 0)
|
||
|
||
title = f"📈 移动止损已启动 - {result.get('symbol')}"
|
||
|
||
content_parts = [
|
||
f"{side_icon} **方向**: {side_text}",
|
||
f"",
|
||
f"💰 **开仓价**: ${result.get('filled_price', 0):,.2f}",
|
||
f"📈 **当前盈利**: {pnl_percent:+.2f}%",
|
||
f"🛑 **新止损价**: ${result.get('new_stop_loss', 0):,.2f}",
|
||
f"",
|
||
f"💰 锁定利润,让利润奔跑"
|
||
]
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, "green")
|
||
if self.settings.telegram_enabled:
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送移动止损通知: {result.get('order_id')}")
|
||
|
||
async def _notify_order_closed(self, result: Dict[str, Any]):
|
||
"""发送订单平仓通知"""
|
||
status = result.get('status', '')
|
||
is_win = result.get('is_win', False)
|
||
|
||
if status == 'closed_tp':
|
||
emoji = "🎯"
|
||
status_text = "止盈平仓"
|
||
color = "green"
|
||
elif status == 'closed_sl':
|
||
emoji = "🛑"
|
||
status_text = "止损平仓"
|
||
color = "red"
|
||
elif status == 'closed_be':
|
||
emoji = "📈"
|
||
status_text = "移动止损"
|
||
color = "orange"
|
||
else:
|
||
emoji = "📤"
|
||
status_text = "手动平仓"
|
||
color = "blue"
|
||
|
||
win_text = "盈利" if is_win else "亏损"
|
||
win_emoji = "✅" if is_win else "❌"
|
||
side_text = "做多" if result.get('side') == 'long' else "做空"
|
||
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
|
||
|
||
title = f"{emoji} 订单{status_text}"
|
||
|
||
content_parts = [
|
||
f"{side_icon} **方向**: {side_text}",
|
||
f"💰 **交易对**: {result.get('symbol')}",
|
||
f"📊 **入场**: ${result.get('entry_price', 0):,.2f}",
|
||
f"🎯 **出场**: ${result.get('exit_price', 0):,.2f}",
|
||
f"{win_emoji} **{win_text}**: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})",
|
||
f"⏱️ **持仓时间**: {result.get('hold_duration', 'N/A')}",
|
||
]
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, color)
|
||
if self.settings.telegram_enabled:
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送订单平仓通知: {result.get('order_id')}")
|
||
|
||
def _get_seconds_until_next_5min(self) -> int:
|
||
"""计算距离下一个5分钟整点的秒数"""
|
||
now = datetime.now()
|
||
current_minute = now.minute
|
||
current_second = now.second
|
||
|
||
minutes_past = current_minute % 5
|
||
if minutes_past == 0 and current_second == 0:
|
||
return 0
|
||
minutes_to_wait = 5 - minutes_past if minutes_past > 0 else 5
|
||
seconds_to_wait = minutes_to_wait * 60 - current_second
|
||
|
||
return seconds_to_wait
|
||
|
||
async def run(self):
|
||
"""主运行循环"""
|
||
self.running = True
|
||
self._event_loop = asyncio.get_event_loop()
|
||
|
||
# 更新状态为启动中
|
||
monitor = get_system_monitor()
|
||
monitor.update_status("crypto_agent", AgentStatus.STARTING)
|
||
|
||
# 启动横幅
|
||
logger.info("\n" + "=" * 60)
|
||
logger.info("🚀 加密货币交易信号智能体(LLM 驱动)")
|
||
logger.info("=" * 60)
|
||
logger.info(f" 监控交易对: {', '.join(self.symbols)}")
|
||
logger.info(f" 运行模式: 每5分钟整点执行")
|
||
logger.info(f" 分析引擎: LLM 自主分析")
|
||
logger.info(f" 模拟交易: 已启用")
|
||
logger.info("=" * 60 + "\n")
|
||
|
||
# 更新状态为运行中
|
||
monitor.update_status("crypto_agent", AgentStatus.RUNNING)
|
||
|
||
# 注意:不再启动独立的价格监控
|
||
# 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查
|
||
logger.info(f"模拟交易已启用(由后台统一监控)")
|
||
|
||
# 发送启动通知(卡片格式)
|
||
title = "🚀 加密货币智能体已启动"
|
||
|
||
# 构建卡片内容
|
||
content_parts = [
|
||
f"🤖 **驱动引擎**: LLM 自主分析",
|
||
f"📊 **监控交易对**: {len(self.symbols)} 个",
|
||
f" {', '.join(self.symbols)}",
|
||
f"⏰ **运行频率**: 每5分钟整点",
|
||
f"💰 **交易系统**: 已启用(后台统一监控)",
|
||
f"🎯 **分析维度**: 技术面 + 资金面 + 情绪面",
|
||
]
|
||
|
||
content = "\n".join(content_parts)
|
||
await self.feishu.send_card(title, content, "green")
|
||
await self.telegram.send_startup_notification(self.symbols)
|
||
|
||
while self.running:
|
||
try:
|
||
wait_seconds = self._get_seconds_until_next_5min()
|
||
if wait_seconds > 0:
|
||
next_run = datetime.now() + timedelta(seconds=wait_seconds)
|
||
logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
|
||
await asyncio.sleep(wait_seconds)
|
||
|
||
run_time = datetime.now()
|
||
logger.info("\n" + "=" * 60)
|
||
logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
|
||
logger.info("=" * 60)
|
||
|
||
for symbol in self.symbols:
|
||
await self.analyze_symbol(symbol)
|
||
|
||
logger.info("\n" + "─" * 60)
|
||
logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
|
||
logger.info("─" * 60 + "\n")
|
||
|
||
await asyncio.sleep(2)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 分析循环出错: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
await asyncio.sleep(10)
|
||
|
||
def stop(self):
|
||
"""停止运行"""
|
||
self.running = False
|
||
|
||
# 更新状态
|
||
monitor = get_system_monitor()
|
||
monitor.update_status("crypto_agent", AgentStatus.STOPPED)
|
||
|
||
logger.info("加密货币智能体已停止")
|
||
|
||
def _check_volatility(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str, float]:
|
||
"""
|
||
检查波动率,判断是否值得进行 LLM 分析(组合方案)
|
||
|
||
使用 1 小时 K 线判断趋势波动,5 分钟 K 线检测突发波动
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
data: 多周期K线数据
|
||
|
||
Returns:
|
||
(should_analyze, reason, volatility_percent)
|
||
should_analyze: 是否应该进行分析
|
||
reason: 原因说明
|
||
volatility_percent: 1小时波动率百分比
|
||
"""
|
||
# 检查是否启用波动率过滤
|
||
if not self.settings.crypto_volatility_filter_enabled:
|
||
return True, "波动率过滤未启用", 0
|
||
|
||
try:
|
||
# 1. 首先检查 1 小时趋势波动率
|
||
df_1h = data.get('1h')
|
||
if df_1h is None or len(df_1h) < 20:
|
||
# 数据不足,保守起见允许分析
|
||
return True, "1小时数据不足,允许分析", 0
|
||
|
||
# 获取最近20根K线
|
||
recent_1h = df_1h.iloc[-20:]
|
||
|
||
# 计算最高价和最低价
|
||
high = recent_1h['high'].max()
|
||
low = recent_1h['low'].min()
|
||
current_price = float(recent_1h.iloc[-1]['close'])
|
||
|
||
# 计算1小时波动率
|
||
if low > 0:
|
||
volatility_1h_percent = ((high - low) / low) * 100
|
||
else:
|
||
volatility_1h_percent = 0
|
||
|
||
# 计算价格变化范围(相对于当前价格)
|
||
price_range_high_percent = ((high - current_price) / current_price) * 100 if current_price > 0 else 0
|
||
price_range_low_percent = ((current_price - low) / current_price) * 100 if current_price > 0 else 0
|
||
|
||
# 从配置读取阈值
|
||
min_volatility = self.settings.crypto_min_volatility_percent
|
||
min_price_range = self.settings.crypto_min_price_range_percent
|
||
|
||
# 如果1小时波动率足够大,直接允许分析
|
||
if volatility_1h_percent >= min_volatility or price_range_high_percent >= min_price_range or price_range_low_percent >= min_price_range:
|
||
return True, f"1小时趋势活跃 (波动率 {volatility_1h_percent:.2f}%),值得分析", volatility_1h_percent
|
||
|
||
# 2. 1小时波动率较低,检查5分钟突发波动
|
||
df_5m = data.get('5m')
|
||
if df_5m is not None and len(df_5m) >= 3:
|
||
# 获取最近3根5分钟K线(15分钟内的变化)
|
||
recent_5m = df_5m.iloc[-3:]
|
||
|
||
# 计算5分钟价格变化幅度
|
||
price_start = float(recent_5m.iloc[0]['close'])
|
||
price_end = float(recent_5m.iloc[-1]['close'])
|
||
|
||
if price_start > 0:
|
||
price_change_5m = abs(price_end - price_start) / price_start * 100
|
||
else:
|
||
price_change_5m = 0
|
||
|
||
# 从配置读取5分钟突发波动阈值
|
||
surge_threshold = self.settings.crypto_5m_surge_threshold
|
||
|
||
logger.debug(f"{symbol} 5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}% (阈值: {surge_threshold}%)")
|
||
|
||
# 如果5分钟突发波动超过阈值,仍然允许分析
|
||
if price_change_5m >= surge_threshold:
|
||
direction = "上涨" if price_end > price_start else "下跌"
|
||
return True, f"5分钟突发{direction} ({price_change_5m:.2f}% > {surge_threshold}%),强制分析", volatility_1h_percent
|
||
|
||
# 3. 波动率过低,跳过分析
|
||
reason = f"波动率过低 (1小时: {volatility_1h_percent:.2f}% < {min_volatility}%, 5分钟无突发波动),跳过分析"
|
||
logger.info(f"⏸️ {symbol} {reason}")
|
||
return False, reason, volatility_1h_percent
|
||
|
||
except Exception as e:
|
||
logger.warning(f"{symbol} 波动率检查失败: {e},允许分析")
|
||
return True, "波动率检查失败,允许分析", 0
|
||
|
||
async def analyze_symbol(self, symbol: str):
|
||
"""
|
||
分析单个交易对(新架构:市场分析 + 交易决策分离)
|
||
|
||
新架构流程:
|
||
1. 市场信号分析器分析市场(不包含仓位信息)
|
||
2. 交易决策器根据信号+仓位+账户状态做决策
|
||
3. 执行交易决策
|
||
|
||
Args:
|
||
symbol: 交易对,如 'BTCUSDT'
|
||
"""
|
||
try:
|
||
# 更新活动时间
|
||
monitor = get_system_monitor()
|
||
monitor.update_activity("crypto_agent")
|
||
|
||
logger.info(f"\n{'─' * 50}")
|
||
logger.info(f"📊 {symbol} 分析开始")
|
||
logger.info(f"{'─' * 50}")
|
||
|
||
# 1. 获取多周期数据
|
||
data = self.exchange.get_multi_timeframe_data(symbol)
|
||
|
||
if not self._validate_data(data):
|
||
logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
|
||
return
|
||
|
||
# 当前价格
|
||
current_price = float(data['5m'].iloc[-1]['close'])
|
||
price_change_24h = self._calculate_price_change(data['1h'])
|
||
logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")
|
||
|
||
# 1.5. 波动率检查(节省 LLM 调用)
|
||
should_analyze, volatility_reason, volatility = self._check_volatility(symbol, data)
|
||
if not should_analyze:
|
||
logger.info(f"⏸️ {volatility_reason},跳过本次 LLM 分析")
|
||
return
|
||
|
||
# ============================================================
|
||
# 第一阶段:市场信号分析(不包含仓位信息)
|
||
# ============================================================
|
||
logger.info(f"\n🤖 【第一阶段:市场信号分析】")
|
||
|
||
market_signal = await self.market_analyzer.analyze(
|
||
symbol, data,
|
||
symbols=self.symbols
|
||
)
|
||
|
||
# 输出市场分析结果
|
||
self._log_market_signal(market_signal)
|
||
|
||
# 过滤掉 wait 信号,只保留 buy/sell 信号
|
||
signals = market_signal.get('signals', [])
|
||
trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']]
|
||
|
||
if not trade_signals:
|
||
logger.info(f"\n⏸️ 结论: 无交易信号(仅有观望建议),继续观望")
|
||
return
|
||
|
||
# 检查是否有达到阈值的交易信号
|
||
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
|
||
valid_signals = [s for s in trade_signals if s.get('confidence', 0) >= threshold]
|
||
|
||
if not valid_signals:
|
||
logger.info(f"\n⏸️ 结论: 无交易信号达到置信度阈值 ({threshold}%),继续观望")
|
||
return
|
||
|
||
logger.info(f"\n✅ 发现 {len(valid_signals)} 个有效交易信号(达到 {threshold}% 阈值)")
|
||
|
||
# ============================================================
|
||
# 发送市场信号通知(独立于交易决策)
|
||
# ============================================================
|
||
await self._send_market_signal_notification(market_signal, current_price)
|
||
|
||
# ============================================================
|
||
# 第二阶段:交易决策(信号 + 仓位 + 账户状态)
|
||
# 模拟交易和实盘交易分别进行独立决策
|
||
# ============================================================
|
||
logger.info(f"\n🤖 【第二阶段:交易决策】")
|
||
|
||
# 获取配置
|
||
paper_trading_enabled = self.settings.paper_trading_enabled
|
||
real_trading_enabled = self.settings.real_trading_enabled
|
||
|
||
# 分别存储模拟和实盘的决策
|
||
paper_decision = None
|
||
real_decision = None
|
||
|
||
# 模拟交易决策
|
||
if paper_trading_enabled:
|
||
logger.info(f"\n📊 【模拟交易决策】")
|
||
positions, account = self._get_trading_state(use_real_trading=False)
|
||
paper_decision = await self.decision_maker.make_decision(
|
||
market_signal, positions, account, current_price
|
||
)
|
||
self._log_trading_decision(paper_decision)
|
||
else:
|
||
logger.info(f"⏸️ 模拟交易未启用")
|
||
|
||
# 实盘交易决策
|
||
if real_trading_enabled:
|
||
logger.info(f"\n💰 【实盘交易决策】")
|
||
# 检查是否开启自动交易
|
||
if self.real_trading and self.real_trading.get_auto_trading_status():
|
||
positions, account = self._get_trading_state(use_real_trading=True)
|
||
real_decision = await self.decision_maker.make_decision(
|
||
market_signal, positions, account, current_price
|
||
)
|
||
self._log_trading_decision(real_decision)
|
||
else:
|
||
logger.info(f"⏸️ 实盘自动交易未开启")
|
||
else:
|
||
logger.info(f"⏸️ 实盘交易未启用")
|
||
|
||
# ============================================================
|
||
# 第三阶段:执行交易决策
|
||
# ============================================================
|
||
await self._execute_decisions(paper_decision, real_decision, market_signal, current_price)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 分析 {symbol} 出错: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
|
||
def _log_market_signal(self, signal: Dict[str, Any]):
|
||
"""输出市场信号分析结果"""
|
||
logger.info(f" 市场状态: {signal.get('market_state')}")
|
||
logger.info(f" 趋势: {signal.get('trend')}")
|
||
|
||
# 新闻情绪
|
||
news_sentiment = signal.get('news_sentiment', '')
|
||
if news_sentiment:
|
||
sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}.get(news_sentiment, '')
|
||
logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
|
||
|
||
# 关键价位
|
||
import re
|
||
key_levels = signal.get('key_levels', {})
|
||
if key_levels.get('support') or key_levels.get('resistance'):
|
||
# 从字符串中提取数字(处理 "66065 (15m布林下轨)" 这种格式)
|
||
def extract_number(val):
|
||
if isinstance(val, (int, float)):
|
||
return float(val)
|
||
if isinstance(val, str):
|
||
# 提取第一个数字
|
||
match = re.search(r'[\d,]+\.?\d*', val.replace(',', ''))
|
||
if match:
|
||
return float(match.group())
|
||
return None
|
||
|
||
supports = [extract_number(s) for s in key_levels.get('support', [])[:2]]
|
||
resistances = [extract_number(r) for r in key_levels.get('resistance', [])[:2]]
|
||
support_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None])
|
||
resistance_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None])
|
||
logger.info(f" 支撑位: {support_str or '-'}")
|
||
logger.info(f" 阻力位: {resistance_str or '-'}")
|
||
|
||
# 信号列表 - 区分交易信号和观望建议
|
||
signals = signal.get('signals', [])
|
||
trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']]
|
||
wait_signals = [s for s in signals if s.get('action') == 'wait']
|
||
|
||
if trade_signals:
|
||
logger.info(f"\n🎯 【发现 {len(trade_signals)} 个交易信号】")
|
||
for i, sig in enumerate(trade_signals, 1):
|
||
signal_type = sig.get('timeframe', 'unknown')
|
||
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
|
||
type_text = type_map.get(signal_type, signal_type)
|
||
|
||
action = sig.get('action', 'hold')
|
||
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
|
||
action_text = action_map.get(action, action)
|
||
|
||
confidence = sig.get('confidence', 0)
|
||
|
||
logger.info(f"\n [{i}] {type_text} | {action_text}")
|
||
logger.info(f" 信心度: {confidence}%")
|
||
logger.info(f" 入场: ${sig.get('entry_zone', 'N/A')}")
|
||
logger.info(f" 止损: ${sig.get('stop_loss', 'N/A')}")
|
||
logger.info(f" 止盈: ${sig.get('take_profit', 'N/A')}")
|
||
logger.info(f" 理由: {sig.get('reasoning', 'N/A')}")
|
||
|
||
if wait_signals:
|
||
logger.info(f"\n📋 【{len(wait_signals)} 个观望建议(不触发交易)】")
|
||
for i, sig in enumerate(wait_signals, 1):
|
||
signal_type = sig.get('timeframe', 'unknown')
|
||
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
|
||
type_text = type_map.get(signal_type, signal_type)
|
||
|
||
confidence = sig.get('confidence', 0)
|
||
|
||
logger.info(f"\n [{i}] {type_text} | 观望")
|
||
logger.info(f" 信心度: {confidence}%")
|
||
logger.info(f" 理由: {sig.get('reasoning', 'N/A')}")
|
||
|
||
def _log_trading_decision(self, decision: Dict[str, Any]):
|
||
"""输出交易决策结果"""
|
||
decision_type = decision.get('decision', 'HOLD')
|
||
decision_map = {
|
||
'OPEN': '🟢 开仓',
|
||
'CLOSE': '🔴 平仓',
|
||
'ADD': '➕ 加仓',
|
||
'REDUCE': '➖ 减仓',
|
||
'HOLD': '⏸️ 观望'
|
||
}
|
||
|
||
logger.info(f" 决策: {decision_map.get(decision_type, decision_type)}")
|
||
logger.info(f" 动作: {decision.get('action', 'N/A')}")
|
||
logger.info(f" 仓位: {decision.get('position_size', 'N/A')}")
|
||
|
||
# quantity 是保证金,显示持仓价值 = 保证金 × 20
|
||
quantity = decision.get('quantity', 0)
|
||
if isinstance(quantity, (int, float)) and quantity > 0:
|
||
logger.info(f" 持仓价值: ${quantity * 20:,.2f} (保证金 ${quantity:.2f})")
|
||
else:
|
||
logger.info(f" 数量: ${decision.get('quantity', 'N/A')}")
|
||
|
||
if decision.get('stop_loss'):
|
||
logger.info(f" 止损: ${decision.get('stop_loss')}")
|
||
if decision.get('take_profit'):
|
||
logger.info(f" 止盈: ${decision.get('take_profit')}")
|
||
|
||
logger.info(f" 理由: {decision.get('reasoning', 'N/A')}")
|
||
|
||
risk = decision.get('risk_analysis', '')
|
||
if risk:
|
||
logger.info(f" 风险: {risk}")
|
||
|
||
def _get_trading_state(self, use_real_trading: bool = False) -> tuple:
|
||
"""
|
||
获取交易状态(持仓和账户)
|
||
|
||
Args:
|
||
use_real_trading: True 获取实盘状态,False 获取模拟交易状态
|
||
"""
|
||
if use_real_trading and self.real_trading:
|
||
# 实盘交易
|
||
active_orders = self.real_trading.get_active_orders()
|
||
account = self.real_trading.get_account_status()
|
||
else:
|
||
# 模拟交易
|
||
active_orders = self.paper_trading.get_active_orders()
|
||
account = self.paper_trading.get_account_status()
|
||
|
||
# 转换为标准格式(只返回已成交的订单作为持仓)
|
||
position_list = []
|
||
for order in active_orders:
|
||
# 只返回已成交(OPEN 状态且有 filled_price)的订单
|
||
if order.get('status') == 'open' and order.get('filled_price'):
|
||
position_list.append({
|
||
'symbol': order.get('symbol'),
|
||
'side': order.get('side'),
|
||
'holding': order.get('quantity', 0), # 使用 quantity 作为持仓量
|
||
'entry_price': order.get('filled_price') or order.get('entry_price'),
|
||
'stop_loss': order.get('stop_loss'),
|
||
'take_profit': order.get('take_profit')
|
||
})
|
||
|
||
return position_list, account
|
||
|
||
async def _execute_decisions(self, paper_decision: Dict[str, Any],
|
||
real_decision: Dict[str, Any],
|
||
market_signal: Dict[str, Any], current_price: float):
|
||
"""执行交易决策(模拟和实盘分别执行)"""
|
||
# 选择最佳信号用于保存
|
||
best_signal = self._get_best_signal_from_market(market_signal)
|
||
|
||
# 保存信号到数据库(只保存一次)
|
||
if best_signal:
|
||
signal_to_save = best_signal.copy()
|
||
signal_to_save['signal_type'] = 'crypto'
|
||
signal_to_save['symbol'] = market_signal.get('symbol')
|
||
signal_to_save['current_price'] = current_price
|
||
self.signal_db.add_signal(signal_to_save)
|
||
|
||
# 获取配置
|
||
paper_trading_enabled = self.settings.paper_trading_enabled
|
||
real_trading_enabled = self.settings.real_trading_enabled
|
||
|
||
# 记录执行结果
|
||
paper_executed = False
|
||
real_executed = False
|
||
|
||
# ============================================================
|
||
# 执行模拟交易决策
|
||
# ============================================================
|
||
if paper_trading_enabled and paper_decision:
|
||
decision_type = paper_decision.get('decision', 'HOLD')
|
||
|
||
if decision_type == 'HOLD':
|
||
reasoning = paper_decision.get('reasoning', '观望')
|
||
logger.info(f"\n📊 模拟交易: {reasoning}")
|
||
# 有信号但决策为 HOLD,发送未执行通知
|
||
await self._notify_signal_not_executed(market_signal, paper_decision, current_price, is_paper=True)
|
||
else:
|
||
logger.info(f"\n📊 【执行模拟交易】")
|
||
|
||
if decision_type in ['OPEN', 'ADD']:
|
||
# 先执行交易
|
||
result = await self._execute_paper_trade(paper_decision, market_signal, current_price)
|
||
# 检查是否成功执行
|
||
order = result.get('order') if result else None
|
||
if order:
|
||
# 只有成功创建订单后才发送通知
|
||
await self._send_signal_notification(market_signal, paper_decision, current_price, is_paper=True)
|
||
paper_executed = True
|
||
else:
|
||
# 有信号但订单创建失败,发送未执行通知
|
||
reason = result.get('message', '订单创建失败') if result else '订单创建失败'
|
||
await self._notify_signal_not_executed(market_signal, paper_decision, current_price, is_paper=True, reason=reason)
|
||
logger.warning(f" ⚠️ 模拟交易未执行,已发送通知")
|
||
elif decision_type == 'CLOSE':
|
||
await self._execute_close(paper_decision, paper_trading=True)
|
||
paper_executed = True
|
||
|
||
# ============================================================
|
||
# 执行实盘交易决策
|
||
# ============================================================
|
||
if real_trading_enabled and real_decision:
|
||
# 检查是否开启自动交易
|
||
if self.real_trading and self.real_trading.get_auto_trading_status():
|
||
decision_type = real_decision.get('decision', 'HOLD')
|
||
|
||
if decision_type == 'HOLD':
|
||
reasoning = real_decision.get('reasoning', '观望')
|
||
logger.info(f"\n💰 实盘交易: {reasoning}")
|
||
# 有信号但决策为 HOLD,发送未执行通知
|
||
await self._notify_signal_not_executed(market_signal, real_decision, current_price, is_paper=False)
|
||
else:
|
||
logger.info(f"\n💰 【执行实盘交易】")
|
||
|
||
if decision_type in ['OPEN', 'ADD']:
|
||
# 先执行交易
|
||
result = await self._execute_real_trade(real_decision, market_signal, current_price)
|
||
# 检查是否成功执行
|
||
if result and result.get('success'):
|
||
# 只有成功创建订单后才发送通知
|
||
await self._send_signal_notification(market_signal, real_decision, current_price, is_paper=False)
|
||
real_executed = True
|
||
else:
|
||
# 有信号但订单创建失败,发送未执行通知
|
||
reason = result.get('message', '订单创建失败') if result else '订单创建失败'
|
||
await self._notify_signal_not_executed(market_signal, real_decision, current_price, is_paper=False, reason=reason)
|
||
logger.warning(f" ⚠️ 实盘交易未执行,已发送通知")
|
||
elif decision_type == 'CLOSE':
|
||
await self._execute_close(real_decision, paper_trading=False)
|
||
real_executed = True
|
||
|
||
# 如果都没有执行,给出提示
|
||
if not paper_executed and not real_executed:
|
||
logger.info(f"\n⏸️ 所有交易均为观望,无需执行")
|
||
|
||
def _get_best_signal_from_market(self, market_signal: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""从市场信号中获取最佳信号"""
|
||
signals = market_signal.get('signals', [])
|
||
if not signals:
|
||
return {}
|
||
|
||
# 按信心度排序,取最高的
|
||
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
|
||
return sorted_signals[0]
|
||
|
||
async def _send_market_signal_notification(self, market_signal: Dict[str, Any],
|
||
current_price: float):
|
||
"""发送市场信号通知(第一阶段)- 调用前已确保有有效信号"""
|
||
try:
|
||
# 获取配置的阈值
|
||
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
|
||
|
||
# 过滤达到阈值的信号(防御性检查)
|
||
signals = market_signal.get('signals', [])
|
||
valid_signals = [s for s in signals if s.get('confidence', 0) >= threshold]
|
||
|
||
if not valid_signals:
|
||
return
|
||
|
||
# 取最佳信号(按信心度排序)
|
||
best_signal = sorted(valid_signals, key=lambda x: x.get('confidence', 0), reverse=True)[0]
|
||
|
||
# 构建通知消息 - 完全匹配旧格式
|
||
symbol = market_signal.get('symbol')
|
||
market_state = market_signal.get('market_state')
|
||
trend = market_signal.get('trend')
|
||
|
||
# 注意:经过 market_signal_analyzer 解析后
|
||
# - 'action' 是 buy/sell/wait (交易方向)
|
||
# - 'timeframe' 是 short_term/medium_term/long_term (周期)
|
||
sig_action = best_signal.get('action', 'hold') # buy/sell/wait
|
||
timeframe = best_signal.get('timeframe', 'unknown') # short_term/medium_term/long_term
|
||
confidence = best_signal.get('confidence', 0)
|
||
entry_val = best_signal.get('entry_zone', 'N/A')
|
||
sl_val = best_signal.get('stop_loss', 'N/A')
|
||
tp_val = best_signal.get('take_profit', 'N/A')
|
||
reasoning = best_signal.get('reasoning', '无')
|
||
|
||
# 格式化价格用于显示(处理 float 和 N/A)
|
||
def format_price(price_value):
|
||
if price_value is None or price_value == 'N/A':
|
||
return 'N/A'
|
||
if isinstance(price_value, float):
|
||
return f"{price_value:,.2f}"
|
||
return str(price_value)
|
||
|
||
entry = format_price(entry_val)
|
||
sl = format_price(sl_val)
|
||
tp = format_price(tp_val)
|
||
|
||
# 类型映射
|
||
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
|
||
timeframe_text = type_map.get(timeframe, timeframe)
|
||
|
||
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空', 'hold': '➖ 观望'}
|
||
action_text = action_map.get(sig_action, sig_action)
|
||
|
||
# 入场类型 - 从信号中获取
|
||
entry_type = best_signal.get('entry_type', 'market')
|
||
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
|
||
entry_type_icon = '⚡' if entry_type == 'market' else '⏳'
|
||
|
||
# 等级(基于信心度映射)- 与 market_signal_analyzer.py 保持一致
|
||
# A级(80-100): 量价配合 + 多指标共振 + 多周期确认
|
||
# B级(60-79): 量价配合 + 主要指标确认
|
||
# C级(40-59): 有机会但量价不够理想
|
||
# D级(<40): 量价背离或信号矛盾
|
||
if confidence >= 80:
|
||
grade = 'A'
|
||
grade_icon = '⭐⭐⭐'
|
||
elif confidence >= 60:
|
||
grade = 'B'
|
||
grade_icon = '⭐⭐'
|
||
elif confidence >= 40:
|
||
grade = 'C'
|
||
grade_icon = '⭐'
|
||
else:
|
||
grade = 'D'
|
||
grade_icon = ''
|
||
|
||
# 仓位(基于信心度和杠杆空间)- 与新的等级阈值对齐
|
||
if confidence >= 80: # A级信号
|
||
position_size = 'heavy'
|
||
position_icon = '🔥'
|
||
position_text = '重仓'
|
||
elif confidence >= 60: # B级信号
|
||
position_size = 'medium'
|
||
position_icon = '📊'
|
||
position_text = '中仓'
|
||
else: # C级或D级信号
|
||
position_size = 'light'
|
||
position_icon = '🌱'
|
||
position_text = '轻仓'
|
||
|
||
# 计算止损止盈百分比(价格已经是 float)
|
||
try:
|
||
# 使用当前价格作为入场价(如果 entry_zone 是 N/A)
|
||
entry_for_calc = entry_val if isinstance(entry_val, (int, float)) else current_price
|
||
|
||
if isinstance(sl_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0:
|
||
if sig_action == 'buy':
|
||
sl_percent = ((sl_val - entry_for_calc) / entry_for_calc * 100)
|
||
else:
|
||
sl_percent = ((entry_for_calc - sl_val) / entry_for_calc * 100)
|
||
sl_display = f"{sl_percent:+.1f}%"
|
||
else:
|
||
sl_display = "N/A"
|
||
|
||
if isinstance(tp_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0:
|
||
if sig_action == 'buy':
|
||
tp_percent = ((tp_val - entry_for_calc) / entry_for_calc * 100)
|
||
else:
|
||
tp_percent = ((entry_for_calc - tp_val) / entry_for_calc * 100)
|
||
tp_display = f"{tp_percent:+.1f}%"
|
||
else:
|
||
tp_display = "N/A"
|
||
except:
|
||
sl_display = "N/A"
|
||
tp_display = "N/A"
|
||
|
||
# 构建卡片标题和颜色
|
||
if sig_action == 'buy':
|
||
title = f"🟢 {symbol} {timeframe_text}做多信号"
|
||
color = "green"
|
||
else:
|
||
title = f"🔴 {symbol} {timeframe_text}做空信号"
|
||
color = "red"
|
||
|
||
# 构建卡片内容
|
||
content_parts = [
|
||
f"**{timeframe_text}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
|
||
f"{entry_type_icon} **入场**: {entry_type_text} | {position_icon} **仓位**: {position_text}",
|
||
f"",
|
||
]
|
||
|
||
# 入场价格显示
|
||
if entry_type == 'limit':
|
||
# 限价订单:显示建议挂单价格和当前价格
|
||
content_parts.append(f"📋 **挂单价格**: ${entry}")
|
||
content_parts.append(f"📍 **当前价格**: ${format_price(current_price)}")
|
||
else:
|
||
# 市价订单:显示当前价格作为入场价
|
||
content_parts.append(f"💰 **入场价**: ${entry}")
|
||
|
||
if sl != 'N/A':
|
||
content_parts.append(f"🛑 **止损价**: ${sl} ({sl_display})")
|
||
if tp != 'N/A':
|
||
content_parts.append(f"🎯 **止盈价**: ${tp} ({tp_display})")
|
||
|
||
content_parts.append(f"")
|
||
content_parts.append(f"📝 **分析理由**:")
|
||
content_parts.append(f"{reasoning}")
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
# 根据配置发送通知
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, color)
|
||
if self.settings.telegram_enabled:
|
||
# Telegram 使用文本格式
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
logger.info(f" 📤 已发送市场信号通知 (阈值: {threshold}%)")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"发送市场信号通知失败: {e}")
|
||
import traceback
|
||
logger.debug(traceback.format_exc())
|
||
|
||
async def _send_signal_notification(self, market_signal: Dict[str, Any],
|
||
decision: Dict[str, Any], current_price: float,
|
||
is_paper: bool = True):
|
||
"""发送交易执行通知(第三阶段)"""
|
||
try:
|
||
decision_type = decision.get('decision', 'HOLD')
|
||
|
||
# 只在非观望决策时发送执行通知
|
||
if decision_type == 'HOLD':
|
||
return
|
||
|
||
# 构建消息 - 使用旧格式风格
|
||
symbol = market_signal.get('symbol')
|
||
action = decision.get('action', '')
|
||
reasoning = decision.get('reasoning', '')
|
||
risk_analysis = decision.get('risk_analysis', '')
|
||
position_size = decision.get('position_size', 'N/A')
|
||
quantity = decision.get('quantity', 'N/A')
|
||
stop_loss = decision.get('stop_loss', '')
|
||
take_profit = decision.get('take_profit', '')
|
||
confidence = decision.get('confidence', 0)
|
||
|
||
# 决策类型映射
|
||
decision_map = {
|
||
'OPEN': '开仓',
|
||
'CLOSE': '平仓',
|
||
'ADD': '加仓',
|
||
'REDUCE': '减仓'
|
||
}
|
||
decision_text = decision_map.get(decision_type, decision_type)
|
||
|
||
# 账户类型标识
|
||
account_type = "📊" if is_paper else "💰"
|
||
|
||
# 方向图标
|
||
if 'long' in action.lower() or 'buy' in action.lower():
|
||
action_icon = '🟢'
|
||
action_text = '做多'
|
||
elif 'short' in action.lower() or 'sell' in action.lower():
|
||
action_icon = '🔴'
|
||
action_text = '做空'
|
||
else:
|
||
action_icon = '➖'
|
||
action_text = action
|
||
|
||
# 从市场信号中获取入场方式(需要在构建标题之前)
|
||
best_signal = self._get_best_signal_from_market(market_signal)
|
||
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
|
||
entry_type_text = '现价单' if entry_type == 'market' else '挂单'
|
||
entry_type_icon = '⚡' if entry_type == 'market' else '⏳'
|
||
|
||
# 仓位图标
|
||
position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓'}
|
||
position_display = position_map.get(position_size, '🌱 轻仓')
|
||
|
||
# 构建卡片标题和颜色 - 考虑入场方式
|
||
# 挂单时标题显示"挂单",现价单时显示"开仓"/"平仓"等
|
||
if decision_type == 'OPEN':
|
||
decision_title = '挂单' if entry_type == 'limit' else '开仓'
|
||
title = f"{account_type} {symbol} {decision_title}"
|
||
color = "green"
|
||
elif decision_type == 'CLOSE':
|
||
decision_title = '挂单' if entry_type == 'limit' else '平仓'
|
||
title = f"{account_type} {symbol} {decision_title}"
|
||
color = "orange"
|
||
elif decision_type == 'ADD':
|
||
decision_title = '挂单' if entry_type == 'limit' else '加仓'
|
||
title = f"{account_type} {symbol} {decision_title}"
|
||
color = "green"
|
||
elif decision_type == 'REDUCE':
|
||
decision_title = '挂单' if entry_type == 'limit' else '减仓'
|
||
title = f"{account_type} {symbol} {decision_title}"
|
||
color = "orange"
|
||
else:
|
||
title = f"{account_type} {symbol} 交易执行"
|
||
color = "blue"
|
||
|
||
# 构建卡片内容
|
||
# quantity 是保证金金额,需要显示持仓价值 = 保证金 × 杠杆
|
||
margin = quantity if quantity != 'N/A' else 0
|
||
leverage = 20 # 模拟交易固定 20x 杠杆
|
||
position_value = margin * leverage if isinstance(margin, (int, float)) else 'N/A'
|
||
position_value_display = f"${position_value:,.2f}" if isinstance(position_value, (int, float)) else "N/A"
|
||
|
||
# 根据入场方式显示不同的价格信息
|
||
if entry_type == 'market':
|
||
price_display = f"💵 **入场价**: ${current_price:,.2f} (现价)"
|
||
else:
|
||
entry_price = best_signal.get('entry_zone', current_price) if best_signal else current_price
|
||
price_display = f"💵 **挂单价**: ${entry_price:,.2f} (等待)"
|
||
|
||
content_parts = [
|
||
f"{action_icon} **操作**: {decision_text} ({action_text})",
|
||
f"{entry_type_icon} **入场方式**: {entry_type_text}",
|
||
f"{position_display.replace(' ', ': **')} | 📈 信心度: **{confidence}%**",
|
||
f"",
|
||
f"💰 **持仓价值**: {position_value_display}",
|
||
price_display,
|
||
]
|
||
|
||
if stop_loss:
|
||
content_parts.append(f"🛑 **止损价**: ${stop_loss}")
|
||
if take_profit:
|
||
content_parts.append(f"🎯 **止盈价**: ${take_profit}")
|
||
|
||
# 简洁的决策理由和风险分析(各1句话)
|
||
content_parts.append(f"")
|
||
content_parts.append(f"📝 **决策**: {reasoning}")
|
||
|
||
if risk_analysis:
|
||
content_parts.append(f"⚠️ **风险**: {risk_analysis}")
|
||
|
||
content_parts.append(f"")
|
||
content_parts.append(f"💼 交易已执行")
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
# 根据配置发送通知
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, color)
|
||
if self.settings.telegram_enabled:
|
||
# Telegram 使用文本格式
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
logger.info(f" 📤 已发送交易执行通知: {decision_text}")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"发送交易执行通知失败: {e}")
|
||
|
||
async def _execute_paper_trade(self, decision: Dict[str, Any], market_signal: Dict[str, Any], current_price: float):
|
||
"""执行模拟交易"""
|
||
try:
|
||
symbol = decision.get('symbol')
|
||
action = decision.get('action', '')
|
||
position_size = decision.get('position_size', 'light')
|
||
|
||
# 直接使用 LLM 决策的 quantity
|
||
quantity = decision.get('quantity', 0)
|
||
if quantity <= 0:
|
||
logger.warning(f" ⚠️ LLM 决策的 quantity 无效: {quantity},使用默认值")
|
||
quantity = self._calculate_quantity_by_position_size(position_size, real_trading=False)
|
||
|
||
logger.info(f" 准备创建模拟订单: {symbol} {action} {position_size}")
|
||
logger.info(f" LLM 决策金额: ${quantity:.2f} USDT")
|
||
|
||
# 转换决策的 action 为 paper_trading 期望的格式
|
||
trading_action = self._convert_trading_action(action)
|
||
|
||
# 从市场信号中获取入场方式和入场价格
|
||
best_signal = self._get_best_signal_from_market(market_signal)
|
||
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
|
||
entry_price = best_signal.get('entry_zone', current_price) if best_signal else current_price
|
||
|
||
logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}")
|
||
|
||
# 转换决策为订单格式(价格字段已在 LLM 解析时转换为 float)
|
||
order_data = {
|
||
'symbol': symbol,
|
||
'action': trading_action, # 使用转换后的 action
|
||
'entry_type': entry_type, # 使用信号中的入场方式
|
||
'entry_price': entry_price if entry_type == 'limit' else current_price, # limit单使用entry_zone,market单使用current_price
|
||
'stop_loss': decision.get('stop_loss'),
|
||
'take_profit': decision.get('take_profit'),
|
||
'confidence': decision.get('confidence', 50),
|
||
'signal_grade': 'B', # 默认B级
|
||
'position_size': position_size,
|
||
'quantity': quantity # 使用 LLM 决策的金额
|
||
}
|
||
|
||
logger.debug(f" 订单数据: {order_data}")
|
||
|
||
result = self.paper_trading.create_order_from_signal(order_data, current_price)
|
||
|
||
logger.debug(f" 创建订单结果: {result}")
|
||
|
||
# 记录订单
|
||
order = result.get('order')
|
||
if order:
|
||
# quantity 是保证金金额,持仓价值 = 保证金 × 20
|
||
position_value = quantity * 20
|
||
logger.info(f" 📝 已创建模拟订单: {order.order_id} | 仓位: {position_size} | 持仓价值: ${position_value:.2f}")
|
||
else:
|
||
logger.warning(f" ⚠️ 创建模拟订单失败: {result}")
|
||
|
||
# 返回结果
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行模拟交易失败: {e}")
|
||
import traceback
|
||
logger.debug(traceback.format_exc())
|
||
|
||
def _convert_trading_action(self, action: str) -> str:
|
||
"""转换交易决策的 action 为 buy/sell 格式"""
|
||
if not action:
|
||
return 'buy'
|
||
|
||
action_lower = action.lower()
|
||
|
||
# 做多相关的 action
|
||
if 'long' in action_lower or 'buy' in action_lower:
|
||
return 'buy'
|
||
# 做空相关的 action
|
||
elif 'short' in action_lower or 'sell' in action_lower:
|
||
return 'sell'
|
||
|
||
# 默认返回 buy
|
||
return 'buy'
|
||
|
||
def _calculate_quantity_by_position_size(self, position_size: str, real_trading: bool = False) -> float:
|
||
"""根据仓位大小计算实际金额"""
|
||
if real_trading:
|
||
# 实盘交易配置
|
||
position_config = {
|
||
'heavy': self.settings.real_trading_max_single_position,
|
||
'medium': self.settings.real_trading_max_single_position * 0.6,
|
||
'light': self.settings.real_trading_max_single_position * 0.3
|
||
}
|
||
else:
|
||
# 模拟交易配置
|
||
position_config = {
|
||
'heavy': self.settings.paper_trading_position_a, # 1000
|
||
'medium': self.settings.paper_trading_position_b, # 500
|
||
'light': self.settings.paper_trading_position_c # 200
|
||
}
|
||
|
||
return position_config.get(position_size, 200)
|
||
|
||
async def _execute_real_trade(self, decision: Dict[str, Any], market_signal: Dict[str, Any], current_price: float):
|
||
"""执行实盘交易"""
|
||
try:
|
||
symbol = decision.get('symbol')
|
||
action = decision.get('action', '')
|
||
position_size = decision.get('position_size', 'light')
|
||
|
||
# 直接使用 LLM 决策的 quantity
|
||
quantity = decision.get('quantity', 0)
|
||
if quantity <= 0:
|
||
logger.warning(f" ⚠️ LLM 决策的 quantity 无效: {quantity},使用默认值")
|
||
quantity = self._calculate_quantity_by_position_size(position_size, real_trading=True)
|
||
|
||
logger.info(f" 实盘交易: {position_size} → 保证金 ${quantity:.2f} USDT → 持仓价值 ${quantity * 20:.2f}")
|
||
|
||
# 转换决策的 action 为 paper_trading 期望的格式
|
||
trading_action = self._convert_trading_action(action)
|
||
|
||
# 从市场信号中获取入场方式和入场价格
|
||
best_signal = self._get_best_signal_from_market(market_signal)
|
||
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
|
||
entry_price = best_signal.get('entry_zone', current_price) if best_signal else current_price
|
||
|
||
logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}")
|
||
|
||
# 转换决策为订单格式(价格字段已在 LLM 解析时转换为 float)
|
||
order_data = {
|
||
'symbol': symbol,
|
||
'action': trading_action, # 使用转换后的 action
|
||
'entry_type': entry_type, # 使用信号中的入场方式
|
||
'entry_price': entry_price if entry_type == 'limit' else current_price, # limit单使用entry_zone,market单使用current_price
|
||
'stop_loss': decision.get('stop_loss'),
|
||
'take_profit': decision.get('take_profit'),
|
||
'confidence': decision.get('confidence', 50),
|
||
'signal_grade': 'B',
|
||
'position_size': position_size,
|
||
'quantity': quantity # 使用 LLM 决策的金额
|
||
}
|
||
|
||
result = self.real_trading.create_order_from_signal(order_data, current_price)
|
||
|
||
if result.get('success'):
|
||
logger.info(f" 💰 已创建实盘订单: {result.get('order_id')} | 持仓价值: ${quantity * 20:.2f}")
|
||
await self._notify_real_order_created(symbol, decision, result)
|
||
else:
|
||
logger.warning(f" ⚠️ 创建实盘订单失败: {result.get('message', 'Unknown error')}")
|
||
|
||
# 返回结果
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行实盘交易失败: {e}")
|
||
|
||
async def _execute_close(self, decision: Dict[str, Any], paper_trading: bool = True):
|
||
"""执行平仓
|
||
|
||
Args:
|
||
decision: 交易决策
|
||
paper_trading: True=模拟交易, False=实盘交易
|
||
"""
|
||
try:
|
||
symbol = decision.get('symbol')
|
||
|
||
if paper_trading:
|
||
# 模拟平仓
|
||
if self.paper_trading:
|
||
# TODO: 实现模拟平仓逻辑
|
||
logger.info(f" 🔒 模拟平仓: {symbol}")
|
||
logger.info(f" 理由: {decision.get('reasoning', '')}")
|
||
else:
|
||
logger.warning(f" 模拟交易服务未初始化")
|
||
else:
|
||
# 实盘平仓
|
||
if self.real_trading and self.real_trading.get_auto_trading_status():
|
||
# TODO: 实现实盘平仓逻辑
|
||
logger.info(f" 🔒 实盘平仓: {symbol}")
|
||
logger.info(f" 理由: {decision.get('reasoning', '')}")
|
||
else:
|
||
logger.warning(f" 实盘交易服务未启用或自动交易未开启")
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行平仓失败: {e}")
|
||
|
||
def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
|
||
current_price: float) -> Dict[str, Any]:
|
||
"""转换 LLM 信号格式为模拟交易格式"""
|
||
signal_type = signal.get('type', 'medium_term')
|
||
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
|
||
|
||
# 获取入场类型和入场价
|
||
entry_type = signal.get('entry_type', 'market')
|
||
entry_price = signal.get('entry_price', current_price)
|
||
|
||
return {
|
||
'symbol': symbol,
|
||
'action': signal.get('action', 'hold'),
|
||
'entry_type': entry_type, # market 或 limit
|
||
'entry_price': entry_price, # 入场价(挂单价格)
|
||
'price': current_price, # 当前价格
|
||
'stop_loss': signal.get('stop_loss', 0),
|
||
'take_profit': signal.get('take_profit', 0),
|
||
'confidence': signal.get('confidence', 0),
|
||
'signal_grade': signal.get('grade', 'D'),
|
||
'signal_type': type_map.get(signal_type, 'swing'),
|
||
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
|
||
'reasons': [signal.get('reason', '')],
|
||
'timestamp': datetime.now()
|
||
}
|
||
|
||
def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
|
||
"""计算24小时价格变化"""
|
||
if len(h1_data) < 24:
|
||
return "N/A"
|
||
price_now = h1_data.iloc[-1]['close']
|
||
price_24h_ago = h1_data.iloc[-24]['close']
|
||
change = ((price_now - price_24h_ago) / price_24h_ago) * 100
|
||
if change >= 0:
|
||
return f"+{change:.2f}%"
|
||
return f"{change:.2f}%"
|
||
|
||
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
|
||
"""验证数据完整性"""
|
||
required_intervals = ['5m', '15m', '1h', '4h']
|
||
for interval in required_intervals:
|
||
if interval not in data or data[interval].empty:
|
||
return False
|
||
if len(data[interval]) < 20:
|
||
return False
|
||
return True
|
||
|
||
def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
|
||
"""判断是否应该发送信号"""
|
||
action = signal.get('action', 'wait')
|
||
if action == 'wait':
|
||
return False
|
||
|
||
confidence = signal.get('confidence', 0)
|
||
# 使用配置文件中的阈值
|
||
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
|
||
if confidence < threshold:
|
||
return False
|
||
|
||
# 检查冷却时间(30分钟内不重复发送相同方向的信号)
|
||
if symbol in self.signal_cooldown:
|
||
cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=30)
|
||
if datetime.now() < cooldown_end:
|
||
if symbol in self.last_signals:
|
||
if self.last_signals[symbol].get('action') == action:
|
||
logger.debug(f"{symbol} 信号冷却中,跳过")
|
||
return False
|
||
|
||
return True
|
||
|
||
async def _review_and_adjust_positions(
|
||
self,
|
||
symbol: str,
|
||
data: Dict[str, pd.DataFrame]
|
||
):
|
||
"""
|
||
回顾并调整现有持仓(基于市场分析 + 当前持仓状态)
|
||
|
||
每次分析后自动回顾该交易对的所有持仓,让决策器决定是否需要:
|
||
- 调整止损止盈
|
||
- 部分平仓
|
||
- 全部平仓
|
||
"""
|
||
try:
|
||
# 获取该交易对的所有活跃持仓(只看已成交的)
|
||
active_orders = self.paper_trading.get_active_orders()
|
||
positions = [
|
||
order for order in active_orders
|
||
if order.get('symbol') == symbol
|
||
and order.get('status') == 'open'
|
||
and order.get('filled_price') # 只处理已成交的订单
|
||
]
|
||
|
||
if not positions:
|
||
return # 没有持仓需要回顾
|
||
|
||
logger.info(f"\n🔄 【持仓回顾中...】共 {len(positions)} 个持仓")
|
||
|
||
# TODO: 实现持仓回顾功能
|
||
# 1. 获取当前市场信号
|
||
# 2. 将持仓信息传递给决策器
|
||
# 3. 根据决策调整持仓
|
||
|
||
logger.info(" 持仓回顾功能待实现")
|
||
|
||
except Exception as e:
|
||
logger.error(f"持仓回顾失败: {e}", exc_info=True)
|
||
|
||
def _convert_to_real_signal(self, symbol: str, signal: Dict[str, Any],
|
||
current_price: float) -> Dict[str, Any]:
|
||
"""转换 LLM 信号格式为实盘交易格式"""
|
||
signal_type = signal.get('type', 'medium_term')
|
||
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
|
||
|
||
# 获取入场类型和入场价
|
||
entry_type = signal.get('entry_type', 'market')
|
||
entry_price = signal.get('entry_price', current_price)
|
||
|
||
# 映射 action: buy -> long, sell -> short
|
||
action = signal.get('action', 'hold')
|
||
side_map = {'buy': 'long', 'sell': 'short'}
|
||
|
||
return {
|
||
'symbol': symbol,
|
||
'side': side_map.get(action, 'long'),
|
||
'entry_type': entry_type,
|
||
'entry_price': entry_price,
|
||
'stop_loss': signal.get('stop_loss', 0),
|
||
'take_profit': signal.get('take_profit', 0),
|
||
'confidence': signal.get('confidence', 0),
|
||
'grade': signal.get('grade', 'D'),
|
||
'signal_type': type_map.get(signal_type, 'swing'),
|
||
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
|
||
'trend': signal.get('trend')
|
||
}
|
||
|
||
async def _notify_real_order_created(
|
||
self,
|
||
symbol: str,
|
||
signal: Dict[str, Any],
|
||
result: Dict[str, Any]
|
||
):
|
||
"""发送实盘订单创建通知"""
|
||
side = signal.get('action', 'buy')
|
||
side_text = "做多" if side == 'buy' else "做空"
|
||
side_icon = "🟢" if side == 'buy' else "🔴"
|
||
grade = signal.get('grade', 'N/A')
|
||
position_size = result.get('position_size', 'light')
|
||
quantity = result.get('quantity', 0) # 这是保证金金额
|
||
position_value = quantity * 20 # 持仓价值 = 保证金 × 杠杆
|
||
|
||
title = f"💰 实盘订单已创建 - {symbol}"
|
||
|
||
content_parts = [
|
||
f"{side_icon} **方向**: {side_text}",
|
||
f"⭐ **信号等级**: {grade}",
|
||
f"📊 **仓位**: {position_size}",
|
||
f"💰 **持仓价值**: ${position_value:,.2f}",
|
||
f"🆔 **订单ID**: {result.get('order_id', '')[:12]}...",
|
||
f"",
|
||
f"⚠️ **真实资金交易中**",
|
||
]
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, "red")
|
||
if self.settings.telegram_enabled:
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送实盘订单创建通知: {result.get('order_id')}")
|
||
|
||
async def _notify_position_adjustment(
|
||
self,
|
||
symbol: str,
|
||
order_id: str,
|
||
decision: Dict[str, Any],
|
||
result: Dict[str, Any]
|
||
):
|
||
"""发送持仓调整通知"""
|
||
action = decision.get('action')
|
||
reason = decision.get('reason', '')
|
||
|
||
action_map = {
|
||
'ADJUST_SL_TP': '🔄 调整止损止盈',
|
||
'PARTIAL_CLOSE': '📤 部分平仓',
|
||
'FULL_CLOSE': '🚪 全部平仓'
|
||
}
|
||
action_text = action_map.get(action, action)
|
||
|
||
title = f"{action_text} - {symbol}"
|
||
|
||
content_parts = [
|
||
f"📊 **订单**: {order_id[:8]}",
|
||
f"📝 **原因**: {reason}",
|
||
]
|
||
|
||
if action == 'ADJUST_SL_TP':
|
||
changes = result.get('changes', [])
|
||
content_parts.append(f"🔄 **调整内容**: {', '.join(changes)}")
|
||
|
||
elif action in ['PARTIAL_CLOSE', 'FULL_CLOSE']:
|
||
pnl_info = result.get('pnl', {})
|
||
if pnl_info:
|
||
pnl = pnl_info.get('pnl', 0)
|
||
pnl_percent = pnl_info.get('pnl_percent', 0)
|
||
content_parts.append(f"💰 **实现盈亏**: ${pnl:+.2f} ({pnl_percent:+.1f}%)")
|
||
|
||
if action == 'PARTIAL_CLOSE':
|
||
close_percent = decision.get('close_percent', 0)
|
||
remaining = result.get('remaining_quantity', 0)
|
||
content_parts.append(f"📊 **平仓比例**: {close_percent:.0f}%")
|
||
content_parts.append(f"💵 **剩余仓位**: ${remaining:,.0f}")
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, "blue")
|
||
if self.settings.telegram_enabled:
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
|
||
async def _notify_signal_not_executed(
|
||
self,
|
||
market_signal: Dict[str, Any],
|
||
decision: Dict[str, Any],
|
||
current_price: float,
|
||
is_paper: bool = True,
|
||
reason: str = ""
|
||
):
|
||
"""发送有信号但未执行交易的通知"""
|
||
try:
|
||
symbol = market_signal.get('symbol')
|
||
account_type = "📊" if is_paper else "💰"
|
||
|
||
# 获取最佳信号
|
||
best_signal = self._get_best_signal_from_market(market_signal)
|
||
if not best_signal:
|
||
return
|
||
|
||
confidence = best_signal.get('confidence', 0)
|
||
entry_type = best_signal.get('entry_type', 'market')
|
||
entry_zone = best_signal.get('entry_zone', current_price)
|
||
|
||
# 决策信息
|
||
decision_type = decision.get('decision', 'HOLD')
|
||
decision_reasoning = decision.get('reasoning', '')
|
||
|
||
# 如果有外部传入的 reason(订单创建失败的具体原因),优先使用
|
||
if reason:
|
||
final_reason = reason
|
||
elif decision_reasoning:
|
||
final_reason = decision_reasoning
|
||
else:
|
||
final_reason = "未知原因"
|
||
|
||
# 方向图标
|
||
action = best_signal.get('action', 'wait')
|
||
if action == 'buy':
|
||
action_icon = '🟢'
|
||
action_text = '做多'
|
||
elif action == 'sell':
|
||
action_icon = '🔴'
|
||
action_text = '做空'
|
||
else:
|
||
action_icon = '➖'
|
||
action_text = '观望'
|
||
|
||
# 构建标题
|
||
title = f"{account_type} {symbol} 信号未执行"
|
||
|
||
# 构建内容
|
||
content_parts = [
|
||
f"{action_icon} **信号**: {action_text} | 📈 信心度: **{confidence}%**",
|
||
f"",
|
||
f"**入场方式**: {entry_type}",
|
||
f"**建议入场价**: ${entry_zone:,.2f}" if isinstance(entry_zone, (int, float)) else f"**建议入场价**: {entry_zone}",
|
||
f"**当前价格**: ${current_price:,.2f}",
|
||
f"",
|
||
f"⚠️ **未执行原因**:",
|
||
f"{final_reason}",
|
||
]
|
||
|
||
content = "\n".join(content_parts)
|
||
|
||
# 发送通知
|
||
if self.settings.feishu_enabled:
|
||
await self.feishu.send_card(title, content, "orange")
|
||
if self.settings.telegram_enabled:
|
||
message = f"{title}\n\n{content}"
|
||
await self.telegram.send_message(message)
|
||
|
||
logger.info(f" 📤 已发送信号未执行通知: {decision_type} - {final_reason[:50]}")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"发送信号未执行通知失败: {e}")
|
||
|
||
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
|
||
"""单次分析(用于测试或手动触发)"""
|
||
data = self.exchange.get_multi_timeframe_data(symbol)
|
||
|
||
if not self._validate_data(data):
|
||
return {'error': '数据不完整'}
|
||
|
||
# 使用新架构:市场分析 + 交易决策
|
||
market_signal = await self.market_analyzer.analyze(
|
||
symbol, data,
|
||
symbols=self.symbols
|
||
)
|
||
|
||
positions, account = self._get_trading_state()
|
||
decision = await self.decision_maker.make_decision(
|
||
market_signal, positions, account
|
||
)
|
||
|
||
return {
|
||
'market_signal': market_signal,
|
||
'trading_decision': decision
|
||
}
|
||
|
||
def get_status(self) -> Dict[str, Any]:
|
||
"""获取智能体状态"""
|
||
return {
|
||
'running': self.running,
|
||
'symbols': self.symbols,
|
||
'mode': 'LLM 驱动',
|
||
'last_signals': {
|
||
symbol: {
|
||
'type': sig.get('type'),
|
||
'action': sig.get('action'),
|
||
'confidence': sig.get('confidence'),
|
||
'grade': sig.get('grade')
|
||
}
|
||
for symbol, sig in self.last_signals.items()
|
||
}
|
||
}
|
||
|
||
|
||
# 全局单例
|
||
_crypto_agent: Optional['CryptoAgent'] = None
|
||
|
||
|
||
def get_crypto_agent() -> 'CryptoAgent':
|
||
"""获取加密货币智能体单例"""
|
||
# 直接使用类单例,不使用全局变量(避免 reload 时重置)
|
||
return CryptoAgent()
|
||
|