"""
Paper trading service - order management and profit/loss statistics.
"""
import traceback
import uuid
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional

from sqlalchemy import text
from sqlalchemy.orm import make_transient

from app.models.paper_trading import PaperOrder, OrderStatus, OrderSide, SignalGrade, EntryType
from app.services.db_service import db_service
from app.config import get_settings
from app.utils.logger import logger


class PaperTradingService:
    """Simulated (paper) trading service.

    Keeps an in-memory cache of active (pending/open) orders keyed by
    ``order_id`` and persists every state change through ``db_service``.
    """

    # Statuses of orders that still occupy a slot in ``active_orders``.
    _ACTIVE_STATUSES = (OrderStatus.PENDING, OrderStatus.OPEN)

    # Statuses of orders that were closed with a realized PnL.
    _CLOSED_STATUSES = (
        OrderStatus.CLOSED_TP,
        OrderStatus.CLOSED_SL,
        OrderStatus.CLOSED_BE,
        OrderStatus.CLOSED_MANUAL,
    )

    def __init__(self):
        """Read the configuration, make sure the table exists and warm the cache."""
        self.settings = get_settings()
        self.active_orders: Dict[str, PaperOrder] = {}  # in-memory cache of active orders

        # Contract trading configuration
        self.initial_balance = self.settings.paper_trading_initial_balance  # starting capital
        self.leverage = self.settings.paper_trading_leverage  # leverage multiplier
        self.margin_per_order = self.settings.paper_trading_margin_per_order  # margin per order
        self.max_orders = self.settings.paper_trading_max_orders  # maximum number of orders
        # whether opposite open positions are closed automatically
        self.auto_close_opposite = self.settings.paper_trading_auto_close_opposite
        # profit percentage at which the stop loss is moved to break-even
        self.breakeven_threshold = self.settings.paper_trading_breakeven_threshold

        # Make sure the table exists
        self._ensure_table_exists()

        # Load active orders into memory
        self._load_active_orders()

        logger.info(f"模拟交易服务初始化完成(自动平反向持仓: {'启用' if self.auto_close_opposite else '禁用'},保本止损阈值: {self.breakeven_threshold}%)")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _pnl_percent(side: OrderSide, entry_price: Optional[float], exit_price: float) -> float:
        """Return the PnL in percent for a position, or 0.0 when the entry price is missing."""
        if not entry_price:
            return 0.0
        if side == OrderSide.LONG:
            return ((exit_price - entry_price) / entry_price) * 100
        return ((entry_price - exit_price) / entry_price) * 100

    @staticmethod
    def _summarize_group(orders: List[PaperOrder]) -> Dict[str, Any]:
        """Return count / win rate / total PnL for a non-empty list of closed orders."""
        amounts = [o.pnl_amount or 0 for o in orders]
        wins = sum(1 for amount in amounts if amount > 0)
        return {
            'count': len(orders),
            'win_rate': round(wins / len(orders) * 100, 1),
            'total_pnl': round(sum(amounts), 2)
        }

    # ------------------------------------------------------------------
    # Start-up
    # ------------------------------------------------------------------

    def _ensure_table_exists(self):
        """Create the tables if needed and migrate the ``breakeven_triggered`` column."""
        from app.models.database import Base

        Base.metadata.create_all(bind=db_service.engine)

        # Probe for the breakeven_triggered column and add it when missing.
        db = db_service.get_session()
        try:
            db.execute(text("SELECT breakeven_triggered FROM paper_orders LIMIT 1"))
        except Exception:
            # The failed probe may leave the transaction aborted (e.g. on
            # PostgreSQL); roll back so the ALTER runs in a clean transaction.
            db.rollback()
            try:
                db.execute(text("ALTER TABLE paper_orders ADD COLUMN breakeven_triggered INTEGER DEFAULT 0"))
                db.commit()
                logger.info("数据库迁移: 添加 breakeven_triggered 字段")
            except Exception as e:
                logger.warning(f"添加 breakeven_triggered 字段失败(可能已存在): {e}")
                db.rollback()
        finally:
            db.close()

    def _load_active_orders(self):
        """Load all pending/open orders from the database into the in-memory cache."""
        db = db_service.get_session()
        try:
            orders = db.query(PaperOrder).filter(
                PaperOrder.status.in_(self._ACTIVE_STATUSES)
            ).all()

            # Detach every object from the session so it can outlive it.
            for order in orders:
                db.expunge(order)
                make_transient(order)
                self.active_orders[order.order_id] = order

            logger.info(f"已加载 {len(orders)} 个活跃订单")
        except Exception as e:
            logger.error(f"加载活跃订单失败: {e}")
        finally:
            db.close()

    # ------------------------------------------------------------------
    # Order creation
    # ------------------------------------------------------------------

    def create_order_from_signal(self, signal: Dict[str, Any], current_price: Optional[float] = None) -> Dict[str, Any]:
        """
        Create a paper order from a trading signal.

        Args:
            signal: trading signal
                - symbol: trading pair
                - action: 'buy' or 'sell'
                - entry_type: 'market' or 'limit'
                - price / entry_price: entry price
                - stop_loss: stop-loss price
                - take_profit: take-profit price
                - confidence: confidence
                - signal_grade / grade: signal grade
                - signal_type / type: signal type
                - reason: entry reason
            current_price: current price (used for market orders)

        Returns:
            Dict with 'order' (the created order or None) and
            'cancelled_orders' (opposite pending orders that were cancelled).
        """
        result = {'order': None, 'cancelled_orders': []}

        action = signal.get('action')
        if action not in ['buy', 'sell']:
            return result

        symbol = signal.get('symbol', 'UNKNOWN')
        side = OrderSide.LONG if action == 'buy' else OrderSide.SHORT
        grade = signal.get('signal_grade') or signal.get('grade', 'D')

        entry_type_str = signal.get('entry_type', 'market')
        entry_type = EntryType.LIMIT if entry_type_str == 'limit' else EntryType.MARKET

        # A missing/zero entry price would later cause divisions by zero in the
        # PnL and price-distance calculations. Market orders may fall back to
        # the current price; anything else without a price is rejected before
        # it can affect existing orders.
        entry_price = signal.get('entry_price') or signal.get('price') or 0
        if entry_price <= 0 and entry_type == EntryType.MARKET and current_price:
            entry_price = current_price
        if entry_price <= 0:
            logger.warning(f"信号缺少有效入场价,跳过: {symbol}")
            return result

        # === Opposite-order handling ===
        # 1. Always cancel opposite pending orders of the same symbol.
        cancelled_orders = self._cancel_opposite_pending_orders(symbol, side)
        result['cancelled_orders'] = cancelled_orders

        # 2. Optionally close opposite open positions (enabled by configuration).
        if self.auto_close_opposite and current_price:
            self._close_opposite_positions(symbol, side, grade, current_price)

        # === Limit checks ===
        # 1. Total number of orders (open + pending) must stay below the maximum.
        total_orders = len(self.active_orders)
        if total_orders >= self.max_orders:
            logger.info(f"订单限制: 已达到最大订单数 {self.max_orders},跳过")
            return result

        # 2. Skip when a same-direction pending order is within 1% of the new price.
        pending_orders = [
            order for order in self.active_orders.values()
            if order.symbol == symbol
            and order.side == side
            and order.status == OrderStatus.PENDING
        ]
        for pending in pending_orders:
            if not pending.entry_price:
                continue  # cannot compute a relative distance from a zero price
            price_diff = abs(pending.entry_price - entry_price) / pending.entry_price
            if price_diff < 0.01:
                logger.info(f"订单限制: {symbol} 已有接近的挂单 @ ${pending.entry_price:,.2f},新信号 @ ${entry_price:,.2f},跳过")
                return result

        # D-grade signals never open a position.
        if grade == 'D':
            logger.info(f"D级信号不开仓: {signal.get('symbol')}")
            return result

        # === Dynamic position sizing ===
        position_size = signal.get('position_size', 'light')
        margin, position_value = self._calculate_dynamic_position(position_size, symbol)

        if margin <= 0:
            logger.info(f"无可用保证金: {symbol} | 当前杠杆已达上限")
            return result

        quantity = position_value  # order quantity, denominated in USDT

        order_id = f"PT-{symbol}-{datetime.now().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:6]}"

        if entry_type == EntryType.MARKET:
            # Market order: opened immediately.
            status = OrderStatus.OPEN
            filled_price = current_price if current_price else entry_price
            opened_at = datetime.utcnow()
        else:
            # Limit order: waits for the price trigger.
            status = OrderStatus.PENDING
            filled_price = None
            opened_at = None

        db = db_service.get_session()
        try:
            order = PaperOrder(
                order_id=order_id,
                symbol=symbol,
                side=side,
                entry_price=entry_price,
                stop_loss=signal.get('stop_loss', 0),
                take_profit=signal.get('take_profit', 0),
                filled_price=filled_price,
                quantity=quantity,
                signal_grade=SignalGrade(grade),
                signal_type=signal.get('signal_type') or signal.get('type', 'swing'),
                confidence=signal.get('confidence', 0),
                trend=signal.get('trend'),
                entry_type=entry_type,
                status=status,
                opened_at=opened_at,
                entry_reasons=[signal.get('reason', '')] if signal.get('reason') else signal.get('reasons', []),
                indicators=signal.get('indicators', {})
            )

            db.add(order)
            db.commit()
            db.refresh(order)

            # Add to the active-order cache.
            self.active_orders[order.order_id] = order

            entry_type_text = "现价" if entry_type == EntryType.MARKET else "挂单"
            status_text = "已开仓" if status == OrderStatus.OPEN else "等待触发"

            logger.info(f"创建模拟订单: {order_id} | {symbol} {side.value} [{entry_type_text}] @ ${entry_price:,.2f} | {status_text}")
            logger.info(f" 保证金: ${margin:,.0f} | 杠杆: {self.leverage}x | 持仓价值: ${position_value:,.0f} | 当前订单数: {len(self.active_orders)}/{self.max_orders}")

            result['order'] = order
            return result

        except Exception as e:
            logger.error(f"创建模拟订单失败: {e}")
            db.rollback()
            return result
        finally:
            db.close()

    def _calculate_dynamic_position(self, position_size: str, symbol: str) -> tuple:
        """
        Compute margin and position value from the suggested position size.

        Args:
            position_size: 'heavy' / 'medium' / 'light' (unknown values count as 'light')
            symbol: trading pair (currently not used in the calculation)

        Returns:
            (margin, position_value) tuple; (0, 0) when no room is left.
        """
        account = self.get_account_status()
        balance = account['current_balance']
        max_leverage = self.leverage  # configured maximum leverage

        # Cross-margin: maximum total position value = balance x max leverage.
        max_position_value = balance * max_leverage
        current_position_value = account['total_position_value']
        available_position_value = max_position_value - current_position_value

        if available_position_value <= 0:
            logger.warning("已达最大杠杆限制,无法开仓")
            return 0, 0

        # Fraction of the remaining room used per position size.
        size_ratio = {
            'heavy': 0.30,
            'medium': 0.15,
            'light': 0.05
        }.get(position_size, 0.05)

        target_position_value = available_position_value * size_ratio

        min_position_value = 1000  # minimum position value: 1000 USDT
        max_single_position = balance * 5  # a single position is capped at 5x the balance

        position_value = max(min_position_value, min(target_position_value, max_single_position))

        # Never exceed the remaining room.
        position_value = min(position_value, available_position_value)

        margin = position_value / max_leverage

        logger.info(f"动态仓位计算: {position_size} | 可用空间: ${available_position_value:,.0f} | "
                    f"目标仓位: ${position_value:,.0f} | 保证金: ${margin:,.0f}")

        return margin, position_value

    def get_position_info(self) -> Dict[str, Any]:
        """
        Return the current position overview (intended for LLM analysis).

        Returns:
            Dict with balance, total position value, current/max leverage
            and a list of per-order summaries.
        """
        account = self.get_account_status()
        active_orders = self.get_active_orders()

        balance = account['current_balance']
        total_position_value = account['total_position_value']
        current_leverage = total_position_value / balance if balance > 0 else 0

        positions = [
            {
                'symbol': order.get('symbol'),
                'side': order.get('side'),
                'status': order.get('status'),
                'entry_price': order.get('filled_price') or order.get('entry_price'),
                'quantity': order.get('quantity'),
                'pnl_percent': order.get('pnl_percent', 0)
            }
            for order in active_orders
        ]

        return {
            'account_balance': balance,
            'total_position_value': total_position_value,
            'current_leverage': current_leverage,
            'max_leverage': self.leverage,
            'positions': positions
        }

    # ------------------------------------------------------------------
    # Price-driven state changes
    # ------------------------------------------------------------------

    def check_price_triggers(self, symbol: str, current_price: float) -> List[Dict[str, Any]]:
        """
        Check whether the current price fills pending orders or hits TP/SL.

        Args:
            symbol: trading pair
            current_price: current price

        Returns:
            List of event dicts (fills, closes and break-even notifications).
        """
        triggered = []
        # Orders filled in this round are not checked for TP/SL right away.
        activated_order_ids = set()

        # 1. Pending orders: check entry triggers.
        pending_orders = [
            order for order in self.active_orders.values()
            if order.symbol == symbol and order.status == OrderStatus.PENDING
        ]

        for order in pending_orders:
            result = self._check_pending_entry(order, current_price)
            if result:
                triggered.append(result)
                activated_order_ids.add(order.order_id)
                logger.info(f"挂单触发入场: {order.order_id} | {symbol} @ ${current_price:,.2f}")

        # 2. Open orders: check TP/SL (skipping the ones just activated).
        open_orders = [
            order for order in self.active_orders.values()
            if order.symbol == symbol
            and order.status == OrderStatus.OPEN
            and order.order_id not in activated_order_ids
        ]

        for order in open_orders:
            result = self._check_order_trigger(order, current_price)
            if result:
                triggered.append(result)
            else:
                # Track max profit / drawdown and apply the break-even stop.
                breakeven_result = self._update_order_extremes(order, current_price)
                if breakeven_result:
                    triggered.append(breakeven_result)

        return triggered

    def _check_pending_entry(self, order: PaperOrder, current_price: float) -> Optional[Dict[str, Any]]:
        """
        Check whether a pending order should be filled.

        Long orders fill when the price drops to the entry price or below,
        short orders when it rises to the entry price or above.

        Returns:
            The activation result dict when filled, otherwise None.
        """
        if order.side == OrderSide.LONG:
            should_trigger = current_price <= order.entry_price
        else:
            should_trigger = current_price >= order.entry_price

        if should_trigger:
            return self._activate_pending_order(order, current_price)
        return None

    def _activate_pending_order(self, order: PaperOrder, filled_price: float) -> Optional[Dict[str, Any]]:
        """
        Turn a pending order into an open position.

        Returns:
            Activation result dict with the order details, or None on failure.
        """
        db = db_service.get_session()
        try:
            # Re-query so the row is attached to the current session.
            db_order = db.query(PaperOrder).filter(PaperOrder.order_id == order.order_id).first()
            if not db_order:
                logger.error(f"数据库中未找到订单: {order.order_id}")
                return None

            db_order.status = OrderStatus.OPEN
            db_order.filled_price = filled_price
            db_order.opened_at = datetime.utcnow()

            db.commit()
            db.refresh(db_order)

            # Detach the refreshed object and let it replace the cached one.
            db.expunge(db_order)
            make_transient(db_order)
            self.active_orders[order.order_id] = db_order

            logger.info(f"挂单已激活: {order.order_id} | {order.symbol} {order.side.value} @ ${filled_price:,.2f}")

            return {
                'event_type': 'order_filled',
                'order_id': order.order_id,
                'symbol': order.symbol,
                'side': order.side.value,
                'entry_price': order.entry_price,  # limit price
                'filled_price': filled_price,
                'quantity': order.quantity,
                'signal_grade': order.signal_grade.value if order.signal_grade else None,
                'stop_loss': order.stop_loss,
                'take_profit': order.take_profit
            }

        except Exception as e:
            logger.error(f"激活挂单失败: {e}")
            logger.error(traceback.format_exc())
            db.rollback()
            return None
        finally:
            db.close()

    def _check_order_trigger(self, order: PaperOrder, current_price: float) -> Optional[Dict[str, Any]]:
        """Close the order when the price reaches its take-profit or stop-loss.

        A take-profit / stop-loss of 0 or None means "not set" and never
        triggers. (Comparing against 0 would otherwise close every long at a
        take-profit of 0 and every short at a stop-loss of 0 on the first tick.)

        Returns:
            The close result dict when triggered, otherwise None.
        """
        take_profit = order.take_profit or 0
        stop_loss = order.stop_loss or 0
        is_long = order.side == OrderSide.LONG

        if is_long:
            # Long: price >= TP takes profit, price <= SL stops out.
            hit_take_profit = take_profit > 0 and current_price >= take_profit
            hit_stop_loss = stop_loss > 0 and current_price <= stop_loss
        else:
            # Short: price <= TP takes profit, price >= SL stops out.
            hit_take_profit = take_profit > 0 and current_price <= take_profit
            hit_stop_loss = stop_loss > 0 and current_price >= stop_loss

        if hit_take_profit:
            return self._close_order(order, OrderStatus.CLOSED_TP, take_profit)

        if hit_stop_loss:
            # The flag tells a break-even stop apart from the original stop.
            if getattr(order, 'breakeven_triggered', 0) == 1:
                new_status = OrderStatus.CLOSED_BE
            else:
                new_status = OrderStatus.CLOSED_SL
            return self._close_order(order, new_status, stop_loss)

        return None

    def _close_order(self, order: PaperOrder, status: OrderStatus, exit_price: float) -> Optional[Dict[str, Any]]:
        """Close an order, compute its PnL and remove it from the cache.

        Returns:
            Result dict describing the closed order, or None when the order is
            missing from the database or persisting fails.
        """
        db = db_service.get_session()
        try:
            # Re-query so the row is attached to the current session.
            db_order = db.query(PaperOrder).filter(PaperOrder.order_id == order.order_id).first()
            if not db_order:
                logger.error(f"数据库中未找到订单: {order.order_id}")
                return None

            # Fall back to the requested entry price if no fill price was recorded.
            basis_price = db_order.filled_price or db_order.entry_price
            pnl_percent = self._pnl_percent(db_order.side, basis_price, exit_price)
            pnl_amount = db_order.quantity * pnl_percent / 100

            hold_duration = datetime.utcnow() - db_order.opened_at if db_order.opened_at else timedelta(0)

            db_order.status = status
            db_order.exit_price = exit_price
            db_order.closed_at = datetime.utcnow()
            db_order.pnl_amount = round(pnl_amount, 2)
            db_order.pnl_percent = round(pnl_percent, 4)

            db.commit()
            db.refresh(db_order)

            # Drop from the active-order cache.
            self.active_orders.pop(order.order_id, None)

            result = {
                'order_id': db_order.order_id,
                'symbol': db_order.symbol,
                'side': db_order.side.value,
                'status': status.value,
                'entry_price': db_order.filled_price,
                'exit_price': exit_price,
                'quantity': db_order.quantity,
                'pnl_amount': db_order.pnl_amount,
                'pnl_percent': db_order.pnl_percent,
                'is_win': pnl_amount > 0,
                'hold_duration': str(hold_duration).split('.')[0],  # strip microseconds
                'signal_grade': db_order.signal_grade.value if db_order.signal_grade else None
            }

            status_text = {"closed_tp": "止盈", "closed_sl": "止损", "closed_be": "保本止损"}.get(status.value, "平仓")
            logger.info(f"订单{status_text}: {db_order.order_id} | {db_order.symbol} | 盈亏: {pnl_percent:+.2f}% (${pnl_amount:+.2f})")

            return result

        except Exception as e:
            logger.error(f"平仓失败: {e}")
            logger.error(traceback.format_exc())
            db.rollback()
            return None
        finally:
            db.close()

    def _update_order_extremes(self, order: PaperOrder, current_price: float) -> Optional[Dict[str, Any]]:
        """Track max profit / drawdown and move the stop to break-even when due.

        Returns:
            A 'breakeven_triggered' event dict when the stop loss was moved to
            the fill price during this call, otherwise None.
        """
        if not order.filled_price:
            return None  # no fill price recorded, PnL cannot be computed

        current_pnl_percent = self._pnl_percent(order.side, order.filled_price, current_price)

        needs_update = False
        breakeven_triggered = False

        if current_pnl_percent > (order.max_profit or 0):
            order.max_profit = current_pnl_percent
            needs_update = True

        if current_pnl_percent < (order.max_drawdown or 0):
            order.max_drawdown = current_pnl_percent
            needs_update = True

        # Break-even stop: once profit reaches the threshold, move the stop to
        # the fill price (only once per order, tracked by the flag).
        if (self.breakeven_threshold > 0
                and current_pnl_percent >= self.breakeven_threshold
                and getattr(order, 'breakeven_triggered', 0) != 1):
            stop_loss = order.stop_loss or 0
            if order.side == OrderSide.LONG:
                # Long: move only while the stop is still below the fill price
                # (an unset stop of 0 qualifies).
                should_move = stop_loss < order.filled_price
            else:
                # Short: move while the stop is above the fill price, or not
                # set at all, so both directions are treated alike.
                should_move = stop_loss <= 0 or stop_loss > order.filled_price

            if should_move:
                order.stop_loss = order.filled_price
                order.breakeven_triggered = 1
                needs_update = True
                breakeven_triggered = True
                logger.info(f"保本止损触发: {order.order_id} | {order.symbol} | 盈利 {current_pnl_percent:.2f}% >= {self.breakeven_threshold}% | 止损移至开仓价 ${order.filled_price:,.2f}")

        # Persist the changes, if any.
        if needs_update:
            db = db_service.get_session()
            try:
                db_order = db.query(PaperOrder).filter(PaperOrder.order_id == order.order_id).first()
                if db_order:
                    db_order.max_profit = order.max_profit
                    db_order.max_drawdown = order.max_drawdown
                    db_order.stop_loss = order.stop_loss
                    db_order.breakeven_triggered = order.breakeven_triggered
                    db.commit()
            except Exception as e:
                logger.error(f"更新订单极值失败: {e}")
                db.rollback()
            finally:
                db.close()

        if breakeven_triggered:
            return {
                'event_type': 'breakeven_triggered',
                'order_id': order.order_id,
                'symbol': order.symbol,
                'side': order.side.value,
                'filled_price': order.filled_price,
                'new_stop_loss': order.stop_loss,
                'current_pnl_percent': current_pnl_percent
            }

        return None

    # ------------------------------------------------------------------
    # Manual close / cancellation
    # ------------------------------------------------------------------

    def close_order_manual(self, order_id: str, exit_price: float) -> Optional[Dict[str, Any]]:
        """Manually close an open order, or cancel it when it is still pending.

        Returns:
            The close/cancel result dict, or None when the order is not active
            or the operation fails.
        """
        if order_id not in self.active_orders:
            logger.warning(f"订单不存在或已平仓: {order_id}")
            return None

        order = self.active_orders[order_id]

        # Pending orders are cancelled instead of closed.
        if order.status == OrderStatus.PENDING:
            return self._cancel_pending_order(order)

        return self._close_order(order, OrderStatus.CLOSED_MANUAL, exit_price)

    def _cancel_pending_order(self, order: PaperOrder) -> Optional[Dict[str, Any]]:
        """Cancel a pending order and remove it from the cache.

        Returns:
            Cancellation result dict, or None when persisting fails (in which
            case the cached order keeps its previous state).
        """
        previous_status = order.status
        previous_closed_at = order.closed_at

        db = db_service.get_session()
        try:
            order.status = OrderStatus.CANCELLED
            order.closed_at = datetime.utcnow()
            db.merge(order)
            db.commit()

            # Drop from the active-order cache.
            self.active_orders.pop(order.order_id, None)

            logger.info(f"挂单已取消: {order.order_id} | {order.symbol}")

            return {
                'order_id': order.order_id,
                'symbol': order.symbol,
                'side': order.side.value,
                'status': 'cancelled',
                'entry_price': order.entry_price,
                'message': '挂单已取消'
            }
        except Exception as e:
            logger.error(f"取消挂单失败: {e}")
            db.rollback()
            # The order stays in the cache, so it must not look cancelled.
            order.status = previous_status
            order.closed_at = previous_closed_at
            return None
        finally:
            db.close()

    def _cancel_opposite_pending_orders(self, symbol: str, new_side: OrderSide) -> List[Dict[str, Any]]:
        """
        Cancel pending orders of the same symbol in the opposite direction.

        Args:
            symbol: trading pair
            new_side: direction of the new signal

        Returns:
            List of cancellation result dicts.
        """
        opposite_side = OrderSide.SHORT if new_side == OrderSide.LONG else OrderSide.LONG
        opposite_pending = [
            order for order in self.active_orders.values()
            if order.symbol == symbol
            and order.side == opposite_side
            and order.status == OrderStatus.PENDING
        ]

        if not opposite_pending:
            return []

        cancelled_orders = []
        for order in opposite_pending:
            result = self._cancel_pending_order(order)
            if result:
                result['event_type'] = 'pending_cancelled'
                result['new_side'] = new_side.value
                result['reason'] = f'收到反向{new_side.value}信号,自动撤销'
                cancelled_orders.append(result)
                logger.info(f"自动取消反向挂单: {order.order_id} | {symbol} {opposite_side.value} @ ${order.entry_price:,.2f}")

        if cancelled_orders:
            logger.info(f"已取消 {len(cancelled_orders)} 个反向挂单({symbol} {opposite_side.value}),新信号方向: {new_side.value}")

        return cancelled_orders

    def _close_opposite_positions(self, symbol: str, new_side: OrderSide, signal_grade: str, current_price: float) -> int:
        """
        Selectively close open positions in the opposite direction (optional strategy).

        Args:
            symbol: trading pair
            new_side: direction of the new signal
            signal_grade: grade of the new signal
            current_price: current price

        Returns:
            Number of positions that were closed.
        """
        opposite_side = OrderSide.SHORT if new_side == OrderSide.LONG else OrderSide.LONG
        opposite_positions = [
            order for order in self.active_orders.values()
            if order.symbol == symbol
            and order.side == opposite_side
            and order.status == OrderStatus.OPEN
        ]

        if not opposite_positions:
            return 0

        closed_count = 0
        for order in opposite_positions:
            pnl_percent = self._pnl_percent(order.side, order.filled_price, current_price)

            # Only close under specific conditions.
            should_close = False
            reason = ""

            # Condition 1: A-grade signal and the opposite position loses more than 3%.
            if signal_grade == 'A' and pnl_percent < -3:
                should_close = True
                reason = f"A级反向信号且当前亏损{pnl_percent:.2f}%,提前止损"
            # Condition 2: A-grade signal and the opposite position has a small profit (< 1%).
            elif signal_grade == 'A' and 0 < pnl_percent < 1:
                should_close = True
                reason = f"A级反向信号且盈利较小({pnl_percent:.2f}%),落袋为安"

            if should_close:
                result = self._close_order(order, OrderStatus.CLOSED_MANUAL, current_price)
                if result:
                    closed_count += 1
                    logger.info(f"自动平掉反向持仓: {order.order_id} | {symbol} {opposite_side.value} | 原因: {reason}")

        if closed_count > 0:
            logger.info(f"已平掉 {closed_count} 个反向持仓({symbol} {opposite_side.value})")

        return closed_count

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_active_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return active orders as dicts, read from the database rather than the cache.

        Args:
            symbol: optional trading pair filter

        Returns:
            List of order dicts; an empty list when the query fails.
        """
        db = db_service.get_session()
        try:
            query = db.query(PaperOrder).filter(
                PaperOrder.status.in_(self._ACTIVE_STATUSES)
            )
            if symbol:
                query = query.filter(PaperOrder.symbol == symbol)

            orders = query.all()

            # Convert one by one so a single bad row does not hide the others.
            result = []
            for order in orders:
                try:
                    result.append(order.to_dict())
                except Exception as e:
                    logger.error(f"转换订单 {order.order_id} 为字典失败: {e}")
                    logger.error(traceback.format_exc())

            return result
        except Exception as e:
            logger.error(f"获取活跃订单失败: {e}")
            return []
        finally:
            db.close()

    def get_order_by_id(self, order_id: str) -> Optional[Dict[str, Any]]:
        """Return one order as a dict, looking in the cache first and then the database."""
        if order_id in self.active_orders:
            try:
                return self.active_orders[order_id].to_dict()
            except Exception as e:
                logger.error(f"转换订单 {order_id} 为字典失败: {e}")
                logger.error(traceback.format_exc())
                return None

        db = db_service.get_session()
        try:
            order = db.query(PaperOrder).filter(PaperOrder.order_id == order_id).first()
            return order.to_dict() if order else None
        finally:
            db.close()

    def get_order_history(self, symbol: Optional[str] = None, limit: int = 100) -> List[Dict[str, Any]]:
        """Return closed orders as dicts, most recently closed first.

        Args:
            symbol: optional trading pair filter
            limit: maximum number of orders to return
        """
        db = db_service.get_session()
        try:
            query = db.query(PaperOrder).filter(
                PaperOrder.status.in_(self._CLOSED_STATUSES)
            )
            if symbol:
                query = query.filter(PaperOrder.symbol == symbol)

            orders = query.order_by(PaperOrder.closed_at.desc()).limit(limit).all()
            return [o.to_dict() for o in orders]
        finally:
            db.close()

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    def calculate_statistics(
        self,
        symbol: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Compute trading statistics over closed orders.

        Args:
            symbol: optional trading pair filter
            start_date: only orders closed at or after this time
            end_date: only orders closed at or before this time

        Returns:
            Statistics dict; the all-zero structure when no order matches.
        """
        db = db_service.get_session()
        try:
            query = db.query(PaperOrder).filter(
                PaperOrder.status.in_(self._CLOSED_STATUSES)
            )

            if symbol:
                query = query.filter(PaperOrder.symbol == symbol)
            if start_date:
                query = query.filter(PaperOrder.closed_at >= start_date)
            if end_date:
                query = query.filter(PaperOrder.closed_at <= end_date)

            orders = query.all()

            if not orders:
                return self._empty_statistics()

            # Missing values count as 0 so one incomplete row cannot break the report.
            pnl_amounts = [o.pnl_amount or 0 for o in orders]
            pnl_percents = [o.pnl_percent or 0 for o in orders]

            total_trades = len(orders)
            wins = [amount for amount in pnl_amounts if amount > 0]
            losses = [abs(amount) for amount in pnl_amounts if amount < 0]

            total_pnl = sum(pnl_amounts)
            gross_profit = sum(wins)
            gross_loss = sum(losses)

            return {
                'total_trades': total_trades,
                'winning_trades': len(wins),
                'losing_trades': len(losses),
                'win_rate': round((len(wins) / total_trades * 100), 2),
                'total_pnl': round(total_pnl, 2),
                'total_pnl_percent': round(sum(pnl_percents), 2),
                'average_pnl': round(total_pnl / total_trades, 2),
                'average_win': round(gross_profit / len(wins), 2) if wins else 0,
                'average_loss': round(gross_loss / len(losses), 2) if losses else 0,
                # NOTE(review): infinity is returned when there are no losing
                # trades; strict JSON encoders reject it - confirm consumers cope.
                'profit_factor': round(gross_profit / gross_loss, 2) if gross_loss > 0 else float('inf'),
                'max_drawdown': min((o.max_drawdown or 0) for o in orders),
                'best_trade': max(pnl_percents),
                'worst_trade': min(pnl_percents),
                'by_grade': self._calculate_grade_statistics(orders),
                'by_type': self._calculate_type_statistics(orders),
                'by_symbol': self._calculate_symbol_statistics(orders)
            }
        finally:
            db.close()

    def _empty_statistics(self) -> Dict[str, Any]:
        """Return the statistics structure with every value zeroed/empty."""
        return {
            'total_trades': 0,
            'winning_trades': 0,
            'losing_trades': 0,
            'win_rate': 0,
            'total_pnl': 0,
            'total_pnl_percent': 0,
            'average_pnl': 0,
            'average_win': 0,
            'average_loss': 0,
            'profit_factor': 0,
            'max_drawdown': 0,
            'best_trade': 0,
            'worst_trade': 0,
            'by_grade': {},
            'by_type': {},
            'by_symbol': {}
        }

    def get_account_status(self) -> Dict[str, Any]:
        """
        Return the account status.

        Returns:
            Dict with balance, used/available margin, order counts,
            total position value and margin ratio.
        """
        # Used margin: every active order is counted with the fixed per-order margin.
        # NOTE(review): orders are created with a dynamically sized quantity,
        # so this fixed-margin figure may not match the stored quantities -
        # confirm which one is intended.
        active_count = len(self.active_orders)
        used_margin = active_count * self.margin_per_order

        # Realized PnL comes from the closed orders.
        db = db_service.get_session()
        try:
            closed_orders = db.query(PaperOrder).filter(
                PaperOrder.status.in_(self._CLOSED_STATUSES)
            ).all()
            realized_pnl = sum((o.pnl_amount or 0) for o in closed_orders)
        finally:
            db.close()

        current_balance = self.initial_balance + realized_pnl
        available_margin = current_balance - used_margin
        available_orders = self.max_orders - active_count

        return {
            'initial_balance': self.initial_balance,
            'realized_pnl': round(realized_pnl, 2),
            'current_balance': round(current_balance, 2),
            'used_margin': round(used_margin, 2),
            'available_margin': round(available_margin, 2),
            'leverage': self.leverage,
            'margin_per_order': self.margin_per_order,
            'active_orders': active_count,
            'max_orders': self.max_orders,
            'available_orders': available_orders,
            'total_position_value': round(used_margin * self.leverage, 2),
            'margin_ratio': round((used_margin / current_balance * 100), 2) if current_balance > 0 else 0
        }

    def _calculate_grade_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]:
        """Return per-signal-grade summaries (grades without orders are omitted)."""
        result = {}
        for grade in ['A', 'B', 'C', 'D']:
            grade_orders = [o for o in orders if o.signal_grade and o.signal_grade.value == grade]
            if grade_orders:
                result[grade] = self._summarize_group(grade_orders)
        return result

    def _calculate_type_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]:
        """Return per-signal-type summaries for 'swing' and 'short_term' orders."""
        result = {}
        for signal_type in ['swing', 'short_term']:
            type_orders = [o for o in orders if o.signal_type == signal_type]
            if type_orders:
                result[signal_type] = self._summarize_group(type_orders)
        return result

    def _calculate_symbol_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]:
        """Return per-symbol summaries."""
        # Group in one pass instead of re-scanning the list for every symbol.
        grouped: Dict[Any, List[PaperOrder]] = {}
        for order in orders:
            grouped.setdefault(order.symbol, []).append(order)

        return {
            symbol: self._summarize_group(symbol_orders)
            for symbol, symbol_orders in grouped.items()
        }

    def get_period_statistics(self, hours: int = 4) -> Dict[str, Any]:
        """
        Return statistics for the most recent time window.

        Args:
            hours: length of the window in hours

        Returns:
            Dict with new/closed order counts, PnL, wins, losses and win rate.
        """
        db = db_service.get_session()
        try:
            cutoff_time = datetime.utcnow() - timedelta(hours=hours)

            # Orders closed inside the window.
            closed_orders = db.query(PaperOrder).filter(
                PaperOrder.status.in_(self._CLOSED_STATUSES),
                PaperOrder.closed_at >= cutoff_time
            ).all()

            # Orders created inside the window (any status).
            new_orders = db.query(PaperOrder).filter(
                PaperOrder.created_at >= cutoff_time
            ).all()

            pnl_amounts = [o.pnl_amount or 0 for o in closed_orders]
            period_pnl = sum(pnl_amounts)
            period_wins = sum(1 for amount in pnl_amounts if amount > 0)
            period_losses = sum(1 for amount in pnl_amounts if amount < 0)

            return {
                'period_hours': hours,
                'new_orders': len(new_orders),
                'closed_orders': len(closed_orders),
                'period_pnl': round(period_pnl, 2),
                'period_wins': period_wins,
                'period_losses': period_losses,
                'period_win_rate': round(period_wins / len(closed_orders) * 100, 1) if closed_orders else 0
            }
        finally:
            db.close()

    def generate_report(self, hours: int = 4) -> str:
        """
        Build the paper-trading report.

        Args:
            hours: length of the reporting window in hours

        Returns:
            The formatted report text.
        """
        total_stats = self.calculate_statistics()
        period_stats = self.get_period_statistics(hours)
        active_orders = self.get_active_orders()

        lines = [
            f"📊 模拟交易 {hours} 小时报告",
            "",
            "━━━━━━ 总体情况 ━━━━━━",
            f"总交易数: {total_stats['total_trades']} | 胜率: {total_stats['win_rate']}%",
            f"总盈亏: ${total_stats['total_pnl']:+.2f}",
            "",
            f"━━━━━━ 过去 {hours} 小时 ━━━━━━",
            f"新订单: {period_stats['new_orders']} | 已平仓: {period_stats['closed_orders']}",
            f"本期盈亏: ${period_stats['period_pnl']:+.2f}",
        ]

        # Current orders
        open_orders = [o for o in active_orders if o.get('status') == 'open']
        pending_orders = [o for o in active_orders if o.get('status') == 'pending']

        if open_orders or pending_orders:
            lines.append("")
            lines.append("━━━━━━ 当前订单 ━━━━━━")

            for order in open_orders[:5]:  # show at most 5 open positions
                side_text = "做多" if order.get('side') == 'long' else "做空"
                entry_price = order.get('filled_price') or order.get('entry_price', 0)
                lines.append(f"✅ {order.get('symbol')} {side_text} @ ${entry_price:,.0f}")

            for order in pending_orders[:3]:  # show at most 3 pending orders
                side_text = "做多" if order.get('side') == 'long' else "做空"
                lines.append(f"⏳ {order.get('symbol')} {side_text} 挂单 @ ${order.get('entry_price', 0):,.0f}")

            if len(open_orders) > 5:
                lines.append(f"... 还有 {len(open_orders) - 5} 个持仓")
            if len(pending_orders) > 3:
                lines.append(f"... 还有 {len(pending_orders) - 3} 个挂单")

        # Per-grade statistics
        by_grade = total_stats.get('by_grade', {})
        if by_grade:
            lines.append("")
            lines.append("━━━━━━ 按等级统计 ━━━━━━")
            for grade in ['A', 'B', 'C']:
                if grade in by_grade:
                    g = by_grade[grade]
                    pnl_sign = "+" if g['total_pnl'] >= 0 else ""
                    lines.append(f"{grade}级: {g['count']}笔 | 胜率{g['win_rate']}% | {pnl_sign}${g['total_pnl']:.0f}")

        lines.append("")
        lines.append(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}")

        return "\n".join(lines)

    def reset_all_data(self) -> Dict[str, Any]:
        """
        Delete every paper-trading order and clear the cache.

        Returns:
            Dict with the number of deleted rows and cleared cached orders.

        Raises:
            Exception: re-raised after rollback when the deletion fails.
        """
        db = db_service.get_session()
        try:
            # Counts before deletion, for the log and the result.
            total_count = db.query(PaperOrder).count()
            active_count = len(self.active_orders)

            # Delete all orders (active and historical).
            deleted = db.query(PaperOrder).delete(synchronize_session='fetch')
            db.commit()

            self.active_orders.clear()

            logger.info(f"模拟交易数据已重置,删除 {deleted} 条订单(总计 {total_count} 条)")

            return {
                'deleted_count': deleted,
                'active_orders_cleared': active_count
            }
        except Exception as e:
            db.rollback()
            logger.error(f"重置模拟交易数据失败: {e}")
            raise
        finally:
            db.close()


# Module-level singleton
_paper_trading_service: Optional[PaperTradingService] = None


def get_paper_trading_service() -> PaperTradingService:
    """Return the shared PaperTradingService, creating it on first use."""
    global _paper_trading_service
    if _paper_trading_service is None:
        _paper_trading_service = PaperTradingService()
    return _paper_trading_service