stock-ai-agent/backend/app/api/system.py
2026-04-22 11:03:24 +08:00

481 lines
19 KiB
Python

"""
系统状态 API
"""
import math
from datetime import datetime, timedelta
from typing import Dict, Any

from fastapi import APIRouter, HTTPException

from app.utils.logger import logger
from app.utils.system_status import get_system_monitor
from app.crypto_agent.crypto_agent import get_crypto_agent
from app.services.signal_database_service import get_signal_db_service
from app.services.paper_trading_service import get_paper_trading_service
from app.services.bitget_live_trading_service import get_bitget_live_service
from app.services.hyperliquid_trading_service import get_hyperliquid_service
# FastAPI router that the endpoints in this module register on via @router.get.
router = APIRouter()
def _parse_signal_timestamp(value: Any) -> datetime | None:
if value is None:
return None
if isinstance(value, datetime):
return value.replace(tzinfo=None) if value.tzinfo else value
text = str(value).replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(text)
return parsed.replace(tzinfo=None) if parsed.tzinfo else parsed
except ValueError:
return None
def _safe_float(value: Any, default: float = 0.0) -> float:
    """Convert ``value`` to a finite ``float``, falling back to ``default``.

    Args:
        value: Anything ``float()`` may accept (numbers, numeric strings).
        default: Returned when ``value`` is ``None``, an empty string, not
            convertible, too large for a float, or converts to NaN/±infinity.

    Returns:
        The converted finite float, or ``default``.
    """
    try:
        if value is None or value == "":
            return default
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: integers too large to be represented as a float.
        return default
    # NaN / infinity are not valid JSON numbers and would poison any
    # arithmetic done with the result, so treat them like missing data.
    return number if math.isfinite(number) else default
def _normalize_platform_position(platform: str, position: Dict[str, Any]) -> Dict[str, Any]:
    """Map a platform-specific position dict onto the console's common shape.

    Alternative key spellings are accepted as fallbacks (``coin`` for
    ``symbol``, ``filled_price`` for ``entry_price``, ``quantity`` for
    ``size``, and so on). Numeric fields go through ``_safe_float``.

    Args:
        platform: Label stored verbatim under ``"platform"``.
        position: Raw position mapping from a trading service.

    Returns:
        A dict with the keys ``platform``, ``symbol``, ``side``, ``size``,
        ``entry_price``, ``mark_price``, ``leverage``, ``margin``,
        ``unrealized_pnl``, ``pnl_percent``, ``take_profit``, ``stop_loss``,
        ``liquidation_price`` and ``opened_at``.
    """
    signed_size = _safe_float(position.get("size") or position.get("quantity"))

    side_raw = str(position.get("side") or "").strip().lower()
    # Same long tokens as the order normaliser ("b" included). When no side is
    # reported at all, fall back to the sign of the size instead of labelling
    # every such position "short".
    # NOTE(review): assumes services that omit "side" report a signed size.
    if side_raw in {"buy", "long", "b"} or (not side_raw and signed_size > 0):
        side = "long"
    else:
        side = "short"

    symbol = position.get("symbol") or position.get("coin") or "-"
    if isinstance(symbol, str) and symbol.endswith("USDTUSDT"):
        # Collapse a doubled quote suffix, e.g. "BTCUSDTUSDT" -> "BTCUSDT".
        symbol = symbol.replace("USDTUSDT", "USDT")

    entry_price = _safe_float(position.get("entry_price") or position.get("filled_price"))
    mark_price = _safe_float(position.get("mark_price") or position.get("current_price"))
    if mark_price <= 0:
        # No usable mark price: show the entry price instead.
        mark_price = entry_price

    return {
        "platform": platform,
        "symbol": symbol,
        "side": side,
        "size": abs(signed_size),
        "entry_price": entry_price,
        "mark_price": mark_price,
        "leverage": _safe_float(position.get("leverage")),
        "margin": _safe_float(
            position.get("margin")
            or position.get("initialMargin")
            or position.get("initial_margin")
        ),
        "unrealized_pnl": _safe_float(
            position.get("unrealized_pnl") or position.get("pnl_amount")
        ),
        "pnl_percent": _safe_float(
            position.get("unrealized_pnl_pct")
            or position.get("pnl_percent")
            or position.get("percentage")
        ),
        "take_profit": position.get("take_profit"),
        "stop_loss": position.get("stop_loss"),
        "liquidation_price": position.get("liquidation_price"),
        "opened_at": position.get("opened_at") or position.get("created_at"),
    }
