846 lines
33 KiB
Python
846 lines
33 KiB
Python
"""Async cached LLM explanation layer."""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
|
|
import requests
|
|
|
|
from app.config.system_config import llm_config
|
|
from app.core.opportunity_lifecycle import normalize_action_status
|
|
from app.db.altcoin_db import get_conn, _derive_execution_fields
|
|
from app.db.llm_insights import compute_input_hash, get_any_insight, get_insights_for_targets, get_latest_insight_by_type, upsert_insight
|
|
|
|
# Prompt identifiers used when reading and writing cached insights.
# Each key maps to itself; insertion order is preserved.
PROMPTS = {
    prompt_name: prompt_name
    for prompt_name in (
        "recommendation_explain_v1",
        "sentiment_explain_v1",
        "sentiment_batch_analyze_v1",
        "review_memo_v1",
    )
}
|
|
|
|
|
|
def _env_bool(name, default=False):
|
|
value = os.getenv(name)
|
|
if value is None:
|
|
return default
|
|
return str(value).strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
|
|
def _env_int(name, default=0):
|
|
try:
|
|
return int(os.getenv(name, default))
|
|
except Exception:
|
|
return int(default or 0)
|
|
|
|
|
|
def get_llm_params():
    """Return the runtime LLM settings with built-in defaults applied.

    This is system config, not strategy config. Values come from
    ``llm_config()``; missing or falsy entries fall back to the defaults
    written below. String settings are whitespace-stripped.
    """
    cfg = llm_config()

    def _text(key, fallback):
        # Falsy config values (None, "") fall back to the default string.
        return str(cfg.get(key) or fallback).strip()

    return {
        "enabled": bool(cfg.get("enabled", False)),
        "base_url": _text("base_url", "https://api.openai.com/v1"),
        "api_key_env": _text("api_key_env", "ALPHAX_LLM_API_KEY"),
        "model": _text("model", "gpt-4o-mini"),
        "timeout": int(cfg.get("timeout") or 20),
        "max_tokens": int(cfg.get("max_tokens") or 900),
        "reasoning_effort": _text("reasoning_effort", ""),
        "thinking_enabled": bool(cfg.get("thinking_enabled", False)),
        "modules": cfg.get("modules") or {},
    }
|
|
|
|
|
|
def get_llm_module_enabled(module_name):
    """Return whether the LLM feature *module_name* should run.

    The global ``enabled`` switch must be on. An explicit entry in the
    configured ``modules`` mapping decides next. Otherwise the environment
    variable ``ALPHAX_LLM_<MODULE>_ENABLED`` is consulted, defaulting to True.
    """
    params = get_llm_params()
    if params.get("enabled", False):
        overrides = params.get("modules") or {}
        if module_name in overrides:
            return bool(overrides.get(module_name))
        flag_name = f"ALPHAX_LLM_{str(module_name or '').upper()}_ENABLED"
        return _env_bool(flag_name, True)
    return False
|
|
|
|
|
|
def _dump_json(value):
|
|
return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str)
|
|
|
|
|
|
def _get_target_key(value):
|
|
if value is None:
|
|
return ""
|
|
return str(value)
|
|
|
|
|
|
def _json_fallback(value, fallback=None):
|
|
try:
|
|
return json.loads(value) if isinstance(value, str) else (value if value is not None else fallback)
|
|
except Exception:
|
|
return fallback
|
|
|
|
|
|
def _parse_insight_payload(content):
|
|
if not isinstance(content, dict):
|
|
return {}
|
|
if isinstance(content.get("content"), dict):
|
|
return content["content"]
|
|
return content
|
|
|
|
|
|
def _message_content_text(message: dict) -> str:
|
|
content = (message or {}).get("content")
|
|
if isinstance(content, str):
|
|
return content.strip()
|
|
if isinstance(content, list):
|
|
parts = []
|
|
for item in content:
|
|
if isinstance(item, str):
|
|
parts.append(item)
|
|
elif isinstance(item, dict):
|
|
parts.append(str(item.get("text") or item.get("content") or ""))
|
|
return "\n".join(x for x in parts if x).strip()
|
|
return ""
|
|
|
|
|
|
def _parse_json_object_text(content: str, *, detail=None) -> dict:
|
|
text = str(content or "").strip()
|
|
if not text:
|
|
suffix = f":{_dump_json(detail)}" if detail else ""
|
|
raise ValueError(f"llm_empty_content{suffix}")
|
|
if text.startswith("```"):
|
|
text = text.strip("`").strip()
|
|
if text.lower().startswith("json"):
|
|
text = text[4:].strip()
|
|
try:
|
|
parsed = json.loads(text)
|
|
except json.JSONDecodeError:
|
|
start = text.find("{")
|
|
end = text.rfind("}")
|
|
if start < 0 or end <= start:
|
|
raise
|
|
parsed = json.loads(text[start:end + 1])
|
|
if not isinstance(parsed, dict):
|
|
raise ValueError("llm_output_not_object")
|
|
if not parsed:
|
|
raise ValueError("llm_empty_json_object")
|
|
return parsed
|
|
|
|
|
|
def _chat_completion_body(params, messages, max_tokens, *, response_format=True, temperature=0.2):
|
|
model = str(params.get("model") or "").strip()
|
|
body = {
|
|
"model": model,
|
|
"messages": messages,
|
|
"temperature": temperature,
|
|
"max_tokens": max_tokens,
|
|
}
|
|
if response_format:
|
|
body["response_format"] = {"type": "json_object"}
|
|
reasoning_effort = str(params.get("reasoning_effort") or "").strip()
|
|
if reasoning_effort:
|
|
body["reasoning_effort"] = reasoning_effort
|
|
if bool(params.get("thinking_enabled", False)):
|
|
body["thinking"] = {"type": "enabled"}
|
|
return body
|
|
|
|
|
|
def _chat_completion_request(base_url, api_key, model, messages, max_tokens, timeout, *, response_format=True, params=None, temperature=0.2):
    """POST one chat-completion request and return the raw ``requests`` response.

    *model* is used only when *params* does not already contain a ``model``
    entry. The caller's *params* mapping is copied, never mutated.
    """
    merged_params = dict(params or {})
    if "model" not in merged_params:
        merged_params["model"] = model
    request_body = _chat_completion_body(
        merged_params,
        messages,
        max_tokens,
        response_format=response_format,
        temperature=temperature,
    )
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    return requests.post(
        f"{base_url}/chat/completions",
        headers=headers,
        json=request_body,
        timeout=timeout,
    )
