stock-ai-agent/backend/app/main.py
2026-02-20 20:14:22 +08:00

335 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FastAPI主应用
"""
import asyncio
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from contextlib import asynccontextmanager
from app.config import get_settings
from app.utils.logger import logger
from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals
import os
# Background tasks.
# Handles to the asyncio tasks that lifespan() creates at startup and cancels
# at shutdown. None means the corresponding task has not been started.
_price_monitor_task: "asyncio.Task | None" = None
_report_task: "asyncio.Task | None" = None
_stock_agent_task: "asyncio.Task | None" = None
_crypto_agent_task: "asyncio.Task | None" = None
# Close status -> (emoji, label) used in the close notification.
# Any other status falls back to the generic ("📤", "平仓") pair.
_CLOSE_STATUS_LABELS = {
    'closed_tp': ("🎯", "止盈平仓"),   # closed by take-profit
    'closed_sl': ("🛑", "止损平仓"),   # closed by stop-loss
    'closed_be': ("🔒", "保本止损"),   # closed by break-even stop
}


def _trigger_number(result, key):
    """Return ``result[key]``, treating a missing or ``None`` value as 0.

    ``dict.get(key, 0)`` still returns ``None`` when the key is present with
    a ``None`` value, which would crash the numeric format specs below.
    """
    value = result.get(key)
    return 0 if value is None else value


def _trigger_side_text(result):
    """Return the display text for the order side (long vs. anything else)."""
    return "做多" if result.get('side') == 'long' else "做空"


def _build_fill_message(result):
    """Build the notification text for a pending order that was filled."""
    grade = result.get('signal_grade', 'N/A')
    return (
        "✅ 挂单成交\n"
        f"交易对: {result.get('symbol')}\n"
        f"方向: {_trigger_side_text(result)}\n"
        f"等级: {grade}\n"
        f"挂单价: ${_trigger_number(result, 'entry_price'):,.2f}\n"
        f"成交价: ${_trigger_number(result, 'filled_price'):,.2f}\n"
        f"仓位: ${_trigger_number(result, 'quantity'):,.0f}\n"
        f"止损: ${_trigger_number(result, 'stop_loss'):,.2f}\n"
        f"止盈: ${_trigger_number(result, 'take_profit'):,.2f}"
    )


def _build_close_message(result):
    """Build the notification text for an order that was closed."""
    status = result.get('status', '')
    emoji, status_text = _CLOSE_STATUS_LABELS.get(status, ("📤", "平仓"))
    win_text = "盈利" if result.get('is_win', False) else "亏损"
    return (
        f"{emoji} 订单{status_text}\n"
        f"交易对: {result.get('symbol')}\n"
        f"方向: {_trigger_side_text(result)}\n"
        f"入场: ${_trigger_number(result, 'entry_price'):,.2f}\n"
        f"出场: ${_trigger_number(result, 'exit_price'):,.2f}\n"
        f"{win_text}: {_trigger_number(result, 'pnl_percent'):+.2f}% "
        f"(${_trigger_number(result, 'pnl_amount'):+.2f})\n"
        f"持仓时间: {result.get('hold_duration', 'N/A')}"
    )


async def _send_trade_notification(feishu, telegram, message):
    """Send ``message`` to Feishu and Telegram, isolating failures per channel.

    A failure on one channel is logged and must not prevent delivery on the
    other one, nor abort processing of the remaining triggered orders.
    """
    channels = (
        ("feishu", feishu.send_text),
        ("telegram", telegram.send_message),
    )
    for channel_name, send in channels:
        try:
            await send(message)
        except Exception as e:
            logger.error(f"发送通知失败 ({channel_name}): {e}")


async def price_monitor_loop():
    """Background price monitor: checks take-profit / stop-loss triggers.

    Runs forever until the task is cancelled. On each pass it:

    1. reads the active paper-trading orders (sleeps 10 s and retries when
       there are none);
    2. fetches the current price of every distinct symbol among them;
    3. asks the paper-trading service which orders that price triggers;
    4. sends a Feishu + Telegram notification for every triggered event
       (``order_filled`` events get the fill message, everything else the
       close message);
    5. sleeps 3 s.

    An error while handling one symbol is logged and does not affect the other
    symbols; any other unexpected error is logged and followed by a 5 s pause.
    """
    from app.services.paper_trading_service import get_paper_trading_service
    from app.services.binance_service import binance_service
    from app.services.feishu_service import get_feishu_service
    from app.services.telegram_service import get_telegram_service

    logger.info("后台价格监控任务已启动")

    while True:
        try:
            paper_trading = get_paper_trading_service()
            feishu = get_feishu_service()
            telegram = get_telegram_service()

            # Fetch active orders.
            active_orders = paper_trading.get_active_orders()
            if not active_orders:
                await asyncio.sleep(10)  # no active orders: poll every 10 s
                continue

            # Distinct symbols we need a price for.
            symbols = {
                order.get('symbol') for order in active_orders if order.get('symbol')
            }

            for symbol in symbols:
                try:
                    # NOTE(review): this is a plain (non-awaited) call; if it
                    # performs network I/O it blocks the event loop while it
                    # runs — confirm, and consider asyncio.to_thread().
                    price = binance_service.get_current_price(symbol)
                    if not price:
                        continue

                    # Check take-profit / stop-loss triggers at this price.
                    triggered = paper_trading.check_price_triggers(symbol, price)

                    # ``or ()`` tolerates a None return instead of raising.
                    for result in triggered or ():
                        event_type = result.get('event_type', 'order_closed')

                        if event_type == 'order_filled':
                            # Pending order filled.
                            message = _build_fill_message(result)
                            await _send_trade_notification(feishu, telegram, message)
                            logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}")
                            continue

                        # Order closed.
                        message = _build_close_message(result)
                        await _send_trade_notification(feishu, telegram, message)
                        logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}")
                except Exception as e:
                    logger.error(f"检查 {symbol} 价格失败: {e}")

            # Check every 3 seconds.
            await asyncio.sleep(3)
        except Exception as e:
            logger.error(f"价格监控循环出错: {e}")
            await asyncio.sleep(5)
def _next_4h_boundary(now):
    """Return the next wall-clock time after ``now`` whose hour is 0/4/8/12/16/20.

    ``now`` is a naive datetime. Adding a timedelta to the start of the
    current 4-hour slot handles the roll-over to the next day (20:00 -> 00:00)
    without a special case.
    """
    from datetime import timedelta

    slot_start = now.replace(
        hour=(now.hour // 4) * 4, minute=0, second=0, microsecond=0
    )
    return slot_start + timedelta(hours=4)


async def periodic_report_loop():
    """Periodic report loop: sends a paper-trading report every 4 hours.

    Runs forever until the task is cancelled. Each pass waits until the next
    4-hour boundary of the local clock (as returned by ``datetime.now()``),
    then generates the report for the last 4 hours and sends it via Telegram
    with ``parse_mode="HTML"``.

    If anything fails, the error and traceback are logged and the loop pauses
    60 s before computing the *next* boundary; the failed report itself is not
    re-sent.
    """
    import traceback
    from datetime import datetime
    from app.services.paper_trading_service import get_paper_trading_service
    from app.services.telegram_service import get_telegram_service

    logger.info("定时报告任务已启动")

    while True:
        try:
            # Work out how long to wait.
            now = datetime.now()
            next_time = _next_4h_boundary(now)
            wait_seconds = int((next_time - now).total_seconds())
            logger.info(f"下次报告时间: {next_time.strftime('%Y-%m-%d %H:%M')},等待 {wait_seconds // 3600}小时{(wait_seconds % 3600) // 60}分钟")

            # Sleep until the boundary has actually been reached. Sleeping a
            # truncated whole number of seconds could wake up just *before*
            # the boundary; the next pass would then compute a ~0 s wait for
            # the same boundary and send a duplicate report.
            while True:
                remaining = (next_time - datetime.now()).total_seconds()
                if remaining <= 0:
                    break
                await asyncio.sleep(remaining)

            # Generate and send the report.
            paper_trading = get_paper_trading_service()
            telegram = get_telegram_service()
            report = paper_trading.generate_report(hours=4)
            await telegram.send_message(report, parse_mode="HTML")
            logger.info("已发送4小时模拟交易报告")
        except Exception as e:
            logger.error(f"定时报告循环出错: {e}")
            logger.error(traceback.format_exc())
            await asyncio.sleep(60)  # back off for 1 minute after an error
async def _stop_background_task(task, stopped_message):
    """Cancel ``task`` (if set), wait for it to finish, then log ``stopped_message``.

    Does nothing when ``task`` is falsy (the task was never started).
    """
    if not task:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    except Exception as e:
        # The task had already finished with its own error before shutdown;
        # awaiting it re-raises that error. Log it so the remaining tasks are
        # still stopped instead of aborting the shutdown sequence here.
        logger.error(f"后台任务退出时出错: {e}")
    logger.info(stopped_message)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup: creates the background tasks and stores their handles in the
    module-level ``_*_task`` globals:

    * price monitor and periodic report, when ``paper_trading_enabled`` is
      true (defaults to true if the setting is absent);
    * the crypto agent, when ``crypto_symbols`` is a non-blank string;
    * the stock agent, when ``stock_symbols`` is a non-blank string (the agent
      instance is also handed to the ``stocks`` API module).

    A failure to start either agent is logged and does not prevent the
    application from starting.

    Shutdown: cancels every task that was started and waits for it to finish.
    """
    global _price_monitor_task, _report_task, _stock_agent_task, _crypto_agent_task

    # --- startup ---
    logger.info("应用启动")

    settings = get_settings()
    # NOTE(review): the report task is assumed to be gated by the same flag
    # as the price monitor — confirm against the original indentation.
    if getattr(settings, 'paper_trading_enabled', True):
        _price_monitor_task = asyncio.create_task(price_monitor_loop())
        logger.info("后台价格监控任务已创建")
        _report_task = asyncio.create_task(periodic_report_loop())
        logger.info("定时报告任务已创建")

    # Start the crypto agent.
    if getattr(settings, 'crypto_symbols', '') and settings.crypto_symbols.strip():
        try:
            from app.crypto_agent.crypto_agent import crypto_agent
            _crypto_agent_task = asyncio.create_task(crypto_agent.run())
            logger.info(f"加密货币智能体已启动,监控: {settings.crypto_symbols}")
        except Exception as e:
            logger.error(f"加密货币智能体启动失败: {e}")

    # Start the stock agent.
    if getattr(settings, 'stock_symbols', '') and settings.stock_symbols.strip():
        try:
            from app.stock_agent.stock_agent import get_stock_agent
            stock_agent = get_stock_agent()
            _stock_agent_task = asyncio.create_task(stock_agent.start())
            # Expose the agent instance to the API module.
            stocks.set_stock_agent(stock_agent)
            logger.info(f"美股智能体已启动,监控: {settings.stock_symbols}")
        except Exception as e:
            logger.error(f"美股智能体启动失败: {e}")
            logger.error("提示: 请确保已安装 yfinance (pip install yfinance)")

    try:
        yield
    finally:
        # --- shutdown ---
        # Runs even if an exception is thrown into the context manager, so
        # background tasks are never left running.
        await _stop_background_task(_price_monitor_task, "后台价格监控任务已停止")
        await _stop_background_task(_report_task, "定时报告任务已停止")
        await _stop_background_task(_crypto_agent_task, "加密货币智能体已停止")
        await _stop_background_task(_stock_agent_task, "美股智能体已停止")
        logger.info("应用关闭")
