import json import os import sqlite3 import sys import pytest from fastapi.testclient import TestClient PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if PROJECT_DIR not in sys.path: sys.path.insert(0, PROJECT_DIR) from app.db import altcoin_db from app.db.llm_insights import compute_input_hash, get_cached_insight, repair_mojibake_json, upsert_insight from app.services import event_driven_screener, llm_insights from app.web import web_server @pytest.fixture def temp_db(monkeypatch, tmp_path): db_path = tmp_path / "altcoin_monitor.db" monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db) altcoin_db.init_db() return db_path def _insert_recommendation(db_path, **kwargs): defaults = dict( symbol="AAA/USDT", rec_time="2026-05-01T10:00:00", rec_state="加速", rec_score=80, entry_price=100.0, stop_loss=95.0, tp1=110.0, tp2=120.0, sector="AI", signals=json.dumps(["15min 即刻入场信号"], ensure_ascii=False), signal_codes_json=json.dumps(["vp_fly_1h_current"], ensure_ascii=False), signal_labels_json=json.dumps(["15min 即刻入场信号"], ensure_ascii=False), is_meme=0, status="active", current_price=100.0, max_price=104.0, min_price=98.0, pnl_pct=0.0, max_pnl_pct=4.0, max_drawdown_pct=-1.0, hit_tp1_time="", hit_tp2_time="", stopped_out_time="", expired_time="", last_track_time="2026-05-01T10:10:00", entry_plan_json=json.dumps({"entry_price": 100.0, "entry_action": "可即刻买入", "entry_trigger_confirmed": True}, ensure_ascii=False), action_status="可即刻买入", direction="多头启动", execution_status="buy_now", display_bucket="realtime", lifecycle_state="position", entry_triggered=1, state_reason="推荐时就是可即刻买入", strategy_version="v1.0", ) defaults.update(kwargs) conn = sqlite3.connect(db_path) cols = ",".join(defaults.keys()) qs = ",".join(["?"] * len(defaults)) conn.execute(f"INSERT INTO recommendation ({cols}) VALUES ({qs})", tuple(defaults.values())) conn.commit() conn.close() def _fetch_llm_row(db_path): conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row row = conn.execute("SELECT * FROM llm_insights ORDER BY id DESC LIMIT 1").fetchone() conn.close() return dict(row) if row else None def test_disabled_llm_skips_and_does_not_change_strategy_state(monkeypatch, temp_db): _insert_recommendation(temp_db) monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: False) result = llm_insights.run(scope="recommendations", limit=10) assert result["status"] == "skipped" rows = altcoin_db.get_active_recommendations_deduped(actionable_only=False) target = next(r for r in rows if r["symbol"] == "AAA/USDT") assert target["execution_status"] == "buy_now" assert "llm_insight" not in target def test_same_input_hash_is_cached_without_repeated_call(monkeypatch, temp_db): _insert_recommendation(temp_db) monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: {"status": "success", "content": {"summary": "ok"}, "model": "m"}) calls = [] original_upsert = llm_insights.upsert_insight def wrapped_upsert(*args, **kwargs): calls.append((args, kwargs)) return original_upsert(*args, **kwargs) monkeypatch.setattr(llm_insights, "upsert_insight", wrapped_upsert) first = llm_insights.run(scope="recommendations", limit=10) second = llm_insights.run(scope="recommendations", limit=10) assert first["processed"] == 1 assert second["processed"] == 0 assert len(calls) == 1 rows = altcoin_db.get_active_recommendations_deduped(actionable_only=False) target = next(r for r in rows if r["symbol"] == "AAA/USDT") assert target["llm_insight"]["content"]["summary"] == "ok" def test_invalid_json_is_marked_failed(monkeypatch, temp_db): _insert_recommendation(temp_db, symbol="BBB/USDT", rec_time="2026-05-01T11:00:00") monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: {"status": "failed", "error": "invalid_json", "model": "m"}) llm_insights.run(scope="recommendations", limit=10) row = _fetch_llm_row(temp_db) assert row["status"] == "failed" def test_llm_empty_content_is_not_marked_success(monkeypatch): monkeypatch.setattr(llm_insights, "get_llm_params", lambda: { "enabled": True, "base_url": "https://llm.example/v1", "api_key_env": "TEST_LLM_KEY", "model": "deepseek-v4-pro", "timeout": 5, "max_tokens": 100, "modules": {}, }) monkeypatch.setenv("TEST_LLM_KEY", "test-key") class Resp: status_code = 200 text = "{}" def json(self): return {"choices": [{"message": {"content": ""}}]} monkeypatch.setattr(llm_insights.requests, "post", lambda *args, **kwargs: Resp()) result = llm_insights._call_llm_json("test_prompt", {"symbol": "AAA/USDT"}) assert result["status"] == "failed" assert "llm_empty_content" in result["error"] def test_llm_empty_content_retries_without_response_format(monkeypatch): monkeypatch.setattr(llm_insights, "get_llm_params", lambda: { "enabled": True, "base_url": "https://llm.example/v1", "api_key_env": "TEST_LLM_KEY", "model": "deepseek-v4-pro", "timeout": 5, "max_tokens": 100, "modules": {}, }) monkeypatch.setenv("TEST_LLM_KEY", "test-key") calls = [] class EmptyResp: status_code = 200 text = "{}" def json(self): return {"choices": [{"message": {"content": ""}}]} class JsonResp: status_code = 200 text = "{}" def json(self): return {"choices": [{"message": {"content": "{\"summary\":\"retry ok\"}"}}]} def fake_post(*args, **kwargs): calls.append(kwargs.get("json") or {}) return EmptyResp() if len(calls) == 1 else JsonResp() monkeypatch.setattr(llm_insights.requests, "post", fake_post) result = llm_insights._call_llm_json("test_prompt", {"symbol": "AAA/USDT"}) assert result["status"] == "success" assert result["content"]["summary"] == "retry ok" assert result["retry"] == "without_response_format" assert calls[0]["response_format"] == {"type": "json_object"} assert "response_format" not in calls[1] def test_llm_json_code_fence_is_parsed(monkeypatch): monkeypatch.setattr(llm_insights, "get_llm_params", lambda: { "enabled": True, "base_url": "https://llm.example/v1", "api_key_env": "TEST_LLM_KEY", "model": "deepseek-v4-pro", "timeout": 5, "max_tokens": 100, "modules": {}, }) monkeypatch.setenv("TEST_LLM_KEY", "test-key") class Resp: status_code = 200 text = "{}" def json(self): return {"choices": [{"message": {"content": "```json\n{\"summary\":\"ok\"}\n```"}}]} monkeypatch.setattr(llm_insights.requests, "post", lambda *args, **kwargs: Resp()) result = llm_insights._call_llm_json("test_prompt", {"symbol": "AAA/USDT"}) assert result["status"] == "success" assert result["content"]["summary"] == "ok" def test_only_key_samples_generate_insights(monkeypatch, temp_db): _insert_recommendation(temp_db, symbol="CCC/USDT", action_status="观察", execution_status="observe", display_bucket="watch_pool", state_reason="普通观察") _insert_recommendation(temp_db, symbol="DDD/USDT", action_status="等回踩", execution_status="wait_pullback", display_bucket="realtime", rec_time="2026-05-01T12:00:00") monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) seen = [] monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: seen.append(payload["symbol"]) or {"status": "success", "content": {"summary": "ok"}, "model": "m"}) llm_insights.run(scope="recommendations", limit=10) assert "CCC/USDT" not in seen assert "DDD/USDT" in seen def test_api_exposes_cached_ai_fields(monkeypatch, temp_db): _insert_recommendation(temp_db) monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: {"status": "success", "content": {"summary": "AI 摘要", "key_evidence": ["量能增强"]}, "model": "m"}) llm_insights.run(scope="recommendations", limit=10) client = TestClient(web_server.app) resp = client.get("/api/recommendations/active?actionable_only=false") assert resp.status_code == 200 data = resp.json() target = next(r for r in data if r["symbol"] == "AAA/USDT") assert target["llm_insight"]["content"]["summary"] == "AI 摘要" def test_invalid_json_gets_cached_as_failed(monkeypatch, temp_db): _insert_recommendation(temp_db) monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: {"status": "failed", "error": "invalid_json:bad", "model": "m"}) llm_insights.run(scope="recommendations", limit=10) row = _fetch_llm_row(temp_db) assert row["status"] == "failed" assert "invalid_json" in row["error"] def test_llm_insights_api_exposes_input_and_output(temp_db): payload = {"symbol": "AAA/USDT", "action_status": "可即刻买入", "signals": ["15min 即刻入场信号"]} upsert_insight( "recommendation", "7", llm_insights.PROMPTS["recommendation_explain_v1"], llm_insights.PROMPTS["recommendation_explain_v1"], compute_input_hash(payload), "success", input_payload=payload, content={"summary": "AI 判断现在处于入场窗口", "key_evidence": ["量能增强"]}, model="test-model", ) client = TestClient(web_server.app) resp = client.get("/api/llm/insights") assert resp.status_code == 200 data = resp.json() assert data["items"][0]["model"] == "test-model" assert data["items"][0]["input"]["symbol"] == "AAA/USDT" assert data["items"][0]["content"]["summary"] == "AI 判断现在处于入场窗口" detail = client.get(f"/api/llm/insights/{data['items'][0]['id']}").json() assert detail["input"]["signals"] == ["15min 即刻入场信号"] def test_sentiment_batch_analysis_api_returns_cached_result(temp_db): payload = { "target_type": "sentiment_batch", "target_id": "sentiment_batch:24h", "hours": 24, "event_count": 1, "events": [{"title": "Binance Will List ABCUSDT", "related_symbol": "ABC/USDT"}], } upsert_insight( "sentiment_batch", "sentiment_batch:24h", llm_insights.PROMPTS["sentiment_batch_analyze_v1"], llm_insights.PROMPTS["sentiment_batch_analyze_v1"], compute_input_hash(payload), "success", input_payload=payload, content={ "market_mood": "risk_on", "summary": "上币事件带动短线风险偏好", "coin_impacts": [{"symbol": "ABC/USDT", "direction": "positive", "reason": "Binance listing", "need_technical_check": True}], }, model="test-model", ) client = TestClient(web_server.app) resp = client.get("/api/sentiment/analysis") assert resp.status_code == 200 data = resp.json() assert data["analysis"]["summary"] == "上币事件带动短线风险偏好" assert data["source_events"][0]["related_symbol"] == "ABC/USDT" def test_sentiment_batch_enqueues_technical_check_candidates(monkeypatch, temp_db): event_driven_screener.init_event_tables() conn = sqlite3.connect(temp_db) conn.execute( """ INSERT INTO event_news ( event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, processed ) VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'), ?, ?, 1) """, ( "seed-llm-candidate", "binance_latest", "AAA/USDT", "Binance announces AAA ecosystem update", "", "A", "important_catalyst", ), ) conn.commit() conn.close() monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) monkeypatch.setattr( llm_insights, "_call_llm_json", lambda prompt, payload: { "status": "success", "model": "m", "content": { "summary": "AAA 需要进入技术检查", "coin_impacts": [ { "symbol": "AAA/USDT", "direction": "positive", "reason": "公告催化且主题关注升温", "confidence": 88, "need_technical_check": True, }, { "symbol": "BBB/USDT", "direction": "positive", "reason": "置信度不足", "confidence": 40, "need_technical_check": True, }, ], }, }, ) result = llm_insights.generate_sentiment_batch_analysis(limit=10) assert result["candidate_events"]["queued"] == 1 assert result["candidate_events"]["symbols"] == ["AAA/USDT"] conn = sqlite3.connect(temp_db) conn.row_factory = sqlite3.Row rows = conn.execute( "SELECT source, symbol, event_type, processed, decision, rec_id FROM event_news WHERE source='llm_sentiment'" ).fetchall() conn.close() assert len(rows) == 1 assert rows[0]["symbol"] == "AAA/USDT" assert rows[0]["event_type"] == "llm_sentiment_candidate" assert rows[0]["processed"] == 0 assert rows[0]["decision"] in ("", None) assert rows[0]["rec_id"] == 0 def test_llm_insights_page_route(temp_db): client = TestClient(web_server.app) resp = client.get("/llm-insights") assert resp.status_code == 200 assert "AI 记录" in resp.text def test_mojibake_json_is_repaired_for_display(): raw = {"结论": "ç­\x89å\x9b\x9e踩ï¼\x8cä¸\x8d追é«\x98", "list": ["æ\xa0¸å¿\x83å\x8e\x9få\x9b\xa0"]} fixed = repair_mojibake_json(raw) assert fixed["结论"] == "等回踩,不追高" assert fixed["list"][0] == "核心原因" def test_mixed_mojibake_text_is_repaired(): raw = "AI舆情候选 ABC/USDT: Binance Futuresæ\x96°ä¸\x8a线永ç»\xadå\x90\x88约" fixed = repair_mojibake_json(raw) assert fixed == "AI舆情候选 ABC/USDT: Binance Futures新上线永续合约"