|
|
|
|
|
|
def _parse_llm_response(resp, model):
|
|
if resp.status_code >= 400:
|
|
return {"status": "failed", "error": f"http_{resp.status_code}", "raw": resp.text[:1000], "model": model}
|
|
data = resp.json()
|
|
choice = (data.get("choices") or [{}])[0]
|
|
message = (choice.get("message") or {})
|
|
content = _message_content_text(message)
|
|
detail = {
|
|
"finish_reason": choice.get("finish_reason"),
|
|
"message_keys": sorted(message.keys()) if isinstance(message, dict) else [],
|
|
"usage": data.get("usage") or {},
|
|
}
|
|
parsed = _parse_json_object_text(content, detail=detail)
|
|
return {"status": "success", "content": parsed, "model": model}
|
|
|
|
|
|
def _call_llm_json(prompt_version, payload):
    """Ask the configured LLM for a JSON object about *payload*.

    Never raises; always returns a result dict:
      * ``{"status": "skipped", "error": "llm_disabled_or_missing_key"}`` when
        LLM is disabled or the API-key environment variable is empty;
      * ``{"status": "success", "content": {...}, "model": ...}`` on success,
        plus ``"retry": "without_response_format"`` if the retry succeeded;
      * ``{"status": "failed", "error": ..., "model": ...}`` otherwise (HTTP
        failures also carry ``raw``; unparseable JSON is reported as
        ``invalid_json:...``).

    When the first reply is empty (``llm_empty_content``) or ``{}``
    (``llm_empty_json_object``), the request is retried once without
    ``response_format``. If the empty reply was cut off by the token limit
    (finish_reason ``length``), the retry gets a larger token budget, capped at 8192.
    """
    params = get_llm_params()
    api_key = os.getenv(str(params.get("api_key_env") or "OPENAI_API_KEY"), "").strip()
    if not params.get("enabled", False) or not api_key:
        return {"status": "skipped", "error": "llm_disabled_or_missing_key"}

    base_url = str(params.get("base_url") or "").rstrip("/")
    model = str(params.get("model") or "").strip()
    timeout = int(params.get("timeout") or 20)
    max_tokens = int(params.get("max_tokens") or 900)
    system_prompt = (
        "You are a crypto research assistant. Return strict JSON only. "
        "Do not change trading decisions, scores, or strategy state."
    )
    user_prompt = _dump_json({
        "prompt_version": prompt_version,
        "input": payload,
        "output_schema_hint": "JSON object with concise Chinese fields only",
    })
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    try:
        resp = _chat_completion_request(base_url, api_key, model, messages, max_tokens, timeout, response_format=True, params=params)
        return _parse_llm_response(resp, model)
    except json.JSONDecodeError as exc:
        # JSONDecodeError subclasses ValueError, so it must be caught before the
        # generic ValueError handler below or this branch is never reached.
        return {"status": "failed", "error": f"invalid_json:{exc}", "model": model}
    except ValueError as exc:
        # Some OpenAI-compatible providers occasionally return an empty message
        # when strict JSON response_format is enabled. Retry once with prompt-only
        # JSON enforcement so the job does not fail silently on provider quirks.
        error_text = str(exc)
        if not error_text.startswith(("llm_empty_content", "llm_empty_json_object")):
            return {"status": "failed", "error": error_text[:1000], "model": model}
        retry_messages = [
            {
                "role": "system",
                "content": system_prompt + " Return one valid JSON object. No markdown, no prose.",
            },
            {"role": "user", "content": user_prompt},
        ]
        retry_max_tokens = max_tokens
        if "finish_reason" in error_text and "length" in error_text:
            # The empty reply was truncated by the token limit; give the retry more room.
            retry_max_tokens = min(max(max_tokens * 2, max_tokens + 1200), 8192)
        try:
            resp = _chat_completion_request(base_url, api_key, model, retry_messages, retry_max_tokens, timeout, response_format=False, params=params)
            result = _parse_llm_response(resp, model)
        except Exception as retry_exc:
            return {"status": "failed", "error": f"{exc}; retry_failed:{str(retry_exc)[:800]}", "model": model}
        if result.get("status") == "success":
            result["retry"] = "without_response_format"
        return result
    except Exception as exc:
        return {"status": "failed", "error": str(exc)[:1000], "model": model}
