stock-ai-agent/backend/app/main.py
2026-02-07 01:14:08 +08:00

194 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FastAPI主应用
"""
import asyncio
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from contextlib import asynccontextmanager
from app.config import get_settings
from app.utils.logger import logger
from app.api import chat, stock, skills, llm, auth, admin, paper_trading
import os
# Handle of the background price-monitor task.
# Stays None until lifespan() creates the task at application startup.
_price_monitor_task = None
def _format_close_message(result):
    """Build the notification text for one closed-order result dict.

    Numeric fields that are missing or None are rendered as 0 so that a
    partially filled result cannot break the format specifiers.
    """
    status = result.get('status', '')
    if status == 'closed_tp':
        emoji, status_text = "🎯", "止盈平仓"
    elif status == 'closed_sl':
        emoji, status_text = "🛑", "止损平仓"
    else:
        emoji, status_text = "📤", "平仓"

    win_text = "盈利" if result.get('is_win', False) else "亏损"
    side_text = "做多" if result.get('side') == 'long' else "做空"

    # `or 0` also covers keys that are present but hold None.
    entry_price = result.get('entry_price') or 0
    exit_price = result.get('exit_price') or 0
    pnl_percent = result.get('pnl_percent') or 0
    pnl_amount = result.get('pnl_amount') or 0

    return "\n".join((
        f"{emoji} 订单{status_text}",
        f"交易对: {result.get('symbol')}",
        f"方向: {side_text}",
        f"入场: ${entry_price:,.2f}",
        f"出场: ${exit_price:,.2f}",
        f"{win_text}: {pnl_percent:+.2f}% (${pnl_amount:+.2f})",
        f"持仓时间: {result.get('hold_duration', 'N/A')}",
    ))


async def _send_close_notifications(feishu, telegram, message):
    """Send *message* through both channels, isolating their failures.

    An error in one channel is logged and does not prevent the other channel
    from being tried, nor does it propagate to the caller.
    """
    try:
        await feishu.send_text(message)
    except Exception as e:
        logger.error(f"飞书通知发送失败: {e}")
    try:
        await telegram.send_message(message)
    except Exception as e:
        logger.error(f"Telegram通知发送失败: {e}")


async def price_monitor_loop():
    """Background loop that checks take-profit / stop-loss triggers.

    Each round fetches the active paper-trading orders, looks up the current
    price of every distinct symbol, passes it to ``check_price_triggers`` and
    sends a Feishu and a Telegram notification for every result it returns.

    Timing: sleeps 10 s when there are no active orders, 3 s between normal
    rounds and 5 s after an unexpected error. The loop never returns on its
    own; it ends when the task is cancelled (on Python 3.8+ ``CancelledError``
    is not an ``Exception`` subclass, so the handlers below let it through).
    """
    # Imported lazily inside the function, as in the original module layout.
    from app.services.paper_trading_service import get_paper_trading_service
    from app.services.binance_service import binance_service
    from app.services.feishu_service import get_feishu_service
    from app.services.telegram_service import get_telegram_service

    logger.info("后台价格监控任务已启动")

    while True:
        try:
            paper_trading = get_paper_trading_service()
            feishu = get_feishu_service()
            telegram = get_telegram_service()

            # Fetch active orders
            active_orders = paper_trading.get_active_orders()
            if not active_orders:
                # Nothing to watch: poll less frequently.
                await asyncio.sleep(10)
                continue

            # Distinct symbols, so each price is fetched once per round.
            symbols = {
                order.get('symbol')
                for order in active_orders
                if order.get('symbol')
            }

            for symbol in symbols:
                try:
                    # NOTE(review): this is a synchronous call inside a
                    # coroutine; if it performs network I/O it blocks the
                    # event loop. Consider offloading it to an executor once
                    # the service is confirmed to be thread-safe.
                    price = binance_service.get_current_price(symbol)
                    if not price:
                        continue

                    # Check take-profit / stop-loss triggers
                    triggered = paper_trading.check_price_triggers(symbol, price)

                    # A None result is treated the same as "nothing triggered".
                    for result in triggered or ():
                        message = _format_close_message(result)
                        await _send_close_notifications(feishu, telegram, message)
                        logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}")
                except Exception as e:
                    # One bad symbol must not stop the others in this round.
                    logger.error(f"检查 {symbol} 价格失败: {e}")

            # Check every 3 seconds
            await asyncio.sleep(3)
        except Exception as e:
            logger.error(f"价格监控循环出错: {e}")
            await asyncio.sleep(5)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup: logs the start and, unless the settings object has a falsy
    ``paper_trading_enabled`` attribute (a missing attribute counts as
    enabled), schedules ``price_monitor_loop`` as a background task.

    Shutdown: cancels the background task, waits for it to finish and clears
    the module-level handle. The cleanup runs in a ``finally`` block so it
    also happens when an exception or cancellation is thrown in at ``yield``.
    """
    global _price_monitor_task

    # Runs at startup
    logger.info("应用启动")

    # Start the background price-monitor task
    settings = get_settings()
    if getattr(settings, 'paper_trading_enabled', True):
        _price_monitor_task = asyncio.create_task(price_monitor_loop())
        logger.info("后台价格监控任务已创建")

    try:
        yield
    finally:
        # Runs at shutdown
        if _price_monitor_task:
            _price_monitor_task.cancel()
            try:
                await _price_monitor_task
            except asyncio.CancelledError:
                # Expected: this is the cancellation requested just above.
                pass
            # Drop the stale handle so a later startup begins from a clean state.
            _price_monitor_task = None
            logger.info("后台价格监控任务已停止")
        logger.info("应用关闭")