# Create the FastAPI application; lifespan() manages the background tasks.
app = FastAPI(
    title="A股AI分析Agent系统",
    description="基于AI Agent的股票智能分析系统",
    version="1.0.0",
    lifespan=lifespan,
)

# Configure CORS.
settings = get_settings()
# cors_origins is a comma-separated string. Strip whitespace around each entry
# and drop empty ones, so that "http://a, http://b" does not produce an origin
# " http://b" that can never match a request's Origin header.
_cors_allowed_origins = [
    origin.strip() for origin in settings.cors_origins.split(",") if origin.strip()
]
# NOTE(review): credentials are allowed together with wildcard methods and
# headers; if cors_origins is ever "*", this becomes very permissive — confirm
# that is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Register the API routers. Each entry is (router, include_router keyword
# arguments); they are registered in the order listed.
_ROUTER_REGISTRATIONS = (
    (auth.router, {"tags": ["认证"]}),
    (admin.router, {"tags": ["后台管理"]}),
    (chat.router, {"prefix": "/api/chat", "tags": ["对话"]}),
    (stock.router, {"prefix": "/api/stock", "tags": ["股票数据"]}),
    (skills.router, {"prefix": "/api/skills", "tags": ["技能管理"]}),
    (llm.router, {"tags": ["LLM模型"]}),
    (paper_trading.router, {"tags": ["模拟交易"]}),
    (stocks.router, {"prefix": "/api/stocks", "tags": ["美股分析"]}),
    (signals.router, {"tags": ["信号管理"]}),
)
for _api_router, _include_kwargs in _ROUTER_REGISTRATIONS:
    app.include_router(_api_router, **_include_kwargs)
