188 lines
8.2 KiB
Python
188 lines
8.2 KiB
Python
"""Research review aggregation.
|
|
|
|
This module joins recommendation tracking results with the research layer
(stock research notes and risk events) so we can review which themes, chain
nodes, entry signals and risk types are actually working.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
from typing import Any
|
|
|
|
from sqlalchemy import text
|
|
|
|
from app.db.database import get_db
|
|
|
|
|
|
async def build_research_review(days: int = 60) -> dict:
    """Build the research-review report for recently created recommendations.

    Loads recommendations created within the last ``days`` days, then
    aggregates their tracking outcomes by theme, chain node, entry signal
    type and risk type, and condenses everything into a summary.

    Args:
        days: look-back window in days. Values below 1 are treated as 1 when
            computing the query window, but the raw value is echoed back
            under ``"days"``.

    Returns:
        A dict with the echoed ``days``, the total (``sample_count``) and
        tracked (``tracked_count``) row counts, the four breakdown lists and
        the ``summary`` dict.
    """
    # NOTE(review): uses naive local time for the window start; confirm this
    # matches the timezone in which recommendations.created_at is stored.
    window_days = max(1, days)
    since = datetime.now() - timedelta(days=window_days)
    rows = await _load_review_rows(since.strftime("%Y-%m-%d"))

    by_theme = _breakdown(rows, "theme")
    by_chain = _breakdown(rows, "chain_node")
    by_signal = _breakdown(rows, "entry_signal_type")
    by_risk = _risk_breakdown(rows)
    overview = _summary(rows, by_theme, by_chain, by_signal, by_risk)

    # A row counts as "tracked" once it has a return measured from entry.
    tracked_total = len([row for row in rows if row.get("pct_from_entry") is not None])

    return {
        "days": days,
        "sample_count": len(rows),
        "tracked_count": tracked_total,
        "theme_breakdown": by_theme,
        "chain_breakdown": by_chain,
        "signal_breakdown": by_signal,
        "risk_breakdown": by_risk,
        "summary": overview,
    }
|
|
|
|
|
|
async def _load_review_rows(start: str) -> list[dict[str, Any]]:
    """Fetch recommendations created on/after ``start`` with their research context.

    Each returned dict is one ``recommendations`` row enriched with:

    * the latest ``recommendation_tracking`` row for that recommendation
      (the one with the highest ``id``);
    * the latest ``stock_research_notes`` row for the same ``ts_code``
      (highest ``id``), providing theme / chain node / stock role / logic score;
    * a per-``ts_code`` roll-up of ``risk_events``: the distinct risk types
      joined by ``GROUP_CONCAT`` and a flag that is 1 when any event has
      ``reject = 1``.

    Missing research fields are replaced with placeholder labels via
    ``COALESCE``. Rows are ordered newest first, then by score descending.

    Args:
        start: lower bound compared against ``recommendations.created_at``
            (the caller passes a ``YYYY-MM-DD`` string).

    Returns:
        One plain dict per result row, keyed by the selected column aliases.
    """
    # NOTE(review): the risk roll-up and latest research note are joined by
    # ts_code only, without regard to the recommendation's date. Confirm that
    # attributing later/earlier notes and risk events is intended.
    review_sql = text(
        "WITH latest_tracking AS ("
        " SELECT t.* FROM recommendation_tracking t "
        " INNER JOIN ("
        " SELECT recommendation_id, MAX(id) AS max_id "
        " FROM recommendation_tracking GROUP BY recommendation_id"
        " ) lt ON t.id = lt.max_id"
        "), latest_notes AS ("
        " SELECT n.* FROM stock_research_notes n "
        " INNER JOIN ("
        " SELECT ts_code, MAX(id) AS max_id "
        " FROM stock_research_notes GROUP BY ts_code"
        " ) ln ON n.id = ln.max_id"
        "), risk_summary AS ("
        " SELECT ts_code, "
        " GROUP_CONCAT(DISTINCT risk_type) AS risk_types, "
        " MAX(CASE WHEN reject = 1 THEN 1 ELSE 0 END) AS rejected "
        " FROM risk_events GROUP BY ts_code"
        ") "
        "SELECT r.id, r.ts_code, r.name, r.sector, r.entry_signal_type, r.action_plan, "
        " r.lifecycle_status, r.score, r.created_at, "
        " lt.pct_from_entry, lt.max_return_pct, lt.max_drawdown_pct, lt.hit_target, "
        " lt.hit_stop_loss, lt.close_reason, lt.review_note, "
        " COALESCE(n.theme, r.sector, '未归类') AS theme, "
        " COALESCE(n.chain_node, '未归类') AS chain_node, "
        " COALESCE(n.stock_role, '待归类') AS stock_role, "
        " COALESCE(n.logic_score, 0) AS logic_score, "
        " COALESCE(rs.risk_types, '') AS risk_types, "
        " COALESCE(rs.rejected, 0) AS risk_rejected "
        "FROM recommendations r "
        "LEFT JOIN latest_tracking lt ON lt.recommendation_id = r.id "
        "LEFT JOIN latest_notes n ON n.ts_code = r.ts_code "
        "LEFT JOIN risk_summary rs ON rs.ts_code = r.ts_code "
        "WHERE r.created_at >= :start "
        "ORDER BY r.created_at DESC, r.score DESC"
    )
    async with get_db() as session:
        outcome = await session.execute(review_sql, {"start": start})
        fetched = outcome.fetchall()
        return [dict(record._mapping) for record in fetched]
|
|
|
|
|
|
def _breakdown(rows: list[dict[str, Any]], key: str, limit: int = 12) -> list[dict]:
    """Group ``rows`` by the value under ``key`` and summarise each group.

    Rows whose ``key`` value is missing or falsy are grouped under "未归类".
    Groups are ranked by sample count (largest first), then by effectiveness,
    before being cut to ``limit``. Previously the cut was applied in raw query
    order, which could silently drop large or strong groups that merely
    appeared later in the result set, hiding them from the summary. Ties keep
    first-seen order because Python's sort is stable.

    Args:
        rows: review rows as returned by ``_load_review_rows``.
        key: row field to group on (e.g. "theme", "chain_node").
        limit: maximum number of groups to return; negative values yield [].

    Returns:
        Up to ``limit`` group dicts as produced by ``_build_group``.
    """
    groups: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        # str(...) guarantees a non-empty label, so no empty-label filter is needed.
        groups[str(row.get(key) or "未归类")].append(row)

    result = [_build_group(label, items) for label, items in groups.items()]
    result.sort(key=lambda item: (item["sample_count"], item["effectiveness"]), reverse=True)
    return result[: max(0, limit)]
|
|
|
|
|
|
def _risk_breakdown(rows: list[dict[str, Any]]) -> list[dict]:
    """Group rows by each risk type they carry and rank the resulting groups.

    ``risk_types`` is the comma-joined string built by the SQL
    ``GROUP_CONCAT``. A row tagged with several risk types contributes to
    every one of those groups, and empty fragments are ignored.

    Returns:
        At most 10 group dicts (see ``_build_group``), ordered by sample count
        and then by the magnitude of the group's average return, both
        descending.
    """
    buckets: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        joined = str(row.get("risk_types") or "")
        for risk_type in filter(None, joined.split(",")):
            buckets[risk_type].append(row)

    ranked = sorted(
        (_build_group(label, members) for label, members in buckets.items()),
        key=lambda group: (group["sample_count"], abs(group["avg_return"])),
        reverse=True,
    )
    return ranked[:10]
|
|
|
|
|
|
def _build_group(label: str, items: list[dict]) -> dict:
    """Summarise the performance of one group of review rows.

    A row is "tracked" when its ``pct_from_entry`` is not None. Win rate,
    averages and hit counts are computed over tracked rows only, while
    ``sample_count`` counts every row in the group.

    Args:
        label: group label, echoed back under ``"label"``.
        items: the rows belonging to this group.

    Returns:
        A dict with sample/tracked counts, win rate (percent, 1 decimal),
        average return / max return / drawdown (2 decimals), target and
        stop-loss hit counts, the ``_effectiveness`` score and up to three
        best-performing tracked samples.
    """
    def entry_return(row: dict) -> float:
        return float(row.get("pct_from_entry") or 0)

    tracked = [row for row in items if row.get("pct_from_entry") is not None]
    tracked_n = len(tracked)
    win_n = len([row for row in tracked if entry_return(row) > 0])
    mean_return = _avg(tracked, "pct_from_entry")
    target_n = sum(1 for row in tracked if row.get("hit_target"))
    stop_n = sum(1 for row in tracked if row.get("hit_stop_loss"))
    leaders = sorted(tracked, key=entry_return, reverse=True)[:3]

    return {
        "label": label,
        "sample_count": len(items),
        "tracked_count": tracked_n,
        "win_rate": round(win_n / tracked_n * 100, 1) if tracked_n else 0,
        "avg_return": round(mean_return, 2),
        "avg_max_return": round(_avg(tracked, "max_return_pct"), 2),
        "avg_drawdown": round(_avg(tracked, "max_drawdown_pct"), 2),
        "hit_target_count": target_n,
        "hit_stop_count": stop_n,
        "effectiveness": _effectiveness(tracked_n, win_n, mean_return, stop_n),
        "top_samples": [
            {
                "ts_code": row.get("ts_code"),
                "name": row.get("name"),
                "pct_from_entry": row.get("pct_from_entry"),
                "max_return_pct": row.get("max_return_pct"),
                # Keep only the date part of the creation timestamp.
                "created_at": str(row.get("created_at") or "")[:10],
            }
            for row in leaders
        ],
    }
|
|
|
|
|
|
def _summary(
    rows: list[dict],
    themes: list[dict],
    chains: list[dict],
    signals: list[dict],
    risks: list[dict],
) -> dict:
    """Condense the breakdowns into a headline, best/worst picks and suggestions.

    Args:
        rows: all loaded review rows; only used to count tracked samples.
        themes: theme groups from ``_breakdown``.
        chains: chain-node groups from ``_breakdown``.
        signals: entry-signal groups from ``_breakdown``.
        risks: risk-type groups from ``_risk_breakdown``.

    Returns:
        A dict with ``headline``, the strongest theme/chain/signal group
        (``{}`` when none has tracked samples), ``weakest_risk`` (the tracked
        risk group with the lowest average return, or ``{}``) and
        ``suggestions``.
    """
    tracked = [item for item in rows if item.get("pct_from_entry") is not None]
    strongest_theme = _first_by_effectiveness(themes)
    strongest_chain = _first_by_effectiveness(chains)
    strongest_signal = _first_by_effectiveness(signals)

    # Only risk groups with tracked samples have a meaningful avg_return.
    # Untracked groups report 0 and would otherwise be picked as "weakest"
    # whenever every tracked group is positive. This mirrors the
    # tracked_count >= 1 rule in _first_by_effectiveness.
    tracked_risks = [item for item in risks if item["tracked_count"] >= 1]
    weakest_risk = min(tracked_risks, key=lambda item: item["avg_return"]) if tracked_risks else {}

    headline = "等待形成研究复盘样本"
    if tracked:
        headline = f"近 {len(tracked)} 个跟踪样本,{strongest_theme.get('label', '主题')} 相对更有效"
    return {
        "headline": headline,
        "strongest_theme": strongest_theme,
        "strongest_chain": strongest_chain,
        "strongest_signal": strongest_signal,
        "weakest_risk": weakest_risk,
        "suggestions": _suggestions(strongest_theme, strongest_chain, strongest_signal, weakest_risk),
    }
|
|
|
|
|
|
def _first_by_effectiveness(items: list[dict]) -> dict:
|
|
eligible = [item for item in items if item["tracked_count"] >= 1]
|
|
if not eligible:
|
|
return {}
|
|
return sorted(eligible, key=lambda item: (item["effectiveness"], item["win_rate"], item["avg_return"]), reverse=True)[0]
|
|
|
|
|
|
def _suggestions(theme: dict, chain: dict, signal: dict, risk: dict) -> list[str]:
|
|
suggestions = []
|
|
if theme:
|
|
suggestions.append(f"提高 {theme['label']} 方向的复盘权重,当前胜率 {theme['win_rate']}%。")
|
|
if chain:
|
|
suggestions.append(f"优先跟踪 {chain['label']} 环节,平均收益 {chain['avg_return']}%。")
|
|
if signal:
|
|
suggestions.append(f"信号上关注 {signal['label']},减少低效入口。")
|
|
if risk and risk.get("avg_return", 0) < 0:
|
|
suggestions.append(f"命中 {risk['label']} 风险的样本表现偏弱,后续应降低排序或直接观察。")
|
|
return suggestions[:4] or ["样本不足,继续累积主题、环节和信号复盘数据。"]
|
|
|
|
|
|
def _effectiveness(tracked_count: int, wins: int, avg_return: float, hit_stop: int) -> float:
|
|
if tracked_count <= 0:
|
|
return 0
|
|
win_rate = wins / tracked_count * 100
|
|
stop_penalty = hit_stop / tracked_count * 18
|
|
return round(win_rate * 0.55 + avg_return * 2.5 - stop_penalty + min(tracked_count, 8), 2)
|
|
|
|
|
|
def _avg(rows: list[dict], key: str) -> float:
|
|
values = [float(item.get(key) or 0) for item in rows if item.get(key) is not None]
|
|
return sum(values) / len(values) if values else 0
|