""" 山寨币爆发监控系统 — 推荐信号跟踪 + 策略交易执行账本 趋势反转检测:1H连续阴动K、量价背离、空头加速(替代MACD/RSI) 推荐层只管理信号状态;策略交易成交、TP/SL、移动止盈由 paper_trading 独立负责。 """ import sys, os, shutil # ⚠️ 安全机制:启动时强制清__pycache__,防止旧版字节码残留 for cache_dir in [ os.path.join(os.path.dirname(__file__), "__pycache__"), os.path.join(os.path.dirname(__file__), "..", "__pycache__"), ]: if os.path.exists(cache_dir): shutil.rmtree(cache_dir, ignore_errors=True) import ccxt import pandas as pd import json import sys import os from datetime import datetime from pathlib import Path sys.path.insert(0, os.path.dirname(__file__)) from app.db.altcoin_db import ( init_db, get_active_recommendations, update_recommendation_tracking, expire_old_recommendations, get_stats, update_recommendation_action_status, apply_recommendation_state_transition, log_cron_run, update_latest_price_cache, ) from app.db.paper_trading import sync_recommendation as sync_paper_trade from app.core.pa_engine import ( calc_atr, full_pa_analysis, detect_trend_exhaustion, analyze_entry_point, ) from app.config.config_loader import load_rules from app.core.opportunity_lifecycle import apply_entry_quality_gate from app.core.trade_math import pnl_pct as side_pnl_pct, should_stop_loss, should_take_profit from app.core.trade_direction import normalize_trade_side from app.db.paper_trading import sync_recommendation as sync_paper_trade exchange = ccxt.binance({"enableRateLimit": True}) REPO_ROOT = Path(__file__).resolve().parents[2] PROVISIONAL_BUY_SIGNAL_MARKERS = ( "可即刻入场", "当前价接近回踩目标", "回踩确认完毕", ) def fetch_klines(symbol, timeframe, limit=200): try: ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit) df = pd.DataFrame(ohlcv, columns=["timestamp", "open", "high", "low", "close", "volume"]) df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") return df except Exception: return None def _format_tracking_price(price): try: price = float(price) except Exception: return "" if price <= 0: return "" if price >= 1: return f"${price:.3f}" if price >= 0.01: return f"${price:.4f}" if price >= 0.0001: return f"${price:.6f}" return f"${price:.8f}" def reconcile_buy_signals_after_gate(buy_signals, final_action, gated_plan, gate_reasons): """买点质量闸门降级后,移除临时买入文案,保留最终终端指引。""" if final_action == "可即刻买入" or not gate_reasons: return list(buy_signals or []) filtered = [ str(signal) for signal in (buy_signals or []) if not any(marker in str(signal) for marker in PROVISIONAL_BUY_SIGNAL_MARKERS) ] first_reason = str(gate_reasons[0]) if final_action == "等回踩": target_price = ( (gated_plan or {}).get("rr_target_entry") or (gated_plan or {}).get("entry_price") or (gated_plan or {}).get("wait_price") ) price_text = _format_tracking_price(target_price) if price_text: filtered.append(f"🟡 现价不买,等待回踩至{price_text}附近;{first_reason}") else: filtered.append(f"🟡 现价不买,继续等待回踩;{first_reason}") elif final_action == "观察": filtered.append(f"🟡 买点未达标,保持观察;{first_reason}") return filtered def analyze_tracking_signals(symbol, rec, current_price): """ 对active推荐做动态跟踪分析 返回: {"action_status": str, "sell_signals": [...], "buy_signals": [...], "exhaustion": {...}, "entry_update": {...}} """ sell_signals = [] buy_signals = [] action_status = "持有" # 默认状态 entry_price = rec["entry_price"] stop_loss = rec.get("stop_loss", entry_price * 0.95) tp1 = rec.get("tp1", entry_price * 1.03) tp2 = rec.get("tp2", entry_price * 1.05) entry_plan = rec.get("entry_plan") or {} side = normalize_trade_side(entry_plan.get("side") or rec.get("side") or rec.get("direction")) # ---- 拉取1H数据做趋势分析 ---- h1_df = fetch_klines(symbol, "1h", limit=100) m15_df = fetch_klines(symbol, "15m", limit=100) h4_df = fetch_klines(symbol, "4h", limit=100) atr_1h = calc_atr(h1_df, 14) if h1_df is not None else 0 # ---- 趋势衰减检测 ---- exhaustion = {} if h1_df is not None and atr_1h > 0: exhaustion = detect_trend_exhaustion(h1_df, atr_1h) if exhaustion.get("exhausted"): for es in exhaustion.get("signals", []): sell_signals.append(es) if exhaustion["severity"] == "high": action_status = "衰减" elif exhaustion["severity"] == "medium": # 中度衰减,还持有但需关注 sell_signals.append("趋势中度衰减,关注止盈") # ---- 止盈信号检测 ---- pnl_pct = side_pnl_pct(side, entry_price, current_price) if entry_price > 0 else 0 # TP/SL 是策略交易生命周期,不再写成推荐信号动作。 if tp1 > 0 and should_take_profit(side, current_price, tp1): sell_signals.append(f"策略交易目标价已到达(${tp1:.4f}),执行结果以交易账本为准") rules = load_rules() if tp1 == 0 and pnl_pct >= 15: sell_signals.append(f"无TP保护但浮盈已达+{pnl_pct:.1f}%,仅作为信号风险提醒,是否平仓由策略交易/人工处理") # ---- 止损接近警告 ---- if stop_loss > 0: loss_pct = ((current_price / stop_loss) - 1) * 100 if side != "short" else ((stop_loss / current_price) - 1) * 100 if loss_pct < 3: # 当前价离止损不到3% sell_signals.append(f"⚠️ 接近止损!当前${current_price:.4f}离止损${stop_loss:.4f}仅{loss_pct:.1f}%") if should_stop_loss(side, current_price, stop_loss): op = "≥" if side == "short" else "≤" sell_signals.append(f"🔴 策略交易止损价已触达!${current_price:.4f}{op}${stop_loss:.4f},执行结果以交易账本为准") # ---- 趋势反转信号(PA行为检测,替代MACD) ---- if h1_df is not None and len(h1_df) >= 30 and atr_1h > 0: pa_1h = full_pa_analysis(h1_df, "1h") pa_1h_candles = pa_1h.get("candles_class", []) recent_candles = pa_1h_candles[-6:] if len(pa_1h_candles) >= 6 else pa_1h_candles # 1H连续阴动K → 趋势反转 dy_bear_count = sum(1 for c in recent_candles if c["type"] == "dynamic" and c["direction"] == -1) if dy_bear_count >= 3: sell_signals.append(f"🔴 1H连续{dy_bear_count}根阴动K(趋势反转)") if action_status == "持有": action_status = "反转" # 1H量价背离:放量但阴线(多头出货) avg_vol = float(h1_df["volume"].rolling(20).mean().iloc[-1]) recent_3 = h1_df.tail(3) high_vol_bear = 0 for _, row in recent_3.iterrows(): vol_r = row["volume"] / avg_vol if avg_vol > 0 else 0 if vol_r >= 2 and row["close"] < row["open"]: high_vol_bear += 1 if high_vol_bear >= 2: sell_signals.append("🔴 1H放量阴线×2(多头出货)") # 1H连续K空头加速 cont_k = pa_1h.get("continuous_k", []) for ck in cont_k: if ck["type"] == "bearish_continue" and ck["length"] >= 3: sell_signals.append(f"🔴 1H连续{ck['length']}K空头加速") if action_status == "持有": action_status = "反转" # === 时间衰减 (v1.6.9) === # 持仓>24h仍无盈利→降级衰减 decay_cfg = rules.get("tracker", {}).get("time_decay", {}) if decay_cfg.get("enabled", True): decay_hours = decay_cfg.get("decay_hours", 24) rec_time = rec.get("rec_time", "") if rec_time: try: t_rec = datetime.fromisoformat(rec_time) hours_held = (datetime.now() - t_rec).total_seconds() / 3600 if hours_held > decay_hours and pnl_pct <= 0 and action_status == "持有": sell_signals.append(f"⏰ 持仓{hours_held:.0f}h无盈利,降级衰减") action_status = "衰减" except Exception: pass # ---- 动态买入指引(只允许在未触发任何止盈/止损/退出信号时执行) ---- entry_update = {} current_action = entry_plan.get("entry_action", "") # 如果推荐状态是"等回踩",检查是否到了回踩价位。 # 注意:一旦本轮 action_status 已经是止盈/止损/跟踪止盈/反转/衰减,就绝不能再覆盖成“可即刻买入”。 if action_status == "持有" and current_action in ("等回踩", "🟡等回踩") and h4_df is not None and atr_1h > 0: pa_4h = full_pa_analysis(h4_df, "4h") h4_zones = pa_4h.get("zones", []) direction = -1 if side == "short" else 1 # 重新做15min入场点分析 if m15_df is not None and len(m15_df) >= 20: entry_result = analyze_entry_point( h1_df=h1_df, m15_df=m15_df, atr_1h=atr_1h, zones_4h=h4_zones, direction=direction, ) new_action = entry_result.get("action", "等回踩") if new_action == "即刻买入": buy_signals.append("🟢 反抽确认完毕!可开空(15min动K确认)" if side == "short" else "🟢 回踩确认完毕!可即刻入场(15min动K确认)") action_status = "可即刻买入" elif new_action == "等回踩": wait_price = entry_result.get("wait_price", 0) if wait_price > 0: # 检查当前价是否接近回踩目标 dist_pct = ((current_price / wait_price) - 1) * 100 if side != "short" else ((wait_price / current_price) - 1) * 100 if abs(dist_pct) < 2: buy_signals.append(f"🟢 当前价接近反抽目标!${current_price:.4f}≈${wait_price:.4f}" if side == "short" else f"🟢 当前价接近回踩目标!${current_price:.4f}≈${wait_price:.4f}") action_status = "可即刻买入" entry_update = { "new_action": new_action, "reason": entry_result.get("reason", ""), "wait_price": entry_result.get("wait_price", 0), } action_status, gated_plan, gate_reasons = apply_entry_quality_gate( action_status=action_status, entry_plan=entry_plan, signals=rec.get("signals"), current_price=current_price, market_context=rec.get("market_context") or {}, derivatives_context=rec.get("derivatives_context") or {}, sector_context=rec.get("sector_context") or {}, strategy_code=rec.get("strategy_code") or entry_plan.get("strategy_code"), ) if gate_reasons: buy_signals = reconcile_buy_signals_after_gate( buy_signals, action_status, gated_plan, gate_reasons, ) buy_signals.append("⚠️ 买点质量闸门: " + ";".join(gate_reasons[:3])) entry_plan.update(gated_plan) return { "action_status": action_status, "sell_signals": sell_signals, "buy_signals": buy_signals, "exhaustion": exhaustion, "entry_update": entry_update, "pnl_pct": round(pnl_pct, 2), } def track_prices(): """拉取所有active推荐币的实时价格,更新盈亏 + 动态跟踪信号""" recs = get_active_recommendations(actionable_only=False) if not recs: output = { "status": "no_active", "message": "无active推荐需要跟踪", "stats": get_stats(), "track_time": datetime.now().isoformat(), } print(json.dumps(output, ensure_ascii=False)) return output results = [] tracked_count = 0 observed_count = 0 failed_symbols = [] for rec in recs: symbol = rec["symbol"] try: ticker = exchange.fetch_ticker(symbol) current_price = ticker["last"] # 最新价格缓存:看板读取小表 latest_price_cache,不再依赖 price_tracking 高频流水表 update_latest_price_cache(symbol, current_price, source="tracker") # 基础盈亏/最大涨幅记录。未入场样本只做观察绩效,不触发 TP/SL。 track_result = update_recommendation_tracking(rec["id"], current_price) if not rec.get("entry_triggered") and rec.get("display_bucket") != "position" and rec.get("execution_status") not in ("holding", "completed"): results.append({ "symbol": symbol, "rec_id": rec["id"], "entry_price": rec["entry_price"], "current_price": current_price, "pnl_pct": track_result.get("pnl_pct"), "status": "observed_watch_only", "action_status": rec.get("action_status"), "sell_signals": [], "buy_signals": [], "exhaustion_severity": "low", }) observed_count += 1 print(f" {symbol}: 观察池样本仅更新观察价格/PnL,不触发止盈止损") continue # PA增强:动态跟踪信号分析 tracking_signals = analyze_tracking_signals(symbol, rec, current_price) # 统一状态迁移:tracker 只提交“候选状态 + 当前价”,最终状态由 DB 状态机统一落库。 # 飞书推送只能消费统一状态机返回的最终状态,不能再自行判断。 terminal_action = { "hit_tp2": "止盈2", "stopped_out": "止损", }.get(track_result.get("status")) requested_action = terminal_action or tracking_signals["action_status"] state_decision = apply_recommendation_state_transition( rec["id"], requested_action=requested_action, current_price=current_price, event_time=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), signals=tracking_signals.get("sell_signals", []) + tracking_signals.get("buy_signals", []), ) final_action = state_decision.get("action_status", requested_action) paper_result = sync_paper_trade( {**rec, **state_decision, "id": rec["id"], "symbol": symbol}, current_price, event_time=datetime.now().isoformat(), ) if paper_result.get("opened") or paper_result.get("closed") or paper_result.get("activated") or paper_result.get("moved"): print(f" {symbol}: paper trading event -> {paper_result}") results.append({ "symbol": symbol, "rec_id": rec["id"], "entry_price": rec["entry_price"], "current_price": current_price, "pnl_pct": tracking_signals["pnl_pct"], "status": track_result["status"], "action_status": tracking_signals["action_status"], "sell_signals": tracking_signals["sell_signals"], "buy_signals": tracking_signals["buy_signals"], "exhaustion_severity": tracking_signals.get("exhaustion", {}).get("severity", "low"), "paper_trade": paper_result, }) print(f" {symbol}: 入场${rec['entry_price']} → 现在${current_price} " f"盈亏{tracking_signals['pnl_pct']}% 状态={track_result['status']} " f"操作={tracking_signals['action_status']}") tracked_count += 1 except Exception as e: failed_symbols.append({"symbol": symbol, "error": str(e)}) print(f" {symbol}: 获取价格失败 - {e}") # 过期检查 expire_old_recommendations() output = { "status": "tracked", "tracked_count": tracked_count, "observed_count": observed_count, "failed_count": len(failed_symbols), "failed_symbols": failed_symbols, "results": results, "stats": get_stats(), "track_time": datetime.now().isoformat(), } print(json.dumps(output, ensure_ascii=False, indent=2)) return output def main(): started_at = datetime.now() try: init_db() output = track_prices() except Exception as e: finished_at = datetime.now() log_cron_run( job_name="跟踪", script_name="price_tracker.py", run_status="error", result_status="exception", started_at=started_at.isoformat(), finished_at=finished_at.isoformat(), duration_ms=int((finished_at - started_at).total_seconds() * 1000), summary={}, error_message=str(e), ) raise else: finished_at = datetime.now() summary = { "tracked_count": output.get("tracked_count", 0), "observed_count": output.get("observed_count", 0), "failed_count": output.get("failed_count", 0), "active_count": output.get("stats", {}).get("active_count", 0), } log_cron_run( job_name="跟踪", script_name="price_tracker.py", run_status="success", result_status=output.get("status", "completed"), started_at=started_at.isoformat(), finished_at=finished_at.isoformat(), duration_ms=int((finished_at - started_at).total_seconds() * 1000), summary=summary, error_message="", ) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="AlphaX Agent 价格跟踪任务") parser.add_argument("--once", action="store_true", default=True, help="执行单轮跟踪并输出结果") parser.parse_args() main()