diff --git a/backend/app/api/debug.py b/backend/app/api/debug.py index 6f96005d..0717c981 100644 --- a/backend/app/api/debug.py +++ b/backend/app/api/debug.py @@ -1,5 +1,6 @@ """Debug API — 系统日志与运行状态""" +import json import os from datetime import datetime, timedelta from fastapi import APIRouter, Depends @@ -33,7 +34,7 @@ async def get_errors( conditions.append("level = :level") params["level"] = level - where = " AND " + " AND ".join(conditions) + where = " AND ".join(conditions) # 总数 count_result = await db.execute( @@ -110,7 +111,7 @@ async def system_status(_admin: dict = Depends(get_current_admin)): tables_counts = {} for t in ["recommendations", "sector_heat", "market_temperature", "recommendation_tracking", "stock_diagnoses", - "error_logs", "users"]: + "error_logs", "scan_process_logs", "users"]: result = await db.execute(text(f"SELECT COUNT(*) FROM {t}")) tables_counts[t] = result.scalar() or 0 @@ -146,3 +147,354 @@ async def system_status(_admin: dict = Depends(get_current_admin)): "tables_counts": tables_counts, "db_size_mb": db_size_mb, } + + +def _decode_detail(raw: str | None) -> dict: + if not raw: + return {} + try: + parsed = json.loads(raw) + return parsed if isinstance(parsed, dict) else {"value": parsed} + except Exception: + return {"raw": raw} + + +def _row_observation(row) -> dict: + r = row._mapping + return { + "id": r["id"], + "scan_session": r["scan_session"], + "scan_mode": r["scan_mode"] or "", + "ts_code": r["ts_code"], + "name": r["name"], + "theme_name": r["theme_name"] or "", + "stock_role": r["stock_role"] or "", + "action_plan": r["action_plan"] or "观察", + "final_score": r["final_score"] or 0, + "catalyst_score": r["catalyst_score"] or 0, + "theme_money_score": r["theme_money_score"] or 0, + "stock_money_score": r["stock_money_score"] or 0, + "emotion_role_score": r["emotion_role_score"] or 0, + "timing_score": r["timing_score"] or 0, + "entry_signal_type": r["entry_signal_type"] or "none", + "elimination_reason": r["elimination_reason"] or "", + "detail": _decode_detail(r["detail_json"]), + "created_at": str(r["created_at"]) if r["created_at"] else "", + } + + +@router.get("/scan-logs") +async def get_scan_logs( + limit: int = 100, + scan_session: str = None, + days: int = 7, + _admin: dict = Depends(get_current_admin), +): + """获取筛选过程日志(管理员)""" + start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + limit = max(1, min(limit, 300)) + + async with get_db() as db: + selected_session = scan_session + if not selected_session: + latest = await db.execute( + text( + "SELECT scan_session FROM scan_process_logs " + "WHERE created_at >= :start " + "ORDER BY created_at DESC LIMIT 1" + ), + {"start": start}, + ) + selected_session = latest.scalar() + + if not selected_session: + return {"scan_session": None, "logs": []} + + result = await db.execute( + text( + "SELECT id, scan_session, scan_mode, stage, stage_label, status, " + "input_count, output_count, filtered_count, summary, detail_json, created_at " + "FROM scan_process_logs " + "WHERE created_at >= :start AND scan_session = :scan_session " + "ORDER BY created_at ASC LIMIT :limit" + ), + {"start": start, "scan_session": selected_session, "limit": limit}, + ) + rows = result.fetchall() + + logs = [] + for row in rows: + r = row._mapping + logs.append({ + "id": r["id"], + "scan_session": r["scan_session"], + "scan_mode": r["scan_mode"] or "", + "stage": r["stage"], + "stage_label": r["stage_label"], + "status": r["status"] or "ok", + "input_count": r["input_count"] or 0, + "output_count": r["output_count"] or 0, + "filtered_count": r["filtered_count"] or 0, + "summary": r["summary"] or "", + "detail": _decode_detail(r["detail_json"]), + "created_at": str(r["created_at"]) if r["created_at"] else "", + }) + + return { + "scan_session": selected_session, + "logs": logs, + } + + +@router.get("/research-observations") +async def get_research_observations( + scan_session: str = None, + limit: int = 80, + days: int = 7, + _admin: dict = Depends(get_current_admin), +): + """获取候选股投研观察记录(管理员)""" + start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + limit = max(1, min(limit, 200)) + + async with get_db() as db: + selected_session = scan_session + if not selected_session: + latest = await db.execute( + text( + "SELECT scan_session FROM research_observations " + "WHERE created_at >= :start ORDER BY created_at DESC LIMIT 1" + ), + {"start": start}, + ) + selected_session = latest.scalar() + + if not selected_session: + return {"scan_session": None, "observations": [], "reason_counts": {}} + + result = await db.execute( + text( + "SELECT id, scan_session, scan_mode, ts_code, name, theme_name, stock_role, " + "action_plan, final_score, catalyst_score, theme_money_score, stock_money_score, " + "emotion_role_score, timing_score, entry_signal_type, elimination_reason, detail_json, created_at " + "FROM research_observations " + "WHERE created_at >= :start AND scan_session = :scan_session " + "ORDER BY final_score DESC LIMIT :limit" + ), + {"start": start, "scan_session": selected_session, "limit": limit}, + ) + rows = result.fetchall() + + observations = [_row_observation(row) for row in rows] + reason_counts = {} + for item in observations: + for part in (item["elimination_reason"] or "未知").split(";"): + if not part: + continue + reason_counts[part] = reason_counts.get(part, 0) + 1 + + return { + "scan_session": selected_session, + "observations": observations, + "reason_counts": reason_counts, + } + + +@router.get("/scan-sessions") +async def get_scan_sessions( + days: int = 7, + limit: int = 30, + _admin: dict = Depends(get_current_admin), +): + """获取筛选会话摘要(管理员)""" + start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + limit = max(1, min(limit, 100)) + + async with get_db() as db: + result = await db.execute( + text( + "SELECT scan_session, scan_mode, stage, status, input_count, " + "output_count, filtered_count, summary, created_at " + "FROM scan_process_logs " + "WHERE created_at >= :start " + "ORDER BY created_at DESC LIMIT 1000" + ), + {"start": start}, + ) + rows = result.fetchall() + + sessions = {} + order = [] + for row in rows: + r = row._mapping + session_id = r["scan_session"] + if session_id not in sessions: + sessions[session_id] = { + "scan_session": session_id, + "scan_mode": r["scan_mode"] or "", + "created_at": str(r["created_at"]) if r["created_at"] else "", + "stage_count": 0, + "status": "ok", + "input_count": 0, + "final_count": 0, + "drop_count": 0, + "last_summary": r["summary"] or "", + } + order.append(session_id) + + item = sessions[session_id] + item["stage_count"] += 1 + item["input_count"] = max(item["input_count"], int(r["input_count"] or 0)) + item["drop_count"] += int(r["filtered_count"] or 0) + if r["stage"] == "final_filter" and item["final_count"] == 0: + item["final_count"] = int(r["output_count"] or 0) + item["last_summary"] = r["summary"] or item["last_summary"] + + status = (r["status"] or "ok").lower() + if status in {"failed", "error", "critical"}: + item["status"] = "failed" + elif status in {"warning", "empty"} and item["status"] == "ok": + item["status"] = status + + return { + "sessions": [sessions[sid] for sid in order[:limit]], + } + + +@router.get("/data-source-health") +async def get_data_source_health( + days: int = 7, + _admin: dict = Depends(get_current_admin), +): + """数据源健康摘要(管理员,只读)。""" + start = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + known_sources = ["eastmoney", "tencent", "tushare", "akshare", "sina", "news", "tushare_news"] + health = { + source: { + "source": source, + "status": "ok", + "error_count": 0, + "warning_count": 0, + "last_error": "", + "last_seen_at": "", + } + for source in known_sources + } + + async with get_db() as db: + result = await db.execute( + text( + "SELECT source, level, message, created_at FROM error_logs " + "WHERE created_at >= :start " + "ORDER BY created_at DESC LIMIT 500" + ), + {"start": start}, + ) + rows = result.fetchall() + + table_rows = {} + for table_name, sql in { + "market_temperature": "SELECT trade_date, created_at FROM market_temperature ORDER BY id DESC LIMIT 1", + "sector_heat": "SELECT trade_date, created_at FROM sector_heat ORDER BY id DESC LIMIT 1", + "recommendations": "SELECT created_at FROM recommendations ORDER BY id DESC LIMIT 1", + "news_items": "SELECT created_at FROM news_items ORDER BY id DESC LIMIT 1", + "catalysts": "SELECT created_at FROM catalysts ORDER BY id DESC LIMIT 1", + }.items(): + row = (await db.execute(text(sql))).fetchone() + table_rows[table_name] = dict(row._mapping) if row else {} + + for row in rows: + r = row._mapping + source_text = str(r["source"] or "").lower() + matched = next((source for source in known_sources if source in source_text), source_text or "unknown") + if matched not in health: + health[matched] = { + "source": matched, + "status": "ok", + "error_count": 0, + "warning_count": 0, + "last_error": "", + "last_seen_at": "", + } + item = health[matched] + level_text = str(r["level"] or "error").lower() + if level_text in {"warning", "warn"}: + item["warning_count"] += 1 + if item["status"] == "ok": + item["status"] = "warning" + else: + item["error_count"] += 1 + item["status"] = "error" + if not item["last_error"]: + item["last_error"] = r["message"] or "" + item["last_seen_at"] = str(r["created_at"] or "") + + return { + "days": days, + "sources": sorted(health.values(), key=lambda item: (item["status"] != "error", item["status"] != "warning", item["source"])), + "freshness": { + key: {k: str(v or "") for k, v in value.items()} + for key, value in table_rows.items() + }, + "generated_at": datetime.now().isoformat(), + } + + +@router.get("/tasks") +async def get_tasks(_admin: dict = Depends(get_current_admin)): + """后台任务中心摘要(管理员,只读)。""" + from app.engine.scheduler import scheduler + from app.engine.recommender import _scan_running, _scan_lock + + jobs = [] + for job in scheduler.get_jobs(): + jobs.append({ + "id": job.id, + "name": job.name, + "next_run_time": str(job.next_run_time) if job.next_run_time else "", + "trigger": str(job.trigger), + }) + + async with get_db() as db: + recent_scan = await db.execute( + text( + "SELECT scan_session, scan_mode, stage, status, output_count, summary, created_at " + "FROM scan_process_logs ORDER BY created_at DESC LIMIT 12" + ) + ) + recent_errors = await db.execute( + text( + "SELECT source, level, message, created_at FROM error_logs " + "ORDER BY created_at DESC LIMIT 8" + ) + ) + + return { + "scheduler_running": scheduler.running, + "scan_running": _scan_running, + "scan_locked": _scan_lock.locked(), + "job_count": len(jobs), + "jobs": sorted(jobs, key=lambda item: item["next_run_time"] or "9999"), + "recent_scan_logs": [ + { + "scan_session": r._mapping["scan_session"], + "scan_mode": r._mapping["scan_mode"] or "", + "stage": r._mapping["stage"], + "status": r._mapping["status"] or "ok", + "output_count": r._mapping["output_count"] or 0, + "summary": r._mapping["summary"] or "", + "created_at": str(r._mapping["created_at"] or ""), + } + for r in recent_scan.fetchall() + ], + "recent_errors": [ + { + "source": r._mapping["source"], + "level": r._mapping["level"], + "message": r._mapping["message"], + "created_at": str(r._mapping["created_at"] or ""), + } + for r in recent_errors.fetchall() + ], + "generated_at": datetime.now().isoformat(), + } diff --git a/backend/app/db/database.py b/backend/app/db/database.py index 5c00bcd2..f556f3ff 100644 --- a/backend/app/db/database.py +++ b/backend/app/db/database.py @@ -113,6 +113,11 @@ async def init_db(): "CREATE UNIQUE INDEX IF NOT EXISTS idx_news_items_dedup_key ON news_items(dedup_key)", "CREATE INDEX IF NOT EXISTS idx_news_items_status_time ON news_items(status, published_at)", "CREATE INDEX IF NOT EXISTS idx_catalysts_source_url ON catalysts(source, url)", + "CREATE INDEX IF NOT EXISTS idx_scan_process_session_time ON scan_process_logs(scan_session, created_at)", + "CREATE INDEX IF NOT EXISTS idx_scan_process_stage_time ON scan_process_logs(stage, created_at)", + "CREATE INDEX IF NOT EXISTS idx_research_observations_session_score ON research_observations(scan_session, final_score)", + "CREATE INDEX IF NOT EXISTS idx_research_observations_code_time ON research_observations(ts_code, created_at)", + "CREATE INDEX IF NOT EXISTS idx_research_observations_theme_time ON research_observations(theme_name, created_at)", ]: try: await conn.execute(__import__("sqlalchemy").text(index_sql)) diff --git a/backend/app/db/research_logger.py b/backend/app/db/research_logger.py new file mode 100644 index 00000000..816face8 --- /dev/null +++ b/backend/app/db/research_logger.py @@ -0,0 +1,56 @@ +"""投研观察记录。""" + +from __future__ import annotations + +import json +from datetime import datetime +from typing import Any + +from app.db.database import get_db +from app.db import tables + + +def _safe_json(data: dict[str, Any] | None) -> str: + if not data: + return "{}" + try: + return json.dumps(data, ensure_ascii=False, default=str) + except Exception: + return "{}" + + +async def save_research_observations(observations: list[dict[str, Any]]) -> None: + """批量保存本轮候选股投研观察。 + + 记录失败不能影响筛选主流程。 + """ + if not observations: + return + try: + values = [] + now = datetime.now() + for item in observations: + values.append({ + "scan_session": item.get("scan_session") or "manual", + "scan_mode": item.get("scan_mode") or "", + "ts_code": item.get("ts_code") or "", + "name": item.get("name") or item.get("ts_code") or "", + "theme_name": item.get("theme_name") or "", + "stock_role": item.get("stock_role") or "", + "action_plan": item.get("action_plan") or "观察", + "final_score": float(item.get("final_score") or 0), + "catalyst_score": float(item.get("catalyst_score") or 0), + "theme_money_score": float(item.get("theme_money_score") or 0), + "stock_money_score": float(item.get("stock_money_score") or 0), + "emotion_role_score": float(item.get("emotion_role_score") or 0), + "timing_score": float(item.get("timing_score") or 0), + "entry_signal_type": item.get("entry_signal_type") or "none", + "elimination_reason": item.get("elimination_reason") or "", + "detail_json": _safe_json(item.get("detail")), + "created_at": now, + }) + async with get_db() as db: + await db.execute(tables.research_observations_table.insert(), values) + await db.commit() + except Exception: + pass diff --git a/backend/app/db/scan_logger.py b/backend/app/db/scan_logger.py new file mode 100644 index 00000000..ce31590b --- /dev/null +++ b/backend/app/db/scan_logger.py @@ -0,0 +1,60 @@ +"""筛选过程日志持久化。""" + +from __future__ import annotations + +import json +from datetime import datetime +from typing import Any + +from app.db.database import get_db +from app.db import tables + + +def _safe_json(data: dict[str, Any] | None) -> str: + if not data: + return "{}" + try: + return json.dumps(data, ensure_ascii=False, default=str) + except Exception: + return "{}" + + +async def log_scan_stage( + *, + scan_session: str, + scan_mode: str, + stage: str, + stage_label: str, + input_count: int = 0, + output_count: int = 0, + filtered_count: int | None = None, + status: str = "ok", + summary: str = "", + detail: dict[str, Any] | None = None, +) -> None: + """记录筛选漏斗的一关。 + + 这是观测能力,不能反过来影响筛选主流程,所以所有异常都会被吞掉。 + """ + try: + if filtered_count is None: + filtered_count = max(int(input_count or 0) - int(output_count or 0), 0) + async with get_db() as db: + await db.execute( + tables.scan_process_logs_table.insert().values( + scan_session=scan_session or "manual", + scan_mode=scan_mode or "", + stage=stage, + stage_label=stage_label, + status=status, + input_count=int(input_count or 0), + output_count=int(output_count or 0), + filtered_count=int(filtered_count or 0), + summary=summary or "", + detail_json=_safe_json(detail), + created_at=datetime.now(), + ) + ) + await db.commit() + except Exception: + pass diff --git a/backend/app/db/tables.py b/backend/app/db/tables.py index 1245dcd3..8bce7662 100644 --- a/backend/app/db/tables.py +++ b/backend/app/db/tables.py @@ -197,6 +197,44 @@ error_logs_table = Table( Column("created_at", DateTime, server_default=func.now()), ) +scan_process_logs_table = Table( + "scan_process_logs", metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("scan_session", Text, nullable=False), + Column("scan_mode", Text, default=""), + Column("stage", Text, nullable=False), + Column("stage_label", Text, nullable=False), + Column("status", Text, default="ok"), + Column("input_count", Integer, default=0), + Column("output_count", Integer, default=0), + Column("filtered_count", Integer, default=0), + Column("summary", Text, default=""), + Column("detail_json", Text, default="{}"), + Column("created_at", DateTime, server_default=func.now()), +) + +research_observations_table = Table( + "research_observations", metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("scan_session", Text, nullable=False), + Column("scan_mode", Text, default=""), + Column("ts_code", Text, nullable=False), + Column("name", Text, nullable=False), + Column("theme_name", Text, default=""), + Column("stock_role", Text, default=""), + Column("action_plan", Text, default="观察"), + Column("final_score", Float, default=0), + Column("catalyst_score", Float, default=0), + Column("theme_money_score", Float, default=0), + Column("stock_money_score", Float, default=0), + Column("emotion_role_score", Float, default=0), + Column("timing_score", Float, default=0), + Column("entry_signal_type", Text, default="none"), + Column("elimination_reason", Text, default=""), + Column("detail_json", Text, default="{}"), + Column("created_at", DateTime, server_default=func.now()), +) + strategy_configs_table = Table( "strategy_configs", metadata, Column("id", Integer, primary_key=True, autoincrement=True), diff --git a/backend/app/engine/recommender.py b/backend/app/engine/recommender.py index e695ee97..50e6979c 100644 --- a/backend/app/engine/recommender.py +++ b/backend/app/engine/recommender.py @@ -98,7 +98,7 @@ async def refresh_recommendations(trade_date: str = None, scan_session: str = "m try: # run_screening 内部混合了大量同步行情请求和 pandas 计算, # 若直接在主事件循环执行,会导致页面读接口和 WebSocket 被拖住。 - result = await _run_async_in_worker(run_screening, trade_date) + result = await _run_async_in_worker(run_screening, trade_date, scan_session=scan_session) # 给每条推荐添加 scan_session for rec in result.get("recommendations", []): diff --git a/backend/app/engine/screener.py b/backend/app/engine/screener.py index ad5003d9..5c95cccf 100644 --- a/backend/app/engine/screener.py +++ b/backend/app/engine/screener.py @@ -40,6 +40,8 @@ from app.config import settings, should_prefer_realtime_today from app.data.tushare_client import tushare_client from app.llm.strategy_selector import StrategyProfile, select_strategy_profile from app.catalyst.service import build_theme_catalyst_scores +from app.db.scan_logger import log_scan_stage +from app.db.research_logger import save_research_observations logger = logging.getLogger(__name__) @@ -49,7 +51,7 @@ def _is_main_theme_recommendation(rec: Recommendation) -> bool: return bool(tags & {"hot_theme_core", "theme_leader", "top_theme_member", "sector_recall"}) -async def run_screening(trade_date: str = None) -> dict: +async def run_screening(trade_date: str = None, scan_session: str = "manual") -> dict: """执行趋势突破筛选流程 返回: { @@ -79,6 +81,24 @@ async def run_screening(trade_date: str = None) -> dict: logger.info(f"市场温度: {market_temp.temperature}") market_temp_score = market_temp.temperature + await log_scan_stage( + scan_session=scan_session, + scan_mode=scan_mode, + stage="market_temperature", + stage_label="市场温度", + input_count=(market_temp.up_count or 0) + (market_temp.down_count or 0), + output_count=1, + filtered_count=0, + summary=f"市场温度 {market_temp.temperature:.1f},上涨{market_temp.up_count or 0}家,下跌{market_temp.down_count or 0}家", + detail={ + "temperature": market_temp.temperature, + "up_count": market_temp.up_count, + "down_count": market_temp.down_count, + "limit_up_count": market_temp.limit_up_count, + "limit_down_count": market_temp.limit_down_count, + "intraday": intraday, + }, + ) # ── Step 1: 主线主题定位 ── logger.info("=== Step 1: 主线主题定位 ===") @@ -97,6 +117,30 @@ async def run_screening(trade_date: str = None) -> dict: hot_sectors = all_themes[:settings.top_sector_count] hot_sectors = await _apply_catalyst_scores(hot_sectors) + await log_scan_stage( + scan_session=scan_session, + scan_mode=scan_mode, + stage="theme_selection", + stage_label="主线主题", + input_count=len(all_themes), + output_count=len(hot_sectors), + summary=f"从 {len(all_themes)} 个主题中保留 {len(hot_sectors)} 条主线", + detail={ + "themes": [ + { + "name": s.sector_name, + "heat_score": s.heat_score, + "pct_change": s.realtime_pct_change if s.realtime_pct_change is not None else s.pct_change, + "capital_inflow": s.capital_inflow, + "limit_up_count": s.limit_up_count, + "stage": s.stage, + "catalyst_score": getattr(s, "catalyst_score", 0), + "catalyst_count": getattr(s, "catalyst_count", 0), + } + for s in hot_sectors[:10] + ], + }, + ) for s in hot_sectors: logger.info(f" 目标主题: {s.sector_name} 涨幅{s.pct_change}% 资金{s.capital_inflow:.0f}万 " @@ -111,18 +155,53 @@ async def run_screening(trade_date: str = None) -> dict: f"=== 今日策略: {strategy_profile.name} ({strategy_profile.strategy_id}) " f"threshold={strategy_profile.buy_threshold} min_score={strategy_profile.min_score} ===" ) + await log_scan_stage( + scan_session=scan_session, + scan_mode=scan_mode, + stage="strategy_profile", + stage_label="策略参数", + input_count=len(hot_sectors), + output_count=1, + filtered_count=0, + summary=f"{strategy_profile.name}: 买入线 {strategy_profile.buy_threshold},保留线 {strategy_profile.min_score}", + detail=strategy_profile.model_dump(), + ) # ── Step 2: 多路召回构建候选池 ── logger.info("=== Step 2: 多路召回候选池 ===") + candidate_metrics: dict = {} candidates = await _build_candidate_pool( hot_sectors=hot_sectors, trade_date=trade_date, intraday=intraday, market_temp=market_temp, + metrics=candidate_metrics, + ) + await log_scan_stage( + scan_session=scan_session, + scan_mode=scan_mode, + stage="candidate_recall", + stage_label="候选召回", + input_count=len(hot_sectors), + output_count=len(candidates), + filtered_count=max(int(candidate_metrics.get("merged_count", 0) or 0) - len(candidates), 0), + summary=f"多路召回合并后进入规则评分 {len(candidates)} 只", + detail=candidate_metrics, ) if not candidates: logger.info("=== 筛选完成: 0 只股票 ===") + await log_scan_stage( + scan_session=scan_session, + scan_mode=scan_mode, + stage="final_filter", + stage_label="最终作战池", + input_count=0, + output_count=0, + filtered_count=0, + status="empty", + summary="候选池为空,本轮没有形成推荐", + ) return { "market_temp": market_temp, "hot_sectors": hot_sectors, @@ -132,6 +211,9 @@ async def run_screening(trade_date: str = None) -> dict: # ── Step 3 之前:注入腾讯实时价格(防止 Tushare 日线数据过时) ── if candidates: + quote_requested = len([c for c in candidates if "ts_code" in c]) + quote_updated = 0 + quote_error = "" try: from app.data.tencent_client import get_realtime_quotes_batch codes = [c["ts_code"] for c in candidates if "ts_code" in c] @@ -140,19 +222,56 @@ async def run_screening(trade_date: str = None) -> dict: q = quotes.get(c["ts_code"]) if q and q.price > 0: c["price"] = q.price + quote_updated += 1 except Exception as e: + quote_error = str(e) logger.warning(f"注入实时价格失败,使用 Tushare 收盘价: {e}") + await log_scan_stage( + scan_session=scan_session, + scan_mode=scan_mode, + stage="realtime_quote", + stage_label="实时行情校准", + input_count=quote_requested, + output_count=quote_updated, + status="warning" if quote_error else "ok", + summary=f"实时行情更新 {quote_updated}/{quote_requested} 只", + detail={"requested": quote_requested, "updated": quote_updated, "error": quote_error}, + ) # ── Step 3: 规则评分与交易计划 ── logger.info("=== Step 3: 规则评分与交易计划 ===") + scoring_metrics: dict = {} + research_observations: list[dict] = [] recommendations = await _build_recommendations( - candidates, market_temp, hot_sectors, market_temp_score, intraday, strategy_profile, + candidates, + market_temp, + hot_sectors, + market_temp_score, + intraday, + strategy_profile, + metrics=scoring_metrics, + research_observations=research_observations, + scan_session=scan_session, + scan_mode=scan_mode, + ) + await log_scan_stage( + scan_session=scan_session, + scan_mode=scan_mode, + stage="rule_scoring", + stage_label="规则评分", + input_count=len(candidates), + output_count=len(recommendations), + summary=f"完成 {scoring_metrics.get('analyzed_count', len(candidates))} 只规则评分,生成 {len(recommendations)} 个交易计划", + detail=scoring_metrics, ) + before_final_filter = len(recommendations) + final_filter_reasons = _build_final_filter_reasons(recommendations, strategy_profile) recommendations = [ r for r in recommendations if _is_main_theme_recommendation(r) and r.score >= strategy_profile.min_score ] + after_theme_filter = len(recommendations) recommendations = _finalize_battle_plan( recommendations=recommendations, @@ -160,6 +279,45 @@ async def run_screening(trade_date: str = None) -> dict: market_temp=market_temp, strategy_profile=strategy_profile, ) + action_counts = {"可操作": 0, "重点关注": 0, "观察": 0} + for rec in recommendations: + action_counts[rec.action_plan] = action_counts.get(rec.action_plan, 0) + 1 + final_codes = {rec.ts_code for rec in recommendations} + _apply_final_research_outcomes( + observations=research_observations, + final_codes=final_codes, + final_filter_reasons=final_filter_reasons, + min_score=strategy_profile.min_score, + ) + await save_research_observations(research_observations) + await log_scan_stage( + scan_session=scan_session, + scan_mode=scan_mode, + stage="final_filter", + stage_label="最终作战池", + input_count=before_final_filter, + output_count=len(recommendations), + filtered_count=max(before_final_filter - len(recommendations), 0), + status="empty" if len(recommendations) == 0 else "ok", + summary=f"主线与分数过滤后保留 {after_theme_filter} 只,最终作战池 {len(recommendations)} 只", + detail={ + "before_final_filter": before_final_filter, + "after_theme_score_filter": after_theme_filter, + "final_count": len(recommendations), + "action_counts": action_counts, + "elimination_reasons": _count_elimination_reasons(research_observations), + "top": [ + { + "ts_code": r.ts_code, + "name": r.name, + "score": r.score, + "action_plan": r.action_plan, + "entry_signal_type": r.entry_signal_type, + } + for r in recommendations[:10] + ], + }, + ) logger.info(f"=== 筛选完成: {len(recommendations)} 只股票 ({scan_mode}) ===") for r in recommendations[:5]: @@ -371,6 +529,7 @@ async def _build_candidate_pool( trade_date: str | None, intraday: bool, market_temp: MarketTemperature, + metrics: dict | None = None, ) -> list[dict]: """多路召回候选池。 @@ -408,6 +567,7 @@ async def _build_candidate_pool( realtime_candidates = [] _merge_candidate_batch(merged, realtime_candidates, route="realtime_market") else: + intraday_candidates = [] realtime_candidates = [] candidates = list(merged.values()) @@ -425,6 +585,31 @@ async def _build_candidate_pool( f"{'intraday=' + str(len(intraday_candidates)) + ' realtime=' + str(len(realtime_candidates)) if intraday else ''} " f"→ merged={len(top)}" ) + if metrics is not None: + route_counts = { + "sector_recall": len(sector_candidates), + "trend_scan": len(trend_candidates), + "intraday_active": len(intraday_candidates), + "realtime_market": len(realtime_candidates), + } + metrics.update({ + "route_counts": route_counts, + "raw_total": sum(route_counts.values()), + "merged_count": len(candidates), + "pool_limit": settings.candidate_pool_limit, + "output_count": len(top), + "deduplicated_count": max(sum(route_counts.values()) - len(candidates), 0), + "top_candidates": [ + { + "ts_code": item.get("ts_code"), + "name": item.get("name"), + "sector": item.get("sector"), + "recall_score": item.get("recall_score"), + "recall_tags": item.get("recall_tags", []), + } + for item in top[:10] + ], + }) return top @@ -595,6 +780,10 @@ async def _build_recommendations( market_temp_score: float = 0, intraday: bool = False, strategy_profile=None, + metrics: dict | None = None, + research_observations: list[dict] | None = None, + scan_session: str = "manual", + scan_mode: str = "", ) -> list[Recommendation]: """Step 3: 规则边界建模、评分与交易计划生成。""" from app.data.tushare_client import tushare_client @@ -618,6 +807,7 @@ async def _build_recommendations( recommendations = [] total = len(candidates) + skipped_counts = {"missing_code": 0, "kline_empty": 0, "stale_kline": 0, "exception": 0} signal_counts = {"breakout": 0, "breakout_confirm": 0, "pullback": 0, "launch": 0, "reversal": 0, "none": 0} score_weights = strategy_profile.score_weights if strategy_profile else { "catalyst": 0.30, @@ -633,6 +823,7 @@ async def _build_recommendations( for idx, stock in enumerate(candidates): ts_code = stock.get("ts_code", "") if not ts_code: + skipped_counts["missing_code"] += 1 continue name = stock.get("name") or name_map.get(ts_code, ts_code) @@ -642,6 +833,7 @@ async def _build_recommendations( # 获取 120 日 K 线 df = tushare_client.get_stock_daily(ts_code, 120) if df.empty or len(df) < 30: + skipped_counts["kline_empty"] += 1 continue # 数据新鲜度校验:最后一行必须是近 10 天内的数据 @@ -650,6 +842,7 @@ async def _build_recommendations( cutoff = (datetime.now() - timedelta(days=10)).strftime("%Y%m%d") if last_date < cutoff: logger.warning(f"K线数据过时 {ts_code}: 最新={last_date}, 需≥{cutoff}, 跳过") + skipped_counts["stale_kline"] += 1 continue # 添加技术指标 @@ -979,9 +1172,29 @@ async def _build_recommendations( decision_trace=decision_trace, ) recommendations.append(rec) + if research_observations is not None: + research_observations.append(_build_research_observation( + scan_session=scan_session, + scan_mode=scan_mode, + stock=stock, + rec=rec, + scoring_axes=scoring_axes, + flow_momentum_score=flow_momentum_score, + entry_signal_score=entry_signal.get("signal_score", 0), + sector_stage=sector_stage, + sector_limit_up=sector_limit_up, + catalyst_reasons=catalyst_reasons, + hot_theme_match=hot_theme_match, + market_temp=market_temp, + score_weights=score_weights, + boosts=boosts, + penalties=penalty_notes, + risk_tags=risk_tags, + )) except Exception as e: logger.debug(f"规则分析 {ts_code} 失败: {e}") + skipped_counts["exception"] += 1 continue logger.info( @@ -993,6 +1206,29 @@ async def _build_recommendations( ) recommendations.sort(key=lambda rec: rec.score, reverse=True) + if metrics is not None: + action_counts = {"可操作": 0, "重点关注": 0, "观察": 0} + for rec in recommendations: + action_counts[rec.action_plan] = action_counts.get(rec.action_plan, 0) + 1 + metrics.update({ + "input_count": total, + "analyzed_count": total - sum(skipped_counts.values()), + "output_count": len(recommendations), + "skipped_counts": skipped_counts, + "signal_counts": signal_counts, + "action_counts_before_final_filter": action_counts, + "score_top": [ + { + "ts_code": rec.ts_code, + "name": rec.name, + "sector": rec.sector, + "score": rec.score, + "action_plan": rec.action_plan, + "entry_signal_type": rec.entry_signal_type, + } + for rec in recommendations[:10] + ], + }) return recommendations @@ -1691,6 +1927,156 @@ def _score_to_level(score: float) -> str: return "回避" +def _derive_stock_role(stock: dict, hot_theme_match: SectorInfo | None) -> str: + tags = set(stock.get("recall_tags", []) or []) + if "theme_leader" in tags: + return "龙头/前排" + if "top_theme_member" in tags: + return "主题前排" + if "intraday_active" in tags or "realtime_active" in tags or "realtime_market" in tags: + return "盘中异动" + if hot_theme_match: + return "主线成分" + return "观察候选" + + +def _build_research_observation( + *, + scan_session: str, + scan_mode: str, + stock: dict, + rec: Recommendation, + scoring_axes: dict[str, float], + flow_momentum_score: float, + entry_signal_score: float, + sector_stage: str, + sector_limit_up: int, + catalyst_reasons: list[str], + hot_theme_match: SectorInfo | None, + market_temp: MarketTemperature, + score_weights: dict[str, float], + boosts: list[dict], + penalties: list[dict], + risk_tags: list[str], +) -> dict: + theme_name = hot_theme_match.sector_name if hot_theme_match else rec.sector + stock_role = _derive_stock_role(stock, hot_theme_match) + detail = { + "market": { + "temperature": round(market_temp.temperature, 1), + "up_count": market_temp.up_count, + "down_count": market_temp.down_count, + "limit_up_count": market_temp.limit_up_count, + "broken_rate": market_temp.broken_rate, + }, + "theme": { + "name": theme_name, + "matched": bool(hot_theme_match), + "stage": sector_stage, + "limit_up_count": sector_limit_up, + "heat_score": rec.sector_score, + "catalyst_reasons": catalyst_reasons[:3], + }, + "stock": { + "role": stock_role, + "recall_score": stock.get("recall_score", 0), + "recall_tags": stock.get("recall_tags", []), + "main_net_inflow": stock.get("main_net_inflow", 0), + "inflow_ratio": stock.get("inflow_ratio", 0), + "turnover_rate": stock.get("turnover_rate", 0), + "volume_ratio": stock.get("volume_ratio"), + "circ_mv": stock.get("circ_mv"), + }, + "scores": { + "weights": score_weights, + "axes": scoring_axes, + "flow_momentum": flow_momentum_score, + "entry_signal_score": entry_signal_score, + "final_score": rec.score, + }, + "decision": { + "action_plan": rec.action_plan, + "signal": rec.signal, + "entry_signal_type": rec.entry_signal_type, + "trigger_condition": rec.trigger_condition, + "invalidation_condition": rec.invalidation_condition, + "risk_note": rec.risk_note, + "boosts": boosts[:4], + "penalties": penalties[:4], + "risk_tags": risk_tags, + }, + } + return { + "scan_session": scan_session, + "scan_mode": scan_mode, + "ts_code": rec.ts_code, + "name": rec.name, + "theme_name": theme_name, + "stock_role": stock_role, + "action_plan": rec.action_plan, + "final_score": rec.score, + "catalyst_score": scoring_axes.get("catalyst", 0), + "theme_money_score": scoring_axes.get("theme_money", 0), + "stock_money_score": scoring_axes.get("stock_money", 0), + "emotion_role_score": scoring_axes.get("emotion_role", 0), + "timing_score": scoring_axes.get("timing", 0), + "entry_signal_type": rec.entry_signal_type, + "elimination_reason": "", + "detail": detail, + } + + +def _build_final_filter_reasons( + recommendations: list[Recommendation], + strategy_profile: StrategyProfile, +) -> dict[str, str]: + reasons = {} + for rec in recommendations: + reason_parts = [] + if not _is_main_theme_recommendation(rec): + reason_parts.append("非主线候选") + if rec.score < strategy_profile.min_score: + reason_parts.append(f"低于保留线{strategy_profile.min_score:.0f}") + if rec.action_plan == "观察": + reason_parts.append("仅观察档") + elif rec.action_plan == "重点关注": + reason_parts.append("关注未入最终池") + elif rec.action_plan == "可操作": + reason_parts.append("可操作但名额/风控限制") + reasons[rec.ts_code] = ";".join(reason_parts) or "最终名额限制" + return reasons + + +def _apply_final_research_outcomes( + *, + observations: list[dict], + final_codes: set[str], + final_filter_reasons: dict[str, str], + min_score: float, +) -> None: + for item in observations: + ts_code = item.get("ts_code", "") + if ts_code in final_codes: + item["elimination_reason"] = "进入最终作战池" + item.setdefault("detail", {}).setdefault("decision", {})["final_outcome"] = "kept" + continue + reason = final_filter_reasons.get(ts_code) or f"未达到保留线{min_score:.0f}" + item["elimination_reason"] = reason + item.setdefault("detail", {}).setdefault("decision", {})["final_outcome"] = "filtered" + item["detail"]["decision"]["elimination_reason"] = reason + + +def _count_elimination_reasons(observations: list[dict]) -> dict[str, int]: + counts: dict[str, int] = {} + for item in observations: + reason = item.get("elimination_reason") or "未知" + for part in str(reason).split(";"): + if not part: + continue + counts[part] = counts.get(part, 0) + 1 + return counts + + def _generate_reasons( stock: dict, entry_signal: dict, tech: TechnicalSignal | None, df, intraday: bool = False, @@ -2018,4 +2404,3 @@ def _build_trace_evidence( if signal_matches_profile: evidence.append("符合今日策略偏好的入场类型") return evidence[:5] - diff --git a/frontend/src/app/(auth)/data-health/page.tsx b/frontend/src/app/(auth)/data-health/page.tsx new file mode 100644 index 00000000..0a0a473b --- /dev/null +++ b/frontend/src/app/(auth)/data-health/page.tsx @@ -0,0 +1,183 @@ +"use client"; + +import { useCallback, useEffect, useMemo, useState } from "react"; +import { useAuth } from "@/hooks/use-auth"; +import { getDataSourceHealthAPI, type DataSourceHealthResult } from "@/lib/api"; + +export default function DataHealthPage() { + const { user } = useAuth(); + const [days, setDays] = useState(7); + const [data, setData] = useState(null); + const [loading, setLoading] = useState(true); + + const loadData = useCallback(async () => { + setLoading(true); + try { + setData(await getDataSourceHealthAPI(days)); + } finally { + setLoading(false); + } + }, [days]); + + useEffect(() => { + if (user?.role === "admin") loadData(); + }, [user, loadData]); + + const summary = useMemo(() => { + const sources = data?.sources || []; + return { + total: sources.length, + errors: sources.filter((item) => item.status === "error").length, + warnings: sources.filter((item) => item.status === "warning").length, + ok: sources.filter((item) => item.status === "ok").length, + }; + }, [data]); + + if (user?.role !== "admin") { + return ( +
+

需要管理员权限

+
+ ); + } + + return ( +
+
+
+
Data Health
+

数据源健康

+

+ 东方财富、腾讯、Tushare、AKShare 等数据源的错误聚合和本地数据新鲜度。 +

+
+
+ + +
+
+ +
+ + + + +
+ +
+
+
+

数据源状态

+ {data ? formatDateTime(data.generated_at) : "-"} +
+ {loading ? ( +
+ {[1, 2, 3, 4].map((item) =>
)} +
+ ) : !data?.sources.length ? ( +
暂无数据源记录
+ ) : ( +
+ {data.sources.map((item) => ( +
+
{sourceLabel(item.source)}
+ +
+ {item.error_count}错 / {item.warning_count}警 +
+
+ {item.last_error || "最近没有记录到异常"} +
+
+ {formatDateTime(item.last_seen_at)} +
+
+ ))} +
+ )} +
+ + +
+
+ ); +} + +function MetricCard({ label, value, tone = "default" }: { label: string; value: number; tone?: "default" | "ok" | "warning" | "error" }) { + const color = tone === "ok" ? "text-emerald-400" : tone === "warning" ? "text-amber-400" : tone === "error" ? "text-red-400" : "text-text-primary"; + return ( +
+
{label}
+
{value}
+
+ ); +} + +function StatusPill({ status }: { status: string }) { + const className = status === "ok" + ? "border-emerald-500/15 bg-emerald-500/[0.06] text-emerald-400" + : status === "warning" + ? "border-amber-500/15 bg-amber-500/[0.07] text-amber-400" + : "border-red-500/15 bg-red-500/[0.07] text-red-400"; + const label = status === "ok" ? "正常" : status === "warning" ? "警告" : "异常"; + return {label}; +} + +function sourceLabel(source: string) { + const map: Record = { + eastmoney: "东方财富", + tencent: "腾讯行情", + tushare: "Tushare", + akshare: "AKShare", + sina: "新浪行情", + news: "新闻管道", + tushare_news: "Tushare新闻", + }; + return map[source] || source; +} + +function freshnessLabel(key: string) { + const map: Record = { + market_temperature: "市场温度", + sector_heat: "板块热度", + recommendations: "推荐结论", + news_items: "新闻原文", + catalysts: "舆情催化", + }; + return map[key] || key; +} + +function formatDateTime(value: string) { + if (!value) return "-"; + const date = new Date(value); + if (Number.isNaN(date.getTime())) return value.slice(0, 16); + return date.toLocaleString("zh-CN", { month: "2-digit", day: "2-digit", hour: "2-digit", minute: "2-digit" }); +} diff --git a/frontend/src/app/(auth)/ops-logs/page.tsx b/frontend/src/app/(auth)/ops-logs/page.tsx new file mode 100644 index 00000000..a507c212 --- /dev/null +++ b/frontend/src/app/(auth)/ops-logs/page.tsx @@ -0,0 +1,567 @@ +"use client"; + +import { useCallback, useEffect, useMemo, useState } from "react"; +import { useAuth } from "@/hooks/use-auth"; +import { + clearErrorLogsAPI, + getErrorLogsAPI, + getResearchObservationsAPI, + getScanLogsAPI, + getScanSessionsAPI, + getSystemStatusAPI, + type ErrorLog, + type ResearchObservation, + type ScanProcessLog, + type ScanSessionSummary, + type SystemStatus, +} from "@/lib/api"; + +type OpsTab = "funnel" | "errors"; + +const STAGE_ORDER = [ + "market_temperature", + "theme_selection", + "strategy_profile", + "candidate_recall", + "realtime_quote", + "rule_scoring", + "final_filter", +]; + +export default function OpsLogsPage() { + const { user } = useAuth(); + const [tab, setTab] = useState("funnel"); + const [days, setDays] = useState(7); + const [sessions, setSessions] = useState([]); + const [selectedSession, setSelectedSession] = useState(""); + const [scanLogs, setScanLogs] = useState([]); + const [observations, setObservations] = useState([]); + const [reasonCounts, setReasonCounts] = useState>({}); + const [scanLoading, setScanLoading] = useState(true); + + const [errors, setErrors] = useState([]); + const [errorsTotal, setErrorsTotal] = useState(0); + const [sources, setSources] = useState([]); + const [levels, setLevels] = useState([]); + const [source, setSource] = useState(""); + const [level, setLevel] = useState(""); + const [errorsLoading, setErrorsLoading] = useState(false); + const [expandedErrorId, setExpandedErrorId] = useState(null); + const [systemStatus, setSystemStatus] = useState(null); + + const latestSession = sessions[0]; + const activeSession = sessions.find((item) => item.scan_session === selectedSession) || latestSession; + const sortedLogs = useMemo( + () => [...scanLogs].sort((a, b) => STAGE_ORDER.indexOf(a.stage) - STAGE_ORDER.indexOf(b.stage)), + [scanLogs], + ); + const maxCount = Math.max(...sortedLogs.map((item) => Math.max(item.input_count, item.output_count)), 1); + const finalLog = sortedLogs.find((item) => item.stage === "final_filter"); + + const fetchScanData = useCallback(async (session?: string) => { + setScanLoading(true); + try { + const sessionData = await getScanSessionsAPI(days, 30); + setSessions(sessionData.sessions); + const nextSession = session || sessionData.sessions[0]?.scan_session || ""; + setSelectedSession(nextSession); + const logData = await getScanLogsAPI(nextSession || undefined, days, 140); + setScanLogs(logData.logs); + const observationData = await getResearchObservationsAPI(nextSession || undefined, days, 80); + setObservations(observationData.observations); + setReasonCounts(observationData.reason_counts); + } catch { + setScanLogs([]); + setObservations([]); + setReasonCounts({}); + } finally { + setScanLoading(false); + } + }, [days]); + + const fetchErrors = useCallback(async () => { + setErrorsLoading(true); + try { + const result = await getErrorLogsAPI(80, source, level, days); + setErrors(result.errors); + setErrorsTotal(result.total); + setSources(result.sources); + setLevels(result.levels); + } catch { + setErrors([]); + } finally { + setErrorsLoading(false); + } + }, [days, level, source]); + + const fetchSystemStatus = useCallback(async () => { + try { + setSystemStatus(await getSystemStatusAPI()); + } catch { + setSystemStatus(null); + } + }, []); + + useEffect(() => { + if (user?.role === "admin") { + fetchScanData(); + fetchSystemStatus(); + } + }, [user, days, fetchScanData, fetchSystemStatus]); + + useEffect(() => { + if (user?.role === "admin" && tab === "errors") { + fetchErrors(); + } + }, [user, tab, fetchErrors]); + + if (user?.role !== "admin") { + return ( +
+

需要管理员权限

+
+ ); + } + + async function handleSelectSession(session: string) { + setSelectedSession(session); + setScanLoading(true); + try { + const result = await getScanLogsAPI(session, days, 140); + setScanLogs(result.logs); + const observationData = await getResearchObservationsAPI(session, days, 80); + setObservations(observationData.observations); + setReasonCounts(observationData.reason_counts); + } finally { + setScanLoading(false); + } + } + + async function handleClearErrors() { + const result = await clearErrorLogsAPI(30); + await fetchErrors(); + await fetchSystemStatus(); + alert(`已清除 ${result.deleted} 条旧日志`); + } + + return ( +
+
+
+
Ops Center
+

运行日志

+

+ 最新筛选批次、漏斗关口和异常记录集中在这里。 +

+
+
+ + +
+
+ +
+ {[ + { key: "funnel", label: "筛选漏斗" }, + { key: "errors", label: "系统错误" }, + ].map((item) => ( + + ))} +
+ + {tab === "funnel" ? ( +
+ + +
+
+ + + 0 ? "danger" : "muted"} /> + +
+ +
+
+
+

筛选过程可视化

+

输入、输出、过滤量和关键摘要按筛选顺序归档。

+
+ {activeSession?.scan_mode || "-"} +
+ + {scanLoading ? ( +
+ {[1, 2, 3, 4].map((item) =>
)} +
+ ) : sortedLogs.length === 0 ? ( +
+ 暂无筛选过程日志 +
+ ) : ( +
+ {sortedLogs.map((log, index) => ( + + ))} +
+ )} +
+ +
+
+
+

投研观察

+

候选股的主题、资金、角色、入场信号和最终淘汰原因。

+
+ {observations.length} 条记录 +
+
+
+
淘汰原因
+
+ {Object.entries(reasonCounts).slice(0, 8).map(([reason, count]) => ( +
+ {reason} + {count} +
+ ))} + {Object.keys(reasonCounts).length === 0 ? ( +
暂无原因分布
+ ) : null} +
+
+
+ {observations.length === 0 ? ( +
暂无投研观察记录
+ ) : ( +
+ {observations.slice(0, 12).map((item) => ( + + ))} +
+ )} +
+
+
+
+
+ ) : ( +
+
+
+
+

系统错误日志

+

这里保留接口、数据源和后台任务的异常,便于追踪不稳定数据源。

+
+
+ + + + +
+
+
+ +
+
+ 错误记录 + {errorsTotal} 条 +
+ {errorsLoading ? ( +
+ {[1, 2, 3].map((item) =>
)} +
+ ) : errors.length === 0 ? ( +
暂无错误日志
+ ) : ( +
+ {errors.map((item) => ( +
+ + {expandedErrorId === item.id && item.detail ? ( +
+                        {item.detail}
+                      
+ ) : null} +
+ ))} +
+ )} +
+
+ )} +
+ ); +} + +function FunnelStage({ log, index, maxCount }: { log: ScanProcessLog; index: number; maxCount: number }) { + const outputWidth = Math.max(4, Math.round((log.output_count / maxCount) * 100)); + const dropWidth = Math.max(0, Math.round((log.filtered_count / maxCount) * 100)); + const detailItems = extractDetailItems(log); + + return ( +
+
+
+ {index + 1} +
+
+
+

{log.stage_label}

+ + {formatDateTime(log.created_at)} +
+

{log.summary || "暂无摘要"}

+
+
+
+
+
+
+
+
+ + + +
+
+ {detailItems.length > 0 ? ( +
+ {detailItems.map((item) => ( + + {item} + + ))} +
+ ) : null} +
+ ); +} + +function extractDetailItems(log: ScanProcessLog) { + const detail = log.detail || {}; + if (log.stage === "candidate_recall") { + const routes = detail.route_counts as Record | undefined; + return routes ? Object.entries(routes).map(([key, value]) => `${routeLabel(key)} ${value}`) : []; + } + if (log.stage === "rule_scoring") { + const skipped = detail.skipped_counts as Record | undefined; + const signals = detail.signal_counts as Record | undefined; + return [ + ...(skipped ? Object.entries(skipped).filter(([, value]) => value > 0).map(([key, value]) => `${skipLabel(key)} ${value}`) : []), + ...(signals ? Object.entries(signals).filter(([, value]) => value > 0).map(([key, value]) => `${signalLabel(key)} ${value}`) : []), + ].slice(0, 10); + } + if (log.stage === "final_filter") { + const actions = detail.action_counts as Record | undefined; + const reasons = detail.elimination_reasons as Record | undefined; + return [ + ...(actions ? Object.entries(actions).map(([key, value]) => `${key} ${value}`) : []), + ...(reasons ? Object.entries(reasons).slice(0, 6).map(([key, value]) => `${key} ${value}`) : []), + ]; + } + if (log.stage === "theme_selection") { + const themes = detail.themes as Array<{ name?: string; heat_score?: number }> | undefined; + return themes?.slice(0, 5).map((item) => `${item.name || "主题"} ${Math.round(Number(item.heat_score || 0))}`) || []; + } + return []; +} + +function ResearchRow({ item }: { item: ResearchObservation }) { + return ( +
+
+
{item.name}
+
{item.ts_code}
+
+
{item.final_score.toFixed(1)}
+
+
+ {item.theme_name || "未归类"} + {item.stock_role || "候选"} + {signalLabel(item.entry_signal_type)} +
+
{item.elimination_reason || "待确认"}
+
+
+ + + + + +
+
+ ); +} + +function ScorePill({ label, value }: { label: string; value: number }) { + return ( +
+
{label}
+
{Math.round(value)}
+
+ ); +} + +function SummaryCard({ label, value, sub, tone = "muted" }: { label: string; value: string | number; sub: string; tone?: "primary" | "warning" | "danger" | "muted" }) { + const valueClass = tone === "primary" ? "text-amber-400" : tone === "warning" ? "text-amber-300" : tone === "danger" ? "text-red-400" : "text-text-primary"; + return ( +
+
{label}
+
{value}
+
{sub}
+
+ ); +} + +function TinyMetric({ label, value }: { label: string; value: string | number }) { + return ( +
+
{label}
+
{value}
+
+ ); +} + +function StatusPill({ status }: { status: string }) { + const normalized = status.toLowerCase(); + const className = normalized === "ok" + ? "border-emerald-500/15 bg-emerald-500/[0.06] text-emerald-400" + : normalized === "warning" || normalized === "empty" + ? "border-amber-500/15 bg-amber-500/[0.07] text-amber-400" + : "border-red-500/15 bg-red-500/[0.07] text-red-400"; + return ( + + {statusLabel(status)} + + ); +} + +function statusLabel(status: string) { + const normalized = status.toLowerCase(); + if (normalized === "ok") return "正常"; + if (normalized === "warning") return "警告"; + if (normalized === "empty") return "空结果"; + if (normalized === "error") return "错误"; + if (normalized === "critical") return "严重"; + return status || "未知"; +} + +function routeLabel(key: string) { + const map: Record = { + sector_recall: "主线召回", + trend_scan: "趋势召回", + intraday_active: "盘中异动", + realtime_market: "全市场异动", + }; + return map[key] || key; +} + +function skipLabel(key: string) { + const map: Record = { + missing_code: "缺代码", + kline_empty: "K线不足", + stale_kline: "K线过期", + exception: "评分异常", + }; + return map[key] || key; +} + +function signalLabel(key: string) { + const map: Record = { + breakout: "突破", + breakout_confirm: "确认", + pullback: "回踩", + launch: "启动", + reversal: "反转", + none: "无信号", + }; + return map[key] || key; +} + +function shortSession(session: string) { + if (!session) return "-"; + return session.length > 20 ? `${session.slice(0, 10)}...${session.slice(-6)}` : session; +} + +function formatDateTime(value: string) { + if (!value) return "-"; + const date = new Date(value); + if (Number.isNaN(date.getTime())) return value.slice(0, 16); + return date.toLocaleString("zh-CN", { month: "2-digit", day: "2-digit", hour: "2-digit", minute: "2-digit" }); +} diff --git a/frontend/src/app/(auth)/settings/page.tsx b/frontend/src/app/(auth)/settings/page.tsx index 69b86830..9f7b3ba7 100644 --- a/frontend/src/app/(auth)/settings/page.tsx +++ b/frontend/src/app/(auth)/settings/page.tsx @@ -2,6 +2,7 @@ import { useState, useEffect, useCallback } from "react"; import { useAuth } from "@/hooks/use-auth"; +import Link from "next/link"; import { listUsersAPI, disableUserAPI, @@ -11,17 +12,12 @@ import { toggleInviteCodeAPI, getDataStatsAPI, dataResetAPI, - getErrorLogsAPI, - clearErrorLogsAPI, - getSystemStatusAPI, type UserItem, type InviteCodeItem, type DataStats, - type ErrorLog, - type SystemStatus, } from "@/lib/api"; -type Tab = "users" | "data" | "logs"; +type Tab = "users" | "data" | "ops"; export default function UsersPage() { const { user: currentUser } = useAuth(); @@ -51,17 +47,6 @@ export default function UsersPage() { const [resetResultMsg, setResetResultMsg] = useState(null); const [confirmReset, setConfirmReset] = useState(false); - const [logs, setLogs] = useState([]); - const [logsTotal, setLogsTotal] = useState(0); - const [logSources, setLogSources] = useState([]); - const [logLevels, setLogLevels] = useState([]); - const [logFilterSource, setLogFilterSource] = useState(""); - const [logFilterLevel, setLogFilterLevel] = useState(""); - const [logDays, setLogDays] = useState(7); - const [logsLoading, setLogsLoading] = useState(false); - const [expandedLogId, setExpandedLogId] = useState(null); - const [systemStatus, setSystemStatus] = useState(null); - function copyCredential(account: string, password: string) { const text = `邮箱:${account}\n密码:${password}`; navigator.clipboard.writeText(text).then(() => { @@ -101,44 +86,13 @@ export default function UsersPage() { } }, []); - const fetchLogs = useCallback(async () => { - setLogsLoading(true); - try { - const result = await getErrorLogsAPI(50, logFilterSource, logFilterLevel, logDays); - setLogs(result.errors); - setLogsTotal(result.total); - setLogSources(result.sources); - setLogLevels(result.levels); - } catch { - // ignore - } finally { - setLogsLoading(false); - } - }, [logFilterSource, logFilterLevel, logDays]); - - const fetchSystemStatus = useCallback(async () => { - try { - const status = await getSystemStatusAPI(); - setSystemStatus(status); - } catch { - // ignore - } - }, []); - useEffect(() => { if (currentUser?.role === "admin") { fetchUsers(); fetchInviteCodes(); fetchStats(); - fetchSystemStatus(); } - }, [currentUser, fetchUsers, fetchInviteCodes, fetchStats, fetchSystemStatus]); - - useEffect(() => { - if (currentUser?.role === "admin" && tab === "logs") { - fetchLogs(); - } - }, [currentUser, tab, fetchLogs]); + }, [currentUser, fetchUsers, fetchInviteCodes, fetchStats]); if (currentUser?.role !== "admin") { return ( @@ -231,21 +185,10 @@ export default function UsersPage() { } } - async function handleClearLogs() { - try { - const result = await clearErrorLogsAPI(30); - fetchLogs(); - fetchSystemStatus(); - alert(`已清除 ${result.deleted} 条旧日志`); - } catch (err) { - alert(err instanceof Error ? err.message : "清除失败"); - } - } - const tabs: { key: Tab; label: string }[] = [ { key: "users", label: "用户与邀请码" }, { key: "data", label: "数据管理" }, - { key: "logs", label: "系统日志" }, + { key: "ops", label: "运维入口" }, ]; return ( @@ -586,93 +529,19 @@ export default function UsersPage() { )} - {tab === "logs" && ( -
- {systemStatus && ( -
-

系统状态

-
-
-
交易状态
-
- {systemStatus.is_trading ? "交易中" : "已收盘"} -
-
-
-
扫描状态
-
- {systemStatus.scan_running ? "扫描中" : systemStatus.scan_locked ? "锁定中" : "空闲"} -
-
-
-
24h 错误
-
0 ? "text-red-400" : "text-text-secondary"}`}>{systemStatus.recent_errors}
-
-
-
数据库大小
-
{systemStatus.db_size_mb} MB
-
-
-
- )} - -
- - - - - - {logsTotal} 条记录 -
- - {logsLoading ? ( -
- {[1, 2, 3].map((i) =>
)} -
- ) : logs.length === 0 ? ( -
-

暂无错误日志

-

系统运行正常

-
- ) : ( -
- {logs.map((log) => ( -
- - {expandedLogId === log.id && log.detail && ( -
-
{log.detail}
-
- )} -
- ))} -
- )} + {tab === "ops" && ( +
+ {[ + { href: "/ops-logs", title: "运行日志", desc: "筛选漏斗、系统错误和扫描批次" }, + { href: "/data-health", title: "数据源健康", desc: "东方财富、腾讯、Tushare、AKShare 状态" }, + { href: "/tasks", title: "任务中心", desc: "新闻、扫描、复盘、策略校准任务" }, + ].map((item) => ( + +
{item.title}
+
{item.desc}
+
打开
+ + ))}
)}
diff --git a/frontend/src/app/(auth)/tasks/page.tsx b/frontend/src/app/(auth)/tasks/page.tsx new file mode 100644 index 00000000..c5fa38aa --- /dev/null +++ b/frontend/src/app/(auth)/tasks/page.tsx @@ -0,0 +1,211 @@ +"use client"; + +import { useCallback, useEffect, useMemo, useState } from "react"; +import { useAuth } from "@/hooks/use-auth"; +import { getTaskCenterAPI, type TaskCenterResult } from "@/lib/api"; + +export default function TasksPage() { + const { user } = useAuth(); + const [data, setData] = useState(null); + const [loading, setLoading] = useState(true); + + const loadData = useCallback(async () => { + setLoading(true); + try { + setData(await getTaskCenterAPI()); + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + if (user?.role === "admin") loadData(); + }, [user, loadData]); + + const jobGroups = useMemo(() => { + const jobs = data?.jobs || []; + return { + news: jobs.filter((job) => job.id.startsWith("news")), + scan: jobs.filter((job) => job.id.includes("market") || job.id.includes("morning") || job.id.includes("afternoon") || job.id.includes("late") || job.id.includes("pre_") || job.id.includes("post_") || job.id.includes("close")), + review: jobs.filter((job) => job.id.includes("watchlist") || job.id.includes("strategy")), + }; + }, [data]); + + if (user?.role !== "admin") { + return ( +
+

需要管理员权限

+
+ ); + } + + return ( +
+
+
+
Task Center
+

任务中心

+

+ 后台新闻采集、选股扫描、跟踪复盘和策略校准任务的运行视图。 +

+
+ +
+ +
+ + + + 0 ? "error" : "default"} /> +
+ +
+
+ + + +
+ + +
+
+ ); +} + +function JobGroup({ title, jobs, loading }: { title: string; jobs: TaskCenterResult["jobs"]; loading: boolean }) { + return ( +
+
+

{title}

+ {jobs.length}项 +
+ {loading ? ( +
+ {[1, 2, 3].map((item) =>
)} +
+ ) : jobs.length === 0 ? ( +
暂无任务
+ ) : ( +
+ {jobs.map((job) => ( +
+
+
{jobLabel(job.id)}
+
{job.id}
+
+
{cleanTrigger(job.trigger)}
+
+ {formatDateTime(job.next_run_time)} +
+
+ ))} +
+ )} +
+ ); +} + +function MetricCard({ label, value, tone = "default" }: { label: string; value: string | number; tone?: "default" | "ok" | "warning" | "error" }) { + const color = tone === "ok" ? "text-emerald-400" : tone === "warning" ? "text-amber-400" : tone === "error" ? "text-red-400" : "text-text-primary"; + return ( +
+
{label}
+
{value}
+
+ ); +} + +function StatusPill({ status }: { status: string }) { + const normalized = status.toLowerCase(); + const className = normalized === "ok" + ? "border-emerald-500/15 bg-emerald-500/[0.06] text-emerald-400" + : normalized === "warning" || normalized === "empty" + ? "border-amber-500/15 bg-amber-500/[0.07] text-amber-400" + : "border-red-500/15 bg-red-500/[0.07] text-red-400"; + const label = normalized === "ok" ? "正常" : normalized === "empty" ? "空结果" : normalized === "warning" ? "警告" : "异常"; + return {label}; +} + +function jobLabel(id: string) { + const map: Record = { + news_pre_market: "盘前新闻", + news_morning: "早盘新闻", + news_noon: "午间新闻", + news_afternoon: "午后新闻", + news_post_market: "盘后新闻", + pre_market: "盘前扫描", + post_market: "盘后扫描", + watchlist_analysis: "自选股分析", + strategy_iteration: "策略复盘", + }; + if (map[id]) return map[id]; + if (id.includes("morning")) return "早盘扫描"; + if (id.includes("afternoon")) return "午后扫描"; + if (id.includes("late")) return "尾盘扫描"; + if (id.includes("close")) return "收盘扫描"; + return id; +} + +function stageLabel(stage: string) { + const map: Record = { + market_temperature: "市场温度", + theme_selection: "主线主题", + strategy_profile: "策略参数", + candidate_recall: "候选召回", + realtime_quote: "实时行情", + rule_scoring: "规则评分", + final_filter: "最终作战池", + }; + return map[stage] || stage; +} + +function cleanTrigger(trigger: string) { + return trigger.replace("cron[", "").replace("]", ""); +} + +function formatDateTime(value: string) { + if (!value) return "-"; + const date = new Date(value); + if (Number.isNaN(date.getTime())) return value.slice(0, 16); + return date.toLocaleString("zh-CN", { month: "2-digit", day: "2-digit", hour: "2-digit", minute: "2-digit" }); +} diff --git a/frontend/src/components/nav.tsx b/frontend/src/components/nav.tsx index 56dc4784..58df1337 100644 --- a/frontend/src/components/nav.tsx +++ b/frontend/src/components/nav.tsx @@ -101,6 +101,39 @@ function SettingsIcon() { ); } +function LogsIcon() { + return ( + + + + + + + ); +} + +function HealthIcon() { + return ( + + + + + ); +} + +function TasksIcon() { + return ( + + + + + + + + + ); +} + function SideNavItem({ href, icon, label }: { href: string; icon: React.ReactNode; label: string }) { const pathname = usePathname(); const isActive = pathname === href || (href !== "/dashboard" && pathname.startsWith(href)); @@ -131,10 +164,12 @@ export function SidebarNav() { } label="舆情雷达" /> } label="自选股" /> } label="研究助手" /> - } label="个股诊断" /> {user?.role === "admin" && ( <> } label="策略校准" /> + } label="运行日志" /> + } label="数据源健康" /> + } label="任务中心" /> } label="管理设置" /> )} diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 9c36d3f1..e1da8fda 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -896,6 +896,111 @@ export interface SystemStatus { db_size_mb: number; } +export interface ScanProcessLog { + id: number; + scan_session: string; + scan_mode: string; + stage: string; + stage_label: string; + status: string; + input_count: number; + output_count: number; + filtered_count: number; + summary: string; + detail: Record; + created_at: string; +} + +export interface ScanLogsResult { + scan_session: string | null; + logs: ScanProcessLog[]; +} + +export interface ScanSessionSummary { + scan_session: string; + scan_mode: string; + created_at: string; + stage_count: number; + status: string; + input_count: number; + final_count: number; + drop_count: number; + last_summary: string; +} + +export interface ScanSessionsResult { + sessions: ScanSessionSummary[]; +} + +export interface ResearchObservation { + id: number; + scan_session: string; + scan_mode: string; + ts_code: string; + name: string; + theme_name: string; + stock_role: string; + action_plan: string; + final_score: number; + catalyst_score: number; + theme_money_score: number; + stock_money_score: number; + emotion_role_score: number; + timing_score: number; + entry_signal_type: string; + elimination_reason: string; + detail: Record; + created_at: string; +} + +export interface ResearchObservationsResult { + scan_session: string | null; + observations: ResearchObservation[]; + reason_counts: Record; +} + +export interface DataSourceHealthItem { + source: string; + status: string; + error_count: number; + warning_count: number; + last_error: string; + last_seen_at: string; +} + +export interface DataSourceHealthResult { + days: number; + sources: DataSourceHealthItem[]; + freshness: Record>; + generated_at: string; +} + +export interface TaskCenterJob { + id: string; + name: string; + next_run_time: string; + trigger: string; +} + +export interface TaskCenterResult { + scheduler_running: boolean; + scan_running: boolean; + scan_locked: boolean; + job_count: number; + jobs: TaskCenterJob[]; + recent_scan_logs: Array<{ + scan_session: string; + scan_mode: string; + stage: string; + status: string; + output_count: number; + summary: string; + created_at: string; + }>; + recent_errors: Array<{ source: string; level: string; message: string; created_at: string }>; + generated_at: string; +} + export async function getErrorLogsAPI(limit: number = 50, source?: string, level?: string, days: number = 7): Promise { const params = new URLSearchParams({ limit: String(limit), days: String(days) }); if (source) params.set("source", source); @@ -910,3 +1015,28 @@ export async function clearErrorLogsAPI(days: number = 30): Promise<{ status: st export async function getSystemStatusAPI(): Promise { return fetchAPI("/api/debug/system"); } + +export async function getScanSessionsAPI(days: number = 7, limit: number = 30): Promise { + const params = new URLSearchParams({ days: String(days), limit: String(limit) }); + return fetchAPI(`/api/debug/scan-sessions?${params}`); +} + +export async function getScanLogsAPI(scanSession?: string, days: number = 7, limit: number = 120): Promise { + const params = new URLSearchParams({ days: String(days), limit: String(limit) }); + if (scanSession) params.set("scan_session", scanSession); + return fetchAPI(`/api/debug/scan-logs?${params}`); +} + +export async function getResearchObservationsAPI(scanSession?: string, days: number = 7, limit: number = 80): Promise { + const params = new URLSearchParams({ days: String(days), limit: String(limit) }); + if (scanSession) params.set("scan_session", scanSession); + return fetchAPI(`/api/debug/research-observations?${params}`); +} + +export async function getDataSourceHealthAPI(days: number = 7): Promise { + return fetchAPI(`/api/debug/data-source-health?days=${days}`); +} + +export async function getTaskCenterAPI(): Promise { + return fetchAPI("/api/debug/tasks"); +}