alphax/app/db/recommendation_commands.py
2026-05-26 21:10:22 +08:00

556 lines
23 KiB
Python

"""Recommendation write commands and lifecycle transitions."""
import json
from datetime import datetime, timedelta
from app.config.config_loader import get_meta
from app.core.opportunity_lifecycle import (
apply_entry_quality_gate,
normalize_action_status,
normalize_json_object,
)
from app.core.signal_taxonomy import signal_codes as build_signal_codes, signal_labels as build_signal_labels
from app.db.recommendation_state import (
derive_minimal_state_fields,
entry_window_policy,
execution_fields_from_persisted_state,
normalize_entry_plan,
normalize_signals,
opportunity_fields_from_plan,
risk_suggestion,
state_fields_for_storage,
)
from app.db.schema import get_conn
def _merged_signal_payload(*payloads):
merged = []
seen = set()
for payload in payloads:
values = []
if isinstance(payload, list):
values = payload
elif isinstance(payload, str) and payload.strip():
try:
parsed = json.loads(payload)
values = parsed if isinstance(parsed, list) else [payload]
except Exception:
values = [payload]
for value in values:
key = str(value)
if key not in seen:
seen.add(key)
merged.append(value)
return merged
def _serialized_signal_payload(signals):
labels = build_signal_labels(signals if isinstance(signals, list) else normalize_signals(signals))
codes = build_signal_codes(labels)
stored_signals = json.dumps(labels, ensure_ascii=False) if isinstance(signals, list) else signals
return stored_signals, json.dumps(codes, ensure_ascii=False), json.dumps(labels, ensure_ascii=False)
def create_recommendation(
symbol,
rec_state,
rec_score,
entry_price,
stop_loss=0,
tp1=0,
tp2=0,
sector="",
signals="",
is_meme=0,
entry_plan=None,
direction="中性",
force_reason="",
base_state="",
sector_signal_count=0,
market_context=None,
derivatives_context=None,
sector_context=None,
):
"""Create or merge the current recommendation record for one symbol."""
raw_pct = round(rec_score * 100.0 / 30) if rec_score else 0
rec_score_pct = min(raw_pct, 100)
strategy_version = str(get_meta().get("strategy_version") or "").strip()
now = datetime.now().isoformat()
conn = get_conn()
incoming_action = normalize_action_status((entry_plan or {}).get("entry_action", "观察") if entry_plan else "观察", "active")
incoming_exec, incoming_bucket, incoming_lifecycle, incoming_triggered, incoming_reason = derive_minimal_state_fields(
"active", incoming_action, entry_plan or {}
)
stored_signals, signal_codes_json, signal_labels_json = _serialized_signal_payload(signals)
opportunity_fields = opportunity_fields_from_plan(entry_plan or {})
duplicate_cursor = conn.execute(
"""
SELECT * FROM recommendation
WHERE symbol=%s AND status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'
ORDER BY id DESC LIMIT 1
""",
(symbol,),
)
duplicate_row = duplicate_cursor.fetchone() if hasattr(duplicate_cursor, "fetchone") else None
if duplicate_row and (entry_plan or duplicate_row["rec_state"] == rec_state):
existing_id = duplicate_row["id"] if hasattr(duplicate_row, "keys") else duplicate_row[0]
existing_score = duplicate_row["rec_score"] or 0
merged_state = rec_state
merged_score = max(existing_score, rec_score_pct)
conn.execute(
"""
UPDATE recommendation
SET rec_state=%s, rec_score=%s, sector=COALESCE(NULLIF(%s, ''), sector),
signals=%s, signal_codes_json=%s, signal_labels_json=%s, is_meme=%s, direction=%s, strategy_version=%s,
force_reason=COALESCE(NULLIF(%s, ''), force_reason),
base_state=COALESCE(NULLIF(%s, ''), base_state),
sector_signal_count=GREATEST(COALESCE(sector_signal_count,0), %s),
entry_plan_json=CASE WHEN %s != '{}' THEN %s ELSE entry_plan_json END,
market_context_json=%s, derivatives_context_json=%s, sector_context_json=%s,
opportunity_level=COALESCE(NULLIF(%s, ''), opportunity_level),
opportunity_level_label=COALESCE(NULLIF(%s, ''), opportunity_level_label),
holding_horizon=COALESCE(NULLIF(%s, ''), holding_horizon),
entry_model=COALESCE(NULLIF(%s, ''), entry_model),
stop_model=COALESCE(NULLIF(%s, ''), stop_model),
tp_model=COALESCE(NULLIF(%s, ''), tp_model),
action_status=CASE
WHEN action_status IN ('止盈1','止盈2','止损','跟踪止盈','衰减','反转') THEN action_status
ELSE COALESCE(NULLIF(%s, ''), action_status)
END,
execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s
WHERE id=%s
""",
(
merged_state,
merged_score,
sector,
stored_signals,
signal_codes_json,
signal_labels_json,
is_meme,
direction,
strategy_version,
force_reason or "",
base_state or "",
int(sector_signal_count or 0),
json.dumps(entry_plan or {}, ensure_ascii=False),
json.dumps(entry_plan or {}, ensure_ascii=False),
json.dumps(market_context or {}, ensure_ascii=False),
json.dumps(derivatives_context or {}, ensure_ascii=False),
json.dumps(sector_context or {}, ensure_ascii=False),
opportunity_fields["opportunity_level"],
opportunity_fields["opportunity_level_label"],
opportunity_fields["holding_horizon"],
opportunity_fields["entry_model"],
opportunity_fields["stop_model"],
opportunity_fields["tp_model"],
incoming_action if entry_plan else "",
incoming_exec,
incoming_bucket,
incoming_lifecycle,
incoming_triggered,
incoming_reason,
existing_id,
),
)
conn.commit()
conn.close()
return existing_id
cursor = conn.execute(
"""
INSERT INTO recommendation (symbol, rec_time, rec_state, rec_score, entry_price,
stop_loss, tp1, tp2, sector, signals, signal_codes_json, signal_labels_json, is_meme, direction,
current_price, max_price, min_price, last_track_time, entry_plan_json,
force_reason, base_state, sector_signal_count,
market_context_json, derivatives_context_json, sector_context_json,
opportunity_level, opportunity_level_label, holding_horizon, entry_model, stop_model, tp_model,
action_status, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason,
strategy_version)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
symbol,
now,
rec_state,
rec_score_pct,
entry_price,
stop_loss,
tp1,
tp2,
sector,
stored_signals,
signal_codes_json,
signal_labels_json,
is_meme,
direction,
entry_price,
entry_price,
entry_price,
now,
json.dumps(entry_plan, ensure_ascii=False) if entry_plan else "{}",
force_reason or "",
base_state or "",
int(sector_signal_count or 0),
json.dumps(market_context or {}, ensure_ascii=False),
json.dumps(derivatives_context or {}, ensure_ascii=False),
json.dumps(sector_context or {}, ensure_ascii=False),
opportunity_fields["opportunity_level"],
opportunity_fields["opportunity_level_label"],
opportunity_fields["holding_horizon"],
opportunity_fields["entry_model"],
opportunity_fields["stop_model"],
opportunity_fields["tp_model"],
incoming_action,
incoming_exec,
incoming_bucket,
incoming_lifecycle,
incoming_triggered,
incoming_reason,
strategy_version,
),
)
rec_id = cursor.fetchone()["id"]
conn.commit()
conn.close()
return rec_id
def expire_old_recommendations(hours=48):
"""Mark old active recommendations as expired."""
conn = get_conn()
cutoff = (datetime.now() - timedelta(hours=float(hours or 48))).isoformat()
execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields(
"expired", "过期", {}
)
conn.execute(
"""
UPDATE recommendation
SET status='expired',
action_status=CASE WHEN action_status IN ('止盈1','止盈2','止损','跟踪止盈') THEN action_status ELSE '过期' END,
expired_time=%s,
execution_status=%s,
display_bucket=%s,
lifecycle_state=%s,
entry_triggered=%s,
state_reason=%s
WHERE status='active' AND rec_time < %s
""",
(datetime.now().isoformat(), execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, cutoff),
)
conn.commit()
conn.close()
def downgrade_active_entries_for_market_risk(reason: str, event_time: str | None = None) -> dict:
"""Downgrade active executable recommendations when global market risk blocks entries."""
event_time = event_time or datetime.now().isoformat()
reason = str(reason or "全市场风险过高,暂停新开仓与新挂单").strip()
conn = get_conn()
rows = conn.execute(
"""
SELECT id, entry_plan_json, action_status
FROM recommendation
WHERE status='active'
AND (
COALESCE(action_status,'') IN ('可即刻买入','等回踩')
OR COALESCE(execution_status,'') IN ('buy_now','wait_pullback')
)
ORDER BY id DESC
"""
).fetchall()
updated = 0
execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields(
"active", "观察", {}
)
for row in rows:
entry_plan = normalize_json_object(row["entry_plan_json"])
previous_action = str(row["action_status"] or "").strip()
entry_plan["market_risk_gate"] = {
"blocked_action": previous_action,
"final_action": "观察",
"risk_level": "critical",
"reasons": [reason],
"updated_at": event_time,
}
entry_plan["entry_action"] = "观察"
conn.execute(
"""
UPDATE recommendation
SET action_status='观察',
execution_status=%s,
display_bucket=%s,
lifecycle_state=%s,
entry_triggered=%s,
state_reason=%s,
entry_plan_json=%s
WHERE id=%s
""",
(
execution_status,
display_bucket,
lifecycle_state,
entry_triggered,
state_reason,
json.dumps(entry_plan, ensure_ascii=False),
row["id"],
),
)
updated += 1
conn.commit()
conn.close()
return {"updated_count": updated, "reason": reason, "updated_at": event_time}
def apply_recommendation_state_transition(rec_id, requested_action, current_price, event_time=None, signals=None):
"""The single DB entry for turning price events into recommendation action state."""
event_time = event_time or datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
conn = get_conn()
row = conn.execute("SELECT * FROM recommendation WHERE id=%s", (rec_id,)).fetchone()
if not row:
conn.close()
return {"updated": False, "push_required": False, "reason": "not_found"}
item = dict(row)
previous_action = (item.get("action_status") or "持有").strip()
entry_plan = normalize_entry_plan(item.get("entry_plan_json"))
terminal_map = {"hit_tp2": "止盈2", "stopped_out": "止损"}
status = (item.get("status") or "active").strip()
final_action = normalize_action_status(terminal_map.get(status, requested_action), status)
if status not in terminal_map:
final_action, entry_plan, gate_reasons = apply_entry_quality_gate(
action_status=final_action,
entry_plan=entry_plan,
signals=_merged_signal_payload(item.get("signals"), signals) if signals is not None else item.get("signals"),
current_price=current_price,
market_context=normalize_json_object(item.get("market_context_json")),
derivatives_context=normalize_json_object(item.get("derivatives_context_json")),
sector_context=normalize_json_object(item.get("sector_context_json")),
)
else:
gate_reasons = []
window_entry_price = item.get("entry_price") or current_price or 0
window_rec_time = item.get("rec_time") or event_time
if final_action == "可即刻买入" and previous_action != "可即刻买入":
window_entry_price = current_price
window_rec_time = event_time
entry_window = entry_window_policy(window_entry_price, current_price, window_rec_time, event_time)
if final_action == "可即刻买入" and previous_action == "可即刻买入":
if entry_window["status"] == "expired":
final_action = "观察"
gate_reasons.append(entry_window["reason"])
elif entry_window["status"] == "price_left_up":
final_action = "等回踩"
gate_reasons.append(entry_window["reason"])
elif entry_window["status"] == "price_left_down":
final_action = "观察"
gate_reasons.append(entry_window["reason"])
should_reset_entry = final_action == "可即刻买入" and previous_action != "可即刻买入"
if should_reset_entry:
max_price = current_price
min_price = current_price
pnl_pct = 0.0
max_pnl_pct = 0.0
max_drawdown_pct = 0.0
rec_time = event_time
entry_price = current_price
else:
old_entry = item.get("entry_price") or current_price or 0
old_max = item.get("max_price") or old_entry
old_min = item.get("min_price") or old_entry
max_price = max(old_max, current_price) if current_price else old_max
min_price = min(old_min, current_price) if current_price else old_min
entry_price = old_entry
rec_time = item.get("rec_time")
pnl_pct = round((current_price / old_entry - 1) * 100, 2) if old_entry and current_price else item.get("pnl_pct", 0)
max_pnl_pct = round((max_price / old_entry - 1) * 100, 2) if old_entry else item.get("max_pnl_pct", 0)
max_drawdown_pct = round((min_price / old_entry - 1) * 100, 2) if old_entry else item.get("max_drawdown_pct", 0)
execution_status, execution_label, execution_reason = execution_fields_from_persisted_state(
{**item, "action_status": final_action, "status": status}, entry_plan
)
execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage(
status, final_action, execution_status, execution_reason
)
push_required = final_action in ("可即刻买入", "跟踪止盈") and previous_action != final_action and execution_status in ("buy_now", "completed")
conn.execute(
"""
UPDATE recommendation
SET action_status=%s, entry_plan_json=%s, current_price=%s, max_price=%s, min_price=%s,
pnl_pct=%s, max_pnl_pct=%s, max_drawdown_pct=%s, last_track_time=%s,
execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s,
rec_time=CASE WHEN %s=1 THEN %s ELSE rec_time END,
entry_price=CASE WHEN %s=1 THEN %s ELSE entry_price END
WHERE id=%s
""",
(
final_action,
json.dumps(entry_plan, ensure_ascii=False),
current_price,
max_price,
min_price,
pnl_pct,
max_pnl_pct,
max_drawdown_pct,
event_time,
execution_status,
display_bucket,
lifecycle_state,
entry_triggered,
state_reason,
1 if should_reset_entry else 0,
rec_time,
1 if should_reset_entry else 0,
entry_price,
rec_id,
),
)
conn.commit()
conn.close()
return {
"updated": True,
"id": rec_id,
"symbol": item.get("symbol"),
"previous_action_status": previous_action,
"action_status": final_action,
"execution_status": execution_status,
"execution_label": execution_label,
"execution_reason": execution_reason,
"display_bucket": display_bucket,
"lifecycle_state": lifecycle_state,
"entry_triggered": entry_triggered,
"entry_price": entry_price,
"current_price": current_price,
"pnl_pct": pnl_pct,
"stop_loss": item.get("stop_loss") or entry_plan.get("stop_loss") or 0,
"tp1": item.get("tp1") or entry_plan.get("tp1") or entry_plan.get("take_profit_1") or 0,
"tp2": item.get("tp2") or entry_plan.get("tp2") or 0,
"entry_plan": entry_plan,
"entry_window": entry_window,
"risk_suggestion": risk_suggestion(
entry_price,
item.get("stop_loss") or entry_plan.get("stop_loss") or 0,
item.get("tp1") or entry_plan.get("tp1") or entry_plan.get("take_profit_1") or 0,
),
"gate_reasons": gate_reasons,
"push_required": push_required,
"push_symbol": item.get("symbol"),
"push_entry_price": entry_price,
"push_current_price": current_price,
"push_pnl_pct": pnl_pct,
"push_signals": signals or [],
}
def recompute_all_recommendation_state_fields(conn=None):
"""Backfill unified recommendation state fields from persisted status/action."""
owns_conn = conn is None
if owns_conn:
conn = get_conn()
rows = conn.execute("SELECT id,status,action_status,entry_plan_json FROM recommendation").fetchall()
updated = 0
for row in rows:
ep = normalize_entry_plan(row["entry_plan_json"])
action = normalize_action_status(row["action_status"], row["status"])
execution_status, execution_label, execution_reason = execution_fields_from_persisted_state(
{"status": row["status"], "action_status": action, "entry_plan_json": row["entry_plan_json"]}, ep
)
execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage(
row["status"], action, execution_status, execution_reason
)
conn.execute(
"""UPDATE recommendation
SET action_status=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s
WHERE id=%s""",
(action, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, row["id"]),
)
updated += 1
if owns_conn:
conn.commit()
conn.close()
return updated
def update_recommendation_action_status(rec_id, action_status):
"""Update action status while protecting terminal trades and quality gates."""
conn = get_conn()
row = conn.execute(
"""
SELECT symbol, status, action_status, entry_plan_json, signals, current_price,
market_context_json, derivatives_context_json, sector_context_json
FROM recommendation WHERE id=%s
""",
(rec_id,),
).fetchone()
terminal_map = {
"hit_tp1": "止盈1",
"hit_tp2": "止盈2",
"stopped_out": "止损",
}
entry_plan = {}
if row:
if row["status"] in terminal_map and action_status not in ("止盈1", "止盈2", "止损", "跟踪止盈"):
action_status = terminal_map[row["status"]]
else:
entry_plan = normalize_entry_plan(row["entry_plan_json"])
gated_action, gated_plan, _ = apply_entry_quality_gate(
action_status=action_status,
entry_plan=entry_plan,
signals=row["signals"],
current_price=row["current_price"] or 0,
market_context=normalize_json_object(row["market_context_json"]),
derivatives_context=normalize_json_object(row["derivatives_context_json"]),
sector_context=normalize_json_object(row["sector_context_json"]),
)
action_status = gated_action
entry_plan = gated_plan
if row["status"] not in terminal_map and row["symbol"]:
try:
trade = conn.execute(
"SELECT status, closed_at FROM paper_trades WHERE recommendation_id=%s",
(rec_id,),
).fetchone()
if trade and trade.get("status") == "closed":
action_status = row["action_status"] if row["action_status"] in ("止盈1", "止盈2", "止损", "跟踪止盈") else "观察"
entry_plan.setdefault("paper_trade_closed", True)
entry_plan.setdefault("paper_trade_closed_at", trade.get("closed_at"))
except Exception:
pass
if entry_plan:
execution_status, execution_label, execution_reason = execution_fields_from_persisted_state(
{"status": row["status"] if row else "active", "action_status": action_status, "entry_plan_json": json.dumps(entry_plan, ensure_ascii=False)},
entry_plan,
)
execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = state_fields_for_storage(
row["status"] if row else "active", action_status, execution_status, execution_reason
)
conn.execute(
"""
UPDATE recommendation
SET action_status=%s, entry_plan_json=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s
WHERE id=%s
""",
(action_status, json.dumps(entry_plan, ensure_ascii=False), execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, rec_id),
)
else:
execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields(
row["status"] if row else "active", action_status, {}
)
conn.execute(
"""
UPDATE recommendation
SET action_status=%s, execution_status=%s, display_bucket=%s, lifecycle_state=%s, entry_triggered=%s, state_reason=%s
WHERE id=%s
""",
(action_status, execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason, rec_id),
)
conn.commit()
conn.close()