"""Async cached LLM explanation layer.""" import json import os from datetime import datetime, timedelta import requests from app.config.system_config import llm_config from app.core.opportunity_lifecycle import normalize_action_status from app.db.altcoin_db import get_conn, _derive_execution_fields from app.db.llm_insights import compute_input_hash, get_any_insight, get_insights_for_targets, get_latest_insight_by_type, upsert_insight PROMPTS = { "recommendation_explain_v1": "recommendation_explain_v1", "sentiment_explain_v1": "sentiment_explain_v1", "sentiment_batch_analyze_v1": "sentiment_batch_analyze_v1", "review_memo_v1": "review_memo_v1", } def _env_bool(name, default=False): value = os.getenv(name) if value is None: return default return str(value).strip().lower() in ("1", "true", "yes", "on") def _env_int(name, default=0): try: return int(os.getenv(name, default)) except Exception: return int(default or 0) def get_llm_params(): """Runtime LLM config. This is system config, not strategy config.""" cfg = llm_config() return { "enabled": bool(cfg.get("enabled", False)), "base_url": str(cfg.get("base_url") or "https://api.openai.com/v1").strip(), "api_key_env": str(cfg.get("api_key_env") or "ALPHAX_LLM_API_KEY").strip(), "model": str(cfg.get("model") or "gpt-4o-mini").strip(), "timeout": int(cfg.get("timeout") or 20), "max_tokens": int(cfg.get("max_tokens") or 900), "reasoning_effort": str(cfg.get("reasoning_effort") or "").strip(), "thinking_enabled": bool(cfg.get("thinking_enabled", False)), "modules": cfg.get("modules") or {}, } def get_llm_module_enabled(module_name): params = get_llm_params() if not params.get("enabled", False): return False modules = params.get("modules") or {} if module_name in modules: return bool(modules.get(module_name)) env_name = f"ALPHAX_LLM_{str(module_name or '').upper()}_ENABLED" return _env_bool(env_name, True) def _dump_json(value): return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str) def _get_target_key(value): if value is None: return "" return str(value) def _json_fallback(value, fallback=None): try: return json.loads(value) if isinstance(value, str) else (value if value is not None else fallback) except Exception: return fallback def _parse_insight_payload(content): if not isinstance(content, dict): return {} if isinstance(content.get("content"), dict): return content["content"] return content def _message_content_text(message: dict) -> str: content = (message or {}).get("content") if isinstance(content, str): return content.strip() if isinstance(content, list): parts = [] for item in content: if isinstance(item, str): parts.append(item) elif isinstance(item, dict): parts.append(str(item.get("text") or item.get("content") or "")) return "\n".join(x for x in parts if x).strip() return "" def _parse_json_object_text(content: str, *, detail=None) -> dict: text = str(content or "").strip() if not text: suffix = f":{_dump_json(detail)}" if detail else "" raise ValueError(f"llm_empty_content{suffix}") if text.startswith("```"): text = text.strip("`").strip() if text.lower().startswith("json"): text = text[4:].strip() try: parsed = json.loads(text) except json.JSONDecodeError: start = text.find("{") end = text.rfind("}") if start < 0 or end <= start: raise parsed = json.loads(text[start:end + 1]) if not isinstance(parsed, dict): raise ValueError("llm_output_not_object") if not parsed: raise ValueError("llm_empty_json_object") return parsed def _chat_completion_body(params, messages, max_tokens, *, response_format=True, temperature=0.2): model = str(params.get("model") or "").strip() body = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, } if response_format: body["response_format"] = {"type": "json_object"} reasoning_effort = str(params.get("reasoning_effort") or "").strip() if reasoning_effort: body["reasoning_effort"] = reasoning_effort if bool(params.get("thinking_enabled", False)): body["thinking"] = {"type": "enabled"} return body def _chat_completion_request(base_url, api_key, model, messages, max_tokens, timeout, *, response_format=True, params=None, temperature=0.2): request_params = dict(params or {}) request_params.setdefault("model", model) body = _chat_completion_body(request_params, messages, max_tokens, response_format=response_format, temperature=temperature) return requests.post( f"{base_url}/chat/completions", headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, json=body, timeout=timeout, ) def _parse_llm_response(resp, model): if resp.status_code >= 400: return {"status": "failed", "error": f"http_{resp.status_code}", "raw": resp.text[:1000], "model": model} data = resp.json() choice = (data.get("choices") or [{}])[0] message = (choice.get("message") or {}) content = _message_content_text(message) detail = { "finish_reason": choice.get("finish_reason"), "message_keys": sorted(message.keys()) if isinstance(message, dict) else [], "usage": data.get("usage") or {}, } parsed = _parse_json_object_text(content, detail=detail) return {"status": "success", "content": parsed, "model": model} def _call_llm_json(prompt_version, payload): params = get_llm_params() api_key = os.getenv(str(params.get("api_key_env") or "OPENAI_API_KEY"), "").strip() if not params.get("enabled", False) or not api_key: return {"status": "skipped", "error": "llm_disabled_or_missing_key"} base_url = str(params.get("base_url") or "").rstrip("/") model = str(params.get("model") or "").strip() timeout = int(params.get("timeout") or 20) max_tokens = int(params.get("max_tokens") or 900) system_prompt = ( "You are a crypto research assistant. Return strict JSON only. " "Do not change trading decisions, scores, or strategy state." ) user_prompt = _dump_json({ "prompt_version": prompt_version, "input": payload, "output_schema_hint": "JSON object with concise Chinese fields only", }) messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] try: resp = _chat_completion_request(base_url, api_key, model, messages, max_tokens, timeout, response_format=True, params=params) first = _parse_llm_response(resp, model) if first.get("status") == "success": return first return first except ValueError as exc: # Some OpenAI-compatible providers occasionally return an empty message # when strict JSON response_format is enabled. Retry once with prompt-only # JSON enforcement so the job does not fail silently on provider quirks. error_text = str(exc) retryable = error_text.startswith("llm_empty_content") or error_text.startswith("llm_empty_json_object") if not retryable: return {"status": "failed", "error": error_text[:1000], "model": model} try: retry_messages = [ { "role": "system", "content": system_prompt + " Return one valid JSON object. No markdown, no prose.", }, {"role": "user", "content": user_prompt}, ] retry_max_tokens = max_tokens if "finish_reason" in error_text and "length" in error_text: retry_max_tokens = min(max(max_tokens * 2, max_tokens + 1200), 8192) resp = _chat_completion_request(base_url, api_key, model, retry_messages, retry_max_tokens, timeout, response_format=False, params=params) result = _parse_llm_response(resp, model) if result.get("status") == "success": result["retry"] = "without_response_format" return result except Exception as retry_exc: return {"status": "failed", "error": f"{exc}; retry_failed:{str(retry_exc)[:800]}", "model": model} except json.JSONDecodeError as exc: return {"status": "failed", "error": f"invalid_json:{exc}", "model": model} except Exception as exc: return {"status": "failed", "error": str(exc)[:1000], "model": model} def _should_generate_recommendation(row): action_status = normalize_action_status(row.get("action_status") or row.get("entry_plan", {}).get("entry_action") or "持有", row.get("status") or "active") execution_status = str(row.get("execution_status") or "") observe_tier = str(row.get("observe_tier") or "") state_reason = str(row.get("state_reason") or row.get("execution_reason") or "") entry_window = row.get("entry_window") or {} if execution_status in ("buy_now", "wait_pullback", "invalid") or action_status in ("可即刻买入", "等回踩", "衰减") or row.get("display_bucket") == "realtime": return True if observe_tier == "strong" and ("回踩" in state_reason or "入场" in state_reason or "失效" in state_reason): return True if isinstance(entry_window, dict) and entry_window.get("status") == "active": return True if "重点观察" in state_reason: return True return False def _should_generate_sentiment(row): importance = str(row.get("importance") or "").upper() source = str(row.get("source") or "").lower() title = str(row.get("title") or "") if importance in ("A", "S", "RISK"): return True if "binance" in source: return True if any(k in title.lower() for k in ("listing", "launch", "mainnet", "upgrade", "partnership", "hack", "exploit", "burn", "合约", "上币", "主网", "升级", "合作", "黑客", "漏洞")): return True return False def _is_internal_sentiment_event(row): event_type = str(row.get("event_type") or "") title = str(row.get("title") or "") source = str(row.get("source") or "") return ( event_type in ("market_heat", "theme_expansion", "theme_direct", "llm_sentiment_candidate") or source == "llm_sentiment" or title.startswith("[主题扩散:") ) def _should_generate_review(item): metrics = item.get("metrics") or {} release_decision = str(item.get("release_decision") or "") failure_count = int(metrics.get("fail_count") or 0) hit_count = int(metrics.get("hit_count") or 0) pollution = item.get("pollution_summary") or {} if release_decision in ("gray", "release", "hold"): return True if failure_count > 0 or hit_count > 0: return True if int(pollution.get("contaminated_symbol_count") or 0) > 0: return True return False def _build_recommendation_payload(row): entry_plan = row.get("entry_plan") or _json_fallback(row.get("entry_plan_json"), {}) or {} signals = row.get("signals") or _json_fallback(row.get("signals"), []) or [] if isinstance(signals, str): signals = _json_fallback(signals, []) or [] return { "target_type": "recommendation", "target_id": row.get("id"), "symbol": row.get("symbol"), "rec_time": row.get("rec_time"), "status": row.get("status"), "action_status": row.get("action_status"), "execution_status": row.get("execution_status"), "execution_label": row.get("execution_label"), "execution_reason": row.get("execution_reason"), "rec_score": row.get("rec_score"), "entry_price": row.get("entry_price"), "current_price": row.get("current_price"), "stop_loss": row.get("stop_loss"), "tp1": row.get("tp1"), "tp2": row.get("tp2"), "observe_tier": row.get("observe_tier"), "observe_reason": row.get("observe_reason"), "state_reason": row.get("state_reason"), "entry_window": row.get("entry_window"), "market_context": row.get("market_context"), "derivatives_context": row.get("derivatives_context"), "sector_context": row.get("sector_context"), "entry_plan": entry_plan, "signals": signals, } def _build_sentiment_payload(row): return { "target_type": "sentiment", "target_id": row.get("event_id") or row.get("id"), "source": row.get("source"), "source_label": row.get("source_label"), "event_type": row.get("event_type"), "importance": row.get("importance"), "title": row.get("title"), "related_symbol": row.get("related_symbol"), "related_base": row.get("related_base"), "decision": row.get("decision"), "tech_score": row.get("tech_score"), "published_at": row.get("published_at"), "detected_at": row.get("detected_at"), "relation_tag": row.get("relation_tag"), "in_active": row.get("in_active"), "in_screened": row.get("in_screened"), } def _build_sentiment_batch_payload(hours=24, limit=40): conn = get_conn() events = [] try: rows = conn.execute( """ SELECT id, source, symbol, title, url, published_at, detected_at, importance, event_type, decision, tech_score, rec_id, pushed FROM event_news WHERE detected_at >= %s ORDER BY published_at::timestamp DESC, id DESC LIMIT %s """, ((datetime.now() - timedelta(hours=int(hours or 24))).isoformat(), int(limit or 40)), ).fetchall() except Exception: rows = [] for raw in rows: row = { "event_id": f"event_news:{raw[0]}", "source": raw[1], "related_symbol": raw[2], "related_base": (str(raw[2] or "").split("/")[0] or "").upper(), "title": raw[3], "url": raw[4], "published_at": raw[5], "detected_at": raw[6], "importance": raw[7], "event_type": raw[8], "decision": raw[9], "tech_score": raw[10], "rec_id": raw[11], "pushed": bool(raw[12]), } if _is_internal_sentiment_event(row): continue events.append(row) try: trend_rows = conn.execute( """ SELECT id, symbol, name, trend_rank, trend_score, market_cap_rank, detected_at, extra_json FROM sentiment_events WHERE detected_at = (SELECT MAX(detected_at) FROM sentiment_events WHERE source='coingecko') ORDER BY trend_rank LIMIT 20 """ ).fetchall() except Exception: trend_rows = [] conn.close() trend_news = [] for raw in trend_rows: extra = _json_fallback(raw[7], {}) or {} for n in (extra.get("news") or [])[:3]: title = n.get("title") or "" if not title: continue trend_news.append({ "event_id": f"sentiment_event:{raw[0]}:{n.get('url') or title}", "source": n.get("source") or "news", "related_symbol": f"{str(raw[1] or '').upper()}/USDT", "related_base": str(raw[1] or "").upper(), "related_name": raw[2] or raw[1], "title": title[:180], "url": n.get("url") or "", "published_at": n.get("published") or "", "detected_at": raw[6], "importance": "B", "event_type": "news", "trend_rank": raw[3], "trend_score": raw[4], "market_cap_rank": raw[5], "price_usd": extra.get("price_usd", 0), "change_24h_pct": extra.get("change_24h_pct", 0), }) combined = events + trend_news seen = set() deduped = [] for item in combined: key = ((item.get("title") or "").strip().lower(), item.get("related_base"), item.get("source")) if key in seen: continue seen.add(key) deduped.append(item) deduped = deduped[: int(limit or 40)] return { "target_type": "sentiment_batch", "target_id": f"sentiment_batch:{int(hours or 24)}h", "hours": int(hours or 24), "generated_at": datetime.now().isoformat(), "event_count": len(deduped), "events": deduped, "instructions": { "role": "作为加密市场舆情分析师,判断这些新闻对山寨币行情的影响。", "focus": [ "归纳主线叙事和受影响币种", "区分利好、利空、风险和噪音", "给出可信度和短线影响窗口", "指出哪些币种需要触发技术检查", "不要给买卖指令,只做舆情影响分析", ], "expected_schema": { "market_mood": "risk_on|neutral|risk_off", "summary": "中文摘要", "hot_themes": [{"theme": "", "impact": "", "symbols": [], "confidence": 0}], "coin_impacts": [{"symbol": "", "direction": "positive|negative|risk|neutral", "reason": "", "confidence": 0, "need_technical_check": False}], "risk_events": [{"title": "", "symbols": [], "risk_type": "", "severity": "low|medium|high"}], "watchlist": [{"symbol": "", "why": "", "trigger": ""}], }, }, } def _sentiment_event_rank(item): importance_weight = {"S": 100, "A": 80, "RISK": 75, "B": 50, "C": 20} score = importance_weight.get(str(item.get("importance") or "").upper(), 30) if item.get("rec_id"): score += 18 if item.get("pushed"): score += 10 if item.get("related_symbol") or item.get("related_base"): score += 8 if item.get("tech_score") not in (None, ""): try: score += min(max(float(item.get("tech_score") or 0), 0), 100) / 10 except Exception: pass event_type = str(item.get("event_type") or "").lower() title = str(item.get("title") or "").lower() if any(k in event_type or k in title for k in ("listing", "launch", "hack", "exploit", "upgrade", "partnership", "上币", "主网", "黑客", "漏洞", "升级", "合作")): score += 12 if item.get("trend_rank"): try: score += max(0, 20 - float(item.get("trend_rank") or 0)) except Exception: pass return score def _compact_sentiment_batch_payload(payload, max_events=None): """Shrink noisy news/event rows before sending to the LLM.""" max_events = int(max_events or _env_int("ALPHAX_LLM_SENTIMENT_BATCH_MAX_EVENTS", 40) or 40) events = list((payload or {}).get("events") or []) ranked = sorted(events, key=_sentiment_event_rank, reverse=True) selected = [] seen = set() for item in ranked: title_key = " ".join(str(item.get("title") or "").lower().split())[:96] symbol_key = str(item.get("related_symbol") or item.get("symbol") or item.get("related_base") or "").upper() key = (title_key, symbol_key) if key in seen: continue seen.add(key) selected.append(item) if len(selected) >= max(max_events, 1): break compact_events = [] for idx, item in enumerate(selected, start=1): compact_events.append({ "rank": idx, "event_id": item.get("event_id"), "source": item.get("source"), "symbol": item.get("related_symbol") or item.get("symbol"), "base": item.get("related_base"), "name": item.get("related_name"), "title": str(item.get("title") or "")[:220], "published_at": item.get("published_at"), "detected_at": item.get("detected_at"), "importance": item.get("importance"), "event_type": item.get("event_type"), "decision": item.get("decision"), "tech_score": item.get("tech_score"), "rec_id": item.get("rec_id"), "pushed": bool(item.get("pushed", False)), "trend_rank": item.get("trend_rank"), "trend_score": item.get("trend_score"), "market_cap_rank": item.get("market_cap_rank"), "change_24h_pct": item.get("change_24h_pct"), }) return { "target_type": payload.get("target_type"), "target_id": payload.get("target_id"), "hours": payload.get("hours"), "generated_at": payload.get("generated_at"), "source_event_count": len(events), "event_count": len(compact_events), "events": compact_events, "instructions": { "role": "作为加密市场舆情分析师,基于精简事件判断短线市场影响。", "focus": [ "综合更多新闻和事件,提炼当前 24h 市场主线、叙事扩散和风险偏好", "给出受影响币种、方向、理由、置信度、影响窗口和是否值得技术检查", "区分已经反映在价格里的旧消息、可继续观察的催化、以及需要规避的风险", "建议要专业、可执行,但不要直接下真实买卖指令", "标记是否需要进入技术检查", "如果没有有效事件,也必须返回空数组和 neutral 结论", ], "expected_schema": { "market_mood": "risk_on|neutral|risk_off", "summary": "中文摘要", "professional_view": "更专业的市场解读,说明主线、资金偏好、潜在误判点", "hot_themes": [{"theme": "", "impact": "", "symbols": [], "confidence": 0}], "coin_impacts": [{"symbol": "", "direction": "positive|negative|risk|neutral", "reason": "", "confidence": 0, "impact_window": "intraday|1-3d|3-7d", "need_technical_check": False}], "risk_events": [{"title": "", "symbols": [], "risk_type": "", "severity": "low|medium|high"}], "watchlist": [{"symbol": "", "why": "", "trigger": "", "invalid_if": ""}], "suggestions": [{"type": "watch|avoid|verify", "symbol": "", "reason": "", "next_step": ""}], }, }, } def _build_review_payload(item): return { "target_type": "review", "target_id": item.get("id") or item.get("created_at") or item.get("run_date"), "run_date": item.get("run_date"), "created_at": item.get("created_at"), "title": item.get("title"), "summary": item.get("summary"), "metrics": item.get("metrics") or {}, "findings": item.get("findings") or [], "problems": item.get("problems") or [], "actions": item.get("actions") or [], "candidate_rules": item.get("candidate_rules") or [], "success_analysis": item.get("success_analysis") or {}, "failure_analysis": item.get("failure_analysis") or {}, "pollution_summary": item.get("pollution_summary") or {}, "version_change_summary": item.get("version_change_summary") or "", } def generate_recommendation_insights(limit=30): if not get_llm_module_enabled("recommendations"): return {"status": "skipped", "reason": "module_disabled", "processed": 0} conn = get_conn() rows = conn.execute( """ SELECT r.*, lpc.price AS latest_cache_price, lpc.updated_at AS latest_cache_updated_at FROM recommendation r LEFT JOIN latest_price_cache lpc ON lpc.symbol = r.symbol WHERE r.status='active' AND COALESCE(r.display_bucket,'watch_pool') != 'history' ORDER BY r.rec_time DESC """ ).fetchall() conn.close() items = [] seen = set() for row in rows: item = _derive_execution_fields(dict(row)) if not _should_generate_recommendation(item): continue if str(item.get("id")) in seen: continue seen.add(str(item.get("id"))) items.append(item) if limit and len(items) >= int(limit): break processed = 0 for row in items: payload = _build_recommendation_payload(row) input_hash = compute_input_hash(payload) cached = get_any_insight("recommendation", payload["target_id"], PROMPTS["recommendation_explain_v1"], input_hash) if cached: continue result = _call_llm_json(PROMPTS["recommendation_explain_v1"], payload) upsert_insight( "recommendation", payload["target_id"], PROMPTS["recommendation_explain_v1"], PROMPTS["recommendation_explain_v1"], input_hash, result.get("status") or "failed", input_payload=payload, content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")}, error=result.get("error", ""), model=result.get("model", ""), ) processed += 1 return {"status": "success", "processed": processed, "scanned": len(items)} def generate_sentiment_insights(limit=30): if not get_llm_module_enabled("sentiment"): return {"status": "skipped", "reason": "module_disabled", "processed": 0} conn = get_conn() try: rows = conn.execute( """ SELECT id AS event_id, source, symbol, title, url, published_at, detected_at, importance, event_type, decision, tech_score, rec_id, pushed FROM event_news ORDER BY published_at::timestamp DESC, id DESC LIMIT 120 """ ).fetchall() except Exception: rows = [] finally: conn.close() processed = 0 for raw in rows: row = { "event_id": f"event_news:{raw[0]}", "source": raw[1], "symbol": raw[2], "title": raw[3], "published_at": raw[5], "detected_at": raw[6], "importance": raw[7], "event_type": raw[8], "decision": raw[9], "tech_score": raw[10], "rec_id": raw[11], "pushed": raw[12], "source_label": "Binance公告" if "binance" in str(raw[1]).lower() else str(raw[1] or ""), "related_symbol": raw[2], "related_base": (str(raw[2] or "").split("/")[0] or "").upper(), "in_active": False, "in_screened": False, "relation_tag": "", } if not _should_generate_sentiment(row): continue payload = _build_sentiment_payload(row) input_hash = compute_input_hash(payload) cached = get_any_insight("sentiment", payload["target_id"], PROMPTS["sentiment_explain_v1"], input_hash) if cached: continue result = _call_llm_json(PROMPTS["sentiment_explain_v1"], payload) upsert_insight( "sentiment", payload["target_id"], PROMPTS["sentiment_explain_v1"], PROMPTS["sentiment_explain_v1"], input_hash, result.get("status") or "failed", input_payload=payload, content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")}, error=result.get("error", ""), model=result.get("model", ""), ) processed += 1 if limit and processed >= int(limit): break return {"status": "success", "processed": processed} def generate_sentiment_batch_analysis(limit=40, hours=24): if not get_llm_module_enabled("sentiment"): return {"status": "skipped", "reason": "module_disabled", "processed": 0} source_payload = _build_sentiment_batch_payload(hours=hours, limit=limit) if not source_payload.get("events"): return {"status": "skipped", "reason": "no_sentiment_events", "processed": 0} payload = _compact_sentiment_batch_payload(source_payload) input_hash = compute_input_hash(payload) cached = get_any_insight("sentiment_batch", payload["target_id"], PROMPTS["sentiment_batch_analyze_v1"], input_hash) if cached: candidate_result = {"queued": 0, "skipped": 0, "symbols": []} if cached.get("status") == "success": try: from app.services.event_driven_screener import enqueue_llm_sentiment_candidates candidate_result = enqueue_llm_sentiment_candidates( cached.get("content") or {}, source_insight_id=str(cached.get("id") or input_hash), ) except Exception as exc: candidate_result = {"queued": 0, "skipped": 0, "symbols": [], "error": str(exc)[:300]} return { "status": "success", "processed": 0, "cached": True, "event_count": payload.get("event_count", 0), "candidate_events": candidate_result, } result = _call_llm_json(PROMPTS["sentiment_batch_analyze_v1"], payload) candidate_result = {"queued": 0, "skipped": 0, "symbols": []} if result.get("status") == "success": try: from app.services.event_driven_screener import enqueue_llm_sentiment_candidates candidate_result = enqueue_llm_sentiment_candidates( result.get("content") or {}, source_insight_id=input_hash, ) except Exception as exc: candidate_result = {"queued": 0, "skipped": 0, "symbols": [], "error": str(exc)[:300]} upsert_insight( "sentiment_batch", payload["target_id"], PROMPTS["sentiment_batch_analyze_v1"], PROMPTS["sentiment_batch_analyze_v1"], input_hash, result.get("status") or "failed", input_payload=payload, content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")}, error=result.get("error", ""), model=result.get("model", ""), ) return { "status": "success", "processed": 1, "event_count": payload.get("event_count", 0), "candidate_events": candidate_result, } def generate_review_memos(limit=10): if not get_llm_module_enabled("review"): return {"status": "skipped", "reason": "module_disabled", "processed": 0} from app.db.review_queries import get_strategy_iteration_logs logs = get_strategy_iteration_logs(limit=max(limit or 10, 1)) processed = 0 for item in logs: if not _should_generate_review(item): continue payload = _build_review_payload(item) input_hash = compute_input_hash(payload) cached = get_any_insight("review", payload["target_id"], PROMPTS["review_memo_v1"], input_hash) if cached: continue result = _call_llm_json(PROMPTS["review_memo_v1"], payload) upsert_insight( "review", payload["target_id"], PROMPTS["review_memo_v1"], PROMPTS["review_memo_v1"], input_hash, result.get("status") or "failed", input_payload=payload, content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")}, error=result.get("error", ""), model=result.get("model", ""), ) processed += 1 return {"status": "success", "processed": processed} def run(scope="recommendations", limit=30): scope = str(scope or "").strip() if scope == "recommendations": return generate_recommendation_insights(limit=limit) if scope == "sentiment": return generate_sentiment_batch_analysis(limit=limit) if scope == "sentiment-events": return generate_sentiment_insights(limit=limit) if scope == "review": return generate_review_memos(limit=limit) raise ValueError(f"unknown llm scope: {scope}") def attach_recommendation_insights(items): ids = [str(item.get("id")) for item in items or [] if item.get("id") is not None] insights = get_insights_for_targets("recommendation", ids, PROMPTS["recommendation_explain_v1"]) for item in items or []: insight = insights.get(str(item.get("id"))) if insight: item["llm_insight"] = insight return items def attach_sentiment_insights(items): ids = [str(item.get("event_id") or item.get("id")) for item in items or [] if (item.get("event_id") or item.get("id")) is not None] insights = get_insights_for_targets("sentiment", ids, PROMPTS["sentiment_explain_v1"]) for item in items or []: insight = insights.get(str(item.get("event_id") or item.get("id"))) if insight: item["llm_insight"] = insight return items def get_latest_review_memo(): return get_latest_insight_by_type("review", PROMPTS["review_memo_v1"]) def get_latest_sentiment_batch_analysis(): return get_latest_insight_by_type("sentiment_batch", PROMPTS["sentiment_batch_analyze_v1"]) def get_latest_sentiment_batch_attempt(): return get_latest_insight_by_type("sentiment_batch", PROMPTS["sentiment_batch_analyze_v1"], success_only=False) __all__ = [ "PROMPTS", "attach_recommendation_insights", "attach_sentiment_insights", "generate_recommendation_insights", "generate_review_memos", "generate_sentiment_batch_analysis", "generate_sentiment_insights", "get_latest_sentiment_batch_analysis", "get_latest_sentiment_batch_attempt", "get_latest_review_memo", "run", ]