"""FastAPI main application.

Creates the FastAPI app, registers the API routers and the static frontend
pages, and manages the background tasks (paper-trading price monitor and
crypto agent) through the application lifespan.
"""
import asyncio
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

# Project imports are kept in their original relative order.
from app.config import get_settings
from app.utils.logger import logger
from app.api import llm, auth, admin, paper_trading, signals, system, bitget_live
from app.utils.error_handler import setup_global_exception_handler, init_error_notifier
from app.utils.system_status import get_system_monitor

# Background task handles, populated by ``lifespan`` on startup.
_price_monitor_task = None
_crypto_agent_task = None

# Price format used when the symbol's precision cannot be looked up.
_DEFAULT_PRICE_FORMAT = "{:,.2f}"

# move_type -> (title prefix, closing line, card color) for stop-loss-moved events.
_STOP_LOSS_MOVE_STYLES = {
    'trailing_first': ("📈 移动止损已激活", "💰 锁定利润,让利润奔跑", "green"),
    'trailing_update': ("📊 止损已上移", "🎯 继续锁定更多利润", "blue"),
    'breakeven': ("📈 移动止损已启动", "💰 锁定利润,让利润奔跑", "green"),
}

# status -> (emoji, status text, card color) for order-closed events.
# NOTE(review): 'closed_ts' has no entry and therefore gets the generic
# close label — confirm whether it should have a dedicated one.
_CLOSE_STATUS_STYLES = {
    'closed_tp': ("🎯", "止盈平仓", "green"),
    'closed_sl': ("🛑", "止损平仓", "red"),
    'closed_be': ("📈", "移动止损", "orange"),
}
_DEFAULT_CLOSE_STYLE = ("📤", "平仓", "blue")

# Agent status text -> icon used in the startup status summary.
_AGENT_STATUS_ICONS = {
    "运行中": "✅",
    "启动中": "🔄",
    "已停止": "⏸️",
    "错误": "❌",
    "未初始化": "⚪",
}


def _price_format(bitget_service, symbol):
    """Return a ``str.format`` template matching the symbol's price precision.

    Falls back to two decimals when the precision lookup fails for any
    reason, so a notification is never lost because of formatting.
    """
    try:
        precision = bitget_service.get_precision(symbol)
        return f"{{:,.{precision['pricePrecision']}f}}"
    except Exception as e:
        logger.debug(f"获取 {symbol} 价格精度失败,使用默认精度: {e}")
        return _DEFAULT_PRICE_FORMAT


def _side_labels(result):
    """Return ``(icon, text)`` describing the order side of ``result``."""
    if result.get('side') == 'long':
        return "🟢", "做多"
    return "🔴", "做空"


def _build_stop_loss_moved(result, bitget_service):
    """Build ``(title, content, color)`` for a stop-loss-moved event.

    Returns ``None`` when ``move_type`` is not one of the known kinds; such
    events are not notified.
    """
    style = _STOP_LOSS_MOVE_STYLES.get(result.get('move_type', ''))
    if style is None:
        return None
    title_prefix, closing_line, color = style

    side_icon, side_text = _side_labels(result)
    pnl = result.get('current_pnl_percent', 0)
    symbol_display = result.get('symbol', '')
    price_fmt = _price_format(bitget_service, symbol_display)

    title = f"{title_prefix} - {symbol_display}"
    content = "\n".join([
        f"{side_icon} **方向**: {side_text}",
        "",
        f"📈 **当前盈利**: {pnl:+.2f}%",
        f"🛑 **新止损价**: ${price_fmt.format(result.get('new_stop_loss', 0))}",
        "",
        closing_line,
    ])
    return title, content, color