def _normalize_platform_order(platform: str, order: Dict[str, Any]) -> Dict[str, Any]:
    """Map a platform-specific open-order dict onto the console's common shape.

    Args:
        platform: Label stored verbatim under ``"platform"``.
        order: Raw order mapping from a trading service.

    Returns:
        A dict describing the order: side, category (``"entry"`` or
        ``"tp_sl"``), price, size, leverage, margin, type, status, protection
        levels, signal metadata and creation time.
    """

    def _lowered(raw: Any) -> str:
        # Missing / falsy values become an empty string.
        return str(raw or "").lower()

    direction = "long" if _lowered(order.get("side")) in {"buy", "long", "b"} else "short"

    ticker = order.get("symbol") or order.get("coin") or "-"
    if isinstance(ticker, str) and ticker.endswith("USDTUSDT"):
        ticker = ticker.replace("USDTUSDT", "USDT")

    kind = _lowered(order.get("order_type") or order.get("entry_type") or order.get("type"))
    state = _lowered(order.get("status"))

    # Pending paper orders are always entries; otherwise reduce-only orders
    # are treated as take-profit / stop-loss orders.
    if platform == "paper" and state == "pending":
        bucket = "entry"
    elif bool(order.get("is_reduce_only")):
        bucket = "tp_sl"
    else:
        bucket = "entry"

    normalized: Dict[str, Any] = {
        "platform": platform,
        "symbol": ticker,
        "side": direction,
        "category": bucket,
        "price": _safe_float(order.get("price") or order.get("entry_price")),
        "size": abs(_safe_float(order.get("size") or order.get("quantity"))),
        "leverage": _safe_float(order.get("leverage")),
        "margin": _safe_float(order.get("margin")),
        "order_type": kind,
        "status": state,
    }
    for passthrough in ("stop_loss", "take_profit", "signal_grade", "signal_type"):
        normalized[passthrough] = order.get(passthrough)
    normalized["confidence"] = _safe_float(order.get("confidence"))
    normalized["created_at"] = order.get("created_at") or order.get("timestamp")
    return normalized
def _build_attention_items(
    platform_halts: Dict[str, Any],
    platforms: Dict[str, Any],
    unified_positions: list[Dict[str, Any]],
    unified_orders: list[Dict[str, Any]],
    execution_events: list[Dict[str, Any]],
) -> list[Dict[str, Any]]:
    """Collect the alerts shown in the console's attention list.

    Alerts are emitted in this order: halted platforms, per-platform risk
    warnings (drawdown at 70% of the breaker threshold or more, leverage at
    80% of the cap or more), positions missing a stop-loss or take-profit, a
    single notice when entry orders are pending, and error/warning entries
    among the first eight execution events.

    Returns:
        At most twelve dicts with ``severity``, ``title``, ``detail`` and
        ``timestamp`` keys.
    """

    def _entry(severity: Any, title: str, detail: Any, timestamp: Any = None) -> Dict[str, Any]:
        return {
            "severity": severity,
            "title": title,
            "detail": detail,
            "timestamp": timestamp,
        }

    alerts: list[Dict[str, Any]] = []

    # Halted platforms.
    alerts.extend(
        _entry(
            "danger",
            f"{name} 已停机",
            state.get("reason") or "平台已触发停机/熔断",
            state.get("halted_at"),
        )
        for name, state in (platform_halts or {}).items()
        if state and state.get("halted")
    )

    # Risk thresholds of every platform that is not explicitly disabled.
    for name, info in (platforms or {}).items():
        if info.get("enabled") is False:
            continue
        risk = info.get("risk") or {}
        leverage_now = _safe_float(risk.get("current_leverage"))
        leverage_cap = _safe_float(risk.get("max_leverage"))
        drawdown = _safe_float(risk.get("drawdown_percent") or risk.get("drawdown"))
        breaker = _safe_float(risk.get("circuit_breaker_threshold"), 25.0)

        if breaker > 0 and drawdown >= breaker * 0.7:
            alerts.append(_entry(
                "warning",
                f"{name} 回撤接近熔断",
                f"当前回撤 {drawdown:.1f}% / 阈值 {breaker:.1f}%",
            ))
        if leverage_cap > 0 and leverage_now >= leverage_cap * 0.8:
            alerts.append(_entry(
                "warning",
                f"{name} 杠杆占用偏高",
                f"当前总杠杆 {leverage_now:.2f}x / 上限 {leverage_cap:.2f}x",
            ))

    # Positions that lack either protective level.
    for held in unified_positions:
        if held.get("stop_loss") and held.get("take_profit"):
            continue
        alerts.append(_entry(
            "warning",
            f"{held['platform']} {held['symbol']} 风控不完整",
            "持仓缺少止盈或止损,请确认执行层保护单状态",
            held.get("opened_at"),
        ))

    # One summary line for all pending entry orders.
    waiting = [o for o in unified_orders if o.get("category") == "entry"]
    if waiting:
        alerts.append(_entry(
            "info",
            "存在待成交入场单",
            f"当前共有 {len(waiting)} 笔待成交入场单,建议关注价格接近度和资金占用。",
        ))

    # Problematic entries among the first eight execution events.
    for evt in (execution_events or [])[:8]:
        level = evt.get("status")
        if level not in {"error", "warning"}:
            continue
        alerts.append(_entry(
            level,
            f"{evt.get('platform', '-')}: {evt.get('event_type', '-')}",
            evt.get("reason") or "最近执行事件需要关注",
            evt.get("timestamp"),
        ))

    return alerts[:12]
