stock-ai-agent/backend/test_limit_up_approach.py
2026-02-27 09:54:17 +08:00

101 lines
3.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
测试A股异动监控 - 使用涨停板数据作为替代方案
当 eastmoney API 不可用时,使用涨停板数据分析
"""
import asyncio
import sys
import os
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import akshare as ak
import pandas as pd
from app.utils.logger import logger
def test_limit_up_approach():
"""使用涨停板数据发现异动板块"""
print("=" * 60)
print("涨停板数据分析测试")
print("=" * 60)
print()
try:
# 获取涨停板数据
print("获取涨停板数据...")
df = ak.stock_zt_pool_em(date='20260227')
if df.empty:
print("没有涨停数据")
return
print(f"获取到 {len(df)} 只涨停股")
print()
# 按行业分组统计
print("-" * 60)
print("按行业统计涨停股数:")
print("-" * 60)
# 获取行业列
industry_col = '所属行业'
if industry_col not in df.columns:
# 尝试其他可能的列名
for col in df.columns:
if '行业' in col or '板块' in col:
industry_col = col
break
if industry_col in df.columns:
# 按行业分组
industry_stats = df.groupby(industry_col).agg({
'代码': 'count',
'涨跌幅': 'mean',
'最新价': 'mean'
}).rename(columns={'代码': '涨停数', '涨跌幅': '平均涨幅', '最新价': '平均价格'})
# 排序
industry_stats = industry_stats.sort_values('涨停数', ascending=False)
# 显示Top 10
for idx, (industry, row) in enumerate(industry_stats.head(10).iterrows(), 1):
print(f"{idx}. {industry}: {int(row['涨停数'])}只涨停, 平均涨幅 {row['平均涨幅']:.2f}%")
print()
print("-" * 60)
print("涨停股详情 (Top 5):")
print("-" * 60)
# 显示涨幅最大的5只
top_5 = df.nlargest(5, '涨跌幅')
for idx, row in top_5.iterrows():
print(f"{int(row['序号'])}. {row['名称']} ({row['代码']})")
print(f" 涨幅: {row['涨跌幅']:.2f}% | 价格: {row['最新价']:.2f}")
print(f" 行业: {row.get(industry_col, 'N/A')}")
print(f" 封板时间: {row.get('最后封板时间', 'N/A')}")
print()
return True
except Exception as e:
print(f"测试失败: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = test_limit_up_approach()
if success:
print("=" * 60)
print("测试成功涨停板API可用")
print("=" * 60)
print()
print("建议: 可以使用涨停板数据作为板块异动监控的替代方案")
print(" - 按行业统计涨停股数")
print(" - 发现涨停集中的板块即为异动板块")
print(" - 涨停股本身就是最好的龙头股候选")
else:
print("测试失败")