|
|
|
|
|
|
def _should_generate_recommendation(row):
    """Return True when a recommendation row is worth an LLM explanation.

    Selected rows are:
      * actionable or decaying rows: execution status buy_now / wait_pullback /
        invalid, action status 可即刻买入 / 等回踩 / 衰减, or the ``realtime``
        display bucket;
      * ``strong`` observations whose state reason mentions 回踩 / 入场 / 失效;
      * rows whose ``entry_window`` dict has status ``active``;
      * rows whose state reason contains 重点观察.

    ``entry_plan`` may be missing, None or still JSON text. It is decoded when
    it is a string and otherwise treated as an empty plan, rather than
    crashing on ``.get``.
    """
    entry_plan = row.get("entry_plan")
    if isinstance(entry_plan, str):
        entry_plan = _json_fallback(entry_plan, {})
    if not isinstance(entry_plan, dict):
        entry_plan = {}
    action_status = normalize_action_status(
        row.get("action_status") or entry_plan.get("entry_action") or "持有",
        row.get("status") or "active",
    )
    execution_status = str(row.get("execution_status") or "")
    observe_tier = str(row.get("observe_tier") or "")
    state_reason = str(row.get("state_reason") or row.get("execution_reason") or "")
    entry_window = row.get("entry_window") or {}

    if (
        execution_status in ("buy_now", "wait_pullback", "invalid")
        or action_status in ("可即刻买入", "等回踩", "衰减")
        or row.get("display_bucket") == "realtime"
    ):
        return True
    if observe_tier == "strong" and any(word in state_reason for word in ("回踩", "入场", "失效")):
        return True
    if isinstance(entry_window, dict) and entry_window.get("status") == "active":
        return True
    return "重点观察" in state_reason
