stock-ai-agent/backend/app/main.py
2026-02-24 15:31:32 +08:00

557 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FastAPI主应用
"""
import asyncio
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from contextlib import asynccontextmanager
from app.config import get_settings
from app.utils.logger import logger
from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals, system, real_trading
from app.utils.error_handler import setup_global_exception_handler, init_error_notifier
from app.utils.system_status import get_system_monitor
import os
# 后台任务
_price_monitor_task = None
_report_task = None
_stock_agent_task = None
_crypto_agent_task = None
async def price_monitor_loop():
"""后台价格监控循环 - 使用轮询检查止盈止损"""
from app.services.paper_trading_service import get_paper_trading_service
from app.services.bitget_service import bitget_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
logger.info("后台价格监控任务已启动(轮询模式)")
feishu = get_feishu_service()
telegram = get_telegram_service()
paper_trading = get_paper_trading_service()
# 价格更新回调 - 检查止盈止损
async def on_price_update(symbol: str, price: float):
"""价格更新时检查止盈止损"""
try:
# 检查止盈止损
triggered = paper_trading.check_price_triggers(symbol, price)
# 发送通知
for result in triggered:
status = result.get('status', '')
event_type = result.get('event_type', 'order_closed')
# 处理挂单成交事件
if event_type == 'order_filled':
side_text = "做多" if result.get('side') == 'long' else "做空"
grade = result.get('signal_grade', 'N/A')
message = f"""✅ 挂单成交
交易对: {result.get('symbol')}
方向: {side_text}
等级: {grade}
挂单价: ${result.get('entry_price', 0):,.2f}
成交价: ${result.get('filled_price', 0):,.2f}
仓位: ${result.get('quantity', 0):,.0f}
止损: ${result.get('stop_loss', 0):,.2f}
止盈: ${result.get('take_profit', 0):,.2f}"""
# 发送通知
await feishu.send_text(message)
await telegram.send_message(message)
logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}")
continue
# 处理订单平仓事件
is_win = result.get('is_win', False)
if status == 'closed_tp':
emoji = "🎯"
status_text = "止盈平仓"
elif status == 'closed_sl':
emoji = "🛑"
status_text = "止损平仓"
elif status == 'closed_be':
emoji = "🔒"
status_text = "保本止损"
else:
emoji = "📤"
status_text = "平仓"
win_text = "盈利" if is_win else "亏损"
side_text = "做多" if result.get('side') == 'long' else "做空"
message = f"""{emoji} 订单{status_text}
交易对: {result.get('symbol')}
方向: {side_text}
入场: ${result.get('entry_price', 0):,.2f}
出场: ${result.get('exit_price', 0):,.2f}
{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})
持仓时间: {result.get('hold_duration', 'N/A')}"""
# 发送通知
await feishu.send_text(message)
await telegram.send_message(message)
logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}")
except Exception as e:
logger.error(f"处理 {symbol} 价格更新失败: {e}")
# 持续监控活跃订单
while True:
try:
# 获取活跃订单
active_orders = paper_trading.get_active_orders()
if not active_orders:
await asyncio.sleep(10) # 没有活跃订单时10秒检查一次
continue
# 获取所有需要的交易对
symbols = set(order.get('symbol') for order in active_orders if order.get('symbol'))
# 获取价格并检查止盈止损
for symbol in symbols:
try:
price = bitget_service.get_current_price(symbol)
if not price:
continue
# 检查止盈止损
triggered = paper_trading.check_price_triggers(symbol, price)
# 发送通知
for result in triggered:
status = result.get('status', '')
event_type = result.get('event_type', 'order_closed')
# 处理止损移动事件
if event_type == 'stop_loss_moved':
move_type = result.get('move_type', '')
side_text = "做多" if result.get('side') == 'long' else "做空"
side_icon = '🟢' if result.get('side') == 'long' else '🔴'
pnl = result.get('current_pnl_percent', 0)
symbol_display = result.get('symbol', '')
if move_type == 'trailing_first':
title = f"📈 移动止损已激活 - {symbol_display}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"",
f"📈 **当前盈利**: {pnl:+.2f}%",
f"🛑 **新止损价**: ${result.get('new_stop_loss', 0):,.2f}",
f"",
f"💰 锁定利润,让利润奔跑"
]
color = "green"
elif move_type == 'trailing_update':
title = f"📊 止损已上移 - {symbol_display}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"",
f"📈 **当前盈利**: {pnl:+.2f}%",
f"🛑 **新止损价**: ${result.get('new_stop_loss', 0):,.2f}",
f"",
f"🎯 继续锁定更多利润"
]
color = "blue"
elif move_type == 'breakeven':
title = f"📈 移动止损已启动 - {symbol_display}"
content_parts = [
f"{side_icon} **方向**: {side_text}",
f"",
f"📈 **当前盈利**: {pnl:+.2f}%",
f"🛑 **新止损价**: ${result.get('new_stop_loss', 0):,.2f}",
f"",
f"💰 锁定利润,让利润奔跑"
]
color = "green"
else:
continue
content = "\n".join(content_parts)
# 发送通知
await feishu.send_card(title, content, color)
if telegram:
message = f"{title}\n\n{content}"
await telegram.send_message(message)
logger.info(f"后台监控触发止损移动: {result.get('order_id')} | {symbol}")
continue
# 处理挂单成交事件
if event_type == 'order_filled':
side_text = "做多" if result.get('side') == 'long' else "做空"
grade = result.get('signal_grade', 'N/A')
message = f"""✅ 挂单成交
交易对: {result.get('symbol')}
方向: {side_text}
等级: {grade}
挂单价: ${result.get('entry_price', 0):,.2f}
成交价: ${result.get('filled_price', 0):,.2f}
仓位: ${result.get('quantity', 0):,.0f}
止损: ${result.get('stop_loss', 0):,.2f}
止盈: ${result.get('take_profit', 0):,.2f}"""
# 发送通知
await feishu.send_text(message)
await telegram.send_message(message)
logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}")
continue
# 处理订单平仓事件
is_win = result.get('is_win', False)
if status == 'closed_tp':
emoji = "🎯"
status_text = "止盈平仓"
elif status == 'closed_sl':
emoji = "🛑"
status_text = "止损平仓"
elif status == 'closed_be':
emoji = "📈"
status_text = "移动止损"
else:
emoji = "📤"
status_text = "平仓"
win_text = "盈利" if is_win else "亏损"
side_text = "做多" if result.get('side') == 'long' else "做空"
message = f"""{emoji} 订单{status_text}
交易对: {result.get('symbol')}
方向: {side_text}
入场: ${result.get('entry_price', 0):,.2f}
出场: ${result.get('exit_price', 0):,.2f}
{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})
持仓时间: {result.get('hold_duration', 'N/A')}"""
# 发送通知
await feishu.send_text(message)
await telegram.send_message(message)
logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}")
except Exception as e:
logger.error(f"检查 {symbol} 价格失败: {e}")
# 每 3 秒检查一次
await asyncio.sleep(3)
except Exception as e:
logger.error(f"价格监控循环出错: {e}")
await asyncio.sleep(5)
async def _print_system_status():
"""打印系统状态摘要"""
from app.utils.system_status import get_system_monitor
monitor = get_system_monitor()
summary = monitor.get_summary()
logger.info("\n" + "=" * 60)
logger.info("📊 系统状态摘要")
logger.info("=" * 60)
logger.info(f"启动时间: {summary['system_start_time']}")
logger.info(f"运行时长: {int(summary['uptime_seconds'] // 60)} 分钟")
logger.info(f"Agent 总数: {summary['total_agents']}")
logger.info(f"运行中: {summary['running_agents']} | 错误: {summary['error_agents']}")
if summary['agents']:
logger.info("\n🤖 Agent 详情:")
for agent_id, agent_info in summary['agents'].items():
status_icon = {
"运行中": "",
"启动中": "🔄",
"已停止": "⏸️",
"错误": "",
"未初始化": ""
}.get(agent_info['status'], "")
logger.info(f" {status_icon} {agent_info['name']} - {agent_info['status']}")
# 显示配置
config = agent_info.get('config', {})
if config:
if 'symbols' in config:
logger.info(f" 监控: {', '.join(config['symbols'])}")
if 'paper_trading_enabled' in config:
pt_status = "已启用" if config['paper_trading_enabled'] else "未启用"
logger.info(f" 模拟交易: {pt_status}")
if 'analysis_interval' in config:
logger.info(f" 分析间隔: {config['analysis_interval']}")
logger.info("=" * 60 + "\n")
async def periodic_report_loop():
"""定时报告循环 - 每4小时发送一次模拟交易报告"""
from datetime import datetime
from app.services.paper_trading_service import get_paper_trading_service
from app.services.telegram_service import get_telegram_service
logger.info("定时报告任务已启动")
# 计算距离下一个整4小时的等待时间
def get_seconds_until_next_4h():
now = datetime.now()
current_hour = now.hour
# 下一个4小时整点: 0, 4, 8, 12, 16, 20
next_4h = ((current_hour // 4) + 1) * 4
if next_4h >= 24:
next_4h = 0
# 需要等到明天
from datetime import timedelta
next_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
else:
next_time = now.replace(hour=next_4h, minute=0, second=0, microsecond=0)
wait_seconds = (next_time - now).total_seconds()
return int(wait_seconds), next_time
while True:
try:
# 计算等待时间
wait_seconds, next_time = get_seconds_until_next_4h()
logger.info(f"下次报告时间: {next_time.strftime('%Y-%m-%d %H:%M')},等待 {wait_seconds // 3600}小时{(wait_seconds % 3600) // 60}分钟")
# 等待到下一个4小时整点
await asyncio.sleep(wait_seconds)
# 生成并发送报告
paper_trading = get_paper_trading_service()
telegram = get_telegram_service()
report = paper_trading.generate_report(hours=4)
await telegram.send_message(report, parse_mode="HTML")
logger.info("已发送4小时模拟交易报告")
except Exception as e:
logger.error(f"定时报告循环出错: {e}")
import traceback
logger.error(traceback.format_exc())
await asyncio.sleep(60) # 出错后等待1分钟再重试
@asynccontextmanager
async def lifespan(app: FastAPI):
"""应用生命周期管理"""
global _price_monitor_task, _report_task, _stock_agent_task, _crypto_agent_task
# 启动时执行
logger.info("应用启动")
# 初始化全局异常处理器
setup_global_exception_handler()
logger.info("全局异常处理器已安装")
# 初始化飞书错误通知
try:
from app.services.feishu_service import get_feishu_service
feishu_service = get_feishu_service()
init_error_notifier(
feishu_service=feishu_service,
enabled=True, # 启用异常通知
cooldown=300 # 5分钟冷却时间
)
logger.info("✅ 系统异常通知已启用(异常将发送到飞书)")
except Exception as e:
logger.warning(f"飞书异常通知初始化失败: {e}")
# 启动后台任务
settings = get_settings()
if getattr(settings, 'paper_trading_enabled', True):
_price_monitor_task = asyncio.create_task(price_monitor_loop())
logger.info("后台价格监控任务已创建")
_report_task = asyncio.create_task(periodic_report_loop())
logger.info("定时报告任务已创建")
# 启动加密货币智能体
if getattr(settings, 'crypto_symbols', '') and settings.crypto_symbols.strip():
try:
from app.crypto_agent.crypto_agent import get_crypto_agent
crypto_agent = get_crypto_agent()
# 防止重复启动
if not crypto_agent.running:
_crypto_agent_task = asyncio.create_task(crypto_agent.run())
logger.info(f"加密货币智能体已启动,监控: {settings.crypto_symbols}")
else:
logger.info("加密货币智能体已在运行中")
except Exception as e:
logger.error(f"加密货币智能体启动失败: {e}")
# 启动美股智能体
if getattr(settings, 'stock_symbols', '') and settings.stock_symbols.strip():
try:
from app.stock_agent.stock_agent import get_stock_agent
stock_agent = get_stock_agent()
_stock_agent_task = asyncio.create_task(stock_agent.start())
# 设置智能体实例到 API 模块
stocks.set_stock_agent(stock_agent)
logger.info(f"美股智能体已启动,监控: {settings.stock_symbols}")
except Exception as e:
logger.error(f"美股智能体启动失败: {e}")
logger.error(f"提示: 请确保已安装 yfinance (pip install yfinance)")
# 显示系统状态摘要
await _print_system_status()
yield
# 关闭时执行
if _price_monitor_task:
_price_monitor_task.cancel()
try:
await _price_monitor_task
except asyncio.CancelledError:
pass
logger.info("后台价格监控任务已停止")
if _report_task:
_report_task.cancel()
try:
await _report_task
except asyncio.CancelledError:
pass
logger.info("定时报告任务已停止")
# 停止加密货币智能体
if _crypto_agent_task:
_crypto_agent_task.cancel()
try:
await _crypto_agent_task
except asyncio.CancelledError:
pass
logger.info("加密货币智能体已停止")
# 停止美股智能体
if _stock_agent_task:
_stock_agent_task.cancel()
try:
await _stock_agent_task
except asyncio.CancelledError:
pass
logger.info("美股智能体已停止")
logger.info("应用关闭")
# 创建FastAPI应用
app = FastAPI(
title="A股AI分析Agent系统",
description="基于AI Agent的股票智能分析系统",
version="1.0.0",
lifespan=lifespan
)
# 配置CORS
settings = get_settings()
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins.split(","),
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(auth.router, tags=["认证"])
app.include_router(admin.router, tags=["后台管理"])
app.include_router(chat.router, prefix="/api/chat", tags=["对话"])
app.include_router(stock.router, prefix="/api/stock", tags=["股票数据"])
app.include_router(skills.router, prefix="/api/skills", tags=["技能管理"])
app.include_router(llm.router, tags=["LLM模型"])
app.include_router(paper_trading.router, tags=["模拟交易"])
app.include_router(real_trading.router, tags=["实盘交易"])
app.include_router(stocks.router, prefix="/api/stocks", tags=["美股分析"])
app.include_router(signals.router, tags=["信号管理"])
app.include_router(system.router, prefix="/api/system", tags=["系统状态"])
# 挂载静态文件
frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "frontend")
if os.path.exists(frontend_path):
app.mount("/static", StaticFiles(directory=frontend_path), name="static")
@app.get("/")
async def root():
"""根路径,返回主应用页面"""
index_path = os.path.join(frontend_path, "index.html")
if os.path.exists(index_path):
return FileResponse(index_path)
return {"message": "页面不存在"}
@app.get("/app")
async def app_page():
"""主应用页面"""
index_path = os.path.join(frontend_path, "index.html")
if os.path.exists(index_path):
return FileResponse(index_path)
return {"message": "页面不存在"}
@app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
@app.get("/paper-trading")
async def paper_trading_page():
"""模拟交易页面"""
page_path = os.path.join(frontend_path, "paper-trading.html")
if os.path.exists(page_path):
return FileResponse(page_path)
return {"message": "页面不存在"}
@app.get("/real-trading")
async def real_trading_page():
"""实盘交易页面"""
page_path = os.path.join(frontend_path, "real-trading.html")
if os.path.exists(page_path):
return FileResponse(page_path)
return {"message": "页面不存在"}
@app.get("/signals")
async def signals_page():
"""信号列表页面"""
page_path = os.path.join(frontend_path, "signals.html")
if os.path.exists(page_path):
return FileResponse(page_path)
return {"message": "页面不存在"}
@app.get("/status")
async def status_page():
"""系统状态监控页面"""
page_path = os.path.join(frontend_path, "status.html")
if os.path.exists(page_path):
return FileResponse(page_path)
return {"message": "页面不存在"}
if __name__ == "__main__":
import uvicorn
# 设置全局异常处理器(防止主线程异常退出)
setup_global_exception_handler()
try:
uvicorn.run(
"app.main:app",
host=settings.api_host,
port=settings.api_port,
reload=settings.debug
)
except Exception as e:
logger.error(f"应用启动失败: {e}")
import traceback
logger.error(traceback.format_exc())
raise