stock-ai-agent/backend/app/api/system.py
2026-04-23 10:20:54 +08:00

456 lines
17 KiB
Python

"""
系统状态 API
"""
import math
from datetime import datetime, timedelta
from typing import Dict, Any

import numpy as np
from fastapi import APIRouter, HTTPException

from app.utils.logger import logger
from app.utils.system_status import get_system_monitor
from app.crypto_agent.crypto_agent import get_crypto_agent
from app.services.signal_database_service import get_signal_db_service
from app.services.paper_trading_service import get_paper_trading_service
from app.services.bitget_live_trading_service import get_bitget_live_service
# Router on which this module's system-status endpoints are registered.
router = APIRouter()
def _sanitize_for_response(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _sanitize_for_response(item) for key, item in value.items()}
if isinstance(value, list):
return [_sanitize_for_response(item) for item in value]
if isinstance(value, tuple):
return [_sanitize_for_response(item) for item in value]
if isinstance(value, set):
return [_sanitize_for_response(item) for item in value]
if isinstance(value, np.bool_):
return bool(value)
if isinstance(value, np.integer):
return int(value)
if isinstance(value, np.floating):
return float(value)
if isinstance(value, np.ndarray):
return [_sanitize_for_response(item) for item in value.tolist()]
if isinstance(value, np.generic):
return value.item()
return value
def _parse_signal_timestamp(value: Any) -> datetime | None:
if value is None:
return None
if isinstance(value, datetime):
return value.replace(tzinfo=None) if value.tzinfo else value
text = str(value).replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(text)
return parsed.replace(tzinfo=None) if parsed.tzinfo else parsed
except ValueError:
return None
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
if value is None or value == "":
return default
return float(value)
except (TypeError, ValueError):
return default
def _normalize_platform_position(platform: str, position: Dict[str, Any]) -> Dict[str, Any]:
    """Map a platform-specific position record onto the console's common shape.

    Several fields are looked up under alternative keys; the first truthy
    value wins. The side is reported as ``"long"`` when the raw side is
    ``buy``/``long`` (case-insensitive) and ``"short"`` otherwise. A symbol
    ending in ``USDTUSDT`` has that text collapsed to ``USDT``. A mark price
    that is not positive falls back to the entry price.

    Args:
        platform: Label stored under the ``"platform"`` key of the result.
        position: Raw position record.

    Returns:
        A new dict with the unified position fields.
    """

    def pick(*keys: str) -> Any:
        # Same result as chaining ``position.get(k1) or position.get(k2) ...``:
        # the first truthy value, otherwise whatever the last key yields.
        found = None
        for key in keys:
            found = position.get(key)
            if found:
                break
        return found

    is_long = str(pick("side") or "").lower() in {"buy", "long"}

    symbol = pick("symbol", "coin") or "-"
    if isinstance(symbol, str) and symbol.endswith("USDTUSDT"):
        symbol = symbol.replace("USDTUSDT", "USDT")

    entry_price = _safe_float(pick("entry_price", "filled_price"))
    quoted_mark = _safe_float(pick("mark_price", "current_price"))

    return {
        "platform": platform,
        "symbol": symbol,
        "side": "long" if is_long else "short",
        "size": abs(_safe_float(pick("size", "quantity"))),
        "entry_price": entry_price,
        "mark_price": entry_price if quoted_mark <= 0 else quoted_mark,
        "leverage": _safe_float(position.get("leverage")),
        "margin": _safe_float(pick("margin", "initialMargin", "initial_margin")),
        "unrealized_pnl": _safe_float(pick("unrealized_pnl", "pnl_amount")),
        "pnl_percent": _safe_float(pick("unrealized_pnl_pct", "pnl_percent", "percentage")),
        "take_profit": position.get("take_profit"),
        "stop_loss": position.get("stop_loss"),
        "liquidation_price": position.get("liquidation_price"),
        "opened_at": pick("opened_at", "created_at"),
    }