|
|
|
|
|
|
def _should_generate_sentiment(row):
|
|
importance = str(row.get("importance") or "").upper()
|
|
source = str(row.get("source") or "").lower()
|
|
title = str(row.get("title") or "")
|
|
if importance in ("A", "S", "RISK"):
|
|
return True
|
|
if "binance" in source:
|
|
return True
|
|
if any(k in title.lower() for k in ("listing", "launch", "mainnet", "upgrade", "partnership", "hack", "exploit", "burn", "合约", "上币", "主网", "升级", "合作", "黑客", "漏洞")):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _is_internal_sentiment_event(row):
|
|
event_type = str(row.get("event_type") or "")
|
|
title = str(row.get("title") or "")
|
|
source = str(row.get("source") or "")
|
|
return (
|
|
event_type in ("market_heat", "theme_expansion", "theme_direct", "llm_sentiment_candidate")
|
|
or source == "llm_sentiment"
|
|
or title.startswith("[主题扩散:")
|
|
)
|
|
|
|
|
|
def _should_generate_review(item):
|
|
metrics = item.get("metrics") or {}
|
|
release_decision = str(item.get("release_decision") or "")
|
|
failure_count = int(metrics.get("fail_count") or 0)
|
|
hit_count = int(metrics.get("hit_count") or 0)
|
|
pollution = item.get("pollution_summary") or {}
|
|
if release_decision in ("gray", "release", "hold"):
|
|
return True
|
|
if failure_count > 0 or hit_count > 0:
|
|
return True
|
|
if int(pollution.get("contaminated_symbol_count") or 0) > 0:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _build_recommendation_payload(row):
|
|
entry_plan = row.get("entry_plan") or _json_fallback(row.get("entry_plan_json"), {}) or {}
|
|
signals = row.get("signals") or _json_fallback(row.get("signals"), []) or []
|
|
if isinstance(signals, str):
|
|
signals = _json_fallback(signals, []) or []
|
|
return {
|
|
"target_type": "recommendation",
|
|
"target_id": row.get("id"),
|
|
"symbol": row.get("symbol"),
|
|
"rec_time": row.get("rec_time"),
|
|
"status": row.get("status"),
|
|
"action_status": row.get("action_status"),
|
|
"execution_status": row.get("execution_status"),
|
|
"execution_label": row.get("execution_label"),
|
|
"execution_reason": row.get("execution_reason"),
|
|
"rec_score": row.get("rec_score"),
|
|
"entry_price": row.get("entry_price"),
|
|
"current_price": row.get("current_price"),
|
|
"stop_loss": row.get("stop_loss"),
|
|
"tp1": row.get("tp1"),
|
|
"tp2": row.get("tp2"),
|
|
"observe_tier": row.get("observe_tier"),
|
|
"observe_reason": row.get("observe_reason"),
|
|
"state_reason": row.get("state_reason"),
|
|
"entry_window": row.get("entry_window"),
|
|
"market_context": row.get("market_context"),
|
|
"derivatives_context": row.get("derivatives_context"),
|
|
"sector_context": row.get("sector_context"),
|
|
"entry_plan": entry_plan,
|
|
"signals": signals,
|
|
}
|
|
|
|
|
|
def _build_sentiment_payload(row):
|
|
return {
|
|
"target_type": "sentiment",
|
|
"target_id": row.get("event_id") or row.get("id"),
|
|
"source": row.get("source"),
|
|
"source_label": row.get("source_label"),
|
|
"event_type": row.get("event_type"),
|
|
"importance": row.get("importance"),
|
|
"title": row.get("title"),
|
|
"related_symbol": row.get("related_symbol"),
|
|
"related_base": row.get("related_base"),
|
|
"decision": row.get("decision"),
|
|
"tech_score": row.get("tech_score"),
|
|
"published_at": row.get("published_at"),
|
|
"detected_at": row.get("detected_at"),
|
|
"relation_tag": row.get("relation_tag"),
|
|
"in_active": row.get("in_active"),
|
|
"in_screened": row.get("in_screened"),
|
|
}
|
|
|
|
|
|
def _build_sentiment_batch_payload(hours=24, limit=40):
    """Collect recent news and trending-coin events for one batch LLM analysis.

    Sources (each query is best-effort; a failure contributes no rows):
      * ``event_news`` rows detected within the last *hours*, newest first, at
        most *limit*, with internally generated events removed;
      * up to 3 news items for each of the top-20 coins in the latest
        CoinGecko trending snapshot stored in ``sentiment_events``.

    Events are de-duplicated on (lower-cased title, base symbol, source) and
    capped at *limit*. The DB connection is always closed, even on error.

    Returns:
        dict with ``target_type``, ``target_id``, ``hours``, ``generated_at``,
        ``event_count``, ``events`` and the prompt ``instructions``.
    """
    hours = int(hours or 24)
    limit = int(limit or 40)
    conn = get_conn()
    try:
        try:
            news_rows = conn.execute(
                """
                SELECT id, source, symbol, title, url, published_at, detected_at, importance,
                       event_type, decision, tech_score, rec_id, pushed
                FROM event_news
                WHERE detected_at >= %s
                ORDER BY published_at::timestamp DESC, id DESC
                LIMIT %s
                """,
                ((datetime.now() - timedelta(hours=hours)).isoformat(), limit),
            ).fetchall()
        except Exception:
            news_rows = []
        try:
            # Restrict the outer query to CoinGecko too, so rows from other
            # sources that share the same detected_at are not mixed in.
            trend_rows = conn.execute(
                """
                SELECT id, symbol, name, trend_rank, trend_score, market_cap_rank, detected_at, extra_json
                FROM sentiment_events
                WHERE source='coingecko'
                  AND detected_at = (SELECT MAX(detected_at) FROM sentiment_events WHERE source='coingecko')
                ORDER BY trend_rank
                LIMIT 20
                """
            ).fetchall()
        except Exception:
            trend_rows = []
    finally:
        conn.close()

    events = []
    for raw in news_rows:
        row = {
            "event_id": f"event_news:{raw[0]}",
            "source": raw[1],
            "related_symbol": raw[2],
            "related_base": (str(raw[2] or "").split("/")[0] or "").upper(),
            "title": raw[3],
            "url": raw[4],
            "published_at": raw[5],
            "detected_at": raw[6],
            "importance": raw[7],
            "event_type": raw[8],
            "decision": raw[9],
            "tech_score": raw[10],
            "rec_id": raw[11],
            "pushed": bool(raw[12]),
        }
        if not _is_internal_sentiment_event(row):
            events.append(row)

    trend_news = []
    for raw in trend_rows:
        extra = _json_fallback(raw[7], {}) or {}
        # extra_json is expected to be an object with a "news" list; skip anything else.
        news_items = extra.get("news") if isinstance(extra, dict) else None
        if not isinstance(news_items, (list, tuple)):
            continue
        base = str(raw[1] or "").upper()
        for news in news_items[:3]:
            if not isinstance(news, dict):
                continue
            title = news.get("title") or ""
            if not title:
                continue
            trend_news.append({
                "event_id": f"sentiment_event:{raw[0]}:{news.get('url') or title}",
                "source": news.get("source") or "news",
                "related_symbol": f"{base}/USDT",
                "related_base": base,
                "related_name": raw[2] or raw[1],
                "title": str(title)[:180],
                "url": news.get("url") or "",
                "published_at": news.get("published") or "",
                "detected_at": raw[6],
                "importance": "B",
                "event_type": "news",
                "trend_rank": raw[3],
                "trend_score": raw[4],
                "market_cap_rank": raw[5],
                "price_usd": extra.get("price_usd", 0),
                "change_24h_pct": extra.get("change_24h_pct", 0),
            })

    seen = set()
    deduped = []
    for item in events + trend_news:
        key = ((item.get("title") or "").strip().lower(), item.get("related_base"), item.get("source"))
        if key in seen:
            continue
        seen.add(key)
        deduped.append(item)
    deduped = deduped[:limit]

    return {
        "target_type": "sentiment_batch",
        "target_id": f"sentiment_batch:{hours}h",
        "hours": hours,
        "generated_at": datetime.now().isoformat(),
        "event_count": len(deduped),
        "events": deduped,
        "instructions": {
            "role": "作为加密市场舆情分析师,判断这些新闻对山寨币行情的影响。",
            "focus": [
                "归纳主线叙事和受影响币种",
                "区分利好、利空、风险和噪音",
                "给出可信度和短线影响窗口",
                "指出哪些币种需要触发技术检查",
                "不要给买卖指令,只做舆情影响分析",
            ],
            "expected_schema": {
                "market_mood": "risk_on|neutral|risk_off",
                "summary": "中文摘要",
                "hot_themes": [{"theme": "", "impact": "", "symbols": [], "confidence": 0}],
                "coin_impacts": [{"symbol": "", "direction": "positive|negative|risk|neutral", "reason": "", "confidence": 0, "need_technical_check": False}],
                "risk_events": [{"title": "", "symbols": [], "risk_type": "", "severity": "low|medium|high"}],
                "watchlist": [{"symbol": "", "why": "", "trigger": ""}],
            },
        },
    }
