stock-ai-agent/backend/app/api/paper_trading.py
2026-02-08 13:51:13 +08:00

285 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
模拟交易 API
"""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional
from datetime import datetime
from pydantic import BaseModel
from app.services.paper_trading_service import get_paper_trading_service
from app.services.price_monitor_service import get_price_monitor_service
from app.services.binance_service import binance_service
from app.utils.logger import logger
router = APIRouter(prefix="/api/paper-trading", tags=["模拟交易"])
class CloseOrderRequest(BaseModel):
"""手动平仓请求"""
exit_price: float
class OrderResponse(BaseModel):
"""订单响应"""
success: bool
message: str
data: Optional[dict] = None
@router.get("/orders")
async def get_orders(
symbol: Optional[str] = Query(None, description="交易对筛选"),
status: Optional[str] = Query(None, description="状态筛选: active, closed"),
limit: int = Query(100, description="返回数量限制")
):
"""
获取订单列表
- symbol: 可选,按交易对筛选
- status: 可选active=活跃订单, closed=已平仓订单
- limit: 返回数量限制默认100
"""
try:
service = get_paper_trading_service()
if status == "active":
orders = service.get_active_orders(symbol)
elif status == "closed":
orders = service.get_order_history(symbol, limit)
else:
# 返回所有订单
active = service.get_active_orders(symbol)
history = service.get_order_history(symbol, limit)
orders = active + history
return {
"success": True,
"count": len(orders),
"orders": orders
}
except Exception as e:
logger.error(f"获取订单列表失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/orders/active")
async def get_active_orders(
symbol: Optional[str] = Query(None, description="交易对筛选")
):
"""获取活跃订单"""
try:
service = get_paper_trading_service()
orders = service.get_active_orders(symbol)
return {
"success": True,
"count": len(orders),
"orders": orders
}
except Exception as e:
logger.error(f"获取活跃订单失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/orders/{order_id}")
async def get_order(order_id: str):
"""获取订单详情"""
try:
service = get_paper_trading_service()
order = service.get_order_by_id(order_id)
if not order:
raise HTTPException(status_code=404, detail="订单不存在")
return {
"success": True,
"order": order
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取订单详情失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/orders/{order_id}/close")
async def close_order(order_id: str, request: CloseOrderRequest):
"""
手动平仓
- order_id: 订单ID
- exit_price: 平仓价格
"""
try:
service = get_paper_trading_service()
result = service.close_order_manual(order_id, request.exit_price)
if not result:
raise HTTPException(status_code=404, detail="订单不存在或已平仓")
return {
"success": True,
"message": "平仓成功",
"result": result
}
except HTTPException:
raise
except Exception as e:
logger.error(f"手动平仓失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics")
async def get_statistics(
symbol: Optional[str] = Query(None, description="交易对筛选"),
start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD)"),
end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD)")
):
"""
获取交易统计
- symbol: 可选,按交易对筛选
- start_date: 可选,开始日期
- end_date: 可选,结束日期
"""
try:
service = get_paper_trading_service()
# 解析日期
start = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
end = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None
stats = service.calculate_statistics(symbol, start, end)
return {
"success": True,
"statistics": stats
}
except ValueError as e:
raise HTTPException(status_code=400, detail=f"日期格式错误: {e}")
except Exception as e:
logger.error(f"获取统计数据失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics/by-grade")
async def get_statistics_by_grade():
"""按信号等级获取统计"""
try:
service = get_paper_trading_service()
stats = service.calculate_statistics()
return {
"success": True,
"by_grade": stats.get("by_grade", {})
}
except Exception as e:
logger.error(f"获取等级统计失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/statistics/by-symbol")
async def get_statistics_by_symbol():
"""按交易对获取统计"""
try:
service = get_paper_trading_service()
stats = service.calculate_statistics()
return {
"success": True,
"by_symbol": stats.get("by_symbol", {})
}
except Exception as e:
logger.error(f"获取交易对统计失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/monitor/status")
async def get_monitor_status():
"""获取价格监控状态和实时价格"""
try:
monitor = get_price_monitor_service()
paper_trading = get_paper_trading_service()
# 获取活跃订单的交易对
active_orders = paper_trading.get_active_orders()
symbols_needed = set(order.get('symbol') for order in active_orders if order.get('symbol'))
# 获取价格 - 优先使用监控服务的缓存价格
latest_prices = dict(monitor.latest_prices)
# 如果监控服务没有价格数据,直接从 Binance 获取
for symbol in symbols_needed:
if symbol not in latest_prices or latest_prices[symbol] is None:
price = binance_service.get_current_price(symbol)
if price:
latest_prices[symbol] = price
# 注意:止盈止损检查由后台任务 (main.py price_monitor_loop) 处理
# 这里只返回价格数据供前端显示
return {
"success": True,
"running": True, # 后台任务始终运行
"subscribed_symbols": list(symbols_needed),
"latest_prices": latest_prices
}
except Exception as e:
logger.error(f"获取监控状态失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/reset")
async def reset_paper_trading():
"""
重置所有模拟交易数据
警告:此操作将删除所有订单记录,不可恢复!
"""
try:
service = get_paper_trading_service()
result = service.reset_all_data()
return {
"success": True,
"message": f"模拟交易数据已重置,删除 {result['deleted_count']} 条订单",
"result": result
}
except Exception as e:
logger.error(f"重置模拟交易数据失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/report")
async def send_report(
hours: int = Query(4, description="报告时间段(小时)"),
send_telegram: bool = Query(True, description="是否发送到Telegram")
):
"""
手动触发发送模拟交易报告
- hours: 报告时间段默认4小时
- send_telegram: 是否发送到Telegram默认True
"""
try:
from app.services.telegram_service import get_telegram_service
service = get_paper_trading_service()
report = service.generate_report(hours=hours)
result = {
"success": True,
"report": report,
"telegram_sent": False
}
if send_telegram:
telegram = get_telegram_service()
sent = await telegram.send_message(report, parse_mode="HTML")
result["telegram_sent"] = sent
return result
except Exception as e:
logger.error(f"生成报告失败: {e}")
raise HTTPException(status_code=500, detail=str(e))