320 lines
13 KiB
Python
320 lines
13 KiB
Python
"""Review and strategy iteration-facing DB API."""
|
|
|
|
import json
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
|
|
from app.db.strategy_rule_queries import (
|
|
backfill_strategy_failure_patterns,
|
|
dry_run_strategy_candidate_performance,
|
|
generate_candidates_from_review_history,
|
|
get_strategy_failure_patterns,
|
|
get_strategy_iteration_dashboard,
|
|
get_strategy_rule_candidates,
|
|
refresh_strategy_candidate_performance,
|
|
loads_json_field,
|
|
)
|
|
from app.db.strategy_insights import get_strategy_insights
|
|
from app.db.schema import get_conn
|
|
|
|
|
|
def log_strategy_iteration(
    run_date=None,
    trigger_source="daily_review",
    title="",
    summary="",
    findings=None,
    problems=None,
    actions=None,
    changed_rules=None,
    metrics=None,
    related_symbols=None,
    config_diff=None,
    effect_summary=None,
    pollution_summary=None,
    strategy_version="",
    version_change_summary="",
    success_analysis=None,
    failure_analysis=None,
    candidate_rules=None,
    release_decision="",
    release_reason="",
    confidence_level="",
    promotion_state="research_only",
    conn_provider=None,
):
    """Record one strategy review/iteration log entry.

    Inserts a single row into ``strategy_iteration_log`` and commits it.

    Args:
        run_date: Value stored in ``run_date``; when falsy, the first ten
            characters of the current local ISO timestamp are used.
        trigger_source: Stored as-is; a falsy value becomes ``"daily_review"``.
        title: Stored as-is; a falsy value becomes a placeholder title.
        summary: Stored as-is; a falsy value becomes ``""``.
        findings, problems, actions, changed_rules, related_symbols,
        candidate_rules: Serialised to JSON; a falsy value is stored as ``[]``.
        metrics, config_diff, effect_summary, pollution_summary,
        success_analysis, failure_analysis: Serialised to JSON; a falsy value
            is stored as ``{}``.
        strategy_version, version_change_summary, release_decision,
        release_reason, confidence_level: Stored stripped; a falsy value
            becomes ``""``.
        promotion_state: Stored stripped; a falsy value becomes
            ``"research_only"``.
        conn_provider: Zero-argument callable returning a connection object
            exposing ``execute``, ``commit`` and ``close``. Defaults to
            ``get_conn``.

    Returns:
        None.
    """

    def _dump(value, empty):
        # default=str stringifies anything json cannot encode natively
        # instead of raising.
        return json.dumps(value or empty, ensure_ascii=False, default=str)

    def _text(value, fallback=""):
        return (value or fallback).strip()

    now = datetime.now().isoformat()
    # Build every parameter before touching the database so a serialisation
    # error never leaves a connection open.
    params = (
        run_date or now[:10],
        now,
        trigger_source or "daily_review",
        title or "未命名迭代",
        summary or "",
        _dump(findings, []),
        _dump(problems, []),
        _dump(actions, []),
        _dump(changed_rules, []),
        _dump(metrics, {}),
        _dump(related_symbols, []),
        _dump(config_diff, {}),
        _dump(effect_summary, {}),
        _dump(pollution_summary, {}),
        _text(strategy_version),
        _text(version_change_summary),
        _dump(success_analysis, {}),
        _dump(failure_analysis, {}),
        _dump(candidate_rules, []),
        _text(release_decision),
        _text(release_reason),
        _text(confidence_level),
        _text(promotion_state, "research_only"),
    )

    conn = (conn_provider or get_conn)()
    try:
        conn.execute(
            """
            INSERT INTO strategy_iteration_log (
                run_date, created_at, trigger_source, title, summary,
                findings_json, problems_json, actions_json, changed_rules_json,
                metrics_json, related_symbols_json, config_diff_json, effect_summary_json,
                pollution_summary_json,
                strategy_version, version_change_summary,
                success_analysis_json, failure_analysis_json, candidate_rules_json,
                release_decision, release_reason, confidence_level, promotion_state
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            params,
        )
        conn.commit()
    finally:
        # Close even when execute/commit raises; previously the connection
        # leaked on any database error.
        conn.close()
|
|
|
|
|
|
def get_strategy_iteration_logs(limit=30, conn_provider=None, json_loader=None):
    """Return the newest strategy iteration log rows with decoded fields.

    Rows are read from ``strategy_iteration_log`` ordered by ``created_at``
    and ``id`` descending. Each row is converted to a dict; every ``*_json``
    column is additionally decoded into a sibling key without the suffix, and
    the free-text status columns are normalised to stripped strings.

    Args:
        limit: Maximum number of rows to fetch (passed to ``LIMIT``).
        conn_provider: Zero-argument callable returning a connection object
            exposing ``execute`` and ``close``. Defaults to ``get_conn``.
        json_loader: Callable ``(raw_value, default)`` used to decode the JSON
            columns. Defaults to ``loads_json_field``.

    Returns:
        A list of dicts, one per row, in query order.
    """
    loader = json_loader or loads_json_field

    conn = (conn_provider or get_conn)()
    try:
        rows = conn.execute(
            """
            SELECT * FROM strategy_iteration_log
            ORDER BY created_at DESC, id DESC
            LIMIT %s
            """,
            (limit,),
        ).fetchall()
    finally:
        # Close even when the query fails; previously the connection leaked
        # on any database error.
        conn.close()

    # (decoded key, factory for the fallback value). Factories give every row
    # its own fresh list/dict instead of one shared mutable default.
    json_fields = (
        ("findings", list),
        ("problems", list),
        ("actions", list),
        ("changed_rules", list),
        ("metrics", dict),
        ("related_symbols", list),
        ("config_diff", dict),
        ("effect_summary", dict),
        ("pollution_summary", dict),
        ("success_analysis", dict),
        ("failure_analysis", dict),
        ("candidate_rules", list),
    )
    # (column, fallback used when the stored value is falsy).
    text_fields = (
        ("release_decision", ""),
        ("release_reason", ""),
        ("confidence_level", ""),
        ("promotion_state", "research_only"),
        ("strategy_version", ""),
        ("version_change_summary", ""),
    )

    result = []
    for row in rows:
        item = dict(row)
        for key, make_default in json_fields:
            item[key] = loader(item.get(f"{key}_json"), make_default())
        for key, fallback in text_fields:
            item[key] = (item.get(key) or fallback).strip()
        result.append(item)
    return result
|
|
|
|
|
|
def get_strategy_iteration_summary(days=30, conn_provider=None, json_loader=None):
    """Summarise recent iteration logs and per-version recommendation outcomes.

    Reads ``strategy_iteration_log`` rows whose ``created_at`` is within the
    last ``days`` days, plus every ``recommendation`` row that carries a
    non-blank ``strategy_version``, and aggregates them.

    Args:
        days: Size of the look-back window for log rows. A falsy value
            (``None``, ``0``) falls back to 30. The recommendation query is
            not limited by this window.
        conn_provider: Zero-argument callable returning a connection object
            exposing ``execute`` and ``close``. Defaults to ``get_conn``.
        json_loader: Callable ``(raw_value, default)`` used to decode the JSON
            columns. Defaults to ``loads_json_field``.

    Returns:
        A dict with the keys ``days``, ``total_logs``, ``unique_run_days``,
        ``trigger_counts``, ``change_rule_count``, ``config_change_count``,
        ``recent_titles`` (at most 8), ``top_problems`` (at most 5),
        ``version_stats``, ``version_changelog`` (at most 12) and
        ``effect_overview``.
    """
    loader = json_loader or loads_json_field
    # Computed before the connection is opened so an invalid ``days`` value
    # cannot leave a connection behind.
    cutoff = (datetime.now() - timedelta(days=float(days or 30))).isoformat()

    conn = (conn_provider or get_conn)()
    try:
        rows = conn.execute(
            """
            SELECT * FROM strategy_iteration_log
            WHERE created_at >= %s
            ORDER BY created_at DESC, id DESC
            """,
            (cutoff,),
        ).fetchall()
        rec_rows = conn.execute(
            """
            SELECT strategy_version, status, pnl_pct, max_pnl_pct, max_drawdown_pct
            FROM recommendation
            WHERE strategy_version IS NOT NULL AND trim(strategy_version) != ''
            """
        ).fetchall()
    finally:
        # Close even when a query fails; previously the connection leaked
        # on any database error.
        conn.close()

    def classify_recommendation_result(row):
        """Map one recommendation row to 'success', 'failed' or 'pending'."""
        status = row.get("status") or ""
        if status in ("hit_tp1", "hit_tp2"):
            return "success"
        if status == "stopped_out":
            return "failed"
        if status in ("expired", "active"):
            # Expired and still-active recommendations share the same
            # thresholds on peak gain, current PnL and drawdown.
            pnl_pct = row.get("pnl_pct") or 0
            max_pnl_pct = row.get("max_pnl_pct") or 0
            max_drawdown_pct = row.get("max_drawdown_pct") or 0
            if max_pnl_pct >= 5:
                return "success"
            if pnl_pct <= -3 or max_drawdown_pct <= -5:
                return "failed"
        return "pending"

    def _version_sort_key(version):
        """Order versions by the integers embedded in the version string."""
        nums = [int(x) for x in re.findall(r"\d+", str(version or ""))]
        return tuple(nums) if nums else (0,)

    def _config_change_count(diff):
        """Count the entries under 'changed', 'added' and 'removed'."""
        return sum(len(diff.get(key) or []) for key in ("changed", "added", "removed"))

    # ---- per-version recommendation outcomes -------------------------------
    outcome_counter_key = {
        "success": "success_count",
        "failed": "failed_count",
        "pending": "pending_count",
    }
    version_stats_map = {}
    for row in rec_rows:
        item = dict(row)
        strategy_version = (item.get("strategy_version") or "").strip()
        if not strategy_version:
            continue
        bucket = version_stats_map.setdefault(
            strategy_version,
            {
                "strategy_version": strategy_version,
                "recommendation_count": 0,
                "success_count": 0,
                "failed_count": 0,
                "pending_count": 0,
                "pnl_values": [],
            },
        )
        bucket["recommendation_count"] += 1
        bucket["pnl_values"].append(float(item.get("pnl_pct") or 0))
        bucket[outcome_counter_key[classify_recommendation_result(item)]] += 1

    # ---- iteration-log aggregates -------------------------------------------
    trigger_counts = {}
    changed_rule_count = 0
    unique_days = set()
    titles = []
    problem_keywords = {}
    total_config_change_count = 0
    hit_rates = []
    avg_pnls = []
    version_changelog = []

    for row in rows:
        item = dict(row)
        # Only the columns that feed the aggregates below are decoded.
        problems = loader(item.get("problems_json"), []) or []
        changed_rules = loader(item.get("changed_rules_json"), []) or []
        diff = loader(item.get("config_diff_json"), {})
        effect = loader(item.get("effect_summary_json"), {})
        # A malformed payload (e.g. a JSON list where an object is expected)
        # is treated as empty instead of raising AttributeError on ``.get``.
        if not isinstance(diff, dict):
            diff = {}
        if not isinstance(effect, dict):
            effect = {}
        strategy_version = (item.get("strategy_version") or "").strip()

        unique_days.add(item.get("run_date") or (item.get("created_at") or "")[:10])
        trigger = item.get("trigger_source") or "unknown"
        trigger_counts[trigger] = trigger_counts.get(trigger, 0) + 1
        changed_rule_count += len(changed_rules)
        if item.get("title"):
            titles.append(item["title"])

        for problem in problems:
            key = str(problem).strip()
            if key:
                problem_keywords[key] = problem_keywords.get(key, 0) + 1

        config_changes = _config_change_count(diff)
        total_config_change_count += config_changes

        if isinstance(effect.get("hit_rate_pct"), (int, float)):
            hit_rates.append(effect.get("hit_rate_pct"))
        if isinstance(effect.get("avg_pnl"), (int, float)):
            avg_pnls.append(effect.get("avg_pnl"))

        if strategy_version:
            version_changelog.append({
                "strategy_version": strategy_version,
                "created_at": item.get("created_at"),
                "run_date": item.get("run_date"),
                "title": item.get("title") or "",
                "summary": item.get("summary") or "",
                "version_change_summary": (item.get("version_change_summary") or "").strip(),
                "changed_rules_count": len(changed_rules),
                "config_change_count": config_changes,
            })

    # Most frequent first; ties broken alphabetically.
    top_problems = sorted(problem_keywords.items(), key=lambda x: (-x[1], x[0]))[:5]

    version_stats = []
    for strategy_version, bucket in sorted(
        version_stats_map.items(),
        key=lambda kv: _version_sort_key(kv[0]),
        reverse=True,
    ):
        # Pending recommendations are excluded from the success-rate base.
        resolved = bucket["success_count"] + bucket["failed_count"]
        pnl_values = bucket["pnl_values"]
        version_stats.append({
            "strategy_version": strategy_version,
            "recommendation_count": bucket["recommendation_count"],
            "success_count": bucket["success_count"],
            "failed_count": bucket["failed_count"],
            "pending_count": bucket["pending_count"],
            "success_rate_pct": round(bucket["success_count"] / resolved * 100, 1) if resolved else 0,
            "avg_pnl_pct": round(sum(pnl_values) / len(pnl_values), 2) if pnl_values else 0,
        })

    total_logs = len(rows)
    return {
        "days": days,
        "total_logs": total_logs,
        "unique_run_days": len(unique_days),
        "trigger_counts": trigger_counts,
        "change_rule_count": changed_rule_count,
        "config_change_count": total_config_change_count,
        "recent_titles": titles[:8],
        "top_problems": [{"problem": k, "count": v} for k, v in top_problems],
        "version_stats": version_stats,
        "version_changelog": version_changelog[:12],
        "effect_overview": {
            "avg_hit_rate_pct": round(sum(hit_rates) / len(hit_rates), 1) if hit_rates else 0,
            "avg_pnl": round(sum(avg_pnls) / len(avg_pnls), 2) if avg_pnls else 0,
            "samples": total_logs,
        },
    }
|
|
|
|
|
|
# Names exported by ``from <this module> import *``: the three iteration-log
# functions defined in this file plus the query helpers imported at the top of
# the file from the strategy_rule_queries and strategy_insights modules.
# ``loads_json_field`` is imported as well but is not part of the exported list.
__all__ = [
    "backfill_strategy_failure_patterns",
    "dry_run_strategy_candidate_performance",
    "generate_candidates_from_review_history",
    "get_strategy_failure_patterns",
    "get_strategy_insights",
    "get_strategy_iteration_dashboard",
    "get_strategy_iteration_logs",
    "get_strategy_iteration_summary",
    "get_strategy_rule_candidates",
    "log_strategy_iteration",
    "refresh_strategy_candidate_performance",
]
|