diff --git a/app/config/system_config.py b/app/config/system_config.py index c63e698..b8bc304 100644 --- a/app/config/system_config.py +++ b/app/config/system_config.py @@ -63,6 +63,7 @@ def default_llm_config(): "recommendations": _env_bool("ALPHAX_LLM_RECOMMENDATIONS_ENABLED", True), "sentiment": _env_bool("ALPHAX_LLM_SENTIMENT_ENABLED", True), "review": _env_bool("ALPHAX_LLM_REVIEW_ENABLED", True), + "chat": _env_bool("ALPHAX_LLM_CHAT_ENABLED", True), }, } diff --git a/app/db/chat_assistant_db.py b/app/db/chat_assistant_db.py new file mode 100644 index 0000000..5cb8924 --- /dev/null +++ b/app/db/chat_assistant_db.py @@ -0,0 +1,324 @@ +"""Chat assistant persistence helpers.""" + +from __future__ import annotations + +import json +from datetime import datetime + +from app.db.llm_insights import repair_mojibake_json, repair_mojibake_text +from app.db.postgres_connection import ensure_migrations_once +from app.db.schema import get_conn + + +def _now() -> str: + return datetime.now().isoformat(timespec="seconds") + + +def _loads(value, fallback=None): + try: + if isinstance(value, str) and value.strip(): + return repair_mojibake_json(json.loads(value)) + if value is not None: + return repair_mojibake_json(value) + except Exception: + pass + return fallback if fallback is not None else {} + + +def _dumps(value) -> str: + return json.dumps(repair_mojibake_json(value if value is not None else {}), ensure_ascii=False, sort_keys=True, default=str) + + +def init_chat_tables(): + ensure_migrations_once() + + +def _normalize_title(title: str) -> str: + title = str(title or "").strip() + return title[:32] or "新对话" + + +def _load_session(row): + if not row: + return None + item = dict(row) + item["memory"] = _loads(item.pop("memory_json", "{}"), {}) + return item + + +def _load_message(row): + if not row: + return None + item = dict(row) + item["content_text"] = repair_mojibake_text(item.get("content_text", "")) + item["content"] = _loads(item.pop("content_json", "{}"), {}) + item["context"] = _loads(item.pop("context_json", "{}"), {}) + return item + + +def get_user_preferences(user_id: int) -> dict: + init_chat_tables() + conn = get_conn() + try: + row = conn.execute("SELECT * FROM chat_user_preferences WHERE user_id=%s", (int(user_id),)).fetchone() + finally: + conn.close() + if not row: + return { + "preferred_symbols": [], + "preferred_timeframes": ["15m", "1h", "4h", "1d"], + "answer_style": "two_stage", + "risk_profile": "balanced", + "last_intent": "", + "last_symbol": "", + "recent_topics": [], + } + prefs = _loads(row.get("preferences_json"), {}) + prefs.setdefault("preferred_symbols", []) + prefs.setdefault("preferred_timeframes", ["15m", "1h", "4h", "1d"]) + prefs.setdefault("answer_style", "two_stage") + prefs.setdefault("risk_profile", "balanced") + prefs.setdefault("last_intent", "") + prefs.setdefault("last_symbol", "") + prefs.setdefault("recent_topics", []) + return prefs + + +def update_user_preferences(user_id: int, patch: dict) -> dict: + init_chat_tables() + current = get_user_preferences(user_id) + patch = patch or {} + for key, value in patch.items(): + if key in ("preferred_symbols", "preferred_timeframes", "recent_topics") and isinstance(value, list): + merged = list(dict.fromkeys([str(x) for x in current.get(key, []) if str(x).strip()] + [str(x) for x in value if str(x).strip()])) + current[key] = merged[-12:] + elif value is not None: + current[key] = value + now = _now() + conn = get_conn() + try: + conn.execute( + """ + INSERT INTO chat_user_preferences (user_id, preferences_json, updated_at) + VALUES (%s, %s, %s) + ON CONFLICT(user_id) DO UPDATE SET + preferences_json=excluded.preferences_json, + updated_at=excluded.updated_at + """, + (int(user_id), _dumps(current), now), + ) + conn.commit() + finally: + conn.close() + return current + + +def create_chat_session(user_id: int, title: str = "", summary: str = "", last_symbol: str = "", last_intent: str = "") -> dict: + init_chat_tables() + now = _now() + conn = get_conn() + try: + row = conn.execute( + """ + INSERT INTO chat_sessions (user_id, title, summary, memory_json, last_symbol, last_intent, created_at, updated_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + RETURNING * + """, + (int(user_id), _normalize_title(title), summary or "", _dumps({}), last_symbol or "", last_intent or "", now, now), + ).fetchone() + conn.commit() + finally: + conn.close() + return _load_session(row) + + +def list_chat_sessions(user_id: int, limit: int = 20, offset: int = 0) -> dict: + init_chat_tables() + limit = max(1, min(int(limit or 20), 100)) + offset = max(0, int(offset or 0)) + conn = get_conn() + try: + total = conn.execute("SELECT COUNT(*) FROM chat_sessions WHERE user_id=%s AND COALESCE(archived_at, '')=''", (int(user_id),)).fetchone()[0] + rows = conn.execute( + """ + SELECT s.*, + (SELECT m.content_text FROM chat_messages m WHERE m.session_id=s.id ORDER BY m.id DESC LIMIT 1) AS last_message_text, + (SELECT m.role FROM chat_messages m WHERE m.session_id=s.id ORDER BY m.id DESC LIMIT 1) AS last_message_role, + (SELECT m.created_at FROM chat_messages m WHERE m.session_id=s.id ORDER BY m.id DESC LIMIT 1) AS last_message_at, + (SELECT COUNT(*) FROM chat_messages m WHERE m.session_id=s.id) AS message_count + FROM chat_sessions s + WHERE s.user_id=%s AND COALESCE(s.archived_at, '')='' + ORDER BY s.updated_at DESC, s.id DESC + LIMIT %s OFFSET %s + """, + (int(user_id), limit, offset), + ).fetchall() + finally: + conn.close() + items = [] + for row in rows: + item = _load_session(row) + item["last_message_text"] = item.pop("last_message_text", "") + item["last_message_role"] = item.pop("last_message_role", "") + item["last_message_at"] = item.pop("last_message_at", "") + item["message_count"] = int(item.pop("message_count", 0) or 0) + items.append(item) + return { + "items": items, + "total": int(total or 0), + "limit": limit, + "offset": offset, + "has_more": offset + len(items) < int(total or 0), + } + + +def get_chat_session(session_id: int, user_id: int) -> dict | None: + init_chat_tables() + conn = get_conn() + try: + row = conn.execute( + "SELECT * FROM chat_sessions WHERE id=%s AND user_id=%s AND COALESCE(archived_at, '')=''", + (int(session_id), int(user_id)), + ).fetchone() + finally: + conn.close() + return _load_session(row) + + +def update_chat_session(session_id: int, user_id: int, **fields) -> dict | None: + init_chat_tables() + session = get_chat_session(session_id, user_id) + if not session: + return None + allowed = {"title", "summary", "memory_json", "last_symbol", "last_intent", "archived_at"} + updates = {} + for key, value in fields.items(): + if key not in allowed or value is None: + continue + updates[key] = value + if not updates: + return session + updates["updated_at"] = _now() + if "title" in updates: + updates["title"] = _normalize_title(updates["title"]) + if "memory_json" in updates and not isinstance(updates["memory_json"], str): + updates["memory_json"] = _dumps(updates["memory_json"]) + if updates.get("archived_at") == "now": + updates["archived_at"] = _now() + sets = ", ".join(f"{key}=%s" for key in updates) + params = list(updates.values()) + [int(session_id), int(user_id)] + conn = get_conn() + try: + row = conn.execute( + f"UPDATE chat_sessions SET {sets} WHERE id=%s AND user_id=%s RETURNING *", + tuple(params), + ).fetchone() + conn.commit() + finally: + conn.close() + return _load_session(row) + + +def list_chat_messages(session_id: int, user_id: int, limit: int = 50, offset: int = 0) -> dict: + init_chat_tables() + limit = max(1, min(int(limit or 50), 200)) + offset = max(0, int(offset or 0)) + conn = get_conn() + try: + total = conn.execute( + "SELECT COUNT(*) FROM chat_messages WHERE session_id=%s AND user_id=%s", + (int(session_id), int(user_id)), + ).fetchone()[0] + rows = conn.execute( + """ + SELECT * FROM chat_messages + WHERE session_id=%s AND user_id=%s + ORDER BY id ASC + LIMIT %s OFFSET %s + """, + (int(session_id), int(user_id), limit, offset), + ).fetchall() + finally: + conn.close() + return { + "items": [_load_message(row) for row in rows], + "total": int(total or 0), + "limit": limit, + "offset": offset, + "has_more": offset + len(rows) < int(total or 0), + } + + +def append_chat_message( + session_id: int, + user_id: int, + role: str, + content_text: str = "", + content_json=None, + context_json=None, + intent: str = "", + symbol: str = "", + timeframe: str = "", + model: str = "", +) -> dict: + init_chat_tables() + now = _now() + conn = get_conn() + try: + row = conn.execute( + """ + INSERT INTO chat_messages ( + session_id, user_id, role, content_text, content_json, context_json, + intent, symbol, timeframe, model, created_at + ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + RETURNING * + """, + ( + int(session_id), + int(user_id), + str(role or "user"), + repair_mojibake_text(str(content_text or "")), + _dumps(content_json or {}), + _dumps(context_json or {}), + str(intent or ""), + str(symbol or ""), + str(timeframe or ""), + str(model or ""), + now, + ), + ).fetchone() + conn.commit() + finally: + conn.close() + return _load_message(row) + + +def bootstrap_chat(user_id: int) -> dict: + prefs = get_user_preferences(user_id) + sessions = list_chat_sessions(user_id=user_id, limit=20, offset=0) + prompts = [ + "分析 BTC/USDT 现在的技术面", + "解释当前看板里这条推荐为什么是等回踩", + "看一下市场总览,今天是偏强还是偏弱", + "这个币的链上异动有哪些", + "帮我复盘最近一次纸面交易", + ] + return { + "preferences": prefs, + "sessions": sessions, + "suggested_prompts": prompts, + } + + +__all__ = [ + "append_chat_message", + "bootstrap_chat", + "create_chat_session", + "get_chat_session", + "get_user_preferences", + "init_chat_tables", + "list_chat_messages", + "list_chat_sessions", + "update_chat_session", + "update_user_preferences", +] diff --git a/app/db/migrations/0009_chat_assistant.sql b/app/db/migrations/0009_chat_assistant.sql new file mode 100644 index 0000000..619b22d --- /dev/null +++ b/app/db/migrations/0009_chat_assistant.sql @@ -0,0 +1,37 @@ +CREATE TABLE IF NOT EXISTS chat_sessions ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NOT NULL, + title TEXT NOT NULL DEFAULT '新对话', + summary TEXT DEFAULT '', + memory_json TEXT NOT NULL DEFAULT '{}', + last_symbol TEXT DEFAULT '', + last_intent TEXT DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + archived_at TEXT DEFAULT '' +); +CREATE INDEX IF NOT EXISTS idx_chat_sessions_user_updated ON chat_sessions(user_id, updated_at DESC); + +CREATE TABLE IF NOT EXISTS chat_messages ( + id BIGSERIAL PRIMARY KEY, + session_id BIGINT NOT NULL, + user_id BIGINT NOT NULL, + role TEXT NOT NULL, + content_text TEXT DEFAULT '', + content_json TEXT NOT NULL DEFAULT '{}', + context_json TEXT NOT NULL DEFAULT '{}', + intent TEXT DEFAULT '', + symbol TEXT DEFAULT '', + timeframe TEXT DEFAULT '', + model TEXT DEFAULT '', + created_at TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_chat_messages_session_time ON chat_messages(session_id, created_at ASC); +CREATE INDEX IF NOT EXISTS idx_chat_messages_user_time ON chat_messages(user_id, created_at DESC); + +CREATE TABLE IF NOT EXISTS chat_user_preferences ( + user_id BIGINT PRIMARY KEY, + preferences_json TEXT NOT NULL DEFAULT '{}', + updated_at TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_chat_user_preferences_updated ON chat_user_preferences(updated_at DESC); diff --git a/app/db/paper_trading.py b/app/db/paper_trading.py index a357065..2bf9fd1 100644 --- a/app/db/paper_trading.py +++ b/app/db/paper_trading.py @@ -187,6 +187,7 @@ def _push_event_card(event_type: str, trade: dict, result: dict, event_time: str if not title: return card = { + "metadata": {"source": "paper_trading", "event_type": event_type}, "config": {"wide_screen_mode": True}, "header": { "template": "blue" if event_type == "open" else ("yellow" if event_type.startswith("trailing") else "red"), diff --git a/app/integrations/feishu_push.py b/app/integrations/feishu_push.py index 6a11799..6cdd0d0 100644 --- a/app/integrations/feishu_push.py +++ b/app/integrations/feishu_push.py @@ -1,12 +1,9 @@ -""" -山寨币监控飞书卡片推送模块 +"""Feishu push transport for paper trading only.""" -通过飞书机器人 Webhook 直发,不经过 Hermes Agent 的飞书通道。 -Webhook 支持 v2 interactive cards。 -""" +from __future__ import annotations import os -import json + import requests from app.config.system_config import notification_config @@ -29,441 +26,31 @@ def _feishu_settings(): def push_card(card_content): - """通过 webhook 推送飞书交互式卡片""" - payload = { - "msg_type": "interactive", - "card": card_content, - } - try: - settings = _feishu_settings() - if not settings["enabled"]: - return False, "feishu notification disabled" - if not settings["webhook_url"]: - return False, f"{settings['webhook_env']} not configured" - r = requests.post(settings["webhook_url"], json=payload, timeout=settings["timeout"]) - result = r.json() - ok = (r.status_code == 200 and result.get("StatusCode") == 0) - return ok, result - except Exception as e: - return False, str(e) + """Paper trading cards only. - -def push_altcoin_burst_alert(symbol, price, signals, entry_plan, sector="", leader_status="", direction="多头启动"): + Non-paper-trading cards are rejected by construction so no other channel can + continue using this transport by accident. """ - 推荐确认爆发推送 — 只做做多,方向永远多头🟢 - """ - dir_emoji = "🟢" - dir_color = "green" - coin_name = symbol.replace('/USDT', '') + if not isinstance(card_content, dict): + return False, {"skipped": True, "reason": "invalid_card"} + metadata = card_content.get("metadata") or {} + if str(metadata.get("source") or "").strip().lower() != "paper_trading": + return False, {"skipped": True, "reason": "paper_trading_only"} - entry_lines = "" - if entry_plan: - rr_ok = "✅" if entry_plan.get("risk_reward_ok") else "❌" - entry_lines = f"""--- -**入场方案**: -• 入场价: ${entry_plan['entry_price']} -• 入场方式: {entry_plan['entry_method']} -• 止损价: ${entry_plan['stop_loss']} ({entry_plan['stop_pct']}%) -• 止盈1: ${entry_plan['tp1']} (RR={entry_plan['rr1']} {rr_ok}) -• 止盈2: ${entry_plan['tp2']} (RR={entry_plan['rr2']}) -• 当前价: ${entry_plan['current_price']}""" - - sector_line = f"\n**板块**: {sector}" if sector else "" - leader_line = f"\n**龙头状态**: {leader_status}" if leader_status else "" - signal_lines = "\n".join([f" • {s}" for s in signals]) - - card = { - "config": {"wide_screen_mode": True}, - "header": { - "template": dir_color, - "title": {"tag": "plain_text", "content": f"{dir_emoji} {direction}确认 — {coin_name}"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": f"**方向**: {dir_emoji} {direction}\n**价格**: ${price}{sector_line}{leader_line}\n\n**确认信号**:\n{signal_lines}{entry_lines}", - }, - }, - { - "tag": "action", - "actions": [ - { - "tag": "button", - "text": {"tag": "plain_text", "content": f"🔥 {symbol.replace('/USDT','')} 爆发确认"}, - "type": "danger", - } - ], - }, - ], - } - return push_card(card) - - -def push_recommendation_state_alert(item, title_prefix=None): - """主链路推荐状态推送:只渲染 DB/API 已派生好的状态,不做推荐判断。""" - card = build_recommendation_state_card(item, title_prefix=title_prefix) - if isinstance(card, tuple): - ok, reason = card - return ok, reason - return push_card(card) - - -def build_recommendation_state_card(item, title_prefix=None): - """只构建卡片,不负责是否推送、冷却或落库。""" - if not item: - return True, {"skipped": True, "reason": "empty_mainline_item"} - symbol = item.get("symbol", "") - coin = symbol.replace("/USDT", "") - execution_status = item.get("execution_status", "") - action_status = item.get("action_status", "") - execution_label = item.get("execution_label", "") or action_status or execution_status - if execution_status == "buy_now": - color, title = "blue", title_prefix or "入场窗口" - elif execution_status == "wait_pullback": - color, title = "yellow", title_prefix or "观察池:等回踩" - elif execution_status == "observe": - color, title = "blue", title_prefix or "观察池更新" - else: - color, title = "grey", title_prefix or "状态更新" - - entry_plan = item.get("entry_plan") or {} - price = item.get("current_price") or item.get("entry_price") or 0 - entry_ref = entry_plan.get("entry_price") if execution_status == "wait_pullback" else item.get("entry_price") - if not entry_ref: - entry_ref = item.get("entry_price") or entry_plan.get("entry_price") or 0 - risk_line = entry_plan.get("stop_loss") or item.get("stop_loss") or 0 - space_ref = entry_plan.get("tp1") or item.get("tp1") or 0 - signals = item.get("signals") or [] - if isinstance(signals, str): - try: - signals = json.loads(signals) - except Exception: - signals = [signals] - signal_lines = "\n".join([f" • {x}" for x in signals[:5]]) or " • 主链路状态更新" - rec_id = item.get("id", "") - reason = item.get("execution_reason", "") - ver = item.get("strategy_version", "") - return { - "config": {"wide_screen_mode": True}, - "header": { - "template": color, - "title": {"tag": "plain_text", "content": f"{title} — {coin}"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": ( - f"**币种**: {symbol}\n" - f"**主链路状态**: {execution_label}\n" - f"**当前价**: ${price}\n" - f"**参考价**: ${entry_ref} | **风险边界**: ${risk_line} | **上方空间参考**: ${space_ref}\n" - f"**推荐ID**: #{rec_id} | **版本**: {ver}\n" - f"**说明**: {reason}\n\n" - f"**信号摘要**:\n{signal_lines}" - ), - }, - } - ], - } - - -def push_altcoin_accelerating_alert(symbol, price, signals, score, sector="", leader_status="", direction="多头启动"): - """ - 加速信号推送 — 只做做多🟢 - ⚠️ 用户要求:不再推送到飞书,此函数保留但只写日志 - """ - coin_name = symbol.replace('/USDT', '') - print(f"[飞书跳过] 🟠 加速信号 — {coin_name} @ ${price} 评分{score}/20 (用户要求不推送)") - return True, {"skipped": True, "reason": "用户要求不推送加速信号"} - - -def push_altcoin_sector_alert(hot_sectors, leaders_info): - """ - 推送板块联动告警 - ⚠️ 用户要求:不再推送到飞书,此函数保留但只写日志 - """ - print(f"[飞书跳过] 🔵 板块联动信号 — {len(hot_sectors)}个板块 (用户要求不推送)") - return True, {"skipped": True, "reason": "用户要求不推送板块联动"} - - -def push_altcoin_tp_sl_alert(symbol, current_price, entry_price, pnl_pct, action_status, signals, stop_loss=0, tp1=0, tp2=0): - """推送交易执行告警 — 可即刻买入 + 🆕v1.7.8 跟踪止盈触发。止盈/止损/衰减只落库展示,不发飞书。""" - card = build_trade_action_card( - symbol=symbol, - current_price=current_price, - entry_price=entry_price, - pnl_pct=pnl_pct, - action_status=action_status, - signals=signals, - stop_loss=stop_loss, - tp1=tp1, - tp2=tp2, - ) - if isinstance(card, tuple): - ok, reason = card - return ok, reason - return push_card(card) - - -def build_trade_action_card(symbol, current_price, entry_price, pnl_pct, action_status, signals, stop_loss=0, tp1=0, tp2=0): - """只构建交易执行卡片,不做冷却判断或落库。""" - if action_status not in ("可即刻买入", "跟踪止盈", "移动止盈保护"): - print(f"[飞书跳过] {symbol} {action_status} — 用户要求止盈/止损/衰减不推送,只在网站展示") - return True, {"skipped": True, "reason": "only_buy_now_and_trailing_stop_push_enabled"} - - if action_status == "移动止盈保护": - coin = symbol.replace("/USDT", "") - signal_lines = "\n".join([f" • {s}" for s in signals]) or " • 移动止盈保护已启动" - trail_info = f"入场${entry_price:.4f} → 当前${current_price:.4f}" - if pnl_pct > 0: - trail_info += f"\n**当前浮盈: +{pnl_pct:.2f}%**" - return { - "config": {"wide_screen_mode": True}, - "header": { - "template": "yellow", - "title": {"tag": "plain_text", "content": f"🛡️ 移动止盈保护启动 — {coin}"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": f"{trail_info}\n\n**保护详情**:\n{signal_lines}\n\n💡 已进入利润保护阶段,后续跌破保护位会触发跟踪止盈。", - }, - } - ], - } - - # v1.7.8: 跟踪止盈用独立的醒目卡片 - if action_status == "跟踪止盈": - coin = symbol.replace("/USDT", "") - signal_lines = "\n".join([f" • {s}" for s in signals]) - trail_info = f"入场${entry_price:.4f} → 当前${current_price:.4f}" - if pnl_pct > 0: - trail_info += f"\n**累计盈利: +{pnl_pct:.2f}%** 📈" - elif pnl_pct < 0: - trail_info += f"\n**保本出场: {pnl_pct:.2f}%**" - - return { - "config": {"wide_screen_mode": True}, - "header": { - "template": "red", - "title": {"tag": "plain_text", "content": f"🎯 跟踪止盈触发 — {coin}"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": f"{trail_info}\n\n**信号详情**:\n{signal_lines}\n\n💡 跟踪止盈触发,建议立即平仓锁定利润!", - }, - }, - { - "tag": "action", - "actions": [ - { - "tag": "button", - "text": {"tag": "plain_text", "content": f"🎯 {coin} 跟踪止盈"}, - "type": "danger", - } - ], - }, - ], - } - - # 当前只保留入场时机到位推送 - event_config = { - "可即刻买入": ("blue", "🟢", "入场时机到位"), - } - cfg = event_config.get(action_status, ("blue", "⚠️", action_status)) - color, emoji, title_prefix = cfg - - coin = symbol.replace("/USDT", "") - signal_lines = "\n".join([f" • {s}" for s in signals]) - - pnl_emoji = "📈" if pnl_pct > 0 else "📉" if pnl_pct < 0 else "➡️" - price_lines = f"**入场价**: ${entry_price} → **当前价**: ${current_price} → **盈亏**: {pnl_emoji} {pnl_pct}%" - if stop_loss > 0: - price_lines += f"\n**止损**: ${stop_loss}" - if tp1 > 0: - price_lines += f"\n**止盈1**: ${tp1}" - - return { - "config": {"wide_screen_mode": True}, - "header": { - "template": color, - "title": {"tag": "plain_text", "content": f"{emoji} {title_prefix} — {coin}"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": f"{price_lines}\n\n**操作建议**: {action_status}\n\n**信号详情**:\n{signal_lines}", - }, - }, - ], - } - - -def push_altcoin_exhaustion_alert(symbol, current_price, pnl_pct, exhaustion): - """ - 推送趋势衰减告警 — ⚠️ 橙色卡片 - """ - coin = symbol.replace("/USDT", "") - severity = exhaustion.get("severity", "low") - sev_emoji = "⚠️" if severity == "medium" else "🔴" - ex_signals = exhaustion.get("signals", []) - signal_lines = "\n".join([f" • {s}" for s in ex_signals]) - - card = { - "config": {"wide_screen_mode": True}, - "header": { - "template": "orange", - "title": {"tag": "plain_text", "content": f"{sev_emoji} 趋势衰减 — {coin}"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": f"**当前价**: ${current_price} | **盈亏**: {pnl_pct}%\n\n**衰减信号**:\n{signal_lines}\n\n💡 建议:关注止盈机会,趋势可能即将反转", - }, - }, - ], - } - return push_card(card) - - -def push_sentiment_alert(alert): - """ - 推送舆情异动卡片 — 📢 蓝色信息卡 - alert: {"type": "holding_trending"|"new_trending", "symbol", "name", "trend_rank", "alert"} - """ - coin = alert["symbol"].replace("/USDT", "") - alert_type = alert["type"] - emoji = "🔔" if alert_type == "holding_trending" else "🆕" - color = "red" if alert_type == "holding_trending" else "blue" - - extra = "" - if alert_type == "holding_trending": - extra = f"\n⚠️ 持仓币进入热搜,关注价格异动" - - card = { - "config": {"wide_screen_mode": True}, - "header": { - "template": color, - "title": {"tag": "plain_text", "content": f"{emoji} 舆情异动 — {coin}"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": ( - f"**{alert['name']}** 进入 CoinGecko Trending #{alert['trend_rank']}\n" - f"{alert['alert']}{extra}\n\n" - f"💡 消息面热度上升,建议结合技术面判断入场时机" - ), - }, - }, - ], - } - return push_card(card) - - -def push_event_driven_alert(event, result, rec_id=0): - """事件驱动舆情触发选币推送。重大消息触发后,根据技术检查结果分为推荐/观察/风险。""" - symbol = event.get("symbol", "") - coin = symbol.replace("/USDT", "") - decision = result.get("decision", "observe") - importance = event.get("importance", "") - title = event.get("title", "") - source = event.get("source", "") - url = event.get("url", "") - published_at = event.get("published_at", "") - price = result.get("price", 0) - score = result.get("score", 0) - reason = result.get("reason", "") - signals = result.get("signals", []) - entry_plan = result.get("entry_plan", {}) or {} - - if decision == "recommend": - color, emoji, headline = "red", "🚨", "重大舆情触发:可交易机会" - elif decision == "risk": - color, emoji, headline = "orange", "⚠️", "重大舆情风险:不建议追" - else: - color, emoji, headline = "blue", "👀", "重大舆情观察:等待技术确认" - - signal_lines = "\n".join([f" • {s}" for s in signals[:8]]) - link_line = f"\n**来源链接**: [查看原文]({url})" if url else "" - entry_lines = "" - if entry_plan: - entry_lines = ( - f"\n---\n**交易计划**:\n" - f"• 动作: {entry_plan.get('entry_action', '')}\n" - f"• 入场: ${entry_plan.get('entry_price', '')}\n" - f"• 止损: ${entry_plan.get('stop_loss', '')} ({entry_plan.get('stop_pct', '')}%)\n" - f"• TP1/TP2: ${entry_plan.get('tp1', '')} / ${entry_plan.get('tp2', '')}" - ) - rec_line = f"\n**推荐ID**: #{rec_id}" if rec_id else "" - - card = { - "config": {"wide_screen_mode": True}, - "header": { - "template": color, - "title": {"tag": "plain_text", "content": f"{emoji} {headline} — {coin}"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": ( - f"**币种**: {symbol}\n" - f"**重要性**: {importance}级 | **来源**: {source}\n" - f"**发布时间**: {published_at}\n" - f"**消息**: {title}{link_line}\n\n" - f"**技术决策**: {reason}\n" - f"**当前价**: ${price} | **技术分**: {score}\n" - f"{rec_line}\n\n" - f"**触发信号**:\n{signal_lines}" - f"{entry_lines}" - ), - }, - }, - ], - } - return push_card(card) - - -if __name__ == "__main__": - # 测试推送 settings = _feishu_settings() - print(f"Webhook env: {settings['webhook_env']} configured={bool(settings['webhook_url'])}") - print("\n测试爆发卡片推送...") - ok, result = push_altcoin_burst_alert( - "FET/USDT", 2.15, - ["1H放量突破阻力(2.3倍)", "1H MACD金叉", "1H 均线多头初成", "15min 5阳线+量递增"], - {"entry_price": 2.10, "entry_method": "回踩确认", "stop_loss": 1.95, - "stop_pct": 3.0, "tp1": 2.55, "tp2": 2.80, "rr1": 3.0, "rr2": 5.0, - "risk_reward_ok": True, "current_price": 2.15, "atr_1h": 0.10}, - sector="AI_DePIN", leader_status="板块龙头(AI_DePIN)", - ) - print(f"爆发卡片: ok={ok}, result={result}") + if not settings["enabled"]: + return False, "feishu notification disabled" + if not settings["webhook_url"]: + return False, f"{settings['webhook_env']} not configured" - print("\n测试加速卡片推送(应被跳过)...") - ok2, result2 = push_altcoin_accelerating_alert( - "ARB/USDT", 0.125, - ["4H MACD金叉", "4H RSI拐点(35→52)", "板块联动: Layer2龙头启动"], - score=10, sector="Layer2", - ) - print(f"加速卡片: ok={ok2}, result={result2}") + payload = {"msg_type": "interactive", "card": card_content} + try: + resp = requests.post(settings["webhook_url"], json=payload, timeout=settings["timeout"]) + result = resp.json() + ok = resp.status_code == 200 and result.get("StatusCode") == 0 + return ok, result + except Exception as exc: + return False, str(exc) - print("\n测试板块联动推送(应被跳过)...") - ok4, result4 = push_altcoin_sector_alert(["AI"], {"AI": {"leader": "FET/USDT", "leader_pct": 12.5, "is_leader_hot": True}}) - print(f"板块联动: ok={ok4}, result={result4}") + +__all__ = ["push_card"] diff --git a/app/integrations/feishu_review_push.py b/app/integrations/feishu_review_push.py deleted file mode 100644 index b5d3a9b..0000000 --- a/app/integrations/feishu_review_push.py +++ /dev/null @@ -1,298 +0,0 @@ -""" -飞书复盘报告推送模块 - -推送三类卡片: -1. push_review_report — 策略复盘报告(蓝色主题) -2. push_reverse_analysis_report — 逆向分析报告(紫色主题) -3. push_rule_update_notification — 新规律通知(绿色主题) - -复用 feishu_push.py 的认证模式(load_feishu_creds → get_token → push_card) -""" - -import os -import sys -import json - -sys.path.insert(0, os.path.dirname(__file__)) -from app.integrations.feishu_push import push_card - -CHAT_ID = "oc_2c597ad94167102922de142928e2917a" - - -# ==================== 1. 策略复盘报告 ==================== - -def push_review_report(review_results): - """ - 推送策略复盘报告卡片 — 📊 蓝色主题 - - Section 1: 推荐命中统计 (hit/fail/flat counts, hit rate %) - Section 2: 信号绩效TOP5 (best performing signals) - Section 3: 遗漏爆炸 (missed coins, why, what features) - Section 4: 权重调整 (weight changes) - """ - reviews = review_results.get("review_details", []) - weight_adj = review_results.get("weight_adjustments", []) - missed = review_results.get("missed_explosions", []) - - # Section 1: 命中统计 - hit_count = sum(1 for r in reviews if r.get("outcome") == "爆发") - fail_count = sum(1 for r in reviews if r.get("outcome") == "失败") - flat_count = sum(1 for r in reviews if r.get("outcome") == "横盘") - total = len(reviews) - hit_rate_pct = round(hit_count / total * 100, 1) if total > 0 else 0 - - # 命中统计文案 - hit_emoji = "🔥" if hit_rate_pct >= 50 else "⚠️" if hit_rate_pct >= 30 else "❌" - stats_line = ( - f"本次复盘 **{total}** 条推荐:\n" - f" • 爆发(命中): **{hit_count}** ({hit_emoji})\n" - f" • 横盘: **{flat_count}**\n" - f" • 失败: **{fail_count}**\n" - f" • 命中率: **{hit_rate_pct}%**" - ) - - # Section 2: 信号绩效TOP5 - # 从review_results中提取信号绩效信息 - from app.db.altcoin_db import get_signal_weights - weights = get_signal_weights() - sig_perf_list = sorted( - [(sig, data) for sig, data in weights.items() if data.get("total_count", 0) >= 3], - key=lambda x: x[1].get("hit_rate", 0), - reverse=True, - )[:5] - - sig_lines = "" - if sig_perf_list: - for sig, data in sig_perf_list: - hr = data.get("hit_rate", 0) - w = data.get("weight", 0) - total_n = data.get("total_count", 0) - cat = data.get("category", "") - emoji = "✅" if hr >= 50 else "⚠️" if hr >= 30 else "❌" - sig_lines += f"\n • {emoji} **{sig}**({cat}): 命中率{hr}% | 权重{w} | 样本{total_n}" - else: - sig_lines = "\n • 样本不足,暂无绩效数据" - - # Section 3: 遗漏爆炸 - missed_lines = "" - if missed: - for m in missed[:5]: # 最多展示5只 - symbol = m.get("symbol", "") - gain = m.get("gain_pct", 0) - reason = m.get("reason_missed", m.get("reason", "")) - features = m.get("features_detected", []) - if isinstance(features, str): - try: - features = json.loads(features) - except: - features = [features] - feat_str = ", ".join(str(f) for f in features[:3]) if features else "无" - missed_lines += f"\n • 💥 **{symbol}** 涨{gain}% | 原因: {reason} | 特征: {feat_str}" - else: - missed_lines = "\n • ✅ 无遗漏爆炸" - - # Section 4: 权重调整 - adj_lines = "" - if weight_adj: - for adj in weight_adj: - adj_lines += f"\n • {adj}" - else: - adj_lines = "\n • 无权重调整" - - # 构建卡片 - card = { - "config": {"wide_screen_mode": True}, - "header": { - "template": "blue", - "title": {"tag": "plain_text", "content": "📊 山寨币策略复盘报告"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": ( - f"**=== 推荐命中统计 ===**\n{stats_line}\n\n" - f"**=== 信号绩效TOP5 ===**\n{sig_lines}\n\n" - f"**=== 遗漏爆炸 ===**\n{missed_lines}\n\n" - f"**=== 权重调整 ===**\n{adj_lines}" - ), - }, - }, - { - "tag": "action", - "actions": [ - { - "tag": "button", - "text": {"tag": "plain_text", "content": f"📊 命中率{hit_rate_pct}%"}, - "type": "primary" if hit_rate_pct >= 50 else "warning" if hit_rate_pct >= 30 else "danger", - } - ], - }, - ], - } - return push_card(card) - - -# ==================== 2. 逆向分析报告 ==================== - -def push_reverse_analysis_report(reverse_results): - """ - 推送逆向分析报告卡片 — 🔍 紫色主题 - - Section 1: 今日涨幅榜TOP10 (symbol, gain%, sector) - Section 2: 起爆前共性特征 (pattern summary with percentages) - Section 3: 新发现规律 (any new rules) - """ - top_gainers = reverse_results.get("top_gainers", []) - pattern_summary = reverse_results.get("pattern_summary", []) - new_rules = reverse_results.get("new_rules", []) - total_unrecommended = reverse_results.get("total_unrecommended", 0) - total_analyzed = reverse_results.get("total_analyzed", 0) - - # Section 1: 涨幅榜TOP10 - gainer_lines = "" - for i, g in enumerate(top_gainers[:10], 1): - symbol = g.get("symbol", "").replace("/USDT", "") - gain = g.get("gain_pct", 0) - sector = g.get("sector", []) - sector_str = sector[0] if isinstance(sector, list) and sector else (sector if sector else "未知") - volume = g.get("volume_24h", 0) - vol_str = f"${volume / 1e6:.1f}M" if volume > 0 else "" - gainer_lines += f"\n {i}. **{symbol}** +{gain}% | {sector_str} | {vol_str}" - - if not gainer_lines: - gainer_lines = "\n • 今日无明显涨幅" - - # Section 2: 起爆前共性特征 - pattern_lines = "" - for p in pattern_summary[:8]: # 最多展示8个特征 - label = p.get("label", p.get("feature", "")) - pct = p.get("percentage", 0) - count = p.get("count", 0) - total = p.get("total", 0) - bar = "█" * int(pct / 10) + "░" * (10 - int(pct / 10)) - emoji = "🔥" if pct >= 60 else "✅" if pct >= 40 else "⚠️" if pct >= 20 else "❌" - pattern_lines += f"\n • {emoji} **{label}**: {pct}%({count}/{total}) {bar}" - - if not pattern_lines: - pattern_lines = "\n • 分析样本不足" - - # Section 3: 新发现规律 - rule_lines = "" - if new_rules: - for r in new_rules: - rule_id = r.get("rule_id", "") - desc = r.get("description", "") - score_adj = r.get("score_adjust", 0) - rule_type = r.get("type", "") - rule_lines += f"\n • 🧠 **{rule_id}**: {desc} → 评分{score_adj}({rule_type})" - else: - rule_lines = "\n • 暂无新规律达到显著性阈值" - - # 分析概况 - overview = ( - f"涨幅榜共{len(top_gainers)}只 ≥10%\n" - f"未被推荐: {total_unrecommended}只\n" - f"已做PA分析: {total_analyzed}只" - ) - - # 构建卡片(紫色主题用 "violet" — 飞书卡片没有purple,用indigo近似) - card = { - "config": {"wide_screen_mode": True}, - "header": { - "template": "indigo", - "title": {"tag": "plain_text", "content": "🔍 逆向分析报告 — 涨幅榜复盘"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": ( - f"**{overview}**\n\n" - f"**=== 今日涨幅榜TOP10 ===**\n{gainer_lines}\n\n" - f"**=== 起爆前共性特征 ===**\n{pattern_lines}\n\n" - f"**=== 新发现规律 ===**\n{rule_lines}" - ), - }, - }, - { - "tag": "action", - "actions": [ - { - "tag": "button", - "text": {"tag": "plain_text", "content": f"🔍 分析{total_analyzed}只暴涨币"}, - "type": "primary", - } - ], - }, - ], - } - return push_card(card) - - -# ==================== 3. 新规律通知 ==================== - -def push_rule_update_notification(rule_id, description, status="候选规则,未生效"): - """ - 推送新规律学习通知 — 🧠 绿色主题 - 简洁卡片,告知策略自动迭代 - """ - card = { - "config": {"wide_screen_mode": True}, - "header": { - "template": "green", - "title": {"tag": "plain_text", "content": "🧠 策略自学习 — 候选规则发现"}, - }, - "elements": [ - { - "tag": "div", - "text": { - "tag": "lark_md", - "content": ( - f"规则ID: **{rule_id}**\n\n" - f"状态: **{status}**\n\n" - f"描述: {description}\n\n" - f"说明: 该规则仅进入候选池/灰度评估,未通过发布闸门前不会写入正式规则库,也不会影响下次选币。" - ), - }, - }, - { - "tag": "action", - "actions": [ - { - "tag": "button", - "text": {"tag": "plain_text", "content": f"🧠 {rule_id}"}, - "type": "primary", - } - ], - }, - ], - } - return push_card(card) - - -# ==================== 测试 ==================== - -if __name__ == "__main__": - print("测试复盘报告推送...") - - # 测试review report - test_review = { - "review_details": [ - {"symbol": "FET/USDT", "outcome": "爆发", "pnl_48h": 12.5}, - {"symbol": "ARB/USDT", "outcome": "横盘", "pnl_48h": 1.2}, - {"symbol": "PEPE/USDT", "outcome": "失败", "pnl_48h": -4.5}, - ], - "weight_adjustments": ["量价齐飞: 3→4.0 (命中率67%)"], - "missed_explosions": [ - {"symbol": "INJ/USDT", "gain_pct": 25, "reason_missed": "细筛淘汰(score=4)", "features_detected": ["ignition_point", "Q7_zone"]}, - ], - } - ok1, r1 = push_review_report(test_review) - print(f"复盘报告: ok={ok1}") - - # 测试rule notification - ok2, r2 = push_rule_update_notification("rule_20260429_001", "涨幅榜60%有起爆点 → 起爆点是爆发前必现信号") - print(f"规律通知: ok={ok2}") diff --git a/app/integrations/push_orchestrator.py b/app/integrations/push_orchestrator.py deleted file mode 100644 index f67a4dc..0000000 --- a/app/integrations/push_orchestrator.py +++ /dev/null @@ -1,43 +0,0 @@ -"""Push orchestration helpers. - -Separates eligibility / cooldown decisions from payload rendering and transport. -""" - -from app.db.recommendation_queries import log_push, should_push -from app.integrations.feishu_push import build_trade_action_card, push_card - - -def push_mainline_state_update(symbol: str, rec_id: int, mainline_item: dict, title_prefix: str | None = None, entry_push_type: str = "entry", watch_push_type: str = "watch_pool") -> bool: - """主链路状态只记录,不再飞书推送。""" - status = mainline_item.get("execution_status") if mainline_item else "missing" - action = mainline_item.get("action_status", "") if mainline_item else "" - print(f"[push] skip {symbol}: mainline notifications disabled (status={status}, action={action})") - return False - - -def push_trade_action_update(symbol: str, rec_id: int, state_decision: dict, final_action: str, push_type: str = "entry") -> bool: - if not state_decision.get("push_required"): - return False - if not should_push(symbol, push_type, final_action): - print(f"⏭ 跳过推送({symbol}): {push_type}/{final_action} 12h冷却中") - return False - card = build_trade_action_card( - state_decision["push_symbol"], - state_decision["push_current_price"], - state_decision["push_entry_price"], - state_decision["push_pnl_pct"], - final_action, - state_decision.get("push_signals", []), - state_decision.get("stop_loss", 0), - state_decision.get("tp1", 0), - state_decision.get("tp2", 0), - ) - if isinstance(card, tuple): - ok, resp = card - else: - ok, resp = push_card(card) - if ok: - log_push(symbol, push_type, final_action, rec_id=rec_id) - return True - print(f"飞书推送失败({symbol}): {resp}") - return False diff --git a/app/services/altcoin_confirm.py b/app/services/altcoin_confirm.py index be8b7ff..1b28609 100644 --- a/app/services/altcoin_confirm.py +++ b/app/services/altcoin_confirm.py @@ -29,9 +29,8 @@ from app.core.sector_map import get_burst_threshold, is_meme_coin, get_sector_fo from app.db.altcoin_db import ( init_db, expire_old_states, expire_old_recommendations, get_candidates_for_confirm, update_state, get_conn, create_recommendation, log_screening, - log_cron_run, update_latest_price_cache, get_recommendation_for_push, + log_cron_run, update_latest_price_cache, ) -from app.integrations.push_orchestrator import push_mainline_state_update from app.config.config_loader import ( get_strategy_direction, vp_fly_params, @@ -1334,8 +1333,6 @@ def main(compact: bool = False): update_latest_price_cache(symbol, result["price"], updated_at=datetime.now().isoformat(), source="confirm") result["rec_id"] = rec_id - mainline_item = get_recommendation_for_push(rec_id) - push_mainline_state_update(symbol, rec_id, mainline_item) else: cand_detail = json.loads(cand.get("detail_json", "{}")) log_screening( diff --git a/app/services/chat_assistant.py b/app/services/chat_assistant.py new file mode 100644 index 0000000..8ea01a2 --- /dev/null +++ b/app/services/chat_assistant.py @@ -0,0 +1,755 @@ +"""Conversational crypto research assistant. + +The assistant is read-only: it can inspect AlphaX data and Binance OHLCV, but +it never mutates recommendations, strategy state, or trading state. +""" + +from __future__ import annotations + +import json +import math +import os +import re +from datetime import datetime, timedelta +import ccxt +import pandas as pd +import requests + +from app.config.system_config import llm_config +from app.core.pa_engine import calc_atr, full_pa_analysis +from app.db import chat_assistant_db +from app.db.analytics import get_pipeline_runs +from app.db.llm_insights import compute_input_hash, repair_mojibake_json, repair_mojibake_text +from app.db.onchain_db import get_onchain_token_detail, get_onchain_overview +from app.db.paper_trading import get_paper_trading_summary, list_paper_trade_events, list_paper_trades +from app.db.schema import get_conn +from app.services.llm_insights import get_llm_params +from app.services.market_overview import get_crypto_market_overview + + +exchange = ccxt.binance({"enableRateLimit": True}) + +CRYPTO_TERMS = { + "btc", "eth", "bnb", "sol", "xrp", "doge", "ada", "sui", "link", "qnt", + "币", "加密", "crypto", "usdt", "binance", "行情", "链上", "舆情", "推荐", "复盘", + "k线", "k 线", "技术面", "止盈", "止损", "纸面", "模拟交易", "仓位", "山寨", +} + +INTENT_LABELS = { + "coin_analysis": "单币分析", + "market_overview": "市场问答", + "recommendation_explain": "推荐解释", + "sentiment": "舆情解读", + "onchain": "链上异动", + "review": "复盘查询", + "paper_trading": "模拟交易", + "help": "帮助", + "unsupported": "范围外", +} + +TIMEFRAMES = ("15m", "1h", "4h", "1d") + + +def _now() -> str: + return datetime.now().isoformat(timespec="seconds") + + +def _safe_float(value, default=0.0) -> float: + try: + if value is None or value == "": + return default + return float(value) + except Exception: + return default + + +def _safe_int(value, default=0) -> int: + try: + return int(value or 0) + except Exception: + return default + + +def _json(value): + return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str) + + +def _compact_json(value, max_len=18000): + text = _json(value) + if len(text) <= max_len: + return text + return text[:max_len] + "...[truncated]" + + +def _normalize_symbol(value: str) -> str: + text = str(value or "").strip().upper() + text = text.replace(" ", "") + if not text: + return "" + if "/" in text: + base, quote = text.split("/", 1) + quote = quote or "USDT" + return f"{base}/USDT" if quote in ("USD", "USDT", "BUSD", "FDUSD") else f"{base}/{quote}" + text = re.sub(r"[^A-Z0-9]", "", text) + if text.endswith("USDT") and len(text) > 4: + text = text[:-4] + return f"{text}/USDT" if text else "" + + +def extract_symbol(message: str, session=None, preferences=None) -> str: + text = str(message or "") + patterns = [ + r"\b([A-Z0-9]{2,15})\s*/\s*USDT\b", + r"\b([A-Z0-9]{2,15})USDT\b", + r"\$([A-Z][A-Z0-9]{1,12})\b", + ] + for pat in patterns: + m = re.search(pat, text, flags=re.I) + if m: + return _normalize_symbol(m.group(1)) + + upper_tokens = re.findall(r"(? str: + text = str(message or "").lower() + if not _is_crypto_question(text, symbol): + return "unsupported" + if any(k in text for k in ("怎么用", "能做什么", "帮助", "help", "问什么")): + return "help" + if any(k in text for k in ("模拟交易", "paper", "开仓", "平仓", "持仓", "收益", "仓位")): + return "paper_trading" + if any(k in text for k in ("链上", "onchain", "鲸鱼", "转账", "dex", "流动性", "合约")): + return "onchain" + if any(k in text for k in ("舆情", "新闻", "消息", "情绪", "热点", "叙事", "ai 舆情")): + return "sentiment" + if any(k in text for k in ("复盘", "历史", "亏损", "失败", "胜率", "漏选")): + return "review" + if any(k in text for k in ("推荐", "看板", "为什么", "等回踩", "可买", "观察", "信号")) and symbol: + return "recommendation_explain" + if any(k in text for k in ("市场", "大盘", "全市场", "资金费率", "涨幅榜", "广度")) and not symbol: + return "market_overview" + if symbol: + return "coin_analysis" + return "market_overview" + + +def _is_crypto_question(text: str, symbol: str = "") -> bool: + if symbol: + return True + return any(term in text for term in CRYPTO_TERMS) + + +def _binance_symbol_id(symbol: str) -> str: + return _normalize_symbol(symbol).replace("/", "") + + +def fetch_binance_klines(symbol: str, timeframe: str, limit: int = 160): + symbol = _normalize_symbol(symbol) + try: + ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit) + except Exception: + return None + if not ohlcv: + return None + df = pd.DataFrame(ohlcv, columns=["timestamp", "open", "high", "low", "close", "volume"]) + df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") + return df + + +def _ma(series, n): + if series is None or len(series) < n: + return 0.0 + value = series.rolling(n).mean().iloc[-1] + return 0.0 if pd.isna(value) else float(value) + + +def _rsi(closes, period=14): + if closes is None or len(closes) < period + 2: + return 0.0 + delta = closes.diff() + gain = delta.clip(lower=0).rolling(period).mean() + loss = (-delta.clip(upper=0)).rolling(period).mean() + rs = gain / loss.replace(0, math.nan) + value = 100 - (100 / (1 + rs.iloc[-1])) + return 50.0 if pd.isna(value) else round(float(value), 2) + + +def _support_resistance(df): + if df is None or len(df) < 20: + return {"support": 0, "resistance": 0, "range_position_pct": 0} + recent = df.tail(48 if len(df) >= 48 else len(df)) + support = float(recent["low"].min()) + resistance = float(recent["high"].max()) + price = float(df["close"].iloc[-1]) + position = (price - support) / (resistance - support) * 100 if resistance > support else 50 + return { + "support": round(support, 8), + "resistance": round(resistance, 8), + "range_position_pct": round(max(0, min(100, position)), 2), + } + + +def _technical_summary_for_df(df, timeframe: str) -> dict: + if df is None or len(df) < 30: + return {"timeframe": timeframe, "available": False, "reason": "kline_unavailable"} + closes = df["close"].astype(float) + volumes = df["volume"].astype(float) + price = float(closes.iloc[-1]) + prev = float(closes.iloc[-2]) if len(closes) >= 2 else price + change = ((price / prev) - 1) * 100 if prev > 0 else 0 + change_window = ((price / float(closes.iloc[0])) - 1) * 100 if float(closes.iloc[0]) > 0 else 0 + ma20 = _ma(closes, 20) + ma60 = _ma(closes, 60) + avg_vol20 = _ma(volumes, 20) + vol_ratio = float(volumes.iloc[-1]) / avg_vol20 if avg_vol20 > 0 else 0 + atr = calc_atr(df, 14) + pa = full_pa_analysis(df, timeframe) + sr = _support_resistance(df) + last_candle = { + "open": round(float(df["open"].iloc[-1]), 8), + "high": round(float(df["high"].iloc[-1]), 8), + "low": round(float(df["low"].iloc[-1]), 8), + "close": round(price, 8), + "volume": round(float(volumes.iloc[-1]), 4), + "is_bullish": price >= float(df["open"].iloc[-1]), + } + trend = "sideways" + if ma20 > 0 and ma60 > 0 and price > ma20 > ma60: + trend = "uptrend" + elif ma20 > 0 and ma60 > 0 and price < ma20 < ma60: + trend = "downtrend" + elif ma20 > 0 and price > ma20: + trend = "rebound" + elif ma20 > 0 and price < ma20: + trend = "weak" + return { + "timeframe": timeframe, + "available": True, + "last_time": str(df["timestamp"].iloc[-1]), + "price": round(price, 8), + "change_last_bar_pct": round(change, 2), + "change_window_pct": round(change_window, 2), + "trend": trend, + "ma20": round(ma20, 8), + "ma60": round(ma60, 8), + "rsi14": _rsi(closes, 14), + "atr14": round(float(atr or 0), 8), + "volume_ratio_20": round(vol_ratio, 2), + "support": sr["support"], + "resistance": sr["resistance"], + "range_position_pct": sr["range_position_pct"], + "last_candle": last_candle, + "pa": { + "dynamic_count_recent": sum(1 for x in (pa.get("candles_class") or [])[-10:] if x.get("type") == "dynamic"), + "static_count_recent": sum(1 for x in (pa.get("candles_class") or [])[-10:] if x.get("type") == "static"), + "top_zones": (pa.get("zones") or [])[:3], + "continuous_k": (pa.get("continuous_k") or [])[-3:], + "ignition_points": (pa.get("ignition_points") or [])[-3:], + "trend_exhaustion": pa.get("trend_exhaustion") or {}, + }, + } + + +def analyze_symbol_technicals(symbol: str) -> dict: + symbol = _normalize_symbol(symbol) + result = { + "symbol": symbol, + "source": "binance_spot", + "binance_tradable": False, + "timeframes": {}, + "summary": {}, + "errors": [], + } + for tf in TIMEFRAMES: + df = fetch_binance_klines(symbol, tf, limit=180 if tf == "1d" else 160) + if df is None or len(df) < 30: + result["timeframes"][tf] = {"timeframe": tf, "available": False, "reason": "binance_kline_unavailable"} + result["errors"].append(f"{tf}: kline_unavailable") + continue + result["binance_tradable"] = True + result["timeframes"][tf] = _technical_summary_for_df(df, tf) + + available = [x for x in result["timeframes"].values() if x.get("available")] + if not available: + result["summary"] = {"stance": "no_data", "headline": "未拿到 Binance K 线,无法做实时技术面判断。", "risk_level": "unknown"} + return result + + h1 = result["timeframes"].get("1h") or {} + h4 = result["timeframes"].get("4h") or {} + d1 = result["timeframes"].get("1d") or {} + bullish = sum(1 for x in available if x.get("trend") in ("uptrend", "rebound") and _safe_float(x.get("volume_ratio_20")) >= 0.8) + bearish = sum(1 for x in available if x.get("trend") in ("downtrend", "weak") and _safe_float(x.get("change_window_pct")) < 0) + chase_risk = any(_safe_float(x.get("range_position_pct")) >= 85 and _safe_float(x.get("rsi14")) >= 70 for x in (h1, h4, d1) if x.get("available")) + exhaustion = (h1.get("pa") or {}).get("trend_exhaustion") or {} + if bullish >= 3 and not chase_risk: + stance = "bullish_watch" + headline = "多周期结构偏强,但仍需要结合入场窗口和风险收益比。" + elif chase_risk: + stance = "chase_risk" + headline = "价格处在区间高位或 RSI 偏热,追高风险需要优先处理。" + elif bearish >= 2 or exhaustion.get("exhausted"): + stance = "weak_or_exhausted" + headline = "短线结构偏弱或出现衰减,不适合直接按强势机会处理。" + else: + stance = "neutral" + headline = "结构还不够一致,适合观察等待更清晰触发。" + result["summary"] = { + "stance": stance, + "headline": headline, + "risk_level": "high" if chase_risk or exhaustion.get("severity") == "high" else "medium" if bearish else "normal", + "bullish_timeframe_count": bullish, + "bearish_timeframe_count": bearish, + "chase_risk": chase_risk, + "latest_price": h1.get("price") or available[0].get("price"), + } + return result + + +def _latest_recommendations(symbol: str = "", limit=5): + conn = get_conn() + params = [] + where = "1=1" + if symbol: + where += " AND symbol=%s" + params.append(_normalize_symbol(symbol)) + rows = conn.execute( + f""" + SELECT * FROM recommendation + WHERE {where} + ORDER BY id DESC + LIMIT %s + """, + tuple(params + [int(limit)]), + ).fetchall() + conn.close() + items = [] + from app.db.altcoin_db import _derive_execution_fields + + for row in rows: + item = _derive_execution_fields(dict(row)) + for key in ("market_context_json", "derivatives_context_json", "sector_context_json", "entry_plan_json", "signal_codes_json", "signal_labels_json"): + value = item.get(key) + if isinstance(value, str): + try: + if value: + parsed = json.loads(value) + elif key.endswith("context_json") or key == "entry_plan_json": + parsed = {} + else: + parsed = [] + item[key.replace("_json", "")] = parsed + except Exception: + pass + items.append(item) + return items + + +def _sentiment_context(symbol: str = "", limit=8): + conn = get_conn() + params = [] + where = "detected_at >= %s" + params.append((datetime.now() - timedelta(hours=72)).isoformat()) + if symbol: + where += " AND symbol=%s" + params.append(_normalize_symbol(symbol)) + rows = conn.execute( + f""" + SELECT id, source, symbol, title, url, published_at, detected_at, importance, event_type, decision, tech_score, rec_id + FROM event_news + WHERE {where} + ORDER BY detected_at DESC, id DESC + LIMIT %s + """, + tuple(params + [int(limit)]), + ).fetchall() + conn.close() + return [dict(row) for row in rows] + + +def _review_context(symbol: str = "", limit=8): + conn = get_conn() + params = [] + where = "1=1" + if symbol: + where += " AND rl.symbol=%s" + params.append(_normalize_symbol(symbol)) + rows = conn.execute( + f""" + SELECT rl.*, r.execution_status, r.display_bucket, r.action_status, r.entry_triggered + FROM review_log rl + LEFT JOIN recommendation r ON r.id=rl.rec_id + WHERE {where} + ORDER BY rl.review_time DESC, rl.id DESC + LIMIT %s + """, + tuple(params + [int(limit)]), + ).fetchall() + conn.close() + return [dict(row) for row in rows] + + +def _paper_context(symbol: str = ""): + if symbol: + trades = list_paper_trades(limit=10, status="") + symbol_norm = _normalize_symbol(symbol) + items = [x for x in trades.get("items", []) if x.get("symbol") == symbol_norm][:5] + events = list_paper_trade_events(limit=20, symbol=symbol_norm) + return {"trades": items, "events": events.get("items", [])} + return { + "summary": get_paper_trading_summary(days=30), + "trades": list_paper_trades(limit=5).get("items", []), + "events": list_paper_trade_events(limit=8).get("items", []), + } + + +def _market_context(): + try: + market = get_crypto_market_overview() + except Exception as exc: + market = {"error": str(exc)[:300]} + try: + onchain = get_onchain_overview(hours=24) + except Exception: + onchain = {} + try: + from app.services.llm_insights import get_latest_sentiment_batch_analysis + + sentiment = get_latest_sentiment_batch_analysis() or {} + except Exception: + sentiment = {} + return {"market": market, "onchain": onchain, "ai_sentiment": sentiment} + + +def build_context(intent: str, message: str, symbol: str, preferences=None) -> dict: + symbol = _normalize_symbol(symbol) + ctx = { + "intent": intent, + "intent_label": INTENT_LABELS.get(intent, intent), + "symbol": symbol, + "generated_at": _now(), + "preferences": preferences or {}, + "sources": [], + } + if intent == "unsupported": + return ctx + if intent in ("coin_analysis", "recommendation_explain", "onchain", "sentiment") and symbol: + ctx["technicals"] = analyze_symbol_technicals(symbol) + ctx["recommendations"] = _latest_recommendations(symbol=symbol, limit=5) + ctx["sentiment_events"] = _sentiment_context(symbol=symbol, limit=8) + try: + ctx["onchain"] = get_onchain_token_detail(symbol=symbol, hours=168) + except Exception as exc: + ctx["onchain"] = {"error": str(exc)[:200]} + ctx["reviews"] = _review_context(symbol=symbol, limit=8) + ctx["paper"] = _paper_context(symbol=symbol) + ctx["sources"] = ["binance_klines", "recommendation", "event_news", "onchain", "review_log", "paper_trading"] + elif intent == "market_overview": + ctx.update(_market_context()) + ctx["pipeline"] = get_pipeline_runs(limit=5, hours=24, offset=0) + ctx["sources"] = ["market_overview", "onchain_overview", "llm_sentiment", "pipeline_runs"] + elif intent == "paper_trading": + ctx["paper"] = _paper_context(symbol=symbol) + ctx["sources"] = ["paper_trading"] + elif intent == "review": + ctx["reviews"] = _review_context(symbol=symbol, limit=12) + ctx["recommendations"] = _latest_recommendations(symbol=symbol, limit=8) if symbol else _latest_recommendations(limit=8) + ctx["paper"] = _paper_context(symbol=symbol) + ctx["sources"] = ["review_log", "recommendation", "paper_trading"] + elif intent == "sentiment": + ctx["sentiment_events"] = _sentiment_context(symbol=symbol, limit=20) + try: + from app.services.llm_insights import get_latest_sentiment_batch_analysis + + ctx["ai_sentiment"] = get_latest_sentiment_batch_analysis() or {} + except Exception: + ctx["ai_sentiment"] = {} + ctx["sources"] = ["event_news", "llm_sentiment"] + elif intent == "onchain": + if symbol: + try: + ctx["onchain"] = get_onchain_token_detail(symbol=symbol, hours=168) + except Exception as exc: + ctx["onchain"] = {"error": str(exc)[:200]} + else: + ctx["onchain"] = get_onchain_overview(hours=24) + ctx["sources"] = ["onchain"] + else: + ctx["capabilities"] = [ + "单币技术面:自动拉取 Binance 15m/1h/4h/1d K 线", + "推荐解释:结合当前看板状态、入场窗口、TP/SL、风险理由", + "舆情和链上:读取当前系统已采集与 AI 解读的结果", + "复盘和模拟交易:区分信号表现与纸面交易收益", + ] + return ctx + + +def _fallback_answer(intent: str, message: str, context: dict) -> dict: + if intent == "unsupported": + return { + "summary": "我只能回答加密货币和 AlphaX 当前数据相关的问题。", + "answer": "这个问题超出了 Crypto 研究助手的范围。你可以问某个币的技术面、看板推荐原因、链上异动、舆情影响、复盘或模拟交易表现。", + "evidence": [], + "related_records": [], + "followups": ["分析 BTC/USDT 的技术面", "今天市场适合追强势币吗?"], + } + symbol = context.get("symbol") or "" + tech = context.get("technicals") or {} + tech_summary = tech.get("summary") or {} + recommendations = context.get("recommendations") or [] + sentiment = context.get("sentiment_events") or [] + onchain = context.get("onchain") or {} + paper = context.get("paper") or {} + evidence = [] + if tech_summary: + evidence.append(f"技术面:{tech_summary.get('headline', '')}") + if recommendations: + r = recommendations[0] + evidence.append(f"主链路:{r.get('execution_label') or r.get('action_status') or r.get('execution_status')},原因:{r.get('execution_reason') or r.get('state_reason') or '--'}") + if sentiment: + evidence.append(f"舆情:近 72h 有 {len(sentiment)} 条相关事件,最新为「{sentiment[0].get('title', '')[:60]}」。") + if isinstance(onchain, dict) and (onchain.get("events") or onchain.get("metrics")): + evidence.append(f"链上:近 7 天有 {len(onchain.get('events') or [])} 条映射事件。") + if isinstance(paper, dict) and paper.get("trades"): + latest = paper["trades"][0] + evidence.append(f"模拟交易:最新状态 {latest.get('status')},收益 {latest.get('pnl_pct') or latest.get('realized_pnl_pct') or 0}%。") + if not evidence: + evidence.append("当前数据库没有足够样本,结论需要降级为观察。") + + if intent == "market_overview": + market = (context.get("market") or {}) + state = market.get("state") or {} + summary = state.get("label") or "市场数据已读取" + answer = state.get("summary") or "我读取了全市场行情,但当前没有足够信息形成强结论。" + elif symbol: + summary = tech_summary.get("headline") or f"{symbol} 需要继续观察" + answer = "结论:" + summary + " 证据区已汇总技术面、推荐、舆情、链上和模拟交易上下文。" + else: + summary = "已读取当前系统数据" + answer = "我已经按你的问题读取了当前数据库,但没有识别到明确币种;可以继续追问具体币种。" + + return { + "summary": summary, + "answer": answer, + "evidence": evidence[:8], + "related_records": _related_records(context), + "followups": _followups(intent, symbol), + } + + +def _related_records(context: dict) -> list[dict]: + records = [] + for item in (context.get("recommendations") or [])[:3]: + records.append({"type": "recommendation", "label": f"推荐 #{item.get('id')}", "symbol": item.get("symbol"), "status": item.get("execution_status")}) + for item in (context.get("sentiment_events") or [])[:3]: + records.append({"type": "sentiment", "label": item.get("title"), "symbol": item.get("symbol"), "status": item.get("importance")}) + for item in ((context.get("paper") or {}).get("trades") or [])[:3]: + records.append({"type": "paper_trade", "label": f"模拟交易 #{item.get('id')}", "symbol": item.get("symbol"), "status": item.get("status")}) + return records + + +def _compact_technical_context(context: dict) -> dict: + tech = context.get("technicals") or {} + tfs = {} + for tf, item in (tech.get("timeframes") or {}).items(): + if not item.get("available"): + tfs[tf] = {"available": False, "reason": item.get("reason") or ""} + continue + tfs[tf] = { + "available": True, + "price": item.get("price"), + "trend": item.get("trend"), + "rsi14": item.get("rsi14"), + "volume_ratio_20": item.get("volume_ratio_20"), + "range_position_pct": item.get("range_position_pct"), + "support": item.get("support"), + "resistance": item.get("resistance"), + "pa": { + "dynamic_count_recent": (item.get("pa") or {}).get("dynamic_count_recent"), + "static_count_recent": (item.get("pa") or {}).get("static_count_recent"), + "ignition_count": len((item.get("pa") or {}).get("ignition_points") or []), + "zone_count": len((item.get("pa") or {}).get("top_zones") or []), + }, + } + return { + "summary": tech.get("summary") or {}, + "binance_tradable": bool(tech.get("binance_tradable")), + "timeframes": tfs, + } + + +def _followups(intent: str, symbol: str = "") -> list[str]: + if symbol: + base = symbol.replace("/USDT", "") + return [ + f"{base} 现在追高风险大吗?", + f"{base} 的链上和舆情有没有共振?", + f"{base} 如果要等回踩,关键价位在哪里?", + ] + if intent == "market_overview": + return ["今天市场更适合做日内还是 1-3 天波段?", "当前涨幅榜有什么共性?"] + return ["分析 BTC/USDT 现在的技术面", "解释最新推荐为什么不是可买"] + + +def _call_chat_llm(message: str, context: dict, history=None) -> dict: + cfg = llm_config() + params = get_llm_params() + if not bool(cfg.get("enabled", False)) or not os.getenv(str(params.get("api_key_env") or "ALPHAX_LLM_API_KEY"), "").strip(): + return {"status": "skipped", "error": "llm_disabled"} + if not bool((cfg.get("modules") or {}).get("chat", True)): + return {"status": "skipped", "error": "chat_module_disabled"} + api_key = os.getenv(str(params.get("api_key_env") or "ALPHAX_LLM_API_KEY"), "").strip() + base_url = str(params.get("base_url") or "https://api.openai.com/v1").rstrip("/") + model = str(params.get("model") or "gpt-4o-mini").strip() + payload = { + "user_question": message, + "context": context, + "recent_history": (history or [])[-8:], + "rules": [ + "只回答加密货币、AlphaX 当前数据、技术面、链上、舆情、复盘和模拟交易相关问题。", + "不要给真实下单指令,不要修改推荐状态,不要承诺收益。", + "回答使用中文,采用两段式:先结论,再证据。", + "输出严格 JSON:summary, answer, evidence[], risk_flags[], related_records[], followups[]。", + ], + } + system_prompt = "你是 AlphaX Agent 的 Crypto 研究助手。你只能基于提供的结构化数据回答,不能编造数据。" + try: + resp = requests.post( + f"{base_url}/chat/completions", + headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, + json={ + "model": model, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": _compact_json(payload)}, + ], + "temperature": 0.15, + "max_tokens": int(params.get("max_tokens") or 900), + "response_format": {"type": "json_object"}, + }, + timeout=int(params.get("timeout") or 20), + ) + if resp.status_code >= 400: + return {"status": "failed", "error": f"http_{resp.status_code}:{resp.text[:300]}", "model": model} + data = resp.json() + content = (((data.get("choices") or [{}])[0]).get("message") or {}).get("content") or "{}" + content = repair_mojibake_text(content) + parsed = repair_mojibake_json(json.loads(content)) + if not isinstance(parsed, dict): + raise ValueError("llm_output_not_object") + parsed.setdefault("summary", parsed.get("answer", "")[:80]) + parsed.setdefault("evidence", []) + parsed.setdefault("risk_flags", []) + parsed.setdefault("related_records", _related_records(context)) + parsed.setdefault("followups", _followups(context.get("intent"), context.get("symbol"))) + return {"status": "success", "content": parsed, "model": model} + except Exception as exc: + return {"status": "failed", "error": str(exc)[:500], "model": model} + + +def answer_chat(user_id: int, message: str, session_id: int = 0) -> dict: + message = str(message or "").strip() + if not message: + raise ValueError("message_required") + prefs = chat_assistant_db.get_user_preferences(user_id) + session = chat_assistant_db.get_chat_session(session_id, user_id) if session_id else None + symbol = extract_symbol(message, session=session, preferences=prefs) + intent = detect_intent(message, symbol=symbol) + if not session: + title = symbol or message[:24] + session = chat_assistant_db.create_chat_session(user_id, title=title, last_symbol=symbol, last_intent=intent) + user_msg = chat_assistant_db.append_chat_message( + session["id"], + user_id, + "user", + content_text=message, + content_json={"text": message}, + intent=intent, + symbol=symbol, + ) + history = chat_assistant_db.list_chat_messages(session["id"], user_id, limit=12).get("items", []) + context = build_context(intent, message, symbol, preferences=prefs) + llm_result = _call_chat_llm(message, context, history=history) + if llm_result.get("status") == "success": + answer = repair_mojibake_json(llm_result.get("content") or {}) + model = llm_result.get("model") or "" + else: + answer = repair_mojibake_json(_fallback_answer(intent, message, context)) + answer["llm_status"] = llm_result.get("status") + answer["llm_error"] = llm_result.get("error", "") + model = llm_result.get("model") or "" + answer.setdefault("related_records", _related_records(context)) + answer.setdefault("followups", _followups(intent, symbol)) + answer.setdefault("evidence", []) + assistant_text = repair_mojibake_text(answer.get("answer") or answer.get("summary") or "") + assistant_msg = chat_assistant_db.append_chat_message( + session["id"], + user_id, + "assistant", + content_text=assistant_text, + content_json=answer, + context_json={ + "intent": intent, + "symbol": symbol, + "context_hash": compute_input_hash(context), + "sources": context.get("sources") or [], + "technical_summary": (context.get("technicals") or {}).get("summary") or {}, + "technicals": _compact_technical_context(context), + }, + intent=intent, + symbol=symbol, + model=model, + ) + memory = session.get("memory") or {} + if symbol: + memory["last_symbol"] = symbol + memory["last_intent"] = intent + chat_assistant_db.update_chat_session( + session["id"], + user_id, + title=session.get("title") if session.get("title") != "新对话" else (symbol or message[:24]), + memory_json=memory, + last_symbol=symbol, + last_intent=intent, + ) + pref_patch = {"last_intent": intent, "recent_topics": [intent]} + if symbol: + pref_patch["last_symbol"] = symbol + pref_patch["preferred_symbols"] = [symbol] + chat_assistant_db.update_user_preferences(user_id, pref_patch) + return { + "ok": True, + "session": chat_assistant_db.get_chat_session(session["id"], user_id), + "user_message": user_msg, + "assistant_message": assistant_msg, + "answer": answer, + "intent": intent, + "intent_label": INTENT_LABELS.get(intent, intent), + "symbol": symbol, + "context": { + "sources": context.get("sources") or [], + "technicals": context.get("technicals") or {}, + "related_records": answer.get("related_records") or [], + }, + } + + +__all__ = [ + "analyze_symbol_technicals", + "answer_chat", + "build_context", + "detect_intent", + "extract_symbol", + "fetch_binance_klines", +] diff --git a/app/services/event_driven_screener.py b/app/services/event_driven_screener.py index ec4595a..0b26cda 100644 --- a/app/services/event_driven_screener.py +++ b/app/services/event_driven_screener.py @@ -25,7 +25,7 @@ import yaml sys.path.insert(0, os.path.dirname(__file__)) from app.config.config_loader import load_rules, get_meta, get_strategy_direction -from app.db.altcoin_db import init_db, get_conn, create_recommendation, log_screening, log_cron_run, get_recommendation_for_push +from app.db.altcoin_db import init_db, get_conn, create_recommendation, log_screening, log_cron_run from app.db.postgres_connection import ensure_migrations_once from app.db.llm_insights import repair_mojibake_json, repair_mojibake_text from app.core.opportunity_funnel import build_screening_detail @@ -41,7 +41,6 @@ from app.services.altcoin_screener import ( ) from app.services.altcoin_confirm import fetch_derivatives_context from app.core.pa_engine import full_pa_analysis, calc_atr -from app.integrations.push_orchestrator import push_mainline_state_update exchange = ccxt.binance({"enableRateLimit": True}) @@ -838,21 +837,6 @@ def process_event(event): sector_context={"event_title": event.get("title"), "event_url": event.get("url"), "event_source": event.get("source"), "event_importance": event.get("importance"), "trigger_context": result.get("trigger_context", {})}, ) - # 飞书只是通知层:事件脚本不再直接推 observe/risk,也不允许 rec_id=0 的事件旁路进通知。 - # 只有 decision=recommend 且已创建主推荐记录后,消费主链路派生状态进行通知。 - if decision == "recommend" and rec_id and _cfg().get("push", {}).get(decision, True): - mainline_item = get_recommendation_for_push(rec_id) - pushed = push_mainline_state_update( - symbol, - rec_id, - mainline_item, - title_prefix="事件触发机会", - entry_push_type="event_entry", - watch_push_type="event_watch_pool", - ) - elif decision in ("observe", "risk"): - print(f"[event] skip push {symbol}: decision={decision} is not a主链路推荐通知") - conn = get_conn() conn.execute(""" UPDATE event_news SET processed=1, decision=%s, tech_score=%s, rec_id=%s, pushed=%s diff --git a/app/services/price_tracker.py b/app/services/price_tracker.py index 772b8bf..0435255 100644 --- a/app/services/price_tracker.py +++ b/app/services/price_tracker.py @@ -30,7 +30,6 @@ from app.db.altcoin_db import ( update_latest_price_cache, ) from app.db.paper_trading import sync_recommendation as sync_paper_trade -from app.integrations.push_orchestrator import push_trade_action_update from app.core.pa_engine import ( calc_atr, full_pa_analysis, detect_trend_exhaustion, analyze_entry_point, diff --git a/app/services/review_engine.py b/app/services/review_engine.py index 3932fce..e936abd 100644 --- a/app/services/review_engine.py +++ b/app/services/review_engine.py @@ -32,7 +32,6 @@ from app.config.config_loader import ( promote_candidate_rule_to_learned_rule, bump_strategy_patch_version, ) from app.analysis import reverse_analysis -from app.integrations import feishu_review_push import requests BINANCE_API = "https://api.binance.com/api/v3" @@ -1499,26 +1498,6 @@ def run_review(push_enabled: bool = True, compact: bool = False): f"\n信号淘汰: {'; '.join(results['signal_deprecations'][:5])}" ) - # 7. 飞书推送 - if push_enabled: - try: - ok1, r1 = feishu_review_push.push_review_report(results) - print(f"[review_engine] 复盘报告推送: ok={ok1}") - - if results["reverse_analysis"] and not results["reverse_analysis"].get("error"): - ok2, r2 = feishu_review_push.push_reverse_analysis_report(results["reverse_analysis"]) - print(f"[review_engine] 逆向分析报告推送: ok={ok2}") - - for rule in new_pattern_rules: - feishu_review_push.push_rule_update_notification(rule.get("candidate_id"), rule.get("description", ""), status="候选规则,未生效") - - if results["reverse_analysis"] and results["reverse_analysis"].get("new_rules"): - for rule in results["reverse_analysis"]["new_rules"]: - feishu_review_push.push_rule_update_notification(rule.get("candidate_id"), rule.get("description", ""), status="逆向候选,未生效") - - except Exception as e: - print(f"[review_engine] 飞书推送失败: {e}") - # 8. 更新meta(迭代元数据) update_meta("last_review", now.isoformat()) meta = get_review_params() # 先读当前meta diff --git a/app/web/routes_chat.py b/app/web/routes_chat.py new file mode 100644 index 0000000..a7a6718 --- /dev/null +++ b/app/web/routes_chat.py @@ -0,0 +1,54 @@ +from fastapi import APIRouter, Cookie, HTTPException + +from app.db import chat_assistant_db +from app.services.chat_assistant import answer_chat +from app.web.shared import ChatSendRequest, ChatSessionRequest, require_api_user_with_subscription + + +router = APIRouter() + + +@router.get("/api/chat/bootstrap") +async def api_chat_bootstrap(altcoin_session: str = Cookie(default="")): + user = require_api_user_with_subscription(altcoin_session) + return chat_assistant_db.bootstrap_chat(user["id"]) + + +@router.get("/api/chat/sessions") +async def api_chat_sessions(limit: int = 20, offset: int = 0, altcoin_session: str = Cookie(default="")): + user = require_api_user_with_subscription(altcoin_session) + return chat_assistant_db.list_chat_sessions(user["id"], limit=limit, offset=offset) + + +@router.post("/api/chat/sessions") +async def api_chat_create_session(req: ChatSessionRequest, altcoin_session: str = Cookie(default="")): + user = require_api_user_with_subscription(altcoin_session) + return {"ok": True, "session": chat_assistant_db.create_chat_session(user["id"], title=req.title)} + + +@router.get("/api/chat/sessions/{session_id}") +async def api_chat_session_detail(session_id: int, altcoin_session: str = Cookie(default="")): + user = require_api_user_with_subscription(altcoin_session) + session = chat_assistant_db.get_chat_session(session_id, user["id"]) + if not session: + raise HTTPException(status_code=404, detail="对话不存在") + messages = chat_assistant_db.list_chat_messages(session_id, user["id"], limit=200) + return {"ok": True, "session": session, "messages": messages} + + +@router.delete("/api/chat/sessions/{session_id}") +async def api_chat_archive_session(session_id: int, altcoin_session: str = Cookie(default="")): + user = require_api_user_with_subscription(altcoin_session) + session = chat_assistant_db.update_chat_session(session_id, user["id"], archived_at="now") + if not session: + raise HTTPException(status_code=404, detail="对话不存在") + return {"ok": True} + + +@router.post("/api/chat/send") +async def api_chat_send(req: ChatSendRequest, altcoin_session: str = Cookie(default="")): + user = require_api_user_with_subscription(altcoin_session) + try: + return answer_chat(user["id"], req.message, session_id=req.session_id) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) diff --git a/app/web/routes_pages.py b/app/web/routes_pages.py index d7430dc..540b84a 100644 --- a/app/web/routes_pages.py +++ b/app/web/routes_pages.py @@ -49,6 +49,13 @@ def build_router(templates, repo_root: Path, stock_report_template: str): return redirect return render_page("pipeline.html", request, active_nav="pipeline") + @router.get("/chat", response_class=HTMLResponse) + async def chat_page(request: Request): + user, redirect = require_page_user(request) + if redirect: + return redirect + return render_page("chat.html", request, active_nav="chat") + @router.get("/llm-insights", response_class=HTMLResponse) async def llm_insights_page(request: Request): user, redirect = require_page_user(request) diff --git a/app/web/shared.py b/app/web/shared.py index 5735f17..378b524 100644 --- a/app/web/shared.py +++ b/app/web/shared.py @@ -84,6 +84,15 @@ class RuntimeConfigRequest(BaseModel): description: str = "" +class ChatSessionRequest(BaseModel): + title: str = "" + + +class ChatSendRequest(BaseModel): + session_id: int = 0 + message: str + + def auth_error(exc: Exception, status_code: int = 400): raise HTTPException(status_code=status_code, detail=str(exc)) diff --git a/app/web/web_server.py b/app/web/web_server.py index c1cacb1..179fca7 100644 --- a/app/web/web_server.py +++ b/app/web/web_server.py @@ -16,6 +16,7 @@ from app.db.analytics import get_all_recommendations, get_cron_run_logs, get_cro from app.db.recommendation_queries import get_active_recommendations, get_active_recommendations_deduped from app.web.routes_admin import build_router as build_admin_router from app.web.routes_auth import router as auth_router +from app.web.routes_chat import router as chat_router from app.web.routes_content import build_router as build_content_router from app.web.routes_market import router as market_router from app.web.routes_onchain import router as onchain_router @@ -44,6 +45,7 @@ app = FastAPI(title="AlphaX Agent", lifespan=lifespan) templates = Jinja2Templates(directory=str(REPO_ROOT / "static")) app.include_router(auth_router) +app.include_router(chat_router) app.include_router(recommendations_router) app.include_router(strategy_router) app.include_router(onchain_router) diff --git a/static/base.html b/static/base.html index 396ea15..d590141 100644 --- a/static/base.html +++ b/static/base.html @@ -155,6 +155,7 @@ a { color: inherit; text-decoration: none; } + @@ -175,6 +176,7 @@ a { color: inherit; text-decoration: none; }