def _normalize_platform_order(platform: str, order: Dict[str, Any]) -> Dict[str, Any]:
    """Map a platform-specific order record onto the console's common shape.

    The side is ``"long"`` when the raw side is ``buy``/``long``/``b``
    (case-insensitive) and ``"short"`` otherwise. The category is
    ``"tp_sl"`` for reduce-only orders and ``"entry"`` for everything else;
    a pending order on the ``paper`` platform is always ``"entry"``.

    Args:
        platform: Label stored under the ``"platform"`` key of the result.
        order: Raw order record.

    Returns:
        A new dict with the unified order fields.
    """

    def pick(*keys: str) -> Any:
        # Same result as chaining ``order.get(k1) or order.get(k2) ...``.
        found = None
        for key in keys:
            found = order.get(key)
            if found:
                break
        return found

    is_long = str(pick("side") or "").lower() in {"buy", "long", "b"}

    symbol = pick("symbol", "coin") or "-"
    if isinstance(symbol, str) and symbol.endswith("USDTUSDT"):
        symbol = symbol.replace("USDTUSDT", "USDT")

    price = _safe_float(pick("price", "entry_price"))
    size = abs(_safe_float(pick("size", "quantity")))
    order_type = str(pick("order_type", "entry_type", "type") or "").lower()
    status = str(pick("status") or "").lower()

    reduce_only = bool(order.get("is_reduce_only"))
    paper_pending = platform == "paper" and status == "pending"
    category = "entry" if (paper_pending or not reduce_only) else "tp_sl"

    return {
        "platform": platform,
        "symbol": symbol,
        "side": "long" if is_long else "short",
        "category": category,
        "price": price,
        "size": size,
        "leverage": _safe_float(order.get("leverage")),
        "margin": _safe_float(order.get("margin")),
        "order_type": order_type,
        "status": status,
        "stop_loss": order.get("stop_loss"),
        "take_profit": order.get("take_profit"),
        "signal_grade": order.get("signal_grade"),
        "signal_type": order.get("signal_type"),
        "confidence": _safe_float(order.get("confidence")),
        "created_at": pick("created_at", "timestamp"),
    }
def _build_attention_items(
    platform_halts: Dict[str, Any],
    platforms: Dict[str, Any],
    unified_positions: list[Dict[str, Any]],
    unified_orders: list[Dict[str, Any]],
    execution_events: list[Dict[str, Any]],
) -> list[Dict[str, Any]]:
    """Assemble the list of items the console should draw attention to.

    Items are produced in this order and the result is capped at 12 entries:

    1. halted platforms (``danger``)
    2. per enabled platform: drawdown at or above 70% of the circuit-breaker
       threshold, and leverage at or above 80% of the maximum (``warning``)
    3. positions lacking a stop-loss or a take-profit (``warning``)
    4. one summary entry when any order has category ``entry`` (``info``)
    5. among the first 8 execution events, those whose status is ``error``
       or ``warning`` (severity is the event status itself)

    Args:
        platform_halts: Mapping of platform name to halt info; may be empty/None.
        platforms: Mapping of platform name to its payload; may be empty/None.
        unified_positions: Normalized position dicts.
        unified_orders: Normalized order dicts.
        execution_events: Recent execution events; may be empty/None.

    Returns:
        Up to 12 dicts with ``severity``, ``title``, ``detail``, ``timestamp``.
    """
    alerts: list[Dict[str, Any]] = []

    def add(severity: Any, title: str, detail: Any, timestamp: Any = None) -> None:
        alerts.append({
            "severity": severity,
            "title": title,
            "detail": detail,
            "timestamp": timestamp,
        })

    # 1. Platforms that are currently halted.
    for platform, halt in (platform_halts or {}).items():
        if not halt or not halt.get("halted"):
            continue
        add(
            "danger",
            f"{platform} 已停机",
            halt.get("reason") or "平台已触发停机/熔断",
            halt.get("halted_at"),
        )

    # 2. Risk thresholds of every platform that is not explicitly disabled.
    for platform_name, payload in (platforms or {}).items():
        if payload.get("enabled") is False:
            continue
        risk = payload.get("risk") or {}
        leverage_now = _safe_float(risk.get("current_leverage"))
        leverage_cap = _safe_float(risk.get("max_leverage"))
        drawdown = _safe_float(risk.get("drawdown_percent") or risk.get("drawdown"))
        breaker = _safe_float(risk.get("circuit_breaker_threshold"), 25.0)

        if breaker > 0 and drawdown >= breaker * 0.7:
            add(
                "warning",
                f"{platform_name} 回撤接近熔断",
                f"当前回撤 {drawdown:.1f}% / 阈值 {breaker:.1f}%",
            )
        if leverage_cap > 0 and leverage_now >= leverage_cap * 0.8:
            add(
                "warning",
                f"{platform_name} 杠杆占用偏高",
                f"当前总杠杆 {leverage_now:.2f}x / 上限 {leverage_cap:.2f}x",
            )

    # 3. Positions without both protective levels.
    for pos in unified_positions:
        if not (pos.get("stop_loss") and pos.get("take_profit")):
            add(
                "warning",
                f"{pos['platform']} {pos['symbol']} 风控不完整",
                "持仓缺少止盈或止损,请确认执行层保护单状态",
                pos.get("opened_at"),
            )

    # 4. One summary line for all entry orders still waiting to fill.
    waiting_entries = len([o for o in unified_orders if o.get("category") == "entry"])
    if waiting_entries > 0:
        add(
            "info",
            "存在待成交入场单",
            f"当前共有 {waiting_entries} 笔待成交入场单,建议关注价格接近度和资金占用。",
        )

    # 5. Problematic entries among the first eight execution events.
    for event in (execution_events or [])[:8]:
        if event.get("status") in {"error", "warning"}:
            add(
                event.get("status"),
                f"{event.get('platform', '-')}: {event.get('event_type', '-')}",
                event.get("reason") or "最近执行事件需要关注",
                event.get("timestamp"),
            )

    return alerts[:12]
