"""Tests for the pipeline-run analytics helpers and the related web pages/API.

Every test runs against a throw-away SQLite database created by the
``temp_db`` fixture; rows are seeded through the ``_insert_*`` helpers below.
"""

import os
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timedelta

import pytest
from fastapi.testclient import TestClient

# Make the project root importable before pulling in the ``app`` package.
PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if PROJECT_DIR not in sys.path:
    sys.path.insert(0, PROJECT_DIR)

from app.db import altcoin_db
from app.db.analytics import get_pipeline_run_detail, get_pipeline_runs
from app.web import web_server


@pytest.fixture
def temp_db(monkeypatch, tmp_path):
    """Create an initialised temporary database and return its path.

    ``altcoin_db.DB_PATH`` is redirected to a file under ``tmp_path`` and
    ``web_server.init_db`` is replaced with ``altcoin_db.init_db`` before the
    schema is created with ``altcoin_db.init_db()``.
    """
    db_path = tmp_path / "altcoin_monitor.db"
    monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path))
    monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db)
    altcoin_db.init_db()
    return db_path


def _iso(moment):
    """Return *moment* as an ISO-8601 string truncated to whole seconds."""
    return moment.isoformat(timespec="seconds")


def _run_insert(db_path, sql, params):
    """Execute one parameterised INSERT, commit it and return the new row id.

    The connection is wrapped in ``closing`` so it is released even when the
    statement raises, and ``lastrowid`` is read while the connection is still
    open.
    """
    with closing(sqlite3.connect(db_path)) as conn:
        cursor = conn.execute(sql, params)
        conn.commit()
        return cursor.lastrowid


