astock-agent/backend/app/engine/recommender.py
2026-06-08 11:11:05 +08:00

1189 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""推荐引擎
管理推荐状态,提供推荐查询接口。
将筛选结果持久化并管理历史推荐。
"""
import logging
import json
import asyncio
import traceback
from functools import partial
from datetime import datetime, timedelta
from app.engine.screener import run_screening
from app.data.models import Recommendation, MarketTemperature, SectorInfo
from app.llm.strategy_selector import get_strategy_profile_by_id
from app.db.database import get_db
from app.db import tables
logger = logging.getLogger(__name__)
# Scan lock: prevents two scans from being triggered at the same time.
_scan_lock = asyncio.Lock()
# Set to True by refresh_recommendations while a scan is executing and
# reset to False when it finishes (successfully or not).
_scan_running = False
async def _run_async_in_worker(async_fn, *args, **kwargs):
    """Run a heavy coroutine function in a separate worker thread.

    The coroutine object is created here and then driven to completion by
    ``asyncio.run`` inside a thread started with ``asyncio.to_thread``, so
    the calling event loop is not blocked while it executes.

    Args:
        async_fn: Coroutine function to invoke.
        *args: Positional arguments forwarded to ``async_fn``.
        **kwargs: Keyword arguments forwarded to ``async_fn``.

    Returns:
        Whatever the coroutine returns.
    """
    pending = async_fn(*args, **kwargs)
    return await asyncio.to_thread(asyncio.run, pending)
def _has_valid_market_breadth(market_temp: MarketTemperature | None) -> bool:
    """Tell whether a market-temperature snapshot carries usable breadth data.

    A snapshot counts as valid when it is truthy and the sum of its
    ``up_count`` and ``down_count`` (missing values treated as 0) is positive.
    """
    if market_temp:
        advancers = market_temp.up_count or 0
        decliners = market_temp.down_count or 0
        return advancers + decliners > 0
    return False
def _safe_json_dict(value) -> dict:
    """Coerce *value* into a dict without ever raising.

    Falsy input yields ``{}``; a dict is returned as-is; anything else is
    passed to ``json.loads`` and kept only if the decoded result is a dict.
    Any decoding failure yields ``{}``.
    """
    if not value:
        return {}
    if isinstance(value, dict):
        return value
    try:
        decoded = json.loads(value)
    except Exception:
        # Best-effort by design: bad JSON or a non-string input maps to {}.
        return {}
    if isinstance(decoded, dict):
        return decoded
    return {}
def _safe_json_list_value(value) -> list:
    """Coerce *value* into a list without ever raising.

    Falsy input yields ``[]``; a list is returned as-is; anything else is
    passed to ``json.loads`` and kept only if the decoded result is a list.
    Any decoding failure yields ``[]``.
    """
    if not value:
        return []
    if isinstance(value, list):
        return value
    try:
        decoded = json.loads(value)
    except Exception:
        # Best-effort by design: bad JSON or a non-string input maps to [].
        return []
    if isinstance(decoded, list):
        return decoded
    return []
def _build_legacy_decision_trace(row) -> dict:
    """Synthesize a version-0 decision trace from a recommendation row.

    Accepts either an object exposing ``_mapping`` or a plain mapping. The
    trace is assembled from the stored action plan, scores, recall tags and
    the first three reasons; boosts, penalties and risk tags are empty.
    """
    data = getattr(row, "_mapping", row)
    action_plan = data.get("action_plan") or "观察"
    score = float(data.get("score") or 0)
    route_tags = _safe_json_list_value(data.get("recall_tags"))
    evidence = _safe_json_list_value(data.get("reasons"))[:3]

    # (key, label, source column) for each entry of the score breakdown.
    components = (
        ("sector", "主题热度", "sector_score"),
        ("capital", "资金", "capital_score"),
        ("technical", "技术", "technical_score"),
    )
    breakdown = [
        {
            "key": key,
            "label": label,
            "score": round(float(data.get(column) or 0), 1),
            "weight": 0,
        }
        for key, label, column in components
    ]
    return {
        "version": 0,
        "headline": f"{action_plan}: {data.get('sector') or '未归类'}候选,综合分{score:.0f}",
        "action_plan": action_plan,
        "final_score": round(score, 1),
        "route_tags": route_tags,
        "evidence": evidence,
        "score_breakdown": breakdown,
        "boosts": [],
        "penalties": [],
        "risk_tags": [],
        "llm_adjustment": None,
    }
def _scan_meta_from_row(row) -> dict | None:
    """Convert a ``scan_process_logs`` row into a normalized metadata dict.

    Returns ``None`` for a falsy row. Accepts either an object exposing
    ``_mapping`` or a plain mapping. Missing text fields become ``""``
    (``status`` defaults to ``"ok"``) and missing counters become 0.
    """
    if not row:
        return None
    data = getattr(row, "_mapping", row)
    detail = _safe_json_dict(data.get("detail_json"))
    stamp = data.get("created_at")
    created_text = str(stamp or "")

    meta = {
        "scan_session": data.get("scan_session") or "",
        "scan_mode": data.get("scan_mode") or "",
        "status": data.get("status") or "ok",
        "stage": data.get("stage") or "",
    }
    for counter in ("input_count", "output_count", "filtered_count"):
        meta[counter] = int(data.get(counter) or 0)
    meta["summary"] = data.get("summary") or ""
    meta["created_at"] = created_text
    # First 10 characters of the timestamp text are used as the date key.
    meta["date"] = created_text[:10] if stamp else ""
    meta["detail"] = detail
    meta["action_counts"] = detail.get("action_counts") or {}
    meta["elimination_reasons"] = detail.get("elimination_reasons") or {}
    return meta
async def _load_latest_completed_scan(db) -> dict | None:
    """Fetch metadata for the newest ``final_filter`` scan log entry.

    Returns the dict built by ``_scan_meta_from_row``, or ``None`` when no
    such log row exists.
    """
    from sqlalchemy import text

    query = text(
        "SELECT * FROM scan_process_logs "
        "WHERE stage = 'final_filter' "
        "ORDER BY created_at DESC, id DESC LIMIT 1"
    )
    cursor = await db.execute(query)
    newest = cursor.fetchone()
    return _scan_meta_from_row(newest)
async def _load_latest_strategy_profile_for_session(db, scan_session: str) -> dict | None:
    """Load the newest ``strategy_profile`` log payload for a scan session.

    Returns ``None`` when ``scan_session`` is empty, when no matching log
    row exists, or when ``detail_json`` does not decode to a non-empty dict.
    """
    if not scan_session:
        return None
    from sqlalchemy import text

    query = text(
        "SELECT detail_json FROM scan_process_logs "
        "WHERE scan_session = :session AND stage = 'strategy_profile' "
        "ORDER BY created_at DESC, id DESC LIMIT 1"
    )
    cursor = await db.execute(query, {"session": scan_session})
    hit = cursor.fetchone()
    if hit:
        decoded = _safe_json_dict(hit._mapping.get("detail_json"))
        return decoded or None
    return None
async def refresh_recommendations(trade_date: str = None, scan_session: str = "manual") -> dict:
    """Run a screening pass and persist its results, guarded by the scan lock.

    If a scan is already in progress the call does not wait: it logs a
    warning and returns whatever ``_load_today_from_db`` yields instead.

    Args:
        trade_date: Forwarded to ``run_screening``.
        scan_session: Label stamped on every recommendation and forwarded
            to screening and notifications.

    Returns:
        The result dict produced by ``run_screening``.
    """
    global _scan_running
    if _scan_lock.locked():
        logger.warning("扫描已在执行中,跳过本次触发")
        return await _load_today_from_db()

    async with _scan_lock:
        _scan_running = True
        try:
            # run_screening mixes many synchronous market-data requests with
            # pandas work; running it on the main loop would stall page reads
            # and WebSocket traffic, so it is pushed to a worker thread.
            outcome = await _run_async_in_worker(
                run_screening, trade_date, scan_session=scan_session
            )
            # Stamp each recommendation with the session label and a timestamp.
            for item in outcome.get("recommendations", []):
                item.scan_session = scan_session
                item.created_at = datetime.now()
            # Persistence is async and runs on the calling loop.
            await _save_to_db(outcome)
            # Push actionable / key-watch picks; a failure there does not
            # affect the scan result.
            await _push_recommendation_notifications(outcome, scan_session)
            # Refresh tracking of earlier recommendations.
            await _update_tracking()
            return outcome
        finally:
            _scan_running = False
async def _push_recommendation_notifications(result: dict, scan_session: str) -> None:
    """Send the scan's recommendations through the Feishu push helper.

    Any exception (including a failed import of the helper) is logged as a
    warning and swallowed, so a push failure never propagates to the caller.
    """
    try:
        from app.notifications.feishu import send_recommendation_push

        payload = {
            "recommendations": result.get("recommendations", []),
            "market_temp": result.get("market_temp"),
            "scan_session": scan_session,
            "strategy_profile": result.get("strategy_profile"),
        }
        delivered = await send_recommendation_push(**payload)
        if delivered:
            logger.info("已发送飞书推荐推送: scan_session=%s", scan_session)
    except Exception as e:
        logger.warning("飞书推荐推送失败: %s", e)
async def _update_tracking():
    """Record a tracking snapshot for every still-open recommendation.

    Steps:
      1. Ask Tushare for the latest trade date.
      2. Load up to 50 of the newest recommendations that have a positive
         entry price and whose lifecycle is not one of the closed states.
      3. Look up the close price of each stock for that trade date.
      4. For each recommendation not yet tracked on that date, compute the
         metrics, insert a ``recommendation_tracking`` row and update the
         recommendation's ``lifecycle_status``.

    All failures are logged (and stored through ``log_error``) and swallowed;
    the function never raises to its caller.
    """
    try:
        from sqlalchemy import text
        from app.data.tushare_client import tushare_client

        trade_date = await asyncio.to_thread(tushare_client.get_latest_trade_date)
        async with get_db() as db:
            # Active recommendations: entry_price set and not closed yet.
            result = await db.execute(
                text(
                    "SELECT id, ts_code, entry_price, target_price, stop_loss, "
                    "review_after_days, lifecycle_status, created_at "
                    "FROM recommendations "
                    "WHERE entry_price IS NOT NULL "
                    "AND entry_price > 0 "
                    "AND COALESCE(lifecycle_status, 'candidate') NOT IN ('closed_win', 'closed_loss', 'invalidated', 'expired') "
                    "AND date(created_at) <= date(:today) "
                    "ORDER BY created_at DESC LIMIT 50"
                ),
                {"today": datetime.now().strftime("%Y-%m-%d")},
            )
            rows = result.fetchall()
            if not rows:
                return

            # Close prices for the tracked stocks. Filter the whole-market
            # frame with a vectorized isin() over a set instead of walking
            # every row with iterrows() and testing list membership.
            codes = {r[1] for r in rows}
            daily_all = await asyncio.to_thread(tushare_client.get_daily_all, trade_date)
            price_map = {}
            if not daily_all.empty:
                wanted = daily_all[daily_all["ts_code"].isin(codes)]
                price_map = dict(zip(wanted["ts_code"].tolist(), wanted["close"].tolist()))

            tracked = 0
            for r in rows:
                (
                    rec_id,
                    ts_code,
                    entry_price,
                    target_price,
                    stop_loss,
                    review_after_days,
                    lifecycle_status,
                    created_at,
                ) = r
                current_price = price_map.get(ts_code)
                if current_price is None or entry_price is None or entry_price <= 0:
                    continue

                # Check for an existing snapshot BEFORE the per-stock replay:
                # the replay issues a Tushare request, which is wasted work
                # when this recommendation was already tracked for the date.
                exists = await db.execute(
                    text(
                        "SELECT id FROM recommendation_tracking "
                        "WHERE recommendation_id = :rid AND track_date = :td"
                    ),
                    {"rid": rec_id, "td": trade_date},
                )
                if exists.fetchone():
                    continue

                track_metrics = await asyncio.to_thread(
                    _calculate_tracking_metrics,
                    ts_code=ts_code,
                    entry_price=float(entry_price),
                    current_price=float(current_price),
                    created_at=created_at,
                    latest_trade_date=trade_date,
                )
                pct = track_metrics["pct_from_entry"]
                max_price = track_metrics["max_price"]
                min_price = track_metrics["min_price"]
                max_return_pct = track_metrics["max_return_pct"]
                max_drawdown_pct = track_metrics["max_drawdown_pct"]
                days_since = track_metrics["days_since_recommendation"]

                hit_target = bool(target_price and max_price >= target_price)
                hit_stop = bool(stop_loss and min_price <= stop_loss)
                # Review window defaults to 3 when the column is empty.
                review_days = int(review_after_days or 3)
                expired = days_since >= review_days and not hit_target and not hit_stop

                status, new_lifecycle, close_reason = _derive_lifecycle_status(
                    hit_target=hit_target,
                    hit_stop=hit_stop,
                    expired=expired,
                    pct=pct,
                    previous_status=lifecycle_status or "candidate",
                )
                review_note = _build_review_note(
                    pct=pct,
                    max_return_pct=max_return_pct,
                    max_drawdown_pct=max_drawdown_pct,
                    days_since=days_since,
                    close_reason=close_reason,
                )

                await db.execute(
                    tables.recommendation_tracking_table.insert().values(
                        recommendation_id=rec_id,
                        track_date=trade_date,
                        current_price=current_price,
                        pct_from_entry=pct,
                        max_price=max_price,
                        min_price=min_price,
                        max_return_pct=max_return_pct,
                        max_drawdown_pct=max_drawdown_pct,
                        days_since_recommendation=days_since,
                        hit_target=hit_target,
                        hit_stop_loss=hit_stop,
                        close_reason=close_reason,
                        review_note=review_note,
                        status=status,
                    )
                )
                await db.execute(
                    text(
                        "UPDATE recommendations SET lifecycle_status = :status "
                        "WHERE id = :rid"
                    ),
                    {"status": new_lifecycle, "rid": rec_id},
                )
                tracked += 1

            await db.commit()
            if tracked > 0:
                logger.info(f"已更新 {tracked} 条推荐跟踪记录")
    except Exception as e:
        logger.error(f"更新推荐跟踪失败: {e}")
        from app.db.error_logger import log_error
        await log_error("recommender", f"更新推荐跟踪失败: {e}", detail=traceback.format_exc())
def _calculate_tracking_metrics(
    ts_code: str,
    entry_price: float,
    current_price: float,
    created_at,
    latest_trade_date: str,
) -> dict:
    """Compute return, peak return and worst drawdown since a recommendation.

    Fetches 60 days of daily bars for ``ts_code`` from the Tushare client and
    keeps the bars whose ``trade_date`` is on or after the recommendation
    date (or ``latest_trade_date`` when ``created_at`` is empty). If the
    fetch fails or yields no bars, the high/low fall back to
    ``current_price`` and the day count to 0.

    Returns:
        Dict with ``pct_from_entry``, ``max_price``, ``min_price``,
        ``max_return_pct``, ``max_drawdown_pct`` and
        ``days_since_recommendation``.
    """
    from app.data.tushare_client import tushare_client

    created_day = str(created_at)[:10] if created_at else ""
    window_start = created_day.replace("-", "") if created_day else latest_trade_date

    peak = trough = current_price
    elapsed = 0
    try:
        bars = tushare_client.get_stock_daily(ts_code, days=60)
        if not bars.empty:
            bars = bars[bars["trade_date"] >= window_start].sort_values("trade_date")
        if not bars.empty:
            peak = float(bars["high"].max())
            trough = float(bars["low"].min())
            # Distinct trading days in the window, not counting the first one.
            elapsed = len(bars["trade_date"].unique()) - 1
    except Exception as e:
        logger.debug(f"计算跟踪指标失败 {ts_code}: {e}")

    def _pct_vs_entry(price: float) -> float:
        return round((price - entry_price) / entry_price * 100, 2)

    return {
        "pct_from_entry": _pct_vs_entry(current_price),
        "max_price": round(peak, 2),
        "min_price": round(trough, 2),
        "max_return_pct": _pct_vs_entry(peak),
        "max_drawdown_pct": _pct_vs_entry(trough),
        "days_since_recommendation": max(elapsed, 0),
    }
def _derive_lifecycle_status(
    hit_target: bool,
    hit_stop: bool,
    expired: bool,
    pct: float,
    previous_status: str,
) -> tuple[str, str, str]:
    """Map tracking flags to ``(tracking status, lifecycle status, close reason)``.

    Precedence is target hit, then stop-loss hit, then review-window expiry;
    a target hit wins when both target and stop were touched. On expiry the
    outcome depends on ``pct``: positive closes as a win, below -2 closes as
    a loss, anything else is recorded as ``expired``. Otherwise the
    recommendation stays active in ``tracking`` with an empty close reason.

    Args:
        hit_target: Whether the target price was reached.
        hit_stop: Whether the stop-loss price was reached.
        expired: Whether the review window elapsed without either hit.
        pct: Current return versus entry, in percent.
        previous_status: Accepted for interface compatibility. The original
            had a separate ``"actionable"`` branch that returned exactly the
            same tuple as the default, so it never affected the result.

    Returns:
        Tuple ``(status, lifecycle_status, close_reason)``.
    """
    if hit_target:
        return "closed", "closed_win", "hit_target"
    if hit_stop:
        return "closed", "closed_loss", "hit_stop_loss"
    if expired:
        if pct > 0:
            return "closed", "closed_win", "review_expired_profit"
        if pct < -2:
            return "closed", "closed_loss", "review_expired_loss"
        return "closed", "expired", "review_expired_flat"
    return "active", "tracking", ""
def _build_review_note(
    pct: float,
    max_return_pct: float,
    max_drawdown_pct: float,
    days_since: int,
    close_reason: str,
) -> str:
    """Render the human-readable review note for a tracking snapshot.

    The wording is selected by ``close_reason``; any value without a
    dedicated message (including the empty string) produces the generic
    "still tracking" note.
    """
    notes_by_reason = {
        "hit_target": f"{days_since}个交易日内命中目标,最大浮盈{max_return_pct}%",
        "hit_stop_loss": f"{days_since}个交易日内触发止损,最大回撤{max_drawdown_pct}%",
        "review_expired_profit": f"复盘窗口到期,当前收益{pct}%,最大浮盈{max_return_pct}%",
        "review_expired_loss": f"复盘窗口到期,当前亏损{pct}%,最大回撤{max_drawdown_pct}%",
        "review_expired_flat": f"复盘窗口到期,收益{pct}%,未形成有效进攻",
    }
    fallback = f"跟踪中,当前收益{pct}%,最大浮盈{max_return_pct}%,最大回撤{max_drawdown_pct}%"
    return notes_by_reason.get(close_reason, fallback)
async def get_performance_stats() -> dict:
    """Return win-rate and return statistics for tracked recommendations.

    Aggregates are computed over the latest tracking row (highest tracking
    id) of each recommendation. The full set of latest-tracked rows feeds
    the route and prefilter breakdowns, while ``details`` carries only the
    20 newest of them.

    Returns:
        Dict with the keys ``total_recommendations``, ``tracked``,
        ``winning``, ``win_rate``, ``avg_return``, ``avg_max_return``,
        ``avg_max_drawdown``, ``hit_target_count``, ``hit_stop_count``,
        ``lifecycle_counts``, ``route_breakdown``, ``prefilter_breakdown``
        and ``details``. On any error the failure is logged and a dict of
        the same shape filled with zeros / empty containers is returned.
    """
    try:
        from sqlalchemy import text

        # CTE selecting, per recommendation, its most recent tracking row.
        latest_tracked_sql = (
            "WITH latest_tracked AS ("
            "  SELECT r.id AS recommendation_id, t.pct_from_entry, t.max_return_pct, "
            "         t.max_drawdown_pct, t.hit_target, t.hit_stop_loss "
            "  FROM recommendations r "
            "  INNER JOIN recommendation_tracking t ON t.recommendation_id = r.id "
            "  INNER JOIN ("
            "    SELECT recommendation_id, MAX(id) as max_id "
            "    FROM recommendation_tracking GROUP BY recommendation_id"
            "  ) latest ON t.id = latest.max_id"
            ") "
        )
        async with get_db() as db:

            async def fetch_scalar(sql: str):
                """Execute *sql* and return the first column of the first row."""
                return (await db.execute(text(sql))).scalar()

            # Total number of recommendations.
            total = await fetch_scalar("SELECT COUNT(DISTINCT id) FROM recommendations") or 0
            # Recommendations that have at least one tracking row.
            tracked = await fetch_scalar(
                latest_tracked_sql + "SELECT COUNT(*) FROM latest_tracked"
            ) or 0
            # Wins are judged on the latest tracked pct (positive = profit).
            winning = await fetch_scalar(
                latest_tracked_sql + "SELECT COUNT(*) FROM latest_tracked WHERE pct_from_entry > 0"
            ) or 0
            # Average return, based on the latest tracked pct.
            avg_return = await fetch_scalar(
                latest_tracked_sql + "SELECT AVG(pct_from_entry) FROM latest_tracked"
            )
            avg_return = round(float(avg_return), 2) if avg_return else 0
            # Recommendations that reached their target price.
            hit_target_count = await fetch_scalar(
                latest_tracked_sql + "SELECT COUNT(*) FROM latest_tracked WHERE hit_target = 1"
            ) or 0
            # Recommendations that triggered their stop loss.
            hit_stop_count = await fetch_scalar(
                latest_tracked_sql + "SELECT COUNT(*) FROM latest_tracked WHERE hit_stop_loss = 1"
            ) or 0

            # Lifecycle distribution (NULL status counted as 'candidate').
            result = await db.execute(
                text(
                    "SELECT COALESCE(lifecycle_status, 'candidate') AS status, COUNT(*) AS cnt "
                    "FROM recommendations GROUP BY COALESCE(lifecycle_status, 'candidate')"
                )
            )
            lifecycle_counts = {
                row._mapping["status"]: row._mapping["cnt"]
                for row in result.fetchall()
            }

            # Average peak return / worst drawdown.
            result = await db.execute(
                text(
                    latest_tracked_sql +
                    "SELECT AVG(max_return_pct), AVG(max_drawdown_pct) FROM latest_tracked"
                )
            )
            avg_extremes = result.fetchone()
            avg_max_return = round(float(avg_extremes[0]), 2) if avg_extremes and avg_extremes[0] is not None else 0
            avg_max_drawdown = round(float(avg_extremes[1]), 2) if avg_extremes and avg_extremes[1] is not None else 0

            # All tracked samples feed the breakdowns; the page detail list
            # only shows the most recent batch.
            result = await db.execute(
                text(
                    "SELECT r.ts_code, r.name, r.signal, r.entry_price, "
                    "       r.target_price, r.stop_loss, r.entry_signal_type, r.score, "
                    "       r.action_plan, r.lifecycle_status, r.recall_tags, r.prefilter_decision, "
                    "       t.pct_from_entry, t.current_price, t.track_date, t.hit_target, t.hit_stop_loss, "
                    "       t.max_return_pct, t.max_drawdown_pct, t.days_since_recommendation, "
                    "       t.close_reason, t.review_note, r.created_at "
                    "FROM recommendations r "
                    "INNER JOIN recommendation_tracking t ON t.recommendation_id = r.id "
                    "INNER JOIN ("
                    "  SELECT recommendation_id, MAX(id) as max_id "
                    "  FROM recommendation_tracking GROUP BY recommendation_id"
                    ") latest ON t.id = latest.max_id "
                    "ORDER BY r.created_at DESC"
                )
            )
            all_details = []
            for row in result.fetchall():
                r = row._mapping
                all_details.append({
                    "ts_code": r["ts_code"],
                    "name": r["name"],
                    "signal": r["signal"],
                    "entry_signal_type": r["entry_signal_type"],
                    "action_plan": r["action_plan"],
                    "lifecycle_status": r["lifecycle_status"],
                    # Tolerant parse: one malformed recall_tags value must
                    # not wipe out the whole statistics response.
                    "recall_tags": _safe_json_list_value(r["recall_tags"]),
                    "prefilter_decision": r["prefilter_decision"] or "",
                    "score": r["score"],
                    "entry_price": r["entry_price"],
                    "target_price": r["target_price"],
                    "stop_loss": r["stop_loss"],
                    "current_price": r["current_price"],
                    "pct_from_entry": r["pct_from_entry"],
                    "max_return_pct": r["max_return_pct"],
                    "max_drawdown_pct": r["max_drawdown_pct"],
                    "days_since_recommendation": r["days_since_recommendation"],
                    "close_reason": r["close_reason"],
                    "review_note": r["review_note"],
                    "track_date": r["track_date"],
                    "hit_target": bool(r["hit_target"]),
                    "hit_stop_loss": bool(r["hit_stop_loss"]),
                    "created_at": str(r["created_at"])[:10] if r["created_at"] else "",
                })

            winning = min(winning, tracked)
            win_rate = round(winning / tracked * 100, 1) if tracked > 0 else 0
            details = all_details[:20]
            return {
                "total_recommendations": total,
                "tracked": tracked,
                "winning": winning,
                "win_rate": win_rate,
                "avg_return": avg_return,
                "avg_max_return": avg_max_return,
                "avg_max_drawdown": avg_max_drawdown,
                "hit_target_count": hit_target_count,
                "hit_stop_count": hit_stop_count,
                "lifecycle_counts": lifecycle_counts,
                "route_breakdown": _build_route_breakdown(all_details),
                "prefilter_breakdown": _build_prefilter_breakdown(all_details),
                "details": details,
            }
    except Exception as e:
        logger.error(f"获取胜率统计失败: {e}")
        from app.db.error_logger import log_error
        await log_error("recommender", f"获取胜率统计失败: {e}", detail=traceback.format_exc())
        return {
            "total_recommendations": 0, "tracked": 0, "winning": 0,
            "win_rate": 0, "avg_return": 0, "avg_max_return": 0,
            "avg_max_drawdown": 0, "hit_target_count": 0,
            "hit_stop_count": 0, "lifecycle_counts": {}, "route_breakdown": [], "prefilter_breakdown": [], "details": [],
        }
async def get_latest_recommendations() -> dict:
    """Return the latest recommendation payload.

    Delegates to ``_load_today_from_db`` on every call; nothing is cached
    in memory here.
    """
    latest = await _load_today_from_db()
    return latest
async def get_latest_market_anomalies(limit: int = 8) -> list[dict]:
    """Return off-theme market movers from the most recent recommendation day.

    Selects recommendations created on the newest ``created_at`` date whose
    ``recall_tags`` contain none of the main-theme route tags
    (``hot_theme_core``, ``theme_leader``, ``top_theme_member``,
    ``sector_recall``), ordered by score descending.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        List of summary dicts. On any error the failure is logged and an
        empty list is returned.
    """
    try:
        async with get_db() as db:
            from sqlalchemy import text

            result = await db.execute(
                text(
                    "SELECT * FROM recommendations "
                    "WHERE date(created_at) = (SELECT date(created_at) FROM recommendations ORDER BY created_at DESC LIMIT 1) "
                    "AND COALESCE(recall_tags, '[]') NOT LIKE '%hot_theme_core%' "
                    "AND COALESCE(recall_tags, '[]') NOT LIKE '%theme_leader%' "
                    "AND COALESCE(recall_tags, '[]') NOT LIKE '%top_theme_member%' "
                    "AND COALESCE(recall_tags, '[]') NOT LIKE '%sector_recall%' "
                    "ORDER BY score DESC LIMIT :limit"
                ),
                {"limit": limit},
            )
            anomalies = []
            for row in result.fetchall():
                m = row._mapping
                anomalies.append({
                    "ts_code": m["ts_code"],
                    "name": m["name"],
                    "sector": m["sector"] or "",
                    "score": m["score"] or 0,
                    "action_plan": m["action_plan"] or "观察",
                    # Tolerant parse: a single malformed recall_tags value
                    # must not turn the whole response into an empty list.
                    "recall_tags": _safe_json_list_value(m.get("recall_tags")),
                    "prefilter_decision": m.get("prefilter_decision") or "",
                    "reason": m.get("prefilter_reason") or "",
                    "created_at": str(m["created_at"]) if m["created_at"] else None,
                })
            return anomalies
    except Exception as e:
        logger.error(f"获取市场异动观察失败: {e}")
        from app.db.error_logger import log_error
        await log_error("recommender", f"获取市场异动观察失败: {e}", detail=traceback.format_exc())
        return []
async def get_latest_sectors() -> list[SectorInfo]:
    """Return the latest sector-heat data.

    Delegates to ``_load_sectors_from_db``; this call does not start a scan.
    """
    sectors = await _load_sectors_from_db()
    return sectors
async def get_recommendation_history(days: int = 7) -> list[dict]:
    """Return recommendation history grouped by calendar date.

    For each date in the window the group carries the metadata of that
    day's last ``final_filter`` scan log (if any) and the qualifying
    recommendations created that day, each joined with its latest tracking
    row. Dates that had a scan but produced no recommendation are still
    listed, with an empty recommendation list.

    Args:
        days: Size of the look-back window, counted back from now.

    Returns:
        List of per-date dicts, newest date first.
    """
    start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    async with get_db() as db:
        from sqlalchemy import text

        # Latest completed scan log of each day in the window.
        scan_result = await db.execute(
            text(
                "SELECT s.* FROM scan_process_logs s "
                "INNER JOIN ("
                "  SELECT date(created_at) AS scan_date, MAX(id) AS max_id "
                "  FROM scan_process_logs "
                "  WHERE stage = 'final_filter' AND created_at >= :start "
                "  GROUP BY date(created_at)"
                ") latest ON s.id = latest.max_id "
                "ORDER BY s.created_at DESC, s.id DESC"
            ),
            {"start": start},
        )
        scan_groups = {
            meta["date"]: meta
            for meta in (_scan_meta_from_row(row) for row in scan_result.fetchall())
            if meta and meta.get("date")
        }

        # Historical recommendations, de-duplicated per (day, ts_code) by
        # keeping the row with the highest id, joined with latest tracking.
        stmt = text(
            "SELECT r.*, "
            "latest_t.current_price AS latest_current_price, "
            "latest_t.pct_from_entry AS latest_pct_from_entry, "
            "latest_t.max_return_pct AS latest_max_return_pct, "
            "latest_t.max_drawdown_pct AS latest_max_drawdown_pct, "
            "latest_t.days_since_recommendation AS latest_days_since_recommendation, "
            "latest_t.close_reason AS latest_close_reason, "
            "latest_t.review_note AS latest_review_note, "
            "latest_t.track_date AS latest_track_date "
            "FROM recommendations r "
            "LEFT JOIN ("
            "  SELECT t.* FROM recommendation_tracking t "
            "  INNER JOIN ("
            "    SELECT recommendation_id, MAX(id) AS max_id "
            "    FROM recommendation_tracking GROUP BY recommendation_id"
            "  ) lt ON t.id = lt.max_id"
            ") latest_t ON latest_t.recommendation_id = r.id "
            "WHERE r.created_at >= :start "
            "AND (r.action_plan IN ('可操作', '重点关注') OR r.score >= 56) "
            "AND ("
            "  COALESCE(r.recall_tags, '[]') LIKE '%hot_theme_core%' "
            "  OR COALESCE(r.recall_tags, '[]') LIKE '%theme_leader%' "
            "  OR COALESCE(r.recall_tags, '[]') LIKE '%top_theme_member%' "
            "  OR COALESCE(r.recall_tags, '[]') LIKE '%sector_recall%'"
            ") "
            "AND r.id IN ("
            "  SELECT MAX(id) FROM recommendations "
            "  WHERE created_at >= :start "
            "  GROUP BY date(created_at), ts_code"
            ") "
            "ORDER BY r.created_at DESC, r.score DESC"
        )
        result = await db.execute(stmt, {"start": start})
        rows = result.fetchall()

        # Seed groups from the scan logs so that a day which was scanned
        # but produced no recommendation is still shown.
        grouped: dict[str, dict] = {
            date_str: {"scan": scan_meta, "recommendations": []}
            for date_str, scan_meta in scan_groups.items()
        }
        for row in rows:
            r = row._mapping
            # The first 10 characters of created_at are used as the date key.
            ca = r["created_at"]
            if ca:
                date_str = str(ca)[:10]
                created_at_str = str(ca)
            else:
                date_str = "unknown"
                created_at_str = None

            rec_dict = {
                "ts_code": r["ts_code"],
                "name": r["name"],
                "sector": r["sector"] or "",
                "score": r["score"] or 0,
                "level": _score_to_level_static(r["score"] or 0),
                "signal": r["signal"] or "HOLD",
                "market_temp_score": r["market_temp_score"] or 0,
                "sector_score": r["sector_score"] or 0,
                "capital_score": r["capital_score"] or 0,
                "technical_score": r["technical_score"] or 0,
                "supply_demand_score": r.get("supply_demand_score") or 0,
                "price_action_score": r.get("price_action_score") or 0,
                "position_score": r.get("position_score") or 50,
                "valuation_score": r.get("valuation_score") or 50,
                "entry_price": r["entry_price"],
                "target_price": r["target_price"],
                "stop_loss": r["stop_loss"],
                # JSON columns are parsed tolerantly: this function has no
                # try/except, so a single corrupt value would otherwise
                # fail the whole history request.
                "reasons": _safe_json_list_value(r["reasons"]),
                "risk_note": r.get("risk_note") or "",
                "entry_timing": r.get("entry_timing") or "",
                "action_plan": r.get("action_plan") or "观察",
                "trigger_condition": r.get("trigger_condition") or "",
                "invalidation_condition": r.get("invalidation_condition") or "",
                "suggested_position_pct": r.get("suggested_position_pct") or 0,
                "review_after_days": r.get("review_after_days") or 3,
                "lifecycle_status": r.get("lifecycle_status") or "candidate",
                "data_freshness": r.get("data_freshness") or "",
                "strategy": r.get("strategy") or "trend_breakout",
                "entry_signal_type": r.get("entry_signal_type") or "none",
                "llm_analysis": r.get("llm_analysis") or "",
                "llm_score": r.get("llm_score"),
                "recall_tags": _safe_json_list_value(r.get("recall_tags")),
                "prefilter_decision": r.get("prefilter_decision") or "",
                "prefilter_reason": r.get("prefilter_reason") or "",
                "focus_points": _safe_json_list_value(r.get("focus_points")),
                # Rows without a stored trace get a synthesized legacy one.
                "decision_trace": _safe_json_dict(r.get("decision_trace")) or _build_legacy_decision_trace(r),
                "tracking": {
                    "current_price": r.get("latest_current_price"),
                    "pct_from_entry": r.get("latest_pct_from_entry"),
                    "max_return_pct": r.get("latest_max_return_pct"),
                    "max_drawdown_pct": r.get("latest_max_drawdown_pct"),
                    "days_since_recommendation": r.get("latest_days_since_recommendation"),
                    "close_reason": r.get("latest_close_reason") or "",
                    "review_note": r.get("latest_review_note") or "",
                    "track_date": r.get("latest_track_date") or "",
                } if r.get("latest_track_date") else None,
                "scan_session": r["scan_session"] or "",
                "created_at": created_at_str,
            }
            if date_str not in grouped:
                grouped[date_str] = {"scan": None, "recommendations": []}
            grouped[date_str]["recommendations"].append(rec_dict)

        # Flatten to a list, newest date first.
        result_list = []
        for date_str in sorted(grouped.keys(), reverse=True):
            group = grouped[date_str]
            recs = group["recommendations"]
            scan_meta = group["scan"]
            buy_count = sum(1 for r in recs if r["signal"] == "BUY")
            avg_score = round(sum(r["score"] for r in recs) / len(recs), 1) if recs else 0
            result_list.append({
                "date": date_str,
                "count": len(recs),
                "buy_count": buy_count,
                "avg_score": avg_score,
                "recommendations": recs,
                "scan_session": scan_meta.get("scan_session") if scan_meta else "",
                "scan_mode": scan_meta.get("scan_mode") if scan_meta else "",
                "scan_status": scan_meta.get("status") if scan_meta else "",
                "scanned_at": scan_meta.get("created_at") if scan_meta else "",
                "scan_summary": scan_meta.get("summary") if scan_meta else "",
                "scan_input_count": scan_meta.get("input_count") if scan_meta else 0,
                "scan_output_count": scan_meta.get("output_count") if scan_meta else len(recs),
                "scan_filtered_count": scan_meta.get("filtered_count") if scan_meta else 0,
                "elimination_reasons": scan_meta.get("elimination_reasons") if scan_meta else {},
            })
        return result_list
def _score_to_level_static(score: float) -> str:
    """Translate a numeric score into its recommendation level label.

    Thresholds are inclusive lower bounds checked from highest to lowest:
    75, 60, 45; anything below 45 falls into the lowest level.
    """
    tiers = ((75, "强烈推荐"), (60, "推荐"), (45, "关注"))
    for floor, label in tiers:
        if score >= floor:
            return label
    return "观望"
def _build_route_breakdown(details: list[dict]) -> list[dict]:
    """Aggregate win rate and average return per recall route tag.

    Every tag listed in an item's ``recall_tags`` receives that item's
    ``pct_from_entry`` (missing or falsy treated as 0); a strictly positive
    value counts as a win. Items without tags contribute nothing.

    Returns:
        One dict per tag (``route``, ``count``, ``win_rate``,
        ``avg_return``), sorted by ``count`` descending.
    """
    # tag -> [sample count, win count, summed return]
    tally: dict[str, list] = {}
    for entry in details:
        for route in entry.get("recall_tags", []) or []:
            counters = tally.setdefault(route, [0, 0, 0.0])
            counters[0] += 1
            gain = float(entry.get("pct_from_entry") or 0)
            counters[2] += gain
            if gain > 0:
                counters[1] += 1

    summary = [
        {
            "route": route,
            "count": samples,
            "win_rate": round(wins / samples * 100, 1),
            "avg_return": round(total / samples, 2),
        }
        for route, (samples, wins, total) in tally.items()
    ]
    summary.sort(key=lambda row: row["count"], reverse=True)
    return summary
def _build_prefilter_breakdown(details: list[dict]) -> list[dict]:
    """Aggregate win rate and average return per prefilter decision.

    Items are bucketed by ``prefilter_decision`` (missing or empty becomes
    ``"unknown"``). ``pct_from_entry`` is treated as 0 when missing or
    falsy; a strictly positive value counts as a win.

    Returns:
        One dict per decision (``decision``, ``count``, ``win_rate``,
        ``avg_return``), sorted by ``count`` descending.
    """
    # decision -> [sample count, win count, summed return]
    tally: dict[str, list] = {}
    for entry in details:
        decision = entry.get("prefilter_decision") or "unknown"
        counters = tally.setdefault(decision, [0, 0, 0.0])
        counters[0] += 1
        gain = float(entry.get("pct_from_entry") or 0)
        counters[2] += gain
        if gain > 0:
            counters[1] += 1

    summary = [
        {
            "decision": decision,
            "count": samples,
            "win_rate": round(wins / samples * 100, 1),
            "avg_return": round(total / samples, 2),
        }
        for decision, (samples, wins, total) in tally.items()
    ]
    summary.sort(key=lambda row: row["count"], reverse=True)
    return summary
async def _save_to_db(result: dict):
    """Persist one screening result: market temperature, sectors, picks.

    Steps, all within one database session and a single commit:
      1. Upsert the market-temperature snapshot when it has valid breadth
         data; otherwise log a warning and skip it.
      2. Replace the ``sector_heat`` rows of the snapshot's trade date.
      3. Replace today's rows for each qualifying recommendation
         (action plan is actionable / key-watch, or score >= 56).

    Args:
        result: Screening result with optional ``market_temp``,
            ``hot_sectors`` and ``recommendations`` entries.

    Any exception is logged (and stored through ``log_error``) and
    swallowed; the function never raises to its caller.
    """
    try:
        async with get_db() as db:
            from sqlalchemy import bindparam, text

            # --- market temperature ---
            mt = result.get("market_temp")
            if mt:
                if _has_valid_market_breadth(mt):
                    # INSERT OR REPLACE so that a repeated scan updates the row.
                    stmt = text(
                        "INSERT OR REPLACE INTO market_temperature "
                        "(trade_date, up_count, down_count, limit_up_count, limit_down_count, "
                        "max_streak, broken_rate, temperature) "
                        "VALUES (:td, :up, :down, :lu, :ld, :ms, :br, :temp)"
                    )
                    await db.execute(stmt, {
                        "td": mt.trade_date,
                        "up": mt.up_count,
                        "down": mt.down_count,
                        "lu": mt.limit_up_count,
                        "ld": mt.limit_down_count,
                        "ms": mt.max_streak,
                        "br": mt.broken_rate,
                        "temp": mt.temperature,
                    })
                else:
                    logger.warning(
                        "跳过无效市场温度快照: trade_date=%s temperature=%s up=%s down=%s",
                        mt.trade_date,
                        mt.temperature,
                        mt.up_count,
                        mt.down_count,
                    )

            # --- sector heat: clear the trade date's old rows, then bulk insert ---
            trade_date_val = mt.trade_date if mt else ""
            if trade_date_val:
                await db.execute(
                    text("DELETE FROM sector_heat WHERE trade_date = :td"),
                    {"td": trade_date_val},
                )
            # NOTE(review): sector rows are inserted even when trade_date_val
            # is empty; confirm that is the intended nesting.
            sector_values = [
                {
                    "sector_code": sector.sector_code,
                    "sector_name": sector.sector_name,
                    "board_type": sector.board_type,
                    "theme_id": sector.theme_id,
                    "theme_name": sector.theme_name,
                    "theme_aliases": json.dumps(sector.theme_aliases, ensure_ascii=False),
                    "pct_change": sector.pct_change,
                    "capital_inflow": sector.capital_inflow,
                    "limit_up_count": sector.limit_up_count,
                    "heat_score": sector.heat_score,
                    "stage": sector.stage,
                    "days_continuous": sector.days_continuous,
                    "member_count": sector.member_count,
                    "leading_stocks": json.dumps(sector.leading_stocks, ensure_ascii=False),
                    "pct_trend": json.dumps(sector.pct_trend, ensure_ascii=False),
                    "turnover_avg": sector.turnover_avg,
                    "main_force_ratio": sector.main_force_ratio,
                    "trade_date": trade_date_val,
                }
                for sector in result.get("hot_sectors", [])
            ]
            if sector_values:
                await db.execute(tables.sector_heat_table.insert(), sector_values)

            # --- recommendations: clear today's old rows, then bulk insert ---
            # One clock reading drives both the delete key and the inserted
            # created_at, so the two cannot disagree across midnight.
            now_dt = datetime.now()
            today_str = now_dt.strftime("%Y-%m-%d")
            qualified_recs = [
                rec for rec in result.get("recommendations", [])
                if (
                    rec.action_plan in {"可操作", "重点关注"}
                    or rec.score >= 56
                )
            ]
            if qualified_recs:
                # Bulk-delete today's existing rows for the same ts_codes.
                codes = [rec.ts_code for rec in qualified_recs]
                delete_stmt = text(
                    "DELETE FROM recommendations "
                    "WHERE date(created_at) = :today AND ts_code IN :codes"
                ).bindparams(bindparam("codes", expanding=True))
                await db.execute(
                    delete_stmt,
                    {"today": today_str, "codes": codes},
                )
                # Bulk-insert the fresh rows.
                rec_values = [
                    {
                        "ts_code": rec.ts_code,
                        "name": rec.name,
                        "sector": rec.sector,
                        "score": rec.score,
                        "market_temp_score": rec.market_temp_score,
                        "sector_score": rec.sector_score,
                        "capital_score": rec.capital_score,
                        "technical_score": rec.technical_score,
                        "supply_demand_score": rec.supply_demand_score,
                        "price_action_score": rec.price_action_score,
                        "position_score": rec.position_score,
                        "valuation_score": rec.valuation_score,
                        "signal": rec.signal,
                        "entry_price": rec.entry_price,
                        "target_price": rec.target_price,
                        "stop_loss": rec.stop_loss,
                        "reasons": json.dumps(rec.reasons, ensure_ascii=False),
                        "risk_note": rec.risk_note,
                        "action_plan": rec.action_plan,
                        "trigger_condition": rec.trigger_condition,
                        "invalidation_condition": rec.invalidation_condition,
                        "suggested_position_pct": rec.suggested_position_pct,
                        "review_after_days": rec.review_after_days,
                        "lifecycle_status": rec.lifecycle_status,
                        "data_freshness": rec.data_freshness,
                        "llm_analysis": rec.llm_analysis,
                        "strategy": rec.strategy,
                        "entry_signal_type": rec.entry_signal_type,
                        "entry_timing": rec.entry_timing,
                        "llm_score": rec.llm_score,
                        "recall_tags": json.dumps(rec.recall_tags, ensure_ascii=False),
                        "prefilter_decision": rec.prefilter_decision,
                        "prefilter_reason": rec.prefilter_reason,
                        "focus_points": json.dumps(rec.focus_points, ensure_ascii=False),
                        "decision_trace": json.dumps(rec.decision_trace, ensure_ascii=False),
                        "scan_session": rec.scan_session,
                        "created_at": now_dt,
                    }
                    for rec in qualified_recs
                ]
                await db.execute(tables.recommendations_table.insert(), rec_values)

            await db.commit()
            logger.info(
                f"已保存 {len(qualified_recs)} 条推荐到数据库"
                f"(共 {len(result.get('recommendations', []))} 条,过滤掉低优先级候选)"
            )
    except Exception as e:
        logger.error(f"保存推荐到数据库失败: {e}")
        from app.db.error_logger import log_error
        await log_error("recommender", f"保存推荐到数据库失败: {e}", detail=traceback.format_exc())
def _recommendation_from_db_row(r) -> Recommendation:
    """Build a ``Recommendation`` from one ``recommendations`` row mapping.

    Args:
        r: Mapping view of a result row (``row._mapping``). Columns read with
            ``[]`` must be present; columns read with ``.get`` may be absent
            (presumably they were added by later schema migrations - confirm).

    Returns:
        The populated ``Recommendation``. Falsy column values are replaced by
        the defaults visible below; note that ``position_score`` and
        ``valuation_score`` therefore turn a stored ``0`` into ``50``.
    """
    return Recommendation(
        ts_code=r["ts_code"],
        name=r["name"],
        sector=r["sector"] or "",
        score=r["score"] or 0,
        market_temp_score=r["market_temp_score"] or 0,
        sector_score=r["sector_score"] or 0,
        capital_score=r["capital_score"] or 0,
        technical_score=r["technical_score"] or 0,
        supply_demand_score=r.get("supply_demand_score") or 0,
        price_action_score=r.get("price_action_score") or 0,
        position_score=r.get("position_score") or 50,
        valuation_score=r.get("valuation_score") or 50,
        signal=r["signal"] or "HOLD",
        entry_price=r["entry_price"],
        target_price=r["target_price"],
        stop_loss=r["stop_loss"],
        # JSON columns go through the tolerant parsers so that one malformed
        # (or already-decoded) value cannot abort loading of every row.
        reasons=_safe_json_list_value(r["reasons"]),
        risk_note=r.get("risk_note") or "",
        entry_timing=r.get("entry_timing") or "",
        action_plan=r.get("action_plan") or "观察",
        trigger_condition=r.get("trigger_condition") or "",
        invalidation_condition=r.get("invalidation_condition") or "",
        suggested_position_pct=r.get("suggested_position_pct") or 0,
        review_after_days=r.get("review_after_days") or 3,
        lifecycle_status=r.get("lifecycle_status") or "candidate",
        data_freshness=r.get("data_freshness") or "",
        llm_analysis=r.get("llm_analysis") or "",
        strategy=r.get("strategy") or "trend_breakout",
        entry_signal_type=r.get("entry_signal_type") or "none",
        llm_score=r.get("llm_score"),
        recall_tags=_safe_json_list_value(r.get("recall_tags")),
        prefilter_decision=r.get("prefilter_decision") or "",
        prefilter_reason=r.get("prefilter_reason") or "",
        focus_points=_safe_json_list_value(r.get("focus_points")),
        # Rows without a stored trace get one synthesised from their columns.
        decision_trace=_safe_json_dict(r.get("decision_trace")) or _build_legacy_decision_trace(r),
        scan_session=r["scan_session"] or "",
        created_at=r["created_at"],
    )


async def _load_today_from_db() -> dict:
    """Load the recommendations of the latest completed scan from the database.

    Despite the name, the rows are selected by the date and session of the
    latest completed scan (as returned by ``_load_latest_completed_scan``),
    not by the current calendar day. When no completed scan exists, the most
    recent date that has any recommendation row is used instead.

    Returns:
        A dict with the keys ``market_temp`` (``MarketTemperature`` or
        ``None``), ``hot_sectors`` and ``capital_filtered`` (always empty
        lists here), ``recommendations`` (list of ``Recommendation`` ordered
        by score descending), ``strategy_profile``, ``latest_scan`` and
        ``scan_mode``. On any error the failure is logged, recorded through
        ``log_error``, and the same keys are returned with empty values.
    """
    try:
        async with get_db() as db:
            from sqlalchemy import text

            latest_scan = await _load_latest_completed_scan(db)
            latest_scan_date = (latest_scan or {}).get("date") or ""
            latest_scan_session = (latest_scan or {}).get("scan_session") or ""

            # Market temperature: rows with a non-zero up+down count sort
            # first, then the newest trade_date (dashes stripped so both date
            # spellings compare alike), then the highest id.
            result = await db.execute(
                text(
                    "SELECT * FROM market_temperature "
                    "ORDER BY CASE WHEN COALESCE(up_count, 0) + COALESCE(down_count, 0) > 0 THEN 0 ELSE 1 END, "
                    "REPLACE(trade_date, '-', '') DESC, id DESC LIMIT 1"
                )
            )
            mt_row = result.fetchone()
            market_temp = None
            if mt_row:
                m = mt_row._mapping
                market_temp = MarketTemperature(
                    trade_date=m["trade_date"],
                    up_count=m["up_count"],
                    down_count=m["down_count"],
                    limit_up_count=m["limit_up_count"],
                    limit_down_count=m["limit_down_count"],
                    max_streak=m["max_streak"],
                    broken_rate=m["broken_rate"],
                    temperature=m["temperature"],
                )

            if latest_scan:
                # Only the newest row (MAX(id)) per ts_code of the scan's
                # date/session is kept; an empty session string disables the
                # session filter.
                recommendation_sql = (
                    "SELECT * FROM recommendations "
                    "WHERE date(created_at) = :target_date "
                    "AND (:scan_session = '' OR scan_session = :scan_session) "
                    "AND (action_plan IN ('可操作', '重点关注') OR score >= 56) "
                    "AND ("
                    " COALESCE(recall_tags, '[]') LIKE '%hot_theme_core%' "
                    " OR COALESCE(recall_tags, '[]') LIKE '%theme_leader%' "
                    " OR COALESCE(recall_tags, '[]') LIKE '%top_theme_member%' "
                    " OR COALESCE(recall_tags, '[]') LIKE '%sector_recall%'"
                    ") "
                    "AND id IN ("
                    " SELECT MAX(id) FROM recommendations "
                    " WHERE date(created_at) = :target_date "
                    " AND (:scan_session = '' OR scan_session = :scan_session) "
                    " GROUP BY ts_code"
                    ") "
                    "ORDER BY score DESC"
                )
                result = await db.execute(
                    text(recommendation_sql),
                    {"target_date": latest_scan_date, "scan_session": latest_scan_session},
                )
            else:
                # Legacy-database compatibility: only when there is no scan
                # log do we fall back to the most recent date that has
                # recommendations.
                result = await db.execute(
                    text("SELECT * FROM recommendations "
                         "WHERE date(created_at) = (SELECT date(created_at) FROM recommendations ORDER BY created_at DESC LIMIT 1) "
                         "AND (action_plan IN ('可操作', '重点关注') OR score >= 56) "
                         "AND ("
                         " COALESCE(recall_tags, '[]') LIKE '%hot_theme_core%' "
                         " OR COALESCE(recall_tags, '[]') LIKE '%theme_leader%' "
                         " OR COALESCE(recall_tags, '[]') LIKE '%top_theme_member%' "
                         " OR COALESCE(recall_tags, '[]') LIKE '%sector_recall%'"
                         ") "
                         "AND id IN (SELECT MAX(id) FROM recommendations "
                         " WHERE date(created_at) = (SELECT date(created_at) FROM recommendations ORDER BY created_at DESC LIMIT 1) "
                         " GROUP BY ts_code) "
                         "ORDER BY score DESC")
                )

            recommendations = [
                _recommendation_from_db_row(row._mapping)
                for row in result.fetchall()
            ]

            # Prefer the profile stored for the scan session; otherwise derive
            # one from the strategy id of the top-scored recommendation.
            strategy_profile = None
            if latest_scan:
                strategy_profile = await _load_latest_strategy_profile_for_session(db, latest_scan_session)
            if not strategy_profile and recommendations:
                strategy_profile = get_strategy_profile_by_id(recommendations[0].strategy).model_dump()

            return {
                "market_temp": market_temp,
                "hot_sectors": [],
                "capital_filtered": [],
                "recommendations": recommendations,
                "strategy_profile": strategy_profile,
                "latest_scan": latest_scan,
                "scan_mode": (latest_scan or {}).get("scan_mode", "unknown"),
            }
    except Exception as e:
        logger.error(f"从数据库加载推荐失败: {e}")
        from app.db.error_logger import log_error
        await log_error("recommender", f"从数据库加载推荐失败: {e}", detail=traceback.format_exc())
        # Same key set as the success path so callers can index either result.
        return {
            "market_temp": None,
            "hot_sectors": [],
            "capital_filtered": [],
            "recommendations": [],
            "strategy_profile": None,
            "latest_scan": None,
            "scan_mode": "unknown",
        }
async def _load_sectors_from_db() -> list[SectorInfo]:
    """Load the most recent sector-heat snapshot from the database.

    Selects the rows of ``sector_heat`` whose ``trade_date`` (dashes stripped)
    is the maximum in the table, keeps only the newest row (``MAX(id)``) per
    ``sector_code``, and orders them by ``heat_score`` descending.

    Returns:
        A list of ``SectorInfo`` objects, each tagged ``source="snapshot"``.
        On any error the failure is logged, recorded through ``log_error``,
        and an empty list is returned.
    """
    try:
        async with get_db() as db:
            from sqlalchemy import text

            result = await db.execute(
                text(
                    "SELECT * FROM sector_heat "
                    "WHERE REPLACE(trade_date, '-', '') = ("
                    " SELECT MAX(REPLACE(trade_date, '-', '')) FROM sector_heat"
                    ") "
                    "AND id IN ("
                    " SELECT MAX(id) FROM sector_heat "
                    " WHERE REPLACE(trade_date, '-', '') = ("
                    " SELECT MAX(REPLACE(trade_date, '-', '')) FROM sector_heat"
                    " ) "
                    " GROUP BY sector_code"
                    ") "
                    "ORDER BY heat_score DESC"
                )
            )

            sectors = []
            for row in result.fetchall():
                r = row._mapping
                # JSON columns are parsed tolerantly: a missing, malformed or
                # non-list value becomes [] instead of raising, so one bad
                # row cannot discard the whole snapshot.
                leading_stocks = _safe_json_list_value(r.get("leading_stocks"))
                pct_trend = _safe_json_list_value(r.get("pct_trend"))
                theme_aliases = _safe_json_list_value(r.get("theme_aliases"))
                sectors.append(SectorInfo(
                    sector_code=r["sector_code"],
                    sector_name=r["sector_name"],
                    board_type=r.get("board_type") or "snapshot",
                    theme_id=r.get("theme_id") or "",
                    # The theme name falls back to the sector name when unset.
                    theme_name=r.get("theme_name") or r["sector_name"],
                    theme_aliases=theme_aliases,
                    trade_date=r.get("trade_date") or "",
                    pct_change=r["pct_change"] or 0,
                    capital_inflow=r["capital_inflow"] or 0,
                    limit_up_count=r["limit_up_count"] or 0,
                    days_continuous=r.get("days_continuous") or 0,
                    heat_score=r["heat_score"] or 0,
                    stage=r.get("stage") or "mid",
                    member_count=r.get("member_count") or 0,
                    leading_stocks=leading_stocks,
                    pct_trend=pct_trend,
                    turnover_avg=r.get("turnover_avg") or 0,
                    main_force_ratio=r.get("main_force_ratio") or 0,
                    source="snapshot",
                ))
            return sectors
    except Exception as e:
        logger.error(f"从数据库加载板块数据失败: {e}")
        from app.db.error_logger import log_error
        await log_error("recommender", f"从数据库加载板块数据失败: {e}", detail=traceback.format_exc())
        return []