From a53d8dc9f934b3b21d50e25a66e3fd89b0151cfe Mon Sep 17 00:00:00 2001
From: aaron <>
Date: Mon, 18 May 2026 10:04:50 +0800
Subject: [PATCH] update
---
app/config/system_config.py | 1 +
app/db/chat_assistant_db.py | 324 ++++++++
app/db/migrations/0009_chat_assistant.sql | 37 +
app/db/paper_trading.py | 1 +
app/integrations/feishu_push.py | 463 +-----------
app/integrations/feishu_review_push.py | 298 --------
app/integrations/push_orchestrator.py | 43 --
app/services/altcoin_confirm.py | 5 +-
app/services/chat_assistant.py | 755 +++++++++++++++++++
app/services/event_driven_screener.py | 18 +-
app/services/price_tracker.py | 1 -
app/services/review_engine.py | 21 -
app/web/routes_chat.py | 54 ++
app/web/routes_pages.py | 7 +
app/web/shared.py | 9 +
app/web/web_server.py | 2 +
static/base.html | 2 +
static/chat.html | 127 ++++
tests/conftest.py | 3 +
tests/test_chat_assistant.py | 130 ++++
tests/test_price_tracker_watch_only_guard.py | 3 -
21 files changed, 1479 insertions(+), 825 deletions(-)
create mode 100644 app/db/chat_assistant_db.py
create mode 100644 app/db/migrations/0009_chat_assistant.sql
delete mode 100644 app/integrations/feishu_review_push.py
delete mode 100644 app/integrations/push_orchestrator.py
create mode 100644 app/services/chat_assistant.py
create mode 100644 app/web/routes_chat.py
create mode 100644 static/chat.html
create mode 100644 tests/test_chat_assistant.py
diff --git a/app/config/system_config.py b/app/config/system_config.py
index c63e698..b8bc304 100644
--- a/app/config/system_config.py
+++ b/app/config/system_config.py
@@ -63,6 +63,7 @@ def default_llm_config():
"recommendations": _env_bool("ALPHAX_LLM_RECOMMENDATIONS_ENABLED", True),
"sentiment": _env_bool("ALPHAX_LLM_SENTIMENT_ENABLED", True),
"review": _env_bool("ALPHAX_LLM_REVIEW_ENABLED", True),
+ "chat": _env_bool("ALPHAX_LLM_CHAT_ENABLED", True),
},
}
diff --git a/app/db/chat_assistant_db.py b/app/db/chat_assistant_db.py
new file mode 100644
index 0000000..5cb8924
--- /dev/null
+++ b/app/db/chat_assistant_db.py
@@ -0,0 +1,324 @@
+"""Chat assistant persistence helpers."""
+
+from __future__ import annotations
+
+import json
+from datetime import datetime
+
+from app.db.llm_insights import repair_mojibake_json, repair_mojibake_text
+from app.db.postgres_connection import ensure_migrations_once
+from app.db.schema import get_conn
+
+
+def _now() -> str:
+ return datetime.now().isoformat(timespec="seconds")
+
+
+def _loads(value, fallback=None):
+ try:
+ if isinstance(value, str) and value.strip():
+ return repair_mojibake_json(json.loads(value))
+ if value is not None:
+ return repair_mojibake_json(value)
+ except Exception:
+ pass
+ return fallback if fallback is not None else {}
+
+
+def _dumps(value) -> str:
+ return json.dumps(repair_mojibake_json(value if value is not None else {}), ensure_ascii=False, sort_keys=True, default=str)
+
+
+def init_chat_tables():
+ ensure_migrations_once()
+
+
+def _normalize_title(title: str) -> str:
+ title = str(title or "").strip()
+ return title[:32] or "新对话"
+
+
+def _load_session(row):
+ if not row:
+ return None
+ item = dict(row)
+ item["memory"] = _loads(item.pop("memory_json", "{}"), {})
+ return item
+
+
+def _load_message(row):
+ if not row:
+ return None
+ item = dict(row)
+ item["content_text"] = repair_mojibake_text(item.get("content_text", ""))
+ item["content"] = _loads(item.pop("content_json", "{}"), {})
+ item["context"] = _loads(item.pop("context_json", "{}"), {})
+ return item
+
+
+def get_user_preferences(user_id: int) -> dict:
+ init_chat_tables()
+ conn = get_conn()
+ try:
+ row = conn.execute("SELECT * FROM chat_user_preferences WHERE user_id=%s", (int(user_id),)).fetchone()
+ finally:
+ conn.close()
+ if not row:
+ return {
+ "preferred_symbols": [],
+ "preferred_timeframes": ["15m", "1h", "4h", "1d"],
+ "answer_style": "two_stage",
+ "risk_profile": "balanced",
+ "last_intent": "",
+ "last_symbol": "",
+ "recent_topics": [],
+ }
+ prefs = _loads(row.get("preferences_json"), {})
+ prefs.setdefault("preferred_symbols", [])
+ prefs.setdefault("preferred_timeframes", ["15m", "1h", "4h", "1d"])
+ prefs.setdefault("answer_style", "two_stage")
+ prefs.setdefault("risk_profile", "balanced")
+ prefs.setdefault("last_intent", "")
+ prefs.setdefault("last_symbol", "")
+ prefs.setdefault("recent_topics", [])
+ return prefs
+
+
+def update_user_preferences(user_id: int, patch: dict) -> dict:
+ init_chat_tables()
+ current = get_user_preferences(user_id)
+ patch = patch or {}
+ for key, value in patch.items():
+ if key in ("preferred_symbols", "preferred_timeframes", "recent_topics") and isinstance(value, list):
+ merged = list(dict.fromkeys([str(x) for x in current.get(key, []) if str(x).strip()] + [str(x) for x in value if str(x).strip()]))
+ current[key] = merged[-12:]
+ elif value is not None:
+ current[key] = value
+ now = _now()
+ conn = get_conn()
+ try:
+ conn.execute(
+ """
+ INSERT INTO chat_user_preferences (user_id, preferences_json, updated_at)
+ VALUES (%s, %s, %s)
+ ON CONFLICT(user_id) DO UPDATE SET
+ preferences_json=excluded.preferences_json,
+ updated_at=excluded.updated_at
+ """,
+ (int(user_id), _dumps(current), now),
+ )
+ conn.commit()
+ finally:
+ conn.close()
+ return current
+
+
+def create_chat_session(user_id: int, title: str = "", summary: str = "", last_symbol: str = "", last_intent: str = "") -> dict:
+ init_chat_tables()
+ now = _now()
+ conn = get_conn()
+ try:
+ row = conn.execute(
+ """
+ INSERT INTO chat_sessions (user_id, title, summary, memory_json, last_symbol, last_intent, created_at, updated_at)
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
+ RETURNING *
+ """,
+ (int(user_id), _normalize_title(title), summary or "", _dumps({}), last_symbol or "", last_intent or "", now, now),
+ ).fetchone()
+ conn.commit()
+ finally:
+ conn.close()
+ return _load_session(row)
+
+
+def list_chat_sessions(user_id: int, limit: int = 20, offset: int = 0) -> dict:
+ init_chat_tables()
+ limit = max(1, min(int(limit or 20), 100))
+ offset = max(0, int(offset or 0))
+ conn = get_conn()
+ try:
+ total = conn.execute("SELECT COUNT(*) FROM chat_sessions WHERE user_id=%s AND COALESCE(archived_at, '')=''", (int(user_id),)).fetchone()[0]
+ rows = conn.execute(
+ """
+ SELECT s.*,
+ (SELECT m.content_text FROM chat_messages m WHERE m.session_id=s.id ORDER BY m.id DESC LIMIT 1) AS last_message_text,
+ (SELECT m.role FROM chat_messages m WHERE m.session_id=s.id ORDER BY m.id DESC LIMIT 1) AS last_message_role,
+ (SELECT m.created_at FROM chat_messages m WHERE m.session_id=s.id ORDER BY m.id DESC LIMIT 1) AS last_message_at,
+ (SELECT COUNT(*) FROM chat_messages m WHERE m.session_id=s.id) AS message_count
+ FROM chat_sessions s
+ WHERE s.user_id=%s AND COALESCE(s.archived_at, '')=''
+ ORDER BY s.updated_at DESC, s.id DESC
+ LIMIT %s OFFSET %s
+ """,
+ (int(user_id), limit, offset),
+ ).fetchall()
+ finally:
+ conn.close()
+ items = []
+ for row in rows:
+ item = _load_session(row)
+ item["last_message_text"] = item.pop("last_message_text", "")
+ item["last_message_role"] = item.pop("last_message_role", "")
+ item["last_message_at"] = item.pop("last_message_at", "")
+ item["message_count"] = int(item.pop("message_count", 0) or 0)
+ items.append(item)
+ return {
+ "items": items,
+ "total": int(total or 0),
+ "limit": limit,
+ "offset": offset,
+ "has_more": offset + len(items) < int(total or 0),
+ }
+
+
+def get_chat_session(session_id: int, user_id: int) -> dict | None:
+ init_chat_tables()
+ conn = get_conn()
+ try:
+ row = conn.execute(
+ "SELECT * FROM chat_sessions WHERE id=%s AND user_id=%s AND COALESCE(archived_at, '')=''",
+ (int(session_id), int(user_id)),
+ ).fetchone()
+ finally:
+ conn.close()
+ return _load_session(row)
+
+
+def update_chat_session(session_id: int, user_id: int, **fields) -> dict | None:
+ init_chat_tables()
+ session = get_chat_session(session_id, user_id)
+ if not session:
+ return None
+ allowed = {"title", "summary", "memory_json", "last_symbol", "last_intent", "archived_at"}
+ updates = {}
+ for key, value in fields.items():
+ if key not in allowed or value is None:
+ continue
+ updates[key] = value
+ if not updates:
+ return session
+ updates["updated_at"] = _now()
+ if "title" in updates:
+ updates["title"] = _normalize_title(updates["title"])
+ if "memory_json" in updates and not isinstance(updates["memory_json"], str):
+ updates["memory_json"] = _dumps(updates["memory_json"])
+ if updates.get("archived_at") == "now":
+ updates["archived_at"] = _now()
+ sets = ", ".join(f"{key}=%s" for key in updates)
+ params = list(updates.values()) + [int(session_id), int(user_id)]
+ conn = get_conn()
+ try:
+ row = conn.execute(
+ f"UPDATE chat_sessions SET {sets} WHERE id=%s AND user_id=%s RETURNING *",
+ tuple(params),
+ ).fetchone()
+ conn.commit()
+ finally:
+ conn.close()
+ return _load_session(row)
+
+
+def list_chat_messages(session_id: int, user_id: int, limit: int = 50, offset: int = 0) -> dict:
+ init_chat_tables()
+ limit = max(1, min(int(limit or 50), 200))
+ offset = max(0, int(offset or 0))
+ conn = get_conn()
+ try:
+ total = conn.execute(
+ "SELECT COUNT(*) FROM chat_messages WHERE session_id=%s AND user_id=%s",
+ (int(session_id), int(user_id)),
+ ).fetchone()[0]
+ rows = conn.execute(
+ """
+ SELECT * FROM chat_messages
+ WHERE session_id=%s AND user_id=%s
+ ORDER BY id ASC
+ LIMIT %s OFFSET %s
+ """,
+ (int(session_id), int(user_id), limit, offset),
+ ).fetchall()
+ finally:
+ conn.close()
+ return {
+ "items": [_load_message(row) for row in rows],
+ "total": int(total or 0),
+ "limit": limit,
+ "offset": offset,
+ "has_more": offset + len(rows) < int(total or 0),
+ }
+
+
+def append_chat_message(
+ session_id: int,
+ user_id: int,
+ role: str,
+ content_text: str = "",
+ content_json=None,
+ context_json=None,
+ intent: str = "",
+ symbol: str = "",
+ timeframe: str = "",
+ model: str = "",
+) -> dict:
+ init_chat_tables()
+ now = _now()
+ conn = get_conn()
+ try:
+ row = conn.execute(
+ """
+ INSERT INTO chat_messages (
+ session_id, user_id, role, content_text, content_json, context_json,
+ intent, symbol, timeframe, model, created_at
+ ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+ RETURNING *
+ """,
+ (
+ int(session_id),
+ int(user_id),
+ str(role or "user"),
+ repair_mojibake_text(str(content_text or "")),
+ _dumps(content_json or {}),
+ _dumps(context_json or {}),
+ str(intent or ""),
+ str(symbol or ""),
+ str(timeframe or ""),
+ str(model or ""),
+ now,
+ ),
+ ).fetchone()
+ conn.commit()
+ finally:
+ conn.close()
+ return _load_message(row)
+
+
+def bootstrap_chat(user_id: int) -> dict:
+ prefs = get_user_preferences(user_id)
+ sessions = list_chat_sessions(user_id=user_id, limit=20, offset=0)
+ prompts = [
+ "分析 BTC/USDT 现在的技术面",
+ "解释当前看板里这条推荐为什么是等回踩",
+ "看一下市场总览,今天是偏强还是偏弱",
+ "这个币的链上异动有哪些",
+ "帮我复盘最近一次纸面交易",
+ ]
+ return {
+ "preferences": prefs,
+ "sessions": sessions,
+ "suggested_prompts": prompts,
+ }
+
+
+__all__ = [
+ "append_chat_message",
+ "bootstrap_chat",
+ "create_chat_session",
+ "get_chat_session",
+ "get_user_preferences",
+ "init_chat_tables",
+ "list_chat_messages",
+ "list_chat_sessions",
+ "update_chat_session",
+ "update_user_preferences",
+]
diff --git a/app/db/migrations/0009_chat_assistant.sql b/app/db/migrations/0009_chat_assistant.sql
new file mode 100644
index 0000000..619b22d
--- /dev/null
+++ b/app/db/migrations/0009_chat_assistant.sql
@@ -0,0 +1,37 @@
+CREATE TABLE IF NOT EXISTS chat_sessions (
+ id BIGSERIAL PRIMARY KEY,
+ user_id BIGINT NOT NULL,
+ title TEXT NOT NULL DEFAULT '新对话',
+ summary TEXT DEFAULT '',
+ memory_json TEXT NOT NULL DEFAULT '{}',
+ last_symbol TEXT DEFAULT '',
+ last_intent TEXT DEFAULT '',
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL,
+ archived_at TEXT DEFAULT ''
+);
+CREATE INDEX IF NOT EXISTS idx_chat_sessions_user_updated ON chat_sessions(user_id, updated_at DESC);
+
+CREATE TABLE IF NOT EXISTS chat_messages (
+ id BIGSERIAL PRIMARY KEY,
+ session_id BIGINT NOT NULL,
+ user_id BIGINT NOT NULL,
+ role TEXT NOT NULL,
+ content_text TEXT DEFAULT '',
+ content_json TEXT NOT NULL DEFAULT '{}',
+ context_json TEXT NOT NULL DEFAULT '{}',
+ intent TEXT DEFAULT '',
+ symbol TEXT DEFAULT '',
+ timeframe TEXT DEFAULT '',
+ model TEXT DEFAULT '',
+ created_at TEXT NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_chat_messages_session_time ON chat_messages(session_id, created_at ASC);
+CREATE INDEX IF NOT EXISTS idx_chat_messages_user_time ON chat_messages(user_id, created_at DESC);
+
+CREATE TABLE IF NOT EXISTS chat_user_preferences (
+ user_id BIGINT PRIMARY KEY,
+ preferences_json TEXT NOT NULL DEFAULT '{}',
+ updated_at TEXT NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_chat_user_preferences_updated ON chat_user_preferences(updated_at DESC);
diff --git a/app/db/paper_trading.py b/app/db/paper_trading.py
index a357065..2bf9fd1 100644
--- a/app/db/paper_trading.py
+++ b/app/db/paper_trading.py
@@ -187,6 +187,7 @@ def _push_event_card(event_type: str, trade: dict, result: dict, event_time: str
if not title:
return
card = {
+ "metadata": {"source": "paper_trading", "event_type": event_type},
"config": {"wide_screen_mode": True},
"header": {
"template": "blue" if event_type == "open" else ("yellow" if event_type.startswith("trailing") else "red"),
diff --git a/app/integrations/feishu_push.py b/app/integrations/feishu_push.py
index 6a11799..6cdd0d0 100644
--- a/app/integrations/feishu_push.py
+++ b/app/integrations/feishu_push.py
@@ -1,12 +1,9 @@
-"""
-山寨币监控飞书卡片推送模块
+"""Feishu push transport for paper trading only."""
-通过飞书机器人 Webhook 直发,不经过 Hermes Agent 的飞书通道。
-Webhook 支持 v2 interactive cards。
-"""
+from __future__ import annotations
import os
-import json
+
import requests
from app.config.system_config import notification_config
@@ -29,441 +26,31 @@ def _feishu_settings():
def push_card(card_content):
- """通过 webhook 推送飞书交互式卡片"""
- payload = {
- "msg_type": "interactive",
- "card": card_content,
- }
- try:
- settings = _feishu_settings()
- if not settings["enabled"]:
- return False, "feishu notification disabled"
- if not settings["webhook_url"]:
- return False, f"{settings['webhook_env']} not configured"
- r = requests.post(settings["webhook_url"], json=payload, timeout=settings["timeout"])
- result = r.json()
- ok = (r.status_code == 200 and result.get("StatusCode") == 0)
- return ok, result
- except Exception as e:
- return False, str(e)
+ """Paper trading cards only.
-
-def push_altcoin_burst_alert(symbol, price, signals, entry_plan, sector="", leader_status="", direction="多头启动"):
+ Non-paper-trading cards are rejected by construction so no other channel can
+ continue using this transport by accident.
"""
- 推荐确认爆发推送 — 只做做多,方向永远多头🟢
- """
- dir_emoji = "🟢"
- dir_color = "green"
- coin_name = symbol.replace('/USDT', '')
+ if not isinstance(card_content, dict):
+ return False, {"skipped": True, "reason": "invalid_card"}
+ metadata = card_content.get("metadata") or {}
+ if str(metadata.get("source") or "").strip().lower() != "paper_trading":
+ return False, {"skipped": True, "reason": "paper_trading_only"}
- entry_lines = ""
- if entry_plan:
- rr_ok = "✅" if entry_plan.get("risk_reward_ok") else "❌"
- entry_lines = f"""---
-**入场方案**:
-• 入场价: ${entry_plan['entry_price']}
-• 入场方式: {entry_plan['entry_method']}
-• 止损价: ${entry_plan['stop_loss']} ({entry_plan['stop_pct']}%)
-• 止盈1: ${entry_plan['tp1']} (RR={entry_plan['rr1']} {rr_ok})
-• 止盈2: ${entry_plan['tp2']} (RR={entry_plan['rr2']})
-• 当前价: ${entry_plan['current_price']}"""
-
- sector_line = f"\n**板块**: {sector}" if sector else ""
- leader_line = f"\n**龙头状态**: {leader_status}" if leader_status else ""
- signal_lines = "\n".join([f" • {s}" for s in signals])
-
- card = {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": dir_color,
- "title": {"tag": "plain_text", "content": f"{dir_emoji} {direction}确认 — {coin_name}"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": f"**方向**: {dir_emoji} {direction}\n**价格**: ${price}{sector_line}{leader_line}\n\n**确认信号**:\n{signal_lines}{entry_lines}",
- },
- },
- {
- "tag": "action",
- "actions": [
- {
- "tag": "button",
- "text": {"tag": "plain_text", "content": f"🔥 {symbol.replace('/USDT','')} 爆发确认"},
- "type": "danger",
- }
- ],
- },
- ],
- }
- return push_card(card)
-
-
-def push_recommendation_state_alert(item, title_prefix=None):
- """主链路推荐状态推送:只渲染 DB/API 已派生好的状态,不做推荐判断。"""
- card = build_recommendation_state_card(item, title_prefix=title_prefix)
- if isinstance(card, tuple):
- ok, reason = card
- return ok, reason
- return push_card(card)
-
-
-def build_recommendation_state_card(item, title_prefix=None):
- """只构建卡片,不负责是否推送、冷却或落库。"""
- if not item:
- return True, {"skipped": True, "reason": "empty_mainline_item"}
- symbol = item.get("symbol", "")
- coin = symbol.replace("/USDT", "")
- execution_status = item.get("execution_status", "")
- action_status = item.get("action_status", "")
- execution_label = item.get("execution_label", "") or action_status or execution_status
- if execution_status == "buy_now":
- color, title = "blue", title_prefix or "入场窗口"
- elif execution_status == "wait_pullback":
- color, title = "yellow", title_prefix or "观察池:等回踩"
- elif execution_status == "observe":
- color, title = "blue", title_prefix or "观察池更新"
- else:
- color, title = "grey", title_prefix or "状态更新"
-
- entry_plan = item.get("entry_plan") or {}
- price = item.get("current_price") or item.get("entry_price") or 0
- entry_ref = entry_plan.get("entry_price") if execution_status == "wait_pullback" else item.get("entry_price")
- if not entry_ref:
- entry_ref = item.get("entry_price") or entry_plan.get("entry_price") or 0
- risk_line = entry_plan.get("stop_loss") or item.get("stop_loss") or 0
- space_ref = entry_plan.get("tp1") or item.get("tp1") or 0
- signals = item.get("signals") or []
- if isinstance(signals, str):
- try:
- signals = json.loads(signals)
- except Exception:
- signals = [signals]
- signal_lines = "\n".join([f" • {x}" for x in signals[:5]]) or " • 主链路状态更新"
- rec_id = item.get("id", "")
- reason = item.get("execution_reason", "")
- ver = item.get("strategy_version", "")
- return {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": color,
- "title": {"tag": "plain_text", "content": f"{title} — {coin}"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": (
- f"**币种**: {symbol}\n"
- f"**主链路状态**: {execution_label}\n"
- f"**当前价**: ${price}\n"
- f"**参考价**: ${entry_ref} | **风险边界**: ${risk_line} | **上方空间参考**: ${space_ref}\n"
- f"**推荐ID**: #{rec_id} | **版本**: {ver}\n"
- f"**说明**: {reason}\n\n"
- f"**信号摘要**:\n{signal_lines}"
- ),
- },
- }
- ],
- }
-
-
-def push_altcoin_accelerating_alert(symbol, price, signals, score, sector="", leader_status="", direction="多头启动"):
- """
- 加速信号推送 — 只做做多🟢
- ⚠️ 用户要求:不再推送到飞书,此函数保留但只写日志
- """
- coin_name = symbol.replace('/USDT', '')
- print(f"[飞书跳过] 🟠 加速信号 — {coin_name} @ ${price} 评分{score}/20 (用户要求不推送)")
- return True, {"skipped": True, "reason": "用户要求不推送加速信号"}
-
-
-def push_altcoin_sector_alert(hot_sectors, leaders_info):
- """
- 推送板块联动告警
- ⚠️ 用户要求:不再推送到飞书,此函数保留但只写日志
- """
- print(f"[飞书跳过] 🔵 板块联动信号 — {len(hot_sectors)}个板块 (用户要求不推送)")
- return True, {"skipped": True, "reason": "用户要求不推送板块联动"}
-
-
-def push_altcoin_tp_sl_alert(symbol, current_price, entry_price, pnl_pct, action_status, signals, stop_loss=0, tp1=0, tp2=0):
- """推送交易执行告警 — 可即刻买入 + 🆕v1.7.8 跟踪止盈触发。止盈/止损/衰减只落库展示,不发飞书。"""
- card = build_trade_action_card(
- symbol=symbol,
- current_price=current_price,
- entry_price=entry_price,
- pnl_pct=pnl_pct,
- action_status=action_status,
- signals=signals,
- stop_loss=stop_loss,
- tp1=tp1,
- tp2=tp2,
- )
- if isinstance(card, tuple):
- ok, reason = card
- return ok, reason
- return push_card(card)
-
-
-def build_trade_action_card(symbol, current_price, entry_price, pnl_pct, action_status, signals, stop_loss=0, tp1=0, tp2=0):
- """只构建交易执行卡片,不做冷却判断或落库。"""
- if action_status not in ("可即刻买入", "跟踪止盈", "移动止盈保护"):
- print(f"[飞书跳过] {symbol} {action_status} — 用户要求止盈/止损/衰减不推送,只在网站展示")
- return True, {"skipped": True, "reason": "only_buy_now_and_trailing_stop_push_enabled"}
-
- if action_status == "移动止盈保护":
- coin = symbol.replace("/USDT", "")
- signal_lines = "\n".join([f" • {s}" for s in signals]) or " • 移动止盈保护已启动"
- trail_info = f"入场${entry_price:.4f} → 当前${current_price:.4f}"
- if pnl_pct > 0:
- trail_info += f"\n**当前浮盈: +{pnl_pct:.2f}%**"
- return {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": "yellow",
- "title": {"tag": "plain_text", "content": f"🛡️ 移动止盈保护启动 — {coin}"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": f"{trail_info}\n\n**保护详情**:\n{signal_lines}\n\n💡 已进入利润保护阶段,后续跌破保护位会触发跟踪止盈。",
- },
- }
- ],
- }
-
- # v1.7.8: 跟踪止盈用独立的醒目卡片
- if action_status == "跟踪止盈":
- coin = symbol.replace("/USDT", "")
- signal_lines = "\n".join([f" • {s}" for s in signals])
- trail_info = f"入场${entry_price:.4f} → 当前${current_price:.4f}"
- if pnl_pct > 0:
- trail_info += f"\n**累计盈利: +{pnl_pct:.2f}%** 📈"
- elif pnl_pct < 0:
- trail_info += f"\n**保本出场: {pnl_pct:.2f}%**"
-
- return {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": "red",
- "title": {"tag": "plain_text", "content": f"🎯 跟踪止盈触发 — {coin}"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": f"{trail_info}\n\n**信号详情**:\n{signal_lines}\n\n💡 跟踪止盈触发,建议立即平仓锁定利润!",
- },
- },
- {
- "tag": "action",
- "actions": [
- {
- "tag": "button",
- "text": {"tag": "plain_text", "content": f"🎯 {coin} 跟踪止盈"},
- "type": "danger",
- }
- ],
- },
- ],
- }
-
- # 当前只保留入场时机到位推送
- event_config = {
- "可即刻买入": ("blue", "🟢", "入场时机到位"),
- }
- cfg = event_config.get(action_status, ("blue", "⚠️", action_status))
- color, emoji, title_prefix = cfg
-
- coin = symbol.replace("/USDT", "")
- signal_lines = "\n".join([f" • {s}" for s in signals])
-
- pnl_emoji = "📈" if pnl_pct > 0 else "📉" if pnl_pct < 0 else "➡️"
- price_lines = f"**入场价**: ${entry_price} → **当前价**: ${current_price} → **盈亏**: {pnl_emoji} {pnl_pct}%"
- if stop_loss > 0:
- price_lines += f"\n**止损**: ${stop_loss}"
- if tp1 > 0:
- price_lines += f"\n**止盈1**: ${tp1}"
-
- return {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": color,
- "title": {"tag": "plain_text", "content": f"{emoji} {title_prefix} — {coin}"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": f"{price_lines}\n\n**操作建议**: {action_status}\n\n**信号详情**:\n{signal_lines}",
- },
- },
- ],
- }
-
-
-def push_altcoin_exhaustion_alert(symbol, current_price, pnl_pct, exhaustion):
- """
- 推送趋势衰减告警 — ⚠️ 橙色卡片
- """
- coin = symbol.replace("/USDT", "")
- severity = exhaustion.get("severity", "low")
- sev_emoji = "⚠️" if severity == "medium" else "🔴"
- ex_signals = exhaustion.get("signals", [])
- signal_lines = "\n".join([f" • {s}" for s in ex_signals])
-
- card = {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": "orange",
- "title": {"tag": "plain_text", "content": f"{sev_emoji} 趋势衰减 — {coin}"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": f"**当前价**: ${current_price} | **盈亏**: {pnl_pct}%\n\n**衰减信号**:\n{signal_lines}\n\n💡 建议:关注止盈机会,趋势可能即将反转",
- },
- },
- ],
- }
- return push_card(card)
-
-
-def push_sentiment_alert(alert):
- """
- 推送舆情异动卡片 — 📢 蓝色信息卡
- alert: {"type": "holding_trending"|"new_trending", "symbol", "name", "trend_rank", "alert"}
- """
- coin = alert["symbol"].replace("/USDT", "")
- alert_type = alert["type"]
- emoji = "🔔" if alert_type == "holding_trending" else "🆕"
- color = "red" if alert_type == "holding_trending" else "blue"
-
- extra = ""
- if alert_type == "holding_trending":
- extra = f"\n⚠️ 持仓币进入热搜,关注价格异动"
-
- card = {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": color,
- "title": {"tag": "plain_text", "content": f"{emoji} 舆情异动 — {coin}"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": (
- f"**{alert['name']}** 进入 CoinGecko Trending #{alert['trend_rank']}\n"
- f"{alert['alert']}{extra}\n\n"
- f"💡 消息面热度上升,建议结合技术面判断入场时机"
- ),
- },
- },
- ],
- }
- return push_card(card)
-
-
-def push_event_driven_alert(event, result, rec_id=0):
- """事件驱动舆情触发选币推送。重大消息触发后,根据技术检查结果分为推荐/观察/风险。"""
- symbol = event.get("symbol", "")
- coin = symbol.replace("/USDT", "")
- decision = result.get("decision", "observe")
- importance = event.get("importance", "")
- title = event.get("title", "")
- source = event.get("source", "")
- url = event.get("url", "")
- published_at = event.get("published_at", "")
- price = result.get("price", 0)
- score = result.get("score", 0)
- reason = result.get("reason", "")
- signals = result.get("signals", [])
- entry_plan = result.get("entry_plan", {}) or {}
-
- if decision == "recommend":
- color, emoji, headline = "red", "🚨", "重大舆情触发:可交易机会"
- elif decision == "risk":
- color, emoji, headline = "orange", "⚠️", "重大舆情风险:不建议追"
- else:
- color, emoji, headline = "blue", "👀", "重大舆情观察:等待技术确认"
-
- signal_lines = "\n".join([f" • {s}" for s in signals[:8]])
- link_line = f"\n**来源链接**: [查看原文]({url})" if url else ""
- entry_lines = ""
- if entry_plan:
- entry_lines = (
- f"\n---\n**交易计划**:\n"
- f"• 动作: {entry_plan.get('entry_action', '')}\n"
- f"• 入场: ${entry_plan.get('entry_price', '')}\n"
- f"• 止损: ${entry_plan.get('stop_loss', '')} ({entry_plan.get('stop_pct', '')}%)\n"
- f"• TP1/TP2: ${entry_plan.get('tp1', '')} / ${entry_plan.get('tp2', '')}"
- )
- rec_line = f"\n**推荐ID**: #{rec_id}" if rec_id else ""
-
- card = {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": color,
- "title": {"tag": "plain_text", "content": f"{emoji} {headline} — {coin}"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": (
- f"**币种**: {symbol}\n"
- f"**重要性**: {importance}级 | **来源**: {source}\n"
- f"**发布时间**: {published_at}\n"
- f"**消息**: {title}{link_line}\n\n"
- f"**技术决策**: {reason}\n"
- f"**当前价**: ${price} | **技术分**: {score}\n"
- f"{rec_line}\n\n"
- f"**触发信号**:\n{signal_lines}"
- f"{entry_lines}"
- ),
- },
- },
- ],
- }
- return push_card(card)
-
-
-if __name__ == "__main__":
- # 测试推送
settings = _feishu_settings()
- print(f"Webhook env: {settings['webhook_env']} configured={bool(settings['webhook_url'])}")
- print("\n测试爆发卡片推送...")
- ok, result = push_altcoin_burst_alert(
- "FET/USDT", 2.15,
- ["1H放量突破阻力(2.3倍)", "1H MACD金叉", "1H 均线多头初成", "15min 5阳线+量递增"],
- {"entry_price": 2.10, "entry_method": "回踩确认", "stop_loss": 1.95,
- "stop_pct": 3.0, "tp1": 2.55, "tp2": 2.80, "rr1": 3.0, "rr2": 5.0,
- "risk_reward_ok": True, "current_price": 2.15, "atr_1h": 0.10},
- sector="AI_DePIN", leader_status="板块龙头(AI_DePIN)",
- )
- print(f"爆发卡片: ok={ok}, result={result}")
+ if not settings["enabled"]:
+ return False, "feishu notification disabled"
+ if not settings["webhook_url"]:
+ return False, f"{settings['webhook_env']} not configured"
- print("\n测试加速卡片推送(应被跳过)...")
- ok2, result2 = push_altcoin_accelerating_alert(
- "ARB/USDT", 0.125,
- ["4H MACD金叉", "4H RSI拐点(35→52)", "板块联动: Layer2龙头启动"],
- score=10, sector="Layer2",
- )
- print(f"加速卡片: ok={ok2}, result={result2}")
+ payload = {"msg_type": "interactive", "card": card_content}
+ try:
+ resp = requests.post(settings["webhook_url"], json=payload, timeout=settings["timeout"])
+ result = resp.json()
+ ok = resp.status_code == 200 and result.get("StatusCode") == 0
+ return ok, result
+ except Exception as exc:
+ return False, str(exc)
- print("\n测试板块联动推送(应被跳过)...")
- ok4, result4 = push_altcoin_sector_alert(["AI"], {"AI": {"leader": "FET/USDT", "leader_pct": 12.5, "is_leader_hot": True}})
- print(f"板块联动: ok={ok4}, result={result4}")
+
+__all__ = ["push_card"]
diff --git a/app/integrations/feishu_review_push.py b/app/integrations/feishu_review_push.py
deleted file mode 100644
index b5d3a9b..0000000
--- a/app/integrations/feishu_review_push.py
+++ /dev/null
@@ -1,298 +0,0 @@
-"""
-飞书复盘报告推送模块
-
-推送三类卡片:
-1. push_review_report — 策略复盘报告(蓝色主题)
-2. push_reverse_analysis_report — 逆向分析报告(紫色主题)
-3. push_rule_update_notification — 新规律通知(绿色主题)
-
-复用 feishu_push.py 的认证模式(load_feishu_creds → get_token → push_card)
-"""
-
-import os
-import sys
-import json
-
-sys.path.insert(0, os.path.dirname(__file__))
-from app.integrations.feishu_push import push_card
-
-CHAT_ID = "oc_2c597ad94167102922de142928e2917a"
-
-
-# ==================== 1. 策略复盘报告 ====================
-
-def push_review_report(review_results):
- """
- 推送策略复盘报告卡片 — 📊 蓝色主题
-
- Section 1: 推荐命中统计 (hit/fail/flat counts, hit rate %)
- Section 2: 信号绩效TOP5 (best performing signals)
- Section 3: 遗漏爆炸 (missed coins, why, what features)
- Section 4: 权重调整 (weight changes)
- """
- reviews = review_results.get("review_details", [])
- weight_adj = review_results.get("weight_adjustments", [])
- missed = review_results.get("missed_explosions", [])
-
- # Section 1: 命中统计
- hit_count = sum(1 for r in reviews if r.get("outcome") == "爆发")
- fail_count = sum(1 for r in reviews if r.get("outcome") == "失败")
- flat_count = sum(1 for r in reviews if r.get("outcome") == "横盘")
- total = len(reviews)
- hit_rate_pct = round(hit_count / total * 100, 1) if total > 0 else 0
-
- # 命中统计文案
- hit_emoji = "🔥" if hit_rate_pct >= 50 else "⚠️" if hit_rate_pct >= 30 else "❌"
- stats_line = (
- f"本次复盘 **{total}** 条推荐:\n"
- f" • 爆发(命中): **{hit_count}** ({hit_emoji})\n"
- f" • 横盘: **{flat_count}**\n"
- f" • 失败: **{fail_count}**\n"
- f" • 命中率: **{hit_rate_pct}%**"
- )
-
- # Section 2: 信号绩效TOP5
- # 从review_results中提取信号绩效信息
- from app.db.altcoin_db import get_signal_weights
- weights = get_signal_weights()
- sig_perf_list = sorted(
- [(sig, data) for sig, data in weights.items() if data.get("total_count", 0) >= 3],
- key=lambda x: x[1].get("hit_rate", 0),
- reverse=True,
- )[:5]
-
- sig_lines = ""
- if sig_perf_list:
- for sig, data in sig_perf_list:
- hr = data.get("hit_rate", 0)
- w = data.get("weight", 0)
- total_n = data.get("total_count", 0)
- cat = data.get("category", "")
- emoji = "✅" if hr >= 50 else "⚠️" if hr >= 30 else "❌"
- sig_lines += f"\n • {emoji} **{sig}**({cat}): 命中率{hr}% | 权重{w} | 样本{total_n}"
- else:
- sig_lines = "\n • 样本不足,暂无绩效数据"
-
- # Section 3: 遗漏爆炸
- missed_lines = ""
- if missed:
- for m in missed[:5]: # 最多展示5只
- symbol = m.get("symbol", "")
- gain = m.get("gain_pct", 0)
- reason = m.get("reason_missed", m.get("reason", ""))
- features = m.get("features_detected", [])
- if isinstance(features, str):
- try:
- features = json.loads(features)
- except:
- features = [features]
- feat_str = ", ".join(str(f) for f in features[:3]) if features else "无"
- missed_lines += f"\n • 💥 **{symbol}** 涨{gain}% | 原因: {reason} | 特征: {feat_str}"
- else:
- missed_lines = "\n • ✅ 无遗漏爆炸"
-
- # Section 4: 权重调整
- adj_lines = ""
- if weight_adj:
- for adj in weight_adj:
- adj_lines += f"\n • {adj}"
- else:
- adj_lines = "\n • 无权重调整"
-
- # 构建卡片
- card = {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": "blue",
- "title": {"tag": "plain_text", "content": "📊 山寨币策略复盘报告"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": (
- f"**=== 推荐命中统计 ===**\n{stats_line}\n\n"
- f"**=== 信号绩效TOP5 ===**\n{sig_lines}\n\n"
- f"**=== 遗漏爆炸 ===**\n{missed_lines}\n\n"
- f"**=== 权重调整 ===**\n{adj_lines}"
- ),
- },
- },
- {
- "tag": "action",
- "actions": [
- {
- "tag": "button",
- "text": {"tag": "plain_text", "content": f"📊 命中率{hit_rate_pct}%"},
- "type": "primary" if hit_rate_pct >= 50 else "warning" if hit_rate_pct >= 30 else "danger",
- }
- ],
- },
- ],
- }
- return push_card(card)
-
-
-# ==================== 2. 逆向分析报告 ====================
-
-def push_reverse_analysis_report(reverse_results):
- """
- 推送逆向分析报告卡片 — 🔍 紫色主题
-
- Section 1: 今日涨幅榜TOP10 (symbol, gain%, sector)
- Section 2: 起爆前共性特征 (pattern summary with percentages)
- Section 3: 新发现规律 (any new rules)
- """
- top_gainers = reverse_results.get("top_gainers", [])
- pattern_summary = reverse_results.get("pattern_summary", [])
- new_rules = reverse_results.get("new_rules", [])
- total_unrecommended = reverse_results.get("total_unrecommended", 0)
- total_analyzed = reverse_results.get("total_analyzed", 0)
-
- # Section 1: 涨幅榜TOP10
- gainer_lines = ""
- for i, g in enumerate(top_gainers[:10], 1):
- symbol = g.get("symbol", "").replace("/USDT", "")
- gain = g.get("gain_pct", 0)
- sector = g.get("sector", [])
- sector_str = sector[0] if isinstance(sector, list) and sector else (sector if sector else "未知")
- volume = g.get("volume_24h", 0)
- vol_str = f"${volume / 1e6:.1f}M" if volume > 0 else ""
- gainer_lines += f"\n {i}. **{symbol}** +{gain}% | {sector_str} | {vol_str}"
-
- if not gainer_lines:
- gainer_lines = "\n • 今日无明显涨幅"
-
- # Section 2: 起爆前共性特征
- pattern_lines = ""
- for p in pattern_summary[:8]: # 最多展示8个特征
- label = p.get("label", p.get("feature", ""))
- pct = p.get("percentage", 0)
- count = p.get("count", 0)
- total = p.get("total", 0)
- bar = "█" * int(pct / 10) + "░" * (10 - int(pct / 10))
- emoji = "🔥" if pct >= 60 else "✅" if pct >= 40 else "⚠️" if pct >= 20 else "❌"
- pattern_lines += f"\n • {emoji} **{label}**: {pct}%({count}/{total}) {bar}"
-
- if not pattern_lines:
- pattern_lines = "\n • 分析样本不足"
-
- # Section 3: 新发现规律
- rule_lines = ""
- if new_rules:
- for r in new_rules:
- rule_id = r.get("rule_id", "")
- desc = r.get("description", "")
- score_adj = r.get("score_adjust", 0)
- rule_type = r.get("type", "")
- rule_lines += f"\n • 🧠 **{rule_id}**: {desc} → 评分{score_adj}({rule_type})"
- else:
- rule_lines = "\n • 暂无新规律达到显著性阈值"
-
- # 分析概况
- overview = (
- f"涨幅榜共{len(top_gainers)}只 ≥10%\n"
- f"未被推荐: {total_unrecommended}只\n"
- f"已做PA分析: {total_analyzed}只"
- )
-
- # 构建卡片(紫色主题用 "violet" — 飞书卡片没有purple,用indigo近似)
- card = {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": "indigo",
- "title": {"tag": "plain_text", "content": "🔍 逆向分析报告 — 涨幅榜复盘"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": (
- f"**{overview}**\n\n"
- f"**=== 今日涨幅榜TOP10 ===**\n{gainer_lines}\n\n"
- f"**=== 起爆前共性特征 ===**\n{pattern_lines}\n\n"
- f"**=== 新发现规律 ===**\n{rule_lines}"
- ),
- },
- },
- {
- "tag": "action",
- "actions": [
- {
- "tag": "button",
- "text": {"tag": "plain_text", "content": f"🔍 分析{total_analyzed}只暴涨币"},
- "type": "primary",
- }
- ],
- },
- ],
- }
- return push_card(card)
-
-
-# ==================== 3. 新规律通知 ====================
-
-def push_rule_update_notification(rule_id, description, status="候选规则,未生效"):
- """
- 推送新规律学习通知 — 🧠 绿色主题
- 简洁卡片,告知策略自动迭代
- """
- card = {
- "config": {"wide_screen_mode": True},
- "header": {
- "template": "green",
- "title": {"tag": "plain_text", "content": "🧠 策略自学习 — 候选规则发现"},
- },
- "elements": [
- {
- "tag": "div",
- "text": {
- "tag": "lark_md",
- "content": (
- f"规则ID: **{rule_id}**\n\n"
- f"状态: **{status}**\n\n"
- f"描述: {description}\n\n"
- f"说明: 该规则仅进入候选池/灰度评估,未通过发布闸门前不会写入正式规则库,也不会影响下次选币。"
- ),
- },
- },
- {
- "tag": "action",
- "actions": [
- {
- "tag": "button",
- "text": {"tag": "plain_text", "content": f"🧠 {rule_id}"},
- "type": "primary",
- }
- ],
- },
- ],
- }
- return push_card(card)
-
-
-# ==================== 测试 ====================
-
-if __name__ == "__main__":
- print("测试复盘报告推送...")
-
- # 测试review report
- test_review = {
- "review_details": [
- {"symbol": "FET/USDT", "outcome": "爆发", "pnl_48h": 12.5},
- {"symbol": "ARB/USDT", "outcome": "横盘", "pnl_48h": 1.2},
- {"symbol": "PEPE/USDT", "outcome": "失败", "pnl_48h": -4.5},
- ],
- "weight_adjustments": ["量价齐飞: 3→4.0 (命中率67%)"],
- "missed_explosions": [
- {"symbol": "INJ/USDT", "gain_pct": 25, "reason_missed": "细筛淘汰(score=4)", "features_detected": ["ignition_point", "Q7_zone"]},
- ],
- }
- ok1, r1 = push_review_report(test_review)
- print(f"复盘报告: ok={ok1}")
-
- # 测试rule notification
- ok2, r2 = push_rule_update_notification("rule_20260429_001", "涨幅榜60%有起爆点 → 起爆点是爆发前必现信号")
- print(f"规律通知: ok={ok2}")
diff --git a/app/integrations/push_orchestrator.py b/app/integrations/push_orchestrator.py
deleted file mode 100644
index f67a4dc..0000000
--- a/app/integrations/push_orchestrator.py
+++ /dev/null
@@ -1,43 +0,0 @@
-"""Push orchestration helpers.
-
-Separates eligibility / cooldown decisions from payload rendering and transport.
-"""
-
-from app.db.recommendation_queries import log_push, should_push
-from app.integrations.feishu_push import build_trade_action_card, push_card
-
-
-def push_mainline_state_update(symbol: str, rec_id: int, mainline_item: dict, title_prefix: str | None = None, entry_push_type: str = "entry", watch_push_type: str = "watch_pool") -> bool:
- """主链路状态只记录,不再飞书推送。"""
- status = mainline_item.get("execution_status") if mainline_item else "missing"
- action = mainline_item.get("action_status", "") if mainline_item else ""
- print(f"[push] skip {symbol}: mainline notifications disabled (status={status}, action={action})")
- return False
-
-
-def push_trade_action_update(symbol: str, rec_id: int, state_decision: dict, final_action: str, push_type: str = "entry") -> bool:
- if not state_decision.get("push_required"):
- return False
- if not should_push(symbol, push_type, final_action):
- print(f"⏭ 跳过推送({symbol}): {push_type}/{final_action} 12h冷却中")
- return False
- card = build_trade_action_card(
- state_decision["push_symbol"],
- state_decision["push_current_price"],
- state_decision["push_entry_price"],
- state_decision["push_pnl_pct"],
- final_action,
- state_decision.get("push_signals", []),
- state_decision.get("stop_loss", 0),
- state_decision.get("tp1", 0),
- state_decision.get("tp2", 0),
- )
- if isinstance(card, tuple):
- ok, resp = card
- else:
- ok, resp = push_card(card)
- if ok:
- log_push(symbol, push_type, final_action, rec_id=rec_id)
- return True
- print(f"飞书推送失败({symbol}): {resp}")
- return False
diff --git a/app/services/altcoin_confirm.py b/app/services/altcoin_confirm.py
index be8b7ff..1b28609 100644
--- a/app/services/altcoin_confirm.py
+++ b/app/services/altcoin_confirm.py
@@ -29,9 +29,8 @@ from app.core.sector_map import get_burst_threshold, is_meme_coin, get_sector_fo
from app.db.altcoin_db import (
init_db, expire_old_states, expire_old_recommendations,
get_candidates_for_confirm, update_state, get_conn, create_recommendation, log_screening,
- log_cron_run, update_latest_price_cache, get_recommendation_for_push,
+ log_cron_run, update_latest_price_cache,
)
-from app.integrations.push_orchestrator import push_mainline_state_update
from app.config.config_loader import (
get_strategy_direction,
vp_fly_params,
@@ -1334,8 +1333,6 @@ def main(compact: bool = False):
update_latest_price_cache(symbol, result["price"], updated_at=datetime.now().isoformat(), source="confirm")
result["rec_id"] = rec_id
- mainline_item = get_recommendation_for_push(rec_id)
- push_mainline_state_update(symbol, rec_id, mainline_item)
else:
cand_detail = json.loads(cand.get("detail_json", "{}"))
log_screening(
diff --git a/app/services/chat_assistant.py b/app/services/chat_assistant.py
new file mode 100644
index 0000000..8ea01a2
--- /dev/null
+++ b/app/services/chat_assistant.py
@@ -0,0 +1,755 @@
+"""Conversational crypto research assistant.
+
+The assistant is read-only: it can inspect AlphaX data and Binance OHLCV, but
+it never mutates recommendations, strategy state, or trading state.
+"""
+
+from __future__ import annotations
+
+import json
+import math
+import os
+import re
+from datetime import datetime, timedelta
+import ccxt
+import pandas as pd
+import requests
+
+from app.config.system_config import llm_config
+from app.core.pa_engine import calc_atr, full_pa_analysis
+from app.db import chat_assistant_db
+from app.db.analytics import get_pipeline_runs
+from app.db.llm_insights import compute_input_hash, repair_mojibake_json, repair_mojibake_text
+from app.db.onchain_db import get_onchain_token_detail, get_onchain_overview
+from app.db.paper_trading import get_paper_trading_summary, list_paper_trade_events, list_paper_trades
+from app.db.schema import get_conn
+from app.services.llm_insights import get_llm_params
+from app.services.market_overview import get_crypto_market_overview
+
+
+exchange = ccxt.binance({"enableRateLimit": True})
+
+CRYPTO_TERMS = {
+ "btc", "eth", "bnb", "sol", "xrp", "doge", "ada", "sui", "link", "qnt",
+ "币", "加密", "crypto", "usdt", "binance", "行情", "链上", "舆情", "推荐", "复盘",
+ "k线", "k 线", "技术面", "止盈", "止损", "纸面", "模拟交易", "仓位", "山寨",
+}
+
+INTENT_LABELS = {
+ "coin_analysis": "单币分析",
+ "market_overview": "市场问答",
+ "recommendation_explain": "推荐解释",
+ "sentiment": "舆情解读",
+ "onchain": "链上异动",
+ "review": "复盘查询",
+ "paper_trading": "模拟交易",
+ "help": "帮助",
+ "unsupported": "范围外",
+}
+
+TIMEFRAMES = ("15m", "1h", "4h", "1d")
+
+
+def _now() -> str:
+ return datetime.now().isoformat(timespec="seconds")
+
+
+def _safe_float(value, default=0.0) -> float:
+ try:
+ if value is None or value == "":
+ return default
+ return float(value)
+ except Exception:
+ return default
+
+
+def _safe_int(value, default=0) -> int:
+ try:
+ return int(value or 0)
+ except Exception:
+ return default
+
+
+def _json(value):
+ return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str)
+
+
+def _compact_json(value, max_len=18000):
+ text = _json(value)
+ if len(text) <= max_len:
+ return text
+ return text[:max_len] + "...[truncated]"
+
+
+def _normalize_symbol(value: str) -> str:
+ text = str(value or "").strip().upper()
+ text = text.replace(" ", "")
+ if not text:
+ return ""
+ if "/" in text:
+ base, quote = text.split("/", 1)
+ quote = quote or "USDT"
+ return f"{base}/USDT" if quote in ("USD", "USDT", "BUSD", "FDUSD") else f"{base}/{quote}"
+ text = re.sub(r"[^A-Z0-9]", "", text)
+ if text.endswith("USDT") and len(text) > 4:
+ text = text[:-4]
+ return f"{text}/USDT" if text else ""
+
+
+def extract_symbol(message: str, session=None, preferences=None) -> str:
+ text = str(message or "")
+ patterns = [
+ r"\b([A-Z0-9]{2,15})\s*/\s*USDT\b",
+ r"\b([A-Z0-9]{2,15})USDT\b",
+ r"\$([A-Z][A-Z0-9]{1,12})\b",
+ ]
+ for pat in patterns:
+ m = re.search(pat, text, flags=re.I)
+ if m:
+ return _normalize_symbol(m.group(1))
+
+ upper_tokens = re.findall(r"(? str:
+ text = str(message or "").lower()
+ if not _is_crypto_question(text, symbol):
+ return "unsupported"
+ if any(k in text for k in ("怎么用", "能做什么", "帮助", "help", "问什么")):
+ return "help"
+ if any(k in text for k in ("模拟交易", "paper", "开仓", "平仓", "持仓", "收益", "仓位")):
+ return "paper_trading"
+ if any(k in text for k in ("链上", "onchain", "鲸鱼", "转账", "dex", "流动性", "合约")):
+ return "onchain"
+ if any(k in text for k in ("舆情", "新闻", "消息", "情绪", "热点", "叙事", "ai 舆情")):
+ return "sentiment"
+ if any(k in text for k in ("复盘", "历史", "亏损", "失败", "胜率", "漏选")):
+ return "review"
+ if any(k in text for k in ("推荐", "看板", "为什么", "等回踩", "可买", "观察", "信号")) and symbol:
+ return "recommendation_explain"
+ if any(k in text for k in ("市场", "大盘", "全市场", "资金费率", "涨幅榜", "广度")) and not symbol:
+ return "market_overview"
+ if symbol:
+ return "coin_analysis"
+ return "market_overview"
+
+
+def _is_crypto_question(text: str, symbol: str = "") -> bool:
+ if symbol:
+ return True
+ return any(term in text for term in CRYPTO_TERMS)
+
+
+def _binance_symbol_id(symbol: str) -> str:
+ return _normalize_symbol(symbol).replace("/", "")
+
+
+def fetch_binance_klines(symbol: str, timeframe: str, limit: int = 160):
+ symbol = _normalize_symbol(symbol)
+ try:
+ ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
+ except Exception:
+ return None
+ if not ohlcv:
+ return None
+ df = pd.DataFrame(ohlcv, columns=["timestamp", "open", "high", "low", "close", "volume"])
+ df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
+ return df
+
+
+def _ma(series, n):
+ if series is None or len(series) < n:
+ return 0.0
+ value = series.rolling(n).mean().iloc[-1]
+ return 0.0 if pd.isna(value) else float(value)
+
+
+def _rsi(closes, period=14):
+ if closes is None or len(closes) < period + 2:
+ return 0.0
+ delta = closes.diff()
+ gain = delta.clip(lower=0).rolling(period).mean()
+ loss = (-delta.clip(upper=0)).rolling(period).mean()
+ rs = gain / loss.replace(0, math.nan)
+ value = 100 - (100 / (1 + rs.iloc[-1]))
+ return 50.0 if pd.isna(value) else round(float(value), 2)
+
+
+def _support_resistance(df):
+ if df is None or len(df) < 20:
+ return {"support": 0, "resistance": 0, "range_position_pct": 0}
+ recent = df.tail(48 if len(df) >= 48 else len(df))
+ support = float(recent["low"].min())
+ resistance = float(recent["high"].max())
+ price = float(df["close"].iloc[-1])
+ position = (price - support) / (resistance - support) * 100 if resistance > support else 50
+ return {
+ "support": round(support, 8),
+ "resistance": round(resistance, 8),
+ "range_position_pct": round(max(0, min(100, position)), 2),
+ }
+
+
+def _technical_summary_for_df(df, timeframe: str) -> dict:
+ if df is None or len(df) < 30:
+ return {"timeframe": timeframe, "available": False, "reason": "kline_unavailable"}
+ closes = df["close"].astype(float)
+ volumes = df["volume"].astype(float)
+ price = float(closes.iloc[-1])
+ prev = float(closes.iloc[-2]) if len(closes) >= 2 else price
+ change = ((price / prev) - 1) * 100 if prev > 0 else 0
+ change_window = ((price / float(closes.iloc[0])) - 1) * 100 if float(closes.iloc[0]) > 0 else 0
+ ma20 = _ma(closes, 20)
+ ma60 = _ma(closes, 60)
+ avg_vol20 = _ma(volumes, 20)
+ vol_ratio = float(volumes.iloc[-1]) / avg_vol20 if avg_vol20 > 0 else 0
+ atr = calc_atr(df, 14)
+ pa = full_pa_analysis(df, timeframe)
+ sr = _support_resistance(df)
+ last_candle = {
+ "open": round(float(df["open"].iloc[-1]), 8),
+ "high": round(float(df["high"].iloc[-1]), 8),
+ "low": round(float(df["low"].iloc[-1]), 8),
+ "close": round(price, 8),
+ "volume": round(float(volumes.iloc[-1]), 4),
+ "is_bullish": price >= float(df["open"].iloc[-1]),
+ }
+ trend = "sideways"
+ if ma20 > 0 and ma60 > 0 and price > ma20 > ma60:
+ trend = "uptrend"
+ elif ma20 > 0 and ma60 > 0 and price < ma20 < ma60:
+ trend = "downtrend"
+ elif ma20 > 0 and price > ma20:
+ trend = "rebound"
+ elif ma20 > 0 and price < ma20:
+ trend = "weak"
+ return {
+ "timeframe": timeframe,
+ "available": True,
+ "last_time": str(df["timestamp"].iloc[-1]),
+ "price": round(price, 8),
+ "change_last_bar_pct": round(change, 2),
+ "change_window_pct": round(change_window, 2),
+ "trend": trend,
+ "ma20": round(ma20, 8),
+ "ma60": round(ma60, 8),
+ "rsi14": _rsi(closes, 14),
+ "atr14": round(float(atr or 0), 8),
+ "volume_ratio_20": round(vol_ratio, 2),
+ "support": sr["support"],
+ "resistance": sr["resistance"],
+ "range_position_pct": sr["range_position_pct"],
+ "last_candle": last_candle,
+ "pa": {
+ "dynamic_count_recent": sum(1 for x in (pa.get("candles_class") or [])[-10:] if x.get("type") == "dynamic"),
+ "static_count_recent": sum(1 for x in (pa.get("candles_class") or [])[-10:] if x.get("type") == "static"),
+ "top_zones": (pa.get("zones") or [])[:3],
+ "continuous_k": (pa.get("continuous_k") or [])[-3:],
+ "ignition_points": (pa.get("ignition_points") or [])[-3:],
+ "trend_exhaustion": pa.get("trend_exhaustion") or {},
+ },
+ }
+
+
+def analyze_symbol_technicals(symbol: str) -> dict:
+ symbol = _normalize_symbol(symbol)
+ result = {
+ "symbol": symbol,
+ "source": "binance_spot",
+ "binance_tradable": False,
+ "timeframes": {},
+ "summary": {},
+ "errors": [],
+ }
+ for tf in TIMEFRAMES:
+ df = fetch_binance_klines(symbol, tf, limit=180 if tf == "1d" else 160)
+ if df is None or len(df) < 30:
+ result["timeframes"][tf] = {"timeframe": tf, "available": False, "reason": "binance_kline_unavailable"}
+ result["errors"].append(f"{tf}: kline_unavailable")
+ continue
+ result["binance_tradable"] = True
+ result["timeframes"][tf] = _technical_summary_for_df(df, tf)
+
+ available = [x for x in result["timeframes"].values() if x.get("available")]
+ if not available:
+ result["summary"] = {"stance": "no_data", "headline": "未拿到 Binance K 线,无法做实时技术面判断。", "risk_level": "unknown"}
+ return result
+
+ h1 = result["timeframes"].get("1h") or {}
+ h4 = result["timeframes"].get("4h") or {}
+ d1 = result["timeframes"].get("1d") or {}
+ bullish = sum(1 for x in available if x.get("trend") in ("uptrend", "rebound") and _safe_float(x.get("volume_ratio_20")) >= 0.8)
+ bearish = sum(1 for x in available if x.get("trend") in ("downtrend", "weak") and _safe_float(x.get("change_window_pct")) < 0)
+ chase_risk = any(_safe_float(x.get("range_position_pct")) >= 85 and _safe_float(x.get("rsi14")) >= 70 for x in (h1, h4, d1) if x.get("available"))
+ exhaustion = (h1.get("pa") or {}).get("trend_exhaustion") or {}
+ if bullish >= 3 and not chase_risk:
+ stance = "bullish_watch"
+ headline = "多周期结构偏强,但仍需要结合入场窗口和风险收益比。"
+ elif chase_risk:
+ stance = "chase_risk"
+ headline = "价格处在区间高位或 RSI 偏热,追高风险需要优先处理。"
+ elif bearish >= 2 or exhaustion.get("exhausted"):
+ stance = "weak_or_exhausted"
+ headline = "短线结构偏弱或出现衰减,不适合直接按强势机会处理。"
+ else:
+ stance = "neutral"
+ headline = "结构还不够一致,适合观察等待更清晰触发。"
+ result["summary"] = {
+ "stance": stance,
+ "headline": headline,
+ "risk_level": "high" if chase_risk or exhaustion.get("severity") == "high" else "medium" if bearish else "normal",
+ "bullish_timeframe_count": bullish,
+ "bearish_timeframe_count": bearish,
+ "chase_risk": chase_risk,
+ "latest_price": h1.get("price") or available[0].get("price"),
+ }
+ return result
+
+
+def _latest_recommendations(symbol: str = "", limit=5):
+ conn = get_conn()
+ params = []
+ where = "1=1"
+ if symbol:
+ where += " AND symbol=%s"
+ params.append(_normalize_symbol(symbol))
+ rows = conn.execute(
+ f"""
+ SELECT * FROM recommendation
+ WHERE {where}
+ ORDER BY id DESC
+ LIMIT %s
+ """,
+ tuple(params + [int(limit)]),
+ ).fetchall()
+ conn.close()
+ items = []
+ from app.db.altcoin_db import _derive_execution_fields
+
+ for row in rows:
+ item = _derive_execution_fields(dict(row))
+ for key in ("market_context_json", "derivatives_context_json", "sector_context_json", "entry_plan_json", "signal_codes_json", "signal_labels_json"):
+ value = item.get(key)
+ if isinstance(value, str):
+ try:
+ if value:
+ parsed = json.loads(value)
+ elif key.endswith("context_json") or key == "entry_plan_json":
+ parsed = {}
+ else:
+ parsed = []
+ item[key.replace("_json", "")] = parsed
+ except Exception:
+ pass
+ items.append(item)
+ return items
+
+
+def _sentiment_context(symbol: str = "", limit=8):
+ conn = get_conn()
+ params = []
+ where = "detected_at >= %s"
+ params.append((datetime.now() - timedelta(hours=72)).isoformat())
+ if symbol:
+ where += " AND symbol=%s"
+ params.append(_normalize_symbol(symbol))
+ rows = conn.execute(
+ f"""
+ SELECT id, source, symbol, title, url, published_at, detected_at, importance, event_type, decision, tech_score, rec_id
+ FROM event_news
+ WHERE {where}
+ ORDER BY detected_at DESC, id DESC
+ LIMIT %s
+ """,
+ tuple(params + [int(limit)]),
+ ).fetchall()
+ conn.close()
+ return [dict(row) for row in rows]
+
+
+def _review_context(symbol: str = "", limit=8):
+ conn = get_conn()
+ params = []
+ where = "1=1"
+ if symbol:
+ where += " AND rl.symbol=%s"
+ params.append(_normalize_symbol(symbol))
+ rows = conn.execute(
+ f"""
+ SELECT rl.*, r.execution_status, r.display_bucket, r.action_status, r.entry_triggered
+ FROM review_log rl
+ LEFT JOIN recommendation r ON r.id=rl.rec_id
+ WHERE {where}
+ ORDER BY rl.review_time DESC, rl.id DESC
+ LIMIT %s
+ """,
+ tuple(params + [int(limit)]),
+ ).fetchall()
+ conn.close()
+ return [dict(row) for row in rows]
+
+
+def _paper_context(symbol: str = ""):
+ if symbol:
+ trades = list_paper_trades(limit=10, status="")
+ symbol_norm = _normalize_symbol(symbol)
+ items = [x for x in trades.get("items", []) if x.get("symbol") == symbol_norm][:5]
+ events = list_paper_trade_events(limit=20, symbol=symbol_norm)
+ return {"trades": items, "events": events.get("items", [])}
+ return {
+ "summary": get_paper_trading_summary(days=30),
+ "trades": list_paper_trades(limit=5).get("items", []),
+ "events": list_paper_trade_events(limit=8).get("items", []),
+ }
+
+
+def _market_context():
+ try:
+ market = get_crypto_market_overview()
+ except Exception as exc:
+ market = {"error": str(exc)[:300]}
+ try:
+ onchain = get_onchain_overview(hours=24)
+ except Exception:
+ onchain = {}
+ try:
+ from app.services.llm_insights import get_latest_sentiment_batch_analysis
+
+ sentiment = get_latest_sentiment_batch_analysis() or {}
+ except Exception:
+ sentiment = {}
+ return {"market": market, "onchain": onchain, "ai_sentiment": sentiment}
+
+
+def build_context(intent: str, message: str, symbol: str, preferences=None) -> dict:
+ symbol = _normalize_symbol(symbol)
+ ctx = {
+ "intent": intent,
+ "intent_label": INTENT_LABELS.get(intent, intent),
+ "symbol": symbol,
+ "generated_at": _now(),
+ "preferences": preferences or {},
+ "sources": [],
+ }
+ if intent == "unsupported":
+ return ctx
+ if intent in ("coin_analysis", "recommendation_explain", "onchain", "sentiment") and symbol:
+ ctx["technicals"] = analyze_symbol_technicals(symbol)
+ ctx["recommendations"] = _latest_recommendations(symbol=symbol, limit=5)
+ ctx["sentiment_events"] = _sentiment_context(symbol=symbol, limit=8)
+ try:
+ ctx["onchain"] = get_onchain_token_detail(symbol=symbol, hours=168)
+ except Exception as exc:
+ ctx["onchain"] = {"error": str(exc)[:200]}
+ ctx["reviews"] = _review_context(symbol=symbol, limit=8)
+ ctx["paper"] = _paper_context(symbol=symbol)
+ ctx["sources"] = ["binance_klines", "recommendation", "event_news", "onchain", "review_log", "paper_trading"]
+ elif intent == "market_overview":
+ ctx.update(_market_context())
+ ctx["pipeline"] = get_pipeline_runs(limit=5, hours=24, offset=0)
+ ctx["sources"] = ["market_overview", "onchain_overview", "llm_sentiment", "pipeline_runs"]
+ elif intent == "paper_trading":
+ ctx["paper"] = _paper_context(symbol=symbol)
+ ctx["sources"] = ["paper_trading"]
+ elif intent == "review":
+ ctx["reviews"] = _review_context(symbol=symbol, limit=12)
+ ctx["recommendations"] = _latest_recommendations(symbol=symbol, limit=8) if symbol else _latest_recommendations(limit=8)
+ ctx["paper"] = _paper_context(symbol=symbol)
+ ctx["sources"] = ["review_log", "recommendation", "paper_trading"]
+ elif intent == "sentiment":
+ ctx["sentiment_events"] = _sentiment_context(symbol=symbol, limit=20)
+ try:
+ from app.services.llm_insights import get_latest_sentiment_batch_analysis
+
+ ctx["ai_sentiment"] = get_latest_sentiment_batch_analysis() or {}
+ except Exception:
+ ctx["ai_sentiment"] = {}
+ ctx["sources"] = ["event_news", "llm_sentiment"]
+ elif intent == "onchain":
+ if symbol:
+ try:
+ ctx["onchain"] = get_onchain_token_detail(symbol=symbol, hours=168)
+ except Exception as exc:
+ ctx["onchain"] = {"error": str(exc)[:200]}
+ else:
+ ctx["onchain"] = get_onchain_overview(hours=24)
+ ctx["sources"] = ["onchain"]
+ else:
+ ctx["capabilities"] = [
+ "单币技术面:自动拉取 Binance 15m/1h/4h/1d K 线",
+ "推荐解释:结合当前看板状态、入场窗口、TP/SL、风险理由",
+ "舆情和链上:读取当前系统已采集与 AI 解读的结果",
+ "复盘和模拟交易:区分信号表现与纸面交易收益",
+ ]
+ return ctx
+
+
+def _fallback_answer(intent: str, message: str, context: dict) -> dict:
+ if intent == "unsupported":
+ return {
+ "summary": "我只能回答加密货币和 AlphaX 当前数据相关的问题。",
+ "answer": "这个问题超出了 Crypto 研究助手的范围。你可以问某个币的技术面、看板推荐原因、链上异动、舆情影响、复盘或模拟交易表现。",
+ "evidence": [],
+ "related_records": [],
+ "followups": ["分析 BTC/USDT 的技术面", "今天市场适合追强势币吗?"],
+ }
+ symbol = context.get("symbol") or ""
+ tech = context.get("technicals") or {}
+ tech_summary = tech.get("summary") or {}
+ recommendations = context.get("recommendations") or []
+ sentiment = context.get("sentiment_events") or []
+ onchain = context.get("onchain") or {}
+ paper = context.get("paper") or {}
+ evidence = []
+ if tech_summary:
+ evidence.append(f"技术面:{tech_summary.get('headline', '')}")
+ if recommendations:
+ r = recommendations[0]
+ evidence.append(f"主链路:{r.get('execution_label') or r.get('action_status') or r.get('execution_status')},原因:{r.get('execution_reason') or r.get('state_reason') or '--'}")
+ if sentiment:
+ evidence.append(f"舆情:近 72h 有 {len(sentiment)} 条相关事件,最新为「{sentiment[0].get('title', '')[:60]}」。")
+ if isinstance(onchain, dict) and (onchain.get("events") or onchain.get("metrics")):
+ evidence.append(f"链上:近 7 天有 {len(onchain.get('events') or [])} 条映射事件。")
+ if isinstance(paper, dict) and paper.get("trades"):
+ latest = paper["trades"][0]
+ evidence.append(f"模拟交易:最新状态 {latest.get('status')},收益 {latest.get('pnl_pct') or latest.get('realized_pnl_pct') or 0}%。")
+ if not evidence:
+ evidence.append("当前数据库没有足够样本,结论需要降级为观察。")
+
+ if intent == "market_overview":
+ market = (context.get("market") or {})
+ state = market.get("state") or {}
+ summary = state.get("label") or "市场数据已读取"
+ answer = state.get("summary") or "我读取了全市场行情,但当前没有足够信息形成强结论。"
+ elif symbol:
+ summary = tech_summary.get("headline") or f"{symbol} 需要继续观察"
+ answer = "结论:" + summary + " 证据区已汇总技术面、推荐、舆情、链上和模拟交易上下文。"
+ else:
+ summary = "已读取当前系统数据"
+ answer = "我已经按你的问题读取了当前数据库,但没有识别到明确币种;可以继续追问具体币种。"
+
+ return {
+ "summary": summary,
+ "answer": answer,
+ "evidence": evidence[:8],
+ "related_records": _related_records(context),
+ "followups": _followups(intent, symbol),
+ }
+
+
+def _related_records(context: dict) -> list[dict]:
+ records = []
+ for item in (context.get("recommendations") or [])[:3]:
+ records.append({"type": "recommendation", "label": f"推荐 #{item.get('id')}", "symbol": item.get("symbol"), "status": item.get("execution_status")})
+ for item in (context.get("sentiment_events") or [])[:3]:
+ records.append({"type": "sentiment", "label": item.get("title"), "symbol": item.get("symbol"), "status": item.get("importance")})
+ for item in ((context.get("paper") or {}).get("trades") or [])[:3]:
+ records.append({"type": "paper_trade", "label": f"模拟交易 #{item.get('id')}", "symbol": item.get("symbol"), "status": item.get("status")})
+ return records
+
+
+def _compact_technical_context(context: dict) -> dict:
+ tech = context.get("technicals") or {}
+ tfs = {}
+ for tf, item in (tech.get("timeframes") or {}).items():
+ if not item.get("available"):
+ tfs[tf] = {"available": False, "reason": item.get("reason") or ""}
+ continue
+ tfs[tf] = {
+ "available": True,
+ "price": item.get("price"),
+ "trend": item.get("trend"),
+ "rsi14": item.get("rsi14"),
+ "volume_ratio_20": item.get("volume_ratio_20"),
+ "range_position_pct": item.get("range_position_pct"),
+ "support": item.get("support"),
+ "resistance": item.get("resistance"),
+ "pa": {
+ "dynamic_count_recent": (item.get("pa") or {}).get("dynamic_count_recent"),
+ "static_count_recent": (item.get("pa") or {}).get("static_count_recent"),
+ "ignition_count": len((item.get("pa") or {}).get("ignition_points") or []),
+ "zone_count": len((item.get("pa") or {}).get("top_zones") or []),
+ },
+ }
+ return {
+ "summary": tech.get("summary") or {},
+ "binance_tradable": bool(tech.get("binance_tradable")),
+ "timeframes": tfs,
+ }
+
+
+def _followups(intent: str, symbol: str = "") -> list[str]:
+ if symbol:
+ base = symbol.replace("/USDT", "")
+ return [
+ f"{base} 现在追高风险大吗?",
+ f"{base} 的链上和舆情有没有共振?",
+ f"{base} 如果要等回踩,关键价位在哪里?",
+ ]
+ if intent == "market_overview":
+ return ["今天市场更适合做日内还是 1-3 天波段?", "当前涨幅榜有什么共性?"]
+ return ["分析 BTC/USDT 现在的技术面", "解释最新推荐为什么不是可买"]
+
+
+def _call_chat_llm(message: str, context: dict, history=None) -> dict:
+ cfg = llm_config()
+ params = get_llm_params()
+ if not bool(cfg.get("enabled", False)) or not os.getenv(str(params.get("api_key_env") or "ALPHAX_LLM_API_KEY"), "").strip():
+ return {"status": "skipped", "error": "llm_disabled"}
+ if not bool((cfg.get("modules") or {}).get("chat", True)):
+ return {"status": "skipped", "error": "chat_module_disabled"}
+ api_key = os.getenv(str(params.get("api_key_env") or "ALPHAX_LLM_API_KEY"), "").strip()
+ base_url = str(params.get("base_url") or "https://api.openai.com/v1").rstrip("/")
+ model = str(params.get("model") or "gpt-4o-mini").strip()
+ payload = {
+ "user_question": message,
+ "context": context,
+ "recent_history": (history or [])[-8:],
+ "rules": [
+ "只回答加密货币、AlphaX 当前数据、技术面、链上、舆情、复盘和模拟交易相关问题。",
+ "不要给真实下单指令,不要修改推荐状态,不要承诺收益。",
+ "回答使用中文,采用两段式:先结论,再证据。",
+ "输出严格 JSON:summary, answer, evidence[], risk_flags[], related_records[], followups[]。",
+ ],
+ }
+ system_prompt = "你是 AlphaX Agent 的 Crypto 研究助手。你只能基于提供的结构化数据回答,不能编造数据。"
+ try:
+ resp = requests.post(
+ f"{base_url}/chat/completions",
+ headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
+ json={
+ "model": model,
+ "messages": [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": _compact_json(payload)},
+ ],
+ "temperature": 0.15,
+ "max_tokens": int(params.get("max_tokens") or 900),
+ "response_format": {"type": "json_object"},
+ },
+ timeout=int(params.get("timeout") or 20),
+ )
+ if resp.status_code >= 400:
+ return {"status": "failed", "error": f"http_{resp.status_code}:{resp.text[:300]}", "model": model}
+ data = resp.json()
+ content = (((data.get("choices") or [{}])[0]).get("message") or {}).get("content") or "{}"
+ content = repair_mojibake_text(content)
+ parsed = repair_mojibake_json(json.loads(content))
+ if not isinstance(parsed, dict):
+ raise ValueError("llm_output_not_object")
+ parsed.setdefault("summary", parsed.get("answer", "")[:80])
+ parsed.setdefault("evidence", [])
+ parsed.setdefault("risk_flags", [])
+ parsed.setdefault("related_records", _related_records(context))
+ parsed.setdefault("followups", _followups(context.get("intent"), context.get("symbol")))
+ return {"status": "success", "content": parsed, "model": model}
+ except Exception as exc:
+ return {"status": "failed", "error": str(exc)[:500], "model": model}
+
+
+def answer_chat(user_id: int, message: str, session_id: int = 0) -> dict:
+ message = str(message or "").strip()
+ if not message:
+ raise ValueError("message_required")
+ prefs = chat_assistant_db.get_user_preferences(user_id)
+ session = chat_assistant_db.get_chat_session(session_id, user_id) if session_id else None
+ symbol = extract_symbol(message, session=session, preferences=prefs)
+ intent = detect_intent(message, symbol=symbol)
+ if not session:
+ title = symbol or message[:24]
+ session = chat_assistant_db.create_chat_session(user_id, title=title, last_symbol=symbol, last_intent=intent)
+ user_msg = chat_assistant_db.append_chat_message(
+ session["id"],
+ user_id,
+ "user",
+ content_text=message,
+ content_json={"text": message},
+ intent=intent,
+ symbol=symbol,
+ )
+ history = chat_assistant_db.list_chat_messages(session["id"], user_id, limit=12).get("items", [])
+ context = build_context(intent, message, symbol, preferences=prefs)
+ llm_result = _call_chat_llm(message, context, history=history)
+ if llm_result.get("status") == "success":
+ answer = repair_mojibake_json(llm_result.get("content") or {})
+ model = llm_result.get("model") or ""
+ else:
+ answer = repair_mojibake_json(_fallback_answer(intent, message, context))
+ answer["llm_status"] = llm_result.get("status")
+ answer["llm_error"] = llm_result.get("error", "")
+ model = llm_result.get("model") or ""
+ answer.setdefault("related_records", _related_records(context))
+ answer.setdefault("followups", _followups(intent, symbol))
+ answer.setdefault("evidence", [])
+ assistant_text = repair_mojibake_text(answer.get("answer") or answer.get("summary") or "")
+ assistant_msg = chat_assistant_db.append_chat_message(
+ session["id"],
+ user_id,
+ "assistant",
+ content_text=assistant_text,
+ content_json=answer,
+ context_json={
+ "intent": intent,
+ "symbol": symbol,
+ "context_hash": compute_input_hash(context),
+ "sources": context.get("sources") or [],
+ "technical_summary": (context.get("technicals") or {}).get("summary") or {},
+ "technicals": _compact_technical_context(context),
+ },
+ intent=intent,
+ symbol=symbol,
+ model=model,
+ )
+ memory = session.get("memory") or {}
+ if symbol:
+ memory["last_symbol"] = symbol
+ memory["last_intent"] = intent
+ chat_assistant_db.update_chat_session(
+ session["id"],
+ user_id,
+ title=session.get("title") if session.get("title") != "新对话" else (symbol or message[:24]),
+ memory_json=memory,
+ last_symbol=symbol,
+ last_intent=intent,
+ )
+ pref_patch = {"last_intent": intent, "recent_topics": [intent]}
+ if symbol:
+ pref_patch["last_symbol"] = symbol
+ pref_patch["preferred_symbols"] = [symbol]
+ chat_assistant_db.update_user_preferences(user_id, pref_patch)
+ return {
+ "ok": True,
+ "session": chat_assistant_db.get_chat_session(session["id"], user_id),
+ "user_message": user_msg,
+ "assistant_message": assistant_msg,
+ "answer": answer,
+ "intent": intent,
+ "intent_label": INTENT_LABELS.get(intent, intent),
+ "symbol": symbol,
+ "context": {
+ "sources": context.get("sources") or [],
+ "technicals": context.get("technicals") or {},
+ "related_records": answer.get("related_records") or [],
+ },
+ }
+
+
+__all__ = [
+ "analyze_symbol_technicals",
+ "answer_chat",
+ "build_context",
+ "detect_intent",
+ "extract_symbol",
+ "fetch_binance_klines",
+]
diff --git a/app/services/event_driven_screener.py b/app/services/event_driven_screener.py
index ec4595a..0b26cda 100644
--- a/app/services/event_driven_screener.py
+++ b/app/services/event_driven_screener.py
@@ -25,7 +25,7 @@ import yaml
sys.path.insert(0, os.path.dirname(__file__))
from app.config.config_loader import load_rules, get_meta, get_strategy_direction
-from app.db.altcoin_db import init_db, get_conn, create_recommendation, log_screening, log_cron_run, get_recommendation_for_push
+from app.db.altcoin_db import init_db, get_conn, create_recommendation, log_screening, log_cron_run
from app.db.postgres_connection import ensure_migrations_once
from app.db.llm_insights import repair_mojibake_json, repair_mojibake_text
from app.core.opportunity_funnel import build_screening_detail
@@ -41,7 +41,6 @@ from app.services.altcoin_screener import (
)
from app.services.altcoin_confirm import fetch_derivatives_context
from app.core.pa_engine import full_pa_analysis, calc_atr
-from app.integrations.push_orchestrator import push_mainline_state_update
exchange = ccxt.binance({"enableRateLimit": True})
@@ -838,21 +837,6 @@ def process_event(event):
sector_context={"event_title": event.get("title"), "event_url": event.get("url"), "event_source": event.get("source"), "event_importance": event.get("importance"), "trigger_context": result.get("trigger_context", {})},
)
- # 飞书只是通知层:事件脚本不再直接推 observe/risk,也不允许 rec_id=0 的事件旁路进通知。
- # 只有 decision=recommend 且已创建主推荐记录后,消费主链路派生状态进行通知。
- if decision == "recommend" and rec_id and _cfg().get("push", {}).get(decision, True):
- mainline_item = get_recommendation_for_push(rec_id)
- pushed = push_mainline_state_update(
- symbol,
- rec_id,
- mainline_item,
- title_prefix="事件触发机会",
- entry_push_type="event_entry",
- watch_push_type="event_watch_pool",
- )
- elif decision in ("observe", "risk"):
- print(f"[event] skip push {symbol}: decision={decision} is not a主链路推荐通知")
-
conn = get_conn()
conn.execute("""
UPDATE event_news SET processed=1, decision=%s, tech_score=%s, rec_id=%s, pushed=%s
diff --git a/app/services/price_tracker.py b/app/services/price_tracker.py
index 772b8bf..0435255 100644
--- a/app/services/price_tracker.py
+++ b/app/services/price_tracker.py
@@ -30,7 +30,6 @@ from app.db.altcoin_db import (
update_latest_price_cache,
)
from app.db.paper_trading import sync_recommendation as sync_paper_trade
-from app.integrations.push_orchestrator import push_trade_action_update
from app.core.pa_engine import (
calc_atr, full_pa_analysis, detect_trend_exhaustion,
analyze_entry_point,
diff --git a/app/services/review_engine.py b/app/services/review_engine.py
index 3932fce..e936abd 100644
--- a/app/services/review_engine.py
+++ b/app/services/review_engine.py
@@ -32,7 +32,6 @@ from app.config.config_loader import (
promote_candidate_rule_to_learned_rule, bump_strategy_patch_version,
)
from app.analysis import reverse_analysis
-from app.integrations import feishu_review_push
import requests
BINANCE_API = "https://api.binance.com/api/v3"
@@ -1499,26 +1498,6 @@ def run_review(push_enabled: bool = True, compact: bool = False):
f"\n信号淘汰: {'; '.join(results['signal_deprecations'][:5])}"
)
- # 7. 飞书推送
- if push_enabled:
- try:
- ok1, r1 = feishu_review_push.push_review_report(results)
- print(f"[review_engine] 复盘报告推送: ok={ok1}")
-
- if results["reverse_analysis"] and not results["reverse_analysis"].get("error"):
- ok2, r2 = feishu_review_push.push_reverse_analysis_report(results["reverse_analysis"])
- print(f"[review_engine] 逆向分析报告推送: ok={ok2}")
-
- for rule in new_pattern_rules:
- feishu_review_push.push_rule_update_notification(rule.get("candidate_id"), rule.get("description", ""), status="候选规则,未生效")
-
- if results["reverse_analysis"] and results["reverse_analysis"].get("new_rules"):
- for rule in results["reverse_analysis"]["new_rules"]:
- feishu_review_push.push_rule_update_notification(rule.get("candidate_id"), rule.get("description", ""), status="逆向候选,未生效")
-
- except Exception as e:
- print(f"[review_engine] 飞书推送失败: {e}")
-
# 8. 更新meta(迭代元数据)
update_meta("last_review", now.isoformat())
meta = get_review_params() # 先读当前meta
diff --git a/app/web/routes_chat.py b/app/web/routes_chat.py
new file mode 100644
index 0000000..a7a6718
--- /dev/null
+++ b/app/web/routes_chat.py
@@ -0,0 +1,54 @@
+from fastapi import APIRouter, Cookie, HTTPException
+
+from app.db import chat_assistant_db
+from app.services.chat_assistant import answer_chat
+from app.web.shared import ChatSendRequest, ChatSessionRequest, require_api_user_with_subscription
+
+
+router = APIRouter()
+
+
+@router.get("/api/chat/bootstrap")
+async def api_chat_bootstrap(altcoin_session: str = Cookie(default="")):
+ user = require_api_user_with_subscription(altcoin_session)
+ return chat_assistant_db.bootstrap_chat(user["id"])
+
+
+@router.get("/api/chat/sessions")
+async def api_chat_sessions(limit: int = 20, offset: int = 0, altcoin_session: str = Cookie(default="")):
+ user = require_api_user_with_subscription(altcoin_session)
+ return chat_assistant_db.list_chat_sessions(user["id"], limit=limit, offset=offset)
+
+
+@router.post("/api/chat/sessions")
+async def api_chat_create_session(req: ChatSessionRequest, altcoin_session: str = Cookie(default="")):
+ user = require_api_user_with_subscription(altcoin_session)
+ return {"ok": True, "session": chat_assistant_db.create_chat_session(user["id"], title=req.title)}
+
+
+@router.get("/api/chat/sessions/{session_id}")
+async def api_chat_session_detail(session_id: int, altcoin_session: str = Cookie(default="")):
+ user = require_api_user_with_subscription(altcoin_session)
+ session = chat_assistant_db.get_chat_session(session_id, user["id"])
+ if not session:
+ raise HTTPException(status_code=404, detail="对话不存在")
+ messages = chat_assistant_db.list_chat_messages(session_id, user["id"], limit=200)
+ return {"ok": True, "session": session, "messages": messages}
+
+
+@router.delete("/api/chat/sessions/{session_id}")
+async def api_chat_archive_session(session_id: int, altcoin_session: str = Cookie(default="")):
+ user = require_api_user_with_subscription(altcoin_session)
+ session = chat_assistant_db.update_chat_session(session_id, user["id"], archived_at="now")
+ if not session:
+ raise HTTPException(status_code=404, detail="对话不存在")
+ return {"ok": True}
+
+
+@router.post("/api/chat/send")
+async def api_chat_send(req: ChatSendRequest, altcoin_session: str = Cookie(default="")):
+ user = require_api_user_with_subscription(altcoin_session)
+ try:
+ return answer_chat(user["id"], req.message, session_id=req.session_id)
+ except ValueError as exc:
+ raise HTTPException(status_code=400, detail=str(exc))
diff --git a/app/web/routes_pages.py b/app/web/routes_pages.py
index d7430dc..540b84a 100644
--- a/app/web/routes_pages.py
+++ b/app/web/routes_pages.py
@@ -49,6 +49,13 @@ def build_router(templates, repo_root: Path, stock_report_template: str):
return redirect
return render_page("pipeline.html", request, active_nav="pipeline")
+ @router.get("/chat", response_class=HTMLResponse)
+ async def chat_page(request: Request):
+ user, redirect = require_page_user(request)
+ if redirect:
+ return redirect
+ return render_page("chat.html", request, active_nav="chat")
+
@router.get("/llm-insights", response_class=HTMLResponse)
async def llm_insights_page(request: Request):
user, redirect = require_page_user(request)
diff --git a/app/web/shared.py b/app/web/shared.py
index 5735f17..378b524 100644
--- a/app/web/shared.py
+++ b/app/web/shared.py
@@ -84,6 +84,15 @@ class RuntimeConfigRequest(BaseModel):
description: str = ""
+class ChatSessionRequest(BaseModel):
+ title: str = ""
+
+
+class ChatSendRequest(BaseModel):
+ session_id: int = 0
+ message: str
+
+
def auth_error(exc: Exception, status_code: int = 400):
raise HTTPException(status_code=status_code, detail=str(exc))
diff --git a/app/web/web_server.py b/app/web/web_server.py
index c1cacb1..179fca7 100644
--- a/app/web/web_server.py
+++ b/app/web/web_server.py
@@ -16,6 +16,7 @@ from app.db.analytics import get_all_recommendations, get_cron_run_logs, get_cro
from app.db.recommendation_queries import get_active_recommendations, get_active_recommendations_deduped
from app.web.routes_admin import build_router as build_admin_router
from app.web.routes_auth import router as auth_router
+from app.web.routes_chat import router as chat_router
from app.web.routes_content import build_router as build_content_router
from app.web.routes_market import router as market_router
from app.web.routes_onchain import router as onchain_router
@@ -44,6 +45,7 @@ app = FastAPI(title="AlphaX Agent", lifespan=lifespan)
templates = Jinja2Templates(directory=str(REPO_ROOT / "static"))
app.include_router(auth_router)
+app.include_router(chat_router)
app.include_router(recommendations_router)
app.include_router(strategy_router)
app.include_router(onchain_router)
diff --git a/static/base.html b/static/base.html
index 396ea15..d590141 100644
--- a/static/base.html
+++ b/static/base.html
@@ -155,6 +155,7 @@ a { color: inherit; text-decoration: none; }
+
@@ -175,6 +176,7 @@ a { color: inherit; text-decoration: none; }