def _build_order_filled(result, bitget_service):
    """Build ``(title, content, color)`` for a pending-order-filled event."""
    side_icon, side_text = _side_labels(result)
    order_symbol = result.get('symbol', '')
    price_fmt = _price_format(bitget_service, order_symbol)

    title = f"✅ 挂单成交 - {order_symbol}"
    content = "\n".join([
        f"{side_icon} **方向**: {side_text}",
        "",
        f"⭐ **信号等级**: {result.get('signal_grade', 'N/A')}",
        "",
        f"💰 **挂单价**: ${price_fmt.format(result.get('entry_price', 0))}",
        f"🎯 **成交价**: ${price_fmt.format(result.get('filled_price', 0))}",
        f"📊 **名义仓位**: ${result.get('notional', result.get('quantity', 0)):,.0f}",
        "",
        f"🛑 **止损价**: ${price_fmt.format(result.get('stop_loss', 0))}",
        f"🎯 **止盈价**: ${price_fmt.format(result.get('take_profit', 0))}",
    ])
    return title, content, "blue"


def _build_order_closed(result, symbol, bitget_service):
    """Build ``(title, content, color)`` for an order-closed event.

    Entry and exit prices use the symbol's price precision (like the other
    notifications) so low-priced pairs are not rendered as ``$0.00``.
    ``symbol`` is the monitored symbol, used for the precision lookup when
    the result carries no symbol of its own.
    """
    emoji, status_text, color = _CLOSE_STATUS_STYLES.get(
        result.get('status', ''), _DEFAULT_CLOSE_STYLE
    )
    is_win = result.get('is_win', False)
    win_text = "盈利" if is_win else "亏损"
    win_emoji = "✅" if is_win else "❌"
    side_icon, side_text = _side_labels(result)
    price_fmt = _price_format(bitget_service, result.get('symbol') or symbol)

    title = f"{emoji} 订单{status_text} - {result.get('symbol')}"
    content = "\n".join([
        f"{side_icon} **方向**: {side_text}",
        f"📊 **入场**: ${price_fmt.format(result.get('entry_price', 0))}",
        f"🎯 **出场**: ${price_fmt.format(result.get('exit_price', 0))}",
        f"{win_emoji} **{win_text}**: {result.get('pnl_percent', 0):+.2f}% "
        f"(${result.get('pnl_amount', 0):+.2f})",
        f"⏱️ **持仓时间**: {result.get('hold_duration', 'N/A')}",
    ])
    return title, content, color


async def _notify(feishu, telegram, title, content, color):
    """Send one notification to Feishu and, when configured, Telegram.

    Each channel is attempted independently: a failure in one is logged and
    does not prevent delivery through the other.
    """
    try:
        await feishu.send_card(title, content, color)
    except Exception as e:
        logger.error(f"飞书通知发送失败: {e}")

    if telegram:
        try:
            await telegram.send_message(f"{title}\n\n{content}")
        except Exception as e:
            logger.error(f"Telegram 通知发送失败: {e}")


async def _handle_trigger(result, symbol, bitget_service, feishu, telegram):
    """Turn one trigger result into a notification and a log line."""
    event_type = result.get('event_type', 'order_closed')

    if event_type == 'stop_loss_moved':
        message = _build_stop_loss_moved(result, bitget_service)
        log_label = "后台监控触发止损移动"
    elif event_type == 'order_filled':
        message = _build_order_filled(result, bitget_service)
        log_label = "后台监控触发挂单成交"
    else:
        message = _build_order_closed(result, symbol, bitget_service)
        log_label = "后台监控触发平仓"

    if message is None:
        # Unknown stop-loss move type: nothing to report.
        return

    title, content, color = message
    await _notify(feishu, telegram, title, content, color)
    logger.info(f"{log_label}: {result.get('order_id')} | {symbol}")