|
|
|
|
|
|
def _sentiment_event_rank(item):
|
|
importance_weight = {"S": 100, "A": 80, "RISK": 75, "B": 50, "C": 20}
|
|
score = importance_weight.get(str(item.get("importance") or "").upper(), 30)
|
|
if item.get("rec_id"):
|
|
score += 18
|
|
if item.get("pushed"):
|
|
score += 10
|
|
if item.get("related_symbol") or item.get("related_base"):
|
|
score += 8
|
|
if item.get("tech_score") not in (None, ""):
|
|
try:
|
|
score += min(max(float(item.get("tech_score") or 0), 0), 100) / 10
|
|
except Exception:
|
|
pass
|
|
event_type = str(item.get("event_type") or "").lower()
|
|
title = str(item.get("title") or "").lower()
|
|
if any(k in event_type or k in title for k in ("listing", "launch", "hack", "exploit", "upgrade", "partnership", "上币", "主网", "黑客", "漏洞", "升级", "合作")):
|
|
score += 12
|
|
if item.get("trend_rank"):
|
|
try:
|
|
score += max(0, 20 - float(item.get("trend_rank") or 0))
|
|
except Exception:
|
|
pass
|
|
return score
|
|
|
|
|
|
def _compact_sentiment_batch_payload(payload, max_events=None):
    """Shrink noisy news/event rows before sending them to the LLM.

    Processing steps:
      1. Rank events with ``_sentiment_event_rank``, highest first.
      2. De-duplicate on (first 96 chars of the whitespace-normalized
         lower-case title, upper-cased symbol).
      3. Keep at most *max_events*. The default comes from the env var
         ``ALPHAX_LLM_SENTIMENT_BATCH_MAX_EVENTS``, falling back to 40; the
         limit is never below 1.
      4. Reduce each event to the fields the prompt needs.

    A None *payload* is treated as an empty payload instead of crashing.
    """
    payload = payload or {}
    max_events = int(max_events or _env_int("ALPHAX_LLM_SENTIMENT_BATCH_MAX_EVENTS", 40) or 40)
    cap = max(max_events, 1)
    events = list(payload.get("events") or [])
    ranked = sorted(events, key=_sentiment_event_rank, reverse=True)

    selected = []
    seen = set()
    for item in ranked:
        title_key = " ".join(str(item.get("title") or "").lower().split())[:96]
        symbol_key = str(item.get("related_symbol") or item.get("symbol") or item.get("related_base") or "").upper()
        key = (title_key, symbol_key)
        if key in seen:
            continue
        seen.add(key)
        selected.append(item)
        if len(selected) >= cap:
            break

    compact_events = [
        {
            "rank": idx,
            "event_id": item.get("event_id"),
            "source": item.get("source"),
            "symbol": item.get("related_symbol") or item.get("symbol"),
            "base": item.get("related_base"),
            "name": item.get("related_name"),
            "title": str(item.get("title") or "")[:220],
            "published_at": item.get("published_at"),
            "detected_at": item.get("detected_at"),
            "importance": item.get("importance"),
            "event_type": item.get("event_type"),
            "decision": item.get("decision"),
            "tech_score": item.get("tech_score"),
            "rec_id": item.get("rec_id"),
            "pushed": bool(item.get("pushed", False)),
            "trend_rank": item.get("trend_rank"),
            "trend_score": item.get("trend_score"),
            "market_cap_rank": item.get("market_cap_rank"),
            "change_24h_pct": item.get("change_24h_pct"),
        }
        for idx, item in enumerate(selected, start=1)
    ]

    return {
        "target_type": payload.get("target_type"),
        "target_id": payload.get("target_id"),
        "hours": payload.get("hours"),
        "generated_at": payload.get("generated_at"),
        "source_event_count": len(events),
        "event_count": len(compact_events),
        "events": compact_events,
        "instructions": {
            "role": "作为加密市场舆情分析师,基于精简事件判断短线市场影响。",
            "focus": [
                "综合更多新闻和事件,提炼当前 24h 市场主线、叙事扩散和风险偏好",
                "给出受影响币种、方向、理由、置信度、影响窗口和是否值得技术检查",
                "区分已经反映在价格里的旧消息、可继续观察的催化、以及需要规避的风险",
                "建议要专业、可执行,但不要直接下真实买卖指令",
                "标记是否需要进入技术检查",
                "如果没有有效事件,也必须返回空数组和 neutral 结论",
            ],
            "expected_schema": {
                "market_mood": "risk_on|neutral|risk_off",
                "summary": "中文摘要",
                "professional_view": "更专业的市场解读,说明主线、资金偏好、潜在误判点",
                "hot_themes": [{"theme": "", "impact": "", "symbols": [], "confidence": 0}],
                "coin_impacts": [{"symbol": "", "direction": "positive|negative|risk|neutral", "reason": "", "confidence": 0, "impact_window": "intraday|1-3d|3-7d", "need_technical_check": False}],
                "risk_events": [{"title": "", "symbols": [], "risk_type": "", "severity": "low|medium|high"}],
                "watchlist": [{"symbol": "", "why": "", "trigger": "", "invalid_if": ""}],
                "suggestions": [{"type": "watch|avoid|verify", "symbol": "", "reason": "", "next_step": ""}],
            },
        },
    }
