"""Recommendation lifecycle and display-state derivation helpers."""

import json
from datetime import datetime

from app.config.config_loader import get_meta
from app.core.opportunity_lifecycle import (
    apply_entry_quality_gate,
    derive_display_bucket,
    is_executed_lifecycle,
    normalize_action_status,
)
from app.core.strategy_registry import normalize_strategy_code, strategy_label


def _safe_float(value):
    """Best-effort float conversion; return 0.0 for None/empty/unparseable values."""
    try:
        return float(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0.0


def _quality_gate_reasons(entry_plan):
    """Return the ``entry_quality_gate.reasons`` of *entry_plan* as a list of strings.

    Tolerates a non-dict plan, a non-dict gate and a scalar ``reasons`` value so
    callers can slice/join the result without guarding.
    """
    if not isinstance(entry_plan, dict):
        return []
    gate = entry_plan.get("entry_quality_gate")
    if not isinstance(gate, dict):
        return []
    reasons = gate.get("reasons")
    if isinstance(reasons, (list, tuple)):
        return [str(reason) for reason in reasons]
    # A scalar reason is kept as a single entry rather than split per character.
    return [str(reasons)] if reasons else []


def state_fields_for_storage(status, action_status, execution_status="", reason=""):
    """Build the persisted state tuple for a recommendation.

    Returns:
        ``(execution_status, display_bucket, lifecycle_state, entry_triggered, reason)``
        where the first three come from ``derive_display_bucket`` (with fallbacks
        ``"observe"`` / ``"watch_pool"`` / ``"watching"``) and ``entry_triggered``
        is 1 when ``is_executed_lifecycle`` reports an executed lifecycle, else 0.
    """
    bucket = derive_display_bucket(status or "active", action_status, execution_status)
    return (
        bucket.get("execution_status", execution_status or "observe"),
        bucket.get("display_bucket", "watch_pool"),
        bucket.get("lifecycle_state", "watching"),
        1 if is_executed_lifecycle(status or "active", action_status, bucket.get("execution_status")) else 0,
        reason or "",
    )


def derive_minimal_state_fields(status, action_status, entry_plan=None):
    """Map a status/action pair to storage state fields without market data.

    ``entry_plan`` is accepted for interface compatibility but is not read.
    Returns the same tuple shape as :func:`state_fields_for_storage`.
    """
    action = normalize_action_status(action_status, status)
    if action == "可即刻买入":  # buy immediately
        execution_status = "buy_now"
        reason = "策略确认当前入场窗口"
    elif action == "等回踩":  # wait for pullback
        execution_status = "wait_pullback"
        reason = "等待回踩触发,未触发前不计推荐收益"
    elif action == "持有":  # holding
        execution_status = "holding"
        reason = "已进入持仓跟踪"
    elif action in ("止盈1", "止盈2", "跟踪止盈"):  # take-profit stages
        execution_status = "completed"
        reason = "利润管理/阶段兑现"
    elif action in ("止损", "衰减", "反转", "放弃", "过期", "归档") or status in (
        "stopped_out",
        "expired",
        "invalid",
        "archived",
    ):
        execution_status = "invalid"
        reason = "机会失效,归入历史复盘"
    else:
        execution_status = "observe"
        reason = "观察池,未触发入场"
    return state_fields_for_storage(status, action, execution_status, reason)


def opportunity_fields_from_plan(entry_plan):
    """Extract the opportunity-level display fields from an entry plan.

    A non-dict ``entry_plan`` is treated as empty; every value is returned as a
    string (``""`` when missing). ``stop_model``/``tp_model`` fall back to
    ``stop_basis``/``tp_basis``.
    """
    plan = entry_plan if isinstance(entry_plan, dict) else {}
    return {
        "opportunity_level": str(plan.get("opportunity_level") or ""),
        "opportunity_level_label": str(plan.get("opportunity_level_label") or ""),
        "holding_horizon": str(plan.get("holding_horizon") or ""),
        "entry_model": str(plan.get("entry_model") or ""),
        "stop_model": str(plan.get("stop_model") or plan.get("stop_basis") or ""),
        "tp_model": str(plan.get("tp_model") or plan.get("tp_basis") or ""),
    }


def entry_window_policy(
    entry_price,
    current_price,
    rec_time,
    event_time=None,
    window_hours=2.0,
    up_deviation_pct=1.5,
    down_deviation_pct=1.2,
):
    """Stage-1 entry window trust policy.

    Compares the age of the recommendation and the deviation of the current
    price from the entry price against the configured limits.

    Args:
        entry_price: Planned entry (trigger) price.
        current_price: Latest known price.
        rec_time: ISO-format time the recommendation was made.
        event_time: ISO-format evaluation time; defaults to the local "now".
        window_hours: Maximum age of the entry window, in hours.
        up_deviation_pct: Maximum tolerated upward deviation, in percent.
        down_deviation_pct: Maximum tolerated downward deviation, in percent.

    Returns:
        A dict whose ``status`` is ``"active"``, ``"expired"``,
        ``"price_left_up"`` or ``"price_left_down"`` (checked in that priority),
        plus label/reason text and the computed age/deviation figures.
    """
    event_time = event_time or datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    try:
        entry_price = float(entry_price or 0)
        current_price = float(current_price or 0)
    except Exception:
        # If either price is unusable, neither is trusted.
        entry_price = 0
        current_price = 0
    deviation_pct = round((current_price / entry_price - 1) * 100, 2) if entry_price and current_price else 0.0
    age_minutes = 0.0
    try:
        start = datetime.fromisoformat(str(rec_time))
        end = datetime.fromisoformat(str(event_time))
        age_minutes = round((end - start).total_seconds() / 60.0, 1)
    except Exception:
        # Unparseable (or naive/aware mismatched) timestamps count as age 0.
        age_minutes = 0.0
    remaining_minutes = round(max(0.0, window_hours * 60.0 - age_minutes), 1)
    result = {
        "status": "active",
        "label": "入场窗口有效",
        "reason": "入场窗口仍在有效期内,价格未明显脱离触发价",
        "age_minutes": age_minutes,
        "remaining_minutes": remaining_minutes,
        "window_hours": window_hours,
        "entry_price": entry_price,
        "current_price": current_price,
        "deviation_pct": deviation_pct,
        "max_up_deviation_pct": up_deviation_pct,
        "max_down_deviation_pct": down_deviation_pct,
    }
    if age_minutes > window_hours * 60.0:
        result.update({
            "status": "expired",
            "label": "窗口已过期",
            "reason": f"入场窗口超过有效期 {window_hours:g} 小时,避免沿用旧信号追入",
            "remaining_minutes": 0.0,
        })
    elif deviation_pct > up_deviation_pct:
        result.update({
            "status": "price_left_up",
            "label": "价格已上脱离",
            "reason": f"当前价较触发价上脱离 {deviation_pct:.2f}%,超过 {up_deviation_pct:g}% 阈值,避免追高",
        })
    elif deviation_pct < -down_deviation_pct:
        result.update({
            "status": "price_left_down",
            "label": "价格已下破",
            "reason": f"当前价较触发价下破 {abs(deviation_pct):.2f}%,买点动能失效,转观察",
        })
    return result


def risk_suggestion(entry_price, stop_loss, tp1, risk_budget_pct=1.0, max_position_pct=100.0):
    """Convert entry/stop/TP1 into a simple position-size suggestion.

    The suggested position is ``risk_budget_pct / stop_distance_pct`` (as a
    percentage), capped at ``max_position_pct``. All figures are rounded to two
    decimals; ``valid`` is False when entry/stop are missing or coincide.
    """
    try:
        entry_price = float(entry_price or 0)
        stop_loss = float(stop_loss or 0)
        tp1 = float(tp1 or 0)
    except Exception:
        entry_price = stop_loss = tp1 = 0
    stop_distance_pct = round(abs(entry_price - stop_loss) / entry_price * 100, 2) if entry_price and stop_loss else 0.0
    suggested_position_pct = (
        round(min(max_position_pct, risk_budget_pct / stop_distance_pct * 100), 2) if stop_distance_pct else 0.0
    )
    tp1_profit_pct = round((tp1 / entry_price - 1) * 100, 2) if entry_price and tp1 else 0.0
    rr = round(tp1_profit_pct / stop_distance_pct, 2) if stop_distance_pct else 0.0
    max_loss_pct = round(suggested_position_pct * stop_distance_pct / 100, 2) if suggested_position_pct else 0.0
    return {
        "risk_budget_pct": risk_budget_pct,
        "stop_distance_pct": stop_distance_pct,
        "suggested_position_pct": suggested_position_pct,
        "max_loss_pct": max_loss_pct,
        "tp1_profit_pct": tp1_profit_pct,
        "rr": rr,
        "max_position_pct": max_position_pct,
        "valid": bool(entry_price and stop_loss and stop_distance_pct > 0),
    }


def execution_fields_from_persisted_state(item, entry_plan=None):
    """Derive display execution state from persisted status/action only.

    Args:
        item: Recommendation row (dict) providing ``status``, ``action_status``
            and, when ``entry_plan`` is None, ``entry_plan_json``.
        entry_plan: Optional already-decoded entry plan. A non-dict value is
            normalized via :func:`normalize_entry_plan`.

    Returns:
        ``(execution_status, execution_label, execution_reason)``.
    """
    if entry_plan is None:
        entry_plan = normalize_entry_plan(item.get("entry_plan_json"))
    elif not isinstance(entry_plan, dict):
        entry_plan = normalize_entry_plan(entry_plan)
    status = (item.get("status") or "active").strip()
    action_status = normalize_action_status(item.get("action_status") or "持有", status)
    bucket = derive_display_bucket(status, action_status, "")
    execution_status = bucket.get("execution_status")

    if execution_status == "completed":
        return "completed", "✅ 已兑现,仅观察", f"该机会已进入{action_status or '利润管理'}阶段,仅作为持仓跟踪记录"

    if execution_status == "invalid":
        if action_status == "止损":
            reason = "该机会已触发风险边界,原入场逻辑失效"
        elif action_status == "衰减":
            reason = "该机会已出现趋势衰减,追高性价比下降"
        elif action_status == "反转":
            reason = "该机会已出现趋势反转,原多头逻辑被破坏"
        elif action_status == "放弃":
            reason = "该机会已被标记为放弃,不再满足入场条件"
        else:
            reason = "该机会观察周期结束或逻辑失效,已归入历史复盘"
        return "invalid", "🔴 已失效,勿追", reason

    if execution_status == "buy_now":
        # `or ""` keeps an explicit None/0 stop from being rendered as text.
        stop = str(entry_plan.get("stop_loss") or "")
        reason = "推荐时就是可即刻买入;策略确认当前仍在入场窗口" + ((",风险边界 " + stop) if stop else "")
        return "buy_now", "🟢 现在可买", reason

    gate_reasons = _quality_gate_reasons(entry_plan)

    if execution_status == "wait_pullback":
        if gate_reasons:
            reason = "等待更优位置;" + ";".join(gate_reasons[:3])
        else:
            # Fall back to the generic wording when the plan carries no price.
            price_text = str(entry_plan.get("entry_price") or "") or "参考价"
            reason = "等待回踩至 " + price_text + " 附近再评估"
        return "wait_pullback", "🟡 等回踩,不追高", reason

    if execution_status == "holding":
        return "holding", "持仓跟踪", "该机会已触发入场,进入持仓跟踪"

    if gate_reasons:
        reason = "机会结构仍在观察;" + ";".join(gate_reasons[:3])
    else:
        reason = "暂无明确入场窗口,继续观察"
    return "observe", "观察池", reason


def normalize_entry_plan(entry_plan_json):
    """Decode an entry plan into a dict.

    Accepts a dict (returned as-is) or a JSON string. Anything else, including
    invalid JSON and JSON that is not an object, yields ``{}`` so callers can
    safely use ``.get``.
    """
    return normalize_json_object(entry_plan_json)


def normalize_json_object(payload):
    """Return *payload* as a dict: pass dicts through, decode JSON objects, else ``{}``."""
    try:
        if isinstance(payload, dict):
            return payload
        if payload:
            parsed = json.loads(payload)
            if isinstance(parsed, dict):
                return parsed
    except Exception:
        pass
    return {}


def normalize_signals(payload):
    """Return *payload* as a list: pass lists through, decode JSON arrays, else ``[]``."""
    try:
        if isinstance(payload, list):
            return payload
        if isinstance(payload, str) and payload.strip():
            parsed = json.loads(payload)
            if isinstance(parsed, list):
                return parsed
    except Exception:
        pass
    return []


def observe_tier(item):
    """Observation pool tier: strong=worth user attention, weak=low-quality watch.

    Returns:
        ``(tier, reason)`` where ``tier`` is ``"strong"`` or ``"weak"``.
    """
    status = str(item.get("execution_status") or "")
    if status in ("buy_now", "wait_pullback") or item.get("display_bucket") == "realtime":
        return "strong", "入场/等待类有效机会"

    try:
        score = float(item.get("rec_score") or 0)
    except Exception:
        score = 0

    signals = item.get("signals") or []
    if isinstance(signals, str):
        signals = normalize_signals(signals)
    sig_text = " ".join(str(x) for x in signals)
    force_reason = str(item.get("force_reason") or "")

    # Contexts may arrive either as JSON text (`*_json`) or as decoded dicts.
    derivatives = normalize_json_object(item.get("derivatives_context_json") or item.get("derivatives_context"))
    market = normalize_json_object(item.get("market_context_json") or item.get("market_context"))
    if not derivatives and isinstance(item.get("derivatives_context"), dict):
        derivatives = item.get("derivatives_context") or {}
    if not market and isinstance(item.get("market_context"), dict):
        market = item.get("market_context") or {}

    long_pct = 0.0
    try:
        long_pct = float(derivatives.get("top_trader_long_pct") or 0)
    except Exception:
        long_pct = 0.0
    acc1 = 0.0
    acc4 = 0.0
    try:
        acc1 = float(market.get("turnover_acceleration_1h") or 0)
        acc4 = float(market.get("turnover_acceleration_4h") or 0)
    except Exception:
        pass

    # Signals mention expired/historical triggers and nothing current.
    stale_only = ("已过期" in sig_text or "历史" in sig_text) and not any(
        k in sig_text for k in ("当前", "新近", "刚刚", "入场窗口", "量价齐飞")
    )
    weak_reasons = []
    if score < 50:
        weak_reasons.append(f"评分偏低({int(score)})")
    if stale_only:
        weak_reasons.append("主要触发来自历史/过期信号")
    if "静K蓄力旁路" in force_reason and acc4 < 1.3 and acc1 < 1.3:
        weak_reasons.append("静K旁路量能不足")

    plan = item.get("entry_plan") or normalize_json_object(item.get("entry_plan_json"))
    gate_reasons = _quality_gate_reasons(plan)
    gate_reason_text = ";".join(gate_reasons[:3])
    if any("回踩参考已到" in reason and "不达标" in reason for reason in gate_reasons):
        return "weak" if score < 55 else "strong", (
            gate_reason_text or "回踩参考已到,但实时盈亏比不达标"
        ) + ";暂不构成入场窗口,继续观察是否重新恢复可买盈亏比"

    strong_context = score >= 65 or long_pct >= 75 or max(acc1, acc4) >= 1.5
    if weak_reasons and not strong_context:
        return "weak", ";".join(weak_reasons[:3])
    if gate_reason_text:
        return "strong", gate_reason_text + ";继续观察结构是否恢复"
    return "strong", "观察池有效候选"


def derive_execution_fields(item):
    """Enrich a recommendation row in place with derived display/execution fields.

    Decodes the JSON columns, applies the entry quality gate and the entry
    window policy, then writes action/execution/bucket state, observe tier,
    opportunity fields, risk suggestion and strategy metadata onto ``item``.

    Args:
        item: Mutable recommendation dict (mutated in place).

    Returns:
        The same ``item`` object.
    """
    entry_plan = normalize_entry_plan(item.get("entry_plan_json"))
    market_context = normalize_json_object(item.get("market_context_json"))
    derivatives_context = normalize_json_object(item.get("derivatives_context_json"))
    sector_context = normalize_json_object(item.get("sector_context_json"))
    signals = normalize_signals(item.get("signals"))
    item["signals"] = signals

    # --- Resolve the starting action -------------------------------------
    initial_action = normalize_action_status(
        entry_plan.get("entry_action") or item.get("action_status") or "持有",
        item.get("status") or "active",
    )
    action_status = normalize_action_status(
        item.get("action_status") or initial_action or "持有",
        item.get("status") or "active",
    )
    if action_status == "持有" and initial_action in ("可即刻买入", "等回踩", "观察"):
        action_status = initial_action

    current_price_for_window = (
        item.get("latest_cache_price") or item.get("current_price") or item.get("entry_price") or 0
    )

    # --- Entry quality gate ----------------------------------------------
    action_status, entry_plan, _entry_gate_reasons = apply_entry_quality_gate(
        action_status=action_status,
        entry_plan=entry_plan,
        signals=item.get("signals"),
        current_price=current_price_for_window,
        market_context=market_context,
        derivatives_context=derivatives_context,
        sector_context=sector_context,
        strategy_code=item.get("strategy_code") or entry_plan.get("strategy_code"),
    )
    try:
        rec_score_for_gate = float(item.get("rec_score") or 0)
    except Exception:
        rec_score_for_gate = 0
    # A positive score below 25 may never be shown as "buy now".
    if action_status == "可即刻买入" and rec_score_for_gate > 0 and rec_score_for_gate < 25:
        reasons = [f"推荐评分{rec_score_for_gate:g}<25,属于信号不足,禁止展示为现价买入"]
        gate = entry_plan.get("entry_quality_gate") if isinstance(entry_plan.get("entry_quality_gate"), dict) else {}
        existing_reasons = list(gate.get("reasons") or [])
        entry_plan["entry_quality_gate"] = {
            **gate,
            "blocked_action": gate.get("blocked_action") or action_status,
            "final_action": "观察",
            "reasons": existing_reasons + reasons,
        }
        action_status = "观察"
    if initial_action == "可即刻买入" and action_status != "可即刻买入":
        initial_action = action_status

    status = (item.get("status") or "active").strip()
    force_reason = (item.get("force_reason") or "").strip()
    base_state = (item.get("base_state") or "").strip()
    sector_signal_count = item.get("sector_signal_count")
    strategy_version = str(item.get("strategy_version") or "").strip()
    if not strategy_version:
        strategy_version = str(get_meta().get("strategy_version") or "").strip()

    # --- Live price / PnL ------------------------------------------------
    if current_price_for_window:
        item["current_price"] = current_price_for_window
    try:
        entry_price_for_pnl = float(item.get("entry_price") or 0)
        current_price_float = float(current_price_for_window or 0)
        if entry_price_for_pnl > 0 and current_price_float > 0:
            item["pnl_pct"] = round((current_price_float - entry_price_for_pnl) / entry_price_for_pnl * 100, 2)
    except Exception:
        pass
    if item.get("latest_cache_updated_at"):
        item["current_price_updated_at"] = item.get("latest_cache_updated_at")

    # --- Entry window (only evaluated for "buy now") ---------------------
    entry_window = entry_window_policy(
        entry_plan.get("entry_price") or item.get("entry_price") or 0,
        current_price_for_window,
        item.get("rec_time") or "",
    ) if action_status == "可即刻买入" else {}
    if entry_window.get("status") == "price_left_down":
        # A price below plan is still acceptable when it is above the stop,
        # the risk/reward check passed and the entry trigger was confirmed.
        try:
            current_for_entry = float(current_price_for_window or 0)
            stop_for_entry = float(entry_plan.get("stop_loss") or item.get("stop_loss") or 0)
            rr_live_ok = entry_plan.get("risk_reward_ok_live") is True or entry_plan.get("risk_reward_ok") is True
            trigger_ok = (
                entry_plan.get("entry_trigger_confirmed") is True or int(item.get("entry_triggered") or 0) == 1
            )
            if current_for_entry > stop_for_entry > 0 and rr_live_ok and trigger_ok:
                entry_window = {
                    **entry_window,
                    "status": "active",
                    "label": "价格更优",
                    "reason": "当前价低于计划入场价,但尚未跌破止损且实时盈亏比仍合格,可按更优价格执行",
                }
        except Exception:
            pass
    if action_status == "可即刻买入" and entry_window:
        window_status = entry_window.get("status")
        if window_status in ("expired", "price_left_down"):
            action_status = "观察"
        elif window_status == "price_left_up":
            action_status = "等回踩"
        if window_status and window_status != "active":
            item["entry_window_alert"] = entry_window

    # --- Execution state -------------------------------------------------
    item_for_execution = {**item, "action_status": action_status}
    execution_status, execution_label, execution_reason = execution_fields_from_persisted_state(
        item_for_execution, entry_plan
    )
    bucket_fields = derive_display_bucket(status, action_status, execution_status)
    execution_status = bucket_fields.get("execution_status") or execution_status
    item["initial_action"] = initial_action
    item["action_status"] = normalize_action_status(action_status, status)
    item["execution_status"] = execution_status
    item["execution_label"] = execution_label
    item["execution_reason"] = execution_reason
    # If an alert is present but the action still reads "buy now", demote it.
    if item.get("entry_window_alert") and item["action_status"] == "可即刻买入":
        item["action_status"] = "等回踩" if item["entry_window_alert"].get("status") == "price_left_up" else "观察"
        execution_status, execution_label, execution_reason = execution_fields_from_persisted_state(
            {**item, "action_status": item["action_status"], "status": status}, entry_plan
        )
        item["execution_status"] = execution_status
        item["execution_label"] = execution_label
        item["execution_reason"] = execution_reason
    # Provisional bucket values; recomputed right below from the final action.
    item["display_bucket"] = bucket_fields.get("display_bucket")
    item["lifecycle_state"] = bucket_fields.get("lifecycle_state")
    bucket_fields = derive_display_bucket(status, item["action_status"], item["execution_status"])
    item["execution_status"] = bucket_fields.get("execution_status") or item["execution_status"]
    item["display_bucket"] = bucket_fields.get("display_bucket")
    item["lifecycle_state"] = bucket_fields.get("lifecycle_state")
    item["entry_triggered"] = 1 if is_executed_lifecycle(status, item["action_status"], item["execution_status"]) else 0

    # NOTE(review): observe_tier runs before item["entry_plan"] is refreshed, so
    # it reads any pre-existing item["entry_plan"] / entry_plan_json rather than
    # the gated plan built above — confirm this ordering is intended.
    observe_tier_value, observe_reason = observe_tier(item)
    item["observe_tier"] = observe_tier_value
    item["observe_reason"] = observe_reason
    item["entry_plan"] = entry_plan

    # --- Opportunity level fields (existing item values win) -------------
    opportunity_fields = opportunity_fields_from_plan(entry_plan)
    for key, value in opportunity_fields.items():
        item[key] = item.get(key) or value
    if item.get("opportunity_level") and not item.get("opportunity_level_label"):
        try:
            from app.core.opportunity_level import opportunity_level_meta

            meta = opportunity_level_meta(item["opportunity_level"])
            item["opportunity_level_label"] = meta.get("label", "")
            item["holding_horizon"] = item.get("holding_horizon") or meta.get("holding_horizon", "")
            item["entry_model"] = item.get("entry_model") or meta.get("entry_model", "")
            item["stop_model"] = item.get("stop_model") or meta.get("stop_model", "")
            item["tp_model"] = item.get("tp_model") or meta.get("tp_model", "")
        except Exception:
            pass

    item["entry_window"] = entry_window
    if entry_window and entry_window.get("status") != "active":
        item["entry_window_alert"] = entry_window
    item["risk_suggestion"] = risk_suggestion(
        entry_plan.get("entry_price") or item.get("entry_price") or 0,
        entry_plan.get("stop_loss") or item.get("stop_loss") or 0,
        entry_plan.get("tp1") or entry_plan.get("take_profit_1") or item.get("tp1") or 0,
    )

    # --- Decoded contexts and strategy metadata --------------------------
    item["market_context"] = market_context
    item["derivatives_context"] = derivatives_context
    item["sector_context"] = sector_context
    item["force_reason"] = force_reason
    item["base_state"] = base_state
    item["sector_signal_count"] = sector_signal_count
    item["strategy_version"] = strategy_version
    item["strategy_version_label"] = f"策略版本 {strategy_version}" if strategy_version else ""
    strategy_code = normalize_strategy_code(item.get("strategy_code"))
    item["strategy_code"] = strategy_code
    item["strategy_name"] = strategy_label(strategy_code)
    try:
        item["strategy_snapshot"] = json.loads(item.get("strategy_snapshot_json") or "{}")
    except Exception:
        item["strategy_snapshot"] = {}
    try:
        item["factor_roles"] = json.loads(item.get("factor_roles_json") or "{}")
    except Exception:
        item["factor_roles"] = {}

    attach_discovery_trade_fields(item)
    return item


def is_actionable_execution_status(status):
    """Return True when *status* is ``"buy_now"`` or ``"wait_pullback"``."""
    return status in ("buy_now", "wait_pullback")


def attach_discovery_trade_fields(item):
    """Split discovery state from trade execution stage.

    Reads ``rec_state``, ``execution_status``, ``execution_label`` and
    ``display_bucket`` from ``item`` and writes the ``discovery_*``,
    ``trade_stage*`` and ``is_*`` convenience fields back onto it.

    Returns:
        The same ``item`` object.
    """
    discovery_state = str(item.get("rec_state") or "").strip()
    trade_stage = str(item.get("execution_status") or "observe").strip() or "observe"
    trade_label = item.get("execution_label") or ""
    discovery_label_map = {
        "爆发": "发现爆发",
        "加速": "发现加速",
        "蓄力": "发现蓄力",
        "过期": "发现过期",
    }
    trade_label_map = {
        "buy_now": "现在可买",
        "wait_pullback": "等回踩",
        "observe": "观察中",
        "holding": "持仓跟踪",
        "completed": "已兑现",
        "invalid": "已失效",
    }
    item["discovery_state"] = discovery_state
    item["discovery_label"] = discovery_label_map.get(discovery_state, discovery_state or "发现观察")
    item["trade_stage"] = trade_stage
    # An explicit execution label takes precedence over the mapped default.
    item["trade_stage_label"] = trade_label or trade_label_map.get(trade_stage, trade_stage or "观察中")
    item["is_discovery_burst"] = discovery_state == "爆发"
    item["is_executable_now"] = trade_stage == "buy_now"
    item["is_trade_candidate"] = trade_stage in ("buy_now", "wait_pullback")
    item["is_watch_pool"] = trade_stage in ("wait_pullback", "observe") or item.get("display_bucket") == "watch_pool"
    return item


def is_executed_trade(item):
    """Only truly triggered/position/closed samples count as executed PnL."""
    status = (item.get("status") or "").strip()
    action_status = normalize_action_status(item.get("action_status"), status)
    execution_status = item.get("execution_status") or ""
    try:
        entry_triggered = int(item.get("entry_triggered") or 0) == 1
    except Exception:
        entry_triggered = False
    if entry_triggered:
        return True
    if status in ("hit_tp1", "hit_tp2", "stopped_out"):
        return True
    if item.get("display_bucket") == "position" or execution_status in ("holding", "completed"):
        return True
    return is_executed_lifecycle(status, action_status, execution_status)


def classify_recommendation_result(item):
    """Classify recommendation outcome without counting untriggered watch items as trades.

    Returns:
        ``(result, label)`` where ``result`` is ``"success"``, ``"failed"`` or
        ``"pending"``. Take-profit / stop-out statuses are decided by status
        alone; executed ``active``/``expired`` items are judged on
        ``max_pnl_pct`` (>= 5 succeeds) and ``pnl_pct`` / ``max_drawdown_pct``
        (<= -3 / <= -5 fails). Numeric fields given as strings are coerced.
    """
    status = item.get("status") or ""
    if status in ("hit_tp1", "hit_tp2"):
        return "success", "✅ 止盈成功"
    if status == "stopped_out":
        return "failed", "❌ 止损失败"
    if not is_executed_trade(item):
        return "pending", "⏳ 未执行"
    if status in ("expired", "active"):
        pnl_pct = _safe_float(item.get("pnl_pct"))
        max_pnl_pct = _safe_float(item.get("max_pnl_pct"))
        max_drawdown_pct = _safe_float(item.get("max_drawdown_pct"))
        if max_pnl_pct >= 5:
            return "success", "✅ 交易成功"
        if pnl_pct <= -3 or max_drawdown_pct <= -5:
            return "failed", "❌ 交易失败"
        return "pending", "⏳ 跟踪中"
    return "pending", "⏳ 未执行"