trading.ai/test_strong_sectors_advanced.py
2025-09-23 16:12:18 +08:00

313 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
高级强势板块筛选器
筛选条件:
1. 本周收阳(周涨幅>0
2. 周线级别创阶段新高20周新高
3. 成交额巨大超过1000亿
"""
import sys
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# 添加项目根目录到路径
current_dir = Path(__file__).parent
sys.path.insert(0, str(current_dir))
from src.data.tushare_fetcher import TushareFetcher
from loguru import logger
def get_trading_dates(days_back=100):
"""获取过去N个交易日"""
dates = []
current = datetime.now()
while len(dates) < days_back:
if current.weekday() < 5: # 周一到周五
dates.append(current.strftime('%Y%m%d'))
current -= timedelta(days=1)
return sorted(dates) # 升序返回
def filter_index_concepts(ths_concepts):
"""过滤掉指数型板块"""
index_keywords = [
'成份股', '样本股', '成分股', '50', '100', '300', '500', '1000',
'上证', '深证', '中证', '创业板', '科创板', '北证',
'ETF', '指数', 'Index', '基准'
]
def is_concept_plate(name: str) -> bool:
name_lower = name.lower()
for keyword in index_keywords:
if keyword.lower() in name_lower:
return False
return True
original_count = len(ths_concepts)
filtered_concepts = ths_concepts[ths_concepts['name'].apply(is_concept_plate)]
filtered_count = len(filtered_concepts)
logger.info(f"过滤指数型板块: {original_count} -> {filtered_count}")
return filtered_concepts
def analyze_concept_strength(fetcher: TushareFetcher, concept_info, trading_dates):
"""分析单个概念的强势特征"""
ts_code = concept_info['ts_code']
name = concept_info['name']
try:
# 获取过去20周的数据约100个交易日
start_date = trading_dates[0]
end_date = trading_dates[-1]
daily_data = fetcher.pro.ths_daily(
ts_code=ts_code,
start_date=start_date,
end_date=end_date
)
if daily_data.empty or len(daily_data) < 20:
return None
# 按日期排序
daily_data = daily_data.sort_values('trade_date')
daily_data.reset_index(drop=True, inplace=True)
# 1. 计算本周涨幅最近5个交易日
recent_data = daily_data.tail(5)
if len(recent_data) < 2:
return None
week_start_close = recent_data.iloc[0]['close']
week_end_close = recent_data.iloc[-1]['close']
week_change = (week_end_close - week_start_close) / week_start_close * 100
# 2. 检查是否本周收阳
is_weekly_positive = week_change > 0
# 3. 计算20周新高约100个交易日
current_close = daily_data.iloc[-1]['close']
past_20weeks_high = daily_data['high'].max()
is_20week_high = current_close >= past_20weeks_high * 0.99 # 允许1%的误差
# 4. 计算成交额最近5日平均
recent_turnover = recent_data['vol'].mean() * recent_data['close'].mean() # 简化计算
turnover_100yi = recent_turnover / 100000000 # 转换为亿元
# 5. 计算技术指标
# RSI相对强弱指数
rsi = calculate_rsi(daily_data['close'].values)
# 20日均线趋势
ma20 = daily_data['close'].rolling(20).mean()
ma20_trend = (ma20.iloc[-1] - ma20.iloc[-10]) / ma20.iloc[-10] * 100 if len(ma20) >= 20 else 0
# 波动率
volatility = daily_data['pct_change'].std() * np.sqrt(250) # 年化波动率
return {
'ts_code': ts_code,
'name': name,
'week_change': week_change,
'is_weekly_positive': is_weekly_positive,
'is_20week_high': is_20week_high,
'avg_turnover_yi': turnover_100yi,
'current_close': current_close,
'rsi': rsi,
'ma20_trend': ma20_trend,
'volatility': volatility,
'data_length': len(daily_data)
}
except Exception as e:
logger.debug(f"分析 {name} 失败: {e}")
return None
def calculate_rsi(prices, period=14):
"""计算RSI指标"""
try:
delta = np.diff(prices)
gain = np.where(delta > 0, delta, 0)
loss = np.where(delta < 0, -delta, 0)
avg_gain = np.mean(gain[-period:]) if len(gain) >= period else 0
avg_loss = np.mean(loss[-period:]) if len(loss) >= period else 0
if avg_loss == 0:
return 100
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
except:
return 50 # 默认值
def find_strong_sectors(fetcher: TushareFetcher):
"""寻找强势板块"""
try:
logger.info("🔍 开始寻找强势板块...")
# 获取交易日期
trading_dates = get_trading_dates(100)
logger.info(f"分析周期: {trading_dates[0]}{trading_dates[-1]} (100个交易日)")
# 获取同花顺概念列表
ths_concepts = fetcher.pro.ths_index(exchange='A', type='N')
if ths_concepts.empty:
logger.error("未获取到概念数据")
return
# 过滤指数型概念
ths_concepts = filter_index_concepts(ths_concepts)
# 分析概念强度
strong_concepts = []
total_concepts = min(80, len(ths_concepts)) # 分析前80个概念
logger.info(f"分析 {total_concepts} 个概念...")
for i, (_, concept) in enumerate(ths_concepts.head(total_concepts).iterrows()):
if i % 10 == 0:
logger.info(f"进度: {i+1}/{total_concepts}")
result = analyze_concept_strength(fetcher, concept, trading_dates)
if result:
strong_concepts.append(result)
if not strong_concepts:
logger.warning("未找到符合条件的强势板块")
return
# 转换为DataFrame
df = pd.DataFrame(strong_concepts)
# 强势板块筛选
logger.info("🚀 应用强势板块筛选条件...")
# 条件1本周收阳
weekly_positive = df[df['is_weekly_positive']]
logger.info(f"本周收阳概念: {len(weekly_positive)}")
# 条件220周新高
new_high_concepts = df[df['is_20week_high']]
logger.info(f"20周新高概念: {len(new_high_concepts)}")
# 条件3成交额超过1000亿这里设置为10亿因为单个概念1000亿太高
high_turnover = df[df['avg_turnover_yi'] >= 10]
logger.info(f"成交额超过10亿概念: {len(high_turnover)}")
# 综合强势板块满足至少2个条件
df['strength_score'] = (
df['is_weekly_positive'].astype(int) +
df['is_20week_high'].astype(int) +
(df['avg_turnover_yi'] >= 10).astype(int)
)
# 按强势得分和周涨幅排序
strong_sectors = df[df['strength_score'] >= 2].sort_values(['strength_score', 'week_change'], ascending=[False, False])
# 显示结果
display_strong_sectors(df, strong_sectors, weekly_positive, new_high_concepts, high_turnover)
return df
except Exception as e:
logger.error(f"寻找强势板块失败: {e}")
def display_strong_sectors(df, strong_sectors, weekly_positive, new_high_concepts, high_turnover):
"""显示强势板块分析结果"""
print("\n" + "="*100)
print("🔍 强势板块综合分析报告")
print("="*100)
# 1. 综合强势板块(满足多个条件)
if not strong_sectors.empty:
print(f"\n🚀 综合强势板块TOP10满足2+条件):")
print(f"{'排名':<4} {'概念名称':<25} {'周涨幅':<10} {'强势分':<8} {'RSI':<8} {'成交额(亿)':<12} {'条件':<20}")
print("-" * 100)
for i, (_, concept) in enumerate(strong_sectors.head(10).iterrows()):
rank = i + 1
name = concept['name'][:23] + '..' if len(concept['name']) > 23 else concept['name']
week_chg = f"{concept['week_change']:+.2f}%"
score = f"{concept['strength_score']}/3"
rsi = f"{concept['rsi']:.1f}"
turnover = f"{concept['avg_turnover_yi']:.1f}"
conditions = []
if concept['is_weekly_positive']:
conditions.append("周阳")
if concept['is_20week_high']:
conditions.append("新高")
if concept['avg_turnover_yi'] >= 10:
conditions.append("大额")
condition_str = "+".join(conditions)
print(f"{rank:<4} {name:<25} {week_chg:<10} {score:<8} {rsi:<8} {turnover:<12} {condition_str:<20}")
# 2. 分类展示
print(f"\n📊 分类统计:")
print(f" 本周收阳: {len(weekly_positive)}")
print(f" 20周新高: {len(new_high_concepts)}")
print(f" 大成交额: {len(high_turnover)}")
print(f" 综合强势: {len(strong_sectors)}")
# 3. 本周收阳TOP10
if not weekly_positive.empty:
top_weekly = weekly_positive.sort_values('week_change', ascending=False)
print(f"\n📈 本周收阳TOP10:")
for i, (_, concept) in enumerate(top_weekly.head(10).iterrows()):
print(f" {i+1:2d}. {concept['name']}: {concept['week_change']:+.2f}%")
# 4. 20周新高概念
if not new_high_concepts.empty:
print(f"\n🎯 20周新高概念TOP10:")
new_high_sorted = new_high_concepts.sort_values('week_change', ascending=False)
for i, (_, concept) in enumerate(new_high_sorted.head(10).iterrows()):
print(f" {i+1:2d}. {concept['name']}: {concept['week_change']:+.2f}% (RSI: {concept['rsi']:.1f})")
# 5. 大成交额概念
if not high_turnover.empty:
print(f"\n💰 大成交额概念TOP10:")
turnover_sorted = high_turnover.sort_values('avg_turnover_yi', ascending=False)
for i, (_, concept) in enumerate(turnover_sorted.head(10).iterrows()):
print(f" {i+1:2d}. {concept['name']}: {concept['avg_turnover_yi']:.1f}亿 ({concept['week_change']:+.2f}%)")
# 6. 技术面强势
strong_tech = df[(df['rsi'] > 60) & (df['ma20_trend'] > 0)].sort_values('week_change', ascending=False)
if not strong_tech.empty:
print(f"\n📊 技术面强势概念TOP10:")
for i, (_, concept) in enumerate(strong_tech.head(10).iterrows()):
print(f" {i+1:2d}. {concept['name']}: RSI {concept['rsi']:.1f}, MA20趋势 {concept['ma20_trend']:+.2f}%")
def main():
"""主函数"""
logger.info("🚀 开始高级强势板块分析...")
# 初始化Tushare数据获取器
token = "0ed6419a00d8923dc19c0b58fc92d94c9a0696949ab91a13aa58a0cc"
fetcher = TushareFetcher(token=token)
# 寻找强势板块
result_df = find_strong_sectors(fetcher)
if result_df is not None:
logger.info("✅ 强势板块分析完成!")
else:
logger.error("❌ 强势板块分析失败!")
if __name__ == "__main__":
main()