diff --git a/backend/app/api/paper_trading.py b/backend/app/api/paper_trading.py new file mode 100644 index 0000000..f6673fa --- /dev/null +++ b/backend/app/api/paper_trading.py @@ -0,0 +1,210 @@ +""" +模拟交易 API +""" +from fastapi import APIRouter, HTTPException, Query +from typing import Optional +from datetime import datetime +from pydantic import BaseModel + +from app.services.paper_trading_service import get_paper_trading_service +from app.services.price_monitor_service import get_price_monitor_service +from app.utils.logger import logger + + +router = APIRouter(prefix="/api/paper-trading", tags=["模拟交易"]) + + +class CloseOrderRequest(BaseModel): + """手动平仓请求""" + exit_price: float + + +class OrderResponse(BaseModel): + """订单响应""" + success: bool + message: str + data: Optional[dict] = None + + +@router.get("/orders") +async def get_orders( + symbol: Optional[str] = Query(None, description="交易对筛选"), + status: Optional[str] = Query(None, description="状态筛选: active, closed"), + limit: int = Query(100, description="返回数量限制") +): + """ + 获取订单列表 + + - symbol: 可选,按交易对筛选 + - status: 可选,active=活跃订单, closed=已平仓订单 + - limit: 返回数量限制,默认100 + """ + try: + service = get_paper_trading_service() + + if status == "active": + orders = service.get_active_orders(symbol) + elif status == "closed": + orders = service.get_order_history(symbol, limit) + else: + # 返回所有订单 + active = service.get_active_orders(symbol) + history = service.get_order_history(symbol, limit) + orders = active + history + + return { + "success": True, + "count": len(orders), + "orders": orders + } + except Exception as e: + logger.error(f"获取订单列表失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/orders/active") +async def get_active_orders( + symbol: Optional[str] = Query(None, description="交易对筛选") +): + """获取活跃订单""" + try: + service = get_paper_trading_service() + orders = service.get_active_orders(symbol) + return { + "success": True, + "count": len(orders), + "orders": orders + } + except Exception as e: + logger.error(f"获取活跃订单失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/orders/{order_id}") +async def get_order(order_id: str): + """获取订单详情""" + try: + service = get_paper_trading_service() + order = service.get_order_by_id(order_id) + + if not order: + raise HTTPException(status_code=404, detail="订单不存在") + + return { + "success": True, + "order": order + } + except HTTPException: + raise + except Exception as e: + logger.error(f"获取订单详情失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/orders/{order_id}/close") +async def close_order(order_id: str, request: CloseOrderRequest): + """ + 手动平仓 + + - order_id: 订单ID + - exit_price: 平仓价格 + """ + try: + service = get_paper_trading_service() + result = service.close_order_manual(order_id, request.exit_price) + + if not result: + raise HTTPException(status_code=404, detail="订单不存在或已平仓") + + return { + "success": True, + "message": "平仓成功", + "result": result + } + except HTTPException: + raise + except Exception as e: + logger.error(f"手动平仓失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/statistics") +async def get_statistics( + symbol: Optional[str] = Query(None, description="交易对筛选"), + start_date: Optional[str] = Query(None, description="开始日期 (YYYY-MM-DD)"), + end_date: Optional[str] = Query(None, description="结束日期 (YYYY-MM-DD)") +): + """ + 获取交易统计 + + - symbol: 可选,按交易对筛选 + - start_date: 可选,开始日期 + - end_date: 可选,结束日期 + """ + try: + service = get_paper_trading_service() + + # 解析日期 + start = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None + end = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None + + stats = service.calculate_statistics(symbol, start, end) + + return { + "success": True, + "statistics": stats + } + except ValueError as e: + raise HTTPException(status_code=400, detail=f"日期格式错误: {e}") + except Exception as e: + logger.error(f"获取统计数据失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/statistics/by-grade") +async def get_statistics_by_grade(): + """按信号等级获取统计""" + try: + service = get_paper_trading_service() + stats = service.calculate_statistics() + + return { + "success": True, + "by_grade": stats.get("by_grade", {}) + } + except Exception as e: + logger.error(f"获取等级统计失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/statistics/by-symbol") +async def get_statistics_by_symbol(): + """按交易对获取统计""" + try: + service = get_paper_trading_service() + stats = service.calculate_statistics() + + return { + "success": True, + "by_symbol": stats.get("by_symbol", {}) + } + except Exception as e: + logger.error(f"获取交易对统计失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/monitor/status") +async def get_monitor_status(): + """获取价格监控状态""" + try: + monitor = get_price_monitor_service() + + return { + "success": True, + "running": monitor.is_running(), + "subscribed_symbols": monitor.get_subscribed_symbols(), + "latest_prices": monitor.latest_prices + } + except Exception as e: + logger.error(f"获取监控状态失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/backend/app/config.py b/backend/app/config.py index d1fb626..dc65a0d 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -95,10 +95,12 @@ class Settings(BaseSettings): # 飞书机器人配置 feishu_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/8a1dcf69-6753-41e2-a393-edc4f7822db0" + feishu_enabled: bool = True # 是否启用飞书通知 # Telegram 机器人配置 telegram_bot_token: str = "" # 从 @BotFather 获取 telegram_channel_id: str = "" # 频道 ID,如 @your_channel 或 -1001234567890 + telegram_enabled: bool = False # 是否启用 Telegram 通知 # 加密货币交易智能体配置 crypto_symbols: str = "BTCUSDT,ETHUSDT,BNBUSDT,SOLUSDT" # 监控的交易对,逗号分隔 @@ -108,6 +110,12 @@ class Settings(BaseSettings): # Brave Search API 配置 brave_api_key: str = "" + # 模拟交易配置 + paper_trading_enabled: bool = True # 是否启用模拟交易 + paper_trading_position_a: float = 1000 # A级信号仓位 (USDT) + paper_trading_position_b: float = 500 # B级信号仓位 (USDT) + paper_trading_position_c: float = 200 # C级信号仓位 (USDT) + class Config: env_file = find_env_file() case_sensitive = False diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 8ce1b73..afe743f 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -11,6 +11,8 @@ from app.config import get_settings from app.services.binance_service import binance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service +from app.services.paper_trading_service import get_paper_trading_service +from app.services.price_monitor_service import get_price_monitor_service from app.crypto_agent.signal_analyzer import SignalAnalyzer from app.crypto_agent.strategy import TrendFollowingStrategy @@ -27,6 +29,17 @@ class CryptoAgent: self.analyzer = SignalAnalyzer() self.strategy = TrendFollowingStrategy() + # 模拟交易服务 + self.paper_trading_enabled = self.settings.paper_trading_enabled + if self.paper_trading_enabled: + self.paper_trading = get_paper_trading_service() + self.price_monitor = get_price_monitor_service() + # 注册价格回调 + self.price_monitor.add_price_callback(self._on_price_update) + else: + self.paper_trading = None + self.price_monitor = None + # 状态管理 self.last_signals: Dict[str, Dict[str, Any]] = {} # 上次信号 self.last_trends: Dict[str, str] = {} # 上次趋势 @@ -41,6 +54,55 @@ class CryptoAgent: self.running = False logger.info(f"加密货币智能体初始化完成,监控交易对: {self.symbols}") + if self.paper_trading_enabled: + logger.info(f"模拟交易已启用") + + def _on_price_update(self, symbol: str, price: float): + """处理实时价格更新(用于模拟交易)""" + if not self.paper_trading: + return + + # 检查是否有订单触发止盈止损 + triggered = self.paper_trading.check_price_triggers(symbol, price) + + for result in triggered: + # 异步发送平仓通知 + asyncio.create_task(self._notify_order_closed(result)) + + async def _notify_order_closed(self, result: Dict[str, Any]): + """发送订单平仓通知""" + is_win = result.get('is_win', False) + status = result.get('status', '') + + # 确定图标和文本 + if status == 'closed_tp': + emoji = "🎯" + status_text = "止盈平仓" + elif status == 'closed_sl': + emoji = "🛑" + status_text = "止损平仓" + else: + emoji = "📤" + status_text = "手动平仓" + + win_text = "盈利" if is_win else "亏损" + side_text = "做多" if result.get('side') == 'long' else "做空" + + # 构建消息 + message = f"""{emoji} 订单{status_text} + +交易对: {result.get('symbol')} +方向: {side_text} +入场: ${result.get('entry_price', 0):,.2f} +出场: ${result.get('exit_price', 0):,.2f} +{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f}) +持仓时间: {result.get('hold_duration', 'N/A')}""" + + # 发送通知 + await self.feishu.send_text(message) + await self.telegram.send_message(message) + + logger.info(f"已发送订单平仓通知: {result.get('order_id')}") def _get_seconds_until_next_5min(self) -> int: """计算距离下一个5分钟整点的秒数""" @@ -69,8 +131,16 @@ class CryptoAgent: logger.info(f" 监控交易对: {', '.join(self.symbols)}") logger.info(f" 运行模式: 每5分钟整点执行 (:00, :05, :10, ...)") logger.info(f" LLM阈值: {self.llm_threshold * 100:.0f}%") + if self.paper_trading_enabled: + logger.info(f" 模拟交易: 已启用") logger.info("=" * 60 + "\n") + # 启动价格监控(用于模拟交易) + if self.paper_trading_enabled and self.price_monitor: + for symbol in self.symbols: + self.price_monitor.subscribe_symbol(symbol) + logger.info(f"已启动 WebSocket 价格监控: {', '.join(self.symbols)}") + # 发送启动通知(飞书 + Telegram) await self.feishu.send_text( f"🚀 加密货币智能体已启动\n" @@ -111,6 +181,11 @@ class CryptoAgent: def stop(self): """停止运行""" self.running = False + + # 停止价格监控 + if self.price_monitor: + self.price_monitor.stop() + logger.info("加密货币智能体已停止") async def analyze_symbol(self, symbol: str): @@ -263,6 +338,13 @@ class CryptoAgent: action_text = '买入' if signal['action'] == 'buy' else '卖出' logger.info(f" ✅ 已发送 {action_text} 信号通知(飞书+Telegram)") + + # 10. 创建模拟订单 + if self.paper_trading_enabled and self.paper_trading: + if signal.get('signal_grade', 'D') != 'D': + order = self.paper_trading.create_order_from_signal(signal) + if order: + logger.info(f" 📝 已创建模拟订单: {order.order_id}") else: logger.info(f" ⏸️ 置信度不足({signal['confidence']}%),不发送通知") else: diff --git a/backend/app/main.py b/backend/app/main.py index 858b8df..29c6dda 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -8,7 +8,7 @@ from fastapi.responses import FileResponse from contextlib import asynccontextmanager from app.config import get_settings from app.utils.logger import logger -from app.api import chat, stock, skills, llm, auth, admin +from app.api import chat, stock, skills, llm, auth, admin, paper_trading import os @@ -47,6 +47,7 @@ app.include_router(chat.router, prefix="/api/chat", tags=["对话"]) app.include_router(stock.router, prefix="/api/stock", tags=["股票数据"]) app.include_router(skills.router, prefix="/api/skills", tags=["技能管理"]) app.include_router(llm.router, tags=["LLM模型"]) +app.include_router(paper_trading.router, tags=["模拟交易"]) # 挂载静态文件 frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "frontend") @@ -74,6 +75,14 @@ async def health_check(): """健康检查""" return {"status": "healthy"} +@app.get("/paper-trading") +async def paper_trading_page(): + """模拟交易页面""" + page_path = os.path.join(frontend_path, "paper-trading.html") + if os.path.exists(page_path): + return FileResponse(page_path) + return {"message": "页面不存在"} + if __name__ == "__main__": import uvicorn uvicorn.run( diff --git a/backend/app/models/paper_trading.py b/backend/app/models/paper_trading.py new file mode 100644 index 0000000..a8d2b55 --- /dev/null +++ b/backend/app/models/paper_trading.py @@ -0,0 +1,110 @@ +""" +模拟交易数据模型 +""" +from enum import Enum +from datetime import datetime +from sqlalchemy import Column, Integer, String, Float, DateTime, JSON, Text, Enum as SQLEnum +from app.models.database import Base + + +class OrderStatus(str, Enum): + """订单状态""" + PENDING = "pending" # 等待入场 + OPEN = "open" # 持仓中 + CLOSED_TP = "closed_tp" # 止盈平仓 + CLOSED_SL = "closed_sl" # 止损平仓 + CLOSED_MANUAL = "closed_manual" # 手动平仓 + CANCELLED = "cancelled" # 已取消 + + +class OrderSide(str, Enum): + """订单方向""" + LONG = "long" # 做多 + SHORT = "short" # 做空 + + +class SignalGrade(str, Enum): + """信号等级""" + A = "A" + B = "B" + C = "C" + D = "D" + + +class PaperOrder(Base): + """模拟交易订单表""" + __tablename__ = "paper_orders" + + id = Column(Integer, primary_key=True, index=True) + + # 订单标识 + order_id = Column(String(64), unique=True, nullable=False, index=True) + + # 交易对信息 + symbol = Column(String(20), nullable=False, index=True) + side = Column(SQLEnum(OrderSide), nullable=False) + + # 价格信息 + entry_price = Column(Float, nullable=False) # 目标入场价 + stop_loss = Column(Float, nullable=False) # 止损价 + take_profit = Column(Float, nullable=False) # 止盈价 + filled_price = Column(Float, nullable=True) # 实际成交价 + exit_price = Column(Float, nullable=True) # 出场价 + + # 仓位信息 + quantity = Column(Float, default=1000) # 仓位大小 (USDT) + + # 信号信息 + signal_grade = Column(SQLEnum(SignalGrade), default=SignalGrade.D) + signal_type = Column(String(20), default="swing") # swing / short_term + confidence = Column(Float, default=0) # 置信度 (0-100) + trend = Column(String(20), nullable=True) # 趋势方向 + + # 订单状态 + status = Column(SQLEnum(OrderStatus), default=OrderStatus.PENDING, index=True) + + # 盈亏信息 + pnl_amount = Column(Float, default=0) # 盈亏金额 (USDT) + pnl_percent = Column(Float, default=0) # 盈亏百分比 + + # 风险指标 + max_drawdown = Column(Float, default=0) # 持仓期间最大回撤 + max_profit = Column(Float, default=0) # 持仓期间最大盈利 + + # 时间戳 + created_at = Column(DateTime, default=datetime.utcnow) + opened_at = Column(DateTime, nullable=True) # 开仓时间 + closed_at = Column(DateTime, nullable=True) # 平仓时间 + + # 附加数据 + entry_reasons = Column(JSON, nullable=True) # 入场原因 + indicators = Column(JSON, nullable=True) # 技术指标快照 + notes = Column(Text, nullable=True) # 备注 + + def to_dict(self): + """转换为字典""" + return { + 'id': self.id, + 'order_id': self.order_id, + 'symbol': self.symbol, + 'side': self.side.value if self.side else None, + 'entry_price': self.entry_price, + 'stop_loss': self.stop_loss, + 'take_profit': self.take_profit, + 'filled_price': self.filled_price, + 'exit_price': self.exit_price, + 'quantity': self.quantity, + 'signal_grade': self.signal_grade.value if self.signal_grade else None, + 'signal_type': self.signal_type, + 'confidence': self.confidence, + 'trend': self.trend, + 'status': self.status.value if self.status else None, + 'pnl_amount': self.pnl_amount, + 'pnl_percent': self.pnl_percent, + 'max_drawdown': self.max_drawdown, + 'max_profit': self.max_profit, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'opened_at': self.opened_at.isoformat() if self.opened_at else None, + 'closed_at': self.closed_at.isoformat() if self.closed_at else None, + 'entry_reasons': self.entry_reasons, + } diff --git a/backend/app/services/feishu_service.py b/backend/app/services/feishu_service.py index afb80e0..ba35ca9 100644 --- a/backend/app/services/feishu_service.py +++ b/backend/app/services/feishu_service.py @@ -20,9 +20,13 @@ class FeishuService: """ settings = get_settings() self.webhook_url = webhook_url or getattr(settings, 'feishu_webhook_url', '') - self.enabled = bool(self.webhook_url) + # 检查配置开关和 webhook_url 是否都有效 + config_enabled = getattr(settings, 'feishu_enabled', True) + self.enabled = config_enabled and bool(self.webhook_url) - if self.enabled: + if not config_enabled: + logger.info("飞书通知已通过配置禁用") + elif self.enabled: logger.info("飞书通知服务初始化完成") else: logger.warning("飞书 Webhook URL 未配置,通知功能已禁用") diff --git a/backend/app/services/paper_trading_service.py b/backend/app/services/paper_trading_service.py new file mode 100644 index 0000000..5b13d95 --- /dev/null +++ b/backend/app/services/paper_trading_service.py @@ -0,0 +1,455 @@ +""" +模拟交易服务 - 订单管理和盈亏统计 +""" +import uuid +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional + +from app.models.paper_trading import PaperOrder, OrderStatus, OrderSide, SignalGrade +from app.services.db_service import db_service +from app.config import get_settings +from app.utils.logger import logger + + +# 仓位大小配置 +POSITION_SIZE = { + 'A': 1000, # A级信号 1000 USDT + 'B': 500, # B级信号 500 USDT + 'C': 200, # C级信号 200 USDT + 'D': 0 # D级信号不开仓 +} + + +class PaperTradingService: + """模拟交易服务""" + + def __init__(self): + """初始化模拟交易服务""" + self.settings = get_settings() + self.active_orders: Dict[str, PaperOrder] = {} # 内存缓存活跃订单 + + # 确保表已创建 + self._ensure_table_exists() + + # 加载活跃订单到内存 + self._load_active_orders() + + logger.info("模拟交易服务初始化完成") + + def _ensure_table_exists(self): + """确保数据表已创建""" + from app.models.paper_trading import PaperOrder + from app.models.database import Base + Base.metadata.create_all(bind=db_service.engine) + + def _load_active_orders(self): + """从数据库加载活跃订单到内存""" + db = db_service.get_session() + try: + orders = db.query(PaperOrder).filter( + PaperOrder.status.in_([OrderStatus.PENDING, OrderStatus.OPEN]) + ).all() + + for order in orders: + self.active_orders[order.order_id] = order + + logger.info(f"已加载 {len(orders)} 个活跃订单") + except Exception as e: + logger.error(f"加载活跃订单失败: {e}") + finally: + db.close() + + def create_order_from_signal(self, signal: Dict[str, Any]) -> Optional[PaperOrder]: + """ + 从交易信号创建模拟订单 + + Args: + signal: 交易信号 + - symbol: 交易对 + - action: 'buy' 或 'sell' + - price: 入场价 + - stop_loss: 止损价 + - take_profit: 止盈价 + - confidence: 置信度 + - signal_grade: 信号等级 + - signal_type: 信号类型 + - reasons: 入场原因 + - indicators: 技术指标 + + Returns: + 创建的订单或 None + """ + action = signal.get('action') + if action not in ['buy', 'sell']: + return None + + # 获取信号等级 + grade = signal.get('signal_grade', 'D') + if grade == 'D': + logger.info(f"D级信号不开仓: {signal.get('symbol')}") + return None + + # 确定仓位大小 + quantity = POSITION_SIZE.get(grade, 0) + if quantity == 0: + return None + + # 确定订单方向 + side = OrderSide.LONG if action == 'buy' else OrderSide.SHORT + + # 生成订单ID + symbol = signal.get('symbol', 'UNKNOWN') + order_id = f"PT-{symbol}-{datetime.now().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:6]}" + + db = db_service.get_session() + try: + order = PaperOrder( + order_id=order_id, + symbol=symbol, + side=side, + entry_price=signal.get('price', 0), + stop_loss=signal.get('stop_loss', 0), + take_profit=signal.get('take_profit', 0), + filled_price=signal.get('price', 0), # 市价成交 + quantity=quantity, + signal_grade=SignalGrade(grade), + signal_type=signal.get('signal_type', 'swing'), + confidence=signal.get('confidence', 0), + trend=signal.get('trend'), + status=OrderStatus.OPEN, + opened_at=datetime.utcnow(), + entry_reasons=signal.get('reasons', []), + indicators=signal.get('indicators', {}) + ) + + db.add(order) + db.commit() + db.refresh(order) + + # 添加到活跃订单缓存 + self.active_orders[order.order_id] = order + + logger.info(f"创建模拟订单: {order_id} | {symbol} {side.value} @ ${order.entry_price:,.2f} | 仓位: ${quantity}") + return order + + except Exception as e: + logger.error(f"创建模拟订单失败: {e}") + db.rollback() + return None + finally: + db.close() + + def check_price_triggers(self, symbol: str, current_price: float) -> List[Dict[str, Any]]: + """ + 检查当前价格是否触发止盈止损 + + Args: + symbol: 交易对 + current_price: 当前价格 + + Returns: + 触发的订单结果列表 + """ + triggered = [] + orders_to_check = [ + order for order in self.active_orders.values() + if order.symbol == symbol and order.status == OrderStatus.OPEN + ] + + for order in orders_to_check: + result = self._check_order_trigger(order, current_price) + if result: + triggered.append(result) + else: + # 更新最大回撤和最大盈利 + self._update_order_extremes(order, current_price) + + return triggered + + def _check_order_trigger(self, order: PaperOrder, current_price: float) -> Optional[Dict[str, Any]]: + """检查单个订单是否触发""" + triggered = False + new_status = None + exit_price = current_price + + if order.side == OrderSide.LONG: + # 做多: 价格 >= 止盈价 触发止盈, 价格 <= 止损价 触发止损 + if current_price >= order.take_profit: + triggered = True + new_status = OrderStatus.CLOSED_TP + exit_price = order.take_profit + elif current_price <= order.stop_loss: + triggered = True + new_status = OrderStatus.CLOSED_SL + exit_price = order.stop_loss + else: + # 做空: 价格 <= 止盈价 触发止盈, 价格 >= 止损价 触发止损 + if current_price <= order.take_profit: + triggered = True + new_status = OrderStatus.CLOSED_TP + exit_price = order.take_profit + elif current_price >= order.stop_loss: + triggered = True + new_status = OrderStatus.CLOSED_SL + exit_price = order.stop_loss + + if triggered: + return self._close_order(order, new_status, exit_price) + + return None + + def _close_order(self, order: PaperOrder, status: OrderStatus, exit_price: float) -> Dict[str, Any]: + """平仓并计算盈亏""" + db = db_service.get_session() + try: + # 计算盈亏 + if order.side == OrderSide.LONG: + pnl_percent = ((exit_price - order.filled_price) / order.filled_price) * 100 + else: + pnl_percent = ((order.filled_price - exit_price) / order.filled_price) * 100 + + pnl_amount = order.quantity * pnl_percent / 100 + + # 计算持仓时间 + hold_duration = datetime.utcnow() - order.opened_at if order.opened_at else timedelta(0) + + # 更新订单 + order.status = status + order.exit_price = exit_price + order.closed_at = datetime.utcnow() + order.pnl_amount = round(pnl_amount, 2) + order.pnl_percent = round(pnl_percent, 4) + + db.merge(order) + db.commit() + + # 从活跃订单缓存中移除 + if order.order_id in self.active_orders: + del self.active_orders[order.order_id] + + result = { + 'order_id': order.order_id, + 'symbol': order.symbol, + 'side': order.side.value, + 'status': status.value, + 'entry_price': order.filled_price, + 'exit_price': exit_price, + 'quantity': order.quantity, + 'pnl_amount': order.pnl_amount, + 'pnl_percent': order.pnl_percent, + 'is_win': pnl_amount > 0, + 'hold_duration': str(hold_duration).split('.')[0], # 去掉微秒 + 'signal_grade': order.signal_grade.value if order.signal_grade else None + } + + status_text = "止盈" if status == OrderStatus.CLOSED_TP else "止损" + logger.info(f"订单{status_text}: {order.order_id} | {order.symbol} | 盈亏: {pnl_percent:+.2f}% (${pnl_amount:+.2f})") + + return result + + except Exception as e: + logger.error(f"平仓失败: {e}") + db.rollback() + return None + finally: + db.close() + + def _update_order_extremes(self, order: PaperOrder, current_price: float): + """更新订单的最大回撤和最大盈利""" + if order.side == OrderSide.LONG: + current_pnl_percent = ((current_price - order.filled_price) / order.filled_price) * 100 + else: + current_pnl_percent = ((order.filled_price - current_price) / order.filled_price) * 100 + + # 更新极值 + if current_pnl_percent > order.max_profit: + order.max_profit = current_pnl_percent + if current_pnl_percent < order.max_drawdown: + order.max_drawdown = current_pnl_percent + + def close_order_manual(self, order_id: str, exit_price: float) -> Optional[Dict[str, Any]]: + """手动平仓""" + if order_id not in self.active_orders: + logger.warning(f"订单不存在或已平仓: {order_id}") + return None + + order = self.active_orders[order_id] + return self._close_order(order, OrderStatus.CLOSED_MANUAL, exit_price) + + def get_active_orders(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]: + """获取活跃订单""" + orders = list(self.active_orders.values()) + if symbol: + orders = [o for o in orders if o.symbol == symbol] + return [o.to_dict() for o in orders] + + def get_order_by_id(self, order_id: str) -> Optional[Dict[str, Any]]: + """根据ID获取订单""" + # 先从缓存查找 + if order_id in self.active_orders: + return self.active_orders[order_id].to_dict() + + # 从数据库查找 + db = db_service.get_session() + try: + order = db.query(PaperOrder).filter(PaperOrder.order_id == order_id).first() + return order.to_dict() if order else None + finally: + db.close() + + def get_order_history(self, symbol: Optional[str] = None, limit: int = 100) -> List[Dict[str, Any]]: + """获取历史订单""" + db = db_service.get_session() + try: + query = db.query(PaperOrder).filter( + PaperOrder.status.in_([ + OrderStatus.CLOSED_TP, + OrderStatus.CLOSED_SL, + OrderStatus.CLOSED_MANUAL + ]) + ) + if symbol: + query = query.filter(PaperOrder.symbol == symbol) + + orders = query.order_by(PaperOrder.closed_at.desc()).limit(limit).all() + return [o.to_dict() for o in orders] + finally: + db.close() + + def calculate_statistics(self, symbol: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None) -> Dict[str, Any]: + """计算交易统计""" + db = db_service.get_session() + try: + query = db.query(PaperOrder).filter( + PaperOrder.status.in_([ + OrderStatus.CLOSED_TP, + OrderStatus.CLOSED_SL, + OrderStatus.CLOSED_MANUAL + ]) + ) + + if symbol: + query = query.filter(PaperOrder.symbol == symbol) + if start_date: + query = query.filter(PaperOrder.closed_at >= start_date) + if end_date: + query = query.filter(PaperOrder.closed_at <= end_date) + + orders = query.all() + + if not orders: + return self._empty_statistics() + + # 计算各项指标 + total_trades = len(orders) + winning_trades = len([o for o in orders if o.pnl_amount > 0]) + losing_trades = len([o for o in orders if o.pnl_amount < 0]) + + total_pnl = sum(o.pnl_amount for o in orders) + total_pnl_percent = sum(o.pnl_percent for o in orders) + + wins = [o.pnl_amount for o in orders if o.pnl_amount > 0] + losses = [abs(o.pnl_amount) for o in orders if o.pnl_amount < 0] + + gross_profit = sum(wins) if wins else 0 + gross_loss = sum(losses) if losses else 0 + + return { + 'total_trades': total_trades, + 'winning_trades': winning_trades, + 'losing_trades': losing_trades, + 'win_rate': round((winning_trades / total_trades * 100), 2) if total_trades > 0 else 0, + 'total_pnl': round(total_pnl, 2), + 'total_pnl_percent': round(total_pnl_percent, 2), + 'average_pnl': round(total_pnl / total_trades, 2) if total_trades > 0 else 0, + 'average_win': round(sum(wins) / len(wins), 2) if wins else 0, + 'average_loss': round(sum(losses) / len(losses), 2) if losses else 0, + 'profit_factor': round(gross_profit / gross_loss, 2) if gross_loss > 0 else float('inf'), + 'max_drawdown': min(o.max_drawdown for o in orders) if orders else 0, + 'best_trade': max(o.pnl_percent for o in orders) if orders else 0, + 'worst_trade': min(o.pnl_percent for o in orders) if orders else 0, + 'by_grade': self._calculate_grade_statistics(orders), + 'by_type': self._calculate_type_statistics(orders), + 'by_symbol': self._calculate_symbol_statistics(orders) + } + + finally: + db.close() + + def _empty_statistics(self) -> Dict[str, Any]: + """返回空统计结构""" + return { + 'total_trades': 0, + 'winning_trades': 0, + 'losing_trades': 0, + 'win_rate': 0, + 'total_pnl': 0, + 'total_pnl_percent': 0, + 'average_pnl': 0, + 'average_win': 0, + 'average_loss': 0, + 'profit_factor': 0, + 'max_drawdown': 0, + 'best_trade': 0, + 'worst_trade': 0, + 'by_grade': {}, + 'by_type': {}, + 'by_symbol': {} + } + + def _calculate_grade_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]: + """按信号等级统计""" + result = {} + for grade in ['A', 'B', 'C', 'D']: + grade_orders = [o for o in orders if o.signal_grade and o.signal_grade.value == grade] + if grade_orders: + wins = len([o for o in grade_orders if o.pnl_amount > 0]) + result[grade] = { + 'count': len(grade_orders), + 'win_rate': round(wins / len(grade_orders) * 100, 1), + 'total_pnl': round(sum(o.pnl_amount for o in grade_orders), 2) + } + return result + + def _calculate_type_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]: + """按信号类型统计""" + result = {} + for signal_type in ['swing', 'short_term']: + type_orders = [o for o in orders if o.signal_type == signal_type] + if type_orders: + wins = len([o for o in type_orders if o.pnl_amount > 0]) + result[signal_type] = { + 'count': len(type_orders), + 'win_rate': round(wins / len(type_orders) * 100, 1), + 'total_pnl': round(sum(o.pnl_amount for o in type_orders), 2) + } + return result + + def _calculate_symbol_statistics(self, orders: List[PaperOrder]) -> Dict[str, Any]: + """按交易对统计""" + result = {} + symbols = set(o.symbol for o in orders) + for symbol in symbols: + symbol_orders = [o for o in orders if o.symbol == symbol] + if symbol_orders: + wins = len([o for o in symbol_orders if o.pnl_amount > 0]) + result[symbol] = { + 'count': len(symbol_orders), + 'win_rate': round(wins / len(symbol_orders) * 100, 1), + 'total_pnl': round(sum(o.pnl_amount for o in symbol_orders), 2) + } + return result + + +# 全局单例 +_paper_trading_service: Optional[PaperTradingService] = None + + +def get_paper_trading_service() -> PaperTradingService: + """获取模拟交易服务单例""" + global _paper_trading_service + if _paper_trading_service is None: + _paper_trading_service = PaperTradingService() + return _paper_trading_service diff --git a/backend/app/services/price_monitor_service.py b/backend/app/services/price_monitor_service.py new file mode 100644 index 0000000..52de027 --- /dev/null +++ b/backend/app/services/price_monitor_service.py @@ -0,0 +1,377 @@ +""" +价格监控服务 - 使用 Binance WebSocket 实时监控价格 +""" +import threading +import time +import sys +import os +import logging +from typing import Dict, List, Callable, Optional, Set +from app.utils.logger import logger +from app.config import get_settings + +# 抑制 binance 库的 WebSocket 错误日志 +logging.getLogger('binance.websocket.reconnecting_websocket').setLevel(logging.CRITICAL) +logging.getLogger('binance.websocket.threaded_stream').setLevel(logging.CRITICAL) + + +class SuppressOutput: + """临时抑制 stdout/stderr 输出""" + def __init__(self, suppress_stderr=True, suppress_stdout=False): + self.suppress_stderr = suppress_stderr + self.suppress_stdout = suppress_stdout + self._stderr = None + self._stdout = None + self._devnull = None + + def __enter__(self): + self._devnull = open(os.devnull, 'w') + if self.suppress_stderr: + self._stderr = sys.stderr + sys.stderr = self._devnull + if self.suppress_stdout: + self._stdout = sys.stdout + sys.stdout = self._devnull + return self + + def __exit__(self, *args): + if self._stderr: + sys.stderr = self._stderr + if self._stdout: + sys.stdout = self._stdout + if self._devnull: + self._devnull.close() + + +class PriceMonitorService: + """实时价格监控服务""" + + def __init__(self): + """初始化价格监控服务""" + self.settings = get_settings() + self.twm = None + self.running = False + self.subscribed_symbols: Dict[str, str] = {} # symbol -> stream_name + self.price_callbacks: List[Callable[[str, float], None]] = [] + self.latest_prices: Dict[str, float] = {} + self._lock = threading.Lock() + self._pending_symbols: List[str] = [] # 待订阅的交易对 + self._reconnecting = False # 是否正在重连 + self._desired_symbols: Set[str] = set() # 期望订阅的交易对(用于重连) + self._stop_requested = False # 是否请求停止(区分主动停止和意外断开) + self._last_message_time: Dict[str, float] = {} # 上次收到消息的时间 + self._health_check_thread = None + + logger.info("价格监控服务初始化完成") + + def start(self): + """启动 WebSocket 管理器(在独立线程中)""" + if self.running: + logger.warning("价格监控服务已在运行") + return + + self._stop_requested = False + + def _start_in_thread(): + try: + # 延迟导入,避免在模块加载时就创建事件循环 + from binance import ThreadedWebsocketManager + + # Monkey patch: 抑制 binance 库的 "Read loop has been closed" 错误消息 + try: + from binance.ws import reconnecting_websocket + original_print = print + + def filtered_print(*args, **kwargs): + # 过滤掉 binance 的 read loop 错误消息 + if args and "Read loop" in str(args[0]): + return + original_print(*args, **kwargs) + + reconnecting_websocket.print = filtered_print + except Exception: + pass # 如果 patch 失败,继续运行 + + self.twm = ThreadedWebsocketManager( + api_key=self.settings.binance_api_key or "", + api_secret=self.settings.binance_api_secret or "" + ) + self.twm.start() + self.running = True + self._reconnecting = False + logger.info("WebSocket 管理器已启动") + + # 等待 WebSocket 完全启动 + time.sleep(1) + + # 订阅待处理的交易对 + for symbol in self._pending_symbols: + self._do_subscribe(symbol) + self._pending_symbols.clear() + + # 重连时恢复之前的订阅 + for symbol in self._desired_symbols: + if symbol not in self.subscribed_symbols: + self._do_subscribe(symbol) + + # 启动健康检查 + self._start_health_check() + + except Exception as e: + logger.error(f"启动 WebSocket 管理器失败: {e}") + import traceback + logger.error(traceback.format_exc()) + # 启动失败,尝试重连 + if not self._stop_requested: + self._schedule_reconnect() + + # 在独立线程中启动 + thread = threading.Thread(target=_start_in_thread, daemon=True) + thread.start() + + def _start_health_check(self): + """启动健康检查线程""" + def _check_health(): + while self.running and not self._stop_requested: + time.sleep(30) # 每30秒检查一次 + + if not self.running or self._stop_requested: + break + + # 检查是否有超过60秒没收到消息的交易对 + now = time.time() + for symbol in list(self._desired_symbols): + last_time = self._last_message_time.get(symbol, now) + if now - last_time > 60: + logger.warning(f"{symbol} 超过60秒未收到数据,触发重连") + self._schedule_reconnect() + break + + self._health_check_thread = threading.Thread(target=_check_health, daemon=True) + self._health_check_thread.start() + + def stop(self): + """停止 WebSocket 管理器""" + # 标记为主动停止 + self._stop_requested = True + + if not self.running: + return + + # 先标记为停止,防止回调继续处理 + self.running = False + + try: + # 抑制关闭时的错误输出(binance 库用 print 输出错误) + with SuppressOutput(suppress_stderr=True, suppress_stdout=True): + # 先停止所有 socket 订阅 + if self.twm: + for _, stream_name in list(self.subscribed_symbols.items()): + try: + self.twm.stop_socket(stream_name) + except: + pass + + # 等待一小段时间让 socket 关闭 + time.sleep(0.5) + + # 然后停止管理器 + try: + self.twm.stop() + except: + pass + + self.subscribed_symbols.clear() + self._desired_symbols.clear() + self._last_message_time.clear() + logger.info("WebSocket 管理器已停止") + except Exception as e: + # 忽略关闭时的错误 + pass + + def _schedule_reconnect(self, delay: int = 5): + """安排重连""" + if self._stop_requested or self._reconnecting: + return + + self._reconnecting = True + logger.warning(f"WebSocket 连接断开,{delay} 秒后尝试重连...") + + def _reconnect(): + time.sleep(delay) + if not self._stop_requested: + self._do_reconnect() + + thread = threading.Thread(target=_reconnect, daemon=True) + thread.start() + + def _do_reconnect(self): + """执行重连""" + if self._stop_requested: + return + + logger.info("正在重新连接 WebSocket...") + + # 清理旧连接(抑制错误输出) + with SuppressOutput(suppress_stderr=True, suppress_stdout=True): + try: + if self.twm: + self.twm.stop() + except: + pass + + self.twm = None + self.running = False + self.subscribed_symbols.clear() + + # 重新启动 + self.start() + + def subscribe_symbol(self, symbol: str): + """ + 订阅交易对的实时价格 + + Args: + symbol: 交易对,如 "BTCUSDT" + """ + symbol = symbol.upper() + + # 记录期望订阅的交易对(用于重连恢复) + self._desired_symbols.add(symbol) + + if symbol in self.subscribed_symbols: + logger.debug(f"已订阅 {symbol}") + return + + if not self.running: + # 如果还没启动,先加入待订阅列表,然后启动 + if symbol not in self._pending_symbols: + self._pending_symbols.append(symbol) + self.start() + return + + self._do_subscribe(symbol) + + def _do_subscribe(self, symbol: str): + """实际执行订阅""" + if not self.twm or not self.running: + return + + try: + stream_name = self.twm.start_symbol_ticker_socket( + callback=self._handle_price_update, + symbol=symbol + ) + self.subscribed_symbols[symbol] = stream_name + self._last_message_time[symbol] = time.time() + logger.info(f"已订阅 {symbol} 价格更新") + except Exception as e: + logger.error(f"订阅 {symbol} 失败: {e}") + + def unsubscribe_symbol(self, symbol: str): + """取消订阅交易对""" + symbol = symbol.upper() + if symbol not in self.subscribed_symbols: + return + + try: + stream_name = self.subscribed_symbols[symbol] + self.twm.stop_socket(stream_name) + del self.subscribed_symbols[symbol] + self._desired_symbols.discard(symbol) + logger.info(f"已取消订阅 {symbol}") + except Exception as e: + logger.error(f"取消订阅 {symbol} 失败: {e}") + + def add_price_callback(self, callback: Callable[[str, float], None]): + """ + 添加价格更新回调函数 + + Args: + callback: 回调函数,签名为 (symbol: str, price: float) -> None + """ + with self._lock: + if callback not in self.price_callbacks: + self.price_callbacks.append(callback) + + def remove_price_callback(self, callback: Callable): + """移除价格回调函数""" + with self._lock: + if callback in self.price_callbacks: + self.price_callbacks.remove(callback) + + def _handle_price_update(self, msg: Dict): + """处理 WebSocket 价格更新消息""" + # 如果服务已停止或正在重连,忽略消息 + if not self.running or self._reconnecting or self._stop_requested: + return + + try: + # 检查错误消息 + if msg.get('e') == 'error': + error_type = msg.get('type', '') + error_msg = str(msg.get('m', '')) + + # 这些错误通常是正常的连接关闭,不需要记录 + ignored_errors = ['ReadLoopClosed', 'ConnectionClosed', 'WebSocketClosed', 'read loop'] + if error_type in ignored_errors or any(e.lower() in error_msg.lower() for e in ignored_errors): + # 如果不是主动停止,触发重连 + if not self._stop_requested and self.running: + self.running = False + self._schedule_reconnect() + return + + # 其他错误记录日志(但不刷屏) + if self.running and not self._stop_requested: + logger.warning(f"WebSocket 消息: {msg}") + return + + symbol = msg.get('s') # 交易对 + price_str = msg.get('c') # 最新价格 + + if not symbol or not price_str: + return + + price = float(price_str) + + # 更新最新价格缓存和消息时间 + self.latest_prices[symbol] = price + self._last_message_time[symbol] = time.time() + + # 调用所有注册的回调函数 + with self._lock: + callbacks = self.price_callbacks.copy() + + for callback in callbacks: + try: + callback(symbol, price) + except Exception as e: + logger.error(f"价格回调执行出错: {e}") + + except Exception as e: + if self.running and not self._stop_requested: + logger.error(f"处理价格更新出错: {e}") + + def get_latest_price(self, symbol: str) -> Optional[float]: + """获取交易对的最新缓存价格""" + return self.latest_prices.get(symbol.upper()) + + def get_subscribed_symbols(self) -> List[str]: + """获取已订阅的交易对列表""" + return list(self.subscribed_symbols.keys()) + + def is_running(self) -> bool: + """检查服务是否在运行""" + return self.running + + +# 全局单例 +_price_monitor_service: Optional[PriceMonitorService] = None + + +def get_price_monitor_service() -> PriceMonitorService: + """获取价格监控服务单例""" + global _price_monitor_service + if _price_monitor_service is None: + _price_monitor_service = PriceMonitorService() + return _price_monitor_service diff --git a/backend/app/services/telegram_service.py b/backend/app/services/telegram_service.py index a948361..e6ed217 100644 --- a/backend/app/services/telegram_service.py +++ b/backend/app/services/telegram_service.py @@ -21,9 +21,14 @@ class TelegramService: settings = get_settings() self.bot_token = bot_token or getattr(settings, 'telegram_bot_token', '') self.channel_id = channel_id or getattr(settings, 'telegram_channel_id', '') - self.enabled = bool(self.bot_token and self.channel_id) + # 检查配置开关和必要参数是否都有效 + config_enabled = getattr(settings, 'telegram_enabled', True) + self.enabled = config_enabled and bool(self.bot_token and self.channel_id) - if self.enabled: + if not config_enabled: + self.api_base = "" + logger.info("Telegram 通知已通过配置禁用") + elif self.enabled: self.api_base = f"https://api.telegram.org/bot{self.bot_token}" logger.info(f"Telegram 通知服务初始化完成,频道: {self.channel_id}") else: diff --git a/backend/run_crypto_agent.py b/backend/run_crypto_agent.py index 3cc3aaa..66fac18 100644 --- a/backend/run_crypto_agent.py +++ b/backend/run_crypto_agent.py @@ -30,4 +30,9 @@ async def main(): if __name__ == "__main__": - asyncio.run(main()) + try: + asyncio.run(main()) + except KeyboardInterrupt: + # 静默处理 Ctrl+C,避免 WebSocket 关闭时的错误刷屏 + print("\n程序已退出") + sys.exit(0) diff --git a/frontend/paper-trading.html b/frontend/paper-trading.html new file mode 100644 index 0000000..eab6ab5 --- /dev/null +++ b/frontend/paper-trading.html @@ -0,0 +1,726 @@ + + +
+ + +暂无活跃订单
+| 订单ID | +交易对 | +方向 | +等级 | +入场价 | +止损 | +止盈 | +仓位 | +开仓时间 | +操作 | +
|---|---|---|---|---|---|---|---|---|---|
| {{ order.order_id.slice(-12) }} | +{{ order.symbol }} | +{{ order.side === 'long' ? '做多' : '做空' }} | +{{ order.signal_grade }} | +${{ order.entry_price?.toLocaleString() }} | +${{ order.stop_loss?.toLocaleString() }} | +${{ order.take_profit?.toLocaleString() }} | +${{ order.quantity }} | +{{ formatTime(order.opened_at) }} | ++ + | +
暂无历史订单
+| 订单ID | +交易对 | +方向 | +等级 | +入场价 | +出场价 | +盈亏 | +状态 | +平仓时间 | +
|---|---|---|---|---|---|---|---|---|
| {{ order.order_id.slice(-12) }} | +{{ order.symbol }} | +{{ order.side === 'long' ? '做多' : '做空' }} | +{{ order.signal_grade }} | +${{ order.filled_price?.toLocaleString() }} | +${{ order.exit_price?.toLocaleString() }} | ++ + {{ order.pnl_percent >= 0 ? '+' : '' }}{{ order.pnl_percent?.toFixed(2) }}% + (${{ order.pnl_amount >= 0 ? '+' : '' }}{{ order.pnl_amount?.toFixed(2) }}) + + | +{{ formatStatus(order.status) }} | +{{ formatTime(order.closed_at) }} | +