"""Runtime configuration storage. rules.yaml is the read-only baseline. Anything that can change online lives here so code deploys cannot overwrite production state. """ from __future__ import annotations import copy import json from datetime import datetime from app.db.schema import get_conn, init_db STRATEGY_TABLE = "strategy_runtime_config" SYSTEM_TABLE = "system_config" SYSTEM_CONFIG_POLICY = { "paper_trading": { "visible": True, "editable": True, "delete_allowed": False, "category": "策略交易", "source_of_truth": "配置中心", "managed_by": "config_center", "warning": "", }, "event_driven": { "visible": True, "editable": True, "delete_allowed": False, "category": "事件驱动", "source_of_truth": "配置中心", "managed_by": "config_center", "warning": "", }, "event_driven.sources": { "visible": True, "editable": True, "delete_allowed": True, "category": "事件驱动", "source_of_truth": "配置中心", "managed_by": "config_center", "warning": "", }, "sentiment": { "visible": True, "editable": True, "delete_allowed": False, "category": "舆情", "source_of_truth": "配置中心", "managed_by": "config_center", "warning": "", }, "monitoring": { "visible": True, "editable": True, "delete_allowed": False, "category": "监控复盘", "source_of_truth": "配置中心", "managed_by": "config_center", "warning": "", }, "notification": { "visible": True, "editable": True, "delete_allowed": False, "category": "通知", "source_of_truth": "配置中心 + 环境变量", "managed_by": "config_center", "warning": "只允许配置开关和 env key 名,真实 webhook 必须放在环境变量。", }, "scheduler": { "visible": True, "editable": True, "delete_allowed": False, "category": "调度", "source_of_truth": "配置中心", "managed_by": "config_center", "warning": "", }, "llm": { "visible": False, "editable": False, "delete_allowed": False, "category": "AI 基础设施", "source_of_truth": "环境变量", "managed_by": "env", "warning": "LLM endpoint、model 和 API key env 由部署环境管理,不在配置中心公开编辑。", }, "live_trading": { "visible": False, "editable": False, "delete_allowed": False, "category": "实盘基础设施", "source_of_truth": "实盘账号页 
+ 环境变量", "managed_by": "live_trading_console", "warning": "实盘交易所 endpoint、API env 指针和账号风控由实盘控制台或环境变量管理。", }, "price_streamer": { "visible": False, "editable": False, "delete_allowed": False, "category": "行情基础设施", "source_of_truth": "环境变量", "managed_by": "env", "warning": "行情源、websocket URL、缓存目录等属于基础设施配置,不在配置中心公开编辑。", }, "email": { "visible": False, "editable": False, "delete_allowed": False, "category": "邮件基础设施", "source_of_truth": "环境变量", "managed_by": "env", "warning": "SMTP host/user/password/sender 只允许通过环境变量管理。", }, "bootstrap_admin": { "visible": False, "editable": False, "delete_allowed": False, "category": "启动安全", "source_of_truth": "环境变量", "managed_by": "env", "warning": "默认管理员启动参数只允许通过环境变量管理。", }, } DEFAULT_SYSTEM_CONFIG_POLICY = { "visible": False, "editable": False, "delete_allowed": False, "category": "内部配置", "source_of_truth": "代码/环境变量", "managed_by": "internal", "warning": "未登记的 system 配置默认不在配置中心公开,避免配置漂移。", } def config_policy(kind: str, key: str) -> dict: if kind == "strategy": return { "visible": True, "editable": True, "delete_allowed": True, "category": "策略运行时", "source_of_truth": "配置中心", "managed_by": "config_center", "warning": "", } return copy.deepcopy(SYSTEM_CONFIG_POLICY.get(key, DEFAULT_SYSTEM_CONFIG_POLICY)) def is_config_editable(kind: str, key: str) -> bool: return bool(config_policy(kind, key).get("editable")) def is_config_delete_allowed(kind: str, key: str) -> bool: return bool(config_policy(kind, key).get("delete_allowed")) def _now() -> str: return datetime.now().isoformat() def _loads(value, fallback=None): try: if isinstance(value, str) and value.strip(): return json.loads(value) if isinstance(value, (dict, list)): return value except Exception: pass return copy.deepcopy(fallback if fallback is not None else {}) def _dumps(value) -> str: return json.dumps(value if value is not None else {}, ensure_ascii=False, sort_keys=True, default=str) def _table(kind: str) -> str: return SYSTEM_TABLE if kind == "system" else STRATEGY_TABLE 
# Upsert statements used by set_config. The description is only replaced when
# the new one is non-empty; created_at is left untouched on conflict.
_SYSTEM_UPSERT_SQL = """
    INSERT INTO system_config
        (config_key, config_json, description, source, is_secret, updated_by, created_at, updated_at)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
    ON CONFLICT(config_key) DO UPDATE SET
        config_json=excluded.config_json,
        description=COALESCE(NULLIF(excluded.description, ''), system_config.description),
        source=excluded.source,
        is_secret=excluded.is_secret,
        updated_by=excluded.updated_by,
        updated_at=excluded.updated_at
"""

_STRATEGY_UPSERT_SQL = """
    INSERT INTO strategy_runtime_config
        (config_key, config_json, description, source, updated_by, created_at, updated_at)
    VALUES (%s,%s,%s,%s,%s,%s,%s)
    ON CONFLICT(config_key) DO UPDATE SET
        config_json=excluded.config_json,
        description=COALESCE(NULLIF(excluded.description, ''), strategy_runtime_config.description),
        source=excluded.source,
        updated_by=excluded.updated_by,
        updated_at=excluded.updated_at
"""


def _ensure():
    """Call ``init_db()`` before any config table is touched.

    NOTE(review): presumably this creates the schema when missing -- confirm
    in app.db.schema.
    """
    init_db()


def get_config(kind: str, key: str, default=None):
    """Read one configuration value.

    Args:
        kind: Config kind; selects the table via ``_table``.
        key: Value of ``config_key`` to look up.
        default: Returned (deep-copied) when no row exists, and used as the
            decode fallback when a row exists; ``None`` becomes ``{}`` in the
            latter case.

    Returns:
        The decoded ``config_json`` of the row, or a copy of ``default``.
    """
    _ensure()
    query = f"SELECT config_json FROM {_table(kind)} WHERE config_key=%s"
    conn = get_conn()
    try:
        row = conn.execute(query, (key,)).fetchone()
    finally:
        conn.close()

    if row:
        return _loads(row["config_json"], {} if default is None else default)
    return copy.deepcopy(default)


def set_config(
    kind: str,
    key: str,
    value,
    *,
    description: str = "",
    source: str = "manual",
    updated_by: str = "",
    is_secret: bool = False,
):
    """Insert or update one configuration row and return the stored value.

    Args:
        kind: Config kind; selects the table via ``_table``.
        key: Value of ``config_key`` to write.
        value: Configuration payload, serialized with ``_dumps``.
        description: Row description; an empty string keeps the existing one
            when the row already exists.
        source: Written to the ``source`` column.
        updated_by: Written to the ``updated_by`` column.
        is_secret: Stored as 0/1 in ``is_secret``; only the system table has
            this column, so it is ignored for strategy rows.

    Returns:
        The value read back through ``get_config`` after the commit.
    """
    _ensure()
    target = _table(kind)
    stamp = _now()  # same timestamp for created_at and updated_at
    conn = get_conn()
    try:
        if target == SYSTEM_TABLE:
            statement = _SYSTEM_UPSERT_SQL
            params = (
                key,
                _dumps(value),
                description,
                source,
                int(bool(is_secret)),
                updated_by,
                stamp,
                stamp,
            )
        else:
            statement = _STRATEGY_UPSERT_SQL
            params = (key, _dumps(value), description, source, updated_by, stamp, stamp)
        conn.execute(statement, params)
        conn.commit()
    finally:
        conn.close()
    return get_config(kind, key)


def delete_config(kind: str, key: str) -> bool:
    """Delete one configuration row.

    Returns:
        True when the cursor reports at least one affected row.
    """
    _ensure()
    statement = f"DELETE FROM {_table(kind)} WHERE config_key=%s"
    conn = get_conn()
    try:
        cursor = conn.execute(statement, (key,))
        conn.commit()
        affected = getattr(cursor, "rowcount", 0)
        return affected > 0
    finally:
        conn.close()


def list_configs(kind: str, *, include_hidden: bool = False):
    """List all rows of a kind, each merged with its policy.

    Args:
        kind: Config kind; selects the table via ``_table``.
        include_hidden: When False, rows whose policy is not ``visible`` are
            skipped.

    Returns:
        A list of dicts ordered by ``config_key``. Each holds the row columns
        (with ``config_json`` replaced by a decoded ``config``), ``kind`` and
        the policy fields. Rows flagged ``is_secret`` expose
        ``{"masked": True}`` instead of their config.
    """
    _ensure()
    query = f"SELECT * FROM {_table(kind)} ORDER BY config_key"
    conn = get_conn()
    try:
        rows = conn.execute(query).fetchall()
    finally:
        conn.close()

    listed = []
    for row in rows:
        entry = dict(row)
        policy = config_policy(kind, entry.get("config_key", ""))
        if include_hidden or policy.get("visible"):
            entry["config"] = _loads(entry.pop("config_json", "{}"), {})
            entry["kind"] = kind
            entry.update(policy)
            if entry.get("is_secret"):
                # Never hand secret payloads to listing callers.
                entry["config"] = {"masked": True}
            listed.append(entry)
    return listed


def deep_merge(base, override):
    """Recursively merge ``override`` onto ``base`` without mutating either.

    Nested dicts are merged key by key; any other value in ``override``
    replaces the one in ``base``. If either argument is not a dict, a deep
    copy of ``override`` is returned.
    """
    if not (isinstance(base, dict) and isinstance(override, dict)):
        return copy.deepcopy(override)

    result = copy.deepcopy(base)
    for name, incoming in override.items():
        mergeable = isinstance(incoming, dict) and isinstance(result.get(name), dict)
        result[name] = deep_merge(result[name], incoming) if mergeable else copy.deepcopy(incoming)
    return result


# --- Typed accessors -------------------------------------------------------
# Thin wrappers that bind a kind/key pair (and, for setters, a description)
# to get_config / set_config.


def get_strategy_override():
    """Return the strategy ``rules_override`` config, or ``{}`` when unset."""
    return get_config("strategy", "rules_override", default={})


def set_strategy_override(value, updated_by="", source="manual"):
    """Store the strategy ``rules_override`` config."""
    return set_config(
        "strategy",
        "rules_override",
        value,
        description="Runtime strategy override layered on top of rules.yaml",
        source=source,
        updated_by=updated_by,
    )


def get_strategy_meta(default=None):
    """Return the strategy ``meta`` config."""
    return get_config("strategy", "meta", default=default)


def set_strategy_meta(value, updated_by="", source="runtime"):
    """Store the strategy ``meta`` config."""
    return set_config(
        "strategy",
        "meta",
        value,
        description="Runtime strategy iteration metadata",
        source=source,
        updated_by=updated_by,
    )


def get_learned_rules_config(default=None):
    """Return the strategy ``learned_rules`` config."""
    return get_config("strategy", "learned_rules", default=default)


def set_learned_rules_config(value, updated_by="", source="runtime"):
    """Store the strategy ``learned_rules`` config."""
    return set_config(
        "strategy",
        "learned_rules",
        value,
        description="Runtime learned rules released by review gate",
        source=source,
        updated_by=updated_by,
    )


def get_event_sources(default=None):
    """Return the system ``event_driven.sources`` config."""
    return get_config("system", "event_driven.sources", default=default)


def set_event_sources(value, updated_by="", source="manual"):
    """Store the system ``event_driven.sources`` config."""
    return set_config(
        "system",
        "event_driven.sources",
        value,
        description="Runtime news/event sources",
        source=source,
        updated_by=updated_by,
    )


def get_event_driven_config(default=None):
    """Return the system ``event_driven`` config."""
    return get_config("system", "event_driven", default=default)


def set_event_driven_config(value, updated_by="", source="manual"):
    """Store the system ``event_driven`` config."""
    return set_config(
        "system",
        "event_driven",
        value,
        description="Runtime event/news driven screening settings",
        source=source,
        updated_by=updated_by,
    )


def get_sentiment_config(default=None):
    """Return the system ``sentiment`` config."""
    return get_config("system", "sentiment", default=default)


def set_sentiment_config(value, updated_by="", source="manual"):
    """Store the system ``sentiment`` config."""
    return set_config(
        "system",
        "sentiment",
        value,
        description="Runtime sentiment monitoring settings",
        source=source,
        updated_by=updated_by,
    )


def get_monitoring_config(default=None):
    """Return the system ``monitoring`` config."""
    return get_config("system", "monitoring", default=default)


def set_monitoring_config(value, updated_by="", source="manual"):
    """Store the system ``monitoring`` config."""
    return set_config(
        "system",
        "monitoring",
        value,
        description="Runtime monitoring and audit settings",
        source=source,
        updated_by=updated_by,
    )


def get_llm_config(default=None):
    """Return the system ``llm`` config."""
    return get_config("system", "llm", default=default)


def set_llm_config(value, updated_by="", source="manual"):
    """Store the system ``llm`` config."""
    return set_config(
        "system",
        "llm",
        value,
        description="LLM provider and module switches; API key stays in env",
        source=source,
        updated_by=updated_by,
    )


def get_paper_trading_config(default=None):
    """Return the system ``paper_trading`` config."""
    return get_config("system", "paper_trading", default=default)


def set_paper_trading_config(value, updated_by="", source="manual"):
    """Store the system ``paper_trading`` config."""
    return set_config(
        "system",
        "paper_trading",
        value,
        description="Paper trading account and execution model",
        source=source,
        updated_by=updated_by,
    )


def get_live_trading_config(default=None):
    """Return the system ``live_trading`` config."""
    return get_config("system", "live_trading", default=default)


def set_live_trading_config(value, updated_by="", source="manual"):
    """Store the system ``live_trading`` config."""
    return set_config(
        "system",
        "live_trading",
        value,
        description="Live trading exchange, account and risk settings; API secrets stay in env",
        source=source,
        updated_by=updated_by,
    )


def get_price_streamer_config(default=None):
    """Return the system ``price_streamer`` config."""
    return get_config("system", "price_streamer", default=default)


def set_price_streamer_config(value, updated_by="", source="manual"):
    """Store the system ``price_streamer`` config."""
    return set_config(
        "system",
        "price_streamer",
        value,
        description="Realtime websocket price streamer settings",
        source=source,
        updated_by=updated_by,
    )


def get_notification_config(default=None):
    """Return the system ``notification`` config."""
    return get_config("system", "notification", default=default)


def set_notification_config(value, updated_by="", source="manual"):
    """Store the system ``notification`` config."""
    return set_config(
        "system",
        "notification",
        value,
        description="Notification channel switches and env pointers",
        source=source,
        updated_by=updated_by,
    )


def get_email_config(default=None):
    """Return the system ``email`` config."""
    return get_config("system", "email", default=default)


def set_email_config(value, updated_by="", source="manual"):
    """Store the system ``email`` config."""
    return set_config(
        "system",
        "email",
        value,
        description="SMTP email settings; password stays in env",
        source=source,
        updated_by=updated_by,
    )


def get_bootstrap_admin_config(default=None):
    """Return the system ``bootstrap_admin`` config."""
    return get_config("system", "bootstrap_admin", default=default)


def set_bootstrap_admin_config(value, updated_by="", source="manual"):
    """Store the system ``bootstrap_admin`` config."""
    return set_config(
        "system",
        "bootstrap_admin",
        value,
        description="Default admin bootstrap env pointers",
        source=source,
        updated_by=updated_by,
    )


def get_scheduler_config(default=None):
    """Return the system ``scheduler`` config."""
    return get_config("system", "scheduler", default=default)


def set_scheduler_config(value, updated_by="", source="manual"):
    """Store the system ``scheduler`` config."""
    return set_config(
        "system",
        "scheduler",
        value,
        description="Scheduler runtime process settings",
        source=source,
        updated_by=updated_by,
    )


def seed_system_defaults(defaults: dict[str, tuple[object, str]]):
    """Write default system configs for keys that have no stored value yet.

    Args:
        defaults: Mapping of config key to a ``(value, description)`` pair.
            ``None`` or an empty mapping seeds nothing.

    Returns:
        The list of keys that were written, in iteration order.
    """
    seeded = []
    for key, (value, description) in (defaults or {}).items():
        if get_config("system", key, default=None) is not None:
            continue  # already present; never overwrite runtime state
        set_config("system", key, value, description=description, source="seed_default")
        seeded.append(key)
    return seeded


# NOTE(review): get_live_trading_config / set_live_trading_config are defined
# above but not exported here -- confirm whether that omission is intentional.
__all__ = [
    "deep_merge",
    "config_policy",
    "delete_config",
    "get_config",
    "get_event_driven_config",
    "get_event_sources",
    "get_bootstrap_admin_config",
    "get_email_config",
    "get_llm_config",
    "get_monitoring_config",
    "get_notification_config",
    "get_paper_trading_config",
    "get_price_streamer_config",
    "get_scheduler_config",
    "get_sentiment_config",
    "get_learned_rules_config",
    "get_strategy_meta",
    "get_strategy_override",
    "is_config_delete_allowed",
    "is_config_editable",
    "list_configs",
    "set_config",
    "set_event_driven_config",
    "set_event_sources",
    "set_bootstrap_admin_config",
    "set_email_config",
    "set_llm_config",
    "set_monitoring_config",
    "set_notification_config",
    "set_paper_trading_config",
    "set_price_streamer_config",
    "set_scheduler_config",
    "set_sentiment_config",
    "seed_system_defaults",
    "set_learned_rules_config",
    "set_strategy_meta",
    "set_strategy_override",
]