astock-agent/backend/app/llm/strategy_board.py
2026-05-14 11:10:17 +08:00

352 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""市场作战面板
把市场温度、板块、推荐和历史跟踪结果汇总成每天可执行的策略视图。
规则层保证稳定输出LLM 层负责补充解释和迭代建议。
"""
import logging
from app.config import settings, should_prefer_realtime_today, today_trade_date
from app.data.models import (
MarketTemperature,
Recommendation,
SectorInfo,
StrategyBoard,
StrategyFocus,
StrategySectorFocus,
)
logger = logging.getLogger(__name__)
def _sector_pct(sector: SectorInfo) -> float:
return float(sector.realtime_pct_change if sector.realtime_pct_change is not None else sector.pct_change)
def _sector_turnover(sector: SectorInfo) -> float:
return float(sector.realtime_turnover_rate if sector.realtime_turnover_rate is not None else sector.turnover_avg)
def _sector_up_count(sector: SectorInfo) -> int:
return int(sector.realtime_up_count if sector.realtime_up_count is not None else 0)
def _sector_down_count(sector: SectorInfo) -> int:
return int(sector.realtime_down_count if sector.realtime_down_count is not None else 0)
def _sector_breadth(sector: SectorInfo) -> int:
return _sector_up_count(sector) - _sector_down_count(sector)
def _sector_strength_score(sector: SectorInfo) -> float:
strength = _sector_pct(sector) * 12
strength += min(max(_sector_breadth(sector), -30), 30) * 0.6
strength += min(_sector_turnover(sector), 12) * 1.5
strength += min(sector.limit_up_count, 8) * 2.5
strength += min(sector.days_continuous, 5) * 1.5
return round(strength, 1)
async def build_strategy_board(include_llm: bool = False) -> dict:
"""生成今日市场作战面板。"""
from app.engine.recommender import (
get_latest_recommendations,
get_latest_sectors,
get_performance_stats,
)
latest = await get_latest_recommendations()
market_temp = latest.get("market_temp")
recommendations = latest.get("recommendations", [])
sectors = await get_latest_sectors()
performance = await get_performance_stats()
from app.llm.strategy_iteration import build_strategy_iteration_report
iteration_report = await build_strategy_iteration_report(limit=50, include_llm=include_llm)
board = _build_rule_board(market_temp, sectors, recommendations, performance)
board.iteration_report = iteration_report
if iteration_report.get("adjustment_suggestions"):
board.iteration_notes = [
s.get("reason", "")
for s in iteration_report["adjustment_suggestions"][:3]
if s.get("reason")
] or board.iteration_notes
if include_llm and settings.deepseek_api_key:
board.ai_review = await _generate_ai_review(board, recommendations, performance)
if board.ai_review:
board.generated_by = "rules+llm"
return board.model_dump()
def _build_rule_board(
market_temp: MarketTemperature | None,
sectors: list[SectorInfo],
recommendations: list[Recommendation],
performance: dict,
) -> StrategyBoard:
temp = market_temp.temperature if market_temp else 0
raw_trade_date = market_temp.trade_date if market_temp else ""
prefer_realtime = should_prefer_realtime_today(raw_trade_date)
trade_date = today_trade_date() if prefer_realtime else raw_trade_date
data_mode = "realtime_today" if prefer_realtime else "daily_snapshot"
market_regime, risk_level, action_bias, position_suggestion = _classify_market(temp, market_temp)
actionable = [r for r in recommendations if r.action_plan == "可操作"]
watch = [r for r in recommendations if r.action_plan == "重点关注"]
strong_sectors = [
s for s in sectors[:5]
if _sector_pct(s) >= 1.5 and (_sector_breadth(s) > 0 or s.limit_up_count >= 2)
]
avg_score = (
round(sum(r.score for r in recommendations) / len(recommendations), 1)
if recommendations else 0
)
recommended_mode = _choose_strategy_mode(temp, sectors, recommendations)
strategy_focus = _build_strategy_focus(temp, sectors, recommendations)
watch_sectors = [_sector_focus(s) for s in sectors[:5]]
avoid_rules = _build_avoid_rules(temp, sectors, recommendations)
iteration_notes = _build_iteration_notes(performance, recommendations)
mode_prefix = "今日实时视角:" if prefer_realtime else ""
summary = (
f"{mode_prefix}{market_regime},风险等级{risk_level}"
f"当前 {len(recommendations)} 只入选,其中 {len(actionable)} 只可操作、"
f"{len(watch)} 只重点关注,平均分 {avg_score}"
f"{'主线活跃板块 ' + ' / '.join(s.sector_name for s in strong_sectors[:3]) + '' if strong_sectors else '板块尚未形成强共振,优先等确认。'}"
)
metrics = {
"temperature": temp,
"recommendation_count": len(recommendations),
"actionable_count": len(actionable),
"watch_count": len(watch),
"avg_score": avg_score,
"win_rate": performance.get("win_rate", 0),
"avg_return": performance.get("avg_return", 0),
"tracked": performance.get("tracked", 0),
}
return StrategyBoard(
trade_date=trade_date,
data_mode=data_mode,
market_regime=market_regime,
risk_level=risk_level,
action_bias=action_bias,
position_suggestion=position_suggestion,
summary=summary,
recommended_mode=recommended_mode,
strategy_focus=strategy_focus,
watch_sectors=watch_sectors,
avoid_rules=avoid_rules,
iteration_notes=iteration_notes,
metrics=metrics,
)
def _classify_market(
temp: float, market_temp: MarketTemperature | None
) -> tuple[str, str, str, str]:
if temp >= 75:
return ("强势进攻", "", "可积极关注主线龙头和突破确认", "单票 20%-30%,总仓 50%-70%")
if temp >= 60:
return ("修复偏强", "中低", "优先做早中期板块的突破/回踩确认", "单票 15%-25%,总仓 40%-60%")
if temp >= 45:
return ("震荡分化", "", "只做板块一致性强的低吸或确认机会", "单票 10%-20%,总仓 25%-40%")
if temp >= 30:
return ("弱势防守", "中高", "以观察池为主,减少追高,只等强确认", "单票 0%-10%,总仓 0%-25%")
return ("退潮冰点", "", "暂停主动出手,等待市场修复和主线重新出现", "空仓或极低仓观察")
def _choose_strategy_mode(
temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation]
) -> str:
top = sectors[:5]
strong = [s for s in top if _sector_pct(s) >= 2 and _sector_breadth(s) > 0]
active = [s for s in top if _sector_pct(s) >= 1 and (_sector_turnover(s) >= 2 or s.limit_up_count >= 2)]
end_stage = [s for s in top if s.stage == "end" and _sector_pct(s) > 0]
if temp >= 65 and len(strong) >= 2:
return "顺主线做强势确认"
if temp >= 55 and active:
return "围绕强板块做回踩确认"
if temp < 45 or end_stage:
return "缩仓观察,回避后排追高"
if recommendations:
return "观察池跟踪,等待触发"
return "防守观察"
def _build_strategy_focus(
temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation]
) -> list[StrategyFocus]:
focus: list[StrategyFocus] = []
signal_counts: dict[str, int] = {}
for rec in recommendations:
signal_counts[rec.entry_signal_type] = signal_counts.get(rec.entry_signal_type, 0) + 1
top_signal = max(signal_counts, key=signal_counts.get) if signal_counts else ""
signal_label = {
"breakout": "突破型",
"breakout_confirm": "突破确认型",
"pullback": "回踩型",
"launch": "启动型",
"reversal": "反转型",
}.get(top_signal, "观察型")
focus.append(StrategyFocus(
label=signal_label,
description=f"当前推荐中该类型占比较高,适合作为今日主要观察模板。",
))
if sectors:
main = sectors[0]
sector_pct = _sector_pct(main)
breadth = _sector_breadth(main)
turnover = _sector_turnover(main)
focus.append(StrategyFocus(
label=f"{main.sector_name} 主线跟踪",
description=(
f"当前涨幅 {sector_pct:+.2f}% ,广度 {breadth:+d},换手 {turnover:.1f}% "
f"阶段 {main.stage},优先确认资金是否延续。"
),
))
if len(sectors) > 1:
runner_up = sectors[1]
focus.append(StrategyFocus(
label=f"{runner_up.sector_name} 轮动监控",
description=(
f"强度分 {_sector_strength_score(runner_up)}"
f"若涨幅继续扩大且广度转强,可切入今日第二梯队。"
),
))
if temp < 45:
focus.append(StrategyFocus(
label="防守优先",
description="市场温度不足,推荐只作为观察池,不宜扩大仓位。",
))
elif sectors and _sector_pct(sectors[0]) < 1:
focus.append(StrategyFocus(
label="等一致性强化",
description="板块领涨幅度仍有限,优先等主线扩散和个股触发后再出手。",
))
return focus
def _sector_focus(sector: SectorInfo) -> StrategySectorFocus:
stage_view = {
"early": "早期,重点观察资金是否连续流入",
"mid": "中期,适合寻找回踩或突破确认",
"late": "后期,防止加速后分歧",
"end": "末期,谨慎追高",
}.get(sector.stage, "阶段不明,等待确认")
return StrategySectorFocus(
sector_name=sector.sector_name,
stage=sector.stage,
heat_score=sector.heat_score,
pct_change=_sector_pct(sector),
limit_up_count=sector.limit_up_count,
turnover_rate=_sector_turnover(sector),
up_count=_sector_up_count(sector),
down_count=_sector_down_count(sector),
data_mode=sector.data_mode,
view=(
f"{stage_view};当前涨幅 {_sector_pct(sector):+.2f}%"
+ (
f",上涨/下跌 {_sector_up_count(sector)}/{_sector_down_count(sector)}"
if sector.is_realtime else ""
)
),
)
def _build_avoid_rules(
temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation]
) -> list[str]:
rules = []
top = sectors[:5]
if temp < 45:
rules.append("市场温度低于45时不追突破首日只等次日确认或回踩。")
if any(s.stage == "end" for s in top):
rules.append("板块进入末期时,降低同板块追高标的权重。")
if top and _sector_pct(top[0]) < 1:
rules.append("主线板块涨幅不足1%时,不把局部异动当成全面进攻信号。")
if any(_sector_breadth(s) < 0 for s in top[:3] if s.is_realtime):
rules.append("板块涨幅与广度背离时,优先回避后排补涨和冲高追单。")
if any(_sector_turnover(s) > 8 and s.stage in ("late", "end") for s in top):
rules.append("高换手叠加板块后期阶段时,防止情绪过热后的快速分歧。")
if any(r.position_score < 35 for r in recommendations):
rules.append("位置安全分低于35的标的只观察不主动追入。")
if not rules:
rules.append("推荐失效条件触发后不补仓,等待下一次扫描重新确认。")
return rules
def _build_iteration_notes(performance: dict, recommendations: list[Recommendation]) -> list[str]:
notes = []
tracked = performance.get("tracked", 0) or 0
win_rate = performance.get("win_rate", 0) or 0
avg_return = performance.get("avg_return", 0) or 0
hit_stop = performance.get("hit_stop_count", 0) or 0
hit_target = performance.get("hit_target_count", 0) or 0
if tracked < 10:
notes.append("跟踪样本不足,暂不自动调整策略权重,优先积累推荐生命周期数据。")
else:
if win_rate < 45:
notes.append("近期胜率偏低,下轮应提高入场确认门槛,减少弱势环境下的突破型推荐。")
if avg_return < 0:
notes.append("平均收益为负,建议收紧止损触发和推荐失效条件。")
if hit_stop > hit_target:
notes.append("止损次数多于命中目标,优先复查追高和板块末期惩罚是否不足。")
actionable_count = sum(1 for r in recommendations if r.action_plan == "可操作")
if actionable_count > 5:
notes.append("可操作标的偏多,前端应按板块集中度和评分排序控制关注数量。")
return notes
async def _generate_ai_review(
board: StrategyBoard,
recommendations: list[Recommendation],
performance: dict,
) -> str:
"""用 LLM 生成简短的策略解释,不参与硬性交易决策。"""
from app.llm.client import chat_completion
rec_lines = "\n".join(
f"- {r.name}({r.ts_code}) {r.action_plan} {r.entry_signal_type} "
f"评分{r.score} 仓位{r.suggested_position_pct}% 触发: {r.trigger_condition}"
for r in recommendations[:8]
) or "暂无推荐"
user_msg = f"""请基于以下系统数据生成一段今日A股策略作战说明要求
1. 明确区分市场事实、策略推断和风险约束;
2. 不要承诺收益,不要给绝对化买卖结论;
3. 最多220字中文。
市场状态: {board.market_regime}
风险等级: {board.risk_level}
操作倾向: {board.action_bias}
仓位建议: {board.position_suggestion}
推荐策略: {board.recommended_mode}
历史跟踪: 胜率{performance.get('win_rate', 0)}%, 平均收益{performance.get('avg_return', 0)}%
推荐摘要:
{rec_lines}
"""
resp = await chat_completion([
{"role": "system", "content": "你是一位谨慎的A股交易研究助手擅长把量化结果转成可执行但有风险边界的策略说明。"},
{"role": "user", "content": user_msg},
])
return resp.content.strip() if resp and resp.content else ""