# Mount the static frontend files.
# Three dirname() calls climb from this file's directory to the directory two
# levels above it; the frontend is expected in its "frontend" subdirectory.
_project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
frontend_path = os.path.join(_project_root, "frontend")
# isdir rather than exists: StaticFiles raises at import time when the path
# exists but is not a directory.
if os.path.isdir(frontend_path):
    app.mount("/static", StaticFiles(directory=frontend_path), name="static")
@app.get("/")
async def root():
    """Root path: serve the main application page.

    Returns ``index.html`` from the frontend directory, or a JSON message
    when that file does not exist.
    """
    index_file = os.path.join(frontend_path, "index.html")
    if not os.path.exists(index_file):
        return {"message": "页面不存在"}
    return FileResponse(index_file)
@app.get("/app")
async def app_page():
    """Main application page.

    Returns ``index.html`` from the frontend directory, or a JSON message
    when that file does not exist.
    """
    index_file = os.path.join(frontend_path, "index.html")
    if not os.path.exists(index_file):
        return {"message": "页面不存在"}
    return FileResponse(index_file)
@app.get("/health")
async def health_check():
    """Health check: always reports a healthy status."""
    payload = {"status": "healthy"}
    return payload
@app.get("/paper-trading")
async def paper_trading_page():
    """Paper-trading page.

    Returns ``paper-trading.html`` from the frontend directory, or a JSON
    message when that file does not exist.
    """
    html_file = os.path.join(frontend_path, "paper-trading.html")
    if not os.path.exists(html_file):
        return {"message": "页面不存在"}
    return FileResponse(html_file)
@app.get("/signals")
async def signals_page():
    """Signal list page.

    Returns ``signals.html`` from the frontend directory, or a JSON message
    when that file does not exist.
    """
    html_file = os.path.join(frontend_path, "signals.html")
    if not os.path.exists(html_file):
        return {"message": "页面不存在"}
    return FileResponse(html_file)
if __name__ == "__main__":
    import uvicorn

    # Run the app with uvicorn when this file is executed directly; host,
    # port and auto-reload come from the application settings.
    server_options = {
        "host": settings.api_host,
        "port": settings.api_port,
        "reload": settings.debug,
    }
    uvicorn.run("app.main:app", **server_options)