stock-ai-agent/backend/app/crypto_agent/crypto_agent.py
2026-02-26 00:02:20 +08:00

1707 lines
73 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
加密货币交易智能体 - 主控制器LLM 驱动版)
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
from app.utils.logger import logger
from app.config import get_settings
from app.services.bitget_service import bitget_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
from app.services.paper_trading_service import get_paper_trading_service
from app.services.signal_database_service import get_signal_db_service
from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer
from app.crypto_agent.trading_decision_maker import TradingDecisionMaker
from app.utils.system_status import get_system_monitor, AgentStatus
class CryptoAgent:
"""加密货币交易信号智能体LLM 驱动版)"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
"""单例模式 - 确保只有一个实例"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""初始化智能体"""
# 防止重复初始化
if CryptoAgent._initialized:
return
CryptoAgent._initialized = True
self.settings = get_settings()
self.exchange = bitget_service # 交易所服务
self.feishu = get_feishu_service()
self.telegram = get_telegram_service()
# 新架构:市场信号分析器 + 交易决策器
self.market_analyzer = MarketSignalAnalyzer()
self.decision_maker = TradingDecisionMaker()
self.signal_db = get_signal_db_service() # 信号数据库服务
# 模拟交易服务(始终启用)
self.paper_trading = get_paper_trading_service()
# 实盘交易服务(如果配置了 API
self.real_trading = None
try:
from app.services.real_trading_service import get_real_trading_service
self.real_trading = get_real_trading_service()
except Exception as e:
logger.warning(f"实盘交易服务初始化失败: {e}")
# 状态管理
self.last_signals: Dict[str, Dict[str, Any]] = {}
self.signal_cooldown: Dict[str, datetime] = {}
# 配置
self.symbols = self.settings.crypto_symbols.split(',')
# 运行状态
self.running = False
self._event_loop = None
# 注册到系统监控
monitor = get_system_monitor()
self._monitor_info = monitor.register_agent(
agent_id="crypto_agent",
name="加密货币智能体",
agent_type="crypto"
)
monitor.update_config("crypto_agent", {
"symbols": self.symbols,
"auto_trading_enabled": True, # 模拟交易始终启用
"analysis_interval": "每5分钟整点"
})
logger.info(f"加密货币智能体初始化完成LLM 驱动),监控交易对: {self.symbols}")
logger.info(f"📊 交易: 始终启用")
if self.real_trading:
auto_status = "启用" if self.real_trading.get_auto_trading_status() else "禁用"
logger.info(f"实盘交易: 已配置 (自动交易: {auto_status})")
else:
logger.info(f"实盘交易: 未配置")
def _on_price_update(self, symbol: str, price: float):
"""处理实时价格更新(用于模拟交易)"""
if not self.paper_trading:
return
triggered = self.paper_trading.check_price_triggers(symbol, price)
for result in triggered:
if self._event_loop and self._event_loop.is_running():
# 根据事件类型选择不同的通知方法
event_type = result.get('event_type', 'order_closed')
if event_type == 'order_filled':
asyncio.run_coroutine_threadsafe(self._notify_order_filled(result), self._event_loop)
elif event_type == 'breakeven_triggered':
asyncio.run_coroutine_threadsafe(self._notify_breakeven_triggered(result), self._event_loop)
else:
asyncio.run_coroutine_threadsafe(self._notify_order_closed(result), self._event_loop)
else:
logger.warning(f"无法发送通知: 事件循环不可用")
async def _notify_order_filled(self, result: Dict[str, Any]):
"""发送挂单成交通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
grade = result.get('signal_grade', 'N/A')
title = f"✅ 挂单成交 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"⭐ **信号等级**: {grade}",
f"💰 **挂单价**: ${result.get('entry_price', 0):,.2f}",
f"🎯 **成交价**: ${result.get('filled_price', 0):,.2f}",
f"💵 **仓位**: ${result.get('quantity', 0):,.0f}",
]
if result.get('stop_loss'):
content_parts.append(f"🛑 **止损**: ${result.get('stop_loss', 0):,.2f}")
if result.get('take_profit'):
content_parts.append(f"🎯 **止盈**: ${result.get('take_profit', 0):,.2f}")
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "green")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
logger.info(f"已发送挂单成交通知: {result.get('order_id')}")
async def _notify_pending_cancelled(self, result: Dict[str, Any]):
"""发送挂单撤销通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
new_side_text = "做多" if result.get('new_side') == 'long' else "做空"
title = f"⚠️ 挂单撤销 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **原方向**: {side_text}",
f"💰 **挂单价**: ${result.get('entry_price', 0):,.2f}",
f"",
f"📝 **原因**: 收到反向{new_side_text}信号,自动撤销",
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
logger.info(f"已发送挂单撤销通知: {result.get('order_id')}")
async def _notify_breakeven_triggered(self, result: Dict[str, Any]):
"""发送移动止损触发通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = '🟢' if result.get('side') == 'long' else '🔴'
pnl_percent = result.get('current_pnl_percent', 0)
title = f"📈 移动止损已启动 - {result.get('symbol')}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"",
f"💰 **开仓价**: ${result.get('filled_price', 0):,.2f}",
f"📈 **当前盈利**: {pnl_percent:+.2f}%",
f"🛑 **新止损价**: ${result.get('new_stop_loss', 0):,.2f}",
f"",
f"💰 锁定利润,让利润奔跑"
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "green")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
logger.info(f"已发送移动止损通知: {result.get('order_id')}")
async def _notify_order_closed(self, result: Dict[str, Any]):
"""发送订单平仓通知"""
status = result.get('status', '')
is_win = result.get('is_win', False)
if status == 'closed_tp':
emoji = "🎯"
status_text = "止盈平仓"
color = "green"
elif status == 'closed_sl':
emoji = "🛑"
status_text = "止损平仓"
color = "red"
elif status == 'closed_be':
emoji = "📈"
status_text = "移动止损"
color = "orange"
else:
emoji = "📤"
status_text = "手动平仓"
color = "blue"
win_text = "盈利" if is_win else "亏损"
win_emoji = "" if is_win else ""
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = "🟢" if result.get('side') == 'long' else "🔴"
title = f"{emoji} 订单{status_text}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"💰 **交易对**: {result.get('symbol')}",
f"📊 **入场**: ${result.get('entry_price', 0):,.2f}",
f"🎯 **出场**: ${result.get('exit_price', 0):,.2f}",
f"{win_emoji} **{win_text}**: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})",
f"⏱️ **持仓时间**: {result.get('hold_duration', 'N/A')}",
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, color)
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
logger.info(f"已发送订单平仓通知: {result.get('order_id')}")
def _get_seconds_until_next_5min(self) -> int:
"""计算距离下一个5分钟整点的秒数"""
now = datetime.now()
current_minute = now.minute
current_second = now.second
minutes_past = current_minute % 5
if minutes_past == 0 and current_second == 0:
return 0
minutes_to_wait = 5 - minutes_past if minutes_past > 0 else 5
seconds_to_wait = minutes_to_wait * 60 - current_second
return seconds_to_wait
async def run(self):
"""主运行循环"""
self.running = True
self._event_loop = asyncio.get_event_loop()
# 更新状态为启动中
monitor = get_system_monitor()
monitor.update_status("crypto_agent", AgentStatus.STARTING)
# 启动横幅
logger.info("\n" + "=" * 60)
logger.info("🚀 加密货币交易信号智能体LLM 驱动)")
logger.info("=" * 60)
logger.info(f" 监控交易对: {', '.join(self.symbols)}")
logger.info(f" 运行模式: 每5分钟整点执行")
logger.info(f" 分析引擎: LLM 自主分析")
logger.info(f" 模拟交易: 已启用")
logger.info("=" * 60 + "\n")
# 更新状态为运行中
monitor.update_status("crypto_agent", AgentStatus.RUNNING)
# 注意:不再启动独立的价格监控
# 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查
logger.info(f"模拟交易已启用(由后台统一监控)")
# 发送启动通知(卡片格式)
title = "🚀 加密货币智能体已启动"
# 构建卡片内容
content_parts = [
f"🤖 **驱动引擎**: LLM 自主分析",
f"📊 **监控交易对**: {len(self.symbols)}",
f" {', '.join(self.symbols)}",
f"⏰ **运行频率**: 每5分钟整点",
f"💰 **交易系统**: 已启用(后台统一监控)",
f"🎯 **分析维度**: 技术面 + 资金面 + 情绪面",
]
content = "\n".join(content_parts)
await self.feishu.send_card(title, content, "green")
await self.telegram.send_startup_notification(self.symbols)
while self.running:
try:
wait_seconds = self._get_seconds_until_next_5min()
if wait_seconds > 0:
next_run = datetime.now() + timedelta(seconds=wait_seconds)
logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
await asyncio.sleep(wait_seconds)
run_time = datetime.now()
logger.info("\n" + "=" * 60)
logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
logger.info("=" * 60)
for symbol in self.symbols:
await self.analyze_symbol(symbol)
logger.info("\n" + "" * 60)
logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
logger.info("" * 60 + "\n")
await asyncio.sleep(2)
except Exception as e:
logger.error(f"❌ 分析循环出错: {e}")
import traceback
logger.error(traceback.format_exc())
await asyncio.sleep(10)
def stop(self):
"""停止运行"""
self.running = False
# 更新状态
monitor = get_system_monitor()
monitor.update_status("crypto_agent", AgentStatus.STOPPED)
logger.info("加密货币智能体已停止")
def _check_volatility(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str, float]:
"""
检查波动率,判断是否值得进行 LLM 分析(组合方案)
使用 1 小时 K 线判断趋势波动5 分钟 K 线检测突发波动
Args:
symbol: 交易对
data: 多周期K线数据
Returns:
(should_analyze, reason, volatility_percent)
should_analyze: 是否应该进行分析
reason: 原因说明
volatility_percent: 1小时波动率百分比
"""
# 检查是否启用波动率过滤
if not self.settings.crypto_volatility_filter_enabled:
return True, "波动率过滤未启用", 0
try:
# 1. 首先检查 1 小时趋势波动率
df_1h = data.get('1h')
if df_1h is None or len(df_1h) < 20:
# 数据不足,保守起见允许分析
return True, "1小时数据不足允许分析", 0
# 获取最近20根K线
recent_1h = df_1h.iloc[-20:]
# 计算最高价和最低价
high = recent_1h['high'].max()
low = recent_1h['low'].min()
current_price = float(recent_1h.iloc[-1]['close'])
# 计算1小时波动率
if low > 0:
volatility_1h_percent = ((high - low) / low) * 100
else:
volatility_1h_percent = 0
# 计算价格变化范围(相对于当前价格)
price_range_high_percent = ((high - current_price) / current_price) * 100 if current_price > 0 else 0
price_range_low_percent = ((current_price - low) / current_price) * 100 if current_price > 0 else 0
# 从配置读取阈值
min_volatility = self.settings.crypto_min_volatility_percent
min_price_range = self.settings.crypto_min_price_range_percent
# 如果1小时波动率足够大直接允许分析
if volatility_1h_percent >= min_volatility or price_range_high_percent >= min_price_range or price_range_low_percent >= min_price_range:
return True, f"1小时趋势活跃 (波动率 {volatility_1h_percent:.2f}%),值得分析", volatility_1h_percent
# 2. 1小时波动率较低检查5分钟突发波动
df_5m = data.get('5m')
if df_5m is not None and len(df_5m) >= 3:
# 获取最近3根5分钟K线15分钟内的变化
recent_5m = df_5m.iloc[-3:]
# 计算5分钟价格变化幅度
price_start = float(recent_5m.iloc[0]['close'])
price_end = float(recent_5m.iloc[-1]['close'])
if price_start > 0:
price_change_5m = abs(price_end - price_start) / price_start * 100
else:
price_change_5m = 0
# 从配置读取5分钟突发波动阈值
surge_threshold = self.settings.crypto_5m_surge_threshold
logger.debug(f"{symbol} 5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}% (阈值: {surge_threshold}%)")
# 如果5分钟突发波动超过阈值仍然允许分析
if price_change_5m >= surge_threshold:
direction = "上涨" if price_end > price_start else "下跌"
return True, f"5分钟突发{direction} ({price_change_5m:.2f}% > {surge_threshold}%),强制分析", volatility_1h_percent
# 3. 波动率过低,跳过分析
reason = f"波动率过低 (1小时: {volatility_1h_percent:.2f}% < {min_volatility}%, 5分钟无突发波动),跳过分析"
logger.info(f"⏸️ {symbol} {reason}")
return False, reason, volatility_1h_percent
except Exception as e:
logger.warning(f"{symbol} 波动率检查失败: {e},允许分析")
return True, "波动率检查失败,允许分析", 0
async def analyze_symbol(self, symbol: str):
"""
分析单个交易对(新架构:市场分析 + 交易决策分离)
新架构流程:
1. 市场信号分析器分析市场(不包含仓位信息)
2. 交易决策器根据信号+仓位+账户状态做决策
3. 执行交易决策
Args:
symbol: 交易对,如 'BTCUSDT'
"""
try:
# 更新活动时间
monitor = get_system_monitor()
monitor.update_activity("crypto_agent")
logger.info(f"\n{'' * 50}")
logger.info(f"📊 {symbol} 分析开始")
logger.info(f"{'' * 50}")
# 1. 获取多周期数据
data = self.exchange.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
return
# 当前价格
current_price = float(data['5m'].iloc[-1]['close'])
price_change_24h = self._calculate_price_change(data['1h'])
logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")
# 1.5. 波动率检查(节省 LLM 调用)
should_analyze, volatility_reason, volatility = self._check_volatility(symbol, data)
if not should_analyze:
logger.info(f"⏸️ {volatility_reason},跳过本次 LLM 分析")
return
# ============================================================
# 第一阶段:市场信号分析(不包含仓位信息)
# ============================================================
logger.info(f"\n🤖 【第一阶段:市场信号分析】")
market_signal = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols
)
# 输出市场分析结果
self._log_market_signal(market_signal)
# 过滤掉 wait 信号,只保留 buy/sell 信号
signals = market_signal.get('signals', [])
trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']]
if not trade_signals:
logger.info(f"\n⏸️ 结论: 无交易信号(仅有观望建议),继续观望")
return
# 检查是否有达到阈值的交易信号
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
valid_signals = [s for s in trade_signals if s.get('confidence', 0) >= threshold]
if not valid_signals:
logger.info(f"\n⏸️ 结论: 无交易信号达到置信度阈值 ({threshold}%),继续观望")
return
logger.info(f"\n✅ 发现 {len(valid_signals)} 个有效交易信号(达到 {threshold}% 阈值)")
# ============================================================
# 发送市场信号通知(独立于交易决策)
# ============================================================
await self._send_market_signal_notification(market_signal, current_price)
# ============================================================
# 第二阶段:交易决策(信号 + 仓位 + 账户状态)
# 模拟交易和实盘交易分别进行独立决策
# ============================================================
logger.info(f"\n🤖 【第二阶段:交易决策】")
# 获取配置
paper_trading_enabled = self.settings.paper_trading_enabled
real_trading_enabled = self.settings.real_trading_enabled
# 分别存储模拟和实盘的决策
paper_decision = None
real_decision = None
# 模拟交易决策
if paper_trading_enabled:
logger.info(f"\n📊 【模拟交易决策】")
positions, account, pending_orders = self._get_trading_state(use_real_trading=False)
paper_decision = await self.decision_maker.make_decision(
market_signal, positions, account, current_price, pending_orders
)
self._log_trading_decision(paper_decision)
else:
logger.info(f"⏸️ 模拟交易未启用")
# 实盘交易决策
if real_trading_enabled:
logger.info(f"\n💰 【实盘交易决策】")
# 检查是否开启自动交易
if self.real_trading and self.real_trading.get_auto_trading_status():
positions, account, pending_orders = self._get_trading_state(use_real_trading=True)
real_decision = await self.decision_maker.make_decision(
market_signal, positions, account, current_price, pending_orders
)
self._log_trading_decision(real_decision)
else:
logger.info(f"⏸️ 实盘自动交易未开启")
else:
logger.info(f"⏸️ 实盘交易未启用")
# ============================================================
# 第三阶段:执行交易决策
# ============================================================
await self._execute_decisions(paper_decision, real_decision, market_signal, current_price)
except Exception as e:
logger.error(f"❌ 分析 {symbol} 出错: {e}")
import traceback
logger.error(traceback.format_exc())
def _log_market_signal(self, signal: Dict[str, Any]):
"""输出市场信号分析结果"""
logger.info(f" 市场状态: {signal.get('market_state')}")
logger.info(f" 趋势: {signal.get('trend')}")
# 新闻情绪
news_sentiment = signal.get('news_sentiment', '')
if news_sentiment:
sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': ''}.get(news_sentiment, '')
logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
# 关键价位
import re
key_levels = signal.get('key_levels', {})
if key_levels.get('support') or key_levels.get('resistance'):
# 从字符串中提取数字(处理 "66065 (15m布林下轨)" 这种格式)
def extract_number(val):
if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
# 提取第一个数字
match = re.search(r'[\d,]+\.?\d*', val.replace(',', ''))
if match:
return float(match.group())
return None
supports = [extract_number(s) for s in key_levels.get('support', [])[:2]]
resistances = [extract_number(r) for r in key_levels.get('resistance', [])[:2]]
support_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None])
resistance_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None])
logger.info(f" 支撑位: {support_str or '-'}")
logger.info(f" 阻力位: {resistance_str or '-'}")
# 信号列表 - 区分交易信号和观望建议
signals = signal.get('signals', [])
trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']]
wait_signals = [s for s in signals if s.get('action') == 'wait']
if trade_signals:
logger.info(f"\n🎯 【发现 {len(trade_signals)} 个交易信号】")
for i, sig in enumerate(trade_signals, 1):
signal_type = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
action = sig.get('action', 'hold')
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
action_text = action_map.get(action, action)
confidence = sig.get('confidence', 0)
logger.info(f"\n [{i}] {type_text} | {action_text}")
logger.info(f" 信心度: {confidence}%")
logger.info(f" 入场: ${sig.get('entry_zone', 'N/A')}")
logger.info(f" 止损: ${sig.get('stop_loss', 'N/A')}")
logger.info(f" 止盈: ${sig.get('take_profit', 'N/A')}")
logger.info(f" 理由: {sig.get('reasoning', 'N/A')}")
if wait_signals:
logger.info(f"\n📋 【{len(wait_signals)} 个观望建议(不触发交易)】")
for i, sig in enumerate(wait_signals, 1):
signal_type = sig.get('timeframe', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
confidence = sig.get('confidence', 0)
logger.info(f"\n [{i}] {type_text} | 观望")
logger.info(f" 信心度: {confidence}%")
logger.info(f" 理由: {sig.get('reasoning', 'N/A')}")
def _log_trading_decision(self, decision: Dict[str, Any]):
"""输出交易决策结果"""
decision_type = decision.get('decision', 'HOLD')
decision_map = {
'OPEN': '🟢 开仓',
'CLOSE': '🔴 平仓',
'ADD': ' 加仓',
'REDUCE': ' 减仓',
'HOLD': '⏸️ 观望'
}
logger.info(f" 决策: {decision_map.get(decision_type, decision_type)}")
logger.info(f" 动作: {decision.get('action', 'N/A')}")
logger.info(f" 仓位: {decision.get('position_size', 'N/A')}")
# quantity 是保证金,显示持仓价值 = 保证金 × 20
quantity = decision.get('quantity', 0)
if isinstance(quantity, (int, float)) and quantity > 0:
logger.info(f" 持仓价值: ${quantity * 20:,.2f} (保证金 ${quantity:.2f})")
else:
logger.info(f" 数量: ${decision.get('quantity', 'N/A')}")
if decision.get('stop_loss'):
logger.info(f" 止损: ${decision.get('stop_loss')}")
if decision.get('take_profit'):
logger.info(f" 止盈: ${decision.get('take_profit')}")
logger.info(f" 理由: {decision.get('reasoning', 'N/A')}")
risk = decision.get('risk_analysis', '')
if risk:
logger.info(f" 风险: {risk}")
def _get_trading_state(self, use_real_trading: bool = False) -> tuple:
"""
获取交易状态(持仓和账户)
Args:
use_real_trading: True 获取实盘状态False 获取模拟交易状态
Returns:
(positions, account, pending_orders) - 持仓列表、账户状态、挂单列表
"""
if use_real_trading and self.real_trading:
# 实盘交易
active_orders = self.real_trading.get_active_orders()
account = self.real_trading.get_account_status()
else:
# 模拟交易
active_orders = self.paper_trading.get_active_orders()
account = self.paper_trading.get_account_status()
# 分离持仓和挂单
position_list = []
pending_orders = []
for order in active_orders:
if order.get('status') == 'open' and order.get('filled_price'):
# 已成交的订单作为持仓
position_list.append({
'symbol': order.get('symbol'),
'side': order.get('side'),
'holding': order.get('quantity', 0),
'entry_price': order.get('filled_price') or order.get('entry_price'),
'stop_loss': order.get('stop_loss'),
'take_profit': order.get('take_profit')
})
elif order.get('status') == 'pending':
# 未成交的订单作为挂单
pending_orders.append({
'order_id': order.get('order_id'),
'symbol': order.get('symbol'),
'side': order.get('side'),
'entry_price': order.get('entry_price'),
'quantity': order.get('quantity', 0),
'entry_type': order.get('entry_type', 'market'),
'confidence': order.get('confidence', 0)
})
return position_list, account, pending_orders
async def _execute_decisions(self, paper_decision: Dict[str, Any],
real_decision: Dict[str, Any],
market_signal: Dict[str, Any], current_price: float):
"""执行交易决策(模拟和实盘分别执行)"""
# 选择最佳信号用于保存
best_signal = self._get_best_signal_from_market(market_signal)
# 保存信号到数据库(只保存一次)
if best_signal:
signal_to_save = best_signal.copy()
signal_to_save['signal_type'] = 'crypto'
signal_to_save['symbol'] = market_signal.get('symbol')
signal_to_save['current_price'] = current_price
self.signal_db.add_signal(signal_to_save)
# 获取配置
paper_trading_enabled = self.settings.paper_trading_enabled
real_trading_enabled = self.settings.real_trading_enabled
# 记录执行结果
paper_executed = False
real_executed = False
# ============================================================
# 执行模拟交易决策
# ============================================================
if paper_trading_enabled and paper_decision:
decision_type = paper_decision.get('decision', 'HOLD')
if decision_type == 'HOLD':
reasoning = paper_decision.get('reasoning', '观望')
logger.info(f"\n📊 模拟交易: {reasoning}")
# 有信号但决策为 HOLD发送未执行通知
await self._notify_signal_not_executed(market_signal, paper_decision, current_price, is_paper=True)
else:
logger.info(f"\n📊 【执行模拟交易】")
if decision_type in ['OPEN', 'ADD']:
# 先执行交易
result = await self._execute_paper_trade(paper_decision, market_signal, current_price)
# 检查是否成功执行
order = result.get('order') if result else None
if order:
# 只有成功创建订单后才发送通知
await self._send_signal_notification(market_signal, paper_decision, current_price, is_paper=True)
paper_executed = True
else:
# 有信号但订单创建失败,发送未执行通知
reason = result.get('message', '订单创建失败') if result else '订单创建失败'
await self._notify_signal_not_executed(market_signal, paper_decision, current_price, is_paper=True, reason=reason)
logger.warning(f" ⚠️ 模拟交易未执行,已发送通知")
elif decision_type == 'CLOSE':
await self._execute_close(paper_decision, paper_trading=True)
paper_executed = True
elif decision_type == 'CANCEL_PENDING':
await self._execute_cancel_pending(paper_decision, paper_trading=True)
paper_executed = True
elif decision_type == 'REDUCE':
await self._execute_reduce(paper_decision, paper_trading=True)
paper_executed = True
# ============================================================
# 执行实盘交易决策
# ============================================================
if real_trading_enabled and real_decision:
# 检查是否开启自动交易
if self.real_trading and self.real_trading.get_auto_trading_status():
decision_type = real_decision.get('decision', 'HOLD')
if decision_type == 'HOLD':
reasoning = real_decision.get('reasoning', '观望')
logger.info(f"\n💰 实盘交易: {reasoning}")
# 有信号但决策为 HOLD发送未执行通知
await self._notify_signal_not_executed(market_signal, real_decision, current_price, is_paper=False)
else:
logger.info(f"\n💰 【执行实盘交易】")
if decision_type in ['OPEN', 'ADD']:
# 先执行交易
result = await self._execute_real_trade(real_decision, market_signal, current_price)
# 检查是否成功执行
if result and result.get('success'):
# 只有成功创建订单后才发送通知
await self._send_signal_notification(market_signal, real_decision, current_price, is_paper=False)
real_executed = True
else:
# 有信号但订单创建失败,发送未执行通知
reason = result.get('message', '订单创建失败') if result else '订单创建失败'
await self._notify_signal_not_executed(market_signal, real_decision, current_price, is_paper=False, reason=reason)
logger.warning(f" ⚠️ 实盘交易未执行,已发送通知")
elif decision_type == 'CLOSE':
await self._execute_close(real_decision, paper_trading=False)
real_executed = True
elif decision_type == 'CANCEL_PENDING':
await self._execute_cancel_pending(real_decision, paper_trading=False)
real_executed = True
elif decision_type == 'REDUCE':
await self._execute_reduce(real_decision, paper_trading=False)
real_executed = True
# 如果都没有执行,给出提示
if not paper_executed and not real_executed:
logger.info(f"\n⏸️ 所有交易均为观望,无需执行")
def _get_best_signal_from_market(self, market_signal: Dict[str, Any]) -> Dict[str, Any]:
"""从市场信号中获取最佳信号"""
signals = market_signal.get('signals', [])
if not signals:
return {}
# 按信心度排序,取最高的
sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True)
return sorted_signals[0]
async def _send_market_signal_notification(self, market_signal: Dict[str, Any],
current_price: float):
"""发送市场信号通知(第一阶段)- 调用前已确保有有效信号"""
try:
# 获取配置的阈值
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
# 过滤达到阈值的信号(防御性检查)
signals = market_signal.get('signals', [])
valid_signals = [s for s in signals if s.get('confidence', 0) >= threshold]
if not valid_signals:
return
# 取最佳信号(按信心度排序)
best_signal = sorted(valid_signals, key=lambda x: x.get('confidence', 0), reverse=True)[0]
# 构建通知消息 - 完全匹配旧格式
symbol = market_signal.get('symbol')
market_state = market_signal.get('market_state')
trend = market_signal.get('trend')
# 注意:经过 market_signal_analyzer 解析后
# - 'action' 是 buy/sell/wait (交易方向)
# - 'timeframe' 是 short_term/medium_term/long_term (周期)
sig_action = best_signal.get('action', 'hold') # buy/sell/wait
timeframe = best_signal.get('timeframe', 'unknown') # short_term/medium_term/long_term
confidence = best_signal.get('confidence', 0)
entry_val = best_signal.get('entry_zone', 'N/A')
sl_val = best_signal.get('stop_loss', 'N/A')
tp_val = best_signal.get('take_profit', 'N/A')
reasoning = best_signal.get('reasoning', '')
# 格式化价格用于显示(处理 float 和 N/A
def format_price(price_value):
if price_value is None or price_value == 'N/A':
return 'N/A'
if isinstance(price_value, float):
return f"{price_value:,.2f}"
return str(price_value)
entry = format_price(entry_val)
sl = format_price(sl_val)
tp = format_price(tp_val)
# 类型映射
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
timeframe_text = type_map.get(timeframe, timeframe)
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空', 'hold': ' 观望'}
action_text = action_map.get(sig_action, sig_action)
# 入场类型 - 从信号中获取
entry_type = best_signal.get('entry_type', 'market')
entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待'
entry_type_icon = '' if entry_type == 'market' else ''
# 等级(基于信心度映射)- 与 market_signal_analyzer.py 保持一致
# A级(80-100): 量价配合 + 多指标共振 + 多周期确认
# B级(60-79): 量价配合 + 主要指标确认
# C级(40-59): 有机会但量价不够理想
# D级(<40): 量价背离或信号矛盾
if confidence >= 80:
grade = 'A'
grade_icon = '⭐⭐⭐'
elif confidence >= 60:
grade = 'B'
grade_icon = '⭐⭐'
elif confidence >= 40:
grade = 'C'
grade_icon = ''
else:
grade = 'D'
grade_icon = ''
# 仓位(基于信心度和杠杆空间)- 与新的等级阈值对齐
if confidence >= 80: # A级信号
position_size = 'heavy'
position_icon = '🔥'
position_text = '重仓'
elif confidence >= 60: # B级信号
position_size = 'medium'
position_icon = '📊'
position_text = '中仓'
else: # C级或D级信号
position_size = 'light'
position_icon = '🌱'
position_text = '轻仓'
# 计算止损止盈百分比(价格已经是 float
try:
# 使用当前价格作为入场价(如果 entry_zone 是 N/A
entry_for_calc = entry_val if isinstance(entry_val, (int, float)) else current_price
if isinstance(sl_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0:
if sig_action == 'buy':
sl_percent = ((sl_val - entry_for_calc) / entry_for_calc * 100)
else:
sl_percent = ((entry_for_calc - sl_val) / entry_for_calc * 100)
sl_display = f"{sl_percent:+.1f}%"
else:
sl_display = "N/A"
if isinstance(tp_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0:
if sig_action == 'buy':
tp_percent = ((tp_val - entry_for_calc) / entry_for_calc * 100)
else:
tp_percent = ((entry_for_calc - tp_val) / entry_for_calc * 100)
tp_display = f"{tp_percent:+.1f}%"
else:
tp_display = "N/A"
except:
sl_display = "N/A"
tp_display = "N/A"
# 构建卡片标题和颜色
if sig_action == 'buy':
title = f"🟢 {symbol} {timeframe_text}做多信号"
color = "green"
else:
title = f"🔴 {symbol} {timeframe_text}做空信号"
color = "red"
# 构建卡片内容
content_parts = [
f"**{timeframe_text}** | **{grade}**{grade_icon} | **{confidence}%** 置信度",
f"{entry_type_icon} **入场**: {entry_type_text} | {position_icon} **仓位**: {position_text}",
f"",
]
# 入场价格显示
if entry_type == 'limit':
# 限价订单:显示建议挂单价格和当前价格
content_parts.append(f"📋 **挂单价格**: ${entry}")
content_parts.append(f"📍 **当前价格**: ${format_price(current_price)}")
else:
# 市价订单:显示当前价格作为入场价
content_parts.append(f"💰 **入场价**: ${entry}")
if sl != 'N/A':
content_parts.append(f"🛑 **止损价**: ${sl} ({sl_display})")
if tp != 'N/A':
content_parts.append(f"🎯 **止盈价**: ${tp} ({tp_display})")
content_parts.append(f"")
content_parts.append(f"📝 **分析理由**:")
content_parts.append(f"{reasoning}")
content = "\n".join(content_parts)
# 根据配置发送通知
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, color)
if self.settings.telegram_enabled:
# Telegram 使用文本格式
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
logger.info(f" 📤 已发送市场信号通知 (阈值: {threshold}%)")
except Exception as e:
logger.warning(f"发送市场信号通知失败: {e}")
import traceback
logger.debug(traceback.format_exc())
async def _send_signal_notification(self, market_signal: Dict[str, Any],
decision: Dict[str, Any], current_price: float,
is_paper: bool = True):
"""发送交易执行通知(第三阶段)"""
try:
decision_type = decision.get('decision', 'HOLD')
# 只在非观望决策时发送执行通知
if decision_type == 'HOLD':
return
# 构建消息 - 使用旧格式风格
symbol = market_signal.get('symbol')
action = decision.get('action', '')
reasoning = decision.get('reasoning', '')
risk_analysis = decision.get('risk_analysis', '')
position_size = decision.get('position_size', 'N/A')
quantity = decision.get('quantity', 'N/A')
stop_loss = decision.get('stop_loss', '')
take_profit = decision.get('take_profit', '')
confidence = decision.get('confidence', 0)
# 决策类型映射
decision_map = {
'OPEN': '开仓',
'CLOSE': '平仓',
'ADD': '加仓',
'REDUCE': '减仓'
}
decision_text = decision_map.get(decision_type, decision_type)
# 账户类型标识
account_type = "📊" if is_paper else "💰"
# 方向图标
if 'long' in action.lower() or 'buy' in action.lower():
action_icon = '🟢'
action_text = '做多'
elif 'short' in action.lower() or 'sell' in action.lower():
action_icon = '🔴'
action_text = '做空'
else:
action_icon = ''
action_text = action
# 从市场信号中获取入场方式(需要在构建标题之前)
best_signal = self._get_best_signal_from_market(market_signal)
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
entry_type_text = '现价单' if entry_type == 'market' else '挂单'
entry_type_icon = '' if entry_type == 'market' else ''
# 仓位图标
position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓'}
position_display = position_map.get(position_size, '🌱 轻仓')
# 构建卡片标题和颜色 - 考虑入场方式
# 挂单时标题显示"挂单",现价单时显示"开仓"/"平仓"等
if decision_type == 'OPEN':
decision_title = '挂单' if entry_type == 'limit' else '开仓'
title = f"{account_type} {symbol} {decision_title}"
color = "green"
elif decision_type == 'CLOSE':
decision_title = '挂单' if entry_type == 'limit' else '平仓'
title = f"{account_type} {symbol} {decision_title}"
color = "orange"
elif decision_type == 'ADD':
decision_title = '挂单' if entry_type == 'limit' else '加仓'
title = f"{account_type} {symbol} {decision_title}"
color = "green"
elif decision_type == 'REDUCE':
decision_title = '挂单' if entry_type == 'limit' else '减仓'
title = f"{account_type} {symbol} {decision_title}"
color = "orange"
else:
title = f"{account_type} {symbol} 交易执行"
color = "blue"
# 构建卡片内容
# quantity 是保证金金额,需要显示持仓价值 = 保证金 × 杠杆
margin = quantity if quantity != 'N/A' else 0
leverage = 20 # 模拟交易固定 20x 杠杆
position_value = margin * leverage if isinstance(margin, (int, float)) else 'N/A'
position_value_display = f"${position_value:,.2f}" if isinstance(position_value, (int, float)) else "N/A"
# 根据入场方式显示不同的价格信息
if entry_type == 'market':
price_display = f"💵 **入场价**: ${current_price:,.2f} (现价)"
else:
entry_price = best_signal.get('entry_zone', current_price) if best_signal else current_price
price_display = f"💵 **挂单价**: ${entry_price:,.2f} (等待)"
content_parts = [
f"{action_icon} **操作**: {decision_text} ({action_text})",
f"{entry_type_icon} **入场方式**: {entry_type_text}",
f"{position_display.replace(' ', ': **')} | 📈 信心度: **{confidence}%**",
f"",
f"💰 **持仓价值**: {position_value_display}",
price_display,
]
if stop_loss:
content_parts.append(f"🛑 **止损价**: ${stop_loss}")
if take_profit:
content_parts.append(f"🎯 **止盈价**: ${take_profit}")
# 简洁的决策理由和风险分析各1句话
content_parts.append(f"")
content_parts.append(f"📝 **决策**: {reasoning}")
if risk_analysis:
content_parts.append(f"⚠️ **风险**: {risk_analysis}")
content_parts.append(f"")
content_parts.append(f"💼 交易已执行")
content = "\n".join(content_parts)
# 根据配置发送通知
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, color)
if self.settings.telegram_enabled:
# Telegram 使用文本格式
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
logger.info(f" 📤 已发送交易执行通知: {decision_text}")
except Exception as e:
logger.warning(f"发送交易执行通知失败: {e}")
async def _execute_paper_trade(self, decision: Dict[str, Any], market_signal: Dict[str, Any], current_price: float):
"""执行模拟交易"""
try:
symbol = decision.get('symbol')
action = decision.get('action', '')
position_size = decision.get('position_size', 'light')
# 直接使用 LLM 决策的 quantity
quantity = decision.get('quantity', 0)
if quantity <= 0:
logger.warning(f" ⚠️ LLM 决策的 quantity 无效: {quantity},使用默认值")
quantity = self._calculate_quantity_by_position_size(position_size, real_trading=False)
logger.info(f" 准备创建模拟订单: {symbol} {action} {position_size}")
logger.info(f" LLM 决策金额: ${quantity:.2f} USDT")
# 转换决策的 action 为 paper_trading 期望的格式
trading_action = self._convert_trading_action(action)
# 从市场信号中获取入场方式和入场价格
best_signal = self._get_best_signal_from_market(market_signal)
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
entry_price = best_signal.get('entry_zone', current_price) if best_signal else current_price
logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}")
# 转换决策为订单格式(价格字段已在 LLM 解析时转换为 float
order_data = {
'symbol': symbol,
'action': trading_action, # 使用转换后的 action
'entry_type': entry_type, # 使用信号中的入场方式
'entry_price': entry_price if entry_type == 'limit' else current_price, # limit单使用entry_zonemarket单使用current_price
'stop_loss': decision.get('stop_loss'),
'take_profit': decision.get('take_profit'),
'confidence': decision.get('confidence', 50),
'signal_grade': 'B', # 默认B级
'position_size': position_size,
'quantity': quantity # 使用 LLM 决策的金额
}
logger.debug(f" 订单数据: {order_data}")
result = self.paper_trading.create_order_from_signal(order_data, current_price)
logger.debug(f" 创建订单结果: {result}")
# 记录订单
order = result.get('order')
if order:
# quantity 是保证金金额,持仓价值 = 保证金 × 20
position_value = quantity * 20
logger.info(f" 📝 已创建模拟订单: {order.order_id} | 仓位: {position_size} | 持仓价值: ${position_value:.2f}")
else:
logger.warning(f" ⚠️ 创建模拟订单失败: {result}")
# 返回结果
return result
except Exception as e:
logger.error(f"执行模拟交易失败: {e}")
import traceback
logger.debug(traceback.format_exc())
def _convert_trading_action(self, action: str) -> str:
"""转换交易决策的 action 为 buy/sell 格式"""
if not action:
return 'buy'
action_lower = action.lower()
# 做多相关的 action
if 'long' in action_lower or 'buy' in action_lower:
return 'buy'
# 做空相关的 action
elif 'short' in action_lower or 'sell' in action_lower:
return 'sell'
# 默认返回 buy
return 'buy'
def _calculate_quantity_by_position_size(self, position_size: str, real_trading: bool = False) -> float:
"""根据仓位大小计算实际金额"""
if real_trading:
# 实盘交易配置
position_config = {
'heavy': self.settings.real_trading_max_single_position,
'medium': self.settings.real_trading_max_single_position * 0.6,
'light': self.settings.real_trading_max_single_position * 0.3
}
else:
# 模拟交易配置
position_config = {
'heavy': self.settings.paper_trading_position_a, # 1000
'medium': self.settings.paper_trading_position_b, # 500
'light': self.settings.paper_trading_position_c # 200
}
return position_config.get(position_size, 200)
async def _execute_real_trade(self, decision: Dict[str, Any], market_signal: Dict[str, Any], current_price: float):
"""执行实盘交易"""
try:
symbol = decision.get('symbol')
action = decision.get('action', '')
position_size = decision.get('position_size', 'light')
# 直接使用 LLM 决策的 quantity
quantity = decision.get('quantity', 0)
if quantity <= 0:
logger.warning(f" ⚠️ LLM 决策的 quantity 无效: {quantity},使用默认值")
quantity = self._calculate_quantity_by_position_size(position_size, real_trading=True)
logger.info(f" 实盘交易: {position_size} → 保证金 ${quantity:.2f} USDT → 持仓价值 ${quantity * 20:.2f}")
# 转换决策的 action 为 paper_trading 期望的格式
trading_action = self._convert_trading_action(action)
# 从市场信号中获取入场方式和入场价格
best_signal = self._get_best_signal_from_market(market_signal)
entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market'
entry_price = best_signal.get('entry_zone', current_price) if best_signal else current_price
logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}")
# 转换决策为订单格式(价格字段已在 LLM 解析时转换为 float
order_data = {
'symbol': symbol,
'action': trading_action, # 使用转换后的 action
'entry_type': entry_type, # 使用信号中的入场方式
'entry_price': entry_price if entry_type == 'limit' else current_price, # limit单使用entry_zonemarket单使用current_price
'stop_loss': decision.get('stop_loss'),
'take_profit': decision.get('take_profit'),
'confidence': decision.get('confidence', 50),
'signal_grade': 'B',
'position_size': position_size,
'quantity': quantity # 使用 LLM 决策的金额
}
result = self.real_trading.create_order_from_signal(order_data, current_price)
if result.get('success'):
logger.info(f" 💰 已创建实盘订单: {result.get('order_id')} | 持仓价值: ${quantity * 20:.2f}")
await self._notify_real_order_created(symbol, decision, result)
else:
logger.warning(f" ⚠️ 创建实盘订单失败: {result.get('message', 'Unknown error')}")
# 返回结果
return result
except Exception as e:
logger.error(f"执行实盘交易失败: {e}")
async def _execute_close(self, decision: Dict[str, Any], paper_trading: bool = True):
"""执行平仓
Args:
decision: 交易决策
paper_trading: True=模拟交易, False=实盘交易
"""
try:
symbol = decision.get('symbol')
if paper_trading:
# 模拟平仓
if self.paper_trading:
# TODO: 实现模拟平仓逻辑
logger.info(f" 🔒 模拟平仓: {symbol}")
logger.info(f" 理由: {decision.get('reasoning', '')}")
else:
logger.warning(f" 模拟交易服务未初始化")
else:
# 实盘平仓
if self.real_trading and self.real_trading.get_auto_trading_status():
# TODO: 实现实盘平仓逻辑
logger.info(f" 🔒 实盘平仓: {symbol}")
logger.info(f" 理由: {decision.get('reasoning', '')}")
else:
logger.warning(f" 实盘交易服务未启用或自动交易未开启")
except Exception as e:
logger.error(f"执行平仓失败: {e}")
async def _execute_cancel_pending(self, decision: Dict[str, Any], paper_trading: bool = True):
"""执行取消挂单
Args:
decision: 交易决策
paper_trading: True=模拟交易, False=实盘交易
"""
try:
symbol = decision.get('symbol')
orders_to_cancel = decision.get('orders_to_cancel', [])
if not orders_to_cancel:
logger.info(f" ⚠️ 没有需要取消的订单")
return
trading_service = self.paper_trading if paper_trading else self.real_trading
trading_type = "模拟" if paper_trading else "实盘"
if not trading_service:
logger.warning(f" {trading_type}交易服务未初始化")
return
logger.info(f" 🚫 {trading_type}取消挂单: {symbol}")
logger.info(f" 取消订单数量: {len(orders_to_cancel)}")
cancelled_count = 0
for order_id in orders_to_cancel:
try:
# 取消订单
result = trading_service.cancel_order(order_id)
if result.get('success'):
cancelled_count += 1
logger.info(f" ✅ 已取消订单: {order_id}")
else:
logger.warning(f" ⚠️ 取消订单失败: {order_id} | {result.get('message', '')}")
except Exception as e:
logger.error(f" ❌ 取消订单异常: {order_id} | {e}")
logger.info(f" 📊 成功取消 {cancelled_count}/{len(orders_to_cancel)} 个订单")
except Exception as e:
logger.error(f"执行取消挂单失败: {e}")
async def _execute_reduce(self, decision: Dict[str, Any], paper_trading: bool = True):
"""执行减仓
Args:
decision: 交易决策
paper_trading: True=模拟交易, False=实盘交易
"""
try:
symbol = decision.get('symbol')
trading_type = "模拟" if paper_trading else "实盘"
logger.info(f" 📤 {trading_type}减仓: {symbol}")
logger.info(f" 理由: {decision.get('reasoning', '')}")
trading_service = self.paper_trading if paper_trading else self.real_trading
if not trading_service:
logger.warning(f" {trading_type}交易服务未初始化")
return
# TODO: 实现减仓逻辑
# 减仓可以是部分平仓,需要根据决策中的参数执行
logger.info(f" ⚠️ 减仓功能待实现")
except Exception as e:
logger.error(f"执行减仓失败: {e}")
def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""转换 LLM 信号格式为模拟交易格式"""
signal_type = signal.get('type', 'medium_term')
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
# 获取入场类型和入场价
entry_type = signal.get('entry_type', 'market')
entry_price = signal.get('entry_price', current_price)
return {
'symbol': symbol,
'action': signal.get('action', 'hold'),
'entry_type': entry_type, # market 或 limit
'entry_price': entry_price, # 入场价(挂单价格)
'price': current_price, # 当前价格
'stop_loss': signal.get('stop_loss', 0),
'take_profit': signal.get('take_profit', 0),
'confidence': signal.get('confidence', 0),
'signal_grade': signal.get('grade', 'D'),
'signal_type': type_map.get(signal_type, 'swing'),
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
'reasons': [signal.get('reason', '')],
'timestamp': datetime.now()
}
def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
"""计算24小时价格变化"""
if len(h1_data) < 24:
return "N/A"
price_now = h1_data.iloc[-1]['close']
price_24h_ago = h1_data.iloc[-24]['close']
change = ((price_now - price_24h_ago) / price_24h_ago) * 100
if change >= 0:
return f"+{change:.2f}%"
return f"{change:.2f}%"
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
"""验证数据完整性"""
required_intervals = ['5m', '15m', '1h', '4h']
for interval in required_intervals:
if interval not in data or data[interval].empty:
return False
if len(data[interval]) < 20:
return False
return True
def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
"""判断是否应该发送信号"""
action = signal.get('action', 'wait')
if action == 'wait':
return False
confidence = signal.get('confidence', 0)
# 使用配置文件中的阈值
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
if confidence < threshold:
return False
# 检查冷却时间30分钟内不重复发送相同方向的信号
if symbol in self.signal_cooldown:
cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=30)
if datetime.now() < cooldown_end:
if symbol in self.last_signals:
if self.last_signals[symbol].get('action') == action:
logger.debug(f"{symbol} 信号冷却中,跳过")
return False
return True
async def _review_and_adjust_positions(
self,
symbol: str,
data: Dict[str, pd.DataFrame]
):
"""
回顾并调整现有持仓(基于市场分析 + 当前持仓状态)
每次分析后自动回顾该交易对的所有持仓,让决策器决定是否需要:
- 调整止损止盈
- 部分平仓
- 全部平仓
"""
try:
# 获取该交易对的所有活跃持仓(只看已成交的)
active_orders = self.paper_trading.get_active_orders()
positions = [
order for order in active_orders
if order.get('symbol') == symbol
and order.get('status') == 'open'
and order.get('filled_price') # 只处理已成交的订单
]
if not positions:
return # 没有持仓需要回顾
logger.info(f"\n🔄 【持仓回顾中...】共 {len(positions)} 个持仓")
# TODO: 实现持仓回顾功能
# 1. 获取当前市场信号
# 2. 将持仓信息传递给决策器
# 3. 根据决策调整持仓
logger.info(" 持仓回顾功能待实现")
except Exception as e:
logger.error(f"持仓回顾失败: {e}", exc_info=True)
def _convert_to_real_signal(self, symbol: str, signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""转换 LLM 信号格式为实盘交易格式"""
signal_type = signal.get('type', 'medium_term')
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
# 获取入场类型和入场价
entry_type = signal.get('entry_type', 'market')
entry_price = signal.get('entry_price', current_price)
# 映射 action: buy -> long, sell -> short
action = signal.get('action', 'hold')
side_map = {'buy': 'long', 'sell': 'short'}
return {
'symbol': symbol,
'side': side_map.get(action, 'long'),
'entry_type': entry_type,
'entry_price': entry_price,
'stop_loss': signal.get('stop_loss', 0),
'take_profit': signal.get('take_profit', 0),
'confidence': signal.get('confidence', 0),
'grade': signal.get('grade', 'D'),
'signal_type': type_map.get(signal_type, 'swing'),
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
'trend': signal.get('trend')
}
async def _notify_real_order_created(
self,
symbol: str,
signal: Dict[str, Any],
result: Dict[str, Any]
):
"""发送实盘订单创建通知"""
side = signal.get('action', 'buy')
side_text = "做多" if side == 'buy' else "做空"
side_icon = "🟢" if side == 'buy' else "🔴"
grade = signal.get('grade', 'N/A')
position_size = result.get('position_size', 'light')
quantity = result.get('quantity', 0) # 这是保证金金额
position_value = quantity * 20 # 持仓价值 = 保证金 × 杠杆
title = f"💰 实盘订单已创建 - {symbol}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"⭐ **信号等级**: {grade}",
f"📊 **仓位**: {position_size}",
f"💰 **持仓价值**: ${position_value:,.2f}",
f"🆔 **订单ID**: {result.get('order_id', '')[:12]}...",
f"",
f"⚠️ **真实资金交易中**",
]
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "red")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
logger.info(f"已发送实盘订单创建通知: {result.get('order_id')}")
async def _notify_position_adjustment(
self,
symbol: str,
order_id: str,
decision: Dict[str, Any],
result: Dict[str, Any]
):
"""发送持仓调整通知"""
action = decision.get('action')
reason = decision.get('reason', '')
action_map = {
'ADJUST_SL_TP': '🔄 调整止损止盈',
'PARTIAL_CLOSE': '📤 部分平仓',
'FULL_CLOSE': '🚪 全部平仓'
}
action_text = action_map.get(action, action)
title = f"{action_text} - {symbol}"
content_parts = [
f"📊 **订单**: {order_id[:8]}",
f"📝 **原因**: {reason}",
]
if action == 'ADJUST_SL_TP':
changes = result.get('changes', [])
content_parts.append(f"🔄 **调整内容**: {', '.join(changes)}")
elif action in ['PARTIAL_CLOSE', 'FULL_CLOSE']:
pnl_info = result.get('pnl', {})
if pnl_info:
pnl = pnl_info.get('pnl', 0)
pnl_percent = pnl_info.get('pnl_percent', 0)
content_parts.append(f"💰 **实现盈亏**: ${pnl:+.2f} ({pnl_percent:+.1f}%)")
if action == 'PARTIAL_CLOSE':
close_percent = decision.get('close_percent', 0)
remaining = result.get('remaining_quantity', 0)
content_parts.append(f"📊 **平仓比例**: {close_percent:.0f}%")
content_parts.append(f"💵 **剩余仓位**: ${remaining:,.0f}")
content = "\n".join(content_parts)
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "blue")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
async def _notify_signal_not_executed(
self,
market_signal: Dict[str, Any],
decision: Dict[str, Any],
current_price: float,
is_paper: bool = True,
reason: str = ""
):
"""发送有信号但未执行交易的通知"""
try:
symbol = market_signal.get('symbol')
account_type = "📊" if is_paper else "💰"
# 获取最佳信号
best_signal = self._get_best_signal_from_market(market_signal)
if not best_signal:
return
confidence = best_signal.get('confidence', 0)
entry_type = best_signal.get('entry_type', 'market')
entry_zone = best_signal.get('entry_zone', current_price)
# 决策信息
decision_type = decision.get('decision', 'HOLD')
decision_reasoning = decision.get('reasoning', '')
# 如果有外部传入的 reason订单创建失败的具体原因优先使用
if reason:
final_reason = reason
elif decision_reasoning:
final_reason = decision_reasoning
else:
final_reason = "未知原因"
# 方向图标
action = best_signal.get('action', 'wait')
if action == 'buy':
action_icon = '🟢'
action_text = '做多'
elif action == 'sell':
action_icon = '🔴'
action_text = '做空'
else:
action_icon = ''
action_text = '观望'
# 构建标题
title = f"{account_type} {symbol} 信号未执行"
# 构建内容
content_parts = [
f"{action_icon} **信号**: {action_text} | 📈 信心度: **{confidence}%**",
f"",
f"**入场方式**: {entry_type}",
f"**建议入场价**: ${entry_zone:,.2f}" if isinstance(entry_zone, (int, float)) else f"**建议入场价**: {entry_zone}",
f"**当前价格**: ${current_price:,.2f}",
f"",
f"⚠️ **未执行原因**:",
f"{final_reason}",
]
content = "\n".join(content_parts)
# 发送通知
if self.settings.feishu_enabled:
await self.feishu.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
logger.info(f" 📤 已发送信号未执行通知: {decision_type} - {final_reason[:50]}")
except Exception as e:
logger.warning(f"发送信号未执行通知失败: {e}")
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
"""单次分析(用于测试或手动触发)"""
data = self.exchange.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
return {'error': '数据不完整'}
# 使用新架构:市场分析 + 交易决策
market_signal = await self.market_analyzer.analyze(
symbol, data,
symbols=self.symbols
)
positions, account = self._get_trading_state()
decision = await self.decision_maker.make_decision(
market_signal, positions, account
)
return {
'market_signal': market_signal,
'trading_decision': decision
}
def get_status(self) -> Dict[str, Any]:
"""获取智能体状态"""
return {
'running': self.running,
'symbols': self.symbols,
'mode': 'LLM 驱动',
'last_signals': {
symbol: {
'type': sig.get('type'),
'action': sig.get('action'),
'confidence': sig.get('confidence'),
'grade': sig.get('grade')
}
for symbol, sig in self.last_signals.items()
}
}
# 全局单例
_crypto_agent: Optional['CryptoAgent'] = None
def get_crypto_agent() -> 'CryptoAgent':
"""获取加密货币智能体单例"""
# 直接使用类单例,不使用全局变量(避免 reload 时重置)
return CryptoAgent()