diff --git a/.env.example b/.env.example index 17e468a..df9182e 100644 --- a/.env.example +++ b/.env.example @@ -20,6 +20,18 @@ ALPHAX_DEFAULT_ADMIN_PASSWORD=AlphaXAdmin123 # ALTCOIN_FEISHU_WEBHOOK=https://open.feishu.cn/open-apis/bot/v2/hook/REDACTED ALTCOIN_FEISHU_WEBHOOK= +# LLM 解释层运行时配置。默认关闭;只异步生成缓存解释,不参与策略决策。 +ALPHAX_LLM_ENABLED=0 +ALPHAX_LLM_BASE_URL=https://api.openai.com/v1 +ALPHAX_LLM_API_KEY= +ALPHAX_LLM_API_KEY_ENV=ALPHAX_LLM_API_KEY +ALPHAX_LLM_MODEL=gpt-4o-mini +ALPHAX_LLM_TIMEOUT=20 +ALPHAX_LLM_MAX_TOKENS=900 +ALPHAX_LLM_RECOMMENDATIONS_ENABLED=1 +ALPHAX_LLM_SENTIMENT_ENABLED=1 +ALPHAX_LLM_REVIEW_ENABLED=1 + # 邮箱验证码 SMTP 配置。没有配置时,注册验证码只会生成,不会发邮件。 ASTOCK_SMTP_HOST= ASTOCK_SMTP_PORT=465 diff --git a/README_DOCKER.md b/README_DOCKER.md index f337fe0..4e236de 100644 --- a/README_DOCKER.md +++ b/README_DOCKER.md @@ -96,6 +96,25 @@ docker compose config > 当前机器如果没有 Docker,只能做离线文件/语法/DB 校验;到有 Docker 的机器上再执行 build/up。 +## LLM 解释层配置 + +LLM 是运行时系统能力,不属于策略参数,不写入 `rules.yaml`。在 `.env` 中配置即可: + +```bash +ALPHAX_LLM_ENABLED=1 +ALPHAX_LLM_BASE_URL=https://api.openai.com/v1 +ALPHAX_LLM_API_KEY=your-key +ALPHAX_LLM_MODEL=gpt-4o-mini +``` + +生成缓存解释: + +```bash +docker compose exec alphax-web python -m app.cli llm-insights --scope recommendations --limit 30 +docker compose exec alphax-web python -m app.cli llm-insights --scope sentiment --limit 30 +docker compose exec alphax-web python -m app.cli llm-insights --scope review --limit 10 +``` + ## 数据迁移 当前副本是从线上目录复制来的,包含复制时刻的 `altcoin_monitor.db`。为了避免误影响线上,容器读写的是副本目录下的: diff --git a/app/cli.py b/app/cli.py index 161846a..e77c5c6 100644 --- a/app/cli.py +++ b/app/cli.py @@ -29,6 +29,10 @@ def build_parser(): sentiment.add_argument("--check", action="store_true", help="输出舆情异动") sentiment.add_argument("--scores", action="store_true", help="输出评分") + llm = subparsers.add_parser("llm-insights", help="异步生成 LLM 缓存解释") + llm.add_argument("--scope", choices=["recommendations", "sentiment", "sentiment-events", "review"], default="recommendations") + llm.add_argument("--limit", type=int, default=30) + return parser @@ -70,6 +74,12 @@ def main(): } print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2)) return result + if args.command == "llm-insights": + from app.services import llm_insights + + result = llm_insights.run(scope=args.scope, limit=args.limit) + print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2)) + return result parser.error(f"unknown command: {args.command}") diff --git a/app/db/altcoin_db.py b/app/db/altcoin_db.py index 6f15429..8102c87 100644 --- a/app/db/altcoin_db.py +++ b/app/db/altcoin_db.py @@ -381,8 +381,39 @@ def init_db(): """) conn.execute("CREATE INDEX IF NOT EXISTS idx_sentiment_lookup ON sentiment_events(symbol, source, detected_at)") + # 10. LLM解释层缓存:只保存异步生成的解释/研究备忘,不参与交易决策。 + conn.execute(""" + CREATE TABLE IF NOT EXISTS llm_insights ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + target_type TEXT NOT NULL, + target_id TEXT NOT NULL, + insight_type TEXT NOT NULL, + prompt_version TEXT NOT NULL, + input_hash TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'success', + input_json TEXT DEFAULT '{}', + content_json TEXT DEFAULT '{}', + error TEXT DEFAULT '', + model TEXT DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + try: + conn.execute("ALTER TABLE llm_insights ADD COLUMN input_json TEXT DEFAULT '{}'") + except Exception: + pass + conn.execute(""" + CREATE UNIQUE INDEX IF NOT EXISTS idx_llm_insights_unique + ON llm_insights(target_type, target_id, insight_type, input_hash) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_llm_insights_lookup + ON llm_insights(target_type, target_id, insight_type, status, updated_at) + """) + conn.commit() - print("DB初始化完成(9+1表)") + print("DB初始化完成(9+2表)") # === 推送去重 === @@ -1482,8 +1513,17 @@ def get_active_recommendations_deduped(actionable_only=True, version="", hours=0 summary["expired_filtered"] = summary.pop("expired", 0) if not with_meta: - return all_items + try: + from app.services.llm_insights import attach_recommendation_insights + return attach_recommendation_insights(all_items) + except Exception: + return all_items page_items = all_items[offset: offset + limit] if limit else all_items[offset:] + try: + from app.services.llm_insights import attach_recommendation_insights + attach_recommendation_insights(page_items) + except Exception: + pass return { "items": page_items, "total": len(all_items), diff --git a/app/db/analytics.py b/app/db/analytics.py index ef12eae..3b88de8 100644 --- a/app/db/analytics.py +++ b/app/db/analytics.py @@ -1060,26 +1060,40 @@ def get_pipeline_runs(limit=30, hours=24, offset=0): """, (datetime.now().isoformat(), hours / 24.0, limit, offset), ).fetchall() + all_run_rows = conn.execute( + """ + SELECT * FROM cron_run_log + WHERE job_name = '粗筛' + AND julianday(?) - julianday(started_at) <= ? + ORDER BY started_at DESC, id DESC + """, + (datetime.now().isoformat(), hours / 24.0), + ).fetchall() runs = [] for row in run_rows: run = _cron_item(row) related = _select_pipeline_rows(conn, run) runs.append(_pipeline_summary_for_run(run, related)) + all_summaries = [] + for row in all_run_rows: + run = _cron_item(row) + related = _select_pipeline_rows(conn, run) + all_summaries.append(_pipeline_summary_for_run(run, related)) conn.close() kpi = { "hours": hours, - "run_count": len(runs), - "rough_candidates": sum(item["rough_candidates"] for item in runs), - "fine_qualified": sum(item["fine_qualified"] for item in runs), - "confirm_processed": sum(item["confirm_processed"] for item in runs), - "confirm_hits": sum(item["confirm_hits"] for item in runs), - "recommendations": sum(item["recommendations"] for item in runs), - "perf_success": sum(item["perf_success"] for item in runs), - "perf_failed": sum(item["perf_failed"] for item in runs), - "perf_pending": sum(item["perf_pending"] for item in runs), - "missed_count": sum(item["missed_count"] for item in runs), + "run_count": len(all_summaries), + "rough_candidates": sum(item["rough_candidates"] for item in all_summaries), + "fine_qualified": sum(item["fine_qualified"] for item in all_summaries), + "confirm_processed": sum(item["confirm_processed"] for item in all_summaries), + "confirm_hits": sum(item["confirm_hits"] for item in all_summaries), + "recommendations": sum(item["recommendations"] for item in all_summaries), + "perf_success": sum(item["perf_success"] for item in all_summaries), + "perf_failed": sum(item["perf_failed"] for item in all_summaries), + "perf_pending": sum(item["perf_pending"] for item in all_summaries), + "missed_count": sum(item["missed_count"] for item in all_summaries), } kpi["recommendation_rate"] = round(kpi["recommendations"] / kpi["fine_qualified"] * 100, 1) if kpi["fine_qualified"] else 0 kpi["performance_hit_rate"] = round(kpi["perf_success"] / (kpi["perf_success"] + kpi["perf_failed"]) * 100, 1) if (kpi["perf_success"] + kpi["perf_failed"]) else 0 diff --git a/app/db/llm_insights.py b/app/db/llm_insights.py new file mode 100644 index 0000000..c0e4e30 --- /dev/null +++ b/app/db/llm_insights.py @@ -0,0 +1,264 @@ +"""Cached LLM insight storage helpers.""" + +import hashlib +import json +from datetime import datetime + +from app.db.schema import get_conn + + +_MOJIBAKE_MARKERS = ("Ã", "Â", "ç", "è", "é", "å", "æ", "ä", "ï¼", "ã") + + +def _looks_mojibake(value): + text = str(value or "") + if not text: + return False + return any(marker in text for marker in _MOJIBAKE_MARKERS) + + +def repair_mojibake_text(value): + """Repair common UTF-8-as-latin1 mojibake from model/provider responses.""" + if not isinstance(value, str) or not _looks_mojibake(value): + return value + try: + repaired = value.encode("latin1").decode("utf-8") + except Exception: + repaired = _repair_mixed_mojibake_text(value) + return repaired if repaired != value else value + # Only accept the repair when it produces visible CJK text or common CJK punctuation. + if any("\u4e00" <= ch <= "\u9fff" for ch in repaired) or any(ch in repaired for ch in ",。;:!?()《》"): + return repaired + return value + + +def _repair_mixed_mojibake_text(value): + """Repair strings that contain normal CJK text plus mojibake fragments.""" + text = str(value or "") + separators = (": ", ":", " - ", ",", "。") + for sep in separators: + if sep not in text: + continue + left, right = text.split(sep, 1) + fixed_right = repair_mojibake_text(right) + if fixed_right != right: + return left + sep + fixed_right + return value + + +def repair_mojibake_json(value): + if isinstance(value, dict): + return { + repair_mojibake_text(k) if isinstance(k, str) else k: repair_mojibake_json(v) + for k, v in value.items() + } + if isinstance(value, list): + return [repair_mojibake_json(v) for v in value] + if isinstance(value, str): + return repair_mojibake_text(value) + return value + + +def compute_input_hash(payload): + """Stable hash for structured LLM input payloads.""" + raw = json.dumps(payload or {}, ensure_ascii=False, sort_keys=True, separators=(",", ":"), default=str) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def _load_content(row): + item = dict(row) + try: + item["content"] = repair_mojibake_json(json.loads(item.get("content_json") or "{}")) + except Exception: + item["content"] = {} + try: + item["input"] = repair_mojibake_json(json.loads(item.get("input_json") or "{}")) + except Exception: + item["input"] = {} + return item + + +def get_cached_insight(target_type, target_id, insight_type, input_hash=None, success_only=True): + conn = get_conn() + where = "target_type=? AND target_id=? AND insight_type=?" + params = [str(target_type), str(target_id), str(insight_type)] + if input_hash: + where += " AND input_hash=?" + params.append(str(input_hash)) + if success_only: + where += " AND status='success'" + row = conn.execute( + f""" + SELECT * FROM llm_insights + WHERE {where} + ORDER BY updated_at DESC, id DESC + LIMIT 1 + """, + tuple(params), + ).fetchone() + conn.close() + return _load_content(row) if row else None + + +def get_any_insight(target_type, target_id, insight_type, input_hash): + return get_cached_insight(target_type, target_id, insight_type, input_hash=input_hash, success_only=False) + + +def upsert_insight( + target_type, + target_id, + insight_type, + prompt_version, + input_hash, + status, + input_payload=None, + content=None, + error="", + model="", +): + now = datetime.now().isoformat() + input_json = json.dumps(repair_mojibake_json(input_payload or {}), ensure_ascii=False, default=str) + content_json = json.dumps(repair_mojibake_json(content or {}), ensure_ascii=False, default=str) + conn = get_conn() + conn.execute( + """ + INSERT INTO llm_insights ( + target_type, target_id, insight_type, prompt_version, input_hash, + status, input_json, content_json, error, model, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(target_type, target_id, insight_type, input_hash) DO UPDATE SET + prompt_version=excluded.prompt_version, + status=excluded.status, + input_json=excluded.input_json, + content_json=excluded.content_json, + error=excluded.error, + model=excluded.model, + updated_at=excluded.updated_at + """, + ( + str(target_type), + str(target_id), + str(insight_type), + str(prompt_version), + str(input_hash), + str(status), + input_json, + content_json, + str(error or "")[:2000], + str(model or ""), + now, + now, + ), + ) + conn.commit() + row = conn.execute( + """ + SELECT * FROM llm_insights + WHERE target_type=? AND target_id=? AND insight_type=? AND input_hash=? + """, + (str(target_type), str(target_id), str(insight_type), str(input_hash)), + ).fetchone() + conn.close() + return _load_content(row) if row else None + + +def get_insights_for_targets(target_type, target_ids, insight_type): + ids = [str(x) for x in (target_ids or []) if str(x or "").strip()] + if not ids: + return {} + placeholders = ",".join(["?"] * len(ids)) + conn = get_conn() + rows = conn.execute( + f""" + SELECT * FROM llm_insights + WHERE target_type=? AND insight_type=? AND status='success' + AND target_id IN ({placeholders}) + ORDER BY updated_at DESC, id DESC + """, + tuple([str(target_type), str(insight_type)] + ids), + ).fetchall() + conn.close() + result = {} + for row in rows: + item = _load_content(row) + result.setdefault(str(item.get("target_id")), item) + return result + + +def get_latest_insight_by_type(target_type, insight_type, success_only=True): + conn = get_conn() + status_clause = "AND status='success'" if success_only else "" + row = conn.execute( + f""" + SELECT * FROM llm_insights + WHERE target_type=? AND insight_type=? {status_clause} + ORDER BY updated_at DESC, id DESC + LIMIT 1 + """, + (str(target_type), str(insight_type)), + ).fetchone() + conn.close() + return _load_content(row) if row else None + + +def list_llm_insights(limit=50, offset=0, target_type="", status="", insight_type=""): + try: + limit = min(100, max(1, int(limit or 50))) + except Exception: + limit = 50 + try: + offset = max(0, int(offset or 0)) + except Exception: + offset = 0 + where = [] + params = [] + if target_type: + where.append("target_type=?") + params.append(str(target_type)) + if status: + where.append("status=?") + params.append(str(status)) + if insight_type: + where.append("insight_type=?") + params.append(str(insight_type)) + clause = ("WHERE " + " AND ".join(where)) if where else "" + conn = get_conn() + total = conn.execute(f"SELECT COUNT(*) FROM llm_insights {clause}", tuple(params)).fetchone()[0] + rows = conn.execute( + f""" + SELECT * FROM llm_insights + {clause} + ORDER BY updated_at DESC, id DESC + LIMIT ? OFFSET ? + """, + tuple(params + [limit, offset]), + ).fetchall() + conn.close() + return { + "items": [_load_content(row) for row in rows], + "total": int(total or 0), + "limit": limit, + "offset": offset, + "has_more": offset + len(rows) < int(total or 0), + } + + +def get_llm_insight_by_id(insight_id): + conn = get_conn() + row = conn.execute("SELECT * FROM llm_insights WHERE id=?", (int(insight_id or 0),)).fetchone() + conn.close() + return _load_content(row) if row else None + + +__all__ = [ + "compute_input_hash", + "get_any_insight", + "get_cached_insight", + "get_insights_for_targets", + "get_latest_insight_by_type", + "get_llm_insight_by_id", + "list_llm_insights", + "repair_mojibake_json", + "repair_mojibake_text", + "upsert_insight", +] diff --git a/app/db/recommendation_queries.py b/app/db/recommendation_queries.py index 16bc321..757d733 100644 --- a/app/db/recommendation_queries.py +++ b/app/db/recommendation_queries.py @@ -11,6 +11,7 @@ from app.db.altcoin_db import ( update_recommendation_tracking, ) from app.db.schema import get_conn +from app.services.llm_insights import attach_recommendation_insights def should_push(symbol: str, push_type: str, action_status: str = "") -> bool: @@ -208,8 +209,9 @@ def get_active_recommendations_deduped( summary["expired_filtered"] = summary.pop("expired", 0) if not with_meta: - return all_items + return attach_recommendation_insights(all_items) page_items = all_items[offset : offset + limit] if limit else all_items[offset:] + attach_recommendation_insights(page_items) return { "items": page_items, "total": len(all_items), diff --git a/app/db/scheduler_db.py b/app/db/scheduler_db.py new file mode 100644 index 0000000..6b67436 --- /dev/null +++ b/app/db/scheduler_db.py @@ -0,0 +1,491 @@ +"""SQLite-backed scheduler configuration and runtime state.""" + +import json +from datetime import datetime + +from app.db.schema import get_conn + + +DEFAULT_JOBS = [ + { + "job_name": "event", + "command": "event", + "args": [], + "every_seconds": 60, + "initial_delay": 5, + "lock_group": "recommendation_write", + "description": "事件/舆情驱动技术检查", + "sort_order": 10, + }, + { + "job_name": "tracker", + "command": "tracker", + "args": [], + "every_seconds": 180, + "initial_delay": 20, + "lock_group": "tracking_write", + "description": "推荐价格跟踪", + "sort_order": 20, + }, + { + "job_name": "confirm", + "command": "confirm", + "args": [], + "every_seconds": 600, + "initial_delay": 40, + "lock_group": "recommendation_write", + "description": "确认层", + "sort_order": 30, + }, + { + "job_name": "screener", + "command": "screener", + "args": [], + "every_seconds": 900, + "initial_delay": 80, + "lock_group": "screening_write", + "description": "粗筛/细筛", + "sort_order": 40, + }, + { + "job_name": "sentiment", + "command": "sentiment", + "args": ["--collect"], + "every_seconds": 1800, + "initial_delay": 120, + "lock_group": "sentiment_write", + "description": "舆情采集", + "sort_order": 50, + }, + { + "job_name": "llm-sentiment", + "command": "llm-insights", + "args": ["--scope", "sentiment", "--limit", "40"], + "every_seconds": 1800, + "initial_delay": 180, + "lock_group": "llm_write", + "description": "LLM 批量舆情分析", + "sort_order": 60, + }, + { + "job_name": "review", + "command": "review", + "args": [], + "every_seconds": 86400, + "initial_delay": 300, + "lock_group": "review_write", + "description": "复盘", + "sort_order": 70, + }, +] + + +def _now(): + return datetime.now().isoformat() + + +def _dump(value): + return json.dumps(value or [], ensure_ascii=False, default=str) + + +def _load(value, fallback=None): + try: + return json.loads(value) if isinstance(value, str) else (value if value is not None else fallback) + except Exception: + return fallback + + +def _create_runtime_table(conn): + conn.execute("DROP TABLE IF EXISTS scheduler_runtime_status") + conn.execute( + """ + CREATE TABLE scheduler_runtime_status ( + job_name TEXT PRIMARY KEY, + status TEXT DEFAULT 'idle', + pid INTEGER DEFAULT 0, + run_kind TEXT DEFAULT '', + trigger_id INTEGER DEFAULT 0, + locked_by TEXT DEFAULT '', + next_run_at TEXT DEFAULT '', + last_started_at TEXT DEFAULT '', + last_finished_at TEXT DEFAULT '', + last_exit_code INTEGER DEFAULT 0, + last_duration_ms INTEGER DEFAULT 0, + last_error TEXT DEFAULT '', + output_tail TEXT DEFAULT '', + updated_at TEXT NOT NULL + ) + """ + ) + + +def _create_config_table(conn): + conn.execute("DROP TABLE IF EXISTS scheduler_job_config") + conn.execute( + """ + CREATE TABLE scheduler_job_config ( + job_name TEXT PRIMARY KEY, + command TEXT NOT NULL, + args_json TEXT DEFAULT '[]', + enabled INTEGER DEFAULT 1, + every_seconds INTEGER NOT NULL, + initial_delay INTEGER DEFAULT 0, + lock_group TEXT DEFAULT '', + description TEXT DEFAULT '', + sort_order INTEGER DEFAULT 0, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + + +def _create_manual_trigger_table(conn): + conn.execute("DROP INDEX IF EXISTS idx_scheduler_trigger_status") + conn.execute("DROP TABLE IF EXISTS scheduler_manual_trigger") + conn.execute( + """ + CREATE TABLE scheduler_manual_trigger ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_name TEXT NOT NULL, + force INTEGER DEFAULT 0, + status TEXT DEFAULT 'queued', + requested_by TEXT DEFAULT '', + requested_at TEXT NOT NULL, + started_at TEXT DEFAULT '', + finished_at TEXT DEFAULT '', + exit_code INTEGER DEFAULT 0, + duration_ms INTEGER DEFAULT 0, + output_tail TEXT DEFAULT '', + error_message TEXT DEFAULT '' + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_scheduler_trigger_status ON scheduler_manual_trigger(status, requested_at)") + + +def _reset_scheduler_tables(conn): + _create_manual_trigger_table(conn) + _create_runtime_table(conn) + _create_config_table(conn) + + +def _seed_scheduler_tables(conn): + now = _now() + for job in DEFAULT_JOBS: + conn.execute( + """ + INSERT INTO scheduler_job_config ( + job_name, command, args_json, enabled, every_seconds, initial_delay, + lock_group, description, sort_order, created_at, updated_at + ) VALUES (?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(job_name) DO UPDATE SET + command=excluded.command, + args_json=excluded.args_json, + initial_delay=excluded.initial_delay, + lock_group=excluded.lock_group, + description=excluded.description, + sort_order=excluded.sort_order, + updated_at=scheduler_job_config.updated_at + """, + ( + job["job_name"], + job["command"], + _dump(job.get("args")), + int(job["every_seconds"]), + int(job.get("initial_delay") or 0), + job.get("lock_group") or "", + job.get("description") or "", + int(job.get("sort_order") or 0), + now, + now, + ), + ) + conn.execute( + """ + INSERT INTO scheduler_runtime_status (job_name, status, updated_at) + VALUES (?, 'idle', ?) + ON CONFLICT(job_name) DO NOTHING + """, + (job["job_name"], now), + ) + + +def init_scheduler_tables(): + conn = get_conn() + conn.execute( + """ + CREATE TABLE IF NOT EXISTS scheduler_job_config ( + job_name TEXT PRIMARY KEY, + command TEXT NOT NULL, + args_json TEXT DEFAULT '[]', + enabled INTEGER DEFAULT 1, + every_seconds INTEGER NOT NULL, + initial_delay INTEGER DEFAULT 0, + lock_group TEXT DEFAULT '', + description TEXT DEFAULT '', + sort_order INTEGER DEFAULT 0, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + try: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS scheduler_runtime_status ( + job_name TEXT PRIMARY KEY, + status TEXT DEFAULT 'idle', + pid INTEGER DEFAULT 0, + run_kind TEXT DEFAULT '', + trigger_id INTEGER DEFAULT 0, + locked_by TEXT DEFAULT '', + next_run_at TEXT DEFAULT '', + last_started_at TEXT DEFAULT '', + last_finished_at TEXT DEFAULT '', + last_exit_code INTEGER DEFAULT 0, + last_duration_ms INTEGER DEFAULT 0, + last_error TEXT DEFAULT '', + output_tail TEXT DEFAULT '', + updated_at TEXT NOT NULL + ) + """ + ) + conn.execute("SELECT COUNT(*) FROM scheduler_runtime_status").fetchone() + except Exception: + _create_runtime_table(conn) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS scheduler_manual_trigger ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_name TEXT NOT NULL, + force INTEGER DEFAULT 0, + status TEXT DEFAULT 'queued', + requested_by TEXT DEFAULT '', + requested_at TEXT NOT NULL, + started_at TEXT DEFAULT '', + finished_at TEXT DEFAULT '', + exit_code INTEGER DEFAULT 0, + duration_ms INTEGER DEFAULT 0, + output_tail TEXT DEFAULT '', + error_message TEXT DEFAULT '' + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_scheduler_trigger_status ON scheduler_manual_trigger(status, requested_at)") + try: + _seed_scheduler_tables(conn) + except Exception: + _reset_scheduler_tables(conn) + _seed_scheduler_tables(conn) + conn.commit() + conn.close() + + +def get_job_configs(): + init_scheduler_tables() + conn = get_conn() + rows = conn.execute("SELECT * FROM scheduler_job_config ORDER BY sort_order ASC, job_name ASC").fetchall() + conn.close() + jobs = [] + for row in rows: + item = dict(row) + item["args"] = _load(item.pop("args_json", "[]"), []) + item["enabled"] = bool(item.get("enabled")) + jobs.append(item) + return jobs + + +def get_job_config(job_name): + init_scheduler_tables() + conn = get_conn() + row = conn.execute("SELECT * FROM scheduler_job_config WHERE job_name=?", (job_name,)).fetchone() + conn.close() + if not row: + return None + item = dict(row) + item["args"] = _load(item.pop("args_json", "[]"), []) + item["enabled"] = bool(item.get("enabled")) + return item + + +def set_job_enabled(job_name, enabled): + init_scheduler_tables() + now = _now() + conn = get_conn() + cur = conn.execute( + "UPDATE scheduler_job_config SET enabled=?, updated_at=? WHERE job_name=?", + (1 if enabled else 0, now, job_name), + ) + conn.commit() + conn.close() + return cur.rowcount > 0 + + +def set_job_interval(job_name, every_seconds): + seconds = max(30, int(every_seconds or 0)) + init_scheduler_tables() + now = _now() + conn = get_conn() + cur = conn.execute( + "UPDATE scheduler_job_config SET every_seconds=?, updated_at=? WHERE job_name=?", + (seconds, now, job_name), + ) + conn.commit() + conn.close() + return cur.rowcount > 0 + + +def update_runtime(job_name, **fields): + init_scheduler_tables() + allowed = { + "status", "pid", "run_kind", "trigger_id", "locked_by", "next_run_at", + "last_started_at", "last_finished_at", "last_exit_code", "last_duration_ms", + "last_error", "output_tail", + } + values = {k: v for k, v in fields.items() if k in allowed} + values["updated_at"] = _now() + conn = get_conn() + try: + conn.execute( + "INSERT INTO scheduler_runtime_status (job_name, updated_at) VALUES (?, ?) ON CONFLICT(job_name) DO NOTHING", + (job_name, values["updated_at"]), + ) + assignments = ", ".join([f"{k}=?" for k in values]) + conn.execute( + f"UPDATE scheduler_runtime_status SET {assignments} WHERE job_name=?", + (*values.values(), job_name), + ) + except Exception: + _create_runtime_table(conn) + conn.execute( + "INSERT INTO scheduler_runtime_status (job_name, updated_at) VALUES (?, ?)", + (job_name, values["updated_at"]), + ) + assignments = ", ".join([f"{k}=?" for k in values]) + conn.execute( + f"UPDATE scheduler_runtime_status SET {assignments} WHERE job_name=?", + (*values.values(), job_name), + ) + conn.commit() + conn.close() + + +def enqueue_manual_trigger(job_name, force=False, requested_by=""): + init_scheduler_tables() + if not get_job_config(job_name): + return None + conn = get_conn() + cur = conn.execute( + """ + INSERT INTO scheduler_manual_trigger (job_name, force, status, requested_by, requested_at) + VALUES (?, ?, 'queued', ?, ?) + """, + (job_name, 1 if force else 0, requested_by or "", _now()), + ) + conn.commit() + trigger_id = cur.lastrowid + conn.close() + return trigger_id + + +def claim_manual_triggers(limit=10): + init_scheduler_tables() + conn = get_conn() + rows = conn.execute( + """ + SELECT * FROM scheduler_manual_trigger + WHERE status IN ('queued', 'pending') + ORDER BY requested_at ASC, id ASC + LIMIT ? + """, + (int(limit or 10),), + ).fetchall() + conn.close() + return [dict(row) for row in rows] + + +def update_manual_trigger(trigger_id, **fields): + init_scheduler_tables() + allowed = {"status", "started_at", "finished_at", "exit_code", "duration_ms", "output_tail", "error_message"} + values = {k: v for k, v in fields.items() if k in allowed} + if not values: + return + conn = get_conn() + assignments = ", ".join([f"{k}=?" for k in values]) + conn.execute( + f"UPDATE scheduler_manual_trigger SET {assignments} WHERE id=?", + (*values.values(), int(trigger_id)), + ) + conn.commit() + conn.close() + + +def list_manual_triggers(limit=30): + init_scheduler_tables() + limit = max(1, min(int(limit or 30), 100)) + conn = get_conn() + rows = conn.execute( + "SELECT * FROM scheduler_manual_trigger ORDER BY requested_at DESC, id DESC LIMIT ?", + (limit,), + ).fetchall() + conn.close() + return [dict(row) for row in rows] + + +def get_scheduler_overview(): + init_scheduler_tables() + conn = get_conn() + configs = conn.execute("SELECT * FROM scheduler_job_config ORDER BY sort_order ASC, job_name ASC").fetchall() + runtime_rows = conn.execute("SELECT * FROM scheduler_runtime_status").fetchall() + latest_rows = conn.execute( + """ + SELECT c.* + FROM cron_run_log c + JOIN ( + SELECT job_name, MAX(id) AS max_id + FROM cron_run_log + GROUP BY job_name + ) x ON x.max_id = c.id + """ + ).fetchall() + conn.close() + runtime = {row["job_name"]: dict(row) for row in runtime_rows} + latest = {row["job_name"]: dict(row) for row in latest_rows} + jobs = [] + for row in configs: + item = dict(row) + item["args"] = _load(item.pop("args_json", "[]"), []) + item["enabled"] = bool(item.get("enabled")) + item["runtime"] = runtime.get(item["job_name"], {}) + item["latest_cron"] = latest.get(_display_job_name(item["job_name"]), latest.get(item["job_name"], {})) + jobs.append(item) + return {"jobs": jobs, "updated_at": _now()} + + +def _display_job_name(job_name): + return { + "event": "事件舆情", + "tracker": "跟踪", + "confirm": "确认", + "screener": "粗筛", + "sentiment": "舆情", + "llm-sentiment": "AI舆情", + "review": "复盘", + }.get(job_name, job_name) + + +__all__ = [ + "DEFAULT_JOBS", + "claim_manual_triggers", + "enqueue_manual_trigger", + "get_job_config", + "get_job_configs", + "get_scheduler_overview", + "init_scheduler_tables", + "list_manual_triggers", + "set_job_enabled", + "set_job_interval", + "update_manual_trigger", + "update_runtime", +] diff --git a/app/services/event_driven_screener.py b/app/services/event_driven_screener.py index 5041695..20a478d 100644 --- a/app/services/event_driven_screener.py +++ b/app/services/event_driven_screener.py @@ -26,6 +26,7 @@ sys.path.insert(0, os.path.dirname(__file__)) from app.config.config_loader import load_rules, get_meta, get_strategy_direction from app.db.altcoin_db import init_db, get_conn, create_recommendation, log_screening, log_cron_run, get_recommendation_for_push +from app.db.llm_insights import repair_mojibake_json, repair_mojibake_text from app.services.altcoin_screener import ( fetch_all_tickers, detect_volume_price_fly, @@ -378,6 +379,129 @@ def store_events(events): return stored +def _normalize_llm_symbol(value): + text = str(value or "").strip().upper() + if not text: + return "" + text = text.replace("-", "/").replace("_", "/") + if "/" in text: + base = text.split("/")[0] + else: + base = re.sub(r"USDT$", "", text) + base = re.sub(r"[^A-Z0-9]", "", base) + if not base: + return "" + symbol = f"{base}/USDT" + return symbol if _tradable_symbol(symbol) else "" + + +def _llm_confidence_score(value): + try: + score = float(value or 0) + except Exception: + return 0.0 + return score * 100 if 0 < score <= 1 else score + + +def enqueue_llm_sentiment_candidates(analysis, source_insight_id="", min_confidence=70, max_candidates=10, cooldown_hours=6): + """Turn LLM sentiment analysis into event candidates for the existing technical gate. + + LLM output is allowed to request a technical check, but it never creates a + recommendation or changes scores directly. + """ + analysis = repair_mojibake_json(analysis) + if not isinstance(analysis, dict): + return {"queued": 0, "skipped": 0, "symbols": []} + + candidates = [] + seen = set() + for item in analysis.get("coin_impacts") or []: + if not isinstance(item, dict): + continue + if item.get("need_technical_check") is not True: + continue + direction = str(item.get("direction") or "").lower() + if direction not in ("positive", "neutral", "bullish", "利好", "正面", "中性"): + continue + confidence = _llm_confidence_score(item.get("confidence")) + if confidence < float(min_confidence or 0): + continue + symbol = _normalize_llm_symbol(item.get("symbol")) + if not symbol or symbol in seen: + continue + seen.add(symbol) + reason = str(repair_mojibake_text(item.get("reason") or "AI 舆情分析认为需要技术检查")).strip() + candidates.append({ + "source": "llm_sentiment", + "symbol": symbol, + "title": f"AI舆情候选 {symbol}: {reason[:160]}", + "url": "", + "published_at": _now(), + "importance": "A", + "event_type": "llm_sentiment_candidate", + "raw": { + "source_insight_id": source_insight_id, + "direction": direction, + "confidence": confidence, + "reason": reason, + }, + }) + if len(candidates) >= int(max_candidates or 10): + break + + if not candidates: + return {"queued": 0, "skipped": 0, "symbols": []} + + init_event_tables() + conn = get_conn() + queued = [] + skipped = 0 + cooldown_cutoff = (_now() - timedelta(hours=float(cooldown_hours or 6))).isoformat() + now = _now().isoformat() + for event in candidates: + recent = conn.execute( + """ + SELECT id FROM event_news + WHERE source='llm_sentiment' AND symbol=? AND detected_at >= ? + LIMIT 1 + """, + (event["symbol"], cooldown_cutoff), + ).fetchone() + if recent: + skipped += 1 + continue + h = _event_hash(event["source"], event["title"], event["symbol"]) + try: + conn.execute( + """ + INSERT INTO event_news + (event_hash, source, symbol, title, url, published_at, detected_at, importance, event_type, raw_json, processed) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0) + """, + ( + h, + event["source"], + event["symbol"], + event["title"], + event.get("url", ""), + event["published_at"].isoformat(), + now, + event["importance"], + event["event_type"], + json.dumps(event.get("raw", {}), ensure_ascii=False), + ), + ) + queued.append(event["symbol"]) + except sqlite3.IntegrityError: + skipped += 1 + except Exception as exc: + print(f"[event] llm candidate enqueue error {event.get('symbol')}: {exc}") + skipped += 1 + conn.commit() + conn.close() + return {"queued": len(queued), "skipped": skipped, "symbols": queued} + + def fetch_klines(symbol, timeframe, limit=120): try: ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit) diff --git a/app/services/llm_insights.py b/app/services/llm_insights.py new file mode 100644 index 0000000..578a878 --- /dev/null +++ b/app/services/llm_insights.py @@ -0,0 +1,635 @@ +"""Async cached LLM explanation layer.""" + +import json +import os +from datetime import datetime + +import requests + +from app.core.opportunity_lifecycle import normalize_action_status +from app.db.altcoin_db import get_conn, _derive_execution_fields +from app.db.llm_insights import compute_input_hash, get_any_insight, get_insights_for_targets, get_latest_insight_by_type, upsert_insight + +PROMPTS = { + "recommendation_explain_v1": "recommendation_explain_v1", + "sentiment_explain_v1": "sentiment_explain_v1", + "sentiment_batch_analyze_v1": "sentiment_batch_analyze_v1", + "review_memo_v1": "review_memo_v1", +} + + +def _env_bool(name, default=False): + value = os.getenv(name) + if value is None: + return default + return str(value).strip().lower() in ("1", "true", "yes", "on") + + +def get_llm_params(): + """Runtime LLM config. This is system config, not strategy config.""" + return { + "enabled": _env_bool("ALPHAX_LLM_ENABLED", False), + "base_url": os.getenv("ALPHAX_LLM_BASE_URL", "https://api.openai.com/v1").strip(), + "api_key_env": os.getenv("ALPHAX_LLM_API_KEY_ENV", "ALPHAX_LLM_API_KEY").strip(), + "model": os.getenv("ALPHAX_LLM_MODEL", "gpt-4o-mini").strip(), + "timeout": int(os.getenv("ALPHAX_LLM_TIMEOUT", "20") or "20"), + "max_tokens": int(os.getenv("ALPHAX_LLM_MAX_TOKENS", "900") or "900"), + } + + +def get_llm_module_enabled(module_name): + if not _env_bool("ALPHAX_LLM_ENABLED", False): + return False + env_name = f"ALPHAX_LLM_{str(module_name or '').upper()}_ENABLED" + return _env_bool(env_name, True) + + +def _dump_json(value): + return json.dumps(value or {}, ensure_ascii=False, sort_keys=True, default=str) + + +def _get_target_key(value): + if value is None: + return "" + return str(value) + + +def _json_fallback(value, fallback=None): + try: + return json.loads(value) if isinstance(value, str) else (value if value is not None else fallback) + except Exception: + return fallback + + +def _parse_insight_payload(content): + if not isinstance(content, dict): + return {} + if isinstance(content.get("content"), dict): + return content["content"] + return content + + +def _call_llm_json(prompt_version, payload): + params = get_llm_params() + api_key = os.getenv(str(params.get("api_key_env") or "OPENAI_API_KEY"), "").strip() + if not params.get("enabled", False) or not api_key: + return {"status": "skipped", "error": "llm_disabled_or_missing_key"} + base_url = str(params.get("base_url") or "").rstrip("/") + model = str(params.get("model") or "").strip() + timeout = int(params.get("timeout") or 20) + max_tokens = int(params.get("max_tokens") or 900) + system_prompt = ( + "You are a crypto research assistant. Return strict JSON only. " + "Do not change trading decisions, scores, or strategy state." + ) + user_prompt = _dump_json({ + "prompt_version": prompt_version, + "input": payload, + "output_schema_hint": "JSON object with concise Chinese fields only", + }) + try: + resp = requests.post( + f"{base_url}/chat/completions", + headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, + json={ + "model": model, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + "temperature": 0.2, + "max_tokens": max_tokens, + "response_format": {"type": "json_object"}, + }, + timeout=timeout, + ) + if resp.status_code >= 400: + return {"status": "failed", "error": f"http_{resp.status_code}", "raw": resp.text[:1000], "model": model} + data = resp.json() + content = (((data.get("choices") or [{}])[0]).get("message") or {}).get("content") or "{}" + parsed = json.loads(content) + if not isinstance(parsed, dict): + raise ValueError("llm_output_not_object") + return {"status": "success", "content": parsed, "model": model} + except json.JSONDecodeError as exc: + return {"status": "failed", "error": f"invalid_json:{exc}", "model": model} + except Exception as exc: + return {"status": "failed", "error": str(exc)[:1000], "model": model} + + +def _should_generate_recommendation(row): + action_status = normalize_action_status(row.get("action_status") or row.get("entry_plan", {}).get("entry_action") or "持有", row.get("status") or "active") + execution_status = str(row.get("execution_status") or "") + observe_tier = str(row.get("observe_tier") or "") + state_reason = str(row.get("state_reason") or row.get("execution_reason") or "") + entry_window = row.get("entry_window") or {} + if execution_status in ("buy_now", "wait_pullback", "invalid") or action_status in ("可即刻买入", "等回踩", "衰减") or row.get("display_bucket") == "realtime": + return True + if observe_tier == "strong" and ("回踩" in state_reason or "入场" in state_reason or "失效" in state_reason): + return True + if isinstance(entry_window, dict) and entry_window.get("status") == "active": + return True + if "重点观察" in state_reason: + return True + return False + + +def _should_generate_sentiment(row): + importance = str(row.get("importance") or "").upper() + source = str(row.get("source") or "").lower() + title = str(row.get("title") or "") + if importance in ("A", "S", "RISK"): + return True + if "binance" in source: + return True + if any(k in title.lower() for k in ("listing", "launch", "mainnet", "upgrade", "partnership", "hack", "exploit", "burn", "合约", "上币", "主网", "升级", "合作", "黑客", "漏洞")): + return True + return False + + +def _is_internal_sentiment_event(row): + event_type = str(row.get("event_type") or "") + title = str(row.get("title") or "") + source = str(row.get("source") or "") + return ( + event_type in ("market_heat", "theme_expansion", "theme_direct", "llm_sentiment_candidate") + or source == "llm_sentiment" + or title.startswith("[主题扩散:") + ) + + +def _should_generate_review(item): + metrics = item.get("metrics") or {} + release_decision = str(item.get("release_decision") or "") + failure_count = int(metrics.get("fail_count") or 0) + hit_count = int(metrics.get("hit_count") or 0) + pollution = item.get("pollution_summary") or {} + if release_decision in ("gray", "release", "hold"): + return True + if failure_count > 0 or hit_count > 0: + return True + if int(pollution.get("contaminated_symbol_count") or 0) > 0: + return True + return False + + +def _build_recommendation_payload(row): + entry_plan = row.get("entry_plan") or _json_fallback(row.get("entry_plan_json"), {}) or {} + signals = row.get("signals") or _json_fallback(row.get("signals"), []) or [] + if isinstance(signals, str): + signals = _json_fallback(signals, []) or [] + return { + "target_type": "recommendation", + "target_id": row.get("id"), + "symbol": row.get("symbol"), + "rec_time": row.get("rec_time"), + "status": row.get("status"), + "action_status": row.get("action_status"), + "execution_status": row.get("execution_status"), + "execution_label": row.get("execution_label"), + "execution_reason": row.get("execution_reason"), + "rec_score": row.get("rec_score"), + "entry_price": row.get("entry_price"), + "current_price": row.get("current_price"), + "stop_loss": row.get("stop_loss"), + "tp1": row.get("tp1"), + "tp2": row.get("tp2"), + "observe_tier": row.get("observe_tier"), + "observe_reason": row.get("observe_reason"), + "state_reason": row.get("state_reason"), + "entry_window": row.get("entry_window"), + "market_context": row.get("market_context"), + "derivatives_context": row.get("derivatives_context"), + "sector_context": row.get("sector_context"), + "entry_plan": entry_plan, + "signals": signals, + } + + +def _build_sentiment_payload(row): + return { + "target_type": "sentiment", + "target_id": row.get("event_id") or row.get("id"), + "source": row.get("source"), + "source_label": row.get("source_label"), + "event_type": row.get("event_type"), + "importance": row.get("importance"), + "title": row.get("title"), + "related_symbol": row.get("related_symbol"), + "related_base": row.get("related_base"), + "decision": row.get("decision"), + "tech_score": row.get("tech_score"), + "published_at": row.get("published_at"), + "detected_at": row.get("detected_at"), + "relation_tag": row.get("relation_tag"), + "in_active": row.get("in_active"), + "in_screened": row.get("in_screened"), + } + + +def _build_sentiment_batch_payload(hours=24, limit=40): + conn = get_conn() + conn.row_factory = None + events = [] + try: + rows = conn.execute( + """ + SELECT id, source, symbol, title, url, published_at, detected_at, importance, + event_type, decision, tech_score, rec_id, pushed + FROM event_news + WHERE detected_at >= datetime('now', '-' || ? || ' hours') + ORDER BY datetime(published_at) DESC, id DESC + LIMIT ? + """, + (int(hours or 24), int(limit or 40)), + ).fetchall() + except Exception: + rows = [] + for raw in rows: + row = { + "event_id": f"event_news:{raw[0]}", + "source": raw[1], + "related_symbol": raw[2], + "related_base": (str(raw[2] or "").split("/")[0] or "").upper(), + "title": raw[3], + "url": raw[4], + "published_at": raw[5], + "detected_at": raw[6], + "importance": raw[7], + "event_type": raw[8], + "decision": raw[9], + "tech_score": raw[10], + "rec_id": raw[11], + "pushed": bool(raw[12]), + } + if _is_internal_sentiment_event(row): + continue + events.append(row) + + try: + trend_rows = conn.execute( + """ + SELECT id, symbol, name, trend_rank, trend_score, market_cap_rank, detected_at, extra_json + FROM sentiment_events + WHERE detected_at = (SELECT MAX(detected_at) FROM sentiment_events WHERE source='coingecko') + ORDER BY trend_rank + LIMIT 20 + """ + ).fetchall() + except Exception: + trend_rows = [] + conn.close() + + trend_news = [] + for raw in trend_rows: + extra = _json_fallback(raw[7], {}) or {} + for n in (extra.get("news") or [])[:3]: + title = n.get("title") or "" + if not title: + continue + trend_news.append({ + "event_id": f"sentiment_event:{raw[0]}:{n.get('url') or title}", + "source": n.get("source") or "news", + "related_symbol": f"{str(raw[1] or '').upper()}/USDT", + "related_base": str(raw[1] or "").upper(), + "related_name": raw[2] or raw[1], + "title": title[:180], + "url": n.get("url") or "", + "published_at": n.get("published") or "", + "detected_at": raw[6], + "importance": "B", + "event_type": "news", + "trend_rank": raw[3], + "trend_score": raw[4], + "market_cap_rank": raw[5], + "price_usd": extra.get("price_usd", 0), + "change_24h_pct": extra.get("change_24h_pct", 0), + }) + + combined = events + trend_news + seen = set() + deduped = [] + for item in combined: + key = ((item.get("title") or "").strip().lower(), item.get("related_base"), item.get("source")) + if key in seen: + continue + seen.add(key) + deduped.append(item) + deduped = deduped[: int(limit or 40)] + return { + "target_type": "sentiment_batch", + "target_id": f"sentiment_batch:{int(hours or 24)}h", + "hours": int(hours or 24), + "generated_at": datetime.now().isoformat(), + "event_count": len(deduped), + "events": deduped, + "instructions": { + "role": "作为加密市场舆情分析师,判断这些新闻对山寨币行情的影响。", + "focus": [ + "归纳主线叙事和受影响币种", + "区分利好、利空、风险和噪音", + "给出可信度和短线影响窗口", + "指出哪些币种需要触发技术检查", + "不要给买卖指令,只做舆情影响分析", + ], + "expected_schema": { + "market_mood": "risk_on|neutral|risk_off", + "summary": "中文摘要", + "hot_themes": [{"theme": "", "impact": "", "symbols": [], "confidence": 0}], + "coin_impacts": [{"symbol": "", "direction": "positive|negative|risk|neutral", "reason": "", "confidence": 0, "need_technical_check": False}], + "risk_events": [{"title": "", "symbols": [], "risk_type": "", "severity": "low|medium|high"}], + "watchlist": [{"symbol": "", "why": "", "trigger": ""}], + }, + }, + } + + +def _build_review_payload(item): + return { + "target_type": "review", + "target_id": item.get("id") or item.get("created_at") or item.get("run_date"), + "run_date": item.get("run_date"), + "created_at": item.get("created_at"), + "title": item.get("title"), + "summary": item.get("summary"), + "metrics": item.get("metrics") or {}, + "findings": item.get("findings") or [], + "problems": item.get("problems") or [], + "actions": item.get("actions") or [], + "candidate_rules": item.get("candidate_rules") or [], + "success_analysis": item.get("success_analysis") or {}, + "failure_analysis": item.get("failure_analysis") or {}, + "pollution_summary": item.get("pollution_summary") or {}, + "version_change_summary": item.get("version_change_summary") or "", + } + + +def generate_recommendation_insights(limit=30): + if not get_llm_module_enabled("recommendations"): + return {"status": "skipped", "reason": "module_disabled", "processed": 0} + conn = get_conn() + rows = conn.execute( + """ + SELECT r.*, + lpc.price AS latest_cache_price, + lpc.updated_at AS latest_cache_updated_at + FROM recommendation r + LEFT JOIN latest_price_cache lpc ON lpc.symbol = r.symbol + WHERE r.status='active' AND COALESCE(r.display_bucket,'watch_pool') != 'history' + ORDER BY r.rec_time DESC + """ + ).fetchall() + conn.close() + items = [] + seen = set() + for row in rows: + item = _derive_execution_fields(dict(row)) + if not _should_generate_recommendation(item): + continue + if str(item.get("id")) in seen: + continue + seen.add(str(item.get("id"))) + items.append(item) + if limit and len(items) >= int(limit): + break + + processed = 0 + for row in items: + payload = _build_recommendation_payload(row) + input_hash = compute_input_hash(payload) + cached = get_any_insight("recommendation", payload["target_id"], PROMPTS["recommendation_explain_v1"], input_hash) + if cached: + continue + result = _call_llm_json(PROMPTS["recommendation_explain_v1"], payload) + upsert_insight( + "recommendation", + payload["target_id"], + PROMPTS["recommendation_explain_v1"], + PROMPTS["recommendation_explain_v1"], + input_hash, + result.get("status") or "failed", + input_payload=payload, + content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")}, + error=result.get("error", ""), + model=result.get("model", ""), + ) + processed += 1 + return {"status": "success", "processed": processed, "scanned": len(items)} + + +def generate_sentiment_insights(limit=30): + if not get_llm_module_enabled("sentiment"): + return {"status": "skipped", "reason": "module_disabled", "processed": 0} + conn = get_conn() + conn.row_factory = None + try: + rows = conn.execute( + """ + SELECT id AS event_id, source, symbol, title, url, published_at, detected_at, importance, + event_type, decision, tech_score, rec_id, pushed + FROM event_news + ORDER BY datetime(published_at) DESC, id DESC + LIMIT 120 + """ + ).fetchall() + except Exception: + rows = [] + finally: + conn.close() + processed = 0 + for raw in rows: + row = { + "event_id": f"event_news:{raw[0]}", + "source": raw[1], + "symbol": raw[2], + "title": raw[3], + "published_at": raw[5], + "detected_at": raw[6], + "importance": raw[7], + "event_type": raw[8], + "decision": raw[9], + "tech_score": raw[10], + "rec_id": raw[11], + "pushed": raw[12], + "source_label": "Binance公告" if "binance" in str(raw[1]).lower() else str(raw[1] or ""), + "related_symbol": raw[2], + "related_base": (str(raw[2] or "").split("/")[0] or "").upper(), + "in_active": False, + "in_screened": False, + "relation_tag": "", + } + if not _should_generate_sentiment(row): + continue + payload = _build_sentiment_payload(row) + input_hash = compute_input_hash(payload) + cached = get_any_insight("sentiment", payload["target_id"], PROMPTS["sentiment_explain_v1"], input_hash) + if cached: + continue + result = _call_llm_json(PROMPTS["sentiment_explain_v1"], payload) + upsert_insight( + "sentiment", + payload["target_id"], + PROMPTS["sentiment_explain_v1"], + PROMPTS["sentiment_explain_v1"], + input_hash, + result.get("status") or "failed", + input_payload=payload, + content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")}, + error=result.get("error", ""), + model=result.get("model", ""), + ) + processed += 1 + if limit and processed >= int(limit): + break + return {"status": "success", "processed": processed} + + +def generate_sentiment_batch_analysis(limit=40, hours=24): + if not get_llm_module_enabled("sentiment"): + return {"status": "skipped", "reason": "module_disabled", "processed": 0} + payload = _build_sentiment_batch_payload(hours=hours, limit=limit) + if not payload.get("events"): + return {"status": "skipped", "reason": "no_sentiment_events", "processed": 0} + input_hash = compute_input_hash(payload) + cached = get_any_insight("sentiment_batch", payload["target_id"], PROMPTS["sentiment_batch_analyze_v1"], input_hash) + if cached: + candidate_result = {"queued": 0, "skipped": 0, "symbols": []} + if cached.get("status") == "success": + try: + from app.services.event_driven_screener import enqueue_llm_sentiment_candidates + + candidate_result = enqueue_llm_sentiment_candidates( + cached.get("content") or {}, + source_insight_id=str(cached.get("id") or input_hash), + ) + except Exception as exc: + candidate_result = {"queued": 0, "skipped": 0, "symbols": [], "error": str(exc)[:300]} + return { + "status": "success", + "processed": 0, + "cached": True, + "event_count": payload.get("event_count", 0), + "candidate_events": candidate_result, + } + result = _call_llm_json(PROMPTS["sentiment_batch_analyze_v1"], payload) + candidate_result = {"queued": 0, "skipped": 0, "symbols": []} + if result.get("status") == "success": + try: + from app.services.event_driven_screener import enqueue_llm_sentiment_candidates + + candidate_result = enqueue_llm_sentiment_candidates( + result.get("content") or {}, + source_insight_id=input_hash, + ) + except Exception as exc: + candidate_result = {"queued": 0, "skipped": 0, "symbols": [], "error": str(exc)[:300]} + upsert_insight( + "sentiment_batch", + payload["target_id"], + PROMPTS["sentiment_batch_analyze_v1"], + PROMPTS["sentiment_batch_analyze_v1"], + input_hash, + result.get("status") or "failed", + input_payload=payload, + content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")}, + error=result.get("error", ""), + model=result.get("model", ""), + ) + return { + "status": "success", + "processed": 1, + "event_count": payload.get("event_count", 0), + "candidate_events": candidate_result, + } + + +def generate_review_memos(limit=10): + if not get_llm_module_enabled("review"): + return {"status": "skipped", "reason": "module_disabled", "processed": 0} + from app.db.review_queries import get_strategy_iteration_logs + + logs = get_strategy_iteration_logs(limit=max(limit or 10, 1)) + processed = 0 + for item in logs: + if not _should_generate_review(item): + continue + payload = _build_review_payload(item) + input_hash = compute_input_hash(payload) + cached = get_any_insight("review", payload["target_id"], PROMPTS["review_memo_v1"], input_hash) + if cached: + continue + result = _call_llm_json(PROMPTS["review_memo_v1"], payload) + upsert_insight( + "review", + payload["target_id"], + PROMPTS["review_memo_v1"], + PROMPTS["review_memo_v1"], + input_hash, + result.get("status") or "failed", + input_payload=payload, + content=result.get("content") if result.get("status") == "success" else {"raw": result.get("raw", "")}, + error=result.get("error", ""), + model=result.get("model", ""), + ) + processed += 1 + return {"status": "success", "processed": processed} + + +def run(scope="recommendations", limit=30): + scope = str(scope or "").strip() + if scope == "recommendations": + return generate_recommendation_insights(limit=limit) + if scope == "sentiment": + return generate_sentiment_batch_analysis(limit=limit) + if scope == "sentiment-events": + return generate_sentiment_insights(limit=limit) + if scope == "review": + return generate_review_memos(limit=limit) + raise ValueError(f"unknown llm scope: {scope}") + + +def attach_recommendation_insights(items): + ids = [str(item.get("id")) for item in items or [] if item.get("id") is not None] + insights = get_insights_for_targets("recommendation", ids, PROMPTS["recommendation_explain_v1"]) + for item in items or []: + insight = insights.get(str(item.get("id"))) + if insight: + item["llm_insight"] = insight + return items + + +def attach_sentiment_insights(items): + ids = [str(item.get("event_id") or item.get("id")) for item in items or [] if (item.get("event_id") or item.get("id")) is not None] + insights = get_insights_for_targets("sentiment", ids, PROMPTS["sentiment_explain_v1"]) + for item in items or []: + insight = insights.get(str(item.get("event_id") or item.get("id"))) + if insight: + item["llm_insight"] = insight + return items + + +def get_latest_review_memo(): + return get_latest_insight_by_type("review", PROMPTS["review_memo_v1"]) + + +def get_latest_sentiment_batch_analysis(): + return get_latest_insight_by_type("sentiment_batch", PROMPTS["sentiment_batch_analyze_v1"]) + + +def get_latest_sentiment_batch_attempt(): + return get_latest_insight_by_type("sentiment_batch", PROMPTS["sentiment_batch_analyze_v1"], success_only=False) + + +__all__ = [ + "PROMPTS", + "attach_recommendation_insights", + "attach_sentiment_insights", + "generate_recommendation_insights", + "generate_review_memos", + "generate_sentiment_batch_analysis", + "generate_sentiment_insights", + "get_latest_sentiment_batch_analysis", + "get_latest_sentiment_batch_attempt", + "get_latest_review_memo", + "run", +] diff --git a/app/web/routes_admin.py b/app/web/routes_admin.py index be8a437..39a7608 100644 --- a/app/web/routes_admin.py +++ b/app/web/routes_admin.py @@ -2,7 +2,21 @@ from fastapi import APIRouter, Cookie, HTTPException, Request from fastapi.responses import HTMLResponse from app.db import auth_db -from app.web.shared import login_redirect, require_admin +from app.db.scheduler_db import ( + enqueue_manual_trigger, + get_job_config, + get_scheduler_overview, + list_manual_triggers, + set_job_enabled, + set_job_interval, +) +from app.web.shared import ( + SchedulerIntervalRequest, + SchedulerToggleRequest, + SchedulerTriggerRequest, + login_redirect, + require_admin, +) def build_router(templates): router = APIRouter() @@ -40,4 +54,41 @@ def build_router(templates): require_admin(altcoin_session) return auth_db.get_admin_orders(search=search, offset=offset, limit=limit, status=status) + @router.get("/api/scheduler/jobs") + async def api_scheduler_jobs(altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return get_scheduler_overview() + + @router.post("/api/scheduler/jobs/{job_name}/toggle") + async def api_scheduler_toggle(job_name: str, payload: SchedulerToggleRequest, altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + if not set_job_enabled(job_name, payload.enabled): + raise HTTPException(status_code=404, detail="任务不存在") + return {"ok": True, "job_name": job_name, "enabled": payload.enabled} + + @router.post("/api/scheduler/jobs/{job_name}/interval") + async def api_scheduler_interval(job_name: str, payload: SchedulerIntervalRequest, altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + if payload.every_seconds < 30: + raise HTTPException(status_code=400, detail="周期不能低于 30 秒") + if not set_job_interval(job_name, payload.every_seconds): + raise HTTPException(status_code=404, detail="任务不存在") + return {"ok": True, "job_name": job_name, "every_seconds": payload.every_seconds} + + @router.post("/api/scheduler/jobs/{job_name}/trigger") + async def api_scheduler_trigger(job_name: str, payload: SchedulerTriggerRequest, altcoin_session: str = Cookie(default="")): + user = require_admin(altcoin_session) + job = get_job_config(job_name) + if not job: + raise HTTPException(status_code=404, detail="任务不存在") + if not job.get("enabled") and not payload.force: + raise HTTPException(status_code=409, detail="任务已关闭,需要确认后 force=true 才能单次运行") + trigger_id = enqueue_manual_trigger(job_name, force=payload.force, requested_by=user.get("email", "")) + return {"ok": True, "job_name": job_name, "trigger_id": trigger_id, "force": payload.force} + + @router.get("/api/scheduler/triggers") + async def api_scheduler_triggers(limit: int = 30, altcoin_session: str = Cookie(default="")): + require_admin(altcoin_session) + return {"items": list_manual_triggers(limit=limit)} + return router diff --git a/app/web/routes_content.py b/app/web/routes_content.py index 5b794d3..97e2983 100644 --- a/app/web/routes_content.py +++ b/app/web/routes_content.py @@ -8,6 +8,7 @@ from fastapi import APIRouter, Cookie from fastapi.responses import JSONResponse from app.web.shared import require_api_user_with_subscription +from app.services.llm_insights import attach_sentiment_insights, get_latest_sentiment_batch_analysis, get_latest_sentiment_batch_attempt def build_router(repo_root: Path): @@ -83,7 +84,7 @@ def build_router(repo_root: Path): try: event_rows = conn.execute( """ - SELECT source, symbol, title, url, published_at, detected_at, importance, + SELECT id, source, symbol, title, url, published_at, detected_at, importance, event_type, decision, tech_score, rec_id, pushed FROM event_news WHERE detected_at >= datetime('now', '-' || ? || ' hours') @@ -96,14 +97,16 @@ def build_router(repo_root: Path): base = (r["symbol"] or "").split("/")[0].upper() source = r["source"] or "event" event_type = r["event_type"] or "event" - if event_type == "market_heat": + title = r["title"] or "" + if event_type in ("market_heat", "theme_expansion", "theme_direct", "llm_sentiment_candidate") or source == "llm_sentiment" or title.startswith("[主题扩散:"): continue events.append({ + "event_id": f"event_news:{r['id']}", "source": source, "source_label": "Binance公告" if "binance" in source else "CoinGecko热度" if "coingecko" in source else source, "event_type": event_type, "importance": r["importance"] or "B", - "title": r["title"] or "", + "title": title, "url": r["url"] or "", "published_at": r["published_at"], "detected_at": r["detected_at"], @@ -155,6 +158,7 @@ def build_router(repo_root: Path): if not _is_valuable_news_title(title): continue events.append({ + "event_id": f"sentiment_event:{r['id']}:{n.get('url') or title}", "source": n.get("source") or "news", "source_label": n.get("source") or "新闻", "event_type": "news", @@ -196,6 +200,7 @@ def build_router(repo_root: Path): deduped.append(e) deduped.sort(key=lambda item: (item.get("published_at") or item.get("detected_at") or "", {"RISK": 5, "S": 4, "A": 3, "B": 2, "C": 1}.get(item.get("importance"), 0)), reverse=True) + attach_sentiment_insights(deduped) check_time = deduped[0]["detected_at"] if deduped else None return { "check_time": check_time, @@ -207,6 +212,34 @@ def build_router(repo_root: Path): "total_trending": 0, } + @router.get("/api/sentiment/analysis") + async def api_sentiment_analysis(altcoin_session: str = Cookie(default="")): + require_api_user_with_subscription(altcoin_session) + insight = get_latest_sentiment_batch_analysis() + if not insight: + attempt = get_latest_sentiment_batch_attempt() + return { + "analysis": None, + "status": "empty" if not attempt else attempt.get("status"), + "updated_at": attempt.get("updated_at") if attempt else None, + "model": attempt.get("model") if attempt else "", + "error": attempt.get("error") if attempt else "", + "event_count": (attempt.get("input") or {}).get("event_count", 0) if attempt else 0, + "source_events": (attempt.get("input") or {}).get("events", []) if attempt else [], + } + content = insight.get("content") or {} + payload = insight.get("input") or {} + return { + "status": insight.get("status"), + "updated_at": insight.get("updated_at"), + "model": insight.get("model"), + "prompt_version": insight.get("prompt_version"), + "analysis": content, + "source_events": payload.get("events") or [], + "event_count": payload.get("event_count") or len(payload.get("events") or []), + "hours": payload.get("hours") or 24, + } + @router.get("/api/kline") async def api_kline(symbol: str, interval: str = "1d", limit: int = 60, altcoin_session: str = Cookie(default="")): require_api_user_with_subscription(altcoin_session) diff --git a/app/web/routes_pages.py b/app/web/routes_pages.py index 4432d25..ffe422f 100644 --- a/app/web/routes_pages.py +++ b/app/web/routes_pages.py @@ -1,10 +1,10 @@ from pathlib import Path -from fastapi import APIRouter, Cookie, Request +from fastapi import APIRouter, Cookie, HTTPException, Request from fastapi.responses import HTMLResponse from app.db import auth_db -from app.web.shared import require_page_user +from app.web.shared import require_admin, require_page_user def build_router(templates, repo_root: Path, stock_report_template: str): @@ -48,6 +48,24 @@ def build_router(templates, repo_root: Path, stock_report_template: str): return redirect return render_page("pipeline.html", request) + @router.get("/llm-insights", response_class=HTMLResponse) + async def llm_insights_page(request: Request): + user, redirect = require_page_user(request) + if redirect: + return redirect + return render_page("llm_insights.html", request) + + @router.get("/cron", response_class=HTMLResponse) + async def cron_page(request: Request): + user, redirect = require_page_user(request) + if redirect: + return redirect + try: + require_admin(request.cookies.get("altcoin_session", "")) + except HTTPException as exc: + return HTMLResponse(content=f"

