""" FastAPI主应用 """ import asyncio from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from contextlib import asynccontextmanager from app.config import get_settings from app.utils.logger import logger from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals from app.utils.error_handler import setup_global_exception_handler, init_error_notifier import os # 后台任务 _price_monitor_task = None _report_task = None _stock_agent_task = None _crypto_agent_task = None async def price_monitor_loop(): """后台价格监控循环 - 使用轮询检查止盈止损""" from app.services.paper_trading_service import get_paper_trading_service from app.services.binance_service import binance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service logger.info("后台价格监控任务已启动(轮询模式)") feishu = get_feishu_service() telegram = get_telegram_service() paper_trading = get_paper_trading_service() # 价格更新回调 - 检查止盈止损 async def on_price_update(symbol: str, price: float): """价格更新时检查止盈止损""" try: # 检查止盈止损 triggered = paper_trading.check_price_triggers(symbol, price) # 发送通知 for result in triggered: status = result.get('status', '') event_type = result.get('event_type', 'order_closed') # 处理挂单成交事件 if event_type == 'order_filled': side_text = "做多" if result.get('side') == 'long' else "做空" grade = result.get('signal_grade', 'N/A') message = f"""✅ 挂单成交 交易对: {result.get('symbol')} 方向: {side_text} 等级: {grade} 挂单价: ${result.get('entry_price', 0):,.2f} 成交价: ${result.get('filled_price', 0):,.2f} 仓位: ${result.get('quantity', 0):,.0f} 止损: ${result.get('stop_loss', 0):,.2f} 止盈: ${result.get('take_profit', 0):,.2f}""" # 发送通知 await feishu.send_text(message) await telegram.send_message(message) logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}") continue # 处理订单平仓事件 is_win = result.get('is_win', False) if status == 'closed_tp': emoji = "🎯" status_text = "止盈平仓" elif status == 'closed_sl': emoji = "🛑" status_text = "止损平仓" elif status == 'closed_be': emoji = "🔒" status_text = "保本止损" else: emoji = "📤" status_text = "平仓" win_text = "盈利" if is_win else "亏损" side_text = "做多" if result.get('side') == 'long' else "做空" message = f"""{emoji} 订单{status_text} 交易对: {result.get('symbol')} 方向: {side_text} 入场: ${result.get('entry_price', 0):,.2f} 出场: ${result.get('exit_price', 0):,.2f} {win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f}) 持仓时间: {result.get('hold_duration', 'N/A')}""" # 发送通知 await feishu.send_text(message) await telegram.send_message(message) logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}") except Exception as e: logger.error(f"处理 {symbol} 价格更新失败: {e}") # 持续监控活跃订单 while True: try: # 获取活跃订单 active_orders = paper_trading.get_active_orders() if not active_orders: await asyncio.sleep(10) # 没有活跃订单时,10秒检查一次 continue # 获取所有需要的交易对 symbols = set(order.get('symbol') for order in active_orders if order.get('symbol')) # 获取价格并检查止盈止损 for symbol in symbols: try: price = binance_service.get_current_price(symbol) if not price: continue # 检查止盈止损 triggered = paper_trading.check_price_triggers(symbol, price) # 发送通知 for result in triggered: status = result.get('status', '') event_type = result.get('event_type', 'order_closed') # 处理止损移动事件 if event_type == 'stop_loss_moved': move_type = result.get('move_type', '') side_text = "做多" if result.get('side') == 'long' else "做空" pnl = result.get('current_pnl_percent', 0) if move_type == 'trailing_first': message = f"""📈 移动止损已激活 交易对: {result.get('symbol')} 方向: {side_text} 当前盈利: {pnl:+.2f}% 新止损价: ${result.get('new_stop_loss', 0):,.2f} 💰 锁定利润,让利润奔跑""" elif move_type == 'trailing_update': message = f"""📊 止损已上移 交易对: {result.get('symbol')} 方向: {side_text} 当前盈利: {pnl:+.2f}% 新止损价: ${result.get('new_stop_loss', 0):,.2f} 🎯 继续锁定更多利润""" elif move_type == 'breakeven': message = f"""🔒 保本止损已触发 交易对: {result.get('symbol')} 方向: {side_text} 当前盈利: {pnl:+.2f}% 止损移至: ${result.get('new_stop_loss', 0):,.2f} (保本价)""" # 发送通知 await feishu.send_text(message) await telegram.send_message(message) logger.info(f"后台监控触发止损移动: {result.get('order_id')} | {symbol}") continue # 处理挂单成交事件 if event_type == 'order_filled': side_text = "做多" if result.get('side') == 'long' else "做空" grade = result.get('signal_grade', 'N/A') message = f"""✅ 挂单成交 交易对: {result.get('symbol')} 方向: {side_text} 等级: {grade} 挂单价: ${result.get('entry_price', 0):,.2f} 成交价: ${result.get('filled_price', 0):,.2f} 仓位: ${result.get('quantity', 0):,.0f} 止损: ${result.get('stop_loss', 0):,.2f} 止盈: ${result.get('take_profit', 0):,.2f}""" # 发送通知 await feishu.send_text(message) await telegram.send_message(message) logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}") continue # 处理订单平仓事件 is_win = result.get('is_win', False) if status == 'closed_tp': emoji = "🎯" status_text = "止盈平仓" elif status == 'closed_sl': emoji = "🛑" status_text = "止损平仓" elif status == 'closed_be': emoji = "🔒" status_text = "保本止损" else: emoji = "📤" status_text = "平仓" win_text = "盈利" if is_win else "亏损" side_text = "做多" if result.get('side') == 'long' else "做空" message = f"""{emoji} 订单{status_text} 交易对: {result.get('symbol')} 方向: {side_text} 入场: ${result.get('entry_price', 0):,.2f} 出场: ${result.get('exit_price', 0):,.2f} {win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f}) 持仓时间: {result.get('hold_duration', 'N/A')}""" # 发送通知 await feishu.send_text(message) await telegram.send_message(message) logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}") except Exception as e: logger.error(f"检查 {symbol} 价格失败: {e}") # 每 3 秒检查一次 await asyncio.sleep(3) except Exception as e: logger.error(f"价格监控循环出错: {e}") await asyncio.sleep(5) async def periodic_report_loop(): """定时报告循环 - 每4小时发送一次模拟交易报告""" from datetime import datetime from app.services.paper_trading_service import get_paper_trading_service from app.services.telegram_service import get_telegram_service logger.info("定时报告任务已启动") # 计算距离下一个整4小时的等待时间 def get_seconds_until_next_4h(): now = datetime.now() current_hour = now.hour # 下一个4小时整点: 0, 4, 8, 12, 16, 20 next_4h = ((current_hour // 4) + 1) * 4 if next_4h >= 24: next_4h = 0 # 需要等到明天 from datetime import timedelta next_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) else: next_time = now.replace(hour=next_4h, minute=0, second=0, microsecond=0) wait_seconds = (next_time - now).total_seconds() return int(wait_seconds), next_time while True: try: # 计算等待时间 wait_seconds, next_time = get_seconds_until_next_4h() logger.info(f"下次报告时间: {next_time.strftime('%Y-%m-%d %H:%M')},等待 {wait_seconds // 3600}小时{(wait_seconds % 3600) // 60}分钟") # 等待到下一个4小时整点 await asyncio.sleep(wait_seconds) # 生成并发送报告 paper_trading = get_paper_trading_service() telegram = get_telegram_service() report = paper_trading.generate_report(hours=4) await telegram.send_message(report, parse_mode="HTML") logger.info("已发送4小时模拟交易报告") except Exception as e: logger.error(f"定时报告循环出错: {e}") import traceback logger.error(traceback.format_exc()) await asyncio.sleep(60) # 出错后等待1分钟再重试 @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" global _price_monitor_task, _report_task, _stock_agent_task, _crypto_agent_task # 启动时执行 logger.info("应用启动") # 初始化全局异常处理器 setup_global_exception_handler() logger.info("全局异常处理器已安装") # 初始化飞书错误通知 try: from app.services.feishu_service import get_feishu_service feishu_service = get_feishu_service() init_error_notifier( feishu_service=feishu_service, enabled=True, # 启用异常通知 cooldown=300 # 5分钟冷却时间 ) logger.info("✅ 系统异常通知已启用(异常将发送到飞书)") except Exception as e: logger.warning(f"飞书异常通知初始化失败: {e}") # 启动后台任务 settings = get_settings() if getattr(settings, 'paper_trading_enabled', True): _price_monitor_task = asyncio.create_task(price_monitor_loop()) logger.info("后台价格监控任务已创建") _report_task = asyncio.create_task(periodic_report_loop()) logger.info("定时报告任务已创建") # 启动加密货币智能体 if getattr(settings, 'crypto_symbols', '') and settings.crypto_symbols.strip(): try: from app.crypto_agent.crypto_agent import get_crypto_agent crypto_agent = get_crypto_agent() # 防止重复启动 if not crypto_agent.running: _crypto_agent_task = asyncio.create_task(crypto_agent.run()) logger.info(f"加密货币智能体已启动,监控: {settings.crypto_symbols}") else: logger.info("加密货币智能体已在运行中") except Exception as e: logger.error(f"加密货币智能体启动失败: {e}") # 启动美股智能体 if getattr(settings, 'stock_symbols', '') and settings.stock_symbols.strip(): try: from app.stock_agent.stock_agent import get_stock_agent stock_agent = get_stock_agent() _stock_agent_task = asyncio.create_task(stock_agent.start()) # 设置智能体实例到 API 模块 stocks.set_stock_agent(stock_agent) logger.info(f"美股智能体已启动,监控: {settings.stock_symbols}") except Exception as e: logger.error(f"美股智能体启动失败: {e}") logger.error(f"提示: 请确保已安装 yfinance (pip install yfinance)") yield # 关闭时执行 if _price_monitor_task: _price_monitor_task.cancel() try: await _price_monitor_task except asyncio.CancelledError: pass logger.info("后台价格监控任务已停止") if _report_task: _report_task.cancel() try: await _report_task except asyncio.CancelledError: pass logger.info("定时报告任务已停止") # 停止加密货币智能体 if _crypto_agent_task: _crypto_agent_task.cancel() try: await _crypto_agent_task except asyncio.CancelledError: pass logger.info("加密货币智能体已停止") # 停止美股智能体 if _stock_agent_task: _stock_agent_task.cancel() try: await _stock_agent_task except asyncio.CancelledError: pass logger.info("美股智能体已停止") logger.info("应用关闭") # 创建FastAPI应用 app = FastAPI( title="A股AI分析Agent系统", description="基于AI Agent的股票智能分析系统", version="1.0.0", lifespan=lifespan ) # 配置CORS settings = get_settings() app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origins.split(","), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 注册路由 app.include_router(auth.router, tags=["认证"]) app.include_router(admin.router, tags=["后台管理"]) app.include_router(chat.router, prefix="/api/chat", tags=["对话"]) app.include_router(stock.router, prefix="/api/stock", tags=["股票数据"]) app.include_router(skills.router, prefix="/api/skills", tags=["技能管理"]) app.include_router(llm.router, tags=["LLM模型"]) app.include_router(paper_trading.router, tags=["模拟交易"]) app.include_router(stocks.router, prefix="/api/stocks", tags=["美股分析"]) app.include_router(signals.router, tags=["信号管理"]) # 挂载静态文件 frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "frontend") if os.path.exists(frontend_path): app.mount("/static", StaticFiles(directory=frontend_path), name="static") @app.get("/") async def root(): """根路径,返回主应用页面""" index_path = os.path.join(frontend_path, "index.html") if os.path.exists(index_path): return FileResponse(index_path) return {"message": "页面不存在"} @app.get("/app") async def app_page(): """主应用页面""" index_path = os.path.join(frontend_path, "index.html") if os.path.exists(index_path): return FileResponse(index_path) return {"message": "页面不存在"} @app.get("/health") async def health_check(): """健康检查""" return {"status": "healthy"} @app.get("/paper-trading") async def paper_trading_page(): """模拟交易页面""" page_path = os.path.join(frontend_path, "paper-trading.html") if os.path.exists(page_path): return FileResponse(page_path) return {"message": "页面不存在"} @app.get("/signals") async def signals_page(): """信号列表页面""" page_path = os.path.join(frontend_path, "signals.html") if os.path.exists(page_path): return FileResponse(page_path) return {"message": "页面不存在"} if __name__ == "__main__": import uvicorn # 设置全局异常处理器(防止主线程异常退出) setup_global_exception_handler() try: uvicorn.run( "app.main:app", host=settings.api_host, port=settings.api_port, reload=settings.debug ) except Exception as e: logger.error(f"应用启动失败: {e}") import traceback logger.error(traceback.format_exc()) raise