def _insert_screening(db_path, scan_time, layer, symbol, state="蓄力", score=6):
    """Insert one ``screening_log`` row; unlisted columns use fixed sample values."""
    _run_insert(
        db_path,
        """
        INSERT INTO screening_log (
            scan_time, layer, symbol, state, score, price, signals, sector,
            leader_status, is_meme, change_24h, funding_rate, detail_json
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            scan_time,
            layer,
            symbol,
            state,
            score,
            1.23,
            '["vp_fly_1h_current"]',
            "AI",
            "leader",
            0,
            8.8,
            0.01,
            '{"reason":"volume current"}',
        ),
    )


def _insert_coin_state(db_path, symbol, state, score, detected_at):
    """Insert one ``coin_state`` row.

    ``detected_at`` is also written to ``last_alert_time``.
    """
    _run_insert(
        db_path,
        """
        INSERT INTO coin_state (
            symbol, state, score, anomaly_type, sector, leader_status,
            detected_at, last_alert_time, last_alert_level, detail_json
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (symbol, state, score, "", "", "", detected_at, detected_at, "low", "{}"),
    )


def _insert_recommendation(db_path, rec_time, symbol="AAA/USDT", status="hit_tp1"):
    """Insert one ``recommendation`` row and return its row id."""
    return _run_insert(
        db_path,
        """
        INSERT INTO recommendation (
            symbol, rec_time, rec_state, rec_score, entry_price, stop_loss,
            tp1, tp2, sector, signals, status, current_price, max_price,
            min_price, pnl_pct, max_pnl_pct, max_drawdown_pct,
            entry_plan_json, action_status, execution_status, display_bucket,
            lifecycle_state, entry_triggered, signal_codes_json,
            signal_labels_json
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            symbol,
            rec_time,
            "爆发",
            82,
            1.0,
            0.94,
            1.08,
            1.16,
            "AI",
            '["1H当前放量"]',
            status,
            1.1,
            1.12,
            0.98,
            10,
            12,
            -2,
            '{"entry_action":"可即刻买入"}',
            "可即刻买入",
            "buy_now",
            "actionable",
            "actionable",
            1,
            '["vp_fly_1h_current"]',
            '["1H当前放量"]',
        ),
    )


def _insert_review(db_path, rec_id, review_time, outcome="爆发"):
    """Insert one ``review_log`` row for ``AAA/USDT`` linked to *rec_id*."""
    _run_insert(
        db_path,
        """
        INSERT INTO review_log (
            rec_id, symbol, review_time, outcome, pnl_48h, max_pnl_48h,
            triggered_signals, hit_signals, miss_signals, lesson
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            rec_id,
            "AAA/USDT",
            review_time,
            outcome,
            6.5,
            12.0,
            '["vp_fly_1h_current"]',
            '["vp_fly_1h_current"]',
            "[]",
            "当前放量有效",
        ),
    )


def _insert_missed(db_path, detect_time, symbol="MISS/USDT", gain_pct=26.3):
    """Insert one ``missed_explosions`` row."""
    _run_insert(
        db_path,
        """
        INSERT INTO missed_explosions (
            symbol, detect_time, price_at_detect, price_before, gain_pct,
            reason_missed, features_detected, lesson
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            symbol,
            detect_time,
            2.4,
            1.9,
            gain_pct,
            "确认没过",
            '{"volume":"high"}',
            "提高确认层覆盖",
        ),
    )


def test_pipeline_runs_aggregates_funnel_and_performance(temp_db):
    """KPIs and run detail aggregate cron runs, screening, review and missed rows."""
    base = datetime.now() - timedelta(minutes=40)
    started = _iso(base)
    finished = _iso(base + timedelta(seconds=20))

    altcoin_db.log_cron_run(
        "事件舆情",
        "event_driven_screener.py",
        "success",
        "processed",
        started_at=_iso(base - timedelta(minutes=3)),
        finished_at=_iso(base - timedelta(minutes=2)),
        summary={"processed_count": 5},
    )
    altcoin_db.log_cron_run(
        "粗筛",
        "altcoin_screener.py",
        "success",
        "screened",
        started_at=started,
        finished_at=finished,
        duration_ms=20000,
        summary={"total_candidates": 3, "total_qualified": 2, "alert_count": 2},
    )
    altcoin_db.log_cron_run(
        "确认",
        "altcoin_confirm.py",
        "success",
        "confirmed",
        started_at=_iso(base + timedelta(minutes=5)),
        finished_at=_iso(base + timedelta(minutes=6)),
        summary={"processed_count": 2, "confirmed_count": 1, "unconfirmed_count": 1},
    )

    _insert_screening(temp_db, _iso(base + timedelta(seconds=5)), "粗筛", "AAA/USDT")
    _insert_screening(temp_db, _iso(base + timedelta(seconds=6)), "细筛", "AAA/USDT")
    rec_id = _insert_recommendation(temp_db, _iso(base + timedelta(minutes=7)))
    _insert_review(temp_db, rec_id, _iso(base + timedelta(minutes=8)), outcome="爆发")
    # Two missed rows for the same symbol; the assertions below expect one.
    _insert_missed(temp_db, _iso(base + timedelta(minutes=9)))
    _insert_missed(temp_db, _iso(base + timedelta(minutes=9, seconds=10)))

    data = get_pipeline_runs(limit=10, hours=24)
    kpi = data["kpi"]
    assert kpi["run_count"] == 1
    assert kpi["rough_candidates"] == 3
    assert kpi["fine_qualified"] == 2
    assert kpi["confirm_hits"] == 1
    assert kpi["recommendations"] == 1
    assert kpi["perf_success"] == 1
    assert kpi["missed_count"] == 1

    run = data["runs"][0]
    detail = get_pipeline_run_detail(run["run_id"])
    assert detail["stage_counts"]["observation"] == 1
    assert detail["stage_counts"]["fine"] == 1
    assert detail["stage_counts"]["recommendation"] == 1
    assert detail["recommendations"][0]["performance_status"] == "success"
    assert len(detail["missed_explosions"]) == 1
    assert detail["missed_explosions"][0]["symbol"] == "MISS/USDT"


def test_pipeline_runs_supports_pagination(temp_db):
    """Paging splits the run list while the KPIs stay global across pages."""
    base = datetime.now() - timedelta(minutes=90)
    for i in range(3):
        start = _iso(base + timedelta(minutes=i * 5))
        altcoin_db.log_cron_run(
            "粗筛",
            "altcoin_screener.py",
            "success",
            f"batch_{i}",
            started_at=start,
            finished_at=_iso(base + timedelta(minutes=i * 5 + 1)),
            summary={"total_candidates": i + 1, "total_qualified": i},
        )

    data_page1 = get_pipeline_runs(limit=2, hours=24, offset=0)
    data_page2 = get_pipeline_runs(limit=2, hours=24, offset=2)

    assert data_page1["pagination"]["total_count"] == 3
    assert data_page1["pagination"]["total_pages"] == 2
    assert data_page1["pagination"]["page"] == 1
    assert len(data_page1["runs"]) == 2
    assert data_page2["pagination"]["page"] == 2
    assert len(data_page2["runs"]) == 1
    assert data_page1["runs"][0]["run_id"] != data_page2["runs"][0]["run_id"]
    assert data_page1["kpi"]["run_count"] == 3
    assert data_page2["kpi"]["run_count"] == 3
    assert data_page1["kpi"]["rough_candidates"] == 6
    assert data_page2["kpi"]["rough_candidates"] == 6


def test_pipeline_api_keeps_observation_batch_without_recommendations(temp_db):
    """A screening batch with no recommendation is still listed by the API."""
    base = datetime.now() - timedelta(minutes=20)
    altcoin_db.log_cron_run(
        "粗筛",
        "altcoin_screener.py",
        "success",
        "screened",
        started_at=_iso(base),
        finished_at=_iso(base + timedelta(seconds=10)),
        summary={"total_candidates": 1, "total_qualified": 0},
    )
    _insert_screening(
        temp_db, _iso(base + timedelta(seconds=2)), "粗筛", "OBS/USDT", score=4
    )

    client = TestClient(web_server.app)
    resp = client.get("/api/pipeline/runs?hours=24")
    assert resp.status_code == 200
    data = resp.json()
    assert data["runs"][0]["rough_candidates"] == 1
    assert data["runs"][0]["recommendations"] == 0

    detail = client.get(f"/api/pipeline/runs/{data['runs'][0]['run_id']}").json()
    assert detail["screening_items"][0]["symbol"] == "OBS/USDT"
    assert detail["screening_items"][0]["stage_label"] == "观察候选"


def test_pipeline_api_returns_pagination_meta(temp_db):
    """The runs endpoint echoes limit/offset and reports the derived page."""
    base = datetime.now() - timedelta(minutes=15)
    for i in range(2):
        altcoin_db.log_cron_run(
            "粗筛",
            "altcoin_screener.py",
            "success",
            f"paged_{i}",
            started_at=_iso(base + timedelta(minutes=i)),
            finished_at=_iso(base + timedelta(minutes=i, seconds=20)),
            summary={"total_candidates": 1, "total_qualified": 1},
        )

    client = TestClient(web_server.app)
    resp = client.get("/api/pipeline/runs?hours=24&limit=1&offset=1")
    assert resp.status_code == 200
    data = resp.json()
    assert data["pagination"]["limit"] == 1
    assert data["pagination"]["offset"] == 1
    assert data["pagination"]["page"] == 2
    assert data["pagination"]["total_count"] >= 2
    assert len(data["runs"]) == 1


def test_pipeline_api_reports_new_funnel_stages(temp_db):
    """Universe-gate and quality-reject rows are counted and staged in the detail."""
    base = datetime.now() - timedelta(minutes=40)
    started = _iso(base)
    finished = _iso(base + timedelta(seconds=25))
    altcoin_db.log_cron_run(
        "粗筛",
        "altcoin_screener.py",
        "success",
        "screened",
        started_at=started,
        finished_at=finished,
        summary={"total_candidates": 2, "total_qualified": 1},
    )
    _insert_screening(
        temp_db,
        _iso(base + timedelta(seconds=2)),
        "universe_gate",
        "RISK/USDT",
        state="过期",
        score=0,
    )
    _insert_screening(
        temp_db,
        _iso(base + timedelta(seconds=8)),
        "粗筛",
        "AAA/USDT",
        state="候选",
        score=8,
    )
    _insert_screening(
        temp_db,
        _iso(base + timedelta(seconds=12)),
        "细筛",
        "AAA/USDT",
        state="过期",
        score=3,
    )

    data = get_pipeline_runs(limit=10, hours=24)
    assert data["kpi"]["universe_gate_count"] == 1
    assert data["kpi"]["quality_reject_count"] == 1

    detail = get_pipeline_run_detail(data["runs"][0]["run_id"])
    assert detail["stage_counts"]["universe_gate"] == 1
    assert detail["stage_counts"]["quality_reject"] == 1
    stages = {item["candidate_stage"] for item in detail["screening_items"]}
    assert {"universe_gate", "discovery_candidate", "rejected_candidate"} <= stages


def test_pipeline_page_nav_hides_watchlist_entry_and_watchlist_route_survives(temp_db):
    """/pipeline omits the watchlist link while /watchlist itself still responds."""
    client = TestClient(web_server.app)
    pipeline_resp = client.get("/pipeline")
    assert pipeline_resp.status_code == 200
    html = pipeline_resp.text
    assert "链路日志" in html
    assert 'href="/watchlist"' not in html

    watch_resp = client.get("/watchlist")
    assert watch_resp.status_code == 200


def test_sidebar_keeps_engineering_pages_in_admin_menu(temp_db):
    """/app shows the two centre entries and no direct engineering-page links."""
    client = TestClient(web_server.app)
    resp = client.get("/app")
    assert resp.status_code == 200
    html = resp.text
    assert "机会中心" in html
    assert "诊断中心" in html
    assert 'href="/operations"' not in html
    assert 'href="/llm-insights"' not in html
    assert 'href="/data-export"' not in html
    assert 'href="/strategy"' not in html
    assert 'href="/iteration"' not in html


def test_pipeline_page_filters_missed_rows_as_missed(temp_db):
    """The /pipeline page script contains the 'missed' filter branch and label."""
    client = TestClient(web_server.app)
    resp = client.get("/pipeline")
    assert resp.status_code == 200
    html = resp.text
    assert "if(item.kind==='missed')return 'missed'" in html
    assert "['missed','漏选'" in html


def test_opportunity_detail_page_available(temp_db):
    """/opportunity renders and references the opportunity detail API."""
    client = TestClient(web_server.app)
    resp = client.get("/opportunity?symbol=AAA%2FUSDT")
    assert resp.status_code == 200
    assert "机会详情" in resp.text
    assert "/api/opportunity/detail" in resp.text


def test_user_nav_keeps_review_center_but_hides_legacy_research_pages(temp_db):
    """/app keeps hidden logs/review-center links and drops legacy research links."""
    client = TestClient(web_server.app)
    resp = client.get("/app")
    assert resp.status_code == 200
    html = resp.text
    assert 'href="/watchlist"' not in html
    assert 'href="/logs" style="display:none"' in html
    assert 'href="/review-center" style="display:none"' in html
    assert 'href="/strategy"' not in html
    assert 'href="/iteration"' not in html


def test_confirm_candidates_prefer_recent_fine_screened_state(temp_db):
    """Only the recently detected coin is returned as a confirm candidate."""
    from app.db.altcoin_db import get_candidates_for_confirm

    old_time = _iso(datetime.now() - timedelta(hours=7))
    recent_time = _iso(datetime.now() - timedelta(minutes=5))
    _insert_coin_state(temp_db, "CHIP/USDT", "蓄力", 5, old_time)
    _insert_coin_state(temp_db, "DOGE/USDT", "蓄力", 3, recent_time)

    symbols = [item["symbol"] for item in get_candidates_for_confirm()]
    assert symbols == ["DOGE/USDT"]