修复移动止损的 bug

This commit is contained in:
aaron 2026-02-23 21:53:03 +08:00
parent b95040a282
commit 4d3472758e
9 changed files with 884 additions and 793 deletions

View File

@ -62,7 +62,8 @@ class BitgetTradingAPI:
# ==================== 订单操作 ==================== # ==================== 订单操作 ====================
def place_order(self, symbol: str, side: str, order_type: str, def place_order(self, symbol: str, side: str, order_type: str,
size: float, price: float = None, client_order_id: str = None) -> Optional[Dict]: size: float, price: float = None, client_order_id: str = None,
stop_loss: float = None, take_profit: float = None) -> Optional[Dict]:
""" """
下单 下单
@ -73,6 +74,8 @@ class BitgetTradingAPI:
size: 数量张数 size: 数量张数
price: 价格限价单必需 price: 价格限价单必需
client_order_id: 自定义订单ID client_order_id: 自定义订单ID
stop_loss: 止损价格可选
take_profit: 止盈价格可选
Returns: Returns:
订单信息 订单信息
@ -82,31 +85,34 @@ class BitgetTradingAPI:
ccxt_symbol = self._standardize_symbol(symbol) ccxt_symbol = self._standardize_symbol(symbol)
# 构建订单参数 # 构建订单参数
params = {} # 单向持仓模式 + 联合保证金模式
params = {
'tdMode': 'cross', # 联合保证金模式(全仓)
'marginCoin': 'USDT', # 保证金币种
}
if client_order_id: if client_order_id:
params['clientOrderId'] = client_order_id params['clientOrderId'] = client_order_id
# 下单 # 添加止损止盈参数
if order_type == 'market': if stop_loss:
order = self.exchange.create_market_order( params['stopLoss'] = str(stop_loss)
symbol=ccxt_symbol, if take_profit:
side=side, params['takeProfit'] = str(take_profit)
amount=size,
params=params # 下单 - 直接使用通用 create_order 方法
) order = self.exchange.create_order(
else: # limit symbol=ccxt_symbol,
if not price: type=order_type,
logger.error("限价单必须指定价格") side=side,
return None amount=size,
order = self.exchange.create_limit_order( price=price,
symbol=ccxt_symbol, params=params
side=side, )
amount=size,
price=price,
params=params
)
logger.info(f"✅ 下单成功: {symbol} {side} {size}张 @ {price or '市价'}") logger.info(f"✅ 下单成功: {symbol} {side} {size}张 @ {price or '市价'}")
if stop_loss or take_profit:
logger.info(f" 止损: {stop_loss}, 止盈: {take_profit}")
logger.debug(f"订单详情: {order}") logger.debug(f"订单详情: {order}")
return order return order
@ -174,14 +180,17 @@ class BitgetTradingAPI:
logger.error(f"❌ 撤销所有挂单异常: {e}") logger.error(f"❌ 撤销所有挂单异常: {e}")
return False return False
def close_position(self, symbol: str, side: str, size: float = None, def close_position(self, symbol: str, side: str = None, size: float = None,
price: float = None) -> Optional[Dict]: price: float = None) -> Optional[Dict]:
""" """
平仓 平仓双向持仓模式
Args: Args:
symbol: 交易对 symbol: 交易对
side: 平仓方向 (buy=平空仓/sell=平多仓) side: 平仓方向可选不传则自动判断
- None: 自动判断查询持仓后决定
- 'buy': 平空仓
- 'sell': 平多仓
size: 平仓数量不传则全部平仓 size: 平仓数量不传则全部平仓
price: 平仓价格不传则市价 price: 平仓价格不传则市价
@ -197,16 +206,31 @@ class BitgetTradingAPI:
logger.warning(f"没有找到 {symbol} 的持仓") logger.warning(f"没有找到 {symbol} 的持仓")
return None return None
position = positions[0] # 查找有持仓的仓位
current_size = abs(float(position.get('contracts', 0))) position = None
for pos in positions:
contracts = float(pos.get('contracts', 0))
if contracts != 0:
position = pos
break
if current_size == 0: if not position:
logger.warning(f"{symbol} 持仓数量为 0无需平仓") logger.warning(f"{symbol} 持仓数量为 0无需平仓")
return None return None
current_size = abs(float(position.get('contracts', 0)))
pos_side = position.get('side') # 'long' or 'short'
# 如果没有指定平仓方向,根据持仓方向自动判断
if side is None:
# 多仓用 sell 平,空仓用 buy 平
side = 'sell' if pos_side == 'long' else 'buy'
# 如果没有指定平仓数量,则全部平仓 # 如果没有指定平仓数量,则全部平仓
close_size = size if size else current_size close_size = size if size else current_size
logger.info(f"平仓: {symbol} 持仓方向={pos_side}, 平仓方向={side}, 数量={close_size}")
# 执行平仓 # 执行平仓
order_type = 'limit' if price else 'market' order_type = 'limit' if price else 'market'
order = self.place_order( order = self.place_order(
@ -227,6 +251,183 @@ class BitgetTradingAPI:
logger.error(f"❌ 平仓异常: {e}") logger.error(f"❌ 平仓异常: {e}")
return None return None
def set_trailing_stop(self, symbol: str, callback_rate: float = None,
activation_price: float = None) -> bool:
"""
设置移动止损
Args:
symbol: 交易对
callback_rate: 回调比例 0.01 表示 1%
activation_price: 激活价格
Returns:
是否成功
"""
try:
ccxt_symbol = self._standardize_symbol(symbol)
# 获取当前持仓
positions = self.get_position(symbol)
if not positions:
logger.warning(f"没有找到 {symbol} 的持仓")
return False
position = positions[0]
current_size = float(position.get('contracts', 0))
if current_size == 0:
logger.warning(f"{symbol} 持仓数量为 0")
return False
# 使用 CCXT 的私人 API 设置移动止损
# Bitget 需要通过私人API调用
params = {
'symbol': ccxt_symbol,
'trailingStopCallbackRate': callback_rate,
}
if activation_price:
params['trailingStopActivationPrice'] = activation_price
self.exchange.private_mix_post_modify_contract_trailing_stop(params)
logger.info(f"✅ 设置移动止损成功: {symbol} callback={callback_rate}")
return True
except ccxt.BaseError as e:
logger.error(f"❌ 设置移动止损失败: {e}")
return False
except Exception as e:
logger.error(f"❌ 设置移动止损异常: {e}")
return False
def modify_sl_tp(self, symbol: str, stop_loss: float = None,
take_profit: float = None) -> bool:
"""
修改持仓的止损止盈
注意Bitget 的止损止盈需要在开仓时设置或者通过下独立的止损/止盈计划订单
对于单向持仓模式我们使用计划订单来实现
Args:
symbol: 交易对
stop_loss: 止损价格
take_profit: 止盈价格
Returns:
是否成功
"""
try:
ccxt_symbol = self._standardize_symbol(symbol)
# 获取当前持仓
positions = self.get_position(symbol)
if not positions:
logger.warning(f"没有找到 {symbol} 的持仓")
return False
# 查找有持仓的仓位
position = None
for pos in positions:
if float(pos.get('contracts', 0)) != 0:
position = pos
break
if not position:
logger.warning(f"{symbol} 持仓数量为 0")
return False
contracts = float(position.get('contracts', 0))
pos_side = position.get('side')
mark_price = float(position.get('markPrice', 0))
logger.info(f"当前持仓: {symbol} {pos_side} {contracts}张, 标记价={mark_price}")
# 验证价格
if pos_side == 'long':
if stop_loss and stop_loss >= mark_price:
logger.error(f"做多止损必须低于当前价格: SL={stop_loss}, Mark={mark_price}")
return False
if take_profit and take_profit <= mark_price:
logger.error(f"做多止盈必须高于当前价格: TP={take_profit}, Mark={mark_price}")
return False
else:
if stop_loss and stop_loss <= mark_price:
logger.error(f"做空止损必须高于当前价格: SL={stop_loss}, Mark={mark_price}")
return False
if take_profit and take_profit >= mark_price:
logger.error(f"做空止盈必须低于当前价格: TP={take_profit}, Mark={mark_price}")
return False
# 使用独立的止损/止盈计划订单
# 注意:这种方式需要在平仓时也取消这些计划订单
orders_created = []
# 止损单
if stop_loss:
sl_side = 'sell' if pos_side == 'long' else 'buy'
try:
# 使用普通的 create_order 创建止损市价单
sl_order = self.exchange.create_order(
symbol=ccxt_symbol,
type='stop_market',
side=sl_side,
amount=contracts,
price=None,
params={
'stopPrice': stop_loss,
'triggerBy': 'mark_price',
'tdMode': 'cross',
'marginCoin': 'USDT',
'reduceOnly': True, # 只平仓
}
)
orders_created.append(('止损', sl_order))
logger.info(f"✅ 止损单已下: {sl_side} {contracts}张 @ ${stop_loss}")
except Exception as e:
logger.warning(f"下止损单失败: {e}")
# 止盈单
if take_profit:
tp_side = 'sell' if pos_side == 'long' else 'buy'
try:
# 使用普通的 create_order 创建止盈限价单
tp_order = self.exchange.create_order(
symbol=ccxt_symbol,
type='limit',
side=tp_side,
amount=contracts,
price=take_profit,
params={
'tdMode': 'cross',
'marginCoin': 'USDT',
'reduceOnly': True, # 只平仓
}
)
orders_created.append(('止盈', tp_order))
logger.info(f"✅ 止盈单已下: {tp_side} {contracts}张 @ ${take_profit}")
except Exception as e:
logger.warning(f"下止盈单失败: {e}")
if orders_created:
logger.info(f"✅ 止损止盈设置完成: SL={stop_loss}, TP={take_profit}")
logger.info(f" 注意: 止损止盈以独立订单形式存在,平仓时需要同时取消")
return True
else:
logger.warning(f"⚠️ 未能成功下任何止损止盈单")
return False
except ccxt.BaseError as e:
logger.error(f"❌ 修改止损止盈失败: {e}")
return False
except Exception as e:
logger.error(f"❌ 修改止损止盈异常: {e}")
import traceback
logger.debug(traceback.format_exc())
return False
# ==================== 查询操作 ==================== # ==================== 查询操作 ====================
def get_order(self, symbol: str, order_id: str = None, client_order_id: str = None) -> Optional[Dict]: def get_order(self, symbol: str, order_id: str = None, client_order_id: str = None) -> Optional[Dict]:

View File

@ -718,43 +718,36 @@ class PaperTradingService:
elif getattr(order, 'trailing_stop_triggered', 0) == 1: elif getattr(order, 'trailing_stop_triggered', 0) == 1:
# 已触发过移动止损,持续跟随 # 已触发过移动止损,持续跟随
base_profit = getattr(order, 'trailing_stop_base_profit', 0) # 做空:止损只能下移(降低价格),做多止损只能上移(提高价格)
additional_profit = current_pnl_percent - base_profit # 计算新的止损盈利 = 当前总盈利 × 跟随比例
new_stop_profit = current_pnl_percent * self._get_dynamic_ratio(order, current_price)
if additional_profit > 0: if order.side == OrderSide.LONG:
# 有额外盈利,移动止损以锁定更多利润 # 做多:新止损价 = 开仓价 × (1 + 新止损盈利%)
if order.side == OrderSide.LONG: new_stop_loss = order.filled_price * (1 + new_stop_profit / 100)
# 做多:当前止损对应的盈利
current_stop_profit = ((order.stop_loss - order.filled_price) / order.filled_price) * 100
# 新的止损盈利 = 基准 + 额外盈利 * 跟随比例
new_stop_profit = base_profit * self._get_dynamic_ratio(order, current_price)
new_stop_loss = order.filled_price * (1 + new_stop_profit / 100)
# 只有当新止损高于当前止损时才更新(做多止损只能上移) # 只有当新止损高于当前止损时才更新
if new_stop_loss > order.stop_loss: if new_stop_loss > order.stop_loss:
old_stop = order.stop_loss old_stop = order.stop_loss
order.stop_loss = new_stop_loss order.stop_loss = new_stop_loss
needs_update = True needs_update = True
stop_moved = True stop_moved = True
stop_move_type = "trailing_update" stop_move_type = "trailing_update"
logger.info(f"移动止损更新: {order.order_id} | {order.symbol} | " logger.info(f"移动止损更新: {order.order_id} | {order.symbol} | "
f"盈利 {current_pnl_percent:.2f}% | 止损 ${old_stop:,.2f} -> ${new_stop_loss:,.2f}") f"盈利 {current_pnl_percent:.2f}% | 锁定 {new_stop_profit:.2f}% | 止损 ${old_stop:,.2f} -> ${new_stop_loss:,.2f}")
else: else:
# 做空:当前止损对应的盈利 # 做空:新止损价 = 开仓价 × (1 - 新止损盈利%)
current_stop_profit = ((order.filled_price - order.stop_loss) / order.filled_price) * 100 new_stop_loss = order.filled_price * (1 - new_stop_profit / 100)
# 新的止损盈利 = 基准 + 额外盈利 * 跟随比例
new_stop_profit = base_profit * self._get_dynamic_ratio(order, current_price)
new_stop_loss = order.filled_price * (1 - new_stop_profit / 100)
# 只有当新止损低于当前止损时才更新(做空止损只能下移) # 只有当新止损低于当前止损时才更新
if new_stop_loss < order.stop_loss: if new_stop_loss < order.stop_loss:
old_stop = order.stop_loss old_stop = order.stop_loss
order.stop_loss = new_stop_loss order.stop_loss = new_stop_loss
needs_update = True needs_update = True
stop_moved = True stop_moved = True
stop_move_type = "trailing_update" stop_move_type = "trailing_update"
logger.info(f"移动止损更新: {order.order_id} | {order.symbol} | " logger.info(f"移动止损更新: {order.order_id} | {order.symbol} | "
f"盈利 {current_pnl_percent:.2f}% | 止损 ${old_stop:,.2f} -> ${new_stop_loss:,.2f}") f"盈利 {current_pnl_percent:.2f}% | 锁定 {new_stop_profit:.2f}% | 止损 ${old_stop:,.2f} -> ${new_stop_loss:,.2f}")
# === 保本止损逻辑(仅在未触发移动止损时生效) === # === 保本止损逻辑(仅在未触发移动止损时生效) ===
if self.breakeven_threshold > 0 and current_pnl_percent >= self.breakeven_threshold: if self.breakeven_threshold > 0 and current_pnl_percent >= self.breakeven_threshold:

View File

@ -1,147 +0,0 @@
#!/usr/bin/env python3
"""
测试 Bitget 集成功能
验证切换到 Bitget 后的各项功能是否正常
"""
import sys
import os
from pathlib import Path
# 添加项目路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "backend"))
from app.services.bitget_service import bitget_service
def test_basic_functions():
"""测试基本功能"""
print("\n" + "=" * 80)
print("测试 Bitget 基本功能")
print("=" * 80)
symbol = "BTCUSDT"
# 1. 测试获取当前价格
print(f"\n1. 获取当前价格 ({symbol})...")
price = bitget_service.get_current_price(symbol)
if price:
print(f" ✅ 当前价格: ${price:,.2f}")
else:
print(f" ❌ 获取失败")
return False
# 2. 测试获取 K线数据
print(f"\n2. 获取 K线数据 ({symbol} 5m)...")
klines = bitget_service.get_klines(symbol, '5m', limit=10)
if not klines.empty:
print(f" ✅ 获取 {len(klines)} 根 K线")
print(f" 最新: 开${klines.iloc[-1]['open']:,.2f} "
f"高${klines.iloc[-1]['high']:,.2f} "
f"低${klines.iloc[-1]['low']:,.2f} "
f"收${klines.iloc[-1]['close']:,.2f}")
else:
print(f" ❌ 获取失败")
return False
# 3. 测试获取多周期数据
print(f"\n3. 获取多周期数据 ({symbol})...")
multi_data = bitget_service.get_multi_timeframe_data(symbol)
if multi_data:
print(f" ✅ 获取成功")
for interval, df in multi_data.items():
if not df.empty:
print(f" {interval}: {len(df)}根, 最新价 ${df.iloc[-1]['close']:,.2f}")
else:
print(f" ❌ 获取失败")
return False
# 4. 测试获取资金费率
print(f"\n4. 获取资金费率 ({symbol})...")
funding = bitget_service.get_funding_rate(symbol)
if funding:
print(f" ✅ 资金费率: {funding['funding_rate_percent']:.4f}%")
print(f" 标记价格: ${funding['mark_price']:,.2f}")
print(f" 指数价格: ${funding['index_price']:,.2f}")
print(f" 市场情绪: {funding['sentiment']}")
else:
print(f" ❌ 获取失败")
return False
# 5. 测试获取合约市场数据
print(f"\n5. 获取合约市场数据 ({symbol})...")
futures_data = bitget_service.get_futures_market_data(symbol)
if futures_data:
print(f" ✅ 获取成功")
print(f" 溢价率: {futures_data['premium_rate']:.2f}%")
print(f" 市场情绪: {futures_data['market_sentiment']}")
else:
print(f" ❌ 获取失败")
return False
# 6. 测试格式化数据供 LLM
print(f"\n6. 格式化合约数据供 LLM...")
formatted = bitget_service.format_futures_data_for_llm(symbol, futures_data)
if formatted:
print(f" ✅ 格式化成功")
print(f" 预览:\n{formatted[:200]}...")
else:
print(f" ❌ 格式化失败")
return False
return True
def test_multiple_symbols():
"""测试多个交易对"""
print("\n" + "=" * 80)
print("测试多个交易对")
print("=" * 80)
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
for symbol in symbols:
print(f"\n{symbol}:")
price = bitget_service.get_current_price(symbol)
if price:
print(f" ✅ ${price:,.2f}")
else:
print(f" ❌ 获取失败")
def main():
"""主函数"""
print("\n" + "🚀" * 40)
print("\nBitget 集成功能测试")
print(f"测试时间: {pd.Timestamp.now()}")
try:
# 基本功能测试
if not test_basic_functions():
print("\n❌ 基本功能测试失败")
return
# 多交易对测试
test_multiple_symbols()
print("\n" + "=" * 80)
print(" ✅ 所有测试通过!")
print("=" * 80)
print("\nBitget 已成功集成,可以正常使用!")
print("\n切换总结:")
print(" ✅ crypto_agent.py -> 使用 Bitget")
print(" ✅ llm_signal_analyzer.py -> 使用 Bitget")
print(" ✅ paper_trading_service.py -> 使用 Bitget")
print(" ✅ main.py -> 使用 Bitget")
print(" ✅ api/paper_trading.py -> 使用 Bitget")
except Exception as e:
print(f"\n❌ 测试出错: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
import pandas as pd
main()

View File

@ -1,170 +0,0 @@
"""
测试 Bitget CCXT SDK 实现
在运行之前请确保
1. 已安装 ccxt: pip install ccxt
2. 已在 .env 文件中配置 Bitget API 密钥
3. 使用测试网进行测试BITGET_USE_TESTNET=true
"""
import sys
import os
from pathlib import Path
# 添加项目路径
script_dir = Path(__file__).parent
project_root = script_dir.parent
backend_dir = project_root / "backend"
sys.path.insert(0, str(backend_dir))
sys.path.insert(0, str(project_root))
# 设置工作目录
os.chdir(project_root)
from app.services.bitget_trading_api_sdk import BitgetTradingAPI
from app.config import get_settings
from app.utils.logger import logger
def test_connection():
"""测试 API 连接"""
logger.info("=" * 60)
logger.info("测试 1: API 连接测试")
logger.info("=" * 60)
settings = get_settings()
api = BitgetTradingAPI(
api_key=settings.bitget_api_key,
api_secret=settings.bitget_api_secret,
passphrase=settings.bitget_passphrase,
use_testnet=settings.bitget_use_testnet
)
result = api.test_connection()
logger.info(f"连接测试结果: {'✅ 成功' if result else '❌ 失败'}")
return api if result else None
def test_get_balance(api):
"""测试查询余额"""
logger.info("\n" + "=" * 60)
logger.info("测试 2: 查询账户余额")
logger.info("=" * 60)
balance = api.get_balance()
if balance:
logger.info(f"✅ 余额查询成功")
for currency, info in balance.items():
available = info.get('available', '0')
frozen = info.get('frozen', '0')
logger.info(f" {currency}: 可用={available}, 冻结={frozen}")
else:
logger.error("❌ 余额查询失败")
def test_get_position(api):
"""测试查询持仓"""
logger.info("\n" + "=" * 60)
logger.info("测试 3: 查询持仓")
logger.info("=" * 60)
positions = api.get_position()
if positions:
logger.info(f"✅ 持仓查询成功,共 {len(positions)} 个持仓")
for pos in positions:
symbol = pos.get('symbol', 'N/A')
size = pos.get('contracts', 0)
side = pos.get('side', 'N/A')
unrealized_pnl = pos.get('unrealizedPnl', 0)
logger.info(f" {symbol} {side} {size}张 (未实现盈亏: {unrealized_pnl})")
else:
logger.info(" 当前无持仓")
def test_get_open_orders(api):
"""测试查询挂单"""
logger.info("\n" + "=" * 60)
logger.info("测试 4: 查询当前挂单")
logger.info("=" * 60)
orders = api.get_open_orders()
if orders:
logger.info(f"✅ 挂单查询成功,共 {len(orders)} 个挂单")
for order in orders[:5]: # 只显示前 5 个
symbol = order.get('symbol', 'N/A')
side = order.get('side', 'N/A')
price = order.get('price', 'N/A')
amount = order.get('amount', 'N/A')
logger.info(f" {symbol} {side} @ {price} x {amount}")
else:
logger.info(" 当前无挂单")
def test_market_ticker(api, symbol='BTC/USDT:USDT'):
"""测试获取市场行情"""
logger.info("\n" + "=" * 60)
logger.info(f"测试 5: 获取 {symbol} 市场行情")
logger.info("=" * 60)
try:
ticker = api.exchange.fetch_ticker(symbol)
if ticker:
logger.info(f"✅ 行情获取成功")
logger.info(f" 交易对: {symbol}")
logger.info(f" 最新价: {ticker.get('last', 'N/A')}")
logger.info(f" 24h涨跌: {ticker.get('percentage', 'N/A')}%")
logger.info(f" 24h成交量: {ticker.get('baseVolume', 'N/A')}")
except Exception as e:
logger.error(f"❌ 行情获取失败: {e}")
def main():
"""主测试函数"""
logger.info("🚀 开始测试 Bitget CCXT SDK 实现")
# 检查配置
settings = get_settings()
if not settings.bitget_api_key or not settings.bitget_api_secret:
logger.error("❌ 未配置 Bitget API 密钥,请在 .env 文件中设置:")
logger.error(" BITGET_API_KEY=your_api_key")
logger.error(" BITGET_API_SECRET=your_api_secret")
return
# 测试连接
api = test_connection()
if not api:
logger.error("❌ API 连接失败,请检查:")
logger.error(" 1. API 密钥是否正确")
logger.error(" 2. 网络连接是否正常")
logger.error(" 3. 测试网是否可用")
return
# 运行测试
try:
test_get_balance(api)
test_get_position(api)
test_get_open_orders(api)
test_market_ticker(api)
logger.info("\n" + "=" * 60)
logger.info("✅ 所有测试完成")
logger.info("=" * 60)
except Exception as e:
logger.error(f"❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
finally:
# 关闭连接
api.close()
logger.info("\n🔌 API 连接已关闭")
if __name__ == "__main__":
main()

View File

@ -1,133 +0,0 @@
"""
测试 Bitget V2 API
"""
import sys
sys.path.insert(0, '/Users/aaron/source_code/Stock_Agent')
from backend.app.services.bitget_trading_api import get_bitget_trading_api
from backend.app.utils.logger import logger
def test_bitget_v2():
"""测试 Bitget V2 API 各个功能"""
print("\n" + "="*60)
print("测试 Bitget V2 API")
print("="*60)
# 获取 API 实例
api = get_bitget_trading_api()
if not api:
print("❌ 无法初始化 Bitget API请检查 .env 配置)")
return False
print(f"\n📡 API 端点: {api.base_url}")
print(f"🔑 API Key: {api.api_key[:10]}...{api.api_key[-4:]}")
# 测试 1: 连接测试和账户余额
print("\n" + "-"*60)
print("测试 1: 账户余额查询")
print("-"*60)
try:
balance = api.get_balance()
if balance:
usdt = balance.get('USDT', {})
print(f"✅ USDT 可用余额: {usdt.get('available', '0')}")
print(f" 冻结: {usdt.get('frozen', '0')}")
print(f" 锁定: {usdt.get('locked', '0')}")
else:
print("⚠️ 余额查询返回空数据")
except Exception as e:
print(f"❌ 余额查询失败: {e}")
return False
# 测试 2: 账户信息
print("\n" + "-"*60)
print("测试 2: 账户信息查询")
print("-"*60)
try:
account_info = api.get_account_info()
if account_info:
print(f"✅ 账户信息: {list(account_info.keys())}")
else:
print("⚠️ 账户信息查询返回空数据")
except Exception as e:
print(f"❌ 账户信息查询失败: {e}")
# 测试 3: 查询持仓
print("\n" + "-"*60)
print("测试 3: 持仓查询")
print("-"*60)
try:
positions = api.get_position()
if positions:
print(f"✅ 当前持仓数: {len(positions)}")
for pos in positions:
symbol = pos.get('symbol', 'N/A')
hold_side = pos.get('holdSide', 'N/A')
total = pos.get('total', '0')
leverage = pos.get('leverage', 'N/A')
print(f" {symbol} {hold_side}: {total} 张 (杠杆 {leverage}x)")
else:
print("✅ 当前无持仓")
except Exception as e:
print(f"❌ 持仓查询失败: {e}")
return False
# 测试 4: 查询当前挂单
print("\n" + "-"*60)
print("测试 4: 当前挂单查询")
print("-"*60)
try:
open_orders = api.get_open_orders()
if open_orders:
print(f"✅ 当前挂单数: {len(open_orders)}")
for order in open_orders[:5]: # 只显示前5个
symbol = order.get('symbol', 'N/A')
side = order.get('side', 'N/A')
size = order.get('size', '0')
price = order.get('price', '市价')
print(f" {symbol} {side} {size} @ {price}")
if len(open_orders) > 5:
print(f" ... 还有 {len(open_orders) - 5} 个挂单")
else:
print("✅ 当前无挂单")
except Exception as e:
print(f"❌ 挂单查询失败: {e}")
return False
# 测试 5: 查询历史订单
print("\n" + "-"*60)
print("测试 5: 历史订单查询 (BTCUSDT)")
print("-"*60)
try:
history_orders = api.get_history_orders('BTCUSDT', limit=10)
if history_orders:
print(f"✅ 最近 {len(history_orders)} 条历史订单")
for order in history_orders[:3]: # 只显示前3个
order_id = order.get('orderId', 'N/A')
side = order.get('side', 'N/A')
order_type = order.get('orderType', 'N/A')
state = order.get('state', 'N/A')
print(f" 订单 {order_id}: {side} {order_type} - {state}")
else:
print("✅ 暂无历史订单")
except Exception as e:
print(f"❌ 历史订单查询失败: {e}")
print("\n" + "="*60)
print("✅ 所有核心功能测试完成V2 API 工作正常")
print("="*60 + "\n")
return True
if __name__ == "__main__":
success = test_bitget_v2()
sys.exit(0 if success else 1)

View File

@ -1,282 +0,0 @@
#!/usr/bin/env python3
"""
Bitget vs Binance 数据对比测试脚本
测试内容:
1. K线数据对比
2. 当前价格对比
3. 资金费率对比
4. 多周期数据对比
5. 技术指标计算对比
"""
import sys
import os
from pathlib import Path
# 添加项目路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "backend"))
import pandas as pd
from app.services.binance_service import binance_service
from app.services.bitget_service import bitget_service
def print_section(title: str):
"""打印分节标题"""
print("\n" + "=" * 80)
print(f" {title}")
print("=" * 80)
def print_comparison(label: str, binance_value, bitget_value, tolerance: float = 0.01):
"""打印对比结果"""
# 计算差异百分比
if binance_value and bitget_value:
if isinstance(binance_value, (int, float)) and isinstance(bitget_value, (int, float)):
diff_percent = abs(binance_value - bitget_value) / binance_value * 100 if binance_value != 0 else 0
match = "" if diff_percent <= tolerance else ""
print(f"{match} {label}")
print(f" Binance: {binance_value}")
print(f" Bitget: {bitget_value}")
print(f" 差异: {diff_percent:.2f}%")
else:
print(f"{label}")
print(f" Binance: {binance_value}")
print(f" Bitget: {bitget_value}")
else:
print(f"{label}")
print(f" Binance: {binance_value}")
print(f" Bitget: {bitget_value}")
def test_kline_data(symbol: str = "BTCUSDT"):
"""测试 K线数据"""
print_section("1. K线数据对比")
interval = '5m'
limit = 100
print(f"\n获取 {symbol} {interval} K线数据{limit}根)...")
# 获取数据
binance_df = binance_service.get_klines(symbol, interval, limit)
bitget_df = bitget_service.get_klines(symbol, interval, limit)
if binance_df.empty:
print("❌ Binance 数据为空")
return
if bitget_df.empty:
print("❌ Bitget 数据为空")
return
print(f"\n✅ Binance 获取 {len(binance_df)} 根 K线")
print(f"✅ Bitget 获取 {len(bitget_df)} 根 K线")
# 对比最新一根K线
print("\n最新K线对比:")
b_latest = binance_df.iloc[-1]
g_latest = bitget_df.iloc[-1]
print_comparison("开盘价", float(b_latest['open']), float(g_latest['open']), tolerance=0.1)
print_comparison("最高价", float(b_latest['high']), float(g_latest['high']), tolerance=0.1)
print_comparison("最低价", float(b_latest['low']), float(g_latest['low']), tolerance=0.1)
print_comparison("收盘价", float(b_latest['close']), float(g_latest['close']), tolerance=0.1)
print_comparison("成交量", float(b_latest['volume']), float(g_latest['volume']), tolerance=1.0)
# 对比时间戳
print(f"\n时间对比:")
print(f" Binance: {b_latest['open_time']}")
print(f" Bitget: {g_latest['open_time']}")
# 检查数据结构
print("\n数据结构对比:")
print(f" Binance 列: {list(binance_df.columns)}")
print(f" Bitget 列: {list(bitget_df.columns)}")
def test_current_price(symbol: str = "BTCUSDT"):
"""测试当前价格"""
print_section("2. 当前价格对比")
print(f"\n获取 {symbol} 当前价格...")
binance_price = binance_service.get_current_price(symbol)
bitget_price = bitget_service.get_current_price(symbol)
print_comparison("当前价格", binance_price, bitget_price, tolerance=0.05)
def test_funding_rate(symbol: str = "BTCUSDT"):
"""测试资金费率"""
print_section("3. 资金费率对比")
print(f"\n获取 {symbol} 资金费率...")
binance_fr = binance_service.get_funding_rate(symbol)
bitget_fr = bitget_service.get_funding_rate(symbol)
if not binance_fr:
print("❌ Binance 资金费率数据为空")
return
if not bitget_fr:
print("❌ Bitget 资金费率数据为空")
return
print("\n资金费率对比:")
# 对比资金费率
fr_diff = abs(binance_fr['funding_rate'] - bitget_fr['funding_rate'])
fr_match = "" if fr_diff < 0.0001 else ""
print(f"{fr_match} 资金费率")
print(f" Binance: {binance_fr['funding_rate']:.6f} ({binance_fr['funding_rate_percent']:.4f}%)")
print(f" Bitget: {bitget_fr['funding_rate']:.6f} ({bitget_fr['funding_rate_percent']:.4f}%)")
print(f" 差异: {fr_diff:.6f}")
# 对比标记价格
print_comparison("标记价格", binance_fr.get('mark_price'), bitget_fr.get('mark_price'), tolerance=0.1)
# 对比指数价格
print_comparison("指数价格", binance_fr.get('index_price'), bitget_fr.get('index_price'), tolerance=0.1)
# 对比市场情绪
print(f"\n市场情绪:")
print(f" Binance: {binance_fr.get('sentiment', 'N/A')}")
print(f" Bitget: {bitget_fr.get('sentiment', 'N/A')}")
def test_multi_timeframe(symbol: str = "BTCUSDT"):
"""测试多周期数据"""
print_section("4. 多周期数据对比")
print(f"\n获取 {symbol} 多周期数据...")
binance_data = binance_service.get_multi_timeframe_data(symbol)
bitget_data = bitget_service.get_multi_timeframe_data(symbol)
intervals = ['5m', '15m', '1h', '4h']
print("\n各周期数据量:")
for interval in intervals:
b_count = len(binance_data.get(interval, []))
g_count = len(bitget_data.get(interval, []))
print(f" {interval}: Binance {b_count}根, Bitget {g_count}")
# 对比最新价格
print("\n各周期最新价格:")
for interval in intervals:
b_df = binance_data.get(interval, pd.DataFrame())
g_df = bitget_data.get(interval, pd.DataFrame())
if not b_df.empty and not g_df.empty:
b_price = float(b_df.iloc[-1]['close'])
g_price = float(g_df.iloc[-1]['close'])
print(f" {interval}: Binance ${b_price:,.2f}, Bitget ${g_price:,.2f}")
def test_technical_indicators(symbol: str = "BTCUSDT"):
"""测试技术指标计算"""
print_section("5. 技术指标计算对比")
print(f"\n获取 {symbol} 1h K线并计算指标...")
binance_df = binance_service.get_klines(symbol, '1h', 100)
bitget_df = bitget_service.get_klines(symbol, '1h', 100)
if binance_df.empty or bitget_df.empty:
print("❌ K线数据为空")
return
# 计算指标
binance_df = binance_service.calculate_indicators(binance_df, '1h')
bitget_df = bitget_service.calculate_indicators(bitget_df, '1h')
# 对比最新的指标值
print("\n最新技术指标对比:")
b_latest = binance_df.iloc[-1]
g_latest = bitget_df.iloc[-1]
# RSI
print_comparison("RSI(14)", b_latest['rsi'], g_latest['rsi'], tolerance=5.0)
# MACD
print_comparison("MACD", b_latest['macd'], g_latest['macd'], tolerance=10.0)
# 布林带
print_comparison("布林带上轨", b_latest['bb_upper'], g_latest['bb_upper'], tolerance=0.5)
print_comparison("布林带中轨", b_latest['bb_middle'], g_latest['bb_middle'], tolerance=0.5)
print_comparison("布林带下轨", b_latest['bb_lower'], g_latest['bb_lower'], tolerance=0.5)
# 移动平均线
print_comparison("MA5", b_latest['ma5'], g_latest['ma5'], tolerance=0.2)
print_comparison("MA10", b_latest['ma10'], g_latest['ma10'], tolerance=0.2)
print_comparison("MA20", b_latest['ma20'], g_latest['ma20'], tolerance=0.2)
def test_ticker(symbol: str = "BTCUSDT"):
"""测试 ticker 数据"""
print_section("6. Ticker 数据对比")
print(f"\n获取 {symbol} ticker 数据...")
binance_stats = binance_service.get_24h_stats(symbol)
bitget_ticker = bitget_service.get_ticker(symbol)
if not binance_stats:
print("❌ Binance ticker 数据为空")
return
if not bitget_ticker:
print("❌ Bitget ticker 数据为空")
return
print("\n24h 统计对比:")
print_comparison("最新价", binance_stats['price'], float(bitget_ticker['lastPrice']), tolerance=0.1)
print_comparison("24h最高", binance_stats['high'], float(bitget_ticker['highPrice24h']), tolerance=0.5)
print_comparison("24h最低", binance_stats['low'], float(bitget_ticker['lowPrice24h']), tolerance=0.5)
print_comparison("24h成交量", binance_stats['volume'], float(bitget_ticker['volume24h']), tolerance=5.0)
def main():
"""主函数"""
print("\n" + "🚀" * 40)
print("\nBitget vs Binance 数据对比测试")
print(f"测试时间: {pd.Timestamp.now()}")
# 测试交易对
test_symbol = "BTCUSDT"
try:
# 1. K线数据
test_kline_data(test_symbol)
# 2. 当前价格
test_current_price(test_symbol)
# 3. 资金费率
test_funding_rate(test_symbol)
# 4. 多周期数据
test_multi_timeframe(test_symbol)
# 5. 技术指标
test_technical_indicators(test_symbol)
# 6. Ticker 数据
test_ticker(test_symbol)
print("\n" + "=" * 80)
print(" ✅ 所有测试完成!")
print("=" * 80 + "\n")
except Exception as e:
print(f"\n❌ 测试出错: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
测试使用 modify-order 修改止损止盈
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'backend'))
import time
import uuid
from app.services.bitget_trading_api_sdk import get_bitget_trading_api
def print_section(title):
print("\n" + "=" * 60)
print(f" {title}")
print("=" * 60)
def main():
print("\n" + "🧪" * 30)
print(" 测试 modify-order 修改止损止盈")
print("🧪" * 30)
api = get_bitget_trading_api()
exchange = api.exchange
# 检查是否有 modify order 相关的方法
print("\n检查 modify order 方法:")
methods = [m for m in dir(exchange) if 'modify' in m.lower() and 'order' in m.lower()]
for m in methods:
print(f" {m}")
# 1. 开仓
print_section("1. 开仓")
order = api.place_order(
symbol="BTC/USDT:USDT",
side='buy',
order_type='market',
size=0.0001,
client_order_id=f"test_{uuid.uuid4().hex[:8]}"
)
if not order:
print("❌ 开仓失败")
return
order_id = order.get('id')
print(f"✅ 开仓成功! 订单ID: {order_id}")
time.sleep(2)
# 2. 查询持仓
print_section("2. 查询持仓")
positions = api.get_position("BTC/USDT:USDT")
position = None
for pos in positions:
if float(pos.get('contracts', 0)) > 0:
position = pos
break
if not position:
print("❌ 无持仓")
return
mark_price = float(position.get('markPrice'))
print(f"持仓: {position.get('contracts')} 张, 标记价: ${mark_price:,.2f}")
# 3. 尝试使用 edit_order 修改(添加止损止盈)
print_section("3. 使用 edit_order 添加止损止盈")
stop_loss = mark_price * 0.98
take_profit = mark_price * 1.03
print(f"目标止损: ${stop_loss:,.2f}")
print(f"目标止盈: ${take_profit:,.2f}")
try:
# 尝试使用 CCXT 的 edit_order 方法
# 注意可能需要传入订单ID和新参数
result = exchange.edit_order(
id=order_id,
symbol="BTC/USDT:USDT",
type='market', # 原订单类型
side='buy', # 原订单方向
amount=0.0001, # 原订单数量
params={
'stopLoss': str(stop_loss),
'takeProfit': str(take_profit),
'tdMode': 'cross',
'marginCoin': 'USDT',
}
)
print(f"✅ edit_order 成功: {result}")
except Exception as e:
print(f"❌ edit_order 失败: {e}")
# 4. 平仓
print_section("4. 平仓")
api.close_position("BTC/USDT:USDT")
print("✅ 已平仓")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"\n\n❌ 测试出错: {e}")
import traceback
traceback.print_exc()

View File

@ -0,0 +1,324 @@
"""
实盘交易完整流程测试脚本
测试内容
1. 获取账户余额
2. 获取当前持仓
3. 下测试单极小金额
4. 查询订单状态
5. 修改止损止盈
6. 平仓
风险控制
- 使用极小金额测试5-10 USDT
- 测试完成后自动平仓
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'backend'))
import asyncio
import time
from app.services.bitget_trading_api_sdk import get_bitget_trading_api
from app.services.real_trading_service import get_real_trading_service
from app.utils.logger import logger
def print_section(title):
"""打印分隔线"""
print("\n" + "=" * 60)
print(f" {title}")
print("=" * 60)
def test_account_info(trading_api):
"""测试1: 获取账户信息"""
print_section("1. 获取账户信息")
try:
balance = trading_api.get_balance()
print(f"✅ 账户余额:")
for currency, info in balance.items():
if isinstance(info, dict):
available = info.get('available', 0)
frozen = info.get('frozen', 0)
locked = info.get('locked', 0)
if float(available) > 0 or float(frozen) > 0:
print(f" {currency}: 可用={float(available):.2f}, 冻结={float(frozen):.2f}, 锁定={float(locked):.2f}")
return True
except Exception as e:
print(f"❌ 获取账户信息失败: {e}")
return False
def test_get_positions(trading_api):
"""测试2: 获取当前持仓"""
print_section("2. 获取当前持仓")
try:
positions = trading_api.get_position()
print(f"✅ 当前持仓数量: {len(positions)}")
open_positions = [p for p in positions if float(p.get('contracts', 0)) != 0]
if open_positions:
print(f" 活跃持仓:")
for pos in open_positions:
symbol = pos.get('symbol')
side = pos.get('side')
contracts = pos.get('contracts')
entry_price = pos.get('entryPrice')
mark_price = pos.get('markPrice')
unrealized_pnl = pos.get('unrealizedPnl', 0)
print(f" {symbol} {side} | 数量: {contracts} | 入场: {entry_price} | 标记: {mark_price} | 浮盈: {unrealized_pnl}")
else:
print(f" 当前无持仓")
return True
except Exception as e:
print(f"❌ 获取持仓失败: {e}")
return False
def test_place_order(trading_api, symbol="BTCUSDT", side='buy', size=5):
"""测试3: 下测试单"""
print_section("3. 下测试单")
# 生成订单ID
import uuid
client_order_id = f"test_{uuid.uuid4().hex[:8]}"
print(f" 交易对: {symbol}")
print(f" 方向: {side}")
print(f" 数量: {size} USDT")
print(f" 客户端ID: {client_order_id}")
try:
# 先获取当前价格
ticker = trading_api.exchange.fetch_ticker(symbol)
current_price = ticker['last']
print(f" 当前价格: ${current_price:,.2f}")
# 下市价单
print(f" 📝 正在下单...")
result = trading_api.place_order(
symbol=symbol,
side=side,
order_type='market',
size=size,
client_order_id=client_order_id
)
if result:
order_id = result.get('id')
filled_price = result.get('average') or result.get('price')
print(f"✅ 下单成功!")
print(f" 订单ID: {order_id}")
print(f" 成交价格: ${filled_price:,.2f}")
print(f" 成交数量: {size} USDT")
# 等待一下让订单完全成交
time.sleep(2)
return order_id, symbol, side
else:
print(f"❌ 下单失败: 无返回结果")
return None
except Exception as e:
print(f"❌ 下单失败: {e}")
import traceback
print(traceback.format_exc())
return None
def test_get_order(trading_api, order_id, symbol):
"""测试4: 查询订单状态"""
print_section("4. 查询订单状态")
try:
order = trading_api.exchange.fetch_order(order_id, symbol)
print(f"✅ 订单状态: {order.get('status')}")
print(f" 订单ID: {order.get('id')}")
print(f" 价格: {order.get('price')}")
print(f" 已成交: {order.get('filled')}")
print(f" 剩余: {order.get('remaining')}")
return order
except Exception as e:
print(f"❌ 查询订单失败: {e}")
return None
def test_modify_sl_tp(trading_api, symbol, side='buy'):
"""测试5: 修改止损止盈"""
print_section("5. 修改止损止盈")
# 获取当前持仓
try:
positions = trading_api.get_position()
position = None
for pos in positions:
if pos.get('symbol') == symbol and float(pos.get('contracts', 0)) != 0:
position = pos
break
if not position:
print(f"❌ 未找到 {symbol} 的持仓")
return False
# 获取当前价格来设置合理的止损止盈
ticker = trading_api.exchange.fetch_ticker(symbol)
current_price = float(ticker['last'])
if side == 'buy':
stop_loss = current_price * 0.97 # 3% 止损
take_profit = current_price * 1.05 # 5% 止盈
else:
stop_loss = current_price * 1.03 # 3% 止损
take_profit = current_price * 0.95 # 5% 止盈
print(f" 当前价格: ${current_price:,.2f}")
print(f" 设置止损: ${stop_loss:,.2f}")
print(f" 设置止盈: ${take_profit:,.2f}")
# 使用交易所API修改止损止盈这需要根据具体交易所API实现
# 注意: Bitget 可能需要使用不同的API来修改止损止盈
print(f"⚠️ 注意: 修改止损止盈功能需要根据交易所具体API实现")
print(f"✅ 止损止盈参数已计算实际修改需要查看交易所API文档")
return True
except Exception as e:
print(f"❌ 修改止损止盈失败: {e}")
import traceback
print(traceback.format_exc())
return False
def test_close_position(trading_api, symbol, side):
"""测试6: 平仓"""
print_section("6. 平仓测试")
# 获取当前持仓
try:
positions = trading_api.get_position()
position = None
for pos in positions:
if pos.get('symbol') == symbol and float(pos.get('contracts', 0)) != 0:
position = pos
break
if not position:
print(f"❌ 未找到 {symbol} 的持仓,无法平仓")
return False
contracts = float(position.get('contracts', 0))
print(f" 持仓数量: {contracts}")
# 平仓方向与开仓相反
close_side = 'sell' if side == 'buy' else 'buy'
print(f" 平仓方向: {close_side}")
# 生成平仓订单ID
import uuid
client_order_id = f"close_{uuid.uuid4().hex[:8]}"
print(f" 📝 正在平仓...")
result = trading_api.place_order(
symbol=symbol,
side=close_side,
order_type='market',
size=contracts, # 平掉所有持仓
client_order_id=client_order_id,
reduce_only=True # 只平仓,不开新仓
)
if result:
print(f"✅ 平仓成功!")
print(f" 订单ID: {result.get('id')}")
return True
else:
print(f"❌ 平仓失败: 无返回结果")
return False
except Exception as e:
print(f"❌ 平仓失败: {e}")
import traceback
print(traceback.format_exc())
return False
def main():
"""主测试流程"""
print("\n" + "🚀" * 30)
print(" 实盘交易完整流程测试")
print("🚀" * 30)
print("\n⚠️ 警告: 此测试将使用真实资金但金额很小5-10 USDT")
print("⚠️ 请确保:")
print(" 1. Bitget API 配置正确")
print(" 2. 账户有足够的测试资金")
print(" 3. 使用测试网或小额资金")
# 确认开始测试
confirm = input("\n是否开始测试? (输入 'yes' 继续): ")
if confirm.lower() != 'yes':
print("测试已取消")
return
# 获取交易API
trading_api = get_bitget_trading_api()
if not trading_api:
print("❌ Bitget API 未初始化,请检查配置")
return
print(f"\n📡 交易所: {'Bitget 测试网' if trading_api.use_testnet else 'Bitget 正式网'}")
# 测试1: 获取账户信息
if not test_account_info(trading_api):
return
# 测试2: 获取当前持仓
if not test_get_positions(trading_api):
return
# 测试3: 下测试单
symbol = "BTCUSDT"
side = 'buy' # 做多
size = 5 # 5 USDT
order_result = test_place_order(trading_api, symbol, side, size)
if not order_result:
print("\n❌ 下单失败,终止测试")
return
order_id, _, _ = order_result
# 测试4: 查询订单状态
test_get_order(trading_api, order_id, symbol)
# 测试5: 修改止损止盈
test_modify_sl_tp(trading_api, symbol, side)
# 测试6: 平仓
time.sleep(2) # 等待一下
test_close_position(trading_api, symbol, side)
# 最终状态
print_section("测试完成")
print("✅ 所有测试已完成,请检查:")
print(" 1. Bitget 账户中的订单记录")
print(" 2. 余额变化")
print(" 3. 持仓情况")
# 再次获取持仓确认已平仓
test_get_positions(trading_api)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断")
except Exception as e:
print(f"\n\n❌ 测试出错: {e}")
import traceback
print(traceback.format_exc())

View File

@ -0,0 +1,200 @@
"""
实盘交易简单测试 - 使用 API 直接测试
测试流程
1. 获取账户余额
2. 下一个小单
3. 查询订单
4. 平仓
"""
import requests
import json
import time
BASE_URL = "http://localhost:8000"
def print_section(title):
print("\n" + "=" * 50)
print(f" {title}")
print("=" * 50)
def test_get_account():
"""获取账户信息"""
print_section("1. 获取账户信息")
response = requests.get(f"{BASE_URL}/api/real-trading/account")
data = response.json()
if data.get('success'):
account = data['account']
print(f"✅ 账户余额: ${account['current_balance']:.2f}")
print(f" 可用: ${account['available']:.2f}")
print(f" 已用: ${account['used_margin']:.2f}")
print(f" 持仓价值: ${account['total_position_value']:.2f}")
return account
else:
print(f"❌ 获取账户信息失败: {data.get('message')}")
return None
def test_get_positions():
"""获取持仓"""
print_section("2. 获取当前持仓")
response = requests.get(f"{BASE_URL}/api/real-trading/positions")
data = response.json()
if data.get('success'):
positions = data['positions']
print(f"✅ 持仓数量: {len(positions)}")
for pos in positions:
if pos.get('holding', 0) > 0:
print(f" {pos['symbol']}: {pos['holding']} USDT")
return positions
else:
print(f"❌ 获取持仓失败: {data.get('message')}")
return []
def test_place_small_order():
"""下一个小测试单"""
print_section("3. 下测试单")
# 使用模拟交易API测试更安全
symbol = "BTCUSDT"
# 先获取当前价格
ticker_response = requests.get(f"{BASE_URL}/api/bitget/ticker?symbol={symbol}")
ticker_data = ticker_response.json()
if not ticker_data.get('success'):
print(f"❌ 获取价格失败")
return None
current_price = ticker_data['data'].get('last_price', 0)
print(f" 当前价格: ${current_price:,.2f}")
# 计算测试参数
stop_loss = current_price * 0.95
take_profit = current_price * 1.05
order_data = {
"symbol": symbol,
"action": "buy",
"entry_type": "market",
"entry_price": current_price,
"stop_loss": stop_loss,
"take_profit": take_profit,
"confidence": 60,
"signal_grade": "C",
"position_size": "light"
}
print(f" 测试参数:")
print(f" 止损: ${stop_loss:,.2f}")
print(f" 止盈: ${take_profit:,.2f}")
# 先用模拟交易测试
print(f"\n 📝 正在创建模拟交易订单...")
response = requests.post(f"{BASE_URL}/api/paper-trading/order", json=order_data)
data = response.json()
if data.get('success'):
order = data.get('order')
print(f"✅ 模拟订单创建成功!")
print(f" 订单ID: {order['order_id']}")
print(f" 数量: ${order['quantity']:.2f}")
return order
else:
print(f"❌ 下单失败: {data.get('message')}")
return None
def test_get_orders():
"""获取订单列表"""
print_section("4. 获取订单列表")
response = requests.get(f"{BASE_URL}/api/paper-trading/orders?status=active&limit=10")
data = response.json()
if data.get('success'):
orders = data['orders']
print(f"✅ 活跃订单数量: {len(orders)}")
for order in orders:
print(f" {order['symbol']} {order['side']} | "
f"状态: {order['status']} | "
f"数量: ${order['quantity']:.2f}")
return orders
else:
print(f"❌ 获取订单失败: {data.get('message')}")
return []
def test_close_order(order_id):
"""平仓"""
print_section("5. 平仓测试")
print(f" 正在平仓订单: {order_id}")
response = requests.post(f"{BASE_URL}/api/paper-trading/close", json={
"order_id": order_id,
"reason": "测试平仓"
})
data = response.json()
if data.get('success'):
print(f"✅ 平仓成功!")
print(f" 平仓价格: ${data.get('close_price', 0):,.2f}")
print(f" 盈亏: ${data.get('pnl', 0):.2f}")
return True
else:
print(f"❌ 平仓失败: {data.get('message')}")
return False
def main():
print("\n" + "🧪" * 25)
print(" 实盘交易流程测试 (模拟模式)")
print("🧪" * 25)
print("\n此测试使用模拟交易,不会动用真实资金")
input("\n按 Enter 开始测试...")
# 测试1: 获取账户
test_get_account()
# 测试2: 获取持仓
test_get_positions()
# 测试3: 下单
order = test_place_small_order()
if order:
time.sleep(1)
# 测试4: 查询订单
test_get_orders()
time.sleep(1)
# 测试5: 平仓
test_close_order(order['order_id'])
print_section("测试完成")
print("✅ 所有测试已完成")
print(" 请检查前端页面确认订单状态")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断")
except Exception as e:
print(f"\n\n❌ 测试出错: {e}")
import traceback
traceback.print_exc()