"""
System status monitoring module.

Tracks the start-up and running state of each registered agent.
"""

from typing import Dict, Any, Optional
from datetime import datetime
from enum import Enum
from threading import Lock

# Single timestamp layout used for every serialized datetime in this module.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def _format_time(moment: Optional[datetime]) -> Optional[str]:
    """Return *moment* rendered with ``_TIME_FORMAT``, or ``None`` if unset."""
    return moment.strftime(_TIME_FORMAT) if moment else None


class AgentStatus(Enum):
    """Lifecycle states an agent can be in.

    The values are the display strings emitted by ``AgentInfo.to_dict``.
    """

    NOT_INITIALIZED = "未初始化"
    STARTING = "启动中"
    RUNNING = "运行中"
    STOPPED = "已停止"
    ERROR = "错误"


class AgentInfo:
    """Mutable status record for a single agent."""

    def __init__(self, name: str, agent_type: str):
        """Create a record in the ``NOT_INITIALIZED`` state.

        Args:
            name: Agent name.
            agent_type: Agent type (crypto/stock/smart).
        """
        self.name = name  # agent name
        self.agent_type = agent_type  # agent type (crypto/stock/smart)
        self.status = AgentStatus.NOT_INITIALIZED
        self.start_time: Optional[datetime] = None
        self.last_activity: Optional[datetime] = None
        self.error_message: Optional[str] = None
        self.config: Dict[str, Any] = {}  # configuration data

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(name={self.name!r}, "
            f"agent_type={self.agent_type!r}, status={self.status!r})"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dictionary.

        The status is emitted as its enum value and both timestamps as
        ``YYYY-MM-DD HH:MM:SS`` strings (``None`` while unset).
        """
        return {
            "name": self.name,
            "type": self.agent_type,
            "status": self.status.value,
            "start_time": _format_time(self.start_time),
            "last_activity": _format_time(self.last_activity),
            "error_message": self.error_message,
            "config": self.config,
        }


class SystemStatusMonitor:
    """Registry of agent status records (singleton)."""

    _instance = None
    _lock = Lock()

    def __new__(cls):
        """Return the shared instance, creating it on first use.

        Uses double-checked locking: the unlocked test keeps the common path
        cheap, the locked re-test guarantees only one instance is created.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialise the registry once.

        Python runs ``__init__`` on every ``SystemStatusMonitor()`` call, even
        when ``__new__`` hands back the existing instance, so later calls must
        be no-ops.
        """
        if getattr(self, "_initialized", False):
            return
        with self._lock:
            if getattr(self, "_initialized", False):
                return
            self._agents: Dict[str, AgentInfo] = {}
            self._system_start_time = datetime.now()
            # Set the flag last: once another thread observes it, the
            # attributes above are guaranteed to exist.
            self._initialized = True

    def register_agent(self, agent_id: str, name: str, agent_type: str) -> AgentInfo:
        """Register an agent and return its new status record.

        An existing record stored under the same ``agent_id`` is replaced.
        """
        agent_info = AgentInfo(name, agent_type)
        self._agents[agent_id] = agent_info
        return agent_info

    def update_status(
        self,
        agent_id: str,
        status: AgentStatus,
        error_message: Optional[str] = None,
    ):
        """Update the status of a registered agent.

        Unknown ``agent_id`` values are ignored.

        Args:
            agent_id: Key the agent was registered under.
            status: New status. ``STARTING`` also resets ``start_time``.
            error_message: Stored when non-empty. When omitted, the previous
                message is cleared unless the new status is ``ERROR``.
        """
        agent_info = self._agents.get(agent_id)
        if agent_info is None:
            return

        # One timestamp per transition so start_time and last_activity agree.
        now = datetime.now()
        agent_info.status = status
        agent_info.last_activity = now
        if status == AgentStatus.STARTING:
            agent_info.start_time = now

        if error_message:
            agent_info.error_message = error_message
        elif status != AgentStatus.ERROR:
            agent_info.error_message = None

    def update_config(self, agent_id: str, config: Dict[str, Any]):
        """Replace the stored configuration of a registered agent.

        Unknown ``agent_id`` values are ignored.
        """
        agent_info = self._agents.get(agent_id)
        if agent_info is not None:
            agent_info.config = config

    def get_agent_status(self, agent_id: str) -> Optional[AgentInfo]:
        """Return the record for ``agent_id``, or ``None`` if not registered."""
        return self._agents.get(agent_id)

    def get_all_status(self) -> Dict[str, Dict[str, Any]]:
        """Return ``{agent_id: AgentInfo.to_dict()}`` for every agent."""
        # Iterate over a copied list: a dict resized during iteration raises
        # RuntimeError, which could happen if an agent is registered mid-loop.
        return {
            agent_id: agent_info.to_dict()
            for agent_id, agent_info in list(self._agents.items())
        }

    def get_summary(self) -> Dict[str, Any]:
        """Return a system-wide summary.

        Contains the monitor start time, uptime in seconds, the total, running
        and error agent counts, and the per-agent dictionaries.
        """
        # Take one snapshot so the counts and the "agents" payload describe
        # the same set of records.
        agents = list(self._agents.items())
        statuses = [agent_info.status for _, agent_info in agents]
        return {
            "system_start_time": self._system_start_time.strftime(_TIME_FORMAT),
            "uptime_seconds": (datetime.now() - self._system_start_time).total_seconds(),
            "total_agents": len(agents),
            "running_agents": statuses.count(AgentStatus.RUNNING),
            "error_agents": statuses.count(AgentStatus.ERROR),
            "agents": {
                agent_id: agent_info.to_dict() for agent_id, agent_info in agents
            },
        }


# Module-level handle to the singleton
_monitor: Optional[SystemStatusMonitor] = None


def get_system_monitor() -> SystemStatusMonitor:
    """Return the process-wide ``SystemStatusMonitor`` instance."""
    global _monitor
    if _monitor is None:
        _monitor = SystemStatusMonitor()
    return _monitor