From cca36615c1a1425198f26d8665a699575577449a Mon Sep 17 00:00:00 2001 From: aaron <> Date: Tue, 26 May 2026 21:10:22 +0800 Subject: [PATCH] 1 --- AGENTS.md | 2 + app/config/config_loader.py | 13 +- app/db/altcoin_db.py | 5 + app/db/paper_trading.py | 132 ++++++++++++++++++ app/db/recommendation_commands.py | 60 ++++++++ app/services/altcoin_confirm.py | 50 ++++++- app/services/paper_trader.py | 11 +- app/services/review_engine.py | 8 +- rules.yaml | 8 +- tests/test_confirm_market_risk_gate.py | 28 ++++ tests/test_factor_scoring.py | 33 ++++- ...st_market_risk_recommendation_downgrade.py | 39 ++++++ tests/test_paper_trading.py | 42 ++++++ tests/test_price_streamer.py | 38 ++--- tests/test_review_accuracy_pipeline.py | 2 +- 15 files changed, 438 insertions(+), 33 deletions(-) create mode 100644 tests/test_confirm_market_risk_gate.py create mode 100644 tests/test_market_risk_recommendation_downgrade.py diff --git a/AGENTS.md b/AGENTS.md index 5558b13..43d0309 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -107,6 +107,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - `signal_performance` 是复盘后动态权重来源;`review_engine.py` 更新信号绩效后,`config_loader.get_signal_weights()` 会让下一轮筛选/确认读取生效权重。 - 当前确认层已把核心技术因子、资金面因子、板块因子、舆情因子和买点风险因子接入 `FactorScorer`,并在 `market_context.factor_score_breakdown` / `entry_plan.factor_score_breakdown` 中保留因子明细。 - `FactorScorer` 已加入因子组去相关,同一类 `momentum` / `structure` / `entry_quality` / `onchain_flow` / `narrative` 信号会受 group cap 限制,避免同一根行情被重复加分。 +- 小样本复盘不能直接杀死核心因子。`signal_performance` 的动态权重至少要满足 `review.min_samples_for_weight` 与 `review.signal_deprecation.min_samples` 后才覆盖确认层基线;未达样本门槛时只用于观察,不应用 0 权重把 15min 启动、日线突破回踩等因子压没。 - 扣分因子应传负数,例如 `FactorScorer.delta("false_breakout", -5, ...)`,不要再外部 `score -= delta`,否则 `factor_score_breakdown` 会把风险误记成正向贡献。 - 确认层会输出 `score_components`:`opportunity_score` 表示机会质量,`entry_score` 表示买点质量,`risk_score` 表示扣分风险;后续策略不要再只看单一 `rec_score`。 - `market_context.decision_log` / `entry_plan.decision_log` 是结构化决策解释;paper trading 开仓事件也会记录当时 `market_regime`、`global_risk` 和 `score_components`。 @@ -163,6 +164,7 @@ AlphaX 是一个以 `Python + FastAPI + PostgreSQL + Docker + 静态 HTML` 组 - 市场环境识别中心,第一版基于市场快照、BTC/ETH 涨跌、山寨涨跌广度、强势/大跌数量和 funding 热度识别 `risk_off`、`btc_main_uptrend`、`altcoin_rotation`、`sideways_chop`、`meme_frenzy`、`unknown`。 - `app/core/global_risk.py` - paper trading 全局风控门禁。单币机会进入开仓或挂单成交前,需要先检查市场环境和账户风险;critical 禁止新开仓,high 只允许高质量机会。 + - 确认层也会应用同一市场风控语义:`risk_level=critical` 且 `position_multiplier=0` 时,强势发现仍可记录为观察,但不能输出 `buy_now` 或新挂单动作;已有活跃可交易推荐会被降级为观察并写入 `market_risk_gate`。 ## 5. 数据与状态中心 diff --git a/app/config/config_loader.py b/app/config/config_loader.py index c301a94..3789eaa 100644 --- a/app/config/config_loader.py +++ b/app/config/config_loader.py @@ -267,7 +267,10 @@ def get_signal_weights(): - 旧脚本仍可能用历史 key(如 "1H放量(量价背离)")直接查 weights[...] 因此返回值同时暴露 canonical key + alias key,避免旧调用方 KeyError。 """ - rules = load_rules() + # Signal weights need a stable baseline. Runtime strategy_override may + # contain small-sample governance writes; those are only trusted through + # signal_performance after the sample-size gate below. + rules = _load_yaml_baseline() yaml_weights = copy.deepcopy(rules.get("signal_weights", {})) canonical = {} @@ -277,9 +280,15 @@ def get_signal_weights(): try: from app.db.altcoin_db import get_signal_weights as db_get_weights db_weights = db_get_weights() + review_params = get_review_params() + deprecation_params = review_params.get("signal_deprecation") or {} + min_samples = max( + int(review_params.get("min_samples_for_weight", 12) or 12), + int(deprecation_params.get("min_samples", 12) or 12), + ) for sig, data in db_weights.items(): norm_sig = normalize_signal_name(sig) - if data.get("total_count", 0) >= 3: + if data.get("total_count", 0) >= min_samples: canonical[norm_sig] = data["weight"] except Exception: pass diff --git a/app/db/altcoin_db.py b/app/db/altcoin_db.py index 4922bf2..7c0ae87 100644 --- a/app/db/altcoin_db.py +++ b/app/db/altcoin_db.py @@ -70,6 +70,11 @@ def expire_old_recommendations(*args, **kwargs): return _recommendation_commands.expire_old_recommendations(*args, **kwargs) +def downgrade_active_entries_for_market_risk(*args, **kwargs): + _sync_command_compat_hooks() + return _recommendation_commands.downgrade_active_entries_for_market_risk(*args, **kwargs) + + def apply_recommendation_state_transition(*args, **kwargs): _sync_command_compat_hooks() return _recommendation_commands.apply_recommendation_state_transition(*args, **kwargs) diff --git a/app/db/paper_trading.py b/app/db/paper_trading.py index bd655c5..4c1e93e 100644 --- a/app/db/paper_trading.py +++ b/app/db/paper_trading.py @@ -1390,6 +1390,138 @@ def sync_recommendation(rec: dict, current_price: float, event_time: str = "") - pass +def sync_pending_paper_orders(limit: int = 100, event_time: str = "", config: dict | None = None) -> dict: + """Reconcile pending limit orders against the latest shared price cache. + + The strategy runner can miss an existing order if the recommendation is + later derived back into observe status. Pending orders are executable + state, so they need their own reconciliation pass based on the same + latest_price_cache used by the Web page. + """ + if not paper_trading_enabled(): + return {"enabled": False, "processed_count": 0, "results": []} + limit = max(1, min(_safe_int(limit, 100), 500)) + event_time = event_time or _now() + cfg = _paper_cfg(config) + conn = get_conn() + results = [] + try: + rows = conn.execute( + """ + SELECT + po.*, + r.status AS rec_status, + r.rec_state, + r.rec_score, + r.action_status, + r.execution_status, + r.lifecycle_state, + r.display_bucket, + r.entry_price, + r.current_price AS rec_current_price, + r.stop_loss AS rec_stop_loss, + r.tp1 AS rec_tp1, + r.tp2 AS rec_tp2, + r.strategy_version AS rec_strategy_version, + r.entry_plan_json, + r.market_context_json, + r.derivatives_context_json, + r.sector_context_json, + lpc.price AS latest_price, + lpc.updated_at AS latest_price_updated_at + FROM paper_orders po + LEFT JOIN recommendation r ON r.id = po.recommendation_id + LEFT JOIN latest_price_cache lpc ON lpc.symbol = po.symbol + WHERE po.status='pending' + ORDER BY po.created_at ASC, po.id ASC + LIMIT %s + """, + (limit,), + ).fetchall() + for row in rows: + item = dict(row) + current_price = _safe_float(item.get("latest_price")) + order = {k: item.get(k) for k in [ + "id", + "recommendation_id", + "symbol", + "side", + "order_type", + "status", + "source_status", + "source_action", + "target_price", + "current_price_at_create", + "fill_price", + "notional_usdt", + "stop_loss", + "tp1", + "tp2", + "strategy_version", + "entry_plan_snapshot_json", + "created_at", + "updated_at", + "expires_at", + "filled_at", + "canceled_at", + "cancel_reason", + ]} + rec = { + "id": item.get("recommendation_id"), + "symbol": item.get("symbol"), + "status": item.get("rec_status") or "active", + "rec_state": item.get("rec_state"), + "rec_score": item.get("rec_score"), + "action_status": item.get("action_status") or item.get("source_action"), + "execution_status": item.get("execution_status") or item.get("source_status"), + "lifecycle_state": item.get("lifecycle_state"), + "display_bucket": item.get("display_bucket"), + "entry_price": item.get("entry_price") or item.get("target_price"), + "current_price": item.get("rec_current_price") or current_price, + "stop_loss": item.get("rec_stop_loss") or item.get("stop_loss"), + "tp1": item.get("rec_tp1") or item.get("tp1"), + "tp2": item.get("rec_tp2") or item.get("tp2"), + "strategy_version": item.get("rec_strategy_version") or item.get("strategy_version"), + "entry_plan_json": item.get("entry_plan_json") or item.get("entry_plan_snapshot_json"), + "market_context_json": item.get("market_context_json"), + "derivatives_context_json": item.get("derivatives_context_json"), + "sector_context_json": item.get("sector_context_json"), + } + if current_price <= 0: + result = { + "skipped": True, + "reason": "missing_latest_price", + "paper_order_id": order.get("id"), + "symbol": order.get("symbol"), + "target_price": order.get("target_price"), + } + else: + result = _sync_wait_pullback_order(conn, rec, current_price, event_time, cfg) + result.update({ + "symbol": order.get("symbol"), + "latest_price": current_price, + "latest_price_updated_at": item.get("latest_price_updated_at") or "", + }) + results.append(result) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + try: + conn.close() + except Exception: + pass + return { + "enabled": True, + "processed_count": len(results), + "filled_count": sum(1 for r in results if r.get("paper_order", {}).get("filled")), + "canceled_count": sum(1 for r in results if str(r.get("reason") or "").startswith("paper_order_") and "canceled" in str(r.get("reason") or "")), + "results": results, + "run_time": event_time, + } + + def get_paper_trading_summary(days: int = 30) -> dict: days = max(1, min(_safe_int(days, 30), 365)) cutoff = (datetime.now() - timedelta(days=days)).isoformat() diff --git a/app/db/recommendation_commands.py b/app/db/recommendation_commands.py index ca9c376..21f85f7 100644 --- a/app/db/recommendation_commands.py +++ b/app/db/recommendation_commands.py @@ -244,6 +244,66 @@ def expire_old_recommendations(hours=48): conn.close() +def downgrade_active_entries_for_market_risk(reason: str, event_time: str | None = None) -> dict: + """Downgrade active executable recommendations when global market risk blocks entries.""" + event_time = event_time or datetime.now().isoformat() + reason = str(reason or "全市场风险过高,暂停新开仓与新挂单").strip() + conn = get_conn() + rows = conn.execute( + """ + SELECT id, entry_plan_json, action_status + FROM recommendation + WHERE status='active' + AND ( + COALESCE(action_status,'') IN ('可即刻买入','等回踩') + OR COALESCE(execution_status,'') IN ('buy_now','wait_pullback') + ) + ORDER BY id DESC + """ + ).fetchall() + updated = 0 + execution_status, display_bucket, lifecycle_state, entry_triggered, state_reason = derive_minimal_state_fields( + "active", "观察", {} + ) + for row in rows: + entry_plan = normalize_json_object(row["entry_plan_json"]) + previous_action = str(row["action_status"] or "").strip() + entry_plan["market_risk_gate"] = { + "blocked_action": previous_action, + "final_action": "观察", + "risk_level": "critical", + "reasons": [reason], + "updated_at": event_time, + } + entry_plan["entry_action"] = "观察" + conn.execute( + """ + UPDATE recommendation + SET action_status='观察', + execution_status=%s, + display_bucket=%s, + lifecycle_state=%s, + entry_triggered=%s, + state_reason=%s, + entry_plan_json=%s + WHERE id=%s + """, + ( + execution_status, + display_bucket, + lifecycle_state, + entry_triggered, + state_reason, + json.dumps(entry_plan, ensure_ascii=False), + row["id"], + ), + ) + updated += 1 + conn.commit() + conn.close() + return {"updated_count": updated, "reason": reason, "updated_at": event_time} + + def apply_recommendation_state_transition(rec_id, requested_action, current_price, event_time=None, signals=None): """The single DB entry for turning price events into recommendation action state.""" event_time = event_time or datetime.now().strftime("%Y-%m-%dT%H:%M:%S") diff --git a/app/services/altcoin_confirm.py b/app/services/altcoin_confirm.py index b5c6a0f..4930901 100644 --- a/app/services/altcoin_confirm.py +++ b/app/services/altcoin_confirm.py @@ -29,7 +29,7 @@ from app.core.sector_map import get_burst_threshold, is_meme_coin, get_sector_fo from app.db.altcoin_db import ( init_db, expire_old_states, expire_old_recommendations, get_candidates_for_confirm, update_state, get_conn, create_recommendation, log_screening, - log_cron_run, update_latest_price_cache, + log_cron_run, update_latest_price_cache, downgrade_active_entries_for_market_risk, ) from app.config.config_loader import ( get_strategy_direction, @@ -524,6 +524,32 @@ def _decision_log(module: str, decision: str, *, score: float = 0.0, reasons=Non } +def _apply_market_risk_entry_gate(entry_plan: dict, signals: list, market_regime: dict) -> tuple[dict, str]: + """Keep high-risk market discoveries visible, but block executable entries.""" + plan = dict(entry_plan or {}) + regime = market_regime if isinstance(market_regime, dict) else {} + risk_level = str(regime.get("risk_level") or "").strip().lower() + position_multiplier = float(regime.get("position_multiplier") or 0) + current_action = str(plan.get("entry_action") or "").strip() + if risk_level != "critical" or position_multiplier > 0: + return plan, "" + if current_action not in {"可即刻买入", "即刻买入", "等回踩"}: + return plan, "" + + reason = "全市场处于 critical 风险,暂停新开仓与新挂单,保留为观察机会" + plan["market_risk_gate"] = { + "blocked_action": current_action, + "final_action": "观察", + "risk_level": risk_level, + "position_multiplier": position_multiplier, + "reasons": [reason], + } + plan["entry_action"] = "观察" + if not any("市场风控闸门" in str(sig) for sig in signals): + signals.append(f"⚠️ 市场风控闸门: {reason}") + return plan, reason + + # ==================== 确认逻辑 ==================== def detect_volume_price_fly_1h(df_1h): @@ -1437,6 +1463,15 @@ def confirm_burst(symbol, cand): sector_context = compute_sector_context(symbol, cand_detail) regime_context = _current_market_regime_context() market_regime = regime_context.get("market_regime") or {} + if entry_plan: + entry_plan, market_risk_gate_reason = _apply_market_risk_entry_gate(entry_plan, signals, market_regime) + if market_risk_gate_reason: + score += factor_scorer.delta( + "entry_quality_gate", + -2, + evidence="全局市场风险闸门降为观察", + value=market_risk_gate_reason, + ) factor_score_breakdown = factor_scorer.summary() opportunity_score = round(float(factor_score_breakdown.get("opportunity_score") or 0), 3) entry_score = round(float(factor_score_breakdown.get("entry_score") or 0), 3) @@ -1498,7 +1533,7 @@ def confirm_burst(symbol, cand): "pa_15min": pa_15min_result, "pa_1d": pa_1d, "m30_aligned": m30_aligned, - "entry_action": entry_action, + "entry_action": (entry_plan or {}).get("entry_action") or entry_action, "market_context": market_context, "derivatives_context": derivatives_context, "sector_context": sector_context, @@ -1584,6 +1619,15 @@ def main(compact: bool = False): try: init_db() expire_old_states() + regime_context = _current_market_regime_context() + market_regime = regime_context.get("market_regime") or {} + if str(market_regime.get("risk_level") or "").strip().lower() == "critical" and float(market_regime.get("position_multiplier") or 0) <= 0: + downgrade_result = downgrade_active_entries_for_market_risk( + "全市场处于 critical 风险,暂停新开仓与新挂单,保留为观察机会", + event_time=datetime.now().isoformat(), + ) + else: + downgrade_result = {"updated_count": 0} candidates = get_candidates_for_confirm() @@ -1591,6 +1635,7 @@ def main(compact: bool = False): output = { "status": "no_candidates", "message": "无需要确认的候选(需加速状态+评分≥6)", + "market_risk_downgraded_count": downgrade_result.get("updated_count", 0), "check_time": datetime.now().isoformat(), } _emit_output(output, compact=compact) @@ -1734,6 +1779,7 @@ def main(compact: bool = False): "status": "confirmed" if confirmed else "unconfirmed", "confirmed_count": len(confirmed), "unconfirmed_count": len(unconfirmed), + "market_risk_downgraded_count": downgrade_result.get("updated_count", 0), "confirmed": confirmed, "unconfirmed": unconfirmed, "check_time": datetime.now().isoformat(), diff --git a/app/services/paper_trader.py b/app/services/paper_trader.py index 32f62cd..9f30bc0 100644 --- a/app/services/paper_trader.py +++ b/app/services/paper_trader.py @@ -8,7 +8,7 @@ from datetime import datetime import ccxt from app.db.altcoin_db import init_db, log_cron_run, update_latest_price_cache -from app.db.paper_trading import get_paper_trading_summary, sync_recommendation +from app.db.paper_trading import get_paper_trading_summary, sync_pending_paper_orders, sync_recommendation from app.db.recommendation_queries import get_active_recommendations_deduped from app.services.live_trading_sync import sync_paper_trade_to_live @@ -35,12 +35,19 @@ def run_once(limit: int = 100) -> dict: results.append(result) except Exception as exc: failed.append({"symbol": symbol, "error": str(exc)}) + pending_result = sync_pending_paper_orders(limit=limit, event_time=datetime.now().isoformat()) + for item in pending_result.get("results", []): + if item.get("trade_id") and (item.get("opened") or item.get("paper_order", {}).get("filled")): + item["live_sync"] = sync_paper_trade_to_live(int(item["trade_id"]), execute=True) output = { "status": "completed", "processed_count": len(results), + "pending_processed_count": pending_result.get("processed_count", 0), + "pending_filled_count": pending_result.get("filled_count", 0), "failed_count": len(failed), "failed": failed, "results": results, + "pending_results": pending_result.get("results", []), "summary": get_paper_trading_summary(days=30), "run_time": datetime.now().isoformat(), } @@ -77,6 +84,8 @@ def main(limit: int = 100): duration_ms=int((finished_at - started_at).total_seconds() * 1000), summary={ "processed_count": output.get("processed_count", 0), + "pending_processed_count": output.get("pending_processed_count", 0), + "pending_filled_count": output.get("pending_filled_count", 0), "failed_count": output.get("failed_count", 0), "open_count": output.get("summary", {}).get("open_count", 0), "closed_count": output.get("summary", {}).get("closed_count", 0), diff --git a/app/services/review_engine.py b/app/services/review_engine.py index b4fe154..70d779f 100644 --- a/app/services/review_engine.py +++ b/app/services/review_engine.py @@ -87,10 +87,10 @@ def _get_thresholds(): "hit_threshold_pct": params.get("hit_threshold_pct", 5.0), "fail_threshold_pct": params.get("fail_threshold_pct", -3.0), "missed_explosion_pct": params.get("missed_explosion_pct", 20.0), - "min_samples_for_weight": params.get("min_samples_for_weight", 3), + "min_samples_for_weight": params.get("min_samples_for_weight", 12), "weight_floor": params.get("weight_floor", 0.0), "hit_rate_kill_threshold": params.get("hit_rate_kill_threshold", 0.10), - "kill_min_samples": params.get("kill_min_samples", 5), + "kill_min_samples": params.get("kill_min_samples", 20), "category_base_weights": params.get("category_base_weights", {"前瞻": 2.0, "PA": 1.5, "滞后": 0.5}), } @@ -605,8 +605,8 @@ def _apply_daily_factor_weight_governance(): """ thresholds = _get_thresholds() weights = get_signal_weights() - min_samples = max(3, int(thresholds.get("min_samples_for_weight", 3) or 3)) - kill_min_samples = max(min_samples, int(thresholds.get("kill_min_samples", 5) or 5)) + min_samples = max(12, int(thresholds.get("min_samples_for_weight", 12) or 12)) + kill_min_samples = max(min_samples, int(thresholds.get("kill_min_samples", 20) or 20)) kill_hit_rate = float(thresholds.get("hit_rate_kill_threshold", 0.10) or 0.10) * 100 warn_hit_rate = float((thresholds.get("signal_deprecation") or {}).get("hit_rate_warn_threshold", 0.20) or 0.20) * 100 category_base = thresholds.get("category_base_weights") or {"前瞻": 2.0, "PA": 1.5, "滞后": 0.5} diff --git a/rules.yaml b/rules.yaml index f505afe..10b9949 100644 --- a/rules.yaml +++ b/rules.yaml @@ -271,21 +271,21 @@ review: hit_threshold_pct: 5.0 fail_threshold_pct: -3.0 missed_explosion_pct: 20.0 - min_samples_for_weight: 3 + min_samples_for_weight: 12 weight_floor: 0.0 hit_rate_kill_threshold: 0.1 - kill_min_samples: 5 + kill_min_samples: 20 category_base_weights: 前瞻: 2.0 PA: 1.5 滞后: 0.5 signal_deprecation: enabled: true - min_samples: 10 + min_samples: 20 hit_rate_warn_threshold: 0.2 hit_rate_deprecate_threshold: 0.1 min_tracking_days: 3 - note: 信号淘汰机制。累计≥10样本+跟踪≥3天后,命中率<10%自动淘汰(权重→0);<20%警告并自动降权。 + note: 信号淘汰机制。累计≥20样本+跟踪≥3天后,命中率<10%自动淘汰(权重→0);<20%警告并自动降权;小样本只观察,不直接影响确认层权重。 deprecate_action: set_weight_zero warn_action: halve_weight reverse_analysis: diff --git a/tests/test_confirm_market_risk_gate.py b/tests/test_confirm_market_risk_gate.py new file mode 100644 index 0000000..eb97414 --- /dev/null +++ b/tests/test_confirm_market_risk_gate.py @@ -0,0 +1,28 @@ +from app.services.altcoin_confirm import _apply_market_risk_entry_gate + + +def test_market_risk_gate_blocks_executable_action_when_critical(): + signals = ["15min即刻入场信号"] + plan, reason = _apply_market_risk_entry_gate( + {"entry_action": "可即刻买入", "entry_price": 1.0}, + signals, + {"risk_level": "critical", "position_multiplier": 0.0}, + ) + + assert plan["entry_action"] == "观察" + assert plan["market_risk_gate"]["blocked_action"] == "可即刻买入" + assert "暂停新开仓" in reason + assert any("市场风控闸门" in sig for sig in signals) + + +def test_market_risk_gate_keeps_observation_action_visible(): + signals = [] + plan, reason = _apply_market_risk_entry_gate( + {"entry_action": "观察", "entry_price": 1.0}, + signals, + {"risk_level": "critical", "position_multiplier": 0.0}, + ) + + assert plan["entry_action"] == "观察" + assert "market_risk_gate" not in plan + assert reason == "" diff --git a/tests/test_factor_scoring.py b/tests/test_factor_scoring.py index 19e49a1..7708d1f 100644 --- a/tests/test_factor_scoring.py +++ b/tests/test_factor_scoring.py @@ -45,10 +45,41 @@ def test_factor_scorer_records_negative_risk_as_risk_score(): def test_signal_weight_alias_keeps_legacy_chinese_keys_available(monkeypatch): - monkeypatch.setattr("app.config.config_loader.load_rules", lambda: {"signal_weights": {"量价齐飞": 5}}) + monkeypatch.setattr("app.config.config_loader._load_yaml_baseline", lambda: {"signal_weights": {"量价齐飞": 5}}) monkeypatch.setattr("app.db.altcoin_db.get_signal_weights", lambda: {}) weights = get_signal_weights() assert weights["vp_fly_1h_current"] == 5 assert weights["量价齐飞"] == 5 + + +def test_signal_weight_ignores_thin_dynamic_samples(monkeypatch): + monkeypatch.setattr( + "app.config.config_loader._load_yaml_baseline", + lambda: {"signal_weights": {"量价齐飞": 5}}, + ) + monkeypatch.setattr( + "app.config.config_loader.load_rules", + lambda: { + "review": { + "min_samples_for_weight": 12, + "signal_deprecation": {"min_samples": 20}, + }, + }, + ) + monkeypatch.setattr( + "app.db.altcoin_db.get_signal_weights", + lambda: { + "vp_fly_1h_current": { + "weight": 0, + "total_count": 3, + "hit_rate": 0, + "avg_pnl": -1, + } + }, + ) + + weights = get_signal_weights() + + assert weights["vp_fly_1h_current"] == 5 diff --git a/tests/test_market_risk_recommendation_downgrade.py b/tests/test_market_risk_recommendation_downgrade.py new file mode 100644 index 0000000..2634f99 --- /dev/null +++ b/tests/test_market_risk_recommendation_downgrade.py @@ -0,0 +1,39 @@ +import json + +from app.db.recommendation_commands import downgrade_active_entries_for_market_risk + + +def test_downgrade_active_entries_for_market_risk_updates_executable_recs(pg_conn): + pg_conn.execute( + """ + INSERT INTO recommendation ( + symbol, rec_time, rec_state, rec_score, entry_price, status, + action_status, execution_status, display_bucket, lifecycle_state, + entry_plan_json + ) + VALUES + ('BUY/USDT', '2026-05-26T10:00:00', '爆发', 30, 1.0, 'active', + '可即刻买入', 'buy_now', 'realtime', 'buyable', '{"entry_action":"可即刻买入"}'), + ('WAIT/USDT', '2026-05-26T10:01:00', '爆发', 20, 1.0, 'active', + '等回踩', 'wait_pullback', 'watch_pool', 'waiting_entry', '{"entry_action":"等回踩"}'), + ('OBS/USDT', '2026-05-26T10:02:00', '观察', 10, 1.0, 'active', + '观察', 'observe', 'watch_pool', 'watching', '{"entry_action":"观察"}') + """ + ) + pg_conn.commit() + + result = downgrade_active_entries_for_market_risk("critical test", event_time="2026-05-26T11:00:00") + + assert result["updated_count"] == 2 + rows = pg_conn.execute( + "SELECT symbol, action_status, execution_status, display_bucket, entry_plan_json FROM recommendation ORDER BY symbol" + ).fetchall() + by_symbol = {row["symbol"]: dict(row) for row in rows} + assert by_symbol["BUY/USDT"]["action_status"] == "观察" + assert by_symbol["BUY/USDT"]["execution_status"] == "observe" + assert by_symbol["WAIT/USDT"]["action_status"] == "观察" + assert by_symbol["WAIT/USDT"]["execution_status"] == "observe" + assert by_symbol["OBS/USDT"]["action_status"] == "观察" + plan = json.loads(by_symbol["BUY/USDT"]["entry_plan_json"]) + assert plan["market_risk_gate"]["blocked_action"] == "可即刻买入" + assert plan["market_risk_gate"]["final_action"] == "观察" diff --git a/tests/test_paper_trading.py b/tests/test_paper_trading.py index 31ed5af..3f59943 100644 --- a/tests/test_paper_trading.py +++ b/tests/test_paper_trading.py @@ -14,6 +14,7 @@ from app.db.paper_trading import ( list_paper_trades, reset_paper_trading_data, send_paper_trading_report, + sync_pending_paper_orders, sync_recommendation, ) @@ -675,6 +676,47 @@ def test_wait_pullback_paper_order_fills_when_price_touches(monkeypatch): assert order["fill_price"] == pytest.approx(95) +def test_pending_paper_order_reconciles_from_latest_price_cache(monkeypatch): + monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1") + monkeypatch.setenv("ALPHAX_PAPER_TRADE_NOTIONAL_USDT", "100") + monkeypatch.setenv("ALPHAX_PAPER_TRADE_FEE_RATE", "0") + monkeypatch.setenv("ALPHAX_PAPER_TRADE_SLIPPAGE_PCT", "0") + altcoin_db.init_db() + rec_id = altcoin_db.create_recommendation( + symbol="CACHEFILL/USDT", + rec_state="蓄力", + rec_score=22, + entry_price=95, + stop_loss=90, + tp1=105, + tp2=112, + signals=["等待回踩"], + entry_plan={"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "risk_reward_ok": True, "rr1": 2.0}, + ) + rec = { + "id": rec_id, + "symbol": "CACHEFILL/USDT", + "execution_status": "wait_pullback", + "action_status": "等回踩", + "entry_price": 95, + "stop_loss": 90, + "tp1": 105, + "tp2": 112, + "entry_plan": {"entry_action": "等回踩", "entry_price": 95, "stop_loss": 90, "tp1": 105, "risk_reward_ok": True, "rr1": 2.0}, + } + sync_recommendation(rec, 100, event_time="2026-05-16T10:00:00") + altcoin_db.update_latest_price_cache("CACHEFILL/USDT", 94.9, updated_at="2026-05-16T10:05:00", source="test") + + result = sync_pending_paper_orders(event_time="2026-05-16T10:05:00") + + assert result["filled_count"] == 1 + assert result["results"][0]["paper_order"]["filled"] is True + trade = list_paper_trades()["items"][0] + assert trade["symbol"] == "CACHEFILL/USDT" + order = list_paper_orders(status="filled")["items"][0] + assert order["fill_price"] == pytest.approx(95) + + def test_wait_pullback_order_cancels_when_recommendation_invalid(monkeypatch): monkeypatch.setenv("ALPHAX_PAPER_TRADING_ENABLED", "1") altcoin_db.init_db() diff --git a/tests/test_price_streamer.py b/tests/test_price_streamer.py index 09ebb69..3df49d5 100644 --- a/tests/test_price_streamer.py +++ b/tests/test_price_streamer.py @@ -6,15 +6,29 @@ from app.db.runtime_config_db import set_config from app.services import price_streamer -@pytest.fixture -def buy_now_rec(monkeypatch): - set_config("system", "paper_trading", { +def _paper_config(**overrides): + cfg = { "enabled": True, "trade_notional_usdt": 5000, "trade_leverage": 5, "fee_rate": 0, "slippage_pct": 0, - }) + "entry_gate_enabled": False, + "entry_min_rr": 1.2, + "order_min_rr": 1.2, + "max_cumulative_leverage": 0, + "max_stop_loss_leverage_risk_pct": 0, + "max_account_drawdown_pause_pct": 0, + "pause_after_weak_entries": 0, + "global_risk_gate_enabled": False, + } + cfg.update(overrides) + return cfg + + +@pytest.fixture +def buy_now_rec(monkeypatch): + set_config("system", "paper_trading", _paper_config()) set_config("system", "price_streamer", { "enabled": True, "update_latest_price_cache": True, @@ -95,13 +109,7 @@ def test_price_streamer_tracks_open_paper_trade_without_active_rec(buy_now_rec): def test_price_streamer_fills_pending_paper_order(): - set_config("system", "paper_trading", { - "enabled": True, - "trade_notional_usdt": 5000, - "trade_leverage": 5, - "fee_rate": 0, - "slippage_pct": 0, - }) + set_config("system", "paper_trading", _paper_config()) set_config("system", "price_streamer", { "enabled": True, "update_latest_price_cache": True, @@ -140,13 +148,7 @@ def test_price_streamer_fills_pending_paper_order(): def test_price_streamer_prioritizes_pending_order_over_same_symbol_recommendation(): - set_config("system", "paper_trading", { - "enabled": True, - "trade_notional_usdt": 5000, - "trade_leverage": 5, - "fee_rate": 0, - "slippage_pct": 0, - }) + set_config("system", "paper_trading", _paper_config()) set_config("system", "price_streamer", { "enabled": True, "update_latest_price_cache": True, diff --git a/tests/test_review_accuracy_pipeline.py b/tests/test_review_accuracy_pipeline.py index d14590b..a7a1739 100644 --- a/tests/test_review_accuracy_pipeline.py +++ b/tests/test_review_accuracy_pipeline.py @@ -161,7 +161,7 @@ def test_daily_factor_weight_governance_promotes_and_eliminates(monkeypatch, tem monkeypatch.setattr(review_engine, "update_signal_weight", lambda signal, weight: changes.append((signal, weight))) monkeypatch.setattr(review_engine, "get_signal_weights", lambda: { "good_factor": {"category": "前瞻", "total_count": 12, "hit_rate": 70, "avg_pnl": 4.2, "weight": 1.0}, - "bad_factor": {"category": "前瞻", "total_count": 12, "hit_rate": 5, "avg_pnl": -2.0, "weight": 1.2}, + "bad_factor": {"category": "前瞻", "total_count": 20, "hit_rate": 5, "avg_pnl": -2.0, "weight": 1.2}, "thin_factor": {"category": "PA", "total_count": 2, "hit_rate": 100, "avg_pnl": 6.0, "weight": 1.0}, })