428 lines
17 KiB
Python
428 lines
17 KiB
Python
"""
|
||
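Altcoin breakout monitoring system — recommendation signal tracking + paper trading execution ledger.
Trend-reversal detection: consecutive bearish dynamic candles on 1H, volume/price divergence, bearish acceleration (replaces MACD/RSI).
The recommendation layer only manages signal state; simulated fills, TP/SL and trailing take-profit are handled independently by paper_trading.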
|
||
"""
|
||
|
||
import sys, os, shutil
|
||
|
||
# Safety mechanism: force-clear __pycache__ at startup so bytecode left over
# from an older version does not linger.
for cache_dir in (
    os.path.join(os.path.dirname(__file__), *tail)
    for tail in (("__pycache__",), ("..", "__pycache__"))
):
    if not os.path.exists(cache_dir):
        continue
    # ignore_errors=True: a cleanup failure is skipped rather than raised.
    shutil.rmtree(cache_dir, ignore_errors=True)
|
||
|
||
import ccxt
|
||
import pandas as pd
|
||
import json
|
||
import sys
|
||
import os
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
sys.path.insert(0, os.path.dirname(__file__))
|
||
from app.db.altcoin_db import (
|
||
init_db, get_active_recommendations, update_recommendation_tracking,
|
||
expire_old_recommendations, get_stats, update_recommendation_action_status,
|
||
apply_recommendation_state_transition, log_cron_run,
|
||
update_latest_price_cache,
|
||
)
|
||
from app.db.paper_trading import sync_recommendation as sync_paper_trade
|
||
from app.integrations.push_orchestrator import push_trade_action_update
|
||
from app.core.pa_engine import (
|
||
calc_atr, full_pa_analysis, detect_trend_exhaustion,
|
||
analyze_entry_point,
|
||
)
|
||
from app.config.config_loader import load_rules
|
||
from app.core.opportunity_lifecycle import apply_entry_quality_gate
|
||
from app.db.paper_trading import sync_recommendation as sync_paper_trade
|
||
|
||
# Module-wide ccxt Binance client, created once at import time with ccxt's
# built-in request rate limiting switched on.
exchange = ccxt.binance({"enableRateLimit": True})
# Two directories above the directory that contains this file.
# NOTE(review): presumably the repository root — confirm against the project layout.
REPO_ROOT = Path(__file__).resolve().parents[2]

# Substrings identifying provisional "buy now" texts. Any buy signal containing
# one of them is dropped by reconcile_buy_signals_after_gate once the
# entry-quality gate has downgraded the action.
PROVISIONAL_BUY_SIGNAL_MARKERS = (
    "可即刻入场",
    "当前价接近回踩目标",
    "回踩确认完毕",
)
|
||
|
||
|
||
def fetch_klines(symbol, timeframe, limit=200):
    """Fetch OHLCV candles for *symbol* and return them as a DataFrame.

    Args:
        symbol: Market symbol passed unchanged to ``exchange.fetch_ohlcv``.
        timeframe: Candle interval string, e.g. ``"1h"``.
        limit: Maximum number of candles to request.

    Returns:
        A DataFrame with the columns ``timestamp``, ``open``, ``high``,
        ``low``, ``close`` and ``volume`` (``timestamp`` converted from epoch
        milliseconds to datetimes), or ``None`` when fetching or converting
        the data fails.
    """
    try:
        ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        df = pd.DataFrame(
            ohlcv,
            columns=["timestamp", "open", "high", "low", "close", "volume"],
        )
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    except Exception as exc:
        # Best-effort by design: callers treat None as "no data". The cause is
        # reported on stderr (stdout carries the job's JSON output) so a
        # network or symbol error is no longer dropped without a trace.
        print(
            f"fetch_klines({symbol!r}, {timeframe!r}) failed: {exc!r}",
            file=sys.stderr,
        )
        return None
    return df
|
||
|
||
|
||
def _format_tracking_price(price):
|
||
try:
|
||
price = float(price)
|
||
except Exception:
|
||
return ""
|
||
if price <= 0:
|
||
return ""
|
||
if price >= 1:
|
||
return f"${price:.3f}"
|
||
if price >= 0.01:
|
||
return f"${price:.4f}"
|
||
if price >= 0.0001:
|
||
return f"${price:.6f}"
|
||
return f"${price:.8f}"
|
||
|
||
|
||
def reconcile_buy_signals_after_gate(buy_signals, final_action, gated_plan, gate_reasons):
    """Replace provisional buy texts once the entry-quality gate has downgraded the action.

    Args:
        buy_signals: Buy-signal texts produced before gating (may be None).
        final_action: Action status after the gate was applied.
        gated_plan: Entry plan returned by the gate (may be None); read for
            ``rr_target_entry`` / ``entry_price`` / ``wait_price``.
        gate_reasons: Reasons reported by the gate; only the first is quoted.

    Returns:
        A new list. When the action is still "可即刻买入" or there are no gate
        reasons it is an unmodified copy of *buy_signals*. Otherwise every
        signal containing a provisional marker is removed, the remaining ones
        are converted to ``str``, and one final guidance line is appended for
        the "等回踩" and "观察" actions.
    """
    incoming = buy_signals or []
    if not gate_reasons or final_action == "可即刻买入":
        return list(incoming)

    kept = []
    for raw in incoming:
        text = str(raw)
        if any(marker in text for marker in PROVISIONAL_BUY_SIGNAL_MARKERS):
            continue
        kept.append(text)

    lead_reason = str(gate_reasons[0])
    if final_action == "观察":
        kept.append(f"🟡 买点未达标,保持观察;{lead_reason}")
    elif final_action == "等回踩":
        plan = gated_plan or {}
        # First truthy candidate wins, in this order of preference.
        target = (
            plan.get("rr_target_entry")
            or plan.get("entry_price")
            or plan.get("wait_price")
        )
        shown = _format_tracking_price(target)
        guidance = (
            f"🟡 现价不买,等待回踩至{shown}附近;{lead_reason}"
            if shown
            else f"🟡 现价不买,继续等待回踩;{lead_reason}"
        )
        kept.append(guidance)
    return kept
|
||
|
||
|
||
def analyze_tracking_signals(symbol, rec, current_price):
    """Run the dynamic tracking analysis for one active recommendation.

    Only signal texts and a candidate action status are produced here. TP/SL
    levels are reported as informational sell signals; their execution is left
    to paper trading.

    Args:
        symbol: Market symbol handed to ``fetch_klines``.
        rec: Recommendation mapping. ``entry_price`` is required;
            ``stop_loss``, ``tp1``, ``entry_plan``, ``rec_time``, ``signals``
            and the ``*_context`` entries are read when present. A non-empty
            ``entry_plan`` dict is updated in place with the gated plan when
            the entry-quality gate reports reasons.
        current_price: Latest price of the symbol.

    Returns:
        dict with the keys ``action_status`` (str), ``sell_signals`` (list),
        ``buy_signals`` (list), ``exhaustion`` (dict), ``entry_update`` (dict)
        and ``pnl_pct`` (rounded to 2 decimals).
    """
    sell_signals = []
    buy_signals = []
    action_status = "持有"  # default status: keep holding

    entry_price = rec["entry_price"]
    # A key that is present but None gets the same fallback as a missing key.
    # An explicit 0 is preserved because the checks below treat 0 as "not set".
    stop_loss = rec.get("stop_loss")
    if stop_loss is None:
        stop_loss = entry_price * 0.95
    tp1 = rec.get("tp1")
    if tp1 is None:
        tp1 = entry_price * 1.03
    entry_plan = rec.get("entry_plan") or {}

    # ---- 1H candles drive the trend analysis ----
    h1_df = fetch_klines(symbol, "1h", limit=100)
    atr_1h = calc_atr(h1_df, 14) if h1_df is not None else 0
    h1_usable = h1_df is not None and atr_1h > 0

    # ---- Trend exhaustion ----
    exhaustion = {}
    if h1_usable:
        exhaustion = detect_trend_exhaustion(h1_df, atr_1h)
        if exhaustion.get("exhausted"):
            sell_signals.extend(exhaustion.get("signals", []))
            severity = exhaustion.get("severity")
            if severity == "high":
                action_status = "衰减"
            elif severity == "medium":
                # Medium exhaustion: keep holding, but flag it.
                sell_signals.append("趋势中度衰减,关注止盈")

    # ---- Take-profit information ----
    pnl_pct = ((current_price / entry_price) - 1) * 100 if entry_price > 0 else 0

    # TP/SL belong to the paper-trading lifecycle; they are no longer turned
    # into recommendation actions, only reported.
    if tp1 > 0 and current_price >= tp1:
        sell_signals.append(f"模拟交易目标价已到达(${tp1:.4f}),执行结果以 paper trading 为准")

    if tp1 == 0 and pnl_pct >= 15:
        sell_signals.append(f"无TP保护但浮盈已达+{pnl_pct:.1f}%,仅作为信号风险提醒,是否平仓由 paper trading/人工处理")

    # ---- Stop-loss proximity warning ----
    if stop_loss > 0:
        loss_pct = ((current_price / stop_loss) - 1) * 100
        if loss_pct < 3:  # price is less than 3% above the stop
            sell_signals.append(f"⚠️ 接近止损!当前${current_price:.4f}离止损${stop_loss:.4f}仅{loss_pct:.1f}%")
        if current_price <= stop_loss:
            sell_signals.append(f"🔴 模拟交易止损价已触达!${current_price:.4f}≤${stop_loss:.4f},执行结果以 paper trading 为准")

    # ---- Trend-reversal signals (price-action based, replaces MACD) ----
    if h1_usable and len(h1_df) >= 30:
        pa_1h = full_pa_analysis(h1_df, "1h")
        recent_candles = pa_1h.get("candles_class", [])[-6:]

        # Bearish dynamic candles among the latest (up to) six 1H candles.
        dy_bear_count = sum(
            1
            for c in recent_candles
            if c["type"] == "dynamic" and c["direction"] == -1
        )
        if dy_bear_count >= 3:
            sell_signals.append(f"🔴 1H连续{dy_bear_count}根阴动K(趋势反转)")
            if action_status == "持有":
                action_status = "反转"

        # Volume/price divergence: heavy volume on bearish candles.
        avg_vol = float(h1_df["volume"].rolling(20).mean().iloc[-1])
        high_vol_bear = 0
        if avg_vol > 0:  # also False for NaN, which leaves the count at 0
            last_three = h1_df.tail(3)
            heavy = (last_three["volume"] / avg_vol) >= 2
            bearish = last_three["close"] < last_three["open"]
            high_vol_bear = int((heavy & bearish).sum())
        if high_vol_bear >= 2:
            sell_signals.append("🔴 1H放量阴线×2(多头出货)")

        # Bearish acceleration reported by the continuous-candle analysis.
        for ck in pa_1h.get("continuous_k", []):
            if ck["type"] == "bearish_continue" and ck["length"] >= 3:
                sell_signals.append(f"🔴 1H连续{ck['length']}K空头加速")
                if action_status == "持有":
                    action_status = "反转"

    # === Time decay (v1.6.9) ===
    # Held longer than the configured hours without profit -> downgrade.
    decay_cfg = load_rules().get("tracker", {}).get("time_decay", {})
    if decay_cfg.get("enabled", True):
        decay_hours = decay_cfg.get("decay_hours", 24)
        rec_time = rec.get("rec_time", "")
        if rec_time:
            try:
                t_rec = datetime.fromisoformat(rec_time)
                # Take "now" in the timestamp's own timezone so aware values
                # work too (tzinfo is None for naive ones).
                hours_held = (datetime.now(t_rec.tzinfo) - t_rec).total_seconds() / 3600
                if hours_held > decay_hours and pnl_pct <= 0 and action_status == "持有":
                    sell_signals.append(f"⏰ 持仓{hours_held:.0f}h无盈利,降级衰减")
                    action_status = "衰减"
            except (TypeError, ValueError):
                # Unparsable rec_time or non-numeric config: skip the time
                # decay instead of failing the whole analysis.
                pass

    # ---- Dynamic entry guidance ----
    # Runs only while no exit-type status was set above: once action_status is
    # reversal/decay it must never be overwritten with "可即刻买入".
    entry_update = {}
    current_action = entry_plan.get("entry_action", "")
    waiting_for_pullback = current_action in ("等回踩", "🟡等回踩")

    if action_status == "持有" and waiting_for_pullback and atr_1h > 0:
        # 4H/15m candles are needed only on this path, so they are fetched
        # here rather than on every call.
        h4_df = fetch_klines(symbol, "4h", limit=100)
        if h4_df is not None:
            pa_4h = full_pa_analysis(h4_df, "4h")
            h4_zones = pa_4h.get("zones", [])
            direction = 1  # long side

            # Re-run the 15-minute entry-point analysis.
            m15_df = fetch_klines(symbol, "15m", limit=100)
            if m15_df is not None and len(m15_df) >= 20:
                entry_result = analyze_entry_point(
                    h1_df=h1_df, m15_df=m15_df, atr_1h=atr_1h,
                    zones_4h=h4_zones, direction=direction,
                )
                new_action = entry_result.get("action", "等回踩")

                if new_action == "即刻买入":
                    buy_signals.append("🟢 回踩确认完毕!可即刻入场(15min动K确认)")
                    action_status = "可即刻买入"
                elif new_action == "等回踩":
                    wait_price = entry_result.get("wait_price", 0)
                    if wait_price > 0:
                        # Within 2% of the pullback target counts as reached.
                        dist_pct = ((current_price / wait_price) - 1) * 100
                        if abs(dist_pct) < 2:
                            buy_signals.append(f"🟢 当前价接近回踩目标!${current_price:.4f}≈${wait_price:.4f}")
                            action_status = "可即刻买入"

                entry_update = {
                    "new_action": new_action,
                    "reason": entry_result.get("reason", ""),
                    "wait_price": entry_result.get("wait_price", 0),
                }

    # ---- Entry-quality gate: may change the candidate action ----
    action_status, gated_plan, gate_reasons = apply_entry_quality_gate(
        action_status=action_status,
        entry_plan=entry_plan,
        signals=rec.get("signals"),
        current_price=current_price,
        market_context=rec.get("market_context") or {},
        derivatives_context=rec.get("derivatives_context") or {},
        sector_context=rec.get("sector_context") or {},
    )
    if gate_reasons:
        buy_signals = reconcile_buy_signals_after_gate(
            buy_signals,
            action_status,
            gated_plan,
            gate_reasons,
        )
        buy_signals.append("⚠️ 买点质量闸门: " + ";".join(gate_reasons[:3]))
        # Same None tolerance as reconcile_buy_signals_after_gate.
        entry_plan.update(gated_plan or {})

    return {
        "action_status": action_status,
        "sell_signals": sell_signals,
        "buy_signals": buy_signals,
        "exhaustion": exhaustion,
        "entry_update": entry_update,
        "pnl_pct": round(pnl_pct, 2),
    }
|
||
|
||
|
||
def track_prices():
    """Refresh price, PnL and tracking signals for every actionable active recommendation.

    For each recommendation the latest ticker price is fetched and validated,
    written to the latest-price cache and the tracking table, analysed by
    ``analyze_tracking_signals``, submitted to the state-transition function
    and finally synchronised with paper trading. Watch-only recommendations
    (not triggered, not in the position bucket, not holding/completed) are
    listed in the results but skipped. A failure on one symbol is recorded in
    ``failed_symbols`` and does not stop the loop.

    Returns:
        The summary dict that is also printed to stdout as JSON: ``status``
        is ``"no_active"`` when there is nothing to track, otherwise
        ``"tracked"`` together with counts, per-symbol ``results``,
        ``failed_symbols``, ``stats`` and ``track_time``.
    """
    recs = get_active_recommendations(actionable_only=True)
    if not recs:
        output = {
            "status": "no_active",
            "message": "无active推荐需要跟踪",
            "stats": get_stats(),
            "track_time": datetime.now().isoformat(),
        }
        print(json.dumps(output, ensure_ascii=False))
        return output

    results = []
    tracked_count = 0
    failed_symbols = []
    for rec in recs:
        symbol = rec["symbol"]
        try:
            watch_only = (
                not rec.get("entry_triggered")
                and rec.get("display_bucket") != "position"
                and rec.get("execution_status") not in ("holding", "completed")
            )
            if watch_only:
                results.append({
                    "symbol": symbol,
                    "rec_id": rec["id"],
                    "entry_price": rec["entry_price"],
                    "current_price": None,
                    "pnl_pct": None,
                    "status": "skipped_watch_only",
                    "action_status": rec.get("action_status"),
                    "sell_signals": [],
                    "buy_signals": [],
                    "exhaustion_severity": "low",
                })
                print(f" {symbol}: 观察池样本跳过跟踪与止盈判断")
                continue

            ticker = exchange.fetch_ticker(symbol)
            current_price = ticker["last"]
            # Validate before anything is persisted: a missing or non-positive
            # price must not reach the cache or the tracking table.
            if current_price is None or current_price <= 0:
                raise ValueError(f"ticker 未返回有效的最新成交价: {current_price!r}")

            # Latest-price cache: the dashboard reads the small
            # latest_price_cache table and no longer depends on the
            # high-frequency price_tracking log table.
            update_latest_price_cache(symbol, current_price, source="tracker")

            # Basic PnL tracking.
            track_result = update_recommendation_tracking(rec["id"], current_price)

            # Price-action enhancement: dynamic tracking signals.
            tracking_signals = analyze_tracking_signals(symbol, rec, current_price)

            # Main-path state transition: the tracker only submits a candidate
            # status plus the current price; the final status is decided and
            # persisted by the DB main path. Pushes must consume that final
            # status and never re-derive it themselves.
            terminal_action = {
                "hit_tp2": "止盈2",
                "stopped_out": "止损",
            }.get(track_result.get("status"))
            requested_action = terminal_action or tracking_signals["action_status"]
            state_decision = apply_recommendation_state_transition(
                rec["id"],
                requested_action=requested_action,
                current_price=current_price,
                event_time=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
                signals=tracking_signals.get("sell_signals", []) + tracking_signals.get("buy_signals", []),
            )
            paper_result = sync_paper_trade(
                {**rec, **state_decision, "id": rec["id"], "symbol": symbol},
                current_price,
                event_time=datetime.now().isoformat(),
            )
            if any(paper_result.get(flag) for flag in ("opened", "closed", "activated", "moved")):
                print(f" {symbol}: paper trading event -> {paper_result}")

            results.append({
                "symbol": symbol,
                "rec_id": rec["id"],
                "entry_price": rec["entry_price"],
                "current_price": current_price,
                "pnl_pct": tracking_signals["pnl_pct"],
                "status": track_result["status"],
                "action_status": tracking_signals["action_status"],
                "sell_signals": tracking_signals["sell_signals"],
                "buy_signals": tracking_signals["buy_signals"],
                "exhaustion_severity": tracking_signals.get("exhaustion", {}).get("severity", "low"),
                "paper_trade": paper_result,
            })
            print(f" {symbol}: 入场${rec['entry_price']} → 现在${current_price} "
                  f"盈亏{tracking_signals['pnl_pct']}% 状态={track_result['status']} "
                  f"操作={tracking_signals['action_status']}")
            tracked_count += 1

        except Exception as e:
            # Per-symbol isolation: record the failure and keep going.
            failed_symbols.append({"symbol": symbol, "error": str(e)})
            print(f" {symbol}: 获取价格失败 - {e}")

    # Expiry check.
    expire_old_recommendations()

    output = {
        "status": "tracked",
        "tracked_count": tracked_count,
        "failed_count": len(failed_symbols),
        "failed_symbols": failed_symbols,
        "results": results,
        "stats": get_stats(),
        "track_time": datetime.now().isoformat(),
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))
    return output
|
||
|
||
|
||
def main():
    """Run one tracking round and record the outcome with ``log_cron_run``.

    The database is initialised, ``track_prices`` is executed, and exactly one
    cron-run entry is written: ``run_status="error"`` with the exception text
    when either step raises (the exception is then re-raised), otherwise
    ``run_status="success"`` with a summary of the counters.
    """
    started_at = datetime.now()

    def _record(finished_at, run_status, result_status, summary, error_message):
        # Single place that shapes the cron-run entry for both outcomes.
        elapsed = finished_at - started_at
        log_cron_run(
            job_name="跟踪",
            script_name="price_tracker.py",
            run_status=run_status,
            result_status=result_status,
            started_at=started_at.isoformat(),
            finished_at=finished_at.isoformat(),
            duration_ms=int(elapsed.total_seconds() * 1000),
            summary=summary,
            error_message=error_message,
        )

    try:
        init_db()
        output = track_prices()
    except Exception as exc:
        _record(datetime.now(), "error", "exception", {}, str(exc))
        raise

    # Success path (the except branch above always re-raises).
    finished_at = datetime.now()
    stats = output.get("stats", {})
    summary = {
        "tracked_count": output.get("tracked_count", 0),
        "failed_count": output.get("failed_count", 0),
        "active_count": stats.get("active_count", 0),
    }
    _record(finished_at, "success", output.get("status", "completed"), summary, "")
|
||
|
||
|
||
if __name__ == "__main__":
    from argparse import ArgumentParser

    # --once defaults to True and the parsed namespace is discarded, so the
    # flag does not alter behaviour: main() always runs a single round.
    # Parsing still provides --help and rejects unknown options.
    cli = ArgumentParser(description="AlphaX Agent 价格跟踪任务")
    cli.add_argument(
        "--once",
        action="store_true",
        default=True,
        help="执行单轮跟踪并输出结果",
    )
    cli.parse_args()
    main()
|