需要管理员权限

{exc.detail}

返回看板", status_code=exc.status_code) + return render_page("cron.html", request) + @router.get("/strategy", response_class=HTMLResponse) async def strategy_page(request: Request): user, redirect = require_page_user(request) diff --git a/app/web/routes_recommendations.py b/app/web/routes_recommendations.py index c6cfc86..4fcd901 100644 --- a/app/web/routes_recommendations.py +++ b/app/web/routes_recommendations.py @@ -12,6 +12,7 @@ from app.db.analytics import ( get_screening_history, get_stats, ) +from app.db.llm_insights import get_llm_insight_by_id, list_llm_insights from app.db.recommendation_queries import get_active_recommendations, get_active_recommendations_deduped from app.config.config_loader import get_signal_weights from app.web.shared import ( @@ -24,6 +25,41 @@ from app.web.shared import ( router = APIRouter() +def _friendly_llm_item(item): + content = item.get("content") or {} + payload = item.get("input") or {} + target_type = item.get("target_type") or "" + status = item.get("status") or "" + type_label = { + "recommendation": "推荐解释", + "sentiment": "舆情解读", + "review": "复盘 memo", + }.get(target_type, target_type or "未知任务") + status_label = { + "success": "成功", + "failed": "失败", + "skipped": "跳过", + }.get(status, status or "未知") + subject = payload.get("symbol") or payload.get("related_symbol") or payload.get("title") or payload.get("run_date") or item.get("target_id") + summary = content.get("summary") or content.get("memo") or content.get("why_now_or_not") or content.get("raw") or item.get("error") or "" + return { + "id": item.get("id"), + "type_label": type_label, + "status_label": status_label, + "status": status, + "subject": subject, + "summary": summary, + "model": item.get("model") or "", + "prompt_version": item.get("prompt_version") or "", + "target_type": target_type, + "target_id": item.get("target_id"), + "updated_at": item.get("updated_at"), + "error": item.get("error") or "", + "content": content, + "input": payload, + } + + @router.get("/api/stats") async def api_stats(altcoin_session: str = Cookie(default="")): require_api_user_with_subscription(altcoin_session) @@ -166,3 +202,33 @@ async def api_pipeline_run_detail(run_id: int, altcoin_session: str = Cookie(def if not detail: return {"error": "pipeline run not found", "run_id": run_id} return detail + + +@router.get("/api/llm/insights") +async def api_llm_insights( + limit: int = 30, + offset: int = 0, + target_type: str = "", + status: str = "", + insight_type: str = "", + altcoin_session: str = Cookie(default=""), +): + require_api_user_with_subscription(altcoin_session) + data = list_llm_insights( + limit=limit, + offset=offset, + target_type=target_type or "", + status=status or "", + insight_type=insight_type or "", + ) + data["items"] = [_friendly_llm_item(item) for item in data.get("items", [])] + return data + + +@router.get("/api/llm/insights/{insight_id}") +async def api_llm_insight_detail(insight_id: int, altcoin_session: str = Cookie(default="")): + require_api_user_with_subscription(altcoin_session) + item = get_llm_insight_by_id(insight_id) + if not item: + return {"error": "llm insight not found", "id": insight_id} + return _friendly_llm_item(item) diff --git a/app/web/routes_strategy.py b/app/web/routes_strategy.py index db5d04f..95f1b1a 100644 --- a/app/web/routes_strategy.py +++ b/app/web/routes_strategy.py @@ -12,6 +12,7 @@ from app.db.review_queries import ( get_strategy_rule_candidates, refresh_strategy_candidate_performance, ) +from app.services.llm_insights import get_latest_review_memo from app.db.schema import get_conn from app.db.altcoin_db import _derive_execution_fields from app.web.shared import require_api_user_with_subscription @@ -74,7 +75,9 @@ async def api_strategy_insights(altcoin_session: str = Cookie(default="")): @router.get("/api/strategy/lifecycle") async def api_strategy_lifecycle(days: int = 30, altcoin_session: str = Cookie(default="")): require_api_user_with_subscription(altcoin_session) - return get_strategy_iteration_dashboard(days=days) + data = get_strategy_iteration_dashboard(days=days) + data["llm_review_memo"] = get_latest_review_memo() + return data @router.get("/api/iterations") diff --git a/app/web/shared.py b/app/web/shared.py index ba326a8..8d5d707 100644 --- a/app/web/shared.py +++ b/app/web/shared.py @@ -67,6 +67,18 @@ class PushRulesRequest(BaseModel): quiet_end: str = "" +class SchedulerToggleRequest(BaseModel): + enabled: bool + + +class SchedulerIntervalRequest(BaseModel): + every_seconds: int + + +class SchedulerTriggerRequest(BaseModel): + force: bool = False + + def auth_error(exc: Exception, status_code: int = 400): raise HTTPException(status_code=status_code, detail=str(exc)) diff --git a/docker/scheduler.py b/docker/scheduler.py index 481ccbe..28f4230 100755 --- a/docker/scheduler.py +++ b/docker/scheduler.py @@ -1,25 +1,35 @@ #!/usr/bin/env python3 -"""AlphaX 容器内轻量调度器。 +"""AlphaX Docker scheduler with lightweight process concurrency.""" -设计目标: -- 替代宿主机 crontab; -- 单进程串行执行,避免 SQLite 并发写锁; -- 默认 DRY_RUN=1,不影响线上,也不会真的跑任务; -- 部署验证通过后再把 ALPHAX_SCHEDULER_DRY_RUN=0 打开。 -""" from __future__ import annotations import os import subprocess import sys +import tempfile import time -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from pathlib import Path ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from app.db.schema import init_db +from app.db.scheduler_db import ( + claim_manual_triggers, + get_job_configs, + init_scheduler_tables, + update_manual_trigger, + update_runtime, +) + PYTHON = sys.executable DRY_RUN = os.getenv("ALPHAX_SCHEDULER_DRY_RUN", "1").strip() not in {"0", "false", "False", "no", "NO"} +POLL_SECONDS = 1.0 +CONFIG_RELOAD_SECONDS = 5.0 +PENDING_WARN_SECONDS = 30.0 @dataclass @@ -29,77 +39,262 @@ class Job: every_seconds: int args: tuple[str, ...] = () initial_delay: int = 0 + lock_group: str = "" + enabled: bool = True + description: str = "" next_run: float = 0.0 + pending: bool = False + pending_since: float = 0.0 + last_pending_log: float = 0.0 + + +@dataclass +class RunningJob: + job: Job + proc: subprocess.Popen + started_at: float + started_iso: str + run_kind: str = "auto" + trigger_id: int = 0 + output_file: object | None = None def now_str() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") +def iso_now() -> str: + return datetime.now().isoformat() + + def env_for_child() -> dict[str, str]: env = os.environ.copy() env.setdefault("PYTHONUNBUFFERED", "1") return env -def run_job(job: Job) -> None: +def _tail(text: str, limit: int = 8000) -> str: + value = text or "" + return value[-limit:] if len(value) > limit else value + + +def _next_run_iso(ts: float) -> str: + return datetime.fromtimestamp(ts).isoformat() if ts else "" + + +def load_jobs(existing: dict[str, Job], base: float, running: dict[str, RunningJob] | None = None) -> dict[str, Job]: + running = running or {} + loaded = {} + for cfg in get_job_configs(): + name = cfg["job_name"] + old = existing.get(name) + every = int(cfg.get("every_seconds") or 60) + job = Job( + name=name, + command=cfg.get("command") or name, + every_seconds=every, + args=tuple(cfg.get("args") or ()), + initial_delay=int(cfg.get("initial_delay") or 0), + lock_group=cfg.get("lock_group") or name, + enabled=bool(cfg.get("enabled")), + description=cfg.get("description") or "", + next_run=old.next_run if old else base + int(cfg.get("initial_delay") or 0), + pending=old.pending if old else False, + pending_since=old.pending_since if old else 0.0, + last_pending_log=old.last_pending_log if old else 0.0, + ) + loaded[name] = job + if name not in running: + update_runtime(name, next_run_at=_next_run_iso(job.next_run), status=("disabled" if not job.enabled else ("pending" if job.pending else "idle"))) + return loaded + + +def _lock_busy(job: Job, running: dict[str, RunningJob]) -> bool: + if job.name in running: + return True + return any(item.job.lock_group == job.lock_group for item in running.values()) + + +def _mark_pending(job: Job, reason: str) -> None: + now = time.time() + if not job.pending: + job.pending = True + job.pending_since = now + if now - job.last_pending_log >= PENDING_WARN_SECONDS: + print(f"[{now_str()}] [scheduler] pending {job.name}: {reason}", flush=True) + job.last_pending_log = now + update_runtime(job.name, status="pending", locked_by=reason, next_run_at=_next_run_iso(job.next_run)) + + +def start_job(job: Job, running: dict[str, RunningJob], run_kind: str = "auto", trigger_id: int = 0) -> bool: + if _lock_busy(job, running): + blocker = "same_job" if job.name in running else f"lock:{job.lock_group}" + _mark_pending(job, blocker) + return False + cmd = [PYTHON, "-m", "app.cli", job.command, *job.args] - print(f"[{now_str()}] [scheduler] start {job.name}: {' '.join(cmd)}", flush=True) + print(f"[{now_str()}] [scheduler] start {job.name} kind={run_kind}: {' '.join(cmd)}", flush=True) + job.pending = False + job.pending_since = 0.0 + job.last_pending_log = 0.0 + if DRY_RUN: print(f"[{now_str()}] [scheduler] DRY_RUN=1 skip {job.name}", flush=True) - return - started = time.time() - try: - proc = subprocess.run( - cmd, - cwd=ROOT, - env=env_for_child(), - text=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - timeout=max(job.every_seconds * 2, 600), + update_runtime( + job.name, + status="idle" if job.enabled else "disabled", + pid=0, + run_kind=run_kind, + trigger_id=trigger_id, + locked_by="", + last_started_at=iso_now(), + last_finished_at=iso_now(), + last_exit_code=0, + last_duration_ms=0, + last_error="dry_run", + output_tail="DRY_RUN=1", ) - duration = time.time() - started - out = (proc.stdout or "").strip() - if len(out) > 8000: - out = out[-8000:] - print(f"[{now_str()}] [scheduler] done {job.name} exit={proc.returncode} duration={duration:.1f}s", flush=True) - if out: - print(out, flush=True) - except subprocess.TimeoutExpired as e: - print(f"[{now_str()}] [scheduler] timeout {job.name}: {e}", flush=True) - except Exception as e: - print(f"[{now_str()}] [scheduler] error {job.name}: {e}", flush=True) + if trigger_id: + update_manual_trigger(trigger_id, status="skipped", finished_at=iso_now(), output_tail="DRY_RUN=1") + return True + + started_iso = iso_now() + output_file = tempfile.TemporaryFile(mode="w+t", encoding="utf-8", errors="replace") + proc = subprocess.Popen( + cmd, + cwd=ROOT, + env=env_for_child(), + text=True, + stdout=output_file, + stderr=subprocess.STDOUT, + ) + running[job.name] = RunningJob( + job=job, + proc=proc, + started_at=time.time(), + started_iso=started_iso, + run_kind=run_kind, + trigger_id=trigger_id, + output_file=output_file, + ) + update_runtime( + job.name, + status="running", + pid=proc.pid, + run_kind=run_kind, + trigger_id=trigger_id, + locked_by=job.lock_group, + last_started_at=started_iso, + last_error="", + output_tail="", + ) + if trigger_id: + update_manual_trigger(trigger_id, status="running", started_at=started_iso) + return True -def build_jobs() -> list[Job]: - # 与当前宿主机 crontab 对齐,但串行执行。 - return [ - Job("event", "event", 60, initial_delay=5), - Job("tracker", "tracker", 180, initial_delay=20), - Job("confirm", "confirm", 600, initial_delay=40), - Job("screener", "screener", 900, initial_delay=80), - Job("sentiment", "sentiment", 1800, ("--collect",), initial_delay=120), - Job("review", "review", 24 * 3600, initial_delay=300), - ] +def finish_running_jobs(running: dict[str, RunningJob]) -> None: + for name, item in list(running.items()): + proc = item.proc + timeout = max(item.job.every_seconds * 2, 600) + elapsed = time.time() - item.started_at + if proc.poll() is None and elapsed > timeout: + proc.kill() + print(f"[{now_str()}] [scheduler] timeout {name} after {elapsed:.1f}s", flush=True) + if proc.poll() is None: + continue + out = "" + try: + if item.output_file: + item.output_file.seek(0) + out = item.output_file.read() + except Exception: + out = "" + finally: + try: + if item.output_file: + item.output_file.close() + except Exception: + pass + duration_ms = int((time.time() - item.started_at) * 1000) + exit_code = int(proc.returncode or 0) + output_tail = _tail(out.strip()) + status = "idle" if item.job.enabled else "disabled" + err = "" if exit_code == 0 else f"exit={exit_code}" + print(f"[{now_str()}] [scheduler] done {name} exit={exit_code} duration={duration_ms/1000:.1f}s", flush=True) + if output_tail: + print(output_tail, flush=True) + update_runtime( + name, + status=status, + pid=0, + run_kind=item.run_kind, + trigger_id=item.trigger_id, + locked_by="", + next_run_at=_next_run_iso(item.job.next_run), + last_finished_at=iso_now(), + last_exit_code=exit_code, + last_duration_ms=duration_ms, + last_error=err, + output_tail=output_tail, + ) + if item.trigger_id: + update_manual_trigger( + item.trigger_id, + status="success" if exit_code == 0 else "error", + finished_at=iso_now(), + exit_code=exit_code, + duration_ms=duration_ms, + output_tail=output_tail, + error_message=err, + ) + running.pop(name, None) + + +def handle_manual_triggers(jobs: dict[str, Job], running: dict[str, RunningJob]) -> None: + for trigger in claim_manual_triggers(limit=10): + job = jobs.get(trigger.get("job_name")) + trigger_id = int(trigger.get("id") or 0) + if not job: + update_manual_trigger(trigger_id, status="error", error_message="unknown job", finished_at=iso_now()) + continue + if not job.enabled and not trigger.get("force"): + update_manual_trigger(trigger_id, status="rejected", error_message="job disabled", finished_at=iso_now()) + continue + if start_job(job, running, run_kind="manual", trigger_id=trigger_id): + continue + update_manual_trigger(trigger_id, status="pending") + + +def schedule_due_jobs(jobs: dict[str, Job], running: dict[str, RunningJob]) -> None: + now = time.time() + for job in sorted(jobs.values(), key=lambda item: item.next_run): + if not job.enabled: + update_runtime(job.name, status="disabled", next_run_at="") + continue + if job.pending or now >= job.next_run: + if start_job(job, running, run_kind="auto"): + job.next_run = time.time() + job.every_seconds + update_runtime(job.name, next_run_at=_next_run_iso(job.next_run)) def main() -> None: - jobs = build_jobs() + init_db() + init_scheduler_tables() base = time.time() - for job in jobs: - job.next_run = base + job.initial_delay - print(f"[{now_str()}] [scheduler] started jobs={len(jobs)} dry_run={DRY_RUN}", flush=True) + jobs: dict[str, Job] = {} + running: dict[str, RunningJob] = {} + jobs = load_jobs(jobs, base, running) + last_reload = time.time() + print(f"[{now_str()}] [scheduler] started jobs={len(jobs)} dry_run={DRY_RUN} mode=concurrent", flush=True) while True: - now = time.time() - due = [j for j in jobs if now >= j.next_run] - if not due: - time.sleep(1) - continue - # 串行执行;一个 job 跑完才跑下一个,避免 SQLite 写锁。 - for job in sorted(due, key=lambda j: j.next_run): - run_job(job) - job.next_run = time.time() + job.every_seconds + finish_running_jobs(running) + if time.time() - last_reload >= CONFIG_RELOAD_SECONDS: + jobs = load_jobs(jobs, time.time(), running) + last_reload = time.time() + handle_manual_triggers(jobs, running) + schedule_due_jobs(jobs, running) + time.sleep(POLL_SECONDS) if __name__ == "__main__": diff --git a/rules.yaml b/rules.yaml index 3df208d..cc96fa9 100644 --- a/rules.yaml +++ b/rules.yaml @@ -405,11 +405,11 @@ event_driven: note: Solana meme主题扩散 meta: version: 1 - last_review: '2026-05-14T10:26:01.120951' - last_reverse_analysis: '2026-05-14T10:26:36.940507' - total_reviews: 28 + last_review: '2026-05-14T17:09:45.630655' + last_reverse_analysis: '2026-05-14T17:10:41.080069' + total_reviews: 36 total_rules_learned: 37 - iteration_count: 33 + iteration_count: 41 strategy_version: v1.7.11 strategy_revision_started_at: '2026-05-09T01:20:00' strategy_revision_note: 'v1.7.11: 触发时效治理,旧形态只作背景,消息触发显式标记' diff --git a/static/admin.html b/static/admin.html index f08ca9a..7dd9b12 100644 --- a/static/admin.html +++ b/static/admin.html @@ -8,6 +8,8 @@ 推荐 + + diff --git a/static/app.html b/static/app.html index b58612a..8c9be02 100644 --- a/static/app.html +++ b/static/app.html @@ -155,6 +155,19 @@ .decision-strip.observe .decision-title { color: var(--blue); } .decision-strip.weak .decision-title { color: var(--muted); } +.ai-insight { margin: 0 18px 8px; border: 1px solid var(--hairline-soft); border-radius: var(--radius-lg); background: var(--surface); overflow: hidden; } +.ai-insight summary { list-style: none; cursor: pointer; padding: 8px 10px; display: flex; align-items: center; justify-content: space-between; gap: 10px; font-size: 11px; font-weight: 900; color: var(--ink); } +.ai-insight summary::-webkit-details-marker { display: none; } +.ai-insight .ai-tag { font-size: 10px; color: var(--blue); background: rgba(66,98,255,.08); border-radius: 999px; padding: 2px 8px; white-space: nowrap; } +.ai-insight .ai-body { border-top: 1px solid var(--hairline-soft); padding: 8px 10px 10px; display: grid; gap: 8px; } +.ai-insight .ai-summary { color: var(--slate); font-size: 12px; line-height: 1.5; } +.ai-insight .ai-grid { display: grid; grid-template-columns: repeat(2, minmax(0, 1fr)); gap: 6px; } +.ai-insight .ai-item { border: 1px solid var(--hairline-soft); border-radius: 10px; background: var(--canvas); padding: 6px 8px; min-width: 0; } +.ai-insight .ai-label { color: var(--stone); font-size: 10px; font-weight: 800; } +.ai-insight .ai-text { color: var(--ink); font-size: 12px; line-height: 1.45; margin-top: 3px; word-break: break-word; } +.ai-insight .ai-list { display: flex; flex-wrap: wrap; gap: 4px; } +.ai-insight .ai-pill { display: inline-flex; padding: 4px 7px; border-radius: 999px; font-size: 11px; color: var(--slate); background: var(--canvas); border: 1px solid var(--hairline-soft); } + /* ===== K-LINE ===== */ .kline-wrap { padding: 0 8px 4px; } .kline-int-bar { display: flex; gap: 2px; padding: 0 10px 6px; } @@ -728,6 +741,39 @@ function renderRecCard(r) { var decisionFocus = isBuy ? ('现价 '+fmtP(price)) : (isWait ? ('等 '+fmtP(entryRef)) : (isWeakObserve ? '低优先级观察' : '等待确认')); var decisionReason = cleanDisplayText(isBuy ? (entryWindowSummary() || '入场窗口有效') : (isWait ? '现价不追,等回踩价附近再评估' : (r.observe_reason || r.state_reason || '未形成入场窗口'))); var decisionHtml = '
最终建议'+decisionTitle+'
'+decisionFocus+''+decisionReason+'
'; + var aiInsightHtml = ''; + var aiInsight = r.llm_insight && r.llm_insight.content ? r.llm_insight.content : null; + function hasAiText(v) { + if (Array.isArray(v)) return v.some(function(x){ return cleanDisplayText(x).replace(/^-+$/,'').trim(); }); + return !!cleanDisplayText(v).replace(/^-+$/,'').trim(); + } + if (aiInsight && ( + hasAiText(aiInsight.summary) || + hasAiText(aiInsight.why_now_or_not) || + hasAiText(aiInsight.key_evidence) || + hasAiText(aiInsight.risk_flags) || + hasAiText(aiInsight.watch_points) || + hasAiText(aiInsight.invalid_if) + )) { + var evidenceHtml = (aiInsight.key_evidence || []).slice(0, 4).map(function(x){ return ''+cleanDisplayText(x)+''; }).join(''); + var riskHtml = (aiInsight.risk_flags || []).slice(0, 4).map(function(x){ return ''+cleanDisplayText(x)+''; }).join(''); + var watchHtml = (aiInsight.watch_points || []).slice(0, 4).map(function(x){ return ''+cleanDisplayText(x)+''; }).join(''); + var invalidHtml = (aiInsight.invalid_if || []).slice(0, 4).map(function(x){ return ''+cleanDisplayText(x)+''; }).join(''); + aiInsightHtml = + '
'+ + 'AI 解读缓存'+ + '
'+ + '
'+cleanDisplayText(aiInsight.summary || aiInsight.why_now_or_not || '暂无摘要')+'
'+ + '
'+ + '
为什么现在 / 为什么不现在
'+cleanDisplayText(aiInsight.why_now_or_not || '--')+'
'+ + '
关键证据
'+(evidenceHtml || '--')+'
'+ + '
风险提示
'+(riskHtml || '--')+'
'+ + '
观察点
'+(watchHtml || '--')+'
'+ + '
'+ + '
失效条件
'+(invalidHtml || '--')+'
'+ + '
'+ + '
'; + } var entryPlanHtml = ''; if (isTradePlan) { entryPlanHtml = '
' + @@ -747,6 +793,7 @@ function renderRecCard(r) { return '
'+base.slice(0,2).toUpperCase()+'
'+base+'
'+actionBadge+''+score+''+st.label+'
'+ '
$'+priceFmt+''+changeHtml+'
'+ decisionHtml+ + aiInsightHtml+ '
'+ (isWeakObserve ? weakNoteHtml : entryPlanHtml)+ (sigHtml?'
'+sigHtml+'
':'')+ diff --git a/static/base.html b/static/base.html index 39eb967..35026b5 100644 --- a/static/base.html +++ b/static/base.html @@ -153,6 +153,8 @@ a { color: inherit; text-decoration: none; } + + @@ -175,6 +177,8 @@ a { color: inherit; text-decoration: none; } 推荐 + + diff --git a/static/cron.html b/static/cron.html new file mode 100644 index 0000000..871120a --- /dev/null +++ b/static/cron.html @@ -0,0 +1,61 @@ +{% extends "base.html" %} +{% block title %}AlphaX Agent | Crypto — 调度中心{% endblock %} +{% block nav_links %} +看板 +舆情 +订阅 +推荐 + + + + + + + +{% endblock %} +{% block extra_head_css %} + +{% endblock %} +{% block content %} +
+
+

调度中心

查看 Docker scheduler 当前状态,控制任务启停、周期与手动触发。

+
+
+
+
+
任务
--
+
任务状态启用周期下次运行最近结果运行信息操作
加载中...
+
+
+
手动触发记录
最近 30 条
+
加载中...
+
+
+
+
+{% endblock %} +{% block extra_script %} + +{% endblock %} diff --git a/static/iteration.html b/static/iteration.html index a37645b..6eff4a1 100644 --- a/static/iteration.html +++ b/static/iteration.html @@ -7,6 +7,8 @@ 推荐 + + @@ -96,6 +98,9 @@ h2 { font-size:26px; font-weight:900; margin:0 0 8px; color:var(--ink); } .rule-quality.good { background:var(--green-light); color:var(--green); } .rule-quality.wait { background:var(--yellow-light); color:var(--yellow-dark); } .rule-quality.bad { background:var(--red-light); color:var(--red); } +.ai-memo { margin-top:12px; border:1px solid var(--hairline-soft); border-radius:18px; background:var(--surface); padding:14px; } +.ai-memo .label { color:var(--stone); font-size:11px; font-weight:900; margin-bottom:6px; } +.ai-memo .text { color:var(--slate); font-size:13px; line-height:1.7; } @media(max-width:860px){ .summary-report{grid-template-columns:1fr;} } @@ -160,6 +165,7 @@ function renderUserReport(d){ var ov=d.overview||{}, dry=d.dry_run||{}, ds=ov.dry_run_summary||{}; var decision=ov.latest_release_decision || (dry.would_bump_version?'release':'hold'); var reason=ov.latest_release_reason || dry.release_reason || '样本仍在积累,暂不改变线上策略。'; + var aiMemo = d.llm_review_memo && d.llm_review_memo.content ? d.llm_review_memo.content : null; var candidates=(d.candidates||[]).slice(0,3); var failures=((ov.failure_type_counts)||[]).slice(0,3); var candHtml=candidates.length?candidates.map(function(c){ @@ -170,7 +176,8 @@ function renderUserReport(d){ return '
'+esc(name)+''+qtxt+' 样本 '+esc(c.sample_size||0)+' · 置信 '+esc(c.confidence_score||0)+' · 平均表现 '+esc(c.avg_pnl||0)+'
'; }).join(''):'
暂无新规律当前没有足够证据支持策略改动。
'; var failHtml=failures.length?failures.map(function(f){return '
'+esc(f.type||'失败模式')+'出现 '+esc(f.count||0)+' 次,后续复盘会重点观察是否重复发生。
';}).join(''):'
暂无集中失败模式当前失败样本不足,先继续观察。
'; - $('userReport').innerHTML = '
本轮策略结论 '+badge(decision)+'
'+decisionText(decision)+'
'+esc(reason)+'
'+candHtml+'
' + + var aiHtml = aiMemo ? '
AI 复盘摘要
'+esc(aiMemo.summary || aiMemo.memo || aiMemo.why_it_matters || '暂无摘要')+'
' : ''; + $('userReport').innerHTML = '
本轮策略结论 '+badge(decision)+'
'+decisionText(decision)+'
'+esc(reason)+'
'+aiHtml+'
'+candHtml+'
' + '
最近最该关注的错误
系统不只看成功因子,也会记录反复导致失败的原因,避免下一轮继续犯同样的错。
'+failHtml+'
'; } diff --git a/static/llm_insights.html b/static/llm_insights.html new file mode 100644 index 0000000..5fe505d --- /dev/null +++ b/static/llm_insights.html @@ -0,0 +1,77 @@ +{% extends "base.html" %} +{% block title %}AlphaX Agent | Crypto — AI 记录{% endblock %} +{% block nav_links %} +看板 +舆情 +订阅 +推荐 + + + + + + + +{% endblock %} +{% block extra_head_css %} + +{% endblock %} +{% block content %} +
+
+
+

AI 记录

+

查看每一次 LLM 解释任务:它看了什么结构化输入、调用了哪个模型、产出了什么解释,以及是否失败。

+
+
+ + + +
+
+
+
+
+
调用记录
+
--
+
+
加载中...
+
+
+
这次 AI 做了什么
--
+
选择左侧一条记录查看详情
+
+
+
+{% endblock %} +{% block extra_script %} + +{% endblock %} diff --git a/static/pipeline.html b/static/pipeline.html index 5ab8936..c6291bb 100644 --- a/static/pipeline.html +++ b/static/pipeline.html @@ -7,6 +7,8 @@ 推荐 + + @@ -23,17 +25,22 @@
-
- - - -- -
加载中...
-
批次
--
+
+
+
批次
+
--
+
+
+ + + -- +
+
加载批次...
@@ -55,7 +62,7 @@ function statusCls(s){return s==='success'?'ok':'err';} function label(text,cls){return ''+esc(text)+'';} function updatePager(){var info=$('pageInfo');var prev=$('prevPageBtn');var next=$('nextPageBtn');if(!info||!prev||!next)return;var totalPages=state.totalPages||0;info.textContent=totalPages?('第 '+state.page+' / '+totalPages+' 页,共 '+state.totalCount+' 批'):'暂无数据';prev.disabled=state.page<=1;next.disabled=!totalPages||state.page>=totalPages;} function renderKpis(k){var items=[['批次数',k.run_count||0,''],['粗筛命中',k.rough_candidates||0,''],['细筛通过',k.fine_qualified||0,'blue'],['确认命中',k.confirm_hits||0,'blue'],['推荐生成',k.recommendations||0,'green'],['复盘成功',k.perf_success||0,'green'],['失败/漏选',(k.perf_failed||0)+' / '+(k.missed_count||0),'red']];$('kpis').innerHTML=items.map(function(x){return '
'+x[0]+''+x[1]+'
';}).join('');} -function renderRuns(){var list=state.runs||[];$('runCount').textContent=list.length+' 批';updatePager();if(!list.length){$('runList').innerHTML='
暂无粗筛批次
';return;}$('runList').innerHTML=list.map(function(r){var active=state.selected===r.run_id?' active':'';var notes=(r.issue_notes||[]).join(' / ')||'链路正常';return '
'+fmtTime(r.started_at)+'
'+esc(notes)+'
'+esc(r.result_status||r.run_status)+'
粗筛'+r.rough_candidates+'
细筛'+r.fine_qualified+'
确认'+r.confirm_hits+'/'+r.confirm_processed+'
推荐'+r.recommendations+'
绩效'+r.perf_success+'/'+r.perf_failed+'
';}).join('');} +function renderRuns(){var list=state.runs||[];$('runCount').textContent='本页 '+list.length+' / 全部 '+state.totalCount+' 批';updatePager();if(!list.length){$('runList').innerHTML='
暂无粗筛批次
';return;}$('runList').innerHTML=list.map(function(r){var active=state.selected===r.run_id?' active':'';var notes=(r.issue_notes||[]).join(' / ')||'链路正常';return '
'+fmtTime(r.started_at)+'
'+esc(notes)+'
'+esc(r.result_status||r.run_status)+'
粗筛'+r.rough_candidates+'
细筛'+r.fine_qualified+'
确认'+r.confirm_hits+'/'+r.confirm_processed+'
推荐'+r.recommendations+'
绩效'+r.perf_success+'/'+r.perf_failed+'
';}).join('');} function resetSelection(){state.selected=null;state.detail=null;} async function loadRuns(){try{$('runList').innerHTML='
加载批次...
';state.hours=Number($('hoursSel').value||24);var d=await (await fetch('/api/pipeline/runs?hours='+state.hours+'&limit='+state.limit+'&offset='+state.offset)).json();state.runs=d.runs||[];state.totalPages=(d.pagination&&d.pagination.total_pages)||0;state.totalCount=(d.pagination&&d.pagination.total_count)||0;state.page=(d.pagination&&d.pagination.page)||1;renderKpis(d.kpi||{});if(state.selected&&!state.runs.some(function(r){return r.run_id===state.selected;}))resetSelection();if(!state.selected&&state.runs[0])state.selected=state.runs[0].run_id;renderRuns();if(state.selected)selectRun(state.selected,true);else $('detailBody').innerHTML='
暂无可展示批次
';}catch(e){$('runList').innerHTML='
加载失败
';updatePager();}} async function selectRun(id,quiet){state.selected=id;renderRuns();$('detailBody').innerHTML='
加载详情...
';try{var d=await (await fetch('/api/pipeline/runs/'+id)).json();state.detail=d;state.filter='all';renderDetail();}catch(e){$('detailBody').innerHTML='
详情加载失败
';}} diff --git a/static/referral.html b/static/referral.html index 37f4c80..3178f65 100644 --- a/static/referral.html +++ b/static/referral.html @@ -8,6 +8,8 @@ 推荐 + + diff --git a/static/sentiment.html b/static/sentiment.html index a1742d7..a2c758a 100644 --- a/static/sentiment.html +++ b/static/sentiment.html @@ -7,6 +7,8 @@ 推荐 + + @@ -19,46 +21,44 @@ /* Page title */ .page-title { font-size: 24px; font-weight: 800; color: var(--ink); margin-bottom: 4px; } -.page-sub { font-size: 13px; color: var(--stone); margin-bottom: 24px; } +.page-sub { font-size: 13px; color: var(--stone); margin-bottom: 18px; } /* === SECTION: DASHBOARD === */ -.dashboard-grid { - display: grid; grid-template-columns: 1fr 1fr; - gap: 14px; margin-bottom: 28px; +.market-context { + display: grid; grid-template-columns: minmax(210px, 260px) 1fr; + gap: 10px; margin-bottom: 18px; } -@media(max-width:640px) { .dashboard-grid { grid-template-columns: 1fr; } } +@media(max-width:720px) { .market-context { grid-template-columns: 1fr; } } /* Fear & Greed */ .fg-card { background: var(--canvas); border: 1px solid var(--hairline-soft); - border-radius: var(--radius-xl); padding: 24px; - display: flex; flex-direction: column; align-items: center; gap: 12px; + border-radius: var(--radius-lg); padding: 10px 12px; + display: grid; grid-template-columns: auto 1fr; align-items: center; gap: 8px 10px; } -.fg-card .fg-label { font-size: 12px; color: var(--stone); font-weight: 600; text-transform: uppercase; letter-spacing: .5px; } -.fg-card .fg-value { font-size: 56px; font-weight: 900; line-height: 1; transition: color .3s; } -.fg-card .fg-class { font-size: 15px; font-weight: 700; padding: 4px 16px; border-radius: var(--radius-full); } -.fg-gauge { width: 100%; height: 8px; border-radius: 4px; background: linear-gradient(to right, #e53e3e, #f59e0b, #84cc16); position: relative; } +.fg-card .fg-label { grid-column: 1 / -1; font-size: 11px; color: var(--stone); font-weight: 800; } +.fg-card .fg-value { font-size: 28px; font-weight: 900; line-height: 1; transition: color .3s; } +.fg-card .fg-class { justify-self: start; font-size: 12px; font-weight: 800; padding: 3px 9px; border-radius: var(--radius-full); } +.fg-gauge { grid-column: 1 / -1; width: 100%; height: 5px; border-radius: 999px; background: linear-gradient(to right, #e53e3e, #f59e0b, #84cc16); position: relative; } .fg-gauge::after { - content: ""; position: absolute; top: -4px; - width: 16px; height: 16px; border-radius: 50%; background: var(--canvas); - border: 3px solid var(--ink); transition: left .5s; - left: 0%; + content: ""; position: absolute; top: -3px; + width: 11px; height: 11px; border-radius: 50%; background: var(--canvas); + border: 2px solid var(--ink); transition: left .5s; + left: calc(var(--pos, 0%) - 5px); } /* Trending card */ .trend-card { background: var(--canvas); border: 1px solid var(--hairline-soft); - border-radius: var(--radius-xl); padding: 18px 20px; + border-radius: var(--radius-lg); padding: 10px 12px; min-width: 0; } -.trend-card .section-label { font-size: 12px; color: var(--stone); font-weight: 600; margin-bottom: 14px; } -.trend-list { display: flex; flex-direction: column; gap: 10px; } -.trend-row { display: flex; align-items: center; gap: 10px; padding: 6px 0; border-bottom: 1px solid var(--hairline-soft); } -.trend-row:last-child { border-bottom: 0; } -.trend-icon { width: 28px; height: 28px; border-radius: 50%; background: var(--surface); display: grid; place-items: center; font-weight: 800; font-size: 10px; color: var(--steel); flex-shrink: 0; } -.trend-icon img { width: 28px; height: 28px; border-radius: 50%; } -.trend-name { font-weight: 700; font-size: 14px; color: var(--ink); } +.trend-card .section-label { font-size: 11px; color: var(--stone); font-weight: 800; margin-bottom: 8px; } +.trend-list { display: flex; flex-wrap: wrap; gap: 6px; min-height: 29px; align-items: center; } +.trend-pill { display: inline-flex; align-items: center; gap: 6px; max-width: 160px; padding: 4px 8px; border: 1px solid var(--hairline-soft); border-radius: 999px; background: var(--surface); color: var(--slate); font-size: 12px; font-weight: 800; } +.trend-pill img { width: 16px; height: 16px; border-radius: 50%; flex-shrink: 0; } +.trend-name { overflow: hidden; text-overflow: ellipsis; white-space: nowrap; color: var(--ink); } .trend-symbol { font-size: 11px; color: var(--stone); margin-left: 4px; } -.trend-rank { margin-left: auto; font-size: 11px; color: var(--muted); } +.trend-rank { font-size: 10px; color: var(--muted); } /* === SECTION: NEWS FEED === */ .feed-header { display: flex; align-items: center; gap: 10px; margin-bottom: 16px; } @@ -87,6 +87,30 @@ .news-meta { display: flex; align-items: center; gap: 8px; font-size: 11px; color: var(--muted); } .news-meta .dot { width: 3px; height: 3px; border-radius: 50%; background: var(--hairline); } +.ai-brief { margin-top: 8px; border: 1px solid var(--hairline-soft); border-radius: var(--radius-lg); background: var(--surface); padding: 10px 12px; } +.ai-brief .label { font-size: 10px; color: var(--stone); font-weight: 900; margin-bottom: 4px; } +.ai-brief .text { font-size: 12px; color: var(--slate); line-height: 1.55; } +.ai-brief .chips { display: flex; flex-wrap: wrap; gap: 4px; margin-top: 6px; } +.ai-brief .chip { display: inline-flex; padding: 3px 7px; border-radius: 999px; border: 1px solid var(--hairline-soft); background: var(--canvas); color: var(--slate); font-size: 10px; } +.analysis-card { background: var(--canvas); border: 1px solid var(--hairline-soft); border-radius: var(--radius-xl); padding: 18px; margin-bottom: 18px; } +.analysis-head { display:flex; align-items:flex-start; justify-content:space-between; gap:12px; margin-bottom:12px; } +.analysis-title { font-size: 16px; font-weight: 900; color: var(--ink); } +.analysis-meta { color: var(--stone); font-size: 11px; font-weight: 800; text-align:right; line-height:1.5; } +.mood { display:inline-flex; border-radius:999px; padding:4px 9px; font-size:11px; font-weight:900; background:var(--surface); color:var(--slate); border:1px solid var(--hairline-soft); } +.mood.risk_on { color: var(--green); background: var(--green-light); border-color: rgba(0,180,115,.18); } +.mood.risk_off { color: var(--red); background: var(--red-light); border-color: rgba(229,62,62,.18); } +.analysis-summary { color: var(--slate); font-size: 14px; line-height: 1.75; margin-bottom: 14px; } +.analysis-grid { display:grid; grid-template-columns: 1fr 1fr; gap: 12px; } +.analysis-section { border:1px solid var(--hairline-soft); border-radius: var(--radius-lg); background: var(--surface); padding: 12px; min-width:0; } +.analysis-section h3 { font-size: 12px; font-weight: 900; color: var(--ink); margin-bottom: 9px; } +.analysis-item { border-top:1px solid var(--hairline-soft); padding:8px 0; color:var(--slate); font-size:12px; line-height:1.55; } +.analysis-item:first-of-type { border-top:0; padding-top:0; } +.analysis-item b { color:var(--ink); font-size:13px; } +.analysis-item .sub { display:block; margin-top:3px; color:var(--stone); } +.symbol-tags { display:flex; flex-wrap:wrap; gap:4px; margin-top:5px; } +.symbol-tag { display:inline-flex; padding:3px 7px; border-radius:999px; background:var(--canvas); border:1px solid var(--hairline-soft); color:var(--blue); font-size:10px; font-weight:900; } +@media(max-width:760px){ .analysis-grid{grid-template-columns:1fr;} .analysis-head{display:block;} .analysis-meta{text-align:left;margin-top:8px;} } + /* Empty */ .empty-state { text-align:center; padding:48px 20px; color:var(--stone); } .empty-state p { font-size:14px; } @@ -97,8 +121,8 @@ .spin { animation: spin 1s linear infinite; } @keyframes spin { to{ transform:rotate(360deg) } } +@media(max-width:640px) { .shell { width: min(100% - 24px, 960px); } - .fg-card .fg-value { font-size: 42px; } .news-card { padding: 14px 14px; gap: 10px; } .news-source { min-width: 48px; font-size: 9px; padding: 3px 6px; } } @@ -107,10 +131,14 @@ {% block content %}

实时舆情

-

市场情绪 + 热门币种 + 最新加密新闻

+

AI 舆情研判 + 本轮分析来源

+ +
+

等待 AI 舆情分析结果...

+
-
+
恐惧 & 贪婪指数
--
@@ -121,14 +149,14 @@
-
?加载中...
+ 加载中...
- +
-

新闻信息流

+

本轮分析来源

--
@@ -202,6 +230,8 @@ function ageStr(h) { return Math.floor(h / 24) + '天前'; } +function esc(v){ return String(v==null?'':v).replace(/[&<>"']/g,function(c){return {'&':'&','<':'<','>':'>','"':'"',"'":'''}[c];}); } + function fgColor(v) { if (v <= 25) return 'var(--red)'; if (v <= 45) return 'var(--orange)'; @@ -210,10 +240,59 @@ function fgColor(v) { return 'var(--green)'; } +function fmtTime(t) { + if (!t) return '--'; + var d = new Date(t); + if (isNaN(d.getTime())) return t; + return (d.getMonth()+1)+'/'+d.getDate()+' '+String(d.getHours()).padStart(2,'0')+':'+String(d.getMinutes()).padStart(2,'0'); +} + +function renderAnalysis(resp) { + var box = document.getElementById('aiAnalysis'); + var a = resp && resp.analysis ? resp.analysis : null; + if (!a) { + var msg = resp && resp.error ? ('AI 舆情分析未完成:' + resp.error) : '暂无 AI 舆情分析。调度器会定时生成,或运行 llm-insights --scope sentiment。'; + box.innerHTML = '

'+esc(msg)+'

'; + return; + } + var mood = a.market_mood || 'neutral'; + function symbolsHtml(items) { + return (items || []).slice(0, 8).map(function(s){ return ''+esc(s)+''; }).join(''); + } + var themes = (a.hot_themes || []).slice(0, 6).map(function(x){ + return '
'+esc(x.theme || '主题')+''+esc(x.impact || x.reason || '--')+'
'+symbolsHtml(x.symbols || [])+'
'; + }).join('') || '
暂无明确主题
'; + var impacts = (a.coin_impacts || []).slice(0, 8).map(function(x){ + var check = x.need_technical_check ? ' · 需要技术检查' : ''; + return '
'+esc(x.symbol || '--')+''+esc((x.direction || 'neutral') + check)+''+esc(x.reason || '--')+'
'; + }).join('') || '
暂无币种影响
'; + var risks = (a.risk_events || []).slice(0, 6).map(function(x){ + return '
'+esc(x.risk_type || x.title || '风险事件')+''+esc(x.title || x.reason || '--')+'
'+symbolsHtml(x.symbols || [])+'
'; + }).join('') || '
暂无集中风险
'; + var watch = (a.watchlist || []).slice(0, 8).map(function(x){ + return '
'+esc(x.symbol || '--')+''+esc(x.why || '--')+'触发条件:'+esc(x.trigger || '--')+'
'; + }).join('') || '
暂无重点观察
'; + box.innerHTML = + '
AI 舆情研判 '+esc(mood)+'
模型 '+esc(resp.model || '--')+'
更新 '+fmtTime(resp.updated_at)+' · 来源 '+esc(resp.event_count || 0)+' 条
'+ + '
'+esc(a.summary || '暂无摘要')+'
'+ + '
'+ + '

主线主题

'+themes+'
'+ + '

币种影响

'+impacts+'
'+ + '

风险事件

'+risks+'
'+ + '

重点观察

'+watch+'
'+ + '
'; +} + async function loadFeed() { try { var resp = await fetch(API + '/api/newsfeed'); var data = await resp.json(); + var analysisResp = {}; + try { + var ar = await fetch(API + '/api/sentiment/analysis'); + if (ar.ok) analysisResp = await ar.json(); + } catch(e) {} + renderAnalysis(analysisResp); // Fear & Greed var fg = data.fear_greed; @@ -226,48 +305,64 @@ async function loadFeed() { document.getElementById('fgClass').style.color = clr; document.getElementById('fgClass').style.background = clr + '15'; document.getElementById('fgGauge').style.setProperty('--pos', v + '%'); - // Update gauge pointer position - var g = document.getElementById('fgGauge'); - g.style.setProperty('--pos', v + '%'); - // Re-apply ::after style with JS since CSS custom properties in pseudo-elements can be tricky - var sheet = document.createElement('style'); - sheet.textContent = '#fgGauge::after { left: calc(' + v + '% - 8px); }'; - document.head.appendChild(sheet); } // Trending var trends = data.trending || []; document.getElementById('trendList').classList.remove('loading-pulse'); if (trends.length) { - document.getElementById('trendList').innerHTML = trends.map(function(t, i) { - var icon = t.thumb - ? '' + t.symbol + '' + t.symbol.charAt(0) + '' - : t.symbol.slice(0, 2).toUpperCase(); - return '
' + - '
' + icon + '
' + - '
' + t.name + '' + t.symbol + '
' + - '#' + (t.market_cap_rank || '--') + '' + - '
'; + document.getElementById('trendList').innerHTML = trends.slice(0, 8).map(function(t) { + var symbol = String(t.symbol || '').toUpperCase(); + var icon = t.thumb ? '' + esc(symbol) + '' : ''; + return '' + + icon + + '' + esc(t.name || symbol || '--') + '' + + '' + esc(symbol) + '' + + '#' + esc(t.market_cap_rank || '--') + '' + + ''; }).join(''); } else { - document.getElementById('trendList').innerHTML = '
暂无数据
'; + document.getElementById('trendList').innerHTML = '暂无数据'; } - // News feed - var news = data.news || []; + // Source feed: only show the events/news that fed the latest AI analysis. + var events = ((analysisResp && analysisResp.source_events) || []).map(function(e){ + return { + title: e.title, + url: e.url, + source: e.source_label || e.source || '事件', + age_hours: null, + lang: e.related_base || '事件', + llm_insight: null, + relation_tag: e.related_symbol || e.related_base || '', + importance: e.importance + }; + }); + var news = events.slice(0, 50); document.getElementById('feedCount').textContent = news.length + ' 条'; if (news.length) { document.getElementById('newsFeed').innerHTML = news.map(function(n) { var isCn = n.lang === 'cn'; - return '' + - '' + n.source + '' + + var langLabel = n.relation_tag || (isCn ? '中文' : (n.importance ? ('重要性 ' + n.importance) : 'EN')); + var ai = n.llm_insight && n.llm_insight.content ? n.llm_insight.content : null; + var tags = ai ? (ai.key_tags || ai.theme_tags || ai.risk_types || []) : []; + var aiHtml = ai ? ( + '
' + + '
AI 解读
' + + '
' + esc(ai.summary || ai.why_now_or_not || '暂无摘要') + '
' + + '
' + (tags.slice(0,4).map(function(x){ return ''+esc(x)+''; }).join('') || '') + '
' + + '
' + ) : ''; + return '
' + + '' + esc(n.source) + '' + '
' + - '
' + n.title + '
' + + '
' + esc(n.title) + '
' + '
' + - '' + (isCn ? '中文' : 'EN') + '' + + '' + esc(langLabel) + '' + '' + '' + ageStr(n.age_hours) + '' + '
' + + aiHtml + '
' + '
'; }).join(''); diff --git a/static/strategy.html b/static/strategy.html index 30217d7..5bd026e 100644 --- a/static/strategy.html +++ b/static/strategy.html @@ -7,6 +7,8 @@ 推荐 + + diff --git a/static/subscription.html b/static/subscription.html index 270bb50..29b15f4 100644 --- a/static/subscription.html +++ b/static/subscription.html @@ -7,6 +7,8 @@ 推荐 + + diff --git a/static/watchlist.html b/static/watchlist.html index 7cff3c3..89034cb 100644 --- a/static/watchlist.html +++ b/static/watchlist.html @@ -7,6 +7,8 @@ 推荐 + + diff --git a/tests/test_llm_insights.py b/tests/test_llm_insights.py new file mode 100644 index 0000000..701ccb2 --- /dev/null +++ b/tests/test_llm_insights.py @@ -0,0 +1,306 @@ +import json +import os +import sqlite3 +import sys + +import pytest +from fastapi.testclient import TestClient + +PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if PROJECT_DIR not in sys.path: + sys.path.insert(0, PROJECT_DIR) + +from app.db import altcoin_db +from app.db.llm_insights import compute_input_hash, get_cached_insight, repair_mojibake_json, upsert_insight +from app.services import event_driven_screener, llm_insights +from app.web import web_server + + +@pytest.fixture +def temp_db(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db) + altcoin_db.init_db() + return db_path + + +def _insert_recommendation(db_path, **kwargs): + defaults = dict( + symbol="AAA/USDT", + rec_time="2026-05-01T10:00:00", + rec_state="加速", + rec_score=80, + entry_price=100.0, + stop_loss=95.0, + tp1=110.0, + tp2=120.0, + sector="AI", + signals=json.dumps(["15min 即刻入场信号"], ensure_ascii=False), + signal_codes_json=json.dumps(["vp_fly_1h_current"], ensure_ascii=False), + signal_labels_json=json.dumps(["15min 即刻入场信号"], ensure_ascii=False), + is_meme=0, + status="active", + current_price=100.0, + max_price=104.0, + min_price=98.0, + pnl_pct=0.0, + max_pnl_pct=4.0, + max_drawdown_pct=-1.0, + hit_tp1_time="", + hit_tp2_time="", + stopped_out_time="", + expired_time="", + last_track_time="2026-05-01T10:10:00", + entry_plan_json=json.dumps({"entry_price": 100.0, "entry_action": "可即刻买入"}, ensure_ascii=False), + action_status="可即刻买入", + direction="多头启动", + execution_status="buy_now", + display_bucket="realtime", + lifecycle_state="position", + entry_triggered=1, + state_reason="推荐时就是可即刻买入", + strategy_version="v1.0", + ) + defaults.update(kwargs) + conn = sqlite3.connect(db_path) + cols = ",".join(defaults.keys()) + qs = ",".join(["?"] * len(defaults)) + conn.execute(f"INSERT INTO recommendation ({cols}) VALUES ({qs})", tuple(defaults.values())) + conn.commit() + conn.close() + + +def _fetch_llm_row(db_path): + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + row = conn.execute("SELECT * FROM llm_insights ORDER BY id DESC LIMIT 1").fetchone() + conn.close() + return dict(row) if row else None + + +def test_disabled_llm_skips_and_does_not_change_mainline(monkeypatch, temp_db): + _insert_recommendation(temp_db) + monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: False) + result = llm_insights.run(scope="recommendations", limit=10) + assert result["status"] == "skipped" + + rows = altcoin_db.get_active_recommendations_deduped(actionable_only=False) + target = next(r for r in rows if r["symbol"] == "AAA/USDT") + assert target["execution_status"] == "buy_now" + assert "llm_insight" not in target + + +def test_same_input_hash_is_cached_without_repeated_call(monkeypatch, temp_db): + _insert_recommendation(temp_db) + monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) + monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: {"status": "success", "content": {"summary": "ok"}, "model": "m"}) + + calls = [] + original_upsert = llm_insights.upsert_insight + + def wrapped_upsert(*args, **kwargs): + calls.append((args, kwargs)) + return original_upsert(*args, **kwargs) + + monkeypatch.setattr(llm_insights, "upsert_insight", wrapped_upsert) + + first = llm_insights.run(scope="recommendations", limit=10) + second = llm_insights.run(scope="recommendations", limit=10) + assert first["processed"] == 1 + assert second["processed"] == 0 + assert len(calls) == 1 + + rows = altcoin_db.get_active_recommendations_deduped(actionable_only=False) + target = next(r for r in rows if r["symbol"] == "AAA/USDT") + assert target["llm_insight"]["content"]["summary"] == "ok" + + +def test_invalid_json_is_marked_failed(monkeypatch, temp_db): + _insert_recommendation(temp_db, symbol="BBB/USDT", rec_time="2026-05-01T11:00:00") + monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) + monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: {"status": "failed", "error": "invalid_json", "model": "m"}) + llm_insights.run(scope="recommendations", limit=10) + row = _fetch_llm_row(temp_db) + assert row["status"] == "failed" + + +def test_only_key_samples_generate_insights(monkeypatch, temp_db): + _insert_recommendation(temp_db, symbol="CCC/USDT", action_status="观察", execution_status="observe", display_bucket="watch_pool", state_reason="普通观察") + _insert_recommendation(temp_db, symbol="DDD/USDT", action_status="等回踩", execution_status="wait_pullback", display_bucket="realtime", rec_time="2026-05-01T12:00:00") + monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) + seen = [] + monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: seen.append(payload["symbol"]) or {"status": "success", "content": {"summary": "ok"}, "model": "m"}) + llm_insights.run(scope="recommendations", limit=10) + assert "CCC/USDT" not in seen + assert "DDD/USDT" in seen + + +def test_api_exposes_cached_ai_fields(monkeypatch, temp_db): + _insert_recommendation(temp_db) + monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) + monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: {"status": "success", "content": {"summary": "AI 摘要", "key_evidence": ["量能增强"]}, "model": "m"}) + llm_insights.run(scope="recommendations", limit=10) + + client = TestClient(web_server.app) + resp = client.get("/api/recommendations/active?actionable_only=false") + assert resp.status_code == 200 + data = resp.json() + target = next(r for r in data if r["symbol"] == "AAA/USDT") + assert target["llm_insight"]["content"]["summary"] == "AI 摘要" + + +def test_invalid_json_gets_cached_as_failed(monkeypatch, temp_db): + _insert_recommendation(temp_db) + monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) + monkeypatch.setattr(llm_insights, "_call_llm_json", lambda prompt, payload: {"status": "failed", "error": "invalid_json:bad", "model": "m"}) + llm_insights.run(scope="recommendations", limit=10) + row = _fetch_llm_row(temp_db) + assert row["status"] == "failed" + assert "invalid_json" in row["error"] + + +def test_llm_insights_api_exposes_input_and_output(temp_db): + payload = {"symbol": "AAA/USDT", "action_status": "可即刻买入", "signals": ["15min 即刻入场信号"]} + upsert_insight( + "recommendation", + "7", + llm_insights.PROMPTS["recommendation_explain_v1"], + llm_insights.PROMPTS["recommendation_explain_v1"], + compute_input_hash(payload), + "success", + input_payload=payload, + content={"summary": "AI 判断现在处于入场窗口", "key_evidence": ["量能增强"]}, + model="test-model", + ) + client = TestClient(web_server.app) + resp = client.get("/api/llm/insights") + assert resp.status_code == 200 + data = resp.json() + assert data["items"][0]["model"] == "test-model" + assert data["items"][0]["input"]["symbol"] == "AAA/USDT" + assert data["items"][0]["content"]["summary"] == "AI 判断现在处于入场窗口" + + detail = client.get(f"/api/llm/insights/{data['items'][0]['id']}").json() + assert detail["input"]["signals"] == ["15min 即刻入场信号"] + + +def test_sentiment_batch_analysis_api_returns_cached_result(temp_db): + payload = { + "target_type": "sentiment_batch", + "target_id": "sentiment_batch:24h", + "hours": 24, + "event_count": 1, + "events": [{"title": "Binance Will List ABCUSDT", "related_symbol": "ABC/USDT"}], + } + upsert_insight( + "sentiment_batch", + "sentiment_batch:24h", + llm_insights.PROMPTS["sentiment_batch_analyze_v1"], + llm_insights.PROMPTS["sentiment_batch_analyze_v1"], + compute_input_hash(payload), + "success", + input_payload=payload, + content={ + "market_mood": "risk_on", + "summary": "上币事件带动短线风险偏好", + "coin_impacts": [{"symbol": "ABC/USDT", "direction": "positive", "reason": "Binance listing", "need_technical_check": True}], + }, + model="test-model", + ) + client = TestClient(web_server.app) + resp = client.get("/api/sentiment/analysis") + assert resp.status_code == 200 + data = resp.json() + assert data["analysis"]["summary"] == "上币事件带动短线风险偏好" + assert data["source_events"][0]["related_symbol"] == "ABC/USDT" + + +def test_sentiment_batch_enqueues_technical_check_candidates(monkeypatch, temp_db): + event_driven_screener.init_event_tables() + conn = sqlite3.connect(temp_db) + conn.execute( + """ + INSERT INTO event_news ( + event_hash, source, symbol, title, url, published_at, detected_at, + importance, event_type, processed + ) VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'), ?, ?, 1) + """, + ( + "seed-llm-candidate", + "binance_latest", + "AAA/USDT", + "Binance announces AAA ecosystem update", + "", + "A", + "important_catalyst", + ), + ) + conn.commit() + conn.close() + + monkeypatch.setattr(llm_insights, "get_llm_module_enabled", lambda module: True) + monkeypatch.setattr( + llm_insights, + "_call_llm_json", + lambda prompt, payload: { + "status": "success", + "model": "m", + "content": { + "summary": "AAA 需要进入技术检查", + "coin_impacts": [ + { + "symbol": "AAA/USDT", + "direction": "positive", + "reason": "公告催化且主题关注升温", + "confidence": 88, + "need_technical_check": True, + }, + { + "symbol": "BBB/USDT", + "direction": "positive", + "reason": "置信度不足", + "confidence": 40, + "need_technical_check": True, + }, + ], + }, + }, + ) + + result = llm_insights.generate_sentiment_batch_analysis(limit=10) + assert result["candidate_events"]["queued"] == 1 + assert result["candidate_events"]["symbols"] == ["AAA/USDT"] + + conn = sqlite3.connect(temp_db) + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT source, symbol, event_type, processed, decision, rec_id FROM event_news WHERE source='llm_sentiment'" + ).fetchall() + conn.close() + assert len(rows) == 1 + assert rows[0]["symbol"] == "AAA/USDT" + assert rows[0]["event_type"] == "llm_sentiment_candidate" + assert rows[0]["processed"] == 0 + assert rows[0]["decision"] in ("", None) + assert rows[0]["rec_id"] == 0 + + +def test_llm_insights_page_route(temp_db): + client = TestClient(web_server.app) + resp = client.get("/llm-insights") + assert resp.status_code == 200 + assert "AI 记录" in resp.text + + +def test_mojibake_json_is_repaired_for_display(): + raw = {"结论": "ç­\x89å\x9b\x9e踩ï¼\x8cä¸\x8d追é«\x98", "list": ["æ\xa0¸å¿\x83å\x8e\x9få\x9b\xa0"]} + fixed = repair_mojibake_json(raw) + assert fixed["结论"] == "等回踩,不追高" + assert fixed["list"][0] == "核心原因" + + +def test_mixed_mojibake_text_is_repaired(): + raw = "AI舆情候选 ABC/USDT: Binance Futuresæ\x96°ä¸\x8a线永ç»\xadå\x90\x88约" + fixed = repair_mojibake_json(raw) + assert fixed == "AI舆情候选 ABC/USDT: Binance Futures新上线永续合约" diff --git a/tests/test_pipeline_runs_api.py b/tests/test_pipeline_runs_api.py index 9e8c2e1..596c3d1 100644 --- a/tests/test_pipeline_runs_api.py +++ b/tests/test_pipeline_runs_api.py @@ -235,6 +235,10 @@ def test_pipeline_runs_supports_pagination(temp_db): assert data_page2["pagination"]["page"] == 2 assert len(data_page2["runs"]) == 1 assert data_page1["runs"][0]["run_id"] != data_page2["runs"][0]["run_id"] + assert data_page1["kpi"]["run_count"] == 3 + assert data_page2["kpi"]["run_count"] == 3 + assert data_page1["kpi"]["rough_candidates"] == 6 + assert data_page2["kpi"]["rough_candidates"] == 6 def test_pipeline_api_keeps_observation_batch_without_recommendations(temp_db): diff --git a/tests/test_scheduler_control.py b/tests/test_scheduler_control.py new file mode 100644 index 0000000..cc7ce2e --- /dev/null +++ b/tests/test_scheduler_control.py @@ -0,0 +1,154 @@ +import os +import re +import subprocess +import sys + +from fastapi.testclient import TestClient + +PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if PROJECT_DIR not in sys.path: + sys.path.insert(0, PROJECT_DIR) + +from app.db import altcoin_db +from app.db import scheduler_db +from app.web import web_server +import docker.scheduler as scheduler + + +def test_scheduler_tables_seed_defaults(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + altcoin_db.init_db() + scheduler_db.init_scheduler_tables() + + jobs = {item["job_name"]: item for item in scheduler_db.get_job_configs()} + assert jobs["event"]["lock_group"] == "recommendation_write" + assert jobs["confirm"]["lock_group"] == "recommendation_write" + assert jobs["tracker"]["every_seconds"] == 180 + + +def test_scheduler_control_api_and_page(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db) + altcoin_db.init_db() + scheduler_db.init_scheduler_tables() + + client = TestClient(web_server.app) + page = client.get("/cron") + assert page.status_code == 200 + assert "调度中心" in page.text + scripts = re.findall(r"", page.text) + scheduler_scripts = [s for s in scripts if "/api/scheduler/jobs" in s] + assert scheduler_scripts + subprocess.run( + ["node", "-e", f"new Function({scheduler_scripts[-1]!r})"], + check=True, + cwd=PROJECT_DIR, + ) + assert "data-action=\"trigger\"" in page.text + assert "onchange=\"toggleJob" not in page.text + assert "onclick=\"triggerJob" not in page.text + + resp = client.get("/api/scheduler/jobs") + assert resp.status_code == 200 + assert any(item["job_name"] == "event" for item in resp.json()["jobs"]) + + toggle = client.post("/api/scheduler/jobs/event/toggle", json={"enabled": False}) + assert toggle.status_code == 200 + assert scheduler_db.get_job_config("event")["enabled"] is False + + blocked = client.post("/api/scheduler/jobs/event/trigger", json={"force": False}) + assert blocked.status_code == 409 + + forced = client.post("/api/scheduler/jobs/event/trigger", json={"force": True}) + assert forced.status_code == 200 + triggers = scheduler_db.list_manual_triggers() + assert triggers[0]["job_name"] == "event" + assert triggers[0]["force"] == 1 + + interval = client.post("/api/scheduler/jobs/tracker/interval", json={"every_seconds": 240}) + assert interval.status_code == 200 + assert scheduler_db.get_job_config("tracker")["every_seconds"] == 240 + + +class _FakeProc: + _next_pid = 100 + + def __init__(self, cmd, **kwargs): + self.cmd = cmd + self.pid = _FakeProc._next_pid + _FakeProc._next_pid += 1 + self.returncode = None + self.stdout = self + + def poll(self): + return self.returncode + + def read(self): + return "ok" + + def kill(self): + self.returncode = -9 + + +def test_scheduler_starts_different_lock_groups_concurrently(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + altcoin_db.init_db() + scheduler_db.init_scheduler_tables() + monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc) + monkeypatch.setattr(scheduler, "DRY_RUN", False) + + jobs = { + "tracker": scheduler.Job("tracker", "tracker", 180, lock_group="tracking_write", enabled=True, next_run=0), + "screener": scheduler.Job("screener", "screener", 900, lock_group="screening_write", enabled=True, next_run=0), + } + running = {} + + scheduler.schedule_due_jobs(jobs, running) + + assert set(running) == {"tracker", "screener"} + + +def test_scheduler_blocks_shared_lock_and_prevents_reentry(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + altcoin_db.init_db() + scheduler_db.init_scheduler_tables() + monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc) + monkeypatch.setattr(scheduler, "DRY_RUN", False) + + jobs = { + "event": scheduler.Job("event", "event", 60, lock_group="recommendation_write", enabled=True, next_run=0), + "confirm": scheduler.Job("confirm", "confirm", 600, lock_group="recommendation_write", enabled=True, next_run=0), + } + running = {} + + scheduler.schedule_due_jobs(jobs, running) + + assert "event" in running + assert "confirm" not in running + assert jobs["confirm"].pending is True + + again = scheduler.start_job(jobs["event"], running) + assert again is False + assert len(running) == 1 + + +def test_disabled_job_does_not_auto_run_but_manual_force_can_start(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + altcoin_db.init_db() + scheduler_db.init_scheduler_tables() + monkeypatch.setattr(scheduler.subprocess, "Popen", _FakeProc) + monkeypatch.setattr(scheduler, "DRY_RUN", False) + + job = scheduler.Job("event", "event", 60, lock_group="recommendation_write", enabled=False, next_run=0) + running = {} + + scheduler.schedule_due_jobs({"event": job}, running) + assert running == {} + + assert scheduler.start_job(job, running, run_kind="manual", trigger_id=1) is True + assert "event" in running diff --git a/tests/test_sentiment_internal_events.py b/tests/test_sentiment_internal_events.py new file mode 100644 index 0000000..b42c03e --- /dev/null +++ b/tests/test_sentiment_internal_events.py @@ -0,0 +1,52 @@ +import os +import sqlite3 +import sys + +import pytest +from fastapi.testclient import TestClient + +PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if PROJECT_DIR not in sys.path: + sys.path.insert(0, PROJECT_DIR) + +from app.db import altcoin_db +from app.services.event_driven_screener import init_event_tables +from app.web import web_server + + +@pytest.fixture +def temp_db(monkeypatch, tmp_path): + db_path = tmp_path / "altcoin_monitor.db" + monkeypatch.setattr(altcoin_db, "DB_PATH", str(db_path)) + monkeypatch.setattr(web_server, "init_db", altcoin_db.init_db) + monkeypatch.setenv("ALPHAX_DB_PATH", str(db_path)) + altcoin_db.init_db() + init_event_tables() + return db_path + + +def _insert_event(db_path, title, event_type, symbol="SOL/USDT"): + conn = sqlite3.connect(db_path) + conn.execute( + """ + INSERT INTO event_news ( + event_hash, source, symbol, title, url, published_at, detected_at, + importance, event_type, raw_json, processed, decision, tech_score, rec_id, pushed + ) VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'), ?, ?, '{}', 0, '', 0, 0, 0) + """, + (f"hash-{event_type}-{title}", "coingecko", symbol, title, "https://example.com", "A", event_type), + ) + conn.commit() + conn.close() + + +def test_sentiment_api_hides_internal_theme_expansion_events(temp_db): + _insert_event(temp_db, "[主题扩散:solana_meme] SOL(Solana) enters CoinGecko Trending #4", "theme_expansion") + _insert_event(temp_db, "Binance Will List ABCUSDT Perpetual Contracts", "major_listing_or_contract", "ABC/USDT") + + client = TestClient(web_server.app) + resp = client.get("/api/sentiment?hours=24") + assert resp.status_code == 200 + titles = [item["title"] for item in resp.json()["events"]] + assert "Binance Will List ABCUSDT Perpetual Contracts" in titles + assert not any("主题扩散" in title for title in titles)