|
|
|
|
|
|
def _build_review_payload(item):
|
|
return {
|
|
"target_type": "review",
|
|
"target_id": item.get("id") or item.get("created_at") or item.get("run_date"),
|
|
"run_date": item.get("run_date"),
|
|
"created_at": item.get("created_at"),
|
|
"title": item.get("title"),
|
|
"summary": item.get("summary"),
|
|
"metrics": item.get("metrics") or {},
|
|
"findings": item.get("findings") or [],
|
|
"problems": item.get("problems") or [],
|
|
"actions": item.get("actions") or [],
|
|
"candidate_rules": item.get("candidate_rules") or [],
|
|
"success_analysis": item.get("success_analysis") or {},
|
|
"failure_analysis": item.get("failure_analysis") or {},
|
|
"pollution_summary": item.get("pollution_summary") or {},
|
|
"version_change_summary": item.get("version_change_summary") or "",
|
|
}
|
|
|
|
|
|
def generate_recommendation_insights(limit=30):
    """Generate cached LLM explanations for active recommendations.

    Processing steps:
      1. Scan active, non-history recommendations, newest first.
      2. Keep those accepted by ``_should_generate_recommendation``,
         de-duplicated by id and capped at *limit* (when truthy).
      3. Call the LLM for each one whose input hash has no stored insight.
    Every LLM attempt, successful or not, is persisted via ``upsert_insight``.

    Returns:
        ``{"status": "skipped", ...}`` when the module is disabled, otherwise
        ``{"status": "success", "processed": <LLM calls>, "scanned": <selected rows>}``.
    """
    if not get_llm_module_enabled("recommendations"):
        return {"status": "skipped", "reason": "module_disabled", "processed": 0}
    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT r.*,
                   lpc.price AS latest_cache_price,
                   lpc.updated_at AS latest_cache_updated_at
            FROM recommendation r
            LEFT JOIN latest_price_cache lpc ON lpc.symbol = r.symbol
            WHERE r.status='active' AND COALESCE(r.display_bucket,'watch_pool') != 'history'
            ORDER BY r.rec_time DESC
            """
        ).fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    items = []
    seen = set()
    for row in rows:
        item = _derive_execution_fields(dict(row))
        if not _should_generate_recommendation(item):
            continue
        item_key = str(item.get("id"))
        if item_key in seen:
            continue
        seen.add(item_key)
        items.append(item)
        if limit and len(items) >= int(limit):
            break

    prompt = PROMPTS["recommendation_explain_v1"]
    processed = 0
    for row in items:
        payload = _build_recommendation_payload(row)
        input_hash = compute_input_hash(payload)
        if get_any_insight("recommendation", payload["target_id"], prompt, input_hash):
            continue
        result = _call_llm_json(prompt, payload)
        upsert_insight(
            "recommendation",
            payload["target_id"],
            prompt,
            prompt,
            input_hash,
            result.get("status") or "failed",
            input_payload=payload,
            content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")},
            error=result.get("error", ""),
            model=result.get("model", ""),
        )
        processed += 1
    return {"status": "success", "processed": processed, "scanned": len(items)}
|
|
|
|
|
|
def generate_sentiment_insights(limit=30):
    """Generate per-event LLM explanations for recent news events.

    Processing steps:
      1. Read the 120 newest ``event_news`` rows; a query failure yields no rows.
      2. Keep the rows accepted by ``_should_generate_sentiment``.
      3. Call the LLM for each one whose input hash has no stored insight.
    Processing stops after *limit* LLM calls (when *limit* is truthy). Each
    attempt is persisted via ``upsert_insight``.
    """
    if not get_llm_module_enabled("sentiment"):
        return {"status": "skipped", "reason": "module_disabled", "processed": 0}
    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT id AS event_id, source, symbol, title, url, published_at, detected_at, importance,
                   event_type, decision, tech_score, rec_id, pushed
            FROM event_news
            ORDER BY published_at::timestamp DESC, id DESC
            LIMIT 120
            """
        ).fetchall()
    except Exception:
        rows = []
    finally:
        conn.close()

    prompt = PROMPTS["sentiment_explain_v1"]
    done = 0
    for record in rows:
        source, symbol = record[1], record[2]
        event = {
            "event_id": f"event_news:{record[0]}",
            "source": source,
            "symbol": symbol,
            "title": record[3],
            "published_at": record[5],
            "detected_at": record[6],
            "importance": record[7],
            "event_type": record[8],
            "decision": record[9],
            "tech_score": record[10],
            "rec_id": record[11],
            "pushed": record[12],
            "source_label": "Binance公告" if "binance" in str(source).lower() else str(source or ""),
            "related_symbol": symbol,
            "related_base": (str(symbol or "").split("/")[0] or "").upper(),
            "in_active": False,
            "in_screened": False,
            "relation_tag": "",
        }
        if not _should_generate_sentiment(event):
            continue
        payload = _build_sentiment_payload(event)
        input_hash = compute_input_hash(payload)
        if get_any_insight("sentiment", payload["target_id"], prompt, input_hash):
            continue
        outcome = _call_llm_json(prompt, payload)
        succeeded = outcome.get("status") == "success"
        upsert_insight(
            "sentiment",
            payload["target_id"],
            prompt,
            prompt,
            input_hash,
            outcome.get("status") or "failed",
            input_payload=payload,
            content=outcome.get("content") if succeeded else {"raw": outcome.get("raw", "")},
            error=outcome.get("error", ""),
            model=outcome.get("model", ""),
        )
        done += 1
        if limit and done >= int(limit):
            break
    return {"status": "success", "processed": done}
