diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 8b24ec7..7a01222 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -13,7 +13,8 @@ from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service from app.services.paper_trading_service import get_paper_trading_service from app.services.signal_database_service import get_signal_db_service -from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer +from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer +from app.crypto_agent.trading_decision_maker import TradingDecisionMaker from app.utils.system_status import get_system_monitor, AgentStatus @@ -40,7 +41,14 @@ class CryptoAgent: self.binance = bitget_service # 使用 Bitget 服务 self.feishu = get_feishu_service() self.telegram = get_telegram_service() - self.llm_analyzer = LLMSignalAnalyzer() + + # 新架构:市场信号分析器 + 交易决策器 + self.market_analyzer = MarketSignalAnalyzer() + self.decision_maker = TradingDecisionMaker() + + # 保留旧的 LLM 分析器用于兼容(可选) + # self.llm_analyzer = LLMSignalAnalyzer() + self.signal_db = get_signal_db_service() # 信号数据库服务 # 模拟交易服务(始终启用) @@ -74,7 +82,7 @@ class CryptoAgent: ) monitor.update_config("crypto_agent", { "symbols": self.symbols, - "auto_trading_enabled": self.paper_trading_enabled, # 改名为自动交易 + "auto_trading_enabled": True, # 模拟交易始终启用 "analysis_interval": "每5分钟整点" }) @@ -123,8 +131,10 @@ class CryptoAgent: 止损: ${result.get('stop_loss', 0):,.2f} 止盈: ${result.get('take_profit', 0):,.2f}""" - await self.feishu.send_text(message) - await self.telegram.send_message(message) + if self.settings.feishu_enabled: + await self.feishu.send_text(message) + if self.settings.telegram_enabled: + await self.telegram.send_message(message) logger.info(f"已发送挂单成交通知: {result.get('order_id')}") async def _notify_pending_cancelled(self, result: Dict[str, Any]): @@ -139,8 +149,10 @@ class CryptoAgent: 挂单价: ${result.get('entry_price', 0):,.2f} 原因: 收到反向{new_side_text}信号,自动撤销""" - await self.feishu.send_text(message) - await self.telegram.send_message(message) + if self.settings.feishu_enabled: + await self.feishu.send_text(message) + if self.settings.telegram_enabled: + await self.telegram.send_message(message) logger.info(f"已发送挂单撤销通知: {result.get('order_id')}") async def _notify_breakeven_triggered(self, result: Dict[str, Any]): @@ -157,8 +169,10 @@ class CryptoAgent: 💰 锁定利润,让利润奔跑""" - await self.feishu.send_text(message) - await self.telegram.send_message(message) + if self.settings.feishu_enabled: + await self.feishu.send_text(message) + if self.settings.telegram_enabled: + await self.telegram.send_message(message) logger.info(f"已发送移动止损通知: {result.get('order_id')}") async def _notify_order_closed(self, result: Dict[str, Any]): @@ -191,8 +205,10 @@ class CryptoAgent: {win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f}) 持仓时间: {result.get('hold_duration', 'N/A')}""" - await self.feishu.send_text(message) - await self.telegram.send_message(message) + if self.settings.feishu_enabled: + await self.feishu.send_text(message) + if self.settings.telegram_enabled: + await self.telegram.send_message(message) logger.info(f"已发送订单平仓通知: {result.get('order_id')}") def _get_seconds_until_next_5min(self) -> int: @@ -225,8 +241,7 @@ class CryptoAgent: logger.info(f" 监控交易对: {', '.join(self.symbols)}") logger.info(f" 运行模式: 每5分钟整点执行") logger.info(f" 分析引擎: LLM 自主分析") - if self.paper_trading_enabled: - logger.info(f" 模拟交易: 已启用") + logger.info(f" 模拟交易: 已启用") logger.info("=" * 60 + "\n") # 更新状态为运行中 @@ -234,8 +249,7 @@ class CryptoAgent: # 注意:不再启动独立的价格监控 # 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查 - if self.paper_trading_enabled: - logger.info(f"模拟交易已启用(由后台统一监控)") + logger.info(f"模拟交易已启用(由后台统一监控)") # 发送启动通知 await self.feishu.send_text( @@ -372,7 +386,12 @@ class CryptoAgent: async def analyze_symbol(self, symbol: str): """ - 分析单个交易对(LLM 驱动) + 分析单个交易对(新架构:市场分析 + 交易决策分离) + + 新架构流程: + 1. 市场信号分析器分析市场(不包含仓位信息) + 2. 交易决策器根据信号+仓位+账户状态做决策 + 3. 执行交易决策 Args: symbol: 交易对,如 'BTCUSDT' @@ -404,155 +423,772 @@ class CryptoAgent: logger.info(f"⏸️ {volatility_reason},跳过本次 LLM 分析") return - # 获取当前持仓信息(供 LLM 仓位决策) - # 优先使用实盘持仓信息(如果实盘交易启用),否则使用模拟盘持仓 - if self.real_trading and self.real_trading.get_auto_trading_status(): - position_info = self.real_trading.get_position_info_for_llm() - else: - position_info = self.paper_trading.get_position_info() + # ============================================================ + # 第一阶段:市场信号分析(不包含仓位信息) + # ============================================================ + logger.info(f"\n🤖 【第一阶段:市场信号分析】") - # 2. LLM 分析(包含新闻舆情和持仓信息) - logger.info(f"\n🤖 【LLM 分析中...】") - result = await self.llm_analyzer.analyze( + market_signal = await self.market_analyzer.analyze( symbol, data, - symbols=self.symbols, - position_info=position_info + symbols=self.symbols ) - # 输出分析摘要 - summary = result.get('analysis_summary', '无') - logger.info(f" 市场状态: {summary}") + # 输出市场分析结果 + self._log_market_signal(market_signal) - # 输出新闻情绪 - news_sentiment = result.get('news_sentiment', '') - news_impact = result.get('news_impact', '') - if news_sentiment: - sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}.get(news_sentiment, '') - logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}") - if news_impact: - logger.info(f" 消息影响: {news_impact}") + # 过滤掉 wait 信号,只保留 buy/sell 信号 + signals = market_signal.get('signals', []) + trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']] - # 输出关键价位 - levels = result.get('key_levels', {}) - if levels.get('support') or levels.get('resistance'): - support_str = ', '.join([f"${s:,.2f}" for s in levels.get('support', [])[:2]]) - resistance_str = ', '.join([f"${r:,.2f}" for r in levels.get('resistance', [])[:2]]) - logger.info(f" 支撑位: {support_str or '-'}") - logger.info(f" 阻力位: {resistance_str or '-'}") - - # 2.5. 回顾并调整现有持仓(LLM 主动管理) - if self.paper_trading_enabled and self.paper_trading: - await self._review_and_adjust_positions(symbol, data) - - # 3. 处理信号 - signals = result.get('signals', []) - - if not signals: - logger.info(f"\n⏸️ 结论: 无交易信号,继续观望") + if not trade_signals: + logger.info(f"\n⏸️ 结论: 无交易信号(仅有观望建议),继续观望") return - # 输出所有信号 - logger.info(f"\n🎯 【发现 {len(signals)} 个信号】") + # 检查是否有达到阈值的交易信号 + threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比 + valid_signals = [s for s in trade_signals if s.get('confidence', 0) >= threshold] - for sig in signals: - signal_type = sig.get('type', 'unknown') - type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'} - type_text = type_map.get(signal_type, signal_type) + if not valid_signals: + logger.info(f"\n⏸️ 结论: 无交易信号达到置信度阈值 ({threshold}%),继续观望") + return - action = sig.get('action', 'wait') - action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'} - action_text = action_map.get(action, action) + logger.info(f"\n✅ 发现 {len(valid_signals)} 个有效交易信号(达到 {threshold}% 阈值)") - grade = sig.get('grade', 'D') - confidence = sig.get('confidence', 0) - grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(grade, '') + # ============================================================ + # 发送市场信号通知(独立于交易决策) + # ============================================================ + await self._send_market_signal_notification(market_signal, current_price) - logger.info(f"\n [{type_text}] {action_text}") - logger.info(f" 等级: {grade} {grade_icon} | 置信度: {confidence}%") - logger.info(f" 入场: ${sig.get('entry_price', 0):,.2f} | " - f"止损: ${sig.get('stop_loss', 0):,.2f} | " - f"止盈: ${sig.get('take_profit', 0):,.2f}") - logger.info(f" 原因: {sig.get('reason', '无')[:100]}") + # ============================================================ + # 第二阶段:交易决策(信号 + 仓位 + 账户状态) + # 模拟交易和实盘交易分别进行独立决策 + # ============================================================ + logger.info(f"\n🤖 【第二阶段:交易决策】") - if sig.get('risk_warning'): - logger.info(f" 风险: {sig.get('risk_warning')}") + # 获取配置 + paper_trading_enabled = self.settings.paper_trading_enabled + real_trading_enabled = self.settings.real_trading_enabled - # 4. 选择最佳信号发送通知 - best_signal = self.llm_analyzer.get_best_signal(result) + # 分别存储模拟和实盘的决策 + paper_decision = None + real_decision = None - if best_signal and self._should_send_signal(symbol, best_signal): - logger.info(f"\n📤 【发送通知】") - - # 构建通知消息 - telegram_message = self.llm_analyzer.format_signal_message(best_signal, symbol) - feishu_card = self.llm_analyzer.format_feishu_card(best_signal, symbol) - - # 发送通知 - 飞书使用卡片格式,Telegram 使用文本格式 - await self.feishu.send_card( - feishu_card['title'], - feishu_card['content'], - feishu_card['color'] + # 模拟交易决策 + if paper_trading_enabled: + logger.info(f"\n📊 【模拟交易决策】") + positions, account = self._get_trading_state(use_real_trading=False) + paper_decision = await self.decision_maker.make_decision( + market_signal, positions, account, current_price ) - await self.telegram.send_message(telegram_message) - - logger.info(f" ✅ 已发送信号通知") - - # 保存信号到数据库 - signal_to_save = best_signal.copy() - signal_to_save['signal_type'] = 'crypto' - signal_to_save['symbol'] = symbol - signal_to_save['current_price'] = current_price - self.signal_db.add_signal(signal_to_save) - - # 更新状态 - self.last_signals[symbol] = best_signal - self.signal_cooldown[symbol] = datetime.now() - - # 5. 创建模拟订单(始终执行) - if self.paper_trading: - grade = best_signal.get('grade', 'D') - position_size = best_signal.get('position_size', 'light') - if grade != 'D': - # 转换信号格式以兼容 paper_trading - paper_signal = self._convert_to_paper_signal(symbol, best_signal, current_price) - result = self.paper_trading.create_order_from_signal(paper_signal, current_price) - - # 发送被取消挂单的通知 - cancelled_orders = result.get('cancelled_orders', []) - for cancelled in cancelled_orders: - await self._notify_pending_cancelled(cancelled) - - # 记录新订单 - order = result.get('order') - if order: - logger.info(f" 📝 已创建模拟订单: {order.order_id} | 仓位: {position_size}") - - # 6. 创建实盘订单(如果启用了自动交易) - if self.real_trading and self.real_trading.get_auto_trading_status(): - grade = best_signal.get('grade', 'D') - position_size = best_signal.get('position_size', 'light') - if grade != 'D': - # 转换信号格式以兼容 real_trading - real_signal = self._convert_to_real_signal(symbol, best_signal, current_price) - result = self.real_trading.create_order_from_signal(real_signal, current_price) - - if result.get('success'): - logger.info(f" 💰 已创建实盘订单: {result.get('order_id')} | 仓位: {position_size} | 数量: ${result.get('quantity', 0):.2f}") - - # 发送实盘订单成交通知 - await self._notify_real_order_created(symbol, best_signal, result) - elif not result.get('skipped'): - # 只有非跳过的情况才记录错误 - logger.warning(f" ⚠️ 实盘订单创建失败: {result.get('message')}") + self._log_trading_decision(paper_decision) else: - if best_signal: - logger.info(f"\n⏸️ 信号冷却中或置信度不足,不发送通知") + logger.info(f"⏸️ 模拟交易未启用") + + # 实盘交易决策 + if real_trading_enabled: + logger.info(f"\n💰 【实盘交易决策】") + # 检查是否开启自动交易 + if self.real_trading and self.real_trading.get_auto_trading_status(): + positions, account = self._get_trading_state(use_real_trading=True) + real_decision = await self.decision_maker.make_decision( + market_signal, positions, account, current_price + ) + self._log_trading_decision(real_decision) + else: + logger.info(f"⏸️ 实盘自动交易未开启") + else: + logger.info(f"⏸️ 实盘交易未启用") + + # ============================================================ + # 第三阶段:执行交易决策 + # ============================================================ + await self._execute_decisions(paper_decision, real_decision, market_signal, current_price) except Exception as e: logger.error(f"❌ 分析 {symbol} 出错: {e}") import traceback logger.error(traceback.format_exc()) + def _log_market_signal(self, signal: Dict[str, Any]): + """输出市场信号分析结果""" + logger.info(f" 市场状态: {signal.get('market_state')}") + logger.info(f" 趋势: {signal.get('trend')}") + + # 新闻情绪 + news_sentiment = signal.get('news_sentiment', '') + if news_sentiment: + sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': '➖'}.get(news_sentiment, '') + logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}") + + # 关键价位 + import re + key_levels = signal.get('key_levels', {}) + if key_levels.get('support') or key_levels.get('resistance'): + # 从字符串中提取数字(处理 "66065 (15m布林下轨)" 这种格式) + def extract_number(val): + if isinstance(val, (int, float)): + return float(val) + if isinstance(val, str): + # 提取第一个数字 + match = re.search(r'[\d,]+\.?\d*', val.replace(',', '')) + if match: + return float(match.group()) + return None + + supports = [extract_number(s) for s in key_levels.get('support', [])[:2]] + resistances = [extract_number(r) for r in key_levels.get('resistance', [])[:2]] + support_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None]) + resistance_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None]) + logger.info(f" 支撑位: {support_str or '-'}") + logger.info(f" 阻力位: {resistance_str or '-'}") + + # 信号列表 - 区分交易信号和观望建议 + signals = signal.get('signals', []) + trade_signals = [s for s in signals if s.get('action') in ['buy', 'sell']] + wait_signals = [s for s in signals if s.get('action') == 'wait'] + + if trade_signals: + logger.info(f"\n🎯 【发现 {len(trade_signals)} 个交易信号】") + for i, sig in enumerate(trade_signals, 1): + signal_type = sig.get('timeframe', 'unknown') + type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'} + type_text = type_map.get(signal_type, signal_type) + + action = sig.get('action', 'hold') + action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'} + action_text = action_map.get(action, action) + + confidence = sig.get('confidence', 0) + + logger.info(f"\n [{i}] {type_text} | {action_text}") + logger.info(f" 信心度: {confidence}%") + logger.info(f" 入场: ${sig.get('entry_zone', 'N/A')}") + logger.info(f" 止损: ${sig.get('stop_loss', 'N/A')}") + logger.info(f" 止盈: ${sig.get('take_profit', 'N/A')}") + logger.info(f" 理由: {sig.get('reasoning', 'N/A')}") + + if wait_signals: + logger.info(f"\n📋 【{len(wait_signals)} 个观望建议(不触发交易)】") + for i, sig in enumerate(wait_signals, 1): + signal_type = sig.get('timeframe', 'unknown') + type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'} + type_text = type_map.get(signal_type, signal_type) + + confidence = sig.get('confidence', 0) + + logger.info(f"\n [{i}] {type_text} | 观望") + logger.info(f" 信心度: {confidence}%") + logger.info(f" 理由: {sig.get('reasoning', 'N/A')}") + + def _log_trading_decision(self, decision: Dict[str, Any]): + """输出交易决策结果""" + decision_type = decision.get('decision', 'HOLD') + decision_map = { + 'OPEN': '🟢 开仓', + 'CLOSE': '🔴 平仓', + 'ADD': '➕ 加仓', + 'REDUCE': '➖ 减仓', + 'HOLD': '⏸️ 观望' + } + + logger.info(f" 决策: {decision_map.get(decision_type, decision_type)}") + logger.info(f" 动作: {decision.get('action', 'N/A')}") + logger.info(f" 仓位: {decision.get('position_size', 'N/A')}") + + # quantity 是保证金,显示持仓价值 = 保证金 × 20 + quantity = decision.get('quantity', 0) + if isinstance(quantity, (int, float)) and quantity > 0: + logger.info(f" 持仓价值: ${quantity * 20:,.2f} (保证金 ${quantity:.2f})") + else: + logger.info(f" 数量: ${decision.get('quantity', 'N/A')}") + + if decision.get('stop_loss'): + logger.info(f" 止损: ${decision.get('stop_loss')}") + if decision.get('take_profit'): + logger.info(f" 止盈: ${decision.get('take_profit')}") + + logger.info(f" 理由: {decision.get('reasoning', 'N/A')}") + + risk = decision.get('risk_analysis', '') + if risk: + logger.info(f" 风险: {risk}") + + def _get_trading_state(self, use_real_trading: bool = False) -> tuple: + """ + 获取交易状态(持仓和账户) + + Args: + use_real_trading: True 获取实盘状态,False 获取模拟交易状态 + """ + if use_real_trading and self.real_trading: + # 实盘交易 + active_orders = self.real_trading.get_active_orders() + account = self.real_trading.get_account_status() + else: + # 模拟交易 + active_orders = self.paper_trading.get_active_orders() + account = self.paper_trading.get_account_status() + + # 转换为标准格式(只返回已成交的订单作为持仓) + position_list = [] + for order in active_orders: + # 只返回已成交(OPEN 状态且有 filled_price)的订单 + if order.get('status') == 'open' and order.get('filled_price'): + position_list.append({ + 'symbol': order.get('symbol'), + 'side': order.get('side'), + 'holding': order.get('quantity', 0), # 使用 quantity 作为持仓量 + 'entry_price': order.get('filled_price') or order.get('entry_price'), + 'stop_loss': order.get('stop_loss'), + 'take_profit': order.get('take_profit') + }) + + return position_list, account + + async def _execute_decisions(self, paper_decision: Dict[str, Any], + real_decision: Dict[str, Any], + market_signal: Dict[str, Any], current_price: float): + """执行交易决策(模拟和实盘分别执行)""" + # 选择最佳信号用于保存 + best_signal = self._get_best_signal_from_market(market_signal) + + # 保存信号到数据库(只保存一次) + if best_signal: + signal_to_save = best_signal.copy() + signal_to_save['signal_type'] = 'crypto' + signal_to_save['symbol'] = market_signal.get('symbol') + signal_to_save['current_price'] = current_price + self.signal_db.add_signal(signal_to_save) + + # 获取配置 + paper_trading_enabled = self.settings.paper_trading_enabled + real_trading_enabled = self.settings.real_trading_enabled + + # 记录执行结果 + paper_executed = False + real_executed = False + + # ============================================================ + # 执行模拟交易决策 + # ============================================================ + if paper_trading_enabled and paper_decision: + decision_type = paper_decision.get('decision', 'HOLD') + + if decision_type == 'HOLD': + logger.info(f"\n📊 模拟交易: {paper_decision.get('reasoning', '观望')}") + else: + logger.info(f"\n📊 【执行模拟交易】") + + if decision_type in ['OPEN', 'ADD']: + # 先执行交易 + result = await self._execute_paper_trade(paper_decision, market_signal, current_price) + # 检查是否成功执行 + order = result.get('order') if result else None + if order: + # 只有成功创建订单后才发送通知 + await self._send_signal_notification(market_signal, paper_decision, current_price, is_paper=True) + paper_executed = True + else: + logger.warning(f" ⚠️ 模拟交易未执行,跳过通知") + elif decision_type == 'CLOSE': + await self._execute_close(paper_decision, paper_trading=True) + paper_executed = True + + # ============================================================ + # 执行实盘交易决策 + # ============================================================ + if real_trading_enabled and real_decision: + # 检查是否开启自动交易 + if self.real_trading and self.real_trading.get_auto_trading_status(): + decision_type = real_decision.get('decision', 'HOLD') + + if decision_type == 'HOLD': + logger.info(f"\n💰 实盘交易: {real_decision.get('reasoning', '观望')}") + else: + logger.info(f"\n💰 【执行实盘交易】") + + if decision_type in ['OPEN', 'ADD']: + # 先执行交易 + result = await self._execute_real_trade(real_decision, market_signal, current_price) + # 检查是否成功执行 + if result and result.get('success'): + # 只有成功创建订单后才发送通知 + await self._send_signal_notification(market_signal, real_decision, current_price, is_paper=False) + real_executed = True + else: + logger.warning(f" ⚠️ 实盘交易未执行,跳过通知") + elif decision_type == 'CLOSE': + await self._execute_close(real_decision, paper_trading=False) + real_executed = True + + # 如果都没有执行,给出提示 + if not paper_executed and not real_executed: + logger.info(f"\n⏸️ 所有交易均为观望,无需执行") + + def _get_best_signal_from_market(self, market_signal: Dict[str, Any]) -> Dict[str, Any]: + """从市场信号中获取最佳信号""" + signals = market_signal.get('signals', []) + if not signals: + return {} + + # 按信心度排序,取最高的 + sorted_signals = sorted(signals, key=lambda x: x.get('confidence', 0), reverse=True) + return sorted_signals[0] + + async def _send_market_signal_notification(self, market_signal: Dict[str, Any], + current_price: float): + """发送市场信号通知(第一阶段)- 调用前已确保有有效信号""" + try: + # 获取配置的阈值 + threshold = self.settings.crypto_llm_threshold * 100 # 转换为百分比 + + # 过滤达到阈值的信号(防御性检查) + signals = market_signal.get('signals', []) + valid_signals = [s for s in signals if s.get('confidence', 0) >= threshold] + + if not valid_signals: + return + + # 取最佳信号(按信心度排序) + best_signal = sorted(valid_signals, key=lambda x: x.get('confidence', 0), reverse=True)[0] + + # 构建通知消息 - 完全匹配旧格式 + symbol = market_signal.get('symbol') + market_state = market_signal.get('market_state') + trend = market_signal.get('trend') + + # 注意:经过 market_signal_analyzer 解析后 + # - 'action' 是 buy/sell/wait (交易方向) + # - 'timeframe' 是 short_term/medium_term/long_term (周期) + sig_action = best_signal.get('action', 'hold') # buy/sell/wait + timeframe = best_signal.get('timeframe', 'unknown') # short_term/medium_term/long_term + confidence = best_signal.get('confidence', 0) + entry_val = best_signal.get('entry_zone', 'N/A') + sl_val = best_signal.get('stop_loss', 'N/A') + tp_val = best_signal.get('take_profit', 'N/A') + reasoning = best_signal.get('reasoning', '无') + + # 格式化价格用于显示(处理 float 和 N/A) + def format_price(price_value): + if price_value is None or price_value == 'N/A': + return 'N/A' + if isinstance(price_value, float): + return f"{price_value:,.2f}" + return str(price_value) + + entry = format_price(entry_val) + sl = format_price(sl_val) + tp = format_price(tp_val) + + # 类型映射 + type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'} + timeframe_text = type_map.get(timeframe, timeframe) + + action_map = {'buy': '🟢 做多', 'sell': '🔴 做空', 'hold': '➖ 观望'} + action_text = action_map.get(sig_action, sig_action) + + # 入场类型 - 从信号中获取 + entry_type = best_signal.get('entry_type', 'market') + entry_type_text = '现价入场' if entry_type == 'market' else '挂单等待' + entry_type_icon = '⚡' if entry_type == 'market' else '⏳' + + # 等级(基于信心度映射) + if confidence >= 85: + grade = 'A' + grade_icon = '⭐⭐⭐' + elif confidence >= 75: + grade = 'B' + grade_icon = '⭐⭐' + elif confidence >= 60: + grade = 'C' + grade_icon = '⭐' + else: + grade = 'D' + grade_icon = '' + + # 仓位(基于信心度和杠杆空间) + if confidence >= 80: + position_size = 'heavy' + position_icon = '🔥' + position_text = '重仓' + elif confidence >= 70: + position_size = 'medium' + position_icon = '📊' + position_text = '中仓' + else: + position_size = 'light' + position_icon = '🌱' + position_text = '轻仓' + + # 计算止损止盈百分比(价格已经是 float) + try: + # 使用当前价格作为入场价(如果 entry_zone 是 N/A) + entry_for_calc = entry_val if isinstance(entry_val, (int, float)) else current_price + + if isinstance(sl_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0: + if sig_action == 'buy': + sl_percent = ((sl_val - entry_for_calc) / entry_for_calc * 100) + else: + sl_percent = ((entry_for_calc - sl_val) / entry_for_calc * 100) + sl_display = f"{sl_percent:+.1f}%" + else: + sl_display = "N/A" + + if isinstance(tp_val, (int, float)) and isinstance(entry_for_calc, (int, float)) and entry_for_calc > 0: + if sig_action == 'buy': + tp_percent = ((tp_val - entry_for_calc) / entry_for_calc * 100) + else: + tp_percent = ((entry_for_calc - tp_val) / entry_for_calc * 100) + tp_display = f"{tp_percent:+.1f}%" + else: + tp_display = "N/A" + except: + sl_display = "N/A" + tp_display = "N/A" + + # 构建卡片标题和颜色 + if sig_action == 'buy': + title = f"🟢 {symbol} {timeframe_text}做多信号" + color = "green" + else: + title = f"🔴 {symbol} {timeframe_text}做空信号" + color = "red" + + # 构建卡片内容 + content_parts = [ + f"**{timeframe_text}** | **{grade}**{grade_icon} | **{confidence}%** 置信度", + f"{entry_type_icon} **入场**: {entry_type_text} | {position_icon} **仓位**: {position_text}", + f"", + ] + + # 入场价格显示 + if entry_type == 'limit': + # 限价订单:显示建议挂单价格和当前价格 + content_parts.append(f"📋 **挂单价格**: ${entry}") + content_parts.append(f"📍 **当前价格**: ${format_price(current_price)}") + else: + # 市价订单:显示当前价格作为入场价 + content_parts.append(f"💰 **入场价**: ${entry}") + + if sl != 'N/A': + content_parts.append(f"🛑 **止损价**: ${sl} ({sl_display})") + if tp != 'N/A': + content_parts.append(f"🎯 **止盈价**: ${tp} ({tp_display})") + + content_parts.append(f"") + content_parts.append(f"📝 **分析理由**:") + content_parts.append(f"{reasoning}") + + content = "\n".join(content_parts) + + # 根据配置发送通知 + if self.settings.feishu_enabled: + await self.feishu.send_card(title, content, color) + if self.settings.telegram_enabled: + # Telegram 使用文本格式 + message = f"{title}\n\n{content}" + await self.telegram.send_message(message) + logger.info(f" 📤 已发送市场信号通知 (阈值: {threshold}%)") + + except Exception as e: + logger.warning(f"发送市场信号通知失败: {e}") + import traceback + logger.debug(traceback.format_exc()) + + async def _send_signal_notification(self, market_signal: Dict[str, Any], + decision: Dict[str, Any], current_price: float, + is_paper: bool = True): + """发送交易执行通知(第三阶段)""" + try: + decision_type = decision.get('decision', 'HOLD') + + # 只在非观望决策时发送执行通知 + if decision_type == 'HOLD': + return + + # 构建消息 - 使用旧格式风格 + symbol = market_signal.get('symbol') + action = decision.get('action', '') + reasoning = decision.get('reasoning', '') + risk_analysis = decision.get('risk_analysis', '') + position_size = decision.get('position_size', 'N/A') + quantity = decision.get('quantity', 'N/A') + stop_loss = decision.get('stop_loss', '') + take_profit = decision.get('take_profit', '') + confidence = decision.get('confidence', 0) + + # 决策类型映射 + decision_map = { + 'OPEN': '开仓', + 'CLOSE': '平仓', + 'ADD': '加仓', + 'REDUCE': '减仓' + } + decision_text = decision_map.get(decision_type, decision_type) + + # 账户类型标识 + account_type = "📊 模拟" if is_paper else "💰 实盘" + + # 方向图标 + if 'long' in action.lower() or 'buy' in action.lower(): + action_icon = '🟢' + action_text = '做多' + elif 'short' in action.lower() or 'sell' in action.lower(): + action_icon = '🔴' + action_text = '做空' + else: + action_icon = '➖' + action_text = action + + # 仓位图标 + position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓'} + position_display = position_map.get(position_size, '🌱 轻仓') + + # 构建卡片标题和颜色 + if decision_type == 'OPEN': + title = f"{account_type} {symbol} {decision_text}" + color = "green" + elif decision_type == 'CLOSE': + title = f"{account_type} {symbol} {decision_text}" + color = "orange" + elif decision_type == 'ADD': + title = f"{account_type} {symbol} {decision_text}" + color = "green" + elif decision_type == 'REDUCE': + title = f"{account_type} {symbol} {decision_text}" + color = "orange" + else: + title = f"{account_type} {symbol} 交易执行" + color = "blue" + + # 构建卡片内容 + # quantity 是保证金金额,需要显示持仓价值 = 保证金 × 杠杆 + margin = quantity if quantity != 'N/A' else 0 + leverage = 20 # 模拟交易固定 20x 杠杆 + position_value = margin * leverage if isinstance(margin, (int, float)) else 'N/A' + position_value_display = f"${position_value:,.2f}" if isinstance(position_value, (int, float)) else "N/A" + + content_parts = [ + f"{action_icon} **操作**: {decision_text} ({action_text})", + f"{position_display.replace(' ', ': **')} | 📈 信心度: **{confidence}%**", + f"", + f"💰 **持仓价值**: {position_value_display}", + f"💵 **入场价**: ${current_price:,.2f}", + ] + + if stop_loss: + content_parts.append(f"🛑 **止损价**: ${stop_loss}") + if take_profit: + content_parts.append(f"🎯 **止盈价**: ${take_profit}") + + content_parts.append(f"") + content_parts.append(f"📝 **决策理由**:") + content_parts.append(f"{reasoning}") + + if risk_analysis: + content_parts.append(f"") + content_parts.append(f"⚠️ **风险分析**:") + content_parts.append(f"{risk_analysis}") + + content_parts.append(f"") + content_parts.append(f"💼 模拟交易已执行") + + content = "\n".join(content_parts) + + # 根据配置发送通知 + if self.settings.feishu_enabled: + await self.feishu.send_card(title, content, color) + if self.settings.telegram_enabled: + # Telegram 使用文本格式 + message = f"{title}\n\n{content}" + await self.telegram.send_message(message) + logger.info(f" 📤 已发送交易执行通知: {decision_text}") + + except Exception as e: + logger.warning(f"发送交易执行通知失败: {e}") + + async def _execute_paper_trade(self, decision: Dict[str, Any], market_signal: Dict[str, Any], current_price: float): + """执行模拟交易""" + try: + symbol = decision.get('symbol') + action = decision.get('action', '') + position_size = decision.get('position_size', 'light') + + # 直接使用 LLM 决策的 quantity + quantity = decision.get('quantity', 0) + if quantity <= 0: + logger.warning(f" ⚠️ LLM 决策的 quantity 无效: {quantity},使用默认值") + quantity = self._calculate_quantity_by_position_size(position_size, real_trading=False) + + logger.info(f" 准备创建模拟订单: {symbol} {action} {position_size}") + logger.info(f" LLM 决策金额: ${quantity:.2f} USDT") + + # 转换决策的 action 为 paper_trading 期望的格式 + trading_action = self._convert_trading_action(action) + + # 从市场信号中获取入场方式和入场价格 + best_signal = self._get_best_signal_from_market(market_signal) + entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market' + entry_price = best_signal.get('entry_zone', current_price) if best_signal else current_price + + logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}") + + # 转换决策为订单格式(价格字段已在 LLM 解析时转换为 float) + order_data = { + 'symbol': symbol, + 'action': trading_action, # 使用转换后的 action + 'entry_type': entry_type, # 使用信号中的入场方式 + 'entry_price': entry_price if entry_type == 'limit' else current_price, # limit单使用entry_zone,market单使用current_price + 'stop_loss': decision.get('stop_loss'), + 'take_profit': decision.get('take_profit'), + 'confidence': decision.get('confidence', 50), + 'signal_grade': 'B', # 默认B级 + 'position_size': position_size, + 'quantity': quantity # 使用 LLM 决策的金额 + } + + logger.debug(f" 订单数据: {order_data}") + + result = self.paper_trading.create_order_from_signal(order_data, current_price) + + logger.debug(f" 创建订单结果: {result}") + + # 记录订单 + order = result.get('order') + if order: + # quantity 是保证金金额,持仓价值 = 保证金 × 20 + position_value = quantity * 20 + logger.info(f" 📝 已创建模拟订单: {order.order_id} | 仓位: {position_size} | 持仓价值: ${position_value:.2f}") + else: + logger.warning(f" ⚠️ 创建模拟订单失败: {result}") + + # 返回结果 + return result + + except Exception as e: + logger.error(f"执行模拟交易失败: {e}") + import traceback + logger.debug(traceback.format_exc()) + + def _convert_trading_action(self, action: str) -> str: + """转换交易决策的 action 为 buy/sell 格式""" + if not action: + return 'buy' + + action_lower = action.lower() + + # 做多相关的 action + if 'long' in action_lower or 'buy' in action_lower: + return 'buy' + # 做空相关的 action + elif 'short' in action_lower or 'sell' in action_lower: + return 'sell' + + # 默认返回 buy + return 'buy' + + def _calculate_quantity_by_position_size(self, position_size: str, real_trading: bool = False) -> float: + """根据仓位大小计算实际金额""" + if real_trading: + # 实盘交易配置 + position_config = { + 'heavy': self.settings.real_trading_max_single_position, + 'medium': self.settings.real_trading_max_single_position * 0.6, + 'light': self.settings.real_trading_max_single_position * 0.3 + } + else: + # 模拟交易配置 + position_config = { + 'heavy': self.settings.paper_trading_position_a, # 1000 + 'medium': self.settings.paper_trading_position_b, # 500 + 'light': self.settings.paper_trading_position_c # 200 + } + + return position_config.get(position_size, 200) + + async def _execute_real_trade(self, decision: Dict[str, Any], market_signal: Dict[str, Any], current_price: float): + """执行实盘交易""" + try: + symbol = decision.get('symbol') + action = decision.get('action', '') + position_size = decision.get('position_size', 'light') + + # 直接使用 LLM 决策的 quantity + quantity = decision.get('quantity', 0) + if quantity <= 0: + logger.warning(f" ⚠️ LLM 决策的 quantity 无效: {quantity},使用默认值") + quantity = self._calculate_quantity_by_position_size(position_size, real_trading=True) + + logger.info(f" 实盘交易: {position_size} → 保证金 ${quantity:.2f} USDT → 持仓价值 ${quantity * 20:.2f}") + + # 转换决策的 action 为 paper_trading 期望的格式 + trading_action = self._convert_trading_action(action) + + # 从市场信号中获取入场方式和入场价格 + best_signal = self._get_best_signal_from_market(market_signal) + entry_type = best_signal.get('entry_type', 'market') if best_signal else 'market' + entry_price = best_signal.get('entry_zone', current_price) if best_signal else current_price + + logger.info(f" 入场方式: {entry_type} | 入场价格: ${entry_price:,.2f}") + + # 转换决策为订单格式(价格字段已在 LLM 解析时转换为 float) + order_data = { + 'symbol': symbol, + 'action': trading_action, # 使用转换后的 action + 'entry_type': entry_type, # 使用信号中的入场方式 + 'entry_price': entry_price if entry_type == 'limit' else current_price, # limit单使用entry_zone,market单使用current_price + 'stop_loss': decision.get('stop_loss'), + 'take_profit': decision.get('take_profit'), + 'confidence': decision.get('confidence', 50), + 'signal_grade': 'B', + 'position_size': position_size, + 'quantity': quantity # 使用 LLM 决策的金额 + } + + result = self.real_trading.create_order_from_signal(order_data, current_price) + + if result.get('success'): + logger.info(f" 💰 已创建实盘订单: {result.get('order_id')} | 持仓价值: ${quantity * 20:.2f}") + await self._notify_real_order_created(symbol, decision, result) + else: + logger.warning(f" ⚠️ 创建实盘订单失败: {result.get('message', 'Unknown error')}") + + # 返回结果 + return result + + except Exception as e: + logger.error(f"执行实盘交易失败: {e}") + + async def _execute_close(self, decision: Dict[str, Any], paper_trading: bool = True): + """执行平仓 + + Args: + decision: 交易决策 + paper_trading: True=模拟交易, False=实盘交易 + """ + try: + symbol = decision.get('symbol') + + if paper_trading: + # 模拟平仓 + if self.paper_trading: + # TODO: 实现模拟平仓逻辑 + logger.info(f" 🔒 模拟平仓: {symbol}") + logger.info(f" 理由: {decision.get('reasoning', '')}") + else: + logger.warning(f" 模拟交易服务未初始化") + else: + # 实盘平仓 + if self.real_trading and self.real_trading.get_auto_trading_status(): + # TODO: 实现实盘平仓逻辑 + logger.info(f" 🔒 实盘平仓: {symbol}") + logger.info(f" 理由: {decision.get('reasoning', '')}") + else: + logger.warning(f" 实盘交易服务未启用或自动交易未开启") + + except Exception as e: + logger.error(f"执行平仓失败: {e}") + def _convert_to_paper_signal(self, symbol: str, signal: Dict[str, Any], current_price: float) -> Dict[str, Any]: """转换 LLM 信号格式为模拟交易格式""" @@ -629,9 +1265,9 @@ class CryptoAgent: data: Dict[str, pd.DataFrame] ): """ - 回顾并调整现有持仓(LLM 主动管理) + 回顾并调整现有持仓(基于市场分析 + 当前持仓状态) - 每次分析后自动回顾该交易对的所有持仓,让 LLM 决定是否需要: + 每次分析后自动回顾该交易对的所有持仓,让决策器决定是否需要: - 调整止损止盈 - 部分平仓 - 全部平仓 @@ -649,82 +1285,14 @@ class CryptoAgent: if not positions: return # 没有持仓需要回顾 - logger.info(f"\n🔄 【LLM 回顾持仓中...】共 {len(positions)} 个持仓") + logger.info(f"\n🔄 【持仓回顾中...】共 {len(positions)} 个持仓") - # 准备持仓数据 - position_data = [] - for order in positions: - entry_price = order.get('filled_price') or order.get('entry_price', 0) - current_price = self.binance.get_ticker(symbol).get('lastPrice', entry_price) - if isinstance(current_price, str): - current_price = float(current_price) + # TODO: 实现持仓回顾功能 + # 1. 获取当前市场信号 + # 2. 将持仓信息传递给决策器 + # 3. 根据决策调整持仓 - # 计算盈亏百分比 - side = order.get('side') - if side == 'long': - pnl_percent = ((current_price - entry_price) / entry_price) * 100 - else: - pnl_percent = ((entry_price - current_price) / entry_price) * 100 - - position_data.append({ - 'order_id': order.get('order_id'), - 'side': side, - 'entry_price': entry_price, - 'current_price': current_price, - 'stop_loss': order.get('stop_loss', 0), - 'take_profit': order.get('take_profit', 0), - 'quantity': order.get('quantity', 0), - 'pnl_percent': pnl_percent, - 'open_time': order.get('open_time') - }) - - # 调用 LLM 回顾分析 - decisions = await self.llm_analyzer.review_positions(symbol, position_data, data) - - if not decisions: - logger.info(" LLM 建议保持所有持仓不变") - return - - # 执行 LLM 的调整建议 - logger.info(f"\n📝 【LLM 调整建议】共 {len(decisions)} 个") - - for decision in decisions: - order_id = decision.get('order_id') - action = decision.get('action') - reason = decision.get('reason', '') - - action_map = { - 'HOLD': '保持', - 'ADJUST_SL_TP': '调整止损止盈', - 'PARTIAL_CLOSE': '部分平仓', - 'FULL_CLOSE': '全部平仓' - } - action_text = action_map.get(action, action) - - logger.info(f" 订单 {order_id[:8]}: {action_text} - {reason}") - - # 执行调整 - result = self.paper_trading.adjust_position_by_llm( - order_id=order_id, - action=action, - new_sl=decision.get('new_sl'), - new_tp=decision.get('new_tp'), - close_percent=decision.get('close_percent') - ) - - if result.get('success'): - # 发送通知 - await self._notify_position_adjustment(symbol, order_id, decision, result) - - # 如果是平仓操作,从活跃订单中移除 - if action in ['PARTIAL_CLOSE', 'FULL_CLOSE']: - closed_result = result.get('pnl', {}) - if closed_result: - pnl = closed_result.get('pnl', 0) - pnl_percent = closed_result.get('pnl_percent', 0) - logger.info(f" ✅ 已平仓: PnL ${pnl:+.2f} ({pnl_percent:+.1f}%)") - else: - logger.warning(f" ❌ 执行失败: {result.get('error')}") + logger.info(" 持仓回顾功能待实现") except Exception as e: logger.error(f"持仓回顾失败: {e}", exc_info=True) @@ -768,7 +1336,8 @@ class CryptoAgent: side_text = "做多" if side == 'buy' else "做空" grade = signal.get('grade', 'N/A') position_size = result.get('position_size', 'light') - quantity = result.get('quantity', 0) + quantity = result.get('quantity', 0) # 这是保证金金额 + position_value = quantity * 20 # 持仓价值 = 保证金 × 杠杆 message = f"""💰 实盘订单已创建 @@ -776,13 +1345,15 @@ class CryptoAgent: 方向: {side_text} 等级: {grade} 仓位: {position_size} -数量: ${quantity:.2f} +持仓价值: ${position_value:,.2f} 订单 ID: {result.get('order_id', '')[:12]}... ⚠️ 真实资金交易中""" - await self.feishu.send_text(message) - await self.telegram.send_message(message) + if self.settings.feishu_enabled: + await self.feishu.send_text(message) + if self.settings.telegram_enabled: + await self.telegram.send_message(message) logger.info(f"已发送实盘订单创建通知: {result.get('order_id')}") async def _notify_position_adjustment( @@ -826,8 +1397,10 @@ class CryptoAgent: message += f"\n平仓比例: {close_percent:.0f}%" message += f"\n剩余仓位: ${remaining:,.0f}" - await self.feishu.send_text(message) - await self.telegram.send_message(message) + if self.settings.feishu_enabled: + await self.feishu.send_text(message) + if self.settings.telegram_enabled: + await self.telegram.send_message(message) async def analyze_once(self, symbol: str) -> Dict[str, Any]: """单次分析(用于测试或手动触发)""" @@ -836,15 +1409,21 @@ class CryptoAgent: if not self._validate_data(data): return {'error': '数据不完整'} - # 获取持仓信息 - position_info = self.paper_trading.get_position_info() - - result = await self.llm_analyzer.analyze( + # 使用新架构:市场分析 + 交易决策 + market_signal = await self.market_analyzer.analyze( symbol, data, - symbols=self.symbols, - position_info=position_info + symbols=self.symbols ) - return result + + positions, account = self._get_trading_state() + decision = await self.decision_maker.make_decision( + market_signal, positions, account + ) + + return { + 'market_signal': market_signal, + 'trading_decision': decision + } def get_status(self) -> Dict[str, Any]: """获取智能体状态""" diff --git a/backend/app/crypto_agent/market_signal_analyzer.py b/backend/app/crypto_agent/market_signal_analyzer.py new file mode 100644 index 0000000..ca5232a --- /dev/null +++ b/backend/app/crypto_agent/market_signal_analyzer.py @@ -0,0 +1,546 @@ +""" +市场信号分析器 - 纯市场分析,不包含任何仓位信息 + +职责: +1. 分析K线、量价、技术指标 +2. 分析新闻舆情 +3. 输出纯市场信号(buy/sell/hold + confidence + reasoning) + +不负责: +- 仓位管理 +- 风险控制 +- 具体下单决策 +""" +import json +import re +from typing import Dict, Any, Optional, List +from datetime import datetime +from app.utils.logger import logger +from app.services.llm_service import llm_service +from app.services.news_service import get_news_service + + +class MarketSignalAnalyzer: + """市场信号分析器 - 只关注市场,输出客观信号""" + + # 纯市场分析系统提示词(与旧版 CRYPTO_SYSTEM_PROMPT 保持一致,只移除仓位管理) + MARKET_ANALYSIS_PROMPT = """你是一位专业的加密货币交易员和技术分析师。你的任务是综合分析**K线数据、量价关系、技术指标和新闻舆情**,给出交易信号。 + +## 核心理念 +加密货币市场波动大,每天都有交易机会。你的目标是: +- **主动寻找机会**,而不是被动等待完美信号 +- 短线交易重点关注:超跌反弹、超涨回落、关键位突破 +- 中线交易重点关注:趋势回调、形态突破、多周期共振 + +## 一、量价分析(最重要) +量价关系是判断趋势真假的核心: + +### 1. 健康上涨信号 +- **放量上涨**:价格上涨 + 成交量放大(量比>1.5)= 上涨有效,可追多 +- **缩量回调**:上涨后回调 + 成交量萎缩(量比<0.7)= 回调健康,可低吸 + +### 2. 健康下跌信号 +- **放量下跌**:价格下跌 + 成交量放大 = 下跌有效,可追空 +- **缩量反弹**:下跌后反弹 + 成交量萎缩 = 反弹无力,可做空 + +### 3. 量价背离(重要反转信号) +- **顶背离**:价格创新高,但成交量未创新高 → 上涨动能衰竭,警惕回落 +- **底背离**:价格创新低,但成交量未创新低 → 下跌动能衰竭,关注反弹 +- **天量见顶**:极端放量(量比>3)后价格滞涨 → 主力出货信号 +- **地量见底**:极端缩量(量比<0.3)后价格企稳 → 抛压枯竭信号 + +### 4. 突破确认 +- **有效突破**:突破关键位 + 放量确认(量比>1.5)= 真突破 +- **假突破**:突破关键位 + 缩量 = 假突破,可能回落 + +## 二、K线形态分析 +### 反转形态 +- **锤子线/倒锤子**:下跌趋势中出现,下影线长 = 底部信号 +- **吞没形态**:大阳吞没前一根阴线 = 看涨;大阴吞没前一根阳线 = 看跌 +- **十字星**:在高位/低位出现 = 变盘信号 +- **早晨之星/黄昏之星**:三根K线组合的反转信号 + +### 持续形态 +- **三连阳/三连阴**:趋势延续信号 +- **旗形整理**:趋势中的健康回调 + +## 三、技术指标分析 +### RSI(相对强弱指标) +**RSI 是最重要的超买超卖指标,请注意细节:** +- **RSI < 30**:超卖区,关注反弹机会 + - RSI 从 30 以下回升,交叉上穿 30:买入信号 + - RSI 底背离(价格新低但 RSI 未创新低):强买入信号 +- **RSI > 70**:超买区,关注回落风险 + - RSI 从 70 以上回落,交叉下穿 70:卖出信号 + - RSI 顶背离(价格新高但 RSI 未创新高):强卖出信号 +- **RSI 40-60**:震荡区,观望为主 +- **RSI 趋势**:RSI 自身的趋势变化比单一数值更重要 + +### MACD +- 金叉(DIF 上穿 DEA):做多信号 +- 死叉(DIF 下穿 DEA):做空信号 +- 零轴上方金叉:强势做多 +- 零轴下方死叉:强势做空 +- MACD 柱状图背离:重要反转信号 + +### 布林带 +- 触及下轨 + 企稳:反弹做多 +- 触及上轨 + 受阻:回落做空 +- 布林带收口:即将变盘 +- 布林带开口:趋势启动 + +### 均线系统(重要) +**均线系统是趋势判断的核心,请仔细分析:** +- **多头排列**(MA5 > MA10 > MA20 > MA50):强势上涨趋势,回调做多 +- **空头排列**(MA5 < MA10 < MA20 < MA50):强势下跌趋势,反弹做空 +- **价格与 MA 的关系**: + - 价格站稳 MA5/MA10 上方:短线上涨 + - 价格突破 MA20:中线转多 + - 价格跌破 MA20:中线转空 + - MA50 是中期趋势的分水岭 +- **均线金叉死叉**: + - MA5 上穿 MA10:短线买入信号 + - MA5 下穿 MA10:短线卖出信号 + - MA10 上穿 MA20:中线买入信号 + - MA10 下穿 MA20:中线卖出信号 + +## 四、新闻舆情分析 +结合最新市场新闻判断: +- **重大利好**:监管利好、机构入场、ETF 通过等 → 提高做多置信度 +- **重大利空**:监管打压、交易所暴雷、黑客攻击等 → 提高做空置信度 +- **市场情绪**:恐慌指数、社交媒体热度 +- **大户动向**:鲸鱼转账、交易所流入流出 + +## 五、多周期共振(关键分析框架) +**多周期共振是提高信号质量的核心方法:** + +### 周期层级关系 +- **4h(趋势层)**:决定中期大方向 +- **1h(主周期)**:主要交易周期 +- **15m(入场层)**:寻找入场时机 +- **5m(精确入场)**:确认最佳入场点 + +### 共振判断标准 +**强共振(A级信号)**: +- 所有周期趋势同向(如 4h多 + 1h多 + 15m多) +- 多周期 RSI 同时超买/超卖后出现背离 +- 多周期 MA 同时金叉/死叉 + +**中等共振(B级信号)**: +- 大周期(4h+1h)同向 +- 主周期(1h)技术指标明确 + +**弱共振(C级信号)**: +- 只有单一周期信号 +- 多周期方向不一致 + +### 实战策略 +- **顺势交易**:4h 和 1h 同向时,在 15m/5m 寻找入场点 +- **逆势谨慎**:只有 1h 信号但 4h 反向时,降低置信度 +- **突破交易**:多周期同时突破关键位,信号最强 + +## 六、入场方式 +根据市场分析综合判断入场方式: +- **market**:现价立即入场 + - 信号强烈且明确(A级或高置信度B级) + - 放量突破关键位,趋势明确 + - 多周期共振,等待可能错过机会 + - 市场波动大,等待可能价格变化太快 +- **limit**:挂单等待入场 + - 信号强度中等(B级或C级) + - 当前价格距离理想入场位有一定距离 + - 判断市场可能回调到更好位置 + - 希望获得更优成交价格,愿意承担可能无法成交的风险 + +**重要**: +- 必须同时输出 `entry_zone`(建议入场价)和 `entry_type`(入场方式) +- 入场方式由你的市场分析判断,不是简单的价格距离计算 +- 如果选择 `limit`,`entry_zone` 应该是你建议的挂单价格 + +## 输出格式 +请严格按照以下 JSON 格式输出: + +```json +{ + "analysis_summary": "简要描述当前市场状态(50字以内)", + "volume_analysis": "量价分析结论(30字以内)", + "news_sentiment": "positive/negative/neutral", + "news_impact": "新闻对市场的影响分析(30字以内)", + "signals": [ + { + "type": "short_term/medium_term/long_term", + "action": "buy/sell", + "entry_type": "market/limit", + "confidence": 0-100, + "grade": "A/B/C/D", + "entry_zone": 66000, + "stop_loss": 65500, + "take_profit": 67500, + "reasoning": "详细的入场理由(必须包含量价分析)", + "key_factors": ["关键因素1", "关键因素2"] + } + ], + "key_levels": { + "support": [65000, 64500], + "resistance": [67000, 67500] + } +} +``` + +## 重要说明 +- **所有价格必须是纯数字**,不要加 $ 符号、逗号或其他格式 +- `entry_zone`、`stop_loss`、`take_profit` 必须是数字类型,不要是字符串 +- `key_levels` 中的支撑位和阻力位也必须是数字数组 + +## 信号等级与置信度 +- **A级**(80-100):量价配合 + 多指标共振 + 多周期确认 +- **B级**(60-79):量价配合 + 主要指标确认 +- **C级**(40-59):有机会但量价不够理想 +- **D级**(<40):量价背离或信号矛盾 + +## 注意事项 +1. **只在有明确的做多或做空机会时才输出信号**(action 为 buy 或 sell) +2. 如果市场不明朗,没有明确交易机会,**不要输出任何信号**(signals 为空数组 []) +3. 信号强度(confidence)要合理,不要随意给高分 +4. 60-70分:一般信号,可轻仓试探 +5. 75-85分:较强信号,可正常仓位 +6. 90+分:强信号,但也要控制风险 +7. 不要输出 action 为 "wait" 的信号,如果没有交易机会就不输出 + +记住:你只负责分析市场,输出客观的交易信号,不需要考虑仓位管理和风险控制! +""" + + def __init__(self): + self.news_service = get_news_service() + + async def analyze(self, symbol: str, data: Dict[str, Any], + symbols: List[str] = None) -> Dict[str, Any]: + """ + 分析市场并生成信号 + + Args: + symbol: 交易对 + data: 多周期K线数据 + symbols: 所有监控的交易对(用于市场对比) + + Returns: + 市场信号字典 + """ + try: + # 1. 准备市场数据 + market_context = self._prepare_market_context(symbol, data, symbols) + + # 2. 获取新闻舆情 + news_context = await self._get_news_context(symbol) + + # 3. 构建 LLM 提示词 + prompt = self._build_analysis_prompt(symbol, market_context, news_context) + + # 4. 调用 LLM 分析 + messages = [ + {"role": "system", "content": self.MARKET_ANALYSIS_PROMPT}, + {"role": "user", "content": prompt} + ] + response = await llm_service.achat(messages) + + # 5. 解析结果 + result = self._parse_llm_response(response, symbol) + + return result + + except Exception as e: + logger.error(f"市场信号分析失败: {e}") + import traceback + logger.debug(traceback.format_exc()) + return self._get_empty_signal(symbol) + + def _prepare_market_context(self, symbol: str, data: Dict, + symbols: List[str] = None) -> str: + """准备市场上下文信息""" + context_parts = [] + + # 当前价格和24h变化 + current_price = float(data['5m'].iloc[-1]['close']) + price_change_24h = self._calculate_price_change_24h(data['1h']) + context_parts.append(f"当前价格: ${current_price:,.2f} ({price_change_24h})") + + # 多周期数据 + for tf_name, df in data.items(): + if df is None or len(df) == 0: + continue + + latest = df.iloc[-1] + context_parts.append(f"\n## {tf_name} 数据") + context_parts.append(f"开: {latest['open']}, 高: {latest['high']}, 低: {latest['low']}, 收: {latest['close']}") + context_parts.append(f"成交量: {latest.get('volume', 'N/A')}") + + # 技术指标 + if 'rsi' in df.columns: + rsi = df['rsi'].iloc[-1] + context_parts.append(f"RSI: {rsi:.2f}") + if 'macd' in df.columns: + macd = df['macd'].iloc[-1] + signal = df['macd_signal'].iloc[-1] + context_parts.append(f"MACD: {macd:.4f}, 信号线: {signal:.4f}") + if 'bb_upper' in df.columns: + bb_upper = df['bb_upper'].iloc[-1] + bb_lower = df['bb_lower'].iloc[-1] + context_parts.append(f"布林带: 上轨 {bb_upper:.2f}, 下轨 {bb_lower:.2f}") + + # 均线系统 + context_parts.append(f"\n## 均线系统") + df_1h = data.get('1h') + if df_1h is not None and len(df_1h) > 0: + latest = df_1h.iloc[-1] + context_parts.append(f"MA5: {latest.get('ma5', 'N/A')}") + context_parts.append(f"MA10: {latest.get('ma10', 'N/A')}") + context_parts.append(f"MA20: {latest.get('ma20', 'N/A')}") + context_parts.append(f"MA50: {latest.get('ma50', 'N/A')}") + + # 判断均线排列 + ma5 = latest.get('ma5', 0) + ma10 = latest.get('ma10', 0) + ma20 = latest.get('ma20', 0) + ma50 = latest.get('ma50', 0) + + if all([ma5, ma10, ma20, ma50]): + if ma5 > ma10 > ma20 > ma50: + context_parts.append("均线排列: 多头排列 📈") + elif ma5 < ma10 < ma20 < ma50: + context_parts.append("均线排列: 空头排列 📉") + else: + context_parts.append("均线排列: 交织,方向不明") + + # 量比分析 + df_5m = data.get('5m') + if df_5m is not None and len(df_5m) >= 20: + vol_latest = df_5m['volume'].iloc[-1] + vol_ma20 = df_5m['volume'].iloc[-20:-1].mean() + volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1 + context_parts.append(f"\n## 量价分析") + context_parts.append(f"最新成交量: {vol_latest:.0f}") + context_parts.append(f"20周期均量: {vol_ma20:.0f}") + context_parts.append(f"量比: {volume_ratio:.2f}") + + if volume_ratio > 1.5: + context_parts.append("量价状态: 放量 📊") + elif volume_ratio < 0.7: + context_parts.append("量价状态: 缩量 📉") + else: + context_parts.append("量价状态: 平量 ➖") + + return "\n".join(context_parts) + + async def _get_news_context(self, symbol: str) -> str: + """获取新闻舆情上下文""" + try: + news_result = await self.news_service.get_crypto_news(symbol) + + if not news_result or not news_result.get('articles'): + return "无最新新闻" + + articles = news_result['articles'][:5] # 只取前5条 + context_parts = ["\n## 最新新闻"] + + for article in articles: + title = article.get('title', '') + source = article.get('source', '') + published_at = article.get('publishedAt', '') + time_str = published_at.split('T')[1][:5] if 'T' in published_at else '' + + context_parts.append(f"- [{time_str}] {title} ({source})") + + return "\n".join(context_parts) + + except Exception as e: + logger.warning(f"获取新闻失败: {e}") + return "新闻获取失败" + + def _build_analysis_prompt(self, symbol: str, market_context: str, + news_context: str) -> str: + """构建分析提示词""" + return f"""请分析 {symbol} 的市场情况: + +{market_context} + +{news_context} + +请根据以上数据,给出你的市场判断和交易信号。 +""" + + def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]: + """解析 LLM 响应""" + try: + # 尝试提取 JSON + json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) + if json_match: + json_str = json_match.group(1) + else: + json_match = re.search(r'\{[\s\S]*\}', response) + if json_match: + json_str = json_match.group(0) + else: + raise ValueError("无法找到 JSON 响应") + + # 清理 JSON 字符串(移除可能导致解析错误的注释等) + json_str = self._clean_json_string(json_str) + + logger.debug(f"解析的 JSON 字符串: {json_str[:500]}...") # 打印前500字符用于调试 + + result = json.loads(json_str) + + # 清理价格字段 - 转换为 float + result = self._clean_price_fields(result) + + # 添加元数据 + result['symbol'] = symbol + result['timestamp'] = datetime.now().isoformat() + result['raw_response'] = response + + # 兼容处理:确保 signals 中的字段与旧格式一致 + if 'signals' in result: + for sig in result['signals']: + # LLM 输出的 "type" 是 timeframe (short_term/medium_term/long_term) + # 需要映射为 "timeframe",而 "action" 才是 buy/sell/wait + if 'type' in sig: + # 如果 type 是 short_term/medium_term/long_term,映射为 timeframe + if sig['type'] in ['short_term', 'medium_term', 'long_term']: + sig['timeframe'] = sig.pop('type') + # 如果 type 是 buy/sell/wait,映射为 action + elif sig['type'] in ['buy', 'sell', 'wait']: + sig['action'] = sig.pop('type') + + # 确保 action 字段存在 + if 'action' not in sig and 'timeframe' in sig: + # 从 reasoning 或其他字段推断 action + sig['action'] = 'wait' + + # 确保 grade 字段存在 + if 'grade' not in sig: + # 根据 confidence 推断 grade + confidence = sig.get('confidence', 0) + if confidence >= 80: + sig['grade'] = 'A' + elif confidence >= 60: + sig['grade'] = 'B' + elif confidence >= 40: + sig['grade'] = 'C' + else: + sig['grade'] = 'D' + + # 从信号中推断 market_state 和 trend + if 'signals' in result and result['signals']: + # 找出置信度最高的信号 + best_signal = max(result['signals'], key=lambda s: s.get('confidence', 0)) + action = best_signal.get('action', 'wait') + confidence = best_signal.get('confidence', 0) + + # 推断市场状态 + if confidence >= 70: + if action == 'buy': + result['market_state'] = '强势上涨' + elif action == 'sell': + result['market_state'] = '强势下跌' + else: + result['market_state'] = '震荡整理' + else: + result['market_state'] = '震荡整理' + + # 推断趋势 + if action == 'buy': + result['trend'] = 'up' + elif action == 'sell': + result['trend'] = 'down' + else: + result['trend'] = 'sideways' + else: + result['market_state'] = '无明确信号' + result['trend'] = 'sideways' + + logger.info(f"✅ 市场信号分析完成: {symbol}") + logger.debug(f"市场信号: {json.dumps(result, ensure_ascii=False, indent=2)}") + + return result + + except Exception as e: + logger.warning(f"解析 LLM 响应失败: {e}") + logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符 + return self._get_empty_signal(symbol) + + def _clean_json_string(self, json_str: str) -> str: + """清理 JSON 字符串,移除可能导致解析错误的内容""" + # 移除单行注释 // ... + json_str = re.sub(r'//.*?(?=\n|$)', '', json_str) + # 移除多行注释 /* ... */ + json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str) + # 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}) + json_str = re.sub(r',\s*([}\]])', r'\1', json_str) + return json_str + + def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: + """清理价格字段,转换为 float""" + def clean_price(price_value): + if price_value is None: + return None + if isinstance(price_value, (int, float)): + return float(price_value) + if isinstance(price_value, str): + # 移除 $ 符号和逗号 + cleaned = price_value.replace('$', '').replace(',', '').strip() + if cleaned: + try: + return float(cleaned) + except ValueError: + return None + return None + + # 清理 key_levels 中的支撑位和阻力位 + if 'key_levels' in data and data['key_levels']: + key_levels = data['key_levels'] + if 'support' in key_levels: + data['key_levels']['support'] = [clean_price(s) for s in key_levels['support']] + if 'resistance' in key_levels: + data['key_levels']['resistance'] = [clean_price(r) for r in key_levels['resistance']] + + # 清理 signals 中的价格字段 + if 'signals' in data: + for sig in data['signals']: + price_fields = ['entry_zone', 'stop_loss', 'take_profit'] + for field in price_fields: + if field in sig: + sig[field] = clean_price(sig[field]) + + return data + + def _calculate_price_change_24h(self, df) -> str: + """计算24小时涨跌幅""" + try: + if df is None or len(df) < 24: + return "N/A" + + current_price = float(df['close'].iloc[-1]) + price_24h_ago = float(df['close'].iloc[-24]) + change = ((current_price - price_24h_ago) / price_24h_ago) * 100 + + sign = "+" if change >= 0 else "" + return f"{sign}{change:.2f}%" + + except Exception as e: + logger.debug(f"计算24h涨跌失败: {e}") + return "N/A" + + def _get_empty_signal(self, symbol: str) -> Dict[str, Any]: + """返回空信号""" + return { + 'symbol': symbol, + 'analysis_summary': 'unknown', + 'volume_analysis': '分析失败', + 'news_sentiment': 'neutral', + 'news_impact': '无', + 'market_state': '分析失败', + 'trend': 'sideways', + 'signals': [], + 'key_levels': {}, + 'timestamp': datetime.now().isoformat(), + 'error': '信号分析失败' + } diff --git a/backend/app/crypto_agent/trading_decision_maker.py b/backend/app/crypto_agent/trading_decision_maker.py new file mode 100644 index 0000000..c64f714 --- /dev/null +++ b/backend/app/crypto_agent/trading_decision_maker.py @@ -0,0 +1,461 @@ +""" +交易决策器 - 基于市场信号和当前状态做出交易决策 + +职责: +1. 接收市场信号(不含仓位信息) +2. 接收当前持仓状态 +3. 接收账户状态 +4. 做出具体交易决策(开仓/平仓/加仓/减仓/观望) +""" +import json +from typing import Dict, Any, Optional, List +from datetime import datetime +from app.utils.logger import logger +from app.services.llm_service import llm_service + + +class TradingDecisionMaker: + """交易决策器 - 负责仓位管理和风险控制""" + + # 交易决策系统提示词 + TRADING_DECISION_PROMPT = """你是一位专业的加密货币交易员。你的职责是**根据市场信号和当前仓位状态,做出交易决策**。 + +## 你的职责 +- 分析市场信号的质量 +- 结合当前持仓评估风险 +- 考虑账户整体状况 +- 做出具体交易决策 + +## 输入信息 +你将收到: +1. 市场信号(方向、强度、理由) +2. 当前持仓列表 +3. 账户状态(余额、已用保证金、杠杆等) + +## 决策类型 +### 1. 开仓(OPEN) +**时机**:无持仓或可以加仓时 +**要求**: +- **A级信号(confidence >= 80)**:可开 heavy/medium/light 仓位 +- **B级信号(60 <= confidence < 80)**:只能开 medium/light 仓位 +- **C级信号(40 <= confidence < 60)**:只能开 light 仓位 +- **D级信号(confidence < 40)**:不开仓,返回 HOLD +- 账户有足够的可用杠杆空间 +- 风险可控(止损明确) + +### 2. 平仓(CLOSE) +**时机**: +- 触发止损/止盈 +- 信号反转 +- 风险过大 + +### 3. 加仓(ADD) +**时机**: +- 已有盈利持仓 +- 同向新信号 +- 趋势加强 + +### 4. 减仓(REDUCE) +**时机**: +- 部分止盈 +- 降低风险敞口 +- 不确定增加 + +### 5. 观望(HOLD) +**时机**: +- 信号不明确 +- 风险过大 +- 可用杠杆空间不足 +- 等待更好时机 + +## 仓位管理规则 +### 全仓模式(联合保证金) +- **最大杠杆 20 倍**:最大仓位金额 = 账户余额 × 20 +- **当前杠杆**:当前杠杆 = 当前持仓价值 / 账户余额 +- **可用杠杆空间百分比**:(最大仓位金额 - 当前持仓价值) / 最大仓位金额 × 100% + +### 仓位大小选择(综合考虑信号质量和可用空间) +仓位大小由**信号等级**和**可用杠杆空间**共同决定: + +#### 1. 信号等级决定最大仓位上限 +- **A级信号(80-100分)**:可选择 heavy/medium/light +- **B级信号(60-79分)**:只能选择 medium/light +- **C级信号(40-59分)**:只能选择 light +- **D级信号(<40分)**:不开仓,返回 HOLD + +#### 2. 可用杠杆空间决定是否可开仓 +- **可用空间 >= 10%**:可以开 heavy 仓位 +- **可用空间 >= 5%**:可以开 medium 仓位 +- **可用空间 >= 3%**:可以开 light 仓位 +- **可用空间 < 3%**:不新开仓,返回 HOLD + +#### 3. 仓位大小与保证金金额 +- **heavy**:使用保证金 = 账户余额 × 12% +- **medium**:使用保证金 = 账户余额 × 6% +- **light**:使用保证金 = 账户余额 × 3% + +#### 4. 选择逻辑示例 +假设当前可用杠杆空间为 50%: +- A级信号 → 可以选择 heavy(空间足够,信号质量高) +- B级信号 → 只能选择 medium/light(信号质量中等) +- C级信号 → 只能选择 light(信号质量一般,保守仓位) + +假设当前可用杠杆空间为 4%: +- A级信号 → 只能选择 medium/light(空间不足) +- B级信号 → 只能选择 light(空间不足) +- C级信号 → 不开仓(空间不足) + +**重要**:`quantity` 字段输出的是**保证金金额**,不是持仓价值。交易系统会使用杠杆自动计算实际持仓价值。 + +### 计算示例 +- 账户余额:$10,000 +- 最大仓位金额:$10,000 × 20 = $200,000 +- 当前持仓价值:$20,000(当前杠杆 2x) +- 可用仓位金额:$200,000 - $20,000 = $180,000 +- 可用杠杆空间:$180,000 / $200,000 = 90% +- 计算公式:保证金金额 = 账户余额 × 使用比例 + - heavy:保证金 $10,000 × 12% = $1,200 → 持仓价值 $1,200 × 20 = $24,000 + - medium:保证金 $10,000 × 6% = $600 → 持仓价值 $600 × 20 = $12,000 + - light:保证金 $10,000 × 3% = $300 → 持仓价值 $300 × 20 = $6,000 + +### 风险控制 +- 单笔最大亏损不超过账户 2% +- 止损必须明确 +- 避免过度交易 +- 不追涨杀跌 + +## 决策输出格式 +请以 JSON 格式输出: + +```json +{ + "decision": "OPEN/CLOSE/ADD/REDUCE/HOLD", + "symbol": "BTC/USDT", + "side": "buy/sell", + "action": "open_long/close_short/add_long/...", + "position_size": "heavy/medium/light", + "quantity": 1200, + "confidence": 0-100, + "reasoning": "决策理由", + "risk_analysis": "风险分析", + "stop_loss": 65500, + "take_profit": 67500, + "notes": "其他说明" +} +``` + +## 重要说明 +- **所有价格必须是纯数字**,不要加 $ 符号、逗号或其他格式 +- `stop_loss`、`take_profit` 必须是数字类型 +- **quantity 是保证金金额(USDT)**,交易系统会使用杠杆计算实际持仓价值 +- **position_size** 和 **quantity** 必须匹配(heavy 对应最大保证金金额) +- **入场方式由市场信号决定**,你只需要根据市场信号的 `entry_type` 来执行交易 + +## 注意事项 +1. **安全第一**:宁可错过机会,也不要冒过大风险 +2. **遵守杠杆限制**:总杠杆永远不超过 20 倍 +3. **理性决策**:不要被 FOMO 情绪左右 +4. **灵活应变**:根据市场变化调整策略 +5. **记录决策**:清晰记录决策理由,便于复盘 + +记住:你是交易执行者,不是市场分析师。市场分析已经完成了,你只需要根据分析结果和当前状态做出理性的交易决策! +""" + + def __init__(self): + pass + + async def make_decision(self, + market_signal: Dict[str, Any], + positions: List[Dict[str, Any]], + account: Dict[str, Any], + current_price: float = None) -> Dict[str, Any]: + """ + 做出交易决策 + + Args: + market_signal: 市场信号(来自 MarketSignalAnalyzer) + positions: 当前持仓列表 + account: 账户状态 + current_price: 当前价格(用于判断入场方式) + + Returns: + 交易决策字典 + """ + try: + # 1. 准备决策上下文 + decision_context = self._prepare_decision_context( + market_signal, positions, account, current_price + ) + + # 2. 构建提示词 + prompt = self._build_decision_prompt(decision_context) + + # 3. 调用 LLM 做决策 + messages = [ + {"role": "system", "content": self.TRADING_DECISION_PROMPT}, + {"role": "user", "content": prompt} + ] + response = await llm_service.achat(messages) + + # 4. 解析结果 + result = self._parse_decision_response(response, market_signal['symbol']) + + # 5. 验证决策安全性 + result = self._validate_decision(result, positions, account) + + return result + + except Exception as e: + logger.error(f"交易决策失败: {e}") + import traceback + logger.debug(traceback.format_exc()) + return self._get_hold_decision(market_signal['symbol'], "决策系统异常") + + def _prepare_decision_context(self, + market_signal: Dict[str, Any], + positions: List[Dict[str, Any]], + account: Dict[str, Any], + current_price: float = None) -> Dict[str, Any]: + """准备决策上下文""" + context = { + 'symbol': market_signal.get('symbol'), + 'market_state': market_signal.get('market_state'), + 'trend': market_signal.get('trend'), + 'signals': market_signal.get('signals', []), + 'key_levels': market_signal.get('key_levels', {}), + 'positions': positions, + 'account': account, + 'current_price': current_price + } + + # 计算账户状态 + balance = float(account.get('current_balance', 0)) + total_position_value = float(account.get('total_position_value', 0)) + used_margin = float(account.get('used_margin', 0)) + + # 当前杠杆(全仓模式) + max_leverage = 20 + max_position_value = balance * max_leverage # 最大仓位金额 + current_leverage = (total_position_value / balance) if balance > 0 else 0 + available_position_value = max(0, max_position_value - total_position_value) # 剩余可用仓位金额 + available_leverage_percent = (available_position_value / max_position_value * 100) if max_position_value > 0 else 0 # 可用杠杆空间百分比 + + context['leverage_info'] = { + 'balance': balance, + 'current_leverage': current_leverage, + 'total_position_value': total_position_value, + 'max_position_value': max_position_value, + 'available_position_value': available_position_value, + 'available_leverage_percent': available_leverage_percent, + 'max_leverage': max_leverage + } + + return context + + def _build_decision_prompt(self, context: Dict[str, Any]) -> str: + """构建决策提示词""" + prompt_parts = [] + + # 市场信号 + prompt_parts.append(f"## 市场信号") + prompt_parts.append(f"交易对: {context['symbol']}") + prompt_parts.append(f"市场状态: {context.get('market_state')}") + prompt_parts.append(f"趋势: {context.get('trend')}") + + # 当前价格(如果有) + current_price = context.get('current_price') + if current_price: + prompt_parts.append(f"当前价格: ${current_price:,.2f}") + + # 信号列表 + signals = context.get('signals', []) + if signals: + prompt_parts.append(f"\n## 信号列表") + for i, sig in enumerate(signals, 1): + # timeframe 是 short_term/medium_term/long_term + timeframe = sig.get('timeframe', 'N/A') + action = sig.get('action', 'N/A') + prompt_parts.append(f"{i}. {timeframe} | {action}") + prompt_parts.append(f" 信心度: {sig.get('confidence', 0)}") + + # 添加入场价格信息 + entry_zone = sig.get('entry_zone') + if entry_zone: + prompt_parts.append(f" 建议入场价: ${entry_zone:,.2f}") + + prompt_parts.append(f" 理由: {sig.get('reasoning', 'N/A')}") + + # 关键价位 + key_levels = context.get('key_levels', {}) + if key_levels: + prompt_parts.append(f"\n## 关键价位") + if key_levels.get('support'): + # 提取数字并格式化 + import re + def extract_num(val): + if isinstance(val, (int, float)): + return float(val) + if isinstance(val, str): + match = re.search(r'[\d,]+\.?\d*', val.replace(',', '')) + if match: + return float(match.group()) + return None + + supports = [extract_num(s) for s in key_levels['support'][:3]] + supports_str = ', '.join([f"${s:,.2f}" for s in supports if s is not None]) + prompt_parts.append(f"支撑位: {supports_str}") + if key_levels.get('resistance'): + import re + def extract_num(val): + if isinstance(val, (int, float)): + return float(val) + if isinstance(val, str): + match = re.search(r'[\d,]+\.?\d*', val.replace(',', '')) + if match: + return float(match.group()) + return None + + resistances = [extract_num(r) for r in key_levels['resistance'][:3]] + resistances_str = ', '.join([f"${r:,.2f}" for r in resistances if r is not None]) + prompt_parts.append(f"阻力位: {resistances_str}") + + # 当前持仓 + positions = context.get('positions', []) + prompt_parts.append(f"\n## 当前持仓") + if positions: + for pos in positions: + if pos.get('holding', 0) > 0: + prompt_parts.append(f"- {pos.get('symbol')}: {pos.get('side')} {pos.get('holding')} USDT") + prompt_parts.append(f" 开仓价: ${pos.get('entry_price')}") + prompt_parts.append(f" 止损: ${pos.get('stop_loss')}") + prompt_parts.append(f" 止盈: ${pos.get('take_profit')}") + else: + prompt_parts.append("无持仓") + + # 账户状态 + account = context.get('account', {}) + lev_info = context.get('leverage_info', {}) + prompt_parts.append(f"\n## 账户状态") + prompt_parts.append(f"余额: ${account.get('current_balance', 0):.2f}") + prompt_parts.append(f"可用: ${account.get('available', 0):.2f}") + prompt_parts.append(f"已用保证金: ${account.get('used_margin', 0):.2f}") + prompt_parts.append(f"持仓价值: ${account.get('total_position_value', 0):.2f}") + prompt_parts.append(f"\n## 杠杆信息") + prompt_parts.append(f"当前杠杆: {lev_info.get('current_leverage', 0):.1f}x") + prompt_parts.append(f"最大仓位金额: ${lev_info.get('max_position_value', 0):,.2f}") + prompt_parts.append(f"可用仓位金额: ${lev_info.get('available_position_value', 0):,.2f}") + prompt_parts.append(f"可用杠杆空间: {lev_info.get('available_leverage_percent', 0):.1f}%") + prompt_parts.append(f"最大杠杆限制: {lev_info.get('max_leverage', 20)}x") + + prompt_parts.append(f"\n请根据以上信息,做出交易决策。") + + return "\n".join(prompt_parts) + + def _parse_decision_response(self, response: str, symbol: str) -> Dict[str, Any]: + """解析决策响应""" + try: + import re + + # 尝试提取 JSON + json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response) + if json_match: + json_str = json_match.group(1) + else: + json_match = re.search(r'\{[\s\S]*\}', response) + if json_match: + json_str = json_match.group(0) + else: + raise ValueError("无法找到 JSON 响应") + + # 清理 JSON 字符串 + json_str = self._clean_json_string(json_str) + + result = json.loads(json_str) + + # 清理价格字段 - 转换为 float + result = self._clean_price_fields(result) + + # 添加元数据 + result['symbol'] = symbol + result['timestamp'] = datetime.now().isoformat() + result['raw_response'] = response + + logger.info(f"✅ 交易决策完成: {symbol} | {result.get('decision', 'HOLD')}") + + return result + + except Exception as e: + logger.warning(f"解析决策响应失败: {e}") + logger.warning(f"原始响应: {response[:1000]}...") # 打印前1000字符 + return self._get_hold_decision(symbol, "解析失败,默认观望") + + def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]: + """清理价格字段,转换为 float""" + def clean_price(price_value): + if price_value is None: + return None + if isinstance(price_value, (int, float)): + return float(price_value) + if isinstance(price_value, str): + # 移除 $ 符号和逗号 + cleaned = price_value.replace('$', '').replace(',', '').strip() + if cleaned: + try: + return float(cleaned) + except ValueError: + return None + return None + + # 清理顶层价格字段 + price_fields = ['stop_loss', 'take_profit', 'quantity'] + for field in price_fields: + if field in data: + data[field] = clean_price(data[field]) + + return data + + def _clean_json_string(self, json_str: str) -> str: + """清理 JSON 字符串,移除可能导致解析错误的内容""" + import re + # 移除单行注释 // ... + json_str = re.sub(r'//.*?(?=\n|$)', '', json_str) + # 移除多行注释 /* ... */ + json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str) + # 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}) + json_str = re.sub(r',\s*([}\]])', r'\1', json_str) + return json_str + + def _validate_decision(self, decision: Dict[str, Any], + positions: List[Dict[str, Any]], + account: Dict[str, Any]) -> Dict[str, Any]: + """验证决策安全性""" + # 检查杠杆限制 + if decision.get('decision') in ['OPEN', 'ADD']: + balance = float(account.get('current_balance', 0)) + total_position_value = float(account.get('total_position_value', 0)) + max_leverage = 20 + max_position_value = balance * max_leverage + + # quantity 是保证金金额,需要乘以杠杆得到持仓价值 + margin = float(decision.get('quantity', 0)) + position_value = margin * max_leverage # 使用最大杠杆计算持仓价值 + new_total_value = total_position_value + position_value + + if new_total_value > max_position_value: + logger.warning(f"⚠️ 决策被拒绝: 超过最大仓位金额 (保证金 ${margin:.2f} → 持仓价值 ${position_value:.2f}, 总计 ${new_total_value:,.2f} > ${max_position_value:,.2f})") + return self._get_hold_decision( + decision['symbol'], + f"超过最大仓位金额 (保证金 ${margin:.2f} → 持仓价值 ${position_value:.2f}, 总计 ${new_total_value:,.2f} > ${max_position_value:,.2f})" + ) + + return decision + + def _get_hold_decision(self, symbol: str, reason: str = "") -> Dict[str, Any]: + """返回观望决策""" + return { + 'decision': 'HOLD', + 'symbol': symbol, + 'action': 'hold', + 'reasoning': f'观望: {reason}', + 'timestamp': datetime.now().isoformat() + } diff --git a/backend/app/services/news_service.py b/backend/app/services/news_service.py index 74e17be..72c62d7 100644 --- a/backend/app/services/news_service.py +++ b/backend/app/services/news_service.py @@ -515,6 +515,58 @@ class NewsService: logger.debug(traceback.format_exc()) return [] + async def get_crypto_news(self, symbol: str, limit: int = 10) -> Dict[str, Any]: + """ + 获取加密货币相关新闻 + + Args: + symbol: 加密货币代码(如 BTCUSDT) + limit: 最大结果数 + + Returns: + 包含 articles 列表的字典 + """ + try: + # 获取一般市场新闻(包含加密货币相关) + all_news = await self.get_latest_news(limit=limit * 2) + + # 筛选与该币种相关的新闻 + symbol_keywords = { + 'BTCUSDT': ['BTC', 'Bitcoin', '比特币', 'bitcoin'], + 'ETHUSDT': ['ETH', 'Ethereum', '以太坊', 'ethereum'], + 'SOLUSDT': ['SOL', 'Solana', 'solana'], + 'BNBUSDT': ['BNB', 'Binance', '币安'], + 'ADAUSDT': ['ADA', 'Cardano', 'cardano'], + 'XRPUSDT': ['XRP', 'Ripple'], + 'DOGEUSDT': ['DOGE', 'Dogecoin', '狗狗币'], + 'MATICUSDT': ['MATIC', 'Polygon'], + } + + # 通用加密货币关键词 + crypto_keywords = ['crypto', 'cryptocurrency', '加密货币', 'blockchain', '区块链'] + + keywords = symbol_keywords.get(symbol, []) + crypto_keywords + + filtered_news = [] + for news in all_news: + title = news.get('title', '').lower() + description = news.get('description', '').lower() + + if any(kw.lower() in title or kw.lower() in description for kw in keywords): + filtered_news.append(news) + + if len(filtered_news) >= limit: + break + + return { + 'articles': filtered_news[:limit], + 'total': len(filtered_news) + } + + except Exception as e: + logger.warning(f"获取加密货币新闻失败: {e}") + return {'articles': [], 'total': 0} + def format_news_for_llm(self, news_list: List[Dict[str, Any]], max_items: int = 10) -> str: """ diff --git a/backend/app/services/paper_trading_service.py b/backend/app/services/paper_trading_service.py index 0171aca..d4930b6 100644 --- a/backend/app/services/paper_trading_service.py +++ b/backend/app/services/paper_trading_service.py @@ -186,8 +186,18 @@ class PaperTradingService: return result # === 动态仓位计算 === - position_size = signal.get('position_size', 'light') - margin, position_value = self._calculate_dynamic_position(position_size, symbol) + # 优先使用信号中的 quantity(LLM 决策的保证金金额) + quantity_from_signal = signal.get('quantity') + if quantity_from_signal is not None and quantity_from_signal > 0: + # LLM 决策的 quantity 是保证金金额 + margin = float(quantity_from_signal) + # 计算持仓价值(保证金 × 杠杆) + position_value = margin * self.leverage + logger.debug(f"使用 LLM 决策保证金: ${margin:.2f}, 持仓价值: ${position_value:.2f}") + else: + # 回退到动态仓位计算 + position_size = signal.get('position_size', 'light') + margin, position_value = self._calculate_dynamic_position(position_size, symbol) if margin <= 0: logger.info(f"无可用保证金: {symbol} | 当前杠杆已达上限") @@ -1150,6 +1160,7 @@ class PaperTradingService: 'realized_pnl': round(realized_pnl, 2), 'current_balance': round(current_balance, 2), 'used_margin': round(used_margin, 2), + 'available': round(available_margin, 2), # 添加 available 字段,兼容性 'available_margin': round(available_margin, 2), 'leverage': self.leverage, 'margin_per_order': self.margin_per_order, diff --git a/backend/app/services/real_trading_service.py b/backend/app/services/real_trading_service.py index 358db0c..ba9b8f2 100644 --- a/backend/app/services/real_trading_service.py +++ b/backend/app/services/real_trading_service.py @@ -342,45 +342,64 @@ class RealTradingService: 'message': f'当前杠杆已达 {current_leverage:.1f}x,已超最大限制 {max_total_leverage}x,无法开仓' } - # 根据可用杠杆空间动态调整仓位比例 - custom_ratios = { - 'heavy': 0.12, # heavy: 12% 可用杠杆空间 - 'medium': 0.06, # medium: 6% 可用杠杆空间 - 'light': 0.03 # light: 3% 可用杠杆空间 - } + # 优先使用信号中的 quantity(LLM 决策的保证金金额) + quantity_from_signal = signal.get('quantity') + if quantity_from_signal is not None and quantity_from_signal > 0: + # LLM 决策的 quantity 是保证金金额 + margin = float(quantity_from_signal) + # 计算持仓价值(保证金 × 杠杆) + position_value = margin * self.default_leverage + logger.info(f"使用 LLM 决策保证金: ${margin:.2f}, 持仓价值: ${position_value:.2f}") - # 计算仓位(使用统一的仓位管理器) - margin, position_value = calculate_real_position( - balance=balance, - used_margin=used_margin, - total_position_value=total_position_value, - position_size=position_size, - symbol=symbol, - max_leverage=int(available_leverage), # 使用可用杠杆空间 - custom_ratios=custom_ratios - ) - - if margin <= 0 or position_value <= 0: - return { - 'success': False, - 'message': '无法开仓:仓位计算失败或已达杠杆限制' - } - - # 再次验证:加仓后的总杠杆不超过 20 倍 - new_total_value = total_position_value + position_value - new_leverage = new_total_value / balance if balance > 0 else 0 - if new_leverage > max_total_leverage: - # 调整仓位大小到安全范围内 - safe_position_value = balance * max_total_leverage - total_position_value - if safe_position_value > 0: - position_value = safe_position_value - margin = position_value / self.default_leverage - logger.warning(f"仓位已调整,确保总杠杆不超过 {max_total_leverage}x") - else: + # 验证:加仓后的总杠杆不超过 20 倍 + new_total_value = total_position_value + position_value + new_leverage = new_total_value / balance if balance > 0 else 0 + if new_leverage > max_total_leverage: return { 'success': False, - 'message': f'当前杠杆 {current_leverage:.1f}x,无法再加仓' + 'message': f'LLM 决策会导致总杠杆 {new_leverage:.1f}x 超过限制 {max_total_leverage}x (保证金 ${margin:.2f}, 持仓价值 ${position_value:.2f})' } + else: + # 回退到动态仓位计算 + # 根据可用杠杆空间动态调整仓位比例 + custom_ratios = { + 'heavy': 0.12, # heavy: 12% 可用杠杆空间 + 'medium': 0.06, # medium: 6% 可用杠杆空间 + 'light': 0.03 # light: 3% 可用杠杆空间 + } + + # 计算仓位(使用统一的仓位管理器) + margin, position_value = calculate_real_position( + balance=balance, + used_margin=used_margin, + total_position_value=total_position_value, + position_size=position_size, + symbol=symbol, + max_leverage=int(available_leverage), # 使用可用杠杆空间 + custom_ratios=custom_ratios + ) + + if margin <= 0 or position_value <= 0: + return { + 'success': False, + 'message': '无法开仓:仓位计算失败或已达杠杆限制' + } + + # 再次验证:加仓后的总杠杆不超过 20 倍 + new_total_value = total_position_value + position_value + new_leverage = new_total_value / balance if balance > 0 else 0 + if new_leverage > max_total_leverage: + # 调整仓位大小到安全范围内 + safe_position_value = balance * max_total_leverage - total_position_value + if safe_position_value > 0: + position_value = safe_position_value + margin = position_value / self.default_leverage + logger.warning(f"仓位已调整,确保总杠杆不超过 {max_total_leverage}x") + else: + return { + 'success': False, + 'message': f'当前杠杆 {current_leverage:.1f}x,无法再加仓' + } quantity = position_value # 订单数量(以 USDT 计价) diff --git a/backend/app/services/signal_database_service.py b/backend/app/services/signal_database_service.py index a173018..63133fc 100644 --- a/backend/app/services/signal_database_service.py +++ b/backend/app/services/signal_database_service.py @@ -33,6 +33,23 @@ class SignalDatabaseService: """添加信号到数据库""" db = self.db_service.get_session() try: + # 清理价格字段 - 移除 $ 符号和逗号 + def clean_price(price_value): + """清理价格字段,转换为 float""" + if price_value is None: + return None + if isinstance(price_value, (int, float)): + return float(price_value) + if isinstance(price_value, str): + # 移除 $ 符号和逗号 + cleaned = price_value.replace('$', '').replace(',', '').strip() + if cleaned: + try: + return float(cleaned) + except ValueError: + return None + return None + # 创建信号对象 signal = TradingSignal( signal_type=signal_data.get('signal_type', 'crypto'), @@ -40,10 +57,10 @@ class SignalDatabaseService: action=signal_data.get('action', 'hold'), grade=signal_data.get('grade', 'D'), confidence=signal_data.get('confidence', 0), - entry_price=signal_data.get('entry_price'), - stop_loss=signal_data.get('stop_loss'), - take_profit=signal_data.get('take_profit'), - current_price=signal_data.get('current_price'), + entry_price=clean_price(signal_data.get('entry_price')), + stop_loss=clean_price(signal_data.get('stop_loss')), + take_profit=clean_price(signal_data.get('take_profit')), + current_price=clean_price(signal_data.get('current_price')), signal_type_detail=signal_data.get('type'), entry_type=signal_data.get('entry_type'), position_size=signal_data.get('position_size'),