stock-ai-agent/backend/app/api/paper_trading.py
2026-02-28 19:34:12 +08:00

479 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
模拟交易 API
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional
from datetime import datetime
from pydantic import BaseModel
from app.services.paper_trading_service import get_paper_trading_service
from app.services.price_monitor_service import get_price_monitor_service
from app.services.bitget_service import bitget_service
from app.services.db_service import db_service
from app.utils.logger import logger
router = APIRouter(prefix="/api/paper-trading", tags=["交易"])
class CloseOrderRequest(BaseModel):
"""手动平仓请求"""
exit_price: float
class DeleteOrdersRequest(BaseModel):
"""批量删除订单请求"""
order_ids: list[str]
recalculate: bool = True # 是否重新计算统计数据
class OrderResponse(BaseModel):
"""订单响应"""
success: bool
message: str
data: Optional[dict] = None
@router.get("/orders")
async def get_orders(
symbol: Optional[str] = Query(None, description="交易对筛选"),
status: Optional[str] = Query(None, description="状态筛选: active, closed"),
limit: int = Query(100, description="返回数量限制")
):
"""
获取订单列表
- symbol: 可选,按交易对筛选
- status: 可选active=活跃订单, closed=已平仓订单
- limit: 返回数量限制默认100
"""
try:
service = get_paper_trading_service()
if status == "active":
orders = service.get_active_orders(symbol)
elif status == "closed":
orders = service.get_order_history(symbol, limit)
else:
# 返回所有订单
active = service.get_active_orders(symbol)
history = service.get_order_history(symbol, limit)
orders = active + history
return {
"success": True,
"count": len(orders),
"orders": orders
}
except Exception as e:
logger.error(f"获取订单列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/orders/active")
async def get_active_orders(
symbol: Optional[str] = Query(None, description="交易对筛选")
):
"""获取活跃订单"""
try:
service = get_paper_trading_service()
orders = service.get_active_orders(symbol)
return {
"success": True,
"count": len(orders),
"orders": orders
}
except Exception as e:
logger.error(f"获取活跃订单失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/orders/{order_id}")
async def get_order(order_id: str):
"""获取订单详情"""
try:
service = get_paper_trading_service()
order = service.get_order_by_id(order_id)
if not order:
raise HTTPException(status_code=404, detail="订单不存在")
return {
"success": True,
"order": order
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取订单详情失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/orders/{order_id}/close")
async def close_order(order_id: str, request: CloseOrderRequest):
"""
手动平仓
- order_id: 订单ID
- exit_price: 平仓价格
"""
try:
service = get_paper_trading_service()
result = service.close_order_manual(order_id, request.exit_price)
if not result:
raise HTTPException(status_code=404, detail="订单不存在或已平仓")
return {
"success": True,
"message": "平仓成功",
"result": result
}
except HTTPException:
raise
except Exception as e:
logger.error(f"手动平仓失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/orders/{order_id}")
async def delete_order(
order_id: str,
recalculate: bool = Query(True, description="是否重新计算统计数据默认True")
):
"""
删除订单(支持历史订单和活跃订单)
删除订单后会重新计算账户统计数据,包括:
- 已实现盈亏总和
- 当前余额
- 胜率
- 最大回撤
- 收益率
参数:
- order_id: 订单ID
- recalculate: 是否重新计算统计数据,默认 True
返回:
- 订单删除前的信息
- 删除后的统计数据变化
"""
try:
service = get_paper_trading_service()
result = service.delete_order(order_id, recalculate=recalculate)
if not result:
raise HTTPException(status_code=404, detail="订单不存在")
return {
"success": True,
"message": result.get('message', '订单已删除'),
"deleted_order": {
"order_id": result['order_id'],
"symbol": result['symbol'],
"side": result['side'],
"status": result['status'],
"pnl_amount": result['pnl_amount'],
"was_active": result['was_active']
},
"statistics_update": result.get('statistics_change', {}),
"recalculated": result.get('recalculated', False)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"删除订单失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/orders/batch-delete")
async def batch_delete_orders(request: DeleteOrdersRequest):
"""
批量删除订单
可以一次性删除多个订单,最后统一重新计算统计数据。
请求体:
{
"order_ids": ["PT-BTCUSDT-20240101-abc123", "PT-ETHUSDT-20240101-def456"],
"recalculate": true // 是否重新计算统计数据,默认 true
}
返回:
- 成功删除的订单列表
- 失败的订单列表
- 统计数据变化(只在 recalculate=true 时返回)
"""
try:
service = get_paper_trading_service()
deleted_orders = []
failed_orders = []
# 先删除所有订单(不重新计算)
for order_id in request.order_ids:
result = service.delete_order(order_id, recalculate=False)
if result:
deleted_orders.append({
"order_id": order_id,
"symbol": result['symbol'],
"status": result['status'],
"pnl_amount": result['pnl_amount']
})
else:
failed_orders.append(order_id)
# 最后统一重新计算一次统计数据
statistics_update = {}
if request.recalculate and deleted_orders:
db = db_service.get_session()
try:
statistics_update = service._recalculate_account_statistics(db)
finally:
db.close()
return {
"success": True,
"message": f"成功删除 {len(deleted_orders)} 个订单",
"deleted_count": len(deleted_orders),
"failed_count": len(failed_orders),
"deleted_orders": deleted_orders,
"failed_orders": failed_orders,
"statistics_update": statistics_update if request.recalculate else {}
}
except Exception as e:
logger.error(f"批量删除订单失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics")
async def get_statistics(
symbol: Optional[str] = Query(None, description="交易对筛选"),
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD)")
):
"""
获取交易统计
- symbol: 可选,按交易对筛选
- start_date: 可选,开始日期
- end_date: 可选,结束日期
"""
try:
service = get_paper_trading_service()
# 解析日期
start = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
end = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None
stats = service.calculate_statistics(symbol, start, end)
return {
"success": True,
"statistics": stats
}
except ValueError as e:
raise HTTPException(status_code=400, detail=f"日期格式错误: {e}")
except Exception as e:
logger.error(f"获取统计数据失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/account")
async def get_account_status():
"""
获取账户状态
返回账户余额、已用保证金、可用保证金等信息
"""
try:
service = get_paper_trading_service()
account = service.get_account_status()
return {
"success": True,
"account": account
}
except Exception as e:
logger.error(f"获取账户状态失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics/by-grade")
async def get_statistics_by_grade():
"""按信号等级获取统计"""
try:
service = get_paper_trading_service()
stats = service.calculate_statistics()
return {
"success": True,
"by_grade": stats.get("by_grade", {})
}
except Exception as e:
logger.error(f"获取等级统计失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics/by-symbol")
async def get_statistics_by_symbol():
"""按交易对获取统计"""
try:
service = get_paper_trading_service()
stats = service.calculate_statistics()
return {
"success": True,
"by_symbol": stats.get("by_symbol", {})
}
except Exception as e:
logger.error(f"获取交易对统计失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/daily-returns")
async def get_daily_returns(
days: int = Query(30, description="获取最近多少天的数据", ge=1, le=365)
):
"""
获取每日收益率数据
- days: 获取最近多少天的数据默认30天最大365天
"""
try:
service = get_paper_trading_service()
daily_returns = service.get_daily_returns(days=days)
return {
"success": True,
"data": daily_returns,
"count": len(daily_returns)
}
except Exception as e:
logger.error(f"获取每日收益率失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/monitor/status")
async def get_monitor_status():
"""获取价格监控状态和实时价格"""
try:
from app.config import get_settings
monitor = get_price_monitor_service()
settings = get_settings()
# 始终显示配置的交易对价格
configured_symbols = settings.crypto_symbols.split(',')
# 获取价格 - 优先使用监控服务的缓存价格
latest_prices = dict(monitor.latest_prices)
# 获取所有配置的交易对价格
for symbol in configured_symbols:
symbol = symbol.strip()
if symbol not in latest_prices or latest_prices[symbol] is None:
price = bitget_service.get_current_price(symbol)
if price:
latest_prices[symbol] = price
return {
"success": True,
"running": True,
"subscribed_symbols": configured_symbols,
"latest_prices": latest_prices
}
except Exception as e:
logger.error(f"获取监控状态失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/reset")
async def reset_paper_trading():
"""
重置所有模拟交易数据
警告:此操作将删除所有订单记录,不可恢复!
"""
try:
service = get_paper_trading_service()
result = service.reset_all_data()
return {
"success": True,
"message": f"交易数据已重置,删除 {result['deleted_count']} 条订单",
"result": result
}
except Exception as e:
logger.error(f"重置交易数据失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/recalculate-statistics")
async def recalculate_statistics():
"""
手动重新计算账户统计数据
当您怀疑统计数据不准确时,可以调用此接口重新计算:
- 已实现盈亏
- 当前余额
- 胜率
- 最大回撤
- 收益率
此操作不会删除或修改任何订单,只是重新计算统计数据。
"""
try:
service = get_paper_trading_service()
db = db_service.get_session()
try:
stats = service._recalculate_account_statistics(db)
return {
"success": True,
"message": "统计数据已重新计算",
"statistics": stats
}
finally:
db.close()
except Exception as e:
logger.error(f"重新计算统计数据失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/report")
async def send_report(
hours: int = Query(4, description="报告时间段(小时)"),
send_telegram: bool = Query(True, description="是否发送到Telegram")
):
"""
手动触发发送模拟交易报告
- hours: 报告时间段默认4小时
- send_telegram: 是否发送到Telegram默认True
"""
try:
from app.services.telegram_service import get_telegram_service
service = get_paper_trading_service()
report = service.generate_report(hours=hours)
result = {
"success": True,
"report": report,
"telegram_sent": False
}
if send_telegram:
telegram = get_telegram_service()
sent = await telegram.send_message(report, parse_mode="HTML")
result["telegram_sent"] = sent
return result
except Exception as e:
logger.error(f"生成报告失败: {e}")
raise HTTPException(status_code=500, detail=str(e))