|
|
|
|
|
|
def _enqueue_sentiment_candidates(content, source_insight_id):
    """Best-effort hand-off of LLM sentiment output to the event-driven screener.

    Returns the screener's result. If the import or the call fails, returns an
    empty summary carrying the truncated error text instead of raising.
    """
    try:
        from app.services.event_driven_screener import enqueue_llm_sentiment_candidates

        return enqueue_llm_sentiment_candidates(content or {}, source_insight_id=source_insight_id)
    except Exception as exc:
        return {"queued": 0, "skipped": 0, "symbols": [], "error": str(exc)[:300]}


def generate_sentiment_batch_analysis(limit=40, hours=24):
    """Run (or reuse) one LLM market-sentiment analysis over recent events.

    Builds and compacts the batch payload, then:
      * if a successful insight is already stored for the same input hash,
        re-enqueues its candidates and returns ``cached=True`` without an
        LLM call;
      * otherwise calls the LLM, enqueues candidates when it succeeded, and
        persists the attempt via ``upsert_insight``. A previously stored
        failure is retried rather than reused.

    Returns:
        ``{"status": "skipped", ...}`` when the module is disabled or no events
        exist. Otherwise ``{"status": "success", "processed": 0|1, ...}``, with
        ``llm_status`` reporting the LLM outcome and ``candidate_events``
        holding the screener hand-off result.
    """
    if not get_llm_module_enabled("sentiment"):
        return {"status": "skipped", "reason": "module_disabled", "processed": 0}
    source_payload = _build_sentiment_batch_payload(hours=hours, limit=limit)
    if not source_payload.get("events"):
        return {"status": "skipped", "reason": "no_sentiment_events", "processed": 0}

    payload = _compact_sentiment_batch_payload(source_payload)
    prompt = PROMPTS["sentiment_batch_analyze_v1"]
    # ``generated_at`` is a wall-clock timestamp. Leave it out of the hash so
    # that an identical event set maps to the same cache key across runs.
    input_hash = compute_input_hash({k: v for k, v in payload.items() if k != "generated_at"})

    cached = get_any_insight("sentiment_batch", payload["target_id"], prompt, input_hash)
    if cached and cached.get("status") == "success":
        candidate_result = _enqueue_sentiment_candidates(
            cached.get("content"),
            str(cached.get("id") or input_hash),
        )
        return {
            "status": "success",
            "processed": 0,
            "cached": True,
            "llm_status": "success",
            "event_count": payload.get("event_count", 0),
            "candidate_events": candidate_result,
        }

    result = _call_llm_json(prompt, payload)
    succeeded = result.get("status") == "success"
    candidate_result = {"queued": 0, "skipped": 0, "symbols": []}
    if succeeded:
        candidate_result = _enqueue_sentiment_candidates(result.get("content"), input_hash)
    upsert_insight(
        "sentiment_batch",
        payload["target_id"],
        prompt,
        prompt,
        input_hash,
        result.get("status") or "failed",
        input_payload=payload,
        content=result.get("content") if succeeded else {"raw": result.get("raw", "")},
        error=result.get("error", ""),
        model=result.get("model", ""),
    )
    return {
        "status": "success",
        "processed": 1,
        "llm_status": result.get("status") or "failed",
        "event_count": payload.get("event_count", 0),
        "candidate_events": candidate_result,
    }
