astock-agent/backend/app/llm/strategy_board.py
2026-04-22 11:02:19 +08:00

269 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""市场作战面板
把市场温度、板块、推荐和历史跟踪结果汇总成每天可执行的策略视图。
规则层保证稳定输出LLM 层负责补充解释和迭代建议。
"""
import logging
from app.config import settings
from app.data.models import (
MarketTemperature,
Recommendation,
SectorInfo,
StrategyBoard,
StrategyFocus,
StrategySectorFocus,
)
logger = logging.getLogger(__name__)
async def build_strategy_board(include_llm: bool = False) -> dict:
"""生成今日市场作战面板。"""
from app.engine.recommender import (
get_latest_recommendations,
get_latest_sectors,
get_performance_stats,
)
latest = await get_latest_recommendations()
market_temp = latest.get("market_temp")
recommendations = latest.get("recommendations", [])
sectors = await get_latest_sectors()
performance = await get_performance_stats()
from app.llm.strategy_iteration import build_strategy_iteration_report
iteration_report = await build_strategy_iteration_report(limit=50, include_llm=include_llm)
board = _build_rule_board(market_temp, sectors, recommendations, performance)
board.iteration_report = iteration_report
if iteration_report.get("adjustment_suggestions"):
board.iteration_notes = [
s.get("reason", "")
for s in iteration_report["adjustment_suggestions"][:3]
if s.get("reason")
] or board.iteration_notes
if include_llm and settings.deepseek_api_key:
board.ai_review = await _generate_ai_review(board, recommendations, performance)
if board.ai_review:
board.generated_by = "rules+llm"
return board.model_dump()
def _build_rule_board(
market_temp: MarketTemperature | None,
sectors: list[SectorInfo],
recommendations: list[Recommendation],
performance: dict,
) -> StrategyBoard:
temp = market_temp.temperature if market_temp else 0
trade_date = market_temp.trade_date if market_temp else ""
market_regime, risk_level, action_bias, position_suggestion = _classify_market(temp, market_temp)
actionable = [r for r in recommendations if r.action_plan == "可操作"]
watch = [r for r in recommendations if r.action_plan == "重点关注"]
avg_score = (
round(sum(r.score for r in recommendations) / len(recommendations), 1)
if recommendations else 0
)
recommended_mode = _choose_strategy_mode(temp, sectors, recommendations)
strategy_focus = _build_strategy_focus(temp, sectors, recommendations)
watch_sectors = [_sector_focus(s) for s in sectors[:5]]
avoid_rules = _build_avoid_rules(temp, sectors, recommendations)
iteration_notes = _build_iteration_notes(performance, recommendations)
summary = (
f"{market_regime},风险等级{risk_level}"
f"当前 {len(recommendations)} 只入选,其中 {len(actionable)} 只可操作、"
f"{len(watch)} 只重点关注,平均分 {avg_score}"
)
metrics = {
"temperature": temp,
"recommendation_count": len(recommendations),
"actionable_count": len(actionable),
"watch_count": len(watch),
"avg_score": avg_score,
"win_rate": performance.get("win_rate", 0),
"avg_return": performance.get("avg_return", 0),
"tracked": performance.get("tracked", 0),
}
return StrategyBoard(
trade_date=trade_date,
market_regime=market_regime,
risk_level=risk_level,
action_bias=action_bias,
position_suggestion=position_suggestion,
summary=summary,
recommended_mode=recommended_mode,
strategy_focus=strategy_focus,
watch_sectors=watch_sectors,
avoid_rules=avoid_rules,
iteration_notes=iteration_notes,
metrics=metrics,
)
def _classify_market(
temp: float, market_temp: MarketTemperature | None
) -> tuple[str, str, str, str]:
if temp >= 75:
return ("强势进攻", "", "可积极关注主线龙头和突破确认", "单票 20%-30%,总仓 50%-70%")
if temp >= 60:
return ("修复偏强", "中低", "优先做早中期板块的突破/回踩确认", "单票 15%-25%,总仓 40%-60%")
if temp >= 45:
return ("震荡分化", "", "只做板块一致性强的低吸或确认机会", "单票 10%-20%,总仓 25%-40%")
if temp >= 30:
return ("弱势防守", "中高", "以观察池为主,减少追高,只等强确认", "单票 0%-10%,总仓 0%-25%")
return ("退潮冰点", "", "暂停主动出手,等待市场修复和主线重新出现", "空仓或极低仓观察")
def _choose_strategy_mode(
temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation]
) -> str:
early_mid = [s for s in sectors[:5] if s.stage in ("early", "mid")]
if temp >= 60 and early_mid:
return "主线突破 + 回踩确认"
if temp >= 45:
return "精选回踩,降低追高"
if recommendations:
return "观察池跟踪,等待触发"
return "防守观察"
def _build_strategy_focus(
temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation]
) -> list[StrategyFocus]:
focus: list[StrategyFocus] = []
signal_counts: dict[str, int] = {}
for rec in recommendations:
signal_counts[rec.entry_signal_type] = signal_counts.get(rec.entry_signal_type, 0) + 1
top_signal = max(signal_counts, key=signal_counts.get) if signal_counts else ""
signal_label = {
"breakout": "突破型",
"breakout_confirm": "突破确认型",
"pullback": "回踩型",
"launch": "启动型",
"reversal": "反转型",
}.get(top_signal, "观察型")
focus.append(StrategyFocus(
label=signal_label,
description=f"当前推荐中该类型占比较高,适合作为今日主要观察模板。",
))
if sectors:
main = sectors[0]
focus.append(StrategyFocus(
label=f"{main.sector_name} 主线跟踪",
description=f"热度 {main.heat_score},阶段 {main.stage},优先确认资金是否延续。",
))
if temp < 45:
focus.append(StrategyFocus(
label="防守优先",
description="市场温度不足,推荐只作为观察池,不宜扩大仓位。",
))
return focus
def _sector_focus(sector: SectorInfo) -> StrategySectorFocus:
stage_view = {
"early": "早期,重点观察资金是否连续流入",
"mid": "中期,适合寻找回踩或突破确认",
"late": "后期,防止加速后分歧",
"end": "末期,谨慎追高",
}.get(sector.stage, "阶段不明,等待确认")
return StrategySectorFocus(
sector_name=sector.sector_name,
stage=sector.stage,
heat_score=sector.heat_score,
pct_change=sector.pct_change,
limit_up_count=sector.limit_up_count,
view=stage_view,
)
def _build_avoid_rules(
temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation]
) -> list[str]:
rules = []
if temp < 45:
rules.append("市场温度低于45时不追突破首日只等次日确认或回踩。")
if any(s.stage == "end" for s in sectors[:5]):
rules.append("板块进入末期时,降低同板块追高标的权重。")
if any(r.position_score < 35 for r in recommendations):
rules.append("位置安全分低于35的标的只观察不主动追入。")
if not rules:
rules.append("推荐失效条件触发后不补仓,等待下一次扫描重新确认。")
return rules
def _build_iteration_notes(performance: dict, recommendations: list[Recommendation]) -> list[str]:
notes = []
tracked = performance.get("tracked", 0) or 0
win_rate = performance.get("win_rate", 0) or 0
avg_return = performance.get("avg_return", 0) or 0
hit_stop = performance.get("hit_stop_count", 0) or 0
hit_target = performance.get("hit_target_count", 0) or 0
if tracked < 10:
notes.append("跟踪样本不足,暂不自动调整策略权重,优先积累推荐生命周期数据。")
else:
if win_rate < 45:
notes.append("近期胜率偏低,下轮应提高入场确认门槛,减少弱势环境下的突破型推荐。")
if avg_return < 0:
notes.append("平均收益为负,建议收紧止损触发和推荐失效条件。")
if hit_stop > hit_target:
notes.append("止损次数多于命中目标,优先复查追高和板块末期惩罚是否不足。")
actionable_count = sum(1 for r in recommendations if r.action_plan == "可操作")
if actionable_count > 5:
notes.append("可操作标的偏多,前端应按板块集中度和评分排序控制关注数量。")
return notes
async def _generate_ai_review(
board: StrategyBoard,
recommendations: list[Recommendation],
performance: dict,
) -> str:
"""用 LLM 生成简短的策略解释,不参与硬性交易决策。"""
from app.llm.client import chat_completion
rec_lines = "\n".join(
f"- {r.name}({r.ts_code}) {r.action_plan} {r.entry_signal_type} "
f"评分{r.score} 仓位{r.suggested_position_pct}% 触发: {r.trigger_condition}"
for r in recommendations[:8]
) or "暂无推荐"
user_msg = f"""请基于以下系统数据生成一段今日A股策略作战说明要求
1. 明确区分市场事实、策略推断和风险约束;
2. 不要承诺收益,不要给绝对化买卖结论;
3. 最多220字中文。
市场状态: {board.market_regime}
风险等级: {board.risk_level}
操作倾向: {board.action_bias}
仓位建议: {board.position_suggestion}
推荐策略: {board.recommended_mode}
历史跟踪: 胜率{performance.get('win_rate', 0)}%, 平均收益{performance.get('avg_return', 0)}%
推荐摘要:
{rec_lines}
"""
resp = await chat_completion([
{"role": "system", "content": "你是一位谨慎的A股交易研究助手擅长把量化结果转成可执行但有风险边界的策略说明。"},
{"role": "user", "content": user_msg},
])
return resp.content.strip() if resp and resp.content else ""