@router.get("/status", response_model=Dict[str, Any])
async def get_system_status():
    """Return the system status.

    Wraps the system monitor's summary in a ``{"status": "success", "data": ...}``
    envelope.

    Raises:
        HTTPException: 500 when the summary cannot be produced.
    """
    try:
        return {
            "status": "success",
            "data": get_system_monitor().get_summary(),
        }
    except Exception as e:
        message = f"获取系统状态失败: {e}"
        logger.error(message)
        raise HTTPException(status_code=500, detail=message)
@router.get("/status/summary")
async def get_status_summary():
    """Return a condensed system status for quick checks.

    The payload carries the agent counters and uptime from the monitor
    summary, plus only the ``name`` and ``status`` of each agent.

    Raises:
        HTTPException: 500 when the summary cannot be produced or lacks an
            expected key.
    """
    try:
        summary = get_system_monitor().get_summary()

        # Copy the scalar fields first, in this order, then add the agents.
        condensed = {
            field: summary[field]
            for field in ("total_agents", "running_agents", "error_agents", "uptime_seconds")
        }
        condensed["agents"] = {
            agent_id: {"name": info["name"], "status": info["status"]}
            for agent_id, info in summary["agents"].items()
        }
        return {"status": "success", "data": condensed}
    except Exception as e:
        message = f"获取状态摘要失败: {e}"
        logger.error(message)
        raise HTTPException(status_code=500, detail=message)
@router.get("/status/agents/{agent_id}")
async def get_agent_status(agent_id: str):
    """Return the detailed status of a single agent.

    Args:
        agent_id: Agent identifier (e.g. ``crypto_agent``, ``stock_agent``).

    Raises:
        HTTPException: 404 when the monitor has no such agent, 500 on any
            other failure.
    """
    try:
        agent_info = get_system_monitor().get_agent_status(agent_id)
        if agent_info is not None:
            return {"status": "success", "data": agent_info.to_dict()}
        raise HTTPException(status_code=404, detail=f"Agent '{agent_id}' 不存在")
    except HTTPException:
        # Let the 404 above propagate instead of being rewrapped as a 500.
        raise
    except Exception as e:
        message = f"获取 Agent 状态失败: {e}"
        logger.error(message)
        raise HTTPException(status_code=500, detail=message)
def _summarize_bitget(
    bitget_service: Any,
) -> tuple[Dict[str, Any], list[Dict[str, Any]], list[Dict[str, Any]]]:
    """Collect the Bitget section of the console snapshot.

    Args:
        bitget_service: The live-trading service, or ``None`` when it is not
            available.

    Returns:
        ``(summary, positions, orders)``. Without a service the summary is
        ``{"enabled": False}`` and both lists are empty.
    """
    if bitget_service is None:
        return {"enabled": False}, [], []

    # Tolerate a service that reports "nothing" as None.
    account = bitget_service.get_account_state() or {}
    positions = bitget_service.get_open_positions() or []
    orders = bitget_service.get_open_orders() or []

    # Go through _safe_float so missing keys or numeric strings do not abort
    # the whole snapshot.
    total_position_value = sum(
        abs(_safe_float(p.get("size"))) * _safe_float(p.get("entry_price"))
        for p in positions
    )

    raw_account_value = account.get("account_value")
    account_value = _safe_float(raw_account_value)
    initial_balance = bitget_service.initial_balance

    drawdown = 0.0
    # Only report a drawdown when the account value is actually known;
    # otherwise a missing value would look like a 100% loss.
    if raw_account_value is not None and initial_balance and initial_balance > 0:
        drawdown = (initial_balance - account_value) / initial_balance * 100

    summary = {
        "enabled": True,
        "account": {
            "account_value": account.get("account_value", 0),
            "available_balance": account.get("available_balance", 0),
            "total_margin_used": account.get("total_margin_used", 0),
            "initial_balance": initial_balance,
        },
        "positions": {
            "count": len(positions),
            "total_value": total_position_value,
            "items": positions[:8],
        },
        "orders": {
            "count": len(orders),
            "entry_orders": len([o for o in orders if not o.get("is_reduce_only")]),
            "tp_sl_orders": len([o for o in orders if o.get("is_reduce_only")]),
            "items": orders[:8],
        },
        "risk": {
            "current_leverage": total_position_value / account_value if account_value > 0 else 0,
            "max_leverage": bitget_service.max_total_leverage,
            "drawdown_percent": drawdown,
            "circuit_breaker_threshold": bitget_service.circuit_breaker_drawdown * 100,
        },
    }
    return summary, positions, orders


