101 lines
3.1 KiB
Python
101 lines
3.1 KiB
Python
"""
|
||
测试A股异动监控 - 使用涨停板数据作为替代方案
|
||
当 eastmoney API 不可用时,使用涨停板数据分析
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
import os
|
||
|
||
# 添加项目根目录到 Python 路径
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
import akshare as ak
|
||
import pandas as pd
|
||
from app.utils.logger import logger
|
||
|
||
|
||
def test_limit_up_approach():
|
||
"""使用涨停板数据发现异动板块"""
|
||
print("=" * 60)
|
||
print("涨停板数据分析测试")
|
||
print("=" * 60)
|
||
print()
|
||
|
||
try:
|
||
# 获取涨停板数据
|
||
print("获取涨停板数据...")
|
||
df = ak.stock_zt_pool_em(date='20260227')
|
||
|
||
if df.empty:
|
||
print("没有涨停数据")
|
||
return
|
||
|
||
print(f"获取到 {len(df)} 只涨停股")
|
||
print()
|
||
|
||
# 按行业分组统计
|
||
print("-" * 60)
|
||
print("按行业统计涨停股数:")
|
||
print("-" * 60)
|
||
|
||
# 获取行业列
|
||
industry_col = '所属行业'
|
||
if industry_col not in df.columns:
|
||
# 尝试其他可能的列名
|
||
for col in df.columns:
|
||
if '行业' in col or '板块' in col:
|
||
industry_col = col
|
||
break
|
||
|
||
if industry_col in df.columns:
|
||
# 按行业分组
|
||
industry_stats = df.groupby(industry_col).agg({
|
||
'代码': 'count',
|
||
'涨跌幅': 'mean',
|
||
'最新价': 'mean'
|
||
}).rename(columns={'代码': '涨停数', '涨跌幅': '平均涨幅', '最新价': '平均价格'})
|
||
|
||
# 排序
|
||
industry_stats = industry_stats.sort_values('涨停数', ascending=False)
|
||
|
||
# 显示Top 10
|
||
for idx, (industry, row) in enumerate(industry_stats.head(10).iterrows(), 1):
|
||
print(f"{idx}. {industry}: {int(row['涨停数'])}只涨停, 平均涨幅 {row['平均涨幅']:.2f}%")
|
||
|
||
print()
|
||
print("-" * 60)
|
||
print("涨停股详情 (Top 5):")
|
||
print("-" * 60)
|
||
|
||
# 显示涨幅最大的5只
|
||
top_5 = df.nlargest(5, '涨跌幅')
|
||
for idx, row in top_5.iterrows():
|
||
print(f"{int(row['序号'])}. {row['名称']} ({row['代码']})")
|
||
print(f" 涨幅: {row['涨跌幅']:.2f}% | 价格: {row['最新价']:.2f}")
|
||
print(f" 行业: {row.get(industry_col, 'N/A')}")
|
||
print(f" 封板时间: {row.get('最后封板时间', 'N/A')}")
|
||
print()
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"测试失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
success = test_limit_up_approach()
|
||
if success:
|
||
print("=" * 60)
|
||
print("测试成功!涨停板API可用")
|
||
print("=" * 60)
|
||
print()
|
||
print("建议: 可以使用涨停板数据作为板块异动监控的替代方案")
|
||
print(" - 按行业统计涨停股数")
|
||
print(" - 发现涨停集中的板块即为异动板块")
|
||
print(" - 涨停股本身就是最好的龙头股候选")
|
||
else:
|
||
print("测试失败")
|