astock-agent/backend/app/analysis/potential_scanner.py
2026-04-08 22:39:51 +08:00

417 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""潜在启动股扫描器Channel B
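Searches the whole market for stocks that are consolidating near a bottom and are about to start moving.
Does not rely on sector heat; independent of Channel A (which picks the strongest among the strong).
Screening logic:
1. Fundamental pre-filter: reasonable PE/PB, moderate market cap, no ST stocks, no recently listed stocks
2. Bottom pattern: price is near its 60-day low and has stopped making new lows recently
3. Low-volume consolidation: trading volume has shrunk to a very low level
4. Early technical signals: MACD golden cross at the bottom, RSI recovering from oversold, etc.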
"""
import logging
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from app.data.tushare_client import tushare_client
from app.analysis.technical import add_all_indicators
from app.analysis.capital_flow import _score_valuation
from app.config import settings
logger = logging.getLogger(__name__)
def _prefilter_fundamentals(
    basic_df: pd.DataFrame,
    stock_basic: pd.DataFrame,
    exclude_codes: set[str],
) -> pd.DataFrame:
    """Apply Channel B's hard fundamental filters to one day's daily_basic rows.

    Args:
        basic_df: Per-stock daily fundamentals; must contain ``ts_code``.
        stock_basic: Stock master data. When it is non-empty, its ``name`` and
            ``list_date`` columns are used to drop ST and recently listed
            stocks.
        exclude_codes: Codes to drop unconditionally.

    Returns:
        A filtered copy of ``basic_df``. A ``circ_mv_yi`` column
        (``circ_mv / 10000``) is added when ``circ_mv`` is present. Every
        numeric filter is applied only if its column exists.
    """
    st_codes: set[str] = set()
    new_codes: set[str] = set()
    if not stock_basic.empty:
        st_mask = stock_basic["name"].str.contains("ST", na=False)
        st_codes = set(stock_basic.loc[st_mask, "ts_code"])
        # Listing dates are compared as YYYYMMDD strings against "now minus
        # settings.min_list_days".
        cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d")
        new_codes = set(stock_basic.loc[stock_basic["list_date"] > cutoff, "ts_code"])

    # Only codes starting with 00/60/30 are kept (per the original note:
    # Shanghai/Shenzhen main boards and ChiNext), which also drops .BJ and
    # 68xxxx codes. ST, newly listed and caller-excluded codes are removed in
    # a single membership test.
    blocked = st_codes | new_codes | set(exclude_codes)
    codes = basic_df["ts_code"]
    eligible = basic_df[
        codes.str.startswith(("00", "30", "60")) & ~codes.isin(blocked)
    ].copy()

    # PE (TTM) in (0, 60]: excludes loss-making and very expensive stocks.
    if "pe_ttm" in eligible.columns:
        pe = eligible["pe_ttm"]
        eligible = eligible[pe.notna() & (pe > 0) & (pe <= 60)]

    # PB in (0, 8].
    if "pb" in eligible.columns:
        pb = eligible["pb"]
        eligible = eligible[pb.notna() & (pb > 0) & (pb <= 8)]

    # Circulating market value between 30 and 500 after dividing by 10000
    # (the original comment describes this as converting 万 to 亿).
    # assign() avoids writing a column into a filtered slice.
    if "circ_mv" in eligible.columns:
        eligible = eligible.assign(circ_mv_yi=eligible["circ_mv"] / 10000)
        eligible = eligible[eligible["circ_mv_yi"].between(30, 500)]

    # Turnover rate above 1 (some minimum activity).
    if "turnover_rate" in eligible.columns:
        turnover = eligible["turnover_rate"]
        eligible = eligible[turnover.notna() & (turnover > 1)]

    # Same-day change below 3: skip stocks that have already taken off.
    # NOTE(review): only applied when the snapshot carries a pct_chg column;
    # confirm the data source actually supplies it.
    if "pct_chg" in eligible.columns:
        change = eligible["pct_chg"]
        eligible = eligible[change.notna() & (change < 3)]

    return eligible


def scan_potential_breakout(
    trade_date: str = None,
    exclude_codes: set[str] = None,
) -> list[dict]:
    """Scan the market for stocks that look ready to break out of a bottom.

    The scan has two stages: a fundamental pre-filter on the daily_basic
    snapshot, then a per-stock technical check (bottom pattern, volume
    contraction, early signal) on the most active candidates.

    Args:
        trade_date: Trade date passed to the data client; when falsy, the
            client's latest trade date is used.
        exclude_codes: Stock codes to skip (e.g. ones already recommended by
            Channel A). ``None`` means nothing is excluded.

    Returns:
        Up to ``settings.top_stock_count`` dicts sorted by ``capital_score``
        descending. Each dict has the keys ``ts_code``, ``name``, ``sector``,
        ``sectors``, ``capital_score``, ``valuation_score``,
        ``main_net_inflow`` (always 0), ``inflow_ratio`` (always 0),
        ``turnover_rate``, ``volume_ratio`` (always None), ``circ_mv``,
        ``pe`` and ``pb``. An empty list is returned when the snapshot is
        empty or nothing survives the pre-filter.
    """
    if not trade_date:
        trade_date = tushare_client.get_latest_trade_date()
    exclude_codes = exclude_codes or set()

    # ── Stage 1: fundamental pre-filter ──
    basic_df = tushare_client.get_daily_basic(trade_date)
    if basic_df.empty:
        logger.warning("Channel B: daily_basic 数据为空")
        return []
    stock_basic = tushare_client.get_stock_basic()

    eligible = _prefilter_fundamentals(basic_df, stock_basic, exclude_codes)
    if eligible.empty:
        logger.warning("Channel B: 基本面预筛后无股票")
        return []

    # Most active first; cap the candidate list to bound the number of
    # K-line requests.
    if "turnover_rate" in eligible.columns:
        eligible = eligible.sort_values("turnover_rate", ascending=False)
    max_candidates = 300
    candidates = eligible.head(max_candidates)
    logger.info(f"Channel B: 基本面预筛后 {len(eligible)} 只,取 Top {len(candidates)} 只进行技术扫描")

    # Build the name / industry lookups once instead of filtering
    # stock_basic for every candidate. keep="first" matches taking the first
    # matching row.
    name_by_code: dict = {}
    industry_by_code: dict = {}
    if not stock_basic.empty:
        first_rows = stock_basic.drop_duplicates(subset="ts_code", keep="first")
        name_by_code = dict(zip(first_rows["ts_code"], first_rows["name"]))
        if "industry" in first_rows.columns:
            industry_by_code = dict(zip(first_rows["ts_code"], first_rows["industry"]))

    # ── Stage 2: technical scan ──
    results = []
    failed = 0
    for _, fundamentals in candidates.iterrows():
        ts_code = fundamentals["ts_code"]
        try:
            # NOTE(review): trade_date is not passed to the K-line request;
            # presumably it returns the most recent 120 days - confirm this
            # is intended for historical scans.
            df = tushare_client.get_stock_daily(ts_code, days=120)
            if df.empty or len(df) < 30:
                continue
            df = df.sort_values("trade_date").reset_index(drop=True)
            df = add_all_indicators(df)

            # Must sit in a bottom zone.
            if not _check_bottom_reversal(df):
                continue
            # Volume contraction is not mandatory; it only feeds the score.
            volume_shrink = _check_volume_shrink(df)
            # At least one early technical signal is mandatory.
            if not _check_technical_early_signal(df):
                continue

            score = _score_potential(df, volume_shrink)

            # Fundamentals come straight from the candidate's own row.
            pe = float(fundamentals["pe_ttm"]) if pd.notna(fundamentals.get("pe_ttm")) else None
            pb = float(fundamentals["pb"]) if pd.notna(fundamentals.get("pb")) else None
            circ_mv = float(fundamentals.get("circ_mv_yi", 0)) or None
            turnover_rate = float(fundamentals.get("turnover_rate", 0) or 0)
            valuation_score = _score_valuation(pe, pb)

            # Fall back to the code when the name is unknown or missing.
            name = name_by_code.get(ts_code)
            if name is None or pd.isna(name):
                name = ts_code
            # The industry doubles as the sector; a missing value becomes ""
            # rather than the literal string "nan".
            industry = industry_by_code.get(ts_code)
            sector = "" if industry is None or pd.isna(industry) else str(industry)

            results.append({
                "ts_code": ts_code,
                "name": name,
                "sector": sector,
                "sectors": [sector] if sector else [],
                "capital_score": round(score, 1),
                "valuation_score": round(valuation_score, 1),
                "main_net_inflow": 0,
                "inflow_ratio": 0,
                "turnover_rate": round(turnover_rate, 2),
                "volume_ratio": None,
                "circ_mv": circ_mv,
                "pe": round(pe, 2) if pe else None,
                "pb": round(pb, 2) if pb else None,
            })
        except Exception as e:
            # Best effort: one bad stock must not abort the whole scan.
            failed += 1
            logger.debug(f"Channel B 扫描 {ts_code} 异常: {e}")
            continue

    # Surface failures that would otherwise only be visible at debug level.
    if failed:
        logger.warning(f"Channel B: {failed} 只股票扫描异常,已跳过")

    # Highest score first.
    results.sort(key=lambda x: x["capital_score"], reverse=True)
    top = results[:settings.top_stock_count]
    logger.info(f"Channel B: 扫描完成,{len(results)} 只底部股票,取 Top {len(top)}")
    for r in top:
        logger.info(
            f"Channel B: {r['name']}({r['ts_code']}) "
            f"评分={r['capital_score']} 估值={r['valuation_score']}"
        )
    return top
def _check_bottom_reversal(df: pd.DataFrame) -> bool:
    """Tell whether the latest close sits in a bottom zone of the recent range.

    The distance of the last close above the lowest ``low`` of the last 60
    rows (or of the whole frame if shorter) is measured in percent. The
    frame passes when either:

    1. the distance is below 10 (deep bottom), or
    2. the distance is at most 20 and the decline has stalled: the lowest
       low of the last 5 rows is not more than 3% under the lowest low of
       the 5 rows before them.

    Args:
        df: Daily bars in chronological order with ``low`` and ``close``
            columns.

    Returns:
        A plain ``bool``. ``False`` for frames with fewer than 20 rows or
        when the window low is not a positive number.
    """
    if len(df) < 20:
        return False

    low_60d = df["low"].tail(60).min()
    # A zero, negative or NaN low makes the percentage meaningless.
    if not low_60d > 0:
        return False
    dist_from_low = (df["close"].iloc[-1] - low_60d) / low_60d * 100

    # Deep bottom: close enough to the low to pass outright.
    if dist_from_low < 10:
        return True
    # Too far above the low to count as a bottom at all.
    if dist_from_low > 20:
        return False

    # Stall check: compare the lows of the last 5 rows with the 5 before.
    # len(df) >= 20 guarantees both windows are full.
    recent_min = df["low"].iloc[-5:].min()
    prev_min = df["low"].iloc[-10:-5].min()
    # bool() turns the numpy comparison result into a plain Python bool.
    return bool(recent_min >= prev_min * 0.97)  # 3% tolerance
def _check_volume_shrink(df: pd.DataFrame) -> bool:
    """Tell whether recent volume has contracted well below its moving average.

    The mean ``vol`` of the last 3 rows is compared with 70% of a baseline.
    The baseline is the last row's ``vol_ma10`` (a 10-period figure, used
    here in place of a 20-period average); if that column is absent, the
    last row's own ``vol`` is used instead.

    Args:
        df: Daily bars in chronological order with a ``vol`` column and,
            normally, a ``vol_ma10`` column.

    Returns:
        A plain ``bool``. ``False`` for frames with fewer than 20 rows, and
        also when the baseline is NaN (the comparison is then false).
    """
    if len(df) < 20:
        return False
    last = df.iloc[-1]
    baseline_vol = last.get("vol_ma10", last["vol"])
    recent_3_vol = df["vol"].tail(3).mean()
    # bool() turns the numpy comparison result into a plain Python bool.
    return bool(recent_3_vol < baseline_vol * 0.7)
def _check_technical_early_signal(df: pd.DataFrame) -> bool:
    """Report whether any early bottom-reversal signal shows on the latest bars.

    The checks run in this order and the first hit wins:

    1. MACD golden cross below zero: DIF was at or under DEA on the previous
       bar, is above DEA on the last bar, and is still negative.
    2. Shrinking negative MACD histogram: the last two bars are negative and
       each of them is higher than the bar before it.
    3. RSI recovery: ``rsi14`` five bars before the last was below 40 and is
       above 40 now (a missing ``rsi14`` column counts as 50).
    4. High-volume rally bar: one of the last 10 bars has ``pct_chg`` above 3
       and ``vol`` above 1.3 times ``vol_ma5``.
    5. MA5 hooking up: MA5 on the previous bar is below MA5 three bars before
       the last, and the last MA5 is above the previous one.
    6. Lower Bollinger band support: one of the last 5 bars has a low at or
       under 1.01 times ``boll_lower`` and the last close is above the band.
       Skipped when there is no ``boll_lower`` column.

    Frames with fewer than 10 rows never signal.
    """
    if len(df) < 10:
        return False

    today = df.iloc[-1]
    yesterday = df.iloc[-2]

    def macd_cross_below_zero():
        # DIF crosses above DEA while still under the zero line.
        return (
            yesterday["dif"] <= yesterday["dea"]
            and today["dif"] > today["dea"]
            and today["dif"] < 0
        )

    def histogram_contracting():
        # Negative histogram bars getting shorter two bars in a row.
        newest = today["macd_hist"]
        middle = yesterday["macd_hist"]
        oldest = df.iloc[-3]["macd_hist"]
        return newest < 0 and middle < 0 and newest > middle and middle > oldest

    def rsi_left_oversold():
        earlier = df.iloc[-6].get("rsi14", 50)
        current = today.get("rsi14", 50)
        return earlier < 40 and current > 40

    def rally_bar_on_volume():
        for _, bar in df.tail(10).iterrows():
            gain = bar.get("pct_chg", 0) or 0
            volume = bar["vol"]
            # Without vol_ma5 the bar is compared against its own volume.
            average_volume = bar.get("vol_ma5", volume)
            if gain > 3 and volume > average_volume * 1.3:
                return True
        return False

    def ma5_hooking_up():
        # MA5 was falling (previous bar below three bars back) and now rises.
        ma5_today = today["ma5"]
        ma5_yesterday = yesterday["ma5"]
        ma5_three_back = df.iloc[-4]["ma5"]
        return ma5_yesterday < ma5_three_back and ma5_today > ma5_yesterday

    def lower_band_bounce():
        if "boll_lower" not in df.columns:
            return False
        window = df.tail(5)
        touched = any(
            low <= band * 1.01
            for low, band in zip(window["low"], window["boll_lower"])
        )
        return touched and today["close"] > today["boll_lower"]

    detectors = (
        macd_cross_below_zero,
        histogram_contracting,
        rsi_left_oversold,
        rally_bar_on_volume,
        ma5_hooking_up,
        lower_band_bounce,
    )
    # any() evaluates lazily, so later detectors are skipped after a hit.
    return any(detect() for detect in detectors)
def _score_potential(df: pd.DataFrame, volume_shrink: bool) -> float:
    """Score a bottom-zone candidate on a 0-100 scale.

    The score is the sum of four parts:

    - Bottom position (up to 35): percent distance of the last close above
      the lowest low of the last 60 rows; closer earns more.
    - Signal strength (up to 30): how many of three signals fire - MACD
      golden cross below zero, ``rsi14`` rising from under 35 (five rows
      before the last) to over 40, and a bar in the last 10 rows with
      ``pct_chg`` above 3 on volume above 1.5 times ``vol_ma5``.
    - Volume contraction (up to 20): awarded only when ``volume_shrink`` is
      true, graded by the last-3-row mean volume over ``vol_ma10``.
    - Moving-average shape (up to 15): needs at least 20 rows.

    Args:
        df: Daily bars in chronological order with indicator columns.
        volume_shrink: Whether the caller's volume-contraction check passed.

    Returns:
        The total score as a float.
    """
    latest = df.iloc[-1]
    before_latest = df.iloc[-2] if len(df) >= 2 else latest

    # 1) Bottom position (35 points).
    window_low = df["low"].tail(min(60, len(df))).min()
    gap_pct = (latest["close"] - window_low) / window_low * 100
    position_pts = 0
    # (upper bound, bound is inclusive, points); the first matching tier wins.
    position_tiers = (
        (5, False, 35),
        (8, False, 28),
        (12, False, 20),
        (15, True, 12),
        (20, True, 6),
    )
    for bound, inclusive, pts in position_tiers:
        if gap_pct < bound or (inclusive and gap_pct == bound):
            position_pts = pts
            break

    # 2) Signal strength (30 points).
    macd_cross = (
        before_latest["dif"] <= before_latest["dea"]
        and latest["dif"] > latest["dea"]
        and latest["dif"] < 0
    )

    rsi_rebound = False
    if len(df) >= 6:
        rsi_rebound = (
            df.iloc[-6].get("rsi14", 50) < 35 and latest.get("rsi14", 50) > 40
        )

    def is_rally_bar(bar):
        volume = bar["vol"]
        gain = bar.get("pct_chg", 0) or 0
        # Without vol_ma5 the bar is compared against its own volume.
        return gain > 3 and volume > bar.get("vol_ma5", volume) * 1.5

    volume_rally = any(is_rally_bar(bar) for _, bar in df.tail(10).iterrows())

    signal_count = sum(1 for hit in (macd_cross, rsi_rebound, volume_rally) if hit)
    signal_pts = (0, 14, 22, 30)[signal_count]

    # 3) Volume contraction (20 points).
    volume_pts = 0
    if volume_shrink:
        recent_avg = df["vol"].tail(3).mean()
        baseline = latest.get("vol_ma10", df["vol"].mean())
        ratio = recent_avg / baseline if baseline > 0 else 1
        # Smaller ratio = stronger contraction; 3 points when no tier matches.
        volume_pts = next(
            (pts for bound, pts in ((0.4, 20), (0.55, 15), (0.7, 10)) if ratio < bound),
            3,
        )

    # 4) Moving-average shape (15 points).
    trend_pts = 0
    if len(df) >= 20:
        ma5_last = latest["ma5"]
        # Reference MA5 is read from iloc[-3], i.e. two rows before the last.
        ma5_ref = df.iloc[-3]["ma5"]
        ma10_last = latest["ma10"]
        ma20_last = latest["ma20"]
        crossed_up = (
            ma5_last > ma10_last and df.iloc[-2]["ma5"] <= df.iloc[-2]["ma10"]
        )
        if crossed_up:
            trend_pts = 15  # MA5 just crossed above MA10
        elif ma5_last > ma5_ref:
            trend_pts = 10  # MA5 rising
        elif abs(ma5_last - ma20_last) / ma20_last < 0.02:
            trend_pts = 8  # MA5 within 2% of MA20

    return float(position_pts + signal_pts + volume_pts + trend_pts)