486 lines
19 KiB
Python
486 lines
19 KiB
Python
"""
|
||
加密货币交易智能体 - 主控制器(LLM 驱动版)
|
||
"""
|
||
import asyncio
|
||
from typing import Dict, Any, List, Optional
|
||
from datetime import datetime, timedelta
|
||
import pandas as pd
|
||
|
||
from app.utils.logger import logger
|
||
from app.config import get_settings
|
||
from app.services.binance_service import binance_service
|
||
from app.services.feishu_service import get_feishu_service
|
||
from app.services.telegram_service import get_telegram_service
|
||
from app.services.paper_trading_service import get_paper_trading_service
|
||
from app.services.price_monitor_service import get_price_monitor_service
|
||
from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer
|
||
|
||
|
||
class CryptoAgent:
    """LLM-driven cryptocurrency trading-signal agent.

    On every 5-minute wall-clock boundary the agent fetches multi-timeframe
    market data for each configured symbol, asks the LLM analyzer for
    signals, pushes the best one to Feishu and Telegram and, when paper
    trading is enabled in the settings, opens a simulated order for it.
    """

    # A same-direction signal for one symbol is suppressed for this long.
    SIGNAL_COOLDOWN = timedelta(minutes=30)
    # Actions that never lead to a notification or an order. 'wait' is the
    # default used when reading a signal, 'hold' is the default used when
    # converting one for paper trading, so both are treated as "do nothing".
    NON_ACTIONABLE = ('wait', 'hold')

    def __init__(self):
        """Wire up services, optional paper trading and runtime state."""
        self.settings = get_settings()
        self.binance = binance_service
        self.feishu = get_feishu_service()
        self.telegram = get_telegram_service()
        self.llm_analyzer = LLMSignalAnalyzer()

        # Paper trading: the price monitor feeds ticks into _on_price_update,
        # which lets the paper-trading service evaluate its price triggers.
        self.paper_trading_enabled = self.settings.paper_trading_enabled
        if self.paper_trading_enabled:
            self.paper_trading = get_paper_trading_service()
            self.price_monitor = get_price_monitor_service()
            self.price_monitor.add_price_callback(self._on_price_update)
        else:
            self.paper_trading = None
            self.price_monitor = None

        # Per-symbol state: last signal sent and the time it was sent.
        self.last_signals: Dict[str, Dict[str, Any]] = {}
        self.signal_cooldown: Dict[str, datetime] = {}

        # Configuration.
        self.symbols = self._parse_symbols(self.settings.crypto_symbols)

        # Runtime state; _event_loop is captured when run() starts.
        self.running = False
        self._event_loop = None

        logger.info(f"加密货币智能体初始化完成(LLM 驱动),监控交易对: {self.symbols}")
        if self.paper_trading_enabled:
            logger.info("模拟交易已启用")

    @staticmethod
    def _parse_symbols(raw: str) -> List[str]:
        """Split a comma-separated symbol list, dropping blanks and padding."""
        return [part.strip() for part in raw.split(',') if part.strip()]

    @staticmethod
    def _side_text(side: Any) -> str:
        """Return the display label for a position side ('long' or other)."""
        return "做多" if side == 'long' else "做空"

    @staticmethod
    def _log_notification_failure(future) -> None:
        """Done-callback: log an exception raised by a scheduled notification."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error(f"❌ 通知发送失败: {error}")

    def _get_position_info(self) -> Optional[Any]:
        """Return current paper-trading positions, or None when disabled."""
        if not self.paper_trading:
            return None
        return self.paper_trading.get_position_info()

    async def _broadcast_text(self, message: str) -> None:
        """Send one plain-text message to Feishu, then to Telegram."""
        await self.feishu.send_text(message)
        await self.telegram.send_message(message)

    def _on_price_update(self, symbol: str, price: float):
        """Handle a live price tick (used for paper trading only).

        Asks the paper-trading service which orders the new price triggered
        and schedules the matching notification coroutine on the loop that
        run() captured.

        Args:
            symbol: Trading pair the tick belongs to.
            price: Latest price for that pair.
        """
        if not self.paper_trading:
            return

        triggered = self.paper_trading.check_price_triggers(symbol, price)

        handlers = {
            'order_filled': self._notify_order_filled,
            'breakeven_triggered': self._notify_breakeven_triggered,
        }
        loop = self._event_loop

        for event in triggered or []:
            if not (loop and loop.is_running()):
                logger.warning("无法发送通知: 事件循环不可用")
                continue

            # Any event type without a dedicated handler is an order close.
            event_type = event.get('event_type', 'order_closed')
            notify = handlers.get(event_type, self._notify_order_closed)
            future = asyncio.run_coroutine_threadsafe(notify(event), loop)
            # Without a callback a failing notification would vanish silently.
            future.add_done_callback(self._log_notification_failure)

    async def _notify_order_filled(self, result: Dict[str, Any]):
        """Announce that a pending (limit) paper order has been filled."""
        side_text = self._side_text(result.get('side'))
        grade = result.get('signal_grade', 'N/A')

        message = f"""✅ 挂单成交