@router.get("/status", response_model=Dict[str, Any])
async def get_system_status():
    """
    Get the system status.

    Returns the running state of every agent together with system information.
    """
    try:
        # The monitor summary is returned as-is under "data".
        return {
            "status": "success",
            "data": get_system_monitor().get_summary(),
        }
    except Exception as e:
        logger.error(f"获取系统状态失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取系统状态失败: {str(e)}")
@router.get("/status/summary")
async def get_status_summary():
    """
    Get a condensed system status.

    Returns simplified status information intended for quick checks: the agent
    counters, the uptime, and each agent's name and status.
    """
    try:
        overview = get_system_monitor().get_summary()

        # Copy the scalar counters first, then reduce every agent record to
        # its name and status.
        condensed = {
            field: overview[field]
            for field in ("total_agents", "running_agents", "error_agents", "uptime_seconds")
        }
        agent_briefs = {}
        for agent_id, info in overview["agents"].items():
            agent_briefs[agent_id] = {"name": info["name"], "status": info["status"]}
        condensed["agents"] = agent_briefs

        return {"status": "success", "data": condensed}
    except Exception as e:
        logger.error(f"获取状态摘要失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取状态摘要失败: {str(e)}")
@router.get("/status/agents/{agent_id}")
async def get_agent_status(agent_id: str):
    """
    Get the detailed status of one agent.

    Args:
        agent_id: Agent ID (e.g. crypto_agent, stock_agent)

    Raises:
        HTTPException: 404 when the monitor has no such agent, 500 on any
            other failure.
    """
    try:
        agent_info = get_system_monitor().get_agent_status(agent_id)
        if agent_info is not None:
            return {"status": "success", "data": agent_info.to_dict()}
        raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' 不存在")
    except HTTPException:
        # Let the 404 above through instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"获取 Agent 状态失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取 Agent 状态失败: {str(e)}")
def _summarize_live_platform(
    service: Any,
) -> tuple[Dict[str, Any], list[Dict[str, Any]], list[Dict[str, Any]]]:
    """Build the console summary for one live-trading service.

    Shared by the Bitget and Hyperliquid sections, which report the same
    structure.

    Args:
        service: A live trading service exposing ``get_account_state()``,
            ``get_open_positions()``, ``get_open_orders()`` and the attributes
            ``initial_balance``, ``max_total_leverage`` and
            ``circuit_breaker_drawdown``; or ``None`` when the platform is not
            available.

    Returns:
        ``(summary, positions, orders)``. For ``service is None`` the summary
        is ``{"enabled": False}`` and both lists are empty.
    """
    if service is None:
        return {"enabled": False}, [], []

    account = service.get_account_state() or {}
    positions = service.get_open_positions() or []
    orders = service.get_open_orders() or []

    # Coerce numerics defensively: a missing or None field must not take the
    # whole console down.
    raw_account_value = account.get("account_value")
    account_value = _safe_float(raw_account_value)
    total_value = sum(
        abs(_safe_float(p.get("size"))) * _safe_float(p.get("entry_price"))
        for p in positions
    )

    initial_balance = _safe_float(service.initial_balance)
    drawdown = 0.0
    # Only derive a drawdown when the account value was actually reported;
    # otherwise a missing value would read as a 100% loss.
    if raw_account_value is not None and initial_balance > 0:
        drawdown = (initial_balance - account_value) / initial_balance * 100

    entry_order_count = sum(1 for o in orders if not o.get("is_reduce_only"))

    summary = {
        "enabled": True,
        "account": {
            "account_value": account.get("account_value", 0),
            "available_balance": account.get("available_balance", 0),
            "total_margin_used": account.get("total_margin_used", 0),
            "initial_balance": service.initial_balance,
        },
        "positions": {
            "count": len(positions),
            "total_value": total_value,
            "items": positions[:8],
        },
        "orders": {
            "count": len(orders),
            "entry_orders": entry_order_count,
            "tp_sl_orders": len(orders) - entry_order_count,
            "items": orders[:8],
        },
        "risk": {
            "current_leverage": total_value / account_value if account_value > 0 else 0,
            "max_leverage": service.max_total_leverage,
            "drawdown_percent": drawdown,
            "circuit_breaker_threshold": service.circuit_breaker_drawdown * 100,
        },
    }
    return summary, positions, orders


