diff --git a/docs/PROJECT_STRUCTURE.md b/docs/PROJECT_STRUCTURE.md index d93858a..a1b773e 100644 --- a/docs/PROJECT_STRUCTURE.md +++ b/docs/PROJECT_STRUCTURE.md @@ -9,7 +9,7 @@ - 校验脚本集中到 `scripts/` - 模板资源集中到 `templates/` - 运行/分析产物集中到 `reports/` -- 历史遗留与备份集中到 `legacy/` +- 历史遗留与备份已清理,不再作为常驻目录 ## 当前建议心智模型 @@ -89,7 +89,6 @@ ### 7. 非主路径归档 -- `legacy/` - `reports/` ## 已整理的文件 @@ -116,14 +115,6 @@ - `schema.py` -> `docs/reference/schema_reference.py` -### 移入 `legacy/` - -- `coin_state_tracker.py` -- `price_tracker_ws.py` -- `legacy/web/index.html` -- `legacy/static/app.html.bak` -- `legacy/scratch/*` - ## 后续建议 如果继续整理,建议下一步做实现层拆分,而不是再搬目录: diff --git a/legacy/coin_state_tracker.py b/legacy/coin_state_tracker.py deleted file mode 100644 index 8a0ce84..0000000 --- a/legacy/coin_state_tracker.py +++ /dev/null @@ -1,261 +0,0 @@ -""" -山寨币状态跟踪器 — 去重 + 状态升级管理 - -状态生命周期:蓄力 → 加速 → 爁发 → 已告警 → 过期 -只有状态升级才告警,同级别12h内不重复推送 -""" - -import sqlite3 -import json -from datetime import datetime, timedelta - -DB_PATH = "/home/ubuntu/quant_monitor/altcoin/altcoin_monitor.db" - -STATE_ORDER = { - "蓄力": 1, - "加速": 2, - "爆发": 3, - "已告警": 4, - "过期": 5, -} - -ALERT_LEVELS = { - "蓄力": "low", - "加速": "medium", - "爆发": "high", -} - -# 12h内同级别不重复告警 -ALERT_COOLDOWN_HOURS = 12 - -# 24h后状态自动过期 -EXPIRE_HOURS = 24 - - -def init_db(): - """初始化数据库""" - conn = sqlite3.connect(DB_PATH) - conn.execute(""" - CREATE TABLE IF NOT EXISTS coin_state ( - symbol TEXT PRIMARY KEY, - state TEXT NOT NULL, - score REAL DEFAULT 0, - anomaly_type TEXT DEFAULT '', - sector TEXT DEFAULT '', - leader_status TEXT DEFAULT '', - detected_at TEXT NOT NULL, - last_alert_time TEXT DEFAULT '', - last_alert_level TEXT DEFAULT '', - detail_json TEXT DEFAULT '{}' - ) - """) - conn.commit() - conn.close() - - -def get_state(symbol): - """获取币种当前状态""" - conn = sqlite3.connect(DB_PATH) - row = conn.execute("SELECT * FROM coin_state WHERE symbol=?", (symbol,)).fetchone() - conn.close() - if row: - return { - "symbol": row[0], - "state": row[1], - "score": row[2], - "anomaly_type": row[3], - "sector": row[4], - "leader_status": row[5], - "detected_at": row[6], - "last_alert_time": row[7], - "last_alert_level": row[8], - "detail": json.loads(row[9]) if row[9] else {}, - } - return None - - -def update_state(symbol, new_state, score=0, anomaly_type="", sector="", leader_status="", detail={}): - """ - 更新币种状态,判断是否需要告警 - 返回: {"should_alert": bool, "alert_level": str, "reason": str} - """ - current = get_state(symbol) - now = datetime.now().isoformat() - - should_alert = False - alert_level = "" - reason = "" - - if current: - current_level = STATE_ORDER.get(current["state"], 0) - new_level = STATE_ORDER.get(new_state, 0) - - # 状态升级 → 检查冷却时间 - if new_level > current_level: - last_alert_time = current.get("last_alert_time", "") or "" - last_alert_level_str = current.get("last_alert_level", "") or "" - - if not last_alert_time: - # 没有上次告警记录 → 状态升级肯定要告警 - should_alert = True - alert_level = ALERT_LEVELS.get(new_state, "low") - reason = f"状态升级(首次告警): {current['state']} → {new_state}" - else: - last_dt = datetime.fromisoformat(last_alert_time) - cooldown_end = last_dt + timedelta(hours=ALERT_COOLDOWN_HOURS) - - # 新级别比上次告警级别高 → 不管冷却期都要告警 - last_alert_num = STATE_ORDER.get(last_alert_level_str, 0) - if new_level > last_alert_num: - should_alert = True - alert_level = ALERT_LEVELS.get(new_state, "low") - reason = f"状态升级: {current['state']} → {new_state}" - elif now > cooldown_end.isoformat(): - # 冷却期过了,可以再告警 - should_alert = True - alert_level = ALERT_LEVELS.get(new_state, "low") - reason = f"冷却期结束,重新告警: {new_state}" - else: - should_alert = False - reason = f"冷却期内(上次告警: {last_alert_level_str} @ {last_alert_time})" - - # 状态降级 → 标记为"信号消退" - elif new_level < current_level: - should_alert = False - reason = f"信号消退: {current['state']} → {new_state}" - - # 同级别,分数显著提升(≥3分) → 也告警 - elif new_level == current_level and score - current["score"] >= 3: - last_alert_time = current.get("last_alert_time", "") or "" - if not last_alert_time: - should_alert = True - alert_level = ALERT_LEVELS.get(new_state, "low") - reason = f"同级别分数显著提升(首次): {current['score']} → {score}" - else: - last_dt = datetime.fromisoformat(last_alert_time) - cooldown_end = last_dt + timedelta(hours=ALERT_COOLDOWN_HOURS) - if now > cooldown_end.isoformat(): - should_alert = True - alert_level = ALERT_LEVELS.get(new_state, "low") - reason = f"同级别但分数显著提升: {current['score']} → {score}" - - # 更新记录 - conn = sqlite3.connect(DB_PATH) - alert_time = now if should_alert else current.get("last_alert_time", "") - alert_lvl = alert_level if should_alert else current.get("last_alert_level", "") - conn.execute(""" - UPDATE coin_state SET state=?, score=?, anomaly_type=?, sector=?, - leader_status=?, detail_json=?, last_alert_time=?, last_alert_level=? - WHERE symbol=? - """, (new_state, score, anomaly_type, sector, leader_status, - json.dumps(detail, ensure_ascii=False), alert_time, alert_lvl, symbol)) - conn.commit() - conn.close() - - else: - # 新币种,首次检测 - # 蓄力级别首次不告警(太多噪音),加速和爆发才告警 - if STATE_ORDER.get(new_state, 0) >= STATE_ORDER.get("加速", 0): - should_alert = True - alert_level = ALERT_LEVELS.get(new_state, "low") - reason = f"新检测: {new_state}" - - conn = sqlite3.connect(DB_PATH) - alert_time = now if should_alert else "" - alert_lvl = alert_level if should_alert else "" - conn.execute(""" - INSERT INTO coin_state (symbol, state, score, anomaly_type, sector, - leader_status, detected_at, last_alert_time, last_alert_level, detail_json) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, (symbol, new_state, score, anomaly_type, sector, leader_status, - now, alert_time, alert_lvl, json.dumps(detail, ensure_ascii=False))) - conn.commit() - conn.close() - - return {"should_alert": should_alert, "alert_level": alert_level, "reason": reason} - - -def get_all_active(): - """获取所有活跃状态(未过期的)""" - conn = sqlite3.connect(DB_PATH) - cutoff = (datetime.now() - timedelta(hours=EXPIRE_HOURS)).isoformat() - rows = conn.execute(""" - SELECT symbol, state, score, anomaly_type, sector, leader_status, detected_at - FROM coin_state WHERE detected_at > ? AND state != '过期' - ORDER BY score DESC - """, (cutoff,)).fetchall() - conn.close() - - return [{ - "symbol": r[0], "state": r[1], "score": r[2], - "anomaly_type": r[3], "sector": r[4], "leader_status": r[5], - "detected_at": r[6] - } for r in rows] - - -def get_candidates_for_confirm(): - """获取加速状态的候选(需要第三层确认的)""" - conn = sqlite3.connect(DB_PATH) - cutoff = (datetime.now() - timedelta(hours=EXPIRE_HOURS)).isoformat() - rows = conn.execute(""" - SELECT symbol, state, score, anomaly_type, sector, leader_status, detail_json - FROM coin_state WHERE state IN ('加速', '蓄力') AND detected_at > ? - AND score >= 6 - ORDER BY score DESC - """, (cutoff,)).fetchall() - conn.close() - - return [{ - "symbol": r[0], "state": r[1], "score": r[2], - "anomaly_type": r[3], "sector": r[4], "leader_status": r[5], - "detail": json.loads(r[6]) if r[6] else {}, - } for r in rows] - - -def expire_old_states(): - """过期超过24h的状态""" - conn = sqlite3.connect(DB_PATH) - cutoff = (datetime.now() - timedelta(hours=EXPIRE_HOURS)).isoformat() - conn.execute("UPDATE coin_state SET state='过期' WHERE detected_at < ? AND state != '过期'", (cutoff,)) - conn.commit() - conn.close() - - -if __name__ == "__main__": - init_db() - print("DB初始化完成") - - # 测试状态升级 - r1 = update_state("FET/USDT", "蓄力", score=3, anomaly_type="布林收窄+量突变", sector="AI_DePIN") - print(f"FET 蓄力: alert={r1['should_alert']}, reason={r1['reason']}") - - r2 = update_state("FET/USDT", "加速", score=8, anomaly_type="MACD金叉+RSI拐点", sector="AI_DePIN") - print(f"FET 加速: alert={r2['should_alert']}, reason={r2['reason']}") - - r3 = update_state("FET/USDT", "爆发", score=12, anomaly_type="1H放量突破", sector="AI_DePIN") - print(f"FET 爁发: alert={r3['should_alert']}, reason={r3['reason']}") - - # 测试同级别不重复 - r4 = update_state("FET/USDT", "爆发", score=13, anomaly_type="1H放量突破+均线多头", sector="AI_DePIN") - print(f"FET 爁发(重复): alert={r4['should_alert']}, reason={r4['reason']}") - - # 测试分数显著提升 - r5 = update_state("FET/USDT", "爆发", score=16, anomaly_type="三级共振", sector="AI_DePIN") - print(f"FET 爁发(分数升3+): alert={r5['should_alert']}, reason={r5['reason']}") - - # 测试新币种首次检测 - r6 = update_state("PEPE/USDT", "蓄力", score=3, sector="MEME") - print(f"PEPE 蓄力(首次): alert={r6['should_alert']}, reason={r6['reason']}") - - r7 = update_state("PEPE/USDT", "加速", score=8, sector="MEME") - print(f"PEPE 加速(首次): alert={r7['should_alert']}, reason={r7['reason']}") - - # 查看活跃状态 - active = get_all_active() - print(f"\n活跃状态: {len(active)}个") - for a in active: - print(f" {a['symbol']}: {a['state']} (score={a['score']})") - - # 查看需要确认的候选 - candidates = get_candidates_for_confirm() - print(f"\n需要确认的候选: {len(candidates)}个") \ No newline at end of file diff --git a/legacy/price_tracker_ws.py b/legacy/price_tracker_ws.py deleted file mode 100644 index b785d7a..0000000 --- a/legacy/price_tracker_ws.py +++ /dev/null @@ -1,209 +0,0 @@ -#!/usr/bin/env python3 -""" -山寨币实时价格监控 — ccxt REST 高频轮询版。 - -职责边界:本进程只做实时价格采集和“候选状态”提交;最终状态落库、是否推送、推送价格口径 -全部由 altcoin_db.apply_recommendation_state_transition 主链路决定。 -""" -import sys -import os -import time -import ccxt -from datetime import datetime - -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -from app.db.altcoin_db import ( - init_db, - get_active_recommendations_deduped, - update_recommendation_tracking, - apply_recommendation_state_transition, - should_push, - log_push, -) -from app.integrations.feishu_push import push_altcoin_tp_sl_alert -from app.core.opportunity_lifecycle import apply_entry_quality_gate - -POLL_INTERVAL = 5 -REFRESH_INTERVAL = 60 -BATCH_SIZE = 50 - -exchange = ccxt.binance({"enableRateLimit": True}) -last_refresh = 0 -active_map = {} - - -def load_active(): - try: - recs = get_active_recommendations_deduped(actionable_only=False) - # 只监控 active 最新去重记录;结案记录由主链路/网站展示,不在实时入口反复触发。 - return {r["symbol"]: r for r in recs if r.get("symbol") and r.get("status") == "active"} - except Exception as e: - print(f"[{datetime.now():%H:%M:%S}] 加载推荐失败: {e}", flush=True) - return {} - - -def check_triggers(symbol, rec, current_price): - """向后兼容旧测试:结案记录不得再产生入场触发。""" - if rec.get("status") != "active": - return None - action, signals = detect_candidate_action(rec, current_price, {}) - if not action: - return None - return {"action_status": action, "signals": signals, "pushable": action == "可即刻买入"} - - -def detect_candidate_action(rec, current_price, track_result): - """仅产生候选状态;不得在这里推送或落最终状态。""" - terminal_action = { - "hit_tp1": "止盈1", - "hit_tp2": "止盈2", - "stopped_out": "止损", - }.get((track_result or {}).get("status")) - if terminal_action: - return terminal_action, [f"状态机检测到{terminal_action}"] - - ep = rec.get("entry_plan") or {} - entry_action = ep.get("entry_action", "") or rec.get("initial_action", "") - plan_entry_price = ep.get("entry_price", 0) or 0 - entry_price = rec.get("entry_price", 0) or 0 - cur_action = rec.get("action_status", "持有") - - candidate_action = None - candidate_signals = [] - if entry_action == "等回踩" and plan_entry_price > 0 and current_price <= plan_entry_price: - ep["entry_trigger_confirmed"] = True - candidate_action = "可即刻买入" - candidate_signals = [f"回踩到位 ${plan_entry_price:.6f}"] - elif entry_action in ("即刻买入", "可即刻买入") and current_price <= (plan_entry_price or entry_price): - ep["entry_trigger_confirmed"] = True - candidate_action = "可即刻买入" - candidate_signals = ["入场条件仍满足"] - elif cur_action == "可即刻买入": - candidate_action = "可即刻买入" - candidate_signals = ["入场窗口延续"] - - if candidate_action: - raw_signals = rec.get("signals", []) - if isinstance(raw_signals, str): - try: - raw_signals = json.loads(raw_signals) - except Exception: - raw_signals = [raw_signals] if raw_signals else [] - gated_action, gated_plan, reasons = apply_entry_quality_gate( - action_status=candidate_action, - entry_plan=ep, - signals=raw_signals, - current_price=current_price, - market_context=rec.get("market_context") or {}, - derivatives_context=rec.get("derivatives_context") or {}, - sector_context=rec.get("sector_context") or {}, - ) - if gated_action != "可即刻买入": - reason = reasons[0] if reasons else "买点质量闸门降级" - return gated_action, [reason] - return gated_action, candidate_signals - return None, [] - - -def fetch_prices(symbols): - all_tickers = {} - for i in range(0, len(symbols), BATCH_SIZE): - batch = symbols[i:i + BATCH_SIZE] - try: - tickers = exchange.fetch_tickers(batch) - for s in batch: - if s in tickers: - all_tickers[s] = tickers[s] - except Exception as e: - print(f"[{datetime.now():%H:%M:%S}] 拉取价格失败(批次{i//BATCH_SIZE}): {e}", flush=True) - time.sleep(1) - return all_tickers - - -def maybe_push_from_decision(symbol, decision): - """推送层只消费主链路 decision,不自行判断状态。""" - action = decision.get("action_status") - if not decision.get("push_required"): - return - if not should_push(symbol, "entry", action): - print(f"[{datetime.now():%H:%M:%S}] ⏭ 跳过推送 {symbol}: entry/{action} 冷却中", flush=True) - return - push_altcoin_tp_sl_alert( - decision["push_symbol"], - decision["push_current_price"], - decision["push_entry_price"], - decision["push_pnl_pct"], - action, - decision.get("push_signals", []), - decision.get("stop_loss", 0), - decision.get("tp1", 0), - decision.get("tp2", 0), - ) - log_push(symbol, "entry", action, rec_id=decision.get("id", 0)) - print(f"[{datetime.now():%H:%M:%S}] 📲 {symbol} → {action} (盈亏{decision['push_pnl_pct']}%)", flush=True) - - -def main_loop(): - global active_map, last_refresh - init_db() - active_map = load_active() - last_refresh = time.time() - print(f"[{datetime.now():%H:%M:%S}] 🚀 实时价格监控启动,活跃: {len(active_map)}只,轮询间隔{POLL_INTERVAL}s", flush=True) - - while True: - loop_start = time.time() - if loop_start - last_refresh >= REFRESH_INTERVAL: - active_map = load_active() - last_refresh = loop_start - - if not active_map: - time.sleep(POLL_INTERVAL) - continue - - tickers = fetch_prices(list(active_map.keys())) - for symbol, rec in list(active_map.items()): - ticker = tickers.get(symbol) - if not ticker: - continue - current_price = ticker.get("last", 0) - if current_price <= 0: - continue - - try: - track_result = update_recommendation_tracking(rec["id"], current_price) or {} - except Exception as e: - print(f"[{datetime.now():%H:%M:%S}] 跟踪价格失败 {symbol}: {e}", flush=True) - continue - - action, signals = detect_candidate_action(rec, current_price, track_result) - if not action: - continue - - decision = apply_recommendation_state_transition( - rec["id"], - requested_action=action, - current_price=current_price, - event_time=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), - signals=signals, - ) - if decision.get("action_status") in ("止盈1", "止盈2", "止损"): - active_map.pop(symbol, None) - else: - rec["action_status"] = decision.get("action_status", rec.get("action_status")) - rec["entry_price"] = decision.get("entry_price", rec.get("entry_price")) - rec["current_price"] = decision.get("current_price", current_price) - rec["pnl_pct"] = decision.get("pnl_pct", rec.get("pnl_pct")) - active_map[symbol] = rec - - try: - maybe_push_from_decision(symbol, decision) - except Exception as e: - print(f"[{datetime.now():%H:%M:%S}] 推送失败 {symbol}: {e}", flush=True) - - elapsed = time.time() - loop_start - time.sleep(max(0.5, POLL_INTERVAL - elapsed)) - - -if __name__ == "__main__": - main_loop() diff --git a/legacy/web/index.html b/legacy/web/index.html deleted file mode 100644 index 70da6e5..0000000 --- a/legacy/web/index.html +++ /dev/null @@ -1,330 +0,0 @@ - - -
- - -