stock-ai-agent/backend/app/crypto_agent/crypto_agent.py
2026-02-07 00:59:49 +08:00

393 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
加密货币交易智能体 - 主控制器LLM 驱动版)
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
from app.utils.logger import logger
from app.config import get_settings
from app.services.binance_service import binance_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
from app.services.paper_trading_service import get_paper_trading_service
from app.services.price_monitor_service import get_price_monitor_service
from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer
class CryptoAgent:
    """LLM-driven crypto trading-signal agent (main controller).

    Every five minutes the agent pulls multi-timeframe candles for each
    configured symbol, hands them to the LLM analyzer, logs the outcome and
    pushes the best signal to Feishu and Telegram.  When paper trading is
    enabled it also opens simulated orders from the signals and reports
    orders closed by the price monitor.
    """

    # Candle intervals that must be present before a symbol is analysed.
    REQUIRED_INTERVALS = ('5m', '15m', '1h', '4h')
    # Minimum number of candles required per interval.
    MIN_CANDLES = 20
    # Signals below this confidence (percent) are never pushed.
    MIN_CONFIDENCE = 50
    # The same action for the same symbol is not repeated within this window.
    SIGNAL_COOLDOWN_MINUTES = 30
    # Actions that mean "do nothing".  'hold' is the no-action default used by
    # _convert_to_paper_signal, so it is treated the same as 'wait'.
    NON_ACTIONABLE = ('wait', 'hold')

    def __init__(self):
        """Wire up services, read settings and reset runtime state."""
        self.settings = get_settings()
        self.binance = binance_service
        self.feishu = get_feishu_service()
        self.telegram = get_telegram_service()
        self.llm_analyzer = LLMSignalAnalyzer()

        # Paper-trading services are only created when the feature is on.
        self.paper_trading_enabled = self.settings.paper_trading_enabled
        if self.paper_trading_enabled:
            self.paper_trading = get_paper_trading_service()
            self.price_monitor = get_price_monitor_service()
            self.price_monitor.add_price_callback(self._on_price_update)
        else:
            self.paper_trading = None
            self.price_monitor = None

        # Per-symbol state: last pushed signal and the time it was pushed.
        self.last_signals: Dict[str, Dict[str, Any]] = {}
        self.signal_cooldown: Dict[str, datetime] = {}

        # Configuration: tolerate "BTCUSDT, ETHUSDT" and trailing commas.
        self.symbols = self._parse_symbols(self.settings.crypto_symbols)

        # Run state; the loop is captured in run() so that callbacks arriving
        # outside the loop can schedule coroutines on it.
        self.running = False
        self._event_loop: Optional[asyncio.AbstractEventLoop] = None

        logger.info(f"加密货币智能体初始化完成(LLM 驱动),监控交易对: {self.symbols}")
        if self.paper_trading_enabled:
            logger.info("模拟交易已启用")

    @staticmethod
    def _parse_symbols(raw: str) -> List[str]:
        """Split a comma-separated symbol list, dropping blanks and padding."""
        return [part.strip() for part in (raw or '').split(',') if part.strip()]

    @staticmethod
    def _format_price(value: Any) -> str:
        """Format a price as ``$1,234.50``.

        ``None`` is rendered as ``$0.00`` (the same text a missing key produced
        before); values that cannot be converted to float render as ``N/A``
        instead of raising, because the inputs come from LLM output.
        """
        try:
            return f"${float(value if value is not None else 0):,.2f}"
        except (TypeError, ValueError):
            return "N/A"

    @staticmethod
    def _log_notification_failure(future) -> None:
        """Done-callback that surfaces errors from a scheduled notification."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(f"❌ 发送平仓通知失败: {exc}")

    def _on_price_update(self, symbol: str, price: float):
        """Handle a live price tick (paper trading only).

        Asks the paper-trading service which orders the new price closes and
        schedules a close notification on the agent's event loop for each.
        """
        if not self.paper_trading:
            return
        triggered = self.paper_trading.check_price_triggers(symbol, price)
        for result in triggered or []:
            loop = self._event_loop
            if loop and loop.is_running():
                future = asyncio.run_coroutine_threadsafe(
                    self._notify_order_closed(result), loop
                )
                # Without a callback an exception raised inside the coroutine
                # would stay on the future and never be reported.
                future.add_done_callback(self._log_notification_failure)
            else:
                logger.warning("无法发送平仓通知: 事件循环不可用")

    async def _notify_order_closed(self, result: Dict[str, Any]):
        """Send the "order closed" message to Feishu and Telegram.

        Args:
            result: Closed-order record; the keys read here are ``status``,
                ``is_win``, ``side``, ``symbol``, ``entry_price``,
                ``exit_price``, ``pnl_percent``, ``pnl_amount``,
                ``hold_duration`` and ``order_id``.
        """
        status_labels = {
            'closed_tp': ("🎯", "止盈平仓"),
            'closed_sl': ("🛑", "止损平仓"),
        }
        # Any other status is reported as a manual close.
        emoji, status_text = status_labels.get(
            result.get('status', ''), ("📤", "手动平仓")
        )
        win_text = "盈利" if result.get('is_win', False) else "亏损"
        side_text = "做多" if result.get('side') == 'long' else "做空"
        pnl_percent = result.get('pnl_percent') or 0
        pnl_amount = result.get('pnl_amount') or 0

        message = "\n".join([
            f"{emoji} 订单{status_text}",
            f"交易对: {result.get('symbol')}",
            f"方向: {side_text}",
            f"入场: {self._format_price(result.get('entry_price'))}",
            f"出场: {self._format_price(result.get('exit_price'))}",
            f"{win_text}: {pnl_percent:+.2f}% (${pnl_amount:+.2f})",
            f"持仓时间: {result.get('hold_duration', 'N/A')}",
        ])
        await self.feishu.send_text(message)
        await self.telegram.send_message(message)
        logger.info(f"已发送订单平仓通知: {result.get('order_id')}")

    def _get_seconds_until_next_5min(self) -> int:
        """Return whole seconds until the next minute that is a multiple of 5.

        Uses ``datetime.now()`` and ignores microseconds.  Returns 0 only when
        the clock reads exactly ``mm:00`` with ``mm % 5 == 0``.
        """
        now = datetime.now()
        elapsed = (now.minute % 5) * 60 + now.second
        return 0 if elapsed == 0 else 5 * 60 - elapsed

    async def run(self):
        """Main loop: analyse every symbol on each five-minute boundary.

        Runs until ``stop()`` clears ``self.running``.  Errors inside one
        round are logged and the loop resumes after a short pause.
        """
        self.running = True
        # Inside a coroutine the running loop is the one we want;
        # get_event_loop() is deprecated for this purpose.
        self._event_loop = asyncio.get_running_loop()

        # Startup banner.
        logger.info("\n" + "=" * 60)
        logger.info("🚀 加密货币交易信号智能体(LLM 驱动)")
        logger.info("=" * 60)
        logger.info(f" 监控交易对: {', '.join(self.symbols)}")
        logger.info(" 运行模式: 每5分钟整点执行")
        logger.info(" 分析引擎: LLM 自主分析")
        if self.paper_trading_enabled:
            logger.info(" 模拟交易: 已启用")
        logger.info("=" * 60 + "\n")

        # Subscribe the price monitor to every symbol.
        if self.paper_trading_enabled and self.price_monitor:
            for symbol in self.symbols:
                self.price_monitor.subscribe_symbol(symbol)
            logger.info(f"已启动 WebSocket 价格监控: {', '.join(self.symbols)}")

        # Startup notifications.
        await self.feishu.send_text(
            f"🚀 加密货币智能体已启动(LLM 驱动)\n"
            f"监控交易对: {', '.join(self.symbols)}\n"
            f"运行时间: 每5分钟整点"
        )
        await self.telegram.send_startup_notification(self.symbols)

        while self.running:
            try:
                wait_seconds = self._get_seconds_until_next_5min()
                if wait_seconds > 0:
                    next_run = datetime.now() + timedelta(seconds=wait_seconds)
                    logger.info(f"⏳ 等待 {wait_seconds} 秒,下次运行: {next_run.strftime('%H:%M:%S')}")
                    await asyncio.sleep(wait_seconds)

                run_time = datetime.now()
                logger.info("\n" + "=" * 60)
                logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
                logger.info("=" * 60)

                for symbol in self.symbols:
                    await self.analyze_symbol(symbol)

                logger.info("\n" + "-" * 60)
                logger.info(f"✅ 本轮分析完成,共分析 {len(self.symbols)} 个交易对")
                logger.info("-" * 60 + "\n")

                # Step past the boundary second so the same slot is not
                # executed twice when a round finishes within that second.
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"❌ 分析循环出错: {e}")
                import traceback
                logger.error(traceback.format_exc())
                await asyncio.sleep(10)

    def stop(self):
        """Stop the main loop and the price monitor (if any)."""
        self.running = False
        if self.price_monitor:
            self.price_monitor.stop()
        logger.info("加密货币智能体已停止")

    async def analyze_symbol(self, symbol: str):
        """Analyse one trading pair and push the best signal, if any.

        Any exception is logged together with its traceback and swallowed so
        that one failing symbol does not abort the round.

        Args:
            symbol: Trading pair, e.g. ``'BTCUSDT'``.
        """
        try:
            logger.info(f"\n{'-' * 50}")
            logger.info(f"📊 {symbol} 分析开始")
            logger.info(f"{'-' * 50}")

            # 1. Fetch multi-timeframe candles.
            # NOTE(review): this looks like a synchronous call made from a
            # coroutine; if it performs network I/O it stalls the event loop.
            data = self.binance.get_multi_timeframe_data(symbol)
            if not self._validate_data(data):
                logger.warning(f"⚠️ {symbol} 数据不完整,跳过分析")
                return

            current_price = float(data['5m'].iloc[-1]['close'])
            price_change_24h = self._calculate_price_change(data['1h'])
            logger.info(f"💰 当前价格: ${current_price:,.2f} ({price_change_24h})")

            # 2. LLM analysis (includes news sentiment).
            logger.info("\n🤖 【LLM 分析中...】")
            result = await self.llm_analyzer.analyze(symbol, data, symbols=self.symbols)
            self._log_analysis_overview(result)

            # 3. Signals.
            signals = result.get('signals', [])
            if not signals:
                logger.info("\n⏸️ 结论: 无交易信号,继续观望")
                return
            self._log_signals(signals)

            # 4. Push the best signal when it passes the filters.
            best_signal = self.llm_analyzer.get_best_signal(result)
            if best_signal and self._should_send_signal(symbol, best_signal):
                await self._dispatch_signal(symbol, best_signal, current_price)
            elif best_signal:
                logger.info("\n⏸️ 信号冷却中或置信度不足,不发送通知")
        except Exception as e:
            logger.error(f"❌ 分析 {symbol} 出错: {e}")
            import traceback
            logger.error(traceback.format_exc())

    def _log_analysis_overview(self, result: Dict[str, Any]) -> None:
        """Log the summary, news sentiment and key levels of an LLM result."""
        summary = result.get('analysis_summary', '')
        logger.info(f" 市场状态: {summary}")

        news_sentiment = result.get('news_sentiment', '')
        news_impact = result.get('news_impact', '')
        if news_sentiment:
            sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': ''}.get(news_sentiment, '')
            logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
        if news_impact:
            logger.info(f" 消息影响: {news_impact}")

        # The analyzer may return null for these fields; treat that as empty.
        levels = result.get('key_levels') or {}
        supports = levels.get('support') or []
        resistances = levels.get('resistance') or []
        if supports or resistances:
            support_str = ', '.join(self._format_price(s) for s in supports[:2])
            resistance_str = ', '.join(self._format_price(r) for r in resistances[:2])
            logger.info(f" 支撑位: {support_str or '-'}")
            logger.info(f" 阻力位: {resistance_str or '-'}")

    def _log_signals(self, signals: List[Dict[str, Any]]) -> None:
        """Log every signal returned by the analyzer."""
        type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
        action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
        grade_icons = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}

        logger.info(f"\n🎯 【发现 {len(signals)} 个信号】")
        for sig in signals:
            signal_type = sig.get('type', 'unknown')
            type_text = type_map.get(signal_type, signal_type)
            action = sig.get('action', 'wait')
            action_text = action_map.get(action, action)
            grade = sig.get('grade', 'D')
            confidence = sig.get('confidence', 0)
            grade_icon = grade_icons.get(grade, '')

            logger.info(f"\n [{type_text}] {action_text}")
            logger.info(f" 等级: {grade} {grade_icon} | 置信度: {confidence}%")
            logger.info(f" 入场: {self._format_price(sig.get('entry_price'))} | "
                        f"止损: {self._format_price(sig.get('stop_loss'))} | "
                        f"止盈: {self._format_price(sig.get('take_profit'))}")
            # Keep the log line short; the reason may be missing or null.
            logger.info(f" 原因: {str(sig.get('reason') or '')[:100]}")
            if sig.get('risk_warning'):
                logger.info(f" 风险: {sig.get('risk_warning')}")

    async def _dispatch_signal(self, symbol: str, best_signal: Dict[str, Any],
                               current_price: float) -> None:
        """Push a signal, record the cooldown state and open a paper order."""
        logger.info("\n📤 【发送通知】")

        # Feishu receives a card, Telegram receives plain text.
        telegram_message = self.llm_analyzer.format_signal_message(best_signal, symbol)
        feishu_card = self.llm_analyzer.format_feishu_card(best_signal, symbol)
        await self.feishu.send_card(
            feishu_card['title'],
            feishu_card['content'],
            feishu_card['color']
        )
        await self.telegram.send_message(telegram_message)
        logger.info(" ✅ 已发送信号通知")

        # Remember what was sent and when, for the cooldown check.
        self.last_signals[symbol] = best_signal
        self.signal_cooldown[symbol] = datetime.now()

        # 5. Open a simulated order; grade 'D' signals are not traded.
        if self.paper_trading_enabled and self.paper_trading:
            if best_signal.get('grade', 'D') != 'D':
                paper_signal = self._convert_to_paper_signal(symbol, best_signal, current_price)
                order = self.paper_trading.create_order_from_signal(paper_signal)
                if order:
                    logger.info(f" 📝 已创建模拟订单: {order.order_id}")

    def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any],
                                 current_price: float) -> Dict[str, Any]:
        """Convert an LLM signal dict into the paper-trading signal format.

        ``medium_term`` and ``long_term`` (and unknown types) map to
        ``'swing'``; ``short_term`` is kept as is.
        """
        type_map = {'short_term': 'short_term', 'medium_term': 'swing', 'long_term': 'swing'}
        signal_type = signal.get('type', 'medium_term')
        return {
            'symbol': symbol,
            'action': signal.get('action', 'hold'),
            'price': current_price,
            'stop_loss': signal.get('stop_loss', 0),
            'take_profit': signal.get('take_profit', 0),
            'confidence': signal.get('confidence', 0),
            'signal_grade': signal.get('grade', 'D'),
            'signal_type': type_map.get(signal_type, 'swing'),
            'reasons': [signal.get('reason', '')],
            'timestamp': datetime.now()
        }

    def _calculate_price_change(self, h1_data: pd.DataFrame) -> str:
        """Return the 24-hour price change as a signed percentage string.

        Args:
            h1_data: Frame with a ``close`` column; rows are assumed to be
                consecutive 1h candles, oldest first - TODO confirm.

        Returns:
            e.g. ``"+1.23%"`` / ``"-0.45%"``, or ``"N/A"`` when fewer than 25
            rows are available or the reference price is zero.
        """
        # 24 intervals back from the last row is row -25 (row -24 is only 23
        # intervals back), so 25 rows are required.
        if len(h1_data) < 25:
            return "N/A"
        price_now = h1_data.iloc[-1]['close']
        price_24h_ago = h1_data.iloc[-25]['close']
        if price_24h_ago == 0:
            return "N/A"
        change = ((price_now - price_24h_ago) / price_24h_ago) * 100
        if change >= 0:
            return f"+{change:.2f}%"
        return f"{change:.2f}%"

    def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
        """Return True when every required interval has enough candles."""
        if not data:
            return False
        for interval in self.REQUIRED_INTERVALS:
            frame = data.get(interval)
            if frame is None or len(frame) < self.MIN_CANDLES:
                return False
        return True

    def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
        """Decide whether a signal should be pushed.

        A signal is rejected when its action is non-actionable, when its
        confidence is below ``MIN_CONFIDENCE`` (or not numeric), or when the
        same action was already pushed for this symbol within the cooldown
        window.  A signal in the opposite direction is let through even
        during the cooldown.
        """
        action = signal.get('action') or 'wait'
        if action in self.NON_ACTIONABLE:
            return False

        try:
            confidence = float(signal.get('confidence') or 0)
        except (TypeError, ValueError):
            return False
        if confidence < self.MIN_CONFIDENCE:
            return False

        last_sent_at = self.signal_cooldown.get(symbol)
        if last_sent_at is not None:
            cooldown_end = last_sent_at + timedelta(minutes=self.SIGNAL_COOLDOWN_MINUTES)
            previous = self.last_signals.get(symbol)
            if (datetime.now() < cooldown_end
                    and previous is not None
                    and previous.get('action') == action):
                logger.debug(f"{symbol} 信号冷却中,跳过")
                return False
        return True

    async def analyze_once(self, symbol: str) -> Dict[str, Any]:
        """Run a single analysis (for tests or manual triggers).

        Returns the analyzer result, or ``{'error': ...}`` when the candle
        data is incomplete.  No notification is sent.
        """
        data = self.binance.get_multi_timeframe_data(symbol)
        if not self._validate_data(data):
            return {'error': '数据不完整'}
        return await self.llm_analyzer.analyze(symbol, data, symbols=self.symbols)

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the agent's state for status endpoints."""
        return {
            'running': self.running,
            'symbols': self.symbols,
            'mode': 'LLM 驱动',
            'last_signals': {
                symbol: {
                    'type': sig.get('type'),
                    'action': sig.get('action'),
                    'confidence': sig.get('confidence'),
                    'grade': sig.get('grade')
                }
                for symbol, sig in self.last_signals.items()
            }
        }
# Module-level shared instance; constructed when this module is imported.
crypto_agent = CryptoAgent()