alphax/app/services/altcoin_confirm.py
2026-05-18 00:58:19 +08:00

1421 lines
62 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
山寨币爆发监控系统 v11 — 第三层:爆发确认 + 入场方案(纯前瞻版)
只用量价齐飞+PA起爆点+放量突破做确认不用MACD/RSI/均线
"""
import sys, os, shutil
# ⚠️ Safety mechanism: force-clear __pycache__ at startup so bytecode left over
# from an older version of the code cannot linger.
for cache_dir in (
    os.path.join(os.path.dirname(__file__), "__pycache__"),
    os.path.join(os.path.dirname(__file__), "..", "__pycache__"),
):
    if not os.path.exists(cache_dir):
        continue
    shutil.rmtree(cache_dir, ignore_errors=True)
import ccxt
import pandas as pd
import numpy as np
import json
import sys
import os
import time
import requests
from datetime import datetime, timedelta
from pathlib import Path
sys.path.insert(0, os.path.dirname(__file__))
from app.core.sector_map import get_burst_threshold, is_meme_coin, get_sector_for_coin, COIN_TO_SECTORS
from app.db.altcoin_db import (
init_db, expire_old_states, expire_old_recommendations,
get_candidates_for_confirm, update_state, get_conn, create_recommendation, log_screening,
log_cron_run, update_latest_price_cache, get_recommendation_for_push,
)
from app.integrations.push_orchestrator import push_mainline_state_update
from app.config.config_loader import (
get_strategy_direction,
vp_fly_params,
confirm_min_score,
confirm_volume_breakout_ratio,
confirm_atr_multipliers,
confirm_stop_loss_params,
get_strategy_params,
)
from app.core.opportunity_lifecycle import apply_entry_quality_gate
from app.core.opportunity_level import (
attach_opportunity_level,
classify_opportunity_level,
level_tp_parameters,
select_level_stop_loss,
)
from app.core.opportunity_funnel import build_screening_detail
from app.config.config_loader import _get_section as _get_cfg_section
from app.core.pa_engine import (
classify_candles, calc_atr, find_supply_demand_zones,
find_continuous_k, detect_ignition_point, full_pa_analysis,
analyze_entry_point, detect_trend_exhaustion,
)
# Module-level ccxt Binance client shared by fetch_klines(); it is created with
# the `enableRateLimit` option switched on.
exchange = ccxt.binance({"enableRateLimit": True})
# parents[2] of this file's resolved path, i.e. the directory three levels
# above the file itself.
REPO_ROOT = Path(__file__).resolve().parents[2]
def fetch_klines(symbol, timeframe, limit=200):
    """Fetch OHLCV candles through the module-level ``exchange`` client.

    Returns a DataFrame with the columns timestamp/open/high/low/close/volume,
    where ``timestamp`` has been converted from epoch milliseconds with
    ``pd.to_datetime``. Any exception (network, exchange, parsing) is swallowed
    and ``None`` is returned instead.
    """
    columns = ["timestamp", "open", "high", "low", "close", "volume"]
    try:
        candles = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        frame = pd.DataFrame(candles, columns=columns)
        frame["timestamp"] = pd.to_datetime(frame["timestamp"], unit="ms")
    except Exception:
        return None
    return frame
def symbol_recently_closed(symbol: str, hours: int = 8) -> bool:
    """Return True if ``symbol`` has a finished trade (TP or SL) in the last ``hours`` hours.

    Used as a cool-down: a coin that has just hit take-profit should not be
    chased again immediately.

    Args:
        symbol: Value matched against ``recommendation.symbol``.
        hours: Width of the look-back window, counted back from the current UTC time.

    Returns:
        True when at least one matching row exists, otherwise False.
    """
    from datetime import datetime, timezone, timedelta
    from app.db.schema import get_conn

    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    conn = get_conn()
    try:
        row = conn.execute("""
