""" 实盘交易服务 - Bitget 合约交易 提供与模拟交易服务类似的接口,但执行的是真实交易 集成 LLM 仓位管理决策 """ import uuid from datetime import datetime, timedelta from typing import Dict, Any, List, Optional from app.models.real_trading import RealOrder from app.models.paper_trading import OrderStatus, OrderSide, SignalGrade, EntryType from app.services.db_service import db_service from app.services.position_manager import calculate_real_position from app.config import get_settings from app.utils.logger import logger class RealTradingService: """实盘交易服务""" def __init__(self): """初始化实盘交易服务""" self.settings = get_settings() self.active_orders: Dict[str, RealOrder] = {} # 内存缓存活跃订单 # 实盘交易配置 self.max_single_position = self.settings.real_trading_max_single_position self.max_total_ratio = self.settings.real_trading_max_total_ratio self.default_leverage = self.settings.real_trading_default_leverage self.risk_per_trade = self.settings.real_trading_risk_per_trade self.max_orders = self.settings.real_trading_max_orders # 自动交易开关(从数据库加载) self.auto_trading_enabled = self._load_auto_trading_status() # 获取交易 API (使用 CCXT SDK 版本) from app.services.bitget_trading_api_sdk import get_bitget_trading_api self.trading_api = get_bitget_trading_api() if not self.trading_api: logger.error("Bitget 交易 API 未初始化,实盘交易功能不可用") return # 确保表已创建 self._ensure_table_exists() # 加载活跃订单 self._load_active_orders() logger.info(f"实盘交易服务初始化完成(最大单笔: ${self.max_single_position}," f"杠杆: {self.default_leverage}x,最大持仓: {self.max_orders}," f"自动交易: {'启用' if self.auto_trading_enabled else '禁用'})") def _ensure_table_exists(self): """确保数据表已创建""" from app.models.real_trading import RealOrder from app.models.database import Base from sqlalchemy import text Base.metadata.create_all(bind=db_service.engine) # 创建自动交易开关表(使用简单的文本检查而不是 ORM) db = db_service.get_session() try: # 检查表是否存在 result = db.execute(text(""" SELECT name FROM sqlite_master WHERE type='table' AND name='real_trading_settings' """)).fetchone() if not result: # 表不存在,创建表 db.execute(text(""" CREATE TABLE real_trading_settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """)) db.commit() # 初始化自动交易开关 db.execute(text(""" INSERT INTO real_trading_settings (key, value) VALUES ('auto_trading_enabled', '0') """)) db.commit() logger.info("创建实盘交易设置表") except Exception as e: logger.warning(f"创建设置表失败: {e}") db.rollback() finally: db.close() def _load_auto_trading_status(self) -> bool: """从数据库加载自动交易开关状态""" db = db_service.get_session() try: from sqlalchemy import text result = db.execute(text("SELECT value FROM real_trading_settings WHERE key = 'auto_trading_enabled'")).fetchone() if result: return result[0] == '1' return False except Exception as e: logger.warning(f"加载自动交易状态失败: {e}") return False finally: db.close() def set_auto_trading(self, enabled: bool) -> bool: """设置自动交易开关""" db = db_service.get_session() try: from sqlalchemy import text db.execute(text(""" UPDATE real_trading_settings SET value = :value, updated_at = CURRENT_TIMESTAMP WHERE key = 'auto_trading_enabled' """), {'value': '1' if enabled else '0'}) db.commit() self.auto_trading_enabled = enabled logger.info(f"实盘自动交易已{'启用' if enabled else '禁用'}") return True except Exception as e: logger.error(f"设置自动交易失败: {e}") db.rollback() return False finally: db.close() def get_auto_trading_status(self) -> bool: """获取自动交易状态""" return self.auto_trading_enabled def _load_active_orders(self): """从数据库加载活跃订单""" db = db_service.get_session() try: orders = db.query(RealOrder).filter( RealOrder.status.in_([OrderStatus.PENDING, OrderStatus.OPEN]) ).all() for order in orders: self.active_orders[order.order_id] = order logger.info(f"实盘交易: 加载了 {len(orders)} 个活跃订单") finally: db.close() def create_order_from_signal(self, signal: Dict[str, Any], current_price: float = None) -> Dict[str, Any]: """ 从信号创建实盘订单(集成 LLM 仓位管理) Args: signal: LLM 分析信号 - symbol: 交易对 - side: 'long' or 'short' - entry_type: 'market' or 'limit' - entry_price: 入场价 - stop_loss: 止损价 - take_profit: 止盈价 - grade: 信号等级 - confidence: 置信度 - position_size: LLM 建议的仓位大小 ('heavy', 'medium', 'light') current_price: 当前价格 Returns: 创建结果 """ # 检查自动交易开关 if not self.auto_trading_enabled: logger.info(f"实盘自动交易已禁用,跳过信号执行") return { 'success': False, 'message': '实盘自动交易已禁用', 'skipped': True } if not self.trading_api: return { 'success': False, 'message': '交易 API 未初始化' } db = db_service.get_session() result = { 'success': False, 'message': '', 'order_id': None } try: # 检查实盘交易是否启用 if not self.settings.real_trading_enabled: return { 'success': False, 'message': '实盘交易未启用,请检查配置 REAL_TRADING_ENABLED=true' } # 获取信号信息 symbol = signal.get('symbol') side = signal.get('side') # 'long' or 'short' entry_type = signal.get('entry_type', 'market') # 'market' or 'limit' entry_price = signal.get('entry_price') stop_loss = signal.get('stop_loss') take_profit = signal.get('take_profit') grade = signal.get('grade', 'D') confidence = signal.get('confidence', 0) position_size = signal.get('position_size', 'light') # LLM 建议的仓位大小 # 验证必需参数 if not all([symbol, side, stop_loss, take_profit]): return { 'success': False, 'message': '信号缺少必需参数' } # 获取当前价格 if not current_price: from app.services.bitget_service import bitget_service current_price = bitget_service.get_current_price(symbol) if not current_price: return { 'success': False, 'message': '无法获取当前价格' } # 设置入场价 if entry_type == 'market': entry_price = current_price elif not entry_price: entry_price = current_price # 风险检查 - 检查订单数量 if len(self.active_orders) >= self.max_orders: return { 'success': False, 'message': f'已达最大持仓数 {self.max_orders}' } # 获取账户状态 account = self.get_account_status() balance = account['current_balance'] available = account['available'] used_margin = account['used_margin'] total_position_value = account['total_position_value'] if available < 10: return { 'success': False, 'message': f'可用余额不足 (${available:.2f})' } # === 使用 LLM 建议的仓位大小计算仓位 === # 实盘使用更保守的比例配置 custom_ratios = { 'heavy': 0.15, # 激进方案:heavy 15%(模拟盘是 30%) 'medium': 0.08, # 激进方案:medium 8%(模拟盘是 15%) 'light': 0.03 # 激进方案:light 3%(模拟盘是 5%) } # 计算仓位(使用统一的仓位管理器) margin, position_value = calculate_real_position( balance=balance, used_margin=used_margin, total_position_value=total_position_value, position_size=position_size, symbol=symbol, max_leverage=self.default_leverage, custom_ratios=custom_ratios ) if margin <= 0 or position_value <= 0: return { 'success': False, 'message': '无法开仓:仓位计算失败或已达杠杆限制' } quantity = position_value # 订单数量(以 USDT 计价) # 最小仓位限制(100 美金测试,最小 5 USDT) min_quantity = 5 if quantity < min_quantity: return { 'success': False, 'message': f'计算仓位 ${quantity:.2f} 小于最小值 ${min_quantity}' } # 创建订单对象 order_id = str(uuid.uuid4()) order = RealOrder( order_id=order_id, symbol=symbol, side=OrderSide.LONG if side == 'long' else OrderSide.SHORT, entry_price=entry_price, stop_loss=stop_loss, take_profit=take_profit, quantity=quantity, leverage=self.default_leverage, signal_grade=SignalGrade[grade.upper()] if grade.upper() in ['A', 'B', 'C', 'D'] else SignalGrade.D, signal_type=signal.get('signal_type', 'swing'), confidence=confidence, trend=signal.get('trend'), entry_type=EntryType.MARKET if entry_type == 'market' else EntryType.LIMIT, status=OrderStatus.OPEN if entry_type == 'market' else OrderStatus.PENDING ) db.add(order) db.commit() db.refresh(order) # 调用 Bitget API 下单 exchange_result = self._place_bitget_order(order, current_price) if exchange_result['success']: # 更新订单状态 order.exchange_order_id = exchange_result.get('order_id') order.client_order_id = exchange_result.get('client_order_id') order.filled_price = exchange_result.get('filled_price', entry_price) order.filled_at = datetime.now() order.status = OrderStatus.OPEN db.commit() # 添加到内存缓存 self.active_orders[order_id] = order result['success'] = True result['message'] = f'实盘订单创建成功 (仓位: {position_size})' result['order_id'] = order_id result['exchange_order_id'] = exchange_result.get('order_id') result['position_size'] = position_size result['quantity'] = quantity logger.info(f"✅ 实盘订单创建成功: {symbol} {side} ${entry_price} | " f"仓位: {position_size} | 数量: ${quantity:.2f} | " f"-> {exchange_result.get('order_id')}") else: # 下单失败,删除记录 db.delete(order) db.commit() result['message'] = f"下单失败: {exchange_result.get('message', '未知错误')}" logger.error(f"❌ 实盘订单下单失败: {exchange_result.get('message')}") except Exception as e: db.rollback() result['message'] = f"创建订单失败: {str(e)}" logger.error(f"创建实盘订单失败: {e}") finally: db.close() return result def _place_bitget_order(self, order: RealOrder, current_price: float) -> Dict: """ 调用 Bitget API 下单 (使用 CCXT SDK) Returns: {'success': bool, 'order_id': str, 'client_order_id': str, 'filled_price': float, 'message': str} """ try: # CCXT 使用标准的 buy/sell 方向 # 对于合约:buy = 开多/平空,sell = 开空/平多 # 这里我们简化为:long = buy, short = sell side_map = { OrderSide.LONG: 'buy', OrderSide.SHORT: 'sell' } # 映射订单类型 order_type = 'market' if order.entry_type == EntryType.MARKET else 'limit' # 计算合约数量(张数) # Bitget U本位合约:1张 = 1 USDT(大多数情况) size = order.quantity # 简化处理,实际应该根据合约规格计算 # 生成自定义订单ID client_order_id = f"real_{order.order_id[:8]}" # 调用 API 下单 result = self.trading_api.place_order( symbol=order.symbol, side=side_map[order.side], order_type=order_type, size=size, price=order.entry_price if order_type == 'limit' else None, client_order_id=client_order_id ) if result: # CCXT 返回的订单对象格式 # result['id'] 是交易所订单ID # result['price'] 是委托价格 # result['average'] 是成交均价(如果已成交) order_id = result.get('id') filled_price = float(result.get('average', 0)) or current_price return { 'success': True, 'order_id': order_id, 'client_order_id': client_order_id, 'filled_price': filled_price } else: return { 'success': False, 'message': 'API 调用失败' } except Exception as e: logger.error(f"Bitget 下单失败: {e}") return { 'success': False, 'message': str(e) } def get_active_orders(self) -> List[Dict]: """获取活跃订单列表""" return [order.to_dict() for order in self.active_orders.values()] def get_order(self, order_id: str) -> Optional[Dict]: """获取指定订单""" order = self.active_orders.get(order_id) if order: return order.to_dict() return None def sync_positions_from_exchange(self) -> List[Dict]: """ 从交易所同步持仓状态 Returns: 同步后的持仓列表 """ if not self.trading_api: return [] try: # 获取交易所实际持仓 positions = self.trading_api.get_position() logger.info(f"从交易所同步了 {len(positions)} 个持仓") return positions except Exception as e: logger.error(f"同步持仓失败: {e}") return [] def get_account_status(self) -> Dict: """获取账户状态""" if not self.trading_api: return { 'current_balance': 0, 'used_margin': 0, 'total_position_value': 0, 'available': 0 } try: balance_info = self.trading_api.get_balance() usdt_info = balance_info.get('USDT', {}) available = float(usdt_info.get('available', 0)) frozen = float(usdt_info.get('frozen', 0)) locked = float(usdt_info.get('locked', 0)) # 计算持仓价值 total_position_value = 0 for order in self.active_orders.values(): if order.status == OrderStatus.OPEN: total_position_value += order.quantity return { 'current_balance': available + frozen + locked, 'available': available, 'used_margin': frozen + locked, 'total_position_value': total_position_value } except Exception as e: logger.error(f"获取账户状态失败: {e}") return { 'current_balance': 0, 'used_margin': 0, 'total_position_value': 0, 'available': 0 } def get_position_info(self) -> Dict[str, Any]: """ 获取当前持仓信息(供 LLM 分析使用) Returns: 持仓信息字典 """ account = self.get_account_status() active_orders = self.get_active_orders() # 计算当前杠杆 balance = account['current_balance'] total_position_value = account['total_position_value'] current_leverage = total_position_value / balance if balance > 0 else 0 # 格式化持仓列表 positions = [] for order in active_orders: positions.append({ 'symbol': order.get('symbol'), 'side': order.get('side'), 'status': order.get('status'), 'entry_price': order.get('filled_price') or order.get('entry_price'), 'quantity': order.get('quantity'), 'pnl_percent': order.get('pnl_percent', 0) }) return { 'account_balance': balance, 'total_position_value': total_position_value, 'current_leverage': current_leverage, 'max_leverage': self.default_leverage, 'active_order_count': len(active_orders), 'max_orders': self.max_orders, 'positions': positions } # 全局实例 _real_trading_service: Optional[RealTradingService] = None def get_real_trading_service() -> Optional[RealTradingService]: """ 获取实盘交易服务实例(单例) 注意:不再检查 REAL_TRADING_ENABLED 配置 只要 API 配置了就初始化服务,自动交易可以单独控制 Returns: RealTradingService 实例或 None(如果未配置 API) """ global _real_trading_service if _real_trading_service: return _real_trading_service settings = get_settings() # 检查是否配置了 API Key(不再检查 REAL_TRADING_ENABLED) if not settings.bitget_api_key or not settings.bitget_api_secret: logger.warning("Bitget API Key 未配置,实盘交易功能不可用") return None _real_trading_service = RealTradingService() return _real_trading_service