761 lines
30 KiB
Python
761 lines
30 KiB
Python
"""Conversational crypto research assistant.
|
||
|
||
The assistant is read-only: it can inspect AlphaX data and Binance OHLCV, but
|
||
it never mutates recommendations, strategy state, or trading state.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import math
|
||
import os
|
||
import re
|
||
from datetime import datetime, timedelta
|
||
import ccxt
|
||
import pandas as pd
|
||
import requests
|
||
|
||
from app.config.system_config import llm_config
|
||
from app.core.pa_engine import calc_atr, full_pa_analysis
|
||
from app.db import chat_assistant_db
|
||
from app.db.analytics import get_pipeline_runs
|
||
from app.db.llm_insights import compute_input_hash, repair_mojibake_json, repair_mojibake_text
|
||
from app.db.onchain_db import get_onchain_token_detail, get_onchain_overview
|
||
from app.db.schema import get_conn
|
||
from app.services.llm_insights import get_llm_params
|
||
from app.services.market_overview import get_crypto_market_overview
|
||
|
||
|
||
exchange = ccxt.binance({"enableRateLimit": True})
|
||
|
||
CRYPTO_TERMS = {
|
||
"btc", "eth", "bnb", "sol", "xrp", "doge", "ada", "sui", "link", "qnt",
|
||
"币", "加密", "crypto", "usdt", "binance", "行情", "链上", "舆情", "推荐", "复盘",
|
||
"k线", "k 线", "技术面", "止盈", "止损", "山寨",
|
||
}
|
||
|
||
INTENT_LABELS = {
|
||
"coin_analysis": "单币分析",
|
||
"market_overview": "市场问答",
|
||
"recommendation_explain": "推荐解释",
|
||
"sentiment": "舆情解读",
|
||
"onchain": "链上异动",
|
||
"review": "复盘查询",
|
||
"restricted": "受限内容",
|
||
"help": "帮助",
|
||
"unsupported": "范围外",
|
||
}
|
||
|
||
TIMEFRAMES = ("15m", "1h", "4h", "1d")
|
||
|
||
|
||
def _now() -> str:
|
||
return datetime.now().isoformat(timespec="seconds")
|
||
|
||
|
||
def _safe_float(value, default=0.0) -> float:
|
||
try:
|
||
if value is None or value == "":
|
||
return default
|
||
return float(value)
|
||
except Exception:
|
||
return default
|
||
|
||
|
||
def _safe_int(value, default=0) -> int:
|
||
try:
|
||
return int(value or 0)
|
||
except Exception:
|
||
return default
|
||
|
||
|
||
def _json(value):
|
||
return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str)
|
||
|
||
|
||
def _compact_json(value, max_len=18000):
|
||
text = _json(value)
|
||
if len(text) <= max_len:
|
||
return text
|
||
return text[:max_len] + "...[truncated]"
|
||
|
||
|
||
def _normalize_symbol(value: str) -> str:
|
||
text = str(value or "").strip().upper()
|
||
text = text.replace(" ", "")
|
||
if not text:
|
||
return ""
|
||
if "/" in text:
|
||
base, quote = text.split("/", 1)
|
||
quote = quote or "USDT"
|
||
return f"{base}/USDT" if quote in ("USD", "USDT", "BUSD", "FDUSD") else f"{base}/{quote}"
|
||
text = re.sub(r"[^A-Z0-9]", "", text)
|
||
if text.endswith("USDT") and len(text) > 4:
|
||
text = text[:-4]
|
||
return f"{text}/USDT" if text else ""
|
||
|
||
|
||
def extract_symbol(message: str, session=None, preferences=None) -> str:
|
||
text = str(message or "")
|
||
patterns = [
|
||
r"\b([A-Z0-9]{2,15})\s*/\s*USDT\b",
|
||
r"\b([A-Z0-9]{2,15})USDT\b",
|
||
r"\$([A-Z][A-Z0-9]{1,12})\b",
|
||
]
|
||
for pat in patterns:
|
||
m = re.search(pat, text, flags=re.I)
|
||
if m:
|
||
return _normalize_symbol(m.group(1))
|
||
|
||
upper_tokens = re.findall(r"(?<![A-Za-z0-9])([A-Z][A-Z0-9]{1,12})(?![A-Za-z0-9])", text)
|
||
skip = {"AI", "API", "LLM", "K", "PA", "TP", "SL", "USDT", "USD", "BTCUSDT", "ETHUSDT"}
|
||
for token in upper_tokens:
|
||
if token.upper() not in skip:
|
||
return _normalize_symbol(token)
|
||
|
||
for source in (session or {}, preferences or {}):
|
||
value = source.get("last_symbol") if isinstance(source, dict) else ""
|
||
if value:
|
||
return _normalize_symbol(value)
|
||
preferred = (preferences or {}).get("preferred_symbols") or []
|
||
return _normalize_symbol(preferred[-1]) if preferred else ""
|
||
|
||
|
||
def detect_intent(message: str, symbol: str = "") -> str:
|
||
text = str(message or "").lower()
|
||
if any(k in text for k in ("策略交易", "模拟交易", "纸面交易", "paper trading", "paper-trading", "paper")):
|
||
return "restricted"
|
||
if not _is_crypto_question(text, symbol):
|
||
return "unsupported"
|
||
if any(k in text for k in ("怎么用", "能做什么", "帮助", "help", "问什么")):
|
||
return "help"
|
||
if any(k in text for k in ("链上", "onchain", "鲸鱼", "转账", "dex", "流动性", "合约")):
|
||
return "onchain"
|
||
if any(k in text for k in ("舆情", "新闻", "消息", "情绪", "热点", "叙事", "ai 舆情")):
|
||
return "sentiment"
|
||
if any(k in text for k in ("复盘", "历史", "亏损", "失败", "胜率", "漏选")):
|
||
return "review"
|
||
if any(k in text for k in ("推荐", "看板", "为什么", "等回踩", "可买", "观察", "信号")) and symbol:
|
||
return "recommendation_explain"
|
||
if any(k in text for k in ("市场", "大盘", "全市场", "资金费率", "涨幅榜", "广度")) and not symbol:
|
||
return "market_overview"
|
||
if symbol:
|
||
return "coin_analysis"
|
||
return "market_overview"
|
||
|
||
|
||
def _is_crypto_question(text: str, symbol: str = "") -> bool:
|
||
if symbol:
|
||
return True
|
||
return any(term in text for term in CRYPTO_TERMS)
|
||
|
||
|
||
def _binance_symbol_id(symbol: str) -> str:
|
||
return _normalize_symbol(symbol).replace("/", "")
|
||
|
||
|
||
def fetch_binance_klines(symbol: str, timeframe: str, limit: int = 160):
|
||
symbol = _normalize_symbol(symbol)
|
||
try:
|
||
ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
|
||
except Exception:
|
||
return None
|
||
if not ohlcv:
|
||
return None
|
||
df = pd.DataFrame(ohlcv, columns=["timestamp", "open", "high", "low", "close", "volume"])
|
||
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
|
||
return df
|
||
|
||
|
||
def _ma(series, n):
|
||
if series is None or len(series) < n:
|
||
return 0.0
|
||
value = series.rolling(n).mean().iloc[-1]
|
||
return 0.0 if pd.isna(value) else float(value)
|
||
|
||
|
||
def _rsi(closes, period=14):
|
||
if closes is None or len(closes) < period + 2:
|
||
return 0.0
|
||
delta = closes.diff()
|
||
gain = delta.clip(lower=0).rolling(period).mean()
|
||
loss = (-delta.clip(upper=0)).rolling(period).mean()
|
||
rs = gain / loss.replace(0, math.nan)
|
||
value = 100 - (100 / (1 + rs.iloc[-1]))
|
||
return 50.0 if pd.isna(value) else round(float(value), 2)
|
||
|
||
|
||
def _support_resistance(df):
|
||
if df is None or len(df) < 20:
|
||
return {"support": 0, "resistance": 0, "range_position_pct": 0}
|
||
recent = df.tail(48 if len(df) >= 48 else len(df))
|
||
support = float(recent["low"].min())
|
||
resistance = float(recent["high"].max())
|
||
price = float(df["close"].iloc[-1])
|
||
position = (price - support) / (resistance - support) * 100 if resistance > support else 50
|
||
return {
|
||
"support": round(support, 8),
|
||
"resistance": round(resistance, 8),
|
||
"range_position_pct": round(max(0, min(100, position)), 2),
|
||
}
|
||
|
||
|
||
def _technical_summary_for_df(df, timeframe: str) -> dict:
|
||
if df is None or len(df) < 30:
|
||
return {"timeframe": timeframe, "available": False, "reason": "kline_unavailable"}
|
||
closes = df["close"].astype(float)
|
||
volumes = df["volume"].astype(float)
|
||
price = float(closes.iloc[-1])
|
||
prev = float(closes.iloc[-2]) if len(closes) >= 2 else price
|
||
change = ((price / prev) - 1) * 100 if prev > 0 else 0
|
||
change_window = ((price / float(closes.iloc[0])) - 1) * 100 if float(closes.iloc[0]) > 0 else 0
|
||
ma20 = _ma(closes, 20)
|
||
ma60 = _ma(closes, 60)
|
||
avg_vol20 = _ma(volumes, 20)
|
||
vol_ratio = float(volumes.iloc[-1]) / avg_vol20 if avg_vol20 > 0 else 0
|
||
atr = calc_atr(df, 14)
|
||
pa = full_pa_analysis(df, timeframe)
|
||
sr = _support_resistance(df)
|
||
last_candle = {
|
||
"open": round(float(df["open"].iloc[-1]), 8),
|
||
"high": round(float(df["high"].iloc[-1]), 8),
|
||
"low": round(float(df["low"].iloc[-1]), 8),
|
||
"close": round(price, 8),
|
||
"volume": round(float(volumes.iloc[-1]), 4),
|
||
"is_bullish": price >= float(df["open"].iloc[-1]),
|
||
}
|
||
trend = "sideways"
|
||
if ma20 > 0 and ma60 > 0 and price > ma20 > ma60:
|
||
trend = "uptrend"
|
||
elif ma20 > 0 and ma60 > 0 and price < ma20 < ma60:
|
||
trend = "downtrend"
|
||
elif ma20 > 0 and price > ma20:
|
||
trend = "rebound"
|
||
elif ma20 > 0 and price < ma20:
|
||
trend = "weak"
|
||
return {
|
||
"timeframe": timeframe,
|
||
"available": True,
|
||
"last_time": str(df["timestamp"].iloc[-1]),
|
||
"price": round(price, 8),
|
||
"change_last_bar_pct": round(change, 2),
|
||
"change_window_pct": round(change_window, 2),
|
||
"trend": trend,
|
||
"ma20": round(ma20, 8),
|
||
"ma60": round(ma60, 8),
|
||
"rsi14": _rsi(closes, 14),
|
||
"atr14": round(float(atr or 0), 8),
|
||
"volume_ratio_20": round(vol_ratio, 2),
|
||
"support": sr["support"],
|
||
"resistance": sr["resistance"],
|
||
"range_position_pct": sr["range_position_pct"],
|
||
"last_candle": last_candle,
|
||
"pa": {
|
||
"dynamic_count_recent": sum(1 for x in (pa.get("candles_class") or [])[-10:] if x.get("type") == "dynamic"),
|
||
"static_count_recent": sum(1 for x in (pa.get("candles_class") or [])[-10:] if x.get("type") == "static"),
|
||
"top_zones": (pa.get("zones") or [])[:3],
|
||
"continuous_k": (pa.get("continuous_k") or [])[-3:],
|
||
"ignition_points": (pa.get("ignition_points") or [])[-3:],
|
||
"trend_exhaustion": pa.get("trend_exhaustion") or {},
|
||
},
|
||
}
|
||
|
||
|
||
def analyze_symbol_technicals(symbol: str) -> dict:
|
||
symbol = _normalize_symbol(symbol)
|
||
result = {
|
||
"symbol": symbol,
|
||
"source": "binance_spot",
|
||
"binance_tradable": False,
|
||
"timeframes": {},
|
||
"summary": {},
|
||
"errors": [],
|
||
}
|
||
for tf in TIMEFRAMES:
|
||
df = fetch_binance_klines(symbol, tf, limit=180 if tf == "1d" else 160)
|
||
if df is None or len(df) < 30:
|
||
result["timeframes"][tf] = {"timeframe": tf, "available": False, "reason": "binance_kline_unavailable"}
|
||
result["errors"].append(f"{tf}: kline_unavailable")
|
||
continue
|
||
result["binance_tradable"] = True
|
||
result["timeframes"][tf] = _technical_summary_for_df(df, tf)
|
||
|
||
available = [x for x in result["timeframes"].values() if x.get("available")]
|
||
if not available:
|
||
result["summary"] = {"stance": "no_data", "headline": "未拿到 Binance K 线,无法做实时技术面判断。", "risk_level": "unknown"}
|
||
return result
|
||
|
||
h1 = result["timeframes"].get("1h") or {}
|
||
h4 = result["timeframes"].get("4h") or {}
|
||
d1 = result["timeframes"].get("1d") or {}
|
||
bullish = sum(1 for x in available if x.get("trend") in ("uptrend", "rebound") and _safe_float(x.get("volume_ratio_20")) >= 0.8)
|
||
bearish = sum(1 for x in available if x.get("trend") in ("downtrend", "weak") and _safe_float(x.get("change_window_pct")) < 0)
|
||
chase_risk = any(_safe_float(x.get("range_position_pct")) >= 85 and _safe_float(x.get("rsi14")) >= 70 for x in (h1, h4, d1) if x.get("available"))
|
||
exhaustion = (h1.get("pa") or {}).get("trend_exhaustion") or {}
|
||
if bullish >= 3 and not chase_risk:
|
||
stance = "bullish_watch"
|
||
headline = "多周期结构偏强,但仍需要结合入场窗口和风险收益比。"
|
||
elif chase_risk:
|
||
stance = "chase_risk"
|
||
headline = "价格处在区间高位或 RSI 偏热,追高风险需要优先处理。"
|
||
elif bearish >= 2 or exhaustion.get("exhausted"):
|
||
stance = "weak_or_exhausted"
|
||
headline = "短线结构偏弱或出现衰减,不适合直接按强势机会处理。"
|
||
else:
|
||
stance = "neutral"
|
||
headline = "结构还不够一致,适合观察等待更清晰触发。"
|
||
result["summary"] = {
|
||
"stance": stance,
|
||
"headline": headline,
|
||
"risk_level": "high" if chase_risk or exhaustion.get("severity") == "high" else "medium" if bearish else "normal",
|
||
"bullish_timeframe_count": bullish,
|
||
"bearish_timeframe_count": bearish,
|
||
"chase_risk": chase_risk,
|
||
"latest_price": h1.get("price") or available[0].get("price"),
|
||
}
|
||
return result
|
||
|
||
|
||
def _latest_recommendations(symbol: str = "", limit=5):
|
||
conn = get_conn()
|
||
params = []
|
||
where = "1=1"
|
||
if symbol:
|
||
where += " AND symbol=%s"
|
||
params.append(_normalize_symbol(symbol))
|
||
rows = conn.execute(
|
||
f"""
|
||
SELECT * FROM recommendation
|
||
WHERE {where}
|
||
ORDER BY id DESC
|
||
LIMIT %s
|
||
""",
|
||
tuple(params + [int(limit)]),
|
||
).fetchall()
|
||
conn.close()
|
||
items = []
|
||
from app.db.altcoin_db import _derive_execution_fields
|
||
|
||
for row in rows:
|
||
item = _derive_execution_fields(dict(row))
|
||
for key in ("market_context_json", "derivatives_context_json", "sector_context_json", "entry_plan_json", "signal_codes_json", "signal_labels_json"):
|
||
value = item.get(key)
|
||
if isinstance(value, str):
|
||
try:
|
||
if value:
|
||
parsed = json.loads(value)
|
||
elif key.endswith("context_json") or key == "entry_plan_json":
|
||
parsed = {}
|
||
else:
|
||
parsed = []
|
||
item[key.replace("_json", "")] = parsed
|
||
except Exception:
|
||
pass
|
||
items.append(item)
|
||
return items
|
||
|
||
|
||
def _sentiment_context(symbol: str = "", limit=8):
|
||
conn = get_conn()
|
||
params = []
|
||
where = "detected_at >= %s"
|
||
params.append((datetime.now() - timedelta(hours=72)).isoformat())
|
||
if symbol:
|
||
where += " AND symbol=%s"
|
||
params.append(_normalize_symbol(symbol))
|
||
rows = conn.execute(
|
||
f"""
|
||
SELECT id, source, symbol, title, url, published_at, detected_at, importance, event_type, decision, tech_score, rec_id
|
||
FROM event_news
|
||
WHERE {where}
|
||
ORDER BY detected_at DESC, id DESC
|
||
LIMIT %s
|
||
""",
|
||
tuple(params + [int(limit)]),
|
||
).fetchall()
|
||
conn.close()
|
||
return [dict(row) for row in rows]
|
||
|
||
|
||
def _review_context(symbol: str = "", limit=8):
|
||
conn = get_conn()
|
||
params = []
|
||
where = "1=1"
|
||
if symbol:
|
||
where += " AND rl.symbol=%s"
|
||
params.append(_normalize_symbol(symbol))
|
||
rows = conn.execute(
|
||
f"""
|
||
SELECT rl.*, r.execution_status, r.display_bucket, r.action_status, r.entry_triggered
|
||
FROM review_log rl
|
||
LEFT JOIN recommendation r ON r.id=rl.rec_id
|
||
WHERE {where}
|
||
ORDER BY rl.review_time DESC, rl.id DESC
|
||
LIMIT %s
|
||
""",
|
||
tuple(params + [int(limit)]),
|
||
).fetchall()
|
||
conn.close()
|
||
return [dict(row) for row in rows]
|
||
|
||
def _market_context():
|
||
try:
|
||
market = get_crypto_market_overview()
|
||
except Exception as exc:
|
||
market = {"error": str(exc)[:300]}
|
||
try:
|
||
onchain = get_onchain_overview(hours=24)
|
||
except Exception:
|
||
onchain = {}
|
||
try:
|
||
from app.services.llm_insights import get_latest_sentiment_batch_analysis
|
||
|
||
sentiment = get_latest_sentiment_batch_analysis() or {}
|
||
except Exception:
|
||
sentiment = {}
|
||
return {"market": market, "onchain": onchain, "ai_sentiment": sentiment}
|
||
|
||
|
||
def build_context(intent: str, message: str, symbol: str, preferences=None) -> dict:
|
||
symbol = _normalize_symbol(symbol)
|
||
ctx = {
|
||
"intent": intent,
|
||
"intent_label": INTENT_LABELS.get(intent, intent),
|
||
"symbol": symbol,
|
||
"generated_at": _now(),
|
||
"preferences": preferences or {},
|
||
"sources": [],
|
||
}
|
||
if intent == "unsupported":
|
||
return ctx
|
||
if intent == "restricted":
|
||
return ctx
|
||
if intent in ("coin_analysis", "recommendation_explain", "onchain", "sentiment") and symbol:
|
||
ctx["technicals"] = analyze_symbol_technicals(symbol)
|
||
ctx["recommendations"] = _latest_recommendations(symbol=symbol, limit=5)
|
||
ctx["sentiment_events"] = _sentiment_context(symbol=symbol, limit=8)
|
||
try:
|
||
ctx["onchain"] = get_onchain_token_detail(symbol=symbol, hours=168)
|
||
except Exception as exc:
|
||
ctx["onchain"] = {"error": str(exc)[:200]}
|
||
ctx["reviews"] = _review_context(symbol=symbol, limit=8)
|
||
ctx["sources"] = ["binance_klines", "recommendation", "event_news", "onchain", "review_log"]
|
||
elif intent == "market_overview":
|
||
ctx.update(_market_context())
|
||
ctx["pipeline"] = get_pipeline_runs(limit=5, hours=24, offset=0)
|
||
ctx["sources"] = ["market_overview", "onchain_overview", "llm_sentiment", "pipeline_runs"]
|
||
elif intent == "review":
|
||
ctx["reviews"] = _review_context(symbol=symbol, limit=12)
|
||
ctx["recommendations"] = _latest_recommendations(symbol=symbol, limit=8) if symbol else _latest_recommendations(limit=8)
|
||
ctx["sources"] = ["review_log", "recommendation"]
|
||
elif intent == "sentiment":
|
||
ctx["sentiment_events"] = _sentiment_context(symbol=symbol, limit=20)
|
||
try:
|
||
from app.services.llm_insights import get_latest_sentiment_batch_analysis
|
||
|
||
ctx["ai_sentiment"] = get_latest_sentiment_batch_analysis() or {}
|
||
except Exception:
|
||
ctx["ai_sentiment"] = {}
|
||
ctx["sources"] = ["event_news", "llm_sentiment"]
|
||
elif intent == "onchain":
|
||
if symbol:
|
||
try:
|
||
ctx["onchain"] = get_onchain_token_detail(symbol=symbol, hours=168)
|
||
except Exception as exc:
|
||
ctx["onchain"] = {"error": str(exc)[:200]}
|
||
else:
|
||
ctx["onchain"] = get_onchain_overview(hours=24)
|
||
ctx["sources"] = ["onchain"]
|
||
else:
|
||
ctx["capabilities"] = [
|
||
"单币技术面:自动拉取 Binance 15m/1h/4h/1d K 线",
|
||
"推荐解释:结合当前看板状态、入场窗口、TP/SL、风险理由",
|
||
"舆情和链上:读取当前系统已采集与 AI 解读的结果",
|
||
"复盘:区分信号表现、失效样本和真实推荐结果",
|
||
]
|
||
return ctx
|
||
|
||
|
||
def _fallback_answer(intent: str, message: str, context: dict) -> dict:
|
||
if intent == "restricted":
|
||
return {
|
||
"summary": "内部策略交易数据不可在智能问答中直接访问。",
|
||
"answer": "我不能读取或解释内部策略交易数据。你可以继续问公开行情、单币技术面、推荐解释、链上异动、舆情影响或复盘结果(不含交易账本明细)。",
|
||
"evidence": [],
|
||
"related_records": [],
|
||
"followups": ["分析 BTC/USDT 的技术面", "解释最新推荐为什么不是可买"],
|
||
}
|
||
if intent == "unsupported":
|
||
return {
|
||
"summary": "我只能回答加密货币和 AlphaX 当前数据相关的问题。",
|
||
"answer": "这个问题超出了 Crypto 研究助手的范围。你可以问某个币的技术面、看板推荐原因、链上异动、舆情影响或复盘表现。",
|
||
"evidence": [],
|
||
"related_records": [],
|
||
"followups": ["分析 BTC/USDT 的技术面", "今天市场适合追强势币吗?"],
|
||
}
|
||
symbol = context.get("symbol") or ""
|
||
tech = context.get("technicals") or {}
|
||
tech_summary = tech.get("summary") or {}
|
||
recommendations = context.get("recommendations") or []
|
||
sentiment = context.get("sentiment_events") or []
|
||
onchain = context.get("onchain") or {}
|
||
evidence = []
|
||
if tech_summary:
|
||
evidence.append(f"技术面:{tech_summary.get('headline', '')}")
|
||
if recommendations:
|
||
r = recommendations[0]
|
||
evidence.append(f"主链路:{r.get('execution_label') or r.get('action_status') or r.get('execution_status')},原因:{r.get('execution_reason') or r.get('state_reason') or '--'}")
|
||
if sentiment:
|
||
evidence.append(f"舆情:近 72h 有 {len(sentiment)} 条相关事件,最新为「{sentiment[0].get('title', '')[:60]}」。")
|
||
if isinstance(onchain, dict) and (onchain.get("events") or onchain.get("metrics")):
|
||
evidence.append(f"链上:近 7 天有 {len(onchain.get('events') or [])} 条映射事件。")
|
||
if not evidence:
|
||
evidence.append("当前数据库没有足够样本,结论需要降级为观察。")
|
||
|
||
if intent == "market_overview":
|
||
market = (context.get("market") or {})
|
||
state = market.get("state") or {}
|
||
summary = state.get("label") or "市场数据已读取"
|
||
answer = state.get("summary") or "我读取了全市场行情,但当前没有足够信息形成强结论。"
|
||
elif symbol:
|
||
summary = tech_summary.get("headline") or f"{symbol} 需要继续观察"
|
||
answer = "结论:" + summary + " 证据区已汇总技术面、推荐、舆情、链上和复盘上下文。"
|
||
else:
|
||
summary = "已读取当前系统数据"
|
||
answer = "我已经按你的问题读取了当前数据库,但没有识别到明确币种;可以继续追问具体币种。"
|
||
|
||
return {
|
||
"summary": summary,
|
||
"answer": answer,
|
||
"evidence": evidence[:8],
|
||
"related_records": _related_records(context),
|
||
"followups": _followups(intent, symbol),
|
||
}
|
||
|
||
|
||
def _related_records(context: dict) -> list[dict]:
|
||
records = []
|
||
for item in (context.get("recommendations") or [])[:3]:
|
||
records.append({"type": "recommendation", "label": f"推荐 #{item.get('id')}", "symbol": item.get("symbol"), "status": item.get("execution_status")})
|
||
for item in (context.get("sentiment_events") or [])[:3]:
|
||
records.append({"type": "sentiment", "label": item.get("title"), "symbol": item.get("symbol"), "status": item.get("importance")})
|
||
return records
|
||
|
||
|
||
def _compact_technical_context(context: dict) -> dict:
|
||
tech = context.get("technicals") or {}
|
||
tfs = {}
|
||
for tf, item in (tech.get("timeframes") or {}).items():
|
||
if not item.get("available"):
|
||
tfs[tf] = {"available": False, "reason": item.get("reason") or ""}
|
||
continue
|
||
tfs[tf] = {
|
||
"available": True,
|
||
"price": item.get("price"),
|
||
"trend": item.get("trend"),
|
||
"rsi14": item.get("rsi14"),
|
||
"volume_ratio_20": item.get("volume_ratio_20"),
|
||
"range_position_pct": item.get("range_position_pct"),
|
||
"support": item.get("support"),
|
||
"resistance": item.get("resistance"),
|
||
"pa": {
|
||
"dynamic_count_recent": (item.get("pa") or {}).get("dynamic_count_recent"),
|
||
"static_count_recent": (item.get("pa") or {}).get("static_count_recent"),
|
||
"ignition_count": len((item.get("pa") or {}).get("ignition_points") or []),
|
||
"zone_count": len((item.get("pa") or {}).get("top_zones") or []),
|
||
},
|
||
}
|
||
return {
|
||
"summary": tech.get("summary") or {},
|
||
"binance_tradable": bool(tech.get("binance_tradable")),
|
||
"timeframes": tfs,
|
||
}
|
||
|
||
|
||
def _followups(intent: str, symbol: str = "") -> list[str]:
|
||
if symbol:
|
||
base = symbol.replace("/USDT", "")
|
||
return [
|
||
f"{base} 现在追高风险大吗?",
|
||
f"{base} 的链上和舆情有没有共振?",
|
||
f"{base} 如果要等回踩,关键价位在哪里?",
|
||
]
|
||
if intent == "market_overview":
|
||
return ["今天市场更适合做日内还是 1-3 天波段?", "当前涨幅榜有什么共性?"]
|
||
return ["分析 BTC/USDT 现在的技术面", "解释最新推荐为什么不是可买"]
|
||
|
||
|
||
def _answer_style_for_intent(intent: str) -> str:
|
||
return {
|
||
"coin_analysis": "technical",
|
||
"recommendation_explain": "decision",
|
||
"market_overview": "market",
|
||
"sentiment": "news",
|
||
"onchain": "onchain",
|
||
"review": "review",
|
||
"restricted": "notice",
|
||
"unsupported": "notice",
|
||
"help": "help",
|
||
}.get(intent or "", "default")
|
||
|
||
|
||
def _call_chat_llm(message: str, context: dict, history=None) -> dict:
|
||
cfg = llm_config()
|
||
params = get_llm_params()
|
||
if not bool(cfg.get("enabled", False)) or not os.getenv(str(params.get("api_key_env") or "ALPHAX_LLM_API_KEY"), "").strip():
|
||
return {"status": "skipped", "error": "llm_disabled"}
|
||
if not bool((cfg.get("modules") or {}).get("chat", True)):
|
||
return {"status": "skipped", "error": "chat_module_disabled"}
|
||
api_key = os.getenv(str(params.get("api_key_env") or "ALPHAX_LLM_API_KEY"), "").strip()
|
||
base_url = str(params.get("base_url") or "https://api.openai.com/v1").rstrip("/")
|
||
model = str(params.get("model") or "gpt-4o-mini").strip()
|
||
payload = {
|
||
"user_question": message,
|
||
"context": context,
|
||
"recent_history": (history or [])[-8:],
|
||
"rules": [
|
||
"只回答加密货币、AlphaX 当前数据、技术面、链上、舆情、复盘相关问题,不要访问内部策略交易数据。",
|
||
"不要给真实下单指令,不要修改推荐状态,不要承诺收益。",
|
||
"回答使用中文,采用两段式:先结论,再证据。",
|
||
"根据 intent 选择 answer_style:technical/decision/market/news/onchain/review/notice/help/default。",
|
||
"输出严格 JSON:summary, answer, answer_style, evidence[], risk_flags[], related_records[], followups[]。",
|
||
],
|
||
}
|
||
system_prompt = "你是 AlphaX Agent 的 Crypto 研究助手。你只能基于提供的结构化数据回答,不能编造数据。"
|
||
try:
|
||
resp = requests.post(
|
||
f"{base_url}/chat/completions",
|
||
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
|
||
json={
|
||
"model": model,
|
||
"messages": [
|
||
{"role": "system", "content": system_prompt},
|
||
{"role": "user", "content": _compact_json(payload)},
|
||
],
|
||
"temperature": 0.15,
|
||
"max_tokens": int(params.get("max_tokens") or 900),
|
||
"response_format": {"type": "json_object"},
|
||
},
|
||
timeout=int(params.get("timeout") or 20),
|
||
)
|
||
if resp.status_code >= 400:
|
||
return {"status": "failed", "error": f"http_{resp.status_code}:{resp.text[:300]}", "model": model}
|
||
data = resp.json()
|
||
content = (((data.get("choices") or [{}])[0]).get("message") or {}).get("content") or "{}"
|
||
content = repair_mojibake_text(content)
|
||
parsed = repair_mojibake_json(json.loads(content))
|
||
if not isinstance(parsed, dict):
|
||
raise ValueError("llm_output_not_object")
|
||
parsed.setdefault("summary", parsed.get("answer", "")[:80])
|
||
parsed.setdefault("answer_style", _answer_style_for_intent(context.get("intent")))
|
||
parsed.setdefault("evidence", [])
|
||
parsed.setdefault("risk_flags", [])
|
||
parsed.setdefault("related_records", _related_records(context))
|
||
parsed.setdefault("followups", _followups(context.get("intent"), context.get("symbol")))
|
||
return {"status": "success", "content": parsed, "model": model}
|
||
except Exception as exc:
|
||
return {"status": "failed", "error": str(exc)[:500], "model": model}
|
||
|
||
|
||
def answer_chat(user_id: int, message: str, session_id: int = 0) -> dict:
|
||
message = str(message or "").strip()
|
||
if not message:
|
||
raise ValueError("message_required")
|
||
prefs = chat_assistant_db.get_user_preferences(user_id)
|
||
session = chat_assistant_db.get_chat_session(session_id, user_id) if session_id else None
|
||
symbol = extract_symbol(message, session=session, preferences=prefs)
|
||
intent = detect_intent(message, symbol=symbol)
|
||
if not session:
|
||
title = symbol or message[:24]
|
||
session = chat_assistant_db.create_chat_session(user_id, title=title, last_symbol=symbol, last_intent=intent)
|
||
user_msg = chat_assistant_db.append_chat_message(
|
||
session["id"],
|
||
user_id,
|
||
"user",
|
||
content_text=message,
|
||
content_json={"text": message},
|
||
intent=intent,
|
||
symbol=symbol,
|
||
)
|
||
history = chat_assistant_db.list_chat_messages(session["id"], user_id, limit=12).get("items", [])
|
||
context = build_context(intent, message, symbol, preferences=prefs)
|
||
if intent == "restricted":
|
||
llm_result = {"status": "skipped", "error": "restricted_data_scope"}
|
||
answer = repair_mojibake_json(_fallback_answer(intent, message, context))
|
||
model = ""
|
||
else:
|
||
llm_result = _call_chat_llm(message, context, history=history)
|
||
if llm_result.get("status") == "success":
|
||
answer = repair_mojibake_json(llm_result.get("content") or {})
|
||
model = llm_result.get("model") or ""
|
||
else:
|
||
answer = repair_mojibake_json(_fallback_answer(intent, message, context))
|
||
answer["llm_status"] = llm_result.get("status")
|
||
answer["llm_error"] = llm_result.get("error", "")
|
||
model = llm_result.get("model") or ""
|
||
answer.setdefault("related_records", _related_records(context))
|
||
answer.setdefault("followups", _followups(intent, symbol))
|
||
answer.setdefault("evidence", [])
|
||
answer.setdefault("answer_style", _answer_style_for_intent(intent))
|
||
assistant_text = repair_mojibake_text(answer.get("answer") or answer.get("summary") or "")
|
||
assistant_msg = chat_assistant_db.append_chat_message(
|
||
session["id"],
|
||
user_id,
|
||
"assistant",
|
||
content_text=assistant_text,
|
||
content_json=answer,
|
||
context_json={
|
||
"intent": intent,
|
||
"symbol": symbol,
|
||
"context_hash": compute_input_hash(context),
|
||
"sources": context.get("sources") or [],
|
||
"technical_summary": (context.get("technicals") or {}).get("summary") or {},
|
||
"technicals": _compact_technical_context(context),
|
||
},
|
||
intent=intent,
|
||
symbol=symbol,
|
||
model=model,
|
||
)
|
||
memory = session.get("memory") or {}
|
||
if symbol:
|
||
memory["last_symbol"] = symbol
|
||
memory["last_intent"] = intent
|
||
chat_assistant_db.update_chat_session(
|
||
session["id"],
|
||
user_id,
|
||
title=session.get("title") if session.get("title") != "新对话" else (symbol or message[:24]),
|
||
memory_json=memory,
|
||
last_symbol=symbol,
|
||
last_intent=intent,
|
||
)
|
||
pref_patch = {"last_intent": intent, "recent_topics": [intent]}
|
||
if symbol:
|
||
pref_patch["last_symbol"] = symbol
|
||
pref_patch["preferred_symbols"] = [symbol]
|
||
chat_assistant_db.update_user_preferences(user_id, pref_patch)
|
||
return {
|
||
"ok": True,
|
||
"session": chat_assistant_db.get_chat_session(session["id"], user_id),
|
||
"user_message": user_msg,
|
||
"assistant_message": assistant_msg,
|
||
"answer": answer,
|
||
"intent": intent,
|
||
"intent_label": INTENT_LABELS.get(intent, intent),
|
||
"symbol": symbol,
|
||
"context": {
|
||
"sources": context.get("sources") or [],
|
||
"technicals": context.get("technicals") or {},
|
||
"related_records": answer.get("related_records") or [],
|
||
},
|
||
}
|
||
|
||
|
||
__all__ = [
|
||
"analyze_symbol_technicals",
|
||
"answer_chat",
|
||
"build_context",
|
||
"detect_intent",
|
||
"extract_symbol",
|
||
"fetch_binance_klines",
|
||
]
|