"""市场概览 API""" from datetime import datetime from fastapi import APIRouter, Depends, HTTPException from app.data.cache import cache from app.engine.recommender import get_latest_recommendations from app.config import settings, is_trading_hours, should_prefer_realtime_today, today_trade_date from app.core.deps import get_current_admin router = APIRouter(prefix="/api/market", tags=["market"]) @router.get("/temperature") async def get_temperature(): """获取市场温度快照。页面访问只读数据库,不触发外部行情。""" result = await get_latest_recommendations() mt = result.get("market_temp") if mt: return { "trade_date": mt.trade_date, "temperature": mt.temperature, "up_count": mt.up_count, "down_count": mt.down_count, "limit_up_count": mt.limit_up_count, "limit_down_count": mt.limit_down_count, "max_streak": mt.max_streak, "broken_rate": mt.broken_rate, "index_above_ma20": getattr(mt, "index_above_ma20", False), "is_trading": is_trading_hours(), "data_mode": "daily_snapshot", "limit_counts_reliable": False, } return { "trade_date": "", "temperature": 0, "up_count": 0, "down_count": 0, "limit_up_count": 0, "limit_down_count": 0, "max_streak": 0, "broken_rate": 0, "index_above_ma20": False, "is_trading": is_trading_hours(), } @router.get("/overview") async def get_overview(): """市场概况快照。 页面访问不拉腾讯/Tushare。当前库里还没有指数快照表,先返回空数组。 后续应由扫描任务把指数概览写入本地表后再展示。 """ return [] @router.get("/strategy-board") async def get_strategy_board(): """获取今日市场作战面板(只读,不触发 LLM)""" cache_key = "market:strategy_board:rules" cached = cache.get(cache_key) if cached is not None: return cached from app.llm.strategy_board import build_strategy_board result = await build_strategy_board(include_llm=False) cache.set(cache_key, result, settings.cache_ttl_realtime) return result @router.get("/strategy-iteration") async def get_strategy_iteration(limit: int = 50): """获取策略复盘迭代建议(只读,不触发 LLM)""" cache_key = f"market:strategy_iteration:{limit}:rules" cached = cache.get(cache_key) if cached is not None: return cached from app.llm.strategy_iteration import build_strategy_iteration_report result = await build_strategy_iteration_report(limit=limit, include_llm=False) cache.set(cache_key, result, settings.cache_ttl_realtime) return result @router.get("/strategy-configs") async def get_strategy_configs(_admin: dict = Depends(get_current_admin)): """获取当前策略配置中心状态。""" from app.llm.strategy_config import ( get_active_prompt_configs, get_active_strategy_configs, get_recent_config_changes, ) return { "strategies": await get_active_strategy_configs(), "prompts": await get_active_prompt_configs(), "changes": await get_recent_config_changes(limit=30), } @router.post("/strategy-configs/{strategy_id}/rollback") async def rollback_strategy_config(strategy_id: str, _admin: dict = Depends(get_current_admin)): """回滚某个策略到上一配置版本。""" from app.llm.strategy_config import rollback_strategy_config as rollback try: result = await rollback(strategy_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) cache.delete("market:strategy_board:rules") cache.delete("market:strategy_iteration:80:rules") cache.delete("market:strategy_iteration:50:rules") return {"status": "ok", "strategy": result} @router.get("/ops-status") async def get_ops_status(): """管理员任务中心状态与数据新鲜度(只读,不触发扫描或 LLM)。""" from sqlalchemy import text from app.db.database import get_db from app.engine.recommender import _scan_running async with get_db() as db: rec_row = (await db.execute( text( "SELECT created_at FROM recommendations " "ORDER BY created_at DESC LIMIT 1" ) )).fetchone() tracking_row = (await db.execute( text( "SELECT track_date, created_at FROM recommendation_tracking " "ORDER BY track_date DESC, id DESC LIMIT 1" ) )).fetchone() market_row = (await db.execute( text( "SELECT trade_date, created_at FROM market_temperature " "ORDER BY REPLACE(trade_date, '-', '') DESC, id DESC LIMIT 1" ) )).fetchone() sector_row = (await db.execute( text( "SELECT trade_date, created_at FROM sector_heat " "ORDER BY REPLACE(trade_date, '-', '') DESC, id DESC LIMIT 1" ) )).fetchone() def _fmt_dt(value): return str(value or "") latest_market_date = str(market_row._mapping["trade_date"]) if market_row else "" latest_sector_date = str(sector_row._mapping["trade_date"]) if sector_row else "" latest_tracking_date = str(tracking_row._mapping["track_date"]) if tracking_row else "" today = today_trade_date() sector_lagging = bool(latest_sector_date and latest_sector_date.replace("-", "") < today) market_lagging = bool(latest_market_date and latest_market_date.replace("-", "") < today) return { "scan_running": _scan_running, "scan_mode": "realtime_today" if should_prefer_realtime_today(latest_market_date) else "post_market", "is_trading": is_trading_hours(), "data_freshness": { "market_trade_date": latest_market_date, "sector_trade_date": latest_sector_date, "tracking_trade_date": latest_tracking_date, "last_recommendation_created_at": _fmt_dt(rec_row._mapping["created_at"]) if rec_row else "", "last_tracking_created_at": _fmt_dt(tracking_row._mapping["created_at"]) if tracking_row else "", "last_market_created_at": _fmt_dt(market_row._mapping["created_at"]) if market_row else "", "last_sector_created_at": _fmt_dt(sector_row._mapping["created_at"]) if sector_row else "", "status": "stale" if sector_lagging or market_lagging else "fresh" if latest_market_date else "empty", "message": ( f"板块快照仍停留在 {latest_sector_date},展示层将优先使用今日实时板块榜。" if sector_lagging else f"最新市场日期 {latest_market_date},最近跟踪 {latest_tracking_date or '暂无'}" if latest_market_date else "暂无市场缓存数据,请由管理员触发扫描。" ), "generated_at": datetime.now().isoformat(), }, "actions": [ {"key": "refresh", "label": "立即扫描", "admin_only": True}, {"key": "update_tracking", "label": "更新跟踪", "admin_only": True}, {"key": "generate_strategy_board", "label": "生成策略板", "admin_only": True}, {"key": "generate_strategy_iteration", "label": "生成策略复盘", "admin_only": True}, ], } @router.post("/generate-strategy-board") async def generate_strategy_board(_admin: dict = Depends(get_current_admin)): """管理员手动生成带 LLM 说明的策略看板""" from app.llm.strategy_board import build_strategy_board result = await build_strategy_board(include_llm=True) cache.delete("market:strategy_board:rules") return result @router.post("/generate-strategy-iteration") async def generate_strategy_iteration(limit: int = 50, _admin: dict = Depends(get_current_admin)): """管理员手动生成带 LLM 分析的策略复盘""" from app.llm.strategy_iteration import build_strategy_iteration_report result = await build_strategy_iteration_report(limit=limit, include_llm=True, apply_auto_config=True) cache.delete(f"market:strategy_iteration:{limit}:rules") cache.delete("market:strategy_board:rules") return result