async def price_monitor_loop():
    """Background price monitor: poll prices and check TP/SL triggers.

    Runs forever. On each pass it collects the symbols of all active paper
    orders, fetches the current price of each, lets the paper-trading service
    evaluate its price triggers, and sends a notification for every resulting
    event. Sleeps 10s when there are no active orders, 3s between passes, and
    5s after an unexpected error in the pass itself.
    """
    # Imported lazily, as in the original module, so the services are only
    # loaded once the monitor actually starts.
    from app.services.paper_trading_service import get_paper_trading_service
    from app.services.bitget_service import bitget_service
    from app.services.feishu_service import get_feishu_paper_trading_service
    from app.services.telegram_service import get_telegram_service

    logger.info("后台价格监控任务已启动(轮询模式)")

    feishu = get_feishu_paper_trading_service()  # uses the trading webhook
    telegram = get_telegram_service()
    trading_service = get_paper_trading_service()

    while True:
        try:
            active_orders = trading_service.get_active_orders()
            if not active_orders:
                await asyncio.sleep(10)  # idle: re-check every 10 seconds
                continue

            symbols = {
                order.get('symbol') for order in active_orders if order.get('symbol')
            }

            for symbol in symbols:
                try:
                    # NOTE(review): this is called without ``await``; if it
                    # performs network I/O it blocks the event loop — consider
                    # ``asyncio.to_thread`` after confirming thread-safety.
                    price = bitget_service.get_current_price(symbol)
                    if not price:
                        continue

                    triggered = trading_service.check_price_triggers(symbol, price)
                except Exception as e:
                    logger.error(f"检查 {symbol} 价格失败: {e}")
                    continue

                # The triggers have already been applied by the service, so a
                # failure on one result must not drop the notifications for
                # the remaining ones.
                for result in triggered:
                    try:
                        await _handle_trigger(
                            result, symbol, bitget_service, feishu, telegram
                        )
                    except Exception as e:
                        logger.error(f"处理 {symbol} 触发事件失败: {e}")

            # Poll every 3 seconds.
            await asyncio.sleep(3)

        except Exception as e:
            logger.error(f"价格监控循环出错: {e}")
            await asyncio.sleep(5)


async def _print_system_status():
    """Log a summary of the system monitor state (uptime, agents, config)."""
    summary = get_system_monitor().get_summary()
    separator = "=" * 60

    logger.info("\n" + separator)
    logger.info("📊 系统状态摘要")
    logger.info(separator)
    logger.info(f"启动时间: {summary['system_start_time']}")
    logger.info(f"运行时长: {int(summary['uptime_seconds'] // 60)} 分钟")
    logger.info(f"Agent 总数: {summary['total_agents']}")
    logger.info(f"运行中: {summary['running_agents']} | 错误: {summary['error_agents']}")

    if summary['agents']:
        logger.info("\n🤖 Agent 详情:")
        for agent_info in summary['agents'].values():
            status_icon = _AGENT_STATUS_ICONS.get(agent_info['status'], "❓")
            logger.info(f"  {status_icon} {agent_info['name']} - {agent_info['status']}")

            # Optional per-agent configuration details.
            config = agent_info.get('config', {})
            if not config:
                continue
            if 'symbols' in config:
                logger.info(f"     监控: {', '.join(config['symbols'])}")
            if 'paper_trading_enabled' in config:
                pt_status = "已启用" if config['paper_trading_enabled'] else "未启用"
                logger.info(f"     交易: {pt_status}")
            if 'analysis_interval' in config:
                logger.info(f"     分析间隔: {config['analysis_interval']}")

    logger.info(separator + "\n")


