astock-agent/backend/app/research/market_agent.py
2026-06-10 08:36:25 +08:00

52 lines
2.0 KiB
Python

"""Market regime research agent."""
from __future__ import annotations
from typing import Any
def build_market_view(market_temp: Any, strategy_profile: dict | None = None) -> dict:
temp = float(getattr(market_temp, "temperature", 0) or 0)
up = int(getattr(market_temp, "up_count", 0) or 0)
down = int(getattr(market_temp, "down_count", 0) or 0)
limit_up = int(getattr(market_temp, "limit_up_count", 0) or 0)
limit_down = int(getattr(market_temp, "limit_down_count", 0) or 0)
breadth_total = max(up + down, 1)
up_ratio = up / breadth_total
if temp >= 70 and limit_up >= 60:
regime = "bullish_mainline"
summary = "市场情绪偏强,主线进攻窗口打开。"
elif temp >= 55 and up_ratio >= 0.52:
regime = "bullish_rotation"
summary = "市场偏强但仍需确认主线持续性,适合围绕前排轮动。"
elif temp >= 40:
regime = "range_rotation"
summary = "市场震荡轮动,优先等待回踩和承接确认。"
elif limit_down > max(limit_up, 1):
regime = "risk_off"
summary = "市场风险偏好较弱,优先防守和降低出手频率。"
else:
regime = "defensive_watch"
summary = "市场温度偏低,保留观察,不主动扩大仓位。"
confidence = min(0.95, max(0.45, abs(temp - 50) / 60 + abs(up_ratio - 0.5) + 0.45))
stance = (strategy_profile or {}).get("market_stance") or ""
if stance:
summary = f"{summary}{stance}策略生效。"
return {
"regime": regime,
"confidence": round(confidence, 2),
"summary": summary,
"temperature": round(temp, 1),
"up_count": up,
"down_count": down,
"limit_up_count": limit_up,
"limit_down_count": limit_down,
"source": getattr(market_temp, "source", "snapshot"),
"data_status": getattr(market_temp, "data_status", "fresh"),
"source_detail": getattr(market_temp, "source_detail", ""),
"limit_counts_reliable": bool(getattr(market_temp, "limit_counts_reliable", False)),
}