astock-agent/backend/app/api/recommendations.py
2026-04-22 22:19:29 +08:00

178 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""推荐列表 API"""
import asyncio
import logging
import traceback
from datetime import datetime
from fastapi import APIRouter, Depends
from app.engine.recommender import (
refresh_recommendations,
get_latest_recommendations,
get_recommendation_history,
get_performance_stats,
)
from app.config import is_trading_hours, should_prefer_realtime_today
from app.data.tushare_client import tushare_client
from app.core.deps import get_current_admin
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/recommendations", tags=["recommendations"])
@router.get("/latest")
async def get_latest():
"""获取最新推荐列表"""
result = await get_latest_recommendations()
mt = result.get("market_temp")
return {
"market_temperature": {
"trade_date": mt.trade_date if mt else "",
"temperature": mt.temperature if mt else 0,
"up_count": mt.up_count if mt else 0,
"down_count": mt.down_count if mt else 0,
"limit_up_count": mt.limit_up_count if mt else 0,
"limit_down_count": mt.limit_down_count if mt else 0,
"max_streak": mt.max_streak if mt else 0,
"broken_rate": mt.broken_rate if mt else 0,
"index_above_ma20": mt.index_above_ma20 if mt else False,
} if mt else None,
"recommendations": [
{
"ts_code": r.ts_code,
"name": r.name,
"sector": r.sector,
"score": r.score,
"level": r.level,
"signal": r.signal,
"market_temp_score": r.market_temp_score,
"sector_score": r.sector_score,
"capital_score": r.capital_score,
"technical_score": r.technical_score,
"supply_demand_score": r.supply_demand_score,
"price_action_score": r.price_action_score,
"position_score": r.position_score,
"valuation_score": r.valuation_score,
"entry_price": r.entry_price,
"target_price": r.target_price,
"stop_loss": r.stop_loss,
"reasons": r.reasons,
"risk_note": r.risk_note,
"llm_analysis": r.llm_analysis,
"entry_timing": r.entry_timing,
"action_plan": r.action_plan,
"trigger_condition": r.trigger_condition,
"invalidation_condition": r.invalidation_condition,
"suggested_position_pct": r.suggested_position_pct,
"review_after_days": r.review_after_days,
"lifecycle_status": r.lifecycle_status,
"data_freshness": r.data_freshness,
"llm_score": r.llm_score,
"recall_tags": r.recall_tags,
"prefilter_decision": r.prefilter_decision,
"prefilter_reason": r.prefilter_reason,
"focus_points": r.focus_points,
"strategy": r.strategy,
"entry_signal_type": r.entry_signal_type,
"scan_session": r.scan_session,
"created_at": r.created_at.isoformat() if r.created_at else None,
}
for r in result.get("recommendations", [])
],
"scan_mode": result.get("scan_mode", "unknown"),
"strategy_profile": result.get("strategy_profile"),
}
@router.post("/refresh")
async def refresh(scan_session: str = "manual", _admin: dict = Depends(get_current_admin)):
"""手动触发一次全量筛选(后台执行,立即返回)"""
from app.engine.recommender import _scan_running, _scan_lock
if _scan_running:
return {
"status": "already_running",
"message": "扫描正在执行中,请稍候",
"is_trading": is_trading_hours(),
}
# 在后台执行扫描,立即返回响应
asyncio.create_task(_run_scan_background(scan_session))
return {
"status": "scanning",
"message": "扫描已启动,完成后自动刷新",
"is_trading": is_trading_hours(),
}
async def _run_scan_background(scan_session: str):
"""后台执行扫描并推送结果"""
from app.engine.recommender import refresh_recommendations
from app.api.websocket import broadcast_update
try:
result = await refresh_recommendations(scan_session=scan_session)
rec_count = len(result.get("recommendations", []))
mt = result.get("market_temp")
# 通过 WebSocket 推送扫描完成通知
await broadcast_update({
"type": "scan_update",
"session": scan_session,
"count": rec_count,
"temperature": mt.temperature if mt else 0,
"scan_mode": result.get("scan_mode", "unknown"),
"timestamp": datetime.now().isoformat(),
})
except Exception as e:
logger.error(f"后台扫描失败: {e}")
from app.db.error_logger import log_error
await log_error("recommender_api", f"后台扫描失败: {e}", detail=traceback.format_exc())
await broadcast_update({
"type": "scan_error",
"session": scan_session,
"message": str(e),
"timestamp": datetime.now().isoformat(),
})
@router.post("/update-tracking")
async def update_tracking(_admin: dict = Depends(get_current_admin)):
"""独立更新推荐跟踪数据(不触发新扫描,盘中可单独调用)"""
from app.engine.recommender import _update_tracking
await _update_tracking()
stats = await get_performance_stats()
return {
"status": "ok",
"tracked": stats.get("tracked", 0),
"win_rate": stats.get("win_rate", 0),
"avg_return": stats.get("avg_return", 0),
}
@router.get("/status")
async def get_scan_status():
"""获取当前扫描状态信息"""
latest_trade_date = tushare_client.get_latest_trade_date()
prefer_realtime = should_prefer_realtime_today(latest_trade_date)
return {
"is_trading": is_trading_hours(),
"scan_mode": "intraday" if prefer_realtime else "post_market",
"description": "今日实时分析优先" if prefer_realtime else "盘后分析Tushare日级数据",
}
@router.get("/history")
async def get_history(days: int = 7):
"""获取历史推荐(按日期分组)"""
return await get_recommendation_history(days)
@router.get("/performance")
async def performance():
"""获取推荐胜率统计"""
return await get_performance_stats()