From cde11656c80e9f1dbb0820d5e394975f6c089680 Mon Sep 17 00:00:00 2001
From: aaron <>
Date: Thu, 30 Apr 2026 20:28:19 +0800
Subject: [PATCH] 1
---
backend/app/api/market.py | 36 +-
backend/app/db/database.py | 4 +
backend/app/db/tables.py | 43 +++
backend/app/engine/scheduler.py | 28 ++
backend/app/llm/batch_screener.py | 8 +-
backend/app/llm/prompts.py | 10 +
backend/app/llm/strategy_config.py | 421 ++++++++++++++++++++++
backend/app/llm/strategy_iteration.py | 239 +++++++++++-
backend/app/llm/strategy_selector.py | 6 +-
backend/app/main.py | 3 +
frontend/src/app/(auth)/strategy/page.tsx | 276 +++++++++++++-
frontend/src/lib/api.ts | 96 ++++-
12 files changed, 1152 insertions(+), 18 deletions(-)
create mode 100644 backend/app/llm/strategy_config.py
diff --git a/backend/app/api/market.py b/backend/app/api/market.py
index f57e19b9..632eefa6 100644
--- a/backend/app/api/market.py
+++ b/backend/app/api/market.py
@@ -2,7 +2,7 @@
from datetime import datetime
-from fastapi import APIRouter, Depends
+from fastapi import APIRouter, Depends, HTTPException
from app.data.tushare_client import tushare_client
from app.data import tencent_client
@@ -96,6 +96,37 @@ async def get_strategy_iteration(limit: int = 50):
return result
+@router.get("/strategy-configs")
+async def get_strategy_configs(_admin: dict = Depends(get_current_admin)):
+ """获取当前策略配置中心状态。"""
+ from app.llm.strategy_config import (
+ get_active_prompt_configs,
+ get_active_strategy_configs,
+ get_recent_config_changes,
+ )
+
+ return {
+ "strategies": await get_active_strategy_configs(),
+ "prompts": await get_active_prompt_configs(),
+ "changes": await get_recent_config_changes(limit=30),
+ }
+
+
+@router.post("/strategy-configs/{strategy_id}/rollback")
+async def rollback_strategy_config(strategy_id: str, _admin: dict = Depends(get_current_admin)):
+ """回滚某个策略到上一配置版本。"""
+ from app.llm.strategy_config import rollback_strategy_config as rollback
+
+ try:
+ result = await rollback(strategy_id)
+ except ValueError as e:
+ raise HTTPException(status_code=400, detail=str(e))
+ cache.delete("market:strategy_board:rules")
+ cache.delete("market:strategy_iteration:80:rules")
+ cache.delete("market:strategy_iteration:50:rules")
+ return {"status": "ok", "strategy": result}
+
+
@router.get("/ops-status")
async def get_ops_status():
"""管理员任务中心状态与数据新鲜度(只读,不触发扫描或 LLM)。"""
@@ -182,8 +213,9 @@ async def generate_strategy_board(_admin: dict = Depends(get_current_admin)):
async def generate_strategy_iteration(limit: int = 50, _admin: dict = Depends(get_current_admin)):
"""管理员手动生成带 LLM 分析的策略复盘"""
from app.llm.strategy_iteration import build_strategy_iteration_report
- result = await build_strategy_iteration_report(limit=limit, include_llm=True)
+ result = await build_strategy_iteration_report(limit=limit, include_llm=True, apply_auto_config=True)
cache.delete(f"market:strategy_iteration:{limit}:rules")
+ cache.delete("market:strategy_board:rules")
return result
async def _overview_realtime():
diff --git a/backend/app/db/database.py b/backend/app/db/database.py
index f5a8837c..a1b3ed54 100644
--- a/backend/app/db/database.py
+++ b/backend/app/db/database.py
@@ -93,6 +93,10 @@ async def init_db():
"ALTER TABLE watchlist_analyses ADD COLUMN full_analysis TEXT DEFAULT ''",
"ALTER TABLE watchlist_analyses ADD COLUMN score_reference REAL DEFAULT 0",
"ALTER TABLE watchlist_analyses ADD COLUMN analysis_mode TEXT DEFAULT 'scheduled'",
+ "ALTER TABLE strategy_configs ADD COLUMN evidence_json TEXT DEFAULT '{}'",
+ "ALTER TABLE strategy_configs ADD COLUMN effective_from DATETIME DEFAULT CURRENT_TIMESTAMP",
+ "ALTER TABLE prompt_configs ADD COLUMN evidence_json TEXT DEFAULT '{}'",
+ "ALTER TABLE strategy_config_changes ADD COLUMN prompt_key TEXT DEFAULT ''",
]:
try:
await conn.execute(
diff --git a/backend/app/db/tables.py b/backend/app/db/tables.py
index 5d191c03..f2086e5b 100644
--- a/backend/app/db/tables.py
+++ b/backend/app/db/tables.py
@@ -195,3 +195,46 @@ error_logs_table = Table(
Column("detail", Text, default=""), # 完整异常信息(traceback)
Column("created_at", DateTime, server_default=func.now()),
)
+
+strategy_configs_table = Table(
+ "strategy_configs", metadata,
+ Column("id", Integer, primary_key=True, autoincrement=True),
+ Column("strategy_id", Text, nullable=False),
+ Column("version", Integer, nullable=False, default=1),
+ Column("config_json", Text, nullable=False),
+ Column("is_active", Boolean, default=True),
+ Column("source", Text, default="manual"), # manual / auto_review / rollback / default_seed
+ Column("change_reason", Text, default=""),
+ Column("evidence_json", Text, default="{}"),
+ Column("effective_from", DateTime, server_default=func.now()),
+ Column("created_at", DateTime, server_default=func.now()),
+)
+
+prompt_configs_table = Table(
+ "prompt_configs", metadata,
+ Column("id", Integer, primary_key=True, autoincrement=True),
+ Column("prompt_key", Text, nullable=False),
+ Column("version", Integer, nullable=False, default=1),
+ Column("content", Text, nullable=False),
+ Column("is_active", Boolean, default=True),
+ Column("source", Text, default="manual"), # manual / candidate / rollback / default_seed
+ Column("change_reason", Text, default=""),
+ Column("evidence_json", Text, default="{}"),
+ Column("created_at", DateTime, server_default=func.now()),
+)
+
+strategy_config_changes_table = Table(
+ "strategy_config_changes", metadata,
+ Column("id", Integer, primary_key=True, autoincrement=True),
+ Column("change_type", Text, nullable=False), # auto_applied / pending / rollback / manual
+ Column("status", Text, default="pending"), # pending / applied / rejected
+ Column("strategy_id", Text, default=""),
+ Column("prompt_key", Text, default=""),
+ Column("base_version", Integer, default=0),
+ Column("new_version", Integer, default=0),
+ Column("diff_json", Text, default="{}"),
+ Column("evidence_json", Text, default="{}"),
+ Column("reason", Text, default=""),
+ Column("created_at", DateTime, server_default=func.now()),
+ Column("applied_at", DateTime),
+)
diff --git a/backend/app/engine/scheduler.py b/backend/app/engine/scheduler.py
index 13f49925..865f8fef 100644
--- a/backend/app/engine/scheduler.py
+++ b/backend/app/engine/scheduler.py
@@ -51,6 +51,29 @@ async def _run_watchlist_analysis():
await log_error("scheduler", f"自选股定时分析失败: {e}", detail=traceback.format_exc())
+async def _run_strategy_iteration():
+ """收盘后生成策略复盘,并允许小幅自动配置调整。"""
+ logger.info("=== 开始策略复盘与配置校准 ===")
+ try:
+ from app.llm.strategy_iteration import build_strategy_iteration_report
+ report = await build_strategy_iteration_report(limit=80, include_llm=False, apply_auto_config=True)
+ logger.info(
+ "策略复盘完成: sample=%s auto_change=%s",
+ report.get("sample_size", 0),
+ bool(report.get("auto_config_change")),
+ )
+ await broadcast_update({
+ "type": "strategy_iteration_ready",
+ "sample_size": report.get("sample_size", 0),
+ "auto_config_changed": bool(report.get("auto_config_change")),
+ "timestamp": datetime.now().isoformat(),
+ })
+ except Exception as e:
+ logger.error(f"策略复盘自动校准失败: {e}")
+ from app.db.error_logger import log_error
+ await log_error("scheduler", f"策略复盘自动校准失败: {e}", detail=traceback.format_exc())
+
+
def setup_scheduler():
"""配置所有定时任务(交易日时间)"""
@@ -93,6 +116,11 @@ def setup_scheduler():
id="watchlist_analysis", replace_existing=True
)
+ scheduler.add_job(
+ _run_strategy_iteration, CronTrigger(hour=16, minute=35, day_of_week="mon-fri"),
+ id="strategy_iteration", replace_existing=True
+ )
+
logger.info("盘中调度器已配置完成")
diff --git a/backend/app/llm/batch_screener.py b/backend/app/llm/batch_screener.py
index 6718e021..4f4ec546 100644
--- a/backend/app/llm/batch_screener.py
+++ b/backend/app/llm/batch_screener.py
@@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
async def prefilter_single_stock(candidate: dict, market_summary: str) -> dict:
"""对单只候选股票做轻量 LLM 预筛。"""
from app.llm.prompts import STOCK_PREFILTER_PROMPT
+ from app.llm.strategy_config import get_prompt_content
from app.llm.client import get_client
stock_text = f"""\
@@ -42,7 +43,8 @@ async def prefilter_single_stock(candidate: dict, market_summary: str) -> dict:
if candidate.get("intraday_volume"):
stock_text += f"\n\n## 分时量能摘要\n{candidate['intraday_volume']}"
- user_msg = f"{STOCK_PREFILTER_PROMPT}\n\n## 市场环境\n{market_summary}\n\n{stock_text}\n\n请输出 JSON。"
+ prompt = await get_prompt_content("stock_prefilter", STOCK_PREFILTER_PROMPT)
+ user_msg = f"{prompt}\n\n## 市场环境\n{market_summary}\n\n{stock_text}\n\n请输出 JSON。"
try:
client = get_client()
@@ -102,6 +104,7 @@ async def analyze_single_stock(candidate: dict, market_summary: str) -> dict:
}
"""
from app.llm.prompts import SINGLE_STOCK_ANALYSIS_PROMPT
+ from app.llm.strategy_config import get_prompt_content
from app.llm.client import get_client
# 构建 prompt — 不传 signal_type,让 LLM 独立判断
@@ -120,7 +123,8 @@ async def analyze_single_stock(candidate: dict, market_summary: str) -> dict:
if candidate.get("capital_flow_summary"):
stock_text += f"\n\n## 资金流向\n{candidate['capital_flow_summary']}"
- user_msg = f"{SINGLE_STOCK_ANALYSIS_PROMPT}\n\n## 市场环境\n{market_summary}\n\n{stock_text}\n\n请给出你的分析。"
+ prompt = await get_prompt_content("single_stock_analysis", SINGLE_STOCK_ANALYSIS_PROMPT)
+ user_msg = f"{prompt}\n\n## 市场环境\n{market_summary}\n\n{stock_text}\n\n请给出你的分析。"
try:
client = get_client()
diff --git a/backend/app/llm/prompts.py b/backend/app/llm/prompts.py
index 2269b38c..76b3f5fc 100644
--- a/backend/app/llm/prompts.py
+++ b/backend/app/llm/prompts.py
@@ -187,3 +187,13 @@ STOCK_PREFILTER_PROMPT = """\
- confidence 必须是 1-10 整数
- focus_points 最多三条,尽量具体
- 如果拿不准,优先给 watch,不要滥给 ignore"""
+
+
+STRATEGY_ITERATION_PROMPT = """\
+请基于推荐复盘数据,输出策略迭代建议。
+要求:
+1. 明确指出最该收紧、保留、加强的策略或信号;
+2. 只提出可执行调整建议,不要泛泛而谈;
+3. 不要承诺收益;
+4. 区分可小幅自动配置的参数调整与需要人工确认的大改动;
+5. 180字以内。"""
diff --git a/backend/app/llm/strategy_config.py b/backend/app/llm/strategy_config.py
new file mode 100644
index 00000000..695573dd
--- /dev/null
+++ b/backend/app/llm/strategy_config.py
@@ -0,0 +1,421 @@
+"""策略配置中心
+
+把可迭代的策略参数和 Prompt 版本持久化到数据库。
+代码里的默认策略只作为兜底;一旦数据库有激活配置,下一轮扫描直接读取配置。
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+from datetime import datetime
+from typing import Any
+
+from sqlalchemy import text
+
+from app.db.database import get_db
+from app.db import tables
+
+logger = logging.getLogger(__name__)
+
+CONFIG_FIELDS = {
+ "name",
+ "description",
+ "entry_signal_priority",
+ "score_weights",
+ "min_score",
+ "buy_threshold",
+ "max_position_pct",
+ "allow_trading",
+ "actionable_limit",
+ "watch_limit",
+ "target_focus_sectors",
+ "market_stance",
+ "decision_note",
+ "notes",
+}
+
+PROMPT_DEFAULT_KEYS = {
+ "stock_prefilter": "STOCK_PREFILTER_PROMPT",
+ "single_stock_analysis": "SINGLE_STOCK_ANALYSIS_PROMPT",
+ "strategy_iteration": "STRATEGY_ITERATION_PROMPT",
+}
+
+
+def profile_to_config(profile) -> dict[str, Any]:
+ data = profile.model_dump() if hasattr(profile, "model_dump") else dict(profile)
+ return {key: data[key] for key in CONFIG_FIELDS if key in data}
+
+
+def apply_config_to_profile(profile, config: dict[str, Any] | None, generated_by: str = "config"):
+ if not config:
+ return profile
+ updated = profile.model_copy(deep=True)
+ for key, value in config.items():
+ if key in CONFIG_FIELDS and hasattr(updated, key):
+ setattr(updated, key, value)
+ updated.generated_by = generated_by
+ return updated
+
+
+async def load_active_strategy_profile(profile):
+ row = await _load_active_strategy_row(profile.strategy_id)
+ if not row:
+ return profile
+ config = _json_loads(row["config_json"], {})
+ updated = apply_config_to_profile(profile, config, generated_by=f"config:v{row['version']}")
+ updated.feedback_applied = True
+ updated.feedback_notes = [
+ f"策略配置版本 v{row['version']} ({row['source']}) 已生效",
+ row["change_reason"] or "使用配置中心激活版本",
+ ]
+ return updated
+
+
+async def get_active_strategy_configs() -> list[dict]:
+ await ensure_default_configs()
+ async with get_db() as db:
+ result = await db.execute(
+ text(
+ "SELECT * FROM strategy_configs "
+ "WHERE is_active = 1 "
+ "ORDER BY strategy_id ASC, version DESC"
+ )
+ )
+ return [_format_strategy_row(row._mapping) for row in result.fetchall()]
+
+
+async def get_recent_config_changes(limit: int = 20) -> list[dict]:
+ async with get_db() as db:
+ result = await db.execute(
+ text(
+ "SELECT * FROM strategy_config_changes "
+ "ORDER BY id DESC LIMIT :limit"
+ ),
+ {"limit": limit},
+ )
+ return [_format_change_row(row._mapping) for row in result.fetchall()]
+
+
+async def get_active_prompt_configs() -> list[dict]:
+ await ensure_default_configs()
+ async with get_db() as db:
+ result = await db.execute(
+ text(
+ "SELECT * FROM prompt_configs "
+ "WHERE is_active = 1 "
+ "ORDER BY prompt_key ASC, version DESC"
+ )
+ )
+ return [_format_prompt_row(row._mapping) for row in result.fetchall()]
+
+
+async def get_prompt_content(prompt_key: str, default: str) -> str:
+ async with get_db() as db:
+ result = await db.execute(
+ text(
+ "SELECT content FROM prompt_configs "
+ "WHERE prompt_key = :key AND is_active = 1 "
+ "ORDER BY version DESC LIMIT 1"
+ ),
+ {"key": prompt_key},
+ )
+ row = result.fetchone()
+ return str(row._mapping["content"]) if row else default
+
+
+async def create_active_strategy_config(
+ strategy_id: str,
+ config: dict[str, Any],
+ *,
+ source: str,
+ reason: str,
+ evidence: dict[str, Any] | None = None,
+ change_type: str = "manual",
+) -> dict:
+ """写入一个新的激活策略配置版本,并记录变更。"""
+ async with get_db() as db:
+ base = await _load_active_strategy_row(strategy_id, db=db)
+ version = int(base["version"]) + 1 if base else 1
+ before = _json_loads(base["config_json"], {}) if base else {}
+ diff = _build_diff(before, config)
+
+ await db.execute(
+ text("UPDATE strategy_configs SET is_active = 0 WHERE strategy_id = :sid"),
+ {"sid": strategy_id},
+ )
+ await db.execute(
+ tables.strategy_configs_table.insert().values(
+ strategy_id=strategy_id,
+ version=version,
+ config_json=json.dumps(config, ensure_ascii=False),
+ is_active=True,
+ source=source,
+ change_reason=reason,
+ evidence_json=json.dumps(evidence or {}, ensure_ascii=False),
+ )
+ )
+ await db.execute(
+ tables.strategy_config_changes_table.insert().values(
+ change_type=change_type,
+ status="applied",
+ strategy_id=strategy_id,
+ base_version=int(base["version"]) if base else 0,
+ new_version=version,
+ diff_json=json.dumps(diff, ensure_ascii=False),
+ evidence_json=json.dumps(evidence or {}, ensure_ascii=False),
+ reason=reason,
+ applied_at=datetime.now(),
+ )
+ )
+ await db.commit()
+
+ row = await _load_active_strategy_row(strategy_id)
+ return _format_strategy_row(row)
+
+
+async def rollback_strategy_config(strategy_id: str) -> dict:
+ """回滚到当前策略的上一个版本。"""
+ async with get_db() as db:
+ active = await _load_active_strategy_row(strategy_id, db=db)
+ if not active:
+ raise ValueError("当前策略没有激活配置")
+ result = await db.execute(
+ text(
+ "SELECT * FROM strategy_configs "
+ "WHERE strategy_id = :sid AND version < :version "
+ "ORDER BY version DESC LIMIT 1"
+ ),
+ {"sid": strategy_id, "version": active["version"]},
+ )
+ previous_row = result.fetchone()
+ if not previous_row:
+ raise ValueError("没有可回滚的上一版本")
+ current = active
+ previous = previous_row._mapping
+ await db.execute(
+ text("UPDATE strategy_configs SET is_active = 0 WHERE strategy_id = :sid"),
+ {"sid": strategy_id},
+ )
+ await db.execute(
+ text("UPDATE strategy_configs SET is_active = 1, source = 'rollback' WHERE id = :id"),
+ {"id": previous["id"]},
+ )
+ await db.execute(
+ tables.strategy_config_changes_table.insert().values(
+ change_type="rollback",
+ status="applied",
+ strategy_id=strategy_id,
+ base_version=int(current["version"]),
+ new_version=int(previous["version"]),
+ diff_json=json.dumps(
+ _build_diff(_json_loads(current["config_json"], {}), _json_loads(previous["config_json"], {})),
+ ensure_ascii=False,
+ ),
+ reason=f"回滚 {strategy_id} 到 v{previous['version']}",
+ applied_at=datetime.now(),
+ )
+ )
+ await db.commit()
+ row = await _load_active_strategy_row(strategy_id)
+ return _format_strategy_row(row)
+
+
+async def ensure_default_configs() -> None:
+ """首次启动时把代码默认策略和默认 Prompt 种子写入数据库。"""
+ from app.llm.strategy_selector import get_strategy_profile_by_id
+ from app.llm import prompts
+
+ strategy_ids = ["breakout_attack", "pullback_rotation", "launch_probe", "defensive_watch"]
+ async with get_db() as db:
+ for strategy_id in strategy_ids:
+ count = (await db.execute(
+ text("SELECT COUNT(*) FROM strategy_configs WHERE strategy_id = :sid"),
+ {"sid": strategy_id},
+ )).scalar() or 0
+ if count:
+ continue
+ profile = get_strategy_profile_by_id(strategy_id)
+ await db.execute(
+ tables.strategy_configs_table.insert().values(
+ strategy_id=strategy_id,
+ version=1,
+ config_json=json.dumps(profile_to_config(profile), ensure_ascii=False),
+ is_active=True,
+ source="default_seed",
+ change_reason="初始化默认策略配置",
+ )
+ )
+
+ prompt_defaults = {
+ "stock_prefilter": getattr(prompts, "STOCK_PREFILTER_PROMPT", ""),
+ "single_stock_analysis": getattr(prompts, "SINGLE_STOCK_ANALYSIS_PROMPT", ""),
+ "strategy_iteration": getattr(prompts, "STRATEGY_ITERATION_PROMPT", ""),
+ }
+ for prompt_key, content in prompt_defaults.items():
+ if not content:
+ continue
+ count = (await db.execute(
+ text("SELECT COUNT(*) FROM prompt_configs WHERE prompt_key = :key"),
+ {"key": prompt_key},
+ )).scalar() or 0
+ if count:
+ continue
+ await db.execute(
+ tables.prompt_configs_table.insert().values(
+ prompt_key=prompt_key,
+ version=1,
+ content=content,
+ is_active=True,
+ source="default_seed",
+ change_reason="初始化默认 Prompt 配置",
+ )
+ )
+ await db.commit()
+
+
+async def maybe_auto_apply_review_adjustment(report: dict) -> dict | None:
+ """根据复盘报告做小幅自动配置调整。
+
+ 大幅结构调整仍只进入报告建议,不自动改配置。
+ """
+ sample_size = int(report.get("sample_size") or 0)
+ if sample_size < 10:
+ return None
+ if await _has_auto_change_today():
+ return None
+
+ for suggestion in report.get("adjustment_suggestions", []) or []:
+ strategy_id = suggestion.get("target", "")
+ if strategy_id not in {"breakout_attack", "pullback_rotation", "launch_probe", "defensive_watch"}:
+ continue
+ active = await _load_active_strategy_row(strategy_id)
+ if not active:
+ continue
+ config = _json_loads(active["config_json"], {})
+ changed = _apply_small_adjustment(config, suggestion.get("action", ""))
+ if not changed:
+ continue
+ evidence = {
+ "sample_size": sample_size,
+ "summary": report.get("summary", ""),
+ "suggestion": suggestion,
+ }
+ return await create_active_strategy_config(
+ strategy_id,
+ config,
+ source="auto_review",
+ reason=suggestion.get("reason", "复盘触发小幅自动配置调整"),
+ evidence=evidence,
+ change_type="auto_applied",
+ )
+ return None
+
+
+async def _load_active_strategy_row(strategy_id: str, db=None):
+ own_session = db is None
+ if own_session:
+ async with get_db() as session:
+ return await _load_active_strategy_row(strategy_id, db=session)
+ result = await db.execute(
+ text(
+ "SELECT * FROM strategy_configs "
+ "WHERE strategy_id = :sid AND is_active = 1 "
+ "ORDER BY version DESC LIMIT 1"
+ ),
+ {"sid": strategy_id},
+ )
+ row = result.fetchone()
+ return row._mapping if row else None
+
+
+async def _has_auto_change_today() -> bool:
+ async with get_db() as db:
+ count = (await db.execute(
+ text(
+ "SELECT COUNT(*) FROM strategy_config_changes "
+ "WHERE change_type = 'auto_applied' "
+ "AND date(created_at) = date('now', 'localtime')"
+ )
+ )).scalar() or 0
+ return count > 0
+
+
+def _apply_small_adjustment(config: dict[str, Any], action: str) -> bool:
+ if action == "tighten":
+ config["buy_threshold"] = min(float(config.get("buy_threshold", 60)) + 1, 80)
+ config["max_position_pct"] = max(float(config.get("max_position_pct", 10)) - 5, 0)
+ config["actionable_limit"] = max(int(config.get("actionable_limit", 1)) - 1, 0)
+ return True
+ if action == "promote":
+ config["buy_threshold"] = max(float(config.get("buy_threshold", 60)) - 1, float(config.get("min_score", 0)))
+ config["watch_limit"] = min(int(config.get("watch_limit", 3)) + 1, 8)
+ return True
+ if action == "reduce":
+ config["buy_threshold"] = min(float(config.get("buy_threshold", 60)) + 1, 80)
+ config["watch_limit"] = max(int(config.get("watch_limit", 3)) - 1, 1)
+ return True
+ return False
+
+
+def _build_diff(before: dict[str, Any], after: dict[str, Any]) -> dict[str, dict[str, Any]]:
+ diff: dict[str, dict[str, Any]] = {}
+ for key in sorted(set(before) | set(after)):
+ if before.get(key) != after.get(key):
+ diff[key] = {"from": before.get(key), "to": after.get(key)}
+ return diff
+
+
+def _json_loads(value: str | None, default):
+ try:
+ return json.loads(value or "")
+ except Exception:
+ return default
+
+
+def _format_strategy_row(row) -> dict:
+ if not row:
+ return {}
+ return {
+ "id": row["id"],
+ "strategy_id": row["strategy_id"],
+ "version": row["version"],
+ "config": _json_loads(row["config_json"], {}),
+ "is_active": bool(row["is_active"]),
+ "source": row["source"] or "",
+ "change_reason": row["change_reason"] or "",
+ "evidence": _json_loads(row["evidence_json"], {}),
+ "effective_from": str(row["effective_from"] or ""),
+ "created_at": str(row["created_at"] or ""),
+ }
+
+
+def _format_prompt_row(row) -> dict:
+ return {
+ "id": row["id"],
+ "prompt_key": row["prompt_key"],
+ "version": row["version"],
+ "content": row["content"],
+ "is_active": bool(row["is_active"]),
+ "source": row["source"] or "",
+ "change_reason": row["change_reason"] or "",
+ "evidence": _json_loads(row["evidence_json"], {}),
+ "created_at": str(row["created_at"] or ""),
+ }
+
+
+def _format_change_row(row) -> dict:
+ return {
+ "id": row["id"],
+ "change_type": row["change_type"],
+ "status": row["status"],
+ "strategy_id": row["strategy_id"] or "",
+ "prompt_key": row["prompt_key"] or "",
+ "base_version": row["base_version"] or 0,
+ "new_version": row["new_version"] or 0,
+ "diff": _json_loads(row["diff_json"], {}),
+ "evidence": _json_loads(row["evidence_json"], {}),
+ "reason": row["reason"] or "",
+ "created_at": str(row["created_at"] or ""),
+ "applied_at": str(row["applied_at"] or ""),
+ }
diff --git a/backend/app/llm/strategy_iteration.py b/backend/app/llm/strategy_iteration.py
index 8b4770b1..0e86f22c 100644
--- a/backend/app/llm/strategy_iteration.py
+++ b/backend/app/llm/strategy_iteration.py
@@ -14,10 +14,25 @@ from app.config import settings
logger = logging.getLogger(__name__)
-async def build_strategy_iteration_report(limit: int = 50, include_llm: bool = False) -> dict:
+async def build_strategy_iteration_report(
+ limit: int = 50,
+ include_llm: bool = False,
+ apply_auto_config: bool = False,
+) -> dict:
rows = await _load_recent_tracking(limit)
rule_report = _build_rule_report(rows)
+ auto_change = None
+ if apply_auto_config:
+ from app.llm.strategy_config import maybe_auto_apply_review_adjustment
+ try:
+ auto_change = await maybe_auto_apply_review_adjustment(rule_report)
+ except Exception as e:
+ logger.warning(f"自动策略配置调整失败: {e}")
+ if auto_change:
+ rule_report["auto_config_change"] = auto_change
+ rule_report["generated_by"] = "rules+auto_config"
+
if include_llm and settings.deepseek_api_key and rows:
ai_text = await _generate_ai_iteration(rule_report, rows)
if ai_text:
@@ -43,6 +58,8 @@ async def _load_recent_tracking(limit: int) -> list[dict]:
r_action_plan = _column_or_default(rec_columns, "action_plan", "'观察'", "r")
r_position_score = _column_or_default(rec_columns, "position_score", "50", "r")
r_lifecycle_status = _column_or_default(rec_columns, "lifecycle_status", "'candidate'", "r")
+ r_capital_score = _column_or_default(rec_columns, "capital_score", "0", "r")
+ r_recall_tags = _column_or_default(rec_columns, "recall_tags", "'[]'", "r")
t_max_return = _column_or_default(tracking_columns, "max_return_pct", "t.pct_from_entry", "t")
t_max_drawdown = _column_or_default(tracking_columns, "max_drawdown_pct", "t.pct_from_entry", "t")
t_days_since = _column_or_default(tracking_columns, "days_since_recommendation", "0", "t")
@@ -53,7 +70,9 @@ async def _load_recent_tracking(limit: int) -> list[dict]:
text(
"SELECT r.id, r.ts_code, r.name, r.sector, r.strategy, r.entry_signal_type, "
f"{r_action_plan} AS action_plan, r.score, r.market_temp_score, r.sector_score, "
- f"{r_position_score} AS position_score, {r_lifecycle_status} AS lifecycle_status, r.created_at, "
+ f"{r_capital_score} AS capital_score, {r_position_score} AS position_score, "
+ f"{r_lifecycle_status} AS lifecycle_status, {r_recall_tags} AS recall_tags, "
+ "r.entry_price, r.target_price, r.stop_loss, r.created_at, "
f"t.pct_from_entry, {t_max_return} AS max_return_pct, {t_max_drawdown} AS max_drawdown_pct, "
f"{t_days_since} AS days_since_recommendation, {t_close_reason} AS close_reason, "
f"{t_review_note} AS review_note, t.track_date "
@@ -94,7 +113,12 @@ def _build_rule_report(rows: list[dict]) -> dict:
"strategy_stats": [],
"signal_stats": [],
"failure_patterns": ["样本不足,先积累推荐生命周期数据。"],
+ "review_windows": [],
+ "failure_cases": [],
+ "success_patterns": [],
"adjustment_suggestions": [],
+ "agent_patch_prompts": [],
+ "auto_config_change": None,
"ai_analysis": "",
"generated_by": "rules",
}
@@ -104,6 +128,16 @@ def _build_rule_report(rows: list[dict]) -> dict:
signal_stats = _group_stats(tracked_rows, "entry_signal_type")
failure_patterns = _detect_failure_patterns(tracked_rows)
suggestions = _build_adjustment_suggestions(strategy_stats, signal_stats, failure_patterns, len(tracked_rows))
+ review_windows = _build_review_windows(tracked_rows)
+ failure_cases = _build_failure_cases(tracked_rows)
+ success_patterns = _build_success_patterns(tracked_rows)
+ patch_prompts = _build_agent_patch_prompts(
+ strategy_stats=strategy_stats,
+ signal_stats=signal_stats,
+ failure_patterns=failure_patterns,
+ failure_cases=failure_cases,
+ sample_size=len(tracked_rows),
+ )
wins = sum(1 for r in tracked_rows if (r.get("pct_from_entry") or 0) > 0)
avg_return = _avg([r.get("pct_from_entry") for r in tracked_rows])
@@ -120,7 +154,12 @@ def _build_rule_report(rows: list[dict]) -> dict:
"strategy_stats": strategy_stats,
"signal_stats": signal_stats,
"failure_patterns": failure_patterns,
+ "review_windows": review_windows,
+ "failure_cases": failure_cases,
+ "success_patterns": success_patterns,
"adjustment_suggestions": suggestions,
+ "agent_patch_prompts": patch_prompts,
+ "auto_config_change": None,
"ai_analysis": "",
"generated_by": "rules",
}
@@ -246,6 +285,192 @@ def _build_adjustment_suggestions(
return suggestions[:6]
+def _build_review_windows(rows: list[dict]) -> list[dict]:
+ windows = []
+ for days in [3, 5, 10]:
+ items = [
+ r for r in rows
+ if int(r.get("days_since_recommendation") or 0) >= days
+ ]
+ if not items:
+ windows.append({
+ "window_days": days,
+ "count": 0,
+ "win_rate": 0,
+ "avg_return": 0,
+ "hit_target_rate": 0,
+ "hit_stop_rate": 0,
+ "avg_max_return": 0,
+ "avg_max_drawdown": 0,
+ })
+ continue
+ wins = sum(1 for r in items if (r.get("pct_from_entry") or 0) > 0)
+ hit_target = sum(1 for r in items if r.get("close_reason") == "hit_target")
+ hit_stop = sum(1 for r in items if r.get("close_reason") == "hit_stop_loss")
+ count = len(items)
+ windows.append({
+ "window_days": days,
+ "count": count,
+ "win_rate": round(wins / count * 100, 1),
+ "avg_return": _avg([r.get("pct_from_entry") for r in items]),
+ "hit_target_rate": round(hit_target / count * 100, 1),
+ "hit_stop_rate": round(hit_stop / count * 100, 1),
+ "avg_max_return": _avg([r.get("max_return_pct") for r in items]),
+ "avg_max_drawdown": _avg([r.get("max_drawdown_pct") for r in items]),
+ })
+ return windows
+
+
+def _build_failure_cases(rows: list[dict]) -> list[dict]:
+ failures = [
+ r for r in rows
+ if (r.get("pct_from_entry") or 0) < 0
+ or (r.get("close_reason") in {"hit_stop_loss", "review_expired_loss", "review_expired_flat"})
+ or (r.get("max_drawdown_pct") or 0) < -5
+ ]
+ failures.sort(key=lambda r: ((r.get("pct_from_entry") or 0), (r.get("max_drawdown_pct") or 0)))
+ return [_case_summary(r) for r in failures[:8]]
+
+
+def _build_success_patterns(rows: list[dict]) -> list[dict]:
+ successes = [
+ r for r in rows
+ if r.get("close_reason") == "hit_target"
+ or (r.get("max_return_pct") or 0) >= 3
+ or (r.get("pct_from_entry") or 0) > 2
+ ]
+ successes.sort(key=lambda r: (r.get("max_return_pct") or 0), reverse=True)
+ return [_case_summary(r) for r in successes[:8]]
+
+
+def _case_summary(row: dict) -> dict:
+ recall_tags = row.get("recall_tags") or "[]"
+ try:
+ tags = json.loads(recall_tags) if isinstance(recall_tags, str) else recall_tags
+ except Exception:
+ tags = []
+ return {
+ "ts_code": row.get("ts_code"),
+ "name": row.get("name"),
+ "sector": row.get("sector") or "",
+ "strategy": row.get("strategy") or "unknown",
+ "entry_signal_type": row.get("entry_signal_type") or "unknown",
+ "action_plan": row.get("action_plan") or "观察",
+ "score": row.get("score") or 0,
+ "market_temp_score": row.get("market_temp_score") or 0,
+ "sector_score": row.get("sector_score") or 0,
+ "capital_score": row.get("capital_score") or 0,
+ "position_score": row.get("position_score") or 50,
+ "pct_from_entry": row.get("pct_from_entry") or 0,
+ "max_return_pct": row.get("max_return_pct") or 0,
+ "max_drawdown_pct": row.get("max_drawdown_pct") or 0,
+ "days_since_recommendation": row.get("days_since_recommendation") or 0,
+ "close_reason": row.get("close_reason") or "",
+ "review_note": row.get("review_note") or "",
+ "recall_tags": tags,
+ }
+
+
+def _build_agent_patch_prompts(
+ strategy_stats: list[dict],
+ signal_stats: list[dict],
+ failure_patterns: list[str],
+ failure_cases: list[dict],
+ sample_size: int,
+) -> list[dict]:
+ if sample_size < 10:
+ return []
+
+ prompts = []
+ weak_strategy = next(
+ (
+ s for s in strategy_stats
+ if s["count"] >= 3 and s["win_rate"] < 40 and s["avg_return"] < 0
+ ),
+ None,
+ )
+ if weak_strategy:
+ prompts.append(_patch_prompt(
+ title=f"收紧 {weak_strategy['name']} 策略配置",
+ severity="high",
+ evidence=f"样本{weak_strategy['count']}条,胜率{weak_strategy['win_rate']}%,平均收益{weak_strategy['avg_return']}%。",
+ target_files=["backend/app/llm/strategy_config.py", "backend/app/llm/strategy_selector.py"],
+ prompt=(
+ f"请基于策略复盘收紧 {weak_strategy['name']}。优先通过策略配置版本调整完成,不要改无关代码。"
+ f"证据:{weak_strategy['count']}条样本,胜率{weak_strategy['win_rate']}%,平均收益{weak_strategy['avg_return']}%,"
+ f"平均最大回撤{weak_strategy['avg_max_drawdown']}%。"
+ "请提高 buy_threshold 1-2 分,降低 actionable_limit 或 max_position_pct,并保留回滚记录。"
+ ),
+ ))
+
+ weak_signal = next(
+ (
+ s for s in signal_stats
+ if s["count"] >= 3 and s["avg_max_drawdown"] < -5
+ ),
+ None,
+ )
+ if weak_signal:
+ prompts.append(_patch_prompt(
+ title=f"降低 {weak_signal['name']} 信号风险暴露",
+ severity="medium",
+ evidence=f"样本{weak_signal['count']}条,平均最大回撤{weak_signal['avg_max_drawdown']}%。",
+ target_files=["backend/app/engine/screener.py", "backend/app/analysis/breakout_signals.py"],
+ prompt=(
+ f"请基于复盘结果降低 {weak_signal['name']} 信号的风险暴露。"
+ f"证据:样本{weak_signal['count']}条,平均最大回撤{weak_signal['avg_max_drawdown']}%,"
+ f"命中止损{weak_signal['hit_stop']}次。"
+ "请检查该信号的入场质量、位置过滤和止损设置,给出最小代码补丁,并保持其他信号行为不变。"
+ ),
+ ))
+
+ if any("弱势市场" in p for p in failure_patterns):
+ prompts.append(_patch_prompt(
+ title="强化弱势市场防守配置",
+ severity="high",
+ evidence="复盘显示弱势市场亏损样本集中。",
+ target_files=["backend/app/llm/strategy_config.py", "backend/app/llm/strategy_selector.py"],
+ prompt=(
+ "请强化弱势市场防守配置。证据:复盘显示市场温度低于45时亏损样本集中。"
+ "优先把低温环境下的 allow_trading、actionable_limit、buy_threshold 做成可配置护栏,"
+ "小幅收紧可自动生效,大幅禁用策略需生成待确认变更。"
+ ),
+ ))
+
+ if failure_cases and not prompts:
+ worst = failure_cases[0]
+ prompts.append(_patch_prompt(
+ title="复查推荐失效样本的共同过滤条件",
+ severity="medium",
+ evidence=f"最差样本 {worst['name']} 收益{worst['pct_from_entry']}%,最大回撤{worst['max_drawdown_pct']}%。",
+ target_files=["backend/app/engine/screener.py", "backend/app/llm/strategy_iteration.py"],
+ prompt=(
+ "请复查最近推荐失效样本的共同过滤条件,优先寻找可配置化的收紧项。"
+ f"最差样本:{worst['name']}({worst['ts_code']}),策略{worst['strategy']},"
+ f"信号{worst['entry_signal_type']},收益{worst['pct_from_entry']}%,"
+ f"最大回撤{worst['max_drawdown_pct']}%。"
+ "请不要凭单一样本大改策略,必须保留样本数门槛。"
+ ),
+ ))
+
+ return prompts[:4]
+
+
+def _patch_prompt(title: str, severity: str, evidence: str, target_files: list[str], prompt: str) -> dict:
+ return {
+ "title": title,
+ "severity": severity,
+ "evidence": evidence,
+ "target_files": target_files,
+ "prompt": prompt,
+ "acceptance_criteria": [
+ "python3 -m compileall backend/app 通过",
+ "策略配置变更有版本记录且可回滚",
+ "历史推荐和跟踪数据读取不受影响",
+ ],
+ }
+
+
def _derive_feedback_controls(report: dict) -> dict:
suggestions = report.get("adjustment_suggestions", []) or []
sample_size = int(report.get("sample_size") or 0)
@@ -309,6 +534,8 @@ def _derive_feedback_controls(report: dict) -> dict:
async def _generate_ai_iteration(rule_report: dict, rows: list[dict]) -> str:
from app.llm.client import chat_completion
+ from app.llm.prompts import STRATEGY_ITERATION_PROMPT
+ from app.llm.strategy_config import get_prompt_content
sample = [
{
@@ -325,12 +552,8 @@ async def _generate_ai_iteration(rule_report: dict, rows: list[dict]) -> str:
for r in rows[:20]
]
- user_msg = f"""请基于以下推荐复盘数据,输出策略迭代建议。
-要求:
-1. 明确指出最该收紧、保留、加强的策略或信号;
-2. 只提出可执行调整建议,不要泛泛而谈;
-3. 不要承诺收益;
-4. 180字以内。
+ prompt = await get_prompt_content("strategy_iteration", STRATEGY_ITERATION_PROMPT)
+ user_msg = f"""{prompt}
规则复盘:
{json.dumps(rule_report, ensure_ascii=False)}
diff --git a/backend/app/llm/strategy_selector.py b/backend/app/llm/strategy_selector.py
index c305448f..02158a6a 100644
--- a/backend/app/llm/strategy_selector.py
+++ b/backend/app/llm/strategy_selector.py
@@ -122,13 +122,15 @@ async def select_strategy_profile(
hot_sectors: list[SectorInfo],
intraday: bool,
) -> StrategyProfile:
+ from app.llm.strategy_config import load_active_strategy_profile
+
profile = _select_rule_profile(market_temp, hot_sectors, intraday)
- profile = await _apply_strategy_feedback(profile)
+ profile = await load_active_strategy_profile(profile)
if settings.deepseek_api_key:
llm_profile = await _select_llm_profile(market_temp, hot_sectors, intraday, profile)
if llm_profile:
- profile = llm_profile
+ profile = await load_active_strategy_profile(llm_profile)
return profile
diff --git a/backend/app/main.py b/backend/app/main.py
index 6b71e061..524231be 100644
--- a/backend/app/main.py
+++ b/backend/app/main.py
@@ -99,6 +99,9 @@ async def lifespan(app: FastAPI):
await init_db()
logger.info("数据库初始化完成")
await ensure_admin_exists()
+ from app.llm.strategy_config import ensure_default_configs
+ await ensure_default_configs()
+ logger.info("策略配置中心初始化完成")
start_scheduler()
logger.info("调度器已启动")
yield
diff --git a/frontend/src/app/(auth)/strategy/page.tsx b/frontend/src/app/(auth)/strategy/page.tsx
index 4b28905a..1c4b58bb 100644
--- a/frontend/src/app/(auth)/strategy/page.tsx
+++ b/frontend/src/app/(auth)/strategy/page.tsx
@@ -2,10 +2,14 @@
import { useCallback, useEffect, useMemo, useState } from "react";
import { useRouter } from "next/navigation";
-import { fetchAPI } from "@/lib/api";
+import { fetchAPI, postAPI } from "@/lib/api";
import { useAuth } from "@/hooks/use-auth";
import type {
+ AgentPatchPrompt,
PerformanceStats,
+ StrategyConfigCenter,
+ StrategyConfigChange,
+ StrategyConfigRecord,
StrategyAdjustment,
StrategyIterationReport,
StrategyStat,
@@ -24,21 +28,56 @@ export default function StrategyPage() {
const router = useRouter();
const [iteration, setIteration] = useState