872 lines
35 KiB
Python
872 lines
35 KiB
Python
"""
|
||
加密货币交易智能体 - 主控制器(LLM 驱动版)
|
||
"""
|
||
import asyncio
|
||
from typing import Dict, Any, List, Optional
|
||
from datetime import datetime, timedelta
|
||
import pandas as pd
|
||
|
||
from app.utils.logger import logger
|
||
from app.config import get_settings
|
||
from app.services.bitget_service import bitget_service
|
||
from app.services.feishu_service import get_feishu_service
|
||
from app.services.telegram_service import get_telegram_service
|
||
from app.services.paper_trading_service import get_paper_trading_service
|
||
from app.services.signal_database_service import get_signal_db_service
|
||
from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer
|
||
from app.utils.system_status import get_system_monitor, AgentStatus
|
||
|
||
|
||
class CryptoAgent:
|
||
"""加密货币交易信号智能体(LLM 驱动版)"""
|
||
|
||
_instance = None
|
||
_initialized = False
|
||
|
||
def __new__(cls, *args, **kwargs):
|
||
"""单例模式 - 确保只有一个实例"""
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
"""初始化智能体"""
|
||
# 防止重复初始化
|
||
if CryptoAgent._initialized:
|
||
return
|
||
|
||
CryptoAgent._initialized = True
|
||
self.settings = get_settings()
|
||
self.binance = bitget_service # 使用 Bitget 服务
|
||
self.feishu = get_feishu_service()
|
||
self.telegram = get_telegram_service()
|
||
self.llm_analyzer = LLMSignalAnalyzer()
|
||
self.signal_db = get_signal_db_service() # 信号数据库服务
|
||
|
||
# 模拟交易服务(始终启用)
|
||
self.paper_trading = get_paper_trading_service()
|
||
|
||
# 实盘交易服务(如果配置了 API)
|
||
self.real_trading = None
|
||
try:
|
||
from app.services.real_trading_service import get_real_trading_service
|
||
self.real_trading = get_real_trading_service()
|
||
except Exception as e:
|
||
logger.warning(f"实盘交易服务初始化失败: {e}")
|
||
|
||
# 状态管理
|
||
self.last_signals: Dict[str, Dict[str, Any]] = {}
|
||
self.signal_cooldown: Dict[str, datetime] = {}
|
||
|
||
# 配置
|
||
self.symbols = self.settings.crypto_symbols.split(',')
|
||
|
||
# 运行状态
|
||
self.running = False
|
||
self._event_loop = None
|
||
|
||
# 注册到系统监控
|
||
monitor = get_system_monitor()
|
||
self._monitor_info = monitor.register_agent(
|
||
agent_id="crypto_agent",
|
||
name="加密货币智能体",
|
||
agent_type="crypto"
|
||
)
|
||
monitor.update_config("crypto_agent", {
|
||
"symbols": self.symbols,
|
||
"auto_trading_enabled": self.paper_trading_enabled, # 改名为自动交易
|
||
"analysis_interval": "每5分钟整点"
|
||
})
|
||
|
||
logger.info(f"加密货币智能体初始化完成(LLM 驱动),监控交易对: {self.symbols}")
|
||
logger.info(f"模拟交易: 始终启用")
|
||
|
||
if self.real_trading:
|
||
auto_status = "启用" if self.real_trading.get_auto_trading_status() else "禁用"
|
||
logger.info(f"实盘交易: 已配置 (自动交易: {auto_status})")
|
||
else:
|
||
logger.info(f"实盘交易: 未配置")
|
||
|
||
def _on_price_update(self, symbol: str, price: float):
|
||
"""处理实时价格更新(用于模拟交易)"""
|
||
if not self.paper_trading:
|
||
return
|
||
|
||
triggered = self.paper_trading.check_price_triggers(symbol, price)
|
||
|
||
for result in triggered:
|
||
if self._event_loop and self._event_loop.is_running():
|
||
# 根据事件类型选择不同的通知方法
|
||
event_type = result.get('event_type', 'order_closed')
|
||
if event_type == 'order_filled':
|
||
asyncio.run_coroutine_threadsafe(self._notify_order_filled(result), self._event_loop)
|
||
elif event_type == 'breakeven_triggered':
|
||
asyncio.run_coroutine_threadsafe(self._notify_breakeven_triggered(result), self._event_loop)
|
||
else:
|
||
asyncio.run_coroutine_threadsafe(self._notify_order_closed(result), self._event_loop)
|
||
else:
|
||
logger.warning(f"无法发送通知: 事件循环不可用")
|
||
|
||
async def _notify_order_filled(self, result: Dict[str, Any]):
|
||
"""发送挂单成交通知"""
|
||
side_text = "做多" if result.get('side') == 'long' else "做空"
|
||
grade = result.get('signal_grade', 'N/A')
|
||
|
||
message = f"""✅ 挂单成交
|
||
|
||
交易对: {result.get('symbol')}
|
||
方向: {side_text}
|
||
等级: {grade}
|
||
挂单价: ${result.get('entry_price', 0):,.2f}
|
||
成交价: ${result.get('filled_price', 0):,.2f}
|
||
仓位: ${result.get('quantity', 0):,.0f}
|
||
止损: ${result.get('stop_loss', 0):,.2f}
|
||
止盈: ${result.get('take_profit', 0):,.2f}"""
|
||
|
||
await self.feishu.send_text(message)
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送挂单成交通知: {result.get('order_id')}")
|
||
|
||
async def _notify_pending_cancelled(self, result: Dict[str, Any]):
|
||
"""发送挂单撤销通知"""
|
||
side_text = "做多" if result.get('side') == 'long' else "做空"
|
||
new_side_text = "做多" if result.get('new_side') == 'long' else "做空"
|
||
|
||
message = f"""⚠️ 挂单撤销
|
||
|
||
交易对: {result.get('symbol')}
|
||
原方向: {side_text}
|
||
挂单价: ${result.get('entry_price', 0):,.2f}
|
||
原因: 收到反向{new_side_text}信号,自动撤销"""
|
||
|
||
await self.feishu.send_text(message)
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送挂单撤销通知: {result.get('order_id')}")
|
||
|
||
async def _notify_breakeven_triggered(self, result: Dict[str, Any]):
|
||
"""发送移动止损触发通知"""
|
||
side_text = "做多" if result.get('side') == 'long' else "做空"
|
||
|
||
message = f"""📈 移动止损已启动
|
||
|
||
交易对: {result.get('symbol')}
|
||
方向: {side_text}
|
||
开仓价: ${result.get('filled_price', 0):,.2f}
|
||
当前盈利: {result.get('current_pnl_percent', 0):.2f}%
|
||
新止损价: ${result.get('new_stop_loss', 0):,.2f}
|
||
|
||
💰 锁定利润,让利润奔跑"""
|
||
|
||
await self.feishu.send_text(message)
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送移动止损通知: {result.get('order_id')}")
|
||
|
||
async def _notify_order_closed(self, result: Dict[str, Any]):
|
||
"""发送订单平仓通知"""
|
||
status = result.get('status', '')
|
||
is_win = result.get('is_win', False)
|
||
|
||
if status == 'closed_tp':
|
||
emoji = "🎯"
|
||
status_text = "止盈平仓"
|
||
elif status == 'closed_sl':
|
||
emoji = "🛑"
|
||
status_text = "止损平仓"
|
||
elif status == 'closed_be':
|
||
emoji = "📈"
|
||
status_text = "移动止损"
|
||
else:
|
||
emoji = "📤"
|
||
status_text = "手动平仓"
|
||
|
||
win_text = "盈利" if is_win else "亏损"
|
||
side_text = "做多" if result.get('side') == 'long' else "做空"
|
||
|
||
message = f"""{emoji} 订单{status_text}
|
||
|
||
交易对: {result.get('symbol')}
|
||
方向: {side_text}
|
||
入场: ${result.get('entry_price', 0):,.2f}
|
||
出场: ${result.get('exit_price', 0):,.2f}
|
||
{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})
|
||
持仓时间: {result.get('hold_duration', 'N/A')}"""
|
||
|
||
await self.feishu.send_text(message)
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送订单平仓通知: {result.get('order_id')}")
|
||
|
||
def _get_seconds_until_next_5min(self) -> int:
|
||
"""计算距离下一个5分钟整点的秒数"""
|
||
now = datetime.now()
|
||
current_minute = now.minute
|
||
current_second = now.second
|
||
|
||
minutes_past = current_minute % 5
|
||
if minutes_past == 0 and current_second == 0:
|
||
return 0
|
||
minutes_to_wait = 5 - minutes_past if minutes_past > 0 else 5
|
||
seconds_to_wait = minutes_to_wait * 60 - current_second
|
||
|
||
return seconds_to_wait
|
||
|
||
async def run(self):
|
||
"""主运行循环"""
|
||
self.running = True
|
||
self._event_loop = asyncio.get_event_loop()
|
||
|
||
# 更新状态为启动中
|
||
monitor = get_system_monitor()
|
||
monitor.update_status("crypto_agent", AgentStatus.STARTING)
|
||
|
||
# 启动横幅
|
||
logger.info("\n" + "=" * 60)
|
||
logger.info("🚀 加密货币交易信号智能体(LLM 驱动)")
|
||
logger.info("=" * 60)
|
||
logger.info(f" 监控交易对: {', '.join(self.symbols)}")
|
||
logger.info(f" 运行模式: 每5分钟整点执行")
|
||
logger.info(f" 分析引擎: LLM 自主分析")
|
||
if self.paper_trading_enabled:
|
||
logger.info(f" 模拟交易: 已启用")
|
||
logger.info("=" * 60 + "\n")
|
||
|
||
# 更新状态为运行中
|
||
monitor.update_status("crypto_agent", AgentStatus.RUNNING)
|
||
|
||
# 注意:不再启动独立的价格监控
|
||
# 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查
|
||
if self.paper_trading_enabled:
|
||
logger.info(f"模拟交易已启用(由后台统一监控)")
|
||
|
||
# 发送启动通知
|
||
await self.feishu.send_text(
|
||
f"🚀 加密货币智能体已启动(LLM 驱动)\n"
|
||
f"监控交易对: {', '.join(self.symbols)}\n"
|
||
f"运行时间: 每5分钟整点"
|
||
)
|
||
await self.telegram.send_startup_notification(self.symbols)
|
||
|
||
while self.running:
|
||
try:
|
||
wait_seconds = self._get_seconds_until_next_5min()
|
||
if wait_seconds > 0:
|
||
next_run = datetime.now() + timedelta(seconds=wait_seconds)
|
||
logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
|
||
await asyncio.sleep(wait_seconds)
|
||
|
||
run_time = datetime.now()
|
||
logger.info("\n" + "=" * 60)
|
||
logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
|
||
logger.info("=" * 60)
|
||
|
||
for symbol in self.symbols:
|
||
await self.analyze_symbol(symbol)
|
||
|
||
logger.info("\n" + "─" * 60)
|
||
logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
|
||
logger.info("─" * 60 + "\n")
|
||
|
||
await asyncio.sleep(2)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 分析循环出错: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
await asyncio.sleep(10)
|
||
|
||
def stop(self):
|
||
"""停止运行"""
|
||
self.running = False
|
||
|
||
# 更新状态
|
||
monitor = get_system_monitor()
|
||
monitor.update_status("crypto_agent", AgentStatus.STOPPED)
|
||
|
||
logger.info("加密货币智能体已停止")
|
||
|
||
def _check_volatility(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str, float]:
|
||
"""
|
||
检查波动率,判断是否值得进行 LLM 分析(组合方案)
|
||
|
||
使用 1 小时 K 线判断趋势波动,5 分钟 K 线检测突发波动
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
data: 多周期K线数据
|
||
|
||
Returns:
|
||
(should_analyze, reason, volatility_percent)
|
||
should_analyze: 是否应该进行分析
|
||
reason: 原因说明
|
||
volatility_percent: 1小时波动率百分比
|
||
"""
|
||
# 检查是否启用波动率过滤
|
||
if not self.settings.crypto_volatility_filter_enabled:
|
||
return True, "波动率过滤未启用", 0
|
||
|
||
try:
|
||
# 1. 首先检查 1 小时趋势波动率
|
||
df_1h = data.get('1h')
|
||
if df_1h is None or len(df_1h) < 20:
|
||
# 数据不足,保守起见允许分析
|
||
return True, "1小时数据不足,允许分析", 0
|
||
|
||
# 获取最近20根K线
|
||
recent_1h = df_1h.iloc[-20:]
|
||
|
||
# 计算最高价和最低价
|
||
high = recent_1h['high'].max()
|
||
low = recent_1h['low'].min()
|
||
current_price = float(recent_1h.iloc[-1]['close'])
|
||
|
||
# 计算1小时波动率
|
||
if low > 0:
|
||
volatility_1h_percent = ((high - low) / low) * 100
|
||
else:
|
||
volatility_1h_percent = 0
|
||
|
||
# 计算价格变化范围(相对于当前价格)
|
||
price_range_high_percent = ((high - current_price) / current_price) * 100 if current_price > 0 else 0
|
||
price_range_low_percent = ((current_price - low) / current_price) * 100 if current_price > 0 else 0
|
||
|
||
# 从配置读取阈值
|
||
min_volatility = self.settings.crypto_min_volatility_percent
|
||
min_price_range = self.settings.crypto_min_price_range_percent
|
||
|
||
# 如果1小时波动率足够大,直接允许分析
|
||
if volatility_1h_percent >= min_volatility or price_range_high_percent >= min_price_range or price_range_low_percent >= min_price_range:
|
||
return True, f"1小时趋势活跃 (波动率 {volatility_1h_percent:.2f}%),值得分析", volatility_1h_percent
|
||
|
||
# 2. 1小时波动率较低,检查5分钟突发波动
|
||
df_5m = data.get('5m')
|
||
if df_5m is not None and len(df_5m) >= 3:
|
||
# 获取最近3根5分钟K线(15分钟内的变化)
|
||
recent_5m = df_5m.iloc[-3:]
|
||
|
||
# 计算5分钟价格变化幅度
|
||
price_start = float(recent_5m.iloc[0]['close'])
|
||
price_end = float(recent_5m.iloc[-1]['close'])
|
||
|
||
if price_start > 0:
|
||
price_change_5m = abs(price_end - price_start) / price_start * 100
|
||
else:
|
||
price_change_5m = 0
|
||
|
||
# 从配置读取5分钟突发波动阈值
|
||
surge_threshold = self.settings.crypto_5m_surge_threshold
|
||
|
||
logger.debug(f"{symbol} 5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}% (阈值: {surge_threshold}%)")
|
||
|
||
# 如果5分钟突发波动超过阈值,仍然允许分析
|
||
if price_change_5m >= surge_threshold:
|
||
direction = "上涨" if price_end > price_start else "下跌"
|
||
return True, f"5分钟突发{direction} ({price_change_5m:.2f}% > {surge_threshold}%),强制分析", volatility_1h_percent
|
||
|
||
# 3. 波动率过低,跳过分析
|
||
reason = f"波动率过低 (1小时: {volatility_1h_percent:.2f}% < {min_volatility}%, 5分钟无突发波动),跳过分析"
|
||
logger.info(f"⏸️ {symbol} {reason}")
|
||
return False, reason, volatility_1h_percent
|
||
|
||
except Exception as e:
|
||
logger.warning(f"{symbol} 波动率检查失败: {e},允许分析")
|
||
return True, "波动率检查失败,允许分析", 0
|
||
|
||
async def analyze_symbol(self, symbol: str):
|
||
"""
|
||
分析单个交易对(LLM 驱动)
|
||
|
||
Args:
|
||
symbol: 交易对,如 'BTCUSDT'
|
||
"""
|
||
try:
|
||
# 更新活动时间
|
||
monitor = get_system_monitor()
|
||
monitor.update_activity("crypto_agent")
|
||
|
||
logger.info(f"\n{'─' * 50}")
|
||
logger.info(f"📊 {symbol} 分析开始")
|
||
logger.info(f"{'─' * 50}")
|
||
|
||
# 1. 获取多周期数据
|
||
data = self.binance.get_multi_timeframe_data(symbol)
|
||
|
||
if not self._validate_data(data):
|
||
logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
|
||
return
|
||
|
||
# 当前价格
|
||
current_price = float(data['5m'].iloc[-1]['close'])
|
||
price_change_24h = self._calculate_price_change(data['1h'])
|
||
logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")
|
||
|
||
# 1.5. 波动率检查(节省 LLM 调用)
|
||
should_analyze, volatility_reason, volatility = self._check_volatility(symbol, data)
|
||
if not should_analyze:
|
||
logger.info(f"⏸️ {volatility_reason},跳过本次 LLM 分析")
|
||
return
|
||
|
||
# 获取当前持仓信息(供 LLM 仓位决策)
|
||
position_info = self.paper_trading.get_position_info()
|
||
|
||
# 2. LLM 分析(包含新闻舆情和持仓信息)
|
||
logger.info(f"\n🤖 【LLM 分析中...】")
|
||
result = await self.llm_analyzer.analyze(
|
||
symbol, data,
|
||
symbols=self.symbols,
|
||
position_info=position_info
|
||
)
|
||
|
||
# 输出分析摘要
|
||
summary = result.get('analysis_summary', '无')
|
||
logger.info(f" 市场状态: {summary}")
|
||
|
||
# 输出新闻情绪
|
||
news_sentiment = result.get('news_sentiment', '')
|
||
news_impact = result.get('news_impact', '')
|
||
if news_sentiment:
|
||
sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}.get(news_sentiment, '')
|
||
logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
|
||
if news_impact:
|
||
logger.info(f" 消息影响: {news_impact}")
|
||
|
||
# 输出关键价位
|
||
levels = result.get('key_levels', {})
|
||
if levels.get('support') or levels.get('resistance'):
|
||
support_str = ', '.join([f"${s:,.2f}" for s in levels.get('support', [])[:2]])
|
||
resistance_str = ', '.join([f"${r:,.2f}" for r in levels.get('resistance', [])[:2]])
|
||
logger.info(f" 支撑位: {support_str or '-'}")
|
||
logger.info(f" 阻力位: {resistance_str or '-'}")
|
||
|
||
# 2.5. 回顾并调整现有持仓(LLM 主动管理)
|
||
if self.paper_trading_enabled and self.paper_trading:
|
||
await self._review_and_adjust_positions(symbol, data)
|
||
|
||
# 3. 处理信号
|
||
signals = result.get('signals', [])
|
||
|
||
if not signals:
|
||
logger.info(f"\n⏸️ 结论: 无交易信号,继续观望")
|
||
return
|
||
|
||
# 输出所有信号
|
||
logger.info(f"\n🎯 【发现 {len(signals)} 个信号】")
|
||
|
||
for sig in signals:
|
||
signal_type = sig.get('type', 'unknown')
|
||
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
|
||
type_text = type_map.get(signal_type, signal_type)
|
||
|
||
action = sig.get('action', 'wait')
|
||
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
|
||
action_text = action_map.get(action, action)
|
||
|
||
grade = sig.get('grade', 'D')
|
||
confidence = sig.get('confidence', 0)
|
||
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '')
|
||
|
||
logger.info(f"\n [{type_text}] {action_text}")
|
||
logger.info(f" 等级: {grade} {grade_icon} | 置信度: {confidence}%")
|
||
logger.info(f" 入场: ${sig.get('entry_price', 0):,.2f} | "
|
||
f"止损: ${sig.get('stop_loss', 0):,.2f} | "
|
||
f"止盈: ${sig.get('take_profit', 0):,.2f}")
|
||
logger.info(f" 原因: {sig.get('reason', '无')[:100]}")
|
||
|
||
if sig.get('risk_warning'):
|
||
logger.info(f" 风险: {sig.get('risk_warning')}")
|
||
|
||
# 4. 选择最佳信号发送通知
|
||
best_signal = self.llm_analyzer.get_best_signal(result)
|
||
|
||
if best_signal and self._should_send_signal(symbol, best_signal):
|
||
logger.info(f"\n📤 【发送通知】")
|
||
|
||
# 构建通知消息
|
||
telegram_message = self.llm_analyzer.format_signal_message(best_signal, symbol)
|
||
feishu_card = self.llm_analyzer.format_feishu_card(best_signal, symbol)
|
||
|
||
# 发送通知 - 飞书使用卡片格式,Telegram 使用文本格式
|
||
await self.feishu.send_card(
|
||
feishu_card['title'],
|
||
feishu_card['content'],
|
||
feishu_card['color']
|
||
)
|
||
await self.telegram.send_message(telegram_message)
|
||
|
||
logger.info(f" ✅ 已发送信号通知")
|
||
|
||
# 保存信号到数据库
|
||
signal_to_save = best_signal.copy()
|
||
signal_to_save['signal_type'] = 'crypto'
|
||
signal_to_save['symbol'] = symbol
|
||
signal_to_save['current_price'] = current_price
|
||
self.signal_db.add_signal(signal_to_save)
|
||
|
||
# 更新状态
|
||
self.last_signals[symbol] = best_signal
|
||
self.signal_cooldown[symbol] = datetime.now()
|
||
|
||
# 5. 创建模拟订单(始终执行)
|
||
if self.paper_trading:
|
||
grade = best_signal.get('grade', 'D')
|
||
position_size = best_signal.get('position_size', 'light')
|
||
if grade != 'D':
|
||
# 转换信号格式以兼容 paper_trading
|
||
paper_signal = self._convert_to_paper_signal(symbol, best_signal, current_price)
|
||
result = self.paper_trading.create_order_from_signal(paper_signal, current_price)
|
||
|
||
# 发送被取消挂单的通知
|
||
cancelled_orders = result.get('cancelled_orders', [])
|
||
for cancelled in cancelled_orders:
|
||
await self._notify_pending_cancelled(cancelled)
|
||
|
||
# 记录新订单
|
||
order = result.get('order')
|
||
if order:
|
||
logger.info(f" 📝 已创建模拟订单: {order.order_id} | 仓位: {position_size}")
|
||
|
||
# 6. 创建实盘订单(如果启用了自动交易)
|
||
if self.real_trading and self.real_trading.get_auto_trading_status():
|
||
grade = best_signal.get('grade', 'D')
|
||
position_size = best_signal.get('position_size', 'light')
|
||
if grade != 'D':
|
||
# 转换信号格式以兼容 real_trading
|
||
real_signal = self._convert_to_real_signal(symbol, best_signal, current_price)
|
||
result = self.real_trading.create_order_from_signal(real_signal, current_price)
|
||
|
||
if result.get('success'):
|
||
logger.info(f" 💰 已创建实盘订单: {result.get('order_id')} | 仓位: {position_size} | 数量: ${result.get('quantity', 0):.2f}")
|
||
|
||
# 发送实盘订单成交通知
|
||
await self._notify_real_order_created(symbol, best_signal, result)
|
||
elif not result.get('skipped'):
|
||
# 只有非跳过的情况才记录错误
|
||
logger.warning(f" ⚠️ 实盘订单创建失败: {result.get('message')}")
|
||
else:
|
||
if best_signal:
|
||
logger.info(f"\n⏸️ 信号冷却中或置信度不足,不发送通知")
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 分析 {symbol} 出错: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
|
||
def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
|
||
current_price: float) -> Dict[str, Any]:
|
||
"""转换 LLM 信号格式为模拟交易格式"""
|
||
signal_type = signal.get('type', 'medium_term')
|
||
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
|
||
|
||
# 获取入场类型和入场价
|
||
entry_type = signal.get('entry_type', 'market')
|
||
entry_price = signal.get('entry_price', current_price)
|
||
|
||
return {
|
||
'symbol': symbol,
|
||
'action': signal.get('action', 'hold'),
|
||
'entry_type': entry_type, # market 或 limit
|
||
'entry_price': entry_price, # 入场价(挂单价格)
|
||
'price': current_price, # 当前价格
|
||
'stop_loss': signal.get('stop_loss', 0),
|
||
'take_profit': signal.get('take_profit', 0),
|
||
'confidence': signal.get('confidence', 0),
|
||
'signal_grade': signal.get('grade', 'D'),
|
||
'signal_type': type_map.get(signal_type, 'swing'),
|
||
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
|
||
'reasons': [signal.get('reason', '')],
|
||
'timestamp': datetime.now()
|
||
}
|
||
|
||
def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
|
||
"""计算24小时价格变化"""
|
||
if len(h1_data) < 24:
|
||
return "N/A"
|
||
price_now = h1_data.iloc[-1]['close']
|
||
price_24h_ago = h1_data.iloc[-24]['close']
|
||
change = ((price_now - price_24h_ago) / price_24h_ago) * 100
|
||
if change >= 0:
|
||
return f"+{change:.2f}%"
|
||
return f"{change:.2f}%"
|
||
|
||
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
|
||
"""验证数据完整性"""
|
||
required_intervals = ['5m', '15m', '1h', '4h']
|
||
for interval in required_intervals:
|
||
if interval not in data or data[interval].empty:
|
||
return False
|
||
if len(data[interval]) < 20:
|
||
return False
|
||
return True
|
||
|
||
def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
|
||
"""判断是否应该发送信号"""
|
||
action = signal.get('action', 'wait')
|
||
if action == 'wait':
|
||
return False
|
||
|
||
confidence = signal.get('confidence', 0)
|
||
# 使用配置文件中的阈值
|
||
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
|
||
if confidence < threshold:
|
||
return False
|
||
|
||
# 检查冷却时间(30分钟内不重复发送相同方向的信号)
|
||
if symbol in self.signal_cooldown:
|
||
cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=30)
|
||
if datetime.now() < cooldown_end:
|
||
if symbol in self.last_signals:
|
||
if self.last_signals[symbol].get('action') == action:
|
||
logger.debug(f"{symbol} 信号冷却中,跳过")
|
||
return False
|
||
|
||
return True
|
||
|
||
async def _review_and_adjust_positions(
|
||
self,
|
||
symbol: str,
|
||
data: Dict[str, pd.DataFrame]
|
||
):
|
||
"""
|
||
回顾并调整现有持仓(LLM 主动管理)
|
||
|
||
每次分析后自动回顾该交易对的所有持仓,让 LLM 决定是否需要:
|
||
- 调整止损止盈
|
||
- 部分平仓
|
||
- 全部平仓
|
||
"""
|
||
try:
|
||
# 获取该交易对的所有活跃持仓(只看已成交的)
|
||
active_orders = self.paper_trading.get_active_orders()
|
||
positions = [
|
||
order for order in active_orders
|
||
if order.get('symbol') == symbol
|
||
and order.get('status') == 'open'
|
||
and order.get('filled_price') # 只处理已成交的订单
|
||
]
|
||
|
||
if not positions:
|
||
return # 没有持仓需要回顾
|
||
|
||
logger.info(f"\n🔄 【LLM 回顾持仓中...】共 {len(positions)} 个持仓")
|
||
|
||
# 准备持仓数据
|
||
position_data = []
|
||
for order in positions:
|
||
entry_price = order.get('filled_price') or order.get('entry_price', 0)
|
||
current_price = self.binance.get_ticker(symbol).get('lastPrice', entry_price)
|
||
if isinstance(current_price, str):
|
||
current_price = float(current_price)
|
||
|
||
# 计算盈亏百分比
|
||
side = order.get('side')
|
||
if side == 'long':
|
||
pnl_percent = ((current_price - entry_price) / entry_price) * 100
|
||
else:
|
||
pnl_percent = ((entry_price - current_price) / entry_price) * 100
|
||
|
||
position_data.append({
|
||
'order_id': order.get('order_id'),
|
||
'side': side,
|
||
'entry_price': entry_price,
|
||
'current_price': current_price,
|
||
'stop_loss': order.get('stop_loss', 0),
|
||
'take_profit': order.get('take_profit', 0),
|
||
'quantity': order.get('quantity', 0),
|
||
'pnl_percent': pnl_percent,
|
||
'open_time': order.get('open_time')
|
||
})
|
||
|
||
# 调用 LLM 回顾分析
|
||
decisions = await self.llm_analyzer.review_positions(symbol, position_data, data)
|
||
|
||
if not decisions:
|
||
logger.info(" LLM 建议保持所有持仓不变")
|
||
return
|
||
|
||
# 执行 LLM 的调整建议
|
||
logger.info(f"\n📝 【LLM 调整建议】共 {len(decisions)} 个")
|
||
|
||
for decision in decisions:
|
||
order_id = decision.get('order_id')
|
||
action = decision.get('action')
|
||
reason = decision.get('reason', '')
|
||
|
||
action_map = {
|
||
'HOLD': '保持',
|
||
'ADJUST_SL_TP': '调整止损止盈',
|
||
'PARTIAL_CLOSE': '部分平仓',
|
||
'FULL_CLOSE': '全部平仓'
|
||
}
|
||
action_text = action_map.get(action, action)
|
||
|
||
logger.info(f" 订单 {order_id[:8]}: {action_text} - {reason}")
|
||
|
||
# 执行调整
|
||
result = self.paper_trading.adjust_position_by_llm(
|
||
order_id=order_id,
|
||
action=action,
|
||
new_sl=decision.get('new_sl'),
|
||
new_tp=decision.get('new_tp'),
|
||
close_percent=decision.get('close_percent')
|
||
)
|
||
|
||
if result.get('success'):
|
||
# 发送通知
|
||
await self._notify_position_adjustment(symbol, order_id, decision, result)
|
||
|
||
# 如果是平仓操作,从活跃订单中移除
|
||
if action in ['PARTIAL_CLOSE', 'FULL_CLOSE']:
|
||
closed_result = result.get('pnl', {})
|
||
if closed_result:
|
||
pnl = closed_result.get('pnl', 0)
|
||
pnl_percent = closed_result.get('pnl_percent', 0)
|
||
logger.info(f" ✅ 已平仓: PnL ${pnl:+.2f} ({pnl_percent:+.1f}%)")
|
||
else:
|
||
logger.warning(f" ❌ 执行失败: {result.get('error')}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"持仓回顾失败: {e}", exc_info=True)
|
||
|
||
def _convert_to_real_signal(self, symbol: str, signal: Dict[str, Any],
|
||
current_price: float) -> Dict[str, Any]:
|
||
"""转换 LLM 信号格式为实盘交易格式"""
|
||
signal_type = signal.get('type', 'medium_term')
|
||
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
|
||
|
||
# 获取入场类型和入场价
|
||
entry_type = signal.get('entry_type', 'market')
|
||
entry_price = signal.get('entry_price', current_price)
|
||
|
||
# 映射 action: buy -> long, sell -> short
|
||
action = signal.get('action', 'hold')
|
||
side_map = {'buy': 'long', 'sell': 'short'}
|
||
|
||
return {
|
||
'symbol': symbol,
|
||
'side': side_map.get(action, 'long'),
|
||
'entry_type': entry_type,
|
||
'entry_price': entry_price,
|
||
'stop_loss': signal.get('stop_loss', 0),
|
||
'take_profit': signal.get('take_profit', 0),
|
||
'confidence': signal.get('confidence', 0),
|
||
'grade': signal.get('grade', 'D'),
|
||
'signal_type': type_map.get(signal_type, 'swing'),
|
||
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
|
||
'trend': signal.get('trend')
|
||
}
|
||
|
||
async def _notify_real_order_created(
|
||
self,
|
||
symbol: str,
|
||
signal: Dict[str, Any],
|
||
result: Dict[str, Any]
|
||
):
|
||
"""发送实盘订单创建通知"""
|
||
side = signal.get('action', 'buy')
|
||
side_text = "做多" if side == 'buy' else "做空"
|
||
grade = signal.get('grade', 'N/A')
|
||
position_size = result.get('position_size', 'light')
|
||
quantity = result.get('quantity', 0)
|
||
|
||
message = f"""💰 实盘订单已创建
|
||
|
||
交易对: {symbol}
|
||
方向: {side_text}
|
||
等级: {grade}
|
||
仓位: {position_size}
|
||
数量: ${quantity:.2f}
|
||
订单 ID: {result.get('order_id', '')[:12]}...
|
||
|
||
⚠️ 真实资金交易中"""
|
||
|
||
await self.feishu.send_text(message)
|
||
await self.telegram.send_message(message)
|
||
logger.info(f"已发送实盘订单创建通知: {result.get('order_id')}")
|
||
|
||
async def _notify_position_adjustment(
|
||
self,
|
||
symbol: str,
|
||
order_id: str,
|
||
decision: Dict[str, Any],
|
||
result: Dict[str, Any]
|
||
):
|
||
"""发送持仓调整通知"""
|
||
action = decision.get('action')
|
||
reason = decision.get('reason', '')
|
||
|
||
action_map = {
|
||
'ADJUST_SL_TP': '🔄 调整止损止盈',
|
||
'PARTIAL_CLOSE': '📤 部分平仓',
|
||
'FULL_CLOSE': '🚪 全部平仓'
|
||
}
|
||
action_text = action_map.get(action, action)
|
||
|
||
message = f"""{action_text}
|
||
|
||
交易对: {symbol}
|
||
订单: {order_id[:8]}
|
||
原因: {reason}"""
|
||
|
||
if action == 'ADJUST_SL_TP':
|
||
changes = result.get('changes', [])
|
||
message += f"\n调整内容: {', '.join(changes)}"
|
||
|
||
elif action in ['PARTIAL_CLOSE', 'FULL_CLOSE']:
|
||
pnl_info = result.get('pnl', {})
|
||
if pnl_info:
|
||
pnl = pnl_info.get('pnl', 0)
|
||
pnl_percent = pnl_info.get('pnl_percent', 0)
|
||
message += f"\n实现盈亏: ${pnl:+.2f} ({pnl_percent:+.1f}%)"
|
||
|
||
if action == 'PARTIAL_CLOSE':
|
||
close_percent = decision.get('close_percent', 0)
|
||
remaining = result.get('remaining_quantity', 0)
|
||
message += f"\n平仓比例: {close_percent:.0f}%"
|
||
message += f"\n剩余仓位: ${remaining:,.0f}"
|
||
|
||
await self.feishu.send_text(message)
|
||
await self.telegram.send_message(message)
|
||
|
||
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
|
||
"""单次分析(用于测试或手动触发)"""
|
||
data = self.binance.get_multi_timeframe_data(symbol)
|
||
|
||
if not self._validate_data(data):
|
||
return {'error': '数据不完整'}
|
||
|
||
# 获取持仓信息
|
||
position_info = self.paper_trading.get_position_info()
|
||
|
||
result = await self.llm_analyzer.analyze(
|
||
symbol, data,
|
||
symbols=self.symbols,
|
||
position_info=position_info
|
||
)
|
||
return result
|
||
|
||
def get_status(self) -> Dict[str, Any]:
|
||
"""获取智能体状态"""
|
||
return {
|
||
'running': self.running,
|
||
'symbols': self.symbols,
|
||
'mode': 'LLM 驱动',
|
||
'last_signals': {
|
||
symbol: {
|
||
'type': sig.get('type'),
|
||
'action': sig.get('action'),
|
||
'confidence': sig.get('confidence'),
|
||
'grade': sig.get('grade')
|
||
}
|
||
for symbol, sig in self.last_signals.items()
|
||
}
|
||
}
|
||
|
||
|
||
# 全局单例
|
||
_crypto_agent: Optional['CryptoAgent'] = None
|
||
|
||
|
||
def get_crypto_agent() -> 'CryptoAgent':
|
||
"""获取加密货币智能体单例"""
|
||
# 直接使用类单例,不使用全局变量(避免 reload 时重置)
|
||
return CryptoAgent()
|
||
|