@router.get("/console", response_model=Dict[str, Any])
async def get_console_snapshot():
    """
    Get the master-console snapshot.

    Aggregates system runtime state, signal statistics, and the status of the
    paper-trading and live-trading platforms for the console page.

    Raises:
        HTTPException: 500 when any of the underlying services fails.
    """
    try:
        monitor = get_system_monitor()
        summary = monitor.get_summary()
        now = datetime.now()

        signal_db = get_signal_db_service()
        signal_stats = signal_db.get_signal_stats(days=7)
        latest_signals = signal_db.get_latest_signals(limit=12, days=3)

        crypto_agent = get_crypto_agent()
        crypto_status = crypto_agent.get_status()

        paper_service = get_paper_trading_service()
        paper_account = paper_service.get_account_status()
        paper_orders = paper_service.get_active_orders()
        paper_positions = [o for o in paper_orders if o.get("status") == "open"]
        paper_pending = [o for o in paper_orders if o.get("status") == "pending"]
        paper_stats = paper_service.calculate_statistics()

        bitget_summary, bg_positions, bg_orders = _summarize_live_platform(
            get_bitget_live_service()
        )
        hyperliquid_summary, hl_positions, hl_orders = _summarize_live_platform(
            get_hyperliquid_service()
        )

        # Signals created within the last 30 minutes; unparseable timestamps
        # sort as datetime.min and are therefore never counted.
        recent_cutoff = now - timedelta(minutes=30)
        recent_signal_count = sum(
            1
            for signal in latest_signals
            if (_parse_signal_timestamp(signal.get("created_at")) or datetime.min) >= recent_cutoff
        )

        # Normalise up to 12 positions / orders per platform into one shape.
        position_sources = (
            ("paper", paper_service.get_open_positions()),
            ("bitget", bg_positions),
            ("hyperliquid", hl_positions),
        )
        order_sources = (
            ("paper", paper_pending),
            ("bitget", bg_orders),
            ("hyperliquid", hl_orders),
        )
        unified_positions = sorted(
            (
                _normalize_platform_position(platform, pos)
                for platform, rows in position_sources
                for pos in rows[:12]
            ),
            key=lambda item: _parse_signal_timestamp(item.get("opened_at")) or datetime.min,
            reverse=True,
        )
        unified_orders = sorted(
            (
                _normalize_platform_order(platform, order)
                for platform, rows in order_sources
                for order in rows[:12]
            ),
            key=lambda item: _parse_signal_timestamp(item.get("created_at")) or datetime.min,
            reverse=True,
        )

        platforms_payload = {
            "paper": {
                "enabled": True,
                "account": paper_account,
                "positions": {
                    "count": len(paper_positions),
                    "items": paper_positions[:8],
                },
                "orders": {
                    "count": len(paper_orders),
                    "pending_count": len(paper_pending),
                    "items": paper_pending[:8],
                },
                "statistics": {
                    "win_rate": paper_stats.get("win_rate", 0),
                    "total_trades": paper_stats.get("total_trades", 0),
                    "total_pnl": paper_stats.get("total_pnl", 0),
                    "max_drawdown": paper_stats.get("max_drawdown", 0),
                    "by_grade": paper_stats.get("by_grade", {}),
                },
                "risk": {
                    "current_leverage": paper_account.get("current_total_leverage", 0),
                    "max_leverage": paper_account.get("max_total_leverage", 0),
                    "drawdown_percent": paper_account.get("max_drawdown", 0),
                    "circuit_breaker_threshold": 25,
                },
            },
            "bitget": bitget_summary,
            "hyperliquid": hyperliquid_summary,
        }

        execution_events = crypto_agent.get_recent_execution_events(limit=40)
        attention_items = _build_attention_items(
            crypto_status.get("platform_halts", {}),
            platforms_payload,
            unified_positions,
            unified_orders,
            execution_events,
        )

        return {
            "status": "success",
            "data": {
                "generated_at": now.isoformat(),
                "system": summary,
                "crypto_agent": crypto_status,
                "execution_events": execution_events,
                "signals": {
                    "stats_7d": signal_stats,
                    "latest": latest_signals,
                    "recent_30m_count": recent_signal_count,
                },
                "platforms": platforms_payload,
                "management": {
                    "positions": unified_positions[:18],
                    "orders": unified_orders[:24],
                    "attention_items": attention_items,
                },
            },
        }
    except Exception as e:
        logger.error(f"获取总控台快照失败: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=f"获取总控台快照失败: {str(e)}") from e