"""Recommendation write commands and lifecycle transitions.""" import json from datetime import datetime, timedelta from app.config.config_loader import get_meta from app.core.opportunity_lifecycle import ( apply_entry_quality_gate, normalize_action_status, normalize_json_object, ) from app.core.signal_taxonomy import signal_codes as build_signal_codes, signal_labels as build_signal_labels from app.core.strategy_contract import signal_to_recommendation_context from app.core.trade_direction import direction_label, trade_side_from_payload from app.core.strategy_registry import normalize_strategy_code from app.db.recommendation_state import ( derive_minimal_state_fields, entry_window_policy, execution_fields_from_persisted_state, normalize_entry_plan, normalize_signals, opportunity_fields_from_plan, risk_suggestion, state_fields_for_storage, ) from app.db.schema import get_conn def _merged_signal_payload(*payloads): merged = [] seen = set() for payload in payloads: values = [] if isinstance(payload, list): values = payload elif isinstance(payload, str) and payload.strip(): try: parsed = json.loads(payload) values = parsed if isinstance(parsed, list) else [payload] except Exception: values = [payload] for value in values: key = str(value) if key not in seen: seen.add(key) merged.append(value) return merged def _serialized_signal_payload(signals): labels = build_signal_labels(signals if isinstance(signals, list) else normalize_signals(signals)) codes = build_signal_codes(labels) stored_signals = json.dumps(labels, ensure_ascii=False) if isinstance(signals, list) else signals return stored_signals, json.dumps(codes, ensure_ascii=False), json.dumps(labels, ensure_ascii=False) def create_recommendation( symbol, rec_state, rec_score, entry_price, stop_loss=0, tp1=0, tp2=0, sector="", signals="", is_meme=0, entry_plan=None, direction="中性", force_reason="", base_state="", sector_signal_count=0, market_context=None, derivatives_context=None, sector_context=None, strategy_code="", strategy_signal_id=0, strategy_snapshot=None, factor_roles=None, ): """Create or merge the current recommendation record for one symbol.""" entry_plan = dict(entry_plan or {}) raw_pct = round(rec_score * 100.0 / 30) if rec_score else 0 rec_score_pct = min(raw_pct, 100) strategy_context = signal_to_recommendation_context( strategy_snapshot if strategy_snapshot else { "strategy_code": strategy_code, "strategy_signal_id": strategy_signal_id, "factor_roles": factor_roles or {}, }, fallback_symbol=symbol, fallback_score=rec_score_pct, signal_codes=build_signal_codes(build_signal_labels(signals if isinstance(signals, list) else normalize_signals(signals))), entry_plan=entry_plan, market_context=market_context or {}, ) side = trade_side_from_payload(entry_plan, strategy_context.get("strategy_snapshot"), direction) entry_plan["side"] = side direction = direction_label(side) if isinstance(strategy_context.get("strategy_snapshot"), dict): strategy_context["strategy_snapshot"]["direction"] = side snapshot_plan = dict(strategy_context["strategy_snapshot"].get("entry_plan") or {}) snapshot_plan["side"] = side strategy_context["strategy_snapshot"]["entry_plan"] = {**entry_plan, **snapshot_plan} strategy_code = normalize_strategy_code(strategy_context.get("strategy_code")) strategy_signal_id = int(strategy_context.get("strategy_signal_id") or 0) strategy_snapshot = strategy_context.get("strategy_snapshot") or {} factor_roles = strategy_context.get("factor_roles") or {} strategy_version = str(get_meta().get("strategy_version") or strategy_context.get("strategy_version") or "").strip() if isinstance(strategy_snapshot, dict): strategy_snapshot["strategy_version"] = strategy_version now = datetime.now().isoformat() conn = get_conn() incoming_action = normalize_action_status((entry_plan or {}).get("entry_action", "观察") if entry_plan else "观察", "active") incoming_exec, incoming_bucket, incoming_lifecycle, incoming_triggered, incoming_reason = derive_minimal_state_fields( "active", incoming_action, entry_plan or {} ) stored_signals, signal_codes_json, signal_labels_json = _serialized_signal_payload(signals) opportunity_fields = opportunity_fields_from_plan(entry_plan or {}) duplicate_cursor = conn.execute( """ SELECT * FROM recommendation WHERE symbol=%s AND status='active' AND direction=%s AND COALESCE(display_bucket,'watch_pool') != 'history' ORDER BY id DESC LIMIT 1 """, (symbol, direction), ) duplicate_row = duplicate_cursor.fetchone() if hasattr(duplicate_cursor, "fetchone") else None if duplicate_row and (entry_plan or duplicate_row["rec_state"] == rec_state): existing_id = duplicate_row["id"] if hasattr(duplicate_row, "keys") else duplicate_row[0] existing_score = duplicate_row["rec_score"] or 0 merged_state = rec_state merged_score = max(existing_score, rec_score_pct) conn.execute( """ UPDATE recommendation SET rec_state=%s, rec_score=%s, sector=COALESCE(NULLIF(%s, ''), sector), signals=%s, signal_codes_json=%s, signal_labels_json=%s, is_meme=%s, direction=%s, strategy_version=%s, strategy_code=COALESCE(NULLIF(%s, ''), NULLIF(strategy_code, ''), 'main_composite_v1'), strategy_signal_id=CASE WHEN %s > 0 THEN %s ELSE COALESCE(strategy_signal_id, 0) END, strategy_snapshot_json=CASE WHEN %s != '{}' THEN %s ELSE COALESCE(strategy_snapshot_json, '{}') END, factor_roles_json=CASE WHEN %s != '{}' THEN %s ELSE COALESCE(factor_roles_json, '{}') END, force_reason=COALESCE(NULLIF(%s, ''), force_reason), base_state=COALESCE(NULLIF(%s, ''), base_state), sector_signal_count=GREATEST(COALESCE(sector_signal_count,0), %s), entry_plan_json=CASE WHEN %s != '{}' THEN %s ELSE entry_plan_json END, market_context_json=%s, derivatives_context_json=%s, sector_context_json=%s, opportunity_level=COALESCE(NULLIF(%s, ''), opportunity_level), opportunity_level_label=COALESCE(NULLIF(%s, ''), opportunity_level_label), holding_horizon=COALESCE(NULLIF(%s, ''), holding_horizon), entry_model=COALESCE(NULLIF(%s, ''), entry_model), stop_model=COALESCE(NULLIF(%s, ''), stop_model), tp_model=COALESCE(NULLIF(%s, ''), tp_model), action_status=CASE WHEN action_status IN ('止盈1','止盈2','止损','跟踪止盈','衰减','反转') THEN action_status ELSE COALESCE(NULLIF(%s, ''), action_status) END, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE id=%s """, ( merged_state, merged_score, sector, stored_signals, signal_codes_json, signal_labels_json, is_meme, direction, strategy_version, strategy_code, strategy_signal_id, strategy_signal_id, json.dumps(strategy_snapshot or {}, ensure_ascii=False), json.dumps(strategy_snapshot or {}, ensure_ascii=False), json.dumps(factor_roles or {}, ensure_ascii=False), json.dumps(factor_roles or {}, ensure_ascii=False), force_reason or "", base_state or "", int(sector_signal_count or 0), json.dumps(entry_plan or {}, ensure_ascii=False), json.dumps(entry_plan or {}, ensure_ascii=False), json.dumps(market_context or {}, ensure_ascii=False), json.dumps(derivatives_context or {}, ensure_ascii=False), json.dumps(sector_context or {}, ensure_ascii=False), opportunity_fields["opportunity_level"], opportunity_fields["opportunity_level_label"], opportunity_fields["holding_horizon"], opportunity_fields["entry_model"], opportunity_fields["stop_model"], opportunity_fields["tp_model"], incoming_action if entry_plan else "", incoming_exec, incoming_bucket, incoming_lifecycle, incoming_triggered, incoming_reason, existing_id, ), ) conn.commit() conn.close() return existing_id cursor = conn.execute( """ INSERT INTO recommendation (symbol, rec_time, rec_state, rec_score, entry_price, stop_loss, tp1, tp2, sector, signals, signal_codes_json, signal_labels_json, is_meme, direction, current_price, max_price, min_price, last_track_time, entry_plan_json, force_reason, base_state, sector_signal_count, market_context_json, derivatives_context_json, sector_context_json, opportunity_level, opportunity_level_label, holding_horizon, entry_model, stop_model, tp_model, action_status, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, strategy_code, strategy_signal_id, strategy_snapshot_json, factor_roles_json, strategy_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( symbol, now, rec_state, rec_score_pct, entry_price, stop_loss, tp1, tp2, sector, stored_signals, signal_codes_json, signal_labels_json, is_meme, direction, entry_price, entry_price, entry_price, now, json.dumps(entry_plan, ensure_ascii=False) if entry_plan else "{}", force_reason or "", base_state or "", int(sector_signal_count or 0), json.dumps(market_context or {}, ensure_ascii=False), json.dumps(derivatives_context or {}, ensure_ascii=False), json.dumps(sector_context or {}, ensure_ascii=False), opportunity_fields["opportunity_level"], opportunity_fields["opportunity_level_label"], opportunity_fields["holding_horizon"], opportunity_fields["entry_model"], opportunity_fields["stop_model"], opportunity_fields["tp_model"], incoming_action, incoming_exec, incoming_bucket, incoming_lifecycle, incoming_triggered, incoming_reason, strategy_code, strategy_signal_id, json.dumps(strategy_snapshot or {}, ensure_ascii=False), json.dumps(factor_roles or {}, ensure_ascii=False), strategy_version, ), ) rec_id = cursor.fetchone()["id"] conn.commit() conn.close() return rec_id def expire_old_recommendations(hours=48): """Mark old active recommendations as expired.""" conn = get_conn() cutoff = (datetime.now() - timedelta(hours=float(hours or 48))).isoformat() execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields( "expired", "过期", {} ) conn.execute( """ UPDATE recommendation SET status='expired', action_status=CASE WHEN action_status IN ('止盈1','止盈2','止损','跟踪止盈') THEN action_status ELSE '过期' END, expired_time=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE status='active' AND rec_time < %s """, (datetime.now().isoformat(), execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, cutoff), ) conn.commit() conn.close() def downgrade_active_entries_for_market_risk(reason: str, event_time: str | None = None) -> dict: """Downgrade active executable recommendations when global market risk blocks entries.""" event_time = event_time or datetime.now().isoformat() reason = str(reason or "全市场风险过高,暂停新开仓与新挂单").strip() conn = get_conn() rows = conn.execute( """ SELECT id, entry_plan_json, action_status FROM recommendation WHERE status='active' AND ( COALESCE(action_status,'') IN ('可即刻买入','等回踩') OR COALESCE(execution_status,'') IN ('buy_now','wait_pullback') ) ORDER BY id DESC """ ).fetchall() updated = 0 execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields( "active", "观察", {} ) for row in rows: entry_plan = normalize_json_object(row["entry_plan_json"]) previous_action = str(row["action_status"] or "").strip() entry_plan["market_risk_gate"] = { "blocked_action": previous_action, "final_action": "观察", "risk_level": "critical", "reasons": [reason], "updated_at": event_time, } entry_plan["entry_action"] = "观察" conn.execute( """ UPDATE recommendation SET action_status='观察', execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s, entry_plan_json=%s WHERE id=%s """, ( execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, json.dumps(entry_plan, ensure_ascii=False), row["id"], ), ) updated += 1 conn.commit() conn.close() return {"updated_count": updated, "reason": reason, "updated_at": event_time} def apply_recommendation_state_transition(rec_id, requested_action, current_price, event_time=None, signals=None): """The single DB entry for turning price events into recommendation action state.""" event_time = event_time or datetime.now().strftime("%Y-%m-%dT%H:%M:%S") conn = get_conn() row = conn.execute("SELECT * FROM recommendation WHERE id=%s", (rec_id,)).fetchone() if not row: conn.close() return {"updated": False, "push_required": False, "reason": "not_found"} item = dict(row) previous_action = (item.get("action_status") or "持有").strip() entry_plan = normalize_entry_plan(item.get("entry_plan_json")) terminal_map = {"hit_tp2": "止盈2", "stopped_out": "止损"} status = (item.get("status") or "active").strip() final_action = normalize_action_status(terminal_map.get(status, requested_action), status) if status not in terminal_map: final_action, entry_plan, gate_reasons = apply_entry_quality_gate( action_status=final_action, entry_plan=entry_plan, signals=_merged_signal_payload(item.get("signals"), signals) if signals is not None else item.get("signals"), current_price=current_price, market_context=normalize_json_object(item.get("market_context_json")), derivatives_context=normalize_json_object(item.get("derivatives_context_json")), sector_context=normalize_json_object(item.get("sector_context_json")), strategy_code=item.get("strategy_code") or entry_plan.get("strategy_code"), ) else: gate_reasons = [] window_entry_price = item.get("entry_price") or current_price or 0 window_rec_time = item.get("rec_time") or event_time if final_action == "可即刻买入" and previous_action != "可即刻买入": window_entry_price = current_price window_rec_time = event_time entry_window = entry_window_policy(window_entry_price, current_price, window_rec_time, event_time) if final_action == "可即刻买入" and previous_action == "可即刻买入": if entry_window["status"] == "expired": final_action = "观察" gate_reasons.append(entry_window["reason"]) elif entry_window["status"] == "price_left_up": final_action = "等回踩" gate_reasons.append(entry_window["reason"]) elif entry_window["status"] == "price_left_down": final_action = "观察" gate_reasons.append(entry_window["reason"]) should_reset_entry = final_action == "可即刻买入" and previous_action != "可即刻买入" if should_reset_entry: max_price = current_price min_price = current_price pnl_pct = 0.0 max_pnl_pct = 0.0 max_drawdown_pct = 0.0 rec_time = event_time entry_price = current_price else: old_entry = item.get("entry_price") or current_price or 0 old_max = item.get("max_price") or old_entry old_min = item.get("min_price") or old_entry max_price = max(old_max, current_price) if current_price else old_max min_price = min(old_min, current_price) if current_price else old_min entry_price = old_entry rec_time = item.get("rec_time") pnl_pct = round((current_price / old_entry - 1) * 100, 2) if old_entry and current_price else item.get("pnl_pct", 0) max_pnl_pct = round((max_price / old_entry - 1) * 100, 2) if old_entry else item.get("max_pnl_pct", 0) max_drawdown_pct = round((min_price / old_entry - 1) * 100, 2) if old_entry else item.get("max_drawdown_pct", 0) execution_status, execution_label, execution_reason = execution_fields_from_persisted_state( {**item, "action_status": final_action, "status": status}, entry_plan ) execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage( status, final_action, execution_status, execution_reason ) push_required = final_action in ("可即刻买入", "跟踪止盈") and previous_action != final_action and execution_status in ("buy_now", "completed") conn.execute( """ UPDATE recommendation SET action_status=%s, entry_plan_json=%s, current_price=%s, max_price=%s, min_price=%s, pnl_pct=%s, max_pnl_pct=%s, max_drawdown_pct=%s, last_track_time=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s, rec_time=CASE WHEN %s=1 THEN %s ELSE rec_time END, entry_price=CASE WHEN %s=1 THEN %s ELSE entry_price END WHERE id=%s """, ( final_action, json.dumps(entry_plan, ensure_ascii=False), current_price, max_price, min_price, pnl_pct, max_pnl_pct, max_drawdown_pct, event_time, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, 1 if should_reset_entry else 0, rec_time, 1 if should_reset_entry else 0, entry_price, rec_id, ), ) conn.commit() conn.close() return { "updated": True, "id": rec_id, "symbol": item.get("symbol"), "previous_action_status": previous_action, "action_status": final_action, "execution_status": execution_status, "execution_label": execution_label, "execution_reason": execution_reason, "display_bucket": display_bucket, "lifecycle_state": lifecycle_state, "entry_triggered": entry_triggered, "entry_price": entry_price, "current_price": current_price, "pnl_pct": pnl_pct, "stop_loss": item.get("stop_loss") or entry_plan.get("stop_loss") or 0, "tp1": item.get("tp1") or entry_plan.get("tp1") or entry_plan.get("take_profit_1") or 0, "tp2": item.get("tp2") or entry_plan.get("tp2") or 0, "entry_plan": entry_plan, "entry_window": entry_window, "risk_suggestion": risk_suggestion( entry_price, item.get("stop_loss") or entry_plan.get("stop_loss") or 0, item.get("tp1") or entry_plan.get("tp1") or entry_plan.get("take_profit_1") or 0, ), "gate_reasons": gate_reasons, "push_required": push_required, "push_symbol": item.get("symbol"), "push_entry_price": entry_price, "push_current_price": current_price, "push_pnl_pct": pnl_pct, "push_signals": signals or [], } def recompute_all_recommendation_state_fields(conn=None): """Backfill unified recommendation state fields from persisted status/action.""" owns_conn = conn is None if owns_conn: conn = get_conn() rows = conn.execute("SELECT id,status,action_status,entry_plan_json FROM recommendation").fetchall() updated = 0 for row in rows: ep = normalize_entry_plan(row["entry_plan_json"]) action = normalize_action_status(row["action_status"], row["status"]) execution_status, execution_label, execution_reason = execution_fields_from_persisted_state( {"status": row["status"], "action_status": action, "entry_plan_json": row["entry_plan_json"]}, ep ) execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage( row["status"], action, execution_status, execution_reason ) conn.execute( """UPDATE recommendation SET action_status=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE id=%s""", (action, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, row["id"]), ) updated += 1 if owns_conn: conn.commit() conn.close() return updated def update_recommendation_action_status(rec_id, action_status): """Update action status while protecting terminal trades and quality gates.""" conn = get_conn() row = conn.execute( """ SELECT symbol, status, action_status, entry_plan_json, signals, current_price, market_context_json, derivatives_context_json, sector_context_json FROM recommendation WHERE id=%s """, (rec_id,), ).fetchone() terminal_map = { "hit_tp1": "止盈1", "hit_tp2": "止盈2", "stopped_out": "止损", } entry_plan = {} if row: if row["status"] in terminal_map and action_status not in ("止盈1", "止盈2", "止损", "跟踪止盈"): action_status = terminal_map[row["status"]] else: entry_plan = normalize_entry_plan(row["entry_plan_json"]) gated_action, gated_plan, _ = apply_entry_quality_gate( action_status=action_status, entry_plan=entry_plan, signals=row["signals"], current_price=row["current_price"] or 0, market_context=normalize_json_object(row["market_context_json"]), derivatives_context=normalize_json_object(row["derivatives_context_json"]), sector_context=normalize_json_object(row["sector_context_json"]), strategy_code=row["strategy_code"] or entry_plan.get("strategy_code"), ) action_status = gated_action entry_plan = gated_plan if row["status"] not in terminal_map and row["symbol"]: try: trade = conn.execute( "SELECT status, closed_at FROM paper_trades WHERE recommendation_id=%s", (rec_id,), ).fetchone() if trade and trade.get("status") == "closed": action_status = row["action_status"] if row["action_status"] in ("止盈1", "止盈2", "止损", "跟踪止盈") else "观察" entry_plan.setdefault("paper_trade_closed", True) entry_plan.setdefault("paper_trade_closed_at", trade.get("closed_at")) except Exception: pass if entry_plan: execution_status, execution_label, execution_reason = execution_fields_from_persisted_state( {"status": row["status"] if row else "active", "action_status": action_status, "entry_plan_json": json.dumps(entry_plan, ensure_ascii=False)}, entry_plan, ) execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage( row["status"] if row else "active", action_status, execution_status, execution_reason ) conn.execute( """ UPDATE recommendation SET action_status=%s, entry_plan_json=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE id=%s """, (action_status, json.dumps(entry_plan, ensure_ascii=False), execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, rec_id), ) else: execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields( row["status"] if row else "active", action_status, {} ) conn.execute( """ UPDATE recommendation SET action_status=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE id=%s """, (action_status, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, rec_id), ) conn.commit() conn.close()