"""Recommendation write commands and lifecycle transitions.""" import json from datetime import datetime, timedelta from app.config.config_loader import get_meta from app.core.opportunity_lifecycle import ( apply_entry_quality_gate, normalize_action_status, normalize_json_object, ) from app.core.signal_taxonomy import signal_codes as build_signal_codes, signal_labels as build_signal_labels from app.db.recommendation_state import ( derive_minimal_state_fields, entry_window_policy, execution_fields_from_persisted_state, normalize_entry_plan, normalize_signals, opportunity_fields_from_plan, risk_suggestion, state_fields_for_storage, ) from app.db.schema import get_conn def _serialized_signal_payload(signals): labels = build_signal_labels(signals if isinstance(signals, list) else normalize_signals(signals)) codes = build_signal_codes(labels) stored_signals = json.dumps(labels, ensure_ascii=False) if isinstance(signals, list) else signals return stored_signals, json.dumps(codes, ensure_ascii=False), json.dumps(labels, ensure_ascii=False) def create_recommendation( symbol, rec_state, rec_score, entry_price, stop_loss=0, tp1=0, tp2=0, sector="", signals="", is_meme=0, entry_plan=None, direction="中性", force_reason="", base_state="", sector_signal_count=0, market_context=None, derivatives_context=None, sector_context=None, ): """Create or merge the current recommendation record for one symbol.""" raw_pct = round(rec_score * 100.0 / 30) if rec_score else 0 rec_score_pct = min(raw_pct, 100) strategy_version = str(get_meta().get("strategy_version") or "").strip() now = datetime.now().isoformat() conn = get_conn() incoming_action = normalize_action_status((entry_plan or {}).get("entry_action", "观察") if entry_plan else "观察", "active") incoming_exec, incoming_bucket, incoming_lifecycle, incoming_triggered, incoming_reason = derive_minimal_state_fields( "active", incoming_action, entry_plan or {} ) stored_signals, signal_codes_json, signal_labels_json = _serialized_signal_payload(signals) opportunity_fields = opportunity_fields_from_plan(entry_plan or {}) duplicate_cursor = conn.execute( """ SELECT * FROM recommendation WHERE symbol=%s AND status='active' AND COALESCE(display_bucket,'watch_pool') != 'history' ORDER BY id DESC LIMIT 1 """, (symbol,), ) duplicate_row = duplicate_cursor.fetchone() if hasattr(duplicate_cursor, "fetchone") else None if duplicate_row and (entry_plan or duplicate_row["rec_state"] == rec_state): existing_id = duplicate_row["id"] if hasattr(duplicate_row, "keys") else duplicate_row[0] existing_score = duplicate_row["rec_score"] or 0 merged_state = rec_state merged_score = max(existing_score, rec_score_pct) conn.execute( """ UPDATE recommendation SET rec_state=%s, rec_score=%s, sector=COALESCE(NULLIF(%s, ''), sector), signals=%s, signal_codes_json=%s, signal_labels_json=%s, is_meme=%s, direction=%s, strategy_version=%s, force_reason=COALESCE(NULLIF(%s, ''), force_reason), base_state=COALESCE(NULLIF(%s, ''), base_state), sector_signal_count=GREATEST(COALESCE(sector_signal_count,0), %s), entry_plan_json=CASE WHEN %s != '{}' THEN %s ELSE entry_plan_json END, market_context_json=%s, derivatives_context_json=%s, sector_context_json=%s, opportunity_level=COALESCE(NULLIF(%s, ''), opportunity_level), opportunity_level_label=COALESCE(NULLIF(%s, ''), opportunity_level_label), holding_horizon=COALESCE(NULLIF(%s, ''), holding_horizon), entry_model=COALESCE(NULLIF(%s, ''), entry_model), stop_model=COALESCE(NULLIF(%s, ''), stop_model), tp_model=COALESCE(NULLIF(%s, ''), tp_model), action_status=CASE WHEN action_status IN ('止盈1','止盈2','止损','跟踪止盈','衰减','反转') THEN action_status ELSE COALESCE(NULLIF(%s, ''), action_status) END, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE id=%s """, ( merged_state, merged_score, sector, stored_signals, signal_codes_json, signal_labels_json, is_meme, direction, strategy_version, force_reason or "", base_state or "", int(sector_signal_count or 0), json.dumps(entry_plan or {}, ensure_ascii=False), json.dumps(entry_plan or {}, ensure_ascii=False), json.dumps(market_context or {}, ensure_ascii=False), json.dumps(derivatives_context or {}, ensure_ascii=False), json.dumps(sector_context or {}, ensure_ascii=False), opportunity_fields["opportunity_level"], opportunity_fields["opportunity_level_label"], opportunity_fields["holding_horizon"], opportunity_fields["entry_model"], opportunity_fields["stop_model"], opportunity_fields["tp_model"], incoming_action if entry_plan else "", incoming_exec, incoming_bucket, incoming_lifecycle, incoming_triggered, incoming_reason, existing_id, ), ) conn.commit() conn.close() return existing_id cursor = conn.execute( """ INSERT INTO recommendation (symbol, rec_time, rec_state, rec_score, entry_price, stop_loss, tp1, tp2, sector, signals, signal_codes_json, signal_labels_json, is_meme, direction, current_price, max_price, min_price, last_track_time, entry_plan_json, force_reason, base_state, sector_signal_count, market_context_json, derivatives_context_json, sector_context_json, opportunity_level, opportunity_level_label, holding_horizon, entry_model, stop_model, tp_model, action_status, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, strategy_version) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( symbol, now, rec_state, rec_score_pct, entry_price, stop_loss, tp1, tp2, sector, stored_signals, signal_codes_json, signal_labels_json, is_meme, direction, entry_price, entry_price, entry_price, now, json.dumps(entry_plan, ensure_ascii=False) if entry_plan else "{}", force_reason or "", base_state or "", int(sector_signal_count or 0), json.dumps(market_context or {}, ensure_ascii=False), json.dumps(derivatives_context or {}, ensure_ascii=False), json.dumps(sector_context or {}, ensure_ascii=False), opportunity_fields["opportunity_level"], opportunity_fields["opportunity_level_label"], opportunity_fields["holding_horizon"], opportunity_fields["entry_model"], opportunity_fields["stop_model"], opportunity_fields["tp_model"], incoming_action, incoming_exec, incoming_bucket, incoming_lifecycle, incoming_triggered, incoming_reason, strategy_version, ), ) rec_id = cursor.fetchone()["id"] conn.commit() conn.close() return rec_id def expire_old_recommendations(hours=48): """Mark old active recommendations as expired.""" conn = get_conn() cutoff = (datetime.now() - timedelta(hours=float(hours or 48))).isoformat() execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields( "expired", "过期", {} ) conn.execute( """ UPDATE recommendation SET status='expired', action_status=CASE WHEN action_status IN ('止盈1','止盈2','止损','跟踪止盈') THEN action_status ELSE '过期' END, expired_time=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE status='active' AND rec_time < %s """, (datetime.now().isoformat(), execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, cutoff), ) conn.commit() conn.close() def apply_recommendation_state_transition(rec_id, requested_action, current_price, event_time=None, signals=None): """The single DB entry for turning price events into recommendation action state.""" event_time = event_time or datetime.now().strftime("%Y-%m-%dT%H:%M:%S") conn = get_conn() row = conn.execute("SELECT * FROM recommendation WHERE id=%s", (rec_id,)).fetchone() if not row: conn.close() return {"updated": False, "push_required": False, "reason": "not_found"} item = dict(row) previous_action = (item.get("action_status") or "持有").strip() entry_plan = normalize_entry_plan(item.get("entry_plan_json")) terminal_map = {"hit_tp2": "止盈2", "stopped_out": "止损"} status = (item.get("status") or "active").strip() final_action = normalize_action_status(terminal_map.get(status, requested_action), status) if status not in terminal_map: final_action, entry_plan, gate_reasons = apply_entry_quality_gate( action_status=final_action, entry_plan=entry_plan, signals=signals if signals is not None else item.get("signals"), current_price=current_price, market_context=normalize_json_object(item.get("market_context_json")), derivatives_context=normalize_json_object(item.get("derivatives_context_json")), sector_context=normalize_json_object(item.get("sector_context_json")), ) else: gate_reasons = [] window_entry_price = item.get("entry_price") or current_price or 0 window_rec_time = item.get("rec_time") or event_time if final_action == "可即刻买入" and previous_action != "可即刻买入": window_entry_price = current_price window_rec_time = event_time entry_window = entry_window_policy(window_entry_price, current_price, window_rec_time, event_time) if final_action == "可即刻买入" and previous_action == "可即刻买入": if entry_window["status"] == "expired": final_action = "观察" gate_reasons.append(entry_window["reason"]) elif entry_window["status"] == "price_left_up": final_action = "等回踩" gate_reasons.append(entry_window["reason"]) elif entry_window["status"] == "price_left_down": final_action = "观察" gate_reasons.append(entry_window["reason"]) should_reset_entry = final_action == "可即刻买入" and previous_action != "可即刻买入" if should_reset_entry: max_price = current_price min_price = current_price pnl_pct = 0.0 max_pnl_pct = 0.0 max_drawdown_pct = 0.0 rec_time = event_time entry_price = current_price else: old_entry = item.get("entry_price") or current_price or 0 old_max = item.get("max_price") or old_entry old_min = item.get("min_price") or old_entry max_price = max(old_max, current_price) if current_price else old_max min_price = min(old_min, current_price) if current_price else old_min entry_price = old_entry rec_time = item.get("rec_time") pnl_pct = round((current_price / old_entry - 1) * 100, 2) if old_entry and current_price else item.get("pnl_pct", 0) max_pnl_pct = round((max_price / old_entry - 1) * 100, 2) if old_entry else item.get("max_pnl_pct", 0) max_drawdown_pct = round((min_price / old_entry - 1) * 100, 2) if old_entry else item.get("max_drawdown_pct", 0) execution_status, execution_label, execution_reason = execution_fields_from_persisted_state( {**item, "action_status": final_action, "status": status}, entry_plan ) execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage( status, final_action, execution_status, execution_reason ) push_required = final_action in ("可即刻买入", "跟踪止盈") and previous_action != final_action and execution_status in ("buy_now", "completed") conn.execute( """ UPDATE recommendation SET action_status=%s, entry_plan_json=%s, current_price=%s, max_price=%s, min_price=%s, pnl_pct=%s, max_pnl_pct=%s, max_drawdown_pct=%s, last_track_time=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s, rec_time=CASE WHEN %s=1 THEN %s ELSE rec_time END, entry_price=CASE WHEN %s=1 THEN %s ELSE entry_price END WHERE id=%s """, ( final_action, json.dumps(entry_plan, ensure_ascii=False), current_price, max_price, min_price, pnl_pct, max_pnl_pct, max_drawdown_pct, event_time, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, 1 if should_reset_entry else 0, rec_time, 1 if should_reset_entry else 0, entry_price, rec_id, ), ) conn.commit() conn.close() return { "updated": True, "id": rec_id, "symbol": item.get("symbol"), "previous_action_status": previous_action, "action_status": final_action, "execution_status": execution_status, "execution_label": execution_label, "execution_reason": execution_reason, "display_bucket": display_bucket, "lifecycle_state": lifecycle_state, "entry_triggered": entry_triggered, "entry_price": entry_price, "current_price": current_price, "pnl_pct": pnl_pct, "stop_loss": item.get("stop_loss") or entry_plan.get("stop_loss") or 0, "tp1": item.get("tp1") or entry_plan.get("tp1") or entry_plan.get("take_profit_1") or 0, "tp2": item.get("tp2") or entry_plan.get("tp2") or 0, "entry_plan": entry_plan, "entry_window": entry_window, "risk_suggestion": risk_suggestion( entry_price, item.get("stop_loss") or entry_plan.get("stop_loss") or 0, item.get("tp1") or entry_plan.get("tp1") or entry_plan.get("take_profit_1") or 0, ), "gate_reasons": gate_reasons, "push_required": push_required, "push_symbol": item.get("symbol"), "push_entry_price": entry_price, "push_current_price": current_price, "push_pnl_pct": pnl_pct, "push_signals": signals or [], } def recompute_all_recommendation_state_fields(conn=None): """Backfill unified recommendation state fields from persisted status/action.""" owns_conn = conn is None if owns_conn: conn = get_conn() rows = conn.execute("SELECT id,status,action_status,entry_plan_json FROM recommendation").fetchall() updated = 0 for row in rows: ep = normalize_entry_plan(row["entry_plan_json"]) action = normalize_action_status(row["action_status"], row["status"]) execution_status, execution_label, execution_reason = execution_fields_from_persisted_state( {"status": row["status"], "action_status": action, "entry_plan_json": row["entry_plan_json"]}, ep ) execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage( row["status"], action, execution_status, execution_reason ) conn.execute( """UPDATE recommendation SET action_status=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE id=%s""", (action, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, row["id"]), ) updated += 1 if owns_conn: conn.commit() conn.close() return updated def update_recommendation_action_status(rec_id, action_status): """Update action status while protecting terminal trades and quality gates.""" conn = get_conn() row = conn.execute( """ SELECT symbol, status, action_status, entry_plan_json, signals, current_price, market_context_json, derivatives_context_json, sector_context_json FROM recommendation WHERE id=%s """, (rec_id,), ).fetchone() terminal_map = { "hit_tp1": "止盈1", "hit_tp2": "止盈2", "stopped_out": "止损", } entry_plan = {} if row: if row["status"] in terminal_map and action_status not in ("止盈1", "止盈2", "止损", "跟踪止盈"): action_status = terminal_map[row["status"]] else: entry_plan = normalize_entry_plan(row["entry_plan_json"]) gated_action, gated_plan, _ = apply_entry_quality_gate( action_status=action_status, entry_plan=entry_plan, signals=row["signals"], current_price=row["current_price"] or 0, market_context=normalize_json_object(row["market_context_json"]), derivatives_context=normalize_json_object(row["derivatives_context_json"]), sector_context=normalize_json_object(row["sector_context_json"]), ) action_status = gated_action entry_plan = gated_plan if row["status"] not in terminal_map and row["symbol"]: try: trade = conn.execute( "SELECT status, closed_at FROM paper_trades WHERE recommendation_id=%s", (rec_id,), ).fetchone() if trade and trade.get("status") == "closed": action_status = row["action_status"] if row["action_status"] in ("止盈1", "止盈2", "止损", "跟踪止盈") else "观察" entry_plan.setdefault("paper_trade_closed", True) entry_plan.setdefault("paper_trade_closed_at", trade.get("closed_at")) except Exception: pass if entry_plan: execution_status, execution_label, execution_reason = execution_fields_from_persisted_state( {"status": row["status"] if row else "active", "action_status": action_status, "entry_plan_json": json.dumps(entry_plan, ensure_ascii=False)}, entry_plan, ) execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage( row["status"] if row else "active", action_status, execution_status, execution_reason ) conn.execute( """ UPDATE recommendation SET action_status=%s, entry_plan_json=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE id=%s """, (action_status, json.dumps(entry_plan, ensure_ascii=False), execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, rec_id), ) else: execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields( row["status"] if row else "active", action_status, {} ) conn.execute( """ UPDATE recommendation SET action_status=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s WHERE id=%s """, (action_status, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, rec_id), ) conn.commit() conn.close()