# Create the FastAPI application; startup and shutdown are handled by lifespan().
app = FastAPI(
    lifespan=lifespan,
    title="A股AI分析Agent系统",
    description="基于AI Agent的股票智能分析系统",
    version="1.0.0",
)
# Configure CORS
settings = get_settings()
# The allowed origins come from a comma-separated setting. Surrounding
# whitespace is stripped and empty entries are dropped: an entry such as
# " http://localhost:3000" (from "a, b" style values) would otherwise never
# equal the Origin header sent by a browser.
_cors_origins = [
    origin.strip()
    for origin in settings.cors_origins.split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Register the API routers, in this order, each with its own include options.
for _router, _include_options in (
    (auth.router, {"tags": ["认证"]}),
    (admin.router, {"tags": ["后台管理"]}),
    (chat.router, {"prefix": "/api/chat", "tags": ["对话"]}),
    (stock.router, {"prefix": "/api/stock", "tags": ["股票数据"]}),
    (skills.router, {"prefix": "/api/skills", "tags": ["技能管理"]}),
    (llm.router, {"tags": ["LLM模型"]}),
    (paper_trading.router, {"tags": ["模拟交易"]}),
):
    app.include_router(_router, **_include_options)
# Loop variables are throwaway; do not leave them in the module namespace.
del _router, _include_options
# Mount static files.
# The frontend directory is "frontend" under the path obtained by applying
# dirname() three times to this file's path.
_project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
frontend_path = os.path.join(_project_root, "frontend")
if os.path.exists(frontend_path):
    # Only mount when the directory is present.
    app.mount("/static", StaticFiles(directory=frontend_path), name="static")
@app.get("/")
async def root():
    """Serve the main application page (index.html) at the site root.

    Returns a JSON message instead when index.html is not found under
    ``frontend_path``.
    """
    index_file = os.path.join(frontend_path, "index.html")
    if not os.path.exists(index_file):
        return {"message": "页面不存在"}
    return FileResponse(index_file)
@app.get("/app")
async def app_page():
    """Serve the main application page (index.html) at /app.

    Returns a JSON message instead when index.html is not found under
    ``frontend_path``.
    """
    index_file = os.path.join(frontend_path, "index.html")
    if not os.path.exists(index_file):
        return {"message": "页面不存在"}
    return FileResponse(index_file)
@app.get("/health")
async def health_check():
    """Health-check endpoint; always returns a fixed "healthy" status payload."""
    payload = {"status": "healthy"}
    return payload
@app.get("/paper-trading")
async def paper_trading_page():
    """Serve the paper-trading page (paper-trading.html).

    Returns a JSON message instead when the file is not found under
    ``frontend_path``.
    """
    page_file = os.path.join(frontend_path, "paper-trading.html")
    if not os.path.exists(page_file):
        return {"message": "页面不存在"}
    return FileResponse(page_file)
if __name__ == "__main__":
    # Run the application under uvicorn when this module is executed directly.
    import uvicorn

    # Host, port and auto-reload all come from the application settings.
    server_options = {
        "host": settings.api_host,
        "port": settings.api_port,
        "reload": settings.debug,
    }
    uvicorn.run("app.main:app", **server_options)