SELECT COUNT(*) FROM recommendation
WHERE symbol = %s AND status IN ('hit_tp1', 'hit_tp2', 'stopped_out')
AND COALESCE(hit_tp1_time, hit_tp2_time, stopped_out_time, '') >= %s
""", (symbol, cutoff)).fetchone()
    finally:
        # Release the connection even when the query raises; previously an
        # exception skipped close() and leaked the connection.
        conn.close()
    if row is None:
        return False
    return (row[0] or 0) > 0
def _event_time_from_age(df, age_bars: int):
    """Map a bar age to the timestamp of the corresponding K-line.

    ``age_bars`` counts back from the newest row (0 = last row of ``df``). The
    value of the ``timestamp`` column at that row is returned, converted with
    ``to_pydatetime()`` when the value offers that method. ``None`` is returned
    for a missing frame/age, an age outside the frame, or any error.
    """
    if df is None or age_bars is None:
        return None
    try:
        row_count = len(df)
        position = row_count - 1 - int(age_bars)
        if not 0 <= position < row_count:
            return None
        stamp = df["timestamp"].iloc[position]
        if hasattr(stamp, "to_pydatetime"):
            return stamp.to_pydatetime()
        return stamp
    except Exception:
        return None
def _is_candidate_fresh(cand, event_times, max_hours=6):
    """Candidate freshness check: a current trigger or a recent entry into the pool.

    Prevents an old structure from being confirmed again and again.

    Args:
        cand: Mapping for the candidate; ``detected_at``, ``updated_at`` and
            ``created_at`` are consulted, in that order.
        event_times: Iterable of datetime-like trigger times (falsy entries are
            skipped). Naive values are interpreted as UTC, which is what
            ``fetch_klines`` produces via ``pd.to_datetime(..., unit="ms")``;
            aware values are converted to UTC.
        max_hours: Maximum age, in hours, that still counts as fresh.

    Returns:
        ``(fresh, reason, events)`` where ``reason`` is one of
        ``"current_trigger"``, ``"fresh_candidate_state"``,
        ``"stale_structure_background_only"`` or
        ``"structure_candidate_unknown_age"`` and ``events`` is a list of
        ``{"time", "age_hours"}`` dicts backing the decision.
    """
    from datetime import timezone  # the module header imports only datetime/timedelta

    now_aware = datetime.now(timezone.utc)
    now_utc_naive = now_aware.replace(tzinfo=None)
    now_local_naive = datetime.now()

    fresh_events = []
    for t in event_times or []:
        if not t:
            continue
        try:
            # Compare in UTC. The previous code subtracted host-local "now" from
            # UTC K-line times and dropped tz offsets without converting, which
            # skewed ages by the UTC offset on any non-UTC host.
            if t.utcoffset() is not None:
                t_utc = t.astimezone(timezone.utc).replace(tzinfo=None)
            else:
                t_utc = t.replace(tzinfo=None)
            age_h = (now_utc_naive - t_utc).total_seconds() / 3600
            if age_h <= max_hours:
                fresh_events.append({"time": t.isoformat(), "age_hours": round(age_h, 2)})
        except Exception:
            # Best effort: entries that are not datetime-like are ignored.
            pass
    if fresh_events:
        return True, "current_trigger", fresh_events

    detected_at = cand.get("detected_at") or cand.get("updated_at") or cand.get("created_at") or ""
    try:
        if detected_at:
            text = str(detected_at).strip()
            if text.endswith(("Z", "z")):
                # fromisoformat() only accepts a trailing "Z" on newer Pythons.
                text = text[:-1] + "+00:00"
            try:
                parsed = datetime.fromisoformat(text)
            except ValueError:
                # Fall back to the legacy parsing (offset stripped, not converted).
                parsed = datetime.fromisoformat(str(detected_at).replace("Z", "").split("+")[0])
            if parsed.utcoffset() is not None:
                # An explicit offset is honoured instead of being discarded.
                age_h = (now_aware - parsed).total_seconds() / 3600
            else:
                # NOTE(review): naive DB timestamps are still compared with host-local
                # time, as before — confirm which timezone the DB writes.
                age_h = (now_local_naive - parsed).total_seconds() / 3600
            if age_h <= max_hours:
                return True, "fresh_candidate_state", [{"time": str(detected_at), "age_hours": round(age_h, 2)}]
            # coin_state.detected_at is refreshed on every scan, so it is not a
            # valid "current trigger". Past the window it is only historical
            # structure background; confirmation must rely on a new trigger
            # such as current K-lines or news.
            return False, "stale_structure_background_only", [{"time": str(detected_at), "age_hours": round(age_h, 2)}]
    except Exception:
        pass
    return False, "structure_candidate_unknown_age", []
def _build_trigger_context(fresh_reason, fresh_events, vp_data=None, stale_vp_count=0, stale_1h_ignitions=None, stale_d1_ignitions=None, bp_daily=None, entry_action=""):
    """Build a user-auditable trigger context.

    Separates what is triggering right now (``current_triggers``) from
    historical background (``stale_background``) and derives an overall
    ``trigger_status`` / ``trigger_label``. ``is_current_opportunity`` is True
    only for the ``current_technical`` and ``fresh_candidate`` statuses.
    """
    # --- triggers that count as current ---
    current = []
    if (vp_data or {}).get("vp_fly_count", 0) > 0:
        current.append({"type": "technical", "label": "当前1H量价齐飞", "source": "binance_ohlcv_1h"})

    # --- historical background entries ---
    stale = []
    if stale_vp_count:
        stale.append({"type": "technical", "label": "历史1H量价齐飞", "source": "binance_ohlcv_1h", "count": stale_vp_count})
    for label, source, ignitions in (
        ("历史1H起爆点", "pa_engine_1h", stale_1h_ignitions or []),
        ("历史日线起爆点", "pa_engine_1d", stale_d1_ignitions or []),
    ):
        if ignitions:
            stale.append({"type": "technical", "label": label, "source": source, "count": len(ignitions)})

    # Every fresh event becomes a current trigger; keys of the event itself
    # take precedence over the defaults listed before it.
    current.extend(
        {"type": "technical", "label": "当前结构触发", "source": "pa_engine", **event}
        for event in (fresh_events or [])
    )

    if (bp_daily or {}).get("detected"):
        stale.append({"type": "technical_background", "label": "日线底部突破回踩背景", "source": "daily_structure"})

    # --- overall status; the stale reason wins over everything else ---
    if fresh_reason == "stale_structure_background_only":
        status, label = "stale_background_only", "历史结构背景缺少当前K线触发"
    elif current:
        status, label = "current_technical", "当前K线/形态触发"
    elif fresh_reason == "fresh_candidate_state":
        status, label = "fresh_candidate", "新近进入候选池"
    else:
        status, label = "background", "结构背景观察"

    return {
        "trigger_status": status,
        "trigger_label": label,
        "fresh_reason": fresh_reason or "",
        "current_triggers": current,
        "stale_background": stale,
        "entry_action": entry_action or "",
        "is_current_opportunity": status in ("current_technical", "fresh_candidate"),
    }
# ==================== Context data enrichment ====================
def _spot_to_futures(symbol):
    """Remove every "/" from a spot pair, e.g. BTC/USDT → BTCUSDT."""
    return "".join(symbol.split("/"))
def fetch_derivatives_context(symbol):
    """Collect derivatives sentiment data from the Binance futures HTTP API.

    Four independent requests are made, each with a 5 s timeout; a failure of
    one (exception or non-200 status) only leaves its keys out.

    Returns:
        dict that may contain ``funding_rate``, ``open_interest``,
        ``top_trader_long_pct`` and ``top_trader_long_short_ratio``.
        ``open_interest_change_24h`` is always present: it falls back to 0 when
        it could not be computed, so the result is never completely empty.
    """
    futures_sym = _spot_to_futures(symbol)
    ctx = {}

    def _get_json(url):
        # Decoded JSON body for an HTTP 200 response, otherwise None.
        resp = requests.get(url, timeout=5)
        return resp.json() if resp.status_code == 200 else None

    # 1. Funding rate
    try:
        premium = _get_json(f"https://fapi.binance.com/fapi/v1/premiumIndex?symbol={futures_sym}")
        if premium is not None:
            ctx["funding_rate"] = float(premium.get("lastFundingRate", 0) or 0)
    except Exception:
        pass

    # 2. Open interest
    try:
        oi_now = _get_json(f"https://fapi.binance.com/fapi/v1/openInterest?symbol={futures_sym}")
        if oi_now is not None:
            ctx["open_interest"] = float(oi_now.get("openInterest", 0) or 0)
    except Exception:
        pass

    # 3. Top-trader long/short account ratio (two 5m rows requested, the last one is used)
    try:
        ratios = _get_json(
            f"https://fapi.binance.com/futures/data/topLongShortAccountRatio"
            f"?symbol={futures_sym}&period=5m&limit=2"
        )
        if ratios is not None and len(ratios) >= 1:
            newest = ratios[-1]
            long_pct = round(float(newest["longAccount"]) * 100, 1)
            short_pct = round(float(newest["shortAccount"]) * 100, 1)
            ctx["top_trader_long_pct"] = long_pct
            if short_pct > 0:
                ctx["top_trader_long_short_ratio"] = round(long_pct / short_pct, 2)
            else:
                ctx["top_trader_long_short_ratio"] = 0
    except Exception:
        pass

    # 4. 24h open-interest change, from two daily openInterestHist rows
    try:
        hist = _get_json(
            f"https://fapi.binance.com/futures/data/openInterestHist"
            f"?symbol={futures_sym}&period=1d&limit=2"
        )
        if hist is not None and len(hist) >= 2:
            oi_yesterday = float(hist[0].get("sumOpenInterestValue", 0) or 0)
            oi_today = float(hist[1].get("sumOpenInterestValue", 0) or 0)
            if oi_yesterday > 0:
                ctx["open_interest_change_24h"] = round(
                    (oi_today - oi_yesterday) / oi_yesterday * 100, 1
                )
    except Exception:
        pass

    # Fallback: report 0 when no 24h OI change is available.
    ctx.setdefault("open_interest_change_24h", 0)
    return ctx
def compute_market_context(h1_df, price):
    """Derive market-heat context from already fetched 1H K-lines.

    Returns a dict with ``volume_24h``, ``turnover_acceleration_1h``,
    ``turnover_acceleration_4h`` and ``change_24h``. A missing frame or fewer
    than two rows yields ``{}``; if an error occurs part-way through, the keys
    computed up to that point are returned.
    """
    ctx = {}
    try:
        if h1_df is None or len(h1_df) < 2:
            return ctx
        bars = len(h1_df)
        volume = h1_df["volume"]

        # 24h volume: sum of the latest 24 hourly candles.
        ctx["volume_24h"] = round(float(volume.tail(24).sum()), 0)

        # 1H acceleration: newest candle versus the 20-candle rolling mean.
        mean_20 = float(volume.rolling(20).mean().iloc[-1])
        accel_1h = 1.0
        if mean_20 > 0:
            accel_1h = round(float(volume.iloc[-1]) / mean_20, 2)
        ctx["turnover_acceleration_1h"] = accel_1h

        # 4H acceleration: last 4 candles versus the per-4h mean of the 12 before them.
        if bars < 16:
            ctx["turnover_acceleration_4h"] = float(ctx.get("turnover_acceleration_1h", 1.0))
        else:
            last_4h = float(volume.tail(4).sum())
            baseline_4h = float(volume.iloc[-16:-4].sum()) / 3  # mean per 4h slot
            if baseline_4h > 0:
                ctx["turnover_acceleration_4h"] = round(
                    last_4h / 4 / (baseline_4h / 4), 2
                )
            else:
                ctx["turnover_acceleration_4h"] = 1.0

        # 24h change of ``price`` relative to the close 24 rows back.
        change = 0
        if bars >= 24:
            ref_close = float(h1_df["close"].iloc[-24])
            if ref_close > 0:
                change = round((price - ref_close) / ref_close * 100, 1)
        ctx["change_24h"] = change
    except Exception:
        pass
    return ctx
def compute_sector_context(symbol, cand_detail=None):
    """Infer sector-linkage context for ``symbol``.

    A ``sector_context`` already present in ``cand_detail`` (computed by the
    upstream screening stages) is preferred; otherwise values are derived
    locally from ``get_sector_for_coin``.

    Returns:
        dict with ``sectors``, ``hot_sectors``, ``leader_symbol`` and
        ``leader_move_pct``. On an error, whatever was filled in so far is
        returned.
    """
    ctx = {}
    try:
        detail = cand_detail or {}
        sectors = get_sector_for_coin(symbol)
        ctx["sectors"] = sectors

        upstream = detail.get("sector_context") or {}
        upstream_hot = upstream.get("hot_sectors") or []
        upstream_leader = upstream.get("leader_symbol") or ""

        # Hot sectors: upstream list first, else the coin's first sector, else empty.
        ctx["hot_sectors"] = upstream_hot or (sectors[:1] if sectors else [])
        ctx["leader_symbol"] = upstream_leader

        # leader_move_pct: a non-zero upstream leader_pct wins; otherwise fall
        # back to the coin's own change_24h from cand_detail.
        leader_pct = upstream.get("leader_pct")
        if leader_pct is None or leader_pct == 0:
            leader_pct = detail.get("change_24h", 0) or 0
        ctx["leader_move_pct"] = round(float(leader_pct), 1)
    except Exception:
        pass
    return ctx
# ==================== Confirmation logic ====================
def detect_volume_price_fly_1h(df_1h):
    """Confirmation-layer detection of 1H "volume-price surge" candles.

    A surge candle is a bullish candle among the latest 12 rows whose volume
    ratio (against the 20-row rolling mean at the last row) and body share of
    the high-low range both reach the thresholds from ``vp_fly_params()``.
    Only candles no older than ``max_signal_age_hours`` (default 1, i.e. the
    two newest candles) count as current; older ones are reported as stale
    background and must not serve as a current confirmation.

    Args:
        df_1h: 1H OHLCV DataFrame; ``None`` or fewer than 20 rows yields an
            all-empty result.

    Returns:
        dict with ``vp_fly_count``, ``max_vol_ratio``, ``vp_fly_details``,
        ``stale_vp_fly_details``, ``stale_vp_fly_count`` and
        ``latest_vp_age_hours``. The same keys are present on the
        insufficient-data path (previously the last two were missing there).
    """
    if df_1h is None or len(df_1h) < 20:
        return {
            "vp_fly_count": 0,
            "max_vol_ratio": 0,
            "vp_fly_details": [],
            "stale_vp_fly_details": [],
            "stale_vp_fly_count": 0,
            "latest_vp_age_hours": None,
        }

    vp_cfg = vp_fly_params()
    # Thresholds are loop-invariant, so read them once.
    vol_ratio_min = vp_cfg.get("vol_ratio_min", 5.0)
    body_pct_min = vp_cfg.get("body_ratio_min", 0.70) * 100
    max_signal_age_hours = vp_cfg.get("max_signal_age_hours", 1)

    avg_vol = df_1h["volume"].rolling(20).mean().iloc[-1]
    recent = df_1h.tail(12)
    newest_pos = len(recent) - 1

    max_vol_ratio = 0
    vp_fly_details = []
    stale_vp_fly_details = []
    for i, (_, row) in enumerate(recent.iterrows()):
        vol_ratio = row["volume"] / avg_vol if avg_vol > 0 else 0
        # Small epsilon keeps a zero-range candle from dividing by zero.
        body_pct = abs(row["close"] - row["open"]) / (row["high"] - row["low"] + 0.00001) * 100
        direction = 1 if row["close"] > row["open"] else -1
        age_hours = newest_pos - i  # 0 = newest candle
        # Tracked over all 12 candles, not only the ones that qualify.
        max_vol_ratio = max(max_vol_ratio, vol_ratio)
        if vol_ratio >= vol_ratio_min and body_pct >= body_pct_min and direction == 1:
            detail = {"vol_ratio": round(vol_ratio, 1), "body_pct": round(body_pct, 0), "age_hours": age_hours, "direction": ""}
            if age_hours <= max_signal_age_hours:
                vp_fly_details.append(detail)
            else:
                detail["stale"] = True
                stale_vp_fly_details.append(detail)

    return {
        "vp_fly_count": len(vp_fly_details),
        "max_vol_ratio": round(max_vol_ratio, 1),
        "vp_fly_details": vp_fly_details,
        "stale_vp_fly_details": stale_vp_fly_details,
        "stale_vp_fly_count": len(stale_vp_fly_details),
        "latest_vp_age_hours": min((d.get("age_hours", 999) for d in vp_fly_details), default=None),
    }
def _recent_pa_items(items, max_age_bars, direction=None):
    """Split PA events into current/near-current triggers and stale ones.

    Events whose ``direction`` differs from the requested one are dropped from
    both lists. An event is recent when its ``age_bars`` is present and not
    greater than ``max_age_bars``; everything else is stale.

    Returns:
        ``(recent, stale)`` — two lists preserving the input order.
    """
    fresh, expired = [], []
    for entry in items or []:
        if direction is not None and entry.get("direction") != direction:
            continue
        age = entry.get("age_bars")
        if age is None and entry.get("index") is not None:
            # Legacy shape: only an index is known, so treat the event as very old.
            age = 999
        is_recent = age is not None and age <= max_age_bars
        (fresh if is_recent else expired).append(entry)
    return fresh, expired
def detect_breakout_pullback(df, timeframe="1d"):
    """Detect and score a bottom → breakout → pullback structure.

    Pattern: a bottom forms → a key resistance is broken → the pullback holds
    → ignition is confirmed.

    Args:
        df: OHLCV DataFrame; the "open", "high", "low", "close" and "volume"
            columns are read. ``None`` or fewer than 50 rows returns the
            default "not detected" result.
        timeframe: Label used as the prefix of every emitted signal string.

    Returns:
        dict with the keys ``detected``, ``score``, ``signals``,
        ``entry_zone`` (the breakout level, rounded to 6 decimals),
        ``stop_level`` (3% below the bottom low, rounded to 6 decimals) and
        ``quality`` (a label derived from the score; empty below 6).
    """
    result = {"detected": False, "score": 0, "signals": [], "entry_zone": None, "stop_level": None, "quality": ""}
    if df is None or len(df) < 50:
        return result
    closes = df["close"].values
    highs = df["high"].values
    lows = df["low"].values
    volumes = df["volume"].values
    # 1. Find swing lows (local minima).
    # NOTE(review): the window lows[i-10:i+5] covers 10 bars before and 4 bars
    # after index i, not a symmetric 5/5 window.
    swing_lows = []
    for i in range(10, len(closes) - 5):
        if lows[i] == min(lows[i-10:i+5]):
            swing_lows.append({"idx": i, "low": lows[i], "close": closes[i]})
    if len(swing_lows) < 2:
        return result
    # 2. Pick the most significant recent bottom: the lowest swing low within
    # the last 60 bars, falling back to the last five swing lows.
    recent_lows = [sl for sl in swing_lows if sl["idx"] >= len(closes) - 60]
    if not recent_lows:
        recent_lows = swing_lows[-5:]
    bottom = min(recent_lows, key=lambda x: x["low"])
    # 3. Find the first swing high after the bottom (the breakout target).
    swing_highs = []
    for i in range(10, len(closes) - 5):
        if highs[i] == max(highs[i-10:i+5]):
            swing_highs.append({"idx": i, "high": highs[i]})
    # Keep only swing highs located after the bottom.
    post_bottom_highs = [sh for sh in swing_highs if sh["idx"] > bottom["idx"] and sh["idx"] < len(closes) - 5]
    if not post_bottom_highs:
        # Fall back to the highest high from the bottom up to now.
        max_idx = bottom["idx"] + np.argmax(highs[bottom["idx"]:])
        post_bottom_highs = [{"idx": int(max_idx), "high": highs[int(max_idx)]}]
    breakout_level = post_bottom_highs[0]["high"]
    # 4. Breakout check: any close within the last 20 bars above the breakout level.
    recent_closes = closes[-20:]
    broke_out = any(c > breakout_level * 1.005 for c in recent_closes)  # 0.5% buffer
    if not broke_out:
        return result
    # 5. Pullback check: did price fall back after the breakout, and is it
    # still holding above the breakout level now?
    current_price = closes[-1]
    recent_lows_arr = lows[-10:]
    pullback_low = min(recent_lows_arr)
    # Pullback size = (highest high of last 20 bars - lowest low of last 10 bars) / breakout level.
    post_breakout_high = max(highs[-20:])
    pullback_pct = (post_breakout_high - pullback_low) / breakout_level * 100 if breakout_level > 0 else 0
    # Current price must hold above the breakout level (within a 2% tolerance).
    holding = current_price >= breakout_level * 0.98
    if not holding and pullback_low < breakout_level * 0.96:
        return result  # broke down: the pattern failed
    # 6. Scoring
    score = 0
    signals = []
    # ---- Bottom quality (candle shape + volume/price relation, instead of counting days) ----
    bottom_idx = bottom["idx"]
    bottom_price = float(bottom["low"])
    # Observation window: 15 bars on either side of the bottom bar.
    zone_start = max(0, bottom_idx - 15)
    zone_end = min(len(closes), bottom_idx + 15)
    zone_closes = closes[zone_start:zone_end]
    # ① Volume contraction around the bottom (selling exhausted): mean volume of
    # the 10 bars from the bottom versus the (up to) 20 bars before it.
    pre_vol = np.mean(volumes[max(0, bottom_idx - 20):bottom_idx]) if bottom_idx >= 20 else np.mean(volumes[:bottom_idx])
    post_vol = np.mean(volumes[bottom_idx:min(len(volumes), bottom_idx + 10)])
    vol_shrink_ratio = post_vol / pre_vol if pre_vol > 0 else 1
    if vol_shrink_ratio <= 0.7:
        score += 4
        signals.append(f"{timeframe} 底部缩量({vol_shrink_ratio:.1f}x)")
    elif vol_shrink_ratio <= 0.9:
        score += 2
        signals.append(f"{timeframe} 底部量稳({vol_shrink_ratio:.1f}x)")
    # ② Candle bodies narrowing (bulls and bears balanced): mean body in the
    # bottom zone versus the mean body of the last 20 bars.
    df_opens = df["open"].values[zone_start:zone_end]
    df_closes = zone_closes
    bodies_arr = np.abs(df_closes - df_opens)
    body_mean = np.mean(bodies_arr)
    body_recent_mean = np.mean(np.abs(closes[max(0, len(closes)-20):] - df["open"].values[max(0, len(closes)-20):]))
    body_shrink = body_mean / body_recent_mean if body_recent_mean > 0 else 1
    if body_shrink <= 0.6:
        score += 3
        signals.append(f"{timeframe} K线缩量收敛")
    # ③ Reversal pattern near the bottom (hammer / bullish engulfing); the
    # first match wins and ends the scan.
    reversal_found = False
    for i in range(max(0, bottom_idx - 3), min(len(closes), bottom_idx + 3)):
        o = float(df["open"].values[i])
        c = float(closes[i])
        h = float(highs[i])
        l = float(lows[i])
        body = abs(c - o)
        # NOTE(review): upper_wick is computed but not used below.
        upper_wick = h - max(o, c)
        lower_wick = min(o, c) - l
        total_range = h - l if h > l else 0.0001
        # Hammer: small bullish body with a lower wick more than twice the body.
        if body < total_range * 0.3 and lower_wick > body * 2 and c > o:
            reversal_found = True
            signals.append(f"{timeframe} 锤子线反转")
            score += 3
            break
        if i > 0:
            prev_o = float(df["open"].values[i-1])
            prev_c = float(closes[i-1])
            # Bullish engulfing: bearish bar followed by a bullish bar whose body spans it.
            if prev_c < prev_o and c > o and c > prev_o and o < prev_c:
                reversal_found = True
                signals.append(f"{timeframe} 看涨吞没")
                score += 4
                break
    if not reversal_found:
        # Check for morning star (3-candle pattern)
        for i in range(max(0, bottom_idx - 5), min(len(closes) - 2, bottom_idx + 1)):
            c1 = float(closes[i])
            c2 = float(closes[i+1])
            c3 = float(closes[i+2])
            o1 = float(df["open"].values[i])
            o3 = float(df["open"].values[i+2])
            if (c1 < o1) and abs(c2 - o1) < abs(c1 - o1) * 0.5 and c3 > o3 and c3 > c1:
                reversal_found = True
                signals.append(f"{timeframe} 晨星反转")
                score += 4
                break
    # ④ Accumulation after the bottom (up-bar volume above down-bar volume)
    # over the 15 bars starting at the bottom.
    up_vols = []
    down_vols = []
    for i in range(bottom_idx, min(len(closes), bottom_idx + 15)):
        if float(closes[i]) >= float(df["open"].values[i]):
            up_vols.append(float(volumes[i]))
        else:
            down_vols.append(float(volumes[i]))
    if up_vols and down_vols:
        up_avg = np.mean(up_vols)
        down_avg = np.mean(down_vols)
        if up_avg > down_avg * 1.3:
            score += 3
            signals.append(f"{timeframe} 阳线放量吸筹({up_avg/down_avg:.1f}x)")
        elif up_avg > down_avg:
            score += 1
    # ⑤ Retests of the bottom zone (within 3% of the bottom low) with smaller
    # volume on the latest touch (selling pressure gone).
    bottom_zone = bottom_price * 1.03
    touches = []
    for i in range(bottom_idx + 5, len(lows)):
        if float(lows[i]) <= bottom_zone and i > bottom_idx + 3:
            touches.append((i, float(volumes[i])))
    # NOTE(review): the nesting of the following elif was reconstructed from a
    # whitespace-stripped source — confirm against the original file. As
    # nested here, its condition is always true once this branch is entered.
    if len(touches) >= 2:
        first_touch_vol = touches[0][1]
        last_touch_vol = touches[-1][1]
        if last_touch_vol < first_touch_vol * 0.7:
            score += 4
            signals.append(f"{timeframe} 回踩缩量确认({len(touches)}次)")
        elif len(touches) >= 2:
            score += 2
            signals.append(f"{timeframe} 多次回踩({len(touches)}次)")
    elif len(touches) == 1:
        score += 1
    # ⑥ Time since the bottom formed (more bars = more mature base, low weight).
    bars_from_bottom = len(closes) - bottom_idx
    if bars_from_bottom >= 30:
        score += 2
        signals.append(f"{timeframe} 筑底{bars_from_bottom}")
    elif bars_from_bottom >= 15:
        score += 1
    # Breakout volume confirmation: latest bar's volume versus the median of
    # the last 30 bars (mean of the last 20 when fewer than 30 are available);
    # only >= 3x counts as a high-volume breakout.
    breakout_vol = float(volumes[-1])
    avg_vol = float(np.median(volumes[-30:])) if len(volumes) >= 30 else float(np.mean(volumes[-20:]))
    vol_ratio = breakout_vol / avg_vol if avg_vol > 0 else 1
    if vol_ratio >= 3.0:
        score += 6
        signals.append(f"{timeframe} 突破放量({vol_ratio:.1f}x)")
    elif vol_ratio >= 2.0:
        score += 3
        signals.append(f"{timeframe} 突破量能确认({vol_ratio:.1f}x)")
    # Pullback quality (shallow pullback = strong support).
    if pullback_pct <= 3:
        score += 5
        signals.append(f"{timeframe} 浅回踩({pullback_pct:.1f}%)")
    elif pullback_pct <= 6:
        score += 3
        signals.append(f"{timeframe} 正常回踩({pullback_pct:.1f}%)")
    else:
        score += 1
    # Position of the current price relative to the breakout level, in percent.
    price_above_breakout = (current_price - breakout_level) / breakout_level * 100
    if price_above_breakout >= 1:
        score += 3
        signals.append(f"{timeframe} 站稳突破位+{price_above_breakout:.1f}%")
    elif price_above_breakout >= 0:
        score += 2
    elif price_above_breakout >= -2:
        score += 1
        signals.append(f"{timeframe} 回踩突破位(-{abs(price_above_breakout):.1f}%)")
    # Quality label from the total score.
    if score >= 15:
        quality = "优质"
    elif score >= 10:
        quality = "良好"
    elif score >= 6:
        quality = "可观察"
    else:
        quality = ""
    result["detected"] = True
    result["score"] = score
    result["signals"] = signals
    result["entry_zone"] = round(float(breakout_level), 6)
    result["stop_level"] = round(float(bottom["low"] * 0.97), 6)  # stop placed 3% below the bottom
    result["quality"] = quality
    return result
def confirm_burst(symbol, cand):
"""对单个候选做爆发确认v1.7.0:强共振旁路+量价齐飞双门控)
cand: coin_state行数据含leader_status/detail_json等
确认条件=量价齐飞K≥1 OR (起爆点≥10× + 蓄力≥4根 + 辅助信号≥1)
不用MACD/RSI/均线
"""
score = 0
signals = []
confirmed = False
entry_plan = {}
# 提取cand数据v1.7.0:用于辅助信号检测)
cand_detail = json.loads(cand.get("detail_json", "{}"))
leader_status = cand.get("leader_status", "")
h1_df = fetch_klines(symbol, "1h", limit=100)
m15_df = fetch_klines(symbol, "15m", limit=100)
h4_df = fetch_klines(symbol, "4h", limit=100)
d1_df = fetch_klines(symbol, "1d", limit=120) # 日线趋势安全检查+突破回踩
current_trigger_times = []
if h1_df is None or len(h1_df) < 50:
return {"confirmed": False, "score": 0, "signals": ["数据不足"], "entry_plan": {},
"pa_1h": {}, "pa_15min": {}, "pa_1d": {}, "m30_aligned": False,
"market_context": {}, "derivatives_context": {}, "sector_context": {}}
price = float(h1_df["close"].iloc[-1])
atr_1h = calc_atr(h1_df, 14)
confirm_cfg = _get_cfg_section("confirm")
pa_recency_cfg = confirm_cfg.get("pa_recency", {})
# ---- 1H量价行为核心前瞻信号 ----
vol_avg = float(h1_df["volume"].rolling(20).mean().iloc[-1])
vol_latest = float(h1_df["volume"].iloc[-1])
vol_ratio = vol_latest / vol_avg if vol_avg > 0 else 1
vp_data = detect_volume_price_fly_1h(h1_df)
vp_fly_count = vp_data["vp_fly_count"]
for d in vp_data.get("vp_fly_details", []):
t = _event_time_from_age(h1_df, d.get("age_hours"))
if t:
current_trigger_times.append(t)
stale_vp_count = vp_data.get("stale_vp_fly_count", 0)
# 量价齐飞K≥2 → 极强确认
if vp_fly_count >= 2:
signals.append(f"1H {vp_fly_count}根量价齐飞K(最强确认)")
score += 8
elif vp_fly_count == 1:
signals.append(f"1H 量价齐飞K(量{vp_data['max_vol_ratio']}x)")
score += 5
elif vp_data.get("stale_vp_fly_count", 0) > 0:
stale = vp_data.get("stale_vp_fly_details", [{}])[0]
signals.append(f"1H历史放量阳线已过期({stale.get('age_hours')}小时前, 量{stale.get('vol_ratio')}x)")
# 1H放量≥3x但不是量价齐飞=量价背离)
if vol_ratio >= 3 and vp_fly_count == 0:
signals.append(f"1H放量({vol_ratio:.1f}x)但无量价齐飞(量价背离)")
score += 1 # 低权重:量价背离是假信号
# ---- PA引擎4H级别阻力/支撑) ----
pa_4h = full_pa_analysis(h4_df, "4h") if h4_df is not None and len(h4_df) >= 30 else {}
h4_zones = pa_4h.get("zones", [])
high_q_supply = [z for z in h4_zones if z["type"] == "supply" and z["q_score"] >= 7]
resistance = None
if high_q_supply:
resistance = high_q_supply[0]["top"]
# ---- v1.7.7: 日线 PA 全分析(供需区 + 起爆点 + 动K高权重----
# 日线是最大的时间框架,信号强度远高于小时级
pa_1d = {}
d1_zones = []
if d1_df is not None and len(d1_df) >= 50:
pa_1d = full_pa_analysis(d1_df, "1d")
d1_zones = pa_1d.get("zones", [])
# 日线起爆点只承认最近1根日线内发生更早的日线起爆属于背景不当作当前触发。
d1_ignitions = pa_1d.get("ignition_points", [])
recent_d1_ignitions, stale_d1_ignitions = _recent_pa_items(d1_ignitions, confirm_cfg.get("pa_recency", {}).get("d1_ignition_max_age_bars", 1))
for ig in recent_d1_ignitions[-3:]:
if ig["direction"] == 1:
signals.append(f"日线 {ig['signal_type']}(强度{ig['strength_ratio']}×)")
score += 6 # 日线×1.5 vs 4H的+4
elif ig["direction"] == -1:
signals.append(f"日线 {ig['signal_type']}(强度{ig['strength_ratio']}×)")
score += 3
for ig in recent_d1_ignitions:
if ig.get("direction") == 1:
t = _event_time_from_age(d1_df, ig.get("age_bars"))
if t:
current_trigger_times.append(t)
if stale_d1_ignitions:
ig = stale_d1_ignitions[-1]
signals.append(f"日线历史起爆点已过期({ig.get('age_bars', '?')}根前, 强度{ig.get('strength_ratio')}×)")
# 日线连续动K(阳) — 日线趋势确认
d1_candles = pa_1d.get("candles_class", [])
recent_d1 = d1_candles[-5:] if len(d1_candles) >= 5 else d1_candles
dy_d1 = sum(1 for c in recent_d1 if c["type"] == "dynamic" and c["direction"] == 1)
if dy_d1 >= 3:
signals.append(f"日线 {dy_d1}动K(阳)趋势确认")
score += 5
elif dy_d1 >= 1:
signals.append(f"日线 {dy_d1}动K(阳)")
score += 2
# 日线需求区反弹 — 最强结构信号
d1_demand = [z for z in d1_zones if z["type"] == "demand" and z["q_score"] >= 5]
if d1_demand and price > 0:
nearest = min(d1_demand, key=lambda z: abs(z["top"] - price))
if nearest["top"] < price < nearest["top"] * 1.15:
signals.append(f"日线需求区反弹(Q={nearest['q_score']} ${nearest['top']:.4f})")
score += 6
# ---- 1H放量突破4H高质量阻力 ----
breakout_confirmed = False
if resistance and vol_ratio >= confirm_volume_breakout_ratio():
prev_close_1h = float(h1_df["close"].iloc[-2])
if price > resistance and prev_close_1h < resistance:
q_info = f"Q={high_q_supply[0]['q_score']}" if high_q_supply else ""
signals.append(f"1H放量突破阻力${resistance:.4f}({q_info}{vol_ratio:.1f}x)")
breakout_confirmed = True
elif vol_ratio >= 5:
# 5x以上放量即有效突破山寨币历史数据不足以形成阻力区
signals.append(f"1H极放量({vol_ratio:.1f}x)")
score += 2
# ---- PA引擎1H级别分析 ----
pa_1h = full_pa_analysis(h1_df, "1h") if atr_1h > 0 else {}
# ---- PA起爆点检测(1H)只承认最近2根1H内发生 ----
pa_1h_ignitions = pa_1h.get("ignition_points", [])
recent_1h_ignitions, stale_1h_ignitions = _recent_pa_items(
pa_1h_ignitions,
pa_recency_cfg.get("h1_ignition_max_age_bars", 1),
)
ignition_confirmed = False
for ig in recent_1h_ignitions[-3:]:
if ig["direction"] == 1:
signals.append(f"1H {ig['signal_type']}(强度{ig['strength_ratio']}×)")
score += 4
ignition_confirmed = True
elif ig["direction"] == -1:
signals.append(f"1H {ig['signal_type']}(强度{ig['strength_ratio']}×)")
score += 2
for ig in recent_1h_ignitions:
if ig.get("direction") == 1:
t = _event_time_from_age(h1_df, ig.get("age_bars"))
if t:
current_trigger_times.append(t)
if stale_1h_ignitions:
ig = stale_1h_ignitions[-1]
signals.append(f"1H历史起爆点已过期({ig.get('age_bars', '?')}根前, 强度{ig.get('strength_ratio')}×)")
# ---- 1H动K(阳)+量递增 ----
pa_1h_candles = pa_1h.get("candles_class", [])
recent_1h = pa_1h_candles[-6:] if len(pa_1h_candles) >= 6 else pa_1h_candles
dy_1h = sum(1 for c in recent_1h if c["type"] == "dynamic" and c["direction"] == 1)
if dy_1h >= 3:
recent_1h_df = h1_df.tail(dy_1h + 1)
vol_increasing = True
for i in range(1, len(recent_1h_df)):
if float(recent_1h_df["volume"].iloc[i]) < float(recent_1h_df["volume"].iloc[i-1]):
vol_increasing = False
break
if vol_increasing:
signals.append(f"1H {dy_1h}动K(阳)+量递增")
score += 3
else:
signals.append(f"1H {dy_1h}动K(阳)")
score += 1
# ---- 1H趋势衰减检测 ----
pa_1h_exhaustion = pa_1h.get("trend_exhaustion", {})
if pa_1h_exhaustion.get("exhausted"):
for es in pa_1h_exhaustion.get("signals", []):
signals.append(f"⚠️ {es}")
if pa_1h_exhaustion["severity"] == "high":
score -= 3
elif pa_1h_exhaustion["severity"] == "medium":
score -= 1
# ---- v1.7.7: 30min 桥接(填补 1H→15min 缺口)----
# 30min 是中间周期:确认 1H 趋势在中等周期是否有结构支撑
m30_df = fetch_klines(symbol, "30m", limit=60)
m30_aligned = False
if m30_df is not None and len(m30_df) >= 30 and atr_1h > 0:
pa_30min = full_pa_analysis(m30_df, "30m")
m30_candles = pa_30min.get("candles_class", [])
recent_30 = m30_candles[-8:] if len(m30_candles) >= 8 else m30_candles
dy_30 = sum(1 for c in recent_30 if c["type"] == "dynamic" and c["direction"] == 1)
st_30 = sum(1 for c in recent_30 if c["type"] == "static")
# 30min 与 1H 方向对齐阳动K≥3 且 阴动K≤1
if dy_30 >= 3 and sum(1 for c in recent_30 if c["type"] == "dynamic" and c["direction"] == -1) <= 1:
signals.append(f"30min {dy_30}阳动K(与1H共振)")
score += 3
m30_aligned = True
elif st_30 >= 4 and dy_30 >= 1:
# 30min 蓄力中静K多+少量动K— 待突破,不加分但也不扣
signals.append(f"30min 蓄力({st_30}静K+{dy_30}阳动K)")
m30_aligned = True # 不扣分,视为中性偏多
elif sum(1 for c in recent_30 if c["type"] == "dynamic" and c["direction"] == -1) >= 3:
signals.append("⚠️ 30min 阴动K≥3(与1H背离)")
score -= 2
# else: 无明确信号,不干预
# ---- PA引擎15min入场点分析 ----
pa_15min_result = {}
entry_action = "等回踩" # 默认保守PA引擎会覆盖
# v1.7.8: PA引擎始终调用有15min数据即可不再依赖 direction 变量
# direction=0 时仍可基于 h1_df + m15_df 独立判断入场时机
direction = 1 if (vp_fly_count >= 1 or ignition_confirmed or breakout_confirmed) else 0
if m15_df is not None and len(m15_df) >= 20 and atr_1h > 0:
# 始终调用 PA 引擎,不限制 direction。PA 内部自己判断即刻买入/等回踩/放弃
pa_15min_result = analyze_entry_point(
h1_df=h1_df, m15_df=m15_df, atr_1h=atr_1h,
zones_4h=h4_zones if h4_zones else [], direction=max(direction, 1),
)
entry_action = pa_15min_result.get("action", "等回踩")
if pa_15min_result.get("breakout_k_info"):
bk = pa_15min_result["breakout_k_info"]
t = _event_time_from_age(m15_df, bk.get("age_bars"))
if t:
current_trigger_times.append(t)
atr_ratio = bk.get("atr_ratio", 0)
if atr_ratio > 2.0:
signals.append(f"15min 强突破K线(ATR×{atr_ratio:.1f})")
score += 3
elif atr_ratio > 1.5:
signals.append(f"15min 突破K线(ATR×{atr_ratio:.1f})")
score += 2
if pa_15min_result.get("pullback_info"):
pb = pa_15min_result["pullback_info"]
signals.append(f"15min 回踩确认(${pb.get('low', 0):.4f}→${pb.get('high', 0):.4f})")
score += 2
if pa_15min_result.get("false_breakout"):
signals.append("⚠️ 15min假突破排除")
score -= 5
if entry_action == "即刻买入":
signals.append("🟢 15min即刻入场信号")
score += 3
elif entry_action == "等回踩":
wait_price = pa_15min_result.get("wait_price", 0)
if wait_price > 0:
signals.append(f"🟡 15min等回踩到${wait_price:.4f}")
elif entry_action == "放弃":
signals.append("🔴 15min无入场信号")
# ---- 日线底部突破回踩检测(提前到门控前,供高位过滤器复用)----
# 复用已拉取的 d1_df零额外API调用
bp_daily = {"detected": False}
try:
if d1_df is not None and len(d1_df) >= 50:
bp_daily = detect_breakout_pullback(d1_df, "日线")
except Exception:
pass
# ---- 底部突破回踩加分(必须在最终确认判定前生效)----
if bp_daily.get("detected"):
signals.extend(bp_daily.get("signals", []))
score += min(bp_daily["score"], 12)
# ---- 最终确认判定v1.7.0:双门控 — 量价齐飞 OR 强共振旁路)----
# 门控A量价齐飞K ≥1保留历史最可靠
# 门控B强共振旁路 — 起爆点≥10× + 蓄力≥4根 + 辅助信号≥1
# WIF案例: 起爆点15×+板块联动→等2天才放量$0.193→$0.211(+9.3%)
fresh_ok, fresh_reason, fresh_events = _is_candidate_fresh(
cand,
current_trigger_times,
max_hours=confirm_cfg.get("current_trigger_max_age_hours", 6),
)
if not fresh_ok:
signals.append("⛔ 候选过期无近6小时当前触发避免旧结构反复确认")
confirmed = fresh_ok and (vp_fly_count >= 1)
if not confirmed and fresh_ok:
# 旧放量/旧起爆不能直接确认;但如果当前 15min/日线/结构给出强分,
# 允许作为“历史强背景 + 当前结构确认”。避免修复时效后把所有结构型机会一刀切为0。
structure_gate_score = confirm_cfg.get("structure_gate_min_score", 12)
current_trigger_ok = bool(current_trigger_times)
recent_candidate_ok = (fresh_reason == "fresh_candidate_state")
if score >= structure_gate_score and entry_action in ("即刻买入", "可即刻买入") and (current_trigger_ok or recent_candidate_ok):
if fresh_reason != "stale_structure_background_only" and (stale_vp_count > 0 or stale_1h_ignitions or stale_d1_ignitions or bp_daily.get("detected")):
signals.append(f"🟡 历史强背景+当前结构确认(score≥{structure_gate_score})")
confirmed = True
# ---- v1.7.0: 强共振旁路(在量价齐飞门控未过时启用)----
bypass_confirmed = False
if fresh_ok and not confirmed:
bypass_cfg = _get_cfg_section("confirm").get("strong_resonance_bypass", {})
if bypass_cfg.get("enabled", True):
# 1. 最大起爆点强度1H + 4H + 日线)
max_ig_strength = 0
for ig in recent_1h_ignitions[-5:]:
if ig.get("direction") == 1:
max_ig_strength = max(max_ig_strength, ig.get("strength_ratio", 0))
if pa_4h:
pa_4h_ignitions = pa_4h.get("ignition_points", [])
recent_4h_ignitions, _ = _recent_pa_items(pa_4h_ignitions, pa_recency_cfg.get("h4_ignition_max_age_bars", 1))
for ig in recent_4h_ignitions[-5:]:
if ig.get("direction") == 1:
max_ig_strength = max(max_ig_strength, ig.get("strength_ratio", 0))
if pa_1d:
d1_ignitions = pa_1d.get("ignition_points", [])
recent_d1_for_bypass, _ = _recent_pa_items(d1_ignitions, pa_recency_cfg.get("d1_ignition_max_age_bars", 1))
for ig in recent_d1_for_bypass[-5:]:
if ig.get("direction") == 1:
max_ig_strength = max(max_ig_strength, ig.get("strength_ratio", 0) * 1.5) # 日线权重1.5×
# 2. 静K蓄力计数1H最近12根
static_k_count = sum(
1 for c in (pa_1h_candles[-12:] if pa_1h_candles else [])
if c.get("type") == "static"
)
# 3. 辅助信号计数
aux_count = 0
# 板块联动leader_status含"龙头"
if leader_status and "龙头" in str(leader_status):
aux_count += 1
# 大户偏多:>55%
deriv = cand_detail.get("derivatives_context", {})
top_long = deriv.get("top_trader_long_pct") if deriv else None
if top_long is not None and top_long > 55:
aux_count += 1
# 日线筑底
if bp_daily.get("detected"):
aux_count += 1
# 舆情共振screener给了sentiment_bonus
sentiment_bonus = cand_detail.get("sentiment_bonus")
if sentiment_bonus is not None and sentiment_bonus > 0:
aux_count += 1
min_ig = bypass_cfg.get("min_ignition_strength", 10)
min_static = bypass_cfg.get("min_static_k_count", 4)
min_aux = bypass_cfg.get("min_aux_signals", 1)
if max_ig_strength >= min_ig and static_k_count >= min_static and aux_count >= min_aux:
bonus = bypass_cfg.get("score_bonus", 3)
signals.append(
"🔥 强共振旁路(起爆{}×+蓄力{}根+辅助{})".format(
max_ig_strength, static_k_count, aux_count
)
)
bypass_confirmed = True
confirmed = True
score += bonus
# ---- 日线趋势安全检查v1.6.3----
# 日线持续走低 → 不确认即使有1H量价齐飞
if confirmed and d1_df is not None and len(d1_df) >= 20:
d1_closes = d1_df["close"].values.astype(float)
d1_sma20 = float(d1_closes[-20:].mean())
d1_last = float(d1_closes[-1])
# 近5日趋势净涨幅 < 0 且至少3天在跌
recent5 = d1_closes[-5:]
net_change_pct = (float(recent5[-1]) - float(recent5[0])) / float(recent5[0]) * 100
down_days = sum(1 for i in range(1, len(recent5)) if float(recent5[i]) <= float(recent5[i-1]))
if d1_last < d1_sma20 and net_change_pct < -2 and down_days >= 3:
signals.append(f"⛔ 日线持续走低(价{d1_last:.5f}<MA20 {d1_sma20:.5f}, 5日跌{net_change_pct:.1f}%),拒绝确认")
confirmed = False
# ---- 高位过滤器v1.6.6 → v1.7.8优化门槛从硬编码80%改为读取config----
# 高位币价在60日区间range_percentile%分位 + 近7日涨幅>gain_7d%+ 无底部突破信号 → 拒绝
# v1.7.8: range_percentile 从80%降到70%(数据:高位>70%入场TP率仅1%
# confirm_cfg 已在函数开头读取,供高位过滤+等回踩降权共用
if confirmed and d1_df is not None and len(d1_df) >= 60:
d1_cl_all = d1_df["close"].values.astype(float)
d1_hi_all = d1_df["high"].values.astype(float)
max_60d = float(d1_hi_all[-60:].max())
min_60d = float(d1_cl_all[-60:].min())
cur_close = float(d1_cl_all[-1])
range_60d = max_60d - min_60d
position_pct = (cur_close - min_60d) / range_60d * 100 if range_60d > 0 else 50
close_7d = float(d1_cl_all[-7])
gain_7d = (cur_close - close_7d) / close_7d * 100 if close_7d > 0 else 0
high_cfg = confirm_cfg.get("high_position", {})
high_percentile = high_cfg.get("range_percentile", 70)
high_gain = high_cfg.get("gain_7d_pct", 15)
is_high = position_pct > high_percentile and gain_7d > high_gain
has_bottom = bp_daily.get("detected", False)
if is_high and not has_bottom:
signals.append(
f"⛔ 高位无底部突破拒绝(区间{position_pct:.0f}%, 7日涨{gain_7d:.0f}%)"
)
confirmed = False
confirmed = confirmed and score >= confirm_min_score()
# === 等回踩降权 (v1.6.9) ===
# 数据: 等回踩37%失败率 vs 持有0%
# confirm_cfg 已在上面高位过滤器中获取
pullback_cfg = confirm_cfg.get("pullback_penalty", {})
if pullback_cfg.get("enabled", True) and entry_action == "等回踩":
penalty = pullback_cfg.get("score_deduction", 3)
score -= penalty
signals.append(f"⚠️ 等回踩降权(-{penalty}分)")
confirmed = confirmed and score >= confirm_min_score()
# 假突破排除
if pa_15min_result.get("false_breakout"):
confirmed = False
# 衰减严重时排除
if pa_1h_exhaustion.get("severity") == "high":
confirmed = False
# ---- 入场方案 ----
stop_cfg = confirm_stop_loss_params()
if confirmed and atr_1h > 0:
if entry_action == "即刻买入" and pa_15min_result:
entry_price = round(float(price), 6)
entry_method = "🟢15min即刻入场(突破进行中)"
elif entry_action == "等回踩" and pa_15min_result.get("wait_price", 0) > 0:
entry_price = round(pa_15min_result["wait_price"], 6)
entry_method = f"🟡等回踩到${entry_price:.4f}(15min静K确认)"
else:
last_k_low = float(h1_df["low"].iloc[-1])
atr_multipliers = confirm_atr_multipliers()
entry_price = round(float(last_k_low + atr_multipliers.get("entry_offset", 0.5) * atr_1h), 6)
entry_method = "1H突破回踩确认(突破K线低点+0.5ATR)"
# 🔴 v1.7.6 修复: 等回踩/突破回踩的入场价不能高于当前市价。
# 出现这种情况意味着价格在目标之下运行,不是真正的"回踩等待"。
# 此时应取当前价作为入场参考或取当前价×0.995作为保守目标。
if entry_action != "即刻买入" and entry_price > price:
entry_price = round(float(price), 6)
entry_method = f"{entry_method}(入场价已修正为当前价${price:.4f}"
signals.append("⚠️ 入场方案价高于现价,已修正为当前市价")
# === ATR动态止损 (v1.6.8 → v1.7.1) ===
# 止损% = max(2×ATR_1h/price, 5%地板)min(止损%, 10%天花板)
atr_stop_pct = (atr_1h * stop_cfg.get("atr_mult", 2.0)) / price
stop_pct_final = max(atr_stop_pct, stop_cfg.get("floor_pct", 0.05))
stop_pct_final = min(stop_pct_final, stop_cfg.get("ceiling_pct", 0.10))
atr_stop_price = round(float(price * (1 - stop_pct_final)), 6)
# 收集所有止损候选,再按机会级别选择紧/中/宽止损。
stop_candidates = [atr_stop_price]
# Q≥5需求区兜底有结构支撑优先用需求区底部
demand_zones = [z for z in h4_zones if z["type"] == "demand" and z["q_score"] >= stop_cfg.get("demand_zone_min_q", 5)]
if demand_zones:
zone_stop = round(demand_zones[0]["btm"] * stop_cfg.get("zone_discount", 0.98), 6)
stop_candidates.append(zone_stop)
# v1.7.1: 结构止损 — 最近swing_low下方AR案例入场$2.21→最低$2.06→$2.06止损存活)
swing_lookback = stop_cfg.get("swing_low_lookback", 12)
swing_buffer = stop_cfg.get("swing_low_buffer", 0.98)
if len(h1_df) >= swing_lookback:
swing_low = float(h1_df["low"].tail(swing_lookback).min())
if swing_low < price:
swing_stop = round(swing_low * swing_buffer, 6)
# 只取有效的结构止损(在入场价下方,且不过于极端)
if swing_stop > price * 0.82:
stop_candidates.append(swing_stop)
signals.append(f"结构止损(swing_low${swing_low:.4f}×{swing_buffer})")
level_meta = classify_opportunity_level(
signals=signals,
entry_plan={
"entry_action": entry_action,
"entry_price": entry_price,
"current_price": round(float(price), 6),
"pa_15min_summary": pa_15min_result.get("reason", ""),
},
market_context=compute_market_context(h1_df, price),
derivatives_context=cand_detail.get("derivatives_context", {}),
sector_context=cand_detail.get("sector_context", {}),
m30_aligned=m30_aligned,
)
opportunity_level = level_meta.get("opportunity_level", "structure_watch")
stop_loss, stop_basis = select_level_stop_loss(
level=opportunity_level,
price=price,
entry_price=entry_price,
stop_candidates=stop_candidates,
)
if stop_loss <= 0:
stop_loss = min(stop_candidates)
stop_pct_final = max((price - stop_loss) / price, 0) if price > 0 and stop_loss > 0 else stop_pct_final
# === 分级动态止盈 ===
# 日内启动更重视近端兑现;结构/主题级别使用更宽目标。
atr_multipliers = confirm_atr_multipliers()
level_tp = level_tp_parameters(opportunity_level)
tp1_atr_pct = (atr_1h * level_tp.get("tp1_atr", atr_multipliers.get("tp1", 3.0))) / price
tp1_pct = max(tp1_atr_pct, level_tp.get("tp1_floor", atr_multipliers.get("tp1_floor", 0.05)))
tp1_candidates = [round(float(price * (1 + tp1_pct)), 6)]
if high_q_supply:
tp1_candidates.append(round(high_q_supply[0]["top"], 6))
tp1 = min(tp1_candidates)
tp2_atr_pct = (atr_1h * level_tp.get("tp2_atr", atr_multipliers.get("tp2", 5.0))) / price
tp2_pct = max(tp2_atr_pct, level_tp.get("tp2_floor", atr_multipliers.get("tp2_floor", 0.08)))
tp2 = round(float(price * (1 + tp2_pct)), 6)
risk = price - stop_loss
reward1 = tp1 - price
reward2 = tp2 - price
rr1 = round(reward1 / risk, 2) if risk > 0 else 0
rr2 = round(reward2 / risk, 2) if risk > 0 else 0
entry_plan = {
"entry_price": entry_price,
"entry_method": entry_method,
"entry_action": entry_action,
"stop_loss": stop_loss,
"stop_pct": round(stop_pct_final * 100, 1),
"tp1": tp1,
"tp2": tp2,
"rr1": rr1, "rr2": rr2,
"atr_1h": round(float(atr_1h), 6),
"current_price": round(float(price), 6),
"risk_reward_ok": rr1 >= 1.5,
"pa_15min_summary": pa_15min_result.get("reason", ""),
"pa_1h_exhaustion": pa_1h_exhaustion.get("severity", "low"),
"stop_basis": stop_basis,
"tp_basis": level_meta.get("tp_model", ""),
}
entry_plan = attach_opportunity_level(entry_plan, level_meta)
# v1.7.5 买点质量闸门:确认强势 ≠ 允许现价追买。
gated_action, gated_plan, gate_reasons = apply_entry_quality_gate(
action_status="可即刻买入" if entry_action in ("即刻买入", "可即刻买入") else "等回踩",
entry_plan=entry_plan,
signals=signals,
current_price=price,
market_context=compute_market_context(h1_df, price),
derivatives_context=cand_detail.get("derivatives_context", {}),
sector_context=cand_detail.get("sector_context", {}),
)
if bypass_confirmed and vp_fly_count == 0 and not current_trigger_times and gated_action == "可即刻买入":
gated_action = "等回踩"
gated_plan["entry_quality_gate"] = {
"blocked_action": "可即刻买入",
"final_action": "等回踩",
"reasons": ["强共振旁路缺少当前1H/15min触发最高进入等待回踩"],
}
gate_reasons.append("强共振旁路缺少当前1H/15min触发最高进入等待回踩")
entry_plan = gated_plan
entry_plan["entry_action"] = gated_action
if gate_reasons:
signals.append("⚠️ 买点质量闸门: " + "".join(gate_reasons[:3]))
if gated_action == "观察":
score -= 2
# 周线突破回踩(需独立拉取)
bp_weekly = {"detected": False}
try:
w1_df = fetch_klines(symbol, "1w", limit=52)
if w1_df is not None and len(w1_df) >= 30:
bp_weekly = detect_breakout_pullback(w1_df, "周线")
except Exception:
pass
if bp_weekly.get("detected"):
signals.extend(bp_weekly.get("signals", []))
score += min(bp_weekly["score"], 10)
# ---- 计算上下文数据 ----
market_context = compute_market_context(h1_df, price)
derivatives_context = fetch_derivatives_context(symbol)
sector_context = compute_sector_context(symbol, cand_detail)
trigger_context = _build_trigger_context(
fresh_reason if 'fresh_reason' in locals() else "",
fresh_events if 'fresh_events' in locals() else [],
vp_data=vp_data if 'vp_data' in locals() else {},
stale_vp_count=stale_vp_count if 'stale_vp_count' in locals() else 0,
stale_1h_ignitions=stale_1h_ignitions if 'stale_1h_ignitions' in locals() else [],
stale_d1_ignitions=stale_d1_ignitions if 'stale_d1_ignitions' in locals() else [],
bp_daily=bp_daily if 'bp_daily' in locals() else {},
entry_action=entry_action,
)
market_context["trigger_context"] = trigger_context
return {
"confirmed": confirmed,
"score": score,
"signals": signals,
"entry_plan": entry_plan if confirmed else {},
"price": round(float(price), 6),
"pa_1h": pa_1h,
"pa_15min": pa_15min_result,
"pa_1d": pa_1d,
"m30_aligned": m30_aligned,
"entry_action": entry_action,
"market_context": market_context,
"derivatives_context": derivatives_context,
"sector_context": sector_context,
"fresh_reason": fresh_reason if 'fresh_reason' in locals() else "",
"fresh_events": fresh_events if 'fresh_events' in locals() else [],
"trigger_context": trigger_context if 'trigger_context' in locals() else {},
}
def _emit_output(output, compact: bool = False):
if compact:
print(json.dumps(output, ensure_ascii=False))
else:
print(json.dumps(output, ensure_ascii=False, indent=2))
def _log_confirm_cron_run(started_at, run_status, result_status, summary, error_message):
    """Write one cron-run record for the confirmation job, timed from ``started_at`` to now."""
    finished_at = datetime.now()
    log_cron_run(
        job_name="确认",
        script_name="altcoin_confirm.py",
        run_status=run_status,
        result_status=result_status,
        started_at=started_at.isoformat(),
        finished_at=finished_at.isoformat(),
        duration_ms=int((finished_at - started_at).total_seconds() * 1000),
        summary=summary,
        error_message=error_message,
    )


def _confirm_one_candidate(cand):
    """Run ``confirm_burst`` for one candidate, persist the outcome, and return its result entry.

    The returned dict is the ``confirm_burst`` result plus a ``symbol`` key and a
    ``state_update`` key; confirmed candidates that produced a recommendation also
    carry ``rec_id``, and candidates skipped by the cooldown carry ``cooling_off``.
    """
    symbol = cand["symbol"]
    result = confirm_burst(symbol, cand)

    # detail_json may be absent or stored as NULL/empty; treat both as "no detail".
    cand_detail = json.loads(cand.get("detail_json") or "{}")
    sector = cand_detail.get("sector", cand.get("sector", ""))
    leader_status = cand_detail.get("leader_status", cand.get("leader_status", ""))
    is_meme = int(is_meme_coin(symbol))

    if not result["confirmed"]:
        prior_state = cand.get("state", "蓄力")
        log_screening(
            layer="确认", symbol=symbol, state=prior_state, score=result.get("score", 0),
            price=result.get("price", 0), signals=result.get("signals", []),
            sector=sector,
            leader_status=leader_status,
            is_meme=is_meme,
            detail=build_screening_detail(
                layer="确认",
                state=prior_state,
                signals=result.get("signals", []),
                detail={
                    "candidate_stage": "trade_confirm",
                    "confirmed": False,
                    "confirmation_status": "rejected",
                    "reason": "确认未通过",
                    "entry_plan": result.get("entry_plan") or {},
                    "fresh_reason": result.get("fresh_reason", ""),
                    "trigger_context": result.get("trigger_context") or {},
                },
            ),
        )
        result["state_update"] = {"should_alert": False, "reason": "未确认爆发"}
        return {"symbol": symbol, **result}

    state_result = update_state(
        symbol,
        new_state="爆发",
        score=result["score"],
        anomaly_type=",".join(result["signals"][:3]),
        sector=sector,
        leader_status=leader_status,
        detail={**cand_detail, **result},
    )
    result["state_update"] = state_result

    # Feishu is only a notification layer: the confirm stage does not push directly
    # around the recommendation pipeline. create_recommendation and the DB-derived
    # state come first, and that same pipeline result drives the notification.
    log_screening(
        layer="确认", symbol=symbol, state="爆发", score=result["score"],
        price=result["price"], signals=result["signals"],
        sector=sector,
        leader_status=leader_status,
        is_meme=is_meme,
        detail=build_screening_detail(
            layer="确认",
            state="爆发",
            signals=result.get("signals", []),
            detail={
                "candidate_stage": "trade_confirm",
                "confirmation_status": "confirmed",
                "final_action": (result.get("entry_plan") or {}).get("entry_action", ""),
                "fresh_reason": result.get("fresh_reason", ""),
                "trigger_context": result.get("trigger_context") or {},
                "entry_plan": result.get("entry_plan") or {},
            },
        ),
    )

    # Original note: the strategy is long-only; the direction itself is read from config.
    rec_direction = get_strategy_direction()

    # v1.7.7 cooldown: a coin that was just closed by take-profit/stop-loss is not
    # re-recommended right away (Sahara case: TP +5% at 17:24, re-recommended at
    # 17:40, price then dropped 3%).
    cooldown_hours = 8
    if symbol_recently_closed(symbol, hours=cooldown_hours):
        print(f"⏭ 跳过推荐({symbol}): 冷却期({cooldown_hours}h),刚止盈/止损不宜追")
        # Keep the symbol on the entry so it stays identifiable in the output.
        return {"symbol": symbol, **result, "cooling_off": True}

    ep = result["entry_plan"]
    rec_entry_price = ep.get("entry_price") or result["price"]
    rec_id = create_recommendation(
        symbol=symbol, rec_state="爆发", rec_score=result["score"],
        entry_price=rec_entry_price,
        stop_loss=ep.get("stop_loss", 0), tp1=ep.get("tp1", 0),
        tp2=ep.get("tp2", 0),
        sector=sector,
        signals=result["signals"], is_meme=is_meme,
        entry_plan=ep,
        direction=rec_direction,
        market_context=result.get("market_context"),
        derivatives_context=result.get("derivatives_context"),
        sector_context=result.get("sector_context"),
    )
    update_latest_price_cache(symbol, result["price"], updated_at=datetime.now().isoformat(), source="confirm")
    result["rec_id"] = rec_id
    mainline_item = get_recommendation_for_push(rec_id)
    push_mainline_state_update(symbol, rec_id, mainline_item)
    return {"symbol": symbol, **result}


def main(compact: bool = False):
    """Run one confirmation pass over the pending candidates and record the cron run.

    Initialises the DB, expires old states, confirms every candidate returned by
    ``get_candidates_for_confirm`` and prints the summary as JSON.

    Args:
        compact: Forwarded to ``_emit_output``; true prints single-line JSON.

    Returns:
        The summary dict that was printed. Its ``status`` is ``"no_candidates"``,
        ``"confirmed"`` or ``"unconfirmed"``.

    Raises:
        Exception: Any error raised while processing is logged as an ``error``
            cron run and then re-raised.
    """
    started_at = datetime.now()
    try:
        init_db()
        expire_old_states()
        candidates = get_candidates_for_confirm()
        if candidates:
            results = [_confirm_one_candidate(cand) for cand in candidates]
            confirmed = [r for r in results if r["confirmed"]]
            unconfirmed = [r for r in results if not r["confirmed"]]
            output = {
                "status": "confirmed" if confirmed else "unconfirmed",
                "confirmed_count": len(confirmed),
                "unconfirmed_count": len(unconfirmed),
                "confirmed": confirmed,
                "unconfirmed": unconfirmed,
                "check_time": datetime.now().isoformat(),
            }
        else:
            output = {
                "status": "no_candidates",
                "message": "无需要确认的候选(需加速状态+评分≥6",
                "check_time": datetime.now().isoformat(),
            }
        _emit_output(output, compact=compact)
    except Exception as e:
        _log_confirm_cron_run(started_at, "error", "exception", {}, str(e))
        raise

    # Reached only when nothing above raised, so a failed run is never also
    # recorded as a success.
    confirmed_count = output.get("confirmed_count", 0)
    unconfirmed_count = output.get("unconfirmed_count", 0)
    _log_confirm_cron_run(
        started_at,
        "success",
        output.get("status", "completed"),
        {
            "confirmed_count": confirmed_count,
            "unconfirmed_count": unconfirmed_count,
            "processed_count": confirmed_count + unconfirmed_count,
        },
        "",
    )
    return output
if __name__ == "__main__":
    # Script entry point: parse the single --compact switch and run one pass.
    import argparse

    cli = argparse.ArgumentParser(description="AlphaX Agent Crypto 爆发确认主流程")
    cli.add_argument(
        "--compact",
        action="store_true",
        help="输出紧凑 JSON便于脚本消费",
    )
    cli_args = cli.parse_args()
    main(compact=cli_args.compact)