交易对: {result.get('symbol')}
方向: {side_text}
等级: {grade}
挂单价: ${result.get('entry_price', 0):,.2f}
成交价: ${result.get('filled_price', 0):,.2f}
仓位: ${result.get('quantity', 0):,.0f}
止损: ${result.get('stop_loss', 0):,.2f}
止盈: ${result.get('take_profit', 0):,.2f}"""

        await self._broadcast_text(message)
        logger.info(f"已发送挂单成交通知: {result.get('order_id')}")

    async def _notify_pending_cancelled(self, result: Dict[str, Any]):
        """Announce that a pending order was cancelled by an opposite signal."""
        side_text = self._side_text(result.get('side'))
        new_side_text = self._side_text(result.get('new_side'))

        message = f"""⚠️ 挂单撤销

交易对: {result.get('symbol')}
原方向: {side_text}
挂单价: ${result.get('entry_price', 0):,.2f}
原因: 收到反向{new_side_text}信号,自动撤销"""

        await self._broadcast_text(message)
        logger.info(f"已发送挂单撤销通知: {result.get('order_id')}")

    async def _notify_breakeven_triggered(self, result: Dict[str, Any]):
        """Announce that an order's stop-loss was moved to break-even."""
        side_text = self._side_text(result.get('side'))

        message = f"""🛡️ 保本止损已启动

交易对: {result.get('symbol')}
方向: {side_text}
开仓价: ${result.get('filled_price', 0):,.2f}
当前盈利: {result.get('current_pnl_percent', 0):.2f}%
止损已移至: ${result.get('new_stop_loss', 0):,.2f}

✅ 本单已锁定保本,不会亏损"""

        await self._broadcast_text(message)
        logger.info(f"已发送保本止损通知: {result.get('order_id')}")

    async def _notify_order_closed(self, result: Dict[str, Any]):
        """Announce that a paper order was closed (TP, SL or otherwise)."""
        # Any status other than take-profit / stop-loss is shown as manual.
        headline = {
            'closed_tp': ("🎯", "止盈平仓"),
            'closed_sl': ("🛑", "止损平仓"),
        }
        emoji, status_text = headline.get(result.get('status', ''), ("📤", "手动平仓"))

        win_text = "盈利" if result.get('is_win', False) else "亏损"
        side_text = self._side_text(result.get('side'))

        message = f"""{emoji} 订单{status_text}

交易对: {result.get('symbol')}
方向: {side_text}
入场: ${result.get('entry_price', 0):,.2f}
出场: ${result.get('exit_price', 0):,.2f}
{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})
持仓时间: {result.get('hold_duration', 'N/A')}"""

        await self._broadcast_text(message)
        logger.info(f"已发送订单平仓通知: {result.get('order_id')}")

    def _get_seconds_until_next_5min(self, now: Optional[datetime] = None) -> int:
        """Return whole seconds until the next 5-minute wall-clock boundary.

        Args:
            now: Reference time; defaults to ``datetime.now()``. Exposed so
                the schedule arithmetic can be checked deterministically.

        Returns:
            0 when ``now`` is exactly on a boundary (minute divisible by 5,
            second 0), otherwise a value in 1..299. Microseconds are ignored.
        """
        if now is None:
            now = datetime.now()

        elapsed = (now.minute % 5) * 60 + now.second
        return 0 if elapsed == 0 else 5 * 60 - elapsed

    async def run(self):
        """Main loop: analyze every symbol on each 5-minute boundary.

        Runs until stop() clears ``self.running``. Errors inside one round
        are logged and followed by a 10-second pause instead of ending the
        loop.
        """
        self.running = True
        # Captured so _on_price_update can schedule coroutines onto this loop.
        self._event_loop = asyncio.get_event_loop()

        # Startup banner.
        logger.info("\n" + "=" * 60)
        logger.info("🚀 加密货币交易信号智能体(LLM 驱动)")
        logger.info("=" * 60)
        logger.info(f" 监控交易对: {', '.join(self.symbols)}")
        logger.info(" 运行模式: 每5分钟整点执行")
        logger.info(" 分析引擎: LLM 自主分析")
        if self.paper_trading_enabled:
            logger.info(" 模拟交易: 已启用")
        logger.info("=" * 60 + "\n")

        # Start price monitoring for paper trading.
        if self.paper_trading_enabled and self.price_monitor:
            for symbol in self.symbols:
                self.price_monitor.subscribe_symbol(symbol)
            logger.info(f"已启动 WebSocket 价格监控: {', '.join(self.symbols)}")

        # Startup notifications.
        await self.feishu.send_text(
            f"🚀 加密货币智能体已启动(LLM 驱动)\n"
            f"监控交易对: {', '.join(self.symbols)}\n"
            f"运行时间: 每5分钟整点"
        )
        await self.telegram.send_startup_notification(self.symbols)

        while self.running:
            try:
                wait_seconds = self._get_seconds_until_next_5min()
                if wait_seconds > 0:
                    next_run = datetime.now() + timedelta(seconds=wait_seconds)
                    logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
                    await asyncio.sleep(wait_seconds)

                run_time = datetime.now()
                logger.info("\n" + "=" * 60)
                logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
                logger.info("=" * 60)

                for symbol in self.symbols:
                    await self.analyze_symbol(symbol)

                logger.info("\n" + "─" * 60)
                logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
                logger.info("─" * 60 + "\n")

                # Presumably keeps a very fast round from re-entering the
                # same boundary second (wait_seconds == 0) - TODO confirm.
                await asyncio.sleep(2)

            except Exception as e:
                logger.error(f"❌ 分析循环出错: {e}")
                import traceback
                logger.error(traceback.format_exc())
                await asyncio.sleep(10)

    def stop(self):
        """Ask the main loop to exit and stop the price monitor, if any."""
        self.running = False
        if self.price_monitor:
            self.price_monitor.stop()
        logger.info("加密货币智能体已停止")

    async def analyze_symbol(self, symbol: str):
        """Analyze one trading pair and act on the best signal.

        Steps: fetch and validate multi-timeframe data, run the LLM analysis,
        log the overview and every signal, then notify and (optionally)
        create a paper order for the best signal. Any exception is logged
        with its traceback and swallowed so one symbol cannot abort a round.

        Args:
            symbol: Trading pair, e.g. 'BTCUSDT'.
        """
        try:
            logger.info(f"\n{'─' * 50}")
            logger.info(f"📊 {symbol} 分析开始")
            logger.info(f"{'─' * 50}")

            # 1. Multi-timeframe market data.
            data = self.binance.get_multi_timeframe_data(symbol)
            if not self._validate_data(data):
                logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
                return

            current_price = float(data['5m'].iloc[-1]['close'])
            price_change_24h = self._calculate_price_change(data['1h'])
            logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")

            # Open positions for the LLM's sizing decision. None when paper
            # trading is disabled (the original dereferenced None here).
            # NOTE(review): assumes the analyzer accepts position_info=None.
            position_info = self._get_position_info()

            # 2. LLM analysis.
            logger.info("\n🤖 【LLM 分析中...】")
            result = await self.llm_analyzer.analyze(
                symbol, data,
                symbols=self.symbols,
                position_info=position_info
            )
            self._log_analysis_overview(result)

            # 3. Signals.
            signals = result.get('signals', [])
            if not signals:
                logger.info("\n⏸️ 结论: 无交易信号,继续观望")
                return

            logger.info(f"\n🎯 【发现 {len(signals)} 个信号】")
            for sig in signals:
                self._log_signal(sig)

            # 4. Notify about the best signal, then 5. paper-trade it.
            best_signal = self.llm_analyzer.get_best_signal(result)
            if best_signal and self._should_send_signal(symbol, best_signal):
                await self._send_signal_notification(symbol, best_signal)
                await self._create_paper_order(symbol, best_signal, current_price)
            elif best_signal:
                logger.info("\n⏸️ 信号冷却中或置信度不足,不发送通知")

        except Exception as e:
            logger.error(f"❌ 分析 {symbol} 出错: {e}")
            import traceback
            logger.error(traceback.format_exc())

    def _log_analysis_overview(self, result: Dict[str, Any]) -> None:
        """Log the summary, news sentiment and key levels of an analysis."""
        logger.info(f" 市场状态: {result.get('analysis_summary', '无')}")

        news_sentiment = result.get('news_sentiment', '')
        news_impact = result.get('news_impact', '')
        if news_sentiment:
            icons = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}
            sentiment_icon = icons.get(news_sentiment, '')
            logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
        if news_impact:
            logger.info(f" 消息影响: {news_impact}")

        # `or` guards: the LLM may emit null instead of omitting a field.
        levels = result.get('key_levels') or {}
        supports = levels.get('support') or []
        resistances = levels.get('resistance') or []
        if supports or resistances:
            support_str = ', '.join(f"${s:,.2f}" for s in supports[:2])
            resistance_str = ', '.join(f"${r:,.2f}" for r in resistances[:2])
            logger.info(f" 支撑位: {support_str or '-'}")
            logger.info(f" 阻力位: {resistance_str or '-'}")

    def _log_signal(self, sig: Dict[str, Any]) -> None:
        """Log one signal: horizon, direction, grade, prices and reasoning."""
        type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
        signal_type = sig.get('type', 'unknown')
        type_text = type_map.get(signal_type, signal_type)

        action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
        action = sig.get('action', 'wait')
        action_text = action_map.get(action, action)

        grade = sig.get('grade', 'D')
        confidence = sig.get('confidence', 0)
        grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '')

        # Null prices / reason would otherwise raise while formatting and
        # abort the whole symbol before any notification is sent.
        entry = sig.get('entry_price') or 0
        stop = sig.get('stop_loss') or 0
        target = sig.get('take_profit') or 0
        reason = str(sig.get('reason') or '无')

        logger.info(f"\n [{type_text}] {action_text}")
        logger.info(f" 等级: {grade} {grade_icon} | 置信度: {confidence}%")
        logger.info(f" 入场: ${entry:,.2f} | "
                    f"止损: ${stop:,.2f} | "
                    f"止盈: ${target:,.2f}")
        logger.info(f" 原因: {reason[:100]}")

        if sig.get('risk_warning'):
            logger.info(f" 风险: {sig.get('risk_warning')}")

    async def _send_signal_notification(self, symbol: str,
                                        best_signal: Dict[str, Any]) -> None:
        """Push the signal to Feishu (card) and Telegram (text), record it."""
        logger.info("\n📤 【发送通知】")

        telegram_message = self.llm_analyzer.format_signal_message(best_signal, symbol)
        feishu_card = self.llm_analyzer.format_feishu_card(best_signal, symbol)

        await self.feishu.send_card(
            feishu_card['title'],
            feishu_card['content'],
            feishu_card['color']
        )
        await self.telegram.send_message(telegram_message)
        logger.info(" ✅ 已发送信号通知")

        # Remember what was sent and when, for the cooldown check.
        self.last_signals[symbol] = best_signal
        self.signal_cooldown[symbol] = datetime.now()

    async def _create_paper_order(self, symbol: str, best_signal: Dict[str, Any],
                                  current_price: float) -> None:
        """Open a simulated order for the signal unless it is grade 'D'."""
        if not (self.paper_trading_enabled and self.paper_trading):
            return
        if best_signal.get('grade', 'D') == 'D':
            return

        position_size = best_signal.get('position_size', 'light')
        paper_signal = self._convert_to_paper_signal(symbol, best_signal, current_price)
        order_result = self.paper_trading.create_order_from_signal(paper_signal, current_price)

        # Pending orders that the new signal caused to be cancelled.
        for cancelled in order_result.get('cancelled_orders', []):
            await self._notify_pending_cancelled(cancelled)

        order = order_result.get('order')
        if order:
            logger.info(f" 📝 已创建模拟订单: {order.order_id} | 仓位: {position_size}")

    def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
                                 current_price: float) -> Dict[str, Any]:
        """Translate an LLM signal into the dict the paper-trading service takes.

        Args:
            symbol: Trading pair the signal is for.
            signal: Signal dict produced by the LLM analyzer.
            current_price: Latest price; also the fallback entry price.

        Returns:
            A new dict; 'medium_term' and 'long_term' both map to 'swing'.
        """
        type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
        signal_type = signal.get('type', 'medium_term')

        return {
            'symbol': symbol,
            'action': signal.get('action', 'hold'),
            'entry_type': signal.get('entry_type', 'market'),  # market or limit
            'entry_price': signal.get('entry_price', current_price),  # limit price
            'price': current_price,  # price at signal time
            'stop_loss': signal.get('stop_loss', 0),
            'take_profit': signal.get('take_profit', 0),
            'confidence': signal.get('confidence', 0),
            'signal_grade': signal.get('grade', 'D'),
            'signal_type': type_map.get(signal_type, 'swing'),
            'position_size': signal.get('position_size', 'light'),  # LLM-suggested size
            'reasons': [signal.get('reason', '')],
            'timestamp': datetime.now()
        }

    def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
        """Format the price change over the last 24 hourly rows as a percentage.

        Args:
            h1_data: Candles with a 'close' column, oldest row first
                (assumed hourly - TODO confirm against the data source).

        Returns:
            A signed string such as '+1.23%' or '-0.45%'; 'N/A' when fewer
            than 24 rows are available or the reference close is zero.
        """
        if len(h1_data) < 24:
            return "N/A"

        price_now = h1_data.iloc[-1]['close']
        # NOTE(review): iloc[-24] is 23 rows before the last one, so this may
        # span 23 rather than 24 hours - confirm the intended window.
        price_24h_ago = h1_data.iloc[-24]['close']
        if price_24h_ago == 0:
            return "N/A"

        change = ((price_now - price_24h_ago) / price_24h_ago) * 100
        return f"+{change:.2f}%" if change >= 0 else f"{change:.2f}%"

    def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
        """Return True when every required interval has at least 20 rows."""
        required_intervals = ('5m', '15m', '1h', '4h')
        return all(
            interval in data and len(data[interval]) >= 20
            for interval in required_intervals
        )

    def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
        """Decide whether a signal deserves a notification.

        A signal is rejected when its action is non-actionable, when its
        confidence is below the configured threshold, or when a signal in
        the same direction was sent for this symbol within the cooldown.

        Args:
            symbol: Trading pair the signal is for.
            signal: Candidate signal dict.

        Returns:
            True when the signal should be sent.
        """
        action = signal.get('action', 'wait')
        if action in self.NON_ACTIONABLE:
            return False

        # The setting is a fraction; signal confidence is a percentage.
        threshold = self.settings.crypto_llm_threshold * 100
        confidence = signal.get('confidence') or 0
        if confidence < threshold:
            return False

        last_sent = self.signal_cooldown.get(symbol)
        previous = self.last_signals.get(symbol)
        if (last_sent is not None and previous is not None
                and datetime.now() < last_sent + self.SIGNAL_COOLDOWN
                and previous.get('action') == action):
            logger.debug(f"{symbol} 信号冷却中,跳过")
            return False

        return True

    async def analyze_once(self, symbol: str) -> Dict[str, Any]:
        """Run a single analysis (for tests or manual triggering).

        Args:
            symbol: Trading pair, e.g. 'BTCUSDT'.

        Returns:
            The analyzer's result dict, or ``{'error': ...}`` when the market
            data is incomplete. Nothing is notified and no order is created.
        """
        data = self.binance.get_multi_timeframe_data(symbol)
        if not self._validate_data(data):
            return {'error': '数据不完整'}

        # None when paper trading is disabled.
        position_info = self._get_position_info()

        return await self.llm_analyzer.analyze(
            symbol, data,
            symbols=self.symbols,
            position_info=position_info
        )

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the agent: running flag, symbols, last signals."""
        exposed_fields = ('type', 'action', 'confidence', 'grade')
        return {
            'running': self.running,
            'symbols': self.symbols,
            'mode': 'LLM 驱动',
            'last_signals': {
                symbol: {field: sig.get(field) for field in exposed_fields}
                for symbol, sig in self.last_signals.items()
            }
        }
|
||
|
||
|
||
# 全局实例
|
||
# Module-level shared instance. Constructing it runs CryptoAgent.__init__,
# so importing this module loads settings and creates the service clients.
crypto_agent = CryptoAgent()
|