async def _stop_background_task(task, stopped_message):
    """Cancel ``task`` (if any), wait for it to finish and log the result.

    A task that had already failed with its own exception is logged rather
    than re-raised, so one broken task cannot interrupt the rest of shutdown.
    """
    if not task:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logger.error(f"后台任务退出时出现异常: {e}")
    logger.info(stopped_message)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start background tasks, stop them on shutdown.

    Startup installs the global exception handler, enables Feishu error
    notifications (best effort), starts the price monitor when paper trading
    is enabled and the crypto agent when symbols are configured, then logs a
    status summary. Shutdown cancels whichever tasks were started.
    """
    global _price_monitor_task, _crypto_agent_task

    # --- startup ---
    logger.info("应用启动")

    setup_global_exception_handler()
    logger.info("全局异常处理器已安装")

    settings = get_settings()

    # Feishu error notifications use a dedicated system-error webhook.
    try:
        from app.services.feishu_service import FeishuService

        feishu_error_service = FeishuService(
            webhook_url=settings.feishu_error_webhook_url,
            service_type="error",
        )
        init_error_notifier(
            feishu_service=feishu_error_service,
            enabled=True,
            cooldown=300,  # 5-minute cooldown between notifications
        )
        logger.info("✅ 系统异常通知已启用(异常将发送到飞书)")
    except Exception as e:
        logger.warning(f"飞书异常通知初始化失败: {e}")

    if getattr(settings, 'paper_trading_enabled', True):
        _price_monitor_task = asyncio.create_task(price_monitor_loop())
        logger.info("后台价格监控任务已创建")

    if getattr(settings, 'crypto_symbols', '') and settings.crypto_symbols.strip():
        try:
            from app.crypto_agent.crypto_agent import get_crypto_agent

            crypto_agent = get_crypto_agent()
            # Guard against starting the agent twice.
            if not crypto_agent.running:
                _crypto_agent_task = asyncio.create_task(crypto_agent.run())
                logger.info(f"加密货币智能体已启动,监控: {settings.crypto_symbols}")
            else:
                logger.info("加密货币智能体已在运行中")
        except Exception as e:
            logger.error(f"加密货币智能体启动失败: {e}")

    # The summary is purely diagnostic; it must not abort startup.
    try:
        await _print_system_status()
    except Exception as e:
        logger.warning(f"系统状态摘要输出失败: {e}")

    try:
        yield
    finally:
        # --- shutdown ---
        await _stop_background_task(_price_monitor_task, "后台价格监控任务已停止")
        await _stop_background_task(_crypto_agent_task, "加密货币智能体已停止")
        logger.info("应用关闭")


# Create the FastAPI application.
app = FastAPI(
    title="Crypto Trading Agent",
    description="基于 AI 的加密货币交易分析与执行系统",
    version="1.0.0",
    lifespan=lifespan
)

# Configure CORS. Origins come from a comma-separated setting; surrounding
# whitespace is stripped so "a, b" matches both origins.
settings = get_settings()
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        origin.strip()
        for origin in settings.cors_origins.split(",")
        if origin.strip()
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register routers.
app.include_router(auth.router, tags=["认证"])
app.include_router(admin.router, tags=["后台管理"])
app.include_router(llm.router, tags=["LLM模型"])
app.include_router(paper_trading.router, tags=["交易"])
app.include_router(bitget_live.router, tags=["Bitget"])
app.include_router(signals.router, tags=["信号管理"])
app.include_router(system.router, prefix="/api/system", tags=["系统状态"])

# Mount static files from the repository-level "frontend" directory.
frontend_path = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "frontend"
)
if os.path.exists(frontend_path):
    app.mount("/static", StaticFiles(directory=frontend_path), name="static")


def _serve_page(filename):
    """Return the named frontend page, or a JSON message when it is missing."""
    page_path = os.path.join(frontend_path, filename)
    if os.path.exists(page_path):
        return FileResponse(page_path)
    return {"message": "页面不存在"}


@app.get("/")
async def root():
    """Root path; serves the main application page."""
    return _serve_page("trading.html")


@app.get("/app")
async def app_page():
    """Main application page."""
    return _serve_page("trading.html")


@app.get("/health")
async def health_check():
    """Health check."""
    return {"status": "healthy"}


@app.get("/trading")
async def trading_page():
    """Trading page."""
    return _serve_page("trading.html")


@app.get("/bitget-trading")
async def bitget_trading_page():
    """Bitget trading compatibility entry; serves the current trading page."""
    return _serve_page("trading.html")


@app.get("/signals")
async def signals_page():
    """Signal list page."""
    return _serve_page("signals.html")


@app.get("/console")
async def console_page():
    """System console page."""
    return _serve_page("console.html")


if __name__ == "__main__":
    import uvicorn

    # Install the global exception handler so the main thread does not exit
    # silently on an unhandled exception.
    setup_global_exception_handler()

    try:
        uvicorn.run(
            "app.main:app",
            host=settings.api_host,
            port=settings.api_port,
            reload=settings.debug
        )
    except Exception as e:
        logger.error(f"应用启动失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
        raise