374 lines
14 KiB
Python
374 lines
14 KiB
Python
"""
|
||
A股龙回头选股器
|
||
策略:热门板块 + MA多头排列 + 量价配合龙回头
|
||
执行时间:每天盘前 9:00
|
||
|
||
【选股条件】
|
||
1. MA 多头排列:MA5 > MA10 > MA30
|
||
2. 从近期高点回调 2-20%
|
||
3. 回调期间缩量(成交量 < 上涨期的 80%)
|
||
4. 接近 MA10/30/60 支撑位(±10%以内)
|
||
5. 近期再度放量(最近2天 > 回调期的 1.2倍)
|
||
"""
|
||
import pandas as pd
|
||
from typing import Dict, List, Any, Optional
|
||
from datetime import datetime, timedelta
|
||
from app.utils.logger import logger
|
||
|
||
|
||
class PullbackStockSelector:
|
||
"""龙回头选股器"""
|
||
|
||
def __init__(self):
|
||
"""初始化选股器"""
|
||
try:
|
||
import tushare as ts
|
||
self.ts = ts
|
||
# 从配置获取 token
|
||
from app.config import get_settings
|
||
self.settings = get_settings()
|
||
self.pro = ts.pro_api(self.settings.tushare_token)
|
||
logger.info("龙回头选股器初始化成功")
|
||
except ImportError:
|
||
logger.error("tushare 未安装")
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"龙回头选股器初始化失败: {e}")
|
||
raise
|
||
|
||
def get_hot_concepts(self, limit: int = 10) -> List[tuple]:
|
||
"""
|
||
从 Tushare 获取热门概念板块
|
||
|
||
使用 ths_index 接口获取同花顺概念板块列表
|
||
|
||
Args:
|
||
limit: 返回板块数量
|
||
|
||
Returns:
|
||
[(概念代码, 概念名称), ...]
|
||
"""
|
||
try:
|
||
# 使用 ths_index 获取同花顺概念板块
|
||
index_df = self.pro.ths_index(
|
||
market='A', # A股市场
|
||
fields='ts_code,name'
|
||
)
|
||
|
||
if index_df.empty:
|
||
logger.warning("未能获取概念板块列表,使用备用列表")
|
||
return self._get_fallback_sectors()
|
||
|
||
# 筛选科技相关的热门概念(关键词匹配)
|
||
tech_keywords = ['人工智能', '芯片', '半导体', '新能源', '汽车', '云计算',
|
||
'网络安全', '软件', '数字', '5G', '锂电', '光伏', 'AI', '科技']
|
||
|
||
filtered = []
|
||
for _, row in index_df.iterrows():
|
||
concept_name = row['name']
|
||
concept_code = row['ts_code']
|
||
|
||
# 检查是否包含关键词
|
||
for keyword in tech_keywords:
|
||
if keyword in concept_name:
|
||
filtered.append((concept_code, concept_name))
|
||
break
|
||
|
||
if len(filtered) >= limit * 2: # 多获取一些,后续筛选
|
||
break
|
||
|
||
return filtered[:limit] if filtered else self._get_fallback_sectors()
|
||
|
||
except Exception as e:
|
||
logger.warning(f"获取热门概念失败: {e},使用备用列表")
|
||
return self._get_fallback_sectors()
|
||
|
||
def _get_fallback_sectors(self) -> List[tuple]:
|
||
"""备用板块列表(使用已验证的板块代码)"""
|
||
return [
|
||
('884031.TI', '人工智能'),
|
||
('884065.TI', '新能源汽车'),
|
||
('884039.TI', '云计算'),
|
||
('884145.TI', '国产软件'),
|
||
('884192.TI', '5G概念'),
|
||
]
|
||
|
||
def select_from_sector(self, sector_code: str, sector_name: str, max_stocks: int = 5) -> List[Dict[str, Any]]:
|
||
"""
|
||
从指定板块选出龙回头股票
|
||
|
||
Args:
|
||
sector_code: 板块代码
|
||
sector_name: 板块名称
|
||
max_stocks: 最多返回股票数
|
||
|
||
Returns:
|
||
符合条件的股票列表
|
||
"""
|
||
try:
|
||
# 1. 获取板块成分股
|
||
members_df = self.pro.ths_member(
|
||
ts_code=sector_code,
|
||
fields='ts_code,name,con_code,name'
|
||
)
|
||
if members_df.empty:
|
||
logger.warning(f"板块 {sector_name}({sector_code}) 无成分股数据,可能是板块代码不正确或该板块已下线")
|
||
return []
|
||
|
||
stock_codes = members_df['con_code'].tolist()
|
||
logger.info(f"板块 {sector_name} 共 {len(stock_codes)} 只成分股")
|
||
|
||
# 2. 逐个检查股票
|
||
selected_stocks = []
|
||
for i, stock_code in enumerate(stock_codes[:50]): # 最多检查前50只
|
||
result = self._check_stock(stock_code)
|
||
if result:
|
||
result['sector_name'] = sector_name
|
||
selected_stocks.append(result)
|
||
logger.info(f" ✓ 找到: {result['name']}({stock_code}) - 回踩 {result['pullback_pct']:.2f}%")
|
||
|
||
if len(selected_stocks) >= max_stocks:
|
||
break
|
||
|
||
return selected_stocks
|
||
|
||
except Exception as e:
|
||
logger.error(f"从板块选股失败 {sector_name}({sector_code}): {e}")
|
||
return []
|
||
|
||
def _check_stock(self, stock_code: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
检查单只股票是否符合量价配合龙回头条件
|
||
|
||
【新策略】量价配合龙回头:
|
||
1. MA 多头排列:MA5 > MA10 > MA30(上涨趋势)
|
||
2. 从近期高点回调(寻找龙回头机会)
|
||
3. 回调期间缩量(成交量明显萎缩,主力未离场)
|
||
4. 接近 MA10/30/60 支撑位
|
||
5. 近期有再度放量迹象(资金重新入场)
|
||
|
||
Returns:
|
||
符合条件返回股票信息,否则返回 None
|
||
"""
|
||
try:
|
||
# 获取最近150天的日线数据
|
||
end_date = datetime.now().strftime('%Y%m%d')
|
||
start_date = (datetime.now() - timedelta(days=150)).strftime('%Y%m%d')
|
||
|
||
df = self.pro.daily(
|
||
ts_code=stock_code,
|
||
start_date=start_date,
|
||
end_date=end_date
|
||
)
|
||
|
||
if df.empty or len(df) < 70:
|
||
return None
|
||
|
||
df = df.sort_values('trade_date').reset_index(drop=True)
|
||
df = df.tail(90).reset_index(drop=True) # 取最近90天
|
||
|
||
close = df['close']
|
||
volume = df['vol'] # 成交量(手)
|
||
|
||
# ========== 1. 计算 MA 均线 ==========
|
||
ma5 = close.rolling(window=5).mean()
|
||
ma10 = close.rolling(window=10).mean()
|
||
ma30 = close.rolling(window=30).mean()
|
||
ma60 = close.rolling(window=60).mean()
|
||
|
||
latest = df.iloc[-1]
|
||
latest_close = latest['close']
|
||
latest_ma5 = ma5.iloc[-1]
|
||
latest_ma10 = ma10.iloc[-1]
|
||
latest_ma30 = ma30.iloc[-1]
|
||
latest_ma60 = ma60.iloc[-1]
|
||
|
||
# 条件1:MA 多头排列 MA5 > MA10 > MA30
|
||
if not (latest_ma5 > latest_ma10 > latest_ma30):
|
||
logger.debug(f" ✗ {stock_code}: MA非多头排列 (MA5:{latest_ma5:.2f}, MA10:{latest_ma10:.2f}, MA30:{latest_ma30:.2f})")
|
||
return None
|
||
|
||
# ========== 2. 找近期高点(最近20天内最高价) ==========
|
||
lookback_high = 20
|
||
recent_df = df.tail(lookback_high).reset_index(drop=True)
|
||
high_idx_in_recent = recent_df['close'].idxmax()
|
||
high_price = recent_df.loc[high_idx_in_recent, 'close']
|
||
high_idx_in_df = df.index[-lookback_high] + high_idx_in_recent
|
||
|
||
# 计算从高点的回踩幅度
|
||
pullback_pct = (high_price - latest_close) / high_price * 100
|
||
|
||
# 条件2:回踩幅度 2-20%(放宽)
|
||
if not (2 <= pullback_pct <= 20):
|
||
logger.debug(f" ✗ {stock_code}: 回踩幅度不符合 (回踩:{pullback_pct:.2f}%, 需要2-20%)")
|
||
return None
|
||
|
||
# ========== 3. 量能形态分析:放量上涨→缩量回调→再度放量 ==========
|
||
|
||
# 上涨期间的成交量(从低点到高点)
|
||
rise_period = df.loc[:high_idx_in_df]
|
||
rise_volume_avg = rise_period['vol'].mean()
|
||
|
||
# 回调期间的成交量(从高点到现在,最近5天)
|
||
pullback_start_idx = min(high_idx_in_df + 1, len(df) - 1)
|
||
pullback_period = df.loc[pullback_start_idx:][-5:] # 回调期间最近5天
|
||
pullback_volume_avg = pullback_period['vol'].mean()
|
||
|
||
# 条件3:回调期间缩量(成交量 < 上涨期间的 80%,放宽)
|
||
volume_shrink_ratio = pullback_volume_avg / rise_volume_avg if rise_volume_avg > 0 else 1
|
||
if volume_shrink_ratio >= 0.8:
|
||
logger.debug(f" ✗ {stock_code}: 回调未缩量 (缩量比:{volume_shrink_ratio:.2%}, 需要<80%)")
|
||
return None
|
||
|
||
# 条件4:最近2天再度放量(比回调期间平均成交量增加 20%+,放宽)
|
||
recent_2_days_volume = df.tail(2)['vol'].mean()
|
||
if recent_2_days_volume < pullback_volume_avg * 1.2:
|
||
logger.debug(f" ✗ {stock_code}: 未再度放量 (最近2天/回调期:{recent_2_days_volume/pullback_volume_avg:.2f}, 需要>1.2)")
|
||
return None
|
||
|
||
# ========== 4. 接近 MA 支撑位 ==========
|
||
ma10_diff = abs(latest_close - latest_ma10) / latest_ma10 * 100
|
||
ma30_diff = abs(latest_close - latest_ma30) / latest_ma30 * 100
|
||
ma60_diff = abs(latest_close - latest_ma60) / latest_ma60 * 100
|
||
|
||
# 接近任意一条 MA 线(±10%以内,放宽)
|
||
near_ma = (ma10_diff <= 10 or ma30_diff <= 10 or ma60_diff <= 10)
|
||
|
||
# 未跌破 MA60(允许跌破 10%,放宽)
|
||
above_ma60 = latest_close >= latest_ma60 * 0.90
|
||
|
||
if not (near_ma and above_ma60):
|
||
logger.debug(f" ✗ {stock_code}: 未接近MA支撑或已跌破MA60 (MA10差:{ma10_diff:.2f}%, MA30:{ma30_diff:.2f}%, MA60:{ma60_diff:.2f}%)")
|
||
return None
|
||
|
||
# ========== 5. 计算前期涨幅 ==========
|
||
rise_data = df.loc[:high_idx_in_df]
|
||
low_price = rise_data['low'].min()
|
||
rise_pct = (high_price - low_price) / low_price * 100
|
||
|
||
# 涨幅需 > 10%
|
||
if rise_pct < 10:
|
||
logger.debug(f" ✗ {stock_code}: 涨幅不足 (涨幅:{rise_pct:.2f}%, 需要>10%)")
|
||
return None
|
||
|
||
# 获取股票名称
|
||
stock_info = self.pro.stock_basic(ts_code=stock_code, fields='ts_code,name')
|
||
stock_name = stock_info.iloc[0]['name'] if not stock_info.empty else stock_code
|
||
|
||
return {
|
||
'ts_code': stock_code,
|
||
'name': stock_name,
|
||
'close': latest_close,
|
||
'high': high_price,
|
||
'rise_pct': rise_pct,
|
||
'pullback_pct': pullback_pct,
|
||
'ma5': latest_ma5,
|
||
'ma10': latest_ma10,
|
||
'ma30': latest_ma30,
|
||
'ma60': latest_ma60,
|
||
'volume_shrink_ratio': volume_shrink_ratio,
|
||
'recent_volume_ratio': recent_2_days_volume / pullback_volume_avg,
|
||
'trade_date': latest['trade_date']
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.debug(f"检查股票 {stock_code} 失败: {e}")
|
||
return None
|
||
|
||
def select_from_hot_sectors(self, top_n: int = 5) -> Dict[str, List[Dict[str, Any]]]:
|
||
"""
|
||
从热门板块选股
|
||
|
||
Args:
|
||
top_n: 选择前N个热门板块
|
||
|
||
Returns:
|
||
{板块名称: [股票列表]}
|
||
"""
|
||
try:
|
||
# 使用 Tushare API 动态获取热门板块
|
||
hot_sectors = self.get_hot_concepts(limit=top_n * 2) # 多获取一些以备筛选
|
||
|
||
logger.info(f"开始龙回头选股,共 {len(hot_sectors)} 个热门板块")
|
||
|
||
results = {}
|
||
checked_count = 0
|
||
skipped_count = 0
|
||
|
||
for sector_code, sector_name in hot_sectors:
|
||
if checked_count >= top_n:
|
||
break
|
||
|
||
logger.info(f"检查板块: {sector_name}")
|
||
|
||
selected = self.select_from_sector(sector_code, sector_name, max_stocks=2)
|
||
|
||
if selected:
|
||
results[sector_name] = selected
|
||
logger.info(f" ✓ 板块 {sector_name} 选出 {len(selected)} 只")
|
||
checked_count += 1
|
||
else:
|
||
logger.info(f" - 板块 {sector_name} 未选出")
|
||
skipped_count += 1
|
||
# 跳过无数据的板块,继续检查下一个
|
||
if skipped_count >= 5: # 如果连续5个板块都没有数据,就停止
|
||
logger.warning("连续多个板块无数据,停止检查")
|
||
break
|
||
|
||
return results
|
||
|
||
except Exception as e:
|
||
logger.error(f"从热门板块选股失败: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
return {}
|
||
|
||
def format_result(self, results: Dict[str, List[Dict[str, Any]]]) -> str:
|
||
"""
|
||
格式化选股结果
|
||
|
||
Args:
|
||
results: 选股结果
|
||
|
||
Returns:
|
||
格式化的文本
|
||
"""
|
||
if not results:
|
||
return "今日未选出符合条件的龙回头股票"
|
||
|
||
lines = [
|
||
"📊 **龙回头选股结果**",
|
||
f"",
|
||
f"选股时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
|
||
f"",
|
||
]
|
||
|
||
total_stocks = 0
|
||
for sector_name, stocks in results.items():
|
||
lines.append(f"**🏢 {sector_name}**")
|
||
for stock in stocks:
|
||
total_stocks += 1
|
||
lines.append(f" • {stock['name']}({stock['ts_code']})")
|
||
lines.append(f" 现价: ¥{stock['close']:.2f} | 回踩: {stock['pullback_pct']:.2f}% | 涨幅: {stock['rise_pct']:.2f}%")
|
||
lines.append(f" MA5: ¥{stock['ma5']:.2f} | MA10: ¥{stock['ma10']:.2f} | MA30: ¥{stock['ma30']:.2f} | MA60: ¥{stock['ma60']:.2f}")
|
||
lines.append(f" 缩量比: {stock['volume_shrink_ratio']:.0%} | 再放量: {stock['recent_volume_ratio']:.2f}x")
|
||
|
||
lines.append("")
|
||
lines.append(f"**共选出 {total_stocks} 只股票**")
|
||
lines.append("")
|
||
lines.append("*⚠️ 仅供参考,不构成投资建议*")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
# 全局单例
|
||
_pullback_selector: Optional[PullbackStockSelector] = None
|
||
|
||
|
||
def get_pullback_selector() -> PullbackStockSelector:
|
||
"""获取龙回头选股器单例"""
|
||
global _pullback_selector
|
||
if _pullback_selector is None:
|
||
_pullback_selector = PullbackStockSelector()
|
||
return _pullback_selector
|