This commit is contained in:
aaron 2026-05-20 08:49:03 +08:00
parent ea7d9eab63
commit ec9ec9f3cc
5 changed files with 229 additions and 37 deletions

View File

@ -179,7 +179,14 @@ def has_current_entry_trigger(signals: Iterable[Any], entry_plan: Dict[str, Any]
return True
for sig in signals or []:
text = str(sig)
if "15min即刻入场" in text or "当前15min" in text or "当前 15min" in text:
if (
"15min即刻入场" in text
or "当前15min" in text
or "当前 15min" in text
or "回踩确认完毕" in text
or "可即刻入场" in text
or "15min动K确认" in text
):
return True
return False
@ -311,6 +318,8 @@ def apply_entry_quality_gate(
entry_plan.pop("entry_quality_gate", None)
breakout_distance = detect_breakout_distance_pct(signals)
change_24h = to_float(market_context.get("change_24h"))
current_entry_trigger = has_current_entry_trigger(signals, entry_plan)
bearish_flow_risk = has_bearish_flow_risk(signals)
lifecycle, ambush_reasons = build_low_ambush_plan(
entry_plan=entry_plan,
@ -331,17 +340,19 @@ def apply_entry_quality_gate(
})
if action_status in ("可即刻买入", "等回踩"):
if original_risk_reward_ok is False:
if risk_reward_ok is False:
reasons.append(f"risk_reward_ok=false盈亏比闸门禁止现价买入实时rr1={rr1}")
if "rr1" in entry_plan and original_rr1 < _cfg_value(cfg, "min_rr_buy_now"):
reasons.append(f"rr1={original_rr1} < {_cfg_value(cfg, 'min_rr_buy_now')},禁止现价买入")
if "rr1" in entry_plan and rr1 < _cfg_value(cfg, "min_rr_buy_now"):
reasons.append(f"rr1={rr1} < {_cfg_value(cfg, 'min_rr_buy_now')},禁止现价买入")
if action_status == "可即刻买入":
if level_max_action in ("observe", "wait_pullback"):
if level_max_action == "observe":
reasons.append(f"{entry_plan.get('opportunity_level_label') or opportunity_level or '该机会'}级别最高只允许{level_meta.get('label') and ('观察/等待') or '观察/等待'},禁止现价买入")
if not has_current_entry_trigger(signals, entry_plan):
elif level_max_action == "wait_pullback" and not current_entry_trigger:
reasons.append(f"{entry_plan.get('opportunity_level_label') or opportunity_level or '该机会'}级别需要低周期触发后才允许买入")
if not current_entry_trigger:
reasons.append("缺少当前15min触发禁止现价买入")
if has_bearish_flow_risk(signals):
if bearish_flow_risk:
reasons.append("出现空头加速/放量阴线风险,禁止现价买入")
if current_price > 0:
plan_entry_price = to_float(entry_plan.get("entry_price"))
@ -374,11 +385,11 @@ def apply_entry_quality_gate(
if risk_reward_ok is False or rr1 < _cfg_value(cfg, "min_rr_buy_now"):
target_action = "观察"
reasons.append("回踩参考已到,但实时盈亏比不达标,转为观察")
elif level_max_action in ("observe", "wait_pullback") or has_bearish_flow_risk(signals):
target_action = "等回踩" if level_max_action == "wait_pullback" else "观察"
if level_max_action in ("observe", "wait_pullback"):
elif level_max_action == "observe" or (level_max_action == "wait_pullback" and not current_entry_trigger) or bearish_flow_risk:
target_action = "等回踩" if level_max_action == "wait_pullback" and not bearish_flow_risk else "观察"
if level_max_action == "observe" or (level_max_action == "wait_pullback" and not current_entry_trigger):
reasons.append(f"{entry_plan.get('opportunity_level_label') or opportunity_level or '该机会'}级别最高只允许观察/等待,不能因到价直接升级为现价买入")
if has_bearish_flow_risk(signals):
if bearish_flow_risk:
reasons.append("出现空头加速/放量阴线风险,到价也不升级为现价买入")
else:
target_action = "可即刻买入"
@ -402,7 +413,7 @@ def apply_entry_quality_gate(
if any("回踩参考已下破" in str(x) for x in reasons):
target_action = "观察"
elif any("回踩参考已到或更优" in str(x) for x in reasons) and not (
level_max_action in ("observe", "wait_pullback") or has_bearish_flow_risk(signals)
level_max_action == "observe" or (level_max_action == "wait_pullback" and not current_entry_trigger) or bearish_flow_risk
):
target_action = "可即刻买入"
elif action_status == "等回踩" and current_price > 0 and to_float(entry_plan.get("entry_price")) > 0 and current_price <= to_float(entry_plan.get("entry_price")) * 1.003 and (risk_reward_ok is False or rr1 < _cfg_value(cfg, "min_rr_buy_now")):

View File

@ -23,6 +23,27 @@ from app.db.recommendation_state import (
from app.db.schema import get_conn
def _merged_signal_payload(*payloads):
merged = []
seen = set()
for payload in payloads:
values = []
if isinstance(payload, list):
values = payload
elif isinstance(payload, str) and payload.strip():
try:
parsed = json.loads(payload)
values = parsed if isinstance(parsed, list) else [payload]
except Exception:
values = [payload]
for value in values:
key = str(value)
if key not in seen:
seen.add(key)
merged.append(value)
return merged
def _serialized_signal_payload(signals):
labels = build_signal_labels(signals if isinstance(signals, list) else normalize_signals(signals))
codes = build_signal_codes(labels)
@ -243,7 +264,7 @@ def apply_recommendation_state_transition(rec_id, requested_action, current_pric
final_action, entry_plan, gate_reasons = apply_entry_quality_gate(
action_status=final_action,
entry_plan=entry_plan,
signals=signals if signals is not None else item.get("signals"),
signals=_merged_signal_payload(item.get("signals"), signals) if signals is not None else item.get("signals"),
current_price=current_price,
market_context=normalize_json_object(item.get("market_context_json")),
derivatives_context=normalize_json_object(item.get("derivatives_context_json")),

View File

@ -89,6 +89,49 @@ def symbol_recently_closed(symbol: str, hours: int = 8) -> bool:
return ((row[0] or 0) + (paper_row[0] or 0)) > 0
def _active_recommendation_id(symbol: str) -> int:
"""Return the current non-history recommendation id for merge/write diagnostics."""
conn = get_conn()
try:
row = conn.execute(
"""
SELECT id FROM recommendation
WHERE symbol=%s AND status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'
ORDER BY id DESC LIMIT 1
""",
(symbol,),
).fetchone()
return int(row["id"] if row else 0)
finally:
conn.close()
def _log_confirmed_screening(symbol, result, cand, cand_detail, recommendation_meta=None):
recommendation_meta = recommendation_meta or {}
detail = {
"candidate_stage": "trade_confirm",
"confirmed": True,
"confirmation_status": "confirmed",
"final_action": (result.get("entry_plan") or {}).get("entry_action", ""),
"fresh_reason": result.get("fresh_reason", ""),
"trigger_context": result.get("trigger_context") or {},
"entry_plan": result.get("entry_plan") or {},
**recommendation_meta,
}
log_screening(
layer="确认", symbol=symbol, state="爆发", score=result["score"],
price=result["price"], signals=result["signals"],
sector=cand_detail.get("sector", cand.get("sector", "")),
leader_status=cand_detail.get("leader_status", cand.get("leader_status", "")),
is_meme=int(is_meme_coin(symbol)),
detail=build_screening_detail(
layer="确认",
state="爆发",
signals=result.get("signals", []),
detail=detail,
),
)
def _event_time_from_age(df, age_bars: int):
"""把 age_bars 转成K线时间用于候选信号新鲜度判断。"""
@ -150,8 +193,13 @@ def _build_trigger_context(fresh_reason, fresh_events, vp_data=None, stale_vp_co
stale.append({"type": "technical", "label": "历史1H起爆点", "source": "pa_engine_1h", "count": len(stale_1h_ignitions)})
if stale_d1_ignitions:
stale.append({"type": "technical", "label": "历史日线起爆点", "source": "pa_engine_1d", "count": len(stale_d1_ignitions)})
fresh_event_bucket = current
fresh_event_label = "当前结构触发"
if fresh_reason == "stale_structure_background_only":
fresh_event_bucket = stale
fresh_event_label = "历史结构触发"
for e in fresh_events:
current.append({"type": "technical", "label": "当前结构触发", "source": "pa_engine", **e})
fresh_event_bucket.append({"type": "technical", "label": fresh_event_label, "source": "pa_engine", **e})
if (bp_daily or {}).get("detected"):
stale.append({"type": "technical_background", "label": "日线底部突破回踩背景", "source": "daily_structure"})
if fresh_reason == "stale_structure_background_only":
@ -1354,27 +1402,6 @@ def main(compact: bool = False):
# 飞书只是通知层:确认阶段不再绕过 recommendation 主链路直接推送。
# 先完成 create_recommendation + DB 主状态派生,再用同一条主链路结果决定是否通知。
log_screening(
layer="确认", symbol=symbol, state="爆发", score=result["score"],
price=result["price"], signals=result["signals"],
sector=cand_detail.get("sector", cand.get("sector", "")),
leader_status=cand_detail.get("leader_status", cand.get("leader_status", "")),
is_meme=int(is_meme_coin(symbol)),
detail=build_screening_detail(
layer="确认",
state="爆发",
signals=result.get("signals", []),
detail={
"candidate_stage": "trade_confirm",
"confirmation_status": "confirmed",
"final_action": (result.get("entry_plan") or {}).get("entry_action", ""),
"fresh_reason": result.get("fresh_reason", ""),
"trigger_context": result.get("trigger_context") or {},
"entry_plan": result.get("entry_plan") or {},
},
),
)
# 🟢 只做做多!方向永远多头
rec_direction = get_strategy_direction()
@ -1383,6 +1410,17 @@ def main(compact: bool = False):
cooldown_hours = 8 if symbol_recently_closed(symbol, hours=8) else 0
if cooldown_hours > 0:
print(f"⏭ 跳过推荐({symbol}): 冷却期({cooldown_hours}h),刚止盈/止损不宜追")
_log_confirmed_screening(
symbol,
result,
cand,
cand_detail,
{
"recommendation_status": "skipped",
"recommendation_skip_reason": "cooling_off_recent_closed_trade",
"cooldown_hours": cooldown_hours,
},
)
results.append({**result, "cooling_off": True})
continue
@ -1394,6 +1432,7 @@ def main(compact: bool = False):
plan_entry = float(ep.get("entry_price") or 0)
if plan_entry > 0 and (plan_stop >= plan_entry or (plan_tp1 > 0 and plan_tp1 <= plan_entry)):
rec_entry_price = result["price"]
previous_rec_id = _active_recommendation_id(symbol)
rec_id = create_recommendation(
symbol=symbol, rec_state="爆发", rec_score=result["score"],
entry_price=rec_entry_price,
@ -1409,6 +1448,19 @@ def main(compact: bool = False):
)
update_latest_price_cache(symbol, result["price"], updated_at=datetime.now().isoformat(), source="confirm")
result["rec_id"] = rec_id
write_action = "merged_existing" if previous_rec_id and int(rec_id) == previous_rec_id else "created"
_log_confirmed_screening(
symbol,
result,
cand,
cand_detail,
{
"recommendation_status": "written",
"recommendation_write_action": write_action,
"rec_id": int(rec_id),
"previous_rec_id": previous_rec_id or 0,
},
)
else:
cand_detail = json.loads(cand.get("detail_json", "{}"))

View File

@ -96,7 +96,83 @@ def test_structure_watch_pullback_touch_does_not_upgrade_to_buy_now():
assert action == '等回踩'
assert action != '可即刻买入'
assert any('不能因到价直接升级' in r for r in reasons)
assert any('空头加速' in r for r in reasons)
def test_structure_watch_with_current_trigger_and_good_rr_can_upgrade_to_buy_now():
action, plan, reasons = apply_entry_quality_gate(
action_status='等回踩',
entry_plan={
'entry_action': '等回踩',
'entry_price': 0.114,
'current_price': 0.114,
'stop_loss': 0.1026,
'tp1': 0.140743,
'risk_reward_ok': True,
'rr1': 2.35,
'opportunity_level': 'structure_watch',
'opportunity_level_label': '结构观察',
'max_action': 'wait_pullback',
},
signals=['4H需求区反弹', '🟢 15min即刻入场信号'],
current_price=0.114,
market_context={'change_24h': -6.9},
)
assert action == '可即刻买入'
assert plan['entry_action'] == '可即刻买入'
assert plan['entry_trigger_confirmed'] is True
assert any('回踩参考已到或更优' in r for r in reasons)
def test_tracker_pullback_confirmation_signal_counts_as_current_trigger():
action, plan, reasons = apply_entry_quality_gate(
action_status='可即刻买入',
entry_plan={
'entry_action': '等回踩',
'entry_price': 0.114,
'current_price': 0.114,
'stop_loss': 0.1026,
'tp1': 0.140743,
'risk_reward_ok': True,
'rr1': 2.35,
'opportunity_level': 'structure_watch',
'opportunity_level_label': '结构观察',
'max_action': 'wait_pullback',
},
signals=['🟢 回踩确认完毕!可即刻入场(15min动K确认)'],
current_price=0.114,
market_context={'change_24h': -6.9},
)
assert action == '可即刻买入'
assert all('缺少当前15min触发' not in r for r in reasons)
def test_live_rr_recheck_overrides_stale_false_risk_reward_flag():
action, plan, reasons = apply_entry_quality_gate(
action_status='可即刻买入',
entry_plan={
'entry_action': '即刻买入',
'entry_price': 1.0,
'current_price': 1.0,
'stop_loss': 0.92,
'tp1': 1.12,
'risk_reward_ok': False,
'rr1': 0.8,
'opportunity_level': 'short_swing',
'opportunity_level_label': '短波段',
'max_action': 'buy_now',
},
signals=['🟢 15min即刻入场信号', '1H 量价齐飞K(量3.2x)'],
current_price=0.97,
market_context={'change_24h': 1.5},
)
assert plan['risk_reward_ok_live'] is True
assert action == '可即刻买入'
assert all('risk_reward_ok=false' not in r for r in reasons)
assert all('rr1=0.8' not in r for r in reasons)
def test_tracker_gate_downgrade_removes_provisional_buy_signal():
@ -200,4 +276,5 @@ def test_ws_tracker_does_not_push_when_gate_downgrades_buy_now():
)
assert action in ('等回踩', '观察')
assert action != '可即刻买入'
assert any('risk_reward_ok=false' in r for r in reasons)
assert plan['risk_reward_ok_live'] is True
assert any('缺少当前15min触发' in r for r in reasons)

View File

@ -131,6 +131,37 @@ class RecommendationStateMainlineTests(unittest.TestCase):
self.assertNotEqual(row['action_status'], '可即刻买入')
self.assertNotEqual(row['rec_time'], '2026-05-09T22:21:12')
def test_state_transition_merges_tracker_signal_with_persisted_signals(self):
rec_id = self._insert_rec(
action_status='等回踩',
entry_price=0.114,
current_price=0.114,
entry_plan_json=json.dumps({
'entry_price': 0.114,
'entry_action': '等回踩',
'risk_reward_ok': True,
'rr1': 2.35,
'stop_loss': 0.1026,
'tp1': 0.140743,
'opportunity_level': 'structure_watch',
'opportunity_level_label': '结构观察',
'max_action': 'wait_pullback',
}, ensure_ascii=False),
signals=json.dumps(['4H需求区反弹'], ensure_ascii=False),
)
decision = altcoin_db.apply_recommendation_state_transition(
rec_id,
requested_action='可即刻买入',
current_price=0.114,
event_time='2026-05-09T22:21:12',
signals=['🟢 回踩确认完毕!可即刻入场(15min动K确认)'],
)
self.assertEqual(decision['action_status'], '可即刻买入')
self.assertEqual(decision['execution_status'], 'buy_now')
self.assertTrue(decision['push_required'])
def test_api_derivation_consumes_persisted_state_without_promoting_initial_action(self):
self._insert_rec(
symbol='AAA/USDT',