"""
模拟交易服务 - 订单管理和盈亏统计
"""
import uuid
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from app.models.paper_trading import PaperOrder, OrderStatus, OrderSide, SignalGrade, EntryType
from app.services.db_service import db_service
from app.config import get_settings
from app.utils.logger import logger
class PaperTradingService:
"""模拟交易服务"""
def __init__(self):
"""初始化模拟交易服务"""
self.settings = get_settings()
self.active_orders: Dict[str, PaperOrder] = {} # 内存缓存活跃订单
# 合约交易配置
self.initial_balance = self.settings.paper_trading_initial_balance # 初始本金
self.leverage = self.settings.paper_trading_leverage # 杠杆倍数
self.margin_per_order = self.settings.paper_trading_margin_per_order # 每单保证金
self.max_orders = self.settings.paper_trading_max_orders # 最大订单数
self.auto_close_opposite = self.settings.paper_trading_auto_close_opposite # 是否自动平掉反向持仓
self.breakeven_threshold = self.settings.paper_trading_breakeven_threshold # 保本止损触发阈值
# 移动止损配置
self.trailing_stop_enabled = self.settings.paper_trading_trailing_stop_enabled
self.trailing_stop_threshold_multiplier = self.settings.paper_trading_trailing_stop_threshold_multiplier
self.trailing_stop_ratio = self.settings.paper_trailing_stop_ratio
# 动态止盈配置
self.dynamic_tp_enabled = self.settings.paper_trading_dynamic_tp_enabled
self.strong_trend_ratio = self.settings.paper_trading_strong_trend_ratio
self.weak_trend_ratio = self.settings.paper_trading_weak_trend_ratio
self.sideways_tp_percent = self.settings.paper_trading_sideways_tp_percent
# 确保表已创建
self._ensure_table_exists()
# 加载活跃订单到内存
self._load_active_orders()
logger.info(f"模拟交易服务初始化完成(自动平反向持仓: {'启用' if self.auto_close_opposite else '禁用'},"
f"保本止损阈值: {self.breakeven_threshold}%,"
f"移动止损: {'启用' if self.trailing_stop_enabled else '禁用'},"
f"触发倍数: {self.trailing_stop_threshold_multiplier}x,"
f"跟随比例: {self.trailing_stop_ratio * 100}%)")
def _ensure_table_exists(self):
"""确保数据表已创建,并迁移新字段"""
from app.models.paper_trading import PaperOrder
from app.models.database import Base
from sqlalchemy import text
Base.metadata.create_all(bind=db_service.engine)
db = db_service.get_session()
try:
# 检查并添加新字段 breakeven_triggered
try:
db.execute(text("SELECT breakeven_triggered FROM paper_orders LIMIT 1"))
except Exception:
try:
db.execute(text("ALTER TABLE paper_orders ADD COLUMN breakeven_triggered INTEGER DEFAULT 0"))
db.commit()
logger.info("数据库迁移: 添加 breakeven_triggered 字段")
except Exception as e:
logger.warning(f"添加 breakeven_triggered 字段失败(可能已存在): {e}")
db.rollback()
# 检查并添加移动止损相关字段
try:
db.execute(text("SELECT trailing_stop_triggered FROM paper_orders LIMIT 1"))
except Exception:
try:
db.execute(text("ALTER TABLE paper_orders ADD COLUMN trailing_stop_triggered INTEGER DEFAULT 0"))
db.execute(text("ALTER TABLE paper_orders ADD COLUMN trailing_stop_base_profit REAL DEFAULT 0"))
db.commit()
logger.info("数据库迁移: 添加 trailing_stop_triggered 和 trailing_stop_base_profit 字段")
except Exception as e:
logger.warning(f"添加移动止损字段失败(可能已存在): {e}")
db.rollback()
finally:
db.close()
def _load_active_orders(self):
"""从数据库加载活跃订单到内存"""
db = db_service.get_session()
try:
orders = db.query(PaperOrder).filter(
PaperOrder.status.in_([OrderStatus.PENDING, OrderStatus.OPEN])
).all()
# 使用 make_transient 将对象从会话中分离,使其成为独立对象
from sqlalchemy.orm import make_transient
for order in orders:
db.expunge(order) # 从会话中移除
make_transient(order) # 使对象独立
self.active_orders[order.order_id] = order
logger.info(f"已加载 {len(orders)} 个活跃订单")
except Exception as e:
logger.error(f"加载活跃订单失败: {e}")
finally:
db.close()
def create_order_from_signal(self, signal: Dict[str, Any], current_price: float = None) -> Dict[str, Any]:
"""
从交易信号创建模拟订单
Args:
signal: 交易信号
- symbol: 交易对
- action: 'buy' 或 'sell'
- entry_type: 'market' 或 'limit'
- price / entry_price: 入场价
- stop_loss: 止损价
- take_profit: 止盈价
- confidence: 置信度
- signal_grade / grade: 信号等级
- signal_type / type: 信号类型
- reason: 入场原因
current_price: 当前价格(用于市价单)
Returns:
包含 'order' 和 'cancelled_orders' 的字典
"""
result = {'order': None, 'cancelled_orders': []}
action = signal.get('action')
if action not in ['buy', 'sell']:
return result
symbol = signal.get('symbol', 'UNKNOWN')
side = OrderSide.LONG if action == 'buy' else OrderSide.SHORT
entry_price = signal.get('entry_price') or signal.get('price', 0)
# === 反向订单处理 ===
# 1. 总是取消同一交易对的反向挂单(混合策略)
cancelled_orders = self._cancel_opposite_pending_orders(symbol, side)
result['cancelled_orders'] = cancelled_orders
# 2. 可选:智能平掉反向持仓(需要配置启用)
if self.auto_close_opposite and current_price:
grade = signal.get('signal_grade') or signal.get('grade', 'D')
self._close_opposite_positions(symbol, side, grade, current_price)
# === 限制检查 ===
# 1. 检查总订单数(持仓+挂单)是否超过最大限制
total_orders = len(self.active_orders)
if total_orders >= self.max_orders:
logger.info(f"订单限制: 已达到最大订单数 {self.max_orders},跳过")
return result
# 2. 检查是否有接近的挂单(价格差距 < 1%)
same_direction_orders = [
order for order in self.active_orders.values()
if order.symbol == symbol and order.side == side
]
pending_orders = [
order for order in same_direction_orders
if order.status == OrderStatus.PENDING
]
for pending in pending_orders:
price_diff = abs(pending.entry_price - entry_price) / pending.entry_price
if price_diff < 0.01: # 价格差距小于 1%
logger.info(f"订单限制: {symbol} 已有接近的挂单 @ ${pending.entry_price:,.2f},新信号 @ ${entry_price:,.2f},跳过")
return result
# 获取信号等级
grade = signal.get('signal_grade') or signal.get('grade', 'D')
if grade == 'D':
logger.info(f"D级信号不开仓: {signal.get('symbol')}")
return result
# === 动态仓位计算 ===
position_size = signal.get('position_size', 'light')
margin, position_value = self._calculate_dynamic_position(position_size, symbol)
if margin <= 0:
logger.info(f"无可用保证金: {symbol} | 当前杠杆已达上限")
return result
quantity = position_value # 订单数量(以 USDT 计价)
# 确定入场类型
entry_type_str = signal.get('entry_type', 'market')
entry_type = EntryType.LIMIT if entry_type_str == 'limit' else EntryType.MARKET
# 生成订单ID
order_id = f"PT-{symbol}-{datetime.now().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:6]}"
# 确定订单状态和成交价
if entry_type == EntryType.MARKET:
# 现价单:立即开仓
status = OrderStatus.OPEN
filled_price = current_price if current_price else entry_price
opened_at = datetime.utcnow()
else:
# 挂单:等待触发
status = OrderStatus.PENDING
filled_price = None
opened_at = None
db = db_service.get_session()
try:
order = PaperOrder(
order_id=order_id,
symbol=symbol,
side=side,
entry_price=entry_price,
stop_loss=signal.get('stop_loss', 0),
take_profit=signal.get('take_profit', 0),
filled_price=filled_price,
quantity=quantity,
signal_grade=SignalGrade(grade),
signal_type=signal.get('signal_type') or signal.get('type', 'swing'),
confidence=signal.get('confidence', 0),
trend=signal.get('trend'),
entry_type=entry_type,
status=status,
opened_at=opened_at,
entry_reasons=[signal.get('reason', '')] if signal.get('reason') else signal.get('reasons', []),
indicators=signal.get('indicators', {})
)
db.add(order)
db.commit()
db.refresh(order)
# 添加到活跃订单缓存
self.active_orders[order.order_id] = order
entry_type_text = "现价" if entry_type == EntryType.MARKET else "挂单"
status_text = "已开仓" if status == OrderStatus.OPEN else "等待触发"
logger.info(f"创建模拟订单: {order_id} | {symbol} {side.value} [{entry_type_text}] @ ${entry_price:,.2f} | {status_text}")
logger.info(f" 保证金: ${margin:,.0f} | 杠杆: {self.leverage}x | 持仓价值: ${position_value:,.0f} | 当前订单数: {len(self.active_orders)}/{self.max_orders}")
result['order'] = order
return result
except Exception as e:
logger.error(f"创建模拟订单失败: {e}")
db.rollback()
return result
finally:
db.close()
def _calculate_dynamic_position(self, position_size: str, symbol: str) -> tuple:
"""
根据 LLM 建议的仓位大小计算实际保证金和持仓价值
Args:
position_size: 'heavy' / 'medium' / 'light'
symbol: 交易对
Returns:
(margin, position_value) 元组
"""
# 获取当前账户状态
account = self.get_account_status()
balance = account['current_balance']
used_margin = account['used_margin']
max_leverage = self.leverage # 最大杠杆 20x
# 计算可用保证金空间
# 全仓模式下:最大持仓价值 = 余额 × 最大杠杆
max_position_value = balance * max_leverage
current_position_value = account['total_position_value']
available_position_value = max_position_value - current_position_value
if available_position_value <= 0:
logger.warning(f"已达最大杠杆限制,无法开仓")
return 0, 0
# 根据 position_size 确定仓位比例
# heavy: 可用空间的 30%
# medium: 可用空间的 15%
# light: 可用空间的 5%
size_ratio = {
'heavy': 0.30,
'medium': 0.15,
'light': 0.05
}.get(position_size, 0.05)
# 计算目标持仓价值
target_position_value = available_position_value * size_ratio
# 设置最小和最大限制
min_position_value = 1000 # 最小持仓价值 1000 USDT
max_single_position = balance * 5 # 单笔最大不超过 5x 杠杆
position_value = max(min_position_value, min(target_position_value, max_single_position))
# 确保不超过可用空间
position_value = min(position_value, available_position_value)
# 计算对应的保证金
margin = position_value / max_leverage
logger.info(f"动态仓位计算: {position_size} | 可用空间: ${available_position_value:,.0f} | "
f"目标仓位: ${position_value:,.0f} | 保证金: ${margin:,.0f}")
return margin, position_value
def get_position_info(self) -> Dict[str, Any]:
"""
获取当前持仓信息(供 LLM 分析使用)
Returns:
持仓信息字典
"""
account = self.get_account_status()
active_orders = self.get_active_orders()
# 计算当前杠杆
balance = account['current_balance']
total_position_value = account['total_position_value']
current_leverage = total_position_value / balance if balance > 0 else 0
# 格式化持仓列表
positions = []
for order in active_orders:
positions.append({
'symbol': order.get('symbol'),
'side': order.get('side'),
'status': order.get('status'),
'entry_price': order.get('filled_price') or order.get('entry_price'),
'quantity': order.get('quantity'),
'pnl_percent': order.get('pnl_percent', 0)
})
return {
'account_balance': balance,
'total_position_value': total_position_value,
'current_leverage': current_leverage,
'max_leverage': self.leverage,
'positions': positions
}
def check_price_triggers(self, symbol: str, current_price: float) -> List[Dict[str, Any]]:
"""
检查当前价格是否触发挂单入场或止盈止损
Args:
symbol: 交易对
current_price: 当前价格
Returns:
触发的订单结果列表(包括挂单激活和平仓结果)
"""
triggered = []
activated_order_ids = set() # 记录本轮刚激活的订单,避免立即检查止盈止损
# 1. 检查挂单是否触发入场
pending_orders = [
order for order in self.active_orders.values()
if order.symbol == symbol and order.status == OrderStatus.PENDING
]
for order in pending_orders:
result = self._check_pending_entry(order, current_price)
if result:
triggered.append(result)
activated_order_ids.add(order.order_id)
logger.info(f"挂单触发入场: {order.order_id} | {symbol} @ ${current_price:,.2f}")
# 2. 检查持仓订单是否触发止盈止损(跳过本轮刚激活的订单)
open_orders = [
order for order in self.active_orders.values()
if order.symbol == symbol and order.status == OrderStatus.OPEN
and order.order_id not in activated_order_ids
]
for order in open_orders:
result = self._check_order_trigger(order, current_price)
if result:
triggered.append(result)
else:
# 更新最大回撤和最大盈利,并检查保本止损
breakeven_result = self._update_order_extremes(order, current_price)
if breakeven_result:
triggered.append(breakeven_result)
return triggered
def _check_pending_entry(self, order: PaperOrder, current_price: float) -> Optional[Dict[str, Any]]:
"""
检查挂单是否触发入场
做多挂单:价格下跌到入场价时触发(买入)
做空挂单:价格上涨到入场价时触发(卖出)
Returns:
如果触发,返回激活结果字典;否则返回 None
"""
should_trigger = False
if order.side == OrderSide.LONG:
# 做多:价格 <= 入场价 触发
if current_price <= order.entry_price:
should_trigger = True
else:
# 做空:价格 >= 入场价 触发
if current_price >= order.entry_price:
should_trigger = True
if should_trigger:
return self._activate_pending_order(order, current_price)
return None
def _activate_pending_order(self, order: PaperOrder, filled_price: float) -> Optional[Dict[str, Any]]:
"""
激活挂单,转为持仓
Returns:
激活结果字典,包含订单信息
"""
db = db_service.get_session()
try:
# 从数据库重新查询订单,确保在当前会话中
db_order = db.query(PaperOrder).filter(PaperOrder.order_id == order.order_id).first()
if not db_order:
logger.error(f"数据库中未找到订单: {order.order_id}")
return None
# 更新订单状态
db_order.status = OrderStatus.OPEN
db_order.filled_price = filled_price
db_order.opened_at = datetime.utcnow()
db.commit()
db.refresh(db_order)
# 使用 make_transient 将对象从会话中分离,使其成为独立对象
from sqlalchemy.orm import make_transient
db.expunge(db_order) # 从会话中移除
make_transient(db_order) # 使对象独立
# 用独立的对象替换内存缓存中的旧对象
self.active_orders[order.order_id] = db_order
logger.info(f"挂单已激活: {order.order_id} | {order.symbol} {order.side.value} @ ${filled_price:,.2f}")
# 返回激活结果
return {
'event_type': 'order_filled',
'order_id': order.order_id,
'symbol': order.symbol,
'side': order.side.value,
'entry_price': order.entry_price, # 挂单价
'filled_price': filled_price,
'quantity': order.quantity,
'signal_grade': order.signal_grade.value if order.signal_grade else None,
'stop_loss': order.stop_loss,
'take_profit': order.take_profit
}
except Exception as e:
logger.error(f"激活挂单失败: {e}")
import traceback
logger.error(traceback.format_exc())
db.rollback()
return None
finally:
db.close()
def _check_order_trigger(self, order: PaperOrder, current_price: float) -> Optional[Dict[str, Any]]:
"""检查单个订单是否触发"""
triggered = False
new_status = None
exit_price = current_price
if order.side == OrderSide.LONG:
# 做多: 价格 >= 止盈价 触发止盈, 价格 <= 止损价 触发止损
if current_price >= order.take_profit:
triggered = True
new_status = OrderStatus.CLOSED_TP
exit_price = order.take_profit
elif current_price <= order.stop_loss:
triggered = True
# 通过标记判断是否是保本止损
if getattr(order, 'breakeven_triggered', 0) == 1:
new_status = OrderStatus.CLOSED_BE
else:
new_status = OrderStatus.CLOSED_SL
exit_price = order.stop_loss
else:
# 做空: 价格 <= 止盈价 触发止盈, 价格 >= 止损价 触发止损
if current_price <= order.take_profit:
triggered = True
new_status = OrderStatus.CLOSED_TP
exit_price = order.take_profit
elif current_price >= order.stop_loss:
triggered = True
# 通过标记判断是否是保本止损
if getattr(order, 'breakeven_triggered', 0) == 1:
new_status = OrderStatus.CLOSED_BE
else:
new_status = OrderStatus.CLOSED_SL
exit_price = order.stop_loss
if triggered:
return self._close_order(order, new_status, exit_price)
return None
def _close_order(self, order: PaperOrder, status: OrderStatus, exit_price: float) -> Dict[str, Any]:
"""平仓并计算盈亏"""
db = db_service.get_session()
try:
# 从数据库重新查询订单,确保在当前会话中
db_order = db.query(PaperOrder).filter(PaperOrder.order_id == order.order_id).first()
if not db_order:
logger.error(f"数据库中未找到订单: {order.order_id}")
return None
# 计算盈亏
if db_order.side == OrderSide.LONG:
pnl_percent = ((exit_price - db_order.filled_price) / db_order.filled_price) * 100
else:
pnl_percent = ((db_order.filled_price - exit_price) / db_order.filled_price) * 100
pnl_amount = db_order.quantity * pnl_percent / 100
# 计算持仓时间
hold_duration = datetime.utcnow() - db_order.opened_at if db_order.opened_at else timedelta(0)
# 更新订单
db_order.status = status
db_order.exit_price = exit_price
db_order.closed_at = datetime.utcnow()
db_order.pnl_amount = round(pnl_amount, 2)
db_order.pnl_percent = round(pnl_percent, 4)
db.commit()
db.refresh(db_order)
# 从活跃订单缓存中移除
if order.order_id in self.active_orders:
del self.active_orders[order.order_id]
result = {
'order_id': db_order.order_id,
'symbol': db_order.symbol,
'side': db_order.side.value,
'status': status.value,
'entry_price': db_order.filled_price,
'exit_price': exit_price,
'quantity': db_order.quantity,
'pnl_amount': db_order.pnl_amount,
'pnl_percent': db_order.pnl_percent,
'is_win': pnl_amount > 0,
'hold_duration': str(hold_duration).split('.')[0], # 去掉微秒
'signal_grade': db_order.signal_grade.value if db_order.signal_grade else None
}
status_text = {"closed_tp": "止盈", "closed_sl": "止损", "closed_be": "保本止损"}.get(status.value, "平仓")
logger.info(f"订单{status_text}: {db_order.order_id} | {db_order.symbol} | 盈亏: {pnl_percent:+.2f}% (${pnl_amount:+.2f})")
return result
except Exception as e:
logger.error(f"平仓失败: {e}")
import traceback
logger.error(traceback.format_exc())
db.rollback()
return None
finally:
db.close()
def _assess_trend_strength(self, order: PaperOrder, current_price: float) -> str:
"""
评估当前趋势强度(用于动态止盈)
Returns:
'strong' - 强趋势(移动止损跟随比例高)
'weak' - 弱趋势(移动止损跟随比例低)
'sideways' - 震荡市(考虑固定止盈)
"""
# 基于订单的最大盈利和当前盈亏判断趋势强度
max_profit = order.max_profit
current_pnl_percent = 0
if order.side == OrderSide.LONG:
current_pnl_percent = ((current_price - order.filled_price) / order.filled_price) * 100
else:
current_pnl_percent = ((order.filled_price - current_price) / order.filled_price) * 100
# 判断1:盈利是否持续创新高
profit_streak = (current_pnl_percent >= max_profit * 0.8) # 当前盈利接近最大盈利
# 判断2:最大盈利是否足够大(说明有趋势)
significant_profit = max_profit >= 3 # 至少3%盈利
# 判断3:当前盈利是否回撤较多
profit_pullback = (max_profit - current_pnl_percent) / max_profit if max_profit > 0 else 0
# 综合判断
if profit_streak and significant_profit and profit_pullback < 0.3:
return 'strong'
elif significant_profit and profit_pullback >= 0.5:
# 盈利回撤超过50%,可能是震荡或反转
return 'sideways'
elif max_profit < 1.5:
# 盈利不到1.5%,趋势尚未确立
return 'weak'
else:
return 'weak'
def _get_dynamic_ratio(self, order: PaperOrder, current_price: float) -> float:
"""
根据趋势强度动态计算移动止损跟随比例
Returns:
跟随比例(0-1之间)
"""
if not self.dynamic_tp_enabled:
return self.trailing_stop_ratio
trend_strength = self._assess_trend_strength(order, current_price)
if trend_strength == 'strong':
# 强趋势:高跟随比例,锁定更多利润
return self.strong_trend_ratio
elif trend_strength == 'sideways':
# 震荡市:低跟随比例,避免被来回扫
return self.weak_trend_ratio
else:
# 弱趋势:中等跟随比例
return self.trailing_stop_ratio
def _update_order_extremes(self, order: PaperOrder, current_price: float) -> Optional[Dict[str, Any]]:
"""
更新订单的最大回撤和最大盈利,并检查是否需要移动止损
止损策略(按优先级):
1. 移动止损:盈利达到阈值倍数后,止损按比例跟随盈利移动
2. 保本止损:盈利达到阈值时,将止损移动到开仓价
Returns:
如果触发了止损移动,返回通知字典;否则返回 None
"""
if order.side == OrderSide.LONG:
current_pnl_percent = ((current_price - order.filled_price) / order.filled_price) * 100
else:
current_pnl_percent = ((order.filled_price - current_price) / order.filled_price) * 100
# 检查是否需要更新极值
needs_update = False
stop_moved = False
stop_move_type = "" # "breakeven" 或 "trailing"
new_stop_loss = None
if current_pnl_percent > order.max_profit:
order.max_profit = current_pnl_percent
needs_update = True
if current_pnl_percent < order.max_drawdown:
order.max_drawdown = current_pnl_percent
needs_update = True
# === 移动止损逻辑(优先级高于保本止损) ===
if self.trailing_stop_enabled and self.trailing_stop_threshold_multiplier > 0:
trailing_threshold = self.breakeven_threshold * self.trailing_stop_threshold_multiplier
# 检查是否达到移动止损触发阈值
if current_pnl_percent >= trailing_threshold:
trailing_triggered = getattr(order, 'trailing_stop_triggered', 0) == 1
if not trailing_triggered:
# 首次触发移动止损:记录基准盈利,并移动止损
order.trailing_stop_triggered = 1
order.trailing_stop_base_profit = current_pnl_percent
needs_update = True
stop_moved = True
stop_move_type = "trailing_first"
# 计算新的止损价(锁定部分利润)
if order.side == OrderSide.LONG:
# 做多:新止损价 = 开仓价 * (1 + 盈利 * 跟随比例)
locked_profit_percent = current_pnl_percent * self._get_dynamic_ratio(order, current_price)
new_stop_loss = order.filled_price * (1 + locked_profit_percent / 100)
order.stop_loss = new_stop_loss
else:
# 做空:新止损价 = 开仓价 * (1 - 盈利 * 跟随比例)
locked_profit_percent = current_pnl_percent * self._get_dynamic_ratio(order, current_price)
new_stop_loss = order.filled_price * (1 - locked_profit_percent / 100)
order.stop_loss = new_stop_loss
logger.info(f"移动止损首次触发: {order.order_id} | {order.symbol} | 盈利 {current_pnl_percent:.2f}% >= {trailing_threshold:.2f}% | "
f"锁定利润 {locked_profit_percent:.2f}% | 止损移至 ${order.stop_loss:,.2f}")
elif getattr(order, 'trailing_stop_triggered', 0) == 1:
# 已触发过移动止损,持续跟随
base_profit = getattr(order, 'trailing_stop_base_profit', 0)
additional_profit = current_pnl_percent - base_profit
if additional_profit > 0:
# 有额外盈利,移动止损以锁定更多利润
if order.side == OrderSide.LONG:
# 做多:当前止损对应的盈利
current_stop_profit = ((order.stop_loss - order.filled_price) / order.filled_price) * 100
# 新的止损盈利 = 基准 + 额外盈利 * 跟随比例
new_stop_profit = base_profit * self._get_dynamic_ratio(order, current_price)
new_stop_loss = order.filled_price * (1 + new_stop_profit / 100)
# 只有当新止损高于当前止损时才更新(做多止损只能上移)
if new_stop_loss > order.stop_loss:
old_stop = order.stop_loss
order.stop_loss = new_stop_loss
needs_update = True
stop_moved = True
stop_move_type = "trailing_update"
logger.info(f"移动止损更新: {order.order_id} | {order.symbol} | "
f"盈利 {current_pnl_percent:.2f}% | 止损 ${old_stop:,.2f} -> ${new_stop_loss:,.2f}")
else:
# 做空:当前止损对应的盈利
current_stop_profit = ((order.filled_price - order.stop_loss) / order.filled_price) * 100
# 新的止损盈利 = 基准 + 额外盈利 * 跟随比例
new_stop_profit = base_profit * self._get_dynamic_ratio(order, current_price)
new_stop_loss = order.filled_price * (1 - new_stop_profit / 100)
# 只有当新止损低于当前止损时才更新(做空止损只能下移)
if new_stop_loss < order.stop_loss:
old_stop = order.stop_loss
order.stop_loss = new_stop_loss
needs_update = True
stop_moved = True
stop_move_type = "trailing_update"
logger.info(f"移动止损更新: {order.order_id} | {order.symbol} | "
f"盈利 {current_pnl_percent:.2f}% | 止损 ${old_stop:,.2f} -> ${new_stop_loss:,.2f}")
# === 保本止损逻辑(仅在未触发移动止损时生效) ===
if self.breakeven_threshold > 0 and current_pnl_percent >= self.breakeven_threshold:
# 检查是否还没有触发过保本止损,也没有触发过移动止损
breakeven_not_triggered = getattr(order, 'breakeven_triggered', 0) != 1
trailing_not_triggered = getattr(order, 'trailing_stop_triggered', 0) != 1
if breakeven_not_triggered and trailing_not_triggered:
if order.side == OrderSide.LONG:
# 做多:止损应该低于开仓价,如果止损还在开仓价下方,则移动到开仓价
if order.stop_loss < order.filled_price:
order.stop_loss = order.filled_price
order.breakeven_triggered = 1
needs_update = True
stop_moved = True
stop_move_type = "breakeven"
new_stop_loss = order.stop_loss
logger.info(f"保本止损触发: {order.order_id} | {order.symbol} | 盈利 {current_pnl_percent:.2f}% >= {self.breakeven_threshold}% | 止损移至开仓价 ${order.filled_price:,.2f}")
else:
# 做空:止损应该高于开仓价,如果止损还在开仓价上方,则移动到开仓价
if order.stop_loss > order.filled_price:
order.stop_loss = order.filled_price
order.breakeven_triggered = 1
needs_update = True
stop_moved = True
stop_move_type = "breakeven"
new_stop_loss = order.stop_loss
logger.info(f"保本止损触发: {order.order_id} | {order.symbol} | 盈利 {current_pnl_percent:.2f}% >= {self.breakeven_threshold}% | 止损移至开仓价 ${order.filled_price:,.2f}")
# 如果有更新,持久化到数据库
if needs_update:
db = db_service.get_session()
try:
db_order = db.query(PaperOrder).filter(PaperOrder.order_id == order.order_id).first()
if db_order:
db_order.max_profit = order.max_profit
db_order.max_drawdown = order.max_drawdown
db_order.stop_loss = order.stop_loss
db_order.breakeven_triggered = getattr(order, 'breakeven_triggered', 0)
# 更新移动止损相关字段
if hasattr(order, 'trailing_stop_triggered'):
db_order.trailing_stop_triggered = order.trailing_stop_triggered
if hasattr(order, 'trailing_stop_base_profit'):
db_order.trailing_stop_base_profit = getattr(order, 'trailing_stop_base_profit', 0)
db.commit()
except Exception as e:
logger.error(f"更新订单极值失败: {e}")
db.rollback()
finally:
db.close()
# 如果触发了止损移动,返回通知信息
if stop_moved and new_stop_loss is not None:
return {
'event_type': 'stop_loss_moved',
'order_id': order.order_id,
'symbol': order.symbol,
'side': order.side.value,
'move_type': stop_move_type,
'filled_price': order.filled_price,
'new_stop_loss': new_stop_loss,
'current_pnl_percent': current_pnl_percent
}
return None
def close_order_manual(self, order_id: str, exit_price: float) -> Optional[Dict[str, Any]]:
"""手动平仓或取消挂单"""
if order_id not in self.active_orders:
logger.warning(f"订单不存在或已平仓: {order_id}")
return None
order = self.active_orders[order_id]
# 如果是挂单,取消而不是平仓
if order.status == OrderStatus.PENDING:
return self._cancel_pending_order(order)
return self._close_order(order, OrderStatus.CLOSED_MANUAL, exit_price)
def _cancel_pending_order(self, order: PaperOrder) -> Dict[str, Any]:
"""取消挂单"""
db = db_service.get_session()
try:
order.status = OrderStatus.CANCELLED
order.closed_at = datetime.utcnow()
db.merge(order)
db.commit()
# 从活跃订单缓存中移除
if order.order_id in self.active_orders:
del self.active_orders[order.order_id]
logger.info(f"挂单已取消: {order.order_id} | {order.symbol}")
return {
'order_id': order.order_id,
'symbol': order.symbol,
'side': order.side.value,
'status': 'cancelled',
'entry_price': order.entry_price,
'message': '挂单已取消'
}
except Exception as e:
logger.error(f"取消挂单失败: {e}")
db.rollback()
return None
finally:
db.close()
def _cancel_opposite_pending_orders(self, symbol: str, new_side: OrderSide) -> List[Dict[str, Any]]:
"""
取消同一交易对的反向挂单
Args:
symbol: 交易对
new_side: 新信号的方向
Returns:
被取消的订单信息列表
"""
# 找出所有反向挂单
opposite_side = OrderSide.SHORT if new_side == OrderSide.LONG else OrderSide.LONG
opposite_pending = [
order for order in self.active_orders.values()
if order.symbol == symbol
and order.side == opposite_side
and order.status == OrderStatus.PENDING
]
if not opposite_pending:
return []
cancelled_orders = []
for order in opposite_pending:
result = self._cancel_pending_order(order)
if result:
result['event_type'] = 'pending_cancelled'
result['new_side'] = new_side.value
result['reason'] = f'收到反向{new_side.value}信号,自动撤销'
cancelled_orders.append(result)
logger.info(f"自动取消反向挂单: {order.order_id} | {symbol} {opposite_side.value} @ ${order.entry_price:,.2f}")
if cancelled_orders:
logger.info(f"已取消 {len(cancelled_orders)} 个反向挂单({symbol} {opposite_side.value}),新信号方向: {new_side.value}")
return cancelled_orders
def _close_opposite_positions(self, symbol: str, new_side: OrderSide, signal_grade: str, current_price: float) -> int:
"""
智能平掉反向持仓(可选策略)
Args:
symbol: 交易对
new_side: 新信号的方向
signal_grade: 新信号等级
current_price: 当前价格
Returns:
平仓的订单数量
"""
# 找出所有反向持仓
opposite_side = OrderSide.SHORT if new_side == OrderSide.LONG else OrderSide.LONG
opposite_positions = [
order for order in self.active_orders.values()
if order.symbol == symbol
and order.side == opposite_side
and order.status == OrderStatus.OPEN
]
if not opposite_positions:
return 0
closed_count = 0
for order in opposite_positions:
# 计算当前盈亏
if order.side == OrderSide.LONG:
pnl_percent = ((current_price - order.filled_price) / order.filled_price) * 100
else:
pnl_percent = ((order.filled_price - current_price) / order.filled_price) * 100
# 智能决策:只在特定条件下平仓
should_close = False
reason = ""
# 条件1:A级信号 + 反向持仓亏损超过3%
if signal_grade == 'A' and pnl_percent < -3:
should_close = True
reason = f"A级反向信号且当前亏损{pnl_percent:.2f}%,提前止损"
# 条件2:A级信号 + 反向持仓盈利但接近止损(盈利<1%)
elif signal_grade == 'A' and 0 < pnl_percent < 1:
should_close = True
reason = f"A级反向信号且盈利较小({pnl_percent:.2f}%),落袋为安"
if should_close:
result = self._close_order(order, OrderStatus.CLOSED_MANUAL, current_price)
if result:
closed_count += 1
logger.info(f"自动平掉反向持仓: {order.order_id} | {symbol} {opposite_side.value} | 原因: {reason}")
if closed_count > 0:
logger.info(f"已平掉 {closed_count} 个反向持仓({symbol} {opposite_side.value})")
return closed_count
def get_active_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
"""获取活跃订单(从数据库读取,确保多进程同步)"""
db = db_service.get_session()
try:
query = db.query(PaperOrder).filter(
PaperOrder.status.in_([OrderStatus.PENDING, OrderStatus.OPEN])
)
if symbol:
query = query.filter(PaperOrder.symbol == symbol)
orders = query.all()
# 转换为字典
result = []
for order in orders:
try:
result.append(order.to_dict())
except Exception as e:
logger.error(f"转换订单 {order.order_id} 为字典失败: {e}")
import traceback
logger.error(traceback.format_exc())
return result
except Exception as e:
logger.error(f"获取活跃订单失败: {e}")
return []
finally:
db.close()
def get_order_by_id(self, order_id: str) -> Optional[Dict[str, Any]]:
"""根据ID获取订单"""
# 先从缓存查找
if order_id in self.active_orders:
try:
return self.active_orders[order_id].to_dict()
except Exception as e:
logger.error(f"转换订单 {order_id} 为字典失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
# 从数据库查找
db = db_service.get_session()
try:
order = db.query(PaperOrder).filter(PaperOrder.order_id == order_id).first()
return order.to_dict() if order else None
finally:
db.close()
def get_order_history(self, symbol: Optional[str] = None, limit: int = 100) -> List[Dict[str, Any]]:
"""获取历史订单"""
db = db_service.get_session()
try:
query = db.query(PaperOrder).filter(
PaperOrder.status.in_([
OrderStatus.CLOSED_TP,
OrderStatus.CLOSED_SL,
OrderStatus.CLOSED_BE,
OrderStatus.CLOSED_MANUAL
])
)
if symbol:
query = query.filter(PaperOrder.symbol == symbol)
orders = query.order_by(PaperOrder.closed_at.desc()).limit(limit).all()
return [o.to_dict() for o in orders]
finally:
db.close()
def calculate_statistics(self, symbol: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> Dict[str, Any]:
"""计算交易统计"""
db = db_service.get_session()
try:
query = db.query(PaperOrder).filter(
PaperOrder.status.in_([
OrderStatus.CLOSED_TP,
OrderStatus.CLOSED_SL,
OrderStatus.CLOSED_BE,
OrderStatus.CLOSED_MANUAL
])
)
if symbol:
query = query.filter(PaperOrder.symbol == symbol)
if start_date:
query = query.filter(PaperOrder.closed_at >= start_date)
if end_date:
query = query.filter(PaperOrder.closed_at <= end_date)
orders = query.all()
if not orders:
return self._empty_statistics()
# 计算各项指标
total_trades = len(orders)
winning_trades = len([o for o in orders if o.pnl_amount > 0])
losing_trades = len([o for o in orders if o.pnl_amount < 0])
total_pnl = sum(o.pnl_amount for o in orders)
total_pnl_percent = sum(o.pnl_percent for o in orders)
wins = [o.pnl_amount for o in orders if o.pnl_amount > 0]
losses = [abs(o.pnl_amount) for o in orders if o.pnl_amount < 0]
gross_profit = sum(wins) if wins else 0
gross_loss = sum(losses) if losses else 0
return {
'total_trades': total_trades,
'winning_trades': winning_trades,
'losing_trades': losing_trades,
'win_rate': round((winning_trades / total_trades * 100), 2) if total_trades > 0 else 0,
'total_pnl': round(total_pnl, 2),
'total_pnl_percent': round(total_pnl_percent, 2),
'average_pnl': round(total_pnl / total_trades, 2) if total_trades > 0 else 0,
'average_win': round(sum(wins) / len(wins), 2) if wins else 0,
'average_loss': round(sum(losses) / len(losses), 2) if losses else 0,
'profit_factor': round(gross_profit / gross_loss, 2) if gross_loss > 0 else float('inf'),
'max_drawdown': min(o.max_drawdown for o in orders) if orders else 0,
'best_trade': max(o.pnl_percent for o in orders) if orders else 0,
'worst_trade': min(o.pnl_percent for o in orders) if orders else 0,
'by_grade': self._calculate_grade_statistics(orders),
'by_type': self._calculate_type_statistics(orders),
'by_symbol': self._calculate_symbol_statistics(orders)
}
finally:
db.close()
def _empty_statistics(self) -> Dict[str, Any]:
"""返回空统计结构"""
return {
'total_trades': 0,
'winning_trades': 0,
'losing_trades': 0,
'win_rate': 0,
'total_pnl': 0,
'total_pnl_percent': 0,
'average_pnl': 0,
'average_win': 0,
'average_loss': 0,
'profit_factor': 0,
'max_drawdown': 0,
'best_trade': 0,
'worst_trade': 0,
'by_grade': {},
'by_type': {},
'by_symbol': {}
}
def get_account_status(self) -> Dict[str, Any]:
"""
获取账户状态
Returns:
账户状态信息,包括余额、已用保证金、可用保证金等
"""
# 计算已用保证金(每个活跃订单占用固定保证金)
active_count = len(self.active_orders)
used_margin = active_count * self.margin_per_order
# 计算已实现盈亏(从历史订单)
db = db_service.get_session()
try:
closed_orders = db.query(PaperOrder).filter(
PaperOrder.status.in_([
OrderStatus.CLOSED_TP,
OrderStatus.CLOSED_SL,
OrderStatus.CLOSED_BE,
OrderStatus.CLOSED_MANUAL
])
).all()
realized_pnl = sum(o.pnl_amount for o in closed_orders)
finally:
db.close()
# 计算当前余额
current_balance = self.initial_balance + realized_pnl
# 计算可用保证金
available_margin = current_balance - used_margin
# 计算可开仓数
available_orders = self.max_orders - active_count
return {
'initial_balance': self.initial_balance,
'realized_pnl': round(realized_pnl, 2),
'current_balance': round(current_balance, 2),
'used_margin': round(used_margin, 2),
'available_margin': round(available_margin, 2),
'leverage': self.leverage,
'margin_per_order': self.margin_per_order,
'active_orders': active_count,
'max_orders': self.max_orders,
'available_orders': available_orders,
'total_position_value': round(used_margin * self.leverage, 2),
'margin_ratio': round((used_margin / current_balance * 100), 2) if current_balance > 0 else 0
}
def _calculate_grade_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]:
"""按信号等级统计"""
result = {}
for grade in ['A', 'B', 'C', 'D']:
grade_orders = [o for o in orders if o.signal_grade and o.signal_grade.value == grade]
if grade_orders:
wins = len([o for o in grade_orders if o.pnl_amount > 0])
result[grade] = {
'count': len(grade_orders),
'win_rate': round(wins / len(grade_orders) * 100, 1),
'total_pnl': round(sum(o.pnl_amount for o in grade_orders), 2)
}
return result
def _calculate_type_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]:
"""按信号类型统计"""
result = {}
for signal_type in ['swing', 'short_term']:
type_orders = [o for o in orders if o.signal_type == signal_type]
if type_orders:
wins = len([o for o in type_orders if o.pnl_amount > 0])
result[signal_type] = {
'count': len(type_orders),
'win_rate': round(wins / len(type_orders) * 100, 1),
'total_pnl': round(sum(o.pnl_amount for o in type_orders), 2)
}
return result
def _calculate_symbol_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]:
"""按交易对统计"""
result = {}
symbols = set(o.symbol for o in orders)
for symbol in symbols:
symbol_orders = [o for o in orders if o.symbol == symbol]
if symbol_orders:
wins = len([o for o in symbol_orders if o.pnl_amount > 0])
result[symbol] = {
'count': len(symbol_orders),
'win_rate': round(wins / len(symbol_orders) * 100, 1),
'total_pnl': round(sum(o.pnl_amount for o in symbol_orders), 2)
}
return result
def get_period_statistics(self, hours: int = 4) -> Dict[str, Any]:
"""
获取指定时间段内的统计数据
Args:
hours: 统计时间段(小时)
Returns:
时间段内的统计数据
"""
db = db_service.get_session()
try:
cutoff_time = datetime.utcnow() - timedelta(hours=hours)
# 查询时间段内平仓的订单
closed_orders = db.query(PaperOrder).filter(
PaperOrder.status.in_([
OrderStatus.CLOSED_TP,
OrderStatus.CLOSED_SL,
OrderStatus.CLOSED_BE,
OrderStatus.CLOSED_MANUAL
]),
PaperOrder.closed_at >= cutoff_time
).all()
# 查询时间段内新开仓的订单(包括当前活跃的)
new_orders = db.query(PaperOrder).filter(
PaperOrder.created_at >= cutoff_time
).all()
# 计算时间段内的盈亏
period_pnl = sum(o.pnl_amount for o in closed_orders)
period_wins = len([o for o in closed_orders if o.pnl_amount > 0])
period_losses = len([o for o in closed_orders if o.pnl_amount < 0])
return {
'period_hours': hours,
'new_orders': len(new_orders),
'closed_orders': len(closed_orders),
'period_pnl': round(period_pnl, 2),
'period_wins': period_wins,
'period_losses': period_losses,
'period_win_rate': round(period_wins / len(closed_orders) * 100, 1) if closed_orders else 0
}
finally:
db.close()
def generate_report(self, hours: int = 4) -> str:
"""
生成模拟交易报告
Args:
hours: 报告时间段(小时)
Returns:
格式化的报告文本
"""
# 获取总体统计
total_stats = self.calculate_statistics()
# 获取时间段统计
period_stats = self.get_period_statistics(hours)
# 获取当前活跃订单
active_orders = self.get_active_orders()
# 构建报告
lines = [
f"📊 模拟交易 {hours} 小时报告",
"",
"━━━━━━ 总体情况 ━━━━━━",
f"总交易数: {total_stats['total_trades']} | 胜率: {total_stats['win_rate']}%",
f"总盈亏: ${total_stats['total_pnl']:+.2f}",
"",
f"━━━━━━ 过去 {hours} 小时 ━━━━━━",
f"新订单: {period_stats['new_orders']} | 已平仓: {period_stats['closed_orders']}",
f"本期盈亏: ${period_stats['period_pnl']:+.2f}",
]
# 当前持仓
open_orders = [o for o in active_orders if o.get('status') == 'open']
pending_orders = [o for o in active_orders if o.get('status') == 'pending']
if open_orders or pending_orders:
lines.append("")
lines.append("━━━━━━ 当前订单 ━━━━━━")
for order in open_orders[:5]: # 最多显示5个
side_text = "做多" if order.get('side') == 'long' else "做空"
entry_price = order.get('filled_price') or order.get('entry_price', 0)
lines.append(f"✅ {order.get('symbol')} {side_text} @ ${entry_price:,.0f}")
for order in pending_orders[:3]: # 最多显示3个挂单
side_text = "做多" if order.get('side') == 'long' else "做空"
lines.append(f"⏳ {order.get('symbol')} {side_text} 挂单 @ ${order.get('entry_price', 0):,.0f}")
if len(open_orders) > 5:
lines.append(f"... 还有 {len(open_orders) - 5} 个持仓")
if len(pending_orders) > 3:
lines.append(f"... 还有 {len(pending_orders) - 3} 个挂单")
# 按等级统计
by_grade = total_stats.get('by_grade', {})
if by_grade:
lines.append("")
lines.append("━━━━━━ 按等级统计 ━━━━━━")
for grade in ['A', 'B', 'C']:
if grade in by_grade:
g = by_grade[grade]
pnl_sign = "+" if g['total_pnl'] >= 0 else ""
lines.append(f"{grade}级: {g['count']}笔 | 胜率{g['win_rate']}% | {pnl_sign}${g['total_pnl']:.0f}")
lines.append("")
lines.append(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
return "\n".join(lines)
def reset_all_data(self) -> Dict[str, Any]:
"""
重置所有模拟交易数据
Returns:
重置结果,包含删除的订单数量
"""
db = db_service.get_session()
try:
# 统计删除前的数量
total_count = db.query(PaperOrder).count()
active_count = len(self.active_orders)
# 删除所有订单(包括活跃和历史订单)
deleted = db.query(PaperOrder).delete(synchronize_session='fetch')
db.commit()
# 清空内存缓存
self.active_orders.clear()
logger.info(f"模拟交易数据已重置,删除 {deleted} 条订单(总计 {total_count} 条)")
return {
'deleted_count': deleted,
'active_orders_cleared': active_count
}
except Exception as e:
db.rollback()
logger.error(f"重置模拟交易数据失败: {e}")
raise
finally:
db.close()
# 全局单例
_paper_trading_service: Optional[PaperTradingService] = None
def get_paper_trading_service() -> PaperTradingService:
"""获取模拟交易服务单例"""
global _paper_trading_service
if _paper_trading_service is None:
_paper_trading_service = PaperTradingService()
return _paper_trading_service