alphax/app/db/recommendation_state.py
2026-05-29 10:09:30 +08:00

525 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Recommendation lifecycle and display-state derivation helpers."""
import json
from datetime import datetime
from app.config.config_loader import get_meta
from app.core.opportunity_lifecycle import (
apply_entry_quality_gate,
derive_display_bucket,
is_executed_lifecycle,
normalize_action_status,
)
from app.core.strategy_registry import normalize_strategy_code, strategy_label
def state_fields_for_storage(status, action_status, execution_status="", reason=""):
    """Build the tuple of derived state fields that is stored with a recommendation.

    The display bucket is computed by ``derive_display_bucket`` (a missing
    ``status`` is treated as ``"active"``).  The returned 5-tuple is, in order:

    1. execution status (bucket value, else the given one, else ``"observe"``)
    2. display bucket (default ``"watch_pool"``)
    3. lifecycle state (default ``"watching"``)
    4. ``1`` when ``is_executed_lifecycle`` reports an executed lifecycle, else ``0``
    5. the reason text (``""`` when none was given)
    """
    effective_status = status or "active"
    bucket = derive_display_bucket(effective_status, action_status, execution_status)

    stored_execution = bucket.get("execution_status", execution_status or "observe")
    stored_bucket = bucket.get("display_bucket", "watch_pool")
    stored_lifecycle = bucket.get("lifecycle_state", "watching")
    # The executed check deliberately receives the raw bucket value (possibly None),
    # not the defaulted one computed above.
    if is_executed_lifecycle(effective_status, action_status, bucket.get("execution_status")):
        executed_flag = 1
    else:
        executed_flag = 0
    return (stored_execution, stored_bucket, stored_lifecycle, executed_flag, reason or "")
def derive_minimal_state_fields(status, action_status, entry_plan=None):
    """Map a status/action pair to storage state fields using fixed rules only.

    The action is first normalised with ``normalize_action_status``; the
    resulting action (or, for the invalid group, the raw ``status``) selects an
    execution status and a reason text, which are then handed to
    ``state_fields_for_storage``.  ``entry_plan`` is accepted but not used.
    """
    action = normalize_action_status(action_status, status)

    # (actions, execution_status, reason) — evaluated top to bottom, first match wins.
    action_rules = (
        (("可即刻买入",), "buy_now", "策略确认当前入场窗口"),
        (("等回踩",), "wait_pullback", "等待回踩触发,未触发前不计推荐收益"),
        (("持有",), "holding", "已进入持仓跟踪"),
        (("止盈1", "止盈2", "跟踪止盈"), "completed", "利润管理/阶段兑现"),
    )
    invalid_actions = ("止损", "衰减", "反转", "放弃", "过期", "归档")
    invalid_statuses = ("stopped_out", "expired", "invalid", "archived")

    for actions, mapped_status, mapped_reason in action_rules:
        if action in actions:
            execution_status, reason = mapped_status, mapped_reason
            break
    else:
        if action in invalid_actions or status in invalid_statuses:
            execution_status, reason = "invalid", "机会失效,归入历史复盘"
        else:
            execution_status, reason = "observe", "观察池,未触发入场"

    return state_fields_for_storage(status, action, execution_status, reason)
def opportunity_fields_from_plan(entry_plan):
    """Extract the opportunity descriptor fields from an entry plan as strings.

    A non-dict ``entry_plan`` is treated as empty.  Every value in the result
    is a string; missing or falsy plan values become ``""``.  ``stop_model``
    and ``tp_model`` fall back to ``stop_basis`` / ``tp_basis`` respectively.
    """
    plan = entry_plan if isinstance(entry_plan, dict) else {}

    def first_text(*keys):
        # First truthy value among the given keys, stringified; "" when none.
        for key in keys:
            value = plan.get(key)
            if value:
                return str(value)
        return ""

    return {
        "opportunity_level": first_text("opportunity_level"),
        "opportunity_level_label": first_text("opportunity_level_label"),
        "holding_horizon": first_text("holding_horizon"),
        "entry_model": first_text("entry_model"),
        "stop_model": first_text("stop_model", "stop_basis"),
        "tp_model": first_text("tp_model", "tp_basis"),
    }
def entry_window_policy(
    entry_price,
    current_price,
    rec_time,
    event_time=None,
    window_hours=2.0,
    up_deviation_pct=1.5,
    down_deviation_pct=1.2,
):
    """Stage-1 entry window trust policy.

    Decides whether an entry signal can still be trusted, based on how old it
    is and how far the current price has moved away from the entry price.

    Args:
        entry_price: Planned entry price; unparsable values are treated as 0.
        current_price: Latest price; unparsable values are treated as 0.
        rec_time: ISO-8601 timestamp of the recommendation.
        event_time: ISO-8601 timestamp to evaluate at; defaults to the current
            local time.
        window_hours: Length of the validity window in hours.
        up_deviation_pct: Maximum tolerated upward deviation in percent.
        down_deviation_pct: Maximum tolerated downward deviation in percent.

    Returns:
        A dict with ``status`` (``"active"``, ``"expired"``,
        ``"price_left_up"`` or ``"price_left_down"``), ``label``, ``reason``
        and the numeric inputs/derived values.  Expiry takes precedence over
        price deviation.

    When exactly one of the two timestamps carries a UTC offset, the naive one
    is interpreted as system-local time so the age can still be computed.
    Without that alignment the subtraction fails and the signal would be
    reported with age 0, i.e. as permanently fresh.
    """
    event_time = event_time or datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    try:
        entry_price = float(entry_price or 0)
        current_price = float(current_price or 0)
    except (TypeError, ValueError, OverflowError):
        # If either price is unusable, neither is trusted.
        entry_price = 0
        current_price = 0

    if entry_price and current_price:
        deviation_pct = round((current_price / entry_price - 1) * 100, 2)
    else:
        deviation_pct = 0.0

    age_minutes = 0.0
    try:
        start = datetime.fromisoformat(str(rec_time))
        end = datetime.fromisoformat(str(event_time))
        # Naive and aware datetimes cannot be subtracted; align the naive side
        # by treating it as local time (datetime.astimezone() semantics).
        if (start.tzinfo is None) != (end.tzinfo is None):
            if start.tzinfo is None:
                start = start.astimezone()
            else:
                end = end.astimezone()
        age_minutes = round((end - start).total_seconds() / 60.0, 1)
    except (TypeError, ValueError, OverflowError, OSError):
        # Best effort: an unparsable timestamp leaves the age at zero.
        age_minutes = 0.0

    window_minutes = window_hours * 60.0
    remaining_minutes = round(max(0.0, window_minutes - age_minutes), 1)

    result = {
        "status": "active",
        "label": "入场窗口有效",
        "reason": "入场窗口仍在有效期内,价格未明显脱离触发价",
        "age_minutes": age_minutes,
        "remaining_minutes": remaining_minutes,
        "window_hours": window_hours,
        "entry_price": entry_price,
        "current_price": current_price,
        "deviation_pct": deviation_pct,
        "max_up_deviation_pct": up_deviation_pct,
        "max_down_deviation_pct": down_deviation_pct,
    }

    if age_minutes > window_minutes:
        result.update({
            "status": "expired",
            "label": "窗口已过期",
            "reason": f"入场窗口超过有效期 {window_hours:g} 小时,避免沿用旧信号追入",
            "remaining_minutes": 0.0,
        })
    elif deviation_pct > up_deviation_pct:
        result.update({
            "status": "price_left_up",
            "label": "价格已上脱离",
            "reason": f"当前价较触发价上脱离 {deviation_pct:.2f}%,超过 {up_deviation_pct:g}% 阈值,避免追高",
        })
    elif deviation_pct < -down_deviation_pct:
        result.update({
            "status": "price_left_down",
            "label": "价格已下破",
            "reason": f"当前价较触发价下破 {abs(deviation_pct):.2f}%,买点动能失效,转观察",
        })
    return result
def risk_suggestion(entry_price, stop_loss, tp1, risk_budget_pct=1.0, max_position_pct=100.0):
    """Turn entry / stop / first take-profit prices into a position-size suggestion.

    All percentages are rounded to two decimals.  If any price cannot be
    converted to ``float``, all three prices are treated as 0 and the result is
    flagged ``valid = False``.

    Returns a dict with the stop distance, the suggested position (capped at
    ``max_position_pct``), the resulting maximum loss, the TP1 profit, the
    reward/risk ratio, and the echoed budget/cap parameters.
    """
    try:
        entry = float(entry_price or 0)
        stop = float(stop_loss or 0)
        target = float(tp1 or 0)
    except Exception:
        entry = stop = target = 0

    stop_distance_pct = 0.0
    if entry and stop:
        stop_distance_pct = round(abs(entry - stop) / entry * 100, 2)

    tp1_profit_pct = 0.0
    if entry and target:
        tp1_profit_pct = round((target / entry - 1) * 100, 2)

    suggested_position_pct = 0.0
    rr = 0.0
    if stop_distance_pct:
        # Size the position so that hitting the stop costs the risk budget, up to the cap.
        uncapped = risk_budget_pct / stop_distance_pct * 100
        suggested_position_pct = round(min(max_position_pct, uncapped), 2)
        rr = round(tp1_profit_pct / stop_distance_pct, 2)

    max_loss_pct = 0.0
    if suggested_position_pct:
        max_loss_pct = round(suggested_position_pct * stop_distance_pct / 100, 2)

    is_valid = bool(entry and stop and stop_distance_pct > 0)
    return {
        "risk_budget_pct": risk_budget_pct,
        "stop_distance_pct": stop_distance_pct,
        "suggested_position_pct": suggested_position_pct,
        "max_loss_pct": max_loss_pct,
        "tp1_profit_pct": tp1_profit_pct,
        "rr": rr,
        "max_position_pct": max_position_pct,
        "valid": is_valid,
    }
def execution_fields_from_persisted_state(item, entry_plan=None):
    """Derive ``(execution_status, label, reason)`` from the persisted status/action.

    The entry plan is taken from the argument or, when that is ``None``,
    decoded from ``item["entry_plan_json"]``.  The execution status comes from
    ``derive_display_bucket``; the label and reason are fixed display texts,
    optionally extended with the plan's stop loss, entry price or the first
    three entry-quality-gate reasons.
    """
    if entry_plan is None:
        entry_plan = normalize_entry_plan(item.get("entry_plan_json"))
    status = (item.get("status") or "active").strip()
    action_status = normalize_action_status(item.get("action_status") or "持有", status)
    execution_status = derive_display_bucket(status, action_status, "").get("execution_status")

    def gate_reason_text():
        # Joined first three gate reasons, or None when the gate lists none.
        gate = entry_plan.get("entry_quality_gate") or {}
        reasons = gate.get("reasons")
        if not reasons:
            return None
        return "".join(reasons[:3])

    if execution_status == "completed":
        stage = action_status or "利润管理"
        return "completed", "✅ 已兑现,仅观察", f"该机会已进入{stage}阶段,仅作为持仓跟踪记录"

    if execution_status == "invalid":
        invalid_reasons = {
            "止损": "该机会已触发风险边界,原入场逻辑失效",
            "衰减": "该机会已出现趋势衰减,追高性价比下降",
            "反转": "该机会已出现趋势反转,原多头逻辑被破坏",
            "放弃": "该机会已被标记为放弃,不再满足入场条件",
        }
        reason = invalid_reasons.get(action_status, "该机会观察周期结束或逻辑失效,已归入历史复盘")
        return "invalid", "🔴 已失效,勿追", reason

    if execution_status == "buy_now":
        stop = str(entry_plan.get("stop_loss", "")) if entry_plan else ""
        reason = "推荐时就是可即刻买入;策略确认当前仍在入场窗口"
        if stop:
            reason += ",风险边界 " + stop
        return "buy_now", "🟢 现在可买", reason

    if execution_status == "wait_pullback":
        gate_text = gate_reason_text()
        if gate_text is not None:
            reason = "等待更优位置;" + gate_text
        else:
            reference = str(entry_plan.get("entry_price", "")) if entry_plan else "参考价"
            reason = "等待回踩至 " + reference + " 附近再评估"
        return "wait_pullback", "🟡 等回踩,不追高", reason

    if execution_status == "holding":
        return "holding", "持仓跟踪", "该机会已触发入场,进入持仓跟踪"

    # Anything else is shown in the observation pool.
    gate_text = gate_reason_text()
    if gate_text is not None:
        reason = "机会结构仍在观察;" + gate_text
    else:
        reason = "暂无明确入场窗口,继续观察"
    return "observe", "观察池", reason
def normalize_entry_plan(entry_plan_json):
    """Return the entry plan as a dict, decoding JSON text when necessary.

    Args:
        entry_plan_json: Either an already-decoded dict (returned unchanged)
            or a JSON document as text.

    Returns:
        The plan dict, or ``{}`` when the input is empty, is not valid JSON,
        or decodes to something other than a JSON object.  Callers use
        ``.get(...)`` on the result, so a decoded list, string or number must
        never be passed through.
    """
    if isinstance(entry_plan_json, dict):
        return entry_plan_json
    try:
        if entry_plan_json:
            parsed = json.loads(entry_plan_json)
            if isinstance(parsed, dict):
                return parsed
    except (TypeError, ValueError, RecursionError):
        # Best effort: malformed or unsupported input yields an empty plan.
        pass
    return {}
def normalize_json_object(payload):
    """Coerce ``payload`` into a dict.

    A dict is returned unchanged.  Any other truthy value is passed to
    ``json.loads`` and accepted only if it decodes to a dict.  Everything else
    (empty input, decode errors, non-object JSON) yields ``{}``.
    """
    try:
        if isinstance(payload, dict):
            return payload
        decoded = json.loads(payload) if payload else None
        if isinstance(decoded, dict):
            return decoded
        return {}
    except Exception:
        return {}
def normalize_signals(payload):
    """Coerce ``payload`` into a list of signals.

    A list is returned unchanged.  A non-blank string is decoded as JSON and
    accepted only if it decodes to a list.  Any other input, and any decode
    failure, yields ``[]``.
    """
    try:
        if isinstance(payload, list):
            return payload
        is_text = isinstance(payload, str) and payload.strip()
        decoded = json.loads(payload) if is_text else None
        if isinstance(decoded, list):
            return decoded
        return []
    except Exception:
        return []
def observe_tier(item):
    """Classify an observation-pool item as ``"strong"`` or ``"weak"``.

    Returns a ``(tier, reason)`` tuple.  Items whose execution status is
    ``buy_now`` / ``wait_pullback`` or whose display bucket is ``realtime``
    are always strong.  Otherwise the score, the signal texts, the force
    reason, turnover acceleration, top-trader long percentage and the
    entry-quality-gate reasons decide the tier.
    """
    execution_status = str(item.get("execution_status") or "")
    if execution_status in ("buy_now", "wait_pullback") or item.get("display_bucket") == "realtime":
        return "strong", "入场/等待类有效机会"

    try:
        score = float(item.get("rec_score") or 0)
    except Exception:
        score = 0

    raw_signals = item.get("signals") or []
    if isinstance(raw_signals, str):
        raw_signals = normalize_signals(raw_signals)
    signal_text = " ".join(map(str, raw_signals))
    force_reason = str(item.get("force_reason") or "")

    derivatives = normalize_json_object(item.get("derivatives_context_json") or item.get("derivatives_context"))
    market = normalize_json_object(item.get("market_context_json") or item.get("market_context"))
    # If the JSON column was present but unusable, fall back to an already-decoded dict.
    if not derivatives and isinstance(item.get("derivatives_context"), dict):
        derivatives = item.get("derivatives_context") or {}
    if not market and isinstance(item.get("market_context"), dict):
        market = item.get("market_context") or {}

    try:
        long_pct = float(derivatives.get("top_trader_long_pct") or 0)
    except Exception:
        long_pct = 0.0

    acc1 = acc4 = 0.0
    try:
        # Parsed in sequence: a failure on the 4h value keeps the parsed 1h value.
        acc1 = float(market.get("turnover_acceleration_1h") or 0)
        acc4 = float(market.get("turnover_acceleration_4h") or 0)
    except Exception:
        pass

    fresh_markers = ("当前", "新近", "刚刚", "入场窗口", "量价齐飞")
    mentions_stale = "已过期" in signal_text or "历史" in signal_text
    stale_only = mentions_stale and not any(marker in signal_text for marker in fresh_markers)

    weak_reasons = []
    if score < 50:
        weak_reasons.append(f"评分偏低({int(score)})")
    if stale_only:
        weak_reasons.append("主要触发来自历史/过期信号")
    if "静K蓄力旁路" in force_reason and acc4 < 1.3 and acc1 < 1.3:
        weak_reasons.append("静K旁路量能不足")

    try:
        plan = item.get("entry_plan") or normalize_json_object(item.get("entry_plan_json"))
        gate = plan.get("entry_quality_gate") or {}
    except Exception:
        gate = {}
    gate_reasons = gate.get("reasons") or []
    gate_reason_text = "".join(str(entry) for entry in gate_reasons[:3])

    pullback_hit_but_rr_failed = any(
        "回踩参考已到" in str(entry) and "不达标" in str(entry) for entry in gate_reasons
    )
    if pullback_hit_but_rr_failed:
        tier = "weak" if score < 55 else "strong"
        lead = gate_reason_text or "回踩参考已到,但实时盈亏比不达标"
        return tier, lead + ";暂不构成入场窗口,继续观察是否重新恢复可买盈亏比"

    strong_context = score >= 65 or long_pct >= 75 or max(acc1, acc4) >= 1.5
    if weak_reasons and not strong_context:
        return "weak", "".join(weak_reasons[:3])
    if gate_reason_text:
        return "strong", gate_reason_text + ";继续观察结构是否恢复"
    return "strong", "观察池有效候选"
def derive_execution_fields(item):
    """Enrich a recommendation record in place with derived execution/display fields.

    ``item`` is read with ``.get`` and written with item assignment, so it is
    presumably a dict built from a stored row — verify against callers.  The
    function:

    1. decodes the JSON columns (entry plan, contexts, signals),
    2. resolves the action status and runs it through ``apply_entry_quality_gate``,
       a low-score block and the entry-window policy,
    3. derives execution status/label/reason, display bucket and lifecycle state,
    4. attaches the observe tier, opportunity fields, risk suggestion, contexts,
       strategy metadata and discovery/trade fields.

    Returns the same ``item`` object.
    """
    # Decode the JSON-encoded columns carried on the record.
    entry_plan = normalize_entry_plan(item.get("entry_plan_json"))
    market_context = normalize_json_object(item.get("market_context_json"))
    derivatives_context = normalize_json_object(item.get("derivatives_context_json"))
    sector_context = normalize_json_object(item.get("sector_context_json"))
    signals = normalize_signals(item.get("signals"))
    item["signals"] = signals
    # For the initial action the plan's entry_action wins over the record's action_status.
    initial_action = normalize_action_status(entry_plan.get("entry_action") or item.get("action_status") or "持有", item.get("status") or "active")
    action_status = normalize_action_status(item.get("action_status") or initial_action or "持有", item.get("status") or "active")
    # A "持有" action is replaced by the initial action when that is one of the three pre-entry actions.
    if action_status == "持有" and initial_action in ("可即刻买入", "等回踩", "观察"):
        action_status = initial_action
    # Price used for gating and the entry window: cached price, then current price, then entry price.
    current_price_for_window = item.get("latest_cache_price") or item.get("current_price") or item.get("entry_price") or 0
    # The gate may change the action and returns an updated entry plan; the third value is unused here.
    action_status, entry_plan, _entry_gate_reasons = apply_entry_quality_gate(
        action_status=action_status,
        entry_plan=entry_plan,
        signals=item.get("signals"),
        current_price=current_price_for_window,
        market_context=market_context,
        derivatives_context=derivatives_context,
        sector_context=sector_context,
        strategy_code=item.get("strategy_code") or entry_plan.get("strategy_code"),
    )
    try:
        rec_score_for_gate = float(item.get("rec_score") or 0)
    except Exception:
        rec_score_for_gate = 0
    # A positive score below 25 may not be shown as "buy now": record the block on the gate and downgrade to "观察".
    if action_status == "可即刻买入" and rec_score_for_gate > 0 and rec_score_for_gate < 25:
        reasons = [f"推荐评分{rec_score_for_gate:g}<25属于信号不足禁止展示为现价买入"]
        gate = entry_plan.get("entry_quality_gate") if isinstance(entry_plan.get("entry_quality_gate"), dict) else {}
        existing_reasons = list(gate.get("reasons") or [])
        entry_plan["entry_quality_gate"] = {
            **gate,
            "blocked_action": gate.get("blocked_action") or action_status,
            "final_action": "观察",
            "reasons": existing_reasons + reasons,
        }
        action_status = "观察"
    # If a "buy now" initial action was downgraded above, the initial action follows the downgrade.
    if initial_action == "可即刻买入" and action_status != "可即刻买入":
        initial_action = action_status
    status = (item.get("status") or "active").strip()
    force_reason = (item.get("force_reason") or "").strip()
    base_state = (item.get("base_state") or "").strip()
    sector_signal_count = item.get("sector_signal_count")
    strategy_version = str(item.get("strategy_version") or "").strip()
    # Fall back to the configured strategy version when the record has none.
    if not strategy_version:
        strategy_version = str(get_meta().get("strategy_version") or "").strip()
    # NOTE(review): the nesting of this section was reconstructed from an unindented
    # copy of the file — confirm it against the original.
    if current_price_for_window:
        item["current_price"] = current_price_for_window
        try:
            entry_price_for_pnl = float(item.get("entry_price") or 0)
            current_price_float = float(current_price_for_window or 0)
            # PnL percentage of the window price relative to the record's entry price.
            if entry_price_for_pnl > 0 and current_price_float > 0:
                item["pnl_pct"] = round((current_price_float - entry_price_for_pnl) / entry_price_for_pnl * 100, 2)
        except Exception:
            pass
        if item.get("latest_cache_updated_at"):
            item["current_price_updated_at"] = item.get("latest_cache_updated_at")
    # The entry window is only evaluated for "buy now" actions; otherwise it is an empty dict.
    entry_window = entry_window_policy(
        entry_plan.get("entry_price") or item.get("entry_price") or 0,
        current_price_for_window,
        item.get("rec_time") or "",
    ) if action_status == "可即刻买入" else {}
    # A price below the planned entry is still acceptable when it stays above the stop,
    # the risk/reward flag is set and the entry trigger is confirmed.
    if entry_window.get("status") == "price_left_down":
        try:
            current_for_entry = float(current_price_for_window or 0)
            stop_for_entry = float(entry_plan.get("stop_loss") or item.get("stop_loss") or 0)
            rr_live_ok = entry_plan.get("risk_reward_ok_live") is True or entry_plan.get("risk_reward_ok") is True
            trigger_ok = entry_plan.get("entry_trigger_confirmed") is True or int(item.get("entry_triggered") or 0) == 1
            if current_for_entry > stop_for_entry > 0 and rr_live_ok and trigger_ok:
                entry_window = {
                    **entry_window,
                    "status": "active",
                    "label": "价格更优",
                    "reason": "当前价低于计划入场价,但尚未跌破止损且实时盈亏比仍合格,可按更优价格执行",
                }
        except Exception:
            pass
    # Downgrade the action according to the window outcome and remember any non-active window as an alert.
    if action_status == "可即刻买入" and entry_window:
        window_status = entry_window.get("status")
        if window_status in ("expired", "price_left_down"):
            action_status = "观察"
        elif window_status == "price_left_up":
            action_status = "等回踩"
        if window_status and window_status != "active":
            item["entry_window_alert"] = entry_window
    # Derive label/reason from a copy of the record carrying the resolved action.
    item_for_execution = {**item, "action_status": action_status}
    execution_status, execution_label, execution_reason = execution_fields_from_persisted_state(item_for_execution, entry_plan)
    bucket_fields = derive_display_bucket(status, action_status, execution_status)
    execution_status = bucket_fields.get("execution_status") or execution_status
    item["initial_action"] = initial_action
    item["action_status"] = normalize_action_status(action_status, status)
    item["execution_status"] = execution_status
    item["execution_label"] = execution_label
    item["execution_reason"] = execution_reason
    # Normalisation may have produced "buy now" again although an alert exists; force the downgrade and re-derive.
    if item.get("entry_window_alert") and item["action_status"] == "可即刻买入":
        item["action_status"] = "等回踩" if item["entry_window_alert"].get("status") == "price_left_up" else "观察"
        execution_status, execution_label, execution_reason = execution_fields_from_persisted_state(
            {**item, "action_status": item["action_status"], "status": status}, entry_plan
        )
        item["execution_status"] = execution_status
        item["execution_label"] = execution_label
        item["execution_reason"] = execution_reason
    item["display_bucket"] = bucket_fields.get("display_bucket")
    item["lifecycle_state"] = bucket_fields.get("lifecycle_state")
    # Recompute the bucket from the final action/execution status; this overwrites the two fields set just above.
    bucket_fields = derive_display_bucket(status, item["action_status"], item["execution_status"])
    item["execution_status"] = bucket_fields.get("execution_status") or item["execution_status"]
    item["display_bucket"] = bucket_fields.get("display_bucket")
    item["lifecycle_state"] = bucket_fields.get("lifecycle_state")
    item["entry_triggered"] = 1 if is_executed_lifecycle(status, item["action_status"], item["execution_status"]) else 0
    observe_tier_value, observe_reason = observe_tier(item)
    item["observe_tier"] = observe_tier_value
    item["observe_reason"] = observe_reason
    item["entry_plan"] = entry_plan
    # Values already present on the record win over values taken from the plan.
    opportunity_fields = opportunity_fields_from_plan(entry_plan)
    for key, value in opportunity_fields.items():
        item[key] = item.get(key) or value
    # Fill a missing level label (and other empty descriptor fields) from the level metadata; failures are ignored.
    if item.get("opportunity_level") and not item.get("opportunity_level_label"):
        try:
            from app.core.opportunity_level import opportunity_level_meta
            meta = opportunity_level_meta(item["opportunity_level"])
            item["opportunity_level_label"] = meta.get("label", "")
            item["holding_horizon"] = item.get("holding_horizon") or meta.get("holding_horizon", "")
            item["entry_model"] = item.get("entry_model") or meta.get("entry_model", "")
            item["stop_model"] = item.get("stop_model") or meta.get("stop_model", "")
            item["tp_model"] = item.get("tp_model") or meta.get("tp_model", "")
        except Exception:
            pass
    item["entry_window"] = entry_window
    if entry_window and entry_window.get("status") != "active":
        item["entry_window_alert"] = entry_window
    # Plan values take precedence over the record's own price columns.
    item["risk_suggestion"] = risk_suggestion(
        entry_plan.get("entry_price") or item.get("entry_price") or 0,
        entry_plan.get("stop_loss") or item.get("stop_loss") or 0,
        entry_plan.get("tp1") or entry_plan.get("take_profit_1") or item.get("tp1") or 0,
    )
    item["market_context"] = market_context
    item["derivatives_context"] = derivatives_context
    item["sector_context"] = sector_context
    item["force_reason"] = force_reason
    item["base_state"] = base_state
    item["sector_signal_count"] = sector_signal_count
    item["strategy_version"] = strategy_version
    item["strategy_version_label"] = f"策略版本 {strategy_version}" if strategy_version else ""
    strategy_code = normalize_strategy_code(item.get("strategy_code"))
    item["strategy_code"] = strategy_code
    item["strategy_name"] = strategy_label(strategy_code)
    # Decode the remaining JSON columns; unparsable content becomes an empty dict.
    try:
        item["strategy_snapshot"] = json.loads(item.get("strategy_snapshot_json") or "{}")
    except Exception:
        item["strategy_snapshot"] = {}
    try:
        item["factor_roles"] = json.loads(item.get("factor_roles_json") or "{}")
    except Exception:
        item["factor_roles"] = {}
    attach_discovery_trade_fields(item)
    return item
def is_actionable_execution_status(status):
    """Return True when ``status`` is ``"buy_now"`` or ``"wait_pullback"``."""
    return status == "buy_now" or status == "wait_pullback"
def attach_discovery_trade_fields(item):
    """Write separate discovery-state and trade-stage fields onto ``item``.

    The discovery state comes from ``rec_state`` and the trade stage from
    ``execution_status`` (default ``"observe"``).  Labels and four boolean
    flags are added alongside them.  ``item`` is modified in place and also
    returned.
    """
    discovery_labels = {
        "爆发": "发现爆发",
        "加速": "发现加速",
        "蓄力": "发现蓄力",
        "过期": "发现过期",
    }
    stage_labels = {
        "buy_now": "现在可买",
        "wait_pullback": "等回踩",
        "observe": "观察中",
        "holding": "持仓跟踪",
        "completed": "已兑现",
        "invalid": "已失效",
    }

    discovery_state = str(item.get("rec_state") or "").strip()
    trade_stage = str(item.get("execution_status") or "observe").strip() or "observe"
    explicit_label = item.get("execution_label") or ""

    if discovery_state in discovery_labels:
        discovery_label = discovery_labels[discovery_state]
    else:
        discovery_label = discovery_state or "发现观察"

    # An explicit execution label wins over the stage lookup table.
    if explicit_label:
        stage_label = explicit_label
    else:
        stage_label = stage_labels.get(trade_stage, trade_stage or "观察中")

    in_watch_pool = trade_stage in ("wait_pullback", "observe") or item.get("display_bucket") == "watch_pool"

    derived = (
        ("discovery_state", discovery_state),
        ("discovery_label", discovery_label),
        ("trade_stage", trade_stage),
        ("trade_stage_label", stage_label),
        ("is_discovery_burst", discovery_state == "爆发"),
        ("is_executable_now", trade_stage == "buy_now"),
        ("is_trade_candidate", trade_stage in ("buy_now", "wait_pullback")),
        ("is_watch_pool", in_watch_pool),
    )
    for field, value in derived:
        item[field] = value
    return item
def is_executed_trade(item):
    """Tell whether ``item`` counts as an executed trade for PnL purposes.

    Returns ``True`` when any of these holds: ``entry_triggered`` equals 1,
    the status is ``hit_tp1`` / ``hit_tp2`` / ``stopped_out``, the display
    bucket is ``position``, or the execution status is ``holding`` /
    ``completed``.  Otherwise the decision is delegated to
    ``is_executed_lifecycle``.
    """
    status = (item.get("status") or "").strip()
    action_status = normalize_action_status(item.get("action_status"), status)
    execution_status = item.get("execution_status") or ""

    try:
        triggered = int(item.get("entry_triggered") or 0) == 1
    except Exception:
        triggered = False

    if (
        triggered
        or status in ("hit_tp1", "hit_tp2", "stopped_out")
        or item.get("display_bucket") == "position"
        or execution_status in ("holding", "completed")
    ):
        return True
    return is_executed_lifecycle(status, action_status, execution_status)
def classify_recommendation_result(item):
    """Classify a recommendation's outcome as ``(result, label)``.

    ``result`` is ``"success"``, ``"failed"`` or ``"pending"``.  Take-profit
    and stop-out statuses are decided directly.  Items that
    ``is_executed_trade`` does not count as executed are pending.  Executed
    ``expired`` / ``active`` items are judged by max PnL (>= 5 is a success)
    and by PnL / max drawdown (<= -3 / <= -5 is a failure).
    """
    status = item.get("status") or ""
    pnl_pct = item.get("pnl_pct") or 0
    max_pnl_pct = item.get("max_pnl_pct") or 0
    max_drawdown_pct = item.get("max_drawdown_pct") or 0

    not_executed = ("pending", "⏳ 未执行")

    if status in ("hit_tp1", "hit_tp2"):
        return "success", "✅ 止盈成功"
    if status == "stopped_out":
        return "failed", "❌ 止损失败"
    if not is_executed_trade(item):
        return not_executed
    # Only expired and active items are judged by their PnL figures.
    if not (status == "expired" or status == "active"):
        return not_executed

    if max_pnl_pct >= 5:
        return "success", "✅ 交易成功"
    if pnl_pct <= -3 or max_drawdown_pct <= -5:
        return "failed", "❌ 交易失败"
    return "pending", "⏳ 跟踪中"