stock-ai-agent/backend/app/astock_agent/pullback_selector.py
2026-02-27 21:50:26 +08:00

374 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
A股龙回头选股器
策略:热门板块 + MA多头排列 + 量价配合龙回头
执行时间:每天盘前 9:00
【选股条件】
1. MA 多头排列MA5 > MA10 > MA30
2. 从近期高点回调 2-20%
3. 回调期间缩量(成交量 < 上涨期的 80%
4. 接近 MA10/30/60 支撑位±10%以内)
5. 近期再度放量最近2天 > 回调期的 1.2倍)
"""
import pandas as pd
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from app.utils.logger import logger
class PullbackStockSelector:
"""龙回头选股器"""
def __init__(self):
"""初始化选股器"""
try:
import tushare as ts
self.ts = ts
# 从配置获取 token
from app.config import get_settings
self.settings = get_settings()
self.pro = ts.pro_api(self.settings.tushare_token)
logger.info("龙回头选股器初始化成功")
except ImportError:
logger.error("tushare 未安装")
raise
except Exception as e:
logger.error(f"龙回头选股器初始化失败: {e}")
raise
def get_hot_concepts(self, limit: int = 10) -> List[tuple]:
"""
从 Tushare 获取热门概念板块
使用 ths_index 接口获取同花顺概念板块列表
Args:
limit: 返回板块数量
Returns:
[(概念代码, 概念名称), ...]
"""
try:
# 使用 ths_index 获取同花顺概念板块
index_df = self.pro.ths_index(
market='A', # A股市场
fields='ts_code,name'
)
if index_df.empty:
logger.warning("未能获取概念板块列表,使用备用列表")
return self._get_fallback_sectors()
# 筛选科技相关的热门概念(关键词匹配)
tech_keywords = ['人工智能', '芯片', '半导体', '新能源', '汽车', '云计算',
'网络安全', '软件', '数字', '5G', '锂电', '光伏', 'AI', '科技']
filtered = []
for _, row in index_df.iterrows():
concept_name = row['name']
concept_code = row['ts_code']
# 检查是否包含关键词
for keyword in tech_keywords:
if keyword in concept_name:
filtered.append((concept_code, concept_name))
break
if len(filtered) >= limit * 2: # 多获取一些,后续筛选
break
return filtered[:limit] if filtered else self._get_fallback_sectors()
except Exception as e:
logger.warning(f"获取热门概念失败: {e},使用备用列表")
return self._get_fallback_sectors()
def _get_fallback_sectors(self) -> List[tuple]:
"""备用板块列表(使用已验证的板块代码)"""
return [
('884031.TI', '人工智能'),
('884065.TI', '新能源汽车'),
('884039.TI', '云计算'),
('884145.TI', '国产软件'),
('884192.TI', '5G概念'),
]
def select_from_sector(self, sector_code: str, sector_name: str, max_stocks: int = 5) -> List[Dict[str, Any]]:
"""
从指定板块选出龙回头股票
Args:
sector_code: 板块代码
sector_name: 板块名称
max_stocks: 最多返回股票数
Returns:
符合条件的股票列表
"""
try:
# 1. 获取板块成分股
members_df = self.pro.ths_member(
ts_code=sector_code,
fields='ts_code,name,con_code,name'
)
if members_df.empty:
logger.warning(f"板块 {sector_name}({sector_code}) 无成分股数据,可能是板块代码不正确或该板块已下线")
return []
stock_codes = members_df['con_code'].tolist()
logger.info(f"板块 {sector_name}{len(stock_codes)} 只成分股")
# 2. 逐个检查股票
selected_stocks = []
for i, stock_code in enumerate(stock_codes[:50]): # 最多检查前50只
result = self._check_stock(stock_code)
if result:
result['sector_name'] = sector_name
selected_stocks.append(result)
logger.info(f" ✓ 找到: {result['name']}({stock_code}) - 回踩 {result['pullback_pct']:.2f}%")
if len(selected_stocks) >= max_stocks:
break
return selected_stocks
except Exception as e:
logger.error(f"从板块选股失败 {sector_name}({sector_code}): {e}")
return []
def _check_stock(self, stock_code: str) -> Optional[Dict[str, Any]]:
"""
检查单只股票是否符合量价配合龙回头条件
【新策略】量价配合龙回头:
1. MA 多头排列MA5 > MA10 > MA30上涨趋势
2. 从近期高点回调(寻找龙回头机会)
3. 回调期间缩量(成交量明显萎缩,主力未离场)
4. 接近 MA10/30/60 支撑位
5. 近期有再度放量迹象(资金重新入场)
Returns:
符合条件返回股票信息,否则返回 None
"""
try:
# 获取最近150天的日线数据
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=150)).strftime('%Y%m%d')
df = self.pro.daily(
ts_code=stock_code,
start_date=start_date,
end_date=end_date
)
if df.empty or len(df) < 70:
return None
df = df.sort_values('trade_date').reset_index(drop=True)
df = df.tail(90).reset_index(drop=True) # 取最近90天
close = df['close']
volume = df['vol'] # 成交量(手)
# ========== 1. 计算 MA 均线 ==========
ma5 = close.rolling(window=5).mean()
ma10 = close.rolling(window=10).mean()
ma30 = close.rolling(window=30).mean()
ma60 = close.rolling(window=60).mean()
latest = df.iloc[-1]
latest_close = latest['close']
latest_ma5 = ma5.iloc[-1]
latest_ma10 = ma10.iloc[-1]
latest_ma30 = ma30.iloc[-1]
latest_ma60 = ma60.iloc[-1]
# 条件1MA 多头排列 MA5 > MA10 > MA30
if not (latest_ma5 > latest_ma10 > latest_ma30):
logger.debug(f"{stock_code}: MA非多头排列 (MA5:{latest_ma5:.2f}, MA10:{latest_ma10:.2f}, MA30:{latest_ma30:.2f})")
return None
# ========== 2. 找近期高点最近20天内最高价 ==========
lookback_high = 20
recent_df = df.tail(lookback_high).reset_index(drop=True)
high_idx_in_recent = recent_df['close'].idxmax()
high_price = recent_df.loc[high_idx_in_recent, 'close']
high_idx_in_df = df.index[-lookback_high] + high_idx_in_recent
# 计算从高点的回踩幅度
pullback_pct = (high_price - latest_close) / high_price * 100
# 条件2回踩幅度 2-20%(放宽)
if not (2 <= pullback_pct <= 20):
logger.debug(f"{stock_code}: 回踩幅度不符合 (回踩:{pullback_pct:.2f}%, 需要2-20%)")
return None
# ========== 3. 量能形态分析:放量上涨→缩量回调→再度放量 ==========
# 上涨期间的成交量(从低点到高点)
rise_period = df.loc[:high_idx_in_df]
rise_volume_avg = rise_period['vol'].mean()
# 回调期间的成交量从高点到现在最近5天
pullback_start_idx = min(high_idx_in_df + 1, len(df) - 1)
pullback_period = df.loc[pullback_start_idx:][-5:] # 回调期间最近5天
pullback_volume_avg = pullback_period['vol'].mean()
# 条件3回调期间缩量成交量 < 上涨期间的 80%,放宽)
volume_shrink_ratio = pullback_volume_avg / rise_volume_avg if rise_volume_avg > 0 else 1
if volume_shrink_ratio >= 0.8:
logger.debug(f"{stock_code}: 回调未缩量 (缩量比:{volume_shrink_ratio:.2%}, 需要<80%)")
return None
# 条件4最近2天再度放量比回调期间平均成交量增加 20%+,放宽)
recent_2_days_volume = df.tail(2)['vol'].mean()
if recent_2_days_volume < pullback_volume_avg * 1.2:
logger.debug(f"{stock_code}: 未再度放量 (最近2天/回调期:{recent_2_days_volume/pullback_volume_avg:.2f}, 需要>1.2)")
return None
# ========== 4. 接近 MA 支撑位 ==========
ma10_diff = abs(latest_close - latest_ma10) / latest_ma10 * 100
ma30_diff = abs(latest_close - latest_ma30) / latest_ma30 * 100
ma60_diff = abs(latest_close - latest_ma60) / latest_ma60 * 100
# 接近任意一条 MA 线±10%以内,放宽)
near_ma = (ma10_diff <= 10 or ma30_diff <= 10 or ma60_diff <= 10)
# 未跌破 MA60允许跌破 10%,放宽)
above_ma60 = latest_close >= latest_ma60 * 0.90
if not (near_ma and above_ma60):
logger.debug(f"{stock_code}: 未接近MA支撑或已跌破MA60 (MA10差:{ma10_diff:.2f}%, MA30:{ma30_diff:.2f}%, MA60:{ma60_diff:.2f}%)")
return None
# ========== 5. 计算前期涨幅 ==========
rise_data = df.loc[:high_idx_in_df]
low_price = rise_data['low'].min()
rise_pct = (high_price - low_price) / low_price * 100
# 涨幅需 > 10%
if rise_pct < 10:
logger.debug(f"{stock_code}: 涨幅不足 (涨幅:{rise_pct:.2f}%, 需要>10%)")
return None
# 获取股票名称
stock_info = self.pro.stock_basic(ts_code=stock_code, fields='ts_code,name')
stock_name = stock_info.iloc[0]['name'] if not stock_info.empty else stock_code
return {
'ts_code': stock_code,
'name': stock_name,
'close': latest_close,
'high': high_price,
'rise_pct': rise_pct,
'pullback_pct': pullback_pct,
'ma5': latest_ma5,
'ma10': latest_ma10,
'ma30': latest_ma30,
'ma60': latest_ma60,
'volume_shrink_ratio': volume_shrink_ratio,
'recent_volume_ratio': recent_2_days_volume / pullback_volume_avg,
'trade_date': latest['trade_date']
}
except Exception as e:
logger.debug(f"检查股票 {stock_code} 失败: {e}")
return None
def select_from_hot_sectors(self, top_n: int = 5) -> Dict[str, List[Dict[str, Any]]]:
"""
从热门板块选股
Args:
top_n: 选择前N个热门板块
Returns:
{板块名称: [股票列表]}
"""
try:
# 使用 Tushare API 动态获取热门板块
hot_sectors = self.get_hot_concepts(limit=top_n * 2) # 多获取一些以备筛选
logger.info(f"开始龙回头选股,共 {len(hot_sectors)} 个热门板块")
results = {}
checked_count = 0
skipped_count = 0
for sector_code, sector_name in hot_sectors:
if checked_count >= top_n:
break
logger.info(f"检查板块: {sector_name}")
selected = self.select_from_sector(sector_code, sector_name, max_stocks=2)
if selected:
results[sector_name] = selected
logger.info(f" ✓ 板块 {sector_name} 选出 {len(selected)}")
checked_count += 1
else:
logger.info(f" - 板块 {sector_name} 未选出")
skipped_count += 1
# 跳过无数据的板块,继续检查下一个
if skipped_count >= 5: # 如果连续5个板块都没有数据就停止
logger.warning("连续多个板块无数据,停止检查")
break
return results
except Exception as e:
logger.error(f"从热门板块选股失败: {e}")
import traceback
logger.error(traceback.format_exc())
return {}
def format_result(self, results: Dict[str, List[Dict[str, Any]]]) -> str:
"""
格式化选股结果
Args:
results: 选股结果
Returns:
格式化的文本
"""
if not results:
return "今日未选出符合条件的龙回头股票"
lines = [
"📊 **龙回头选股结果**",
f"",
f"选股时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
f"",
]
total_stocks = 0
for sector_name, stocks in results.items():
lines.append(f"**🏢 {sector_name}**")
for stock in stocks:
total_stocks += 1
lines.append(f"{stock['name']}({stock['ts_code']})")
lines.append(f" 现价: ¥{stock['close']:.2f} | 回踩: {stock['pullback_pct']:.2f}% | 涨幅: {stock['rise_pct']:.2f}%")
lines.append(f" MA5: ¥{stock['ma5']:.2f} | MA10: ¥{stock['ma10']:.2f} | MA30: ¥{stock['ma30']:.2f} | MA60: ¥{stock['ma60']:.2f}")
lines.append(f" 缩量比: {stock['volume_shrink_ratio']:.0%} | 再放量: {stock['recent_volume_ratio']:.2f}x")
lines.append("")
lines.append(f"**共选出 {total_stocks} 只股票**")
lines.append("")
lines.append("*⚠️ 仅供参考,不构成投资建议*")
return "\n".join(lines)
# 全局单例
_pullback_selector: Optional[PullbackStockSelector] = None
def get_pullback_selector() -> PullbackStockSelector:
"""获取龙回头选股器单例"""
global _pullback_selector
if _pullback_selector is None:
_pullback_selector = PullbackStockSelector()
return _pullback_selector