@router.get("/console", response_model=Dict[str, Any])
async def get_console_snapshot():
    """Return the control-console snapshot.

    Aggregates the system monitor summary, signal statistics, the crypto
    agent status and its recent execution events, and the paper / Bitget
    platform state into a single payload for the console page.

    Raises:
        HTTPException: 500 when any part of the snapshot cannot be built.
    """
    # NOTE(review): every service call below is synchronous; if any of them
    # performs network I/O it will block the event loop — confirm.
    try:
        monitor = get_system_monitor()
        summary = monitor.get_summary()
        now = datetime.now()

        signal_db = get_signal_db_service()
        signal_stats = signal_db.get_signal_stats(days=7)
        latest_signals = signal_db.get_latest_signals(limit=12, days=3)

        crypto_agent = get_crypto_agent()
        crypto_status = crypto_agent.get_status()

        paper_service = get_paper_trading_service()
        paper_account = paper_service.get_account_status()
        paper_orders = paper_service.get_active_orders()
        paper_positions = [o for o in paper_orders if o.get('status') == 'open']
        paper_pending = [o for o in paper_orders if o.get('status') == 'pending']
        paper_stats = paper_service.calculate_statistics()

        bitget_summary, bg_positions, bg_orders = _summarize_bitget(get_bitget_live_service())

        # Signals created within the last 30 minutes; unparsable timestamps
        # count as infinitely old.
        recent_cutoff = now - timedelta(minutes=30)
        recent_signal_count = sum(
            1
            for signal in latest_signals
            if (_parse_signal_timestamp(signal.get("created_at")) or datetime.min) >= recent_cutoff
        )

        # The unified view takes paper positions from get_open_positions(),
        # while the per-platform payload below lists the active orders whose
        # status is 'open'.
        paper_position_items = [
            _normalize_platform_position("paper", pos)
            for pos in paper_service.get_open_positions()[:12]
        ]
        paper_order_items = [
            _normalize_platform_order("paper", order)
            for order in paper_pending[:12]
        ]
        bitget_position_items = [
            _normalize_platform_position("bitget", pos)
            for pos in bg_positions[:12]
        ]
        bitget_order_items = [
            _normalize_platform_order("bitget", order)
            for order in bg_orders[:12]
        ]

        # Newest first; entries without a usable timestamp sort last.
        unified_positions = sorted(
            paper_position_items + bitget_position_items,
            key=lambda item: _parse_signal_timestamp(item.get("opened_at")) or datetime.min,
            reverse=True,
        )
        unified_orders = sorted(
            paper_order_items + bitget_order_items,
            key=lambda item: _parse_signal_timestamp(item.get("created_at")) or datetime.min,
            reverse=True,
        )

        platforms_payload = {
            "paper": {
                "enabled": True,
                "account": paper_account,
                "positions": {
                    "count": len(paper_positions),
                    "items": paper_positions[:8],
                },
                "orders": {
                    "count": len(paper_orders),
                    "pending_count": len(paper_pending),
                    "items": paper_pending[:8],
                },
                "statistics": {
                    "win_rate": paper_stats.get("win_rate", 0),
                    "total_trades": paper_stats.get("total_trades", 0),
                    "total_pnl": paper_stats.get("total_pnl", 0),
                    "max_drawdown": paper_stats.get("max_drawdown", 0),
                    "by_grade": paper_stats.get("by_grade", {}),
                },
                "risk": {
                    "current_leverage": paper_account.get("current_total_leverage", 0),
                    "max_leverage": paper_account.get("max_total_leverage", 0),
                    "drawdown_percent": paper_account.get("max_drawdown", 0),
                    "circuit_breaker_threshold": 25,
                },
            },
            "bitget": bitget_summary,
        }

        execution_events = crypto_agent.get_recent_execution_events(limit=40)
        attention_items = _build_attention_items(
            crypto_status.get("platform_halts", {}),
            platforms_payload,
            unified_positions,
            unified_orders,
            execution_events,
        )

        payload = {
            "status": "success",
            "data": {
                "generated_at": now.isoformat(),
                "system": summary,
                "crypto_agent": crypto_status,
                "execution_events": execution_events,
                "signals": {
                    "stats_7d": signal_stats,
                    "latest": latest_signals,
                    "recent_30m_count": recent_signal_count,
                },
                "platforms": platforms_payload,
                "management": {
                    "positions": unified_positions[:18],
                    "orders": unified_orders[:24],
                    "attention_items": attention_items,
                },
            }
        }
        # Strip NumPy types so the payload can be JSON-encoded.
        return _sanitize_for_response(payload)
    except Exception as e:
        logger.error(f"获取总控台快照失败: {e}")
        raise HTTPException(status_code=500, detail=f"获取总控台快照失败: {str(e)}") from e