|
|
|
|
|
|
def generate_review_memos(limit=10):
    """Generate LLM memos for recent strategy-iteration review logs.

    Processing steps:
      1. Fetch up to ``max(limit or 10, 1)`` logs.
      2. Keep the logs accepted by ``_should_generate_review``.
      3. Call the LLM for each one whose input hash has no stored insight.
    Every attempt is persisted via ``upsert_insight``.
    """
    if not get_llm_module_enabled("review"):
        return {"status": "skipped", "reason": "module_disabled", "processed": 0}
    from app.db.review_queries import get_strategy_iteration_logs

    prompt = PROMPTS["review_memo_v1"]
    memo_count = 0
    for log_entry in get_strategy_iteration_logs(limit=max(limit or 10, 1)):
        if not _should_generate_review(log_entry):
            continue
        payload = _build_review_payload(log_entry)
        input_hash = compute_input_hash(payload)
        if get_any_insight("review", payload["target_id"], prompt, input_hash):
            continue
        outcome = _call_llm_json(prompt, payload)
        ok = outcome.get("status") == "success"
        upsert_insight(
            "review",
            payload["target_id"],
            prompt,
            prompt,
            input_hash,
            outcome.get("status") or "failed",
            input_payload=payload,
            content=outcome.get("content") if ok else {"raw": outcome.get("raw", "")},
            error=outcome.get("error", ""),
            model=outcome.get("model", ""),
        )
        memo_count += 1
    return {"status": "success", "processed": memo_count}
|
|
|
|
|
|
def run(scope="recommendations", limit=30):
    """Dispatch an LLM job by *scope* name.

    Scopes: ``recommendations``, ``sentiment`` (batch analysis),
    ``sentiment-events`` (per-event explanations) and ``review``. Surrounding
    whitespace in *scope* is ignored.

    Raises:
        ValueError: for an unknown scope.
    """
    scope = str(scope or "").strip()
    jobs = {
        "recommendations": lambda: generate_recommendation_insights(limit=limit),
        "sentiment": lambda: generate_sentiment_batch_analysis(limit=limit),
        "sentiment-events": lambda: generate_sentiment_insights(limit=limit),
        "review": lambda: generate_review_memos(limit=limit),
    }
    job = jobs.get(scope)
    if job is None:
        raise ValueError(f"unknown llm scope: {scope}")
    return job()
|
|
|
|
|
|
def attach_recommendation_insights(items):
    """Attach stored recommendation insights to *items* in place.

    Each item whose id has a stored insight gets it under ``llm_insight``.
    Items without an ``id`` are left out of the lookup. Returns the same
    *items* object, which may be None.
    """
    entries = items or []
    ids = [str(entry.get("id")) for entry in entries if entry.get("id") is not None]
    found = get_insights_for_targets("recommendation", ids, PROMPTS["recommendation_explain_v1"])
    for entry in entries:
        match = found.get(str(entry.get("id")))
        if match:
            entry["llm_insight"] = match
    return items
|
|
|
|
|
|
def attach_sentiment_insights(items):
    """Attach stored per-event sentiment insights to *items* in place.

    Items are keyed by ``event_id``, falling back to ``id``. Matches are stored
    under ``llm_insight``. Returns the same *items* object, which may be None.
    """
    entries = items or []

    def _key(entry):
        return entry.get("event_id") or entry.get("id")

    ids = [str(_key(entry)) for entry in entries if _key(entry) is not None]
    found = get_insights_for_targets("sentiment", ids, PROMPTS["sentiment_explain_v1"])
    for entry in entries:
        match = found.get(str(_key(entry)))
        if match:
            entry["llm_insight"] = match
    return items
|
|
|
|
|
|
def get_latest_review_memo():
    """Return the newest stored review-memo insight, as given by ``get_latest_insight_by_type``."""
    prompt_version = PROMPTS["review_memo_v1"]
    return get_latest_insight_by_type("review", prompt_version)
|
|
|
|
|
|
def get_latest_sentiment_batch_analysis():
    """Return the newest stored sentiment-batch insight, using the lookup's default filtering."""
    prompt_version = PROMPTS["sentiment_batch_analyze_v1"]
    return get_latest_insight_by_type("sentiment_batch", prompt_version)
|
|
|
|
|
|
def get_latest_sentiment_batch_attempt():
    """Return the newest sentiment-batch insight with ``success_only=False``, i.e. not limited to successful runs."""
    prompt_version = PROMPTS["sentiment_batch_analyze_v1"]
    return get_latest_insight_by_type("sentiment_batch", prompt_version, success_only=False)
|
|
|
|
|
|
# Public API of this module. The config helpers get_llm_params and
# get_llm_module_enabled are public (no leading underscore) and are exported too.
__all__ = [
    "PROMPTS",
    "attach_recommendation_insights",
    "attach_sentiment_insights",
    "generate_recommendation_insights",
    "generate_review_memos",
    "generate_sentiment_batch_analysis",
    "generate_sentiment_insights",
    "get_latest_sentiment_batch_analysis",
    "get_latest_sentiment_batch_attempt",
    "get_latest_review_memo",
    "get_llm_module_enabled",
    "get_llm_params",
    "run",
]
|