diff --git a/app/core/opportunity_lifecycle.py b/app/core/opportunity_lifecycle.py index 485714b..65089bc 100644 --- a/app/core/opportunity_lifecycle.py +++ b/app/core/opportunity_lifecycle.py @@ -131,6 +131,16 @@ def to_float(value: Any, default: float = 0.0) -> float: return default +def calc_rr_target_entry(stop_loss: float, tp1: float, min_rr: float) -> float: + """最高允许入场价:在该价格或更低买入,RR1 才能达到 min_rr。""" + stop_loss = to_float(stop_loss) + tp1 = to_float(tp1) + min_rr = to_float(min_rr) + if stop_loss <= 0 or tp1 <= stop_loss or min_rr <= 0: + return 0.0 + return round((tp1 + min_rr * stop_loss) / (1 + min_rr), 8) + + def detect_breakout_distance_pct(signals: Iterable[Any]) -> float: """从“站稳突破位 +66.7%”等信号中提取最大追高距离。""" max_pct = 0.0 @@ -320,6 +330,19 @@ def apply_entry_quality_gate( if action_status == "等回踩" and current_price > 0 and to_float(entry_plan.get("entry_price")) > 0 and current_price <= to_float(entry_plan.get("entry_price")) * 1.003 and (risk_reward_ok is False or rr1 < _cfg_value(cfg, "min_rr_buy_now")): target_action = "观察" reasons.append("回踩参考已到,但实时盈亏比不达标,转为观察") + elif action_status == "可即刻买入" and current_price > 0 and stop_loss > 0 and tp1 > stop_loss and (risk_reward_ok is False or rr1 < _cfg_value(cfg, "min_rr_buy_now")): + rr_target_entry = calc_rr_target_entry(stop_loss, tp1, _cfg_value(cfg, "min_rr_buy_now")) + if rr_target_entry > stop_loss and rr_target_entry < current_price * 0.997: + target_action = "等回踩" + entry_plan["entry_price"] = rr_target_entry + entry_plan["entry_method"] = f"等回踩至可买RR价 {rr_target_entry:.8g}" + entry_plan["entry_action"] = "等回踩" + entry_plan["rr_target_entry"] = rr_target_entry + entry_plan["rr_target_reason"] = f"现价RR不足,需回落到该价或更低,RR1才≥{_cfg_value(cfg, 'min_rr_buy_now')}" + reasons.append(f"现价不买,等回落到{rr_target_entry:.8g}附近再评估") + else: + target_action = "观察" + reasons.append("无法给出有效回踩买点,转为观察") else: # risk_reward_ok=false / rr1不足 / 追高距离过远 都代表“现价买入被禁止”; # 展示层必须降级为“等回踩/观察”,否则会出现“闸门禁止买入但仍显示入场窗口”的矛盾。 diff --git a/app/db/altcoin_db.py b/app/db/altcoin_db.py index 7240d87..6f15429 100644 --- a/app/db/altcoin_db.py +++ b/app/db/altcoin_db.py @@ -1612,11 +1612,30 @@ def update_state(symbol, new_state, score=0, anomaly_type="", sector="", def get_candidates_for_confirm(): - """获取需要确认层检查的候选(加速状态+score≥5)""" + """获取需要确认层检查的候选。 + + 优先处理最近一轮粗筛/细筛刚更新的候选,避免旧 coin_state 中的高分候选 + 抢占确认层,导致链路日志里“细筛通过”和“确认处理”对不上。 + """ + try: + _, _, accumulate_threshold = state_score_thresholds() + except Exception: + accumulate_threshold = 3 conn = get_conn() rows = conn.execute(""" - SELECT * FROM coin_state WHERE state IN ('加速', '蓄力') AND score >= 5 - """).fetchall() + SELECT * FROM coin_state + WHERE state IN ('加速', '蓄力') + AND score >= ? + AND julianday(?) - julianday(detected_at) <= ? + ORDER BY detected_at DESC, score DESC + """, (accumulate_threshold, datetime.now().isoformat(), 45 / 1440.0)).fetchall() + if not rows: + rows = conn.execute(""" + SELECT * FROM coin_state + WHERE state IN ('加速', '蓄力') + AND score >= 5 + ORDER BY detected_at DESC, score DESC + """).fetchall() conn.close() return [dict(r) for r in rows] diff --git a/app/db/analytics.py b/app/db/analytics.py index b0c10c5..fa549b2 100644 --- a/app/db/analytics.py +++ b/app/db/analytics.py @@ -1,7 +1,7 @@ """Analytics-facing DB API grouped by read concerns.""" import json -from datetime import datetime +from datetime import datetime, timedelta from app.db.altcoin_db import ( _classify_recommendation_result, @@ -38,6 +38,37 @@ def _loads_json(value, fallback): return fallback +def _safe_int(value, default=0): + try: + return int(value or 0) + except Exception: + return default + + +def _safe_float(value, default=0.0): + try: + return float(value or 0) + except Exception: + return default + + +def _parse_dt(value): + if isinstance(value, datetime): + return value + if not value: + return None + try: + return datetime.fromisoformat(str(value).replace("Z", "+00:00")).replace(tzinfo=None) + except Exception: + return None + + +def _iso(value): + if isinstance(value, datetime): + return value.isoformat(timespec="seconds") + return str(value or "") + + def get_observation_candidates(limit=50): """Return current coarse-screen observation candidates for the watch pool.""" conn = get_conn() @@ -750,11 +781,369 @@ def get_cron_run_summary(hours=24): } +def _pipeline_window(run, next_run_start=None): + started = _parse_dt(run.get("started_at")) or datetime.now() + finished = _parse_dt(run.get("finished_at")) or started + if finished < started: + finished = started + window_start = started - timedelta(minutes=10) + window_end = finished + timedelta(minutes=30) + next_started = _parse_dt(next_run_start) + if next_started and next_started > finished: + window_end = min(window_end, next_started - timedelta(seconds=1)) + return window_start, window_end + + +def _cron_item(row): + item = dict(row) + item["summary_json"] = _loads_json(item.get("summary_json"), {}) + return item + + +def _screening_item(row): + item = dict(row) + item["signals"] = _loads_json(item.get("signals"), []) + item["detail_json"] = _loads_json(item.get("detail_json"), {}) + if item.get("layer") == "细筛": + item["stage_bucket"] = "fine" + item["stage_label"] = "细筛通过" + elif item.get("layer") == "确认": + item["stage_bucket"] = "confirm" + item["stage_label"] = "确认记录" + else: + item["stage_bucket"] = "coarse" + item["stage_label"] = "观察候选" + return item + + +def _recommendation_item(row): + item = dict(row) + item["signals"] = _loads_json(item.get("signals"), []) + item["signal_codes"] = _loads_json(item.get("signal_codes_json"), []) + item["signal_labels"] = _loads_json(item.get("signal_labels_json"), []) + item["entry_plan"] = _loads_json(item.get("entry_plan_json"), {}) + rec_result, rec_result_label = _classify_recommendation_result(item) + item["recommendation_result"] = rec_result + item["recommendation_result_label"] = rec_result_label + _derive_execution_fields(item) + return item + + +def _review_item(row): + item = dict(row) + item["triggered_signals"] = _loads_json(item.get("triggered_signals"), []) + item["hit_signals"] = _loads_json(item.get("hit_signals"), []) + item["miss_signals"] = _loads_json(item.get("miss_signals"), []) + return item + + +def _missed_item(row): + item = dict(row) + item["features_detected"] = _loads_json(item.get("features_detected"), {}) + return item + + +def _performance_status(rec, reviews_by_rec): + status = (rec.get("status") or "").strip() + review_outcomes = [(r.get("outcome") or "").strip() for r in reviews_by_rec.get(rec.get("id"), [])] + if status in ("hit_tp1", "hit_tp2") or "爆发" in review_outcomes: + return "success" + if status == "stopped_out" or "失败" in review_outcomes: + return "failed" + return "pending" + + +def _select_pipeline_rows(conn, run): + next_row = conn.execute( + """ + SELECT started_at FROM cron_run_log + WHERE job_name='粗筛' AND started_at > ? + ORDER BY started_at ASC, id ASC + LIMIT 1 + """, + (run.get("started_at"),), + ).fetchone() + window_start, window_end = _pipeline_window(run, next_row["started_at"] if next_row else None) + run_started = _iso(_parse_dt(run.get("started_at")) or window_start) + run_finished = _iso(_parse_dt(run.get("finished_at")) or _parse_dt(run.get("started_at")) or window_start) + start_text = _iso(window_start) + end_text = _iso(window_end) + cron_rows = conn.execute( + """ + SELECT * FROM cron_run_log + WHERE started_at >= ? AND started_at <= ? + AND ( + job_name IN ('事件舆情', '跟踪', '复盘') + OR (job_name='粗筛' AND id=?) + OR (job_name='确认' AND started_at >= ?) + ) + ORDER BY started_at ASC, id ASC + """, + (start_text, end_text, run.get("id"), run_finished), + ).fetchall() + screening_rows = conn.execute( + """ + SELECT * FROM screening_log + WHERE ( + layer IN ('粗筛', '细筛') AND scan_time >= ? AND scan_time <= ? + ) OR ( + layer='确认' AND scan_time >= ? AND scan_time <= ? + ) OR ( + layer='舆情触发' AND scan_time >= ? AND scan_time <= ? + ) + ORDER BY scan_time ASC, score DESC, id ASC + """, + (run_started, run_finished, run_finished, end_text, start_text, end_text), + ).fetchall() + rec_rows = conn.execute( + """ + SELECT * FROM recommendation + WHERE rec_time >= ? AND rec_time <= ? + ORDER BY rec_time ASC, id ASC + """, + (run_finished, end_text), + ).fetchall() + rec_ids = [row["id"] for row in rec_rows] + reviews = [] + if rec_ids: + placeholders = ",".join(["?"] * len(rec_ids)) + reviews = conn.execute( + f""" + SELECT * FROM review_log + WHERE rec_id IN ({placeholders}) + ORDER BY review_time ASC, id ASC + """, + tuple(rec_ids), + ).fetchall() + review_window_rows = conn.execute( + """ + SELECT * FROM review_log + WHERE review_time >= ? AND review_time <= ? + ORDER BY review_time ASC, id ASC + """, + (run_finished, end_text), + ).fetchall() + known_review_ids = {row["id"] for row in reviews} + for row in review_window_rows: + if row["id"] not in known_review_ids: + reviews.append(row) + known_review_ids.add(row["id"]) + missed_rows = conn.execute( + """ + SELECT * FROM missed_explosions + WHERE detect_time >= ? AND detect_time <= ? + ORDER BY detect_time ASC, id ASC + """, + (run_finished, end_text), + ).fetchall() + return { + "window_start": start_text, + "window_end": end_text, + "cron_rows": [_cron_item(row) for row in cron_rows], + "screening_rows": [_screening_item(row) for row in screening_rows], + "recommendation_rows": [_recommendation_item(row) for row in rec_rows], + "review_rows": [_review_item(row) for row in reviews], + "missed_rows": [_missed_item(row) for row in missed_rows], + } + + +def _pipeline_summary_for_run(run, related): + summary = _loads_json(run.get("summary_json"), {}) + confirm_rows = [r for r in related["cron_rows"] if r.get("job_name") == "确认"] + event_rows = [r for r in related["cron_rows"] if r.get("job_name") == "事件舆情"] + track_rows = [r for r in related["cron_rows"] if r.get("job_name") == "跟踪"] + review_cron_rows = [r for r in related["cron_rows"] if r.get("job_name") == "复盘"] + + confirm_processed = 0 + confirm_hits = 0 + for row in confirm_rows: + s = row.get("summary_json") or {} + confirm_processed += _safe_int(s.get("processed_count")) + confirm_hits += _safe_int(s.get("confirmed_count")) + + reviews_by_rec = {} + for review in related["review_rows"]: + reviews_by_rec.setdefault(review.get("rec_id"), []).append(review) + + perf_counts = {"success": 0, "failed": 0, "pending": 0} + for rec in related["recommendation_rows"]: + perf_counts[_performance_status(rec, reviews_by_rec)] += 1 + + status = run.get("run_status") or "unknown" + rough_candidates = _safe_int(summary.get("total_candidates")) + fine_qualified = _safe_int(summary.get("total_qualified")) + if not rough_candidates: + rough_candidates = sum(1 for item in related["screening_rows"] if item.get("layer") == "粗筛") + if not fine_qualified: + fine_qualified = sum(1 for item in related["screening_rows"] if item.get("layer") == "细筛") + + recommendations = len(related["recommendation_rows"]) + hit_rate = round(recommendations / fine_qualified * 100, 1) if fine_qualified else 0 + issue_notes = [] + if status != "success": + issue_notes.append(run.get("error_message") or "任务异常") + if rough_candidates and not fine_qualified: + issue_notes.append("粗筛后细筛为空") + if fine_qualified and not confirm_hits: + issue_notes.append("确认未命中") + if confirm_hits and not recommendations: + issue_notes.append("确认命中但未生成推荐") + if perf_counts["failed"]: + issue_notes.append(f"失败 {perf_counts['failed']}") + if related["missed_rows"]: + issue_notes.append(f"漏选 {len(related['missed_rows'])}") + + return { + "id": run.get("id"), + "run_id": run.get("id"), + "job_name": run.get("job_name"), + "script_name": run.get("script_name"), + "started_at": run.get("started_at"), + "finished_at": run.get("finished_at"), + "duration_ms": _safe_int(run.get("duration_ms")), + "run_status": status, + "result_status": run.get("result_status") or "", + "error_message": run.get("error_message") or "", + "window_start": related["window_start"], + "window_end": related["window_end"], + "rough_candidates": rough_candidates, + "fine_qualified": fine_qualified, + "confirm_processed": confirm_processed, + "confirm_hits": confirm_hits, + "recommendations": recommendations, + "perf_success": perf_counts["success"], + "perf_failed": perf_counts["failed"], + "perf_pending": perf_counts["pending"], + "missed_count": len(related["missed_rows"]), + "event_count": sum(_safe_int((row.get("summary_json") or {}).get("processed_count")) for row in event_rows), + "tracked_count": sum(_safe_int((row.get("summary_json") or {}).get("tracked_count")) for row in track_rows), + "review_count": len(related["review_rows"]), + "review_run_count": len(review_cron_rows), + "hit_rate": hit_rate, + "issue_notes": issue_notes[:3], + } + + +def get_pipeline_runs(limit=30, hours=24): + """按粗筛任务批次聚合推荐链路日志。""" + try: + limit = max(1, min(int(limit or 30), 100)) + except Exception: + limit = 30 + try: + hours = max(1, min(int(hours or 24), 24 * 30)) + except Exception: + hours = 24 + + conn = get_conn() + run_rows = conn.execute( + """ + SELECT * FROM cron_run_log + WHERE job_name = '粗筛' + AND julianday(?) - julianday(started_at) <= ? + ORDER BY started_at DESC, id DESC + LIMIT ? + """, + (datetime.now().isoformat(), hours / 24.0, limit), + ).fetchall() + + runs = [] + for row in run_rows: + run = _cron_item(row) + related = _select_pipeline_rows(conn, run) + runs.append(_pipeline_summary_for_run(run, related)) + conn.close() + + kpi = { + "hours": hours, + "run_count": len(runs), + "rough_candidates": sum(item["rough_candidates"] for item in runs), + "fine_qualified": sum(item["fine_qualified"] for item in runs), + "confirm_processed": sum(item["confirm_processed"] for item in runs), + "confirm_hits": sum(item["confirm_hits"] for item in runs), + "recommendations": sum(item["recommendations"] for item in runs), + "perf_success": sum(item["perf_success"] for item in runs), + "perf_failed": sum(item["perf_failed"] for item in runs), + "perf_pending": sum(item["perf_pending"] for item in runs), + "missed_count": sum(item["missed_count"] for item in runs), + } + kpi["recommendation_rate"] = round(kpi["recommendations"] / kpi["fine_qualified"] * 100, 1) if kpi["fine_qualified"] else 0 + kpi["performance_hit_rate"] = round(kpi["perf_success"] / (kpi["perf_success"] + kpi["perf_failed"]) * 100, 1) if (kpi["perf_success"] + kpi["perf_failed"]) else 0 + return {"kpi": kpi, "runs": runs} + + +def get_pipeline_run_detail(run_id): + """返回某次粗筛批次的链路明细。""" + conn = get_conn() + row = conn.execute("SELECT * FROM cron_run_log WHERE id=? AND job_name='粗筛'", (run_id,)).fetchone() + if not row: + conn.close() + return None + run = _cron_item(row) + related = _select_pipeline_rows(conn, run) + conn.close() + + summary = _pipeline_summary_for_run(run, related) + reviews_by_rec = {} + for review in related["review_rows"]: + reviews_by_rec.setdefault(review.get("rec_id"), []).append(review) + + recommendations = [] + for rec in related["recommendation_rows"]: + rec_reviews = reviews_by_rec.get(rec.get("id"), []) + rec["performance_status"] = _performance_status(rec, reviews_by_rec) + rec["reviews"] = rec_reviews + if rec["performance_status"] == "success": + rec["stage_label"] = "复盘命中" + elif rec["performance_status"] == "failed": + rec["stage_label"] = "复盘失败" + else: + rec["stage_label"] = "交易推荐" + recommendations.append(rec) + + timeline = [] + for cron in related["cron_rows"]: + s = cron.get("summary_json") or {} + timeline.append({ + "stage": cron.get("job_name") or "任务", + "started_at": cron.get("started_at"), + "finished_at": cron.get("finished_at"), + "duration_ms": _safe_int(cron.get("duration_ms")), + "run_status": cron.get("run_status") or "", + "result_status": cron.get("result_status") or "", + "summary": s, + "error_message": cron.get("error_message") or "", + }) + + screening_items = related["screening_rows"] + stage_counts = { + "observation": sum(1 for item in screening_items if item.get("layer") == "粗筛"), + "fine": sum(1 for item in screening_items if item.get("layer") == "细筛"), + "confirm_rejected": max(0, summary["confirm_processed"] - summary["confirm_hits"]), + "recommendation": len(recommendations), + "review_success": summary["perf_success"], + "review_failed": summary["perf_failed"], + "missed": summary["missed_count"], + } + + return { + "summary": summary, + "timeline": timeline, + "stage_counts": stage_counts, + "screening_items": screening_items, + "recommendations": recommendations, + "reviews": related["review_rows"], + "missed_explosions": related["missed_rows"], + } + + __all__ = [ "get_all_recommendations", "get_observation_candidates", "get_cron_run_logs", "get_cron_run_summary", + "get_pipeline_run_detail", + "get_pipeline_runs", "get_review_stats", "get_screening_history", "get_stats", diff --git a/app/services/altcoin_confirm.py b/app/services/altcoin_confirm.py index 920a53a..4f74b1e 100644 --- a/app/services/altcoin_confirm.py +++ b/app/services/altcoin_confirm.py @@ -1292,6 +1292,21 @@ def main(compact: bool = False): mainline_item = get_recommendation_for_push(rec_id) push_mainline_state_update(symbol, rec_id, mainline_item) else: + cand_detail = json.loads(cand.get("detail_json", "{}")) + log_screening( + layer="确认", symbol=symbol, state=cand.get("state", "蓄力"), score=result.get("score", 0), + price=result.get("price", 0), signals=result.get("signals", []), + sector=cand_detail.get("sector", cand.get("sector", "")), + leader_status=cand_detail.get("leader_status", cand.get("leader_status", "")), + is_meme=int(is_meme_coin(symbol)), + detail={ + "confirmed": False, + "reason": "确认未通过", + "entry_plan": result.get("entry_plan") or {}, + "fresh_reason": result.get("fresh_reason", ""), + "trigger_context": result.get("trigger_context") or {}, + }, + ) result["state_update"] = {"should_alert": False, "reason": "未确认爆发"} results.append({"symbol": symbol, **result}) diff --git a/app/services/altcoin_screener.py b/app/services/altcoin_screener.py index 6fcd5b1..29f6839 100644 --- a/app/services/altcoin_screener.py +++ b/app/services/altcoin_screener.py @@ -882,6 +882,26 @@ def layer1_coarse_filter(): total_bypass = bypass_count + hl_count_total + cs_count_total print(f"粗筛结果: {len(candidates)}个候选(含{total_bypass}个旁路: 静K{bypass_count}+底抬{hl_count_total}+压放{cs_count_total})") + for symbol, cand in candidates.items(): + log_screening( + layer="粗筛", + symbol=symbol, + state="候选", + score=cand.get("anomaly_score", 0), + price=cand.get("price", 0), + signals=cand.get("anomalies", []), + is_meme=int(cand.get("is_meme") or 0), + change_24h=cand.get("change_24h", 0), + funding_rate=cand.get("funding_rate", 0), + detail={ + "candidate_stage": "coarse_candidate", + "volume_24h": cand.get("volume_24h", 0), + "turnover_acceleration_1h": cand.get("turnover_acceleration_1h", 0), + "turnover_acceleration_4h": cand.get("turnover_acceleration_4h", 0), + "signal_recency": _build_signal_recency(cand), + "bypass_origin": cand.get("bypass_origin", ""), + }, + ) return candidates diff --git a/app/services/price_tracker.py b/app/services/price_tracker.py index 94d45ad..cff795a 100644 --- a/app/services/price_tracker.py +++ b/app/services/price_tracker.py @@ -41,6 +41,12 @@ from app.core.opportunity_lifecycle import apply_entry_quality_gate exchange = ccxt.binance({"enableRateLimit": True}) REPO_ROOT = Path(__file__).resolve().parents[2] +PROVISIONAL_BUY_SIGNAL_MARKERS = ( + "可即刻入场", + "当前价接近回踩目标", + "回踩确认完毕", +) + def fetch_klines(symbol, timeframe, limit=200): try: @@ -52,6 +58,49 @@ def fetch_klines(symbol, timeframe, limit=200): return None +def _format_tracking_price(price): + try: + price = float(price) + except Exception: + return "" + if price <= 0: + return "" + if price >= 1: + return f"${price:.3f}" + if price >= 0.01: + return f"${price:.4f}" + if price >= 0.0001: + return f"${price:.6f}" + return f"${price:.8f}" + + +def reconcile_buy_signals_after_gate(buy_signals, final_action, gated_plan, gate_reasons): + """买点质量闸门降级后,移除临时买入文案,保留最终终端指引。""" + if final_action == "可即刻买入" or not gate_reasons: + return list(buy_signals or []) + + filtered = [ + str(signal) + for signal in (buy_signals or []) + if not any(marker in str(signal) for marker in PROVISIONAL_BUY_SIGNAL_MARKERS) + ] + first_reason = str(gate_reasons[0]) + if final_action == "等回踩": + target_price = ( + (gated_plan or {}).get("rr_target_entry") + or (gated_plan or {}).get("entry_price") + or (gated_plan or {}).get("wait_price") + ) + price_text = _format_tracking_price(target_price) + if price_text: + filtered.append(f"🟡 现价不买,等待回踩至{price_text}附近;{first_reason}") + else: + filtered.append(f"🟡 现价不买,继续等待回踩;{first_reason}") + elif final_action == "观察": + filtered.append(f"🟡 买点未达标,保持观察;{first_reason}") + return filtered + + def analyze_tracking_signals(symbol, rec, current_price): """ 对active推荐做动态跟踪分析 @@ -264,6 +313,12 @@ def analyze_tracking_signals(symbol, rec, current_price): sector_context=rec.get("sector_context") or {}, ) if gate_reasons: + buy_signals = reconcile_buy_signals_after_gate( + buy_signals, + action_status, + gated_plan, + gate_reasons, + ) buy_signals.append("⚠️ 买点质量闸门: " + ";".join(gate_reasons[:3])) entry_plan.update(gated_plan) diff --git a/app/web/routes_pages.py b/app/web/routes_pages.py index cc995bd..4432d25 100644 --- a/app/web/routes_pages.py +++ b/app/web/routes_pages.py @@ -41,6 +41,13 @@ def build_router(templates, repo_root: Path, stock_report_template: str): return redirect return render_page("watchlist.html", request) + @router.get("/pipeline", response_class=HTMLResponse) + async def pipeline_page(request: Request): + user, redirect = require_page_user(request) + if redirect: + return redirect + return render_page("pipeline.html", request) + @router.get("/strategy", response_class=HTMLResponse) async def strategy_page(request: Request): user, redirect = require_page_user(request) diff --git a/app/web/routes_recommendations.py b/app/web/routes_recommendations.py index 11bc372..a715845 100644 --- a/app/web/routes_recommendations.py +++ b/app/web/routes_recommendations.py @@ -6,6 +6,8 @@ from app.db.analytics import ( get_cron_run_logs, get_cron_run_summary, get_observation_candidates, + get_pipeline_run_detail, + get_pipeline_runs, get_review_stats, get_screening_history, get_stats, @@ -149,3 +151,18 @@ async def api_cron(limit: int = 50, job_name: str = "", altcoin_session: str = C async def api_cron_summary(hours: int = 24, altcoin_session: str = Cookie(default="")): require_api_user_with_subscription(altcoin_session) return get_cron_run_summary(hours=hours) + + +@router.get("/api/pipeline/runs") +async def api_pipeline_runs(limit: int = 30, hours: int = 24, altcoin_session: str = Cookie(default="")): + require_api_user_with_subscription(altcoin_session) + return get_pipeline_runs(limit=limit, hours=hours) + + +@router.get("/api/pipeline/runs/{run_id}") +async def api_pipeline_run_detail(run_id: int, altcoin_session: str = Cookie(default="")): + require_api_user_with_subscription(altcoin_session) + detail = get_pipeline_run_detail(run_id) + if not detail: + return {"error": "pipeline run not found", "run_id": run_id} + return detail diff --git a/docker-compose.yml b/docker-compose.yml index 4d02e7d..b91505a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,9 +35,9 @@ services: env_file: - .env environment: - # 默认 dry-run,确保第一次 docker compose up 不会直接写库/推送。 - # 验证无误后改成 0。 - ALPHAX_SCHEDULER_DRY_RUN: "1" + # 本地 Docker 副本需要真实跑链路,方便验证筛选/确认/跟踪/复盘结果。 + # 调度器仍然单进程串行执行,避免 SQLite 写锁。 + ALPHAX_SCHEDULER_DRY_RUN: "0" ALPHAX_DB_PATH: "/app/data/altcoin_monitor.db" command: ["scheduler"] volumes: diff --git a/rules.yaml b/rules.yaml index a90d80a..b4629fb 100644 --- a/rules.yaml +++ b/rules.yaml @@ -405,11 +405,11 @@ event_driven: note: Solana meme主题扩散 meta: version: 1 - last_review: '2026-05-14T01:10:42.599449' - last_reverse_analysis: '2026-05-14T01:11:19.360232' - total_reviews: 20 + last_review: '2026-05-14T09:19:05.923167' + last_reverse_analysis: '2026-05-14T09:19:39.019005' + total_reviews: 26 total_rules_learned: 37 - iteration_count: 25 + iteration_count: 31 strategy_version: v1.7.11 strategy_revision_started_at: '2026-05-09T01:20:00' strategy_revision_note: 'v1.7.11: 触发时效治理,旧形态只作背景,消息触发显式标记' diff --git a/static/admin.html b/static/admin.html index dd0610e..b66100c 100644 --- a/static/admin.html +++ b/static/admin.html @@ -3,12 +3,13 @@ {% block nav_links %} 看板 -关注 -策略 -迭代 舆情 订阅 推荐 + + + + {% endblock %} diff --git a/static/app.html b/static/app.html index de046ff..6cc18a4 100644 --- a/static/app.html +++ b/static/app.html @@ -139,6 +139,21 @@ .h-pnl-row { display: flex; align-items: center; gap: 8px; padding: 4px 18px 8px; } .h-arrow { color: var(--stone); font-size: 12px; } .h-duration { font-size: 11px; margin-left: auto; } +.decision-strip { display: grid; grid-template-columns: minmax(92px, auto) minmax(0, 1fr); align-items: center; gap: 10px; margin: 0 18px 10px; padding: 9px 10px; min-height: 48px; border: 1px solid var(--hairline-soft); border-radius: var(--radius-lg); background: var(--surface); min-width: 0; } +.decision-head { display: flex; flex-direction: column; gap: 3px; min-width: 0; } +.decision-label { color: var(--stone); font-size: 10px; font-weight: 900; line-height: 1.1; white-space: nowrap; } +.decision-title { font-size: 13px; font-weight: 900; line-height: 1.2; white-space: nowrap; } +.decision-body { min-width: 0; display: flex; flex-direction: column; gap: 3px; } +.decision-focus { color: var(--ink); font-size: 13px; font-weight: 900; line-height: 1.2; font-family: ui-monospace, SFMono-Regular, Menlo, monospace; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } +.decision-reason { color: var(--stone); font-size: 11px; font-weight: 700; line-height: 1.25; min-width: 0; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; } +.decision-strip.buy { background: var(--green-light); border-color: rgba(0,180,115,.18); } +.decision-strip.buy .decision-title { color: var(--green); } +.decision-strip.wait { background: var(--yellow-light); border-color: rgba(252,185,0,.24); } +.decision-strip.wait .decision-title { color: var(--yellow-dark); } +.decision-strip.observe, +.decision-strip.weak { background: rgba(66,98,255,.04); border-color: rgba(66,98,255,.12); } +.decision-strip.observe .decision-title { color: var(--blue); } +.decision-strip.weak .decision-title { color: var(--muted); } /* ===== K-LINE ===== */ .kline-wrap { padding: 0 8px 4px; } @@ -156,38 +171,9 @@ .ep-val { font-weight: 900; font-family: ui-monospace, SFMono-Regular, Menlo, monospace; font-size: 13px; line-height: 1.25; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } .ep-val.entry-ref { color: var(--yellow-dark); } .ep-val.risk-line { color: var(--red); } .ep-val.space-ref { color: var(--blue); } .ep-val.phase-ref { color: var(--green); } .ep-sub { color: var(--muted); font-size: 10px; font-weight: 600; line-height: 1.2; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } -.trigger-cause { margin: 0 18px 8px; padding: 8px 10px; border: 1px solid rgba(66,98,255,.12); border-radius: var(--radius-lg); background: rgba(66,98,255,.04); display: flex; align-items: center; gap: 8px; min-width: 0; } -.trigger-cause .tc-label { flex-shrink: 0; color: var(--blue); font-size: 10px; font-weight: 900; line-height: 1.2; } -.trigger-cause .tc-value { color: var(--slate); font-size: 12px; font-weight: 700; line-height: 1.35; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; } -.signal-context { display: flex; flex-direction: column; gap: 6px; padding: 0 18px 8px; } -.signal-context .trigger-cause, -.signal-context .trigger-meta { margin: 0; } -.trigger-meta { padding: 8px 10px; border-radius: var(--radius-lg); border: 1px solid var(--hairline-soft); background: var(--surface); font-size: 12px; color: var(--stone); display: flex; flex-direction: column; gap: 3px; min-width: 0; } -.trigger-meta span { font-size: 10px; font-weight: 900; color: var(--stone); line-height: 1.2; } -.trigger-meta small { font-size: 11px; color: var(--stone); line-height: 1.35; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; } -.trigger-meta.current { border-color: rgba(0,180,115,.18); background: rgba(0,180,115,.045); } -.trigger-meta.current span { color: var(--green); } -.trigger-meta.event { border-color: rgba(66,98,255,.16); background: rgba(66,98,255,.04); } -.trigger-meta.event span { color: var(--blue); } -.trigger-meta.stale { border-color: var(--hairline-soft); background: var(--surface); } -.trigger-meta.stale span { color: var(--muted); } -.trust-row { display: grid; grid-template-columns: 1fr 1fr; gap: 8px; padding: 0 18px 10px; } -.trust-pill { border: 1px solid var(--hairline-soft); border-radius: var(--radius-lg); background: var(--surface); padding: 8px 10px; min-width: 0; } -.trust-pill .trust-label { display: block; font-size: 10px; color: var(--stone); font-weight: 700; text-transform: uppercase; margin-bottom: 3px; } -.trust-pill .trust-value { display: block; font-size: 13px; color: var(--ink); font-weight: 800; line-height: 1.25; } -.trust-pill .trust-sub { display: block; font-size: 10px; color: var(--stone); margin-top: 2px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis; } -.trust-pill.window-active { background: var(--green-light); border-color: rgba(0,180,115,.18); } -.trust-pill.window-active .trust-value { color: var(--green); } -.trust-pill.window-warn { background: var(--yellow-light); border-color: rgba(252,185,0,.24); } -.trust-pill.window-warn .trust-value { color: var(--yellow-dark); } -.trust-pill.window-danger { background: var(--red-light); border-color: rgba(229,62,62,.20); } -.trust-pill.window-danger .trust-value { color: var(--red); } -.trust-pill.risk { background: rgba(66,98,255,.06); border-color: rgba(66,98,255,.12); } -.trust-pill.risk .trust-value { color: var(--blue); } - /* ===== SIGNALS ===== */ -.signals-row { display: flex; flex-wrap: wrap; gap: 4px; padding: 0 18px 8px; } -.sig { font-size: 11px; padding: 3px 8px; border-radius: var(--radius-full); font-weight: 700; white-space: nowrap; line-height: 1.3; } +.signals-row { display: flex; flex-wrap: nowrap; gap: 4px; padding: 0 18px 8px; min-height: 25px; overflow: hidden; } +.sig { font-size: 11px; padding: 3px 8px; border-radius: var(--radius-full); font-weight: 700; white-space: nowrap; line-height: 1.3; overflow: hidden; text-overflow: ellipsis; max-width: 50%; flex: 0 1 auto; } .sig.strong { color: #600000; background: #ffc6c6; } .sig.forward { color: var(--green); background: var(--green-light); } .sig.pa { color: var(--blue); background: rgba(66,98,255,.06); } @@ -241,12 +227,7 @@ .stats-strip { align-items: stretch; } .stats-main { width: 100%; } .entry-plan { grid-template-columns: repeat(2, minmax(0, 1fr)); padding: 8px 14px; } - .trigger-cause { margin: 0 14px 8px; align-items: flex-start; } - .trigger-cause .tc-value { white-space: normal; display: -webkit-box; -webkit-line-clamp: 2; -webkit-box-orient: vertical; } - .signal-context { padding: 0 14px 8px; } - .signal-context .trigger-cause { margin: 0; } - .trigger-meta small { white-space: normal; display: -webkit-box; -webkit-line-clamp: 2; -webkit-box-orient: vertical; } - .trust-row { grid-template-columns: 1fr; padding: 0 14px 8px; } + .decision-strip { margin: 0 14px 8px; grid-template-columns: 86px minmax(0,1fr); } } @media(max-width:360px) { @@ -648,7 +629,8 @@ function renderRecCard(r) { function opportunityPhase(r, triggerText, sigText) { var text = cleanDisplayText([r.execution_label, r.execution_reason, triggerText, sigText].join(' ')); if (r.execution_status === 'buy_now') return {label:'入场窗口', cls:'buy', short:'窗口'}; - if (/回踩|pullback/i.test(text)) return {label:'等回踩', cls:'wait', short:'回踩'}; + if (r.execution_status === 'wait_pullback' || r.lifecycle_state === 'waiting_entry') return {label:'等回踩', cls:'wait', short:'回踩'}; + if (r.execution_status === 'observe' || r.display_bucket === 'watch_pool') return (r.observe_tier === 'weak') ? {label:'弱观察', cls:'weak', short:'弱观察'} : {label:'观察中', cls:'obs', short:'观察'}; if (/突破|breakout|上破|放量突破|突破确认/i.test(text)) return {label:'等突破', cls:'wait', short:'突破'}; if (/确认|静K|收线|站稳|量能|放量|confirm/i.test(text)) return {label:'等确认', cls:'obs', short:'确认'}; return (r.observe_tier === 'weak') ? {label:'弱观察', cls:'weak', short:'弱观察'} : {label:'观察中', cls:'obs', short:'观察'}; @@ -700,14 +682,22 @@ function renderRecCard(r) { function fmtP(p) { return fmtPrice(p, priceDecimals(price || p)); } var pnl = r.pnl_pct||0, pnlCls = pnl>0?'pos':pnl<0?'neg':'zero', pnlSign = pnl>0?'+':''; var priceFmt = fmtPrice(price); - var sigHtml = sigs.slice(0,3).map(function(s){ + function displaySignalText(s) { + var text = cleanDisplayText(s); + if (!isBuy) { + text = text + .replace(/15min\s*入场窗口信号/g, '15min触发信号') + .replace(/入场窗口信号/g, '触发信号') + .replace(/入场窗口确认/g, '触发确认'); + } + return text; + } + var sigHtml = sigs.slice(0,2).map(function(s){ var cls = 'info'; if(/量价齐飞|起爆点|放量/.test(s)) cls='strong'; else if(/静K|筑底|回踩|突破|蓄力|底部抬高|压缩/.test(s)) cls='forward'; else if(/动K|PA|转折/.test(s)) cls='pa'; else if(/衰减|空头|风险|背离|闸门/.test(s)) cls='warn'; - return ''+cleanDisplayText(s)+''; + return ''+displaySignalText(s)+''; }).join(''); - var entryMethod = ep.entry_method || ''; - var triggerCause = normalizeTriggerCause(entryMethod || (isBuy?'15min 触发 · 窗口有效':phase.label+' · 等待条件满足')); var score = r.rec_score||0, st = scoreTier(score), ver = r.strategy_version||''; var hasQualityGate = ep.entry_quality_gate && Array.isArray(ep.entry_quality_gate.reasons) && ep.entry_quality_gate.reasons.length; var entryLabel = isWait ? '回踩参考' : (hasQualityGate ? '失效参考' : '参考价位'); @@ -723,18 +713,21 @@ function renderRecCard(r) { var riskLine = ep.stop_loss || r.stop_loss || 0; var spaceRef = ep.tp1 || r.tp1 || 0; var upsidePct = entryRef && spaceRef ? ((spaceRef / entryRef - 1) * 100) : 0; - function trustWindowHtml() { + function entryWindowSummary() { var w = r.entry_window || {}; if (!isBuy || !w.status) return ''; - var cls = w.status === 'active' ? 'window-active' : (w.status === 'price_left_up' ? 'window-warn' : 'window-danger'); var mins = Number(w.remaining_minutes || 0); var remain = mins >= 60 ? (Math.floor(mins/60)+'h'+Math.round(mins%60)+'m') : (Math.max(0, Math.round(mins))+'m'); var dev = Number(w.deviation_pct || 0); var devText = (dev>0?'+':'') + dev.toFixed(2) + '%'; - return '
窗口有效期'+cleanDisplayText(w.label||'入场窗口')+'剩余 '+remain+' · 偏离 '+devText+'
'; + return '剩余 '+remain+' · 偏离 '+devText; } - var trustHtml = trustWindowHtml(); var weakNoteHtml = isWeakObserve ? '
'+cleanDisplayText(r.observe_reason || '信号强度不足,仅保留为低优先级观察,不构成实时机会。')+'
' : ''; + var decisionCls = isBuy ? 'buy' : (isWait ? 'wait' : (isWeakObserve ? 'weak' : 'observe')); + var decisionTitle = cleanDisplayText(r.execution_label || phase.label); + var decisionFocus = isBuy ? ('现价 '+fmtP(price)) : (isWait ? ('等 '+fmtP(entryRef)) : (isWeakObserve ? '低优先级观察' : '等待确认')); + var decisionReason = cleanDisplayText(isBuy ? (entryWindowSummary() || '入场窗口有效') : (isWait ? '现价不追,等回踩价附近再评估' : (r.observe_reason || r.state_reason || '未形成入场窗口'))); + var decisionHtml = '
最终建议'+decisionTitle+'
'+decisionFocus+''+decisionReason+'
'; var entryPlanHtml = ''; if (isTradePlan) { entryPlanHtml = '
' + @@ -751,26 +744,11 @@ function renderRecCard(r) { '
绩效口径不计入未成交易推荐
'+ '
'; } - var triggerCauseHtml = triggerCause ? '
'+(hasQualityGate?'观察原因':'触发依据')+''+(hasQualityGate ? cleanDisplayText(r.observe_reason || triggerCause).slice(0,96) : triggerCause.slice(0,80))+'
' : ''; - var triggerCtx = (r.market_context && r.market_context.trigger_context) || (r.sector_context && r.sector_context.trigger_context) || ep.trigger_context || {}; - var curTriggers = Array.isArray(triggerCtx.current_triggers) ? triggerCtx.current_triggers : []; - var staleTriggers = Array.isArray(triggerCtx.stale_background) ? triggerCtx.stale_background : []; - var triggerBadgeHtml = ''; - if (triggerCtx.trigger_status || curTriggers.length || staleTriggers.length) { - var tCls = /news/.test(triggerCtx.trigger_status || '') ? 'event' : (/stale/.test(triggerCtx.trigger_status || '') ? 'stale' : 'current'); - var tLabel = triggerCtx.trigger_label || (curTriggers.length ? '当前触发' : '历史背景'); - if (tCls === 'stale') tLabel = '历史背景'; - var firstCur = curTriggers[0] || {}; - var sub = firstCur.title || firstCur.label || (staleTriggers[0] && staleTriggers[0].label) || ''; - triggerBadgeHtml = '
'+cleanDisplayText(tLabel).slice(0,32)+''+(sub?''+cleanDisplayText(sub).slice(0,72)+'':'')+'
'; - } - var contextHtml = (triggerCauseHtml || triggerBadgeHtml) ? '
'+triggerCauseHtml+triggerBadgeHtml+'
' : ''; return '
'+base.slice(0,2).toUpperCase()+'
'+base+'
'+actionBadge+''+score+''+st.label+'
'+ '
$'+priceFmt+''+changeHtml+'
'+ + decisionHtml+ '
'+ (isWeakObserve ? weakNoteHtml : entryPlanHtml)+ - (!isWeakObserve && trustHtml?'
'+trustHtml+'
':'')+ - contextHtml+ (sigHtml?'
'+sigHtml+'
':'')+ '
'; } diff --git a/static/base.html b/static/base.html index 5651ba6..db59c67 100644 --- a/static/base.html +++ b/static/base.html @@ -65,6 +65,7 @@ a { color: inherit; text-decoration: none; } .sidebar-link.active { color: var(--on-primary); background: var(--primary); font-weight: 600; } .sidebar-link .link-icon { width: 18px; height: 18px; flex-shrink: 0; opacity: .6; } .sidebar-link.active .link-icon { opacity: 1; } +.sidebar-section-label { padding: 12px 14px 5px; color: var(--muted); font-size: 10px; font-weight: 900; letter-spacing: .08em; } .sidebar-user { padding: 14px 16px calc(14px + var(--safe-bottom)); border-top: 1px solid var(--hairline-soft); @@ -151,6 +152,7 @@ a { color: inherit; text-decoration: none; } + @@ -169,12 +171,13 @@ a { color: inherit; text-decoration: none; } diff --git a/static/iteration.html b/static/iteration.html index 4396845..9f6e7c1 100644 --- a/static/iteration.html +++ b/static/iteration.html @@ -2,12 +2,13 @@ {% block title %}AlphaX — 策略进化{% endblock %} {% block nav_links %} 看板 -关注 -策略 -迭代 舆情 订阅 推荐 + + + + {% endblock %} diff --git a/static/pipeline.html b/static/pipeline.html new file mode 100644 index 0000000..5498a1e --- /dev/null +++ b/static/pipeline.html @@ -0,0 +1,62 @@ +{% extends "base.html" %} +{% block title %}AlphaX — 链路日志{% endblock %} +{% block nav_links %} +看板 +舆情 +订阅 +推荐 + + + + + +{% endblock %} +{% block extra_head_css %} + +{% endblock %} +{% block content %} +
+
+

链路日志

按粗筛批次还原事件、粗筛、确认、推荐、跟踪与复盘结果。

+
+ + +
+
+
加载中...
+
+
+
批次
--
+
加载批次...
+
+
+
批次详情
--
+
请选择一个批次
+
+
+
+{% endblock %} +{% block extra_script %} + +{% endblock %} diff --git a/static/referral.html b/static/referral.html index ed14c69..a8aa56c 100644 --- a/static/referral.html +++ b/static/referral.html @@ -3,12 +3,13 @@ {% block nav_links %} 看板 -关注 -策略 -迭代 舆情 订阅 推荐 + + + + {% endblock %} diff --git a/static/sentiment.html b/static/sentiment.html index 27ceaf5..035f886 100644 --- a/static/sentiment.html +++ b/static/sentiment.html @@ -2,12 +2,13 @@ {% block title %}AlphaX — 舆情雷达{% endblock %} {% block nav_links %} 看板 -关注 -策略 -迭代 舆情 订阅 推荐 + + + + {% endblock %} {% block extra_head_css %} @@ -283,4 +284,4 @@ loadFeed(); // Auto-refresh every 5 minutes setInterval(loadFeed, 300000); -{% endblock %} \ No newline at end of file +{% endblock %} diff --git a/static/strategy.html b/static/strategy.html index ff32ad5..922b08d 100644 --- a/static/strategy.html +++ b/static/strategy.html @@ -2,12 +2,13 @@ {% block title %}策略 — AlphaX{% endblock %} {% block nav_links %} 看板 -关注 -策略 -迭代 舆情 订阅 推荐 + + + + {% endblock %} {% block extra_head_css %} diff --git a/static/subscription.html b/static/subscription.html index 1b174f4..66057ca 100644 --- a/static/subscription.html +++ b/static/subscription.html @@ -2,12 +2,13 @@ {% block title %}订阅中心 — AlphaX{% endblock %} {% block nav_links %} 看板 -关注 -策略 -迭代 舆情 订阅 推荐 + + + + {% endblock %} {% block extra_head_css %} @@ -253,4 +254,4 @@ async function claimFreeTrial() { loadUser(); loadMe(); -{% endblock %} \ No newline at end of file +{% endblock %} diff --git a/static/watchlist.html b/static/watchlist.html index 51e0277..2e1bdff 100644 --- a/static/watchlist.html +++ b/static/watchlist.html @@ -2,12 +2,13 @@ {% block title %}关注 — AlphaX{% endblock %} {% block nav_links %} 看板 -关注 -策略 -迭代 舆情 订阅 推荐 + + + + {% endblock %} {% block extra_head_css %} diff --git a/tests/test_opportunity_lifecycle.py b/tests/test_opportunity_lifecycle.py index 2b5ba1d..53421ec 100644 --- a/tests/test_opportunity_lifecycle.py +++ b/tests/test_opportunity_lifecycle.py @@ -5,6 +5,7 @@ import sys sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) from app.core.opportunity_lifecycle import apply_entry_quality_gate +from app.services.price_tracker import reconcile_buy_signals_after_gate from legacy import price_tracker_ws @@ -28,6 +29,46 @@ def test_risk_reward_false_blocks_buy_now(): assert any('risk_reward_ok=false' in r for r in reasons) +def test_buy_now_with_bad_rr_sets_real_pullback_price(): + action, plan, reasons = apply_entry_quality_gate( + action_status='可即刻买入', + entry_plan={ + 'entry_action': '即刻买入', + 'entry_price': 0.11455, + 'current_price': 0.11455, + 'stop_loss': 0.107457, + 'tp1': 0.120089, + 'risk_reward_ok': False, + 'rr1': 0.83, + }, + signals=['🟢 15min即刻入场信号', '日线 站稳突破位+19.2%'], + current_price=0.11455, + market_context={'change_24h': 3.1}, + ) + + assert action == '等回踩' + assert plan['entry_price'] < 0.11455 + assert round(plan['entry_price'], 6) == 0.113199 + assert plan['rr_target_entry'] == plan['entry_price'] + assert any('现价不买' in r for r in reasons) + + +def test_tracker_gate_downgrade_removes_provisional_buy_signal(): + signals = reconcile_buy_signals_after_gate( + [ + '🟢 回踩确认完毕!可即刻入场(15min动K确认)', + '其他背景信号', + ], + '等回踩', + {'rr_target_entry': 0.11322245, 'entry_price': 0.11322245}, + ['rr1=0.82 < 1.2,禁止现价买入', '现价不买,等回落到0.11322245附近再评估'], + ) + + assert all('可即刻入场' not in signal for signal in signals) + assert all('回踩确认完毕' not in signal for signal in signals) + assert any('现价不买' in signal and '$0.1132' in signal for signal in signals) + + def test_breakout_distance_over_60_forces_observe(): action, plan, reasons = apply_entry_quality_gate( action_status='可即刻买入', diff --git a/tests/test_pipeline_runs_api.py b/tests/test_pipeline_runs_api.py new file mode 100644 index 0000000..a998c55 --- /dev/null +++ b/tests/test_pipeline_runs_api.py @@ -0,0 +1,276 @@ +import os +import sqlite3 +import sys +from datetime import datetime, timedelta + +import pytest +from fastapi.testclient import TestClient + +PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if PROJECT_DIR not in sys.path: + sys.path.insert(0, PROJECT_DIR) + +from app.db import altcoin_db +from app.db.analytics import get_pipeline_run_detail, get_pipeline_runs +from app.web import web_server + + +@pytest.fixture +def temp_db(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db) + altcoin_db.init_db() + return db_path + + +def _insert_screening(db_path, scan_time, layer, symbol, state="蓄力", score=6): + conn = sqlite3.connect(db_path) + conn.execute( + """ + INSERT INTO screening_log ( + scan_time, layer, symbol, state, score, price, signals, + sector, leader_status, is_meme, change_24h, funding_rate, detail_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + scan_time, + layer, + symbol, + state, + score, + 1.23, + '["vp_fly_1h_current"]', + "AI", + "leader", + 0, + 8.8, + 0.01, + '{"reason":"volume current"}', + ), + ) + conn.commit() + conn.close() + + +def _insert_coin_state(db_path, symbol, state, score, detected_at): + conn = sqlite3.connect(db_path) + conn.execute( + """ + INSERT INTO coin_state ( + symbol, state, score, anomaly_type, sector, leader_status, + detected_at, last_alert_time, last_alert_level, detail_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + (symbol, state, score, "", "", "", detected_at, detected_at, "low", "{}"), + ) + conn.commit() + conn.close() + + +def _insert_recommendation(db_path, rec_time, symbol="AAA/USDT", status="hit_tp1"): + conn = sqlite3.connect(db_path) + cur = conn.execute( + """ + INSERT INTO recommendation ( + symbol, rec_time, rec_state, rec_score, entry_price, stop_loss, tp1, tp2, + sector, signals, status, current_price, max_price, min_price, pnl_pct, + max_pnl_pct, max_drawdown_pct, entry_plan_json, action_status, + execution_status, display_bucket, lifecycle_state, entry_triggered, + signal_codes_json, signal_labels_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + symbol, + rec_time, + "爆发", + 82, + 1.0, + 0.94, + 1.08, + 1.16, + "AI", + '["1H当前放量"]', + status, + 1.1, + 1.12, + 0.98, + 10, + 12, + -2, + '{"entry_action":"可即刻买入"}', + "可即刻买入", + "buy_now", + "actionable", + "actionable", + 1, + '["vp_fly_1h_current"]', + '["1H当前放量"]', + ), + ) + conn.commit() + conn.close() + return cur.lastrowid + + +def _insert_review(db_path, rec_id, review_time, outcome="爆发"): + conn = sqlite3.connect(db_path) + conn.execute( + """ + INSERT INTO review_log ( + rec_id, symbol, review_time, outcome, pnl_48h, max_pnl_48h, + triggered_signals, hit_signals, miss_signals, lesson + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + rec_id, + "AAA/USDT", + review_time, + outcome, + 6.5, + 12.0, + '["vp_fly_1h_current"]', + '["vp_fly_1h_current"]', + "[]", + "当前放量有效", + ), + ) + conn.commit() + conn.close() + + +def _insert_missed(db_path, detect_time): + conn = sqlite3.connect(db_path) + conn.execute( + """ + INSERT INTO missed_explosions ( + symbol, detect_time, price_at_detect, price_before, + gain_pct, reason_missed, features_detected, lesson + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ("MISS/USDT", detect_time, 2.4, 1.9, 26.3, "确认没过", '{"volume":"high"}', "提高确认层覆盖"), + ) + conn.commit() + conn.close() + + +def test_pipeline_runs_aggregates_funnel_and_performance(temp_db): + base = datetime.now() - timedelta(minutes=40) + started = base.isoformat(timespec="seconds") + finished = (base + timedelta(seconds=20)).isoformat(timespec="seconds") + + altcoin_db.log_cron_run( + "事件舆情", + "event_driven_screener.py", + "success", + "processed", + started_at=(base - timedelta(minutes=3)).isoformat(timespec="seconds"), + finished_at=(base - timedelta(minutes=2)).isoformat(timespec="seconds"), + summary={"processed_count": 5}, + ) + altcoin_db.log_cron_run( + "粗筛", + "altcoin_screener.py", + "success", + "screened", + started_at=started, + finished_at=finished, + duration_ms=20000, + summary={"total_candidates": 3, "total_qualified": 2, "alert_count": 2}, + ) + altcoin_db.log_cron_run( + "确认", + "altcoin_confirm.py", + "success", + "confirmed", + started_at=(base + timedelta(minutes=5)).isoformat(timespec="seconds"), + finished_at=(base + timedelta(minutes=6)).isoformat(timespec="seconds"), + summary={"processed_count": 2, "confirmed_count": 1, "unconfirmed_count": 1}, + ) + _insert_screening(temp_db, (base + timedelta(seconds=5)).isoformat(timespec="seconds"), "粗筛", "AAA/USDT") + _insert_screening(temp_db, (base + timedelta(seconds=6)).isoformat(timespec="seconds"), "细筛", "AAA/USDT") + rec_id = _insert_recommendation(temp_db, (base + timedelta(minutes=7)).isoformat(timespec="seconds")) + _insert_review(temp_db, rec_id, (base + timedelta(minutes=8)).isoformat(timespec="seconds"), outcome="爆发") + _insert_missed(temp_db, (base + timedelta(minutes=9)).isoformat(timespec="seconds")) + + data = get_pipeline_runs(limit=10, hours=24) + assert data["kpi"]["run_count"] == 1 + assert data["kpi"]["rough_candidates"] == 3 + assert data["kpi"]["fine_qualified"] == 2 + assert data["kpi"]["confirm_hits"] == 1 + assert data["kpi"]["recommendations"] == 1 + assert data["kpi"]["perf_success"] == 1 + assert data["kpi"]["missed_count"] == 1 + + run = data["runs"][0] + detail = get_pipeline_run_detail(run["run_id"]) + assert detail["stage_counts"]["observation"] == 1 + assert detail["stage_counts"]["fine"] == 1 + assert detail["stage_counts"]["recommendation"] == 1 + assert detail["recommendations"][0]["performance_status"] == "success" + assert detail["missed_explosions"][0]["symbol"] == "MISS/USDT" + + +def test_pipeline_api_keeps_observation_batch_without_recommendations(temp_db): + base = datetime.now() - timedelta(minutes=20) + altcoin_db.log_cron_run( + "粗筛", + "altcoin_screener.py", + "success", + "screened", + started_at=base.isoformat(timespec="seconds"), + finished_at=(base + timedelta(seconds=10)).isoformat(timespec="seconds"), + summary={"total_candidates": 1, "total_qualified": 0}, + ) + _insert_screening(temp_db, (base + timedelta(seconds=2)).isoformat(timespec="seconds"), "粗筛", "OBS/USDT", score=4) + + client = TestClient(web_server.app) + resp = client.get("/api/pipeline/runs?hours=24") + assert resp.status_code == 200 + data = resp.json() + assert data["runs"][0]["rough_candidates"] == 1 + assert data["runs"][0]["recommendations"] == 0 + + detail = client.get(f"/api/pipeline/runs/{data['runs'][0]['run_id']}").json() + assert detail["screening_items"][0]["symbol"] == "OBS/USDT" + assert detail["screening_items"][0]["stage_label"] == "观察候选" + + +def test_pipeline_page_nav_hides_watchlist_entry_and_watchlist_route_survives(temp_db): + client = TestClient(web_server.app) + + pipeline_resp = client.get("/pipeline") + assert pipeline_resp.status_code == 200 + html = pipeline_resp.text + assert "链路日志" in html + assert 'href="/pipeline"' in html + assert 'href="/watchlist"' not in html + + watch_resp = client.get("/watchlist") + assert watch_resp.status_code == 200 + + +def test_user_nav_keeps_research_pages_in_admin_section(temp_db): + client = TestClient(web_server.app) + resp = client.get("/app") + assert resp.status_code == 200 + html = resp.text + + assert 'href="/watchlist"' not in html + assert 'href="/pipeline" style="display:none"' in html + assert 'href="/strategy" style="display:none"' in html + assert 'href="/iteration" style="display:none"' in html + assert "研发" in html + + +def test_confirm_candidates_prefer_recent_fine_screened_state(temp_db): + from app.db.altcoin_db import get_candidates_for_confirm + + old_time = (datetime.now() - timedelta(hours=7)).isoformat(timespec="seconds") + recent_time = (datetime.now() - timedelta(minutes=5)).isoformat(timespec="seconds") + _insert_coin_state(temp_db, "CHIP/USDT", "蓄力", 5, old_time) + _insert_coin_state(temp_db, "DOGE/USDT", "蓄力", 3, recent_time) + + symbols = [item["symbol"] for item in get_candidates_for_confirm()] + + assert symbols == ["DOGE/USDT"] diff --git a/tests/test_screener_optimizations.py b/tests/test_screener_optimizations.py index a781a83..4b02502 100644 --- a/tests/test_screener_optimizations.py +++ b/tests/test_screener_optimizations.py @@ -237,3 +237,45 @@ def test_strong_static_accumulation_can_promote_to_accelerate(monkeypatch): assert any("强静K蓄力直升加速" in s for s in qualified["PNT/USDT"]["signals"]) assert qualified["PNT/USDT"]["candidate_stage"] == "confirm_pending" assert "rec_id" not in qualified["PNT/USDT"] + + +def test_layer1_logs_coarse_candidate_details(monkeypatch): + logged = [] + h4_df = pd.DataFrame({ + "open": [1.0] * 24, + "high": [1.01] * 24, + "low": [0.99] * 24, + "close": [1.0] * 24, + "volume": [1000] * 24, + }) + + monkeypatch.setattr(altcoin_screener, "fetch_all_tickers", lambda: { + "DOGE/USDT": {"volume_24h": 20_000_000, "change_24h": 3.5, "price": 0.1}, + }) + monkeypatch.setattr(altcoin_screener, "fetch_funding_rates", lambda: {}) + monkeypatch.setattr(altcoin_screener, "get_dynamic_weights", _mock_weights) + monkeypatch.setattr(altcoin_screener, "is_meme_coin", lambda symbol: False) + monkeypatch.setattr(altcoin_screener, "get_burst_threshold", lambda symbol: 20) + monkeypatch.setattr(altcoin_screener, "fetch_klines", lambda symbol, timeframe, limit=200: h4_df if timeframe == "4h" else None) + monkeypatch.setattr( + altcoin_screener, + "detect_static_accumulation", + lambda symbol, df: {"static_count": 6, "vol_ratio": 1.5}, + ) + monkeypatch.setattr( + altcoin_screener, + "get_screener_section", + lambda name=None: { + "static_accumulation_bypass": {"min_volume_24h": 1_000_000, "min_vol_ratio": 1.2}, + "higher_lows": {"enabled": False}, + "compression_surge": {"enabled": False}, + "sentiment": {"enabled": False}, + }.get(name, {}), + ) + monkeypatch.setattr(altcoin_screener.exchange, "fapiPublicGetTicker24hr", lambda: []) + monkeypatch.setattr(altcoin_screener, "log_screening", lambda **kwargs: logged.append(kwargs)) + + candidates = altcoin_screener.layer1_coarse_filter() + + assert "DOGE/USDT" in candidates + assert any(item["layer"] == "粗筛" and item["symbol"] == "DOGE/USDT" for item in logged)