From 754bb02c12f0002cae9c4156e767367469f2b55a Mon Sep 17 00:00:00 2001 From: aaron <> Date: Sun, 22 Feb 2026 11:56:51 +0800 Subject: [PATCH] update --- backend/app/api/system.py | 93 +++++ backend/app/crypto_agent/crypto_agent.py | 26 ++ backend/app/main.py | 57 ++- backend/app/utils/system_status.py | 134 +++++++ frontend/status.html | 438 +++++++++++++++++++++++ 5 files changed, 747 insertions(+), 1 deletion(-) create mode 100644 backend/app/api/system.py create mode 100644 backend/app/utils/system_status.py create mode 100644 frontend/status.html diff --git a/backend/app/api/system.py b/backend/app/api/system.py new file mode 100644 index 0000000..d9774bd --- /dev/null +++ b/backend/app/api/system.py @@ -0,0 +1,93 @@ +""" +系统状态 API +""" +from fastapi import APIRouter, HTTPException +from typing import Dict, Any +from app.utils.logger import logger +from app.utils.system_status import get_system_monitor + +router = APIRouter() + + +@router.get("/status", response_model=Dict[str, Any]) +async def get_system_status(): + """ + 获取系统状态 + + 返回所有 Agent 的运行状态和系统信息 + """ + try: + monitor = get_system_monitor() + summary = monitor.get_summary() + + # 添加额外的系统信息 + response = { + "status": "success", + "data": summary + } + + return response + + except Exception as e: + logger.error(f"获取系统状态失败: {e}") + raise HTTPException(status_code=500, detail=f"获取系统状态失败: {str(e)}") + + +@router.get("/status/summary") +async def get_status_summary(): + """ + 获取系统状态摘要 + + 返回简化的状态信息,用于快速检查 + """ + try: + monitor = get_system_monitor() + summary = monitor.get_summary() + + return { + "status": "success", + "data": { + "total_agents": summary["total_agents"], + "running_agents": summary["running_agents"], + "error_agents": summary["error_agents"], + "uptime_seconds": summary["uptime_seconds"], + "agents": { + agent_id: { + "name": info["name"], + "status": info["status"] + } + for agent_id, info in summary["agents"].items() + } + } + } + + except Exception as e: + logger.error(f"获取状态摘要失败: {e}") + raise HTTPException(status_code=500, detail=f"获取状态摘要失败: {str(e)}") + + +@router.get("/status/agents/{agent_id}") +async def get_agent_status(agent_id: str): + """ + 获取指定 Agent 的详细状态 + + Args: + agent_id: Agent ID (如: crypto_agent, stock_agent) + """ + try: + monitor = get_system_monitor() + agent_info = monitor.get_agent_status(agent_id) + + if agent_info is None: + raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' 不存在") + + return { + "status": "success", + "data": agent_info.to_dict() + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"获取 Agent 状态失败: {e}") + raise HTTPException(status_code=500, detail=f"获取 Agent 状态失败: {str(e)}") diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 41da811..4844fc0 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -14,6 +14,7 @@ from app.services.telegram_service import get_telegram_service from app.services.paper_trading_service import get_paper_trading_service from app.services.signal_database_service import get_signal_db_service from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer +from app.utils.system_status import get_system_monitor, AgentStatus class CryptoAgent: @@ -60,6 +61,19 @@ class CryptoAgent: self.running = False self._event_loop = None + # 注册到系统监控 + monitor = get_system_monitor() + self._monitor_info = monitor.register_agent( + agent_id="crypto_agent", + name="加密货币智能体", + agent_type="crypto" + ) + monitor.update_config("crypto_agent", { + "symbols": self.symbols, + "paper_trading_enabled": self.paper_trading_enabled, + "analysis_interval": "每5分钟整点" + }) + logger.info(f"加密货币智能体初始化完成(LLM 驱动),监控交易对: {self.symbols}") if self.paper_trading_enabled: logger.info(f"模拟交易已启用") @@ -191,6 +205,10 @@ class CryptoAgent: self.running = True self._event_loop = asyncio.get_event_loop() + # 更新状态为启动中 + monitor = get_system_monitor() + monitor.update_status("crypto_agent", AgentStatus.STARTING) + # 启动横幅 logger.info("\n" + "=" * 60) logger.info("🚀 加密货币交易信号智能体(LLM 驱动)") @@ -202,6 +220,9 @@ class CryptoAgent: logger.info(f" 模拟交易: 已启用") logger.info("=" * 60 + "\n") + # 更新状态为运行中 + monitor.update_status("crypto_agent", AgentStatus.RUNNING) + # 注意:不再启动独立的价格监控 # 价格监控由 main.py 中的 price_monitor_loop 统一处理,避免重复检查 if self.paper_trading_enabled: @@ -246,6 +267,11 @@ class CryptoAgent: def stop(self): """停止运行""" self.running = False + + # 更新状态 + monitor = get_system_monitor() + monitor.update_status("crypto_agent", AgentStatus.STOPPED) + logger.info("加密货币智能体已停止") def _check_volatility(self, symbol: str, data: Dict[str, pd.DataFrame]) -> tuple[bool, str, float]: diff --git a/backend/app/main.py b/backend/app/main.py index b53ac79..9ca0b2e 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,8 +9,9 @@ from fastapi.responses import FileResponse from contextlib import asynccontextmanager from app.config import get_settings from app.utils.logger import logger -from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals +from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals, system from app.utils.error_handler import setup_global_exception_handler, init_error_notifier +from app.utils.system_status import get_system_monitor import os @@ -233,6 +234,48 @@ async def price_monitor_loop(): await asyncio.sleep(5) +async def _print_system_status(): + """打印系统状态摘要""" + from app.utils.system_status import get_system_monitor + + monitor = get_system_monitor() + summary = monitor.get_summary() + + logger.info("\n" + "=" * 60) + logger.info("📊 系统状态摘要") + logger.info("=" * 60) + logger.info(f"启动时间: {summary['system_start_time']}") + logger.info(f"运行时长: {int(summary['uptime_seconds'] // 60)} 分钟") + logger.info(f"Agent 总数: {summary['total_agents']}") + logger.info(f"运行中: {summary['running_agents']} | 错误: {summary['error_agents']}") + + if summary['agents']: + logger.info("\n🤖 Agent 详情:") + for agent_id, agent_info in summary['agents'].items(): + status_icon = { + "运行中": "✅", + "启动中": "🔄", + "已停止": "⏸️", + "错误": "❌", + "未初始化": "⚪" + }.get(agent_info['status'], "❓") + + logger.info(f" {status_icon} {agent_info['name']} - {agent_info['status']}") + + # 显示配置 + config = agent_info.get('config', {}) + if config: + if 'symbols' in config: + logger.info(f" 监控: {', '.join(config['symbols'])}") + if 'paper_trading_enabled' in config: + pt_status = "已启用" if config['paper_trading_enabled'] else "未启用" + logger.info(f" 模拟交易: {pt_status}") + if 'analysis_interval' in config: + logger.info(f" 分析间隔: {config['analysis_interval']}") + + logger.info("=" * 60 + "\n") + + async def periodic_report_loop(): """定时报告循环 - 每4小时发送一次模拟交易报告""" from datetime import datetime @@ -343,6 +386,9 @@ async def lifespan(app: FastAPI): logger.error(f"美股智能体启动失败: {e}") logger.error(f"提示: 请确保已安装 yfinance (pip install yfinance)") + # 显示系统状态摘要 + await _print_system_status() + yield # 关闭时执行 @@ -411,6 +457,7 @@ app.include_router(llm.router, tags=["LLM模型"]) app.include_router(paper_trading.router, tags=["模拟交易"]) app.include_router(stocks.router, prefix="/api/stocks", tags=["美股分析"]) app.include_router(signals.router, tags=["信号管理"]) +app.include_router(system.router, prefix="/api/system", tags=["系统状态"]) # 挂载静态文件 frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "frontend") @@ -454,6 +501,14 @@ async def signals_page(): return FileResponse(page_path) return {"message": "页面不存在"} +@app.get("/status") +async def status_page(): + """系统状态监控页面""" + page_path = os.path.join(frontend_path, "status.html") + if os.path.exists(page_path): + return FileResponse(page_path) + return {"message": "页面不存在"} + if __name__ == "__main__": import uvicorn diff --git a/backend/app/utils/system_status.py b/backend/app/utils/system_status.py new file mode 100644 index 0000000..b2a27ba --- /dev/null +++ b/backend/app/utils/system_status.py @@ -0,0 +1,134 @@ +""" +系统状态监控模块 + +跟踪各个 Agent 的启动、运行状态 +""" +from typing import Dict, Any, Optional +from datetime import datetime +from enum import Enum +from threading import Lock + + +class AgentStatus(Enum): + """Agent 状态枚举""" + NOT_INITIALIZED = "未初始化" + STARTING = "启动中" + RUNNING = "运行中" + STOPPED = "已停止" + ERROR = "错误" + + +class AgentInfo: + """Agent 信息类""" + + def __init__(self, name: str, agent_type: str): + self.name = name # Agent 名称 + self.agent_type = agent_type # Agent 类型 (crypto/stock/smart) + self.status = AgentStatus.NOT_INITIALIZED + self.start_time: Optional[datetime] = None + self.last_activity: Optional[datetime] = None + self.error_message: Optional[str] = None + self.config: Dict[str, Any] = {} # 配置信息 + + def to_dict(self) -> Dict[str, Any]: + """转换为字典""" + return { + "name": self.name, + "type": self.agent_type, + "status": self.status.value, + "start_time": self.start_time.strftime("%Y-%m-%d %H:%M:%S") if self.start_time else None, + "last_activity": self.last_activity.strftime("%Y-%m-%d %H:%M:%S") if self.last_activity else None, + "error_message": self.error_message, + "config": self.config + } + + +class SystemStatusMonitor: + """系统状态监控器(单例)""" + + _instance = None + _lock = Lock() + + def __new__(cls): + """单例模式""" + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + """初始化监控器""" + if not hasattr(self, '_initialized'): + self._initialized = True + self._agents: Dict[str, AgentInfo] = {} + self._system_start_time = datetime.now() + + def register_agent(self, agent_id: str, name: str, agent_type: str) -> AgentInfo: + """注册一个 Agent""" + agent_info = AgentInfo(name, agent_type) + self._agents[agent_id] = agent_info + return agent_info + + def update_status(self, agent_id: str, status: AgentStatus, error_message: str = None): + """更新 Agent 状态""" + if agent_id in self._agents: + agent_info = self._agents[agent_id] + agent_info.status = status + agent_info.last_activity = datetime.now() + + if status == AgentStatus.STARTING: + agent_info.start_time = datetime.now() + + if error_message: + agent_info.error_message = error_message + elif status != AgentStatus.ERROR: + agent_info.error_message = None + + def update_config(self, agent_id: str, config: Dict[str, Any]): + """更新 Agent 配置""" + if agent_id in self._agents: + self._agents[agent_id].config = config + + def get_agent_status(self, agent_id: str) -> Optional[AgentInfo]: + """获取指定 Agent 状态""" + return self._agents.get(agent_id) + + def get_all_status(self) -> Dict[str, Dict[str, Any]]: + """获取所有 Agent 状态""" + return { + agent_id: agent_info.to_dict() + for agent_id, agent_info in self._agents.items() + } + + def get_summary(self) -> Dict[str, Any]: + """获取系统状态摘要""" + running_count = sum( + 1 for agent in self._agents.values() + if agent.status == AgentStatus.RUNNING + ) + error_count = sum( + 1 for agent in self._agents.values() + if agent.status == AgentStatus.ERROR + ) + + return { + "system_start_time": self._system_start_time.strftime("%Y-%m-%d %H:%M:%S"), + "uptime_seconds": (datetime.now() - self._system_start_time).total_seconds(), + "total_agents": len(self._agents), + "running_agents": running_count, + "error_agents": error_count, + "agents": self.get_all_status() + } + + +# 全局单例 +_monitor: Optional[SystemStatusMonitor] = None + + +def get_system_monitor() -> SystemStatusMonitor: + """获取系统监控器实例""" + global _monitor + if _monitor is None: + _monitor = SystemStatusMonitor() + return _monitor diff --git a/frontend/status.html b/frontend/status.html new file mode 100644 index 0000000..b61630c --- /dev/null +++ b/frontend/status.html @@ -0,0 +1,438 @@ + + + + + + 系统状态监控 - Stock Agent + + + +
+
+

🚀 系统状态监控

+

实时监控所有 Agent 的运行状态

+
+ + 加载中... +
+
+ +
+ +
+
+
运行时长
+
-
+
+
+
Agent 总数
+
-
+
+
+
运行中
+
-
+
+
+
错误
+
-
+
+
+ +
+

🤖 Agent 详情

+
+
加载中...
+
+
+
+ + + +