From 8ddcb08c79b3be37503e62a8812f53f58ae23b90 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Mon, 8 Jun 2026 09:17:14 +0800 Subject: [PATCH] 1 --- .env.example | 1 + app/config/system_config.py | 4 + app/services/chat_assistant.py | 23 +++--- app/services/llm_insights.py | 132 ++++++++++++++++++++++++++++++--- tests/test_llm_insights.py | 49 ++++++++++++ 5 files changed, 186 insertions(+), 23 deletions(-) diff --git a/.env.example b/.env.example index a312c34..2ac2959 100644 --- a/.env.example +++ b/.env.example @@ -38,6 +38,7 @@ ALPHAX_LLM_MAX_TOKENS=900 ALPHAX_LLM_RECOMMENDATIONS_ENABLED=1 ALPHAX_LLM_SENTIMENT_ENABLED=1 ALPHAX_LLM_REVIEW_ENABLED=1 +ALPHAX_LLM_SENTIMENT_BATCH_MAX_EVENTS=12 # 策略交易挂单门控。wait_pullback 只是候选,必须通过这些条件才会创建挂单。 ALPHAX_PAPER_TRADING_MODE=intraday_trading diff --git a/app/config/system_config.py b/app/config/system_config.py index 1a45527..17daea8 100644 --- a/app/config/system_config.py +++ b/app/config/system_config.py @@ -63,6 +63,8 @@ def default_llm_config(): "model": _env_str("ALPHAX_LLM_MODEL", "gpt-4o-mini"), "timeout": _env_int("ALPHAX_LLM_TIMEOUT", 20), "max_tokens": _env_int("ALPHAX_LLM_MAX_TOKENS", 900), + "reasoning_effort": _env_str("ALPHAX_LLM_REASONING_EFFORT", ""), + "thinking_enabled": _env_bool("ALPHAX_LLM_THINKING_ENABLED", False), "modules": { "recommendations": _env_bool("ALPHAX_LLM_RECOMMENDATIONS_ENABLED", True), "sentiment": _env_bool("ALPHAX_LLM_SENTIMENT_ENABLED", True), @@ -82,6 +84,8 @@ def _llm_env_overrides(): "ALPHAX_LLM_MODEL": ("model", lambda: _env_str("ALPHAX_LLM_MODEL", "gpt-4o-mini")), "ALPHAX_LLM_TIMEOUT": ("timeout", lambda: _env_int("ALPHAX_LLM_TIMEOUT", 20)), "ALPHAX_LLM_MAX_TOKENS": ("max_tokens", lambda: _env_int("ALPHAX_LLM_MAX_TOKENS", 900)), + "ALPHAX_LLM_REASONING_EFFORT": ("reasoning_effort", lambda: _env_str("ALPHAX_LLM_REASONING_EFFORT", "")), + "ALPHAX_LLM_THINKING_ENABLED": ("thinking_enabled", lambda: _env_bool("ALPHAX_LLM_THINKING_ENABLED", False)), } for env_name, (key, loader) in checks.items(): if _env_present(env_name): diff --git a/app/services/chat_assistant.py b/app/services/chat_assistant.py index 4d69a20..bc8934a 100644 --- a/app/services/chat_assistant.py +++ b/app/services/chat_assistant.py @@ -21,7 +21,7 @@ from app.db import chat_assistant_db from app.db.analytics import get_pipeline_runs from app.db.llm_insights import compute_input_hash, repair_mojibake_json, repair_mojibake_text from app.db.schema import get_conn -from app.services.llm_insights import _message_content_text, _parse_json_object_text, get_llm_params +from app.services.llm_insights import _chat_completion_body, _message_content_text, _parse_json_object_text, get_llm_params from app.services.market_overview import get_crypto_market_overview @@ -598,19 +598,20 @@ def _call_chat_llm(message: str, context: dict, history=None) -> dict: } system_prompt = "你是 AlphaX Agent 的 Crypto 研究助手。你只能基于提供的结构化数据回答,不能编造数据。" try: + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": _compact_json(payload)}, + ] resp = requests.post( f"{base_url}/chat/completions", headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, - json={ - "model": model, - "messages": [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": _compact_json(payload)}, - ], - "temperature": 0.15, - "max_tokens": int(params.get("max_tokens") or 900), - "response_format": {"type": "json_object"}, - }, + json=_chat_completion_body( + params, + messages, + int(params.get("max_tokens") or 900), + response_format=True, + temperature=0.15, + ), timeout=int(params.get("timeout") or 20), ) if resp.status_code >= 400: diff --git a/app/services/llm_insights.py b/app/services/llm_insights.py index f14ee06..3925ea1 100644 --- a/app/services/llm_insights.py +++ b/app/services/llm_insights.py @@ -26,6 +26,13 @@ def _env_bool(name, default=False): return str(value).strip().lower() in ("1", "true", "yes", "on") +def _env_int(name, default=0): + try: + return int(os.getenv(name, default)) + except Exception: + return int(default or 0) + + def get_llm_params(): """Runtime LLM config. This is system config, not strategy config.""" cfg = llm_config() @@ -36,6 +43,8 @@ def get_llm_params(): "model": str(cfg.get("model") or "gpt-4o-mini").strip(), "timeout": int(cfg.get("timeout") or 20), "max_tokens": int(cfg.get("max_tokens") or 900), + "reasoning_effort": str(cfg.get("reasoning_effort") or "").strip(), + "thinking_enabled": bool(cfg.get("thinking_enabled", False)), "modules": cfg.get("modules") or {}, } @@ -91,10 +100,11 @@ def _message_content_text(message: dict) -> str: return "" -def _parse_json_object_text(content: str) -> dict: +def _parse_json_object_text(content: str, *, detail=None) -> dict: text = str(content or "").strip() if not text: - raise ValueError("llm_empty_content") + suffix = f":{_dump_json(detail)}" if detail else "" + raise ValueError(f"llm_empty_content{suffix}") if text.startswith("```"): text = text.strip("`").strip() if text.lower().startswith("json"): @@ -114,15 +124,28 @@ def _parse_json_object_text(content: str) -> dict: return parsed -def _chat_completion_request(base_url, api_key, model, messages, max_tokens, timeout, *, response_format=True): +def _chat_completion_body(params, messages, max_tokens, *, response_format=True, temperature=0.2): + model = str(params.get("model") or "").strip() body = { "model": model, "messages": messages, - "temperature": 0.2, + "temperature": temperature, "max_tokens": max_tokens, } if response_format: body["response_format"] = {"type": "json_object"} + reasoning_effort = str(params.get("reasoning_effort") or "").strip() + if reasoning_effort: + body["reasoning_effort"] = reasoning_effort + if bool(params.get("thinking_enabled", False)): + body["thinking"] = {"type": "enabled"} + return body + + +def _chat_completion_request(base_url, api_key, model, messages, max_tokens, timeout, *, response_format=True, params=None, temperature=0.2): + request_params = dict(params or {}) + request_params.setdefault("model", model) + body = _chat_completion_body(request_params, messages, max_tokens, response_format=response_format, temperature=temperature) return requests.post( f"{base_url}/chat/completions", headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, @@ -135,9 +158,15 @@ def _parse_llm_response(resp, model): if resp.status_code >= 400: return {"status": "failed", "error": f"http_{resp.status_code}", "raw": resp.text[:1000], "model": model} data = resp.json() - message = (((data.get("choices") or [{}])[0]).get("message") or {}) + choice = (data.get("choices") or [{}])[0] + message = (choice.get("message") or {}) content = _message_content_text(message) - parsed = _parse_json_object_text(content) + detail = { + "finish_reason": choice.get("finish_reason"), + "message_keys": sorted(message.keys()) if isinstance(message, dict) else [], + "usage": data.get("usage") or {}, + } + parsed = _parse_json_object_text(content, detail=detail) return {"status": "success", "content": parsed, "model": model} @@ -164,7 +193,7 @@ def _call_llm_json(prompt_version, payload): {"role": "user", "content": user_prompt}, ] try: - resp = _chat_completion_request(base_url, api_key, model, messages, max_tokens, timeout, response_format=True) + resp = _chat_completion_request(base_url, api_key, model, messages, max_tokens, timeout, response_format=True, params=params) first = _parse_llm_response(resp, model) if first.get("status") == "success": return first @@ -173,9 +202,10 @@ def _call_llm_json(prompt_version, payload): # Some OpenAI-compatible providers occasionally return an empty message # when strict JSON response_format is enabled. Retry once with prompt-only # JSON enforcement so the job does not fail silently on provider quirks. - retryable = str(exc) in ("llm_empty_content", "llm_empty_json_object") + error_text = str(exc) + retryable = error_text.startswith("llm_empty_content") or error_text.startswith("llm_empty_json_object") if not retryable: - return {"status": "failed", "error": str(exc)[:1000], "model": model} + return {"status": "failed", "error": error_text[:1000], "model": model} try: retry_messages = [ { @@ -184,7 +214,7 @@ def _call_llm_json(prompt_version, payload): }, {"role": "user", "content": user_prompt}, ] - resp = _chat_completion_request(base_url, api_key, model, retry_messages, max_tokens, timeout, response_format=False) + resp = _chat_completion_request(base_url, api_key, model, retry_messages, max_tokens, timeout, response_format=False, params=params) result = _parse_llm_response(resp, model) if result.get("status") == "success": result["retry"] = "without_response_format" @@ -423,6 +453,83 @@ def _build_sentiment_batch_payload(hours=24, limit=40): } +def _sentiment_event_rank(item): + importance_weight = {"S": 100, "A": 80, "RISK": 75, "B": 50, "C": 20} + score = importance_weight.get(str(item.get("importance") or "").upper(), 30) + if item.get("rec_id"): + score += 18 + if item.get("pushed"): + score += 10 + if item.get("related_symbol") or item.get("related_base"): + score += 8 + if item.get("tech_score") not in (None, ""): + try: + score += min(max(float(item.get("tech_score") or 0), 0), 100) / 10 + except Exception: + pass + event_type = str(item.get("event_type") or "").lower() + title = str(item.get("title") or "").lower() + if any(k in event_type or k in title for k in ("listing", "launch", "hack", "exploit", "upgrade", "partnership", "上币", "主网", "黑客", "漏洞", "升级", "合作")): + score += 12 + if item.get("trend_rank"): + try: + score += max(0, 20 - float(item.get("trend_rank") or 0)) + except Exception: + pass + return score + + +def _compact_sentiment_batch_payload(payload, max_events=None): + """Shrink noisy news/event rows before sending to the LLM.""" + max_events = int(max_events or _env_int("ALPHAX_LLM_SENTIMENT_BATCH_MAX_EVENTS", 12) or 12) + events = list((payload or {}).get("events") or []) + ranked = sorted(events, key=_sentiment_event_rank, reverse=True) + compact_events = [] + for idx, item in enumerate(ranked[:max(max_events, 1)], start=1): + compact_events.append({ + "rank": idx, + "event_id": item.get("event_id"), + "source": item.get("source"), + "symbol": item.get("related_symbol") or item.get("symbol"), + "base": item.get("related_base"), + "title": str(item.get("title") or "")[:160], + "published_at": item.get("published_at"), + "importance": item.get("importance"), + "event_type": item.get("event_type"), + "decision": item.get("decision"), + "tech_score": item.get("tech_score"), + "trend_rank": item.get("trend_rank"), + "trend_score": item.get("trend_score"), + "change_24h_pct": item.get("change_24h_pct"), + }) + return { + "target_type": payload.get("target_type"), + "target_id": payload.get("target_id"), + "hours": payload.get("hours"), + "generated_at": payload.get("generated_at"), + "source_event_count": len(events), + "event_count": len(compact_events), + "events": compact_events, + "instructions": { + "role": "作为加密市场舆情分析师,基于精简事件判断短线市场影响。", + "focus": [ + "只提炼真正可能影响 Binance 山寨币交易决策的主题", + "给出受影响币种、方向、理由和置信度", + "标记是否需要进入技术检查", + "如果没有有效事件,也必须返回空数组和 neutral 结论", + ], + "expected_schema": { + "market_mood": "risk_on|neutral|risk_off", + "summary": "中文摘要", + "hot_themes": [{"theme": "", "impact": "", "symbols": [], "confidence": 0}], + "coin_impacts": [{"symbol": "", "direction": "positive|negative|risk|neutral", "reason": "", "confidence": 0, "need_technical_check": False}], + "risk_events": [{"title": "", "symbols": [], "risk_type": "", "severity": "low|medium|high"}], + "watchlist": [{"symbol": "", "why": "", "trigger": ""}], + }, + }, + } + + def _build_review_payload(item): return { "target_type": "review", @@ -565,9 +672,10 @@ def generate_sentiment_insights(limit=30): def generate_sentiment_batch_analysis(limit=40, hours=24): if not get_llm_module_enabled("sentiment"): return {"status": "skipped", "reason": "module_disabled", "processed": 0} - payload = _build_sentiment_batch_payload(hours=hours, limit=limit) - if not payload.get("events"): + source_payload = _build_sentiment_batch_payload(hours=hours, limit=limit) + if not source_payload.get("events"): return {"status": "skipped", "reason": "no_sentiment_events", "processed": 0} + payload = _compact_sentiment_batch_payload(source_payload) input_hash = compute_input_hash(payload) cached = get_any_insight("sentiment_batch", payload["target_id"], PROMPTS["sentiment_batch_analyze_v1"], input_hash) if cached: diff --git a/tests/test_llm_insights.py b/tests/test_llm_insights.py index 5750c76..16f83cf 100644 --- a/tests/test_llm_insights.py +++ b/tests/test_llm_insights.py @@ -221,6 +221,33 @@ def test_llm_json_code_fence_is_parsed(monkeypatch): assert result["content"]["summary"] == "ok" +def test_deepseek_thinking_is_not_sent_unless_enabled(): + body = llm_insights._chat_completion_body( + { + "model": "deepseek-v4-pro", + "reasoning_effort": "", + "thinking_enabled": False, + }, + [{"role": "user", "content": "Hello"}], + 100, + ) + assert body["model"] == "deepseek-v4-pro" + assert "thinking" not in body + assert "reasoning_effort" not in body + + enabled = llm_insights._chat_completion_body( + { + "model": "deepseek-v4-pro", + "reasoning_effort": "high", + "thinking_enabled": True, + }, + [{"role": "user", "content": "Hello"}], + 100, + ) + assert enabled["reasoning_effort"] == "high" + assert enabled["thinking"] == {"type": "enabled"} + + def test_only_key_samples_generate_insights(monkeypatch, temp_db): _insert_recommendation(temp_db, symbol="CCC/USDT", action_status="观察", execution_status="observe", display_bucket="watch_pool", state_reason="普通观察") _insert_recommendation(temp_db, symbol="DDD/USDT", action_status="等回踩", execution_status="wait_pullback", display_bucket="realtime", rec_time="2026-05-01T12:00:00") @@ -312,6 +339,28 @@ def test_sentiment_batch_analysis_api_returns_cached_result(temp_db): assert data["source_events"][0]["related_symbol"] == "ABC/USDT" +def test_sentiment_batch_payload_is_compacted_before_llm(monkeypatch): + payload = { + "target_type": "sentiment_batch", + "target_id": "sentiment_batch:24h", + "hours": 24, + "generated_at": "2026-06-08T00:00:00", + "events": [ + {"event_id": "low", "title": "small update", "importance": "C", "url": "https://example.com/low"}, + {"event_id": "listing", "title": "Binance Will List AAAUSDT", "importance": "A", "related_symbol": "AAA/USDT", "event_type": "listing", "url": "https://example.com/high"}, + {"event_id": "risk", "title": "BBB exploit risk", "importance": "RISK", "related_symbol": "BBB/USDT", "event_type": "exploit", "url": "https://example.com/risk"}, + ], + } + monkeypatch.setenv("ALPHAX_LLM_SENTIMENT_BATCH_MAX_EVENTS", "2") + + compact = llm_insights._compact_sentiment_batch_payload(payload) + + assert compact["source_event_count"] == 3 + assert compact["event_count"] == 2 + assert [item["event_id"] for item in compact["events"]] == ["listing", "risk"] + assert "url" not in compact["events"][0] + + def test_sentiment_batch_enqueues_technical_check_candidates(monkeypatch, temp_db): event_driven_screener.init_event_tables() conn = sqlite3.connect(temp_db)