stock-ai-agent/backend/app/crypto_agent/crypto_agent.py
2026-02-22 11:33:53 +08:00

759 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
加密货币交易智能体 - 主控制器LLM 驱动版)
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
from app.utils.logger import logger
from app.config import get_settings
from app.services.binance_service import binance_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
from app.services.paper_trading_service import get_paper_trading_service
from app.services.signal_database_service import get_signal_db_service
from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer
class CryptoAgent:
"""加密货币交易信号智能体LLM 驱动版)"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
"""单例模式 - 确保只有一个实例"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""初始化智能体"""
# 防止重复初始化
if CryptoAgent._initialized:
return
CryptoAgent._initialized = True
self.settings = get_settings()
self.binance = binance_service
self.feishu = get_feishu_service()
self.telegram = get_telegram_service()
self.llm_analyzer = LLMSignalAnalyzer()
self.signal_db = get_signal_db_service() # 信号数据库服务
# 模拟交易服务
self.paper_trading_enabled = self.settings.paper_trading_enabled
if self.paper_trading_enabled:
self.paper_trading = get_paper_trading_service()
else:
self.paper_trading = None
# 状态管理
self.last_signals: Dict[str, Dict[str, Any]] = {}
self.signal_cooldown: Dict[str, datetime] = {}
# 配置
self.symbols = self.settings.crypto_symbols.split(',')
# 运行状态
self.running = False
self._event_loop = None
logger.info(f"加密货币智能体初始化完成LLM 驱动),监控交易对: {self.symbols}")
if self.paper_trading_enabled:
logger.info(f"模拟交易已启用")
def _on_price_update(self, symbol: str, price: float):
"""处理实时价格更新(用于模拟交易)"""
if not self.paper_trading:
return
triggered = self.paper_trading.check_price_triggers(symbol, price)
for result in triggered:
if self._event_loop and self._event_loop.is_running():
# 根据事件类型选择不同的通知方法
event_type = result.get('event_type', 'order_closed')
if event_type == 'order_filled':
asyncio.run_coroutine_threadsafe(self._notify_order_filled(result), self._event_loop)
elif event_type == 'breakeven_triggered':
asyncio.run_coroutine_threadsafe(self._notify_breakeven_triggered(result), self._event_loop)
else:
asyncio.run_coroutine_threadsafe(self._notify_order_closed(result), self._event_loop)
else:
logger.warning(f"无法发送通知: 事件循环不可用")
async def _notify_order_filled(self, result: Dict[str, Any]):
"""发送挂单成交通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
grade = result.get('signal_grade', 'N/A')
message = f"""✅ 挂单成交
交易对: {result.get('symbol')}
方向: {side_text}
等级: {grade}
挂单价: ${result.get('entry_price', 0):,.2f}
成交价: ${result.get('filled_price', 0):,.2f}
仓位: ${result.get('quantity', 0):,.0f}
止损: ${result.get('stop_loss', 0):,.2f}
止盈: ${result.get('take_profit', 0):,.2f}"""
await self.feishu.send_text(message)
await self.telegram.send_message(message)
logger.info(f"已发送挂单成交通知: {result.get('order_id')}")
async def _notify_pending_cancelled(self, result: Dict[str, Any]):
"""发送挂单撤销通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
new_side_text = "做多" if result.get('new_side') == 'long' else "做空"
message = f"""⚠️ 挂单撤销
交易对: {result.get('symbol')}
原方向: {side_text}
挂单价: ${result.get('entry_price', 0):,.2f}
原因: 收到反向{new_side_text}信号,自动撤销"""
await self.feishu.send_text(message)
await self.telegram.send_message(message)
logger.info(f"已发送挂单撤销通知: {result.get('order_id')}")
async def _notify_breakeven_triggered(self, result: Dict[str, Any]):
"""发送保本止损触发通知"""
side_text = "做多" if result.get('side') == 'long' else "做空"
message = f"""🛡️ 保本止损已启动
交易对: {result.get('symbol')}
方向: {side_text}
开仓价: ${result.get('filled_price', 0):,.2f}
当前盈利: {result.get('current_pnl_percent', 0):.2f}%
止损已移至: ${result.get('new_stop_loss', 0):,.2f}
✅ 本单已锁定保本,不会亏损"""
await self.feishu.send_text(message)
await self.telegram.send_message(message)
logger.info(f"已发送保本止损通知: {result.get('order_id')}")
async def _notify_order_closed(self, result: Dict[str, Any]):
"""发送订单平仓通知"""
status = result.get('status', '')
is_win = result.get('is_win', False)
if status == 'closed_tp':
emoji = "🎯"
status_text = "止盈平仓"
elif status == 'closed_sl':
emoji = "🛑"
status_text = "止损平仓"
elif status == 'closed_be':
emoji = "🔒"
status_text = "保本止损"
else:
emoji = "📤"
status_text = "手动平仓"
win_text = "盈利" if is_win else "亏损"
side_text = "做多" if result.get('side') == 'long' else "做空"
message = f"""{emoji} 订单{status_text}
交易对: {result.get('symbol')}
方向: {side_text}
入场: ${result.get('entry_price', 0):,.2f}
出场: ${result.get('exit_price', 0):,.2f}
{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})
持仓时间: {result.get('hold_duration', 'N/A')}"""
await self.feishu.send_text(message)
await self.telegram.send_message(message)
logger.info(f"已发送订单平仓通知: {result.get('order_id')}")
def _get_seconds_until_next_5min(self) -> int:
"""计算距离下一个5分钟整点的秒数"""
now = datetime.now()
current_minute = now.minute
current_second = now.second
minutes_past = current_minute % 5
if minutes_past == 0 and current_second == 0:
return 0
minutes_to_wait = 5 - minutes_past if minutes_past > 0 else 5
seconds_to_wait = minutes_to_wait * 60 - current_second
return seconds_to_wait
async def run(self):
"""主运行循环"""
self.running = True
self._event_loop = asyncio.get_event_loop()
# 启动横幅
logger.info("\n" + "=" * 60)
logger.info("🚀 加密货币交易信号智能体LLM 驱动)")
logger.info("=" * 60)
logger.info(f" 监控交易对: {', '.join(self.symbols)}")
logger.info(f" 运行模式: 每5分钟整点执行")
logger.info(f" 分析引擎: LLM 自主分析")
if self.paper_trading_enabled:
logger.info(f" 模拟交易: 已启用")
logger.info("=" * 60 + "\n")
# 注意:不再启动独立的价格监控
# 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查
if self.paper_trading_enabled:
logger.info(f"模拟交易已启用(由后台统一监控)")
# 发送启动通知
await self.feishu.send_text(
f"🚀 加密货币智能体已启动LLM 驱动)\n"
f"监控交易对: {', '.join(self.symbols)}\n"
f"运行时间: 每5分钟整点"
)
await self.telegram.send_startup_notification(self.symbols)
while self.running:
try:
wait_seconds = self._get_seconds_until_next_5min()
if wait_seconds > 0:
next_run = datetime.now() + timedelta(seconds=wait_seconds)
logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
await asyncio.sleep(wait_seconds)
run_time = datetime.now()
logger.info("\n" + "=" * 60)
logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
logger.info("=" * 60)
for symbol in self.symbols:
await self.analyze_symbol(symbol)
logger.info("\n" + "" * 60)
logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
logger.info("" * 60 + "\n")
await asyncio.sleep(2)
except Exception as e:
logger.error(f"❌ 分析循环出错: {e}")
import traceback
logger.error(traceback.format_exc())
await asyncio.sleep(10)
def stop(self):
"""停止运行"""
self.running = False
logger.info("加密货币智能体已停止")
def _check_volatility(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str, float]:
"""
检查波动率,判断是否值得进行 LLM 分析(组合方案)
使用 1 小时 K 线判断趋势波动5 分钟 K 线检测突发波动
Args:
symbol: 交易对
data: 多周期K线数据
Returns:
(should_analyze, reason, volatility_percent)
should_analyze: 是否应该进行分析
reason: 原因说明
volatility_percent: 1小时波动率百分比
"""
# 检查是否启用波动率过滤
if not self.settings.crypto_volatility_filter_enabled:
return True, "波动率过滤未启用", 0
try:
# 1. 首先检查 1 小时趋势波动率
df_1h = data.get('1h')
if df_1h is None or len(df_1h) < 20:
# 数据不足,保守起见允许分析
return True, "1小时数据不足允许分析", 0
# 获取最近20根K线
recent_1h = df_1h.iloc[-20:]
# 计算最高价和最低价
high = recent_1h['high'].max()
low = recent_1h['low'].min()
current_price = float(recent_1h.iloc[-1]['close'])
# 计算1小时波动率
if low > 0:
volatility_1h_percent = ((high - low) / low) * 100
else:
volatility_1h_percent = 0
# 计算价格变化范围(相对于当前价格)
price_range_high_percent = ((high - current_price) / current_price) * 100 if current_price > 0 else 0
price_range_low_percent = ((current_price - low) / current_price) * 100 if current_price > 0 else 0
# 从配置读取阈值
min_volatility = self.settings.crypto_min_volatility_percent
min_price_range = self.settings.crypto_min_price_range_percent
# 如果1小时波动率足够大直接允许分析
if volatility_1h_percent >= min_volatility or price_range_high_percent >= min_price_range or price_range_low_percent >= min_price_range:
return True, f"1小时趋势活跃 (波动率 {volatility_1h_percent:.2f}%),值得分析", volatility_1h_percent
# 2. 1小时波动率较低检查5分钟突发波动
df_5m = data.get('5m')
if df_5m is not None and len(df_5m) >= 3:
# 获取最近3根5分钟K线15分钟内的变化
recent_5m = df_5m.iloc[-3:]
# 计算5分钟价格变化幅度
price_start = float(recent_5m.iloc[0]['close'])
price_end = float(recent_5m.iloc[-1]['close'])
if price_start > 0:
price_change_5m = abs(price_end - price_start) / price_start * 100
else:
price_change_5m = 0
# 从配置读取5分钟突发波动阈值
surge_threshold = self.settings.crypto_5m_surge_threshold
logger.debug(f"{symbol} 5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}% (阈值: {surge_threshold}%)")
# 如果5分钟突发波动超过阈值仍然允许分析
if price_change_5m >= surge_threshold:
direction = "上涨" if price_end > price_start else "下跌"
return True, f"5分钟突发{direction} ({price_change_5m:.2f}% > {surge_threshold}%),强制分析", volatility_1h_percent
# 3. 波动率过低,跳过分析
reason = f"波动率过低 (1小时: {volatility_1h_percent:.2f}% < {min_volatility}%, 5分钟无突发波动),跳过分析"
logger.info(f"⏸️ {symbol} {reason}")
return False, reason, volatility_1h_percent
except Exception as e:
logger.warning(f"{symbol} 波动率检查失败: {e},允许分析")
return True, "波动率检查失败,允许分析", 0
async def analyze_symbol(self, symbol: str):
"""
分析单个交易对LLM 驱动)
Args:
symbol: 交易对,如 'BTCUSDT'
"""
try:
logger.info(f"\n{'' * 50}")
logger.info(f"📊 {symbol} 分析开始")
logger.info(f"{'' * 50}")
# 1. 获取多周期数据
data = self.binance.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
return
# 当前价格
current_price = float(data['5m'].iloc[-1]['close'])
price_change_24h = self._calculate_price_change(data['1h'])
logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")
# 1.5. 波动率检查(节省 LLM 调用)
should_analyze, volatility_reason, volatility = self._check_volatility(symbol, data)
if not should_analyze:
logger.info(f"⏸️ {volatility_reason},跳过本次 LLM 分析")
return
# 获取当前持仓信息(供 LLM 仓位决策)
position_info = self.paper_trading.get_position_info()
# 2. LLM 分析(包含新闻舆情和持仓信息)
logger.info(f"\n🤖 【LLM 分析中...】")
result = await self.llm_analyzer.analyze(
symbol, data,
symbols=self.symbols,
position_info=position_info
)
# 输出分析摘要
summary = result.get('analysis_summary', '')
logger.info(f" 市场状态: {summary}")
# 输出新闻情绪
news_sentiment = result.get('news_sentiment', '')
news_impact = result.get('news_impact', '')
if news_sentiment:
sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': ''}.get(news_sentiment, '')
logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
if news_impact:
logger.info(f" 消息影响: {news_impact}")
# 输出关键价位
levels = result.get('key_levels', {})
if levels.get('support') or levels.get('resistance'):
support_str = ', '.join([f"${s:,.2f}" for s in levels.get('support', [])[:2]])
resistance_str = ', '.join([f"${r:,.2f}" for r in levels.get('resistance', [])[:2]])
logger.info(f" 支撑位: {support_str or '-'}")
logger.info(f" 阻力位: {resistance_str or '-'}")
# 2.5. 回顾并调整现有持仓LLM 主动管理)
if self.paper_trading_enabled and self.paper_trading:
await self._review_and_adjust_positions(symbol, data)
# 3. 处理信号
signals = result.get('signals', [])
if not signals:
logger.info(f"\n⏸️ 结论: 无交易信号,继续观望")
return
# 输出所有信号
logger.info(f"\n🎯 【发现 {len(signals)} 个信号】")
for sig in signals:
signal_type = sig.get('type', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
action = sig.get('action', 'wait')
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
action_text = action_map.get(action, action)
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
logger.info(f"\n [{type_text}] {action_text}")
logger.info(f" 等级: {grade} {grade_icon} | 置信度: {confidence}%")
logger.info(f" 入场: ${sig.get('entry_price', 0):,.2f} | "
f"止损: ${sig.get('stop_loss', 0):,.2f} | "
f"止盈: ${sig.get('take_profit', 0):,.2f}")
logger.info(f" 原因: {sig.get('reason', '')[:100]}")
if sig.get('risk_warning'):
logger.info(f" 风险: {sig.get('risk_warning')}")
# 4. 选择最佳信号发送通知
best_signal = self.llm_analyzer.get_best_signal(result)
if best_signal and self._should_send_signal(symbol, best_signal):
logger.info(f"\n📤 【发送通知】")
# 构建通知消息
telegram_message = self.llm_analyzer.format_signal_message(best_signal, symbol)
feishu_card = self.llm_analyzer.format_feishu_card(best_signal, symbol)
# 发送通知 - 飞书使用卡片格式Telegram 使用文本格式
await self.feishu.send_card(
feishu_card['title'],
feishu_card['content'],
feishu_card['color']
)
await self.telegram.send_message(telegram_message)
logger.info(f" ✅ 已发送信号通知")
# 保存信号到数据库
signal_to_save = best_signal.copy()
signal_to_save['signal_type'] = 'crypto'
signal_to_save['symbol'] = symbol
signal_to_save['current_price'] = current_price
self.signal_db.add_signal(signal_to_save)
# 更新状态
self.last_signals[symbol] = best_signal
self.signal_cooldown[symbol] = datetime.now()
# 5. 创建模拟订单
if self.paper_trading_enabled and self.paper_trading:
grade = best_signal.get('grade', 'D')
position_size = best_signal.get('position_size', 'light')
if grade != 'D':
# 转换信号格式以兼容 paper_trading
paper_signal = self._convert_to_paper_signal(symbol, best_signal, current_price)
result = self.paper_trading.create_order_from_signal(paper_signal, current_price)
# 发送被取消挂单的通知
cancelled_orders = result.get('cancelled_orders', [])
for cancelled in cancelled_orders:
await self._notify_pending_cancelled(cancelled)
# 记录新订单
order = result.get('order')
if order:
logger.info(f" 📝 已创建模拟订单: {order.order_id} | 仓位: {position_size}")
else:
if best_signal:
logger.info(f"\n⏸️ 信号冷却中或置信度不足,不发送通知")
except Exception as e:
logger.error(f"❌ 分析 {symbol} 出错: {e}")
import traceback
logger.error(traceback.format_exc())
def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
current_price: float) -> Dict[str, Any]:
"""转换 LLM 信号格式为模拟交易格式"""
signal_type = signal.get('type', 'medium_term')
type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
# 获取入场类型和入场价
entry_type = signal.get('entry_type', 'market')
entry_price = signal.get('entry_price', current_price)
return {
'symbol': symbol,
'action': signal.get('action', 'hold'),
'entry_type': entry_type, # market 或 limit
'entry_price': entry_price, # 入场价(挂单价格)
'price': current_price, # 当前价格
'stop_loss': signal.get('stop_loss', 0),
'take_profit': signal.get('take_profit', 0),
'confidence': signal.get('confidence', 0),
'signal_grade': signal.get('grade', 'D'),
'signal_type': type_map.get(signal_type, 'swing'),
'position_size': signal.get('position_size', 'light'), # LLM 建议的仓位大小
'reasons': [signal.get('reason', '')],
'timestamp': datetime.now()
}
def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
"""计算24小时价格变化"""
if len(h1_data) < 24:
return "N/A"
price_now = h1_data.iloc[-1]['close']
price_24h_ago = h1_data.iloc[-24]['close']
change = ((price_now - price_24h_ago) / price_24h_ago) * 100
if change >= 0:
return f"+{change:.2f}%"
return f"{change:.2f}%"
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
"""验证数据完整性"""
required_intervals = ['5m', '15m', '1h', '4h']
for interval in required_intervals:
if interval not in data or data[interval].empty:
return False
if len(data[interval]) < 20:
return False
return True
def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
"""判断是否应该发送信号"""
action = signal.get('action', 'wait')
if action == 'wait':
return False
confidence = signal.get('confidence', 0)
# 使用配置文件中的阈值
threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比
if confidence < threshold:
return False
# 检查冷却时间30分钟内不重复发送相同方向的信号
if symbol in self.signal_cooldown:
cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=30)
if datetime.now() < cooldown_end:
if symbol in self.last_signals:
if self.last_signals[symbol].get('action') == action:
logger.debug(f"{symbol} 信号冷却中,跳过")
return False
return True
async def _review_and_adjust_positions(
self,
symbol: str,
data: Dict[str, pd.DataFrame]
):
"""
回顾并调整现有持仓LLM 主动管理)
每次分析后自动回顾该交易对的所有持仓,让 LLM 决定是否需要:
- 调整止损止盈
- 部分平仓
- 全部平仓
"""
try:
# 获取该交易对的所有活跃持仓(只看已成交的)
active_orders = self.paper_trading.get_active_orders()
positions = [
order for order in active_orders
if order.get('symbol') == symbol
and order.get('status') == 'open'
and order.get('filled_price') # 只处理已成交的订单
]
if not positions:
return # 没有持仓需要回顾
logger.info(f"\n🔄 【LLM 回顾持仓中...】共 {len(positions)} 个持仓")
# 准备持仓数据
position_data = []
for order in positions:
entry_price = order.get('filled_price') or order.get('entry_price', 0)
current_price = self.binance.get_ticker(symbol).get('lastPrice', entry_price)
if isinstance(current_price, str):
current_price = float(current_price)
# 计算盈亏百分比
side = order.get('side')
if side == 'long':
pnl_percent = ((current_price - entry_price) / entry_price) * 100
else:
pnl_percent = ((entry_price - current_price) / entry_price) * 100
position_data.append({
'order_id': order.get('order_id'),
'side': side,
'entry_price': entry_price,
'current_price': current_price,
'stop_loss': order.get('stop_loss', 0),
'take_profit': order.get('take_profit', 0),
'quantity': order.get('quantity', 0),
'pnl_percent': pnl_percent,
'open_time': order.get('open_time')
})
# 调用 LLM 回顾分析
decisions = await self.llm_analyzer.review_positions(symbol, position_data, data)
if not decisions:
logger.info(" LLM 建议保持所有持仓不变")
return
# 执行 LLM 的调整建议
logger.info(f"\n📝 【LLM 调整建议】共 {len(decisions)}")
for decision in decisions:
order_id = decision.get('order_id')
action = decision.get('action')
reason = decision.get('reason', '')
action_map = {
'HOLD': '保持',
'ADJUST_SL_TP': '调整止损止盈',
'PARTIAL_CLOSE': '部分平仓',
'FULL_CLOSE': '全部平仓'
}
action_text = action_map.get(action, action)
logger.info(f" 订单 {order_id[:8]}: {action_text} - {reason}")
# 执行调整
result = self.paper_trading.adjust_position_by_llm(
order_id=order_id,
action=action,
new_sl=decision.get('new_sl'),
new_tp=decision.get('new_tp'),
close_percent=decision.get('close_percent')
)
if result.get('success'):
# 发送通知
await self._notify_position_adjustment(symbol, order_id, decision, result)
# 如果是平仓操作,从活跃订单中移除
if action in ['PARTIAL_CLOSE', 'FULL_CLOSE']:
closed_result = result.get('pnl', {})
if closed_result:
pnl = closed_result.get('pnl', 0)
pnl_percent = closed_result.get('pnl_percent', 0)
logger.info(f" ✅ 已平仓: PnL ${pnl:+.2f} ({pnl_percent:+.1f}%)")
else:
logger.warning(f" ❌ 执行失败: {result.get('error')}")
except Exception as e:
logger.error(f"持仓回顾失败: {e}", exc_info=True)
async def _notify_position_adjustment(
self,
symbol: str,
order_id: str,
decision: Dict[str, Any],
result: Dict[str, Any]
):
"""发送持仓调整通知"""
action = decision.get('action')
reason = decision.get('reason', '')
action_map = {
'ADJUST_SL_TP': '🔄 调整止损止盈',
'PARTIAL_CLOSE': '📤 部分平仓',
'FULL_CLOSE': '🚪 全部平仓'
}
action_text = action_map.get(action, action)
message = f"""{action_text}
交易对: {symbol}
订单: {order_id[:8]}
原因: {reason}"""
if action == 'ADJUST_SL_TP':
changes = result.get('changes', [])
message += f"\n调整内容: {', '.join(changes)}"
elif action in ['PARTIAL_CLOSE', 'FULL_CLOSE']:
pnl_info = result.get('pnl', {})
if pnl_info:
pnl = pnl_info.get('pnl', 0)
pnl_percent = pnl_info.get('pnl_percent', 0)
message += f"\n实现盈亏: ${pnl:+.2f} ({pnl_percent:+.1f}%)"
if action == 'PARTIAL_CLOSE':
close_percent = decision.get('close_percent', 0)
remaining = result.get('remaining_quantity', 0)
message += f"\n平仓比例: {close_percent:.0f}%"
message += f"\n剩余仓位: ${remaining:,.0f}"
await self.feishu.send_text(message)
await self.telegram.send_message(message)
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
"""单次分析(用于测试或手动触发)"""
data = self.binance.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
return {'error': '数据不完整'}
# 获取持仓信息
position_info = self.paper_trading.get_position_info()
result = await self.llm_analyzer.analyze(
symbol, data,
symbols=self.symbols,
position_info=position_info
)
return result
def get_status(self) -> Dict[str, Any]:
"""获取智能体状态"""
return {
'running': self.running,
'symbols': self.symbols,
'mode': 'LLM 驱动',
'last_signals': {
symbol: {
'type': sig.get('type'),
'action': sig.get('action'),
'confidence': sig.get('confidence'),
'grade': sig.get('grade')
}
for symbol, sig in self.last_signals.items()
}
}
# 全局单例
_crypto_agent: Optional['CryptoAgent'] = None
def get_crypto_agent() -> 'CryptoAgent':
"""获取加密货币智能体单例"""
# 直接使用类单例,不使用全局变量(避免 reload 时重置)
return CryptoAgent()