""" 执行监管目标定义与默认注册工厂。 """ from __future__ import annotations from dataclasses import dataclass from typing import Any, List, Optional @dataclass class ExecutionTarget: """执行监管目标:平台 + 账户。""" target_key: str platform: str account_id: str service: Any executor: Any supports_pending_timeout: bool = True supports_position_management: bool = True supports_tpsl_repair: bool = False pending_tpsl_state_key: Optional[str] = None def build_default_execution_targets(agent: Any) -> List[ExecutionTarget]: """根据当前 agent 已启用的平台生成默认执行监管目标。""" targets: List[ExecutionTarget] = [] paper_executor = agent.executors.get('PaperTrading') if getattr(agent, 'paper_trading', None) and paper_executor: targets.append(ExecutionTarget( target_key="PaperTrading", platform="PaperTrading", account_id="default", service=agent.paper_trading, executor=paper_executor, supports_pending_timeout=True, supports_position_management=True, supports_tpsl_repair=False, )) bitget_services = getattr(agent, 'bitget_services', {}) or {} bitget_executors = getattr(agent, 'bitget_executors', {}) or {} for account_id, service in bitget_services.items(): executor = bitget_executors.get(account_id) if not service or not executor: continue target_key = f"Bitget:{account_id}" targets.append(ExecutionTarget( target_key=target_key, platform="Bitget", account_id=account_id, service=service, executor=executor, supports_pending_timeout=True, supports_position_management=True, supports_tpsl_repair=True, pending_tpsl_state_key=target_key, )) return targets