From ec9ec9f3cc3d3e49151f6e7728e0c5d1f1c90572 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Wed, 20 May 2026 08:49:03 +0800 Subject: [PATCH] 1 --- app/core/opportunity_lifecycle.py | 35 +++++--- app/db/recommendation_commands.py | 23 ++++- app/services/altcoin_confirm.py | 96 ++++++++++++++++----- tests/test_opportunity_lifecycle.py | 81 ++++++++++++++++- tests/test_recommendation_state_mainline.py | 31 +++++++ 5 files changed, 229 insertions(+), 37 deletions(-) diff --git a/app/core/opportunity_lifecycle.py b/app/core/opportunity_lifecycle.py index b0a9992..124f672 100644 --- a/app/core/opportunity_lifecycle.py +++ b/app/core/opportunity_lifecycle.py @@ -179,7 +179,14 @@ def has_current_entry_trigger(signals: Iterable[Any], entry_plan: Dict[str, Any] return True for sig in signals or []: text = str(sig) - if "15min即刻入场" in text or "当前15min" in text or "当前 15min" in text: + if ( + "15min即刻入场" in text + or "当前15min" in text + or "当前 15min" in text + or "回踩确认完毕" in text + or "可即刻入场" in text + or "15min动K确认" in text + ): return True return False @@ -311,6 +318,8 @@ def apply_entry_quality_gate( entry_plan.pop("entry_quality_gate", None) breakout_distance = detect_breakout_distance_pct(signals) change_24h = to_float(market_context.get("change_24h")) + current_entry_trigger = has_current_entry_trigger(signals, entry_plan) + bearish_flow_risk = has_bearish_flow_risk(signals) lifecycle, ambush_reasons = build_low_ambush_plan( entry_plan=entry_plan, @@ -331,17 +340,19 @@ def apply_entry_quality_gate( }) if action_status in ("可即刻买入", "等回踩"): - if original_risk_reward_ok is False: + if risk_reward_ok is False: reasons.append(f"risk_reward_ok=false,盈亏比闸门禁止现价买入;实时rr1={rr1}") - if "rr1" in entry_plan and original_rr1 < _cfg_value(cfg, "min_rr_buy_now"): - reasons.append(f"rr1={original_rr1} < {_cfg_value(cfg, 'min_rr_buy_now')},禁止现价买入") + if "rr1" in entry_plan and rr1 < _cfg_value(cfg, "min_rr_buy_now"): + reasons.append(f"rr1={rr1} < {_cfg_value(cfg, 'min_rr_buy_now')},禁止现价买入") if action_status == "可即刻买入": - if level_max_action in ("observe", "wait_pullback"): + if level_max_action == "observe": reasons.append(f"{entry_plan.get('opportunity_level_label') or opportunity_level or '该机会'}级别最高只允许{level_meta.get('label') and ('观察/等待') or '观察/等待'},禁止现价买入") - if not has_current_entry_trigger(signals, entry_plan): + elif level_max_action == "wait_pullback" and not current_entry_trigger: + reasons.append(f"{entry_plan.get('opportunity_level_label') or opportunity_level or '该机会'}级别需要低周期触发后才允许买入") + if not current_entry_trigger: reasons.append("缺少当前15min触发,禁止现价买入") - if has_bearish_flow_risk(signals): + if bearish_flow_risk: reasons.append("出现空头加速/放量阴线风险,禁止现价买入") if current_price > 0: plan_entry_price = to_float(entry_plan.get("entry_price")) @@ -374,11 +385,11 @@ def apply_entry_quality_gate( if risk_reward_ok is False or rr1 < _cfg_value(cfg, "min_rr_buy_now"): target_action = "观察" reasons.append("回踩参考已到,但实时盈亏比不达标,转为观察") - elif level_max_action in ("observe", "wait_pullback") or has_bearish_flow_risk(signals): - target_action = "等回踩" if level_max_action == "wait_pullback" else "观察" - if level_max_action in ("observe", "wait_pullback"): + elif level_max_action == "observe" or (level_max_action == "wait_pullback" and not current_entry_trigger) or bearish_flow_risk: + target_action = "等回踩" if level_max_action == "wait_pullback" and not bearish_flow_risk else "观察" + if level_max_action == "observe" or (level_max_action == "wait_pullback" and not current_entry_trigger): reasons.append(f"{entry_plan.get('opportunity_level_label') or opportunity_level or '该机会'}级别最高只允许观察/等待,不能因到价直接升级为现价买入") - if has_bearish_flow_risk(signals): + if bearish_flow_risk: reasons.append("出现空头加速/放量阴线风险,到价也不升级为现价买入") else: target_action = "可即刻买入" @@ -402,7 +413,7 @@ def apply_entry_quality_gate( if any("回踩参考已下破" in str(x) for x in reasons): target_action = "观察" elif any("回踩参考已到或更优" in str(x) for x in reasons) and not ( - level_max_action in ("observe", "wait_pullback") or has_bearish_flow_risk(signals) + level_max_action == "observe" or (level_max_action == "wait_pullback" and not current_entry_trigger) or bearish_flow_risk ): target_action = "可即刻买入" elif action_status == "等回踩" and current_price > 0 and to_float(entry_plan.get("entry_price")) > 0 and current_price <= to_float(entry_plan.get("entry_price")) * 1.003 and (risk_reward_ok is False or rr1 < _cfg_value(cfg, "min_rr_buy_now")): diff --git a/app/db/recommendation_commands.py b/app/db/recommendation_commands.py index 0438fa7..ca9c376 100644 --- a/app/db/recommendation_commands.py +++ b/app/db/recommendation_commands.py @@ -23,6 +23,27 @@ from app.db.recommendation_state import ( from app.db.schema import get_conn +def _merged_signal_payload(*payloads): + merged = [] + seen = set() + for payload in payloads: + values = [] + if isinstance(payload, list): + values = payload + elif isinstance(payload, str) and payload.strip(): + try: + parsed = json.loads(payload) + values = parsed if isinstance(parsed, list) else [payload] + except Exception: + values = [payload] + for value in values: + key = str(value) + if key not in seen: + seen.add(key) + merged.append(value) + return merged + + def _serialized_signal_payload(signals): labels = build_signal_labels(signals if isinstance(signals, list) else normalize_signals(signals)) codes = build_signal_codes(labels) @@ -243,7 +264,7 @@ def apply_recommendation_state_transition(rec_id, requested_action, current_pric final_action, entry_plan, gate_reasons = apply_entry_quality_gate( action_status=final_action, entry_plan=entry_plan, - signals=signals if signals is not None else item.get("signals"), + signals=_merged_signal_payload(item.get("signals"), signals) if signals is not None else item.get("signals"), current_price=current_price, market_context=normalize_json_object(item.get("market_context_json")), derivatives_context=normalize_json_object(item.get("derivatives_context_json")), diff --git a/app/services/altcoin_confirm.py b/app/services/altcoin_confirm.py index 745c6f8..9008abb 100644 --- a/app/services/altcoin_confirm.py +++ b/app/services/altcoin_confirm.py @@ -89,6 +89,49 @@ def symbol_recently_closed(symbol: str, hours: int = 8) -> bool: return ((row[0] or 0) + (paper_row[0] or 0)) > 0 +def _active_recommendation_id(symbol: str) -> int: + """Return the current non-history recommendation id for merge/write diagnostics.""" + conn = get_conn() + try: + row = conn.execute( + """ + SELECT id FROM recommendation + WHERE symbol=%s AND status='active' AND COALESCE(display_bucket,'watch_pool') != 'history' + ORDER BY id DESC LIMIT 1 + """, + (symbol,), + ).fetchone() + return int(row["id"] if row else 0) + finally: + conn.close() + + +def _log_confirmed_screening(symbol, result, cand, cand_detail, recommendation_meta=None): + recommendation_meta = recommendation_meta or {} + detail = { + "candidate_stage": "trade_confirm", + "confirmed": True, + "confirmation_status": "confirmed", + "final_action": (result.get("entry_plan") or {}).get("entry_action", ""), + "fresh_reason": result.get("fresh_reason", ""), + "trigger_context": result.get("trigger_context") or {}, + "entry_plan": result.get("entry_plan") or {}, + **recommendation_meta, + } + log_screening( + layer="确认", symbol=symbol, state="爆发", score=result["score"], + price=result["price"], signals=result["signals"], + sector=cand_detail.get("sector", cand.get("sector", "")), + leader_status=cand_detail.get("leader_status", cand.get("leader_status", "")), + is_meme=int(is_meme_coin(symbol)), + detail=build_screening_detail( + layer="确认", + state="爆发", + signals=result.get("signals", []), + detail=detail, + ), + ) + def _event_time_from_age(df, age_bars: int): """把 age_bars 转成K线时间,用于候选信号新鲜度判断。""" @@ -150,8 +193,13 @@ def _build_trigger_context(fresh_reason, fresh_events, vp_data=None, stale_vp_co stale.append({"type": "technical", "label": "历史1H起爆点", "source": "pa_engine_1h", "count": len(stale_1h_ignitions)}) if stale_d1_ignitions: stale.append({"type": "technical", "label": "历史日线起爆点", "source": "pa_engine_1d", "count": len(stale_d1_ignitions)}) + fresh_event_bucket = current + fresh_event_label = "当前结构触发" + if fresh_reason == "stale_structure_background_only": + fresh_event_bucket = stale + fresh_event_label = "历史结构触发" for e in fresh_events: - current.append({"type": "technical", "label": "当前结构触发", "source": "pa_engine", **e}) + fresh_event_bucket.append({"type": "technical", "label": fresh_event_label, "source": "pa_engine", **e}) if (bp_daily or {}).get("detected"): stale.append({"type": "technical_background", "label": "日线底部突破回踩背景", "source": "daily_structure"}) if fresh_reason == "stale_structure_background_only": @@ -1354,27 +1402,6 @@ def main(compact: bool = False): # 飞书只是通知层:确认阶段不再绕过 recommendation 主链路直接推送。 # 先完成 create_recommendation + DB 主状态派生,再用同一条主链路结果决定是否通知。 - log_screening( - layer="确认", symbol=symbol, state="爆发", score=result["score"], - price=result["price"], signals=result["signals"], - sector=cand_detail.get("sector", cand.get("sector", "")), - leader_status=cand_detail.get("leader_status", cand.get("leader_status", "")), - is_meme=int(is_meme_coin(symbol)), - detail=build_screening_detail( - layer="确认", - state="爆发", - signals=result.get("signals", []), - detail={ - "candidate_stage": "trade_confirm", - "confirmation_status": "confirmed", - "final_action": (result.get("entry_plan") or {}).get("entry_action", ""), - "fresh_reason": result.get("fresh_reason", ""), - "trigger_context": result.get("trigger_context") or {}, - "entry_plan": result.get("entry_plan") or {}, - }, - ), - ) - # 🟢 只做做多!方向永远多头 rec_direction = get_strategy_direction() @@ -1383,6 +1410,17 @@ def main(compact: bool = False): cooldown_hours = 8 if symbol_recently_closed(symbol, hours=8) else 0 if cooldown_hours > 0: print(f"⏭ 跳过推荐({symbol}): 冷却期({cooldown_hours}h),刚止盈/止损不宜追") + _log_confirmed_screening( + symbol, + result, + cand, + cand_detail, + { + "recommendation_status": "skipped", + "recommendation_skip_reason": "cooling_off_recent_closed_trade", + "cooldown_hours": cooldown_hours, + }, + ) results.append({**result, "cooling_off": True}) continue @@ -1394,6 +1432,7 @@ def main(compact: bool = False): plan_entry = float(ep.get("entry_price") or 0) if plan_entry > 0 and (plan_stop >= plan_entry or (plan_tp1 > 0 and plan_tp1 <= plan_entry)): rec_entry_price = result["price"] + previous_rec_id = _active_recommendation_id(symbol) rec_id = create_recommendation( symbol=symbol, rec_state="爆发", rec_score=result["score"], entry_price=rec_entry_price, @@ -1409,6 +1448,19 @@ def main(compact: bool = False): ) update_latest_price_cache(symbol, result["price"], updated_at=datetime.now().isoformat(), source="confirm") result["rec_id"] = rec_id + write_action = "merged_existing" if previous_rec_id and int(rec_id) == previous_rec_id else "created" + _log_confirmed_screening( + symbol, + result, + cand, + cand_detail, + { + "recommendation_status": "written", + "recommendation_write_action": write_action, + "rec_id": int(rec_id), + "previous_rec_id": previous_rec_id or 0, + }, + ) else: cand_detail = json.loads(cand.get("detail_json", "{}")) diff --git a/tests/test_opportunity_lifecycle.py b/tests/test_opportunity_lifecycle.py index 73ec384..19ee15c 100644 --- a/tests/test_opportunity_lifecycle.py +++ b/tests/test_opportunity_lifecycle.py @@ -96,7 +96,83 @@ def test_structure_watch_pullback_touch_does_not_upgrade_to_buy_now(): assert action == '等回踩' assert action != '可即刻买入' - assert any('不能因到价直接升级' in r for r in reasons) + assert any('空头加速' in r for r in reasons) + + +def test_structure_watch_with_current_trigger_and_good_rr_can_upgrade_to_buy_now(): + action, plan, reasons = apply_entry_quality_gate( + action_status='等回踩', + entry_plan={ + 'entry_action': '等回踩', + 'entry_price': 0.114, + 'current_price': 0.114, + 'stop_loss': 0.1026, + 'tp1': 0.140743, + 'risk_reward_ok': True, + 'rr1': 2.35, + 'opportunity_level': 'structure_watch', + 'opportunity_level_label': '结构观察', + 'max_action': 'wait_pullback', + }, + signals=['4H需求区反弹', '🟢 15min即刻入场信号'], + current_price=0.114, + market_context={'change_24h': -6.9}, + ) + + assert action == '可即刻买入' + assert plan['entry_action'] == '可即刻买入' + assert plan['entry_trigger_confirmed'] is True + assert any('回踩参考已到或更优' in r for r in reasons) + + +def test_tracker_pullback_confirmation_signal_counts_as_current_trigger(): + action, plan, reasons = apply_entry_quality_gate( + action_status='可即刻买入', + entry_plan={ + 'entry_action': '等回踩', + 'entry_price': 0.114, + 'current_price': 0.114, + 'stop_loss': 0.1026, + 'tp1': 0.140743, + 'risk_reward_ok': True, + 'rr1': 2.35, + 'opportunity_level': 'structure_watch', + 'opportunity_level_label': '结构观察', + 'max_action': 'wait_pullback', + }, + signals=['🟢 回踩确认完毕!可即刻入场(15min动K确认)'], + current_price=0.114, + market_context={'change_24h': -6.9}, + ) + + assert action == '可即刻买入' + assert all('缺少当前15min触发' not in r for r in reasons) + + +def test_live_rr_recheck_overrides_stale_false_risk_reward_flag(): + action, plan, reasons = apply_entry_quality_gate( + action_status='可即刻买入', + entry_plan={ + 'entry_action': '即刻买入', + 'entry_price': 1.0, + 'current_price': 1.0, + 'stop_loss': 0.92, + 'tp1': 1.12, + 'risk_reward_ok': False, + 'rr1': 0.8, + 'opportunity_level': 'short_swing', + 'opportunity_level_label': '短波段', + 'max_action': 'buy_now', + }, + signals=['🟢 15min即刻入场信号', '1H 量价齐飞K(量3.2x)'], + current_price=0.97, + market_context={'change_24h': 1.5}, + ) + + assert plan['risk_reward_ok_live'] is True + assert action == '可即刻买入' + assert all('risk_reward_ok=false' not in r for r in reasons) + assert all('rr1=0.8' not in r for r in reasons) def test_tracker_gate_downgrade_removes_provisional_buy_signal(): @@ -200,4 +276,5 @@ def test_ws_tracker_does_not_push_when_gate_downgrades_buy_now(): ) assert action in ('等回踩', '观察') assert action != '可即刻买入' - assert any('risk_reward_ok=false' in r for r in reasons) + assert plan['risk_reward_ok_live'] is True + assert any('缺少当前15min触发' in r for r in reasons) diff --git a/tests/test_recommendation_state_mainline.py b/tests/test_recommendation_state_mainline.py index e09438a..62b69a5 100644 --- a/tests/test_recommendation_state_mainline.py +++ b/tests/test_recommendation_state_mainline.py @@ -131,6 +131,37 @@ class RecommendationStateMainlineTests(unittest.TestCase): self.assertNotEqual(row['action_status'], '可即刻买入') self.assertNotEqual(row['rec_time'], '2026-05-09T22:21:12') + def test_state_transition_merges_tracker_signal_with_persisted_signals(self): + rec_id = self._insert_rec( + action_status='等回踩', + entry_price=0.114, + current_price=0.114, + entry_plan_json=json.dumps({ + 'entry_price': 0.114, + 'entry_action': '等回踩', + 'risk_reward_ok': True, + 'rr1': 2.35, + 'stop_loss': 0.1026, + 'tp1': 0.140743, + 'opportunity_level': 'structure_watch', + 'opportunity_level_label': '结构观察', + 'max_action': 'wait_pullback', + }, ensure_ascii=False), + signals=json.dumps(['4H需求区反弹'], ensure_ascii=False), + ) + + decision = altcoin_db.apply_recommendation_state_transition( + rec_id, + requested_action='可即刻买入', + current_price=0.114, + event_time='2026-05-09T22:21:12', + signals=['🟢 回踩确认完毕!可即刻入场(15min动K确认)'], + ) + + self.assertEqual(decision['action_status'], '可即刻买入') + self.assertEqual(decision['execution_status'], 'buy_now') + self.assertTrue(decision['push_required']) + def test_api_derivation_consumes_persisted_state_without_promoting_initial_action(self): self._insert_rec( symbol='AAA/USDT',